474 lines
13 KiB
C
474 lines
13 KiB
C
/*******************************************************************************
|
|
* Copyright (c) 2009, 2021 IBM Corp. and Ian Craggs
|
|
*
|
|
* All rights reserved. This program and the accompanying materials
|
|
* are made available under the terms of the Eclipse Public License v2.0
|
|
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
|
*
|
|
* The Eclipse Public License is available at
|
|
* https://www.eclipse.org/legal/epl-2.0/
|
|
* and the Eclipse Distribution License is available at
|
|
* http://www.eclipse.org/org/documents/edl-v10.php.
|
|
*
|
|
* Contributors:
|
|
* Ian Craggs - initial API and implementation and/or initial documentation
|
|
* Ian Craggs, Allan Stockdill-Mander - SSL updates
|
|
* Ian Craggs - MQTT 3.1.1 support
|
|
* Rong Xiang, Ian Craggs - C++ compatibility
|
|
* Ian Craggs - binary password and will payload
|
|
* Ian Craggs - MQTT 5.0 support
|
|
*******************************************************************************/
|
|
|
|
/**
|
|
* @file
|
|
* \brief functions to deal with reading and writing of MQTT packets from and to sockets
|
|
*
|
|
* Some other related functions are in the MQTTPacket module
|
|
*/
|
|
|
|
|
|
#include "MQTTPacketOut.h"
|
|
#include "Log.h"
|
|
#include "StackTrace.h"
|
|
|
|
#include <string.h>
|
|
#include <stdlib.h>
|
|
|
|
#include "Heap.h"
|
|
|
|
|
|
/**
|
|
* Send an MQTT CONNECT packet down a socket for V5 or later
|
|
* @param client a structure from which to get all the required values
|
|
* @param MQTTVersion the MQTT version to connect with
|
|
* @param connectProperties MQTT V5 properties for the connect packet
|
|
* @param willProperties MQTT V5 properties for the will message, if any
|
|
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
|
|
*/
|
|
int MQTTPacket_send_connect(Clients* client, int MQTTVersion,
|
|
MQTTProperties* connectProperties, MQTTProperties* willProperties)
|
|
{
|
|
char *buf, *ptr;
|
|
Connect packet;
|
|
int rc = SOCKET_ERROR, len;
|
|
|
|
FUNC_ENTRY;
|
|
packet.header.byte = 0;
|
|
packet.header.bits.type = CONNECT;
|
|
|
|
len = ((MQTTVersion == MQTTVERSION_3_1) ? 12 : 10) + (int)strlen(client->clientID)+2;
|
|
if (client->will)
|
|
len += (int)strlen(client->will->topic)+2 + client->will->payloadlen+2;
|
|
if (client->username)
|
|
len += (int)strlen(client->username)+2;
|
|
if (client->password)
|
|
len += client->passwordlen+2;
|
|
if (MQTTVersion >= MQTTVERSION_5)
|
|
{
|
|
len += MQTTProperties_len(connectProperties);
|
|
if (client->will)
|
|
len += MQTTProperties_len(willProperties);
|
|
}
|
|
|
|
ptr = buf = malloc(len);
|
|
if (ptr == NULL)
|
|
goto exit_nofree;
|
|
if (MQTTVersion == MQTTVERSION_3_1)
|
|
{
|
|
writeUTF(&ptr, "MQIsdp");
|
|
writeChar(&ptr, (char)MQTTVERSION_3_1);
|
|
}
|
|
else if (MQTTVersion == MQTTVERSION_3_1_1 || MQTTVersion == MQTTVERSION_5)
|
|
{
|
|
writeUTF(&ptr, "MQTT");
|
|
writeChar(&ptr, (char)MQTTVersion);
|
|
}
|
|
else
|
|
goto exit;
|
|
|
|
packet.flags.all = 0;
|
|
if (MQTTVersion >= MQTTVERSION_5)
|
|
packet.flags.bits.cleanstart = client->cleanstart;
|
|
else
|
|
packet.flags.bits.cleanstart = client->cleansession;
|
|
packet.flags.bits.will = (client->will) ? 1 : 0;
|
|
if (packet.flags.bits.will)
|
|
{
|
|
packet.flags.bits.willQoS = client->will->qos;
|
|
packet.flags.bits.willRetain = client->will->retained;
|
|
}
|
|
if (client->username)
|
|
packet.flags.bits.username = 1;
|
|
if (client->password)
|
|
packet.flags.bits.password = 1;
|
|
|
|
writeChar(&ptr, packet.flags.all);
|
|
writeInt(&ptr, client->keepAliveInterval);
|
|
if (MQTTVersion >= MQTTVERSION_5)
|
|
MQTTProperties_write(&ptr, connectProperties);
|
|
writeUTF(&ptr, client->clientID);
|
|
if (client->will)
|
|
{
|
|
if (MQTTVersion >= MQTTVERSION_5)
|
|
MQTTProperties_write(&ptr, willProperties);
|
|
writeUTF(&ptr, client->will->topic);
|
|
writeData(&ptr, client->will->payload, client->will->payloadlen);
|
|
}
|
|
if (client->username)
|
|
writeUTF(&ptr, client->username);
|
|
if (client->password)
|
|
writeData(&ptr, client->password, client->passwordlen);
|
|
|
|
rc = MQTTPacket_send(&client->net, packet.header, buf, len, 1, MQTTVersion);
|
|
Log(LOG_PROTOCOL, 0, NULL, client->net.socket, client->clientID,
|
|
MQTTVersion, client->cleansession, rc);
|
|
exit:
|
|
if (rc != TCPSOCKET_INTERRUPTED)
|
|
free(buf);
|
|
exit_nofree:
|
|
FUNC_EXIT_RC(rc);
|
|
return rc;
|
|
}
|
|
|
|
|
|
/**
|
|
* Function used in the new packets table to create connack packets.
|
|
* @param MQTTVersion MQTT 5 or less?
|
|
* @param aHeader the MQTT header byte
|
|
* @param data the rest of the packet
|
|
* @param datalen the length of the rest of the packet
|
|
* @return pointer to the packet structure
|
|
*/
|
|
void* MQTTPacket_connack(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
|
|
{
|
|
Connack* pack = NULL;
|
|
char* curdata = data;
|
|
char* enddata = &data[datalen];
|
|
|
|
FUNC_ENTRY;
|
|
if ((pack = malloc(sizeof(Connack))) == NULL)
|
|
goto exit;
|
|
pack->MQTTVersion = MQTTVersion;
|
|
pack->header.byte = aHeader;
|
|
if (datalen < 2) /* enough data for connect flags and reason code? */
|
|
{
|
|
free(pack);
|
|
pack = NULL;
|
|
goto exit;
|
|
}
|
|
pack->flags.all = readChar(&curdata); /* connect flags */
|
|
pack->rc = readChar(&curdata); /* reason code */
|
|
if (MQTTVersion >= MQTTVERSION_5 && datalen > 2)
|
|
{
|
|
MQTTProperties props = MQTTProperties_initializer;
|
|
pack->properties = props;
|
|
if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
|
|
{
|
|
if (pack->properties.array)
|
|
free(pack->properties.array);
|
|
if (pack)
|
|
free(pack);
|
|
pack = NULL; /* signal protocol error */
|
|
goto exit;
|
|
}
|
|
}
|
|
exit:
|
|
FUNC_EXIT;
|
|
return pack;
|
|
}
|
|
|
|
|
|
/**
|
|
* Free allocated storage for a connack packet.
|
|
* @param pack pointer to the connack packet structure
|
|
*/
|
|
void MQTTPacket_freeConnack(Connack* pack)
|
|
{
|
|
FUNC_ENTRY;
|
|
if (pack->MQTTVersion >= MQTTVERSION_5)
|
|
MQTTProperties_free(&pack->properties);
|
|
free(pack);
|
|
FUNC_EXIT;
|
|
}
|
|
|
|
|
|
/**
|
|
* Send an MQTT PINGREQ packet down a socket.
|
|
* @param socket the open socket to send the data to
|
|
* @param clientID the string client identifier, only used for tracing
|
|
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
|
|
*/
|
|
int MQTTPacket_send_pingreq(networkHandles* net, const char* clientID)
|
|
{
|
|
Header header;
|
|
int rc = 0;
|
|
|
|
FUNC_ENTRY;
|
|
header.byte = 0;
|
|
header.bits.type = PINGREQ;
|
|
rc = MQTTPacket_send(net, header, NULL, 0, 0, MQTTVERSION_3_1_1);
|
|
Log(LOG_PROTOCOL, 20, NULL, net->socket, clientID, rc);
|
|
FUNC_EXIT_RC(rc);
|
|
return rc;
|
|
}
|
|
|
|
|
|
/**
|
|
* Send an MQTT subscribe packet down a socket.
|
|
* @param topics list of topics
|
|
* @param qoss list of corresponding QoSs
|
|
* @param msgid the MQTT message id to use
|
|
* @param dup boolean - whether to set the MQTT DUP flag
|
|
* @param socket the open socket to send the data to
|
|
* @param clientID the string client identifier, only used for tracing
|
|
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
|
|
*/
|
|
int MQTTPacket_send_subscribe(List* topics, List* qoss, MQTTSubscribe_options* opts, MQTTProperties* props,
|
|
int msgid, int dup, Clients* client)
|
|
{
|
|
Header header;
|
|
char *data, *ptr;
|
|
int rc = -1;
|
|
ListElement *elem = NULL, *qosElem = NULL;
|
|
int datalen, i = 0;
|
|
|
|
FUNC_ENTRY;
|
|
header.bits.type = SUBSCRIBE;
|
|
header.bits.dup = dup;
|
|
header.bits.qos = 1;
|
|
header.bits.retain = 0;
|
|
|
|
datalen = 2 + topics->count * 3; /* utf length + char qos == 3 */
|
|
while (ListNextElement(topics, &elem))
|
|
datalen += (int)strlen((char*)(elem->content));
|
|
if (client->MQTTVersion >= MQTTVERSION_5)
|
|
datalen += MQTTProperties_len(props);
|
|
|
|
ptr = data = malloc(datalen);
|
|
if (ptr == NULL)
|
|
goto exit;
|
|
writeInt(&ptr, msgid);
|
|
|
|
if (client->MQTTVersion >= MQTTVERSION_5)
|
|
MQTTProperties_write(&ptr, props);
|
|
|
|
elem = NULL;
|
|
while (ListNextElement(topics, &elem))
|
|
{
|
|
char subopts = 0;
|
|
|
|
ListNextElement(qoss, &qosElem);
|
|
writeUTF(&ptr, (char*)(elem->content));
|
|
subopts = *(int*)(qosElem->content);
|
|
if (client->MQTTVersion >= MQTTVERSION_5 && opts != NULL)
|
|
{
|
|
subopts |= (opts[i].noLocal << 2); /* 1 bit */
|
|
subopts |= (opts[i].retainAsPublished << 3); /* 1 bit */
|
|
subopts |= (opts[i].retainHandling << 4); /* 2 bits */
|
|
}
|
|
writeChar(&ptr, subopts);
|
|
++i;
|
|
}
|
|
rc = MQTTPacket_send(&client->net, header, data, datalen, 1, client->MQTTVersion);
|
|
Log(LOG_PROTOCOL, 22, NULL, client->net.socket, client->clientID, msgid, rc);
|
|
if (rc != TCPSOCKET_INTERRUPTED)
|
|
free(data);
|
|
exit:
|
|
FUNC_EXIT_RC(rc);
|
|
return rc;
|
|
}
|
|
|
|
|
|
/**
|
|
* Function used in the new packets table to create suback packets.
|
|
* @param MQTTVersion the version of MQTT
|
|
* @param aHeader the MQTT header byte
|
|
* @param data the rest of the packet
|
|
* @param datalen the length of the rest of the packet
|
|
* @return pointer to the packet structure
|
|
*/
|
|
void* MQTTPacket_suback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
|
|
{
|
|
Suback* pack = NULL;
|
|
char* curdata = data;
|
|
char* enddata = &data[datalen];
|
|
|
|
FUNC_ENTRY;
|
|
if ((pack = malloc(sizeof(Suback))) == NULL)
|
|
goto exit;
|
|
pack->MQTTVersion = MQTTVersion;
|
|
pack->header.byte = aHeader;
|
|
if (enddata - curdata < 2) /* Is there enough data to read the msgid? */
|
|
{
|
|
free(pack);
|
|
pack = NULL;
|
|
goto exit;
|
|
}
|
|
pack->msgId = readInt(&curdata);
|
|
if (MQTTVersion >= MQTTVERSION_5)
|
|
{
|
|
MQTTProperties props = MQTTProperties_initializer;
|
|
pack->properties = props;
|
|
if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
|
|
{
|
|
if (pack->properties.array)
|
|
free(pack->properties.array);
|
|
if (pack)
|
|
free(pack);
|
|
pack = NULL; /* signal protocol error */
|
|
goto exit;
|
|
}
|
|
}
|
|
pack->qoss = ListInitialize();
|
|
while ((size_t)(curdata - data) < datalen)
|
|
{
|
|
unsigned int* newint;
|
|
newint = malloc(sizeof(unsigned int));
|
|
if (newint == NULL)
|
|
{
|
|
if (pack->properties.array)
|
|
free(pack->properties.array);
|
|
if (pack)
|
|
free(pack);
|
|
pack = NULL; /* signal protocol error */
|
|
goto exit;
|
|
}
|
|
*newint = (unsigned int)readChar(&curdata);
|
|
ListAppend(pack->qoss, newint, sizeof(unsigned int));
|
|
}
|
|
if (pack->qoss->count == 0)
|
|
{
|
|
if (pack->properties.array)
|
|
free(pack->properties.array);
|
|
ListFree(pack->qoss);
|
|
free(pack);
|
|
pack = NULL;
|
|
}
|
|
exit:
|
|
FUNC_EXIT;
|
|
return pack;
|
|
}
|
|
|
|
|
|
/**
|
|
* Send an MQTT unsubscribe packet down a socket.
|
|
* @param topics list of topics
|
|
* @param msgid the MQTT message id to use
|
|
* @param dup boolean - whether to set the MQTT DUP flag
|
|
* @param socket the open socket to send the data to
|
|
* @param clientID the string client identifier, only used for tracing
|
|
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
|
|
*/
|
|
int MQTTPacket_send_unsubscribe(List* topics, MQTTProperties* props, int msgid, int dup, Clients* client)
|
|
{
|
|
Header header;
|
|
char *data, *ptr;
|
|
int rc = SOCKET_ERROR;
|
|
ListElement *elem = NULL;
|
|
int datalen;
|
|
|
|
FUNC_ENTRY;
|
|
header.bits.type = UNSUBSCRIBE;
|
|
header.bits.dup = dup;
|
|
header.bits.qos = 1;
|
|
header.bits.retain = 0;
|
|
|
|
datalen = 2 + topics->count * 2; /* utf length == 2 */
|
|
while (ListNextElement(topics, &elem))
|
|
datalen += (int)strlen((char*)(elem->content));
|
|
if (client->MQTTVersion >= MQTTVERSION_5)
|
|
datalen += MQTTProperties_len(props);
|
|
ptr = data = malloc(datalen);
|
|
if (ptr == NULL)
|
|
goto exit;
|
|
|
|
writeInt(&ptr, msgid);
|
|
|
|
if (client->MQTTVersion >= MQTTVERSION_5)
|
|
MQTTProperties_write(&ptr, props);
|
|
|
|
elem = NULL;
|
|
while (ListNextElement(topics, &elem))
|
|
writeUTF(&ptr, (char*)(elem->content));
|
|
rc = MQTTPacket_send(&client->net, header, data, datalen, 1, client->MQTTVersion);
|
|
Log(LOG_PROTOCOL, 25, NULL, client->net.socket, client->clientID, msgid, rc);
|
|
if (rc != TCPSOCKET_INTERRUPTED)
|
|
free(data);
|
|
exit:
|
|
FUNC_EXIT_RC(rc);
|
|
return rc;
|
|
}
|
|
|
|
|
|
/**
|
|
* Function used in the new packets table to create unsuback packets.
|
|
* @param MQTTVersion the version of MQTT
|
|
* @param aHeader the MQTT header byte
|
|
* @param data the rest of the packet
|
|
* @param datalen the length of the rest of the packet
|
|
* @return pointer to the packet structure
|
|
*/
|
|
void* MQTTPacket_unsuback(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
|
|
{
|
|
Unsuback* pack = NULL;
|
|
char* curdata = data;
|
|
char* enddata = &data[datalen];
|
|
|
|
FUNC_ENTRY;
|
|
if ((pack = malloc(sizeof(Unsuback))) == NULL)
|
|
goto exit;
|
|
pack->MQTTVersion = MQTTVersion;
|
|
pack->header.byte = aHeader;
|
|
if (enddata - curdata < 2) /* Is there enough data? */
|
|
{
|
|
free(pack);
|
|
pack = NULL;
|
|
goto exit;
|
|
}
|
|
pack->msgId = readInt(&curdata);
|
|
pack->reasonCodes = NULL;
|
|
if (MQTTVersion >= MQTTVERSION_5)
|
|
{
|
|
MQTTProperties props = MQTTProperties_initializer;
|
|
pack->properties = props;
|
|
if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
|
|
{
|
|
if (pack->properties.array)
|
|
free(pack->properties.array);
|
|
if (pack)
|
|
free(pack);
|
|
pack = NULL; /* signal protocol error */
|
|
goto exit;
|
|
}
|
|
pack->reasonCodes = ListInitialize();
|
|
while ((size_t)(curdata - data) < datalen)
|
|
{
|
|
enum MQTTReasonCodes* newrc;
|
|
newrc = malloc(sizeof(enum MQTTReasonCodes));
|
|
if (newrc == NULL)
|
|
{
|
|
if (pack->properties.array)
|
|
free(pack->properties.array);
|
|
if (pack)
|
|
free(pack);
|
|
pack = NULL; /* signal protocol error */
|
|
goto exit;
|
|
}
|
|
*newrc = (enum MQTTReasonCodes)readChar(&curdata);
|
|
ListAppend(pack->reasonCodes, newrc, sizeof(enum MQTTReasonCodes));
|
|
}
|
|
if (pack->reasonCodes->count == 0)
|
|
{
|
|
ListFree(pack->reasonCodes);
|
|
if (pack->properties.array)
|
|
free(pack->properties.array);
|
|
if (pack)
|
|
free(pack);
|
|
pack = NULL;
|
|
}
|
|
}
|
|
exit:
|
|
FUNC_EXIT;
|
|
return pack;
|
|
}
|