49 lines
1.2 KiB
Python
49 lines
1.2 KiB
Python
|
import paho.mqtt.client as mqtt
|
||
|
from influxdb import InfluxDBClient
|
||
|
import datetime
|
||
|
import logging
|
||
|
import json
|
||
|
import os
|
||
|
|
||
|
|
||
|
def on_connect(client, userdata, flags, rc):
|
||
|
for topic in topics:
|
||
|
client.subscribe(topic)
|
||
|
|
||
|
|
||
|
def on_message(client, userdata, msg):
|
||
|
current_time = datetime.datetime.utcnow().isoformat()
|
||
|
data = json.loads(msg.payload)
|
||
|
print(data)
|
||
|
json_body = [
|
||
|
{
|
||
|
"measurement": msg.topic,
|
||
|
"tags": {},
|
||
|
"time": current_time,
|
||
|
"fields": data
|
||
|
}
|
||
|
]
|
||
|
logging.info(json_body)
|
||
|
influx_client.write_points(json_body)
|
||
|
|
||
|
if 'TOPICS' not in os.environ:
|
||
|
print("At least one topic is needed.")
|
||
|
exit(1)
|
||
|
topics = os.environ['TOPICS'].split(' ')
|
||
|
print(topics)
|
||
|
|
||
|
logging.basicConfig(level=logging.INFO)
|
||
|
influx_client = InfluxDBClient(os.environ['INFLUX_HOST'], int(os.environ['INFLUX_PORT']), database=os.environ["INFLUX_DB"])
|
||
|
influx_client.create_database(os.environ["INFLUX_DB"])
|
||
|
|
||
|
|
||
|
client = mqtt.Client()
|
||
|
|
||
|
client.on_connect = on_connect
|
||
|
client.on_message = on_message
|
||
|
|
||
|
client.connect(os.environ['MQTT_HOST'], int(os.environ["MQTT_PORT"]), 60)
|
||
|
|
||
|
print("Starting")
|
||
|
|
||
|
client.loop_forever()
|