syncprov.c revision 1.1.1.3
1/*	$NetBSD: syncprov.c,v 1.1.1.3 2010/03/08 02:14:20 lukem Exp $	*/
2
3/* OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.147.2.70 2009/11/24 00:53:26 quanah Exp */
4/* syncprov.c - syncrepl provider */
5/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
6 *
7 * Copyright 2004-2009 The OpenLDAP Foundation.
8 * All rights reserved.
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted only as authorized by the OpenLDAP
12 * Public License.
13 *
14 * A copy of this license is available in the file LICENSE in the
15 * top-level directory of the distribution or, alternatively, at
16 * <http://www.OpenLDAP.org/license.html>.
17 */
18/* ACKNOWLEDGEMENTS:
19 * This work was initially developed by Howard Chu for inclusion in
20 * OpenLDAP Software.
21 */
22
23#include "portable.h"
24
25#ifdef SLAPD_OVER_SYNCPROV
26
27#include <ac/string.h>
28#include "lutil.h"
29#include "slap.h"
30#include "config.h"
31#include "ldap_rq.h"
32
33#ifdef LDAP_DEVEL
34#define	CHECK_CSN	1
35#endif
36
37/* A modify request on a particular entry */
38typedef struct modinst {
39	struct modinst *mi_next;
40	Operation *mi_op;
41} modinst;
42
43typedef struct modtarget {
44	struct modinst *mt_mods;
45	struct modinst *mt_tail;
46	Operation *mt_op;
47	ldap_pvt_thread_mutex_t mt_mutex;
48} modtarget;
49
50/* A queued result of a persistent search */
51typedef struct syncres {
52	struct syncres *s_next;
53	Entry *s_e;
54	struct berval s_dn;
55	struct berval s_ndn;
56	struct berval s_uuid;
57	struct berval s_csn;
58	char s_mode;
59	char s_isreference;
60} syncres;
61
62/* Record of a persistent search */
63typedef struct syncops {
64	struct syncops *s_next;
65	struct berval	s_base;		/* ndn of search base */
66	ID		s_eid;		/* entryID of search base */
67	Operation	*s_op;		/* search op */
68	int		s_rid;
69	int		s_sid;
70	struct berval s_filterstr;
71	int		s_flags;	/* search status */
72#define	PS_IS_REFRESHING	0x01
73#define	PS_IS_DETACHED		0x02
74#define	PS_WROTE_BASE		0x04
75#define	PS_FIND_BASE		0x08
76#define	PS_FIX_FILTER		0x10
77#define	PS_TASK_QUEUED		0x20
78
79	int		s_inuse;	/* reference count */
80	struct syncres *s_res;
81	struct syncres *s_restail;
82	ldap_pvt_thread_mutex_t	s_mutex;
83} syncops;
84
85/* A received sync control */
86typedef struct sync_control {
87	struct sync_cookie sr_state;
88	int sr_rhint;
89} sync_control;
90
91#if 0 /* moved back to slap.h */
92#define	o_sync	o_ctrlflag[slap_cids.sc_LDAPsync]
93#endif
/* o_sync_mode lives in the data bits of the same control-flag slot as o_sync */
#define	o_sync_mode	o_ctrlflag[slap_cids.sc_LDAPsync]

/* Sync modes, shifted into the data bits of the control flag.
 * Note that SLAP_SYNC_PERSIST is built from LDAP_SYNC_RESERVED.
 */
#define SLAP_SYNC_NONE					(LDAP_SYNC_NONE<<SLAP_CONTROL_SHIFT)
#define SLAP_SYNC_REFRESH				(LDAP_SYNC_REFRESH_ONLY<<SLAP_CONTROL_SHIFT)
#define SLAP_SYNC_PERSIST				(LDAP_SYNC_RESERVED<<SLAP_CONTROL_SHIFT)
#define SLAP_SYNC_REFRESH_AND_PERSIST	(LDAP_SYNC_REFRESH_AND_PERSIST<<SLAP_CONTROL_SHIFT)
101
102/* Record of which searches matched at premodify step */
103typedef struct syncmatches {
104	struct syncmatches *sm_next;
105	syncops *sm_op;
106} syncmatches;
107
108/* Session log data */
109typedef struct slog_entry {
110	struct slog_entry *se_next;
111	struct berval se_uuid;
112	struct berval se_csn;
113	int	se_sid;
114	ber_tag_t	se_tag;
115} slog_entry;
116
117typedef struct sessionlog {
118	struct berval	sl_mincsn;
119	int		sl_num;
120	int		sl_size;
121	slog_entry *sl_head;
122	slog_entry *sl_tail;
123	ldap_pvt_thread_mutex_t sl_mutex;
124} sessionlog;
125
126/* The main state for this overlay */
127typedef struct syncprov_info_t {
128	syncops		*si_ops;
129	BerVarray	si_ctxcsn;	/* ldapsync context */
130	struct berval	si_contextdn;
131	int		*si_sids;
132	int		si_numcsns;
133	int		si_chkops;	/* checkpointing info */
134	int		si_chktime;
135	int		si_numops;	/* number of ops since last checkpoint */
136	int		si_nopres;	/* Skip present phase */
137	int		si_usehint;	/* use reload hint */
138	time_t	si_chklast;	/* time of last checkpoint */
139	Avlnode	*si_mods;	/* entries being modified */
140	sessionlog	*si_logs;
141	ldap_pvt_thread_rdwr_t	si_csn_rwlock;
142	ldap_pvt_thread_mutex_t	si_ops_mutex;
143	ldap_pvt_thread_mutex_t	si_mods_mutex;
144	ldap_pvt_thread_mutex_t	si_resp_mutex;
145} syncprov_info_t;
146
147typedef struct opcookie {
148	slap_overinst *son;
149	syncmatches *smatches;
150	modtarget *smt;
151	Entry *se;
152	struct berval sdn;	/* DN of entry, for deletes */
153	struct berval sndn;
154	struct berval suuid;	/* UUID of entry */
155	struct berval sctxcsn;
156	short osid;	/* sid of op csn */
157	short rsid;	/* sid of relay */
158	short sreference;	/* Is the entry a reference? */
159} opcookie;
160
161typedef struct mutexint {
162	ldap_pvt_thread_mutex_t mi_mutex;
163	int mi_int;
164} mutexint;
165
166typedef struct fbase_cookie {
167	struct berval *fdn;	/* DN of a modified entry, for scope testing */
168	syncops *fss;	/* persistent search we're testing against */
169	int fbase;	/* if TRUE we found the search base and it's still valid */
170	int fscope;	/* if TRUE then fdn is within the psearch scope */
171} fbase_cookie;
172
173static AttributeName csn_anlist[3];
174static AttributeName uuid_anlist[2];
175
176/* Build a LDAPsync intermediate state control */
177static int
178syncprov_state_ctrl(
179	Operation	*op,
180	SlapReply	*rs,
181	Entry		*e,
182	int		entry_sync_state,
183	LDAPControl	**ctrls,
184	int		num_ctrls,
185	int		send_cookie,
186	struct berval	*cookie )
187{
188	Attribute* a;
189	int ret;
190
191	BerElementBuffer berbuf;
192	BerElement *ber = (BerElement *)&berbuf;
193
194	struct berval	entryuuid_bv = BER_BVNULL;
195
196	ber_init2( ber, 0, LBER_USE_DER );
197	ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx );
198
199	ctrls[num_ctrls] = op->o_tmpalloc( sizeof ( LDAPControl ), op->o_tmpmemctx );
200
201	for ( a = e->e_attrs; a != NULL; a = a->a_next ) {
202		AttributeDescription *desc = a->a_desc;
203		if ( desc == slap_schema.si_ad_entryUUID ) {
204			entryuuid_bv = a->a_nvals[0];
205			break;
206		}
207	}
208
209	/* FIXME: what if entryuuid is NULL or empty ? */
210
211	if ( send_cookie && cookie ) {
212		ber_printf( ber, "{eOON}",
213			entry_sync_state, &entryuuid_bv, cookie );
214	} else {
215		ber_printf( ber, "{eON}",
216			entry_sync_state, &entryuuid_bv );
217	}
218
219	ctrls[num_ctrls]->ldctl_oid = LDAP_CONTROL_SYNC_STATE;
220	ctrls[num_ctrls]->ldctl_iscritical = (op->o_sync == SLAP_CONTROL_CRITICAL);
221	ret = ber_flatten2( ber, &ctrls[num_ctrls]->ldctl_value, 1 );
222
223	ber_free_buf( ber );
224
225	if ( ret < 0 ) {
226		Debug( LDAP_DEBUG_TRACE,
227			"slap_build_sync_ctrl: ber_flatten2 failed\n",
228			0, 0, 0 );
229		send_ldap_error( op, rs, LDAP_OTHER, "internal error" );
230		return ret;
231	}
232
233	return LDAP_SUCCESS;
234}
235
236/* Build a LDAPsync final state control */
237static int
238syncprov_done_ctrl(
239	Operation	*op,
240	SlapReply	*rs,
241	LDAPControl	**ctrls,
242	int			num_ctrls,
243	int			send_cookie,
244	struct berval *cookie,
245	int			refreshDeletes )
246{
247	int ret;
248	BerElementBuffer berbuf;
249	BerElement *ber = (BerElement *)&berbuf;
250
251	ber_init2( ber, NULL, LBER_USE_DER );
252	ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx );
253
254	ctrls[num_ctrls] = op->o_tmpalloc( sizeof ( LDAPControl ), op->o_tmpmemctx );
255
256	ber_printf( ber, "{" );
257	if ( send_cookie && cookie ) {
258		ber_printf( ber, "O", cookie );
259	}
260	if ( refreshDeletes == LDAP_SYNC_REFRESH_DELETES ) {
261		ber_printf( ber, "b", refreshDeletes );
262	}
263	ber_printf( ber, "N}" );
264
265	ctrls[num_ctrls]->ldctl_oid = LDAP_CONTROL_SYNC_DONE;
266	ctrls[num_ctrls]->ldctl_iscritical = (op->o_sync == SLAP_CONTROL_CRITICAL);
267	ret = ber_flatten2( ber, &ctrls[num_ctrls]->ldctl_value, 1 );
268
269	ber_free_buf( ber );
270
271	if ( ret < 0 ) {
272		Debug( LDAP_DEBUG_TRACE,
273			"syncprov_done_ctrl: ber_flatten2 failed\n",
274			0, 0, 0 );
275		send_ldap_error( op, rs, LDAP_OTHER, "internal error" );
276		return ret;
277	}
278
279	return LDAP_SUCCESS;
280}
281
282static int
283syncprov_sendinfo(
284	Operation	*op,
285	SlapReply	*rs,
286	int			type,
287	struct berval *cookie,
288	int			refreshDone,
289	BerVarray	syncUUIDs,
290	int			refreshDeletes )
291{
292	BerElementBuffer berbuf;
293	BerElement *ber = (BerElement *)&berbuf;
294	struct berval rspdata;
295
296	int ret;
297
298	ber_init2( ber, NULL, LBER_USE_DER );
299	ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx );
300
301	if ( type ) {
302		switch ( type ) {
303		case LDAP_TAG_SYNC_NEW_COOKIE:
304			ber_printf( ber, "tO", type, cookie );
305			break;
306		case LDAP_TAG_SYNC_REFRESH_DELETE:
307		case LDAP_TAG_SYNC_REFRESH_PRESENT:
308			ber_printf( ber, "t{", type );
309			if ( cookie ) {
310				ber_printf( ber, "O", cookie );
311			}
312			if ( refreshDone == 0 ) {
313				ber_printf( ber, "b", refreshDone );
314			}
315			ber_printf( ber, "N}" );
316			break;
317		case LDAP_TAG_SYNC_ID_SET:
318			ber_printf( ber, "t{", type );
319			if ( cookie ) {
320				ber_printf( ber, "O", cookie );
321			}
322			if ( refreshDeletes == 1 ) {
323				ber_printf( ber, "b", refreshDeletes );
324			}
325			ber_printf( ber, "[W]", syncUUIDs );
326			ber_printf( ber, "N}" );
327			break;
328		default:
329			Debug( LDAP_DEBUG_TRACE,
330				"syncprov_sendinfo: invalid syncinfo type (%d)\n",
331				type, 0, 0 );
332			return LDAP_OTHER;
333		}
334	}
335
336	ret = ber_flatten2( ber, &rspdata, 0 );
337
338	if ( ret < 0 ) {
339		Debug( LDAP_DEBUG_TRACE,
340			"syncprov_sendinfo: ber_flatten2 failed\n",
341			0, 0, 0 );
342		send_ldap_error( op, rs, LDAP_OTHER, "internal error" );
343		return ret;
344	}
345
346	rs->sr_rspoid = LDAP_SYNC_INFO;
347	rs->sr_rspdata = &rspdata;
348	send_ldap_intermediate( op, rs );
349	rs->sr_rspdata = NULL;
350	ber_free_buf( ber );
351
352	return LDAP_SUCCESS;
353}
354
355/* Find a modtarget in an AVL tree */
356static int
357sp_avl_cmp( const void *c1, const void *c2 )
358{
359	const modtarget *m1, *m2;
360	int rc;
361
362	m1 = c1; m2 = c2;
363	rc = m1->mt_op->o_req_ndn.bv_len - m2->mt_op->o_req_ndn.bv_len;
364
365	if ( rc ) return rc;
366	return ber_bvcmp( &m1->mt_op->o_req_ndn, &m2->mt_op->o_req_ndn );
367}
368
369/* syncprov_findbase:
370 *   finds the true DN of the base of a search (with alias dereferencing) and
371 * checks to make sure the base entry doesn't get replaced with a different
372 * entry (e.g., swapping trees via ModDN, or retargeting an alias). If a
373 * change is detected, any persistent search on this base must be terminated /
374 * reloaded.
375 *   On the first call, we just save the DN and entryID. On subsequent calls
376 * we compare the DN and entryID with the saved values.
377 */
378static int
379findbase_cb( Operation *op, SlapReply *rs )
380{
381	slap_callback *sc = op->o_callback;
382
383	if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) {
384		fbase_cookie *fc = sc->sc_private;
385
386		/* If no entryID, we're looking for the first time.
387		 * Just store whatever we got.
388		 */
389		if ( fc->fss->s_eid == NOID ) {
390			fc->fbase = 2;
391			fc->fss->s_eid = rs->sr_entry->e_id;
392			ber_dupbv( &fc->fss->s_base, &rs->sr_entry->e_nname );
393
394		} else if ( rs->sr_entry->e_id == fc->fss->s_eid &&
395			dn_match( &rs->sr_entry->e_nname, &fc->fss->s_base )) {
396
397		/* OK, the DN is the same and the entryID is the same. */
398			fc->fbase = 1;
399		}
400	}
401	if ( rs->sr_err != LDAP_SUCCESS ) {
402		Debug( LDAP_DEBUG_ANY, "findbase failed! %d\n", rs->sr_err,0,0 );
403	}
404	return LDAP_SUCCESS;
405}
406
407static Filter generic_filter = { LDAP_FILTER_PRESENT, { 0 }, NULL };
408static struct berval generic_filterstr = BER_BVC("(objectclass=*)");
409
410static int
411syncprov_findbase( Operation *op, fbase_cookie *fc )
412{
413	/* Use basic parameters from syncrepl search, but use
414	 * current op's threadctx / tmpmemctx
415	 */
416	ldap_pvt_thread_mutex_lock( &fc->fss->s_mutex );
417	if ( fc->fss->s_flags & PS_FIND_BASE ) {
418		slap_callback cb = {0};
419		Operation fop;
420		SlapReply frs = { REP_RESULT };
421		int rc;
422
423		fc->fss->s_flags ^= PS_FIND_BASE;
424		ldap_pvt_thread_mutex_unlock( &fc->fss->s_mutex );
425
426		fop = *fc->fss->s_op;
427
428		fop.o_bd = fop.o_bd->bd_self;
429		fop.o_hdr = op->o_hdr;
430		fop.o_time = op->o_time;
431		fop.o_tincr = op->o_tincr;
432
433		cb.sc_response = findbase_cb;
434		cb.sc_private = fc;
435
436		fop.o_sync_mode = 0;	/* turn off sync mode */
437		fop.o_managedsait = SLAP_CONTROL_CRITICAL;
438		fop.o_callback = &cb;
439		fop.o_tag = LDAP_REQ_SEARCH;
440		fop.ors_scope = LDAP_SCOPE_BASE;
441		fop.ors_limit = NULL;
442		fop.ors_slimit = 1;
443		fop.ors_tlimit = SLAP_NO_LIMIT;
444		fop.ors_attrs = slap_anlist_no_attrs;
445		fop.ors_attrsonly = 1;
446		fop.ors_filter = &generic_filter;
447		fop.ors_filterstr = generic_filterstr;
448
449		rc = fop.o_bd->be_search( &fop, &frs );
450	} else {
451		ldap_pvt_thread_mutex_unlock( &fc->fss->s_mutex );
452		fc->fbase = 1;
453	}
454
455	/* After the first call, see if the fdn resides in the scope */
456	if ( fc->fbase == 1 ) {
457		switch ( fc->fss->s_op->ors_scope ) {
458		case LDAP_SCOPE_BASE:
459			fc->fscope = dn_match( fc->fdn, &fc->fss->s_base );
460			break;
461		case LDAP_SCOPE_ONELEVEL: {
462			struct berval pdn;
463			dnParent( fc->fdn, &pdn );
464			fc->fscope = dn_match( &pdn, &fc->fss->s_base );
465			break; }
466		case LDAP_SCOPE_SUBTREE:
467			fc->fscope = dnIsSuffix( fc->fdn, &fc->fss->s_base );
468			break;
469		case LDAP_SCOPE_SUBORDINATE:
470			fc->fscope = dnIsSuffix( fc->fdn, &fc->fss->s_base ) &&
471				!dn_match( fc->fdn, &fc->fss->s_base );
472			break;
473		}
474	}
475
476	if ( fc->fbase )
477		return LDAP_SUCCESS;
478
479	/* If entryID has changed, then the base of this search has
480	 * changed. Invalidate the psearch.
481	 */
482	return LDAP_NO_SUCH_OBJECT;
483}
484
485/* syncprov_findcsn:
486 *   This function has three different purposes, but they all use a search
487 * that filters on entryCSN so they're combined here.
488 * 1: at startup time, after a contextCSN has been read from the database,
489 * we search for all entries with CSN >= contextCSN in case the contextCSN
490 * was not checkpointed at the previous shutdown.
491 *
492 * 2: when the current contextCSN is known and we have a sync cookie, we search
493 * for one entry with CSN = the cookie CSN. If not found, try <= cookie CSN.
494 * If an entry is found, the cookie CSN is valid, otherwise it is stale.
495 *
496 * 3: during a refresh phase, we search for all entries with CSN <= the cookie
497 * CSN, and generate Present records for them. We always collect this result
498 * in SyncID sets, even if there's only one match.
499 */
/* Operating modes of syncprov_findcsn */
enum find_csn_t {
	FIND_MAXCSN	= 1,	/* entries with CSN >= the stored contextCSN */
	FIND_CSN	= 2,	/* probe whether the cookie CSN is still valid */
	FIND_PRESENT	= 3	/* collect entryUUIDs into SyncID sets */
};
typedef enum find_csn_t find_csn_t;
505
506static int
507findmax_cb( Operation *op, SlapReply *rs )
508{
509	if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) {
510		struct berval *maxcsn = op->o_callback->sc_private;
511		Attribute *a = attr_find( rs->sr_entry->e_attrs,
512			slap_schema.si_ad_entryCSN );
513
514		if ( a && ber_bvcmp( &a->a_vals[0], maxcsn ) > 0 &&
515			slap_parse_csn_sid( &a->a_vals[0] ) == slap_serverID ) {
516			maxcsn->bv_len = a->a_vals[0].bv_len;
517			strcpy( maxcsn->bv_val, a->a_vals[0].bv_val );
518		}
519	}
520	return LDAP_SUCCESS;
521}
522
523static int
524findcsn_cb( Operation *op, SlapReply *rs )
525{
526	slap_callback *sc = op->o_callback;
527
528	/* We just want to know that at least one exists, so it's OK if
529	 * we exceed the unchecked limit.
530	 */
531	if ( rs->sr_err == LDAP_ADMINLIMIT_EXCEEDED ||
532		(rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS )) {
533		sc->sc_private = (void *)1;
534	}
535	return LDAP_SUCCESS;
536}
537
538/* Build a list of entryUUIDs for sending in a SyncID set */
539
540#define UUID_LEN	16
541
542typedef struct fpres_cookie {
543	int num;
544	BerVarray uuids;
545	char *last;
546} fpres_cookie;
547
548static int
549findpres_cb( Operation *op, SlapReply *rs )
550{
551	slap_callback *sc = op->o_callback;
552	fpres_cookie *pc = sc->sc_private;
553	Attribute *a;
554	int ret = SLAP_CB_CONTINUE;
555
556	switch ( rs->sr_type ) {
557	case REP_SEARCH:
558		a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryUUID );
559		if ( a ) {
560			pc->uuids[pc->num].bv_val = pc->last;
561			AC_MEMCPY( pc->uuids[pc->num].bv_val, a->a_nvals[0].bv_val,
562				pc->uuids[pc->num].bv_len );
563			pc->num++;
564			pc->last = pc->uuids[pc->num].bv_val;
565			pc->uuids[pc->num].bv_val = NULL;
566		}
567		ret = LDAP_SUCCESS;
568		if ( pc->num != SLAP_SYNCUUID_SET_SIZE )
569			break;
570		/* FALLTHRU */
571	case REP_RESULT:
572		ret = rs->sr_err;
573		if ( pc->num ) {
574			ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL,
575				0, pc->uuids, 0 );
576			pc->uuids[pc->num].bv_val = pc->last;
577			pc->num = 0;
578			pc->last = pc->uuids[0].bv_val;
579		}
580		break;
581	default:
582		break;
583	}
584	return ret;
585}
586
587static int
588syncprov_findcsn( Operation *op, find_csn_t mode )
589{
590	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
591	syncprov_info_t		*si = on->on_bi.bi_private;
592
593	slap_callback cb = {0};
594	Operation fop;
595	SlapReply frs = { REP_RESULT };
596	char buf[LDAP_LUTIL_CSNSTR_BUFSIZE + STRLENOF("(entryCSN<=)")];
597	char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
598	struct berval maxcsn;
599	Filter cf;
600	AttributeAssertion eq = ATTRIBUTEASSERTION_INIT;
601	fpres_cookie pcookie;
602	sync_control *srs = NULL;
603	struct slap_limits_set fc_limits;
604	int i, rc = LDAP_SUCCESS, findcsn_retry = 1;
605	int maxid;
606
607	if ( mode != FIND_MAXCSN ) {
608		srs = op->o_controls[slap_cids.sc_LDAPsync];
609	}
610
611	fop = *op;
612	fop.o_sync_mode &= SLAP_CONTROL_MASK;	/* turn off sync_mode */
613	/* We want pure entries, not referrals */
614	fop.o_managedsait = SLAP_CONTROL_CRITICAL;
615
616	cf.f_ava = &eq;
617	cf.f_av_desc = slap_schema.si_ad_entryCSN;
618	BER_BVZERO( &cf.f_av_value );
619	cf.f_next = NULL;
620
621	fop.o_callback = &cb;
622	fop.ors_limit = NULL;
623	fop.ors_tlimit = SLAP_NO_LIMIT;
624	fop.ors_filter = &cf;
625	fop.ors_filterstr.bv_val = buf;
626
627again:
628	switch( mode ) {
629	case FIND_MAXCSN:
630		cf.f_choice = LDAP_FILTER_GE;
631		/* If there are multiple CSNs, use the one with our serverID */
632		for ( i=0; i<si->si_numcsns; i++) {
633			if ( slap_serverID == si->si_sids[i] ) {
634				maxid = i;
635				break;
636			}
637		}
638		if ( i == si->si_numcsns ) {
639			/* No match: this is multimaster, and none of the content in the DB
640			 * originated locally. Treat like no CSN.
641			 */
642			return LDAP_NO_SUCH_OBJECT;
643		}
644		cf.f_av_value = si->si_ctxcsn[maxid];
645		fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ),
646			"(entryCSN>=%s)", cf.f_av_value.bv_val );
647		if ( fop.ors_filterstr.bv_len >= sizeof( buf ) ) {
648			return LDAP_OTHER;
649		}
650		fop.ors_attrsonly = 0;
651		fop.ors_attrs = csn_anlist;
652		fop.ors_slimit = SLAP_NO_LIMIT;
653		cb.sc_private = &maxcsn;
654		cb.sc_response = findmax_cb;
655		strcpy( cbuf, cf.f_av_value.bv_val );
656		maxcsn.bv_val = cbuf;
657		maxcsn.bv_len = cf.f_av_value.bv_len;
658		break;
659	case FIND_CSN:
660		if ( BER_BVISEMPTY( &cf.f_av_value )) {
661			cf.f_av_value = srs->sr_state.ctxcsn[0];
662			/* If there are multiple CSNs, use the smallest */
663			for ( i=1; i<srs->sr_state.numcsns; i++ ) {
664				if ( ber_bvcmp( &cf.f_av_value, &srs->sr_state.ctxcsn[i] )
665					> 0 ) {
666					cf.f_av_value = srs->sr_state.ctxcsn[i];
667				}
668			}
669		}
670		/* Look for exact match the first time */
671		if ( findcsn_retry ) {
672			cf.f_choice = LDAP_FILTER_EQUALITY;
673			fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ),
674				"(entryCSN=%s)", cf.f_av_value.bv_val );
675		/* On retry, look for <= */
676		} else {
677			cf.f_choice = LDAP_FILTER_LE;
678			fop.ors_limit = &fc_limits;
679			memset( &fc_limits, 0, sizeof( fc_limits ));
680			fc_limits.lms_s_unchecked = 1;
681			fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ),
682				"(entryCSN<=%s)", cf.f_av_value.bv_val );
683		}
684		if ( fop.ors_filterstr.bv_len >= sizeof( buf ) ) {
685			return LDAP_OTHER;
686		}
687		fop.ors_attrsonly = 1;
688		fop.ors_attrs = slap_anlist_no_attrs;
689		fop.ors_slimit = 1;
690		cb.sc_private = NULL;
691		cb.sc_response = findcsn_cb;
692		break;
693	case FIND_PRESENT:
694		fop.ors_filter = op->ors_filter;
695		fop.ors_filterstr = op->ors_filterstr;
696		fop.ors_attrsonly = 0;
697		fop.ors_attrs = uuid_anlist;
698		fop.ors_slimit = SLAP_NO_LIMIT;
699		cb.sc_private = &pcookie;
700		cb.sc_response = findpres_cb;
701		pcookie.num = 0;
702
703		/* preallocate storage for a full set */
704		pcookie.uuids = op->o_tmpalloc( (SLAP_SYNCUUID_SET_SIZE+1) *
705			sizeof(struct berval) + SLAP_SYNCUUID_SET_SIZE * UUID_LEN,
706			op->o_tmpmemctx );
707		pcookie.last = (char *)(pcookie.uuids + SLAP_SYNCUUID_SET_SIZE+1);
708		pcookie.uuids[0].bv_val = pcookie.last;
709		pcookie.uuids[0].bv_len = UUID_LEN;
710		for (i=1; i<SLAP_SYNCUUID_SET_SIZE; i++) {
711			pcookie.uuids[i].bv_val = pcookie.uuids[i-1].bv_val + UUID_LEN;
712			pcookie.uuids[i].bv_len = UUID_LEN;
713		}
714		break;
715	}
716
717	fop.o_bd->bd_info = (BackendInfo *)on->on_info;
718	fop.o_bd->be_search( &fop, &frs );
719	fop.o_bd->bd_info = (BackendInfo *)on;
720
721	switch( mode ) {
722	case FIND_MAXCSN:
723		if ( ber_bvcmp( &si->si_ctxcsn[maxid], &maxcsn )) {
724#ifdef CHECK_CSN
725			Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax;
726			assert( !syn->ssyn_validate( syn, &maxcsn ));
727#endif
728			ber_bvreplace( &si->si_ctxcsn[maxid], &maxcsn );
729			si->si_numops++;	/* ensure a checkpoint */
730		}
731		break;
732	case FIND_CSN:
733		/* If matching CSN was not found, invalidate the context. */
734		if ( !cb.sc_private ) {
735			/* If we didn't find an exact match, then try for <= */
736			if ( findcsn_retry ) {
737				findcsn_retry = 0;
738				goto again;
739			}
740			rc = LDAP_NO_SUCH_OBJECT;
741		}
742		break;
743	case FIND_PRESENT:
744		op->o_tmpfree( pcookie.uuids, op->o_tmpmemctx );
745		break;
746	}
747
748	return rc;
749}
750
751/* Should find a place to cache these */
752static mutexint *get_mutexint()
753{
754	mutexint *mi = ch_malloc( sizeof( mutexint ));
755	ldap_pvt_thread_mutex_init( &mi->mi_mutex );
756	mi->mi_int = 1;
757	return mi;
758}
759
760static void inc_mutexint( mutexint *mi )
761{
762	ldap_pvt_thread_mutex_lock( &mi->mi_mutex );
763	mi->mi_int++;
764	ldap_pvt_thread_mutex_unlock( &mi->mi_mutex );
765}
766
767/* return resulting counter */
768static int dec_mutexint( mutexint *mi )
769{
770	int i;
771	ldap_pvt_thread_mutex_lock( &mi->mi_mutex );
772	i = --mi->mi_int;
773	ldap_pvt_thread_mutex_unlock( &mi->mi_mutex );
774	if ( !i ) {
775		ldap_pvt_thread_mutex_destroy( &mi->mi_mutex );
776		ch_free( mi );
777	}
778	return i;
779}
780
781static void
782syncprov_free_syncop( syncops *so )
783{
784	syncres *sr, *srnext;
785	GroupAssertion *ga, *gnext;
786
787	ldap_pvt_thread_mutex_lock( &so->s_mutex );
788	if ( --so->s_inuse > 0 ) {
789		ldap_pvt_thread_mutex_unlock( &so->s_mutex );
790		return;
791	}
792	ldap_pvt_thread_mutex_unlock( &so->s_mutex );
793	if ( so->s_flags & PS_IS_DETACHED ) {
794		filter_free( so->s_op->ors_filter );
795		for ( ga = so->s_op->o_groups; ga; ga=gnext ) {
796			gnext = ga->ga_next;
797			ch_free( ga );
798		}
799		ch_free( so->s_op );
800	}
801	ch_free( so->s_base.bv_val );
802	for ( sr=so->s_res; sr; sr=srnext ) {
803		srnext = sr->s_next;
804		if ( sr->s_e ) {
805			if ( !dec_mutexint( sr->s_e->e_private )) {
806				sr->s_e->e_private = NULL;
807				entry_free( sr->s_e );
808			}
809		}
810		ch_free( sr );
811	}
812	ldap_pvt_thread_mutex_destroy( &so->s_mutex );
813	ch_free( so );
814}
815
816/* Send a persistent search response */
817static int
818syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode )
819{
820	slap_overinst *on = opc->son;
821
822	SlapReply rs = { REP_SEARCH };
823	LDAPControl *ctrls[2];
824	struct berval cookie = BER_BVNULL, csns[2];
825	Entry e_uuid = {0};
826	Attribute a_uuid = {0};
827
828	if ( so->s_op->o_abandon )
829		return SLAPD_ABANDON;
830
831	ctrls[1] = NULL;
832	if ( !BER_BVISNULL( &opc->sctxcsn )) {
833		csns[0] = opc->sctxcsn;
834		BER_BVZERO( &csns[1] );
835		slap_compose_sync_cookie( op, &cookie, csns, so->s_rid, slap_serverID ? slap_serverID : -1 );
836	}
837
838#ifdef LDAP_DEBUG
839	if ( so->s_sid > 0 ) {
840		Debug( LDAP_DEBUG_SYNC, "syncprov_sendresp: to=%03x, cookie=%s\n",
841			so->s_sid, cookie.bv_val ? cookie.bv_val : "", 0 );
842	} else {
843		Debug( LDAP_DEBUG_SYNC, "syncprov_sendresp: cookie=%s\n",
844			cookie.bv_val ? cookie.bv_val : "", 0, 0 );
845	}
846#endif
847
848	e_uuid.e_attrs = &a_uuid;
849	a_uuid.a_desc = slap_schema.si_ad_entryUUID;
850	a_uuid.a_nvals = &opc->suuid;
851	rs.sr_err = syncprov_state_ctrl( op, &rs, &e_uuid,
852		mode, ctrls, 0, 1, &cookie );
853	if ( !BER_BVISNULL( &cookie )) {
854		op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
855	}
856
857	rs.sr_ctrls = ctrls;
858	rs.sr_entry = &e_uuid;
859	if ( mode == LDAP_SYNC_ADD || mode == LDAP_SYNC_MODIFY ) {
860		e_uuid = *opc->se;
861		e_uuid.e_private = NULL;
862	}
863
864	switch( mode ) {
865	case LDAP_SYNC_ADD:
866		if ( opc->sreference && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) {
867			rs.sr_ref = get_entry_referrals( op, rs.sr_entry );
868			rs.sr_err = send_search_reference( op, &rs );
869			ber_bvarray_free( rs.sr_ref );
870			break;
871		}
872		/* fallthru */
873	case LDAP_SYNC_MODIFY:
874		rs.sr_attrs = op->ors_attrs;
875		rs.sr_err = send_search_entry( op, &rs );
876		break;
877	case LDAP_SYNC_DELETE:
878		e_uuid.e_attrs = NULL;
879		e_uuid.e_name = opc->sdn;
880		e_uuid.e_nname = opc->sndn;
881		if ( opc->sreference && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) {
882			struct berval bv = BER_BVNULL;
883			rs.sr_ref = &bv;
884			rs.sr_err = send_search_reference( op, &rs );
885		} else {
886			rs.sr_err = send_search_entry( op, &rs );
887		}
888		break;
889	default:
890		assert(0);
891	}
892	/* In case someone else freed it already? */
893	if ( rs.sr_ctrls ) {
894		int i;
895		for ( i=0; rs.sr_ctrls[i]; i++ ) {
896			if ( rs.sr_ctrls[i] == ctrls[0] ) {
897				op->o_tmpfree( ctrls[0]->ldctl_value.bv_val, op->o_tmpmemctx );
898				ctrls[0]->ldctl_value.bv_val = NULL;
899				break;
900			}
901		}
902		rs.sr_ctrls = NULL;
903	}
904
905	return rs.sr_err;
906}
907
908static void
909syncprov_qstart( syncops *so );
910
911/* Play back queued responses */
912static int
913syncprov_qplay( Operation *op, syncops *so )
914{
915	slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key;
916	syncres *sr;
917	Entry *e;
918	opcookie opc;
919	int rc = 0;
920
921	opc.son = on;
922
923	do {
924		ldap_pvt_thread_mutex_lock( &so->s_mutex );
925		sr = so->s_res;
926		if ( sr )
927			so->s_res = sr->s_next;
928		if ( !so->s_res )
929			so->s_restail = NULL;
930		/* Exit loop with mutex held */
931		if ( !sr || so->s_op->o_abandon )
932			break;
933		ldap_pvt_thread_mutex_unlock( &so->s_mutex );
934
935		if ( sr->s_mode == LDAP_SYNC_NEW_COOKIE ) {
936		    SlapReply rs = { REP_INTERMEDIATE };
937
938		    rc = syncprov_sendinfo( op, &rs, LDAP_TAG_SYNC_NEW_COOKIE,
939				&sr->s_csn, 0, NULL, 0 );
940		} else {
941			opc.sdn = sr->s_dn;
942			opc.sndn = sr->s_ndn;
943			opc.suuid = sr->s_uuid;
944			opc.sctxcsn = sr->s_csn;
945			opc.sreference = sr->s_isreference;
946			opc.se = sr->s_e;
947
948			rc = syncprov_sendresp( op, &opc, so, sr->s_mode );
949
950			if ( opc.se ) {
951				if ( !dec_mutexint( opc.se->e_private )) {
952					opc.se->e_private = NULL;
953					entry_free ( opc.se );
954				}
955			}
956		}
957
958		ch_free( sr );
959
960		/* Exit loop with mutex held */
961		ldap_pvt_thread_mutex_lock( &so->s_mutex );
962
963	} while (0);
964
965	/* We now only send one change at a time, to prevent one
966	 * psearch from hogging all the CPU. Resubmit this task if
967	 * there are more responses queued and no errors occurred.
968	 */
969
970	if ( rc == 0 && so->s_res ) {
971		syncprov_qstart( so );
972	} else {
973		so->s_flags ^= PS_TASK_QUEUED;
974	}
975
976	ldap_pvt_thread_mutex_unlock( &so->s_mutex );
977	return rc;
978}
979
980/* task for playing back queued responses */
981static void *
982syncprov_qtask( void *ctx, void *arg )
983{
984	syncops *so = arg;
985	OperationBuffer opbuf;
986	Operation *op;
987	BackendDB be;
988	int rc;
989
990	op = &opbuf.ob_op;
991	*op = *so->s_op;
992	op->o_hdr = &opbuf.ob_hdr;
993	op->o_controls = opbuf.ob_controls;
994	memset( op->o_controls, 0, sizeof(opbuf.ob_controls) );
995
996	*op->o_hdr = *so->s_op->o_hdr;
997
998	op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx, 1);
999	op->o_tmpmfuncs = &slap_sl_mfuncs;
1000	op->o_threadctx = ctx;
1001
1002	/* syncprov_qplay expects a fake db */
1003	be = *so->s_op->o_bd;
1004	be.be_flags |= SLAP_DBFLAG_OVERLAY;
1005	op->o_bd = &be;
1006	LDAP_SLIST_FIRST(&op->o_extra) = NULL;
1007	op->o_callback = NULL;
1008
1009	rc = syncprov_qplay( op, so );
1010
1011	/* decrement use count... */
1012	syncprov_free_syncop( so );
1013
1014	return NULL;
1015}
1016
1017/* Start the task to play back queued psearch responses */
1018static void
1019syncprov_qstart( syncops *so )
1020{
1021	so->s_flags |= PS_TASK_QUEUED;
1022	so->s_inuse++;
1023	ldap_pvt_thread_pool_submit( &connection_pool,
1024		syncprov_qtask, so );
1025}
1026
1027/* Queue a persistent search response */
1028static int
1029syncprov_qresp( opcookie *opc, syncops *so, int mode )
1030{
1031	syncres *sr;
1032	int srsize;
1033	struct berval cookie = opc->sctxcsn;
1034
1035	if ( mode == LDAP_SYNC_NEW_COOKIE ) {
1036		syncprov_info_t	*si = opc->son->on_bi.bi_private;
1037
1038		slap_compose_sync_cookie( NULL, &cookie, si->si_ctxcsn,
1039			so->s_rid, slap_serverID ? slap_serverID : -1);
1040	}
1041
1042	srsize = sizeof(syncres) + opc->suuid.bv_len + 1 +
1043		opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1;
1044	if ( cookie.bv_len )
1045		srsize += cookie.bv_len + 1;
1046	sr = ch_malloc( srsize );
1047	sr->s_next = NULL;
1048	sr->s_e = opc->se;
1049	/* bump refcount on this entry */
1050	if ( opc->se )
1051		inc_mutexint( opc->se->e_private );
1052	sr->s_dn.bv_val = (char *)(sr + 1);
1053	sr->s_dn.bv_len = opc->sdn.bv_len;
1054	sr->s_mode = mode;
1055	sr->s_isreference = opc->sreference;
1056	sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val,
1057		 opc->sdn.bv_val ) + 1;
1058	sr->s_ndn.bv_len = opc->sndn.bv_len;
1059	sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val,
1060		 opc->sndn.bv_val ) + 1;
1061	sr->s_uuid.bv_len = opc->suuid.bv_len;
1062	AC_MEMCPY( sr->s_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len );
1063	if ( cookie.bv_len ) {
1064		sr->s_csn.bv_val = sr->s_uuid.bv_val + sr->s_uuid.bv_len + 1;
1065		strcpy( sr->s_csn.bv_val, cookie.bv_val );
1066	} else {
1067		sr->s_csn.bv_val = NULL;
1068	}
1069	sr->s_csn.bv_len = cookie.bv_len;
1070
1071	if ( mode == LDAP_SYNC_NEW_COOKIE && cookie.bv_val ) {
1072		ch_free( cookie.bv_val );
1073	}
1074
1075	ldap_pvt_thread_mutex_lock( &so->s_mutex );
1076	if ( !so->s_res ) {
1077		so->s_res = sr;
1078	} else {
1079		so->s_restail->s_next = sr;
1080	}
1081	so->s_restail = sr;
1082
1083	/* If the base of the psearch was modified, check it next time round */
1084	if ( so->s_flags & PS_WROTE_BASE ) {
1085		so->s_flags ^= PS_WROTE_BASE;
1086		so->s_flags |= PS_FIND_BASE;
1087	}
1088	if (( so->s_flags & (PS_IS_DETACHED|PS_TASK_QUEUED)) == PS_IS_DETACHED ) {
1089		syncprov_qstart( so );
1090	}
1091	ldap_pvt_thread_mutex_unlock( &so->s_mutex );
1092	return LDAP_SUCCESS;
1093}
1094
1095static int
1096syncprov_drop_psearch( syncops *so, int lock )
1097{
1098	if ( so->s_flags & PS_IS_DETACHED ) {
1099		if ( lock )
1100			ldap_pvt_thread_mutex_lock( &so->s_op->o_conn->c_mutex );
1101		so->s_op->o_conn->c_n_ops_executing--;
1102		so->s_op->o_conn->c_n_ops_completed++;
1103		LDAP_STAILQ_REMOVE( &so->s_op->o_conn->c_ops, so->s_op, Operation,
1104			o_next );
1105		if ( lock )
1106			ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex );
1107	}
1108	syncprov_free_syncop( so );
1109
1110	return 0;
1111}
1112
1113static int
1114syncprov_ab_cleanup( Operation *op, SlapReply *rs )
1115{
1116	slap_callback *sc = op->o_callback;
1117	op->o_callback = sc->sc_next;
1118	syncprov_drop_psearch( op->o_callback->sc_private, 0 );
1119	op->o_tmpfree( sc, op->o_tmpmemctx );
1120	return 0;
1121}
1122
1123static int
1124syncprov_op_abandon( Operation *op, SlapReply *rs )
1125{
1126	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
1127	syncprov_info_t		*si = on->on_bi.bi_private;
1128	syncops *so, *soprev;
1129
1130	ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
1131	for ( so=si->si_ops, soprev = (syncops *)&si->si_ops; so;
1132		soprev=so, so=so->s_next ) {
1133		if ( so->s_op->o_connid == op->o_connid &&
1134			so->s_op->o_msgid == op->orn_msgid ) {
1135				so->s_op->o_abandon = 1;
1136				soprev->s_next = so->s_next;
1137				break;
1138		}
1139	}
1140	ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
1141	if ( so ) {
1142		/* Is this really a Cancel exop? */
1143		if ( op->o_tag != LDAP_REQ_ABANDON ) {
1144			so->s_op->o_cancel = SLAP_CANCEL_ACK;
1145			rs->sr_err = LDAP_CANCELLED;
1146			send_ldap_result( so->s_op, rs );
1147			if ( so->s_flags & PS_IS_DETACHED ) {
1148				slap_callback *cb;
1149				cb = op->o_tmpcalloc( 1, sizeof(slap_callback), op->o_tmpmemctx );
1150				cb->sc_cleanup = syncprov_ab_cleanup;
1151				cb->sc_next = op->o_callback;
1152				cb->sc_private = so;
1153				return SLAP_CB_CONTINUE;
1154			}
1155		}
1156		syncprov_drop_psearch( so, 0 );
1157	}
1158	return SLAP_CB_CONTINUE;
1159}
1160
1161/* Find which persistent searches are affected by this operation.
 *
 * op     - the write operation being processed
 * opc    - per-operation cookie; its sdn/sndn/suuid/sreference/se and
 *          smatches fields are filled in and/or consumed here
 * saveit - nonzero: remember every matching psearch in opc->smatches
 *          (taking a reference via s_inuse) without sending anything;
 *          zero: queue a response to each psearch with syncprov_qresp()
 *
 * op->o_bd may be pointed at a local copy while this runs; it is
 * restored to its entry value before returning.
 */
1162static void
1163syncprov_matchops( Operation *op, opcookie *opc, int saveit )
1164{
1165	slap_overinst *on = opc->son;
1166	syncprov_info_t		*si = on->on_bi.bi_private;
1167
1168	fbase_cookie fc;
1169	syncops *ss, *sprev, *snext;
1170	Entry *e = NULL;
1171	Attribute *a;
1172	int rc;
1173	struct berval newdn;
1174	int freefdn = 0;
1175	BackendDB *b0 = op->o_bd, db;
1176
1177	fc.fdn = &op->o_req_ndn;
1178	/* compute new DN */
1179	if ( op->o_tag == LDAP_REQ_MODRDN && !saveit ) {
1180		struct berval pdn;
1181		if ( op->orr_nnewSup ) pdn = *op->orr_nnewSup;
1182		else dnParent( fc.fdn, &pdn );
1183		build_new_dn( &newdn, &pdn, &op->orr_nnewrdn, op->o_tmpmemctx );
1184		fc.fdn = &newdn;
		/* newdn was allocated from tmp memory; freed at the end */
1185		freefdn = 1;
1186	}
	/* Anything but an Add: fetch the target entry by (possibly new) DN */
1187	if ( op->o_tag != LDAP_REQ_ADD ) {
		/* Use a local copy of the BackendDB when o_bd is not an overlay db */
1188		if ( !SLAP_ISOVERLAY( op->o_bd )) {
1189			db = *op->o_bd;
1190			op->o_bd = &db;
1191		}
1192		rc = overlay_entry_get_ov( op, fc.fdn, NULL, NULL, 0, &e, on );
1193		/* If we're sending responses now, make a copy and unlock the DB */
1194		if ( e && !saveit ) {
1195			if ( !opc->se ) {
1196				opc->se = entry_dup( e );
				/* e_private of the copy holds its reference counter */
1197				opc->se->e_private = get_mutexint();
1198			}
1199			overlay_entry_release_ov( op, e, 0, on );
1200			e = opc->se;
1201		}
		/* Lookup failed: restore o_bd and give up */
1202		if ( rc ) {
1203			op->o_bd = b0;
1204			return;
1205		}
1206	} else {
		/* Add: the entry comes with the request */
1207		e = op->ora_e;
1208		if ( !saveit ) {
1209			if ( !opc->se ) {
1210				opc->se = entry_dup( e );
1211				opc->se->e_private = get_mutexint();
1212			}
1213			e = opc->se;
1214		}
1215	}
1216
	/* Record the entry's DNs, referral flag and entryUUID in the cookie */
1217	if ( saveit || op->o_tag == LDAP_REQ_ADD ) {
1218		ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx );
1219		ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx );
1220		opc->sreference = is_entry_referral( e );
1221		a = attr_find( e->e_attrs, slap_schema.si_ad_entryUUID );
1222		if ( a )
1223			ber_dupbv_x( &opc->suuid, &a->a_nvals[0], op->o_tmpmemctx );
1224	} else if ( op->o_tag == LDAP_REQ_MODRDN && !saveit ) {
		/* Replace the DNs already held in the cookie with those of the
		 * entry fetched under its new DN
		 */
1225		op->o_tmpfree( opc->sndn.bv_val, op->o_tmpmemctx );
1226		op->o_tmpfree( opc->sdn.bv_val, op->o_tmpmemctx );
1227		ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx );
1228		ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx );
1229	}
1230
1231	ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
	/* sprev starts out aliasing the list head pointer, so that
	 * sprev->s_next = ... also works for the first element; this relies
	 * on s_next being the first member of syncops.
	 */
1232	for (ss = si->si_ops, sprev = (syncops *)&si->si_ops; ss;
1233		sprev = ss, ss=snext)
1234	{
1235		Operation op2;
1236		Opheader oh;
1237		syncmatches *sm;
1238		int found = 0;
1239
		/* Fetch the successor now; ss may be dropped below */
1240		snext = ss->s_next;
1241		if ( ss->s_op->o_abandon )
1242			continue;
1243
1244		/* Don't send ops back to the originator */
1245		if ( opc->osid > 0 && opc->osid == ss->s_sid ) {
1246			Debug( LDAP_DEBUG_SYNC, "syncprov_matchops: skipping original sid %03x\n",
1247				opc->osid, 0, 0 );
1248			continue;
1249		}
1250
1251		/* Don't send ops back to the messenger */
1252		if ( opc->rsid > 0 && opc->rsid == ss->s_sid ) {
1253			Debug( LDAP_DEBUG_SYNC, "syncprov_matchops: skipping relayed sid %03x\n",
1254				opc->rsid, 0, 0 );
1255			continue;
1256		}
1257
1258		/* validate base */
1259		fc.fss = ss;
1260		fc.fbase = 0;
1261		fc.fscope = 0;
1262
1263		/* If the base of the search is missing, signal a refresh */
1264		rc = syncprov_findbase( op, &fc );
1265		if ( rc != LDAP_SUCCESS ) {
1266			SlapReply rs = {REP_RESULT};
1267			send_ldap_error( ss->s_op, &rs, LDAP_SYNC_REFRESH_REQUIRED,
1268				"search base has changed" );
			/* Unlink ss from the list and drop it */
1269			sprev->s_next = snext;
1270			syncprov_drop_psearch( ss, 1 );
			/* so that the loop increment leaves sprev unchanged */
1271			ss = sprev;
1272			continue;
1273		}
1274
1275		/* If we're sending results now, look for this op in old matches */
1276		if ( !saveit ) {
1277			syncmatches *old;
1278
1279			/* Did we modify the search base? */
1280			if ( dn_match( &op->o_req_ndn, &ss->s_base )) {
1281				ldap_pvt_thread_mutex_lock( &ss->s_mutex );
1282				ss->s_flags |= PS_WROTE_BASE;
1283				ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
1284			}
1285
			/* A hit means this psearch matched on the saveit pass;
			 * the match record is unlinked and freed.
			 */
1286			for ( sm=opc->smatches, old=(syncmatches *)&opc->smatches; sm;
1287				old=sm, sm=sm->sm_next ) {
1288				if ( sm->sm_op == ss ) {
1289					found = 1;
1290					old->sm_next = sm->sm_next;
1291					op->o_tmpfree( sm, op->o_tmpmemctx );
1292					break;
1293				}
1294			}
1295		}
1296
		/* Entry is in scope: test it against the psearch filter using a
		 * temporary op built from the psearch's op, with this op's
		 * header (but the psearch's connection) and no callbacks.
		 */
1297		if ( fc.fscope ) {
1298			op2 = *ss->s_op;
1299			oh = *op->o_hdr;
1300			oh.oh_conn = ss->s_op->o_conn;
1301			oh.oh_connid = ss->s_op->o_connid;
1302			op2.o_bd = op->o_bd->bd_self;
1303			op2.o_hdr = &oh;
1304			op2.o_extra = op->o_extra;
1305			op2.o_callback = NULL;
1306			rc = test_filter( &op2, e, ss->s_op->ors_filter );
1307		}
1308
1309		Debug( LDAP_DEBUG_TRACE, "syncprov_matchops: sid %03x fscope %d rc %d\n",
1310			ss->s_sid, fc.fscope, rc );
1311
1312		/* check if current o_req_dn is in scope and matches filter */
1313		if ( fc.fscope && rc == LDAP_COMPARE_TRUE ) {
1314			if ( saveit ) {
				/* Remember the match and hold a reference on the psearch */
1315				sm = op->o_tmpalloc( sizeof(syncmatches), op->o_tmpmemctx );
1316				sm->sm_next = opc->smatches;
1317				sm->sm_op = ss;
1318				ldap_pvt_thread_mutex_lock( &ss->s_mutex );
1319				++ss->s_inuse;
1320				ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
1321				opc->smatches = sm;
1322			} else {
1323				/* if found send UPDATE else send ADD */
1324				syncprov_qresp( opc, ss,
1325					found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD );
1326			}
1327		} else if ( !saveit && found ) {
1328			/* send DELETE */
1329			syncprov_qresp( opc, ss, LDAP_SYNC_DELETE );
1330		} else if ( !saveit ) {
			/* Neither matched before nor now: only queue a new cookie */
1331			syncprov_qresp( opc, ss, LDAP_SYNC_NEW_COOKIE );
1332		}
1333		if ( !saveit && found ) {
1334			/* Decrement s_inuse, was incremented when called
1335			 * with saveit == TRUE
1336			 */
1337			syncprov_free_syncop( ss );
1338		}
1339	}
1340	ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
1341
1342	if ( op->o_tag != LDAP_REQ_ADD && e ) {
1343		if ( !SLAP_ISOVERLAY( op->o_bd )) {
1344			op->o_bd = &db;
1345		}
		/* With !saveit the DB entry was already released above and
		 * e is the private copy in opc->se.
		 */
1346		if ( saveit )
1347			overlay_entry_release_ov( op, e, 0, on );
1348		op->o_bd = b0;
1349	}
	/* Drop this function's reference on the copied entry; free the
	 * copy once dec_mutexint() reports zero.
	 */
1350	if ( opc->se && !saveit ) {
1351		if ( !dec_mutexint( opc->se->e_private )) {
1352			opc->se->e_private = NULL;
1353			entry_free( opc->se );
1354			opc->se = NULL;
1355		}
1356	}
1357	if ( freefdn ) {
1358		op->o_tmpfree( fc.fdn->bv_val, op->o_tmpmemctx );
1359	}
1360	op->o_bd = b0;
1361}
1362
1363static int
1364syncprov_op_cleanup( Operation *op, SlapReply *rs )
1365{
1366	slap_callback *cb = op->o_callback;
1367	opcookie *opc = cb->sc_private;
1368	slap_overinst *on = opc->son;
1369	syncprov_info_t		*si = on->on_bi.bi_private;
1370	syncmatches *sm, *snext;
1371	modtarget *mt, mtdummy;
1372
1373	for (sm = opc->smatches; sm; sm=snext) {
1374		snext = sm->sm_next;
1375		syncprov_free_syncop( sm->sm_op );
1376		op->o_tmpfree( sm, op->o_tmpmemctx );
1377	}
1378
1379	/* Remove op from lock table */
1380	mt = opc->smt;
1381	if ( mt ) {
1382		ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
1383		mt->mt_mods = mt->mt_mods->mi_next;
1384		/* If there are more, promote the next one */
1385		if ( mt->mt_mods ) {
1386			mt->mt_op = mt->mt_mods->mi_op;
1387			ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
1388		} else {
1389			ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
1390			ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
1391			avl_delete( &si->si_mods, mt, sp_avl_cmp );
1392			ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
1393			ldap_pvt_thread_mutex_destroy( &mt->mt_mutex );
1394			ch_free( mt );
1395		}
1396	}
1397	if ( !BER_BVISNULL( &opc->suuid ))
1398		op->o_tmpfree( opc->suuid.bv_val, op->o_tmpmemctx );
1399	if ( !BER_BVISNULL( &opc->sndn ))
1400		op->o_tmpfree( opc->sndn.bv_val, op->o_tmpmemctx );
1401	if ( !BER_BVISNULL( &opc->sdn ))
1402		op->o_tmpfree( opc->sdn.bv_val, op->o_tmpmemctx );
1403	op->o_callback = cb->sc_next;
1404	op->o_tmpfree(cb, op->o_tmpmemctx);
1405
1406	return 0;
1407}
1408
/* Write the in-memory contextCSN values to the context entry.
 *
 * An internal Modify (replace contextCSN with si->si_ctxcsn) is issued
 * against si->si_contextdn, using a stack copy of `op` and a null
 * response callback.  If that fails with LDAP_NO_SUCH_OBJECT and the
 * database satisfies SLAP_SYNC_SUBENTRY(), a context CSN entry is
 * created and Added instead.
 *
 * This function only reads si->si_ctxcsn / si->si_numcsns; it takes no
 * lock itself.
 *
 * NOTE(review): opm.o_bd->bd_info is overwritten below and not restored
 * here.  Unless the glue-subordinate branch substituted the local `be`,
 * opm.o_bd is the caller's op->o_bd -- confirm callers expect that.
 */
1409static void
1410syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on )
1411{
1412	syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
1413	Modifications mod;
1414	Operation opm;
1415	SlapReply rsm = { 0 };
1416	slap_callback cb = {0};
1417	BackendDB be;
1418
1419#ifdef CHECK_CSN
1420	Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax;
1421
1422	int i;
	/* Debug builds: every stored CSN must pass syntax validation */
1423	for ( i=0; i<si->si_numcsns; i++ ) {
1424		assert( !syn->ssyn_validate( syn, si->si_ctxcsn+i ));
1425	}
1426#endif
	/* A single replace of contextCSN; the values are borrowed from si,
	 * not copied.
	 */
1427	mod.sml_numvals = si->si_numcsns;
1428	mod.sml_values = si->si_ctxcsn;
1429	mod.sml_nvalues = NULL;
1430	mod.sml_desc = slap_schema.si_ad_contextCSN;
1431	mod.sml_op = LDAP_MOD_REPLACE;
1432	mod.sml_flags = SLAP_MOD_INTERNAL;
1433	mod.sml_next = NULL;
1434
1435	cb.sc_response = slap_null_cb;
	/* Shallow copy of the caller's op, retargeted as a Modify */
1436	opm = *op;
1437	opm.o_tag = LDAP_REQ_MODIFY;
1438	opm.o_callback = &cb;
1439	opm.orm_modlist = &mod;
1440	opm.orm_no_opattrs = 1;
	/* For a glue subordinate, operate on a local copy of the original db */
1441	if ( SLAP_GLUE_SUBORDINATE( op->o_bd )) {
1442		be = *on->on_info->oi_origdb;
1443		opm.o_bd = &be;
1444	}
1445	opm.o_req_dn = si->si_contextdn;
1446	opm.o_req_ndn = si->si_contextdn;
1447	opm.o_bd->bd_info = on->on_info->oi_orig;
1448	opm.o_managedsait = SLAP_CONTROL_NONCRITICAL;
1449	opm.o_no_schema_check = 1;
1450	opm.o_bd->be_modify( &opm, &rsm );
1451
	/* Context entry does not exist yet: build it from `mod` and Add it */
1452	if ( rsm.sr_err == LDAP_NO_SUCH_OBJECT &&
1453		SLAP_SYNC_SUBENTRY( opm.o_bd )) {
1454		const char	*text;
1455		char txtbuf[SLAP_TEXT_BUFLEN];
1456		size_t textlen = sizeof txtbuf;
1457		Entry *e = slap_create_context_csn_entry( opm.o_bd, NULL );
1458		slap_mods2entry( &mod, &e, 0, 1, &text, txtbuf, textlen);
1459		opm.ora_e = e;
1460		opm.o_bd->be_add( &opm, &rsm );
		/* ora_e still points at our entry after be_add: release it here
		 * (presumably the backend replaces ora_e when it keeps the entry
		 * -- TODO confirm)
		 */
1461		if ( e == opm.ora_e )
1462			be_entry_release_w( &opm, opm.ora_e );
1463	}
1464
	/* Free any mods that got chained after our stack-allocated one */
1465	if ( mod.sml_next != NULL ) {
1466		slap_mods_free( mod.sml_next, 1 );
1467	}
1468#ifdef CHECK_CSN
1469	for ( i=0; i<si->si_numcsns; i++ ) {
1470		assert( !syn->ssyn_validate( syn, si->si_ctxcsn+i ));
1471	}
1472#endif
1473}
1474
1475static void
1476syncprov_add_slog( Operation *op )
1477{
1478	opcookie *opc = op->o_callback->sc_private;
1479	slap_overinst *on = opc->son;
1480	syncprov_info_t		*si = on->on_bi.bi_private;
1481	sessionlog *sl;
1482	slog_entry *se;
1483
1484	sl = si->si_logs;
1485	{
1486		/* Allocate a record. UUIDs are not NUL-terminated. */
1487		se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len +
1488			op->o_csn.bv_len + 1 );
1489		se->se_next = NULL;
1490		se->se_tag = op->o_tag;
1491
1492		se->se_uuid.bv_val = (char *)(&se[1]);
1493		AC_MEMCPY( se->se_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len );
1494		se->se_uuid.bv_len = opc->suuid.bv_len;
1495
1496		se->se_csn.bv_val = se->se_uuid.bv_val + opc->suuid.bv_len;
1497		AC_MEMCPY( se->se_csn.bv_val, op->o_csn.bv_val, op->o_csn.bv_len );
1498		se->se_csn.bv_val[op->o_csn.bv_len] = '\0';
1499		se->se_csn.bv_len = op->o_csn.bv_len;
1500		se->se_sid = slap_parse_csn_sid( &se->se_csn );
1501
1502		ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
1503		if ( sl->sl_head ) {
1504			sl->sl_tail->se_next = se;
1505		} else {
1506			sl->sl_head = se;
1507		}
1508		sl->sl_tail = se;
1509		sl->sl_num++;
1510		while ( sl->sl_num > sl->sl_size ) {
1511			se = sl->sl_head;
1512			sl->sl_head = se->se_next;
1513			strcpy( sl->sl_mincsn.bv_val, se->se_csn.bv_val );
1514			sl->sl_mincsn.bv_len = se->se_csn.bv_len;
1515			ch_free( se );
1516			sl->sl_num--;
1517		}
1518		ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
1519	}
1520}
1521
1522/* Just set a flag if we found the matching entry */
1523static int
1524playlog_cb( Operation *op, SlapReply *rs )
1525{
1526	if ( rs->sr_type == REP_SEARCH ) {
1527		op->o_callback->sc_private = (void *)1;
1528	}
1529	return rs->sr_err;
1530}
1531
1532/* enter with sl->sl_mutex locked, release before returning
 *
 * Replay the session log for a consumer.
 *
 * op, rs  - the consumer's search operation and reply
 * sl      - the session log (sl_mutex held by the caller on entry)
 * srs     - the consumer's sync control; sr_state carries its cookie
 * ctxcsn, numcsns, sids - a set of CSNs and matching SIDs; log records
 *           newer than the CSN for their SID end the scan ("too new")
 *
 * UUIDs of the relevant log records are collected into one array:
 * Deletes from the front, everything else from the back.  Each
 * non-delete UUID is then searched for (entryUUID equality ANDed with
 * the consumer's filter); those that return no entry are appended to
 * the delete set.  If the delete set is non-empty it is sent with
 * syncprov_sendinfo() as LDAP_TAG_SYNC_ID_SET.
 */
1533static void
1534syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
1535	sync_control *srs, BerVarray ctxcsn, int numcsns, int *sids )
1536{
1537	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
1538	slog_entry *se;
1539	int i, j, ndel, num, nmods, mmods;
1540	char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
1541	BerVarray uuids;
1542	struct berval delcsn[2];
1543
	/* Empty log: nothing to do */
1544	if ( !sl->sl_num ) {
1545		ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
1546		return;
1547	}
1548
1549	num = sl->sl_num;
1550	i = 0;
1551	nmods = 0;
1552
	/* One chunk: num+1 bervals followed by room for num raw UUIDs;
	 * uuids[0].bv_val marks the start of the UUID storage.
	 */
1553	uuids = op->o_tmpalloc( (num+1) * sizeof( struct berval ) +
1554		num * UUID_LEN, op->o_tmpmemctx );
1555	uuids[0].bv_val = (char *)(uuids + num + 1);
1556
	/* delcsn[0] receives the CSN of the last Delete copied (in cbuf);
	 * delcsn[1] is zeroed.
	 */
1557	delcsn[0].bv_len = 0;
1558	delcsn[0].bv_val = cbuf;
1559	BER_BVZERO(&delcsn[1]);
1560
1561	/* Make a copy of the relevant UUIDs. Put the Deletes up front
1562	 * and everything else at the end. Do this first so we can
1563	 * unlock the list mutex.
1564	 */
1565	Debug( LDAP_DEBUG_SYNC, "srs csn %s\n",
1566		srs->sr_state.ctxcsn[0].bv_val, 0, 0 );
1567	for ( se=sl->sl_head; se; se=se->se_next ) {
1568		int k;
1569		Debug( LDAP_DEBUG_SYNC, "log csn %s\n", se->se_csn.bv_val, 0, 0 );
		/* ndel doubles as a comparison result here.  Default 1: a
		 * record whose SID is absent from the consumer's cookie is
		 * not treated as too old.
		 */
1570		ndel = 1;
1571		for ( k=0; k<srs->sr_state.numcsns; k++ ) {
1572			if ( se->se_sid == srs->sr_state.sids[k] ) {
1573				ndel = ber_bvcmp( &se->se_csn, &srs->sr_state.ctxcsn[k] );
1574				break;
1575			}
1576		}
1577		if ( ndel <= 0 ) {
1578			Debug( LDAP_DEBUG_SYNC, "cmp %d, too old\n", ndel, 0, 0 );
1579			continue;
1580		}
		/* Default 0: a record whose SID is absent from sids[] is not
		 * treated as too new.
		 */
1581		ndel = 0;
1582		for ( k=0; k<numcsns; k++ ) {
1583			if ( se->se_sid == sids[k] ) {
1584				ndel = ber_bvcmp( &se->se_csn, &ctxcsn[k] );
1585				break;
1586			}
1587		}
		/* The first too-new record ends the scan */
1588		if ( ndel > 0 ) {
1589			Debug( LDAP_DEBUG_SYNC, "cmp %d, too new\n", ndel, 0, 0 );
1590			break;
1591		}
1592		if ( se->se_tag == LDAP_REQ_DELETE ) {
			/* Deletes fill slots 0, 1, 2, ... */
1593			j = i;
1594			i++;
1595			AC_MEMCPY( cbuf, se->se_csn.bv_val, se->se_csn.bv_len );
1596			delcsn[0].bv_len = se->se_csn.bv_len;
1597			delcsn[0].bv_val[delcsn[0].bv_len] = '\0';
1598		} else {
			/* Everything else fills slots num-1, num-2, ... */
1599			nmods++;
1600			j = num - nmods;
1601		}
1602		uuids[j].bv_val = uuids[0].bv_val + (j * UUID_LEN);
1603		AC_MEMCPY(uuids[j].bv_val, se->se_uuid.bv_val, UUID_LEN);
1604		uuids[j].bv_len = UUID_LEN;
1605	}
1606	ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
1607
	/* From here on ndel is the number of UUIDs in the delete set */
1608	ndel = i;
1609
1610	/* Zero out unused slots */
1611	for ( i=ndel; i < num - nmods; i++ )
1612		uuids[i].bv_len = 0;
1613
1614	/* Mods must be validated to see if they belong in this delete set.
1615	 */
1616
1617	mmods = nmods;
1618	/* Strip any duplicates */
1619	for ( i=0; i<nmods; i++ ) {
		/* Already present in the delete set? */
1620		for ( j=0; j<ndel; j++ ) {
1621			if ( bvmatch( &uuids[j], &uuids[num - 1 - i] )) {
1622				uuids[num - 1 - i].bv_len = 0;
1623				mmods --;
1624				break;
1625			}
1626		}
1627		if ( uuids[num - 1 - i].bv_len == 0 ) continue;
		/* Same UUID as one of the mod slots examined before it? */
1628		for ( j=0; j<i; j++ ) {
1629			if ( bvmatch( &uuids[num - 1 - j], &uuids[num - 1 - i] )) {
1630				uuids[num - 1 - i].bv_len = 0;
1631				mmods --;
1632				break;
1633			}
1634		}
1635	}
1636
	/* Some distinct non-delete UUIDs remain: search for each one */
1637	if ( mmods ) {
1638		Operation fop;
1639		SlapReply frs = { REP_RESULT };
1640		int rc;
1641		Filter mf, af;
1642		AttributeAssertion eq = ATTRIBUTEASSERTION_INIT;
1643		slap_callback cb = {0};
1644
		/* Shallow copy: fop.o_bd is the same BackendDB as op->o_bd */
1645		fop = *op;
1646
1647		fop.o_sync_mode = 0;
1648		fop.o_callback = &cb;
1649		fop.ors_limit = NULL;
1650		fop.ors_tlimit = SLAP_NO_LIMIT;
1651		fop.ors_attrs = slap_anlist_all_attributes;
1652		fop.ors_attrsonly = 0;
1653		fop.o_managedsait = SLAP_CONTROL_CRITICAL;
1654
		/* Filter: (&(entryUUID=<uuid>)<original search filter>) */
1655		af.f_choice = LDAP_FILTER_AND;
1656		af.f_next = NULL;
1657		af.f_and = &mf;
1658		mf.f_choice = LDAP_FILTER_EQUALITY;
1659		mf.f_ava = &eq;
1660		mf.f_av_desc = slap_schema.si_ad_entryUUID;
1661		mf.f_next = fop.ors_filter;
1662
1663		fop.ors_filter = &af;
1664
1665		cb.sc_response = playlog_cb;
		/* bd_info is switched for the searches and set back to `on`
		 * after the loop.
		 */
1666		fop.o_bd->bd_info = (BackendInfo *)on->on_info;
1667
1668		for ( i=ndel; i<num; i++ ) {
1669			if ( uuids[i].bv_len == 0 ) continue;
1670
1671			mf.f_av_value = uuids[i];
			/* playlog_cb sets sc_private non-NULL when an entry comes back */
1672			cb.sc_private = NULL;
1673			fop.ors_slimit = 1;
1674			frs.sr_nentries = 0;
1675			rc = fop.o_bd->be_search( &fop, &frs );
1676
1677			/* If entry was not found, add to delete list */
1678			if ( !cb.sc_private ) {
1679				uuids[ndel++] = uuids[i];
1680			}
1681		}
1682		fop.o_bd->bd_info = (BackendInfo *)on;
1683	}
1684	if ( ndel ) {
1685		struct berval cookie;
1686
		/* A cookie is only composed when a Delete record supplied a CSN */
1687		if ( delcsn[0].bv_len ) {
1688			slap_compose_sync_cookie( op, &cookie, delcsn, srs->sr_state.rid,
1689				slap_serverID ? slap_serverID : -1 );
1690
1691			Debug( LDAP_DEBUG_SYNC, "syncprov_playlog: cookie=%s\n", cookie.bv_val, 0, 0 );
1692		}
1693
		/* Terminate the UUID list with a NULL bv_val */
1694		uuids[ndel].bv_val = NULL;
1695		syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET,
1696			delcsn[0].bv_len ? &cookie : NULL, 0, uuids, 1 );
1697		if ( delcsn[0].bv_len ) {
1698			op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
1699		}
1700	}
1701	op->o_tmpfree( uuids, op->o_tmpmemctx );
1702}
1703
/* Response callback for write operations (installed by
 * syncprov_op_mod()).
 *
 * Only acts when the operation succeeded.  In that case, with
 * si_resp_mutex held throughout, it
 *  - merges the op's committed CSN into the in-memory contextCSN set
 *    (si_ctxcsn / si_sids, under the si_csn_rwlock write lock),
 *  - for ops flagged o_dont_replicate, only handles a contextCSN
 *    replace (merging every value and queueing new cookies to the
 *    saved matches) and then leaves,
 *  - otherwise counts the op, checkpoints if the op-count or time
 *    threshold is reached, notifies persistent searches, and appends a
 *    session-log record for non-Add ops.
 *
 * Always returns SLAP_CB_CONTINUE.
 */
1704static int
1705syncprov_op_response( Operation *op, SlapReply *rs )
1706{
1707	opcookie *opc = op->o_callback->sc_private;
1708	slap_overinst *on = opc->son;
1709	syncprov_info_t		*si = on->on_bi.bi_private;
1710	syncmatches *sm;
1711
1712	if ( rs->sr_err == LDAP_SUCCESS )
1713	{
1714		struct berval maxcsn;
1715		char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
1716		int do_check = 0, have_psearches, foundit, csn_changed = 0;
1717
		/* Held until the `leave` label at the end of this block */
1718		ldap_pvt_thread_mutex_lock( &si->si_resp_mutex );
1719
1720		/* Update our context CSN */
1721		cbuf[0] = '\0';
1722		maxcsn.bv_val = cbuf;
1723		maxcsn.bv_len = sizeof(cbuf);
1724		ldap_pvt_thread_rdwr_wlock( &si->si_csn_rwlock );
1725
1726		slap_get_commit_csn( op, &maxcsn, &foundit );
1727		if ( BER_BVISEMPTY( &maxcsn ) && SLAP_GLUE_SUBORDINATE( op->o_bd )) {
1728			/* syncrepl queues the CSN values in the db where
1729			 * it is configured , not where the changes are made.
1730			 * So look for a value in the glue db if we didn't
1731			 * find any in this db.
1732			 */
1733			BackendDB *be = op->o_bd;
1734			op->o_bd = select_backend( &be->be_nsuffix[0], 1);
1735			maxcsn.bv_val = cbuf;
1736			maxcsn.bv_len = sizeof(cbuf);
1737			slap_get_commit_csn( op, &maxcsn, &foundit );
1738			op->o_bd = be;
1739		}
1740		if ( !BER_BVISEMPTY( &maxcsn ) ) {
1741			int i, sid;
1742#ifdef CHECK_CSN
1743			Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax;
1744			assert( !syn->ssyn_validate( syn, &maxcsn ));
1745#endif
			/* Known SID: keep the greater CSN */
1746			sid = slap_parse_csn_sid( &maxcsn );
1747			for ( i=0; i<si->si_numcsns; i++ ) {
1748				if ( sid == si->si_sids[i] ) {
1749					if ( ber_bvcmp( &maxcsn, &si->si_ctxcsn[i] ) > 0 ) {
1750						ber_bvreplace( &si->si_ctxcsn[i], &maxcsn );
1751						csn_changed = 1;
1752					}
1753					break;
1754				}
1755			}
1756			/* It's a new SID for us */
1757			if ( i == si->si_numcsns ) {
1758				value_add_one( &si->si_ctxcsn, &maxcsn );
1759				csn_changed = 1;
1760				si->si_numcsns++;
1761				si->si_sids = ch_realloc( si->si_sids, si->si_numcsns *
1762					sizeof(int));
1763				si->si_sids[i] = sid;
1764			}
1765		}
1766
1767		/* Don't do any processing for consumer contextCSN updates */
1768		if ( op->o_dont_replicate ) {
1769			if ( op->o_tag == LDAP_REQ_MODIFY &&
1770				op->orm_modlist->sml_op == LDAP_MOD_REPLACE &&
1771				op->orm_modlist->sml_desc == slap_schema.si_ad_contextCSN ) {
1772			/* Catch contextCSN updates from syncrepl. We have to look at
1773			 * all the attribute values, as there may be more than one csn
1774			 * that changed, and only one can be passed in the csn queue.
1775			 */
1776			Modifications *mod = op->orm_modlist;
1777			int i, j, sid;
1778
			/* Same merge as above, applied to every value of the mod */
1779			for ( i=0; i<mod->sml_numvals; i++ ) {
1780				sid = slap_parse_csn_sid( &mod->sml_values[i] );
1781				for ( j=0; j<si->si_numcsns; j++ ) {
1782					if ( sid == si->si_sids[j] ) {
1783						if ( ber_bvcmp( &mod->sml_values[i], &si->si_ctxcsn[j] ) > 0 ) {
1784							ber_bvreplace( &si->si_ctxcsn[j], &mod->sml_values[i] );
1785							csn_changed = 1;
1786						}
1787						break;
1788					}
1789				}
1790
1791				if ( j == si->si_numcsns ) {
1792					value_add_one( &si->si_ctxcsn, &mod->sml_values[i] );
1793					si->si_numcsns++;
1794					si->si_sids = ch_realloc( si->si_sids, si->si_numcsns *
1795						sizeof(int));
1796					si->si_sids[j] = sid;
1797					csn_changed = 1;
1798				}
1799			}
1800			ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
1801
			/* Something changed: queue a new cookie to each psearch
			 * saved in opc->smatches
			 */
1802			if ( csn_changed ) {
1803				ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
1804				have_psearches = ( si->si_ops != NULL );
1805				ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
1806
1807				if ( have_psearches ) {
1808					for ( sm = opc->smatches; sm; sm=sm->sm_next ) {
1809						if ( sm->sm_op->s_op->o_abandon )
1810							continue;
1811						syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_NEW_COOKIE );
1812					}
1813				}
1814			}
1815			} else {
			/* Not a contextCSN replace: just release the write lock */
1816			ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
1817			}
1818			goto leave;
1819		}
1820
		/* Decide whether a checkpoint is due (by op count and/or time) */
1821		si->si_numops++;
1822		if ( si->si_chkops || si->si_chktime ) {
1823			/* Never checkpoint adding the context entry,
1824			 * it will deadlock
1825			 */
1826			if ( op->o_tag != LDAP_REQ_ADD ||
1827				!dn_match( &op->o_req_ndn, &si->si_contextdn )) {
1828				if ( si->si_chkops && si->si_numops >= si->si_chkops ) {
1829					do_check = 1;
1830					si->si_numops = 0;
1831				}
1832				if ( si->si_chktime &&
1833					(op->o_time - si->si_chklast >= si->si_chktime )) {
					/* si_chklast == 0: no checkpoint this time, only
					 * set si_chklast to 1
					 */
1834					if ( si->si_chklast ) {
1835						do_check = 1;
1836						si->si_chklast = op->o_time;
1837					} else {
1838						si->si_chklast = 1;
1839					}
1840				}
1841			}
1842		}
1843		ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
1844
		/* The checkpoint only reads the CSN set, so a read lock suffices */
1845		if ( do_check ) {
1846			ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
1847			syncprov_checkpoint( op, rs, on );
1848			ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
1849		}
1850
1851		/* only update consumer ctx if this is a newer csn */
1852		if ( csn_changed ) {
			/* maxcsn.bv_val points into the local cbuf above */
1853			opc->sctxcsn = maxcsn;
1854		}
1855
1856		/* Handle any persistent searches */
1857		ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
1858		have_psearches = ( si->si_ops != NULL );
1859		ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
1860		if ( have_psearches ) {
1861			switch(op->o_tag) {
1862			case LDAP_REQ_ADD:
1863			case LDAP_REQ_MODIFY:
1864			case LDAP_REQ_MODRDN:
1865			case LDAP_REQ_EXTENDED:
				/* saveit == 0: responses are queued now */
1866				syncprov_matchops( op, opc, 0 );
1867				break;
1868			case LDAP_REQ_DELETE:
1869				/* for each match in opc->smatches:
1870				 *   send DELETE msg
1871				 */
1872				for ( sm = opc->smatches; sm; sm=sm->sm_next ) {
1873					if ( sm->sm_op->s_op->o_abandon )
1874						continue;
1875					syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_DELETE );
1876				}
1877				break;
1878			}
1879		}
1880
1881		/* Add any log records */
1882		if ( si->si_logs && op->o_tag != LDAP_REQ_ADD ) {
1883			syncprov_add_slog( op );
1884		}
1885leave:		ldap_pvt_thread_mutex_unlock( &si->si_resp_mutex );
1886	}
1887	return SLAP_CB_CONTINUE;
1888}
1889
1890/* We don't use a subentry to store the context CSN any more.
1891 * We expose the current context CSN as an operational attribute
1892 * of the suffix entry.
1893 */
1894static int
1895syncprov_op_compare( Operation *op, SlapReply *rs )
1896{
1897	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
1898	syncprov_info_t		*si = on->on_bi.bi_private;
1899	int rc = SLAP_CB_CONTINUE;
1900
1901	if ( dn_match( &op->o_req_ndn, &si->si_contextdn ) &&
1902		op->oq_compare.rs_ava->aa_desc == slap_schema.si_ad_contextCSN )
1903	{
1904		Entry e = {0};
1905		Attribute a = {0};
1906
1907		e.e_name = si->si_contextdn;
1908		e.e_nname = si->si_contextdn;
1909		e.e_attrs = &a;
1910
1911		a.a_desc = slap_schema.si_ad_contextCSN;
1912
1913		ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
1914
1915		a.a_vals = si->si_ctxcsn;
1916		a.a_nvals = a.a_vals;
1917		a.a_numvals = si->si_numcsns;
1918
1919		rs->sr_err = access_allowed( op, &e, op->oq_compare.rs_ava->aa_desc,
1920			&op->oq_compare.rs_ava->aa_value, ACL_COMPARE, NULL );
1921		if ( ! rs->sr_err ) {
1922			rs->sr_err = LDAP_INSUFFICIENT_ACCESS;
1923			goto return_results;
1924		}
1925
1926		if ( get_assert( op ) &&
1927			( test_filter( op, &e, get_assertion( op ) ) != LDAP_COMPARE_TRUE ) )
1928		{
1929			rs->sr_err = LDAP_ASSERTION_FAILED;
1930			goto return_results;
1931		}
1932
1933
1934		rs->sr_err = LDAP_COMPARE_FALSE;
1935
1936		if ( attr_valfind( &a,
1937			SLAP_MR_ATTRIBUTE_VALUE_NORMALIZED_MATCH |
1938				SLAP_MR_ASSERTED_VALUE_NORMALIZED_MATCH,
1939				&op->oq_compare.rs_ava->aa_value, NULL, op->o_tmpmemctx ) == 0 )
1940		{
1941			rs->sr_err = LDAP_COMPARE_TRUE;
1942		}
1943
1944return_results:;
1945
1946		ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
1947
1948		send_ldap_result( op, rs );
1949
1950		if( rs->sr_err == LDAP_COMPARE_FALSE || rs->sr_err == LDAP_COMPARE_TRUE ) {
1951			rs->sr_err = LDAP_SUCCESS;
1952		}
1953		rc = rs->sr_err;
1954	}
1955
1956	return rc;
1957}
1958
1959static int
1960syncprov_op_mod( Operation *op, SlapReply *rs )
1961{
1962	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
1963	syncprov_info_t		*si = on->on_bi.bi_private;
1964	slap_callback *cb;
1965	opcookie *opc;
1966	int have_psearches, cbsize;
1967
1968	ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
1969	have_psearches = ( si->si_ops != NULL );
1970	ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
1971
1972	cbsize = sizeof(slap_callback) + sizeof(opcookie) +
1973		(have_psearches ? sizeof(modinst) : 0 );
1974
1975	cb = op->o_tmpcalloc(1, cbsize, op->o_tmpmemctx);
1976	opc = (opcookie *)(cb+1);
1977	opc->son = on;
1978	cb->sc_response = syncprov_op_response;
1979	cb->sc_cleanup = syncprov_op_cleanup;
1980	cb->sc_private = opc;
1981	cb->sc_next = op->o_callback;
1982	op->o_callback = cb;
1983
1984	opc->osid = -1;
1985	opc->rsid = -1;
1986	if ( op->o_csn.bv_val ) {
1987		opc->osid = slap_parse_csn_sid( &op->o_csn );
1988	}
1989	if ( op->o_controls ) {
1990		struct sync_cookie *scook =
1991		op->o_controls[slap_cids.sc_LDAPsync];
1992		if ( scook )
1993			opc->rsid = scook->sid;
1994	}
1995
1996	/* If there are active persistent searches, lock this operation.
1997	 * See seqmod.c for the locking logic on its own.
1998	 */
1999	if ( have_psearches ) {
2000		modtarget *mt, mtdummy;
2001		modinst *mi;
2002
2003		mi = (modinst *)(opc+1);
2004		mi->mi_op = op;
2005
2006		/* See if we're already modifying this entry... */
2007		mtdummy.mt_op = op;
2008		ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
2009		mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp );
2010		if ( mt ) {
2011			ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
2012			if ( mt->mt_mods == NULL ) {
2013				/* Cannot reuse this mt, as another thread is about
2014				 * to release it in syncprov_op_cleanup.
2015				 */
2016				ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
2017				mt = NULL;
2018			}
2019		}
2020		if ( mt ) {
2021			ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
2022			mt->mt_tail->mi_next = mi;
2023			mt->mt_tail = mi;
2024			/* wait for this op to get to head of list */
2025			while ( mt->mt_mods != mi ) {
2026				ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
2027				/* FIXME: if dynamic config can delete overlays or
2028				 * databases we'll have to check for cleanup here.
2029				 * Currently it's not an issue because there are
2030				 * no dynamic config deletes...
2031				 */
2032				if ( slapd_shutdown )
2033					return SLAPD_ABANDON;
2034
2035				if ( !ldap_pvt_thread_pool_pausecheck( &connection_pool ))
2036					ldap_pvt_thread_yield();
2037				ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
2038
2039				/* clean up if the caller is giving up */
2040				if ( op->o_abandon ) {
2041					modinst *m2;
2042					for ( m2 = mt->mt_mods; m2->mi_next != mi;
2043						m2 = m2->mi_next );
2044					m2->mi_next = mi->mi_next;
2045					if ( mt->mt_tail == mi ) mt->mt_tail = m2;
2046					op->o_tmpfree( cb, op->o_tmpmemctx );
2047					ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
2048					return SLAPD_ABANDON;
2049				}
2050			}
2051			ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
2052		} else {
2053			/* Record that we're modifying this entry now */
2054			mt = ch_malloc( sizeof(modtarget) );
2055			mt->mt_mods = mi;
2056			mt->mt_tail = mi;
2057			mt->mt_op = mi->mi_op;
2058			ldap_pvt_thread_mutex_init( &mt->mt_mutex );
2059			avl_insert( &si->si_mods, mt, sp_avl_cmp, avl_dup_error );
2060			ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
2061		}
2062		opc->smt = mt;
2063	}
2064
2065	if (( have_psearches || si->si_logs ) && op->o_tag != LDAP_REQ_ADD )
2066		syncprov_matchops( op, opc, 1 );
2067
2068	return SLAP_CB_CONTINUE;
2069}
2070
2071static int
2072syncprov_op_extended( Operation *op, SlapReply *rs )
2073{
2074	if ( exop_is_write( op ))
2075		return syncprov_op_mod( op, rs );
2076
2077	return SLAP_CB_CONTINUE;
2078}
2079
/* State attached to a search being processed by this overlay.
 * NOTE(review): the code that fills in and reads this struct is outside
 * this excerpt; the field notes below are inferred from the types and
 * names only -- confirm against the users.
 */
2080typedef struct searchstate {
	/* presumably the overlay instance handling the search */
2081	slap_overinst *ss_on;
	/* presumably the persistent-search record, if any */
2082	syncops *ss_so;
	/* a set of CSNs with ss_numcsns entries and, presumably, the
	 * matching SIDs in ss_sids
	 */
2083	BerVarray ss_ctxcsn;
2084	int *ss_sids;
2085	int ss_numcsns;
	/* Bit values for ss_flags */
2086#define	SS_PRESENT	0x01
2087#define	SS_CHANGED	0x02
2088	int ss_flags;
2089} searchstate;
2090
2091static int
2092syncprov_search_cleanup( Operation *op, SlapReply *rs )
2093{
2094	if ( rs->sr_ctrls ) {
2095		op->o_tmpfree( rs->sr_ctrls[0], op->o_tmpmemctx );
2096		op->o_tmpfree( rs->sr_ctrls, op->o_tmpmemctx );
2097		rs->sr_ctrls = NULL;
2098	}
2099	return 0;
2100}
2101
/* Backing storage for the private copy of a search Operation made by
 * syncprov_detach_op().  That function allocates one of these with
 * extra room behind it, points the copy's o_hdr at sob_hdr and the
 * first o_extra element at sob_oe, and starts placing the optional
 * attribute list and the copied strings at the offset of sob_extra.
 */
2102typedef struct SyncOperationBuffer {
2103	Operation		sob_op;
2104	Opheader		sob_hdr;
2105	OpExtra			sob_oe;
	/* Only a position marker when the search has no attribute list:
	 * the allocation is sized with offsetof(..., sob_extra).
	 */
2106	AttributeName	sob_extra;	/* not always present */
2107	/* Further data allocated here */
2108} SyncOperationBuffer;
2109
2110static void
2111syncprov_detach_op( Operation *op, syncops *so, slap_overinst *on )
2112{
2113	SyncOperationBuffer *sopbuf2;
2114	Operation *op2;
2115	int i, alen = 0;
2116	size_t size;
2117	char *ptr;
2118	GroupAssertion *g1, *g2;
2119
2120	/* count the search attrs */
2121	for (i=0; op->ors_attrs && !BER_BVISNULL( &op->ors_attrs[i].an_name ); i++) {
2122		alen += op->ors_attrs[i].an_name.bv_len + 1;
2123	}
2124	/* Make a new copy of the operation */
2125	size = offsetof( SyncOperationBuffer, sob_extra ) +
2126		(i ? ( (i+1) * sizeof(AttributeName) + alen) : 0) +
2127		op->o_req_dn.bv_len + 1 +
2128		op->o_req_ndn.bv_len + 1 +
2129		op->o_ndn.bv_len + 1 +
2130		so->s_filterstr.bv_len + 1;
2131	sopbuf2 = ch_calloc( 1, size );
2132	op2 = &sopbuf2->sob_op;
2133	op2->o_hdr = &sopbuf2->sob_hdr;
2134	LDAP_SLIST_FIRST(&op2->o_extra) = &sopbuf2->sob_oe;
2135
2136	/* Copy the fields we care about explicitly, leave the rest alone */
2137	*op2->o_hdr = *op->o_hdr;
2138	op2->o_tag = op->o_tag;
2139	op2->o_time = op->o_time;
2140	op2->o_bd = on->on_info->oi_origdb;
2141	op2->o_request = op->o_request;
2142	op2->o_managedsait = op->o_managedsait;
2143	LDAP_SLIST_FIRST(&op2->o_extra)->oe_key = on;
2144	LDAP_SLIST_NEXT(LDAP_SLIST_FIRST(&op2->o_extra), oe_next) = NULL;
2145
2146	ptr = (char *) sopbuf2 + offsetof( SyncOperationBuffer, sob_extra );
2147	if ( i ) {
2148		op2->ors_attrs = (AttributeName *) ptr;
2149		ptr = (char *) &op2->ors_attrs[i+1];
2150		for (i=0; !BER_BVISNULL( &op->ors_attrs[i].an_name ); i++) {
2151			op2->ors_attrs[i] = op->ors_attrs[i];
2152			op2->ors_attrs[i].an_name.bv_val = ptr;
2153			ptr = lutil_strcopy( ptr, op->ors_attrs[i].an_name.bv_val ) + 1;
2154		}
2155		BER_BVZERO( &op2->ors_attrs[i].an_name );
2156	}
2157
2158	op2->o_authz = op->o_authz;
2159	op2->o_ndn.bv_val = ptr;
2160	ptr = lutil_strcopy(ptr, op->o_ndn.bv_val) + 1;
2161	op2->o_dn = op2->o_ndn;
2162	op2->o_req_dn.bv_len = op->o_req_dn.bv_len;
2163	op2->o_req_dn.bv_val = ptr;
2164	ptr = lutil_strcopy(ptr, op->o_req_dn.bv_val) + 1;
2165	op2->o_req_ndn.bv_len = op->o_req_ndn.bv_len;
2166	op2->o_req_ndn.bv_val = ptr;
2167	ptr = lutil_strcopy(ptr, op->o_req_ndn.bv_val) + 1;
2168	op2->ors_filterstr.bv_val = ptr;
2169	strcpy( ptr, so->s_filterstr.bv_val );
2170	op2->ors_filterstr.bv_len = so->s_filterstr.bv_len;
2171
2172	/* Skip the AND/GE clause that we stuck on in front */
2173	if ( so->s_flags & PS_FIX_FILTER ) {
2174		op2->ors_filter = op->ors_filter->f_and->f_next;
2175		so->s_flags ^= PS_FIX_FILTER;
2176	} else {
2177		op2->ors_filter = op->ors_filter;
2178	}
2179	op2->ors_filter = filter_dup( op2->ors_filter, NULL );
2180	so->s_op = op2;
2181
2182	/* Copy any cached group ACLs individually */
2183	op2->o_groups = NULL;
2184	for ( g1=op->o_groups; g1; g1=g1->ga_next ) {
2185		g2 = ch_malloc( sizeof(GroupAssertion) + g1->ga_len );
2186		*g2 = *g1;
2187		strcpy( g2->ga_ndn, g1->ga_ndn );
2188		g2->ga_next = op2->o_groups;
2189		op2->o_groups = g2;
2190	}
2191	/* Don't allow any further group caching */
2192	op2->o_do_not_cache = 1;
2193
2194	/* Add op2 to conn so abandon will find us */
2195	op->o_conn->c_n_ops_executing++;
2196	op->o_conn->c_n_ops_completed--;
2197	LDAP_STAILQ_INSERT_TAIL( &op->o_conn->c_ops, op2, o_next );
2198	so->s_flags |= PS_IS_DETACHED;
2199
2200	/* Prevent anyone else from trying to send a result for this op */
2201	op->o_abandon = 1;
2202}
2203
2204static int
2205syncprov_search_response( Operation *op, SlapReply *rs )
2206{
2207	searchstate *ss = op->o_callback->sc_private;
2208	slap_overinst *on = ss->ss_on;
2209	syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
2210	sync_control *srs = op->o_controls[slap_cids.sc_LDAPsync];
2211
2212	if ( rs->sr_type == REP_SEARCH || rs->sr_type == REP_SEARCHREF ) {
2213		Attribute *a;
2214		/* If we got a referral without a referral object, there's
2215		 * something missing that we cannot replicate. Just ignore it.
2216		 * The consumer will abort because we didn't send the expected
2217		 * control.
2218		 */
2219		if ( !rs->sr_entry ) {
2220			assert( rs->sr_entry != NULL );
2221			Debug( LDAP_DEBUG_ANY, "bogus referral in context\n",0,0,0 );
2222			return SLAP_CB_CONTINUE;
2223		}
2224		a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryCSN );
2225		if ( a == NULL && rs->sr_operational_attrs != NULL ) {
2226			a = attr_find( rs->sr_operational_attrs, slap_schema.si_ad_entryCSN );
2227		}
2228		if ( a ) {
2229			int i, sid;
2230			sid = slap_parse_csn_sid( &a->a_nvals[0] );
2231
2232			/* Don't send changed entries back to the originator */
2233			if ( sid == srs->sr_state.sid && srs->sr_state.numcsns ) {
2234				Debug( LDAP_DEBUG_SYNC,
2235					"Entry %s changed by peer, ignored\n",
2236					rs->sr_entry->e_name.bv_val, 0, 0 );
2237				return LDAP_SUCCESS;
2238			}
2239
2240			/* If not a persistent search */
2241			if ( !ss->ss_so ) {
2242				/* Make sure entry is less than the snapshot'd contextCSN */
2243				for ( i=0; i<ss->ss_numcsns; i++ ) {
2244					if ( sid == ss->ss_sids[i] && ber_bvcmp( &a->a_nvals[0],
2245						&ss->ss_ctxcsn[i] ) > 0 ) {
2246						Debug( LDAP_DEBUG_SYNC,
2247							"Entry %s CSN %s greater than snapshot %s\n",
2248							rs->sr_entry->e_name.bv_val,
2249							a->a_nvals[0].bv_val,
2250							ss->ss_ctxcsn[i].bv_val );
2251						return LDAP_SUCCESS;
2252					}
2253				}
2254			}
2255
2256			/* Don't send old entries twice */
2257			if ( srs->sr_state.ctxcsn ) {
2258				for ( i=0; i<srs->sr_state.numcsns; i++ ) {
2259					if ( sid == srs->sr_state.sids[i] &&
2260						ber_bvcmp( &a->a_nvals[0],
2261							&srs->sr_state.ctxcsn[i] )<= 0 ) {
2262						Debug( LDAP_DEBUG_SYNC,
2263							"Entry %s CSN %s older or equal to ctx %s\n",
2264							rs->sr_entry->e_name.bv_val,
2265							a->a_nvals[0].bv_val,
2266							srs->sr_state.ctxcsn[i].bv_val );
2267						return LDAP_SUCCESS;
2268					}
2269				}
2270			}
2271		}
2272		rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2,
2273			op->o_tmpmemctx );
2274		rs->sr_ctrls[1] = NULL;
2275		/* If we're in delta-sync mode, always send a cookie */
2276		if ( si->si_nopres && si->si_usehint && a ) {
2277			struct berval cookie;
2278			slap_compose_sync_cookie( op, &cookie, a->a_nvals, srs->sr_state.rid, slap_serverID ? slap_serverID : -1 );
2279			rs->sr_err = syncprov_state_ctrl( op, rs, rs->sr_entry,
2280				LDAP_SYNC_ADD, rs->sr_ctrls, 0, 1, &cookie );
2281		} else {
2282			rs->sr_err = syncprov_state_ctrl( op, rs, rs->sr_entry,
2283				LDAP_SYNC_ADD, rs->sr_ctrls, 0, 0, NULL );
2284		}
2285	} else if ( rs->sr_type == REP_RESULT && rs->sr_err == LDAP_SUCCESS ) {
2286		struct berval cookie = BER_BVNULL;
2287
2288		if ( ( ss->ss_flags & SS_CHANGED ) &&
2289			ss->ss_ctxcsn && !BER_BVISNULL( &ss->ss_ctxcsn[0] )) {
2290			slap_compose_sync_cookie( op, &cookie, ss->ss_ctxcsn,
2291				srs->sr_state.rid, slap_serverID ? slap_serverID : -1 );
2292
2293			Debug( LDAP_DEBUG_SYNC, "syncprov_search_response: cookie=%s\n", cookie.bv_val, 0, 0 );
2294		}
2295
2296		/* Is this a regular refresh?
2297		 * Note: refresh never gets here if there were no changes
2298		 */
2299		if ( !ss->ss_so ) {
2300			rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2,
2301				op->o_tmpmemctx );
2302			rs->sr_ctrls[1] = NULL;
2303			rs->sr_err = syncprov_done_ctrl( op, rs, rs->sr_ctrls,
2304				0, 1, &cookie, ( ss->ss_flags & SS_PRESENT ) ?  LDAP_SYNC_REFRESH_PRESENTS :
2305					LDAP_SYNC_REFRESH_DELETES );
2306			op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
2307		} else {
2308		/* It's RefreshAndPersist, transition to Persist phase */
2309			syncprov_sendinfo( op, rs, ( ss->ss_flags & SS_PRESENT ) ?
2310	 			LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE,
2311				( ss->ss_flags & SS_CHANGED ) ? &cookie : NULL,
2312				1, NULL, 0 );
2313			if ( !BER_BVISNULL( &cookie ))
2314				op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
2315
2316			/* Detach this Op from frontend control */
2317			ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex );
2318
2319			/* But not if this connection was closed along the way */
2320			if ( op->o_abandon ) {
2321				ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
2322				/* syncprov_ab_cleanup will free this syncop */
2323				return SLAPD_ABANDON;
2324
2325			} else {
2326				ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex );
2327				/* Turn off the refreshing flag */
2328				ss->ss_so->s_flags ^= PS_IS_REFRESHING;
2329
2330				syncprov_detach_op( op, ss->ss_so, on );
2331
2332				ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
2333
2334				/* If there are queued responses, fire them off */
2335				if ( ss->ss_so->s_res )
2336					syncprov_qstart( ss->ss_so );
2337				ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex );
2338			}
2339
2340			return LDAP_SUCCESS;
2341		}
2342	}
2343
2344	return SLAP_CB_CONTINUE;
2345}
2346
2347static int
2348syncprov_op_search( Operation *op, SlapReply *rs )
2349{
2350	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
2351	syncprov_info_t		*si = (syncprov_info_t *)on->on_bi.bi_private;
2352	slap_callback	*cb;
2353	int gotstate = 0, changed = 0, do_present = 0;
2354	syncops *sop = NULL;
2355	searchstate *ss;
2356	sync_control *srs;
2357	BerVarray ctxcsn;
2358	int i, *sids, numcsns;
2359	struct berval mincsn;
2360
2361	if ( !(op->o_sync_mode & SLAP_SYNC_REFRESH) ) return SLAP_CB_CONTINUE;
2362
2363	if ( op->ors_deref & LDAP_DEREF_SEARCHING ) {
2364		send_ldap_error( op, rs, LDAP_PROTOCOL_ERROR, "illegal value for derefAliases" );
2365		return rs->sr_err;
2366	}
2367
2368	srs = op->o_controls[slap_cids.sc_LDAPsync];
2369
2370	/* If this is a persistent search, set it up right away */
2371	if ( op->o_sync_mode & SLAP_SYNC_PERSIST ) {
2372		syncops so = {0};
2373		fbase_cookie fc;
2374		opcookie opc;
2375		slap_callback sc;
2376
2377		fc.fss = &so;
2378		fc.fbase = 0;
2379		so.s_eid = NOID;
2380		so.s_op = op;
2381		so.s_flags = PS_IS_REFRESHING | PS_FIND_BASE;
2382		/* syncprov_findbase expects to be called as a callback... */
2383		sc.sc_private = &opc;
2384		opc.son = on;
2385		ldap_pvt_thread_mutex_init( &so.s_mutex );
2386		cb = op->o_callback;
2387		op->o_callback = &sc;
2388		rs->sr_err = syncprov_findbase( op, &fc );
2389		op->o_callback = cb;
2390		ldap_pvt_thread_mutex_destroy( &so.s_mutex );
2391
2392		if ( rs->sr_err != LDAP_SUCCESS ) {
2393			send_ldap_result( op, rs );
2394			return rs->sr_err;
2395		}
2396		sop = ch_malloc( sizeof( syncops ));
2397		*sop = so;
2398		ldap_pvt_thread_mutex_init( &sop->s_mutex );
2399		sop->s_rid = srs->sr_state.rid;
2400		sop->s_sid = srs->sr_state.sid;
2401		sop->s_inuse = 1;
2402
2403		ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
2404		sop->s_next = si->si_ops;
2405		si->si_ops = sop;
2406		ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
2407	}
2408
2409	/* snapshot the ctxcsn */
2410	ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
2411	numcsns = si->si_numcsns;
2412	if ( numcsns ) {
2413		ber_bvarray_dup_x( &ctxcsn, si->si_ctxcsn, op->o_tmpmemctx );
2414		sids = op->o_tmpalloc( numcsns * sizeof(int), op->o_tmpmemctx );
2415		for ( i=0; i<numcsns; i++ )
2416			sids[i] = si->si_sids[i];
2417	} else {
2418		ctxcsn = NULL;
2419		sids = NULL;
2420	}
2421	ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
2422
2423	/* If we have a cookie, handle the PRESENT lookups */
2424	if ( srs->sr_state.ctxcsn ) {
2425		sessionlog *sl;
2426		int i, j;
2427
2428		/* If we don't have any CSN of our own yet, pretend nothing
2429		 * has changed.
2430		 */
2431		if ( !numcsns )
2432			goto no_change;
2433
2434		if ( !si->si_nopres )
2435			do_present = SS_PRESENT;
2436
2437		/* If there are SIDs we don't recognize in the cookie, drop them */
2438		for (i=0; i<srs->sr_state.numcsns; ) {
2439			for (j=0; j<numcsns; j++) {
2440				if ( srs->sr_state.sids[i] == sids[j] ) {
2441					break;
2442				}
2443			}
2444			/* not found */
2445			if ( j == numcsns ) {
2446				struct berval tmp = srs->sr_state.ctxcsn[i];
2447				j = srs->sr_state.numcsns - 1;
2448				srs->sr_state.ctxcsn[i] = srs->sr_state.ctxcsn[j];
2449				tmp.bv_len = 0;
2450				srs->sr_state.ctxcsn[j] = tmp;
2451				srs->sr_state.numcsns = j;
2452				srs->sr_state.sids[i] = srs->sr_state.sids[j];
2453				continue;
2454			}
2455			i++;
2456		}
2457
2458		/* Find the smallest CSN */
2459		mincsn = srs->sr_state.ctxcsn[0];
2460		for ( i=1; i<srs->sr_state.numcsns; i++ ) {
2461			if ( ber_bvcmp( &mincsn, &srs->sr_state.ctxcsn[i] ) > 0 )
2462				mincsn = srs->sr_state.ctxcsn[i];
2463		}
2464
2465		/* If nothing has changed, shortcut it */
2466		if ( srs->sr_state.numcsns == numcsns ) {
2467			int i, j, newer;
2468			for ( i=0; i<srs->sr_state.numcsns; i++ ) {
2469				for ( j=0; j<numcsns; j++ ) {
2470					if ( srs->sr_state.sids[i] != sids[j] )
2471						continue;
2472					newer = ber_bvcmp( &srs->sr_state.ctxcsn[i], &ctxcsn[j] );
2473					/* If our state is newer, tell consumer about changes */
2474					if ( newer < 0 )
2475						changed = SS_CHANGED;
2476					else if ( newer > 0 ) {
2477					/* our state is older, tell consumer nothing */
2478						rs->sr_err = LDAP_SUCCESS;
2479bailout:
2480						if ( sop ) {
2481							syncops **sp = &si->si_ops;
2482
2483							ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
2484							while ( *sp != sop )
2485								sp = &(*sp)->s_next;
2486							*sp = sop->s_next;
2487							ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
2488							ch_free( sop );
2489						}
2490						rs->sr_ctrls = NULL;
2491						send_ldap_result( op, rs );
2492						return rs->sr_err;
2493					}
2494					break;
2495				}
2496				if ( changed )
2497					break;
2498			}
2499			if ( !changed ) {
2500				do_present = 0;
2501no_change:		if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
2502					LDAPControl	*ctrls[2];
2503
2504					ctrls[0] = NULL;
2505					ctrls[1] = NULL;
2506					syncprov_done_ctrl( op, rs, ctrls, 0, 0,
2507						NULL, LDAP_SYNC_REFRESH_DELETES );
2508					rs->sr_ctrls = ctrls;
2509					rs->sr_err = LDAP_SUCCESS;
2510					send_ldap_result( op, rs );
2511					rs->sr_ctrls = NULL;
2512					return rs->sr_err;
2513				}
2514				goto shortcut;
2515			}
2516		} else {
2517			/* consumer doesn't have the right number of CSNs */
2518			changed = SS_CHANGED;
2519		}
2520		/* Do we have a sessionlog for this search? */
2521		sl=si->si_logs;
2522		if ( sl ) {
2523			ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
2524			/* Are there any log entries, and is the consumer state
2525			 * present in the session log?
2526			 */
2527			if ( sl->sl_num > 0 && ber_bvcmp( &mincsn, &sl->sl_mincsn ) >= 0 ) {
2528				do_present = 0;
2529				/* mutex is unlocked in playlog */
2530				syncprov_playlog( op, rs, sl, srs, ctxcsn, numcsns, sids );
2531			} else {
2532				ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
2533			}
2534		}
2535		/* Is the CSN still present in the database? */
2536		if ( syncprov_findcsn( op, FIND_CSN ) != LDAP_SUCCESS ) {
2537			/* No, so a reload is required */
2538			/* the 2.2 consumer doesn't send this hint */
2539			if ( si->si_usehint && srs->sr_rhint == 0 ) {
2540				if ( ctxcsn )
2541					ber_bvarray_free_x( ctxcsn, op->o_tmpmemctx );
2542				if ( sids )
2543					op->o_tmpfree( sids, op->o_tmpmemctx );
2544				rs->sr_err = LDAP_SYNC_REFRESH_REQUIRED;
2545				rs->sr_text = "sync cookie is stale";
2546				goto bailout;
2547			}
2548			if ( srs->sr_state.ctxcsn ) {
2549				ber_bvarray_free_x( srs->sr_state.ctxcsn, op->o_tmpmemctx );
2550				srs->sr_state.ctxcsn = NULL;
2551			}
2552			if ( srs->sr_state.sids ) {
2553				slap_sl_free( srs->sr_state.sids, op->o_tmpmemctx );
2554				srs->sr_state.sids = NULL;
2555			}
2556			srs->sr_state.numcsns = 0;
2557		} else {
2558			gotstate = 1;
2559			/* If changed and doing Present lookup, send Present UUIDs */
2560			if ( do_present && syncprov_findcsn( op, FIND_PRESENT ) !=
2561				LDAP_SUCCESS ) {
2562				if ( ctxcsn )
2563					ber_bvarray_free_x( ctxcsn, op->o_tmpmemctx );
2564				if ( sids )
2565					op->o_tmpfree( sids, op->o_tmpmemctx );
2566				goto bailout;
2567			}
2568		}
2569	} else {
2570		/* No consumer state, assume something has changed */
2571		changed = SS_CHANGED;
2572	}
2573
2574shortcut:
2575	/* Append CSN range to search filter, save original filter
2576	 * for persistent search evaluation
2577	 */
2578	if ( sop ) {
2579		sop->s_filterstr= op->ors_filterstr;
2580	}
2581
2582	/* If something changed, find the changes */
2583	if ( gotstate && changed ) {
2584		Filter *fand, *fava;
2585
2586		fand = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
2587		fand->f_choice = LDAP_FILTER_AND;
2588		fand->f_next = NULL;
2589		fava = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
2590		fand->f_and = fava;
2591		fava->f_choice = LDAP_FILTER_GE;
2592		fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx );
2593		fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN;
2594#ifdef LDAP_COMP_MATCH
2595		fava->f_ava->aa_cf = NULL;
2596#endif
2597		ber_dupbv_x( &fava->f_ava->aa_value, &mincsn, op->o_tmpmemctx );
2598		fava->f_next = op->ors_filter;
2599		op->ors_filter = fand;
2600		filter2bv_x( op, op->ors_filter, &op->ors_filterstr );
2601		if ( sop )
2602			sop->s_flags |= PS_FIX_FILTER;
2603	}
2604
2605	/* Let our callback add needed info to returned entries */
2606	cb = op->o_tmpcalloc(1, sizeof(slap_callback)+sizeof(searchstate), op->o_tmpmemctx);
2607	ss = (searchstate *)(cb+1);
2608	ss->ss_on = on;
2609	ss->ss_so = sop;
2610	ss->ss_flags = do_present | changed;
2611	ss->ss_ctxcsn = ctxcsn;
2612	ss->ss_numcsns = numcsns;
2613	ss->ss_sids = sids;
2614	cb->sc_response = syncprov_search_response;
2615	cb->sc_cleanup = syncprov_search_cleanup;
2616	cb->sc_private = ss;
2617	cb->sc_next = op->o_callback;
2618	op->o_callback = cb;
2619
2620	/* If this is a persistent search and no changes were reported during
2621	 * the refresh phase, just invoke the response callback to transition
2622	 * us into persist phase
2623	 */
2624	if ( !changed ) {
2625		rs->sr_err = LDAP_SUCCESS;
2626		rs->sr_nentries = 0;
2627		send_ldap_result( op, rs );
2628		return rs->sr_err;
2629	}
2630	return SLAP_CB_CONTINUE;
2631}
2632
2633static int
2634syncprov_operational(
2635	Operation *op,
2636	SlapReply *rs )
2637{
2638	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
2639	syncprov_info_t		*si = (syncprov_info_t *)on->on_bi.bi_private;
2640
2641	/* This prevents generating unnecessarily; frontend will strip
2642	 * any statically stored copy.
2643	 */
2644	if ( op->o_sync != SLAP_CONTROL_NONE )
2645		return SLAP_CB_CONTINUE;
2646
2647	if ( rs->sr_entry &&
2648		dn_match( &rs->sr_entry->e_nname, &si->si_contextdn )) {
2649
2650		if ( SLAP_OPATTRS( rs->sr_attr_flags ) ||
2651			ad_inlist( slap_schema.si_ad_contextCSN, rs->sr_attrs )) {
2652			Attribute *a, **ap = NULL;
2653
2654			for ( a=rs->sr_entry->e_attrs; a; a=a->a_next ) {
2655				if ( a->a_desc == slap_schema.si_ad_contextCSN )
2656					break;
2657			}
2658
2659			ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
2660			if ( si->si_ctxcsn ) {
2661				if ( !a ) {
2662					for ( ap = &rs->sr_operational_attrs; *ap;
2663						ap=&(*ap)->a_next );
2664
2665					a = attr_alloc( slap_schema.si_ad_contextCSN );
2666					*ap = a;
2667				}
2668
2669				if ( !ap ) {
2670					if ( !(rs->sr_flags & REP_ENTRY_MODIFIABLE) ) {
2671						Entry *e = entry_dup( rs->sr_entry );
2672						if ( rs->sr_flags & REP_ENTRY_MUSTRELEASE ) {
2673							overlay_entry_release_ov( op, rs->sr_entry, 0, on );
2674							rs->sr_flags ^= REP_ENTRY_MUSTRELEASE;
2675						} else if ( rs->sr_flags & REP_ENTRY_MUSTBEFREED ) {
2676							entry_free( rs->sr_entry );
2677						}
2678						rs->sr_entry = e;
2679						rs->sr_flags |=
2680							REP_ENTRY_MODIFIABLE|REP_ENTRY_MUSTBEFREED;
2681						a = attr_find( rs->sr_entry->e_attrs,
2682							slap_schema.si_ad_contextCSN );
2683					}
2684					if ( a->a_nvals != a->a_vals ) {
2685						ber_bvarray_free( a->a_nvals );
2686					}
2687					a->a_nvals = NULL;
2688					ber_bvarray_free( a->a_vals );
2689					a->a_vals = NULL;
2690					a->a_numvals = 0;
2691				}
2692				attr_valadd( a, si->si_ctxcsn, si->si_ctxcsn, si->si_numcsns );
2693			}
2694			ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
2695		}
2696	}
2697	return SLAP_CB_CONTINUE;
2698}
2699
/* Selectors OR'ed into the argument type of each spcfg[] row; sp_cf_gen()
 * switches on them to tell the directives apart.
 */
enum {
	SP_CHKPT	= 1,
	SP_SESSL	= 2,
	SP_NOPRES	= 3,
	SP_USEHINT	= 4
};
2706
2707static ConfigDriver sp_cf_gen;
2708
2709static ConfigTable spcfg[] = {
2710	{ "syncprov-checkpoint", "ops> <minutes", 3, 3, 0, ARG_MAGIC|SP_CHKPT,
2711		sp_cf_gen, "( OLcfgOvAt:1.1 NAME 'olcSpCheckpoint' "
2712			"DESC 'ContextCSN checkpoint interval in ops and minutes' "
2713			"SYNTAX OMsDirectoryString SINGLE-VALUE )", NULL, NULL },
2714	{ "syncprov-sessionlog", "ops", 2, 2, 0, ARG_INT|ARG_MAGIC|SP_SESSL,
2715		sp_cf_gen, "( OLcfgOvAt:1.2 NAME 'olcSpSessionlog' "
2716			"DESC 'Session log size in ops' "
2717			"SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL },
2718	{ "syncprov-nopresent", NULL, 2, 2, 0, ARG_ON_OFF|ARG_MAGIC|SP_NOPRES,
2719		sp_cf_gen, "( OLcfgOvAt:1.3 NAME 'olcSpNoPresent' "
2720			"DESC 'Omit Present phase processing' "
2721			"SYNTAX OMsBoolean SINGLE-VALUE )", NULL, NULL },
2722	{ "syncprov-reloadhint", NULL, 2, 2, 0, ARG_ON_OFF|ARG_MAGIC|SP_USEHINT,
2723		sp_cf_gen, "( OLcfgOvAt:1.4 NAME 'olcSpReloadHint' "
2724			"DESC 'Observe Reload Hint in Request control' "
2725			"SYNTAX OMsBoolean SINGLE-VALUE )", NULL, NULL },
2726	{ NULL, NULL, 0, 0, 0, ARG_IGNORED }
2727};
2728
2729static ConfigOCs spocs[] = {
2730	{ "( OLcfgOvOc:1.1 "
2731		"NAME 'olcSyncProvConfig' "
2732		"DESC 'SyncRepl Provider configuration' "
2733		"SUP olcOverlayConfig "
2734		"MAY ( olcSpCheckpoint "
2735			"$ olcSpSessionlog "
2736			"$ olcSpNoPresent "
2737			"$ olcSpReloadHint "
2738		") )",
2739			Cft_Overlay, spcfg },
2740	{ NULL, 0, NULL }
2741};
2742
2743static int
2744sp_cf_gen(ConfigArgs *c)
2745{
2746	slap_overinst		*on = (slap_overinst *)c->bi;
2747	syncprov_info_t		*si = (syncprov_info_t *)on->on_bi.bi_private;
2748	int rc = 0;
2749
2750	if ( c->op == SLAP_CONFIG_EMIT ) {
2751		switch ( c->type ) {
2752		case SP_CHKPT:
2753			if ( si->si_chkops || si->si_chktime ) {
2754				struct berval bv;
2755				/* we assume si_chktime is a multiple of 60
2756				 * because the parsed value was originally
2757				 * multiplied by 60 */
2758				bv.bv_len = snprintf( c->cr_msg, sizeof( c->cr_msg ),
2759					"%d %d", si->si_chkops, si->si_chktime/60 );
2760				if ( bv.bv_len >= sizeof( c->cr_msg ) ) {
2761					rc = 1;
2762				} else {
2763					bv.bv_val = c->cr_msg;
2764					value_add_one( &c->rvalue_vals, &bv );
2765				}
2766			} else {
2767				rc = 1;
2768			}
2769			break;
2770		case SP_SESSL:
2771			if ( si->si_logs ) {
2772				c->value_int = si->si_logs->sl_size;
2773			} else {
2774				rc = 1;
2775			}
2776			break;
2777		case SP_NOPRES:
2778			if ( si->si_nopres ) {
2779				c->value_int = 1;
2780			} else {
2781				rc = 1;
2782			}
2783			break;
2784		case SP_USEHINT:
2785			if ( si->si_usehint ) {
2786				c->value_int = 1;
2787			} else {
2788				rc = 1;
2789			}
2790			break;
2791		}
2792		return rc;
2793	} else if ( c->op == LDAP_MOD_DELETE ) {
2794		switch ( c->type ) {
2795		case SP_CHKPT:
2796			si->si_chkops = 0;
2797			si->si_chktime = 0;
2798			break;
2799		case SP_SESSL:
2800			if ( si->si_logs )
2801				si->si_logs->sl_size = 0;
2802			else
2803				rc = LDAP_NO_SUCH_ATTRIBUTE;
2804			break;
2805		case SP_NOPRES:
2806			if ( si->si_nopres )
2807				si->si_nopres = 0;
2808			else
2809				rc = LDAP_NO_SUCH_ATTRIBUTE;
2810			break;
2811		case SP_USEHINT:
2812			if ( si->si_usehint )
2813				si->si_usehint = 0;
2814			else
2815				rc = LDAP_NO_SUCH_ATTRIBUTE;
2816			break;
2817		}
2818		return rc;
2819	}
2820	switch ( c->type ) {
2821	case SP_CHKPT:
2822		if ( lutil_atoi( &si->si_chkops, c->argv[1] ) != 0 ) {
2823			snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s unable to parse checkpoint ops # \"%s\"",
2824				c->argv[0], c->argv[1] );
2825			Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
2826				"%s: %s\n", c->log, c->cr_msg, 0 );
2827			return ARG_BAD_CONF;
2828		}
2829		if ( si->si_chkops <= 0 ) {
2830			snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s invalid checkpoint ops # \"%d\"",
2831				c->argv[0], si->si_chkops );
2832			Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
2833				"%s: %s\n", c->log, c->cr_msg, 0 );
2834			return ARG_BAD_CONF;
2835		}
2836		if ( lutil_atoi( &si->si_chktime, c->argv[2] ) != 0 ) {
2837			snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s unable to parse checkpoint time \"%s\"",
2838				c->argv[0], c->argv[1] );
2839			Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
2840				"%s: %s\n", c->log, c->cr_msg, 0 );
2841			return ARG_BAD_CONF;
2842		}
2843		if ( si->si_chktime <= 0 ) {
2844			snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s invalid checkpoint time \"%d\"",
2845				c->argv[0], si->si_chkops );
2846			Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
2847				"%s: %s\n", c->log, c->cr_msg, 0 );
2848			return ARG_BAD_CONF;
2849		}
2850		si->si_chktime *= 60;
2851		break;
2852	case SP_SESSL: {
2853		sessionlog *sl;
2854		int size = c->value_int;
2855
2856		if ( size < 0 ) {
2857			snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s size %d is negative",
2858				c->argv[0], size );
2859			Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
2860				"%s: %s\n", c->log, c->cr_msg, 0 );
2861			return ARG_BAD_CONF;
2862		}
2863		sl = si->si_logs;
2864		if ( !sl ) {
2865			sl = ch_malloc( sizeof( sessionlog ) + LDAP_LUTIL_CSNSTR_BUFSIZE );
2866			sl->sl_mincsn.bv_val = (char *)(sl+1);
2867			sl->sl_mincsn.bv_len = 0;
2868			sl->sl_num = 0;
2869			sl->sl_head = sl->sl_tail = NULL;
2870			ldap_pvt_thread_mutex_init( &sl->sl_mutex );
2871			si->si_logs = sl;
2872		}
2873		sl->sl_size = size;
2874		}
2875		break;
2876	case SP_NOPRES:
2877		si->si_nopres = c->value_int;
2878		break;
2879	case SP_USEHINT:
2880		si->si_usehint = c->value_int;
2881		break;
2882	}
2883	return rc;
2884}
2885
2886/* ITS#3456 we cannot run this search on the main thread, must use a
2887 * child thread in order to insure we have a big enough stack.
2888 */
2889static void *
2890syncprov_db_otask(
2891	void *ptr
2892)
2893{
2894	syncprov_findcsn( ptr, FIND_MAXCSN );
2895	return NULL;
2896}
2897
2898/* Read any existing contextCSN from the underlying db.
2899 * Then search for any entries newer than that. If no value exists,
2900 * just generate it. Cache whatever result.
2901 */
2902static int
2903syncprov_db_open(
2904	BackendDB *be,
2905	ConfigReply *cr
2906)
2907{
2908	slap_overinst   *on = (slap_overinst *) be->bd_info;
2909	syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
2910
2911	Connection conn = { 0 };
2912	OperationBuffer opbuf;
2913	Operation *op;
2914	Entry *e = NULL;
2915	Attribute *a;
2916	int rc;
2917	void *thrctx = NULL;
2918
2919	if ( !SLAP_LASTMOD( be )) {
2920		Debug( LDAP_DEBUG_ANY,
2921			"syncprov_db_open: invalid config, lastmod must be enabled\n", 0, 0, 0 );
2922		return -1;
2923	}
2924
2925	if ( slapMode & SLAP_TOOL_MODE ) {
2926		return 0;
2927	}
2928
2929	rc = overlay_register_control( be, LDAP_CONTROL_SYNC );
2930	if ( rc ) {
2931		return rc;
2932	}
2933
2934	thrctx = ldap_pvt_thread_pool_context();
2935	connection_fake_init2( &conn, &opbuf, thrctx, 0 );
2936	op = &opbuf.ob_op;
2937	op->o_bd = be;
2938	op->o_dn = be->be_rootdn;
2939	op->o_ndn = be->be_rootndn;
2940
2941	if ( SLAP_SYNC_SUBENTRY( be )) {
2942		build_new_dn( &si->si_contextdn, be->be_nsuffix,
2943			(struct berval *)&slap_ldapsync_cn_bv, NULL );
2944	} else {
2945		si->si_contextdn = be->be_nsuffix[0];
2946	}
2947	rc = overlay_entry_get_ov( op, &si->si_contextdn, NULL,
2948		slap_schema.si_ad_contextCSN, 0, &e, on );
2949
2950	if ( e ) {
2951		ldap_pvt_thread_t tid;
2952
2953		a = attr_find( e->e_attrs, slap_schema.si_ad_contextCSN );
2954		if ( a ) {
2955			ber_bvarray_dup_x( &si->si_ctxcsn, a->a_vals, NULL );
2956			si->si_numcsns = a->a_numvals;
2957			si->si_sids = slap_parse_csn_sids( si->si_ctxcsn, a->a_numvals, NULL );
2958		}
2959		overlay_entry_release_ov( op, e, 0, on );
2960		if ( si->si_ctxcsn && !SLAP_DBCLEAN( be )) {
2961			op->o_req_dn = be->be_suffix[0];
2962			op->o_req_ndn = be->be_nsuffix[0];
2963			op->ors_scope = LDAP_SCOPE_SUBTREE;
2964			ldap_pvt_thread_create( &tid, 0, syncprov_db_otask, op );
2965			ldap_pvt_thread_join( tid, NULL );
2966		}
2967	}
2968
2969	/* Didn't find a contextCSN, should we generate one? */
2970	if ( !si->si_ctxcsn ) {
2971		char csnbuf[ LDAP_LUTIL_CSNSTR_BUFSIZE ];
2972		struct berval csn;
2973
2974		if ( SLAP_SYNC_SHADOW( op->o_bd )) {
2975		/* If we're also a consumer, then don't generate anything.
2976		 * Wait for our provider to send it to us, or for a local
2977		 * modify if we have multimaster.
2978		 */
2979			goto out;
2980		}
2981		csn.bv_val = csnbuf;
2982		csn.bv_len = sizeof( csnbuf );
2983		slap_get_csn( op, &csn, 0 );
2984		value_add_one( &si->si_ctxcsn, &csn );
2985		si->si_numcsns = 1;
2986		si->si_sids = ch_malloc( sizeof(int) );
2987		si->si_sids[0] = slap_serverID;
2988
2989		/* make sure we do a checkpoint on close */
2990		si->si_numops++;
2991	}
2992
2993out:
2994	op->o_bd->bd_info = (BackendInfo *)on;
2995	return 0;
2996}
2997
2998/* Write the current contextCSN into the underlying db.
2999 */
3000static int
3001syncprov_db_close(
3002	BackendDB *be,
3003	ConfigReply *cr
3004)
3005{
3006    slap_overinst   *on = (slap_overinst *) be->bd_info;
3007    syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
3008
3009	if ( slapMode & SLAP_TOOL_MODE ) {
3010		return 0;
3011	}
3012	if ( si->si_numops ) {
3013		Connection conn = {0};
3014		OperationBuffer opbuf;
3015		Operation *op;
3016		SlapReply rs = {REP_RESULT};
3017		void *thrctx;
3018
3019		thrctx = ldap_pvt_thread_pool_context();
3020		connection_fake_init2( &conn, &opbuf, thrctx, 0 );
3021		op = &opbuf.ob_op;
3022		op->o_bd = be;
3023		op->o_dn = be->be_rootdn;
3024		op->o_ndn = be->be_rootndn;
3025		syncprov_checkpoint( op, &rs, on );
3026	}
3027
3028    return 0;
3029}
3030
3031static int
3032syncprov_db_init(
3033	BackendDB *be,
3034	ConfigReply *cr
3035)
3036{
3037	slap_overinst	*on = (slap_overinst *)be->bd_info;
3038	syncprov_info_t	*si;
3039
3040	if ( SLAP_ISGLOBALOVERLAY( be ) ) {
3041		Debug( LDAP_DEBUG_ANY,
3042			"syncprov must be instantiated within a database.\n",
3043			0, 0, 0 );
3044		return 1;
3045	}
3046
3047	si = ch_calloc(1, sizeof(syncprov_info_t));
3048	on->on_bi.bi_private = si;
3049	ldap_pvt_thread_rdwr_init( &si->si_csn_rwlock );
3050	ldap_pvt_thread_mutex_init( &si->si_ops_mutex );
3051	ldap_pvt_thread_mutex_init( &si->si_mods_mutex );
3052	ldap_pvt_thread_mutex_init( &si->si_resp_mutex );
3053
3054	csn_anlist[0].an_desc = slap_schema.si_ad_entryCSN;
3055	csn_anlist[0].an_name = slap_schema.si_ad_entryCSN->ad_cname;
3056	csn_anlist[1].an_desc = slap_schema.si_ad_entryUUID;
3057	csn_anlist[1].an_name = slap_schema.si_ad_entryUUID->ad_cname;
3058
3059	uuid_anlist[0].an_desc = slap_schema.si_ad_entryUUID;
3060	uuid_anlist[0].an_name = slap_schema.si_ad_entryUUID->ad_cname;
3061
3062	return 0;
3063}
3064
3065static int
3066syncprov_db_destroy(
3067	BackendDB *be,
3068	ConfigReply *cr
3069)
3070{
3071	slap_overinst	*on = (slap_overinst *)be->bd_info;
3072	syncprov_info_t	*si = (syncprov_info_t *)on->on_bi.bi_private;
3073
3074	if ( si ) {
3075		if ( si->si_logs ) {
3076			slog_entry *se = si->si_logs->sl_head;
3077
3078			while ( se ) {
3079				slog_entry *se_next = se->se_next;
3080				ch_free( se );
3081				se = se_next;
3082			}
3083
3084			ch_free( si->si_logs );
3085		}
3086		if ( si->si_ctxcsn )
3087			ber_bvarray_free( si->si_ctxcsn );
3088		if ( si->si_sids )
3089			ch_free( si->si_sids );
3090		ldap_pvt_thread_mutex_destroy( &si->si_resp_mutex );
3091		ldap_pvt_thread_mutex_destroy( &si->si_mods_mutex );
3092		ldap_pvt_thread_mutex_destroy( &si->si_ops_mutex );
3093		ldap_pvt_thread_rdwr_destroy( &si->si_csn_rwlock );
3094		ch_free( si );
3095	}
3096
3097	return 0;
3098}
3099
3100static int syncprov_parseCtrl (
3101	Operation *op,
3102	SlapReply *rs,
3103	LDAPControl *ctrl )
3104{
3105	ber_tag_t tag;
3106	BerElementBuffer berbuf;
3107	BerElement *ber = (BerElement *)&berbuf;
3108	ber_int_t mode;
3109	ber_len_t len;
3110	struct berval cookie = BER_BVNULL;
3111	sync_control *sr;
3112	int rhint = 0;
3113
3114	if ( op->o_sync != SLAP_CONTROL_NONE ) {
3115		rs->sr_text = "Sync control specified multiple times";
3116		return LDAP_PROTOCOL_ERROR;
3117	}
3118
3119	if ( op->o_pagedresults != SLAP_CONTROL_NONE ) {
3120		rs->sr_text = "Sync control specified with pagedResults control";
3121		return LDAP_PROTOCOL_ERROR;
3122	}
3123
3124	if ( BER_BVISNULL( &ctrl->ldctl_value ) ) {
3125		rs->sr_text = "Sync control value is absent";
3126		return LDAP_PROTOCOL_ERROR;
3127	}
3128
3129	if ( BER_BVISEMPTY( &ctrl->ldctl_value ) ) {
3130		rs->sr_text = "Sync control value is empty";
3131		return LDAP_PROTOCOL_ERROR;
3132	}
3133
3134	/* Parse the control value
3135	 *      syncRequestValue ::= SEQUENCE {
3136	 *              mode   ENUMERATED {
3137	 *                      -- 0 unused
3138	 *                      refreshOnly		(1),
3139	 *                      -- 2 reserved
3140	 *                      refreshAndPersist	(3)
3141	 *              },
3142	 *              cookie  syncCookie OPTIONAL
3143	 *      }
3144	 */
3145
3146	ber_init2( ber, &ctrl->ldctl_value, 0 );
3147
3148	if ( (tag = ber_scanf( ber, "{i" /*}*/, &mode )) == LBER_ERROR ) {
3149		rs->sr_text = "Sync control : mode decoding error";
3150		return LDAP_PROTOCOL_ERROR;
3151	}
3152
3153	switch( mode ) {
3154	case LDAP_SYNC_REFRESH_ONLY:
3155		mode = SLAP_SYNC_REFRESH;
3156		break;
3157	case LDAP_SYNC_REFRESH_AND_PERSIST:
3158		mode = SLAP_SYNC_REFRESH_AND_PERSIST;
3159		break;
3160	default:
3161		rs->sr_text = "Sync control : unknown update mode";
3162		return LDAP_PROTOCOL_ERROR;
3163	}
3164
3165	tag = ber_peek_tag( ber, &len );
3166
3167	if ( tag == LDAP_TAG_SYNC_COOKIE ) {
3168		if (( ber_scanf( ber, /*{*/ "m", &cookie )) == LBER_ERROR ) {
3169			rs->sr_text = "Sync control : cookie decoding error";
3170			return LDAP_PROTOCOL_ERROR;
3171		}
3172		tag = ber_peek_tag( ber, &len );
3173	}
3174	if ( tag == LDAP_TAG_RELOAD_HINT ) {
3175		if (( ber_scanf( ber, /*{*/ "b", &rhint )) == LBER_ERROR ) {
3176			rs->sr_text = "Sync control : rhint decoding error";
3177			return LDAP_PROTOCOL_ERROR;
3178		}
3179	}
3180	if (( ber_scanf( ber, /*{*/ "}")) == LBER_ERROR ) {
3181			rs->sr_text = "Sync control : decoding error";
3182			return LDAP_PROTOCOL_ERROR;
3183	}
3184	sr = op->o_tmpcalloc( 1, sizeof(struct sync_control), op->o_tmpmemctx );
3185	sr->sr_rhint = rhint;
3186	if (!BER_BVISNULL(&cookie)) {
3187		ber_dupbv_x( &sr->sr_state.octet_str, &cookie, op->o_tmpmemctx );
3188		/* If parse fails, pretend no cookie was sent */
3189		if ( slap_parse_sync_cookie( &sr->sr_state, op->o_tmpmemctx ) ||
3190			sr->sr_state.rid == -1 ) {
3191			if ( sr->sr_state.ctxcsn ) {
3192				ber_bvarray_free_x( sr->sr_state.ctxcsn, op->o_tmpmemctx );
3193				sr->sr_state.ctxcsn = NULL;
3194			}
3195			sr->sr_state.numcsns = 0;
3196		}
3197	}
3198
3199	op->o_controls[slap_cids.sc_LDAPsync] = sr;
3200
3201	op->o_sync = ctrl->ldctl_iscritical
3202		? SLAP_CONTROL_CRITICAL
3203		: SLAP_CONTROL_NONCRITICAL;
3204
3205	op->o_sync_mode |= mode;	/* o_sync_mode shares o_sync */
3206
3207	return LDAP_SUCCESS;
3208}
3209
3210/* This overlay is set up for dynamic loading via moduleload. For static
3211 * configuration, you'll need to arrange for the slap_overinst to be
3212 * initialized and registered by some other function inside slapd.
3213 */
3214
/* The overlay descriptor for syncprov; syncprov_initialize() fills in its
 * on_bi hooks and passes it to overlay_register(). */
static slap_overinst 		syncprov;
3216
3217int
3218syncprov_initialize()
3219{
3220	int rc;
3221
3222	rc = register_supported_control( LDAP_CONTROL_SYNC,
3223		SLAP_CTRL_SEARCH, NULL,
3224		syncprov_parseCtrl, &slap_cids.sc_LDAPsync );
3225	if ( rc != LDAP_SUCCESS ) {
3226		Debug( LDAP_DEBUG_ANY,
3227			"syncprov_init: Failed to register control %d\n", rc, 0, 0 );
3228		return rc;
3229	}
3230
3231	syncprov.on_bi.bi_type = "syncprov";
3232	syncprov.on_bi.bi_db_init = syncprov_db_init;
3233	syncprov.on_bi.bi_db_destroy = syncprov_db_destroy;
3234	syncprov.on_bi.bi_db_open = syncprov_db_open;
3235	syncprov.on_bi.bi_db_close = syncprov_db_close;
3236
3237	syncprov.on_bi.bi_op_abandon = syncprov_op_abandon;
3238	syncprov.on_bi.bi_op_cancel = syncprov_op_abandon;
3239
3240	syncprov.on_bi.bi_op_add = syncprov_op_mod;
3241	syncprov.on_bi.bi_op_compare = syncprov_op_compare;
3242	syncprov.on_bi.bi_op_delete = syncprov_op_mod;
3243	syncprov.on_bi.bi_op_modify = syncprov_op_mod;
3244	syncprov.on_bi.bi_op_modrdn = syncprov_op_mod;
3245	syncprov.on_bi.bi_op_search = syncprov_op_search;
3246	syncprov.on_bi.bi_extended = syncprov_op_extended;
3247	syncprov.on_bi.bi_operational = syncprov_operational;
3248
3249	syncprov.on_bi.bi_cf_ocs = spocs;
3250
3251	generic_filter.f_desc = slap_schema.si_ad_objectClass;
3252
3253	rc = config_register_schema( spcfg, spocs );
3254	if ( rc ) return rc;
3255
3256	return overlay_register( &syncprov );
3257}
3258
3259#if SLAPD_OVER_SYNCPROV == SLAPD_MOD_DYNAMIC
/*
 * Module entry point; compiled only when the overlay is built as a
 * dynamic module (see the enclosing #if).  It forwards to
 * syncprov_initialize() and returns its result.  argc and argv are not
 * used.
 */
int
init_module( int argc, char *argv[] )
{
	int status;

	(void)argc;
	(void)argv;

	status = syncprov_initialize();
	return status;
}
3265#endif /* SLAPD_OVER_SYNCPROV == SLAPD_MOD_DYNAMIC */
3266
3267#endif /* defined(SLAPD_OVER_SYNCPROV) */
3268