xenstore.c revision 184958
1/******************************************************************************
2 * xenbus_xs.c
3 *
4 * This is the kernel equivalent of the "xs" library.  We don't need everything
5 * and we use xenbus_comms for communication.
6 *
7 * Copyright (C) 2005 Rusty Russell, IBM Corporation
8 *
9 * This file may be distributed separately from the Linux kernel, or
10 * incorporated into other software packages, subject to the following license:
11 *
12 * Permission is hereby granted, free of charge, to any person obtaining a copy
13 * of this source file (the "Software"), to deal in the Software without
14 * restriction, including without limitation the rights to use, copy, modify,
15 * merge, publish, distribute, sublicense, and/or sell copies of the Software,
16 * and to permit persons to whom the Software is furnished to do so, subject to
17 * the following conditions:
18 *
19 * The above copyright notice and this permission notice shall be included in
20 * all copies or substantial portions of the Software.
21 *
22 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
27 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
28 * IN THE SOFTWARE.
29 */
30
31
32#include <sys/cdefs.h>
33__FBSDID("$FreeBSD: head/sys/xen/xenbus/xenbus_xs.c 184958 2008-11-14 07:06:27Z kmacy $");
34
35#include <sys/param.h>
36#include <sys/types.h>
37#include <sys/cdefs.h>
38#include <sys/unistd.h>
39#include <sys/errno.h>
40#include <sys/uio.h>
41#include <sys/kernel.h>
42#include <sys/time.h>
43#include <sys/lock.h>
44#include <sys/mutex.h>
45#include <sys/sx.h>
46#include <sys/sema.h>
47#include <sys/syslog.h>
48#include <sys/malloc.h>
49#include <sys/libkern.h>
50#include <sys/systm.h>
51#include <sys/proc.h>
52#include <sys/kthread.h>
53
54#include <machine/xen/xen-os.h>
55#include <machine/xen/hypervisor.h>
56#include <machine/xen/xenbus.h>
57#include <machine/stdarg.h>
58
59#include <xen/xenbus/xenbus_comms.h>
/* Defined near the end of the file; read_reply() polls it before the xenbus thread runs. */
static int xs_process_msg(enum xsd_sockmsg_type *type);

/*
 * Thin compatibility shims that let the Linux-derived code below keep its
 * original spelling.  Macro arguments are parenthesised so that arbitrary
 * expressions can be passed safely.
 */
/* The second (Linux GFP flag) argument is discarded; M_WAITOK is always used. */
#define kmalloc(size, unused) malloc((size), M_DEVBUF, M_WAITOK)
#define BUG_ON        PANIC_IF
#define DEFINE_SPINLOCK(lock) struct mtx lock
#define u32           uint32_t
/* Every TAILQ in this file links its elements through a field called "list". */
#define list_del(head, ent)      TAILQ_REMOVE((head), (ent), list)
#define simple_strtoul strtoul
#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
#define list_empty    TAILQ_EMPTY

#define streq(a, b) (strcmp((a), (b)) == 0)

/* Set to 1 by xenwatch_thread() once it has started. */
int xenwatch_running = 0;
/* Set to 1 by xenbus_thread() once it has started. */
int xenbus_running = 0;
74
/*
 * One contiguous piece of a request payload.  xs_talkv() writes iov_len
 * bytes starting at iov_base for every element it is handed.
 */
struct kvec {
		const void *iov_base;	/* start of the data to send */
		size_t      iov_len;	/* number of bytes to send */
};
79
/*
 * A message received from xenstored, queued either on xs_state.reply_list
 * (replies) or on watch_events (watch notifications).  Which member of the
 * union is valid depends on hdr.type: XS_WATCH_EVENT uses u.watch, anything
 * else uses u.reply.
 */
struct xs_stored_msg {
		TAILQ_ENTRY(xs_stored_msg) list;

		/* Wire header exactly as read from the ring. */
		struct xsd_sockmsg hdr;

		union {
				/* Queued replies. */
				struct {
						/* NUL-terminated payload, hdr.len bytes long. */
						char *body;
				} reply;

				/* Queued watch events. */
				struct {
						/* Registered watch this event is for. */
						struct xenbus_watch *handle;
						/* Strings of the event, as produced by split(). */
						char **vec;
						/* Number of strings in vec. */
						unsigned int vec_size;
				} watch;
		} u;
};
99
/*
 * State of the single connection to xenstored.
 */
struct xs_handle {
		/* A list of replies. Currently only one will ever be outstanding. */
		TAILQ_HEAD(xs_handle_list, xs_stored_msg) reply_list;
		/* Protects reply_list. */
		struct mtx reply_lock;
		/* Only its address is used, as the sleep/wakeup channel for replies. */
		int reply_waitq;

		/* One request at a time. */
		struct sx request_mutex;

		/* Protect transactions against save/restore. */
		struct sx suspend_mutex;
};

/* The one and only connection state; initialised by xs_init(). */
static struct xs_handle xs_state;
114
/* List of registered watches, and a lock to protect it. */
static LIST_HEAD(watch_list_head, xenbus_watch) watches;
static DEFINE_SPINLOCK(watches_lock);
/* List of pending watch callback events, and a lock to protect it. */
static TAILQ_HEAD(event_list_head, xs_stored_msg) watch_events;
static DEFINE_SPINLOCK(watch_events_lock);
/*
 * Details of the xenwatch callback kernel thread. The thread waits on the
 * watch_events_waitq for work to do (queued on watch_events list). When it
 * wakes up it acquires the xenwatch_mutex before reading the list and
 * carrying out work.
 */
/* Pid of the xenwatch process, recorded by xs_init(). */
static pid_t xenwatch_pid;
/* Held exclusively by the xenwatch thread while it dequeues and runs a callback. */
struct sx xenwatch_mutex;
/* Only its address is used, as the tsleep()/wakeup() channel for watch events. */
static int watch_events_waitq;
130
131static int get_error(const char *errorstring)
132{
133		unsigned int i;
134
135		for (i = 0; !streq(errorstring, xsd_errors[i].errstring); i++) {
136				if (i == ARRAY_SIZE(xsd_errors) - 1) {
137						log(LOG_WARNING, "XENBUS xen store gave: unknown error %s",
138							   errorstring);
139						return EINVAL;
140				}
141		}
142		return xsd_errors[i].errnum;
143}
144
145extern void idle_block(void);
146extern void kdb_backtrace(void);
147
148static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
149{
150		struct xs_stored_msg *msg;
151		char *body;
152		int i, err;
153		enum xsd_sockmsg_type itype = *type;
154
155		printf("read_reply ");
156		if (xenbus_running == 0) {
157				/*
158				 * Give other domain time to run :-/
159				 */
160				for (i = 0; i < 1000000 && (xenbus_running == 0); i++) {
161						err = xs_process_msg(type);
162
163						if ((err == 0)
164							&& (*type != XS_WATCH_EVENT))
165								break;
166
167						HYPERVISOR_yield();
168				}
169
170				if (list_empty(&xs_state.reply_list)) {
171						printf("giving up and returning an error type=%d\n",
172								*type);
173						kdb_backtrace();
174 						return (ERR_PTR(-1));
175				}
176
177		}
178
179		mtx_lock(&xs_state.reply_lock);
180		if (xenbus_running) {
181				while (list_empty(&xs_state.reply_list)) {
182						mtx_unlock(&xs_state.reply_lock);
183						wait_event_interruptible(&xs_state.reply_waitq,
184												 !list_empty(&xs_state.reply_list));
185
186						mtx_lock(&xs_state.reply_lock);
187				}
188		}
189
190		msg = TAILQ_FIRST(&xs_state.reply_list);
191		list_del(&xs_state.reply_list, msg);
192
193		mtx_unlock(&xs_state.reply_lock);
194
195		printf("itype=%d htype=%d ", itype, msg->hdr.type);
196		*type = msg->hdr.type;
197		if (len)
198				*len = msg->hdr.len;
199		body = msg->u.reply.body;
200
201		kfree(msg);
202		if (len)
203				printf("len=%d\n", *len);
204		else
205				printf("len=NULL\n");
206		return body;
207}
208
#if 0
/*
 * Emergency write. UNUSED (compiled out).
 *
 * Sends an XS_DEBUG request whose payload is the string "print" including
 * its terminating NUL, followed by count bytes of str and one more NUL.
 * No reply is read and the xb_write() results are not checked.
 */
void xenbus_debug_write(const char *str, unsigned int count)
{
		struct xsd_sockmsg msg = { 0 };

		msg.type = XS_DEBUG;
		/* sizeof("print") already counts the NUL after "print". */
		msg.len = sizeof("print") + count + 1;

		sx_xlock(&xs_state.request_mutex);
		xb_write(&msg, sizeof(msg));
		xb_write("print", sizeof("print"));
		xb_write(str, count);
		xb_write("", 1);
		sx_xunlock(&xs_state.request_mutex);
}

#endif
227void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg)
228{
229		void *ret;
230		struct xsd_sockmsg req_msg = *msg;
231		int err;
232
233		if (req_msg.type == XS_TRANSACTION_START)
234				sx_slock(&xs_state.suspend_mutex);
235
236		sx_xlock(&xs_state.request_mutex);
237
238		err = xb_write(msg, sizeof(*msg) + msg->len);
239		if (err) {
240				msg->type = XS_ERROR;
241				ret = ERR_PTR(err);
242		} else {
243				ret = read_reply(&msg->type, &msg->len);
244		}
245
246		sx_xunlock(&xs_state.request_mutex);
247
248		if ((msg->type == XS_TRANSACTION_END) ||
249			((req_msg.type == XS_TRANSACTION_START) &&
250			 (msg->type == XS_ERROR)))
251				sx_sunlock(&xs_state.suspend_mutex);
252
253		return ret;
254}
255
256static int xenwatch_inline;
257
258/* Send message to xs, get kmalloc'ed reply.  ERR_PTR() on error. */
259static void *xs_talkv(struct xenbus_transaction t,
260					  enum xsd_sockmsg_type type,
261					  const struct kvec *iovec,
262					  unsigned int num_vecs,
263					  unsigned int *len)
264{
265		struct xsd_sockmsg msg;
266		void *ret = NULL;
267		unsigned int i;
268		int err;
269
270		msg.tx_id = t.id;
271		msg.req_id = 0;
272		msg.type = type;
273		msg.len = 0;
274		for (i = 0; i < num_vecs; i++)
275				msg.len += iovec[i].iov_len;
276
277		printf("xs_talkv ");
278
279		sx_xlock(&xs_state.request_mutex);
280
281		err = xb_write(&msg, sizeof(msg));
282		if (err) {
283				sx_xunlock(&xs_state.request_mutex);
284				printf("xs_talkv failed %d\n", err);
285				return ERR_PTR(err);
286		}
287
288		for (i = 0; i < num_vecs; i++) {
289				err = xb_write(iovec[i].iov_base, iovec[i].iov_len);;
290				if (err) {
291						sx_xunlock(&xs_state.request_mutex);
292						printf("xs_talkv failed %d\n", err);
293						return ERR_PTR(err);
294				}
295		}
296
297		ret = read_reply(&msg.type, len);
298
299		sx_xunlock(&xs_state.request_mutex);
300
301		if (IS_ERR(ret))
302				return ret;
303
304		if (msg.type == XS_ERROR) {
305				err = get_error(ret);
306				kfree(ret);
307				return ERR_PTR(-err);
308		}
309
310		if ((xenwatch_running == 0) && (xenwatch_inline == 0)) {
311				xenwatch_inline = 1;
312				while (!TAILQ_EMPTY(&watch_events)
313						&& xenwatch_running == 0) {
314
315						struct xs_stored_msg *wmsg = TAILQ_FIRST(&watch_events);
316						list_del(&watch_events, wmsg);
317						printf("handling %p ...", wmsg->u.watch.handle->callback);
318
319							   wmsg->u.watch.handle->callback(
320								wmsg->u.watch.handle,
321								(const char **)wmsg->u.watch.vec,
322								wmsg->u.watch.vec_size);
323						printf("... %p done\n", wmsg->u.watch.handle->callback);
324							   kfree(wmsg->u.watch.vec);
325						kfree(wmsg);
326				}
327				xenwatch_inline = 0;
328		}
329		BUG_ON(msg.type != type);
330
331		return ret;
332}
333
334/* Simplified version of xs_talkv: single message. */
335static void *xs_single(struct xenbus_transaction t,
336					   enum xsd_sockmsg_type type,
337					   const char *string,
338					   unsigned int *len)
339{
340		struct kvec iovec;
341
342		printf("xs_single %s ", string);
343		iovec.iov_base = (const void *)string;
344		iovec.iov_len = strlen(string) + 1;
345		return xs_talkv(t, type, &iovec, 1, len);
346}
347
/*
 * Reduce a reply to a status code for commands that only need an ack:
 * an ERR_PTR() reply yields its error value, any other reply is freed
 * and reported as 0.
 */
static int xs_error(char *reply)
{
		int status = 0;

		if (IS_ERR(reply))
				status = PTR_ERR(reply);
		else
				kfree(reply);

		return status;
}
356
/*
 * Count the NUL-separated strings packed into the first len bytes of
 * strings.
 *
 * Only bytes inside [strings, strings + len) are examined.  A final run of
 * bytes that has no terminator inside the buffer still counts as one
 * string, so the result matches a strlen()-based walk over a buffer that
 * is terminated just past its end, without ever reading that extra byte.
 */
static unsigned int count_strings(const char *strings, unsigned int len)
{
		unsigned int i;
		unsigned int num = 0;

		/* Every terminator inside the buffer closes one string. */
		for (i = 0; i < len; i++) {
				if (strings[i] == '\0')
						num++;
		}

		/* Unterminated tail: one more string. */
		if (len > 0 && strings[len - 1] != '\0')
				num++;

		return num;
}
367
368/* Return the path to dir with /name appended. Buffer must be kfree()'ed. */
369static char *join(const char *dir, const char *name)
370{
371		char *buffer;
372
373		buffer = kmalloc(strlen(dir) + strlen("/") + strlen(name) + 1,
374						 GFP_KERNEL);
375		if (buffer == NULL)
376				return ERR_PTR(-ENOMEM);
377
378		strcpy(buffer, dir);
379		if (!streq(name, "")) {
380				strcat(buffer, "/");
381				strcat(buffer, name);
382		}
383
384		return buffer;
385}
386
387static char **split(char *strings, unsigned int len, unsigned int *num)
388{
389		char *p, **ret;
390
391		/* Count the strings. */
392		*num = count_strings(strings, len) + 1;
393
394		/* Transfer to one big alloc for easy freeing. */
395		ret = kmalloc(*num * sizeof(char *) + len, GFP_KERNEL);
396		if (!ret) {
397				kfree(strings);
398				return ERR_PTR(-ENOMEM);
399		}
400		memcpy(&ret[*num], strings, len);
401		kfree(strings);
402
403		strings = (char *)&ret[*num];
404		for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1)
405				ret[(*num)++] = p;
406
407		ret[*num] = strings + len;
408
409		return ret;
410}
411
412char **xenbus_directory(struct xenbus_transaction t,
413						const char *dir, const char *node, unsigned int *num)
414{
415		char *strings, *path;
416		unsigned int len = 0;
417
418		path = join(dir, node);
419		if (IS_ERR(path))
420				return (char **)path;
421
422		strings = xs_single(t, XS_DIRECTORY, path, &len);
423		kfree(path);
424		if (IS_ERR(strings))
425				return (char **)strings;
426
427		return split(strings, len, num);
428}
429EXPORT_SYMBOL(xenbus_directory);
430
431/* Check if a path exists. Return 1 if it does. */
432int xenbus_exists(struct xenbus_transaction t,
433				  const char *dir, const char *node)
434{
435		char **d;
436		int dir_n;
437
438		d = xenbus_directory(t, dir, node, &dir_n);
439		if (IS_ERR(d))
440				return 0;
441		kfree(d);
442		return 1;
443}
444EXPORT_SYMBOL(xenbus_exists);
445
446/* Get the value of a single file.
447 * Returns a kmalloced value: call free() on it after use.
448 * len indicates length in bytes.
449 */
450void *xenbus_read(struct xenbus_transaction t,
451				  const char *dir, const char *node, unsigned int *len)
452{
453		char *path;
454		void *ret;
455
456		path = join(dir, node);
457		if (IS_ERR(path))
458				return (void *)path;
459
460		printf("xs_read ");
461		ret = xs_single(t, XS_READ, path, len);
462		kfree(path);
463		return ret;
464}
465EXPORT_SYMBOL(xenbus_read);
466
467/* Write the value of a single file.
468 * Returns -err on failure.
469 */
470int xenbus_write(struct xenbus_transaction t,
471				 const char *dir, const char *node, const char *string)
472{
473		char *path;
474		struct kvec iovec[2];
475		int ret;
476
477		path = join(dir, node);
478		if (IS_ERR(path))
479				return PTR_ERR(path);
480
481		iovec[0].iov_base = path;
482		iovec[0].iov_len = strlen(path) + 1;
483		iovec[1].iov_base = string;
484		iovec[1].iov_len = strlen(string);
485
486		printf("xenbus_write dir=%s val=%s ", dir, string);
487		ret = xs_error(xs_talkv(t, XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
488		kfree(path);
489		return ret;
490}
491EXPORT_SYMBOL(xenbus_write);
492
493/* Create a new directory. */
494int xenbus_mkdir(struct xenbus_transaction t,
495				 const char *dir, const char *node)
496{
497		char *path;
498		int ret;
499
500		path = join(dir, node);
501		if (IS_ERR(path))
502				return PTR_ERR(path);
503
504		ret = xs_error(xs_single(t, XS_MKDIR, path, NULL));
505		kfree(path);
506		return ret;
507}
508EXPORT_SYMBOL(xenbus_mkdir);
509
510/* Destroy a file or directory (directories must be empty). */
511int xenbus_rm(struct xenbus_transaction t, const char *dir, const char *node)
512{
513		char *path;
514		int ret;
515
516		path = join(dir, node);
517		if (IS_ERR(path))
518				return PTR_ERR(path);
519
520		ret = xs_error(xs_single(t, XS_RM, path, NULL));
521		kfree(path);
522		return ret;
523}
524EXPORT_SYMBOL(xenbus_rm);
525
526/* Start a transaction: changes by others will not be seen during this
527 * transaction, and changes will not be visible to others until end.
528 */
529int xenbus_transaction_start(struct xenbus_transaction *t)
530{
531		char *id_str;
532
533		sx_slock(&xs_state.suspend_mutex);
534		id_str = xs_single(XBT_NIL, XS_TRANSACTION_START, "", NULL);
535		if (IS_ERR(id_str)) {
536				sx_sunlock(&xs_state.suspend_mutex);
537				return PTR_ERR(id_str);
538		}
539
540		t->id = simple_strtoul(id_str, NULL, 0);
541		kfree(id_str);
542
543		return 0;
544}
545EXPORT_SYMBOL(xenbus_transaction_start);
546
547/* End a transaction.
548 * If abandon is true, transaction is discarded instead of committed.
549 */
550int xenbus_transaction_end(struct xenbus_transaction t, int abort)
551{
552		char abortstr[2];
553		int err;
554
555		if (abort)
556				strcpy(abortstr, "F");
557		else
558				strcpy(abortstr, "T");
559
560		printf("xenbus_transaction_end ");
561		err = xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL));
562
563		sx_sunlock(&xs_state.suspend_mutex);
564
565		return err;
566}
567EXPORT_SYMBOL(xenbus_transaction_end);
568
569/* Single read and scanf: returns -errno or num scanned. */
570int xenbus_scanf(struct xenbus_transaction t,
571				 const char *dir, const char *node, const char *fmt, ...)
572{
573		va_list ap;
574		int ret;
575		char *val;
576
577		val = xenbus_read(t, dir, node, NULL);
578		if (IS_ERR(val))
579				return PTR_ERR(val);
580
581		va_start(ap, fmt);
582		ret = vsscanf(val, fmt, ap);
583		va_end(ap);
584		kfree(val);
585		/* Distinctive errno. */
586		if (ret == 0)
587				return -ERANGE;
588		return ret;
589}
590EXPORT_SYMBOL(xenbus_scanf);
591
592/* Single printf and write: returns -errno or 0. */
593int xenbus_printf(struct xenbus_transaction t,
594				  const char *dir, const char *node, const char *fmt, ...)
595{
596		va_list ap;
597		int ret;
598#define PRINTF_BUFFER_SIZE 4096
599		char *printf_buffer;
600
601		printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_KERNEL);
602		if (printf_buffer == NULL)
603				return -ENOMEM;
604
605		va_start(ap, fmt);
606		ret = vsnprintf(printf_buffer, PRINTF_BUFFER_SIZE, fmt, ap);
607		va_end(ap);
608
609		BUG_ON(ret > PRINTF_BUFFER_SIZE-1);
610		ret = xenbus_write(t, dir, node, printf_buffer);
611
612		kfree(printf_buffer);
613
614		return ret;
615}
616EXPORT_SYMBOL(xenbus_printf);
617
618/* Takes tuples of names, scanf-style args, and void **, NULL terminated. */
619int xenbus_gather(struct xenbus_transaction t, const char *dir, ...)
620{
621		va_list ap;
622		const char *name;
623		int i, ret = 0;
624
625		for (i = 0; i < 10000; i++)
626				HYPERVISOR_yield();
627
628		printf("gather ");
629		va_start(ap, dir);
630		while (ret == 0 && (name = va_arg(ap, char *)) != NULL) {
631				const char *fmt = va_arg(ap, char *);
632				void *result = va_arg(ap, void *);
633				char *p;
634
635				p = xenbus_read(t, dir, name, NULL);
636				if (IS_ERR(p)) {
637						ret = PTR_ERR(p);
638						break;
639				}
640				printf(" %s ", p);
641				if (fmt) {
642						if (sscanf(p, fmt, result) == 0)
643								ret = -EINVAL;
644						kfree(p);
645				} else
646						*(char **)result = p;
647		}
648		va_end(ap);
649		printf("\n");
650		return ret;
651}
652EXPORT_SYMBOL(xenbus_gather);
653
654static int xs_watch(const char *path, const char *token)
655{
656		struct kvec iov[2];
657
658		iov[0].iov_base = path;
659		iov[0].iov_len = strlen(path) + 1;
660		iov[1].iov_base = token;
661		iov[1].iov_len = strlen(token) + 1;
662
663		return xs_error(xs_talkv(XBT_NIL, XS_WATCH, iov,
664								 ARRAY_SIZE(iov), NULL));
665}
666
667static int xs_unwatch(const char *path, const char *token)
668{
669		struct kvec iov[2];
670
671		iov[0].iov_base = path;
672		iov[0].iov_len = strlen(path) + 1;
673		iov[1].iov_base = token;
674		iov[1].iov_len = strlen(token) + 1;
675
676		return xs_error(xs_talkv(XBT_NIL, XS_UNWATCH, iov,
677								 ARRAY_SIZE(iov), NULL));
678}
679
680static struct xenbus_watch *find_watch(const char *token)
681{
682		struct xenbus_watch *i, *cmp;
683
684		cmp = (void *)simple_strtoul(token, NULL, 16);
685
686		LIST_FOREACH(i, &watches, list)
687				if (i == cmp)
688						return i;
689
690		return NULL;
691}
692
693/* Register callback to watch this node. */
694int register_xenbus_watch(struct xenbus_watch *watch)
695{
696		/* Pointer in ascii is the token. */
697		char token[sizeof(watch) * 2 + 1];
698		int err;
699
700		sprintf(token, "%lX", (long)watch);
701
702		sx_slock(&xs_state.suspend_mutex);
703
704		mtx_lock(&watches_lock);
705		BUG_ON(find_watch(token) != NULL);
706		LIST_INSERT_HEAD(&watches, watch, list);
707		mtx_unlock(&watches_lock);
708
709		err = xs_watch(watch->node, token);
710
711		/* Ignore errors due to multiple registration. */
712		if ((err != 0) && (err != -EEXIST)) {
713				mtx_lock(&watches_lock);
714				LIST_REMOVE(watch, list);
715				mtx_unlock(&watches_lock);
716		}
717
718		sx_sunlock(&xs_state.suspend_mutex);
719
720		return err;
721}
722EXPORT_SYMBOL(register_xenbus_watch);
723
724void unregister_xenbus_watch(struct xenbus_watch *watch)
725{
726		struct xs_stored_msg *msg, *tmp;
727		char token[sizeof(watch) * 2 + 1];
728		int err;
729
730		sprintf(token, "%lX", (long)watch);
731
732		sx_slock(&xs_state.suspend_mutex);
733
734		mtx_lock(&watches_lock);
735		BUG_ON(!find_watch(token));
736		LIST_REMOVE(watch, list);
737		mtx_unlock(&watches_lock);
738
739		err = xs_unwatch(watch->node, token);
740		if (err)
741				log(LOG_WARNING, "XENBUS Failed to release watch %s: %i\n",
742					   watch->node, err);
743
744		sx_sunlock(&xs_state.suspend_mutex);
745
746		/* Cancel pending watch events. */
747		mtx_lock(&watch_events_lock);
748		TAILQ_FOREACH_SAFE(msg, &watch_events, list, tmp) {
749				if (msg->u.watch.handle != watch)
750						continue;
751				list_del(&watch_events, msg);
752				kfree(msg->u.watch.vec);
753				kfree(msg);
754		}
755		mtx_unlock(&watch_events_lock);
756
757		/* Flush any currently-executing callback, unless we are it. :-) */
758		if (curproc->p_pid != xenwatch_pid) {
759				sx_xlock(&xenwatch_mutex);
760				sx_xunlock(&xenwatch_mutex);
761		}
762}
763EXPORT_SYMBOL(unregister_xenbus_watch);
764
/*
 * Block xenstore activity ahead of a suspend.
 *
 * Takes suspend_mutex and then request_mutex exclusively and returns with
 * both still held; xs_resume() is the function that releases them.
 */
void xs_suspend(void)
{
		sx_xlock(&xs_state.suspend_mutex);
		sx_xlock(&xs_state.request_mutex);
}
770
771void xs_resume(void)
772{
773		struct xenbus_watch *watch;
774		char token[sizeof(watch) * 2 + 1];
775
776		sx_xunlock(&xs_state.request_mutex);
777
778		/* No need for watches_lock: the suspend_mutex is sufficient. */
779		LIST_FOREACH(watch, &watches, list) {
780				sprintf(token, "%lX", (long)watch);
781				xs_watch(watch->node, token);
782		}
783
784		sx_xunlock(&xs_state.suspend_mutex);
785}
786
787static void xenwatch_thread(void *unused)
788{
789		struct xs_stored_msg *msg;
790
791		xenwatch_running = 1;
792
793		DELAY(100000);
794		while (xenwatch_inline) {
795				printf("xenwatch inline still running\n");
796				DELAY(100000);
797		}
798
799
800		for (;;) {
801
802				while (list_empty(&watch_events))
803						tsleep(&watch_events_waitq,
804							   PWAIT | PCATCH, "waitev", hz/10);
805
806				sx_xlock(&xenwatch_mutex);
807
808				mtx_lock(&watch_events_lock);
809				msg = TAILQ_FIRST(&watch_events);
810				if (msg)
811						list_del(&watch_events, msg);
812				mtx_unlock(&watch_events_lock);
813
814				if (msg != NULL) {
815						msg->u.watch.handle->callback(
816								msg->u.watch.handle,
817								(const char **)msg->u.watch.vec,
818								msg->u.watch.vec_size);
819						kfree(msg->u.watch.vec);
820						kfree(msg);
821				}
822
823				sx_xunlock(&xenwatch_mutex);
824		}
825}
826
827static int xs_process_msg(enum xsd_sockmsg_type *type)
828{
829		struct xs_stored_msg *msg;
830		char *body;
831		int err;
832
833		msg = kmalloc(sizeof(*msg), GFP_KERNEL);
834		if (msg == NULL)
835				return -ENOMEM;
836
837		err = xb_read(&msg->hdr, sizeof(msg->hdr));
838		if (err) {
839				kfree(msg);
840				return err;
841		}
842
843		body = kmalloc(msg->hdr.len + 1, GFP_KERNEL);
844		if (body == NULL) {
845				kfree(msg);
846				return -ENOMEM;
847		}
848
849		err = xb_read(body, msg->hdr.len);
850		if (err) {
851				kfree(body);
852				kfree(msg);
853				return err;
854		}
855		body[msg->hdr.len] = '\0';
856
857		*type = msg->hdr.type;
858		if (msg->hdr.type == XS_WATCH_EVENT) {
859				msg->u.watch.vec = split(body, msg->hdr.len,
860										 &msg->u.watch.vec_size);
861				if (IS_ERR(msg->u.watch.vec)) {
862						kfree(msg);
863						return PTR_ERR(msg->u.watch.vec);
864				}
865
866				mtx_lock(&watches_lock);
867				msg->u.watch.handle = find_watch(
868						msg->u.watch.vec[XS_WATCH_TOKEN]);
869				if (msg->u.watch.handle != NULL) {
870						mtx_lock(&watch_events_lock);
871						TAILQ_INSERT_TAIL(&watch_events, msg, list);
872						wakeup(&watch_events_waitq);
873						mtx_unlock(&watch_events_lock);
874				} else {
875						kfree(msg->u.watch.vec);
876						kfree(msg);
877				}
878				mtx_unlock(&watches_lock);
879		} else {
880				printf("event=%d ", *type);
881				msg->u.reply.body = body;
882				mtx_lock(&xs_state.reply_lock);
883				TAILQ_INSERT_TAIL(&xs_state.reply_list, msg, list);
884				wakeup(&xs_state.reply_waitq);
885				mtx_unlock(&xs_state.reply_lock);
886		}
887		if (*type == XS_WATCH_EVENT)
888				printf("\n");
889
890		return 0;
891}
892
893static void xenbus_thread(void *unused)
894{
895		int err;
896		enum xsd_sockmsg_type type;
897
898		DELAY(10000);
899		xenbus_running = 1;
900		pause("xenbus", hz/10);
901
902		for (;;) {
903				err = xs_process_msg(&type);
904				if (err)
905						printf("XENBUS error %d while reading "
906							   "message\n", err);
907
908		}
909}
910
911int xs_init(void)
912{
913		int err;
914		struct proc *p;
915
916		TAILQ_INIT(&xs_state.reply_list);
917		TAILQ_INIT(&watch_events);
918		sx_init(&xenwatch_mutex, "xenwatch");
919
920
921		mtx_init(&xs_state.reply_lock, "state reply", NULL, MTX_DEF);
922		sx_init(&xs_state.request_mutex, "xenstore request");
923		sx_init(&xs_state.suspend_mutex, "xenstore suspend");
924
925
926#if 0
927		mtx_init(&xs_state.suspend_mutex, "xenstore suspend", NULL, MTX_DEF);
928		sema_init(&xs_state.request_mutex, 1, "xenstore request");
929		sema_init(&xenwatch_mutex, 1, "xenwatch");
930#endif
931		mtx_init(&watches_lock, "watches", NULL, MTX_DEF);
932		mtx_init(&watch_events_lock, "watch events", NULL, MTX_DEF);
933
934		/* Initialize the shared memory rings to talk to xenstored */
935		err = xb_init_comms();
936		if (err)
937				return err;
938
939		err = kproc_create(xenwatch_thread, NULL, &p,
940							 RFHIGHPID, 0, "xenwatch");
941		if (err)
942				return err;
943		xenwatch_pid = p->p_pid;
944
945		err = kproc_create(xenbus_thread, NULL, NULL,
946							 RFHIGHPID, 0, "xenbus");
947
948		return err;
949}
950
951
952/*
953 * Local variables:
954 *  c-file-style: "bsd"
955 *  indent-tabs-mode: t
956 *  c-indent-level: 4
957 *  c-basic-offset: 8
958 *  tab-width: 4
959 * End:
960 */
961