/*
 * linux/net/sunrpc/svc_xprt.c
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sched.h>
#include <linux/smp_lock.h>
#include <linux/errno.h>
#include <linux/freezer.h>
#include <linux/kthread.h>
#include <linux/slab.h>
#include <net/sock.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/svcsock.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
static int svc_deferred_recv(struct svc_rqst *rqstp);
static struct cache_deferred_req *svc_defer(struct cache_req *req);
static void svc_age_temp_xprts(unsigned long closure);

/* Apparently the "standard" is that clients close
 * idle connections after 5 minutes, servers after
 * 6 minutes:
 *   http://www.connectathon.org/talks96/nfstcp.pdf
 */
static int svc_conn_age_period = 6*60;

/* List of registered transport classes */
static DEFINE_SPINLOCK(svc_xprt_class_lock);
static LIST_HEAD(svc_xprt_class_list);

/* SMP locking strategy:
 *
 *	svc_pool->sp_lock protects most of the fields of that pool.
 *	svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
 *	When both need to be taken (rare), svc_serv->sv_lock is taken first.
 *	The BKL protects svc_serv->sv_nrthread.
 *	svc_sock->sk_lock protects the svc_sock->sk_deferred list
 *	and the ->sk_info_authunix cache.
 *
 *	The XPT_BUSY bit in xprt->xpt_flags prevents a transport being
 *	enqueued more than once. During normal transport processing this
 *	bit is set by svc_xprt_enqueue and cleared by svc_xprt_received.
 *	Providers should not manipulate this bit directly.
 *
 *	Some flags can be set at any time, provided that certain rules
 *	are followed:
 *
 *	XPT_CONN, XPT_DATA:
 *		- Can be set or cleared at any time.
 *		- After a set, svc_xprt_enqueue must be called to enqueue
 *		  the transport for processing.
 *		- After a clear, the transport must be read/accepted.
 *		  If this succeeds, it must be set again.
 *	XPT_CLOSE:
 *		- Can be set at any time. It is never cleared.
 *	XPT_DEAD:
 *		- Can only be set while XPT_BUSY is held, which ensures
 *		  that no other thread will be using the transport or will
 *		  try to set XPT_DEAD.
 */
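/*
 * A minimal sketch of the flag protocol above, as a transport provider
 * might follow it from its data-ready callback. The callback name is
 * hypothetical; set_bit(), XPT_DATA and svc_xprt_enqueue() are the real
 * interfaces used throughout this file.
 *
 *	static void example_xpo_data_ready(struct svc_xprt *xprt)
 *	{
 *		// New data arrived: mark it and ask for processing.
 *		set_bit(XPT_DATA, &xprt->xpt_flags);
 *		svc_xprt_enqueue(xprt);
 *	}
 */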
int svc_reg_xprt_class(struct svc_xprt_class *xcl)
{
	struct svc_xprt_class *cl;
	int res = -EEXIST;

	dprintk("svc: Adding svc transport class '%s'\n", xcl->xcl_name);

	INIT_LIST_HEAD(&xcl->xcl_list);
	spin_lock(&svc_xprt_class_lock);
	/* Make sure there isn't already a class with the same name */
	list_for_each_entry(cl, &svc_xprt_class_list, xcl_list) {
		if (strcmp(xcl->xcl_name, cl->xcl_name) == 0)
			goto out;
	}
	list_add_tail(&xcl->xcl_list, &svc_xprt_class_list);
	res = 0;
out:
	spin_unlock(&svc_xprt_class_lock);
	return res;
}
EXPORT_SYMBOL_GPL(svc_reg_xprt_class);

void svc_unreg_xprt_class(struct svc_xprt_class *xcl)
{
	dprintk("svc: Removing svc transport class '%s'\n", xcl->xcl_name);
	spin_lock(&svc_xprt_class_lock);
	list_del_init(&xcl->xcl_list);
	spin_unlock(&svc_xprt_class_lock);
}
EXPORT_SYMBOL_GPL(svc_unreg_xprt_class);
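/*
 * A minimal registration sketch, assuming a hypothetical "example"
 * transport module: the class is registered at module load and
 * unregistered at unload. The xcl_* fields are the ones this file uses;
 * the ops structure and values shown are placeholders.
 *
 *	static struct svc_xprt_class example_xprt_class = {
 *		.xcl_name	 = "example",
 *		.xcl_owner	 = THIS_MODULE,
 *		.xcl_ops	 = &example_xprt_ops,
 *		.xcl_max_payload = 4096,
 *	};
 *
 *	static int __init example_init(void)
 *	{
 *		return svc_reg_xprt_class(&example_xprt_class);
 *	}
 *
 *	static void __exit example_exit(void)
 *	{
 *		svc_unreg_xprt_class(&example_xprt_class);
 *	}
 */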
/*
 * Format the transport list for printing
 */
int svc_print_xprts(char *buf, int maxlen)
{
	struct list_head *le;
	char tmpstr[80];
	int len = 0;
	buf[0] = '\0';

	spin_lock(&svc_xprt_class_lock);
	list_for_each(le, &svc_xprt_class_list) {
		int slen;
		struct svc_xprt_class *xcl =
			list_entry(le, struct svc_xprt_class, xcl_list);

		sprintf(tmpstr, "%s %d\n", xcl->xcl_name, xcl->xcl_max_payload);
		slen = strlen(tmpstr);
		if (len + slen > maxlen)
			break;
		len += slen;
		strcat(buf, tmpstr);
	}
	spin_unlock(&svc_xprt_class_lock);

	return len;
}

static void svc_xprt_free(struct kref *kref)
{
	struct svc_xprt *xprt =
		container_of(kref, struct svc_xprt, xpt_ref);
	struct module *owner = xprt->xpt_class->xcl_owner;
	if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags) &&
	    xprt->xpt_auth_cache != NULL)
		svcauth_unix_info_release(xprt->xpt_auth_cache);
	xprt->xpt_ops->xpo_free(xprt);
	module_put(owner);
}

void svc_xprt_put(struct svc_xprt *xprt)
{
	kref_put(&xprt->xpt_ref, svc_xprt_free);
}
EXPORT_SYMBOL_GPL(svc_xprt_put);

/*
 * Called by transport drivers to initialize the transport-independent
 * portion of the transport instance.
 */
void svc_xprt_init(struct svc_xprt_class *xcl, struct svc_xprt *xprt,
		   struct svc_serv *serv)
{
	memset(xprt, 0, sizeof(*xprt));
	xprt->xpt_class = xcl;
	xprt->xpt_ops = xcl->xcl_ops;
	kref_init(&xprt->xpt_ref);
	xprt->xpt_server = serv;
	INIT_LIST_HEAD(&xprt->xpt_list);
	INIT_LIST_HEAD(&xprt->xpt_ready);
	INIT_LIST_HEAD(&xprt->xpt_deferred);
	mutex_init(&xprt->xpt_mutex);
	spin_lock_init(&xprt->xpt_lock);
	set_bit(XPT_BUSY, &xprt->xpt_flags);
	rpc_init_wait_queue(&xprt->xpt_bc_pending, "xpt_bc_pending");
}
EXPORT_SYMBOL_GPL(svc_xprt_init);

static struct svc_xprt *__svc_xpo_create(struct svc_xprt_class *xcl,
					 struct svc_serv *serv,
					 const int family,
					 const unsigned short port,
					 int flags)
{
	struct sockaddr_in sin = {
		.sin_family		= AF_INET,
		.sin_addr.s_addr	= htonl(INADDR_ANY),
		.sin_port		= htons(port),
	};
#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
	struct sockaddr_in6 sin6 = {
		.sin6_family		= AF_INET6,
		.sin6_addr		= IN6ADDR_ANY_INIT,
		.sin6_port		= htons(port),
	};
#endif	/* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */
	struct sockaddr *sap;
	size_t len;

	switch (family) {
	case PF_INET:
		sap = (struct sockaddr *)&sin;
		len = sizeof(sin);
		break;
#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
	case PF_INET6:
		sap = (struct sockaddr *)&sin6;
		len = sizeof(sin6);
		break;
#endif	/* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */
	default:
		return ERR_PTR(-EAFNOSUPPORT);
	}

	return xcl->xcl_ops->xpo_create(serv, sap, len, flags);
}

int svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
		    const int family, const unsigned short port,
		    int flags)
{
	struct svc_xprt_class *xcl;

	dprintk("svc: creating transport %s[%d]\n", xprt_name, port);
	spin_lock(&svc_xprt_class_lock);
	list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
		struct svc_xprt *newxprt;
		unsigned short newport;

		if (strcmp(xprt_name, xcl->xcl_name))
			continue;

		if (!try_module_get(xcl->xcl_owner))
			goto err;

		spin_unlock(&svc_xprt_class_lock);
		newxprt = __svc_xpo_create(xcl, serv, family, port, flags);
		if (IS_ERR(newxprt)) {
			module_put(xcl->xcl_owner);
			return PTR_ERR(newxprt);
		}

		clear_bit(XPT_TEMP, &newxprt->xpt_flags);
		spin_lock_bh(&serv->sv_lock);
		list_add(&newxprt->xpt_list, &serv->sv_permsocks);
		spin_unlock_bh(&serv->sv_lock);
		newport = svc_xprt_local_port(newxprt);
		clear_bit(XPT_BUSY, &newxprt->xpt_flags);
		return newport;
	}
 err:
	spin_unlock(&svc_xprt_class_lock);
	dprintk("svc: transport %s not found\n", xprt_name);

	/* This errno is exposed to user space.  Provide a reasonable
	 * perror msg for a bad transport. */
	return -EPROTONOSUPPORT;
}
EXPORT_SYMBOL_GPL(svc_create_xprt);
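/*
 * A usage sketch, assuming an nfsd-style caller: create a TCP listener
 * on a well-known port. The serv pointer, port number and flags shown
 * are illustrative; on success the bound port number is returned.
 *
 *	err = svc_create_xprt(serv, "tcp", PF_INET, 2049, SVC_SOCK_DEFAULTS);
 *	if (err < 0)
 *		goto out_error;
 */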
/*
 * Copy the local and remote xprt addresses to the rqstp structure
 */
void svc_xprt_copy_addrs(struct svc_rqst *rqstp, struct svc_xprt *xprt)
{
	struct sockaddr *sin;

	memcpy(&rqstp->rq_addr, &xprt->xpt_remote, xprt->xpt_remotelen);
	rqstp->rq_addrlen = xprt->xpt_remotelen;

	/*
	 * Destination address in request is needed for binding the
	 * source address in RPC replies/callbacks later.
	 */
	sin = (struct sockaddr *)&xprt->xpt_local;
	switch (sin->sa_family) {
	case AF_INET:
		rqstp->rq_daddr.addr = ((struct sockaddr_in *)sin)->sin_addr;
		break;
	case AF_INET6:
		rqstp->rq_daddr.addr6 = ((struct sockaddr_in6 *)sin)->sin6_addr;
		break;
	}
}
EXPORT_SYMBOL_GPL(svc_xprt_copy_addrs);

/**
 * svc_print_addr - Format rq_addr field for printing
 * @rqstp: svc_rqst struct containing address to print
 * @buf: target buffer for formatted address
 * @len: length of target buffer
 */
char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
{
	return __svc_print_addr(svc_addr(rqstp), buf, len);
}
EXPORT_SYMBOL_GPL(svc_print_addr);
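/*
 * A usage sketch: callers typically format the peer address into a
 * stack buffer for log messages. RPC_MAX_ADDRBUFLEN is the buffer size
 * conventionally used with this helper.
 *
 *	char buf[RPC_MAX_ADDRBUFLEN];
 *
 *	dprintk("svc: request from %s\n",
 *		svc_print_addr(rqstp, buf, sizeof(buf)));
 */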
/*
 * Queue up an idle server thread.  Must have pool->sp_lock held.
 * Note: this is really a stack rather than a queue, so that we only
 * use as many different threads as we need, and the rest don't pollute
 * the cache.
 */
static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_add(&rqstp->rq_list, &pool->sp_threads);
}

/*
 * Dequeue an nfsd thread.  Must have pool->sp_lock held.
 */
static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_del(&rqstp->rq_list);
}

/*
 * Queue up a transport with data pending. If there are idle nfsd
 * processes, wake 'em up.
 */
void svc_xprt_enqueue(struct svc_xprt *xprt)
{
	struct svc_serv *serv = xprt->xpt_server;
	struct svc_pool *pool;
	struct svc_rqst *rqstp;
	int cpu;

	if (!(xprt->xpt_flags &
	      ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED))))
		return;

	cpu = get_cpu();
	pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
	put_cpu();

	spin_lock_bh(&pool->sp_lock);

	if (!list_empty(&pool->sp_threads) &&
	    !list_empty(&pool->sp_sockets))
		printk(KERN_ERR
		       "svc_xprt_enqueue: "
		       "threads and transports both waiting??\n");

	if (test_bit(XPT_DEAD, &xprt->xpt_flags)) {
		/* Don't enqueue dead transports */
		dprintk("svc: transport %p is dead, not enqueued\n", xprt);
		goto out_unlock;
	}

	pool->sp_stats.packets++;

	/* Mark transport as busy. It will remain in this state until
	 * the provider calls svc_xprt_received. We update XPT_BUSY
	 * atomically because it also guards against trying to enqueue
	 * the transport twice.
	 */
	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) {
		/* Don't enqueue transport while already enqueued */
		dprintk("svc: transport %p busy, not enqueued\n", xprt);
		goto out_unlock;
	}
	BUG_ON(xprt->xpt_pool != NULL);
	xprt->xpt_pool = pool;

	/* Handle pending connection */
	if (test_bit(XPT_CONN, &xprt->xpt_flags))
		goto process;

	/* Handle close in-progress */
	if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
		goto process;

	/* Check if we have space to reply to a request */
	if (!xprt->xpt_ops->xpo_has_wspace(xprt)) {
		/* Don't enqueue while not enough space for reply */
		dprintk("svc: no write space, transport %p not enqueued\n",
			xprt);
		xprt->xpt_pool = NULL;
		clear_bit(XPT_BUSY, &xprt->xpt_flags);
		goto out_unlock;
	}

 process:
	if (!list_empty(&pool->sp_threads)) {
		rqstp = list_entry(pool->sp_threads.next,
				   struct svc_rqst,
				   rq_list);
		dprintk("svc: transport %p served by daemon %p\n",
			xprt, rqstp);
		svc_thread_dequeue(pool, rqstp);
		if (rqstp->rq_xprt)
			printk(KERN_ERR
			       "svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
			       rqstp, rqstp->rq_xprt);
		rqstp->rq_xprt = xprt;
		svc_xprt_get(xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
		pool->sp_stats.threads_woken++;
		BUG_ON(xprt->xpt_pool != pool);
		wake_up(&rqstp->rq_wait);
	} else {
		dprintk("svc: transport %p put into queue\n", xprt);
		list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
		pool->sp_stats.sockets_queued++;
		BUG_ON(xprt->xpt_pool != pool);
	}

out_unlock:
	spin_unlock_bh(&pool->sp_lock);
}
EXPORT_SYMBOL_GPL(svc_xprt_enqueue);

/*
 * Dequeue the first transport.  Must be called with the pool->sp_lock held.
 */
static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
{
	struct svc_xprt *xprt;

	if (list_empty(&pool->sp_sockets))
		return NULL;

	xprt = list_entry(pool->sp_sockets.next,
			  struct svc_xprt, xpt_ready);
	list_del_init(&xprt->xpt_ready);

	dprintk("svc: transport %p dequeued, inuse=%d\n",
		xprt, atomic_read(&xprt->xpt_ref.refcount));

	return xprt;
}

/*
 * svc_xprt_received conditionally queues the transport for processing
 * by another thread. The caller must hold the XPT_BUSY bit and must
 * not thereafter touch transport data.
 *
 * Note: XPT_DATA only gets cleared when a read-attempt finds no (or
 * insufficient) data.
 */
void svc_xprt_received(struct svc_xprt *xprt)
{
	BUG_ON(!test_bit(XPT_BUSY, &xprt->xpt_flags));
	xprt->xpt_pool = NULL;
	/* As soon as we clear busy, the xprt could be closed and
	 * 'put', so we need a reference to call svc_xprt_enqueue with:
	 */
	svc_xprt_get(xprt);
	clear_bit(XPT_BUSY, &xprt->xpt_flags);
	svc_xprt_enqueue(xprt);
	svc_xprt_put(xprt);
}
EXPORT_SYMBOL_GPL(svc_xprt_received);
/**
 * svc_reserve - change the space reserved for the reply to a request.
 * @rqstp: The request in question
 * @space: new max space to reserve
 *
 * Each request reserves some space on the output queue of the transport
 * to make sure the reply fits.  This function reduces that reserved
 * space to be the amount of space used already, plus @space.
 */
void svc_reserve(struct svc_rqst *rqstp, int space)
{
	space += rqstp->rq_res.head[0].iov_len;

	if (space < rqstp->rq_reserved) {
		struct svc_xprt *xprt = rqstp->rq_xprt;
		atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved);
		rqstp->rq_reserved = space;

		svc_xprt_enqueue(xprt);
	}
}
EXPORT_SYMBOL_GPL(svc_reserve);
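/*
 * A usage sketch, assuming a service that knows early on that its reply
 * will be small: shrinking the reservation frees write space, which may
 * allow other requests on the same transport to be enqueued. The value
 * shown is hypothetical.
 *
 *	svc_reserve(rqstp, 512);
 */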
static void svc_xprt_release(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt = rqstp->rq_xprt;

	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	kfree(rqstp->rq_deferred);
	rqstp->rq_deferred = NULL;

	svc_free_res_pages(rqstp);
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.page_base = 0;

	/* Reset response buffer and release
	 * the reservation.
	 * But first, check that enough space was reserved
	 * for the reply, otherwise we have a bug!
	 */
	if ((rqstp->rq_res.len) > rqstp->rq_reserved)
		printk(KERN_ERR "RPC request reserved %d but used %d\n",
		       rqstp->rq_reserved,
		       rqstp->rq_res.len);

	rqstp->rq_res.head[0].iov_len = 0;
	svc_reserve(rqstp, 0);
	rqstp->rq_xprt = NULL;

	svc_xprt_put(xprt);
}

/*
 * External function to wake up a server waiting for data.
 * This really only makes sense for services like lockd
 * which have exactly one thread anyway.
 */
void svc_wake_up(struct svc_serv *serv)
{
	struct svc_rqst *rqstp;
	unsigned int i;
	struct svc_pool *pool;

	for (i = 0; i < serv->sv_nrpools; i++) {
		pool = &serv->sv_pools[i];

		spin_lock_bh(&pool->sp_lock);
		if (!list_empty(&pool->sp_threads)) {
			rqstp = list_entry(pool->sp_threads.next,
					   struct svc_rqst,
					   rq_list);
			dprintk("svc: daemon %p woken up.\n", rqstp);
			/*
			svc_thread_dequeue(pool, rqstp);
			rqstp->rq_xprt = NULL;
			 */
			wake_up(&rqstp->rq_wait);
		}
		spin_unlock_bh(&pool->sp_lock);
	}
}
EXPORT_SYMBOL_GPL(svc_wake_up);

int svc_port_is_privileged(struct sockaddr *sin)
{
	switch (sin->sa_family) {
	case AF_INET:
		return ntohs(((struct sockaddr_in *)sin)->sin_port)
			< PROT_SOCK;
	case AF_INET6:
		return ntohs(((struct sockaddr_in6 *)sin)->sin6_port)
			< PROT_SOCK;
	default:
		return 0;
	}
}

/*
 * Make sure that we don't have too many active connections. If we have,
 * something must be dropped. It's not clear what will happen if we allow
 * "too many" connections, but when dealing with network-facing software,
 * we have to code defensively. Here we do that by imposing hard limits.
 *
 * There's no point in trying to do random drop here for DoS
 * prevention. The NFS client does one reconnect in 15 seconds. An
 * attacker can easily beat that.
 *
 * The only somewhat efficient mechanism would be to drop old
 * connections from the same IP first. But right now we don't even
 * record the client IP in svc_sock.
 *
 * Single-threaded services that expect a lot of clients will probably
 * need to set sv_maxconn to override the default value, which is based
 * on the number of threads.
 */
static void svc_check_conn_limits(struct svc_serv *serv)
{
	unsigned int limit = serv->sv_maxconn ? serv->sv_maxconn :
				(serv->sv_nrthreads+3) * 20;

	if (serv->sv_tmpcnt > limit) {
		struct svc_xprt *xprt = NULL;
		spin_lock_bh(&serv->sv_lock);
		if (!list_empty(&serv->sv_tempsocks)) {
			if (net_ratelimit()) {
				/* Try to help the admin */
				printk(KERN_NOTICE "%s: too many open "
				       "connections, consider increasing %s\n",
				       serv->sv_name, serv->sv_maxconn ?
				       "the max number of connections." :
				       "the number of threads.");
			}
			/*
			 * Always select the oldest connection. It's not fair,
			 * but so is life.
			 */
			xprt = list_entry(serv->sv_tempsocks.prev,
					  struct svc_xprt,
					  xpt_list);
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_get(xprt);
		}
		spin_unlock_bh(&serv->sv_lock);

		if (xprt) {
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
	}
}
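/*
 * A configuration sketch, assuming a single-threaded service as the
 * comment above describes: raising sv_maxconn at service-creation time
 * overrides the thread-based default. The value is hypothetical.
 *
 *	serv->sv_maxconn = 1024;
 */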
/*
 * Receive the next request on any transport.  This code is carefully
 * organised not to touch any cachelines in the shared svc_serv
 * structure, only cachelines in the local svc_pool.
 */
int svc_recv(struct svc_rqst *rqstp, long timeout)
{
	struct svc_xprt *xprt = NULL;
	struct svc_serv *serv = rqstp->rq_server;
	struct svc_pool *pool = rqstp->rq_pool;
	int len, i;
	int pages;
	struct xdr_buf *arg;
	DECLARE_WAITQUEUE(wait, current);
	long time_left;

	dprintk("svc: server %p waiting for data (to = %ld)\n",
		rqstp, timeout);

	if (rqstp->rq_xprt)
		printk(KERN_ERR
		       "svc_recv: service %p, transport not NULL!\n",
		       rqstp);
	if (waitqueue_active(&rqstp->rq_wait))
		printk(KERN_ERR
		       "svc_recv: service %p, wait queue active!\n",
		       rqstp);

	/* Now allocate needed pages.  If we get a failure, sleep briefly */
	pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
	for (i = 0; i < pages ; i++)
		while (rqstp->rq_pages[i] == NULL) {
			struct page *p = alloc_page(GFP_KERNEL);
			if (!p) {
				set_current_state(TASK_INTERRUPTIBLE);
				if (signalled() || kthread_should_stop()) {
					set_current_state(TASK_RUNNING);
					return -EINTR;
				}
				schedule_timeout(msecs_to_jiffies(500));
			}
			rqstp->rq_pages[i] = p;
		}
	rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
	BUG_ON(pages >= RPCSVC_MAXPAGES);

	/* Make arg->head point to first page and arg->pages point to rest */
	arg = &rqstp->rq_arg;
	arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
	arg->head[0].iov_len = PAGE_SIZE;
	arg->pages = rqstp->rq_pages + 1;
	arg->page_base = 0;
	/* save at least one page for response */
	arg->page_len = (pages-2)*PAGE_SIZE;
	arg->len = (pages-1)*PAGE_SIZE;
	arg->tail[0].iov_len = 0;

	try_to_freeze();
	cond_resched();
	if (signalled() || kthread_should_stop())
		return -EINTR;

	spin_lock_bh(&pool->sp_lock);
	xprt = svc_xprt_dequeue(pool);
	if (xprt) {
		rqstp->rq_xprt = xprt;
		svc_xprt_get(xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
	} else {
		/* No data pending. Go to sleep */
		svc_thread_enqueue(pool, rqstp);

		/*
		 * We have to be able to interrupt this wait
		 * to bring down the daemons ...
		 */
		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * Checking kthread_should_stop() here allows us to avoid
		 * locking and signalling when stopping kthreads that call
		 * svc_recv. If the thread has already been woken up, then
		 * we can exit here without sleeping. If not, then it'll be
		 * woken up quickly during the schedule_timeout.
		 */
		if (kthread_should_stop()) {
			set_current_state(TASK_RUNNING);
			spin_unlock_bh(&pool->sp_lock);
			return -EINTR;
		}

		add_wait_queue(&rqstp->rq_wait, &wait);
		spin_unlock_bh(&pool->sp_lock);

		time_left = schedule_timeout(timeout);

		try_to_freeze();

		spin_lock_bh(&pool->sp_lock);
		remove_wait_queue(&rqstp->rq_wait, &wait);
		if (!time_left)
			pool->sp_stats.threads_timedout++;

		xprt = rqstp->rq_xprt;
		if (!xprt) {
			svc_thread_dequeue(pool, rqstp);
			spin_unlock_bh(&pool->sp_lock);
			dprintk("svc: server %p, no data yet\n", rqstp);
			if (signalled() || kthread_should_stop())
				return -EINTR;
			else
				return -EAGAIN;
		}
	}
	spin_unlock_bh(&pool->sp_lock);

	len = 0;
	if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
		dprintk("svc_recv: found XPT_CLOSE\n");
		svc_delete_xprt(xprt);
	} else if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
		struct svc_xprt *newxpt;
		newxpt = xprt->xpt_ops->xpo_accept(xprt);
		if (newxpt) {
			/*
			 * We know this module_get will succeed because the
			 * listener holds a reference too
			 */
			__module_get(newxpt->xpt_class->xcl_owner);
			svc_check_conn_limits(xprt->xpt_server);
			spin_lock_bh(&serv->sv_lock);
			set_bit(XPT_TEMP, &newxpt->xpt_flags);
			list_add(&newxpt->xpt_list, &serv->sv_tempsocks);
			serv->sv_tmpcnt++;
			if (serv->sv_temptimer.function == NULL) {
				/* setup timer to age temp transports */
				setup_timer(&serv->sv_temptimer,
					    svc_age_temp_xprts,
					    (unsigned long)serv);
				mod_timer(&serv->sv_temptimer,
					  jiffies + svc_conn_age_period * HZ);
			}
			spin_unlock_bh(&serv->sv_lock);
			svc_xprt_received(newxpt);
		}
		svc_xprt_received(xprt);
	} else {
		dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
			rqstp, pool->sp_id, xprt,
			atomic_read(&xprt->xpt_ref.refcount));
		rqstp->rq_deferred = svc_deferred_dequeue(xprt);
		if (rqstp->rq_deferred) {
			svc_xprt_received(xprt);
			len = svc_deferred_recv(rqstp);
		} else {
			len = xprt->xpt_ops->xpo_recvfrom(rqstp);
			svc_xprt_received(xprt);
		}
		dprintk("svc: got len=%d\n", len);
	}

	/* No data, incomplete (TCP) read, or accept() */
	if (len == 0 || len == -EAGAIN) {
		rqstp->rq_res.len = 0;
		svc_xprt_release(rqstp);
		return -EAGAIN;
	}
	clear_bit(XPT_OLD, &xprt->xpt_flags);

	rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
	rqstp->rq_chandle.defer = svc_defer;

	if (serv->sv_stats)
		serv->sv_stats->netcnt++;
	return len;
}
EXPORT_SYMBOL_GPL(svc_recv);

/*
 * Drop request
 */
void svc_drop(struct svc_rqst *rqstp)
{
	dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt);
	svc_xprt_release(rqstp);
}
EXPORT_SYMBOL_GPL(svc_drop);
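/*
 * A minimal sketch of the loop a service thread runs around svc_recv(),
 * in the style of nfsd or lockd; the timeout value is hypothetical and
 * svc_process() lives in svc.c.
 *
 *	int err;
 *
 *	for (;;) {
 *		err = svc_recv(rqstp, 60*60*HZ);
 *		if (err == -EINTR)
 *			break;		// shutting down
 *		if (err == -EAGAIN)
 *			continue;	// nothing usable received
 *		svc_process(rqstp);	// dispatch and send the reply
 *	}
 */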
/*
 * Return reply to client.
 */
int svc_send(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt;
	int len;
	struct xdr_buf *xb;

	xprt = rqstp->rq_xprt;
	if (!xprt)
		return -EFAULT;

	/* release the receive skb before sending the reply */
	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	/* calculate over-all length */
	xb = &rqstp->rq_res;
	xb->len = xb->head[0].iov_len +
		xb->page_len +
		xb->tail[0].iov_len;

	/* Grab mutex to serialize outgoing data. */
	mutex_lock(&xprt->xpt_mutex);
	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
		len = -ENOTCONN;
	else
		len = xprt->xpt_ops->xpo_sendto(rqstp);
	mutex_unlock(&xprt->xpt_mutex);
	rpc_wake_up(&xprt->xpt_bc_pending);
	svc_xprt_release(rqstp);

	if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
		return 0;
	return len;
}

/*
 * Timer function to close old temporary transports, using
 * a mark-and-sweep algorithm.
 */
static void svc_age_temp_xprts(unsigned long closure)
{
	struct svc_serv *serv = (struct svc_serv *)closure;
	struct svc_xprt *xprt;
	struct list_head *le, *next;
	LIST_HEAD(to_be_aged);

	dprintk("svc_age_temp_xprts\n");

	if (!spin_trylock_bh(&serv->sv_lock)) {
		/* busy, try again 1 sec later */
		dprintk("svc_age_temp_xprts: busy\n");
		mod_timer(&serv->sv_temptimer, jiffies + HZ);
		return;
	}

	list_for_each_safe(le, next, &serv->sv_tempsocks) {
		xprt = list_entry(le, struct svc_xprt, xpt_list);

		/* First time through, just mark it OLD. Second time
		 * through, close it. */
		if (!test_and_set_bit(XPT_OLD, &xprt->xpt_flags))
			continue;
		if (atomic_read(&xprt->xpt_ref.refcount) > 1 ||
		    test_bit(XPT_BUSY, &xprt->xpt_flags))
			continue;
		svc_xprt_get(xprt);
		list_move(le, &to_be_aged);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		set_bit(XPT_DETACHED, &xprt->xpt_flags);
	}
	spin_unlock_bh(&serv->sv_lock);

	while (!list_empty(&to_be_aged)) {
		le = to_be_aged.next;
		/* fiddling the xpt_list node is safe 'cos we're XPT_DETACHED */
		list_del_init(le);
		xprt = list_entry(le, struct svc_xprt, xpt_list);

		dprintk("queuing xprt %p for closing\n", xprt);

		/* a thread will dequeue and close it soon */
		svc_xprt_enqueue(xprt);
		svc_xprt_put(xprt);
	}

	mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
}
/*
 * Remove a dead transport
 */
void svc_delete_xprt(struct svc_xprt *xprt)
{
	struct svc_serv *serv = xprt->xpt_server;
	struct svc_deferred_req *dr;

	/* Only do this once */
	if (test_and_set_bit(XPT_DEAD, &xprt->xpt_flags))
		return;

	dprintk("svc: svc_delete_xprt(%p)\n", xprt);
	xprt->xpt_ops->xpo_detach(xprt);

	spin_lock_bh(&serv->sv_lock);
	if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags))
		list_del_init(&xprt->xpt_list);
	/*
	 * We used to delete the transport from whichever list
	 * its sk_xprt.xpt_ready node was on, but we don't actually
	 * need to.  This is because the only time we're called
	 * while still attached to a queue, the queue itself
	 * is about to be destroyed (in svc_destroy).
	 */
	if (test_bit(XPT_TEMP, &xprt->xpt_flags))
		serv->sv_tmpcnt--;
	spin_unlock_bh(&serv->sv_lock);

	while ((dr = svc_deferred_dequeue(xprt)) != NULL)
		kfree(dr);

	svc_xprt_put(xprt);
}

void svc_close_xprt(struct svc_xprt *xprt)
{
	set_bit(XPT_CLOSE, &xprt->xpt_flags);
	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags))
		/* someone else will have to effect the close */
		return;

	svc_xprt_get(xprt);
	svc_delete_xprt(xprt);
	clear_bit(XPT_BUSY, &xprt->xpt_flags);
	svc_xprt_put(xprt);
}
EXPORT_SYMBOL_GPL(svc_close_xprt);

void svc_close_all(struct list_head *xprt_list)
{
	struct svc_xprt *xprt;
	struct svc_xprt *tmp;

	list_for_each_entry_safe(xprt, tmp, xprt_list, xpt_list) {
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		if (test_bit(XPT_BUSY, &xprt->xpt_flags)) {
			/* Waiting to be processed, but no threads left,
			 * so just remove it from the waiting list
			 */
			list_del_init(&xprt->xpt_ready);
			clear_bit(XPT_BUSY, &xprt->xpt_flags);
		}
		svc_close_xprt(xprt);
	}
}

/*
 * Handle defer and revisit of requests
 */

static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
{
	struct svc_deferred_req *dr =
		container_of(dreq, struct svc_deferred_req, handle);
	struct svc_xprt *xprt = dr->xprt;

	spin_lock(&xprt->xpt_lock);
	set_bit(XPT_DEFERRED, &xprt->xpt_flags);
	if (too_many || test_bit(XPT_DEAD, &xprt->xpt_flags)) {
		spin_unlock(&xprt->xpt_lock);
		dprintk("revisit canceled\n");
		svc_xprt_put(xprt);
		kfree(dr);
		return;
	}
	dprintk("revisit queued\n");
	dr->xprt = NULL;
	list_add(&dr->handle.recent, &xprt->xpt_deferred);
	spin_unlock(&xprt->xpt_lock);
	svc_xprt_enqueue(xprt);
	svc_xprt_put(xprt);
}
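/*
 * A rough sketch of how the cache layer drives this machinery: svc_recv()
 * points rq_chandle.defer at svc_defer() below, the cache code calls it
 * when a lookup must block, and svc_revisit() above runs once the cache
 * entry is ready (or the deferral is dropped). The names match this
 * file; the calling sequence is paraphrased from the cache code.
 *
 *	dreq = req->defer(req);		// ends up in svc_defer()
 *	...				// cache entry completes later
 *	dreq->revisit(dreq, 0);		// ends up in svc_revisit()
 */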
/*
 * Save the request off for later processing. The request buffer looks
 * like this:
 *
 * <xprt-header><rpc-header><rpc-pagelist><rpc-tail>
 *
 * This code can only handle requests that consist of an xprt-header
 * and rpc-header.
 */
static struct cache_deferred_req *svc_defer(struct cache_req *req)
{
	struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
	struct svc_deferred_req *dr;

	if (rqstp->rq_arg.page_len || !rqstp->rq_usedeferral)
		return NULL;
	if (rqstp->rq_deferred) {
		dr = rqstp->rq_deferred;
		rqstp->rq_deferred = NULL;
	} else {
		size_t skip;
		size_t size;
		size = sizeof(struct svc_deferred_req) + rqstp->rq_arg.len;
		dr = kmalloc(size, GFP_KERNEL);
		if (dr == NULL)
			return NULL;

		dr->handle.owner = rqstp->rq_server;
		dr->prot = rqstp->rq_prot;
		memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
		dr->addrlen = rqstp->rq_addrlen;
		dr->daddr = rqstp->rq_daddr;
		dr->argslen = rqstp->rq_arg.len >> 2;
		dr->xprt_hlen = rqstp->rq_xprt_hlen;

		/* back up head to the start of the buffer and copy */
		skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
		memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip,
		       dr->argslen << 2);
	}
	svc_xprt_get(rqstp->rq_xprt);
	dr->xprt = rqstp->rq_xprt;

	dr->handle.revisit = svc_revisit;
	return &dr->handle;
}

/*
 * recv data from a deferred request into an active one
 */
static int svc_deferred_recv(struct svc_rqst *rqstp)
{
	struct svc_deferred_req *dr = rqstp->rq_deferred;

	/* setup iov_base past transport header */
	rqstp->rq_arg.head[0].iov_base = dr->args + (dr->xprt_hlen>>2);
	/* The iov_len does not include the transport header bytes */
	rqstp->rq_arg.head[0].iov_len = (dr->argslen<<2) - dr->xprt_hlen;
	rqstp->rq_arg.page_len = 0;
	/* The rq_arg.len includes the transport header bytes */
	rqstp->rq_arg.len = dr->argslen<<2;
	rqstp->rq_prot = dr->prot;
	memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
	rqstp->rq_addrlen = dr->addrlen;
	/* Save off transport header len in case we get deferred again */
	rqstp->rq_xprt_hlen = dr->xprt_hlen;
	rqstp->rq_daddr = dr->daddr;
	rqstp->rq_respages = rqstp->rq_pages;
	return (dr->argslen<<2) - dr->xprt_hlen;
}


static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt)
{
	struct svc_deferred_req *dr = NULL;

	if (!test_bit(XPT_DEFERRED, &xprt->xpt_flags))
		return NULL;
	spin_lock(&xprt->xpt_lock);
	clear_bit(XPT_DEFERRED, &xprt->xpt_flags);
	if (!list_empty(&xprt->xpt_deferred)) {
		dr = list_entry(xprt->xpt_deferred.next,
				struct svc_deferred_req,
				handle.recent);
		list_del_init(&dr->handle.recent);
		set_bit(XPT_DEFERRED, &xprt->xpt_flags);
	}
	spin_unlock(&xprt->xpt_lock);
	return dr;
}

/**
 * svc_find_xprt - find an RPC transport instance
 * @serv: pointer to svc_serv to search
 * @xcl_name: C string containing transport's class name
 * @af: Address family of transport's local address
 * @port: transport's IP port number
 *
 * Return the transport instance pointer for the endpoint accepting
 * connections/peer traffic from the specified transport class,
 * address family and port.
 *
 * Specifying 0 for the address family or port is effectively a
 * wild-card, and will result in matching the first transport in the
 * service's list that has a matching class name.
 */
struct svc_xprt *svc_find_xprt(struct svc_serv *serv, const char *xcl_name,
			       const sa_family_t af, const unsigned short port)
{
	struct svc_xprt *xprt;
	struct svc_xprt *found = NULL;

	/* Sanity check the args */
	if (serv == NULL || xcl_name == NULL)
		return found;

	spin_lock_bh(&serv->sv_lock);
	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
		if (strcmp(xprt->xpt_class->xcl_name, xcl_name))
			continue;
		if (af != AF_UNSPEC && af != xprt->xpt_local.ss_family)
			continue;
		if (port != 0 && port != svc_xprt_local_port(xprt))
			continue;
		found = xprt;
		svc_xprt_get(xprt);
		break;
	}
	spin_unlock_bh(&serv->sv_lock);
	return found;
}
EXPORT_SYMBOL_GPL(svc_find_xprt);
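/*
 * A usage sketch: check whether the service already has a TCP listener
 * on any port (0 acts as a wild-card, per the comment above). The caller
 * owns the reference taken on a match and must drop it.
 *
 *	xprt = svc_find_xprt(serv, "tcp", PF_INET, 0);
 *	if (xprt != NULL) {
 *		// ... use xprt ...
 *		svc_xprt_put(xprt);
 *	}
 */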
static int svc_one_xprt_name(const struct svc_xprt *xprt,
			     char *pos, int remaining)
{
	int len;

	len = snprintf(pos, remaining, "%s %u\n",
		       xprt->xpt_class->xcl_name,
		       svc_xprt_local_port(xprt));
	if (len >= remaining)
		return -ENAMETOOLONG;
	return len;
}

/**
 * svc_xprt_names - format a buffer with a list of transport names
 * @serv: pointer to an RPC service
 * @buf: pointer to a buffer to be filled in
 * @buflen: length of buffer to be filled in
 *
 * Fills in @buf with a string containing a list of transport names,
 * each name terminated with '\n'.
 *
 * Returns positive length of the filled-in string on success; otherwise
 * a negative errno value is returned if an error occurs.
 */
int svc_xprt_names(struct svc_serv *serv, char *buf, const int buflen)
{
	struct svc_xprt *xprt;
	int len, totlen;
	char *pos;

	/* Sanity check args */
	if (!serv)
		return 0;

	spin_lock_bh(&serv->sv_lock);

	pos = buf;
	totlen = 0;
	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
		len = svc_one_xprt_name(xprt, pos, buflen - totlen);
		if (len < 0) {
			*buf = '\0';
			totlen = len;
		}
		if (len <= 0)
			break;

		pos += len;
		totlen += len;
	}

	spin_unlock_bh(&serv->sv_lock);
	return totlen;
}
EXPORT_SYMBOL_GPL(svc_xprt_names);
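/*
 * A usage sketch, assuming an nfsd-style control file: the service's
 * transport list is formatted into a transaction buffer for user space.
 * SIMPLE_TRANSACTION_LIMIT is an assumption about the caller's buffer.
 *
 *	len = svc_xprt_names(serv, buf, SIMPLE_TRANSACTION_LIMIT);
 */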
/*----------------------------------------------------------------------------*/

static void *svc_pool_stats_start(struct seq_file *m, loff_t *pos)
{
	unsigned int pidx = (unsigned int)*pos;
	struct svc_serv *serv = m->private;

	dprintk("svc_pool_stats_start, *pidx=%u\n", pidx);

	if (!pidx)
		return SEQ_START_TOKEN;
	return (pidx > serv->sv_nrpools ? NULL : &serv->sv_pools[pidx-1]);
}

static void *svc_pool_stats_next(struct seq_file *m, void *p, loff_t *pos)
{
	struct svc_pool *pool = p;
	struct svc_serv *serv = m->private;

	dprintk("svc_pool_stats_next, *pos=%llu\n", *pos);

	if (p == SEQ_START_TOKEN) {
		pool = &serv->sv_pools[0];
	} else {
		unsigned int pidx = (pool - &serv->sv_pools[0]);
		if (pidx < serv->sv_nrpools-1)
			pool = &serv->sv_pools[pidx+1];
		else
			pool = NULL;
	}
	++*pos;
	return pool;
}

static void svc_pool_stats_stop(struct seq_file *m, void *p)
{
}

static int svc_pool_stats_show(struct seq_file *m, void *p)
{
	struct svc_pool *pool = p;

	if (p == SEQ_START_TOKEN) {
		seq_puts(m, "# pool packets-arrived sockets-enqueued threads-woken threads-timedout\n");
		return 0;
	}

	seq_printf(m, "%u %lu %lu %lu %lu\n",
		pool->sp_id,
		pool->sp_stats.packets,
		pool->sp_stats.sockets_queued,
		pool->sp_stats.threads_woken,
		pool->sp_stats.threads_timedout);

	return 0;
}

static const struct seq_operations svc_pool_stats_seq_ops = {
	.start	= svc_pool_stats_start,
	.next	= svc_pool_stats_next,
	.stop	= svc_pool_stats_stop,
	.show	= svc_pool_stats_show,
};

int svc_pool_stats_open(struct svc_serv *serv, struct file *file)
{
	int err;

	err = seq_open(file, &svc_pool_stats_seq_ops);
	if (!err)
		((struct seq_file *) file->private_data)->private = serv;
	return err;
}
EXPORT_SYMBOL(svc_pool_stats_open);

/*----------------------------------------------------------------------------*/
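/*
 * A wiring sketch, assuming a service that exposes pool stats through a
 * procfs seq_file ->open method (nfsd does essentially this). The
 * function and serv names are illustrative.
 *
 *	static int example_pool_stats_open(struct inode *inode,
 *					   struct file *file)
 *	{
 *		return svc_pool_stats_open(example_serv, file);
 *	}
 */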