MQTTでの排他制御の例
🧠要件の整理
-
MQTT受信は逐次1データずつ(p1, p2, p3のどれか)。
-
1秒間に3データ(p1, p2, p3)を短時間で連続送信。
-
データ構造は
Data(time, p1, p2, p3)
(すべてそろった時点で確定)。 -
p1の受信時刻を全体の
time
とする。 -
最終的には 別クラスに完全なDataを排他制御しながら渡す。
✅ベストプラクティス
① データ構造:一時的なバッファを用意
from dataclasses import dataclass
from threading import Lock
@dataclass
class Data:
time: float
p1: float = None
p2: float = None
p3: float = None
class DataAssembler:
def __init__(self, on_complete):
self._lock = Lock()
self._buffer = {}
self._on_complete = on_complete
def update(self, name: str, value: float, timestamp: float):
with self._lock:
if name == 'p1':
# 新しい時刻の開始
self._buffer = {
'time': timestamp,
'p1': value,
'p2': None,
'p3': None,
}
else:
if self._buffer and name in self._buffer:
self._buffer[name] = value
# チェック:全部そろった?
if all(k in self._buffer and self._buffer[k] is not None for k in ['p1', 'p2', 'p3']):
data = Data(
time=self._buffer['time'],
p1=self._buffer['p1'],
p2=self._buffer['p2'],
p3=self._buffer['p3'],
)
self._on_complete(data)
self._buffer = {} # 次の受信に向けてクリア
② MQTTサブスクライバー側
def on_message(client, userdata, msg):
payload = json.loads(msg.payload)
name = payload['name'] # 'p1', 'p2', 'p3'
value = payload['value']
timestamp = time.time() # 秒単位で十分ならこれでOK(p1基準)
assembler.update(name, value, timestamp)
③ 外部公開先(例)
class DataReceiver:
def __init__(self):
self.latest_data = None
def receive(self, data: Data):
print("完成データ受信:", data)
self.latest_data = data
④ 初期化
receiver = DataReceiver()
assembler = DataAssembler(receiver.receive)
🔐 排他処理のポイント
-
DataAssembler
内部で ロック付きのupdate
関数により排他制御。 -
MQTTの
on_message
は複数スレッドから同時に呼ばれる可能性があるため、ロックが必須。 -
p1
到着でバッファを初期化 →p2
/p3
を埋めて揃ったらData
にして公開。
🔄 拡張性
-
p4
,p5
などプロパティが増えてもData
と_buffer
を更新するだけで対応。 -
遅延や欠損対応(例:2秒以上
p2
が来ない)はタイマーを使って一定時間後に破棄も可能。