1/******************************************************************************
2*******************************************************************************
3**
4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
6**
7**  This copyrighted material is made available to anyone wishing to use,
8**  modify, copy, or redistribute it subject to the terms and conditions
9**  of the GNU General Public License v.2.
10**
11*******************************************************************************
12******************************************************************************/
13
14/*
15 * lowcomms.c
16 *
17 * This is the "low-level" comms layer.
18 *
19 * It is responsible for sending/receiving messages
20 * from other nodes in the cluster.
21 *
22 * Cluster nodes are referred to by their nodeids. nodeids are
23 * simply 32 bit numbers to the locking module - if they need to
24 * be expanded for the cluster infrastructure then that is its
25 * responsibility. It is this layer's
26 * responsibility to resolve these into IP address or
27 * whatever it needs for inter-node communication.
28 *
29 * The comms level is two kernel threads that deal mainly with
30 * the receiving of messages from other nodes and passing them
31 * up to the mid-level comms layer (which understands the
32 * message format) for execution by the locking core, and
33 * a send thread which does all the setting up of connections
34 * to remote nodes and the sending of data. Threads are not allowed
35 * to send their own data because it may cause them to wait in times
36 * of high load. Also, this way, the sending thread can collect together
37 * messages bound for one node and send them in one block.
38 *
39 * lowcomms will choose to use either TCP or SCTP as its transport layer
40 * depending on the configuration variable 'protocol'. This should be set
41 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
42 * cluster-wide mechanism as it must be the same on all nodes of the cluster
43 * for the DLM to function.
44 *
45 */
46
47#include <asm/ioctls.h>
48#include <net/sock.h>
49#include <net/tcp.h>
50#include <linux/pagemap.h>
51#include <linux/file.h>
52#include <linux/mutex.h>
53#include <linux/sctp.h>
54#include <linux/slab.h>
55#include <net/sctp/user.h>
56#include <net/ipv6.h>
57
58#include "dlm_internal.h"
59#include "lowcomms.h"
60#include "midcomms.h"
61#include "config.h"
62
63#define NEEDED_RMEM (4*1024*1024)
64#define CONN_HASH_SIZE 32
65
66struct cbuf {
67	unsigned int base;
68	unsigned int len;
69	unsigned int mask;
70};
71
72static void cbuf_add(struct cbuf *cb, int n)
73{
74	cb->len += n;
75}
76
77static int cbuf_data(struct cbuf *cb)
78{
79	return ((cb->base + cb->len) & cb->mask);
80}
81
82static void cbuf_init(struct cbuf *cb, int size)
83{
84	cb->base = cb->len = 0;
85	cb->mask = size-1;
86}
87
88static void cbuf_eat(struct cbuf *cb, int n)
89{
90	cb->len  -= n;
91	cb->base += n;
92	cb->base &= cb->mask;
93}
94
95static bool cbuf_empty(struct cbuf *cb)
96{
97	return cb->len == 0;
98}
99
100struct connection {
101	struct socket *sock;	/* NULL if not connected */
102	uint32_t nodeid;	/* So we know who we are in the list */
103	struct mutex sock_mutex;
104	unsigned long flags;
105#define CF_READ_PENDING 1
106#define CF_WRITE_PENDING 2
107#define CF_CONNECT_PENDING 3
108#define CF_INIT_PENDING 4
109#define CF_IS_OTHERCON 5
110#define CF_CLOSE 6
111	struct list_head writequeue;  /* List of outgoing writequeue_entries */
112	spinlock_t writequeue_lock;
113	int (*rx_action) (struct connection *);	/* What to do when active */
114	void (*connect_action) (struct connection *);	/* What to do to connect */
115	struct page *rx_page;
116	struct cbuf cb;
117	int retries;
118#define MAX_CONNECT_RETRIES 3
119	int sctp_assoc;
120	struct hlist_node list;
121	struct connection *othercon;
122	struct work_struct rwork; /* Receive workqueue */
123	struct work_struct swork; /* Send workqueue */
124};
125#define sock2con(x) ((struct connection *)(x)->sk_user_data)
126
127/* An entry waiting to be sent */
128struct writequeue_entry {
129	struct list_head list;
130	struct page *page;
131	int offset;
132	int len;
133	int end;
134	int users;
135	struct connection *con;
136};
137
138static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
139static int dlm_local_count;
140
141/* Work queues */
142static struct workqueue_struct *recv_workqueue;
143static struct workqueue_struct *send_workqueue;
144
145static struct hlist_head connection_hash[CONN_HASH_SIZE];
146static DEFINE_MUTEX(connections_lock);
147static struct kmem_cache *con_cache;
148
149static void process_recv_sockets(struct work_struct *work);
150static void process_send_sockets(struct work_struct *work);
151
152
153/* This is deliberately very simple because most clusters have simple
154   sequential nodeids, so we should be able to go straight to a connection
155   struct in the array */
156static inline int nodeid_hash(int nodeid)
157{
158	return nodeid & (CONN_HASH_SIZE-1);
159}
160
161static struct connection *__find_con(int nodeid)
162{
163	int r;
164	struct hlist_node *h;
165	struct connection *con;
166
167	r = nodeid_hash(nodeid);
168
169	hlist_for_each_entry(con, h, &connection_hash[r], list) {
170		if (con->nodeid == nodeid)
171			return con;
172	}
173	return NULL;
174}
175
176/*
177 * If 'allocation' is zero then we don't attempt to create a new
178 * connection structure for this node.
179 */
180static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
181{
182	struct connection *con = NULL;
183	int r;
184
185	con = __find_con(nodeid);
186	if (con || !alloc)
187		return con;
188
189	con = kmem_cache_zalloc(con_cache, alloc);
190	if (!con)
191		return NULL;
192
193	r = nodeid_hash(nodeid);
194	hlist_add_head(&con->list, &connection_hash[r]);
195
196	con->nodeid = nodeid;
197	mutex_init(&con->sock_mutex);
198	INIT_LIST_HEAD(&con->writequeue);
199	spin_lock_init(&con->writequeue_lock);
200	INIT_WORK(&con->swork, process_send_sockets);
201	INIT_WORK(&con->rwork, process_recv_sockets);
202
203	/* Setup action pointers for child sockets */
204	if (con->nodeid) {
205		struct connection *zerocon = __find_con(0);
206
207		con->connect_action = zerocon->connect_action;
208		if (!con->rx_action)
209			con->rx_action = zerocon->rx_action;
210	}
211
212	return con;
213}
214
215/* Loop round all connections */
216static void foreach_conn(void (*conn_func)(struct connection *c))
217{
218	int i;
219	struct hlist_node *h, *n;
220	struct connection *con;
221
222	for (i = 0; i < CONN_HASH_SIZE; i++) {
223		hlist_for_each_entry_safe(con, h, n, &connection_hash[i], list){
224			conn_func(con);
225		}
226	}
227}
228
229static struct connection *nodeid2con(int nodeid, gfp_t allocation)
230{
231	struct connection *con;
232
233	mutex_lock(&connections_lock);
234	con = __nodeid2con(nodeid, allocation);
235	mutex_unlock(&connections_lock);
236
237	return con;
238}
239
240/* This is a bit drastic, but only called when things go wrong */
241static struct connection *assoc2con(int assoc_id)
242{
243	int i;
244	struct hlist_node *h;
245	struct connection *con;
246
247	mutex_lock(&connections_lock);
248
249	for (i = 0 ; i < CONN_HASH_SIZE; i++) {
250		hlist_for_each_entry(con, h, &connection_hash[i], list) {
251			if (con->sctp_assoc == assoc_id) {
252				mutex_unlock(&connections_lock);
253				return con;
254			}
255		}
256	}
257	mutex_unlock(&connections_lock);
258	return NULL;
259}
260
261static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
262{
263	struct sockaddr_storage addr;
264	int error;
265
266	if (!dlm_local_count)
267		return -1;
268
269	error = dlm_nodeid_to_addr(nodeid, &addr);
270	if (error)
271		return error;
272
273	if (dlm_local_addr[0]->ss_family == AF_INET) {
274		struct sockaddr_in *in4  = (struct sockaddr_in *) &addr;
275		struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
276		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
277	} else {
278		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &addr;
279		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
280		ipv6_addr_copy(&ret6->sin6_addr, &in6->sin6_addr);
281	}
282
283	return 0;
284}
285
286/* Data available on socket or listen socket received a connect */
287static void lowcomms_data_ready(struct sock *sk, int count_unused)
288{
289	struct connection *con = sock2con(sk);
290	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
291		queue_work(recv_workqueue, &con->rwork);
292}
293
294static void lowcomms_write_space(struct sock *sk)
295{
296	struct connection *con = sock2con(sk);
297
298	if (con && !test_and_set_bit(CF_WRITE_PENDING, &con->flags))
299		queue_work(send_workqueue, &con->swork);
300}
301
302static inline void lowcomms_connect_sock(struct connection *con)
303{
304	if (test_bit(CF_CLOSE, &con->flags))
305		return;
306	if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
307		queue_work(send_workqueue, &con->swork);
308}
309
310static void lowcomms_state_change(struct sock *sk)
311{
312	if (sk->sk_state == TCP_ESTABLISHED)
313		lowcomms_write_space(sk);
314}
315
316int dlm_lowcomms_connect_node(int nodeid)
317{
318	struct connection *con;
319
320	/* with sctp there's no connecting without sending */
321	if (dlm_config.ci_protocol != 0)
322		return 0;
323
324	if (nodeid == dlm_our_nodeid())
325		return 0;
326
327	con = nodeid2con(nodeid, GFP_NOFS);
328	if (!con)
329		return -ENOMEM;
330	lowcomms_connect_sock(con);
331	return 0;
332}
333
334/* Make a socket active */
335static int add_sock(struct socket *sock, struct connection *con)
336{
337	con->sock = sock;
338
339	/* Install a data_ready callback */
340	con->sock->sk->sk_data_ready = lowcomms_data_ready;
341	con->sock->sk->sk_write_space = lowcomms_write_space;
342	con->sock->sk->sk_state_change = lowcomms_state_change;
343	con->sock->sk->sk_user_data = con;
344	con->sock->sk->sk_allocation = GFP_NOFS;
345	return 0;
346}
347
348/* Add the port number to an IPv6 or 4 sockaddr and return the address
349   length */
350static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
351			  int *addr_len)
352{
353	saddr->ss_family =  dlm_local_addr[0]->ss_family;
354	if (saddr->ss_family == AF_INET) {
355		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
356		in4_addr->sin_port = cpu_to_be16(port);
357		*addr_len = sizeof(struct sockaddr_in);
358		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
359	} else {
360		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
361		in6_addr->sin6_port = cpu_to_be16(port);
362		*addr_len = sizeof(struct sockaddr_in6);
363	}
364	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
365}
366
367/* Close a remote connection and tidy up */
368static void close_connection(struct connection *con, bool and_other)
369{
370	mutex_lock(&con->sock_mutex);
371
372	if (con->sock) {
373		sock_release(con->sock);
374		con->sock = NULL;
375	}
376	if (con->othercon && and_other) {
377		/* Will only re-enter once. */
378		close_connection(con->othercon, false);
379	}
380	if (con->rx_page) {
381		__free_page(con->rx_page);
382		con->rx_page = NULL;
383	}
384
385	con->retries = 0;
386	mutex_unlock(&con->sock_mutex);
387}
388
389/* We only send shutdown messages to nodes that are not part of the cluster */
390static void sctp_send_shutdown(sctp_assoc_t associd)
391{
392	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
393	struct msghdr outmessage;
394	struct cmsghdr *cmsg;
395	struct sctp_sndrcvinfo *sinfo;
396	int ret;
397	struct connection *con;
398
399	con = nodeid2con(0,0);
400	BUG_ON(con == NULL);
401
402	outmessage.msg_name = NULL;
403	outmessage.msg_namelen = 0;
404	outmessage.msg_control = outcmsg;
405	outmessage.msg_controllen = sizeof(outcmsg);
406	outmessage.msg_flags = MSG_EOR;
407
408	cmsg = CMSG_FIRSTHDR(&outmessage);
409	cmsg->cmsg_level = IPPROTO_SCTP;
410	cmsg->cmsg_type = SCTP_SNDRCV;
411	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
412	outmessage.msg_controllen = cmsg->cmsg_len;
413	sinfo = CMSG_DATA(cmsg);
414	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
415
416	sinfo->sinfo_flags |= MSG_EOF;
417	sinfo->sinfo_assoc_id = associd;
418
419	ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);
420
421	if (ret != 0)
422		log_print("send EOF to node failed: %d", ret);
423}
424
425static void sctp_init_failed_foreach(struct connection *con)
426{
427	con->sctp_assoc = 0;
428	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
429		if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
430			queue_work(send_workqueue, &con->swork);
431	}
432}
433
434/* INIT failed but we don't know which node...
435   restart INIT on all pending nodes */
436static void sctp_init_failed(void)
437{
438	mutex_lock(&connections_lock);
439
440	foreach_conn(sctp_init_failed_foreach);
441
442	mutex_unlock(&connections_lock);
443}
444
445/* Something happened to an association */
446static void process_sctp_notification(struct connection *con,
447				      struct msghdr *msg, char *buf)
448{
449	union sctp_notification *sn = (union sctp_notification *)buf;
450
451	if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
452		switch (sn->sn_assoc_change.sac_state) {
453
454		case SCTP_COMM_UP:
455		case SCTP_RESTART:
456		{
457			/* Check that the new node is in the lockspace */
458			struct sctp_prim prim;
459			int nodeid;
460			int prim_len, ret;
461			int addr_len;
462			struct connection *new_con;
463			sctp_peeloff_arg_t parg;
464			int parglen = sizeof(parg);
465			int err;
466
467			/*
468			 * We get this before any data for an association.
469			 * We verify that the node is in the cluster and
470			 * then peel off a socket for it.
471			 */
472			if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
473				log_print("COMM_UP for invalid assoc ID %d",
474					 (int)sn->sn_assoc_change.sac_assoc_id);
475				sctp_init_failed();
476				return;
477			}
478			memset(&prim, 0, sizeof(struct sctp_prim));
479			prim_len = sizeof(struct sctp_prim);
480			prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
481
482			ret = kernel_getsockopt(con->sock,
483						IPPROTO_SCTP,
484						SCTP_PRIMARY_ADDR,
485						(char*)&prim,
486						&prim_len);
487			if (ret < 0) {
488				log_print("getsockopt/sctp_primary_addr on "
489					  "new assoc %d failed : %d",
490					  (int)sn->sn_assoc_change.sac_assoc_id,
491					  ret);
492
493				/* Retry INIT later */
494				new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
495				if (new_con)
496					clear_bit(CF_CONNECT_PENDING, &con->flags);
497				return;
498			}
499			make_sockaddr(&prim.ssp_addr, 0, &addr_len);
500			if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
501				int i;
502				unsigned char *b=(unsigned char *)&prim.ssp_addr;
503				log_print("reject connect from unknown addr");
504				for (i=0; i<sizeof(struct sockaddr_storage);i++)
505					printk("%02x ", b[i]);
506				printk("\n");
507				sctp_send_shutdown(prim.ssp_assoc_id);
508				return;
509			}
510
511			new_con = nodeid2con(nodeid, GFP_NOFS);
512			if (!new_con)
513				return;
514
515			/* Peel off a new sock */
516			parg.associd = sn->sn_assoc_change.sac_assoc_id;
517			ret = kernel_getsockopt(con->sock, IPPROTO_SCTP,
518						SCTP_SOCKOPT_PEELOFF,
519						(void *)&parg, &parglen);
520			if (ret < 0) {
521				log_print("Can't peel off a socket for "
522					  "connection %d to node %d: err=%d",
523					  parg.associd, nodeid, ret);
524				return;
525			}
526			new_con->sock = sockfd_lookup(parg.sd, &err);
527			if (!new_con->sock) {
528				log_print("sockfd_lookup error %d", err);
529				return;
530			}
531			add_sock(new_con->sock, new_con);
532			sockfd_put(new_con->sock);
533
534			log_print("connecting to %d sctp association %d",
535				 nodeid, (int)sn->sn_assoc_change.sac_assoc_id);
536
537			/* Send any pending writes */
538			clear_bit(CF_CONNECT_PENDING, &new_con->flags);
539			clear_bit(CF_INIT_PENDING, &con->flags);
540			if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
541				queue_work(send_workqueue, &new_con->swork);
542			}
543			if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
544				queue_work(recv_workqueue, &new_con->rwork);
545		}
546		break;
547
548		case SCTP_COMM_LOST:
549		case SCTP_SHUTDOWN_COMP:
550		{
551			con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
552			if (con) {
553				con->sctp_assoc = 0;
554			}
555		}
556		break;
557
558		/* We don't know which INIT failed, so clear the PENDING flags
559		 * on them all.  if assoc_id is zero then it will then try
560		 * again */
561
562		case SCTP_CANT_STR_ASSOC:
563		{
564			log_print("Can't start SCTP association - retrying");
565			sctp_init_failed();
566		}
567		break;
568
569		default:
570			log_print("unexpected SCTP assoc change id=%d state=%d",
571				  (int)sn->sn_assoc_change.sac_assoc_id,
572				  sn->sn_assoc_change.sac_state);
573		}
574	}
575}
576
577/* Data received from remote end */
578static int receive_from_sock(struct connection *con)
579{
580	int ret = 0;
581	struct msghdr msg = {};
582	struct kvec iov[2];
583	unsigned len;
584	int r;
585	int call_again_soon = 0;
586	int nvec;
587	char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
588
589	mutex_lock(&con->sock_mutex);
590
591	if (con->sock == NULL) {
592		ret = -EAGAIN;
593		goto out_close;
594	}
595
596	if (con->rx_page == NULL) {
597		/*
598		 * This doesn't need to be atomic, but I think it should
599		 * improve performance if it is.
600		 */
601		con->rx_page = alloc_page(GFP_ATOMIC);
602		if (con->rx_page == NULL)
603			goto out_resched;
604		cbuf_init(&con->cb, PAGE_CACHE_SIZE);
605	}
606
607	/* Only SCTP needs these really */
608	memset(&incmsg, 0, sizeof(incmsg));
609	msg.msg_control = incmsg;
610	msg.msg_controllen = sizeof(incmsg);
611
612	/*
613	 * iov[0] is the bit of the circular buffer between the current end
614	 * point (cb.base + cb.len) and the end of the buffer.
615	 */
616	iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
617	iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
618	iov[1].iov_len = 0;
619	nvec = 1;
620
621	/*
622	 * iov[1] is the bit of the circular buffer between the start of the
623	 * buffer and the start of the currently used section (cb.base)
624	 */
625	if (cbuf_data(&con->cb) >= con->cb.base) {
626		iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
627		iov[1].iov_len = con->cb.base;
628		iov[1].iov_base = page_address(con->rx_page);
629		nvec = 2;
630	}
631	len = iov[0].iov_len + iov[1].iov_len;
632
633	r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
634			       MSG_DONTWAIT | MSG_NOSIGNAL);
635	if (ret <= 0)
636		goto out_close;
637
638	/* Process SCTP notifications */
639	if (msg.msg_flags & MSG_NOTIFICATION) {
640		msg.msg_control = incmsg;
641		msg.msg_controllen = sizeof(incmsg);
642
643		process_sctp_notification(con, &msg,
644				page_address(con->rx_page) + con->cb.base);
645		mutex_unlock(&con->sock_mutex);
646		return 0;
647	}
648	BUG_ON(con->nodeid == 0);
649
650	if (ret == len)
651		call_again_soon = 1;
652	cbuf_add(&con->cb, ret);
653	ret = dlm_process_incoming_buffer(con->nodeid,
654					  page_address(con->rx_page),
655					  con->cb.base, con->cb.len,
656					  PAGE_CACHE_SIZE);
657	if (ret == -EBADMSG) {
658		log_print("lowcomms: addr=%p, base=%u, len=%u, "
659			  "iov_len=%u, iov_base[0]=%p, read=%d",
660			  page_address(con->rx_page), con->cb.base, con->cb.len,
661			  len, iov[0].iov_base, r);
662	}
663	if (ret < 0)
664		goto out_close;
665	cbuf_eat(&con->cb, ret);
666
667	if (cbuf_empty(&con->cb) && !call_again_soon) {
668		__free_page(con->rx_page);
669		con->rx_page = NULL;
670	}
671
672	if (call_again_soon)
673		goto out_resched;
674	mutex_unlock(&con->sock_mutex);
675	return 0;
676
677out_resched:
678	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
679		queue_work(recv_workqueue, &con->rwork);
680	mutex_unlock(&con->sock_mutex);
681	return -EAGAIN;
682
683out_close:
684	mutex_unlock(&con->sock_mutex);
685	if (ret != -EAGAIN) {
686		close_connection(con, false);
687		/* Reconnect when there is something to send */
688	}
689	/* Don't return success if we really got EOF */
690	if (ret == 0)
691		ret = -EAGAIN;
692
693	return ret;
694}
695
696/* Listening socket is busy, accept a connection */
697static int tcp_accept_from_sock(struct connection *con)
698{
699	int result;
700	struct sockaddr_storage peeraddr;
701	struct socket *newsock;
702	int len;
703	int nodeid;
704	struct connection *newcon;
705	struct connection *addcon;
706
707	memset(&peeraddr, 0, sizeof(peeraddr));
708	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
709				  IPPROTO_TCP, &newsock);
710	if (result < 0)
711		return -ENOMEM;
712
713	mutex_lock_nested(&con->sock_mutex, 0);
714
715	result = -ENOTCONN;
716	if (con->sock == NULL)
717		goto accept_err;
718
719	newsock->type = con->sock->type;
720	newsock->ops = con->sock->ops;
721
722	result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
723	if (result < 0)
724		goto accept_err;
725
726	/* Get the connected socket's peer */
727	memset(&peeraddr, 0, sizeof(peeraddr));
728	if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
729				  &len, 2)) {
730		result = -ECONNABORTED;
731		goto accept_err;
732	}
733
734	/* Get the new node's NODEID */
735	make_sockaddr(&peeraddr, 0, &len);
736	if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
737		log_print("connect from non cluster node");
738		sock_release(newsock);
739		mutex_unlock(&con->sock_mutex);
740		return -1;
741	}
742
743	log_print("got connection from %d", nodeid);
744
745	/*  Check to see if we already have a connection to this node. This
746	 *  could happen if the two nodes initiate a connection at roughly
747	 *  the same time and the connections cross on the wire.
748	 *  In this case we store the incoming one in "othercon"
749	 */
750	newcon = nodeid2con(nodeid, GFP_NOFS);
751	if (!newcon) {
752		result = -ENOMEM;
753		goto accept_err;
754	}
755	mutex_lock_nested(&newcon->sock_mutex, 1);
756	if (newcon->sock) {
757		struct connection *othercon = newcon->othercon;
758
759		if (!othercon) {
760			othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
761			if (!othercon) {
762				log_print("failed to allocate incoming socket");
763				mutex_unlock(&newcon->sock_mutex);
764				result = -ENOMEM;
765				goto accept_err;
766			}
767			othercon->nodeid = nodeid;
768			othercon->rx_action = receive_from_sock;
769			mutex_init(&othercon->sock_mutex);
770			INIT_WORK(&othercon->swork, process_send_sockets);
771			INIT_WORK(&othercon->rwork, process_recv_sockets);
772			set_bit(CF_IS_OTHERCON, &othercon->flags);
773		}
774		if (!othercon->sock) {
775			newcon->othercon = othercon;
776			othercon->sock = newsock;
777			newsock->sk->sk_user_data = othercon;
778			add_sock(newsock, othercon);
779			addcon = othercon;
780		}
781		else {
782			printk("Extra connection from node %d attempted\n", nodeid);
783			result = -EAGAIN;
784			mutex_unlock(&newcon->sock_mutex);
785			goto accept_err;
786		}
787	}
788	else {
789		newsock->sk->sk_user_data = newcon;
790		newcon->rx_action = receive_from_sock;
791		add_sock(newsock, newcon);
792		addcon = newcon;
793	}
794
795	mutex_unlock(&newcon->sock_mutex);
796
797	/*
798	 * Add it to the active queue in case we got data
799	 * beween processing the accept adding the socket
800	 * to the read_sockets list
801	 */
802	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
803		queue_work(recv_workqueue, &addcon->rwork);
804	mutex_unlock(&con->sock_mutex);
805
806	return 0;
807
808accept_err:
809	mutex_unlock(&con->sock_mutex);
810	sock_release(newsock);
811
812	if (result != -EAGAIN)
813		log_print("error accepting connection from node: %d", result);
814	return result;
815}
816
817static void free_entry(struct writequeue_entry *e)
818{
819	__free_page(e->page);
820	kfree(e);
821}
822
823/* Initiate an SCTP association.
824   This is a special case of send_to_sock() in that we don't yet have a
825   peeled-off socket for this association, so we use the listening socket
826   and add the primary IP address of the remote node.
827 */
828static void sctp_init_assoc(struct connection *con)
829{
830	struct sockaddr_storage rem_addr;
831	char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
832	struct msghdr outmessage;
833	struct cmsghdr *cmsg;
834	struct sctp_sndrcvinfo *sinfo;
835	struct connection *base_con;
836	struct writequeue_entry *e;
837	int len, offset;
838	int ret;
839	int addrlen;
840	struct kvec iov[1];
841
842	if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
843		return;
844
845	if (con->retries++ > MAX_CONNECT_RETRIES)
846		return;
847
848	if (nodeid_to_addr(con->nodeid, (struct sockaddr *)&rem_addr)) {
849		log_print("no address for nodeid %d", con->nodeid);
850		return;
851	}
852	base_con = nodeid2con(0, 0);
853	BUG_ON(base_con == NULL);
854
855	make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
856
857	outmessage.msg_name = &rem_addr;
858	outmessage.msg_namelen = addrlen;
859	outmessage.msg_control = outcmsg;
860	outmessage.msg_controllen = sizeof(outcmsg);
861	outmessage.msg_flags = MSG_EOR;
862
863	spin_lock(&con->writequeue_lock);
864
865	if (list_empty(&con->writequeue)) {
866		spin_unlock(&con->writequeue_lock);
867		log_print("writequeue empty for nodeid %d", con->nodeid);
868		return;
869	}
870
871	e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
872	len = e->len;
873	offset = e->offset;
874	spin_unlock(&con->writequeue_lock);
875
876	/* Send the first block off the write queue */
877	iov[0].iov_base = page_address(e->page)+offset;
878	iov[0].iov_len = len;
879
880	cmsg = CMSG_FIRSTHDR(&outmessage);
881	cmsg->cmsg_level = IPPROTO_SCTP;
882	cmsg->cmsg_type = SCTP_SNDRCV;
883	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
884	sinfo = CMSG_DATA(cmsg);
885	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
886	sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid());
887	outmessage.msg_controllen = cmsg->cmsg_len;
888
889	ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len);
890	if (ret < 0) {
891		log_print("Send first packet to node %d failed: %d",
892			  con->nodeid, ret);
893
894		/* Try again later */
895		clear_bit(CF_CONNECT_PENDING, &con->flags);
896		clear_bit(CF_INIT_PENDING, &con->flags);
897	}
898	else {
899		spin_lock(&con->writequeue_lock);
900		e->offset += ret;
901		e->len -= ret;
902
903		if (e->len == 0 && e->users == 0) {
904			list_del(&e->list);
905			free_entry(e);
906		}
907		spin_unlock(&con->writequeue_lock);
908	}
909}
910
911/* Connect a new socket to its peer */
912static void tcp_connect_to_sock(struct connection *con)
913{
914	int result = -EHOSTUNREACH;
915	struct sockaddr_storage saddr, src_addr;
916	int addr_len;
917	struct socket *sock = NULL;
918
919	if (con->nodeid == 0) {
920		log_print("attempt to connect sock 0 foiled");
921		return;
922	}
923
924	mutex_lock(&con->sock_mutex);
925	if (con->retries++ > MAX_CONNECT_RETRIES)
926		goto out;
927
928	/* Some odd races can cause double-connects, ignore them */
929	if (con->sock) {
930		result = 0;
931		goto out;
932	}
933
934	/* Create a socket to communicate with */
935	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
936				  IPPROTO_TCP, &sock);
937	if (result < 0)
938		goto out_err;
939
940	memset(&saddr, 0, sizeof(saddr));
941	if (dlm_nodeid_to_addr(con->nodeid, &saddr))
942		goto out_err;
943
944	sock->sk->sk_user_data = con;
945	con->rx_action = receive_from_sock;
946	con->connect_action = tcp_connect_to_sock;
947	add_sock(sock, con);
948
949	/* Bind to our cluster-known address connecting to avoid
950	   routing problems */
951	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
952	make_sockaddr(&src_addr, 0, &addr_len);
953	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
954				 addr_len);
955	if (result < 0) {
956		log_print("could not bind for connect: %d", result);
957		/* This *may* not indicate a critical error */
958	}
959
960	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
961
962	log_print("connecting to %d", con->nodeid);
963	result =
964		sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
965				   O_NONBLOCK);
966	if (result == -EINPROGRESS)
967		result = 0;
968	if (result == 0)
969		goto out;
970
971out_err:
972	if (con->sock) {
973		sock_release(con->sock);
974		con->sock = NULL;
975	} else if (sock) {
976		sock_release(sock);
977	}
978	/*
979	 * Some errors are fatal and this list might need adjusting. For other
980	 * errors we try again until the max number of retries is reached.
981	 */
982	if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
983	    result != -ENETDOWN && result != -EINVAL
984	    && result != -EPROTONOSUPPORT) {
985		lowcomms_connect_sock(con);
986		result = 0;
987	}
988out:
989	mutex_unlock(&con->sock_mutex);
990	return;
991}
992
993static struct socket *tcp_create_listen_sock(struct connection *con,
994					     struct sockaddr_storage *saddr)
995{
996	struct socket *sock = NULL;
997	int result = 0;
998	int one = 1;
999	int addr_len;
1000
1001	if (dlm_local_addr[0]->ss_family == AF_INET)
1002		addr_len = sizeof(struct sockaddr_in);
1003	else
1004		addr_len = sizeof(struct sockaddr_in6);
1005
1006	/* Create a socket to communicate with */
1007	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
1008				  IPPROTO_TCP, &sock);
1009	if (result < 0) {
1010		log_print("Can't create listening comms socket");
1011		goto create_out;
1012	}
1013
1014	result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
1015				   (char *)&one, sizeof(one));
1016
1017	if (result < 0) {
1018		log_print("Failed to set SO_REUSEADDR on socket: %d", result);
1019	}
1020	sock->sk->sk_user_data = con;
1021	con->rx_action = tcp_accept_from_sock;
1022	con->connect_action = tcp_connect_to_sock;
1023	con->sock = sock;
1024
1025	/* Bind to our port */
1026	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
1027	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
1028	if (result < 0) {
1029		log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
1030		sock_release(sock);
1031		sock = NULL;
1032		con->sock = NULL;
1033		goto create_out;
1034	}
1035	result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
1036				 (char *)&one, sizeof(one));
1037	if (result < 0) {
1038		log_print("Set keepalive failed: %d", result);
1039	}
1040
1041	result = sock->ops->listen(sock, 5);
1042	if (result < 0) {
1043		log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
1044		sock_release(sock);
1045		sock = NULL;
1046		goto create_out;
1047	}
1048
1049create_out:
1050	return sock;
1051}
1052
1053/* Get local addresses */
1054static void init_local(void)
1055{
1056	struct sockaddr_storage sas, *addr;
1057	int i;
1058
1059	dlm_local_count = 0;
1060	for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
1061		if (dlm_our_addr(&sas, i))
1062			break;
1063
1064		addr = kmalloc(sizeof(*addr), GFP_NOFS);
1065		if (!addr)
1066			break;
1067		memcpy(addr, &sas, sizeof(*addr));
1068		dlm_local_addr[dlm_local_count++] = addr;
1069	}
1070}
1071
1072/* Bind to an IP address. SCTP allows multiple address so it can do
1073   multi-homing */
1074static int add_sctp_bind_addr(struct connection *sctp_con,
1075			      struct sockaddr_storage *addr,
1076			      int addr_len, int num)
1077{
1078	int result = 0;
1079
1080	if (num == 1)
1081		result = kernel_bind(sctp_con->sock,
1082				     (struct sockaddr *) addr,
1083				     addr_len);
1084	else
1085		result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
1086					   SCTP_SOCKOPT_BINDX_ADD,
1087					   (char *)addr, addr_len);
1088
1089	if (result < 0)
1090		log_print("Can't bind to port %d addr number %d",
1091			  dlm_config.ci_tcp_port, num);
1092
1093	return result;
1094}
1095
1096/* Initialise SCTP socket and bind to all interfaces */
1097static int sctp_listen_for_all(void)
1098{
1099	struct socket *sock = NULL;
1100	struct sockaddr_storage localaddr;
1101	struct sctp_event_subscribe subscribe;
1102	int result = -EINVAL, num = 1, i, addr_len;
1103	struct connection *con = nodeid2con(0, GFP_NOFS);
1104	int bufsize = NEEDED_RMEM;
1105
1106	if (!con)
1107		return -ENOMEM;
1108
1109	log_print("Using SCTP for communications");
1110
1111	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
1112				  IPPROTO_SCTP, &sock);
1113	if (result < 0) {
1114		log_print("Can't create comms socket, check SCTP is loaded");
1115		goto out;
1116	}
1117
1118	/* Listen for events */
1119	memset(&subscribe, 0, sizeof(subscribe));
1120	subscribe.sctp_data_io_event = 1;
1121	subscribe.sctp_association_event = 1;
1122	subscribe.sctp_send_failure_event = 1;
1123	subscribe.sctp_shutdown_event = 1;
1124	subscribe.sctp_partial_delivery_event = 1;
1125
1126	result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
1127				 (char *)&bufsize, sizeof(bufsize));
1128	if (result)
1129		log_print("Error increasing buffer space on socket %d", result);
1130
1131	result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
1132				   (char *)&subscribe, sizeof(subscribe));
1133	if (result < 0) {
1134		log_print("Failed to set SCTP_EVENTS on socket: result=%d",
1135			  result);
1136		goto create_delsock;
1137	}
1138
1139	/* Init con struct */
1140	sock->sk->sk_user_data = con;
1141	con->sock = sock;
1142	con->sock->sk->sk_data_ready = lowcomms_data_ready;
1143	con->rx_action = receive_from_sock;
1144	con->connect_action = sctp_init_assoc;
1145
1146	/* Bind to all interfaces. */
1147	for (i = 0; i < dlm_local_count; i++) {
1148		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
1149		make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
1150
1151		result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
1152		if (result)
1153			goto create_delsock;
1154		++num;
1155	}
1156
1157	result = sock->ops->listen(sock, 5);
1158	if (result < 0) {
1159		log_print("Can't set socket listening");
1160		goto create_delsock;
1161	}
1162
1163	return 0;
1164
1165create_delsock:
1166	sock_release(sock);
1167	con->sock = NULL;
1168out:
1169	return result;
1170}
1171
1172static int tcp_listen_for_all(void)
1173{
1174	struct socket *sock = NULL;
1175	struct connection *con = nodeid2con(0, GFP_NOFS);
1176	int result = -EINVAL;
1177
1178	if (!con)
1179		return -ENOMEM;
1180
1181	/* We don't support multi-homed hosts */
1182	if (dlm_local_addr[1] != NULL) {
1183		log_print("TCP protocol can't handle multi-homed hosts, "
1184			  "try SCTP");
1185		return -EINVAL;
1186	}
1187
1188	log_print("Using TCP for communications");
1189
1190	sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
1191	if (sock) {
1192		add_sock(sock, con);
1193		result = 0;
1194	}
1195	else {
1196		result = -EADDRINUSE;
1197	}
1198
1199	return result;
1200}
1201
1202
1203
1204static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1205						     gfp_t allocation)
1206{
1207	struct writequeue_entry *entry;
1208
1209	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
1210	if (!entry)
1211		return NULL;
1212
1213	entry->page = alloc_page(allocation);
1214	if (!entry->page) {
1215		kfree(entry);
1216		return NULL;
1217	}
1218
1219	entry->offset = 0;
1220	entry->len = 0;
1221	entry->end = 0;
1222	entry->users = 0;
1223	entry->con = con;
1224
1225	return entry;
1226}
1227
1228void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
1229{
1230	struct connection *con;
1231	struct writequeue_entry *e;
1232	int offset = 0;
1233	int users = 0;
1234
1235	con = nodeid2con(nodeid, allocation);
1236	if (!con)
1237		return NULL;
1238
1239	spin_lock(&con->writequeue_lock);
1240	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
1241	if ((&e->list == &con->writequeue) ||
1242	    (PAGE_CACHE_SIZE - e->end < len)) {
1243		e = NULL;
1244	} else {
1245		offset = e->end;
1246		e->end += len;
1247		users = e->users++;
1248	}
1249	spin_unlock(&con->writequeue_lock);
1250
1251	if (e) {
1252	got_one:
1253		*ppc = page_address(e->page) + offset;
1254		return e;
1255	}
1256
1257	e = new_writequeue_entry(con, allocation);
1258	if (e) {
1259		spin_lock(&con->writequeue_lock);
1260		offset = e->end;
1261		e->end += len;
1262		users = e->users++;
1263		list_add_tail(&e->list, &con->writequeue);
1264		spin_unlock(&con->writequeue_lock);
1265		goto got_one;
1266	}
1267	return NULL;
1268}
1269
1270void dlm_lowcomms_commit_buffer(void *mh)
1271{
1272	struct writequeue_entry *e = (struct writequeue_entry *)mh;
1273	struct connection *con = e->con;
1274	int users;
1275
1276	spin_lock(&con->writequeue_lock);
1277	users = --e->users;
1278	if (users)
1279		goto out;
1280	e->len = e->end - e->offset;
1281	spin_unlock(&con->writequeue_lock);
1282
1283	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
1284		queue_work(send_workqueue, &con->swork);
1285	}
1286	return;
1287
1288out:
1289	spin_unlock(&con->writequeue_lock);
1290	return;
1291}
1292
1293/* Send a message */
1294static void send_to_sock(struct connection *con)
1295{
1296	int ret = 0;
1297	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
1298	struct writequeue_entry *e;
1299	int len, offset;
1300
1301	mutex_lock(&con->sock_mutex);
1302	if (con->sock == NULL)
1303		goto out_connect;
1304
1305	spin_lock(&con->writequeue_lock);
1306	for (;;) {
1307		e = list_entry(con->writequeue.next, struct writequeue_entry,
1308			       list);
1309		if ((struct list_head *) e == &con->writequeue)
1310			break;
1311
1312		len = e->len;
1313		offset = e->offset;
1314		BUG_ON(len == 0 && e->users == 0);
1315		spin_unlock(&con->writequeue_lock);
1316
1317		ret = 0;
1318		if (len) {
1319			ret = kernel_sendpage(con->sock, e->page, offset, len,
1320					      msg_flags);
1321			if (ret == -EAGAIN || ret == 0) {
1322				cond_resched();
1323				goto out;
1324			}
1325			if (ret <= 0)
1326				goto send_error;
1327		}
1328			/* Don't starve people filling buffers */
1329			cond_resched();
1330
1331		spin_lock(&con->writequeue_lock);
1332		e->offset += ret;
1333		e->len -= ret;
1334
1335		if (e->len == 0 && e->users == 0) {
1336			list_del(&e->list);
1337			free_entry(e);
1338			continue;
1339		}
1340	}
1341	spin_unlock(&con->writequeue_lock);
1342out:
1343	mutex_unlock(&con->sock_mutex);
1344	return;
1345
1346send_error:
1347	mutex_unlock(&con->sock_mutex);
1348	close_connection(con, false);
1349	lowcomms_connect_sock(con);
1350	return;
1351
1352out_connect:
1353	mutex_unlock(&con->sock_mutex);
1354	if (!test_bit(CF_INIT_PENDING, &con->flags))
1355		lowcomms_connect_sock(con);
1356	return;
1357}
1358
1359static void clean_one_writequeue(struct connection *con)
1360{
1361	struct writequeue_entry *e, *safe;
1362
1363	spin_lock(&con->writequeue_lock);
1364	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1365		list_del(&e->list);
1366		free_entry(e);
1367	}
1368	spin_unlock(&con->writequeue_lock);
1369}
1370
1371/* Called from recovery when it knows that a node has
1372   left the cluster */
1373int dlm_lowcomms_close(int nodeid)
1374{
1375	struct connection *con;
1376
1377	log_print("closing connection to node %d", nodeid);
1378	con = nodeid2con(nodeid, 0);
1379	if (con) {
1380		clear_bit(CF_CONNECT_PENDING, &con->flags);
1381		clear_bit(CF_WRITE_PENDING, &con->flags);
1382		set_bit(CF_CLOSE, &con->flags);
1383		if (cancel_work_sync(&con->swork))
1384			log_print("canceled swork for node %d", nodeid);
1385		if (cancel_work_sync(&con->rwork))
1386			log_print("canceled rwork for node %d", nodeid);
1387		clean_one_writequeue(con);
1388		close_connection(con, true);
1389	}
1390	return 0;
1391}
1392
1393/* Receive workqueue function */
1394static void process_recv_sockets(struct work_struct *work)
1395{
1396	struct connection *con = container_of(work, struct connection, rwork);
1397	int err;
1398
1399	clear_bit(CF_READ_PENDING, &con->flags);
1400	do {
1401		err = con->rx_action(con);
1402	} while (!err);
1403}
1404
1405/* Send workqueue function */
1406static void process_send_sockets(struct work_struct *work)
1407{
1408	struct connection *con = container_of(work, struct connection, swork);
1409
1410	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
1411		con->connect_action(con);
1412		set_bit(CF_WRITE_PENDING, &con->flags);
1413	}
1414	if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
1415		send_to_sock(con);
1416}
1417
1418
1419/* Discard all entries on the write queues */
1420static void clean_writequeues(void)
1421{
1422	foreach_conn(clean_one_writequeue);
1423}
1424
1425static void work_stop(void)
1426{
1427	destroy_workqueue(recv_workqueue);
1428	destroy_workqueue(send_workqueue);
1429}
1430
1431static int work_start(void)
1432{
1433	int error;
1434	recv_workqueue = create_workqueue("dlm_recv");
1435	error = IS_ERR(recv_workqueue);
1436	if (error) {
1437		log_print("can't start dlm_recv %d", error);
1438		return error;
1439	}
1440
1441	send_workqueue = create_singlethread_workqueue("dlm_send");
1442	error = IS_ERR(send_workqueue);
1443	if (error) {
1444		log_print("can't start dlm_send %d", error);
1445		destroy_workqueue(recv_workqueue);
1446		return error;
1447	}
1448
1449	return 0;
1450}
1451
1452static void stop_conn(struct connection *con)
1453{
1454	con->flags |= 0x0F;
1455	if (con->sock && con->sock->sk)
1456		con->sock->sk->sk_user_data = NULL;
1457}
1458
1459static void free_conn(struct connection *con)
1460{
1461	close_connection(con, true);
1462	if (con->othercon)
1463		kmem_cache_free(con_cache, con->othercon);
1464	hlist_del(&con->list);
1465	kmem_cache_free(con_cache, con);
1466}
1467
1468void dlm_lowcomms_stop(void)
1469{
1470	/* Set all the flags to prevent any
1471	   socket activity.
1472	*/
1473	mutex_lock(&connections_lock);
1474	foreach_conn(stop_conn);
1475	mutex_unlock(&connections_lock);
1476
1477	work_stop();
1478
1479	mutex_lock(&connections_lock);
1480	clean_writequeues();
1481
1482	foreach_conn(free_conn);
1483
1484	mutex_unlock(&connections_lock);
1485	kmem_cache_destroy(con_cache);
1486}
1487
1488int dlm_lowcomms_start(void)
1489{
1490	int error = -EINVAL;
1491	struct connection *con;
1492	int i;
1493
1494	for (i = 0; i < CONN_HASH_SIZE; i++)
1495		INIT_HLIST_HEAD(&connection_hash[i]);
1496
1497	init_local();
1498	if (!dlm_local_count) {
1499		error = -ENOTCONN;
1500		log_print("no local IP address has been set");
1501		goto out;
1502	}
1503
1504	error = -ENOMEM;
1505	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1506				      __alignof__(struct connection), 0,
1507				      NULL);
1508	if (!con_cache)
1509		goto out;
1510
1511	/* Start listening */
1512	if (dlm_config.ci_protocol == 0)
1513		error = tcp_listen_for_all();
1514	else
1515		error = sctp_listen_for_all();
1516	if (error)
1517		goto fail_unlisten;
1518
1519	error = work_start();
1520	if (error)
1521		goto fail_unlisten;
1522
1523	return 0;
1524
1525fail_unlisten:
1526	con = nodeid2con(0,0);
1527	if (con) {
1528		close_connection(con, false);
1529		kmem_cache_free(con_cache, con);
1530	}
1531	kmem_cache_destroy(con_cache);
1532
1533out:
1534	return error;
1535}
1536