1/* $OpenLDAP$ */
2/* syncprov.c - syncrepl provider */
3/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
4 *
5 * Copyright 2004-2013 The OpenLDAP Foundation.
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted only as authorized by the OpenLDAP
10 * Public License.
11 *
12 * A copy of this license is available in the file LICENSE in the
13 * top-level directory of the distribution or, alternatively, at
14 * <http://www.OpenLDAP.org/license.html>.
15 */
16/* ACKNOWLEDGEMENTS:
17 * This work was initially developed by Howard Chu for inclusion in
18 * OpenLDAP Software.
19 */
20
21#include "portable.h"
22
23#ifdef SLAPD_OVER_SYNCPROV
24
25#include <ac/string.h>
26#include "lutil.h"
27#include "slap.h"
28#include "config.h"
29#include "ldap_rq.h"
30
31#ifdef LDAP_DEVEL
32#define	CHECK_CSN	1
33#endif
34
/* A modify request on a particular entry.
 * Nodes are chained through mi_next; modtarget keeps the head and tail
 * of such a chain in mt_mods / mt_tail.
 */
typedef struct modinst {
	struct modinst *mi_next;	/* next request in the chain */
	Operation *mi_op;		/* the operation this node represents */
} modinst;
40
/* Per-DN record of modify requests. sp_avl_cmp orders these records by
 * mt_dn so they can be kept in an AVL tree.
 */
typedef struct modtarget {
	struct modinst *mt_mods;	/* head of the modinst chain */
	struct modinst *mt_tail;	/* tail of the modinst chain */
	struct berval mt_dn;		/* key compared by sp_avl_cmp */
	ldap_pvt_thread_mutex_t mt_mutex;	/* presumably guards the chain above -- confirm at use sites */
} modtarget;
47
/* A queued result of a persistent search.
 * syncprov_qresp allocates the struct and the string data of s_dn, s_ndn,
 * s_uuid and s_csn in a single block, so one ch_free() releases it all.
 */
typedef struct syncres {
	struct syncres *s_next;	/* next queued result (FIFO order) */
	Entry *s_e;		/* shared entry, may be NULL; refcounted through
				 * the mutexint hung off e_private */
	struct berval s_dn;	/* copy of the entry DN */
	struct berval s_ndn;	/* copy of the normalized entry DN */
	struct berval s_uuid;	/* copy of the entryUUID */
	struct berval s_csn;	/* CSN, or a composed cookie for
				 * LDAP_SYNC_NEW_COOKIE; bv_val is NULL when empty */
	char s_mode;		/* sync mode passed to syncprov_qresp */
	char s_isreference;	/* copied from opcookie.sreference */
} syncres;
59
/* Record of a persistent search */
typedef struct syncops {
	/* Link in the list rooted at syncprov_info_t.si_ops. Must remain the
	 * first member: syncprov_op_abandon casts &si->si_ops to a
	 * (syncops *) and writes through s_next to unlink the list head.
	 */
	struct syncops *s_next;
	struct berval	s_base;		/* ndn of search base */
	ID		s_eid;		/* entryID of search base */
	Operation	*s_op;		/* search op */
	int		s_rid;		/* rid placed in composed sync cookies */
	int		s_sid;		/* logged as the "to=" id when > 0 */
	struct berval s_filterstr;
	int		s_flags;	/* search status */
#define	PS_IS_REFRESHING	0x01
#define	PS_IS_DETACHED		0x02	/* s_op and its filter are owned by this record */
#define	PS_WROTE_BASE		0x04	/* turned into PS_FIND_BASE by syncprov_qresp */
#define	PS_FIND_BASE		0x08	/* syncprov_findbase must re-check the base entry */
#define	PS_FIX_FILTER		0x10
#define	PS_TASK_QUEUED		0x20	/* a syncprov_qtask is pending for this search */

	int		s_inuse;	/* reference count */
	struct syncres *s_res;		/* head of the queued-response list */
	struct syncres *s_restail;	/* tail of the queued-response list */
	ldap_pvt_thread_mutex_t	s_mutex;	/* held while touching s_res/s_restail,
						 * s_inuse and the queueing flags */
} syncops;
82
/* A received sync control, as stored in
 * op->o_controls[slap_cids.sc_LDAPsync] (see syncprov_findcsn).
 */
typedef struct sync_control {
	struct sync_cookie sr_state;	/* cookie state sent by the consumer */
	int sr_rhint;			/* presumably the reloadHint flag -- confirm */
} sync_control;
88
#if 0 /* moved back to slap.h */
#define	o_sync	o_ctrlflag[slap_cids.sc_LDAPsync]
#endif
/* o_sync_mode uses data bits of o_sync */
#define	o_sync_mode	o_ctrlflag[slap_cids.sc_LDAPsync]

/* Sync mode values, shifted by SLAP_CONTROL_SHIFT so they share the
 * control flag word with the criticality bits; masking with
 * SLAP_CONTROL_MASK (as syncprov_findcsn does) strips the mode again.
 */
#define SLAP_SYNC_NONE					(LDAP_SYNC_NONE<<SLAP_CONTROL_SHIFT)
#define SLAP_SYNC_REFRESH				(LDAP_SYNC_REFRESH_ONLY<<SLAP_CONTROL_SHIFT)
#define SLAP_SYNC_PERSIST				(LDAP_SYNC_RESERVED<<SLAP_CONTROL_SHIFT)
#define SLAP_SYNC_REFRESH_AND_PERSIST	(LDAP_SYNC_REFRESH_AND_PERSIST<<SLAP_CONTROL_SHIFT)
99
/* Record of which searches matched at premodify step.
 * One node per matching persistent search; opcookie.smatches holds the
 * head of the list.
 */
typedef struct syncmatches {
	struct syncmatches *sm_next;	/* next match */
	syncops *sm_op;			/* the persistent search that matched */
} syncmatches;
105
/* Session log data: one logged change, chained from sessionlog.
 * NOTE(review): the code using this type lies outside this part of the
 * file; the field notes below are inferred from names and types only.
 */
typedef struct slog_entry {
	struct slog_entry *se_next;	/* next log record */
	struct berval se_uuid;		/* presumably the entryUUID of the changed entry */
	struct berval se_csn;		/* presumably the CSN of the change */
	int	se_sid;			/* presumably the server ID taken from se_csn */
	ber_tag_t	se_tag;		/* presumably the request tag of the change */
} slog_entry;
114
/* Container for the session log: a list of slog_entry records plus
 * bookkeeping.
 * NOTE(review): the code using this type lies outside this part of the
 * file; the field notes below are inferred from names and types only.
 */
typedef struct sessionlog {
	BerVarray	sl_mincsn;	/* presumably per-SID lower CSN bound of the log */
	int		*sl_sids;	/* presumably SIDs parallel to sl_mincsn */
	int		sl_numcsns;	/* presumably element count of the two arrays above */
	int		sl_num;		/* presumably current number of records */
	int		sl_size;	/* presumably the configured capacity */
	slog_entry *sl_head;		/* first record */
	slog_entry *sl_tail;		/* last record */
	ldap_pvt_thread_mutex_t sl_mutex;	/* presumably guards all of the above */
} sessionlog;
125
/* The main state for this overlay; reached through
 * on->on_bi.bi_private of the overlay instance.
 */
typedef struct syncprov_info_t {
	syncops		*si_ops;	/* list of persistent searches, walked
					 * under si_ops_mutex */
	struct berval	si_contextdn;
	BerVarray	si_ctxcsn;	/* ldapsync context */
	int		*si_sids;	/* server IDs, parallel to si_ctxcsn */
	int		si_numcsns;	/* number of elements in si_ctxcsn / si_sids */
	int		si_chkops;	/* checkpointing info */
	int		si_chktime;
	int		si_numops;	/* number of ops since last checkpoint */
	int		si_nopres;	/* Skip present phase */
	int		si_usehint;	/* use reload hint */
	int		si_active;	/* True if there are active mods */
	int		si_dirty;	/* True if the context is dirty, i.e changes
						 * have been made without updating the csn. */
	time_t	si_chklast;	/* time of last checkpoint */
	Avlnode	*si_mods;	/* entries being modified */
	sessionlog	*si_logs;
	ldap_pvt_thread_rdwr_t	si_csn_rwlock;
	ldap_pvt_thread_mutex_t	si_ops_mutex;	/* guards the si_ops list */
	ldap_pvt_thread_mutex_t	si_mods_mutex;
	ldap_pvt_thread_mutex_t	si_resp_mutex;
} syncprov_info_t;
149
/* Per-operation scratch data describing the entry a change applies to.
 * syncprov_qresp copies the DN/UUID/CSN fields into a syncres, and
 * syncprov_qplay rebuilds an opcookie from a syncres before sending.
 */
typedef struct opcookie {
	slap_overinst *son;	/* overlay instance; gives access to syncprov_info_t */
	syncmatches *smatches;	/* searches that matched at premodify */
	modtarget *smt;
	Entry *se;		/* entry to send; may be NULL */
	struct berval sdn;	/* DN of entry, for deletes */
	struct berval sndn;	/* normalized DN of entry */
	struct berval suuid;	/* UUID of entry */
	struct berval sctxcsn;	/* CSN used to compose the sync cookie */
	short osid;	/* sid of op csn */
	short rsid;	/* sid of relay */
	short sreference;	/* Is the entry a reference? */
} opcookie;
163
/* A mutex-protected counter. Created with a count of 1 by get_mutexint,
 * adjusted by inc_mutexint / dec_mutexint, and destroyed by dec_mutexint
 * when it reaches zero. Used as a reference count hung off
 * Entry.e_private for entries shared between queued responses.
 */
typedef struct mutexint {
	ldap_pvt_thread_mutex_t mi_mutex;	/* guards mi_int */
	int mi_int;				/* the counter */
} mutexint;
168
/* In/out argument of syncprov_findbase; also the private data of
 * findbase_cb. fbase is read back after the lookup: 0 means the base is
 * gone or changed, 1 means it is unchanged, 2 means it was recorded for
 * the first time (findbase_cb sets 2, and no scope test is done then).
 */
typedef struct fbase_cookie {
	struct berval *fdn;	/* DN of a modified entry, for scope testing */
	syncops *fss;	/* persistent search we're testing against */
	int fbase;	/* if TRUE we found the search base and it's still valid */
	int fscope;	/* if TRUE then fdn is within the psearch scope */
} fbase_cookie;
175
/* Attribute lists requested by the internal searches in syncprov_findcsn:
 * csn_anlist for the FIND_MAXCSN pass, uuid_anlist for FIND_PRESENT.
 * NOTE(review): they are filled in outside this part of the file.
 */
static AttributeName csn_anlist[3];
static AttributeName uuid_anlist[2];
178
179/* Build a LDAPsync intermediate state control */
180static int
181syncprov_state_ctrl(
182	Operation	*op,
183	SlapReply	*rs,
184	Entry		*e,
185	int		entry_sync_state,
186	LDAPControl	**ctrls,
187	int		num_ctrls,
188	int		send_cookie,
189	struct berval	*cookie )
190{
191	Attribute* a;
192	int ret;
193
194	BerElementBuffer berbuf;
195	BerElement *ber = (BerElement *)&berbuf;
196	LDAPControl *cp;
197	struct berval bv;
198	struct berval	entryuuid_bv = BER_BVNULL;
199
200	ber_init2( ber, 0, LBER_USE_DER );
201	ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx );
202
203	for ( a = e->e_attrs; a != NULL; a = a->a_next ) {
204		AttributeDescription *desc = a->a_desc;
205		if ( desc == slap_schema.si_ad_entryUUID ) {
206			entryuuid_bv = a->a_nvals[0];
207			break;
208		}
209	}
210
211	/* FIXME: what if entryuuid is NULL or empty ? */
212
213	if ( send_cookie && cookie ) {
214		ber_printf( ber, "{eOON}",
215			entry_sync_state, &entryuuid_bv, cookie );
216	} else {
217		ber_printf( ber, "{eON}",
218			entry_sync_state, &entryuuid_bv );
219	}
220
221	ret = ber_flatten2( ber, &bv, 0 );
222	if ( ret == 0 ) {
223		cp = op->o_tmpalloc( sizeof( LDAPControl ) + bv.bv_len, op->o_tmpmemctx );
224		cp->ldctl_oid = LDAP_CONTROL_SYNC_STATE;
225		cp->ldctl_iscritical = (op->o_sync == SLAP_CONTROL_CRITICAL);
226		cp->ldctl_value.bv_val = (char *)&cp[1];
227		cp->ldctl_value.bv_len = bv.bv_len;
228		AC_MEMCPY( cp->ldctl_value.bv_val, bv.bv_val, bv.bv_len );
229		ctrls[num_ctrls] = cp;
230	}
231	ber_free_buf( ber );
232
233	if ( ret < 0 ) {
234		Debug( LDAP_DEBUG_TRACE,
235			"slap_build_sync_ctrl: ber_flatten2 failed (%d)\n",
236			ret, 0, 0 );
237		send_ldap_error( op, rs, LDAP_OTHER, "internal error" );
238		return LDAP_OTHER;
239	}
240
241	return LDAP_SUCCESS;
242}
243
244/* Build a LDAPsync final state control */
245static int
246syncprov_done_ctrl(
247	Operation	*op,
248	SlapReply	*rs,
249	LDAPControl	**ctrls,
250	int			num_ctrls,
251	int			send_cookie,
252	struct berval *cookie,
253	int			refreshDeletes )
254{
255	int ret;
256	BerElementBuffer berbuf;
257	BerElement *ber = (BerElement *)&berbuf;
258	LDAPControl *cp;
259	struct berval bv;
260
261	ber_init2( ber, NULL, LBER_USE_DER );
262	ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx );
263
264	ber_printf( ber, "{" );
265	if ( send_cookie && cookie ) {
266		ber_printf( ber, "O", cookie );
267	}
268	if ( refreshDeletes == LDAP_SYNC_REFRESH_DELETES ) {
269		ber_printf( ber, "b", refreshDeletes );
270	}
271	ber_printf( ber, "N}" );
272
273	ret = ber_flatten2( ber, &bv, 0 );
274	if ( ret == 0 ) {
275		cp = op->o_tmpalloc( sizeof( LDAPControl ) + bv.bv_len, op->o_tmpmemctx );
276		cp->ldctl_oid = LDAP_CONTROL_SYNC_DONE;
277		cp->ldctl_iscritical = (op->o_sync == SLAP_CONTROL_CRITICAL);
278		cp->ldctl_value.bv_val = (char *)&cp[1];
279		cp->ldctl_value.bv_len = bv.bv_len;
280		AC_MEMCPY( cp->ldctl_value.bv_val, bv.bv_val, bv.bv_len );
281		ctrls[num_ctrls] = cp;
282	}
283
284	ber_free_buf( ber );
285
286	if ( ret < 0 ) {
287		Debug( LDAP_DEBUG_TRACE,
288			"syncprov_done_ctrl: ber_flatten2 failed (%d)\n",
289			ret, 0, 0 );
290		send_ldap_error( op, rs, LDAP_OTHER, "internal error" );
291		return LDAP_OTHER;
292	}
293
294	return LDAP_SUCCESS;
295}
296
297static int
298syncprov_sendinfo(
299	Operation	*op,
300	SlapReply	*rs,
301	int			type,
302	struct berval *cookie,
303	int			refreshDone,
304	BerVarray	syncUUIDs,
305	int			refreshDeletes )
306{
307	BerElementBuffer berbuf;
308	BerElement *ber = (BerElement *)&berbuf;
309	struct berval rspdata;
310
311	int ret;
312
313	ber_init2( ber, NULL, LBER_USE_DER );
314	ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx );
315
316	if ( type ) {
317		switch ( type ) {
318		case LDAP_TAG_SYNC_NEW_COOKIE:
319			ber_printf( ber, "tO", type, cookie );
320			break;
321		case LDAP_TAG_SYNC_REFRESH_DELETE:
322		case LDAP_TAG_SYNC_REFRESH_PRESENT:
323			ber_printf( ber, "t{", type );
324			if ( cookie ) {
325				ber_printf( ber, "O", cookie );
326			}
327			if ( refreshDone == 0 ) {
328				ber_printf( ber, "b", refreshDone );
329			}
330			ber_printf( ber, "N}" );
331			break;
332		case LDAP_TAG_SYNC_ID_SET:
333			ber_printf( ber, "t{", type );
334			if ( cookie ) {
335				ber_printf( ber, "O", cookie );
336			}
337			if ( refreshDeletes == 1 ) {
338				ber_printf( ber, "b", refreshDeletes );
339			}
340			ber_printf( ber, "[W]", syncUUIDs );
341			ber_printf( ber, "N}" );
342			break;
343		default:
344			Debug( LDAP_DEBUG_TRACE,
345				"syncprov_sendinfo: invalid syncinfo type (%d)\n",
346				type, 0, 0 );
347			return LDAP_OTHER;
348		}
349	}
350
351	ret = ber_flatten2( ber, &rspdata, 0 );
352
353	if ( ret < 0 ) {
354		Debug( LDAP_DEBUG_TRACE,
355			"syncprov_sendinfo: ber_flatten2 failed (%d)\n",
356			ret, 0, 0 );
357		send_ldap_error( op, rs, LDAP_OTHER, "internal error" );
358		return LDAP_OTHER;
359	}
360
361	rs->sr_rspoid = LDAP_SYNC_INFO;
362	rs->sr_rspdata = &rspdata;
363	send_ldap_intermediate( op, rs );
364	rs->sr_rspdata = NULL;
365	ber_free_buf( ber );
366
367	return LDAP_SUCCESS;
368}
369
370/* Find a modtarget in an AVL tree */
371static int
372sp_avl_cmp( const void *c1, const void *c2 )
373{
374	const modtarget *m1, *m2;
375	int rc;
376
377	m1 = c1; m2 = c2;
378	rc = m1->mt_dn.bv_len - m2->mt_dn.bv_len;
379
380	if ( rc ) return rc;
381	return ber_bvcmp( &m1->mt_dn, &m2->mt_dn );
382}
383
384/* syncprov_findbase:
385 *   finds the true DN of the base of a search (with alias dereferencing) and
386 * checks to make sure the base entry doesn't get replaced with a different
387 * entry (e.g., swapping trees via ModDN, or retargeting an alias). If a
388 * change is detected, any persistent search on this base must be terminated /
389 * reloaded.
390 *   On the first call, we just save the DN and entryID. On subsequent calls
391 * we compare the DN and entryID with the saved values.
392 */
393static int
394findbase_cb( Operation *op, SlapReply *rs )
395{
396	slap_callback *sc = op->o_callback;
397
398	if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) {
399		fbase_cookie *fc = sc->sc_private;
400
401		/* If no entryID, we're looking for the first time.
402		 * Just store whatever we got.
403		 */
404		if ( fc->fss->s_eid == NOID ) {
405			fc->fbase = 2;
406			fc->fss->s_eid = rs->sr_entry->e_id;
407			ber_dupbv( &fc->fss->s_base, &rs->sr_entry->e_nname );
408
409		} else if ( rs->sr_entry->e_id == fc->fss->s_eid &&
410			dn_match( &rs->sr_entry->e_nname, &fc->fss->s_base )) {
411
412		/* OK, the DN is the same and the entryID is the same. */
413			fc->fbase = 1;
414		}
415	}
416	if ( rs->sr_err != LDAP_SUCCESS ) {
417		Debug( LDAP_DEBUG_ANY, "findbase failed! %d\n", rs->sr_err,0,0 );
418	}
419	return LDAP_SUCCESS;
420}
421
/* Presence filter and the matching filter string, used by
 * syncprov_findbase for its base-scope lookup of a psearch base entry.
 * NOTE(review): the filter's attribute description is zero here;
 * presumably it is filled in during overlay initialisation -- confirm.
 */
static Filter generic_filter = { LDAP_FILTER_PRESENT, { 0 }, NULL };
static struct berval generic_filterstr = BER_BVC("(objectclass=*)");
424
425static int
426syncprov_findbase( Operation *op, fbase_cookie *fc )
427{
428	/* Use basic parameters from syncrepl search, but use
429	 * current op's threadctx / tmpmemctx
430	 */
431	ldap_pvt_thread_mutex_lock( &fc->fss->s_mutex );
432	if ( fc->fss->s_flags & PS_FIND_BASE ) {
433		slap_callback cb = {0};
434		Operation fop;
435		SlapReply frs = { REP_RESULT };
436		int rc;
437
438		fc->fss->s_flags ^= PS_FIND_BASE;
439		ldap_pvt_thread_mutex_unlock( &fc->fss->s_mutex );
440
441		fop = *fc->fss->s_op;
442
443		fop.o_bd = fop.o_bd->bd_self;
444		fop.o_hdr = op->o_hdr;
445		fop.o_time = op->o_time;
446		fop.o_tincr = op->o_tincr;
447
448		cb.sc_response = findbase_cb;
449		cb.sc_private = fc;
450
451		fop.o_sync_mode = 0;	/* turn off sync mode */
452		fop.o_managedsait = SLAP_CONTROL_CRITICAL;
453		fop.o_callback = &cb;
454		LDAP_SLIST_INIT( &fop.o_extra );
455		fop.o_tag = LDAP_REQ_SEARCH;
456		fop.ors_scope = LDAP_SCOPE_BASE;
457		fop.ors_limit = NULL;
458		fop.ors_slimit = 1;
459		fop.ors_tlimit = SLAP_NO_LIMIT;
460		fop.ors_attrs = slap_anlist_no_attrs;
461		fop.ors_attrsonly = 1;
462		fop.ors_filter = &generic_filter;
463		fop.ors_filterstr = generic_filterstr;
464
465		rc = fop.o_bd->be_search( &fop, &frs );
466	} else {
467		ldap_pvt_thread_mutex_unlock( &fc->fss->s_mutex );
468		fc->fbase = 1;
469	}
470
471	/* After the first call, see if the fdn resides in the scope */
472	if ( fc->fbase == 1 ) {
473		switch ( fc->fss->s_op->ors_scope ) {
474		case LDAP_SCOPE_BASE:
475			fc->fscope = dn_match( fc->fdn, &fc->fss->s_base );
476			break;
477		case LDAP_SCOPE_ONELEVEL: {
478			struct berval pdn;
479			dnParent( fc->fdn, &pdn );
480			fc->fscope = dn_match( &pdn, &fc->fss->s_base );
481			break; }
482		case LDAP_SCOPE_SUBTREE:
483			fc->fscope = dnIsSuffix( fc->fdn, &fc->fss->s_base );
484			break;
485		case LDAP_SCOPE_SUBORDINATE:
486			fc->fscope = dnIsSuffix( fc->fdn, &fc->fss->s_base ) &&
487				!dn_match( fc->fdn, &fc->fss->s_base );
488			break;
489		}
490	}
491
492	if ( fc->fbase )
493		return LDAP_SUCCESS;
494
495	/* If entryID has changed, then the base of this search has
496	 * changed. Invalidate the psearch.
497	 */
498	return LDAP_NO_SUCH_OBJECT;
499}
500
/* syncprov_findcsn:
 *   This function has three different purposes, but they all use a search
 * that filters on entryCSN so they're combined here.
 * 1: at startup time, after a contextCSN has been read from the database,
 * we search for all entries with CSN >= contextCSN in case the contextCSN
 * was not checkpointed at the previous shutdown.
 *
 * 2: when the current contextCSN is known and we have a sync cookie, we search
 * for one entry with CSN = the cookie CSN. If not found, try <= cookie CSN.
 * If an entry is found, the cookie CSN is valid, otherwise it is stale.
 *
 * 3: during a refresh phase, we search for all entries with CSN <= the cookie
 * CSN, and generate Present records for them. We always collect this result
 * in SyncID sets, even if there's only one match.
 */
typedef enum find_csn_t {
	FIND_MAXCSN	= 1,	/* purpose 1: results handled by findmax_cb */
	FIND_CSN	= 2,	/* purpose 2: results handled by findcsn_cb */
	FIND_PRESENT	= 3	/* purpose 3: results handled by findpres_cb */
} find_csn_t;
521
522static int
523findmax_cb( Operation *op, SlapReply *rs )
524{
525	if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) {
526		struct berval *maxcsn = op->o_callback->sc_private;
527		Attribute *a = attr_find( rs->sr_entry->e_attrs,
528			slap_schema.si_ad_entryCSN );
529
530		if ( a && ber_bvcmp( &a->a_vals[0], maxcsn ) > 0 &&
531			slap_parse_csn_sid( &a->a_vals[0] ) == slap_serverID ) {
532			maxcsn->bv_len = a->a_vals[0].bv_len;
533			strcpy( maxcsn->bv_val, a->a_vals[0].bv_val );
534		}
535	}
536	return LDAP_SUCCESS;
537}
538
539static int
540findcsn_cb( Operation *op, SlapReply *rs )
541{
542	slap_callback *sc = op->o_callback;
543
544	/* We just want to know that at least one exists, so it's OK if
545	 * we exceed the unchecked limit.
546	 */
547	if ( rs->sr_err == LDAP_ADMINLIMIT_EXCEEDED ||
548		(rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS )) {
549		sc->sc_private = (void *)1;
550	}
551	return LDAP_SUCCESS;
552}
553
/* Build a list of entryUUIDs for sending in a SyncID set */

/* Size in bytes reserved for each UUID in the set */
#define UUID_LEN	16

/* State of findpres_cb. uuids is preallocated by syncprov_findcsn with
 * every slot already pointing at its own UUID_LEN bytes of storage. The
 * array is kept NULL-terminated by nulling the bv_val of slot [num];
 * the pointer that was overwritten is parked in last until the slot is
 * needed.
 */
typedef struct fpres_cookie {
	int num;		/* number of UUIDs currently collected */
	BerVarray uuids;	/* the set being built */
	char *last;		/* storage pointer belonging to uuids[num] */
} fpres_cookie;
563
564static int
565findpres_cb( Operation *op, SlapReply *rs )
566{
567	slap_callback *sc = op->o_callback;
568	fpres_cookie *pc = sc->sc_private;
569	Attribute *a;
570	int ret = SLAP_CB_CONTINUE;
571
572	switch ( rs->sr_type ) {
573	case REP_SEARCH:
574		a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryUUID );
575		if ( a ) {
576			pc->uuids[pc->num].bv_val = pc->last;
577			AC_MEMCPY( pc->uuids[pc->num].bv_val, a->a_nvals[0].bv_val,
578				pc->uuids[pc->num].bv_len );
579			pc->num++;
580			pc->last = pc->uuids[pc->num].bv_val;
581			pc->uuids[pc->num].bv_val = NULL;
582		}
583		ret = LDAP_SUCCESS;
584		if ( pc->num != SLAP_SYNCUUID_SET_SIZE )
585			break;
586		/* FALLTHRU */
587	case REP_RESULT:
588		ret = rs->sr_err;
589		if ( pc->num ) {
590			ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL,
591				0, pc->uuids, 0 );
592			pc->uuids[pc->num].bv_val = pc->last;
593			pc->num = 0;
594			pc->last = pc->uuids[0].bv_val;
595		}
596		break;
597	default:
598		break;
599	}
600	return ret;
601}
602
/* Run one of the three entryCSN-based internal searches.
 *
 *   op   - the operation on whose behalf the search runs; a copy of it is
 *          used as the search operation
 *   mode - which search to perform (see find_csn_t)
 *   csn  - the CSN to look for; only read in FIND_CSN mode
 *
 * FIND_MAXCSN updates si->si_ctxcsn for our own serverID when a newer
 * entryCSN is found, and returns LDAP_NO_SUCH_OBJECT when no context CSN
 * carries our serverID. FIND_CSN returns LDAP_NO_SUCH_OBJECT when neither
 * the exact-match nor the <= search finds an entry. FIND_PRESENT sends
 * SyncID sets through findpres_cb. LDAP_OTHER is returned when the filter
 * string does not fit its buffer; otherwise LDAP_SUCCESS.
 */
static int
syncprov_findcsn( Operation *op, find_csn_t mode, struct berval *csn )
{
	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
	syncprov_info_t		*si = on->on_bi.bi_private;

	slap_callback cb = {0};
	Operation fop;
	SlapReply frs = { REP_RESULT };
	/* Room for the longest filter prefix/suffix plus a CSN string */
	char buf[LDAP_PVT_CSNSTR_BUFSIZE + STRLENOF("(entryCSN<=)")];
	char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
	struct berval maxcsn;
	Filter cf;
	AttributeAssertion eq = ATTRIBUTEASSERTION_INIT;
	fpres_cookie pcookie;
	sync_control *srs = NULL;
	struct slap_limits_set fc_limits;
	int i, rc = LDAP_SUCCESS, findcsn_retry = 1;
	int maxid;

	/* NOTE(review): srs is assigned here but not read anywhere in this
	 * function.
	 */
	if ( mode != FIND_MAXCSN ) {
		srs = op->o_controls[slap_cids.sc_LDAPsync];
	}

	fop = *op;
	fop.o_sync_mode &= SLAP_CONTROL_MASK;	/* turn off sync_mode */
	/* We want pure entries, not referrals */
	fop.o_managedsait = SLAP_CONTROL_CRITICAL;

	/* Filter on entryCSN; choice and value are set per mode below */
	cf.f_ava = &eq;
	cf.f_av_desc = slap_schema.si_ad_entryCSN;
	BER_BVZERO( &cf.f_av_value );
	cf.f_next = NULL;

	fop.o_callback = &cb;
	fop.ors_limit = NULL;
	fop.ors_tlimit = SLAP_NO_LIMIT;
	fop.ors_filter = &cf;
	fop.ors_filterstr.bv_val = buf;

	/* FIND_CSN jumps back here once to retry with a <= filter */
again:
	switch( mode ) {
	case FIND_MAXCSN:
		cf.f_choice = LDAP_FILTER_GE;
		/* If there are multiple CSNs, use the one with our serverID */
		for ( i=0; i<si->si_numcsns; i++) {
			if ( slap_serverID == si->si_sids[i] ) {
				maxid = i;
				break;
			}
		}
		if ( i == si->si_numcsns ) {
			/* No match: this is multimaster, and none of the content in the DB
			 * originated locally. Treat like no CSN.
			 */
			return LDAP_NO_SUCH_OBJECT;
		}
		cf.f_av_value = si->si_ctxcsn[maxid];
		fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ),
			"(entryCSN>=%s)", cf.f_av_value.bv_val );
		if ( fop.ors_filterstr.bv_len >= sizeof( buf ) ) {
			return LDAP_OTHER;
		}
		fop.ors_attrsonly = 0;
		fop.ors_attrs = csn_anlist;
		fop.ors_slimit = SLAP_NO_LIMIT;
		cb.sc_private = &maxcsn;
		cb.sc_response = findmax_cb;
		/* The length check above also guarantees the CSN fits in cbuf:
		 * the filter adds STRLENOF("(entryCSN>=)") characters and buf
		 * is exactly that much larger than cbuf.
		 */
		strcpy( cbuf, cf.f_av_value.bv_val );
		maxcsn.bv_val = cbuf;
		maxcsn.bv_len = cf.f_av_value.bv_len;
		break;
	case FIND_CSN:
		/* Still empty on the first pass; keeps its value on the retry */
		if ( BER_BVISEMPTY( &cf.f_av_value )) {
			cf.f_av_value = *csn;
		}
		/* Search the whole database as the rootdn */
		fop.o_dn = op->o_bd->be_rootdn;
		fop.o_ndn = op->o_bd->be_rootndn;
		fop.o_req_dn = op->o_bd->be_suffix[0];
		fop.o_req_ndn = op->o_bd->be_nsuffix[0];
		/* Look for exact match the first time */
		if ( findcsn_retry ) {
			cf.f_choice = LDAP_FILTER_EQUALITY;
			fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ),
				"(entryCSN=%s)", cf.f_av_value.bv_val );
		/* On retry, look for <= */
		} else {
			cf.f_choice = LDAP_FILTER_LE;
			fop.ors_limit = &fc_limits;
			memset( &fc_limits, 0, sizeof( fc_limits ));
			fc_limits.lms_s_unchecked = 1;
			fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ),
				"(entryCSN<=%s)", cf.f_av_value.bv_val );
		}
		if ( fop.ors_filterstr.bv_len >= sizeof( buf ) ) {
			return LDAP_OTHER;
		}
		fop.ors_attrsonly = 1;
		fop.ors_attrs = slap_anlist_no_attrs;
		fop.ors_slimit = 1;
		/* findcsn_cb sets sc_private non-NULL when it sees a hit */
		cb.sc_private = NULL;
		cb.sc_response = findcsn_cb;
		break;
	case FIND_PRESENT:
		/* Reuse the filter of the original search */
		fop.ors_filter = op->ors_filter;
		fop.ors_filterstr = op->ors_filterstr;
		fop.ors_attrsonly = 0;
		fop.ors_attrs = uuid_anlist;
		fop.ors_slimit = SLAP_NO_LIMIT;
		cb.sc_private = &pcookie;
		cb.sc_response = findpres_cb;
		pcookie.num = 0;

		/* preallocate storage for a full set */
		pcookie.uuids = op->o_tmpalloc( (SLAP_SYNCUUID_SET_SIZE+1) *
			sizeof(struct berval) + SLAP_SYNCUUID_SET_SIZE * UUID_LEN,
			op->o_tmpmemctx );
		/* UUID storage starts right behind the berval array */
		pcookie.last = (char *)(pcookie.uuids + SLAP_SYNCUUID_SET_SIZE+1);
		pcookie.uuids[0].bv_val = pcookie.last;
		pcookie.uuids[0].bv_len = UUID_LEN;
		/* NOTE(review): the terminator slot [SLAP_SYNCUUID_SET_SIZE] is
		 * not initialised here; findpres_cb reads its bv_val into
		 * pcookie.last when a set fills up, before overwriting it.
		 */
		for (i=1; i<SLAP_SYNCUUID_SET_SIZE; i++) {
			pcookie.uuids[i].bv_val = pcookie.uuids[i-1].bv_val + UUID_LEN;
			pcookie.uuids[i].bv_len = UUID_LEN;
		}
		break;
	}

	/* Run the search with bd_info switched to on->on_info, then switch
	 * it back to this overlay instance.
	 */
	fop.o_bd->bd_info = (BackendInfo *)on->on_info;
	fop.o_bd->be_search( &fop, &frs );
	fop.o_bd->bd_info = (BackendInfo *)on;

	switch( mode ) {
	case FIND_MAXCSN:
		/* findmax_cb found something newer than the stored contextCSN */
		if ( ber_bvcmp( &si->si_ctxcsn[maxid], &maxcsn )) {
#ifdef CHECK_CSN
			Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax;
			assert( !syn->ssyn_validate( syn, &maxcsn ));
#endif
			ber_bvreplace( &si->si_ctxcsn[maxid], &maxcsn );
			si->si_numops++;	/* ensure a checkpoint */
		}
		break;
	case FIND_CSN:
		/* If matching CSN was not found, invalidate the context. */
		if ( !cb.sc_private ) {
			/* If we didn't find an exact match, then try for <= */
			if ( findcsn_retry ) {
				findcsn_retry = 0;
				rs_reinit( &frs, REP_RESULT );
				goto again;
			}
			rc = LDAP_NO_SUCH_OBJECT;
		}
		break;
	case FIND_PRESENT:
		op->o_tmpfree( pcookie.uuids, op->o_tmpmemctx );
		break;
	}

	return rc;
}
764
765/* Should find a place to cache these */
766static mutexint *get_mutexint()
767{
768	mutexint *mi = ch_malloc( sizeof( mutexint ));
769	ldap_pvt_thread_mutex_init( &mi->mi_mutex );
770	mi->mi_int = 1;
771	return mi;
772}
773
774static void inc_mutexint( mutexint *mi )
775{
776	ldap_pvt_thread_mutex_lock( &mi->mi_mutex );
777	mi->mi_int++;
778	ldap_pvt_thread_mutex_unlock( &mi->mi_mutex );
779}
780
781/* return resulting counter */
782static int dec_mutexint( mutexint *mi )
783{
784	int i;
785	ldap_pvt_thread_mutex_lock( &mi->mi_mutex );
786	i = --mi->mi_int;
787	ldap_pvt_thread_mutex_unlock( &mi->mi_mutex );
788	if ( !i ) {
789		ldap_pvt_thread_mutex_destroy( &mi->mi_mutex );
790		ch_free( mi );
791	}
792	return i;
793}
794
795static void
796syncprov_free_syncop( syncops *so )
797{
798	syncres *sr, *srnext;
799	GroupAssertion *ga, *gnext;
800
801	ldap_pvt_thread_mutex_lock( &so->s_mutex );
802	/* already being freed, or still in use */
803	if ( !so->s_inuse || --so->s_inuse > 0 ) {
804		ldap_pvt_thread_mutex_unlock( &so->s_mutex );
805		return;
806	}
807	ldap_pvt_thread_mutex_unlock( &so->s_mutex );
808	if ( so->s_flags & PS_IS_DETACHED ) {
809		filter_free( so->s_op->ors_filter );
810		for ( ga = so->s_op->o_groups; ga; ga=gnext ) {
811			gnext = ga->ga_next;
812			ch_free( ga );
813		}
814		ch_free( so->s_op );
815	}
816	ch_free( so->s_base.bv_val );
817	for ( sr=so->s_res; sr; sr=srnext ) {
818		srnext = sr->s_next;
819		if ( sr->s_e ) {
820			if ( !dec_mutexint( sr->s_e->e_private )) {
821				sr->s_e->e_private = NULL;
822				entry_free( sr->s_e );
823			}
824		}
825		ch_free( sr );
826	}
827	ldap_pvt_thread_mutex_destroy( &so->s_mutex );
828	ch_free( so );
829}
830
831/* Send a persistent search response */
832static int
833syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode )
834{
835	SlapReply rs = { REP_SEARCH };
836	struct berval cookie, csns[2];
837	Entry e_uuid = {0};
838	Attribute a_uuid = {0};
839
840	if ( so->s_op->o_abandon )
841		return SLAPD_ABANDON;
842
843	rs.sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2, op->o_tmpmemctx );
844	rs.sr_ctrls[1] = NULL;
845	rs.sr_flags = REP_CTRLS_MUSTBEFREED;
846	csns[0] = opc->sctxcsn;
847	BER_BVZERO( &csns[1] );
848	slap_compose_sync_cookie( op, &cookie, csns, so->s_rid, slap_serverID ? slap_serverID : -1 );
849
850#ifdef LDAP_DEBUG
851	if ( so->s_sid > 0 ) {
852		Debug( LDAP_DEBUG_SYNC, "syncprov_sendresp: to=%03x, cookie=%s\n",
853			so->s_sid, cookie.bv_val, 0 );
854	} else {
855		Debug( LDAP_DEBUG_SYNC, "syncprov_sendresp: cookie=%s\n",
856			cookie.bv_val, 0, 0 );
857	}
858#endif
859
860	e_uuid.e_attrs = &a_uuid;
861	a_uuid.a_desc = slap_schema.si_ad_entryUUID;
862	a_uuid.a_nvals = &opc->suuid;
863	rs.sr_err = syncprov_state_ctrl( op, &rs, &e_uuid,
864		mode, rs.sr_ctrls, 0, 1, &cookie );
865	op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
866
867	rs.sr_entry = &e_uuid;
868	if ( mode == LDAP_SYNC_ADD || mode == LDAP_SYNC_MODIFY ) {
869		e_uuid = *opc->se;
870		e_uuid.e_private = NULL;
871	}
872
873	switch( mode ) {
874	case LDAP_SYNC_ADD:
875		if ( opc->sreference && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) {
876			rs.sr_ref = get_entry_referrals( op, rs.sr_entry );
877			rs.sr_err = send_search_reference( op, &rs );
878			ber_bvarray_free( rs.sr_ref );
879			break;
880		}
881		/* fallthru */
882	case LDAP_SYNC_MODIFY:
883		rs.sr_attrs = op->ors_attrs;
884		rs.sr_err = send_search_entry( op, &rs );
885		break;
886	case LDAP_SYNC_DELETE:
887		e_uuid.e_attrs = NULL;
888		e_uuid.e_name = opc->sdn;
889		e_uuid.e_nname = opc->sndn;
890		if ( opc->sreference && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) {
891			struct berval bv = BER_BVNULL;
892			rs.sr_ref = &bv;
893			rs.sr_err = send_search_reference( op, &rs );
894		} else {
895			rs.sr_err = send_search_entry( op, &rs );
896		}
897		break;
898	default:
899		assert(0);
900	}
901	return rs.sr_err;
902}
903
/* Forward declaration: syncprov_qplay resubmits itself through
 * syncprov_qstart, which is defined further down.
 */
static void
syncprov_qstart( syncops *so );
906
/* Play back queued responses.
 *
 * Takes responses off the queue of so and sends them using op. Normally
 * a single response is sent per call; while the search is abandoned the
 * loop keeps going and discards the whole queue without sending. If more
 * responses remain and no error occurred, the task is resubmitted;
 * otherwise PS_TASK_QUEUED is cleared.
 *
 * Returns the result of the last send (0 if nothing was sent).
 */
static int
syncprov_qplay( Operation *op, syncops *so )
{
	slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key;
	syncres *sr;
	opcookie opc;
	int rc = 0;

	opc.son = on;

	do {
		ldap_pvt_thread_mutex_lock( &so->s_mutex );
		sr = so->s_res;
		/* Exit loop with mutex held */
		if ( !sr )
			break;
		/* Unlink the head of the queue */
		so->s_res = sr->s_next;
		if ( !so->s_res )
			so->s_restail = NULL;
		ldap_pvt_thread_mutex_unlock( &so->s_mutex );

		if ( !so->s_op->o_abandon ) {

			if ( sr->s_mode == LDAP_SYNC_NEW_COOKIE ) {
				SlapReply rs = { REP_INTERMEDIATE };

				/* s_csn holds the composed cookie in this mode */
				rc = syncprov_sendinfo( op, &rs, LDAP_TAG_SYNC_NEW_COOKIE,
					&sr->s_csn, 0, NULL, 0 );
			} else {
				opc.sdn = sr->s_dn;
				opc.sndn = sr->s_ndn;
				opc.suuid = sr->s_uuid;
				opc.sctxcsn = sr->s_csn;
				opc.sreference = sr->s_isreference;
				opc.se = sr->s_e;

				rc = syncprov_sendresp( op, &opc, so, sr->s_mode );
			}
		}

		/* Drop this response's reference on the shared entry */
		if ( sr->s_e ) {
			if ( !dec_mutexint( sr->s_e->e_private )) {
				sr->s_e->e_private = NULL;
				entry_free ( sr->s_e );
			}
		}
		ch_free( sr );

		/* Abandoned: keep looping to discard the remaining queue */
		if ( so->s_op->o_abandon )
			continue;

		/* Exit loop with mutex held */
		ldap_pvt_thread_mutex_lock( &so->s_mutex );
		break;

	} while (1);

	/* We now only send one change at a time, to prevent one
	 * psearch from hogging all the CPU. Resubmit this task if
	 * there are more responses queued and no errors occurred.
	 */

	if ( rc == 0 && so->s_res ) {
		syncprov_qstart( so );
	} else {
		so->s_flags ^= PS_TASK_QUEUED;
	}

	ldap_pvt_thread_mutex_unlock( &so->s_mutex );
	return rc;
}
979
/* task for playing back queued responses.
 *
 * Thread pool entry point submitted by syncprov_qstart; arg is the
 * syncops to service and ctx the pool's thread context. Builds a private
 * copy of the search operation on the stack, plays back queued responses
 * with it and then releases the reference taken by syncprov_qstart.
 * Always returns NULL.
 */
static void *
syncprov_qtask( void *ctx, void *arg )
{
	syncops *so = arg;
	OperationBuffer opbuf;
	Operation *op;
	BackendDB be;
	int rc;

	/* Copy the operation first, then repoint the header and controls
	 * at the local buffer so the original is left untouched.
	 */
	op = &opbuf.ob_op;
	*op = *so->s_op;
	op->o_hdr = &opbuf.ob_hdr;
	op->o_controls = opbuf.ob_controls;
	memset( op->o_controls, 0, sizeof(opbuf.ob_controls) );
	op->o_sync = SLAP_CONTROL_IGNORED;

	*op->o_hdr = *so->s_op->o_hdr;

	/* Temporary memory and thread context belong to this pool thread */
	op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx, 1);
	op->o_tmpmfuncs = &slap_sl_mfuncs;
	op->o_threadctx = ctx;

	/* syncprov_qplay expects a fake db */
	be = *so->s_op->o_bd;
	be.be_flags |= SLAP_DBFLAG_OVERLAY;
	op->o_bd = &be;
	LDAP_SLIST_FIRST(&op->o_extra) = NULL;
	op->o_callback = NULL;

	/* NOTE(review): rc is stored but not examined afterwards */
	rc = syncprov_qplay( op, so );

	/* decrement use count... */
	syncprov_free_syncop( so );

	return NULL;
}
1017
1018/* Start the task to play back queued psearch responses */
1019static void
1020syncprov_qstart( syncops *so )
1021{
1022	so->s_flags |= PS_TASK_QUEUED;
1023	so->s_inuse++;
1024	ldap_pvt_thread_pool_submit( &connection_pool,
1025		syncprov_qtask, so );
1026}
1027
1028/* Queue a persistent search response */
1029static int
1030syncprov_qresp( opcookie *opc, syncops *so, int mode )
1031{
1032	syncres *sr;
1033	int srsize;
1034	struct berval cookie = opc->sctxcsn;
1035
1036	if ( mode == LDAP_SYNC_NEW_COOKIE ) {
1037		syncprov_info_t	*si = opc->son->on_bi.bi_private;
1038
1039		slap_compose_sync_cookie( NULL, &cookie, si->si_ctxcsn,
1040			so->s_rid, slap_serverID ? slap_serverID : -1);
1041	}
1042
1043	srsize = sizeof(syncres) + opc->suuid.bv_len + 1 +
1044		opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1;
1045	if ( cookie.bv_len )
1046		srsize += cookie.bv_len + 1;
1047	sr = ch_malloc( srsize );
1048	sr->s_next = NULL;
1049	sr->s_e = opc->se;
1050	/* bump refcount on this entry */
1051	if ( opc->se )
1052		inc_mutexint( opc->se->e_private );
1053	sr->s_dn.bv_val = (char *)(sr + 1);
1054	sr->s_dn.bv_len = opc->sdn.bv_len;
1055	sr->s_mode = mode;
1056	sr->s_isreference = opc->sreference;
1057	sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val,
1058		 opc->sdn.bv_val ) + 1;
1059	sr->s_ndn.bv_len = opc->sndn.bv_len;
1060	sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val,
1061		 opc->sndn.bv_val ) + 1;
1062	sr->s_uuid.bv_len = opc->suuid.bv_len;
1063	AC_MEMCPY( sr->s_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len );
1064	if ( cookie.bv_len ) {
1065		sr->s_csn.bv_val = sr->s_uuid.bv_val + sr->s_uuid.bv_len + 1;
1066		strcpy( sr->s_csn.bv_val, cookie.bv_val );
1067	} else {
1068		sr->s_csn.bv_val = NULL;
1069	}
1070	sr->s_csn.bv_len = cookie.bv_len;
1071
1072	if ( mode == LDAP_SYNC_NEW_COOKIE && cookie.bv_val ) {
1073		ch_free( cookie.bv_val );
1074	}
1075
1076	ldap_pvt_thread_mutex_lock( &so->s_mutex );
1077	if ( !so->s_res ) {
1078		so->s_res = sr;
1079	} else {
1080		so->s_restail->s_next = sr;
1081	}
1082	so->s_restail = sr;
1083
1084	/* If the base of the psearch was modified, check it next time round */
1085	if ( so->s_flags & PS_WROTE_BASE ) {
1086		so->s_flags ^= PS_WROTE_BASE;
1087		so->s_flags |= PS_FIND_BASE;
1088	}
1089	if (( so->s_flags & (PS_IS_DETACHED|PS_TASK_QUEUED)) == PS_IS_DETACHED ) {
1090		syncprov_qstart( so );
1091	}
1092	ldap_pvt_thread_mutex_unlock( &so->s_mutex );
1093	return LDAP_SUCCESS;
1094}
1095
1096static int
1097syncprov_drop_psearch( syncops *so, int lock )
1098{
1099	if ( so->s_flags & PS_IS_DETACHED ) {
1100		if ( lock )
1101			ldap_pvt_thread_mutex_lock( &so->s_op->o_conn->c_mutex );
1102		so->s_op->o_conn->c_n_ops_executing--;
1103		so->s_op->o_conn->c_n_ops_completed++;
1104		LDAP_STAILQ_REMOVE( &so->s_op->o_conn->c_ops, so->s_op, Operation,
1105			o_next );
1106		if ( lock )
1107			ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex );
1108	}
1109	syncprov_free_syncop( so );
1110
1111	return 0;
1112}
1113
1114static int
1115syncprov_ab_cleanup( Operation *op, SlapReply *rs )
1116{
1117	slap_callback *sc = op->o_callback;
1118	op->o_callback = sc->sc_next;
1119	syncprov_drop_psearch( op->o_callback->sc_private, 0 );
1120	op->o_tmpfree( sc, op->o_tmpmemctx );
1121	return 0;
1122}
1123
1124static int
1125syncprov_op_abandon( Operation *op, SlapReply *rs )
1126{
1127	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
1128	syncprov_info_t		*si = on->on_bi.bi_private;
1129	syncops *so, *soprev;
1130
1131	ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
1132	for ( so=si->si_ops, soprev = (syncops *)&si->si_ops; so;
1133		soprev=so, so=so->s_next ) {
1134		if ( so->s_op->o_connid == op->o_connid &&
1135			so->s_op->o_msgid == op->orn_msgid ) {
1136				so->s_op->o_abandon = 1;
1137				soprev->s_next = so->s_next;
1138				break;
1139		}
1140	}
1141	ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
1142	if ( so ) {
1143		/* Is this really a Cancel exop? */
1144		if ( op->o_tag != LDAP_REQ_ABANDON ) {
1145			so->s_op->o_cancel = SLAP_CANCEL_ACK;
1146			rs->sr_err = LDAP_CANCELLED;
1147			send_ldap_result( so->s_op, rs );
1148			if ( so->s_flags & PS_IS_DETACHED ) {
1149				slap_callback *cb;
1150				cb = op->o_tmpcalloc( 1, sizeof(slap_callback), op->o_tmpmemctx );
1151				cb->sc_cleanup = syncprov_ab_cleanup;
1152				cb->sc_next = op->o_callback;
1153				cb->sc_private = so;
1154				op->o_callback = cb;
1155				return SLAP_CB_CONTINUE;
1156			}
1157		}
1158		syncprov_drop_psearch( so, 0 );
1159	}
1160	return SLAP_CB_CONTINUE;
1161}
1162
/* Find which persistent searches are affected by this operation.
 *
 * op     - the write operation being processed.
 * opc    - per-operation cookie; carries the saved DN/UUID of the entry,
 *          the list of recorded matches (smatches) and the optional
 *          private copy of the entry (se).
 * saveit - nonzero: only record which psearches currently match the entry
 *          (each match is pushed on opc->smatches and the psearch's
 *          s_inuse is bumped).
 *          zero: queue responses with syncprov_qresp() -- MODIFY/ADD when
 *          the entry is in scope and matches the filter, DELETE when it
 *          matched in the saved list but no longer does, NEW_COOKIE
 *          otherwise -- and release the references taken by the earlier
 *          saveit pass.
 *
 * si_ops_mutex is held while the psearch list is walked.  op->o_bd is
 * restored to its entry value before returning.
 */
static void
syncprov_matchops( Operation *op, opcookie *opc, int saveit )
{
	slap_overinst *on = opc->son;
	syncprov_info_t		*si = on->on_bi.bi_private;

	fbase_cookie fc;
	syncops *ss, *sprev, *snext;
	Entry *e = NULL;
	Attribute *a;
	int rc;
	struct berval newdn;
	int freefdn = 0;
	BackendDB *b0 = op->o_bd, db;

	fc.fdn = &op->o_req_ndn;
	/* compute new DN */
	if ( op->o_tag == LDAP_REQ_MODRDN && !saveit ) {
		struct berval pdn;
		if ( op->orr_nnewSup ) pdn = *op->orr_nnewSup;
		else dnParent( fc.fdn, &pdn );
		build_new_dn( &newdn, &pdn, &op->orr_nnewrdn, op->o_tmpmemctx );
		fc.fdn = &newdn;
		freefdn = 1;
	}
	if ( op->o_tag != LDAP_REQ_ADD ) {
		/* Work on a local copy of the BackendDB when op->o_bd is not
		 * already an overlay db; b0 remembers the original pointer.
		 */
		if ( !SLAP_ISOVERLAY( op->o_bd )) {
			db = *op->o_bd;
			op->o_bd = &db;
		}
		rc = overlay_entry_get_ov( op, fc.fdn, NULL, NULL, 0, &e, on );
		/* If we're sending responses now, make a copy and unlock the DB */
		if ( e && !saveit ) {
			if ( !opc->se ) {
				opc->se = entry_dup( e );
				opc->se->e_private = get_mutexint();
			}
			overlay_entry_release_ov( op, e, 0, on );
			e = opc->se;
		}
		/* NOTE(review): when freefdn is set, newdn is not freed on this
		 * early return -- confirm that leaving it to the op's tmp memory
		 * context is intended.
		 */
		if ( rc ) {
			op->o_bd = b0;
			return;
		}
	} else {
		/* For an Add the entry comes straight from the request */
		e = op->ora_e;
		if ( !saveit ) {
			if ( !opc->se ) {
				opc->se = entry_dup( e );
				opc->se->e_private = get_mutexint();
			}
			e = opc->se;
		}
	}

	/* Remember the entry's DN, normalized DN, referral status and
	 * entryUUID in the cookie.  For the response pass of a ModRDN only
	 * the two DNs are refreshed, replacing the values saved earlier.
	 */
	if ( saveit || op->o_tag == LDAP_REQ_ADD ) {
		ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx );
		ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx );
		opc->sreference = is_entry_referral( e );
		a = attr_find( e->e_attrs, slap_schema.si_ad_entryUUID );
		if ( a )
			ber_dupbv_x( &opc->suuid, &a->a_nvals[0], op->o_tmpmemctx );
	} else if ( op->o_tag == LDAP_REQ_MODRDN && !saveit ) {
		op->o_tmpfree( opc->sndn.bv_val, op->o_tmpmemctx );
		op->o_tmpfree( opc->sdn.bv_val, op->o_tmpmemctx );
		ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx );
		ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx );
	}

	ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
	/* sprev starts as the list head cast to a node; this relies on s_next
	 * being the first member of syncops so that sprev->s_next aliases
	 * si->si_ops.
	 */
	for (ss = si->si_ops, sprev = (syncops *)&si->si_ops; ss;
		sprev = ss, ss=snext)
	{
		Operation op2;
		Opheader oh;
		syncmatches *sm;
		int found = 0;

		/* grab the successor now; ss may be dropped below */
		snext = ss->s_next;
		if ( ss->s_op->o_abandon )
			continue;

		/* Don't send ops back to the originator */
		if ( opc->osid > 0 && opc->osid == ss->s_sid ) {
			Debug( LDAP_DEBUG_SYNC, "syncprov_matchops: skipping original sid %03x\n",
				opc->osid, 0, 0 );
			continue;
		}

		/* Don't send ops back to the messenger */
		if ( opc->rsid > 0 && opc->rsid == ss->s_sid ) {
			Debug( LDAP_DEBUG_SYNC, "syncprov_matchops: skipping relayed sid %03x\n",
				opc->rsid, 0, 0 );
			continue;
		}

		/* validate base */
		fc.fss = ss;
		fc.fbase = 0;
		fc.fscope = 0;

		/* If the base of the search is missing, signal a refresh */
		rc = syncprov_findbase( op, &fc );
		if ( rc != LDAP_SUCCESS ) {
			SlapReply rs = {REP_RESULT};
			send_ldap_error( ss->s_op, &rs, LDAP_SYNC_REFRESH_REQUIRED,
				"search base has changed" );
			/* unlink ss and step back so the loop increment leaves
			 * sprev unchanged and moves on to snext
			 */
			sprev->s_next = snext;
			syncprov_drop_psearch( ss, 1 );
			ss = sprev;
			continue;
		}

		/* If we're sending results now, look for this op in old matches */
		if ( !saveit ) {
			syncmatches *old;

			/* Did we modify the search base? */
			if ( dn_match( &op->o_req_ndn, &ss->s_base )) {
				ldap_pvt_thread_mutex_lock( &ss->s_mutex );
				ss->s_flags |= PS_WROTE_BASE;
				ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
			}

			/* Remove the record for ss from the saved match list, if any */
			for ( sm=opc->smatches, old=(syncmatches *)&opc->smatches; sm;
				old=sm, sm=sm->sm_next ) {
				if ( sm->sm_op == ss ) {
					found = 1;
					old->sm_next = sm->sm_next;
					op->o_tmpfree( sm, op->o_tmpmemctx );
					break;
				}
			}
		}

		/* Entry is within the search scope: evaluate the search filter
		 * against it using a scratch copy of the psearch op.
		 */
		if ( fc.fscope ) {
			ldap_pvt_thread_mutex_lock( &ss->s_mutex );
			op2 = *ss->s_op;
			oh = *op->o_hdr;
			oh.oh_conn = ss->s_op->o_conn;
			oh.oh_connid = ss->s_op->o_connid;
			op2.o_bd = op->o_bd->bd_self;
			op2.o_hdr = &oh;
			op2.o_extra = op->o_extra;
			op2.o_callback = NULL;
			if (ss->s_flags & PS_FIX_FILTER) {
				/* Skip the AND/GE clause that we stuck on in front. We
				   would lose deletes/mods that happen during the refresh
				   phase otherwise (ITS#6555) */
				op2.ors_filter = ss->s_op->ors_filter->f_and->f_next;
			}
			ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
			rc = test_filter( &op2, e, op2.ors_filter );
		}

		Debug( LDAP_DEBUG_TRACE, "syncprov_matchops: sid %03x fscope %d rc %d\n",
			ss->s_sid, fc.fscope, rc );

		/* check if current o_req_dn is in scope and matches filter */
		if ( fc.fscope && rc == LDAP_COMPARE_TRUE ) {
			if ( saveit ) {
				/* record the match and take a reference on the psearch */
				sm = op->o_tmpalloc( sizeof(syncmatches), op->o_tmpmemctx );
				sm->sm_next = opc->smatches;
				sm->sm_op = ss;
				ldap_pvt_thread_mutex_lock( &ss->s_mutex );
				++ss->s_inuse;
				ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
				opc->smatches = sm;
			} else {
				/* if found send UPDATE else send ADD */
				syncprov_qresp( opc, ss,
					found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD );
			}
		} else if ( !saveit && found ) {
			/* send DELETE */
			syncprov_qresp( opc, ss, LDAP_SYNC_DELETE );
		} else if ( !saveit ) {
			/* entry is not (and was not) visible to this psearch:
			 * only a new cookie is queued
			 */
			syncprov_qresp( opc, ss, LDAP_SYNC_NEW_COOKIE );
		}
		if ( !saveit && found ) {
			/* Decrement s_inuse, was incremented when called
			 * with saveit == TRUE
			 */
			syncprov_free_syncop( ss );
		}
	}
	ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );

	/* In the saveit pass the DB entry is still held; release it now.
	 * (In the response pass it was already released after copying.)
	 */
	if ( op->o_tag != LDAP_REQ_ADD && e ) {
		if ( !SLAP_ISOVERLAY( op->o_bd )) {
			op->o_bd = &db;
		}
		if ( saveit )
			overlay_entry_release_ov( op, e, 0, on );
		op->o_bd = b0;
	}
	/* Drop this function's reference on the private entry copy; free it
	 * when dec_mutexint() reports zero.
	 */
	if ( opc->se && !saveit ) {
		if ( !dec_mutexint( opc->se->e_private )) {
			opc->se->e_private = NULL;
			entry_free( opc->se );
			opc->se = NULL;
		}
	}
	if ( freefdn ) {
		op->o_tmpfree( fc.fdn->bv_val, op->o_tmpmemctx );
	}
	op->o_bd = b0;
}
1372
1373static int
1374syncprov_op_cleanup( Operation *op, SlapReply *rs )
1375{
1376	slap_callback *cb = op->o_callback;
1377	opcookie *opc = cb->sc_private;
1378	slap_overinst *on = opc->son;
1379	syncprov_info_t		*si = on->on_bi.bi_private;
1380	syncmatches *sm, *snext;
1381	modtarget *mt;
1382
1383	ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
1384	if ( si->si_active )
1385		si->si_active--;
1386	ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
1387
1388	for (sm = opc->smatches; sm; sm=snext) {
1389		snext = sm->sm_next;
1390		syncprov_free_syncop( sm->sm_op );
1391		op->o_tmpfree( sm, op->o_tmpmemctx );
1392	}
1393
1394	/* Remove op from lock table */
1395	mt = opc->smt;
1396	if ( mt ) {
1397		ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
1398		/* If there are more, promote the next one */
1399		if ( mt->mt_mods ) {
1400			mt->mt_mods = mt->mt_mods->mi_next;
1401		}
1402		if ( mt->mt_mods ) {
1403			ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
1404		} else {
1405			ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
1406			ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
1407			avl_delete( &si->si_mods, mt, sp_avl_cmp );
1408			ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
1409			ldap_pvt_thread_mutex_destroy( &mt->mt_mutex );
1410			ch_free( mt->mt_dn.bv_val );
1411			ch_free( mt );
1412		}
1413	}
1414	if ( !BER_BVISNULL( &opc->suuid ))
1415		op->o_tmpfree( opc->suuid.bv_val, op->o_tmpmemctx );
1416	if ( !BER_BVISNULL( &opc->sndn ))
1417		op->o_tmpfree( opc->sndn.bv_val, op->o_tmpmemctx );
1418	if ( !BER_BVISNULL( &opc->sdn ))
1419		op->o_tmpfree( opc->sdn.bv_val, op->o_tmpmemctx );
1420	op->o_callback = cb->sc_next;
1421	op->o_tmpfree(cb, op->o_tmpmemctx);
1422
1423	return 0;
1424}
1425
1426static void
1427syncprov_checkpoint( Operation *op, slap_overinst *on )
1428{
1429	syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
1430	Modifications mod;
1431	Operation opm;
1432	SlapReply rsm = {REP_RESULT};
1433	slap_callback cb = {0};
1434	BackendDB be;
1435	BackendInfo *bi;
1436
1437#ifdef CHECK_CSN
1438	Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax;
1439
1440	int i;
1441	for ( i=0; i<si->si_numcsns; i++ ) {
1442		assert( !syn->ssyn_validate( syn, si->si_ctxcsn+i ));
1443	}
1444#endif
1445	mod.sml_numvals = si->si_numcsns;
1446	mod.sml_values = si->si_ctxcsn;
1447	mod.sml_nvalues = NULL;
1448	mod.sml_desc = slap_schema.si_ad_contextCSN;
1449	mod.sml_op = LDAP_MOD_REPLACE;
1450	mod.sml_flags = SLAP_MOD_INTERNAL;
1451	mod.sml_next = NULL;
1452
1453	cb.sc_response = slap_null_cb;
1454	opm = *op;
1455	opm.o_tag = LDAP_REQ_MODIFY;
1456	opm.o_callback = &cb;
1457	opm.orm_modlist = &mod;
1458	opm.orm_no_opattrs = 1;
1459	if ( SLAP_GLUE_SUBORDINATE( op->o_bd )) {
1460		be = *on->on_info->oi_origdb;
1461		opm.o_bd = &be;
1462	}
1463	opm.o_req_dn = si->si_contextdn;
1464	opm.o_req_ndn = si->si_contextdn;
1465	bi = opm.o_bd->bd_info;
1466	opm.o_bd->bd_info = on->on_info->oi_orig;
1467	opm.o_managedsait = SLAP_CONTROL_NONCRITICAL;
1468	opm.o_no_schema_check = 1;
1469	opm.o_bd->be_modify( &opm, &rsm );
1470
1471	if ( rsm.sr_err == LDAP_NO_SUCH_OBJECT &&
1472		SLAP_SYNC_SUBENTRY( opm.o_bd )) {
1473		const char	*text;
1474		char txtbuf[SLAP_TEXT_BUFLEN];
1475		size_t textlen = sizeof txtbuf;
1476		Entry *e = slap_create_context_csn_entry( opm.o_bd, NULL );
1477		rs_reinit( &rsm, REP_RESULT );
1478		slap_mods2entry( &mod, &e, 0, 1, &text, txtbuf, textlen);
1479		opm.ora_e = e;
1480		opm.o_bd->be_add( &opm, &rsm );
1481		if ( e == opm.ora_e )
1482			be_entry_release_w( &opm, opm.ora_e );
1483	}
1484	opm.o_bd->bd_info = bi;
1485
1486	if ( mod.sml_next != NULL ) {
1487		slap_mods_free( mod.sml_next, 1 );
1488	}
1489#ifdef CHECK_CSN
1490	for ( i=0; i<si->si_numcsns; i++ ) {
1491		assert( !syn->ssyn_validate( syn, si->si_ctxcsn+i ));
1492	}
1493#endif
1494}
1495
1496static void
1497syncprov_add_slog( Operation *op )
1498{
1499	opcookie *opc = op->o_callback->sc_private;
1500	slap_overinst *on = opc->son;
1501	syncprov_info_t		*si = on->on_bi.bi_private;
1502	sessionlog *sl;
1503	slog_entry *se;
1504
1505	sl = si->si_logs;
1506	{
1507		if ( BER_BVISEMPTY( &op->o_csn ) ) {
1508			/* During the syncrepl refresh phase we can receive operations
1509			 * without a csn.  We cannot reliably determine the consumers
1510			 * state with respect to such operations, so we ignore them and
1511			 * wipe out anything in the log if we see them.
1512			 */
1513			ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
1514			while ( se = sl->sl_head ) {
1515				sl->sl_head = se->se_next;
1516				ch_free( se );
1517			}
1518			sl->sl_tail = NULL;
1519			sl->sl_num = 0;
1520			ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
1521			return;
1522		}
1523
1524		/* Allocate a record. UUIDs are not NUL-terminated. */
1525		se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len +
1526			op->o_csn.bv_len + 1 );
1527		se->se_next = NULL;
1528		se->se_tag = op->o_tag;
1529
1530		se->se_uuid.bv_val = (char *)(&se[1]);
1531		AC_MEMCPY( se->se_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len );
1532		se->se_uuid.bv_len = opc->suuid.bv_len;
1533
1534		se->se_csn.bv_val = se->se_uuid.bv_val + opc->suuid.bv_len;
1535		AC_MEMCPY( se->se_csn.bv_val, op->o_csn.bv_val, op->o_csn.bv_len );
1536		se->se_csn.bv_val[op->o_csn.bv_len] = '\0';
1537		se->se_csn.bv_len = op->o_csn.bv_len;
1538		se->se_sid = slap_parse_csn_sid( &se->se_csn );
1539
1540		ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
1541		if ( sl->sl_head ) {
1542			/* Keep the list in csn order. */
1543			if ( ber_bvcmp( &sl->sl_tail->se_csn, &se->se_csn ) <= 0 ) {
1544				sl->sl_tail->se_next = se;
1545				sl->sl_tail = se;
1546			} else {
1547				slog_entry **sep;
1548
1549				for ( sep = &sl->sl_head; *sep; sep = &(*sep)->se_next ) {
1550					if ( ber_bvcmp( &se->se_csn, &(*sep)->se_csn ) < 0 ) {
1551						se->se_next = *sep;
1552						*sep = se;
1553						break;
1554					}
1555				}
1556			}
1557		} else {
1558			sl->sl_head = se;
1559			sl->sl_tail = se;
1560			if ( !sl->sl_mincsn ) {
1561				sl->sl_numcsns = 1;
1562				sl->sl_mincsn = ch_malloc( 2*sizeof( struct berval ));
1563				sl->sl_sids = ch_malloc( sizeof( int ));
1564				sl->sl_sids[0] = se->se_sid;
1565				ber_dupbv( sl->sl_mincsn, &se->se_csn );
1566				BER_BVZERO( &sl->sl_mincsn[1] );
1567			}
1568		}
1569		sl->sl_num++;
1570		while ( sl->sl_num > sl->sl_size ) {
1571			int i, j;
1572			se = sl->sl_head;
1573			sl->sl_head = se->se_next;
1574			for ( i=0; i<sl->sl_numcsns; i++ )
1575				if ( sl->sl_sids[i] >= se->se_sid )
1576					break;
1577			if  ( i == sl->sl_numcsns || sl->sl_sids[i] != se->se_sid ) {
1578				slap_insert_csn_sids( (struct sync_cookie *)sl,
1579					i, se->se_sid, &se->se_csn );
1580			} else {
1581				ber_bvreplace( &sl->sl_mincsn[i], &se->se_csn );
1582			}
1583			ch_free( se );
1584			sl->sl_num--;
1585		}
1586		ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
1587	}
1588}
1589
1590/* Just set a flag if we found the matching entry */
1591static int
1592playlog_cb( Operation *op, SlapReply *rs )
1593{
1594	if ( rs->sr_type == REP_SEARCH ) {
1595		op->o_callback->sc_private = (void *)1;
1596	}
1597	return rs->sr_err;
1598}
1599
/* Replay the session log for a consumer.
 *
 * enter with sl->sl_mutex locked, release before returning
 *
 * op, rs  - the consumer's search operation and its reply.
 * sl      - the session log to replay.
 * srs     - the consumer's sync control; its sr_state cookie (sids/ctxcsn)
 *           bounds the log from below.
 * ctxcsn, numcsns, sids
 *         - per-SID CSNs bounding the log from above.
 *
 * UUIDs of logged Deletes go to the front of a scratch array and UUIDs of
 * other non-Add operations go to the back.  Each of the latter that no
 * search finds (entryUUID equality ANDed with the op's filter) is moved
 * onto the delete set.  A non-empty delete set is sent with
 * syncprov_sendinfo() as an LDAP_TAG_SYNC_ID_SET message; a cookie is
 * attached when at least one logged Delete was seen.
 */
static void
syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
	sync_control *srs, BerVarray ctxcsn, int numcsns, int *sids )
{
	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
	slog_entry *se;
	int i, j, ndel, num, nmods, mmods;
	char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
	BerVarray uuids;
	struct berval delcsn[2];

	/* Nothing logged: just honor the unlock contract */
	if ( !sl->sl_num ) {
		ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
		return;
	}

	num = sl->sl_num;
	i = 0;
	nmods = 0;

	/* One allocation: num+1 bervals followed by num UUID-sized buffers;
	 * uuids[0].bv_val marks the start of the UUID storage area.
	 */
	uuids = op->o_tmpalloc( (num+1) * sizeof( struct berval ) +
		num * UUID_LEN, op->o_tmpmemctx );
	uuids[0].bv_val = (char *)(uuids + num + 1);

	/* delcsn[0] will hold a copy (in cbuf) of the csn of the last Delete
	 * collected; delcsn[1] is the empty terminator.
	 */
	delcsn[0].bv_len = 0;
	delcsn[0].bv_val = cbuf;
	BER_BVZERO(&delcsn[1]);

	/* Make a copy of the relevant UUIDs. Put the Deletes up front
	 * and everything else at the end. Do this first so we can
	 * unlock the list mutex.
	 */
	Debug( LDAP_DEBUG_SYNC, "srs csn %s\n",
		srs->sr_state.ctxcsn[0].bv_val, 0, 0 );
	for ( se=sl->sl_head; se; se=se->se_next ) {
		int k;
		Debug( LDAP_DEBUG_SYNC, "log csn %s\n", se->se_csn.bv_val, 0, 0 );
		/* Skip records not newer than the consumer's csn for the same
		 * SID; a SID absent from the consumer's cookie counts as newer.
		 */
		ndel = 1;
		for ( k=0; k<srs->sr_state.numcsns; k++ ) {
			if ( se->se_sid == srs->sr_state.sids[k] ) {
				ndel = ber_bvcmp( &se->se_csn, &srs->sr_state.ctxcsn[k] );
				break;
			}
		}
		if ( ndel <= 0 ) {
			Debug( LDAP_DEBUG_SYNC, "cmp %d, too old\n", ndel, 0, 0 );
			continue;
		}
		/* Stop at the first record newer than the supplied ctxcsn for
		 * its SID.
		 */
		ndel = 0;
		for ( k=0; k<numcsns; k++ ) {
			if ( se->se_sid == sids[k] ) {
				ndel = ber_bvcmp( &se->se_csn, &ctxcsn[k] );
				break;
			}
		}
		if ( ndel > 0 ) {
			Debug( LDAP_DEBUG_SYNC, "cmp %d, too new\n", ndel, 0, 0 );
			break;
		}
		if ( se->se_tag == LDAP_REQ_DELETE ) {
			/* Deletes fill slots from the front; remember this csn */
			j = i;
			i++;
			AC_MEMCPY( cbuf, se->se_csn.bv_val, se->se_csn.bv_len );
			delcsn[0].bv_len = se->se_csn.bv_len;
			delcsn[0].bv_val[delcsn[0].bv_len] = '\0';
		} else {
			/* Adds are ignored; other ops fill slots from the back */
			if ( se->se_tag == LDAP_REQ_ADD )
				continue;
			nmods++;
			j = num - nmods;
		}
		uuids[j].bv_val = uuids[0].bv_val + (j * UUID_LEN);
		AC_MEMCPY(uuids[j].bv_val, se->se_uuid.bv_val, UUID_LEN);
		uuids[j].bv_len = UUID_LEN;
	}
	ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );

	/* i counted the Deletes collected above */
	ndel = i;

	/* Zero out unused slots */
	for ( i=ndel; i < num - nmods; i++ )
		uuids[i].bv_len = 0;

	/* Mods must be validated to see if they belong in this delete set.
	 */

	mmods = nmods;
	/* Strip any duplicates */
	for ( i=0; i<nmods; i++ ) {
		/* already in the delete set? */
		for ( j=0; j<ndel; j++ ) {
			if ( bvmatch( &uuids[j], &uuids[num - 1 - i] )) {
				uuids[num - 1 - i].bv_len = 0;
				mmods --;
				break;
			}
		}
		if ( uuids[num - 1 - i].bv_len == 0 ) continue;
		/* same UUID as a mod slot examined earlier in this loop? */
		for ( j=0; j<i; j++ ) {
			if ( bvmatch( &uuids[num - 1 - j], &uuids[num - 1 - i] )) {
				uuids[num - 1 - i].bv_len = 0;
				mmods --;
				break;
			}
		}
	}

	if ( mmods ) {
		Operation fop;
		int rc;
		Filter mf, af;
		AttributeAssertion eq = ATTRIBUTEASSERTION_INIT;
		slap_callback cb = {0};

		/* Internal search on a copy of the consumer's op */
		fop = *op;

		fop.o_sync_mode = 0;
		fop.o_callback = &cb;
		fop.ors_limit = NULL;
		fop.ors_tlimit = SLAP_NO_LIMIT;
		fop.ors_attrs = slap_anlist_all_attributes;
		fop.ors_attrsonly = 0;
		fop.o_managedsait = SLAP_CONTROL_CRITICAL;

		/* Filter is (&(entryUUID=<uuid>)<original filter>) */
		af.f_choice = LDAP_FILTER_AND;
		af.f_next = NULL;
		af.f_and = &mf;
		mf.f_choice = LDAP_FILTER_EQUALITY;
		mf.f_ava = &eq;
		mf.f_av_desc = slap_schema.si_ad_entryUUID;
		mf.f_next = fop.ors_filter;

		fop.ors_filter = &af;

		cb.sc_response = playlog_cb;
		/* fop.o_bd is the same BackendDB as op->o_bd, so this swaps
		 * bd_info on the shared structure; it is set back to 'on'
		 * after the loop.
		 */
		fop.o_bd->bd_info = (BackendInfo *)on->on_info;

		for ( i=ndel; i<num; i++ ) {
		  if ( uuids[i].bv_len != 0 ) {
			SlapReply frs = { REP_RESULT };

			mf.f_av_value = uuids[i];
			cb.sc_private = NULL;
			fop.ors_slimit = 1;
			rc = fop.o_bd->be_search( &fop, &frs );

			/* If entry was not found, add to delete list */
			if ( !cb.sc_private ) {
				uuids[ndel++] = uuids[i];
			}
		  }
		}
		fop.o_bd->bd_info = (BackendInfo *)on;
	}
	if ( ndel ) {
		struct berval cookie;

		/* The cookie is only built when a logged Delete supplied a csn */
		if ( delcsn[0].bv_len ) {
			slap_compose_sync_cookie( op, &cookie, delcsn, srs->sr_state.rid,
				slap_serverID ? slap_serverID : -1 );

			Debug( LDAP_DEBUG_SYNC, "syncprov_playlog: cookie=%s\n", cookie.bv_val, 0, 0 );
		}

		/* NULL bv_val terminates the UUID array */
		uuids[ndel].bv_val = NULL;
		syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET,
			delcsn[0].bv_len ? &cookie : NULL, 0, uuids, 1 );
		if ( delcsn[0].bv_len ) {
			op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
		}
	}
	op->o_tmpfree( uuids, op->o_tmpmemctx );
}
1773
/* Response callback for write operations tracked by this overlay.
 *
 * Does nothing unless the operation succeeded.  On success, with
 * si_resp_mutex held for the whole sequence:
 *  - folds the operation's committed CSN into the in-memory contextCSN
 *    set (under the si_csn_rwlock write lock);
 *  - for o_dont_replicate operations, only handles contextCSN replace
 *    modifications (queuing a new cookie to every live psearch if any
 *    CSN advanced) and then leaves;
 *  - otherwise decides whether a checkpoint is due, runs it, notifies
 *    persistent searches and appends to the session log.
 *
 * Always returns SLAP_CB_CONTINUE.
 */
static int
syncprov_op_response( Operation *op, SlapReply *rs )
{
	opcookie *opc = op->o_callback->sc_private;
	slap_overinst *on = opc->son;
	syncprov_info_t		*si = on->on_bi.bi_private;
	syncmatches *sm;

	if ( rs->sr_err == LDAP_SUCCESS )
	{
		struct berval maxcsn;
		char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
		int do_check = 0, have_psearches, foundit, csn_changed = 0;

		ldap_pvt_thread_mutex_lock( &si->si_resp_mutex );

		/* Update our context CSN */
		cbuf[0] = '\0';
		maxcsn.bv_val = cbuf;
		maxcsn.bv_len = sizeof(cbuf);
		ldap_pvt_thread_rdwr_wlock( &si->si_csn_rwlock );

		slap_get_commit_csn( op, &maxcsn, &foundit );
		if ( BER_BVISEMPTY( &maxcsn ) && SLAP_GLUE_SUBORDINATE( op->o_bd )) {
			/* syncrepl queues the CSN values in the db where
			 * it is configured , not where the changes are made.
			 * So look for a value in the glue db if we didn't
			 * find any in this db.
			 */
			BackendDB *be = op->o_bd;
			op->o_bd = select_backend( &be->be_nsuffix[0], 1);
			maxcsn.bv_val = cbuf;
			maxcsn.bv_len = sizeof(cbuf);
			slap_get_commit_csn( op, &maxcsn, &foundit );
			op->o_bd = be;
		}
		if ( !BER_BVISEMPTY( &maxcsn ) ) {
			int i, sid;
#ifdef CHECK_CSN
			Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax;
			assert( !syn->ssyn_validate( syn, &maxcsn ));
#endif
			sid = slap_parse_csn_sid( &maxcsn );
			/* si_sids is scanned in ascending order: stop at the
			 * matching SID or at the first larger one.
			 */
			for ( i=0; i<si->si_numcsns; i++ ) {
				if ( sid < si->si_sids[i] )
					break;
				if ( sid == si->si_sids[i] ) {
					if ( ber_bvcmp( &maxcsn, &si->si_ctxcsn[i] ) > 0 ) {
						ber_bvreplace( &si->si_ctxcsn[i], &maxcsn );
						csn_changed = 1;
					}
					break;
				}
			}
			/* It's a new SID for us */
			if ( i == si->si_numcsns || sid != si->si_sids[i] ) {
				slap_insert_csn_sids((struct sync_cookie *)&(si->si_ctxcsn),
					i, sid, &maxcsn );
				csn_changed = 1;
			}
		}

		/* Don't do any processing for consumer contextCSN updates */
		if ( op->o_dont_replicate ) {
			if ( op->o_tag == LDAP_REQ_MODIFY &&
				op->orm_modlist->sml_op == LDAP_MOD_REPLACE &&
				op->orm_modlist->sml_desc == slap_schema.si_ad_contextCSN ) {
			/* Catch contextCSN updates from syncrepl. We have to look at
			 * all the attribute values, as there may be more than one csn
			 * that changed, and only one can be passed in the csn queue.
			 */
			Modifications *mod = op->orm_modlist;
			unsigned i;
			int j, sid;

			for ( i=0; i<mod->sml_numvals; i++ ) {
				sid = slap_parse_csn_sid( &mod->sml_values[i] );
				for ( j=0; j<si->si_numcsns; j++ ) {
					if ( sid < si->si_sids[j] )
						break;
					if ( sid == si->si_sids[j] ) {
						if ( ber_bvcmp( &mod->sml_values[i], &si->si_ctxcsn[j] ) > 0 ) {
							ber_bvreplace( &si->si_ctxcsn[j], &mod->sml_values[i] );
							csn_changed = 1;
						}
						break;
					}
				}

				/* SID not present yet: insert it at position j */
				if ( j == si->si_numcsns || sid != si->si_sids[j] ) {
					slap_insert_csn_sids( (struct sync_cookie *)&si->si_ctxcsn,
						j, sid, &mod->sml_values[i] );
					csn_changed = 1;
				}
			}
			if ( csn_changed )
				si->si_dirty = 0;
			ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );

			if ( csn_changed ) {
				syncops *ss;
				ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
				for ( ss = si->si_ops; ss; ss = ss->s_next ) {
					if ( ss->s_op->o_abandon )
						continue;
					/* Send the updated csn to all syncrepl consumers,
					 * including the server from which it originated.
					 * The syncrepl consumer and syncprov provider on
					 * the originating server may be configured to store
					 * their csn values in different entries.
					 */
					syncprov_qresp( opc, ss, LDAP_SYNC_NEW_COOKIE );
				}
				ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
			}
			} else {
			/* not a contextCSN replace: nothing to do but drop the lock */
			ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
			}
			goto leave;
		}

		/* Checkpoint accounting: count the op and decide whether the
		 * op-count or elapsed-time threshold has been reached.
		 */
		si->si_numops++;
		if ( si->si_chkops || si->si_chktime ) {
			/* Never checkpoint adding the context entry,
			 * it will deadlock
			 */
			if ( op->o_tag != LDAP_REQ_ADD ||
				!dn_match( &op->o_req_ndn, &si->si_contextdn )) {
				if ( si->si_chkops && si->si_numops >= si->si_chkops ) {
					do_check = 1;
					si->si_numops = 0;
				}
				if ( si->si_chktime &&
					(op->o_time - si->si_chklast >= si->si_chktime )) {
					/* The first time through (si_chklast == 0) only
					 * arms the timer; no checkpoint is taken.
					 */
					if ( si->si_chklast ) {
						do_check = 1;
						si->si_chklast = op->o_time;
					} else {
						si->si_chklast = 1;
					}
				}
			}
		}
		si->si_dirty = !csn_changed;
		ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );

		/* The checkpoint reads si_ctxcsn, so hold the read lock around it */
		if ( do_check ) {
			ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
			syncprov_checkpoint( op, on );
			ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
		}

		/* only update consumer ctx if this is a newer csn */
		if ( csn_changed ) {
			/* maxcsn.bv_val points at the local cbuf, so this value is
			 * only meaningful until this function returns.
			 */
			opc->sctxcsn = maxcsn;
		}

		/* Handle any persistent searches */
		ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
		have_psearches = ( si->si_ops != NULL );
		ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
		if ( have_psearches ) {
			switch(op->o_tag) {
			case LDAP_REQ_ADD:
			case LDAP_REQ_MODIFY:
			case LDAP_REQ_MODRDN:
			case LDAP_REQ_EXTENDED:
				/* response pass (saveit == 0): queue results */
				syncprov_matchops( op, opc, 0 );
				break;
			case LDAP_REQ_DELETE:
				/* for each match in opc->smatches:
				 *   send DELETE msg
				 */
				for ( sm = opc->smatches; sm; sm=sm->sm_next ) {
					if ( sm->sm_op->s_op->o_abandon )
						continue;
					syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_DELETE );
				}
				break;
			}
		}

		/* Add any log records */
		if ( si->si_logs ) {
			syncprov_add_slog( op );
		}
leave:		ldap_pvt_thread_mutex_unlock( &si->si_resp_mutex );
	}
	return SLAP_CB_CONTINUE;
}
1964
1965/* We don't use a subentry to store the context CSN any more.
1966 * We expose the current context CSN as an operational attribute
1967 * of the suffix entry.
1968 */
1969static int
1970syncprov_op_compare( Operation *op, SlapReply *rs )
1971{
1972	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
1973	syncprov_info_t		*si = on->on_bi.bi_private;
1974	int rc = SLAP_CB_CONTINUE;
1975
1976	if ( dn_match( &op->o_req_ndn, &si->si_contextdn ) &&
1977		op->oq_compare.rs_ava->aa_desc == slap_schema.si_ad_contextCSN )
1978	{
1979		Entry e = {0};
1980		Attribute a = {0};
1981
1982		e.e_name = si->si_contextdn;
1983		e.e_nname = si->si_contextdn;
1984		e.e_attrs = &a;
1985
1986		a.a_desc = slap_schema.si_ad_contextCSN;
1987
1988		ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
1989
1990		a.a_vals = si->si_ctxcsn;
1991		a.a_nvals = a.a_vals;
1992		a.a_numvals = si->si_numcsns;
1993
1994		rs->sr_err = access_allowed( op, &e, op->oq_compare.rs_ava->aa_desc,
1995			&op->oq_compare.rs_ava->aa_value, ACL_COMPARE, NULL );
1996		if ( ! rs->sr_err ) {
1997			rs->sr_err = LDAP_INSUFFICIENT_ACCESS;
1998			goto return_results;
1999		}
2000
2001		if ( get_assert( op ) &&
2002			( test_filter( op, &e, get_assertion( op ) ) != LDAP_COMPARE_TRUE ) )
2003		{
2004			rs->sr_err = LDAP_ASSERTION_FAILED;
2005			goto return_results;
2006		}
2007
2008
2009		rs->sr_err = LDAP_COMPARE_FALSE;
2010
2011		if ( attr_valfind( &a,
2012			SLAP_MR_ATTRIBUTE_VALUE_NORMALIZED_MATCH |
2013				SLAP_MR_ASSERTED_VALUE_NORMALIZED_MATCH,
2014				&op->oq_compare.rs_ava->aa_value, NULL, op->o_tmpmemctx ) == 0 )
2015		{
2016			rs->sr_err = LDAP_COMPARE_TRUE;
2017		}
2018
2019return_results:;
2020
2021		ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
2022
2023		send_ldap_result( op, rs );
2024
2025		if( rs->sr_err == LDAP_COMPARE_FALSE || rs->sr_err == LDAP_COMPARE_TRUE ) {
2026			rs->sr_err = LDAP_SUCCESS;
2027		}
2028		rc = rs->sr_err;
2029	}
2030
2031	return rc;
2032}
2033
2034static int
2035syncprov_op_mod( Operation *op, SlapReply *rs )
2036{
2037	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
2038	syncprov_info_t		*si = on->on_bi.bi_private;
2039	slap_callback *cb;
2040	opcookie *opc;
2041	int have_psearches, cbsize;
2042
2043	ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
2044	have_psearches = ( si->si_ops != NULL );
2045	si->si_active++;
2046	ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
2047
2048	cbsize = sizeof(slap_callback) + sizeof(opcookie) +
2049		(have_psearches ? sizeof(modinst) : 0 );
2050
2051	cb = op->o_tmpcalloc(1, cbsize, op->o_tmpmemctx);
2052	opc = (opcookie *)(cb+1);
2053	opc->son = on;
2054	cb->sc_response = syncprov_op_response;
2055	cb->sc_cleanup = syncprov_op_cleanup;
2056	cb->sc_private = opc;
2057	cb->sc_next = op->o_callback;
2058	op->o_callback = cb;
2059
2060	opc->osid = -1;
2061	opc->rsid = -1;
2062	if ( op->o_csn.bv_val ) {
2063		opc->osid = slap_parse_csn_sid( &op->o_csn );
2064	}
2065	if ( op->o_controls ) {
2066		struct sync_cookie *scook =
2067		op->o_controls[slap_cids.sc_LDAPsync];
2068		if ( scook )
2069			opc->rsid = scook->sid;
2070	}
2071
2072	if ( op->o_dont_replicate )
2073		return SLAP_CB_CONTINUE;
2074
2075	/* If there are active persistent searches, lock this operation.
2076	 * See seqmod.c for the locking logic on its own.
2077	 */
2078	if ( have_psearches ) {
2079		modtarget *mt, mtdummy;
2080		modinst *mi;
2081
2082		mi = (modinst *)(opc+1);
2083		mi->mi_op = op;
2084		mi->mi_next = NULL;
2085
2086		/* See if we're already modifying this entry... */
2087		mtdummy.mt_dn = op->o_req_ndn;
2088		ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
2089		mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp );
2090		if ( mt ) {
2091			ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
2092			if ( mt->mt_mods == NULL ) {
2093				/* Cannot reuse this mt, as another thread is about
2094				 * to release it in syncprov_op_cleanup.
2095				 */
2096				ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
2097				mt = NULL;
2098			}
2099		}
2100		if ( mt ) {
2101			ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
2102			mt->mt_tail->mi_next = mi;
2103			mt->mt_tail = mi;
2104			/* wait for this op to get to head of list */
2105			while ( mt->mt_mods && mt->mt_mods != mi ) {
2106				modinst *m2;
2107				// If our op is no longer in the queue for any reason, allow
2108				// the op to continue, rather than spinning here forever.
2109				for ( m2 = mt->mt_mods; m2 && m2 != mi;
2110						m2 = m2->mi_next );
2111				if(!m2) break;
2112
2113				ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
2114
2115				/* FIXME: if dynamic config can delete overlays or
2116				 * databases we'll have to check for cleanup here.
2117				 * Currently it's not an issue because there are
2118				 * no dynamic config deletes...
2119				 */
2120				if ( slapd_shutdown )
2121					return SLAPD_ABANDON;
2122
2123				if ( !ldap_pvt_thread_pool_pausecheck( &connection_pool ))
2124					ldap_pvt_thread_yield();
2125				ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
2126
2127				/* clean up if the caller is giving up */
2128				if ( op->o_abandon ) {
2129					modinst *m2;
2130					// Here, the abandoned op can be anywhere in the queue.
2131					// op_cleanup will always dequeue the head of the queue,
2132					// so if we call that and the abandoned op isn't the
2133					// head, we will dequeue an op still in flight.
2134
2135					// Dequeue our op regardless of where it is in the queue.
2136					for ( m2 = mt->mt_mods; m2 && m2->mi_next != mi;
2137						m2 = m2->mi_next );
2138					if ( m2 ) {
2139						m2->mi_next = mi->mi_next;
2140						if(mt->mt_tail == mi) mt->mt_tail = m2;
2141					}
2142					if(mt->mt_mods == mi) {
2143						mt->mt_mods = mt->mt_mods->mi_next;
2144					}
2145
2146					if(mt->mt_mods) {
2147						ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
2148					} else {
2149						ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
2150						ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
2151						avl_delete( &si->si_mods, mt, sp_avl_cmp );
2152						ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
2153						ldap_pvt_thread_mutex_destroy( &mt->mt_mutex );
2154						ch_free(mt->mt_dn.bv_val);
2155						ch_free(mt);
2156					}
2157
2158					ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
2159					/* abandon current active op */
2160					op->o_callback = cb->sc_next; /* reset to previous callback */
2161					op->o_tmpfree( cb, op->o_tmpmemctx ); /* remove syncprov_op_cleanup and syncprov_op_response callbacks */
2162					si->si_active--; /* remove from active mods tally */
2163					ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
2164
2165					return SLAPD_ABANDON;
2166				}
2167			}
2168			ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
2169		} else {
2170			/* Record that we're modifying this entry now */
2171			mt = ch_malloc( sizeof(modtarget) );
2172			mt->mt_mods = mi;
2173			mt->mt_tail = mi;
2174			ber_dupbv( &mt->mt_dn, &mi->mi_op->o_req_ndn );
2175			ldap_pvt_thread_mutex_init( &mt->mt_mutex );
2176			avl_insert( &si->si_mods, mt, sp_avl_cmp, avl_dup_error );
2177			ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
2178		}
2179		opc->smt = mt;
2180	}
2181
2182	if (( have_psearches || si->si_logs ) && op->o_tag != LDAP_REQ_ADD )
2183		syncprov_matchops( op, opc, 1 );
2184
2185	return SLAP_CB_CONTINUE;
2186}
2187
2188static int
2189syncprov_op_extended( Operation *op, SlapReply *rs )
2190{
2191	if ( exop_is_write( op ))
2192		return syncprov_op_mod( op, rs );
2193
2194	return SLAP_CB_CONTINUE;
2195}
2196
2197typedef struct searchstate {
2198	slap_overinst *ss_on;
2199	syncops *ss_so;
2200	BerVarray ss_ctxcsn;
2201	int *ss_sids;
2202	int ss_numcsns;
2203#define	SS_PRESENT	0x01
2204#define	SS_CHANGED	0x02
2205	int ss_flags;
2206} searchstate;
2207
2208typedef struct SyncOperationBuffer {
2209	Operation		sob_op;
2210	Opheader		sob_hdr;
2211	OpExtra			sob_oe;
2212	AttributeName	sob_extra;	/* not always present */
2213	/* Further data allocated here */
2214} SyncOperationBuffer;
2215
2216static void
2217syncprov_detach_op( Operation *op, syncops *so, slap_overinst *on )
2218{
2219	SyncOperationBuffer *sopbuf2;
2220	Operation *op2;
2221	int i, alen = 0;
2222	size_t size;
2223	char *ptr;
2224	GroupAssertion *g1, *g2;
2225
2226	/* count the search attrs */
2227	for (i=0; op->ors_attrs && !BER_BVISNULL( &op->ors_attrs[i].an_name ); i++) {
2228		alen += op->ors_attrs[i].an_name.bv_len + 1;
2229	}
2230	/* Make a new copy of the operation */
2231	size = offsetof( SyncOperationBuffer, sob_extra ) +
2232		(i ? ( (i+1) * sizeof(AttributeName) + alen) : 0) +
2233		op->o_req_dn.bv_len + 1 +
2234		op->o_req_ndn.bv_len + 1 +
2235		op->o_ndn.bv_len + 1 +
2236		so->s_filterstr.bv_len + 1;
2237	sopbuf2 = ch_calloc( 1, size );
2238	op2 = &sopbuf2->sob_op;
2239	op2->o_hdr = &sopbuf2->sob_hdr;
2240	LDAP_SLIST_FIRST(&op2->o_extra) = &sopbuf2->sob_oe;
2241
2242	/* Copy the fields we care about explicitly, leave the rest alone */
2243	*op2->o_hdr = *op->o_hdr;
2244	op2->o_tag = op->o_tag;
2245	op2->o_time = op->o_time;
2246	op2->o_bd = on->on_info->oi_origdb;
2247	op2->o_request = op->o_request;
2248	op2->o_managedsait = op->o_managedsait;
2249	LDAP_SLIST_FIRST(&op2->o_extra)->oe_key = on;
2250	LDAP_SLIST_NEXT(LDAP_SLIST_FIRST(&op2->o_extra), oe_next) = NULL;
2251
2252	ptr = (char *) sopbuf2 + offsetof( SyncOperationBuffer, sob_extra );
2253	if ( i ) {
2254		op2->ors_attrs = (AttributeName *) ptr;
2255		ptr = (char *) &op2->ors_attrs[i+1];
2256		for (i=0; !BER_BVISNULL( &op->ors_attrs[i].an_name ); i++) {
2257			op2->ors_attrs[i] = op->ors_attrs[i];
2258			op2->ors_attrs[i].an_name.bv_val = ptr;
2259			ptr = lutil_strcopy( ptr, op->ors_attrs[i].an_name.bv_val ) + 1;
2260		}
2261		BER_BVZERO( &op2->ors_attrs[i].an_name );
2262	}
2263
2264	op2->o_authz = op->o_authz;
2265	op2->o_ndn.bv_val = ptr;
2266	ptr = lutil_strcopy(ptr, op->o_ndn.bv_val) + 1;
2267	op2->o_dn = op2->o_ndn;
2268	op2->o_req_dn.bv_len = op->o_req_dn.bv_len;
2269	op2->o_req_dn.bv_val = ptr;
2270	ptr = lutil_strcopy(ptr, op->o_req_dn.bv_val) + 1;
2271	op2->o_req_ndn.bv_len = op->o_req_ndn.bv_len;
2272	op2->o_req_ndn.bv_val = ptr;
2273	ptr = lutil_strcopy(ptr, op->o_req_ndn.bv_val) + 1;
2274	op2->ors_filterstr.bv_val = ptr;
2275	strcpy( ptr, so->s_filterstr.bv_val );
2276	op2->ors_filterstr.bv_len = so->s_filterstr.bv_len;
2277
2278	/* Skip the AND/GE clause that we stuck on in front */
2279	if ( so->s_flags & PS_FIX_FILTER ) {
2280		op2->ors_filter = op->ors_filter->f_and->f_next;
2281		so->s_flags ^= PS_FIX_FILTER;
2282	} else {
2283		op2->ors_filter = op->ors_filter;
2284	}
2285	op2->ors_filter = filter_dup( op2->ors_filter, NULL );
2286	so->s_op = op2;
2287
2288	/* Copy any cached group ACLs individually */
2289	op2->o_groups = NULL;
2290	for ( g1=op->o_groups; g1; g1=g1->ga_next ) {
2291		g2 = ch_malloc( sizeof(GroupAssertion) + g1->ga_len );
2292		*g2 = *g1;
2293		strcpy( g2->ga_ndn, g1->ga_ndn );
2294		g2->ga_next = op2->o_groups;
2295		op2->o_groups = g2;
2296	}
2297	/* Don't allow any further group caching */
2298	op2->o_do_not_cache = 1;
2299
2300	/* Add op2 to conn so abandon will find us */
2301	op->o_conn->c_n_ops_executing++;
2302	op->o_conn->c_n_ops_completed--;
2303	LDAP_STAILQ_INSERT_TAIL( &op->o_conn->c_ops, op2, o_next );
2304	so->s_flags |= PS_IS_DETACHED;
2305
2306	/* Prevent anyone else from trying to send a result for this op */
2307	op->o_abandon = 1;
2308}
2309
2310static int
2311syncprov_search_response( Operation *op, SlapReply *rs )
2312{
2313	searchstate *ss = op->o_callback->sc_private;
2314	slap_overinst *on = ss->ss_on;
2315	syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
2316	sync_control *srs = op->o_controls[slap_cids.sc_LDAPsync];
2317
2318	if ( rs->sr_type == REP_SEARCH || rs->sr_type == REP_SEARCHREF ) {
2319		Attribute *a;
2320		/* If we got a referral without a referral object, there's
2321		 * something missing that we cannot replicate. Just ignore it.
2322		 * The consumer will abort because we didn't send the expected
2323		 * control.
2324		 */
2325		if ( !rs->sr_entry ) {
2326			assert( rs->sr_entry != NULL );
2327			Debug( LDAP_DEBUG_ANY, "bogus referral in context\n",0,0,0 );
2328			return SLAP_CB_CONTINUE;
2329		}
2330		a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryCSN );
2331		if ( a == NULL && rs->sr_operational_attrs != NULL ) {
2332			a = attr_find( rs->sr_operational_attrs, slap_schema.si_ad_entryCSN );
2333		}
2334		if ( a ) {
2335			int i, sid;
2336			sid = slap_parse_csn_sid( &a->a_nvals[0] );
2337
2338			/* Don't send changed entries back to the originator */
2339			if ( sid == srs->sr_state.sid && srs->sr_state.numcsns ) {
2340				Debug( LDAP_DEBUG_SYNC,
2341					"Entry %s changed by peer, ignored\n",
2342					rs->sr_entry->e_name.bv_val, 0, 0 );
2343				return LDAP_SUCCESS;
2344			}
2345
2346			/* If not a persistent search */
2347			if ( !ss->ss_so ) {
2348				/* Make sure entry is less than the snapshot'd contextCSN */
2349				for ( i=0; i<ss->ss_numcsns; i++ ) {
2350					if ( sid == ss->ss_sids[i] && ber_bvcmp( &a->a_nvals[0],
2351						&ss->ss_ctxcsn[i] ) > 0 ) {
2352						Debug( LDAP_DEBUG_SYNC,
2353							"Entry %s CSN %s greater than snapshot %s\n",
2354							rs->sr_entry->e_name.bv_val,
2355							a->a_nvals[0].bv_val,
2356							ss->ss_ctxcsn[i].bv_val );
2357						return LDAP_SUCCESS;
2358					}
2359				}
2360			}
2361
2362			/* Don't send old entries twice */
2363			if ( srs->sr_state.ctxcsn ) {
2364				for ( i=0; i<srs->sr_state.numcsns; i++ ) {
2365					if ( sid == srs->sr_state.sids[i] &&
2366						ber_bvcmp( &a->a_nvals[0],
2367							&srs->sr_state.ctxcsn[i] )<= 0 ) {
2368						Debug( LDAP_DEBUG_SYNC,
2369							"Entry %s CSN %s older or equal to ctx %s\n",
2370							rs->sr_entry->e_name.bv_val,
2371							a->a_nvals[0].bv_val,
2372							srs->sr_state.ctxcsn[i].bv_val );
2373						return LDAP_SUCCESS;
2374					}
2375				}
2376			}
2377		}
2378		rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2,
2379			op->o_tmpmemctx );
2380		rs->sr_ctrls[1] = NULL;
2381		rs->sr_flags |= REP_CTRLS_MUSTBEFREED;
2382		/* If we're in delta-sync mode, always send a cookie */
2383		if ( si->si_nopres && si->si_usehint && a ) {
2384			struct berval cookie;
2385			slap_compose_sync_cookie( op, &cookie, a->a_nvals, srs->sr_state.rid, slap_serverID ? slap_serverID : -1 );
2386			rs->sr_err = syncprov_state_ctrl( op, rs, rs->sr_entry,
2387				LDAP_SYNC_ADD, rs->sr_ctrls, 0, 1, &cookie );
2388			op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
2389		} else {
2390			rs->sr_err = syncprov_state_ctrl( op, rs, rs->sr_entry,
2391				LDAP_SYNC_ADD, rs->sr_ctrls, 0, 0, NULL );
2392		}
2393	} else if ( rs->sr_type == REP_RESULT && rs->sr_err == LDAP_SUCCESS ) {
2394		struct berval cookie = BER_BVNULL;
2395
2396		if ( ( ss->ss_flags & SS_CHANGED ) &&
2397			ss->ss_ctxcsn && !BER_BVISNULL( &ss->ss_ctxcsn[0] )) {
2398			slap_compose_sync_cookie( op, &cookie, ss->ss_ctxcsn,
2399				srs->sr_state.rid, slap_serverID ? slap_serverID : -1 );
2400
2401			Debug( LDAP_DEBUG_SYNC, "syncprov_search_response: cookie=%s\n", cookie.bv_val, 0, 0 );
2402		}
2403
2404		/* Is this a regular refresh?
2405		 * Note: refresh never gets here if there were no changes
2406		 */
2407		if ( !ss->ss_so ) {
2408			rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2,
2409				op->o_tmpmemctx );
2410			rs->sr_ctrls[1] = NULL;
2411			rs->sr_flags |= REP_CTRLS_MUSTBEFREED;
2412			rs->sr_err = syncprov_done_ctrl( op, rs, rs->sr_ctrls,
2413				0, 1, &cookie, ( ss->ss_flags & SS_PRESENT ) ?  LDAP_SYNC_REFRESH_PRESENTS :
2414					LDAP_SYNC_REFRESH_DELETES );
2415			op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
2416		} else {
2417		/* It's RefreshAndPersist, transition to Persist phase */
2418			syncprov_sendinfo( op, rs, ( ss->ss_flags & SS_PRESENT ) ?
2419	 			LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE,
2420				( ss->ss_flags & SS_CHANGED ) ? &cookie : NULL,
2421				1, NULL, 0 );
2422			if ( !BER_BVISNULL( &cookie ))
2423				op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
2424
2425			/* Detach this Op from frontend control */
2426			ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex );
2427
2428			/* But not if this connection was closed along the way */
2429			if ( op->o_abandon ) {
2430				ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
2431				/* syncprov_ab_cleanup will free this syncop */
2432				return SLAPD_ABANDON;
2433
2434			} else {
2435				ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex );
2436				/* Turn off the refreshing flag */
2437				ss->ss_so->s_flags ^= PS_IS_REFRESHING;
2438
2439				syncprov_detach_op( op, ss->ss_so, on );
2440
2441				ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
2442
2443				/* If there are queued responses, fire them off */
2444				if ( ss->ss_so->s_res )
2445					syncprov_qstart( ss->ss_so );
2446				ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex );
2447			}
2448
2449			return LDAP_SUCCESS;
2450		}
2451	}
2452
2453	return SLAP_CB_CONTINUE;
2454}
2455
2456static int
2457syncprov_op_search( Operation *op, SlapReply *rs )
2458{
2459	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
2460	syncprov_info_t		*si = (syncprov_info_t *)on->on_bi.bi_private;
2461	slap_callback	*cb;
2462	int gotstate = 0, changed = 0, do_present = 0;
2463	syncops *sop = NULL;
2464	searchstate *ss;
2465	sync_control *srs;
2466	BerVarray ctxcsn;
2467	int i, *sids, numcsns;
2468	struct berval mincsn, maxcsn;
2469	int minsid, maxsid;
2470	int dirty = 0;
2471
2472	if ( !(op->o_sync_mode & SLAP_SYNC_REFRESH) ) return SLAP_CB_CONTINUE;
2473
2474	if ( op->ors_deref & LDAP_DEREF_SEARCHING ) {
2475		send_ldap_error( op, rs, LDAP_PROTOCOL_ERROR, "illegal value for derefAliases" );
2476		return rs->sr_err;
2477	}
2478
2479	srs = op->o_controls[slap_cids.sc_LDAPsync];
2480
2481	/* If this is a persistent search, set it up right away */
2482	if ( op->o_sync_mode & SLAP_SYNC_PERSIST ) {
2483		syncops so = {0};
2484		fbase_cookie fc;
2485		opcookie opc;
2486		slap_callback sc;
2487
2488		fc.fss = &so;
2489		fc.fbase = 0;
2490		so.s_eid = NOID;
2491		so.s_op = op;
2492		so.s_flags = PS_IS_REFRESHING | PS_FIND_BASE;
2493		/* syncprov_findbase expects to be called as a callback... */
2494		sc.sc_private = &opc;
2495		opc.son = on;
2496		ldap_pvt_thread_mutex_init( &so.s_mutex );
2497		cb = op->o_callback;
2498		op->o_callback = &sc;
2499		rs->sr_err = syncprov_findbase( op, &fc );
2500		op->o_callback = cb;
2501		ldap_pvt_thread_mutex_destroy( &so.s_mutex );
2502
2503		if ( rs->sr_err != LDAP_SUCCESS ) {
2504			send_ldap_result( op, rs );
2505			return rs->sr_err;
2506		}
2507		sop = ch_malloc( sizeof( syncops ));
2508		*sop = so;
2509		ldap_pvt_thread_mutex_init( &sop->s_mutex );
2510		sop->s_rid = srs->sr_state.rid;
2511		sop->s_sid = srs->sr_state.sid;
2512		/* set refcount=2 to prevent being freed out from under us
2513		 * by abandons that occur while we're running here
2514		 */
2515		sop->s_inuse = 2;
2516
2517		ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
2518		while ( si->si_active ) {
2519			/* Wait for active mods to finish before proceeding, as they
2520			 * may already have inspected the si_ops list looking for
2521			 * consumers to replicate the change to.  Using the log
2522			 * doesn't help, as we may finish playing it before the
2523			 * active mods gets added to it.
2524			 */
2525			ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
2526			if ( slapd_shutdown ) {
2527				ch_free( sop );
2528				return SLAPD_ABANDON;
2529			}
2530			if ( !ldap_pvt_thread_pool_pausecheck( &connection_pool ))
2531				ldap_pvt_thread_yield();
2532			ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
2533		}
2534		sop->s_next = si->si_ops;
2535		si->si_ops = sop;
2536		ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
2537	}
2538
2539	/* snapshot the ctxcsn */
2540	ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
2541	numcsns = si->si_numcsns;
2542	if ( numcsns ) {
2543		ber_bvarray_dup_x( &ctxcsn, si->si_ctxcsn, op->o_tmpmemctx );
2544		sids = op->o_tmpalloc( numcsns * sizeof(int), op->o_tmpmemctx );
2545		for ( i=0; i<numcsns; i++ )
2546			sids[i] = si->si_sids[i];
2547	} else {
2548		ctxcsn = NULL;
2549		sids = NULL;
2550	}
2551	dirty = si->si_dirty;
2552	ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
2553
2554	/* If we have a cookie, handle the PRESENT lookups */
2555	if ( srs->sr_state.ctxcsn ) {
2556		sessionlog *sl;
2557		int i, j;
2558
2559		/* If we don't have any CSN of our own yet, pretend nothing
2560		 * has changed.
2561		 */
2562		if ( !numcsns )
2563			goto no_change;
2564
2565		if ( !si->si_nopres )
2566			do_present = SS_PRESENT;
2567
2568		/* If there are SIDs we don't recognize in the cookie, drop them */
2569		for (i=0; i<srs->sr_state.numcsns; ) {
2570			for (j=i; j<numcsns; j++) {
2571				if ( srs->sr_state.sids[i] <= sids[j] ) {
2572					break;
2573				}
2574			}
2575			/* not found */
2576			if ( j == numcsns || srs->sr_state.sids[i] != sids[j] ) {
2577				char *tmp = srs->sr_state.ctxcsn[i].bv_val;
2578				srs->sr_state.numcsns--;
2579				for ( j=i; j<srs->sr_state.numcsns; j++ ) {
2580					srs->sr_state.ctxcsn[j] = srs->sr_state.ctxcsn[j+1];
2581					srs->sr_state.sids[j] = srs->sr_state.sids[j+1];
2582				}
2583				srs->sr_state.ctxcsn[j].bv_val = tmp;
2584				srs->sr_state.ctxcsn[j].bv_len = 0;
2585				continue;
2586			}
2587			i++;
2588		}
2589
2590		if (srs->sr_state.numcsns != numcsns) {
2591			/* consumer doesn't have the right number of CSNs */
2592			changed = SS_CHANGED;
2593			if ( srs->sr_state.ctxcsn ) {
2594				ber_bvarray_free_x( srs->sr_state.ctxcsn, op->o_tmpmemctx );
2595				srs->sr_state.ctxcsn = NULL;
2596			}
2597			if ( srs->sr_state.sids ) {
2598				slap_sl_free( srs->sr_state.sids, op->o_tmpmemctx );
2599				srs->sr_state.sids = NULL;
2600			}
2601			srs->sr_state.numcsns = 0;
2602			goto shortcut;
2603		}
2604
2605		/* Find the smallest CSN which differs from contextCSN */
2606		mincsn.bv_len = 0;
2607		maxcsn.bv_len = 0;
2608		for ( i=0,j=0; i<srs->sr_state.numcsns; i++ ) {
2609			int newer;
2610			while ( srs->sr_state.sids[i] != sids[j] ) j++;
2611			if ( BER_BVISEMPTY( &maxcsn ) || ber_bvcmp( &maxcsn,
2612				&srs->sr_state.ctxcsn[i] ) < 0 ) {
2613				maxcsn = srs->sr_state.ctxcsn[i];
2614				maxsid = sids[j];
2615			}
2616			newer = ber_bvcmp( &srs->sr_state.ctxcsn[i], &ctxcsn[j] );
2617			/* If our state is newer, tell consumer about changes */
2618			if ( newer < 0) {
2619				changed = SS_CHANGED;
2620				if ( BER_BVISEMPTY( &mincsn ) || ber_bvcmp( &mincsn,
2621					&srs->sr_state.ctxcsn[i] ) > 0 ) {
2622					mincsn = srs->sr_state.ctxcsn[i];
2623					minsid = sids[j];
2624				}
2625			} else if ( newer > 0 && sids[j] == slap_serverID ) {
2626			/* our state is older, complain to consumer */
2627				rs->sr_err = LDAP_UNWILLING_TO_PERFORM;
2628				rs->sr_text = "consumer state is newer than provider!";
2629bailout:
2630				if ( sop ) {
2631					syncops **sp = &si->si_ops;
2632
2633					ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
2634					while ( *sp != sop )
2635						sp = &(*sp)->s_next;
2636					*sp = sop->s_next;
2637					ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
2638					ch_free( sop );
2639				}
2640				rs->sr_ctrls = NULL;
2641				send_ldap_result( op, rs );
2642				return rs->sr_err;
2643			}
2644 		}
2645		if ( BER_BVISEMPTY( &mincsn )) {
2646			mincsn = maxcsn;
2647			minsid = maxsid;
2648		}
2649
2650		/* If nothing has changed, shortcut it */
2651		if ( !changed && !dirty ) {
2652			do_present = 0;
2653no_change:		if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
2654				LDAPControl	*ctrls[2];
2655
2656				ctrls[0] = NULL;
2657				ctrls[1] = NULL;
2658				syncprov_done_ctrl( op, rs, ctrls, 0, 0,
2659					NULL, LDAP_SYNC_REFRESH_DELETES );
2660				rs->sr_ctrls = ctrls;
2661				rs->sr_err = LDAP_SUCCESS;
2662				send_ldap_result( op, rs );
2663				rs->sr_ctrls = NULL;
2664				return rs->sr_err;
2665			}
2666			goto shortcut;
2667		}
2668
2669		/* Do we have a sessionlog for this search? */
2670		sl=si->si_logs;
2671		if ( sl ) {
2672			int do_play = 0;
2673			ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
2674			/* Are there any log entries, and is the consumer state
2675			 * present in the session log?
2676			 */
2677			if ( sl->sl_num > 0 ) {
2678				int i;
2679				for ( i=0; i<sl->sl_numcsns; i++ ) {
2680					/* SID not present == new enough */
2681					if ( minsid < sl->sl_sids[i] ) {
2682						do_play = 1;
2683						break;
2684					}
2685					/* SID present */
2686					if ( minsid == sl->sl_sids[i] ) {
2687						/* new enough? */
2688						if ( ber_bvcmp( &mincsn, &sl->sl_mincsn[i] ) >= 0 )
2689							do_play = 1;
2690						break;
2691					}
2692				}
2693				/* SID not present == new enough */
2694				if ( i == sl->sl_numcsns )
2695					do_play = 1;
2696			}
2697			if ( do_play ) {
2698				do_present = 0;
2699				/* mutex is unlocked in playlog */
2700				syncprov_playlog( op, rs, sl, srs, ctxcsn, numcsns, sids );
2701			} else {
2702				ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
2703			}
2704		}
2705		/* Is the CSN still present in the database? */
2706		if ( syncprov_findcsn( op, FIND_CSN, &mincsn ) != LDAP_SUCCESS ) {
2707			/* No, so a reload is required */
2708			/* the 2.2 consumer doesn't send this hint */
2709			if ( si->si_usehint && srs->sr_rhint == 0 ) {
2710				if ( ctxcsn )
2711					ber_bvarray_free_x( ctxcsn, op->o_tmpmemctx );
2712				if ( sids )
2713					op->o_tmpfree( sids, op->o_tmpmemctx );
2714				rs->sr_err = LDAP_SYNC_REFRESH_REQUIRED;
2715				rs->sr_text = "sync cookie is stale";
2716				goto bailout;
2717			}
2718			if ( srs->sr_state.ctxcsn ) {
2719				ber_bvarray_free_x( srs->sr_state.ctxcsn, op->o_tmpmemctx );
2720				srs->sr_state.ctxcsn = NULL;
2721			}
2722			if ( srs->sr_state.sids ) {
2723				slap_sl_free( srs->sr_state.sids, op->o_tmpmemctx );
2724				srs->sr_state.sids = NULL;
2725			}
2726			srs->sr_state.numcsns = 0;
2727		} else {
2728			gotstate = 1;
2729			/* If changed and doing Present lookup, send Present UUIDs */
2730			if ( do_present && syncprov_findcsn( op, FIND_PRESENT, 0 ) !=
2731				LDAP_SUCCESS ) {
2732				if ( ctxcsn )
2733					ber_bvarray_free_x( ctxcsn, op->o_tmpmemctx );
2734				if ( sids )
2735					op->o_tmpfree( sids, op->o_tmpmemctx );
2736				goto bailout;
2737			}
2738		}
2739	} else {
2740		/* No consumer state, assume something has changed */
2741		changed = SS_CHANGED;
2742	}
2743
2744shortcut:
2745	/* Append CSN range to search filter, save original filter
2746	 * for persistent search evaluation
2747	 */
2748	if ( sop ) {
2749		ldap_pvt_thread_mutex_lock( &sop->s_mutex );
2750		sop->s_filterstr = op->ors_filterstr;
2751		/* correct the refcount that was set to 2 before */
2752		sop->s_inuse--;
2753	}
2754
2755	/* If something changed, find the changes */
2756	if ( gotstate && ( changed || dirty ) ) {
2757		Filter *fand, *fava;
2758
2759		fand = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
2760		fand->f_choice = LDAP_FILTER_AND;
2761		fand->f_next = NULL;
2762		fava = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
2763		fand->f_and = fava;
2764		fava->f_choice = LDAP_FILTER_GE;
2765		fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx );
2766		fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN;
2767#ifdef LDAP_COMP_MATCH
2768		fava->f_ava->aa_cf = NULL;
2769#endif
2770		ber_dupbv_x( &fava->f_ava->aa_value, &mincsn, op->o_tmpmemctx );
2771		fava->f_next = op->ors_filter;
2772		op->ors_filter = fand;
2773		filter2bv_x( op, op->ors_filter, &op->ors_filterstr );
2774		if ( sop ) {
2775			sop->s_flags |= PS_FIX_FILTER;
2776		}
2777	}
2778	if ( sop ) {
2779		ldap_pvt_thread_mutex_unlock( &sop->s_mutex );
2780	}
2781
2782	/* Let our callback add needed info to returned entries */
2783	cb = op->o_tmpcalloc(1, sizeof(slap_callback)+sizeof(searchstate), op->o_tmpmemctx);
2784	ss = (searchstate *)(cb+1);
2785	ss->ss_on = on;
2786	ss->ss_so = sop;
2787	ss->ss_flags = do_present | changed;
2788	ss->ss_ctxcsn = ctxcsn;
2789	ss->ss_numcsns = numcsns;
2790	ss->ss_sids = sids;
2791	cb->sc_response = syncprov_search_response;
2792	cb->sc_private = ss;
2793	cb->sc_next = op->o_callback;
2794	op->o_callback = cb;
2795
2796	/* If this is a persistent search and no changes were reported during
2797	 * the refresh phase, just invoke the response callback to transition
2798	 * us into persist phase
2799	 */
2800	if ( !changed && !dirty ) {
2801		rs->sr_err = LDAP_SUCCESS;
2802		rs->sr_nentries = 0;
2803		send_ldap_result( op, rs );
2804		return rs->sr_err;
2805	}
2806	return SLAP_CB_CONTINUE;
2807}
2808
2809static int
2810syncprov_operational(
2811	Operation *op,
2812	SlapReply *rs )
2813{
2814	slap_overinst		*on = (slap_overinst *)op->o_bd->bd_info;
2815	syncprov_info_t		*si = (syncprov_info_t *)on->on_bi.bi_private;
2816
2817	/* This prevents generating unnecessarily; frontend will strip
2818	 * any statically stored copy.
2819	 */
2820	if ( op->o_sync != SLAP_CONTROL_NONE )
2821		return SLAP_CB_CONTINUE;
2822
2823	if ( rs->sr_entry &&
2824		dn_match( &rs->sr_entry->e_nname, &si->si_contextdn )) {
2825
2826		if ( SLAP_OPATTRS( rs->sr_attr_flags ) ||
2827			ad_inlist( slap_schema.si_ad_contextCSN, rs->sr_attrs )) {
2828			Attribute *a, **ap = NULL;
2829
2830			for ( a=rs->sr_entry->e_attrs; a; a=a->a_next ) {
2831				if ( a->a_desc == slap_schema.si_ad_contextCSN )
2832					break;
2833			}
2834
2835			ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
2836			if ( si->si_ctxcsn ) {
2837				if ( !a ) {
2838					for ( ap = &rs->sr_operational_attrs; *ap;
2839						ap=&(*ap)->a_next );
2840
2841					a = attr_alloc( slap_schema.si_ad_contextCSN );
2842					*ap = a;
2843				}
2844
2845				if ( !ap ) {
2846					if ( rs_entry2modifiable( op, rs, on )) {
2847						a = attr_find( rs->sr_entry->e_attrs,
2848							slap_schema.si_ad_contextCSN );
2849					}
2850					if ( a->a_nvals != a->a_vals ) {
2851						ber_bvarray_free( a->a_nvals );
2852					}
2853					a->a_nvals = NULL;
2854					ber_bvarray_free( a->a_vals );
2855					a->a_vals = NULL;
2856					a->a_numvals = 0;
2857				}
2858				attr_valadd( a, si->si_ctxcsn, si->si_ctxcsn, si->si_numcsns );
2859			}
2860			ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
2861		}
2862	}
2863	return SLAP_CB_CONTINUE;
2864}
2865
/* Selector codes for the overlay's configuration keywords.  They are
 * OR'ed with the ARG_* flags in spcfg[] and switched on through
 * c->type in sp_cf_gen.
 */
enum {
	SP_CHKPT	= 1,	/* syncprov-checkpoint */
	SP_SESSL	= 2,	/* syncprov-sessionlog */
	SP_NOPRES	= 3,	/* syncprov-nopresent */
	SP_USEHINT	= 4	/* syncprov-reloadhint */
};
2872
2873static ConfigDriver sp_cf_gen;
2874
2875static ConfigTable spcfg[] = {
2876	{ "syncprov-checkpoint", "ops> <minutes", 3, 3, 0, ARG_MAGIC|SP_CHKPT,
2877		sp_cf_gen, "( OLcfgOvAt:1.1 NAME 'olcSpCheckpoint' "
2878			"DESC 'ContextCSN checkpoint interval in ops and minutes' "
2879			"SYNTAX OMsDirectoryString SINGLE-VALUE )", NULL, NULL },
2880	{ "syncprov-sessionlog", "ops", 2, 2, 0, ARG_INT|ARG_MAGIC|SP_SESSL,
2881		sp_cf_gen, "( OLcfgOvAt:1.2 NAME 'olcSpSessionlog' "
2882			"DESC 'Session log size in ops' "
2883			"SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL },
2884	{ "syncprov-nopresent", NULL, 2, 2, 0, ARG_ON_OFF|ARG_MAGIC|SP_NOPRES,
2885		sp_cf_gen, "( OLcfgOvAt:1.3 NAME 'olcSpNoPresent' "
2886			"DESC 'Omit Present phase processing' "
2887			"SYNTAX OMsBoolean SINGLE-VALUE )", NULL, NULL },
2888	{ "syncprov-reloadhint", NULL, 2, 2, 0, ARG_ON_OFF|ARG_MAGIC|SP_USEHINT,
2889		sp_cf_gen, "( OLcfgOvAt:1.4 NAME 'olcSpReloadHint' "
2890			"DESC 'Observe Reload Hint in Request control' "
2891			"SYNTAX OMsBoolean SINGLE-VALUE )", NULL, NULL },
2892	{ NULL, NULL, 0, 0, 0, ARG_IGNORED }
2893};
2894
2895static ConfigOCs spocs[] = {
2896	{ "( OLcfgOvOc:1.1 "
2897		"NAME 'olcSyncProvConfig' "
2898		"DESC 'SyncRepl Provider configuration' "
2899		"SUP olcOverlayConfig "
2900		"MAY ( olcSpCheckpoint "
2901			"$ olcSpSessionlog "
2902			"$ olcSpNoPresent "
2903			"$ olcSpReloadHint "
2904		") )",
2905			Cft_Overlay, spcfg },
2906	{ NULL, 0, NULL }
2907};
2908
2909static int
2910sp_cf_gen(ConfigArgs *c)
2911{
2912	slap_overinst		*on = (slap_overinst *)c->bi;
2913	syncprov_info_t		*si = (syncprov_info_t *)on->on_bi.bi_private;
2914	int rc = 0;
2915
2916	if ( c->op == SLAP_CONFIG_EMIT ) {
2917		switch ( c->type ) {
2918		case SP_CHKPT:
2919			if ( si->si_chkops || si->si_chktime ) {
2920				struct berval bv;
2921				/* we assume si_chktime is a multiple of 60
2922				 * because the parsed value was originally
2923				 * multiplied by 60 */
2924				bv.bv_len = snprintf( c->cr_msg, sizeof( c->cr_msg ),
2925					"%d %d", si->si_chkops, si->si_chktime/60 );
2926				if ( bv.bv_len >= sizeof( c->cr_msg ) ) {
2927					rc = 1;
2928				} else {
2929					bv.bv_val = c->cr_msg;
2930					value_add_one( &c->rvalue_vals, &bv );
2931				}
2932			} else {
2933				rc = 1;
2934			}
2935			break;
2936		case SP_SESSL:
2937			if ( si->si_logs ) {
2938				c->value_int = si->si_logs->sl_size;
2939			} else {
2940				rc = 1;
2941			}
2942			break;
2943		case SP_NOPRES:
2944			if ( si->si_nopres ) {
2945				c->value_int = 1;
2946			} else {
2947				rc = 1;
2948			}
2949			break;
2950		case SP_USEHINT:
2951			if ( si->si_usehint ) {
2952				c->value_int = 1;
2953			} else {
2954				rc = 1;
2955			}
2956			break;
2957		}
2958		return rc;
2959	} else if ( c->op == LDAP_MOD_DELETE ) {
2960		switch ( c->type ) {
2961		case SP_CHKPT:
2962			si->si_chkops = 0;
2963			si->si_chktime = 0;
2964			break;
2965		case SP_SESSL:
2966			if ( si->si_logs )
2967				si->si_logs->sl_size = 0;
2968			else
2969				rc = LDAP_NO_SUCH_ATTRIBUTE;
2970			break;
2971		case SP_NOPRES:
2972			if ( si->si_nopres )
2973				si->si_nopres = 0;
2974			else
2975				rc = LDAP_NO_SUCH_ATTRIBUTE;
2976			break;
2977		case SP_USEHINT:
2978			if ( si->si_usehint )
2979				si->si_usehint = 0;
2980			else
2981				rc = LDAP_NO_SUCH_ATTRIBUTE;
2982			break;
2983		}
2984		return rc;
2985	}
2986	switch ( c->type ) {
2987	case SP_CHKPT:
2988		if ( lutil_atoi( &si->si_chkops, c->argv[1] ) != 0 ) {
2989			snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s unable to parse checkpoint ops # \"%s\"",
2990				c->argv[0], c->argv[1] );
2991			Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
2992				"%s: %s\n", c->log, c->cr_msg, 0 );
2993			return ARG_BAD_CONF;
2994		}
2995		if ( si->si_chkops <= 0 ) {
2996			snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s invalid checkpoint ops # \"%d\"",
2997				c->argv[0], si->si_chkops );
2998			Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
2999				"%s: %s\n", c->log, c->cr_msg, 0 );
3000			return ARG_BAD_CONF;
3001		}
3002		if ( lutil_atoi( &si->si_chktime, c->argv[2] ) != 0 ) {
3003			snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s unable to parse checkpoint time \"%s\"",
3004				c->argv[0], c->argv[1] );
3005			Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
3006				"%s: %s\n", c->log, c->cr_msg, 0 );
3007			return ARG_BAD_CONF;
3008		}
3009		if ( si->si_chktime <= 0 ) {
3010			snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s invalid checkpoint time \"%d\"",
3011				c->argv[0], si->si_chkops );
3012			Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
3013				"%s: %s\n", c->log, c->cr_msg, 0 );
3014			return ARG_BAD_CONF;
3015		}
3016		si->si_chktime *= 60;
3017		break;
3018	case SP_SESSL: {
3019		sessionlog *sl;
3020		int size = c->value_int;
3021
3022		if ( size < 0 ) {
3023			snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s size %d is negative",
3024				c->argv[0], size );
3025			Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
3026				"%s: %s\n", c->log, c->cr_msg, 0 );
3027			return ARG_BAD_CONF;
3028		}
3029		sl = si->si_logs;
3030		if ( !sl ) {
3031			sl = ch_malloc( sizeof( sessionlog ));
3032			sl->sl_mincsn = NULL;
3033			sl->sl_sids = NULL;
3034			sl->sl_num = 0;
3035			sl->sl_numcsns = 0;
3036			sl->sl_head = sl->sl_tail = NULL;
3037			ldap_pvt_thread_mutex_init( &sl->sl_mutex );
3038			si->si_logs = sl;
3039		}
3040		sl->sl_size = size;
3041		}
3042		break;
3043	case SP_NOPRES:
3044		si->si_nopres = c->value_int;
3045		break;
3046	case SP_USEHINT:
3047		si->si_usehint = c->value_int;
3048		break;
3049	}
3050	return rc;
3051}
3052
3053/* ITS#3456 we cannot run this search on the main thread, must use a
3054 * child thread in order to insure we have a big enough stack.
3055 */
3056static void *
3057syncprov_db_otask(
3058	void *ptr
3059)
3060{
3061	syncprov_findcsn( ptr, FIND_MAXCSN, 0 );
3062	return NULL;
3063}
3064
3065
3066/* Read any existing contextCSN from the underlying db.
3067 * Then search for any entries newer than that. If no value exists,
3068 * just generate it. Cache whatever result.
3069 */
3070static int
3071syncprov_db_open(
3072	BackendDB *be,
3073	ConfigReply *cr
3074)
3075{
3076	slap_overinst   *on = (slap_overinst *) be->bd_info;
3077	syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
3078
3079	Connection conn = { 0 };
3080	OperationBuffer opbuf;
3081	Operation *op;
3082	Entry *e = NULL;
3083	Attribute *a;
3084	int rc;
3085	void *thrctx = NULL;
3086
3087	if ( !SLAP_LASTMOD( be )) {
3088		Debug( LDAP_DEBUG_ANY,
3089			"syncprov_db_open: invalid config, lastmod must be enabled\n", 0, 0, 0 );
3090		return -1;
3091	}
3092
3093	if ( slapMode & SLAP_TOOL_MODE ) {
3094		return 0;
3095	}
3096
3097	rc = overlay_register_control( be, LDAP_CONTROL_SYNC );
3098	if ( rc ) {
3099		return rc;
3100	}
3101
3102	thrctx = ldap_pvt_thread_pool_context();
3103	connection_fake_init2( &conn, &opbuf, thrctx, 0 );
3104	op = &opbuf.ob_op;
3105	op->o_bd = be;
3106	op->o_dn = be->be_rootdn;
3107	op->o_ndn = be->be_rootndn;
3108
3109	if ( SLAP_SYNC_SUBENTRY( be )) {
3110		build_new_dn( &si->si_contextdn, be->be_nsuffix,
3111			(struct berval *)&slap_ldapsync_cn_bv, NULL );
3112	} else {
3113		si->si_contextdn = be->be_nsuffix[0];
3114	}
3115	rc = overlay_entry_get_ov( op, &si->si_contextdn, NULL,
3116		slap_schema.si_ad_contextCSN, 0, &e, on );
3117
3118	if ( e ) {
3119		ldap_pvt_thread_t tid;
3120
3121		a = attr_find( e->e_attrs, slap_schema.si_ad_contextCSN );
3122		if ( a ) {
3123			ber_bvarray_dup_x( &si->si_ctxcsn, a->a_vals, NULL );
3124			si->si_numcsns = a->a_numvals;
3125			si->si_sids = slap_parse_csn_sids( si->si_ctxcsn, a->a_numvals, NULL );
3126			slap_sort_csn_sids( si->si_ctxcsn, si->si_sids, si->si_numcsns, NULL );
3127		}
3128		overlay_entry_release_ov( op, e, 0, on );
3129		if ( si->si_ctxcsn && !SLAP_DBCLEAN( be )) {
3130			op->o_tag = LDAP_REQ_SEARCH;
3131			op->o_req_dn = be->be_suffix[0];
3132			op->o_req_ndn = be->be_nsuffix[0];
3133			op->ors_scope = LDAP_SCOPE_SUBTREE;
3134			ldap_pvt_thread_create( &tid, 0, syncprov_db_otask, op );
3135			ldap_pvt_thread_join( tid, NULL );
3136		}
3137	}
3138
3139	/* Didn't find a contextCSN, should we generate one? */
3140	if ( !si->si_ctxcsn ) {
3141		char csnbuf[ LDAP_PVT_CSNSTR_BUFSIZE ];
3142		struct berval csn;
3143
3144		if ( SLAP_SYNC_SHADOW( op->o_bd )) {
3145		/* If we're also a consumer, then don't generate anything.
3146		 * Wait for our provider to send it to us, or for a local
3147		 * modify if we have multimaster.
3148		 */
3149			goto out;
3150		}
3151		csn.bv_val = csnbuf;
3152		csn.bv_len = sizeof( csnbuf );
3153		slap_get_csn( op, &csn, 0 );
3154		value_add_one( &si->si_ctxcsn, &csn );
3155		si->si_numcsns = 1;
3156		si->si_sids = ch_malloc( sizeof(int) );
3157		si->si_sids[0] = slap_serverID;
3158
3159		/* make sure we do a checkpoint on close */
3160		si->si_numops++;
3161	}
3162
3163	/* Initialize the sessionlog mincsn */
3164	if ( si->si_logs && si->si_numcsns ) {
3165		sessionlog *sl = si->si_logs;
3166		int i;
3167		ber_bvarray_dup_x( &sl->sl_mincsn, si->si_ctxcsn, NULL );
3168		sl->sl_numcsns = si->si_numcsns;
3169		sl->sl_sids = ch_malloc( si->si_numcsns * sizeof(int) );
3170		for ( i=0; i < si->si_numcsns; i++ )
3171			sl->sl_sids[i] = si->si_sids[i];
3172	}
3173
3174out:
3175	op->o_bd->bd_info = (BackendInfo *)on;
3176	return 0;
3177}
3178
3179/* Write the current contextCSN into the underlying db.
3180 */
3181static int
3182syncprov_db_close(
3183	BackendDB *be,
3184	ConfigReply *cr
3185)
3186{
3187    slap_overinst   *on = (slap_overinst *) be->bd_info;
3188    syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
3189#ifdef SLAP_CONFIG_DELETE
3190	syncops *so, *sonext;
3191#endif /* SLAP_CONFIG_DELETE */
3192
3193	if ( slapMode & SLAP_TOOL_MODE ) {
3194		return 0;
3195	}
3196	if ( si->si_numops ) {
3197		Connection conn = {0};
3198		OperationBuffer opbuf;
3199		Operation *op;
3200		void *thrctx;
3201
3202		thrctx = ldap_pvt_thread_pool_context();
3203		connection_fake_init2( &conn, &opbuf, thrctx, 0 );
3204		op = &opbuf.ob_op;
3205		op->o_bd = be;
3206		op->o_dn = be->be_rootdn;
3207		op->o_ndn = be->be_rootndn;
3208		syncprov_checkpoint( op, on );
3209	}
3210
3211#ifdef SLAP_CONFIG_DELETE
3212	if ( !slapd_shutdown ) {
3213		for ( so=si->si_ops, sonext=so;  so; so=sonext  ) {
3214			SlapReply rs = {REP_RESULT};
3215			rs.sr_err = LDAP_UNAVAILABLE;
3216			send_ldap_result( so->s_op, &rs );
3217			sonext=so->s_next;
3218			syncprov_drop_psearch( so, 0);
3219		}
3220		si->si_ops=NULL;
3221	}
3222	overlay_unregister_control( be, LDAP_CONTROL_SYNC );
3223#endif /* SLAP_CONFIG_DELETE */
3224
3225    return 0;
3226}
3227
3228static int
3229syncprov_db_init(
3230	BackendDB *be,
3231	ConfigReply *cr
3232)
3233{
3234	slap_overinst	*on = (slap_overinst *)be->bd_info;
3235	syncprov_info_t	*si;
3236
3237	if ( SLAP_ISGLOBALOVERLAY( be ) ) {
3238		Debug( LDAP_DEBUG_ANY,
3239			"syncprov must be instantiated within a database.\n",
3240			0, 0, 0 );
3241		return 1;
3242	}
3243
3244	si = ch_calloc(1, sizeof(syncprov_info_t));
3245	on->on_bi.bi_private = si;
3246	ldap_pvt_thread_rdwr_init( &si->si_csn_rwlock );
3247	ldap_pvt_thread_mutex_init( &si->si_ops_mutex );
3248	ldap_pvt_thread_mutex_init( &si->si_mods_mutex );
3249	ldap_pvt_thread_mutex_init( &si->si_resp_mutex );
3250
3251	csn_anlist[0].an_desc = slap_schema.si_ad_entryCSN;
3252	csn_anlist[0].an_name = slap_schema.si_ad_entryCSN->ad_cname;
3253	csn_anlist[1].an_desc = slap_schema.si_ad_entryUUID;
3254	csn_anlist[1].an_name = slap_schema.si_ad_entryUUID->ad_cname;
3255
3256	uuid_anlist[0].an_desc = slap_schema.si_ad_entryUUID;
3257	uuid_anlist[0].an_name = slap_schema.si_ad_entryUUID->ad_cname;
3258
3259	return 0;
3260}
3261
3262static int
3263syncprov_db_destroy(
3264	BackendDB *be,
3265	ConfigReply *cr
3266)
3267{
3268	slap_overinst	*on = (slap_overinst *)be->bd_info;
3269	syncprov_info_t	*si = (syncprov_info_t *)on->on_bi.bi_private;
3270
3271	if ( si ) {
3272		if ( si->si_logs ) {
3273			sessionlog *sl = si->si_logs;
3274			slog_entry *se = sl->sl_head;
3275
3276			while ( se ) {
3277				slog_entry *se_next = se->se_next;
3278				ch_free( se );
3279				se = se_next;
3280			}
3281			if ( sl->sl_mincsn )
3282				ber_bvarray_free( sl->sl_mincsn );
3283			if ( sl->sl_sids )
3284				ch_free( sl->sl_sids );
3285
3286			ldap_pvt_thread_mutex_destroy(&si->si_logs->sl_mutex);
3287			ch_free( si->si_logs );
3288		}
3289		if ( si->si_ctxcsn )
3290			ber_bvarray_free( si->si_ctxcsn );
3291		if ( si->si_sids )
3292			ch_free( si->si_sids );
3293		ldap_pvt_thread_mutex_destroy( &si->si_resp_mutex );
3294		ldap_pvt_thread_mutex_destroy( &si->si_mods_mutex );
3295		ldap_pvt_thread_mutex_destroy( &si->si_ops_mutex );
3296		ldap_pvt_thread_rdwr_destroy( &si->si_csn_rwlock );
3297		ch_free( si );
3298	}
3299
3300	return 0;
3301}
3302
3303static int syncprov_parseCtrl (
3304	Operation *op,
3305	SlapReply *rs,
3306	LDAPControl *ctrl )
3307{
3308	ber_tag_t tag;
3309	BerElementBuffer berbuf;
3310	BerElement *ber = (BerElement *)&berbuf;
3311	ber_int_t mode;
3312	ber_len_t len;
3313	struct berval cookie = BER_BVNULL;
3314	sync_control *sr;
3315	int rhint = 0;
3316
3317	if ( op->o_sync != SLAP_CONTROL_NONE ) {
3318		rs->sr_text = "Sync control specified multiple times";
3319		return LDAP_PROTOCOL_ERROR;
3320	}
3321
3322	if ( op->o_pagedresults != SLAP_CONTROL_NONE ) {
3323		rs->sr_text = "Sync control specified with pagedResults control";
3324		return LDAP_PROTOCOL_ERROR;
3325	}
3326
3327	if ( BER_BVISNULL( &ctrl->ldctl_value ) ) {
3328		rs->sr_text = "Sync control value is absent";
3329		return LDAP_PROTOCOL_ERROR;
3330	}
3331
3332	if ( BER_BVISEMPTY( &ctrl->ldctl_value ) ) {
3333		rs->sr_text = "Sync control value is empty";
3334		return LDAP_PROTOCOL_ERROR;
3335	}
3336
3337	/* Parse the control value
3338	 *      syncRequestValue ::= SEQUENCE {
3339	 *              mode   ENUMERATED {
3340	 *                      -- 0 unused
3341	 *                      refreshOnly		(1),
3342	 *                      -- 2 reserved
3343	 *                      refreshAndPersist	(3)
3344	 *              },
3345	 *              cookie  syncCookie OPTIONAL
3346	 *      }
3347	 */
3348
3349	ber_init2( ber, &ctrl->ldctl_value, 0 );
3350
3351	if ( (tag = ber_scanf( ber, "{i" /*}*/, &mode )) == LBER_ERROR ) {
3352		rs->sr_text = "Sync control : mode decoding error";
3353		return LDAP_PROTOCOL_ERROR;
3354	}
3355
3356	switch( mode ) {
3357	case LDAP_SYNC_REFRESH_ONLY:
3358		mode = SLAP_SYNC_REFRESH;
3359		break;
3360	case LDAP_SYNC_REFRESH_AND_PERSIST:
3361		mode = SLAP_SYNC_REFRESH_AND_PERSIST;
3362		break;
3363	default:
3364		rs->sr_text = "Sync control : unknown update mode";
3365		return LDAP_PROTOCOL_ERROR;
3366	}
3367
3368	tag = ber_peek_tag( ber, &len );
3369
3370	if ( tag == LDAP_TAG_SYNC_COOKIE ) {
3371		if (( ber_scanf( ber, /*{*/ "m", &cookie )) == LBER_ERROR ) {
3372			rs->sr_text = "Sync control : cookie decoding error";
3373			return LDAP_PROTOCOL_ERROR;
3374		}
3375		tag = ber_peek_tag( ber, &len );
3376	}
3377	if ( tag == LDAP_TAG_RELOAD_HINT ) {
3378		if (( ber_scanf( ber, /*{*/ "b", &rhint )) == LBER_ERROR ) {
3379			rs->sr_text = "Sync control : rhint decoding error";
3380			return LDAP_PROTOCOL_ERROR;
3381		}
3382	}
3383	if (( ber_scanf( ber, /*{*/ "}")) == LBER_ERROR ) {
3384			rs->sr_text = "Sync control : decoding error";
3385			return LDAP_PROTOCOL_ERROR;
3386	}
3387	sr = op->o_tmpcalloc( 1, sizeof(struct sync_control), op->o_tmpmemctx );
3388	sr->sr_rhint = rhint;
3389	if (!BER_BVISNULL(&cookie)) {
3390		ber_dupbv_x( &sr->sr_state.octet_str, &cookie, op->o_tmpmemctx );
3391		/* If parse fails, pretend no cookie was sent */
3392		if ( slap_parse_sync_cookie( &sr->sr_state, op->o_tmpmemctx ) ||
3393			sr->sr_state.rid == -1 ) {
3394			if ( sr->sr_state.ctxcsn ) {
3395				ber_bvarray_free_x( sr->sr_state.ctxcsn, op->o_tmpmemctx );
3396				sr->sr_state.ctxcsn = NULL;
3397			}
3398			sr->sr_state.numcsns = 0;
3399		}
3400	}
3401
3402	op->o_controls[slap_cids.sc_LDAPsync] = sr;
3403
3404	op->o_sync = ctrl->ldctl_iscritical
3405		? SLAP_CONTROL_CRITICAL
3406		: SLAP_CONTROL_NONCRITICAL;
3407
3408	op->o_sync_mode |= mode;	/* o_sync_mode shares o_sync */
3409
3410	return LDAP_SUCCESS;
3411}
3412
3413/* This overlay is set up for dynamic loading via moduleload. For static
3414 * configuration, you'll need to arrange for the slap_overinst to be
3415 * initialized and registered by some other function inside slapd.
3416 */
3417
3418static slap_overinst 		syncprov;
3419
3420int
3421syncprov_initialize()
3422{
3423	int rc;
3424
3425	rc = register_supported_control( LDAP_CONTROL_SYNC,
3426		SLAP_CTRL_SEARCH, NULL,
3427		syncprov_parseCtrl, &slap_cids.sc_LDAPsync );
3428	if ( rc != LDAP_SUCCESS ) {
3429		Debug( LDAP_DEBUG_ANY,
3430			"syncprov_init: Failed to register control %d\n", rc, 0, 0 );
3431		return rc;
3432	}
3433
3434	syncprov.on_bi.bi_type = "syncprov";
3435	syncprov.on_bi.bi_db_init = syncprov_db_init;
3436	syncprov.on_bi.bi_db_destroy = syncprov_db_destroy;
3437	syncprov.on_bi.bi_db_open = syncprov_db_open;
3438	syncprov.on_bi.bi_db_close = syncprov_db_close;
3439
3440	syncprov.on_bi.bi_op_abandon = syncprov_op_abandon;
3441	syncprov.on_bi.bi_op_cancel = syncprov_op_abandon;
3442
3443	syncprov.on_bi.bi_op_add = syncprov_op_mod;
3444	syncprov.on_bi.bi_op_compare = syncprov_op_compare;
3445	syncprov.on_bi.bi_op_delete = syncprov_op_mod;
3446	syncprov.on_bi.bi_op_modify = syncprov_op_mod;
3447	syncprov.on_bi.bi_op_modrdn = syncprov_op_mod;
3448	syncprov.on_bi.bi_op_search = syncprov_op_search;
3449	syncprov.on_bi.bi_extended = syncprov_op_extended;
3450	syncprov.on_bi.bi_operational = syncprov_operational;
3451
3452	syncprov.on_bi.bi_cf_ocs = spocs;
3453
3454	generic_filter.f_desc = slap_schema.si_ad_objectClass;
3455
3456	rc = config_register_schema( spcfg, spocs );
3457	if ( rc ) return rc;
3458
3459	return overlay_register( &syncprov );
3460}
3461
3462#if SLAPD_OVER_SYNCPROV == SLAPD_MOD_DYNAMIC
/* Module entry point, compiled only when the overlay is built as a
 * dynamic module. argc and argv are not consulted; all setup is
 * delegated to syncprov_initialize(), whose result is returned.
 */
int
init_module( int argc, char *argv[] )
{
	int rc = syncprov_initialize();

	return rc;
}
3468#endif /* SLAPD_OVER_SYNCPROV == SLAPD_MOD_DYNAMIC */
3469
3470#endif /* defined(SLAPD_OVER_SYNCPROV) */
3471