メインコンテンツへスキップ

MQTTサブスクライバー用テスト環境の作り方

開発やテスト段階では、ローカルホスト(localhost)でMQTTブローカー+パブリッシャーダミーを動かすのが一般的です。

以下に、ローカル環境での構成と手順を整理しておきます。


✅ 構成概要

┌────────────┐
│ MQTT Broker│ ← ローカルで動かす(例:Mosquitto)
└────┬───────┘
     │
     │
┌────▼────┐       ┌─────────────────┐
│Publisher│──▶───▶│Subscriber + 組立 │
│(ダミー) │       │ + データ公開     │
└────────┘       └─────────────────┘

🛠 手順(ローカル開発)

① MQTTブローカー(Mosquitto)を起動

▸ Windows/Mac/Linux

以下どちらか:

  • Docker(おすすめ):

docker run -it -p 1883:1883 eclipse-mosquitto
  • またはインストール

    • Ubuntu: sudo apt install mosquitto

    • macOS (brew): brew install mosquitto


② ダミーパブリッシャー(Python)

import time
import json
import paho.mqtt.client as mqtt

client = mqtt.Client()
client.connect("localhost", 1883, 60)

while True:
    now = time.time()
    for name, value in zip(["p1", "p2", "p3"], [1.1, 2.2, 3.3]):
        payload = json.dumps({"name": name, "value": value})
        client.publish("test/data", payload)
        time.sleep(0.1)  # 少し間を空ける
    time.sleep(1)

③ サブスクライバー(さっきの組立コード+on_message

import json
import time
import paho.mqtt.client as mqtt

# さきほどの Data, DataAssembler, DataReceiver をここに定義またはインポート

receiver = DataReceiver()
assembler = DataAssembler(receiver.receive)

def on_connect(client, userdata, flags, rc):
    print("Connected with result code", rc)
    client.subscribe("test/data")

def on_message(client, userdata, msg):
    payload = json.loads(msg.payload)
    name = payload['name']
    value = payload['value']
    timestamp = time.time()
    assembler.update(name, value, timestamp)

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.loop_forever()

🧪 試すときのポイント

  • 順序:先に MQTT ブローカー(mosquitto)、次に Subscriber、最後に Publisher を起動。

  • トピック名test/data で統一してます(変更可)。


🔄 補足

  • localhost → ブローカーがリモートの場合は IP や FQDN に。

  • デバッグ時は mosquitto_sub -t 'test/#' -v で内容確認できます。