dict_lmdb.c revision 1.1
1/*	$NetBSD: dict_lmdb.c,v 1.1 2014/07/06 19:27:58 tron Exp $	*/
2
3/*++
4/* NAME
5/*	dict_lmdb 3
6/* SUMMARY
7/*	dictionary manager interface to OpenLDAP LMDB files
8/* SYNOPSIS
9/*	#include <dict_lmdb.h>
10/*
11/*	size_t	dict_lmdb_map_size;
12/*
13/*	DICT	*dict_lmdb_open(path, open_flags, dict_flags)
14/*	const char *name;
15/*	const char *path;
16/*	int	open_flags;
17/*	int	dict_flags;
18/* DESCRIPTION
19/*	dict_lmdb_open() opens the named LMDB database and makes
20/*	it available via the generic interface described in
21/*	dict_open(3).
22/*
23/*	The dict_lmdb_map_size variable specifies the initial
24/*	database memory map size.  When a map becomes full its size
25/*	is doubled, and other programs pick up the size change.
26/* DIAGNOSTICS
27/*	Fatal errors: cannot open file, file write error, out of
28/*	memory.
29/* BUGS
30/*	The on-the-fly map resize operations require no concurrent
31/*	activity in the same database by other threads in the same
32/*	memory address space.
33/* SEE ALSO
34/*	dict(3) generic dictionary manager
35/* LICENSE
36/* .ad
37/* .fi
38/*	The Secure Mailer license must be distributed with this software.
39/* AUTHOR(S)
40/*	Howard Chu
41/*	Symas Corporation
42/*
43/*	Wietse Venema
44/*	IBM T.J. Watson Research
45/*	P.O. Box 704
46/*	Yorktown Heights, NY 10598, USA
47/*--*/
48
49#include <sys_defs.h>
50
51#ifdef HAS_LMDB
52
53/* System library. */
54
55#include <sys/stat.h>
56#include <string.h>
57#include <unistd.h>
58#include <limits.h>
59
60/* Utility library. */
61
62#include <msg.h>
63#include <mymalloc.h>
64#include <htable.h>
65#include <iostuff.h>
66#include <vstring.h>
67#include <myflock.h>
68#include <stringops.h>
69#include <slmdb.h>
70#include <dict.h>
71#include <dict_lmdb.h>
72#include <warn_stat.h>
73
74/* Application-specific. */
75
76typedef struct {
77    DICT    dict;			/* generic members */
78    SLMDB   slmdb;			/* sane LMDB API */
79    VSTRING *key_buf;			/* key buffer */
80    VSTRING *val_buf;			/* value buffer */
81} DICT_LMDB;
82
83 /*
84  * The LMDB database filename suffix happens to equal our DICT_TYPE_LMDB
85  * prefix, but that doesn't mean it is kosher to use DICT_TYPE_LMDB where a
86  * suffix is needed, so we define an explicit suffix here.
87  */
88#define DICT_LMDB_SUFFIX	"lmdb"
89
90 /*
91  * Make a safe string copy that is guaranteed to be null-terminated.
92  */
93#define SCOPY(buf, data, size) \
94    vstring_str(vstring_strncpy(buf ? buf : (buf = vstring_alloc(10)), data, size))
95
96 /*
97  * Postfix writers recover from a "map full" error by increasing the memory
98  * map size with a factor DICT_LMDB_SIZE_INCR (up to some limit) and
99  * retrying the transaction.
100  *
101  * Each dict(3) API call is retried no more than a few times. For bulk-mode
102  * transactions the number of retries is proportional to the size of the
103  * address space.
104  *
105  * We do not expose these details to the Postfix user interface. The purpose of
106  * Postfix is to solve problems, not punt them to the user.
107  */
108#ifndef SSIZE_T_MAX			/* The maximum map size */
109#define SSIZE_T_MAX __MAXINT__(ssize_t)	/* XXX Assumes two's complement */
110#endif
111
112#define DICT_LMDB_SIZE_INCR	2	/* Increase size by 1 bit on retry */
113#define DICT_LMDB_SIZE_MAX	SSIZE_T_MAX
114
115#define DICT_LMDB_API_RETRY_LIMIT 2	/* Retries per dict(3) API call */
116#define DICT_LMDB_BULK_RETRY_LIMIT \
117	((int) (2 * sizeof(size_t) * CHAR_BIT))	/* Retries per bulk-mode
118						 * transaction */
119
120size_t  dict_lmdb_map_size = 8192;	/* Minimum size without SIGSEGV */
121
122/* #define msg_verbose 1 */
123
124/* dict_lmdb_lookup - find database entry */
125
126static const char *dict_lmdb_lookup(DICT *dict, const char *name)
127{
128    DICT_LMDB *dict_lmdb = (DICT_LMDB *) dict;
129    MDB_val mdb_key;
130    MDB_val mdb_value;
131    const char *result = 0;
132    int     status, klen;
133
134    dict->error = 0;
135    klen = strlen(name);
136
137    /*
138     * Sanity check.
139     */
140    if ((dict->flags & (DICT_FLAG_TRY1NULL | DICT_FLAG_TRY0NULL)) == 0)
141	msg_panic("dict_lmdb_lookup: no DICT_FLAG_TRY1NULL | DICT_FLAG_TRY0NULL flag");
142
143    /*
144     * Optionally fold the key.
145     */
146    if (dict->flags & DICT_FLAG_FOLD_FIX) {
147	if (dict->fold_buf == 0)
148	    dict->fold_buf = vstring_alloc(10);
149	vstring_strcpy(dict->fold_buf, name);
150	name = lowercase(vstring_str(dict->fold_buf));
151    }
152
153    /*
154     * Acquire a shared lock.
155     */
156    if ((dict->flags & DICT_FLAG_LOCK)
157      && myflock(dict->lock_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_SHARED) < 0)
158	msg_fatal("%s: lock dictionary: %m", dict->name);
159
160    /*
161     * See if this LMDB file was written with one null byte appended to key
162     * and value.
163     */
164    if (dict->flags & DICT_FLAG_TRY1NULL) {
165	mdb_key.mv_data = (void *) name;
166	mdb_key.mv_size = klen + 1;
167	status = slmdb_get(&dict_lmdb->slmdb, &mdb_key, &mdb_value);
168	if (status == 0) {
169	    dict->flags &= ~DICT_FLAG_TRY0NULL;
170	    result = SCOPY(dict_lmdb->val_buf, mdb_value.mv_data,
171			   mdb_value.mv_size);
172	} else if (status != MDB_NOTFOUND) {
173	    msg_fatal("error reading %s:%s: %s",
174		      dict_lmdb->dict.type, dict_lmdb->dict.name,
175		      mdb_strerror(status));
176	}
177    }
178
179    /*
180     * See if this LMDB file was written with no null byte appended to key
181     * and value.
182     */
183    if (result == 0 && (dict->flags & DICT_FLAG_TRY0NULL)) {
184	mdb_key.mv_data = (void *) name;
185	mdb_key.mv_size = klen;
186	status = slmdb_get(&dict_lmdb->slmdb, &mdb_key, &mdb_value);
187	if (status == 0) {
188	    dict->flags &= ~DICT_FLAG_TRY1NULL;
189	    result = SCOPY(dict_lmdb->val_buf, mdb_value.mv_data,
190			   mdb_value.mv_size);
191	} else if (status != MDB_NOTFOUND) {
192	    msg_fatal("error reading %s:%s: %s",
193		      dict_lmdb->dict.type, dict_lmdb->dict.name,
194		      mdb_strerror(status));
195	}
196    }
197
198    /*
199     * Release the shared lock.
200     */
201    if ((dict->flags & DICT_FLAG_LOCK)
202	&& myflock(dict->lock_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_NONE) < 0)
203	msg_fatal("%s: unlock dictionary: %m", dict->name);
204
205    return (result);
206}
207
208/* dict_lmdb_update - add or update database entry */
209
210static int dict_lmdb_update(DICT *dict, const char *name, const char *value)
211{
212    DICT_LMDB *dict_lmdb = (DICT_LMDB *) dict;
213    MDB_val mdb_key;
214    MDB_val mdb_value;
215    int     status;
216
217    dict->error = 0;
218
219    /*
220     * Sanity check.
221     */
222    if ((dict->flags & (DICT_FLAG_TRY1NULL | DICT_FLAG_TRY0NULL)) == 0)
223	msg_panic("dict_lmdb_update: no DICT_FLAG_TRY1NULL | DICT_FLAG_TRY0NULL flag");
224
225    /*
226     * Optionally fold the key.
227     */
228    if (dict->flags & DICT_FLAG_FOLD_FIX) {
229	if (dict->fold_buf == 0)
230	    dict->fold_buf = vstring_alloc(10);
231	vstring_strcpy(dict->fold_buf, name);
232	name = lowercase(vstring_str(dict->fold_buf));
233    }
234    mdb_key.mv_data = (void *) name;
235
236    mdb_value.mv_data = (void *) value;
237    mdb_key.mv_size = strlen(name);
238    mdb_value.mv_size = strlen(value);
239
240    /*
241     * If undecided about appending a null byte to key and value, choose a
242     * default depending on the platform.
243     */
244    if ((dict->flags & DICT_FLAG_TRY1NULL)
245	&& (dict->flags & DICT_FLAG_TRY0NULL)) {
246#ifdef LMDB_NO_TRAILING_NULL
247	dict->flags &= ~DICT_FLAG_TRY1NULL;
248#else
249	dict->flags &= ~DICT_FLAG_TRY0NULL;
250#endif
251    }
252
253    /*
254     * Optionally append a null byte to key and value.
255     */
256    if (dict->flags & DICT_FLAG_TRY1NULL) {
257	mdb_key.mv_size++;
258	mdb_value.mv_size++;
259    }
260
261    /*
262     * Acquire an exclusive lock.
263     */
264    if ((dict->flags & DICT_FLAG_LOCK)
265    && myflock(dict->lock_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_EXCLUSIVE) < 0)
266	msg_fatal("%s: lock dictionary: %m", dict->name);
267
268    /*
269     * Do the update.
270     */
271    status = slmdb_put(&dict_lmdb->slmdb, &mdb_key, &mdb_value,
272	       (dict->flags & DICT_FLAG_DUP_REPLACE) ? 0 : MDB_NOOVERWRITE);
273    if (status != 0) {
274	if (status == MDB_KEYEXIST) {
275	    if (dict->flags & DICT_FLAG_DUP_IGNORE)
276		 /* void */ ;
277	    else if (dict->flags & DICT_FLAG_DUP_WARN)
278		msg_warn("%s:%s: duplicate entry: \"%s\"",
279			 dict_lmdb->dict.type, dict_lmdb->dict.name, name);
280	    else
281		msg_fatal("%s:%s: duplicate entry: \"%s\"",
282			  dict_lmdb->dict.type, dict_lmdb->dict.name, name);
283	} else {
284	    msg_fatal("error updating %s:%s: %s",
285		      dict_lmdb->dict.type, dict_lmdb->dict.name,
286		      mdb_strerror(status));
287	}
288    }
289
290    /*
291     * Release the exclusive lock.
292     */
293    if ((dict->flags & DICT_FLAG_LOCK)
294	&& myflock(dict->lock_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_NONE) < 0)
295	msg_fatal("%s: unlock dictionary: %m", dict->name);
296
297    return (status);
298}
299
300/* dict_lmdb_delete - delete one entry from the dictionary */
301
302static int dict_lmdb_delete(DICT *dict, const char *name)
303{
304    DICT_LMDB *dict_lmdb = (DICT_LMDB *) dict;
305    MDB_val mdb_key;
306    int     status = 1, klen;
307
308    dict->error = 0;
309    klen = strlen(name);
310
311    /*
312     * Sanity check.
313     */
314    if ((dict->flags & (DICT_FLAG_TRY1NULL | DICT_FLAG_TRY0NULL)) == 0)
315	msg_panic("dict_lmdb_delete: no DICT_FLAG_TRY1NULL | DICT_FLAG_TRY0NULL flag");
316
317    /*
318     * Optionally fold the key.
319     */
320    if (dict->flags & DICT_FLAG_FOLD_FIX) {
321	if (dict->fold_buf == 0)
322	    dict->fold_buf = vstring_alloc(10);
323	vstring_strcpy(dict->fold_buf, name);
324	name = lowercase(vstring_str(dict->fold_buf));
325    }
326
327    /*
328     * Acquire an exclusive lock.
329     */
330    if ((dict->flags & DICT_FLAG_LOCK)
331    && myflock(dict->lock_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_EXCLUSIVE) < 0)
332	msg_fatal("%s: lock dictionary: %m", dict->name);
333
334    /*
335     * See if this LMDB file was written with one null byte appended to key
336     * and value.
337     */
338    if (dict->flags & DICT_FLAG_TRY1NULL) {
339	mdb_key.mv_data = (void *) name;
340	mdb_key.mv_size = klen + 1;
341	status = slmdb_del(&dict_lmdb->slmdb, &mdb_key);
342	if (status != 0) {
343	    if (status == MDB_NOTFOUND)
344		status = 1;
345	    else
346		msg_fatal("error deleting from %s:%s: %s",
347			  dict_lmdb->dict.type, dict_lmdb->dict.name,
348			  mdb_strerror(status));
349	} else {
350	    dict->flags &= ~DICT_FLAG_TRY0NULL;	/* found */
351	}
352    }
353
354    /*
355     * See if this LMDB file was written with no null byte appended to key
356     * and value.
357     */
358    if (status > 0 && (dict->flags & DICT_FLAG_TRY0NULL)) {
359	mdb_key.mv_data = (void *) name;
360	mdb_key.mv_size = klen;
361	status = slmdb_del(&dict_lmdb->slmdb, &mdb_key);
362	if (status != 0) {
363	    if (status == MDB_NOTFOUND)
364		status = 1;
365	    else
366		msg_fatal("error deleting from %s:%s: %s",
367			  dict_lmdb->dict.type, dict_lmdb->dict.name,
368			  mdb_strerror(status));
369	} else {
370	    dict->flags &= ~DICT_FLAG_TRY1NULL;	/* found */
371	}
372    }
373
374    /*
375     * Release the exclusive lock.
376     */
377    if ((dict->flags & DICT_FLAG_LOCK)
378	&& myflock(dict->lock_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_NONE) < 0)
379	msg_fatal("%s: unlock dictionary: %m", dict->name);
380
381    return (status);
382}
383
384/* dict_lmdb_sequence - traverse the dictionary */
385
386static int dict_lmdb_sequence(DICT *dict, int function,
387			              const char **key, const char **value)
388{
389    const char *myname = "dict_lmdb_sequence";
390    DICT_LMDB *dict_lmdb = (DICT_LMDB *) dict;
391    MDB_val mdb_key;
392    MDB_val mdb_value;
393    MDB_cursor_op op;
394    int     status;
395
396    dict->error = 0;
397
398    /*
399     * Determine the seek function.
400     */
401    switch (function) {
402    case DICT_SEQ_FUN_FIRST:
403	op = MDB_FIRST;
404	break;
405    case DICT_SEQ_FUN_NEXT:
406	op = MDB_NEXT;
407	break;
408    default:
409	msg_panic("%s: invalid function: %d", myname, function);
410    }
411
412    /*
413     * Acquire a shared lock.
414     */
415    if ((dict->flags & DICT_FLAG_LOCK)
416      && myflock(dict->lock_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_SHARED) < 0)
417	msg_fatal("%s: lock dictionary: %m", dict->name);
418
419    /*
420     * Database lookup.
421     */
422    status = slmdb_cursor_get(&dict_lmdb->slmdb, &mdb_key, &mdb_value, op);
423
424    switch (status) {
425
426	/*
427	 * Copy the key and value so they are guaranteed null terminated.
428	 */
429    case 0:
430	*key = SCOPY(dict_lmdb->key_buf, mdb_key.mv_data, mdb_key.mv_size);
431	if (mdb_value.mv_data != 0 && mdb_value.mv_size > 0)
432	    *value = SCOPY(dict_lmdb->val_buf, mdb_value.mv_data,
433			   mdb_value.mv_size);
434	break;
435
436	/*
437	 * End-of-database.
438	 */
439    case MDB_NOTFOUND:
440	status = 1;
441	/* Not: mdb_cursor_close(). Wrong abstraction level. */
442	break;
443
444	/*
445	 * Bust.
446	 */
447    default:
448	msg_fatal("error seeking %s:%s: %s",
449		  dict_lmdb->dict.type, dict_lmdb->dict.name,
450		  mdb_strerror(status));
451    }
452
453    /*
454     * Release the shared lock.
455     */
456    if ((dict->flags & DICT_FLAG_LOCK)
457	&& myflock(dict->lock_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_NONE) < 0)
458	msg_fatal("%s: unlock dictionary: %m", dict->name);
459
460    return (status);
461}
462
463/* dict_lmdb_close - disassociate from data base */
464
465static void dict_lmdb_close(DICT *dict)
466{
467    DICT_LMDB *dict_lmdb = (DICT_LMDB *) dict;
468
469    slmdb_close(&dict_lmdb->slmdb);
470    if (dict_lmdb->key_buf)
471	vstring_free(dict_lmdb->key_buf);
472    if (dict_lmdb->val_buf)
473	vstring_free(dict_lmdb->val_buf);
474    if (dict->fold_buf)
475	vstring_free(dict->fold_buf);
476    dict_free(dict);
477}
478
479/* dict_lmdb_longjmp - repeat bulk transaction */
480
481static void dict_lmdb_longjmp(void *context, int val)
482{
483    DICT_LMDB *dict_lmdb = (DICT_LMDB *) context;
484
485    dict_longjmp(&dict_lmdb->dict, val);
486}
487
488/* dict_lmdb_notify - debug logging */
489
490static void dict_lmdb_notify(void *context, int error_code,...)
491{
492    DICT_LMDB *dict_lmdb = (DICT_LMDB *) context;
493    va_list ap;
494
495    va_start(ap, error_code);
496    switch (error_code) {
497    case MDB_SUCCESS:
498	msg_info("database %s:%s: using size limit %lu during open",
499		 dict_lmdb->dict.type, dict_lmdb->dict.name,
500		 (unsigned long) va_arg(ap, size_t));
501	break;
502    case MDB_MAP_FULL:
503	msg_info("database %s:%s: using size limit %lu after MDB_MAP_FULL",
504		 dict_lmdb->dict.type, dict_lmdb->dict.name,
505		 (unsigned long) va_arg(ap, size_t));
506	break;
507    case MDB_MAP_RESIZED:
508	msg_info("database %s:%s: using size limit %lu after MDB_MAP_RESIZED",
509		 dict_lmdb->dict.type, dict_lmdb->dict.name,
510		 (unsigned long) va_arg(ap, size_t));
511	break;
512    case MDB_READERS_FULL:
513	msg_info("database %s:%s: pausing after MDB_READERS_FULL",
514		 dict_lmdb->dict.type, dict_lmdb->dict.name);
515	break;
516    default:
517	msg_warn("unknown MDB error code: %d", error_code);
518	break;
519    }
520    va_end(ap);
521}
522
523/* dict_lmdb_assert - report LMDB internal assertion failure */
524
525static void dict_lmdb_assert(void *context, const char *text)
526{
527    DICT_LMDB *dict_lmdb = (DICT_LMDB *) context;
528
529    msg_fatal("%s:%s: internal error: %s",
530	      dict_lmdb->dict.type, dict_lmdb->dict.name, text);
531}
532
533/* dict_lmdb_open - open LMDB data base */
534
535DICT   *dict_lmdb_open(const char *path, int open_flags, int dict_flags)
536{
537    DICT_LMDB *dict_lmdb;
538    DICT   *dict;
539    struct stat st;
540    SLMDB   slmdb;
541    char   *mdb_path;
542    int     mdb_flags, slmdb_flags, status;
543    int     db_fd;
544
545    /*
546     * Let the optimizer worry about eliminating redundant code.
547     */
548#define DICT_LMDB_OPEN_RETURN(d) { \
549	DICT *__d = (d); \
550	myfree(mdb_path); \
551	return (__d); \
552    } while (0)
553
554    mdb_path = concatenate(path, "." DICT_TYPE_LMDB, (char *) 0);
555
556    /*
557     * Impedance adapters.
558     */
559    mdb_flags = MDB_NOSUBDIR | MDB_NOLOCK;
560    if (open_flags == O_RDONLY)
561	mdb_flags |= MDB_RDONLY;
562
563    slmdb_flags = 0;
564    if (dict_flags & DICT_FLAG_BULK_UPDATE)
565	slmdb_flags |= SLMDB_FLAG_BULK;
566
567    /*
568     * Security violation.
569     *
570     * By default, LMDB 0.9.9 writes uninitialized heap memory to a
571     * world-readable database file, as chunks of up to 4096 bytes. This is a
572     * huge memory disclosure vulnerability: memory content that a program
573     * does not intend to share ends up in a world-readable file. The content
574     * of uninitialized heap memory depends on program execution history.
575     * That history includes code execution in other libraries that are
576     * linked into the program.
577     *
578     * This is a problem whenever the user who writes the database file differs
579     * from the user who reads the database file. For example, a privileged
580     * writer and an unprivileged reader. In the case of Postfix, the
581     * postmap(1) and postalias(1) commands would leak uninitialized heap
582     * memory, as chunks of up to 4096 bytes, from a root-privileged process
583     * that writes to a database file, to unprivileged processes that read
584     * from that database file.
585     *
586     * As a workaround the postmap(1) and postalias(1) commands turn on
587     * MDB_WRITEMAP which disables the use of malloc() in LMDB. However, that
588     * does not address several disclosures of stack memory. We don't enable
589     * this workaround for Postfix databases are maintained by Postfix daemon
590     * processes, because those are accessible only by the postfix user.
591     *
592     * LMDB 0.9.10 by default does not write uninitialized heap memory to file
593     * (specify MDB_NOMEMINIT to revert that change). We use the MDB_WRITEMAP
594     * workaround for older LMDB versions.
595     */
596#ifndef MDB_NOMEMINIT
597    if (dict_flags & DICT_FLAG_BULK_UPDATE)	/* XXX Good enough */
598	mdb_flags |= MDB_WRITEMAP;
599#endif
600
601    /*
602     * Gracefully handle most database open errors.
603     */
604    if ((status = slmdb_init(&slmdb, dict_lmdb_map_size, DICT_LMDB_SIZE_INCR,
605			     DICT_LMDB_SIZE_MAX)) != 0
606	|| (status = slmdb_open(&slmdb, mdb_path, open_flags, mdb_flags,
607				slmdb_flags)) != 0) {
608	/* This leaks a little memory that would have been used otherwise. */
609	dict = dict_surrogate(DICT_TYPE_LMDB, path, open_flags, dict_flags,
610		    "open database %s: %s", mdb_path, mdb_strerror(status));
611	DICT_LMDB_OPEN_RETURN(dict);
612    }
613
614    /*
615     * XXX Persistent locking belongs in mkmap_lmdb.
616     *
617     * We just need to acquire exclusive access momentarily. This establishes
618     * that no readers are accessing old (obsoleted by copy-on-write) txn
619     * snapshots, so we are free to reuse all eligible old pages. Downgrade
620     * the lock right after acquiring it. This is sufficient to keep out
621     * other writers until we are done.
622     */
623    db_fd = slmdb_fd(&slmdb);
624    if (dict_flags & DICT_FLAG_BULK_UPDATE) {
625	if (myflock(db_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_EXCLUSIVE) < 0)
626	    msg_fatal("%s: lock dictionary: %m", mdb_path);
627	if (myflock(db_fd, MYFLOCK_STYLE_FCNTL, MYFLOCK_OP_SHARED) < 0)
628	    msg_fatal("%s: unlock dictionary: %m", mdb_path);
629    }
630
631    /*
632     * Bundle up. From here on no more assignments to slmdb.
633     */
634    dict_lmdb = (DICT_LMDB *) dict_alloc(DICT_TYPE_LMDB, path, sizeof(*dict_lmdb));
635    dict_lmdb->slmdb = slmdb;
636    dict_lmdb->dict.lookup = dict_lmdb_lookup;
637    dict_lmdb->dict.update = dict_lmdb_update;
638    dict_lmdb->dict.delete = dict_lmdb_delete;
639    dict_lmdb->dict.sequence = dict_lmdb_sequence;
640    dict_lmdb->dict.close = dict_lmdb_close;
641
642    if (fstat(db_fd, &st) < 0)
643	msg_fatal("dict_lmdb_open: fstat: %m");
644    dict_lmdb->dict.lock_fd = dict_lmdb->dict.stat_fd = db_fd;
645    dict_lmdb->dict.lock_type = MYFLOCK_STYLE_FCNTL;
646    dict_lmdb->dict.mtime = st.st_mtime;
647    dict_lmdb->dict.owner.uid = st.st_uid;
648    dict_lmdb->dict.owner.status = (st.st_uid != 0);
649
650    dict_lmdb->key_buf = 0;
651    dict_lmdb->val_buf = 0;
652
653    /*
654     * Warn if the source file is newer than the indexed file, except when
655     * the source file changed only seconds ago.
656     */
657    if ((dict_flags & DICT_FLAG_LOCK) != 0
658	&& stat(path, &st) == 0
659	&& st.st_mtime > dict_lmdb->dict.mtime
660	&& st.st_mtime < time((time_t *) 0) - 100)
661	msg_warn("database %s is older than source file %s", mdb_path, path);
662
663#define DICT_LMDB_IMPL_FLAGS	(DICT_FLAG_FIXED | DICT_FLAG_MULTI_WRITER)
664
665    dict_lmdb->dict.flags = dict_flags | DICT_LMDB_IMPL_FLAGS;
666    if ((dict_flags & (DICT_FLAG_TRY0NULL | DICT_FLAG_TRY1NULL)) == 0)
667	dict_lmdb->dict.flags |= (DICT_FLAG_TRY0NULL | DICT_FLAG_TRY1NULL);
668    if (dict_flags & DICT_FLAG_FOLD_FIX)
669	dict_lmdb->dict.fold_buf = vstring_alloc(10);
670
671    if (dict_flags & DICT_FLAG_BULK_UPDATE)
672	dict_jmp_alloc(&dict_lmdb->dict);
673
674    /*
675     * The following requests return an error result only if we have serious
676     * memory corruption problem.
677     */
678    if (slmdb_control(&dict_lmdb->slmdb,
679		      SLMDB_CTL_API_RETRY_LIMIT, DICT_LMDB_API_RETRY_LIMIT,
680		      SLMDB_CTL_BULK_RETRY_LIMIT, DICT_LMDB_BULK_RETRY_LIMIT,
681		      SLMDB_CTL_LONGJMP_FN, dict_lmdb_longjmp,
682		      SLMDB_CTL_NOTIFY_FN, msg_verbose ?
683		      dict_lmdb_notify : (SLMDB_NOTIFY_FN) 0,
684		      SLMDB_CTL_ASSERT_FN, dict_lmdb_assert,
685		      SLMDB_CTL_CB_CONTEXT, (void *) dict_lmdb,
686		      SLMDB_CTL_END) != 0)
687	msg_panic("dict_lmdb_open: slmdb_control: %m");
688
689    if (msg_verbose)
690	dict_lmdb_notify((void *) dict_lmdb, MDB_SUCCESS,
691			 slmdb_curr_limit(&dict_lmdb->slmdb));
692
693    DICT_LMDB_OPEN_RETURN(DICT_DEBUG (&dict_lmdb->dict));
694}
695
696#endif
697