ctl_ha.c revision 287855
1/*-
2 * Copyright (c) 2015 Alexander Motin <mav@FreeBSD.org>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer,
10 *    without modification, immediately at the beginning of the file.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
16 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
17 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
18 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
19 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
20 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
21 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
22 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
24 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25 */
26
27#include <sys/cdefs.h>
28__FBSDID("$FreeBSD: head/sys/cam/ctl/ctl_ha.c 287855 2015-09-16 09:59:05Z mav $");
29
30#include <sys/param.h>
31#include <sys/systm.h>
32#include <sys/kernel.h>
33#include <sys/kthread.h>
34#include <sys/types.h>
35#include <sys/limits.h>
36#include <sys/lock.h>
37#include <sys/module.h>
38#include <sys/mutex.h>
39#include <sys/condvar.h>
40#include <sys/malloc.h>
41#include <sys/mbuf.h>
42#include <sys/proc.h>
43#include <sys/conf.h>
44#include <sys/queue.h>
45#include <sys/sysctl.h>
46#include <sys/socket.h>
47#include <sys/socketvar.h>
48#include <sys/uio.h>
49#include <netinet/in.h>
50#include <netinet/tcp.h>
51#include <vm/uma.h>
52
53#include <cam/cam.h>
54#include <cam/scsi/scsi_all.h>
55#include <cam/scsi/scsi_da.h>
56#include <cam/ctl/ctl_io.h>
57#include <cam/ctl/ctl.h>
58#include <cam/ctl/ctl_frontend.h>
59#include <cam/ctl/ctl_util.h>
60#include <cam/ctl/ctl_backend.h>
61#include <cam/ctl/ctl_ioctl.h>
62#include <cam/ctl/ctl_ha.h>
63#include <cam/ctl/ctl_private.h>
64#include <cam/ctl/ctl_debug.h>
65#include <cam/ctl/ctl_error.h>
66
67#if (__FreeBSD_version < 1100000)
/*
 * Minimal packet queue used only when the enclosing version check selects
 * this fallback.  Packets are chained through their m_nextpkt field.
 */
struct mbufq {
	struct mbuf *head;	/* first queued packet, NULL when empty */
	struct mbuf *tail;	/* last queued packet, NULL when empty */
};
72
73static void
74mbufq_init(struct mbufq *q, int limit)
75{
76
77	q->head = q->tail = NULL;
78}
79
80static void
81mbufq_drain(struct mbufq *q)
82{
83	struct mbuf *m;
84
85	while ((m = q->head) != NULL) {
86		q->head = m->m_nextpkt;
87		m_freem(m);
88	}
89	q->tail = NULL;
90}
91
92static struct mbuf *
93mbufq_dequeue(struct mbufq *q)
94{
95	struct mbuf *m;
96
97	m = q->head;
98	if (m) {
99		if (q->tail == m)
100			q->tail = NULL;
101		q->head = m->m_nextpkt;
102		m->m_nextpkt = NULL;
103	}
104	return (m);
105}
106
107static void
108mbufq_enqueue(struct mbufq *q, struct mbuf *m)
109{
110
111	m->m_nextpkt = NULL;
112	if (q->tail)
113		q->tail->m_nextpkt = m;
114	else
115		q->head = m;
116	q->tail = m;
117}
118
119static u_int
120sbavail(struct sockbuf *sb)
121{
122	return (sb->sb_cc);
123}
124
#if (__FreeBSD_version < 1000000)
/* Fallback: pointer to byte offset o from the m_data pointer of mbuf m. */
#define	mtodo(m, o)	((void *)(((m)->m_data) + (o)))
#endif
128#endif
129
/*
 * Header placed in front of every message sent over the HA socket.
 * The receive thread reads this first, then waits for `length' more bytes
 * before notifying the handler registered for `channel'.
 */
struct ha_msg_wire {
	uint32_t	 channel;	/* destination channel of the message */
	uint32_t	 length;	/* number of payload bytes after the header */
};
134
/*
 * Data transfer descriptor exchanged on the data channel.  It is filled
 * from a struct ctl_ha_dt_req by ctl_dt_single(); the receiving side swaps
 * local and remote when it answers a read with a write.
 */
struct ha_dt_msg_wire {
	ctl_ha_dt_cmd	command;	/* requested transfer command */
	uint32_t	size;		/* number of data bytes to transfer */
	uint8_t		*local;		/* buffer address on the sending side */
	uint8_t		*remote;	/* buffer address on the receiving side */
};
141
/*
 * State of the HA link.  A single instance, ha_softc, is defined here.
 */
struct ha_softc {
	struct ctl_softc *ha_ctl_softc;	/* owning CTL instance, set at init */
	ctl_evt_handler	 ha_handler[CTL_HA_CHAN_MAX]; /* per-channel callbacks */
	char		 ha_peer[128];	/* peer setting string from the sysctl */
	struct sockaddr_in  ha_peer_in;	/* address parsed from ha_peer */
	struct socket	*ha_lso;	/* listening socket, NULL if none */
	struct socket	*ha_so;		/* data connection socket, NULL if none */
	struct mbufq	 ha_sendq;	/* messages waiting to be sent */
	struct mbuf	*ha_sending;	/* message taken off ha_sendq, not yet sent */
	struct mtx	 ha_lock;	/* taken around ha_sendq, ha_wakeup, ha_dts */
	int		 ha_connect;	/* sysctl asked for an outgoing connection */
	int		 ha_listen;	/* sysctl asked for a listening socket */
	int		 ha_connected;	/* nonzero while the link is reported up */
	int		 ha_receiving;	/* nonzero while the receive thread runs */
	int		 ha_wakeup;	/* wake request for the connection thread */
	int		 ha_disconnect;	/* request to tear the connection down */
	TAILQ_HEAD(, ctl_ha_dt_req) ha_dts; /* reads awaiting their data */
} ha_softc;
160
161extern struct ctl_softc *control_softc;
162
163static void
164ctl_ha_conn_wake(struct ha_softc *softc)
165{
166
167	mtx_lock(&softc->ha_lock);
168	softc->ha_wakeup = 1;
169	mtx_unlock(&softc->ha_lock);
170	wakeup(&softc->ha_wakeup);
171}
172
173static int
174ctl_ha_lupcall(struct socket *so, void *arg, int waitflag)
175{
176	struct ha_softc *softc = arg;
177
178	ctl_ha_conn_wake(softc);
179	return (SU_OK);
180}
181
182static int
183ctl_ha_rupcall(struct socket *so, void *arg, int waitflag)
184{
185	struct ha_softc *softc = arg;
186
187	wakeup(&softc->ha_receiving);
188	return (SU_OK);
189}
190
191static int
192ctl_ha_supcall(struct socket *so, void *arg, int waitflag)
193{
194	struct ha_softc *softc = arg;
195
196	ctl_ha_conn_wake(softc);
197	return (SU_OK);
198}
199
200static void
201ctl_ha_evt(struct ha_softc *softc, ctl_ha_channel ch, ctl_ha_event evt,
202    int param)
203{
204	int i;
205
206	if (ch < CTL_HA_CHAN_MAX) {
207		if (softc->ha_handler[ch])
208			softc->ha_handler[ch](ch, evt, param);
209		return;
210	}
211	for (i = 0; i < CTL_HA_CHAN_MAX; i++) {
212		if (softc->ha_handler[i])
213			softc->ha_handler[i](i, evt, param);
214	}
215}
216
217static void
218ctl_ha_close(struct ha_softc *softc)
219{
220	struct socket *so = softc->ha_so;
221	int report = 0;
222
223	if (softc->ha_connected || softc->ha_disconnect) {
224		softc->ha_connected = 0;
225		mbufq_drain(&softc->ha_sendq);
226		m_freem(softc->ha_sending);
227		softc->ha_sending = NULL;
228		report = 1;
229	}
230	if (so) {
231		SOCKBUF_LOCK(&so->so_rcv);
232		soupcall_clear(so, SO_RCV);
233		while (softc->ha_receiving) {
234			wakeup(&softc->ha_receiving);
235			msleep(&softc->ha_receiving, SOCKBUF_MTX(&so->so_rcv),
236			    0, "ha_rx exit", 0);
237		}
238		SOCKBUF_UNLOCK(&so->so_rcv);
239		SOCKBUF_LOCK(&so->so_snd);
240		soupcall_clear(so, SO_SND);
241		SOCKBUF_UNLOCK(&so->so_snd);
242		softc->ha_so = NULL;
243		if (softc->ha_connect)
244			pause("reconnect", hz / 2);
245		soclose(so);
246	}
247	if (report) {
248		ctl_ha_evt(softc, CTL_HA_CHAN_MAX, CTL_HA_EVT_LINK_CHANGE,
249		    (softc->ha_connect || softc->ha_listen) ?
250		    CTL_HA_LINK_UNKNOWN : CTL_HA_LINK_OFFLINE);
251	}
252}
253
254static void
255ctl_ha_lclose(struct ha_softc *softc)
256{
257
258	if (softc->ha_lso) {
259		SOCKBUF_LOCK(&softc->ha_lso->so_rcv);
260		soupcall_clear(softc->ha_lso, SO_RCV);
261		SOCKBUF_UNLOCK(&softc->ha_lso->so_rcv);
262		soclose(softc->ha_lso);
263		softc->ha_lso = NULL;
264	}
265}
266
267static void
268ctl_ha_rx_thread(void *arg)
269{
270	struct ha_softc *softc = arg;
271	struct socket *so = softc->ha_so;
272	struct ha_msg_wire wire_hdr;
273	struct uio uio;
274	struct iovec iov;
275	int error, flags, next;
276
277	bzero(&wire_hdr, sizeof(wire_hdr));
278	while (1) {
279		if (wire_hdr.length > 0)
280			next = wire_hdr.length;
281		else
282			next = sizeof(wire_hdr);
283		SOCKBUF_LOCK(&so->so_rcv);
284		while (sbavail(&so->so_rcv) < next) {
285			if (softc->ha_connected == 0 || so->so_error ||
286			    (so->so_rcv.sb_state & SBS_CANTRCVMORE)) {
287				goto errout;
288			}
289			so->so_rcv.sb_lowat = next;
290			msleep(&softc->ha_receiving, SOCKBUF_MTX(&so->so_rcv),
291			    0, "-", 0);
292		}
293		SOCKBUF_UNLOCK(&so->so_rcv);
294
295		if (wire_hdr.length == 0) {
296			iov.iov_base = &wire_hdr;
297			iov.iov_len = sizeof(wire_hdr);
298			uio.uio_iov = &iov;
299			uio.uio_iovcnt = 1;
300			uio.uio_rw = UIO_READ;
301			uio.uio_segflg = UIO_SYSSPACE;
302			uio.uio_td = curthread;
303			uio.uio_resid = sizeof(wire_hdr);
304			flags = MSG_DONTWAIT;
305			error = soreceive(softc->ha_so, NULL, &uio, NULL,
306			    NULL, &flags);
307			if (error != 0) {
308				printf("%s: header receive error %d\n",
309				    __func__, error);
310				SOCKBUF_LOCK(&so->so_rcv);
311				goto errout;
312			}
313		} else {
314			ctl_ha_evt(softc, wire_hdr.channel,
315			    CTL_HA_EVT_MSG_RECV, wire_hdr.length);
316			wire_hdr.length = 0;
317		}
318	}
319
320errout:
321	softc->ha_receiving = 0;
322	wakeup(&softc->ha_receiving);
323	SOCKBUF_UNLOCK(&so->so_rcv);
324	ctl_ha_conn_wake(softc);
325	kthread_exit();
326}
327
328static void
329ctl_ha_send(struct ha_softc *softc)
330{
331	struct socket *so = softc->ha_so;
332	int error;
333
334	while (1) {
335		if (softc->ha_sending == NULL) {
336			mtx_lock(&softc->ha_lock);
337			softc->ha_sending = mbufq_dequeue(&softc->ha_sendq);
338			mtx_unlock(&softc->ha_lock);
339			if (softc->ha_sending == NULL) {
340				so->so_snd.sb_lowat = so->so_snd.sb_hiwat + 1;
341				break;
342			}
343		}
344		SOCKBUF_LOCK(&so->so_snd);
345		if (sbspace(&so->so_snd) < softc->ha_sending->m_pkthdr.len) {
346			so->so_snd.sb_lowat = softc->ha_sending->m_pkthdr.len;
347			SOCKBUF_UNLOCK(&so->so_snd);
348			break;
349		}
350		SOCKBUF_UNLOCK(&so->so_snd);
351		error = sosend(softc->ha_so, NULL, NULL, softc->ha_sending,
352		    NULL, MSG_DONTWAIT, curthread);
353		softc->ha_sending = NULL;
354		if (error != 0) {
355			printf("%s: sosend() error %d\n", __func__, error);
356			return;
357		}
358	};
359}
360
361static void
362ctl_ha_sock_setup(struct ha_softc *softc)
363{
364	struct sockopt opt;
365	struct socket *so = softc->ha_so;
366	int error, val;
367
368	val = 1024 * 1024;
369	error = soreserve(so, val, val);
370	if (error)
371		printf("%s: soreserve failed %d\n", __func__, error);
372
373	SOCKBUF_LOCK(&so->so_rcv);
374	so->so_rcv.sb_lowat = sizeof(struct ha_msg_wire);
375	soupcall_set(so, SO_RCV, ctl_ha_rupcall, softc);
376	SOCKBUF_UNLOCK(&so->so_rcv);
377	SOCKBUF_LOCK(&so->so_snd);
378	so->so_snd.sb_lowat = sizeof(struct ha_msg_wire);
379	soupcall_set(so, SO_SND, ctl_ha_supcall, softc);
380	SOCKBUF_UNLOCK(&so->so_snd);
381
382	bzero(&opt, sizeof(struct sockopt));
383	opt.sopt_dir = SOPT_SET;
384	opt.sopt_level = SOL_SOCKET;
385	opt.sopt_name = SO_KEEPALIVE;
386	opt.sopt_val = &val;
387	opt.sopt_valsize = sizeof(val);
388	val = 1;
389	error = sosetopt(so, &opt);
390	if (error)
391		printf("%s: KEEPALIVE setting failed %d\n", __func__, error);
392
393	opt.sopt_level = IPPROTO_TCP;
394	opt.sopt_name = TCP_NODELAY;
395	val = 1;
396	error = sosetopt(so, &opt);
397	if (error)
398		printf("%s: NODELAY setting failed %d\n", __func__, error);
399
400	opt.sopt_name = TCP_KEEPINIT;
401	val = 3;
402	error = sosetopt(so, &opt);
403	if (error)
404		printf("%s: KEEPINIT setting failed %d\n", __func__, error);
405
406	opt.sopt_name = TCP_KEEPIDLE;
407	val = 1;
408	error = sosetopt(so, &opt);
409	if (error)
410		printf("%s: KEEPIDLE setting failed %d\n", __func__, error);
411
412	opt.sopt_name = TCP_KEEPINTVL;
413	val = 1;
414	error = sosetopt(so, &opt);
415	if (error)
416		printf("%s: KEEPINTVL setting failed %d\n", __func__, error);
417
418	opt.sopt_name = TCP_KEEPCNT;
419	val = 5;
420	error = sosetopt(so, &opt);
421	if (error)
422		printf("%s: KEEPCNT setting failed %d\n", __func__, error);
423}
424
425static int
426ctl_ha_connect(struct ha_softc *softc)
427{
428	struct thread *td = curthread;
429	struct socket *so;
430	int error;
431
432	/* Create the socket */
433	error = socreate(PF_INET, &so, SOCK_STREAM,
434	    IPPROTO_TCP, td->td_ucred, td);
435	if (error != 0) {
436		printf("%s: socreate() error %d\n", __func__, error);
437		return (error);
438	}
439	softc->ha_so = so;
440	ctl_ha_sock_setup(softc);
441
442	error = soconnect(so, (struct sockaddr *)&softc->ha_peer_in, td);
443	if (error != 0) {
444		printf("%s: soconnect() error %d\n", __func__, error);
445		goto out;
446	}
447	return (0);
448
449out:
450	ctl_ha_close(softc);
451	return (error);
452}
453
454static int
455ctl_ha_accept(struct ha_softc *softc)
456{
457	struct socket *so;
458	struct sockaddr *sap;
459	int error;
460
461	ACCEPT_LOCK();
462	if (softc->ha_lso->so_rcv.sb_state & SBS_CANTRCVMORE)
463		softc->ha_lso->so_error = ECONNABORTED;
464	if (softc->ha_lso->so_error) {
465		error = softc->ha_lso->so_error;
466		softc->ha_lso->so_error = 0;
467		ACCEPT_UNLOCK();
468		printf("%s: socket error %d\n", __func__, error);
469		goto out;
470	}
471	so = TAILQ_FIRST(&softc->ha_lso->so_comp);
472	if (so == NULL) {
473		ACCEPT_UNLOCK();
474		return (EWOULDBLOCK);
475	}
476	KASSERT(!(so->so_qstate & SQ_INCOMP), ("accept1: so SQ_INCOMP"));
477	KASSERT(so->so_qstate & SQ_COMP, ("accept1: so not SQ_COMP"));
478
479	/*
480	 * Before changing the flags on the socket, we have to bump the
481	 * reference count.  Otherwise, if the protocol calls sofree(),
482	 * the socket will be released due to a zero refcount.
483	 */
484	SOCK_LOCK(so);			/* soref() and so_state update */
485	soref(so);			/* file descriptor reference */
486
487	TAILQ_REMOVE(&softc->ha_lso->so_comp, so, so_list);
488	softc->ha_lso->so_qlen--;
489	so->so_state |= SS_NBIO;
490	so->so_qstate &= ~SQ_COMP;
491	so->so_head = NULL;
492
493	SOCK_UNLOCK(so);
494	ACCEPT_UNLOCK();
495
496	sap = NULL;
497	error = soaccept(so, &sap);
498	if (error != 0) {
499		printf("%s: soaccept() error %d\n", __func__, error);
500		if (sap != NULL)
501			free(sap, M_SONAME);
502		goto out;
503	}
504	if (sap != NULL)
505		free(sap, M_SONAME);
506	softc->ha_so = so;
507	ctl_ha_sock_setup(softc);
508	return (0);
509
510out:
511	ctl_ha_lclose(softc);
512	return (error);
513}
514
515static int
516ctl_ha_listen(struct ha_softc *softc)
517{
518	struct thread *td = curthread;
519	struct sockopt opt;
520	int error, val;
521
522	/* Create the socket */
523	if (softc->ha_lso == NULL) {
524		error = socreate(PF_INET, &softc->ha_lso, SOCK_STREAM,
525		    IPPROTO_TCP, td->td_ucred, td);
526		if (error != 0) {
527			printf("%s: socreate() error %d\n", __func__, error);
528			return (error);
529		}
530		bzero(&opt, sizeof(struct sockopt));
531		opt.sopt_dir = SOPT_SET;
532		opt.sopt_level = SOL_SOCKET;
533		opt.sopt_name = SO_REUSEADDR;
534		opt.sopt_val = &val;
535		opt.sopt_valsize = sizeof(val);
536		val = 1;
537		error = sosetopt(softc->ha_lso, &opt);
538		if (error) {
539			printf("%s: REUSEADDR setting failed %d\n",
540			    __func__, error);
541		}
542		SOCKBUF_LOCK(&softc->ha_lso->so_rcv);
543		soupcall_set(softc->ha_lso, SO_RCV, ctl_ha_lupcall, softc);
544		SOCKBUF_UNLOCK(&softc->ha_lso->so_rcv);
545	}
546
547	error = sobind(softc->ha_lso, (struct sockaddr *)&softc->ha_peer_in, td);
548	if (error != 0) {
549		printf("%s: sobind() error %d\n", __func__, error);
550		goto out;
551	}
552	error = solisten(softc->ha_lso, 1, td);
553	if (error != 0) {
554		printf("%s: solisten() error %d\n", __func__, error);
555		goto out;
556	}
557	return (0);
558
559out:
560	ctl_ha_lclose(softc);
561	return (error);
562}
563
564static void
565ctl_ha_conn_thread(void *arg)
566{
567	struct ha_softc *softc = arg;
568	int error;
569
570	while (1) {
571		if (softc->ha_disconnect) {
572			ctl_ha_close(softc);
573			ctl_ha_lclose(softc);
574			softc->ha_disconnect = 0;
575		} else if (softc->ha_so != NULL &&
576		    (softc->ha_so->so_error ||
577		     softc->ha_so->so_rcv.sb_state & SBS_CANTRCVMORE))
578			ctl_ha_close(softc);
579		if (softc->ha_so == NULL) {
580			if (softc->ha_lso != NULL)
581				ctl_ha_accept(softc);
582			else if (softc->ha_listen)
583				ctl_ha_listen(softc);
584			else if (softc->ha_connect)
585				ctl_ha_connect(softc);
586		}
587		if (softc->ha_so != NULL) {
588			if (softc->ha_connected == 0 &&
589			    softc->ha_so->so_error == 0 &&
590			    (softc->ha_so->so_state & SS_ISCONNECTING) == 0) {
591				softc->ha_connected = 1;
592				ctl_ha_evt(softc, CTL_HA_CHAN_MAX,
593				    CTL_HA_EVT_LINK_CHANGE,
594				    CTL_HA_LINK_ONLINE);
595				softc->ha_receiving = 1;
596				error = kproc_kthread_add(ctl_ha_rx_thread,
597				    softc, &softc->ha_ctl_softc->ctl_proc,
598				    NULL, 0, 0, "ctl", "ha_rx");
599				if (error != 0) {
600					printf("Error creating CTL HA rx thread!\n");
601					softc->ha_receiving = 0;
602					softc->ha_disconnect = 1;
603				}
604			}
605			ctl_ha_send(softc);
606		}
607		mtx_lock(&softc->ha_lock);
608		if (softc->ha_so != NULL &&
609		    (softc->ha_so->so_error ||
610		     softc->ha_so->so_rcv.sb_state & SBS_CANTRCVMORE))
611			;
612		else if (!softc->ha_wakeup)
613			msleep(&softc->ha_wakeup, &softc->ha_lock, 0, "-", hz);
614		softc->ha_wakeup = 0;
615		mtx_unlock(&softc->ha_lock);
616	}
617}
618
619static int
620ctl_ha_peer_sysctl(SYSCTL_HANDLER_ARGS)
621{
622	struct ha_softc *softc = (struct ha_softc *)arg1;
623	struct sockaddr_in *sa;
624	int error, b1, b2, b3, b4, p, num;
625	char buf[128];
626
627	strlcpy(buf, softc->ha_peer, sizeof(buf));
628	error = sysctl_handle_string(oidp, buf, sizeof(buf), req);
629	if ((error != 0) || (req->newptr == NULL) ||
630	    strncmp(buf, softc->ha_peer, sizeof(buf)) == 0)
631		return (error);
632
633	sa = &softc->ha_peer_in;
634	mtx_lock(&softc->ha_lock);
635	if ((num = sscanf(buf, "connect %d.%d.%d.%d:%d",
636	    &b1, &b2, &b3, &b4, &p)) >= 4) {
637		softc->ha_connect = 1;
638		softc->ha_listen = 0;
639	} else if ((num = sscanf(buf, "listen %d.%d.%d.%d:%d",
640	    &b1, &b2, &b3, &b4, &p)) >= 4) {
641		softc->ha_connect = 0;
642		softc->ha_listen = 1;
643	} else {
644		softc->ha_connect = 0;
645		softc->ha_listen = 0;
646		if (buf[0] != 0) {
647			buf[0] = 0;
648			error = EINVAL;
649		}
650	}
651	strlcpy(softc->ha_peer, buf, sizeof(softc->ha_peer));
652	if (softc->ha_connect || softc->ha_listen) {
653		memset(sa, 0, sizeof(*sa));
654		sa->sin_len = sizeof(struct sockaddr_in);
655		sa->sin_family = AF_INET;
656		sa->sin_port = htons((num >= 5) ? p : 999);
657		sa->sin_addr.s_addr =
658		    htonl((b1 << 24) + (b2 << 16) + (b3 << 8) + b4);
659	}
660	softc->ha_disconnect = 1;
661	softc->ha_wakeup = 1;
662	mtx_unlock(&softc->ha_lock);
663	wakeup(&softc->ha_wakeup);
664	return (error);
665}
666
667ctl_ha_status
668ctl_ha_msg_register(ctl_ha_channel channel, ctl_evt_handler handler)
669{
670	struct ha_softc *softc = &ha_softc;
671
672	KASSERT(channel < CTL_HA_CHAN_MAX,
673	    ("Wrong CTL HA channel %d", channel));
674	softc->ha_handler[channel] = handler;
675	return (CTL_HA_STATUS_SUCCESS);
676}
677
678ctl_ha_status
679ctl_ha_msg_deregister(ctl_ha_channel channel)
680{
681	struct ha_softc *softc = &ha_softc;
682
683	KASSERT(channel < CTL_HA_CHAN_MAX,
684	    ("Wrong CTL HA channel %d", channel));
685	softc->ha_handler[channel] = NULL;
686	return (CTL_HA_STATUS_SUCCESS);
687}
688
689/*
690 * Receive a message of the specified size.
691 */
692ctl_ha_status
693ctl_ha_msg_recv(ctl_ha_channel channel, void *addr, size_t len,
694		int wait)
695{
696	struct ha_softc *softc = &ha_softc;
697	struct uio uio;
698	struct iovec iov;
699	int error, flags;
700
701	if (!softc->ha_connected)
702		return (CTL_HA_STATUS_DISCONNECT);
703
704	iov.iov_base = addr;
705	iov.iov_len = len;
706	uio.uio_iov = &iov;
707	uio.uio_iovcnt = 1;
708	uio.uio_rw = UIO_READ;
709	uio.uio_segflg = UIO_SYSSPACE;
710	uio.uio_td = curthread;
711	uio.uio_resid = len;
712	flags = wait ? 0 : MSG_DONTWAIT;
713	error = soreceive(softc->ha_so, NULL, &uio, NULL, NULL, &flags);
714	if (error == 0)
715		return (CTL_HA_STATUS_SUCCESS);
716
717	/* Consider all errors fatal for HA sanity. */
718	mtx_lock(&softc->ha_lock);
719	if (softc->ha_connected) {
720		softc->ha_disconnect = 1;
721		softc->ha_wakeup = 1;
722		wakeup(&softc->ha_wakeup);
723	}
724	mtx_unlock(&softc->ha_lock);
725	return (CTL_HA_STATUS_ERROR);
726}
727
728/*
729 * Send a message of the specified size.
730 */
731ctl_ha_status
732ctl_ha_msg_send2(ctl_ha_channel channel, const void *addr, size_t len,
733    const void *addr2, size_t len2, int wait)
734{
735	struct ha_softc *softc = &ha_softc;
736	struct mbuf *mb, *newmb;
737	struct ha_msg_wire hdr;
738	size_t copylen, off;
739
740	if (!softc->ha_connected)
741		return (CTL_HA_STATUS_DISCONNECT);
742
743	newmb = m_getm2(NULL, sizeof(hdr) + len + len2, wait, MT_DATA,
744	    M_PKTHDR);
745	if (newmb == NULL) {
746		/* Consider all errors fatal for HA sanity. */
747		mtx_lock(&softc->ha_lock);
748		if (softc->ha_connected) {
749			softc->ha_disconnect = 1;
750			softc->ha_wakeup = 1;
751			wakeup(&softc->ha_wakeup);
752		}
753		mtx_unlock(&softc->ha_lock);
754		printf("%s: Can't allocate mbuf chain\n", __func__);
755		return (CTL_HA_STATUS_ERROR);
756	}
757	hdr.channel = channel;
758	hdr.length = len + len2;
759	mb = newmb;
760	memcpy(mtodo(mb, 0), &hdr, sizeof(hdr));
761	mb->m_len += sizeof(hdr);
762	off = 0;
763	for (; mb != NULL && off < len; mb = mb->m_next) {
764		copylen = min(M_TRAILINGSPACE(mb), len - off);
765		memcpy(mtodo(mb, mb->m_len), (const char *)addr + off, copylen);
766		mb->m_len += copylen;
767		off += copylen;
768		if (off == len)
769			break;
770	}
771	KASSERT(off == len, ("%s: off (%zu) != len (%zu)", __func__,
772	    off, len));
773	off = 0;
774	for (; mb != NULL && off < len2; mb = mb->m_next) {
775		copylen = min(M_TRAILINGSPACE(mb), len2 - off);
776		memcpy(mtodo(mb, mb->m_len), (const char *)addr2 + off, copylen);
777		mb->m_len += copylen;
778		off += copylen;
779	}
780	KASSERT(off == len2, ("%s: off (%zu) != len2 (%zu)", __func__,
781	    off, len2));
782	newmb->m_pkthdr.len = sizeof(hdr) + len + len2;
783
784	mtx_lock(&softc->ha_lock);
785	if (!softc->ha_connected) {
786		mtx_unlock(&softc->ha_lock);
787		m_freem(newmb);
788		return (CTL_HA_STATUS_DISCONNECT);
789	}
790	mbufq_enqueue(&softc->ha_sendq, newmb);
791	softc->ha_wakeup = 1;
792	mtx_unlock(&softc->ha_lock);
793	wakeup(&softc->ha_wakeup);
794	return (CTL_HA_STATUS_SUCCESS);
795}
796
797ctl_ha_status
798ctl_ha_msg_send(ctl_ha_channel channel, const void *addr, size_t len,
799    int wait)
800{
801
802	return (ctl_ha_msg_send2(channel, addr, len, NULL, 0, wait));
803}
804
805/*
806 * Allocate a data transfer request structure.
807 */
808struct ctl_ha_dt_req *
809ctl_dt_req_alloc(void)
810{
811
812	return (malloc(sizeof(struct ctl_ha_dt_req), M_CTL, M_WAITOK | M_ZERO));
813}
814
815/*
816 * Free a data transfer request structure.
817 */
818void
819ctl_dt_req_free(struct ctl_ha_dt_req *req)
820{
821
822	free(req, M_CTL);
823}
824
825/*
826 * Issue a DMA request for a single buffer.
827 */
828ctl_ha_status
829ctl_dt_single(struct ctl_ha_dt_req *req)
830{
831	struct ha_softc *softc = &ha_softc;
832	struct ha_dt_msg_wire wire_dt;
833	ctl_ha_status status;
834
835	wire_dt.command = req->command;
836	wire_dt.size = req->size;
837	wire_dt.local = req->local;
838	wire_dt.remote = req->remote;
839	if (req->command == CTL_HA_DT_CMD_READ && req->callback != NULL) {
840		mtx_lock(&softc->ha_lock);
841		TAILQ_INSERT_TAIL(&softc->ha_dts, req, links);
842		mtx_unlock(&softc->ha_lock);
843		ctl_ha_msg_send(CTL_HA_CHAN_DATA, &wire_dt, sizeof(wire_dt),
844		    M_WAITOK);
845		return (CTL_HA_STATUS_WAIT);
846	}
847	if (req->command == CTL_HA_DT_CMD_READ) {
848		status = ctl_ha_msg_send(CTL_HA_CHAN_DATA, &wire_dt,
849		    sizeof(wire_dt), M_WAITOK);
850	} else {
851		status = ctl_ha_msg_send2(CTL_HA_CHAN_DATA, &wire_dt,
852		    sizeof(wire_dt), req->local, req->size, M_WAITOK);
853	}
854	return (status);
855}
856
857static void
858ctl_dt_event_handler(ctl_ha_channel channel, ctl_ha_event event, int param)
859{
860	struct ha_softc *softc = &ha_softc;
861	struct ctl_ha_dt_req *req;
862	ctl_ha_status isc_status;
863
864	if (event == CTL_HA_EVT_MSG_RECV) {
865		struct ha_dt_msg_wire wire_dt;
866		uint8_t *tmp;
867		int size;
868
869		size = min(sizeof(wire_dt), param);
870		isc_status = ctl_ha_msg_recv(CTL_HA_CHAN_DATA, &wire_dt,
871					     size, M_WAITOK);
872		if (isc_status != CTL_HA_STATUS_SUCCESS) {
873			printf("%s: Error receiving message: %d\n",
874			    __func__, isc_status);
875			return;
876		}
877
878		if (wire_dt.command == CTL_HA_DT_CMD_READ) {
879			wire_dt.command = CTL_HA_DT_CMD_WRITE;
880			tmp = wire_dt.local;
881			wire_dt.local = wire_dt.remote;
882			wire_dt.remote = tmp;
883			ctl_ha_msg_send2(CTL_HA_CHAN_DATA, &wire_dt,
884			    sizeof(wire_dt), wire_dt.local, wire_dt.size,
885			    M_WAITOK);
886		} else if (wire_dt.command == CTL_HA_DT_CMD_WRITE) {
887			isc_status = ctl_ha_msg_recv(CTL_HA_CHAN_DATA,
888			    wire_dt.remote, wire_dt.size, M_WAITOK);
889			mtx_lock(&softc->ha_lock);
890			TAILQ_FOREACH(req, &softc->ha_dts, links) {
891				if (req->local == wire_dt.remote) {
892					TAILQ_REMOVE(&softc->ha_dts, req, links);
893					break;
894				}
895			}
896			mtx_unlock(&softc->ha_lock);
897			if (req) {
898				req->ret = isc_status;
899				req->callback(req);
900			}
901		}
902	} else if (event == CTL_HA_EVT_LINK_CHANGE) {
903		CTL_DEBUG_PRINT(("%s: Link state change to %d\n", __func__,
904		    param));
905		if (param != CTL_HA_LINK_ONLINE) {
906			mtx_lock(&softc->ha_lock);
907			while ((req = TAILQ_FIRST(&softc->ha_dts)) != NULL) {
908				TAILQ_REMOVE(&softc->ha_dts, req, links);
909				mtx_unlock(&softc->ha_lock);
910				req->ret = CTL_HA_STATUS_DISCONNECT;
911				req->callback(req);
912				mtx_lock(&softc->ha_lock);
913			}
914			mtx_unlock(&softc->ha_lock);
915		}
916	} else {
917		printf("%s: Unknown event %d\n", __func__, event);
918	}
919}
920
921
922ctl_ha_status
923ctl_ha_msg_init(struct ctl_softc *ctl_softc)
924{
925	struct ha_softc *softc = &ha_softc;
926	int error;
927
928	softc->ha_ctl_softc = ctl_softc;
929	mtx_init(&softc->ha_lock, "CTL HA mutex", NULL, MTX_DEF);
930	mbufq_init(&softc->ha_sendq, INT_MAX);
931	TAILQ_INIT(&softc->ha_dts);
932	error = kproc_kthread_add(ctl_ha_conn_thread, softc,
933	    &ctl_softc->ctl_proc, NULL, 0, 0, "ctl", "ha_tx");
934	if (error != 0) {
935		printf("error creating CTL HA connection thread!\n");
936		mtx_destroy(&softc->ha_lock);
937		return (CTL_HA_STATUS_ERROR);
938	}
939	SYSCTL_ADD_PROC(&ctl_softc->sysctl_ctx,
940	    SYSCTL_CHILDREN(ctl_softc->sysctl_tree),
941	    OID_AUTO, "ha_peer", CTLTYPE_STRING | CTLFLAG_RWTUN,
942	    softc, 0, ctl_ha_peer_sysctl, "A", "HA peer connection method");
943
944	if (ctl_ha_msg_register(CTL_HA_CHAN_DATA, ctl_dt_event_handler)
945	    != CTL_HA_STATUS_SUCCESS) {
946		printf("%s: ctl_ha_msg_register failed.\n", __func__);
947	}
948
949	return (CTL_HA_STATUS_SUCCESS);
950};
951
952ctl_ha_status
953ctl_ha_msg_shutdown(struct ctl_softc *ctl_softc)
954{
955	struct ha_softc *softc = &ha_softc;
956
957	if (ctl_ha_msg_deregister(CTL_HA_CHAN_DATA) != CTL_HA_STATUS_SUCCESS) {
958		printf("%s: ctl_ha_msg_deregister failed.\n", __func__);
959	}
960
961	mtx_destroy(&softc->ha_lock);
962	return (CTL_HA_STATUS_SUCCESS);
963};
964