Lines matching refs: con

100 void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
104 clear_bit(con_flag, &con->flags);
107 void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag)
111 set_bit(con_flag, &con->flags);
114 bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag)
118 return test_bit(con_flag, &con->flags);
121 bool ceph_con_flag_test_and_clear(struct ceph_connection *con,
126 return test_and_clear_bit(con_flag, &con->flags);
129 bool ceph_con_flag_test_and_set(struct ceph_connection *con,
134 return test_and_set_bit(con_flag, &con->flags);
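
The five ceph_con_flag_* helpers above are thin wrappers around the kernel's atomic bit operations on con->flags. A minimal userspace sketch of the same pattern, using GCC/Clang atomic builtins in place of set_bit()/clear_bit(); every name below is hypothetical and not part of the Ceph code:

    #include <stdbool.h>
    #include <stdio.h>

    /* hypothetical stand-in for the flags word in struct ceph_connection */
    struct conn {
        unsigned long flags;
    };

    static void conn_flag_set(struct conn *c, unsigned long bit)
    {
        __atomic_fetch_or(&c->flags, 1UL << bit, __ATOMIC_SEQ_CST);
    }

    static void conn_flag_clear(struct conn *c, unsigned long bit)
    {
        __atomic_fetch_and(&c->flags, ~(1UL << bit), __ATOMIC_SEQ_CST);
    }

    static bool conn_flag_test(struct conn *c, unsigned long bit)
    {
        return __atomic_load_n(&c->flags, __ATOMIC_SEQ_CST) & (1UL << bit);
    }

    static bool conn_flag_test_and_set(struct conn *c, unsigned long bit)
    {
        return __atomic_fetch_or(&c->flags, 1UL << bit,
                                 __ATOMIC_SEQ_CST) & (1UL << bit);
    }

    static bool conn_flag_test_and_clear(struct conn *c, unsigned long bit)
    {
        return __atomic_fetch_and(&c->flags, ~(1UL << bit),
                                  __ATOMIC_SEQ_CST) & (1UL << bit);
    }

    int main(void)
    {
        struct conn c = { 0 };
        enum { F_WRITE_PENDING = 0 };   /* hypothetical flag bit */

        /* the first caller sees the flag clear, the second sees it set */
        int first = conn_flag_test_and_set(&c, F_WRITE_PENDING);
        int second = conn_flag_test_and_set(&c, F_WRITE_PENDING);

        printf("%d %d\n", first, second);               /* 0 1 */
        conn_flag_clear(&c, F_WRITE_PENDING);
        printf("%d\n", conn_flag_test(&c, F_WRITE_PENDING));   /* 0 */
        return 0;
    }
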
145 static void queue_con(struct ceph_connection *con);
146 static void cancel_con(struct ceph_connection *con);
148 static void con_fault(struct ceph_connection *con);
280 static void con_sock_state_init(struct ceph_connection *con)
284 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
287 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
291 static void con_sock_state_connecting(struct ceph_connection *con)
295 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
298 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
302 static void con_sock_state_connected(struct ceph_connection *con)
306 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
309 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
313 static void con_sock_state_closing(struct ceph_connection *con)
317 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
322 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
326 static void con_sock_state_closed(struct ceph_connection *con)
330 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
336 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
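
Each con_sock_state_*() helper above publishes a new socket state with atomic_xchg() and logs the old -> new transition through dout(). A simplified, self-contained analog of that exchange-and-log pattern (the state names mirror the CON_SOCK_STATE_* constants; everything else is made up):

    #include <stdatomic.h>
    #include <stdio.h>

    enum sock_state { SS_CLOSED, SS_CONNECTING, SS_CONNECTED, SS_CLOSING };

    static _Atomic int sock_state = SS_CLOSED;

    /* swap in the new state and report the old -> new transition */
    static void sock_state_transition(enum sock_state new_state)
    {
        int old_state = atomic_exchange(&sock_state, new_state);

        printf("sock %d -> %d\n", old_state, (int)new_state);
    }

    int main(void)
    {
        sock_state_transition(SS_CONNECTING);   /* CLOSED -> CONNECTING */
        sock_state_transition(SS_CONNECTED);    /* CONNECTING -> CONNECTED */
        sock_state_transition(SS_CLOSING);      /* CONNECTED -> CLOSING */
        sock_state_transition(SS_CLOSED);       /* CLOSING -> CLOSED */
        return 0;
    }
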
347 struct ceph_connection *con = sk->sk_user_data;
351 if (atomic_read(&con->msgr->stopping)) {
357 con, con->state);
358 queue_con(con);
365 struct ceph_connection *con = sk->sk_user_data;
374 if (ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) {
376 dout("%s %p queueing write work\n", __func__, con);
378 queue_con(con);
381 dout("%s %p nothing to write\n", __func__, con);
388 struct ceph_connection *con = sk->sk_user_data;
391 con, con->state, sk->sk_state);
399 con_sock_state_closing(con);
400 ceph_con_flag_set(con, CEPH_CON_F_SOCK_CLOSED);
401 queue_con(con);
405 con_sock_state_connected(con);
406 queue_con(con);
417 struct ceph_connection *con)
420 sk->sk_user_data = con;
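
The socket callbacks above recover the ceph_connection from sk->sk_user_data and queue the connection's work item when data arrives, write space opens up, or the socket changes state. A hypothetical userspace analog of that register-callbacks-with-user-data pattern (no real sockets involved):

    #include <stdio.h>

    struct conn {
        const char *name;
    };

    /* hypothetical analog of a socket carrying user_data and event callbacks */
    struct fake_sock {
        void *user_data;
        void (*data_ready)(struct fake_sock *sk);
        void (*write_space)(struct fake_sock *sk);
    };

    static void queue_work_for(struct conn *c, const char *why)
    {
        printf("queueing work for %s (%s)\n", c->name, why);
    }

    /* each callback recovers the connection from user_data, like
       sk->sk_user_data in the kernel code, and kicks the worker */
    static void on_data_ready(struct fake_sock *sk)
    {
        queue_work_for(sk->user_data, "data ready");
    }

    static void on_write_space(struct fake_sock *sk)
    {
        queue_work_for(sk->user_data, "write space");
    }

    static void set_callbacks(struct fake_sock *sk, struct conn *c)
    {
        sk->user_data = c;
        sk->data_ready = on_data_ready;
        sk->write_space = on_write_space;
    }

    int main(void)
    {
        struct conn c = { .name = "osd.0" };
        struct fake_sock sk = { 0 };

        set_callbacks(&sk, &c);
        sk.data_ready(&sk);     /* simulate the network stack firing events */
        sk.write_space(&sk);
        return 0;
    }
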
434 int ceph_tcp_connect(struct ceph_connection *con)
436 struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */
441 dout("%s con %p peer_addr %s\n", __func__, con,
442 ceph_pr_addr(&con->peer_addr));
443 BUG_ON(con->sock);
447 ret = sock_create_kern(read_pnet(&con->msgr->net), ss.ss_family,
459 set_sock_callbacks(sock, con);
461 con_sock_state_connecting(con);
466 ceph_pr_addr(&con->peer_addr),
470 ceph_pr_addr(&con->peer_addr), ret);
475 if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY))
478 con->sock = sock;
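
ceph_tcp_connect() creates a kernel socket for the peer's address family with sock_create_kern(), optionally sets TCP_NODELAY, and starts the connect. A simplified userspace sketch of the same steps, with a synchronous connect and a made-up loopback peer address used purely for illustration:

    #include <arpa/inet.h>
    #include <netinet/in.h>
    #include <netinet/tcp.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/socket.h>
    #include <unistd.h>

    /* userspace analog of ceph_tcp_connect(): create a socket for the peer's
       address family, optionally disable Nagle, then connect */
    static int tcp_connect(const struct sockaddr_storage *ss, socklen_t len,
                           int nodelay)
    {
        int fd = socket(ss->ss_family, SOCK_STREAM, 0);

        if (fd < 0)
            return -1;
        if (nodelay) {
            int one = 1;

            setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
        }
        if (connect(fd, (const struct sockaddr *)ss, len) < 0) {
            perror("connect");
            close(fd);
            return -1;
        }
        return fd;
    }

    int main(void)
    {
        struct sockaddr_storage ss;
        struct sockaddr_in *in = (struct sockaddr_in *)&ss;
        int fd;

        memset(&ss, 0, sizeof(ss));
        in->sin_family = AF_INET;
        in->sin_port = htons(6789);            /* illustrative peer address */
        inet_pton(AF_INET, "127.0.0.1", &in->sin_addr);

        fd = tcp_connect(&ss, sizeof(*in), 1);
        if (fd >= 0)
            close(fd);
        return 0;
    }
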
485 int ceph_con_close_socket(struct ceph_connection *con)
489 dout("%s con %p sock %p\n", __func__, con, con->sock);
490 if (con->sock) {
491 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
492 sock_release(con->sock);
493 con->sock = NULL;
502 ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED);
504 con_sock_state_closed(con);
508 static void ceph_con_reset_protocol(struct ceph_connection *con)
510 dout("%s con %p\n", __func__, con);
512 ceph_con_close_socket(con);
513 if (con->in_msg) {
514 WARN_ON(con->in_msg->con != con);
515 ceph_msg_put(con->in_msg);
516 con->in_msg = NULL;
518 if (con->out_msg) {
519 WARN_ON(con->out_msg->con != con);
520 ceph_msg_put(con->out_msg);
521 con->out_msg = NULL;
523 if (con->bounce_page) {
524 __free_page(con->bounce_page);
525 con->bounce_page = NULL;
528 if (ceph_msgr2(from_msgr(con->msgr)))
529 ceph_con_v2_reset_protocol(con);
531 ceph_con_v1_reset_protocol(con);
554 void ceph_con_reset_session(struct ceph_connection *con)
556 dout("%s con %p\n", __func__, con);
558 WARN_ON(con->in_msg);
559 WARN_ON(con->out_msg);
560 ceph_msg_remove_list(&con->out_queue);
561 ceph_msg_remove_list(&con->out_sent);
562 con->out_seq = 0;
563 con->in_seq = 0;
564 con->in_seq_acked = 0;
566 if (ceph_msgr2(from_msgr(con->msgr)))
567 ceph_con_v2_reset_session(con);
569 ceph_con_v1_reset_session(con);
575 void ceph_con_close(struct ceph_connection *con)
577 mutex_lock(&con->mutex);
578 dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr));
579 con->state = CEPH_CON_S_CLOSED;
581 ceph_con_flag_clear(con, CEPH_CON_F_LOSSYTX); /* so we retry next connect */
583 ceph_con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING);
584 ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
585 ceph_con_flag_clear(con, CEPH_CON_F_BACKOFF);
587 ceph_con_reset_protocol(con);
588 ceph_con_reset_session(con);
589 cancel_con(con);
590 mutex_unlock(&con->mutex);
597 void ceph_con_open(struct ceph_connection *con,
601 mutex_lock(&con->mutex);
602 dout("con_open %p %s\n", con, ceph_pr_addr(addr));
604 WARN_ON(con->state != CEPH_CON_S_CLOSED);
605 con->state = CEPH_CON_S_PREOPEN;
607 con->peer_name.type = (__u8) entity_type;
608 con->peer_name.num = cpu_to_le64(entity_num);
610 memcpy(&con->peer_addr, addr, sizeof(*addr));
611 con->delay = 0; /* reset backoff memory */
612 mutex_unlock(&con->mutex);
613 queue_con(con);
620 bool ceph_con_opened(struct ceph_connection *con)
622 if (ceph_msgr2(from_msgr(con->msgr)))
623 return ceph_con_v2_opened(con);
625 return ceph_con_v1_opened(con);
631 void ceph_con_init(struct ceph_connection *con, void *private,
635 dout("con_init %p\n", con);
636 memset(con, 0, sizeof(*con));
637 con->private = private;
638 con->ops = ops;
639 con->msgr = msgr;
641 con_sock_state_init(con);
643 mutex_init(&con->mutex);
644 INIT_LIST_HEAD(&con->out_queue);
645 INIT_LIST_HEAD(&con->out_sent);
646 INIT_DELAYED_WORK(&con->work, ceph_con_workfn);
648 con->state = CEPH_CON_S_CLOSED;
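
Taken together, ceph_con_init(), ceph_con_open(), ceph_con_send() and ceph_con_close() form the external lifecycle of a connection. A hypothetical caller sketch, assuming the full prototypes declared in include/linux/ceph/messenger.h and a messenger, ops table, peer address and message prepared elsewhere (this is not code from messenger.c):

    /* Hypothetical caller, shown only to illustrate the call order. */
    static void example_open_and_send(struct ceph_connection *con,
                                      struct ceph_messenger *msgr,
                                      const struct ceph_connection_operations *ops,
                                      struct ceph_entity_addr *peer_addr,
                                      struct ceph_msg *msg)
    {
        ceph_con_init(con, NULL, ops, msgr);    /* zeroes con, state = CLOSED */
        ceph_con_open(con, CEPH_ENTITY_TYPE_OSD, 0, peer_addr);
        ceph_con_send(con, msg);                /* queued; the worker does the I/O */
        ceph_con_close(con);                    /* resets protocol/session, cancels work */
    }
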
671 void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
676 dout("%s con %p ack_seq %llu\n", __func__, con, ack_seq);
677 while (!list_empty(&con->out_sent)) {
678 msg = list_first_entry(&con->out_sent, struct ceph_msg,
685 dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
696 void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
701 dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq);
702 while (!list_empty(&con->out_queue)) {
703 msg = list_first_entry(&con->out_queue, struct ceph_msg,
711 dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
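
ceph_con_discard_sent() above walks out_sent and drops every message whose sequence number is covered by the peer's ack. A small self-contained analog of that discard-up-to-ack loop, with an array standing in for the kernel list and all names hypothetical:

    #include <stdio.h>

    /* simplified analog of ceph_con_discard_sent(): drop every entry whose
       sequence number is covered by the peer's ack */
    struct sent_queue {
        unsigned long long seqs[8];    /* seqs of messages awaiting an ack */
        int head, tail;                /* [head, tail) are still unacked */
    };

    static void discard_sent(struct sent_queue *q, unsigned long long ack_seq)
    {
        while (q->head < q->tail && q->seqs[q->head] <= ack_seq) {
            printf("discarding msg seq %llu (acked)\n", q->seqs[q->head]);
            q->head++;
        }
    }

    int main(void)
    {
        struct sent_queue q = { .seqs = { 1, 2, 3, 4 }, .head = 0, .tail = 4 };

        discard_sent(&q, 3);    /* drops seqs 1..3, keeps seq 4 */
        printf("%d message(s) still unacked\n", q.tail - q.head);
        return 0;
    }
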
1401 void ceph_con_process_message(struct ceph_connection *con)
1403 struct ceph_msg *msg = con->in_msg;
1405 BUG_ON(con->in_msg->con != con);
1406 con->in_msg = NULL;
1409 if (con->peer_name.type == 0)
1410 con->peer_name = msg->hdr.src;
1412 con->in_seq++;
1413 mutex_unlock(&con->mutex);
1423 con->in_front_crc, con->in_middle_crc, con->in_data_crc);
1424 con->ops->dispatch(con, msg);
1426 mutex_lock(&con->mutex);
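
ceph_con_process_message() detaches con->in_msg, bumps in_seq, and drops con->mutex around the ops->dispatch() callback so the callback can re-enter the connection without deadlocking. A simplified pthread analog of that unlock-around-callback pattern (all names hypothetical):

    #include <pthread.h>
    #include <stdio.h>

    /* analog of the locking pattern in ceph_con_process_message(): detach the
       message under the lock, then drop the lock around the dispatch callback */
    struct conn {
        pthread_mutex_t mutex;
        const char *in_msg;
        unsigned long long in_seq;
        void (*dispatch)(struct conn *c, const char *msg);
    };

    static void process_message(struct conn *c)
    {
        const char *msg;

        pthread_mutex_lock(&c->mutex);
        msg = c->in_msg;             /* take ownership of the incoming message */
        c->in_msg = NULL;
        c->in_seq++;
        pthread_mutex_unlock(&c->mutex);

        c->dispatch(c, msg);         /* callback runs without the lock held */

        pthread_mutex_lock(&c->mutex);
        /* ... resume protocol processing under the lock ... */
        pthread_mutex_unlock(&c->mutex);
    }

    static void print_dispatch(struct conn *c, const char *msg)
    {
        printf("dispatch: %s (in_seq now %llu)\n", msg, c->in_seq);
    }

    int main(void)
    {
        struct conn c = { .mutex = PTHREAD_MUTEX_INITIALIZER,
                          .in_msg = "osd_op_reply", .dispatch = print_dispatch };

        process_message(&c);
        return 0;
    }
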
1431 * Bump @con reference to avoid races with connection teardown.
1434 static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
1436 if (!con->ops->get(con)) {
1437 dout("%s %p ref count 0\n", __func__, con);
1444 dout("%s %p %lu\n", __func__, con, delay);
1445 if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
1446 dout("%s %p - already queued\n", __func__, con);
1447 con->ops->put(con);
1454 static void queue_con(struct ceph_connection *con)
1456 (void) queue_con_delay(con, 0);
1459 static void cancel_con(struct ceph_connection *con)
1461 if (cancel_delayed_work(&con->work)) {
1462 dout("%s %p\n", __func__, con);
1463 con->ops->put(con);
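
queue_con_delay() takes a reference on the connection before queueing its delayed work and drops it again if the work was already queued; cancel_con() drops the reference belonging to the work it removed. A simplified single-threaded analog of that ownership rule, with no real workqueue and all names hypothetical:

    #include <stdbool.h>
    #include <stdio.h>

    /* analog of queue_con_delay()/cancel_con(): a queued work item owns one
       reference on the connection until it runs or is cancelled */
    struct conn {
        int ref;
        bool queued;
    };

    static bool con_get(struct conn *c)
    {
        if (c->ref == 0)
            return false;       /* connection is being torn down */
        c->ref++;
        return true;
    }

    static void con_put(struct conn *c)
    {
        c->ref--;
    }

    static bool queue_con(struct conn *c)
    {
        if (!con_get(c))
            return false;
        if (c->queued) {        /* already queued: drop the extra reference */
            con_put(c);
            return false;
        }
        c->queued = true;
        return true;
    }

    static void cancel_con(struct conn *c)
    {
        if (c->queued) {        /* removed from the queue: drop its reference */
            c->queued = false;
            con_put(c);
        }
    }

    int main(void)
    {
        struct conn c = { .ref = 1 };
        bool q;

        q = queue_con(&c);
        printf("queued %d, ref %d\n", q, c.ref);   /* 1, 2 */
        q = queue_con(&c);
        printf("queued %d, ref %d\n", q, c.ref);   /* 0, 2 */
        cancel_con(&c);
        printf("ref %d\n", c.ref);                 /* 1 */
        return 0;
    }
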
1467 static bool con_sock_closed(struct ceph_connection *con)
1469 if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED))
1474 con->error_msg = "socket closed (con state " #x ")"; \
1477 switch (con->state) {
1499 static bool con_backoff(struct ceph_connection *con)
1503 if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF))
1506 ret = queue_con_delay(con, con->delay);
1508 dout("%s: con %p FAILED to back off %lu\n", __func__,
1509 con, con->delay);
1511 ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
1517 /* Finish fault handling; con->mutex must *not* be held here */
1519 static void con_fault_finish(struct ceph_connection *con)
1521 dout("%s %p\n", __func__, con);
1527 if (con->v1.auth_retry) {
1528 dout("auth_retry %d, invalidating\n", con->v1.auth_retry);
1529 if (con->ops->invalidate_authorizer)
1530 con->ops->invalidate_authorizer(con);
1531 con->v1.auth_retry = 0;
1534 if (con->ops->fault)
1535 con->ops->fault(con);
1543 struct ceph_connection *con = container_of(work, struct ceph_connection,
1547 mutex_lock(&con->mutex);
1551 if ((fault = con_sock_closed(con))) {
1552 dout("%s: con %p SOCK_CLOSED\n", __func__, con);
1555 if (con_backoff(con)) {
1556 dout("%s: con %p BACKOFF\n", __func__, con);
1559 if (con->state == CEPH_CON_S_STANDBY) {
1560 dout("%s: con %p STANDBY\n", __func__, con);
1563 if (con->state == CEPH_CON_S_CLOSED) {
1564 dout("%s: con %p CLOSED\n", __func__, con);
1565 BUG_ON(con->sock);
1568 if (con->state == CEPH_CON_S_PREOPEN) {
1569 dout("%s: con %p PREOPEN\n", __func__, con);
1570 BUG_ON(con->sock);
1573 if (ceph_msgr2(from_msgr(con->msgr)))
1574 ret = ceph_con_v2_try_read(con);
1576 ret = ceph_con_v1_try_read(con);
1580 if (!con->error_msg)
1581 con->error_msg = "socket error on read";
1586 if (ceph_msgr2(from_msgr(con->msgr)))
1587 ret = ceph_con_v2_try_write(con);
1589 ret = ceph_con_v1_try_write(con);
1593 if (!con->error_msg)
1594 con->error_msg = "socket error on write";
1601 con_fault(con);
1602 mutex_unlock(&con->mutex);
1605 con_fault_finish(con);
1607 con->ops->put(con);
1614 static void con_fault(struct ceph_connection *con)
1617 con, con->state, ceph_pr_addr(&con->peer_addr));
1619 pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
1620 ceph_pr_addr(&con->peer_addr), con->error_msg);
1621 con->error_msg = NULL;
1623 WARN_ON(con->state == CEPH_CON_S_STANDBY ||
1624 con->state == CEPH_CON_S_CLOSED);
1626 ceph_con_reset_protocol(con);
1628 if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) {
1630 con->state = CEPH_CON_S_CLOSED;
1635 list_splice_init(&con->out_sent, &con->out_queue);
1639 if (list_empty(&con->out_queue) &&
1640 !ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
1641 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
1642 ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
1643 con->state = CEPH_CON_S_STANDBY;
1646 con->state = CEPH_CON_S_PREOPEN;
1647 if (!con->delay) {
1648 con->delay = BASE_DELAY_INTERVAL;
1649 } else if (con->delay < MAX_DELAY_INTERVAL) {
1650 con->delay *= 2;
1651 if (con->delay > MAX_DELAY_INTERVAL)
1652 con->delay = MAX_DELAY_INTERVAL;
1654 ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
1655 queue_con(con);
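
con_fault() re-arms the connection with an exponential backoff: con->delay starts at BASE_DELAY_INTERVAL, doubles on every fault, and is clamped at MAX_DELAY_INTERVAL. A tiny arithmetic sketch of that schedule (the real constants are in jiffies; the values here are made up):

    #include <stdio.h>

    /* analog of the reconnect backoff in con_fault(): start at a base delay,
       double on every fault, clamp at a maximum */
    #define BASE_DELAY 1
    #define MAX_DELAY  64

    static unsigned long next_delay(unsigned long delay)
    {
        if (!delay)
            return BASE_DELAY;
        delay *= 2;
        if (delay > MAX_DELAY)
            delay = MAX_DELAY;
        return delay;
    }

    int main(void)
    {
        unsigned long delay = 0;
        int fault;

        for (fault = 1; fault <= 9; fault++) {
            delay = next_delay(delay);
            printf("fault %d: back off %lu\n", fault, delay);
        }
        /* prints 1, 2, 4, 8, 16, 32, 64, 64, 64 */
        return 0;
    }
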
1704 static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
1706 if (msg->con)
1707 msg->con->ops->put(msg->con);
1709 msg->con = con ? con->ops->get(con) : NULL;
1710 BUG_ON(msg->con != con);
1713 static void clear_standby(struct ceph_connection *con)
1716 if (con->state == CEPH_CON_S_STANDBY) {
1717 dout("clear_standby %p and ++connect_seq\n", con);
1718 con->state = CEPH_CON_S_PREOPEN;
1719 con->v1.connect_seq++;
1720 WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING));
1721 WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING));
1730 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
1733 msg->hdr.src = con->msgr->inst.name;
1737 mutex_lock(&con->mutex);
1739 if (con->state == CEPH_CON_S_CLOSED) {
1740 dout("con_send %p closed, dropping %p\n", con, msg);
1742 mutex_unlock(&con->mutex);
1746 msg_con_set(msg, con);
1749 list_add_tail(&msg->list_head, &con->out_queue);
1751 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
1757 clear_standby(con);
1758 mutex_unlock(&con->mutex);
1762 if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
1763 queue_con(con);
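
The tail of ceph_con_send() appends the message under con->mutex and then queues the worker only if CEPH_CON_F_WRITE_PENDING was not already set, so a burst of sends schedules the work item once. A simplified analog of that test-and-set gate (all names hypothetical):

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdio.h>

    /* analog of the tail of ceph_con_send(): enqueue under the lock, then kick
       the worker only if the write-pending flag was not already set */
    struct conn {
        pthread_mutex_t mutex;
        int queued_msgs;
        atomic_bool write_pending;
    };

    static void kick_worker(struct conn *c)
    {
        printf("worker queued (%d msg(s) pending)\n", c->queued_msgs);
    }

    static void con_send(struct conn *c)
    {
        pthread_mutex_lock(&c->mutex);
        c->queued_msgs++;            /* list_add_tail(&msg->list_head, ...) */
        pthread_mutex_unlock(&c->mutex);

        /* test-and-set: only the sender that flips the flag kicks the worker */
        if (!atomic_exchange(&c->write_pending, true))
            kick_worker(c);
    }

    int main(void)
    {
        struct conn c = { .mutex = PTHREAD_MUTEX_INITIALIZER };

        con_send(&c);   /* kicks the worker */
        con_send(&c);   /* flag already set: no second kick */
        con_send(&c);
        return 0;
    }
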
1772 struct ceph_connection *con = msg->con;
1774 if (!con) {
1775 dout("%s msg %p null con\n", __func__, msg);
1779 mutex_lock(&con->mutex);
1781 WARN_ON(con->out_msg == msg);
1782 dout("%s con %p msg %p not linked\n", __func__, con, msg);
1783 mutex_unlock(&con->mutex);
1787 dout("%s con %p msg %p was linked\n", __func__, con, msg);
1791 if (con->out_msg == msg) {
1792 WARN_ON(con->state != CEPH_CON_S_OPEN);
1793 dout("%s con %p msg %p was sending\n", __func__, con, msg);
1794 if (ceph_msgr2(from_msgr(con->msgr)))
1795 ceph_con_v2_revoke(con);
1797 ceph_con_v1_revoke(con);
1798 ceph_msg_put(con->out_msg);
1799 con->out_msg = NULL;
1801 dout("%s con %p msg %p not current, out_msg %p\n", __func__,
1802 con, msg, con->out_msg);
1804 mutex_unlock(&con->mutex);
1812 struct ceph_connection *con = msg->con;
1814 if (!con) {
1815 dout("%s msg %p null con\n", __func__, msg);
1819 mutex_lock(&con->mutex);
1820 if (con->in_msg == msg) {
1821 WARN_ON(con->state != CEPH_CON_S_OPEN);
1822 dout("%s con %p msg %p was recving\n", __func__, con, msg);
1823 if (ceph_msgr2(from_msgr(con->msgr)))
1824 ceph_con_v2_revoke_incoming(con);
1826 ceph_con_v1_revoke_incoming(con);
1827 ceph_msg_put(con->in_msg);
1828 con->in_msg = NULL;
1830 dout("%s con %p msg %p not current, in_msg %p\n", __func__,
1831 con, msg, con->in_msg);
1833 mutex_unlock(&con->mutex);
1839 void ceph_con_keepalive(struct ceph_connection *con)
1841 dout("con_keepalive %p\n", con);
1842 mutex_lock(&con->mutex);
1843 clear_standby(con);
1844 ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING);
1845 mutex_unlock(&con->mutex);
1847 if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
1848 queue_con(con);
1852 bool ceph_con_keepalive_expired(struct ceph_connection *con,
1856 (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) {
1861 ts = timespec64_add(con->last_keepalive_ack, ts);
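
ceph_con_keepalive_expired() adds the keepalive interval to the timestamp of the last keepalive ack and reports the keepalive as overdue once that deadline has passed. A userspace sketch of the same timespec arithmetic (the kernel side uses timespec64_add() and its comparison helpers; the function and values below are made up):

    #include <stdbool.h>
    #include <stdio.h>
    #include <time.h>

    /* analog of ceph_con_keepalive_expired(): the keepalive is overdue when
       last_keepalive_ack + interval is in the past */
    static bool keepalive_expired(struct timespec last_ack,
                                  struct timespec interval,
                                  struct timespec now)
    {
        struct timespec deadline = {
            .tv_sec  = last_ack.tv_sec + interval.tv_sec,
            .tv_nsec = last_ack.tv_nsec + interval.tv_nsec,
        };

        if (deadline.tv_nsec >= 1000000000L) {   /* carry nanoseconds */
            deadline.tv_sec++;
            deadline.tv_nsec -= 1000000000L;
        }
        return now.tv_sec > deadline.tv_sec ||
               (now.tv_sec == deadline.tv_sec &&
                now.tv_nsec >= deadline.tv_nsec);
    }

    int main(void)
    {
        struct timespec last_ack = { .tv_sec = 100 };
        struct timespec interval = { .tv_sec = 30 };
        struct timespec now      = { .tv_sec = 131 };

        printf("expired: %d\n",
               keepalive_expired(last_ack, interval, now));   /* 1 */
        return 0;
    }
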
2034 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
2052 * connection, and save the result in con->in_msg. Uses the
2059 * - con->in_msg == NULL
2061 * - con->in_msg is non-null.
2063 * - con->in_msg == NULL
2065 int ceph_con_in_msg_alloc(struct ceph_connection *con,
2072 BUG_ON(con->in_msg != NULL);
2073 BUG_ON(!con->ops->alloc_msg);
2075 mutex_unlock(&con->mutex);
2076 msg = con->ops->alloc_msg(con, hdr, skip);
2077 mutex_lock(&con->mutex);
2078 if (con->state != CEPH_CON_S_OPEN) {
2085 msg_con_set(msg, con);
2086 con->in_msg = msg;
2096 con->error_msg = "error allocating memory for incoming message";
2099 memcpy(&con->in_msg->hdr, hdr, sizeof(*hdr));
2101 if (middle_len && !con->in_msg->middle) {
2102 ret = ceph_alloc_middle(con, con->in_msg);
2104 ceph_msg_put(con->in_msg);
2105 con->in_msg = NULL;
2112 void ceph_con_get_out_msg(struct ceph_connection *con)
2116 BUG_ON(list_empty(&con->out_queue));
2117 msg = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
2118 WARN_ON(msg->con != con);
2124 list_move_tail(&msg->list_head, &con->out_sent);
2131 msg->hdr.seq = cpu_to_le64(++con->out_seq);
2134 if (con->ops->reencode_message)
2135 con->ops->reencode_message(msg);
2142 WARN_ON(con->out_msg);
2143 con->out_msg = ceph_msg_get(msg);
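
ceph_con_get_out_msg() moves the head of out_queue onto out_sent, stamps it with ++con->out_seq, and records it as con->out_msg while it is being written. A minimal self-contained analog of that dequeue-and-stamp step, with an array standing in for the kernel lists and all names hypothetical:

    #include <stdio.h>

    /* analog of ceph_con_get_out_msg(): pull the next queued message, stamp it
       with the next outgoing sequence number, and remember it as the message
       currently being written */
    struct msg {
        unsigned long long seq;
        const char *name;
    };

    struct conn {
        struct msg *queue[4];          /* stand-in for the out_queue list */
        int sent;                      /* entries before this index are "sent" */
        unsigned long long out_seq;
        struct msg *out_msg;           /* message currently on the wire */
    };

    static void get_out_msg(struct conn *c)
    {
        struct msg *m = c->queue[c->sent];   /* first queued, not yet sent */

        m->seq = ++c->out_seq;               /* msg->hdr.seq = ++con->out_seq */
        c->sent++;                           /* "move" it onto the sent list */
        c->out_msg = m;
    }

    int main(void)
    {
        struct msg a = { .name = "osd_op" }, b = { .name = "ping" };
        struct conn c = { .queue = { &a, &b } };

        get_out_msg(&c);
        printf("sending %s seq %llu\n", c.out_msg->name, c.out_msg->seq);
        get_out_msg(&c);
        printf("sending %s seq %llu\n", c.out_msg->name, c.out_msg->seq);
        return 0;
    }
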