RabbitMQ Dockerコンテナでメッセージを永続化する方法

RabbitMQ Dockerコンテナでメッセージを永続化する方法

プライベートな開発で複数のDockerコンテナで並列処理を実装する必要があり、複数コンテナのタスクを管理するためにRabbitMQというOSSのメッセージブローカーを使い始めました。

RabbitMQのデフォルト設定だと、サーバーが停止したりクラッシュするとサーバー上のキューやタスクは削除されてしまいます。
さすがに運用する場合はこの仕様だと困るので、RabbitMQサーバー上のメッセージを永続化させる必要があります。
このメッセージの永続化設定で少しハマったので備忘録的にまとめます。

RabbitMQとは何かという説明は省きます。
詳しくはこちらの記事が分かりやすいです。

実行環境

ライブラリ

  • Python==3.9
  • pika==1.2.0

ディレクトリ構造

.
├── Dockerfile
├── README.md
├── rabitmq-data/
├── poetry.lock
├── pyproject.toml
├── docker-compose.yml
└── mq/
     ├── __init__.py
     └── publish.py

実装コードはGitHubにおいています。

キューとメッセージ

まず、キューを宣言したり、メッセージをパブリッシュ(publish)する際に永続化の設定が必要です。
ただし、RabbitMQでは登録済みのキューの再定義はできません。
永続化したい場合は新規に名前を付けてキューを定義します。
以下はpublishを実装したコードで、RabbitMQのホスト名、トピック名、メッセージをモジュール実行時のコマンドライン引数で指定できるようにしています。
mq/publish.py

import click
import pika
   
   
@click.command()
@click.option(
   "--host",
   type=str,
   required=True
)
@click.option(
   "--topic",
   type=str,
   required=True
)
@click.option(
   "--message",
   type=str,
   required=True
)
def publish(host, topic, message):
   pika_param = pika.ConnectionParameters(host)
   properties = pika.BasicProperties(delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE)

   with pika.BlockingConnection(pika_param) as connection:
       channel = connection.channel()
       channel.queue_declare(queue=topic, durable=True)

       channel.basic_publish(exchange="", routing_key=topic, body=message, properties=properties)


if __name__ == "__main__":
   
   publish()

Dockerコンテナの定義

RabbitMQではノード名に基づいてデータが蓄積されていきます。
※実際にvar/lib/rabbitmq/mnesia/にキューおよびメッセージが蓄積されていますが、ホスト名で紐づけられています。
ノード名はデフォルトではそのインスタンスのホスト名(hostname)が割り当てられます。
従って、Dockerコンテナを実行する際は–hostnameパラメータでホスト名を指定する必要があります。
もしホスト名を指定せずにDockerコンテナを起動するとランダムなhostnameが割り当てられ、いったん再起動するとキューやメッセージのデータをトラッキングできなくなります。
また、メッセージを保存しているdatabase dirのvolume mountも必要です。
上記を考慮したdocker-compose.ymlは以下になります。
appサービスはpublishを実行するためのコンテナです。

version: "3.8"
services:
  app:
    build: ./
    container_name: app
    volumes:
      - ./:/app
    working_dir: /app
  mq:
    hostname: rabbitmq
    container_name: rabbitmq
    image: rabbitmq:3.8.9-management
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - ./rabbitmq-data:/var/lib/rabbitmq

検証

ではメッセージを永続化できているかを検証します。
本記事のリポジトリをクローンし、Dockerイメージのビルド、コンテナを起動します。

docker-compose up -d

コンテナ起動後、RabbitMQ管理コンソール(http://localhost:15672)にWebブラウザで接続します。
まだメッセージをpublishしていないのでno queuesと表示されているはずです。

次にメッセージをpublishします。
以下はホスト名、トピック名、メッセージを実行引数にmq/publish.pyを実行するコマンドです。

docker-compose run --rm app python -m mq.publish --host=rabbitmq --topic=test --message=hello

メッセージのpublishが完了するとキューが表示されます。

では、コンテナを再起動してみます。

docker-compose restart

すると、再起動後もメッセージがそのまま残っていることを確認できるはずです。

最後に

database dirのvolumeマウントやホスト名の設定などのDockerコンテナの定義に関する記事はすぐに見つかったのですが、キューの定義やメッセージをpublishする際の設定が必要だと辿り着くまでにやや時間がかかってしまいました。
Dockerイメージを使った実装でのトラブルシューティングではDockerと絡めた問題だと思いこみがちですが、そもそもそのサービスの仕様理解が浅いことがトラブルの要因だったりする場合も多い気がしている今日この頃です。

参考