Source

django-celery-about / source / transport_message.rst

Kombu Virtual Transport Message

Kombu Virtual Transport Message

kombu.transport.virtual.Message

class Message(base.Message):
...
def __init__(self):
    ...
    fields = {'body': body,
              'delivery_tag': properties['delivery_tag'],
              'content_type': payload.get('content-type'),
              'content_encoding': payload.get('content-encoding'),
              'headers': payload.get('headers'),
              'properties': properties,
              'delivery_info': properties.get('delivery_info'),
              'postencode': 'utf-8'}
    ...

MessageはJSONにシリアライズ

{
    "body": "gAJ9c......",
    "headers": {},

    "content-type": "application/x-python-serialize",
    "content-encoding": "binary",

    "properties": {
        "name": "celery",
        "exclusive": false,
        "durable": true,
        "body_encoding": "base64",
        "delivery_info": {
            "priority": 0,
            "routing_key": "celery",
            "exchange": "celery"
        },
        "delivery_mode": 2,
        "no_ack": false,
        "alias": null,
        "queue_arguments": null,
        "binding_arguments": null,
        "delivery_tag": 1,
        "auto_delete": false
    }
}

Headers

  • どうつかう?
  • compression : 圧縮されているかどうか
  • redelivered : 再送されたとか(?)

Serialization

Content-Type

  • Body のシリアライゼーション
  • Serializer で決まります。

Content-Encoding

  • Serializer で決まります。
  • utf-8, binary, or us-ascii ...

Transport Properties

  • name

    • Exchange の名前。無い場合もある。Celeryのデフォルトは "celery"
  • exclusive

    • true : 現在のConnectionからのみConsumeできます。auto_delete=trueに強制的になります。
    • false : 他のConnectionでもConsumeできます(デフォルト )
  • durable

    • true : Workerがリスタートした時に、実行する(デフォルト)
    • false : Workerが停止した時に purge される
  • body_encoding

    • Virtual Transportではbase64 です。バイナリ転送で問題が発生することがあったので、今は必ずbase64です。
    • RabbitMQだとbinaryもあるかも。
  • delivery_info

    • Message が ChannelでPublishされた時のパラメータ

      • exchage : Publishされた時のExchange
      • routing_key : ルーティングキー
      • priority : メッセージプライオリティ(どうやってつかうかな)
  • delivery_mode

    • 2 : persisitent (デフォルト)
    • 1 : transient
  • no_ack

    • false : ConsumerしたらAcknkowlege します。(デフォルト )
    • true : Acknowlegeしません。
  • alias

    • Kombu では未使用
  • queue_arguments

    • Queueの宣言に対する追加属性
  • binding_arguments

    • Queueのbinding に対する追加属性
  • delivery_tag

    • ampq の deliver_tag (よくわからん)
  • auto_delete

    • false : Exchangeを使っている全てのQueueが終了しても、Exchangeを削除しない (デフォルト )
    • true : .... 削除する。