Asked 11 months ago by CometCaptain647
How Can I Resolve MQTT Socket Errors and Optimize My Django API Design?
The post content has been automatically edited by the Moderator Agent for consistency and clarity.
Hi everyone,
I'm developing a Django REST API that integrates with an MQTT broker to manage home device settings. The API connects to the broker on demand to fetch device settings in real time, without a database or cache in between. However, I occasionally get socket errors when connecting to the broker. I'm also evaluating whether this real-time approach is sound, or whether storing MQTT payloads in a database such as InfluxDB would be a better solution.
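To make the stored-payload alternative concrete: instead of connecting per request, a long-lived subscriber could keep the most recent payload per topic, and the API would read from that. The following is only a minimal in-memory sketch under that assumption. `LatestPayloadStore`, its method names, and the topic string are hypothetical, and nothing here uses InfluxDB or the MQTT client library; a subscriber's message callback would be the caller of `update`.

```python
import threading
import time


class LatestPayloadStore:
    """Hypothetical in-memory store: keeps the newest payload per topic."""

    def __init__(self, max_age_seconds=60.0, clock=time.monotonic):
        self._lock = threading.Lock()
        self._entries = {}  # topic -> (payload, time stored)
        self._max_age = max_age_seconds
        self._clock = clock  # injectable so staleness can be tested

    def update(self, topic, payload):
        # Called by the subscriber whenever a message arrives.
        with self._lock:
            self._entries[topic] = (payload, self._clock())

    def get(self, topic):
        # Called by the API view; returns None if missing or too old.
        with self._lock:
            entry = self._entries.get(topic)
        if entry is None:
            return None
        payload, stored_at = entry
        if self._clock() - stored_at > self._max_age:
            return None
        return payload


store = LatestPayloadStore(max_age_seconds=60.0)
store.update("path/demo/settings", {"mode": "eco"})
print(store.get("path/demo/settings"))   # {'mode': 'eco'}
print(store.get("path/other/settings"))  # None
```

The trade-off in this design is freshness: a read is fast and never touches the broker, but it can only be as recent as the last message the subscriber received, which is why the sketch rejects entries older than `max_age_seconds`.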
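Here's a brief rundown of the workflow, as implemented in the code below: an authenticated client POSTs a `product_id`. The view checks that the product belongs to the requesting user and loads that user's certificate and key. It then opens a TLS connection to the broker, subscribes to `path/<product name>/settings` with QoS 1, and waits up to 10 seconds for a message. The payload is returned with a 200, or a 504 if nothing arrives in time.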
The following is the API endpoint code:
```python
class ProductSettingsView(GenericAPIView):
    permission_classes = [IsAuthenticated]
    serializer_class = ProductSerializer

    def post(self, request, *args, **kwargs):
        serializer = self.get_serializer(data=request.data)
        if serializer.is_valid(raise_exception=True):
            product_id = serializer.validated_data['product_id']
            product = ALL_POMP_AO_PRODUCTS.get(owner_id=self.request.user.id, id=product_id)
            user = self.request.user
            if not user:
                raise exceptions.BadRequest(
                    {'detail': _("User doesn't exist"), "code": "user_do_not_exist"}
                )
            if not product:
                raise exceptions.BadRequest(
                    {"detail": _("User/Product relation violation"), "code": "invalid_relation"}
                )
            try:
                certificate = Certificate.objects.get(client_id=user.id)
            except Certificate.DoesNotExist:
                raise exceptions.BadRequest(
                    {"detail": _("Certificate doesn't exist"), "code": "certificate_do_not_exist"}
                )
            try:
                with MqttClient(user=str(user.uuid), certificate=certificate.certificate,
                                key=certificate.key, product=product) as mqtt_client:
                    message = mqtt_client.get_message()
                    if message:
                        return Response(message, status=status.HTTP_200_OK)
                    else:
                        return Response(
                            {"error": "Timeout or no message received"},
                            status=status.HTTP_504_GATEWAY_TIMEOUT
                        )
            except exceptions.BadRequest as e:
                return Response(e.detail, status=status.HTTP_400_BAD_REQUEST)
```
And here is the MQTT client code:
```python
class MqttProduct:
    def __init__(self, user, certificate, key, product):
        self.user_name = user
        self.product = product
        self.certfile = certificate
        self.keyfile = key
        self.ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        self.ssl_context.load_cert_chain(certfile=self.certfile, keyfile=self.keyfile)
        self.ssl_context.load_verify_locations(cafile=ENV.path("MQTT_SERVER_CA"))
        self.mqtt_client = mqtt.Client(client_id=self.user_name, clean_session=True)
        self.mqtt_client.tls_set_context(self.ssl_context)
        self.mqtt_client.on_connect = self.on_connect
        self.mqtt_client.on_message = self.on_message
        self.lock = threading.Condition()

    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            print("Connected to broker")
            self.mqtt_client.subscribe(f"path/{self.product.name}/settings", qos=1)
        else:
            print("Failed to connect, return code %d\n", rc)

    def on_message(self, client, userdata, msg):
        with self.lock:
            # Process my message here
            self.lock.notify()

    def get_message(self):
        try:
            self.connection_ready = threading.Event()
            self.mqtt_client.connect(self.BROKER, self.BROKERPORT, 60)
            self.mqtt_client.loop_start()
            if not self.connection_ready.wait(timeout=10):  # Wait for connection
                print("Connection timeout")
                return None
            with self.lock:
                if not self.lock.wait(timeout=10):
                    print("Timeout waiting for message")
                    return None
            with self.lock:
                return list(self.messages.values())
        except (Exception, ssl.SSLError) as e:
            logging.error(f"MQTT connection failed: {e}")
            raise exceptions.BadRequest(
                {
                    "detail": _("MQTT Connection Error"),
                    "code": "mqtt_connection_error"
                }
            )
        finally:
            self.stop()
```
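The connect-then-wait handshake that `get_message` relies on can be looked at in isolation, without a broker. The sketch below is not the client above and is not offered as the cause of the socket errors. It is a minimal illustration, assuming a hypothetical `MessageWaiter` class with simplified callback signatures, of what the pattern appears to need: the connect callback sets the `Event`, the message callback stores the payload before notifying, and the waiter checks a predicate so a message that arrives early is not missed.

```python
import threading


class MessageWaiter:
    """Hypothetical helper: waits for a connect signal, then for one message."""

    def __init__(self):
        self.connection_ready = threading.Event()
        self.condition = threading.Condition()
        self.messages = {}  # topic -> payload

    def on_connect(self, rc):
        # Stand-in for the MQTT connect callback; rc == 0 means success.
        if rc == 0:
            self.connection_ready.set()

    def on_message(self, topic, payload):
        # Stand-in for the MQTT message callback: store first, then wake waiters.
        with self.condition:
            self.messages[topic] = payload
            self.condition.notify_all()

    def get_message(self, connect_timeout=10.0, message_timeout=10.0):
        if not self.connection_ready.wait(timeout=connect_timeout):
            return None  # never connected
        with self.condition:
            # wait_for checks the predicate before blocking, so a message
            # stored before this point is still returned.
            arrived = self.condition.wait_for(
                lambda: bool(self.messages), timeout=message_timeout
            )
            if not arrived:
                return None  # connected, but no message in time
            return list(self.messages.values())


def simulate_broker(waiter):
    # Plays the role of the network thread: connect, then deliver one message.
    waiter.on_connect(0)
    waiter.on_message("path/demo/settings", {"mode": "eco"})


waiter = MessageWaiter()
threading.Thread(target=simulate_broker, args=(waiter,)).start()
result = waiter.get_message(connect_timeout=2.0, message_timeout=2.0)
print(result)  # [{'mode': 'eco'}]
```

Using `Condition.wait_for` with a predicate, rather than a bare `wait`, is the design choice that matters here: a bare `wait` only returns on a notification that happens while it is blocked, so a payload delivered a moment earlier would look like a timeout.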
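I need help with two questions: what could be causing the intermittent socket errors when connecting to the broker, and how can I resolve them? And is this on-demand, real-time design reasonable, or would storing the MQTT payloads in a database such as InfluxDB be the better approach?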
I appreciate any advice or suggestions. Thanks for your help!