355 lines
11 KiB
Markdown
355 lines
11 KiB
Markdown
---
|
||
title: 使用Docker Compose构建MQTT消息记录服务
|
||
createTime: 2025/06/19 23:30:29
|
||
tags:
|
||
- Docker
|
||
categories:
|
||
- 随记
|
||
---
|
||
|
||
## 思路
|
||
|
||
使用两个 Broker:主 Broker 将收到的消息单向桥接到存储 Broker,再由一个中间件订阅存储 Broker 上的所有主题(`#`),并把收到的消息写入数据库。
|
||
|
||
## 实现
|
||
|
||
### 主Broker
|
||
|
||
`mosquitto.conf`
|
||
|
||
```ini
|
||
persistence true
|
||
persistence_location /mosquitto/data
|
||
log_dest file /mosquitto/log/mosquitto.log
|
||
listener 9001
|
||
port 1883
|
||
allow_anonymous true
|
||
|
||
connection mosquitto_storage
|
||
address mosquitto_storage:1883
|
||
topic # out 0
|
||
start_type automatic
|
||
```
|
||
|
||
### 存储Broker
|
||
|
||
`mosquitto.conf`
|
||
|
||
```ini
|
||
persistence true
|
||
persistence_location /mosquitto/data
|
||
log_dest file /mosquitto/log/mosquitto.log
|
||
listener 9001
|
||
port 1883
|
||
allow_anonymous true
|
||
```
|
||
|
||
### 数据库
|
||
|
||
`init.sh`
|
||
|
||
```bash
|
||
#!/bin/bash
|
||
psql -v ON_ERROR_STOP=1 --username "postgres" <<-EOSQL
|
||
CREATE DATABASE mqtt_data;
|
||
EOSQL
|
||
```
|
||
|
||
### 中间件
|
||
|
||
`main.py`
|
||
|
||
```python
|
||
import paho.mqtt.client as mqtt
|
||
import psycopg2
|
||
from psycopg2 import sql
|
||
import logging
|
||
import time
|
||
|
||
class Middleware:
    """Persist every message seen on the storage MQTT broker into PostgreSQL.

    The instance subscribes to ``#`` on ``storage_broker`` and writes each
    non-empty, UTF-8 decodable payload into a table named after the first
    topic level plus a ``_YYMM`` suffix (one table per topic root per month).

    Args:
        main_broker: Host name of the main broker. Stored but not used by
            this class.
        storage_broker: Host name of the broker to subscribe to (port 1883).
        db_name: PostgreSQL database name.
        db_user: PostgreSQL user.
        db_password: PostgreSQL password.
        db_host: PostgreSQL host.
        db_port: PostgreSQL port.
        blank_topic_name: Table-name prefix used when the first topic level
            is empty (for example the topic ``/a/b``).

    Raises:
        psycopg2.OperationalError: From the constructor, when the database
            cannot be reached after all connection attempts.
    """

    # Characters of a topic level that are replaced by "_" in table names.
    _TABLE_NAME_TRANSLATION = str.maketrans(" .", "__")
    # Maximum length of the topic-derived part of a table name.
    _TABLE_PREFIX_MAX_LEN = 50

    def __init__(self, main_broker, storage_broker, db_name, db_user, db_password, db_host, db_port, blank_topic_name):
        self.main_broker = main_broker
        self.storage_broker = storage_broker
        self.db_name = db_name
        self.db_user = db_user
        self.db_password = db_password
        self.db_host = db_host
        self.db_port = db_port
        self.blank_topic_name = blank_topic_name

        # Configure logging (stream handler only).
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

        # Open the database connection up front so start-up fails loudly.
        self.postgres_conn = None
        self.connect_to_database()

        # Create the MQTT client and register the callbacks.
        self.storage_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
        self.storage_client.on_connect = self.on_connect
        self.storage_client.on_message = self.on_message
        self.storage_client.on_disconnect = self.on_disconnect

    def connect_to_database(self):
        """Connect to PostgreSQL, retrying with exponential back-off.

        Makes up to five attempts, sleeping 5, 10, 20 and 40 seconds between
        them, and stores the connection in ``self.postgres_conn``.

        Raises:
            psycopg2.OperationalError: When the last attempt also fails.
        """
        max_retries = 5
        retry_delay = 5  # seconds

        for attempt in range(max_retries):
            try:
                self.postgres_conn = psycopg2.connect(
                    dbname=self.db_name,
                    user=self.db_user,
                    password=self.db_password,
                    host=self.db_host,
                    port=self.db_port
                )
            except psycopg2.OperationalError as e:
                self.logger.warning(f"Database connection failed (attempt {attempt+1}/{max_retries}): {e}")
                if attempt == max_retries - 1:
                    self.logger.error("Failed to connect to database after multiple attempts")
                    raise
                time.sleep(retry_delay)
                retry_delay *= 2  # exponential back-off
            else:
                self.logger.info("Successfully connected to PostgreSQL database")
                return

    def start(self):
        """Connect to the storage broker and block in the MQTT network loop.

        Any exception raised while connecting or looping is logged, and
        ``cleanup()`` always runs before this method returns.
        """
        try:
            self.logger.info(f"Connecting to MQTT broker: {self.storage_broker}")
            self.storage_client.connect(self.storage_broker, 1883, 60)
            self.storage_client.loop_forever()
        except Exception as e:
            self.logger.error(f"Error in MQTT loop: {e}")
        finally:
            self.cleanup()

    def on_connect(self, client, userdata, flags, reason_code, properties):
        """Subscribe to all topics once the broker accepts the connection."""
        if reason_code.is_failure:
            self.logger.error(f"Failed to connect to MQTT broker: {reason_code}")
            return

        self.logger.info(f"Connected to MQTT broker with code {reason_code}")
        client.subscribe("#")
        self.logger.info("Subscribed to all topics (#)")

    def on_disconnect(self, client, userdata, disconnect_flags, reason_code=None, properties=None):
        """Log unexpected disconnections.

        The signature follows the ``CallbackAPIVersion.VERSION2`` convention
        selected in ``__init__``; the previous three-argument form raised
        ``TypeError`` whenever paho invoked it. A legacy call passing only
        ``(client, userdata, reason_code)`` is still accepted.

        No manual reconnect is attempted here: ``loop_forever()`` reconnects
        on its own, and sleeping inside this callback would stall the
        network loop.
        """
        if reason_code is None:
            # Legacy three-argument call: the third positional is the code.
            reason_code = disconnect_flags

        if reason_code != 0:
            self.logger.warning(f"Unexpected MQTT disconnection: {reason_code}")

    def on_message(self, client, userdata, msg):
        """Decode an incoming message and hand it to ``store_message``.

        Payloads that are not valid UTF-8, or that are empty after stripping
        whitespace, are logged and skipped. Every other failure is logged so
        that one bad message cannot stop the MQTT loop.
        """
        try:
            payload = msg.payload.decode('utf-8')
            self.logger.info(f"Received message from topic: {msg.topic}")

            if not payload.strip():
                self.logger.warning(f"Empty payload received on topic: {msg.topic}")
                return

            table_name = self._table_name_for(msg.topic)
            self.store_message(table_name, msg.topic, payload)
        except UnicodeDecodeError:
            self.logger.error(f"Failed to decode message payload from topic: {msg.topic}")
        except Exception as e:
            self.logger.error(f"Error processing message: {e}")

    def _table_name_for(self, topic):
        """Return the monthly table name for ``topic``.

        The name is the first topic level (or ``blank_topic_name`` when that
        level is empty), with spaces and dots replaced by underscores,
        truncated to 50 characters, followed by ``_YYMM`` in local time.
        """
        # str.split never returns an empty list, so test the first level
        # itself to detect topics that start with "/".
        prefix = topic.split("/", 1)[0] or self.blank_topic_name
        prefix = prefix.translate(self._TABLE_NAME_TRANSLATION)[:self._TABLE_PREFIX_MAX_LEN]
        return f"{prefix}_{time.strftime('%y%m', time.localtime())}"

    def store_message(self, table_name, topic, payload):
        """Insert one message into ``table_name``, creating the table if needed.

        ``payload`` is stored as text; when it parses as a float the number
        is also written to the ``value`` column, otherwise that column is
        NULL. If the database connection turns out to be dead, it is
        re-established and the insert is retried once, so the message that
        revealed the failure is not lost.

        Raises:
            psycopg2.OperationalError: When reconnecting fails after all
                attempts made by ``connect_to_database``.
        """
        value_num = None
        try:
            value_num = float(payload)
            self.logger.info(f"Payload converted to numeric value: {value_num}")
        except (ValueError, TypeError):
            # Not numeric: leave the value column NULL.
            pass

        max_attempts = 2
        for attempt in range(1, max_attempts + 1):
            if self._connection_is_dead():
                self.logger.warning("Database connection lost, attempting to reconnect...")
                self.connect_to_database()

            try:
                self._insert_row(table_name, topic, payload, value_num)
            except psycopg2.Error as e:
                if self._connection_is_dead() and attempt < max_attempts:
                    # The connection broke mid-statement; retry on a new one.
                    continue
                self.logger.error(f"Database error: {e}")
                self._rollback_quietly()
                return
            except Exception as e:
                self.logger.error(f"Unexpected error: {e}")
                self._rollback_quietly()
                return

            self.logger.info(f"Stored message in table '{table_name}'")
            return

    def _connection_is_dead(self):
        """Return True when there is no usable database connection object."""
        return self.postgres_conn is None or bool(self.postgres_conn.closed)

    def _insert_row(self, table_name, topic, payload, value_num):
        """Create the table if missing, insert one row, and commit."""
        table = sql.Identifier(table_name)

        create_table_query = sql.SQL("""
            CREATE TABLE IF NOT EXISTS {} (
                id SERIAL PRIMARY KEY,
                topic TEXT NOT NULL,
                payload TEXT NOT NULL,
                value NUMERIC,
                timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW()
            )
        """).format(table)

        insert_query = sql.SQL("""
            INSERT INTO {} (topic, payload, value)
            VALUES (%s, %s, %s)
        """).format(table)

        # The context manager closes the cursor even when a statement fails.
        with self.postgres_conn.cursor() as cursor:
            cursor.execute(create_table_query)
            cursor.execute(insert_query, (topic, payload, value_num))
        self.postgres_conn.commit()

    def _rollback_quietly(self):
        """Roll back the open transaction; log instead of raising on failure."""
        if self._connection_is_dead():
            return
        try:
            self.postgres_conn.rollback()
        except psycopg2.Error as e:
            self.logger.error(f"Rollback failed: {e}")

    def cleanup(self):
        """Disconnect the MQTT client and close the database connection.

        Safe to call more than once; errors are logged, never raised.
        """
        self.logger.info("Cleaning up resources...")
        try:
            if self.storage_client.is_connected():
                self.storage_client.disconnect()
        except Exception as e:
            self.logger.error(f"Error disconnecting MQTT client: {e}")

        try:
            if self.postgres_conn and not self.postgres_conn.closed:
                self.postgres_conn.close()
        except Exception as e:
            self.logger.error(f"Error closing database connection: {e}")
|
||
|
||
if __name__ == "__main__":
    import os

    # Connection settings. Each value can be overridden through an
    # environment variable of the same name (for example from the Compose
    # file), so credentials need not be edited into this script; the
    # defaults match the service names used by the accompanying Compose file.
    MAIN_BROKER = os.environ.get("MAIN_BROKER", "mosquitto_main")
    STORAGE_BROKER = os.environ.get("STORAGE_BROKER", "mosquitto_storage")
    DB_NAME = os.environ.get("DB_NAME", "mqtt_data")
    DB_USER = os.environ.get("DB_USER", "postgres")
    DB_PASSWORD = os.environ.get("DB_PASSWORD", "mosquitto")
    DB_HOST = os.environ.get("DB_HOST", "postgresql")
    DB_PORT = os.environ.get("DB_PORT", "5432")
    BLANK_TOPIC_NAME = os.environ.get("BLANK_TOPIC_NAME", "blank_topic")

    # Create the middleware instance and run it.
    middleware = Middleware(
        main_broker=MAIN_BROKER,
        storage_broker=STORAGE_BROKER,
        db_name=DB_NAME,
        db_user=DB_USER,
        db_password=DB_PASSWORD,
        db_host=DB_HOST,
        db_port=DB_PORT,
        blank_topic_name=BLANK_TOPIC_NAME
    )

    # start() runs cleanup() in its own finally block, so the handlers
    # below only log; calling cleanup() again would just repeat it.
    try:
        middleware.start()
    except KeyboardInterrupt:
        middleware.logger.info("Shutting down by user request")
    except Exception as e:
        middleware.logger.error(f"Unexpected shutdown: {e}")
|
||
```
|
||
|
||
### Docker Compose
|
||
|
||
```yaml
|
||
services:
|
||
mosquitto_main:
|
||
image: eclipse-mosquitto
|
||
container_name: mosquitto_main
|
||
ports:
|
||
- 'custom_port:1883'
|
||
- 'custom_port:9001'
|
||
volumes:
|
||
- /custom_path/mosquitto/main_broker/config:/mosquitto/config
|
||
- /custom_path/mosquitto/main_broker/data:/mosquitto/data
|
||
- /custom_path/mosquitto/main_broker/log:/mosquitto/log
|
||
environment:
|
||
- TZ=Asia/Shanghai
|
||
restart: always
|
||
|
||
mosquitto_storage:
|
||
image: eclipse-mosquitto
|
||
container_name: mosquitto_storage
|
||
ports:
|
||
- 'custom_port:1883'
|
||
- 'custom_port:9001'
|
||
volumes:
|
||
- /custom_path/mosquitto/storage_broker/config:/mosquitto/config
|
||
- /custom_path/mosquitto/storage_broker/data:/mosquitto/data
|
||
- /custom_path/mosquitto/storage_broker/log:/mosquitto/log
|
||
environment:
|
||
- TZ=Asia/Shanghai
|
||
restart: always
|
||
|
||
postgresql:
|
||
image: postgres:latest
|
||
container_name: mosquitto_postgresql
|
||
environment:
|
||
POSTGRES_USER: postgres
|
||
POSTGRES_PASSWORD: mosquitto
|
||
PGDATA: /var/lib/postgresql/data/pgdata
|
||
TZ: Asia/Shanghai
|
||
ports:
|
||
- 'custom_port:5432'
|
||
volumes:
|
||
- /custom_path/mosquitto/postgresql/init.sh:/docker-entrypoint-initdb.d/init-db.sh
|
||
- postgresql_data:/var/lib/postgresql/data
|
||
restart: always
|
||
|
||
middleware:
|
||
image: python:3.10-slim
|
||
container_name: mqtt_middleware
|
||
volumes:
|
||
- /custom_path/mosquitto/middleware/requirements.txt:/requirements.txt
|
||
- /custom_path/mosquitto/middleware/main.py:/middleware.py
|
||
command: >
|
||
sh -c "pip install -i https://pypi.mirrors.ustc.edu.cn/simple --upgrade pip &&
|
||
pip install -i https://pypi.mirrors.ustc.edu.cn/simple -r /requirements.txt &&
|
||
python /middleware.py"
|
||
environment:
|
||
- TZ=Asia/Shanghai
|
||
depends_on:
|
||
- mosquitto_storage
|
||
- postgresql
|
||
restart: always
|
||
|
||
volumes:
|
||
postgresql_data:
|
||
driver: local
|
||
driver_opts:
|
||
type: none
|
||
o: bind
|
||
device: /data/mosquitto_postgresql_data
|
||
```
|
||
|
||
将上述文件中的 `custom_port`、`custom_path` 替换为实际的端口和路径后,在 Compose 文件所在目录执行 `docker compose up -d` 即可启动全部服务。
|