メインコンテンツへスキップ

MQTTクライアントにおける排他制御

🔐はじめに

MQTTは軽量なPub/Subモデルのプロトコルで、IoTやリアルタイム通信に広く利用されています。
複数スレッドや非同期処理が絡むクライアント実装では、受信データや共有リソースへのアクセス競合(Race Condition)を防ぐ排他制御が重要になります。

この記事では、MQTTクライアントでデータを安全に扱うための排他制御の基本を解説します。


🧵基本概念:なぜ排他制御が必要なのか?

🔁 MQTTは非同期通信が基本

  • on_message()などのコールバック関数は非同期に実行される

  • サブスクリプションにより複数のトピックから同時にデータが届く可能性がある

  • 受信データをグローバル変数やリスト、辞書などに格納している場合、他スレッドとの競合が発生する

⚠️ 排他制御が必要な例

  • 複数トピックからデータを受けて1つのデータ構造にまとめて保存する

  • データを受信している間に、別スレッドで処理や送信を行う

  • ログファイル外部デバイスへの書き込みが発生する


🛠️ よく使われる排他制御の方法(Python編)

Pythonのpaho-mqttライブラリを例に、主な排他制御の方法を紹介します。

🔒 threading.Lockを使う(最も基本的な方法)

import threading
import paho.mqtt.client as mqtt

lock = threading.Lock()
shared_data = {}

def on_message(client, userdata, msg):
    with lock:
        shared_data[msg.topic] = msg.payload.decode()

【情報】
with lock:構文で囲んだブロックは同時に1つのスレッドしか実行できません。他スレッドがロックを取得しようとすると待機状態になります。


🧳 queue.Queueを使って非同期処理

from queue import Queue
import threading

data_queue = Queue()

def on_message(client, userdata, msg):
    data_queue.put((msg.topic, msg.payload.decode()))

def consumer():
    while True:
        topic, payload = data_queue.get()
        # 安全に処理できる

Queueを使うと、スレッド間通信の安全性が保証されているため、受信スレッドと処理スレッドを分離できます。


🧬 asyncioとの併用(高度な構成)

非同期アプリでMQTTを扱う場合、paho-mqttの代わりにgmqttasyncio-mqttを用いることで、排他制御をasyncio.Lockで実現できます。

import asyncio
from asyncio_mqtt import Client

lock = asyncio.Lock()
shared_data = {}

async def main():
    async with Client("broker.hivemq.com") as client:
        async with client.unfiltered_messages() as messages:
            await client.subscribe("topic/test")
            async for message in messages:
                async with lock:
                    shared_data[message.topic] = message.payload.decode()

asyncio.Lockは、イベントループ内での並行制御に適したロックです。


🧭 どの手法を選ぶべき?

処理スタイル おすすめの排他制御手法
マルチスレッド threading.Lock or Queue
シングルスレッド + 非同期 asyncio.Lock
メッセージを貯めて後処理 Queue
書き込み対象が外部ファイル・DB すべてにLock推奨

🧩補足:排他制御を使わないとどうなる?

  • データ構造が壊れる(辞書に複数スレッドから同時に書き込み)

  • 途中の処理結果が消える(更新が上書きされる)

  • プログラムが不安定になる(例:RuntimeError: dictionary changed size during iteration

こうしたバグは再現が難しく、見つけにくいため、予防的に排他制御を導入することが重要です。


🏁まとめ

  • MQTTクライアントは非同期にメッセージを受信するため、排他制御が必要

  • LockQueueを使ってデータ構造や処理を守る

  • 処理スタイルに合わせた制御方法を選ぶのがコツ

  • 排他制御を怠ると、予期せぬバグやデータ破壊につながる