Skip to Content
This is the Beta version of our new Learning Paths approach. Please email feedback.

MQTT Connect, Publish and Subscribe

mqtt_connect_pub_sub.ino
/* Project: Smart Home Component: Arduino Ek R4 MQTT over Built-in WiFi Sketch Description: This sketch connects to an MQTT broker, publishes data to topics and subscribes to (receives data from) topics. The sketch also has the option to work with data in JSON format. Author: STEMVentor Educonsulting This code is copyrighted. Please do not reuse or share for any purpose other than for learning with subscribed STEMVentor programs. This code is for educational purposes only and is not for production use. */ // There will be two topics, one for sensor data and one for control signals (similar to services in BLE). // If multiple boards are used, each board will have a different topic on the MQTT broker. // Topics should ideally be named with the following convention: board_name.sensor.data and board_name.control.signals // Data will be transmitted via MQTT as serialized JSON (a string with all keys and values in ""). // The structure will mimic the BLE approach (characteristic uuid and value) // JSON Object -> Serialized JSON -> Deserialized JSON -> JSON Object // We will use the ArduinoJSON library for this. // We will use the BLE UUIDs as identifiers when sending control signals and sensor data using MQTT. // The app will publish a control signal message with the following JSON structure (stringified): // { // uuid: [String], // value: [integer], // } // // The board will publish a sensor data message with the following JSON structure (stringified): // { // uuid: [String], // value: [integer], // } // // LIBRARIES /* There are two popular libraries for Arduino MQTT: PubSubClient by Nick O'Leary and ArduinoMqttClient by Arduino. The PubSubClient is easier to get started with for basic MQTT tasks, while ArduinoMqttClient provides more features and control for advanced applications and specific hardware environments. We will use PubSubClient for now. */ // Install the library "PubSubClient" by Nick O'Leary from the IDE library manager. // There is a newer version "PubSubClient3" forked from the original but maintained by another developer. Watch that. #include <PubSubClient.h> // Install the library "ArduinoJson" by Benoit Blanchon from the IDE library manager. #include <ArduinoJson.h> // The WiFi library is loaded along with the board package. #include <WiFiS3.h> // PIN DEFINTIONS // GLOBAL CONSTANTS // GLOBAL VARIABLES // Allocate memory for the JSON document // The 256 inside the brackets <> or () is the capacity of the allocated memory pool in bytes. // Change this value as required by the expected JSON document. // Use arduinojson.org/v6/assistant to compute the capacity. // StaticJsonDocument<N> allocates memory on the stack. StaticJsonDocument<256> json_doc_control_signal; StaticJsonDocument<256> json_doc_sensor_data; // This will receive a JSON string from an MQTT topic and deserialize that to a JSON object. void mqttCallback(char* topic, byte* payload, unsigned int length) { if(OPERATING_MODE == 'D'){ Serial.print("Message arrived ["); Serial.print(topic); Serial.print("] "); for (int i=0;i<length;i++) { Serial.print((char)payload[i]); } } // Convert the JSON string to a JSON object. DeserializationError error = deserializeJson(json_doc_control_signal, payload); if(error){ Serial.print(F("deserializeJson() failed: ")); Serial.println(error.c_str()); return; }else{ // Control signals will be in the same format for MQTT and BLE // And a common function will process them. // Extract the values String characteristic_uuid = json_doc_control_signal["characteristic_uuid"]; const uint8_t value = json_doc_control_signal["value"]; if(OPERATING_MODE == 'D'){ Serial.println(characteristic_uuid); Serial.println(value); } processControlSignal(characteristic_uuid, value); // Extract the values // const char* pin = json_doc_control_signal["pin"]; // const uint8_t value = json_doc_control_signal["value"]; // const char* control_type = json_doc_control_signal["control_type"]; // // Print the values // Serial.println(pin); // Serial.println(value); // Serial.println(control_type); // // Send the control signal value to the pin. // if(control_type == "Digital"){ // digitalWrite((int)pin, value); // }else if(control_type == "PWM"){ // analogWrite((int)pin, value); // }else if(control_type == "Servo"){ // moveServoByDegrees((int)pin, value); // } } } // INITIALIZE OBJECTS /* Libraries usually follow an object-oriented approach that requires * an instance of the class to call its methods. */ WiFiClient wifi_client; //The client object for WiFi communication. // PubSubClient pubsub_client(wifi_client); // The client object for communication with the MQTT broker. PubSubClient pubsub_client(MQTT_SERVER, MQTT_PORT, mqttCallback, wifi_client); // LOCAL FUNCTIONS void connectToMQTTServer(){ //Connect to the MQTT broker // pubsub_client.setServer(MQTT_SERVER, MQTT_PORT); // pubsub_client.setCallback(callback); // Loop until we're reconnected while (!pubsub_client.connected()) { Serial.print("Attempting MQTT connection..."); // Attempt to connect. // Use a unique client id since a second attempt with the same id // will disconnect the previous connection with the same id. // This can happen if you try the same code with two boards at the same time. // Also, interesingly, using a public code sample and a public broker leads to this // issue since there is a high chance someone else is using the same code and broker. if (pubsub_client.connect("stemventor")) { Serial.println("connected"); // Once connected, publish an announcement... json_doc_sensor_data["uuid"] = "Test Message"; json_doc_sensor_data["value"] = "Connected and Publishing"; // Convert JSON object to JSON string. char mqtt_message_outgoing[256]; serializeJson(json_doc_sensor_data, mqtt_message_outgoing); // pubsub_client.publish("stemventor_pubtop","hello world"); pubsub_client.publish(SENSOR_DATA_TOPIC, mqtt_message_outgoing); // ... and subscribe pubsub_client.subscribe(CONTROL_SIGNALS_TOPIC); } else { if(OPERATING_MODE == 'D'){ Serial.print("failed, rc="); Serial.print(pubsub_client.state()); Serial.println(" try again in 5 seconds"); } // Wait 5 seconds before retrying delay(5000); } } } void publishToMQTTServer(String uuid, uint8_t value){ // Assuming there is a connection to the MQTT broker. json_doc_sensor_data["uuid"] = uuid; json_doc_sensor_data["value"] = value; // Convert JSON object to JSON string. char mqtt_message_sensor_data[256]; serializeJson(json_doc_sensor_data, mqtt_message_sensor_data); // pubsub_client.publish("stemventor_pubtop","hello world"); pubsub_client.publish(SENSOR_DATA_TOPIC, mqtt_message_sensor_data); } void keepaliveMQTTConnection(){ if (!pubsub_client.connected()) { if(OPERATING_MODE == 'D'){ Serial.println("Not connected...reconnecting..."); } connectToMQTTServer(); } pubsub_client.loop(); }