replay.c revision 211830
1/*- 2 * Copyright (c) 2008 Isilon Inc http://www.isilon.com/ 3 * Authors: Doug Rabson <dfr@rabson.org> 4 * Developed with Red Inc: Alfred Perlstein <alfred@freebsd.org> 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25 * SUCH DAMAGE. 26 */ 27 28#include <sys/cdefs.h> 29__FBSDID("$FreeBSD: head/sys/rpc/replay.c 211830 2010-08-25 23:23:00Z rmacklem $"); 30 31#include <sys/param.h> 32#include <sys/hash.h> 33#include <sys/kernel.h> 34#include <sys/lock.h> 35#include <sys/mbuf.h> 36#include <sys/mutex.h> 37#include <sys/queue.h> 38 39#include <rpc/rpc.h> 40#include <rpc/replay.h> 41 42struct replay_cache_entry { 43 int rce_hash; 44 struct rpc_msg rce_msg; 45 struct sockaddr_storage rce_addr; 46 struct rpc_msg rce_repmsg; 47 struct mbuf *rce_repbody; 48 49 TAILQ_ENTRY(replay_cache_entry) rce_link; 50 TAILQ_ENTRY(replay_cache_entry) rce_alllink; 51}; 52TAILQ_HEAD(replay_cache_list, replay_cache_entry); 53 54static struct replay_cache_entry * 55 replay_alloc(struct replay_cache *rc, struct rpc_msg *msg, 56 struct sockaddr *addr, int h); 57static void replay_free(struct replay_cache *rc, 58 struct replay_cache_entry *rce); 59static void replay_prune(struct replay_cache *rc); 60 61#define REPLAY_HASH_SIZE 256 62#define REPLAY_MAX 1024 63 64struct replay_cache { 65 struct replay_cache_list rc_cache[REPLAY_HASH_SIZE]; 66 struct replay_cache_list rc_all; 67 struct mtx rc_lock; 68 int rc_count; 69 size_t rc_size; 70 size_t rc_maxsize; 71}; 72 73struct replay_cache * 74replay_newcache(size_t maxsize) 75{ 76 struct replay_cache *rc; 77 int i; 78 79 rc = malloc(sizeof(*rc), M_RPC, M_WAITOK|M_ZERO); 80 for (i = 0; i < REPLAY_HASH_SIZE; i++) 81 TAILQ_INIT(&rc->rc_cache[i]); 82 TAILQ_INIT(&rc->rc_all); 83 mtx_init(&rc->rc_lock, "rc_lock", NULL, MTX_DEF); 84 rc->rc_maxsize = maxsize; 85 86 return (rc); 87} 88 89void 90replay_setsize(struct replay_cache *rc, size_t newmaxsize) 91{ 92 93 mtx_lock(&rc->rc_lock); 94 rc->rc_maxsize = newmaxsize; 95 replay_prune(rc); 96 mtx_unlock(&rc->rc_lock); 97} 98 99void 100replay_freecache(struct replay_cache *rc) 101{ 102 103 mtx_lock(&rc->rc_lock); 104 while (TAILQ_FIRST(&rc->rc_all)) 105 replay_free(rc, TAILQ_FIRST(&rc->rc_all)); 106 mtx_destroy(&rc->rc_lock); 107 free(rc, M_RPC); 108} 109 110static struct replay_cache_entry * 111replay_alloc(struct replay_cache *rc, 112 struct rpc_msg *msg, struct sockaddr *addr, int h) 113{ 114 struct replay_cache_entry *rce; 115 116 rc->rc_count++; 117 rce = malloc(sizeof(*rce), M_RPC, M_NOWAIT|M_ZERO); 118 rce->rce_hash = h; 119 rce->rce_msg = *msg; 120 bcopy(addr, &rce->rce_addr, addr->sa_len); 121 122 TAILQ_INSERT_HEAD(&rc->rc_cache[h], rce, rce_link); 123 TAILQ_INSERT_HEAD(&rc->rc_all, rce, rce_alllink); 124 125 return (rce); 126} 127 128static void 129replay_free(struct replay_cache *rc, struct replay_cache_entry *rce) 130{ 131 132 rc->rc_count--; 133 TAILQ_REMOVE(&rc->rc_cache[rce->rce_hash], rce, rce_link); 134 TAILQ_REMOVE(&rc->rc_all, rce, rce_alllink); 135 if (rce->rce_repbody) { 136 rc->rc_size -= m_length(rce->rce_repbody, NULL); 137 m_freem(rce->rce_repbody); 138 } 139 free(rce, M_RPC); 140} 141 142static void 143replay_prune(struct replay_cache *rc) 144{ 145 struct replay_cache_entry *rce; 146 bool_t freed_one; 147 148 if (rc->rc_count >= REPLAY_MAX || rc->rc_size > rc->rc_maxsize) { 149 do { 150 freed_one = FALSE; 151 /* 152 * Try to free an entry. Don't free in-progress entries 153 */ 154 TAILQ_FOREACH_REVERSE(rce, &rc->rc_all, 155 replay_cache_list, rce_alllink) { 156 if (rce->rce_repmsg.rm_xid) { 157 replay_free(rc, rce); 158 freed_one = TRUE; 159 break; 160 } 161 } 162 } while (freed_one 163 && (rc->rc_count >= REPLAY_MAX 164 || rc->rc_size > rc->rc_maxsize)); 165 } 166} 167 168enum replay_state 169replay_find(struct replay_cache *rc, struct rpc_msg *msg, 170 struct sockaddr *addr, struct rpc_msg *repmsg, struct mbuf **mp) 171{ 172 int h = HASHSTEP(HASHINIT, msg->rm_xid) % REPLAY_HASH_SIZE; 173 struct replay_cache_entry *rce; 174 175 mtx_lock(&rc->rc_lock); 176 TAILQ_FOREACH(rce, &rc->rc_cache[h], rce_link) { 177 if (rce->rce_msg.rm_xid == msg->rm_xid 178 && rce->rce_msg.rm_call.cb_prog == msg->rm_call.cb_prog 179 && rce->rce_msg.rm_call.cb_vers == msg->rm_call.cb_vers 180 && rce->rce_msg.rm_call.cb_proc == msg->rm_call.cb_proc 181 && rce->rce_addr.ss_len == addr->sa_len 182 && bcmp(&rce->rce_addr, addr, addr->sa_len) == 0) { 183 if (rce->rce_repmsg.rm_xid) { 184 /* 185 * We have a reply for this 186 * message. Copy it and return. Keep 187 * replay_all LRU sorted 188 */ 189 TAILQ_REMOVE(&rc->rc_all, rce, rce_alllink); 190 TAILQ_INSERT_HEAD(&rc->rc_all, rce, 191 rce_alllink); 192 *repmsg = rce->rce_repmsg; 193 if (rce->rce_repbody) { 194 *mp = m_copym(rce->rce_repbody, 195 0, M_COPYALL, M_NOWAIT); 196 mtx_unlock(&rc->rc_lock); 197 if (!*mp) 198 return (RS_ERROR); 199 } else { 200 mtx_unlock(&rc->rc_lock); 201 } 202 return (RS_DONE); 203 } else { 204 mtx_unlock(&rc->rc_lock); 205 return (RS_INPROGRESS); 206 } 207 } 208 } 209 210 replay_prune(rc); 211 212 rce = replay_alloc(rc, msg, addr, h); 213 214 mtx_unlock(&rc->rc_lock); 215 216 if (!rce) 217 return (RS_ERROR); 218 else 219 return (RS_NEW); 220} 221 222void 223replay_setreply(struct replay_cache *rc, 224 struct rpc_msg *repmsg, struct sockaddr *addr, struct mbuf *m) 225{ 226 int h = HASHSTEP(HASHINIT, repmsg->rm_xid) % REPLAY_HASH_SIZE; 227 struct replay_cache_entry *rce; 228 229 /* 230 * Copy the reply before the lock so we can sleep. 231 */ 232 if (m) 233 m = m_copym(m, 0, M_COPYALL, M_WAITOK); 234 235 mtx_lock(&rc->rc_lock); 236 TAILQ_FOREACH(rce, &rc->rc_cache[h], rce_link) { 237 if (rce->rce_msg.rm_xid == repmsg->rm_xid 238 && rce->rce_addr.ss_len == addr->sa_len 239 && bcmp(&rce->rce_addr, addr, addr->sa_len) == 0) { 240 break; 241 } 242 } 243 if (rce) { 244 rce->rce_repmsg = *repmsg; 245 rce->rce_repbody = m; 246 if (m) 247 rc->rc_size += m_length(m, NULL); 248 } 249 mtx_unlock(&rc->rc_lock); 250} 251