メインコンテンツへスキップ

MQTTでの排他制御の例

🧠要件の整理

  • MQTT受信は逐次1データずつ(p1, p2, p3のどれか)。

  • 1秒間に3データ(p1, p2, p3)を短時間で連続送信

  • データ構造は Data(time, p1, p2, p3)(すべてそろった時点で確定)。

  • p1の受信時刻を全体の time とする。

  • 最終的には 別クラスに完全なDataを排他制御しながら渡す


✅ベストプラクティス

① データ構造:一時的なバッファを用意

from dataclasses import dataclass
from threading import Lock

@dataclass
class Data:
    time: float
    p1: float = None
    p2: float = None
    p3: float = None
class DataAssembler:
    def __init__(self, on_complete):
        self._lock = Lock()
        self._buffer = {}
        self._on_complete = on_complete

    def update(self, name: str, value: float, timestamp: float):
        with self._lock:
            if name == 'p1':
                # 新しい時刻の開始
                self._buffer = {
                    'time': timestamp,
                    'p1': value,
                    'p2': None,
                    'p3': None,
                }
            else:
                if self._buffer and name in self._buffer:
                    self._buffer[name] = value

            # チェック:全部そろった?
            if all(k in self._buffer and self._buffer[k] is not None for k in ['p1', 'p2', 'p3']):
                data = Data(
                    time=self._buffer['time'],
                    p1=self._buffer['p1'],
                    p2=self._buffer['p2'],
                    p3=self._buffer['p3'],
                )
                self._on_complete(data)
                self._buffer = {}  # 次の受信に向けてクリア

② MQTTサブスクライバー側

def on_message(client, userdata, msg):
    payload = json.loads(msg.payload)
    name = payload['name']  # 'p1', 'p2', 'p3'
    value = payload['value']
    timestamp = time.time()  # 秒単位で十分ならこれでOK(p1基準)

    assembler.update(name, value, timestamp)

③ 外部公開先(例)

class DataReceiver:
    def __init__(self):
        self.latest_data = None

    def receive(self, data: Data):
        print("完成データ受信:", data)
        self.latest_data = data

④ 初期化

receiver = DataReceiver()
assembler = DataAssembler(receiver.receive)

🔐 排他処理のポイント

  • DataAssembler 内部で ロック付きのupdate関数により排他制御。

  • MQTTの on_message複数スレッドから同時に呼ばれる可能性があるため、ロックが必須

  • p1 到着でバッファを初期化 → p2/p3を埋めて揃ったら Data にして公開。


🔄 拡張性

  • p4, p5 などプロパティが増えても Data_buffer を更新するだけで対応。

  • 遅延や欠損対応(例:2秒以上 p2 が来ない)はタイマーを使って一定時間後に破棄も可能。