1/**
2 * @file
3 * MQTT client
4 *
5 * @defgroup mqtt MQTT client
6 * @ingroup apps
7 * @verbinclude mqtt_client.txt
8 */
9
10/*
11 * Copyright (c) 2016 Erik Andersson <erian747@gmail.com>
12 * All rights reserved.
13 *
14 * Redistribution and use in source and binary forms, with or without modification,
15 * are permitted provided that the following conditions are met:
16 *
17 * 1. Redistributions of source code must retain the above copyright notice,
18 *    this list of conditions and the following disclaimer.
19 * 2. Redistributions in binary form must reproduce the above copyright notice,
20 *    this list of conditions and the following disclaimer in the documentation
21 *    and/or other materials provided with the distribution.
22 * 3. The name of the author may not be used to endorse or promote products
23 *    derived from this software without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
26 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
27 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
28 * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
30 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
33 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
34 * OF SUCH DAMAGE.
35 *
36 * This file is part of the lwIP TCP/IP stack
37 *
38 * Author: Erik Andersson <erian747@gmail.com>
39 *
40 *
41 * @todo:
42 * - Handle large outgoing payloads for PUBLISH messages
43 * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics)
44 * - Add support for legacy MQTT protocol version
45 *
46 * Please coordinate changes and requests with Erik Andersson
47 * Erik Andersson <erian747@gmail.com>
48 *
49 */
50#include "lwip/apps/mqtt.h"
51#include "lwip/apps/mqtt_priv.h"
52#include "lwip/timeouts.h"
53#include "lwip/ip_addr.h"
54#include "lwip/mem.h"
55#include "lwip/err.h"
56#include "lwip/pbuf.h"
57#include "lwip/altcp.h"
58#include "lwip/altcp_tcp.h"
59#include "lwip/altcp_tls.h"
60#include <string.h>
61
62#if LWIP_TCP && LWIP_CALLBACK_API
63
64/**
65 * MQTT_DEBUG: Default is off.
66 */
67#if !defined MQTT_DEBUG || defined __DOXYGEN__
68#define MQTT_DEBUG                  LWIP_DBG_OFF
69#endif
70
71#define MQTT_DEBUG_TRACE        (MQTT_DEBUG | LWIP_DBG_TRACE)
72#define MQTT_DEBUG_STATE        (MQTT_DEBUG | LWIP_DBG_STATE)
73#define MQTT_DEBUG_WARN         (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING)
74#define MQTT_DEBUG_WARN_STATE   (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE)
75#define MQTT_DEBUG_SERIOUS      (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS)
76
77
78
79/**
80 * MQTT client connection states
81 */
82enum {
83  TCP_DISCONNECTED,
84  TCP_CONNECTING,
85  MQTT_CONNECTING,
86  MQTT_CONNECTED
87};
88
89/**
90 * MQTT control message types
91 */
92enum mqtt_message_type {
93  MQTT_MSG_TYPE_CONNECT = 1,
94  MQTT_MSG_TYPE_CONNACK = 2,
95  MQTT_MSG_TYPE_PUBLISH = 3,
96  MQTT_MSG_TYPE_PUBACK = 4,
97  MQTT_MSG_TYPE_PUBREC = 5,
98  MQTT_MSG_TYPE_PUBREL = 6,
99  MQTT_MSG_TYPE_PUBCOMP = 7,
100  MQTT_MSG_TYPE_SUBSCRIBE = 8,
101  MQTT_MSG_TYPE_SUBACK = 9,
102  MQTT_MSG_TYPE_UNSUBSCRIBE = 10,
103  MQTT_MSG_TYPE_UNSUBACK = 11,
104  MQTT_MSG_TYPE_PINGREQ = 12,
105  MQTT_MSG_TYPE_PINGRESP = 13,
106  MQTT_MSG_TYPE_DISCONNECT = 14
107};
108
109/** Helpers to extract control packet type and qos from first byte in fixed header */
110#define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0xf0) >> 4)
111#define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0x6) >> 1)
112
113/**
114 * MQTT connect flags, only used in CONNECT message
115 */
116enum mqtt_connect_flag {
117  MQTT_CONNECT_FLAG_USERNAME = 1 << 7,
118  MQTT_CONNECT_FLAG_PASSWORD = 1 << 6,
119  MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5,
120  MQTT_CONNECT_FLAG_WILL = 1 << 2,
121  MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1
122};
123
124
125static void mqtt_cyclic_timer(void *arg);
126
127#if defined(LWIP_DEBUG)
128static const char *const mqtt_message_type_str[15] = {
129  "UNDEFINED",
130  "CONNECT",
131  "CONNACK",
132  "PUBLISH",
133  "PUBACK",
134  "PUBREC",
135  "PUBREL",
136  "PUBCOMP",
137  "SUBSCRIBE",
138  "SUBACK",
139  "UNSUBSCRIBE",
140  "UNSUBACK",
141  "PINGREQ",
142  "PINGRESP",
143  "DISCONNECT"
144};
145
146/**
147 * Message type value to string
148 * @param msg_type see enum mqtt_message_type
149 *
150 * @return Control message type text string
151 */
152static const char *
153mqtt_msg_type_to_str(u8_t msg_type)
154{
155  if (msg_type >= LWIP_ARRAYSIZE(mqtt_message_type_str)) {
156    msg_type = 0;
157  }
158  return mqtt_message_type_str[msg_type];
159}
160
161#endif
162
163
164/**
165 * Generate MQTT packet identifier
166 * @param client MQTT client
167 * @return New packet identifier, range 1 to 65535
168 */
169static u16_t
170msg_generate_packet_id(mqtt_client_t *client)
171{
172  client->pkt_id_seq++;
173  if (client->pkt_id_seq == 0) {
174    client->pkt_id_seq++;
175  }
176  return client->pkt_id_seq;
177}
178
179/*--------------------------------------------------------------------------------------------------------------------- */
180/* Output ring buffer */
181
182/** Add single item to ring buffer */
183static void
184mqtt_ringbuf_put(struct mqtt_ringbuf_t *rb, u8_t item)
185{
186  rb->buf[rb->put] = item;
187  rb->put++;
188  if (rb->put >= MQTT_OUTPUT_RINGBUF_SIZE) {
189    rb->put = 0;
190  }
191}
192
193/** Return pointer to ring buffer get position */
194static u8_t *
195mqtt_ringbuf_get_ptr(struct mqtt_ringbuf_t *rb)
196{
197  return &rb->buf[rb->get];
198}
199
200static void
201mqtt_ringbuf_advance_get_idx(struct mqtt_ringbuf_t *rb, u16_t len)
202{
203  LWIP_ASSERT("mqtt_ringbuf_advance_get_idx: len < MQTT_OUTPUT_RINGBUF_SIZE", len < MQTT_OUTPUT_RINGBUF_SIZE);
204
205  rb->get += len;
206  if (rb->get >= MQTT_OUTPUT_RINGBUF_SIZE) {
207    rb->get = rb->get - MQTT_OUTPUT_RINGBUF_SIZE;
208  }
209}
210
211/** Return number of bytes in ring buffer */
212static u16_t
213mqtt_ringbuf_len(struct mqtt_ringbuf_t *rb)
214{
215  u32_t len = rb->put - rb->get;
216  if (len > 0xFFFF) {
217    len += MQTT_OUTPUT_RINGBUF_SIZE;
218  }
219  return (u16_t)len;
220}
221
222/** Return number of bytes free in ring buffer */
223#define mqtt_ringbuf_free(rb) (MQTT_OUTPUT_RINGBUF_SIZE - mqtt_ringbuf_len(rb))
224
225/** Return number of bytes possible to read without wrapping around */
226#define mqtt_ringbuf_linear_read_length(rb) LWIP_MIN(mqtt_ringbuf_len(rb), (MQTT_OUTPUT_RINGBUF_SIZE - (rb)->get))
227
228/**
229 * Try send as many bytes as possible from output ring buffer
230 * @param rb Output ring buffer
231 * @param tpcb TCP connection handle
232 */
233static void
234mqtt_output_send(struct mqtt_ringbuf_t *rb, struct altcp_pcb *tpcb)
235{
236  err_t err;
237  u8_t wrap = 0;
238  u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb);
239  u16_t send_len = altcp_sndbuf(tpcb);
240  LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL);
241
242  if (send_len == 0 || ringbuf_lin_len == 0) {
243    return;
244  }
245
246  LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n",
247                                 send_len, ringbuf_lin_len, rb->get, rb->put));
248
249  if (send_len > ringbuf_lin_len) {
250    /* Space in TCP output buffer is larger than available in ring buffer linear portion */
251    send_len = ringbuf_lin_len;
252    /* Wrap around if more data in ring buffer after linear portion */
253    wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len);
254  }
255  err = altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0));
256  if ((err == ERR_OK) && wrap) {
257    mqtt_ringbuf_advance_get_idx(rb, send_len);
258    /* Use the lesser one of ring buffer linear length and TCP send buffer size */
259    send_len = LWIP_MIN(altcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb));
260    err = altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY);
261  }
262
263  if (err == ERR_OK) {
264    mqtt_ringbuf_advance_get_idx(rb, send_len);
265    /* Flush */
266    altcp_output(tpcb);
267  } else {
268    LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err)));
269  }
270}
271
272
273
274/*--------------------------------------------------------------------------------------------------------------------- */
275/* Request queue */
276
277/**
278 * Create request item
279 * @param r_objs Pointer to request objects
280 * @param r_objs_len Number of array entries
281 * @param pkt_id Packet identifier of request
282 * @param cb Packet callback to call when requests lifetime ends
283 * @param arg Parameter following callback
284 * @return Request or NULL if failed to create
285 */
286static struct mqtt_request_t *
287mqtt_create_request(struct mqtt_request_t *r_objs, size_t r_objs_len, u16_t pkt_id, mqtt_request_cb_t cb, void *arg)
288{
289  struct mqtt_request_t *r = NULL;
290  u8_t n;
291  LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL);
292  for (n = 0; n < r_objs_len; n++) {
293    /* Item point to itself if not in use */
294    if (r_objs[n].next == &r_objs[n]) {
295      r = &r_objs[n];
296      r->next = NULL;
297      r->cb = cb;
298      r->arg = arg;
299      r->pkt_id = pkt_id;
300      break;
301    }
302  }
303  return r;
304}
305
306
307/**
308 * Append request to pending request queue
309 * @param tail Pointer to request queue tail pointer
310 * @param r Request to append
311 */
312static void
313mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r)
314{
315  struct mqtt_request_t *head = NULL;
316  s16_t time_before = 0;
317  struct mqtt_request_t *iter;
318
319  LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL);
320
321  /* Iterate trough queue to find head, and count total timeout time */
322  for (iter = *tail; iter != NULL; iter = iter->next) {
323    time_before += iter->timeout_diff;
324    head = iter;
325  }
326
327  LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT);
328  r->timeout_diff = MQTT_REQ_TIMEOUT - time_before;
329  if (head == NULL) {
330    *tail = r;
331  } else {
332    head->next = r;
333  }
334}
335
336
337/**
338 * Delete request item
339 * @param r Request item to delete
340 */
341static void
342mqtt_delete_request(struct mqtt_request_t *r)
343{
344  if (r != NULL) {
345    r->next = r;
346  }
347}
348
349/**
350 * Remove a request item with a specific packet identifier from request queue
351 * @param tail Pointer to request queue tail pointer
352 * @param pkt_id Packet identifier of request to take
353 * @return Request item if found, NULL if not
354 */
355static struct mqtt_request_t *
356mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id)
357{
358  struct mqtt_request_t *iter = NULL, *prev = NULL;
359  LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL);
360  /* Search all request for pkt_id */
361  for (iter = *tail; iter != NULL; iter = iter->next) {
362    if (iter->pkt_id == pkt_id) {
363      break;
364    }
365    prev = iter;
366  }
367
368  /* If request was found */
369  if (iter != NULL) {
370    /* unchain */
371    if (prev == NULL) {
372      *tail = iter->next;
373    } else {
374      prev->next = iter->next;
375    }
376    /* If exists, add remaining timeout time for the request to next */
377    if (iter->next != NULL) {
378      iter->next->timeout_diff += iter->timeout_diff;
379    }
380    iter->next = NULL;
381  }
382  return iter;
383}
384
385/**
386 * Handle requests timeout
387 * @param tail Pointer to request queue tail pointer
388 * @param t Time since last call in seconds
389 */
390static void
391mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t)
392{
393  struct mqtt_request_t *r;
394  LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL);
395  r = *tail;
396  while (t > 0 && r != NULL) {
397    if (t >= r->timeout_diff) {
398      t -= (u8_t)r->timeout_diff;
399      /* Unchain */
400      *tail = r->next;
401      /* Notify upper layer about timeout */
402      if (r->cb != NULL) {
403        r->cb(r->arg, ERR_TIMEOUT);
404      }
405      mqtt_delete_request(r);
406      /* Tail might be be modified in callback, so re-read it in every iteration */
407      r = *(struct mqtt_request_t *const volatile *)tail;
408    } else {
409      r->timeout_diff -= t;
410      t = 0;
411    }
412  }
413}
414
415/**
416 * Free all request items
417 * @param tail Pointer to request queue tail pointer
418 */
419static void
420mqtt_clear_requests(struct mqtt_request_t **tail)
421{
422  struct mqtt_request_t *iter, *next;
423  LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL);
424  for (iter = *tail; iter != NULL; iter = next) {
425    next = iter->next;
426    mqtt_delete_request(iter);
427  }
428  *tail = NULL;
429}
430/**
431 * Initialize all request items
432 * @param r_objs Pointer to request objects
433 * @param r_objs_len Number of array entries
434 */
435static void
436mqtt_init_requests(struct mqtt_request_t *r_objs, size_t r_objs_len)
437{
438  u8_t n;
439  LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL);
440  for (n = 0; n < r_objs_len; n++) {
441    /* Item pointing to itself indicates unused */
442    r_objs[n].next = &r_objs[n];
443  }
444}
445
446/*--------------------------------------------------------------------------------------------------------------------- */
447/* Output message build helpers */
448
449
450static void
451mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value)
452{
453  mqtt_ringbuf_put(rb, value);
454}
455
456static
457void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value)
458{
459  mqtt_ringbuf_put(rb, value >> 8);
460  mqtt_ringbuf_put(rb, value & 0xff);
461}
462
463static void
464mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length)
465{
466  u16_t n;
467  for (n = 0; n < length; n++) {
468    mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]);
469  }
470}
471
472static void
473mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length)
474{
475  u16_t n;
476  mqtt_ringbuf_put(rb, length >> 8);
477  mqtt_ringbuf_put(rb, length & 0xff);
478  for (n = 0; n < length; n++) {
479    mqtt_ringbuf_put(rb, str[n]);
480  }
481}
482
483/**
484 * Append fixed header
485 * @param rb Output ring buffer
486 * @param msg_type see enum mqtt_message_type
487 * @param fdup MQTT DUP flag
488 * @param fqos MQTT QoS field
489 * @param fretain MQTT retain flag
490 * @param r_length Remaining length after fixed header
491 */
492
493static void
494mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t fdup,
495                                u8_t fqos, u8_t fretain, u16_t r_length)
496{
497  /* Start with control byte */
498  mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((fdup & 1) << 3) | ((fqos & 3) << 1) | (fretain & 1)));
499  /* Encode remaining length field */
500  do {
501    mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0));
502    r_length >>= 7;
503  } while (r_length > 0);
504}
505
506
507/**
508 * Check output buffer space
509 * @param rb Output ring buffer
510 * @param r_length Remaining length after fixed header
511 * @return 1 if message will fit, 0 if not enough buffer space
512 */
513static u8_t
514mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length)
515{
516  /* Start with length of type byte + remaining length */
517  u16_t total_len = 1 + r_length;
518
519  LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL);
520
521  /* Calculate number of required bytes to contain the remaining bytes field and add to total*/
522  do {
523    total_len++;
524    r_length >>= 7;
525  } while (r_length > 0);
526
527  return (total_len <= mqtt_ringbuf_free(rb));
528}
529
530
531/**
532 * Close connection to server
533 * @param client MQTT client
534 * @param reason Reason for disconnection
535 */
536static void
537mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason)
538{
539  LWIP_ASSERT("mqtt_close: client != NULL", client != NULL);
540
541  /* Bring down TCP connection if not already done */
542  if (client->conn != NULL) {
543    err_t res;
544    altcp_recv(client->conn, NULL);
545    altcp_err(client->conn,  NULL);
546    altcp_sent(client->conn, NULL);
547    res = altcp_close(client->conn);
548    if (res != ERR_OK) {
549      altcp_abort(client->conn);
550      LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_close: Close err=%s\n", lwip_strerr(res)));
551    }
552    client->conn = NULL;
553  }
554
555  /* Remove all pending requests */
556  mqtt_clear_requests(&client->pend_req_queue);
557  /* Stop cyclic timer */
558  sys_untimeout(mqtt_cyclic_timer, client);
559
560  /* Notify upper layer of disconnection if changed state */
561  if (client->conn_state != TCP_DISCONNECTED) {
562
563    client->conn_state = TCP_DISCONNECTED;
564    if (client->connect_cb != NULL) {
565      client->connect_cb(client, client->connect_arg, reason);
566    }
567  }
568}
569
570
571/**
572 * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states
573 * @param arg MQTT client
574 */
575static void
576mqtt_cyclic_timer(void *arg)
577{
578  u8_t restart_timer = 1;
579  mqtt_client_t *client = (mqtt_client_t *)arg;
580  LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL);
581
582  if (client->conn_state == MQTT_CONNECTING) {
583    client->cyclic_tick++;
584    if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) {
585      LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: CONNECT attempt to server timed out\n"));
586      /* Disconnect TCP */
587      mqtt_close(client, MQTT_CONNECT_TIMEOUT);
588      restart_timer = 0;
589    }
590  } else if (client->conn_state == MQTT_CONNECTED) {
591    /* Handle timeout for pending requests */
592    mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL);
593
594    /* keep_alive > 0 means keep alive functionality shall be used */
595    if (client->keep_alive > 0) {
596
597      client->server_watchdog++;
598      /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */
599      if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive / 2)) {
600        LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Server incoming keep-alive timeout\n"));
601        mqtt_close(client, MQTT_CONNECT_TIMEOUT);
602        restart_timer = 0;
603      }
604
605      /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */
606      if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) {
607        LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: Sending keep-alive message to server\n"));
608        if (mqtt_output_check_space(&client->output, 0) != 0) {
609          mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0);
610          client->cyclic_tick = 0;
611        }
612      } else {
613        client->cyclic_tick++;
614      }
615    }
616  } else {
617    LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state));
618    restart_timer = 0;
619  }
620  if (restart_timer) {
621    sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, arg);
622  }
623}
624
625
626/**
627 * Send PUBACK, PUBREC or PUBREL response message
628 * @param client MQTT client
629 * @param msg PUBACK, PUBREC or PUBREL
630 * @param pkt_id Packet identifier
631 * @param qos QoS value
632 * @return ERR_OK if successful, ERR_MEM if out of memory
633 */
634static err_t
635pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos)
636{
637  err_t err = ERR_OK;
638  if (mqtt_output_check_space(&client->output, 2)) {
639    mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2);
640    mqtt_output_append_u16(&client->output, pkt_id);
641    mqtt_output_send(&client->output, client->conn);
642  } else {
643    LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n",
644                                   mqtt_msg_type_to_str(msg), pkt_id));
645    err = ERR_MEM;
646  }
647  return err;
648}
649
650/**
651 * Subscribe response from server
652 * @param r Matching request
653 * @param result Result code from server
654 */
655static void
656mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result)
657{
658  if (r->cb != NULL) {
659    r->cb(r->arg, result < 3 ? ERR_OK : ERR_ABRT);
660  }
661}
662
663
664/**
665 * Complete MQTT message received or buffer full
666 * @param client MQTT client
667 * @param fixed_hdr_idx header index
668 * @param length length received part
669 * @param remaining_length Remaining length of complete message
670 */
671static mqtt_connection_status_t
672mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u32_t remaining_length)
673{
674  mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED;
675
676  u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_idx;
677  size_t var_hdr_payload_bufsize = sizeof(client->rx_buffer) - fixed_hdr_idx;
678
679  /* Control packet type */
680  u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]);
681  u16_t pkt_id = 0;
682
683  LWIP_ASSERT("client->msg_idx < MQTT_VAR_HEADER_BUFFER_LEN", client->msg_idx < MQTT_VAR_HEADER_BUFFER_LEN);
684  LWIP_ASSERT("fixed_hdr_idx <= client->msg_idx", fixed_hdr_idx <= client->msg_idx);
685  LWIP_ERROR("buffer length mismatch", fixed_hdr_idx + length <= MQTT_VAR_HEADER_BUFFER_LEN,
686             return MQTT_CONNECT_DISCONNECTED);
687
688  if (pkt_type == MQTT_MSG_TYPE_CONNACK) {
689    if (client->conn_state == MQTT_CONNECTING) {
690      if (length < 2) {
691        LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short CONNACK message\n"));
692        goto out_disconnect;
693      }
694      /* Get result code from CONNACK */
695      res = (mqtt_connection_status_t)var_hdr_payload[1];
696      LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: Connect response code %d\n", res));
697      if (res == MQTT_CONNECT_ACCEPTED) {
698        /* Reset cyclic_tick when changing to connected state */
699        client->cyclic_tick = 0;
700        client->conn_state = MQTT_CONNECTED;
701        /* Notify upper layer */
702        if (client->connect_cb != 0) {
703          client->connect_cb(client, client->connect_arg, res);
704        }
705      }
706    } else {
707      LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Received CONNACK in connected state\n"));
708    }
709  } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) {
710    LWIP_DEBUGF(MQTT_DEBUG_TRACE, ( "mqtt_message_received: Received PINGRESP from server\n"));
711
712  } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) {
713    u16_t payload_offset = 0;
714    u16_t payload_length = length;
715    u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]);
716
717    if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) {
718      /* Should have topic and pkt id*/
719      u8_t *topic;
720      u16_t after_topic;
721      u8_t bkp;
722      u16_t topic_len;
723      u16_t qos_len = (qos ? 2U : 0U);
724      if (length < 2 + qos_len) {
725        LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet\n"));
726        goto out_disconnect;
727      }
728      topic_len = var_hdr_payload[0];
729      topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]);
730      if ((topic_len > length - (2 + qos_len)) ||
731          (topic_len > var_hdr_payload_bufsize - (2 + qos_len))) {
732        LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet (topic)\n"));
733        goto out_disconnect;
734      }
735
736      topic = var_hdr_payload + 2;
737      after_topic = 2 + topic_len;
738      /* Check buffer length, add one byte even for QoS 0 so that zero termination will fit */
739      if ((after_topic + (qos ? 2U : 1U)) > var_hdr_payload_bufsize) {
740        LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n"));
741        goto out_disconnect;
742      }
743
744      /* id for QoS 1 and 2 */
745      if (qos > 0) {
746        if (length < after_topic + 2U) {
747          LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet (after_topic)\n"));
748          goto out_disconnect;
749        }
750        client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1];
751        after_topic += 2;
752      } else {
753        client->inpub_pkt_id = 0;
754      }
755      /* Take backup of byte after topic */
756      bkp = topic[topic_len];
757      /* Zero terminate string */
758      topic[topic_len] = 0;
759      /* Payload data remaining in receive buffer */
760      payload_length = length - after_topic;
761      payload_offset = after_topic;
762
763      LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incomming_publish: Received message with QoS %d at topic: %s, payload length %"U32_F"\n",
764                                     qos, topic, remaining_length + payload_length));
765      if (client->pub_cb != NULL) {
766        client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length);
767      }
768      /* Restore byte after topic */
769      topic[topic_len] = bkp;
770    }
771    if (payload_length > 0 || remaining_length == 0) {
772      if (length < (size_t)(payload_offset + payload_length)) {
773        LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short packet (payload)\n"));
774        goto out_disconnect;
775      }
776      client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0);
777      /* Reply if QoS > 0 */
778      if (remaining_length == 0 && qos > 0) {
779        /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */
780        u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC;
781        LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incomming_publish: Sending publish response: %s with pkt_id: %d\n",
782                                       mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id));
783        pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0);
784      }
785    }
786  } else {
787    /* Get packet identifier */
788    pkt_id = (u16_t)var_hdr_payload[0] << 8;
789    pkt_id |= (u16_t)var_hdr_payload[1];
790    if (pkt_id == 0) {
791      LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Got message with illegal packet identifier: 0\n"));
792      goto out_disconnect;
793    }
794    if (pkt_type == MQTT_MSG_TYPE_PUBREC) {
795      LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n", pkt_id));
796      pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1);
797
798    } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) {
799      LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n", pkt_id));
800      pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0);
801
802    } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK ||
803               pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) {
804      struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id);
805      if (r != NULL) {
806        LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
807        if (pkt_type == MQTT_MSG_TYPE_SUBACK) {
808          if (length < 3) {
809            LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: To small SUBACK packet\n"));
810            goto out_disconnect;
811          } else {
812            mqtt_incomming_suback(r, var_hdr_payload[2]);
813          }
814        } else if (r->cb != NULL) {
815          r->cb(r->arg, ERR_OK);
816        }
817        mqtt_delete_request(r);
818      } else {
819        LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
820      }
821    } else {
822      LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received unknown message type: %d\n", pkt_type));
823      goto out_disconnect;
824    }
825  }
826  return res;
827out_disconnect:
828  return MQTT_CONNECT_DISCONNECTED;
829}
830
831
832/**
833 * MQTT incoming message parser
834 * @param client MQTT client
835 * @param p PBUF chain of received data
836 * @return Connection status
837 */
838static mqtt_connection_status_t
839mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
840{
841  u16_t in_offset = 0;
842  u32_t msg_rem_len = 0;
843  u8_t fixed_hdr_idx = 0;
844  u8_t b = 0;
845
846  while (p->tot_len > in_offset) {
847    /* We ALWAYS parse the header here first. Even if the header was not
848       included in this segment, we re-parse it here by buffering it in
849       client->rx_buffer. client->msg_idx keeps track of this. */
850    if ((fixed_hdr_idx < 2) || ((b & 0x80) != 0)) {
851
852      if (fixed_hdr_idx < client->msg_idx) {
853        /* parse header from old pbuf (buffered in client->rx_buffer) */
854        b = client->rx_buffer[fixed_hdr_idx];
855      } else {
856        /* parse header from this pbuf and save it in client->rx_buffer in case
857           it comes in segmented */
858        b = pbuf_get_at(p, in_offset++);
859        client->rx_buffer[client->msg_idx++] = b;
860      }
861      fixed_hdr_idx++;
862
863      if (fixed_hdr_idx >= 2) {
864        /* fixed header contains at least 2 bytes but can contain more, depending on
865           'remaining length'. All bytes but the last of this have 0x80 set to
866           indicate more bytes are coming. */
867        msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_idx - 2) * 7);
868        if ((b & 0x80) == 0) {
869          /* fixed header is done */
870          LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: Remaining length after fixed header: %"U32_F"\n", msg_rem_len));
871          if (msg_rem_len == 0) {
872            /* Complete message with no extra headers of payload received */
873            mqtt_message_received(client, fixed_hdr_idx, 0, 0);
874            client->msg_idx = 0;
875            fixed_hdr_idx = 0;
876          } else {
877            /* Bytes remaining in message (changes remaining length if this is
878               not the first segment of this message) */
879            msg_rem_len = (msg_rem_len + fixed_hdr_idx) - client->msg_idx;
880          }
881        }
882      }
883    } else {
884      /* Fixed header has been parsed, parse variable header */
885      u16_t cpy_len, cpy_start, buffer_space;
886
887      cpy_start = (client->msg_idx - fixed_hdr_idx) % (MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_idx) + fixed_hdr_idx;
888
889      /* Allow to copy the lesser one of available length in input data or bytes remaining in message */
890      cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len);
891
892      /* Limit to available space in buffer */
893      buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - cpy_start;
894      if (cpy_len > buffer_space) {
895        cpy_len = buffer_space;
896      }
897      pbuf_copy_partial(p, client->rx_buffer + cpy_start, cpy_len, in_offset);
898
899      /* Advance get and put indexes  */
900      client->msg_idx += cpy_len;
901      in_offset += cpy_len;
902      msg_rem_len -= cpy_len;
903
904      LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: msg_idx: %"U32_F", cpy_len: %"U16_F", remaining %"U32_F"\n", client->msg_idx, cpy_len, msg_rem_len));
905      if ((msg_rem_len == 0) || (cpy_len == buffer_space)) {
906        /* Whole message received or buffer is full */
907        mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_idx, (cpy_start + cpy_len) - fixed_hdr_idx, msg_rem_len);
908        if (res != MQTT_CONNECT_ACCEPTED) {
909          return res;
910        }
911        if (msg_rem_len == 0) {
912          /* Reset parser state */
913          client->msg_idx = 0;
914          /* msg_tot_len = 0; */
915          fixed_hdr_idx = 0;
916        }
917      }
918    }
919  }
920  return MQTT_CONNECT_ACCEPTED;
921}
922
923
924/**
925 * TCP received callback function. @see tcp_recv_fn
926 * @param arg MQTT client
927 * @param p PBUF chain of received data
928 * @param err Passed as return value if not ERR_OK
929 * @return ERR_OK or err passed into callback
930 */
931static err_t
932mqtt_tcp_recv_cb(void *arg, struct altcp_pcb *pcb, struct pbuf *p, err_t err)
933{
934  mqtt_client_t *client = (mqtt_client_t *)arg;
935  LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL);
936  LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb);
937
938  if (p == NULL) {
939    LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n"));
940    mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
941  } else {
942    mqtt_connection_status_t res;
943    if (err != ERR_OK) {
944      LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_recv_cb: Recv err=%d\n", err));
945      pbuf_free(p);
946      return err;
947    }
948
949    /* Tell remote that data has been received */
950    altcp_recved(pcb, p->tot_len);
951    res = mqtt_parse_incoming(client, p);
952    pbuf_free(p);
953
954    if (res != MQTT_CONNECT_ACCEPTED) {
955      mqtt_close(client, res);
956    }
957    /* If keep alive functionality is used */
958    if (client->keep_alive != 0) {
959      /* Reset server alive watchdog */
960      client->server_watchdog = 0;
961    }
962
963  }
964  return ERR_OK;
965}
966
967
968/**
969 * TCP data sent callback function. @see tcp_sent_fn
970 * @param arg MQTT client
971 * @param tpcb TCP connection handle
972 * @param len Number of bytes sent
973 * @return ERR_OK
974 */
975static err_t
976mqtt_tcp_sent_cb(void *arg, struct altcp_pcb *tpcb, u16_t len)
977{
978  mqtt_client_t *client = (mqtt_client_t *)arg;
979
980  LWIP_UNUSED_ARG(tpcb);
981  LWIP_UNUSED_ARG(len);
982
983  if (client->conn_state == MQTT_CONNECTED) {
984    struct mqtt_request_t *r;
985
986    /* Reset keep-alive send timer and server watchdog */
987    client->cyclic_tick = 0;
988    client->server_watchdog = 0;
989    /* QoS 0 publish has no response from server, so call its callbacks here */
990    while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) {
991      LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n"));
992      if (r->cb != NULL) {
993        r->cb(r->arg, ERR_OK);
994      }
995      mqtt_delete_request(r);
996    }
997    /* Try send any remaining buffers from output queue */
998    mqtt_output_send(&client->output, client->conn);
999  }
1000  return ERR_OK;
1001}
1002
1003/**
1004 * TCP error callback function. @see tcp_err_fn
1005 * @param arg MQTT client
1006 * @param err Error encountered
1007 */
1008static void
1009mqtt_tcp_err_cb(void *arg, err_t err)
1010{
1011  mqtt_client_t *client = (mqtt_client_t *)arg;
1012  LWIP_UNUSED_ARG(err); /* only used for debug output */
1013  LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg));
1014  LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL);
1015  /* Set conn to null before calling close as pcb is already deallocated*/
1016  client->conn = 0;
1017  mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
1018}
1019
1020/**
1021 * TCP poll callback function. @see tcp_poll_fn
1022 * @param arg MQTT client
1023 * @param tpcb TCP connection handle
1024 * @return err ERR_OK
1025 */
1026static err_t
1027mqtt_tcp_poll_cb(void *arg, struct altcp_pcb *tpcb)
1028{
1029  mqtt_client_t *client = (mqtt_client_t *)arg;
1030  if (client->conn_state == MQTT_CONNECTED) {
1031    /* Try send any remaining buffers from output queue */
1032    mqtt_output_send(&client->output, tpcb);
1033  }
1034  return ERR_OK;
1035}
1036
1037/**
1038 * TCP connect callback function. @see tcp_connected_fn
1039 * @param arg MQTT client
1040 * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error
1041 * @return ERR_OK
1042 */
1043static err_t
1044mqtt_tcp_connect_cb(void *arg, struct altcp_pcb *tpcb, err_t err)
1045{
1046  mqtt_client_t *client = (mqtt_client_t *)arg;
1047
1048  if (err != ERR_OK) {
1049    LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_connect_cb: TCP connect error %d\n", err));
1050    return err;
1051  }
1052
1053  /* Initiate receiver state */
1054  client->msg_idx = 0;
1055
1056  /* Setup TCP callbacks */
1057  altcp_recv(tpcb, mqtt_tcp_recv_cb);
1058  altcp_sent(tpcb, mqtt_tcp_sent_cb);
1059  altcp_poll(tpcb, mqtt_tcp_poll_cb, 2);
1060
1061  LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_connect_cb: TCP connection established to server\n"));
1062  /* Enter MQTT connect state */
1063  client->conn_state = MQTT_CONNECTING;
1064
1065  /* Start cyclic timer */
1066  sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, client);
1067  client->cyclic_tick = 0;
1068
1069  /* Start transmission from output queue, connect message is the first one out*/
1070  mqtt_output_send(&client->output, client->conn);
1071
1072  return ERR_OK;
1073}
1074
1075
1076
1077/*---------------------------------------------------------------------------------------------------- */
1078/* Public API */
1079
1080
1081/**
1082 * @ingroup mqtt
1083 * MQTT publish function.
1084 * @param client MQTT client
1085 * @param topic Publish topic string
1086 * @param payload Data to publish (NULL is allowed)
1087 * @param payload_length Length of payload (0 is allowed)
1088 * @param qos Quality of service, 0 1 or 2
1089 * @param retain MQTT retain flag
1090 * @param cb Callback to call when publish is complete or has timed out
1091 * @param arg User supplied argument to publish callback
1092 * @return ERR_OK if successful
1093 *         ERR_CONN if client is disconnected
1094 *         ERR_MEM if short on memory
1095 */
1096err_t
1097mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain,
1098             mqtt_request_cb_t cb, void *arg)
1099{
1100  struct mqtt_request_t *r;
1101  u16_t pkt_id;
1102  size_t topic_strlen;
1103  size_t total_len;
1104  u16_t topic_len;
1105  u16_t remaining_length;
1106
1107  LWIP_ASSERT_CORE_LOCKED();
1108  LWIP_ASSERT("mqtt_publish: client != NULL", client);
1109  LWIP_ASSERT("mqtt_publish: topic != NULL", topic);
1110  LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN);
1111
1112  topic_strlen = strlen(topic);
1113  LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
1114  topic_len = (u16_t)topic_strlen;
1115  total_len = 2 + topic_len + payload_length;
1116
1117  if (qos > 0) {
1118    total_len += 2;
1119    /* Generate pkt_id id for QoS1 and 2 */
1120    pkt_id = msg_generate_packet_id(client);
1121  } else {
1122    /* Use reserved value pkt_id 0 for QoS 0 in request handle */
1123    pkt_id = 0;
1124  }
1125  LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
1126  remaining_length = (u16_t)total_len;
1127
1128  LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic));
1129
1130  r = mqtt_create_request(client->req_list, LWIP_ARRAYSIZE(client->req_list), pkt_id, cb, arg);
1131  if (r == NULL) {
1132    return ERR_MEM;
1133  }
1134
1135  if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1136    mqtt_delete_request(r);
1137    return ERR_MEM;
1138  }
1139  /* Append fixed header */
1140  mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length);
1141
1142  /* Append Topic */
1143  mqtt_output_append_string(&client->output, topic, topic_len);
1144
1145  /* Append packet if for QoS 1 and 2*/
1146  if (qos > 0) {
1147    mqtt_output_append_u16(&client->output, pkt_id);
1148  }
1149
1150  /* Append optional publish payload */
1151  if ((payload != NULL) && (payload_length > 0)) {
1152    mqtt_output_append_buf(&client->output, payload, payload_length);
1153  }
1154
1155  mqtt_append_request(&client->pend_req_queue, r);
1156  mqtt_output_send(&client->output, client->conn);
1157  return ERR_OK;
1158}
1159
1160
1161/**
1162 * @ingroup mqtt
1163 * MQTT subscribe/unsubscribe function.
1164 * @param client MQTT client
1165 * @param topic topic to subscribe to
1166 * @param qos Quality of service, 0 1 or 2 (only used for subscribe)
1167 * @param cb Callback to call when subscribe/unsubscribe reponse is received
1168 * @param arg User supplied argument to publish callback
1169 * @param sub 1 for subscribe, 0 for unsubscribe
1170 * @return ERR_OK if successful, @see err_t enum for other results
1171 */
1172err_t
1173mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub)
1174{
1175  size_t topic_strlen;
1176  size_t total_len;
1177  u16_t topic_len;
1178  u16_t remaining_length;
1179  u16_t pkt_id;
1180  struct mqtt_request_t *r;
1181
1182  LWIP_ASSERT_CORE_LOCKED();
1183  LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client);
1184  LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic);
1185
1186  topic_strlen = strlen(topic);
1187  LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
1188  topic_len = (u16_t)topic_strlen;
1189  /* Topic string, pkt_id, qos for subscribe */
1190  total_len =  topic_len + 2 + 2 + (sub != 0);
1191  LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
1192  remaining_length = (u16_t)total_len;
1193
1194  LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3);
1195  if (client->conn_state == TCP_DISCONNECTED) {
1196    LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n"));
1197    return ERR_CONN;
1198  }
1199
1200  pkt_id = msg_generate_packet_id(client);
1201  r = mqtt_create_request(client->req_list, LWIP_ARRAYSIZE(client->req_list), pkt_id, cb, arg);
1202  if (r == NULL) {
1203    return ERR_MEM;
1204  }
1205
1206  if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1207    mqtt_delete_request(r);
1208    return ERR_MEM;
1209  }
1210
1211  LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id));
1212
1213  mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length);
1214  /* Packet id */
1215  mqtt_output_append_u16(&client->output, pkt_id);
1216  /* Topic */
1217  mqtt_output_append_string(&client->output, topic, topic_len);
1218  /* QoS */
1219  if (sub != 0) {
1220    mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2));
1221  }
1222
1223  mqtt_append_request(&client->pend_req_queue, r);
1224  mqtt_output_send(&client->output, client->conn);
1225  return ERR_OK;
1226}
1227
1228
1229/**
1230 * @ingroup mqtt
1231 * Set callback to handle incoming publish requests from server
1232 * @param client MQTT client
1233 * @param pub_cb Callback invoked when publish starts, contain topic and total length of payload
1234 * @param data_cb Callback for each fragment of payload that arrives
1235 * @param arg User supplied argument to both callbacks
1236 */
1237void
1238mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb,
1239                        mqtt_incoming_data_cb_t data_cb, void *arg)
1240{
1241  LWIP_ASSERT_CORE_LOCKED();
1242  LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL);
1243  client->data_cb = data_cb;
1244  client->pub_cb = pub_cb;
1245  client->inpub_arg = arg;
1246}
1247
1248/**
1249 * @ingroup mqtt
1250 * Create a new MQTT client instance
1251 * @return Pointer to instance on success, NULL otherwise
1252 */
1253mqtt_client_t *
1254mqtt_client_new(void)
1255{
1256  LWIP_ASSERT_CORE_LOCKED();
1257  return (mqtt_client_t *)mem_calloc(1, sizeof(mqtt_client_t));
1258}
1259
1260/**
1261 * @ingroup mqtt
1262 * Free MQTT client instance
1263 * @param client Pointer to instance to be freed
1264 */
1265void
1266mqtt_client_free(mqtt_client_t *client)
1267{
1268  mem_free(client);
1269}
1270
1271/**
1272 * @ingroup mqtt
1273 * Connect to MQTT server
1274 * @param client MQTT client
1275 * @param ip_addr Server IP
1276 * @param port Server port
1277 * @param cb Connection state change callback
1278 * @param arg User supplied argument to connection callback
1279 * @param client_info Client identification and connection options
1280 * @return ERR_OK if successful, @see err_t enum for other results
1281 */
1282err_t
1283mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg,
1284                    const struct mqtt_connect_client_info_t *client_info)
1285{
1286  err_t err;
1287  size_t len;
1288  u16_t client_id_length;
1289  /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */
1290  u16_t remaining_length = 2 + 4 + 1 + 1 + 2;
1291  u8_t flags = 0, will_topic_len = 0, will_msg_len = 0;
1292  u16_t client_user_len = 0, client_pass_len = 0;
1293
1294  LWIP_ASSERT_CORE_LOCKED();
1295  LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL);
1296  LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL);
1297  LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL);
1298  LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL);
1299
1300  if (client->conn_state != TCP_DISCONNECTED) {
1301    LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Already connected\n"));
1302    return ERR_ISCONN;
1303  }
1304
1305  /* Wipe clean */
1306  memset(client, 0, sizeof(mqtt_client_t));
1307  client->connect_arg = arg;
1308  client->connect_cb = cb;
1309  client->keep_alive = client_info->keep_alive;
1310  mqtt_init_requests(client->req_list, LWIP_ARRAYSIZE(client->req_list));
1311
1312  /* Build connect message */
1313  if (client_info->will_topic != NULL && client_info->will_msg != NULL) {
1314    flags |= MQTT_CONNECT_FLAG_WILL;
1315    flags |= (client_info->will_qos & 3) << 3;
1316    if (client_info->will_retain) {
1317      flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;
1318    }
1319    len = strlen(client_info->will_topic);
1320    LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL);
1321    LWIP_ERROR("mqtt_client_connect: client_info->will_topic length must be > 0", len > 0, return ERR_VAL);
1322    will_topic_len = (u8_t)len;
1323    len = strlen(client_info->will_msg);
1324    LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL);
1325    will_msg_len = (u8_t)len;
1326    len = remaining_length + 2 + will_topic_len + 2 + will_msg_len;
1327    LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1328    remaining_length = (u16_t)len;
1329  }
1330  if (client_info->client_user != NULL) {
1331    flags |= MQTT_CONNECT_FLAG_USERNAME;
1332    len = strlen(client_info->client_user);
1333    LWIP_ERROR("mqtt_client_connect: client_info->client_user length overflow", len <= 0xFFFF, return ERR_VAL);
1334    LWIP_ERROR("mqtt_client_connect: client_info->client_user length must be > 0", len > 0, return ERR_VAL);
1335    client_user_len = (u16_t)len;
1336    len = remaining_length + 2 + client_user_len;
1337    LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1338    remaining_length = (u16_t)len;
1339  }
1340  if (client_info->client_pass != NULL) {
1341    flags |= MQTT_CONNECT_FLAG_PASSWORD;
1342    len = strlen(client_info->client_pass);
1343    LWIP_ERROR("mqtt_client_connect: client_info->client_pass length overflow", len <= 0xFFFF, return ERR_VAL);
1344    LWIP_ERROR("mqtt_client_connect: client_info->client_pass length must be > 0", len > 0, return ERR_VAL);
1345    client_pass_len = (u16_t)len;
1346    len = remaining_length + 2 + client_pass_len;
1347    LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1348    remaining_length = (u16_t)len;
1349  }
1350
1351  /* Don't complicate things, always connect using clean session */
1352  flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
1353
1354  len = strlen(client_info->client_id);
1355  LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL);
1356  client_id_length = (u16_t)len;
1357  len = remaining_length + 2 + client_id_length;
1358  LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1359  remaining_length = (u16_t)len;
1360
1361  if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1362    return ERR_MEM;
1363  }
1364
1365#if LWIP_ALTCP && LWIP_ALTCP_TLS
1366  if (client_info->tls_config) {
1367    client->conn = altcp_tls_new(client_info->tls_config, IP_GET_TYPE(ip_addr));
1368  } else
1369#endif
1370  {
1371    client->conn = altcp_tcp_new_ip_type(IP_GET_TYPE(ip_addr));
1372  }
1373  if (client->conn == NULL) {
1374    return ERR_MEM;
1375  }
1376
1377  /* Set arg pointer for callbacks */
1378  altcp_arg(client->conn, client);
1379  /* Any local address, pick random local port number */
1380  err = altcp_bind(client->conn, IP_ADDR_ANY, 0);
1381  if (err != ERR_OK) {
1382    LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Error binding to local ip/port, %d\n", err));
1383    goto tcp_fail;
1384  }
1385  LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port));
1386
1387  /* Connect to server */
1388  err = altcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb);
1389  if (err != ERR_OK) {
1390    LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err));
1391    goto tcp_fail;
1392  }
1393  /* Set error callback */
1394  altcp_err(client->conn, mqtt_tcp_err_cb);
1395  client->conn_state = TCP_CONNECTING;
1396
1397  /* Append fixed header */
1398  mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length);
1399  /* Append Protocol string */
1400  mqtt_output_append_string(&client->output, "MQTT", 4);
1401  /* Append Protocol level */
1402  mqtt_output_append_u8(&client->output, 4);
1403  /* Append connect flags */
1404  mqtt_output_append_u8(&client->output, flags);
1405  /* Append keep-alive */
1406  mqtt_output_append_u16(&client->output, client_info->keep_alive);
1407  /* Append client id */
1408  mqtt_output_append_string(&client->output, client_info->client_id, client_id_length);
1409  /* Append will message if used */
1410  if ((flags & MQTT_CONNECT_FLAG_WILL) != 0) {
1411    mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len);
1412    mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len);
1413  }
1414  /* Append user name if given */
1415  if ((flags & MQTT_CONNECT_FLAG_USERNAME) != 0) {
1416    mqtt_output_append_string(&client->output, client_info->client_user, client_user_len);
1417  }
1418  /* Append password if given */
1419  if ((flags & MQTT_CONNECT_FLAG_PASSWORD) != 0) {
1420    mqtt_output_append_string(&client->output, client_info->client_pass, client_pass_len);
1421  }
1422  return ERR_OK;
1423
1424tcp_fail:
1425  altcp_abort(client->conn);
1426  client->conn = NULL;
1427  return err;
1428}
1429
1430
1431/**
1432 * @ingroup mqtt
1433 * Disconnect from MQTT server
1434 * @param client MQTT client
1435 */
1436void
1437mqtt_disconnect(mqtt_client_t *client)
1438{
1439  LWIP_ASSERT_CORE_LOCKED();
1440  LWIP_ASSERT("mqtt_disconnect: client != NULL", client);
1441  /* If connection in not already closed */
1442  if (client->conn_state != TCP_DISCONNECTED) {
1443    /* Set conn_state before calling mqtt_close to prevent callback from being called */
1444    client->conn_state = TCP_DISCONNECTED;
1445    mqtt_close(client, (mqtt_connection_status_t)0);
1446  }
1447}
1448
1449/**
1450 * @ingroup mqtt
1451 * Check connection with server
1452 * @param client MQTT client
1453 * @return 1 if connected to server, 0 otherwise
1454 */
1455u8_t
1456mqtt_client_is_connected(mqtt_client_t *client)
1457{
1458  LWIP_ASSERT_CORE_LOCKED();
1459  LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client);
1460  return client->conn_state == MQTT_CONNECTED;
1461}
1462
1463#endif /* LWIP_TCP && LWIP_CALLBACK_API */
1464