/* replay.c revision 225736 */
1118611Snjl/*- 2118611Snjl * Copyright (c) 2008 Isilon Inc http://www.isilon.com/ 3118611Snjl * Authors: Doug Rabson <dfr@rabson.org> 4118611Snjl * Developed with Red Inc: Alfred Perlstein <alfred@freebsd.org> 5118611Snjl * 6118611Snjl * Redistribution and use in source and binary forms, with or without 7217365Sjkim * modification, are permitted provided that the following conditions 8229989Sjkim * are met: 9118611Snjl * 1. Redistributions of source code must retain the above copyright 10118611Snjl * notice, this list of conditions and the following disclaimer. 11217365Sjkim * 2. Redistributions in binary form must reproduce the above copyright 12217365Sjkim * notice, this list of conditions and the following disclaimer in the 13217365Sjkim * documentation and/or other materials provided with the distribution. 14217365Sjkim * 15217365Sjkim * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16217365Sjkim * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17217365Sjkim * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18217365Sjkim * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19217365Sjkim * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20217365Sjkim * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21217365Sjkim * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22217365Sjkim * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23217365Sjkim * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24217365Sjkim * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25118611Snjl * SUCH DAMAGE. 
26217365Sjkim */ 27217365Sjkim 28217365Sjkim#include <sys/cdefs.h> 29118611Snjl__FBSDID("$FreeBSD: stable/9/sys/rpc/replay.c 211853 2010-08-26 23:33:04Z pjd $"); 30217365Sjkim 31217365Sjkim#include <sys/param.h> 32217365Sjkim#include <sys/hash.h> 33217365Sjkim#include <sys/kernel.h> 34217365Sjkim#include <sys/lock.h> 35217365Sjkim#include <sys/mbuf.h> 36217365Sjkim#include <sys/mutex.h> 37217365Sjkim#include <sys/queue.h> 38217365Sjkim 39217365Sjkim#include <rpc/rpc.h> 40217365Sjkim#include <rpc/replay.h> 41217365Sjkim 42217365Sjkimstruct replay_cache_entry { 43118611Snjl int rce_hash; 44151937Sjkim struct rpc_msg rce_msg; 45193529Sjkim struct sockaddr_storage rce_addr; 46118611Snjl struct rpc_msg rce_repmsg; 47118611Snjl struct mbuf *rce_repbody; 48118611Snjl 49118611Snjl TAILQ_ENTRY(replay_cache_entry) rce_link; 50151937Sjkim TAILQ_ENTRY(replay_cache_entry) rce_alllink; 51118611Snjl}; 52233250SjkimTAILQ_HEAD(replay_cache_list, replay_cache_entry); 53197104Sjkim 54197104Sjkimstatic struct replay_cache_entry * 55197104Sjkim replay_alloc(struct replay_cache *rc, struct rpc_msg *msg, 56151937Sjkim struct sockaddr *addr, int h); 57197104Sjkimstatic void replay_free(struct replay_cache *rc, 58151937Sjkim struct replay_cache_entry *rce); 59151937Sjkimstatic void replay_prune(struct replay_cache *rc); 60151937Sjkim 61151937Sjkim#define REPLAY_HASH_SIZE 256 62151937Sjkim#define REPLAY_MAX 1024 63151937Sjkim 64151937Sjkimstruct replay_cache { 65118611Snjl struct replay_cache_list rc_cache[REPLAY_HASH_SIZE]; 66118611Snjl struct replay_cache_list rc_all; 67118611Snjl struct mtx rc_lock; 68118611Snjl int rc_count; 69118611Snjl size_t rc_size; 70118611Snjl size_t rc_maxsize; 71118611Snjl}; 72118611Snjl 73241973Sjkimstruct replay_cache * 74118611Snjlreplay_newcache(size_t maxsize) 75118611Snjl{ 76118611Snjl struct replay_cache *rc; 77118611Snjl int i; 78118611Snjl 79151937Sjkim rc = malloc(sizeof(*rc), M_RPC, M_WAITOK|M_ZERO); 80151937Sjkim for (i = 0; i < REPLAY_HASH_SIZE; 
i++) 81118611Snjl TAILQ_INIT(&rc->rc_cache[i]); 82118611Snjl TAILQ_INIT(&rc->rc_all); 83234623Sjkim mtx_init(&rc->rc_lock, "rc_lock", NULL, MTX_DEF); 84118611Snjl rc->rc_maxsize = maxsize; 85118611Snjl 86234623Sjkim return (rc); 87118611Snjl} 88234623Sjkim 89118611Snjlvoid 90118611Snjlreplay_setsize(struct replay_cache *rc, size_t newmaxsize) 91151937Sjkim{ 92118611Snjl 93118611Snjl mtx_lock(&rc->rc_lock); 94118611Snjl rc->rc_maxsize = newmaxsize; 95118611Snjl replay_prune(rc); 96118611Snjl mtx_unlock(&rc->rc_lock); 97118611Snjl} 98118611Snjl 99118611Snjlvoid 100118611Snjlreplay_freecache(struct replay_cache *rc) 101118611Snjl{ 102118611Snjl 103118611Snjl mtx_lock(&rc->rc_lock); 104118611Snjl while (TAILQ_FIRST(&rc->rc_all)) 105118611Snjl replay_free(rc, TAILQ_FIRST(&rc->rc_all)); 106118611Snjl mtx_destroy(&rc->rc_lock); 107118611Snjl free(rc, M_RPC); 108118611Snjl} 109118611Snjl 110118611Snjlstatic struct replay_cache_entry * 111118611Snjlreplay_alloc(struct replay_cache *rc, 112118611Snjl struct rpc_msg *msg, struct sockaddr *addr, int h) 113118611Snjl{ 114118611Snjl struct replay_cache_entry *rce; 115151937Sjkim 116151937Sjkim mtx_assert(&rc->rc_lock, MA_OWNED); 117118611Snjl 118118611Snjl rc->rc_count++; 119118611Snjl rce = malloc(sizeof(*rce), M_RPC, M_NOWAIT|M_ZERO); 120118611Snjl if (!rce) 121118611Snjl return (NULL); 122118611Snjl rce->rce_hash = h; 123118611Snjl rce->rce_msg = *msg; 124118611Snjl bcopy(addr, &rce->rce_addr, addr->sa_len); 125118611Snjl 126118611Snjl TAILQ_INSERT_HEAD(&rc->rc_cache[h], rce, rce_link); 127118611Snjl TAILQ_INSERT_HEAD(&rc->rc_all, rce, rce_alllink); 128118611Snjl 129151937Sjkim return (rce); 130118611Snjl} 131118611Snjl 132118611Snjlstatic void 133118611Snjlreplay_free(struct replay_cache *rc, struct replay_cache_entry *rce) 134118611Snjl{ 135118611Snjl 136209746Sjkim mtx_assert(&rc->rc_lock, MA_OWNED); 137118611Snjl 138118611Snjl rc->rc_count--; 139118611Snjl TAILQ_REMOVE(&rc->rc_cache[rce->rce_hash], rce, rce_link); 
140118611Snjl TAILQ_REMOVE(&rc->rc_all, rce, rce_alllink); 141118611Snjl if (rce->rce_repbody) { 142118611Snjl rc->rc_size -= m_length(rce->rce_repbody, NULL); 143118611Snjl m_freem(rce->rce_repbody); 144118611Snjl } 145118611Snjl free(rce, M_RPC); 146118611Snjl} 147118611Snjl 148118611Snjlstatic void 149118611Snjlreplay_prune(struct replay_cache *rc) 150118611Snjl{ 151118611Snjl struct replay_cache_entry *rce; 152118611Snjl 153118611Snjl mtx_assert(&rc->rc_lock, MA_OWNED); 154118611Snjl 155118611Snjl if (rc->rc_count < REPLAY_MAX && rc->rc_size <= rc->rc_maxsize) 156118611Snjl return; 157118611Snjl 158118611Snjl do { 159118611Snjl /* 160207344Sjkim * Try to free an entry. Don't free in-progress entries. 161207344Sjkim */ 162207344Sjkim TAILQ_FOREACH_REVERSE(rce, &rc->rc_all, replay_cache_list, 163207344Sjkim rce_alllink) { 164207344Sjkim if (rce->rce_repmsg.rm_xid) 165207344Sjkim break; 166207344Sjkim } 167207344Sjkim if (rce) 168207344Sjkim replay_free(rc, rce); 169207344Sjkim } while (rce && (rc->rc_count >= REPLAY_MAX 170207344Sjkim || rc->rc_size > rc->rc_maxsize)); 171207344Sjkim} 172207344Sjkim 173207344Sjkimenum replay_state 174207344Sjkimreplay_find(struct replay_cache *rc, struct rpc_msg *msg, 175207344Sjkim struct sockaddr *addr, struct rpc_msg *repmsg, struct mbuf **mp) 176228110Sjkim{ 177207344Sjkim int h = HASHSTEP(HASHINIT, msg->rm_xid) % REPLAY_HASH_SIZE; 178207344Sjkim struct replay_cache_entry *rce; 179207344Sjkim 180228110Sjkim mtx_lock(&rc->rc_lock); 181207344Sjkim TAILQ_FOREACH(rce, &rc->rc_cache[h], rce_link) { 182207344Sjkim if (rce->rce_msg.rm_xid == msg->rm_xid 183207344Sjkim && rce->rce_msg.rm_call.cb_prog == msg->rm_call.cb_prog 184207344Sjkim && rce->rce_msg.rm_call.cb_vers == msg->rm_call.cb_vers 185228110Sjkim && rce->rce_msg.rm_call.cb_proc == msg->rm_call.cb_proc 186228110Sjkim && rce->rce_addr.ss_len == addr->sa_len 187228110Sjkim && bcmp(&rce->rce_addr, addr, addr->sa_len) == 0) { 188207344Sjkim if (rce->rce_repmsg.rm_xid) { 
189207344Sjkim /* 190207344Sjkim * We have a reply for this 191207344Sjkim * message. Copy it and return. Keep 192207344Sjkim * replay_all LRU sorted 193207344Sjkim */ 194118611Snjl TAILQ_REMOVE(&rc->rc_all, rce, rce_alllink); 195118611Snjl TAILQ_INSERT_HEAD(&rc->rc_all, rce, 196118611Snjl rce_alllink); 197118611Snjl *repmsg = rce->rce_repmsg; 198118611Snjl if (rce->rce_repbody) { 199118611Snjl *mp = m_copym(rce->rce_repbody, 200241973Sjkim 0, M_COPYALL, M_NOWAIT); 201118611Snjl mtx_unlock(&rc->rc_lock); 202118611Snjl if (!*mp) 203118611Snjl return (RS_ERROR); 204118611Snjl } else { 205118611Snjl mtx_unlock(&rc->rc_lock); 206118611Snjl } 207118611Snjl return (RS_DONE); 208118611Snjl } else { 209118611Snjl mtx_unlock(&rc->rc_lock); 210118611Snjl return (RS_INPROGRESS); 211118611Snjl } 212118611Snjl } 213118611Snjl } 214118611Snjl 215118611Snjl replay_prune(rc); 216118611Snjl 217118611Snjl rce = replay_alloc(rc, msg, addr, h); 218118611Snjl 219118611Snjl mtx_unlock(&rc->rc_lock); 220118611Snjl 221118611Snjl if (!rce) 222118611Snjl return (RS_ERROR); 223118611Snjl else 224118611Snjl return (RS_NEW); 225118611Snjl} 226118611Snjl 227118611Snjlvoid 228118611Snjlreplay_setreply(struct replay_cache *rc, 229118611Snjl struct rpc_msg *repmsg, struct sockaddr *addr, struct mbuf *m) 230118611Snjl{ 231118611Snjl int h = HASHSTEP(HASHINIT, repmsg->rm_xid) % REPLAY_HASH_SIZE; 232118611Snjl struct replay_cache_entry *rce; 233118611Snjl 234118611Snjl /* 235118611Snjl * Copy the reply before the lock so we can sleep. 
236118611Snjl */ 237118611Snjl if (m) 238118611Snjl m = m_copym(m, 0, M_COPYALL, M_WAITOK); 239118611Snjl 240118611Snjl mtx_lock(&rc->rc_lock); 241118611Snjl TAILQ_FOREACH(rce, &rc->rc_cache[h], rce_link) { 242118611Snjl if (rce->rce_msg.rm_xid == repmsg->rm_xid 243118611Snjl && rce->rce_addr.ss_len == addr->sa_len 244151937Sjkim && bcmp(&rce->rce_addr, addr, addr->sa_len) == 0) { 245118611Snjl break; 246118611Snjl } 247118611Snjl } 248118611Snjl if (rce) { 249118611Snjl rce->rce_repmsg = *repmsg; 250118611Snjl rce->rce_repbody = m; 251118611Snjl if (m) 252118611Snjl rc->rc_size += m_length(m, NULL); 253118611Snjl } 254118611Snjl mtx_unlock(&rc->rc_lock); 255118611Snjl} 256118611Snjl