Lines Matching refs:pool

72 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
76 static void svc_change_space_used(SVCPOOL *pool, long delta);
77 static bool_t svc_request_space_available(SVCPOOL *pool);
88 SVCPOOL *pool;
92 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
94 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
95 pool->sp_name = name;
96 pool->sp_state = SVCPOOL_INIT;
97 pool->sp_proc = NULL;
98 TAILQ_INIT(&pool->sp_callouts);
99 TAILQ_INIT(&pool->sp_lcallouts);
100 pool->sp_minthreads = 1;
101 pool->sp_maxthreads = 1;
102 pool->sp_groupcount = 1;
104 grp = &pool->sp_groups[g];
106 grp->sg_pool = pool;
122 pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
123 pool->sp_space_low = (pool->sp_space_high / 3) * 2;
125 sysctl_ctx_init(&pool->sp_sysctl);
127 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
129 pool, 0, svcpool_minthread_sysctl, "I",
131 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
133 pool, 0, svcpool_maxthread_sysctl, "I",
135 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
137 pool, 0, svcpool_threads_sysctl, "I",
139 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
140 "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
143 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
145 &pool->sp_space_used,
148 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
150 &pool->sp_space_used_highest,
153 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
155 &pool->sp_space_high,
158 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
160 &pool->sp_space_low,
163 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
165 &pool->sp_space_throttled, 0,
168 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
170 &pool->sp_space_throttle_count, 0,
174 return pool;
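
The fragments above are the pool constructor: the pool is allocated zeroed, its lock and callout lists are initialized, sysctl knobs are attached, and the space watermarks are derived from the mbuf cluster pool, with the low mark at two thirds of the high one. A minimal userspace sketch of that watermark setup, with hypothetical names and calloc standing in for the zeroed kernel allocation:

    #include <stdlib.h>

    /*
     * Sketch of the constructor's watermark math (hypothetical names;
     * calloc stands in for malloc(9) with M_ZERO).  The low mark is
     * two thirds of the high one, which gives the throttle hysteresis.
     */
    struct pool_sketch {
        unsigned long space_high;   /* start throttling above this */
        unsigned long space_low;    /* stop throttling below this */
    };

    static struct pool_sketch *
    pool_create(unsigned long nclusters, unsigned long clustersize)
    {
        struct pool_sketch *p = calloc(1, sizeof(*p));

        if (p == NULL)
            return NULL;
        p->space_high = nclusters * clustersize / 4;
        p->space_low = (p->space_high / 3) * 2;
        return p;
    }

Keeping the low mark well under the high one matters later: it is what prevents the throttle in svc_change_space_used (further down) from flapping.
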
178 svcpool_destroy(SVCPOOL *pool)
190 grp = &pool->sp_groups[g];
202 mtx_lock(&pool->sp_lock);
203 while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
204 mtx_unlock(&pool->sp_lock);
205 svc_unreg(pool, s->sc_prog, s->sc_vers);
206 mtx_lock(&pool->sp_lock);
208 while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
209 mtx_unlock(&pool->sp_lock);
210 svc_loss_unreg(pool, sl->slc_dispatch);
211 mtx_lock(&pool->sp_lock);
213 mtx_unlock(&pool->sp_lock);
216 grp = &pool->sp_groups[g];
219 mtx_destroy(&pool->sp_lock);
221 if (pool->sp_rcache)
222 replay_freecache(pool->sp_rcache);
224 sysctl_ctx_free(&pool->sp_sysctl);
225 free(pool, M_RPC);
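
svcpool_destroy drains both callout lists with a peek, unlock, teardown, relock loop, since svc_unreg and svc_loss_unreg take the pool lock themselves. The pattern isolated as a userspace sketch (pthreads and <sys/queue.h>; the types are hypothetical):

    #include <pthread.h>
    #include <sys/queue.h>

    struct callout {
        TAILQ_ENTRY(callout) link;
    };
    TAILQ_HEAD(callout_list, callout);

    /*
     * Drain pattern from svcpool_destroy: peek at the head under the
     * lock, drop the lock to run the teardown (which locks on its
     * own and removes the entry), then re-take the lock and re-check.
     */
    static void
    drain(pthread_mutex_t *mtx, struct callout_list *list,
        void (*teardown)(struct callout *))
    {
        struct callout *c;

        pthread_mutex_lock(mtx);
        while ((c = TAILQ_FIRST(list)) != NULL) {
            pthread_mutex_unlock(mtx);
            teardown(c);        /* must remove c from the list */
            pthread_mutex_lock(mtx);
        }
        pthread_mutex_unlock(mtx);
    }
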
229 * Sysctl handler to get the present thread count on a pool
234 SVCPOOL *pool;
237 pool = oidp->oid_arg1;
239 mtx_lock(&pool->sp_lock);
240 for (g = 0; g < pool->sp_groupcount; g++)
241 threads += pool->sp_groups[g].sg_threadcount;
242 mtx_unlock(&pool->sp_lock);
248 * Sysctl handler to set the minimum thread count on a pool
253 SVCPOOL *pool;
256 pool = oidp->oid_arg1;
257 newminthreads = pool->sp_minthreads;
259 if (error == 0 && newminthreads != pool->sp_minthreads) {
260 if (newminthreads > pool->sp_maxthreads)
262 mtx_lock(&pool->sp_lock);
263 pool->sp_minthreads = newminthreads;
264 for (g = 0; g < pool->sp_groupcount; g++) {
265 pool->sp_groups[g].sg_minthreads = max(1,
266 pool->sp_minthreads / pool->sp_groupcount);
268 mtx_unlock(&pool->sp_lock);
274 * Sysctl handler to set the maximum thread count on a pool
279 SVCPOOL *pool;
282 pool = oidp->oid_arg1;
283 newmaxthreads = pool->sp_maxthreads;
285 if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
286 if (newmaxthreads < pool->sp_minthreads)
288 mtx_lock(&pool->sp_lock);
289 pool->sp_maxthreads = newmaxthreads;
290 for (g = 0; g < pool->sp_groupcount; g++) {
291 pool->sp_groups[g].sg_maxthreads = max(1,
292 pool->sp_maxthreads / pool->sp_groupcount);
294 mtx_unlock(&pool->sp_lock);
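
The two thread-limit handlers share one shape: reject a new minimum above the current maximum (or a maximum below the minimum), then split the pool-wide value evenly across groups with a floor of one thread per group. A userspace analogue of the minimum-threads setter, assuming a hypothetical tpool struct and returning -1 where the handler would return EINVAL:

    #include <pthread.h>

    struct tpool {
        pthread_mutex_t lock;   /* assumed initialized elsewhere */
        int minthreads, maxthreads, groupcount;
        int group_min[8];       /* sketch assumes groupcount <= 8 */
    };

    static int
    tpool_set_minthreads(struct tpool *p, int newmin)
    {
        int g, share;

        if (newmin > p->maxthreads)
            return -1;          /* the sysctl handler rejects this */
        pthread_mutex_lock(&p->lock);
        p->minthreads = newmin;
        share = newmin / p->groupcount;
        if (share < 1)
            share = 1;          /* max(1, minthreads / groupcount) */
        for (g = 0; g < p->groupcount; g++)
            p->group_min[g] = share;
        pthread_mutex_unlock(&p->lock);
        return 0;
    }
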
305 SVCPOOL *pool = xprt->xp_pool;
310 g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
311 xprt->xp_group = grp = &pool->sp_groups[g];
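
xprt_register picks a group with a single atomic increment instead of taking the pool lock; only the counter's value modulo the group count matters, so wraparound of the counter is harmless. The same idea in portable C11:

    #include <stdatomic.h>

    /* Lock-free round-robin group selection, as in the fragment above. */
    static int
    next_group(atomic_uint *nextgroup, unsigned groupcount)
    {
        return (int)(atomic_fetch_add_explicit(nextgroup, 1,
            memory_order_relaxed) % groupcount);
    }
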
321 * release the transport - caller must do that after dropping the pool lock.
464 SVCPOOL *pool = xprt->xp_pool;
482 mtx_lock(&pool->sp_lock);
483 if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
488 mtx_unlock(&pool->sp_lock);
495 mtx_unlock(&pool->sp_lock);
503 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
509 mtx_unlock(&pool->sp_lock);
528 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
534 mtx_lock(&pool->sp_lock);
535 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
536 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
541 mtx_unlock(&pool->sp_lock);
546 * The dispatch routine will be called when some port in this pool dies.
551 SVCPOOL *pool = xprt->xp_pool;
554 mtx_lock(&pool->sp_lock);
555 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
560 mtx_unlock(&pool->sp_lock);
565 mtx_unlock(&pool->sp_lock);
569 TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
570 mtx_unlock(&pool->sp_lock);
578 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
582 mtx_lock(&pool->sp_lock);
583 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
585 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
590 mtx_unlock(&pool->sp_lock);
600 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
604 mtx_assert(&pool->sp_lock, MA_OWNED);
605 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
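
svc_find is a plain linear walk of sp_callouts, and the mtx_assert documents that the caller must already hold the pool lock. Its shape, simplified to program/version matching (the real lookup also compares netid, and uses the rpcprog_t/rpcvers_t types):

    #include <stddef.h>
    #include <sys/queue.h>

    struct callout {
        TAILQ_ENTRY(callout) link;
        unsigned long prog, vers;
    };
    TAILQ_HEAD(callout_list, callout);

    /* Caller must hold the list's lock, as svc_find asserts. */
    static struct callout *
    find_callout(struct callout_list *list, unsigned long prog,
        unsigned long vers)
    {
        struct callout *c;

        TAILQ_FOREACH(c, list, link) {
            if (c->prog == prog && c->vers == vers)
                return c;
        }
        return NULL;
    }
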
878 SVCPOOL *pool = xprt->xp_pool;
900 if (pool->sp_rcache) {
904 rs = replay_find(pool->sp_rcache, &msg,
961 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
973 SVCPOOL *pool = xprt->xp_pool;
983 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1045 svc_assign_waiting_sockets(SVCPOOL *pool)
1051 for (g = 0; g < pool->sp_groupcount; g++) {
1052 grp = &pool->sp_groups[g];
1065 svc_change_space_used(SVCPOOL *pool, long delta)
1069 value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1071 if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1072 pool->sp_space_throttled = TRUE;
1073 pool->sp_space_throttle_count++;
1075 if (value > pool->sp_space_used_highest)
1076 pool->sp_space_used_highest = value;
1078 if (value < pool->sp_space_low && pool->sp_space_throttled) {
1079 pool->sp_space_throttled = FALSE;
1080 svc_assign_waiting_sockets(pool);
1086 svc_request_space_available(SVCPOOL *pool)
1089 if (pool->sp_space_throttled)
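
svc_change_space_used implements a two-watermark throttle: usage crossing sp_space_high engages the throttle, and only falling below the lower sp_space_low releases it, so the state cannot flap around a single threshold; svc_request_space_available then reduces to reading the flag. A userspace sketch with hypothetical names (the flag updates are left unsynchronized here, mirroring the fragments above):

    #include <stdatomic.h>
    #include <stdbool.h>

    struct space {
        atomic_long used;
        long high, low;             /* low < high */
        bool throttled;
        unsigned long throttle_count;  /* times the throttle engaged */
    };

    static void
    space_change(struct space *s, long delta)
    {
        long value = atomic_fetch_add(&s->used, delta) + delta;

        if (value >= s->high && !s->throttled) {
            s->throttled = true;
            s->throttle_count++;
        }
        if (value < s->low && s->throttled) {
            s->throttled = false;
            /* the kernel re-queues waiting sockets here */
        }
    }
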
1097 SVCPOOL *pool = grp->sg_pool;
1108 st->st_pool = pool;
1156 if (svc_request_space_available(pool) &&
1198 svc_exit(pool);
1212 if (!svc_request_space_available(pool))
1217 svc_change_space_used(pool, rqstp->rq_size);
1222 if (pool->sp_assign) {
1223 stpref = pool->sp_assign(st, rqstp);
1249 if (!svc_request_space_available(pool) ||
1266 svc_change_space_used(pool, -sz);
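
Inside the worker loop the two svc_change_space_used calls pair up: a request is charged its rq_size when taken off the transport and credited back once the reply is sent, so sp_space_used tracks bytes in flight. A usage sketch built on the space_change example above (the handler is hypothetical):

    /* Charge on receipt, credit on completion, as in the worker loop. */
    static void
    handle_request(struct space *s, long rq_size)
    {
        space_change(s, rq_size);   /* request taken off the wire */
        /* ... dispatch the request and send the reply ... */
        space_change(s, -rq_size);  /* reply sent, space released */
    }
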
1300 SVCPOOL *pool = grp->sg_pool;
1306 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1307 "%s: service", pool->sp_name);
1311 svc_run(SVCPOOL *pool)
1321 "%s: master", pool->sp_name);
1322 pool->sp_state = SVCPOOL_ACTIVE;
1323 pool->sp_proc = p;
1326 pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1327 min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1328 for (g = 0; g < pool->sp_groupcount; g++) {
1329 grp = &pool->sp_groups[g];
1331 pool->sp_minthreads / pool->sp_groupcount);
1333 pool->sp_maxthreads / pool->sp_groupcount);
1338 pool->sp_groups[0].sg_threadcount++;
1339 for (g = 0; g < pool->sp_groupcount; g++) {
1340 grp = &pool->sp_groups[g];
1344 svc_run_internal(&pool->sp_groups[0], TRUE);
1347 for (g = 0; g < pool->sp_groupcount; g++) {
1348 grp = &pool->sp_groups[g];
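
Before starting threads, svc_run sizes the group array: at most one group per six CPUs, or per twelve worker threads, whichever is smaller, capped by SVC_MAXGROUPS and floored at one. The heuristic isolated, with an assumed value for the cap:

    /* Assumed cap for the sketch; the real SVC_MAXGROUPS is in the RPC headers. */
    #define SKETCH_MAXGROUPS 16

    static int
    imin(int a, int b)
    {
        return a < b ? a : b;
    }

    /* max(1, min(cap, min(maxthreads / 2, ncpus) / 6)) from svc_run. */
    static int
    group_count(int maxthreads, int ncpus)
    {
        int g = imin(SKETCH_MAXGROUPS, imin(maxthreads / 2, ncpus) / 6);

        return g > 1 ? g : 1;
    }
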
1357 svc_exit(SVCPOOL *pool)
1363 pool->sp_state = SVCPOOL_CLOSING;
1364 for (g = 0; g < pool->sp_groupcount; g++) {
1365 grp = &pool->sp_groups[g];
1411 SVCPOOL *pool;
1415 pool = st->st_pool;
1416 if (pool->sp_done)
1417 pool->sp_done(st, rqstp);
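
svc_done ends by calling an optional pool-level completion hook; the NULL check on sp_done makes it strictly opt-in. The shape of that pattern, with hypothetical stand-in types:

    struct thread_ctx;              /* hypothetical stand-ins */
    struct request;

    struct pool_hooks {
        /* completion hook; may be NULL when no one registered it */
        void (*done)(struct thread_ctx *, struct request *);
    };

    static void
    request_done(struct pool_hooks *h, struct thread_ctx *t,
        struct request *r)
    {
        if (h->done != NULL)
            h->done(t, r);
    }
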