1/** 2 * @file 3 * MQTT client 4 * 5 * @defgroup mqtt MQTT client 6 * @ingroup apps 7 * @verbinclude mqtt_client.txt 8 */ 9 10/* 11 * Copyright (c) 2016 Erik Andersson <erian747@gmail.com> 12 * All rights reserved. 13 * 14 * Redistribution and use in source and binary forms, with or without modification, 15 * are permitted provided that the following conditions are met: 16 * 17 * 1. Redistributions of source code must retain the above copyright notice, 18 * this list of conditions and the following disclaimer. 19 * 2. Redistributions in binary form must reproduce the above copyright notice, 20 * this list of conditions and the following disclaimer in the documentation 21 * and/or other materials provided with the distribution. 22 * 3. The name of the author may not be used to endorse or promote products 23 * derived from this software without specific prior written permission. 24 * 25 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 26 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 27 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT 28 * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 29 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT 30 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING 33 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY 34 * OF SUCH DAMAGE. 35 * 36 * This file is part of the lwIP TCP/IP stack 37 * 38 * Author: Erik Andersson <erian747@gmail.com> 39 * 40 * 41 * @todo: 42 * - Handle large outgoing payloads for PUBLISH messages 43 * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics) 44 * - Add support for legacy MQTT protocol version 45 * 46 * Please coordinate changes and requests with Erik Andersson 47 * Erik Andersson <erian747@gmail.com> 48 * 49 */ 50#include "lwip/apps/mqtt.h" 51#include "lwip/apps/mqtt_priv.h" 52#include "lwip/timeouts.h" 53#include "lwip/ip_addr.h" 54#include "lwip/mem.h" 55#include "lwip/err.h" 56#include "lwip/pbuf.h" 57#include "lwip/altcp.h" 58#include "lwip/altcp_tcp.h" 59#include "lwip/altcp_tls.h" 60#include <string.h> 61 62#if LWIP_TCP && LWIP_CALLBACK_API 63 64/** 65 * MQTT_DEBUG: Default is off. 66 */ 67#if !defined MQTT_DEBUG || defined __DOXYGEN__ 68#define MQTT_DEBUG LWIP_DBG_OFF 69#endif 70 71#define MQTT_DEBUG_TRACE (MQTT_DEBUG | LWIP_DBG_TRACE) 72#define MQTT_DEBUG_STATE (MQTT_DEBUG | LWIP_DBG_STATE) 73#define MQTT_DEBUG_WARN (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING) 74#define MQTT_DEBUG_WARN_STATE (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE) 75#define MQTT_DEBUG_SERIOUS (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS) 76 77 78 79/** 80 * MQTT client connection states 81 */ 82enum { 83 TCP_DISCONNECTED, 84 TCP_CONNECTING, 85 MQTT_CONNECTING, 86 MQTT_CONNECTED 87}; 88 89/** 90 * MQTT control message types 91 */ 92enum mqtt_message_type { 93 MQTT_MSG_TYPE_CONNECT = 1, 94 MQTT_MSG_TYPE_CONNACK = 2, 95 MQTT_MSG_TYPE_PUBLISH = 3, 96 MQTT_MSG_TYPE_PUBACK = 4, 97 MQTT_MSG_TYPE_PUBREC = 5, 98 MQTT_MSG_TYPE_PUBREL = 6, 99 MQTT_MSG_TYPE_PUBCOMP = 7, 100 MQTT_MSG_TYPE_SUBSCRIBE = 8, 101 MQTT_MSG_TYPE_SUBACK = 9, 102 MQTT_MSG_TYPE_UNSUBSCRIBE = 10, 103 MQTT_MSG_TYPE_UNSUBACK = 11, 104 MQTT_MSG_TYPE_PINGREQ = 12, 105 MQTT_MSG_TYPE_PINGRESP = 13, 106 MQTT_MSG_TYPE_DISCONNECT = 14 107}; 108 109/** Helpers to extract control packet type and qos from first byte in fixed header */ 110#define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0xf0) >> 4) 111#define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0x6) >> 1) 112 113/** 114 * MQTT connect flags, only used in CONNECT message 115 */ 116enum mqtt_connect_flag { 117 MQTT_CONNECT_FLAG_USERNAME = 1 << 7, 118 MQTT_CONNECT_FLAG_PASSWORD = 1 << 6, 119 MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5, 120 MQTT_CONNECT_FLAG_WILL = 1 << 2, 121 MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1 122}; 123 124 125static void mqtt_cyclic_timer(void *arg); 126 127#if defined(LWIP_DEBUG) 128static const char *const mqtt_message_type_str[15] = { 129 "UNDEFINED", 130 "CONNECT", 131 "CONNACK", 132 "PUBLISH", 133 "PUBACK", 134 "PUBREC", 135 "PUBREL", 136 "PUBCOMP", 137 "SUBSCRIBE", 138 "SUBACK", 139 "UNSUBSCRIBE", 140 "UNSUBACK", 141 "PINGREQ", 142 "PINGRESP", 143 "DISCONNECT" 144}; 145 146/** 147 * Message type value to string 148 * @param msg_type see enum mqtt_message_type 149 * 150 * @return Control message type text string 151 */ 152static const char * 153mqtt_msg_type_to_str(u8_t msg_type) 154{ 155 if (msg_type >= LWIP_ARRAYSIZE(mqtt_message_type_str)) { 156 msg_type = 0; 157 } 158 return mqtt_message_type_str[msg_type]; 159} 160 161#endif 162 163 164/** 165 * Generate MQTT packet identifier 166 * @param client MQTT client 167 * @return New packet identifier, range 1 to 65535 168 */ 169static u16_t 170msg_generate_packet_id(mqtt_client_t *client) 171{ 172 client->pkt_id_seq++; 173 if (client->pkt_id_seq == 0) { 174 client->pkt_id_seq++; 175 } 176 return client->pkt_id_seq; 177} 178 179/*--------------------------------------------------------------------------------------------------------------------- */ 180/* Output ring buffer */ 181 182/** Add single item to ring buffer */ 183static void 184mqtt_ringbuf_put(struct mqtt_ringbuf_t *rb, u8_t item) 185{ 186 rb->buf[rb->put] = item; 187 rb->put++; 188 if (rb->put >= MQTT_OUTPUT_RINGBUF_SIZE) { 189 rb->put = 0; 190 } 191} 192 193/** Return pointer to ring buffer get position */ 194static u8_t * 195mqtt_ringbuf_get_ptr(struct mqtt_ringbuf_t *rb) 196{ 197 return &rb->buf[rb->get]; 198} 199 200static void 201mqtt_ringbuf_advance_get_idx(struct mqtt_ringbuf_t *rb, u16_t len) 202{ 203 LWIP_ASSERT("mqtt_ringbuf_advance_get_idx: len < MQTT_OUTPUT_RINGBUF_SIZE", len < MQTT_OUTPUT_RINGBUF_SIZE); 204 205 rb->get += len; 206 if (rb->get >= MQTT_OUTPUT_RINGBUF_SIZE) { 207 rb->get = rb->get - MQTT_OUTPUT_RINGBUF_SIZE; 208 } 209} 210 211/** Return number of bytes in ring buffer */ 212static u16_t 213mqtt_ringbuf_len(struct mqtt_ringbuf_t *rb) 214{ 215 u32_t len = rb->put - rb->get; 216 if (len > 0xFFFF) { 217 len += MQTT_OUTPUT_RINGBUF_SIZE; 218 } 219 return (u16_t)len; 220} 221 222/** Return number of bytes free in ring buffer */ 223#define mqtt_ringbuf_free(rb) (MQTT_OUTPUT_RINGBUF_SIZE - mqtt_ringbuf_len(rb)) 224 225/** Return number of bytes possible to read without wrapping around */ 226#define mqtt_ringbuf_linear_read_length(rb) LWIP_MIN(mqtt_ringbuf_len(rb), (MQTT_OUTPUT_RINGBUF_SIZE - (rb)->get)) 227 228/** 229 * Try send as many bytes as possible from output ring buffer 230 * @param rb Output ring buffer 231 * @param tpcb TCP connection handle 232 */ 233static void 234mqtt_output_send(struct mqtt_ringbuf_t *rb, struct altcp_pcb *tpcb) 235{ 236 err_t err; 237 u8_t wrap = 0; 238 u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb); 239 u16_t send_len = altcp_sndbuf(tpcb); 240 LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL); 241 242 if (send_len == 0 || ringbuf_lin_len == 0) { 243 return; 244 } 245 246 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n", 247 send_len, ringbuf_lin_len, rb->get, rb->put)); 248 249 if (send_len > ringbuf_lin_len) { 250 /* Space in TCP output buffer is larger than available in ring buffer linear portion */ 251 send_len = ringbuf_lin_len; 252 /* Wrap around if more data in ring buffer after linear portion */ 253 wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len); 254 } 255 err = altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0)); 256 if ((err == ERR_OK) && wrap) { 257 mqtt_ringbuf_advance_get_idx(rb, send_len); 258 /* Use the lesser one of ring buffer linear length and TCP send buffer size */ 259 send_len = LWIP_MIN(altcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb)); 260 err = altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY); 261 } 262 263 if (err == ERR_OK) { 264 mqtt_ringbuf_advance_get_idx(rb, send_len); 265 /* Flush */ 266 altcp_output(tpcb); 267 } else { 268 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err))); 269 } 270} 271 272 273 274/*--------------------------------------------------------------------------------------------------------------------- */ 275/* Request queue */ 276 277/** 278 * Create request item 279 * @param r_objs Pointer to request objects 280 * @param r_objs_len Number of array entries 281 * @param pkt_id Packet identifier of request 282 * @param cb Packet callback to call when requests lifetime ends 283 * @param arg Parameter following callback 284 * @return Request or NULL if failed to create 285 */ 286static struct mqtt_request_t * 287mqtt_create_request(struct mqtt_request_t *r_objs, size_t r_objs_len, u16_t pkt_id, mqtt_request_cb_t cb, void *arg) 288{ 289 struct mqtt_request_t *r = NULL; 290 u8_t n; 291 LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL); 292 for (n = 0; n < r_objs_len; n++) { 293 /* Item point to itself if not in use */ 294 if (r_objs[n].next == &r_objs[n]) { 295 r = &r_objs[n]; 296 r->next = NULL; 297 r->cb = cb; 298 r->arg = arg; 299 r->pkt_id = pkt_id; 300 break; 301 } 302 } 303 return r; 304} 305 306 307/** 308 * Append request to pending request queue 309 * @param tail Pointer to request queue tail pointer 310 * @param r Request to append 311 */ 312static void 313mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r) 314{ 315 struct mqtt_request_t *head = NULL; 316 s16_t time_before = 0; 317 struct mqtt_request_t *iter; 318 319 LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL); 320 321 /* Iterate trough queue to find head, and count total timeout time */ 322 for (iter = *tail; iter != NULL; iter = iter->next) { 323 time_before += iter->timeout_diff; 324 head = iter; 325 } 326 327 LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT); 328 r->timeout_diff = MQTT_REQ_TIMEOUT - time_before; 329 if (head == NULL) { 330 *tail = r; 331 } else { 332 head->next = r; 333 } 334} 335 336 337/** 338 * Delete request item 339 * @param r Request item to delete 340 */ 341static void 342mqtt_delete_request(struct mqtt_request_t *r) 343{ 344 if (r != NULL) { 345 r->next = r; 346 } 347} 348 349/** 350 * Remove a request item with a specific packet identifier from request queue 351 * @param tail Pointer to request queue tail pointer 352 * @param pkt_id Packet identifier of request to take 353 * @return Request item if found, NULL if not 354 */ 355static struct mqtt_request_t * 356mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id) 357{ 358 struct mqtt_request_t *iter = NULL, *prev = NULL; 359 LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL); 360 /* Search all request for pkt_id */ 361 for (iter = *tail; iter != NULL; iter = iter->next) { 362 if (iter->pkt_id == pkt_id) { 363 break; 364 } 365 prev = iter; 366 } 367 368 /* If request was found */ 369 if (iter != NULL) { 370 /* unchain */ 371 if (prev == NULL) { 372 *tail = iter->next; 373 } else { 374 prev->next = iter->next; 375 } 376 /* If exists, add remaining timeout time for the request to next */ 377 if (iter->next != NULL) { 378 iter->next->timeout_diff += iter->timeout_diff; 379 } 380 iter->next = NULL; 381 } 382 return iter; 383} 384 385/** 386 * Handle requests timeout 387 * @param tail Pointer to request queue tail pointer 388 * @param t Time since last call in seconds 389 */ 390static void 391mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t) 392{ 393 struct mqtt_request_t *r; 394 LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL); 395 r = *tail; 396 while (t > 0 && r != NULL) { 397 if (t >= r->timeout_diff) { 398 t -= (u8_t)r->timeout_diff; 399 /* Unchain */ 400 *tail = r->next; 401 /* Notify upper layer about timeout */ 402 if (r->cb != NULL) { 403 r->cb(r->arg, ERR_TIMEOUT); 404 } 405 mqtt_delete_request(r); 406 /* Tail might be be modified in callback, so re-read it in every iteration */ 407 r = *(struct mqtt_request_t *const volatile *)tail; 408 } else { 409 r->timeout_diff -= t; 410 t = 0; 411 } 412 } 413} 414 415/** 416 * Free all request items 417 * @param tail Pointer to request queue tail pointer 418 */ 419static void 420mqtt_clear_requests(struct mqtt_request_t **tail) 421{ 422 struct mqtt_request_t *iter, *next; 423 LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL); 424 for (iter = *tail; iter != NULL; iter = next) { 425 next = iter->next; 426 mqtt_delete_request(iter); 427 } 428 *tail = NULL; 429} 430/** 431 * Initialize all request items 432 * @param r_objs Pointer to request objects 433 * @param r_objs_len Number of array entries 434 */ 435static void 436mqtt_init_requests(struct mqtt_request_t *r_objs, size_t r_objs_len) 437{ 438 u8_t n; 439 LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL); 440 for (n = 0; n < r_objs_len; n++) { 441 /* Item pointing to itself indicates unused */ 442 r_objs[n].next = &r_objs[n]; 443 } 444} 445 446/*--------------------------------------------------------------------------------------------------------------------- */ 447/* Output message build helpers */ 448 449 450static void 451mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value) 452{ 453 mqtt_ringbuf_put(rb, value); 454} 455 456static 457void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value) 458{ 459 mqtt_ringbuf_put(rb, value >> 8); 460 mqtt_ringbuf_put(rb, value & 0xff); 461} 462 463static void 464mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length) 465{ 466 u16_t n; 467 for (n = 0; n < length; n++) { 468 mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]); 469 } 470} 471 472static void 473mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length) 474{ 475 u16_t n; 476 mqtt_ringbuf_put(rb, length >> 8); 477 mqtt_ringbuf_put(rb, length & 0xff); 478 for (n = 0; n < length; n++) { 479 mqtt_ringbuf_put(rb, str[n]); 480 } 481} 482 483/** 484 * Append fixed header 485 * @param rb Output ring buffer 486 * @param msg_type see enum mqtt_message_type 487 * @param fdup MQTT DUP flag 488 * @param fqos MQTT QoS field 489 * @param fretain MQTT retain flag 490 * @param r_length Remaining length after fixed header 491 */ 492 493static void 494mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t fdup, 495 u8_t fqos, u8_t fretain, u16_t r_length) 496{ 497 /* Start with control byte */ 498 mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((fdup & 1) << 3) | ((fqos & 3) << 1) | (fretain & 1))); 499 /* Encode remaining length field */ 500 do { 501 mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0)); 502 r_length >>= 7; 503 } while (r_length > 0); 504} 505 506 507/** 508 * Check output buffer space 509 * @param rb Output ring buffer 510 * @param r_length Remaining length after fixed header 511 * @return 1 if message will fit, 0 if not enough buffer space 512 */ 513static u8_t 514mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length) 515{ 516 /* Start with length of type byte + remaining length */ 517 u16_t total_len = 1 + r_length; 518 519 LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL); 520 521 /* Calculate number of required bytes to contain the remaining bytes field and add to total*/ 522 do { 523 total_len++; 524 r_length >>= 7; 525 } while (r_length > 0); 526 527 return (total_len <= mqtt_ringbuf_free(rb)); 528} 529 530 531/** 532 * Close connection to server 533 * @param client MQTT client 534 * @param reason Reason for disconnection 535 */ 536static void 537mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason) 538{ 539 LWIP_ASSERT("mqtt_close: client != NULL", client != NULL); 540 541 /* Bring down TCP connection if not already done */ 542 if (client->conn != NULL) { 543 err_t res; 544 altcp_recv(client->conn, NULL); 545 altcp_err(client->conn, NULL); 546 altcp_sent(client->conn, NULL); 547 res = altcp_close(client->conn); 548 if (res != ERR_OK) { 549 altcp_abort(client->conn); 550 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_close: Close err=%s\n", lwip_strerr(res))); 551 } 552 client->conn = NULL; 553 } 554 555 /* Remove all pending requests */ 556 mqtt_clear_requests(&client->pend_req_queue); 557 /* Stop cyclic timer */ 558 sys_untimeout(mqtt_cyclic_timer, client); 559 560 /* Notify upper layer of disconnection if changed state */ 561 if (client->conn_state != TCP_DISCONNECTED) { 562 563 client->conn_state = TCP_DISCONNECTED; 564 if (client->connect_cb != NULL) { 565 client->connect_cb(client, client->connect_arg, reason); 566 } 567 } 568} 569 570 571/** 572 * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states 573 * @param arg MQTT client 574 */ 575static void 576mqtt_cyclic_timer(void *arg) 577{ 578 u8_t restart_timer = 1; 579 mqtt_client_t *client = (mqtt_client_t *)arg; 580 LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL); 581 582 if (client->conn_state == MQTT_CONNECTING) { 583 client->cyclic_tick++; 584 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) { 585 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: CONNECT attempt to server timed out\n")); 586 /* Disconnect TCP */ 587 mqtt_close(client, MQTT_CONNECT_TIMEOUT); 588 restart_timer = 0; 589 } 590 } else if (client->conn_state == MQTT_CONNECTED) { 591 /* Handle timeout for pending requests */ 592 mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL); 593 594 /* keep_alive > 0 means keep alive functionality shall be used */ 595 if (client->keep_alive > 0) { 596 597 client->server_watchdog++; 598 /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */ 599 if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive / 2)) { 600 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Server incoming keep-alive timeout\n")); 601 mqtt_close(client, MQTT_CONNECT_TIMEOUT); 602 restart_timer = 0; 603 } 604 605 /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */ 606 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) { 607 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: Sending keep-alive message to server\n")); 608 if (mqtt_output_check_space(&client->output, 0) != 0) { 609 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0); 610 client->cyclic_tick = 0; 611 } 612 } else { 613 client->cyclic_tick++; 614 } 615 } 616 } else { 617 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state)); 618 restart_timer = 0; 619 } 620 if (restart_timer) { 621 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, arg); 622 } 623} 624 625 626/** 627 * Send PUBACK, PUBREC or PUBREL response message 628 * @param client MQTT client 629 * @param msg PUBACK, PUBREC or PUBREL 630 * @param pkt_id Packet identifier 631 * @param qos QoS value 632 * @return ERR_OK if successful, ERR_MEM if out of memory 633 */ 634static err_t 635pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos) 636{ 637 err_t err = ERR_OK; 638 if (mqtt_output_check_space(&client->output, 2)) { 639 mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2); 640 mqtt_output_append_u16(&client->output, pkt_id); 641 mqtt_output_send(&client->output, client->conn); 642 } else { 643 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n", 644 mqtt_msg_type_to_str(msg), pkt_id)); 645 err = ERR_MEM; 646 } 647 return err; 648} 649 650/** 651 * Subscribe response from server 652 * @param r Matching request 653 * @param result Result code from server 654 */ 655static void 656mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result) 657{ 658 if (r->cb != NULL) { 659 r->cb(r->arg, result < 3 ? ERR_OK : ERR_ABRT); 660 } 661} 662 663 664/** 665 * Complete MQTT message received or buffer full 666 * @param client MQTT client 667 * @param fixed_hdr_idx header index 668 * @param length length received part 669 * @param remaining_length Remaining length of complete message 670 */ 671static mqtt_connection_status_t 672mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u32_t remaining_length) 673{ 674 mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED; 675 676 u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_idx; 677 size_t var_hdr_payload_bufsize = sizeof(client->rx_buffer) - fixed_hdr_idx; 678 679 /* Control packet type */ 680 u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]); 681 u16_t pkt_id = 0; 682 683 LWIP_ASSERT("client->msg_idx < MQTT_VAR_HEADER_BUFFER_LEN", client->msg_idx < MQTT_VAR_HEADER_BUFFER_LEN); 684 LWIP_ASSERT("fixed_hdr_idx <= client->msg_idx", fixed_hdr_idx <= client->msg_idx); 685 LWIP_ERROR("buffer length mismatch", fixed_hdr_idx + length <= MQTT_VAR_HEADER_BUFFER_LEN, 686 return MQTT_CONNECT_DISCONNECTED); 687 688 if (pkt_type == MQTT_MSG_TYPE_CONNACK) { 689 if (client->conn_state == MQTT_CONNECTING) { 690 if (length < 2) { 691 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short CONNACK message\n")); 692 goto out_disconnect; 693 } 694 /* Get result code from CONNACK */ 695 res = (mqtt_connection_status_t)var_hdr_payload[1]; 696 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: Connect response code %d\n", res)); 697 if (res == MQTT_CONNECT_ACCEPTED) { 698 /* Reset cyclic_tick when changing to connected state */ 699 client->cyclic_tick = 0; 700 client->conn_state = MQTT_CONNECTED; 701 /* Notify upper layer */ 702 if (client->connect_cb != 0) { 703 client->connect_cb(client, client->connect_arg, res); 704 } 705 } 706 } else { 707 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Received CONNACK in connected state\n")); 708 } 709 } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) { 710 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ( "mqtt_message_received: Received PINGRESP from server\n")); 711 712 } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) { 713 u16_t payload_offset = 0; 714 u16_t payload_length = length; 715 u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]); 716 717 if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) { 718 /* Should have topic and pkt id*/ 719 u8_t *topic; 720 u16_t after_topic; 721 u8_t bkp; 722 u16_t topic_len; 723 u16_t qos_len = (qos ? 2U : 0U); 724 if (length < 2 + qos_len) { 725 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet\n")); 726 goto out_disconnect; 727 } 728 topic_len = var_hdr_payload[0]; 729 topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]); 730 if ((topic_len > length - (2 + qos_len)) || 731 (topic_len > var_hdr_payload_bufsize - (2 + qos_len))) { 732 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet (topic)\n")); 733 goto out_disconnect; 734 } 735 736 topic = var_hdr_payload + 2; 737 after_topic = 2 + topic_len; 738 /* Check buffer length, add one byte even for QoS 0 so that zero termination will fit */ 739 if ((after_topic + (qos ? 2U : 1U)) > var_hdr_payload_bufsize) { 740 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n")); 741 goto out_disconnect; 742 } 743 744 /* id for QoS 1 and 2 */ 745 if (qos > 0) { 746 if (length < after_topic + 2U) { 747 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet (after_topic)\n")); 748 goto out_disconnect; 749 } 750 client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1]; 751 after_topic += 2; 752 } else { 753 client->inpub_pkt_id = 0; 754 } 755 /* Take backup of byte after topic */ 756 bkp = topic[topic_len]; 757 /* Zero terminate string */ 758 topic[topic_len] = 0; 759 /* Payload data remaining in receive buffer */ 760 payload_length = length - after_topic; 761 payload_offset = after_topic; 762 763 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incomming_publish: Received message with QoS %d at topic: %s, payload length %"U32_F"\n", 764 qos, topic, remaining_length + payload_length)); 765 if (client->pub_cb != NULL) { 766 client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length); 767 } 768 /* Restore byte after topic */ 769 topic[topic_len] = bkp; 770 } 771 if (payload_length > 0 || remaining_length == 0) { 772 if (length < (size_t)(payload_offset + payload_length)) { 773 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short packet (payload)\n")); 774 goto out_disconnect; 775 } 776 client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0); 777 /* Reply if QoS > 0 */ 778 if (remaining_length == 0 && qos > 0) { 779 /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */ 780 u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC; 781 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incomming_publish: Sending publish response: %s with pkt_id: %d\n", 782 mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id)); 783 pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0); 784 } 785 } 786 } else { 787 /* Get packet identifier */ 788 pkt_id = (u16_t)var_hdr_payload[0] << 8; 789 pkt_id |= (u16_t)var_hdr_payload[1]; 790 if (pkt_id == 0) { 791 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Got message with illegal packet identifier: 0\n")); 792 goto out_disconnect; 793 } 794 if (pkt_type == MQTT_MSG_TYPE_PUBREC) { 795 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n", pkt_id)); 796 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1); 797 798 } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) { 799 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n", pkt_id)); 800 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0); 801 802 } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK || 803 pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) { 804 struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id); 805 if (r != NULL) { 806 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id)); 807 if (pkt_type == MQTT_MSG_TYPE_SUBACK) { 808 if (length < 3) { 809 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: To small SUBACK packet\n")); 810 goto out_disconnect; 811 } else { 812 mqtt_incomming_suback(r, var_hdr_payload[2]); 813 } 814 } else if (r->cb != NULL) { 815 r->cb(r->arg, ERR_OK); 816 } 817 mqtt_delete_request(r); 818 } else { 819 LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id)); 820 } 821 } else { 822 LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received unknown message type: %d\n", pkt_type)); 823 goto out_disconnect; 824 } 825 } 826 return res; 827out_disconnect: 828 return MQTT_CONNECT_DISCONNECTED; 829} 830 831 832/** 833 * MQTT incoming message parser 834 * @param client MQTT client 835 * @param p PBUF chain of received data 836 * @return Connection status 837 */ 838static mqtt_connection_status_t 839mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p) 840{ 841 u16_t in_offset = 0; 842 u32_t msg_rem_len = 0; 843 u8_t fixed_hdr_idx = 0; 844 u8_t b = 0; 845 846 while (p->tot_len > in_offset) { 847 /* We ALWAYS parse the header here first. Even if the header was not 848 included in this segment, we re-parse it here by buffering it in 849 client->rx_buffer. client->msg_idx keeps track of this. */ 850 if ((fixed_hdr_idx < 2) || ((b & 0x80) != 0)) { 851 852 if (fixed_hdr_idx < client->msg_idx) { 853 /* parse header from old pbuf (buffered in client->rx_buffer) */ 854 b = client->rx_buffer[fixed_hdr_idx]; 855 } else { 856 /* parse header from this pbuf and save it in client->rx_buffer in case 857 it comes in segmented */ 858 b = pbuf_get_at(p, in_offset++); 859 client->rx_buffer[client->msg_idx++] = b; 860 } 861 fixed_hdr_idx++; 862 863 if (fixed_hdr_idx >= 2) { 864 /* fixed header contains at least 2 bytes but can contain more, depending on 865 'remaining length'. All bytes but the last of this have 0x80 set to 866 indicate more bytes are coming. */ 867 msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_idx - 2) * 7); 868 if ((b & 0x80) == 0) { 869 /* fixed header is done */ 870 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: Remaining length after fixed header: %"U32_F"\n", msg_rem_len)); 871 if (msg_rem_len == 0) { 872 /* Complete message with no extra headers of payload received */ 873 mqtt_message_received(client, fixed_hdr_idx, 0, 0); 874 client->msg_idx = 0; 875 fixed_hdr_idx = 0; 876 } else { 877 /* Bytes remaining in message (changes remaining length if this is 878 not the first segment of this message) */ 879 msg_rem_len = (msg_rem_len + fixed_hdr_idx) - client->msg_idx; 880 } 881 } 882 } 883 } else { 884 /* Fixed header has been parsed, parse variable header */ 885 u16_t cpy_len, cpy_start, buffer_space; 886 887 cpy_start = (client->msg_idx - fixed_hdr_idx) % (MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_idx) + fixed_hdr_idx; 888 889 /* Allow to copy the lesser one of available length in input data or bytes remaining in message */ 890 cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len); 891 892 /* Limit to available space in buffer */ 893 buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - cpy_start; 894 if (cpy_len > buffer_space) { 895 cpy_len = buffer_space; 896 } 897 pbuf_copy_partial(p, client->rx_buffer + cpy_start, cpy_len, in_offset); 898 899 /* Advance get and put indexes */ 900 client->msg_idx += cpy_len; 901 in_offset += cpy_len; 902 msg_rem_len -= cpy_len; 903 904 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: msg_idx: %"U32_F", cpy_len: %"U16_F", remaining %"U32_F"\n", client->msg_idx, cpy_len, msg_rem_len)); 905 if ((msg_rem_len == 0) || (cpy_len == buffer_space)) { 906 /* Whole message received or buffer is full */ 907 mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_idx, (cpy_start + cpy_len) - fixed_hdr_idx, msg_rem_len); 908 if (res != MQTT_CONNECT_ACCEPTED) { 909 return res; 910 } 911 if (msg_rem_len == 0) { 912 /* Reset parser state */ 913 client->msg_idx = 0; 914 /* msg_tot_len = 0; */ 915 fixed_hdr_idx = 0; 916 } 917 } 918 } 919 } 920 return MQTT_CONNECT_ACCEPTED; 921} 922 923 924/** 925 * TCP received callback function. @see tcp_recv_fn 926 * @param arg MQTT client 927 * @param p PBUF chain of received data 928 * @param err Passed as return value if not ERR_OK 929 * @return ERR_OK or err passed into callback 930 */ 931static err_t 932mqtt_tcp_recv_cb(void *arg, struct altcp_pcb *pcb, struct pbuf *p, err_t err) 933{ 934 mqtt_client_t *client = (mqtt_client_t *)arg; 935 LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL); 936 LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb); 937 938 if (p == NULL) { 939 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n")); 940 mqtt_close(client, MQTT_CONNECT_DISCONNECTED); 941 } else { 942 mqtt_connection_status_t res; 943 if (err != ERR_OK) { 944 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_recv_cb: Recv err=%d\n", err)); 945 pbuf_free(p); 946 return err; 947 } 948 949 /* Tell remote that data has been received */ 950 altcp_recved(pcb, p->tot_len); 951 res = mqtt_parse_incoming(client, p); 952 pbuf_free(p); 953 954 if (res != MQTT_CONNECT_ACCEPTED) { 955 mqtt_close(client, res); 956 } 957 /* If keep alive functionality is used */ 958 if (client->keep_alive != 0) { 959 /* Reset server alive watchdog */ 960 client->server_watchdog = 0; 961 } 962 963 } 964 return ERR_OK; 965} 966 967 968/** 969 * TCP data sent callback function. @see tcp_sent_fn 970 * @param arg MQTT client 971 * @param tpcb TCP connection handle 972 * @param len Number of bytes sent 973 * @return ERR_OK 974 */ 975static err_t 976mqtt_tcp_sent_cb(void *arg, struct altcp_pcb *tpcb, u16_t len) 977{ 978 mqtt_client_t *client = (mqtt_client_t *)arg; 979 980 LWIP_UNUSED_ARG(tpcb); 981 LWIP_UNUSED_ARG(len); 982 983 if (client->conn_state == MQTT_CONNECTED) { 984 struct mqtt_request_t *r; 985 986 /* Reset keep-alive send timer and server watchdog */ 987 client->cyclic_tick = 0; 988 client->server_watchdog = 0; 989 /* QoS 0 publish has no response from server, so call its callbacks here */ 990 while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) { 991 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n")); 992 if (r->cb != NULL) { 993 r->cb(r->arg, ERR_OK); 994 } 995 mqtt_delete_request(r); 996 } 997 /* Try send any remaining buffers from output queue */ 998 mqtt_output_send(&client->output, client->conn); 999 } 1000 return ERR_OK; 1001} 1002 1003/** 1004 * TCP error callback function. @see tcp_err_fn 1005 * @param arg MQTT client 1006 * @param err Error encountered 1007 */ 1008static void 1009mqtt_tcp_err_cb(void *arg, err_t err) 1010{ 1011 mqtt_client_t *client = (mqtt_client_t *)arg; 1012 LWIP_UNUSED_ARG(err); /* only used for debug output */ 1013 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg)); 1014 LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL); 1015 /* Set conn to null before calling close as pcb is already deallocated*/ 1016 client->conn = 0; 1017 mqtt_close(client, MQTT_CONNECT_DISCONNECTED); 1018} 1019 1020/** 1021 * TCP poll callback function. @see tcp_poll_fn 1022 * @param arg MQTT client 1023 * @param tpcb TCP connection handle 1024 * @return err ERR_OK 1025 */ 1026static err_t 1027mqtt_tcp_poll_cb(void *arg, struct altcp_pcb *tpcb) 1028{ 1029 mqtt_client_t *client = (mqtt_client_t *)arg; 1030 if (client->conn_state == MQTT_CONNECTED) { 1031 /* Try send any remaining buffers from output queue */ 1032 mqtt_output_send(&client->output, tpcb); 1033 } 1034 return ERR_OK; 1035} 1036 1037/** 1038 * TCP connect callback function. @see tcp_connected_fn 1039 * @param arg MQTT client 1040 * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error 1041 * @return ERR_OK 1042 */ 1043static err_t 1044mqtt_tcp_connect_cb(void *arg, struct altcp_pcb *tpcb, err_t err) 1045{ 1046 mqtt_client_t *client = (mqtt_client_t *)arg; 1047 1048 if (err != ERR_OK) { 1049 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_connect_cb: TCP connect error %d\n", err)); 1050 return err; 1051 } 1052 1053 /* Initiate receiver state */ 1054 client->msg_idx = 0; 1055 1056 /* Setup TCP callbacks */ 1057 altcp_recv(tpcb, mqtt_tcp_recv_cb); 1058 altcp_sent(tpcb, mqtt_tcp_sent_cb); 1059 altcp_poll(tpcb, mqtt_tcp_poll_cb, 2); 1060 1061 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_connect_cb: TCP connection established to server\n")); 1062 /* Enter MQTT connect state */ 1063 client->conn_state = MQTT_CONNECTING; 1064 1065 /* Start cyclic timer */ 1066 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, client); 1067 client->cyclic_tick = 0; 1068 1069 /* Start transmission from output queue, connect message is the first one out*/ 1070 mqtt_output_send(&client->output, client->conn); 1071 1072 return ERR_OK; 1073} 1074 1075 1076 1077/*---------------------------------------------------------------------------------------------------- */ 1078/* Public API */ 1079 1080 1081/** 1082 * @ingroup mqtt 1083 * MQTT publish function. 1084 * @param client MQTT client 1085 * @param topic Publish topic string 1086 * @param payload Data to publish (NULL is allowed) 1087 * @param payload_length Length of payload (0 is allowed) 1088 * @param qos Quality of service, 0 1 or 2 1089 * @param retain MQTT retain flag 1090 * @param cb Callback to call when publish is complete or has timed out 1091 * @param arg User supplied argument to publish callback 1092 * @return ERR_OK if successful 1093 * ERR_CONN if client is disconnected 1094 * ERR_MEM if short on memory 1095 */ 1096err_t 1097mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain, 1098 mqtt_request_cb_t cb, void *arg) 1099{ 1100 struct mqtt_request_t *r; 1101 u16_t pkt_id; 1102 size_t topic_strlen; 1103 size_t total_len; 1104 u16_t topic_len; 1105 u16_t remaining_length; 1106 1107 LWIP_ASSERT_CORE_LOCKED(); 1108 LWIP_ASSERT("mqtt_publish: client != NULL", client); 1109 LWIP_ASSERT("mqtt_publish: topic != NULL", topic); 1110 LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN); 1111 1112 topic_strlen = strlen(topic); 1113 LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG); 1114 topic_len = (u16_t)topic_strlen; 1115 total_len = 2 + topic_len + payload_length; 1116 1117 if (qos > 0) { 1118 total_len += 2; 1119 /* Generate pkt_id id for QoS1 and 2 */ 1120 pkt_id = msg_generate_packet_id(client); 1121 } else { 1122 /* Use reserved value pkt_id 0 for QoS 0 in request handle */ 1123 pkt_id = 0; 1124 } 1125 LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG); 1126 remaining_length = (u16_t)total_len; 1127 1128 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic)); 1129 1130 r = mqtt_create_request(client->req_list, LWIP_ARRAYSIZE(client->req_list), pkt_id, cb, arg); 1131 if (r == NULL) { 1132 return ERR_MEM; 1133 } 1134 1135 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 1136 mqtt_delete_request(r); 1137 return ERR_MEM; 1138 } 1139 /* Append fixed header */ 1140 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length); 1141 1142 /* Append Topic */ 1143 mqtt_output_append_string(&client->output, topic, topic_len); 1144 1145 /* Append packet if for QoS 1 and 2*/ 1146 if (qos > 0) { 1147 mqtt_output_append_u16(&client->output, pkt_id); 1148 } 1149 1150 /* Append optional publish payload */ 1151 if ((payload != NULL) && (payload_length > 0)) { 1152 mqtt_output_append_buf(&client->output, payload, payload_length); 1153 } 1154 1155 mqtt_append_request(&client->pend_req_queue, r); 1156 mqtt_output_send(&client->output, client->conn); 1157 return ERR_OK; 1158} 1159 1160 1161/** 1162 * @ingroup mqtt 1163 * MQTT subscribe/unsubscribe function. 1164 * @param client MQTT client 1165 * @param topic topic to subscribe to 1166 * @param qos Quality of service, 0 1 or 2 (only used for subscribe) 1167 * @param cb Callback to call when subscribe/unsubscribe reponse is received 1168 * @param arg User supplied argument to publish callback 1169 * @param sub 1 for subscribe, 0 for unsubscribe 1170 * @return ERR_OK if successful, @see err_t enum for other results 1171 */ 1172err_t 1173mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub) 1174{ 1175 size_t topic_strlen; 1176 size_t total_len; 1177 u16_t topic_len; 1178 u16_t remaining_length; 1179 u16_t pkt_id; 1180 struct mqtt_request_t *r; 1181 1182 LWIP_ASSERT_CORE_LOCKED(); 1183 LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client); 1184 LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic); 1185 1186 topic_strlen = strlen(topic); 1187 LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG); 1188 topic_len = (u16_t)topic_strlen; 1189 /* Topic string, pkt_id, qos for subscribe */ 1190 total_len = topic_len + 2 + 2 + (sub != 0); 1191 LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG); 1192 remaining_length = (u16_t)total_len; 1193 1194 LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3); 1195 if (client->conn_state == TCP_DISCONNECTED) { 1196 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n")); 1197 return ERR_CONN; 1198 } 1199 1200 pkt_id = msg_generate_packet_id(client); 1201 r = mqtt_create_request(client->req_list, LWIP_ARRAYSIZE(client->req_list), pkt_id, cb, arg); 1202 if (r == NULL) { 1203 return ERR_MEM; 1204 } 1205 1206 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 1207 mqtt_delete_request(r); 1208 return ERR_MEM; 1209 } 1210 1211 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id)); 1212 1213 mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length); 1214 /* Packet id */ 1215 mqtt_output_append_u16(&client->output, pkt_id); 1216 /* Topic */ 1217 mqtt_output_append_string(&client->output, topic, topic_len); 1218 /* QoS */ 1219 if (sub != 0) { 1220 mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2)); 1221 } 1222 1223 mqtt_append_request(&client->pend_req_queue, r); 1224 mqtt_output_send(&client->output, client->conn); 1225 return ERR_OK; 1226} 1227 1228 1229/** 1230 * @ingroup mqtt 1231 * Set callback to handle incoming publish requests from server 1232 * @param client MQTT client 1233 * @param pub_cb Callback invoked when publish starts, contain topic and total length of payload 1234 * @param data_cb Callback for each fragment of payload that arrives 1235 * @param arg User supplied argument to both callbacks 1236 */ 1237void 1238mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb, 1239 mqtt_incoming_data_cb_t data_cb, void *arg) 1240{ 1241 LWIP_ASSERT_CORE_LOCKED(); 1242 LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL); 1243 client->data_cb = data_cb; 1244 client->pub_cb = pub_cb; 1245 client->inpub_arg = arg; 1246} 1247 1248/** 1249 * @ingroup mqtt 1250 * Create a new MQTT client instance 1251 * @return Pointer to instance on success, NULL otherwise 1252 */ 1253mqtt_client_t * 1254mqtt_client_new(void) 1255{ 1256 LWIP_ASSERT_CORE_LOCKED(); 1257 return (mqtt_client_t *)mem_calloc(1, sizeof(mqtt_client_t)); 1258} 1259 1260/** 1261 * @ingroup mqtt 1262 * Free MQTT client instance 1263 * @param client Pointer to instance to be freed 1264 */ 1265void 1266mqtt_client_free(mqtt_client_t *client) 1267{ 1268 mem_free(client); 1269} 1270 1271/** 1272 * @ingroup mqtt 1273 * Connect to MQTT server 1274 * @param client MQTT client 1275 * @param ip_addr Server IP 1276 * @param port Server port 1277 * @param cb Connection state change callback 1278 * @param arg User supplied argument to connection callback 1279 * @param client_info Client identification and connection options 1280 * @return ERR_OK if successful, @see err_t enum for other results 1281 */ 1282err_t 1283mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg, 1284 const struct mqtt_connect_client_info_t *client_info) 1285{ 1286 err_t err; 1287 size_t len; 1288 u16_t client_id_length; 1289 /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */ 1290 u16_t remaining_length = 2 + 4 + 1 + 1 + 2; 1291 u8_t flags = 0, will_topic_len = 0, will_msg_len = 0; 1292 u16_t client_user_len = 0, client_pass_len = 0; 1293 1294 LWIP_ASSERT_CORE_LOCKED(); 1295 LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL); 1296 LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL); 1297 LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL); 1298 LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL); 1299 1300 if (client->conn_state != TCP_DISCONNECTED) { 1301 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Already connected\n")); 1302 return ERR_ISCONN; 1303 } 1304 1305 /* Wipe clean */ 1306 memset(client, 0, sizeof(mqtt_client_t)); 1307 client->connect_arg = arg; 1308 client->connect_cb = cb; 1309 client->keep_alive = client_info->keep_alive; 1310 mqtt_init_requests(client->req_list, LWIP_ARRAYSIZE(client->req_list)); 1311 1312 /* Build connect message */ 1313 if (client_info->will_topic != NULL && client_info->will_msg != NULL) { 1314 flags |= MQTT_CONNECT_FLAG_WILL; 1315 flags |= (client_info->will_qos & 3) << 3; 1316 if (client_info->will_retain) { 1317 flags |= MQTT_CONNECT_FLAG_WILL_RETAIN; 1318 } 1319 len = strlen(client_info->will_topic); 1320 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL); 1321 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length must be > 0", len > 0, return ERR_VAL); 1322 will_topic_len = (u8_t)len; 1323 len = strlen(client_info->will_msg); 1324 LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL); 1325 will_msg_len = (u8_t)len; 1326 len = remaining_length + 2 + will_topic_len + 2 + will_msg_len; 1327 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 1328 remaining_length = (u16_t)len; 1329 } 1330 if (client_info->client_user != NULL) { 1331 flags |= MQTT_CONNECT_FLAG_USERNAME; 1332 len = strlen(client_info->client_user); 1333 LWIP_ERROR("mqtt_client_connect: client_info->client_user length overflow", len <= 0xFFFF, return ERR_VAL); 1334 LWIP_ERROR("mqtt_client_connect: client_info->client_user length must be > 0", len > 0, return ERR_VAL); 1335 client_user_len = (u16_t)len; 1336 len = remaining_length + 2 + client_user_len; 1337 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 1338 remaining_length = (u16_t)len; 1339 } 1340 if (client_info->client_pass != NULL) { 1341 flags |= MQTT_CONNECT_FLAG_PASSWORD; 1342 len = strlen(client_info->client_pass); 1343 LWIP_ERROR("mqtt_client_connect: client_info->client_pass length overflow", len <= 0xFFFF, return ERR_VAL); 1344 LWIP_ERROR("mqtt_client_connect: client_info->client_pass length must be > 0", len > 0, return ERR_VAL); 1345 client_pass_len = (u16_t)len; 1346 len = remaining_length + 2 + client_pass_len; 1347 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 1348 remaining_length = (u16_t)len; 1349 } 1350 1351 /* Don't complicate things, always connect using clean session */ 1352 flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; 1353 1354 len = strlen(client_info->client_id); 1355 LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL); 1356 client_id_length = (u16_t)len; 1357 len = remaining_length + 2 + client_id_length; 1358 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL); 1359 remaining_length = (u16_t)len; 1360 1361 if (mqtt_output_check_space(&client->output, remaining_length) == 0) { 1362 return ERR_MEM; 1363 } 1364 1365#if LWIP_ALTCP && LWIP_ALTCP_TLS 1366 if (client_info->tls_config) { 1367 client->conn = altcp_tls_new(client_info->tls_config, IP_GET_TYPE(ip_addr)); 1368 } else 1369#endif 1370 { 1371 client->conn = altcp_tcp_new_ip_type(IP_GET_TYPE(ip_addr)); 1372 } 1373 if (client->conn == NULL) { 1374 return ERR_MEM; 1375 } 1376 1377 /* Set arg pointer for callbacks */ 1378 altcp_arg(client->conn, client); 1379 /* Any local address, pick random local port number */ 1380 err = altcp_bind(client->conn, IP_ADDR_ANY, 0); 1381 if (err != ERR_OK) { 1382 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Error binding to local ip/port, %d\n", err)); 1383 goto tcp_fail; 1384 } 1385 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port)); 1386 1387 /* Connect to server */ 1388 err = altcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb); 1389 if (err != ERR_OK) { 1390 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err)); 1391 goto tcp_fail; 1392 } 1393 /* Set error callback */ 1394 altcp_err(client->conn, mqtt_tcp_err_cb); 1395 client->conn_state = TCP_CONNECTING; 1396 1397 /* Append fixed header */ 1398 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length); 1399 /* Append Protocol string */ 1400 mqtt_output_append_string(&client->output, "MQTT", 4); 1401 /* Append Protocol level */ 1402 mqtt_output_append_u8(&client->output, 4); 1403 /* Append connect flags */ 1404 mqtt_output_append_u8(&client->output, flags); 1405 /* Append keep-alive */ 1406 mqtt_output_append_u16(&client->output, client_info->keep_alive); 1407 /* Append client id */ 1408 mqtt_output_append_string(&client->output, client_info->client_id, client_id_length); 1409 /* Append will message if used */ 1410 if ((flags & MQTT_CONNECT_FLAG_WILL) != 0) { 1411 mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len); 1412 mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len); 1413 } 1414 /* Append user name if given */ 1415 if ((flags & MQTT_CONNECT_FLAG_USERNAME) != 0) { 1416 mqtt_output_append_string(&client->output, client_info->client_user, client_user_len); 1417 } 1418 /* Append password if given */ 1419 if ((flags & MQTT_CONNECT_FLAG_PASSWORD) != 0) { 1420 mqtt_output_append_string(&client->output, client_info->client_pass, client_pass_len); 1421 } 1422 return ERR_OK; 1423 1424tcp_fail: 1425 altcp_abort(client->conn); 1426 client->conn = NULL; 1427 return err; 1428} 1429 1430 1431/** 1432 * @ingroup mqtt 1433 * Disconnect from MQTT server 1434 * @param client MQTT client 1435 */ 1436void 1437mqtt_disconnect(mqtt_client_t *client) 1438{ 1439 LWIP_ASSERT_CORE_LOCKED(); 1440 LWIP_ASSERT("mqtt_disconnect: client != NULL", client); 1441 /* If connection in not already closed */ 1442 if (client->conn_state != TCP_DISCONNECTED) { 1443 /* Set conn_state before calling mqtt_close to prevent callback from being called */ 1444 client->conn_state = TCP_DISCONNECTED; 1445 mqtt_close(client, (mqtt_connection_status_t)0); 1446 } 1447} 1448 1449/** 1450 * @ingroup mqtt 1451 * Check connection with server 1452 * @param client MQTT client 1453 * @return 1 if connected to server, 0 otherwise 1454 */ 1455u8_t 1456mqtt_client_is_connected(mqtt_client_t *client) 1457{ 1458 LWIP_ASSERT_CORE_LOCKED(); 1459 LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client); 1460 return client->conn_state == MQTT_CONNECTED; 1461} 1462 1463#endif /* LWIP_TCP && LWIP_CALLBACK_API */ 1464