xenstore.c revision 186557
1/******************************************************************************
2 * xenbus_xs.c
3 *
4 * This is the kernel equivalent of the "xs" library.  We don't need everything
5 * and we use xenbus_comms for communication.
6 *
7 * Copyright (C) 2005 Rusty Russell, IBM Corporation
8 *
9 * This file may be distributed separately from the Linux kernel, or
10 * incorporated into other software packages, subject to the following license:
11 *
12 * Permission is hereby granted, free of charge, to any person obtaining a copy
13 * of this source file (the "Software"), to deal in the Software without
14 * restriction, including without limitation the rights to use, copy, modify,
15 * merge, publish, distribute, sublicense, and/or sell copies of the Software,
16 * and to permit persons to whom the Software is furnished to do so, subject to
17 * the following conditions:
18 *
19 * The above copyright notice and this permission notice shall be included in
20 * all copies or substantial portions of the Software.
21 *
22 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
27 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
28 * IN THE SOFTWARE.
29 */
30
31
32#include <sys/cdefs.h>
33__FBSDID("$FreeBSD: head/sys/xen/xenbus/xenbus_xs.c 186557 2008-12-29 06:31:03Z kmacy $");
34
35#include <sys/param.h>
36#include <sys/uio.h>
37#include <sys/kernel.h>
38#include <sys/lock.h>
39#include <sys/mutex.h>
40#include <sys/sx.h>
41#include <sys/syslog.h>
42#include <sys/malloc.h>
43#include <sys/systm.h>
44#include <sys/proc.h>
45#include <sys/kthread.h>
46#include <sys/unistd.h>
47
48#include <machine/xen/xen-os.h>
49#include <xen/hypervisor.h>
50#include <machine/stdarg.h>
51
52#include <xen/xenbus/xenbusvar.h>
53#include <xen/xenbus/xenbus_comms.h>
54#include <xen/interface/hvm/params.h>
55
56#include <vm/vm.h>
57#include <vm/pmap.h>
58
/* Defined further down in this file; called from xenbus_thread(). */
static int xs_process_msg(enum xsd_sockmsg_type *type);

/* Set to 1 by xs_init() right before the xenwatch process is created. */
int xenwatch_running = 0;
/* Set to 1 by xenbus_thread() when it starts. */
int xenbus_running = 0;
/* Event channel for the store; assigned in xs_init(). */
int xen_store_evtchn;
64
65struct xs_stored_msg {
66	TAILQ_ENTRY(xs_stored_msg) list;
67
68	struct xsd_sockmsg hdr;
69
70	union {
71		/* Queued replies. */
72		struct {
73			char *body;
74		} reply;
75
76		/* Queued watch events. */
77		struct {
78			struct xenbus_watch *handle;
79			char **vec;
80			unsigned int vec_size;
81		} watch;
82	} u;
83};
84
85struct xs_handle {
86	/* A list of replies. Currently only one will ever be outstanding. */
87	TAILQ_HEAD(xs_handle_list, xs_stored_msg) reply_list;
88	struct mtx reply_lock;
89	int reply_waitq;
90
91	/* One request at a time. */
92	struct sx request_mutex;
93
94	/* Protect transactions against save/restore. */
95	struct sx suspend_mutex;
96};
97
98static struct xs_handle xs_state;
99
100/* List of registered watches, and a lock to protect it. */
101static LIST_HEAD(watch_list_head, xenbus_watch) watches;
102static struct mtx watches_lock;
103/* List of pending watch callback events, and a lock to protect it. */
104static TAILQ_HEAD(event_list_head, xs_stored_msg) watch_events;
105static struct mtx watch_events_lock;
106
107/*
108 * Details of the xenwatch callback kernel thread. The thread waits on the
109 * watch_events_waitq for work to do (queued on watch_events list). When it
110 * wakes up it acquires the xenwatch_mutex before reading the list and
111 * carrying out work.
112 */
113static pid_t xenwatch_pid;
114struct sx xenwatch_mutex;
115static int watch_events_waitq;
116
117#define xsd_error_count	(sizeof(xsd_errors) / sizeof(xsd_errors[0]))
118
119static int
120xs_get_error(const char *errorstring)
121{
122	unsigned int i;
123
124	for (i = 0; i < xsd_error_count; i++) {
125		if (!strcmp(errorstring, xsd_errors[i].errstring))
126			return (xsd_errors[i].errnum);
127	}
128	log(LOG_WARNING, "XENBUS xen store gave: unknown error %s",
129	    errorstring);
130	return (EINVAL);
131}
132
133extern void kdb_backtrace(void);
134
135static int
136xs_read_reply(enum xsd_sockmsg_type *type, unsigned int *len, void **result)
137{
138	struct xs_stored_msg *msg;
139	char *body;
140	int error;
141
142	mtx_lock(&xs_state.reply_lock);
143
144	while (TAILQ_EMPTY(&xs_state.reply_list)) {
145			while (TAILQ_EMPTY(&xs_state.reply_list)) {
146				error = mtx_sleep(&xs_state.reply_waitq,
147				    &xs_state.reply_lock,
148				    PCATCH, "xswait", hz/10);
149				if (error && error != EWOULDBLOCK) {
150					mtx_unlock(&xs_state.reply_lock);
151					return (error);
152				}
153
154			}
155
156
157		}
158
159
160	msg = TAILQ_FIRST(&xs_state.reply_list);
161	TAILQ_REMOVE(&xs_state.reply_list, msg, list);
162
163	mtx_unlock(&xs_state.reply_lock);
164
165	*type = msg->hdr.type;
166	if (len)
167		*len = msg->hdr.len;
168	body = msg->u.reply.body;
169
170	free(msg, M_DEVBUF);
171	*result = body;
172	return (0);
173}
174
#if 0
/*
 * Emergency write of a "print" debug command to the store.  UNUSED: the
 * whole function is compiled out.  The xb_write() calls carry the same
 * third (lock object) argument as the live callers in this file so the
 * code still builds if it is ever re-enabled.
 */
void xenbus_debug_write(const char *str, unsigned int count)
{
	struct xsd_sockmsg msg = { 0 };

	msg.type = XS_DEBUG;
	/* "print" including its NUL, the caller's bytes, and a closing NUL. */
	msg.len = sizeof("print") + count + 1;

	sx_xlock(&xs_state.request_mutex);
	xb_write(&msg, sizeof(msg), &xs_state.request_mutex.lock_object);
	xb_write("print", sizeof("print"),
	    &xs_state.request_mutex.lock_object);
	xb_write(str, count, &xs_state.request_mutex.lock_object);
	xb_write("", 1, &xs_state.request_mutex.lock_object);
	sx_xunlock(&xs_state.request_mutex);
}

#endif
193
194int
195xenbus_dev_request_and_reply(struct xsd_sockmsg *msg, void **result)
196{
197	struct xsd_sockmsg req_msg = *msg;
198	int error;
199
200	if (req_msg.type == XS_TRANSACTION_START)
201		sx_slock(&xs_state.suspend_mutex);
202
203	sx_xlock(&xs_state.request_mutex);
204
205	error = xb_write(msg, sizeof(*msg) + msg->len, &xs_state.request_mutex.lock_object);
206	if (error) {
207		msg->type = XS_ERROR;
208	} else {
209		error = xs_read_reply(&msg->type, &msg->len, result);
210	}
211
212	sx_xunlock(&xs_state.request_mutex);
213
214	if ((msg->type == XS_TRANSACTION_END) ||
215	    ((req_msg.type == XS_TRANSACTION_START) &&
216		(msg->type == XS_ERROR)))
217		sx_sunlock(&xs_state.suspend_mutex);
218
219	return (error);
220}
221
222/*
223 * Send message to xs. The reply is returned in *result and should be
224 * fred with free(*result, M_DEVBUF). Return zero on success or an
225 * error code on failure.
226 */
227static int
228xs_talkv(struct xenbus_transaction t, enum xsd_sockmsg_type type,
229    const struct iovec *iovec, unsigned int num_vecs,
230    unsigned int *len, void **result)
231{
232	struct xsd_sockmsg msg;
233	void *ret = NULL;
234	unsigned int i;
235	int error;
236
237	msg.tx_id = t.id;
238	msg.req_id = 0;
239	msg.type = type;
240	msg.len = 0;
241	for (i = 0; i < num_vecs; i++)
242		msg.len += iovec[i].iov_len;
243
244	sx_xlock(&xs_state.request_mutex);
245
246	error = xb_write(&msg, sizeof(msg), &xs_state.request_mutex.lock_object);
247	if (error) {
248		sx_xunlock(&xs_state.request_mutex);
249		printf("xs_talkv failed %d\n", error);
250		return (error);
251	}
252
253	for (i = 0; i < num_vecs; i++) {
254		error = xb_write(iovec[i].iov_base, iovec[i].iov_len, &xs_state.request_mutex.lock_object);
255		if (error) {
256			sx_xunlock(&xs_state.request_mutex);
257			printf("xs_talkv failed %d\n", error);
258			return (error);
259		}
260	}
261
262	error = xs_read_reply(&msg.type, len, &ret);
263
264	sx_xunlock(&xs_state.request_mutex);
265
266	if (error)
267		return (error);
268
269	if (msg.type == XS_ERROR) {
270		error = xs_get_error(ret);
271		free(ret, M_DEVBUF);
272		return (error);
273	}
274
275#if 0
276	if ((xenwatch_running == 0) && (xenwatch_inline == 0)) {
277		xenwatch_inline = 1;
278		while (!TAILQ_EMPTY(&watch_events)
279		    && xenwatch_running == 0) {
280
281			struct xs_stored_msg *wmsg = TAILQ_FIRST(&watch_events);
282			TAILQ_REMOVE(&watch_events, wmsg, list);
283
284			wmsg->u.watch.handle->callback(
285				wmsg->u.watch.handle,
286				(const char **)wmsg->u.watch.vec,
287				wmsg->u.watch.vec_size);
288			free(wmsg->u.watch.vec, M_DEVBUF);
289			free(wmsg, M_DEVBUF);
290		}
291		xenwatch_inline = 0;
292	}
293#endif
294	KASSERT(msg.type == type, ("bad xenstore message type"));
295
296	if (result)
297		*result = ret;
298	else
299		free(ret, M_DEVBUF);
300
301	return (0);
302}
303
304/* Simplified version of xs_talkv: single message. */
305static int
306xs_single(struct xenbus_transaction t, enum xsd_sockmsg_type type,
307    const char *string, unsigned int *len, void **result)
308{
309	struct iovec iovec;
310
311	iovec.iov_base = (void *)(uintptr_t) string;
312	iovec.iov_len = strlen(string) + 1;
313
314	return (xs_talkv(t, type, &iovec, 1, len, result));
315}
316
/*
 * Count the NUL-separated strings stored in the first len bytes of
 * strings.
 *
 * Every NUL inside the range ends one string; bytes after the last NUL
 * (an unterminated tail) count as one more string.  The scan never looks
 * at strings[len] or beyond, so the buffer does not need a terminator.
 */
static unsigned int
count_strings(const char *strings, unsigned int len)
{
	unsigned int i, num;

	num = 0;
	for (i = 0; i < len; i++) {
		if (strings[i] == '\0')
			num++;
	}

	/* Unterminated trailing fragment. */
	if (len > 0 && strings[len - 1] != '\0')
		num++;

	return (num);
}
328
329/* Return the path to dir with /name appended. Buffer must be kfree()'ed. */
330static char *
331join(const char *dir, const char *name)
332{
333	char *buffer;
334
335	buffer = malloc(strlen(dir) + strlen("/") + strlen(name) + 1,
336	    M_DEVBUF, M_WAITOK);
337
338	strcpy(buffer, dir);
339	if (strcmp(name, "")) {
340		strcat(buffer, "/");
341		strcat(buffer, name);
342	}
343
344	return (buffer);
345}
346
347static char **
348split(char *strings, unsigned int len, unsigned int *num)
349{
350	char *p, **ret;
351
352	/* Count the strings. */
353	*num = count_strings(strings, len) + 1;
354
355	/* Transfer to one big alloc for easy freeing. */
356	ret = malloc(*num * sizeof(char *) + len, M_DEVBUF, M_WAITOK);
357	memcpy(&ret[*num], strings, len);
358	free(strings, M_DEVBUF);
359
360	strings = (char *)&ret[*num];
361	for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1)
362		ret[(*num)++] = p;
363
364	ret[*num] = strings + len;
365
366	return ret;
367}
368
369/*
370 * Return the contents of a directory in *result which should be freed
371 * with free(*result, M_DEVBUF).
372 */
373int
374xenbus_directory(struct xenbus_transaction t, const char *dir,
375    const char *node, unsigned int *num, char ***result)
376{
377	char *strings, *path;
378	unsigned int len = 0;
379	int error;
380
381	path = join(dir, node);
382	error = xs_single(t, XS_DIRECTORY, path, &len, (void **) &strings);
383	free(path, M_DEVBUF);
384	if (error)
385		return (error);
386
387	*result = split(strings, len, num);
388	return (0);
389}
390
391/*
392 * Check if a path exists. Return 1 if it does.
393 */
394int
395xenbus_exists(struct xenbus_transaction t, const char *dir, const char *node)
396{
397	char **d;
398	int error, dir_n;
399
400	error = xenbus_directory(t, dir, node, &dir_n, &d);
401	if (error)
402		return (0);
403	free(d, M_DEVBUF);
404	return (1);
405}
406
407/*
408 * Get the value of a single file.  Returns the contents in *result
409 * which should be freed with free(*result, M_DEVBUF) after use.
410 * The length of the value in bytes is returned in *len.
411 */
412int
413xenbus_read(struct xenbus_transaction t, const char *dir, const char *node,
414    unsigned int *len, void **result)
415{
416	char *path;
417	void *ret;
418	int error;
419
420	path = join(dir, node);
421	error = xs_single(t, XS_READ, path, len, &ret);
422	free(path, M_DEVBUF);
423	if (error)
424		return (error);
425	*result = ret;
426	return (0);
427}
428
429/*
430 * Write the value of a single file.  Returns error on failure.
431 */
432int
433xenbus_write(struct xenbus_transaction t, const char *dir, const char *node,
434    const char *string)
435{
436	char *path;
437	struct iovec iovec[2];
438	int error;
439
440	path = join(dir, node);
441
442	iovec[0].iov_base = (void *)(uintptr_t) path;
443	iovec[0].iov_len = strlen(path) + 1;
444	iovec[1].iov_base = (void *)(uintptr_t) string;
445	iovec[1].iov_len = strlen(string);
446
447	error = xs_talkv(t, XS_WRITE, iovec, 2, NULL, NULL);
448	free(path, M_DEVBUF);
449
450	return (error);
451}
452
453/*
454 * Create a new directory.
455 */
456int
457xenbus_mkdir(struct xenbus_transaction t, const char *dir, const char *node)
458{
459	char *path;
460	int ret;
461
462	path = join(dir, node);
463	ret = xs_single(t, XS_MKDIR, path, NULL, NULL);
464	free(path, M_DEVBUF);
465
466	return (ret);
467}
468
469/*
470 * Destroy a file or directory (directories must be empty).
471 */
472int
473xenbus_rm(struct xenbus_transaction t, const char *dir, const char *node)
474{
475	char *path;
476	int ret;
477
478	path = join(dir, node);
479	ret = xs_single(t, XS_RM, path, NULL, NULL);
480	free(path, M_DEVBUF);
481
482	return (ret);
483}
484
485/*
486 * Start a transaction: changes by others will not be seen during this
487 * transaction, and changes will not be visible to others until end.
488 */
489int
490xenbus_transaction_start(struct xenbus_transaction *t)
491{
492	char *id_str;
493	int error;
494
495	sx_slock(&xs_state.suspend_mutex);
496	error = xs_single(XBT_NIL, XS_TRANSACTION_START, "", NULL,
497	    (void **) &id_str);
498	if (error) {
499		sx_sunlock(&xs_state.suspend_mutex);
500		return (error);
501	}
502
503	t->id = strtoul(id_str, NULL, 0);
504	free(id_str, M_DEVBUF);
505
506	return (0);
507}
508
509/*
510 * End a transaction.  If abandon is true, transaction is discarded
511 * instead of committed.
512 */
513int xenbus_transaction_end(struct xenbus_transaction t, int abort)
514{
515	char abortstr[2];
516	int error;
517
518	if (abort)
519		strcpy(abortstr, "F");
520	else
521		strcpy(abortstr, "T");
522
523	error = xs_single(t, XS_TRANSACTION_END, abortstr, NULL, NULL);
524
525	sx_sunlock(&xs_state.suspend_mutex);
526
527	return (error);
528}
529
530/* Single read and scanf: returns zero or errno. */
531int
532xenbus_scanf(struct xenbus_transaction t,
533    const char *dir, const char *node, int *scancountp, const char *fmt, ...)
534{
535	va_list ap;
536	int error, ns;
537	char *val;
538
539	error = xenbus_read(t, dir, node, NULL, (void **) &val);
540	if (error)
541		return (error);
542
543	va_start(ap, fmt);
544	ns = vsscanf(val, fmt, ap);
545	va_end(ap);
546	free(val, M_DEVBUF);
547	/* Distinctive errno. */
548	if (ns == 0)
549		return (ERANGE);
550	if (scancountp)
551		*scancountp = ns;
552	return (0);
553}
554
555/* Single printf and write: returns zero or errno. */
556int
557xenbus_printf(struct xenbus_transaction t,
558    const char *dir, const char *node, const char *fmt, ...)
559{
560	va_list ap;
561	int error, ret;
562#define PRINTF_BUFFER_SIZE 4096
563	char *printf_buffer;
564
565	printf_buffer = malloc(PRINTF_BUFFER_SIZE, M_DEVBUF, M_WAITOK);
566
567	va_start(ap, fmt);
568	ret = vsnprintf(printf_buffer, PRINTF_BUFFER_SIZE, fmt, ap);
569	va_end(ap);
570
571	KASSERT(ret <= PRINTF_BUFFER_SIZE-1, ("xenbus_printf: message too large"));
572	error = xenbus_write(t, dir, node, printf_buffer);
573
574	free(printf_buffer, M_DEVBUF);
575
576	return (error);
577}
578
579/* Takes tuples of names, scanf-style args, and void **, NULL terminated. */
580int
581xenbus_gather(struct xenbus_transaction t, const char *dir, ...)
582{
583	va_list ap;
584	const char *name;
585	int error, i;
586
587	for (i = 0; i < 10000; i++)
588		HYPERVISOR_yield();
589
590	va_start(ap, dir);
591	error = 0;
592	while (error == 0 && (name = va_arg(ap, char *)) != NULL) {
593		const char *fmt = va_arg(ap, char *);
594		void *result = va_arg(ap, void *);
595		char *p;
596
597		error = xenbus_read(t, dir, name, NULL, (void **) &p);
598		if (error)
599			break;
600
601		if (fmt) {
602			if (sscanf(p, fmt, result) == 0)
603				error = EINVAL;
604			free(p, M_DEVBUF);
605		} else
606			*(char **)result = p;
607	}
608	va_end(ap);
609
610	return (error);
611}
612
613static int
614xs_watch(const char *path, const char *token)
615{
616	struct iovec iov[2];
617
618	iov[0].iov_base = (void *)(uintptr_t) path;
619	iov[0].iov_len = strlen(path) + 1;
620	iov[1].iov_base = (void *)(uintptr_t) token;
621	iov[1].iov_len = strlen(token) + 1;
622
623	return (xs_talkv(XBT_NIL, XS_WATCH, iov, 2, NULL, NULL));
624}
625
626static int
627xs_unwatch(const char *path, const char *token)
628{
629	struct iovec iov[2];
630
631	iov[0].iov_base = (void *)(uintptr_t) path;
632	iov[0].iov_len = strlen(path) + 1;
633	iov[1].iov_base = (void *)(uintptr_t) token;
634	iov[1].iov_len = strlen(token) + 1;
635
636	return (xs_talkv(XBT_NIL, XS_UNWATCH, iov, 2, NULL, NULL));
637}
638
639static struct xenbus_watch *
640find_watch(const char *token)
641{
642	struct xenbus_watch *i, *cmp;
643
644	cmp = (void *)strtoul(token, NULL, 16);
645
646	LIST_FOREACH(i, &watches, list)
647		if (i == cmp)
648			return (i);
649
650	return (NULL);
651}
652
653/* Register callback to watch this node. */
654int
655register_xenbus_watch(struct xenbus_watch *watch)
656{
657	/* Pointer in ascii is the token. */
658	char token[sizeof(watch) * 2 + 1];
659	int error;
660
661	sprintf(token, "%lX", (long)watch);
662
663	sx_slock(&xs_state.suspend_mutex);
664
665	mtx_lock(&watches_lock);
666	KASSERT(find_watch(token) == NULL, ("watch already registered"));
667	LIST_INSERT_HEAD(&watches, watch, list);
668	mtx_unlock(&watches_lock);
669
670	error = xs_watch(watch->node, token);
671
672	/* Ignore errors due to multiple registration. */
673	if (error == EEXIST) {
674		mtx_lock(&watches_lock);
675		LIST_REMOVE(watch, list);
676		mtx_unlock(&watches_lock);
677	}
678
679	sx_sunlock(&xs_state.suspend_mutex);
680
681	return (error);
682}
683
684void
685unregister_xenbus_watch(struct xenbus_watch *watch)
686{
687	struct xs_stored_msg *msg, *tmp;
688	char token[sizeof(watch) * 2 + 1];
689	int error;
690
691	sprintf(token, "%lX", (long)watch);
692
693	sx_slock(&xs_state.suspend_mutex);
694
695	mtx_lock(&watches_lock);
696	KASSERT(find_watch(token), ("watch not registered"));
697	LIST_REMOVE(watch, list);
698	mtx_unlock(&watches_lock);
699
700	error = xs_unwatch(watch->node, token);
701	if (error)
702		log(LOG_WARNING, "XENBUS Failed to release watch %s: %i\n",
703		    watch->node, error);
704
705	sx_sunlock(&xs_state.suspend_mutex);
706
707	/* Cancel pending watch events. */
708	mtx_lock(&watch_events_lock);
709	TAILQ_FOREACH_SAFE(msg, &watch_events, list, tmp) {
710		if (msg->u.watch.handle != watch)
711			continue;
712		TAILQ_REMOVE(&watch_events, msg, list);
713		free(msg->u.watch.vec, M_DEVBUF);
714		free(msg, M_DEVBUF);
715	}
716	mtx_unlock(&watch_events_lock);
717
718	/* Flush any currently-executing callback, unless we are it. :-) */
719	if (curproc->p_pid != xenwatch_pid) {
720		sx_xlock(&xenwatch_mutex);
721		sx_xunlock(&xenwatch_mutex);
722	}
723}
724
725void
726xs_suspend(void)
727{
728
729	sx_xlock(&xs_state.suspend_mutex);
730	sx_xlock(&xs_state.request_mutex);
731}
732
733void
734xs_resume(void)
735{
736	struct xenbus_watch *watch;
737	char token[sizeof(watch) * 2 + 1];
738
739	sx_xunlock(&xs_state.request_mutex);
740
741	/* No need for watches_lock: the suspend_mutex is sufficient. */
742	LIST_FOREACH(watch, &watches, list) {
743		sprintf(token, "%lX", (long)watch);
744		xs_watch(watch->node, token);
745	}
746
747	sx_xunlock(&xs_state.suspend_mutex);
748}
749
750static void
751xenwatch_thread(void *unused)
752{
753	struct xs_stored_msg *msg;
754
755	for (;;) {
756
757		mtx_lock(&watch_events_lock);
758		while (TAILQ_EMPTY(&watch_events))
759			mtx_sleep(&watch_events_waitq,
760			    &watch_events_lock,
761			    PWAIT | PCATCH, "waitev", hz/10);
762
763		mtx_unlock(&watch_events_lock);
764		sx_xlock(&xenwatch_mutex);
765
766		mtx_lock(&watch_events_lock);
767		msg = TAILQ_FIRST(&watch_events);
768		if (msg)
769			TAILQ_REMOVE(&watch_events, msg, list);
770		mtx_unlock(&watch_events_lock);
771
772		if (msg != NULL) {
773			msg->u.watch.handle->callback(
774				msg->u.watch.handle,
775				(const char **)msg->u.watch.vec,
776				msg->u.watch.vec_size);
777			free(msg->u.watch.vec, M_DEVBUF);
778			free(msg, M_DEVBUF);
779		}
780
781		sx_xunlock(&xenwatch_mutex);
782	}
783}
784
785static int
786xs_process_msg(enum xsd_sockmsg_type *type)
787{
788	struct xs_stored_msg *msg;
789	char *body;
790	int error;
791
792	msg = malloc(sizeof(*msg), M_DEVBUF, M_WAITOK);
793	mtx_lock(&xs_state.reply_lock);
794	error = xb_read(&msg->hdr, sizeof(msg->hdr), &xs_state.reply_lock.lock_object);
795	mtx_unlock(&xs_state.reply_lock);
796	if (error) {
797		free(msg, M_DEVBUF);
798		return (error);
799	}
800
801	body = malloc(msg->hdr.len + 1, M_DEVBUF, M_WAITOK);
802	mtx_lock(&xs_state.reply_lock);
803	error = xb_read(body, msg->hdr.len, &xs_state.reply_lock.lock_object);
804	mtx_unlock(&xs_state.reply_lock);
805	if (error) {
806		free(body, M_DEVBUF);
807		free(msg, M_DEVBUF);
808		return (error);
809	}
810	body[msg->hdr.len] = '\0';
811
812	*type = msg->hdr.type;
813	if (msg->hdr.type == XS_WATCH_EVENT) {
814		msg->u.watch.vec = split(body, msg->hdr.len,
815		    &msg->u.watch.vec_size);
816
817		mtx_lock(&watches_lock);
818		msg->u.watch.handle = find_watch(
819			msg->u.watch.vec[XS_WATCH_TOKEN]);
820		if (msg->u.watch.handle != NULL) {
821			mtx_lock(&watch_events_lock);
822			TAILQ_INSERT_TAIL(&watch_events, msg, list);
823			wakeup(&watch_events_waitq);
824			mtx_unlock(&watch_events_lock);
825		} else {
826			free(msg->u.watch.vec, M_DEVBUF);
827			free(msg, M_DEVBUF);
828		}
829		mtx_unlock(&watches_lock);
830	} else {
831		msg->u.reply.body = body;
832		mtx_lock(&xs_state.reply_lock);
833		TAILQ_INSERT_TAIL(&xs_state.reply_list, msg, list);
834		wakeup(&xs_state.reply_waitq);
835		mtx_unlock(&xs_state.reply_lock);
836	}
837
838	return 0;
839}
840
841static void
842xenbus_thread(void *unused)
843{
844	int error;
845	enum xsd_sockmsg_type type;
846	xenbus_running = 1;
847
848	for (;;) {
849		error = xs_process_msg(&type);
850		if (error)
851			printf("XENBUS error %d while reading message\n",
852			    error);
853	}
854}
855
#ifdef XENHVM
/* Frame number of the store page, fetched in xs_init(). */
static unsigned long xen_store_mfn;
/* Mapping of the store page, established in xs_init(). */
char *xen_store;

/*
 * Fetch one HVM parameter of this domain from the hypervisor.  Returns
 * the parameter's value, or 0 after printing a message if the hypercall
 * fails.
 */
static inline unsigned long
hvm_get_parameter(int index)
{
	struct xen_hvm_param xhv;
	int error;

	xhv.domid = DOMID_SELF;
	xhv.index = index;

	error = HYPERVISOR_hvm_op(HVMOP_get_param, &xhv);
	if (error == 0)
		return (xhv.value);

	printf("hvm_get_parameter: failed to get %d, error %d\n",
	    index, error);
	return (0);
}

#endif
878
879int
880xs_init(void)
881{
882	int error;
883	struct proc *p;
884
885#ifdef XENHVM
886	xen_store_evtchn = hvm_get_parameter(HVM_PARAM_STORE_EVTCHN);
887	xen_store_mfn = hvm_get_parameter(HVM_PARAM_STORE_PFN);
888	xen_store = pmap_mapdev(xen_store_mfn * PAGE_SIZE, PAGE_SIZE);
889#else
890	xen_store_evtchn = xen_start_info->store_evtchn;
891#endif
892
893	TAILQ_INIT(&xs_state.reply_list);
894	TAILQ_INIT(&watch_events);
895	sx_init(&xenwatch_mutex, "xenwatch");
896
897
898	mtx_init(&xs_state.reply_lock, "state reply", NULL, MTX_DEF);
899	sx_init(&xs_state.request_mutex, "xenstore request");
900	sx_init(&xs_state.suspend_mutex, "xenstore suspend");
901
902
903#if 0
904	mtx_init(&xs_state.suspend_mutex, "xenstore suspend", NULL, MTX_DEF);
905	sema_init(&xs_state.request_mutex, 1, "xenstore request");
906	sema_init(&xenwatch_mutex, 1, "xenwatch");
907#endif
908	mtx_init(&watches_lock, "watches", NULL, MTX_DEF);
909	mtx_init(&watch_events_lock, "watch events", NULL, MTX_DEF);
910
911	/* Initialize the shared memory rings to talk to xenstored */
912	error = xb_init_comms();
913	if (error)
914		return (error);
915
916	xenwatch_running = 1;
917	error = kproc_create(xenwatch_thread, NULL, &p,
918	    RFHIGHPID, 0, "xenwatch");
919	if (error)
920		return (error);
921	xenwatch_pid = p->p_pid;
922
923	error = kproc_create(xenbus_thread, NULL, NULL,
924	    RFHIGHPID, 0, "xenbus");
925
926	return (error);
927}
928