1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2005,2008 Oracle. All rights reserved. 5 * 6 * $Id: env_failchk.c,v 12.44 2008/03/12 20:52:53 mbrey Exp $ 7 */ 8 9#include "db_config.h" 10 11#include "db_int.h" 12#ifndef HAVE_SIMPLE_THREAD_TYPE 13#include "dbinc/db_page.h" 14#include "dbinc/hash.h" /* Needed for call to __ham_func5. */ 15#endif 16#include "dbinc/lock.h" 17#include "dbinc/log.h" 18#include "dbinc/mp.h" 19#include "dbinc/txn.h" 20 21static int __env_in_api __P((ENV *)); 22static void __env_clear_state __P((ENV *)); 23 24/* 25 * __env_failchk_pp -- 26 * ENV->failchk pre/post processing. 27 * 28 * PUBLIC: int __env_failchk_pp __P((DB_ENV *, u_int32_t)); 29 */ 30int 31__env_failchk_pp(dbenv, flags) 32 DB_ENV *dbenv; 33 u_int32_t flags; 34{ 35 DB_THREAD_INFO *ip; 36 ENV *env; 37 int ret; 38 39 env = dbenv->env; 40 41 ENV_ILLEGAL_BEFORE_OPEN(env, "DB_ENV->failchk"); 42 43 /* 44 * ENV->failchk requires self and is-alive functions. We 45 * have a default self function, but no is-alive function. 46 */ 47 if (!ALIVE_ON(env)) { 48 __db_errx(env, 49 "DB_ENV->failchk requires DB_ENV->is_alive be configured"); 50 return (EINVAL); 51 } 52 53 if (flags != 0) 54 return (__db_ferr(env, "DB_ENV->failchk", 0)); 55 56 ENV_ENTER(env, ip); 57 58 /* 59 * We check for dead threads in the API first as this would be likely 60 * to hang other things we try later, like locks and transactions. 61 */ 62 if ((ret = __env_in_api(env)) != 0) 63 goto err; 64 65 if (LOCKING_ON(env) && (ret = __lock_failchk(env)) != 0) 66 goto err; 67 68 if (TXN_ON(env) && 69 ((ret = __txn_failchk(env)) != 0 || 70 (ret = __dbreg_failchk(env)) != 0)) 71 goto err; 72 73 /* Mark any dead blocked threads as dead. */ 74 __env_clear_state(env); 75 76#ifdef HAVE_MUTEX_SUPPORT 77 ret = __mut_failchk(env); 78#endif 79 80err: ENV_LEAVE(env, ip); 81 return (ret); 82} 83 84/* 85 * __env_thread_init -- 86 * Initialize the thread control block table. 87 * 88 * PUBLIC: int __env_thread_init __P((ENV *, int)); 89 */ 90int 91__env_thread_init(env, during_creation) 92 ENV *env; 93 int during_creation; 94{ 95 DB_ENV *dbenv; 96 DB_HASHTAB *htab; 97 REGENV *renv; 98 REGINFO *infop; 99 THREAD_INFO *thread; 100 int ret; 101 102 dbenv = env->dbenv; 103 infop = env->reginfo; 104 renv = infop->primary; 105 106 if (renv->thread_off == INVALID_ROFF) { 107 if (dbenv->thr_max == 0) { 108 env->thr_hashtab = NULL; 109 if (ALIVE_ON(env)) { 110 __db_errx(env, 111 "is_alive method specified but no thread region allocated"); 112 return (EINVAL); 113 } 114 return (0); 115 } 116 117 if (!during_creation) { 118 __db_errx(env, 119 "thread table must be allocated when the database environment is created"); 120 return (EINVAL); 121 } 122 123 if ((ret = 124 __env_alloc(infop, sizeof(THREAD_INFO), &thread)) != 0) { 125 __db_err(env, ret, 126 "unable to allocate a thread status block"); 127 return (ret); 128 } 129 memset(thread, 0, sizeof(*thread)); 130 renv->thread_off = R_OFFSET(infop, thread); 131 /* 132 * Set the number of buckets to be 1/8th the number of 133 * thread control blocks. This is rather arbitrary. 134 */ 135 thread->thr_nbucket = __db_tablesize(dbenv->thr_max / 8); 136 if ((ret = __env_alloc(infop, 137 thread->thr_nbucket * sizeof(DB_HASHTAB), &htab)) != 0) 138 return (ret); 139 thread->thr_hashoff = R_OFFSET(infop, htab); 140 __db_hashinit(htab, thread->thr_nbucket); 141 thread->thr_max = dbenv->thr_max; 142 } else { 143 thread = R_ADDR(infop, renv->thread_off); 144 htab = R_ADDR(infop, thread->thr_hashoff); 145 } 146 147 env->thr_hashtab = htab; 148 env->thr_nbucket = thread->thr_nbucket; 149 dbenv->thr_max = thread->thr_max; 150 return (0); 151} 152 153/* 154 * __env_thread_destroy -- 155 * Destroy the thread control block table. 156 * 157 * PUBLIC: void __env_thread_destroy __P((ENV *)); 158 */ 159void 160__env_thread_destroy(env) 161 ENV *env; 162{ 163 DB_HASHTAB *htab; 164 DB_THREAD_INFO *ip, *np; 165 REGENV *renv; 166 REGINFO *infop; 167 THREAD_INFO *thread; 168 u_int32_t i; 169 170 infop = env->reginfo; 171 renv = infop->primary; 172 if (renv->thread_off == INVALID_ROFF) 173 return; 174 175 thread = R_ADDR(infop, renv->thread_off); 176 if ((htab = env->thr_hashtab) != NULL) { 177 for (i = 0; i < env->thr_nbucket; i++) { 178 ip = SH_TAILQ_FIRST(&htab[i], __db_thread_info); 179 for (; ip != NULL; ip = np) { 180 np = SH_TAILQ_NEXT(ip, 181 dbth_links, __db_thread_info); 182 __env_alloc_free(infop, ip); 183 } 184 } 185 __env_alloc_free(infop, htab); 186 } 187 188 __env_alloc_free(infop, thread); 189 return; 190} 191 192/* 193 * __env_in_api -- 194 * Look for threads which died in the api and complain. 195 * If no threads died but there are blocked threads unpin 196 * any buffers they may have locked. 197 */ 198static int 199__env_in_api(env) 200 ENV *env; 201{ 202 DB_ENV *dbenv; 203 DB_HASHTAB *htab; 204 DB_THREAD_INFO *ip; 205 REGENV *renv; 206 REGINFO *infop; 207 THREAD_INFO *thread; 208 u_int32_t i; 209 int unpin, ret; 210 211 if ((htab = env->thr_hashtab) == NULL) 212 return (EINVAL); 213 214 dbenv = env->dbenv; 215 infop = env->reginfo; 216 renv = infop->primary; 217 thread = R_ADDR(infop, renv->thread_off); 218 unpin = 0; 219 220 for (i = 0; i < env->thr_nbucket; i++) 221 SH_TAILQ_FOREACH(ip, &htab[i], dbth_links, __db_thread_info) { 222 if (ip->dbth_state == THREAD_SLOT_NOT_IN_USE || 223 (ip->dbth_state == THREAD_OUT && 224 thread->thr_count < thread->thr_max)) 225 continue; 226 if (dbenv->is_alive( 227 dbenv, ip->dbth_pid, ip->dbth_tid, 0)) 228 continue; 229 if (ip->dbth_state == THREAD_BLOCKED) { 230 ip->dbth_state = THREAD_BLOCKED_DEAD; 231 unpin = 1; 232 continue; 233 } 234 if (ip->dbth_state == THREAD_OUT) { 235 ip->dbth_state = THREAD_SLOT_NOT_IN_USE; 236 continue; 237 } 238 return (__db_failed(env, 239 "Thread died in Berkeley DB library", 240 ip->dbth_pid, ip->dbth_tid)); 241 } 242 243 if (unpin == 0) 244 return (0); 245 246 for (i = 0; i < env->thr_nbucket; i++) 247 SH_TAILQ_FOREACH(ip, &htab[i], dbth_links, __db_thread_info) 248 if (ip->dbth_state == THREAD_BLOCKED_DEAD && 249 (ret = __memp_unpin_buffers(env, ip)) != 0) 250 return (ret); 251 252 return (0); 253} 254 255/* 256 * __env_clear_state -- 257 * Look for threads which died while blockedi and clear them.. 258 */ 259static void 260__env_clear_state(env) 261 ENV *env; 262{ 263 DB_HASHTAB *htab; 264 DB_THREAD_INFO *ip; 265 u_int32_t i; 266 267 htab = env->thr_hashtab; 268 for (i = 0; i < env->thr_nbucket; i++) 269 SH_TAILQ_FOREACH(ip, &htab[i], dbth_links, __db_thread_info) 270 if (ip->dbth_state == THREAD_BLOCKED_DEAD) 271 ip->dbth_state = THREAD_SLOT_NOT_IN_USE; 272} 273 274struct __db_threadid { 275 pid_t pid; 276 db_threadid_t tid; 277}; 278 279/* 280 * PUBLIC: int __env_set_state __P((ENV *, DB_THREAD_INFO **, DB_THREAD_STATE)); 281 */ 282int 283__env_set_state(env, ipp, state) 284 ENV *env; 285 DB_THREAD_INFO **ipp; 286 DB_THREAD_STATE state; 287{ 288 struct __db_threadid id; 289 DB_ENV *dbenv; 290 DB_HASHTAB *htab; 291 DB_THREAD_INFO *ip; 292 REGENV *renv; 293 REGINFO *infop; 294 THREAD_INFO *thread; 295 u_int32_t indx; 296 int ret; 297 298 dbenv = env->dbenv; 299 htab = env->thr_hashtab; 300 301 dbenv->thread_id(dbenv, &id.pid, &id.tid); 302 303 /* 304 * Hashing of thread ids. This is simple but could be replaced with 305 * something more expensive if needed. 306 */ 307#ifdef HAVE_SIMPLE_THREAD_TYPE 308 /* 309 * A thread ID may be a pointer, so explicitly cast to a pointer of 310 * the appropriate size before doing the bitwise XOR. 311 */ 312 indx = (u_int32_t)((uintptr_t)id.pid ^ (uintptr_t)id.tid); 313#else 314 indx = __ham_func5(NULL, &id.tid, sizeof(id.tid)); 315#endif 316 indx %= env->thr_nbucket; 317 SH_TAILQ_FOREACH(ip, &htab[indx], dbth_links, __db_thread_info) { 318#ifdef HAVE_SIMPLE_THREAD_TYPE 319 if (id.pid == ip->dbth_pid && id.tid == ip->dbth_tid) 320 break; 321#else 322 if (memcmp(&id.pid, &ip->dbth_pid, sizeof(id.pid)) != 0) 323 continue; 324 if (memcmp(&id.tid, &ip->dbth_tid, sizeof(id.tid)) != 0) 325 continue; 326 break; 327#endif 328 } 329 330#ifdef DIAGNOSTIC 331 /* A check to ensure the thread of control has been registered. */ 332 if (state == THREAD_VERIFY) { 333 DB_ASSERT(env, ip != NULL && ip->dbth_state != THREAD_OUT); 334 return (0); 335 } 336#endif 337 338 *ipp = NULL; 339 ret = 0; 340 if (ip == NULL) { 341 infop = env->reginfo; 342 renv = infop->primary; 343 thread = R_ADDR(infop, renv->thread_off); 344 MUTEX_LOCK(env, renv->mtx_regenv); 345 346 /* 347 * If we are passed the specified max, try to reclaim one from 348 * our queue. If failcheck has marked the slot not in use, we 349 * can take it, otherwise we must call is_alive before freeing 350 * it. 351 */ 352 if (thread->thr_count >= thread->thr_max) { 353 SH_TAILQ_FOREACH( 354 ip, &htab[indx], dbth_links, __db_thread_info) 355 if (ip->dbth_state == THREAD_SLOT_NOT_IN_USE || 356 (ip->dbth_state == THREAD_OUT && 357 ALIVE_ON(env) && !dbenv->is_alive( 358 dbenv, ip->dbth_pid, ip->dbth_tid, 0))) 359 break; 360 361 if (ip != NULL) { 362 DB_ASSERT(env, ip->dbth_pincount == 0); 363 goto init; 364 } 365 } 366 367 thread->thr_count++; 368 if ((ret = __env_alloc(infop, 369 sizeof(DB_THREAD_INFO), &ip)) == 0) { 370 memset(ip, 0, sizeof(*ip)); 371 /* 372 * This assumes we can link atomically since we do 373 * no locking here. We never use the backpointer 374 * so we only need to be able to write an offset 375 * atomically. 376 */ 377 SH_TAILQ_INSERT_HEAD( 378 &htab[indx], ip, dbth_links, __db_thread_info); 379 ip->dbth_pincount = 0; 380 ip->dbth_pinmax = PINMAX; 381 ip->dbth_pinlist = R_OFFSET(infop, ip->dbth_pinarray); 382 383init: ip->dbth_pid = id.pid; 384 ip->dbth_tid = id.tid; 385 ip->dbth_state = state; 386 } 387 MUTEX_UNLOCK(env, renv->mtx_regenv); 388 } else 389 ip->dbth_state = state; 390 *ipp = ip; 391 392 return (ret); 393} 394 395/* 396 * __env_thread_id_string -- 397 * Convert a thread id to a string. 398 * 399 * PUBLIC: char *__env_thread_id_string 400 * PUBLIC: __P((DB_ENV *, pid_t, db_threadid_t, char *)); 401 */ 402char * 403__env_thread_id_string(dbenv, pid, tid, buf) 404 DB_ENV *dbenv; 405 pid_t pid; 406 db_threadid_t tid; 407 char *buf; 408{ 409#ifdef HAVE_SIMPLE_THREAD_TYPE 410#ifdef UINT64_FMT 411 char fmt[20]; 412 413 snprintf(fmt, sizeof(fmt), "%s/%s", UINT64_FMT, UINT64_FMT); 414 snprintf(buf, 415 DB_THREADID_STRLEN, fmt, (u_int64_t)pid, (u_int64_t)(uintptr_t)tid); 416#else 417 snprintf(buf, DB_THREADID_STRLEN, "%lu/%lu", (u_long)pid, (u_long)tid); 418#endif 419#else 420#ifdef UINT64_FMT 421 char fmt[20]; 422 423 snprintf(fmt, sizeof(fmt), "%s/TID", UINT64_FMT); 424 snprintf(buf, DB_THREADID_STRLEN, fmt, (u_int64_t)pid); 425#else 426 snprintf(buf, DB_THREADID_STRLEN, "%lu/TID", (u_long)pid); 427#endif 428#endif 429 COMPQUIET(dbenv, NULL); 430 COMPQUIET(*(u_int8_t *)&tid, 0); 431 432 return (buf); 433} 434