1078 lines
28 KiB
C
1078 lines
28 KiB
C
|
/*******************************************************************************
|
||
|
* Copyright (c) 2009, 2023 IBM Corp. and Ian Craggs
|
||
|
*
|
||
|
* All rights reserved. This program and the accompanying materials
|
||
|
* are made available under the terms of the Eclipse Public License v2.0
|
||
|
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||
|
*
|
||
|
* The Eclipse Public License is available at
|
||
|
* https://www.eclipse.org/legal/epl-2.0/
|
||
|
* and the Eclipse Distribution License is available at
|
||
|
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||
|
*
|
||
|
* Contributors:
|
||
|
* Ian Craggs - initial API and implementation and/or initial documentation
|
||
|
* Ian Craggs, Allan Stockdill-Mander - SSL updates
|
||
|
* Ian Craggs - MQTT 3.1.1 support
|
||
|
* Ian Craggs - fix for issue 453
|
||
|
* Ian Craggs - MQTT 5.0 support
|
||
|
*******************************************************************************/
|
||
|
|
||
|
/**
|
||
|
* @file
|
||
|
* \brief functions to deal with reading and writing of MQTT packets from and to sockets
|
||
|
*
|
||
|
* Some other related functions are in the MQTTPacketOut module
|
||
|
*/
|
||
|
|
||
|
#include "MQTTPacket.h"
|
||
|
#include "Log.h"
|
||
|
#if !defined(NO_PERSISTENCE)
|
||
|
#include "MQTTPersistence.h"
|
||
|
#endif
|
||
|
#include "Messages.h"
|
||
|
#include "StackTrace.h"
|
||
|
#include "WebSocket.h"
|
||
|
#include "MQTTTime.h"
|
||
|
|
||
|
#include <stdlib.h>
|
||
|
#include <string.h>
|
||
|
|
||
|
#include "Heap.h"
|
||
|
|
||
|
#if !defined(min)
|
||
|
#define min(A,B) ( (A) < (B) ? (A):(B))
|
||
|
#endif
|
||
|
|
||
|
/**
|
||
|
* List of the predefined MQTT v3/v5 packet names.
|
||
|
*/
|
||
|
static const char *packet_names[] =
|
||
|
{
|
||
|
"RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL",
|
||
|
"PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK",
|
||
|
"PINGREQ", "PINGRESP", "DISCONNECT", "AUTH"
|
||
|
};
|
||
|
|
||
|
const char** MQTTClient_packet_names = packet_names;
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Converts an MQTT packet code into its name
|
||
|
* @param ptype packet code
|
||
|
* @return the corresponding string, or "UNKNOWN"
|
||
|
*/
|
||
|
const char* MQTTPacket_name(int ptype)
|
||
|
{
|
||
|
return (ptype >= 0 && ptype <= AUTH) ? packet_names[ptype] : "UNKNOWN";
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Array of functions to build packets, indexed according to packet code
|
||
|
*/
|
||
|
pf new_packets[] =
|
||
|
{
|
||
|
NULL, /**< reserved */
|
||
|
NULL, /**< MQTTPacket_connect*/
|
||
|
MQTTPacket_connack, /**< CONNACK */
|
||
|
MQTTPacket_publish, /**< PUBLISH */
|
||
|
MQTTPacket_ack, /**< PUBACK */
|
||
|
MQTTPacket_ack, /**< PUBREC */
|
||
|
MQTTPacket_ack, /**< PUBREL */
|
||
|
MQTTPacket_ack, /**< PUBCOMP */
|
||
|
NULL, /**< MQTTPacket_subscribe*/
|
||
|
MQTTPacket_suback, /**< SUBACK */
|
||
|
NULL, /**< MQTTPacket_unsubscribe*/
|
||
|
MQTTPacket_unsuback, /**< UNSUBACK */
|
||
|
MQTTPacket_header_only, /**< PINGREQ */
|
||
|
MQTTPacket_header_only, /**< PINGRESP */
|
||
|
MQTTPacket_ack, /**< DISCONNECT */
|
||
|
MQTTPacket_ack /**< AUTH */
|
||
|
};
|
||
|
|
||
|
|
||
|
static char* readUTFlen(char** pptr, char* enddata, int* len);
|
||
|
static int MQTTPacket_send_ack(int MQTTVersion, int type, int msgid, int dup, networkHandles *net);
|
||
|
|
||
|
/**
|
||
|
* Reads one MQTT packet from a socket.
|
||
|
* @param socket a socket from which to read an MQTT packet
|
||
|
* @param error pointer to the error code which is completed if no packet is returned
|
||
|
* @return the packet structure or NULL if there was an error
|
||
|
*/
|
||
|
void* MQTTPacket_Factory(int MQTTVersion, networkHandles* net, int* error)
|
||
|
{
|
||
|
char* data = NULL;
|
||
|
static Header header;
|
||
|
size_t remaining_length;
|
||
|
int ptype;
|
||
|
void* pack = NULL;
|
||
|
size_t actual_len = 0;
|
||
|
|
||
|
FUNC_ENTRY;
|
||
|
*error = SOCKET_ERROR; /* indicate whether an error occurred, or not */
|
||
|
|
||
|
const size_t headerWsFramePos = WebSocket_framePos();
|
||
|
|
||
|
/* read the packet data from the socket */
|
||
|
*error = WebSocket_getch(net, &header.byte);
|
||
|
if (*error != TCPSOCKET_COMPLETE) /* first byte is the header byte */
|
||
|
goto exit; /* packet not read, *error indicates whether SOCKET_ERROR occurred */
|
||
|
|
||
|
/* now read the remaining length, so we know how much more to read */
|
||
|
if ((*error = MQTTPacket_decode(net, &remaining_length)) != TCPSOCKET_COMPLETE)
|
||
|
goto exit; /* packet not read, *error indicates whether SOCKET_ERROR occurred */
|
||
|
|
||
|
/* now read the rest, the variable header and payload */
|
||
|
data = WebSocket_getdata(net, remaining_length, &actual_len);
|
||
|
if (remaining_length && data == NULL)
|
||
|
{
|
||
|
*error = SOCKET_ERROR;
|
||
|
goto exit; /* socket error */
|
||
|
}
|
||
|
|
||
|
if (actual_len < remaining_length)
|
||
|
{
|
||
|
*error = TCPSOCKET_INTERRUPTED;
|
||
|
net->lastReceived = MQTTTime_now();
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
ptype = header.bits.type;
|
||
|
if (ptype < CONNECT || (MQTTVersion < MQTTVERSION_5 && ptype >= DISCONNECT) ||
|
||
|
(MQTTVersion >= MQTTVERSION_5 && ptype > AUTH) ||
|
||
|
new_packets[ptype] == NULL)
|
||
|
Log(TRACE_MIN, 2, NULL, ptype);
|
||
|
else
|
||
|
{
|
||
|
if ((pack = (*new_packets[ptype])(MQTTVersion, header.byte, data, remaining_length)) == NULL)
|
||
|
{
|
||
|
*error = SOCKET_ERROR; // was BAD_MQTT_PACKET;
|
||
|
Log(LOG_ERROR, -1, "Bad MQTT packet, type %d", ptype);
|
||
|
}
|
||
|
#if !defined(NO_PERSISTENCE)
|
||
|
else if (header.bits.type == PUBLISH && header.bits.qos == 2)
|
||
|
{
|
||
|
int buf0len;
|
||
|
char *buf = malloc(10);
|
||
|
|
||
|
if (buf == NULL)
|
||
|
{
|
||
|
*error = SOCKET_ERROR;
|
||
|
goto exit;
|
||
|
}
|
||
|
buf[0] = header.byte;
|
||
|
buf0len = 1 + MQTTPacket_encode(&buf[1], remaining_length);
|
||
|
*error = MQTTPersistence_putPacket(net->socket, buf, buf0len, 1,
|
||
|
&data, &remaining_length, header.bits.type, ((Publish *)pack)->msgId, 1, MQTTVersion);
|
||
|
free(buf);
|
||
|
}
|
||
|
#endif
|
||
|
}
|
||
|
}
|
||
|
if (pack)
|
||
|
net->lastReceived = MQTTTime_now();
|
||
|
exit:
|
||
|
if (*error == TCPSOCKET_INTERRUPTED)
|
||
|
WebSocket_framePosSeekTo(headerWsFramePos);
|
||
|
|
||
|
FUNC_EXIT_RC(*error);
|
||
|
return pack;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Sends an MQTT packet in one system call write
|
||
|
* @param socket the socket to which to write the data
|
||
|
* @param header the one-byte MQTT header
|
||
|
* @param buffer the rest of the buffer to write (not including remaining length)
|
||
|
* @param buflen the length of the data in buffer to be written
|
||
|
* @param MQTTVersion the version of MQTT being used
|
||
|
* @return the completion code (TCPSOCKET_COMPLETE etc)
|
||
|
*/
|
||
|
int MQTTPacket_send(networkHandles* net, Header header, char* buffer, size_t buflen, int freeData,
|
||
|
int MQTTVersion)
|
||
|
{
|
||
|
int rc = SOCKET_ERROR;
|
||
|
size_t buf0len;
|
||
|
char *buf;
|
||
|
PacketBuffers packetbufs;
|
||
|
|
||
|
FUNC_ENTRY;
|
||
|
buf0len = 1 + MQTTPacket_encode(NULL, buflen);
|
||
|
buf = malloc(buf0len);
|
||
|
if (buf == NULL)
|
||
|
{
|
||
|
rc = SOCKET_ERROR;
|
||
|
goto exit;
|
||
|
}
|
||
|
buf[0] = header.byte;
|
||
|
MQTTPacket_encode(&buf[1], buflen);
|
||
|
|
||
|
#if !defined(NO_PERSISTENCE)
|
||
|
if (header.bits.type == PUBREL)
|
||
|
{
|
||
|
char* ptraux = buffer;
|
||
|
int msgId = readInt(&ptraux);
|
||
|
|
||
|
rc = MQTTPersistence_putPacket(net->socket, buf, buf0len, 1, &buffer, &buflen,
|
||
|
header.bits.type, msgId, 0, MQTTVersion);
|
||
|
}
|
||
|
#endif
|
||
|
packetbufs.count = 1;
|
||
|
packetbufs.buffers = &buffer;
|
||
|
packetbufs.buflens = &buflen;
|
||
|
packetbufs.frees = &freeData;
|
||
|
memset(packetbufs.mask, '\0', sizeof(packetbufs.mask));
|
||
|
rc = WebSocket_putdatas(net, &buf, &buf0len, &packetbufs);
|
||
|
|
||
|
if (rc == TCPSOCKET_COMPLETE)
|
||
|
net->lastSent = MQTTTime_now();
|
||
|
|
||
|
if (rc != TCPSOCKET_INTERRUPTED)
|
||
|
free(buf);
|
||
|
|
||
|
exit:
|
||
|
FUNC_EXIT_RC(rc);
|
||
|
return rc;
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Sends an MQTT packet from multiple buffers in one system call write
|
||
|
* @param socket the socket to which to write the data
|
||
|
* @param header the one-byte MQTT header
|
||
|
* @param count the number of buffers
|
||
|
* @param buffers the rest of the buffers to write (not including remaining length)
|
||
|
* @param buflens the lengths of the data in the array of buffers to be written
|
||
|
* @param the MQTT version being used
|
||
|
* @return the completion code (TCPSOCKET_COMPLETE etc)
|
||
|
*/
|
||
|
int MQTTPacket_sends(networkHandles* net, Header header, PacketBuffers* bufs, int MQTTVersion)
|
||
|
{
|
||
|
int i, rc = SOCKET_ERROR;
|
||
|
size_t buf0len, total = 0;
|
||
|
char *buf;
|
||
|
|
||
|
FUNC_ENTRY;
|
||
|
for (i = 0; i < bufs->count; i++)
|
||
|
total += bufs->buflens[i];
|
||
|
buf0len = 1 + MQTTPacket_encode(NULL, total);
|
||
|
buf = malloc(buf0len);
|
||
|
if (buf == NULL)
|
||
|
{
|
||
|
rc = SOCKET_ERROR;
|
||
|
goto exit;
|
||
|
}
|
||
|
buf[0] = header.byte;
|
||
|
MQTTPacket_encode(&buf[1], total);
|
||
|
|
||
|
#if !defined(NO_PERSISTENCE)
|
||
|
if (header.bits.type == PUBLISH && header.bits.qos != 0)
|
||
|
{ /* persist PUBLISH QoS1 and Qo2 */
|
||
|
char *ptraux = bufs->buffers[2];
|
||
|
int msgId = readInt(&ptraux);
|
||
|
rc = MQTTPersistence_putPacket(net->socket, buf, buf0len, bufs->count, bufs->buffers, bufs->buflens,
|
||
|
header.bits.type, msgId, 0, MQTTVersion);
|
||
|
}
|
||
|
#endif
|
||
|
rc = WebSocket_putdatas(net, &buf, &buf0len, bufs);
|
||
|
|
||
|
if (rc == TCPSOCKET_COMPLETE)
|
||
|
net->lastSent = MQTTTime_now();
|
||
|
|
||
|
if (rc != TCPSOCKET_INTERRUPTED)
|
||
|
free(buf);
|
||
|
exit:
|
||
|
FUNC_EXIT_RC(rc);
|
||
|
return rc;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Encodes the message length according to the MQTT algorithm
|
||
|
* @param buf the buffer into which the encoded data is written
|
||
|
* @param length the length to be encoded
|
||
|
* @return the number of bytes written to buffer
|
||
|
*/
|
||
|
int MQTTPacket_encode(char* buf, size_t length)
|
||
|
{
|
||
|
int rc = 0;
|
||
|
|
||
|
FUNC_ENTRY;
|
||
|
do
|
||
|
{
|
||
|
char d = length % 128;
|
||
|
length /= 128;
|
||
|
/* if there are more digits to encode, set the top bit of this digit */
|
||
|
if (length > 0)
|
||
|
d |= 0x80;
|
||
|
if (buf)
|
||
|
buf[rc++] = d;
|
||
|
else
|
||
|
rc++;
|
||
|
} while (length > 0);
|
||
|
FUNC_EXIT_RC(rc);
|
||
|
return rc;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Decodes the message length according to the MQTT algorithm
|
||
|
* @param socket the socket from which to read the bytes
|
||
|
* @param value the decoded length returned
|
||
|
* @return the number of bytes read from the socket
|
||
|
*/
|
||
|
int MQTTPacket_decode(networkHandles* net, size_t* value)
|
||
|
{
|
||
|
int rc = SOCKET_ERROR;
|
||
|
char c;
|
||
|
int multiplier = 1;
|
||
|
int len = 0;
|
||
|
#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
|
||
|
|
||
|
FUNC_ENTRY;
|
||
|
*value = 0;
|
||
|
do
|
||
|
{
|
||
|
if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
|
||
|
{
|
||
|
rc = SOCKET_ERROR; /* bad data */
|
||
|
goto exit;
|
||
|
}
|
||
|
rc = WebSocket_getch(net, &c);
|
||
|
if (rc != TCPSOCKET_COMPLETE)
|
||
|
goto exit;
|
||
|
*value += (c & 127) * multiplier;
|
||
|
multiplier *= 128;
|
||
|
} while ((c & 128) != 0);
|
||
|
exit:
|
||
|
FUNC_EXIT_RC(rc);
|
||
|
return rc;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Calculates an integer from two bytes read from the input buffer
|
||
|
* @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
|
||
|
* @return the integer value calculated
|
||
|
*/
|
||
|
int readInt(char** pptr)
|
||
|
{
|
||
|
char* ptr = *pptr;
|
||
|
int len = 256*((unsigned char)(*ptr)) + (unsigned char)(*(ptr+1));
|
||
|
*pptr += 2;
|
||
|
return len;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Reads a "UTF" string from the input buffer. UTF as in the MQTT v3 spec which really means
|
||
|
* a length delimited string. So it reads the two byte length then the data according to
|
||
|
* that length. The end of the buffer is provided too, so we can prevent buffer overruns caused
|
||
|
* by an incorrect length.
|
||
|
* @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
|
||
|
* @param enddata pointer to the end of the buffer not to be read beyond
|
||
|
* @param len returns the calculcated value of the length bytes read
|
||
|
* @return an allocated C string holding the characters read, or NULL if the length read would
|
||
|
* have caused an overrun.
|
||
|
*
|
||
|
*/
|
||
|
static char* readUTFlen(char** pptr, char* enddata, int* len)
|
||
|
{
|
||
|
char* string = NULL;
|
||
|
|
||
|
FUNC_ENTRY;
|
||
|
if (enddata - (*pptr) > 1) /* enough length to read the integer? */
|
||
|
{
|
||
|
*len = readInt(pptr);
|
||
|
if (&(*pptr)[*len] <= enddata)
|
||
|
{
|
||
|
if ((string = malloc(*len+1)) == NULL)
|
||
|
goto exit;
|
||
|
memcpy(string, *pptr, *len);
|
||
|
string[*len] = '\0';
|
||
|
*pptr += *len;
|
||
|
}
|
||
|
}
|
||
|
exit:
|
||
|
FUNC_EXIT;
|
||
|
return string;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Reads a "UTF" string from the input buffer. UTF as in the MQTT v3 spec which really means
|
||
|
* a length delimited string. So it reads the two byte length then the data according to
|
||
|
* that length. The end of the buffer is provided too, so we can prevent buffer overruns caused
|
||
|
* by an incorrect length.
|
||
|
* @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
|
||
|
* @param enddata pointer to the end of the buffer not to be read beyond
|
||
|
* @return an allocated C string holding the characters read, or NULL if the length read would
|
||
|
* have caused an overrun.
|
||
|
*/
|
||
|
char* readUTF(char** pptr, char* enddata)
|
||
|
{
|
||
|
int len;
|
||
|
return readUTFlen(pptr, enddata, &len);
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Reads one character from the input buffer.
|
||
|
* @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
|
||
|
* @return the character read
|
||
|
*/
|
||
|
unsigned char readChar(char** pptr)
|
||
|
{
|
||
|
unsigned char c = **pptr;
|
||
|
(*pptr)++;
|
||
|
return c;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Writes one character to an output buffer.
|
||
|
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
|
||
|
* @param c the character to write
|
||
|
*/
|
||
|
void writeChar(char** pptr, char c)
|
||
|
{
|
||
|
**pptr = c;
|
||
|
(*pptr)++;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Writes an integer as 2 bytes to an output buffer.
|
||
|
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
|
||
|
* @param anInt the integer to write
|
||
|
*/
|
||
|
void writeInt(char** pptr, int anInt)
|
||
|
{
|
||
|
**pptr = (char)(anInt / 256);
|
||
|
(*pptr)++;
|
||
|
**pptr = (char)(anInt % 256);
|
||
|
(*pptr)++;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Writes a "UTF" string to an output buffer. Converts C string to length-delimited.
|
||
|
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
|
||
|
* @param string the C string to write
|
||
|
*/
|
||
|
void writeUTF(char** pptr, const char* string)
|
||
|
{
|
||
|
size_t len = strlen(string);
|
||
|
writeInt(pptr, (int)len);
|
||
|
memcpy(*pptr, string, len);
|
||
|
*pptr += len;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Writes length delimited data to an output buffer
|
||
|
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
|
||
|
* @param data the data to write
|
||
|
* @param datalen the length of the data to write
|
||
|
*/
|
||
|
void writeData(char** pptr, const void* data, int datalen)
|
||
|
{
|
||
|
writeInt(pptr, datalen);
|
||
|
memcpy(*pptr, data, datalen);
|
||
|
*pptr += datalen;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Function used in the new packets table to create packets which have only a header.
|
||
|
* @param MQTTVersion the version of MQTT
|
||
|
* @param aHeader the MQTT header byte
|
||
|
* @param data the rest of the packet
|
||
|
* @param datalen the length of the rest of the packet
|
||
|
* @return pointer to the packet structure
|
||
|
*/
|
||
|
void* MQTTPacket_header_only(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
|
||
|
{
|
||
|
static unsigned char header = 0;
|
||
|
header = aHeader;
|
||
|
return &header;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Send an MQTT disconnect packet down a socket.
|
||
|
* @param socket the open socket to send the data to
|
||
|
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
|
||
|
*/
|
||
|
int MQTTPacket_send_disconnect(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props)
|
||
|
{
|
||
|
Header header;
|
||
|
int rc = 0;
|
||
|
|
||
|
FUNC_ENTRY;
|
||
|
header.byte = 0;
|
||
|
header.bits.type = DISCONNECT;
|
||
|
|
||
|
if (client->MQTTVersion >= 5 && (props || reason != MQTTREASONCODE_SUCCESS))
|
||
|
{
|
||
|
size_t buflen = 1 + ((props == NULL) ? 0 : MQTTProperties_len(props));
|
||
|
char *buf = NULL;
|
||
|
char *ptr = NULL;
|
||
|
|
||
|
if ((buf = malloc(buflen)) == NULL)
|
||
|
{
|
||
|
rc = SOCKET_ERROR;
|
||
|
goto exit;
|
||
|
}
|
||
|
ptr = buf;
|
||
|
writeChar(&ptr, reason);
|
||
|
if (props)
|
||
|
MQTTProperties_write(&ptr, props);
|
||
|
if ((rc = MQTTPacket_send(&client->net, header, buf, buflen, 1,
|
||
|
client->MQTTVersion)) != TCPSOCKET_INTERRUPTED)
|
||
|
free(buf);
|
||
|
}
|
||
|
else
|
||
|
rc = MQTTPacket_send(&client->net, header, NULL, 0, 0, client->MQTTVersion);
|
||
|
exit:
|
||
|
Log(LOG_PROTOCOL, 28, NULL, client->net.socket, client->clientID, rc);
|
||
|
FUNC_EXIT_RC(rc);
|
||
|
return rc;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Function used in the new packets table to create publish packets.
|
||
|
* @param MQTTVersion
|
||
|
* @param aHeader the MQTT header byte
|
||
|
* @param data the rest of the packet
|
||
|
* @param datalen the length of the rest of the packet
|
||
|
* @return pointer to the packet structure
|
||
|
*/
|
||
|
void* MQTTPacket_publish(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
|
||
|
{
|
||
|
Publish* pack = NULL;
|
||
|
char* curdata = data;
|
||
|
char* enddata = &data[datalen];
|
||
|
|
||
|
FUNC_ENTRY;
|
||
|
if ((pack = malloc(sizeof(Publish))) == NULL)
|
||
|
goto exit;
|
||
|
memset(pack, '\0', sizeof(Publish));
|
||
|
pack->MQTTVersion = MQTTVersion;
|
||
|
pack->header.byte = aHeader;
|
||
|
if ((pack->topic = readUTFlen(&curdata, enddata, &pack->topiclen)) == NULL) /* Topic name on which to publish */
|
||
|
{
|
||
|
free(pack);
|
||
|
pack = NULL;
|
||
|
goto exit;
|
||
|
}
|
||
|
if (pack->header.bits.qos > 0) /* Msgid only exists for QoS 1 or 2 */
|
||
|
{
|
||
|
if (enddata - curdata < 2) /* Is there enough data for the msgid? */
|
||
|
{
|
||
|
free(pack);
|
||
|
pack = NULL;
|
||
|
goto exit;
|
||
|
}
|
||
|
pack->msgId = readInt(&curdata);
|
||
|
}
|
||
|
else
|
||
|
pack->msgId = 0;
|
||
|
if (MQTTVersion >= MQTTVERSION_5)
|
||
|
{
|
||
|
MQTTProperties props = MQTTProperties_initializer;
|
||
|
pack->properties = props;
|
||
|
if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
|
||
|
{
|
||
|
if (pack->properties.array)
|
||
|
free(pack->properties.array);
|
||
|
if (pack)
|
||
|
free(pack);
|
||
|
pack = NULL; /* signal protocol error */
|
||
|
goto exit;
|
||
|
}
|
||
|
}
|
||
|
pack->payload = curdata;
|
||
|
pack->payloadlen = (int)(datalen-(curdata-data));
|
||
|
exit:
|
||
|
FUNC_EXIT;
|
||
|
return pack;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Free allocated storage for a publish packet.
|
||
|
* @param pack pointer to the publish packet structure
|
||
|
*/
|
||
|
void MQTTPacket_freePublish(Publish* pack)
|
||
|
{
|
||
|
FUNC_ENTRY;
|
||
|
if (pack->topic != NULL)
|
||
|
free(pack->topic);
|
||
|
if (pack->MQTTVersion >= MQTTVERSION_5)
|
||
|
MQTTProperties_free(&pack->properties);
|
||
|
free(pack);
|
||
|
FUNC_EXIT;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Free allocated storage for an ack packet.
|
||
|
* @param pack pointer to the publish packet structure
|
||
|
*/
|
||
|
void MQTTPacket_freeAck(Ack* pack)
|
||
|
{
|
||
|
FUNC_ENTRY;
|
||
|
if (pack->MQTTVersion >= MQTTVERSION_5)
|
||
|
MQTTProperties_free(&pack->properties);
|
||
|
free(pack);
|
||
|
FUNC_EXIT;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Send an MQTT acknowledgement packet down a socket.
|
||
|
* @param MQTTVersion the version of MQTT being used
|
||
|
* @param type the MQTT packet type e.g. SUBACK
|
||
|
* @param msgid the MQTT message id to use
|
||
|
* @param dup boolean - whether to set the MQTT DUP flag
|
||
|
* @param net the network handle to send the data to
|
||
|
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
|
||
|
*/
|
||
|
static int MQTTPacket_send_ack(int MQTTVersion, int type, int msgid, int dup, networkHandles *net)
|
||
|
{
|
||
|
Header header;
|
||
|
int rc = SOCKET_ERROR;
|
||
|
char *buf = NULL;
|
||
|
char *ptr = NULL;
|
||
|
|
||
|
FUNC_ENTRY;
|
||
|
if ((ptr = buf = malloc(2)) == NULL)
|
||
|
goto exit;
|
||
|
header.byte = 0;
|
||
|
header.bits.type = type;
|
||
|
header.bits.dup = dup;
|
||
|
if (type == PUBREL)
|
||
|
header.bits.qos = 1;
|
||
|
writeInt(&ptr, msgid);
|
||
|
if ((rc = MQTTPacket_send(net, header, buf, 2, 1, MQTTVersion)) != TCPSOCKET_INTERRUPTED)
|
||
|
free(buf);
|
||
|
exit:
|
||
|
FUNC_EXIT_RC(rc);
|
||
|
return rc;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Send an MQTT PUBACK packet down a socket.
|
||
|
* @param MQTTVersion the version of MQTT being used
|
||
|
* @param msgid the MQTT message id to use
|
||
|
* @param socket the open socket to send the data to
|
||
|
* @param clientID the string client identifier, only used for tracing
|
||
|
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
|
||
|
*/
|
||
|
int MQTTPacket_send_puback(int MQTTVersion, int msgid, networkHandles* net, const char* clientID)
|
||
|
{
|
||
|
int rc = 0;
|
||
|
|
||
|
FUNC_ENTRY;
|
||
|
rc = MQTTPacket_send_ack(MQTTVersion, PUBACK, msgid, 0, net);
|
||
|
Log(LOG_PROTOCOL, 12, NULL, net->socket, clientID, msgid, rc);
|
||
|
FUNC_EXIT_RC(rc);
|
||
|
return rc;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Free allocated storage for a suback packet.
|
||
|
* @param pack pointer to the suback packet structure
|
||
|
*/
|
||
|
void MQTTPacket_freeSuback(Suback* pack)
|
||
|
{
|
||
|
FUNC_ENTRY;
|
||
|
if (pack->MQTTVersion >= MQTTVERSION_5)
|
||
|
MQTTProperties_free(&pack->properties);
|
||
|
if (pack->qoss != NULL)
|
||
|
ListFree(pack->qoss);
|
||
|
free(pack);
|
||
|
FUNC_EXIT;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Free allocated storage for a suback packet.
|
||
|
* @param pack pointer to the suback packet structure
|
||
|
*/
|
||
|
void MQTTPacket_freeUnsuback(Unsuback* pack)
|
||
|
{
|
||
|
FUNC_ENTRY;
|
||
|
if (pack->MQTTVersion >= MQTTVERSION_5)
|
||
|
{
|
||
|
MQTTProperties_free(&pack->properties);
|
||
|
if (pack->reasonCodes != NULL)
|
||
|
ListFree(pack->reasonCodes);
|
||
|
}
|
||
|
free(pack);
|
||
|
FUNC_EXIT;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Send an MQTT PUBREC packet down a socket.
|
||
|
* @param MQTTVersion the version of MQTT being used
|
||
|
* @param msgid the MQTT message id to use
|
||
|
* @param socket the open socket to send the data to
|
||
|
* @param clientID the string client identifier, only used for tracing
|
||
|
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
|
||
|
*/
|
||
|
int MQTTPacket_send_pubrec(int MQTTVersion, int msgid, networkHandles* net, const char* clientID)
|
||
|
{
|
||
|
int rc = 0;
|
||
|
|
||
|
FUNC_ENTRY;
|
||
|
rc = MQTTPacket_send_ack(MQTTVersion, PUBREC, msgid, 0, net);
|
||
|
Log(LOG_PROTOCOL, 13, NULL, net->socket, clientID, msgid, rc);
|
||
|
FUNC_EXIT_RC(rc);
|
||
|
return rc;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Send an MQTT PUBREL packet down a socket.
|
||
|
* @param MQTTVersion the version of MQTT being used
|
||
|
* @param msgid the MQTT message id to use
|
||
|
* @param dup boolean - whether to set the MQTT DUP flag
|
||
|
* @param socket the open socket to send the data to
|
||
|
* @param clientID the string client identifier, only used for tracing
|
||
|
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
|
||
|
*/
|
||
|
int MQTTPacket_send_pubrel(int MQTTVersion, int msgid, int dup, networkHandles* net, const char* clientID)
|
||
|
{
|
||
|
int rc = 0;
|
||
|
|
||
|
FUNC_ENTRY;
|
||
|
rc = MQTTPacket_send_ack(MQTTVersion, PUBREL, msgid, dup, net);
|
||
|
Log(LOG_PROTOCOL, 16, NULL, net->socket, clientID, msgid, rc);
|
||
|
FUNC_EXIT_RC(rc);
|
||
|
return rc;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Send an MQTT PUBCOMP packet down a socket.
|
||
|
* @param MQTTVersion the version of MQTT being used
|
||
|
* @param msgid the MQTT message id to use
|
||
|
* @param socket the open socket to send the data to
|
||
|
* @param clientID the string client identifier, only used for tracing
|
||
|
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
|
||
|
*/
|
||
|
int MQTTPacket_send_pubcomp(int MQTTVersion, int msgid, networkHandles* net, const char* clientID)
|
||
|
{
|
||
|
int rc = 0;
|
||
|
|
||
|
FUNC_ENTRY;
|
||
|
rc = MQTTPacket_send_ack(MQTTVersion, PUBCOMP, msgid, 0, net);
|
||
|
Log(LOG_PROTOCOL, 18, NULL, net->socket, clientID, msgid, rc);
|
||
|
FUNC_EXIT_RC(rc);
|
||
|
return rc;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Function used in the new packets table to create acknowledgement packets.
|
||
|
* @param MQTTVersion the version of MQTT being used
|
||
|
* @param aHeader the MQTT header byte
|
||
|
* @param data the rest of the packet
|
||
|
* @param datalen the length of the rest of the packet
|
||
|
* @return pointer to the packet structure
|
||
|
*/
|
||
|
void* MQTTPacket_ack(int MQTTVersion, unsigned char aHeader, char* data, size_t datalen)
|
||
|
{
|
||
|
Ack* pack = NULL;
|
||
|
char* curdata = data;
|
||
|
char* enddata = &data[datalen];
|
||
|
|
||
|
FUNC_ENTRY;
|
||
|
if ((pack = malloc(sizeof(Ack))) == NULL)
|
||
|
goto exit;
|
||
|
pack->MQTTVersion = MQTTVersion;
|
||
|
pack->header.byte = aHeader;
|
||
|
if (pack->header.bits.type != DISCONNECT)
|
||
|
{
|
||
|
if (enddata - curdata < 2) /* Is there enough data for the msgid? */
|
||
|
{
|
||
|
free(pack);
|
||
|
pack = NULL;
|
||
|
goto exit;
|
||
|
}
|
||
|
pack->msgId = readInt(&curdata);
|
||
|
}
|
||
|
if (MQTTVersion >= MQTTVERSION_5)
|
||
|
{
|
||
|
MQTTProperties props = MQTTProperties_initializer;
|
||
|
|
||
|
pack->rc = MQTTREASONCODE_SUCCESS;
|
||
|
pack->properties = props;
|
||
|
|
||
|
/* disconnect has no msgid */
|
||
|
if (datalen > 2 || (pack->header.bits.type == DISCONNECT && datalen > 0))
|
||
|
pack->rc = readChar(&curdata); /* reason code */
|
||
|
|
||
|
if (datalen > 3 || (pack->header.bits.type == DISCONNECT && datalen > 1))
|
||
|
{
|
||
|
if (MQTTProperties_read(&pack->properties, &curdata, enddata) != 1)
|
||
|
{
|
||
|
if (pack->properties.array)
|
||
|
free(pack->properties.array);
|
||
|
if (pack)
|
||
|
free(pack);
|
||
|
pack = NULL; /* signal protocol error */
|
||
|
goto exit;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
exit:
|
||
|
FUNC_EXIT;
|
||
|
return pack;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Send an MQTT PUBLISH packet down a socket.
|
||
|
* @param pack a structure from which to get some values to use, e.g topic, payload
|
||
|
* @param dup boolean - whether to set the MQTT DUP flag
|
||
|
* @param qos the value to use for the MQTT QoS setting
|
||
|
* @param retained boolean - whether to set the MQTT retained flag
|
||
|
* @param socket the open socket to send the data to
|
||
|
* @param clientID the string client identifier, only used for tracing
|
||
|
* @return the completion code (e.g. TCPSOCKET_COMPLETE)
|
||
|
*/
|
||
|
int MQTTPacket_send_publish(Publish* pack, int dup, int qos, int retained, networkHandles* net, const char* clientID)
|
||
|
{
|
||
|
Header header;
|
||
|
char *topiclen;
|
||
|
int rc = SOCKET_ERROR;
|
||
|
|
||
|
FUNC_ENTRY;
|
||
|
topiclen = malloc(2);
|
||
|
if (topiclen == NULL)
|
||
|
goto exit;
|
||
|
|
||
|
header.bits.type = PUBLISH;
|
||
|
header.bits.dup = dup;
|
||
|
header.bits.qos = qos;
|
||
|
header.bits.retain = retained;
|
||
|
if (qos > 0 || pack->MQTTVersion >= 5)
|
||
|
{
|
||
|
int buflen = ((qos > 0) ? 2 : 0) + ((pack->MQTTVersion >= 5) ? MQTTProperties_len(&pack->properties) : 0);
|
||
|
char *ptr = NULL;
|
||
|
char* bufs[4] = {topiclen, pack->topic, NULL, pack->payload};
|
||
|
size_t lens[4] = {2, strlen(pack->topic), buflen, pack->payloadlen};
|
||
|
int frees[4] = {1, 0, 1, 0};
|
||
|
PacketBuffers packetbufs = {4, bufs, lens, frees, {pack->mask[0], pack->mask[1], pack->mask[2], pack->mask[3]}};
|
||
|
|
||
|
bufs[2] = ptr = malloc(buflen);
|
||
|
if (ptr == NULL)
|
||
|
goto exit_free;
|
||
|
if (qos > 0)
|
||
|
writeInt(&ptr, pack->msgId);
|
||
|
if (pack->MQTTVersion >= 5)
|
||
|
MQTTProperties_write(&ptr, &pack->properties);
|
||
|
|
||
|
ptr = topiclen;
|
||
|
writeInt(&ptr, (int)lens[1]);
|
||
|
rc = MQTTPacket_sends(net, header, &packetbufs, pack->MQTTVersion);
|
||
|
if (rc != TCPSOCKET_INTERRUPTED)
|
||
|
free(bufs[2]);
|
||
|
memcpy(pack->mask, packetbufs.mask, sizeof(pack->mask));
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
char* ptr = topiclen;
|
||
|
char* bufs[3] = {topiclen, pack->topic, pack->payload};
|
||
|
size_t lens[3] = {2, strlen(pack->topic), pack->payloadlen};
|
||
|
int frees[3] = {1, 0, 0};
|
||
|
PacketBuffers packetbufs = {3, bufs, lens, frees, {pack->mask[0], pack->mask[1], pack->mask[2], pack->mask[3]}};
|
||
|
|
||
|
writeInt(&ptr, (int)lens[1]);
|
||
|
rc = MQTTPacket_sends(net, header, &packetbufs, pack->MQTTVersion);
|
||
|
memcpy(pack->mask, packetbufs.mask, sizeof(pack->mask));
|
||
|
}
|
||
|
if (qos == 0)
|
||
|
Log(LOG_PROTOCOL, 27, NULL, net->socket, clientID, retained, rc, pack->payloadlen,
|
||
|
min(20, pack->payloadlen), pack->payload);
|
||
|
else
|
||
|
Log(LOG_PROTOCOL, 10, NULL, net->socket, clientID, pack->msgId, qos, retained, rc, pack->payloadlen,
|
||
|
min(20, pack->payloadlen), pack->payload);
|
||
|
exit_free:
|
||
|
if (rc != TCPSOCKET_INTERRUPTED)
|
||
|
free(topiclen);
|
||
|
exit:
|
||
|
FUNC_EXIT_RC(rc);
|
||
|
return rc;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Free allocated storage for a various packet tyoes
|
||
|
* @param pack pointer to the suback packet structure
|
||
|
*/
|
||
|
void MQTTPacket_free_packet(MQTTPacket* pack)
|
||
|
{
|
||
|
FUNC_ENTRY;
|
||
|
if (pack->header.bits.type == PUBLISH)
|
||
|
MQTTPacket_freePublish((Publish*)pack);
|
||
|
/*else if (pack->header.type == SUBSCRIBE)
|
||
|
MQTTPacket_freeSubscribe((Subscribe*)pack, 1);
|
||
|
else if (pack->header.type == UNSUBSCRIBE)
|
||
|
MQTTPacket_freeUnsubscribe((Unsubscribe*)pack);*/
|
||
|
else
|
||
|
free(pack);
|
||
|
FUNC_EXIT;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Writes an integer as 4 bytes to an output buffer.
|
||
|
* @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
|
||
|
* @param anInt the integer to write
|
||
|
*/
|
||
|
void writeInt4(char** pptr, unsigned int anInt)
|
||
|
{
|
||
|
**pptr = (char)(anInt / 16777216);
|
||
|
(*pptr)++;
|
||
|
anInt %= 16777216;
|
||
|
**pptr = (char)(anInt / 65536);
|
||
|
(*pptr)++;
|
||
|
anInt %= 65536;
|
||
|
**pptr = (char)(anInt / 256);
|
||
|
(*pptr)++;
|
||
|
**pptr = (char)(anInt % 256);
|
||
|
(*pptr)++;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Calculates an integer from two bytes read from the input buffer
|
||
|
* @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
|
||
|
* @return the integer value calculated
|
||
|
*/
|
||
|
unsigned int readInt4(char** pptr)
|
||
|
{
|
||
|
unsigned char* ptr = (unsigned char*)*pptr;
|
||
|
unsigned int value = 16777216*(*ptr) + 65536*(*(ptr+1)) + 256*(*(ptr+2)) + (*(ptr+3));
|
||
|
*pptr += 4;
|
||
|
return value;
|
||
|
}
|
||
|
|
||
|
|
||
|
void writeMQTTLenString(char** pptr, MQTTLenString lenstring)
|
||
|
{
|
||
|
writeInt(pptr, lenstring.len);
|
||
|
memcpy(*pptr, lenstring.data, lenstring.len);
|
||
|
*pptr += lenstring.len;
|
||
|
}
|
||
|
|
||
|
|
||
|
int MQTTLenStringRead(MQTTLenString* lenstring, char** pptr, char* enddata)
|
||
|
{
|
||
|
int len = -1;
|
||
|
|
||
|
/* the first two bytes are the length of the string */
|
||
|
if (enddata - (*pptr) > 1) /* enough length to read the integer? */
|
||
|
{
|
||
|
lenstring->len = readInt(pptr); /* increments pptr to point past length */
|
||
|
if (&(*pptr)[lenstring->len] <= enddata)
|
||
|
{
|
||
|
lenstring->data = (char*)*pptr;
|
||
|
*pptr += lenstring->len;
|
||
|
len = 2 + lenstring->len;
|
||
|
}
|
||
|
}
|
||
|
return len;
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
if (prop->value.integer4 >= 0 && prop->value.integer4 <= 127)
|
||
|
len = 1;
|
||
|
else if (prop->value.integer4 >= 128 && prop->value.integer4 <= 16383)
|
||
|
len = 2;
|
||
|
else if (prop->value.integer4 >= 16384 && prop->value.integer4 < 2097151)
|
||
|
len = 3;
|
||
|
else if (prop->value.integer4 >= 2097152 && prop->value.integer4 < 268435455)
|
||
|
len = 4;
|
||
|
*/
|
||
|
int MQTTPacket_VBIlen(int rem_len)
|
||
|
{
|
||
|
int rc = 0;
|
||
|
|
||
|
if (rem_len < 128)
|
||
|
rc = 1;
|
||
|
else if (rem_len < 16384)
|
||
|
rc = 2;
|
||
|
else if (rem_len < 2097152)
|
||
|
rc = 3;
|
||
|
else
|
||
|
rc = 4;
|
||
|
return rc;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Decodes the message length according to the MQTT algorithm
|
||
|
* @param getcharfn pointer to function to read the next character from the data source
|
||
|
* @param value the decoded length returned
|
||
|
* @return the number of bytes read from the socket
|
||
|
*/
|
||
|
int MQTTPacket_VBIdecode(int (*getcharfn)(char*, int), unsigned int* value)
|
||
|
{
|
||
|
char c;
|
||
|
int multiplier = 1;
|
||
|
int len = 0;
|
||
|
#define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
|
||
|
|
||
|
*value = 0;
|
||
|
do
|
||
|
{
|
||
|
int rc = MQTTPACKET_READ_ERROR;
|
||
|
|
||
|
if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
|
||
|
{
|
||
|
rc = MQTTPACKET_READ_ERROR; /* bad data */
|
||
|
goto exit;
|
||
|
}
|
||
|
rc = (*getcharfn)(&c, 1);
|
||
|
if (rc != 1)
|
||
|
goto exit;
|
||
|
*value += (c & 127) * multiplier;
|
||
|
multiplier *= 128;
|
||
|
} while ((c & 128) != 0);
|
||
|
exit:
|
||
|
return len;
|
||
|
}
|
||
|
|
||
|
|
||
|
static char* bufptr;
|
||
|
|
||
|
int bufchar(char* c, int count)
|
||
|
{
|
||
|
int i;
|
||
|
|
||
|
for (i = 0; i < count; ++i)
|
||
|
*c = *bufptr++;
|
||
|
return count;
|
||
|
}
|
||
|
|
||
|
|
||
|
int MQTTPacket_decodeBuf(char* buf, unsigned int* value)
|
||
|
{
|
||
|
bufptr = buf;
|
||
|
return MQTTPacket_VBIdecode(bufchar, value);
|
||
|
}
|
||
|
|