1/* 2 * Copyright (c) 2010 Apple Inc. All rights reserved. 3 * 4 * @APPLE_LICENSE_HEADER_START@ 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. Neither the name of Apple Inc. ("Apple") nor the names of its 16 * contributors may be used to endorse or promote products derived from 17 * this software without specific prior written permission. 18 * 19 * THIS SOFTWARE IS PROVIDED BY APPLE AND ITS CONTRIBUTORS "AS IS" AND ANY 20 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 21 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 22 * DISCLAIMED. IN NO EVENT SHALL APPLE OR ITS CONTRIBUTORS BE LIABLE FOR ANY 23 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 24 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 25 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 26 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 28 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 * 30 * Portions of this software have been released under the following terms: 31 * 32 * (c) Copyright 1989-1993 OPEN SOFTWARE FOUNDATION, INC. 
33 * (c) Copyright 1989-1993 HEWLETT-PACKARD COMPANY 34 * (c) Copyright 1989-1993 DIGITAL EQUIPMENT CORPORATION 35 * 36 * To anyone who acknowledges that this file is provided "AS IS" 37 * without any express or implied warranty: 38 * permission to use, copy, modify, and distribute this file for any 39 * purpose is hereby granted without fee, provided that the above 40 * copyright notices and this notice appears in all source code copies, 41 * and that none of the names of Open Software Foundation, Inc., Hewlett- 42 * Packard Company or Digital Equipment Corporation be used 43 * in advertising or publicity pertaining to distribution of the software 44 * without specific, written prior permission. Neither Open Software 45 * Foundation, Inc., Hewlett-Packard Company nor Digital 46 * Equipment Corporation makes any representations about the suitability 47 * of this software for any purpose. 48 * 49 * Copyright (c) 2007, Novell, Inc. All rights reserved. 50 * Redistribution and use in source and binary forms, with or without 51 * modification, are permitted provided that the following conditions 52 * are met: 53 * 54 * 1. Redistributions of source code must retain the above copyright 55 * notice, this list of conditions and the following disclaimer. 56 * 2. Redistributions in binary form must reproduce the above copyright 57 * notice, this list of conditions and the following disclaimer in the 58 * documentation and/or other materials provided with the distribution. 59 * 3. Neither the name of Novell Inc. nor the names of its contributors 60 * may be used to endorse or promote products derived from this 61 * this software without specific prior written permission. 62 * 63 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY 64 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 65 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 66 * DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY 67 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 68 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 69 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 70 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 71 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 72 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 73 * 74 * @APPLE_LICENSE_HEADER_END@ 75 */ 76 77/* 78** 79** NAME: 80** 81** dgxq.c 82** 83** FACILITY: 84** 85** Remote Procedure Call (RPC) 86** 87** ABSTRACT: 88** 89** DG protocol service routines. Handles transmit queues. 90** 91** 92*/ 93 94#include <dg.h> 95#include <dgxq.h> 96#include <dgpkt.h> 97#include <dgcall.h> 98 99/*========================================================================= */ 100 101#define MAX_SENDMSG_RETRIES 5 102 103/*========================================================================= */ 104 105#define XQE_FREE_LIST_MAX_LENGTH 32 106 107INTERNAL struct { 108 rpc_dg_xmitq_elt_p_t head; 109 unsigned16 length; 110} xqe_free_list ATTRIBUTE_UNUSED; 111 112/*========================================================================= */ 113 114/* 115 * com_timeout controllable parameters... 116 * 117 * max_xq_awaiting_ack_time: 118 * The maximum time that we will attempt to retransmit pkts to a receiver 119 * without any apparent type of acknowledgment. This is a general xq 120 * related value and mechanism that removes the need for the old server 121 * specific "final" state timeout processing. 
122 */ 123 124typedef struct { 125 unsigned32 max_xq_awaiting_ack_time; 126} com_timeout_params_t; 127 128/*========================================================================= */ 129 130/* 131 * R P C _ _ D G _ X M I T Q _ A W A I T I N G _ A C K _ T M O 132 * 133 * Return true iff the xmitq is awaiting an ack and it's been waiting 134 * too long. 135 */ 136 137PRIVATE boolean rpc__dg_xmitq_awaiting_ack_tmo 138( 139 rpc_dg_xmitq_p_t xq, 140 unsigned32 com_timeout_knob 141) 142{ 143 rpc_clock_t timestamp, wait_time; 144 static com_timeout_params_t xq_com_timeout_params[] = { 145 /* max_xq_awaiting_ack_time */ 146 /* 0 min */ {RPC_CLOCK_SEC(1)}, 147 /* 1 */ {RPC_CLOCK_SEC(2)}, 148 /* 2 */ {RPC_CLOCK_SEC(4)}, 149 /* 3 */ {RPC_CLOCK_SEC(8)}, 150 /* 4 */ {RPC_CLOCK_SEC(15)}, 151 /* 5 def */ {RPC_CLOCK_SEC(30)}, 152 /* 6 */ {RPC_CLOCK_SEC(2*30)}, 153 /* 7 */ {RPC_CLOCK_SEC(4*30)}, 154 /* 8 */ {RPC_CLOCK_SEC(8*30)}, 155 /* 9 */ {RPC_CLOCK_SEC(16*30)}, 156 /* 10 infinite */ {RPC_CLOCK_SEC(0)} 157 }; 158 159 timestamp = xq->awaiting_ack_timestamp; 160 wait_time = xq_com_timeout_params[com_timeout_knob].max_xq_awaiting_ack_time; 161 162 if (xq->awaiting_ack && rpc__clock_aged(timestamp, wait_time) 163 && com_timeout_knob != rpc_c_binding_infinite_timeout) 164 { 165 RPC_DBG_GPRINTF( 166 ("(rpc__dg_xmitq_awaiting_ack_tmo) timeout (timestamp=%lu, wait_time=%lu, now=%lu) [%s]\n", 167 (unsigned long)timestamp, (unsigned long)wait_time, 168 (unsigned long)rpc__clock_stamp(), 169 rpc__dg_act_seq_string(&xq->hdr))); 170 return (true); 171 } 172 else 173 { 174 return (false); 175 } 176} 177 178/* 179 * S E N D _ B R O A D C A S T 180 * 181 * Send a datagram out all the appropriate broadcast addresses. 
182 */ 183 184INTERNAL void send_broadcast ( 185 rpc_dg_call_p_t /*call*/, 186 rpc_socket_iovec_p_t /*iov*/, 187 int /*iovlen*/ 188 ); 189 190INTERNAL void send_broadcast 191( 192 rpc_dg_call_p_t call, 193 rpc_socket_iovec_p_t iov, 194 int iovlen 195) 196{ 197 unsigned32 st, j; 198 rpc_socket_error_t serr; 199 int i; 200 size_t sentcc, sendcc; 201 rpc_dg_sock_pool_elt_p_t sp; 202 unsigned_char_p_t endpoint = NULL; 203 rpc_dg_ccall_p_t ccall = (rpc_dg_ccall_p_t) call; 204 205 assert(RPC_DG_CALL_IS_CLIENT(call)); 206 207 sp = ccall->c.sock_ref; 208 209 /* 210 * See if we have already inquired about the broadcast addresses 211 * for this socket. If not, enable broadcasts on the socket (if 212 * necessary) and call the NAF routine for the broadcast addresses. 213 * (Note that we're ignoring the result of "rpc__socket_set_broadcast" 214 * because what's the worst that can happen?) 215 */ 216 217 if (sp->brd_addrs == NULL) 218 { 219 (void) rpc__socket_set_broadcast(sp->sock); 220 221 rpc__naf_get_broadcast(rpc_g_protseq_id[sp->pseq_id].naf_id, 222 ccall->h->c.c.rpc_addr->rpc_protseq_id, 223 &sp->brd_addrs, &st); 224 if (st != rpc_s_ok) 225 { 226 return; 227 } 228 } 229 230 sendcc = 0; 231 for (i = 0; i < iovlen; i++) 232 sendcc += iov[i].iov_len; 233 234 rpc__naf_addr_inq_endpoint(call->addr, &endpoint, &st); 235 236 for (j = 0; j < sp->brd_addrs->len; j++) 237 { 238 rpc__naf_addr_set_endpoint(endpoint, &sp->brd_addrs->addrs[j], &st); 239 240 RPC_DG_SOCKET_SENDMSG_OOL(sp->sock, iov, iovlen, 241 sp->brd_addrs->addrs[j], &sentcc, &serr); 242 243 RPC_DG_SOCK_UPDATE_ERR_COUNT(sp, serr); 244 245 if (RPC_SOCKET_IS_ERR(serr) || sentcc != sendcc) 246 { 247 RPC_DBG_GPRINTF(("(send_broadcast) sendmsg failed, sendcc = %ld, sentcc = %ld, error = %d\n", 248 sendcc, sentcc, RPC_SOCKET_ETOI(serr))); 249 break; 250 } 251 RPC_DG_STATS_INCR(pkts_sent); 252 RPC_DG_STATS_INCR(brds_sent); 253 } 254 255 RPC_MEM_FREE(endpoint, RPC_C_MEM_STRING); 256} 257 258/* 259 * R P C _ _ D G _ X M I T Q _ 
E L T _ X M I T 260 * 261 * Send the request/response packet denoted by the transmit queue element 262 * on the specified call. 263 */ 264 265PRIVATE void rpc__dg_xmitq_elt_xmit 266( 267 rpc_dg_xmitq_elt_p_t xqe, 268 rpc_dg_call_p_t call, 269 boolean32 block 270) 271{ 272 rpc_socket_iovec_t iov[RPC_C_DG_MAX_NUM_PKTS_IN_FRAG+1]; 273 int iovlen; 274 rpc_dg_xmitq_elt_p_t last_xqe = xqe; 275 rpc_dg_xmitq_p_t xq = &call->xq; 276 rpc_key_info_p_t key_info; 277 rpc_dg_auth_epv_p_t auth_epv; 278 size_t sentcc, sendcc; 279 unsigned32 original_seq = 0; 280 rpc_socket_error_t serr; 281 unsigned16 i; 282 int ptype; 283#ifdef MISPACKED_HDR 284 rpc_dg_raw_pkt_hdr_t raw_hdr; 285#endif 286 287 RPC_DG_CALL_LOCK_ASSERT(call); 288 289 /* 290 * First, make sure the socket we're about to use has not been 291 * disabled. If it has been, we might as well fault the call 292 * now. 293 */ 294 295 if (RPC_DG_SOCK_IS_DISABLED(call->sock_ref)) 296 { 297 RPC_DBG_PRINTF(rpc_e_dbg_xmit, 5, 298 ("(rpc__dg_xmitq_elt_xmit) socket %p has been disabled\n", 299 call->sock_ref->sock)); 300 rpc__dg_call_signal_failure(call, rpc_s_socket_failure); 301 return; 302 } 303 304 key_info = call->key_info; 305 auth_epv = call->auth_epv; 306 307 /* 308 * Fill in the prototype header in the transmit queue using values 309 * from transmit queue element. 310 */ 311 312 xq->hdr.fragnum = xqe->fragnum; 313 xq->hdr.flags = xq->base_flags | xqe->flags; 314 xq->hdr.flags2 = xq->base_flags2; 315 xq->hdr.len = xqe->frag_len; 316 xq->hdr.auth_proto = 0; 317 318 /* 319 * Tag this packet with a serial number, and record that number 320 * into the packet header. 
321 */ 322 323 xqe->serial_num = xq->next_serial_num++; 324 xq->hdr.serial_hi = (xqe->serial_num & 0xFF00) >> 8; 325 xq->hdr.serial_lo = xqe->serial_num & 0xFF;; 326 327 ptype = RPC_DG_HDR_INQ_PTYPE(&xq->hdr); 328 329 /* 330 * For response packets we need to use the highest sequence number 331 * we've seen up to this time, so that the client can keep track of 332 * the sequence number space. To work with 1.5.1 clients, we still 333 * need to use the original sequence number for facks, ping responses, 334 * etc. 335 */ 336 337 if (ptype == RPC_C_DG_PT_RESPONSE) 338 { 339 original_seq = xq->hdr.seq; 340 xq->hdr.seq = call->high_seq; 341 } 342 343 /* 344 * Only authenticate data packets -- setting key_info to NULL 345 * disables authentication machinery for this packet. 346 */ 347 348 if (!RPC_DG_PT_IS_DATA(ptype)) 349 key_info = NULL; 350 351 /* 352 * Set "last frag" bit as necessary. (Note that we make sure that 353 * last frags never ask for facks. Also, we don't set any of the 354 * frag bits if all the data is being sent in a single packet [i.e., 355 * we're NOT fragmenting at all]). 356 */ 357 358 if (xq->push && xqe == xq->tail && 359 RPC_DG_FLAG_IS_SET(xq->base_flags, RPC_C_DG_PF_FRAG)) 360 xq->hdr.flags |= RPC_C_DG_PF_LAST_FRAG | RPC_C_DG_PF_NO_FACK; 361 362 /* 363 * If this is the call's final xqe (fragmented or not) or we're 364 * requesting a fack, then start up the awaiting_ack detection 365 * machinery (if it isn't already setup). The receiver needs to 366 * send a xmitq acknowledgment (i.e. a fack, working, response or 367 * ack pkt) sooner or later. 368 */ 369 370 if (! xq->awaiting_ack 371 && (! RPC_DG_FLAG_IS_SET(xq->base_flags, RPC_C_DG_PF_FRAG) 372 || RPC_DG_HDR_FLAG_IS_SET(&xq->hdr, RPC_C_DG_PF_LAST_FRAG) 373 || ! RPC_DG_HDR_FLAG_IS_SET(&xq->hdr, RPC_C_DG_PF_NO_FACK))) 374 { 375 RPC_DG_XMITQ_AWAITING_ACK_SET(xq); 376 } 377 378 /* 379 * If we're authenticating, set auth_proto field in packet header 380 * to indicate which auth protocol we're using. 
381 */ 382 if (key_info != NULL) 383 xq->hdr.auth_proto = auth_epv->auth_proto; 384 385 /* 386 * Set up an I/O vector with two elements: The first points to the 387 * header (from the transmit queue) and the second points to the 388 * body (from the transmit queue element). Note that in the case 389 * of "mispacked header" systems, we have to make a correctly formatted 390 * header before we can transmit. 391 */ 392 393 iov[0].iov_base = (byte_p_t) &xq->hdr; 394 iov[0].iov_len = RPC_C_DG_RAW_PKT_HDR_SIZE; 395 iovlen = 1; 396 sendcc = iov[0].iov_len; 397 398 iov[1].iov_base = (byte_p_t) xqe->body; 399 iov[1].iov_len = xqe->body_len; 400 401 if (xqe->body_len != 0) 402 { 403 iov[iovlen].iov_base = (byte_p_t) last_xqe->body; 404 iov[iovlen].iov_len = last_xqe->body_len; 405 sendcc += iov[iovlen++].iov_len; 406 407 while (last_xqe->more_data != NULL) 408 { 409 iov[iovlen].iov_base = (byte_p_t) last_xqe->more_data->body; 410 iov[iovlen].iov_len = last_xqe->more_data->body_len; 411 sendcc += iov[iovlen++].iov_len; 412 last_xqe = last_xqe->more_data; 413 } 414 } 415 416 assert(iovlen > 0 && iovlen <= RPC_C_DG_MAX_NUM_PKTS_IN_FRAG + 1); 417 assert(xqe->frag_len == 0 || 418 (unsigned32)sendcc == (xqe->frag_len + RPC_C_DG_RAW_PKT_HDR_SIZE)); 419 420 xqe->frag_len = xq->hdr.len = sendcc - RPC_C_DG_RAW_PKT_HDR_SIZE; 421 422 RPC_DBG_PRINTF(rpc_e_dbg_xmit, 5, 423 ("(rpc__dg_xmitq_elt_xmit) %s %lu.%u.%u len=%lu %s\n", 424 rpc__dg_pkt_name(RPC_DG_HDR_INQ_PTYPE(&call->xq.hdr)), 425 (unsigned long)call->xq.hdr.seq, xqe->fragnum, 426 xqe->serial_num, (unsigned long)xq->hdr.len, 427 RPC_DG_HDR_FLAG_IS_SET(&xq->hdr, RPC_C_DG_PF_NO_FACK) ? "" : "frq")); 428 429#ifdef MISPACKED_HDR 430 /* !!! ...compress hdr pointed to by iov[0] into raw_hdr... !!! */ 431 hdrp = iov[0].base; 432 compress_hdr(&xq->hdr, &raw_hdr); 433 iov[0].base = (byte_p_t) &raw_hdr; 434#endif 435 436 /* 437 * Attach any authentication information now. 
438 */ 439 440 if (key_info != NULL) 441 { 442 unsigned32 st; 443 int overhead; 444 445 dce_pointer_t cksum = last_xqe->body->args + last_xqe->body_len; 446 447 (*auth_epv->pre_send) (key_info, xqe, &xq->hdr, iov, iovlen, cksum, &st); 448 if (st != 0) 449 { 450 RPC_DBG_GPRINTF( 451 ("(rpc__dg_xmitq_elt_xmit) auth pre_send failed, status = %x\n", 452 st)); 453 rpc__dg_call_signal_failure (call, st); 454 return; 455 } 456 overhead = auth_epv->overhead; 457 458 if (iovlen == 1) 459 { 460 iovlen = 2; 461 iov[1].iov_len += overhead; 462 } 463 else 464 { 465 iov[iovlen-1].iov_len += overhead; 466 } 467 sendcc += overhead; 468 } 469 RPC_DBG_PRINTF(rpc_e_dbg_xmit, 5, 470 ("(rpc__dg_xmitq_elt_xmit) iovlen %d, sendcc %ld\n", 471 iovlen, sendcc)); 472 473 /* 474 * Send out the datagram (checking whether we should broadcast it). 475 * For the non-broadcast cast, we accept a few EWOULDBLOCK send 476 * failures (by sleeping for a bit and then trying again). We wish 477 * we could have the socket in blocking I/O mode, but the receive 478 * path really wants the socket in NON-blocking mode. 479 */ 480 481 if (RPC_DG_FLAG_IS_SET(call->xq.base_flags, RPC_C_DG_PF_BROADCAST)) 482 { 483 send_broadcast(call, iov, iovlen); 484 } 485 else 486 { 487 for (i = 0; i < MAX_SENDMSG_RETRIES; i++) 488 { 489 RPC_DG_SOCKET_SENDMSG(call->sock_ref->sock, iov, iovlen, 490 call->addr, &sentcc, &serr); 491 492 RPC_DG_SOCK_UPDATE_ERR_COUNT(call->sock_ref, serr); 493 494 if (! RPC_SOCKET_IS_ERR(serr) && sentcc == sendcc) 495 { 496 RPC_DG_STATS_INCR(pkts_sent); 497 RPC_DG_STATS_INCR(pstats[RPC_DG_HDR_INQ_PTYPE(&xq->hdr)].sent); 498 break; 499 } 500 501 if (! RPC_SOCKET_ERR_EQ(serr, RPC_C_SOCKET_EWOULDBLOCK)) 502 { 503 RPC_DBG_GPRINTF( 504 ("(rpc__dg_xmitq_elt_xmit) sendmsg failed, sendcc = %ld, sentcc = %ld, error = %d\n", 505 sendcc, sentcc, RPC_SOCKET_ETOI(serr))); 506 break; 507 } 508 509 if (! block) 510 break; 511 512 /* 513 * Handle EWOULDBLOCKs waiting for the condition to go away. 
514 */ 515 516 RPC_DBG_PRINTF(rpc_e_dbg_xmit, 2, 517 ("(rpc__dg_xmitq_elt_xmit) sendmsg failed with EWOULDBLOCK; waiting\n")); 518 519 rpc__socket_nowriteblock_wait(call->sock_ref->sock, NULL); 520 } 521 } 522 523 xq->timestamp = rpc__clock_stamp(); 524 525 if (RPC_DG_HDR_INQ_PTYPE(&xq->hdr) == RPC_C_DG_PT_RESPONSE) 526 { 527 xq->hdr.seq = original_seq; 528 } 529} 530 531/* 532 * R P C _ _ D G _ X M I T Q _ I N I T 533 * 534 * Initialize a transmit queue (rpc_dg_xmit_q_t). Note that we DON'T 535 * fill in (all) the prototype packet header here because we don't have 536 * enough info to do it at this point. 537 */ 538 539PRIVATE void rpc__dg_xmitq_init 540( 541 rpc_dg_xmitq_p_t xq 542) 543{ 544 /* 545 * Presumably the call is either locked or "private" at this point 546 * RPC_DG_CALL_LOCK_ASSERT(call); 547 */ 548 549 rpc__dg_xmitq_reinit(xq); 550 551 xq->max_rcv_tsdu = RPC_C_DG_INITIAL_MAX_PKT_SIZE; 552 xq->max_snd_tsdu = RPC_C_DG_INITIAL_MAX_PKT_SIZE; 553 xq->max_frag_size = RPC_C_DG_INITIAL_MAX_PKT_SIZE; 554 xq->snd_frag_size = RPC_C_DG_INITIAL_MAX_PKT_SIZE; 555 xq->max_blast_size = RPC_C_DG_INITIAL_MAX_BLAST_SIZE; 556 xq->xq_timer = RPC_C_DG_INITIAL_XQ_TIMER; 557 xq->xq_timer_throttle = 1; 558 xq->high_cwindow = 0; 559 560 /* 561 * Initialize some highly constant fields (RPC protocol version and 562 * local NDR drep) in the xmitq's prototype packet header. 563 */ 564 565 RPC_DG_HDR_SET_VERS(&xq->hdr); 566 RPC_DG_HDR_SET_DREP(&xq->hdr); 567 568 xq->hdr.auth_proto = 0; 569} 570 571/* 572 * R P C _ _ D G _ X M I T Q _ R E I N I T 573 * 574 * Reinitialize a transmit queue (rpc_dg_xmit_q_t). Note that we DON'T 575 * fill in the prototype packet header here because we don't have enough 576 * info to do it at this point. Calling this routine will retain some 577 * state between calls; see xmitq_init for which state is retained. 
578 */ 579 580PRIVATE void rpc__dg_xmitq_reinit 581( 582 rpc_dg_xmitq_p_t xq 583) 584{ 585 /* 586 * Presumably the call is either locked or "private" at this point 587 * RPC_DG_CALL_LOCK_ASSERT(call); 588 */ 589 590 xq->head = xq->first_unsent = xq->tail = xq->part_xqe = xq->rexmitq = NULL; 591 592 xq->next_fragnum = 0; 593 xq->next_serial_num = 0; 594 xq->last_fack_serial = -1; 595 xq->cwindow_size = 0; 596 xq->window_size = RPC_C_DG_INITIAL_WINDOW_SIZE; 597 xq->blast_size = 0; 598 xq->freqs_out = 0; 599 xq->push = false; 600 xq->awaiting_ack = false; 601 xq->rexmit_timeout = RPC_C_DG_INITIAL_REXMIT_TIMEOUT; 602 /* 603 * Temporarily, we turn off the first fack wait logic for recovering 604 * the small in/out performance. We'll revisit it later. 605 */ 606 xq->first_fack_seen = true; 607} 608 609/* 610 * R P C _ _ D G _ X M I T Q _ F R E E 611 * 612 * Frees data referenced by a transmit queue (rpc_dg_xmit_q_t). The 613 * transmit queue itself is NOT freed, since it's (assumed to be) part 614 * of a larger structure. Clearly this means any sort of xmitq related 615 * ack must have arrived. 616 */ 617 618PRIVATE void rpc__dg_xmitq_free 619( 620 rpc_dg_xmitq_p_t xq, 621 rpc_dg_call_p_t call 622) 623{ 624 /* 625 * presumably the call is either locked or 'private' at this point 626 * RPC_DG_CALL_LOCK_ASSERT(call); 627 */ 628 629 RPC_DG_XMITQ_AWAITING_ACK_CLR(xq); 630 631 while (xq->head != NULL) 632 { 633 rpc_dg_xmitq_elt_p_t xqe = xq->head; 634 635 xq->head = xqe->next; 636 rpc__dg_pkt_free_xqe(xqe, call); 637 } 638 xq->first_unsent = xq->tail = xq->rexmitq = NULL; 639 640 /* 641 * Clear any previously set blast_size. 642 */ 643 644 xq->blast_size = 0; 645} 646 647/* 648 * R P C _ _ D G _ X M I T Q _ A P P E N D _ P P 649 * 650 * Append a transmit queue's partial packet to the transmit queue itself. 
651 */ 652 653PRIVATE void rpc__dg_xmitq_append_pp 654( 655 rpc_dg_call_p_t call, 656 unsigned32 *st 657) 658{ 659 rpc_dg_xmitq_p_t xq = &call->xq; 660 rpc_dg_xmitq_elt_p_t xqe = xq->part_xqe; 661 rpc_key_info_p_t key_info = call->key_info; 662 int ptype; 663 unsigned32 frag_length = 0; /* # bytes in the body of the fragment */ 664 rpc_dg_xmitq_elt_p_t last_xqe = xqe; 665 666 *st = rpc_s_ok; 667 668 RPC_DG_CALL_LOCK_ASSERT(call); 669 670 if (xqe == NULL) 671 return; 672 673 /* 674 * Compute the fragment length and store it. 675 */ 676 677 frag_length = last_xqe->body_len; 678 679 while (last_xqe->more_data != NULL) 680 { 681 frag_length += last_xqe->more_data->body_len; 682 last_xqe = last_xqe->more_data; 683 } 684 685 xqe->frag_len = frag_length; 686 687 xqe->next = NULL; 688 xqe->fragnum = xq->next_fragnum++; 689 xqe->flags = 0; 690 691 /* 692 * Add the partial xqe to the queue. 693 */ 694 695 if (xq->head == NULL) 696 { 697 xq->head = xq->tail = xq->first_unsent = xqe; 698 } 699 else 700 { 701 xq->tail->next = xqe; 702 xq->tail = xqe; 703 if (xq->first_unsent == NULL) 704 xq->first_unsent = xqe; 705 } 706 707 /* 708 * "Normal" idempotent operations with *large ins* get tagged as 709 * non-idempotent calls. Old (1.5.1) servers (and 2.0 as well) can 710 * dispose of their response packet in the case of idempotent calls 711 * with "small outs". V2.0 clients dispose of acknowledged input 712 * stream frags hence, they are unable to rerun an idempotent call 713 * with large ins in the event that the server response is lost. 714 * 715 * The condition below is triggered for fragnum 0 only if we aren't 716 * 'pushing' the queue; ie. there are more packets to follow so we 717 * can assume the call has large INS. 718 */ 719 720 if (xqe->fragnum == 0 && ! xq->push) 721 { 722 if (RPC_DG_HDR_INQ_PTYPE(&xq->hdr) == RPC_C_DG_PT_REQUEST 723 && ! 
RPC_DG_FLAG_IS_SET(xq->base_flags, RPC_C_DG_PF_MAYBE)) 724 { 725 xq->base_flags &= ~RPC_C_DG_PF_IDEMPOTENT; 726 } 727 } 728 729 /* 730 * Set the "frag" bit appropriately. The only time we don't set 731 * this bit is when the first packet is being pushed from xmitq_push. 732 * If a packet is appended at any time before we begin "pushing," 733 * we can be sure that the xmit is fragmented. Once the flag is 734 * set, it never gets unset, so no need to worry that the last frag 735 * will reset the flag. 736 */ 737 738 if (! xq->push) 739 { 740 xq->base_flags |= RPC_C_DG_PF_FRAG; 741 } 742 743 xq->part_xqe = NULL; 744 745 /* 746 * Only encrypt data packets -- setting key_info to NULL 747 * disables encrypt machinery for this packet. 748 */ 749 750 ptype = RPC_DG_HDR_INQ_PTYPE(&xq->hdr); 751 if (!RPC_DG_PT_IS_DATA(ptype)) 752 key_info = NULL; 753 754 if (key_info != NULL) 755 { 756 rpc_dg_auth_epv_p_t auth_epv = call->auth_epv; 757 unsigned32 blocksize = auth_epv->blocksize; 758 759 /* 760 * If the packet length isn't a multiple of the encryption block 761 * size, round it up now. 762 */ 763 frag_length = (((frag_length) + blocksize - 1) 764 / blocksize) * blocksize; 765 last_xqe->body_len += (frag_length - xqe->frag_len); 766 xqe->frag_len = frag_length; 767 768 assert(RPC_C_DG_RAW_PKT_HDR_SIZE + frag_length + auth_epv->overhead <= xq->snd_frag_size); 769 770 if (last_xqe->body_len + auth_epv->overhead 771 > RPC_C_DG_MAX_PKT_BODY_SIZE) 772 { 773 /* 774 * This can happen if the fragment gets pushed. 775 */ 776 last_xqe->more_data = rpc__dg_pkt_alloc_xqe(call, st); 777 if (*st != rpc_s_ok) 778 return; 779 } 780 (*auth_epv->encrypt) (key_info, xqe, st); 781 if (*st != rpc_s_ok) 782 return; 783 } 784} 785 786/* 787 * R P C _ _ D G _ X M I T Q _ R E S T A R T 788 * 789 * Timeout everything on the transmit queue. This routine is used when 790 * we infer that the receiver has lost some of the data previously sent 791 * it. 
Situations in which we are forced to *infer* data loss include
 * receiving a no-call in response to a ping, and receiving a ping while
 * we are in the xmit of final states.  In such situations we recover
 * by beginning to send the xmitq again, and hoping the flow control
 * logic will kick in.
 *
 * call - the call whose xmitq (call->xq) is restarted.
 */

PRIVATE void rpc__dg_xmitq_restart
(
    rpc_dg_call_p_t call
)
{
    rpc_dg_xmitq_p_t xq = &call->xq;
    rpc_dg_xmitq_elt_p_t cur;
    rpc_dg_xmitq_elt_p_t rexmit_tail = NULL;    /* end of the rexmit list */
    unsigned32 n_rexmit = 0;

    /*
     * If the xmitq has already been set up to do a transmission, leave
     * it alone.  Since 'restarting' the queue is a meat-axe approach
     * to error recovery, we'll defer to any 'normal' processing that
     * might also be trying to handle the xmitq.
     */

    if (RPC_DG_CALL_READY_TO_SEND(call))
    {
        RPC_DG_START_XMIT(call);
        return;
    }

    /*
     * Walk every packet from the head of the queue up to (but not
     * including) first_unsent and string them together, in order, on
     * the retransmit list.
     */

    cur = xq->head;
    while (cur != NULL && cur != xq->first_unsent)
    {
        n_rexmit++;

        /*
         * If the packet is counted in the current congestion window,
         * remove it.  Also check to see if the packet counted as one
         * of our outstanding fack requests.
         */

        if (cur->in_cwindow)
        {
            cur->in_cwindow = false;
            xq->cwindow_size--;
            if (! RPC_DG_FLAG_IS_SET(cur->flags, RPC_C_DG_PF_NO_FACK) ||
                RPC_DG_FLAG_IS_SET(cur->flags, RPC_C_DG_PF_LAST_FRAG))
            {
                xq->freqs_out--;
            }
        }

        /*
         * The first packet found becomes the head of the rexmit list;
         * later ones are chained onto the previous packet.
         */

        if (rexmit_tail == NULL)
            xq->rexmitq = cur;
        else
            rexmit_tail->next_rexmit = cur;

        cur->next_rexmit = NULL;
        rexmit_tail = cur;

        cur = cur->next;
    }

    /*
     * If we didn't find any packets to retransmit, then let's send
     * the first unsent packet.
     */

    if (n_rexmit == 0 && xq->first_unsent != NULL)
    {
        n_rexmit = 1;
    }

    xq->blast_size = MIN(n_rexmit, RPC_C_DG_INITIAL_BLAST_SIZE);

    if (RPC_DG_CALL_READY_TO_SEND(call))
    {
        RPC_DG_START_XMIT(call);
    }
}