1/*	$NetBSD: backglue.c,v 1.1.1.4 2010/12/12 15:22:19 adam Exp $	*/
2
3/* backglue.c - backend glue */
4/* OpenLDAP: pkg/ldap/servers/slapd/backglue.c,v 1.112.2.25 2010/06/10 19:33:40 quanah Exp */
5/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
6 *
7 * Copyright 2001-2010 The OpenLDAP Foundation.
8 * All rights reserved.
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted only as authorized by the OpenLDAP
12 * Public License.
13 *
14 * A copy of this license is available in the file LICENSE in the
15 * top-level directory of the distribution or, alternatively, at
16 * <http://www.OpenLDAP.org/license.html>.
17 */
18
19/*
20 * Functions to glue a bunch of other backends into a single tree.
21 * All of the glued backends must share a common suffix. E.g., you
22 * can glue o=foo and ou=bar,o=foo but you can't glue o=foo and o=bar.
23 *
24 * The purpose of these functions is to allow you to split a single database
25 * into pieces (for load balancing purposes, whatever) but still be able
26 * to treat it as a single database after it's been split. As such, each
27 * of the glued backends should have identical rootdn.
28 *  -- Howard Chu
29 */
30
31#include "portable.h"
32
33#include <stdio.h>
34
35#include <ac/string.h>
36#include <ac/socket.h>
37
38#define SLAPD_TOOLS
39#include "slap.h"
40#include "lutil.h"
41#include "config.h"
42
43typedef struct gluenode {
44	BackendDB *gn_be;
45	struct berval gn_pdn;
46} gluenode;
47
48typedef struct glueinfo {
49	int gi_nodes;
50	struct berval gi_pdn;
51	gluenode gi_n[1];
52} glueinfo;
53
54static slap_overinst	glue;
55
56static int glueMode;
57static BackendDB *glueBack;
58static BackendDB glueBackDone;
59#define GLUEBACK_DONE (&glueBackDone)
60
61static slap_overinst * glue_tool_inst( BackendInfo *bi);
62
63static slap_response glue_op_response;
64
65/* Just like select_backend, but only for our backends */
66static BackendDB *
67glue_back_select (
68	BackendDB *be,
69	struct berval *dn
70)
71{
72	slap_overinst	*on = (slap_overinst *)be->bd_info;
73	glueinfo		*gi = (glueinfo *)on->on_bi.bi_private;
74	int i;
75
76	for (i = gi->gi_nodes-1; i >= 0; i--) {
77		assert( gi->gi_n[i].gn_be->be_nsuffix != NULL );
78
79		if (dnIsSuffix(dn, &gi->gi_n[i].gn_be->be_nsuffix[0])) {
80			return gi->gi_n[i].gn_be;
81		}
82	}
83	be->bd_info = on->on_info->oi_orig;
84	return be;
85}
86
87
88typedef struct glue_state {
89	char *matched;
90	BerVarray refs;
91	LDAPControl **ctrls;
92	int err;
93	int matchlen;
94	int nrefs;
95	int nctrls;
96} glue_state;
97
98static int
99glue_op_cleanup( Operation *op, SlapReply *rs )
100{
101	/* This is not a final result */
102	if (rs->sr_type == REP_RESULT )
103		rs->sr_type = REP_GLUE_RESULT;
104	return SLAP_CB_CONTINUE;
105}
106
107static int
108glue_op_response ( Operation *op, SlapReply *rs )
109{
110	glue_state *gs = op->o_callback->sc_private;
111
112	switch(rs->sr_type) {
113	case REP_SEARCH:
114	case REP_SEARCHREF:
115	case REP_INTERMEDIATE:
116		return SLAP_CB_CONTINUE;
117
118	default:
119		if (rs->sr_err == LDAP_SUCCESS ||
120			rs->sr_err == LDAP_SIZELIMIT_EXCEEDED ||
121			rs->sr_err == LDAP_TIMELIMIT_EXCEEDED ||
122			rs->sr_err == LDAP_ADMINLIMIT_EXCEEDED ||
123			rs->sr_err == LDAP_NO_SUCH_OBJECT ||
124			gs->err != LDAP_SUCCESS)
125			gs->err = rs->sr_err;
126		if (gs->err == LDAP_SUCCESS && gs->matched) {
127			ch_free (gs->matched);
128			gs->matched = NULL;
129			gs->matchlen = 0;
130		}
131		if (gs->err != LDAP_SUCCESS && rs->sr_matched) {
132			int len;
133			len = strlen (rs->sr_matched);
134			if (len > gs->matchlen) {
135				if (gs->matched)
136					ch_free (gs->matched);
137				gs->matched = ch_strdup (rs->sr_matched);
138				gs->matchlen = len;
139			}
140		}
141		if (rs->sr_ref) {
142			int i, j, k;
143			BerVarray new;
144
145			for (i=0; rs->sr_ref[i].bv_val; i++);
146
147			j = gs->nrefs;
148			if (!j) {
149				new = ch_malloc ((i+1)*sizeof(struct berval));
150			} else {
151				new = ch_realloc(gs->refs,
152					(j+i+1)*sizeof(struct berval));
153			}
154			for (k=0; k<i; j++,k++) {
155				ber_dupbv( &new[j], &rs->sr_ref[k] );
156			}
157			new[j].bv_val = NULL;
158			gs->nrefs = j;
159			gs->refs = new;
160		}
161		if (rs->sr_ctrls) {
162			int i, j, k;
163			LDAPControl **newctrls;
164
165			for (i=0; rs->sr_ctrls[i]; i++);
166
167			j = gs->nctrls;
168			if (!j) {
169				newctrls = op->o_tmpalloc((i+1)*sizeof(LDAPControl *),
170					op->o_tmpmemctx);
171			} else {
172				/* Forget old pagedResults response if we're sending
173				 * a new one now
174				 */
175				if ( get_pagedresults( op ) > SLAP_CONTROL_IGNORED ) {
176					int newpage = 0;
177					for ( k=0; k<i; k++ ) {
178						if ( !strcmp(rs->sr_ctrls[k]->ldctl_oid,
179							LDAP_CONTROL_PAGEDRESULTS )) {
180							newpage = 1;
181							break;
182						}
183					}
184					if ( newpage ) {
185						for ( k=0; k<j; k++ ) {
186							if ( !strcmp(gs->ctrls[k]->ldctl_oid,
187								LDAP_CONTROL_PAGEDRESULTS ))
188							{
189								op->o_tmpfree(gs->ctrls[k], op->o_tmpmemctx);
190								gs->ctrls[k] = gs->ctrls[--j];
191								gs->ctrls[j] = NULL;
192								break;
193							}
194						}
195					}
196				}
197				newctrls = op->o_tmprealloc(gs->ctrls,
198					(j+i+1)*sizeof(LDAPControl *), op->o_tmpmemctx);
199			}
200			for (k=0; k<i; j++,k++) {
201				ber_len_t oidlen = strlen( rs->sr_ctrls[k]->ldctl_oid );
202				newctrls[j] = op->o_tmpalloc(sizeof(LDAPControl) + oidlen + 1 + rs->sr_ctrls[k]->ldctl_value.bv_len + 1,
203					op->o_tmpmemctx);
204				newctrls[j]->ldctl_iscritical = rs->sr_ctrls[k]->ldctl_iscritical;
205				newctrls[j]->ldctl_oid = (char *)&newctrls[j][1];
206				lutil_strcopy( newctrls[j]->ldctl_oid, rs->sr_ctrls[k]->ldctl_oid );
207				if ( !BER_BVISNULL( &rs->sr_ctrls[k]->ldctl_value ) ) {
208					newctrls[j]->ldctl_value.bv_val = &newctrls[j]->ldctl_oid[oidlen + 1];
209					newctrls[j]->ldctl_value.bv_len = rs->sr_ctrls[k]->ldctl_value.bv_len;
210					lutil_memcopy( newctrls[j]->ldctl_value.bv_val,
211						rs->sr_ctrls[k]->ldctl_value.bv_val,
212						rs->sr_ctrls[k]->ldctl_value.bv_len + 1 );
213				} else {
214					BER_BVZERO( &newctrls[j]->ldctl_value );
215				}
216			}
217			newctrls[j] = NULL;
218			gs->nctrls = j;
219			gs->ctrls = newctrls;
220		}
221	}
222	return 0;
223}
224
225static int
226glue_op_func ( Operation *op, SlapReply *rs )
227{
228	slap_overinst	*on = (slap_overinst *)op->o_bd->bd_info;
229	BackendDB *b0 = op->o_bd;
230	BackendInfo *bi0 = op->o_bd->bd_info;
231	BI_op_modify **func;
232	slap_operation_t which = op_bind;
233	int rc;
234
235	op->o_bd = glue_back_select (b0, &op->o_req_ndn);
236
237	/* If we're on the master backend, let overlay framework handle it */
238	if ( op->o_bd == b0 )
239		return SLAP_CB_CONTINUE;
240
241	b0->bd_info = on->on_info->oi_orig;
242
243	switch(op->o_tag) {
244	case LDAP_REQ_ADD: which = op_add; break;
245	case LDAP_REQ_DELETE: which = op_delete; break;
246	case LDAP_REQ_MODIFY: which = op_modify; break;
247	case LDAP_REQ_MODRDN: which = op_modrdn; break;
248	case LDAP_REQ_EXTENDED: which = op_extended; break;
249	default: assert( 0 ); break;
250	}
251
252	func = &op->o_bd->bd_info->bi_op_bind;
253	if ( func[which] )
254		rc = func[which]( op, rs );
255	else
256		rc = SLAP_CB_BYPASS;
257
258	op->o_bd = b0;
259	op->o_bd->bd_info = bi0;
260	return rc;
261}
262
263static int
264glue_response ( Operation *op, SlapReply *rs )
265{
266	BackendDB *be = op->o_bd;
267	be = glue_back_select (op->o_bd, &op->o_req_ndn);
268
269	/* If we're on the master backend, let overlay framework handle it.
270	 * Otherwise, bail out.
271	 */
272	return ( op->o_bd == be ) ? SLAP_CB_CONTINUE : SLAP_CB_BYPASS;
273}
274
275static int
276glue_chk_referrals ( Operation *op, SlapReply *rs )
277{
278	slap_overinst	*on = (slap_overinst *)op->o_bd->bd_info;
279	BackendDB *b0 = op->o_bd;
280	BackendInfo *bi0 = op->o_bd->bd_info;
281	int rc;
282
283	op->o_bd = glue_back_select (b0, &op->o_req_ndn);
284	if ( op->o_bd == b0 )
285		return SLAP_CB_CONTINUE;
286
287	b0->bd_info = on->on_info->oi_orig;
288
289	if ( op->o_bd->bd_info->bi_chk_referrals )
290		rc = ( *op->o_bd->bd_info->bi_chk_referrals )( op, rs );
291	else
292		rc = SLAP_CB_CONTINUE;
293
294	op->o_bd = b0;
295	op->o_bd->bd_info = bi0;
296	return rc;
297}
298
299static int
300glue_chk_controls ( Operation *op, SlapReply *rs )
301{
302	slap_overinst	*on = (slap_overinst *)op->o_bd->bd_info;
303	BackendDB *b0 = op->o_bd;
304	BackendInfo *bi0 = op->o_bd->bd_info;
305	int rc = SLAP_CB_CONTINUE;
306
307	op->o_bd = glue_back_select (b0, &op->o_req_ndn);
308	if ( op->o_bd == b0 )
309		return SLAP_CB_CONTINUE;
310
311	b0->bd_info = on->on_info->oi_orig;
312
313	/* if the subordinate database has overlays, the bi_chk_controls()
314	 * hook is actually over_aux_chk_controls(); in case it actually
315	 * wraps a missing hok, we need to mimic the behavior
316	 * of the frontend applied to that database */
317	if ( op->o_bd->bd_info->bi_chk_controls ) {
318		rc = ( *op->o_bd->bd_info->bi_chk_controls )( op, rs );
319	}
320
321
322	if ( rc == SLAP_CB_CONTINUE ) {
323		rc = backend_check_controls( op, rs );
324	}
325
326	op->o_bd = b0;
327	op->o_bd->bd_info = bi0;
328	return rc;
329}
330
331/* ITS#4615 - overlays configured above the glue overlay should be
332 * invoked for the entire glued tree. Overlays configured below the
333 * glue overlay should only be invoked on the master backend.
334 * So, if we're searching on any subordinates, we need to force the
335 * current overlay chain to stop processing, without stopping the
336 * overall callback flow.
337 */
338static int
339glue_sub_search( Operation *op, SlapReply *rs, BackendDB *b0,
340	slap_overinst *on )
341{
342	/* Process any overlays on the master backend */
343	if ( op->o_bd == b0 && on->on_next ) {
344		BackendInfo *bi = op->o_bd->bd_info;
345		int rc = SLAP_CB_CONTINUE;
346		for ( on=on->on_next; on; on=on->on_next ) {
347			op->o_bd->bd_info = (BackendInfo *)on;
348			if ( on->on_bi.bi_op_search ) {
349				rc = on->on_bi.bi_op_search( op, rs );
350				if ( rc != SLAP_CB_CONTINUE )
351					break;
352			}
353		}
354		op->o_bd->bd_info = bi;
355		if ( rc != SLAP_CB_CONTINUE )
356			return rc;
357	}
358	return op->o_bd->be_search( op, rs );
359}
360
361static const ID glueID = NOID;
362static const struct berval gluecookie = { sizeof( glueID ), (char *)&glueID };
363
364static int
365glue_op_search ( Operation *op, SlapReply *rs )
366{
367	slap_overinst	*on = (slap_overinst *)op->o_bd->bd_info;
368	glueinfo		*gi = (glueinfo *)on->on_bi.bi_private;
369	BackendDB *b0 = op->o_bd;
370	BackendDB *b1 = NULL, *btmp;
371	BackendInfo *bi0 = op->o_bd->bd_info;
372	int i;
373	long stoptime = 0, starttime;
374	glue_state gs = {NULL, NULL, NULL, 0, 0, 0, 0};
375	slap_callback cb = { NULL, glue_op_response, glue_op_cleanup, NULL };
376	int scope0, tlimit0;
377	struct berval dn, ndn, *pdn;
378
379	cb.sc_private = &gs;
380
381	cb.sc_next = op->o_callback;
382
383	starttime = op->o_time;
384	stoptime = slap_get_time () + op->ors_tlimit;
385
386	/* reset dummy cookie used to keep paged results going across databases */
387	if ( get_pagedresults( op ) > SLAP_CONTROL_IGNORED
388		&& bvmatch( &((PagedResultsState *)op->o_pagedresults_state)->ps_cookieval, &gluecookie ) )
389	{
390		PagedResultsState *ps = op->o_pagedresults_state;
391		BerElementBuffer berbuf;
392		BerElement *ber = (BerElement *)&berbuf;
393		struct berval cookie = BER_BVC(""), value;
394		int c;
395
396		for (c = 0; op->o_ctrls[c] != NULL; c++) {
397			if (strcmp(op->o_ctrls[c]->ldctl_oid, LDAP_CONTROL_PAGEDRESULTS) == 0)
398				break;
399		}
400
401		assert( op->o_ctrls[c] != NULL );
402
403		ber_init2( ber, NULL, LBER_USE_DER );
404		ber_printf( ber, "{iO}", ps->ps_size, &cookie );
405		ber_flatten2( ber, &value, 0 );
406		assert( op->o_ctrls[c]->ldctl_value.bv_len >= value.bv_len );
407		op->o_ctrls[c]->ldctl_value.bv_len = value.bv_len;
408		lutil_memcopy( op->o_ctrls[c]->ldctl_value.bv_val,
409			value.bv_val, value.bv_len );
410		ber_free_buf( ber );
411
412		ps->ps_cookie = (PagedResultsCookie)0;
413		BER_BVZERO( &ps->ps_cookieval );
414	}
415
416	op->o_bd = glue_back_select (b0, &op->o_req_ndn);
417	b0->bd_info = on->on_info->oi_orig;
418
419	switch (op->ors_scope) {
420	case LDAP_SCOPE_BASE:
421		if ( op->o_bd == b0 )
422			return SLAP_CB_CONTINUE;
423
424		rs->sr_err = LDAP_UNWILLING_TO_PERFORM;
425		if (op->o_bd && op->o_bd->be_search) {
426			rs->sr_err = op->o_bd->be_search( op, rs );
427		}
428		return rs->sr_err;
429
430	case LDAP_SCOPE_ONELEVEL:
431	case LDAP_SCOPE_SUBTREE:
432	case LDAP_SCOPE_SUBORDINATE: /* FIXME */
433		op->o_callback = &cb;
434		rs->sr_err = gs.err = LDAP_UNWILLING_TO_PERFORM;
435		scope0 = op->ors_scope;
436		tlimit0 = op->ors_tlimit;
437		dn = op->o_req_dn;
438		ndn = op->o_req_ndn;
439		b1 = op->o_bd;
440
441		/*
442		 * Execute in reverse order, most specific first
443		 */
444		for (i = gi->gi_nodes; i >= 0; i--) {
445			if ( i == gi->gi_nodes ) {
446				btmp = b0;
447				pdn = &gi->gi_pdn;
448			} else {
449				btmp = gi->gi_n[i].gn_be;
450				pdn = &gi->gi_n[i].gn_pdn;
451			}
452			if (!btmp || !btmp->be_search)
453				continue;
454			if (!dnIsSuffix(&btmp->be_nsuffix[0], &b1->be_nsuffix[0]))
455				continue;
456			if (get_no_subordinate_glue(op) && btmp != b1)
457				continue;
458			/* If we remembered which backend we were on before,
459			 * skip down to it now
460			 */
461			if ( get_pagedresults( op ) > SLAP_CONTROL_IGNORED &&
462				op->o_conn->c_pagedresults_state.ps_be &&
463				op->o_conn->c_pagedresults_state.ps_be != btmp )
464				continue;
465
466			if (tlimit0 != SLAP_NO_LIMIT) {
467				op->o_time = slap_get_time();
468				op->ors_tlimit = stoptime - op->o_time;
469				if (op->ors_tlimit <= 0) {
470					rs->sr_err = gs.err = LDAP_TIMELIMIT_EXCEEDED;
471					break;
472				}
473			}
474			rs->sr_err = 0;
475			/*
476			 * check for abandon
477			 */
478			if (op->o_abandon) {
479				goto end_of_loop;
480			}
481			op->o_bd = btmp;
482
483			assert( op->o_bd->be_suffix != NULL );
484			assert( op->o_bd->be_nsuffix != NULL );
485
486			if (scope0 == LDAP_SCOPE_ONELEVEL &&
487				dn_match(pdn, &ndn))
488			{
489				struct berval mdn, mndn;
490				op->ors_scope = LDAP_SCOPE_BASE;
491				mdn = op->o_req_dn = op->o_bd->be_suffix[0];
492				mndn = op->o_req_ndn = op->o_bd->be_nsuffix[0];
493				rs->sr_err = op->o_bd->be_search(op, rs);
494				if ( rs->sr_err == LDAP_NO_SUCH_OBJECT ) {
495					gs.err = LDAP_SUCCESS;
496				}
497				op->ors_scope = LDAP_SCOPE_ONELEVEL;
498				if ( op->o_req_dn.bv_val == mdn.bv_val )
499					op->o_req_dn = dn;
500				if ( op->o_req_ndn.bv_val == mndn.bv_val )
501					op->o_req_ndn = ndn;
502
503			} else if (scope0 == LDAP_SCOPE_SUBTREE &&
504				dn_match(&op->o_bd->be_nsuffix[0], &ndn))
505			{
506				rs->sr_err = glue_sub_search( op, rs, b0, on );
507
508			} else if (scope0 == LDAP_SCOPE_SUBTREE &&
509				dnIsSuffix(&op->o_bd->be_nsuffix[0], &ndn))
510			{
511				struct berval mdn, mndn;
512				mdn = op->o_req_dn = op->o_bd->be_suffix[0];
513				mndn = op->o_req_ndn = op->o_bd->be_nsuffix[0];
514				rs->sr_err = glue_sub_search( op, rs, b0, on );
515				if ( rs->sr_err == LDAP_NO_SUCH_OBJECT ) {
516					gs.err = LDAP_SUCCESS;
517				}
518				if ( op->o_req_dn.bv_val == mdn.bv_val )
519					op->o_req_dn = dn;
520				if ( op->o_req_ndn.bv_val == mndn.bv_val )
521					op->o_req_ndn = ndn;
522
523			} else if (dnIsSuffix(&ndn, &op->o_bd->be_nsuffix[0])) {
524				rs->sr_err = glue_sub_search( op, rs, b0, on );
525			}
526
527			switch ( gs.err ) {
528
529			/*
530			 * Add errors that should result in dropping
531			 * the search
532			 */
533			case LDAP_SIZELIMIT_EXCEEDED:
534			case LDAP_TIMELIMIT_EXCEEDED:
535			case LDAP_ADMINLIMIT_EXCEEDED:
536			case LDAP_NO_SUCH_OBJECT:
537#ifdef LDAP_CONTROL_X_CHAINING_BEHAVIOR
538			case LDAP_X_CANNOT_CHAIN:
539#endif /* LDAP_CONTROL_X_CHAINING_BEHAVIOR */
540				goto end_of_loop;
541
542			case LDAP_SUCCESS:
543				if ( get_pagedresults( op ) > SLAP_CONTROL_IGNORED ) {
544					PagedResultsState *ps = op->o_pagedresults_state;
545
546					/* Assume this backend can be forgotten now */
547					op->o_conn->c_pagedresults_state.ps_be = NULL;
548
549					/* If we have a full page, exit the loop. We may
550					 * need to remember this backend so we can continue
551					 * from here on a subsequent request.
552					 */
553					if ( rs->sr_nentries >= ps->ps_size ) {
554						PagedResultsState *cps = &op->o_conn->c_pagedresults_state;
555
556						/* Don't bother to remember the first backend.
557						 * Only remember the last one if there's more state left.
558						 */
559						if ( op->o_bd != b0 &&
560							( cps->ps_cookie != NOID
561								|| !BER_BVISNULL( &cps->ps_cookieval )
562								|| op->o_bd != gi->gi_n[0].gn_be ) )
563						{
564							op->o_conn->c_pagedresults_state.ps_be = op->o_bd;
565						}
566
567						/* Check whether the cookie is empty,
568						 * and give remaining databases a chance
569						 */
570						if ( op->o_bd != gi->gi_n[0].gn_be || cps->ps_cookie == NOID ) {
571							int		c;
572
573							for ( c = 0; gs.ctrls[c] != NULL; c++ ) {
574								if ( strcmp( gs.ctrls[c]->ldctl_oid, LDAP_CONTROL_PAGEDRESULTS ) == 0 ) {
575									break;
576								}
577							}
578
579							if ( gs.ctrls[c] != NULL ) {
580								BerElementBuffer berbuf;
581								BerElement	*ber = (BerElement *)&berbuf;
582								ber_tag_t	tag;
583								ber_int_t	size;
584								struct berval	cookie, value;
585
586								ber_init2( ber, &gs.ctrls[c]->ldctl_value, LBER_USE_DER );
587
588								tag = ber_scanf( ber, "{im}", &size, &cookie );
589								assert( tag != LBER_ERROR );
590
591								if ( BER_BVISEMPTY( &cookie ) && op->o_bd != gi->gi_n[0].gn_be ) {
592									/* delete old, create new cookie with NOID */
593									PagedResultsCookie respcookie = (PagedResultsCookie)NOID;
594									ber_len_t oidlen = strlen( gs.ctrls[c]->ldctl_oid );
595									LDAPControl *newctrl;
596
597									/* it's next database's turn */
598									if ( btmp == b0 ) {
599										op->o_conn->c_pagedresults_state.ps_be = gi->gi_n[gi->gi_nodes - 1].gn_be;
600
601									} else {
602										op->o_conn->c_pagedresults_state.ps_be = gi->gi_n[(i > 0 ? i - 1: 0)].gn_be;
603									}
604
605									cookie.bv_val = (char *)&respcookie;
606									cookie.bv_len = sizeof( PagedResultsCookie );
607
608									ber_init2( ber, NULL, LBER_USE_DER );
609									ber_printf( ber, "{iO}", 0, &cookie );
610									ber_flatten2( ber, &value, 0 );
611
612									newctrl = op->o_tmprealloc( gs.ctrls[c],
613										sizeof(LDAPControl) + oidlen + 1 + value.bv_len + 1,
614										op->o_tmpmemctx);
615									newctrl->ldctl_iscritical = gs.ctrls[c]->ldctl_iscritical;
616									newctrl->ldctl_oid = (char *)&newctrl[1];
617									lutil_strcopy( newctrl->ldctl_oid, gs.ctrls[c]->ldctl_oid );
618									newctrl->ldctl_value.bv_len = value.bv_len;
619									lutil_memcopy( newctrl->ldctl_value.bv_val,
620										value.bv_val, value.bv_len );
621
622									gs.ctrls[c] = newctrl;
623
624									ber_free_buf( ber );
625
626								} else if ( !BER_BVISEMPTY( &cookie ) && op->o_bd != b0 ) {
627									/* if cookie not empty, it's again this database's turn */
628									op->o_conn->c_pagedresults_state.ps_be = op->o_bd;
629								}
630							}
631						}
632
633						goto end_of_loop;
634					}
635
636					/* This backend has run out of entries, but more responses
637					 * can fit in the page. Fake a reset of the state so the
638					 * next backend will start up properly. Only back-[bh]db
639					 * and back-sql look at this state info.
640					 */
641					ps->ps_cookie = (PagedResultsCookie)0;
642					BER_BVZERO( &ps->ps_cookieval );
643
644					{
645						/* change the size of the page in the request
646						 * that will be propagated, and reset the cookie */
647						BerElementBuffer berbuf;
648						BerElement *ber = (BerElement *)&berbuf;
649						int size = ps->ps_size - rs->sr_nentries;
650						struct berval cookie = BER_BVC(""), value;
651						int c;
652
653						for (c = 0; op->o_ctrls[c] != NULL; c++) {
654							if (strcmp(op->o_ctrls[c]->ldctl_oid, LDAP_CONTROL_PAGEDRESULTS) == 0)
655								break;
656						}
657
658						assert( op->o_ctrls[c] != NULL );
659
660						ber_init2( ber, NULL, LBER_USE_DER );
661						ber_printf( ber, "{iO}", size, &cookie );
662						ber_flatten2( ber, &value, 0 );
663						assert( op->o_ctrls[c]->ldctl_value.bv_len >= value.bv_len );
664						op->o_ctrls[c]->ldctl_value.bv_len = value.bv_len;
665						lutil_memcopy( op->o_ctrls[c]->ldctl_value.bv_val,
666							value.bv_val, value.bv_len );
667						ber_free_buf( ber );
668					}
669				}
670
671			default:
672				break;
673			}
674		}
675end_of_loop:;
676		op->ors_scope = scope0;
677		op->ors_tlimit = tlimit0;
678		op->o_time = starttime;
679
680		break;
681	}
682
683	op->o_callback = cb.sc_next;
684	if ( op->o_abandon ) {
685		rs->sr_err = SLAPD_ABANDON;
686	} else {
687		rs->sr_err = gs.err;
688		rs->sr_matched = gs.matched;
689		rs->sr_ref = gs.refs;
690	}
691	rs->sr_ctrls = gs.ctrls;
692
693	send_ldap_result( op, rs );
694
695	op->o_bd = b0;
696	op->o_bd->bd_info = bi0;
697	if (gs.matched)
698		free (gs.matched);
699	if (gs.refs)
700		ber_bvarray_free(gs.refs);
701	if (gs.ctrls) {
702		for (i = gs.nctrls; --i >= 0; ) {
703			op->o_tmpfree(gs.ctrls[i], op->o_tmpmemctx);
704		}
705		op->o_tmpfree(gs.ctrls, op->o_tmpmemctx);
706	}
707	return rs->sr_err;
708}
709
710static BackendDB toolDB;
711
712static int
713glue_tool_entry_open (
714	BackendDB *b0,
715	int mode
716)
717{
718	slap_overinfo	*oi = (slap_overinfo *)b0->bd_info;
719
720	/* We don't know which backend to talk to yet, so just
721	 * remember the mode and move on...
722	 */
723
724	glueMode = mode;
725	glueBack = NULL;
726	toolDB = *b0;
727	toolDB.bd_info = oi->oi_orig;
728
729	/* Sanity checks */
730	{
731		slap_overinst *on = glue_tool_inst( b0->bd_info );
732		glueinfo	*gi = on->on_bi.bi_private;
733
734		int i;
735		for (i = 0; i < gi->gi_nodes; i++) {
736			BackendDB *bd;
737			struct berval pdn;
738
739			dnParent( &gi->gi_n[i].gn_be->be_nsuffix[0], &pdn );
740			bd = select_backend( &pdn, 0 );
741			if ( bd ) {
742				ID id;
743				BackendDB db;
744
745				if ( overlay_is_over( bd ) ) {
746					slap_overinfo *oi = (slap_overinfo *)bd->bd_info;
747					db = *bd;
748					db.bd_info = oi->oi_orig;
749					bd = &db;
750				}
751
752				if ( !bd->bd_info->bi_tool_dn2id_get
753					|| !bd->bd_info->bi_tool_entry_open
754					|| !bd->bd_info->bi_tool_entry_close )
755				{
756					continue;
757				}
758
759				bd->bd_info->bi_tool_entry_open( bd, 0 );
760				id = bd->bd_info->bi_tool_dn2id_get( bd, &gi->gi_n[i].gn_be->be_nsuffix[0] );
761				bd->bd_info->bi_tool_entry_close( bd );
762				if ( id != NOID ) {
763					Debug( LDAP_DEBUG_ANY,
764						"glue_tool_entry_open: subordinate database suffix entry DN=\"%s\" also present in superior database rooted at DN=\"%s\"\n",
765						gi->gi_n[i].gn_be->be_suffix[0].bv_val, bd->be_suffix[0].bv_val, 0 );
766					return LDAP_OTHER;
767				}
768			}
769		}
770	}
771
772	return 0;
773}
774
775static int
776glue_tool_entry_close (
777	BackendDB *b0
778)
779{
780	int rc = 0;
781
782	if (glueBack && glueBack != GLUEBACK_DONE) {
783		if (!glueBack->be_entry_close)
784			return 0;
785		rc = glueBack->be_entry_close (glueBack);
786	}
787	return rc;
788}
789
790static slap_overinst *
791glue_tool_inst(
792	BackendInfo *bi
793)
794{
795	slap_overinfo	*oi = (slap_overinfo *)bi;
796	slap_overinst	*on;
797
798	for ( on = oi->oi_list; on; on=on->on_next ) {
799		if ( !strcmp( on->on_bi.bi_type, glue.on_bi.bi_type ))
800			return on;
801	}
802	return NULL;
803}
804
805/* This function will only be called in tool mode */
806static int
807glue_open (
808	BackendInfo *bi
809)
810{
811	slap_overinst *on = glue_tool_inst( bi );
812	glueinfo		*gi = on->on_bi.bi_private;
813	static int glueOpened = 0;
814	int i, j, same, bsame = 0, rc = 0;
815	ConfigReply cr = {0};
816
817	if (glueOpened) return 0;
818
819	glueOpened = 1;
820
821	/* If we were invoked in tool mode, open all the underlying backends */
822	if (slapMode & SLAP_TOOL_MODE) {
823		for (i = 0; i<gi->gi_nodes; i++) {
824			same = 0;
825			/* Same bi_open as our main backend? */
826			if ( gi->gi_n[i].gn_be->bd_info->bi_open ==
827				on->on_info->oi_orig->bi_open )
828				bsame = 1;
829
830			/* Loop thru the bd_info's and make sure we only
831			 * invoke their bi_open functions once each.
832			 */
833			for ( j = 0; j<i; j++ ) {
834				if ( gi->gi_n[i].gn_be->bd_info->bi_open ==
835					gi->gi_n[j].gn_be->bd_info->bi_open ) {
836					same = 1;
837					break;
838				}
839			}
840			/* OK, it's unique and non-NULL, call it. */
841			if ( !same && gi->gi_n[i].gn_be->bd_info->bi_open )
842				rc = gi->gi_n[i].gn_be->bd_info->bi_open(
843					gi->gi_n[i].gn_be->bd_info );
844			/* Let backend.c take care of the rest of startup */
845			if ( !rc )
846				rc = backend_startup_one( gi->gi_n[i].gn_be, &cr );
847			if ( rc ) break;
848		}
849		if ( !rc && !bsame && on->on_info->oi_orig->bi_open )
850			rc = on->on_info->oi_orig->bi_open( on->on_info->oi_orig );
851
852	} /* other case is impossible */
853	return rc;
854}
855
856/* This function will only be called in tool mode */
857static int
858glue_close (
859	BackendInfo *bi
860)
861{
862	static int glueClosed = 0;
863	int rc = 0;
864
865	if (glueClosed) return 0;
866
867	glueClosed = 1;
868
869	if (slapMode & SLAP_TOOL_MODE) {
870		rc = backend_shutdown( NULL );
871	}
872	return rc;
873}
874
875static int
876glue_entry_get_rw (
877	Operation		*op,
878	struct berval	*dn,
879	ObjectClass		*oc,
880	AttributeDescription	*ad,
881	int	rw,
882	Entry	**e )
883{
884	int rc;
885	BackendDB *b0 = op->o_bd;
886	op->o_bd = glue_back_select( b0, dn );
887
888	if ( op->o_bd->be_fetch ) {
889		rc = op->o_bd->be_fetch( op, dn, oc, ad, rw, e );
890	} else {
891		rc = LDAP_UNWILLING_TO_PERFORM;
892	}
893	op->o_bd =b0;
894	return rc;
895}
896
897static int
898glue_entry_release_rw (
899	Operation *op,
900	Entry *e,
901	int rw
902)
903{
904	BackendDB *b0 = op->o_bd;
905	int rc = -1;
906
907	op->o_bd = glue_back_select (b0, &e->e_nname);
908
909	if ( op->o_bd->be_release ) {
910		rc = op->o_bd->be_release( op, e, rw );
911
912	} else {
913		/* FIXME: mimic be_entry_release_rw
914		 * when no be_release() available */
915		/* free entry */
916		entry_free( e );
917		rc = 0;
918	}
919	op->o_bd = b0;
920	return rc;
921}
922
923static struct berval *glue_base;
924static int glue_scope;
925static Filter *glue_filter;
926
927static ID
928glue_tool_entry_first (
929	BackendDB *b0
930)
931{
932	slap_overinst	*on = glue_tool_inst( b0->bd_info );
933	glueinfo		*gi = on->on_bi.bi_private;
934	int i;
935	ID rc;
936
937	/* If we're starting from scratch, start at the most general */
938	if (!glueBack) {
939		if ( toolDB.be_entry_open && toolDB.be_entry_first ) {
940			glueBack = &toolDB;
941		} else {
942			for (i = gi->gi_nodes-1; i >= 0; i--) {
943				if (gi->gi_n[i].gn_be->be_entry_open &&
944					gi->gi_n[i].gn_be->be_entry_first) {
945						glueBack = gi->gi_n[i].gn_be;
946					break;
947				}
948			}
949		}
950	}
951	if (!glueBack || !glueBack->be_entry_open || !glueBack->be_entry_first ||
952		glueBack->be_entry_open (glueBack, glueMode) != 0)
953		return NOID;
954
955	rc = glueBack->be_entry_first (glueBack);
956	while ( rc == NOID ) {
957		if ( glueBack && glueBack->be_entry_close )
958			glueBack->be_entry_close (glueBack);
959		for (i=0; i<gi->gi_nodes; i++) {
960			if (gi->gi_n[i].gn_be == glueBack)
961				break;
962		}
963		if (i == 0) {
964			glueBack = GLUEBACK_DONE;
965			break;
966		} else {
967			glueBack = gi->gi_n[i-1].gn_be;
968			rc = glue_tool_entry_first (b0);
969			if ( glueBack == GLUEBACK_DONE ) {
970				break;
971			}
972		}
973	}
974	return rc;
975}
976
977static ID
978glue_tool_entry_first_x (
979	BackendDB *b0,
980	struct berval *base,
981	int scope,
982	Filter *f
983)
984{
985	slap_overinst	*on = glue_tool_inst( b0->bd_info );
986	glueinfo		*gi = on->on_bi.bi_private;
987	int i;
988	ID rc;
989
990	glue_base = base;
991	glue_scope = scope;
992	glue_filter = f;
993
994	/* If we're starting from scratch, start at the most general */
995	if (!glueBack) {
996		if ( toolDB.be_entry_open && toolDB.be_entry_first_x ) {
997			glueBack = &toolDB;
998		} else {
999			for (i = gi->gi_nodes-1; i >= 0; i--) {
1000				if (gi->gi_n[i].gn_be->be_entry_open &&
1001					gi->gi_n[i].gn_be->be_entry_first_x)
1002				{
1003					glueBack = gi->gi_n[i].gn_be;
1004					break;
1005				}
1006			}
1007		}
1008	}
1009	if (!glueBack || !glueBack->be_entry_open || !glueBack->be_entry_first_x ||
1010		glueBack->be_entry_open (glueBack, glueMode) != 0)
1011		return NOID;
1012
1013	rc = glueBack->be_entry_first_x (glueBack,
1014		glue_base, glue_scope, glue_filter);
1015	while ( rc == NOID ) {
1016		if ( glueBack && glueBack->be_entry_close )
1017			glueBack->be_entry_close (glueBack);
1018		for (i=0; i<gi->gi_nodes; i++) {
1019			if (gi->gi_n[i].gn_be == glueBack)
1020				break;
1021		}
1022		if (i == 0) {
1023			glueBack = GLUEBACK_DONE;
1024			break;
1025		} else {
1026			glueBack = gi->gi_n[i-1].gn_be;
1027			rc = glue_tool_entry_first_x (b0,
1028				glue_base, glue_scope, glue_filter);
1029			if ( glueBack == GLUEBACK_DONE ) {
1030				break;
1031			}
1032		}
1033	}
1034	return rc;
1035}
1036
1037static ID
1038glue_tool_entry_next (
1039	BackendDB *b0
1040)
1041{
1042	slap_overinst	*on = glue_tool_inst( b0->bd_info );
1043	glueinfo		*gi = on->on_bi.bi_private;
1044	int i;
1045	ID rc;
1046
1047	if (!glueBack || !glueBack->be_entry_next)
1048		return NOID;
1049
1050	rc = glueBack->be_entry_next (glueBack);
1051
1052	/* If we ran out of entries in one database, move on to the next */
1053	while (rc == NOID) {
1054		if ( glueBack && glueBack->be_entry_close )
1055			glueBack->be_entry_close (glueBack);
1056		for (i=0; i<gi->gi_nodes; i++) {
1057			if (gi->gi_n[i].gn_be == glueBack)
1058				break;
1059		}
1060		if (i == 0) {
1061			glueBack = GLUEBACK_DONE;
1062			break;
1063		} else {
1064			glueBack = gi->gi_n[i-1].gn_be;
1065			if ( glue_base || glue_filter ) {
1066				/* using entry_first_x() */
1067				rc = glue_tool_entry_first_x (b0,
1068					glue_base, glue_scope, glue_filter);
1069
1070			} else {
1071				/* using entry_first() */
1072				rc = glue_tool_entry_first (b0);
1073			}
1074			if ( glueBack == GLUEBACK_DONE ) {
1075				break;
1076			}
1077		}
1078	}
1079	return rc;
1080}
1081
1082static ID
1083glue_tool_dn2id_get (
1084	BackendDB *b0,
1085	struct berval *dn
1086)
1087{
1088	BackendDB *be, b2;
1089	int rc = -1;
1090
1091	b2 = *b0;
1092	b2.bd_info = (BackendInfo *)glue_tool_inst( b0->bd_info );
1093	be = glue_back_select (&b2, dn);
1094	if ( be == &b2 ) be = &toolDB;
1095
1096	if (!be->be_dn2id_get)
1097		return NOID;
1098
1099	if (!glueBack) {
1100		if ( be->be_entry_open ) {
1101			rc = be->be_entry_open (be, glueMode);
1102		}
1103		if (rc != 0) {
1104			return NOID;
1105		}
1106	} else if (be != glueBack) {
1107		/* If this entry belongs in a different branch than the
1108		 * previous one, close the current database and open the
1109		 * new one.
1110		 */
1111		if ( glueBack->be_entry_close ) {
1112			glueBack->be_entry_close (glueBack);
1113		}
1114		if ( be->be_entry_open ) {
1115			rc = be->be_entry_open (be, glueMode);
1116		}
1117		if (rc != 0) {
1118			return NOID;
1119		}
1120	}
1121	glueBack = be;
1122	return be->be_dn2id_get (be, dn);
1123}
1124
1125static Entry *
1126glue_tool_entry_get (
1127	BackendDB *b0,
1128	ID id
1129)
1130{
1131	if (!glueBack || !glueBack->be_entry_get)
1132		return NULL;
1133
1134	return glueBack->be_entry_get (glueBack, id);
1135}
1136
1137static ID
1138glue_tool_entry_put (
1139	BackendDB *b0,
1140	Entry *e,
1141	struct berval *text
1142)
1143{
1144	BackendDB *be, b2;
1145	int rc = -1;
1146
1147	b2 = *b0;
1148	b2.bd_info = (BackendInfo *)glue_tool_inst( b0->bd_info );
1149	be = glue_back_select (&b2, &e->e_nname);
1150	if ( be == &b2 ) be = &toolDB;
1151
1152	if (!be->be_entry_put)
1153		return NOID;
1154
1155	if (!glueBack) {
1156		if ( be->be_entry_open ) {
1157			rc = be->be_entry_open (be, glueMode);
1158		}
1159		if (rc != 0) {
1160			return NOID;
1161		}
1162	} else if (be != glueBack) {
1163		/* If this entry belongs in a different branch than the
1164		 * previous one, close the current database and open the
1165		 * new one.
1166		 */
1167		if ( glueBack->be_entry_close ) {
1168			glueBack->be_entry_close (glueBack);
1169		}
1170		if ( be->be_entry_open ) {
1171			rc = be->be_entry_open (be, glueMode);
1172		}
1173		if (rc != 0) {
1174			return NOID;
1175		}
1176	}
1177	glueBack = be;
1178	return be->be_entry_put (be, e, text);
1179}
1180
1181static ID
1182glue_tool_entry_modify (
1183	BackendDB *b0,
1184	Entry *e,
1185	struct berval *text
1186)
1187{
1188	if (!glueBack || !glueBack->be_entry_modify)
1189		return NOID;
1190
1191	return glueBack->be_entry_modify (glueBack, e, text);
1192}
1193
1194static int
1195glue_tool_entry_reindex (
1196	BackendDB *b0,
1197	ID id,
1198	AttributeDescription **adv
1199)
1200{
1201	if (!glueBack || !glueBack->be_entry_reindex)
1202		return -1;
1203
1204	return glueBack->be_entry_reindex (glueBack, id, adv);
1205}
1206
1207static int
1208glue_tool_sync (
1209	BackendDB *b0
1210)
1211{
1212	slap_overinst	*on = glue_tool_inst( b0->bd_info );
1213	glueinfo		*gi = on->on_bi.bi_private;
1214	BackendInfo		*bi = b0->bd_info;
1215	int i;
1216
1217	/* just sync everyone */
1218	for (i = 0; i<gi->gi_nodes; i++)
1219		if (gi->gi_n[i].gn_be->be_sync)
1220			gi->gi_n[i].gn_be->be_sync (gi->gi_n[i].gn_be);
1221	b0->bd_info = on->on_info->oi_orig;
1222	if ( b0->be_sync )
1223		b0->be_sync( b0 );
1224	b0->bd_info = bi;
1225	return 0;
1226}
1227
1228typedef struct glue_Addrec {
1229	struct glue_Addrec *ga_next;
1230	BackendDB *ga_be;
1231} glue_Addrec;
1232
1233/* List of added subordinates */
1234static glue_Addrec *ga_list;
1235static int ga_adding;
1236
1237static int
1238glue_db_init(
1239	BackendDB *be,
1240	ConfigReply *cr
1241)
1242{
1243	slap_overinst	*on = (slap_overinst *)be->bd_info;
1244	slap_overinfo	*oi = on->on_info;
1245	BackendInfo	*bi = oi->oi_orig;
1246	glueinfo *gi;
1247
1248	if ( SLAP_GLUE_SUBORDINATE( be )) {
1249		Debug( LDAP_DEBUG_ANY, "glue: backend %s is already subordinate, "
1250			"cannot have glue overlay!\n",
1251			be->be_suffix[0].bv_val, 0, 0 );
1252		return LDAP_OTHER;
1253	}
1254
1255	gi = ch_calloc( 1, sizeof(glueinfo));
1256	on->on_bi.bi_private = gi;
1257	dnParent( be->be_nsuffix, &gi->gi_pdn );
1258
1259	/* Currently the overlay framework doesn't handle these entry points
1260	 * but we need them....
1261	 */
1262	oi->oi_bi.bi_open = glue_open;
1263	oi->oi_bi.bi_close = glue_close;
1264
1265	/* Only advertise these if the root DB supports them */
1266	if ( bi->bi_tool_entry_open )
1267		oi->oi_bi.bi_tool_entry_open = glue_tool_entry_open;
1268	if ( bi->bi_tool_entry_close )
1269		oi->oi_bi.bi_tool_entry_close = glue_tool_entry_close;
1270	if ( bi->bi_tool_entry_first )
1271		oi->oi_bi.bi_tool_entry_first = glue_tool_entry_first;
1272	/* FIXME: check whether all support bi_tool_entry_first_x() ? */
1273	if ( bi->bi_tool_entry_first_x )
1274		oi->oi_bi.bi_tool_entry_first_x = glue_tool_entry_first_x;
1275	if ( bi->bi_tool_entry_next )
1276		oi->oi_bi.bi_tool_entry_next = glue_tool_entry_next;
1277	if ( bi->bi_tool_entry_get )
1278		oi->oi_bi.bi_tool_entry_get = glue_tool_entry_get;
1279	if ( bi->bi_tool_dn2id_get )
1280		oi->oi_bi.bi_tool_dn2id_get = glue_tool_dn2id_get;
1281	if ( bi->bi_tool_entry_put )
1282		oi->oi_bi.bi_tool_entry_put = glue_tool_entry_put;
1283	if ( bi->bi_tool_entry_reindex )
1284		oi->oi_bi.bi_tool_entry_reindex = glue_tool_entry_reindex;
1285	if ( bi->bi_tool_entry_modify )
1286		oi->oi_bi.bi_tool_entry_modify = glue_tool_entry_modify;
1287	if ( bi->bi_tool_sync )
1288		oi->oi_bi.bi_tool_sync = glue_tool_sync;
1289
1290	SLAP_DBFLAGS( be ) |= SLAP_DBFLAG_GLUE_INSTANCE;
1291
1292	if ( ga_list ) {
1293		be->bd_info = (BackendInfo *)oi;
1294		glue_sub_attach( 1 );
1295	}
1296
1297	return 0;
1298}
1299
1300static int
1301glue_db_destroy (
1302	BackendDB *be,
1303	ConfigReply *cr
1304)
1305{
1306	slap_overinst	*on = (slap_overinst *)be->bd_info;
1307	glueinfo		*gi = (glueinfo *)on->on_bi.bi_private;
1308
1309	free (gi);
1310	return SLAP_CB_CONTINUE;
1311}
1312
1313static int
1314glue_db_close(
1315	BackendDB *be,
1316	ConfigReply *cr
1317)
1318{
1319	slap_overinst	*on = (slap_overinst *)be->bd_info;
1320
1321	on->on_info->oi_bi.bi_db_close = 0;
1322	return 0;
1323}
1324
1325int
1326glue_sub_del( BackendDB *b0 )
1327{
1328	BackendDB *be;
1329	int rc = 0;
1330
1331	/* Find the top backend for this subordinate */
1332	be = b0;
1333	while ( (be=LDAP_STAILQ_NEXT( be, be_next )) != NULL ) {
1334		slap_overinfo *oi;
1335		slap_overinst *on;
1336		glueinfo *gi;
1337		int i;
1338
1339		if ( SLAP_GLUE_SUBORDINATE( be ))
1340			continue;
1341		if ( !SLAP_GLUE_INSTANCE( be ))
1342			continue;
1343		if ( !dnIsSuffix( &b0->be_nsuffix[0], &be->be_nsuffix[0] ))
1344			continue;
1345
1346		/* OK, got the right backend, find the overlay */
1347		oi = (slap_overinfo *)be->bd_info;
1348		for ( on=oi->oi_list; on; on=on->on_next ) {
1349			if ( on->on_bi.bi_type == glue.on_bi.bi_type )
1350				break;
1351		}
1352		assert( on != NULL );
1353		gi = on->on_bi.bi_private;
1354		for ( i=0; i < gi->gi_nodes; i++ ) {
1355			if ( gi->gi_n[i].gn_be == b0 ) {
1356				int j;
1357
1358				for (j=i+1; j < gi->gi_nodes; j++)
1359					gi->gi_n[j-1] = gi->gi_n[j];
1360
1361				gi->gi_nodes--;
1362			}
1363		}
1364	}
1365	if ( be == NULL )
1366		rc = LDAP_NO_SUCH_OBJECT;
1367
1368	return rc;
1369}
1370
1371
1372/* Attach all the subordinate backends to their superior */
1373int
1374glue_sub_attach( int online )
1375{
1376	glue_Addrec *ga, *gnext = NULL;
1377	int rc = 0;
1378
1379	if ( ga_adding )
1380		return 0;
1381
1382	ga_adding = 1;
1383
1384	/* For all the subordinate backends */
1385	for ( ga=ga_list; ga != NULL; ga = gnext ) {
1386		BackendDB *be;
1387
1388		gnext = ga->ga_next;
1389
1390		/* Find the top backend for this subordinate */
1391		be = ga->ga_be;
1392		while ( (be=LDAP_STAILQ_NEXT( be, be_next )) != NULL ) {
1393			slap_overinfo *oi;
1394			slap_overinst *on;
1395			glueinfo *gi;
1396
1397			if ( SLAP_GLUE_SUBORDINATE( be ))
1398				continue;
1399			if ( !dnIsSuffix( &ga->ga_be->be_nsuffix[0], &be->be_nsuffix[0] ))
1400				continue;
1401
1402			/* If it's not already configured, set up the overlay */
1403			if ( !SLAP_GLUE_INSTANCE( be )) {
1404				rc = overlay_config( be, glue.on_bi.bi_type, -1, NULL, NULL);
1405				if ( rc )
1406					break;
1407			}
1408			/* Find the overlay instance */
1409			oi = (slap_overinfo *)be->bd_info;
1410			for ( on=oi->oi_list; on; on=on->on_next ) {
1411				if ( on->on_bi.bi_type == glue.on_bi.bi_type )
1412					break;
1413			}
1414			assert( on != NULL );
1415			gi = on->on_bi.bi_private;
1416			gi = (glueinfo *)ch_realloc( gi, sizeof(glueinfo) +
1417				gi->gi_nodes * sizeof(gluenode));
1418			gi->gi_n[gi->gi_nodes].gn_be = ga->ga_be;
1419			dnParent( &ga->ga_be->be_nsuffix[0],
1420				&gi->gi_n[gi->gi_nodes].gn_pdn );
1421			gi->gi_nodes++;
1422			on->on_bi.bi_private = gi;
1423			ga->ga_be->be_flags |= SLAP_DBFLAG_GLUE_LINKED;
1424			break;
1425		}
1426		if ( !be ) {
1427			Debug( LDAP_DEBUG_ANY, "glue: no superior found for sub %s!\n",
1428				ga->ga_be->be_suffix[0].bv_val, 0, 0 );
1429			/* allow this for now, assume a superior will
1430			 * be added later
1431			 */
1432			if ( online ) {
1433				rc = 0;
1434				gnext = ga_list;
1435				break;
1436			}
1437			rc = LDAP_NO_SUCH_OBJECT;
1438		}
1439		ch_free( ga );
1440		if ( rc ) break;
1441	}
1442
1443	ga_list = gnext;
1444
1445	ga_adding = 0;
1446
1447	return rc;
1448}
1449
1450int
1451glue_sub_add( BackendDB *be, int advert, int online )
1452{
1453	glue_Addrec *ga;
1454	int rc = 0;
1455
1456	if ( overlay_is_inst( be, "glue" )) {
1457		Debug( LDAP_DEBUG_ANY, "glue: backend %s already has glue overlay, "
1458			"cannot be a subordinate!\n",
1459			be->be_suffix[0].bv_val, 0, 0 );
1460		return LDAP_OTHER;
1461	}
1462	SLAP_DBFLAGS( be ) |= SLAP_DBFLAG_GLUE_SUBORDINATE;
1463	if ( advert )
1464		SLAP_DBFLAGS( be ) |= SLAP_DBFLAG_GLUE_ADVERTISE;
1465
1466	ga = ch_malloc( sizeof( glue_Addrec ));
1467	ga->ga_next = ga_list;
1468	ga->ga_be = be;
1469	ga_list = ga;
1470
1471	if ( online )
1472		rc = glue_sub_attach( online );
1473
1474	return rc;
1475}
1476
1477static int
1478glue_access_allowed(
1479	Operation		*op,
1480	Entry			*e,
1481	AttributeDescription	*desc,
1482	struct berval		*val,
1483	slap_access_t		access,
1484	AccessControlState	*state,
1485	slap_mask_t		*maskp )
1486{
1487	BackendDB *b0, *be = glue_back_select( op->o_bd, &e->e_nname );
1488	int rc;
1489
1490	if ( be == NULL || be == op->o_bd || be->bd_info->bi_access_allowed == NULL )
1491		return SLAP_CB_CONTINUE;
1492
1493	b0 = op->o_bd;
1494	op->o_bd = be;
1495	rc = be->bd_info->bi_access_allowed ( op, e, desc, val, access, state, maskp );
1496	op->o_bd = b0;
1497	return rc;
1498}
1499
1500int
1501glue_sub_init()
1502{
1503	glue.on_bi.bi_type = "glue";
1504
1505	glue.on_bi.bi_db_init = glue_db_init;
1506	glue.on_bi.bi_db_close = glue_db_close;
1507	glue.on_bi.bi_db_destroy = glue_db_destroy;
1508
1509	glue.on_bi.bi_op_search = glue_op_search;
1510	glue.on_bi.bi_op_modify = glue_op_func;
1511	glue.on_bi.bi_op_modrdn = glue_op_func;
1512	glue.on_bi.bi_op_add = glue_op_func;
1513	glue.on_bi.bi_op_delete = glue_op_func;
1514	glue.on_bi.bi_extended = glue_op_func;
1515
1516	glue.on_bi.bi_chk_referrals = glue_chk_referrals;
1517	glue.on_bi.bi_chk_controls = glue_chk_controls;
1518	glue.on_bi.bi_entry_get_rw = glue_entry_get_rw;
1519	glue.on_bi.bi_entry_release_rw = glue_entry_release_rw;
1520	glue.on_bi.bi_access_allowed = glue_access_allowed;
1521
1522	glue.on_response = glue_response;
1523
1524	return overlay_register( &glue );
1525}
1526