1/* 2 * This file is subject to the terms and conditions of the GNU General Public 3 * License. See the file "COPYING" in the main directory of this archive 4 * for more details. 5 * 6 * Copyright (c) 2004-2006 Silicon Graphics, Inc. All Rights Reserved. 7 */ 8 9 10/* 11 * Cross Partition Communication (XPC) channel support. 12 * 13 * This is the part of XPC that manages the channels and 14 * sends/receives messages across them to/from other partitions. 15 * 16 */ 17 18 19#include <linux/kernel.h> 20#include <linux/init.h> 21#include <linux/sched.h> 22#include <linux/cache.h> 23#include <linux/interrupt.h> 24#include <linux/mutex.h> 25#include <linux/completion.h> 26#include <asm/sn/bte.h> 27#include <asm/sn/sn_sal.h> 28#include <asm/sn/xpc.h> 29 30 31/* 32 * Guarantee that the kzalloc'd memory is cacheline aligned. 33 */ 34static void * 35xpc_kzalloc_cacheline_aligned(size_t size, gfp_t flags, void **base) 36{ 37 /* see if kzalloc will give us cachline aligned memory by default */ 38 *base = kzalloc(size, flags); 39 if (*base == NULL) { 40 return NULL; 41 } 42 if ((u64) *base == L1_CACHE_ALIGN((u64) *base)) { 43 return *base; 44 } 45 kfree(*base); 46 47 /* nope, we'll have to do it ourselves */ 48 *base = kzalloc(size + L1_CACHE_BYTES, flags); 49 if (*base == NULL) { 50 return NULL; 51 } 52 return (void *) L1_CACHE_ALIGN((u64) *base); 53} 54 55 56/* 57 * Set up the initial values for the XPartition Communication channels. 58 */ 59static void 60xpc_initialize_channels(struct xpc_partition *part, partid_t partid) 61{ 62 int ch_number; 63 struct xpc_channel *ch; 64 65 66 for (ch_number = 0; ch_number < part->nchannels; ch_number++) { 67 ch = &part->channels[ch_number]; 68 69 ch->partid = partid; 70 ch->number = ch_number; 71 ch->flags = XPC_C_DISCONNECTED; 72 73 ch->local_GP = &part->local_GPs[ch_number]; 74 ch->local_openclose_args = 75 &part->local_openclose_args[ch_number]; 76 77 atomic_set(&ch->kthreads_assigned, 0); 78 atomic_set(&ch->kthreads_idle, 0); 79 atomic_set(&ch->kthreads_active, 0); 80 81 atomic_set(&ch->references, 0); 82 atomic_set(&ch->n_to_notify, 0); 83 84 spin_lock_init(&ch->lock); 85 mutex_init(&ch->msg_to_pull_mutex); 86 init_completion(&ch->wdisconnect_wait); 87 88 atomic_set(&ch->n_on_msg_allocate_wq, 0); 89 init_waitqueue_head(&ch->msg_allocate_wq); 90 init_waitqueue_head(&ch->idle_wq); 91 } 92} 93 94 95/* 96 * Setup the infrastructure necessary to support XPartition Communication 97 * between the specified remote partition and the local one. 98 */ 99enum xpc_retval 100xpc_setup_infrastructure(struct xpc_partition *part) 101{ 102 int ret, cpuid; 103 struct timer_list *timer; 104 partid_t partid = XPC_PARTID(part); 105 106 107 /* 108 * Zero out MOST of the entry for this partition. Only the fields 109 * starting with `nchannels' will be zeroed. The preceding fields must 110 * remain `viable' across partition ups and downs, since they may be 111 * referenced during this memset() operation. 112 */ 113 memset(&part->nchannels, 0, sizeof(struct xpc_partition) - 114 offsetof(struct xpc_partition, nchannels)); 115 116 /* 117 * Allocate all of the channel structures as a contiguous chunk of 118 * memory. 119 */ 120 part->channels = kzalloc(sizeof(struct xpc_channel) * XPC_NCHANNELS, 121 GFP_KERNEL); 122 if (part->channels == NULL) { 123 dev_err(xpc_chan, "can't get memory for channels\n"); 124 return xpcNoMemory; 125 } 126 127 part->nchannels = XPC_NCHANNELS; 128 129 130 /* allocate all the required GET/PUT values */ 131 132 part->local_GPs = xpc_kzalloc_cacheline_aligned(XPC_GP_SIZE, 133 GFP_KERNEL, &part->local_GPs_base); 134 if (part->local_GPs == NULL) { 135 kfree(part->channels); 136 part->channels = NULL; 137 dev_err(xpc_chan, "can't get memory for local get/put " 138 "values\n"); 139 return xpcNoMemory; 140 } 141 142 part->remote_GPs = xpc_kzalloc_cacheline_aligned(XPC_GP_SIZE, 143 GFP_KERNEL, &part->remote_GPs_base); 144 if (part->remote_GPs == NULL) { 145 dev_err(xpc_chan, "can't get memory for remote get/put " 146 "values\n"); 147 kfree(part->local_GPs_base); 148 part->local_GPs = NULL; 149 kfree(part->channels); 150 part->channels = NULL; 151 return xpcNoMemory; 152 } 153 154 155 /* allocate all the required open and close args */ 156 157 part->local_openclose_args = xpc_kzalloc_cacheline_aligned( 158 XPC_OPENCLOSE_ARGS_SIZE, GFP_KERNEL, 159 &part->local_openclose_args_base); 160 if (part->local_openclose_args == NULL) { 161 dev_err(xpc_chan, "can't get memory for local connect args\n"); 162 kfree(part->remote_GPs_base); 163 part->remote_GPs = NULL; 164 kfree(part->local_GPs_base); 165 part->local_GPs = NULL; 166 kfree(part->channels); 167 part->channels = NULL; 168 return xpcNoMemory; 169 } 170 171 part->remote_openclose_args = xpc_kzalloc_cacheline_aligned( 172 XPC_OPENCLOSE_ARGS_SIZE, GFP_KERNEL, 173 &part->remote_openclose_args_base); 174 if (part->remote_openclose_args == NULL) { 175 dev_err(xpc_chan, "can't get memory for remote connect args\n"); 176 kfree(part->local_openclose_args_base); 177 part->local_openclose_args = NULL; 178 kfree(part->remote_GPs_base); 179 part->remote_GPs = NULL; 180 kfree(part->local_GPs_base); 181 part->local_GPs = NULL; 182 kfree(part->channels); 183 part->channels = NULL; 184 return xpcNoMemory; 185 } 186 187 188 xpc_initialize_channels(part, partid); 189 190 atomic_set(&part->nchannels_active, 0); 191 atomic_set(&part->nchannels_engaged, 0); 192 193 194 /* local_IPI_amo were set to 0 by an earlier memset() */ 195 196 /* Initialize this partitions AMO_t structure */ 197 part->local_IPI_amo_va = xpc_IPI_init(partid); 198 199 spin_lock_init(&part->IPI_lock); 200 201 atomic_set(&part->channel_mgr_requests, 1); 202 init_waitqueue_head(&part->channel_mgr_wq); 203 204 sprintf(part->IPI_owner, "xpc%02d", partid); 205 ret = request_irq(SGI_XPC_NOTIFY, xpc_notify_IRQ_handler, IRQF_SHARED, 206 part->IPI_owner, (void *) (u64) partid); 207 if (ret != 0) { 208 dev_err(xpc_chan, "can't register NOTIFY IRQ handler, " 209 "errno=%d\n", -ret); 210 kfree(part->remote_openclose_args_base); 211 part->remote_openclose_args = NULL; 212 kfree(part->local_openclose_args_base); 213 part->local_openclose_args = NULL; 214 kfree(part->remote_GPs_base); 215 part->remote_GPs = NULL; 216 kfree(part->local_GPs_base); 217 part->local_GPs = NULL; 218 kfree(part->channels); 219 part->channels = NULL; 220 return xpcLackOfResources; 221 } 222 223 /* Setup a timer to check for dropped IPIs */ 224 timer = &part->dropped_IPI_timer; 225 init_timer(timer); 226 timer->function = (void (*)(unsigned long)) xpc_dropped_IPI_check; 227 timer->data = (unsigned long) part; 228 timer->expires = jiffies + XPC_P_DROPPED_IPI_WAIT; 229 add_timer(timer); 230 231 /* 232 * With the setting of the partition setup_state to XPC_P_SETUP, we're 233 * declaring that this partition is ready to go. 234 */ 235 part->setup_state = XPC_P_SETUP; 236 237 238 /* 239 * Setup the per partition specific variables required by the 240 * remote partition to establish channel connections with us. 241 * 242 * The setting of the magic # indicates that these per partition 243 * specific variables are ready to be used. 244 */ 245 xpc_vars_part[partid].GPs_pa = __pa(part->local_GPs); 246 xpc_vars_part[partid].openclose_args_pa = 247 __pa(part->local_openclose_args); 248 xpc_vars_part[partid].IPI_amo_pa = __pa(part->local_IPI_amo_va); 249 cpuid = raw_smp_processor_id(); /* any CPU in this partition will do */ 250 xpc_vars_part[partid].IPI_nasid = cpuid_to_nasid(cpuid); 251 xpc_vars_part[partid].IPI_phys_cpuid = cpu_physical_id(cpuid); 252 xpc_vars_part[partid].nchannels = part->nchannels; 253 xpc_vars_part[partid].magic = XPC_VP_MAGIC1; 254 255 return xpcSuccess; 256} 257 258 259/* 260 * Create a wrapper that hides the underlying mechanism for pulling a cacheline 261 * (or multiple cachelines) from a remote partition. 262 * 263 * src must be a cacheline aligned physical address on the remote partition. 264 * dst must be a cacheline aligned virtual address on this partition. 265 * cnt must be an cacheline sized 266 */ 267static enum xpc_retval 268xpc_pull_remote_cachelines(struct xpc_partition *part, void *dst, 269 const void *src, size_t cnt) 270{ 271 bte_result_t bte_ret; 272 273 274 DBUG_ON((u64) src != L1_CACHE_ALIGN((u64) src)); 275 DBUG_ON((u64) dst != L1_CACHE_ALIGN((u64) dst)); 276 DBUG_ON(cnt != L1_CACHE_ALIGN(cnt)); 277 278 if (part->act_state == XPC_P_DEACTIVATING) { 279 return part->reason; 280 } 281 282 bte_ret = xp_bte_copy((u64) src, (u64) dst, (u64) cnt, 283 (BTE_NORMAL | BTE_WACQUIRE), NULL); 284 if (bte_ret == BTE_SUCCESS) { 285 return xpcSuccess; 286 } 287 288 dev_dbg(xpc_chan, "xp_bte_copy() from partition %d failed, ret=%d\n", 289 XPC_PARTID(part), bte_ret); 290 291 return xpc_map_bte_errors(bte_ret); 292} 293 294 295/* 296 * Pull the remote per partition specific variables from the specified 297 * partition. 298 */ 299enum xpc_retval 300xpc_pull_remote_vars_part(struct xpc_partition *part) 301{ 302 u8 buffer[L1_CACHE_BYTES * 2]; 303 struct xpc_vars_part *pulled_entry_cacheline = 304 (struct xpc_vars_part *) L1_CACHE_ALIGN((u64) buffer); 305 struct xpc_vars_part *pulled_entry; 306 u64 remote_entry_cacheline_pa, remote_entry_pa; 307 partid_t partid = XPC_PARTID(part); 308 enum xpc_retval ret; 309 310 311 /* pull the cacheline that contains the variables we're interested in */ 312 313 DBUG_ON(part->remote_vars_part_pa != 314 L1_CACHE_ALIGN(part->remote_vars_part_pa)); 315 DBUG_ON(sizeof(struct xpc_vars_part) != L1_CACHE_BYTES / 2); 316 317 remote_entry_pa = part->remote_vars_part_pa + 318 sn_partition_id * sizeof(struct xpc_vars_part); 319 320 remote_entry_cacheline_pa = (remote_entry_pa & ~(L1_CACHE_BYTES - 1)); 321 322 pulled_entry = (struct xpc_vars_part *) ((u64) pulled_entry_cacheline + 323 (remote_entry_pa & (L1_CACHE_BYTES - 1))); 324 325 ret = xpc_pull_remote_cachelines(part, pulled_entry_cacheline, 326 (void *) remote_entry_cacheline_pa, 327 L1_CACHE_BYTES); 328 if (ret != xpcSuccess) { 329 dev_dbg(xpc_chan, "failed to pull XPC vars_part from " 330 "partition %d, ret=%d\n", partid, ret); 331 return ret; 332 } 333 334 335 /* see if they've been set up yet */ 336 337 if (pulled_entry->magic != XPC_VP_MAGIC1 && 338 pulled_entry->magic != XPC_VP_MAGIC2) { 339 340 if (pulled_entry->magic != 0) { 341 dev_dbg(xpc_chan, "partition %d's XPC vars_part for " 342 "partition %d has bad magic value (=0x%lx)\n", 343 partid, sn_partition_id, pulled_entry->magic); 344 return xpcBadMagic; 345 } 346 347 /* they've not been initialized yet */ 348 return xpcRetry; 349 } 350 351 if (xpc_vars_part[partid].magic == XPC_VP_MAGIC1) { 352 353 /* validate the variables */ 354 355 if (pulled_entry->GPs_pa == 0 || 356 pulled_entry->openclose_args_pa == 0 || 357 pulled_entry->IPI_amo_pa == 0) { 358 359 dev_err(xpc_chan, "partition %d's XPC vars_part for " 360 "partition %d are not valid\n", partid, 361 sn_partition_id); 362 return xpcInvalidAddress; 363 } 364 365 /* the variables we imported look to be valid */ 366 367 part->remote_GPs_pa = pulled_entry->GPs_pa; 368 part->remote_openclose_args_pa = 369 pulled_entry->openclose_args_pa; 370 part->remote_IPI_amo_va = 371 (AMO_t *) __va(pulled_entry->IPI_amo_pa); 372 part->remote_IPI_nasid = pulled_entry->IPI_nasid; 373 part->remote_IPI_phys_cpuid = pulled_entry->IPI_phys_cpuid; 374 375 if (part->nchannels > pulled_entry->nchannels) { 376 part->nchannels = pulled_entry->nchannels; 377 } 378 379 /* let the other side know that we've pulled their variables */ 380 381 xpc_vars_part[partid].magic = XPC_VP_MAGIC2; 382 } 383 384 if (pulled_entry->magic == XPC_VP_MAGIC1) { 385 return xpcRetry; 386 } 387 388 return xpcSuccess; 389} 390 391 392/* 393 * Get the IPI flags and pull the openclose args and/or remote GPs as needed. 394 */ 395static u64 396xpc_get_IPI_flags(struct xpc_partition *part) 397{ 398 unsigned long irq_flags; 399 u64 IPI_amo; 400 enum xpc_retval ret; 401 402 403 /* 404 * See if there are any IPI flags to be handled. 405 */ 406 407 spin_lock_irqsave(&part->IPI_lock, irq_flags); 408 if ((IPI_amo = part->local_IPI_amo) != 0) { 409 part->local_IPI_amo = 0; 410 } 411 spin_unlock_irqrestore(&part->IPI_lock, irq_flags); 412 413 414 if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_amo)) { 415 ret = xpc_pull_remote_cachelines(part, 416 part->remote_openclose_args, 417 (void *) part->remote_openclose_args_pa, 418 XPC_OPENCLOSE_ARGS_SIZE); 419 if (ret != xpcSuccess) { 420 XPC_DEACTIVATE_PARTITION(part, ret); 421 422 dev_dbg(xpc_chan, "failed to pull openclose args from " 423 "partition %d, ret=%d\n", XPC_PARTID(part), 424 ret); 425 426 /* don't bother processing IPIs anymore */ 427 IPI_amo = 0; 428 } 429 } 430 431 if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_amo)) { 432 ret = xpc_pull_remote_cachelines(part, part->remote_GPs, 433 (void *) part->remote_GPs_pa, 434 XPC_GP_SIZE); 435 if (ret != xpcSuccess) { 436 XPC_DEACTIVATE_PARTITION(part, ret); 437 438 dev_dbg(xpc_chan, "failed to pull GPs from partition " 439 "%d, ret=%d\n", XPC_PARTID(part), ret); 440 441 /* don't bother processing IPIs anymore */ 442 IPI_amo = 0; 443 } 444 } 445 446 return IPI_amo; 447} 448 449 450/* 451 * Allocate the local message queue and the notify queue. 452 */ 453static enum xpc_retval 454xpc_allocate_local_msgqueue(struct xpc_channel *ch) 455{ 456 unsigned long irq_flags; 457 int nentries; 458 size_t nbytes; 459 460 461 // >>> may want to check for ch->flags & XPC_C_DISCONNECTING between 462 // >>> iterations of the for-loop, bail if set? 463 464 // >>> should we impose a minimum #of entries? like 4 or 8? 465 for (nentries = ch->local_nentries; nentries > 0; nentries--) { 466 467 nbytes = nentries * ch->msg_size; 468 ch->local_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes, 469 GFP_KERNEL, 470 &ch->local_msgqueue_base); 471 if (ch->local_msgqueue == NULL) { 472 continue; 473 } 474 475 nbytes = nentries * sizeof(struct xpc_notify); 476 ch->notify_queue = kzalloc(nbytes, GFP_KERNEL); 477 if (ch->notify_queue == NULL) { 478 kfree(ch->local_msgqueue_base); 479 ch->local_msgqueue = NULL; 480 continue; 481 } 482 483 spin_lock_irqsave(&ch->lock, irq_flags); 484 if (nentries < ch->local_nentries) { 485 dev_dbg(xpc_chan, "nentries=%d local_nentries=%d, " 486 "partid=%d, channel=%d\n", nentries, 487 ch->local_nentries, ch->partid, ch->number); 488 489 ch->local_nentries = nentries; 490 } 491 spin_unlock_irqrestore(&ch->lock, irq_flags); 492 return xpcSuccess; 493 } 494 495 dev_dbg(xpc_chan, "can't get memory for local message queue and notify " 496 "queue, partid=%d, channel=%d\n", ch->partid, ch->number); 497 return xpcNoMemory; 498} 499 500 501/* 502 * Allocate the cached remote message queue. 503 */ 504static enum xpc_retval 505xpc_allocate_remote_msgqueue(struct xpc_channel *ch) 506{ 507 unsigned long irq_flags; 508 int nentries; 509 size_t nbytes; 510 511 512 DBUG_ON(ch->remote_nentries <= 0); 513 514 // >>> may want to check for ch->flags & XPC_C_DISCONNECTING between 515 // >>> iterations of the for-loop, bail if set? 516 517 // >>> should we impose a minimum #of entries? like 4 or 8? 518 for (nentries = ch->remote_nentries; nentries > 0; nentries--) { 519 520 nbytes = nentries * ch->msg_size; 521 ch->remote_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes, 522 GFP_KERNEL, 523 &ch->remote_msgqueue_base); 524 if (ch->remote_msgqueue == NULL) { 525 continue; 526 } 527 528 spin_lock_irqsave(&ch->lock, irq_flags); 529 if (nentries < ch->remote_nentries) { 530 dev_dbg(xpc_chan, "nentries=%d remote_nentries=%d, " 531 "partid=%d, channel=%d\n", nentries, 532 ch->remote_nentries, ch->partid, ch->number); 533 534 ch->remote_nentries = nentries; 535 } 536 spin_unlock_irqrestore(&ch->lock, irq_flags); 537 return xpcSuccess; 538 } 539 540 dev_dbg(xpc_chan, "can't get memory for cached remote message queue, " 541 "partid=%d, channel=%d\n", ch->partid, ch->number); 542 return xpcNoMemory; 543} 544 545 546/* 547 * Allocate message queues and other stuff associated with a channel. 548 * 549 * Note: Assumes all of the channel sizes are filled in. 550 */ 551static enum xpc_retval 552xpc_allocate_msgqueues(struct xpc_channel *ch) 553{ 554 unsigned long irq_flags; 555 enum xpc_retval ret; 556 557 558 DBUG_ON(ch->flags & XPC_C_SETUP); 559 560 if ((ret = xpc_allocate_local_msgqueue(ch)) != xpcSuccess) { 561 return ret; 562 } 563 564 if ((ret = xpc_allocate_remote_msgqueue(ch)) != xpcSuccess) { 565 kfree(ch->local_msgqueue_base); 566 ch->local_msgqueue = NULL; 567 kfree(ch->notify_queue); 568 ch->notify_queue = NULL; 569 return ret; 570 } 571 572 spin_lock_irqsave(&ch->lock, irq_flags); 573 ch->flags |= XPC_C_SETUP; 574 spin_unlock_irqrestore(&ch->lock, irq_flags); 575 576 return xpcSuccess; 577} 578 579 580/* 581 * Process a connect message from a remote partition. 582 * 583 * Note: xpc_process_connect() is expecting to be called with the 584 * spin_lock_irqsave held and will leave it locked upon return. 585 */ 586static void 587xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags) 588{ 589 enum xpc_retval ret; 590 591 592 DBUG_ON(!spin_is_locked(&ch->lock)); 593 594 if (!(ch->flags & XPC_C_OPENREQUEST) || 595 !(ch->flags & XPC_C_ROPENREQUEST)) { 596 /* nothing more to do for now */ 597 return; 598 } 599 DBUG_ON(!(ch->flags & XPC_C_CONNECTING)); 600 601 if (!(ch->flags & XPC_C_SETUP)) { 602 spin_unlock_irqrestore(&ch->lock, *irq_flags); 603 ret = xpc_allocate_msgqueues(ch); 604 spin_lock_irqsave(&ch->lock, *irq_flags); 605 606 if (ret != xpcSuccess) { 607 XPC_DISCONNECT_CHANNEL(ch, ret, irq_flags); 608 } 609 if (ch->flags & (XPC_C_CONNECTED | XPC_C_DISCONNECTING)) { 610 return; 611 } 612 613 DBUG_ON(!(ch->flags & XPC_C_SETUP)); 614 DBUG_ON(ch->local_msgqueue == NULL); 615 DBUG_ON(ch->remote_msgqueue == NULL); 616 } 617 618 if (!(ch->flags & XPC_C_OPENREPLY)) { 619 ch->flags |= XPC_C_OPENREPLY; 620 xpc_IPI_send_openreply(ch, irq_flags); 621 } 622 623 if (!(ch->flags & XPC_C_ROPENREPLY)) { 624 return; 625 } 626 627 DBUG_ON(ch->remote_msgqueue_pa == 0); 628 629 ch->flags = (XPC_C_CONNECTED | XPC_C_SETUP); /* clear all else */ 630 631 dev_info(xpc_chan, "channel %d to partition %d connected\n", 632 ch->number, ch->partid); 633 634 spin_unlock_irqrestore(&ch->lock, *irq_flags); 635 xpc_create_kthreads(ch, 1, 0); 636 spin_lock_irqsave(&ch->lock, *irq_flags); 637} 638 639 640/* 641 * Notify those who wanted to be notified upon delivery of their message. 642 */ 643static void 644xpc_notify_senders(struct xpc_channel *ch, enum xpc_retval reason, s64 put) 645{ 646 struct xpc_notify *notify; 647 u8 notify_type; 648 s64 get = ch->w_remote_GP.get - 1; 649 650 651 while (++get < put && atomic_read(&ch->n_to_notify) > 0) { 652 653 notify = &ch->notify_queue[get % ch->local_nentries]; 654 655 /* 656 * See if the notify entry indicates it was associated with 657 * a message who's sender wants to be notified. It is possible 658 * that it is, but someone else is doing or has done the 659 * notification. 660 */ 661 notify_type = notify->type; 662 if (notify_type == 0 || 663 cmpxchg(¬ify->type, notify_type, 0) != 664 notify_type) { 665 continue; 666 } 667 668 DBUG_ON(notify_type != XPC_N_CALL); 669 670 atomic_dec(&ch->n_to_notify); 671 672 if (notify->func != NULL) { 673 dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, " 674 "msg_number=%ld, partid=%d, channel=%d\n", 675 (void *) notify, get, ch->partid, ch->number); 676 677 notify->func(reason, ch->partid, ch->number, 678 notify->key); 679 680 dev_dbg(xpc_chan, "notify->func() returned, " 681 "notify=0x%p, msg_number=%ld, partid=%d, " 682 "channel=%d\n", (void *) notify, get, 683 ch->partid, ch->number); 684 } 685 } 686} 687 688 689/* 690 * Free up message queues and other stuff that were allocated for the specified 691 * channel. 692 * 693 * Note: ch->reason and ch->reason_line are left set for debugging purposes, 694 * they're cleared when XPC_C_DISCONNECTED is cleared. 695 */ 696static void 697xpc_free_msgqueues(struct xpc_channel *ch) 698{ 699 DBUG_ON(!spin_is_locked(&ch->lock)); 700 DBUG_ON(atomic_read(&ch->n_to_notify) != 0); 701 702 ch->remote_msgqueue_pa = 0; 703 ch->func = NULL; 704 ch->key = NULL; 705 ch->msg_size = 0; 706 ch->local_nentries = 0; 707 ch->remote_nentries = 0; 708 ch->kthreads_assigned_limit = 0; 709 ch->kthreads_idle_limit = 0; 710 711 ch->local_GP->get = 0; 712 ch->local_GP->put = 0; 713 ch->remote_GP.get = 0; 714 ch->remote_GP.put = 0; 715 ch->w_local_GP.get = 0; 716 ch->w_local_GP.put = 0; 717 ch->w_remote_GP.get = 0; 718 ch->w_remote_GP.put = 0; 719 ch->next_msg_to_pull = 0; 720 721 if (ch->flags & XPC_C_SETUP) { 722 ch->flags &= ~XPC_C_SETUP; 723 724 dev_dbg(xpc_chan, "ch->flags=0x%x, partid=%d, channel=%d\n", 725 ch->flags, ch->partid, ch->number); 726 727 kfree(ch->local_msgqueue_base); 728 ch->local_msgqueue = NULL; 729 kfree(ch->remote_msgqueue_base); 730 ch->remote_msgqueue = NULL; 731 kfree(ch->notify_queue); 732 ch->notify_queue = NULL; 733 } 734} 735 736 737/* 738 * spin_lock_irqsave() is expected to be held on entry. 739 */ 740static void 741xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags) 742{ 743 struct xpc_partition *part = &xpc_partitions[ch->partid]; 744 u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED); 745 746 747 DBUG_ON(!spin_is_locked(&ch->lock)); 748 749 if (!(ch->flags & XPC_C_DISCONNECTING)) { 750 return; 751 } 752 753 DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST)); 754 755 /* make sure all activity has settled down first */ 756 757 if (atomic_read(&ch->kthreads_assigned) > 0 || 758 atomic_read(&ch->references) > 0) { 759 return; 760 } 761 DBUG_ON((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) && 762 !(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE)); 763 764 if (part->act_state == XPC_P_DEACTIVATING) { 765 /* can't proceed until the other side disengages from us */ 766 if (xpc_partition_engaged(1UL << ch->partid)) { 767 return; 768 } 769 770 } else { 771 772 /* as long as the other side is up do the full protocol */ 773 774 if (!(ch->flags & XPC_C_RCLOSEREQUEST)) { 775 return; 776 } 777 778 if (!(ch->flags & XPC_C_CLOSEREPLY)) { 779 ch->flags |= XPC_C_CLOSEREPLY; 780 xpc_IPI_send_closereply(ch, irq_flags); 781 } 782 783 if (!(ch->flags & XPC_C_RCLOSEREPLY)) { 784 return; 785 } 786 } 787 788 /* wake those waiting for notify completion */ 789 if (atomic_read(&ch->n_to_notify) > 0) { 790 /* >>> we do callout while holding ch->lock */ 791 xpc_notify_senders(ch, ch->reason, ch->w_local_GP.put); 792 } 793 794 /* both sides are disconnected now */ 795 796 if (ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE) { 797 spin_unlock_irqrestore(&ch->lock, *irq_flags); 798 xpc_disconnect_callout(ch, xpcDisconnected); 799 spin_lock_irqsave(&ch->lock, *irq_flags); 800 } 801 802 /* it's now safe to free the channel's message queues */ 803 xpc_free_msgqueues(ch); 804 805 /* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */ 806 ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT)); 807 808 atomic_dec(&part->nchannels_active); 809 810 if (channel_was_connected) { 811 dev_info(xpc_chan, "channel %d to partition %d disconnected, " 812 "reason=%d\n", ch->number, ch->partid, ch->reason); 813 } 814 815 if (ch->flags & XPC_C_WDISCONNECT) { 816 /* we won't lose the CPU since we're holding ch->lock */ 817 complete(&ch->wdisconnect_wait); 818 } else if (ch->delayed_IPI_flags) { 819 if (part->act_state != XPC_P_DEACTIVATING) { 820 /* time to take action on any delayed IPI flags */ 821 spin_lock(&part->IPI_lock); 822 XPC_SET_IPI_FLAGS(part->local_IPI_amo, ch->number, 823 ch->delayed_IPI_flags); 824 spin_unlock(&part->IPI_lock); 825 } 826 ch->delayed_IPI_flags = 0; 827 } 828} 829 830 831/* 832 * Process a change in the channel's remote connection state. 833 */ 834static void 835xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, 836 u8 IPI_flags) 837{ 838 unsigned long irq_flags; 839 struct xpc_openclose_args *args = 840 &part->remote_openclose_args[ch_number]; 841 struct xpc_channel *ch = &part->channels[ch_number]; 842 enum xpc_retval reason; 843 844 845 846 spin_lock_irqsave(&ch->lock, irq_flags); 847 848again: 849 850 if ((ch->flags & XPC_C_DISCONNECTED) && 851 (ch->flags & XPC_C_WDISCONNECT)) { 852 /* 853 * Delay processing IPI flags until thread waiting disconnect 854 * has had a chance to see that the channel is disconnected. 855 */ 856 ch->delayed_IPI_flags |= IPI_flags; 857 spin_unlock_irqrestore(&ch->lock, irq_flags); 858 return; 859 } 860 861 862 if (IPI_flags & XPC_IPI_CLOSEREQUEST) { 863 864 dev_dbg(xpc_chan, "XPC_IPI_CLOSEREQUEST (reason=%d) received " 865 "from partid=%d, channel=%d\n", args->reason, 866 ch->partid, ch->number); 867 868 /* 869 * If RCLOSEREQUEST is set, we're probably waiting for 870 * RCLOSEREPLY. We should find it and a ROPENREQUEST packed 871 * with this RCLOSEREQUEST in the IPI_flags. 872 */ 873 874 if (ch->flags & XPC_C_RCLOSEREQUEST) { 875 DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING)); 876 DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST)); 877 DBUG_ON(!(ch->flags & XPC_C_CLOSEREPLY)); 878 DBUG_ON(ch->flags & XPC_C_RCLOSEREPLY); 879 880 DBUG_ON(!(IPI_flags & XPC_IPI_CLOSEREPLY)); 881 IPI_flags &= ~XPC_IPI_CLOSEREPLY; 882 ch->flags |= XPC_C_RCLOSEREPLY; 883 884 /* both sides have finished disconnecting */ 885 xpc_process_disconnect(ch, &irq_flags); 886 DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED)); 887 goto again; 888 } 889 890 if (ch->flags & XPC_C_DISCONNECTED) { 891 if (!(IPI_flags & XPC_IPI_OPENREQUEST)) { 892 if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, 893 ch_number) & XPC_IPI_OPENREQUEST)) { 894 895 DBUG_ON(ch->delayed_IPI_flags != 0); 896 spin_lock(&part->IPI_lock); 897 XPC_SET_IPI_FLAGS(part->local_IPI_amo, 898 ch_number, 899 XPC_IPI_CLOSEREQUEST); 900 spin_unlock(&part->IPI_lock); 901 } 902 spin_unlock_irqrestore(&ch->lock, irq_flags); 903 return; 904 } 905 906 XPC_SET_REASON(ch, 0, 0); 907 ch->flags &= ~XPC_C_DISCONNECTED; 908 909 atomic_inc(&part->nchannels_active); 910 ch->flags |= (XPC_C_CONNECTING | XPC_C_ROPENREQUEST); 911 } 912 913 IPI_flags &= ~(XPC_IPI_OPENREQUEST | XPC_IPI_OPENREPLY); 914 915 /* 916 * The meaningful CLOSEREQUEST connection state fields are: 917 * reason = reason connection is to be closed 918 */ 919 920 ch->flags |= XPC_C_RCLOSEREQUEST; 921 922 if (!(ch->flags & XPC_C_DISCONNECTING)) { 923 reason = args->reason; 924 if (reason <= xpcSuccess || reason > xpcUnknownReason) { 925 reason = xpcUnknownReason; 926 } else if (reason == xpcUnregistering) { 927 reason = xpcOtherUnregistering; 928 } 929 930 XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags); 931 932 DBUG_ON(IPI_flags & XPC_IPI_CLOSEREPLY); 933 spin_unlock_irqrestore(&ch->lock, irq_flags); 934 return; 935 } 936 937 xpc_process_disconnect(ch, &irq_flags); 938 } 939 940 941 if (IPI_flags & XPC_IPI_CLOSEREPLY) { 942 943 dev_dbg(xpc_chan, "XPC_IPI_CLOSEREPLY received from partid=%d," 944 " channel=%d\n", ch->partid, ch->number); 945 946 if (ch->flags & XPC_C_DISCONNECTED) { 947 DBUG_ON(part->act_state != XPC_P_DEACTIVATING); 948 spin_unlock_irqrestore(&ch->lock, irq_flags); 949 return; 950 } 951 952 DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST)); 953 954 if (!(ch->flags & XPC_C_RCLOSEREQUEST)) { 955 if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, ch_number) 956 & XPC_IPI_CLOSEREQUEST)) { 957 958 DBUG_ON(ch->delayed_IPI_flags != 0); 959 spin_lock(&part->IPI_lock); 960 XPC_SET_IPI_FLAGS(part->local_IPI_amo, 961 ch_number, XPC_IPI_CLOSEREPLY); 962 spin_unlock(&part->IPI_lock); 963 } 964 spin_unlock_irqrestore(&ch->lock, irq_flags); 965 return; 966 } 967 968 ch->flags |= XPC_C_RCLOSEREPLY; 969 970 if (ch->flags & XPC_C_CLOSEREPLY) { 971 /* both sides have finished disconnecting */ 972 xpc_process_disconnect(ch, &irq_flags); 973 } 974 } 975 976 977 if (IPI_flags & XPC_IPI_OPENREQUEST) { 978 979 dev_dbg(xpc_chan, "XPC_IPI_OPENREQUEST (msg_size=%d, " 980 "local_nentries=%d) received from partid=%d, " 981 "channel=%d\n", args->msg_size, args->local_nentries, 982 ch->partid, ch->number); 983 984 if (part->act_state == XPC_P_DEACTIVATING || 985 (ch->flags & XPC_C_ROPENREQUEST)) { 986 spin_unlock_irqrestore(&ch->lock, irq_flags); 987 return; 988 } 989 990 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) { 991 ch->delayed_IPI_flags |= XPC_IPI_OPENREQUEST; 992 spin_unlock_irqrestore(&ch->lock, irq_flags); 993 return; 994 } 995 DBUG_ON(!(ch->flags & (XPC_C_DISCONNECTED | 996 XPC_C_OPENREQUEST))); 997 DBUG_ON(ch->flags & (XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY | 998 XPC_C_OPENREPLY | XPC_C_CONNECTED)); 999 1000 /* 1001 * The meaningful OPENREQUEST connection state fields are: 1002 * msg_size = size of channel's messages in bytes 1003 * local_nentries = remote partition's local_nentries 1004 */ 1005 if (args->msg_size == 0 || args->local_nentries == 0) { 1006 /* assume OPENREQUEST was delayed by mistake */ 1007 spin_unlock_irqrestore(&ch->lock, irq_flags); 1008 return; 1009 } 1010 1011 ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING); 1012 ch->remote_nentries = args->local_nentries; 1013 1014 1015 if (ch->flags & XPC_C_OPENREQUEST) { 1016 if (args->msg_size != ch->msg_size) { 1017 XPC_DISCONNECT_CHANNEL(ch, xpcUnequalMsgSizes, 1018 &irq_flags); 1019 spin_unlock_irqrestore(&ch->lock, irq_flags); 1020 return; 1021 } 1022 } else { 1023 ch->msg_size = args->msg_size; 1024 1025 XPC_SET_REASON(ch, 0, 0); 1026 ch->flags &= ~XPC_C_DISCONNECTED; 1027 1028 atomic_inc(&part->nchannels_active); 1029 } 1030 1031 xpc_process_connect(ch, &irq_flags); 1032 } 1033 1034 1035 if (IPI_flags & XPC_IPI_OPENREPLY) { 1036 1037 dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY (local_msgqueue_pa=0x%lx, " 1038 "local_nentries=%d, remote_nentries=%d) received from " 1039 "partid=%d, channel=%d\n", args->local_msgqueue_pa, 1040 args->local_nentries, args->remote_nentries, 1041 ch->partid, ch->number); 1042 1043 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) { 1044 spin_unlock_irqrestore(&ch->lock, irq_flags); 1045 return; 1046 } 1047 if (!(ch->flags & XPC_C_OPENREQUEST)) { 1048 XPC_DISCONNECT_CHANNEL(ch, xpcOpenCloseError, 1049 &irq_flags); 1050 spin_unlock_irqrestore(&ch->lock, irq_flags); 1051 return; 1052 } 1053 1054 DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST)); 1055 DBUG_ON(ch->flags & XPC_C_CONNECTED); 1056 1057 /* 1058 * The meaningful OPENREPLY connection state fields are: 1059 * local_msgqueue_pa = physical address of remote 1060 * partition's local_msgqueue 1061 * local_nentries = remote partition's local_nentries 1062 * remote_nentries = remote partition's remote_nentries 1063 */ 1064 DBUG_ON(args->local_msgqueue_pa == 0); 1065 DBUG_ON(args->local_nentries == 0); 1066 DBUG_ON(args->remote_nentries == 0); 1067 1068 ch->flags |= XPC_C_ROPENREPLY; 1069 ch->remote_msgqueue_pa = args->local_msgqueue_pa; 1070 1071 if (args->local_nentries < ch->remote_nentries) { 1072 dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new " 1073 "remote_nentries=%d, old remote_nentries=%d, " 1074 "partid=%d, channel=%d\n", 1075 args->local_nentries, ch->remote_nentries, 1076 ch->partid, ch->number); 1077 1078 ch->remote_nentries = args->local_nentries; 1079 } 1080 if (args->remote_nentries < ch->local_nentries) { 1081 dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new " 1082 "local_nentries=%d, old local_nentries=%d, " 1083 "partid=%d, channel=%d\n", 1084 args->remote_nentries, ch->local_nentries, 1085 ch->partid, ch->number); 1086 1087 ch->local_nentries = args->remote_nentries; 1088 } 1089 1090 xpc_process_connect(ch, &irq_flags); 1091 } 1092 1093 spin_unlock_irqrestore(&ch->lock, irq_flags); 1094} 1095 1096 1097/* 1098 * Attempt to establish a channel connection to a remote partition. 1099 */ 1100static enum xpc_retval 1101xpc_connect_channel(struct xpc_channel *ch) 1102{ 1103 unsigned long irq_flags; 1104 struct xpc_registration *registration = &xpc_registrations[ch->number]; 1105 1106 1107 if (mutex_trylock(®istration->mutex) == 0) { 1108 return xpcRetry; 1109 } 1110 1111 if (!XPC_CHANNEL_REGISTERED(ch->number)) { 1112 mutex_unlock(®istration->mutex); 1113 return xpcUnregistered; 1114 } 1115 1116 spin_lock_irqsave(&ch->lock, irq_flags); 1117 1118 DBUG_ON(ch->flags & XPC_C_CONNECTED); 1119 DBUG_ON(ch->flags & XPC_C_OPENREQUEST); 1120 1121 if (ch->flags & XPC_C_DISCONNECTING) { 1122 spin_unlock_irqrestore(&ch->lock, irq_flags); 1123 mutex_unlock(®istration->mutex); 1124 return ch->reason; 1125 } 1126 1127 1128 /* add info from the channel connect registration to the channel */ 1129 1130 ch->kthreads_assigned_limit = registration->assigned_limit; 1131 ch->kthreads_idle_limit = registration->idle_limit; 1132 DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0); 1133 DBUG_ON(atomic_read(&ch->kthreads_idle) != 0); 1134 DBUG_ON(atomic_read(&ch->kthreads_active) != 0); 1135 1136 ch->func = registration->func; 1137 DBUG_ON(registration->func == NULL); 1138 ch->key = registration->key; 1139 1140 ch->local_nentries = registration->nentries; 1141 1142 if (ch->flags & XPC_C_ROPENREQUEST) { 1143 if (registration->msg_size != ch->msg_size) { 1144 /* the local and remote sides aren't the same */ 1145 1146 /* 1147 * Because XPC_DISCONNECT_CHANNEL() can block we're 1148 * forced to up the registration sema before we unlock 1149 * the channel lock. But that's okay here because we're 1150 * done with the part that required the registration 1151 * sema. XPC_DISCONNECT_CHANNEL() requires that the 1152 * channel lock be locked and will unlock and relock 1153 * the channel lock as needed. 1154 */ 1155 mutex_unlock(®istration->mutex); 1156 XPC_DISCONNECT_CHANNEL(ch, xpcUnequalMsgSizes, 1157 &irq_flags); 1158 spin_unlock_irqrestore(&ch->lock, irq_flags); 1159 return xpcUnequalMsgSizes; 1160 } 1161 } else { 1162 ch->msg_size = registration->msg_size; 1163 1164 XPC_SET_REASON(ch, 0, 0); 1165 ch->flags &= ~XPC_C_DISCONNECTED; 1166 1167 atomic_inc(&xpc_partitions[ch->partid].nchannels_active); 1168 } 1169 1170 mutex_unlock(®istration->mutex); 1171 1172 1173 /* initiate the connection */ 1174 1175 ch->flags |= (XPC_C_OPENREQUEST | XPC_C_CONNECTING); 1176 xpc_IPI_send_openrequest(ch, &irq_flags); 1177 1178 xpc_process_connect(ch, &irq_flags); 1179 1180 spin_unlock_irqrestore(&ch->lock, irq_flags); 1181 1182 return xpcSuccess; 1183} 1184 1185 1186/* 1187 * Clear some of the msg flags in the local message queue. 1188 */ 1189static inline void 1190xpc_clear_local_msgqueue_flags(struct xpc_channel *ch) 1191{ 1192 struct xpc_msg *msg; 1193 s64 get; 1194 1195 1196 get = ch->w_remote_GP.get; 1197 do { 1198 msg = (struct xpc_msg *) ((u64) ch->local_msgqueue + 1199 (get % ch->local_nentries) * ch->msg_size); 1200 msg->flags = 0; 1201 } while (++get < (volatile s64) ch->remote_GP.get); 1202} 1203 1204 1205/* 1206 * Clear some of the msg flags in the remote message queue. 1207 */ 1208static inline void 1209xpc_clear_remote_msgqueue_flags(struct xpc_channel *ch) 1210{ 1211 struct xpc_msg *msg; 1212 s64 put; 1213 1214 1215 put = ch->w_remote_GP.put; 1216 do { 1217 msg = (struct xpc_msg *) ((u64) ch->remote_msgqueue + 1218 (put % ch->remote_nentries) * ch->msg_size); 1219 msg->flags = 0; 1220 } while (++put < (volatile s64) ch->remote_GP.put); 1221} 1222 1223 1224static void 1225xpc_process_msg_IPI(struct xpc_partition *part, int ch_number) 1226{ 1227 struct xpc_channel *ch = &part->channels[ch_number]; 1228 int nmsgs_sent; 1229 1230 1231 ch->remote_GP = part->remote_GPs[ch_number]; 1232 1233 1234 /* See what, if anything, has changed for each connected channel */ 1235 1236 xpc_msgqueue_ref(ch); 1237 1238 if (ch->w_remote_GP.get == ch->remote_GP.get && 1239 ch->w_remote_GP.put == ch->remote_GP.put) { 1240 /* nothing changed since GPs were last pulled */ 1241 xpc_msgqueue_deref(ch); 1242 return; 1243 } 1244 1245 if (!(ch->flags & XPC_C_CONNECTED)){ 1246 xpc_msgqueue_deref(ch); 1247 return; 1248 } 1249 1250 1251 /* 1252 * First check to see if messages recently sent by us have been 1253 * received by the other side. (The remote GET value will have 1254 * changed since we last looked at it.) 1255 */ 1256 1257 if (ch->w_remote_GP.get != ch->remote_GP.get) { 1258 1259 /* 1260 * We need to notify any senders that want to be notified 1261 * that their sent messages have been received by their 1262 * intended recipients. We need to do this before updating 1263 * w_remote_GP.get so that we don't allocate the same message 1264 * queue entries prematurely (see xpc_allocate_msg()). 1265 */ 1266 if (atomic_read(&ch->n_to_notify) > 0) { 1267 /* 1268 * Notify senders that messages sent have been 1269 * received and delivered by the other side. 1270 */ 1271 xpc_notify_senders(ch, xpcMsgDelivered, 1272 ch->remote_GP.get); 1273 } 1274 1275 /* 1276 * Clear msg->flags in previously sent messages, so that 1277 * they're ready for xpc_allocate_msg(). 1278 */ 1279 xpc_clear_local_msgqueue_flags(ch); 1280 1281 ch->w_remote_GP.get = ch->remote_GP.get; 1282 1283 dev_dbg(xpc_chan, "w_remote_GP.get changed to %ld, partid=%d, " 1284 "channel=%d\n", ch->w_remote_GP.get, ch->partid, 1285 ch->number); 1286 1287 /* 1288 * If anyone was waiting for message queue entries to become 1289 * available, wake them up. 1290 */ 1291 if (atomic_read(&ch->n_on_msg_allocate_wq) > 0) { 1292 wake_up(&ch->msg_allocate_wq); 1293 } 1294 } 1295 1296 1297 /* 1298 * Now check for newly sent messages by the other side. (The remote 1299 * PUT value will have changed since we last looked at it.) 1300 */ 1301 1302 if (ch->w_remote_GP.put != ch->remote_GP.put) { 1303 /* 1304 * Clear msg->flags in previously received messages, so that 1305 * they're ready for xpc_get_deliverable_msg(). 1306 */ 1307 xpc_clear_remote_msgqueue_flags(ch); 1308 1309 ch->w_remote_GP.put = ch->remote_GP.put; 1310 1311 dev_dbg(xpc_chan, "w_remote_GP.put changed to %ld, partid=%d, " 1312 "channel=%d\n", ch->w_remote_GP.put, ch->partid, 1313 ch->number); 1314 1315 nmsgs_sent = ch->w_remote_GP.put - ch->w_local_GP.get; 1316 if (nmsgs_sent > 0) { 1317 dev_dbg(xpc_chan, "msgs waiting to be copied and " 1318 "delivered=%d, partid=%d, channel=%d\n", 1319 nmsgs_sent, ch->partid, ch->number); 1320 1321 if (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) { 1322 xpc_activate_kthreads(ch, nmsgs_sent); 1323 } 1324 } 1325 } 1326 1327 xpc_msgqueue_deref(ch); 1328} 1329 1330 1331void 1332xpc_process_channel_activity(struct xpc_partition *part) 1333{ 1334 unsigned long irq_flags; 1335 u64 IPI_amo, IPI_flags; 1336 struct xpc_channel *ch; 1337 int ch_number; 1338 u32 ch_flags; 1339 1340 1341 IPI_amo = xpc_get_IPI_flags(part); 1342 1343 /* 1344 * Initiate channel connections for registered channels. 1345 * 1346 * For each connected channel that has pending messages activate idle 1347 * kthreads and/or create new kthreads as needed. 1348 */ 1349 1350 for (ch_number = 0; ch_number < part->nchannels; ch_number++) { 1351 ch = &part->channels[ch_number]; 1352 1353 1354 /* 1355 * Process any open or close related IPI flags, and then deal 1356 * with connecting or disconnecting the channel as required. 1357 */ 1358 1359 IPI_flags = XPC_GET_IPI_FLAGS(IPI_amo, ch_number); 1360 1361 if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_flags)) { 1362 xpc_process_openclose_IPI(part, ch_number, IPI_flags); 1363 } 1364 1365 ch_flags = ch->flags; /* need an atomic snapshot of flags */ 1366 1367 if (ch_flags & XPC_C_DISCONNECTING) { 1368 spin_lock_irqsave(&ch->lock, irq_flags); 1369 xpc_process_disconnect(ch, &irq_flags); 1370 spin_unlock_irqrestore(&ch->lock, irq_flags); 1371 continue; 1372 } 1373 1374 if (part->act_state == XPC_P_DEACTIVATING) { 1375 continue; 1376 } 1377 1378 if (!(ch_flags & XPC_C_CONNECTED)) { 1379 if (!(ch_flags & XPC_C_OPENREQUEST)) { 1380 DBUG_ON(ch_flags & XPC_C_SETUP); 1381 (void) xpc_connect_channel(ch); 1382 } else { 1383 spin_lock_irqsave(&ch->lock, irq_flags); 1384 xpc_process_connect(ch, &irq_flags); 1385 spin_unlock_irqrestore(&ch->lock, irq_flags); 1386 } 1387 continue; 1388 } 1389 1390 1391 /* 1392 * Process any message related IPI flags, this may involve the 1393 * activation of kthreads to deliver any pending messages sent 1394 * from the other partition. 1395 */ 1396 1397 if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_flags)) { 1398 xpc_process_msg_IPI(part, ch_number); 1399 } 1400 } 1401} 1402 1403 1404/* 1405 * XPC's heartbeat code calls this function to inform XPC that a partition is 1406 * going down. XPC responds by tearing down the XPartition Communication 1407 * infrastructure used for the just downed partition. 1408 * 1409 * XPC's heartbeat code will never call this function and xpc_partition_up() 1410 * at the same time. Nor will it ever make multiple calls to either function 1411 * at the same time. 1412 */ 1413void 1414xpc_partition_going_down(struct xpc_partition *part, enum xpc_retval reason) 1415{ 1416 unsigned long irq_flags; 1417 int ch_number; 1418 struct xpc_channel *ch; 1419 1420 1421 dev_dbg(xpc_chan, "deactivating partition %d, reason=%d\n", 1422 XPC_PARTID(part), reason); 1423 1424 if (!xpc_part_ref(part)) { 1425 /* infrastructure for this partition isn't currently set up */ 1426 return; 1427 } 1428 1429 1430 /* disconnect channels associated with the partition going down */ 1431 1432 for (ch_number = 0; ch_number < part->nchannels; ch_number++) { 1433 ch = &part->channels[ch_number]; 1434 1435 xpc_msgqueue_ref(ch); 1436 spin_lock_irqsave(&ch->lock, irq_flags); 1437 1438 XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags); 1439 1440 spin_unlock_irqrestore(&ch->lock, irq_flags); 1441 xpc_msgqueue_deref(ch); 1442 } 1443 1444 xpc_wakeup_channel_mgr(part); 1445 1446 xpc_part_deref(part); 1447} 1448 1449 1450/* 1451 * Teardown the infrastructure necessary to support XPartition Communication 1452 * between the specified remote partition and the local one. 1453 */ 1454void 1455xpc_teardown_infrastructure(struct xpc_partition *part) 1456{ 1457 partid_t partid = XPC_PARTID(part); 1458 1459 1460 /* 1461 * We start off by making this partition inaccessible to local 1462 * processes by marking it as no longer setup. Then we make it 1463 * inaccessible to remote processes by clearing the XPC per partition 1464 * specific variable's magic # (which indicates that these variables 1465 * are no longer valid) and by ignoring all XPC notify IPIs sent to 1466 * this partition. 1467 */ 1468 1469 DBUG_ON(atomic_read(&part->nchannels_engaged) != 0); 1470 DBUG_ON(atomic_read(&part->nchannels_active) != 0); 1471 DBUG_ON(part->setup_state != XPC_P_SETUP); 1472 part->setup_state = XPC_P_WTEARDOWN; 1473 1474 xpc_vars_part[partid].magic = 0; 1475 1476 1477 free_irq(SGI_XPC_NOTIFY, (void *) (u64) partid); 1478 1479 1480 /* 1481 * Before proceeding with the teardown we have to wait until all 1482 * existing references cease. 1483 */ 1484 wait_event(part->teardown_wq, (atomic_read(&part->references) == 0)); 1485 1486 1487 /* now we can begin tearing down the infrastructure */ 1488 1489 part->setup_state = XPC_P_TORNDOWN; 1490 1491 /* in case we've still got outstanding timers registered... */ 1492 del_timer_sync(&part->dropped_IPI_timer); 1493 1494 kfree(part->remote_openclose_args_base); 1495 part->remote_openclose_args = NULL; 1496 kfree(part->local_openclose_args_base); 1497 part->local_openclose_args = NULL; 1498 kfree(part->remote_GPs_base); 1499 part->remote_GPs = NULL; 1500 kfree(part->local_GPs_base); 1501 part->local_GPs = NULL; 1502 kfree(part->channels); 1503 part->channels = NULL; 1504 part->local_IPI_amo_va = NULL; 1505} 1506 1507 1508/* 1509 * Called by XP at the time of channel connection registration to cause 1510 * XPC to establish connections to all currently active partitions. 1511 */ 1512void 1513xpc_initiate_connect(int ch_number) 1514{ 1515 partid_t partid; 1516 struct xpc_partition *part; 1517 struct xpc_channel *ch; 1518 1519 1520 DBUG_ON(ch_number < 0 || ch_number >= XPC_NCHANNELS); 1521 1522 for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) { 1523 part = &xpc_partitions[partid]; 1524 1525 if (xpc_part_ref(part)) { 1526 ch = &part->channels[ch_number]; 1527 1528 /* 1529 * Initiate the establishment of a connection on the 1530 * newly registered channel to the remote partition. 1531 */ 1532 xpc_wakeup_channel_mgr(part); 1533 xpc_part_deref(part); 1534 } 1535 } 1536} 1537 1538 1539void 1540xpc_connected_callout(struct xpc_channel *ch) 1541{ 1542 /* let the registerer know that a connection has been established */ 1543 1544 if (ch->func != NULL) { 1545 dev_dbg(xpc_chan, "ch->func() called, reason=xpcConnected, " 1546 "partid=%d, channel=%d\n", ch->partid, ch->number); 1547 1548 ch->func(xpcConnected, ch->partid, ch->number, 1549 (void *) (u64) ch->local_nentries, ch->key); 1550 1551 dev_dbg(xpc_chan, "ch->func() returned, reason=xpcConnected, " 1552 "partid=%d, channel=%d\n", ch->partid, ch->number); 1553 } 1554} 1555 1556 1557/* 1558 * Called by XP at the time of channel connection unregistration to cause 1559 * XPC to teardown all current connections for the specified channel. 1560 * 1561 * Before returning xpc_initiate_disconnect() will wait until all connections 1562 * on the specified channel have been closed/torndown. So the caller can be 1563 * assured that they will not be receiving any more callouts from XPC to the 1564 * function they registered via xpc_connect(). 1565 * 1566 * Arguments: 1567 * 1568 * ch_number - channel # to unregister. 1569 */ 1570void 1571xpc_initiate_disconnect(int ch_number) 1572{ 1573 unsigned long irq_flags; 1574 partid_t partid; 1575 struct xpc_partition *part; 1576 struct xpc_channel *ch; 1577 1578 1579 DBUG_ON(ch_number < 0 || ch_number >= XPC_NCHANNELS); 1580 1581 /* initiate the channel disconnect for every active partition */ 1582 for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) { 1583 part = &xpc_partitions[partid]; 1584 1585 if (xpc_part_ref(part)) { 1586 ch = &part->channels[ch_number]; 1587 xpc_msgqueue_ref(ch); 1588 1589 spin_lock_irqsave(&ch->lock, irq_flags); 1590 1591 if (!(ch->flags & XPC_C_DISCONNECTED)) { 1592 ch->flags |= XPC_C_WDISCONNECT; 1593 1594 XPC_DISCONNECT_CHANNEL(ch, xpcUnregistering, 1595 &irq_flags); 1596 } 1597 1598 spin_unlock_irqrestore(&ch->lock, irq_flags); 1599 1600 xpc_msgqueue_deref(ch); 1601 xpc_part_deref(part); 1602 } 1603 } 1604 1605 xpc_disconnect_wait(ch_number); 1606} 1607 1608 1609/* 1610 * To disconnect a channel, and reflect it back to all who may be waiting. 1611 * 1612 * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by 1613 * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by 1614 * xpc_disconnect_wait(). 1615 * 1616 * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN. 1617 */ 1618void 1619xpc_disconnect_channel(const int line, struct xpc_channel *ch, 1620 enum xpc_retval reason, unsigned long *irq_flags) 1621{ 1622 u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED); 1623 1624 1625 DBUG_ON(!spin_is_locked(&ch->lock)); 1626 1627 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) { 1628 return; 1629 } 1630 DBUG_ON(!(ch->flags & (XPC_C_CONNECTING | XPC_C_CONNECTED))); 1631 1632 dev_dbg(xpc_chan, "reason=%d, line=%d, partid=%d, channel=%d\n", 1633 reason, line, ch->partid, ch->number); 1634 1635 XPC_SET_REASON(ch, reason, line); 1636 1637 ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING); 1638 /* some of these may not have been set */ 1639 ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY | 1640 XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY | 1641 XPC_C_CONNECTING | XPC_C_CONNECTED); 1642 1643 xpc_IPI_send_closerequest(ch, irq_flags); 1644 1645 if (channel_was_connected) { 1646 ch->flags |= XPC_C_WASCONNECTED; 1647 } 1648 1649 spin_unlock_irqrestore(&ch->lock, *irq_flags); 1650 1651 /* wake all idle kthreads so they can exit */ 1652 if (atomic_read(&ch->kthreads_idle) > 0) { 1653 wake_up_all(&ch->idle_wq); 1654 1655 } else if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) && 1656 !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) { 1657 /* start a kthread that will do the xpcDisconnecting callout */ 1658 xpc_create_kthreads(ch, 1, 1); 1659 } 1660 1661 /* wake those waiting to allocate an entry from the local msg queue */ 1662 if (atomic_read(&ch->n_on_msg_allocate_wq) > 0) { 1663 wake_up(&ch->msg_allocate_wq); 1664 } 1665 1666 spin_lock_irqsave(&ch->lock, *irq_flags); 1667} 1668 1669 1670void 1671xpc_disconnect_callout(struct xpc_channel *ch, enum xpc_retval reason) 1672{ 1673 /* 1674 * Let the channel's registerer know that the channel is being 1675 * disconnected. We don't want to do this if the registerer was never 1676 * informed of a connection being made. 1677 */ 1678 1679 if (ch->func != NULL) { 1680 dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, " 1681 "channel=%d\n", reason, ch->partid, ch->number); 1682 1683 ch->func(reason, ch->partid, ch->number, NULL, ch->key); 1684 1685 dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, " 1686 "channel=%d\n", reason, ch->partid, ch->number); 1687 } 1688} 1689 1690 1691/* 1692 * Wait for a message entry to become available for the specified channel, 1693 * but don't wait any longer than 1 jiffy. 1694 */ 1695static enum xpc_retval 1696xpc_allocate_msg_wait(struct xpc_channel *ch) 1697{ 1698 enum xpc_retval ret; 1699 1700 1701 if (ch->flags & XPC_C_DISCONNECTING) { 1702 DBUG_ON(ch->reason == xpcInterrupted); // >>> Is this true? 1703 return ch->reason; 1704 } 1705 1706 atomic_inc(&ch->n_on_msg_allocate_wq); 1707 ret = interruptible_sleep_on_timeout(&ch->msg_allocate_wq, 1); 1708 atomic_dec(&ch->n_on_msg_allocate_wq); 1709 1710 if (ch->flags & XPC_C_DISCONNECTING) { 1711 ret = ch->reason; 1712 DBUG_ON(ch->reason == xpcInterrupted); // >>> Is this true? 1713 } else if (ret == 0) { 1714 ret = xpcTimeout; 1715 } else { 1716 ret = xpcInterrupted; 1717 } 1718 1719 return ret; 1720} 1721 1722 1723/* 1724 * Allocate an entry for a message from the message queue associated with the 1725 * specified channel. 1726 */ 1727static enum xpc_retval 1728xpc_allocate_msg(struct xpc_channel *ch, u32 flags, 1729 struct xpc_msg **address_of_msg) 1730{ 1731 struct xpc_msg *msg; 1732 enum xpc_retval ret; 1733 s64 put; 1734 1735 1736 /* this reference will be dropped in xpc_send_msg() */ 1737 xpc_msgqueue_ref(ch); 1738 1739 if (ch->flags & XPC_C_DISCONNECTING) { 1740 xpc_msgqueue_deref(ch); 1741 return ch->reason; 1742 } 1743 if (!(ch->flags & XPC_C_CONNECTED)) { 1744 xpc_msgqueue_deref(ch); 1745 return xpcNotConnected; 1746 } 1747 1748 1749 /* 1750 * Get the next available message entry from the local message queue. 1751 * If none are available, we'll make sure that we grab the latest 1752 * GP values. 1753 */ 1754 ret = xpcTimeout; 1755 1756 while (1) { 1757 1758 put = (volatile s64) ch->w_local_GP.put; 1759 if (put - (volatile s64) ch->w_remote_GP.get < 1760 ch->local_nentries) { 1761 1762 /* There are available message entries. We need to try 1763 * to secure one for ourselves. We'll do this by trying 1764 * to increment w_local_GP.put as long as someone else 1765 * doesn't beat us to it. If they do, we'll have to 1766 * try again. 1767 */ 1768 if (cmpxchg(&ch->w_local_GP.put, put, put + 1) == 1769 put) { 1770 /* we got the entry referenced by put */ 1771 break; 1772 } 1773 continue; /* try again */ 1774 } 1775 1776 1777 /* 1778 * There aren't any available msg entries at this time. 1779 * 1780 * In waiting for a message entry to become available, 1781 * we set a timeout in case the other side is not 1782 * sending completion IPIs. This lets us fake an IPI 1783 * that will cause the IPI handler to fetch the latest 1784 * GP values as if an IPI was sent by the other side. 1785 */ 1786 if (ret == xpcTimeout) { 1787 xpc_IPI_send_local_msgrequest(ch); 1788 } 1789 1790 if (flags & XPC_NOWAIT) { 1791 xpc_msgqueue_deref(ch); 1792 return xpcNoWait; 1793 } 1794 1795 ret = xpc_allocate_msg_wait(ch); 1796 if (ret != xpcInterrupted && ret != xpcTimeout) { 1797 xpc_msgqueue_deref(ch); 1798 return ret; 1799 } 1800 } 1801 1802 1803 /* get the message's address and initialize it */ 1804 msg = (struct xpc_msg *) ((u64) ch->local_msgqueue + 1805 (put % ch->local_nentries) * ch->msg_size); 1806 1807 1808 DBUG_ON(msg->flags != 0); 1809 msg->number = put; 1810 1811 dev_dbg(xpc_chan, "w_local_GP.put changed to %ld; msg=0x%p, " 1812 "msg_number=%ld, partid=%d, channel=%d\n", put + 1, 1813 (void *) msg, msg->number, ch->partid, ch->number); 1814 1815 *address_of_msg = msg; 1816 1817 return xpcSuccess; 1818} 1819 1820 1821/* 1822 * Allocate an entry for a message from the message queue associated with the 1823 * specified channel. NOTE that this routine can sleep waiting for a message 1824 * entry to become available. To not sleep, pass in the XPC_NOWAIT flag. 1825 * 1826 * Arguments: 1827 * 1828 * partid - ID of partition to which the channel is connected. 1829 * ch_number - channel #. 1830 * flags - see xpc.h for valid flags. 1831 * payload - address of the allocated payload area pointer (filled in on 1832 * return) in which the user-defined message is constructed. 1833 */ 1834enum xpc_retval 1835xpc_initiate_allocate(partid_t partid, int ch_number, u32 flags, void **payload) 1836{ 1837 struct xpc_partition *part = &xpc_partitions[partid]; 1838 enum xpc_retval ret = xpcUnknownReason; 1839 struct xpc_msg *msg = NULL; 1840 1841 1842 DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS); 1843 DBUG_ON(ch_number < 0 || ch_number >= part->nchannels); 1844 1845 *payload = NULL; 1846 1847 if (xpc_part_ref(part)) { 1848 ret = xpc_allocate_msg(&part->channels[ch_number], flags, &msg); 1849 xpc_part_deref(part); 1850 1851 if (msg != NULL) { 1852 *payload = &msg->payload; 1853 } 1854 } 1855 1856 return ret; 1857} 1858 1859 1860/* 1861 * Now we actually send the messages that are ready to be sent by advancing 1862 * the local message queue's Put value and then send an IPI to the recipient 1863 * partition. 1864 */ 1865static void 1866xpc_send_msgs(struct xpc_channel *ch, s64 initial_put) 1867{ 1868 struct xpc_msg *msg; 1869 s64 put = initial_put + 1; 1870 int send_IPI = 0; 1871 1872 1873 while (1) { 1874 1875 while (1) { 1876 if (put == (volatile s64) ch->w_local_GP.put) { 1877 break; 1878 } 1879 1880 msg = (struct xpc_msg *) ((u64) ch->local_msgqueue + 1881 (put % ch->local_nentries) * ch->msg_size); 1882 1883 if (!(msg->flags & XPC_M_READY)) { 1884 break; 1885 } 1886 1887 put++; 1888 } 1889 1890 if (put == initial_put) { 1891 /* nothing's changed */ 1892 break; 1893 } 1894 1895 if (cmpxchg_rel(&ch->local_GP->put, initial_put, put) != 1896 initial_put) { 1897 /* someone else beat us to it */ 1898 DBUG_ON((volatile s64) ch->local_GP->put < initial_put); 1899 break; 1900 } 1901 1902 /* we just set the new value of local_GP->put */ 1903 1904 dev_dbg(xpc_chan, "local_GP->put changed to %ld, partid=%d, " 1905 "channel=%d\n", put, ch->partid, ch->number); 1906 1907 send_IPI = 1; 1908 1909 /* 1910 * We need to ensure that the message referenced by 1911 * local_GP->put is not XPC_M_READY or that local_GP->put 1912 * equals w_local_GP.put, so we'll go have a look. 1913 */ 1914 initial_put = put; 1915 } 1916 1917 if (send_IPI) { 1918 xpc_IPI_send_msgrequest(ch); 1919 } 1920} 1921 1922 1923/* 1924 * Common code that does the actual sending of the message by advancing the 1925 * local message queue's Put value and sends an IPI to the partition the 1926 * message is being sent to. 1927 */ 1928static enum xpc_retval 1929xpc_send_msg(struct xpc_channel *ch, struct xpc_msg *msg, u8 notify_type, 1930 xpc_notify_func func, void *key) 1931{ 1932 enum xpc_retval ret = xpcSuccess; 1933 struct xpc_notify *notify = notify; 1934 s64 put, msg_number = msg->number; 1935 1936 1937 DBUG_ON(notify_type == XPC_N_CALL && func == NULL); 1938 DBUG_ON((((u64) msg - (u64) ch->local_msgqueue) / ch->msg_size) != 1939 msg_number % ch->local_nentries); 1940 DBUG_ON(msg->flags & XPC_M_READY); 1941 1942 if (ch->flags & XPC_C_DISCONNECTING) { 1943 /* drop the reference grabbed in xpc_allocate_msg() */ 1944 xpc_msgqueue_deref(ch); 1945 return ch->reason; 1946 } 1947 1948 if (notify_type != 0) { 1949 /* 1950 * Tell the remote side to send an ACK interrupt when the 1951 * message has been delivered. 1952 */ 1953 msg->flags |= XPC_M_INTERRUPT; 1954 1955 atomic_inc(&ch->n_to_notify); 1956 1957 notify = &ch->notify_queue[msg_number % ch->local_nentries]; 1958 notify->func = func; 1959 notify->key = key; 1960 notify->type = notify_type; 1961 1962 // >>> is a mb() needed here? 1963 1964 if (ch->flags & XPC_C_DISCONNECTING) { 1965 /* 1966 * An error occurred between our last error check and 1967 * this one. We will try to clear the type field from 1968 * the notify entry. If we succeed then 1969 * xpc_disconnect_channel() didn't already process 1970 * the notify entry. 1971 */ 1972 if (cmpxchg(¬ify->type, notify_type, 0) == 1973 notify_type) { 1974 atomic_dec(&ch->n_to_notify); 1975 ret = ch->reason; 1976 } 1977 1978 /* drop the reference grabbed in xpc_allocate_msg() */ 1979 xpc_msgqueue_deref(ch); 1980 return ret; 1981 } 1982 } 1983 1984 msg->flags |= XPC_M_READY; 1985 1986 /* 1987 * The preceding store of msg->flags must occur before the following 1988 * load of ch->local_GP->put. 1989 */ 1990 mb(); 1991 1992 /* see if the message is next in line to be sent, if so send it */ 1993 1994 put = ch->local_GP->put; 1995 if (put == msg_number) { 1996 xpc_send_msgs(ch, put); 1997 } 1998 1999 /* drop the reference grabbed in xpc_allocate_msg() */ 2000 xpc_msgqueue_deref(ch); 2001 return ret; 2002} 2003 2004 2005/* 2006 * Send a message previously allocated using xpc_initiate_allocate() on the 2007 * specified channel connected to the specified partition. 2008 * 2009 * This routine will not wait for the message to be received, nor will 2010 * notification be given when it does happen. Once this routine has returned 2011 * the message entry allocated via xpc_initiate_allocate() is no longer 2012 * accessable to the caller. 2013 * 2014 * This routine, although called by users, does not call xpc_part_ref() to 2015 * ensure that the partition infrastructure is in place. It relies on the 2016 * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg(). 2017 * 2018 * Arguments: 2019 * 2020 * partid - ID of partition to which the channel is connected. 2021 * ch_number - channel # to send message on. 2022 * payload - pointer to the payload area allocated via 2023 * xpc_initiate_allocate(). 2024 */ 2025enum xpc_retval 2026xpc_initiate_send(partid_t partid, int ch_number, void *payload) 2027{ 2028 struct xpc_partition *part = &xpc_partitions[partid]; 2029 struct xpc_msg *msg = XPC_MSG_ADDRESS(payload); 2030 enum xpc_retval ret; 2031 2032 2033 dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *) msg, 2034 partid, ch_number); 2035 2036 DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS); 2037 DBUG_ON(ch_number < 0 || ch_number >= part->nchannels); 2038 DBUG_ON(msg == NULL); 2039 2040 ret = xpc_send_msg(&part->channels[ch_number], msg, 0, NULL, NULL); 2041 2042 return ret; 2043} 2044 2045 2046/* 2047 * Send a message previously allocated using xpc_initiate_allocate on the 2048 * specified channel connected to the specified partition. 2049 * 2050 * This routine will not wait for the message to be sent. Once this routine 2051 * has returned the message entry allocated via xpc_initiate_allocate() is no 2052 * longer accessable to the caller. 2053 * 2054 * Once the remote end of the channel has received the message, the function 2055 * passed as an argument to xpc_initiate_send_notify() will be called. This 2056 * allows the sender to free up or re-use any buffers referenced by the 2057 * message, but does NOT mean the message has been processed at the remote 2058 * end by a receiver. 2059 * 2060 * If this routine returns an error, the caller's function will NOT be called. 2061 * 2062 * This routine, although called by users, does not call xpc_part_ref() to 2063 * ensure that the partition infrastructure is in place. It relies on the 2064 * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg(). 2065 * 2066 * Arguments: 2067 * 2068 * partid - ID of partition to which the channel is connected. 2069 * ch_number - channel # to send message on. 2070 * payload - pointer to the payload area allocated via 2071 * xpc_initiate_allocate(). 2072 * func - function to call with asynchronous notification of message 2073 * receipt. THIS FUNCTION MUST BE NON-BLOCKING. 2074 * key - user-defined key to be passed to the function when it's called. 2075 */ 2076enum xpc_retval 2077xpc_initiate_send_notify(partid_t partid, int ch_number, void *payload, 2078 xpc_notify_func func, void *key) 2079{ 2080 struct xpc_partition *part = &xpc_partitions[partid]; 2081 struct xpc_msg *msg = XPC_MSG_ADDRESS(payload); 2082 enum xpc_retval ret; 2083 2084 2085 dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *) msg, 2086 partid, ch_number); 2087 2088 DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS); 2089 DBUG_ON(ch_number < 0 || ch_number >= part->nchannels); 2090 DBUG_ON(msg == NULL); 2091 DBUG_ON(func == NULL); 2092 2093 ret = xpc_send_msg(&part->channels[ch_number], msg, XPC_N_CALL, 2094 func, key); 2095 return ret; 2096} 2097 2098 2099static struct xpc_msg * 2100xpc_pull_remote_msg(struct xpc_channel *ch, s64 get) 2101{ 2102 struct xpc_partition *part = &xpc_partitions[ch->partid]; 2103 struct xpc_msg *remote_msg, *msg; 2104 u32 msg_index, nmsgs; 2105 u64 msg_offset; 2106 enum xpc_retval ret; 2107 2108 2109 if (mutex_lock_interruptible(&ch->msg_to_pull_mutex) != 0) { 2110 /* we were interrupted by a signal */ 2111 return NULL; 2112 } 2113 2114 while (get >= ch->next_msg_to_pull) { 2115 2116 /* pull as many messages as are ready and able to be pulled */ 2117 2118 msg_index = ch->next_msg_to_pull % ch->remote_nentries; 2119 2120 DBUG_ON(ch->next_msg_to_pull >= 2121 (volatile s64) ch->w_remote_GP.put); 2122 nmsgs = (volatile s64) ch->w_remote_GP.put - 2123 ch->next_msg_to_pull; 2124 if (msg_index + nmsgs > ch->remote_nentries) { 2125 /* ignore the ones that wrap the msg queue for now */ 2126 nmsgs = ch->remote_nentries - msg_index; 2127 } 2128 2129 msg_offset = msg_index * ch->msg_size; 2130 msg = (struct xpc_msg *) ((u64) ch->remote_msgqueue + 2131 msg_offset); 2132 remote_msg = (struct xpc_msg *) (ch->remote_msgqueue_pa + 2133 msg_offset); 2134 2135 if ((ret = xpc_pull_remote_cachelines(part, msg, remote_msg, 2136 nmsgs * ch->msg_size)) != xpcSuccess) { 2137 2138 dev_dbg(xpc_chan, "failed to pull %d msgs starting with" 2139 " msg %ld from partition %d, channel=%d, " 2140 "ret=%d\n", nmsgs, ch->next_msg_to_pull, 2141 ch->partid, ch->number, ret); 2142 2143 XPC_DEACTIVATE_PARTITION(part, ret); 2144 2145 mutex_unlock(&ch->msg_to_pull_mutex); 2146 return NULL; 2147 } 2148 2149 mb(); /* >>> this may not be needed, we're not sure */ 2150 2151 ch->next_msg_to_pull += nmsgs; 2152 } 2153 2154 mutex_unlock(&ch->msg_to_pull_mutex); 2155 2156 /* return the message we were looking for */ 2157 msg_offset = (get % ch->remote_nentries) * ch->msg_size; 2158 msg = (struct xpc_msg *) ((u64) ch->remote_msgqueue + msg_offset); 2159 2160 return msg; 2161} 2162 2163 2164/* 2165 * Get a message to be delivered. 2166 */ 2167static struct xpc_msg * 2168xpc_get_deliverable_msg(struct xpc_channel *ch) 2169{ 2170 struct xpc_msg *msg = NULL; 2171 s64 get; 2172 2173 2174 do { 2175 if ((volatile u32) ch->flags & XPC_C_DISCONNECTING) { 2176 break; 2177 } 2178 2179 get = (volatile s64) ch->w_local_GP.get; 2180 if (get == (volatile s64) ch->w_remote_GP.put) { 2181 break; 2182 } 2183 2184 /* There are messages waiting to be pulled and delivered. 2185 * We need to try to secure one for ourselves. We'll do this 2186 * by trying to increment w_local_GP.get and hope that no one 2187 * else beats us to it. If they do, we'll we'll simply have 2188 * to try again for the next one. 2189 */ 2190 2191 if (cmpxchg(&ch->w_local_GP.get, get, get + 1) == get) { 2192 /* we got the entry referenced by get */ 2193 2194 dev_dbg(xpc_chan, "w_local_GP.get changed to %ld, " 2195 "partid=%d, channel=%d\n", get + 1, 2196 ch->partid, ch->number); 2197 2198 /* pull the message from the remote partition */ 2199 2200 msg = xpc_pull_remote_msg(ch, get); 2201 2202 DBUG_ON(msg != NULL && msg->number != get); 2203 DBUG_ON(msg != NULL && (msg->flags & XPC_M_DONE)); 2204 DBUG_ON(msg != NULL && !(msg->flags & XPC_M_READY)); 2205 2206 break; 2207 } 2208 2209 } while (1); 2210 2211 return msg; 2212} 2213 2214 2215/* 2216 * Deliver a message to its intended recipient. 2217 */ 2218void 2219xpc_deliver_msg(struct xpc_channel *ch) 2220{ 2221 struct xpc_msg *msg; 2222 2223 2224 if ((msg = xpc_get_deliverable_msg(ch)) != NULL) { 2225 2226 /* 2227 * This ref is taken to protect the payload itself from being 2228 * freed before the user is finished with it, which the user 2229 * indicates by calling xpc_initiate_received(). 2230 */ 2231 xpc_msgqueue_ref(ch); 2232 2233 atomic_inc(&ch->kthreads_active); 2234 2235 if (ch->func != NULL) { 2236 dev_dbg(xpc_chan, "ch->func() called, msg=0x%p, " 2237 "msg_number=%ld, partid=%d, channel=%d\n", 2238 (void *) msg, msg->number, ch->partid, 2239 ch->number); 2240 2241 /* deliver the message to its intended recipient */ 2242 ch->func(xpcMsgReceived, ch->partid, ch->number, 2243 &msg->payload, ch->key); 2244 2245 dev_dbg(xpc_chan, "ch->func() returned, msg=0x%p, " 2246 "msg_number=%ld, partid=%d, channel=%d\n", 2247 (void *) msg, msg->number, ch->partid, 2248 ch->number); 2249 } 2250 2251 atomic_dec(&ch->kthreads_active); 2252 } 2253} 2254 2255 2256/* 2257 * Now we actually acknowledge the messages that have been delivered and ack'd 2258 * by advancing the cached remote message queue's Get value and if requested 2259 * send an IPI to the message sender's partition. 2260 */ 2261static void 2262xpc_acknowledge_msgs(struct xpc_channel *ch, s64 initial_get, u8 msg_flags) 2263{ 2264 struct xpc_msg *msg; 2265 s64 get = initial_get + 1; 2266 int send_IPI = 0; 2267 2268 2269 while (1) { 2270 2271 while (1) { 2272 if (get == (volatile s64) ch->w_local_GP.get) { 2273 break; 2274 } 2275 2276 msg = (struct xpc_msg *) ((u64) ch->remote_msgqueue + 2277 (get % ch->remote_nentries) * ch->msg_size); 2278 2279 if (!(msg->flags & XPC_M_DONE)) { 2280 break; 2281 } 2282 2283 msg_flags |= msg->flags; 2284 get++; 2285 } 2286 2287 if (get == initial_get) { 2288 /* nothing's changed */ 2289 break; 2290 } 2291 2292 if (cmpxchg_rel(&ch->local_GP->get, initial_get, get) != 2293 initial_get) { 2294 /* someone else beat us to it */ 2295 DBUG_ON((volatile s64) ch->local_GP->get <= 2296 initial_get); 2297 break; 2298 } 2299 2300 /* we just set the new value of local_GP->get */ 2301 2302 dev_dbg(xpc_chan, "local_GP->get changed to %ld, partid=%d, " 2303 "channel=%d\n", get, ch->partid, ch->number); 2304 2305 send_IPI = (msg_flags & XPC_M_INTERRUPT); 2306 2307 /* 2308 * We need to ensure that the message referenced by 2309 * local_GP->get is not XPC_M_DONE or that local_GP->get 2310 * equals w_local_GP.get, so we'll go have a look. 2311 */ 2312 initial_get = get; 2313 } 2314 2315 if (send_IPI) { 2316 xpc_IPI_send_msgrequest(ch); 2317 } 2318} 2319 2320 2321/* 2322 * Acknowledge receipt of a delivered message. 2323 * 2324 * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition 2325 * that sent the message. 2326 * 2327 * This function, although called by users, does not call xpc_part_ref() to 2328 * ensure that the partition infrastructure is in place. It relies on the 2329 * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg(). 2330 * 2331 * Arguments: 2332 * 2333 * partid - ID of partition to which the channel is connected. 2334 * ch_number - channel # message received on. 2335 * payload - pointer to the payload area allocated via 2336 * xpc_initiate_allocate(). 2337 */ 2338void 2339xpc_initiate_received(partid_t partid, int ch_number, void *payload) 2340{ 2341 struct xpc_partition *part = &xpc_partitions[partid]; 2342 struct xpc_channel *ch; 2343 struct xpc_msg *msg = XPC_MSG_ADDRESS(payload); 2344 s64 get, msg_number = msg->number; 2345 2346 2347 DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS); 2348 DBUG_ON(ch_number < 0 || ch_number >= part->nchannels); 2349 2350 ch = &part->channels[ch_number]; 2351 2352 dev_dbg(xpc_chan, "msg=0x%p, msg_number=%ld, partid=%d, channel=%d\n", 2353 (void *) msg, msg_number, ch->partid, ch->number); 2354 2355 DBUG_ON((((u64) msg - (u64) ch->remote_msgqueue) / ch->msg_size) != 2356 msg_number % ch->remote_nentries); 2357 DBUG_ON(msg->flags & XPC_M_DONE); 2358 2359 msg->flags |= XPC_M_DONE; 2360 2361 /* 2362 * The preceding store of msg->flags must occur before the following 2363 * load of ch->local_GP->get. 2364 */ 2365 mb(); 2366 2367 /* 2368 * See if this message is next in line to be acknowledged as having 2369 * been delivered. 2370 */ 2371 get = ch->local_GP->get; 2372 if (get == msg_number) { 2373 xpc_acknowledge_msgs(ch, get, msg->flags); 2374 } 2375 2376 /* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg() */ 2377 xpc_msgqueue_deref(ch); 2378} 2379