#!/usr/bin/python

# Unix SMB/CIFS implementation.
# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
# Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
#
# This is a Python port of the original in testprogs/ejs/samba3sam.js
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

"""Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""

import os
import ldb
from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
from samba import Ldb, substitute_var
from samba.tests import LdbTestCase, TestCaseInTempDir, cmdline_loadparm
import samba.dcerpc.security
import samba.ndr

datadir = os.path.join(os.path.dirname(__file__),
                       "../../../../../testdata/samba3")


def read_datafile(filename):
    """Return the contents of `filename`, looked up relative to `datadir`."""
    f = open(os.path.join(datadir, filename), 'r')
    try:
        return f.read()
    finally:
        # Close explicitly instead of relying on garbage collection.
        f.close()


def ldb_debug(l, text):
    """Debug callback that prints `text`; the first argument is ignored."""
    print(text)


class MapBaseTestCase(TestCaseInTempDir):
    """Base test case for mapping tests."""

    def setup_modules(self, ldb, s3, s4):
        """Add the @MAP, @MODULES and @PARTITION control records to `ldb`.

        :param ldb: Database the control records are added to.
        :param s3: Target whose basedn/url describe the Samba3 side.
        :param s4: Target whose basedn/url describe the Samba4 side.
        """
        ldb.add({"dn": "@MAP=samba3sam",
                 "@FROM": s4.basedn,
                 "@TO": "sambaDomainName=TESTS," + s3.basedn})

        ldb.add({"dn": "@MODULES",
                 "@LIST": "rootdse,paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"})

        ldb.add({"dn": "@PARTITION",
                 "partition": ["%s:%s" % (s4.basedn, s4.url),
                               "%s:%s" % (s3.basedn, s3.url)],
                 "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"]})

    def setUp(self):
        """Create and connect the samba3, samba4 and templates targets."""
        super(MapBaseTestCase, self).setUp()

        def make_dn(basedn, rdn):
            return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn)

        def make_s4dn(basedn, rdn):
            return "%s,%s" % (rdn, basedn)

        self.ldbfile = os.path.join(self.tempdir, "test.ldb")
        self.ldburl = "tdb://" + self.ldbfile

        # Captured by Target.__init__ below, which has no access to `self`.
        tempdir = self.tempdir

        class Target(object):
            """Simple helper class that contains data for a specific SAM
            connection."""

            def __init__(self, filename, basedn, dn):
                self.file = os.path.join(tempdir, filename)
                self.url = "tdb://" + self.file
                self.basedn = basedn
                self.substvars = {"BASEDN": self.basedn}
                self.db = Ldb(lp=cmdline_loadparm)
                # Callable (basedn, rdn) -> full DN string; may be None if
                # dn() is never used on this target.
                self._dn = dn

            def dn(self, rdn):
                return self._dn(self.basedn, rdn)

            def connect(self):
                return self.db.connect(self.url)

            def setup_data(self, path):
                self.add_ldif(read_datafile(path))

            def subst(self, text):
                return substitute_var(text, self.substvars)

            def add_ldif(self, ldif):
                self.db.add_ldif(self.subst(ldif))

            def modify_ldif(self, ldif):
                self.db.modify_ldif(self.subst(ldif))

        self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
        self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
        self.templates = Target("templates.ldb", "cn=templates", None)

        self.samba3.connect()
        self.templates.connect()
        self.samba4.connect()

    def tearDown(self):
        """Remove the database files created for this test."""
        os.unlink(self.ldbfile)
        os.unlink(self.samba3.file)
        os.unlink(self.templates.file)
        os.unlink(self.samba4.file)
        super(MapBaseTestCase, self).tearDown()

    def assertSidEquals(self, text, ndr_sid):
        """Assert that the first value of `ndr_sid`, unpacked as an NDR
        dom_sid, equals the SID built from the string `text`."""
        sid_obj1 = samba.ndr.ndr_unpack(samba.dcerpc.security.dom_sid,
                                        str(ndr_sid[0]))
        sid_obj2 = samba.dcerpc.security.dom_sid(text)
        self.assertEqual(sid_obj1, sid_obj2)


class Samba3SamTestCase(MapBaseTestCase):
    """Tests that run against the data loaded from samba3.ldif."""

    def setUp(self):
        """Load the samba3 and provisioning data, then open the mapped db."""
        super(Samba3SamTestCase, self).setUp()
        db = Ldb(self.ldburl, lp=cmdline_loadparm)
        self.samba3.setup_data("samba3.ldif")
        self.templates.setup_data("provision_samba3sam_templates.ldif")
        ldif = read_datafile("provision_samba3sam.ldif")
        db.add_ldif(self.samba4.subst(ldif))
        self.setup_modules(db, self.samba3, self.samba4)
        # Drop the setup handle before opening the database again.
        del db
        self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)

    def test_search_non_mapped(self):
        """Looking up by non-mapped attribute"""
        msg = self.ldb.search(expression="(cn=Administrator)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0]["cn"]), "Administrator")

    # This test used to be named test_search_non_mapped as well, which
    # silently replaced the test above so that it never ran.
    def test_search_mapped(self):
        """Looking up by mapped attribute"""
        msg = self.ldb.search(expression="(name=Backup Operators)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0]["name"]), "Backup Operators")

    def test_old_name_of_renamed(self):
        """Looking up by old name of renamed attribute"""
        msg = self.ldb.search(expression="(displayName=Backup Operators)")
        self.assertEqual(len(msg), 0)

    def test_mapped_containing_sid(self):
        """Looking up mapped entry containing SID"""
        msg = self.ldb.search(expression="(cn=Replicator)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0].dn),
                         "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
        self.assertTrue("objectSid" in msg[0])
        self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
                             msg[0]["objectSid"])
        oc = set(msg[0]["objectClass"])
        self.assertEqual(oc, set(["group"]))

    def test_search_by_objclass(self):
        """Looking up by objectClass"""
        msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
        self.assertEqual(set([str(m.dn) for m in msg]),
                         set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
                              "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))

    def test_s3sam_modify(self):
        """Add, search, modify, rename and delete records through the map."""
        # Adding a record that will fall back to the local database
        self.ldb.add({"dn": "cn=Foo",
                      "foo": "bar",
                      "blah": "Blie",
                      "cn": "Foo",
                      "showInAdvancedViewOnly": "TRUE"})

        # Checking for existence of record (local)
        # TODO: This record must be searched in the local database, which is
        # currently only supported for base searches.
        # TODO: Actually, a plain "(cn=Foo)" search without an explicit base
        # should work as well but doesn't...
        msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
                              scope=SCOPE_BASE,
                              attrs=['foo', 'blah', 'cn', 'showInAdvancedViewOnly'])
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0]["showInAdvancedViewOnly"]), "TRUE")
        self.assertEqual(str(msg[0]["foo"]), "bar")
        self.assertEqual(str(msg[0]["blah"]), "Blie")

        # Adding record that will be mapped
        self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                      "objectClass": "user",
                      "unixName": "bin",
                      "sambaUnicodePwd": "geheim",
                      "cn": "Niemand"})

        # Checking for existence of record (remote)
        msg = self.ldb.search(expression="(unixName=bin)",
                              attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0]["cn"]), "Niemand")
        self.assertEqual(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for existence of record (local && remote)
        msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
                              attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
        self.assertEqual(len(msg), 1)  # TODO: should check with more records
        self.assertEqual(str(msg[0]["cn"]), "Niemand")
        self.assertEqual(str(msg[0]["unixName"]), "bin")
        self.assertEqual(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for existence of record (local || remote)
        msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
                              attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
        self.assertEqual(len(msg), 1)  # TODO: should check with more records
        self.assertEqual(str(msg[0]["cn"]), "Niemand")
        self.assertEqual(str(msg[0]["unixName"]), "bin")
        self.assertEqual(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for data in destination database
        msg = self.samba3.db.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(str(msg[0]["sambaSID"]),
                         "S-1-5-21-4231626423-2410014848-2360679739-2001")
        self.assertEqual(str(msg[0]["displayName"]), "Niemand")

        # Adding attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
add: description
description: Blah
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(str(msg[0]["cn"]), "Niemand")
        self.assertEqual(str(msg[0]["description"]), "Blah")

        # Modifying attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
replace: description
description: Blie
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(str(msg[0]["description"]), "Blie")

        # Deleting attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
delete: description
""")

        # Checking whether changes are no longer there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertFalse("description" in msg[0])

        # Renaming record...
        self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                        "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether DN has changed...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0].dn),
                         "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Deleting record...
        self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether record is gone...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEqual(len(msg), 0)


class MapTestCase(MapBaseTestCase):
    """Tests that build their own local, remote and split records."""

    def setUp(self):
        """Load the provisioning data and open the mapped db.

        Unlike Samba3SamTestCase.setUp, samba3.ldif is not loaded here.
        """
        super(MapTestCase, self).setUp()
        db = Ldb(self.ldburl, lp=cmdline_loadparm)
        self.templates.setup_data("provision_samba3sam_templates.ldif")
        ldif = read_datafile("provision_samba3sam.ldif")
        db.add_ldif(self.samba4.subst(ldif))
        self.setup_modules(db, self.samba3, self.samba4)
        # Drop the setup handle before opening the database again.
        del db
        self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)

    def test_map_search(self):
        """Running search tests on mapped data."""
        self.samba3.db.add({
            "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
            "objectclass": ["sambaDomain", "top"],
            "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
            "sambaNextRid": "2000",
            "sambaDomainName": "TESTS"
            })

        # Add a set of split records
        self.ldb.add_ldif("""
dn: """+ self.samba4.dn("cn=X") + """
objectClass: user
cn: X
codePage: x
revision: x
dnsHostName: x
nextRid: y
lastLogon: x
description: x
objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
""")

        self.ldb.add({
            "dn": self.samba4.dn("cn=Y"),
            "objectClass": "top",
            "cn": "Y",
            "codePage": "x",
            "revision": "x",
            "dnsHostName": "y",
            "nextRid": "y",
            "lastLogon": "y",
            "description": "x"})

        self.ldb.add({
            "dn": self.samba4.dn("cn=Z"),
            "objectClass": "top",
            "cn": "Z",
            "codePage": "x",
            "revision": "y",
            "dnsHostName": "z",
            "nextRid": "y",
            "lastLogon": "z",
            "description": "y"})

        # Add a set of remote records

        self.samba3.db.add({
            "dn": self.samba3.dn("cn=A"),
            "objectClass": "posixAccount",
            "cn": "A",
            "sambaNextRid": "x",
            "sambaBadPasswordCount": "x",
            "sambaLogonTime": "x",
            "description": "x",
            "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-552",
            "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})

        self.samba3.db.add({
            "dn": self.samba3.dn("cn=B"),
            "objectClass": "top",
            "cn": "B",
            "sambaNextRid": "x",
            "sambaBadPasswordCount": "x",
            "sambaLogonTime": "y",
            "description": "x"})

        self.samba3.db.add({
            "dn": self.samba3.dn("cn=C"),
            "objectClass": "top",
            "cn": "C",
            "sambaNextRid": "x",
            "sambaBadPasswordCount": "y",
            "sambaLogonTime": "z",
            "description": "y"})

        # Testing search by DN

        # Search remote record by local DN
        dn = self.samba4.dn("cn=A")
        res = self.ldb.search(dn, scope=SCOPE_BASE,
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), dn)
        self.assertFalse("dnsHostName" in res[0])
        self.assertEqual(str(res[0]["lastLogon"]), "x")

        # Search remote record by remote DN
        dn = self.samba3.dn("cn=A")
        res = self.samba3.db.search(dn, scope=SCOPE_BASE,
                                    attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), dn)
        self.assertFalse("dnsHostName" in res[0])
        self.assertFalse("lastLogon" in res[0])
        self.assertEqual(str(res[0]["sambaLogonTime"]), "x")

        # Search split record by local DN
        dn = self.samba4.dn("cn=X")
        res = self.ldb.search(dn, scope=SCOPE_BASE,
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), dn)
        self.assertEqual(str(res[0]["dnsHostName"]), "x")
        self.assertEqual(str(res[0]["lastLogon"]), "x")

        # Search split record by remote DN
        dn = self.samba3.dn("cn=X")
        res = self.samba3.db.search(dn, scope=SCOPE_BASE,
                                    attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), dn)
        self.assertFalse("dnsHostName" in res[0])
        self.assertFalse("lastLogon" in res[0])
        self.assertEqual(str(res[0]["sambaLogonTime"]), "x")

        # Testing search by attribute

        # Search by ignored attribute
        res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT,
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 2)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Y"))
        self.assertEqual(str(res[0]["dnsHostName"]), "y")
        self.assertEqual(str(res[0]["lastLogon"]), "y")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=X"))
        self.assertEqual(str(res[1]["dnsHostName"]), "x")
        self.assertEqual(str(res[1]["lastLogon"]), "x")

        # Search by kept attribute
        res = self.ldb.search(expression="(description=y)",
                              scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 2)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Z"))
        self.assertEqual(str(res[0]["dnsHostName"]), "z")
        self.assertEqual(str(res[0]["lastLogon"]), "z")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=C"))
        self.assertFalse("dnsHostName" in res[1])
        self.assertEqual(str(res[1]["lastLogon"]), "z")

        # Search by renamed attribute
        res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 2)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=B"))
        self.assertFalse("dnsHostName" in res[0])
        self.assertEqual(str(res[0]["lastLogon"]), "y")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=A"))
        self.assertFalse("dnsHostName" in res[1])
        self.assertEqual(str(res[1]["lastLogon"]), "x")

        # Search by converted attribute
        # TODO:
        #   Using the SID directly in the parse tree, i.e. searching for
        #   "(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)",
        #   leads to conversion errors, letting the search fail with no
        #   results.
        res = self.ldb.search(expression="(objectSid=*)", base=None,
                              scope=SCOPE_DEFAULT,
                              attrs=["dnsHostName", "lastLogon", "objectSid"])
        self.assertEqual(len(res), 3)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=X"))
        self.assertEqual(str(res[0]["dnsHostName"]), "x")
        self.assertEqual(str(res[0]["lastLogon"]), "x")
        self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
                             res[0]["objectSid"])
        self.assertTrue("objectSid" in res[0])
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=A"))
        self.assertFalse("dnsHostName" in res[1])
        self.assertEqual(str(res[1]["lastLogon"]), "x")
        self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
                             res[1]["objectSid"])
        self.assertTrue("objectSid" in res[1])

        # Search by generated attribute
        # In most cases, this even works when the mapping is missing
        # a `convert_operator' by enumerating the remote db.
        res = self.ldb.search(expression="(primaryGroupID=512)",
                              attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=A"))
        self.assertFalse("dnsHostName" in res[0])
        self.assertEqual(str(res[0]["lastLogon"]), "x")
        self.assertEqual(str(res[0]["primaryGroupID"]), "512")

        # Note that Xs "objectSid" seems to be fine in the previous search for
        # "objectSid"...
        # TODO: a "(primaryGroupID=*)" search dumping every attribute of
        # every result was left disabled in the original port.

        # Search by remote name of renamed attribute
        res = self.ldb.search(expression="(sambaBadPasswordCount=*)",
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 0)

        # Search by objectClass
        attrs = ["dnsHostName", "lastLogon", "objectClass"]
        res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
        self.assertEqual(len(res), 2)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=X"))
        self.assertEqual(str(res[0]["dnsHostName"]), "x")
        self.assertEqual(str(res[0]["lastLogon"]), "x")
        self.assertEqual(str(res[0]["objectClass"][0]), "user")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=A"))
        self.assertFalse("dnsHostName" in res[1])
        self.assertEqual(str(res[1]["lastLogon"]), "x")
        self.assertEqual(str(res[1]["objectClass"][0]), "user")

        # Prove that the objectClass is actually used for the search
        res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
                              attrs=attrs)
        self.assertEqual(len(res), 3)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=B"))
        self.assertFalse("dnsHostName" in res[0])
        self.assertEqual(str(res[0]["lastLogon"]), "y")
        self.assertEqual(set(res[0]["objectClass"]), set(["top"]))
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=X"))
        self.assertEqual(str(res[1]["dnsHostName"]), "x")
        self.assertEqual(str(res[1]["lastLogon"]), "x")
        self.assertEqual(str(res[1]["objectClass"][0]), "user")
        self.assertEqual(str(res[2].dn), self.samba4.dn("cn=A"))
        self.assertFalse("dnsHostName" in res[2])
        self.assertEqual(str(res[2]["lastLogon"]), "x")
        self.assertEqual(str(res[2]["objectClass"][0]), "user")

        # Testing search by parse tree

        # Search by conjunction of local attributes
        res = self.ldb.search(expression="(&(codePage=x)(revision=x))",
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 2)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Y"))
        self.assertEqual(str(res[0]["dnsHostName"]), "y")
        self.assertEqual(str(res[0]["lastLogon"]), "y")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=X"))
        self.assertEqual(str(res[1]["dnsHostName"]), "x")
        self.assertEqual(str(res[1]["lastLogon"]), "x")

        # Search by conjunction of remote attributes
        res = self.ldb.search(expression="(&(lastLogon=x)(description=x))",
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 2)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=X"))
        self.assertEqual(str(res[0]["dnsHostName"]), "x")
        self.assertEqual(str(res[0]["lastLogon"]), "x")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=A"))
        self.assertFalse("dnsHostName" in res[1])
        self.assertEqual(str(res[1]["lastLogon"]), "x")

        # Search by conjunction of local and remote attribute
        res = self.ldb.search(expression="(&(codePage=x)(description=x))",
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 2)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Y"))
        self.assertEqual(str(res[0]["dnsHostName"]), "y")
        self.assertEqual(str(res[0]["lastLogon"]), "y")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=X"))
        self.assertEqual(str(res[1]["dnsHostName"]), "x")
        self.assertEqual(str(res[1]["lastLogon"]), "x")

        # Search by conjunction of local and remote attribute w/o match
        attrs = ["dnsHostName", "lastLogon"]
        res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))",
                              attrs=attrs)
        self.assertEqual(len(res), 0)
        res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))",
                              attrs=attrs)
        self.assertEqual(len(res), 0)

        # Search by disjunction of local attributes
        res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))",
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 2)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Y"))
        self.assertEqual(str(res[0]["dnsHostName"]), "y")
        self.assertEqual(str(res[0]["lastLogon"]), "y")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=X"))
        self.assertEqual(str(res[1]["dnsHostName"]), "x")
        self.assertEqual(str(res[1]["lastLogon"]), "x")

        # Search by disjunction of remote attributes
        res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))",
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 3)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=B"))
        self.assertFalse("dnsHostName" in res[0])
        self.assertEqual(str(res[0]["lastLogon"]), "y")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=X"))
        self.assertEqual(str(res[1]["dnsHostName"]), "x")
        self.assertEqual(str(res[1]["lastLogon"]), "x")
        self.assertEqual(str(res[2].dn), self.samba4.dn("cn=A"))
        self.assertFalse("dnsHostName" in res[2])
        self.assertEqual(str(res[2]["lastLogon"]), "x")

        # Search by disjunction of local and remote attribute
        res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))",
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 3)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Y"))
        self.assertEqual(str(res[0]["dnsHostName"]), "y")
        self.assertEqual(str(res[0]["lastLogon"]), "y")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=B"))
        self.assertFalse("dnsHostName" in res[1])
        self.assertEqual(str(res[1]["lastLogon"]), "y")
        self.assertEqual(str(res[2].dn), self.samba4.dn("cn=X"))
        self.assertEqual(str(res[2]["dnsHostName"]), "x")
        self.assertEqual(str(res[2]["lastLogon"]), "x")

        # Search by disjunction of local and remote attribute w/o match
        res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))",
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 0)

        # Search by negated local attribute
        res = self.ldb.search(expression="(!(revision=x))",
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 5)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=B"))
        self.assertFalse("dnsHostName" in res[0])
        self.assertEqual(str(res[0]["lastLogon"]), "y")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=A"))
        self.assertFalse("dnsHostName" in res[1])
        self.assertEqual(str(res[1]["lastLogon"]), "x")
        self.assertEqual(str(res[2].dn), self.samba4.dn("cn=Z"))
        self.assertEqual(str(res[2]["dnsHostName"]), "z")
        self.assertEqual(str(res[2]["lastLogon"]), "z")
        self.assertEqual(str(res[3].dn), self.samba4.dn("cn=C"))
        self.assertFalse("dnsHostName" in res[3])
        self.assertEqual(str(res[3]["lastLogon"]), "z")

        # Search by negated remote attribute
        res = self.ldb.search(expression="(!(description=x))",
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 3)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Z"))
        self.assertEqual(str(res[0]["dnsHostName"]), "z")
        self.assertEqual(str(res[0]["lastLogon"]), "z")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=C"))
        self.assertFalse("dnsHostName" in res[1])
        self.assertEqual(str(res[1]["lastLogon"]), "z")

        # Search by negated conjunction of local attributes
        res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))",
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 5)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=B"))
        self.assertFalse("dnsHostName" in res[0])
        self.assertEqual(str(res[0]["lastLogon"]), "y")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=A"))
        self.assertFalse("dnsHostName" in res[1])
        self.assertEqual(str(res[1]["lastLogon"]), "x")
        self.assertEqual(str(res[2].dn), self.samba4.dn("cn=Z"))
        self.assertEqual(str(res[2]["dnsHostName"]), "z")
        self.assertEqual(str(res[2]["lastLogon"]), "z")
        self.assertEqual(str(res[3].dn), self.samba4.dn("cn=C"))
        self.assertFalse("dnsHostName" in res[3])
        self.assertEqual(str(res[3]["lastLogon"]), "z")

        # Search by negated conjunction of remote attributes
        res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))",
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 5)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Y"))
        self.assertEqual(str(res[0]["dnsHostName"]), "y")
        self.assertEqual(str(res[0]["lastLogon"]), "y")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=B"))
        self.assertFalse("dnsHostName" in res[1])
        self.assertEqual(str(res[1]["lastLogon"]), "y")
        self.assertEqual(str(res[2].dn), self.samba4.dn("cn=Z"))
        self.assertEqual(str(res[2]["dnsHostName"]), "z")
        self.assertEqual(str(res[2]["lastLogon"]), "z")
        self.assertEqual(str(res[3].dn), self.samba4.dn("cn=C"))
        self.assertFalse("dnsHostName" in res[3])
        self.assertEqual(str(res[3]["lastLogon"]), "z")

        # Search by negated conjunction of local and remote attribute
        res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))",
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 5)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=B"))
        self.assertFalse("dnsHostName" in res[0])
        self.assertEqual(str(res[0]["lastLogon"]), "y")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=A"))
        self.assertFalse("dnsHostName" in res[1])
        self.assertEqual(str(res[1]["lastLogon"]), "x")
        self.assertEqual(str(res[2].dn), self.samba4.dn("cn=Z"))
        self.assertEqual(str(res[2]["dnsHostName"]), "z")
        self.assertEqual(str(res[2]["lastLogon"]), "z")
        self.assertEqual(str(res[3].dn), self.samba4.dn("cn=C"))
        self.assertFalse("dnsHostName" in res[3])
        self.assertEqual(str(res[3]["lastLogon"]), "z")

        # Search by negated disjunction of local attributes
        # NOTE(review): unlike the neighbouring searches, no result count is
        # asserted here - confirm the expected count and add the check.
        res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))",
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=B"))
        self.assertFalse("dnsHostName" in res[0])
        self.assertEqual(str(res[0]["lastLogon"]), "y")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=A"))
        self.assertFalse("dnsHostName" in res[1])
        self.assertEqual(str(res[1]["lastLogon"]), "x")
        self.assertEqual(str(res[2].dn), self.samba4.dn("cn=Z"))
        self.assertEqual(str(res[2]["dnsHostName"]), "z")
        self.assertEqual(str(res[2]["lastLogon"]), "z")
        self.assertEqual(str(res[3].dn), self.samba4.dn("cn=C"))
        self.assertFalse("dnsHostName" in res[3])
        self.assertEqual(str(res[3]["lastLogon"]), "z")

        # Search by negated disjunction of remote attributes
        res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))",
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 4)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=Y"))
        self.assertEqual(str(res[0]["dnsHostName"]), "y")
        self.assertEqual(str(res[0]["lastLogon"]), "y")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=Z"))
        self.assertEqual(str(res[1]["dnsHostName"]), "z")
        self.assertEqual(str(res[1]["lastLogon"]), "z")
        self.assertEqual(str(res[2].dn), self.samba4.dn("cn=C"))
        self.assertFalse("dnsHostName" in res[2])
        self.assertEqual(str(res[2]["lastLogon"]), "z")

        # Search by negated disjunction of local and remote attribute
        res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))",
                              attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 4)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=A"))
        self.assertFalse("dnsHostName" in res[0])
        self.assertEqual(str(res[0]["lastLogon"]), "x")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=Z"))
        self.assertEqual(str(res[1]["dnsHostName"]), "z")
        self.assertEqual(str(res[1]["lastLogon"]), "z")
        self.assertEqual(str(res[2].dn), self.samba4.dn("cn=C"))
        self.assertFalse("dnsHostName" in res[2])
        self.assertEqual(str(res[2]["lastLogon"]), "z")

        # Search by complex parse tree
        res = self.ldb.search(
            expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))",
            attrs=["dnsHostName", "lastLogon"])
        self.assertEqual(len(res), 6)
        self.assertEqual(str(res[0].dn), self.samba4.dn("cn=B"))
        self.assertFalse("dnsHostName" in res[0])
        self.assertEqual(str(res[0]["lastLogon"]), "y")
        self.assertEqual(str(res[1].dn), self.samba4.dn("cn=X"))
        self.assertEqual(str(res[1]["dnsHostName"]), "x")
        self.assertEqual(str(res[1]["lastLogon"]), "x")
        self.assertEqual(str(res[2].dn), self.samba4.dn("cn=A"))
        self.assertFalse("dnsHostName" in res[2])
        self.assertEqual(str(res[2]["lastLogon"]), "x")
        self.assertEqual(str(res[3].dn), self.samba4.dn("cn=Z"))
        self.assertEqual(str(res[3]["dnsHostName"]), "z")
        self.assertEqual(str(res[3]["lastLogon"]), "z")
        self.assertEqual(str(res[4].dn), self.samba4.dn("cn=C"))
        self.assertFalse("dnsHostName" in res[4])
        self.assertEqual(str(res[4]["lastLogon"]), "z")

        # Clean up
        dns = [self.samba4.dn("cn=%s" % n) for n in ["A", "B", "C", "X", "Y", "Z"]]
        for dn in dns:
            self.ldb.delete(dn)

    def test_map_modify_local(self):
        """Modification of local records."""
        # Add local record
        dn = "cn=test,dc=idealx,dc=org"
        self.ldb.add({"dn": dn,
                      "cn": "test",
                      "foo": "bar",
                      "revision": "1",
                      "description": "test"})
        # Check it's there
        attrs = ["foo", "revision", "description"]
        res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), dn)
        self.assertEqual(str(res[0]["foo"]), "bar")
        self.assertEqual(str(res[0]["revision"]), "1")
        self.assertEqual(str(res[0]["description"]), "test")
        # Check it's not in the local db
        res = self.samba4.db.search(expression="(cn=test)",
                                    scope=SCOPE_DEFAULT, attrs=attrs)
        self.assertEqual(len(res), 0)
        # Check it's not in the remote db
        res = self.samba3.db.search(expression="(cn=test)",
                                    scope=SCOPE_DEFAULT, attrs=attrs)
        self.assertEqual(len(res), 0)

        # Modify local record
        ldif = """
dn: """ + dn + """
replace: foo
foo: baz
replace: description
description: foo
"""
        self.ldb.modify_ldif(ldif)
        # Check in local db
        res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), dn)
        self.assertEqual(str(res[0]["foo"]), "baz")
        self.assertEqual(str(res[0]["revision"]), "1")
        self.assertEqual(str(res[0]["description"]), "foo")

        # Rename local record
        dn2 = "cn=toast,dc=idealx,dc=org"
        self.ldb.rename(dn, dn2)
        # Check in local db
        res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), dn2)
        self.assertEqual(str(res[0]["foo"]), "baz")
        self.assertEqual(str(res[0]["revision"]), "1")
        self.assertEqual(str(res[0]["description"]), "foo")

        # Delete local record
        self.ldb.delete(dn2)
        # Check it's gone
        res = self.ldb.search(dn2, scope=SCOPE_BASE)
        self.assertEqual(len(res), 0)

    def test_map_modify_remote_remote(self):
        """Modification of remote data of remote records"""
        # Add remote record
        dn = self.samba4.dn("cn=test")
        dn2 = self.samba3.dn("cn=test")
        self.samba3.db.add({"dn": dn2,
                            "cn": "test",
                            "description": "foo",
                            "sambaBadPasswordCount": "3",
                            "sambaNextRid": "1001"})
        # Check it's there
        res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
                                    attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), dn2)
        self.assertEqual(str(res[0]["description"]), "foo")
        self.assertEqual(str(res[0]["sambaBadPasswordCount"]), "3")
        self.assertEqual(str(res[0]["sambaNextRid"]), "1001")
        # Check in mapped db
        attrs = ["description", "badPwdCount", "nextRid"]
        res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="")
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), dn)
        self.assertEqual(str(res[0]["description"]), "foo")
        self.assertEqual(str(res[0]["badPwdCount"]), "3")
        self.assertEqual(str(res[0]["nextRid"]), "1001")
        # Check in local db
        res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
        self.assertEqual(len(res), 0)

        # Modify remote data of remote record
        ldif = """
dn: """ + dn + """
replace: description
description: test
replace: badPwdCount
badPwdCount: 4
"""
        self.ldb.modify_ldif(ldif)
        # Check in mapped db
        res = self.ldb.search(dn, scope=SCOPE_BASE,
                              attrs=["description", "badPwdCount", "nextRid"])
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), dn)
        self.assertEqual(str(res[0]["description"]), "test")
        self.assertEqual(str(res[0]["badPwdCount"]), "4")
        self.assertEqual(str(res[0]["nextRid"]), "1001")
        # Check in remote db
        res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
                                    attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), dn2)
        self.assertEqual(str(res[0]["description"]), "test")
        self.assertEqual(str(res[0]["sambaBadPasswordCount"]), "4")
        self.assertEqual(str(res[0]["sambaNextRid"]), "1001")

        # Rename remote record
        dn2 = self.samba4.dn("cn=toast")
        self.ldb.rename(dn, dn2)
        # Check in mapped db
        dn = dn2
        res = self.ldb.search(dn, scope=SCOPE_BASE,
                              attrs=["description", "badPwdCount", "nextRid"])
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), dn)
        self.assertEqual(str(res[0]["description"]), "test")
        self.assertEqual(str(res[0]["badPwdCount"]), "4")
        self.assertEqual(str(res[0]["nextRid"]), "1001")
        # Check in remote db
        dn2 = self.samba3.dn("cn=toast")
        res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
                                    attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), dn2)
        self.assertEqual(str(res[0]["description"]), "test")
        self.assertEqual(str(res[0]["sambaBadPasswordCount"]), "4")
        self.assertEqual(str(res[0]["sambaNextRid"]), "1001")

        # Delete remote record
        self.ldb.delete(dn)
        # Check in mapped db that it's removed
        res = self.ldb.search(dn, scope=SCOPE_BASE)
        self.assertEqual(len(res), 0)
        # Check in remote db
        res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
        self.assertEqual(len(res), 0)

    def test_map_modify_remote_local(self):
        """Modification of local data of remote records"""
        # Add remote record (same as before)
        dn = self.samba4.dn("cn=test")
        dn2 = self.samba3.dn("cn=test")
        self.samba3.db.add({"dn": dn2,
                            "cn": "test",
                            "description": "foo",
                            "sambaBadPasswordCount": "3",
                            "sambaNextRid": "1001"})

        # Modify local data of remote record
        ldif = """
dn: """ + dn + """
add: revision
revision: 1
replace: description
description: test

"""
        self.ldb.modify_ldif(ldif)
        # Check in mapped db
        attrs = ["revision", "description"]
        res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), dn)
        self.assertEqual(str(res[0]["description"]), "test")
942 self.assertEquals(str(res[0]["revision"]), "1") 943 # Check in remote db 944 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) 945 self.assertEquals(len(res), 1) 946 self.assertEquals(str(res[0].dn), dn2) 947 self.assertEquals(str(res[0]["description"]), "test") 948 self.assertTrue(not "revision" in res[0]) 949 # Check in local db 950 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) 951 self.assertEquals(len(res), 1) 952 self.assertEquals(str(res[0].dn), dn) 953 self.assertTrue(not "description" in res[0]) 954 self.assertEquals(str(res[0]["revision"]), "1") 955 956 # Delete (newly) split record 957 self.ldb.delete(dn) 958 959 def test_map_modify_split(self): 960 """Testing modification of split records""" 961 # Add split record 962 dn = self.samba4.dn("cn=test") 963 dn2 = self.samba3.dn("cn=test") 964 self.ldb.add({ 965 "dn": dn, 966 "cn": "test", 967 "description": "foo", 968 "badPwdCount": "3", 969 "nextRid": "1001", 970 "revision": "1"}) 971 # Check it's there 972 attrs = ["description", "badPwdCount", "nextRid", "revision"] 973 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) 974 self.assertEquals(len(res), 1) 975 self.assertEquals(str(res[0].dn), dn) 976 self.assertEquals(str(res[0]["description"]), "foo") 977 self.assertEquals(str(res[0]["badPwdCount"]), "3") 978 self.assertEquals(str(res[0]["nextRid"]), "1001") 979 self.assertEquals(str(res[0]["revision"]), "1") 980 # Check in local db 981 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) 982 self.assertEquals(len(res), 1) 983 self.assertEquals(str(res[0].dn), dn) 984 self.assertTrue(not "description" in res[0]) 985 self.assertTrue(not "badPwdCount" in res[0]) 986 self.assertTrue(not "nextRid" in res[0]) 987 self.assertEquals(str(res[0]["revision"]), "1") 988 # Check in remote db 989 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 990 "revision"] 991 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) 992 
self.assertEquals(len(res), 1) 993 self.assertEquals(str(res[0].dn), dn2) 994 self.assertEquals(str(res[0]["description"]), "foo") 995 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3") 996 self.assertEquals(str(res[0]["sambaNextRid"]), "1001") 997 self.assertTrue(not "revision" in res[0]) 998 999 # Modify of split record 1000 ldif = """ 1001dn: """ + dn + """ 1002replace: description 1003description: test 1004replace: badPwdCount 1005badPwdCount: 4 1006replace: revision 1007revision: 2 1008""" 1009 self.ldb.modify_ldif(ldif) 1010 # Check in mapped db 1011 attrs = ["description", "badPwdCount", "nextRid", "revision"] 1012 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) 1013 self.assertEquals(len(res), 1) 1014 self.assertEquals(str(res[0].dn), dn) 1015 self.assertEquals(str(res[0]["description"]), "test") 1016 self.assertEquals(str(res[0]["badPwdCount"]), "4") 1017 self.assertEquals(str(res[0]["nextRid"]), "1001") 1018 self.assertEquals(str(res[0]["revision"]), "2") 1019 # Check in local db 1020 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) 1021 self.assertEquals(len(res), 1) 1022 self.assertEquals(str(res[0].dn), dn) 1023 self.assertTrue(not "description" in res[0]) 1024 self.assertTrue(not "badPwdCount" in res[0]) 1025 self.assertTrue(not "nextRid" in res[0]) 1026 self.assertEquals(str(res[0]["revision"]), "2") 1027 # Check in remote db 1028 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 1029 "revision"] 1030 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) 1031 self.assertEquals(len(res), 1) 1032 self.assertEquals(str(res[0].dn), dn2) 1033 self.assertEquals(str(res[0]["description"]), "test") 1034 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4") 1035 self.assertEquals(str(res[0]["sambaNextRid"]), "1001") 1036 self.assertTrue(not "revision" in res[0]) 1037 1038 # Rename split record 1039 dn2 = self.samba4.dn("cn=toast") 1040 self.ldb.rename(dn, dn2) 1041 # Check in mapped db 1042 dn = 
dn2 1043 attrs = ["description", "badPwdCount", "nextRid", "revision"] 1044 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) 1045 self.assertEquals(len(res), 1) 1046 self.assertEquals(str(res[0].dn), dn) 1047 self.assertEquals(str(res[0]["description"]), "test") 1048 self.assertEquals(str(res[0]["badPwdCount"]), "4") 1049 self.assertEquals(str(res[0]["nextRid"]), "1001") 1050 self.assertEquals(str(res[0]["revision"]), "2") 1051 # Check in local db 1052 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) 1053 self.assertEquals(len(res), 1) 1054 self.assertEquals(str(res[0].dn), dn) 1055 self.assertTrue(not "description" in res[0]) 1056 self.assertTrue(not "badPwdCount" in res[0]) 1057 self.assertTrue(not "nextRid" in res[0]) 1058 self.assertEquals(str(res[0]["revision"]), "2") 1059 # Check in remote db 1060 dn2 = self.samba3.dn("cn=toast") 1061 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 1062 attrs=["description", "sambaBadPasswordCount", "sambaNextRid", 1063 "revision"]) 1064 self.assertEquals(len(res), 1) 1065 self.assertEquals(str(res[0].dn), dn2) 1066 self.assertEquals(str(res[0]["description"]), "test") 1067 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4") 1068 self.assertEquals(str(res[0]["sambaNextRid"]), "1001") 1069 self.assertTrue(not "revision" in res[0]) 1070 1071 # Delete split record 1072 self.ldb.delete(dn) 1073 # Check in mapped db 1074 res = self.ldb.search(dn, scope=SCOPE_BASE) 1075 self.assertEquals(len(res), 0) 1076 # Check in local db 1077 res = self.samba4.db.search(dn, scope=SCOPE_BASE) 1078 self.assertEquals(len(res), 0) 1079 # Check in remote db 1080 res = self.samba3.db.search(dn2, scope=SCOPE_BASE) 1081 self.assertEquals(len(res), 0) 1082