MQTT
For an API reference, see API Reference/MQTT
The MQTT module is an interface to the MQTT stack located on the onboard modem on the AVR-IoT Cellular Mini. MQTT is a widely used communication protocol in IoT and is classified as a PubSub (publish/subscribe) protocol: all messages are sent on a topic, and every device can subscribe to different topics and receives all messages published on those topics. MQTT is a lightweight implementation of this messaging scheme. Services such as Amazon Web Services (AWS) use MQTT to communicate with IoT nodes. MqttClient is a singleton class (also called the "Singleton Pattern"), meaning there can only be one instance of it. This allows direct calls, for instance MqttClient.begin().
MQTT can be used through two mechanisms, polling and interrupt. It can be configured to use TLS and the onboard secure element (ECC608B).
Example
This example uses polling without TLS.
// Initialize the modem and connect to the network
Lte.begin();
// Connect to "mqttbroker.mydomain.com:1883" without TLS, without the ECC, and
// with optional authentication by username and password
MqttClient.begin("myThingName",
"mqttbroker.mydomain.com",
1883,
false,
60,
false,
"<optional username>",
"<optional password>");
// Subscribe to a topic
MqttClient.subscribe("aSubTopic");
while (true) {
    // Read the next message on the subscribed topic
    String message = MqttClient.readMessage("aSubTopic");
    if (message != "") { // If no message, readMessage returns an empty string
        Log.info(message);
        // Publish the message back to another topic
        MqttClient.publish("anotherTopic", message.c_str());
    }
}
Initialization & Dependencies
Remember to connect the antenna to the board before running any examples that use a wireless feature or web service.
Initialize by calling MqttClient.begin(). It takes the following arguments:
- Device Name: A unique identifier for the device
- Broker URL: The MQTT broker
- Broker Port: The MQTT broker port
- Use TLS: If true, connect to the broker using TLS
- Keep Alive (optional, default: 1200): How often the broker is pinged
- Use ECC (optional, default: true): If true, use the ECC to sign messages
- Username (optional, default: no username): If set, use a username for authentication
- Password (optional, default: no password): If set, use a password for authentication
- Timeout (optional, default: 30000): Timeout in milliseconds for connecting to the broker
- Print Messages (optional, default: true): If true, prints "Connecting to broker..."
Calling MqttClient.begin is blocking. It attempts to connect to the broker within the given timeout and returns only once it succeeds, an error occurs, or it times out.
// Connect to "mqttbroker.mydomain.com:1883" without TLS, without the ECC, and
// with optional authentication by username and password
MqttClient.begin("myThingName",
"mqttbroker.mydomain.com",
1883,
false,
1200,
false,
"<optional username>",
"<optional password>",
<optional timeout>);
AWS
For AWS applications, the module can be initialized with MqttClient.beginAWS() instead of MqttClient.begin(). The call only takes an optional keep alive argument and assumes the device has been provisioned for AWS. Except for the beginAWS call, the usage is the same as for any other MQTT broker. See the Provisioning User Guide for more information.
// Connect to AWS
MqttClient.beginAWS(<optional keep alive, defaults to 1200>);
AWS implements a security mechanism that controls which devices are allowed to perform which actions on specified resources. These rules are implemented as policies and must be configured for AWS to work correctly. The IoT Provisioning Tool creates a default policy named zt_policy when either the -m mar or -m jitr flag is provided.
The default policy only allows publishing and subscribing to topics of the form thing_name/topic, where thing_name is the name of the board and topic is any arbitrary topic. The thing_name can be found:
- As the name of the device in AWS
- In the log output of the provisioning tool
- Using the ECC608.readProvisionItem() call. See the sandbox example
To use a topic name that resides outside the thing_name namespace, a new policy must be created and assigned to the certificate of the board. See AWS IoT Core policies.
Azure
As with AWS, a convenience function for Azure can be used. The board has to be provisioned for Azure IoT Hub. See the Provisioning User Guide for more information.
// Connect to Azure
MqttClient.beginAzure(<optional keep alive, defaults to 1200>);
Using Interrupts
If unfamiliar with interrupts, see the AVR datasheet and/or the Arduino Interrupt Page.
The modem sends notifications to the AVR on certain events and state changes. These notifications can be picked up and handled through a user-defined interrupt handler. Callback functions can be registered to these notifications using the onSomething calls. These are:
- Cellular disconnect: Lte.onDisconnect(disconnect_callback)
- MQTT broker disconnect: MqttClient.onDisconnect(disconnected_callback)
- MQTT new message notification: MqttClient.onReceive(callback)
The signature for all callbacks except onReceive is void myCallback(void). For new messages, onReceive accepts a function with the signature void newMessage(const char *topic, const uint16_t message_length, const int32_t message_id). See Receiving Messages/Interrupt for more information.
Example
This code turns off the connection LED when the modem is disconnected:
void disconnectedFromNetwork(void) {
LedCtrl.off(Led::CELL);
}
Lte.onDisconnect(disconnectedFromNetwork);
For a complete interrupt MQTT example, see Examples -> sandbox in the Arduino IDE. The sandbox is the application that comes preloaded with the device. The documentation for it is here.
The above example used the Led Controller to control the LED.
Receiving Messages (Subscribe)
To receive a message published on a given topic, the client must subscribe to said topic. Subscribe to a topic by calling MqttClient.subscribe(topicName, QoS). It returns true if successful, false if not. For an explanation of the second argument (QoS), see Quality of Service. The QoS argument defaults to AT_MOST_ONCE and can be omitted unless a different QoS is needed.
if(!MqttClient.subscribe("myTopic")) {
Log.error("Could not subscribe to myTopic");
} else {
Log.info("Subscribed to myTopic");
}
Polling
When using polling, the MqttClient.readMessage(topicName) function must be called regularly to receive new messages.
while (1) {
    String myMessage = MqttClient.readMessage("myTopic");
    if (myMessage != "") {
        // %s expects a C string, so pass the String's underlying buffer
        Log.infof("New message: %s\r\n", myMessage.c_str());
    }
}
Interrupt
When using interrupts, the MqttClient.onReceive(callback) notification can be used. The callback must be a function with the following structure:
void callback(const char *topic, const uint16_t message_length, const int32_t message_id);
Here, topic is the topic name of the message, message_length is the length of the message, and message_id is a unique message ID. If the QoS of the topic is AT_MOST_ONCE, message_id is always -1.
After receiving the notification, the message is read by using the MqttClient.readMessage() call.
volatile bool new_message_flag = false;
volatile uint16_t message_length = 0;
volatile int32_t message_id = 0;
char topicBuffer[250];
void newMessage(const char *topic, const uint16_t msg_length, const int32_t msg_id) {
    // Copy the topic name into the buffer, truncating it if it is too long
    strncpy(topicBuffer, topic, sizeof(topicBuffer) - 1);
    topicBuffer[sizeof(topicBuffer) - 1] = '\0';
    message_length = msg_length;
    message_id = msg_id;
    new_message_flag = true;
}
void setup() {
    // ...
    // Assuming we're already connected to the cellular network and the MQTT broker
    MqttClient.onReceive(newMessage);
    MqttClient.subscribe("mytopic");
}
void loop() {
    // If a new message has arrived
    if (new_message_flag) {
        // Create a receive buffer, add 16 bytes for termination
        char receiveBuffer[message_length + 16];
        bool r = MqttClient.readMessage(topicBuffer, receiveBuffer, sizeof(receiveBuffer));
        if (!r) {
            Log.error("Failed to read message");
        } else {
            Log.infof("Read message %s\r\n", receiveBuffer);
        }
        new_message_flag = false;
    }
}
Publishing to a Topic
Messages can be published to a topic with MqttClient.publish("myTopic", message, quality_of_service). The quality of service argument is optional and defaults to AT_LEAST_ONCE. This process is the same for both polling and interrupt mode. The call returns true if the message was published successfully.
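Because publish reports success through its return value, a caller may want to retry a failed publish a few times before giving up. The following is a minimal sketch of such a wrapper. retryUntilSuccess is a hypothetical helper that is not part of the library, and it takes any callable returning bool so that it does not depend on a specific signature.

```cpp
#include <stdint.h>

// Hypothetical helper (not part of the library): calls `operation` until it
// returns true or `max_attempts` calls have been made. Returns true if any
// call succeeded, false otherwise.
template <typename Operation>
bool retryUntilSuccess(Operation operation, uint8_t max_attempts) {
    for (uint8_t attempt = 0; attempt < max_attempts; attempt++) {
        if (operation()) {
            return true;
        }
    }
    return false;
}
```

On the board, the callable could wrap the publish call, for example retryUntilSuccess([] { return MqttClient.publish("myTopic", "hello"); }, 3). The sketch retries immediately; whether a delay between attempts is appropriate depends on the application.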
Using the Secure Element (ECC608B)
The AVR-IoT Cellular (Mini) board comes bundled with a secure element (ECC608B). The main purpose of the secure element is to store the cryptographic keys and perform cryptographic calculations in an ultra-secure manner.
Enable the ECC
When connecting to an MQTT broker with TLS, all signing is performed by the secure element if the Use ECC flag is set in the begin() call. Transport Layer Security (TLS), the successor of the now-deprecated Secure Sockets Layer (SSL), is a cryptographic protocol designed to provide communications security over a computer network. When using the beginAWS() or beginAzure() calls, the ECC is always used.
// Enable the ECC, non-AWS
MqttClient.begin(MQTT_THING_NAME, MQTT_BROKER, MQTT_PORT, true, 60, true);
// Enable the ECC, AWS
MqttClient.beginAWS();
// Enable the ECC, Azure
MqttClient.beginAzure();
Signing Mechanism
When the board connects to AWS or Azure, the connection message must be signed. The signing mechanism is performed through a callback. The modem sends a signing request to the AVR, which sets an internal flag. Upon a successful sign, the signed message is forwarded back to the modem.
Quality of Service
The QoS (Quality of Service) argument, which is the third argument to publish and the second to subscribe, sets how much effort goes into ensuring the delivery/reception of the message. There are three QoS levels:
- AT_MOST_ONCE: The message is delivered at most once, with no acknowledgment
- AT_LEAST_ONCE: The message is delivered at least once, with acknowledgment
- EXACTLY_ONCE: The message is delivered exactly once, with acknowledgment
Mosquitto.org has an excellent explanation of this process
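The three levels correspond to QoS 0, 1 and 2 in the MQTT specification. The sketch below models them with a stand-in enum to show which levels involve an acknowledgment. QosLevel and isAcknowledged are hypothetical names used for illustration only and are not the library's own type.

```cpp
#include <stdint.h>

// Hypothetical stand-in for the library's QoS type. The numeric values are
// the QoS levels defined by the MQTT specification.
enum class QosLevel : uint8_t {
    AtMostOnce = 0,  // Fire and forget, no acknowledgment
    AtLeastOnce = 1, // Acknowledged, duplicates are possible
    ExactlyOnce = 2  // Acknowledged, delivered exactly once
};

// Returns true if delivery at this level is acknowledged by the receiver.
bool isAcknowledged(QosLevel level) {
    return level != QosLevel::AtMostOnce;
}
```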
Example (Interactive)
Use the interactive code editor to compile a hex file that you can program onto your board. Everything except for the MQTT Client has already been configured.