/* * Copyright (c) 2010 Apple Inc. All rights reserved. * * @APPLE_LICENSE_HEADER_START@ * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. Neither the name of Apple Inc. ("Apple") nor the names of its * contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY APPLE AND ITS CONTRIBUTORS "AS IS" AND ANY * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL APPLE OR ITS CONTRIBUTORS BE LIABLE FOR ANY * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * Portions of this software have been released under the following terms: * * (c) Copyright 1989-1993 OPEN SOFTWARE FOUNDATION, INC. * (c) Copyright 1989-1993 HEWLETT-PACKARD COMPANY * (c) Copyright 1989-1993 DIGITAL EQUIPMENT CORPORATION * * To anyone who acknowledges that this file is provided "AS IS" * without any express or implied warranty: * permission to use, copy, modify, and distribute this file for any * purpose is hereby granted without fee, provided that the above * copyright notices and this notice appears in all source code copies, * and that none of the names of Open Software Foundation, Inc., Hewlett- * Packard Company or Digital Equipment Corporation be used * in advertising or publicity pertaining to distribution of the software * without specific, written prior permission. Neither Open Software * Foundation, Inc., Hewlett-Packard Company nor Digital * Equipment Corporation makes any representations about the suitability * of this software for any purpose. * * Copyright (c) 2007, Novell, Inc. All rights reserved. * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. Neither the name of Novell Inc. nor the names of its contributors * may be used to endorse or promote products derived from this * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * @APPLE_LICENSE_HEADER_END@ */ /* ** ** NAME: ** ** dgpkt.c ** ** FACILITY: ** ** Remote Procedure Call (RPC) ** ** ABSTRACT: ** ** Routines for managing the datagram packet pool. ** ** */ #include #include #include #include #include /* ========================================================================= */ GLOBAL rpc_dg_pkt_pool_t rpc_g_dg_pkt_pool; /* ========================================================================= */ INTERNAL rpc_dg_pkt_pool_elt_p_t pkt_alloc (void); INTERNAL void pkt_free ( rpc_dg_pkt_pool_elt_p_t /*pkt*/, rpc_dg_call_p_t /*call*/ ); INTERNAL void dequeue_pool_waiter ( rpc_dg_call_p_t /*call*/, rpc_dg_call_p_t * /*head*/, rpc_dg_call_p_t * /*tail*/ ); INTERNAL void scan_waiter_lists ( rpc_dg_call_p_t /*call*/ ); /* ========================================================================= */ /* * These macros define the pool condition under which it's okay to grant * new reservations. In the general case, there must be enough free packets to * cover all current reservations, plus n to cover the reservation being * considered. * In the second case, there are a numer of pre-reserved reservations set * aside for use by servers only. */ #define RESERVATION_AVAILABLE(n) \ (pool->free_count + pool->pkt_count >= pool->reservations + (n)) #define SRV_RESERVATION_AVAILABLE(call) \ (RPC_DG_CALL_IS_SERVER(call) && pool->srv_resv_avail > 0) /* * The macro to update the rationing state. * * First, determine if we are currently rationing packets. * If so, we'll set a flag to indicate this condition. * This information will be used by recvq_insert when deciding whether * to queue this packet to a call handle. * * Second, see if the pool is getting low. The xmit_fack routine uses * this information to begin backing off the sender. * Avoid doing the computation if is_rationing is already set; */ #define UPDATE_RATIONING_STATE(junk) \ { \ rpc_g_dg_pkt_pool.is_rationing = RPC_DG_PKT_RATIONING(0); \ rpc_g_dg_pkt_pool.low_on_pkts = rpc_g_dg_pkt_pool.is_rationing || \ (rpc_g_dg_pkt_pool.free_count + rpc_g_dg_pkt_pool.pkt_count <= \ 2 * rpc_g_dg_pkt_pool.reservations); \ } /* ========================================================================= */ /* * D E Q U E U E _ P O O L _ W A I T E R * * This routine is used to dequeue a call handle from any location within * a list of waiting calls. Calls may be waiting for 1) a reservation, * or 2) a packet; the head/tail pointer will determine which list to * search. * * The packet pool and call handle locks must be held before calling * this routine. */ INTERNAL void dequeue_pool_waiter ( rpc_dg_call_p_t call, rpc_dg_call_p_t *head, rpc_dg_call_p_t *tail ) { rpc_dg_call_p_t waiter = *head, prev = NULL; for ( ; waiter != NULL; prev = waiter, waiter = waiter->pkt_chain) { if (waiter == call) { if (prev == NULL) (*head) = call->pkt_chain; else prev->pkt_chain = call->pkt_chain; if (call->pkt_chain == NULL) (*tail) = prev; call->is_in_pkt_chain = false; /* * Decrement our reference to the call. */ if (RPC_DG_CALL_IS_SERVER(call)) { RPC_DG_SCALL_RELEASE_NO_UNLOCK((rpc_dg_scall_p_t *) &call); } else { RPC_DG_CCALL_RELEASE_NO_UNLOCK((rpc_dg_ccall_p_t *) &call); } return; } } /* * Since these lists are internal, we should never be mistaken * about what's on them. */ RPC_DBG_GPRINTF(("(dequeue_pool_waiter) No call found\n")); } /* * S C A N _ W A I T E R _ L I S T S * * This routine is called when the system is *not* in a rationing state, * but there are calls which are blocked waiting for 1) a pool reservation, * or 2) a packet. Precedence is given to reservation waiters. * * If it is possible that the caller holds the lock for one of the calls * on one of the waiters list, it must send us the address of that call * handle. Since we need to lock a call before signalling it, this allows * us to avoid trying to re-lock a call handle that has already been * locked by the caller. * * The pool must be locked before calling this routine. */ INTERNAL void scan_waiter_lists ( rpc_dg_call_p_t call ) { rpc_dg_call_p_t waiter = NULL, prev = NULL; rpc_dg_pkt_pool_t *pool = &rpc_g_dg_pkt_pool; RPC_DG_PKT_POOL_LOCK_ASSERT(0); /* * See if it's possible to wake up a blocked call. Precedence is * given to calls waiting for reservations. Note that if there are * N reservations, and N+1 free packets, it is possible to wake a * call waiting for a packet. However, we will not do so if there * are calls waiting for reservations. In that case, we just return, * since a reservation waiter requires N+2 free packets in the pool * before it can be granted a reservation. */ if (pool->rsv_waiters_head != NULL) { /* * There are two places we might find an available reservation. First * see if there are enough free packets in the pool to grant another * reservation... */ if (RESERVATION_AVAILABLE(pool->rsv_waiters_head->n_resvs_wait)) { waiter = pool->rsv_waiters_head; pool->rsv_waiters_head = waiter->pkt_chain; } /* * ... otherwise, if any of the pre-reserved server reservations * are available, see if there are any scalls on the waiters' queue. */ else if (pool->srv_resv_avail > 0) { for (waiter = pool->rsv_waiters_head; waiter != NULL; prev = waiter, waiter = waiter->pkt_chain) { if (RPC_DG_CALL_IS_SERVER(waiter) && pool->max_resv_pkt+1 >= waiter->n_resvs_wait) { if (prev == NULL) pool->rsv_waiters_head = waiter->pkt_chain; else prev->pkt_chain = waiter->pkt_chain; if (waiter->pkt_chain == NULL) pool->rsv_waiters_tail = prev; break; } } } } else if (pool->pkt_waiters_head != NULL) { if (! RPC_DG_PKT_RATIONING(0)) { waiter = pool->pkt_waiters_head; pool->pkt_waiters_head = waiter->pkt_chain; } } /* * If we weren't able to dequeue a waiter, return now. */ if (waiter == NULL) return; /* * Reset the call's flag, which is protected by the packet pool lock. */ waiter->is_in_pkt_chain = false; /* * Signalling the call requires that we hold the call's lock. This * involves unlocking the pool first to respect the locking heirarchy * (call, then pool). However, if the call we pulled off the queue * happens to be the call we were sent, it's already locked, and * the operation is somewhat simpler. */ if (call != waiter) { RPC_DG_PKT_POOL_UNLOCK(0); RPC_DG_CALL_LOCK(waiter); } rpc__dg_call_signal(waiter); /* * Since we've removed the call from the waiters' queue, we need * to release that reference. In the case where we locked the * call, unlock it. Otherwise, leave it the way we found it. */ if (call != waiter) { if (RPC_DG_CALL_IS_SERVER(waiter)) { RPC_DG_SCALL_RELEASE((rpc_dg_scall_p_t *) &waiter); } else { RPC_DG_CCALL_RELEASE((rpc_dg_ccall_p_t *) &waiter); } /* * Return the pool locked, as the caller expects. */ RPC_DG_PKT_POOL_LOCK(0); } else { if (RPC_DG_CALL_IS_SERVER(waiter)) { RPC_DG_SCALL_RELEASE_NO_UNLOCK((rpc_dg_scall_p_t *) &waiter); } else { RPC_DG_CCALL_RELEASE_NO_UNLOCK((rpc_dg_ccall_p_t *) &waiter); } } } /* * R P C _ _ D G _ P K T _ P O O L _ I N I T * * Initialize the packet pool structure. */ PRIVATE void rpc__dg_pkt_pool_init(void) { rpc_dg_pkt_pool_elt_p_t pkt; rpc_dg_pkt_pool_t *pool = &rpc_g_dg_pkt_pool; unsigned32 i; /* * Allow the total number of packets in the pool to be settable thru * a debug switch. [# of pkts = 2 ^ (level - 1)] */ #ifdef DEBUG if (RPC_DBG(rpc_es_dbg_pkt_quota_size, 1)) { pool->max_pkt_count = 1 << (rpc_g_dbg_switches[(int) rpc_es_dbg_pkt_quota_size] - 1); } else #endif { pool->max_pkt_count = RPC_C_DG_PKT_MAX; } pool->pkt_count = pool->max_pkt_count; pool->active_rqes = 0; pool->active_xqes = 0; pool->free_count = 0; pool->free_list = NULL; pool->free_list_tail = NULL; /* * The number of packets required for the largest fragment size * possible. */ pool->max_resv_pkt = RPC_C_DG_MAX_NUM_PKTS_IN_FRAG; /* * We always start out by reserving RPC_C_DG_MAX_NUM_PKTS_IN_FRAG * packets for the listener thread, plus max_resv_pkt+1 packets for * each of the pre-reserved server-side reservations. */ pool->reservations = RPC_C_DG_MAX_NUM_PKTS_IN_FRAG + ((pool->max_resv_pkt + 1) * RPC_C_DG_PKT_INIT_SERVER_RESVS); /* * Initialize the count of pre-reserved server-side reservations. * If, when a server call tries to make a reservertion, this count is greater than * 0, the server call will never need to wait. */ pool->srv_resv_avail = RPC_C_DG_PKT_INIT_SERVER_RESVS; /* * Initialize the two lists of waiters. Note that when the head * of the list is NULL, the tail pointer is undefined. */ pool->pkt_waiters_head = NULL; pool->rsv_waiters_head = NULL; /* * Allocate an initial allotment of packet buffers. See notes in * pkt_alloc() below for more details about the makeup of the packets. */ for (i = MIN(RPC_C_DG_PKT_INIT_CNT, pool->pkt_count); i > 0; i--) { RPC_MEM_ALLOC(pkt, rpc_dg_pkt_pool_elt_p_t, sizeof(rpc_dg_pkt_pool_elt_t), RPC_C_MEM_DG_PKT_POOL_ELT, RPC_C_MEM_NOWAIT); pkt->u.next = pool->free_list; pkt->is_on_free_list = true; pool->free_list = pkt; /* * Anchor the tail pointer on the first packet allocated. */ if (pool->free_list_tail == NULL) pool->free_list_tail = pkt; pool->free_count++; pool->pkt_count--; } RPC_MUTEX_INIT(pool->pkt_mutex); } /* * R P C _ _ D G _ P K T _ P O O L _ F O R K _ H A N D L E R * * Handle fork related processing for this module. */ PRIVATE void rpc__dg_pkt_pool_fork_handler ( rpc_fork_stage_id_t stage ) { rpc_dg_pkt_pool_elt_p_t pkt, next_pkt; rpc_dg_pkt_pool_t *pool = &rpc_g_dg_pkt_pool; switch ((int)stage) { case RPC_C_PREFORK: break; case RPC_C_POSTFORK_PARENT: break; case RPC_C_POSTFORK_CHILD: /* * Free any packets sitting on the free list. */ pkt = pool->free_list; while (pool->free_count--) { next_pkt = pkt->u.next; RPC_MEM_FREE(pkt, RPC_C_MEM_DG_PKT_POOL_ELT); pkt = next_pkt;; } /* * Clear out remaining fields of the packet pool. */ /*b_z_e_r_o((char *) &rpc_g_dg_pkt_pool, sizeof(rpc_dg_pkt_pool_t));*/ memset( &rpc_g_dg_pkt_pool, 0, sizeof(rpc_dg_pkt_pool_t)); break; } } /* * P K T _ A L L O C * * Return a datagram packet. The packet comes either from the free list * we're maintaining, or if the free list is empty, we alloc up a new * one. It is up to the callers of this routine to respect the packet * rationing rules; as such, this routine should always be able to return * a packet successfully. * * The packet pool lock must be held before calling this routine */ INTERNAL rpc_dg_pkt_pool_elt_p_t pkt_alloc(void) { rpc_dg_pkt_pool_elt_p_t pkt; rpc_dg_pkt_pool_t *pool = &rpc_g_dg_pkt_pool; RPC_DG_PKT_POOL_LOCK_ASSERT(0); /* * If there is an packet on the free list, use it. */ if (pool->free_list != NULL) { pkt = pool->free_list; pool->free_list = pool->free_list->u.next; pool->free_count--; /* * If the free list is now empty, then there's no longer a tail. */ if (pool->free_list == NULL) pool->free_list_tail = NULL; #ifdef DEBUG if (RPC_DBG_EXACT(rpc_es_dbg_mem,20)) { unsigned32 count; rpc_dg_pkt_pool_elt_p_t next_pkt; for (count = 0, next_pkt = pool->free_list; next_pkt != NULL; count++, next_pkt = next_pkt->u.next); if (pool->free_count != count) { RPC_DBG_PRINTF(rpc_e_dbg_mem, 20, ("(pkt_alloc) free_count mismatch: free_count (%d) != %d\n", pool->free_count, count)); /* * rpc_m_dgpkt_pool_corrupt * "(%s) DG packet free pool is corrupted" */ rpc_dce_svc_printf ( __FILE__, __LINE__, "%s", rpc_svc_dg_pkt, svc_c_sev_fatal | svc_c_action_abort, rpc_m_dgpkt_pool_corrupt, "pkt_alloc" ); } } #endif } /* * Else, we'll need to alloc up a new one. */ else { /* * Leave this assert un-ifdef'ed... it's cheap enough and it seems * to be a reasonably good indicator of other problems related to * xq/rq management. If this assert triggers, it's time to enable the * rest of the pkt pool debug stuff to help zero in on the problem * (in the past, problems have been users of the pool, not the pool * code itself). */ assert(rpc_g_dg_pkt_pool.free_count == 0); /* * First make sure we haven't already allocated the maximum number * of packets. This should never happen since the callers of * this routine are enforcing packet qoutas, which require that * at least one packet be available at all times. */ if (pool->pkt_count == 0) return (NULL); /* * Allocate up a new packet. The size of this packet is the * maximum of what we need for either an xqe or an rqe. */ RPC_MEM_ALLOC(pkt, rpc_dg_pkt_pool_elt_p_t, sizeof(rpc_dg_pkt_pool_elt_t), RPC_C_MEM_DG_PKT_POOL_ELT, RPC_C_MEM_NOWAIT); pool->pkt_count--; } RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 5, ("(pkt_alloc) pkts %u\n", pool->free_count + pool->pkt_count)); pkt->is_on_free_list = false; UPDATE_RATIONING_STATE(0); return(pkt); } /* * R P C _ _ D G _ P K T _ A L L O C _ X Q E * * Return a transmit queue element (rpc_dg_xmit_q_elt_t). Before * allocating the packet, check to see if we need to be rationing packets. * If so, and this call is already using its reserved packet, block the * call. * * Call handle lock must be held. Acquires the packet pool lock. */ PRIVATE rpc_dg_xmitq_elt_p_t rpc__dg_pkt_alloc_xqe ( rpc_dg_call_p_t call, unsigned32 *st ) { rpc_dg_pkt_pool_elt_p_t pkt; rpc_dg_xmitq_elt_p_t xqe = NULL; rpc_dg_pkt_pool_t *pool = &rpc_g_dg_pkt_pool; RPC_DG_CALL_LOCK_ASSERT(call); /* * Before taking the packet pool lock, check to see if this call * handle is using a private socket, and if the private socket has * a private xqe that can be used. */ if (call->sock_ref->is_private) { if (call->sock_ref->xqe != NULL) { xqe = call->sock_ref->xqe; call->sock_ref->xqe = NULL; xqe->next = NULL; xqe->more_data = NULL; xqe->frag_len = 0; xqe->flags = 0; xqe->body_len = 0; xqe->serial_num = 0; xqe->in_cwindow = false; return (xqe); } /* * This call handle is using a private socket, but the cached * xqe is already in use. We'll need to really allocate one; * if the call has not yet made a packet pool reservation, we * need to do that now. */ else if (call->n_resvs == 0) { unsigned32 resv ATTRIBUTE_UNUSED; /* * Only the client uses the private socket (at least for now). Thus, * we can adjust the reservation in the blocking mode. * * If we ever allow the server callback's use of the private socket, * we will need to be careful here. * * Since we have been running without a reservation, we are still in * SBF (Single Buffer Fragment) and need the minimum reservation. */ rpc__dg_pkt_adjust_reservation(call, 1, true); } } RPC_DG_PKT_POOL_LOCK(0); /* * If we are rationing packets, and this call is already using its * reserved packet, put the call on the list of packet waiters and * go to sleep. */ while (RPC_DG_PKT_RATIONING(0) && call->xq.head != NULL) { pool->blocked_alloc_xqe++; RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 3, ("(alloc_xqe) rationing, blocking call, fc %u pkt %u [%s]\n", pool->free_count, pool->pkt_count, rpc__dg_act_seq_string(&call->xq.hdr))); /* * If the call is not already on the packet waiters' chain, queue * it. Update the call's reference count. */ if (! call->is_in_pkt_chain) { if (pool->pkt_waiters_head == NULL) pool->pkt_waiters_head = call; else pool->pkt_waiters_tail->pkt_chain = call; pool->pkt_waiters_tail = call; call->pkt_chain = NULL; call->is_in_pkt_chain = true; RPC_DG_CALL_REFERENCE(call); } RPC_DG_PKT_POOL_UNLOCK(0); rpc__dg_call_wait(call, rpc_e_dg_wait_on_internal_event, st); RPC_DG_PKT_POOL_LOCK(0); if (*st != rpc_s_ok) { /* * If there's an error, get off the queue. (first make sure * that the error didn't occur after someone had already * removed us from the queue.) */ if (call->is_in_pkt_chain) dequeue_pool_waiter(call, &pool->pkt_waiters_head, &pool->pkt_waiters_tail); RPC_DG_PKT_POOL_UNLOCK(0); return(NULL); } RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 3, ("(alloc_xqe) call signalled, fc %u pkt %u [%s]\n", pool->free_count, pool->pkt_count, rpc__dg_act_seq_string(&call->xq.hdr))); } /* * Before giving up the PKT lock, check to see if this call is on * the packet queue. This could happen if the call got signalled * for some other reason, but woke up to find that there was a packet * available. */ if (call->is_in_pkt_chain) dequeue_pool_waiter(call, &pool->pkt_waiters_head, &pool->pkt_waiters_tail); /* * We now have the packet pool locked, and we know that it's * appropriate to allocate a packet. */ pkt = pkt_alloc(); /* * If the pkt pointer is NULL at this point, someone screwed up the * quotas. */ if (pkt == NULL) { RPC_DG_PKT_POOL_UNLOCK(0); RPC_DBG_GPRINTF(("(rpc__dg_pkt_alloc_xqe) No buffers available\n")); *st = rpc_s_coding_error; return(NULL); } rpc_g_dg_pkt_pool.active_xqes++; RPC_DG_PKT_POOL_UNLOCK(0); xqe = &pkt->u.xqe.xqe; xqe->body = &pkt->u.xqe.pkt; xqe->next = NULL; xqe->more_data = NULL; xqe->frag_len = 0; xqe->flags = 0; xqe->body_len = 0; xqe->serial_num = 0; xqe->in_cwindow = false; return (xqe); } /* * R P C _ _ D G _ P K T _ A L L O C _ R Q E * * Allocate a packet for reading in a network datagram. This routine * should always be able to return a packet since every packet initially * belongs to the listener thread, which has a packet reservation but * never holds onto packets. * * In the case of private sockets, the private rqe is always available * (conceptually) for reading in a packet; eventhough this new packet * may then need to be dropped because of rationing. */ PRIVATE rpc_dg_recvq_elt_p_t rpc__dg_pkt_alloc_rqe(ccall) rpc_dg_ccall_p_t ccall; { rpc_dg_pkt_pool_elt_p_t pkt; rpc_dg_recvq_elt_p_t rqe; /* * Before taking the packet pool lock, check to see if this call * handle is using a private socket, and if the private socket has * a private rqe that can be used. */ if (ccall != NULL) { if (ccall->c.sock_ref->rqe_available == true) { rqe = ccall->c.sock_ref->rqe; ccall->c.sock_ref->rqe_available = false; rqe->next = NULL; rqe->more_data = NULL; rqe->frag_len = 0; rqe->hdrp = NULL; return (rqe); } /* * This call handle is using a private socket, but the cached * rqe is already in use. We'll need to really allocate one; * if the call has not yet made a packet pool reservation, we * need to do that now. */ else if (ccall->c.n_resvs == 0) { /* * Only the client uses the private socket (at least for now). Thus, * we can adjust the reservation in the blocking mode. * * If we ever allow the server callback's use of the private socket, * we will need to be careful here. * * Note: The ccall hasn't advertised larger fragment size than the * minimum size yet. Thus the minimum reservation is required. */ rpc__dg_pkt_adjust_reservation(&ccall->c, 1, true); } } RPC_DG_PKT_POOL_LOCK(0); pkt = pkt_alloc(); /* * If the pkt pointer is NULL at this point, someone screwed up the * packet_rationing. */ if (pkt == NULL) { rpc_g_dg_pkt_pool.failed_alloc_rqe++; RPC_DG_PKT_POOL_UNLOCK(0); RPC_DBG_GPRINTF(("(rpc__dg_pkt_alloc_rqe) No buffers available\n")); return(NULL); } rpc_g_dg_pkt_pool.active_rqes++; RPC_DG_PKT_POOL_UNLOCK(0); pkt->u.rqe.sock_ref = NULL; rqe = &pkt->u.rqe.rqe; rqe->pkt_real = &pkt->u.rqe.pkt; rqe->pkt = (rpc_dg_raw_pkt_p_t) RPC_DG_ALIGN_8(rqe->pkt_real); rqe->next = NULL; rqe->more_data = NULL; rqe->frag_len = 0; rqe->hdrp = NULL; return(rqe); } /* * P K T _ F R E E * * Add a datagram packet onto the free list, and possibly signal a waiting * call. The rules for signalling calls are as follows: * * - If rationing is in effect, see if the caller is on the packet * waiters' list If so, it must be waiting for an XQE, since calls * never block waiting for RQEs. If the caller's xq->head is * NULL, it is allowed to allocate a packet. Signal it. * * - If not rationing, see if there are calls waiting for either * a reservation or a packet. If so, signal one. * * The packet pool lock must be held before calling this routine. It * may be that the caller also holds a call lock. If it is possible * that this locked call might represent a call that is currently waiting * on the packet waiters' queue, we need to be sent the address of the * call handle. This enables us to avoid trying to lock the call twice * if we happen to pull that call handle off a queue. * * This scenario is only possible when this routine is called from the * listener thread, since if we are being called by the call thread we * can be sure that the thread is not waiting on the queue. In particular, * when the stubs free rqes, the call handle we receive will be NULL, since * there is no danger that the call is currently queued. */ INTERNAL void pkt_free ( rpc_dg_pkt_pool_elt_p_t pkt, rpc_dg_call_p_t call ) { rpc_dg_pkt_pool_t *pool = &rpc_g_dg_pkt_pool; RPC_DG_PKT_POOL_LOCK_ASSERT(0); assert(pkt != NULL); #ifdef DEBUG /* * Do some sanity checking on the packet pool. */ if (RPC_DBG_EXACT(rpc_es_dbg_mem,20)) { unsigned32 count; rpc_dg_pkt_pool_elt_p_t next_pkt; for (count = 0, next_pkt = pool->free_list; next_pkt != NULL; count++, next_pkt = next_pkt->u.next) { if (next_pkt == pkt) { RPC_DBG_PRINTF(rpc_e_dbg_mem, 20, ("(pkt_free) pkt already on free_list %p (%d)\n", pkt, count)); /* * rpc_m_dgpkt_pool_corrupt * "(%s) DG packet free pool is corrupted" */ rpc_dce_svc_printf ( __FILE__, __LINE__, "%s", rpc_svc_dg_pkt, svc_c_sev_fatal | svc_c_action_abort, rpc_m_dgpkt_pool_corrupt, "pkt_free" ); } if (!next_pkt->is_on_free_list) { RPC_DBG_PRINTF(rpc_e_dbg_mem, 20, ("(pkt_free) free'ed pkt(%p) is not marked as free (%d)\n", next_pkt, count)); /* * rpc_m_dgpkt_pool_corrupt * "(%s) DG packet free pool is corrupted" */ rpc_dce_svc_printf ( __FILE__, __LINE__, "%s", rpc_svc_dg_pkt, svc_c_sev_fatal | svc_c_action_abort, rpc_m_dgpkt_pool_corrupt, "pkt_free" ); } } if (pool->free_count != count) { RPC_DBG_PRINTF(rpc_e_dbg_mem, 20, ("(pkt_free) free_count mismatch: free_count (%d) != %d\n", pool->free_count, count)); /* * rpc_m_dgpkt_pool_corrupt * "(%s) DG packet free pool is corrupted" */ rpc_dce_svc_printf ( __FILE__, __LINE__, "%s", rpc_svc_dg_pkt, svc_c_sev_fatal | svc_c_action_abort, rpc_m_dgpkt_pool_corrupt, "pkt_free" ); } if (pkt->is_on_free_list) { RPC_DBG_PRINTF(rpc_e_dbg_mem, 20, ("(pkt_free) double free'ing pkt(%p)\n", pkt)); /* * rpc_m_dgpkt_bad_free * "(%s) Attempt to free already-freed DG packet" */ rpc_dce_svc_printf ( __FILE__, __LINE__, "%s", rpc_svc_dg_pkt, svc_c_sev_fatal | svc_c_action_abort, rpc_m_dgpkt_bad_free, "pkt_free" ); } } #endif if (pkt->is_on_free_list) { return; } /* * Move the packet onto the tail of the pool's free list. */ if (pool->free_list == NULL) { pool->free_list = pool->free_list_tail = pkt; } else { pool->free_list_tail->u.next = pkt; pool->free_list_tail = pkt; } pkt->u.next = NULL; pkt->is_on_free_list = true; pool->free_count++; RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 5, ("(pkt_free) pkts %u\n", pool->free_count + pool->pkt_count)); /* * Giving up this packet may have taken the system out of a rationing * state. If so, try to reclaim any pre-reserved server reservations that * we may be outstanding. */ while (RESERVATION_AVAILABLE(pool->max_resv_pkt+1) && pool->srv_resv_avail < RPC_C_DG_PKT_INIT_SERVER_RESVS) { pool->reservations += (pool->max_resv_pkt + 1); pool->srv_resv_avail++; } /* * If the system is currently rationing packets, check to see if * the call which just freed this packet is blocked waiting for another * packet, and not currently using its reserved packet. If so, wake * it up. */ if (RPC_DG_PKT_RATIONING(0)) { if (call != NULL && call->is_in_pkt_chain && call->xq.head == NULL) { RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 3, ("(pkt_free) signalling self\n")); dequeue_pool_waiter(call, &pool->pkt_waiters_head, &pool->pkt_waiters_tail); rpc__dg_call_signal(call); } } /* * If we're not rationing, check to see if there are any blocked calls * that could be woken up. */ else if (pool->rsv_waiters_head != NULL || pool->pkt_waiters_head != NULL) { RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 3, ("(pkt_free) calling list scanner\n")); scan_waiter_lists(call); } UPDATE_RATIONING_STATE(0); } /* * R P C _ _ D G _ P K T _ F R E E _ X Q E * * Return an xqe to the packet pool. This routine decrements the global * count of xqe's in use, and calls a common routine to actually move the * packet onto the pool list. * * The caller of this routine must hold the call lock for the call handle * on which this xqe is currently queued. */ PRIVATE void rpc__dg_pkt_free_xqe ( rpc_dg_xmitq_elt_p_t xqe, rpc_dg_call_p_t call ) { rpc_dg_xmitq_elt_p_t tmp; RPC_DG_CALL_LOCK_ASSERT(call); /* * If the packet is being freed by a call handle using a private * socket, and the private socket doesn't currently contain a cached * xqe, then cache this xqe. */ if (call->sock_ref != NULL && call->sock_ref->is_private && call->sock_ref->xqe == NULL) { call->sock_ref->xqe = xqe; if (xqe->more_data == NULL) return; else { xqe = xqe->more_data; call->sock_ref->xqe->more_data = NULL; } } RPC_DG_PKT_POOL_LOCK(0); do { tmp = xqe->more_data; rpc_g_dg_pkt_pool.active_xqes--; pkt_free((rpc_dg_pkt_pool_elt_p_t) xqe, call); xqe = tmp; } while (xqe != NULL); RPC_DG_PKT_POOL_UNLOCK(0); } /* * R P C _ _ D G _ P K T _ F R E E _ R Q E _ F O R _ S T U B * * Return an rqe to the packet pool. This is a shell routine for callers * who know that they don't hold the call lock of any of the calls * currently waiting in the packet queue. In particular, the call thread * itself never needs to worry about this deadlock, and so the stubs always * call this routine to free rqe's. */ PRIVATE void rpc__dg_pkt_free_rqe_for_stub ( rpc_dg_recvq_elt_p_t rqe ) { rpc__dg_pkt_free_rqe(rqe, NULL); } /* * R P C _ _ D G _ P K T _ F R E E _ R Q E * * Return an rqe to the packet pool. This routine decrements the global * count of rqe's in use, and calls a common routine to actually move the * packet onto the pool list. * * If the caller holds any call handle, locks it must send pass in a pointer * to that handle. Typically this occurs when freeing an rqe which is * currently queued on a call handle. If the caller holds no such locks, * it should pass a NULL pointer. */ PRIVATE void rpc__dg_pkt_free_rqe ( rpc_dg_recvq_elt_p_t rqe, rpc_dg_call_p_t call ) { rpc_dg_pkt_pool_elt_p_t p = (rpc_dg_pkt_pool_elt_p_t) rqe; rpc_dg_recvq_elt_p_t tmp; /* * Mark the private rqe as available to use. */ if (p->u.rqe.sock_ref != NULL) { p->u.rqe.sock_ref->rqe_available = true; if (rqe->more_data == NULL) return; else { rqe = rqe->more_data; p->u.rqe.sock_ref->rqe->more_data = NULL; } } RPC_DG_PKT_POOL_LOCK(0); do { tmp = rqe->more_data; /* * Mark the private rqe as available to use. */ if (((rpc_dg_pkt_pool_elt_p_t)rqe)->u.rqe.sock_ref != NULL) { ((rpc_dg_pkt_pool_elt_p_t)rqe)->u.rqe.sock_ref->rqe_available = true; rqe->more_data = NULL; } else { rpc_g_dg_pkt_pool.active_rqes--; pkt_free((rpc_dg_pkt_pool_elt_p_t) rqe, call); } rqe = tmp; } while (rqe != NULL); RPC_DG_PKT_POOL_UNLOCK(0); } /* * R P C _ _ D G _ P K T _ A D J U S T _ R E S E R V A T I O N * * Reserve packets from the packet pool for a call. Depending on the 'block' * argument, this call may block until a reservation is available. * * This routine has no dependencies on the global lock. However, to simplify * the locking requirements of its callers, it can handle being called with * or without the global lock held. In the case of a thread using a private * socket, the adjust_reservation will always be made *without* the global lock * held. For users of shared sockets, this call is always made *with* the * global lock held. * * In all cases, it is assumed that the input call handle is locked. */ PRIVATE boolean32 rpc__dg_pkt_adjust_reservation ( rpc_dg_call_p_t call, unsigned32 nreq, boolean32 block ) { unsigned32 st = rpc_s_ok; boolean32 got_it = false; signed32 how_many; rpc_dg_pkt_pool_t *pool = &rpc_g_dg_pkt_pool; boolean using_private_socket = call->sock_ref->is_private; if (using_private_socket == false) RPC_LOCK_ASSERT(0); RPC_DG_CALL_LOCK_ASSERT(call); /* * Callback handles and WAY/WAY2 handles inherit the reservation * made for the original scall/ccall. */ if (call->is_cbk == true) { if (RPC_DG_CALL_IS_CLIENT(call)) call->n_resvs = ((rpc_dg_ccall_p_t) call)->cbk_scall->c.n_resvs; else call->n_resvs = ((rpc_dg_scall_p_t) call)->cbk_ccall->c.n_resvs; RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2, ("(rpc__dg_pkt_adjust_reservation) for callback inherited %u(%u) resvs\n", call->n_resvs, nreq)); /* * Fall through. * * Note: We don't special-case the callback scall's private socket * use. It must reserve some packets. */ } else if (RPC_DG_CALL_IS_CLIENT(call) && ((rpc_dg_ccall_p_t) call)->h->is_WAY_binding != 0) { /* * We do not allow WAY/WAY2 to adjust the reservation. */ call->n_resvs = ((rpc_dg_ccall_p_t) call)->h->is_WAY_binding; RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2, ("(rpc__dg_pkt_adjust_reservation) for WAY/WAY2 %u(%u) resvs\n", call->n_resvs, nreq)); return (call->n_resvs >= nreq); } /* * For now, we don't release the already holding reservations. * It should be done for KRPC. */ if (nreq <= call->n_resvs) { RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2, ("(rpc__dg_pkt_adjust_reservation) already has %u(%u) resvs\n", call->n_resvs, nreq)); return true; } /* * Find out how many packets we really need to reserve. * We must reserve nreq+1 packets. This additional packet is * reserved for the stub. However, we do not recored it in n_resvs. * If we already have some reserved packets, then we have the packet * for the stub already reserved. */ how_many = nreq - call->n_resvs; if (call->n_resvs == 0) how_many++; RPC_DG_PKT_POOL_LOCK(0); /* * It may be necessary to block the current call until a reservation can be * granted. Just in case, wrap the following in a loop so that we can * continue trying to acquire a reservation if necessary. */ while (st == rpc_s_ok) { /* * First handle the common case... */ if (RESERVATION_AVAILABLE(how_many)) { pool->reservations += how_many; call->n_resvs = nreq; got_it = true; RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2, ("(rpc__dg_pkt_adjust_reservation) available %u(%u), current reservations %u\n", call->n_resvs, nreq, pool->reservations)); break; } /* * Next, see if there are any server-only reservations available... * iff this is the server's initial reservation * (should be for RPC_C_DG_MUST_RECV_FRAG_SIZE). * * Note: pool->max_resv_pkt should never be less than how_many. */ if (call->n_resvs == 0 && SRV_RESERVATION_AVAILABLE(call) && pool->max_resv_pkt+1 >= (unsigned32)how_many) { RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas,2, ("(rpc__dg_pkt_adjust_reservation) using server-only reservation %u\n", pool->srv_resv_avail)); call->n_resvs = pool->max_resv_pkt; pool->srv_resv_avail--; got_it = true; RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2, ("(rpc__dg_pkt_adjust_reservation) available %u(%u), current reservations %u\n", call->n_resvs, nreq, pool->reservations)); break; } /* * It's not possible to grant a reservation at this time. If the caller * specified non-blocking operation, return now. */ if (!block) { RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2, ("(rpc__dg_pkt_adjust_reservation) not available %u(%u), not blocking\n", call->n_resvs, nreq)); RPC_DG_PKT_POOL_UNLOCK(0); return false; } RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2, ("(pkt_rpc__dg_pkt_adjust_reservation) blocking call %u(%u), pkts %u [%s]\n", call->n_resvs, nreq, pool->free_count + pool->pkt_count, rpc__dg_act_seq_string(&call->xq.hdr))); call->n_resvs_wait = how_many; /* * If the call is not already on the reservation waiters' chain, queue * it. Update the call's reference count. */ if (! call->is_in_pkt_chain) { if (pool->rsv_waiters_head == NULL) pool->rsv_waiters_head = call; else pool->rsv_waiters_tail->pkt_chain = call; pool->rsv_waiters_tail = call; call->pkt_chain = NULL; call->is_in_pkt_chain = true; printf("here 1\n"); RPC_DG_CALL_REFERENCE(call); } if (using_private_socket == false) RPC_UNLOCK(0); RPC_DG_PKT_POOL_UNLOCK(0); printf("here 2\n"); rpc__dg_call_wait(call, rpc_e_dg_wait_on_internal_event, &st); printf("here 3\n"); /* * Re-acquire all the locks, in the right order. */ if (using_private_socket == false) { RPC_DG_CALL_UNLOCK(call); RPC_LOCK(0); RPC_DG_CALL_LOCK(call); } RPC_DG_PKT_POOL_LOCK(0); RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2, ("(pkt_rpc__dg_pkt_adjust_reservation) call signalled, pkts %u [%s]\n", pool->free_count + pool->pkt_count, rpc__dg_act_seq_string(&call->xq.hdr))); } /* * Before returning, check to see if this call is still on the waiters' * queue. This could happen if 1) the call got signalled for some * other reason, but woke up to find that rationing was no longer * in effect, or 2) call_wait returned with an error status. */ if (call->is_in_pkt_chain) dequeue_pool_waiter(call, &pool->rsv_waiters_head, &pool->rsv_waiters_tail); if (got_it == true) UPDATE_RATIONING_STATE(0); RPC_DG_PKT_POOL_UNLOCK(0); /* * Update the original scall/ccall's reservation. * * What is the locking requirement? Can we assume that the original * scall/ccall is always locked when this is called? */ if (got_it == true && call->is_cbk == true) { if (RPC_DG_CALL_IS_CLIENT(call)) { ((rpc_dg_ccall_p_t) call)->cbk_scall->c.n_resvs = call->n_resvs; } else { ((rpc_dg_scall_p_t) call)->cbk_ccall->c.n_resvs = call->n_resvs; } RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2, ("(rpc__dg_pkt_adjust_reservation) for callback updated the original scall/ccall %u(%u) resvs\n", call->n_resvs, nreq)); } return got_it; } /* * R P C _ _ D G _ P K T _ C A N C E L _ R E S E R V A T I O N * * Cancel the reservation owned by this call. Consider waking up a blocked * call if the system is not rationing. * * The input call handle is expected to be locked. */ PRIVATE void rpc__dg_pkt_cancel_reservation ( rpc_dg_call_p_t call ) { rpc_dg_pkt_pool_t *pool = &rpc_g_dg_pkt_pool; RPC_DG_CALL_LOCK_ASSERT(call); /* * It's possible that the call handle does not have a reservation. * For instance, it might have been blocked waiting for a reservation * when a quit came in. In such a case, just return. */ if (call->n_resvs == 0) { RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2, ("(rpc__dg_pkt_cancel_reservation) had no reservation\n")); return; } /* * Callback handles and WAY/WAY2 handles inherit the reservation * made for the original scall/ccall. To cancel the reservation, * simply reset the handle's flag, but leave the actual reservation * count as it is. */ if (call->is_cbk == true || (RPC_DG_CALL_IS_CLIENT(call) && ((rpc_dg_ccall_p_t) call)->h->is_WAY_binding != 0)) { RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2, ("(rpc__dg_pkt_cancel_reservation) for callback handle\n")); call->n_resvs = 0; return; } /* * Otherwise, we'll really need to modify the pool. */ RPC_DG_PKT_POOL_LOCK(0); pool->reservations -= (call->n_resvs + 1); call->n_resvs = 0; /* * Giving up this reservation may have taken the system out of a rationing * state. If so, try to reclaim any pre-reserved server reservations that * we may have handed out. */ while (RESERVATION_AVAILABLE(pool->max_resv_pkt+1) && pool->srv_resv_avail < RPC_C_DG_PKT_INIT_SERVER_RESVS) { pool->reservations += (pool->max_resv_pkt + 1); pool->srv_resv_avail++; } RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2, ("(rpc__dg_pkt_cancel_reservation) %u reservations left\n", pool->reservations)); /* * Now that we've made another reservation/packets available to the system, see if * there are any calls blocked that can be woken up. */ if (pool->rsv_waiters_head != NULL || pool->pkt_waiters_head != NULL) { RPC_DBG_PRINTF(rpc_e_dbg_pkt_quotas, 2, ("(rpc__dg_pkt_cancel_reservation) calling list scanner\n")); scan_waiter_lists(call); } UPDATE_RATIONING_STATE(0); RPC_DG_PKT_POOL_UNLOCK(0); } /* * R P C _ M G M T _ S E T _ M A X _ C O N C U R R E N C Y * * This call allows an application (currently only DFS) to specify how many call * threads it must be able to run concurrently. The runtime uses this information * to make sure there are enough packets in the packet pool to satisfy the * requirements of the packet rationing algorithm. */ PUBLIC void rpc_mgmt_set_max_concurrency ( unsigned32 max_client_calls, unsigned32 max_server_calls, unsigned32 *status ) { unsigned32 new_max; /* * The packet rationing algorithm requires max_resv_pkt+1 packets * for each concurrent rpc thread, plus RPC_C_DG_MAX_NUM_PKTS_IN_FRAG * for the listener thread. */ new_max = (rpc_g_dg_pkt_pool.max_resv_pkt + 1) * (max_client_calls + max_server_calls) + RPC_C_DG_MAX_NUM_PKTS_IN_FRAG; RPC_DG_PKT_POOL_LOCK(0); *status = rpc_s_ok; /* * Only allow the packet pool to be expanded. */ if (new_max > rpc_g_dg_pkt_pool.max_pkt_count) { rpc_g_dg_pkt_pool.pkt_count += new_max - rpc_g_dg_pkt_pool.max_pkt_count; rpc_g_dg_pkt_pool.max_pkt_count = new_max; } RPC_DG_PKT_POOL_UNLOCK(0); } /* * R P C _ M G M T _ G E T _ M A X _ C O N C U R R E N C Y * * This call allows an application (currently only DFS) to inquire about the * maximum number of call threads that can be run concurrently. */ PUBLIC unsigned32 rpc_mgmt_get_max_concurrency(void) { unsigned32 temp; RPC_DG_PKT_POOL_LOCK(0); /* * The listener thread has the reservation for * RPC_C_DG_MAX_NUM_PKTS_IN_FRAG. */ temp = (rpc_g_dg_pkt_pool.max_pkt_count - RPC_C_DG_MAX_NUM_PKTS_IN_FRAG) / (rpc_g_dg_pkt_pool.max_resv_pkt + 1); RPC_DG_PKT_POOL_UNLOCK(0); return (temp); } /* * R P C _ _ D G _ P K T _ I S _ R A T I O N I N G * * Return the packet pool's rationing state. */ PRIVATE boolean32 rpc__dg_pkt_is_rationing ( boolean32 *low_on_pkts ) { boolean32 is_rationing; /* * We think that checking flags without the pkt pool locked is ok * because there is always a window in which the rationing state may * change. */ /* * First, determine if we are currently rationing packets. */ is_rationing = rpc_g_dg_pkt_pool.is_rationing; /* * See if the pool is getting low. Only * rpc__dg_call_xmit_fack()::dgcall.c uses this information to begin * backing off the sender. Avoid doing the computation if * is_rationing is already set; */ if (low_on_pkts != NULL) { *low_on_pkts = rpc_g_dg_pkt_pool.low_on_pkts; } return(is_rationing); }