這個問題困擾了我估計有一個星期問題,而問題的一開始我沒有任何頭緒,問題的表現就是調用節點去請求任務時,時常報錯:

 

報錯信息諸如:

Protocol Error: , b'\x00\x00\x00\x00\x00\x00\x00\x00\x00*3'
Error while reading from socket: (9, 'Bad file descriptor')
'int' object has no attribute 'decode'
name 'self' is not defined
only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context
Protocol Error: ", b'status": "SUCCESS", "result": false, "traceback": null, "children": [], "task_id": "1cbba409-b48a-49a6-b2e3-a5d6d203fc6d"}'

我的celery backend 為redis,一開始,我的排錯心酸歷程如下:

1.可能是redis所在的服務器出口網速不足,導致客戶端從redis取數據延遲,于是調增了系統tcp最大并發數

2.可能是節點服務器性能與網速不足,導致結果寫入redis有延遲或錯誤

于是我花了2000多升級了帶寬與內存,發現然并卵

3.可能是線程鎖的問題,然并卵

4.請求的網址越慢出錯率越高,讓我深信為延遲導致。

5.可能是redis中存儲的數據格式字節數太多,于是修改“yaml”

app = Celery('finger_module_celery', broker=brokers, backend=backend,task_serializer='yaml')

6.去redis里看了下,存儲的結果并沒有問題,所以還是從redis取數據過程出現了問題,

于是我打印詳細點:

基本上判定,只要取結果出現了錯誤,推送任務就會出錯,并且很多時候單線程也會出錯。

處于對celery的信任,我沒有懷疑celery中get()方法有問題。

前幾天想了下,是不是該用個笨方法:把取數據時間延遲一點,確保能取到數據,處于對技術的完美追求,覺得這個方法有點侮辱自己的代碼,沒有試,昨天晚上還是決定嘗試一下。

代碼如下:

def Celery_get(function,list,queue):
    '''
    :param function: 分布式需要推送的函數
    :param list: 函數的args
    :param queue: celery服務端中的任務隊列
    :return: 節點處理完的數據
    '''
    time = 0.1
    try:
        res = function.apply_async(args=list, queue=queue)
    except Exception as e:
        print('推送任務錯誤:', list, e)
        return False
    try:
        while True:
            if res.ready():
                return res.get()
            else:
                sleep(time)
    except Exception as e:
        print('取結果錯誤:',e)
        return False

單獨定義一個推送任務與取結果的函數,用延時來讓直到ready()為真時才取結果,如果不行再延遲。

經過測試發現,貌似就差這么0.1秒,完美運行無瑕疵,沒有再報錯。

2018年12月12日 更新

今天偶然發現文章開頭提到的報錯,在部分情況下也是會受帶寬影響。

如果任務量比較大,分布式節點使用外網鏈接,一定要觀察rabbitmq 服務器的帶寬使用情況,是否已經達到了出口帶寬瓶頸

如下圖:3M左右出口帶寬的機器,在celery使用外網的時候,txkb/s 維持在800多,已經達到了瓶頸;在修改成內網鏈接后 網絡吞吐達到2790多txkb/s,報錯也少了很多,但還會有,所以帶寬問題雖然不是根本問題,但也是可以優化的一個維度。

實時查看流量命令: sar?-n?DEV?1?100

您的支持將鼓勵我們繼續創作!

[微信] 掃描二維碼打賞

[支付寶] 掃描二維碼打賞