353 lines
11 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
title: 使用Docker Compose构建MQTT消息记录服务
createTime: 2025/06/19 23:30:29
tags:
- Docker
---
## 思路
使用两个Broker将主Broker消息单向桥接至存储Broker挂载一个中间件订阅存储Broker收到的所有消息并存储在数据库中。
## 实现
### 主Broker
`mosquitto.conf`
```ini
persistence true
persistence_location /mosquitto/data
log_dest file /mosquitto/log/mosquitto.log
listener 9001
port 1883
allow_anonymous true
connection mosquitto_storage
address mosquitto_storage:1883
topic # out 0
start_type automatic
```
### 存储Broker
`mosquitto.conf`
```ini
persistence true
persistence_location /mosquitto/data
log_dest file /mosquitto/log/mosquitto.log
listener 9001
port 1883
allow_anonymous true
```
### 数据库
`init.sh`
```bash
#!/bin/bash
psql -v ON_ERROR_STOP=1 --username "postgres" <<-EOSQL
CREATE DATABASE mqtt_data;
EOSQL
```
### 中间件
`main.py`
```python
import paho.mqtt.client as mqtt
import psycopg2
from psycopg2 import sql
import logging
import time
class Middleware:
def __init__(self, main_broker, storage_broker, db_name, db_user, db_password, db_host, db_port, blank_topic_name):
self.main_broker = main_broker
self.storage_broker = storage_broker
self.db_name = db_name
self.db_user = db_user
self.db_password = db_password
self.db_host = db_host
self.db_port = db_port
self.blank_topic_name = blank_topic_name
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
# 初始化数据库连接
self.postgres_conn = None
self.connect_to_database()
# 初始化MQTT客户端
self.storage_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
self.storage_client.on_connect = self.on_connect
self.storage_client.on_message = self.on_message
self.storage_client.on_disconnect = self.on_disconnect
def connect_to_database(self):
"""连接到PostgreSQL数据库支持重试"""
max_retries = 5
retry_delay = 5 # 秒
for attempt in range(max_retries):
try:
self.postgres_conn = psycopg2.connect(
dbname=self.db_name,
user=self.db_user,
password=self.db_password,
host=self.db_host,
port=self.db_port
)
self.logger.info("Successfully connected to PostgreSQL database")
return
except psycopg2.OperationalError as e:
self.logger.warning(f"Database connection failed (attempt {attempt+1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(retry_delay)
retry_delay *= 2 # 指数退避
else:
self.logger.error("Failed to connect to database after multiple attempts")
raise
def start(self):
"""启动MQTT客户端并连接"""
try:
self.logger.info(f"Connecting to MQTT broker: {self.storage_broker}")
self.storage_client.connect(self.storage_broker, 1883, 60)
self.storage_client.loop_forever()
except Exception as e:
self.logger.error(f"Error in MQTT loop: {e}")
finally:
self.cleanup()
def on_connect(self, client, userdata, flags, reason_code, properties):
if reason_code.is_failure:
self.logger.error(f"Failed to connect to MQTT broker: {reason_code}")
return
self.logger.info(f"Connected to MQTT broker with code {reason_code}")
client.subscribe("#")
self.logger.info("Subscribed to all topics (#)")
def on_disconnect(self, client, userdata, reason_code):
if reason_code != 0:
self.logger.warning(f"Unexpected MQTT disconnection: {reason_code}")
# 尝试重新连接
time.sleep(5)
try:
client.reconnect()
except Exception as e:
self.logger.error(f"Reconnection failed: {e}")
def on_message(self, client, userdata, msg):
try:
payload = msg.payload.decode('utf-8')
self.logger.info(f"Received message from topic: {msg.topic}")
# 处理表名
topic_parts = msg.topic.split("/")
table_name = topic_parts[0] if topic_parts else self.blank_topic_name
table_name = table_name.replace("/", "_").replace(" ", "_").replace(".", "_")[:50] # 表名长度限制
# 分表
now = time.localtime()
year_month = f"{now.tm_year % 100:02d}{now.tm_mon:02d}"
table_name = f"{table_name}_{year_month}"
# 处理消息内容
if not payload.strip():
self.logger.warning(f"Empty payload received on topic: {msg.topic}")
return
self.store_message(table_name, msg.topic, payload)
except UnicodeDecodeError:
self.logger.error(f"Failed to decode message payload from topic: {msg.topic}")
except Exception as e:
self.logger.error(f"Error processing message: {e}")
def store_message(self, table_name, topic, payload):
"""将消息存储到数据库,尝试提取数值"""
cursor = None
try:
cursor = self.postgres_conn.cursor()
# 创建表(如果不存在),包含数值列
create_table_query = sql.SQL("""
CREATE TABLE IF NOT EXISTS {} (
id SERIAL PRIMARY KEY,
topic TEXT NOT NULL,
payload TEXT NOT NULL,
value NUMERIC,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
""").format(sql.Identifier(table_name))
cursor.execute(create_table_query)
# 尝试将payload转换为数值
value_num = None
try:
# 尝试直接转换为浮点数
value_num = float(payload)
self.logger.info(f"Payload converted to numeric value: {value_num}")
except (ValueError, TypeError):
# 转换失败保持为NULL
pass
# 插入数据
insert_query = sql.SQL("""
INSERT INTO {} (topic, payload, value)
VALUES (%s, %s, %s)
""").format(sql.Identifier(table_name))
cursor.execute(insert_query, (topic, payload, value_num))
self.postgres_conn.commit()
self.logger.info(f"Stored message in table '{table_name}'")
except psycopg2.InterfaceError:
self.logger.warning("Database connection lost, attempting to reconnect...")
self.connect_to_database()
except psycopg2.DatabaseError as e:
self.logger.error(f"Database error: {e}")
if cursor:
cursor.execute("ROLLBACK")
except Exception as e:
self.logger.error(f"Unexpected error: {e}")
if cursor:
cursor.execute("ROLLBACK")
finally:
if cursor:
cursor.close()
def cleanup(self):
"""清理资源"""
self.logger.info("Cleaning up resources...")
try:
if self.storage_client.is_connected():
self.storage_client.disconnect()
except Exception as e:
self.logger.error(f"Error disconnecting MQTT client: {e}")
try:
if self.postgres_conn and not self.postgres_conn.closed:
self.postgres_conn.close()
except Exception as e:
self.logger.error(f"Error closing database connection: {e}")
if __name__ == "__main__":
# 配置参数
MAIN_BROKER = "mosquitto_main"
STORAGE_BROKER = "mosquitto_storage"
DB_NAME = "mqtt_data"
DB_USER = "postgres"
DB_PASSWORD = "mosquitto"
DB_HOST = "postgresql"
DB_PORT = "5432"
BLANK_TOPIC_NAME = "blank_topic"
# 创建中间件实例并启动
middleware = Middleware(
main_broker=MAIN_BROKER,
storage_broker=STORAGE_BROKER,
db_name=DB_NAME,
db_user=DB_USER,
db_password=DB_PASSWORD,
db_host=DB_HOST,
db_port=DB_PORT,
blank_topic_name=BLANK_TOPIC_NAME
)
try:
middleware.start()
except KeyboardInterrupt:
middleware.logger.info("Shutting down by user request")
middleware.cleanup()
except Exception as e:
middleware.logger.error(f"Unexpected shutdown: {e}")
middleware.cleanup()
```
### Docker Compose
```yaml
services:
mosquitto_main:
image: eclipse-mosquitto
container_name: mosquitto_main
ports:
- 'custom_port:1883'
- 'custom_port:9001'
volumes:
- /custom_path/mosquitto/main_broker/config:/mosquitto/config
- /custom_path/mosquitto/main_broker/data:/mosquitto/data
- /custom_path/mosquitto/main_broker/log:/mosquitto/log
environment:
- TZ=Asia/Shanghai
restart: always
mosquitto_storage:
image: eclipse-mosquitto
container_name: mosquitto_storage
ports:
- 'custom_port:1883'
- 'custom_port:9001'
volumes:
- /custom_path/mosquitto/storage_broker/config:/mosquitto/config
- /custom_path/mosquitto/storage_broker/data:/mosquitto/data
- /custom_path/mosquitto/storage_broker/log:/mosquitto/log
environment:
- TZ=Asia/Shanghai
restart: always
postgresql:
image: postgres:latest
container_name: mosquitto_postgresql
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: mosquitto
PGDATA: /var/lib/postgresql/data/pgdata
TZ: Asia/Shanghai
ports:
- 'custom_port:5432'
volumes:
- /custom_path/mosquitto/postgresql/init.sh:/docker-entrypoint-initdb.d/init-db.sh
- postgresql_data:/var/lib/postgresql/data
restart: always
middleware:
image: python:3.10-slim
container_name: mqtt_middleware
volumes:
- /custom_path/mosquitto/middleware/requirements.txt:/requirements.txt
- /custom_path/mosquitto/middleware/main.py:/middleware.py
command: >
sh -c "pip install -i https://pypi.mirrors.ustc.edu.cn/simple --upgrade pip &&
pip install -i https://pypi.mirrors.ustc.edu.cn/simple -r /requirements.txt &&
python /middleware.py"
environment:
- TZ=Asia/Shanghai
depends_on:
- mosquitto_storage
- postgresql
restart: always
volumes:
postgresql_data:
driver: local
driver_opts:
type: none
o: bind
device: /data/mosquitto_postgresql_data
```
直接`docker compose up -d`拉起即可