/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2005,2008 Oracle.  All rights reserved.
 *
 * $Id: env_failchk.c,v 12.44 2008/03/12 20:52:53 mbrey Exp $
 */

#include "db_config.h"

#include "db_int.h"
#ifndef HAVE_SIMPLE_THREAD_TYPE
#include "dbinc/db_page.h"
#include "dbinc/hash.h"			/* Needed for call to __ham_func5. */
#endif
#include "dbinc/lock.h"
#include "dbinc/log.h"
#include "dbinc/mp.h"
#include "dbinc/txn.h"

static int __env_in_api __P((ENV *));
static void __env_clear_state __P((ENV *));

/*
 * __env_failchk_pp --
 *	ENV->failchk pre/post processing.
 *
 * PUBLIC: int __env_failchk_pp __P((DB_ENV *, u_int32_t));
 */
int
__env_failchk_pp(dbenv, flags)
	DB_ENV *dbenv;
	u_int32_t flags;
{
	DB_THREAD_INFO *ip;
	ENV *env;
	int ret;

	env = dbenv->env;

	ENV_ILLEGAL_BEFORE_OPEN(env, "DB_ENV->failchk");

	/*
	 * ENV->failchk requires thread-id ("self") and is-alive functions.
	 * We have a default self function, but no default is-alive function.
	 */
	if (!ALIVE_ON(env)) {
		__db_errx(env,
	"DB_ENV->failchk requires DB_ENV->is_alive be configured");
		return (EINVAL);
	}

	if (flags != 0)
		return (__db_ferr(env, "DB_ENV->failchk", 0));

	ENV_ENTER(env, ip);

	/*
	 * Check for threads that died inside the API first, since a dead
	 * thread there is likely to hang the things we check later, such
	 * as locks and transactions.
	 */
	if ((ret = __env_in_api(env)) != 0)
		goto err;

	if (LOCKING_ON(env) && (ret = __lock_failchk(env)) != 0)
		goto err;

	if (TXN_ON(env) &&
	    ((ret = __txn_failchk(env)) != 0 ||
	    (ret = __dbreg_failchk(env)) != 0))
		goto err;

	/* Clear the state of any threads that died while blocked. */
	__env_clear_state(env);

#ifdef HAVE_MUTEX_SUPPORT
	ret = __mut_failchk(env);
#endif

err:	ENV_LEAVE(env, ip);
	return (ret);
}
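
/*
 * Example (editor's sketch, not part of this file): a minimal outline of how
 * an application might arrange to call DB_ENV->failchk.  The environment
 * needs a thread table (DB_ENV->set_thread_count) and an is-alive callback
 * (DB_ENV->set_isalive); my_is_alive is a hypothetical application function
 * (a sketch follows __env_in_api below), home is the environment directory,
 * and error handling is abbreviated.
 *
 *	DB_ENV *dbenv;
 *	int ret;
 *
 *	if ((ret = db_env_create(&dbenv, 0)) != 0)
 *		return (ret);
 *	(void)dbenv->set_thread_count(dbenv, 128);
 *	(void)dbenv->set_isalive(dbenv, my_is_alive);
 *	if ((ret = dbenv->open(dbenv, home, DB_CREATE | DB_INIT_LOCK |
 *	    DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD, 0)) != 0)
 *		return (ret);
 *
 *	After a thread or process of control exits unexpectedly, a surviving
 *	process calls:
 *
 *	ret = dbenv->failchk(dbenv, 0);
 *
 *	A non-zero return (for example DB_RUNRECOVERY) means the damage could
 *	not be cleaned up in place and the environment should be re-opened
 *	with DB_RECOVER.
 */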

/*
 * __env_thread_init --
 *	Initialize the thread control block table.
 *
 * PUBLIC: int __env_thread_init __P((ENV *, int));
 */
int
__env_thread_init(env, during_creation)
	ENV *env;
	int during_creation;
{
	DB_ENV *dbenv;
	DB_HASHTAB *htab;
	REGENV *renv;
	REGINFO *infop;
	THREAD_INFO *thread;
	int ret;

	dbenv = env->dbenv;
	infop = env->reginfo;
	renv = infop->primary;

	if (renv->thread_off == INVALID_ROFF) {
		if (dbenv->thr_max == 0) {
			env->thr_hashtab = NULL;
			if (ALIVE_ON(env)) {
				__db_errx(env,
		"is_alive method specified but no thread region allocated");
				return (EINVAL);
			}
			return (0);
		}

		if (!during_creation) {
			__db_errx(env,
    "thread table must be allocated when the database environment is created");
			return (EINVAL);
		}

		if ((ret =
		    __env_alloc(infop, sizeof(THREAD_INFO), &thread)) != 0) {
			__db_err(env, ret,
			     "unable to allocate a thread status block");
			return (ret);
		}
		memset(thread, 0, sizeof(*thread));
		renv->thread_off = R_OFFSET(infop, thread);
		/*
		 * Set the number of buckets to be 1/8th the number of
		 * thread control blocks.  This is rather arbitrary.
		 */
		thread->thr_nbucket = __db_tablesize(dbenv->thr_max / 8);
		if ((ret = __env_alloc(infop,
		     thread->thr_nbucket * sizeof(DB_HASHTAB), &htab)) != 0)
			return (ret);
		thread->thr_hashoff = R_OFFSET(infop, htab);
		__db_hashinit(htab, thread->thr_nbucket);
		thread->thr_max = dbenv->thr_max;
	} else {
		thread = R_ADDR(infop, renv->thread_off);
		htab = R_ADDR(infop, thread->thr_hashoff);
	}

	env->thr_hashtab = htab;
	env->thr_nbucket = thread->thr_nbucket;
	dbenv->thr_max = thread->thr_max;
	return (0);
}
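
/*
 * Example (editor's sketch, not part of this file): the thread table is only
 * allocated while the environment is being created, so the application must
 * call DB_ENV->set_thread_count before the DB_ENV->open that creates the
 * region.  With a count of 256 (an illustrative value, as is the home
 * variable), the table gets on the order of 256 / 8 = 32 hash buckets, as
 * adjusted by __db_tablesize above.
 *
 *	DB_ENV *dbenv;
 *
 *	(void)db_env_create(&dbenv, 0);
 *	(void)dbenv->set_thread_count(dbenv, 256);
 *	(void)dbenv->open(dbenv,
 *	    home, DB_CREATE | DB_INIT_MPOOL | DB_THREAD, 0);
 */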

/*
 * __env_thread_destroy --
 *	Destroy the thread control block table.
 *
 * PUBLIC: void __env_thread_destroy __P((ENV *));
 */
void
__env_thread_destroy(env)
	ENV *env;
{
	DB_HASHTAB *htab;
	DB_THREAD_INFO *ip, *np;
	REGENV *renv;
	REGINFO *infop;
	THREAD_INFO *thread;
	u_int32_t i;

	infop = env->reginfo;
	renv = infop->primary;
	if (renv->thread_off == INVALID_ROFF)
		return;

	thread = R_ADDR(infop, renv->thread_off);
	if ((htab = env->thr_hashtab) != NULL) {
		for (i = 0; i < env->thr_nbucket; i++) {
			ip = SH_TAILQ_FIRST(&htab[i], __db_thread_info);
			for (; ip != NULL; ip = np) {
				np = SH_TAILQ_NEXT(ip,
				    dbth_links, __db_thread_info);
				__env_alloc_free(infop, ip);
			}
		}
		__env_alloc_free(infop, htab);
	}

	__env_alloc_free(infop, thread);
	return;
}

/*
 * __env_in_api --
 *	Look for threads which died inside the API and complain.
 *	If no thread died in the API but some died while blocked,
 *	unpin any buffers the dead threads may have pinned.
 */
static int
__env_in_api(env)
	ENV *env;
{
	DB_ENV *dbenv;
	DB_HASHTAB *htab;
	DB_THREAD_INFO *ip;
	REGENV *renv;
	REGINFO *infop;
	THREAD_INFO *thread;
	u_int32_t i;
	int unpin, ret;

	if ((htab = env->thr_hashtab) == NULL)
		return (EINVAL);

	dbenv = env->dbenv;
	infop = env->reginfo;
	renv = infop->primary;
	thread = R_ADDR(infop, renv->thread_off);
	unpin = 0;

	for (i = 0; i < env->thr_nbucket; i++)
		SH_TAILQ_FOREACH(ip, &htab[i], dbth_links, __db_thread_info) {
			if (ip->dbth_state == THREAD_SLOT_NOT_IN_USE ||
			    (ip->dbth_state == THREAD_OUT &&
			    thread->thr_count < thread->thr_max))
				continue;
			if (dbenv->is_alive(
			    dbenv, ip->dbth_pid, ip->dbth_tid, 0))
				continue;
			if (ip->dbth_state == THREAD_BLOCKED) {
				ip->dbth_state = THREAD_BLOCKED_DEAD;
				unpin = 1;
				continue;
			}
			if (ip->dbth_state == THREAD_OUT) {
				ip->dbth_state = THREAD_SLOT_NOT_IN_USE;
				continue;
			}
			return (__db_failed(env,
			     "Thread died in Berkeley DB library",
			     ip->dbth_pid, ip->dbth_tid));
		}

	if (unpin == 0)
		return (0);

	for (i = 0; i < env->thr_nbucket; i++)
		SH_TAILQ_FOREACH(ip, &htab[i], dbth_links, __db_thread_info)
			if (ip->dbth_state == THREAD_BLOCKED_DEAD &&
			    (ret = __memp_unpin_buffers(env, ip)) != 0)
				return (ret);

	return (0);
}
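
/*
 * Example (editor's sketch, not part of this file): the is_alive callback
 * invoked above is supplied by the application via DB_ENV->set_isalive.  A
 * hypothetical process-level implementation might probe with kill(pid, 0),
 * which needs <signal.h> and <errno.h>; treating EPERM as "alive" covers the
 * case where the process exists but cannot be signalled.  Ignoring the thread
 * ID and flags is only reasonable when per-thread liveness cannot be
 * determined.
 *
 *	static int
 *	my_is_alive(dbenv, pid, tid, flags)
 *		DB_ENV *dbenv;
 *		pid_t pid;
 *		db_threadid_t tid;
 *		u_int32_t flags;
 *	{
 *		return (kill(pid, 0) == 0 || errno == EPERM);
 *	}
 */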

/*
 * __env_clear_state --
 *	Look for threads which died while blocked and clear their state.
 */
static void
__env_clear_state(env)
	ENV *env;
{
	DB_HASHTAB *htab;
	DB_THREAD_INFO *ip;
	u_int32_t i;

	htab = env->thr_hashtab;
	for (i = 0; i < env->thr_nbucket; i++)
		SH_TAILQ_FOREACH(ip, &htab[i], dbth_links, __db_thread_info)
			if (ip->dbth_state == THREAD_BLOCKED_DEAD)
				ip->dbth_state = THREAD_SLOT_NOT_IN_USE;
}

struct __db_threadid {
	pid_t pid;
	db_threadid_t tid;
};

/*
 * __env_set_state --
 *	Set the state of the current thread of control, allocating its
 *	thread control block if it does not yet have one.
 *
 * PUBLIC: int __env_set_state __P((ENV *, DB_THREAD_INFO **, DB_THREAD_STATE));
 */
int
__env_set_state(env, ipp, state)
	ENV *env;
	DB_THREAD_INFO **ipp;
	DB_THREAD_STATE state;
{
	struct __db_threadid id;
	DB_ENV *dbenv;
	DB_HASHTAB *htab;
	DB_THREAD_INFO *ip;
	REGENV *renv;
	REGINFO *infop;
	THREAD_INFO *thread;
	u_int32_t indx;
	int ret;

	dbenv = env->dbenv;
	htab = env->thr_hashtab;

	dbenv->thread_id(dbenv, &id.pid, &id.tid);

	/*
	 * Hashing of thread ids.  This is simple but could be replaced with
	 * something more expensive if needed.
	 */
#ifdef HAVE_SIMPLE_THREAD_TYPE
	/*
	 * A thread ID may be a pointer, so explicitly cast through
	 * uintptr_t, an integer wide enough to hold a pointer, before
	 * doing the bitwise XOR.
	 */
	indx = (u_int32_t)((uintptr_t)id.pid ^ (uintptr_t)id.tid);
#else
	indx = __ham_func5(NULL, &id.tid, sizeof(id.tid));
#endif
	indx %= env->thr_nbucket;
	SH_TAILQ_FOREACH(ip, &htab[indx], dbth_links, __db_thread_info) {
#ifdef HAVE_SIMPLE_THREAD_TYPE
		if (id.pid == ip->dbth_pid && id.tid == ip->dbth_tid)
			break;
#else
		if (memcmp(&id.pid, &ip->dbth_pid, sizeof(id.pid)) != 0)
			continue;
		if (memcmp(&id.tid, &ip->dbth_tid, sizeof(id.tid)) != 0)
			continue;
		break;
#endif
	}

#ifdef DIAGNOSTIC
	/* A check to ensure the thread of control has been registered. */
	if (state == THREAD_VERIFY) {
		DB_ASSERT(env, ip != NULL && ip->dbth_state != THREAD_OUT);
		return (0);
	}
#endif

	*ipp = NULL;
	ret = 0;
	if (ip == NULL) {
		infop = env->reginfo;
		renv = infop->primary;
		thread = R_ADDR(infop, renv->thread_off);
		MUTEX_LOCK(env, renv->mtx_regenv);

		/*
		 * If we are past the specified maximum, try to reclaim a
		 * slot from our queue.  If failchk has marked the slot not
		 * in use, we can take it; otherwise we must call is_alive
		 * before reusing it.
		 */
		if (thread->thr_count >= thread->thr_max) {
			SH_TAILQ_FOREACH(
			    ip, &htab[indx], dbth_links, __db_thread_info)
				if (ip->dbth_state == THREAD_SLOT_NOT_IN_USE ||
				    (ip->dbth_state == THREAD_OUT &&
				    ALIVE_ON(env) && !dbenv->is_alive(
				    dbenv, ip->dbth_pid, ip->dbth_tid, 0)))
					break;

			if (ip != NULL) {
				DB_ASSERT(env, ip->dbth_pincount == 0);
				goto init;
			}
		}

		thread->thr_count++;
		if ((ret = __env_alloc(infop,
		     sizeof(DB_THREAD_INFO), &ip)) == 0) {
			memset(ip, 0, sizeof(*ip));
			/*
			 * Other threads may walk this list without holding
			 * the region mutex, so the link must appear atomic
			 * to them.  We never use the backpointer, so we only
			 * need to be able to write an offset atomically.
			 */
			SH_TAILQ_INSERT_HEAD(
			    &htab[indx], ip, dbth_links, __db_thread_info);
			ip->dbth_pincount = 0;
			ip->dbth_pinmax = PINMAX;
			ip->dbth_pinlist = R_OFFSET(infop, ip->dbth_pinarray);

init:			ip->dbth_pid = id.pid;
			ip->dbth_tid = id.tid;
			ip->dbth_state = state;
		}
		MUTEX_UNLOCK(env, renv->mtx_regenv);
	} else
		ip->dbth_state = state;
	*ipp = ip;

	return (ret);
}
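
/*
 * Example (editor's sketch, not part of this file): the thread_id callback
 * used above has an internal default, but an application may replace it with
 * DB_ENV->set_thread_id.  The sketch assumes a pthreads build in which
 * db_threadid_t can hold a pthread_t; my_thread_id is a hypothetical name.
 *
 *	void
 *	my_thread_id(dbenv, pidp, tidp)
 *		DB_ENV *dbenv;
 *		pid_t *pidp;
 *		db_threadid_t *tidp;
 *	{
 *		*pidp = getpid();
 *		*tidp = pthread_self();
 *	}
 *
 *	(void)dbenv->set_thread_id(dbenv, my_thread_id);
 */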

/*
 * __env_thread_id_string --
 *	Convert a thread id to a string.
 *
 * PUBLIC: char *__env_thread_id_string
 * PUBLIC:     __P((DB_ENV *, pid_t, db_threadid_t, char *));
 */
char *
__env_thread_id_string(dbenv, pid, tid, buf)
	DB_ENV *dbenv;
	pid_t pid;
	db_threadid_t tid;
	char *buf;
{
#ifdef HAVE_SIMPLE_THREAD_TYPE
#ifdef UINT64_FMT
	char fmt[20];

	snprintf(fmt, sizeof(fmt), "%s/%s", UINT64_FMT, UINT64_FMT);
	snprintf(buf,
	    DB_THREADID_STRLEN, fmt, (u_int64_t)pid, (u_int64_t)(uintptr_t)tid);
#else
	snprintf(buf, DB_THREADID_STRLEN, "%lu/%lu", (u_long)pid, (u_long)tid);
#endif
#else
#ifdef UINT64_FMT
	char fmt[20];

	snprintf(fmt, sizeof(fmt), "%s/TID", UINT64_FMT);
	snprintf(buf, DB_THREADID_STRLEN, fmt, (u_int64_t)pid);
#else
	snprintf(buf, DB_THREADID_STRLEN, "%lu/TID", (u_long)pid);
#endif
#endif
	COMPQUIET(dbenv, NULL);
	COMPQUIET(*(u_int8_t *)&tid, 0);

	return (buf);
}
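
/*
 * Example (editor's sketch, not part of this file): applications may override
 * the formatting above with DB_ENV->set_thread_id_string.  The callback is
 * handed a buffer of at least DB_THREADID_STRLEN bytes and returns it; the
 * cast of tid below assumes an integral or pointer-sized thread type, and
 * my_thread_id_string is a hypothetical name.
 *
 *	char *
 *	my_thread_id_string(dbenv, pid, tid, buf)
 *		DB_ENV *dbenv;
 *		pid_t pid;
 *		db_threadid_t tid;
 *		char *buf;
 *	{
 *		snprintf(buf, DB_THREADID_STRLEN,
 *		    "pid %lu tid %lu", (u_long)pid, (u_long)(uintptr_t)tid);
 *		return (buf);
 *	}
 *
 *	(void)dbenv->set_thread_id_string(dbenv, my_thread_id_string);
 */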