/*
 * Copyright (c) 2010 Apple Inc. All rights reserved.
 *
 * @APPLE_LICENSE_HEADER_START@
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1.  Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 * 2.  Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 * 3.  Neither the name of Apple Inc. ("Apple") nor the names of its
 *     contributors may be used to endorse or promote products derived from
 *     this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY APPLE AND ITS CONTRIBUTORS "AS IS" AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL APPLE OR ITS CONTRIBUTORS BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Portions of this software have been released under the following terms:
 *
 * (c) Copyright 1989-1993 OPEN SOFTWARE FOUNDATION, INC.
 * (c) Copyright 1989-1993 HEWLETT-PACKARD COMPANY
 * (c) Copyright 1989-1993 DIGITAL EQUIPMENT CORPORATION
 *
 * To anyone who acknowledges that this file is provided "AS IS"
 * without any express or implied warranty:
 * permission to use, copy, modify, and distribute this file for any
 * purpose is hereby granted without fee, provided that the above
 * copyright notices and this notice appears in all source code copies,
 * and that none of the names of Open Software Foundation, Inc., Hewlett-
 * Packard Company or Digital Equipment Corporation be used
 * in advertising or publicity pertaining to distribution of the software
 * without specific, written prior permission.  Neither Open Software
 * Foundation, Inc., Hewlett-Packard Company nor Digital
 * Equipment Corporation makes any representations about the suitability
 * of this software for any purpose.
 *
 * Copyright (c) 2007, Novell, Inc. All rights reserved.
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1.  Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 * 2.  Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 * 3.  Neither the name of Novell Inc. nor the names of its contributors
 *     may be used to endorse or promote products derived from this
 *     software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * @APPLE_LICENSE_HEADER_END@
 */

/*
**
**  NAME:
**
**      dgpkt.c
**
**  FACILITY:
**
**      Remote Procedure Call (RPC)
**
**  ABSTRACT:
**
**  Routines for managing the datagram packet pool.
**
**
*/

#include <dg.h>
#include <dgpkt.h>
#include <dgcall.h>
#include <dgscall.h>
#include <dgccall.h>

/* ========================================================================= */

GLOBAL rpc_dg_pkt_pool_t rpc_g_dg_pkt_pool;

/* ========================================================================= */

INTERNAL rpc_dg_pkt_pool_elt_p_t pkt_alloc (void);

INTERNAL void pkt_free (
        rpc_dg_pkt_pool_elt_p_t  /*pkt*/,
        rpc_dg_call_p_t  /*call*/
    );

INTERNAL void dequeue_pool_waiter (
        rpc_dg_call_p_t  /*call*/,
        rpc_dg_call_p_t * /*head*/,
        rpc_dg_call_p_t * /*tail*/
    );

INTERNAL void scan_waiter_lists (
        rpc_dg_call_p_t  /*call*/
    );

/* ========================================================================= */

/*
 * These macros define the pool conditions under which it's okay to grant
 * new reservations.  In the general case, there must be enough free packets to
 * cover all current reservations, plus n to cover the reservation being
 * considered.
 * In the second case, there are a number of pre-reserved reservations set
 * aside for use by servers only.
 */

#define RESERVATION_AVAILABLE(n) \
        (pool->free_count + pool->pkt_count >= pool->reservations + (n))

#define SRV_RESERVATION_AVAILABLE(call) \
        (RPC_DG_CALL_IS_SERVER(call) && pool->srv_resv_avail > 0)
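
/*
 * Illustrative example (hypothetical numbers, not taken from this code):
 * with free_count == 20 free packets, pkt_count == 10 packets still
 * allocatable, and reservations == 25, RESERVATION_AVAILABLE(4) is true
 * because 20 + 10 >= 25 + 4, while RESERVATION_AVAILABLE(6) is false.
 */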

/*
 * The macro to update the rationing state.
 *
 * First, determine if we are currently rationing packets.
 * If so, we'll set a flag to indicate this condition.
 * This information will be used by recvq_insert when deciding whether
 * to queue this packet to a call handle.
 *
 * Second, see if the pool is getting low.  The xmit_fack routine uses
 * this information to begin backing off the sender.
 * Avoid doing the computation if is_rationing is already set.
 */
#define UPDATE_RATIONING_STATE(junk) \
{ \
    rpc_g_dg_pkt_pool.is_rationing = RPC_DG_PKT_RATIONING(0); \
    rpc_g_dg_pkt_pool.low_on_pkts = rpc_g_dg_pkt_pool.is_rationing || \
        (rpc_g_dg_pkt_pool.free_count + rpc_g_dg_pkt_pool.pkt_count <= \
         2 * rpc_g_dg_pkt_pool.reservations); \
}
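
/*
 * For example (hypothetical numbers): with 30 packets remaining
 * (free_count + pkt_count) and 25 reservations, the pool is not yet
 * rationing, but it is "low" because 30 <= 2 * 25; senders can then be
 * backed off before rationing actually begins.
 */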

/* ========================================================================= */
/*
 * D E Q U E U E _ P O O L _ W A I T E R
 *
 * This routine is used to dequeue a call handle from any location within
 * a list of waiting calls.  Calls may be waiting for 1) a reservation,
 * or 2) a packet; the head/tail pointer will determine which list to
 * search.
 *
 * The packet pool and call handle locks must be held before calling
 * this routine.
 */

INTERNAL void dequeue_pool_waiter
(
    rpc_dg_call_p_t call,
    rpc_dg_call_p_t *head,
    rpc_dg_call_p_t *tail
)
{
    rpc_dg_call_p_t waiter = *head, prev = NULL;

    for ( ; waiter != NULL; prev = waiter, waiter = waiter->pkt_chain)
    {
        if (waiter == call)
        {
            if (prev == NULL)
                (*head) = call->pkt_chain;
            else
                prev->pkt_chain = call->pkt_chain;

            if (call->pkt_chain == NULL)
                (*tail) = prev;

            call->is_in_pkt_chain = false;

            /*
             * Decrement our reference to the call.
             */

            if (RPC_DG_CALL_IS_SERVER(call))
            {
                RPC_DG_SCALL_RELEASE_NO_UNLOCK((rpc_dg_scall_p_t *) &call);
            }
            else
            {
                RPC_DG_CCALL_RELEASE_NO_UNLOCK((rpc_dg_ccall_p_t *) &call);
            }

            return;
        }
    }
    /*
     * Since these lists are internal, we should never be mistaken
     * about what's on them.
     */
    RPC_DBG_GPRINTF(("(dequeue_pool_waiter) No call found\n"));
}

/*
 * S C A N _ W A I T E R _ L I S T S
 *
 * This routine is called when the system is *not* in a rationing state,
 * but there are calls which are blocked waiting for 1) a pool reservation,
 * or 2) a packet.  Precedence is given to reservation waiters.
 *
 * If it is possible that the caller holds the lock for one of the calls
 * on one of the waiters lists, it must send us the address of that call
 * handle.  Since we need to lock a call before signalling it, this allows
 * us to avoid trying to re-lock a call handle that has already been
 * locked by the caller.
 *
 * The pool must be locked before calling this routine.
 */

INTERNAL void scan_waiter_lists
(
    rpc_dg_call_p_t call
)
{
    rpc_dg_call_p_t waiter = NULL, prev = NULL;
    rpc_dg_pkt_pool_t *pool = &rpc_g_dg_pkt_pool;

    RPC_DG_PKT_POOL_LOCK_ASSERT(0);

    /*
     * See if it's possible to wake up a blocked call.  Precedence is
     * given to calls waiting for reservations.  Note that if there are
     * N reservations, and N+1 free packets, it is possible to wake a
     * call waiting for a packet.  However, we will not do so if there
     * are calls waiting for reservations.  In that case, we just return,
     * since a reservation waiter requires N+2 free packets in the pool
     * before it can be granted a reservation.
     */
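
    /*
     * Concretely (hypothetical numbers): with N == 10 reservations and
     * 11 free packets, a packet waiter could be woken, but a waiter
     * asking for the minimum reservation (1 packet plus 1 for the stub)
     * needs 10 + 2 == 12 free packets, so if any reservation waiters
     * exist we return instead.
     */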

    if (pool->rsv_waiters_head != NULL)
    {
        /*
         * There are two places we might find an available reservation.  First
         * see if there are enough free packets in the pool to grant another
         * reservation...
         */
        if (RESERVATION_AVAILABLE(pool->rsv_waiters_head->n_resvs_wait))
        {
            waiter = pool->rsv_waiters_head;
            pool->rsv_waiters_head = waiter->pkt_chain;
        }

        /*
         * ... otherwise, if any of the pre-reserved server reservations
         * are available, see if there are any scalls on the waiters' queue.
         */
        else if (pool->srv_resv_avail > 0)
        {
            for (waiter = pool->rsv_waiters_head; waiter != NULL;
                 prev = waiter, waiter = waiter->pkt_chain)
            {
                if (RPC_DG_CALL_IS_SERVER(waiter)
                    && pool->max_resv_pkt+1 >= waiter->n_resvs_wait)
                {
                    if (prev == NULL)
                        pool->rsv_waiters_head = waiter->pkt_chain;
                    else
                        prev->pkt_chain = waiter->pkt_chain;

                    if (waiter->pkt_chain == NULL)
                        pool->rsv_waiters_tail = prev;

                    break;
                }
            }
        }
    }
    else if (pool->pkt_waiters_head != NULL)
    {
        if (! RPC_DG_PKT_RATIONING(0))
        {
            waiter = pool->pkt_waiters_head;
            pool->pkt_waiters_head = waiter->pkt_chain;
        }
    }

    /*
     * If we weren't able to dequeue a waiter, return now.
     */

    if (waiter == NULL)
        return;

    /*
     * Reset the call's flag, which is protected by the packet pool lock.
     */

    waiter->is_in_pkt_chain = false;

    /*
     * Signalling the call requires that we hold the call's lock.  This
     * involves unlocking the pool first to respect the locking hierarchy
     * (call, then pool).  However, if the call we pulled off the queue
     * happens to be the call we were sent, it's already locked, and
     * the operation is somewhat simpler.
     */

    if (call != waiter)
    {
        RPC_DG_PKT_POOL_UNLOCK(0);
        RPC_DG_CALL_LOCK(waiter);
    }

    rpc__dg_call_signal(waiter);

    /*
     * Since we've removed the call from the waiters' queue, we need
     * to release that reference.  In the case where we locked the
     * call, unlock it.  Otherwise, leave it the way we found it.
     */

    if (call != waiter)
    {
        if (RPC_DG_CALL_IS_SERVER(waiter))
        {
            RPC_DG_SCALL_RELEASE((rpc_dg_scall_p_t *) &waiter);
        }
        else
        {
            RPC_DG_CCALL_RELEASE((rpc_dg_ccall_p_t *) &waiter);
        }

        /*
         * Return the pool locked, as the caller expects.
         */

        RPC_DG_PKT_POOL_LOCK(0);
    }
    else
    {
        if (RPC_DG_CALL_IS_SERVER(waiter))
        {
            RPC_DG_SCALL_RELEASE_NO_UNLOCK((rpc_dg_scall_p_t *) &waiter);
        }
        else
        {
            RPC_DG_CCALL_RELEASE_NO_UNLOCK((rpc_dg_ccall_p_t *) &waiter);
        }
    }
}

/*
 * R P C _ _ D G _ P K T _ P O O L _ I N I T
 *
 * Initialize the packet pool structure.
 */

PRIVATE void rpc__dg_pkt_pool_init(void)
{
    rpc_dg_pkt_pool_elt_p_t   pkt;
    rpc_dg_pkt_pool_t *pool = &rpc_g_dg_pkt_pool;
    unsigned32 i;

    /*
     * Allow the total number of packets in the pool to be settable through
     * a debug switch.  [# of pkts = 2 ^ (level - 1)]
     */

#ifdef DEBUG
    if (RPC_DBG(rpc_es_dbg_pkt_quota_size, 1))
    {
        pool->max_pkt_count =
              1 << (rpc_g_dbg_switches[(int) rpc_es_dbg_pkt_quota_size] - 1);
    }
    else
#endif
    {
        pool->max_pkt_count = RPC_C_DG_PKT_MAX;
    }

    pool->pkt_count = pool->max_pkt_count;
    pool->active_rqes = 0;
    pool->active_xqes = 0;
    pool->free_count = 0;
    pool->free_list  = NULL;
    pool->free_list_tail = NULL;

    /*
     * The number of packets required for the largest fragment size
     * possible.
     */
    pool->max_resv_pkt = RPC_C_DG_MAX_NUM_PKTS_IN_FRAG;

    /*
     * We always start out by reserving RPC_C_DG_MAX_NUM_PKTS_IN_FRAG
     * packets for the listener thread, plus max_resv_pkt+1 packets for
     * each of the pre-reserved server-side reservations.
     */

    pool->reservations = RPC_C_DG_MAX_NUM_PKTS_IN_FRAG
        + ((pool->max_resv_pkt + 1) * RPC_C_DG_PKT_INIT_SERVER_RESVS);
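
    /*
     * Worked example (hypothetical values): if RPC_C_DG_MAX_NUM_PKTS_IN_FRAG
     * is 8 and RPC_C_DG_PKT_INIT_SERVER_RESVS is 2, the pool starts with
     * 8 + (8 + 1) * 2 == 26 packets reserved before any call runs.
     */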

    /*
     * Initialize the count of pre-reserved server-side reservations.
     * If this count is greater than 0 when a server call tries to make
     * a reservation, the server call will never need to wait.
     */

    pool->srv_resv_avail = RPC_C_DG_PKT_INIT_SERVER_RESVS;

    /*
     * Initialize the two lists of waiters.  Note that when the head
     * of the list is NULL, the tail pointer is undefined.
     */

    pool->pkt_waiters_head  = NULL;
    pool->rsv_waiters_head  = NULL;

    /*
     * Allocate an initial allotment of packet buffers.  See notes in
     * pkt_alloc() below for more details about the makeup of the packets.
     */

    for (i = MIN(RPC_C_DG_PKT_INIT_CNT, pool->pkt_count); i > 0; i--)
    {
        RPC_MEM_ALLOC(pkt, rpc_dg_pkt_pool_elt_p_t,
            sizeof(rpc_dg_pkt_pool_elt_t), RPC_C_MEM_DG_PKT_POOL_ELT,
            RPC_C_MEM_NOWAIT);

        pkt->u.next = pool->free_list;
        pkt->is_on_free_list = true;
        pool->free_list = pkt;

        /*
         * Anchor the tail pointer on the first packet allocated.
         */
        if (pool->free_list_tail == NULL)
            pool->free_list_tail = pkt;

        pool->free_count++;
        pool->pkt_count--;
    }

    RPC_MUTEX_INIT(pool->pkt_mutex);
}

/*
 * R P C _ _ D G _ P K T _ P O O L _ F O R K _ H A N D L E R
 *
 * Handle fork related processing for this module.
 */

PRIVATE void rpc__dg_pkt_pool_fork_handler
(
    rpc_fork_stage_id_t stage
)
{
    rpc_dg_pkt_pool_elt_p_t   pkt, next_pkt;
    rpc_dg_pkt_pool_t *pool = &rpc_g_dg_pkt_pool;

    switch ((int)stage)
    {
        case RPC_C_PREFORK:
            break;
        case RPC_C_POSTFORK_PARENT:
            break;
        case RPC_C_POSTFORK_CHILD:
            /*
             * Free any packets sitting on the free list.
             */

            pkt = pool->free_list;
            while (pool->free_count--)
            {
                next_pkt = pkt->u.next;
                RPC_MEM_FREE(pkt, RPC_C_MEM_DG_PKT_POOL_ELT);
                pkt = next_pkt;
            }

            /*
             * Clear out remaining fields of the packet pool.
             */
            memset(&rpc_g_dg_pkt_pool, 0, sizeof(rpc_dg_pkt_pool_t));
            break;
    }
}

/*
 * P K T _ A L L O C
 *
 * Return a datagram packet.  The packet comes either from the free list
 * we're maintaining, or if the free list is empty, we alloc up a new
 * one.  It is up to the callers of this routine to respect the packet
 * rationing rules; as such, this routine should always be able to return
 * a packet successfully.
 *
 * The packet pool lock must be held before calling this routine.
 */

INTERNAL rpc_dg_pkt_pool_elt_p_t pkt_alloc(void)
{
    rpc_dg_pkt_pool_elt_p_t   pkt;
    rpc_dg_pkt_pool_t *pool = &rpc_g_dg_pkt_pool;

    RPC_DG_PKT_POOL_LOCK_ASSERT(0);

    /*
     * If there is a packet on the free list, use it.
     */

    if (pool->free_list != NULL)
    {
        pkt = pool->free_list;
        pool->free_list = pool->free_list->u.next;
        pool->free_count--;

        /*
         * If the free list is now empty, then there's no longer a tail.
         */
        if (pool->free_list == NULL)
            pool->free_list_tail = NULL;

#ifdef DEBUG
        if (RPC_DBG_EXACT(rpc_es_dbg_mem,20))
        {
            unsigned32 count;
            rpc_dg_pkt_pool_elt_p_t next_pkt;

            for (count = 0, next_pkt = pool->free_list;
                 next_pkt != NULL;
                 count++, next_pkt = next_pkt->u.next);

            if (pool->free_count != count)
            {
                RPC_DBG_PRINTF(rpc_e_dbg_mem, 20,
                   ("(pkt_alloc) free_count mismatch: free_count (%d) != %d\n",
                                pool->free_count, count));
                /*
                 * rpc_m_dgpkt_pool_corrupt
                 * "(%s) DG packet free pool is corrupted"
                 */
                rpc_dce_svc_printf (
                    __FILE__, __LINE__,
                    "%s",
                    rpc_svc_dg_pkt,
                    svc_c_sev_fatal | svc_c_action_abort,
                    rpc_m_dgpkt_pool_corrupt,
                    "pkt_alloc" );
            }
        }
#endif
    }

    /*
     * Else, we'll need to alloc up a new one.
     */

    else
    {
        /*
         * Leave this assert un-ifdef'ed... it's cheap enough and it seems
         * to be a reasonably good indicator of other problems related to
         * xq/rq management.  If this assert triggers, it's time to enable the
         * rest of the pkt pool debug stuff to help zero in on the problem
         * (in the past, problems have been users of the pool, not the pool
         * code itself).
         */
        assert(rpc_g_dg_pkt_pool.free_count == 0);

        /*
         * First make sure we haven't already allocated the maximum number
         * of packets.  This should never happen since the callers of
         * this routine are enforcing packet quotas, which require that
         * at least one packet be available at all times.
         */

        if (pool->pkt_count == 0)
            return (NULL);

        /*
         * Allocate up a new packet.  The size of this packet is the
         * maximum of what we need for either an xqe or an rqe.
         */

        RPC_MEM_ALLOC(pkt, rpc_dg_pkt_pool_elt_p_t,
            sizeof(rpc_dg_pkt_pool_elt_t), RPC_C_MEM_DG_PKT_POOL_ELT,
            RPC_C_MEM_NOWAIT);

        pool->pkt_count--;
    }

    RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 5,
            ("(pkt_alloc) pkts %u\n",
            pool->free_count + pool->pkt_count));

    pkt->is_on_free_list = false;

    UPDATE_RATIONING_STATE(0);
    return(pkt);
}

/*
 * R P C _ _ D G _ P K T _ A L L O C _ X Q E
 *
 * Return a transmit queue element (rpc_dg_xmitq_elt_t).  Before
 * allocating the packet, check to see if we need to be rationing packets.
 * If so, and this call is already using its reserved packet, block the
 * call.
 *
 * Call handle lock must be held.  Acquires the packet pool lock.
 */

PRIVATE rpc_dg_xmitq_elt_p_t rpc__dg_pkt_alloc_xqe
(
    rpc_dg_call_p_t call,
    unsigned32 *st
)
{
    rpc_dg_pkt_pool_elt_p_t pkt;
    rpc_dg_xmitq_elt_p_t xqe = NULL;
    rpc_dg_pkt_pool_t *pool = &rpc_g_dg_pkt_pool;

    RPC_DG_CALL_LOCK_ASSERT(call);

    /*
     * Before taking the packet pool lock, check to see if this call
     * handle is using a private socket, and if the private socket has
     * a private xqe that can be used.
     */
    if (call->sock_ref->is_private)
    {
        if (call->sock_ref->xqe != NULL)
        {
            xqe = call->sock_ref->xqe;
            call->sock_ref->xqe = NULL;

            xqe->next       = NULL;
            xqe->more_data  = NULL;
            xqe->frag_len   = 0;
            xqe->flags      = 0;
            xqe->body_len   = 0;
            xqe->serial_num = 0;
            xqe->in_cwindow = false;

            return (xqe);
        }
        /*
         * This call handle is using a private socket, but the cached
         * xqe is already in use.  We'll need to really allocate one;
         * if the call has not yet made a packet pool reservation, we
         * need to do that now.
         */
        else if (call->n_resvs == 0)
        {
            unsigned32  resv ATTRIBUTE_UNUSED;

            /*
             * Only the client uses the private socket (at least for now). Thus,
             * we can adjust the reservation in the blocking mode.
             *
             * If we ever allow the server callback's use of the private socket,
             * we will need to be careful here.
             *
             * Since we have been running without a reservation, we are still in
             * SBF (Single Buffer Fragment) and need the minimum reservation.
             */
            rpc__dg_pkt_adjust_reservation(call, 1, true);
        }
    }

    RPC_DG_PKT_POOL_LOCK(0);

    /*
     * If we are rationing packets, and this call is already using its
     * reserved packet, put the call on the list of packet waiters and
     * go to sleep.
     */

    while (RPC_DG_PKT_RATIONING(0) && call->xq.head != NULL)
    {
        pool->blocked_alloc_xqe++;

        RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 3,
            ("(alloc_xqe) rationing, blocking call, fc %u pkt %u [%s]\n",
            pool->free_count, pool->pkt_count,
            rpc__dg_act_seq_string(&call->xq.hdr)));

        /*
         * If the call is not already on the packet waiters' chain, queue
         * it.  Update the call's reference count.
         */

        if (! call->is_in_pkt_chain)
        {
            if (pool->pkt_waiters_head == NULL)
                pool->pkt_waiters_head = call;
            else
                pool->pkt_waiters_tail->pkt_chain = call;
            pool->pkt_waiters_tail = call;
            call->pkt_chain = NULL;
            call->is_in_pkt_chain = true;

            RPC_DG_CALL_REFERENCE(call);
        }

        RPC_DG_PKT_POOL_UNLOCK(0);

        rpc__dg_call_wait(call, rpc_e_dg_wait_on_internal_event, st);

        RPC_DG_PKT_POOL_LOCK(0);

        if (*st != rpc_s_ok)
        {
            /*
             * If there's an error, get off the queue.  (First make sure
             * that the error didn't occur after someone had already
             * removed us from the queue.)
             */

            if (call->is_in_pkt_chain)
                dequeue_pool_waiter(call, &pool->pkt_waiters_head,
                                          &pool->pkt_waiters_tail);

            RPC_DG_PKT_POOL_UNLOCK(0);
            return(NULL);
        }

        RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 3,
            ("(alloc_xqe) call signalled, fc %u pkt %u [%s]\n",
            pool->free_count, pool->pkt_count,
            rpc__dg_act_seq_string(&call->xq.hdr)));
    }

    /*
     * Before giving up the PKT lock, check to see if this call is on
     * the packet queue.  This could happen if the call got signalled
     * for some other reason, but woke up to find that there was a packet
     * available.
     */

    if (call->is_in_pkt_chain)
        dequeue_pool_waiter(call, &pool->pkt_waiters_head,
                                  &pool->pkt_waiters_tail);

    /*
     * We now have the packet pool locked, and we know that it's
     * appropriate to allocate a packet.
     */

    pkt = pkt_alloc();

    /*
     * If the pkt pointer is NULL at this point, someone screwed up the
     * quotas.
     */

    if (pkt == NULL)
    {
        RPC_DG_PKT_POOL_UNLOCK(0);
        RPC_DBG_GPRINTF(("(rpc__dg_pkt_alloc_xqe) No buffers available\n"));
        *st = rpc_s_coding_error;
        return(NULL);
    }

    rpc_g_dg_pkt_pool.active_xqes++;

    RPC_DG_PKT_POOL_UNLOCK(0);

    xqe             = &pkt->u.xqe.xqe;
    xqe->body       = &pkt->u.xqe.pkt;
    xqe->next       = NULL;
    xqe->more_data  = NULL;
    xqe->frag_len   = 0;
    xqe->flags      = 0;
    xqe->body_len   = 0;
    xqe->serial_num = 0;
    xqe->in_cwindow = false;

    return (xqe);
}

/*
 * R P C _ _ D G _ P K T _ A L L O C _ R Q E
 *
 * Allocate a packet for reading in a network datagram.  This routine
 * should always be able to return a packet since every packet initially
 * belongs to the listener thread, which has a packet reservation but
 * never holds onto packets.
 *
 * In the case of private sockets, the private rqe is always available
 * (conceptually) for reading in a packet, even though this new packet
 * may then need to be dropped because of rationing.
 */

PRIVATE rpc_dg_recvq_elt_p_t rpc__dg_pkt_alloc_rqe
(
    rpc_dg_ccall_p_t ccall
)
{
    rpc_dg_pkt_pool_elt_p_t pkt;
    rpc_dg_recvq_elt_p_t rqe;

    /*
     * Before taking the packet pool lock, check to see if this call
     * handle is using a private socket, and if the private socket has
     * a private rqe that can be used.
     */
    if (ccall != NULL)
    {
        if (ccall->c.sock_ref->rqe_available == true)
        {
            rqe = ccall->c.sock_ref->rqe;
            ccall->c.sock_ref->rqe_available = false;

            rqe->next   = NULL;
            rqe->more_data = NULL;
            rqe->frag_len = 0;
            rqe->hdrp   = NULL;

            return (rqe);
        }
        /*
         * This call handle is using a private socket, but the cached
         * rqe is already in use.  We'll need to really allocate one;
         * if the call has not yet made a packet pool reservation, we
         * need to do that now.
         */
        else if (ccall->c.n_resvs == 0)
        {
            /*
             * Only the client uses the private socket (at least for now). Thus,
             * we can adjust the reservation in the blocking mode.
             *
             * If we ever allow the server callback's use of the private socket,
             * we will need to be careful here.
             *
             * Note: The ccall hasn't advertised larger fragment size than the
             * minimum size yet. Thus the minimum reservation is required.
             */
            rpc__dg_pkt_adjust_reservation(&ccall->c, 1, true);
        }
    }

    RPC_DG_PKT_POOL_LOCK(0);

    pkt = pkt_alloc();

    /*
     * If the pkt pointer is NULL at this point, someone screwed up the
     * packet rationing.
     */

    if (pkt == NULL)
    {
        rpc_g_dg_pkt_pool.failed_alloc_rqe++;
        RPC_DG_PKT_POOL_UNLOCK(0);
        RPC_DBG_GPRINTF(("(rpc__dg_pkt_alloc_rqe) No buffers available\n"));
        return(NULL);
    }

    rpc_g_dg_pkt_pool.active_rqes++;

    RPC_DG_PKT_POOL_UNLOCK(0);

    pkt->u.rqe.sock_ref = NULL;

    rqe = &pkt->u.rqe.rqe;
    rqe->pkt_real = &pkt->u.rqe.pkt;
    rqe->pkt = (rpc_dg_raw_pkt_p_t) RPC_DG_ALIGN_8(rqe->pkt_real);

    rqe->next   = NULL;
    rqe->more_data = NULL;
    rqe->frag_len = 0;
    rqe->hdrp   = NULL;

    return(rqe);
}

/*
 * P K T _ F R E E
 *
 * Add a datagram packet onto the free list, and possibly signal a waiting
 * call.  The rules for signalling calls are as follows:
 *
 *      - If rationing is in effect, see if the caller is on the packet
 *        waiters' list.  If so, it must be waiting for an XQE, since calls
 *        never block waiting for RQEs.  If the caller's xq->head is
 *        NULL, it is allowed to allocate a packet.  Signal it.
 *
 *      - If not rationing, see if there are calls waiting for either
 *        a reservation or a packet.  If so, signal one.
 *
 * The packet pool lock must be held before calling this routine.  It
 * may be that the caller also holds a call lock.  If it is possible
 * that this locked call might represent a call that is currently waiting
 * on the packet waiters' queue, we need to be sent the address of the
 * call handle.  This enables us to avoid trying to lock the call twice
 * if we happen to pull that call handle off a queue.
 *
 * This scenario is only possible when this routine is called from the
 * listener thread, since if we are being called by the call thread we
 * can be sure that the thread is not waiting on the queue.  In particular,
 * when the stubs free rqes, the call handle we receive will be NULL, since
 * there is no danger that the call is currently queued.
 */

INTERNAL void pkt_free
(
    rpc_dg_pkt_pool_elt_p_t pkt,
    rpc_dg_call_p_t call
)
{
    rpc_dg_pkt_pool_t *pool = &rpc_g_dg_pkt_pool;

    RPC_DG_PKT_POOL_LOCK_ASSERT(0);

    assert(pkt != NULL);

#ifdef DEBUG
    /*
     * Do some sanity checking on the packet pool.
     */
    if (RPC_DBG_EXACT(rpc_es_dbg_mem,20))
    {
        unsigned32 count;
        rpc_dg_pkt_pool_elt_p_t next_pkt;

        for (count = 0, next_pkt = pool->free_list;
             next_pkt != NULL;
             count++, next_pkt = next_pkt->u.next)
        {
            if (next_pkt == pkt)
            {
                RPC_DBG_PRINTF(rpc_e_dbg_mem, 20,
                           ("(pkt_free) pkt already on free_list %p (%d)\n",
                            pkt, count));
                /*
                 * rpc_m_dgpkt_pool_corrupt
                 * "(%s) DG packet free pool is corrupted"
                 */
                rpc_dce_svc_printf (
                    __FILE__, __LINE__,
                    "%s",
                    rpc_svc_dg_pkt,
                    svc_c_sev_fatal | svc_c_action_abort,
                    rpc_m_dgpkt_pool_corrupt,
                    "pkt_free" );
            }
            if (!next_pkt->is_on_free_list)
            {
                RPC_DBG_PRINTF(rpc_e_dbg_mem, 20,
                 ("(pkt_free) free'ed pkt(%p) is not marked as free (%d)\n",
                                next_pkt, count));
                /*
                 * rpc_m_dgpkt_pool_corrupt
                 * "(%s) DG packet free pool is corrupted"
                 */
                rpc_dce_svc_printf (
                    __FILE__, __LINE__,
                    "%s",
                    rpc_svc_dg_pkt,
                    svc_c_sev_fatal | svc_c_action_abort,
                    rpc_m_dgpkt_pool_corrupt,
                    "pkt_free" );
            }
        }

        if (pool->free_count != count)
        {
            RPC_DBG_PRINTF(rpc_e_dbg_mem, 20,
                    ("(pkt_free) free_count mismatch: free_count (%d) != %d\n",
                            pool->free_count, count));
            /*
             * rpc_m_dgpkt_pool_corrupt
             * "(%s) DG packet free pool is corrupted"
             */
            rpc_dce_svc_printf (
                __FILE__, __LINE__,
                "%s",
                rpc_svc_dg_pkt,
                svc_c_sev_fatal | svc_c_action_abort,
                rpc_m_dgpkt_pool_corrupt,
                "pkt_free" );
        }

        if (pkt->is_on_free_list)
        {
            RPC_DBG_PRINTF(rpc_e_dbg_mem, 20,
                           ("(pkt_free) double free'ing pkt(%p)\n", pkt));
            /*
             * rpc_m_dgpkt_bad_free
             * "(%s) Attempt to free already-freed DG packet"
             */
            rpc_dce_svc_printf (
                __FILE__, __LINE__,
                "%s",
                rpc_svc_dg_pkt,
                svc_c_sev_fatal | svc_c_action_abort,
                rpc_m_dgpkt_bad_free,
                "pkt_free" );
        }
    }
#endif

    if (pkt->is_on_free_list)
    {
        return;
    }

    /*
     * Move the packet onto the tail of the pool's free list.
     */

    if (pool->free_list == NULL)
    {
        pool->free_list = pool->free_list_tail = pkt;
    }
    else
    {
        pool->free_list_tail->u.next = pkt;
        pool->free_list_tail = pkt;
    }

    pkt->u.next = NULL;
    pkt->is_on_free_list = true;
    pool->free_count++;

    RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 5,
            ("(pkt_free) pkts %u\n",
            pool->free_count + pool->pkt_count));
    /*
     * Giving up this packet may have taken the system out of a rationing
     * state.  If so, try to reclaim any pre-reserved server reservations that
     * may be outstanding.
     */

    while (RESERVATION_AVAILABLE(pool->max_resv_pkt+1) &&
        pool->srv_resv_avail < RPC_C_DG_PKT_INIT_SERVER_RESVS)
    {
        pool->reservations += (pool->max_resv_pkt + 1);
        pool->srv_resv_avail++;
    }

    /*
     * If the system is currently rationing packets, check to see if
     * the call which just freed this packet is blocked waiting for another
     * packet, and not currently using its reserved packet.  If so, wake
     * it up.
     */

    if (RPC_DG_PKT_RATIONING(0))
    {
        if (call != NULL && call->is_in_pkt_chain && call->xq.head == NULL)
        {
            RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 3,
                ("(pkt_free) signalling self\n"));

            dequeue_pool_waiter(call, &pool->pkt_waiters_head,
                                      &pool->pkt_waiters_tail);

            rpc__dg_call_signal(call);
        }
    }

    /*
     * If we're not rationing, check to see if there are any blocked calls
     * that could be woken up.
     */

    else if (pool->rsv_waiters_head != NULL || pool->pkt_waiters_head != NULL)
    {
        RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 3,
            ("(pkt_free) calling list scanner\n"));

        scan_waiter_lists(call);
    }
    UPDATE_RATIONING_STATE(0);
}

/*
 * R P C _ _ D G _ P K T _ F R E E _ X Q E
 *
 * Return an xqe to the packet pool.  This routine decrements the global
 * count of xqe's in use, and calls a common routine to actually move the
 * packet onto the pool list.
 *
 * The caller of this routine must hold the call lock for the call handle
 * on which this xqe is currently queued.
 */

PRIVATE void rpc__dg_pkt_free_xqe
(
    rpc_dg_xmitq_elt_p_t xqe,
    rpc_dg_call_p_t call
)
{
    rpc_dg_xmitq_elt_p_t tmp;

    RPC_DG_CALL_LOCK_ASSERT(call);

    /*
     * If the packet is being freed by a call handle using a private
     * socket, and the private socket doesn't currently contain a cached
     * xqe, then cache this xqe.
     */

    if (call->sock_ref != NULL &&
       call->sock_ref->is_private &&
       call->sock_ref->xqe == NULL)
    {
        call->sock_ref->xqe = xqe;
        if (xqe->more_data == NULL)
            return;
        else
        {
            xqe = xqe->more_data;
            call->sock_ref->xqe->more_data = NULL;
        }
    }

    RPC_DG_PKT_POOL_LOCK(0);

    do
    {
        tmp = xqe->more_data;
        rpc_g_dg_pkt_pool.active_xqes--;
        pkt_free((rpc_dg_pkt_pool_elt_p_t) xqe, call);
        xqe = tmp;
    } while (xqe != NULL);

    RPC_DG_PKT_POOL_UNLOCK(0);
}

/*
 * R P C _ _ D G _ P K T _ F R E E _ R Q E _ F O R _ S T U B
 *
 * Return an rqe to the packet pool.  This is a shell routine for callers
 * who know that they don't hold the call lock of any of the calls
 * currently waiting in the packet queue.  In particular, the call thread
 * itself never needs to worry about this deadlock, and so the stubs always
 * call this routine to free rqe's.
 */

PRIVATE void rpc__dg_pkt_free_rqe_for_stub
(
    rpc_dg_recvq_elt_p_t rqe
)
{
    rpc__dg_pkt_free_rqe(rqe, NULL);
}

/*
 * R P C _ _ D G _ P K T _ F R E E _ R Q E
 *
 * Return an rqe to the packet pool.  This routine decrements the global
 * count of rqe's in use, and calls a common routine to actually move the
 * packet onto the pool list.
 *
 * If the caller holds any call handle locks, it must pass in a pointer
 * to that handle.  Typically this occurs when freeing an rqe which is
 * currently queued on a call handle.  If the caller holds no such locks,
 * it should pass a NULL pointer.
 */

PRIVATE void rpc__dg_pkt_free_rqe
(
    rpc_dg_recvq_elt_p_t rqe,
    rpc_dg_call_p_t call
)
{
    rpc_dg_pkt_pool_elt_p_t    p = (rpc_dg_pkt_pool_elt_p_t) rqe;
    rpc_dg_recvq_elt_p_t tmp;

    /*
     * Mark the private rqe as available to use.
     */
    if (p->u.rqe.sock_ref != NULL)
    {
        p->u.rqe.sock_ref->rqe_available = true;
        if (rqe->more_data == NULL)
            return;
        else
        {
            rqe = rqe->more_data;
            p->u.rqe.sock_ref->rqe->more_data = NULL;
        }
    }

    RPC_DG_PKT_POOL_LOCK(0);

    do
    {
        tmp = rqe->more_data;
        /*
         * Mark the private rqe as available to use.
         */
        if (((rpc_dg_pkt_pool_elt_p_t)rqe)->u.rqe.sock_ref != NULL)
        {
            ((rpc_dg_pkt_pool_elt_p_t)rqe)->u.rqe.sock_ref->rqe_available = true;
            rqe->more_data = NULL;
        }
        else
        {
            rpc_g_dg_pkt_pool.active_rqes--;
            pkt_free((rpc_dg_pkt_pool_elt_p_t) rqe, call);
        }
        rqe = tmp;
    } while (rqe != NULL);

    RPC_DG_PKT_POOL_UNLOCK(0);
}

/*
 * R P C _ _ D G _ P K T _ A D J U S T _ R E S E R V A T I O N
 *
 * Reserve packets from the packet pool for a call.  Depending on the 'block'
 * argument, this call may block until a reservation is available.
 *
 * This routine has no dependencies on the global lock.  However, to simplify
 * the locking requirements of its callers, it can handle being called with
 * or without the global lock held.  In the case of a thread using a private
 * socket, the adjust_reservation will always be made *without* the global lock
 * held.  For users of shared sockets, this call is always made *with* the
 * global lock held.
 *
 * In all cases, it is assumed that the input call handle is locked.
 */

PRIVATE boolean32 rpc__dg_pkt_adjust_reservation
(
    rpc_dg_call_p_t call,
    unsigned32 nreq,
    boolean32 block
)
{
    unsigned32 st = rpc_s_ok;
    boolean32 got_it = false;
    signed32 how_many;
    rpc_dg_pkt_pool_t *pool = &rpc_g_dg_pkt_pool;
    boolean  using_private_socket = call->sock_ref->is_private;

    if (using_private_socket == false)
        RPC_LOCK_ASSERT(0);

    RPC_DG_CALL_LOCK_ASSERT(call);

    /*
     * Callback handles and WAY/WAY2 handles inherit the reservation
     * made for the original scall/ccall.
     */
    if (call->is_cbk == true)
    {
        if (RPC_DG_CALL_IS_CLIENT(call))
            call->n_resvs = ((rpc_dg_ccall_p_t) call)->cbk_scall->c.n_resvs;
        else
            call->n_resvs = ((rpc_dg_scall_p_t) call)->cbk_ccall->c.n_resvs;

        RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2,
   ("(rpc__dg_pkt_adjust_reservation) for callback inherited %u(%u) resvs\n",
                        call->n_resvs, nreq));
        /*
         * Fall through.
         *
         * Note: We don't special-case the callback scall's private socket
         * use. It must reserve some packets.
         */
    }
    else if (RPC_DG_CALL_IS_CLIENT(call) &&
             ((rpc_dg_ccall_p_t) call)->h->is_WAY_binding != 0)
    {
        /*
         * We do not allow WAY/WAY2 to adjust the reservation.
         */
        call->n_resvs = ((rpc_dg_ccall_p_t) call)->h->is_WAY_binding;
        RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2,
             ("(rpc__dg_pkt_adjust_reservation) for WAY/WAY2 %u(%u) resvs\n",
                        call->n_resvs, nreq));

        return (call->n_resvs >= nreq);
    }
    /*
     * For now, we don't release reservations that the call already holds.
     * That should be done for KRPC.
     */
    if (nreq <= call->n_resvs)
    {
        RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2,
              ("(rpc__dg_pkt_adjust_reservation) already has %u(%u) resvs\n",
                        call->n_resvs, nreq));
        return true;
    }

    /*
     * Find out how many packets we really need to reserve.
     * We must reserve nreq+1 packets.  This additional packet is
     * reserved for the stub; however, we do not record it in n_resvs.
     * If we already have some reserved packets, then the packet for
     * the stub is already reserved.
     */

    how_many = nreq - call->n_resvs;

    if (call->n_resvs == 0)
        how_many++;
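
    /*
     * For example: a call holding no reservations that asks for nreq == 4
     * requests how_many == 5 (4 plus 1 for the stub); a call already
     * holding 2 asks only for the 4 - 2 == 2 additional packets.
     */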

    RPC_DG_PKT_POOL_LOCK(0);

    /*
     * It may be necessary to block the current call until a reservation can be
     * granted.  Just in case, wrap the following in a loop so that we can
     * continue trying to acquire a reservation if necessary.
     */
    while (st == rpc_s_ok)
    {
        /*
         * First handle the common case...
         */
        if (RESERVATION_AVAILABLE(how_many))
        {
            pool->reservations += how_many;
            call->n_resvs = nreq;
            got_it = true;

            RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2,
 ("(rpc__dg_pkt_adjust_reservation) available %u(%u), current reservations %u\n",
                            call->n_resvs, nreq, pool->reservations));
            break;
        }

        /*
         * Next, see if there are any server-only reservations available...
         * iff this is the server's initial reservation
         * (should be for RPC_C_DG_MUST_RECV_FRAG_SIZE).
         *
         * Note: pool->max_resv_pkt should never be less than how_many.
         */
        if (call->n_resvs == 0
            && SRV_RESERVATION_AVAILABLE(call)
            && pool->max_resv_pkt+1 >= (unsigned32)how_many)
        {
            RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas,2,
       ("(rpc__dg_pkt_adjust_reservation) using server-only reservation %u\n",
                            pool->srv_resv_avail));
            call->n_resvs = pool->max_resv_pkt;
            pool->srv_resv_avail--;
            got_it = true;

            RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2,
 ("(rpc__dg_pkt_adjust_reservation) available %u(%u), current reservations %u\n",
                            call->n_resvs, nreq, pool->reservations));
            break;
        }

        /*
         * It's not possible to grant a reservation at this time.  If the caller
         * specified non-blocking operation, return now.
         */
        if (!block)
        {
            RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2,
    ("(rpc__dg_pkt_adjust_reservation) not available %u(%u), not blocking\n",
                            call->n_resvs, nreq));

            RPC_DG_PKT_POOL_UNLOCK(0);
            return false;
        }

        RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2,
   ("(rpc__dg_pkt_adjust_reservation) blocking call %u(%u), pkts %u [%s]\n",
            call->n_resvs, nreq,
            pool->free_count + pool->pkt_count,
            rpc__dg_act_seq_string(&call->xq.hdr)));

        call->n_resvs_wait = how_many;

        /*
         * If the call is not already on the reservation waiters' chain, queue
         * it.  Update the call's reference count.
         */
        if (! call->is_in_pkt_chain)
        {
            if (pool->rsv_waiters_head == NULL)
                pool->rsv_waiters_head = call;
            else
                pool->rsv_waiters_tail->pkt_chain = call;
            pool->rsv_waiters_tail = call;
            call->pkt_chain = NULL;
            call->is_in_pkt_chain = true;

            RPC_DG_CALL_REFERENCE(call);
        }

        if (using_private_socket == false)
            RPC_UNLOCK(0);

        RPC_DG_PKT_POOL_UNLOCK(0);

        rpc__dg_call_wait(call, rpc_e_dg_wait_on_internal_event, &st);

        /*
         * Re-acquire all the locks, in the right order.
         */

        if (using_private_socket == false)
        {
            RPC_DG_CALL_UNLOCK(call);
            RPC_LOCK(0);
            RPC_DG_CALL_LOCK(call);
        }
        RPC_DG_PKT_POOL_LOCK(0);

        RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2,
       ("(rpc__dg_pkt_adjust_reservation) call signalled, pkts %u [%s]\n",
            pool->free_count + pool->pkt_count,
            rpc__dg_act_seq_string(&call->xq.hdr)));
    }

    /*
     * Before returning, check to see if this call is still on the waiters'
     * queue.  This could happen if 1) the call got signalled for some
     * other reason, but woke up to find that rationing was no longer
     * in effect, or 2) call_wait returned with an error status.
     */
    if (call->is_in_pkt_chain)
        dequeue_pool_waiter(call, &pool->rsv_waiters_head,
                                  &pool->rsv_waiters_tail);

    if (got_it == true)
        UPDATE_RATIONING_STATE(0);
    RPC_DG_PKT_POOL_UNLOCK(0);

    /*
     * Update the original scall/ccall's reservation.
     *
     * What is the locking requirement? Can we assume that the original
     * scall/ccall is always locked when this is called?
     */
    if (got_it == true && call->is_cbk == true)
    {
        if (RPC_DG_CALL_IS_CLIENT(call))
        {
            ((rpc_dg_ccall_p_t) call)->cbk_scall->c.n_resvs = call->n_resvs;
        }
        else
        {
            ((rpc_dg_scall_p_t) call)->cbk_ccall->c.n_resvs = call->n_resvs;
        }
        RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2,
("(rpc__dg_pkt_adjust_reservation) for callback updated the original scall/ccall %u(%u) resvs\n",
                        call->n_resvs, nreq));
    }
    return got_it;
}

/*
 * R P C _ _ D G _ P K T _ C A N C E L _ R E S E R V A T I O N
 *
 * Cancel the reservation owned by this call.  Consider waking up a blocked
 * call if the system is not rationing.
 *
 * The input call handle is expected to be locked.
 */

PRIVATE void rpc__dg_pkt_cancel_reservation
(
    rpc_dg_call_p_t call
)
{
    rpc_dg_pkt_pool_t *pool = &rpc_g_dg_pkt_pool;

    RPC_DG_CALL_LOCK_ASSERT(call);

    /*
     * It's possible that the call handle does not have a reservation.
     * For instance, it might have been blocked waiting for a reservation
     * when a quit came in.  In such a case, just return.
     */

    if (call->n_resvs == 0)
    {
        RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2,
                ("(rpc__dg_pkt_cancel_reservation) had no reservation\n"));
        return;
    }

    /*
     * Callback handles and WAY/WAY2 handles inherit the reservation
     * made for the original scall/ccall.  To cancel the reservation,
     * simply reset the handle's flag, but leave the actual reservation
     * count as it is.
     */

    if (call->is_cbk == true ||
        (RPC_DG_CALL_IS_CLIENT(call) &&
        ((rpc_dg_ccall_p_t) call)->h->is_WAY_binding != 0))
    {
        RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2,
                ("(rpc__dg_pkt_cancel_reservation) for callback handle\n"));

        call->n_resvs = 0;
        return;
    }

    /*
     * Otherwise, we'll really need to modify the pool.
     */

    RPC_DG_PKT_POOL_LOCK(0);

    pool->reservations -= (call->n_resvs + 1);
    call->n_resvs = 0;

    /*
     * Giving up this reservation may have taken the system out of a rationing
     * state.  If so, try to reclaim any pre-reserved server reservations that
     * we may have handed out.
     */

    while (RESERVATION_AVAILABLE(pool->max_resv_pkt+1) &&
        pool->srv_resv_avail < RPC_C_DG_PKT_INIT_SERVER_RESVS)
    {
        pool->reservations += (pool->max_resv_pkt + 1);
        pool->srv_resv_avail++;
    }

    RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2,
            ("(rpc__dg_pkt_cancel_reservation) %u reservations left\n", pool->reservations));

    /*
     * Now that we've made more reservation/packet resources available to
     * the system, see if there are any blocked calls that can be woken up.
     */

    if (pool->rsv_waiters_head != NULL || pool->pkt_waiters_head != NULL)
    {
        RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2,
                ("(rpc__dg_pkt_cancel_reservation) calling list scanner\n"));

        scan_waiter_lists(call);
    }
    UPDATE_RATIONING_STATE(0);

    RPC_DG_PKT_POOL_UNLOCK(0);
}

/*
 * R P C _ M G M T _ S E T _ M A X _ C O N C U R R E N C Y
 *
 * This call allows an application (currently only DFS) to specify how many call
 * threads it must be able to run concurrently.  The runtime uses this information
 * to make sure there are enough packets in the packet pool to satisfy the
 * requirements of the packet rationing algorithm.
 */

PUBLIC void rpc_mgmt_set_max_concurrency
(
    unsigned32 max_client_calls,
    unsigned32 max_server_calls,
    unsigned32 *status
)
{
    unsigned32 new_max;

    /*
     * The packet rationing algorithm requires max_resv_pkt+1 packets
     * for each concurrent rpc thread, plus RPC_C_DG_MAX_NUM_PKTS_IN_FRAG
     * for the listener thread.
     */
    new_max = (rpc_g_dg_pkt_pool.max_resv_pkt + 1)
        * (max_client_calls + max_server_calls) + RPC_C_DG_MAX_NUM_PKTS_IN_FRAG;
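
    /*
     * Worked example (hypothetical values): with max_resv_pkt == 8,
     * RPC_C_DG_MAX_NUM_PKTS_IN_FRAG == 8, max_client_calls == 4, and
     * max_server_calls == 4, new_max is (8 + 1) * (4 + 4) + 8 == 80.
     */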

    RPC_DG_PKT_POOL_LOCK(0);

    *status = rpc_s_ok;

    /*
     * Only allow the packet pool to be expanded.
     */
    if (new_max > rpc_g_dg_pkt_pool.max_pkt_count)
    {
        rpc_g_dg_pkt_pool.pkt_count += new_max - rpc_g_dg_pkt_pool.max_pkt_count;
        rpc_g_dg_pkt_pool.max_pkt_count = new_max;
    }

    RPC_DG_PKT_POOL_UNLOCK(0);
}

/*
 * R P C _ M G M T _ G E T _ M A X _ C O N C U R R E N C Y
 *
 * This call allows an application (currently only DFS) to inquire about the
 * maximum number of call threads that can be run concurrently.
 */

PUBLIC unsigned32 rpc_mgmt_get_max_concurrency(void)
{
    unsigned32 temp;

    RPC_DG_PKT_POOL_LOCK(0);

    /*
     * The listener thread has the reservation for
     * RPC_C_DG_MAX_NUM_PKTS_IN_FRAG.
     */

    temp = (rpc_g_dg_pkt_pool.max_pkt_count - RPC_C_DG_MAX_NUM_PKTS_IN_FRAG)
           / (rpc_g_dg_pkt_pool.max_resv_pkt + 1);
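
    /*
     * This is the inverse of the computation in rpc_mgmt_set_max_concurrency:
     * with the hypothetical values used there (max_pkt_count == 80,
     * RPC_C_DG_MAX_NUM_PKTS_IN_FRAG == 8, max_resv_pkt == 8), this yields
     * (80 - 8) / 9 == 8 concurrent call threads.
     */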

    RPC_DG_PKT_POOL_UNLOCK(0);

    return (temp);
}

/*
 * R P C _ _ D G _ P K T _ I S _ R A T I O N I N G
 *
 * Return the packet pool's rationing state.
 */
PRIVATE boolean32 rpc__dg_pkt_is_rationing
(
    boolean32 *low_on_pkts
)
{
    boolean32 is_rationing;

    /*
     * We think that checking flags without the pkt pool locked is ok
     * because there is always a window in which the rationing state may
     * change.
     */

    /*
     * First, determine if we are currently rationing packets.
     */
    is_rationing = rpc_g_dg_pkt_pool.is_rationing;

    /*
     * See if the pool is getting low.  Only
     * rpc__dg_call_xmit_fack()::dgcall.c uses this information to begin
     * backing off the sender.  Avoid doing the computation if
     * is_rationing is already set.
     */
    if (low_on_pkts != NULL)
    {
        *low_on_pkts = rpc_g_dg_pkt_pool.low_on_pkts;
    }

    return(is_rationing);
}