• Home
  • History
  • Annotate
  • Line#
  • Navigate
  • Raw
  • Download
  • only in /netgear-R7000-V1.0.7.12_1.2.5/components/opensource/linux/linux-2.6.36/drivers/xen/xenbus/
1/******************************************************************************
2 * xenbus_xs.c
3 *
4 * This is the kernel equivalent of the "xs" library.  We don't need everything
5 * and we use xenbus_comms for communication.
6 *
7 * Copyright (C) 2005 Rusty Russell, IBM Corporation
8 *
9 * This program is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU General Public License version 2
11 * as published by the Free Software Foundation; or, when distributed
12 * separately from the Linux kernel or incorporated into other
13 * software packages, subject to the following license:
14 *
15 * Permission is hereby granted, free of charge, to any person obtaining a copy
16 * of this source file (the "Software"), to deal in the Software without
17 * restriction, including without limitation the rights to use, copy, modify,
18 * merge, publish, distribute, sublicense, and/or sell copies of the Software,
19 * and to permit persons to whom the Software is furnished to do so, subject to
20 * the following conditions:
21 *
22 * The above copyright notice and this permission notice shall be included in
23 * all copies or substantial portions of the Software.
24 *
25 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
26 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
27 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
28 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
29 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
30 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
31 * IN THE SOFTWARE.
32 */
33
34#include <linux/unistd.h>
35#include <linux/errno.h>
36#include <linux/types.h>
37#include <linux/uio.h>
38#include <linux/kernel.h>
39#include <linux/string.h>
40#include <linux/err.h>
41#include <linux/slab.h>
42#include <linux/fcntl.h>
43#include <linux/kthread.h>
44#include <linux/rwsem.h>
45#include <linux/module.h>
46#include <linux/mutex.h>
47#include <xen/xenbus.h>
48#include "xenbus_comms.h"
49
50struct xs_stored_msg {
51	struct list_head list;
52
53	struct xsd_sockmsg hdr;
54
55	union {
56		/* Queued replies. */
57		struct {
58			char *body;
59		} reply;
60
61		/* Queued watch events. */
62		struct {
63			struct xenbus_watch *handle;
64			char **vec;
65			unsigned int vec_size;
66		} watch;
67	} u;
68};
69
70struct xs_handle {
71	/* A list of replies. Currently only one will ever be outstanding. */
72	struct list_head reply_list;
73	spinlock_t reply_lock;
74	wait_queue_head_t reply_waitq;
75
76	/*
77	 * Mutex ordering: transaction_mutex -> watch_mutex -> request_mutex.
78	 * response_mutex is never taken simultaneously with the other three.
79	 *
80	 * transaction_mutex must be held before incrementing
81	 * transaction_count. The mutex is held when a suspend is in
82	 * progress to prevent new transactions starting.
83	 *
84	 * When decrementing transaction_count to zero the wait queue
85	 * should be woken up, the suspend code waits for count to
86	 * reach zero.
87	 */
88
89	/* One request at a time. */
90	struct mutex request_mutex;
91
92	/* Protect xenbus reader thread against save/restore. */
93	struct mutex response_mutex;
94
95	/* Protect transactions against save/restore. */
96	struct mutex transaction_mutex;
97	atomic_t transaction_count;
98	wait_queue_head_t transaction_wq;
99
100	/* Protect watch (de)register against save/restore. */
101	struct rw_semaphore watch_mutex;
102};
103
104static struct xs_handle xs_state;
105
106/* List of registered watches, and a lock to protect it. */
107static LIST_HEAD(watches);
108static DEFINE_SPINLOCK(watches_lock);
109
110/* List of pending watch callback events, and a lock to protect it. */
111static LIST_HEAD(watch_events);
112static DEFINE_SPINLOCK(watch_events_lock);
113
114/*
115 * Details of the xenwatch callback kernel thread. The thread waits on the
116 * watch_events_waitq for work to do (queued on watch_events list). When it
117 * wakes up it acquires the xenwatch_mutex before reading the list and
118 * carrying out work.
119 */
120static pid_t xenwatch_pid;
121static DEFINE_MUTEX(xenwatch_mutex);
122static DECLARE_WAIT_QUEUE_HEAD(watch_events_waitq);
123
124static int get_error(const char *errorstring)
125{
126	unsigned int i;
127
128	for (i = 0; strcmp(errorstring, xsd_errors[i].errstring) != 0; i++) {
129		if (i == ARRAY_SIZE(xsd_errors) - 1) {
130			printk(KERN_WARNING
131			       "XENBUS xen store gave: unknown error %s",
132			       errorstring);
133			return EINVAL;
134		}
135	}
136	return xsd_errors[i].errnum;
137}
138
139static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
140{
141	struct xs_stored_msg *msg;
142	char *body;
143
144	spin_lock(&xs_state.reply_lock);
145
146	while (list_empty(&xs_state.reply_list)) {
147		spin_unlock(&xs_state.reply_lock);
148		wait_event(xs_state.reply_waitq,
149			   !list_empty(&xs_state.reply_list));
150		spin_lock(&xs_state.reply_lock);
151	}
152
153	msg = list_entry(xs_state.reply_list.next,
154			 struct xs_stored_msg, list);
155	list_del(&msg->list);
156
157	spin_unlock(&xs_state.reply_lock);
158
159	*type = msg->hdr.type;
160	if (len)
161		*len = msg->hdr.len;
162	body = msg->u.reply.body;
163
164	kfree(msg);
165
166	return body;
167}
168
169static void transaction_start(void)
170{
171	mutex_lock(&xs_state.transaction_mutex);
172	atomic_inc(&xs_state.transaction_count);
173	mutex_unlock(&xs_state.transaction_mutex);
174}
175
176static void transaction_end(void)
177{
178	if (atomic_dec_and_test(&xs_state.transaction_count))
179		wake_up(&xs_state.transaction_wq);
180}
181
182static void transaction_suspend(void)
183{
184	mutex_lock(&xs_state.transaction_mutex);
185	wait_event(xs_state.transaction_wq,
186		   atomic_read(&xs_state.transaction_count) == 0);
187}
188
189static void transaction_resume(void)
190{
191	mutex_unlock(&xs_state.transaction_mutex);
192}
193
194void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg)
195{
196	void *ret;
197	struct xsd_sockmsg req_msg = *msg;
198	int err;
199
200	if (req_msg.type == XS_TRANSACTION_START)
201		transaction_start();
202
203	mutex_lock(&xs_state.request_mutex);
204
205	err = xb_write(msg, sizeof(*msg) + msg->len);
206	if (err) {
207		msg->type = XS_ERROR;
208		ret = ERR_PTR(err);
209	} else
210		ret = read_reply(&msg->type, &msg->len);
211
212	mutex_unlock(&xs_state.request_mutex);
213
214	if ((msg->type == XS_TRANSACTION_END) ||
215	    ((req_msg.type == XS_TRANSACTION_START) &&
216	     (msg->type == XS_ERROR)))
217		transaction_end();
218
219	return ret;
220}
221EXPORT_SYMBOL(xenbus_dev_request_and_reply);
222
223/* Send message to xs, get kmalloc'ed reply.  ERR_PTR() on error. */
224static void *xs_talkv(struct xenbus_transaction t,
225		      enum xsd_sockmsg_type type,
226		      const struct kvec *iovec,
227		      unsigned int num_vecs,
228		      unsigned int *len)
229{
230	struct xsd_sockmsg msg;
231	void *ret = NULL;
232	unsigned int i;
233	int err;
234
235	msg.tx_id = t.id;
236	msg.req_id = 0;
237	msg.type = type;
238	msg.len = 0;
239	for (i = 0; i < num_vecs; i++)
240		msg.len += iovec[i].iov_len;
241
242	mutex_lock(&xs_state.request_mutex);
243
244	err = xb_write(&msg, sizeof(msg));
245	if (err) {
246		mutex_unlock(&xs_state.request_mutex);
247		return ERR_PTR(err);
248	}
249
250	for (i = 0; i < num_vecs; i++) {
251		err = xb_write(iovec[i].iov_base, iovec[i].iov_len);
252		if (err) {
253			mutex_unlock(&xs_state.request_mutex);
254			return ERR_PTR(err);
255		}
256	}
257
258	ret = read_reply(&msg.type, len);
259
260	mutex_unlock(&xs_state.request_mutex);
261
262	if (IS_ERR(ret))
263		return ret;
264
265	if (msg.type == XS_ERROR) {
266		err = get_error(ret);
267		kfree(ret);
268		return ERR_PTR(-err);
269	}
270
271	if (msg.type != type) {
272		if (printk_ratelimit())
273			printk(KERN_WARNING
274			       "XENBUS unexpected type [%d], expected [%d]\n",
275			       msg.type, type);
276		kfree(ret);
277		return ERR_PTR(-EINVAL);
278	}
279	return ret;
280}
281
282/* Simplified version of xs_talkv: single message. */
283static void *xs_single(struct xenbus_transaction t,
284		       enum xsd_sockmsg_type type,
285		       const char *string,
286		       unsigned int *len)
287{
288	struct kvec iovec;
289
290	iovec.iov_base = (void *)string;
291	iovec.iov_len = strlen(string) + 1;
292	return xs_talkv(t, type, &iovec, 1, len);
293}
294
/*
 * For requests where only success or failure matters: free a successful
 * reply and return 0, or return the negative errno carried by @reply.
 */
static int xs_error(char *reply)
{
	if (!IS_ERR(reply)) {
		kfree(reply);
		return 0;
	}
	return PTR_ERR(reply);
}
303
/*
 * Count the NUL-terminated strings packed back to back in the first @len
 * bytes of @strings.
 */
static unsigned int count_strings(const char *strings, unsigned int len)
{
	const char *cursor = strings;
	const char *const end = strings + len;
	unsigned int count = 0;

	while (cursor < end) {
		count++;
		cursor += strlen(cursor) + 1;	/* skip string and its NUL */
	}

	return count;
}
314
315/* Return the path to dir with /name appended. Buffer must be kfree()'ed. */
316static char *join(const char *dir, const char *name)
317{
318	char *buffer;
319
320	if (strlen(name) == 0)
321		buffer = kasprintf(GFP_NOIO | __GFP_HIGH, "%s", dir);
322	else
323		buffer = kasprintf(GFP_NOIO | __GFP_HIGH, "%s/%s", dir, name);
324	return (!buffer) ? ERR_PTR(-ENOMEM) : buffer;
325}
326
327static char **split(char *strings, unsigned int len, unsigned int *num)
328{
329	char *p, **ret;
330
331	/* Count the strings. */
332	*num = count_strings(strings, len);
333
334	/* Transfer to one big alloc for easy freeing. */
335	ret = kmalloc(*num * sizeof(char *) + len, GFP_NOIO | __GFP_HIGH);
336	if (!ret) {
337		kfree(strings);
338		return ERR_PTR(-ENOMEM);
339	}
340	memcpy(&ret[*num], strings, len);
341	kfree(strings);
342
343	strings = (char *)&ret[*num];
344	for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1)
345		ret[(*num)++] = p;
346
347	return ret;
348}
349
350char **xenbus_directory(struct xenbus_transaction t,
351			const char *dir, const char *node, unsigned int *num)
352{
353	char *strings, *path;
354	unsigned int len;
355
356	path = join(dir, node);
357	if (IS_ERR(path))
358		return (char **)path;
359
360	strings = xs_single(t, XS_DIRECTORY, path, &len);
361	kfree(path);
362	if (IS_ERR(strings))
363		return (char **)strings;
364
365	return split(strings, len, num);
366}
367EXPORT_SYMBOL_GPL(xenbus_directory);
368
369/* Check if a path exists. Return 1 if it does. */
370int xenbus_exists(struct xenbus_transaction t,
371		  const char *dir, const char *node)
372{
373	char **d;
374	int dir_n;
375
376	d = xenbus_directory(t, dir, node, &dir_n);
377	if (IS_ERR(d))
378		return 0;
379	kfree(d);
380	return 1;
381}
382EXPORT_SYMBOL_GPL(xenbus_exists);
383
384/* Get the value of a single file.
385 * Returns a kmalloced value: call free() on it after use.
386 * len indicates length in bytes.
387 */
388void *xenbus_read(struct xenbus_transaction t,
389		  const char *dir, const char *node, unsigned int *len)
390{
391	char *path;
392	void *ret;
393
394	path = join(dir, node);
395	if (IS_ERR(path))
396		return (void *)path;
397
398	ret = xs_single(t, XS_READ, path, len);
399	kfree(path);
400	return ret;
401}
402EXPORT_SYMBOL_GPL(xenbus_read);
403
404/* Write the value of a single file.
405 * Returns -err on failure.
406 */
407int xenbus_write(struct xenbus_transaction t,
408		 const char *dir, const char *node, const char *string)
409{
410	const char *path;
411	struct kvec iovec[2];
412	int ret;
413
414	path = join(dir, node);
415	if (IS_ERR(path))
416		return PTR_ERR(path);
417
418	iovec[0].iov_base = (void *)path;
419	iovec[0].iov_len = strlen(path) + 1;
420	iovec[1].iov_base = (void *)string;
421	iovec[1].iov_len = strlen(string);
422
423	ret = xs_error(xs_talkv(t, XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
424	kfree(path);
425	return ret;
426}
427EXPORT_SYMBOL_GPL(xenbus_write);
428
429/* Create a new directory. */
430int xenbus_mkdir(struct xenbus_transaction t,
431		 const char *dir, const char *node)
432{
433	char *path;
434	int ret;
435
436	path = join(dir, node);
437	if (IS_ERR(path))
438		return PTR_ERR(path);
439
440	ret = xs_error(xs_single(t, XS_MKDIR, path, NULL));
441	kfree(path);
442	return ret;
443}
444EXPORT_SYMBOL_GPL(xenbus_mkdir);
445
446/* Destroy a file or directory (directories must be empty). */
447int xenbus_rm(struct xenbus_transaction t, const char *dir, const char *node)
448{
449	char *path;
450	int ret;
451
452	path = join(dir, node);
453	if (IS_ERR(path))
454		return PTR_ERR(path);
455
456	ret = xs_error(xs_single(t, XS_RM, path, NULL));
457	kfree(path);
458	return ret;
459}
460EXPORT_SYMBOL_GPL(xenbus_rm);
461
462/* Start a transaction: changes by others will not be seen during this
463 * transaction, and changes will not be visible to others until end.
464 */
465int xenbus_transaction_start(struct xenbus_transaction *t)
466{
467	char *id_str;
468
469	transaction_start();
470
471	id_str = xs_single(XBT_NIL, XS_TRANSACTION_START, "", NULL);
472	if (IS_ERR(id_str)) {
473		transaction_end();
474		return PTR_ERR(id_str);
475	}
476
477	t->id = simple_strtoul(id_str, NULL, 0);
478	kfree(id_str);
479	return 0;
480}
481EXPORT_SYMBOL_GPL(xenbus_transaction_start);
482
483/* End a transaction.
484 * If abandon is true, transaction is discarded instead of committed.
485 */
486int xenbus_transaction_end(struct xenbus_transaction t, int abort)
487{
488	char abortstr[2];
489	int err;
490
491	if (abort)
492		strcpy(abortstr, "F");
493	else
494		strcpy(abortstr, "T");
495
496	err = xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL));
497
498	transaction_end();
499
500	return err;
501}
502EXPORT_SYMBOL_GPL(xenbus_transaction_end);
503
504/* Single read and scanf: returns -errno or num scanned. */
505int xenbus_scanf(struct xenbus_transaction t,
506		 const char *dir, const char *node, const char *fmt, ...)
507{
508	va_list ap;
509	int ret;
510	char *val;
511
512	val = xenbus_read(t, dir, node, NULL);
513	if (IS_ERR(val))
514		return PTR_ERR(val);
515
516	va_start(ap, fmt);
517	ret = vsscanf(val, fmt, ap);
518	va_end(ap);
519	kfree(val);
520	/* Distinctive errno. */
521	if (ret == 0)
522		return -ERANGE;
523	return ret;
524}
525EXPORT_SYMBOL_GPL(xenbus_scanf);
526
527/* Single printf and write: returns -errno or 0. */
528int xenbus_printf(struct xenbus_transaction t,
529		  const char *dir, const char *node, const char *fmt, ...)
530{
531	va_list ap;
532	int ret;
533#define PRINTF_BUFFER_SIZE 4096
534	char *printf_buffer;
535
536	printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_NOIO | __GFP_HIGH);
537	if (printf_buffer == NULL)
538		return -ENOMEM;
539
540	va_start(ap, fmt);
541	ret = vsnprintf(printf_buffer, PRINTF_BUFFER_SIZE, fmt, ap);
542	va_end(ap);
543
544	BUG_ON(ret > PRINTF_BUFFER_SIZE-1);
545	ret = xenbus_write(t, dir, node, printf_buffer);
546
547	kfree(printf_buffer);
548
549	return ret;
550}
551EXPORT_SYMBOL_GPL(xenbus_printf);
552
553/* Takes tuples of names, scanf-style args, and void **, NULL terminated. */
554int xenbus_gather(struct xenbus_transaction t, const char *dir, ...)
555{
556	va_list ap;
557	const char *name;
558	int ret = 0;
559
560	va_start(ap, dir);
561	while (ret == 0 && (name = va_arg(ap, char *)) != NULL) {
562		const char *fmt = va_arg(ap, char *);
563		void *result = va_arg(ap, void *);
564		char *p;
565
566		p = xenbus_read(t, dir, name, NULL);
567		if (IS_ERR(p)) {
568			ret = PTR_ERR(p);
569			break;
570		}
571		if (fmt) {
572			if (sscanf(p, fmt, result) == 0)
573				ret = -EINVAL;
574			kfree(p);
575		} else
576			*(char **)result = p;
577	}
578	va_end(ap);
579	return ret;
580}
581EXPORT_SYMBOL_GPL(xenbus_gather);
582
583static int xs_watch(const char *path, const char *token)
584{
585	struct kvec iov[2];
586
587	iov[0].iov_base = (void *)path;
588	iov[0].iov_len = strlen(path) + 1;
589	iov[1].iov_base = (void *)token;
590	iov[1].iov_len = strlen(token) + 1;
591
592	return xs_error(xs_talkv(XBT_NIL, XS_WATCH, iov,
593				 ARRAY_SIZE(iov), NULL));
594}
595
596static int xs_unwatch(const char *path, const char *token)
597{
598	struct kvec iov[2];
599
600	iov[0].iov_base = (char *)path;
601	iov[0].iov_len = strlen(path) + 1;
602	iov[1].iov_base = (char *)token;
603	iov[1].iov_len = strlen(token) + 1;
604
605	return xs_error(xs_talkv(XBT_NIL, XS_UNWATCH, iov,
606				 ARRAY_SIZE(iov), NULL));
607}
608
609static struct xenbus_watch *find_watch(const char *token)
610{
611	struct xenbus_watch *i, *cmp;
612
613	cmp = (void *)simple_strtoul(token, NULL, 16);
614
615	list_for_each_entry(i, &watches, list)
616		if (i == cmp)
617			return i;
618
619	return NULL;
620}
621
622/* Register callback to watch this node. */
623int register_xenbus_watch(struct xenbus_watch *watch)
624{
625	/* Pointer in ascii is the token. */
626	char token[sizeof(watch) * 2 + 1];
627	int err;
628
629	sprintf(token, "%lX", (long)watch);
630
631	down_read(&xs_state.watch_mutex);
632
633	spin_lock(&watches_lock);
634	BUG_ON(find_watch(token));
635	list_add(&watch->list, &watches);
636	spin_unlock(&watches_lock);
637
638	err = xs_watch(watch->node, token);
639
640	/* Ignore errors due to multiple registration. */
641	if ((err != 0) && (err != -EEXIST)) {
642		spin_lock(&watches_lock);
643		list_del(&watch->list);
644		spin_unlock(&watches_lock);
645	}
646
647	up_read(&xs_state.watch_mutex);
648
649	return err;
650}
651EXPORT_SYMBOL_GPL(register_xenbus_watch);
652
653void unregister_xenbus_watch(struct xenbus_watch *watch)
654{
655	struct xs_stored_msg *msg, *tmp;
656	char token[sizeof(watch) * 2 + 1];
657	int err;
658
659	sprintf(token, "%lX", (long)watch);
660
661	down_read(&xs_state.watch_mutex);
662
663	spin_lock(&watches_lock);
664	BUG_ON(!find_watch(token));
665	list_del(&watch->list);
666	spin_unlock(&watches_lock);
667
668	err = xs_unwatch(watch->node, token);
669	if (err)
670		printk(KERN_WARNING
671		       "XENBUS Failed to release watch %s: %i\n",
672		       watch->node, err);
673
674	up_read(&xs_state.watch_mutex);
675
676	/* Make sure there are no callbacks running currently (unless
677	   its us) */
678	if (current->pid != xenwatch_pid)
679		mutex_lock(&xenwatch_mutex);
680
681	/* Cancel pending watch events. */
682	spin_lock(&watch_events_lock);
683	list_for_each_entry_safe(msg, tmp, &watch_events, list) {
684		if (msg->u.watch.handle != watch)
685			continue;
686		list_del(&msg->list);
687		kfree(msg->u.watch.vec);
688		kfree(msg);
689	}
690	spin_unlock(&watch_events_lock);
691
692	if (current->pid != xenwatch_pid)
693		mutex_unlock(&xenwatch_mutex);
694}
695EXPORT_SYMBOL_GPL(unregister_xenbus_watch);
696
697void xs_suspend(void)
698{
699	transaction_suspend();
700	down_write(&xs_state.watch_mutex);
701	mutex_lock(&xs_state.request_mutex);
702	mutex_lock(&xs_state.response_mutex);
703}
704
705void xs_resume(void)
706{
707	struct xenbus_watch *watch;
708	char token[sizeof(watch) * 2 + 1];
709
710	xb_init_comms();
711
712	mutex_unlock(&xs_state.response_mutex);
713	mutex_unlock(&xs_state.request_mutex);
714	transaction_resume();
715
716	/* No need for watches_lock: the watch_mutex is sufficient. */
717	list_for_each_entry(watch, &watches, list) {
718		sprintf(token, "%lX", (long)watch);
719		xs_watch(watch->node, token);
720	}
721
722	up_write(&xs_state.watch_mutex);
723}
724
725void xs_suspend_cancel(void)
726{
727	mutex_unlock(&xs_state.response_mutex);
728	mutex_unlock(&xs_state.request_mutex);
729	up_write(&xs_state.watch_mutex);
730	mutex_unlock(&xs_state.transaction_mutex);
731}
732
733static int xenwatch_thread(void *unused)
734{
735	struct list_head *ent;
736	struct xs_stored_msg *msg;
737
738	for (;;) {
739		wait_event_interruptible(watch_events_waitq,
740					 !list_empty(&watch_events));
741
742		if (kthread_should_stop())
743			break;
744
745		mutex_lock(&xenwatch_mutex);
746
747		spin_lock(&watch_events_lock);
748		ent = watch_events.next;
749		if (ent != &watch_events)
750			list_del(ent);
751		spin_unlock(&watch_events_lock);
752
753		if (ent != &watch_events) {
754			msg = list_entry(ent, struct xs_stored_msg, list);
755			msg->u.watch.handle->callback(
756				msg->u.watch.handle,
757				(const char **)msg->u.watch.vec,
758				msg->u.watch.vec_size);
759			kfree(msg->u.watch.vec);
760			kfree(msg);
761		}
762
763		mutex_unlock(&xenwatch_mutex);
764	}
765
766	return 0;
767}
768
769static int process_msg(void)
770{
771	struct xs_stored_msg *msg;
772	char *body;
773	int err;
774
775	/*
776	 * We must disallow save/restore while reading a xenstore message.
777	 * A partial read across s/r leaves us out of sync with xenstored.
778	 */
779	for (;;) {
780		err = xb_wait_for_data_to_read();
781		if (err)
782			return err;
783		mutex_lock(&xs_state.response_mutex);
784		if (xb_data_to_read())
785			break;
786		/* We raced with save/restore: pending data 'disappeared'. */
787		mutex_unlock(&xs_state.response_mutex);
788	}
789
790
791	msg = kmalloc(sizeof(*msg), GFP_NOIO | __GFP_HIGH);
792	if (msg == NULL) {
793		err = -ENOMEM;
794		goto out;
795	}
796
797	err = xb_read(&msg->hdr, sizeof(msg->hdr));
798	if (err) {
799		kfree(msg);
800		goto out;
801	}
802
803	body = kmalloc(msg->hdr.len + 1, GFP_NOIO | __GFP_HIGH);
804	if (body == NULL) {
805		kfree(msg);
806		err = -ENOMEM;
807		goto out;
808	}
809
810	err = xb_read(body, msg->hdr.len);
811	if (err) {
812		kfree(body);
813		kfree(msg);
814		goto out;
815	}
816	body[msg->hdr.len] = '\0';
817
818	if (msg->hdr.type == XS_WATCH_EVENT) {
819		msg->u.watch.vec = split(body, msg->hdr.len,
820					 &msg->u.watch.vec_size);
821		if (IS_ERR(msg->u.watch.vec)) {
822			err = PTR_ERR(msg->u.watch.vec);
823			kfree(msg);
824			goto out;
825		}
826
827		spin_lock(&watches_lock);
828		msg->u.watch.handle = find_watch(
829			msg->u.watch.vec[XS_WATCH_TOKEN]);
830		if (msg->u.watch.handle != NULL) {
831			spin_lock(&watch_events_lock);
832			list_add_tail(&msg->list, &watch_events);
833			wake_up(&watch_events_waitq);
834			spin_unlock(&watch_events_lock);
835		} else {
836			kfree(msg->u.watch.vec);
837			kfree(msg);
838		}
839		spin_unlock(&watches_lock);
840	} else {
841		msg->u.reply.body = body;
842		spin_lock(&xs_state.reply_lock);
843		list_add_tail(&msg->list, &xs_state.reply_list);
844		spin_unlock(&xs_state.reply_lock);
845		wake_up(&xs_state.reply_waitq);
846	}
847
848 out:
849	mutex_unlock(&xs_state.response_mutex);
850	return err;
851}
852
853static int xenbus_thread(void *unused)
854{
855	int err;
856
857	for (;;) {
858		err = process_msg();
859		if (err)
860			printk(KERN_WARNING "XENBUS error %d while reading "
861			       "message\n", err);
862		if (kthread_should_stop())
863			break;
864	}
865
866	return 0;
867}
868
869int xs_init(void)
870{
871	int err;
872	struct task_struct *task;
873
874	INIT_LIST_HEAD(&xs_state.reply_list);
875	spin_lock_init(&xs_state.reply_lock);
876	init_waitqueue_head(&xs_state.reply_waitq);
877
878	mutex_init(&xs_state.request_mutex);
879	mutex_init(&xs_state.response_mutex);
880	mutex_init(&xs_state.transaction_mutex);
881	init_rwsem(&xs_state.watch_mutex);
882	atomic_set(&xs_state.transaction_count, 0);
883	init_waitqueue_head(&xs_state.transaction_wq);
884
885	/* Initialize the shared memory rings to talk to xenstored */
886	err = xb_init_comms();
887	if (err)
888		return err;
889
890	task = kthread_run(xenwatch_thread, NULL, "xenwatch");
891	if (IS_ERR(task))
892		return PTR_ERR(task);
893	xenwatch_pid = task->pid;
894
895	task = kthread_run(xenbus_thread, NULL, "xenbus");
896	if (IS_ERR(task))
897		return PTR_ERR(task);
898
899	return 0;
900}
901