/*
 * Copyright (c) 2010 Apple Inc. All rights reserved.
 *
 * @APPLE_LICENSE_HEADER_START@
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the name of Apple Inc. ("Apple") nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY APPLE AND ITS CONTRIBUTORS "AS IS" AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL APPLE OR ITS CONTRIBUTORS BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Portions of this software have been released under the following terms:
 *
 * (c) Copyright 1989-1993 OPEN SOFTWARE FOUNDATION, INC.
 * (c) Copyright 1989-1993 HEWLETT-PACKARD COMPANY
 * (c) Copyright 1989-1993 DIGITAL EQUIPMENT CORPORATION
 *
 * To anyone who acknowledges that this file is provided "AS IS"
 * without any express or implied warranty:
 * permission to use, copy, modify, and distribute this file for any
 * purpose is hereby granted without fee, provided that the above
 * copyright notices and this notice appears in all source code copies,
 * and that none of the names of Open Software Foundation, Inc., Hewlett-
 * Packard Company or Digital Equipment Corporation be used
 * in advertising or publicity pertaining to distribution of the software
 * without specific, written prior permission.  Neither Open Software
 * Foundation, Inc., Hewlett-Packard Company nor Digital
 * Equipment Corporation makes any representations about the suitability
 * of this software for any purpose.
 *
 * Copyright (c) 2007, Novell, Inc. All rights reserved.
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the name of Novell Inc. nor the names of its contributors
 *    may be used to endorse or promote products derived from this
 *    this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * @APPLE_LICENSE_HEADER_END@
 */

/*
**
**  NAME:
**
**      dgcall.c
**
**  FACILITY:
**
**      Remote Procedure Call (RPC)
**
**  ABSTRACT:
**
**  DG protocol service routines.  Handles call (CALL) handles.
**
**
*/

#include <dg.h>
#include <dgrq.h>
#include <dgxq.h>
#include <dgpkt.h>
#include <dgccall.h>
#include <dgcall.h>
#include <dgexec.h>
#include <comauth.h>

/*
 * R P C _ _ D G _ C A L L _ S T A T E _ N A M E
 *
 * Return the text name of an activity state.  This can't simply be a variable
 * because of the vagaries of global libraries.
 */

#ifdef DEBUG

PRIVATE const char *rpc__dg_call_state_name
(
    rpc_dg_call_state_t state
)
{
    static const char *names[] = {
        "init",
        "xmit",
        "recv",
        "final",
        "idle",
        "orphan"
    };

    if ((int)state > (int)rpc_e_dg_cs_orphan)
        return("unknown");

    return(names[(int) state]);
}

#endif /* DEBUG */

/*
 * R P C _ _ D G _ C A L L _ X M I T _ F A C K
 *
 * Transmit a "fack" appropriate to the state of the passed "call".
 *
 * Note that it's possible that a call might want to send a fack
 * but not currently be holding onto an RQE (if the call just got
 * dequeued, for example).  In such a case, it can send NULL for the
 * rqe parameter.
 */

PRIVATE void rpc__dg_call_xmit_fack
(
    rpc_dg_call_p_t call,
    rpc_dg_recvq_elt_p_t rqe,
    boolean32 is_nocall
)
{
    unsigned32 *maskp = NULL, num_elements = 0, i, shift;
    rpc_socket_iovec_t iov[3];
    rpc_dg_pkt_hdr_t hdr;
#ifndef MISPACKED_HDR
    rpc_dg_fackpkt_body_t body;
#else
    rpc_dg_raw_fackpkt_body_t body;
#endif
    boolean b;
    rpc_dg_recvq_p_t rq = &call->rq;
    rpc_dg_xmitq_p_t xq = &call->xq;
    rpc_dg_recvq_elt_p_t rqe_p;
    unsigned32 is_rationing;
    unsigned32 low_on_pkts;

    RPC_DG_CALL_LOCK_ASSERT(call);

    /*
     * Create a pkt header initialized with the prototype's contents.
     */

    hdr = xq->hdr;

    RPC_DG_HDR_SET_PTYPE(&hdr, is_nocall ? RPC_C_DG_PT_NOCALL : RPC_C_DG_PT_FACK);

    hdr.flags = 0;
    hdr.len = RPC_C_DG_RAW_FACKPKT_BODY_SIZE;
    hdr.fragnum = rq->next_fragnum - 1;

    /*
     * Create the fack packet's body.
     */

#ifndef MISPACKED_HDR
    body.vers = RPC_C_DG_FACKPKT_BODY_VERS;
    body.pad1 = 0;
    body.max_tsdu = xq->max_rcv_tsdu;

    /*
     * Before advertising our max_frag_size, make sure that we have
     * enough packets reserved.  We advertise what we can actually
     * receive.
     */
    if (call->n_resvs >= call->max_resvs)
    {
        /*
         * We already have the maximum reservation.
         */
        body.max_frag_size = xq->max_frag_size;
    }
    else if (RPC_DG_CALL_IS_SERVER(call) &&
             (call->n_resvs == 0 || ((rpc_dg_scall_p_t) call)->call_is_queued))
    {
        /*
         * If the scall doesn't have the reservation, it hasn't got to the
         * execution phase yet.  We will advertise the minimum fragment
         * size.
         * Also, a queued scall shouldn't increase the reservation.
         *
         * If this is a callback scall with the private socket, should we
         * try the reservation?  Probably not.
         */
        body.max_frag_size = RPC_C_DG_MUST_RECV_FRAG_SIZE;
    }
    else
    {
        /*
         * The ccall with the private socket may not have the reservation
         * yet.  We will try the reservation here because 1) we are in the
         * receive state and 2) if we don't advertise a larger fragment size,
         * the large OUTs call cannot take advantage of the multi-buffer
         * fragment.
         */

        if (rpc__dg_pkt_adjust_reservation(call, call->max_resvs, false))
        {
            body.max_frag_size = xq->max_frag_size;
        }
        else if (call->n_resvs == 0)
        {
            /*
             * The ccall with the private socket has no reservation.
             */
            body.max_frag_size = RPC_C_DG_MUST_RECV_FRAG_SIZE;
        }
        else
        {
            /*
             * We can receive whatever fits in the reserved packets.
             */
            RPC_DG_NUM_PKTS_TO_FRAG_SIZE(call->n_resvs, body.max_frag_size);
        }
    }

    if (rqe != NULL)
    {
        if (rqe->hdrp == NULL)
            body.serial_num = call->rq.head_serial_num;
        else
            body.serial_num =
                (rqe->hdrp->serial_hi << 8) | rqe->hdrp->serial_lo;
    }
    else
        body.serial_num = call->rq.high_serial_num;

    body.selack_len = 0;
    body.selack[0] = 0;

    /*
     * Determine what to advertise as the window size.  Ignoring other
     * considerations, the window size is the amount of buffering we
     * expect the underlying system to provide if we get behind in
     * processing incoming data.
     *
     * However, there are certain situations in which we might want to
     * advertise a smaller window size--
     *
     *     Queued calls are not allowed to add data to their receive
     *     queues.  In this case we would advertise a window size of
     *     0, to tell the sender to stop sending data.
     *
     *     Server call threads that have not yet acquired a packet
     *     reservation.  This situation is treated the same as for queued
     *     calls.  Client threads without reservations are treated
     *     differently; the only way they could have proceeded to this
     *     stage (i.e. receiving a response) is if they are using a private
     *     socket with private packets.
     *
     *     When the system is rationing, advertise a window of 1, since
     *     this is all that a call is allowed to queue.
     *
     *     If the packet's 'low_on_pkts' flag is set, advertise a window
     *     of 2.  The low_on_pkts flag indicates that the number of free
     *     packets is less than two times the number of reservations.
     *     In such a case, it makes no sense to advertise a full window.
     *
     *     Calls whose recvq length is approaching its maximum allowable
     *     value.  In this case we don't want the sender to send any
     *     more than the amount of space available on the receive queue.
     *
     * We need to adjust the window size from the number of fragments
     * to kilobytes as per AES.
     *
     * When talking to the older runtime, which expects the window size
     * in the number of packets, the fack receiver may induce more data
     * in the network.  We don't think that this is a problem because the
     * window size is advisory information.
     *
     * If the max_frag_size is not a multiple of 1024, we advertise less
     * than what we think, which is problematic in the rationing and
     * low_on_pkts cases because we may stop(!) the sender.  Thus in
     * these two cases we don't adjust the window size.
     */
    is_rationing = rpc__dg_pkt_is_rationing(&low_on_pkts);

    if (call->n_resvs == 0 && RPC_DG_CALL_IS_SERVER(call))
    {
        body.window_size = 0;
    }
    else if (is_rationing)
    {
        body.window_size = 1;
    }
    else if (low_on_pkts)
    {
        body.window_size = 2;
    }
    else
    {
        /*
         * If high_rcv_frag_size > RPC_C_DG_MUST_RECV_FRAG_SIZE, that means
         * we have advertised MBF and the sender can cope with it.  Thus,
         * each slot on the recvq takes xq->max_frag_size.
         *
         * Otherwise, we make a conservative (or wild?) guess that the
         * sender will send a single buffer fragment.
         */
        if (rq->high_rcv_frag_size > RPC_C_DG_MUST_RECV_FRAG_SIZE)
        {
            body.window_size = ((rq->max_queue_len - rq->queue_len)
                                * xq->max_frag_size) >> 10;
        }
        else
        {
            body.window_size = ((rq->max_queue_len - rq->queue_len)
                                * RPC_C_DG_MUST_RECV_FRAG_SIZE) >> 10;
        }
        body.window_size = MIN(rq->window_size, body.window_size);
    }

    /*
     * See if we need to send selective ack information.
     */

    if (RPC_DG_FRAGNUM_IS_LT(rq->next_fragnum, rq->high_fragnum))
    {
        /*
         * Determine the number of elements in the selective ack mask
         * array, worrying about wrap-around.  If we need only one, just
         * use the fack packet.  Otherwise, we need to allocate an array
         * to hold the entire mask.  Either way, initialize the mask
         * to 0.
         */

        num_elements = (((unsigned16)(rq->high_fragnum - rq->next_fragnum))
                        / 32) + 1;

        if (num_elements == 1)
            maskp = &body.selack[0];
        else
        {
            RPC_MEM_ALLOC(maskp, unsigned32 *, num_elements * sizeof *maskp,
                          RPC_C_MEM_DG_SELACK_MASK, RPC_C_MEM_NOWAIT);
        }
        for (i = 0; i < num_elements; i++)
            *(maskp + i) = 0;

        body.selack_len = num_elements;

        /*
         * Loop through the list of received packets which have fragnums
         * greater than that which is explicitly mentioned in this fack,
         * and add them to the selective ack bit mask.  If there are
         * in-order frags, they would have been explicitly acked, so
         * start with the next frag.  If there are no in-order frags,
         * start at the head of the queue.
         */

        rqe_p = (rq->last_inorder != NULL) ? rq->last_inorder->next : rq->head;

        while (rqe_p != NULL)
        {
            /*
             * Calculate the number of bits we would have to shift the
             * current fragnum into a bitmask of unlimited size, worrying
             * about wrap-around.  Munge the shift value to get the
             * bit into the correct mask element.  Currently, shift will
             * almost always be less than 32, so the assignment is
             * effectively
             *      *maskp |= 1 << shift;
             */

            shift = rqe_p->hdrp->fragnum - (unsigned16) rq->next_fragnum;
            *(maskp + (shift/32)) |= 1 << (shift%32);
            rqe_p = rqe_p->next;
        }
    }
#else
    !!!
#endif

    /*
     * Setup the iov and send the packet.
     */

    iov[0].iov_base = (byte_p_t) &hdr;
    iov[0].iov_len = RPC_C_DG_RAW_PKT_HDR_SIZE;
    iov[1].iov_base = (byte_p_t) &body;
    iov[1].iov_len = hdr.len;

    if (num_elements > 1)
    {
        /*
         * If we need to send multiple selective ack masks, subtract
         * the length of the one built into the fack packet from the
         * second iov element, and setup a third iov element to hold
         * the extended mask.  Note that in setting the header length,
         * we need to remember that the fack body size includes the size
         * of one mask element, thus the subtraction.
         */

        iov[1].iov_len -= sizeof *maskp;
        hdr.len = RPC_C_DG_RAW_FACKPKT_BODY_SIZE +
                  ((num_elements - 1) * sizeof *maskp);
        iov[2].iov_base = (byte_p_t) maskp;
        iov[2].iov_len = num_elements * sizeof *maskp;
    }

    rpc__dg_xmit_pkt(call->sock_ref->sock, call->addr, iov, num_elements > 1 ? 3 : 2, &b);

    RPC_DBG_PRINTF(rpc_e_dbg_xmit, 5,
        ("(rpc__dg_call_xmit_fack) %d.%u ws %d, tsdu %u, fs %u, mask [0x%x]\n",
        hdr.fragnum, body.serial_num, body.window_size,
        body.max_tsdu, body.max_frag_size,
        num_elements > 1 ? *maskp : body.selack[0]));

    /*
     * Free any memory we may have allocated.
     */

    if (num_elements > 1)
    {
        RPC_MEM_FREE(maskp, RPC_C_MEM_DG_SELACK_MASK);
    }
}

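/*
 * Worked example for the fack body computations above (the figures are
 * illustrative only, not taken from the code or the protocol spec):
 *
 *   - Window advertisement: with a hypothetical max_queue_len of 8,
 *     queue_len of 2, and max_frag_size of 4096 bytes, the MBF branch
 *     yields ((8 - 2) * 4096) >> 10 = 24 (KB), which is then clamped
 *     to rq->window_size.
 *
 *   - Selective ack mask: if next_fragnum is 10 and fragments 12 and 14
 *     are queued out of order, the fack carries fragnum 9, num_elements
 *     is ((14 - 10) / 32) + 1 = 1, and bits 2 and 4 of the mask are set,
 *     i.e. the mask is 0x14.
 */
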
/*
 * R P C _ _ D G _ C A L L _ X M I T
 *
 * Transmit one blast of packets.
 *
 * This routine expects that a blast's worth of packets have been queued
 * and that the fack request frequency has been set.  It is the caller's
 * responsibility to put the queue into a sensible state, and to make
 * sure that the last packet is tagged appropriately.
 */

PRIVATE void rpc__dg_call_xmit
(
    rpc_dg_call_p_t call,
    boolean32 block
)
{
    unsigned32 num_sent, extra_fack = 0;
    unsigned8 freqs_total;
    boolean rexmiting = true;
    rpc_dg_xmitq_p_t xq = &call->xq;
    unsigned8 blast_size = xq->blast_size;
    rpc_dg_xmitq_elt_p_t xqe = xq->rexmitq;
    /*
     * Below is a table for mapping window sizes to the number of fack
     * requests per window that would be appropriate for that window.
     * Inducing an appropriate number of facks per RTT serves two purposes:
     *
     *  1) it increases the granularity of the ack clock we use to do
     *     transmissions, allowing us to decrease the blast size needed to
     *     keep the congestion window full, and to achieve a better temporal
     *     spacing of the packets within the RTT.
     *
     *  2) it provides redundancy, increasing the likelihood that we will
     *     detect packet loss without having to timeout.
     *
     * The trade-off is the increase in backward traffic.  The table below
     * specifies what we think are the best compromises for a given window
     * size.  (The macro that follows just helps to hide the details.)
     * Whenever the advertised window size stored in the xmitq is updated
     * (do_fack_body), this table is consulted for an appropriate
     * 'outstanding fack' count.  Note that the table is actually declared
     * in the routine do_fack_body, where it is used.
     */
    static unsigned8 window_size_to_freqs_out[] =
                                        { 0, 1, 2, 2, 2,
                                          3, 3, 3, 3,
                                          3, 4, 4, 4,
                                          4, 4, 4, 4 };

    RPC_DG_CALL_LOCK_ASSERT(call);

    /*
     * Determine an appropriate number of outstanding fack requests
     * per RTT.
     */
    freqs_total = (call)->xq.window_size > 16 ?
                      (call)->xq.window_size / 4 :
                      window_size_to_freqs_out[(call)->xq.window_size];

    /*
     * If we are more than 1 away from our target, set extra_fack to
     * the number of the packet within this blast which should request
     * a fack (the last packet always asks for a fack).  We don't set
     * this value above 2 because in the case where the sender is slower
     * than the receiver, it will never be able to fully open the
     * congestion window and will end up requesting facks on every packet.
     */

    if (freqs_total - xq->freqs_out > 1)
    {
        extra_fack = blast_size / 2;
    }

    /*
     * Send up to a blast size worth of retransmits/new-transmits.
     */

    for (num_sent = 1; num_sent <= blast_size; num_sent++)
    {
        /*
         * If we have gotten to the end of the retransmit queue, switch
         * over to the regular queue.
         */

        if (xqe == NULL && rexmiting)
        {
            xqe = xq->first_unsent;
            rexmiting = false;
        }

        /*
         * If we have gotten to the end of the regular queue, we're done
         * early.
         */

        if (xqe == NULL)
        {
            RPC_DBG_PRINTF(rpc_e_dbg_xmit, 5,
                ("(rpc__dg_call_xmit) Premature end of queue\n"));
            break;
        }

        if (RPC_DG_FLAG_IS_SET(xq->base_flags, RPC_C_DG_PF_FRAG))
        {
            if (num_sent == extra_fack || num_sent == blast_size)
            {
                xqe->flags &= ~ RPC_C_DG_PF_NO_FACK;
                /*
                 * Make note of newly outstanding fack request.
                 */
                xq->freqs_out++;
            }
            else
                xqe->flags |= RPC_C_DG_PF_NO_FACK;
        }
        else
        {
            xq->freqs_out++;
        }

        /*
         * Mark xqe as part of the current congestion window.
         */

        xqe->in_cwindow = true;

        rpc__dg_xmitq_elt_xmit(xqe, call, block);

        if (rexmiting)
        {
            xqe = xqe->next_rexmit;
            RPC_DG_STATS_INCR(dups_sent);
        }
        else
        {
            xqe = xqe->next;
            xq->first_unsent = xqe;
        }
    }

    /*
     * Bump congestion window count, and adjust high-water mark if
     * appropriate.
     */

    xq->cwindow_size += num_sent - 1;
    if (xq->cwindow_size > xq->high_cwindow)
        xq->high_cwindow = xq->cwindow_size;

    xq->timestamp = rpc__clock_stamp();
    xq->rexmitq = NULL;
    xq->blast_size = 0;
}

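/*
 * Worked example of the fack-request spacing above (illustrative numbers):
 * an advertised window of 8 fragments maps through window_size_to_freqs_out[]
 * to a target of 3 outstanding fack requests per RTT, while a window of 32
 * (> 16) uses 32 / 4 = 8.  With a blast_size of 8 and fewer outstanding
 * requests than the target, packets 4 (blast_size / 2) and 8 (the last of
 * the blast) have RPC_C_DG_PF_NO_FACK cleared, so a single blast requests
 * at most two facks.
 */
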
/*
 * R P C _ _ D G _ C A L L _ X M I T Q _ T I M E R
 *
 * Do any time-based retransmissions for a call's transmit queue.
 */

PRIVATE void rpc__dg_call_xmitq_timer
(
    rpc_dg_call_p_t call
)
{
    rpc_dg_xmitq_p_t xq = &call->xq;
    rpc_dg_xmitq_elt_p_t xqe;

    RPC_DG_CALL_LOCK_ASSERT(call);

    /*
     * If the call is in an error state there's no need to keep
     * transmitting things off the xmitq... with the exception of the
     * call faulted error, where the fault pkt is on the xmitq kinda
     * masquerading as a response pkt.  The fault response may (depending
     * on whether the call is idempotent) be required to be "robust" and
     * receive an ack.
     */
    if (call->status != rpc_s_ok
        && call->status != rpc_s_call_faulted)
    {
        return;
    }

    /*
     * If there's nothing on the xmitq or we haven't reached the time
     * to do retransmits for this xmitq, just return now.
     */

    xqe = xq->head;

    if (xqe == NULL || ! rpc__clock_aged(xq->timestamp, xq->rexmit_timeout))
    {
        return;
    }

    /*
     * See if we've been waiting too long for an acknowledgement,
     * and if we have, blow off the call.
     */

    if (rpc__dg_xmitq_awaiting_ack_tmo(xq, call->com_timeout_knob))
    {
        rpc__dg_call_signal_failure(call, rpc_s_comm_failure);
        return;
    }

    /*
     * Transmit the first pkt on the xmitq.
     *
     * Note how we blithely ignore window constraints.  If the receiver
     * has closed the window on us, we want to occasionally elicit acks
     * so that we'll know when window space opens up.
     *
     * Don't "rexmit" a pkt that has not yet been sent.  If the pkt is
     * the first unsent, then just setting the blast size to 1 (with
     * an empty rexmitq) will cause START_XMIT to send this pkt and
     * properly manage the xq pointers.
     */

    if (xqe != xq->first_unsent)
    {
        xq->rexmitq = xqe;
        xqe->next_rexmit = NULL;
    }

    /*
     * Adjust the re-xmit age for the next time (i.e., be a good network
     * citizen and back off the next re-xmit).
     */

    xq->rexmit_timeout = MIN(RPC_C_DG_MAX_REXMIT_TIMEOUT, (xq->rexmit_timeout << 1));

    /*
     * If this packet is considered part of the current congestion
     * window, remove it.  This may also require decrementing the
     * count of outstanding facks.
     */

    if (xqe->in_cwindow)
    {
        xq->cwindow_size--;
        xqe->in_cwindow = false;

        if (! RPC_DG_FLAG_IS_SET(xqe->flags, RPC_C_DG_PF_NO_FACK) ||
            RPC_DG_FLAG_IS_SET(xqe->flags, RPC_C_DG_PF_LAST_FRAG))
        {
            xq->freqs_out--;
        }
    }

    xq->blast_size = 1;

    RPC_DBG_PRINTF(rpc_e_dbg_xmit, 4, (
        "(rpc__dg_call_xmitq_timer) re-xmit'ing %u.%u [%s]\n",
        xq->hdr.seq, xqe->fragnum, rpc__dg_act_seq_string(&xq->hdr)));

    RPC_DG_START_XMIT(call);
}

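/*
 * Note on the backoff above: each timer pass that triggers a retransmission
 * doubles rexmit_timeout, so (taking an illustrative initial value of t
 * clock ticks) successive retransmissions of an unacknowledged fragment
 * occur roughly t, 2t, 4t, ... ticks apart, bounded by
 * RPC_C_DG_MAX_REXMIT_TIMEOUT.
 */
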
/*
 * R P C _ _ D G _ C A L L _ I N I T
 *
 * Initialize the parts of call handles (rpc_dg_call_t) that are
 * initialized in the same fashion for both client and server call handles.
 *
 * No lock requirements.  The call is private at this point.
 */

PRIVATE void rpc__dg_call_init
(
    rpc_dg_call_p_t call
)
{
    rpc_clock_t now = rpc__clock_stamp();

    RPC_DG_CALL_LOCK_INIT(call);

    call->c.protocol_id = RPC_C_PROTOCOL_ID_NCADG;

    call->next = NULL;
    call->state = rpc_e_dg_cs_init;
    call->status = rpc_s_ok;
    call->state_timestamp = now;

    RPC_DG_CALL_COND_INIT(call);

    rpc__dg_xmitq_init(&call->xq);
    rpc__dg_recvq_init(&call->rq);

    call->sock_ref = NULL;
    call->actid_hash = 0;
    call->key_info = NULL;
    call->auth_epv = NULL;
    call->addr = NULL;
    memset(&call->timer, 0, sizeof(call->timer));
    call->last_rcv_timestamp = 0;
    call->start_time = now;
    call->high_seq = 0;
    call->pkt_chain = NULL;
    call->com_timeout_knob = 0;
    call->refcnt = 0;
    call->n_resvs = 0;
    call->n_resvs_wait = 0;
    call->max_resvs = 0;

    call->blocked_in_receive = false;
    call->priv_cond_signal = false;
    call->stop_timer = false;
    call->is_cbk = false;
    call->is_in_pkt_chain = false;

    memset(&call->thread_id, 0, sizeof(call->thread_id));
}

/*
 * R P C _ _ D G _ C A L L _ F R E E
 *
 * Free the resources referenced by the common call handle header
 * (rpc_dg_call_t).
 *
 * This is a low-level routine that should be invoked by the higher
 * level call handle type specific (i.e. ccall, scall) free routines.
 * The higher level routines are responsible for freeing resources
 * associated with the non-common portion of the call handle as well
 * as freeing the call handle object storage.
 *
 * This routine has the side effect of releasing the call's lock (which
 * is only natural since the call structure, including its lock, is
 * destroyed).
 */

PRIVATE void rpc__dg_call_free
(
    rpc_dg_call_p_t call
)
{
    unsigned32 st;

    RPC_DG_CALL_LOCK_ASSERT(call);

    rpc__naf_addr_free(&call->addr, &st);

    rpc__dg_xmitq_free(&call->xq, call);
    rpc__dg_recvq_free(&call->rq);

    /*
     * In the event that this call was using a private socket, it is
     * important that the socket is released from the call handle only
     * *after* the xmit/recv queues have been freed.  Every private socket
     * is allocated a recv/xmit packet pair, which is available for use
     * by the call.  Since at any given time these packets may be queued
     * on the call handle, we want to make sure that they are reassociated
     * with the socket before we lose our reference to it.  Freeing the
     * queues before the socket ensures that this will happen.
     */
    rpc__dg_network_sock_release(&call->sock_ref);

    if (call->key_info)
        RPC_DG_KEY_RELEASE(call->key_info);
    CLOBBER_PTR(call->key_info);

    RPC_DG_CALL_UNLOCK(call);
    RPC_DG_CALL_LOCK_DELETE(call);
    RPC_DG_CALL_COND_DELETE(call);

    CLOBBER_PTR(call->next);
    /* common call handle header may no longer be referenced */
}

/*
 * R P C _ _ D G _ C A L L _ W A I T
 *
 * Wait for a change in the call's state using the call's condition var.
 * Note, the caller is responsible for determining if the purpose for
 * waiting was satisfied - don't assume anything.
 *
 * The 'event' argument indicates if the caller is waiting for a local
 * event to occur.  Currently there are two such events on which a call
 * might need to block: 1) waiting for a packet pool reservation to
 * become available, or 2) waiting for a datagram packet to become
 * available.
 *
 * The local/remote distinction is only meaningful for calls that are
 * using private sockets.  Such calls wait for local events using the
 * call's condition variable, and wait for remote events by blocking
 * in recvfrom.  Calls using a shared socket always wait on a condition
 * variable.
 *
 * Detect pending cancels if we're called with general cancelability
 * enabled (i.e. user cancels or orphans) and arrange for proper cancel
 * handling.  This is the central place where others synchronously detect
 * that a call now has an error associated with it (see
 * dg_call_signal_failure()).
 *
 * This routine *must* be called with the call's mutex held.  This routine
 * returns with the lock held.
 */

PRIVATE void rpc__dg_call_wait
(
    rpc_dg_call_p_t call,
    rpc_dg_wait_event_t event,
    unsigned32 *st
)
{
    boolean is_server = RPC_DG_CALL_IS_SERVER(call);
    rpc_dg_ccall_p_t ccall = (rpc_dg_ccall_p_t) call;

    /* !!! RPC_UNLOCK_ASSERT(); */
    RPC_DG_CALL_LOCK_ASSERT(call);

    *st = call->status;
    if (*st != rpc_s_ok)
        return;

    /*
     * Anytime the call handle is unlocked, it is possible for the listener
     * (or timer) thread to modify its xmitq, and signal it to do a send.
     * If the signal occurs during some window in which the call handle
     * is unlocked, we'll miss it.  Therefore, check here, before
     * waiting, to see if there is any data to transmit.  (See bottom
     * of routine for more information about transmissions.)
     *
     * If we do transmit data, return immediately.  In the normal case
     * the caller is waiting for queue space to open up on the xmitq,
     * which will have happened by calling call_xmit().
     */

    if (RPC_DG_CALL_READY_TO_SEND(call))
    {
        rpc__dg_call_xmit(call, true);
        return;
    }

    /*
     * If this is a client call handle make sure that we haven't been
     * asked to run a callback prior to waiting.  If we detect a local
     * cancel, process it.
     */

    if ((! is_server && ! ccall->cbk_start) || is_server)
    {
        /*
         * If the call is using a private socket, and is waiting
         * for a network event, let it wait in recvfrom.
         */
        if (call->sock_ref->is_private &&
            event == rpc_e_dg_wait_on_network_event)
        {
            RPC_DBG_PRINTF(rpc_e_dbg_dg_sockets, 5, (
                "(rpc__dg_call_wait) waiting on network event\n"));

            rpc__dg_network_select_dispatch(ccall->c.sock_ref->sock,
                (dce_pointer_t) ccall->c.sock_ref, (boolean32) true, st);
        }
        else
        {

#ifndef _PTHREAD_NO_CANCEL_SUPPORT
            DCETHREAD_TRY {
                RPC_DG_CALL_COND_WAIT(call);
            } DCETHREAD_CATCH(dcethread_interrupt_e) {
                rpc__dg_call_local_cancel(call);
            } DCETHREAD_CATCH_ALL(THIS_CATCH) {
                rpc__dg_call_signal_failure(call,
                    (unsigned32) -1 /* !!! rpc_s_unknown_exception */);
            } DCETHREAD_ENDTRY
#else
            /*
             * We certainly can't try to detect a non-existent cancel;
             * just wait (and avoid the TRY/CATCH overhead).
             */
            RPC_DG_CALL_COND_WAIT(call);
#endif
        }

        *st = call->status;
        if (*st != rpc_s_ok)
            return;
    }

    /*
     * If this is a client call handle and we've been asked to do a
     * callback, now's the time to do it.  While the callback is running,
     * we must unlock the original CCALL.  Additionally, we lock and bump
     * the cbk_scall's reference count (mimicking the normal processing
     * of handing off a scall reference to the call executor).
     *
     * To provide the same server call executor thread environment
     * as a 'real' server call executor, disable / restore cancelability
     * while in the server call processing (see rpc__dg_call_local_cancel()).
     */

    if (! is_server)
    {
        if (ccall->cbk_start)
        {
            int oc;

            assert(ccall->cbk_scall != NULL);
            ccall->cbk_start = false;

            RPC_DG_CALL_LOCK(&ccall->cbk_scall->c);
            RPC_DG_CALL_REFERENCE(&ccall->cbk_scall->c);
            ccall->cbk_scall->has_call_executor_ref = true;

            RPC_DG_CALL_UNLOCK(&ccall->c);

            oc = dcethread_enableinterrupt_throw(0);
            rpc__dg_execute_call((dce_pointer_t) &ccall->cbk_scall->c, false);
            dcethread_enableinterrupt_throw(oc);

            RPC_DG_CALL_LOCK(&ccall->c);
        }
    }

    /*
     * If there is data to transmit (or retransmit), send it now.  Transmissions
     * have been moved into this routine based on the following observations.
     *
     *   - The ability to transmit (open congestion window space)
     *     occurs asynchronously to the execution of the thread.  A
     *     transmitting thread is only interested in queueing data,
     *     it doesn't care/know about the actual (re)transmission.
     *     Threads do not wait on the ability to transmit directly.
     *
     *   - When cwindow space opens up, the thread is signalled to
     *     do the send.  The thread will have been waiting on a
     *     different condition (probably space on the xmitq) but will
     *     need to check first if there is data that needs to be sent.
     *
     *   - Each time a thread needs to wait for any condition (e.g.
     *     a packet to become available) it must also check that it
     *     wasn't woken to do a send.
     *
     *   - Whenever a thread is awoken to do a send, it *always* wants
     *     to do the send immediately, before checking on whatever
     *     other condition it was waiting on.
     *
     *   - It is error prone to require each caller of call_wait to
     *     perform its own check for transmittable data, as well as
     *     whatever condition it was really interested in.
     */

    if (RPC_DG_CALL_READY_TO_SEND(call))
        rpc__dg_call_xmit(call, true);
}

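/*
 * Callers of rpc__dg_call_wait() typically sit in a loop, re-testing their
 * own predicate after every return (see the transmit loop in
 * rpc__dg_call_xmitq_push() below for one example), since a return only
 * indicates that something about the call may have changed.
 */
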
/*
 * R P C _ _ D G _ C A L L _ S I G N A L
 *
 * Signal a change in the call's state.
 */

PRIVATE void rpc__dg_call_signal
(
    rpc_dg_call_p_t call
)
{
    RPC_DG_CALL_LOCK_ASSERT(call);

    /*
     * If the call is using a private socket, and is currently blocked in
     * a call to recvfrom, we'll need to post a cancel against its thread
     * in order to wake it.  Otherwise, just signal the call's condition
     * variable.
     */
    if (call->sock_ref->is_private && call->blocked_in_receive)
    {
        RPC_DBG_PRINTF(rpc_e_dbg_dg_sockets, 5, (
            "(rpc__dg_call_signal) cancelling private socket thread\n"));

        call->priv_cond_signal = true;
        dcethread_interrupt_throw(call->thread_id);
    }
    else
    {
        /*
         * normally (always?) at most one waiter will be present
         */
        RPC_DG_CALL_COND_SIGNAL(call);
    }
}

/*
 * R P C _ _ D G _ C A L L _ S I G N A L _ F A I L U R E
 *
 * Signal a change in the call's state due to a failure.
 *
 * The idea is that dg_call_signal_failure() and dg_call_wait()
 * are the only routines that reference the call's error status
 * field.  Additionally, this field's value is what will be returned to
 * the stubs.  Retain the 1st failure status.
 */

PRIVATE void rpc__dg_call_signal_failure
(
    rpc_dg_call_p_t call,
    unsigned32 stcode
)
{
    RPC_DG_CALL_LOCK_ASSERT(call);

    RPC_DBG_GPRINTF(
        ("(rpc__dg_call_signal_failure) %s st = 0x%x (orig st = 0x%x) [%s]\n",
        RPC_DG_CALL_IS_SERVER(call) ? "SCALL" : "CCALL",
        stcode, call->status, rpc__dg_act_seq_string(&call->xq.hdr)));

    if (call->status == rpc_s_ok)
        call->status = stcode;

    rpc__dg_call_signal(call);
}

/*
 * R P C _ _ D G _ C A L L _ X M I T Q _ P U S H
 *
 * Push out any queued transmit data.
 */

PRIVATE void rpc__dg_call_xmitq_push
(
    rpc_dg_call_p_t call,
    unsigned32 *st
)
{
    rpc_dg_xmitq_p_t xq = &call->xq;

    RPC_DG_CALL_LOCK_ASSERT(call);

    *st = rpc_s_ok;

    /*
     * Normally, we want to loop in here while the (re)transmission logic
     * sends whatever data remains on the xmitq.  In the case where the
     * queue is quiescent, this requires a little bootstrapping.  This
     * will be the case if either 1) no data has been sent from this
     * xmitq yet, or 2) everything on the queue was already facked and
     * the queue is now empty.  In either case, we can't wait for a fack
     * or a timeout before doing the next transmit.  Both cases are
     * identified by a NULL xq->head, and are handled by setting the
     * blast size to 1 so that the partial packet gets sent immediately.
     */

    if (xq->head == NULL)
        xq->blast_size = 1;
    /*
     * If the queue is not empty and has no outstanding fack request,
     * send two fragments.
     *
     * call_transmit_int() does not start the window until the two
     * fragments are filled.  Thus, we could have one full and one
     * partial fragment here.
     */
    else if (xq->freqs_out == 0)
        xq->blast_size = 2;

    /*
     * Turn around the connection and force (and wait for) all queued
     * up packets to be transmitted; the listener handles retransmits.
     * If the call has an error condition we're done.
     */

    xq->push = true;

    /*
     * Ensure that a final partial packet gets added to the queue.
     * Note that this call must follow the setting of the push flag
     * so that append_pp knows whether to set the frag bit in the
     * packet header.
     */

    rpc__dg_xmitq_append_pp(call, st);
    if (*st != rpc_s_ok)
        return;

    while (true)
    {
        if (RPC_DG_CALL_READY_TO_SEND(call))
            rpc__dg_call_xmit(call, true);
        if (xq->first_unsent == NULL)
            break;
        rpc__dg_call_wait(call, rpc_e_dg_wait_on_network_event, st);
        if (*st != rpc_s_ok)
            return;
    }
}

/*
 * R P C _ _ D G _ C A L L _ R E C V Q _ I N S E R T
 *
 * Insert a receive queue element (packet) into a receive queue in the
 * right place.  Drop duplicate packets and generate a "fack" if one
 * is requested (even for duplicates).  Return "true" iff we actually
 * add the packet to the receive queue.  The "wake_thread" output
 * param is set to "true" iff the receiving thread should be awakened
 * (see the criteria near the end of this routine).
 */

PRIVATE boolean rpc__dg_call_recvq_insert
(
    rpc_dg_call_p_t call,
    rpc_dg_recvq_elt_p_t rqe,
    boolean *wake_thread
)
{
    rpc_dg_recvq_p_t rq = &call->rq;
    rpc_dg_recvq_elt_p_t scan_rqe, prev_scan_rqe;
    unsigned16 fragnum = rqe->hdrp->fragnum;
    unsigned16 next_fragnum = rq->next_fragnum;
    unsigned16 curr_serial = (rqe->hdrp->serial_hi << 8) | rqe->hdrp->serial_lo;
    boolean added_to_queue;

    RPC_DG_CALL_LOCK_ASSERT(call);

    added_to_queue = true;
    *wake_thread = false;

    /*
     * We could rework things to lessen the processing for the single
     * pkt stream case, but at this point this "additional" overhead's
     * performance impact has to be a second order effect...
     */

    /*
     * Remember if we're recving frags.  To insulate us from potentially
     * bogus transmitters, once we're recving frags we stay that way.
     * It's cheaper and more effective to do this than to "assert" it.
     */
    rq->recving_frags |= RPC_DG_HDR_FLAG_IS_SET(rqe->hdrp, RPC_C_DG_PF_FRAG);

    /*
     * The receiver needs to keep track of the highest serial number it
     * has seen so far, just in case it ever needs to send a fack but
     * isn't holding onto an rqe at the time.
     */
    if (RPC_DG_SERIAL_IS_LT(rq->high_serial_num, curr_serial))
        rq->high_serial_num = curr_serial;

    /*
     * The receiver needs to keep track of the largest fragment size it
     * has seen so far, so that when it becomes the sender it can start
     * sending that much data.
     */

    if (rqe->frag_len > rq->high_rcv_frag_size) {
        RPC_DBG_PRINTF(rpc_e_dbg_recv, 7,
            ("(rpc__dg_call_recvq_insert) Set high_rcv_frag %lu was %lu\n",
            rqe->frag_len, rq->high_rcv_frag_size));
        rq->high_rcv_frag_size = rqe->frag_len;

        /*
         * If high_rcv_frag_size > RPC_C_DG_MUST_RECV_FRAG_SIZE, that means
         * we have advertised MBF and the sender can cope with it.  We need
         * to adjust the max_queue_len, if possible.
         *
         * Note: MBF (Multi-Buffer Fragments) implies that the call has
         * more than one packet reservation.  Also, if rq->max_queue_len !=
         * RPC_C_DG_MAX_RECVQ_LEN, then the max_queue_len is already
         * adjusted and there is no need for re-adjustment (because the packet
         * reservation gets adjusted only once).
         *
         * LBF (Large-Buffer Fragment) and the fragment size kept between
         * calls mean that the first request pdu could be greater than
         * RPC_C_DG_MUST_RECV_FRAG_SIZE.
         */
        if (call->n_resvs > 0
            && rq->high_rcv_frag_size > RPC_C_DG_MUST_RECV_FRAG_SIZE
            && rq->max_queue_len == RPC_C_DG_MAX_RECVQ_LEN)
        {
            unsigned32 max_queue_len =
                RPC_C_DG_MAX_RECVQ_LEN / MIN(call->n_resvs, call->max_resvs);

            /*
             * Update rq->max_queue_len.
             *
             * If the current queue_len already exceeds the new
             * max_queue_len, we can't adjust it here.
             *
             * Also, we must have at least one inorder fragment.  Otherwise
             * nothing gets dequeued.
             */
            if (rq->queue_len <= max_queue_len
                && rq->inorder_len > 0)
            {
                rq->max_queue_len = max_queue_len;
                RPC_DBG_PRINTF(rpc_e_dbg_recv, 7,
                    ("(rpc__dg_call_recvq_insert) Set max_queue_len %u\n",
                    rq->max_queue_len));
            }
        }
    }

    /*
     * Scalls without packet pool reservations are not allowed to queue
     * packets.  The two cases in which we encounter this condition are
     * for queued calls, and scalls that were started when no reservations
     * were available.
     *
     * The callback scall should be allowed to queue data without the
     * reservation if using the private socket.  However, since the
     * callback is not officially supported (and the scall doesn't use
     * the private socket), we make no distinction.  (Trying a little
     * performance recovery...)
     *
     * Note that it is ok for a ccall to queue data before it has made
     * a packet reservation, since ccalls may have private packets
     * associated with their socket refs.
     */

    if (RPC_DG_CALL_IS_SERVER(call) && call->n_resvs == 0)
    {
        RPC_DBG_PRINTF(rpc_e_dbg_recv, 6, (
            "(rpc__dg_call_recvq_insert) dropping queued/un-setup call's pkt\n"));

        added_to_queue = false;
    }
    else if (fragnum == next_fragnum)
    {
        /*
         * This is the fast path.  First make sure that the recvq does not
         * already contain the max allowable packets; if so, we can't accept
         * this one.
         */

        if (rq->queue_len == rq->max_queue_len)
        {
            added_to_queue = false;
        }

        /*
         * If we are rationing packets, we can only accept a packet if
         * it is the next immediately usable packet for this call.  This
         * will be the case if the current packet will become the new
         * head of this call's recvq.
         *
         * Another check needs to be made to make sure that the call
         * executor isn't holding onto a packet (in dg_execute_call)
         * before the call's WAY has been done.  The call thread will set
         * a flag in the recvq to indicate that it's done with its WAY
         * processing.
         */
        else if (rpc__dg_pkt_is_rationing(NULL) &&
                 (rq->last_inorder != NULL ||
                  (! rq->is_way_validated && fragnum != 0)))
        {
            RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 1,
                ("(rpc__dg_call_recvq_insert) rationing, not 'next' pkt, fc %u pkt %u [%s]\n",
                rpc_g_dg_pkt_pool.free_count, rpc_g_dg_pkt_pool.pkt_count,
                rpc__dg_act_seq_string(&call->xq.hdr)));

            added_to_queue = false;
        }

        /*
         * Otherwise, it's the next in-order packet and we add it to
         * the queue.  If there are currently no in-order pkts, we know
         * that this pkt will be inserted at the head of the queue and
         * make the head in-order.  Otherwise, there are already in-order
         * fragments on the queue (hence the head is already in-order).
         * We insert this frag after the "last_inorder" frag and scan
         * to locate the new "last_inorder" frag.  In both cases, the
         * "last_inorder" and "next_fragnum" are updated appropriately.
         */

        else
        {
            if (rq->last_inorder == NULL)
            {
                rqe->next = rq->head;
                rq->head = rqe;
                rq->head_fragnum = fragnum;
            }
            else
            {
                rqe->next = rq->last_inorder->next;
                rq->last_inorder->next = rqe;
            }

            prev_scan_rqe = rqe;
            scan_rqe = rqe->next;
            next_fragnum++;
            rq->inorder_len++;

            while (scan_rqe != NULL && scan_rqe->hdrp->fragnum == next_fragnum)
            {
                prev_scan_rqe = scan_rqe;
                scan_rqe = scan_rqe->next;
                next_fragnum++;
                rq->inorder_len++;
            }

            rq->last_inorder = prev_scan_rqe;
            rq->next_fragnum = next_fragnum;
        }
    }
    else if (RPC_DG_FRAGNUM_IS_LT(fragnum, next_fragnum))
    {
        /*
         * This is an old fragment and we don't add it to the queue.
         */

        added_to_queue = false;
        RPC_DG_STATS_INCR(dups_rcvd);
    }
    else
    {
        RPC_DBG_PRINTF(rpc_e_dbg_recv, 6, (
            "(rpc__dg_call_recvq_insert) recv out of order %u.%u, next_fragnum = .%u\n",
            rqe->hdrp->seq, fragnum, next_fragnum));

        RPC_DG_STATS_INCR(oo_rcvd);

        /*
         * This is not an in-order fragment (i.e. the head can not now
         * become in-order nor can the "last_inorder" pointer require
         * updating); it may also be a duplicate.
         *
         * If we're currently rationing packets, then we must drop this
         * one.
         */

        if (rpc__dg_pkt_is_rationing(NULL))
        {
            RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 1,
                ("(rpc__dg_call_recvq_insert) rationing, oo pkt, fc %u pkt %u [%s]\n",
                rpc_g_dg_pkt_pool.free_count, rpc_g_dg_pkt_pool.pkt_count,
                rpc__dg_act_seq_string(&call->xq.hdr)));

            added_to_queue = false;
        }

        /*
         * Don't accept the packet if it is greater than max_recvq_len
         * away from the first packet (to be) queued.  This check avoids the
         * possibility of having a queue of max length (which can't be
         * added to), but which doesn't contain the next awaited packet.
         */

        else if ((rq->head != NULL &&
                  RPC_DG_FRAGNUM_IS_LT(rq->head_fragnum, next_fragnum) &&
                  RPC_DG_FRAGNUM_IS_LTE(rq->head_fragnum +
                                        rq->max_queue_len, fragnum))
                 || RPC_DG_FRAGNUM_IS_LTE(next_fragnum +
                                          rq->max_queue_len, fragnum))
        {
            added_to_queue = false;
        }

        /*
         * If the queue is empty, the frag becomes the head.  Otherwise,
         * we must scan the queue for the proper insertion point and
         * ensure this isn't a duplicate.  If there are no in-order
         * fragments, the scan starts at the head of the queue.  If there
         * ARE in-order frags, the scan starts at the first frag after
         * the last in-order frag.
         */

        else
        {
            if (rq->head == NULL)
            {
                rq->head = rqe;
                rq->head_fragnum = fragnum;
            }
            else
            {
                if (rq->last_inorder == NULL)
                {
                    scan_rqe = rq->head;
                    prev_scan_rqe = NULL;
                }
                else
                {
                    scan_rqe = rq->last_inorder->next;
                    prev_scan_rqe = rq->last_inorder;
                }

                while (scan_rqe != NULL &&
                       RPC_DG_FRAGNUM_IS_LT(scan_rqe->hdrp->fragnum, fragnum))
                {
                    prev_scan_rqe = scan_rqe;
                    scan_rqe = scan_rqe->next;
                }

                if (scan_rqe != NULL && fragnum == scan_rqe->hdrp->fragnum)
                {
                    added_to_queue = false;
                }
                else
                {
                    rqe->next = scan_rqe;

                    /*
                     * Before setting the pointer in the up-stream packet,
                     * check to see if we just pointed this packet at the
                     * previous head of the list.  If so, this packet becomes
                     * the new head.
                     */

                    if (scan_rqe == rq->head)
                    {
                        rq->head = rqe;
                        rq->head_fragnum = fragnum;
                    }
                    else {
                        assert(prev_scan_rqe != NULL);
                        prev_scan_rqe->next = rqe;
                    }
                }
            }
        }
    }

    /*
     * We *always* fall through to this common processing.
     */

    /*
     * If we're accepting the packet, update some queue state.
     */
    if (added_to_queue)
    {
        rq->queue_len++;

        /*
         * So that the selective ack code can easily determine the length
         * of the mask needed for a fack body, keep track of the highest
         * numbered fragment we have on the queue.
         */
        if (RPC_DG_FRAGNUM_IS_LT(rq->high_fragnum, fragnum))
        {
            rq->high_fragnum = fragnum;
        }

        /*
         * The data stream is complete when our queue has inorder data
         * and it's not a fragmented stream, or if the last inorder
         * fragment has the last_frag bit set.
         *
         * If rq->last_inorder->hdrp == NULL (which can happen iff
         * last_inorder == head), we must have added the out-of-order
         * fragment, which means the data stream is incomplete.
         */
        if (rq->last_inorder != NULL &&
            (! rq->recving_frags ||
             (rq->last_inorder->hdrp != NULL &&
              RPC_DG_HDR_FLAG_IS_SET(rq->last_inorder->hdrp,
                                     RPC_C_DG_PF_LAST_FRAG))))
        {
            rq->all_pkts_recvd = true;
        }
    }

    RPC_DBG_PRINTF(rpc_e_dbg_recv, 7,
        ("(rpc__dg_call_recvq_insert) recv %s %u.%u.%u len=%d %s\n",
        rpc__dg_pkt_name(RPC_DG_HDR_INQ_PTYPE(rqe->hdrp)),
        rqe->hdrp->seq,
        fragnum, (rqe->hdrp->serial_hi << 8) | rqe->hdrp->serial_lo,
        rqe->hdrp->len,
        (rq->recving_frags &&
         ! RPC_DG_HDR_FLAG_IS_SET(rqe->hdrp, RPC_C_DG_PF_NO_FACK)) ?
            "frq" : ""));
    RPC_DBG_PRINTF(rpc_e_dbg_recv, 7,
        ("(rpc__dg_call_recvq_insert) recv frag_len %lu\n", rqe->frag_len));

    /*
     * If we've got a fragmented receive stream, consider sending a fack.
     * Send a fack if the receive queue is full or if the sender asked
     * for a fack.  To avoid confusing pre-v2 code, don't send a fack
     * when the stream is complete.
     */
    if (rq->recving_frags
        && ! rq->all_pkts_recvd
        && (rq->queue_len == rq->max_queue_len ||
            ! RPC_DG_HDR_FLAG_IS_SET(rqe->hdrp, RPC_C_DG_PF_NO_FACK)))
    {
        RPC_DBG_PRINTF(rpc_e_dbg_recv, 6, (
            "(rpc__dg_call_recvq_insert) recv data %u.%u frq, fack --> .%u\n",
            rqe->hdrp->seq, fragnum, rq->next_fragnum - 1));
        rpc__dg_call_xmit_fack(call, rqe, false);
    }

    /*
     * If the call has a private socket, then it was the call thread
     * itself that called us.  No need to signal it.
     */
    if (call->sock_ref->is_private)
        return (added_to_queue);

    /*
     * It is appropriate to awaken a receiver when:
     *      Their receive stream is complete
     *      -OR-
     *      The recvq is full (the head should be inorder)
     *      -OR-
     *      There are a sufficient number of inorder RQE's
     *      -OR-
     *      We're rationing, and the thread has a usable packet.
     *
     * Note: We estimate the # of inorder bytes based on high_rcv_frag_size.
     */
    if (rq->all_pkts_recvd ||
        rq->queue_len == rq->max_queue_len ||
        rq->high_rcv_frag_size * rq->inorder_len >= rq->wake_thread_qsize ||
        (rpc__dg_pkt_is_rationing(NULL) && rq->inorder_len > 0))
        *wake_thread = true;

    return(added_to_queue);
}

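/*
 * Worked example of the in-order scan above (illustrative fragment numbers):
 * if next_fragnum is 3 and fragments 4 and 5 are already queued out of
 * order, the arrival of fragment 3 places it at the head of the queue, the
 * coalescing scan then walks past 4 and 5, last_inorder ends up at fragment
 * 5, next_fragnum becomes 6, and inorder_len grows by 3.
 */
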
/*
 * R P C _ _ D G _ C A L L _ L O C A L _ C A N C E L
 *
 * A local cancel has been detected (i.e. a cancelable operation
 * caused an unwind).  What needs to be done depends on the type
 * of call handle.  Callbacks complicate matters - as always :-)
 * The actual code is pretty simple; it's the case analysis that's
 * a bear; presumably, this 'documentation' will be useful.
 */

PRIVATE void rpc__dg_call_local_cancel
(
    rpc_dg_call_p_t call
)
{
    RPC_DG_CALL_LOCK_ASSERT(call);

    if (RPC_DG_CALL_IS_CLIENT(call))
    {
        rpc_dg_ccall_p_t ccall = (rpc_dg_ccall_p_t) call;

        /*
         * All locally detected cancels while dealing with a ccall are
         * forwarded to the associated server.  This is because any such
         * cancel *was* generated locally.  The original client will
         * only accept a forwarded cancel request (resulting in a remotely
         * generated cancel) while a callback is in progress.  The callback
         * would flush any pending cancels when it completes.
         *
         * If the call has yet to establish a cancel timeout, do so now.
         */
        ccall->cancel.local_count++;
        rpc__dg_ccall_setup_cancel_tmo(ccall);

        RPC_DBG_PRINTF(rpc_e_dbg_cancel, 10,
            ("(rpc__dg_call_local_cancel) ccall fwd [%s]\n",
            rpc__dg_act_seq_string(&ccall->c.xq.hdr)));

        rpc__dg_ccall_xmit_cancel_quit(ccall, ccall->cancel.local_count);
    }
    else
    {
        rpc_dg_scall_p_t scall = (rpc_dg_scall_p_t) call;

        /*
         * If we're handling a non-callback scall (i.e. a normal scall)
         * we don't (typically) expect to see local cancels since the
         * scall execution thread (typically) executes with cancel delivery
         * disabled; the sstub enables delivery only while in the manager.
         *
         * In the event that we have detected the local cancel, then
         * someone enabled cancel delivery while calling back into the
         * runtime.  This could happen (a) because the sstub was trying
         * to implement a cancelable-prologue mode (though NIDL doesn't
         * support this at the time this was written); (b) a manager
         * called a pipe handler without disabling cancel delivery
         * (presumably because it wanted a cancel to be able to terminate
         * the call); (c) runtime or sstub bug :-)
         *
         * Such a local cancel was presumably generated by the reception
         * of a cancel-request.  However, some local manager could have
         * (illegally?) posted a cancel to this call execution thread.
         * Lastly, the local cancel may have originated due to local
         * orphan call processing.
         *
         * In any case, the effect is the same.  Local cancel detection while
         * processing a normal scall results in a call failure.
         */
        if (!scall->c.is_cbk)
        {
            RPC_DBG_GPRINTF(
                ("(rpc__dg_call_local_cancel) scall failure [%s]\n",
                rpc__dg_act_seq_string(&scall->c.xq.hdr)));

            rpc__dg_call_signal_failure(&scall->c, rpc_s_call_cancelled);
        }
        else

        /*
         * We're handling a callback scall and we've detected a local cancel.
         *
         * Callbacks and cancels are a very poor / ugly story.  Basically
         * (given the current client / server RPC cancel design) callbacks
         * and cancels can only work properly when clients (both the original
         * and the callback originating manager) are executing with cancel-on.
         * Other scenarios result in very improper handling of an RPC cancel.
         *
         * If the original client RPC was being made with cancel delivery
         * disabled, then the callback's sstub forcibly enabling cancel
         * delivery while in the callback manager has already broken
         * the cancel model.  A cancel against the original client thread
         * was NOT supposed to be seen.
         *
         * Another case where callbacks and cancels don't work is if
         * the client RPC is being performed cancel-on and the manager
         * decides to perform a callback with cancel-off.  Again, a cancel
         * against the original thread can result in terminating the
         * callback manager (which wasn't supposed to see a cancel).
         *
         * Since the only callback cancel case that can work correctly is
         * when all components of the original call are being made with
         * cancel delivery enabled, that's the only case we'll try to make
         * work.  Here's what we do:
         *      If we detected a local cancel for a cbk_scall
         *          AND the cbk_scall had received a forwarded
         *          cancel-request
         *      then we treat this just like the normal scall and
         *          fail the cbk_scall
         *      otherwise, the cancel must have been a result
         *          of a dcethread_interrupt_throw() against the original
         *          client and the cancel should be treated just like
         *          the normal ccall; forward the cancel to the server
         */
        {
            if (scall->c.c.u.server.cancel.count > 0)
            {
                /*
                 * The callback accepted a cancel-request.
                 */
                RPC_DBG_GPRINTF(
                    ("(rpc__dg_call_local_cancel) cbk_scall failure [%s]\n",
                    rpc__dg_act_seq_string(&scall->c.xq.hdr)));

                rpc__dg_call_signal_failure(&scall->c, rpc_s_call_cancelled);
            }
            else
            {
                /*
                 * Locally generated cancel - forward it to the originator
                 * (the server) of this callback (as above).
                 *
                 * The callback's cbk_ccall is the one where the info
                 * must be updated.  Unfortunately there's a locking
                 * hierarchy violation that we have to deal with.  In
                 * the case that we can't get the lock, just repost the
                 * cancel to the thread so we don't lose it (we will
                 * try to handle it again the next time the thread calls
                 * a cancelable operation).
                 */

                rpc_dg_ccall_p_t ccall = scall->cbk_ccall;
                boolean b;

                RPC_DG_CALL_TRY_LOCK(&ccall->c, &b);
                if (!b)
                {
                    RPC_DBG_PRINTF(rpc_e_dbg_cancel, 3,
                        ("(rpc__dg_call_local_cancel) cbk_scall can't get ccall lock [%s]\n",
                        rpc__dg_act_seq_string(&scall->c.xq.hdr)));

                    dcethread_interrupt_throw(dcethread_self());
                }
                else
                {
                    ccall->cancel.local_count++;
                    rpc__dg_ccall_setup_cancel_tmo(ccall);

                    RPC_DBG_PRINTF(rpc_e_dbg_cancel, 10,
                        ("(rpc__dg_call_local_cancel) cbk_scall ccall fwd [%s]\n",
                        rpc__dg_act_seq_string(&ccall->c.xq.hdr)));

                    rpc__dg_ccall_xmit_cancel_quit(ccall,
                        ccall->cancel.local_count);
                    RPC_DG_CALL_UNLOCK(&ccall->c);
                }
            }
        }
    }
}