MQTT API
This library is a high-performance [MQTT] library for IoT publish/subscribe communications.
The library supports MQTT version 3.1.1 and has the following features:
- Supports connect, publish, subscribe, ping and disconnect messages.
- Message quality of service for reliable delivery.
- Retained messages.
- TLS encryption with ALPN over port 443.
- High message throughput with exceptionally low overhead.
- Wait for delivery or acknowledgement options.
- Auto reconnect on network failures.
- Parallelism via fiber coroutines.
Extensions
Mqtt | MQTT Protocol. |
Functions
Mqtt * | mqttAlloc(cchar *clientId, MqttEventProc proc) |
Allocate an MQTT object. | |
int | mqttConnect(Mqtt *mq, RSocket *sock, int flags, MqttWaitFlags waitFlags) |
Establish a session with the MQTT broker. | |
int | mqttDisconnect(Mqtt *mq) |
Send a disconnection packet. | |
void | mqttFree(Mqtt *mq) |
Free an Mqtt instance. | |
cchar * | mqttGetError(struct Mqtt *mq) |
Return the error message for the Mqtt object. | |
Ticks | mqttGetLastActivity(Mqtt *mq) |
Return the time of last I/O activity. | |
bool | mqttIsConnected(Mqtt *mq) |
Return true if the MQTT instance is connected to a peer. | |
int | mqttMsgsToSend(Mqtt *mq) |
Get the number of messages in the queue. | |
int | mqttPing(Mqtt *mq) |
Ping the broker. | |
int | mqttPublish(Mqtt *mq, cvoid *msg, ssize size, int qos, MqttWaitFlags waitFlags, cchar *topic, ...) |
Publish an application message to the MQTT broker. | |
int | mqttPublishRetained(Mqtt *mq, cvoid *msg, ssize size, int qos, MqttWaitFlags waitFlags, cchar *topic, ...) |
Publish a retained message to the MQTT broker. | |
void | mqttSetKeepAlive(Mqtt *mq, Ticks keepAlive) |
Set the keep-alive timeout. | |
void | mqttSetMessageSize(Mqtt *mq, int size) |
Set the maximum message size. | |
void | mqttSetTimeout(Mqtt *mq, Ticks timeout) |
Set the idle connection timeout. | |
void | mqttSetWill(Mqtt *mq, cchar *topic, cvoid *msg, ssize length) |
Set the will and testament message. | |
int | mqttSubscribe(Mqtt *mq, MqttCallback callback, int maxQos, MqttWaitFlags waitFlags, cchar *topic, ...) |
Subscribe to a topic. | |
int | mqttSubscribeMaster(Mqtt *mq, int maxQos, MqttWaitFlags waitFlags, cchar *topic, ...) |
Perform a master subscription. | |
void | mqttThrottle(Mqtt *mq) |
Throttle sending. | |
int | mqttUnsubscribe(Mqtt *mq, cchar *topic, MqttWaitFlags wait) |
Unsubscribe from a topic. |
Typedefs
Mqtt | MQTT instance. |
MqttCallback | Message receipt callback. |
MqttEventProc | MQTT event callback. |
MqttHdr | Fixed header of a packet. |
MqttMsg | Mqtt message. |
MqttRecv | A struct used to deserialize/interpret an incoming packet from the broker. |
Defines
#define | MQTT_BUF_SIZE 4096 |
Receive buffer size. | |
#define | MQTT_EVENT_ATTACH 1 |
Attach a socket. | |
#define | MQTT_EVENT_CONNECTED 2 |
A new connection was established. | |
#define | MQTT_EVENT_DISCONNECT 3 |
Connection closed. | |
#define | MQTT_EVENT_TIMEOUT 4 |
The idle connection has timed out. | |
#define | MQTT_INLINE_BUF_SIZE 128 |
Size of inline buffer. | |
#define | MQTT_KEEP_ALIVE |
Default connection keep alive time. | |
#define | MQTT_MAX_MESSAGE_SIZE 256 * 1024 * 1024 |
Max message size. | |
#define | MQTT_MSG_TIMEOUT |
Default message timeout. | |
#define | MQTT_PROTOCOL_LEVEL 0x04 |
Protocol version 3.1.1. | |
#define | MQTT_TIMEOUT (MAXINT) |
Default connection timeout in msec. | |
#define | MQTT_TOPIC_SIZE 128 |
Max topic size. | |
#define | MQTT_WAIT_NONE 0x0 |
Wait flags. | |
Mqtt
MQTT Protocol.
- API Stability:
- Evolving.
- Fields:
-
RBuf * | buf | I/O read buffer. |
uint | connected | Mqtt is currently connected flag. |
int | error | Mqtt error flag. |
char * | errorMsg | Mqtt error message. |
uint | freed | Safe free detection. |
MqttMsg | head | Head of message queue. |
char * | id | Client ID. |
Ticks | keepAlive | Server side keep alive duration in seconds. |
REvent | keepAliveEvent | Keep alive event. |
Ticks | lastActivity | Time of last I/O activity. |
int | mask | R library wait event mask. |
char * | masterTopic | Master subscription topic. |
int | maxMessage | Maximum message size. |
int | msgTimeout | Message timeout for retransmit. |
int | nextId | Next message ID. |
char * | password | Password for connect. |
MqttEventProc | proc | Notification event callback. |
uint | processing | ProcessMqtt is running. |
RSocket * | sock | Underlying socket transport. |
uint | subscribedApi | Reserved. |
Ticks | throttle | Throttle delay in msec. |
Ticks | throttleLastPub | Time of last publish or throttle. |
Ticks | throttleMark | Throttle sending until Time. |
Ticks | timeout | Inactivity timeout for on-demand connections. |
RList * | topics | List of subscribed topics. |
char * | username | Username for connect. |
char * | willMsg | Will and testament message. |
ssize | willMsgSize | Size of will message. |
char * | willTopic | Will and testament topic. |
Allocate an MQTT object.
- Parameters:
-
clientId | Unique client identifier string. |
proc | Event notification callback procedure. |
- API Stability:
- Evolving.
Establish a session with the MQTT broker.
- Description:
- This call establishes a new MQTT connection to the broker using the supplied socket. The MQTT object keeps a reference to the socket. If the socket is closed or freed by the caller, the caller must call mqttDisconnect.
- Parameters:
-
mq | The Mqtt object. |
sock | The underlying socket to use for communications. |
flags | Additional MqttConnectFlags to use when establishing the connection. These flags force the session to start clean (MQTT_CONNECT_CLEAN_SESSION), set the QoS level used to publish the will and testament message, and select whether the broker should retain the will message (MQTT_CONNECT_WILL_RETAIN). |
waitFlags | Wait flags. Set to MQTT_WAIT_NONE, MQTT_WAIT_SENT or MQTT_WAIT_ACK. |
- Returns:
- Zero if successful.
- API Stability:
- Evolving.
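For orientation, MQTT 3.1.1 carries the clean session, will QoS and will retain options in a single connect-flags byte of the CONNECT packet. The sketch below shows that wire-level bit layout as given in the MQTT 3.1.1 specification. The `EX_` constants and the `buildConnectFlags` helper are hypothetical names used for illustration only; the numeric values of the library's own MqttConnectFlags are not stated here and may differ, so this is not a recipe for computing the `flags` argument.

```c
#include <assert.h>
#include <stdint.h>

/* CONNECT flags byte layout from MQTT 3.1.1 (hypothetical EX_ names) */
enum {
    EX_CLEAN_SESSION  = 0x02,   /* Bit 1: start a clean session */
    EX_WILL_FLAG      = 0x04,   /* Bit 2: a will message is supplied */
    EX_WILL_QOS_SHIFT = 3,      /* Bits 3-4: QoS used to publish the will */
    EX_WILL_RETAIN    = 0x20    /* Bit 5: broker retains the will message */
};

/* Pack the options into one byte. Will options are ignored without a will. */
uint8_t buildConnectFlags(int cleanSession, int hasWill, int willQos, int willRetain)
{
    uint8_t flags = 0;

    assert(willQos >= 0 && willQos <= 2);
    if (cleanSession) {
        flags |= EX_CLEAN_SESSION;
    }
    if (hasWill) {
        flags |= EX_WILL_FLAG;
        flags |= (uint8_t) (willQos << EX_WILL_QOS_SHIFT);
        if (willRetain) {
            flags |= EX_WILL_RETAIN;
        }
    }
    return flags;
}
```

The specification requires the will QoS and will retain bits to be zero when no will message is present, which is why the helper only sets them inside the `hasWill` branch.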
Send a disconnection packet.
- Description:
- This does not close the socket. The peer closes the connection upon receiving the disconnection packet.
- Returns:
- Zero if successful.
- API Stability:
- Evolving.
Return the error message for the Mqtt object.
- Parameters:
-
mq | The Mqtt object. |
- Returns:
- The associated error message.
- API Stability:
- Evolving.
Return the time of last I/O activity.
- Returns:
- The time in Ticks of the last I/O.
- API Stability:
- Evolving.
Return true if the MQTT instance is connected to a peer.
- Parameters:
-
mq | The Mqtt object. |
- Returns:
- True if connected.
- API Stability:
- Prototype.
Get the number of messages in the queue.
- Parameters:
-
mq | The Mqtt object. |
- Returns:
- The number of messages in the queue.
- API Stability:
- Evolving.
Ping the broker.
- Parameters:
-
mq | The Mqtt object. |
- Returns:
- Zero if successful.
- API Stability:
- Evolving.
Publish an application message to the MQTT broker.
- Parameters:
-
mq | The Mqtt object. |
msg | The data to be published. |
size | The size of msg in bytes. |
qos | Quality of service. 0, 1, or 2. |
waitFlags | Wait flags. Set to MQTT_WAIT_NONE, MQTT_WAIT_SENT or MQTT_WAIT_ACK. |
topic | Printf style topic string. |
... | Topic args. |
- Returns:
- Zero if successful.
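The topic argument is a printf style format that is expanded with the trailing arguments. As a rough illustration of that expansion, the sketch below formats a topic into a bounded buffer with `vsnprintf` and rejects results that do not fit. `formatTopic` and `EX_TOPIC_SIZE` are hypothetical names; the buffer size mirrors MQTT_TOPIC_SIZE, and treating that value as a hard buffer limit is an assumption, not a statement about the library's internals.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>

#define EX_TOPIC_SIZE 128   /* Assumed to mirror MQTT_TOPIC_SIZE */

/*
    Expand a printf style topic into buf.
    Returns the topic length, or -1 if the result would not fit in size bytes.
 */
int formatTopic(char *buf, size_t size, const char *fmt, ...)
{
    va_list args;
    int     len;

    assert(buf != NULL && fmt != NULL && size > 0);
    va_start(args, fmt);
    len = vsnprintf(buf, size, fmt, args);
    va_end(args);

    /* vsnprintf returns the untruncated length, so len >= size means truncation */
    if (len < 0 || (size_t) len >= size) {
        return -1;
    }
    return len;
}
```

For example, calling `formatTopic(topic, sizeof(topic), "sensors/%s/temp/%d", "kitchen", 2)` with a buffer of `EX_TOPIC_SIZE` bytes produces the topic `sensors/kitchen/temp/2`.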
Publish a retained message to the MQTT broker.
- Parameters:
-
mq | The Mqtt object. |
msg | The data to be published. |
size | The size of msg in bytes. |
qos | Quality of service. 0, 1, or 2. |
waitFlags | Wait flags. Set to MQTT_WAIT_NONE, MQTT_WAIT_SENT or MQTT_WAIT_ACK. |
topic | Printf style topic string. |
... | Topic args. |
- Returns:
- Zero if successful.
- API Stability:
- Evolving.
Set the keep-alive timeout.
- Parameters:
-
mq | The Mqtt object. |
keepAlive | Time to wait in milliseconds before sending a keep-alive message. |
- API Stability:
- Evolving.
Set the maximum message size.
- Description:
- AWS supports a smaller maximum message size.
- Parameters:
-
mq | The Mqtt object. |
size | The maximum message size. |
- API Stability:
- Evolving.
Set the idle connection timeout.
- Parameters:
-
mq | The Mqtt object. |
timeout | Time to wait in milliseconds before closing the connection. |
- API Stability:
- Evolving.
Set the will and testament message.
- Parameters:
-
mq | The Mqtt object. |
topic | Will message topic. |
msg | Message to send. |
length | Message size. |
- API Stability:
- Evolving.
Subscribe to a topic.
- Parameters:
-
mq | The Mqtt object. |
callback | Function to invoke on receipt of messages. |
maxQos | Maximum quality of service message to receive. |
waitFlags | Wait flags. Set to MQTT_WAIT_NONE, MQTT_WAIT_SENT or MQTT_WAIT_ACK. |
topic | Printf style topic string. |
... | Topic args. |
- Returns:
- Zero if successful.
- API Stability:
- Evolving.
Perform a master subscription.
- Description:
- To minimize the number of active MQTT subscriptions, this call can establish a master subscription. Subsequent subscriptions that use the master topic as a prefix will not incur an MQTT protocol subscription; they are processed locally off the master subscription.
- Parameters:
-
mq | The Mqtt object. |
maxQos | Maximum quality of service message to receive. This is used for all local subscriptions using this master. |
waitFlags | Wait flags. Set to MQTT_WAIT_NONE, MQTT_WAIT_SENT or MQTT_WAIT_ACK. |
topic | Printf style topic string. |
... | Topic args. |
- Returns:
- Zero if successful.
- API Stability:
- Prototype.
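The prefix test behind a master subscription can be pictured with a small helper. This is a minimal sketch under stated assumptions, not the library's implementation: `underMaster` is a hypothetical function, the master topic is assumed to be a plain prefix without wildcards, and matching only on whole path segments is a design choice made here for illustration.

```c
#include <assert.h>
#include <string.h>

/*
    Return 1 if topic falls under the master topic prefix, otherwise 0.
    A topic qualifies when it equals the master or continues it at a '/' boundary.
 */
int underMaster(const char *master, const char *topic)
{
    size_t len;

    assert(master != NULL && topic != NULL);
    len = strlen(master);
    if (len == 0 || strncmp(master, topic, len) != 0) {
        return 0;
    }
    /* Accept an exact match, a new segment, or a master that already ends in '/' */
    return topic[len] == '\0' || topic[len] == '/' || master[len - 1] == '/';
}
```

With a master of `device/42`, the topic `device/42/status` would be served locally, while `device/421/status` shares the leading characters but not the path segment and would need its own subscription.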
Unsubscribe from a topic.
- Parameters:
-
mq | The Mqtt object. |
topic | The name of the topic to unsubscribe from. |
wait | Wait flags. |
- Returns:
- Zero if successful.
- API Stability:
- Evolving.
Functions
Throttle sending.
- API Stability:
- Internal.
Typedefs
Message receipt callback.
- Parameters:
-
resp | Message received structure. |
- API Stability:
- Evolving.
MQTT instance.
- API Stability:
- Evolving.
- Fields:
-
RBuf * | buf | I/O read buffer. |
uint | connected | Mqtt is currently connected flag. |
int | error | Mqtt error flag. |
char * | errorMsg | Mqtt error message. |
uint | freed | Safe free detection. |
MqttMsg | head | Head of message queue. |
char * | id | Client ID. |
Ticks | keepAlive | Server side keep alive duration in seconds. |
REvent | keepAliveEvent | Keep alive event. |
Ticks | lastActivity | Time of last I/O activity. |
int | mask | R library wait event mask. |
char * | masterTopic | Master subscription topic. |
int | maxMessage | Maximum message size. |
int | msgTimeout | Message timeout for retransmit. |
int | nextId | Next message ID. |
char * | password | Password for connect. |
MqttEventProc | proc | Notification event callback. |
uint | processing | ProcessMqtt is running. |
RSocket * | sock | Underlying socket transport. |
uint | subscribedApi | Reserved. |
Ticks | throttle | Throttle delay in msec. |
Ticks | throttleLastPub | Time of last publish or throttle. |
Ticks | throttleMark | Throttle sending until Time. |
Ticks | timeout | Inactivity timeout for on-demand connections. |
RList * | topics | List of subscribed topics. |
char * | username | Username for connect. |
char * | willMsg | Will and testament message. |
ssize | willMsgSize | Size of will message. |
char * | willTopic | Will and testament topic. |
Fixed header of a packet.
- API Stability:
- Evolving.
- Fields:
-
int | flags | Packet control flags. |
int | length | Size in bytes of the variable portion after the fixed header and packet length. |
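The `length` field corresponds to what the MQTT 3.1.1 specification calls the remaining length, which is sent on the wire as one to four bytes carrying seven bits each plus a continuation bit. The sketch below implements that encoding and its inverse as described in the specification. The function names and `EX_MAX_REMAINING` are hypothetical and are not part of this library's API.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define EX_MAX_REMAINING 268435455u   /* Largest value four bytes can carry */

/* Encode length into out. Returns the number of bytes written, or -1 if too large. */
int encodeRemainingLength(uint32_t length, uint8_t out[4])
{
    int n = 0;

    assert(out != NULL);
    if (length > EX_MAX_REMAINING) {
        return -1;
    }
    do {
        uint8_t byte = (uint8_t) (length % 128);
        length /= 128;
        if (length > 0) {
            byte |= 0x80;               /* More bytes follow */
        }
        out[n++] = byte;
    } while (length > 0);
    return n;
}

/* Decode from in. Returns bytes consumed, or -1 if incomplete or malformed. */
int decodeRemainingLength(const uint8_t *in, size_t avail, uint32_t *length)
{
    uint32_t value = 0, multiplier = 1;
    size_t   i;

    assert(in != NULL && length != NULL);
    for (i = 0; i < avail && i < 4; i++) {
        value += (uint32_t) (in[i] & 0x7F) * multiplier;
        if ((in[i] & 0x80) == 0) {
            *length = value;
            return (int) (i + 1);
        }
        multiplier *= 128;
    }
    return -1;
}
```

The four-byte ceiling of 268,435,455 bytes is just under 256 MB, which is the protocol-level bound on any configured maximum message size.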
Mqtt message.
- API Stability:
- Internal.
- Fields:
-
uchar * | buf | External message text buffer for large messages. |
uchar * | end | End of message. |
uchar * | endbuf | End of message buffer. |
RFiber * | fiber | Message fiber to process the message. |
int | hold | Don't free message. |
int | id | Message sequence ID. |
uchar | inlineBuf[MQTT_INLINE_BUF_SIZE] | Inline message text buffer for small message efficiency. |
struct MqttMsg * | next | Next message in the queue. |
struct MqttMsg * | prev | Previous message in the queue. |
int | qos | Message quality of service. |
Ticks | sent | Time the message was sent. |
uchar * | start | Start of message. |
MqttMsgState | state | Message send status. |
MqttPacketType | type | Message packet type. |
MqttWaitFlags | wait | Message wait flags. |
A struct used to deserialize/interpret an incoming packet from the broker.
- API Stability:
- Evolving.
- Fields:
-
MqttConnCode | code | Connection response code. |
cuchar * | codes | Array of return codes for subscribed topics. |
char * | data | Published message. |
int | dataSize | Size of data. |
uchar | dup | Set to 0 on first attempt to send packet. |
uint | hasSession | Connection using an existing session. |
struct MqttHdr | hdr | MQTT message fixed header. |
int | id | Message ID. |
MqttTopic * | matched | Matched topic. |
struct Mqtt * | mq | Message queue. |
int | numCodes | Size of codes. |
uchar | qos | Quality of service. |
uchar | retain | Message is retained. |
char * | topic | Topic string. |
int | topicSize | Size of topic. |