1/*
2 * Copyright (c) 2010 Apple Inc. All rights reserved.
3 *
4 * @APPLE_LICENSE_HEADER_START@
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * 1.  Redistributions of source code must retain the above copyright
11 *     notice, this list of conditions and the following disclaimer.
12 * 2.  Redistributions in binary form must reproduce the above copyright
13 *     notice, this list of conditions and the following disclaimer in the
14 *     documentation and/or other materials provided with the distribution.
15 * 3.  Neither the name of Apple Inc. ("Apple") nor the names of its
16 *     contributors may be used to endorse or promote products derived from
17 *     this software without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY APPLE AND ITS CONTRIBUTORS "AS IS" AND ANY
20 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 * DISCLAIMED. IN NO EVENT SHALL APPLE OR ITS CONTRIBUTORS BE LIABLE FOR ANY
23 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
28 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 *
30 * Portions of this software have been released under the following terms:
31 *
32 * (c) Copyright 1989-1993 OPEN SOFTWARE FOUNDATION, INC.
33 * (c) Copyright 1989-1993 HEWLETT-PACKARD COMPANY
34 * (c) Copyright 1989-1993 DIGITAL EQUIPMENT CORPORATION
35 *
36 * To anyone who acknowledges that this file is provided "AS IS"
37 * without any express or implied warranty:
38 * permission to use, copy, modify, and distribute this file for any
39 * purpose is hereby granted without fee, provided that the above
40 * copyright notices and this notice appears in all source code copies,
41 * and that none of the names of Open Software Foundation, Inc., Hewlett-
42 * Packard Company or Digital Equipment Corporation be used
43 * in advertising or publicity pertaining to distribution of the software
44 * without specific, written prior permission.  Neither Open Software
45 * Foundation, Inc., Hewlett-Packard Company nor Digital
46 * Equipment Corporation makes any representations about the suitability
47 * of this software for any purpose.
48 *
49 * Copyright (c) 2007, Novell, Inc. All rights reserved.
50 * Redistribution and use in source and binary forms, with or without
51 * modification, are permitted provided that the following conditions
52 * are met:
53 *
54 * 1.  Redistributions of source code must retain the above copyright
55 *     notice, this list of conditions and the following disclaimer.
56 * 2.  Redistributions in binary form must reproduce the above copyright
57 *     notice, this list of conditions and the following disclaimer in the
58 *     documentation and/or other materials provided with the distribution.
59 * 3.  Neither the name of Novell Inc. nor the names of its contributors
60 *     may be used to endorse or promote products derived from this
61 *     this software without specific prior written permission.
62 *
63 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
64 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
65 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
66 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY
67 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
68 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
69 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
70 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
71 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
72 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
73 *
74 * @APPLE_LICENSE_HEADER_END@
75 */
76
77/*
78**
79**  NAME:
80**
81**      dgxq.c
82**
83**  FACILITY:
84**
85**      Remote Procedure Call (RPC)
86**
87**  ABSTRACT:
88**
89**  DG protocol service routines.  Handles transmit queues.
90**
91**
92*/
93
94#include <dg.h>
95#include <dgxq.h>
96#include <dgpkt.h>
97#include <dgcall.h>
98
99/*========================================================================= */
100
101#define MAX_SENDMSG_RETRIES 5
102
103/*========================================================================= */
104
105#define XQE_FREE_LIST_MAX_LENGTH    32
106
107INTERNAL struct {
108    rpc_dg_xmitq_elt_p_t head;
109    unsigned16 length;
110} xqe_free_list ATTRIBUTE_UNUSED;
111
112/*========================================================================= */
113
114/*
115 * com_timeout controllable parameters...
116 *
117 * max_xq_awaiting_ack_time:
118 *     The maximum time that we will attempt to retransmit pkts to a receiver
119 *     without any apparent type of acknowledgment.  This is a general xq
120 *     related value and mechanism that removes the need for the old server
121 *     specific "final" state timeout processing.
122 */
123
124typedef struct {
125    unsigned32 max_xq_awaiting_ack_time;
126} com_timeout_params_t;
127
128/*========================================================================= */
129
130/*
131 * R P C _ _ D G _ X M I T Q _ A W A I T I N G _ A C K _ T M O
132 *
133 * Return true iff the xmitq is awaiting an ack and it's been waiting
134 * too long.
135 */
136
137PRIVATE boolean rpc__dg_xmitq_awaiting_ack_tmo
138(
139    rpc_dg_xmitq_p_t xq,
140    unsigned32 com_timeout_knob
141)
142{
143    rpc_clock_t timestamp, wait_time;
144    static com_timeout_params_t xq_com_timeout_params[] = {
145                            /* max_xq_awaiting_ack_time */
146        /*  0 min */        {RPC_CLOCK_SEC(1)},
147        /*  1 */            {RPC_CLOCK_SEC(2)},
148        /*  2 */            {RPC_CLOCK_SEC(4)},
149        /*  3 */            {RPC_CLOCK_SEC(8)},
150        /*  4 */            {RPC_CLOCK_SEC(15)},
151        /*  5 def */        {RPC_CLOCK_SEC(30)},
152        /*  6 */            {RPC_CLOCK_SEC(2*30)},
153        /*  7 */            {RPC_CLOCK_SEC(4*30)},
154        /*  8 */            {RPC_CLOCK_SEC(8*30)},
155        /*  9 */            {RPC_CLOCK_SEC(16*30)},
156        /* 10 infinite */   {RPC_CLOCK_SEC(0)}
157    };
158
159    timestamp = xq->awaiting_ack_timestamp;
160    wait_time = xq_com_timeout_params[com_timeout_knob].max_xq_awaiting_ack_time;
161
162    if (xq->awaiting_ack && rpc__clock_aged(timestamp, wait_time)
163        && com_timeout_knob != rpc_c_binding_infinite_timeout)
164    {
165        RPC_DBG_GPRINTF(
166            ("(rpc__dg_xmitq_awaiting_ack_tmo) timeout (timestamp=%lu, wait_time=%lu, now=%lu) [%s]\n",
167            (unsigned long)timestamp, (unsigned long)wait_time,
168	    (unsigned long)rpc__clock_stamp(),
169            rpc__dg_act_seq_string(&xq->hdr)));
170        return (true);
171    }
172    else
173    {
174        return (false);
175    }
176}
177
178/*
179 * S E N D _ B R O A D C A S T
180 *
181 * Send a datagram out all the appropriate broadcast addresses.
182 */
183
184INTERNAL void send_broadcast (
185        rpc_dg_call_p_t  /*call*/,
186        rpc_socket_iovec_p_t  /*iov*/,
187        int  /*iovlen*/
188    );
189
190INTERNAL void send_broadcast
191(
192    rpc_dg_call_p_t call,
193    rpc_socket_iovec_p_t iov,
194    int iovlen
195)
196{
197    unsigned32 st, j;
198    rpc_socket_error_t serr;
199    int i;
200    size_t sentcc, sendcc;
201    rpc_dg_sock_pool_elt_p_t sp;
202    unsigned_char_p_t endpoint = NULL;
203    rpc_dg_ccall_p_t ccall = (rpc_dg_ccall_p_t) call;
204
205    assert(RPC_DG_CALL_IS_CLIENT(call));
206
207    sp = ccall->c.sock_ref;
208
209    /*
210     * See if we have already inquired about the broadcast addresses
211     * for this socket.  If not, enable broadcasts on the socket (if
212     * necessary) and call the NAF routine for the broadcast addresses.
213     * (Note that we're ignoring the result of "rpc__socket_set_broadcast"
214     * because what's the worst that can happen?)
215     */
216
217    if (sp->brd_addrs == NULL)
218    {
219        (void) rpc__socket_set_broadcast(sp->sock);
220
221        rpc__naf_get_broadcast(rpc_g_protseq_id[sp->pseq_id].naf_id,
222                               ccall->h->c.c.rpc_addr->rpc_protseq_id,
223                               &sp->brd_addrs, &st);
224        if (st != rpc_s_ok)
225        {
226            return;
227        }
228    }
229
230    sendcc = 0;
231    for (i = 0; i < iovlen; i++)
232        sendcc += iov[i].iov_len;
233
234    rpc__naf_addr_inq_endpoint(call->addr, &endpoint, &st);
235
236    for (j = 0; j < sp->brd_addrs->len; j++)
237    {
238        rpc__naf_addr_set_endpoint(endpoint, &sp->brd_addrs->addrs[j], &st);
239
240        RPC_DG_SOCKET_SENDMSG_OOL(sp->sock, iov, iovlen,
241                sp->brd_addrs->addrs[j], &sentcc, &serr);
242
243        RPC_DG_SOCK_UPDATE_ERR_COUNT(sp, serr);
244
245        if (RPC_SOCKET_IS_ERR(serr) || sentcc != sendcc)
246        {
247            RPC_DBG_GPRINTF(("(send_broadcast) sendmsg failed, sendcc = %ld, sentcc = %ld, error = %d\n",
248                sendcc, sentcc, RPC_SOCKET_ETOI(serr)));
249            break;
250        }
251        RPC_DG_STATS_INCR(pkts_sent);
252        RPC_DG_STATS_INCR(brds_sent);
253    }
254
255    RPC_MEM_FREE(endpoint, RPC_C_MEM_STRING);
256}
257
258/*
259 * R P C _ _ D G _ X M I T Q _ E L T _ X M I T
260 *
261 * Send the request/response packet denoted by the transmit queue element
262 * on the specified call.
263 */
264
265PRIVATE void rpc__dg_xmitq_elt_xmit
266(
267    rpc_dg_xmitq_elt_p_t xqe,
268    rpc_dg_call_p_t call,
269    boolean32 block
270)
271{
272    rpc_socket_iovec_t iov[RPC_C_DG_MAX_NUM_PKTS_IN_FRAG+1];
273    int iovlen;
274    rpc_dg_xmitq_elt_p_t last_xqe = xqe;
275    rpc_dg_xmitq_p_t xq = &call->xq;
276    rpc_key_info_p_t key_info;
277    rpc_dg_auth_epv_p_t auth_epv;
278    size_t sentcc, sendcc;
279    unsigned32  original_seq = 0;
280    rpc_socket_error_t serr;
281    unsigned16 i;
282    int ptype;
283#ifdef MISPACKED_HDR
284    rpc_dg_raw_pkt_hdr_t raw_hdr;
285#endif
286
287    RPC_DG_CALL_LOCK_ASSERT(call);
288
289    /*
290     * First, make sure the socket we're about to use has not been
291     * disabled.  If it has been, we might as well fault the call
292     * now.
293     */
294
295    if (RPC_DG_SOCK_IS_DISABLED(call->sock_ref))
296    {
297        RPC_DBG_PRINTF(rpc_e_dbg_xmit, 5,
298            ("(rpc__dg_xmitq_elt_xmit) socket %p has been disabled\n",
299            call->sock_ref->sock));
300        rpc__dg_call_signal_failure(call, rpc_s_socket_failure);
301        return;
302    }
303
304    key_info = call->key_info;
305    auth_epv = call->auth_epv;
306
307    /*
308     * Fill in the prototype header in the transmit queue using values
309     * from transmit queue element.
310     */
311
312    xq->hdr.fragnum = xqe->fragnum;
313    xq->hdr.flags   = xq->base_flags | xqe->flags;
314    xq->hdr.flags2  = xq->base_flags2;
315    xq->hdr.len     = xqe->frag_len;
316    xq->hdr.auth_proto = 0;
317
318    /*
319     * Tag this packet with a serial number, and record that number
320     * into the packet header.
321     */
322
323    xqe->serial_num = xq->next_serial_num++;
324    xq->hdr.serial_hi = (xqe->serial_num & 0xFF00) >> 8;
325    xq->hdr.serial_lo = xqe->serial_num & 0xFF;;
326
327    ptype = RPC_DG_HDR_INQ_PTYPE(&xq->hdr);
328
329    /*
330     * For response packets we need to use the highest sequence number
331     * we've seen up to this time, so that the client can keep track of
332     * the sequence number space.  To work with 1.5.1 clients, we still
333     * need to use the original sequence number for facks, ping responses,
334     * etc.
335     */
336
337    if (ptype == RPC_C_DG_PT_RESPONSE)
338    {
339        original_seq = xq->hdr.seq;
340        xq->hdr.seq = call->high_seq;
341    }
342
343    /*
344     * Only authenticate data packets -- setting key_info to NULL
345     * disables authentication machinery for this packet.
346     */
347
348    if (!RPC_DG_PT_IS_DATA(ptype))
349        key_info = NULL;
350
351    /*
352     * Set "last frag" bit as necessary.  (Note that we make sure that
353     * last frags never ask for facks.  Also, we don't set any of the
354     * frag bits if all the data is being sent in a single packet [i.e.,
355     * we're NOT fragmenting at all]).
356     */
357
358    if (xq->push && xqe == xq->tail &&
359            RPC_DG_FLAG_IS_SET(xq->base_flags, RPC_C_DG_PF_FRAG))
360        xq->hdr.flags |= RPC_C_DG_PF_LAST_FRAG | RPC_C_DG_PF_NO_FACK;
361
362    /*
363     * If this is the call's final xqe (fragmented or not) or we're
364     * requesting a fack, then start up the awaiting_ack detection
365     * machinery (if it isn't already setup).  The receiver needs to
366     * send a xmitq acknowledgment (i.e. a fack, working, response or
367     * ack pkt) sooner or later.
368     */
369
370    if (! xq->awaiting_ack
371        && (! RPC_DG_FLAG_IS_SET(xq->base_flags, RPC_C_DG_PF_FRAG)
372            || RPC_DG_HDR_FLAG_IS_SET(&xq->hdr, RPC_C_DG_PF_LAST_FRAG)
373            || ! RPC_DG_HDR_FLAG_IS_SET(&xq->hdr, RPC_C_DG_PF_NO_FACK)))
374    {
375        RPC_DG_XMITQ_AWAITING_ACK_SET(xq);
376    }
377
378    /*
379     * If we're authenticating, set auth_proto field in packet header
380     * to indicate which auth protocol we're using.
381     */
382    if (key_info != NULL)
383        xq->hdr.auth_proto = auth_epv->auth_proto;
384
385    /*
386     * Set up an I/O vector with two elements:  The first points to the
387     * header (from the transmit queue) and the second points to the
388     * body (from the transmit queue element).  Note that in the case
389     * of "mispacked header" systems, we have to make a correctly formatted
390     * header before we can transmit.
391     */
392
393    iov[0].iov_base = (byte_p_t) &xq->hdr;
394    iov[0].iov_len  = RPC_C_DG_RAW_PKT_HDR_SIZE;
395    iovlen = 1;
396    sendcc = iov[0].iov_len;
397
398    iov[1].iov_base = (byte_p_t) xqe->body;
399    iov[1].iov_len  = xqe->body_len;
400
401    if (xqe->body_len != 0)
402    {
403        iov[iovlen].iov_base = (byte_p_t) last_xqe->body;
404        iov[iovlen].iov_len  = last_xqe->body_len;
405        sendcc += iov[iovlen++].iov_len;
406
407        while (last_xqe->more_data != NULL)
408        {
409            iov[iovlen].iov_base = (byte_p_t) last_xqe->more_data->body;
410            iov[iovlen].iov_len  = last_xqe->more_data->body_len;
411            sendcc += iov[iovlen++].iov_len;
412            last_xqe = last_xqe->more_data;
413    }
414    }
415
416    assert(iovlen > 0 && iovlen <= RPC_C_DG_MAX_NUM_PKTS_IN_FRAG + 1);
417    assert(xqe->frag_len == 0 ||
418           (unsigned32)sendcc == (xqe->frag_len + RPC_C_DG_RAW_PKT_HDR_SIZE));
419
420    xqe->frag_len = xq->hdr.len = sendcc - RPC_C_DG_RAW_PKT_HDR_SIZE;
421
422    RPC_DBG_PRINTF(rpc_e_dbg_xmit, 5,
423        ("(rpc__dg_xmitq_elt_xmit) %s %lu.%u.%u len=%lu %s\n",
424        rpc__dg_pkt_name(RPC_DG_HDR_INQ_PTYPE(&call->xq.hdr)),
425        (unsigned long)call->xq.hdr.seq, xqe->fragnum,
426        xqe->serial_num, (unsigned long)xq->hdr.len,
427        RPC_DG_HDR_FLAG_IS_SET(&xq->hdr, RPC_C_DG_PF_NO_FACK) ? "" : "frq"));
428
429#ifdef MISPACKED_HDR
430    /* !!! ...compress hdr pointed to by iov[0] into raw_hdr... !!! */
431    hdrp = iov[0].base;
432    compress_hdr(&xq->hdr, &raw_hdr);
433    iov[0].base = (byte_p_t) &raw_hdr;
434#endif
435
436    /*
437     * Attach any authentication information now.
438     */
439
440    if (key_info != NULL)
441    {
442        unsigned32 st;
443        int overhead;
444
445        dce_pointer_t cksum = last_xqe->body->args + last_xqe->body_len;
446
447        (*auth_epv->pre_send) (key_info, xqe, &xq->hdr, iov, iovlen, cksum, &st);
448        if (st != 0)
449        {
450            RPC_DBG_GPRINTF(
451                ("(rpc__dg_xmitq_elt_xmit) auth pre_send failed, status = %x\n",
452                st));
453            rpc__dg_call_signal_failure (call, st);
454            return;
455        }
456        overhead = auth_epv->overhead;
457
458        if (iovlen == 1)
459        {
460        iovlen = 2;
461        iov[1].iov_len += overhead;
462        }
463        else
464        {
465            iov[iovlen-1].iov_len += overhead;
466        }
467        sendcc += overhead;
468    }
469    RPC_DBG_PRINTF(rpc_e_dbg_xmit, 5,
470        ("(rpc__dg_xmitq_elt_xmit) iovlen %d, sendcc %ld\n",
471         iovlen, sendcc));
472
473    /*
474     * Send out the datagram (checking whether we should broadcast it).
475     * For the non-broadcast cast, we accept a few EWOULDBLOCK send
476     * failures (by sleeping for a bit and then trying again).  We wish
477     * we could have the socket in blocking I/O mode, but the receive
478     * path really wants the socket in NON-blocking mode.
479     */
480
481    if (RPC_DG_FLAG_IS_SET(call->xq.base_flags, RPC_C_DG_PF_BROADCAST))
482    {
483        send_broadcast(call, iov, iovlen);
484    }
485    else
486    {
487        for (i = 0; i < MAX_SENDMSG_RETRIES; i++)
488        {
489            RPC_DG_SOCKET_SENDMSG(call->sock_ref->sock, iov, iovlen,
490                               call->addr, &sentcc, &serr);
491
492            RPC_DG_SOCK_UPDATE_ERR_COUNT(call->sock_ref, serr);
493
494            if (! RPC_SOCKET_IS_ERR(serr) && sentcc == sendcc)
495            {
496                RPC_DG_STATS_INCR(pkts_sent);
497                RPC_DG_STATS_INCR(pstats[RPC_DG_HDR_INQ_PTYPE(&xq->hdr)].sent);
498                break;
499            }
500
501            if (! RPC_SOCKET_ERR_EQ(serr, RPC_C_SOCKET_EWOULDBLOCK))
502            {
503                RPC_DBG_GPRINTF(
504                    ("(rpc__dg_xmitq_elt_xmit) sendmsg failed, sendcc = %ld, sentcc = %ld, error = %d\n",
505                    sendcc, sentcc, RPC_SOCKET_ETOI(serr)));
506                break;
507            }
508
509            if (! block)
510                break;
511
512            /*
513             * Handle EWOULDBLOCKs waiting for the condition to go away.
514             */
515
516            RPC_DBG_PRINTF(rpc_e_dbg_xmit, 2,
517                ("(rpc__dg_xmitq_elt_xmit) sendmsg failed with EWOULDBLOCK; waiting\n"));
518
519            rpc__socket_nowriteblock_wait(call->sock_ref->sock, NULL);
520        }
521    }
522
523    xq->timestamp = rpc__clock_stamp();
524
525    if (RPC_DG_HDR_INQ_PTYPE(&xq->hdr) == RPC_C_DG_PT_RESPONSE)
526    {
527        xq->hdr.seq = original_seq;
528    }
529}
530
531/*
532 * R P C _ _ D G _ X M I T Q _ I N I T
533 *
534 * Initialize a transmit queue (rpc_dg_xmit_q_t).  Note that we DON'T
535 * fill in (all) the prototype packet header here because we don't have
536 * enough info to do it at this point.
537 */
538
539PRIVATE void rpc__dg_xmitq_init
540(
541    rpc_dg_xmitq_p_t xq
542)
543{
544    /*
545     * Presumably the call is either locked or "private" at this point
546     * RPC_DG_CALL_LOCK_ASSERT(call);
547     */
548
549    rpc__dg_xmitq_reinit(xq);
550
551    xq->max_rcv_tsdu                = RPC_C_DG_INITIAL_MAX_PKT_SIZE;
552    xq->max_snd_tsdu                = RPC_C_DG_INITIAL_MAX_PKT_SIZE;
553    xq->max_frag_size               = RPC_C_DG_INITIAL_MAX_PKT_SIZE;
554    xq->snd_frag_size               = RPC_C_DG_INITIAL_MAX_PKT_SIZE;
555    xq->max_blast_size              = RPC_C_DG_INITIAL_MAX_BLAST_SIZE;
556    xq->xq_timer                    = RPC_C_DG_INITIAL_XQ_TIMER;
557    xq->xq_timer_throttle           = 1;
558    xq->high_cwindow                = 0;
559
560    /*
561     * Initialize some highly constant fields (RPC protocol version and
562     * local NDR drep) in the xmitq's prototype packet header.
563     */
564
565    RPC_DG_HDR_SET_VERS(&xq->hdr);
566    RPC_DG_HDR_SET_DREP(&xq->hdr);
567
568    xq->hdr.auth_proto = 0;
569}
570
571/*
572 * R P C _ _ D G _ X M I T Q _ R E I N I T
573 *
574 * Reinitialize a transmit queue (rpc_dg_xmit_q_t).  Note that we DON'T
575 * fill in the prototype packet header here because we don't have enough
576 * info to do it at this point.  Calling this routine will retain some
577 * state between calls; see xmitq_init for which state is retained.
578 */
579
580PRIVATE void rpc__dg_xmitq_reinit
581(
582    rpc_dg_xmitq_p_t xq
583)
584{
585    /*
586     * Presumably the call is either locked or "private" at this point
587     * RPC_DG_CALL_LOCK_ASSERT(call);
588     */
589
590    xq->head = xq->first_unsent = xq->tail = xq->part_xqe = xq->rexmitq = NULL;
591
592    xq->next_fragnum                = 0;
593    xq->next_serial_num             = 0;
594    xq->last_fack_serial            = -1;
595    xq->cwindow_size                = 0;
596    xq->window_size                 = RPC_C_DG_INITIAL_WINDOW_SIZE;
597    xq->blast_size                  = 0;
598    xq->freqs_out                   = 0;
599    xq->push                        = false;
600    xq->awaiting_ack                = false;
601    xq->rexmit_timeout              = RPC_C_DG_INITIAL_REXMIT_TIMEOUT;
602    /*
603     * Temporarily, we turn off the first fack wait logic for recovering
604     * the small in/out performance. We'll revisit it later.
605     */
606    xq->first_fack_seen             = true;
607}
608
609/*
610 * R P C _ _ D G _ X M I T Q _ F R E E
611 *
612 * Frees data referenced by a transmit queue (rpc_dg_xmit_q_t).  The
613 * transmit queue itself is NOT freed, since it's (assumed to be) part
614 * of a larger structure.  Clearly this means any sort of xmitq related
615 * ack must have arrived.
616 */
617
618PRIVATE void rpc__dg_xmitq_free
619(
620    rpc_dg_xmitq_p_t xq,
621    rpc_dg_call_p_t call
622)
623{
624    /*
625     * presumably the call is either locked or 'private' at this point
626     * RPC_DG_CALL_LOCK_ASSERT(call);
627     */
628
629    RPC_DG_XMITQ_AWAITING_ACK_CLR(xq);
630
631    while (xq->head != NULL)
632    {
633        rpc_dg_xmitq_elt_p_t xqe = xq->head;
634
635        xq->head = xqe->next;
636        rpc__dg_pkt_free_xqe(xqe, call);
637    }
638    xq->first_unsent = xq->tail = xq->rexmitq = NULL;
639
640    /*
641     * Clear any previously set blast_size.
642     */
643
644    xq->blast_size = 0;
645}
646
647/*
648 * R P C _ _ D G _ X M I T Q _ A P P E N D _ P P
649 *
650 * Append a transmit queue's partial packet to the transmit queue itself.
651 */
652
653PRIVATE void rpc__dg_xmitq_append_pp
654(
655        rpc_dg_call_p_t call,
656        unsigned32 *st
657)
658{
659    rpc_dg_xmitq_p_t xq = &call->xq;
660    rpc_dg_xmitq_elt_p_t xqe = xq->part_xqe;
661    rpc_key_info_p_t key_info = call->key_info;
662    int ptype;
663    unsigned32 frag_length = 0; /* # bytes in the body of the fragment */
664    rpc_dg_xmitq_elt_p_t last_xqe = xqe;
665
666    *st = rpc_s_ok;
667
668    RPC_DG_CALL_LOCK_ASSERT(call);
669
670    if (xqe == NULL)
671        return;
672
673    /*
674     * Compute the fragment length and store it.
675     */
676
677    frag_length = last_xqe->body_len;
678
679    while (last_xqe->more_data != NULL)
680    {
681        frag_length += last_xqe->more_data->body_len;
682        last_xqe = last_xqe->more_data;
683    }
684
685    xqe->frag_len = frag_length;
686
687    xqe->next       = NULL;
688    xqe->fragnum    = xq->next_fragnum++;
689    xqe->flags      = 0;
690
691    /*
692     * Add the partial xqe to the queue.
693     */
694
695    if (xq->head == NULL)
696    {
697        xq->head = xq->tail = xq->first_unsent = xqe;
698    }
699    else
700    {
701        xq->tail->next = xqe;
702        xq->tail = xqe;
703        if (xq->first_unsent == NULL)
704            xq->first_unsent = xqe;
705    }
706
707    /*
708     * "Normal" idempotent operations with *large ins* get tagged as
709     * non-idempotent calls.  Old (1.5.1) servers (and 2.0 as well) can
710     * dispose of their response packet in the case of idempotent calls
711     * with "small outs".  V2.0 clients dispose of acknowledged input
712     * stream frags hence, they are unable to rerun an idempotent call
713     * with large ins in the event that the server response is lost.
714     *
715     * The condition below is triggered for fragnum 0 only if we aren't
716     * 'pushing' the queue; ie. there are more packets to follow so we
717     * can assume the call has large INS.
718     */
719
720    if (xqe->fragnum == 0 && ! xq->push)
721    {
722        if (RPC_DG_HDR_INQ_PTYPE(&xq->hdr) == RPC_C_DG_PT_REQUEST
723            && ! RPC_DG_FLAG_IS_SET(xq->base_flags, RPC_C_DG_PF_MAYBE))
724        {
725            xq->base_flags &= ~RPC_C_DG_PF_IDEMPOTENT;
726        }
727    }
728
729    /*
730     * Set the "frag" bit appropriately.  The only time we don't set
731     * this bit is when the first packet is being pushed from xmitq_push.
732     * If a packet is appended at any time before we begin "pushing,"
733     * we can be sure that the xmit is fragmented.  Once the flag is
734     * set, it never gets unset, so no need to worry that the last frag
735     * will reset the flag.
736     */
737
738    if (! xq->push)
739    {
740        xq->base_flags |= RPC_C_DG_PF_FRAG;
741    }
742
743    xq->part_xqe = NULL;
744
745    /*
746     * Only encrypt data packets -- setting key_info to NULL
747     * disables encrypt machinery for this packet.
748     */
749
750    ptype = RPC_DG_HDR_INQ_PTYPE(&xq->hdr);
751    if (!RPC_DG_PT_IS_DATA(ptype))
752        key_info = NULL;
753
754    if (key_info != NULL)
755    {
756        rpc_dg_auth_epv_p_t auth_epv = call->auth_epv;
757        unsigned32 blocksize = auth_epv->blocksize;
758
759        /*
760         * If the packet length isn't a multiple of the encryption block
761         * size, round it up now.
762         */
763        frag_length = (((frag_length) + blocksize - 1)
764                       / blocksize) * blocksize;
765        last_xqe->body_len += (frag_length - xqe->frag_len);
766        xqe->frag_len = frag_length;
767
768        assert(RPC_C_DG_RAW_PKT_HDR_SIZE + frag_length + auth_epv->overhead <= xq->snd_frag_size);
769
770        if (last_xqe->body_len + auth_epv->overhead
771            > RPC_C_DG_MAX_PKT_BODY_SIZE)
772        {
773            /*
774             * This can happen if the fragment gets pushed.
775             */
776            last_xqe->more_data = rpc__dg_pkt_alloc_xqe(call, st);
777            if (*st != rpc_s_ok)
778                return;
779        }
780        (*auth_epv->encrypt) (key_info, xqe, st);
781        if (*st != rpc_s_ok)
782            return;
783    }
784}
785
786/*
787 * R P C _ _ D G _ X M I T Q _ R E S T A R T
788 *
789 * Timeout everything on the transmit queue.  This routine is used when
790 * we infer that the receiver has lost some of the data previously sent
791 * it.  Situations in which we are forced to *infer* data loss include
792 * receiving a no-call in response to a ping, and receiving a ping while
793 * we are in the xmit of final states.  In such situations we recover
794 * by beginning to send the xmitq again, and hoping the flow control
795 * login will kick in.
796 */
797
798PRIVATE void rpc__dg_xmitq_restart
799(
800    rpc_dg_call_p_t call
801)
802{
803    rpc_dg_xmitq_p_t xq = &call->xq;
804    rpc_dg_xmitq_elt_p_t xqe, tail = NULL;
805    unsigned32 rexmit_cnt = 0;
806
807    /*
808     * If the xmitq has already been set up to do a transmission, leave
809     * it alone.   Since 'restarting' the queue is a meat-axe approach
810     * to error recovery, we'll defer to any 'normal' processing that
811     * might also be trying to handle the xmitq.
812     */
813
814    if (RPC_DG_CALL_READY_TO_SEND(call))
815    {
816        RPC_DG_START_XMIT(call);
817        return;
818    }
819
820    for (xqe = xq->head; xqe != NULL && xqe != xq->first_unsent;
821         xqe = xqe->next)
822    {
823        rexmit_cnt++;
824
825        /*
826         * If the packets is counted in the current congestion window,
827         * remove it.  Also check to see if the packet counted as one
828         * of our outstanding fack requests.
829         */
830
831        if (xqe->in_cwindow)
832        {
833            xqe->in_cwindow = false;
834            xq->cwindow_size--;
835            if (! RPC_DG_FLAG_IS_SET(xqe->flags, RPC_C_DG_PF_NO_FACK) ||
836                RPC_DG_FLAG_IS_SET(xqe->flags, RPC_C_DG_PF_LAST_FRAG))
837            {
838                xq->freqs_out--;
839            }
840        }
841        if (rexmit_cnt == 1)
842            xq->rexmitq = xqe;
843        else
844            tail->next_rexmit = xqe;
845
846        xqe->next_rexmit = NULL;
847        tail = xqe;
848    }
849
850    /*
851     * If we didn't find any packets to retransmit, then let's send
852     * the first unsent packet.
853     */
854
855    if (rexmit_cnt == 0 && xq->first_unsent != NULL)
856    {
857        rexmit_cnt = 1;
858    }
859
860    xq->blast_size = MIN(rexmit_cnt, RPC_C_DG_INITIAL_BLAST_SIZE);
861
862    if (RPC_DG_CALL_READY_TO_SEND(call))
863	 {
864        RPC_DG_START_XMIT(call);
865	 }
866}
867