Lines matching refs:con (reference search over the DLM lowcomms code, fs/dlm/lowcomms.c; the number at the start of each entry is the source line).

134 struct connection *con;
164 int (*connect)(struct connection *con, struct socket *sock,
215 static void lowcomms_queue_swork(struct connection *con)
217 assert_spin_locked(&con->writequeue_lock);
219 if (!test_bit(CF_IO_STOP, &con->flags) &&
220 !test_bit(CF_APP_LIMITED, &con->flags) &&
221 !test_and_set_bit(CF_SEND_PENDING, &con->flags))
222 queue_work(io_workqueue, &con->swork);
225 static void lowcomms_queue_rwork(struct connection *con)
228 WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
231 if (!test_bit(CF_IO_STOP, &con->flags) &&
232 !test_and_set_bit(CF_RECV_PENDING, &con->flags))
233 queue_work(io_workqueue, &con->rwork);
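
The two helpers above (lines 215-233) share one idiom: bail out if CF_IO_STOP is set, then use test_and_set_bit() on a PENDING flag so concurrent callers queue the work item at most once (the send side additionally asserts that writequeue_lock is held). A minimal user-space sketch of that pending-bit pattern with C11 atomics; fake_queue_work() and the flag values are illustrative stand-ins, not kernel API:

#include <stdatomic.h>
#include <stdio.h>

#define CF_SEND_PENDING (1u << 0)
#define CF_IO_STOP      (1u << 1)

static atomic_uint con_flags;

static void fake_queue_work(const char *what)
{
	printf("queued %s work\n", what);
}

static void queue_swork(void)
{
	/* Queue nothing once I/O is being stopped. */
	if (atomic_load(&con_flags) & CF_IO_STOP)
		return;

	/* Equivalent of !test_and_set_bit(): only the caller that
	 * actually flips CF_SEND_PENDING queues the work item. */
	if (!(atomic_fetch_or(&con_flags, CF_SEND_PENDING) & CF_SEND_PENDING))
		fake_queue_work("send");
}

int main(void)
{
	queue_swork();	/* queues the work */
	queue_swork();	/* no-op: CF_SEND_PENDING already set */
	return 0;
}
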
255 static struct writequeue_entry *con_next_wq(struct connection *con)
259 e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
272 struct connection *con;
274 hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
275 if (con->nodeid == nodeid)
276 return con;
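
__find_con() (lines 272-276) hashes the nodeid to a bucket and walks the chain with hlist_for_each_entry_rcu() under the connections SRCU. A stand-alone sketch of the same lookup, substituting a pthread rwlock for RCU; all types and names here are illustrative:

#include <pthread.h>
#include <stddef.h>

#define HASH_SIZE 32

struct connection {
	int nodeid;
	struct connection *next;
};

static struct connection *connection_hash[HASH_SIZE];
static pthread_rwlock_t hash_lock = PTHREAD_RWLOCK_INITIALIZER;

static struct connection *find_con(int nodeid)
{
	struct connection *con, *ret = NULL;

	/* Readers take the shared side; the kernel uses an SRCU
	 * read section here instead, which never blocks writers. */
	pthread_rwlock_rdlock(&hash_lock);
	for (con = connection_hash[nodeid % HASH_SIZE]; con; con = con->next) {
		if (con->nodeid == nodeid) {
			ret = con;
			break;
		}
	}
	pthread_rwlock_unlock(&hash_lock);
	return ret;
}
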
282 static void dlm_con_init(struct connection *con, int nodeid)
284 con->nodeid = nodeid;
285 init_rwsem(&con->sock_lock);
286 INIT_LIST_HEAD(&con->writequeue);
287 spin_lock_init(&con->writequeue_lock);
288 INIT_WORK(&con->swork, process_send_sockets);
289 INIT_WORK(&con->rwork, process_recv_sockets);
290 spin_lock_init(&con->addrs_lock);
291 init_waitqueue_head(&con->shutdown_wait);
300 struct connection *con, *tmp;
304 con = __find_con(nodeid, r);
305 if (con || !alloc)
306 return con;
308 con = kzalloc(sizeof(*con), alloc);
309 if (!con)
312 dlm_con_init(con, nodeid);
324 kfree(con);
328 hlist_add_head_rcu(&con->list, &connection_hash[r]);
331 return con;
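
nodeid2con() (lines 300-331) tries the lockless lookup first, allocates outside the lock, then re-checks under the lock so a racing creator wins, and only then inserts with hlist_add_head_rcu(). A sketch of that find-or-create dance, continuing the illustrative structures from the previous sketch:

#include <stdlib.h>

static struct connection *nodeid2con(int nodeid)
{
	int bucket = nodeid % HASH_SIZE;
	struct connection *con, *tmp;

	con = find_con(nodeid);
	if (con)
		return con;

	/* Allocate outside the lock, as the kernel does with
	 * kzalloc() before taking the hash lock. */
	con = calloc(1, sizeof(*con));
	if (!con)
		return NULL;
	con->nodeid = nodeid;

	pthread_rwlock_wrlock(&hash_lock);
	/* Re-check: another thread may have inserted this nodeid
	 * while we were allocating; if so, it wins and our copy
	 * is freed. */
	for (tmp = connection_hash[bucket]; tmp; tmp = tmp->next) {
		if (tmp->nodeid == nodeid) {
			pthread_rwlock_unlock(&hash_lock);
			free(con);
			return tmp;
		}
	}
	con->next = connection_hash[bucket];
	connection_hash[bucket] = con;
	pthread_rwlock_unlock(&hash_lock);
	return con;
}
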
367 struct connection *con;
374 con = nodeid2con(nodeid, 0);
375 if (!con) {
380 spin_lock(&con->addrs_lock);
381 if (!con->addr_count) {
382 spin_unlock(&con->addrs_lock);
387 memcpy(&sas, &con->addr[con->curr_addr_index],
391 con->curr_addr_index++;
392 if (con->curr_addr_index == con->addr_count)
393 con->curr_addr_index = 0;
396 *mark = con->mark;
397 spin_unlock(&con->addrs_lock);
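
The block at lines 387-393 hands out con->addr[con->curr_addr_index] and advances the index modulo addr_count, so successive connect attempts rotate through every known peer address. A self-contained sketch of the rotation; struct peer and its fields are illustrative stand-ins:

#include <stdio.h>

#define DLM_MAX_ADDR_COUNT 3

struct peer {
	const char *addr[DLM_MAX_ADDR_COUNT];
	int addr_count;
	int curr_addr_index;
};

static const char *next_addr(struct peer *p)
{
	const char *a = p->addr[p->curr_addr_index];

	/* Advance and wrap, exactly like lines 391-393. */
	if (++p->curr_addr_index == p->addr_count)
		p->curr_addr_index = 0;
	return a;
}

int main(void)
{
	struct peer p = { { "10.0.0.1", "10.0.0.2" }, 2, 0 };

	for (int i = 0; i < 4; i++)
		printf("%s\n", next_addr(&p));	/* .1 .2 .1 .2 */
	return 0;
}
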
424 struct connection *con;
429 hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
430 WARN_ON_ONCE(!con->addr_count);
432 spin_lock(&con->addrs_lock);
433 for (addr_i = 0; addr_i < con->addr_count; addr_i++) {
434 if (addr_compare(&con->addr[addr_i], addr)) {
435 *nodeid = con->nodeid;
436 *mark = con->mark;
437 spin_unlock(&con->addrs_lock);
442 spin_unlock(&con->addrs_lock);
450 static bool dlm_lowcomms_con_has_addr(const struct connection *con,
455 for (i = 0; i < con->addr_count; i++) {
456 if (addr_compare(&con->addr[i], addr))
465 struct connection *con;
469 con = nodeid2con(nodeid, GFP_NOFS);
470 if (!con) {
475 spin_lock(&con->addrs_lock);
476 if (!con->addr_count) {
477 memcpy(&con->addr[0], addr, sizeof(*addr));
478 con->addr_count = 1;
479 con->mark = dlm_config.ci_mark;
480 spin_unlock(&con->addrs_lock);
485 ret = dlm_lowcomms_con_has_addr(con, addr);
487 spin_unlock(&con->addrs_lock);
492 if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
493 spin_unlock(&con->addrs_lock);
498 memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
500 spin_unlock(&con->addrs_lock);
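
dlm_lowcomms_addr() (lines 465-500) special-cases the first address, rejects duplicates via dlm_lowcomms_con_has_addr(), and caps the array at DLM_MAX_ADDR_COUNT. A sketch of that add path, reusing the illustrative struct peer above; the -EEXIST/-ENOSPC style returns are an assumption of this sketch:

#include <string.h>
#include <errno.h>

static int peer_add_addr(struct peer *p, const char *addr)
{
	/* First address initialises the array (lines 476-480). */
	if (!p->addr_count) {
		p->addr[0] = addr;
		p->addr_count = 1;
		return 0;
	}

	/* Duplicates are rejected, as with dlm_lowcomms_con_has_addr(). */
	for (int i = 0; i < p->addr_count; i++)
		if (!strcmp(p->addr[i], addr))
			return -EEXIST;

	/* Bounded append (lines 492-498). */
	if (p->addr_count >= DLM_MAX_ADDR_COUNT)
		return -ENOSPC;
	p->addr[p->addr_count++] = addr;
	return 0;
}
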
507 struct connection *con = sock2con(sk);
511 set_bit(CF_RECV_INTR, &con->flags);
512 lowcomms_queue_rwork(con);
517 struct connection *con = sock2con(sk);
519 clear_bit(SOCK_NOSPACE, &con->sock->flags);
521 spin_lock_bh(&con->writequeue_lock);
522 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
523 con->sock->sk->sk_write_pending--;
524 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
527 lowcomms_queue_swork(con);
528 spin_unlock_bh(&con->writequeue_lock);
549 struct connection *con;
553 con = nodeid2con(nodeid, 0);
554 if (WARN_ON_ONCE(!con)) {
559 down_read(&con->sock_lock);
560 if (!con->sock) {
561 spin_lock_bh(&con->writequeue_lock);
562 lowcomms_queue_swork(con);
563 spin_unlock_bh(&con->writequeue_lock);
565 up_read(&con->sock_lock);
574 struct connection *con;
578 con = nodeid2con(nodeid, 0);
579 if (!con) {
584 spin_lock(&con->addrs_lock);
585 con->mark = mark;
586 spin_unlock(&con->addrs_lock);
593 struct connection *con = sock2con(sk);
602 con->nodeid, &inet->inet_daddr,
611 con->nodeid, &sk->sk_v6_daddr,
625 dlm_midcomms_unack_msg_resend(con->nodeid);
644 static void add_sock(struct socket *sock, struct connection *con)
649 con->sock = sock;
651 sk->sk_user_data = con;
726 static void allow_connection_io(struct connection *con)
728 if (con->othercon)
729 clear_bit(CF_IO_STOP, &con->othercon->flags);
730 clear_bit(CF_IO_STOP, &con->flags);
733 static void stop_connection_io(struct connection *con)
735 if (con->othercon)
736 stop_connection_io(con->othercon);
738 spin_lock_bh(&con->writequeue_lock);
739 set_bit(CF_IO_STOP, &con->flags);
740 spin_unlock_bh(&con->writequeue_lock);
742 down_write(&con->sock_lock);
743 if (con->sock) {
744 lock_sock(con->sock->sk);
745 restore_callbacks(con->sock->sk);
746 release_sock(con->sock->sk);
748 up_write(&con->sock_lock);
750 cancel_work_sync(&con->swork);
751 cancel_work_sync(&con->rwork);
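
stop_connection_io() (lines 733-751) orders teardown carefully: set CF_IO_STOP under writequeue_lock so the queueing helpers above can no longer race past it, restore the socket callbacks under the socket lock, then cancel_work_sync() both work items. A user-space sketch of that publish-then-drain ordering, with pthreads standing in for the workqueue:

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

static atomic_bool io_stop;

static void *io_worker(void *arg)
{
	(void)arg;
	while (!atomic_load(&io_stop))
		;	/* ... service queued send/receive work ... */
	return NULL;
}

static void stop_io(pthread_t worker)
{
	/* 1. Publish the stop flag; the kernel sets CF_IO_STOP under
	 *    writequeue_lock so no new work can be queued after this. */
	atomic_store(&io_stop, true);

	/* 2. Drain in-flight work, as cancel_work_sync() does for
	 *    con->swork and con->rwork. */
	pthread_join(worker, NULL);
}

int main(void)
{
	pthread_t worker;

	pthread_create(&worker, NULL, io_worker, NULL);
	stop_io(worker);
	return 0;
}
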
755 static void close_connection(struct connection *con, bool and_other)
759 if (con->othercon && and_other)
760 close_connection(con->othercon, false);
762 down_write(&con->sock_lock);
763 if (!con->sock) {
764 up_write(&con->sock_lock);
768 dlm_close_sock(&con->sock);
781 spin_lock_bh(&con->writequeue_lock);
782 if (!list_empty(&con->writequeue)) {
783 e = list_first_entry(&con->writequeue, struct writequeue_entry,
788 spin_unlock_bh(&con->writequeue_lock);
790 con->rx_leftover = 0;
791 con->retries = 0;
792 clear_bit(CF_APP_LIMITED, &con->flags);
793 clear_bit(CF_RECV_PENDING, &con->flags);
794 clear_bit(CF_SEND_PENDING, &con->flags);
795 up_write(&con->sock_lock);
798 static void shutdown_connection(struct connection *con, bool and_other)
802 if (con->othercon && and_other)
803 shutdown_connection(con->othercon, false);
806 down_read(&con->sock_lock);
808 if (!con->sock) {
809 up_read(&con->sock_lock);
813 ret = kernel_sock_shutdown(con->sock, SHUT_WR);
814 up_read(&con->sock_lock);
817 con, ret);
820 ret = wait_event_timeout(con->shutdown_wait, !con->sock,
824 con);
832 close_connection(con, false);
904 static int receive_from_sock(struct connection *con, int buflen)
911 pentry = new_processqueue_entry(con->nodeid, buflen);
915 memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);
920 iov.iov_base = pentry->buf + con->rx_leftover;
921 iov.iov_len = buflen - con->rx_leftover;
925 clear_bit(CF_RECV_INTR, &con->flags);
927 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
929 trace_dlm_recv(con->nodeid, ret);
931 lock_sock(con->sock->sk);
932 if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
933 release_sock(con->sock->sk);
937 clear_bit(CF_RECV_PENDING, &con->flags);
938 release_sock(con->sock->sk);
951 buflen_real = ret + con->rx_leftover;
952 ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
965 con->rx_leftover = buflen_real - ret;
966 memmove(con->rx_leftover_buf, pentry->buf + ret,
967 con->rx_leftover);
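
receive_from_sock() (lines 904-967) prepends the con->rx_leftover bytes saved from the previous read, receives into the remainder, hands complete messages up for validation, and keeps the unconsumed tail in rx_leftover_buf. A runnable sketch of that carry-over reassembly, using newline-terminated "messages" for simplicity; feed() and the buffer sizes are illustrative:

#include <stdio.h>
#include <string.h>

static char leftover_buf[64];
static size_t leftover;

static void feed(const char *chunk, size_t len)
{
	char buf[128];
	size_t total, consumed = 0;
	char *nl;

	/* Prepend what the last call could not consume (lines 915-921). */
	memcpy(buf, leftover_buf, leftover);
	memcpy(buf + leftover, chunk, len);
	total = leftover + len;

	/* Consume every complete "message" (here: a full line). */
	while ((nl = memchr(buf + consumed, '\n', total - consumed))) {
		printf("msg: %.*s\n", (int)(nl - (buf + consumed)), buf + consumed);
		consumed = (nl - buf) + 1;
	}

	/* Keep the incomplete tail for the next read (lines 965-967). */
	leftover = total - consumed;
	memmove(leftover_buf, buf + consumed, leftover);
}

int main(void)
{
	feed("hel", 3);
	feed("lo\nwor", 6);
	feed("ld\n", 3);	/* prints "hello" then "world" */
	return 0;
}
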
1072 /* close other sock con if we have something new */
1168 static struct writequeue_entry *new_writequeue_entry(struct connection *con)
1186 entry->con = con;
1192 static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
1198 spin_lock_bh(&con->writequeue_lock);
1199 if (!list_empty(&con->writequeue)) {
1200 e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
1214 e = new_writequeue_entry(con);
1224 list_add_tail(&e->list, &con->writequeue);
1227 spin_unlock_bh(&con->writequeue_lock);
1231 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
1245 e = new_wq_entry(con, len, ppc, cb, data);
1268 struct connection *con;
1281 con = nodeid2con(nodeid, 0);
1282 if (WARN_ON_ONCE(!con)) {
1287 msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
1304 struct connection *con = e->con;
1307 spin_lock_bh(&con->writequeue_lock);
1317 lowcomms_queue_swork(con);
1320 spin_unlock_bh(&con->writequeue_lock);
1351 msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
1368 static int send_to_sock(struct connection *con)
1377 spin_lock_bh(&con->writequeue_lock);
1378 e = con_next_wq(con);
1380 clear_bit(CF_SEND_PENDING, &con->flags);
1381 spin_unlock_bh(&con->writequeue_lock);
1388 spin_unlock_bh(&con->writequeue_lock);
1392 ret = sock_sendmsg(con->sock, &msg);
1393 trace_dlm_send(con->nodeid, ret);
1395 lock_sock(con->sock->sk);
1396 spin_lock_bh(&con->writequeue_lock);
1397 if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
1398 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1402 set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags);
1403 con->sock->sk->sk_write_pending++;
1405 clear_bit(CF_SEND_PENDING, &con->flags);
1406 spin_unlock_bh(&con->writequeue_lock);
1407 release_sock(con->sock->sk);
1412 spin_unlock_bh(&con->writequeue_lock);
1413 release_sock(con->sock->sk);
1420 spin_lock_bh(&con->writequeue_lock);
1422 spin_unlock_bh(&con->writequeue_lock);
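
send_to_sock() (lines 1368-1422) and lowcomms_write_space() (lines 517-528) form a backpressure pair: when the socket cannot take more data, the sender marks itself CF_APP_LIMITED and stops; the write-space callback clears the mark and re-queues the send work. A sketch of the same hand-off over a non-blocking socket; app_limited, try_send(), and on_writable() are illustrative stand-ins:

#include <errno.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <sys/socket.h>

static atomic_bool app_limited;

static int try_send(int fd, const void *buf, size_t len)
{
	ssize_t ret = send(fd, buf, len, MSG_DONTWAIT);

	if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
		/* Socket buffer full: park ourselves, like setting
		 * CF_APP_LIMITED and SOCK_NOSPACE in the kernel. */
		atomic_store(&app_limited, true);
		return -EAGAIN;
	}
	return ret < 0 ? -errno : (int)ret;
}

/* Called when the socket reports writable again, like the
 * write_space callback: resume sending only if we were parked. */
static void on_writable(void)
{
	if (atomic_exchange(&app_limited, false))
		/* re-queue the send work here */ ;
}
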
1427 static void clean_one_writequeue(struct connection *con)
1431 spin_lock_bh(&con->writequeue_lock);
1432 list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1435 spin_unlock_bh(&con->writequeue_lock);
1440 struct connection *con = container_of(rcu, struct connection, rcu);
1442 WARN_ON_ONCE(!list_empty(&con->writequeue));
1443 WARN_ON_ONCE(con->sock);
1444 kfree(con);
1451 struct connection *con;
1457 con = nodeid2con(nodeid, 0);
1458 if (WARN_ON_ONCE(!con)) {
1463 stop_connection_io(con);
1465 close_connection(con, true);
1468 hlist_del_rcu(&con->list);
1471 clean_one_writequeue(con);
1472 call_srcu(&connections_srcu, &con->rcu, connection_release);
1473 if (con->othercon) {
1474 clean_one_writequeue(con->othercon);
1475 call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
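
dlm_lowcomms_close() (lines 1451-1475) unlinks the connection from the hash and defers the actual kfree() with call_srcu() until every reader has left its read-side section; connection_release() (lines 1440-1444) then sanity-checks that the writequeue is empty and the socket is gone before freeing. A stand-alone stand-in for that grace period is plain reference counting; this is a substitute technique, not what the kernel code does:

#include <stdatomic.h>
#include <stdlib.h>

struct conn {
	atomic_int refs;	/* in-flight readers + the hash table */
	/* ... */
};

static void conn_get(struct conn *c)
{
	atomic_fetch_add(&c->refs, 1);
}

static void conn_put(struct conn *c)
{
	/* Frees only after the unlink dropped the table's reference
	 * and the last reader is gone, mirroring the SRCU grace
	 * period before connection_release() runs. */
	if (atomic_fetch_sub(&c->refs, 1) == 1)
		free(c);
}
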
1491 struct connection *con = container_of(work, struct connection, rwork);
1494 down_read(&con->sock_lock);
1495 if (!con->sock) {
1496 up_read(&con->sock_lock);
1502 ret = receive_from_sock(con, buflen);
1504 up_read(&con->sock_lock);
1511 close_connection(con, false);
1512 wake_up(&con->shutdown_wait);
1520 queue_work(io_workqueue, &con->rwork);
1525 if (test_bit(CF_IS_OTHERCON, &con->flags)) {
1526 close_connection(con, false);
1528 spin_lock_bh(&con->writequeue_lock);
1529 lowcomms_queue_swork(con);
1530 spin_unlock_bh(&con->writequeue_lock);
1560 static int dlm_connect(struct connection *con)
1568 result = nodeid_to_addr(con->nodeid, &addr, NULL,
1571 log_print("no address for nodeid %d", con->nodeid);
1590 add_sock(sock, con);
1592 log_print_ratelimited("connecting to %d", con->nodeid);
1594 result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
1604 dlm_close_sock(&con->sock);
1615 struct connection *con = container_of(work, struct connection, swork);
1618 WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags));
1620 down_read(&con->sock_lock);
1621 if (!con->sock) {
1622 up_read(&con->sock_lock);
1623 down_write(&con->sock_lock);
1624 if (!con->sock) {
1625 ret = dlm_connect(con);
1638 up_write(&con->sock_lock);
1640 con->nodeid, con->retries++, ret);
1647 queue_work(io_workqueue, &con->swork);
1651 downgrade_write(&con->sock_lock);
1655 ret = send_to_sock(con);
1657 up_read(&con->sock_lock);
1666 queue_work(io_workqueue, &con->swork);
1670 close_connection(con, false);
1673 spin_lock_bh(&con->writequeue_lock);
1674 lowcomms_queue_swork(con);
1675 spin_unlock_bh(&con->writequeue_lock);
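
process_send_sockets() (lines 1615-1675) takes sock_lock read-side, upgrades to the write side only when no socket exists yet, connects via dlm_connect(), then downgrades back to the read side for the actual send, so connects are serialized while sends can run alongside other readers. POSIX rwlocks cannot downgrade atomically the way downgrade_write() can, so this sketch re-checks after each transition; connect_sock(), do_send(), and have_sock are illustrative:

#include <pthread.h>
#include <stdbool.h>

static pthread_rwlock_t sock_lock = PTHREAD_RWLOCK_INITIALIZER;
static bool have_sock;

static int connect_sock(void) { have_sock = true; return 0; }
static void do_send(void) { /* ... write out the writequeue ... */ }

static void send_worker(void)
{
	pthread_rwlock_rdlock(&sock_lock);
	if (!have_sock) {
		/* Upgrade: drop the read lock, take the write lock,
		 * and re-check, since another worker may have
		 * connected in the gap (lines 1623-1625). */
		pthread_rwlock_unlock(&sock_lock);
		pthread_rwlock_wrlock(&sock_lock);
		if (!have_sock && connect_sock() < 0) {
			pthread_rwlock_unlock(&sock_lock);
			return;	/* the kernel re-queues swork to retry */
		}
		/* "Downgrade": POSIX has no downgrade_write(), so
		 * drop and re-take the read side. */
		pthread_rwlock_unlock(&sock_lock);
		pthread_rwlock_rdlock(&sock_lock);
	}
	if (have_sock)
		do_send();
	pthread_rwlock_unlock(&sock_lock);
}
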
1723 struct connection *con;
1736 hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
1737 shutdown_connection(con, true);
1738 stop_connection_io(con);
1740 close_connection(con, true);
1742 clean_one_writequeue(con);
1743 if (con->othercon)
1744 clean_one_writequeue(con->othercon);
1745 allow_connection_io(con);
1830 static int dlm_tcp_connect(struct connection *con, struct socket *sock,
1885 static int dlm_sctp_connect(struct connection *con, struct socket *sock,
1992 struct connection *con;
1997 hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
1999 hlist_del_rcu(&con->list);
2002 if (con->othercon)
2003 call_srcu(&connections_srcu, &con->othercon->rcu,
2005 call_srcu(&connections_srcu, &con->rcu, connection_release);