syncprov.c revision 1.1.1.3
1/* $NetBSD: syncprov.c,v 1.1.1.3 2010/03/08 02:14:20 lukem Exp $ */ 2 3/* OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.147.2.70 2009/11/24 00:53:26 quanah Exp */ 4/* syncprov.c - syncrepl provider */ 5/* This work is part of OpenLDAP Software <http://www.openldap.org/>. 6 * 7 * Copyright 2004-2009 The OpenLDAP Foundation. 8 * All rights reserved. 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted only as authorized by the OpenLDAP 12 * Public License. 13 * 14 * A copy of this license is available in the file LICENSE in the 15 * top-level directory of the distribution or, alternatively, at 16 * <http://www.OpenLDAP.org/license.html>. 17 */ 18/* ACKNOWLEDGEMENTS: 19 * This work was initially developed by Howard Chu for inclusion in 20 * OpenLDAP Software. 21 */ 22 23#include "portable.h" 24 25#ifdef SLAPD_OVER_SYNCPROV 26 27#include <ac/string.h> 28#include "lutil.h" 29#include "slap.h" 30#include "config.h" 31#include "ldap_rq.h" 32 33#ifdef LDAP_DEVEL 34#define CHECK_CSN 1 35#endif 36 37/* A modify request on a particular entry */ 38typedef struct modinst { 39 struct modinst *mi_next; 40 Operation *mi_op; 41} modinst; 42 43typedef struct modtarget { 44 struct modinst *mt_mods; 45 struct modinst *mt_tail; 46 Operation *mt_op; 47 ldap_pvt_thread_mutex_t mt_mutex; 48} modtarget; 49 50/* A queued result of a persistent search */ 51typedef struct syncres { 52 struct syncres *s_next; 53 Entry *s_e; 54 struct berval s_dn; 55 struct berval s_ndn; 56 struct berval s_uuid; 57 struct berval s_csn; 58 char s_mode; 59 char s_isreference; 60} syncres; 61 62/* Record of a persistent search */ 63typedef struct syncops { 64 struct syncops *s_next; 65 struct berval s_base; /* ndn of search base */ 66 ID s_eid; /* entryID of search base */ 67 Operation *s_op; /* search op */ 68 int s_rid; 69 int s_sid; 70 struct berval s_filterstr; 71 int s_flags; /* search status */ 72#define PS_IS_REFRESHING 0x01 73#define PS_IS_DETACHED 0x02 74#define PS_WROTE_BASE 0x04 75#define PS_FIND_BASE 0x08 76#define PS_FIX_FILTER 0x10 77#define PS_TASK_QUEUED 0x20 78 79 int s_inuse; /* reference count */ 80 struct syncres *s_res; 81 struct syncres *s_restail; 82 ldap_pvt_thread_mutex_t s_mutex; 83} syncops; 84 85/* A received sync control */ 86typedef struct sync_control { 87 struct sync_cookie sr_state; 88 int sr_rhint; 89} sync_control; 90 91#if 0 /* moved back to slap.h */ 92#define o_sync o_ctrlflag[slap_cids.sc_LDAPsync] 93#endif 94/* o_sync_mode uses data bits of o_sync */ 95#define o_sync_mode o_ctrlflag[slap_cids.sc_LDAPsync] 96 97#define SLAP_SYNC_NONE (LDAP_SYNC_NONE<<SLAP_CONTROL_SHIFT) 98#define SLAP_SYNC_REFRESH (LDAP_SYNC_REFRESH_ONLY<<SLAP_CONTROL_SHIFT) 99#define SLAP_SYNC_PERSIST (LDAP_SYNC_RESERVED<<SLAP_CONTROL_SHIFT) 100#define SLAP_SYNC_REFRESH_AND_PERSIST (LDAP_SYNC_REFRESH_AND_PERSIST<<SLAP_CONTROL_SHIFT) 101 102/* Record of which searches matched at premodify step */ 103typedef struct syncmatches { 104 struct syncmatches *sm_next; 105 syncops *sm_op; 106} syncmatches; 107 108/* Session log data */ 109typedef struct slog_entry { 110 struct slog_entry *se_next; 111 struct berval se_uuid; 112 struct berval se_csn; 113 int se_sid; 114 ber_tag_t se_tag; 115} slog_entry; 116 117typedef struct sessionlog { 118 struct berval sl_mincsn; 119 int sl_num; 120 int sl_size; 121 slog_entry *sl_head; 122 slog_entry *sl_tail; 123 ldap_pvt_thread_mutex_t sl_mutex; 124} sessionlog; 125 126/* The main state for this overlay */ 127typedef struct syncprov_info_t { 128 syncops *si_ops; 129 BerVarray si_ctxcsn; /* ldapsync context */ 130 struct berval si_contextdn; 131 int *si_sids; 132 int si_numcsns; 133 int si_chkops; /* checkpointing info */ 134 int si_chktime; 135 int si_numops; /* number of ops since last checkpoint */ 136 int si_nopres; /* Skip present phase */ 137 int si_usehint; /* use reload hint */ 138 time_t si_chklast; /* time of last checkpoint */ 139 Avlnode *si_mods; /* entries being modified */ 140 sessionlog *si_logs; 141 ldap_pvt_thread_rdwr_t si_csn_rwlock; 142 ldap_pvt_thread_mutex_t si_ops_mutex; 143 ldap_pvt_thread_mutex_t si_mods_mutex; 144 ldap_pvt_thread_mutex_t si_resp_mutex; 145} syncprov_info_t; 146 147typedef struct opcookie { 148 slap_overinst *son; 149 syncmatches *smatches; 150 modtarget *smt; 151 Entry *se; 152 struct berval sdn; /* DN of entry, for deletes */ 153 struct berval sndn; 154 struct berval suuid; /* UUID of entry */ 155 struct berval sctxcsn; 156 short osid; /* sid of op csn */ 157 short rsid; /* sid of relay */ 158 short sreference; /* Is the entry a reference? */ 159} opcookie; 160 161typedef struct mutexint { 162 ldap_pvt_thread_mutex_t mi_mutex; 163 int mi_int; 164} mutexint; 165 166typedef struct fbase_cookie { 167 struct berval *fdn; /* DN of a modified entry, for scope testing */ 168 syncops *fss; /* persistent search we're testing against */ 169 int fbase; /* if TRUE we found the search base and it's still valid */ 170 int fscope; /* if TRUE then fdn is within the psearch scope */ 171} fbase_cookie; 172 173static AttributeName csn_anlist[3]; 174static AttributeName uuid_anlist[2]; 175 176/* Build a LDAPsync intermediate state control */ 177static int 178syncprov_state_ctrl( 179 Operation *op, 180 SlapReply *rs, 181 Entry *e, 182 int entry_sync_state, 183 LDAPControl **ctrls, 184 int num_ctrls, 185 int send_cookie, 186 struct berval *cookie ) 187{ 188 Attribute* a; 189 int ret; 190 191 BerElementBuffer berbuf; 192 BerElement *ber = (BerElement *)&berbuf; 193 194 struct berval entryuuid_bv = BER_BVNULL; 195 196 ber_init2( ber, 0, LBER_USE_DER ); 197 ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx ); 198 199 ctrls[num_ctrls] = op->o_tmpalloc( sizeof ( LDAPControl ), op->o_tmpmemctx ); 200 201 for ( a = e->e_attrs; a != NULL; a = a->a_next ) { 202 AttributeDescription *desc = a->a_desc; 203 if ( desc == slap_schema.si_ad_entryUUID ) { 204 entryuuid_bv = a->a_nvals[0]; 205 break; 206 } 207 } 208 209 /* FIXME: what if entryuuid is NULL or empty ? */ 210 211 if ( send_cookie && cookie ) { 212 ber_printf( ber, "{eOON}", 213 entry_sync_state, &entryuuid_bv, cookie ); 214 } else { 215 ber_printf( ber, "{eON}", 216 entry_sync_state, &entryuuid_bv ); 217 } 218 219 ctrls[num_ctrls]->ldctl_oid = LDAP_CONTROL_SYNC_STATE; 220 ctrls[num_ctrls]->ldctl_iscritical = (op->o_sync == SLAP_CONTROL_CRITICAL); 221 ret = ber_flatten2( ber, &ctrls[num_ctrls]->ldctl_value, 1 ); 222 223 ber_free_buf( ber ); 224 225 if ( ret < 0 ) { 226 Debug( LDAP_DEBUG_TRACE, 227 "slap_build_sync_ctrl: ber_flatten2 failed\n", 228 0, 0, 0 ); 229 send_ldap_error( op, rs, LDAP_OTHER, "internal error" ); 230 return ret; 231 } 232 233 return LDAP_SUCCESS; 234} 235 236/* Build a LDAPsync final state control */ 237static int 238syncprov_done_ctrl( 239 Operation *op, 240 SlapReply *rs, 241 LDAPControl **ctrls, 242 int num_ctrls, 243 int send_cookie, 244 struct berval *cookie, 245 int refreshDeletes ) 246{ 247 int ret; 248 BerElementBuffer berbuf; 249 BerElement *ber = (BerElement *)&berbuf; 250 251 ber_init2( ber, NULL, LBER_USE_DER ); 252 ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx ); 253 254 ctrls[num_ctrls] = op->o_tmpalloc( sizeof ( LDAPControl ), op->o_tmpmemctx ); 255 256 ber_printf( ber, "{" ); 257 if ( send_cookie && cookie ) { 258 ber_printf( ber, "O", cookie ); 259 } 260 if ( refreshDeletes == LDAP_SYNC_REFRESH_DELETES ) { 261 ber_printf( ber, "b", refreshDeletes ); 262 } 263 ber_printf( ber, "N}" ); 264 265 ctrls[num_ctrls]->ldctl_oid = LDAP_CONTROL_SYNC_DONE; 266 ctrls[num_ctrls]->ldctl_iscritical = (op->o_sync == SLAP_CONTROL_CRITICAL); 267 ret = ber_flatten2( ber, &ctrls[num_ctrls]->ldctl_value, 1 ); 268 269 ber_free_buf( ber ); 270 271 if ( ret < 0 ) { 272 Debug( LDAP_DEBUG_TRACE, 273 "syncprov_done_ctrl: ber_flatten2 failed\n", 274 0, 0, 0 ); 275 send_ldap_error( op, rs, LDAP_OTHER, "internal error" ); 276 return ret; 277 } 278 279 return LDAP_SUCCESS; 280} 281 282static int 283syncprov_sendinfo( 284 Operation *op, 285 SlapReply *rs, 286 int type, 287 struct berval *cookie, 288 int refreshDone, 289 BerVarray syncUUIDs, 290 int refreshDeletes ) 291{ 292 BerElementBuffer berbuf; 293 BerElement *ber = (BerElement *)&berbuf; 294 struct berval rspdata; 295 296 int ret; 297 298 ber_init2( ber, NULL, LBER_USE_DER ); 299 ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx ); 300 301 if ( type ) { 302 switch ( type ) { 303 case LDAP_TAG_SYNC_NEW_COOKIE: 304 ber_printf( ber, "tO", type, cookie ); 305 break; 306 case LDAP_TAG_SYNC_REFRESH_DELETE: 307 case LDAP_TAG_SYNC_REFRESH_PRESENT: 308 ber_printf( ber, "t{", type ); 309 if ( cookie ) { 310 ber_printf( ber, "O", cookie ); 311 } 312 if ( refreshDone == 0 ) { 313 ber_printf( ber, "b", refreshDone ); 314 } 315 ber_printf( ber, "N}" ); 316 break; 317 case LDAP_TAG_SYNC_ID_SET: 318 ber_printf( ber, "t{", type ); 319 if ( cookie ) { 320 ber_printf( ber, "O", cookie ); 321 } 322 if ( refreshDeletes == 1 ) { 323 ber_printf( ber, "b", refreshDeletes ); 324 } 325 ber_printf( ber, "[W]", syncUUIDs ); 326 ber_printf( ber, "N}" ); 327 break; 328 default: 329 Debug( LDAP_DEBUG_TRACE, 330 "syncprov_sendinfo: invalid syncinfo type (%d)\n", 331 type, 0, 0 ); 332 return LDAP_OTHER; 333 } 334 } 335 336 ret = ber_flatten2( ber, &rspdata, 0 ); 337 338 if ( ret < 0 ) { 339 Debug( LDAP_DEBUG_TRACE, 340 "syncprov_sendinfo: ber_flatten2 failed\n", 341 0, 0, 0 ); 342 send_ldap_error( op, rs, LDAP_OTHER, "internal error" ); 343 return ret; 344 } 345 346 rs->sr_rspoid = LDAP_SYNC_INFO; 347 rs->sr_rspdata = &rspdata; 348 send_ldap_intermediate( op, rs ); 349 rs->sr_rspdata = NULL; 350 ber_free_buf( ber ); 351 352 return LDAP_SUCCESS; 353} 354 355/* Find a modtarget in an AVL tree */ 356static int 357sp_avl_cmp( const void *c1, const void *c2 ) 358{ 359 const modtarget *m1, *m2; 360 int rc; 361 362 m1 = c1; m2 = c2; 363 rc = m1->mt_op->o_req_ndn.bv_len - m2->mt_op->o_req_ndn.bv_len; 364 365 if ( rc ) return rc; 366 return ber_bvcmp( &m1->mt_op->o_req_ndn, &m2->mt_op->o_req_ndn ); 367} 368 369/* syncprov_findbase: 370 * finds the true DN of the base of a search (with alias dereferencing) and 371 * checks to make sure the base entry doesn't get replaced with a different 372 * entry (e.g., swapping trees via ModDN, or retargeting an alias). If a 373 * change is detected, any persistent search on this base must be terminated / 374 * reloaded. 375 * On the first call, we just save the DN and entryID. On subsequent calls 376 * we compare the DN and entryID with the saved values. 377 */ 378static int 379findbase_cb( Operation *op, SlapReply *rs ) 380{ 381 slap_callback *sc = op->o_callback; 382 383 if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) { 384 fbase_cookie *fc = sc->sc_private; 385 386 /* If no entryID, we're looking for the first time. 387 * Just store whatever we got. 388 */ 389 if ( fc->fss->s_eid == NOID ) { 390 fc->fbase = 2; 391 fc->fss->s_eid = rs->sr_entry->e_id; 392 ber_dupbv( &fc->fss->s_base, &rs->sr_entry->e_nname ); 393 394 } else if ( rs->sr_entry->e_id == fc->fss->s_eid && 395 dn_match( &rs->sr_entry->e_nname, &fc->fss->s_base )) { 396 397 /* OK, the DN is the same and the entryID is the same. */ 398 fc->fbase = 1; 399 } 400 } 401 if ( rs->sr_err != LDAP_SUCCESS ) { 402 Debug( LDAP_DEBUG_ANY, "findbase failed! %d\n", rs->sr_err,0,0 ); 403 } 404 return LDAP_SUCCESS; 405} 406 407static Filter generic_filter = { LDAP_FILTER_PRESENT, { 0 }, NULL }; 408static struct berval generic_filterstr = BER_BVC("(objectclass=*)"); 409 410static int 411syncprov_findbase( Operation *op, fbase_cookie *fc ) 412{ 413 /* Use basic parameters from syncrepl search, but use 414 * current op's threadctx / tmpmemctx 415 */ 416 ldap_pvt_thread_mutex_lock( &fc->fss->s_mutex ); 417 if ( fc->fss->s_flags & PS_FIND_BASE ) { 418 slap_callback cb = {0}; 419 Operation fop; 420 SlapReply frs = { REP_RESULT }; 421 int rc; 422 423 fc->fss->s_flags ^= PS_FIND_BASE; 424 ldap_pvt_thread_mutex_unlock( &fc->fss->s_mutex ); 425 426 fop = *fc->fss->s_op; 427 428 fop.o_bd = fop.o_bd->bd_self; 429 fop.o_hdr = op->o_hdr; 430 fop.o_time = op->o_time; 431 fop.o_tincr = op->o_tincr; 432 433 cb.sc_response = findbase_cb; 434 cb.sc_private = fc; 435 436 fop.o_sync_mode = 0; /* turn off sync mode */ 437 fop.o_managedsait = SLAP_CONTROL_CRITICAL; 438 fop.o_callback = &cb; 439 fop.o_tag = LDAP_REQ_SEARCH; 440 fop.ors_scope = LDAP_SCOPE_BASE; 441 fop.ors_limit = NULL; 442 fop.ors_slimit = 1; 443 fop.ors_tlimit = SLAP_NO_LIMIT; 444 fop.ors_attrs = slap_anlist_no_attrs; 445 fop.ors_attrsonly = 1; 446 fop.ors_filter = &generic_filter; 447 fop.ors_filterstr = generic_filterstr; 448 449 rc = fop.o_bd->be_search( &fop, &frs ); 450 } else { 451 ldap_pvt_thread_mutex_unlock( &fc->fss->s_mutex ); 452 fc->fbase = 1; 453 } 454 455 /* After the first call, see if the fdn resides in the scope */ 456 if ( fc->fbase == 1 ) { 457 switch ( fc->fss->s_op->ors_scope ) { 458 case LDAP_SCOPE_BASE: 459 fc->fscope = dn_match( fc->fdn, &fc->fss->s_base ); 460 break; 461 case LDAP_SCOPE_ONELEVEL: { 462 struct berval pdn; 463 dnParent( fc->fdn, &pdn ); 464 fc->fscope = dn_match( &pdn, &fc->fss->s_base ); 465 break; } 466 case LDAP_SCOPE_SUBTREE: 467 fc->fscope = dnIsSuffix( fc->fdn, &fc->fss->s_base ); 468 break; 469 case LDAP_SCOPE_SUBORDINATE: 470 fc->fscope = dnIsSuffix( fc->fdn, &fc->fss->s_base ) && 471 !dn_match( fc->fdn, &fc->fss->s_base ); 472 break; 473 } 474 } 475 476 if ( fc->fbase ) 477 return LDAP_SUCCESS; 478 479 /* If entryID has changed, then the base of this search has 480 * changed. Invalidate the psearch. 481 */ 482 return LDAP_NO_SUCH_OBJECT; 483} 484 485/* syncprov_findcsn: 486 * This function has three different purposes, but they all use a search 487 * that filters on entryCSN so they're combined here. 488 * 1: at startup time, after a contextCSN has been read from the database, 489 * we search for all entries with CSN >= contextCSN in case the contextCSN 490 * was not checkpointed at the previous shutdown. 491 * 492 * 2: when the current contextCSN is known and we have a sync cookie, we search 493 * for one entry with CSN = the cookie CSN. If not found, try <= cookie CSN. 494 * If an entry is found, the cookie CSN is valid, otherwise it is stale. 495 * 496 * 3: during a refresh phase, we search for all entries with CSN <= the cookie 497 * CSN, and generate Present records for them. We always collect this result 498 * in SyncID sets, even if there's only one match. 499 */ 500typedef enum find_csn_t { 501 FIND_MAXCSN = 1, 502 FIND_CSN = 2, 503 FIND_PRESENT = 3 504} find_csn_t; 505 506static int 507findmax_cb( Operation *op, SlapReply *rs ) 508{ 509 if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) { 510 struct berval *maxcsn = op->o_callback->sc_private; 511 Attribute *a = attr_find( rs->sr_entry->e_attrs, 512 slap_schema.si_ad_entryCSN ); 513 514 if ( a && ber_bvcmp( &a->a_vals[0], maxcsn ) > 0 && 515 slap_parse_csn_sid( &a->a_vals[0] ) == slap_serverID ) { 516 maxcsn->bv_len = a->a_vals[0].bv_len; 517 strcpy( maxcsn->bv_val, a->a_vals[0].bv_val ); 518 } 519 } 520 return LDAP_SUCCESS; 521} 522 523static int 524findcsn_cb( Operation *op, SlapReply *rs ) 525{ 526 slap_callback *sc = op->o_callback; 527 528 /* We just want to know that at least one exists, so it's OK if 529 * we exceed the unchecked limit. 530 */ 531 if ( rs->sr_err == LDAP_ADMINLIMIT_EXCEEDED || 532 (rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS )) { 533 sc->sc_private = (void *)1; 534 } 535 return LDAP_SUCCESS; 536} 537 538/* Build a list of entryUUIDs for sending in a SyncID set */ 539 540#define UUID_LEN 16 541 542typedef struct fpres_cookie { 543 int num; 544 BerVarray uuids; 545 char *last; 546} fpres_cookie; 547 548static int 549findpres_cb( Operation *op, SlapReply *rs ) 550{ 551 slap_callback *sc = op->o_callback; 552 fpres_cookie *pc = sc->sc_private; 553 Attribute *a; 554 int ret = SLAP_CB_CONTINUE; 555 556 switch ( rs->sr_type ) { 557 case REP_SEARCH: 558 a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryUUID ); 559 if ( a ) { 560 pc->uuids[pc->num].bv_val = pc->last; 561 AC_MEMCPY( pc->uuids[pc->num].bv_val, a->a_nvals[0].bv_val, 562 pc->uuids[pc->num].bv_len ); 563 pc->num++; 564 pc->last = pc->uuids[pc->num].bv_val; 565 pc->uuids[pc->num].bv_val = NULL; 566 } 567 ret = LDAP_SUCCESS; 568 if ( pc->num != SLAP_SYNCUUID_SET_SIZE ) 569 break; 570 /* FALLTHRU */ 571 case REP_RESULT: 572 ret = rs->sr_err; 573 if ( pc->num ) { 574 ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL, 575 0, pc->uuids, 0 ); 576 pc->uuids[pc->num].bv_val = pc->last; 577 pc->num = 0; 578 pc->last = pc->uuids[0].bv_val; 579 } 580 break; 581 default: 582 break; 583 } 584 return ret; 585} 586 587static int 588syncprov_findcsn( Operation *op, find_csn_t mode ) 589{ 590 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 591 syncprov_info_t *si = on->on_bi.bi_private; 592 593 slap_callback cb = {0}; 594 Operation fop; 595 SlapReply frs = { REP_RESULT }; 596 char buf[LDAP_LUTIL_CSNSTR_BUFSIZE + STRLENOF("(entryCSN<=)")]; 597 char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE]; 598 struct berval maxcsn; 599 Filter cf; 600 AttributeAssertion eq = ATTRIBUTEASSERTION_INIT; 601 fpres_cookie pcookie; 602 sync_control *srs = NULL; 603 struct slap_limits_set fc_limits; 604 int i, rc = LDAP_SUCCESS, findcsn_retry = 1; 605 int maxid; 606 607 if ( mode != FIND_MAXCSN ) { 608 srs = op->o_controls[slap_cids.sc_LDAPsync]; 609 } 610 611 fop = *op; 612 fop.o_sync_mode &= SLAP_CONTROL_MASK; /* turn off sync_mode */ 613 /* We want pure entries, not referrals */ 614 fop.o_managedsait = SLAP_CONTROL_CRITICAL; 615 616 cf.f_ava = &eq; 617 cf.f_av_desc = slap_schema.si_ad_entryCSN; 618 BER_BVZERO( &cf.f_av_value ); 619 cf.f_next = NULL; 620 621 fop.o_callback = &cb; 622 fop.ors_limit = NULL; 623 fop.ors_tlimit = SLAP_NO_LIMIT; 624 fop.ors_filter = &cf; 625 fop.ors_filterstr.bv_val = buf; 626 627again: 628 switch( mode ) { 629 case FIND_MAXCSN: 630 cf.f_choice = LDAP_FILTER_GE; 631 /* If there are multiple CSNs, use the one with our serverID */ 632 for ( i=0; i<si->si_numcsns; i++) { 633 if ( slap_serverID == si->si_sids[i] ) { 634 maxid = i; 635 break; 636 } 637 } 638 if ( i == si->si_numcsns ) { 639 /* No match: this is multimaster, and none of the content in the DB 640 * originated locally. Treat like no CSN. 641 */ 642 return LDAP_NO_SUCH_OBJECT; 643 } 644 cf.f_av_value = si->si_ctxcsn[maxid]; 645 fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ), 646 "(entryCSN>=%s)", cf.f_av_value.bv_val ); 647 if ( fop.ors_filterstr.bv_len >= sizeof( buf ) ) { 648 return LDAP_OTHER; 649 } 650 fop.ors_attrsonly = 0; 651 fop.ors_attrs = csn_anlist; 652 fop.ors_slimit = SLAP_NO_LIMIT; 653 cb.sc_private = &maxcsn; 654 cb.sc_response = findmax_cb; 655 strcpy( cbuf, cf.f_av_value.bv_val ); 656 maxcsn.bv_val = cbuf; 657 maxcsn.bv_len = cf.f_av_value.bv_len; 658 break; 659 case FIND_CSN: 660 if ( BER_BVISEMPTY( &cf.f_av_value )) { 661 cf.f_av_value = srs->sr_state.ctxcsn[0]; 662 /* If there are multiple CSNs, use the smallest */ 663 for ( i=1; i<srs->sr_state.numcsns; i++ ) { 664 if ( ber_bvcmp( &cf.f_av_value, &srs->sr_state.ctxcsn[i] ) 665 > 0 ) { 666 cf.f_av_value = srs->sr_state.ctxcsn[i]; 667 } 668 } 669 } 670 /* Look for exact match the first time */ 671 if ( findcsn_retry ) { 672 cf.f_choice = LDAP_FILTER_EQUALITY; 673 fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ), 674 "(entryCSN=%s)", cf.f_av_value.bv_val ); 675 /* On retry, look for <= */ 676 } else { 677 cf.f_choice = LDAP_FILTER_LE; 678 fop.ors_limit = &fc_limits; 679 memset( &fc_limits, 0, sizeof( fc_limits )); 680 fc_limits.lms_s_unchecked = 1; 681 fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ), 682 "(entryCSN<=%s)", cf.f_av_value.bv_val ); 683 } 684 if ( fop.ors_filterstr.bv_len >= sizeof( buf ) ) { 685 return LDAP_OTHER; 686 } 687 fop.ors_attrsonly = 1; 688 fop.ors_attrs = slap_anlist_no_attrs; 689 fop.ors_slimit = 1; 690 cb.sc_private = NULL; 691 cb.sc_response = findcsn_cb; 692 break; 693 case FIND_PRESENT: 694 fop.ors_filter = op->ors_filter; 695 fop.ors_filterstr = op->ors_filterstr; 696 fop.ors_attrsonly = 0; 697 fop.ors_attrs = uuid_anlist; 698 fop.ors_slimit = SLAP_NO_LIMIT; 699 cb.sc_private = &pcookie; 700 cb.sc_response = findpres_cb; 701 pcookie.num = 0; 702 703 /* preallocate storage for a full set */ 704 pcookie.uuids = op->o_tmpalloc( (SLAP_SYNCUUID_SET_SIZE+1) * 705 sizeof(struct berval) + SLAP_SYNCUUID_SET_SIZE * UUID_LEN, 706 op->o_tmpmemctx ); 707 pcookie.last = (char *)(pcookie.uuids + SLAP_SYNCUUID_SET_SIZE+1); 708 pcookie.uuids[0].bv_val = pcookie.last; 709 pcookie.uuids[0].bv_len = UUID_LEN; 710 for (i=1; i<SLAP_SYNCUUID_SET_SIZE; i++) { 711 pcookie.uuids[i].bv_val = pcookie.uuids[i-1].bv_val + UUID_LEN; 712 pcookie.uuids[i].bv_len = UUID_LEN; 713 } 714 break; 715 } 716 717 fop.o_bd->bd_info = (BackendInfo *)on->on_info; 718 fop.o_bd->be_search( &fop, &frs ); 719 fop.o_bd->bd_info = (BackendInfo *)on; 720 721 switch( mode ) { 722 case FIND_MAXCSN: 723 if ( ber_bvcmp( &si->si_ctxcsn[maxid], &maxcsn )) { 724#ifdef CHECK_CSN 725 Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax; 726 assert( !syn->ssyn_validate( syn, &maxcsn )); 727#endif 728 ber_bvreplace( &si->si_ctxcsn[maxid], &maxcsn ); 729 si->si_numops++; /* ensure a checkpoint */ 730 } 731 break; 732 case FIND_CSN: 733 /* If matching CSN was not found, invalidate the context. */ 734 if ( !cb.sc_private ) { 735 /* If we didn't find an exact match, then try for <= */ 736 if ( findcsn_retry ) { 737 findcsn_retry = 0; 738 goto again; 739 } 740 rc = LDAP_NO_SUCH_OBJECT; 741 } 742 break; 743 case FIND_PRESENT: 744 op->o_tmpfree( pcookie.uuids, op->o_tmpmemctx ); 745 break; 746 } 747 748 return rc; 749} 750 751/* Should find a place to cache these */ 752static mutexint *get_mutexint() 753{ 754 mutexint *mi = ch_malloc( sizeof( mutexint )); 755 ldap_pvt_thread_mutex_init( &mi->mi_mutex ); 756 mi->mi_int = 1; 757 return mi; 758} 759 760static void inc_mutexint( mutexint *mi ) 761{ 762 ldap_pvt_thread_mutex_lock( &mi->mi_mutex ); 763 mi->mi_int++; 764 ldap_pvt_thread_mutex_unlock( &mi->mi_mutex ); 765} 766 767/* return resulting counter */ 768static int dec_mutexint( mutexint *mi ) 769{ 770 int i; 771 ldap_pvt_thread_mutex_lock( &mi->mi_mutex ); 772 i = --mi->mi_int; 773 ldap_pvt_thread_mutex_unlock( &mi->mi_mutex ); 774 if ( !i ) { 775 ldap_pvt_thread_mutex_destroy( &mi->mi_mutex ); 776 ch_free( mi ); 777 } 778 return i; 779} 780 781static void 782syncprov_free_syncop( syncops *so ) 783{ 784 syncres *sr, *srnext; 785 GroupAssertion *ga, *gnext; 786 787 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 788 if ( --so->s_inuse > 0 ) { 789 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 790 return; 791 } 792 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 793 if ( so->s_flags & PS_IS_DETACHED ) { 794 filter_free( so->s_op->ors_filter ); 795 for ( ga = so->s_op->o_groups; ga; ga=gnext ) { 796 gnext = ga->ga_next; 797 ch_free( ga ); 798 } 799 ch_free( so->s_op ); 800 } 801 ch_free( so->s_base.bv_val ); 802 for ( sr=so->s_res; sr; sr=srnext ) { 803 srnext = sr->s_next; 804 if ( sr->s_e ) { 805 if ( !dec_mutexint( sr->s_e->e_private )) { 806 sr->s_e->e_private = NULL; 807 entry_free( sr->s_e ); 808 } 809 } 810 ch_free( sr ); 811 } 812 ldap_pvt_thread_mutex_destroy( &so->s_mutex ); 813 ch_free( so ); 814} 815 816/* Send a persistent search response */ 817static int 818syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode ) 819{ 820 slap_overinst *on = opc->son; 821 822 SlapReply rs = { REP_SEARCH }; 823 LDAPControl *ctrls[2]; 824 struct berval cookie = BER_BVNULL, csns[2]; 825 Entry e_uuid = {0}; 826 Attribute a_uuid = {0}; 827 828 if ( so->s_op->o_abandon ) 829 return SLAPD_ABANDON; 830 831 ctrls[1] = NULL; 832 if ( !BER_BVISNULL( &opc->sctxcsn )) { 833 csns[0] = opc->sctxcsn; 834 BER_BVZERO( &csns[1] ); 835 slap_compose_sync_cookie( op, &cookie, csns, so->s_rid, slap_serverID ? slap_serverID : -1 ); 836 } 837 838#ifdef LDAP_DEBUG 839 if ( so->s_sid > 0 ) { 840 Debug( LDAP_DEBUG_SYNC, "syncprov_sendresp: to=%03x, cookie=%s\n", 841 so->s_sid, cookie.bv_val ? cookie.bv_val : "", 0 ); 842 } else { 843 Debug( LDAP_DEBUG_SYNC, "syncprov_sendresp: cookie=%s\n", 844 cookie.bv_val ? cookie.bv_val : "", 0, 0 ); 845 } 846#endif 847 848 e_uuid.e_attrs = &a_uuid; 849 a_uuid.a_desc = slap_schema.si_ad_entryUUID; 850 a_uuid.a_nvals = &opc->suuid; 851 rs.sr_err = syncprov_state_ctrl( op, &rs, &e_uuid, 852 mode, ctrls, 0, 1, &cookie ); 853 if ( !BER_BVISNULL( &cookie )) { 854 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); 855 } 856 857 rs.sr_ctrls = ctrls; 858 rs.sr_entry = &e_uuid; 859 if ( mode == LDAP_SYNC_ADD || mode == LDAP_SYNC_MODIFY ) { 860 e_uuid = *opc->se; 861 e_uuid.e_private = NULL; 862 } 863 864 switch( mode ) { 865 case LDAP_SYNC_ADD: 866 if ( opc->sreference && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) { 867 rs.sr_ref = get_entry_referrals( op, rs.sr_entry ); 868 rs.sr_err = send_search_reference( op, &rs ); 869 ber_bvarray_free( rs.sr_ref ); 870 break; 871 } 872 /* fallthru */ 873 case LDAP_SYNC_MODIFY: 874 rs.sr_attrs = op->ors_attrs; 875 rs.sr_err = send_search_entry( op, &rs ); 876 break; 877 case LDAP_SYNC_DELETE: 878 e_uuid.e_attrs = NULL; 879 e_uuid.e_name = opc->sdn; 880 e_uuid.e_nname = opc->sndn; 881 if ( opc->sreference && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) { 882 struct berval bv = BER_BVNULL; 883 rs.sr_ref = &bv; 884 rs.sr_err = send_search_reference( op, &rs ); 885 } else { 886 rs.sr_err = send_search_entry( op, &rs ); 887 } 888 break; 889 default: 890 assert(0); 891 } 892 /* In case someone else freed it already? */ 893 if ( rs.sr_ctrls ) { 894 int i; 895 for ( i=0; rs.sr_ctrls[i]; i++ ) { 896 if ( rs.sr_ctrls[i] == ctrls[0] ) { 897 op->o_tmpfree( ctrls[0]->ldctl_value.bv_val, op->o_tmpmemctx ); 898 ctrls[0]->ldctl_value.bv_val = NULL; 899 break; 900 } 901 } 902 rs.sr_ctrls = NULL; 903 } 904 905 return rs.sr_err; 906} 907 908static void 909syncprov_qstart( syncops *so ); 910 911/* Play back queued responses */ 912static int 913syncprov_qplay( Operation *op, syncops *so ) 914{ 915 slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key; 916 syncres *sr; 917 Entry *e; 918 opcookie opc; 919 int rc = 0; 920 921 opc.son = on; 922 923 do { 924 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 925 sr = so->s_res; 926 if ( sr ) 927 so->s_res = sr->s_next; 928 if ( !so->s_res ) 929 so->s_restail = NULL; 930 /* Exit loop with mutex held */ 931 if ( !sr || so->s_op->o_abandon ) 932 break; 933 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 934 935 if ( sr->s_mode == LDAP_SYNC_NEW_COOKIE ) { 936 SlapReply rs = { REP_INTERMEDIATE }; 937 938 rc = syncprov_sendinfo( op, &rs, LDAP_TAG_SYNC_NEW_COOKIE, 939 &sr->s_csn, 0, NULL, 0 ); 940 } else { 941 opc.sdn = sr->s_dn; 942 opc.sndn = sr->s_ndn; 943 opc.suuid = sr->s_uuid; 944 opc.sctxcsn = sr->s_csn; 945 opc.sreference = sr->s_isreference; 946 opc.se = sr->s_e; 947 948 rc = syncprov_sendresp( op, &opc, so, sr->s_mode ); 949 950 if ( opc.se ) { 951 if ( !dec_mutexint( opc.se->e_private )) { 952 opc.se->e_private = NULL; 953 entry_free ( opc.se ); 954 } 955 } 956 } 957 958 ch_free( sr ); 959 960 /* Exit loop with mutex held */ 961 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 962 963 } while (0); 964 965 /* We now only send one change at a time, to prevent one 966 * psearch from hogging all the CPU. Resubmit this task if 967 * there are more responses queued and no errors occurred. 968 */ 969 970 if ( rc == 0 && so->s_res ) { 971 syncprov_qstart( so ); 972 } else { 973 so->s_flags ^= PS_TASK_QUEUED; 974 } 975 976 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 977 return rc; 978} 979 980/* task for playing back queued responses */ 981static void * 982syncprov_qtask( void *ctx, void *arg ) 983{ 984 syncops *so = arg; 985 OperationBuffer opbuf; 986 Operation *op; 987 BackendDB be; 988 int rc; 989 990 op = &opbuf.ob_op; 991 *op = *so->s_op; 992 op->o_hdr = &opbuf.ob_hdr; 993 op->o_controls = opbuf.ob_controls; 994 memset( op->o_controls, 0, sizeof(opbuf.ob_controls) ); 995 996 *op->o_hdr = *so->s_op->o_hdr; 997 998 op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx, 1); 999 op->o_tmpmfuncs = &slap_sl_mfuncs; 1000 op->o_threadctx = ctx; 1001 1002 /* syncprov_qplay expects a fake db */ 1003 be = *so->s_op->o_bd; 1004 be.be_flags |= SLAP_DBFLAG_OVERLAY; 1005 op->o_bd = &be; 1006 LDAP_SLIST_FIRST(&op->o_extra) = NULL; 1007 op->o_callback = NULL; 1008 1009 rc = syncprov_qplay( op, so ); 1010 1011 /* decrement use count... */ 1012 syncprov_free_syncop( so ); 1013 1014 return NULL; 1015} 1016 1017/* Start the task to play back queued psearch responses */ 1018static void 1019syncprov_qstart( syncops *so ) 1020{ 1021 so->s_flags |= PS_TASK_QUEUED; 1022 so->s_inuse++; 1023 ldap_pvt_thread_pool_submit( &connection_pool, 1024 syncprov_qtask, so ); 1025} 1026 1027/* Queue a persistent search response */ 1028static int 1029syncprov_qresp( opcookie *opc, syncops *so, int mode ) 1030{ 1031 syncres *sr; 1032 int srsize; 1033 struct berval cookie = opc->sctxcsn; 1034 1035 if ( mode == LDAP_SYNC_NEW_COOKIE ) { 1036 syncprov_info_t *si = opc->son->on_bi.bi_private; 1037 1038 slap_compose_sync_cookie( NULL, &cookie, si->si_ctxcsn, 1039 so->s_rid, slap_serverID ? slap_serverID : -1); 1040 } 1041 1042 srsize = sizeof(syncres) + opc->suuid.bv_len + 1 + 1043 opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1; 1044 if ( cookie.bv_len ) 1045 srsize += cookie.bv_len + 1; 1046 sr = ch_malloc( srsize ); 1047 sr->s_next = NULL; 1048 sr->s_e = opc->se; 1049 /* bump refcount on this entry */ 1050 if ( opc->se ) 1051 inc_mutexint( opc->se->e_private ); 1052 sr->s_dn.bv_val = (char *)(sr + 1); 1053 sr->s_dn.bv_len = opc->sdn.bv_len; 1054 sr->s_mode = mode; 1055 sr->s_isreference = opc->sreference; 1056 sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val, 1057 opc->sdn.bv_val ) + 1; 1058 sr->s_ndn.bv_len = opc->sndn.bv_len; 1059 sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val, 1060 opc->sndn.bv_val ) + 1; 1061 sr->s_uuid.bv_len = opc->suuid.bv_len; 1062 AC_MEMCPY( sr->s_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len ); 1063 if ( cookie.bv_len ) { 1064 sr->s_csn.bv_val = sr->s_uuid.bv_val + sr->s_uuid.bv_len + 1; 1065 strcpy( sr->s_csn.bv_val, cookie.bv_val ); 1066 } else { 1067 sr->s_csn.bv_val = NULL; 1068 } 1069 sr->s_csn.bv_len = cookie.bv_len; 1070 1071 if ( mode == LDAP_SYNC_NEW_COOKIE && cookie.bv_val ) { 1072 ch_free( cookie.bv_val ); 1073 } 1074 1075 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 1076 if ( !so->s_res ) { 1077 so->s_res = sr; 1078 } else { 1079 so->s_restail->s_next = sr; 1080 } 1081 so->s_restail = sr; 1082 1083 /* If the base of the psearch was modified, check it next time round */ 1084 if ( so->s_flags & PS_WROTE_BASE ) { 1085 so->s_flags ^= PS_WROTE_BASE; 1086 so->s_flags |= PS_FIND_BASE; 1087 } 1088 if (( so->s_flags & (PS_IS_DETACHED|PS_TASK_QUEUED)) == PS_IS_DETACHED ) { 1089 syncprov_qstart( so ); 1090 } 1091 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 1092 return LDAP_SUCCESS; 1093} 1094 1095static int 1096syncprov_drop_psearch( syncops *so, int lock ) 1097{ 1098 if ( so->s_flags & PS_IS_DETACHED ) { 1099 if ( lock ) 1100 ldap_pvt_thread_mutex_lock( &so->s_op->o_conn->c_mutex ); 1101 so->s_op->o_conn->c_n_ops_executing--; 1102 so->s_op->o_conn->c_n_ops_completed++; 1103 LDAP_STAILQ_REMOVE( &so->s_op->o_conn->c_ops, so->s_op, Operation, 1104 o_next ); 1105 if ( lock ) 1106 ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex ); 1107 } 1108 syncprov_free_syncop( so ); 1109 1110 return 0; 1111} 1112 1113static int 1114syncprov_ab_cleanup( Operation *op, SlapReply *rs ) 1115{ 1116 slap_callback *sc = op->o_callback; 1117 op->o_callback = sc->sc_next; 1118 syncprov_drop_psearch( op->o_callback->sc_private, 0 ); 1119 op->o_tmpfree( sc, op->o_tmpmemctx ); 1120 return 0; 1121} 1122 1123static int 1124syncprov_op_abandon( Operation *op, SlapReply *rs ) 1125{ 1126 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 1127 syncprov_info_t *si = on->on_bi.bi_private; 1128 syncops *so, *soprev; 1129 1130 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 1131 for ( so=si->si_ops, soprev = (syncops *)&si->si_ops; so; 1132 soprev=so, so=so->s_next ) { 1133 if ( so->s_op->o_connid == op->o_connid && 1134 so->s_op->o_msgid == op->orn_msgid ) { 1135 so->s_op->o_abandon = 1; 1136 soprev->s_next = so->s_next; 1137 break; 1138 } 1139 } 1140 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 1141 if ( so ) { 1142 /* Is this really a Cancel exop? */ 1143 if ( op->o_tag != LDAP_REQ_ABANDON ) { 1144 so->s_op->o_cancel = SLAP_CANCEL_ACK; 1145 rs->sr_err = LDAP_CANCELLED; 1146 send_ldap_result( so->s_op, rs ); 1147 if ( so->s_flags & PS_IS_DETACHED ) { 1148 slap_callback *cb; 1149 cb = op->o_tmpcalloc( 1, sizeof(slap_callback), op->o_tmpmemctx ); 1150 cb->sc_cleanup = syncprov_ab_cleanup; 1151 cb->sc_next = op->o_callback; 1152 cb->sc_private = so; 1153 return SLAP_CB_CONTINUE; 1154 } 1155 } 1156 syncprov_drop_psearch( so, 0 ); 1157 } 1158 return SLAP_CB_CONTINUE; 1159} 1160 1161/* Find which persistent searches are affected by this operation */ 1162static void 1163syncprov_matchops( Operation *op, opcookie *opc, int saveit ) 1164{ 1165 slap_overinst *on = opc->son; 1166 syncprov_info_t *si = on->on_bi.bi_private; 1167 1168 fbase_cookie fc; 1169 syncops *ss, *sprev, *snext; 1170 Entry *e = NULL; 1171 Attribute *a; 1172 int rc; 1173 struct berval newdn; 1174 int freefdn = 0; 1175 BackendDB *b0 = op->o_bd, db; 1176 1177 fc.fdn = &op->o_req_ndn; 1178 /* compute new DN */ 1179 if ( op->o_tag == LDAP_REQ_MODRDN && !saveit ) { 1180 struct berval pdn; 1181 if ( op->orr_nnewSup ) pdn = *op->orr_nnewSup; 1182 else dnParent( fc.fdn, &pdn ); 1183 build_new_dn( &newdn, &pdn, &op->orr_nnewrdn, op->o_tmpmemctx ); 1184 fc.fdn = &newdn; 1185 freefdn = 1; 1186 } 1187 if ( op->o_tag != LDAP_REQ_ADD ) { 1188 if ( !SLAP_ISOVERLAY( op->o_bd )) { 1189 db = *op->o_bd; 1190 op->o_bd = &db; 1191 } 1192 rc = overlay_entry_get_ov( op, fc.fdn, NULL, NULL, 0, &e, on ); 1193 /* If we're sending responses now, make a copy and unlock the DB */ 1194 if ( e && !saveit ) { 1195 if ( !opc->se ) { 1196 opc->se = entry_dup( e ); 1197 opc->se->e_private = get_mutexint(); 1198 } 1199 overlay_entry_release_ov( op, e, 0, on ); 1200 e = opc->se; 1201 } 1202 if ( rc ) { 1203 op->o_bd = b0; 1204 return; 1205 } 1206 } else { 1207 e = op->ora_e; 1208 if ( !saveit ) { 1209 if ( !opc->se ) { 1210 opc->se = entry_dup( e ); 1211 opc->se->e_private = get_mutexint(); 1212 } 1213 e = opc->se; 1214 } 1215 } 1216 1217 if ( saveit || op->o_tag == LDAP_REQ_ADD ) { 1218 ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx ); 1219 ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx ); 1220 opc->sreference = is_entry_referral( e ); 1221 a = attr_find( e->e_attrs, slap_schema.si_ad_entryUUID ); 1222 if ( a ) 1223 ber_dupbv_x( &opc->suuid, &a->a_nvals[0], op->o_tmpmemctx ); 1224 } else if ( op->o_tag == LDAP_REQ_MODRDN && !saveit ) { 1225 op->o_tmpfree( opc->sndn.bv_val, op->o_tmpmemctx ); 1226 op->o_tmpfree( opc->sdn.bv_val, op->o_tmpmemctx ); 1227 ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx ); 1228 ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx ); 1229 } 1230 1231 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 1232 for (ss = si->si_ops, sprev = (syncops *)&si->si_ops; ss; 1233 sprev = ss, ss=snext) 1234 { 1235 Operation op2; 1236 Opheader oh; 1237 syncmatches *sm; 1238 int found = 0; 1239 1240 snext = ss->s_next; 1241 if ( ss->s_op->o_abandon ) 1242 continue; 1243 1244 /* Don't send ops back to the originator */ 1245 if ( opc->osid > 0 && opc->osid == ss->s_sid ) { 1246 Debug( LDAP_DEBUG_SYNC, "syncprov_matchops: skipping original sid %03x\n", 1247 opc->osid, 0, 0 ); 1248 continue; 1249 } 1250 1251 /* Don't send ops back to the messenger */ 1252 if ( opc->rsid > 0 && opc->rsid == ss->s_sid ) { 1253 Debug( LDAP_DEBUG_SYNC, "syncprov_matchops: skipping relayed sid %03x\n", 1254 opc->rsid, 0, 0 ); 1255 continue; 1256 } 1257 1258 /* validate base */ 1259 fc.fss = ss; 1260 fc.fbase = 0; 1261 fc.fscope = 0; 1262 1263 /* If the base of the search is missing, signal a refresh */ 1264 rc = syncprov_findbase( op, &fc ); 1265 if ( rc != LDAP_SUCCESS ) { 1266 SlapReply rs = {REP_RESULT}; 1267 send_ldap_error( ss->s_op, &rs, LDAP_SYNC_REFRESH_REQUIRED, 1268 "search base has changed" ); 1269 sprev->s_next = snext; 1270 syncprov_drop_psearch( ss, 1 ); 1271 ss = sprev; 1272 continue; 1273 } 1274 1275 /* If we're sending results now, look for this op in old matches */ 1276 if ( !saveit ) { 1277 syncmatches *old; 1278 1279 /* Did we modify the search base? */ 1280 if ( dn_match( &op->o_req_ndn, &ss->s_base )) { 1281 ldap_pvt_thread_mutex_lock( &ss->s_mutex ); 1282 ss->s_flags |= PS_WROTE_BASE; 1283 ldap_pvt_thread_mutex_unlock( &ss->s_mutex ); 1284 } 1285 1286 for ( sm=opc->smatches, old=(syncmatches *)&opc->smatches; sm; 1287 old=sm, sm=sm->sm_next ) { 1288 if ( sm->sm_op == ss ) { 1289 found = 1; 1290 old->sm_next = sm->sm_next; 1291 op->o_tmpfree( sm, op->o_tmpmemctx ); 1292 break; 1293 } 1294 } 1295 } 1296 1297 if ( fc.fscope ) { 1298 op2 = *ss->s_op; 1299 oh = *op->o_hdr; 1300 oh.oh_conn = ss->s_op->o_conn; 1301 oh.oh_connid = ss->s_op->o_connid; 1302 op2.o_bd = op->o_bd->bd_self; 1303 op2.o_hdr = &oh; 1304 op2.o_extra = op->o_extra; 1305 op2.o_callback = NULL; 1306 rc = test_filter( &op2, e, ss->s_op->ors_filter ); 1307 } 1308 1309 Debug( LDAP_DEBUG_TRACE, "syncprov_matchops: sid %03x fscope %d rc %d\n", 1310 ss->s_sid, fc.fscope, rc ); 1311 1312 /* check if current o_req_dn is in scope and matches filter */ 1313 if ( fc.fscope && rc == LDAP_COMPARE_TRUE ) { 1314 if ( saveit ) { 1315 sm = op->o_tmpalloc( sizeof(syncmatches), op->o_tmpmemctx ); 1316 sm->sm_next = opc->smatches; 1317 sm->sm_op = ss; 1318 ldap_pvt_thread_mutex_lock( &ss->s_mutex ); 1319 ++ss->s_inuse; 1320 ldap_pvt_thread_mutex_unlock( &ss->s_mutex ); 1321 opc->smatches = sm; 1322 } else { 1323 /* if found send UPDATE else send ADD */ 1324 syncprov_qresp( opc, ss, 1325 found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD ); 1326 } 1327 } else if ( !saveit && found ) { 1328 /* send DELETE */ 1329 syncprov_qresp( opc, ss, LDAP_SYNC_DELETE ); 1330 } else if ( !saveit ) { 1331 syncprov_qresp( opc, ss, LDAP_SYNC_NEW_COOKIE ); 1332 } 1333 if ( !saveit && found ) { 1334 /* Decrement s_inuse, was incremented when called 1335 * with saveit == TRUE 1336 */ 1337 syncprov_free_syncop( ss ); 1338 } 1339 } 1340 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 1341 1342 if ( op->o_tag != LDAP_REQ_ADD && e ) { 1343 if ( !SLAP_ISOVERLAY( op->o_bd )) { 1344 op->o_bd = &db; 1345 } 1346 if ( saveit ) 1347 overlay_entry_release_ov( op, e, 0, on ); 1348 op->o_bd = b0; 1349 } 1350 if ( opc->se && !saveit ) { 1351 if ( !dec_mutexint( opc->se->e_private )) { 1352 opc->se->e_private = NULL; 1353 entry_free( opc->se ); 1354 opc->se = NULL; 1355 } 1356 } 1357 if ( freefdn ) { 1358 op->o_tmpfree( fc.fdn->bv_val, op->o_tmpmemctx ); 1359 } 1360 op->o_bd = b0; 1361} 1362 1363static int 1364syncprov_op_cleanup( Operation *op, SlapReply *rs ) 1365{ 1366 slap_callback *cb = op->o_callback; 1367 opcookie *opc = cb->sc_private; 1368 slap_overinst *on = opc->son; 1369 syncprov_info_t *si = on->on_bi.bi_private; 1370 syncmatches *sm, *snext; 1371 modtarget *mt, mtdummy; 1372 1373 for (sm = opc->smatches; sm; sm=snext) { 1374 snext = sm->sm_next; 1375 syncprov_free_syncop( sm->sm_op ); 1376 op->o_tmpfree( sm, op->o_tmpmemctx ); 1377 } 1378 1379 /* Remove op from lock table */ 1380 mt = opc->smt; 1381 if ( mt ) { 1382 ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); 1383 mt->mt_mods = mt->mt_mods->mi_next; 1384 /* If there are more, promote the next one */ 1385 if ( mt->mt_mods ) { 1386 mt->mt_op = mt->mt_mods->mi_op; 1387 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 1388 } else { 1389 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 1390 ldap_pvt_thread_mutex_lock( &si->si_mods_mutex ); 1391 avl_delete( &si->si_mods, mt, sp_avl_cmp ); 1392 ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); 1393 ldap_pvt_thread_mutex_destroy( &mt->mt_mutex ); 1394 ch_free( mt ); 1395 } 1396 } 1397 if ( !BER_BVISNULL( &opc->suuid )) 1398 op->o_tmpfree( opc->suuid.bv_val, op->o_tmpmemctx ); 1399 if ( !BER_BVISNULL( &opc->sndn )) 1400 op->o_tmpfree( opc->sndn.bv_val, op->o_tmpmemctx ); 1401 if ( !BER_BVISNULL( &opc->sdn )) 1402 op->o_tmpfree( opc->sdn.bv_val, op->o_tmpmemctx ); 1403 op->o_callback = cb->sc_next; 1404 op->o_tmpfree(cb, op->o_tmpmemctx); 1405 1406 return 0; 1407} 1408 1409static void 1410syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on ) 1411{ 1412 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 1413 Modifications mod; 1414 Operation opm; 1415 SlapReply rsm = { 0 }; 1416 slap_callback cb = {0}; 1417 BackendDB be; 1418 1419#ifdef CHECK_CSN 1420 Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax; 1421 1422 int i; 1423 for ( i=0; i<si->si_numcsns; i++ ) { 1424 assert( !syn->ssyn_validate( syn, si->si_ctxcsn+i )); 1425 } 1426#endif 1427 mod.sml_numvals = si->si_numcsns; 1428 mod.sml_values = si->si_ctxcsn; 1429 mod.sml_nvalues = NULL; 1430 mod.sml_desc = slap_schema.si_ad_contextCSN; 1431 mod.sml_op = LDAP_MOD_REPLACE; 1432 mod.sml_flags = SLAP_MOD_INTERNAL; 1433 mod.sml_next = NULL; 1434 1435 cb.sc_response = slap_null_cb; 1436 opm = *op; 1437 opm.o_tag = LDAP_REQ_MODIFY; 1438 opm.o_callback = &cb; 1439 opm.orm_modlist = &mod; 1440 opm.orm_no_opattrs = 1; 1441 if ( SLAP_GLUE_SUBORDINATE( op->o_bd )) { 1442 be = *on->on_info->oi_origdb; 1443 opm.o_bd = &be; 1444 } 1445 opm.o_req_dn = si->si_contextdn; 1446 opm.o_req_ndn = si->si_contextdn; 1447 opm.o_bd->bd_info = on->on_info->oi_orig; 1448 opm.o_managedsait = SLAP_CONTROL_NONCRITICAL; 1449 opm.o_no_schema_check = 1; 1450 opm.o_bd->be_modify( &opm, &rsm ); 1451 1452 if ( rsm.sr_err == LDAP_NO_SUCH_OBJECT && 1453 SLAP_SYNC_SUBENTRY( opm.o_bd )) { 1454 const char *text; 1455 char txtbuf[SLAP_TEXT_BUFLEN]; 1456 size_t textlen = sizeof txtbuf; 1457 Entry *e = slap_create_context_csn_entry( opm.o_bd, NULL ); 1458 slap_mods2entry( &mod, &e, 0, 1, &text, txtbuf, textlen); 1459 opm.ora_e = e; 1460 opm.o_bd->be_add( &opm, &rsm ); 1461 if ( e == opm.ora_e ) 1462 be_entry_release_w( &opm, opm.ora_e ); 1463 } 1464 1465 if ( mod.sml_next != NULL ) { 1466 slap_mods_free( mod.sml_next, 1 ); 1467 } 1468#ifdef CHECK_CSN 1469 for ( i=0; i<si->si_numcsns; i++ ) { 1470 assert( !syn->ssyn_validate( syn, si->si_ctxcsn+i )); 1471 } 1472#endif 1473} 1474 1475static void 1476syncprov_add_slog( Operation *op ) 1477{ 1478 opcookie *opc = op->o_callback->sc_private; 1479 slap_overinst *on = opc->son; 1480 syncprov_info_t *si = on->on_bi.bi_private; 1481 sessionlog *sl; 1482 slog_entry *se; 1483 1484 sl = si->si_logs; 1485 { 1486 /* Allocate a record. UUIDs are not NUL-terminated. */ 1487 se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + 1488 op->o_csn.bv_len + 1 ); 1489 se->se_next = NULL; 1490 se->se_tag = op->o_tag; 1491 1492 se->se_uuid.bv_val = (char *)(&se[1]); 1493 AC_MEMCPY( se->se_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len ); 1494 se->se_uuid.bv_len = opc->suuid.bv_len; 1495 1496 se->se_csn.bv_val = se->se_uuid.bv_val + opc->suuid.bv_len; 1497 AC_MEMCPY( se->se_csn.bv_val, op->o_csn.bv_val, op->o_csn.bv_len ); 1498 se->se_csn.bv_val[op->o_csn.bv_len] = '\0'; 1499 se->se_csn.bv_len = op->o_csn.bv_len; 1500 se->se_sid = slap_parse_csn_sid( &se->se_csn ); 1501 1502 ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); 1503 if ( sl->sl_head ) { 1504 sl->sl_tail->se_next = se; 1505 } else { 1506 sl->sl_head = se; 1507 } 1508 sl->sl_tail = se; 1509 sl->sl_num++; 1510 while ( sl->sl_num > sl->sl_size ) { 1511 se = sl->sl_head; 1512 sl->sl_head = se->se_next; 1513 strcpy( sl->sl_mincsn.bv_val, se->se_csn.bv_val ); 1514 sl->sl_mincsn.bv_len = se->se_csn.bv_len; 1515 ch_free( se ); 1516 sl->sl_num--; 1517 } 1518 ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); 1519 } 1520} 1521 1522/* Just set a flag if we found the matching entry */ 1523static int 1524playlog_cb( Operation *op, SlapReply *rs ) 1525{ 1526 if ( rs->sr_type == REP_SEARCH ) { 1527 op->o_callback->sc_private = (void *)1; 1528 } 1529 return rs->sr_err; 1530} 1531 1532/* enter with sl->sl_mutex locked, release before returning */ 1533static void 1534syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl, 1535 sync_control *srs, BerVarray ctxcsn, int numcsns, int *sids ) 1536{ 1537 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 1538 slog_entry *se; 1539 int i, j, ndel, num, nmods, mmods; 1540 char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE]; 1541 BerVarray uuids; 1542 struct berval delcsn[2]; 1543 1544 if ( !sl->sl_num ) { 1545 ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); 1546 return; 1547 } 1548 1549 num = sl->sl_num; 1550 i = 0; 1551 nmods = 0; 1552 1553 uuids = op->o_tmpalloc( (num+1) * sizeof( struct berval ) + 1554 num * UUID_LEN, op->o_tmpmemctx ); 1555 uuids[0].bv_val = (char *)(uuids + num + 1); 1556 1557 delcsn[0].bv_len = 0; 1558 delcsn[0].bv_val = cbuf; 1559 BER_BVZERO(&delcsn[1]); 1560 1561 /* Make a copy of the relevant UUIDs. Put the Deletes up front 1562 * and everything else at the end. Do this first so we can 1563 * unlock the list mutex. 1564 */ 1565 Debug( LDAP_DEBUG_SYNC, "srs csn %s\n", 1566 srs->sr_state.ctxcsn[0].bv_val, 0, 0 ); 1567 for ( se=sl->sl_head; se; se=se->se_next ) { 1568 int k; 1569 Debug( LDAP_DEBUG_SYNC, "log csn %s\n", se->se_csn.bv_val, 0, 0 ); 1570 ndel = 1; 1571 for ( k=0; k<srs->sr_state.numcsns; k++ ) { 1572 if ( se->se_sid == srs->sr_state.sids[k] ) { 1573 ndel = ber_bvcmp( &se->se_csn, &srs->sr_state.ctxcsn[k] ); 1574 break; 1575 } 1576 } 1577 if ( ndel <= 0 ) { 1578 Debug( LDAP_DEBUG_SYNC, "cmp %d, too old\n", ndel, 0, 0 ); 1579 continue; 1580 } 1581 ndel = 0; 1582 for ( k=0; k<numcsns; k++ ) { 1583 if ( se->se_sid == sids[k] ) { 1584 ndel = ber_bvcmp( &se->se_csn, &ctxcsn[k] ); 1585 break; 1586 } 1587 } 1588 if ( ndel > 0 ) { 1589 Debug( LDAP_DEBUG_SYNC, "cmp %d, too new\n", ndel, 0, 0 ); 1590 break; 1591 } 1592 if ( se->se_tag == LDAP_REQ_DELETE ) { 1593 j = i; 1594 i++; 1595 AC_MEMCPY( cbuf, se->se_csn.bv_val, se->se_csn.bv_len ); 1596 delcsn[0].bv_len = se->se_csn.bv_len; 1597 delcsn[0].bv_val[delcsn[0].bv_len] = '\0'; 1598 } else { 1599 nmods++; 1600 j = num - nmods; 1601 } 1602 uuids[j].bv_val = uuids[0].bv_val + (j * UUID_LEN); 1603 AC_MEMCPY(uuids[j].bv_val, se->se_uuid.bv_val, UUID_LEN); 1604 uuids[j].bv_len = UUID_LEN; 1605 } 1606 ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); 1607 1608 ndel = i; 1609 1610 /* Zero out unused slots */ 1611 for ( i=ndel; i < num - nmods; i++ ) 1612 uuids[i].bv_len = 0; 1613 1614 /* Mods must be validated to see if they belong in this delete set. 1615 */ 1616 1617 mmods = nmods; 1618 /* Strip any duplicates */ 1619 for ( i=0; i<nmods; i++ ) { 1620 for ( j=0; j<ndel; j++ ) { 1621 if ( bvmatch( &uuids[j], &uuids[num - 1 - i] )) { 1622 uuids[num - 1 - i].bv_len = 0; 1623 mmods --; 1624 break; 1625 } 1626 } 1627 if ( uuids[num - 1 - i].bv_len == 0 ) continue; 1628 for ( j=0; j<i; j++ ) { 1629 if ( bvmatch( &uuids[num - 1 - j], &uuids[num - 1 - i] )) { 1630 uuids[num - 1 - i].bv_len = 0; 1631 mmods --; 1632 break; 1633 } 1634 } 1635 } 1636 1637 if ( mmods ) { 1638 Operation fop; 1639 SlapReply frs = { REP_RESULT }; 1640 int rc; 1641 Filter mf, af; 1642 AttributeAssertion eq = ATTRIBUTEASSERTION_INIT; 1643 slap_callback cb = {0}; 1644 1645 fop = *op; 1646 1647 fop.o_sync_mode = 0; 1648 fop.o_callback = &cb; 1649 fop.ors_limit = NULL; 1650 fop.ors_tlimit = SLAP_NO_LIMIT; 1651 fop.ors_attrs = slap_anlist_all_attributes; 1652 fop.ors_attrsonly = 0; 1653 fop.o_managedsait = SLAP_CONTROL_CRITICAL; 1654 1655 af.f_choice = LDAP_FILTER_AND; 1656 af.f_next = NULL; 1657 af.f_and = &mf; 1658 mf.f_choice = LDAP_FILTER_EQUALITY; 1659 mf.f_ava = &eq; 1660 mf.f_av_desc = slap_schema.si_ad_entryUUID; 1661 mf.f_next = fop.ors_filter; 1662 1663 fop.ors_filter = ⁡ 1664 1665 cb.sc_response = playlog_cb; 1666 fop.o_bd->bd_info = (BackendInfo *)on->on_info; 1667 1668 for ( i=ndel; i<num; i++ ) { 1669 if ( uuids[i].bv_len == 0 ) continue; 1670 1671 mf.f_av_value = uuids[i]; 1672 cb.sc_private = NULL; 1673 fop.ors_slimit = 1; 1674 frs.sr_nentries = 0; 1675 rc = fop.o_bd->be_search( &fop, &frs ); 1676 1677 /* If entry was not found, add to delete list */ 1678 if ( !cb.sc_private ) { 1679 uuids[ndel++] = uuids[i]; 1680 } 1681 } 1682 fop.o_bd->bd_info = (BackendInfo *)on; 1683 } 1684 if ( ndel ) { 1685 struct berval cookie; 1686 1687 if ( delcsn[0].bv_len ) { 1688 slap_compose_sync_cookie( op, &cookie, delcsn, srs->sr_state.rid, 1689 slap_serverID ? slap_serverID : -1 ); 1690 1691 Debug( LDAP_DEBUG_SYNC, "syncprov_playlog: cookie=%s\n", cookie.bv_val, 0, 0 ); 1692 } 1693 1694 uuids[ndel].bv_val = NULL; 1695 syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, 1696 delcsn[0].bv_len ? &cookie : NULL, 0, uuids, 1 ); 1697 if ( delcsn[0].bv_len ) { 1698 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); 1699 } 1700 } 1701 op->o_tmpfree( uuids, op->o_tmpmemctx ); 1702} 1703 1704static int 1705syncprov_op_response( Operation *op, SlapReply *rs ) 1706{ 1707 opcookie *opc = op->o_callback->sc_private; 1708 slap_overinst *on = opc->son; 1709 syncprov_info_t *si = on->on_bi.bi_private; 1710 syncmatches *sm; 1711 1712 if ( rs->sr_err == LDAP_SUCCESS ) 1713 { 1714 struct berval maxcsn; 1715 char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE]; 1716 int do_check = 0, have_psearches, foundit, csn_changed = 0; 1717 1718 ldap_pvt_thread_mutex_lock( &si->si_resp_mutex ); 1719 1720 /* Update our context CSN */ 1721 cbuf[0] = '\0'; 1722 maxcsn.bv_val = cbuf; 1723 maxcsn.bv_len = sizeof(cbuf); 1724 ldap_pvt_thread_rdwr_wlock( &si->si_csn_rwlock ); 1725 1726 slap_get_commit_csn( op, &maxcsn, &foundit ); 1727 if ( BER_BVISEMPTY( &maxcsn ) && SLAP_GLUE_SUBORDINATE( op->o_bd )) { 1728 /* syncrepl queues the CSN values in the db where 1729 * it is configured , not where the changes are made. 1730 * So look for a value in the glue db if we didn't 1731 * find any in this db. 1732 */ 1733 BackendDB *be = op->o_bd; 1734 op->o_bd = select_backend( &be->be_nsuffix[0], 1); 1735 maxcsn.bv_val = cbuf; 1736 maxcsn.bv_len = sizeof(cbuf); 1737 slap_get_commit_csn( op, &maxcsn, &foundit ); 1738 op->o_bd = be; 1739 } 1740 if ( !BER_BVISEMPTY( &maxcsn ) ) { 1741 int i, sid; 1742#ifdef CHECK_CSN 1743 Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax; 1744 assert( !syn->ssyn_validate( syn, &maxcsn )); 1745#endif 1746 sid = slap_parse_csn_sid( &maxcsn ); 1747 for ( i=0; i<si->si_numcsns; i++ ) { 1748 if ( sid == si->si_sids[i] ) { 1749 if ( ber_bvcmp( &maxcsn, &si->si_ctxcsn[i] ) > 0 ) { 1750 ber_bvreplace( &si->si_ctxcsn[i], &maxcsn ); 1751 csn_changed = 1; 1752 } 1753 break; 1754 } 1755 } 1756 /* It's a new SID for us */ 1757 if ( i == si->si_numcsns ) { 1758 value_add_one( &si->si_ctxcsn, &maxcsn ); 1759 csn_changed = 1; 1760 si->si_numcsns++; 1761 si->si_sids = ch_realloc( si->si_sids, si->si_numcsns * 1762 sizeof(int)); 1763 si->si_sids[i] = sid; 1764 } 1765 } 1766 1767 /* Don't do any processing for consumer contextCSN updates */ 1768 if ( op->o_dont_replicate ) { 1769 if ( op->o_tag == LDAP_REQ_MODIFY && 1770 op->orm_modlist->sml_op == LDAP_MOD_REPLACE && 1771 op->orm_modlist->sml_desc == slap_schema.si_ad_contextCSN ) { 1772 /* Catch contextCSN updates from syncrepl. We have to look at 1773 * all the attribute values, as there may be more than one csn 1774 * that changed, and only one can be passed in the csn queue. 1775 */ 1776 Modifications *mod = op->orm_modlist; 1777 int i, j, sid; 1778 1779 for ( i=0; i<mod->sml_numvals; i++ ) { 1780 sid = slap_parse_csn_sid( &mod->sml_values[i] ); 1781 for ( j=0; j<si->si_numcsns; j++ ) { 1782 if ( sid == si->si_sids[j] ) { 1783 if ( ber_bvcmp( &mod->sml_values[i], &si->si_ctxcsn[j] ) > 0 ) { 1784 ber_bvreplace( &si->si_ctxcsn[j], &mod->sml_values[i] ); 1785 csn_changed = 1; 1786 } 1787 break; 1788 } 1789 } 1790 1791 if ( j == si->si_numcsns ) { 1792 value_add_one( &si->si_ctxcsn, &mod->sml_values[i] ); 1793 si->si_numcsns++; 1794 si->si_sids = ch_realloc( si->si_sids, si->si_numcsns * 1795 sizeof(int)); 1796 si->si_sids[j] = sid; 1797 csn_changed = 1; 1798 } 1799 } 1800 ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); 1801 1802 if ( csn_changed ) { 1803 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 1804 have_psearches = ( si->si_ops != NULL ); 1805 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 1806 1807 if ( have_psearches ) { 1808 for ( sm = opc->smatches; sm; sm=sm->sm_next ) { 1809 if ( sm->sm_op->s_op->o_abandon ) 1810 continue; 1811 syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_NEW_COOKIE ); 1812 } 1813 } 1814 } 1815 } else { 1816 ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); 1817 } 1818 goto leave; 1819 } 1820 1821 si->si_numops++; 1822 if ( si->si_chkops || si->si_chktime ) { 1823 /* Never checkpoint adding the context entry, 1824 * it will deadlock 1825 */ 1826 if ( op->o_tag != LDAP_REQ_ADD || 1827 !dn_match( &op->o_req_ndn, &si->si_contextdn )) { 1828 if ( si->si_chkops && si->si_numops >= si->si_chkops ) { 1829 do_check = 1; 1830 si->si_numops = 0; 1831 } 1832 if ( si->si_chktime && 1833 (op->o_time - si->si_chklast >= si->si_chktime )) { 1834 if ( si->si_chklast ) { 1835 do_check = 1; 1836 si->si_chklast = op->o_time; 1837 } else { 1838 si->si_chklast = 1; 1839 } 1840 } 1841 } 1842 } 1843 ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); 1844 1845 if ( do_check ) { 1846 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); 1847 syncprov_checkpoint( op, rs, on ); 1848 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); 1849 } 1850 1851 /* only update consumer ctx if this is a newer csn */ 1852 if ( csn_changed ) { 1853 opc->sctxcsn = maxcsn; 1854 } 1855 1856 /* Handle any persistent searches */ 1857 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 1858 have_psearches = ( si->si_ops != NULL ); 1859 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 1860 if ( have_psearches ) { 1861 switch(op->o_tag) { 1862 case LDAP_REQ_ADD: 1863 case LDAP_REQ_MODIFY: 1864 case LDAP_REQ_MODRDN: 1865 case LDAP_REQ_EXTENDED: 1866 syncprov_matchops( op, opc, 0 ); 1867 break; 1868 case LDAP_REQ_DELETE: 1869 /* for each match in opc->smatches: 1870 * send DELETE msg 1871 */ 1872 for ( sm = opc->smatches; sm; sm=sm->sm_next ) { 1873 if ( sm->sm_op->s_op->o_abandon ) 1874 continue; 1875 syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_DELETE ); 1876 } 1877 break; 1878 } 1879 } 1880 1881 /* Add any log records */ 1882 if ( si->si_logs && op->o_tag != LDAP_REQ_ADD ) { 1883 syncprov_add_slog( op ); 1884 } 1885leave: ldap_pvt_thread_mutex_unlock( &si->si_resp_mutex ); 1886 } 1887 return SLAP_CB_CONTINUE; 1888} 1889 1890/* We don't use a subentry to store the context CSN any more. 1891 * We expose the current context CSN as an operational attribute 1892 * of the suffix entry. 1893 */ 1894static int 1895syncprov_op_compare( Operation *op, SlapReply *rs ) 1896{ 1897 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 1898 syncprov_info_t *si = on->on_bi.bi_private; 1899 int rc = SLAP_CB_CONTINUE; 1900 1901 if ( dn_match( &op->o_req_ndn, &si->si_contextdn ) && 1902 op->oq_compare.rs_ava->aa_desc == slap_schema.si_ad_contextCSN ) 1903 { 1904 Entry e = {0}; 1905 Attribute a = {0}; 1906 1907 e.e_name = si->si_contextdn; 1908 e.e_nname = si->si_contextdn; 1909 e.e_attrs = &a; 1910 1911 a.a_desc = slap_schema.si_ad_contextCSN; 1912 1913 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); 1914 1915 a.a_vals = si->si_ctxcsn; 1916 a.a_nvals = a.a_vals; 1917 a.a_numvals = si->si_numcsns; 1918 1919 rs->sr_err = access_allowed( op, &e, op->oq_compare.rs_ava->aa_desc, 1920 &op->oq_compare.rs_ava->aa_value, ACL_COMPARE, NULL ); 1921 if ( ! rs->sr_err ) { 1922 rs->sr_err = LDAP_INSUFFICIENT_ACCESS; 1923 goto return_results; 1924 } 1925 1926 if ( get_assert( op ) && 1927 ( test_filter( op, &e, get_assertion( op ) ) != LDAP_COMPARE_TRUE ) ) 1928 { 1929 rs->sr_err = LDAP_ASSERTION_FAILED; 1930 goto return_results; 1931 } 1932 1933 1934 rs->sr_err = LDAP_COMPARE_FALSE; 1935 1936 if ( attr_valfind( &a, 1937 SLAP_MR_ATTRIBUTE_VALUE_NORMALIZED_MATCH | 1938 SLAP_MR_ASSERTED_VALUE_NORMALIZED_MATCH, 1939 &op->oq_compare.rs_ava->aa_value, NULL, op->o_tmpmemctx ) == 0 ) 1940 { 1941 rs->sr_err = LDAP_COMPARE_TRUE; 1942 } 1943 1944return_results:; 1945 1946 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); 1947 1948 send_ldap_result( op, rs ); 1949 1950 if( rs->sr_err == LDAP_COMPARE_FALSE || rs->sr_err == LDAP_COMPARE_TRUE ) { 1951 rs->sr_err = LDAP_SUCCESS; 1952 } 1953 rc = rs->sr_err; 1954 } 1955 1956 return rc; 1957} 1958 1959static int 1960syncprov_op_mod( Operation *op, SlapReply *rs ) 1961{ 1962 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 1963 syncprov_info_t *si = on->on_bi.bi_private; 1964 slap_callback *cb; 1965 opcookie *opc; 1966 int have_psearches, cbsize; 1967 1968 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 1969 have_psearches = ( si->si_ops != NULL ); 1970 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 1971 1972 cbsize = sizeof(slap_callback) + sizeof(opcookie) + 1973 (have_psearches ? sizeof(modinst) : 0 ); 1974 1975 cb = op->o_tmpcalloc(1, cbsize, op->o_tmpmemctx); 1976 opc = (opcookie *)(cb+1); 1977 opc->son = on; 1978 cb->sc_response = syncprov_op_response; 1979 cb->sc_cleanup = syncprov_op_cleanup; 1980 cb->sc_private = opc; 1981 cb->sc_next = op->o_callback; 1982 op->o_callback = cb; 1983 1984 opc->osid = -1; 1985 opc->rsid = -1; 1986 if ( op->o_csn.bv_val ) { 1987 opc->osid = slap_parse_csn_sid( &op->o_csn ); 1988 } 1989 if ( op->o_controls ) { 1990 struct sync_cookie *scook = 1991 op->o_controls[slap_cids.sc_LDAPsync]; 1992 if ( scook ) 1993 opc->rsid = scook->sid; 1994 } 1995 1996 /* If there are active persistent searches, lock this operation. 1997 * See seqmod.c for the locking logic on its own. 1998 */ 1999 if ( have_psearches ) { 2000 modtarget *mt, mtdummy; 2001 modinst *mi; 2002 2003 mi = (modinst *)(opc+1); 2004 mi->mi_op = op; 2005 2006 /* See if we're already modifying this entry... */ 2007 mtdummy.mt_op = op; 2008 ldap_pvt_thread_mutex_lock( &si->si_mods_mutex ); 2009 mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp ); 2010 if ( mt ) { 2011 ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); 2012 if ( mt->mt_mods == NULL ) { 2013 /* Cannot reuse this mt, as another thread is about 2014 * to release it in syncprov_op_cleanup. 2015 */ 2016 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 2017 mt = NULL; 2018 } 2019 } 2020 if ( mt ) { 2021 ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); 2022 mt->mt_tail->mi_next = mi; 2023 mt->mt_tail = mi; 2024 /* wait for this op to get to head of list */ 2025 while ( mt->mt_mods != mi ) { 2026 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 2027 /* FIXME: if dynamic config can delete overlays or 2028 * databases we'll have to check for cleanup here. 2029 * Currently it's not an issue because there are 2030 * no dynamic config deletes... 2031 */ 2032 if ( slapd_shutdown ) 2033 return SLAPD_ABANDON; 2034 2035 if ( !ldap_pvt_thread_pool_pausecheck( &connection_pool )) 2036 ldap_pvt_thread_yield(); 2037 ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); 2038 2039 /* clean up if the caller is giving up */ 2040 if ( op->o_abandon ) { 2041 modinst *m2; 2042 for ( m2 = mt->mt_mods; m2->mi_next != mi; 2043 m2 = m2->mi_next ); 2044 m2->mi_next = mi->mi_next; 2045 if ( mt->mt_tail == mi ) mt->mt_tail = m2; 2046 op->o_tmpfree( cb, op->o_tmpmemctx ); 2047 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 2048 return SLAPD_ABANDON; 2049 } 2050 } 2051 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 2052 } else { 2053 /* Record that we're modifying this entry now */ 2054 mt = ch_malloc( sizeof(modtarget) ); 2055 mt->mt_mods = mi; 2056 mt->mt_tail = mi; 2057 mt->mt_op = mi->mi_op; 2058 ldap_pvt_thread_mutex_init( &mt->mt_mutex ); 2059 avl_insert( &si->si_mods, mt, sp_avl_cmp, avl_dup_error ); 2060 ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); 2061 } 2062 opc->smt = mt; 2063 } 2064 2065 if (( have_psearches || si->si_logs ) && op->o_tag != LDAP_REQ_ADD ) 2066 syncprov_matchops( op, opc, 1 ); 2067 2068 return SLAP_CB_CONTINUE; 2069} 2070 2071static int 2072syncprov_op_extended( Operation *op, SlapReply *rs ) 2073{ 2074 if ( exop_is_write( op )) 2075 return syncprov_op_mod( op, rs ); 2076 2077 return SLAP_CB_CONTINUE; 2078} 2079 2080typedef struct searchstate { 2081 slap_overinst *ss_on; 2082 syncops *ss_so; 2083 BerVarray ss_ctxcsn; 2084 int *ss_sids; 2085 int ss_numcsns; 2086#define SS_PRESENT 0x01 2087#define SS_CHANGED 0x02 2088 int ss_flags; 2089} searchstate; 2090 2091static int 2092syncprov_search_cleanup( Operation *op, SlapReply *rs ) 2093{ 2094 if ( rs->sr_ctrls ) { 2095 op->o_tmpfree( rs->sr_ctrls[0], op->o_tmpmemctx ); 2096 op->o_tmpfree( rs->sr_ctrls, op->o_tmpmemctx ); 2097 rs->sr_ctrls = NULL; 2098 } 2099 return 0; 2100} 2101 2102typedef struct SyncOperationBuffer { 2103 Operation sob_op; 2104 Opheader sob_hdr; 2105 OpExtra sob_oe; 2106 AttributeName sob_extra; /* not always present */ 2107 /* Further data allocated here */ 2108} SyncOperationBuffer; 2109 2110static void 2111syncprov_detach_op( Operation *op, syncops *so, slap_overinst *on ) 2112{ 2113 SyncOperationBuffer *sopbuf2; 2114 Operation *op2; 2115 int i, alen = 0; 2116 size_t size; 2117 char *ptr; 2118 GroupAssertion *g1, *g2; 2119 2120 /* count the search attrs */ 2121 for (i=0; op->ors_attrs && !BER_BVISNULL( &op->ors_attrs[i].an_name ); i++) { 2122 alen += op->ors_attrs[i].an_name.bv_len + 1; 2123 } 2124 /* Make a new copy of the operation */ 2125 size = offsetof( SyncOperationBuffer, sob_extra ) + 2126 (i ? ( (i+1) * sizeof(AttributeName) + alen) : 0) + 2127 op->o_req_dn.bv_len + 1 + 2128 op->o_req_ndn.bv_len + 1 + 2129 op->o_ndn.bv_len + 1 + 2130 so->s_filterstr.bv_len + 1; 2131 sopbuf2 = ch_calloc( 1, size ); 2132 op2 = &sopbuf2->sob_op; 2133 op2->o_hdr = &sopbuf2->sob_hdr; 2134 LDAP_SLIST_FIRST(&op2->o_extra) = &sopbuf2->sob_oe; 2135 2136 /* Copy the fields we care about explicitly, leave the rest alone */ 2137 *op2->o_hdr = *op->o_hdr; 2138 op2->o_tag = op->o_tag; 2139 op2->o_time = op->o_time; 2140 op2->o_bd = on->on_info->oi_origdb; 2141 op2->o_request = op->o_request; 2142 op2->o_managedsait = op->o_managedsait; 2143 LDAP_SLIST_FIRST(&op2->o_extra)->oe_key = on; 2144 LDAP_SLIST_NEXT(LDAP_SLIST_FIRST(&op2->o_extra), oe_next) = NULL; 2145 2146 ptr = (char *) sopbuf2 + offsetof( SyncOperationBuffer, sob_extra ); 2147 if ( i ) { 2148 op2->ors_attrs = (AttributeName *) ptr; 2149 ptr = (char *) &op2->ors_attrs[i+1]; 2150 for (i=0; !BER_BVISNULL( &op->ors_attrs[i].an_name ); i++) { 2151 op2->ors_attrs[i] = op->ors_attrs[i]; 2152 op2->ors_attrs[i].an_name.bv_val = ptr; 2153 ptr = lutil_strcopy( ptr, op->ors_attrs[i].an_name.bv_val ) + 1; 2154 } 2155 BER_BVZERO( &op2->ors_attrs[i].an_name ); 2156 } 2157 2158 op2->o_authz = op->o_authz; 2159 op2->o_ndn.bv_val = ptr; 2160 ptr = lutil_strcopy(ptr, op->o_ndn.bv_val) + 1; 2161 op2->o_dn = op2->o_ndn; 2162 op2->o_req_dn.bv_len = op->o_req_dn.bv_len; 2163 op2->o_req_dn.bv_val = ptr; 2164 ptr = lutil_strcopy(ptr, op->o_req_dn.bv_val) + 1; 2165 op2->o_req_ndn.bv_len = op->o_req_ndn.bv_len; 2166 op2->o_req_ndn.bv_val = ptr; 2167 ptr = lutil_strcopy(ptr, op->o_req_ndn.bv_val) + 1; 2168 op2->ors_filterstr.bv_val = ptr; 2169 strcpy( ptr, so->s_filterstr.bv_val ); 2170 op2->ors_filterstr.bv_len = so->s_filterstr.bv_len; 2171 2172 /* Skip the AND/GE clause that we stuck on in front */ 2173 if ( so->s_flags & PS_FIX_FILTER ) { 2174 op2->ors_filter = op->ors_filter->f_and->f_next; 2175 so->s_flags ^= PS_FIX_FILTER; 2176 } else { 2177 op2->ors_filter = op->ors_filter; 2178 } 2179 op2->ors_filter = filter_dup( op2->ors_filter, NULL ); 2180 so->s_op = op2; 2181 2182 /* Copy any cached group ACLs individually */ 2183 op2->o_groups = NULL; 2184 for ( g1=op->o_groups; g1; g1=g1->ga_next ) { 2185 g2 = ch_malloc( sizeof(GroupAssertion) + g1->ga_len ); 2186 *g2 = *g1; 2187 strcpy( g2->ga_ndn, g1->ga_ndn ); 2188 g2->ga_next = op2->o_groups; 2189 op2->o_groups = g2; 2190 } 2191 /* Don't allow any further group caching */ 2192 op2->o_do_not_cache = 1; 2193 2194 /* Add op2 to conn so abandon will find us */ 2195 op->o_conn->c_n_ops_executing++; 2196 op->o_conn->c_n_ops_completed--; 2197 LDAP_STAILQ_INSERT_TAIL( &op->o_conn->c_ops, op2, o_next ); 2198 so->s_flags |= PS_IS_DETACHED; 2199 2200 /* Prevent anyone else from trying to send a result for this op */ 2201 op->o_abandon = 1; 2202} 2203 2204static int 2205syncprov_search_response( Operation *op, SlapReply *rs ) 2206{ 2207 searchstate *ss = op->o_callback->sc_private; 2208 slap_overinst *on = ss->ss_on; 2209 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 2210 sync_control *srs = op->o_controls[slap_cids.sc_LDAPsync]; 2211 2212 if ( rs->sr_type == REP_SEARCH || rs->sr_type == REP_SEARCHREF ) { 2213 Attribute *a; 2214 /* If we got a referral without a referral object, there's 2215 * something missing that we cannot replicate. Just ignore it. 2216 * The consumer will abort because we didn't send the expected 2217 * control. 2218 */ 2219 if ( !rs->sr_entry ) { 2220 assert( rs->sr_entry != NULL ); 2221 Debug( LDAP_DEBUG_ANY, "bogus referral in context\n",0,0,0 ); 2222 return SLAP_CB_CONTINUE; 2223 } 2224 a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryCSN ); 2225 if ( a == NULL && rs->sr_operational_attrs != NULL ) { 2226 a = attr_find( rs->sr_operational_attrs, slap_schema.si_ad_entryCSN ); 2227 } 2228 if ( a ) { 2229 int i, sid; 2230 sid = slap_parse_csn_sid( &a->a_nvals[0] ); 2231 2232 /* Don't send changed entries back to the originator */ 2233 if ( sid == srs->sr_state.sid && srs->sr_state.numcsns ) { 2234 Debug( LDAP_DEBUG_SYNC, 2235 "Entry %s changed by peer, ignored\n", 2236 rs->sr_entry->e_name.bv_val, 0, 0 ); 2237 return LDAP_SUCCESS; 2238 } 2239 2240 /* If not a persistent search */ 2241 if ( !ss->ss_so ) { 2242 /* Make sure entry is less than the snapshot'd contextCSN */ 2243 for ( i=0; i<ss->ss_numcsns; i++ ) { 2244 if ( sid == ss->ss_sids[i] && ber_bvcmp( &a->a_nvals[0], 2245 &ss->ss_ctxcsn[i] ) > 0 ) { 2246 Debug( LDAP_DEBUG_SYNC, 2247 "Entry %s CSN %s greater than snapshot %s\n", 2248 rs->sr_entry->e_name.bv_val, 2249 a->a_nvals[0].bv_val, 2250 ss->ss_ctxcsn[i].bv_val ); 2251 return LDAP_SUCCESS; 2252 } 2253 } 2254 } 2255 2256 /* Don't send old entries twice */ 2257 if ( srs->sr_state.ctxcsn ) { 2258 for ( i=0; i<srs->sr_state.numcsns; i++ ) { 2259 if ( sid == srs->sr_state.sids[i] && 2260 ber_bvcmp( &a->a_nvals[0], 2261 &srs->sr_state.ctxcsn[i] )<= 0 ) { 2262 Debug( LDAP_DEBUG_SYNC, 2263 "Entry %s CSN %s older or equal to ctx %s\n", 2264 rs->sr_entry->e_name.bv_val, 2265 a->a_nvals[0].bv_val, 2266 srs->sr_state.ctxcsn[i].bv_val ); 2267 return LDAP_SUCCESS; 2268 } 2269 } 2270 } 2271 } 2272 rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2, 2273 op->o_tmpmemctx ); 2274 rs->sr_ctrls[1] = NULL; 2275 /* If we're in delta-sync mode, always send a cookie */ 2276 if ( si->si_nopres && si->si_usehint && a ) { 2277 struct berval cookie; 2278 slap_compose_sync_cookie( op, &cookie, a->a_nvals, srs->sr_state.rid, slap_serverID ? slap_serverID : -1 ); 2279 rs->sr_err = syncprov_state_ctrl( op, rs, rs->sr_entry, 2280 LDAP_SYNC_ADD, rs->sr_ctrls, 0, 1, &cookie ); 2281 } else { 2282 rs->sr_err = syncprov_state_ctrl( op, rs, rs->sr_entry, 2283 LDAP_SYNC_ADD, rs->sr_ctrls, 0, 0, NULL ); 2284 } 2285 } else if ( rs->sr_type == REP_RESULT && rs->sr_err == LDAP_SUCCESS ) { 2286 struct berval cookie = BER_BVNULL; 2287 2288 if ( ( ss->ss_flags & SS_CHANGED ) && 2289 ss->ss_ctxcsn && !BER_BVISNULL( &ss->ss_ctxcsn[0] )) { 2290 slap_compose_sync_cookie( op, &cookie, ss->ss_ctxcsn, 2291 srs->sr_state.rid, slap_serverID ? slap_serverID : -1 ); 2292 2293 Debug( LDAP_DEBUG_SYNC, "syncprov_search_response: cookie=%s\n", cookie.bv_val, 0, 0 ); 2294 } 2295 2296 /* Is this a regular refresh? 2297 * Note: refresh never gets here if there were no changes 2298 */ 2299 if ( !ss->ss_so ) { 2300 rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2, 2301 op->o_tmpmemctx ); 2302 rs->sr_ctrls[1] = NULL; 2303 rs->sr_err = syncprov_done_ctrl( op, rs, rs->sr_ctrls, 2304 0, 1, &cookie, ( ss->ss_flags & SS_PRESENT ) ? LDAP_SYNC_REFRESH_PRESENTS : 2305 LDAP_SYNC_REFRESH_DELETES ); 2306 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); 2307 } else { 2308 /* It's RefreshAndPersist, transition to Persist phase */ 2309 syncprov_sendinfo( op, rs, ( ss->ss_flags & SS_PRESENT ) ? 2310 LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE, 2311 ( ss->ss_flags & SS_CHANGED ) ? &cookie : NULL, 2312 1, NULL, 0 ); 2313 if ( !BER_BVISNULL( &cookie )) 2314 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); 2315 2316 /* Detach this Op from frontend control */ 2317 ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex ); 2318 2319 /* But not if this connection was closed along the way */ 2320 if ( op->o_abandon ) { 2321 ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex ); 2322 /* syncprov_ab_cleanup will free this syncop */ 2323 return SLAPD_ABANDON; 2324 2325 } else { 2326 ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex ); 2327 /* Turn off the refreshing flag */ 2328 ss->ss_so->s_flags ^= PS_IS_REFRESHING; 2329 2330 syncprov_detach_op( op, ss->ss_so, on ); 2331 2332 ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex ); 2333 2334 /* If there are queued responses, fire them off */ 2335 if ( ss->ss_so->s_res ) 2336 syncprov_qstart( ss->ss_so ); 2337 ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex ); 2338 } 2339 2340 return LDAP_SUCCESS; 2341 } 2342 } 2343 2344 return SLAP_CB_CONTINUE; 2345} 2346 2347static int 2348syncprov_op_search( Operation *op, SlapReply *rs ) 2349{ 2350 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 2351 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 2352 slap_callback *cb; 2353 int gotstate = 0, changed = 0, do_present = 0; 2354 syncops *sop = NULL; 2355 searchstate *ss; 2356 sync_control *srs; 2357 BerVarray ctxcsn; 2358 int i, *sids, numcsns; 2359 struct berval mincsn; 2360 2361 if ( !(op->o_sync_mode & SLAP_SYNC_REFRESH) ) return SLAP_CB_CONTINUE; 2362 2363 if ( op->ors_deref & LDAP_DEREF_SEARCHING ) { 2364 send_ldap_error( op, rs, LDAP_PROTOCOL_ERROR, "illegal value for derefAliases" ); 2365 return rs->sr_err; 2366 } 2367 2368 srs = op->o_controls[slap_cids.sc_LDAPsync]; 2369 2370 /* If this is a persistent search, set it up right away */ 2371 if ( op->o_sync_mode & SLAP_SYNC_PERSIST ) { 2372 syncops so = {0}; 2373 fbase_cookie fc; 2374 opcookie opc; 2375 slap_callback sc; 2376 2377 fc.fss = &so; 2378 fc.fbase = 0; 2379 so.s_eid = NOID; 2380 so.s_op = op; 2381 so.s_flags = PS_IS_REFRESHING | PS_FIND_BASE; 2382 /* syncprov_findbase expects to be called as a callback... */ 2383 sc.sc_private = &opc; 2384 opc.son = on; 2385 ldap_pvt_thread_mutex_init( &so.s_mutex ); 2386 cb = op->o_callback; 2387 op->o_callback = ≻ 2388 rs->sr_err = syncprov_findbase( op, &fc ); 2389 op->o_callback = cb; 2390 ldap_pvt_thread_mutex_destroy( &so.s_mutex ); 2391 2392 if ( rs->sr_err != LDAP_SUCCESS ) { 2393 send_ldap_result( op, rs ); 2394 return rs->sr_err; 2395 } 2396 sop = ch_malloc( sizeof( syncops )); 2397 *sop = so; 2398 ldap_pvt_thread_mutex_init( &sop->s_mutex ); 2399 sop->s_rid = srs->sr_state.rid; 2400 sop->s_sid = srs->sr_state.sid; 2401 sop->s_inuse = 1; 2402 2403 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 2404 sop->s_next = si->si_ops; 2405 si->si_ops = sop; 2406 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 2407 } 2408 2409 /* snapshot the ctxcsn */ 2410 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); 2411 numcsns = si->si_numcsns; 2412 if ( numcsns ) { 2413 ber_bvarray_dup_x( &ctxcsn, si->si_ctxcsn, op->o_tmpmemctx ); 2414 sids = op->o_tmpalloc( numcsns * sizeof(int), op->o_tmpmemctx ); 2415 for ( i=0; i<numcsns; i++ ) 2416 sids[i] = si->si_sids[i]; 2417 } else { 2418 ctxcsn = NULL; 2419 sids = NULL; 2420 } 2421 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); 2422 2423 /* If we have a cookie, handle the PRESENT lookups */ 2424 if ( srs->sr_state.ctxcsn ) { 2425 sessionlog *sl; 2426 int i, j; 2427 2428 /* If we don't have any CSN of our own yet, pretend nothing 2429 * has changed. 2430 */ 2431 if ( !numcsns ) 2432 goto no_change; 2433 2434 if ( !si->si_nopres ) 2435 do_present = SS_PRESENT; 2436 2437 /* If there are SIDs we don't recognize in the cookie, drop them */ 2438 for (i=0; i<srs->sr_state.numcsns; ) { 2439 for (j=0; j<numcsns; j++) { 2440 if ( srs->sr_state.sids[i] == sids[j] ) { 2441 break; 2442 } 2443 } 2444 /* not found */ 2445 if ( j == numcsns ) { 2446 struct berval tmp = srs->sr_state.ctxcsn[i]; 2447 j = srs->sr_state.numcsns - 1; 2448 srs->sr_state.ctxcsn[i] = srs->sr_state.ctxcsn[j]; 2449 tmp.bv_len = 0; 2450 srs->sr_state.ctxcsn[j] = tmp; 2451 srs->sr_state.numcsns = j; 2452 srs->sr_state.sids[i] = srs->sr_state.sids[j]; 2453 continue; 2454 } 2455 i++; 2456 } 2457 2458 /* Find the smallest CSN */ 2459 mincsn = srs->sr_state.ctxcsn[0]; 2460 for ( i=1; i<srs->sr_state.numcsns; i++ ) { 2461 if ( ber_bvcmp( &mincsn, &srs->sr_state.ctxcsn[i] ) > 0 ) 2462 mincsn = srs->sr_state.ctxcsn[i]; 2463 } 2464 2465 /* If nothing has changed, shortcut it */ 2466 if ( srs->sr_state.numcsns == numcsns ) { 2467 int i, j, newer; 2468 for ( i=0; i<srs->sr_state.numcsns; i++ ) { 2469 for ( j=0; j<numcsns; j++ ) { 2470 if ( srs->sr_state.sids[i] != sids[j] ) 2471 continue; 2472 newer = ber_bvcmp( &srs->sr_state.ctxcsn[i], &ctxcsn[j] ); 2473 /* If our state is newer, tell consumer about changes */ 2474 if ( newer < 0 ) 2475 changed = SS_CHANGED; 2476 else if ( newer > 0 ) { 2477 /* our state is older, tell consumer nothing */ 2478 rs->sr_err = LDAP_SUCCESS; 2479bailout: 2480 if ( sop ) { 2481 syncops **sp = &si->si_ops; 2482 2483 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 2484 while ( *sp != sop ) 2485 sp = &(*sp)->s_next; 2486 *sp = sop->s_next; 2487 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 2488 ch_free( sop ); 2489 } 2490 rs->sr_ctrls = NULL; 2491 send_ldap_result( op, rs ); 2492 return rs->sr_err; 2493 } 2494 break; 2495 } 2496 if ( changed ) 2497 break; 2498 } 2499 if ( !changed ) { 2500 do_present = 0; 2501no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { 2502 LDAPControl *ctrls[2]; 2503 2504 ctrls[0] = NULL; 2505 ctrls[1] = NULL; 2506 syncprov_done_ctrl( op, rs, ctrls, 0, 0, 2507 NULL, LDAP_SYNC_REFRESH_DELETES ); 2508 rs->sr_ctrls = ctrls; 2509 rs->sr_err = LDAP_SUCCESS; 2510 send_ldap_result( op, rs ); 2511 rs->sr_ctrls = NULL; 2512 return rs->sr_err; 2513 } 2514 goto shortcut; 2515 } 2516 } else { 2517 /* consumer doesn't have the right number of CSNs */ 2518 changed = SS_CHANGED; 2519 } 2520 /* Do we have a sessionlog for this search? */ 2521 sl=si->si_logs; 2522 if ( sl ) { 2523 ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); 2524 /* Are there any log entries, and is the consumer state 2525 * present in the session log? 2526 */ 2527 if ( sl->sl_num > 0 && ber_bvcmp( &mincsn, &sl->sl_mincsn ) >= 0 ) { 2528 do_present = 0; 2529 /* mutex is unlocked in playlog */ 2530 syncprov_playlog( op, rs, sl, srs, ctxcsn, numcsns, sids ); 2531 } else { 2532 ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); 2533 } 2534 } 2535 /* Is the CSN still present in the database? */ 2536 if ( syncprov_findcsn( op, FIND_CSN ) != LDAP_SUCCESS ) { 2537 /* No, so a reload is required */ 2538 /* the 2.2 consumer doesn't send this hint */ 2539 if ( si->si_usehint && srs->sr_rhint == 0 ) { 2540 if ( ctxcsn ) 2541 ber_bvarray_free_x( ctxcsn, op->o_tmpmemctx ); 2542 if ( sids ) 2543 op->o_tmpfree( sids, op->o_tmpmemctx ); 2544 rs->sr_err = LDAP_SYNC_REFRESH_REQUIRED; 2545 rs->sr_text = "sync cookie is stale"; 2546 goto bailout; 2547 } 2548 if ( srs->sr_state.ctxcsn ) { 2549 ber_bvarray_free_x( srs->sr_state.ctxcsn, op->o_tmpmemctx ); 2550 srs->sr_state.ctxcsn = NULL; 2551 } 2552 if ( srs->sr_state.sids ) { 2553 slap_sl_free( srs->sr_state.sids, op->o_tmpmemctx ); 2554 srs->sr_state.sids = NULL; 2555 } 2556 srs->sr_state.numcsns = 0; 2557 } else { 2558 gotstate = 1; 2559 /* If changed and doing Present lookup, send Present UUIDs */ 2560 if ( do_present && syncprov_findcsn( op, FIND_PRESENT ) != 2561 LDAP_SUCCESS ) { 2562 if ( ctxcsn ) 2563 ber_bvarray_free_x( ctxcsn, op->o_tmpmemctx ); 2564 if ( sids ) 2565 op->o_tmpfree( sids, op->o_tmpmemctx ); 2566 goto bailout; 2567 } 2568 } 2569 } else { 2570 /* No consumer state, assume something has changed */ 2571 changed = SS_CHANGED; 2572 } 2573 2574shortcut: 2575 /* Append CSN range to search filter, save original filter 2576 * for persistent search evaluation 2577 */ 2578 if ( sop ) { 2579 sop->s_filterstr= op->ors_filterstr; 2580 } 2581 2582 /* If something changed, find the changes */ 2583 if ( gotstate && changed ) { 2584 Filter *fand, *fava; 2585 2586 fand = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx ); 2587 fand->f_choice = LDAP_FILTER_AND; 2588 fand->f_next = NULL; 2589 fava = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx ); 2590 fand->f_and = fava; 2591 fava->f_choice = LDAP_FILTER_GE; 2592 fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx ); 2593 fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN; 2594#ifdef LDAP_COMP_MATCH 2595 fava->f_ava->aa_cf = NULL; 2596#endif 2597 ber_dupbv_x( &fava->f_ava->aa_value, &mincsn, op->o_tmpmemctx ); 2598 fava->f_next = op->ors_filter; 2599 op->ors_filter = fand; 2600 filter2bv_x( op, op->ors_filter, &op->ors_filterstr ); 2601 if ( sop ) 2602 sop->s_flags |= PS_FIX_FILTER; 2603 } 2604 2605 /* Let our callback add needed info to returned entries */ 2606 cb = op->o_tmpcalloc(1, sizeof(slap_callback)+sizeof(searchstate), op->o_tmpmemctx); 2607 ss = (searchstate *)(cb+1); 2608 ss->ss_on = on; 2609 ss->ss_so = sop; 2610 ss->ss_flags = do_present | changed; 2611 ss->ss_ctxcsn = ctxcsn; 2612 ss->ss_numcsns = numcsns; 2613 ss->ss_sids = sids; 2614 cb->sc_response = syncprov_search_response; 2615 cb->sc_cleanup = syncprov_search_cleanup; 2616 cb->sc_private = ss; 2617 cb->sc_next = op->o_callback; 2618 op->o_callback = cb; 2619 2620 /* If this is a persistent search and no changes were reported during 2621 * the refresh phase, just invoke the response callback to transition 2622 * us into persist phase 2623 */ 2624 if ( !changed ) { 2625 rs->sr_err = LDAP_SUCCESS; 2626 rs->sr_nentries = 0; 2627 send_ldap_result( op, rs ); 2628 return rs->sr_err; 2629 } 2630 return SLAP_CB_CONTINUE; 2631} 2632 2633static int 2634syncprov_operational( 2635 Operation *op, 2636 SlapReply *rs ) 2637{ 2638 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 2639 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 2640 2641 /* This prevents generating unnecessarily; frontend will strip 2642 * any statically stored copy. 2643 */ 2644 if ( op->o_sync != SLAP_CONTROL_NONE ) 2645 return SLAP_CB_CONTINUE; 2646 2647 if ( rs->sr_entry && 2648 dn_match( &rs->sr_entry->e_nname, &si->si_contextdn )) { 2649 2650 if ( SLAP_OPATTRS( rs->sr_attr_flags ) || 2651 ad_inlist( slap_schema.si_ad_contextCSN, rs->sr_attrs )) { 2652 Attribute *a, **ap = NULL; 2653 2654 for ( a=rs->sr_entry->e_attrs; a; a=a->a_next ) { 2655 if ( a->a_desc == slap_schema.si_ad_contextCSN ) 2656 break; 2657 } 2658 2659 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); 2660 if ( si->si_ctxcsn ) { 2661 if ( !a ) { 2662 for ( ap = &rs->sr_operational_attrs; *ap; 2663 ap=&(*ap)->a_next ); 2664 2665 a = attr_alloc( slap_schema.si_ad_contextCSN ); 2666 *ap = a; 2667 } 2668 2669 if ( !ap ) { 2670 if ( !(rs->sr_flags & REP_ENTRY_MODIFIABLE) ) { 2671 Entry *e = entry_dup( rs->sr_entry ); 2672 if ( rs->sr_flags & REP_ENTRY_MUSTRELEASE ) { 2673 overlay_entry_release_ov( op, rs->sr_entry, 0, on ); 2674 rs->sr_flags ^= REP_ENTRY_MUSTRELEASE; 2675 } else if ( rs->sr_flags & REP_ENTRY_MUSTBEFREED ) { 2676 entry_free( rs->sr_entry ); 2677 } 2678 rs->sr_entry = e; 2679 rs->sr_flags |= 2680 REP_ENTRY_MODIFIABLE|REP_ENTRY_MUSTBEFREED; 2681 a = attr_find( rs->sr_entry->e_attrs, 2682 slap_schema.si_ad_contextCSN ); 2683 } 2684 if ( a->a_nvals != a->a_vals ) { 2685 ber_bvarray_free( a->a_nvals ); 2686 } 2687 a->a_nvals = NULL; 2688 ber_bvarray_free( a->a_vals ); 2689 a->a_vals = NULL; 2690 a->a_numvals = 0; 2691 } 2692 attr_valadd( a, si->si_ctxcsn, si->si_ctxcsn, si->si_numcsns ); 2693 } 2694 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); 2695 } 2696 } 2697 return SLAP_CB_CONTINUE; 2698} 2699 2700enum { 2701 SP_CHKPT = 1, 2702 SP_SESSL, 2703 SP_NOPRES, 2704 SP_USEHINT 2705}; 2706 2707static ConfigDriver sp_cf_gen; 2708 2709static ConfigTable spcfg[] = { 2710 { "syncprov-checkpoint", "ops> <minutes", 3, 3, 0, ARG_MAGIC|SP_CHKPT, 2711 sp_cf_gen, "( OLcfgOvAt:1.1 NAME 'olcSpCheckpoint' " 2712 "DESC 'ContextCSN checkpoint interval in ops and minutes' " 2713 "SYNTAX OMsDirectoryString SINGLE-VALUE )", NULL, NULL }, 2714 { "syncprov-sessionlog", "ops", 2, 2, 0, ARG_INT|ARG_MAGIC|SP_SESSL, 2715 sp_cf_gen, "( OLcfgOvAt:1.2 NAME 'olcSpSessionlog' " 2716 "DESC 'Session log size in ops' " 2717 "SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL }, 2718 { "syncprov-nopresent", NULL, 2, 2, 0, ARG_ON_OFF|ARG_MAGIC|SP_NOPRES, 2719 sp_cf_gen, "( OLcfgOvAt:1.3 NAME 'olcSpNoPresent' " 2720 "DESC 'Omit Present phase processing' " 2721 "SYNTAX OMsBoolean SINGLE-VALUE )", NULL, NULL }, 2722 { "syncprov-reloadhint", NULL, 2, 2, 0, ARG_ON_OFF|ARG_MAGIC|SP_USEHINT, 2723 sp_cf_gen, "( OLcfgOvAt:1.4 NAME 'olcSpReloadHint' " 2724 "DESC 'Observe Reload Hint in Request control' " 2725 "SYNTAX OMsBoolean SINGLE-VALUE )", NULL, NULL }, 2726 { NULL, NULL, 0, 0, 0, ARG_IGNORED } 2727}; 2728 2729static ConfigOCs spocs[] = { 2730 { "( OLcfgOvOc:1.1 " 2731 "NAME 'olcSyncProvConfig' " 2732 "DESC 'SyncRepl Provider configuration' " 2733 "SUP olcOverlayConfig " 2734 "MAY ( olcSpCheckpoint " 2735 "$ olcSpSessionlog " 2736 "$ olcSpNoPresent " 2737 "$ olcSpReloadHint " 2738 ") )", 2739 Cft_Overlay, spcfg }, 2740 { NULL, 0, NULL } 2741}; 2742 2743static int 2744sp_cf_gen(ConfigArgs *c) 2745{ 2746 slap_overinst *on = (slap_overinst *)c->bi; 2747 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 2748 int rc = 0; 2749 2750 if ( c->op == SLAP_CONFIG_EMIT ) { 2751 switch ( c->type ) { 2752 case SP_CHKPT: 2753 if ( si->si_chkops || si->si_chktime ) { 2754 struct berval bv; 2755 /* we assume si_chktime is a multiple of 60 2756 * because the parsed value was originally 2757 * multiplied by 60 */ 2758 bv.bv_len = snprintf( c->cr_msg, sizeof( c->cr_msg ), 2759 "%d %d", si->si_chkops, si->si_chktime/60 ); 2760 if ( bv.bv_len >= sizeof( c->cr_msg ) ) { 2761 rc = 1; 2762 } else { 2763 bv.bv_val = c->cr_msg; 2764 value_add_one( &c->rvalue_vals, &bv ); 2765 } 2766 } else { 2767 rc = 1; 2768 } 2769 break; 2770 case SP_SESSL: 2771 if ( si->si_logs ) { 2772 c->value_int = si->si_logs->sl_size; 2773 } else { 2774 rc = 1; 2775 } 2776 break; 2777 case SP_NOPRES: 2778 if ( si->si_nopres ) { 2779 c->value_int = 1; 2780 } else { 2781 rc = 1; 2782 } 2783 break; 2784 case SP_USEHINT: 2785 if ( si->si_usehint ) { 2786 c->value_int = 1; 2787 } else { 2788 rc = 1; 2789 } 2790 break; 2791 } 2792 return rc; 2793 } else if ( c->op == LDAP_MOD_DELETE ) { 2794 switch ( c->type ) { 2795 case SP_CHKPT: 2796 si->si_chkops = 0; 2797 si->si_chktime = 0; 2798 break; 2799 case SP_SESSL: 2800 if ( si->si_logs ) 2801 si->si_logs->sl_size = 0; 2802 else 2803 rc = LDAP_NO_SUCH_ATTRIBUTE; 2804 break; 2805 case SP_NOPRES: 2806 if ( si->si_nopres ) 2807 si->si_nopres = 0; 2808 else 2809 rc = LDAP_NO_SUCH_ATTRIBUTE; 2810 break; 2811 case SP_USEHINT: 2812 if ( si->si_usehint ) 2813 si->si_usehint = 0; 2814 else 2815 rc = LDAP_NO_SUCH_ATTRIBUTE; 2816 break; 2817 } 2818 return rc; 2819 } 2820 switch ( c->type ) { 2821 case SP_CHKPT: 2822 if ( lutil_atoi( &si->si_chkops, c->argv[1] ) != 0 ) { 2823 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s unable to parse checkpoint ops # \"%s\"", 2824 c->argv[0], c->argv[1] ); 2825 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 2826 "%s: %s\n", c->log, c->cr_msg, 0 ); 2827 return ARG_BAD_CONF; 2828 } 2829 if ( si->si_chkops <= 0 ) { 2830 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s invalid checkpoint ops # \"%d\"", 2831 c->argv[0], si->si_chkops ); 2832 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 2833 "%s: %s\n", c->log, c->cr_msg, 0 ); 2834 return ARG_BAD_CONF; 2835 } 2836 if ( lutil_atoi( &si->si_chktime, c->argv[2] ) != 0 ) { 2837 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s unable to parse checkpoint time \"%s\"", 2838 c->argv[0], c->argv[1] ); 2839 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 2840 "%s: %s\n", c->log, c->cr_msg, 0 ); 2841 return ARG_BAD_CONF; 2842 } 2843 if ( si->si_chktime <= 0 ) { 2844 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s invalid checkpoint time \"%d\"", 2845 c->argv[0], si->si_chkops ); 2846 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 2847 "%s: %s\n", c->log, c->cr_msg, 0 ); 2848 return ARG_BAD_CONF; 2849 } 2850 si->si_chktime *= 60; 2851 break; 2852 case SP_SESSL: { 2853 sessionlog *sl; 2854 int size = c->value_int; 2855 2856 if ( size < 0 ) { 2857 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s size %d is negative", 2858 c->argv[0], size ); 2859 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 2860 "%s: %s\n", c->log, c->cr_msg, 0 ); 2861 return ARG_BAD_CONF; 2862 } 2863 sl = si->si_logs; 2864 if ( !sl ) { 2865 sl = ch_malloc( sizeof( sessionlog ) + LDAP_LUTIL_CSNSTR_BUFSIZE ); 2866 sl->sl_mincsn.bv_val = (char *)(sl+1); 2867 sl->sl_mincsn.bv_len = 0; 2868 sl->sl_num = 0; 2869 sl->sl_head = sl->sl_tail = NULL; 2870 ldap_pvt_thread_mutex_init( &sl->sl_mutex ); 2871 si->si_logs = sl; 2872 } 2873 sl->sl_size = size; 2874 } 2875 break; 2876 case SP_NOPRES: 2877 si->si_nopres = c->value_int; 2878 break; 2879 case SP_USEHINT: 2880 si->si_usehint = c->value_int; 2881 break; 2882 } 2883 return rc; 2884} 2885 2886/* ITS#3456 we cannot run this search on the main thread, must use a 2887 * child thread in order to insure we have a big enough stack. 2888 */ 2889static void * 2890syncprov_db_otask( 2891 void *ptr 2892) 2893{ 2894 syncprov_findcsn( ptr, FIND_MAXCSN ); 2895 return NULL; 2896} 2897 2898/* Read any existing contextCSN from the underlying db. 2899 * Then search for any entries newer than that. If no value exists, 2900 * just generate it. Cache whatever result. 2901 */ 2902static int 2903syncprov_db_open( 2904 BackendDB *be, 2905 ConfigReply *cr 2906) 2907{ 2908 slap_overinst *on = (slap_overinst *) be->bd_info; 2909 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 2910 2911 Connection conn = { 0 }; 2912 OperationBuffer opbuf; 2913 Operation *op; 2914 Entry *e = NULL; 2915 Attribute *a; 2916 int rc; 2917 void *thrctx = NULL; 2918 2919 if ( !SLAP_LASTMOD( be )) { 2920 Debug( LDAP_DEBUG_ANY, 2921 "syncprov_db_open: invalid config, lastmod must be enabled\n", 0, 0, 0 ); 2922 return -1; 2923 } 2924 2925 if ( slapMode & SLAP_TOOL_MODE ) { 2926 return 0; 2927 } 2928 2929 rc = overlay_register_control( be, LDAP_CONTROL_SYNC ); 2930 if ( rc ) { 2931 return rc; 2932 } 2933 2934 thrctx = ldap_pvt_thread_pool_context(); 2935 connection_fake_init2( &conn, &opbuf, thrctx, 0 ); 2936 op = &opbuf.ob_op; 2937 op->o_bd = be; 2938 op->o_dn = be->be_rootdn; 2939 op->o_ndn = be->be_rootndn; 2940 2941 if ( SLAP_SYNC_SUBENTRY( be )) { 2942 build_new_dn( &si->si_contextdn, be->be_nsuffix, 2943 (struct berval *)&slap_ldapsync_cn_bv, NULL ); 2944 } else { 2945 si->si_contextdn = be->be_nsuffix[0]; 2946 } 2947 rc = overlay_entry_get_ov( op, &si->si_contextdn, NULL, 2948 slap_schema.si_ad_contextCSN, 0, &e, on ); 2949 2950 if ( e ) { 2951 ldap_pvt_thread_t tid; 2952 2953 a = attr_find( e->e_attrs, slap_schema.si_ad_contextCSN ); 2954 if ( a ) { 2955 ber_bvarray_dup_x( &si->si_ctxcsn, a->a_vals, NULL ); 2956 si->si_numcsns = a->a_numvals; 2957 si->si_sids = slap_parse_csn_sids( si->si_ctxcsn, a->a_numvals, NULL ); 2958 } 2959 overlay_entry_release_ov( op, e, 0, on ); 2960 if ( si->si_ctxcsn && !SLAP_DBCLEAN( be )) { 2961 op->o_req_dn = be->be_suffix[0]; 2962 op->o_req_ndn = be->be_nsuffix[0]; 2963 op->ors_scope = LDAP_SCOPE_SUBTREE; 2964 ldap_pvt_thread_create( &tid, 0, syncprov_db_otask, op ); 2965 ldap_pvt_thread_join( tid, NULL ); 2966 } 2967 } 2968 2969 /* Didn't find a contextCSN, should we generate one? */ 2970 if ( !si->si_ctxcsn ) { 2971 char csnbuf[ LDAP_LUTIL_CSNSTR_BUFSIZE ]; 2972 struct berval csn; 2973 2974 if ( SLAP_SYNC_SHADOW( op->o_bd )) { 2975 /* If we're also a consumer, then don't generate anything. 2976 * Wait for our provider to send it to us, or for a local 2977 * modify if we have multimaster. 2978 */ 2979 goto out; 2980 } 2981 csn.bv_val = csnbuf; 2982 csn.bv_len = sizeof( csnbuf ); 2983 slap_get_csn( op, &csn, 0 ); 2984 value_add_one( &si->si_ctxcsn, &csn ); 2985 si->si_numcsns = 1; 2986 si->si_sids = ch_malloc( sizeof(int) ); 2987 si->si_sids[0] = slap_serverID; 2988 2989 /* make sure we do a checkpoint on close */ 2990 si->si_numops++; 2991 } 2992 2993out: 2994 op->o_bd->bd_info = (BackendInfo *)on; 2995 return 0; 2996} 2997 2998/* Write the current contextCSN into the underlying db. 2999 */ 3000static int 3001syncprov_db_close( 3002 BackendDB *be, 3003 ConfigReply *cr 3004) 3005{ 3006 slap_overinst *on = (slap_overinst *) be->bd_info; 3007 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 3008 3009 if ( slapMode & SLAP_TOOL_MODE ) { 3010 return 0; 3011 } 3012 if ( si->si_numops ) { 3013 Connection conn = {0}; 3014 OperationBuffer opbuf; 3015 Operation *op; 3016 SlapReply rs = {REP_RESULT}; 3017 void *thrctx; 3018 3019 thrctx = ldap_pvt_thread_pool_context(); 3020 connection_fake_init2( &conn, &opbuf, thrctx, 0 ); 3021 op = &opbuf.ob_op; 3022 op->o_bd = be; 3023 op->o_dn = be->be_rootdn; 3024 op->o_ndn = be->be_rootndn; 3025 syncprov_checkpoint( op, &rs, on ); 3026 } 3027 3028 return 0; 3029} 3030 3031static int 3032syncprov_db_init( 3033 BackendDB *be, 3034 ConfigReply *cr 3035) 3036{ 3037 slap_overinst *on = (slap_overinst *)be->bd_info; 3038 syncprov_info_t *si; 3039 3040 if ( SLAP_ISGLOBALOVERLAY( be ) ) { 3041 Debug( LDAP_DEBUG_ANY, 3042 "syncprov must be instantiated within a database.\n", 3043 0, 0, 0 ); 3044 return 1; 3045 } 3046 3047 si = ch_calloc(1, sizeof(syncprov_info_t)); 3048 on->on_bi.bi_private = si; 3049 ldap_pvt_thread_rdwr_init( &si->si_csn_rwlock ); 3050 ldap_pvt_thread_mutex_init( &si->si_ops_mutex ); 3051 ldap_pvt_thread_mutex_init( &si->si_mods_mutex ); 3052 ldap_pvt_thread_mutex_init( &si->si_resp_mutex ); 3053 3054 csn_anlist[0].an_desc = slap_schema.si_ad_entryCSN; 3055 csn_anlist[0].an_name = slap_schema.si_ad_entryCSN->ad_cname; 3056 csn_anlist[1].an_desc = slap_schema.si_ad_entryUUID; 3057 csn_anlist[1].an_name = slap_schema.si_ad_entryUUID->ad_cname; 3058 3059 uuid_anlist[0].an_desc = slap_schema.si_ad_entryUUID; 3060 uuid_anlist[0].an_name = slap_schema.si_ad_entryUUID->ad_cname; 3061 3062 return 0; 3063} 3064 3065static int 3066syncprov_db_destroy( 3067 BackendDB *be, 3068 ConfigReply *cr 3069) 3070{ 3071 slap_overinst *on = (slap_overinst *)be->bd_info; 3072 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 3073 3074 if ( si ) { 3075 if ( si->si_logs ) { 3076 slog_entry *se = si->si_logs->sl_head; 3077 3078 while ( se ) { 3079 slog_entry *se_next = se->se_next; 3080 ch_free( se ); 3081 se = se_next; 3082 } 3083 3084 ch_free( si->si_logs ); 3085 } 3086 if ( si->si_ctxcsn ) 3087 ber_bvarray_free( si->si_ctxcsn ); 3088 if ( si->si_sids ) 3089 ch_free( si->si_sids ); 3090 ldap_pvt_thread_mutex_destroy( &si->si_resp_mutex ); 3091 ldap_pvt_thread_mutex_destroy( &si->si_mods_mutex ); 3092 ldap_pvt_thread_mutex_destroy( &si->si_ops_mutex ); 3093 ldap_pvt_thread_rdwr_destroy( &si->si_csn_rwlock ); 3094 ch_free( si ); 3095 } 3096 3097 return 0; 3098} 3099 3100static int syncprov_parseCtrl ( 3101 Operation *op, 3102 SlapReply *rs, 3103 LDAPControl *ctrl ) 3104{ 3105 ber_tag_t tag; 3106 BerElementBuffer berbuf; 3107 BerElement *ber = (BerElement *)&berbuf; 3108 ber_int_t mode; 3109 ber_len_t len; 3110 struct berval cookie = BER_BVNULL; 3111 sync_control *sr; 3112 int rhint = 0; 3113 3114 if ( op->o_sync != SLAP_CONTROL_NONE ) { 3115 rs->sr_text = "Sync control specified multiple times"; 3116 return LDAP_PROTOCOL_ERROR; 3117 } 3118 3119 if ( op->o_pagedresults != SLAP_CONTROL_NONE ) { 3120 rs->sr_text = "Sync control specified with pagedResults control"; 3121 return LDAP_PROTOCOL_ERROR; 3122 } 3123 3124 if ( BER_BVISNULL( &ctrl->ldctl_value ) ) { 3125 rs->sr_text = "Sync control value is absent"; 3126 return LDAP_PROTOCOL_ERROR; 3127 } 3128 3129 if ( BER_BVISEMPTY( &ctrl->ldctl_value ) ) { 3130 rs->sr_text = "Sync control value is empty"; 3131 return LDAP_PROTOCOL_ERROR; 3132 } 3133 3134 /* Parse the control value 3135 * syncRequestValue ::= SEQUENCE { 3136 * mode ENUMERATED { 3137 * -- 0 unused 3138 * refreshOnly (1), 3139 * -- 2 reserved 3140 * refreshAndPersist (3) 3141 * }, 3142 * cookie syncCookie OPTIONAL 3143 * } 3144 */ 3145 3146 ber_init2( ber, &ctrl->ldctl_value, 0 ); 3147 3148 if ( (tag = ber_scanf( ber, "{i" /*}*/, &mode )) == LBER_ERROR ) { 3149 rs->sr_text = "Sync control : mode decoding error"; 3150 return LDAP_PROTOCOL_ERROR; 3151 } 3152 3153 switch( mode ) { 3154 case LDAP_SYNC_REFRESH_ONLY: 3155 mode = SLAP_SYNC_REFRESH; 3156 break; 3157 case LDAP_SYNC_REFRESH_AND_PERSIST: 3158 mode = SLAP_SYNC_REFRESH_AND_PERSIST; 3159 break; 3160 default: 3161 rs->sr_text = "Sync control : unknown update mode"; 3162 return LDAP_PROTOCOL_ERROR; 3163 } 3164 3165 tag = ber_peek_tag( ber, &len ); 3166 3167 if ( tag == LDAP_TAG_SYNC_COOKIE ) { 3168 if (( ber_scanf( ber, /*{*/ "m", &cookie )) == LBER_ERROR ) { 3169 rs->sr_text = "Sync control : cookie decoding error"; 3170 return LDAP_PROTOCOL_ERROR; 3171 } 3172 tag = ber_peek_tag( ber, &len ); 3173 } 3174 if ( tag == LDAP_TAG_RELOAD_HINT ) { 3175 if (( ber_scanf( ber, /*{*/ "b", &rhint )) == LBER_ERROR ) { 3176 rs->sr_text = "Sync control : rhint decoding error"; 3177 return LDAP_PROTOCOL_ERROR; 3178 } 3179 } 3180 if (( ber_scanf( ber, /*{*/ "}")) == LBER_ERROR ) { 3181 rs->sr_text = "Sync control : decoding error"; 3182 return LDAP_PROTOCOL_ERROR; 3183 } 3184 sr = op->o_tmpcalloc( 1, sizeof(struct sync_control), op->o_tmpmemctx ); 3185 sr->sr_rhint = rhint; 3186 if (!BER_BVISNULL(&cookie)) { 3187 ber_dupbv_x( &sr->sr_state.octet_str, &cookie, op->o_tmpmemctx ); 3188 /* If parse fails, pretend no cookie was sent */ 3189 if ( slap_parse_sync_cookie( &sr->sr_state, op->o_tmpmemctx ) || 3190 sr->sr_state.rid == -1 ) { 3191 if ( sr->sr_state.ctxcsn ) { 3192 ber_bvarray_free_x( sr->sr_state.ctxcsn, op->o_tmpmemctx ); 3193 sr->sr_state.ctxcsn = NULL; 3194 } 3195 sr->sr_state.numcsns = 0; 3196 } 3197 } 3198 3199 op->o_controls[slap_cids.sc_LDAPsync] = sr; 3200 3201 op->o_sync = ctrl->ldctl_iscritical 3202 ? SLAP_CONTROL_CRITICAL 3203 : SLAP_CONTROL_NONCRITICAL; 3204 3205 op->o_sync_mode |= mode; /* o_sync_mode shares o_sync */ 3206 3207 return LDAP_SUCCESS; 3208} 3209 3210/* This overlay is set up for dynamic loading via moduleload. For static 3211 * configuration, you'll need to arrange for the slap_overinst to be 3212 * initialized and registered by some other function inside slapd. 3213 */ 3214 3215static slap_overinst syncprov; 3216 3217int 3218syncprov_initialize() 3219{ 3220 int rc; 3221 3222 rc = register_supported_control( LDAP_CONTROL_SYNC, 3223 SLAP_CTRL_SEARCH, NULL, 3224 syncprov_parseCtrl, &slap_cids.sc_LDAPsync ); 3225 if ( rc != LDAP_SUCCESS ) { 3226 Debug( LDAP_DEBUG_ANY, 3227 "syncprov_init: Failed to register control %d\n", rc, 0, 0 ); 3228 return rc; 3229 } 3230 3231 syncprov.on_bi.bi_type = "syncprov"; 3232 syncprov.on_bi.bi_db_init = syncprov_db_init; 3233 syncprov.on_bi.bi_db_destroy = syncprov_db_destroy; 3234 syncprov.on_bi.bi_db_open = syncprov_db_open; 3235 syncprov.on_bi.bi_db_close = syncprov_db_close; 3236 3237 syncprov.on_bi.bi_op_abandon = syncprov_op_abandon; 3238 syncprov.on_bi.bi_op_cancel = syncprov_op_abandon; 3239 3240 syncprov.on_bi.bi_op_add = syncprov_op_mod; 3241 syncprov.on_bi.bi_op_compare = syncprov_op_compare; 3242 syncprov.on_bi.bi_op_delete = syncprov_op_mod; 3243 syncprov.on_bi.bi_op_modify = syncprov_op_mod; 3244 syncprov.on_bi.bi_op_modrdn = syncprov_op_mod; 3245 syncprov.on_bi.bi_op_search = syncprov_op_search; 3246 syncprov.on_bi.bi_extended = syncprov_op_extended; 3247 syncprov.on_bi.bi_operational = syncprov_operational; 3248 3249 syncprov.on_bi.bi_cf_ocs = spocs; 3250 3251 generic_filter.f_desc = slap_schema.si_ad_objectClass; 3252 3253 rc = config_register_schema( spcfg, spocs ); 3254 if ( rc ) return rc; 3255 3256 return overlay_register( &syncprov ); 3257} 3258 3259#if SLAPD_OVER_SYNCPROV == SLAPD_MOD_DYNAMIC 3260int 3261init_module( int argc, char *argv[] ) 3262{ 3263 return syncprov_initialize(); 3264} 3265#endif /* SLAPD_OVER_SYNCPROV == SLAPD_MOD_DYNAMIC */ 3266 3267#endif /* defined(SLAPD_OVER_SYNCPROV) */ 3268