Lines Matching refs:pool

71 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
73 static void svc_new_thread(SVCPOOL *pool);
75 static void svc_change_space_used(SVCPOOL *pool, int delta);
76 static bool_t svc_request_space_available(SVCPOOL *pool);
86 SVCPOOL *pool;
88 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
90 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
91 pool->sp_name = name;
92 pool->sp_state = SVCPOOL_INIT;
93 pool->sp_proc = NULL;
94 TAILQ_INIT(&pool->sp_xlist);
95 TAILQ_INIT(&pool->sp_active);
96 TAILQ_INIT(&pool->sp_callouts);
97 TAILQ_INIT(&pool->sp_lcallouts);
98 LIST_INIT(&pool->sp_threads);
99 LIST_INIT(&pool->sp_idlethreads);
100 pool->sp_minthreads = 1;
101 pool->sp_maxthreads = 1;
102 pool->sp_threadcount = 0;
108 pool->sp_space_high = nmbclusters * MCLBYTES / 4;
109 if (pool->sp_space_high > 45 << 20)
110 pool->sp_space_high = 45 << 20;
111 pool->sp_space_low = 2 * pool->sp_space_high / 3;
113 sysctl_ctx_init(&pool->sp_sysctl);
115 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
117 pool, 0, svcpool_minthread_sysctl, "I", "");
118 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
120 pool, 0, svcpool_maxthread_sysctl, "I", "");
121 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
122 "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");
124 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
126 &pool->sp_space_used, 0,
129 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
131 &pool->sp_space_used_highest, 0,
134 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
136 &pool->sp_space_high, 0,
139 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
141 &pool->sp_space_low, 0,
144 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
146 &pool->sp_space_throttled, 0,
149 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
151 &pool->sp_space_throttle_count, 0,
155 return pool;
159 svcpool_destroy(SVCPOOL *pool)
167 mtx_lock(&pool->sp_lock);
169 while (TAILQ_FIRST(&pool->sp_xlist)) {
170 xprt = TAILQ_FIRST(&pool->sp_xlist);
175 while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
176 mtx_unlock(&pool->sp_lock);
177 svc_unreg(pool, s->sc_prog, s->sc_vers);
178 mtx_lock(&pool->sp_lock);
180 while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
181 mtx_unlock(&pool->sp_lock);
182 svc_loss_unreg(pool, sl->slc_dispatch);
183 mtx_lock(&pool->sp_lock);
185 mtx_unlock(&pool->sp_lock);
191 mtx_destroy(&pool->sp_lock);
193 if (pool->sp_rcache)
194 replay_freecache(pool->sp_rcache);
196 sysctl_ctx_free(&pool->sp_sysctl);
197 free(pool, M_RPC);
201 svcpool_active(SVCPOOL *pool)
203 enum svcpool_state state = pool->sp_state;
211 * Sysctl handler to set the minimum thread count on a pool
216 SVCPOOL *pool;
219 pool = oidp->oid_arg1;
220 newminthreads = pool->sp_minthreads;
222 if (error == 0 && newminthreads != pool->sp_minthreads) {
223 if (newminthreads > pool->sp_maxthreads)
225 mtx_lock(&pool->sp_lock);
226 if (newminthreads > pool->sp_minthreads
227 && svcpool_active(pool)) {
229 * If the pool is running and we are
232 n = newminthreads - pool->sp_threadcount;
234 mtx_unlock(&pool->sp_lock);
236 svc_new_thread(pool);
237 mtx_lock(&pool->sp_lock);
240 pool->sp_minthreads = newminthreads;
241 mtx_unlock(&pool->sp_lock);
247 * Sysctl handler to set the maximum thread count on a pool
252 SVCPOOL *pool;
256 pool = oidp->oid_arg1;
257 newmaxthreads = pool->sp_maxthreads;
259 if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
260 if (newmaxthreads < pool->sp_minthreads)
262 mtx_lock(&pool->sp_lock);
263 if (newmaxthreads < pool->sp_maxthreads
264 && svcpool_active(pool)) {
266 * If the pool is running and we are
270 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
273 pool->sp_maxthreads = newmaxthreads;
274 mtx_unlock(&pool->sp_lock);
285 SVCPOOL *pool = xprt->xp_pool;
288 mtx_lock(&pool->sp_lock);
291 TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
292 mtx_unlock(&pool->sp_lock);
297 * release the transport - caller must do that after dropping the pool
303 SVCPOOL *pool = xprt->xp_pool;
305 mtx_assert(&pool->sp_lock, MA_OWNED);
309 TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
316 SVCPOOL *pool = xprt->xp_pool;
318 mtx_lock(&pool->sp_lock);
321 mtx_unlock(&pool->sp_lock);
325 mtx_unlock(&pool->sp_lock);
336 SVCPOOL *pool = xprt->xp_pool;
339 mtx_assert(&pool->sp_lock, MA_OWNED);
340 st = LIST_FIRST(&pool->sp_idlethreads);
358 if (pool->sp_state == SVCPOOL_ACTIVE
359 && pool->sp_lastcreatetime < time_uptime
360 && pool->sp_threadcount < pool->sp_maxthreads) {
361 pool->sp_state = SVCPOOL_THREADWANTED;
370 SVCPOOL *pool = xprt->xp_pool;
372 mtx_lock(&pool->sp_lock);
378 mtx_unlock(&pool->sp_lock);
385 if (!svc_request_space_available(pool) ||
387 TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
392 mtx_unlock(&pool->sp_lock);
398 SVCPOOL *pool = xprt->xp_pool;
400 mtx_assert(&pool->sp_lock, MA_OWNED);
403 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
411 SVCPOOL *pool = xprt->xp_pool;
413 mtx_lock(&pool->sp_lock);
415 mtx_unlock(&pool->sp_lock);
441 SVCPOOL *pool = xprt->xp_pool;
459 mtx_lock(&pool->sp_lock);
460 if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
465 mtx_unlock(&pool->sp_lock);
472 mtx_unlock(&pool->sp_lock);
480 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
486 mtx_unlock(&pool->sp_lock);
505 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
511 mtx_lock(&pool->sp_lock);
512 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
513 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
518 mtx_unlock(&pool->sp_lock);
523 * The dispatch routine will be called when some port in ths pool die.
528 SVCPOOL *pool = xprt->xp_pool;
531 mtx_lock(&pool->sp_lock);
532 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
537 mtx_unlock(&pool->sp_lock);
542 mtx_unlock(&pool->sp_lock);
546 TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
547 mtx_unlock(&pool->sp_lock);
555 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
559 mtx_lock(&pool->sp_lock);
560 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
562 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
567 mtx_unlock(&pool->sp_lock);
577 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
581 mtx_assert(&pool->sp_lock, MA_OWNED);
582 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
860 SVCPOOL *pool = xprt->xp_pool;
882 if (pool->sp_rcache) {
886 rs = replay_find(pool->sp_rcache, &msg,
943 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
955 SVCPOOL *pool = xprt->xp_pool;
965 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
997 svc_checkidle(SVCPOOL *pool)
1004 TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
1019 mtx_unlock(&pool->sp_lock);
1023 mtx_lock(&pool->sp_lock);
1028 svc_assign_waiting_sockets(SVCPOOL *pool)
1032 mtx_lock(&pool->sp_lock);
1033 while ((xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
1035 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
1039 mtx_unlock(&pool->sp_lock);
1043 svc_change_space_used(SVCPOOL *pool, int delta)
1047 value = atomic_fetchadd_int(&pool->sp_space_used, delta) + delta;
1049 if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1050 pool->sp_space_throttled = TRUE;
1051 pool->sp_space_throttle_count++;
1053 if (value > pool->sp_space_used_highest)
1054 pool->sp_space_used_highest = value;
1056 if (value < pool->sp_space_low && pool->sp_space_throttled) {
1057 pool->sp_space_throttled = FALSE;
1058 svc_assign_waiting_sockets(pool);
1064 svc_request_space_available(SVCPOOL *pool)
1067 if (pool->sp_space_throttled)
1073 svc_run_internal(SVCPOOL *pool, bool_t ismaster)
1084 st->st_pool = pool;
1090 mtx_lock(&pool->sp_lock);
1091 LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
1097 if (pool->sp_state == SVCPOOL_THREADSTARTING)
1098 pool->sp_state = SVCPOOL_ACTIVE;
1100 while (pool->sp_state != SVCPOOL_CLOSING) {
1104 if (pool->sp_state == SVCPOOL_THREADWANTED) {
1105 pool->sp_state = SVCPOOL_THREADSTARTING;
1106 pool->sp_lastcreatetime = time_uptime;
1107 mtx_unlock(&pool->sp_lock);
1108 svc_new_thread(pool);
1109 mtx_lock(&pool->sp_lock);
1116 if (time_uptime > pool->sp_lastidlecheck) {
1117 pool->sp_lastidlecheck = time_uptime;
1118 svc_checkidle(pool);
1126 if (pool->sp_threadcount > pool->sp_maxthreads)
1134 if (svc_request_space_available(pool) &&
1135 (xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
1136 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
1143 LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
1146 pool->sp_threadcount > pool->sp_minthreads))
1148 &pool->sp_lock, 5 * hz);
1151 &pool->sp_lock);
1162 && (pool->sp_threadcount
1163 > pool->sp_minthreads)
1168 mtx_unlock(&pool->sp_lock);
1169 svc_exit(pool);
1170 mtx_lock(&pool->sp_lock);
1183 if (!svc_request_space_available(pool))
1185 mtx_unlock(&pool->sp_lock);
1189 svc_change_space_used(pool, rqstp->rq_size);
1196 if (pool->sp_assign)
1197 stpref = pool->sp_assign(st,
1200 mtx_lock(&pool->sp_lock);
1225 mtx_lock(&pool->sp_lock);
1227 && pool->sp_state != SVCPOOL_CLOSING);
1240 if (!svc_request_space_available(pool) ||
1242 TAILQ_INSERT_TAIL(&pool->sp_active,
1246 mtx_unlock(&pool->sp_lock);
1250 mtx_unlock(&pool->sp_lock);
1262 svc_change_space_used(pool, -sz);
1263 mtx_lock(&pool->sp_lock);
1274 pool->sp_threadcount--;
1276 mtx_unlock(&pool->sp_lock);
1282 wakeup(pool);
1294 svc_new_thread(SVCPOOL *pool)
1298 pool->sp_threadcount++;
1299 kthread_add(svc_thread_start, pool,
1300 pool->sp_proc, &td, 0, 0,
1301 "%s: service", pool->sp_name);
1305 svc_run(SVCPOOL *pool)
1314 "%s: master", pool->sp_name);
1315 pool->sp_state = SVCPOOL_ACTIVE;
1316 pool->sp_proc = p;
1317 pool->sp_lastcreatetime = time_uptime;
1318 pool->sp_threadcount = 1;
1320 for (i = 1; i < pool->sp_minthreads; i++) {
1321 svc_new_thread(pool);
1324 svc_run_internal(pool, TRUE);
1326 mtx_lock(&pool->sp_lock);
1327 while (pool->sp_threadcount > 0)
1328 msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
1329 mtx_unlock(&pool->sp_lock);
1333 svc_exit(SVCPOOL *pool)
1337 mtx_lock(&pool->sp_lock);
1339 if (pool->sp_state != SVCPOOL_CLOSING) {
1340 pool->sp_state = SVCPOOL_CLOSING;
1341 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
1345 mtx_unlock(&pool->sp_lock);
1383 SVCPOOL *pool;
1387 pool = st->st_pool;
1388 if (pool->sp_done)
1389 pool->sp_done(st, rqstp);