Skip to content

MQTT API

This Library is a high-performance [MQTT] library for IoT publish/subscribe communications.

The library supports MQTT version 3.1.1 and has the following features:

  • Supports connect, publish, subscribe, ping and disconnect messages.
  • Message quality of service for reliable delivery.
  • Retained messages.
  • TLS encryption with ALPN over port 443.
  • High message throughput with exceptionally low overhead.
  • Wait for delivery or acknowledgement options.
  • Auto reconnect on network failures.
  • Parallelism via fiber coroutines.

Extensions

Mqtt MQTT Protocol.

Functions

Mqtt *mqttAlloc(cchar *clientId, MqttEventProc proc)
 Allocate an MQTT object.
intmqttConnect(Mqtt *mq, RSocket *sock, int flags, MqttWaitFlags waitFlags)
 Establish a session with the MQTT broker.
intmqttDisconnect(Mqtt *mq)
 Send a disconnection packet.
voidmqttFree(Mqtt *mq)
 Free an Mqtt instance.
cchar *mqttGetError(struct Mqtt *mq)
 Returns an error message for error code, error.
TicksmqttGetLastActivity(Mqtt *mq)
 Return the time of last I/O activity.
boolmqttIsConnected(Mqtt *mq)
 Return true if the MQTT instance is connected to a peer.
intmqttMsgsToSend(Mqtt *mq)
 Get the number of messages in the queue.
intmqttPing(Mqtt *mq)
 Ping the broker.
intmqttPublish(Mqtt *mq, cvoid *msg, ssize size, int qos, MqttWaitFlags waitFlags, cchar *topic, ...)
 Publish an application message to the MQTT broker.
intmqttPublishRetained(Mqtt *mq, cvoid *msg, ssize size, int qos, MqttWaitFlags waitFlags, cchar *topic, ...)
 Publish a retained message to the MQTT broker.
voidmqttSetKeepAlive(Mqtt *mq, Ticks keepAlive)
 Set the keep-alive timeout.
voidmqttSetMessageSize(Mqtt *mq, int size)
 Set the maximum message size.
voidmqttSetTimeout(Mqtt *mq, Ticks timeout)
 Set the idle connection timeout.
voidmqttSetWill(Mqtt *mq, cchar *topic, cvoid *msg, ssize length)
 Set the will and testament message.
intmqttSubscribe(Mqtt *mq, MqttCallback callback, int maxQos, MqttWaitFlags waitFlags, cchar *topic, ...)
 Subscribe to a topic.
intmqttSubscribeMaster(Mqtt *mq, int maxQos, MqttWaitFlags waitFlags, cchar *topic, ...)
 Perform a master subscription.
voidmqttThrottle(Mqtt *mq)
 Throttle sending.
intmqttUnsubscribe(Mqtt *mq, cchar *topic, MqttWaitFlags wait)
 Unsubscribe from a topic.

Typedefs

MqttMQTT instance.
MqttCallbackMessage receipt callback.
MqttEventProcMQTt event callback.
MqttHdrFixed header of a packet.
MqttMsgMqtt message.
MqttRecvA struct used to deserialize/interpret an incoming packet from the broker.

Defines

#defineMQTT_BUF_SIZE   4096
 Receive buffer size.
#defineMQTT_EVENT_ATTACH   1
 Attach a socket.
#defineMQTT_EVENT_CONNECTED   2
 A new connection was established.
#defineMQTT_EVENT_DISCONNECT   3
 Connection closed.
#defineMQTT_EVENT_TIMEOUT   4
 The idle connection has timed out.
#defineMQTT_INLINE_BUF_SIZE   128
 Size of inline buffer.
#defineMQTT_KEEP_ALIVE   (20 * 60 * TPS)
 Default connection keep alive time.
#defineMQTT_MAX_MESSAGE_SIZE   256 * 1024 * 1024
 Max message size.
#defineMQTT_MSG_TIMEOUT   (30 * TPS)
 Default message timeout.
#defineMQTT_PROTOCOL_LEVEL   0x04
 Protocol version 3.1.1.
#defineMQTT_TIMEOUT   (MAXINT)
 Default connection timeout in msec.
#defineMQTT_TOPIC_SIZE   128
 Max topic size.
#defineMQTT_WAIT_NONE   0x0
 Wait flags.
#defineMQTT_PROTOCOL_LEVEL   0x04
 Protocol version 3.1.1.
#defineMQTT_WAIT_NONE   0x0
 Wait flags.

Mqtt

Mqtt

MQTT Protocol.

API Stability:
Evolving.
Fields:
RBuf *buf I/O read buffer.
uintconnected Mqtt is currently connected flag.
interror Mqtt error flag.
char *errorMsg Mqtt error message.
uintfreed Safe free detection.
MqttMsghead Head of message queue.
char *id Client ID.
TickskeepAlive Server side keep alive duration in seconds.
REventkeepAliveEvent Keep alive event.
TickslastActivity Time of last I/O activity.
intmask R library wait event mask.
char *masterTopic Master subscription topic.
intmaxMessage Maximum message size.
intmsgTimeout Message timeout for retransmit.
intnextId Next message ID.
char *password Username for connect.
MqttEventProcproc Notification event callback
uintprocessing ProcessMqtt is running.
RSocket *sock Underlying socket transport.
uintsubscribedApi Reserved.
Ticksthrottle Throttle delay in msec.
TicksthrottleLastPub Time of last publish or throttle.
TicksthrottleMark Throttle sending until Time.
Tickstimeout Inactivity timeout for on-demand connections.
RList *topics List of subscribed topics.
char *username Password for connect.
char *willMsg Will and testament message.
ssizewillMsgSize Size of will message.
char *willTopic Will and testament topic.

Mqtt * * mqttAlloc (cchar *clientId, MqttEventProc proc)

Allocate an MQTT object.

Parameters:
clientIdUnique client identifier string.
procEvent notification callback procedure.
API Stability:
Evolving.

int mqttConnect (Mqtt *mq, RSocket *sock, int flags, MqttWaitFlags waitFlags)

Establish a session with the MQTT broker.

Description:
This call established a new MQTT connection to the broker using the supplied socket. The MQTT object keeps a reference to the socket. If the socket is closed or freed by the caller, the caller must call mqttDisconnect.
Parameters:
mqThe Mqtt object.
sockThe underlying socket to use for communications.
flagsAdditional MqttConnectFlags to use when establishing the connection. These flags are for forcing the session to start clean: MQTT_CONNECT_CLEAN_SESSION, the QOS level to publish the will and testament messages with, and whether or not the broker should retain the will_message, MQTT_CONNECT_WILL_RETAIN.
waitFlagsWait flags. Set to MQTT_WAIT_NONE, MQTT_WAIT_SENT or MQTT_WAIT_ACK.
Returns:
Zero if successful.
API Stability:
Evolving.

int mqttDisconnect (Mqtt *mq)

Send a disconnection packet.

Description:
This will not close the socket. The peer, upon receiving the disconnection will close the connection.
Returns:
Zero if successful.
API Stability:
Evolving.

cchar * * mqttGetError (struct Mqtt *mq)

Returns an error message for error code, error.

Parameters:
mqMqtt object.
Returns:
The associated error message.
API Stability:
Evolving.

Ticks mqttGetLastActivity (Mqtt *mq)

Return the time of last I/O activity.

Returns:
The time in Ticks of the last I/O.
API Stability:
Evolving.

bool mqttIsConnected (Mqtt *mq)

Return true if the MQTT instance is connected to a peer.

Parameters:
mqThe MQTT mq.
Returns:
True if connected.
API Stability:
Prototype.

int mqttMsgsToSend (Mqtt *mq)

Get the number of messages in the queue.

Parameters:
mqThe MQTT mq.
Returns:
The number of messages in the queue.
API Stability:
Evolving.

int mqttPing (Mqtt *mq)

Ping the broker.

Parameters:
mqThe MQTT mq.
Returns:
Zero if successful.
API Stability:
Evolving.

int mqttPublish (Mqtt *mq, cvoid *msg, ssize size, int qos, MqttWaitFlags waitFlags, cchar *topic, ...)

Publish an application message to the MQTT broker.

Parameters:
mqThe Mqtt object.
msgThe data to be published.
sizeThe size of application_message in bytes.
qosQuality of service. 0, 1, or 2.
waitFlagsWait flags. Set to MQTT_WAIT_NONE, MQTT_WAIT_SENT or MQTT_WAIT_ACK.
topicPrintf style topic string.
...Topic args.
Returns:
Zero if successful.

int mqttPublishRetained (Mqtt *mq, cvoid *msg, ssize size, int qos, MqttWaitFlags waitFlags, cchar *topic, ...)

Publish a retained message to the MQTT broker.

Parameters:
mqThe Mqtt object.
msgThe data to be published.
sizeThe size of application_message in bytes.
qosQuality of service. 0, 1, or 2.
waitFlagsWait flags. Set to MQTT_WAIT_NONE, MQTT_WAIT_SENT or MQTT_WAIT_ACK.
topicPrintf style topic string.
...Topic args.
Returns:
Zero if successful.
API Stability:
Evolving.

void mqttSetKeepAlive (Mqtt *mq, Ticks keepAlive)

Set the keep-alive timeout.

Parameters:
mqThe MQTT mq.
keepAliveTime to wait in milliseconds before sending a keep-alive message.
API Stability:
Evolving.

void mqttSetMessageSize (Mqtt *mq, int size)

Set the maximum message size.

Description:
AWS supports a smaller maximum message size.
Parameters:
mqThe MQTT mq.
sizeThe maximum message size.
API Stability:
Evolving.

void mqttSetTimeout (Mqtt *mq, Ticks timeout)

Set the idle connection timeout.

Parameters:
mqThe MQTT mq.
timeoutTime to wait in milliseconds before closing the connection.
API Stability:
Evolving.

void mqttSetWill (Mqtt *mq, cchar *topic, cvoid *msg, ssize length)

Set the will and testament message.

Parameters:
mqThe MQTT mq.
topicWill message topic.
msgMessage to send.
lengthMessage size.
API Stability:
Evolving.

int mqttSubscribe (Mqtt *mq, MqttCallback callback, int maxQos, MqttWaitFlags waitFlags, cchar *topic, ...)

Subscribe to a topic.

Parameters:
mqMqtt object.
callbackFunction to invoke on receipt of messages.
maxQosMaximum quality of service message to receive.
waitFlagsWait flags. Set to MQTT_WAIT_NONE, MQTT_WAIT_SENT or MQTT_WAIT_ACK.
topicPrintf style topic string.
...Topic args.
Returns:
Zero if successful.
API Stability:
Evolving.

int mqttSubscribeMaster (Mqtt *mq, int maxQos, MqttWaitFlags waitFlags, cchar *topic, ...)

Perform a master subscription.

Description:
To minimize the number of active MQTT subscriptions, this call can establish a master subscription. Subsequent subscriptions using this master topic as a prefix will not incurr an MQTT protocol subscription. But will be processed off the master subscription locally.
Parameters:
mqMqtt object.
maxQosMaximum quality of service message to receive. This is used for all local subscriptions using this master.
waitFlagsWait flags. Set to MQTT_WAIT_NONE, MQTT_WAIT_SENT or MQTT_WAIT_ACK.
topicPrintf style topic string.
...Topic args.
Returns:
Zero if successful.
API Stability:
Prototype.

int mqttUnsubscribe (Mqtt *mq, cchar *topic, MqttWaitFlags wait)

Unsubscribe from a topic.

Parameters:
mqThe MQTT mq.
topicThe name of the topic to unsubscribe from.
waitWait flags.
Returns:
Zero if successful.
API Stability:
Evolving.

Functions

void mqttFree (Mqtt *mq)

Free an Mqtt instance.

Parameters:
mqMqtt instance allocated via mqttAlloc

void mqttThrottle (Mqtt *mq)

Throttle sending.

API Stability:
Internal.

Typedefs

typedef void(* MqttCallback) (struct MqttRecv *resp).

Message receipt callback.

Parameters:
respMessage received structure.
API Stability:
Evolving.

typedef void(* MqttEventProc) (struct Mqtt *mq, int event).

MQTt event callback.

Parameters:
mqMqtt object created via mqttAlloc
eventEvent type, set to MQTT_EVENT_CONNECT, MQTT_EVENT_DISCONNECT or MQTT_EVENT_STOPPING.
API Stability:
Evolving.

Mqtt

MQTT instance.

API Stability:
Evolving.
Fields:
RBuf *buf I/O read buffer.
uintconnected Mqtt is currently connected flag.
interror Mqtt error flag.
char *errorMsg Mqtt error message.
uintfreed Safe free detection.
MqttMsghead Head of message queue.
char *id Client ID.
TickskeepAlive Server side keep alive duration in seconds.
REventkeepAliveEvent Keep alive event.
TickslastActivity Time of last I/O activity.
intmask R library wait event mask.
char *masterTopic Master subscription topic.
intmaxMessage Maximum message size.
intmsgTimeout Message timeout for retransmit.
intnextId Next message ID.
char *password Username for connect.
MqttEventProcproc Notification event callback
uintprocessing ProcessMqtt is running.
RSocket *sock Underlying socket transport.
uintsubscribedApi Reserved.
Ticksthrottle Throttle delay in msec.
TicksthrottleLastPub Time of last publish or throttle.
TicksthrottleMark Throttle sending until Time.
Tickstimeout Inactivity timeout for on-demand connections.
RList *topics List of subscribed topics.
char *username Password for connect.
char *willMsg Will and testament message.
ssizewillMsgSize Size of will message.
char *willTopic Will and testament topic.

MqttHdr

Fixed header of a packet.

API Stability:
Evolving.
Fields:
intflags Packet control flags.
intlength Size in of the variable portion after fixed header and packet length.

MqttMsg

Mqtt message.

API Stability:
Internal.
Fields:
uchar *buf External message text buffer for large messages.
uchar *end End of message.
uchar *endbuf End of message buffer.
RFiber *fiber Message fiber to process the message.
inthold Dont free message.
intid Message sequence ID.
ucharinlineBuf[MQTT_INLINE_BUF_SIZE] Inline message text buffer for small message efficiency.
struct MqttMsg *next Next message in the queue.
struct MqttMsg *prev Previous message in the queue.
intqos Message quality of service.
Tickssent Time the message was sent.
uchar *start Start of message.
MqttMsgStatestate Message send status.
MqttPacketTypetype Message packet type.
MqttWaitFlagswait Message wait flags.

MqttRecv

A struct used to deserialize/interpret an incoming packet from the broker.

API Stability:
Evolving.
Fields:
MqttConnCodecode Connection response code.
cuchar *codes Array of return codes for subscribed topics.
char *data Published message.
intdataSize Size of data.
uchardup Set to 0 on first attempt to send packet.
uinthasSession Connection using an existing session.
struct MqttHdrhdr MQTT message fixed header.
intid Message ID.
MqttTopic *matched Matched topic.
struct Mqtt *mq Message queue.
intnumCodes Size of codes.
ucharqos Quality of service.
ucharretain Message is retained.
char *topic Topic string.
inttopicSize Size of topic.