2026-04-02 23:12:36 +08:00

11 KiB
Raw Blame History

title, createTime, tags, categories
title createTime tags categories
使用Docker Compose构建MQTT消息记录服务 2025/06/19 23:30:29
Docker
随记

思路

使用两个 Broker:将主 Broker 的消息单向桥接至存储 Broker,再挂载一个中间件,订阅存储 Broker 收到的所有消息并存储在数据库中。

实现

主Broker

mosquitto.conf

# Main broker: the broker that regular clients connect to.

# Keep broker state on disk so it survives a restart.
persistence true
persistence_location /mosquitto/data
log_dest file /mosquitto/log/mosquitto.log

# Listeners. The old `port` option is deprecated since Mosquitto 2.0;
# `listener <port>` is the equivalent replacement.
# NOTE(review): 9001 is conventionally used for WebSockets. As written it is a
# plain MQTT listener; add `protocol websockets` directly below it if
# WebSocket clients are expected.
listener 9001
listener 1883
allow_anonymous true

# One-way bridge to the storage broker: every topic (#) is forwarded
# outwards only ("out") with QoS 0, so nothing flows back into this broker.
connection mosquitto_storage
address mosquitto_storage:1883
topic # out 0
start_type automatic

存储Broker

mosquitto.conf

# Storage broker: receives the bridged messages that the middleware records.

# Keep broker state on disk so it survives a restart.
persistence true
persistence_location /mosquitto/data
log_dest file /mosquitto/log/mosquitto.log

# Listeners. The old `port` option is deprecated since Mosquitto 2.0;
# `listener <port>` is the equivalent replacement.
# NOTE(review): 9001 is conventionally used for WebSockets. As written it is a
# plain MQTT listener; add `protocol websockets` directly below it if
# WebSocket clients are expected.
listener 9001
listener 1883
allow_anonymous true

数据库

init.sh

#!/bin/bash
# Creates the database the middleware writes to. The compose file mounts this
# script into /docker-entrypoint-initdb.d of the postgres container.
# Abort on the first failing command so a broken init is not silently ignored.
set -e

# Use the superuser and default database configured for the container instead
# of a hard-coded name, so the script keeps working if POSTGRES_USER changes.
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
 CREATE DATABASE mqtt_data;
EOSQL

中间件

main.py

import paho.mqtt.client as mqtt
import psycopg2
from psycopg2 import sql
import logging
import time

class Middleware:
    """Subscribe to the storage MQTT broker and persist every message in PostgreSQL.

    Each message is written to a table named ``<first topic level>_<YYMM>``;
    tables are created on demand the first time a message needs them.
    """

    def __init__(self, main_broker, storage_broker, db_name, db_user, db_password, db_host, db_port, blank_topic_name, storage_port=1883):
        """Store the settings, connect to PostgreSQL and prepare the MQTT client.

        Args:
            main_broker: Host name of the main broker. Only stored; this class
                connects to the storage broker exclusively.
            storage_broker: Host name of the broker to subscribe to.
            db_name: PostgreSQL database name.
            db_user: PostgreSQL user.
            db_password: PostgreSQL password.
            db_host: PostgreSQL host.
            db_port: PostgreSQL port.
            blank_topic_name: Table-name prefix used when the first topic
                level is empty (e.g. for the topic ``/a/b``).
            storage_port: TCP port of the storage broker.

        Raises:
            psycopg2.OperationalError: If the database cannot be reached after
                all connection attempts.
        """
        self.main_broker = main_broker
        self.storage_broker = storage_broker
        self.storage_port = storage_port
        self.db_name = db_name
        self.db_user = db_user
        self.db_password = db_password
        self.db_host = db_host
        self.db_port = db_port
        self.blank_topic_name = blank_topic_name

        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

        # Open the database connection
        self.postgres_conn = None
        self.connect_to_database()

        # Set up the MQTT client (paho-mqtt callback API version 2)
        self.storage_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
        self.storage_client.on_connect = self.on_connect
        self.storage_client.on_message = self.on_message
        self.storage_client.on_disconnect = self.on_disconnect

    def connect_to_database(self):
        """Connect to PostgreSQL, retrying with exponential back-off.

        Up to five attempts are made; the wait between attempts starts at
        five seconds and doubles each time.

        Raises:
            psycopg2.OperationalError: If the last attempt also fails.
        """
        max_retries = 5
        retry_delay = 5  # seconds

        for attempt in range(max_retries):
            try:
                self.postgres_conn = psycopg2.connect(
                    dbname=self.db_name,
                    user=self.db_user,
                    password=self.db_password,
                    host=self.db_host,
                    port=self.db_port
                )
                self.logger.info("Successfully connected to PostgreSQL database")
                return
            except psycopg2.OperationalError as e:
                self.logger.warning(f"Database connection failed (attempt {attempt+1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
                    retry_delay *= 2  # exponential back-off
                else:
                    self.logger.error("Failed to connect to database after multiple attempts")
                    raise

    def start(self):
        """Connect to the storage broker and process messages until stopped.

        Blocks in the MQTT network loop. Errors are logged rather than
        raised, and ``cleanup()`` always runs on the way out.
        """
        try:
            self.logger.info(f"Connecting to MQTT broker: {self.storage_broker}")
            self.storage_client.connect(self.storage_broker, self.storage_port, 60)
            self.storage_client.loop_forever()
        except Exception as e:
            self.logger.error(f"Error in MQTT loop: {e}")
        finally:
            self.cleanup()

    def on_connect(self, client, userdata, flags, reason_code, properties):
        """Subscribe to every topic once the broker accepts the connection."""
        if reason_code.is_failure:
            self.logger.error(f"Failed to connect to MQTT broker: {reason_code}")
            return

        self.logger.info(f"Connected to MQTT broker with code {reason_code}")
        client.subscribe("#")
        self.logger.info("Subscribed to all topics (#)")

    def on_disconnect(self, client, userdata, disconnect_flags, reason_code=None, properties=None):
        """Log unexpected disconnections.

        The client is created with ``CallbackAPIVersion.VERSION2``, which
        invokes this callback with five arguments. The legacy three-argument
        form ``(client, userdata, rc)`` is still accepted.

        No manual reconnect is attempted here: ``loop_forever()`` already
        reconnects on its own, and sleeping inside a callback would stall the
        network loop.
        """
        if reason_code is None:
            # Legacy call shape: the third positional value is the result code.
            reason_code = disconnect_flags
        if reason_code != 0:
            self.logger.warning(f"Unexpected MQTT disconnection: {reason_code}")

    def on_message(self, client, userdata, msg):
        """Decode an incoming message and hand it to ``store_message``.

        Payloads that are not valid UTF-8 or that are blank are logged and
        skipped. Any other error is logged so the MQTT loop keeps running.
        """
        try:
            payload = msg.payload.decode('utf-8')
            self.logger.info(f"Received message from topic: {msg.topic}")

            table_name = self._table_name_for(msg.topic)

            # Nothing worth storing in an empty or whitespace-only payload.
            if not payload.strip():
                self.logger.warning(f"Empty payload received on topic: {msg.topic}")
                return

            self.store_message(table_name, msg.topic, payload)
        except UnicodeDecodeError:
            self.logger.error(f"Failed to decode message payload from topic: {msg.topic}")
        except Exception as e:
            self.logger.error(f"Error processing message: {e}")

    def _table_name_for(self, topic):
        """Return the monthly table name derived from the first level of *topic*."""
        # str.split never returns an empty list, so an empty first level
        # (topic starting with "/") has to be detected explicitly.
        first_level = topic.split("/")[0] or self.blank_topic_name
        # Keep the prefix short enough to leave room for the month suffix.
        base = first_level.replace("/", "_").replace(" ", "_").replace(".", "_")[:50]
        # One table per month: suffix is two-digit year followed by two-digit month.
        now = time.localtime()
        return f"{base}_{now.tm_year % 100:02d}{now.tm_mon:02d}"

    def store_message(self, table_name, topic, payload):
        """Store one message, reconnecting and retrying once if the connection is gone.

        Args:
            table_name: Target table; created if it does not exist.
            topic: Full MQTT topic of the message.
            payload: Decoded message text. If it parses as a float, the number
                is also stored in the ``value`` column, otherwise ``value`` is NULL.

        Raises:
            psycopg2.OperationalError: If a reconnect is needed and fails.
        """
        # At most two passes: the second only happens after a successful reconnect.
        for is_retry in (False, True):
            try:
                self._insert_message(table_name, topic, payload)
                return
            except psycopg2.Error as e:
                conn = self.postgres_conn
                if conn is not None and not conn.closed:
                    # Connection is still usable: the failure concerns this
                    # statement only, so undo the transaction and give up.
                    self.logger.error(f"Database error: {e}")
                    self._rollback_quietly()
                    return
                if is_retry:
                    self.logger.error(f"Database error: {e}")
                    return
                self.logger.warning("Database connection lost, attempting to reconnect...")
                self.connect_to_database()
            except Exception as e:
                self.logger.error(f"Unexpected error: {e}")
                self._rollback_quietly()
                return

    def _insert_message(self, table_name, topic, payload):
        """Create the table if needed, insert the row and commit."""
        # sql.Identifier quotes the table name, so topic text cannot inject SQL.
        create_table_query = sql.SQL("""
                CREATE TABLE IF NOT EXISTS {} (
                    id SERIAL PRIMARY KEY,
                    topic TEXT NOT NULL,
                    payload TEXT NOT NULL,
                    value NUMERIC,
                    timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW()
                )
            """).format(sql.Identifier(table_name))
        insert_query = sql.SQL("""
                INSERT INTO {} (topic, payload, value)
                VALUES (%s, %s, %s)
            """).format(sql.Identifier(table_name))

        # Try to read the payload as a number; leave the column NULL otherwise.
        try:
            value_num = float(payload)
        except (ValueError, TypeError):
            value_num = None
        else:
            self.logger.info(f"Payload converted to numeric value: {value_num}")

        with self.postgres_conn.cursor() as cursor:
            cursor.execute(create_table_query)
            cursor.execute(insert_query, (topic, payload, value_num))
        self.postgres_conn.commit()
        self.logger.info(f"Stored message in table '{table_name}'")

    def _rollback_quietly(self):
        """Roll back the open transaction, logging instead of raising on failure."""
        if self.postgres_conn is None:
            return
        try:
            self.postgres_conn.rollback()
        except psycopg2.Error as e:
            self.logger.error(f"Rollback failed: {e}")

    def cleanup(self):
        """Disconnect from the broker and close the database connection.

        Safe to call more than once; failures are logged, not raised.
        """
        self.logger.info("Cleaning up resources...")
        try:
            if self.storage_client.is_connected():
                self.storage_client.disconnect()
        except Exception as e:
            self.logger.error(f"Error disconnecting MQTT client: {e}")

        try:
            if self.postgres_conn and not self.postgres_conn.closed:
                self.postgres_conn.close()
        except Exception as e:
            self.logger.error(f"Error closing database connection: {e}")

if __name__ == "__main__":
    import os

    # Connection settings. Each value can be overridden with an environment
    # variable of the same name; the defaults match the compose setup.
    MAIN_BROKER = os.environ.get("MAIN_BROKER", "mosquitto_main")
    STORAGE_BROKER = os.environ.get("STORAGE_BROKER", "mosquitto_storage")
    DB_NAME = os.environ.get("DB_NAME", "mqtt_data")
    DB_USER = os.environ.get("DB_USER", "postgres")
    DB_PASSWORD = os.environ.get("DB_PASSWORD", "mosquitto")
    DB_HOST = os.environ.get("DB_HOST", "postgresql")
    DB_PORT = os.environ.get("DB_PORT", "5432")
    BLANK_TOPIC_NAME = os.environ.get("BLANK_TOPIC_NAME", "blank_topic")

    # Build the middleware and run it
    middleware = Middleware(
        main_broker=MAIN_BROKER,
        storage_broker=STORAGE_BROKER,
        db_name=DB_NAME,
        db_user=DB_USER,
        db_password=DB_PASSWORD,
        db_host=DB_HOST,
        db_port=DB_PORT,
        blank_topic_name=BLANK_TOPIC_NAME
    )

    # start() releases its resources in its own `finally`, so the handlers
    # below only need to report why the process is stopping.
    try:
        middleware.start()
    except KeyboardInterrupt:
        middleware.logger.info("Shutting down by user request")
    except Exception as e:
        middleware.logger.error(f"Unexpected shutdown: {e}")

Docker Compose

services:
  # Broker that regular clients connect to; bridges everything to mosquitto_storage.
  mosquitto_main:
    # Pinned to a major version so an image update cannot silently change
    # configuration semantics.
    image: eclipse-mosquitto:2
    container_name: mosquitto_main
    ports:
      # Replace custom_port with the host ports you want to expose.
      - 'custom_port:1883'
      - 'custom_port:9001'
    volumes:
      - /custom_path/mosquitto/main_broker/config:/mosquitto/config
      - /custom_path/mosquitto/main_broker/data:/mosquitto/data
      - /custom_path/mosquitto/main_broker/log:/mosquitto/log
    environment:
      - TZ=Asia/Shanghai
    restart: always

  # Broker that receives the bridged messages; the middleware subscribes here.
  mosquitto_storage:
    image: eclipse-mosquitto:2
    container_name: mosquitto_storage
    ports:
      # Replace custom_port with host ports that differ from the main broker's.
      - 'custom_port:1883'
      - 'custom_port:9001'
    volumes:
      - /custom_path/mosquitto/storage_broker/config:/mosquitto/config
      - /custom_path/mosquitto/storage_broker/data:/mosquitto/data
      - /custom_path/mosquitto/storage_broker/log:/mosquitto/log
    environment:
      - TZ=Asia/Shanghai
    restart: always

  postgresql:
    # Pinned to a major version: a PostgreSQL data directory cannot be opened
    # by a different major release, so a floating `latest` tag can break an
    # existing deployment. Use the major version your data was initialised with.
    image: postgres:17
    container_name: mosquitto_postgresql
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: mosquitto
      PGDATA: /var/lib/postgresql/data/pgdata
      TZ: Asia/Shanghai
    ports:
      - 'custom_port:5432'
    volumes:
      # Init script that creates the mqtt_data database.
      - /custom_path/mosquitto/postgresql/init.sh:/docker-entrypoint-initdb.d/init-db.sh
      - postgresql_data:/var/lib/postgresql/data
    restart: always

  middleware:
    image: python:3.10-slim
    container_name: mqtt_middleware
    volumes:
      - /custom_path/mosquitto/middleware/requirements.txt:/requirements.txt
      - /custom_path/mosquitto/middleware/main.py:/middleware.py
    # Dependencies are installed on every container start, then the script runs.
    command: >
      sh -c "pip install -i https://pypi.mirrors.ustc.edu.cn/simple --upgrade pip &&
             pip install -i https://pypi.mirrors.ustc.edu.cn/simple -r /requirements.txt &&
             python /middleware.py"
    environment:
      - TZ=Asia/Shanghai
    # Only controls start order; it does not wait for the services to be ready.
    depends_on:
      - mosquitto_storage
      - postgresql
    restart: always

volumes:
  # Named volume backed by a bind mount of a host directory.
  postgresql_data:
    driver: local
    driver_opts:
      type: none
      o: bind
      device: /data/mosquitto_postgresql_data

直接执行 docker compose up -d 拉起即可。