922 lines
25 KiB
C
922 lines
25 KiB
C
/*******************************************************************************
|
|
* Copyright (c) 2009, 2023 IBM Corp.
|
|
*
|
|
* All rights reserved. This program and the accompanying materials
|
|
* are made available under the terms of the Eclipse Public License v2.0
|
|
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
|
*
|
|
* The Eclipse Public License is available at
|
|
* https://www.eclipse.org/legal/epl-2.0/
|
|
* and the Eclipse Distribution License is available at
|
|
* http://www.eclipse.org/org/documents/edl-v10.php.
|
|
*
|
|
* Contributors:
|
|
* Ian Craggs - initial API and implementation and/or initial documentation
|
|
* Ian Craggs - async client updates
|
|
* Ian Craggs - fix for bug 432903 - queue persistence
|
|
* Ian Craggs - MQTT V5 updates
|
|
*******************************************************************************/
|
|
|
|
/**
|
|
* @file
|
|
* \brief Functions that apply to persistence operations.
|
|
*
|
|
*/
|
|
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
|
|
#include "MQTTPersistence.h"
|
|
#include "MQTTPersistenceDefault.h"
|
|
#include "MQTTProtocolClient.h"
|
|
#include "Heap.h"
|
|
|
|
#if defined(_WIN32) || defined(_WIN64)
|
|
#define snprintf _snprintf
|
|
#endif
|
|
|
|
|
|
static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen, int MQTTVersion);
|
|
static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size);
|
|
|
|
#include "StackTrace.h"

/**
 * Creates a ::MQTTClient_persistence structure representing a persistence implementation.
 * @param persistence receives the persistence structure to use, or NULL when no
 * persistence is selected or when an error is returned.
 * @param type the type of the persistence implementation. See ::MQTTClient_create.
 * @param pcontext the context for this persistence implementation. See ::MQTTClient_create.
 * For #MQTTCLIENT_PERSISTENCE_DEFAULT it is used as a directory name ("." when NULL);
 * for #MQTTCLIENT_PERSISTENCE_USER it is the caller's ::MQTTClient_persistence structure.
 * @return 0 if success, #PAHO_MEMORY_ERROR if an allocation fails,
 * #MQTTCLIENT_PERSISTENCE_ERROR if the type is unknown or the user supplied
 * structure is missing a context or a callback.
 */
int MQTTPersistence_create(MQTTClient_persistence** persistence, int type, void* pcontext)
{
	int rc = 0;
	MQTTClient_persistence* per = NULL;

	FUNC_ENTRY;
#if !defined(NO_PERSISTENCE)
	switch (type)
	{
		case MQTTCLIENT_PERSISTENCE_NONE:
			per = NULL;
			break;
		case MQTTCLIENT_PERSISTENCE_DEFAULT:
		{
			/* no context means "use the working directory" */
			const char* directory = (pcontext != NULL) ? (const char*)pcontext : ".";

			if ((per = malloc(sizeof(MQTTClient_persistence))) == NULL)
				rc = PAHO_MEMORY_ERROR;
			else if ((per->context = malloc(strlen(directory) + 1)) == NULL)
			{
				free(per);
				per = NULL; /* never hand a freed structure back to the caller */
				rc = PAHO_MEMORY_ERROR;
			}
			else
			{
				strcpy(per->context, directory);
				/* file system functions */
				per->popen = pstopen;
				per->pclose = pstclose;
				per->pput = pstput;
				per->pget = pstget;
				per->premove = pstremove;
				per->pkeys = pstkeys;
				per->pclear = pstclear;
				per->pcontainskey = pstcontainskey;
			}
			break;
		}
		case MQTTCLIENT_PERSISTENCE_USER:
			per = (MQTTClient_persistence*)pcontext;
			if (per == NULL || per->context == NULL || per->pclear == NULL ||
				per->pclose == NULL || per->pcontainskey == NULL || per->pget == NULL ||
				per->pkeys == NULL || per->popen == NULL || per->pput == NULL ||
				per->premove == NULL)
			{
				rc = MQTTCLIENT_PERSISTENCE_ERROR;
				per = NULL; /* an incomplete implementation must not be used later */
			}
			break;
		default:
			rc = MQTTCLIENT_PERSISTENCE_ERROR;
			break;
	}
#endif

	*persistence = per;

	FUNC_EXIT_RC(rc);
	return rc;
}
|
|
|
|
|
|
/**
|
|
* Open persistent store and restore any persisted messages.
|
|
* @param client the client as ::Clients.
|
|
* @param serverURI the URI of the remote end.
|
|
* @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
|
|
*/
|
|
int MQTTPersistence_initialize(Clients *c, const char *serverURI)
|
|
{
|
|
int rc = 0;
|
|
|
|
FUNC_ENTRY;
|
|
if ( c->persistence != NULL )
|
|
{
|
|
rc = c->persistence->popen(&(c->phandle), c->clientID, serverURI, c->persistence->context);
|
|
if ( rc == 0 )
|
|
rc = MQTTPersistence_restorePackets(c);
|
|
}
|
|
|
|
FUNC_EXIT_RC(rc);
|
|
return rc;
|
|
}
|
|
|
|
|
|
/**
|
|
* Close persistent store.
|
|
* @param client the client as ::Clients.
|
|
* @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
|
|
*/
|
|
int MQTTPersistence_close(Clients *c)
|
|
{
|
|
int rc = 0;
|
|
|
|
FUNC_ENTRY;
|
|
#if !defined(NO_PERSISTENCE)
|
|
if (c->persistence != NULL)
|
|
{
|
|
rc = c->persistence->pclose(c->phandle);
|
|
|
|
if (c->persistence->popen == pstopen) {
|
|
if (c->persistence->context)
|
|
free(c->persistence->context);
|
|
free(c->persistence);
|
|
}
|
|
|
|
c->phandle = NULL;
|
|
c->persistence = NULL;
|
|
}
|
|
#endif
|
|
FUNC_EXIT_RC(rc);
|
|
return rc;
|
|
}
|
|
|
|
/**
|
|
* Clears the persistent store.
|
|
* @param client the client as ::Clients.
|
|
* @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
|
|
*/
|
|
int MQTTPersistence_clear(Clients *c)
|
|
{
|
|
int rc = 0;
|
|
|
|
FUNC_ENTRY;
|
|
if (c->persistence != NULL)
|
|
rc = c->persistence->pclear(c->phandle);
|
|
|
|
FUNC_EXIT_RC(rc);
|
|
return rc;
|
|
}
|
|
|
|
|
|
/**
|
|
* Restores the persisted records to the outbound and inbound message queues of the
|
|
* client.
|
|
* @param client the client as ::Clients.
|
|
* @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
|
|
*/
|
|
int MQTTPersistence_restorePackets(Clients *c)
|
|
{
|
|
int rc = 0;
|
|
char **msgkeys = NULL,
|
|
*buffer = NULL;
|
|
int nkeys, buflen;
|
|
int i = 0;
|
|
int msgs_sent = 0;
|
|
int msgs_rcvd = 0;
|
|
|
|
FUNC_ENTRY;
|
|
if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
|
|
{
|
|
while (rc == 0 && i < nkeys)
|
|
{
|
|
if (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, strlen(PERSISTENCE_COMMAND_KEY)) == 0 ||
|
|
strncmp(msgkeys[i], PERSISTENCE_V5_COMMAND_KEY, strlen(PERSISTENCE_V5_COMMAND_KEY)) == 0)
|
|
{
|
|
;
|
|
}
|
|
else if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) == 0 ||
|
|
strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) == 0)
|
|
{
|
|
;
|
|
}
|
|
else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0 &&
|
|
(c->afterRead == NULL || (rc = c->afterRead(c->afterRead_context, &buffer, &buflen)) == 0))
|
|
{
|
|
int data_MQTTVersion = MQTTVERSION_3_1_1;
|
|
char* cur_key = msgkeys[i];
|
|
MQTTPacket* pack = NULL;
|
|
|
|
if (strncmp(cur_key, PERSISTENCE_V5_PUBLISH_RECEIVED,
|
|
strlen(PERSISTENCE_V5_PUBLISH_RECEIVED)) == 0)
|
|
{
|
|
data_MQTTVersion = MQTTVERSION_5;
|
|
cur_key = PERSISTENCE_PUBLISH_RECEIVED;
|
|
}
|
|
else if (strncmp(cur_key, PERSISTENCE_V5_PUBLISH_SENT,
|
|
strlen(PERSISTENCE_V5_PUBLISH_SENT)) == 0)
|
|
{
|
|
data_MQTTVersion = MQTTVERSION_5;
|
|
cur_key = PERSISTENCE_PUBLISH_SENT;
|
|
}
|
|
else if (strncmp(cur_key, PERSISTENCE_V5_PUBREL,
|
|
strlen(PERSISTENCE_V5_PUBREL)) == 0)
|
|
{
|
|
data_MQTTVersion = MQTTVERSION_5;
|
|
cur_key = PERSISTENCE_PUBREL;
|
|
}
|
|
|
|
if (data_MQTTVersion == MQTTVERSION_5 && c->MQTTVersion < MQTTVERSION_5)
|
|
{
|
|
rc = MQTTCLIENT_PERSISTENCE_ERROR; /* can't restore version 5 data with a version 3 client */
|
|
goto exit;
|
|
}
|
|
|
|
pack = MQTTPersistence_restorePacket(data_MQTTVersion, buffer, buflen);
|
|
if ( pack != NULL )
|
|
{
|
|
if (strncmp(cur_key, PERSISTENCE_PUBLISH_RECEIVED,
|
|
strlen(PERSISTENCE_PUBLISH_RECEIVED)) == 0)
|
|
{
|
|
Publish* publish = (Publish*)pack;
|
|
Messages* msg = NULL;
|
|
publish->MQTTVersion = c->MQTTVersion;
|
|
msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain, 1);
|
|
msg->nextMessageType = PUBREL;
|
|
/* order does not matter for persisted received messages */
|
|
ListAppend(c->inboundMsgs, msg, msg->len);
|
|
if (c->MQTTVersion >= MQTTVERSION_5)
|
|
{
|
|
free(msg->publish->payload);
|
|
free(msg->publish->topic);
|
|
msg->publish->payload = msg->publish->topic = NULL;
|
|
}
|
|
publish->topic = NULL;
|
|
MQTTPacket_freePublish(publish);
|
|
msgs_rcvd++;
|
|
}
|
|
else if (strncmp(cur_key, PERSISTENCE_PUBLISH_SENT,
|
|
strlen(PERSISTENCE_PUBLISH_SENT)) == 0)
|
|
{
|
|
Publish* publish = (Publish*)pack;
|
|
Messages* msg = NULL;
|
|
const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
|
|
char *key = malloc(keysize);
|
|
int chars = 0;
|
|
|
|
if (!key)
|
|
{
|
|
rc = PAHO_MEMORY_ERROR;
|
|
goto exit;
|
|
}
|
|
publish->MQTTVersion = c->MQTTVersion;
|
|
if (publish->MQTTVersion >= MQTTVERSION_5)
|
|
chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBREL, publish->msgId);
|
|
else
|
|
chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBREL, publish->msgId);
|
|
if (chars >= keysize)
|
|
{
|
|
rc = MQTTCLIENT_PERSISTENCE_ERROR;
|
|
Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
|
|
}
|
|
else
|
|
{
|
|
msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain, 1);
|
|
if (c->persistence->pcontainskey(c->phandle, key) == 0)
|
|
/* PUBLISH Qo2 and PUBREL sent */
|
|
msg->nextMessageType = PUBCOMP;
|
|
/* else: PUBLISH QoS1, or PUBLISH QoS2 and PUBREL not sent */
|
|
/* retry at the first opportunity */
|
|
memset(&msg->lastTouch, '\0', sizeof(msg->lastTouch));
|
|
MQTTPersistence_insertInOrder(c->outboundMsgs, msg, msg->len);
|
|
publish->topic = NULL;
|
|
MQTTPacket_freePublish(publish);
|
|
msgs_sent++;
|
|
}
|
|
free(key);
|
|
}
|
|
else if (strncmp(cur_key, PERSISTENCE_PUBREL, strlen(PERSISTENCE_PUBREL)) == 0)
|
|
{
|
|
/* orphaned PUBRELs ? */
|
|
Pubrel* pubrel = (Pubrel*)pack;
|
|
const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
|
|
char *key = malloc(keysize);
|
|
int chars = 0;
|
|
|
|
if (!key)
|
|
{
|
|
rc = PAHO_MEMORY_ERROR;
|
|
goto exit;
|
|
}
|
|
pubrel->MQTTVersion = c->MQTTVersion;
|
|
if (pubrel->MQTTVersion >= MQTTVERSION_5)
|
|
chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBLISH_SENT, pubrel->msgId);
|
|
else
|
|
chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBLISH_SENT, pubrel->msgId);
|
|
if (chars >= keysize)
|
|
{
|
|
rc = MQTTCLIENT_PERSISTENCE_ERROR;
|
|
Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
|
|
}
|
|
else if (c->persistence->pcontainskey(c->phandle, key) != 0)
|
|
rc = c->persistence->premove(c->phandle, msgkeys[i]);
|
|
free(pubrel);
|
|
free(key);
|
|
}
|
|
}
|
|
else /* pack == NULL -> bad persisted record */
|
|
rc = c->persistence->premove(c->phandle, msgkeys[i]);
|
|
}
|
|
if (buffer)
|
|
{
|
|
free(buffer);
|
|
buffer = NULL;
|
|
}
|
|
if (msgkeys[i])
|
|
{
|
|
free(msgkeys[i]);
|
|
msgkeys[i] = NULL;
|
|
}
|
|
i++;
|
|
}
|
|
}
|
|
Log(TRACE_MINIMUM, -1, "%d sent messages and %d received messages restored for client %s\n",
|
|
msgs_sent, msgs_rcvd, c->clientID);
|
|
MQTTPersistence_wrapMsgID(c);
|
|
exit:
|
|
if (msgkeys)
|
|
{
|
|
int i = 0;
|
|
for (i = 0; i < nkeys; ++i)
|
|
{
|
|
if (msgkeys[i])
|
|
free(msgkeys[i]);
|
|
}
|
|
free(msgkeys);
|
|
}
|
|
if (buffer)
|
|
free(buffer);
|
|
FUNC_EXIT_RC(rc);
|
|
return rc;
|
|
}
|
|
|
|
|
|
/**
|
|
* Returns a MQTT packet restored from persisted data.
|
|
* @param buffer the persisted data.
|
|
* @param buflen the number of bytes of the data buffer.
|
|
*/
|
|
void* MQTTPersistence_restorePacket(int MQTTVersion, char* buffer, size_t buflen)
|
|
{
|
|
void* pack = NULL;
|
|
Header header;
|
|
int fixed_header_length = 1, ptype, remaining_length = 0;
|
|
char c;
|
|
int multiplier = 1;
|
|
extern pf new_packets[];
|
|
|
|
FUNC_ENTRY;
|
|
header.byte = buffer[0];
|
|
/* decode the message length according to the MQTT algorithm */
|
|
do
|
|
{
|
|
c = *(++buffer);
|
|
remaining_length += (c & 127) * multiplier;
|
|
multiplier *= 128;
|
|
fixed_header_length++;
|
|
} while ((c & 128) != 0);
|
|
|
|
if ( (fixed_header_length + remaining_length) == buflen )
|
|
{
|
|
ptype = header.bits.type;
|
|
if (ptype >= CONNECT && ptype <= DISCONNECT && new_packets[ptype] != NULL)
|
|
pack = (*new_packets[ptype])(MQTTVersion, header.byte, ++buffer, remaining_length);
|
|
}
|
|
|
|
FUNC_EXIT;
|
|
return pack;
|
|
}
|
|
|
|
|
|
/**
|
|
* Inserts the specified message into the list, maintaining message ID order.
|
|
* @param list the list to insert the message into.
|
|
* @param content the message to add.
|
|
* @param size size of the message.
|
|
*/
|
|
void MQTTPersistence_insertInOrder(List* list, void* content, size_t size)
|
|
{
|
|
ListElement* index = NULL;
|
|
ListElement* current = NULL;
|
|
|
|
FUNC_ENTRY;
|
|
while(ListNextElement(list, ¤t) != NULL && index == NULL)
|
|
{
|
|
if ( ((Messages*)content)->msgid < ((Messages*)current->content)->msgid )
|
|
index = current;
|
|
}
|
|
|
|
ListInsert(list, content, size, index);
|
|
FUNC_EXIT;
|
|
}
|
|
|
|
|
|
/**
|
|
* Adds a record to the persistent store. This function must not be called for QoS0
|
|
* messages.
|
|
* @param socket the socket of the client.
|
|
* @param buf0 fixed header.
|
|
* @param buf0len length of the fixed header.
|
|
* @param count number of buffers representing the variable header and/or the payload.
|
|
* @param buffers the buffers representing the variable header and/or the payload.
|
|
* @param buflens length of the buffers representing the variable header and/or the payload.
|
|
* @param htype MQTT packet type - PUBLISH or PUBREL
|
|
* @param msgId the message ID.
|
|
* @param scr 0 indicates message in the sending direction; 1 indicates message in the
|
|
* receiving direction.
|
|
* @param the MQTT version being used (>= MQTTVERSION_5 means properties included)
|
|
* @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
|
|
*/
|
|
int MQTTPersistence_putPacket(SOCKET socket, char* buf0, size_t buf0len, int count,
|
|
char** buffers, size_t* buflens, int htype, int msgId, int scr, int MQTTVersion)
|
|
{
|
|
int rc = 0;
|
|
extern ClientStates* bstate;
|
|
int nbufs, i;
|
|
int* lens = NULL;
|
|
char** bufs = NULL;
|
|
char *key;
|
|
Clients* client = NULL;
|
|
|
|
FUNC_ENTRY;
|
|
client = (Clients*)(ListFindItem(bstate->clients, &socket, clientSocketCompare)->content);
|
|
if (client->persistence != NULL)
|
|
{
|
|
const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
|
|
if ((key = malloc(keysize)) == NULL)
|
|
{
|
|
rc = PAHO_MEMORY_ERROR;
|
|
goto exit;
|
|
}
|
|
nbufs = 1 + count;
|
|
if ((lens = (int *)malloc(nbufs * sizeof(int))) == NULL)
|
|
{
|
|
free(key);
|
|
rc = PAHO_MEMORY_ERROR;
|
|
goto exit;
|
|
}
|
|
if ((bufs = (char **)malloc(nbufs * sizeof(char *))) == NULL)
|
|
{
|
|
free(key);
|
|
free(lens);
|
|
rc = PAHO_MEMORY_ERROR;
|
|
goto exit;
|
|
}
|
|
lens[0] = (int)buf0len;
|
|
bufs[0] = buf0;
|
|
for (i = 0; i < count; i++)
|
|
{
|
|
lens[i+1] = (int)buflens[i];
|
|
bufs[i+1] = buffers[i];
|
|
}
|
|
|
|
/* key */
|
|
if (scr == 0)
|
|
{ /* sending */
|
|
char* key_id = PERSISTENCE_PUBLISH_SENT;
|
|
|
|
if (htype == PUBLISH) /* PUBLISH QoS1 and QoS2*/
|
|
{
|
|
if (MQTTVersion >= MQTTVERSION_5)
|
|
key_id = PERSISTENCE_V5_PUBLISH_SENT;
|
|
}
|
|
else if (htype == PUBREL) /* PUBREL */
|
|
{
|
|
if (MQTTVersion >= MQTTVERSION_5)
|
|
key_id = PERSISTENCE_V5_PUBREL;
|
|
else
|
|
key_id = PERSISTENCE_PUBREL;
|
|
}
|
|
if (snprintf(key, keysize, "%s%d", key_id, msgId) >= keysize)
|
|
rc = MQTTCLIENT_PERSISTENCE_ERROR;
|
|
}
|
|
else if (scr == 1) /* receiving PUBLISH QoS2 */
|
|
{
|
|
char* key_id = PERSISTENCE_PUBLISH_RECEIVED;
|
|
|
|
if (MQTTVersion >= MQTTVERSION_5)
|
|
key_id = PERSISTENCE_V5_PUBLISH_RECEIVED;
|
|
if (snprintf(key, keysize, "%s%d", key_id, msgId) >= keysize)
|
|
rc = MQTTCLIENT_PERSISTENCE_ERROR;
|
|
}
|
|
|
|
if (rc == 0 && client->beforeWrite)
|
|
rc = client->beforeWrite(client->beforeWrite_context, nbufs, bufs, lens);
|
|
|
|
if (rc == 0)
|
|
rc = client->persistence->pput(client->phandle, key, nbufs, bufs, lens);
|
|
|
|
free(key);
|
|
free(lens);
|
|
free(bufs);
|
|
}
|
|
|
|
exit:
|
|
FUNC_EXIT_RC(rc);
|
|
return rc;
|
|
}
|
|
|
|
|
|
/**
|
|
* Deletes a record from the persistent store.
|
|
* @param client the client as ::Clients.
|
|
* @param type the type of the persisted record: #PERSISTENCE_PUBLISH_SENT, #PERSISTENCE_PUBREL
|
|
* or #PERSISTENCE_PUBLISH_RECEIVED.
|
|
* @param qos the qos field of the message.
|
|
* @param msgId the message ID.
|
|
* @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise.
|
|
*/
|
|
int MQTTPersistence_remove(Clients* c, char *type, int qos, int msgId)
|
|
{
|
|
int rc = 0;
|
|
|
|
FUNC_ENTRY;
|
|
if (c->persistence != NULL)
|
|
{
|
|
const size_t keysize = PERSISTENCE_MAX_KEY_LENGTH + 1;
|
|
char *key = malloc(keysize);
|
|
int chars = 0;
|
|
|
|
if (!key)
|
|
{
|
|
rc = PAHO_MEMORY_ERROR;
|
|
goto exit;
|
|
}
|
|
if (strcmp(type, PERSISTENCE_PUBLISH_SENT) == 0 ||
|
|
strcmp(type, PERSISTENCE_V5_PUBLISH_SENT) == 0)
|
|
{
|
|
if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBLISH_SENT, msgId)) >= keysize)
|
|
rc = MQTTCLIENT_PERSISTENCE_ERROR;
|
|
else
|
|
{
|
|
rc = c->persistence->premove(c->phandle, key);
|
|
if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBREL, msgId)) >= keysize)
|
|
rc = MQTTCLIENT_PERSISTENCE_ERROR;
|
|
else
|
|
{
|
|
rc += c->persistence->premove(c->phandle, key);
|
|
if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBLISH_SENT, msgId)) >= keysize)
|
|
rc = MQTTCLIENT_PERSISTENCE_ERROR;
|
|
else
|
|
{
|
|
rc += c->persistence->premove(c->phandle, key);
|
|
if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBREL, msgId)) >= keysize)
|
|
rc = MQTTCLIENT_PERSISTENCE_ERROR;
|
|
else
|
|
rc += c->persistence->premove(c->phandle, key);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else /* PERSISTENCE_PUBLISH_SENT && qos == 1 */
|
|
{ /* or PERSISTENCE_PUBLISH_RECEIVED */
|
|
|
|
if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_V5_PUBLISH_RECEIVED, msgId)) >= keysize)
|
|
rc = MQTTCLIENT_PERSISTENCE_ERROR;
|
|
else
|
|
{
|
|
rc = c->persistence->premove(c->phandle, key);
|
|
if ((chars = snprintf(key, keysize, "%s%d", PERSISTENCE_PUBLISH_RECEIVED, msgId)) >= keysize)
|
|
rc = MQTTCLIENT_PERSISTENCE_ERROR;
|
|
else
|
|
rc += c->persistence->premove(c->phandle, key);
|
|
}
|
|
}
|
|
if (rc == MQTTCLIENT_PERSISTENCE_ERROR)
|
|
Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
|
|
free(key);
|
|
}
|
|
|
|
exit:
|
|
FUNC_EXIT_RC(rc);
|
|
return rc;
|
|
}
|
|
|
|
|
|
/**
|
|
* Checks whether the message IDs wrapped by looking for the largest gap between two consecutive
|
|
* message IDs in the outboundMsgs queue.
|
|
* @param client the client as ::Clients.
|
|
*/
|
|
void MQTTPersistence_wrapMsgID(Clients *client)
|
|
{
|
|
ListElement* wrapel = NULL;
|
|
ListElement* current = NULL;
|
|
|
|
FUNC_ENTRY;
|
|
if ( client->outboundMsgs->count > 0 )
|
|
{
|
|
int firstMsgID = ((Messages*)client->outboundMsgs->first->content)->msgid;
|
|
int lastMsgID = ((Messages*)client->outboundMsgs->last->content)->msgid;
|
|
int gap = MAX_MSG_ID - lastMsgID + firstMsgID;
|
|
current = ListNextElement(client->outboundMsgs, ¤t);
|
|
|
|
while(ListNextElement(client->outboundMsgs, ¤t) != NULL)
|
|
{
|
|
int curMsgID = ((Messages*)current->content)->msgid;
|
|
int curPrevMsgID = ((Messages*)current->prev->content)->msgid;
|
|
int curgap = curMsgID - curPrevMsgID;
|
|
if ( curgap > gap )
|
|
{
|
|
gap = curgap;
|
|
wrapel = current;
|
|
}
|
|
}
|
|
}
|
|
|
|
if ( wrapel != NULL )
|
|
{
|
|
/* put wrapel at the beginning of the queue */
|
|
client->outboundMsgs->first->prev = client->outboundMsgs->last;
|
|
client->outboundMsgs->last->next = client->outboundMsgs->first;
|
|
client->outboundMsgs->first = wrapel;
|
|
client->outboundMsgs->last = wrapel->prev;
|
|
client->outboundMsgs->first->prev = NULL;
|
|
client->outboundMsgs->last->next = NULL;
|
|
}
|
|
FUNC_EXIT;
|
|
}
|
|
|
|
|
|
#if !defined(NO_PERSISTENCE)
|
|
/**
 * Removes the record of a queue entry from the persistent store of a client.
 * The key is the queue key prefix for the client's MQTT version followed by the
 * sequence number of the entry.
 * @param client the client as ::Clients.
 * @param qe the queue entry whose record is to be removed.
 * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR if the key cannot be built,
 * otherwise the error code of the remove callback.
 */
int MQTTPersistence_unpersistQueueEntry(Clients* client, MQTTPersistence_qEntry* qe)
{
	int rc = 0;
#if defined(_WIN32) || defined(_WIN64)
#define KEYSIZE PERSISTENCE_MAX_KEY_LENGTH + 1
#else
	const size_t KEYSIZE = PERSISTENCE_MAX_KEY_LENGTH + 1;
#endif
	char key[KEYSIZE];
	int chars = 0;

	FUNC_ENTRY;
	if (client->MQTTVersion >= MQTTVERSION_5)
		chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_V5_QUEUE_KEY, qe->seqno);
	else
		chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_QUEUE_KEY, qe->seqno);

	/* a negative result also means failure: with the macro form of KEYSIZE the
	 * comparison is signed, so it has to be tested explicitly */
	if (chars < 0 || (size_t)chars >= (size_t)(KEYSIZE))
	{
		Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
		rc = MQTTCLIENT_PERSISTENCE_ERROR;
	}
	else if ((rc = client->persistence->premove(client->phandle, key)) != 0)
		Log(LOG_ERROR, 0, "Error %d removing qEntry from persistence", rc);
	FUNC_EXIT_RC(rc);
	return rc;
}
|
|
|
|
|
|
#define MAX_NO_OF_BUFFERS 9
|
|
/**
 * Writes a queue entry to the persistent store of a client.
 *
 * The record consists of the payload length, payload, qos, retained, dup,
 * message ID, topic name (with terminator) and topic length, followed by the
 * serialized properties for MQTT V5 clients. The client's queue sequence number
 * is advanced (wrapping at #PERSISTENCE_SEQNO_LIMIT) and, once the key has been
 * built, stored in the entry.
 * @param aclient the client as ::Clients.
 * @param qe the queue entry to persist.
 * @return 0 if success, #PAHO_MEMORY_ERROR if the properties buffer cannot be
 * allocated, #MQTTCLIENT_PERSISTENCE_ERROR if the key cannot be built, otherwise
 * the error code of the failing properties write, beforeWrite or put call.
 */
int MQTTPersistence_persistQueueEntry(Clients* aclient, MQTTPersistence_qEntry* qe)
{
	int rc = 0;
	int bufindex = 0;
#if !defined(_WIN32) && !defined(_WIN64)
	const size_t KEYSIZE = PERSISTENCE_MAX_KEY_LENGTH + 1;
#endif
	char key[KEYSIZE];
	int chars = 0;
	int lens[MAX_NO_OF_BUFFERS];
	void* bufs[MAX_NO_OF_BUFFERS];
	int props_allocated = 0;

	FUNC_ENTRY;
	bufs[bufindex] = &qe->msg->payloadlen;
	lens[bufindex++] = sizeof(qe->msg->payloadlen);

	bufs[bufindex] = qe->msg->payload;
	lens[bufindex++] = qe->msg->payloadlen;

	bufs[bufindex] = &qe->msg->qos;
	lens[bufindex++] = sizeof(qe->msg->qos);

	bufs[bufindex] = &qe->msg->retained;
	lens[bufindex++] = sizeof(qe->msg->retained);

	bufs[bufindex] = &qe->msg->dup;
	lens[bufindex++] = sizeof(qe->msg->dup);

	bufs[bufindex] = &qe->msg->msgid;
	lens[bufindex++] = sizeof(qe->msg->msgid);

	bufs[bufindex] = qe->topicName;
	lens[bufindex++] = (int)strlen(qe->topicName) + 1;

	bufs[bufindex] = &qe->topicLen;
	lens[bufindex++] = sizeof(qe->topicLen);

	if (++aclient->qentry_seqno == PERSISTENCE_SEQNO_LIMIT)
		aclient->qentry_seqno = 0;

	if (aclient->MQTTVersion >= MQTTVERSION_5) 		/* persist properties */
	{
		MQTTProperties no_props = MQTTProperties_initializer;
		MQTTProperties* props = &no_props;
		int temp_len = 0;
		char* ptr = NULL;

		if (qe->msg->struct_version >= 1)
			props = &qe->msg->properties;

		temp_len = MQTTProperties_len(props);
		ptr = bufs[bufindex] = malloc(temp_len);
		if (!ptr)
		{
			rc = PAHO_MEMORY_ERROR;
			goto exit;
		}
		props_allocated = bufindex;
		/* Only a negative result is treated as failure. A positive result
		 * (presumably the number of bytes written - confirm against
		 * MQTTProperties.h) must not be mistaken for an error code below. */
		if ((rc = MQTTProperties_write(&ptr, props)) >= 0)
			rc = 0;
		lens[bufindex++] = temp_len;

		chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_V5_QUEUE_KEY, aclient->qentry_seqno);
	}
	else
		chars = snprintf(key, KEYSIZE, "%s%u", PERSISTENCE_QUEUE_KEY, aclient->qentry_seqno);

	/* a negative snprintf result is tested explicitly because KEYSIZE may be a signed macro */
	if (chars < 0 || (size_t)chars >= (size_t)(KEYSIZE))
		rc = MQTTCLIENT_PERSISTENCE_ERROR;
	else if (rc == 0)
	{
		qe->seqno = aclient->qentry_seqno;

		if (aclient->beforeWrite)
			rc = aclient->beforeWrite(aclient->beforeWrite_context, bufindex, (char**)bufs, lens);

		if (rc == 0 && (rc = aclient->persistence->pput(aclient->phandle, key, bufindex, (char**)bufs, lens)) != 0)
			Log(LOG_ERROR, 0, "Error persisting queue entry, rc %d", rc);
	}
	else
		Log(LOG_ERROR, 0, "Error persisting queue entry, rc %d", rc);

	/* the properties buffer is never at index 0, so 0 means "not allocated" */
	if (props_allocated != 0)
		free(bufs[props_allocated]);

exit:
	FUNC_EXIT_RC(rc);
	return rc;
}
|
|
|
|
|
|
static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen, int MQTTVersion)
|
|
{
|
|
MQTTPersistence_qEntry* qe = NULL;
|
|
char* ptr = buffer;
|
|
int data_size;
|
|
|
|
FUNC_ENTRY;
|
|
if ((qe = malloc(sizeof(MQTTPersistence_qEntry))) == NULL)
|
|
goto exit;
|
|
memset(qe, '\0', sizeof(MQTTPersistence_qEntry));
|
|
|
|
if ((qe->msg = malloc(sizeof(MQTTPersistence_message))) == NULL)
|
|
{
|
|
free(qe);
|
|
qe = NULL;
|
|
goto exit;
|
|
}
|
|
memset(qe->msg, '\0', sizeof(MQTTPersistence_message));
|
|
|
|
qe->msg->struct_version = 1;
|
|
|
|
qe->msg->payloadlen = *(int*)ptr;
|
|
ptr += sizeof(int);
|
|
|
|
data_size = qe->msg->payloadlen;
|
|
if ((qe->msg->payload = malloc(data_size)) == NULL)
|
|
{
|
|
free(qe->msg);
|
|
free(qe);
|
|
qe = NULL;
|
|
goto exit;
|
|
}
|
|
memcpy(qe->msg->payload, ptr, data_size);
|
|
ptr += data_size;
|
|
|
|
qe->msg->qos = *(int*)ptr;
|
|
ptr += sizeof(int);
|
|
|
|
qe->msg->retained = *(int*)ptr;
|
|
ptr += sizeof(int);
|
|
|
|
qe->msg->dup = *(int*)ptr;
|
|
ptr += sizeof(int);
|
|
|
|
qe->msg->msgid = *(int*)ptr;
|
|
ptr += sizeof(int);
|
|
|
|
data_size = (int)strlen(ptr) + 1;
|
|
if ((qe->topicName = malloc(data_size)) == NULL)
|
|
{
|
|
free(qe->msg->payload);
|
|
free(qe->msg);
|
|
free(qe);
|
|
qe = NULL;
|
|
goto exit;
|
|
}
|
|
strcpy(qe->topicName, ptr);
|
|
ptr += data_size;
|
|
|
|
qe->topicLen = *(int*)ptr;
|
|
ptr += sizeof(int);
|
|
|
|
if (MQTTVersion >= MQTTVERSION_5 &&
|
|
MQTTProperties_read(&qe->msg->properties, &ptr, buffer + buflen) != 1)
|
|
Log(LOG_ERROR, -1, "Error restoring properties from persistence");
|
|
|
|
exit:
|
|
FUNC_EXIT;
|
|
return qe;
|
|
}
|
|
|
|
|
|
/**
 * Inserts a queue entry into the list, maintaining sequence number order.
 * The entry is placed in front of the first element with a greater sequence
 * number; the position passed to ListInsert is NULL when there is no such element.
 * @param list the list to insert the entry into.
 * @param qEntry the queue entry to add.
 * @param size size of the entry.
 */
static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size)
{
	ListElement* index = NULL;
	ListElement* current = NULL;

	FUNC_ENTRY;
	while (ListNextElement(list, &current) != NULL && index == NULL)
	{
		if (qEntry->seqno < ((MQTTPersistence_qEntry*)current->content)->seqno)
			index = current;
	}
	ListInsert(list, qEntry, size, index);
	FUNC_EXIT;
}
|
|
|
|
|
|
/**
|
|
* Restores a queue of messages from persistence to memory
|
|
* @param c the client as ::Clients - the client object to restore the messages to
|
|
* @return return code, 0 if successful
|
|
*/
|
|
int MQTTPersistence_restoreMessageQueue(Clients* c)
|
|
{
|
|
int rc = 0;
|
|
char **msgkeys;
|
|
int nkeys;
|
|
int i = 0;
|
|
int entries_restored = 0;
|
|
|
|
FUNC_ENTRY;
|
|
if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
|
|
{
|
|
while (rc == 0 && i < nkeys)
|
|
{
|
|
char *buffer = NULL;
|
|
int buflen;
|
|
|
|
if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) != 0 &&
|
|
strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) != 0)
|
|
{
|
|
; /* ignore if not a queue entry key */
|
|
}
|
|
else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0 &&
|
|
(c->afterRead == NULL || (rc = c->afterRead(c->afterRead_context, &buffer, &buflen)) == 0))
|
|
{
|
|
int MQTTVersion =
|
|
(strncmp(msgkeys[i], PERSISTENCE_V5_QUEUE_KEY, strlen(PERSISTENCE_V5_QUEUE_KEY)) == 0)
|
|
? MQTTVERSION_5 : MQTTVERSION_3_1_1;
|
|
MQTTPersistence_qEntry* qe = MQTTPersistence_restoreQueueEntry(buffer, buflen, MQTTVersion);
|
|
|
|
if (qe)
|
|
{
|
|
qe->seqno = atoi(strchr(msgkeys[i], '-')+1); /* key format is tag'-'seqno */
|
|
MQTTPersistence_insertInSeqOrder(c->messageQueue, qe, sizeof(MQTTPersistence_qEntry));
|
|
c->qentry_seqno = max(c->qentry_seqno, qe->seqno);
|
|
entries_restored++;
|
|
}
|
|
if (buffer)
|
|
free(buffer);
|
|
}
|
|
if (msgkeys[i])
|
|
{
|
|
free(msgkeys[i]);
|
|
}
|
|
i++;
|
|
}
|
|
if (msgkeys != NULL)
|
|
free(msgkeys);
|
|
}
|
|
Log(TRACE_MINIMUM, -1, "%d queued messages restored for client %s", entries_restored, c->clientID);
|
|
FUNC_EXIT_RC(rc);
|
|
return rc;
|
|
}
|
|
#endif
|