MQTTサブスクライバー用テスト環境の作り方
開発やテスト段階では、ローカルホスト(localhost
)でMQTTブローカー+パブリッシャーダミーを動かすのが一般的です。
以下に、ローカル環境での構成と手順を整理しておきます。
✅ 構成概要
┌────────────┐
│ MQTT Broker│ ← ローカルで動かす(例:Mosquitto)
└────┬───────┘
│
│
┌────▼────┐ ┌─────────────────┐
│Publisher│──▶───▶│Subscriber + 組立 │
│(ダミー) │ │ + データ公開 │
└────────┘ └─────────────────┘
🛠 手順(ローカル開発)
① MQTTブローカー(Mosquitto)を起動
▸ Windows/Mac/Linux
以下どちらか:
-
Docker(おすすめ):
docker run -it -p 1883:1883 eclipse-mosquitto
-
またはインストール:
-
Ubuntu:
sudo apt install mosquitto
-
macOS (brew):
brew install mosquitto
-
② ダミーパブリッシャー(Python)
import time
import json
import paho.mqtt.client as mqtt
client = mqtt.Client()
client.connect("localhost", 1883, 60)
while True:
now = time.time()
for name, value in zip(["p1", "p2", "p3"], [1.1, 2.2, 3.3]):
payload = json.dumps({"name": name, "value": value})
client.publish("test/data", payload)
time.sleep(0.1) # 少し間を空ける
time.sleep(1)
③ サブスクライバー(さっきの組立コード+on_message
)
import json
import time
import paho.mqtt.client as mqtt
# さきほどの Data, DataAssembler, DataReceiver をここに定義またはインポート
receiver = DataReceiver()
assembler = DataAssembler(receiver.receive)
def on_connect(client, userdata, flags, rc):
print("Connected with result code", rc)
client.subscribe("test/data")
def on_message(client, userdata, msg):
payload = json.loads(msg.payload)
name = payload['name']
value = payload['value']
timestamp = time.time()
assembler.update(name, value, timestamp)
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.loop_forever()
🧪 試すときのポイント
-
順序:先に MQTT ブローカー(mosquitto)、次に Subscriber、最後に Publisher を起動。
-
トピック名:
test/data
で統一してます(変更可)。
🔄 補足
-
localhost
→ ブローカーがリモートの場合は IP や FQDN に。 -
デバッグ時は
mosquitto_sub -t 'test/#' -v
で内容確認できます。