1/*
2 * Copyright (c) 2010 Apple Inc. All rights reserved.
3 *
4 * @APPLE_LICENSE_HEADER_START@
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * 1.  Redistributions of source code must retain the above copyright
11 *     notice, this list of conditions and the following disclaimer.
12 * 2.  Redistributions in binary form must reproduce the above copyright
13 *     notice, this list of conditions and the following disclaimer in the
14 *     documentation and/or other materials provided with the distribution.
15 * 3.  Neither the name of Apple Inc. ("Apple") nor the names of its
16 *     contributors may be used to endorse or promote products derived from
17 *     this software without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY APPLE AND ITS CONTRIBUTORS "AS IS" AND ANY
20 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 * DISCLAIMED. IN NO EVENT SHALL APPLE OR ITS CONTRIBUTORS BE LIABLE FOR ANY
23 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
28 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 *
30 * Portions of this software have been released under the following terms:
31 *
32 * (c) Copyright 1989-1993 OPEN SOFTWARE FOUNDATION, INC.
33 * (c) Copyright 1989-1993 HEWLETT-PACKARD COMPANY
34 * (c) Copyright 1989-1993 DIGITAL EQUIPMENT CORPORATION
35 *
36 * To anyone who acknowledges that this file is provided "AS IS"
37 * without any express or implied warranty:
38 * permission to use, copy, modify, and distribute this file for any
39 * purpose is hereby granted without fee, provided that the above
40 * copyright notices and this notice appears in all source code copies,
41 * and that none of the names of Open Software Foundation, Inc., Hewlett-
42 * Packard Company or Digital Equipment Corporation be used
43 * in advertising or publicity pertaining to distribution of the software
44 * without specific, written prior permission.  Neither Open Software
45 * Foundation, Inc., Hewlett-Packard Company nor Digital
46 * Equipment Corporation makes any representations about the suitability
47 * of this software for any purpose.
48 *
49 * Copyright (c) 2007, Novell, Inc. All rights reserved.
50 * Redistribution and use in source and binary forms, with or without
51 * modification, are permitted provided that the following conditions
52 * are met:
53 *
54 * 1.  Redistributions of source code must retain the above copyright
55 *     notice, this list of conditions and the following disclaimer.
56 * 2.  Redistributions in binary form must reproduce the above copyright
57 *     notice, this list of conditions and the following disclaimer in the
58 *     documentation and/or other materials provided with the distribution.
59 * 3.  Neither the name of Novell Inc. nor the names of its contributors
 *     may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
62 *
63 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
64 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
65 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
66 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY
67 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
68 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
69 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
70 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
71 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
72 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
73 *
74 * @APPLE_LICENSE_HEADER_END@
75 */
76
77/*
78**
79**  NAME:
80**
81**      dgcall.c
82**
83**  FACILITY:
84**
85**      Remote Procedure Call (RPC)
86**
87**  ABSTRACT:
88**
**  DG protocol service routines for managing call (CALL) handles.
90**
91**
92*/
93
94#include <dg.h>
95#include <dgrq.h>
96#include <dgxq.h>
97#include <dgpkt.h>
98#include <dgccall.h>
99#include <dgcall.h>
100#include <dgexec.h>
101#include <comauth.h>
102
103/*
104 * R P C _ _ D G _ C A L L _ S T A T E _ N A M E
105 *
 * Return the text name of a call state.  This can't simply be a variable
107 * because of the vagaries of global libraries.
108 */
109
110#ifdef DEBUG
111
112PRIVATE const char *rpc__dg_call_state_name
113(
114    rpc_dg_call_state_t state
115)
116{
117    static const char *names[] = {
118        "init",
119        "xmit",
120        "recv",
121        "final",
122        "idle",
123        "orphan"
124    };
125
126    if ((int)state > (int)rpc_e_dg_cs_orphan)
127        return("unknown");
128
129    return(names[(int) state]);
130}
131
132#endif /* DEBUG */
133
134/*
135 * R P C _ _ D G _ C A L L _ X M I T _ F A C K
136 *
137 * Transmit a "fack" appropriate to the state of the passed "call".
138 *
139 * Note that it's possible that a call might want to send a fack
140 * but not currently be holding onto an RQE (if the call just got
141 * dequeued, for example).  In such a case, it can send NULL for the
142 * rqe parameter.
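 *
 * An illustrative use (not taken from any particular caller): a call
 * that has consumed and dequeued its RQEs can still acknowledge
 * received data with
 *
 *      rpc__dg_call_xmit_fack(call, NULL, false);
 *
 * Passing "true" for "is_nocall" sends the same fack-style body in a
 * "nocall" packet rather than a "fack".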
143 */
144
145PRIVATE void rpc__dg_call_xmit_fack
146(
147    rpc_dg_call_p_t call,
148    rpc_dg_recvq_elt_p_t rqe,
149    boolean32 is_nocall
150)
151{
152    unsigned32 *maskp = NULL, num_elements = 0, i, shift;
153    rpc_socket_iovec_t iov[3];
154    rpc_dg_pkt_hdr_t hdr;
155#ifndef MISPACKED_HDR
156    rpc_dg_fackpkt_body_t body;
157#else
158    rpc_dg_raw_fackpkt_body_t body;
159#endif
160    boolean b;
161    rpc_dg_recvq_p_t rq = &call->rq;
162    rpc_dg_xmitq_p_t xq = &call->xq;
163    rpc_dg_recvq_elt_p_t rqe_p;
164    unsigned32 is_rationing;
165    unsigned32 low_on_pkts;
166
167    RPC_DG_CALL_LOCK_ASSERT(call);
168
169    /*
170     * Create a pkt header initialized with the prototype's contents.
171     */
172
173    hdr = xq->hdr;
174
175    RPC_DG_HDR_SET_PTYPE(&hdr, is_nocall ? RPC_C_DG_PT_NOCALL : RPC_C_DG_PT_FACK);
176
177    hdr.flags       = 0;
178    hdr.len         = RPC_C_DG_RAW_FACKPKT_BODY_SIZE;
179    hdr.fragnum     = rq->next_fragnum - 1;
180
181    /*
182     * Create the fack packet's body.
183     */
184
185#ifndef MISPACKED_HDR
186    body.vers           = RPC_C_DG_FACKPKT_BODY_VERS;
187    body.pad1           = 0;
188    body.max_tsdu       = xq->max_rcv_tsdu;
189
190    /*
191     * Before advertising our max_frag_size, make sure that we have
192     * enough packets reserved. We advertise what we can actually
193     * receive.
194     */
195    if (call->n_resvs >= call->max_resvs)
196    {
197        /*
198         * We already have the maximum reservation.
199         */
200        body.max_frag_size  = xq->max_frag_size;
201    }
202    else if (RPC_DG_CALL_IS_SERVER(call) &&
203             (call->n_resvs == 0 || ((rpc_dg_scall_p_t) call)->call_is_queued))
204    {
205        /*
206         * If the scall doesn't have the reservation, it hasn't got to the
207         * execution phase yet. We will advertise the minimum fragment
208         * size.
209         * Also, a queued scall shouldn't increase the reservation.
210         *
211         * If this is a callback scall with the private socket, should we
212         * try the reservation? Probably not.
213         */
214        body.max_frag_size = RPC_C_DG_MUST_RECV_FRAG_SIZE;
215    }
216    else
217    {
218        /*
219         * The ccall with the private socket may not have the reservation
220         * yet. We will try the reservation here because 1) we are in the
         * receive state and 2) if we don't advertise a larger fragment size,
         * a call with large [out]s cannot take advantage of multi-buffer
         * fragments.
224         */
225
226        if (rpc__dg_pkt_adjust_reservation(call, call->max_resvs, false))
227        {
228            body.max_frag_size  = xq->max_frag_size;
229        }
230        else if (call->n_resvs == 0)
231        {
232            /*
233             * The ccall with the private socket has no reservation.
234             */
235            body.max_frag_size = RPC_C_DG_MUST_RECV_FRAG_SIZE;
236        }
237        else
238        {
239            /*
240             * We can receive whatever fits in the reserved packets.
241             */
242            RPC_DG_NUM_PKTS_TO_FRAG_SIZE(call->n_resvs, body.max_frag_size);
243        }
244    }
245
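    /*
     * Echo the serial number of the packet that induced this fack (or a
     * remembered serial number if no RQE is at hand).  As a worked
     * example (values purely illustrative), serial_hi 0x01 and
     * serial_lo 0x2a reassemble to (0x01 << 8) | 0x2a == 0x012a.
     */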
246    if (rqe != NULL)
247    {
248        if (rqe->hdrp == NULL)
249            body.serial_num = call->rq.head_serial_num;
250        else
251            body.serial_num =
252                (rqe->hdrp->serial_hi << 8) | rqe->hdrp->serial_lo;
253    }
254    else
255        body.serial_num     = call->rq.high_serial_num;
256
257    body.selack_len     = 0;
258    body.selack[0]      = 0;
259
260    /*
261     * Determine what to advertise as the window size.  Ignoring other
262     * considerations, the window size is the amount of buffering we
263     * expect the underlying system to provide if we get behind in
264     * processing incoming data.
265     *
266     * However, there are certain situations in which we might want to
267     * advertise a smaller window size--
268     *
269     *     Queued calls are not allowed to add data to their receive
270     *     queues.  In this case we would advertise a window size of
271     *     0, to tell the sender to stop sending data.
272     *
273     *     Server call threads that have not yet acquired a packet
274     *     reservation.  This situation is treated the same as for queued
275     *     calls.  Client threads without reservations are treated
276     *     differently; the only way they could have proceeded to this
     *     stage (i.e., receiving a response) is if they are using a private
278     *     socket with private packets.
279     *
280     *     When the system is rationing, advertise a window of 1, since
281     *     this is all that a call is allowed to queue.
282     *
283     *     If the packet's 'low_on_pkts' flag is set, advertise a window
284     *     of 2.  The low_on_pkts flag indicates that the number of free
     *     packets is less than two times the number of reservations.
286     *     In such a case, it makes no sense to advertise a full window.
287     *
288     *     Calls whose recvq length is approaching its maximum allowable
289     *     value.  In this case we don't want the sender to send any
290     *     more than the amount of space available on the receive queue.
291     *
292     * We need to adjust the window size from the number of fragments
293     * to kilobytes as per AES.
294     *
295     * When talking to the older runtime, which expects the window size
296     * in the number of packets, the fack receiver may induce more data
297     * in the network. We don't think that this is a problem because the
     * window size is advisory information.
299     *
300     * If the max_frag_size is not a multiple of 1024, we advertise less
301     * than what we think, which is problematic in rationing and
302     * low_on_pkts cases because we may stop(!) the sender. Thus in
303     * these two cases we don't adjust the window size.
304     */
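    /*
     * As a worked example (numbers purely illustrative): with
     * max_queue_len 96, queue_len 16 and an advertised max_frag_size
     * of 4352 bytes, the MBF branch below yields
     *
     *      ((96 - 16) * 4352) >> 10  ==  340 (KB)
     *
     * which is then clamped by rq->window_size.
     */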
305    is_rationing = rpc__dg_pkt_is_rationing(&low_on_pkts);
306
307    if (call->n_resvs == 0 && RPC_DG_CALL_IS_SERVER(call))
308    {
309        body.window_size = 0;
310    }
311    else if (is_rationing)
312    {
313        body.window_size = 1;
314    }
315    else if (low_on_pkts)
316    {
317        body.window_size = 2;
318    }
319    else
320    {
321        /*
322         * If high_rcv_frag_size > RPC_C_DG_MUST_RECV_FRAG_SIZE, that means
323         * we have advertised MBF and the sender can cope with it. Thus,
         * each slot on the recvq takes xq->max_frag_size.
325         *
326         * Otherwise, we make a conservative (or wild?) guess that the
327         * sender will send a single buffer fragment.
328         */
329        if (rq->high_rcv_frag_size > RPC_C_DG_MUST_RECV_FRAG_SIZE)
330        {
331            body.window_size = ((rq->max_queue_len - rq->queue_len)
332                                * xq->max_frag_size) >> 10;
333        }
334        else
335        {
336            body.window_size = ((rq->max_queue_len - rq->queue_len)
337                                * RPC_C_DG_MUST_RECV_FRAG_SIZE) >> 10;
338        }
339        body.window_size = MIN(rq->window_size, body.window_size);
340    }
341
342    /*
343     * See if we need to send selective ack information.
344     */
345
346    if (RPC_DG_FRAGNUM_IS_LT(rq->next_fragnum, rq->high_fragnum))
347    {
348        /*
349         * Determine the number of elements in the selective ack mask
350         * array, worrying about wrap-around.  If we need only one, just
351         * use the fack packet.  Otherwise, we need to allocate an array
352         * to hold the entire mask.  Either way, initialize the mask
353         * to 0.
354         */
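        /*
         * For example (illustrative values): next_fragnum 10 and
         * high_fragnum 50 give ((50 - 10) / 32) + 1 == 2 mask elements;
         * the unsigned16 cast keeps the subtraction sane across fragnum
         * wrap-around.
         */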
355
356        num_elements = (((unsigned16)(rq->high_fragnum - rq->next_fragnum))
357                       / 32) + 1;
358
359        if (num_elements == 1)
360            maskp = &body.selack[0];
361        else
362        {
363            RPC_MEM_ALLOC(maskp, unsigned32 *, num_elements * sizeof *maskp,
364                RPC_C_MEM_DG_SELACK_MASK, RPC_C_MEM_NOWAIT);
365        }
366        for (i = 0; i < num_elements; i++)
367            *(maskp + i) = 0;
368
369        body.selack_len = num_elements;
370
371        /*
372         * Loop through the list of received packets which have fragnums
373         * greater than that which is explicitly mentioned in this fack,
374         * and add them to the selective ack bit mask.  If there are
375         * in-order frags, they would have been explicitly acked, so
376         * start with the next frag.  If there are no in-order frags,
377         * start at the head of the queue.
378         */
379
380        rqe_p = (rq->last_inorder != NULL) ? rq->last_inorder->next : rq->head;
381
382        while (rqe_p != NULL )
383        {
384            /*
385             * Calculate the number of bits we would have to shift the
             * current fragnum into a bitmask of unlimited size, worrying
             * about wrap-around.  Munge the shift value to get the
             * bit into the correct mask element.  Currently, shift will
             * almost always be less than 32, so the assignment is
             * effectively
             *                *maskp |= 1 << shift;
392             */
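            /*
             * E.g. (illustrative): with next_fragnum 5, a queued fragment
             * numbered 37 gives shift == 32, so bit 0 of the second mask
             * element (maskp[1]) is set.
             */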
393
394            shift = rqe_p->hdrp->fragnum - (unsigned16) rq->next_fragnum;
395            *(maskp + (shift/32)) |= 1 << (shift%32);
396            rqe_p = rqe_p->next;
397        }
398    }
399#else
400    !!!
401#endif
402
403    /*
404     * Setup the iov and send the packet.
405     */
406
407    iov[0].iov_base = (byte_p_t) &hdr;
408    iov[0].iov_len  = RPC_C_DG_RAW_PKT_HDR_SIZE;
409    iov[1].iov_base = (byte_p_t) &body;
410    iov[1].iov_len  = hdr.len;
411
412    if (num_elements > 1)
413    {
414        /*
415         * If we need to send multiple selective ack masks, subtract
416         * the length of the one built into the fack packet from the
417         * second iov element, and setup a third iov element to hold
418         * the extended mask.  Note that in setting the header length,
419         * we need to remember that the fack body size includes the size
420         * of one mask element, thus the subtraction.
421         */
422
423        iov[1].iov_len -= sizeof *maskp;
424        hdr.len = RPC_C_DG_RAW_FACKPKT_BODY_SIZE +
425                  ((num_elements - 1) * sizeof *maskp);
426        iov[2].iov_base = (byte_p_t) maskp;
427        iov[2].iov_len  = num_elements  * sizeof *maskp;
428    }
429
430    rpc__dg_xmit_pkt(call->sock_ref->sock, call->addr, iov, num_elements > 1 ? 3 : 2, &b);
431
432    RPC_DBG_PRINTF(rpc_e_dbg_xmit, 5,
433        ("(rpc__dg_call_xmit_fack) %d.%u ws %d, tsdu %u, fs %u, mask [0x%x]\n",
434        hdr.fragnum, body.serial_num, body.window_size,
435        body.max_tsdu, body.max_frag_size,
436        num_elements > 1 ? *maskp : body.selack[0]));
437
438    /*
439     * Free any memory we may have allocated.
440     */
441
442    if (num_elements > 1)
443    {
444        RPC_MEM_FREE(maskp, RPC_C_MEM_DG_SELACK_MASK);
445    }
446}
447
448/*
 * R P C _ _ D G _ C A L L _ X M I T
450 *
451 * Transmit one blast of packets.
452 *
453 * This routine expects that a blast's worth of packets have been queued
 * and that the fack request frequency has been set.  It is the caller's
455 * responsibility to put the queue into a sensible state, and to make
456 * sure that the last packet is tagged appropriately.
457 */
458
459PRIVATE void rpc__dg_call_xmit
460(
461    rpc_dg_call_p_t call,
462    boolean32 block
463)
464{
465    unsigned32 num_sent, extra_fack = 0;
466    unsigned8 freqs_total;
467    boolean rexmiting = true;
468    rpc_dg_xmitq_p_t xq = &call->xq;
469    unsigned8 blast_size = xq->blast_size;
470    rpc_dg_xmitq_elt_p_t xqe = xq->rexmitq;
471    /*
472     * Below is a table for mapping window sizes to the number of fack
473     * requests per window that would be appropriate for that window.
474     * Inducing an appropriate number of facks per RTT serves two purposes:
475     *
476     *   1) it increases the granularity of the ack clock we use to do
477     *   transmissions, allowing us to decrease the blast size needed to
478     *   keep the congestion window full, and to achieve a better temporal
479     *   spacing of the packets within the RTT.
     *
481     *   2) it provides redundancy, increasing the likelihood that we will
482     *   detect packet loss without having to timeout.
483     *
484     * The trade-off is the increase in backward traffic.  The table below
485     * specifies what we think are the best compromises for a given window
486     * size.  (The macro that follows just helps to hide the details.)
487     * Whenever the advertised window size stored in the xmitq is updated
     * (do_fack_body), this table is consulted for an appropriate
489     * 'outstanding fack' count.  Note that the table is actually declared
490     * in the routine do_fack_body, where it is used.
491     */
492     static unsigned8 window_size_to_freqs_out[] =
493                    {   0, 1, 2, 2, 2,
494                           3, 3, 3, 3,
495                           3, 4, 4, 4,
496                           4, 4, 4, 4 };
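    /*
     * So, for example, an advertised window of 8 maps to 3 outstanding
     * fack requests per RTT, while windows larger than 16 fall back to
     * the window_size / 4 computation below (e.g. 20 -> 5).
     */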
497
498    RPC_DG_CALL_LOCK_ASSERT(call);
499
500    /*
501     * Determine an appropriate number of outstanding fack requests
502     * per RTT.
503     */
504    freqs_total = (call)->xq.window_size > 16 ?
505                               (call)->xq.window_size / 4 :
506                               window_size_to_freqs_out[(call)->xq.window_size];
507
508    /*
     * If we are more than 1 away from our target, set extra_fack to
     * the number of the packet within this blast which should request
     * a fack (the last packet always asks for a fack).  We don't set
512     * this value above 2 because in the case where the sender is slower
513     * than the receiver, it will never be able to fully open the
514     * congestion window and will end up requesting facks on every packet.
515     */
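    /*
     * For instance (illustrative): with blast_size 4 and room for more
     * than one additional outstanding fack request, extra_fack becomes
     * 2, so the 2nd and 4th frags of the blast request facks; with
     * extra_fack left at 0, only the last frag of the blast does.
     */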
516
517    if (freqs_total - xq->freqs_out > 1)
518    {
519        extra_fack = blast_size / 2;
520    }
521
522    /*
523     * Send up to a blast size worth of retransmits/new-transmits.
524     */
525
526    for (num_sent = 1; num_sent <= blast_size; num_sent++)
527    {
528        /*
529         * If we have gotten to the end of the retransmit queue, switch
530         * over to the regular queue.
531         */
532
533        if (xqe == NULL && rexmiting)
534        {
535            xqe = xq->first_unsent;
536            rexmiting = false;
537        }
538
539        /*
540         * If we have gotten to the end of the regular queue, we're done
541         * early.
542         */
543
544        if (xqe == NULL)
545        {
546            RPC_DBG_PRINTF(rpc_e_dbg_xmit, 5,
547                           ("(rpc__dg_call_xmit) Premature end of queue\n"));
548            break;
549        }
550
551        if (RPC_DG_FLAG_IS_SET(xq->base_flags, RPC_C_DG_PF_FRAG))
552        {
553            if (num_sent == extra_fack || num_sent == blast_size)
554            {
555                xqe->flags &= ~ RPC_C_DG_PF_NO_FACK;
556                /*
557                 * Make note of newly outstanding fack request.
558                 */
559                xq->freqs_out++;
560            }
561            else
562                xqe->flags |= RPC_C_DG_PF_NO_FACK;
563        }
564        else
565        {
566            xq->freqs_out++;
567        }
568
569        /*
570         * Mark xqe as part of the current congestion window.
571         */
572
573        xqe->in_cwindow = true;
574
575        rpc__dg_xmitq_elt_xmit(xqe, call, block);
576
577        if (rexmiting)
578        {
579            xqe = xqe->next_rexmit;
580            RPC_DG_STATS_INCR(dups_sent);
581        }
582        else
583        {
584            xqe = xqe->next;
585            xq->first_unsent = xqe;
586        }
587    }
588
589    /*
590     * Bump congestion window count, and adjust high-water mark if
591     * appropriate.
592     */
593
594    xq->cwindow_size += num_sent - 1;
595    if (xq->cwindow_size > xq->high_cwindow)
596       xq->high_cwindow = xq->cwindow_size;
597
598    xq->timestamp = rpc__clock_stamp();
599    xq->rexmitq = NULL;
600    xq->blast_size = 0;
601}
602
603/*
604 * R P C _ _ D G _ C A L L _ X M I T Q _ T I M E R
605 *
606 * Do any time-based retransmissions for a call's transmit queue.
607 */
608
609PRIVATE void rpc__dg_call_xmitq_timer
610(
611    rpc_dg_call_p_t call
612)
613{
614    rpc_dg_xmitq_p_t xq = &call->xq;
615    rpc_dg_xmitq_elt_p_t xqe;
616
617    RPC_DG_CALL_LOCK_ASSERT(call);
618
619    /*
620     * If the call is in an error state there's no need to keep
621     * transmitting things off the xmitq...  with the exception of the
622     * call faulted error where the fault pkt is on the xmitq kinda
623     * masquerading as a response pkt.  The fault response may (depends
624     * if the call is idempotent or not) be required to be "robust" and
625     * receive an ack.
626     */
627    if (call->status != rpc_s_ok
628        && call->status != rpc_s_call_faulted)
629    {
630        return;
631    }
632
633    /*
634     * If there's nothing on the xmitq or we haven't reached the time
635     * to do retransmits for this xmitq, just return now.
636     */
637
638    xqe = xq->head;
639
640    if (xqe == NULL || ! rpc__clock_aged(xq->timestamp, xq->rexmit_timeout))
641    {
642        return;
643    }
644
645    /*
646     * See if we've been waiting too long for an acknowledgement,
647     * and if we have, blow off the call.
648     */
649
650    if (rpc__dg_xmitq_awaiting_ack_tmo(xq, call->com_timeout_knob))
651    {
652        rpc__dg_call_signal_failure(call, rpc_s_comm_failure);
653        return;
654    }
655
656    /*
657     * Transmit the first pkt on the xmitq.
658     *
659     * Note how we blithely ignore window constraints.  If the receiver
660     * has closed the window on us, we want to occasionally elicit acks
661     * so that we'll know when window space opens up.
662     *
663     * Don't "rexmit" a pkt that has not yet been sent.  If the pkt is
664     * the first unsent, then just setting the blast size to 1 (with
665     * an empty rexmitq) will cause START_XMIT to send this pkt and
666     * properly manage the xq pointers.
667     */
668
669    if (xqe != xq->first_unsent)
670    {
671        xq->rexmitq = xqe;
672        xqe->next_rexmit = NULL;
673    }
674
675    /*
676     * Adjust the re-xmit age for the next time (i.e., be a good network
677     * citizen and backoff the next re-xmit).
678     */
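    /*
     * (Each pass doubles the timeout, e.g. t, 2t, 4t, ..., until it
     * reaches RPC_C_DG_MAX_REXMIT_TIMEOUT, where it stays.)
     */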
679
680    xq->rexmit_timeout = MIN(RPC_C_DG_MAX_REXMIT_TIMEOUT, (xq->rexmit_timeout << 1));
681
682    /*
683     * If this packet is considered part of the current congestion
684     * window, remove it.  This may also require decrementing the
685     * count of outstanding facks.
686     */
687
688    if (xqe->in_cwindow)
689    {
690        xq->cwindow_size--;
691        xqe->in_cwindow = false;
692
693        if (! RPC_DG_FLAG_IS_SET(xqe->flags, RPC_C_DG_PF_NO_FACK) ||
694            RPC_DG_FLAG_IS_SET(xqe->flags, RPC_C_DG_PF_LAST_FRAG))
695        {
696            xq->freqs_out--;
697        }
698    }
699
700    xq->blast_size = 1;
701
702    RPC_DBG_PRINTF(rpc_e_dbg_xmit, 4, (
703        "(rpc__dg_call_xmitq_timer) re-xmit'ing %u.%u [%s]\n",
704        xq->hdr.seq, xqe->fragnum, rpc__dg_act_seq_string(&xq->hdr)));
705
706    RPC_DG_START_XMIT(call);
707}
708
709/*
710 * R P C _ _ D G _ C A L L _ I N I T
711 *
712 * Initialize the parts of call handles (rpc_dg_call_t) that are
713 * initialized in the same fashion for both client and server call handles.
714 *
715 * No lock requirements. The call is private at this point.
716 */
717
718PRIVATE void rpc__dg_call_init
719(
720    rpc_dg_call_p_t call
721)
722{
723    rpc_clock_t now = rpc__clock_stamp();
724
725    RPC_DG_CALL_LOCK_INIT(call);
726
727    call->c.protocol_id = RPC_C_PROTOCOL_ID_NCADG;
728
729    call->next                  = NULL;
730    call->state                 = rpc_e_dg_cs_init;
731    call->status                = rpc_s_ok;
732    call->state_timestamp       = now;
733
734    RPC_DG_CALL_COND_INIT(call);
735
736    rpc__dg_xmitq_init(&call->xq);
737    rpc__dg_recvq_init(&call->rq);
738
739    call->sock_ref              = NULL;
740    call->actid_hash            = 0;
741    call->key_info              = NULL;
742    call->auth_epv              = NULL;
743    call->addr                  = NULL;
744    memset(&call->timer, 0, sizeof(call->timer));
745    call->last_rcv_timestamp    = 0;
746    call->start_time            = now;
747    call->high_seq              = 0;
748    call->pkt_chain             = NULL;
749    call->com_timeout_knob      = 0;
750    call->refcnt                = 0;
751    call->n_resvs               = 0;
752    call->n_resvs_wait          = 0;
753    call->max_resvs             = 0;
754
755    call->blocked_in_receive    = false;
756    call->priv_cond_signal      = false;
757    call->stop_timer            = false;
758    call->is_cbk                = false;
759    call->is_in_pkt_chain       = false;
760
761    memset(&call->thread_id, 0, sizeof(call->thread_id));
762}
763
764/*
765 * R P C _ _ D G _ C A L L _ F R E E
766 *
767 * Free the resources referenced by the common call handle header
768 * (rpc_dg_call_t).
769 *
 * This is a low-level routine that should be invoked by the higher
771 * level call handle type specific (i.e. ccall, scall) free routines.
772 * The higher level routines are responsible for freeing resources
773 * associated with the non-common portion of the call handle as well
774 * as freeing the call handle object storage.
775 *
776 * This routine has the side effect of releasing the call's lock (which
 * is only natural since the call structure, including its lock, is
778 * destroyed).
779 */
780
781PRIVATE void rpc__dg_call_free
782(
783    rpc_dg_call_p_t call
784)
785{
786    unsigned32 st;
787
788    RPC_DG_CALL_LOCK_ASSERT(call);
789
790    rpc__naf_addr_free(&call->addr, &st);
791
792    rpc__dg_xmitq_free(&call->xq, call);
793    rpc__dg_recvq_free(&call->rq);
794
795    /*
796     * In the event that this call was using a private socket, it is
797     * important that the socket is released from the call handle only
798     * *after* the xmit/recv queues have been freed.  Every private socket
799     * is allocated a recv/xmit packet pair, which is available for use
800     * by the call.  Since at any given time these packets may be queued
801     * on the call handle, we want to make sure that they are reassociated
802     * with the socket before we lose our reference to it.  Freeing the
803     * queues before the socket ensures that this will happen.
804     */
805    rpc__dg_network_sock_release(&call->sock_ref);
806
807    if (call->key_info)
808        RPC_DG_KEY_RELEASE(call->key_info);
809    CLOBBER_PTR(call->key_info);
810
811    RPC_DG_CALL_UNLOCK(call);
812    RPC_DG_CALL_LOCK_DELETE(call);
813    RPC_DG_CALL_COND_DELETE(call);
814
815    CLOBBER_PTR(call->next);
816    /* common call handle header may no longer be referenced */
817}
818
819/*
820 * R P C _ _ D G _ C A L L _ W A I T
821 *
 * Wait for a change in the call's state using the call's condition
 * variable.  Note, the caller is responsible for determining if the
 * purpose for waiting was satisfied - don't assume anything.
825 *
826 * The 'event' argument indicates if the caller is waiting for a local
827 * event to occur.  Currently there are two such events on which a call
828 * might need to block: 1) waiting for a packet pool reservation to
829 * become available, or 2) waiting for a datagram packet to become
830 * available.
831 *
832 * The local/remote distinction is only meaningful for calls that are
833 * using private sockets.  Such calls wait for local events using the
834 * call's condition variable, and wait for remote events by blocking
835 * in recvfrom.  Calls using a shared socket always wait on a condition
836 * variable.
837 *
838 * Detect pending cancels if we're called with general cancelability
839 * enabled (i.e. user cancels or orphans) and arrange for proper cancel
840 * handling.  This is the central place where others synchronously detect
841 * that a call now has an error associated with it (see
842 * dg_call_signal_failure()).
843 *
844 * This routine *must* be called with the call's mutex held.  This routine
845 * returns with the lock held.
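 *
 * A typical caller (an illustrative sketch only, not lifted from any
 * real caller) already holds the call lock and re-checks its own
 * predicate around the wait:
 *
 *      while (! <condition of interest> && *st == rpc_s_ok)
 *          rpc__dg_call_wait(call, rpc_e_dg_wait_on_network_event, st);
 *
 * passing whichever rpc_dg_wait_event_t value matches the event it is
 * actually waiting for.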
846 */
847
848PRIVATE void rpc__dg_call_wait
849(
850    rpc_dg_call_p_t call,
851    rpc_dg_wait_event_t event,
852    unsigned32 *st
853)
854{
855    boolean is_server = RPC_DG_CALL_IS_SERVER(call);
856    rpc_dg_ccall_p_t ccall = (rpc_dg_ccall_p_t) call;
857
858    /* !!! RPC_UNLOCK_ASSERT(); */
859    RPC_DG_CALL_LOCK_ASSERT(call);
860
861    *st = call->status;
862    if (*st != rpc_s_ok)
863            return;
864
865    /*
866     * Anytime the call handle is unlocked, it is possible for the listener
867     * (or timer) thread to modify its xmitq, and signal it to do a send.
868     * If the signal occurs during some window in which the call handle
869     * is unlocked, we'll miss it.  Therefore, check here, before
870     * waiting(), to see if there is any data to transmit.  (See bottom
871     * of routine for more information about transmissions.)
872     *
873     * If we do transmit data, return immediately.  In the normal case
874     * the caller is waiting for queue space to open up on the xmitq,
875     * which will have happened by calling call_xmit().
876     */
877
878    if (RPC_DG_CALL_READY_TO_SEND(call))
879    {
880        rpc__dg_call_xmit(call, true);
881        return;
882    }
883
884    /*
885     * If this is a client call handle make sure that we haven't been
886     * asked to run a callback prior to waiting.  If we detect a local
887     * cancel, process it.
888     */
889
890    if ((! is_server && ! ccall->cbk_start) || is_server)
891    {
892        /*
893         * If the call is using a private socket, and is waiting
894         * for a network event, let it wait in recvfrom.
895         */
896        if (call->sock_ref->is_private &&
897            event == rpc_e_dg_wait_on_network_event)
898        {
899            RPC_DBG_PRINTF(rpc_e_dbg_dg_sockets, 5, (
900                    "(rpc__dg_call_wait) waiting on network event\n"));
901
902            rpc__dg_network_select_dispatch(ccall->c.sock_ref->sock,
903                (dce_pointer_t) ccall->c.sock_ref, (boolean32) true, st);
904        }
905        else
906        {
907
908#ifndef _PTHREAD_NO_CANCEL_SUPPORT
909            DCETHREAD_TRY {
910                RPC_DG_CALL_COND_WAIT(call);
911            } DCETHREAD_CATCH(dcethread_interrupt_e) {
912                rpc__dg_call_local_cancel(call);
913            } DCETHREAD_CATCH_ALL(THIS_CATCH) {
914                rpc__dg_call_signal_failure(call,
915                           (unsigned32) -1 /* !!! rpc_s_unknown_exception */);
916            } DCETHREAD_ENDTRY
917#else
918            /*
             * We certainly can't try to detect a non-existent cancel;
920             * just wait (and avoid the TRY/CATCH overhead).
921             */
922            RPC_DG_CALL_COND_WAIT(call);
923#endif
924        }
925
926        *st = call->status;
927        if (*st != rpc_s_ok)
928            return;
929    }
930
931    /*
932     * If this is a client call handle and we've been asked to do a
933     * callback, now's the time to do it.  While the callback is running,
934     * we must unlock the original CCALL.  Additionally, we lock and bump
     * the cbk_scall's reference count (mimicking the normal processing
936     * of handing off a scall reference to the call executor).
937     *
938     * To provide the same server call executor thread environment
939     * as a 'real' server call executor, disable / restore cancelability
940     * while in the server call processing (see rpc__dg_call_local_cancel()).
941     */
942
943    if (! is_server)
944    {
945        if (ccall->cbk_start)
946        {
947            int oc;
948
949            assert(ccall->cbk_scall != NULL);
950            ccall->cbk_start = false;
951
952            RPC_DG_CALL_LOCK(&ccall->cbk_scall->c);
953            RPC_DG_CALL_REFERENCE(&ccall->cbk_scall->c);
954            ccall->cbk_scall->has_call_executor_ref = true;
955
956            RPC_DG_CALL_UNLOCK(&ccall->c);
957
958            oc = dcethread_enableinterrupt_throw(0);
959            rpc__dg_execute_call((dce_pointer_t) &ccall->cbk_scall->c, false);
960            dcethread_enableinterrupt_throw(oc);
961
962            RPC_DG_CALL_LOCK(&ccall->c);
963        }
964    }
965
966    /*
967     * If there is data to transmit (or retransmit), send it now.  Transmissions
968     * have been moved into this routine based on the following observations.
969     *
970     *      - The ability to transmit (open congestion window space)
971     *        occurs asynchronously to the execution of the thread.  A
972     *        transmitting thread is only interested in queueing data,
973     *        it doesn't care/know about the actual (re)transmission.
     *        Threads do not wait on the ability to transmit directly.
975     *
976     *      - When cwindow space opens up, the thread is signalled to
977     *        do the send.  The thread will have been waiting on a
978     *        different condition (probably space on the xmitq) but will
979     *        need to check first if there is data that needs to be sent.
980     *
981     *      - Each time a thread needs to wait for any condition (e.g.
982     *        a packet to become available) it must also check that it
983     *        wasn't woken to do a send.
984     *
985     *      - Whenever a thread is awoken to do a send, it *always* wants
986     *        to do the send immediately, before checking on whatever
987     *        other condition it was waiting on.
988     *
989     *      - It is error prone to require each caller of call_wait to
990     *        perform its own check for transmissable data, as well as
991     *        whatever condition it was really interested in.
992     */
993
994    if (RPC_DG_CALL_READY_TO_SEND(call))
995        rpc__dg_call_xmit(call, true);
996}
997
998/*
999 * R P C _ _ D G _ C A L L _ S I G N A L
1000 *
1001 * Signal a change in the call's state.
1002 */
1003
1004PRIVATE void rpc__dg_call_signal
1005(
1006    rpc_dg_call_p_t call
1007)
1008{
1009    RPC_DG_CALL_LOCK_ASSERT(call);
1010
1011    /*
1012     * If the call is using a private socket, and is currently blocked in
1013     * a call to recvfrom, we'll need to post a cancel against its thread
1014     * in order to wake it.  Otherwise, just signal the call's condition
1015     * variable.
1016     */
1017    if (call->sock_ref->is_private && call->blocked_in_receive)
1018    {
1019        RPC_DBG_PRINTF(rpc_e_dbg_dg_sockets, 5, (
1020                    "(rpc__dg_call_signal) cancelling private socket thread\n"));
1021
1022        call->priv_cond_signal = true;
1023        dcethread_interrupt_throw(call->thread_id);
1024    }
1025    else
1026    {
1027        /*
1028         * normally (always?) at most one waiter will be present
1029         */
1030        RPC_DG_CALL_COND_SIGNAL(call);
1031    }
1032}
1033
1034/*
1035 * R P C _ _ D G _ C A L L _ S I G N A L _ F A I L U R E
1036 *
1037 * Signal a change in the call's state due to a failure.
1038 *
 * The idea is that dg_call_signal_failure() and dg_call_wait()
 * are the only routines that reference the call's error status
1041 * field.  Additionally, this field's value is what will be returned to
1042 * the stubs.  Retain the 1st failure status.
1043 */
1044
1045PRIVATE void rpc__dg_call_signal_failure
1046(
1047    rpc_dg_call_p_t call,
1048    unsigned32 stcode
1049)
1050{
1051    RPC_DG_CALL_LOCK_ASSERT(call);
1052
1053    RPC_DBG_GPRINTF(
1054        ("(rpc__dg_call_signal_failure) %s st = 0x%x (orig st = 0x%x) [%s]\n",
1055        RPC_DG_CALL_IS_SERVER(call) ? "SCALL" : "CCALL",
1056        stcode, call->status, rpc__dg_act_seq_string(&call->xq.hdr)));
1057
1058    if (call->status == rpc_s_ok)
1059        call->status = stcode;
1060
1061    rpc__dg_call_signal(call);
1062}
1063
1064/*
1065 * R P C _ _ D G _ C A L L _ X M I T Q _ P U S H
1066 *
 * Push out any queued transmit data.
1068 */
1069
1070PRIVATE void rpc__dg_call_xmitq_push
1071(
1072    rpc_dg_call_p_t call,
1073    unsigned32 *st
1074)
1075{
1076    rpc_dg_xmitq_p_t xq = &call->xq;
1077
1078    RPC_DG_CALL_LOCK_ASSERT(call);
1079
1080    *st = rpc_s_ok;
1081
1082    /*
1083     * Normally, we want to loop in here while the (re)transmission logic
1084     * sends whatever data remains on the xmitq.  In the case where the
1085     * queue is quiescent, this requires a little bootstrapping.  This
1086     * will be the case if either 1) no data has been sent from this
     * xmitq yet, or 2) everything on the queue was already facked and
1088     * the queue is now empty.  In either case, we can't wait for a fack
1089     * or a timeout before doing the next transmit.  Both cases are
1090     * identified by a NULL xq->head, and are handled by setting the
     * blast size to 1 so that the partial packet gets sent immediately.
1092     */
1093
1094    if (xq->head == NULL)
1095        xq->blast_size = 1;
1096    /*
1097     * If the queue is not empty and has no outstanding fack request,
1098     * send two fragments.
1099     *
1100     * call_transmit_int() does not start the window until the two
1101     * fragments are filled. Thus, we could have one full and one
     * partial fragment here.
1103     */
1104    else if (xq->freqs_out == 0)
1105        xq->blast_size = 2;
1106
1107    /*
1108     * Turn around the connection and force (and wait for) all queued
1109     * up packets to be transmitted; the listener handles retransmits.
1110     * If the call has an error condition we're done.
1111     */
1112
1113    xq->push = true;
1114
1115    /*
1116     * Ensure that a final partial packet gets added to the queue.
1117     * Note that this call must follow the setting of the push flag
1118     * so that append_pp knows whether to set the frag bit in the
1119     * packet header.
1120     */
1121
1122    rpc__dg_xmitq_append_pp(call, st);
1123    if (*st != rpc_s_ok)
1124        return;
1125
1126    while (true)
1127    {
1128        if (RPC_DG_CALL_READY_TO_SEND(call))
1129            rpc__dg_call_xmit(call, true);
1130        if (xq->first_unsent == NULL)
1131            break;
1132        rpc__dg_call_wait(call, rpc_e_dg_wait_on_network_event, st);
1133        if (*st != rpc_s_ok)
1134            return;
1135    }
1136}
1137
1138/*
1139 * R P C _ _ D G _ C A L L _ R E C V Q _ I N S E R T
1140 *
1141 * Insert a receive queue element (packet) into a receive queue in the
1142 * right place.  Drop duplicate packets and generate a "fack" if one
1143 * is requested (even for duplicates).  Return "true" iff we actually
 * add the packet to the receive queue.  The "wake_thread" output param
 * is set to "true" iff the receiver thread should be awakened (e.g. the
 * stream is complete or enough in-order data has been queued).
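 *
 * An illustrative (hypothetical) listener-side sketch of how the two
 * outputs might be consumed:
 *
 *      boolean wake;
 *
 *      if (! rpc__dg_call_recvq_insert(call, rqe, &wake))
 *          ... the rqe was not queued; the caller still owns it ...
 *      if (wake)
 *          rpc__dg_call_signal(call);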
1147 */
1148
1149PRIVATE boolean rpc__dg_call_recvq_insert
1150(
1151    rpc_dg_call_p_t call,
1152    rpc_dg_recvq_elt_p_t rqe,
1153    boolean *wake_thread
1154)
1155{
1156    rpc_dg_recvq_p_t rq = &call->rq;
1157    rpc_dg_recvq_elt_p_t scan_rqe, prev_scan_rqe;
1158    unsigned16 fragnum = rqe->hdrp->fragnum;
1159    unsigned16 next_fragnum = rq->next_fragnum;
1160    unsigned16 curr_serial = (rqe->hdrp->serial_hi << 8) | rqe->hdrp->serial_lo;
1161    boolean added_to_queue;
1162
1163    RPC_DG_CALL_LOCK_ASSERT(call);
1164
1165    added_to_queue = true;
1166    *wake_thread = false;
1167
1168    /*
1169     * We could rework things to lessen the processing for the single
1170     * pkt stream case, but at this point this "additional" overhead's
1171     * performance impact has to be a second order effect...
1172     */
1173
1174    /*
1175     * Remember if we're recving frags.  To insulate us from potentially
1176     * bogus transmitters, once we're recving frags we stay that way.
1177     * It's cheaper and more effective to do this than to "assert" it.
1178     */
1179    rq->recving_frags |= RPC_DG_HDR_FLAG_IS_SET(rqe->hdrp, RPC_C_DG_PF_FRAG);
1180
1181    /*
1182     * The receiver needs to keep track of the highest serial number it
1183     * has seen so far, just in case it ever needs to send a fack but
1184     * isn't holding onto an rqe at the time.
1185     */
1186    if (RPC_DG_SERIAL_IS_LT(rq->high_serial_num, curr_serial))
1187        rq->high_serial_num = curr_serial;
1188
1189    /*
1190     * The receiver needs to keep track of the largest fragment size it
1191     * has seen so far, so that when it becomes the sender it can start
1192     * sending that much data.
1193     */
1194
1195    if (rqe->frag_len > rq->high_rcv_frag_size) {
1196        RPC_DBG_PRINTF(rpc_e_dbg_recv, 7,
1197                ("(rpc__dg_call_recvq_insert) Set high_rcv_frag %lu was %lu\n",
1198                        rqe->frag_len, rq->high_rcv_frag_size));
1199        rq->high_rcv_frag_size = rqe->frag_len;
1200
1201        /*
1202         * If high_rcv_frag_size > RPC_C_DG_MUST_RECV_FRAG_SIZE, that means
1203         * we have advertised MBF and the sender can cope with it. We need
1204         * to adjust the max_queue_len, if possible.
1205         *
         * Note: MBF (Multi-Buffer Fragments) implies that the call has
         * more than one packet reservation. Also if rq->max_queue_len !=
1208         * RPC_C_DG_MAX_RECVQ_LEN, then the max_queue_len is already
1209         * adjusted and no need for re-adjustment (because the packet
1210         * reservation gets adjusted only once).
1211         *
1212         * LBF (Large-Buffer Fragment) and the fragment size kept between
1213         * calls means that the first request pdu could be greater than
1214         * RPC_C_DG_MUST_RECV_FRAG_SIZE.
1215         */
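        /*
         * E.g. (illustrative, assuming RPC_C_DG_MAX_RECVQ_LEN were 96):
         * a call holding 4 packet reservations would have max_queue_len
         * trimmed to 96 / 4 == 24 fragments.
         */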
1216        if (call->n_resvs > 0
1217            && rq->high_rcv_frag_size > RPC_C_DG_MUST_RECV_FRAG_SIZE
1218            && rq->max_queue_len == RPC_C_DG_MAX_RECVQ_LEN)
1219        {
1220            unsigned32 max_queue_len =
1221                RPC_C_DG_MAX_RECVQ_LEN / MIN(call->n_resvs, call->max_resvs);
1222
1223            /*
1224             * Update rq->max_queue_len.
1225             *
1226             * If the current queue_len already exceeds the new
1227             * max_queue_len, we can't adjust it here.
1228             *
1229             * Also, we must have at least one inorder fragment. Otherwise
1230             * nothing gets dequeued.
1231             */
1232            if (rq->queue_len <= max_queue_len
1233                && rq->inorder_len > 0)
1234            {
1235                rq->max_queue_len = max_queue_len;
1236                RPC_DBG_PRINTF(rpc_e_dbg_recv, 7,
1237                       ("(rpc__dg_call_recvq_insert) Set max_queue_len %u\n",
1238                                rq->max_queue_len));
1239            }
1240        }
1241    }
1242
1243    /*
1244     * Scalls without packet pool reservations are not allowed to queue
1245     * packets.  The two cases in which we encounter this condition are
1246     * for queued calls, and scalls that were started when no reservations
1247     * were available.
1248     *
1249     * The callback scall should be allowed to queue data without the
1250     * reservation if using the private socket. However, since the
1251     * callback is not officially supported (and the scall doesn't use
1252     * the private socket), we make no distinction. (Trying a little
1253     * performance recovery...)
1254     *
1255     * Note that it is ok for a ccall to queue data before it has made
1256     * a packet reservation, since ccalls may have private packets
1257     * associated with their socket refs.
1258     */
1259
1260    if (RPC_DG_CALL_IS_SERVER(call) && call->n_resvs == 0)
1261    {
1262        RPC_DBG_PRINTF(rpc_e_dbg_recv, 6, (
1263            "(rpc__dg_call_recvq_insert) dropping queued/un-setup call's pkt\n"));
1264
1265        added_to_queue = false;
1266    }
1267    else if (fragnum == next_fragnum)
1268    {
1269        /*
1270         * This is the fast path.  First make sure that the recvq does not
1271         * already contain the max allowable packets; if so, we can't accept
1272         * this one.
1273         */
1274
1275        if (rq->queue_len == rq->max_queue_len)
1276        {
1277            added_to_queue = false;
1278        }
1279
1280        /*
1281         * If we are rationing packets, we can only accept a packet if
1282         * it is the next immediately usable packet for this call.  This
1283         * will be the case if the current packet will become the new
1284         * head of this call's recvq.
1285         *
1286         * Another check needs to be made to make sure that the call
1287         * executor call isn't holding onto a packet (in dg_execute_call)
1288         * before the call's WAY has been done.  The call thread will set
1289         * a flag in the recvq to indicate that it's done with its WAY
1290         * processing.
1291         */
1292        else if (rpc__dg_pkt_is_rationing(NULL) &&
1293                 (rq->last_inorder != NULL ||
1294                                       (! rq->is_way_validated && fragnum != 0)))
1295        {
1296            RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 1,
1297                ("(rpc__dg_call_recvq_insert) rationing, not 'next' pkt, fc %u pkt %u [%s]\n",
1298                rpc_g_dg_pkt_pool.free_count, rpc_g_dg_pkt_pool.pkt_count,
1299                rpc__dg_act_seq_string(&call->xq.hdr)));
1300
1301            added_to_queue = false;
1302        }
1303
1304        /*
1305         * Otherwise, it's the next in-order packet and we add it to
1306         * the queue.  If there are currently no in-order pkts, we know
1307         * that this pkt will be inserted at the head of the queue and
1308         * make the head in-order.  Otherwise, there are already in-order
1309         * fragments on the queue (hence the head is already in-order).
1310         * We insert this frag after the "last_inorder" frag and scan
1311         * to locate the new "last_inorder" frag.  In both cases, the
         * "last_inorder" and "next_fragnum" are updated appropriately.
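         *
         * For example (illustrative): if next_fragnum is 3 and the queue
         * already holds out-of-order frags 4 and 6, queuing frag 3 makes
         * 3 and 4 in-order; "last_inorder" ends up pointing at frag 4 and
         * next_fragnum advances to 5.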
1313         */
1314
1315        else
1316        {
1317            if (rq->last_inorder == NULL)
1318            {
1319                rqe->next = rq->head;
1320                rq->head = rqe;
1321                rq->head_fragnum = fragnum;
1322            }
1323            else
1324            {
1325                rqe->next = rq->last_inorder->next;
1326                rq->last_inorder->next = rqe;
1327            }
1328
1329            prev_scan_rqe = rqe;
1330            scan_rqe = rqe->next;
1331            next_fragnum++;
1332            rq->inorder_len++;
1333
1334            while (scan_rqe != NULL && scan_rqe->hdrp->fragnum == next_fragnum)
1335            {
1336                prev_scan_rqe = scan_rqe;
1337                scan_rqe = scan_rqe->next;
1338                next_fragnum++;
1339                rq->inorder_len++;
1340            }
1341
1342            rq->last_inorder = prev_scan_rqe;
1343            rq->next_fragnum = next_fragnum;
1344        }
1345    }
1346    else  if (RPC_DG_FRAGNUM_IS_LT(fragnum, next_fragnum))
1347    {
1348        /*
1349         * This is an old fragment and we don't add it to the queue.
1350         */
1351
1352        added_to_queue = false;
1353        RPC_DG_STATS_INCR(dups_rcvd);
1354    }
1355    else
1356    {
1357        RPC_DBG_PRINTF(rpc_e_dbg_recv, 6, (
1358            "(rpc__dg_call_recvq_insert) recv out of order %u.%u, next_fragnum = .%u\n",
1359            rqe->hdrp->seq, fragnum, next_fragnum));
1360
1361        RPC_DG_STATS_INCR(oo_rcvd);
1362
1363        /*
         * This is not an in-order fragment (i.e. the head cannot now
1365         * become in-order nor can the "last_inorder" pointer require
1366         * updating); it may also be a duplicate.
1367         *
1368         * If we're currently rationing packets, then we must drop this
1369         * one.
1370         */
1371
1372        if (rpc__dg_pkt_is_rationing(NULL))
1373        {
1374            RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 1,
1375                ("(rpc__dg_call_recvq_insert) rationing, oo pkt, fc %u pkt %u [%s]\n",
1376                rpc_g_dg_pkt_pool.free_count, rpc_g_dg_pkt_pool.pkt_count,
1377                rpc__dg_act_seq_string(&call->xq.hdr)));
1378
1379            added_to_queue = false;
1380        }
1381
1382        /*
         * Don't accept the packet if it is more than max_queue_len
         * away from the first packet (to be) queued.  This check avoids the
1385         * possibility of having a queue of max length (which can't be
1386         * added to), but which doesn't contain the next awaited packet.
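         *
         * E.g. (illustrative): with max_queue_len 8 and next_fragnum 10,
         * an out-of-order fragment numbered 18 or above is dropped here.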
1387         */
1388
1389        else if ((rq->head != NULL &&
1390                  RPC_DG_FRAGNUM_IS_LT(rq->head_fragnum, next_fragnum) &&
1391                  RPC_DG_FRAGNUM_IS_LTE(rq->head_fragnum +
1392                                        rq->max_queue_len, fragnum))
1393                 || RPC_DG_FRAGNUM_IS_LTE(next_fragnum +
1394                                          rq->max_queue_len, fragnum))
1395        {
1396            added_to_queue = false;
1397        }
1398
1399        /*
1400         * If the queue is empty, the frag becomes the head.  Otherwise,
1401         * we must scan the queue for the proper insertion point and
1402         * ensure this isn't a duplicate.  If there are no in-order
1403         * fragments, the scan starts at the head of the queue.  If there
1404         * ARE in-order frags, the scan starts at the first frag after
1405         * the last in-order frag.
1406         */
1407
1408        else
1409        {
1410            if (rq->head == NULL)
1411            {
1412                rq->head = rqe;
1413                rq->head_fragnum = fragnum;
1414            }
1415            else
1416            {
1417                if (rq->last_inorder == NULL)
1418                {
1419                    scan_rqe = rq->head;
1420                    prev_scan_rqe = NULL;
1421                }
1422                else
1423                {
1424                    scan_rqe = rq->last_inorder->next;
1425                    prev_scan_rqe = rq->last_inorder;
1426                }
1427
1428                while (scan_rqe != NULL &&
1429                       RPC_DG_FRAGNUM_IS_LT(scan_rqe->hdrp->fragnum, fragnum))
1430                {
1431                    prev_scan_rqe = scan_rqe;
1432                    scan_rqe = scan_rqe->next;
1433                }
1434
1435                if (scan_rqe != NULL && fragnum == scan_rqe->hdrp->fragnum)
1436                {
1437                    added_to_queue = false;
1438                }
1439                else
1440                {
1441                    rqe->next = scan_rqe;
1442
1443                    /*
1444                     * Before setting the pointer in the up-stream packet,
1445                     * check to see if we just pointed this packet at the
1446                     * previous head of the list.  If so, this packet becomes
1447                     * the new head.
1448                     */
1449
1450                    if (scan_rqe == rq->head)
1451                    {
1452                        rq->head = rqe;
1453                        rq->head_fragnum = fragnum;
1454                    }
1455                    else {
1456                        assert(prev_scan_rqe != NULL);
1457                        prev_scan_rqe->next = rqe;
1458                    }
1459                }
1460            }
1461        }
1462    }
1463
1464    /*
1465     * We *always* fall through to this common processing.
1466     */
1467
1468    /*
1469     * if we're accepting the packet, update some queue state.
1470     */
1471    if (added_to_queue)
1472    {
1473        rq->queue_len++;
1474
1475        /*
1476         * So that the selective ack code can easily determine the length
1477         * of the mask needed for a fack body, keep track of the highest
1478         * numbered fragment we have on the queue.
1479         */
1480        if (RPC_DG_FRAGNUM_IS_LT(rq->high_fragnum, fragnum))
1481        {
1482            rq->high_fragnum = fragnum;
1483        }
1484
1485        /*
1486         * The data stream is complete when our queue has inorder data
1487         * and it's not a fragmented stream, or if the last inorder
1488         * fragment has the last_frag bit set.
1489         *
1490         * If rq->last_inorder->hdrp == NULL (can happen iff last_inorder ==
1491         * head), we must have added the out of order fragment, that means
1492         * the data stream is incomplete.
1493         */
1494        if (rq->last_inorder != NULL &&
1495            (! rq->recving_frags ||
1496             (rq->last_inorder->hdrp != NULL &&
1497              RPC_DG_HDR_FLAG_IS_SET(rq->last_inorder->hdrp,
1498                                     RPC_C_DG_PF_LAST_FRAG))))
1499        {
1500            rq->all_pkts_recvd = true;
1501        }
1502    }
1503
1504    RPC_DBG_PRINTF(rpc_e_dbg_recv, 7,
1505        ("(rpc__dg_call_recvq_insert) recv %s %u.%u.%u len=%d %s\n",
1506        rpc__dg_pkt_name(RPC_DG_HDR_INQ_PTYPE(rqe->hdrp)),
1507        rqe->hdrp->seq,
1508        fragnum, (rqe->hdrp->serial_hi << 8) | rqe->hdrp->serial_lo,
1509        rqe->hdrp->len,
1510        (rq->recving_frags &&
1511            ! RPC_DG_HDR_FLAG_IS_SET(rqe->hdrp, RPC_C_DG_PF_NO_FACK)) ?
1512                "frq" : ""));
1513    RPC_DBG_PRINTF(rpc_e_dbg_recv, 7,
1514        ("(rpc__dg_call_recvq_insert) recv frag_len %lu\n", rqe->frag_len));
1515
1516    /*
1517     * If we've got a fragmented receive stream consider sending a fack.
1518     * Send a fack if the receive queue is full or if the sender asked
1519     * for a fack.  To avoid confusing pre-v2 code, don't send a fack
1520     * when the stream is complete.
1521     */
1522    if (rq->recving_frags
1523        && ! rq->all_pkts_recvd
1524        && (rq->queue_len == rq->max_queue_len ||
1525            ! RPC_DG_HDR_FLAG_IS_SET(rqe->hdrp, RPC_C_DG_PF_NO_FACK)))
1526    {
1527        RPC_DBG_PRINTF(rpc_e_dbg_recv, 6, (
1528            "(rpc__dg_call_recvq_insert) recv data %u.%u frq, fack --> .%u\n",
1529            rqe->hdrp->seq, fragnum, rq->next_fragnum - 1));
1530        rpc__dg_call_xmit_fack(call, rqe, false);
1531    }

    /*
     * If the call has a private socket, then it was the call thread
     * itself that called us.  No need to signal it.
     */
    if (call->sock_ref->is_private)
        return (added_to_queue);

    /*
     * It is appropriate to awaken a receiver when:
     *      Their receive stream is complete
     *              -OR-
     *      The recvq is full (the head should be inorder)
     *              -OR-
     *      There are a sufficient number of inorder RQEs
     *              -OR-
     *      We're rationing, and the thread has a usable packet.
     *
     * Note: We estimate # of inorder bytes based on high_rcv_frag_size.
     */
    if (rq->all_pkts_recvd ||
        rq->queue_len == rq->max_queue_len ||
        rq->high_rcv_frag_size * rq->inorder_len >= rq->wake_thread_qsize ||
        (rpc__dg_pkt_is_rationing(NULL) && rq->inorder_len > 0))
        *wake_thread = true;
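
    /*
     * Worked example for the byte estimate above (all numbers are
     * hypothetical): with high_rcv_frag_size == 1464 and
     * wake_thread_qsize == 4096, three in-order RQEs give
     * 3 * 1464 == 4392 >= 4096, so the receiver is awakened even
     * though the stream may still be incomplete.
     */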

    return(added_to_queue);
}
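
/*
 * Illustrative sketch (not part of the original code) of how a caller
 * on the listener side might use the results of the routine above; the
 * helper names are assumptions based on the rest of this module:
 *
 *      boolean wake_thread = false;
 *
 *      if (rpc__dg_call_recvq_insert(call, rqe, &wake_thread))
 *      {
 *          if (wake_thread)
 *              rpc__dg_call_signal(call);      (wake the blocked call thread)
 *      }
 *      else
 *      {
 *          rpc__dg_pkt_free_rqe(rqe, call);    (rejected; release the RQE)
 *      }
 */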

/*
 * R P C _ _ D G _ C A L L _ L O C A L _ C A N C E L
 *
 * A local cancel has been detected (i.e. a cancelable operation
 * caused an unwind).  What needs to be done depends on the type
 * of call handle.  Callbacks complicate matters - as always :-)
 * The actual code is pretty simple; it's the case analysis that's
 * a bear; presumably, this 'documentation' will be useful.
 */

PRIVATE void rpc__dg_call_local_cancel
(
    rpc_dg_call_p_t call
)
{
    RPC_DG_CALL_LOCK_ASSERT(call);

    if (RPC_DG_CALL_IS_CLIENT(call))
    {
        rpc_dg_ccall_p_t ccall = (rpc_dg_ccall_p_t) call;

        /*
         * All locally detected cancels while dealing with a ccall are
         * forwarded to the associated server.  This is because any such
         * cancel *was* generated locally.  The original client will
         * only accept a forwarded cancel request (resulting in a remotely
         * generated cancel) while a callback is in progress.  The callback
         * will flush any pending cancels when it completes.
         *
         * If the call has yet to establish a cancel timeout, do so now.
         */
        ccall->cancel.local_count++;
        rpc__dg_ccall_setup_cancel_tmo(ccall);

        RPC_DBG_PRINTF(rpc_e_dbg_cancel, 10,
                ("(rpc__dg_call_local_cancel) ccall fwd [%s]\n",
                rpc__dg_act_seq_string(&ccall->c.xq.hdr)));

        rpc__dg_ccall_xmit_cancel_quit(ccall, ccall->cancel.local_count);
    }
    else
    {
        rpc_dg_scall_p_t scall = (rpc_dg_scall_p_t) call;

        /*
         * If we're handling a non-callback scall (i.e. a normal scall)
         * we don't (typically) expect to see local cancels since the
         * scall execution thread (typically) executes with cancel delivery
         * disabled; the sstub enables delivery only while in the manager.
         *
         * If we have detected a local cancel, then someone enabled
         * cancel delivery while calling back into the runtime.  This
         * could happen because (a) the sstub was trying to implement a
         * cancelable-prologue mode (though NIDL doesn't support this at
         * the time this was written); (b) a manager called a pipe handler
         * without disabling cancel delivery (presumably because it wanted
         * a cancel to be able to terminate the call); or (c) there's a
         * runtime or sstub bug :-)
         *
         * Such a local cancel was presumably generated by the reception
         * of a cancel-request.  However, some local manager could have
         * (illegally?) posted a cancel to this call execution thread.
         * Lastly, the local cancel may have originated due to local
         * orphan call processing.
         *
         * In any case, the effect is the same.  Local cancel detection while
         * processing a normal scall results in a call failure.
         */
        if (!scall->c.is_cbk)
        {
            RPC_DBG_GPRINTF(
                    ("(rpc__dg_call_local_cancel) scall failure [%s]\n",
                    rpc__dg_act_seq_string(&scall->c.xq.hdr)));

            rpc__dg_call_signal_failure(&scall->c, rpc_s_call_cancelled);
        }
        else

        /*
         * We're handling a callback scall and we've detected a local cancel.
         *
         * Callbacks and cancels are a very poor / ugly story.  Basically
         * (given the current client / server RPC cancel design) callbacks
         * and cancels can only work properly when clients (both the original
         * and the callback-originating manager) are executing with cancel-on.
         * Other scenarios result in improper handling of an RPC cancel.
         *
         * If the original client RPC was being made with cancel delivery
         * disabled, then the callback's sstub forcibly enabling cancel
         * delivery while in the callback manager has already broken
         * the cancel model.  A cancel against the original client thread
         * was NOT supposed to be seen.
         *
         * Another case where callbacks and cancels don't work is if
         * the client RPC is being performed cancel-on and the manager
         * decides to perform a callback with cancel-off.  Again, a cancel
         * against the original thread can result in terminating the
         * callback manager (which wasn't supposed to see a cancel).
         *
         * Since the only callback cancel case that can work correctly is
         * when all components of the original call are being made with
         * cancel delivery enabled, that's the only case we'll try to make
         * work.  Here's what we do:
         *      If we detected a local cancel for a cbk_scall
         *          AND the cbk_scall had received a forwarded
         *          cancel-request
         *      then we treat this just like the normal scall and
         *          fail the cbk_scall
         *      otherwise, the cancel must have been a result
         *          of a dcethread_interrupt_throw() against the original
         *          client and the cancel should be treated just like
         *          the normal ccall; forward the cancel to the server
         */
        {
            if (scall->c.c.u.server.cancel.count > 0)
            {
                /*
                 * The callback accepted a cancel-request.
                 */
                RPC_DBG_GPRINTF(
                    ("(rpc__dg_call_local_cancel) cbk_scall failure [%s]\n",
                    rpc__dg_act_seq_string(&scall->c.xq.hdr)));

                rpc__dg_call_signal_failure(&scall->c, rpc_s_call_cancelled);
            }
            else
            {
                /*
                 * Locally generated cancel - forward it to the originator
                 * (the server) of this callback (as above).
                 *
                 * The callback's cbk_ccall is the one where the info
                 * must be updated.  Unfortunately there's a locking
                 * hierarchy violation that we have to deal with.  In
                 * the case that we can't get the lock, just repost the
                 * cancel to the thread so we don't lose it (we will
                 * try to handle it again the next time the thread calls
                 * a cancelable operation).
                 */

                rpc_dg_ccall_p_t ccall = scall->cbk_ccall;
                boolean b;

                RPC_DG_CALL_TRY_LOCK(&ccall->c, &b);
                if (!b)
                {
                    RPC_DBG_PRINTF(rpc_e_dbg_cancel, 3,
                        ("(rpc__dg_call_local_cancel) cbk_scall can't get ccall lock [%s]\n",
                        rpc__dg_act_seq_string(&scall->c.xq.hdr)));

                    dcethread_interrupt_throw(dcethread_self());
                }
                else
                {
                    ccall->cancel.local_count++;
                    rpc__dg_ccall_setup_cancel_tmo(ccall);

                    RPC_DBG_PRINTF(rpc_e_dbg_cancel, 10,
                        ("(rpc__dg_call_local_cancel) cbk_scall ccall fwd [%s]\n",
                        rpc__dg_act_seq_string(&ccall->c.xq.hdr)));

                    rpc__dg_ccall_xmit_cancel_quit(ccall,
                            ccall->cancel.local_count);
                    RPC_DG_CALL_UNLOCK(&ccall->c);
                }
            }
        }
    }
}
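
/*
 * Informal summary of the case analysis in rpc__dg_call_local_cancel
 * (a convenience table, not part of the original comments):
 *
 *   handle type     condition                          action
 *   -----------     ---------                          ------
 *   ccall           any local cancel                   forward a cancel-request to the server
 *   normal scall    any local cancel                   fail the call (rpc_s_call_cancelled)
 *   cbk_scall       a forwarded cancel-request was     fail the call (rpc_s_call_cancelled)
 *                   already accepted
 *   cbk_scall       otherwise (locally generated)      forward via the cbk_ccall; if its lock
 *                                                      can't be taken, repost the cancel to
 *                                                      the thread and retry later
 */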