/*
 * NOTE(review): ceph_debug.h is deliberately the first include;
 * presumably it sets up pr_fmt()/dout() before any other header is
 * seen -- confirm before reordering.
 */
#include "ceph_debug.h"

#include <linux/crc32c.h>
#include <linux/ctype.h>
#include <linux/highmem.h>
#include <linux/inet.h>
#include <linux/kthread.h>
#include <linux/net.h>
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
#include <net/tcp.h>

#include "super.h"
#include "messenger.h"
#include "decode.h"
#include "pagelist.h"

/*
 * Ceph uses the messenger to exchange ceph_msg messages with other
 * hosts in the system.  The messenger provides ordered and reliable
 * delivery.  We tolerate TCP disconnects by reconnecting (with
 * exponential backoff) in the case of a fault (disconnection, bad
 * crc, protocol error).  Acks allow sent messages to be discarded by
 * the sender.
 */

/*
 * static tag bytes (protocol control messages)
 *
 * Outgoing kvecs point straight at these bytes (e.g. out_kvec[] entries
 * built by the prepare_write_*() helpers), so they need static storage
 * rather than living on a stack frame.
 */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;

#ifdef CONFIG_LOCKDEP
/* lockdep class assigned to every socket lock created by ceph_tcp_connect() */
static struct lock_class_key socket_class;
#endif


/* forward declarations for static helpers defined further down this file */
static void queue_con(struct ceph_connection *con);
static void con_work(struct work_struct *);
static void ceph_fault(struct ceph_connection *con);

/*
 * nicely render a sockaddr as a string.
44 */ 45#define MAX_ADDR_STR 20 46#define MAX_ADDR_STR_LEN 60 47static char addr_str[MAX_ADDR_STR][MAX_ADDR_STR_LEN]; 48static DEFINE_SPINLOCK(addr_str_lock); 49static int last_addr_str; 50 51const char *pr_addr(const struct sockaddr_storage *ss) 52{ 53 int i; 54 char *s; 55 struct sockaddr_in *in4 = (void *)ss; 56 struct sockaddr_in6 *in6 = (void *)ss; 57 58 spin_lock(&addr_str_lock); 59 i = last_addr_str++; 60 if (last_addr_str == MAX_ADDR_STR) 61 last_addr_str = 0; 62 spin_unlock(&addr_str_lock); 63 s = addr_str[i]; 64 65 switch (ss->ss_family) { 66 case AF_INET: 67 snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%u", &in4->sin_addr, 68 (unsigned int)ntohs(in4->sin_port)); 69 break; 70 71 case AF_INET6: 72 snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%u", &in6->sin6_addr, 73 (unsigned int)ntohs(in6->sin6_port)); 74 break; 75 76 default: 77 sprintf(s, "(unknown sockaddr family %d)", (int)ss->ss_family); 78 } 79 80 return s; 81} 82 83static void encode_my_addr(struct ceph_messenger *msgr) 84{ 85 memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr)); 86 ceph_encode_addr(&msgr->my_enc_addr); 87} 88 89/* 90 * work queue for all reading and writing to/from the socket. 
91 */ 92struct workqueue_struct *ceph_msgr_wq; 93 94int __init ceph_msgr_init(void) 95{ 96 ceph_msgr_wq = create_workqueue("ceph-msgr"); 97 if (IS_ERR(ceph_msgr_wq)) { 98 int ret = PTR_ERR(ceph_msgr_wq); 99 pr_err("msgr_init failed to create workqueue: %d\n", ret); 100 ceph_msgr_wq = NULL; 101 return ret; 102 } 103 return 0; 104} 105 106void ceph_msgr_exit(void) 107{ 108 destroy_workqueue(ceph_msgr_wq); 109} 110 111void ceph_msgr_flush(void) 112{ 113 flush_workqueue(ceph_msgr_wq); 114} 115 116 117/* 118 * socket callback functions 119 */ 120 121/* data available on socket, or listen socket received a connect */ 122static void ceph_data_ready(struct sock *sk, int count_unused) 123{ 124 struct ceph_connection *con = 125 (struct ceph_connection *)sk->sk_user_data; 126 if (sk->sk_state != TCP_CLOSE_WAIT) { 127 dout("ceph_data_ready on %p state = %lu, queueing work\n", 128 con, con->state); 129 queue_con(con); 130 } 131} 132 133/* socket has buffer space for writing */ 134static void ceph_write_space(struct sock *sk) 135{ 136 struct ceph_connection *con = 137 (struct ceph_connection *)sk->sk_user_data; 138 139 /* only queue to workqueue if there is data we want to write. 
*/ 140 if (test_bit(WRITE_PENDING, &con->state)) { 141 dout("ceph_write_space %p queueing write work\n", con); 142 queue_con(con); 143 } else { 144 dout("ceph_write_space %p nothing to write\n", con); 145 } 146 147 /* since we have our own write_space, clear the SOCK_NOSPACE flag */ 148 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 149} 150 151/* socket's state has changed */ 152static void ceph_state_change(struct sock *sk) 153{ 154 struct ceph_connection *con = 155 (struct ceph_connection *)sk->sk_user_data; 156 157 dout("ceph_state_change %p state = %lu sk_state = %u\n", 158 con, con->state, sk->sk_state); 159 160 if (test_bit(CLOSED, &con->state)) 161 return; 162 163 switch (sk->sk_state) { 164 case TCP_CLOSE: 165 dout("ceph_state_change TCP_CLOSE\n"); 166 case TCP_CLOSE_WAIT: 167 dout("ceph_state_change TCP_CLOSE_WAIT\n"); 168 if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) { 169 if (test_bit(CONNECTING, &con->state)) 170 con->error_msg = "connection failed"; 171 else 172 con->error_msg = "socket closed"; 173 queue_con(con); 174 } 175 break; 176 case TCP_ESTABLISHED: 177 dout("ceph_state_change TCP_ESTABLISHED\n"); 178 queue_con(con); 179 break; 180 } 181} 182 183/* 184 * set up socket callbacks 185 */ 186static void set_sock_callbacks(struct socket *sock, 187 struct ceph_connection *con) 188{ 189 struct sock *sk = sock->sk; 190 sk->sk_user_data = (void *)con; 191 sk->sk_data_ready = ceph_data_ready; 192 sk->sk_write_space = ceph_write_space; 193 sk->sk_state_change = ceph_state_change; 194} 195 196 197/* 198 * socket helpers 199 */ 200 201/* 202 * initiate connection to a remote socket. 
203 */ 204static struct socket *ceph_tcp_connect(struct ceph_connection *con) 205{ 206 struct sockaddr_storage *paddr = &con->peer_addr.in_addr; 207 struct socket *sock; 208 int ret; 209 210 BUG_ON(con->sock); 211 ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM, 212 IPPROTO_TCP, &sock); 213 if (ret) 214 return ERR_PTR(ret); 215 con->sock = sock; 216 sock->sk->sk_allocation = GFP_NOFS; 217 218#ifdef CONFIG_LOCKDEP 219 lockdep_set_class(&sock->sk->sk_lock, &socket_class); 220#endif 221 222 set_sock_callbacks(sock, con); 223 224 dout("connect %s\n", pr_addr(&con->peer_addr.in_addr)); 225 226 ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr), 227 O_NONBLOCK); 228 if (ret == -EINPROGRESS) { 229 dout("connect %s EINPROGRESS sk_state = %u\n", 230 pr_addr(&con->peer_addr.in_addr), 231 sock->sk->sk_state); 232 ret = 0; 233 } 234 if (ret < 0) { 235 pr_err("connect %s error %d\n", 236 pr_addr(&con->peer_addr.in_addr), ret); 237 sock_release(sock); 238 con->sock = NULL; 239 con->error_msg = "connect error"; 240 } 241 242 if (ret < 0) 243 return ERR_PTR(ret); 244 return sock; 245} 246 247static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) 248{ 249 struct kvec iov = {buf, len}; 250 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 251 252 return kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags); 253} 254 255/* 256 * write something. @more is true if caller will be sending more data 257 * shortly. 258 */ 259static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, 260 size_t kvlen, size_t len, int more) 261{ 262 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 263 264 if (more) 265 msg.msg_flags |= MSG_MORE; 266 else 267 msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */ 268 269 return kernel_sendmsg(sock, &msg, iov, kvlen, len); 270} 271 272 273/* 274 * Shutdown/close the socket for the given connection. 
275 */ 276static int con_close_socket(struct ceph_connection *con) 277{ 278 int rc; 279 280 dout("con_close_socket on %p sock %p\n", con, con->sock); 281 if (!con->sock) 282 return 0; 283 set_bit(SOCK_CLOSED, &con->state); 284 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); 285 sock_release(con->sock); 286 con->sock = NULL; 287 clear_bit(SOCK_CLOSED, &con->state); 288 return rc; 289} 290 291/* 292 * Reset a connection. Discard all incoming and outgoing messages 293 * and clear *_seq state. 294 */ 295static void ceph_msg_remove(struct ceph_msg *msg) 296{ 297 list_del_init(&msg->list_head); 298 ceph_msg_put(msg); 299} 300static void ceph_msg_remove_list(struct list_head *head) 301{ 302 while (!list_empty(head)) { 303 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg, 304 list_head); 305 ceph_msg_remove(msg); 306 } 307} 308 309static void reset_connection(struct ceph_connection *con) 310{ 311 /* reset connection, out_queue, msg_ and connect_seq */ 312 /* discard existing out_queue and msg_seq */ 313 ceph_msg_remove_list(&con->out_queue); 314 ceph_msg_remove_list(&con->out_sent); 315 316 if (con->in_msg) { 317 ceph_msg_put(con->in_msg); 318 con->in_msg = NULL; 319 } 320 321 con->connect_seq = 0; 322 con->out_seq = 0; 323 if (con->out_msg) { 324 ceph_msg_put(con->out_msg); 325 con->out_msg = NULL; 326 } 327 con->out_keepalive_pending = false; 328 con->in_seq = 0; 329 con->in_seq_acked = 0; 330} 331 332/* 333 * mark a peer down. drop any open connections. 
334 */ 335void ceph_con_close(struct ceph_connection *con) 336{ 337 dout("con_close %p peer %s\n", con, pr_addr(&con->peer_addr.in_addr)); 338 set_bit(CLOSED, &con->state); /* in case there's queued work */ 339 clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ 340 clear_bit(LOSSYTX, &con->state); /* so we retry next connect */ 341 clear_bit(KEEPALIVE_PENDING, &con->state); 342 clear_bit(WRITE_PENDING, &con->state); 343 mutex_lock(&con->mutex); 344 reset_connection(con); 345 con->peer_global_seq = 0; 346 cancel_delayed_work(&con->work); 347 mutex_unlock(&con->mutex); 348 queue_con(con); 349} 350 351/* 352 * Reopen a closed connection, with a new peer address. 353 */ 354void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr) 355{ 356 dout("con_open %p %s\n", con, pr_addr(&addr->in_addr)); 357 set_bit(OPENING, &con->state); 358 clear_bit(CLOSED, &con->state); 359 memcpy(&con->peer_addr, addr, sizeof(*addr)); 360 con->delay = 0; /* reset backoff memory */ 361 queue_con(con); 362} 363 364/* 365 * return true if this connection ever successfully opened 366 */ 367bool ceph_con_opened(struct ceph_connection *con) 368{ 369 return con->connect_seq > 0; 370} 371 372/* 373 * generic get/put 374 */ 375struct ceph_connection *ceph_con_get(struct ceph_connection *con) 376{ 377 dout("con_get %p nref = %d -> %d\n", con, 378 atomic_read(&con->nref), atomic_read(&con->nref) + 1); 379 if (atomic_inc_not_zero(&con->nref)) 380 return con; 381 return NULL; 382} 383 384void ceph_con_put(struct ceph_connection *con) 385{ 386 dout("con_put %p nref = %d -> %d\n", con, 387 atomic_read(&con->nref), atomic_read(&con->nref) - 1); 388 BUG_ON(atomic_read(&con->nref) == 0); 389 if (atomic_dec_and_test(&con->nref)) { 390 BUG_ON(con->sock); 391 kfree(con); 392 } 393} 394 395/* 396 * initialize a new connection. 
397 */ 398void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) 399{ 400 dout("con_init %p\n", con); 401 memset(con, 0, sizeof(*con)); 402 atomic_set(&con->nref, 1); 403 con->msgr = msgr; 404 mutex_init(&con->mutex); 405 INIT_LIST_HEAD(&con->out_queue); 406 INIT_LIST_HEAD(&con->out_sent); 407 INIT_DELAYED_WORK(&con->work, con_work); 408} 409 410 411/* 412 * We maintain a global counter to order connection attempts. Get 413 * a unique seq greater than @gt. 414 */ 415static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) 416{ 417 u32 ret; 418 419 spin_lock(&msgr->global_seq_lock); 420 if (msgr->global_seq < gt) 421 msgr->global_seq = gt; 422 ret = ++msgr->global_seq; 423 spin_unlock(&msgr->global_seq_lock); 424 return ret; 425} 426 427 428/* 429 * Prepare footer for currently outgoing message, and finish things 430 * off. Assumes out_kvec* are already valid.. we just add on to the end. 431 */ 432static void prepare_write_message_footer(struct ceph_connection *con, int v) 433{ 434 struct ceph_msg *m = con->out_msg; 435 436 dout("prepare_write_message_footer %p\n", con); 437 con->out_kvec_is_msg = true; 438 con->out_kvec[v].iov_base = &m->footer; 439 con->out_kvec[v].iov_len = sizeof(m->footer); 440 con->out_kvec_bytes += sizeof(m->footer); 441 con->out_kvec_left++; 442 con->out_more = m->more_to_follow; 443 con->out_msg_done = true; 444} 445 446/* 447 * Prepare headers for the next outgoing message. 448 */ 449static void prepare_write_message(struct ceph_connection *con) 450{ 451 struct ceph_msg *m; 452 int v = 0; 453 454 con->out_kvec_bytes = 0; 455 con->out_kvec_is_msg = true; 456 con->out_msg_done = false; 457 458 /* Sneak an ack in there first? If we can get it into the same 459 * TCP packet that's a good thing. 
*/ 460 if (con->in_seq > con->in_seq_acked) { 461 con->in_seq_acked = con->in_seq; 462 con->out_kvec[v].iov_base = &tag_ack; 463 con->out_kvec[v++].iov_len = 1; 464 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 465 con->out_kvec[v].iov_base = &con->out_temp_ack; 466 con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack); 467 con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack); 468 } 469 470 m = list_first_entry(&con->out_queue, 471 struct ceph_msg, list_head); 472 con->out_msg = m; 473 if (test_bit(LOSSYTX, &con->state)) { 474 list_del_init(&m->list_head); 475 } else { 476 /* put message on sent list */ 477 ceph_msg_get(m); 478 list_move_tail(&m->list_head, &con->out_sent); 479 } 480 481 /* 482 * only assign outgoing seq # if we haven't sent this message 483 * yet. if it is requeued, resend with it's original seq. 484 */ 485 if (m->needs_out_seq) { 486 m->hdr.seq = cpu_to_le64(++con->out_seq); 487 m->needs_out_seq = false; 488 } 489 490 dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", 491 m, con->out_seq, le16_to_cpu(m->hdr.type), 492 le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len), 493 le32_to_cpu(m->hdr.data_len), 494 m->nr_pages); 495 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); 496 497 /* tag + hdr + front + middle */ 498 con->out_kvec[v].iov_base = &tag_msg; 499 con->out_kvec[v++].iov_len = 1; 500 con->out_kvec[v].iov_base = &m->hdr; 501 con->out_kvec[v++].iov_len = sizeof(m->hdr); 502 con->out_kvec[v++] = m->front; 503 if (m->middle) 504 con->out_kvec[v++] = m->middle->vec; 505 con->out_kvec_left = v; 506 con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len + 507 (m->middle ? 
m->middle->vec.iov_len : 0); 508 con->out_kvec_cur = con->out_kvec; 509 510 /* fill in crc (except data pages), footer */ 511 con->out_msg->hdr.crc = 512 cpu_to_le32(crc32c(0, (void *)&m->hdr, 513 sizeof(m->hdr) - sizeof(m->hdr.crc))); 514 con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE; 515 con->out_msg->footer.front_crc = 516 cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len)); 517 if (m->middle) 518 con->out_msg->footer.middle_crc = 519 cpu_to_le32(crc32c(0, m->middle->vec.iov_base, 520 m->middle->vec.iov_len)); 521 else 522 con->out_msg->footer.middle_crc = 0; 523 con->out_msg->footer.data_crc = 0; 524 dout("prepare_write_message front_crc %u data_crc %u\n", 525 le32_to_cpu(con->out_msg->footer.front_crc), 526 le32_to_cpu(con->out_msg->footer.middle_crc)); 527 528 /* is there a data payload? */ 529 if (le32_to_cpu(m->hdr.data_len) > 0) { 530 /* initialize page iterator */ 531 con->out_msg_pos.page = 0; 532 con->out_msg_pos.page_pos = 533 le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK; 534 con->out_msg_pos.data_pos = 0; 535 con->out_msg_pos.did_page_crc = 0; 536 con->out_more = 1; /* data + footer will follow */ 537 } else { 538 /* no, queue up footer too and be done */ 539 prepare_write_message_footer(con, v); 540 } 541 542 set_bit(WRITE_PENDING, &con->state); 543} 544 545/* 546 * Prepare an ack. 547 */ 548static void prepare_write_ack(struct ceph_connection *con) 549{ 550 dout("prepare_write_ack %p %llu -> %llu\n", con, 551 con->in_seq_acked, con->in_seq); 552 con->in_seq_acked = con->in_seq; 553 554 con->out_kvec[0].iov_base = &tag_ack; 555 con->out_kvec[0].iov_len = 1; 556 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 557 con->out_kvec[1].iov_base = &con->out_temp_ack; 558 con->out_kvec[1].iov_len = sizeof(con->out_temp_ack); 559 con->out_kvec_left = 2; 560 con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack); 561 con->out_kvec_cur = con->out_kvec; 562 con->out_more = 1; /* more will follow.. eventually.. 
*/ 563 set_bit(WRITE_PENDING, &con->state); 564} 565 566/* 567 * Prepare to write keepalive byte. 568 */ 569static void prepare_write_keepalive(struct ceph_connection *con) 570{ 571 dout("prepare_write_keepalive %p\n", con); 572 con->out_kvec[0].iov_base = &tag_keepalive; 573 con->out_kvec[0].iov_len = 1; 574 con->out_kvec_left = 1; 575 con->out_kvec_bytes = 1; 576 con->out_kvec_cur = con->out_kvec; 577 set_bit(WRITE_PENDING, &con->state); 578} 579 580/* 581 * Connection negotiation. 582 */ 583 584static void prepare_connect_authorizer(struct ceph_connection *con) 585{ 586 void *auth_buf; 587 int auth_len = 0; 588 int auth_protocol = 0; 589 590 mutex_unlock(&con->mutex); 591 if (con->ops->get_authorizer) 592 con->ops->get_authorizer(con, &auth_buf, &auth_len, 593 &auth_protocol, &con->auth_reply_buf, 594 &con->auth_reply_buf_len, 595 con->auth_retry); 596 mutex_lock(&con->mutex); 597 598 con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); 599 con->out_connect.authorizer_len = cpu_to_le32(auth_len); 600 601 con->out_kvec[con->out_kvec_left].iov_base = auth_buf; 602 con->out_kvec[con->out_kvec_left].iov_len = auth_len; 603 con->out_kvec_left++; 604 con->out_kvec_bytes += auth_len; 605} 606 607/* 608 * We connected to a peer and are saying hello. 
609 */ 610static void prepare_write_banner(struct ceph_messenger *msgr, 611 struct ceph_connection *con) 612{ 613 int len = strlen(CEPH_BANNER); 614 615 con->out_kvec[0].iov_base = CEPH_BANNER; 616 con->out_kvec[0].iov_len = len; 617 con->out_kvec[1].iov_base = &msgr->my_enc_addr; 618 con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr); 619 con->out_kvec_left = 2; 620 con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr); 621 con->out_kvec_cur = con->out_kvec; 622 con->out_more = 0; 623 set_bit(WRITE_PENDING, &con->state); 624} 625 626static void prepare_write_connect(struct ceph_messenger *msgr, 627 struct ceph_connection *con, 628 int after_banner) 629{ 630 unsigned global_seq = get_global_seq(con->msgr, 0); 631 int proto; 632 633 switch (con->peer_name.type) { 634 case CEPH_ENTITY_TYPE_MON: 635 proto = CEPH_MONC_PROTOCOL; 636 break; 637 case CEPH_ENTITY_TYPE_OSD: 638 proto = CEPH_OSDC_PROTOCOL; 639 break; 640 case CEPH_ENTITY_TYPE_MDS: 641 proto = CEPH_MDSC_PROTOCOL; 642 break; 643 default: 644 BUG(); 645 } 646 647 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, 648 con->connect_seq, global_seq, proto); 649 650 con->out_connect.features = cpu_to_le64(CEPH_FEATURE_SUPPORTED); 651 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); 652 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); 653 con->out_connect.global_seq = cpu_to_le32(global_seq); 654 con->out_connect.protocol_version = cpu_to_le32(proto); 655 con->out_connect.flags = 0; 656 657 if (!after_banner) { 658 con->out_kvec_left = 0; 659 con->out_kvec_bytes = 0; 660 } 661 con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect; 662 con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect); 663 con->out_kvec_left++; 664 con->out_kvec_bytes += sizeof(con->out_connect); 665 con->out_kvec_cur = con->out_kvec; 666 con->out_more = 0; 667 set_bit(WRITE_PENDING, &con->state); 668 669 prepare_connect_authorizer(con); 670} 671 672 673/* 674 * write as much 
of pending kvecs to the socket as we can. 675 * 1 -> done 676 * 0 -> socket full, but more to do 677 * <0 -> error 678 */ 679static int write_partial_kvec(struct ceph_connection *con) 680{ 681 int ret; 682 683 dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes); 684 while (con->out_kvec_bytes > 0) { 685 ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, 686 con->out_kvec_left, con->out_kvec_bytes, 687 con->out_more); 688 if (ret <= 0) 689 goto out; 690 con->out_kvec_bytes -= ret; 691 if (con->out_kvec_bytes == 0) 692 break; /* done */ 693 while (ret > 0) { 694 if (ret >= con->out_kvec_cur->iov_len) { 695 ret -= con->out_kvec_cur->iov_len; 696 con->out_kvec_cur++; 697 con->out_kvec_left--; 698 } else { 699 con->out_kvec_cur->iov_len -= ret; 700 con->out_kvec_cur->iov_base += ret; 701 ret = 0; 702 break; 703 } 704 } 705 } 706 con->out_kvec_left = 0; 707 con->out_kvec_is_msg = false; 708 ret = 1; 709out: 710 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, 711 con->out_kvec_bytes, con->out_kvec_left, ret); 712 return ret; /* done! */ 713} 714 715/* 716 * Write as much message data payload as we can. If we finish, queue 717 * up the footer. 718 * 1 -> done, footer is now queued in out_kvec[]. 719 * 0 -> socket full, but more to do 720 * <0 -> error 721 */ 722static int write_partial_msg_pages(struct ceph_connection *con) 723{ 724 struct ceph_msg *msg = con->out_msg; 725 unsigned data_len = le32_to_cpu(msg->hdr.data_len); 726 size_t len; 727 int crc = con->msgr->nocrc; 728 int ret; 729 730 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", 731 con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, 732 con->out_msg_pos.page_pos); 733 734 while (con->out_msg_pos.page < con->out_msg->nr_pages) { 735 struct page *page = NULL; 736 void *kaddr = NULL; 737 738 /* 739 * if we are calculating the data crc (the default), we need 740 * to map the page. if our pages[] has been revoked, use the 741 * zero page. 
742 */ 743 if (msg->pages) { 744 page = msg->pages[con->out_msg_pos.page]; 745 if (crc) 746 kaddr = kmap(page); 747 } else if (msg->pagelist) { 748 page = list_first_entry(&msg->pagelist->head, 749 struct page, lru); 750 if (crc) 751 kaddr = kmap(page); 752 } else { 753 page = con->msgr->zero_page; 754 if (crc) 755 kaddr = page_address(con->msgr->zero_page); 756 } 757 len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), 758 (int)(data_len - con->out_msg_pos.data_pos)); 759 if (crc && !con->out_msg_pos.did_page_crc) { 760 void *base = kaddr + con->out_msg_pos.page_pos; 761 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc); 762 763 BUG_ON(kaddr == NULL); 764 con->out_msg->footer.data_crc = 765 cpu_to_le32(crc32c(tmpcrc, base, len)); 766 con->out_msg_pos.did_page_crc = 1; 767 } 768 769 ret = kernel_sendpage(con->sock, page, 770 con->out_msg_pos.page_pos, len, 771 MSG_DONTWAIT | MSG_NOSIGNAL | 772 MSG_MORE); 773 774 if (crc && (msg->pages || msg->pagelist)) 775 kunmap(page); 776 777 if (ret <= 0) 778 goto out; 779 780 con->out_msg_pos.data_pos += ret; 781 con->out_msg_pos.page_pos += ret; 782 if (ret == len) { 783 con->out_msg_pos.page_pos = 0; 784 con->out_msg_pos.page++; 785 con->out_msg_pos.did_page_crc = 0; 786 if (msg->pagelist) 787 list_move_tail(&page->lru, 788 &msg->pagelist->head); 789 } 790 } 791 792 dout("write_partial_msg_pages %p msg %p done\n", con, msg); 793 794 /* prepare and queue up footer, too */ 795 if (!crc) 796 con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; 797 con->out_kvec_bytes = 0; 798 con->out_kvec_left = 0; 799 con->out_kvec_cur = con->out_kvec; 800 prepare_write_message_footer(con, 0); 801 ret = 1; 802out: 803 return ret; 804} 805 806/* 807 * write some zeros 808 */ 809static int write_partial_skip(struct ceph_connection *con) 810{ 811 int ret; 812 813 while (con->out_skip > 0) { 814 struct kvec iov = { 815 .iov_base = page_address(con->msgr->zero_page), 816 .iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE) 817 }; 818 819 
ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1); 820 if (ret <= 0) 821 goto out; 822 con->out_skip -= ret; 823 } 824 ret = 1; 825out: 826 return ret; 827} 828 829/* 830 * Prepare to read connection handshake, or an ack. 831 */ 832static void prepare_read_banner(struct ceph_connection *con) 833{ 834 dout("prepare_read_banner %p\n", con); 835 con->in_base_pos = 0; 836} 837 838static void prepare_read_connect(struct ceph_connection *con) 839{ 840 dout("prepare_read_connect %p\n", con); 841 con->in_base_pos = 0; 842} 843 844static void prepare_read_ack(struct ceph_connection *con) 845{ 846 dout("prepare_read_ack %p\n", con); 847 con->in_base_pos = 0; 848} 849 850static void prepare_read_tag(struct ceph_connection *con) 851{ 852 dout("prepare_read_tag %p\n", con); 853 con->in_base_pos = 0; 854 con->in_tag = CEPH_MSGR_TAG_READY; 855} 856 857/* 858 * Prepare to read a message. 859 */ 860static int prepare_read_message(struct ceph_connection *con) 861{ 862 dout("prepare_read_message %p\n", con); 863 BUG_ON(con->in_msg != NULL); 864 con->in_base_pos = 0; 865 con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; 866 return 0; 867} 868 869 870static int read_partial(struct ceph_connection *con, 871 int *to, int size, void *object) 872{ 873 *to += size; 874 while (con->in_base_pos < *to) { 875 int left = *to - con->in_base_pos; 876 int have = size - left; 877 int ret = ceph_tcp_recvmsg(con->sock, object + have, left); 878 if (ret <= 0) 879 return ret; 880 con->in_base_pos += ret; 881 } 882 return 1; 883} 884 885 886/* 887 * Read all or part of the connect-side handshake on a new connection 888 */ 889static int read_partial_banner(struct ceph_connection *con) 890{ 891 int ret, to = 0; 892 893 dout("read_partial_banner %p at %d\n", con, con->in_base_pos); 894 895 /* peer's banner */ 896 ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner); 897 if (ret <= 0) 898 goto out; 899 ret = read_partial(con, &to, sizeof(con->actual_peer_addr), 900 
&con->actual_peer_addr); 901 if (ret <= 0) 902 goto out; 903 ret = read_partial(con, &to, sizeof(con->peer_addr_for_me), 904 &con->peer_addr_for_me); 905 if (ret <= 0) 906 goto out; 907out: 908 return ret; 909} 910 911static int read_partial_connect(struct ceph_connection *con) 912{ 913 int ret, to = 0; 914 915 dout("read_partial_connect %p at %d\n", con, con->in_base_pos); 916 917 ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply); 918 if (ret <= 0) 919 goto out; 920 ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len), 921 con->auth_reply_buf); 922 if (ret <= 0) 923 goto out; 924 925 dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n", 926 con, (int)con->in_reply.tag, 927 le32_to_cpu(con->in_reply.connect_seq), 928 le32_to_cpu(con->in_reply.global_seq)); 929out: 930 return ret; 931 932} 933 934/* 935 * Verify the hello banner looks okay. 936 */ 937static int verify_hello(struct ceph_connection *con) 938{ 939 if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { 940 pr_err("connect to %s got bad banner\n", 941 pr_addr(&con->peer_addr.in_addr)); 942 con->error_msg = "protocol error, bad banner"; 943 return -1; 944 } 945 return 0; 946} 947 948static bool addr_is_blank(struct sockaddr_storage *ss) 949{ 950 switch (ss->ss_family) { 951 case AF_INET: 952 return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0; 953 case AF_INET6: 954 return 955 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 && 956 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 && 957 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 && 958 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0; 959 } 960 return false; 961} 962 963static int addr_port(struct sockaddr_storage *ss) 964{ 965 switch (ss->ss_family) { 966 case AF_INET: 967 return ntohs(((struct sockaddr_in *)ss)->sin_port); 968 case AF_INET6: 969 return ntohs(((struct sockaddr_in6 *)ss)->sin6_port); 970 } 971 return 0; 972} 973 974static void 
addr_set_port(struct sockaddr_storage *ss, int p) 975{ 976 switch (ss->ss_family) { 977 case AF_INET: 978 ((struct sockaddr_in *)ss)->sin_port = htons(p); 979 case AF_INET6: 980 ((struct sockaddr_in6 *)ss)->sin6_port = htons(p); 981 } 982} 983 984/* 985 * Parse an ip[:port] list into an addr array. Use the default 986 * monitor port if a port isn't specified. 987 */ 988int ceph_parse_ips(const char *c, const char *end, 989 struct ceph_entity_addr *addr, 990 int max_count, int *count) 991{ 992 int i; 993 const char *p = c; 994 995 dout("parse_ips on '%.*s'\n", (int)(end-c), c); 996 for (i = 0; i < max_count; i++) { 997 const char *ipend; 998 struct sockaddr_storage *ss = &addr[i].in_addr; 999 struct sockaddr_in *in4 = (void *)ss; 1000 struct sockaddr_in6 *in6 = (void *)ss; 1001 int port; 1002 char delim = ','; 1003 1004 if (*p == '[') { 1005 delim = ']'; 1006 p++; 1007 } 1008 1009 memset(ss, 0, sizeof(*ss)); 1010 if (in4_pton(p, end - p, (u8 *)&in4->sin_addr.s_addr, 1011 delim, &ipend)) 1012 ss->ss_family = AF_INET; 1013 else if (in6_pton(p, end - p, (u8 *)&in6->sin6_addr.s6_addr, 1014 delim, &ipend)) 1015 ss->ss_family = AF_INET6; 1016 else 1017 goto bad; 1018 p = ipend; 1019 1020 if (delim == ']') { 1021 if (*p != ']') { 1022 dout("missing matching ']'\n"); 1023 goto bad; 1024 } 1025 p++; 1026 } 1027 1028 /* port? 
*/ 1029 if (p < end && *p == ':') { 1030 port = 0; 1031 p++; 1032 while (p < end && *p >= '0' && *p <= '9') { 1033 port = (port * 10) + (*p - '0'); 1034 p++; 1035 } 1036 if (port > 65535 || port == 0) 1037 goto bad; 1038 } else { 1039 port = CEPH_MON_PORT; 1040 } 1041 1042 addr_set_port(ss, port); 1043 1044 dout("parse_ips got %s\n", pr_addr(ss)); 1045 1046 if (p == end) 1047 break; 1048 if (*p != ',') 1049 goto bad; 1050 p++; 1051 } 1052 1053 if (p != end) 1054 goto bad; 1055 1056 if (count) 1057 *count = i + 1; 1058 return 0; 1059 1060bad: 1061 pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c); 1062 return -EINVAL; 1063} 1064 1065static int process_banner(struct ceph_connection *con) 1066{ 1067 dout("process_banner on %p\n", con); 1068 1069 if (verify_hello(con) < 0) 1070 return -1; 1071 1072 ceph_decode_addr(&con->actual_peer_addr); 1073 ceph_decode_addr(&con->peer_addr_for_me); 1074 1075 /* 1076 * Make sure the other end is who we wanted. note that the other 1077 * end may not yet know their ip address, so if it's 0.0.0.0, give 1078 * them the benefit of the doubt. 1079 */ 1080 if (memcmp(&con->peer_addr, &con->actual_peer_addr, 1081 sizeof(con->peer_addr)) != 0 && 1082 !(addr_is_blank(&con->actual_peer_addr.in_addr) && 1083 con->actual_peer_addr.nonce == con->peer_addr.nonce)) { 1084 pr_warning("wrong peer, want %s/%d, got %s/%d\n", 1085 pr_addr(&con->peer_addr.in_addr), 1086 (int)le32_to_cpu(con->peer_addr.nonce), 1087 pr_addr(&con->actual_peer_addr.in_addr), 1088 (int)le32_to_cpu(con->actual_peer_addr.nonce)); 1089 con->error_msg = "wrong peer at address"; 1090 return -1; 1091 } 1092 1093 /* 1094 * did we learn our address? 
1095 */ 1096 if (addr_is_blank(&con->msgr->inst.addr.in_addr)) { 1097 int port = addr_port(&con->msgr->inst.addr.in_addr); 1098 1099 memcpy(&con->msgr->inst.addr.in_addr, 1100 &con->peer_addr_for_me.in_addr, 1101 sizeof(con->peer_addr_for_me.in_addr)); 1102 addr_set_port(&con->msgr->inst.addr.in_addr, port); 1103 encode_my_addr(con->msgr); 1104 dout("process_banner learned my addr is %s\n", 1105 pr_addr(&con->msgr->inst.addr.in_addr)); 1106 } 1107 1108 set_bit(NEGOTIATING, &con->state); 1109 prepare_read_connect(con); 1110 return 0; 1111} 1112 1113static void fail_protocol(struct ceph_connection *con) 1114{ 1115 reset_connection(con); 1116 set_bit(CLOSED, &con->state); /* in case there's queued work */ 1117 1118 mutex_unlock(&con->mutex); 1119 if (con->ops->bad_proto) 1120 con->ops->bad_proto(con); 1121 mutex_lock(&con->mutex); 1122} 1123 1124static int process_connect(struct ceph_connection *con) 1125{ 1126 u64 sup_feat = CEPH_FEATURE_SUPPORTED; 1127 u64 req_feat = CEPH_FEATURE_REQUIRED; 1128 u64 server_feat = le64_to_cpu(con->in_reply.features); 1129 1130 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); 1131 1132 switch (con->in_reply.tag) { 1133 case CEPH_MSGR_TAG_FEATURES: 1134 pr_err("%s%lld %s feature set mismatch," 1135 " my %llx < server's %llx, missing %llx\n", 1136 ENTITY_NAME(con->peer_name), 1137 pr_addr(&con->peer_addr.in_addr), 1138 sup_feat, server_feat, server_feat & ~sup_feat); 1139 con->error_msg = "missing required protocol features"; 1140 fail_protocol(con); 1141 return -1; 1142 1143 case CEPH_MSGR_TAG_BADPROTOVER: 1144 pr_err("%s%lld %s protocol version mismatch," 1145 " my %d != server's %d\n", 1146 ENTITY_NAME(con->peer_name), 1147 pr_addr(&con->peer_addr.in_addr), 1148 le32_to_cpu(con->out_connect.protocol_version), 1149 le32_to_cpu(con->in_reply.protocol_version)); 1150 con->error_msg = "protocol version mismatch"; 1151 fail_protocol(con); 1152 return -1; 1153 1154 case CEPH_MSGR_TAG_BADAUTHORIZER: 1155 con->auth_retry++; 1156 
dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, 1157 con->auth_retry); 1158 if (con->auth_retry == 2) { 1159 con->error_msg = "connect authorization failure"; 1160 reset_connection(con); 1161 set_bit(CLOSED, &con->state); 1162 return -1; 1163 } 1164 con->auth_retry = 1; 1165 prepare_write_connect(con->msgr, con, 0); 1166 prepare_read_connect(con); 1167 break; 1168 1169 case CEPH_MSGR_TAG_RESETSESSION: 1170 /* 1171 * If we connected with a large connect_seq but the peer 1172 * has no record of a session with us (no connection, or 1173 * connect_seq == 0), they will send RESETSESION to indicate 1174 * that they must have reset their session, and may have 1175 * dropped messages. 1176 */ 1177 dout("process_connect got RESET peer seq %u\n", 1178 le32_to_cpu(con->in_connect.connect_seq)); 1179 pr_err("%s%lld %s connection reset\n", 1180 ENTITY_NAME(con->peer_name), 1181 pr_addr(&con->peer_addr.in_addr)); 1182 reset_connection(con); 1183 prepare_write_connect(con->msgr, con, 0); 1184 prepare_read_connect(con); 1185 1186 /* Tell ceph about it. */ 1187 mutex_unlock(&con->mutex); 1188 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name)); 1189 if (con->ops->peer_reset) 1190 con->ops->peer_reset(con); 1191 mutex_lock(&con->mutex); 1192 break; 1193 1194 case CEPH_MSGR_TAG_RETRY_SESSION: 1195 /* 1196 * If we sent a smaller connect_seq than the peer has, try 1197 * again with a larger value. 1198 */ 1199 dout("process_connect got RETRY my seq = %u, peer_seq = %u\n", 1200 le32_to_cpu(con->out_connect.connect_seq), 1201 le32_to_cpu(con->in_connect.connect_seq)); 1202 con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); 1203 prepare_write_connect(con->msgr, con, 0); 1204 prepare_read_connect(con); 1205 break; 1206 1207 case CEPH_MSGR_TAG_RETRY_GLOBAL: 1208 /* 1209 * If we sent a smaller global_seq than the peer has, try 1210 * again with a larger value. 
1211 */ 1212 dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n", 1213 con->peer_global_seq, 1214 le32_to_cpu(con->in_connect.global_seq)); 1215 get_global_seq(con->msgr, 1216 le32_to_cpu(con->in_connect.global_seq)); 1217 prepare_write_connect(con->msgr, con, 0); 1218 prepare_read_connect(con); 1219 break; 1220 1221 case CEPH_MSGR_TAG_READY: 1222 if (req_feat & ~server_feat) { 1223 pr_err("%s%lld %s protocol feature mismatch," 1224 " my required %llx > server's %llx, need %llx\n", 1225 ENTITY_NAME(con->peer_name), 1226 pr_addr(&con->peer_addr.in_addr), 1227 req_feat, server_feat, req_feat & ~server_feat); 1228 con->error_msg = "missing required protocol features"; 1229 fail_protocol(con); 1230 return -1; 1231 } 1232 clear_bit(CONNECTING, &con->state); 1233 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 1234 con->connect_seq++; 1235 con->peer_features = server_feat; 1236 dout("process_connect got READY gseq %d cseq %d (%d)\n", 1237 con->peer_global_seq, 1238 le32_to_cpu(con->in_reply.connect_seq), 1239 con->connect_seq); 1240 WARN_ON(con->connect_seq != 1241 le32_to_cpu(con->in_reply.connect_seq)); 1242 1243 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) 1244 set_bit(LOSSYTX, &con->state); 1245 1246 prepare_read_tag(con); 1247 break; 1248 1249 case CEPH_MSGR_TAG_WAIT: 1250 /* 1251 * If there is a connection race (we are opening 1252 * connections to each other), one of us may just have 1253 * to WAIT. This shouldn't happen if we are the 1254 * client. 
1255 */ 1256 pr_err("process_connect peer connecting WAIT\n"); 1257 1258 default: 1259 pr_err("connect protocol error, will retry\n"); 1260 con->error_msg = "protocol error, garbage tag during connect"; 1261 return -1; 1262 } 1263 return 0; 1264} 1265 1266 1267/* 1268 * read (part of) an ack 1269 */ 1270static int read_partial_ack(struct ceph_connection *con) 1271{ 1272 int to = 0; 1273 1274 return read_partial(con, &to, sizeof(con->in_temp_ack), 1275 &con->in_temp_ack); 1276} 1277 1278 1279/* 1280 * We can finally discard anything that's been acked. 1281 */ 1282static void process_ack(struct ceph_connection *con) 1283{ 1284 struct ceph_msg *m; 1285 u64 ack = le64_to_cpu(con->in_temp_ack); 1286 u64 seq; 1287 1288 while (!list_empty(&con->out_sent)) { 1289 m = list_first_entry(&con->out_sent, struct ceph_msg, 1290 list_head); 1291 seq = le64_to_cpu(m->hdr.seq); 1292 if (seq > ack) 1293 break; 1294 dout("got ack for seq %llu type %d at %p\n", seq, 1295 le16_to_cpu(m->hdr.type), m); 1296 ceph_msg_remove(m); 1297 } 1298 prepare_read_tag(con); 1299} 1300 1301 1302 1303 1304static int read_partial_message_section(struct ceph_connection *con, 1305 struct kvec *section, 1306 unsigned int sec_len, u32 *crc) 1307{ 1308 int left; 1309 int ret; 1310 1311 BUG_ON(!section); 1312 1313 while (section->iov_len < sec_len) { 1314 BUG_ON(section->iov_base == NULL); 1315 left = sec_len - section->iov_len; 1316 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + 1317 section->iov_len, left); 1318 if (ret <= 0) 1319 return ret; 1320 section->iov_len += ret; 1321 if (section->iov_len == sec_len) 1322 *crc = crc32c(0, section->iov_base, 1323 section->iov_len); 1324 } 1325 1326 return 1; 1327} 1328 1329static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 1330 struct ceph_msg_header *hdr, 1331 int *skip); 1332/* 1333 * read (part of) a message. 
1334 */ 1335static int read_partial_message(struct ceph_connection *con) 1336{ 1337 struct ceph_msg *m = con->in_msg; 1338 void *p; 1339 int ret; 1340 int to, left; 1341 unsigned front_len, middle_len, data_len, data_off; 1342 int datacrc = con->msgr->nocrc; 1343 int skip; 1344 u64 seq; 1345 1346 dout("read_partial_message con %p msg %p\n", con, m); 1347 1348 /* header */ 1349 while (con->in_base_pos < sizeof(con->in_hdr)) { 1350 left = sizeof(con->in_hdr) - con->in_base_pos; 1351 ret = ceph_tcp_recvmsg(con->sock, 1352 (char *)&con->in_hdr + con->in_base_pos, 1353 left); 1354 if (ret <= 0) 1355 return ret; 1356 con->in_base_pos += ret; 1357 if (con->in_base_pos == sizeof(con->in_hdr)) { 1358 u32 crc = crc32c(0, (void *)&con->in_hdr, 1359 sizeof(con->in_hdr) - sizeof(con->in_hdr.crc)); 1360 if (crc != le32_to_cpu(con->in_hdr.crc)) { 1361 pr_err("read_partial_message bad hdr " 1362 " crc %u != expected %u\n", 1363 crc, con->in_hdr.crc); 1364 return -EBADMSG; 1365 } 1366 } 1367 } 1368 front_len = le32_to_cpu(con->in_hdr.front_len); 1369 if (front_len > CEPH_MSG_MAX_FRONT_LEN) 1370 return -EIO; 1371 middle_len = le32_to_cpu(con->in_hdr.middle_len); 1372 if (middle_len > CEPH_MSG_MAX_DATA_LEN) 1373 return -EIO; 1374 data_len = le32_to_cpu(con->in_hdr.data_len); 1375 if (data_len > CEPH_MSG_MAX_DATA_LEN) 1376 return -EIO; 1377 data_off = le16_to_cpu(con->in_hdr.data_off); 1378 1379 /* verify seq# */ 1380 seq = le64_to_cpu(con->in_hdr.seq); 1381 if ((s64)seq - (s64)con->in_seq < 1) { 1382 pr_info("skipping %s%lld %s seq %lld, expected %lld\n", 1383 ENTITY_NAME(con->peer_name), 1384 pr_addr(&con->peer_addr.in_addr), 1385 seq, con->in_seq + 1); 1386 con->in_base_pos = -front_len - middle_len - data_len - 1387 sizeof(m->footer); 1388 con->in_tag = CEPH_MSGR_TAG_READY; 1389 con->in_seq++; 1390 return 0; 1391 } else if ((s64)seq - (s64)con->in_seq > 1) { 1392 pr_err("read_partial_message bad seq %lld expected %lld\n", 1393 seq, con->in_seq + 1); 1394 con->error_msg = "bad 
message sequence # for incoming message"; 1395 return -EBADMSG; 1396 } 1397 1398 /* allocate message? */ 1399 if (!con->in_msg) { 1400 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, 1401 con->in_hdr.front_len, con->in_hdr.data_len); 1402 skip = 0; 1403 con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); 1404 if (skip) { 1405 /* skip this message */ 1406 dout("alloc_msg said skip message\n"); 1407 BUG_ON(con->in_msg); 1408 con->in_base_pos = -front_len - middle_len - data_len - 1409 sizeof(m->footer); 1410 con->in_tag = CEPH_MSGR_TAG_READY; 1411 con->in_seq++; 1412 return 0; 1413 } 1414 if (!con->in_msg) { 1415 con->error_msg = 1416 "error allocating memory for incoming message"; 1417 return -ENOMEM; 1418 } 1419 m = con->in_msg; 1420 m->front.iov_len = 0; /* haven't read it yet */ 1421 if (m->middle) 1422 m->middle->vec.iov_len = 0; 1423 1424 con->in_msg_pos.page = 0; 1425 con->in_msg_pos.page_pos = data_off & ~PAGE_MASK; 1426 con->in_msg_pos.data_pos = 0; 1427 } 1428 1429 /* front */ 1430 ret = read_partial_message_section(con, &m->front, front_len, 1431 &con->in_front_crc); 1432 if (ret <= 0) 1433 return ret; 1434 1435 /* middle */ 1436 if (m->middle) { 1437 ret = read_partial_message_section(con, &m->middle->vec, 1438 middle_len, 1439 &con->in_middle_crc); 1440 if (ret <= 0) 1441 return ret; 1442 } 1443 1444 /* (page) data */ 1445 while (con->in_msg_pos.data_pos < data_len) { 1446 left = min((int)(data_len - con->in_msg_pos.data_pos), 1447 (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); 1448 BUG_ON(m->pages == NULL); 1449 p = kmap(m->pages[con->in_msg_pos.page]); 1450 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, 1451 left); 1452 if (ret > 0 && datacrc) 1453 con->in_data_crc = 1454 crc32c(con->in_data_crc, 1455 p + con->in_msg_pos.page_pos, ret); 1456 kunmap(m->pages[con->in_msg_pos.page]); 1457 if (ret <= 0) 1458 return ret; 1459 con->in_msg_pos.data_pos += ret; 1460 con->in_msg_pos.page_pos += ret; 1461 if 
(con->in_msg_pos.page_pos == PAGE_SIZE) { 1462 con->in_msg_pos.page_pos = 0; 1463 con->in_msg_pos.page++; 1464 } 1465 } 1466 1467 /* footer */ 1468 to = sizeof(m->hdr) + sizeof(m->footer); 1469 while (con->in_base_pos < to) { 1470 left = to - con->in_base_pos; 1471 ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer + 1472 (con->in_base_pos - sizeof(m->hdr)), 1473 left); 1474 if (ret <= 0) 1475 return ret; 1476 con->in_base_pos += ret; 1477 } 1478 dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", 1479 m, front_len, m->footer.front_crc, middle_len, 1480 m->footer.middle_crc, data_len, m->footer.data_crc); 1481 1482 /* crc ok? */ 1483 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { 1484 pr_err("read_partial_message %p front crc %u != exp. %u\n", 1485 m, con->in_front_crc, m->footer.front_crc); 1486 return -EBADMSG; 1487 } 1488 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { 1489 pr_err("read_partial_message %p middle crc %u != exp %u\n", 1490 m, con->in_middle_crc, m->footer.middle_crc); 1491 return -EBADMSG; 1492 } 1493 if (datacrc && 1494 (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 && 1495 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { 1496 pr_err("read_partial_message %p data crc %u != exp. %u\n", m, 1497 con->in_data_crc, le32_to_cpu(m->footer.data_crc)); 1498 return -EBADMSG; 1499 } 1500 1501 return 1; /* done! */ 1502} 1503 1504/* 1505 * Process message. This happens in the worker thread. The callback should 1506 * be careful not to do anything that waits on other incoming messages or it 1507 * may deadlock. 
1508 */ 1509static void process_message(struct ceph_connection *con) 1510{ 1511 struct ceph_msg *msg; 1512 1513 msg = con->in_msg; 1514 con->in_msg = NULL; 1515 1516 /* if first message, set peer_name */ 1517 if (con->peer_name.type == 0) 1518 con->peer_name = msg->hdr.src; 1519 1520 con->in_seq++; 1521 mutex_unlock(&con->mutex); 1522 1523 dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", 1524 msg, le64_to_cpu(msg->hdr.seq), 1525 ENTITY_NAME(msg->hdr.src), 1526 le16_to_cpu(msg->hdr.type), 1527 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), 1528 le32_to_cpu(msg->hdr.front_len), 1529 le32_to_cpu(msg->hdr.data_len), 1530 con->in_front_crc, con->in_middle_crc, con->in_data_crc); 1531 con->ops->dispatch(con, msg); 1532 1533 mutex_lock(&con->mutex); 1534 prepare_read_tag(con); 1535} 1536 1537 1538/* 1539 * Write something to the socket. Called in a worker thread when the 1540 * socket appears to be writeable and we have something ready to send. 1541 */ 1542static int try_write(struct ceph_connection *con) 1543{ 1544 struct ceph_messenger *msgr = con->msgr; 1545 int ret = 1; 1546 1547 dout("try_write start %p state %lu nref %d\n", con, con->state, 1548 atomic_read(&con->nref)); 1549 1550more: 1551 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); 1552 1553 /* open the socket first? */ 1554 if (con->sock == NULL) { 1555 /* 1556 * if we were STANDBY and are reconnecting _this_ 1557 * connection, bump connect_seq now. Always bump 1558 * global_seq. 
1559 */ 1560 if (test_and_clear_bit(STANDBY, &con->state)) 1561 con->connect_seq++; 1562 1563 prepare_write_banner(msgr, con); 1564 prepare_write_connect(msgr, con, 1); 1565 prepare_read_banner(con); 1566 set_bit(CONNECTING, &con->state); 1567 clear_bit(NEGOTIATING, &con->state); 1568 1569 BUG_ON(con->in_msg); 1570 con->in_tag = CEPH_MSGR_TAG_READY; 1571 dout("try_write initiating connect on %p new state %lu\n", 1572 con, con->state); 1573 con->sock = ceph_tcp_connect(con); 1574 if (IS_ERR(con->sock)) { 1575 con->sock = NULL; 1576 con->error_msg = "connect error"; 1577 ret = -1; 1578 goto out; 1579 } 1580 } 1581 1582more_kvec: 1583 /* kvec data queued? */ 1584 if (con->out_skip) { 1585 ret = write_partial_skip(con); 1586 if (ret <= 0) 1587 goto done; 1588 if (ret < 0) { 1589 dout("try_write write_partial_skip err %d\n", ret); 1590 goto done; 1591 } 1592 } 1593 if (con->out_kvec_left) { 1594 ret = write_partial_kvec(con); 1595 if (ret <= 0) 1596 goto done; 1597 } 1598 1599 /* msg pages? */ 1600 if (con->out_msg) { 1601 if (con->out_msg_done) { 1602 ceph_msg_put(con->out_msg); 1603 con->out_msg = NULL; /* we're done with this one */ 1604 goto do_next; 1605 } 1606 1607 ret = write_partial_msg_pages(con); 1608 if (ret == 1) 1609 goto more_kvec; /* we need to send the footer, too! */ 1610 if (ret == 0) 1611 goto done; 1612 if (ret < 0) { 1613 dout("try_write write_partial_msg_pages err %d\n", 1614 ret); 1615 goto done; 1616 } 1617 } 1618 1619do_next: 1620 if (!test_bit(CONNECTING, &con->state)) { 1621 /* is anything else pending? */ 1622 if (!list_empty(&con->out_queue)) { 1623 prepare_write_message(con); 1624 goto more; 1625 } 1626 if (con->in_seq > con->in_seq_acked) { 1627 prepare_write_ack(con); 1628 goto more; 1629 } 1630 if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) { 1631 prepare_write_keepalive(con); 1632 goto more; 1633 } 1634 } 1635 1636 /* Nothing to do! 
*/ 1637 clear_bit(WRITE_PENDING, &con->state); 1638 dout("try_write nothing else to write.\n"); 1639done: 1640 ret = 0; 1641out: 1642 dout("try_write done on %p\n", con); 1643 return ret; 1644} 1645 1646 1647 1648/* 1649 * Read what we can from the socket. 1650 */ 1651static int try_read(struct ceph_connection *con) 1652{ 1653 int ret = -1; 1654 1655 if (!con->sock) 1656 return 0; 1657 1658 if (test_bit(STANDBY, &con->state)) 1659 return 0; 1660 1661 dout("try_read start on %p\n", con); 1662 1663more: 1664 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 1665 con->in_base_pos); 1666 if (test_bit(CONNECTING, &con->state)) { 1667 if (!test_bit(NEGOTIATING, &con->state)) { 1668 dout("try_read connecting\n"); 1669 ret = read_partial_banner(con); 1670 if (ret <= 0) 1671 goto done; 1672 if (process_banner(con) < 0) { 1673 ret = -1; 1674 goto out; 1675 } 1676 } 1677 ret = read_partial_connect(con); 1678 if (ret <= 0) 1679 goto done; 1680 if (process_connect(con) < 0) { 1681 ret = -1; 1682 goto out; 1683 } 1684 goto more; 1685 } 1686 1687 if (con->in_base_pos < 0) { 1688 static char buf[1024]; 1689 int skip = min(1024, -con->in_base_pos); 1690 dout("skipping %d / %d bytes\n", skip, -con->in_base_pos); 1691 ret = ceph_tcp_recvmsg(con->sock, buf, skip); 1692 if (ret <= 0) 1693 goto done; 1694 con->in_base_pos += ret; 1695 if (con->in_base_pos) 1696 goto more; 1697 } 1698 if (con->in_tag == CEPH_MSGR_TAG_READY) { 1699 /* 1700 * what's next? 
1701 */ 1702 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); 1703 if (ret <= 0) 1704 goto done; 1705 dout("try_read got tag %d\n", (int)con->in_tag); 1706 switch (con->in_tag) { 1707 case CEPH_MSGR_TAG_MSG: 1708 prepare_read_message(con); 1709 break; 1710 case CEPH_MSGR_TAG_ACK: 1711 prepare_read_ack(con); 1712 break; 1713 case CEPH_MSGR_TAG_CLOSE: 1714 set_bit(CLOSED, &con->state); 1715 goto done; 1716 default: 1717 goto bad_tag; 1718 } 1719 } 1720 if (con->in_tag == CEPH_MSGR_TAG_MSG) { 1721 ret = read_partial_message(con); 1722 if (ret <= 0) { 1723 switch (ret) { 1724 case -EBADMSG: 1725 con->error_msg = "bad crc"; 1726 ret = -EIO; 1727 goto out; 1728 case -EIO: 1729 con->error_msg = "io error"; 1730 goto out; 1731 default: 1732 goto done; 1733 } 1734 } 1735 if (con->in_tag == CEPH_MSGR_TAG_READY) 1736 goto more; 1737 process_message(con); 1738 goto more; 1739 } 1740 if (con->in_tag == CEPH_MSGR_TAG_ACK) { 1741 ret = read_partial_ack(con); 1742 if (ret <= 0) 1743 goto done; 1744 process_ack(con); 1745 goto more; 1746 } 1747 1748done: 1749 ret = 0; 1750out: 1751 dout("try_read done on %p\n", con); 1752 return ret; 1753 1754bad_tag: 1755 pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag); 1756 con->error_msg = "protocol error, garbage tag"; 1757 ret = -1; 1758 goto out; 1759} 1760 1761 1762/* 1763 * Atomically queue work on a connection. Bump @con reference to 1764 * avoid races with connection teardown. 1765 * 1766 * There is some trickery going on with QUEUED and BUSY because we 1767 * only want a _single_ thread operating on each connection at any 1768 * point in time, but we want to use all available CPUs. 1769 * 1770 * The worker thread only proceeds if it can atomically set BUSY. It 1771 * clears QUEUED and does it's thing. When it thinks it's done, it 1772 * clears BUSY, then rechecks QUEUED.. if it's set again, it loops 1773 * (tries again to set BUSY). 
1774 * 1775 * To queue work, we first set QUEUED, _then_ if BUSY isn't set, we 1776 * try to queue work. If that fails (work is already queued, or BUSY) 1777 * we give up (work also already being done or is queued) but leave QUEUED 1778 * set so that the worker thread will loop if necessary. 1779 */ 1780static void queue_con(struct ceph_connection *con) 1781{ 1782 if (test_bit(DEAD, &con->state)) { 1783 dout("queue_con %p ignoring: DEAD\n", 1784 con); 1785 return; 1786 } 1787 1788 if (!con->ops->get(con)) { 1789 dout("queue_con %p ref count 0\n", con); 1790 return; 1791 } 1792 1793 set_bit(QUEUED, &con->state); 1794 if (test_bit(BUSY, &con->state)) { 1795 dout("queue_con %p - already BUSY\n", con); 1796 con->ops->put(con); 1797 } else if (!queue_work(ceph_msgr_wq, &con->work.work)) { 1798 dout("queue_con %p - already queued\n", con); 1799 con->ops->put(con); 1800 } else { 1801 dout("queue_con %p\n", con); 1802 } 1803} 1804 1805/* 1806 * Do some work on a connection. Drop a connection ref when we're done. 1807 */ 1808static void con_work(struct work_struct *work) 1809{ 1810 struct ceph_connection *con = container_of(work, struct ceph_connection, 1811 work.work); 1812 int backoff = 0; 1813 1814more: 1815 if (test_and_set_bit(BUSY, &con->state) != 0) { 1816 dout("con_work %p BUSY already set\n", con); 1817 goto out; 1818 } 1819 dout("con_work %p start, clearing QUEUED\n", con); 1820 clear_bit(QUEUED, &con->state); 1821 1822 mutex_lock(&con->mutex); 1823 1824 if (test_bit(CLOSED, &con->state)) { /* e.g. 
if we are replaced */ 1825 dout("con_work CLOSED\n"); 1826 con_close_socket(con); 1827 goto done; 1828 } 1829 if (test_and_clear_bit(OPENING, &con->state)) { 1830 /* reopen w/ new peer */ 1831 dout("con_work OPENING\n"); 1832 con_close_socket(con); 1833 } 1834 1835 if (test_and_clear_bit(SOCK_CLOSED, &con->state) || 1836 try_read(con) < 0 || 1837 try_write(con) < 0) { 1838 mutex_unlock(&con->mutex); 1839 backoff = 1; 1840 ceph_fault(con); /* error/fault path */ 1841 goto done_unlocked; 1842 } 1843 1844done: 1845 mutex_unlock(&con->mutex); 1846 1847done_unlocked: 1848 clear_bit(BUSY, &con->state); 1849 dout("con->state=%lu\n", con->state); 1850 if (test_bit(QUEUED, &con->state)) { 1851 if (!backoff || test_bit(OPENING, &con->state)) { 1852 dout("con_work %p QUEUED reset, looping\n", con); 1853 goto more; 1854 } 1855 dout("con_work %p QUEUED reset, but just faulted\n", con); 1856 clear_bit(QUEUED, &con->state); 1857 } 1858 dout("con_work %p done\n", con); 1859 1860out: 1861 con->ops->put(con); 1862} 1863 1864 1865/* 1866 * Generic error/fault handler. A retry mechanism is used with 1867 * exponential backoff 1868 */ 1869static void ceph_fault(struct ceph_connection *con) 1870{ 1871 pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 1872 pr_addr(&con->peer_addr.in_addr), con->error_msg); 1873 dout("fault %p state %lu to peer %s\n", 1874 con, con->state, pr_addr(&con->peer_addr.in_addr)); 1875 1876 if (test_bit(LOSSYTX, &con->state)) { 1877 dout("fault on LOSSYTX channel\n"); 1878 goto out; 1879 } 1880 1881 mutex_lock(&con->mutex); 1882 if (test_bit(CLOSED, &con->state)) 1883 goto out_unlock; 1884 1885 con_close_socket(con); 1886 1887 if (con->in_msg) { 1888 ceph_msg_put(con->in_msg); 1889 con->in_msg = NULL; 1890 } 1891 1892 /* Requeue anything that hasn't been acked */ 1893 list_splice_init(&con->out_sent, &con->out_queue); 1894 1895 /* If there are no messages in the queue, place the connection 1896 * in a STANDBY state (i.e., don't try to reconnect just yet). 
*/ 1897 if (list_empty(&con->out_queue) && !con->out_keepalive_pending) { 1898 dout("fault setting STANDBY\n"); 1899 set_bit(STANDBY, &con->state); 1900 } else { 1901 /* retry after a delay. */ 1902 if (con->delay == 0) 1903 con->delay = BASE_DELAY_INTERVAL; 1904 else if (con->delay < MAX_DELAY_INTERVAL) 1905 con->delay *= 2; 1906 dout("fault queueing %p delay %lu\n", con, con->delay); 1907 con->ops->get(con); 1908 if (queue_delayed_work(ceph_msgr_wq, &con->work, 1909 round_jiffies_relative(con->delay)) == 0) 1910 con->ops->put(con); 1911 } 1912 1913out_unlock: 1914 mutex_unlock(&con->mutex); 1915out: 1916 /* 1917 * in case we faulted due to authentication, invalidate our 1918 * current tickets so that we can get new ones. 1919 */ 1920 if (con->auth_retry && con->ops->invalidate_authorizer) { 1921 dout("calling invalidate_authorizer()\n"); 1922 con->ops->invalidate_authorizer(con); 1923 } 1924 1925 if (con->ops->fault) 1926 con->ops->fault(con); 1927} 1928 1929 1930 1931/* 1932 * create a new messenger instance 1933 */ 1934struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr) 1935{ 1936 struct ceph_messenger *msgr; 1937 1938 msgr = kzalloc(sizeof(*msgr), GFP_KERNEL); 1939 if (msgr == NULL) 1940 return ERR_PTR(-ENOMEM); 1941 1942 spin_lock_init(&msgr->global_seq_lock); 1943 1944 /* the zero page is needed if a request is "canceled" while the message 1945 * is being written over the socket */ 1946 msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO); 1947 if (!msgr->zero_page) { 1948 kfree(msgr); 1949 return ERR_PTR(-ENOMEM); 1950 } 1951 kmap(msgr->zero_page); 1952 1953 if (myaddr) 1954 msgr->inst.addr = *myaddr; 1955 1956 /* select a random nonce */ 1957 msgr->inst.addr.type = 0; 1958 get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); 1959 encode_my_addr(msgr); 1960 1961 dout("messenger_create %p\n", msgr); 1962 return msgr; 1963} 1964 1965void ceph_messenger_destroy(struct ceph_messenger *msgr) 1966{ 1967 
dout("destroy %p\n", msgr); 1968 kunmap(msgr->zero_page); 1969 __free_page(msgr->zero_page); 1970 kfree(msgr); 1971 dout("destroyed messenger %p\n", msgr); 1972} 1973 1974/* 1975 * Queue up an outgoing message on the given connection. 1976 */ 1977void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) 1978{ 1979 if (test_bit(CLOSED, &con->state)) { 1980 dout("con_send %p closed, dropping %p\n", con, msg); 1981 ceph_msg_put(msg); 1982 return; 1983 } 1984 1985 /* set src+dst */ 1986 msg->hdr.src = con->msgr->inst.name; 1987 1988 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); 1989 1990 msg->needs_out_seq = true; 1991 1992 /* queue */ 1993 mutex_lock(&con->mutex); 1994 BUG_ON(!list_empty(&msg->list_head)); 1995 list_add_tail(&msg->list_head, &con->out_queue); 1996 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, 1997 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type), 1998 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), 1999 le32_to_cpu(msg->hdr.front_len), 2000 le32_to_cpu(msg->hdr.middle_len), 2001 le32_to_cpu(msg->hdr.data_len)); 2002 mutex_unlock(&con->mutex); 2003 2004 /* if there wasn't anything waiting to send before, queue 2005 * new work */ 2006 if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2007 queue_con(con); 2008} 2009 2010/* 2011 * Revoke a message that was previously queued for send 2012 */ 2013void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) 2014{ 2015 mutex_lock(&con->mutex); 2016 if (!list_empty(&msg->list_head)) { 2017 dout("con_revoke %p msg %p - was on queue\n", con, msg); 2018 list_del_init(&msg->list_head); 2019 ceph_msg_put(msg); 2020 msg->hdr.seq = 0; 2021 } 2022 if (con->out_msg == msg) { 2023 dout("con_revoke %p msg %p - was sending\n", con, msg); 2024 con->out_msg = NULL; 2025 if (con->out_kvec_is_msg) { 2026 con->out_skip = con->out_kvec_bytes; 2027 con->out_kvec_is_msg = false; 2028 } 2029 ceph_msg_put(msg); 2030 msg->hdr.seq = 0; 2031 } 2032 
mutex_unlock(&con->mutex); 2033} 2034 2035/* 2036 * Revoke a message that we may be reading data into 2037 */ 2038void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) 2039{ 2040 mutex_lock(&con->mutex); 2041 if (con->in_msg && con->in_msg == msg) { 2042 unsigned front_len = le32_to_cpu(con->in_hdr.front_len); 2043 unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len); 2044 unsigned data_len = le32_to_cpu(con->in_hdr.data_len); 2045 2046 /* skip rest of message */ 2047 dout("con_revoke_pages %p msg %p revoked\n", con, msg); 2048 con->in_base_pos = con->in_base_pos - 2049 sizeof(struct ceph_msg_header) - 2050 front_len - 2051 middle_len - 2052 data_len - 2053 sizeof(struct ceph_msg_footer); 2054 ceph_msg_put(con->in_msg); 2055 con->in_msg = NULL; 2056 con->in_tag = CEPH_MSGR_TAG_READY; 2057 con->in_seq++; 2058 } else { 2059 dout("con_revoke_pages %p msg %p pages %p no-op\n", 2060 con, con->in_msg, msg); 2061 } 2062 mutex_unlock(&con->mutex); 2063} 2064 2065/* 2066 * Queue a keepalive byte to ensure the tcp connection is alive. 2067 */ 2068void ceph_con_keepalive(struct ceph_connection *con) 2069{ 2070 if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && 2071 test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2072 queue_con(con); 2073} 2074 2075 2076/* 2077 * construct a new message with given type, size 2078 * the new msg has a ref count of 1. 
2079 */ 2080struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags) 2081{ 2082 struct ceph_msg *m; 2083 2084 m = kmalloc(sizeof(*m), flags); 2085 if (m == NULL) 2086 goto out; 2087 kref_init(&m->kref); 2088 INIT_LIST_HEAD(&m->list_head); 2089 2090 m->hdr.tid = 0; 2091 m->hdr.type = cpu_to_le16(type); 2092 m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); 2093 m->hdr.version = 0; 2094 m->hdr.front_len = cpu_to_le32(front_len); 2095 m->hdr.middle_len = 0; 2096 m->hdr.data_len = 0; 2097 m->hdr.data_off = 0; 2098 m->hdr.reserved = 0; 2099 m->footer.front_crc = 0; 2100 m->footer.middle_crc = 0; 2101 m->footer.data_crc = 0; 2102 m->footer.flags = 0; 2103 m->front_max = front_len; 2104 m->front_is_vmalloc = false; 2105 m->more_to_follow = false; 2106 m->pool = NULL; 2107 2108 /* front */ 2109 if (front_len) { 2110 if (front_len > PAGE_CACHE_SIZE) { 2111 m->front.iov_base = __vmalloc(front_len, flags, 2112 PAGE_KERNEL); 2113 m->front_is_vmalloc = true; 2114 } else { 2115 m->front.iov_base = kmalloc(front_len, flags); 2116 } 2117 if (m->front.iov_base == NULL) { 2118 pr_err("msg_new can't allocate %d bytes\n", 2119 front_len); 2120 goto out2; 2121 } 2122 } else { 2123 m->front.iov_base = NULL; 2124 } 2125 m->front.iov_len = front_len; 2126 2127 /* middle */ 2128 m->middle = NULL; 2129 2130 /* data */ 2131 m->nr_pages = 0; 2132 m->pages = NULL; 2133 m->pagelist = NULL; 2134 2135 dout("ceph_msg_new %p front %d\n", m, front_len); 2136 return m; 2137 2138out2: 2139 ceph_msg_put(m); 2140out: 2141 pr_err("msg_new can't create type %d front %d\n", type, front_len); 2142 return NULL; 2143} 2144 2145/* 2146 * Allocate "middle" portion of a message, if it is needed and wasn't 2147 * allocated by alloc_msg. This allows us to read a small fixed-size 2148 * per-type header in the front and then gracefully fail (i.e., 2149 * propagate the error to the caller based on info in the front) when 2150 * the middle is too large. 
2151 */ 2152static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) 2153{ 2154 int type = le16_to_cpu(msg->hdr.type); 2155 int middle_len = le32_to_cpu(msg->hdr.middle_len); 2156 2157 dout("alloc_middle %p type %d %s middle_len %d\n", msg, type, 2158 ceph_msg_type_name(type), middle_len); 2159 BUG_ON(!middle_len); 2160 BUG_ON(msg->middle); 2161 2162 msg->middle = ceph_buffer_new(middle_len, GFP_NOFS); 2163 if (!msg->middle) 2164 return -ENOMEM; 2165 return 0; 2166} 2167 2168/* 2169 * Generic message allocator, for incoming messages. 2170 */ 2171static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 2172 struct ceph_msg_header *hdr, 2173 int *skip) 2174{ 2175 int type = le16_to_cpu(hdr->type); 2176 int front_len = le32_to_cpu(hdr->front_len); 2177 int middle_len = le32_to_cpu(hdr->middle_len); 2178 struct ceph_msg *msg = NULL; 2179 int ret; 2180 2181 if (con->ops->alloc_msg) { 2182 mutex_unlock(&con->mutex); 2183 msg = con->ops->alloc_msg(con, hdr, skip); 2184 mutex_lock(&con->mutex); 2185 if (!msg || *skip) 2186 return NULL; 2187 } 2188 if (!msg) { 2189 *skip = 0; 2190 msg = ceph_msg_new(type, front_len, GFP_NOFS); 2191 if (!msg) { 2192 pr_err("unable to allocate msg type %d len %d\n", 2193 type, front_len); 2194 return NULL; 2195 } 2196 } 2197 memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); 2198 2199 if (middle_len && !msg->middle) { 2200 ret = ceph_alloc_middle(con, msg); 2201 if (ret < 0) { 2202 ceph_msg_put(msg); 2203 return NULL; 2204 } 2205 } 2206 2207 return msg; 2208} 2209 2210 2211/* 2212 * Free a generically kmalloc'd message. 2213 */ 2214void ceph_msg_kfree(struct ceph_msg *m) 2215{ 2216 dout("msg_kfree %p\n", m); 2217 if (m->front_is_vmalloc) 2218 vfree(m->front.iov_base); 2219 else 2220 kfree(m->front.iov_base); 2221 kfree(m); 2222} 2223 2224/* 2225 * Drop a msg ref. Destroy as needed. 
2226 */ 2227void ceph_msg_last_put(struct kref *kref) 2228{ 2229 struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); 2230 2231 dout("ceph_msg_put last one on %p\n", m); 2232 WARN_ON(!list_empty(&m->list_head)); 2233 2234 /* drop middle, data, if any */ 2235 if (m->middle) { 2236 ceph_buffer_put(m->middle); 2237 m->middle = NULL; 2238 } 2239 m->nr_pages = 0; 2240 m->pages = NULL; 2241 2242 if (m->pagelist) { 2243 ceph_pagelist_release(m->pagelist); 2244 kfree(m->pagelist); 2245 m->pagelist = NULL; 2246 } 2247 2248 if (m->pool) 2249 ceph_msgpool_put(m->pool, m); 2250 else 2251 ceph_msg_kfree(m); 2252} 2253 2254void ceph_msg_dump(struct ceph_msg *msg) 2255{ 2256 pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg, 2257 msg->front_max, msg->nr_pages); 2258 print_hex_dump(KERN_DEBUG, "header: ", 2259 DUMP_PREFIX_OFFSET, 16, 1, 2260 &msg->hdr, sizeof(msg->hdr), true); 2261 print_hex_dump(KERN_DEBUG, " front: ", 2262 DUMP_PREFIX_OFFSET, 16, 1, 2263 msg->front.iov_base, msg->front.iov_len, true); 2264 if (msg->middle) 2265 print_hex_dump(KERN_DEBUG, "middle: ", 2266 DUMP_PREFIX_OFFSET, 16, 1, 2267 msg->middle->vec.iov_base, 2268 msg->middle->vec.iov_len, true); 2269 print_hex_dump(KERN_DEBUG, "footer: ", 2270 DUMP_PREFIX_OFFSET, 16, 1, 2271 &msg->footer, sizeof(msg->footer), true); 2272} 2273