MQTTクライアントにおける排他制御
🔐はじめに
MQTTは軽量なPub/Subモデルのプロトコルで、IoTやリアルタイム通信に広く利用されています。
複数スレッドや非同期処理が絡むクライアント実装では、受信データや共有リソースへのアクセス競合(Race Condition)を防ぐ排他制御が重要になります。
この記事では、MQTTクライアントでデータを安全に扱うための排他制御の基本を解説します。
🧵基本概念:なぜ排他制御が必要なのか?
🔁 MQTTは非同期通信が基本
-
on_message()
などのコールバック関数は非同期に実行される -
サブスクリプションにより複数のトピックから同時にデータが届く可能性がある
-
受信データをグローバル変数やリスト、辞書などに格納している場合、他スレッドとの競合が発生する
⚠️ 排他制御が必要な例
-
複数トピックからデータを受けて1つのデータ構造にまとめて保存する
-
データを受信している間に、別スレッドで処理や送信を行う
-
ログファイルや外部デバイスへの書き込みが発生する
🛠️ よく使われる排他制御の方法(Python編)
Pythonのpaho-mqtt
ライブラリを例に、主な排他制御の方法を紹介します。
🔒 threading.Lock
を使う(最も基本的な方法)
import threading
import paho.mqtt.client as mqtt
lock = threading.Lock()
shared_data = {}
def on_message(client, userdata, msg):
with lock:
shared_data[msg.topic] = msg.payload.decode()
【情報】with lock:
構文で囲んだブロックは同時に1つのスレッドしか実行できません。他スレッドがロックを取得しようとすると待機状態になります。
🧳 queue.Queue
を使って非同期処理
from queue import Queue
import threading
data_queue = Queue()
def on_message(client, userdata, msg):
data_queue.put((msg.topic, msg.payload.decode()))
def consumer():
while True:
topic, payload = data_queue.get()
# 安全に処理できる
Queueを使うと、スレッド間通信の安全性が保証されているため、受信スレッドと処理スレッドを分離できます。
🧬 asyncio
との併用(高度な構成)
非同期アプリでMQTTを扱う場合、paho-mqtt
の代わりにgmqtt
やasyncio-mqtt
を用いることで、排他制御をasyncio.Lock
で実現できます。
import asyncio
from asyncio_mqtt import Client
lock = asyncio.Lock()
shared_data = {}
async def main():
async with Client("broker.hivemq.com") as client:
async with client.unfiltered_messages() as messages:
await client.subscribe("topic/test")
async for message in messages:
async with lock:
shared_data[message.topic] = message.payload.decode()
asyncio.Lock
は、イベントループ内での並行制御に適したロックです。
🧭 どの手法を選ぶべき?
処理スタイル | おすすめの排他制御手法 |
---|---|
マルチスレッド | threading.Lock or Queue |
シングルスレッド + 非同期 | asyncio.Lock |
メッセージを貯めて後処理 | Queue |
書き込み対象が外部ファイル・DB | すべてにLock 推奨 |
🧩補足:排他制御を使わないとどうなる?
-
データ構造が壊れる(辞書に複数スレッドから同時に書き込み)
-
途中の処理結果が消える(更新が上書きされる)
-
プログラムが不安定になる(例:
RuntimeError: dictionary changed size during iteration
)
こうしたバグは再現が難しく、見つけにくいため、予防的に排他制御を導入することが重要です。
🏁まとめ
-
MQTTクライアントは非同期にメッセージを受信するため、排他制御が必要
-
Lock
やQueue
を使ってデータ構造や処理を守る -
処理スタイルに合わせた制御方法を選ぶのがコツ
-
排他制御を怠ると、予期せぬバグやデータ破壊につながる