1/* $NetBSD$ */ 2 3/* OpenLDAP: pkg/ldap/servers/slapd/overlays/syncprov.c,v 1.147.2.75 2010/06/10 18:50:48 quanah Exp */ 4/* syncprov.c - syncrepl provider */ 5/* This work is part of OpenLDAP Software <http://www.openldap.org/>. 6 * 7 * Copyright 2004-2010 The OpenLDAP Foundation. 8 * All rights reserved. 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted only as authorized by the OpenLDAP 12 * Public License. 13 * 14 * A copy of this license is available in the file LICENSE in the 15 * top-level directory of the distribution or, alternatively, at 16 * <http://www.OpenLDAP.org/license.html>. 17 */ 18/* ACKNOWLEDGEMENTS: 19 * This work was initially developed by Howard Chu for inclusion in 20 * OpenLDAP Software. 21 */ 22 23#include "portable.h" 24 25#ifdef SLAPD_OVER_SYNCPROV 26 27#include <ac/string.h> 28#include "lutil.h" 29#include "slap.h" 30#include "config.h" 31#include "ldap_rq.h" 32 33#ifdef LDAP_DEVEL 34#define CHECK_CSN 1 35#endif 36 37/* A modify request on a particular entry */ 38typedef struct modinst { 39 struct modinst *mi_next; 40 Operation *mi_op; 41} modinst; 42 43typedef struct modtarget { 44 struct modinst *mt_mods; 45 struct modinst *mt_tail; 46 Operation *mt_op; 47 ldap_pvt_thread_mutex_t mt_mutex; 48} modtarget; 49 50/* A queued result of a persistent search */ 51typedef struct syncres { 52 struct syncres *s_next; 53 Entry *s_e; 54 struct berval s_dn; 55 struct berval s_ndn; 56 struct berval s_uuid; 57 struct berval s_csn; 58 char s_mode; 59 char s_isreference; 60} syncres; 61 62/* Record of a persistent search */ 63typedef struct syncops { 64 struct syncops *s_next; 65 struct berval s_base; /* ndn of search base */ 66 ID s_eid; /* entryID of search base */ 67 Operation *s_op; /* search op */ 68 int s_rid; 69 int s_sid; 70 struct berval s_filterstr; 71 int s_flags; /* search status */ 72#define PS_IS_REFRESHING 0x01 73#define PS_IS_DETACHED 0x02 74#define PS_WROTE_BASE 0x04 75#define PS_FIND_BASE 0x08 76#define PS_FIX_FILTER 0x10 77#define PS_TASK_QUEUED 0x20 78 79 int s_inuse; /* reference count */ 80 struct syncres *s_res; 81 struct syncres *s_restail; 82 ldap_pvt_thread_mutex_t s_mutex; 83} syncops; 84 85/* A received sync control */ 86typedef struct sync_control { 87 struct sync_cookie sr_state; 88 int sr_rhint; 89} sync_control; 90 91#if 0 /* moved back to slap.h */ 92#define o_sync o_ctrlflag[slap_cids.sc_LDAPsync] 93#endif 94/* o_sync_mode uses data bits of o_sync */ 95#define o_sync_mode o_ctrlflag[slap_cids.sc_LDAPsync] 96 97#define SLAP_SYNC_NONE (LDAP_SYNC_NONE<<SLAP_CONTROL_SHIFT) 98#define SLAP_SYNC_REFRESH (LDAP_SYNC_REFRESH_ONLY<<SLAP_CONTROL_SHIFT) 99#define SLAP_SYNC_PERSIST (LDAP_SYNC_RESERVED<<SLAP_CONTROL_SHIFT) 100#define SLAP_SYNC_REFRESH_AND_PERSIST (LDAP_SYNC_REFRESH_AND_PERSIST<<SLAP_CONTROL_SHIFT) 101 102/* Record of which searches matched at premodify step */ 103typedef struct syncmatches { 104 struct syncmatches *sm_next; 105 syncops *sm_op; 106} syncmatches; 107 108/* Session log data */ 109typedef struct slog_entry { 110 struct slog_entry *se_next; 111 struct berval se_uuid; 112 struct berval se_csn; 113 int se_sid; 114 ber_tag_t se_tag; 115} slog_entry; 116 117typedef struct sessionlog { 118 struct berval sl_mincsn; 119 int sl_num; 120 int sl_size; 121 slog_entry *sl_head; 122 slog_entry *sl_tail; 123 ldap_pvt_thread_mutex_t sl_mutex; 124} sessionlog; 125 126/* The main state for this overlay */ 127typedef struct syncprov_info_t { 128 syncops *si_ops; 129 BerVarray si_ctxcsn; /* ldapsync context */ 130 struct berval si_contextdn; 131 int *si_sids; 132 int si_numcsns; 133 int si_chkops; /* checkpointing info */ 134 int si_chktime; 135 int si_numops; /* number of ops since last checkpoint */ 136 int si_nopres; /* Skip present phase */ 137 int si_usehint; /* use reload hint */ 138 time_t si_chklast; /* time of last checkpoint */ 139 Avlnode *si_mods; /* entries being modified */ 140 sessionlog *si_logs; 141 ldap_pvt_thread_rdwr_t si_csn_rwlock; 142 ldap_pvt_thread_mutex_t si_ops_mutex; 143 ldap_pvt_thread_mutex_t si_mods_mutex; 144 ldap_pvt_thread_mutex_t si_resp_mutex; 145} syncprov_info_t; 146 147typedef struct opcookie { 148 slap_overinst *son; 149 syncmatches *smatches; 150 modtarget *smt; 151 Entry *se; 152 struct berval sdn; /* DN of entry, for deletes */ 153 struct berval sndn; 154 struct berval suuid; /* UUID of entry */ 155 struct berval sctxcsn; 156 short osid; /* sid of op csn */ 157 short rsid; /* sid of relay */ 158 short sreference; /* Is the entry a reference? */ 159} opcookie; 160 161typedef struct mutexint { 162 ldap_pvt_thread_mutex_t mi_mutex; 163 int mi_int; 164} mutexint; 165 166typedef struct fbase_cookie { 167 struct berval *fdn; /* DN of a modified entry, for scope testing */ 168 syncops *fss; /* persistent search we're testing against */ 169 int fbase; /* if TRUE we found the search base and it's still valid */ 170 int fscope; /* if TRUE then fdn is within the psearch scope */ 171} fbase_cookie; 172 173static AttributeName csn_anlist[3]; 174static AttributeName uuid_anlist[2]; 175 176/* Build a LDAPsync intermediate state control */ 177static int 178syncprov_state_ctrl( 179 Operation *op, 180 SlapReply *rs, 181 Entry *e, 182 int entry_sync_state, 183 LDAPControl **ctrls, 184 int num_ctrls, 185 int send_cookie, 186 struct berval *cookie ) 187{ 188 Attribute* a; 189 int ret; 190 191 BerElementBuffer berbuf; 192 BerElement *ber = (BerElement *)&berbuf; 193 194 struct berval entryuuid_bv = BER_BVNULL; 195 196 ber_init2( ber, 0, LBER_USE_DER ); 197 ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx ); 198 199 ctrls[num_ctrls] = op->o_tmpalloc( sizeof ( LDAPControl ), op->o_tmpmemctx ); 200 201 for ( a = e->e_attrs; a != NULL; a = a->a_next ) { 202 AttributeDescription *desc = a->a_desc; 203 if ( desc == slap_schema.si_ad_entryUUID ) { 204 entryuuid_bv = a->a_nvals[0]; 205 break; 206 } 207 } 208 209 /* FIXME: what if entryuuid is NULL or empty ? */ 210 211 if ( send_cookie && cookie ) { 212 ber_printf( ber, "{eOON}", 213 entry_sync_state, &entryuuid_bv, cookie ); 214 } else { 215 ber_printf( ber, "{eON}", 216 entry_sync_state, &entryuuid_bv ); 217 } 218 219 ctrls[num_ctrls]->ldctl_oid = LDAP_CONTROL_SYNC_STATE; 220 ctrls[num_ctrls]->ldctl_iscritical = (op->o_sync == SLAP_CONTROL_CRITICAL); 221 ret = ber_flatten2( ber, &ctrls[num_ctrls]->ldctl_value, 1 ); 222 223 ber_free_buf( ber ); 224 225 if ( ret < 0 ) { 226 Debug( LDAP_DEBUG_TRACE, 227 "slap_build_sync_ctrl: ber_flatten2 failed\n", 228 0, 0, 0 ); 229 send_ldap_error( op, rs, LDAP_OTHER, "internal error" ); 230 return ret; 231 } 232 233 return LDAP_SUCCESS; 234} 235 236/* Build a LDAPsync final state control */ 237static int 238syncprov_done_ctrl( 239 Operation *op, 240 SlapReply *rs, 241 LDAPControl **ctrls, 242 int num_ctrls, 243 int send_cookie, 244 struct berval *cookie, 245 int refreshDeletes ) 246{ 247 int ret; 248 BerElementBuffer berbuf; 249 BerElement *ber = (BerElement *)&berbuf; 250 251 ber_init2( ber, NULL, LBER_USE_DER ); 252 ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx ); 253 254 ctrls[num_ctrls] = op->o_tmpalloc( sizeof ( LDAPControl ), op->o_tmpmemctx ); 255 256 ber_printf( ber, "{" ); 257 if ( send_cookie && cookie ) { 258 ber_printf( ber, "O", cookie ); 259 } 260 if ( refreshDeletes == LDAP_SYNC_REFRESH_DELETES ) { 261 ber_printf( ber, "b", refreshDeletes ); 262 } 263 ber_printf( ber, "N}" ); 264 265 ctrls[num_ctrls]->ldctl_oid = LDAP_CONTROL_SYNC_DONE; 266 ctrls[num_ctrls]->ldctl_iscritical = (op->o_sync == SLAP_CONTROL_CRITICAL); 267 ret = ber_flatten2( ber, &ctrls[num_ctrls]->ldctl_value, 1 ); 268 269 ber_free_buf( ber ); 270 271 if ( ret < 0 ) { 272 Debug( LDAP_DEBUG_TRACE, 273 "syncprov_done_ctrl: ber_flatten2 failed\n", 274 0, 0, 0 ); 275 send_ldap_error( op, rs, LDAP_OTHER, "internal error" ); 276 return ret; 277 } 278 279 return LDAP_SUCCESS; 280} 281 282static int 283syncprov_sendinfo( 284 Operation *op, 285 SlapReply *rs, 286 int type, 287 struct berval *cookie, 288 int refreshDone, 289 BerVarray syncUUIDs, 290 int refreshDeletes ) 291{ 292 BerElementBuffer berbuf; 293 BerElement *ber = (BerElement *)&berbuf; 294 struct berval rspdata; 295 296 int ret; 297 298 ber_init2( ber, NULL, LBER_USE_DER ); 299 ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx ); 300 301 if ( type ) { 302 switch ( type ) { 303 case LDAP_TAG_SYNC_NEW_COOKIE: 304 ber_printf( ber, "tO", type, cookie ); 305 break; 306 case LDAP_TAG_SYNC_REFRESH_DELETE: 307 case LDAP_TAG_SYNC_REFRESH_PRESENT: 308 ber_printf( ber, "t{", type ); 309 if ( cookie ) { 310 ber_printf( ber, "O", cookie ); 311 } 312 if ( refreshDone == 0 ) { 313 ber_printf( ber, "b", refreshDone ); 314 } 315 ber_printf( ber, "N}" ); 316 break; 317 case LDAP_TAG_SYNC_ID_SET: 318 ber_printf( ber, "t{", type ); 319 if ( cookie ) { 320 ber_printf( ber, "O", cookie ); 321 } 322 if ( refreshDeletes == 1 ) { 323 ber_printf( ber, "b", refreshDeletes ); 324 } 325 ber_printf( ber, "[W]", syncUUIDs ); 326 ber_printf( ber, "N}" ); 327 break; 328 default: 329 Debug( LDAP_DEBUG_TRACE, 330 "syncprov_sendinfo: invalid syncinfo type (%d)\n", 331 type, 0, 0 ); 332 return LDAP_OTHER; 333 } 334 } 335 336 ret = ber_flatten2( ber, &rspdata, 0 ); 337 338 if ( ret < 0 ) { 339 Debug( LDAP_DEBUG_TRACE, 340 "syncprov_sendinfo: ber_flatten2 failed\n", 341 0, 0, 0 ); 342 send_ldap_error( op, rs, LDAP_OTHER, "internal error" ); 343 return ret; 344 } 345 346 rs->sr_rspoid = LDAP_SYNC_INFO; 347 rs->sr_rspdata = &rspdata; 348 send_ldap_intermediate( op, rs ); 349 rs->sr_rspdata = NULL; 350 ber_free_buf( ber ); 351 352 return LDAP_SUCCESS; 353} 354 355/* Find a modtarget in an AVL tree */ 356static int 357sp_avl_cmp( const void *c1, const void *c2 ) 358{ 359 const modtarget *m1, *m2; 360 int rc; 361 362 m1 = c1; m2 = c2; 363 rc = m1->mt_op->o_req_ndn.bv_len - m2->mt_op->o_req_ndn.bv_len; 364 365 if ( rc ) return rc; 366 return ber_bvcmp( &m1->mt_op->o_req_ndn, &m2->mt_op->o_req_ndn ); 367} 368 369/* syncprov_findbase: 370 * finds the true DN of the base of a search (with alias dereferencing) and 371 * checks to make sure the base entry doesn't get replaced with a different 372 * entry (e.g., swapping trees via ModDN, or retargeting an alias). If a 373 * change is detected, any persistent search on this base must be terminated / 374 * reloaded. 375 * On the first call, we just save the DN and entryID. On subsequent calls 376 * we compare the DN and entryID with the saved values. 377 */ 378static int 379findbase_cb( Operation *op, SlapReply *rs ) 380{ 381 slap_callback *sc = op->o_callback; 382 383 if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) { 384 fbase_cookie *fc = sc->sc_private; 385 386 /* If no entryID, we're looking for the first time. 387 * Just store whatever we got. 388 */ 389 if ( fc->fss->s_eid == NOID ) { 390 fc->fbase = 2; 391 fc->fss->s_eid = rs->sr_entry->e_id; 392 ber_dupbv( &fc->fss->s_base, &rs->sr_entry->e_nname ); 393 394 } else if ( rs->sr_entry->e_id == fc->fss->s_eid && 395 dn_match( &rs->sr_entry->e_nname, &fc->fss->s_base )) { 396 397 /* OK, the DN is the same and the entryID is the same. */ 398 fc->fbase = 1; 399 } 400 } 401 if ( rs->sr_err != LDAP_SUCCESS ) { 402 Debug( LDAP_DEBUG_ANY, "findbase failed! %d\n", rs->sr_err,0,0 ); 403 } 404 return LDAP_SUCCESS; 405} 406 407static Filter generic_filter = { LDAP_FILTER_PRESENT, { 0 }, NULL }; 408static struct berval generic_filterstr = BER_BVC("(objectclass=*)"); 409 410static int 411syncprov_findbase( Operation *op, fbase_cookie *fc ) 412{ 413 /* Use basic parameters from syncrepl search, but use 414 * current op's threadctx / tmpmemctx 415 */ 416 ldap_pvt_thread_mutex_lock( &fc->fss->s_mutex ); 417 if ( fc->fss->s_flags & PS_FIND_BASE ) { 418 slap_callback cb = {0}; 419 Operation fop; 420 SlapReply frs = { REP_RESULT }; 421 int rc; 422 423 fc->fss->s_flags ^= PS_FIND_BASE; 424 ldap_pvt_thread_mutex_unlock( &fc->fss->s_mutex ); 425 426 fop = *fc->fss->s_op; 427 428 fop.o_bd = fop.o_bd->bd_self; 429 fop.o_hdr = op->o_hdr; 430 fop.o_time = op->o_time; 431 fop.o_tincr = op->o_tincr; 432 433 cb.sc_response = findbase_cb; 434 cb.sc_private = fc; 435 436 fop.o_sync_mode = 0; /* turn off sync mode */ 437 fop.o_managedsait = SLAP_CONTROL_CRITICAL; 438 fop.o_callback = &cb; 439 fop.o_tag = LDAP_REQ_SEARCH; 440 fop.ors_scope = LDAP_SCOPE_BASE; 441 fop.ors_limit = NULL; 442 fop.ors_slimit = 1; 443 fop.ors_tlimit = SLAP_NO_LIMIT; 444 fop.ors_attrs = slap_anlist_no_attrs; 445 fop.ors_attrsonly = 1; 446 fop.ors_filter = &generic_filter; 447 fop.ors_filterstr = generic_filterstr; 448 449 rc = fop.o_bd->be_search( &fop, &frs ); 450 } else { 451 ldap_pvt_thread_mutex_unlock( &fc->fss->s_mutex ); 452 fc->fbase = 1; 453 } 454 455 /* After the first call, see if the fdn resides in the scope */ 456 if ( fc->fbase == 1 ) { 457 switch ( fc->fss->s_op->ors_scope ) { 458 case LDAP_SCOPE_BASE: 459 fc->fscope = dn_match( fc->fdn, &fc->fss->s_base ); 460 break; 461 case LDAP_SCOPE_ONELEVEL: { 462 struct berval pdn; 463 dnParent( fc->fdn, &pdn ); 464 fc->fscope = dn_match( &pdn, &fc->fss->s_base ); 465 break; } 466 case LDAP_SCOPE_SUBTREE: 467 fc->fscope = dnIsSuffix( fc->fdn, &fc->fss->s_base ); 468 break; 469 case LDAP_SCOPE_SUBORDINATE: 470 fc->fscope = dnIsSuffix( fc->fdn, &fc->fss->s_base ) && 471 !dn_match( fc->fdn, &fc->fss->s_base ); 472 break; 473 } 474 } 475 476 if ( fc->fbase ) 477 return LDAP_SUCCESS; 478 479 /* If entryID has changed, then the base of this search has 480 * changed. Invalidate the psearch. 481 */ 482 return LDAP_NO_SUCH_OBJECT; 483} 484 485/* syncprov_findcsn: 486 * This function has three different purposes, but they all use a search 487 * that filters on entryCSN so they're combined here. 488 * 1: at startup time, after a contextCSN has been read from the database, 489 * we search for all entries with CSN >= contextCSN in case the contextCSN 490 * was not checkpointed at the previous shutdown. 491 * 492 * 2: when the current contextCSN is known and we have a sync cookie, we search 493 * for one entry with CSN = the cookie CSN. If not found, try <= cookie CSN. 494 * If an entry is found, the cookie CSN is valid, otherwise it is stale. 495 * 496 * 3: during a refresh phase, we search for all entries with CSN <= the cookie 497 * CSN, and generate Present records for them. We always collect this result 498 * in SyncID sets, even if there's only one match. 499 */ 500typedef enum find_csn_t { 501 FIND_MAXCSN = 1, 502 FIND_CSN = 2, 503 FIND_PRESENT = 3 504} find_csn_t; 505 506static int 507findmax_cb( Operation *op, SlapReply *rs ) 508{ 509 if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) { 510 struct berval *maxcsn = op->o_callback->sc_private; 511 Attribute *a = attr_find( rs->sr_entry->e_attrs, 512 slap_schema.si_ad_entryCSN ); 513 514 if ( a && ber_bvcmp( &a->a_vals[0], maxcsn ) > 0 && 515 slap_parse_csn_sid( &a->a_vals[0] ) == slap_serverID ) { 516 maxcsn->bv_len = a->a_vals[0].bv_len; 517 strcpy( maxcsn->bv_val, a->a_vals[0].bv_val ); 518 } 519 } 520 return LDAP_SUCCESS; 521} 522 523static int 524findcsn_cb( Operation *op, SlapReply *rs ) 525{ 526 slap_callback *sc = op->o_callback; 527 528 /* We just want to know that at least one exists, so it's OK if 529 * we exceed the unchecked limit. 530 */ 531 if ( rs->sr_err == LDAP_ADMINLIMIT_EXCEEDED || 532 (rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS )) { 533 sc->sc_private = (void *)1; 534 } 535 return LDAP_SUCCESS; 536} 537 538/* Build a list of entryUUIDs for sending in a SyncID set */ 539 540#define UUID_LEN 16 541 542typedef struct fpres_cookie { 543 int num; 544 BerVarray uuids; 545 char *last; 546} fpres_cookie; 547 548static int 549findpres_cb( Operation *op, SlapReply *rs ) 550{ 551 slap_callback *sc = op->o_callback; 552 fpres_cookie *pc = sc->sc_private; 553 Attribute *a; 554 int ret = SLAP_CB_CONTINUE; 555 556 switch ( rs->sr_type ) { 557 case REP_SEARCH: 558 a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryUUID ); 559 if ( a ) { 560 pc->uuids[pc->num].bv_val = pc->last; 561 AC_MEMCPY( pc->uuids[pc->num].bv_val, a->a_nvals[0].bv_val, 562 pc->uuids[pc->num].bv_len ); 563 pc->num++; 564 pc->last = pc->uuids[pc->num].bv_val; 565 pc->uuids[pc->num].bv_val = NULL; 566 } 567 ret = LDAP_SUCCESS; 568 if ( pc->num != SLAP_SYNCUUID_SET_SIZE ) 569 break; 570 /* FALLTHRU */ 571 case REP_RESULT: 572 ret = rs->sr_err; 573 if ( pc->num ) { 574 ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL, 575 0, pc->uuids, 0 ); 576 pc->uuids[pc->num].bv_val = pc->last; 577 pc->num = 0; 578 pc->last = pc->uuids[0].bv_val; 579 } 580 break; 581 default: 582 break; 583 } 584 return ret; 585} 586 587static int 588syncprov_findcsn( Operation *op, find_csn_t mode ) 589{ 590 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 591 syncprov_info_t *si = on->on_bi.bi_private; 592 593 slap_callback cb = {0}; 594 Operation fop; 595 SlapReply frs = { REP_RESULT }; 596 char buf[LDAP_PVT_CSNSTR_BUFSIZE + STRLENOF("(entryCSN<=)")]; 597 char cbuf[LDAP_PVT_CSNSTR_BUFSIZE]; 598 struct berval maxcsn; 599 Filter cf; 600 AttributeAssertion eq = ATTRIBUTEASSERTION_INIT; 601 fpres_cookie pcookie; 602 sync_control *srs = NULL; 603 struct slap_limits_set fc_limits; 604 int i, rc = LDAP_SUCCESS, findcsn_retry = 1; 605 int maxid; 606 607 if ( mode != FIND_MAXCSN ) { 608 srs = op->o_controls[slap_cids.sc_LDAPsync]; 609 } 610 611 fop = *op; 612 fop.o_sync_mode &= SLAP_CONTROL_MASK; /* turn off sync_mode */ 613 /* We want pure entries, not referrals */ 614 fop.o_managedsait = SLAP_CONTROL_CRITICAL; 615 616 cf.f_ava = &eq; 617 cf.f_av_desc = slap_schema.si_ad_entryCSN; 618 BER_BVZERO( &cf.f_av_value ); 619 cf.f_next = NULL; 620 621 fop.o_callback = &cb; 622 fop.ors_limit = NULL; 623 fop.ors_tlimit = SLAP_NO_LIMIT; 624 fop.ors_filter = &cf; 625 fop.ors_filterstr.bv_val = buf; 626 627again: 628 switch( mode ) { 629 case FIND_MAXCSN: 630 cf.f_choice = LDAP_FILTER_GE; 631 /* If there are multiple CSNs, use the one with our serverID */ 632 for ( i=0; i<si->si_numcsns; i++) { 633 if ( slap_serverID == si->si_sids[i] ) { 634 maxid = i; 635 break; 636 } 637 } 638 if ( i == si->si_numcsns ) { 639 /* No match: this is multimaster, and none of the content in the DB 640 * originated locally. Treat like no CSN. 641 */ 642 return LDAP_NO_SUCH_OBJECT; 643 } 644 cf.f_av_value = si->si_ctxcsn[maxid]; 645 fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ), 646 "(entryCSN>=%s)", cf.f_av_value.bv_val ); 647 if ( fop.ors_filterstr.bv_len >= sizeof( buf ) ) { 648 return LDAP_OTHER; 649 } 650 fop.ors_attrsonly = 0; 651 fop.ors_attrs = csn_anlist; 652 fop.ors_slimit = SLAP_NO_LIMIT; 653 cb.sc_private = &maxcsn; 654 cb.sc_response = findmax_cb; 655 strcpy( cbuf, cf.f_av_value.bv_val ); 656 maxcsn.bv_val = cbuf; 657 maxcsn.bv_len = cf.f_av_value.bv_len; 658 break; 659 case FIND_CSN: 660 if ( BER_BVISEMPTY( &cf.f_av_value )) { 661 cf.f_av_value = srs->sr_state.ctxcsn[0]; 662 /* If there are multiple CSNs, use the smallest */ 663 for ( i=1; i<srs->sr_state.numcsns; i++ ) { 664 if ( ber_bvcmp( &cf.f_av_value, &srs->sr_state.ctxcsn[i] ) 665 > 0 ) { 666 cf.f_av_value = srs->sr_state.ctxcsn[i]; 667 } 668 } 669 } 670 /* Look for exact match the first time */ 671 if ( findcsn_retry ) { 672 cf.f_choice = LDAP_FILTER_EQUALITY; 673 fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ), 674 "(entryCSN=%s)", cf.f_av_value.bv_val ); 675 /* On retry, look for <= */ 676 } else { 677 cf.f_choice = LDAP_FILTER_LE; 678 fop.ors_limit = &fc_limits; 679 memset( &fc_limits, 0, sizeof( fc_limits )); 680 fc_limits.lms_s_unchecked = 1; 681 fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ), 682 "(entryCSN<=%s)", cf.f_av_value.bv_val ); 683 } 684 if ( fop.ors_filterstr.bv_len >= sizeof( buf ) ) { 685 return LDAP_OTHER; 686 } 687 fop.ors_attrsonly = 1; 688 fop.ors_attrs = slap_anlist_no_attrs; 689 fop.ors_slimit = 1; 690 cb.sc_private = NULL; 691 cb.sc_response = findcsn_cb; 692 break; 693 case FIND_PRESENT: 694 fop.ors_filter = op->ors_filter; 695 fop.ors_filterstr = op->ors_filterstr; 696 fop.ors_attrsonly = 0; 697 fop.ors_attrs = uuid_anlist; 698 fop.ors_slimit = SLAP_NO_LIMIT; 699 cb.sc_private = &pcookie; 700 cb.sc_response = findpres_cb; 701 pcookie.num = 0; 702 703 /* preallocate storage for a full set */ 704 pcookie.uuids = op->o_tmpalloc( (SLAP_SYNCUUID_SET_SIZE+1) * 705 sizeof(struct berval) + SLAP_SYNCUUID_SET_SIZE * UUID_LEN, 706 op->o_tmpmemctx ); 707 pcookie.last = (char *)(pcookie.uuids + SLAP_SYNCUUID_SET_SIZE+1); 708 pcookie.uuids[0].bv_val = pcookie.last; 709 pcookie.uuids[0].bv_len = UUID_LEN; 710 for (i=1; i<SLAP_SYNCUUID_SET_SIZE; i++) { 711 pcookie.uuids[i].bv_val = pcookie.uuids[i-1].bv_val + UUID_LEN; 712 pcookie.uuids[i].bv_len = UUID_LEN; 713 } 714 break; 715 } 716 717 fop.o_bd->bd_info = (BackendInfo *)on->on_info; 718 fop.o_bd->be_search( &fop, &frs ); 719 fop.o_bd->bd_info = (BackendInfo *)on; 720 721 switch( mode ) { 722 case FIND_MAXCSN: 723 if ( ber_bvcmp( &si->si_ctxcsn[maxid], &maxcsn )) { 724#ifdef CHECK_CSN 725 Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax; 726 assert( !syn->ssyn_validate( syn, &maxcsn )); 727#endif 728 ber_bvreplace( &si->si_ctxcsn[maxid], &maxcsn ); 729 si->si_numops++; /* ensure a checkpoint */ 730 } 731 break; 732 case FIND_CSN: 733 /* If matching CSN was not found, invalidate the context. */ 734 if ( !cb.sc_private ) { 735 /* If we didn't find an exact match, then try for <= */ 736 if ( findcsn_retry ) { 737 findcsn_retry = 0; 738 goto again; 739 } 740 rc = LDAP_NO_SUCH_OBJECT; 741 } 742 break; 743 case FIND_PRESENT: 744 op->o_tmpfree( pcookie.uuids, op->o_tmpmemctx ); 745 break; 746 } 747 748 return rc; 749} 750 751/* Should find a place to cache these */ 752static mutexint *get_mutexint() 753{ 754 mutexint *mi = ch_malloc( sizeof( mutexint )); 755 ldap_pvt_thread_mutex_init( &mi->mi_mutex ); 756 mi->mi_int = 1; 757 return mi; 758} 759 760static void inc_mutexint( mutexint *mi ) 761{ 762 ldap_pvt_thread_mutex_lock( &mi->mi_mutex ); 763 mi->mi_int++; 764 ldap_pvt_thread_mutex_unlock( &mi->mi_mutex ); 765} 766 767/* return resulting counter */ 768static int dec_mutexint( mutexint *mi ) 769{ 770 int i; 771 ldap_pvt_thread_mutex_lock( &mi->mi_mutex ); 772 i = --mi->mi_int; 773 ldap_pvt_thread_mutex_unlock( &mi->mi_mutex ); 774 if ( !i ) { 775 ldap_pvt_thread_mutex_destroy( &mi->mi_mutex ); 776 ch_free( mi ); 777 } 778 return i; 779} 780 781static void 782syncprov_free_syncop( syncops *so ) 783{ 784 syncres *sr, *srnext; 785 GroupAssertion *ga, *gnext; 786 787 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 788 if ( --so->s_inuse > 0 ) { 789 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 790 return; 791 } 792 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 793 if ( so->s_flags & PS_IS_DETACHED ) { 794 filter_free( so->s_op->ors_filter ); 795 for ( ga = so->s_op->o_groups; ga; ga=gnext ) { 796 gnext = ga->ga_next; 797 ch_free( ga ); 798 } 799 ch_free( so->s_op ); 800 } 801 ch_free( so->s_base.bv_val ); 802 for ( sr=so->s_res; sr; sr=srnext ) { 803 srnext = sr->s_next; 804 if ( sr->s_e ) { 805 if ( !dec_mutexint( sr->s_e->e_private )) { 806 sr->s_e->e_private = NULL; 807 entry_free( sr->s_e ); 808 } 809 } 810 ch_free( sr ); 811 } 812 ldap_pvt_thread_mutex_destroy( &so->s_mutex ); 813 ch_free( so ); 814} 815 816/* Send a persistent search response */ 817static int 818syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode ) 819{ 820 slap_overinst *on = opc->son; 821 822 SlapReply rs = { REP_SEARCH }; 823 LDAPControl *ctrls[2]; 824 struct berval cookie = BER_BVNULL, csns[2]; 825 Entry e_uuid = {0}; 826 Attribute a_uuid = {0}; 827 828 if ( so->s_op->o_abandon ) 829 return SLAPD_ABANDON; 830 831 ctrls[1] = NULL; 832 if ( !BER_BVISNULL( &opc->sctxcsn )) { 833 csns[0] = opc->sctxcsn; 834 BER_BVZERO( &csns[1] ); 835 slap_compose_sync_cookie( op, &cookie, csns, so->s_rid, slap_serverID ? slap_serverID : -1 ); 836 } 837 838#ifdef LDAP_DEBUG 839 if ( so->s_sid > 0 ) { 840 Debug( LDAP_DEBUG_SYNC, "syncprov_sendresp: to=%03x, cookie=%s\n", 841 so->s_sid, cookie.bv_val ? cookie.bv_val : "", 0 ); 842 } else { 843 Debug( LDAP_DEBUG_SYNC, "syncprov_sendresp: cookie=%s\n", 844 cookie.bv_val ? cookie.bv_val : "", 0, 0 ); 845 } 846#endif 847 848 e_uuid.e_attrs = &a_uuid; 849 a_uuid.a_desc = slap_schema.si_ad_entryUUID; 850 a_uuid.a_nvals = &opc->suuid; 851 rs.sr_err = syncprov_state_ctrl( op, &rs, &e_uuid, 852 mode, ctrls, 0, 1, &cookie ); 853 if ( !BER_BVISNULL( &cookie )) { 854 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); 855 } 856 857 rs.sr_ctrls = ctrls; 858 rs.sr_entry = &e_uuid; 859 if ( mode == LDAP_SYNC_ADD || mode == LDAP_SYNC_MODIFY ) { 860 e_uuid = *opc->se; 861 e_uuid.e_private = NULL; 862 } 863 864 switch( mode ) { 865 case LDAP_SYNC_ADD: 866 if ( opc->sreference && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) { 867 rs.sr_ref = get_entry_referrals( op, rs.sr_entry ); 868 rs.sr_err = send_search_reference( op, &rs ); 869 ber_bvarray_free( rs.sr_ref ); 870 break; 871 } 872 /* fallthru */ 873 case LDAP_SYNC_MODIFY: 874 rs.sr_attrs = op->ors_attrs; 875 rs.sr_err = send_search_entry( op, &rs ); 876 break; 877 case LDAP_SYNC_DELETE: 878 e_uuid.e_attrs = NULL; 879 e_uuid.e_name = opc->sdn; 880 e_uuid.e_nname = opc->sndn; 881 if ( opc->sreference && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) { 882 struct berval bv = BER_BVNULL; 883 rs.sr_ref = &bv; 884 rs.sr_err = send_search_reference( op, &rs ); 885 } else { 886 rs.sr_err = send_search_entry( op, &rs ); 887 } 888 break; 889 default: 890 assert(0); 891 } 892 /* In case someone else freed it already? */ 893 if ( rs.sr_ctrls ) { 894 int i; 895 for ( i=0; rs.sr_ctrls[i]; i++ ) { 896 if ( rs.sr_ctrls[i] == ctrls[0] ) { 897 op->o_tmpfree( ctrls[0]->ldctl_value.bv_val, op->o_tmpmemctx ); 898 ctrls[0]->ldctl_value.bv_val = NULL; 899 break; 900 } 901 } 902 rs.sr_ctrls = NULL; 903 } 904 905 return rs.sr_err; 906} 907 908static void 909syncprov_qstart( syncops *so ); 910 911/* Play back queued responses */ 912static int 913syncprov_qplay( Operation *op, syncops *so ) 914{ 915 slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key; 916 syncres *sr; 917 Entry *e; 918 opcookie opc; 919 int rc = 0; 920 921 opc.son = on; 922 923 do { 924 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 925 sr = so->s_res; 926 if ( sr ) 927 so->s_res = sr->s_next; 928 if ( !so->s_res ) 929 so->s_restail = NULL; 930 /* Exit loop with mutex held */ 931 if ( !sr || so->s_op->o_abandon ) 932 break; 933 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 934 935 if ( sr->s_mode == LDAP_SYNC_NEW_COOKIE ) { 936 SlapReply rs = { REP_INTERMEDIATE }; 937 938 rc = syncprov_sendinfo( op, &rs, LDAP_TAG_SYNC_NEW_COOKIE, 939 &sr->s_csn, 0, NULL, 0 ); 940 } else { 941 opc.sdn = sr->s_dn; 942 opc.sndn = sr->s_ndn; 943 opc.suuid = sr->s_uuid; 944 opc.sctxcsn = sr->s_csn; 945 opc.sreference = sr->s_isreference; 946 opc.se = sr->s_e; 947 948 rc = syncprov_sendresp( op, &opc, so, sr->s_mode ); 949 950 } 951 if ( sr->s_e ) { 952 if ( !dec_mutexint( sr->s_e->e_private )) { 953 sr->s_e->e_private = NULL; 954 entry_free ( sr->s_e ); 955 } 956 } 957 958 ch_free( sr ); 959 960 /* Exit loop with mutex held */ 961 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 962 963 } while (0); 964 965 /* We now only send one change at a time, to prevent one 966 * psearch from hogging all the CPU. Resubmit this task if 967 * there are more responses queued and no errors occurred. 968 */ 969 970 if ( rc == 0 && so->s_res ) { 971 syncprov_qstart( so ); 972 } else { 973 so->s_flags ^= PS_TASK_QUEUED; 974 } 975 976 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 977 return rc; 978} 979 980/* task for playing back queued responses */ 981static void * 982syncprov_qtask( void *ctx, void *arg ) 983{ 984 syncops *so = arg; 985 OperationBuffer opbuf; 986 Operation *op; 987 BackendDB be; 988 int rc; 989 990 op = &opbuf.ob_op; 991 *op = *so->s_op; 992 op->o_hdr = &opbuf.ob_hdr; 993 op->o_controls = opbuf.ob_controls; 994 memset( op->o_controls, 0, sizeof(opbuf.ob_controls) ); 995 996 *op->o_hdr = *so->s_op->o_hdr; 997 998 op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx, 1); 999 op->o_tmpmfuncs = &slap_sl_mfuncs; 1000 op->o_threadctx = ctx; 1001 1002 /* syncprov_qplay expects a fake db */ 1003 be = *so->s_op->o_bd; 1004 be.be_flags |= SLAP_DBFLAG_OVERLAY; 1005 op->o_bd = &be; 1006 LDAP_SLIST_FIRST(&op->o_extra) = NULL; 1007 op->o_callback = NULL; 1008 1009 rc = syncprov_qplay( op, so ); 1010 1011 /* decrement use count... */ 1012 syncprov_free_syncop( so ); 1013 1014 return NULL; 1015} 1016 1017/* Start the task to play back queued psearch responses */ 1018static void 1019syncprov_qstart( syncops *so ) 1020{ 1021 so->s_flags |= PS_TASK_QUEUED; 1022 so->s_inuse++; 1023 ldap_pvt_thread_pool_submit( &connection_pool, 1024 syncprov_qtask, so ); 1025} 1026 1027/* Queue a persistent search response */ 1028static int 1029syncprov_qresp( opcookie *opc, syncops *so, int mode ) 1030{ 1031 syncres *sr; 1032 int srsize; 1033 struct berval cookie = opc->sctxcsn; 1034 1035 if ( mode == LDAP_SYNC_NEW_COOKIE ) { 1036 syncprov_info_t *si = opc->son->on_bi.bi_private; 1037 1038 slap_compose_sync_cookie( NULL, &cookie, si->si_ctxcsn, 1039 so->s_rid, slap_serverID ? slap_serverID : -1); 1040 } 1041 1042 srsize = sizeof(syncres) + opc->suuid.bv_len + 1 + 1043 opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1; 1044 if ( cookie.bv_len ) 1045 srsize += cookie.bv_len + 1; 1046 sr = ch_malloc( srsize ); 1047 sr->s_next = NULL; 1048 sr->s_e = opc->se; 1049 /* bump refcount on this entry */ 1050 if ( opc->se ) 1051 inc_mutexint( opc->se->e_private ); 1052 sr->s_dn.bv_val = (char *)(sr + 1); 1053 sr->s_dn.bv_len = opc->sdn.bv_len; 1054 sr->s_mode = mode; 1055 sr->s_isreference = opc->sreference; 1056 sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val, 1057 opc->sdn.bv_val ) + 1; 1058 sr->s_ndn.bv_len = opc->sndn.bv_len; 1059 sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val, 1060 opc->sndn.bv_val ) + 1; 1061 sr->s_uuid.bv_len = opc->suuid.bv_len; 1062 AC_MEMCPY( sr->s_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len ); 1063 if ( cookie.bv_len ) { 1064 sr->s_csn.bv_val = sr->s_uuid.bv_val + sr->s_uuid.bv_len + 1; 1065 strcpy( sr->s_csn.bv_val, cookie.bv_val ); 1066 } else { 1067 sr->s_csn.bv_val = NULL; 1068 } 1069 sr->s_csn.bv_len = cookie.bv_len; 1070 1071 if ( mode == LDAP_SYNC_NEW_COOKIE && cookie.bv_val ) { 1072 ch_free( cookie.bv_val ); 1073 } 1074 1075 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 1076 if ( !so->s_res ) { 1077 so->s_res = sr; 1078 } else { 1079 so->s_restail->s_next = sr; 1080 } 1081 so->s_restail = sr; 1082 1083 /* If the base of the psearch was modified, check it next time round */ 1084 if ( so->s_flags & PS_WROTE_BASE ) { 1085 so->s_flags ^= PS_WROTE_BASE; 1086 so->s_flags |= PS_FIND_BASE; 1087 } 1088 if (( so->s_flags & (PS_IS_DETACHED|PS_TASK_QUEUED)) == PS_IS_DETACHED ) { 1089 syncprov_qstart( so ); 1090 } 1091 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 1092 return LDAP_SUCCESS; 1093} 1094 1095static int 1096syncprov_drop_psearch( syncops *so, int lock ) 1097{ 1098 if ( so->s_flags & PS_IS_DETACHED ) { 1099 if ( lock ) 1100 ldap_pvt_thread_mutex_lock( &so->s_op->o_conn->c_mutex ); 1101 so->s_op->o_conn->c_n_ops_executing--; 1102 so->s_op->o_conn->c_n_ops_completed++; 1103 LDAP_STAILQ_REMOVE( &so->s_op->o_conn->c_ops, so->s_op, Operation, 1104 o_next ); 1105 if ( lock ) 1106 ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex ); 1107 } 1108 syncprov_free_syncop( so ); 1109 1110 return 0; 1111} 1112 1113static int 1114syncprov_ab_cleanup( Operation *op, SlapReply *rs ) 1115{ 1116 slap_callback *sc = op->o_callback; 1117 op->o_callback = sc->sc_next; 1118 syncprov_drop_psearch( op->o_callback->sc_private, 0 ); 1119 op->o_tmpfree( sc, op->o_tmpmemctx ); 1120 return 0; 1121} 1122 1123static int 1124syncprov_op_abandon( Operation *op, SlapReply *rs ) 1125{ 1126 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 1127 syncprov_info_t *si = on->on_bi.bi_private; 1128 syncops *so, *soprev; 1129 1130 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 1131 for ( so=si->si_ops, soprev = (syncops *)&si->si_ops; so; 1132 soprev=so, so=so->s_next ) { 1133 if ( so->s_op->o_connid == op->o_connid && 1134 so->s_op->o_msgid == op->orn_msgid ) { 1135 so->s_op->o_abandon = 1; 1136 soprev->s_next = so->s_next; 1137 break; 1138 } 1139 } 1140 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 1141 if ( so ) { 1142 /* Is this really a Cancel exop? */ 1143 if ( op->o_tag != LDAP_REQ_ABANDON ) { 1144 so->s_op->o_cancel = SLAP_CANCEL_ACK; 1145 rs->sr_err = LDAP_CANCELLED; 1146 send_ldap_result( so->s_op, rs ); 1147 if ( so->s_flags & PS_IS_DETACHED ) { 1148 slap_callback *cb; 1149 cb = op->o_tmpcalloc( 1, sizeof(slap_callback), op->o_tmpmemctx ); 1150 cb->sc_cleanup = syncprov_ab_cleanup; 1151 cb->sc_next = op->o_callback; 1152 cb->sc_private = so; 1153 return SLAP_CB_CONTINUE; 1154 } 1155 } 1156 syncprov_drop_psearch( so, 0 ); 1157 } 1158 return SLAP_CB_CONTINUE; 1159} 1160 1161/* Find which persistent searches are affected by this operation */ 1162static void 1163syncprov_matchops( Operation *op, opcookie *opc, int saveit ) 1164{ 1165 slap_overinst *on = opc->son; 1166 syncprov_info_t *si = on->on_bi.bi_private; 1167 1168 fbase_cookie fc; 1169 syncops *ss, *sprev, *snext; 1170 Entry *e = NULL; 1171 Attribute *a; 1172 int rc; 1173 struct berval newdn; 1174 int freefdn = 0; 1175 BackendDB *b0 = op->o_bd, db; 1176 1177 fc.fdn = &op->o_req_ndn; 1178 /* compute new DN */ 1179 if ( op->o_tag == LDAP_REQ_MODRDN && !saveit ) { 1180 struct berval pdn; 1181 if ( op->orr_nnewSup ) pdn = *op->orr_nnewSup; 1182 else dnParent( fc.fdn, &pdn ); 1183 build_new_dn( &newdn, &pdn, &op->orr_nnewrdn, op->o_tmpmemctx ); 1184 fc.fdn = &newdn; 1185 freefdn = 1; 1186 } 1187 if ( op->o_tag != LDAP_REQ_ADD ) { 1188 if ( !SLAP_ISOVERLAY( op->o_bd )) { 1189 db = *op->o_bd; 1190 op->o_bd = &db; 1191 } 1192 rc = overlay_entry_get_ov( op, fc.fdn, NULL, NULL, 0, &e, on ); 1193 /* If we're sending responses now, make a copy and unlock the DB */ 1194 if ( e && !saveit ) { 1195 if ( !opc->se ) { 1196 opc->se = entry_dup( e ); 1197 opc->se->e_private = get_mutexint(); 1198 } 1199 overlay_entry_release_ov( op, e, 0, on ); 1200 e = opc->se; 1201 } 1202 if ( rc ) { 1203 op->o_bd = b0; 1204 return; 1205 } 1206 } else { 1207 e = op->ora_e; 1208 if ( !saveit ) { 1209 if ( !opc->se ) { 1210 opc->se = entry_dup( e ); 1211 opc->se->e_private = get_mutexint(); 1212 } 1213 e = opc->se; 1214 } 1215 } 1216 1217 if ( saveit || op->o_tag == LDAP_REQ_ADD ) { 1218 ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx ); 1219 ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx ); 1220 opc->sreference = is_entry_referral( e ); 1221 a = attr_find( e->e_attrs, slap_schema.si_ad_entryUUID ); 1222 if ( a ) 1223 ber_dupbv_x( &opc->suuid, &a->a_nvals[0], op->o_tmpmemctx ); 1224 } else if ( op->o_tag == LDAP_REQ_MODRDN && !saveit ) { 1225 op->o_tmpfree( opc->sndn.bv_val, op->o_tmpmemctx ); 1226 op->o_tmpfree( opc->sdn.bv_val, op->o_tmpmemctx ); 1227 ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx ); 1228 ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx ); 1229 } 1230 1231 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 1232 for (ss = si->si_ops, sprev = (syncops *)&si->si_ops; ss; 1233 sprev = ss, ss=snext) 1234 { 1235 Operation op2; 1236 Opheader oh; 1237 syncmatches *sm; 1238 int found = 0; 1239 1240 snext = ss->s_next; 1241 if ( ss->s_op->o_abandon ) 1242 continue; 1243 1244 /* Don't send ops back to the originator */ 1245 if ( opc->osid > 0 && opc->osid == ss->s_sid ) { 1246 Debug( LDAP_DEBUG_SYNC, "syncprov_matchops: skipping original sid %03x\n", 1247 opc->osid, 0, 0 ); 1248 continue; 1249 } 1250 1251 /* Don't send ops back to the messenger */ 1252 if ( opc->rsid > 0 && opc->rsid == ss->s_sid ) { 1253 Debug( LDAP_DEBUG_SYNC, "syncprov_matchops: skipping relayed sid %03x\n", 1254 opc->rsid, 0, 0 ); 1255 continue; 1256 } 1257 1258 /* validate base */ 1259 fc.fss = ss; 1260 fc.fbase = 0; 1261 fc.fscope = 0; 1262 1263 /* If the base of the search is missing, signal a refresh */ 1264 rc = syncprov_findbase( op, &fc ); 1265 if ( rc != LDAP_SUCCESS ) { 1266 SlapReply rs = {REP_RESULT}; 1267 send_ldap_error( ss->s_op, &rs, LDAP_SYNC_REFRESH_REQUIRED, 1268 "search base has changed" ); 1269 sprev->s_next = snext; 1270 syncprov_drop_psearch( ss, 1 ); 1271 ss = sprev; 1272 continue; 1273 } 1274 1275 /* If we're sending results now, look for this op in old matches */ 1276 if ( !saveit ) { 1277 syncmatches *old; 1278 1279 /* Did we modify the search base? */ 1280 if ( dn_match( &op->o_req_ndn, &ss->s_base )) { 1281 ldap_pvt_thread_mutex_lock( &ss->s_mutex ); 1282 ss->s_flags |= PS_WROTE_BASE; 1283 ldap_pvt_thread_mutex_unlock( &ss->s_mutex ); 1284 } 1285 1286 for ( sm=opc->smatches, old=(syncmatches *)&opc->smatches; sm; 1287 old=sm, sm=sm->sm_next ) { 1288 if ( sm->sm_op == ss ) { 1289 found = 1; 1290 old->sm_next = sm->sm_next; 1291 op->o_tmpfree( sm, op->o_tmpmemctx ); 1292 break; 1293 } 1294 } 1295 } 1296 1297 if ( fc.fscope ) { 1298 op2 = *ss->s_op; 1299 oh = *op->o_hdr; 1300 oh.oh_conn = ss->s_op->o_conn; 1301 oh.oh_connid = ss->s_op->o_connid; 1302 op2.o_bd = op->o_bd->bd_self; 1303 op2.o_hdr = &oh; 1304 op2.o_extra = op->o_extra; 1305 op2.o_callback = NULL; 1306 ldap_pvt_thread_mutex_lock( &ss->s_mutex ); 1307 if (ss->s_flags & PS_FIX_FILTER) { 1308 /* Skip the AND/GE clause that we stuck on in front. We 1309 would lose deletes/mods that happen during the refresh 1310 phase otherwise (ITS#6555) */ 1311 op2.ors_filter = ss->s_op->ors_filter->f_and->f_next; 1312 } 1313 ldap_pvt_thread_mutex_unlock( &ss->s_mutex ); 1314 rc = test_filter( &op2, e, op2.ors_filter ); 1315 } 1316 1317 Debug( LDAP_DEBUG_TRACE, "syncprov_matchops: sid %03x fscope %d rc %d\n", 1318 ss->s_sid, fc.fscope, rc ); 1319 1320 /* check if current o_req_dn is in scope and matches filter */ 1321 if ( fc.fscope && rc == LDAP_COMPARE_TRUE ) { 1322 if ( saveit ) { 1323 sm = op->o_tmpalloc( sizeof(syncmatches), op->o_tmpmemctx ); 1324 sm->sm_next = opc->smatches; 1325 sm->sm_op = ss; 1326 ldap_pvt_thread_mutex_lock( &ss->s_mutex ); 1327 ++ss->s_inuse; 1328 ldap_pvt_thread_mutex_unlock( &ss->s_mutex ); 1329 opc->smatches = sm; 1330 } else { 1331 /* if found send UPDATE else send ADD */ 1332 syncprov_qresp( opc, ss, 1333 found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD ); 1334 } 1335 } else if ( !saveit && found ) { 1336 /* send DELETE */ 1337 syncprov_qresp( opc, ss, LDAP_SYNC_DELETE ); 1338 } else if ( !saveit ) { 1339 syncprov_qresp( opc, ss, LDAP_SYNC_NEW_COOKIE ); 1340 } 1341 if ( !saveit && found ) { 1342 /* Decrement s_inuse, was incremented when called 1343 * with saveit == TRUE 1344 */ 1345 syncprov_free_syncop( ss ); 1346 } 1347 } 1348 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 1349 1350 if ( op->o_tag != LDAP_REQ_ADD && e ) { 1351 if ( !SLAP_ISOVERLAY( op->o_bd )) { 1352 op->o_bd = &db; 1353 } 1354 if ( saveit ) 1355 overlay_entry_release_ov( op, e, 0, on ); 1356 op->o_bd = b0; 1357 } 1358 if ( opc->se && !saveit ) { 1359 if ( !dec_mutexint( opc->se->e_private )) { 1360 opc->se->e_private = NULL; 1361 entry_free( opc->se ); 1362 opc->se = NULL; 1363 } 1364 } 1365 if ( freefdn ) { 1366 op->o_tmpfree( fc.fdn->bv_val, op->o_tmpmemctx ); 1367 } 1368 op->o_bd = b0; 1369} 1370 1371static int 1372syncprov_op_cleanup( Operation *op, SlapReply *rs ) 1373{ 1374 slap_callback *cb = op->o_callback; 1375 opcookie *opc = cb->sc_private; 1376 slap_overinst *on = opc->son; 1377 syncprov_info_t *si = on->on_bi.bi_private; 1378 syncmatches *sm, *snext; 1379 modtarget *mt, mtdummy; 1380 1381 for (sm = opc->smatches; sm; sm=snext) { 1382 snext = sm->sm_next; 1383 syncprov_free_syncop( sm->sm_op ); 1384 op->o_tmpfree( sm, op->o_tmpmemctx ); 1385 } 1386 1387 /* Remove op from lock table */ 1388 mt = opc->smt; 1389 if ( mt ) { 1390 ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); 1391 mt->mt_mods = mt->mt_mods->mi_next; 1392 /* If there are more, promote the next one */ 1393 if ( mt->mt_mods ) { 1394 mt->mt_op = mt->mt_mods->mi_op; 1395 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 1396 } else { 1397 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 1398 ldap_pvt_thread_mutex_lock( &si->si_mods_mutex ); 1399 avl_delete( &si->si_mods, mt, sp_avl_cmp ); 1400 ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); 1401 ldap_pvt_thread_mutex_destroy( &mt->mt_mutex ); 1402 ch_free( mt ); 1403 } 1404 } 1405 if ( !BER_BVISNULL( &opc->suuid )) 1406 op->o_tmpfree( opc->suuid.bv_val, op->o_tmpmemctx ); 1407 if ( !BER_BVISNULL( &opc->sndn )) 1408 op->o_tmpfree( opc->sndn.bv_val, op->o_tmpmemctx ); 1409 if ( !BER_BVISNULL( &opc->sdn )) 1410 op->o_tmpfree( opc->sdn.bv_val, op->o_tmpmemctx ); 1411 op->o_callback = cb->sc_next; 1412 op->o_tmpfree(cb, op->o_tmpmemctx); 1413 1414 return 0; 1415} 1416 1417static void 1418syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on ) 1419{ 1420 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 1421 Modifications mod; 1422 Operation opm; 1423 SlapReply rsm = { 0 }; 1424 slap_callback cb = {0}; 1425 BackendDB be; 1426 BackendInfo *bi; 1427 1428#ifdef CHECK_CSN 1429 Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax; 1430 1431 int i; 1432 for ( i=0; i<si->si_numcsns; i++ ) { 1433 assert( !syn->ssyn_validate( syn, si->si_ctxcsn+i )); 1434 } 1435#endif 1436 mod.sml_numvals = si->si_numcsns; 1437 mod.sml_values = si->si_ctxcsn; 1438 mod.sml_nvalues = NULL; 1439 mod.sml_desc = slap_schema.si_ad_contextCSN; 1440 mod.sml_op = LDAP_MOD_REPLACE; 1441 mod.sml_flags = SLAP_MOD_INTERNAL; 1442 mod.sml_next = NULL; 1443 1444 cb.sc_response = slap_null_cb; 1445 opm = *op; 1446 opm.o_tag = LDAP_REQ_MODIFY; 1447 opm.o_callback = &cb; 1448 opm.orm_modlist = &mod; 1449 opm.orm_no_opattrs = 1; 1450 if ( SLAP_GLUE_SUBORDINATE( op->o_bd )) { 1451 be = *on->on_info->oi_origdb; 1452 opm.o_bd = &be; 1453 } 1454 opm.o_req_dn = si->si_contextdn; 1455 opm.o_req_ndn = si->si_contextdn; 1456 bi = opm.o_bd->bd_info; 1457 opm.o_bd->bd_info = on->on_info->oi_orig; 1458 opm.o_managedsait = SLAP_CONTROL_NONCRITICAL; 1459 opm.o_no_schema_check = 1; 1460 opm.o_bd->be_modify( &opm, &rsm ); 1461 1462 if ( rsm.sr_err == LDAP_NO_SUCH_OBJECT && 1463 SLAP_SYNC_SUBENTRY( opm.o_bd )) { 1464 const char *text; 1465 char txtbuf[SLAP_TEXT_BUFLEN]; 1466 size_t textlen = sizeof txtbuf; 1467 Entry *e = slap_create_context_csn_entry( opm.o_bd, NULL ); 1468 slap_mods2entry( &mod, &e, 0, 1, &text, txtbuf, textlen); 1469 opm.ora_e = e; 1470 opm.o_bd->be_add( &opm, &rsm ); 1471 if ( e == opm.ora_e ) 1472 be_entry_release_w( &opm, opm.ora_e ); 1473 } 1474 opm.o_bd->bd_info = bi; 1475 1476 if ( mod.sml_next != NULL ) { 1477 slap_mods_free( mod.sml_next, 1 ); 1478 } 1479#ifdef CHECK_CSN 1480 for ( i=0; i<si->si_numcsns; i++ ) { 1481 assert( !syn->ssyn_validate( syn, si->si_ctxcsn+i )); 1482 } 1483#endif 1484} 1485 1486static void 1487syncprov_add_slog( Operation *op ) 1488{ 1489 opcookie *opc = op->o_callback->sc_private; 1490 slap_overinst *on = opc->son; 1491 syncprov_info_t *si = on->on_bi.bi_private; 1492 sessionlog *sl; 1493 slog_entry *se; 1494 1495 sl = si->si_logs; 1496 { 1497 /* Allocate a record. UUIDs are not NUL-terminated. */ 1498 se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + 1499 op->o_csn.bv_len + 1 ); 1500 se->se_next = NULL; 1501 se->se_tag = op->o_tag; 1502 1503 se->se_uuid.bv_val = (char *)(&se[1]); 1504 AC_MEMCPY( se->se_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len ); 1505 se->se_uuid.bv_len = opc->suuid.bv_len; 1506 1507 se->se_csn.bv_val = se->se_uuid.bv_val + opc->suuid.bv_len; 1508 AC_MEMCPY( se->se_csn.bv_val, op->o_csn.bv_val, op->o_csn.bv_len ); 1509 se->se_csn.bv_val[op->o_csn.bv_len] = '\0'; 1510 se->se_csn.bv_len = op->o_csn.bv_len; 1511 se->se_sid = slap_parse_csn_sid( &se->se_csn ); 1512 1513 ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); 1514 if ( sl->sl_head ) { 1515 sl->sl_tail->se_next = se; 1516 } else { 1517 sl->sl_head = se; 1518 } 1519 sl->sl_tail = se; 1520 sl->sl_num++; 1521 while ( sl->sl_num > sl->sl_size ) { 1522 se = sl->sl_head; 1523 sl->sl_head = se->se_next; 1524 strcpy( sl->sl_mincsn.bv_val, se->se_csn.bv_val ); 1525 sl->sl_mincsn.bv_len = se->se_csn.bv_len; 1526 ch_free( se ); 1527 sl->sl_num--; 1528 } 1529 ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); 1530 } 1531} 1532 1533/* Just set a flag if we found the matching entry */ 1534static int 1535playlog_cb( Operation *op, SlapReply *rs ) 1536{ 1537 if ( rs->sr_type == REP_SEARCH ) { 1538 op->o_callback->sc_private = (void *)1; 1539 } 1540 return rs->sr_err; 1541} 1542 1543/* enter with sl->sl_mutex locked, release before returning */ 1544static void 1545syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl, 1546 sync_control *srs, BerVarray ctxcsn, int numcsns, int *sids ) 1547{ 1548 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 1549 slog_entry *se; 1550 int i, j, ndel, num, nmods, mmods; 1551 char cbuf[LDAP_PVT_CSNSTR_BUFSIZE]; 1552 BerVarray uuids; 1553 struct berval delcsn[2]; 1554 1555 if ( !sl->sl_num ) { 1556 ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); 1557 return; 1558 } 1559 1560 num = sl->sl_num; 1561 i = 0; 1562 nmods = 0; 1563 1564 uuids = op->o_tmpalloc( (num+1) * sizeof( struct berval ) + 1565 num * UUID_LEN, op->o_tmpmemctx ); 1566 uuids[0].bv_val = (char *)(uuids + num + 1); 1567 1568 delcsn[0].bv_len = 0; 1569 delcsn[0].bv_val = cbuf; 1570 BER_BVZERO(&delcsn[1]); 1571 1572 /* Make a copy of the relevant UUIDs. Put the Deletes up front 1573 * and everything else at the end. Do this first so we can 1574 * unlock the list mutex. 1575 */ 1576 Debug( LDAP_DEBUG_SYNC, "srs csn %s\n", 1577 srs->sr_state.ctxcsn[0].bv_val, 0, 0 ); 1578 for ( se=sl->sl_head; se; se=se->se_next ) { 1579 int k; 1580 Debug( LDAP_DEBUG_SYNC, "log csn %s\n", se->se_csn.bv_val, 0, 0 ); 1581 ndel = 1; 1582 for ( k=0; k<srs->sr_state.numcsns; k++ ) { 1583 if ( se->se_sid == srs->sr_state.sids[k] ) { 1584 ndel = ber_bvcmp( &se->se_csn, &srs->sr_state.ctxcsn[k] ); 1585 break; 1586 } 1587 } 1588 if ( ndel <= 0 ) { 1589 Debug( LDAP_DEBUG_SYNC, "cmp %d, too old\n", ndel, 0, 0 ); 1590 continue; 1591 } 1592 ndel = 0; 1593 for ( k=0; k<numcsns; k++ ) { 1594 if ( se->se_sid == sids[k] ) { 1595 ndel = ber_bvcmp( &se->se_csn, &ctxcsn[k] ); 1596 break; 1597 } 1598 } 1599 if ( ndel > 0 ) { 1600 Debug( LDAP_DEBUG_SYNC, "cmp %d, too new\n", ndel, 0, 0 ); 1601 break; 1602 } 1603 if ( se->se_tag == LDAP_REQ_DELETE ) { 1604 j = i; 1605 i++; 1606 AC_MEMCPY( cbuf, se->se_csn.bv_val, se->se_csn.bv_len ); 1607 delcsn[0].bv_len = se->se_csn.bv_len; 1608 delcsn[0].bv_val[delcsn[0].bv_len] = '\0'; 1609 } else { 1610 nmods++; 1611 j = num - nmods; 1612 } 1613 uuids[j].bv_val = uuids[0].bv_val + (j * UUID_LEN); 1614 AC_MEMCPY(uuids[j].bv_val, se->se_uuid.bv_val, UUID_LEN); 1615 uuids[j].bv_len = UUID_LEN; 1616 } 1617 ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); 1618 1619 ndel = i; 1620 1621 /* Zero out unused slots */ 1622 for ( i=ndel; i < num - nmods; i++ ) 1623 uuids[i].bv_len = 0; 1624 1625 /* Mods must be validated to see if they belong in this delete set. 1626 */ 1627 1628 mmods = nmods; 1629 /* Strip any duplicates */ 1630 for ( i=0; i<nmods; i++ ) { 1631 for ( j=0; j<ndel; j++ ) { 1632 if ( bvmatch( &uuids[j], &uuids[num - 1 - i] )) { 1633 uuids[num - 1 - i].bv_len = 0; 1634 mmods --; 1635 break; 1636 } 1637 } 1638 if ( uuids[num - 1 - i].bv_len == 0 ) continue; 1639 for ( j=0; j<i; j++ ) { 1640 if ( bvmatch( &uuids[num - 1 - j], &uuids[num - 1 - i] )) { 1641 uuids[num - 1 - i].bv_len = 0; 1642 mmods --; 1643 break; 1644 } 1645 } 1646 } 1647 1648 if ( mmods ) { 1649 Operation fop; 1650 SlapReply frs = { REP_RESULT }; 1651 int rc; 1652 Filter mf, af; 1653 AttributeAssertion eq = ATTRIBUTEASSERTION_INIT; 1654 slap_callback cb = {0}; 1655 1656 fop = *op; 1657 1658 fop.o_sync_mode = 0; 1659 fop.o_callback = &cb; 1660 fop.ors_limit = NULL; 1661 fop.ors_tlimit = SLAP_NO_LIMIT; 1662 fop.ors_attrs = slap_anlist_all_attributes; 1663 fop.ors_attrsonly = 0; 1664 fop.o_managedsait = SLAP_CONTROL_CRITICAL; 1665 1666 af.f_choice = LDAP_FILTER_AND; 1667 af.f_next = NULL; 1668 af.f_and = &mf; 1669 mf.f_choice = LDAP_FILTER_EQUALITY; 1670 mf.f_ava = &eq; 1671 mf.f_av_desc = slap_schema.si_ad_entryUUID; 1672 mf.f_next = fop.ors_filter; 1673 1674 fop.ors_filter = ⁡ 1675 1676 cb.sc_response = playlog_cb; 1677 fop.o_bd->bd_info = (BackendInfo *)on->on_info; 1678 1679 for ( i=ndel; i<num; i++ ) { 1680 if ( uuids[i].bv_len == 0 ) continue; 1681 1682 mf.f_av_value = uuids[i]; 1683 cb.sc_private = NULL; 1684 fop.ors_slimit = 1; 1685 frs.sr_nentries = 0; 1686 rc = fop.o_bd->be_search( &fop, &frs ); 1687 1688 /* If entry was not found, add to delete list */ 1689 if ( !cb.sc_private ) { 1690 uuids[ndel++] = uuids[i]; 1691 } 1692 } 1693 fop.o_bd->bd_info = (BackendInfo *)on; 1694 } 1695 if ( ndel ) { 1696 struct berval cookie; 1697 1698 if ( delcsn[0].bv_len ) { 1699 slap_compose_sync_cookie( op, &cookie, delcsn, srs->sr_state.rid, 1700 slap_serverID ? slap_serverID : -1 ); 1701 1702 Debug( LDAP_DEBUG_SYNC, "syncprov_playlog: cookie=%s\n", cookie.bv_val, 0, 0 ); 1703 } 1704 1705 uuids[ndel].bv_val = NULL; 1706 syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, 1707 delcsn[0].bv_len ? &cookie : NULL, 0, uuids, 1 ); 1708 if ( delcsn[0].bv_len ) { 1709 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); 1710 } 1711 } 1712 op->o_tmpfree( uuids, op->o_tmpmemctx ); 1713} 1714 1715static int 1716syncprov_op_response( Operation *op, SlapReply *rs ) 1717{ 1718 opcookie *opc = op->o_callback->sc_private; 1719 slap_overinst *on = opc->son; 1720 syncprov_info_t *si = on->on_bi.bi_private; 1721 syncmatches *sm; 1722 1723 if ( rs->sr_err == LDAP_SUCCESS ) 1724 { 1725 struct berval maxcsn; 1726 char cbuf[LDAP_PVT_CSNSTR_BUFSIZE]; 1727 int do_check = 0, have_psearches, foundit, csn_changed = 0; 1728 1729 ldap_pvt_thread_mutex_lock( &si->si_resp_mutex ); 1730 1731 /* Update our context CSN */ 1732 cbuf[0] = '\0'; 1733 maxcsn.bv_val = cbuf; 1734 maxcsn.bv_len = sizeof(cbuf); 1735 ldap_pvt_thread_rdwr_wlock( &si->si_csn_rwlock ); 1736 1737 slap_get_commit_csn( op, &maxcsn, &foundit ); 1738 if ( BER_BVISEMPTY( &maxcsn ) && SLAP_GLUE_SUBORDINATE( op->o_bd )) { 1739 /* syncrepl queues the CSN values in the db where 1740 * it is configured , not where the changes are made. 1741 * So look for a value in the glue db if we didn't 1742 * find any in this db. 1743 */ 1744 BackendDB *be = op->o_bd; 1745 op->o_bd = select_backend( &be->be_nsuffix[0], 1); 1746 maxcsn.bv_val = cbuf; 1747 maxcsn.bv_len = sizeof(cbuf); 1748 slap_get_commit_csn( op, &maxcsn, &foundit ); 1749 op->o_bd = be; 1750 } 1751 if ( !BER_BVISEMPTY( &maxcsn ) ) { 1752 int i, sid; 1753#ifdef CHECK_CSN 1754 Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax; 1755 assert( !syn->ssyn_validate( syn, &maxcsn )); 1756#endif 1757 sid = slap_parse_csn_sid( &maxcsn ); 1758 for ( i=0; i<si->si_numcsns; i++ ) { 1759 if ( sid == si->si_sids[i] ) { 1760 if ( ber_bvcmp( &maxcsn, &si->si_ctxcsn[i] ) > 0 ) { 1761 ber_bvreplace( &si->si_ctxcsn[i], &maxcsn ); 1762 csn_changed = 1; 1763 } 1764 break; 1765 } 1766 } 1767 /* It's a new SID for us */ 1768 if ( i == si->si_numcsns ) { 1769 value_add_one( &si->si_ctxcsn, &maxcsn ); 1770 csn_changed = 1; 1771 si->si_numcsns++; 1772 si->si_sids = ch_realloc( si->si_sids, si->si_numcsns * 1773 sizeof(int)); 1774 si->si_sids[i] = sid; 1775 } 1776 } 1777 1778 /* Don't do any processing for consumer contextCSN updates */ 1779 if ( op->o_dont_replicate ) { 1780 if ( op->o_tag == LDAP_REQ_MODIFY && 1781 op->orm_modlist->sml_op == LDAP_MOD_REPLACE && 1782 op->orm_modlist->sml_desc == slap_schema.si_ad_contextCSN ) { 1783 /* Catch contextCSN updates from syncrepl. We have to look at 1784 * all the attribute values, as there may be more than one csn 1785 * that changed, and only one can be passed in the csn queue. 1786 */ 1787 Modifications *mod = op->orm_modlist; 1788 int i, j, sid; 1789 1790 for ( i=0; i<mod->sml_numvals; i++ ) { 1791 sid = slap_parse_csn_sid( &mod->sml_values[i] ); 1792 for ( j=0; j<si->si_numcsns; j++ ) { 1793 if ( sid == si->si_sids[j] ) { 1794 if ( ber_bvcmp( &mod->sml_values[i], &si->si_ctxcsn[j] ) > 0 ) { 1795 ber_bvreplace( &si->si_ctxcsn[j], &mod->sml_values[i] ); 1796 csn_changed = 1; 1797 } 1798 break; 1799 } 1800 } 1801 1802 if ( j == si->si_numcsns ) { 1803 value_add_one( &si->si_ctxcsn, &mod->sml_values[i] ); 1804 si->si_numcsns++; 1805 si->si_sids = ch_realloc( si->si_sids, si->si_numcsns * 1806 sizeof(int)); 1807 si->si_sids[j] = sid; 1808 csn_changed = 1; 1809 } 1810 } 1811 ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); 1812 1813 if ( csn_changed ) { 1814 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 1815 have_psearches = ( si->si_ops != NULL ); 1816 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 1817 1818 if ( have_psearches ) { 1819 for ( sm = opc->smatches; sm; sm=sm->sm_next ) { 1820 if ( sm->sm_op->s_op->o_abandon ) 1821 continue; 1822 syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_NEW_COOKIE ); 1823 } 1824 } 1825 } 1826 } else { 1827 ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); 1828 } 1829 goto leave; 1830 } 1831 1832 si->si_numops++; 1833 if ( si->si_chkops || si->si_chktime ) { 1834 /* Never checkpoint adding the context entry, 1835 * it will deadlock 1836 */ 1837 if ( op->o_tag != LDAP_REQ_ADD || 1838 !dn_match( &op->o_req_ndn, &si->si_contextdn )) { 1839 if ( si->si_chkops && si->si_numops >= si->si_chkops ) { 1840 do_check = 1; 1841 si->si_numops = 0; 1842 } 1843 if ( si->si_chktime && 1844 (op->o_time - si->si_chklast >= si->si_chktime )) { 1845 if ( si->si_chklast ) { 1846 do_check = 1; 1847 si->si_chklast = op->o_time; 1848 } else { 1849 si->si_chklast = 1; 1850 } 1851 } 1852 } 1853 } 1854 ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); 1855 1856 if ( do_check ) { 1857 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); 1858 syncprov_checkpoint( op, rs, on ); 1859 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); 1860 } 1861 1862 /* only update consumer ctx if this is a newer csn */ 1863 if ( csn_changed ) { 1864 opc->sctxcsn = maxcsn; 1865 } 1866 1867 /* Handle any persistent searches */ 1868 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 1869 have_psearches = ( si->si_ops != NULL ); 1870 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 1871 if ( have_psearches ) { 1872 switch(op->o_tag) { 1873 case LDAP_REQ_ADD: 1874 case LDAP_REQ_MODIFY: 1875 case LDAP_REQ_MODRDN: 1876 case LDAP_REQ_EXTENDED: 1877 syncprov_matchops( op, opc, 0 ); 1878 break; 1879 case LDAP_REQ_DELETE: 1880 /* for each match in opc->smatches: 1881 * send DELETE msg 1882 */ 1883 for ( sm = opc->smatches; sm; sm=sm->sm_next ) { 1884 if ( sm->sm_op->s_op->o_abandon ) 1885 continue; 1886 syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_DELETE ); 1887 } 1888 break; 1889 } 1890 } 1891 1892 /* Add any log records */ 1893 if ( si->si_logs && op->o_tag != LDAP_REQ_ADD ) { 1894 syncprov_add_slog( op ); 1895 } 1896leave: ldap_pvt_thread_mutex_unlock( &si->si_resp_mutex ); 1897 } 1898 return SLAP_CB_CONTINUE; 1899} 1900 1901/* We don't use a subentry to store the context CSN any more. 1902 * We expose the current context CSN as an operational attribute 1903 * of the suffix entry. 1904 */ 1905static int 1906syncprov_op_compare( Operation *op, SlapReply *rs ) 1907{ 1908 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 1909 syncprov_info_t *si = on->on_bi.bi_private; 1910 int rc = SLAP_CB_CONTINUE; 1911 1912 if ( dn_match( &op->o_req_ndn, &si->si_contextdn ) && 1913 op->oq_compare.rs_ava->aa_desc == slap_schema.si_ad_contextCSN ) 1914 { 1915 Entry e = {0}; 1916 Attribute a = {0}; 1917 1918 e.e_name = si->si_contextdn; 1919 e.e_nname = si->si_contextdn; 1920 e.e_attrs = &a; 1921 1922 a.a_desc = slap_schema.si_ad_contextCSN; 1923 1924 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); 1925 1926 a.a_vals = si->si_ctxcsn; 1927 a.a_nvals = a.a_vals; 1928 a.a_numvals = si->si_numcsns; 1929 1930 rs->sr_err = access_allowed( op, &e, op->oq_compare.rs_ava->aa_desc, 1931 &op->oq_compare.rs_ava->aa_value, ACL_COMPARE, NULL ); 1932 if ( ! rs->sr_err ) { 1933 rs->sr_err = LDAP_INSUFFICIENT_ACCESS; 1934 goto return_results; 1935 } 1936 1937 if ( get_assert( op ) && 1938 ( test_filter( op, &e, get_assertion( op ) ) != LDAP_COMPARE_TRUE ) ) 1939 { 1940 rs->sr_err = LDAP_ASSERTION_FAILED; 1941 goto return_results; 1942 } 1943 1944 1945 rs->sr_err = LDAP_COMPARE_FALSE; 1946 1947 if ( attr_valfind( &a, 1948 SLAP_MR_ATTRIBUTE_VALUE_NORMALIZED_MATCH | 1949 SLAP_MR_ASSERTED_VALUE_NORMALIZED_MATCH, 1950 &op->oq_compare.rs_ava->aa_value, NULL, op->o_tmpmemctx ) == 0 ) 1951 { 1952 rs->sr_err = LDAP_COMPARE_TRUE; 1953 } 1954 1955return_results:; 1956 1957 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); 1958 1959 send_ldap_result( op, rs ); 1960 1961 if( rs->sr_err == LDAP_COMPARE_FALSE || rs->sr_err == LDAP_COMPARE_TRUE ) { 1962 rs->sr_err = LDAP_SUCCESS; 1963 } 1964 rc = rs->sr_err; 1965 } 1966 1967 return rc; 1968} 1969 1970static int 1971syncprov_op_mod( Operation *op, SlapReply *rs ) 1972{ 1973 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 1974 syncprov_info_t *si = on->on_bi.bi_private; 1975 slap_callback *cb; 1976 opcookie *opc; 1977 int have_psearches, cbsize; 1978 1979 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 1980 have_psearches = ( si->si_ops != NULL ); 1981 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 1982 1983 cbsize = sizeof(slap_callback) + sizeof(opcookie) + 1984 (have_psearches ? sizeof(modinst) : 0 ); 1985 1986 cb = op->o_tmpcalloc(1, cbsize, op->o_tmpmemctx); 1987 opc = (opcookie *)(cb+1); 1988 opc->son = on; 1989 cb->sc_response = syncprov_op_response; 1990 cb->sc_cleanup = syncprov_op_cleanup; 1991 cb->sc_private = opc; 1992 cb->sc_next = op->o_callback; 1993 op->o_callback = cb; 1994 1995 opc->osid = -1; 1996 opc->rsid = -1; 1997 if ( op->o_csn.bv_val ) { 1998 opc->osid = slap_parse_csn_sid( &op->o_csn ); 1999 } 2000 if ( op->o_controls ) { 2001 struct sync_cookie *scook = 2002 op->o_controls[slap_cids.sc_LDAPsync]; 2003 if ( scook ) 2004 opc->rsid = scook->sid; 2005 } 2006 2007 /* If there are active persistent searches, lock this operation. 2008 * See seqmod.c for the locking logic on its own. 2009 */ 2010 if ( have_psearches ) { 2011 modtarget *mt, mtdummy; 2012 modinst *mi; 2013 2014 mi = (modinst *)(opc+1); 2015 mi->mi_op = op; 2016 2017 /* See if we're already modifying this entry... */ 2018 mtdummy.mt_op = op; 2019 ldap_pvt_thread_mutex_lock( &si->si_mods_mutex ); 2020 mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp ); 2021 if ( mt ) { 2022 ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); 2023 if ( mt->mt_mods == NULL ) { 2024 /* Cannot reuse this mt, as another thread is about 2025 * to release it in syncprov_op_cleanup. 2026 */ 2027 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 2028 mt = NULL; 2029 } 2030 } 2031 if ( mt ) { 2032 ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); 2033 mt->mt_tail->mi_next = mi; 2034 mt->mt_tail = mi; 2035 /* wait for this op to get to head of list */ 2036 while ( mt->mt_mods != mi ) { 2037 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 2038 /* FIXME: if dynamic config can delete overlays or 2039 * databases we'll have to check for cleanup here. 2040 * Currently it's not an issue because there are 2041 * no dynamic config deletes... 2042 */ 2043 if ( slapd_shutdown ) 2044 return SLAPD_ABANDON; 2045 2046 if ( !ldap_pvt_thread_pool_pausecheck( &connection_pool )) 2047 ldap_pvt_thread_yield(); 2048 ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); 2049 2050 /* clean up if the caller is giving up */ 2051 if ( op->o_abandon ) { 2052 modinst *m2; 2053 for ( m2 = mt->mt_mods; m2->mi_next != mi; 2054 m2 = m2->mi_next ); 2055 m2->mi_next = mi->mi_next; 2056 if ( mt->mt_tail == mi ) mt->mt_tail = m2; 2057 op->o_tmpfree( cb, op->o_tmpmemctx ); 2058 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 2059 return SLAPD_ABANDON; 2060 } 2061 } 2062 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 2063 } else { 2064 /* Record that we're modifying this entry now */ 2065 mt = ch_malloc( sizeof(modtarget) ); 2066 mt->mt_mods = mi; 2067 mt->mt_tail = mi; 2068 mt->mt_op = mi->mi_op; 2069 ldap_pvt_thread_mutex_init( &mt->mt_mutex ); 2070 avl_insert( &si->si_mods, mt, sp_avl_cmp, avl_dup_error ); 2071 ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); 2072 } 2073 opc->smt = mt; 2074 } 2075 2076 if (( have_psearches || si->si_logs ) && op->o_tag != LDAP_REQ_ADD ) 2077 syncprov_matchops( op, opc, 1 ); 2078 2079 return SLAP_CB_CONTINUE; 2080} 2081 2082static int 2083syncprov_op_extended( Operation *op, SlapReply *rs ) 2084{ 2085 if ( exop_is_write( op )) 2086 return syncprov_op_mod( op, rs ); 2087 2088 return SLAP_CB_CONTINUE; 2089} 2090 2091typedef struct searchstate { 2092 slap_overinst *ss_on; 2093 syncops *ss_so; 2094 BerVarray ss_ctxcsn; 2095 int *ss_sids; 2096 int ss_numcsns; 2097#define SS_PRESENT 0x01 2098#define SS_CHANGED 0x02 2099 int ss_flags; 2100} searchstate; 2101 2102static int 2103syncprov_search_cleanup( Operation *op, SlapReply *rs ) 2104{ 2105 if ( rs->sr_ctrls ) { 2106 op->o_tmpfree( rs->sr_ctrls[0], op->o_tmpmemctx ); 2107 op->o_tmpfree( rs->sr_ctrls, op->o_tmpmemctx ); 2108 rs->sr_ctrls = NULL; 2109 } 2110 return 0; 2111} 2112 2113typedef struct SyncOperationBuffer { 2114 Operation sob_op; 2115 Opheader sob_hdr; 2116 OpExtra sob_oe; 2117 AttributeName sob_extra; /* not always present */ 2118 /* Further data allocated here */ 2119} SyncOperationBuffer; 2120 2121static void 2122syncprov_detach_op( Operation *op, syncops *so, slap_overinst *on ) 2123{ 2124 SyncOperationBuffer *sopbuf2; 2125 Operation *op2; 2126 int i, alen = 0; 2127 size_t size; 2128 char *ptr; 2129 GroupAssertion *g1, *g2; 2130 2131 /* count the search attrs */ 2132 for (i=0; op->ors_attrs && !BER_BVISNULL( &op->ors_attrs[i].an_name ); i++) { 2133 alen += op->ors_attrs[i].an_name.bv_len + 1; 2134 } 2135 /* Make a new copy of the operation */ 2136 size = offsetof( SyncOperationBuffer, sob_extra ) + 2137 (i ? ( (i+1) * sizeof(AttributeName) + alen) : 0) + 2138 op->o_req_dn.bv_len + 1 + 2139 op->o_req_ndn.bv_len + 1 + 2140 op->o_ndn.bv_len + 1 + 2141 so->s_filterstr.bv_len + 1; 2142 sopbuf2 = ch_calloc( 1, size ); 2143 op2 = &sopbuf2->sob_op; 2144 op2->o_hdr = &sopbuf2->sob_hdr; 2145 LDAP_SLIST_FIRST(&op2->o_extra) = &sopbuf2->sob_oe; 2146 2147 /* Copy the fields we care about explicitly, leave the rest alone */ 2148 *op2->o_hdr = *op->o_hdr; 2149 op2->o_tag = op->o_tag; 2150 op2->o_time = op->o_time; 2151 op2->o_bd = on->on_info->oi_origdb; 2152 op2->o_request = op->o_request; 2153 op2->o_managedsait = op->o_managedsait; 2154 LDAP_SLIST_FIRST(&op2->o_extra)->oe_key = on; 2155 LDAP_SLIST_NEXT(LDAP_SLIST_FIRST(&op2->o_extra), oe_next) = NULL; 2156 2157 ptr = (char *) sopbuf2 + offsetof( SyncOperationBuffer, sob_extra ); 2158 if ( i ) { 2159 op2->ors_attrs = (AttributeName *) ptr; 2160 ptr = (char *) &op2->ors_attrs[i+1]; 2161 for (i=0; !BER_BVISNULL( &op->ors_attrs[i].an_name ); i++) { 2162 op2->ors_attrs[i] = op->ors_attrs[i]; 2163 op2->ors_attrs[i].an_name.bv_val = ptr; 2164 ptr = lutil_strcopy( ptr, op->ors_attrs[i].an_name.bv_val ) + 1; 2165 } 2166 BER_BVZERO( &op2->ors_attrs[i].an_name ); 2167 } 2168 2169 op2->o_authz = op->o_authz; 2170 op2->o_ndn.bv_val = ptr; 2171 ptr = lutil_strcopy(ptr, op->o_ndn.bv_val) + 1; 2172 op2->o_dn = op2->o_ndn; 2173 op2->o_req_dn.bv_len = op->o_req_dn.bv_len; 2174 op2->o_req_dn.bv_val = ptr; 2175 ptr = lutil_strcopy(ptr, op->o_req_dn.bv_val) + 1; 2176 op2->o_req_ndn.bv_len = op->o_req_ndn.bv_len; 2177 op2->o_req_ndn.bv_val = ptr; 2178 ptr = lutil_strcopy(ptr, op->o_req_ndn.bv_val) + 1; 2179 op2->ors_filterstr.bv_val = ptr; 2180 strcpy( ptr, so->s_filterstr.bv_val ); 2181 op2->ors_filterstr.bv_len = so->s_filterstr.bv_len; 2182 2183 /* Skip the AND/GE clause that we stuck on in front */ 2184 if ( so->s_flags & PS_FIX_FILTER ) { 2185 op2->ors_filter = op->ors_filter->f_and->f_next; 2186 so->s_flags ^= PS_FIX_FILTER; 2187 } else { 2188 op2->ors_filter = op->ors_filter; 2189 } 2190 op2->ors_filter = filter_dup( op2->ors_filter, NULL ); 2191 so->s_op = op2; 2192 2193 /* Copy any cached group ACLs individually */ 2194 op2->o_groups = NULL; 2195 for ( g1=op->o_groups; g1; g1=g1->ga_next ) { 2196 g2 = ch_malloc( sizeof(GroupAssertion) + g1->ga_len ); 2197 *g2 = *g1; 2198 strcpy( g2->ga_ndn, g1->ga_ndn ); 2199 g2->ga_next = op2->o_groups; 2200 op2->o_groups = g2; 2201 } 2202 /* Don't allow any further group caching */ 2203 op2->o_do_not_cache = 1; 2204 2205 /* Add op2 to conn so abandon will find us */ 2206 op->o_conn->c_n_ops_executing++; 2207 op->o_conn->c_n_ops_completed--; 2208 LDAP_STAILQ_INSERT_TAIL( &op->o_conn->c_ops, op2, o_next ); 2209 so->s_flags |= PS_IS_DETACHED; 2210 2211 /* Prevent anyone else from trying to send a result for this op */ 2212 op->o_abandon = 1; 2213} 2214 2215static int 2216syncprov_search_response( Operation *op, SlapReply *rs ) 2217{ 2218 searchstate *ss = op->o_callback->sc_private; 2219 slap_overinst *on = ss->ss_on; 2220 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 2221 sync_control *srs = op->o_controls[slap_cids.sc_LDAPsync]; 2222 2223 if ( rs->sr_type == REP_SEARCH || rs->sr_type == REP_SEARCHREF ) { 2224 Attribute *a; 2225 /* If we got a referral without a referral object, there's 2226 * something missing that we cannot replicate. Just ignore it. 2227 * The consumer will abort because we didn't send the expected 2228 * control. 2229 */ 2230 if ( !rs->sr_entry ) { 2231 assert( rs->sr_entry != NULL ); 2232 Debug( LDAP_DEBUG_ANY, "bogus referral in context\n",0,0,0 ); 2233 return SLAP_CB_CONTINUE; 2234 } 2235 a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryCSN ); 2236 if ( a == NULL && rs->sr_operational_attrs != NULL ) { 2237 a = attr_find( rs->sr_operational_attrs, slap_schema.si_ad_entryCSN ); 2238 } 2239 if ( a ) { 2240 int i, sid; 2241 sid = slap_parse_csn_sid( &a->a_nvals[0] ); 2242 2243 /* Don't send changed entries back to the originator */ 2244 if ( sid == srs->sr_state.sid && srs->sr_state.numcsns ) { 2245 Debug( LDAP_DEBUG_SYNC, 2246 "Entry %s changed by peer, ignored\n", 2247 rs->sr_entry->e_name.bv_val, 0, 0 ); 2248 return LDAP_SUCCESS; 2249 } 2250 2251 /* If not a persistent search */ 2252 if ( !ss->ss_so ) { 2253 /* Make sure entry is less than the snapshot'd contextCSN */ 2254 for ( i=0; i<ss->ss_numcsns; i++ ) { 2255 if ( sid == ss->ss_sids[i] && ber_bvcmp( &a->a_nvals[0], 2256 &ss->ss_ctxcsn[i] ) > 0 ) { 2257 Debug( LDAP_DEBUG_SYNC, 2258 "Entry %s CSN %s greater than snapshot %s\n", 2259 rs->sr_entry->e_name.bv_val, 2260 a->a_nvals[0].bv_val, 2261 ss->ss_ctxcsn[i].bv_val ); 2262 return LDAP_SUCCESS; 2263 } 2264 } 2265 } 2266 2267 /* Don't send old entries twice */ 2268 if ( srs->sr_state.ctxcsn ) { 2269 for ( i=0; i<srs->sr_state.numcsns; i++ ) { 2270 if ( sid == srs->sr_state.sids[i] && 2271 ber_bvcmp( &a->a_nvals[0], 2272 &srs->sr_state.ctxcsn[i] )<= 0 ) { 2273 Debug( LDAP_DEBUG_SYNC, 2274 "Entry %s CSN %s older or equal to ctx %s\n", 2275 rs->sr_entry->e_name.bv_val, 2276 a->a_nvals[0].bv_val, 2277 srs->sr_state.ctxcsn[i].bv_val ); 2278 return LDAP_SUCCESS; 2279 } 2280 } 2281 } 2282 } 2283 rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2, 2284 op->o_tmpmemctx ); 2285 rs->sr_ctrls[1] = NULL; 2286 /* If we're in delta-sync mode, always send a cookie */ 2287 if ( si->si_nopres && si->si_usehint && a ) { 2288 struct berval cookie; 2289 slap_compose_sync_cookie( op, &cookie, a->a_nvals, srs->sr_state.rid, slap_serverID ? slap_serverID : -1 ); 2290 rs->sr_err = syncprov_state_ctrl( op, rs, rs->sr_entry, 2291 LDAP_SYNC_ADD, rs->sr_ctrls, 0, 1, &cookie ); 2292 } else { 2293 rs->sr_err = syncprov_state_ctrl( op, rs, rs->sr_entry, 2294 LDAP_SYNC_ADD, rs->sr_ctrls, 0, 0, NULL ); 2295 } 2296 } else if ( rs->sr_type == REP_RESULT && rs->sr_err == LDAP_SUCCESS ) { 2297 struct berval cookie = BER_BVNULL; 2298 2299 if ( ( ss->ss_flags & SS_CHANGED ) && 2300 ss->ss_ctxcsn && !BER_BVISNULL( &ss->ss_ctxcsn[0] )) { 2301 slap_compose_sync_cookie( op, &cookie, ss->ss_ctxcsn, 2302 srs->sr_state.rid, slap_serverID ? slap_serverID : -1 ); 2303 2304 Debug( LDAP_DEBUG_SYNC, "syncprov_search_response: cookie=%s\n", cookie.bv_val, 0, 0 ); 2305 } 2306 2307 /* Is this a regular refresh? 2308 * Note: refresh never gets here if there were no changes 2309 */ 2310 if ( !ss->ss_so ) { 2311 rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2, 2312 op->o_tmpmemctx ); 2313 rs->sr_ctrls[1] = NULL; 2314 rs->sr_err = syncprov_done_ctrl( op, rs, rs->sr_ctrls, 2315 0, 1, &cookie, ( ss->ss_flags & SS_PRESENT ) ? LDAP_SYNC_REFRESH_PRESENTS : 2316 LDAP_SYNC_REFRESH_DELETES ); 2317 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); 2318 } else { 2319 /* It's RefreshAndPersist, transition to Persist phase */ 2320 syncprov_sendinfo( op, rs, ( ss->ss_flags & SS_PRESENT ) ? 2321 LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE, 2322 ( ss->ss_flags & SS_CHANGED ) ? &cookie : NULL, 2323 1, NULL, 0 ); 2324 if ( !BER_BVISNULL( &cookie )) 2325 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); 2326 2327 /* Detach this Op from frontend control */ 2328 ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex ); 2329 2330 /* But not if this connection was closed along the way */ 2331 if ( op->o_abandon ) { 2332 ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex ); 2333 /* syncprov_ab_cleanup will free this syncop */ 2334 return SLAPD_ABANDON; 2335 2336 } else { 2337 ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex ); 2338 /* Turn off the refreshing flag */ 2339 ss->ss_so->s_flags ^= PS_IS_REFRESHING; 2340 2341 syncprov_detach_op( op, ss->ss_so, on ); 2342 2343 ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex ); 2344 2345 /* If there are queued responses, fire them off */ 2346 if ( ss->ss_so->s_res ) 2347 syncprov_qstart( ss->ss_so ); 2348 ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex ); 2349 } 2350 2351 return LDAP_SUCCESS; 2352 } 2353 } 2354 2355 return SLAP_CB_CONTINUE; 2356} 2357 2358static int 2359syncprov_op_search( Operation *op, SlapReply *rs ) 2360{ 2361 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 2362 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 2363 slap_callback *cb; 2364 int gotstate = 0, changed = 0, do_present = 0; 2365 syncops *sop = NULL; 2366 searchstate *ss; 2367 sync_control *srs; 2368 BerVarray ctxcsn; 2369 int i, *sids, numcsns; 2370 struct berval mincsn; 2371 2372 if ( !(op->o_sync_mode & SLAP_SYNC_REFRESH) ) return SLAP_CB_CONTINUE; 2373 2374 if ( op->ors_deref & LDAP_DEREF_SEARCHING ) { 2375 send_ldap_error( op, rs, LDAP_PROTOCOL_ERROR, "illegal value for derefAliases" ); 2376 return rs->sr_err; 2377 } 2378 2379 srs = op->o_controls[slap_cids.sc_LDAPsync]; 2380 2381 /* If this is a persistent search, set it up right away */ 2382 if ( op->o_sync_mode & SLAP_SYNC_PERSIST ) { 2383 syncops so = {0}; 2384 fbase_cookie fc; 2385 opcookie opc; 2386 slap_callback sc; 2387 2388 fc.fss = &so; 2389 fc.fbase = 0; 2390 so.s_eid = NOID; 2391 so.s_op = op; 2392 so.s_flags = PS_IS_REFRESHING | PS_FIND_BASE; 2393 /* syncprov_findbase expects to be called as a callback... */ 2394 sc.sc_private = &opc; 2395 opc.son = on; 2396 ldap_pvt_thread_mutex_init( &so.s_mutex ); 2397 cb = op->o_callback; 2398 op->o_callback = ≻ 2399 rs->sr_err = syncprov_findbase( op, &fc ); 2400 op->o_callback = cb; 2401 ldap_pvt_thread_mutex_destroy( &so.s_mutex ); 2402 2403 if ( rs->sr_err != LDAP_SUCCESS ) { 2404 send_ldap_result( op, rs ); 2405 return rs->sr_err; 2406 } 2407 sop = ch_malloc( sizeof( syncops )); 2408 *sop = so; 2409 ldap_pvt_thread_mutex_init( &sop->s_mutex ); 2410 sop->s_rid = srs->sr_state.rid; 2411 sop->s_sid = srs->sr_state.sid; 2412 sop->s_inuse = 1; 2413 2414 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 2415 sop->s_next = si->si_ops; 2416 si->si_ops = sop; 2417 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 2418 } 2419 2420 /* snapshot the ctxcsn */ 2421 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); 2422 numcsns = si->si_numcsns; 2423 if ( numcsns ) { 2424 ber_bvarray_dup_x( &ctxcsn, si->si_ctxcsn, op->o_tmpmemctx ); 2425 sids = op->o_tmpalloc( numcsns * sizeof(int), op->o_tmpmemctx ); 2426 for ( i=0; i<numcsns; i++ ) 2427 sids[i] = si->si_sids[i]; 2428 } else { 2429 ctxcsn = NULL; 2430 sids = NULL; 2431 } 2432 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); 2433 2434 /* If we have a cookie, handle the PRESENT lookups */ 2435 if ( srs->sr_state.ctxcsn ) { 2436 sessionlog *sl; 2437 int i, j; 2438 2439 /* If we don't have any CSN of our own yet, pretend nothing 2440 * has changed. 2441 */ 2442 if ( !numcsns ) 2443 goto no_change; 2444 2445 if ( !si->si_nopres ) 2446 do_present = SS_PRESENT; 2447 2448 /* If there are SIDs we don't recognize in the cookie, drop them */ 2449 for (i=0; i<srs->sr_state.numcsns; ) { 2450 for (j=0; j<numcsns; j++) { 2451 if ( srs->sr_state.sids[i] == sids[j] ) { 2452 break; 2453 } 2454 } 2455 /* not found */ 2456 if ( j == numcsns ) { 2457 struct berval tmp = srs->sr_state.ctxcsn[i]; 2458 j = srs->sr_state.numcsns - 1; 2459 srs->sr_state.ctxcsn[i] = srs->sr_state.ctxcsn[j]; 2460 tmp.bv_len = 0; 2461 srs->sr_state.ctxcsn[j] = tmp; 2462 srs->sr_state.numcsns = j; 2463 srs->sr_state.sids[i] = srs->sr_state.sids[j]; 2464 continue; 2465 } 2466 i++; 2467 } 2468 2469 /* Find the smallest CSN */ 2470 mincsn = srs->sr_state.ctxcsn[0]; 2471 for ( i=1; i<srs->sr_state.numcsns; i++ ) { 2472 if ( ber_bvcmp( &mincsn, &srs->sr_state.ctxcsn[i] ) > 0 ) 2473 mincsn = srs->sr_state.ctxcsn[i]; 2474 } 2475 2476 /* If nothing has changed, shortcut it */ 2477 if ( srs->sr_state.numcsns == numcsns ) { 2478 int i, j, newer; 2479 for ( i=0; i<srs->sr_state.numcsns; i++ ) { 2480 for ( j=0; j<numcsns; j++ ) { 2481 if ( srs->sr_state.sids[i] != sids[j] ) 2482 continue; 2483 newer = ber_bvcmp( &srs->sr_state.ctxcsn[i], &ctxcsn[j] ); 2484 /* If our state is newer, tell consumer about changes */ 2485 if ( newer < 0 ) 2486 changed = SS_CHANGED; 2487 else if ( newer > 0 ) { 2488 /* our state is older, tell consumer nothing */ 2489 rs->sr_err = LDAP_SUCCESS; 2490bailout: 2491 if ( sop ) { 2492 syncops **sp = &si->si_ops; 2493 2494 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 2495 while ( *sp != sop ) 2496 sp = &(*sp)->s_next; 2497 *sp = sop->s_next; 2498 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 2499 ch_free( sop ); 2500 } 2501 rs->sr_ctrls = NULL; 2502 send_ldap_result( op, rs ); 2503 return rs->sr_err; 2504 } 2505 break; 2506 } 2507 if ( changed ) 2508 break; 2509 } 2510 if ( !changed ) { 2511 do_present = 0; 2512no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { 2513 LDAPControl *ctrls[2]; 2514 2515 ctrls[0] = NULL; 2516 ctrls[1] = NULL; 2517 syncprov_done_ctrl( op, rs, ctrls, 0, 0, 2518 NULL, LDAP_SYNC_REFRESH_DELETES ); 2519 rs->sr_ctrls = ctrls; 2520 rs->sr_err = LDAP_SUCCESS; 2521 send_ldap_result( op, rs ); 2522 rs->sr_ctrls = NULL; 2523 return rs->sr_err; 2524 } 2525 goto shortcut; 2526 } 2527 } else { 2528 /* consumer doesn't have the right number of CSNs */ 2529 changed = SS_CHANGED; 2530 } 2531 /* Do we have a sessionlog for this search? */ 2532 sl=si->si_logs; 2533 if ( sl ) { 2534 ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); 2535 /* Are there any log entries, and is the consumer state 2536 * present in the session log? 2537 */ 2538 if ( sl->sl_num > 0 && ber_bvcmp( &mincsn, &sl->sl_mincsn ) >= 0 ) { 2539 do_present = 0; 2540 /* mutex is unlocked in playlog */ 2541 syncprov_playlog( op, rs, sl, srs, ctxcsn, numcsns, sids ); 2542 } else { 2543 ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); 2544 } 2545 } 2546 /* Is the CSN still present in the database? */ 2547 if ( syncprov_findcsn( op, FIND_CSN ) != LDAP_SUCCESS ) { 2548 /* No, so a reload is required */ 2549 /* the 2.2 consumer doesn't send this hint */ 2550 if ( si->si_usehint && srs->sr_rhint == 0 ) { 2551 if ( ctxcsn ) 2552 ber_bvarray_free_x( ctxcsn, op->o_tmpmemctx ); 2553 if ( sids ) 2554 op->o_tmpfree( sids, op->o_tmpmemctx ); 2555 rs->sr_err = LDAP_SYNC_REFRESH_REQUIRED; 2556 rs->sr_text = "sync cookie is stale"; 2557 goto bailout; 2558 } 2559 if ( srs->sr_state.ctxcsn ) { 2560 ber_bvarray_free_x( srs->sr_state.ctxcsn, op->o_tmpmemctx ); 2561 srs->sr_state.ctxcsn = NULL; 2562 } 2563 if ( srs->sr_state.sids ) { 2564 slap_sl_free( srs->sr_state.sids, op->o_tmpmemctx ); 2565 srs->sr_state.sids = NULL; 2566 } 2567 srs->sr_state.numcsns = 0; 2568 } else { 2569 gotstate = 1; 2570 /* If changed and doing Present lookup, send Present UUIDs */ 2571 if ( do_present && syncprov_findcsn( op, FIND_PRESENT ) != 2572 LDAP_SUCCESS ) { 2573 if ( ctxcsn ) 2574 ber_bvarray_free_x( ctxcsn, op->o_tmpmemctx ); 2575 if ( sids ) 2576 op->o_tmpfree( sids, op->o_tmpmemctx ); 2577 goto bailout; 2578 } 2579 } 2580 } else { 2581 /* No consumer state, assume something has changed */ 2582 changed = SS_CHANGED; 2583 } 2584 2585shortcut: 2586 /* Append CSN range to search filter, save original filter 2587 * for persistent search evaluation 2588 */ 2589 if ( sop ) { 2590 sop->s_filterstr= op->ors_filterstr; 2591 } 2592 2593 /* If something changed, find the changes */ 2594 if ( gotstate && changed ) { 2595 Filter *fand, *fava; 2596 2597 fand = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx ); 2598 fand->f_choice = LDAP_FILTER_AND; 2599 fand->f_next = NULL; 2600 fava = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx ); 2601 fand->f_and = fava; 2602 fava->f_choice = LDAP_FILTER_GE; 2603 fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx ); 2604 fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN; 2605#ifdef LDAP_COMP_MATCH 2606 fava->f_ava->aa_cf = NULL; 2607#endif 2608 ber_dupbv_x( &fava->f_ava->aa_value, &mincsn, op->o_tmpmemctx ); 2609 fava->f_next = op->ors_filter; 2610 op->ors_filter = fand; 2611 filter2bv_x( op, op->ors_filter, &op->ors_filterstr ); 2612 if ( sop ) 2613 sop->s_flags |= PS_FIX_FILTER; 2614 } 2615 2616 /* Let our callback add needed info to returned entries */ 2617 cb = op->o_tmpcalloc(1, sizeof(slap_callback)+sizeof(searchstate), op->o_tmpmemctx); 2618 ss = (searchstate *)(cb+1); 2619 ss->ss_on = on; 2620 ss->ss_so = sop; 2621 ss->ss_flags = do_present | changed; 2622 ss->ss_ctxcsn = ctxcsn; 2623 ss->ss_numcsns = numcsns; 2624 ss->ss_sids = sids; 2625 cb->sc_response = syncprov_search_response; 2626 cb->sc_cleanup = syncprov_search_cleanup; 2627 cb->sc_private = ss; 2628 cb->sc_next = op->o_callback; 2629 op->o_callback = cb; 2630 2631 /* If this is a persistent search and no changes were reported during 2632 * the refresh phase, just invoke the response callback to transition 2633 * us into persist phase 2634 */ 2635 if ( !changed ) { 2636 rs->sr_err = LDAP_SUCCESS; 2637 rs->sr_nentries = 0; 2638 send_ldap_result( op, rs ); 2639 return rs->sr_err; 2640 } 2641 return SLAP_CB_CONTINUE; 2642} 2643 2644static int 2645syncprov_operational( 2646 Operation *op, 2647 SlapReply *rs ) 2648{ 2649 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 2650 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 2651 2652 /* This prevents generating unnecessarily; frontend will strip 2653 * any statically stored copy. 2654 */ 2655 if ( op->o_sync != SLAP_CONTROL_NONE ) 2656 return SLAP_CB_CONTINUE; 2657 2658 if ( rs->sr_entry && 2659 dn_match( &rs->sr_entry->e_nname, &si->si_contextdn )) { 2660 2661 if ( SLAP_OPATTRS( rs->sr_attr_flags ) || 2662 ad_inlist( slap_schema.si_ad_contextCSN, rs->sr_attrs )) { 2663 Attribute *a, **ap = NULL; 2664 2665 for ( a=rs->sr_entry->e_attrs; a; a=a->a_next ) { 2666 if ( a->a_desc == slap_schema.si_ad_contextCSN ) 2667 break; 2668 } 2669 2670 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); 2671 if ( si->si_ctxcsn ) { 2672 if ( !a ) { 2673 for ( ap = &rs->sr_operational_attrs; *ap; 2674 ap=&(*ap)->a_next ); 2675 2676 a = attr_alloc( slap_schema.si_ad_contextCSN ); 2677 *ap = a; 2678 } 2679 2680 if ( !ap ) { 2681 if ( !(rs->sr_flags & REP_ENTRY_MODIFIABLE) ) { 2682 Entry *e = entry_dup( rs->sr_entry ); 2683 if ( rs->sr_flags & REP_ENTRY_MUSTRELEASE ) { 2684 overlay_entry_release_ov( op, rs->sr_entry, 0, on ); 2685 rs->sr_flags ^= REP_ENTRY_MUSTRELEASE; 2686 } else if ( rs->sr_flags & REP_ENTRY_MUSTBEFREED ) { 2687 entry_free( rs->sr_entry ); 2688 } 2689 rs->sr_entry = e; 2690 rs->sr_flags |= 2691 REP_ENTRY_MODIFIABLE|REP_ENTRY_MUSTBEFREED; 2692 a = attr_find( rs->sr_entry->e_attrs, 2693 slap_schema.si_ad_contextCSN ); 2694 } 2695 if ( a->a_nvals != a->a_vals ) { 2696 ber_bvarray_free( a->a_nvals ); 2697 } 2698 a->a_nvals = NULL; 2699 ber_bvarray_free( a->a_vals ); 2700 a->a_vals = NULL; 2701 a->a_numvals = 0; 2702 } 2703 attr_valadd( a, si->si_ctxcsn, si->si_ctxcsn, si->si_numcsns ); 2704 } 2705 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); 2706 } 2707 } 2708 return SLAP_CB_CONTINUE; 2709} 2710 2711enum { 2712 SP_CHKPT = 1, 2713 SP_SESSL, 2714 SP_NOPRES, 2715 SP_USEHINT 2716}; 2717 2718static ConfigDriver sp_cf_gen; 2719 2720static ConfigTable spcfg[] = { 2721 { "syncprov-checkpoint", "ops> <minutes", 3, 3, 0, ARG_MAGIC|SP_CHKPT, 2722 sp_cf_gen, "( OLcfgOvAt:1.1 NAME 'olcSpCheckpoint' " 2723 "DESC 'ContextCSN checkpoint interval in ops and minutes' " 2724 "SYNTAX OMsDirectoryString SINGLE-VALUE )", NULL, NULL }, 2725 { "syncprov-sessionlog", "ops", 2, 2, 0, ARG_INT|ARG_MAGIC|SP_SESSL, 2726 sp_cf_gen, "( OLcfgOvAt:1.2 NAME 'olcSpSessionlog' " 2727 "DESC 'Session log size in ops' " 2728 "SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL }, 2729 { "syncprov-nopresent", NULL, 2, 2, 0, ARG_ON_OFF|ARG_MAGIC|SP_NOPRES, 2730 sp_cf_gen, "( OLcfgOvAt:1.3 NAME 'olcSpNoPresent' " 2731 "DESC 'Omit Present phase processing' " 2732 "SYNTAX OMsBoolean SINGLE-VALUE )", NULL, NULL }, 2733 { "syncprov-reloadhint", NULL, 2, 2, 0, ARG_ON_OFF|ARG_MAGIC|SP_USEHINT, 2734 sp_cf_gen, "( OLcfgOvAt:1.4 NAME 'olcSpReloadHint' " 2735 "DESC 'Observe Reload Hint in Request control' " 2736 "SYNTAX OMsBoolean SINGLE-VALUE )", NULL, NULL }, 2737 { NULL, NULL, 0, 0, 0, ARG_IGNORED } 2738}; 2739 2740static ConfigOCs spocs[] = { 2741 { "( OLcfgOvOc:1.1 " 2742 "NAME 'olcSyncProvConfig' " 2743 "DESC 'SyncRepl Provider configuration' " 2744 "SUP olcOverlayConfig " 2745 "MAY ( olcSpCheckpoint " 2746 "$ olcSpSessionlog " 2747 "$ olcSpNoPresent " 2748 "$ olcSpReloadHint " 2749 ") )", 2750 Cft_Overlay, spcfg }, 2751 { NULL, 0, NULL } 2752}; 2753 2754static int 2755sp_cf_gen(ConfigArgs *c) 2756{ 2757 slap_overinst *on = (slap_overinst *)c->bi; 2758 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 2759 int rc = 0; 2760 2761 if ( c->op == SLAP_CONFIG_EMIT ) { 2762 switch ( c->type ) { 2763 case SP_CHKPT: 2764 if ( si->si_chkops || si->si_chktime ) { 2765 struct berval bv; 2766 /* we assume si_chktime is a multiple of 60 2767 * because the parsed value was originally 2768 * multiplied by 60 */ 2769 bv.bv_len = snprintf( c->cr_msg, sizeof( c->cr_msg ), 2770 "%d %d", si->si_chkops, si->si_chktime/60 ); 2771 if ( bv.bv_len >= sizeof( c->cr_msg ) ) { 2772 rc = 1; 2773 } else { 2774 bv.bv_val = c->cr_msg; 2775 value_add_one( &c->rvalue_vals, &bv ); 2776 } 2777 } else { 2778 rc = 1; 2779 } 2780 break; 2781 case SP_SESSL: 2782 if ( si->si_logs ) { 2783 c->value_int = si->si_logs->sl_size; 2784 } else { 2785 rc = 1; 2786 } 2787 break; 2788 case SP_NOPRES: 2789 if ( si->si_nopres ) { 2790 c->value_int = 1; 2791 } else { 2792 rc = 1; 2793 } 2794 break; 2795 case SP_USEHINT: 2796 if ( si->si_usehint ) { 2797 c->value_int = 1; 2798 } else { 2799 rc = 1; 2800 } 2801 break; 2802 } 2803 return rc; 2804 } else if ( c->op == LDAP_MOD_DELETE ) { 2805 switch ( c->type ) { 2806 case SP_CHKPT: 2807 si->si_chkops = 0; 2808 si->si_chktime = 0; 2809 break; 2810 case SP_SESSL: 2811 if ( si->si_logs ) 2812 si->si_logs->sl_size = 0; 2813 else 2814 rc = LDAP_NO_SUCH_ATTRIBUTE; 2815 break; 2816 case SP_NOPRES: 2817 if ( si->si_nopres ) 2818 si->si_nopres = 0; 2819 else 2820 rc = LDAP_NO_SUCH_ATTRIBUTE; 2821 break; 2822 case SP_USEHINT: 2823 if ( si->si_usehint ) 2824 si->si_usehint = 0; 2825 else 2826 rc = LDAP_NO_SUCH_ATTRIBUTE; 2827 break; 2828 } 2829 return rc; 2830 } 2831 switch ( c->type ) { 2832 case SP_CHKPT: 2833 if ( lutil_atoi( &si->si_chkops, c->argv[1] ) != 0 ) { 2834 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s unable to parse checkpoint ops # \"%s\"", 2835 c->argv[0], c->argv[1] ); 2836 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 2837 "%s: %s\n", c->log, c->cr_msg, 0 ); 2838 return ARG_BAD_CONF; 2839 } 2840 if ( si->si_chkops <= 0 ) { 2841 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s invalid checkpoint ops # \"%d\"", 2842 c->argv[0], si->si_chkops ); 2843 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 2844 "%s: %s\n", c->log, c->cr_msg, 0 ); 2845 return ARG_BAD_CONF; 2846 } 2847 if ( lutil_atoi( &si->si_chktime, c->argv[2] ) != 0 ) { 2848 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s unable to parse checkpoint time \"%s\"", 2849 c->argv[0], c->argv[1] ); 2850 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 2851 "%s: %s\n", c->log, c->cr_msg, 0 ); 2852 return ARG_BAD_CONF; 2853 } 2854 if ( si->si_chktime <= 0 ) { 2855 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s invalid checkpoint time \"%d\"", 2856 c->argv[0], si->si_chkops ); 2857 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 2858 "%s: %s\n", c->log, c->cr_msg, 0 ); 2859 return ARG_BAD_CONF; 2860 } 2861 si->si_chktime *= 60; 2862 break; 2863 case SP_SESSL: { 2864 sessionlog *sl; 2865 int size = c->value_int; 2866 2867 if ( size < 0 ) { 2868 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s size %d is negative", 2869 c->argv[0], size ); 2870 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 2871 "%s: %s\n", c->log, c->cr_msg, 0 ); 2872 return ARG_BAD_CONF; 2873 } 2874 sl = si->si_logs; 2875 if ( !sl ) { 2876 sl = ch_malloc( sizeof( sessionlog ) + LDAP_PVT_CSNSTR_BUFSIZE ); 2877 sl->sl_mincsn.bv_val = (char *)(sl+1); 2878 sl->sl_mincsn.bv_len = 0; 2879 sl->sl_num = 0; 2880 sl->sl_head = sl->sl_tail = NULL; 2881 ldap_pvt_thread_mutex_init( &sl->sl_mutex ); 2882 si->si_logs = sl; 2883 } 2884 sl->sl_size = size; 2885 } 2886 break; 2887 case SP_NOPRES: 2888 si->si_nopres = c->value_int; 2889 break; 2890 case SP_USEHINT: 2891 si->si_usehint = c->value_int; 2892 break; 2893 } 2894 return rc; 2895} 2896 2897/* ITS#3456 we cannot run this search on the main thread, must use a 2898 * child thread in order to insure we have a big enough stack. 2899 */ 2900static void * 2901syncprov_db_otask( 2902 void *ptr 2903) 2904{ 2905 syncprov_findcsn( ptr, FIND_MAXCSN ); 2906 return NULL; 2907} 2908 2909/* Read any existing contextCSN from the underlying db. 2910 * Then search for any entries newer than that. If no value exists, 2911 * just generate it. Cache whatever result. 2912 */ 2913static int 2914syncprov_db_open( 2915 BackendDB *be, 2916 ConfigReply *cr 2917) 2918{ 2919 slap_overinst *on = (slap_overinst *) be->bd_info; 2920 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 2921 2922 Connection conn = { 0 }; 2923 OperationBuffer opbuf; 2924 Operation *op; 2925 Entry *e = NULL; 2926 Attribute *a; 2927 int rc; 2928 void *thrctx = NULL; 2929 2930 if ( !SLAP_LASTMOD( be )) { 2931 Debug( LDAP_DEBUG_ANY, 2932 "syncprov_db_open: invalid config, lastmod must be enabled\n", 0, 0, 0 ); 2933 return -1; 2934 } 2935 2936 if ( slapMode & SLAP_TOOL_MODE ) { 2937 return 0; 2938 } 2939 2940 rc = overlay_register_control( be, LDAP_CONTROL_SYNC ); 2941 if ( rc ) { 2942 return rc; 2943 } 2944 2945 thrctx = ldap_pvt_thread_pool_context(); 2946 connection_fake_init2( &conn, &opbuf, thrctx, 0 ); 2947 op = &opbuf.ob_op; 2948 op->o_bd = be; 2949 op->o_dn = be->be_rootdn; 2950 op->o_ndn = be->be_rootndn; 2951 2952 if ( SLAP_SYNC_SUBENTRY( be )) { 2953 build_new_dn( &si->si_contextdn, be->be_nsuffix, 2954 (struct berval *)&slap_ldapsync_cn_bv, NULL ); 2955 } else { 2956 si->si_contextdn = be->be_nsuffix[0]; 2957 } 2958 rc = overlay_entry_get_ov( op, &si->si_contextdn, NULL, 2959 slap_schema.si_ad_contextCSN, 0, &e, on ); 2960 2961 if ( e ) { 2962 ldap_pvt_thread_t tid; 2963 2964 a = attr_find( e->e_attrs, slap_schema.si_ad_contextCSN ); 2965 if ( a ) { 2966 ber_bvarray_dup_x( &si->si_ctxcsn, a->a_vals, NULL ); 2967 si->si_numcsns = a->a_numvals; 2968 si->si_sids = slap_parse_csn_sids( si->si_ctxcsn, a->a_numvals, NULL ); 2969 } 2970 overlay_entry_release_ov( op, e, 0, on ); 2971 if ( si->si_ctxcsn && !SLAP_DBCLEAN( be )) { 2972 op->o_req_dn = be->be_suffix[0]; 2973 op->o_req_ndn = be->be_nsuffix[0]; 2974 op->ors_scope = LDAP_SCOPE_SUBTREE; 2975 ldap_pvt_thread_create( &tid, 0, syncprov_db_otask, op ); 2976 ldap_pvt_thread_join( tid, NULL ); 2977 } 2978 } 2979 2980 /* Didn't find a contextCSN, should we generate one? */ 2981 if ( !si->si_ctxcsn ) { 2982 char csnbuf[ LDAP_PVT_CSNSTR_BUFSIZE ]; 2983 struct berval csn; 2984 2985 if ( SLAP_SYNC_SHADOW( op->o_bd )) { 2986 /* If we're also a consumer, then don't generate anything. 2987 * Wait for our provider to send it to us, or for a local 2988 * modify if we have multimaster. 2989 */ 2990 goto out; 2991 } 2992 csn.bv_val = csnbuf; 2993 csn.bv_len = sizeof( csnbuf ); 2994 slap_get_csn( op, &csn, 0 ); 2995 value_add_one( &si->si_ctxcsn, &csn ); 2996 si->si_numcsns = 1; 2997 si->si_sids = ch_malloc( sizeof(int) ); 2998 si->si_sids[0] = slap_serverID; 2999 3000 /* make sure we do a checkpoint on close */ 3001 si->si_numops++; 3002 } 3003 3004out: 3005 op->o_bd->bd_info = (BackendInfo *)on; 3006 return 0; 3007} 3008 3009/* Write the current contextCSN into the underlying db. 3010 */ 3011static int 3012syncprov_db_close( 3013 BackendDB *be, 3014 ConfigReply *cr 3015) 3016{ 3017 slap_overinst *on = (slap_overinst *) be->bd_info; 3018 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 3019 3020 if ( slapMode & SLAP_TOOL_MODE ) { 3021 return 0; 3022 } 3023 if ( si->si_numops ) { 3024 Connection conn = {0}; 3025 OperationBuffer opbuf; 3026 Operation *op; 3027 SlapReply rs = {REP_RESULT}; 3028 void *thrctx; 3029 3030 thrctx = ldap_pvt_thread_pool_context(); 3031 connection_fake_init2( &conn, &opbuf, thrctx, 0 ); 3032 op = &opbuf.ob_op; 3033 op->o_bd = be; 3034 op->o_dn = be->be_rootdn; 3035 op->o_ndn = be->be_rootndn; 3036 syncprov_checkpoint( op, &rs, on ); 3037 } 3038 3039 return 0; 3040} 3041 3042static int 3043syncprov_db_init( 3044 BackendDB *be, 3045 ConfigReply *cr 3046) 3047{ 3048 slap_overinst *on = (slap_overinst *)be->bd_info; 3049 syncprov_info_t *si; 3050 3051 if ( SLAP_ISGLOBALOVERLAY( be ) ) { 3052 Debug( LDAP_DEBUG_ANY, 3053 "syncprov must be instantiated within a database.\n", 3054 0, 0, 0 ); 3055 return 1; 3056 } 3057 3058 si = ch_calloc(1, sizeof(syncprov_info_t)); 3059 on->on_bi.bi_private = si; 3060 ldap_pvt_thread_rdwr_init( &si->si_csn_rwlock ); 3061 ldap_pvt_thread_mutex_init( &si->si_ops_mutex ); 3062 ldap_pvt_thread_mutex_init( &si->si_mods_mutex ); 3063 ldap_pvt_thread_mutex_init( &si->si_resp_mutex ); 3064 3065 csn_anlist[0].an_desc = slap_schema.si_ad_entryCSN; 3066 csn_anlist[0].an_name = slap_schema.si_ad_entryCSN->ad_cname; 3067 csn_anlist[1].an_desc = slap_schema.si_ad_entryUUID; 3068 csn_anlist[1].an_name = slap_schema.si_ad_entryUUID->ad_cname; 3069 3070 uuid_anlist[0].an_desc = slap_schema.si_ad_entryUUID; 3071 uuid_anlist[0].an_name = slap_schema.si_ad_entryUUID->ad_cname; 3072 3073 return 0; 3074} 3075 3076static int 3077syncprov_db_destroy( 3078 BackendDB *be, 3079 ConfigReply *cr 3080) 3081{ 3082 slap_overinst *on = (slap_overinst *)be->bd_info; 3083 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 3084 3085 if ( si ) { 3086 if ( si->si_logs ) { 3087 slog_entry *se = si->si_logs->sl_head; 3088 3089 while ( se ) { 3090 slog_entry *se_next = se->se_next; 3091 ch_free( se ); 3092 se = se_next; 3093 } 3094 3095 ch_free( si->si_logs ); 3096 } 3097 if ( si->si_ctxcsn ) 3098 ber_bvarray_free( si->si_ctxcsn ); 3099 if ( si->si_sids ) 3100 ch_free( si->si_sids ); 3101 ldap_pvt_thread_mutex_destroy( &si->si_resp_mutex ); 3102 ldap_pvt_thread_mutex_destroy( &si->si_mods_mutex ); 3103 ldap_pvt_thread_mutex_destroy( &si->si_ops_mutex ); 3104 ldap_pvt_thread_rdwr_destroy( &si->si_csn_rwlock ); 3105 ch_free( si ); 3106 } 3107 3108 return 0; 3109} 3110 3111static int syncprov_parseCtrl ( 3112 Operation *op, 3113 SlapReply *rs, 3114 LDAPControl *ctrl ) 3115{ 3116 ber_tag_t tag; 3117 BerElementBuffer berbuf; 3118 BerElement *ber = (BerElement *)&berbuf; 3119 ber_int_t mode; 3120 ber_len_t len; 3121 struct berval cookie = BER_BVNULL; 3122 sync_control *sr; 3123 int rhint = 0; 3124 3125 if ( op->o_sync != SLAP_CONTROL_NONE ) { 3126 rs->sr_text = "Sync control specified multiple times"; 3127 return LDAP_PROTOCOL_ERROR; 3128 } 3129 3130 if ( op->o_pagedresults != SLAP_CONTROL_NONE ) { 3131 rs->sr_text = "Sync control specified with pagedResults control"; 3132 return LDAP_PROTOCOL_ERROR; 3133 } 3134 3135 if ( BER_BVISNULL( &ctrl->ldctl_value ) ) { 3136 rs->sr_text = "Sync control value is absent"; 3137 return LDAP_PROTOCOL_ERROR; 3138 } 3139 3140 if ( BER_BVISEMPTY( &ctrl->ldctl_value ) ) { 3141 rs->sr_text = "Sync control value is empty"; 3142 return LDAP_PROTOCOL_ERROR; 3143 } 3144 3145 /* Parse the control value 3146 * syncRequestValue ::= SEQUENCE { 3147 * mode ENUMERATED { 3148 * -- 0 unused 3149 * refreshOnly (1), 3150 * -- 2 reserved 3151 * refreshAndPersist (3) 3152 * }, 3153 * cookie syncCookie OPTIONAL 3154 * } 3155 */ 3156 3157 ber_init2( ber, &ctrl->ldctl_value, 0 ); 3158 3159 if ( (tag = ber_scanf( ber, "{i" /*}*/, &mode )) == LBER_ERROR ) { 3160 rs->sr_text = "Sync control : mode decoding error"; 3161 return LDAP_PROTOCOL_ERROR; 3162 } 3163 3164 switch( mode ) { 3165 case LDAP_SYNC_REFRESH_ONLY: 3166 mode = SLAP_SYNC_REFRESH; 3167 break; 3168 case LDAP_SYNC_REFRESH_AND_PERSIST: 3169 mode = SLAP_SYNC_REFRESH_AND_PERSIST; 3170 break; 3171 default: 3172 rs->sr_text = "Sync control : unknown update mode"; 3173 return LDAP_PROTOCOL_ERROR; 3174 } 3175 3176 tag = ber_peek_tag( ber, &len ); 3177 3178 if ( tag == LDAP_TAG_SYNC_COOKIE ) { 3179 if (( ber_scanf( ber, /*{*/ "m", &cookie )) == LBER_ERROR ) { 3180 rs->sr_text = "Sync control : cookie decoding error"; 3181 return LDAP_PROTOCOL_ERROR; 3182 } 3183 tag = ber_peek_tag( ber, &len ); 3184 } 3185 if ( tag == LDAP_TAG_RELOAD_HINT ) { 3186 if (( ber_scanf( ber, /*{*/ "b", &rhint )) == LBER_ERROR ) { 3187 rs->sr_text = "Sync control : rhint decoding error"; 3188 return LDAP_PROTOCOL_ERROR; 3189 } 3190 } 3191 if (( ber_scanf( ber, /*{*/ "}")) == LBER_ERROR ) { 3192 rs->sr_text = "Sync control : decoding error"; 3193 return LDAP_PROTOCOL_ERROR; 3194 } 3195 sr = op->o_tmpcalloc( 1, sizeof(struct sync_control), op->o_tmpmemctx ); 3196 sr->sr_rhint = rhint; 3197 if (!BER_BVISNULL(&cookie)) { 3198 ber_dupbv_x( &sr->sr_state.octet_str, &cookie, op->o_tmpmemctx ); 3199 /* If parse fails, pretend no cookie was sent */ 3200 if ( slap_parse_sync_cookie( &sr->sr_state, op->o_tmpmemctx ) || 3201 sr->sr_state.rid == -1 ) { 3202 if ( sr->sr_state.ctxcsn ) { 3203 ber_bvarray_free_x( sr->sr_state.ctxcsn, op->o_tmpmemctx ); 3204 sr->sr_state.ctxcsn = NULL; 3205 } 3206 sr->sr_state.numcsns = 0; 3207 } 3208 } 3209 3210 op->o_controls[slap_cids.sc_LDAPsync] = sr; 3211 3212 op->o_sync = ctrl->ldctl_iscritical 3213 ? SLAP_CONTROL_CRITICAL 3214 : SLAP_CONTROL_NONCRITICAL; 3215 3216 op->o_sync_mode |= mode; /* o_sync_mode shares o_sync */ 3217 3218 return LDAP_SUCCESS; 3219} 3220 3221/* This overlay is set up for dynamic loading via moduleload. For static 3222 * configuration, you'll need to arrange for the slap_overinst to be 3223 * initialized and registered by some other function inside slapd. 3224 */ 3225 3226static slap_overinst syncprov; 3227 3228int 3229syncprov_initialize() 3230{ 3231 int rc; 3232 3233 rc = register_supported_control( LDAP_CONTROL_SYNC, 3234 SLAP_CTRL_SEARCH, NULL, 3235 syncprov_parseCtrl, &slap_cids.sc_LDAPsync ); 3236 if ( rc != LDAP_SUCCESS ) { 3237 Debug( LDAP_DEBUG_ANY, 3238 "syncprov_init: Failed to register control %d\n", rc, 0, 0 ); 3239 return rc; 3240 } 3241 3242 syncprov.on_bi.bi_type = "syncprov"; 3243 syncprov.on_bi.bi_db_init = syncprov_db_init; 3244 syncprov.on_bi.bi_db_destroy = syncprov_db_destroy; 3245 syncprov.on_bi.bi_db_open = syncprov_db_open; 3246 syncprov.on_bi.bi_db_close = syncprov_db_close; 3247 3248 syncprov.on_bi.bi_op_abandon = syncprov_op_abandon; 3249 syncprov.on_bi.bi_op_cancel = syncprov_op_abandon; 3250 3251 syncprov.on_bi.bi_op_add = syncprov_op_mod; 3252 syncprov.on_bi.bi_op_compare = syncprov_op_compare; 3253 syncprov.on_bi.bi_op_delete = syncprov_op_mod; 3254 syncprov.on_bi.bi_op_modify = syncprov_op_mod; 3255 syncprov.on_bi.bi_op_modrdn = syncprov_op_mod; 3256 syncprov.on_bi.bi_op_search = syncprov_op_search; 3257 syncprov.on_bi.bi_extended = syncprov_op_extended; 3258 syncprov.on_bi.bi_operational = syncprov_operational; 3259 3260 syncprov.on_bi.bi_cf_ocs = spocs; 3261 3262 generic_filter.f_desc = slap_schema.si_ad_objectClass; 3263 3264 rc = config_register_schema( spcfg, spocs ); 3265 if ( rc ) return rc; 3266 3267 return overlay_register( &syncprov ); 3268} 3269 3270#if SLAPD_OVER_SYNCPROV == SLAPD_MOD_DYNAMIC 3271int 3272init_module( int argc, char *argv[] ) 3273{ 3274 return syncprov_initialize(); 3275} 3276#endif /* SLAPD_OVER_SYNCPROV == SLAPD_MOD_DYNAMIC */ 3277 3278#endif /* defined(SLAPD_OVER_SYNCPROV) */ 3279