1/****************************************************************************** 2 * xenbus_xs.c 3 * 4 * This is the kernel equivalent of the "xs" library. We don't need everything 5 * and we use xenbus_comms for communication. 6 * 7 * Copyright (C) 2005 Rusty Russell, IBM Corporation 8 * 9 * This program is free software; you can redistribute it and/or 10 * modify it under the terms of the GNU General Public License version 2 11 * as published by the Free Software Foundation; or, when distributed 12 * separately from the Linux kernel or incorporated into other 13 * software packages, subject to the following license: 14 * 15 * Permission is hereby granted, free of charge, to any person obtaining a copy 16 * of this source file (the "Software"), to deal in the Software without 17 * restriction, including without limitation the rights to use, copy, modify, 18 * merge, publish, distribute, sublicense, and/or sell copies of the Software, 19 * and to permit persons to whom the Software is furnished to do so, subject to 20 * the following conditions: 21 * 22 * The above copyright notice and this permission notice shall be included in 23 * all copies or substantial portions of the Software. 24 * 25 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 26 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 27 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 28 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 29 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 30 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 31 * IN THE SOFTWARE. 32 */ 33 34#include <linux/unistd.h> 35#include <linux/errno.h> 36#include <linux/types.h> 37#include <linux/uio.h> 38#include <linux/kernel.h> 39#include <linux/string.h> 40#include <linux/err.h> 41#include <linux/slab.h> 42#include <linux/fcntl.h> 43#include <linux/kthread.h> 44#include <linux/rwsem.h> 45#include <linux/module.h> 46#include <linux/mutex.h> 47#include <xen/xenbus.h> 48#include "xenbus_comms.h" 49 50struct xs_stored_msg { 51 struct list_head list; 52 53 struct xsd_sockmsg hdr; 54 55 union { 56 /* Queued replies. */ 57 struct { 58 char *body; 59 } reply; 60 61 /* Queued watch events. */ 62 struct { 63 struct xenbus_watch *handle; 64 char **vec; 65 unsigned int vec_size; 66 } watch; 67 } u; 68}; 69 70struct xs_handle { 71 /* A list of replies. Currently only one will ever be outstanding. */ 72 struct list_head reply_list; 73 spinlock_t reply_lock; 74 wait_queue_head_t reply_waitq; 75 76 /* 77 * Mutex ordering: transaction_mutex -> watch_mutex -> request_mutex. 78 * response_mutex is never taken simultaneously with the other three. 79 * 80 * transaction_mutex must be held before incrementing 81 * transaction_count. The mutex is held when a suspend is in 82 * progress to prevent new transactions starting. 83 * 84 * When decrementing transaction_count to zero the wait queue 85 * should be woken up, the suspend code waits for count to 86 * reach zero. 87 */ 88 89 /* One request at a time. */ 90 struct mutex request_mutex; 91 92 /* Protect xenbus reader thread against save/restore. */ 93 struct mutex response_mutex; 94 95 /* Protect transactions against save/restore. */ 96 struct mutex transaction_mutex; 97 atomic_t transaction_count; 98 wait_queue_head_t transaction_wq; 99 100 /* Protect watch (de)register against save/restore. */ 101 struct rw_semaphore watch_mutex; 102}; 103 104static struct xs_handle xs_state; 105 106/* List of registered watches, and a lock to protect it. */ 107static LIST_HEAD(watches); 108static DEFINE_SPINLOCK(watches_lock); 109 110/* List of pending watch callback events, and a lock to protect it. */ 111static LIST_HEAD(watch_events); 112static DEFINE_SPINLOCK(watch_events_lock); 113 114/* 115 * Details of the xenwatch callback kernel thread. The thread waits on the 116 * watch_events_waitq for work to do (queued on watch_events list). When it 117 * wakes up it acquires the xenwatch_mutex before reading the list and 118 * carrying out work. 119 */ 120static pid_t xenwatch_pid; 121static DEFINE_MUTEX(xenwatch_mutex); 122static DECLARE_WAIT_QUEUE_HEAD(watch_events_waitq); 123 124static int get_error(const char *errorstring) 125{ 126 unsigned int i; 127 128 for (i = 0; strcmp(errorstring, xsd_errors[i].errstring) != 0; i++) { 129 if (i == ARRAY_SIZE(xsd_errors) - 1) { 130 printk(KERN_WARNING 131 "XENBUS xen store gave: unknown error %s", 132 errorstring); 133 return EINVAL; 134 } 135 } 136 return xsd_errors[i].errnum; 137} 138 139static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len) 140{ 141 struct xs_stored_msg *msg; 142 char *body; 143 144 spin_lock(&xs_state.reply_lock); 145 146 while (list_empty(&xs_state.reply_list)) { 147 spin_unlock(&xs_state.reply_lock); 148 wait_event(xs_state.reply_waitq, 149 !list_empty(&xs_state.reply_list)); 150 spin_lock(&xs_state.reply_lock); 151 } 152 153 msg = list_entry(xs_state.reply_list.next, 154 struct xs_stored_msg, list); 155 list_del(&msg->list); 156 157 spin_unlock(&xs_state.reply_lock); 158 159 *type = msg->hdr.type; 160 if (len) 161 *len = msg->hdr.len; 162 body = msg->u.reply.body; 163 164 kfree(msg); 165 166 return body; 167} 168 169static void transaction_start(void) 170{ 171 mutex_lock(&xs_state.transaction_mutex); 172 atomic_inc(&xs_state.transaction_count); 173 mutex_unlock(&xs_state.transaction_mutex); 174} 175 176static void transaction_end(void) 177{ 178 if (atomic_dec_and_test(&xs_state.transaction_count)) 179 wake_up(&xs_state.transaction_wq); 180} 181 182static void transaction_suspend(void) 183{ 184 mutex_lock(&xs_state.transaction_mutex); 185 wait_event(xs_state.transaction_wq, 186 atomic_read(&xs_state.transaction_count) == 0); 187} 188 189static void transaction_resume(void) 190{ 191 mutex_unlock(&xs_state.transaction_mutex); 192} 193 194void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg) 195{ 196 void *ret; 197 struct xsd_sockmsg req_msg = *msg; 198 int err; 199 200 if (req_msg.type == XS_TRANSACTION_START) 201 transaction_start(); 202 203 mutex_lock(&xs_state.request_mutex); 204 205 err = xb_write(msg, sizeof(*msg) + msg->len); 206 if (err) { 207 msg->type = XS_ERROR; 208 ret = ERR_PTR(err); 209 } else 210 ret = read_reply(&msg->type, &msg->len); 211 212 mutex_unlock(&xs_state.request_mutex); 213 214 if ((msg->type == XS_TRANSACTION_END) || 215 ((req_msg.type == XS_TRANSACTION_START) && 216 (msg->type == XS_ERROR))) 217 transaction_end(); 218 219 return ret; 220} 221EXPORT_SYMBOL(xenbus_dev_request_and_reply); 222 223/* Send message to xs, get kmalloc'ed reply. ERR_PTR() on error. */ 224static void *xs_talkv(struct xenbus_transaction t, 225 enum xsd_sockmsg_type type, 226 const struct kvec *iovec, 227 unsigned int num_vecs, 228 unsigned int *len) 229{ 230 struct xsd_sockmsg msg; 231 void *ret = NULL; 232 unsigned int i; 233 int err; 234 235 msg.tx_id = t.id; 236 msg.req_id = 0; 237 msg.type = type; 238 msg.len = 0; 239 for (i = 0; i < num_vecs; i++) 240 msg.len += iovec[i].iov_len; 241 242 mutex_lock(&xs_state.request_mutex); 243 244 err = xb_write(&msg, sizeof(msg)); 245 if (err) { 246 mutex_unlock(&xs_state.request_mutex); 247 return ERR_PTR(err); 248 } 249 250 for (i = 0; i < num_vecs; i++) { 251 err = xb_write(iovec[i].iov_base, iovec[i].iov_len); 252 if (err) { 253 mutex_unlock(&xs_state.request_mutex); 254 return ERR_PTR(err); 255 } 256 } 257 258 ret = read_reply(&msg.type, len); 259 260 mutex_unlock(&xs_state.request_mutex); 261 262 if (IS_ERR(ret)) 263 return ret; 264 265 if (msg.type == XS_ERROR) { 266 err = get_error(ret); 267 kfree(ret); 268 return ERR_PTR(-err); 269 } 270 271 if (msg.type != type) { 272 if (printk_ratelimit()) 273 printk(KERN_WARNING 274 "XENBUS unexpected type [%d], expected [%d]\n", 275 msg.type, type); 276 kfree(ret); 277 return ERR_PTR(-EINVAL); 278 } 279 return ret; 280} 281 282/* Simplified version of xs_talkv: single message. */ 283static void *xs_single(struct xenbus_transaction t, 284 enum xsd_sockmsg_type type, 285 const char *string, 286 unsigned int *len) 287{ 288 struct kvec iovec; 289 290 iovec.iov_base = (void *)string; 291 iovec.iov_len = strlen(string) + 1; 292 return xs_talkv(t, type, &iovec, 1, len); 293} 294 295/* Many commands only need an ack, don't care what it says. */ 296static int xs_error(char *reply) 297{ 298 if (IS_ERR(reply)) 299 return PTR_ERR(reply); 300 kfree(reply); 301 return 0; 302} 303 304static unsigned int count_strings(const char *strings, unsigned int len) 305{ 306 unsigned int num; 307 const char *p; 308 309 for (p = strings, num = 0; p < strings + len; p += strlen(p) + 1) 310 num++; 311 312 return num; 313} 314 315/* Return the path to dir with /name appended. Buffer must be kfree()'ed. */ 316static char *join(const char *dir, const char *name) 317{ 318 char *buffer; 319 320 if (strlen(name) == 0) 321 buffer = kasprintf(GFP_NOIO | __GFP_HIGH, "%s", dir); 322 else 323 buffer = kasprintf(GFP_NOIO | __GFP_HIGH, "%s/%s", dir, name); 324 return (!buffer) ? ERR_PTR(-ENOMEM) : buffer; 325} 326 327static char **split(char *strings, unsigned int len, unsigned int *num) 328{ 329 char *p, **ret; 330 331 /* Count the strings. */ 332 *num = count_strings(strings, len); 333 334 /* Transfer to one big alloc for easy freeing. */ 335 ret = kmalloc(*num * sizeof(char *) + len, GFP_NOIO | __GFP_HIGH); 336 if (!ret) { 337 kfree(strings); 338 return ERR_PTR(-ENOMEM); 339 } 340 memcpy(&ret[*num], strings, len); 341 kfree(strings); 342 343 strings = (char *)&ret[*num]; 344 for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1) 345 ret[(*num)++] = p; 346 347 return ret; 348} 349 350char **xenbus_directory(struct xenbus_transaction t, 351 const char *dir, const char *node, unsigned int *num) 352{ 353 char *strings, *path; 354 unsigned int len; 355 356 path = join(dir, node); 357 if (IS_ERR(path)) 358 return (char **)path; 359 360 strings = xs_single(t, XS_DIRECTORY, path, &len); 361 kfree(path); 362 if (IS_ERR(strings)) 363 return (char **)strings; 364 365 return split(strings, len, num); 366} 367EXPORT_SYMBOL_GPL(xenbus_directory); 368 369/* Check if a path exists. Return 1 if it does. */ 370int xenbus_exists(struct xenbus_transaction t, 371 const char *dir, const char *node) 372{ 373 char **d; 374 int dir_n; 375 376 d = xenbus_directory(t, dir, node, &dir_n); 377 if (IS_ERR(d)) 378 return 0; 379 kfree(d); 380 return 1; 381} 382EXPORT_SYMBOL_GPL(xenbus_exists); 383 384/* Get the value of a single file. 385 * Returns a kmalloced value: call free() on it after use. 386 * len indicates length in bytes. 387 */ 388void *xenbus_read(struct xenbus_transaction t, 389 const char *dir, const char *node, unsigned int *len) 390{ 391 char *path; 392 void *ret; 393 394 path = join(dir, node); 395 if (IS_ERR(path)) 396 return (void *)path; 397 398 ret = xs_single(t, XS_READ, path, len); 399 kfree(path); 400 return ret; 401} 402EXPORT_SYMBOL_GPL(xenbus_read); 403 404/* Write the value of a single file. 405 * Returns -err on failure. 406 */ 407int xenbus_write(struct xenbus_transaction t, 408 const char *dir, const char *node, const char *string) 409{ 410 const char *path; 411 struct kvec iovec[2]; 412 int ret; 413 414 path = join(dir, node); 415 if (IS_ERR(path)) 416 return PTR_ERR(path); 417 418 iovec[0].iov_base = (void *)path; 419 iovec[0].iov_len = strlen(path) + 1; 420 iovec[1].iov_base = (void *)string; 421 iovec[1].iov_len = strlen(string); 422 423 ret = xs_error(xs_talkv(t, XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL)); 424 kfree(path); 425 return ret; 426} 427EXPORT_SYMBOL_GPL(xenbus_write); 428 429/* Create a new directory. */ 430int xenbus_mkdir(struct xenbus_transaction t, 431 const char *dir, const char *node) 432{ 433 char *path; 434 int ret; 435 436 path = join(dir, node); 437 if (IS_ERR(path)) 438 return PTR_ERR(path); 439 440 ret = xs_error(xs_single(t, XS_MKDIR, path, NULL)); 441 kfree(path); 442 return ret; 443} 444EXPORT_SYMBOL_GPL(xenbus_mkdir); 445 446/* Destroy a file or directory (directories must be empty). */ 447int xenbus_rm(struct xenbus_transaction t, const char *dir, const char *node) 448{ 449 char *path; 450 int ret; 451 452 path = join(dir, node); 453 if (IS_ERR(path)) 454 return PTR_ERR(path); 455 456 ret = xs_error(xs_single(t, XS_RM, path, NULL)); 457 kfree(path); 458 return ret; 459} 460EXPORT_SYMBOL_GPL(xenbus_rm); 461 462/* Start a transaction: changes by others will not be seen during this 463 * transaction, and changes will not be visible to others until end. 464 */ 465int xenbus_transaction_start(struct xenbus_transaction *t) 466{ 467 char *id_str; 468 469 transaction_start(); 470 471 id_str = xs_single(XBT_NIL, XS_TRANSACTION_START, "", NULL); 472 if (IS_ERR(id_str)) { 473 transaction_end(); 474 return PTR_ERR(id_str); 475 } 476 477 t->id = simple_strtoul(id_str, NULL, 0); 478 kfree(id_str); 479 return 0; 480} 481EXPORT_SYMBOL_GPL(xenbus_transaction_start); 482 483/* End a transaction. 484 * If abandon is true, transaction is discarded instead of committed. 485 */ 486int xenbus_transaction_end(struct xenbus_transaction t, int abort) 487{ 488 char abortstr[2]; 489 int err; 490 491 if (abort) 492 strcpy(abortstr, "F"); 493 else 494 strcpy(abortstr, "T"); 495 496 err = xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL)); 497 498 transaction_end(); 499 500 return err; 501} 502EXPORT_SYMBOL_GPL(xenbus_transaction_end); 503 504/* Single read and scanf: returns -errno or num scanned. */ 505int xenbus_scanf(struct xenbus_transaction t, 506 const char *dir, const char *node, const char *fmt, ...) 507{ 508 va_list ap; 509 int ret; 510 char *val; 511 512 val = xenbus_read(t, dir, node, NULL); 513 if (IS_ERR(val)) 514 return PTR_ERR(val); 515 516 va_start(ap, fmt); 517 ret = vsscanf(val, fmt, ap); 518 va_end(ap); 519 kfree(val); 520 /* Distinctive errno. */ 521 if (ret == 0) 522 return -ERANGE; 523 return ret; 524} 525EXPORT_SYMBOL_GPL(xenbus_scanf); 526 527/* Single printf and write: returns -errno or 0. */ 528int xenbus_printf(struct xenbus_transaction t, 529 const char *dir, const char *node, const char *fmt, ...) 530{ 531 va_list ap; 532 int ret; 533#define PRINTF_BUFFER_SIZE 4096 534 char *printf_buffer; 535 536 printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_NOIO | __GFP_HIGH); 537 if (printf_buffer == NULL) 538 return -ENOMEM; 539 540 va_start(ap, fmt); 541 ret = vsnprintf(printf_buffer, PRINTF_BUFFER_SIZE, fmt, ap); 542 va_end(ap); 543 544 BUG_ON(ret > PRINTF_BUFFER_SIZE-1); 545 ret = xenbus_write(t, dir, node, printf_buffer); 546 547 kfree(printf_buffer); 548 549 return ret; 550} 551EXPORT_SYMBOL_GPL(xenbus_printf); 552 553/* Takes tuples of names, scanf-style args, and void **, NULL terminated. */ 554int xenbus_gather(struct xenbus_transaction t, const char *dir, ...) 555{ 556 va_list ap; 557 const char *name; 558 int ret = 0; 559 560 va_start(ap, dir); 561 while (ret == 0 && (name = va_arg(ap, char *)) != NULL) { 562 const char *fmt = va_arg(ap, char *); 563 void *result = va_arg(ap, void *); 564 char *p; 565 566 p = xenbus_read(t, dir, name, NULL); 567 if (IS_ERR(p)) { 568 ret = PTR_ERR(p); 569 break; 570 } 571 if (fmt) { 572 if (sscanf(p, fmt, result) == 0) 573 ret = -EINVAL; 574 kfree(p); 575 } else 576 *(char **)result = p; 577 } 578 va_end(ap); 579 return ret; 580} 581EXPORT_SYMBOL_GPL(xenbus_gather); 582 583static int xs_watch(const char *path, const char *token) 584{ 585 struct kvec iov[2]; 586 587 iov[0].iov_base = (void *)path; 588 iov[0].iov_len = strlen(path) + 1; 589 iov[1].iov_base = (void *)token; 590 iov[1].iov_len = strlen(token) + 1; 591 592 return xs_error(xs_talkv(XBT_NIL, XS_WATCH, iov, 593 ARRAY_SIZE(iov), NULL)); 594} 595 596static int xs_unwatch(const char *path, const char *token) 597{ 598 struct kvec iov[2]; 599 600 iov[0].iov_base = (char *)path; 601 iov[0].iov_len = strlen(path) + 1; 602 iov[1].iov_base = (char *)token; 603 iov[1].iov_len = strlen(token) + 1; 604 605 return xs_error(xs_talkv(XBT_NIL, XS_UNWATCH, iov, 606 ARRAY_SIZE(iov), NULL)); 607} 608 609static struct xenbus_watch *find_watch(const char *token) 610{ 611 struct xenbus_watch *i, *cmp; 612 613 cmp = (void *)simple_strtoul(token, NULL, 16); 614 615 list_for_each_entry(i, &watches, list) 616 if (i == cmp) 617 return i; 618 619 return NULL; 620} 621 622/* Register callback to watch this node. */ 623int register_xenbus_watch(struct xenbus_watch *watch) 624{ 625 /* Pointer in ascii is the token. */ 626 char token[sizeof(watch) * 2 + 1]; 627 int err; 628 629 sprintf(token, "%lX", (long)watch); 630 631 down_read(&xs_state.watch_mutex); 632 633 spin_lock(&watches_lock); 634 BUG_ON(find_watch(token)); 635 list_add(&watch->list, &watches); 636 spin_unlock(&watches_lock); 637 638 err = xs_watch(watch->node, token); 639 640 /* Ignore errors due to multiple registration. */ 641 if ((err != 0) && (err != -EEXIST)) { 642 spin_lock(&watches_lock); 643 list_del(&watch->list); 644 spin_unlock(&watches_lock); 645 } 646 647 up_read(&xs_state.watch_mutex); 648 649 return err; 650} 651EXPORT_SYMBOL_GPL(register_xenbus_watch); 652 653void unregister_xenbus_watch(struct xenbus_watch *watch) 654{ 655 struct xs_stored_msg *msg, *tmp; 656 char token[sizeof(watch) * 2 + 1]; 657 int err; 658 659 sprintf(token, "%lX", (long)watch); 660 661 down_read(&xs_state.watch_mutex); 662 663 spin_lock(&watches_lock); 664 BUG_ON(!find_watch(token)); 665 list_del(&watch->list); 666 spin_unlock(&watches_lock); 667 668 err = xs_unwatch(watch->node, token); 669 if (err) 670 printk(KERN_WARNING 671 "XENBUS Failed to release watch %s: %i\n", 672 watch->node, err); 673 674 up_read(&xs_state.watch_mutex); 675 676 /* Make sure there are no callbacks running currently (unless 677 its us) */ 678 if (current->pid != xenwatch_pid) 679 mutex_lock(&xenwatch_mutex); 680 681 /* Cancel pending watch events. */ 682 spin_lock(&watch_events_lock); 683 list_for_each_entry_safe(msg, tmp, &watch_events, list) { 684 if (msg->u.watch.handle != watch) 685 continue; 686 list_del(&msg->list); 687 kfree(msg->u.watch.vec); 688 kfree(msg); 689 } 690 spin_unlock(&watch_events_lock); 691 692 if (current->pid != xenwatch_pid) 693 mutex_unlock(&xenwatch_mutex); 694} 695EXPORT_SYMBOL_GPL(unregister_xenbus_watch); 696 697void xs_suspend(void) 698{ 699 transaction_suspend(); 700 down_write(&xs_state.watch_mutex); 701 mutex_lock(&xs_state.request_mutex); 702 mutex_lock(&xs_state.response_mutex); 703} 704 705void xs_resume(void) 706{ 707 struct xenbus_watch *watch; 708 char token[sizeof(watch) * 2 + 1]; 709 710 xb_init_comms(); 711 712 mutex_unlock(&xs_state.response_mutex); 713 mutex_unlock(&xs_state.request_mutex); 714 transaction_resume(); 715 716 /* No need for watches_lock: the watch_mutex is sufficient. */ 717 list_for_each_entry(watch, &watches, list) { 718 sprintf(token, "%lX", (long)watch); 719 xs_watch(watch->node, token); 720 } 721 722 up_write(&xs_state.watch_mutex); 723} 724 725void xs_suspend_cancel(void) 726{ 727 mutex_unlock(&xs_state.response_mutex); 728 mutex_unlock(&xs_state.request_mutex); 729 up_write(&xs_state.watch_mutex); 730 mutex_unlock(&xs_state.transaction_mutex); 731} 732 733static int xenwatch_thread(void *unused) 734{ 735 struct list_head *ent; 736 struct xs_stored_msg *msg; 737 738 for (;;) { 739 wait_event_interruptible(watch_events_waitq, 740 !list_empty(&watch_events)); 741 742 if (kthread_should_stop()) 743 break; 744 745 mutex_lock(&xenwatch_mutex); 746 747 spin_lock(&watch_events_lock); 748 ent = watch_events.next; 749 if (ent != &watch_events) 750 list_del(ent); 751 spin_unlock(&watch_events_lock); 752 753 if (ent != &watch_events) { 754 msg = list_entry(ent, struct xs_stored_msg, list); 755 msg->u.watch.handle->callback( 756 msg->u.watch.handle, 757 (const char **)msg->u.watch.vec, 758 msg->u.watch.vec_size); 759 kfree(msg->u.watch.vec); 760 kfree(msg); 761 } 762 763 mutex_unlock(&xenwatch_mutex); 764 } 765 766 return 0; 767} 768 769static int process_msg(void) 770{ 771 struct xs_stored_msg *msg; 772 char *body; 773 int err; 774 775 /* 776 * We must disallow save/restore while reading a xenstore message. 777 * A partial read across s/r leaves us out of sync with xenstored. 778 */ 779 for (;;) { 780 err = xb_wait_for_data_to_read(); 781 if (err) 782 return err; 783 mutex_lock(&xs_state.response_mutex); 784 if (xb_data_to_read()) 785 break; 786 /* We raced with save/restore: pending data 'disappeared'. */ 787 mutex_unlock(&xs_state.response_mutex); 788 } 789 790 791 msg = kmalloc(sizeof(*msg), GFP_NOIO | __GFP_HIGH); 792 if (msg == NULL) { 793 err = -ENOMEM; 794 goto out; 795 } 796 797 err = xb_read(&msg->hdr, sizeof(msg->hdr)); 798 if (err) { 799 kfree(msg); 800 goto out; 801 } 802 803 body = kmalloc(msg->hdr.len + 1, GFP_NOIO | __GFP_HIGH); 804 if (body == NULL) { 805 kfree(msg); 806 err = -ENOMEM; 807 goto out; 808 } 809 810 err = xb_read(body, msg->hdr.len); 811 if (err) { 812 kfree(body); 813 kfree(msg); 814 goto out; 815 } 816 body[msg->hdr.len] = '\0'; 817 818 if (msg->hdr.type == XS_WATCH_EVENT) { 819 msg->u.watch.vec = split(body, msg->hdr.len, 820 &msg->u.watch.vec_size); 821 if (IS_ERR(msg->u.watch.vec)) { 822 err = PTR_ERR(msg->u.watch.vec); 823 kfree(msg); 824 goto out; 825 } 826 827 spin_lock(&watches_lock); 828 msg->u.watch.handle = find_watch( 829 msg->u.watch.vec[XS_WATCH_TOKEN]); 830 if (msg->u.watch.handle != NULL) { 831 spin_lock(&watch_events_lock); 832 list_add_tail(&msg->list, &watch_events); 833 wake_up(&watch_events_waitq); 834 spin_unlock(&watch_events_lock); 835 } else { 836 kfree(msg->u.watch.vec); 837 kfree(msg); 838 } 839 spin_unlock(&watches_lock); 840 } else { 841 msg->u.reply.body = body; 842 spin_lock(&xs_state.reply_lock); 843 list_add_tail(&msg->list, &xs_state.reply_list); 844 spin_unlock(&xs_state.reply_lock); 845 wake_up(&xs_state.reply_waitq); 846 } 847 848 out: 849 mutex_unlock(&xs_state.response_mutex); 850 return err; 851} 852 853static int xenbus_thread(void *unused) 854{ 855 int err; 856 857 for (;;) { 858 err = process_msg(); 859 if (err) 860 printk(KERN_WARNING "XENBUS error %d while reading " 861 "message\n", err); 862 if (kthread_should_stop()) 863 break; 864 } 865 866 return 0; 867} 868 869int xs_init(void) 870{ 871 int err; 872 struct task_struct *task; 873 874 INIT_LIST_HEAD(&xs_state.reply_list); 875 spin_lock_init(&xs_state.reply_lock); 876 init_waitqueue_head(&xs_state.reply_waitq); 877 878 mutex_init(&xs_state.request_mutex); 879 mutex_init(&xs_state.response_mutex); 880 mutex_init(&xs_state.transaction_mutex); 881 init_rwsem(&xs_state.watch_mutex); 882 atomic_set(&xs_state.transaction_count, 0); 883 init_waitqueue_head(&xs_state.transaction_wq); 884 885 /* Initialize the shared memory rings to talk to xenstored */ 886 err = xb_init_comms(); 887 if (err) 888 return err; 889 890 task = kthread_run(xenwatch_thread, NULL, "xenwatch"); 891 if (IS_ERR(task)) 892 return PTR_ERR(task); 893 xenwatch_pid = task->pid; 894 895 task = kthread_run(xenbus_thread, NULL, "xenbus"); 896 if (IS_ERR(task)) 897 return PTR_ERR(task); 898 899 return 0; 900} 901