1/* ndbio.cpp - get/set/del data for NDB */
2/* $OpenLDAP$ */
3/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
4 *
5 * Copyright 2008-2021 The OpenLDAP Foundation.
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted only as authorized by the OpenLDAP
10 * Public License.
11 *
12 * A copy of this license is available in the file LICENSE in the
13 * top-level directory of the distribution or, alternatively, at
14 * <http://www.OpenLDAP.org/license.html>.
15 */
16/* ACKNOWLEDGEMENTS:
17 * This work was initially developed by Howard Chu for inclusion
18 * in OpenLDAP Software. This work was sponsored by MySQL.
19 */
20
21#include "portable.h"
22
23#include <stdio.h>
24#include <ac/string.h>
25#include <ac/errno.h>
26#include <lutil.h>
27
28#include "back-ndb.h"
29
30/* For reference only */
/* Layout sketch of a length-prefixed value: a 16-bit length followed by
 * the value bytes. The code visible in this file builds such values by
 * hand (see the "MedVar" comments below) and only uses sizeof(MedVar) to
 * size a scratch buffer.
 */
typedef struct MedVar {
	Int16 len;	/* length is always little-endian */
	char buf[1024];	/* value bytes; 1024 is also the column length cap in ndb_ai_get */
} MedVar;
35
/* Forward declarations with C linkage: both functions are passed as
 * callbacks to the avl_* routines (ndb_name_cmp as the comparison
 * function, ndb_oc_dup_err as the duplicate handler of avl_insert).
 */
extern "C" {
	static int ndb_name_cmp( const void *v1, const void *v2 );
	static int ndb_oc_dup_err( void *v1, void *v2 );
};
40
41static int
42ndb_name_cmp( const void *v1, const void *v2 )
43{
44	NdbOcInfo *oc1 = (NdbOcInfo *)v1, *oc2 = (NdbOcInfo *)v2;
45	return ber_bvstrcasecmp( &oc1->no_name, &oc2->no_name );
46}
47
48static int
49ndb_oc_dup_err( void *v1, void *v2 )
50{
51	NdbOcInfo *oc = (NdbOcInfo *)v2;
52
53	oc->no_oc = (ObjectClass *)v1;
54	return -1;
55}
56
57/* Find an existing NdbAttrInfo */
58extern "C" NdbAttrInfo *
59ndb_ai_find( struct ndb_info *ni, AttributeType *at )
60{
61	NdbAttrInfo atmp;
62	atmp.na_name = at->sat_cname;
63
64	return (NdbAttrInfo *)avl_find( ni->ni_ai_tree, &atmp, ndb_name_cmp );
65}
66
67/* Find or create an NdbAttrInfo */
68extern "C" NdbAttrInfo *
69ndb_ai_get( struct ndb_info *ni, struct berval *aname )
70{
71	NdbAttrInfo atmp, *ai;
72	atmp.na_name = *aname;
73
74	ai = (NdbAttrInfo *)avl_find( ni->ni_ai_tree, &atmp, ndb_name_cmp );
75	if ( !ai ) {
76		const char *text;
77		AttributeDescription *ad = NULL;
78
79		if ( slap_bv2ad( aname, &ad, &text ))
80			return NULL;
81
82		ai = (NdbAttrInfo *)ch_malloc( sizeof( NdbAttrInfo ));
83		ai->na_desc = ad;
84		ai->na_attr = ai->na_desc->ad_type;
85		ai->na_name = ai->na_attr->sat_cname;
86		ai->na_oi = NULL;
87		ai->na_flag = 0;
88		ai->na_ixcol = 0;
89		ai->na_len = ai->na_attr->sat_atype.at_syntax_len;
90		/* Reasonable default */
91		if ( !ai->na_len ) {
92			if ( ai->na_attr->sat_syntax == slap_schema.si_syn_distinguishedName )
93				ai->na_len = 1024;
94			else
95				ai->na_len = 128;
96		}
97		/* Arbitrary limit */
98		if ( ai->na_len > 1024 )
99			ai->na_len = 1024;
100		avl_insert( &ni->ni_ai_tree, ai, ndb_name_cmp, avl_dup_error );
101	}
102	return ai;
103}
104
105static int
106ndb_ai_check( struct ndb_info *ni, NdbOcInfo *oci, AttributeType **attrs, char **ptr, int *col,
107	int create )
108{
109	NdbAttrInfo *ai;
110	int i;
111
112	for ( i=0; attrs[i]; i++ ) {
113		if ( attrs[i] == slap_schema.si_ad_objectClass->ad_type )
114			continue;
115		/* skip attrs that are in a superior */
116		if ( oci->no_oc && oci->no_oc->soc_sups ) {
117			int j, k, found=0;
118			ObjectClass *oc;
119			for ( j=0; oci->no_oc->soc_sups[j]; j++ ) {
120				oc = oci->no_oc->soc_sups[j];
121				if ( oc->soc_kind == LDAP_SCHEMA_ABSTRACT )
122					continue;
123				if ( oc->soc_required ) {
124					for ( k=0; oc->soc_required[k]; k++ ) {
125						if ( attrs[i] == oc->soc_required[k] ) {
126							found = 1;
127							break;
128						}
129					}
130					if ( found ) break;
131				}
132				if ( oc->soc_allowed ) {
133					for ( k=0; oc->soc_allowed[k]; k++ ) {
134						if ( attrs[i] == oc->soc_allowed[k] ) {
135							found = 1;
136							break;
137						}
138					}
139					if ( found ) break;
140				}
141			}
142			if ( found )
143				continue;
144		}
145
146		ai = ndb_ai_get( ni, &attrs[i]->sat_cname );
147		if ( !ai ) {
148			/* can never happen */
149			return LDAP_OTHER;
150		}
151
152		/* An attrset may have already been connected */
153		if (( oci->no_flag & NDB_INFO_ATSET ) && ai->na_oi == oci )
154			continue;
155
156		/* An indexed attr is defined before its OC is */
157		if ( !ai->na_oi ) {
158			ai->na_oi = oci;
159			ai->na_column = (*col)++;
160		}
161
162		oci->no_attrs[oci->no_nattrs++] = ai;
163
164		/* An attrset attr may already be defined */
165		if ( ai->na_oi != oci ) {
166			int j;
167			for ( j=0; j<oci->no_nsets; j++ )
168				if ( oci->no_sets[j] == ai->na_oi ) break;
169			if ( j >= oci->no_nsets ) {
170				/* FIXME: data loss if more sets are in use */
171				if ( oci->no_nsets < NDB_MAX_OCSETS ) {
172					oci->no_sets[oci->no_nsets++] = ai->na_oi;
173				}
174			}
175			continue;
176		}
177
178		if ( create ) {
179			if ( ai->na_flag & NDB_INFO_ATBLOB ) {
180				*ptr += sprintf( *ptr, ", `%s` BLOB", ai->na_attr->sat_cname.bv_val );
181			} else {
182				*ptr += sprintf( *ptr, ", `%s` VARCHAR(%d)", ai->na_attr->sat_cname.bv_val,
183					ai->na_len );
184			}
185		}
186	}
187	return 0;
188}
189
190static int
191ndb_oc_create( struct ndb_info *ni, NdbOcInfo *oci, int create )
192{
193	char buf[4096], *ptr;
194	int i, rc = 0, col;
195
196	if ( create ) {
197		ptr = buf + sprintf( buf,
198			"CREATE TABLE `%s` (eid bigint unsigned NOT NULL, vid int unsigned NOT NULL",
199			oci->no_table.bv_val );
200	}
201
202	col = 0;
203	if ( oci->no_oc->soc_required ) {
204		for ( i=0; oci->no_oc->soc_required[i]; i++ );
205		col += i;
206	}
207	if ( oci->no_oc->soc_allowed ) {
208		for ( i=0; oci->no_oc->soc_allowed[i]; i++ );
209		col += i;
210	}
211	/* assume all are present */
212	oci->no_attrs = (struct ndb_attrinfo **)ch_malloc( col * sizeof(struct ndb_attrinfo *));
213
214	col = 2;
215	ldap_pvt_thread_rdwr_wlock( &ni->ni_ai_rwlock );
216	if ( oci->no_oc->soc_required ) {
217		rc = ndb_ai_check( ni, oci, oci->no_oc->soc_required, &ptr, &col, create );
218	}
219	if ( !rc && oci->no_oc->soc_allowed ) {
220		rc = ndb_ai_check( ni, oci, oci->no_oc->soc_allowed, &ptr, &col, create );
221	}
222	ldap_pvt_thread_rdwr_wunlock( &ni->ni_ai_rwlock );
223
224	/* shrink down to just the needed size */
225	oci->no_attrs = (struct ndb_attrinfo **)ch_realloc( oci->no_attrs,
226		oci->no_nattrs * sizeof(struct ndb_attrinfo *));
227
228	if ( create ) {
229		ptr = lutil_strcopy( ptr, ", PRIMARY KEY(eid, vid) ) ENGINE=ndb PARTITION BY KEY(eid)" );
230		rc = mysql_real_query( &ni->ni_sql, buf, ptr - buf );
231		if ( rc ) {
232			Debug( LDAP_DEBUG_ANY,
233				"ndb_oc_create: CREATE TABLE %s failed, %s (%d)\n",
234				oci->no_table.bv_val, mysql_error(&ni->ni_sql), mysql_errno(&ni->ni_sql) );
235		}
236	}
237	return rc;
238}
239
240/* Read table definitions from the DB and populate ObjectClassInfo */
241extern "C" int
242ndb_oc_read( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict )
243{
244	const NdbDictionary::Table *myTable;
245	const NdbDictionary::Column *myCol;
246	NdbOcInfo *oci, octmp;
247	NdbAttrInfo *ai;
248	ObjectClass *oc;
249	NdbDictionary::Dictionary::List myList;
250	struct berval bv;
251	int i, j, rc, col;
252
253	rc = myDict->listObjects( myList, NdbDictionary::Object::UserTable );
254	/* Populate our objectClass structures */
255	for ( i=0; i<myList.count; i++ ) {
256		/* Ignore other DBs */
257		if ( strcmp( myList.elements[i].database, ni->ni_dbname ))
258			continue;
259		/* Ignore internal tables */
260		if ( !strncmp( myList.elements[i].name, "OL_", 3 ))
261			continue;
262		ber_str2bv( myList.elements[i].name, 0, 0, &octmp.no_name );
263		oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
264		if ( oci )
265			continue;
266
267		oc = oc_bvfind( &octmp.no_name );
268		if ( !oc ) {
269			/* undefined - shouldn't happen */
270			continue;
271		}
272		myTable = myDict->getTable( myList.elements[i].name );
273		oci = (NdbOcInfo *)ch_malloc( sizeof( NdbOcInfo )+oc->soc_cname.bv_len+1 );
274		oci->no_table.bv_val = (char *)(oci+1);
275		strcpy( oci->no_table.bv_val, oc->soc_cname.bv_val );
276		oci->no_table.bv_len = oc->soc_cname.bv_len;
277		oci->no_name = oci->no_table;
278		oci->no_oc = oc;
279		oci->no_flag = 0;
280		oci->no_nsets = 0;
281		oci->no_nattrs = 0;
282		col = 0;
283		/* Make space for all attrs, even tho sups will be dropped */
284		if ( oci->no_oc->soc_required ) {
285			for ( j=0; oci->no_oc->soc_required[j]; j++ );
286			col = j;
287		}
288		if ( oci->no_oc->soc_allowed ) {
289			for ( j=0; oci->no_oc->soc_allowed[j]; j++ );
290			col += j;
291		}
292		oci->no_attrs = (struct ndb_attrinfo **)ch_malloc( col * sizeof(struct ndb_attrinfo *));
293		avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, avl_dup_error );
294
295		col = myTable->getNoOfColumns();
296		/* Skip 0 and 1, eid and vid */
297		for ( j = 2; j<col; j++ ) {
298			myCol = myTable->getColumn( j );
299			ber_str2bv( myCol->getName(), 0, 0, &bv );
300			ai = ndb_ai_get( ni, &bv );
301			/* shouldn't happen */
302			if ( !ai )
303				continue;
304			ai->na_oi = oci;
305			ai->na_column = j;
306			ai->na_len = myCol->getLength();
307			if ( myCol->getType() == NdbDictionary::Column::Blob )
308				ai->na_flag |= NDB_INFO_ATBLOB;
309		}
310	}
311	/* Link to any attrsets */
312	for ( i=0; i<myList.count; i++ ) {
313		/* Ignore other DBs */
314		if ( strcmp( myList.elements[i].database, ni->ni_dbname ))
315			continue;
316		/* Ignore internal tables */
317		if ( !strncmp( myList.elements[i].name, "OL_", 3 ))
318			continue;
319		ber_str2bv( myList.elements[i].name, 0, 0, &octmp.no_name );
320		oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
321		/* shouldn't happen */
322		if ( !oci )
323			continue;
324		col = 2;
325		if ( oci->no_oc->soc_required ) {
326			rc = ndb_ai_check( ni, oci, oci->no_oc->soc_required, NULL, &col, 0 );
327		}
328		if ( oci->no_oc->soc_allowed ) {
329			rc = ndb_ai_check( ni, oci, oci->no_oc->soc_allowed, NULL, &col, 0 );
330		}
331		/* shrink down to just the needed size */
332		oci->no_attrs = (struct ndb_attrinfo **)ch_realloc( oci->no_attrs,
333			oci->no_nattrs * sizeof(struct ndb_attrinfo *));
334	}
335	return 0;
336}
337
338static int
339ndb_oc_list( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict,
340	struct berval *oname, int implied, NdbOcs *out )
341{
342	const NdbDictionary::Table *myTable;
343	NdbOcInfo *oci, octmp;
344	ObjectClass *oc;
345	int i, rc;
346
347	/* shortcut top */
348	if ( ber_bvstrcasecmp( oname, &slap_schema.si_oc_top->soc_cname )) {
349		octmp.no_name = *oname;
350		oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
351		if ( oci ) {
352			oc = oci->no_oc;
353		} else {
354			oc = oc_bvfind( oname );
355			if ( !oc ) {
356				/* undefined - shouldn't happen */
357				return LDAP_INVALID_SYNTAX;
358			}
359		}
360		if ( oc->soc_sups ) {
361			int i;
362
363			for ( i=0; oc->soc_sups[i]; i++ ) {
364				rc = ndb_oc_list( ni, myDict, &oc->soc_sups[i]->soc_cname, 1, out );
365				if ( rc ) return rc;
366			}
367		}
368	} else {
369		oc = slap_schema.si_oc_top;
370	}
371	/* Only insert once */
372	for ( i=0; i<out->no_ntext; i++ )
373		if ( out->no_text[i].bv_val == oc->soc_cname.bv_val )
374			break;
375	if ( i == out->no_ntext ) {
376		for ( i=0; i<out->no_nitext; i++ )
377			if ( out->no_itext[i].bv_val == oc->soc_cname.bv_val )
378				break;
379		if ( i == out->no_nitext ) {
380			if ( implied )
381				out->no_itext[out->no_nitext++] = oc->soc_cname;
382			else
383				out->no_text[out->no_ntext++] = oc->soc_cname;
384		}
385	}
386
387	/* ignore top, etc... */
388	if ( oc->soc_kind == LDAP_SCHEMA_ABSTRACT )
389		return 0;
390
391	if ( !oci ) {
392		ldap_pvt_thread_rdwr_runlock( &ni->ni_oc_rwlock );
393		oci = (NdbOcInfo *)ch_malloc( sizeof( NdbOcInfo )+oc->soc_cname.bv_len+1 );
394		oci->no_table.bv_val = (char *)(oci+1);
395		strcpy( oci->no_table.bv_val, oc->soc_cname.bv_val );
396		oci->no_table.bv_len = oc->soc_cname.bv_len;
397		oci->no_name = oci->no_table;
398		oci->no_oc = oc;
399		oci->no_flag = 0;
400		oci->no_nsets = 0;
401		oci->no_nattrs = 0;
402		ldap_pvt_thread_rdwr_wlock( &ni->ni_oc_rwlock );
403		if ( avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, ndb_oc_dup_err )) {
404			octmp.no_oc = oci->no_oc;
405			ch_free( oci );
406			oci = (NdbOcInfo *)octmp.no_oc;
407		}
408		/* see if the oc table already exists in the DB */
409		myTable = myDict->getTable( oci->no_table.bv_val );
410		rc = ndb_oc_create( ni, oci, myTable == NULL );
411		ldap_pvt_thread_rdwr_wunlock( &ni->ni_oc_rwlock );
412		ldap_pvt_thread_rdwr_rlock( &ni->ni_oc_rwlock );
413		if ( rc ) return rc;
414	}
415	/* Only insert once */
416	for ( i=0; i<out->no_ninfo; i++ )
417		if ( out->no_info[i] == oci )
418			break;
419	if ( i == out->no_ninfo )
420		out->no_info[out->no_ninfo++] = oci;
421	return 0;
422}
423
424extern "C" int
425ndb_aset_get( struct ndb_info *ni, struct berval *sname, struct berval *attrs, NdbOcInfo **ret )
426{
427	NdbOcInfo *oci, octmp;
428	int i, rc;
429
430	octmp.no_name = *sname;
431	oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
432	if ( oci )
433		return LDAP_ALREADY_EXISTS;
434
435	for ( i=0; !BER_BVISNULL( &attrs[i] ); i++ ) {
436		if ( !at_bvfind( &attrs[i] ))
437			return LDAP_NO_SUCH_ATTRIBUTE;
438	}
439	i++;
440
441	oci = (NdbOcInfo *)ch_calloc( 1, sizeof( NdbOcInfo ) + sizeof( ObjectClass ) +
442		i*sizeof(AttributeType *) + sname->bv_len+1 );
443	oci->no_oc = (ObjectClass *)(oci+1);
444	oci->no_oc->soc_required = (AttributeType **)(oci->no_oc+1);
445	oci->no_table.bv_val = (char *)(oci->no_oc->soc_required+i);
446
447	for ( i=0; !BER_BVISNULL( &attrs[i] ); i++ )
448		oci->no_oc->soc_required[i] = at_bvfind( &attrs[i] );
449
450	strcpy( oci->no_table.bv_val, sname->bv_val );
451	oci->no_table.bv_len = sname->bv_len;
452	oci->no_name = oci->no_table;
453	oci->no_oc->soc_cname = oci->no_name;
454	oci->no_flag = NDB_INFO_ATSET;
455
456	if ( !ber_bvcmp( sname, &slap_schema.si_oc_extensibleObject->soc_cname ))
457		oci->no_oc->soc_kind = slap_schema.si_oc_extensibleObject->soc_kind;
458
459	rc = ndb_oc_create( ni, oci, 0 );
460	if ( !rc )
461		rc = avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, avl_dup_error );
462	if ( rc ) {
463		ch_free( oci );
464	} else {
465		*ret = oci;
466	}
467	return rc;
468}
469
470extern "C" int
471ndb_aset_create( struct ndb_info *ni, NdbOcInfo *oci )
472{
473	char buf[4096], *ptr;
474	NdbAttrInfo *ai;
475	int i;
476
477	ptr = buf + sprintf( buf,
478		"CREATE TABLE IF NOT EXISTS `%s` (eid bigint unsigned NOT NULL, vid int unsigned NOT NULL",
479		oci->no_table.bv_val );
480
481	for ( i=0; i<oci->no_nattrs; i++ ) {
482		if ( oci->no_attrs[i]->na_oi != oci )
483			continue;
484		ai = oci->no_attrs[i];
485		ptr += sprintf( ptr, ", `%s` VARCHAR(%d)", ai->na_attr->sat_cname.bv_val,
486			ai->na_len );
487		if ( ai->na_flag & NDB_INFO_INDEX ) {
488			ptr += sprintf( ptr, ", INDEX (`%s`)", ai->na_attr->sat_cname.bv_val );
489		}
490	}
491	ptr = lutil_strcopy( ptr, ", PRIMARY KEY(eid, vid) ) ENGINE=ndb PARTITION BY KEY(eid)" );
492	i = mysql_real_query( &ni->ni_sql, buf, ptr - buf );
493	if ( i ) {
494		Debug( LDAP_DEBUG_ANY,
495			"ndb_aset_create: CREATE TABLE %s failed, %s (%d)\n",
496			oci->no_table.bv_val, mysql_error(&ni->ni_sql), mysql_errno(&ni->ni_sql) );
497	}
498	return i;
499}
500
501static int
502ndb_oc_check( BackendDB *be, Ndb *ndb,
503	struct berval *ocsin, NdbOcs *out )
504{
505	struct ndb_info *ni = (struct ndb_info *) be->be_private;
506	const NdbDictionary::Dictionary *myDict = ndb->getDictionary();
507
508	int i, rc = 0;
509
510	out->no_ninfo = 0;
511	out->no_ntext = 0;
512	out->no_nitext = 0;
513
514	/* Find all objectclasses and their superiors. List
515	 * the superiors first.
516	 */
517
518	ldap_pvt_thread_rdwr_rlock( &ni->ni_oc_rwlock );
519	for ( i=0; !BER_BVISNULL( &ocsin[i] ); i++ ) {
520		rc = ndb_oc_list( ni, myDict, &ocsin[i], 0, out );
521		if ( rc ) break;
522	}
523	ldap_pvt_thread_rdwr_runlock( &ni->ni_oc_rwlock );
524	return rc;
525}
526
/* Kind of change found for one value slot in ndb_oc_attrs. The values
 * are tested there with "changed & V_INS": V_REP (3) has the V_INS bit
 * set, so a replaced value takes the same "store the new value" path as
 * an insert, while V_DEL (2) does not and stores NULL instead.
 */
#define	V_INS	1
#define	V_DEL	2
#define	V_REP	3

/* Set by ndb_oc_attrs when a blob column is written while slapMode has
 * SLAP_TOOL_MODE set; ndb_entry_put_data checks it to decide whether to
 * execute the transaction right away. Nothing in the code visible here
 * resets it.
 */
static int ndb_flush_blobs;
532
533/* set all the unique attrs of this objectclass into the table
534 */
535extern "C" int
536ndb_oc_attrs(
537	NdbTransaction *txn,
538	const NdbDictionary::Table *myTable,
539	Entry *e,
540	NdbOcInfo *no,
541	NdbAttrInfo **attrs,
542	int nattrs,
543	Attribute *old
544)
545{
546	char buf[65538], *ptr;
547	Attribute **an, **ao, *a;
548	NdbOperation *myop;
549	int i, j, max = 0;
550	int changed, rc;
551	Uint64 eid = e->e_id;
552
553	if ( !nattrs )
554		return 0;
555
556	an = (Attribute **)ch_malloc( 2 * nattrs * sizeof(Attribute *));
557	ao = an + nattrs;
558
559	/* Turn lists of attrs into arrays for easier access */
560	for ( i=0; i<nattrs; i++ ) {
561		if ( attrs[i]->na_oi != no ) {
562			an[i] = NULL;
563			ao[i] = NULL;
564			continue;
565		}
566		for ( a=e->e_attrs; a; a=a->a_next ) {
567			if ( a->a_desc == slap_schema.si_ad_objectClass )
568				continue;
569			if ( a->a_desc->ad_type == attrs[i]->na_attr ) {
570				/* Don't process same attr twice */
571				if ( a->a_flags & SLAP_ATTR_IXADD )
572					a = NULL;
573				else
574					a->a_flags |= SLAP_ATTR_IXADD;
575				break;
576			}
577		}
578		an[i] = a;
579		if ( a && a->a_numvals > max )
580			max = a->a_numvals;
581		for ( a=old; a; a=a->a_next ) {
582			if ( a->a_desc == slap_schema.si_ad_objectClass )
583				continue;
584			if ( a->a_desc->ad_type == attrs[i]->na_attr )
585				break;
586		}
587		ao[i] = a;
588		if ( a && a->a_numvals > max )
589			max = a->a_numvals;
590	}
591
592	for ( i=0; i<max; i++ ) {
593		myop = NULL;
594		for ( j=0; j<nattrs; j++ ) {
595			if ( !an[j] && !ao[j] )
596				continue;
597			changed = 0;
598			if ( an[j] && an[j]->a_numvals > i ) {
599				/* both old and new are present, compare for changes */
600				if ( ao[j] && ao[j]->a_numvals > i ) {
601					if ( ber_bvcmp( &ao[j]->a_nvals[i], &an[j]->a_nvals[i] ))
602						changed = V_REP;
603				} else {
604					changed = V_INS;
605				}
606			} else {
607				if ( ao[j] && ao[j]->a_numvals > i )
608					changed = V_DEL;
609			}
610			if ( changed ) {
611				if ( !myop ) {
612					rc = LDAP_OTHER;
613					myop = txn->getNdbOperation( myTable );
614					if ( !myop ) {
615						goto done;
616					}
617					if ( old ) {
618						if ( myop->writeTuple()) {
619							goto done;
620						}
621					} else {
622						if ( myop->insertTuple()) {
623							goto done;
624						}
625					}
626					if ( myop->equal( EID_COLUMN, eid )) {
627						goto done;
628					}
629					if ( myop->equal( VID_COLUMN, i )) {
630						goto done;
631					}
632				}
633				if ( attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
634					NdbBlob *myBlob = myop->getBlobHandle( attrs[j]->na_column );
635					rc = LDAP_OTHER;
636					if ( !myBlob ) {
637						Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: getBlobHandle failed %s (%d)\n",
638							myop->getNdbError().message, myop->getNdbError().code, 0 );
639						goto done;
640					}
641					if ( slapMode & SLAP_TOOL_MODE )
642						ndb_flush_blobs = 1;
643					if ( changed & V_INS ) {
644						if ( myBlob->setValue( an[j]->a_vals[i].bv_val, an[j]->a_vals[i].bv_len )) {
645							Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: blob->setValue failed %s (%d)\n",
646								myBlob->getNdbError().message, myBlob->getNdbError().code, 0 );
647							goto done;
648						}
649					} else {
650						if ( myBlob->setValue( NULL, 0 )) {
651							Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: blob->setValue failed %s (%d)\n",
652								myBlob->getNdbError().message, myBlob->getNdbError().code, 0 );
653							goto done;
654						}
655					}
656				} else {
657					if ( changed & V_INS ) {
658						if ( an[j]->a_vals[i].bv_len > attrs[j]->na_len ) {
659							Debug( LDAP_DEBUG_ANY, "ndb_oc_attrs: attribute %s too long for column\n",
660								attrs[j]->na_name.bv_val, 0, 0 );
661							rc = LDAP_CONSTRAINT_VIOLATION;
662							goto done;
663						}
664						ptr = buf;
665						*ptr++ = an[j]->a_vals[i].bv_len & 0xff;
666						if ( attrs[j]->na_len > 255 ) {
667							/* MedVar */
668							*ptr++ = an[j]->a_vals[i].bv_len >> 8;
669						}
670						memcpy( ptr, an[j]->a_vals[i].bv_val, an[j]->a_vals[i].bv_len );
671						ptr = buf;
672					} else {
673						ptr = NULL;
674					}
675					if ( myop->setValue( attrs[j]->na_column, ptr )) {
676						rc = LDAP_OTHER;
677						goto done;
678					}
679				}
680			}
681		}
682	}
683	rc = LDAP_SUCCESS;
684done:
685	ch_free( an );
686	if ( rc ) {
687		Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: failed %s (%d)\n",
688			myop->getNdbError().message, myop->getNdbError().code, 0 );
689	}
690	return rc;
691}
692
693static int
694ndb_oc_put(
695	const NdbDictionary::Dictionary *myDict,
696	NdbTransaction *txn, NdbOcInfo *no, Entry *e )
697{
698	const NdbDictionary::Table *myTable;
699	int i, rc;
700
701	for ( i=0; i<no->no_nsets; i++ ) {
702		rc = ndb_oc_put( myDict, txn, no->no_sets[i], e );
703		if ( rc )
704			return rc;
705	}
706
707	myTable = myDict->getTable( no->no_table.bv_val );
708	if ( !myTable )
709		return LDAP_OTHER;
710
711	return ndb_oc_attrs( txn, myTable, e, no, no->no_attrs, no->no_nattrs, NULL );
712}
713
714/* This is now only used for Adds. Modifies call ndb_oc_attrs directly. */
715extern "C" int
716ndb_entry_put_data(
717	BackendDB *be,
718	NdbArgs *NA
719)
720{
721	struct ndb_info *ni = (struct ndb_info *) be->be_private;
722	Attribute *aoc;
723	const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
724	NdbOcs myOcs;
725	int i, rc;
726
727	/* Get the entry's objectClass attribute */
728	aoc = attr_find( NA->e->e_attrs, slap_schema.si_ad_objectClass );
729	if ( !aoc )
730		return LDAP_OTHER;
731
732	ndb_oc_check( be, NA->ndb, aoc->a_nvals, &myOcs );
733	myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
734
735	/* Walk thru objectclasses, find all the attributes belonging to a class */
736	for ( i=0; i<myOcs.no_ninfo; i++ ) {
737		rc = ndb_oc_put( myDict, NA->txn, myOcs.no_info[i], NA->e );
738		if ( rc ) return rc;
739	}
740
741	/* slapadd tries to batch multiple entries per txn, but entry data is
742	 * transient and blob data is required to remain valid for the whole txn.
743	 * So we need to flush blobs before their source data disappears.
744	 */
745	if (( slapMode & SLAP_TOOL_MODE ) && ndb_flush_blobs )
746		NA->txn->execute( NdbTransaction::NoCommit );
747
748	return 0;
749}
750
751static void
752ndb_oc_get( Operation *op, NdbOcInfo *no, int *j, int *nocs, NdbOcInfo ***oclist )
753{
754	int i;
755	NdbOcInfo  **ol2;
756
757	for ( i=0; i<no->no_nsets; i++ ) {
758		ndb_oc_get( op, no->no_sets[i], j, nocs, oclist );
759	}
760
761	/* Don't insert twice */
762	ol2 = *oclist;
763	for ( i=0; i<*j; i++ )
764		if ( ol2[i] == no )
765			return;
766
767	if ( *j >= *nocs ) {
768		*nocs *= 2;
769		ol2 = (NdbOcInfo **)op->o_tmprealloc( *oclist, *nocs * sizeof(NdbOcInfo *), op->o_tmpmemctx );
770		*oclist = ol2;
771	}
772	ol2 = *oclist;
773	ol2[(*j)++] = no;
774}
775
776/* Retrieve attribute data for given entry. The entry's DN and eid should
777 * already be populated.
778 */
779extern "C" int
780ndb_entry_get_data(
781	Operation *op,
782	NdbArgs *NA,
783	int update
784)
785{
786	struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
787	const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
788	const NdbDictionary::Table *myTable;
789	NdbIndexScanOperation **myop = NULL;
790	Uint64 eid;
791
792	Attribute *a;
793	NdbOcs myOcs;
794	NdbOcInfo *oci, **oclist = NULL;
795	char abuf[65536], *ptr, **attrs = NULL;
796	struct berval bv[2];
797	int *ocx = NULL;
798
799	/* FIXME: abuf should be dynamically allocated */
800
801	int i, j, k, nocs, nattrs, rc = LDAP_OTHER;
802
803	eid = NA->e->e_id;
804
805	ndb_oc_check( op->o_bd, NA->ndb, NA->ocs, &myOcs );
806	myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
807	nocs = myOcs.no_ninfo;
808
809	oclist = (NdbOcInfo **)op->o_tmpcalloc( 1, nocs * sizeof(NdbOcInfo *), op->o_tmpmemctx );
810
811	for ( i=0, j=0; i<myOcs.no_ninfo; i++ ) {
812		ndb_oc_get( op, myOcs.no_info[i], &j, &nocs, &oclist );
813	}
814
815	nocs = j;
816	nattrs = 0;
817	for ( i=0; i<nocs; i++ )
818		nattrs += oclist[i]->no_nattrs;
819
820	ocx = (int *)op->o_tmpalloc( nocs * sizeof(int), op->o_tmpmemctx );
821
822	attrs = (char **)op->o_tmpalloc( nattrs * sizeof(char *), op->o_tmpmemctx );
823
824	myop = (NdbIndexScanOperation **)op->o_tmpalloc( nattrs * sizeof(NdbIndexScanOperation *), op->o_tmpmemctx );
825
826	k = 0;
827	ptr = abuf;
828	for ( i=0; i<nocs; i++ ) {
829		oci = oclist[i];
830
831		myop[i] = NA->txn->getNdbIndexScanOperation( "PRIMARY", oci->no_table.bv_val );
832		if ( !myop[i] )
833			goto leave;
834		if ( myop[i]->readTuples( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead ))
835			goto leave;
836		if ( myop[i]->setBound( 0U, NdbIndexScanOperation::BoundEQ, &eid ))
837			goto leave;
838
839		for ( j=0; j<oci->no_nattrs; j++ ) {
840			if ( oci->no_attrs[j]->na_oi != oci )
841				continue;
842			if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
843				NdbBlob *bi = myop[i]->getBlobHandle( oci->no_attrs[j]->na_column );
844				attrs[k++] = (char *)bi;
845			} else {
846				attrs[k] = ptr;
847				*ptr++ = 0;
848				if ( oci->no_attrs[j]->na_len > 255 )
849					*ptr++ = 0;
850				ptr += oci->no_attrs[j]->na_len + 1;
851				myop[i]->getValue( oci->no_attrs[j]->na_column, attrs[k++] );
852			}
853		}
854		ocx[i] = k;
855	}
856	/* Must use IgnoreError, because an entry with multiple objectClasses may not
857	 * actually have attributes defined in each class / table.
858	 */
859	if ( NA->txn->execute( NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 )
860		goto leave;
861
862	/* count results */
863	for ( i=0; i<nocs; i++ ) {
864		if (( j = myop[i]->nextResult(true) )) {
865			if ( j < 0 ) {
866				Debug( LDAP_DEBUG_TRACE,
867					"ndb_entry_get_data: first nextResult(%d) failed: %s (%d)\n",
868					i, myop[i]->getNdbError().message, myop[i]->getNdbError().code );
869			}
870			myop[i] = NULL;
871		}
872	}
873
874	nattrs = 0;
875	k = 0;
876	for ( i=0; i<nocs; i++ ) {
877		oci = oclist[i];
878		for ( j=0; j<oci->no_nattrs; j++ ) {
879			unsigned char *buf;
880			int len;
881			if ( oci->no_attrs[j]->na_oi != oci )
882				continue;
883			if ( !myop[i] ) {
884				attrs[k] = NULL;
885			} else if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
886				void *vi = attrs[k];
887				NdbBlob *bi = (NdbBlob *)vi;
888				int isNull;
889				bi->getNull( isNull );
890				if ( !isNull ) {
891					nattrs++;
892				} else {
893					attrs[k] = NULL;
894				}
895			} else {
896				buf = (unsigned char *)attrs[k];
897				len = buf[0];
898				if ( oci->no_attrs[j]->na_len > 255 ) {
899					/* MedVar */
900					len |= (buf[1] << 8);
901				}
902				if ( len ) {
903					nattrs++;
904				} else {
905					attrs[k] = NULL;
906				}
907			}
908			k++;
909		}
910	}
911
912	a = attrs_alloc( nattrs+1 );
913	NA->e->e_attrs = a;
914
915	a->a_desc = slap_schema.si_ad_objectClass;
916	a->a_vals = NULL;
917	ber_bvarray_dup_x( &a->a_vals, NA->ocs, NULL );
918	a->a_nvals = a->a_vals;
919	a->a_numvals = myOcs.no_ntext;
920
921	BER_BVZERO( &bv[1] );
922
923	do {
924		a = NA->e->e_attrs->a_next;
925		k = 0;
926		for ( i=0; i<nocs; k=ocx[i], i++ ) {
927			oci = oclist[i];
928			for ( j=0; j<oci->no_nattrs; j++ ) {
929				unsigned char *buf;
930				struct berval nbv;
931				if ( oci->no_attrs[j]->na_oi != oci )
932					continue;
933				buf = (unsigned char *)attrs[k++];
934				if ( !buf )
935					continue;
936				if ( !myop[i] ) {
937					a=a->a_next;
938					continue;
939				}
940				if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
941					void *vi = (void *)buf;
942					NdbBlob *bi = (NdbBlob *)vi;
943					Uint64 len;
944					Uint32 len2;
945					int isNull;
946					bi->getNull( isNull );
947					if ( isNull ) {
948						a = a->a_next;
949						continue;
950					}
951					bi->getLength( len );
952					bv[0].bv_len = len;
953					bv[0].bv_val = (char *)ch_malloc( len+1 );
954					len2 = len;
955					if ( bi->readData( bv[0].bv_val, len2 )) {
956						Debug( LDAP_DEBUG_TRACE,
957							"ndb_entry_get_data: blob readData failed: %s (%d), len %d\n",
958							bi->getNdbError().message, bi->getNdbError().code, len2 );
959					}
960					bv[0].bv_val[len] = '\0';
961					ber_bvarray_add_x( &a->a_vals, bv, NULL );
962				} else {
963					bv[0].bv_len = buf[0];
964					if ( oci->no_attrs[j]->na_len > 255 ) {
965						/* MedVar */
966						bv[0].bv_len |= (buf[1] << 8);
967						bv[0].bv_val = (char *)buf+2;
968						buf[1] = 0;
969					} else {
970						bv[0].bv_val = (char *)buf+1;
971					}
972					buf[0] = 0;
973					if ( bv[0].bv_len == 0 ) {
974						a = a->a_next;
975						continue;
976					}
977					bv[0].bv_val[bv[0].bv_len] = '\0';
978					value_add_one( &a->a_vals, bv );
979				}
980				a->a_desc = oci->no_attrs[j]->na_desc;
981				attr_normalize_one( a->a_desc, bv, &nbv, NULL );
982				a->a_numvals++;
983				if ( !BER_BVISNULL( &nbv )) {
984					ber_bvarray_add_x( &a->a_nvals, &nbv, NULL );
985				} else if ( !a->a_nvals ) {
986					a->a_nvals = a->a_vals;
987				}
988				a = a->a_next;
989			}
990		}
991		k = 0;
992		for ( i=0; i<nocs; i++ ) {
993			if ( !myop[i] )
994				continue;
995			if ((j = myop[i]->nextResult(true))) {
996				if ( j < 0 ) {
997					Debug( LDAP_DEBUG_TRACE,
998						"ndb_entry_get_data: last nextResult(%d) failed: %s (%d)\n",
999						i, myop[i]->getNdbError().message, myop[i]->getNdbError().code );
1000				}
1001				myop[i] = NULL;
1002			} else {
1003				k = 1;
1004			}
1005		}
1006	} while ( k );
1007
1008	rc = 0;
1009leave:
1010	if ( myop ) {
1011		op->o_tmpfree( myop, op->o_tmpmemctx );
1012	}
1013	if ( attrs ) {
1014		op->o_tmpfree( attrs, op->o_tmpmemctx );
1015	}
1016	if ( ocx ) {
1017		op->o_tmpfree( ocx, op->o_tmpmemctx );
1018	}
1019	if ( oclist ) {
1020		op->o_tmpfree( oclist, op->o_tmpmemctx );
1021	}
1022
1023	return rc;
1024}
1025
1026static int
1027ndb_oc_del(
1028	NdbTransaction *txn, Uint64 eid, NdbOcInfo *no )
1029{
1030	NdbIndexScanOperation *myop;
1031	int i, rc;
1032
1033	for ( i=0; i<no->no_nsets; i++ ) {
1034		rc = ndb_oc_del( txn, eid, no->no_sets[i] );
1035		if ( rc ) return rc;
1036	}
1037
1038	myop = txn->getNdbIndexScanOperation( "PRIMARY", no->no_table.bv_val );
1039	if ( !myop )
1040		return LDAP_OTHER;
1041	if ( myop->readTuples( NdbOperation::LM_Exclusive ))
1042		return LDAP_OTHER;
1043	if ( myop->setBound( 0U, NdbIndexScanOperation::BoundEQ, &eid ))
1044		return LDAP_OTHER;
1045
1046	txn->execute(NoCommit);
1047	while ( myop->nextResult(true) == 0) {
1048		do {
1049			myop->deleteCurrentTuple();
1050		} while (myop->nextResult(false) == 0);
1051		txn->execute(NoCommit);
1052	}
1053
1054	return 0;
1055}
1056
1057extern "C" int
1058ndb_entry_del_data(
1059	BackendDB *be,
1060	NdbArgs *NA
1061)
1062{
1063	struct ndb_info *ni = (struct ndb_info *) be->be_private;
1064	Uint64 eid = NA->e->e_id;
1065	int i;
1066	NdbOcs myOcs;
1067
1068	ndb_oc_check( be, NA->ndb, NA->ocs, &myOcs );
1069	myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
1070
1071	for ( i=0; i<myOcs.no_ninfo; i++ ) {
1072		if ( ndb_oc_del( NA->txn, eid, myOcs.no_info[i] ))
1073			return LDAP_OTHER;
1074	}
1075
1076	return 0;
1077}
1078
1079extern "C" int
1080ndb_dn2rdns(
1081	struct berval *dn,
1082	NdbRdns *rdns
1083)
1084{
1085	char *beg, *end;
1086	int i, len;
1087
1088	/* Walk thru RDNs */
1089	end = dn->bv_val + dn->bv_len;
1090	for ( i=0; i<NDB_MAX_RDNS; i++ ) {
1091		for ( beg = end-1; beg > dn->bv_val; beg-- ) {
1092			if (*beg == ',') {
1093				beg++;
1094				break;
1095			}
1096		}
1097		if ( beg >= dn->bv_val ) {
1098			len = end - beg;
1099			/* RDN is too long */
1100			if ( len > NDB_RDN_LEN )
1101				return LDAP_CONSTRAINT_VIOLATION;
1102			memcpy( rdns->nr_buf[i]+1, beg, len );
1103		} else {
1104			break;
1105		}
1106		rdns->nr_buf[i][0] = len;
1107		end = beg - 1;
1108	}
1109	/* Too many RDNs in DN */
1110	if ( i == NDB_MAX_RDNS && beg > dn->bv_val ) {
1111			return LDAP_CONSTRAINT_VIOLATION;
1112	}
1113	rdns->nr_num = i;
1114	return 0;
1115}
1116
1117static int
1118ndb_rdns2keys(
1119	NdbOperation *myop,
1120	NdbRdns *rdns
1121)
1122{
1123	int i;
1124	char dummy[2] = {0,0};
1125
1126	/* Walk thru RDNs */
1127	for ( i=0; i<rdns->nr_num; i++ ) {
1128		if ( myop->equal( i+RDN_COLUMN, rdns->nr_buf[i] ))
1129			return LDAP_OTHER;
1130	}
1131	for ( ; i<NDB_MAX_RDNS; i++ ) {
1132		if ( myop->equal( i+RDN_COLUMN, dummy ))
1133			return LDAP_OTHER;
1134	}
1135	return 0;
1136}
1137
1138/* Store the DN2ID_TABLE fields */
1139extern "C" int
1140ndb_entry_put_info(
1141	BackendDB *be,
1142	NdbArgs *NA,
1143	int update
1144)
1145{
1146	struct ndb_info *ni = (struct ndb_info *) be->be_private;
1147	const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
1148	const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
1149	NdbOperation *myop;
1150	NdbAttrInfo *ai;
1151	Attribute *aoc, *a;
1152
1153	/* Get the entry's objectClass attribute; it's ok to be
1154	 * absent on a fresh insert
1155	 */
1156	aoc = attr_find( NA->e->e_attrs, slap_schema.si_ad_objectClass );
1157	if ( update && !aoc )
1158		return LDAP_OBJECT_CLASS_VIOLATION;
1159
1160	myop = NA->txn->getNdbOperation( myTable );
1161	if ( !myop )
1162		return LDAP_OTHER;
1163	if ( update ) {
1164		if ( myop->updateTuple())
1165			return LDAP_OTHER;
1166	} else {
1167		if ( myop->insertTuple())
1168			return LDAP_OTHER;
1169	}
1170
1171	if ( ndb_rdns2keys( myop, NA->rdns ))
1172		return LDAP_OTHER;
1173
1174	/* Set entry ID */
1175	{
1176		Uint64 eid = NA->e->e_id;
1177		if ( myop->setValue( EID_COLUMN, eid ))
1178			return LDAP_OTHER;
1179	}
1180
1181	/* Set list of objectClasses */
1182	/* List is <sp> <class> <sp> <class> <sp> ... so that
1183	 * searches for " class " will yield accurate results
1184	 */
1185	if ( aoc ) {
1186		char *ptr, buf[sizeof(MedVar)];
1187		NdbOcs myOcs;
1188		int i;
1189
1190		ndb_oc_check( be, NA->ndb, aoc->a_nvals, &myOcs );
1191		ptr = buf+2;
1192		*ptr++ = ' ';
1193		for ( i=0; i<myOcs.no_ntext; i++ ) {
1194			/* data loss... */
1195			if ( ptr + myOcs.no_text[i].bv_len + 1 >= &buf[sizeof(buf)] )
1196				break;
1197			ptr = lutil_strcopy( ptr, myOcs.no_text[i].bv_val );
1198			*ptr++ = ' ';
1199		}
1200
1201		/* implicit classes */
1202		if ( myOcs.no_nitext ) {
1203			*ptr++ = '@';
1204			*ptr++ = ' ';
1205			for ( i=0; i<myOcs.no_nitext; i++ ) {
1206				/* data loss... */
1207				if ( ptr + myOcs.no_itext[i].bv_len + 1 >= &buf[sizeof(buf)] )
1208					break;
1209				ptr = lutil_strcopy( ptr, myOcs.no_itext[i].bv_val );
1210				*ptr++ = ' ';
1211			}
1212		}
1213
1214		i = ptr - buf - 2;
1215		buf[0] = i & 0xff;
1216		buf[1] = i >> 8;
1217		if ( myop->setValue( OCS_COLUMN, buf ))
1218			return LDAP_OTHER;
1219	}
1220
1221	/* Set any indexed attrs */
1222	for ( a = NA->e->e_attrs; a; a=a->a_next ) {
1223		ai = ndb_ai_find( ni, a->a_desc->ad_type );
1224		if ( ai && ( ai->na_flag & NDB_INFO_INDEX )) {
1225			char *ptr, buf[sizeof(MedVar)];
1226			int len;
1227
1228			ptr = buf+1;
1229			len = a->a_vals[0].bv_len;
1230			/* FIXME: data loss */
1231			if ( len > ai->na_len )
1232				len = ai->na_len;
1233			buf[0] = len & 0xff;
1234			if ( ai->na_len > 255 ) {
1235				*ptr++ = len >> 8;
1236			}
1237			memcpy( ptr, a->a_vals[0].bv_val, len );
1238			if ( myop->setValue( ai->na_ixcol, buf ))
1239				return LDAP_OTHER;
1240		}
1241	}
1242
1243	return 0;
1244}
1245
1246extern "C" struct berval *
1247ndb_str2bvarray(
1248	char *str,
1249	int len,
1250	char delim,
1251	void *ctx
1252)
1253{
1254	struct berval *list, tmp;
1255	char *beg;
1256	int i, num;
1257
1258	while ( *str == delim ) {
1259		str++;
1260		len--;
1261	}
1262
1263	while ( str[len-1] == delim ) {
1264		str[--len] = '\0';
1265	}
1266
1267	for ( i = 1, beg = str;; i++ ) {
1268		beg = strchr( beg, delim );
1269		if ( !beg )
1270			break;
1271		if ( beg >= str + len )
1272			break;
1273		beg++;
1274	}
1275
1276	num = i;
1277	list = (struct berval *)slap_sl_malloc( (num+1)*sizeof(struct berval), ctx);
1278
1279	for ( i = 0, beg = str; i<num; i++ ) {
1280		tmp.bv_val = beg;
1281		beg = strchr( beg, delim );
1282		if ( beg >= str + len )
1283			beg = NULL;
1284		if ( beg ) {
1285			tmp.bv_len = beg - tmp.bv_val;
1286		} else {
1287			tmp.bv_len = len - (tmp.bv_val - str);
1288		}
1289		ber_dupbv_x( &list[i], &tmp, ctx );
1290		beg++;
1291	}
1292
1293	BER_BVZERO( &list[i] );
1294	return list;
1295}
1296
1297extern "C" struct berval *
1298ndb_ref2oclist(
1299	const char *ref,
1300	void *ctx
1301)
1302{
1303	char *implied;
1304
1305	/* MedVar */
1306	int len = ref[0] | (ref[1] << 8);
1307
1308	/* don't return the implied classes */
1309	implied = (char *)memchr( ref+2, '@', len );
1310	if ( implied ) {
1311		len = implied - ref - 2;
1312		*implied = '\0';
1313	}
1314
1315	return ndb_str2bvarray( (char *)ref+2, len, ' ', ctx );
1316}
1317
1318/* Retrieve the DN2ID_TABLE fields. Can call with NULL ocs if just verifying
1319 * the existence of a DN.
1320 */
1321extern "C" int
1322ndb_entry_get_info(
1323	Operation *op,
1324	NdbArgs *NA,
1325	int update,
1326	struct berval *matched
1327)
1328{
1329	struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
1330	const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
1331	const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
1332	NdbOperation *myop[NDB_MAX_RDNS];
1333	NdbRecAttr *eid[NDB_MAX_RDNS], *oc[NDB_MAX_RDNS];
1334	char idbuf[NDB_MAX_RDNS][2*sizeof(ID)];
1335	char ocbuf[NDB_MAX_RDNS][NDB_OC_BUFLEN];
1336
1337	if ( matched ) {
1338		BER_BVZERO( matched );
1339	}
1340	if ( !myTable ) {
1341		return LDAP_OTHER;
1342	}
1343
1344	myop[0] = NA->txn->getNdbOperation( myTable );
1345	if ( !myop[0] ) {
1346		return LDAP_OTHER;
1347	}
1348
1349	if ( myop[0]->readTuple( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead )) {
1350		return LDAP_OTHER;
1351	}
1352
1353	if ( !NA->rdns->nr_num && ndb_dn2rdns( &NA->e->e_name, NA->rdns )) {
1354		return LDAP_NO_SUCH_OBJECT;
1355	}
1356
1357	if ( ndb_rdns2keys( myop[0], NA->rdns )) {
1358		return LDAP_OTHER;
1359	}
1360
1361	eid[0] = myop[0]->getValue( EID_COLUMN, idbuf[0] );
1362	if ( !eid[0] ) {
1363		return LDAP_OTHER;
1364	}
1365
1366	ocbuf[0][0] = 0;
1367	ocbuf[0][1] = 0;
1368	if ( !NA->ocs ) {
1369		oc[0] = myop[0]->getValue( OCS_COLUMN, ocbuf[0] );
1370		if ( !oc[0] ) {
1371			return LDAP_OTHER;
1372		}
1373	}
1374
1375	if ( NA->txn->execute(NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 ) {
1376		return LDAP_OTHER;
1377	}
1378
1379	switch( myop[0]->getNdbError().code ) {
1380	case 0:
1381		if ( !eid[0]->isNULL() && ( NA->e->e_id = eid[0]->u_64_value() )) {
1382			/* If we didn't care about OCs, or we got them */
1383			if ( NA->ocs || ocbuf[0][0] || ocbuf[0][1] ) {
1384				/* If wanted, return them */
1385				if ( !NA->ocs )
1386					NA->ocs = ndb_ref2oclist( ocbuf[0], op->o_tmpmemctx );
1387				break;
1388			}
1389		}
1390		/* FALLTHRU */
1391	case NDB_NO_SUCH_OBJECT:	/* no such tuple: look for closest parent */
1392		if ( matched ) {
1393			int i, j, k;
1394			char dummy[2] = {0,0};
1395
1396			/* get to last RDN, then back up 1 */
1397			k = NA->rdns->nr_num - 1;
1398
1399			for ( i=0; i<k; i++ ) {
1400				myop[i] = NA->txn->getNdbOperation( myTable );
1401				if ( !myop[i] )
1402					return LDAP_OTHER;
1403				if ( myop[i]->readTuple( NdbOperation::LM_CommittedRead ))
1404					return LDAP_OTHER;
1405				for ( j=0; j<=i; j++ ) {
1406					if ( myop[i]->equal( j+RDN_COLUMN, NA->rdns->nr_buf[j] ))
1407						return LDAP_OTHER;
1408				}
1409				for ( ;j<NDB_MAX_RDNS; j++ ) {
1410					if ( myop[i]->equal( j+RDN_COLUMN, dummy ))
1411						return LDAP_OTHER;
1412				}
1413				eid[i] = myop[i]->getValue( EID_COLUMN, idbuf[i] );
1414				if ( !eid[i] ) {
1415					return LDAP_OTHER;
1416				}
1417				ocbuf[i][0] = 0;
1418				ocbuf[i][1] = 0;
1419				if ( !NA->ocs ) {
1420					oc[i] = myop[0]->getValue( OCS_COLUMN, ocbuf[i] );
1421					if ( !oc[i] ) {
1422						return LDAP_OTHER;
1423					}
1424				}
1425			}
1426			if ( NA->txn->execute(NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 ) {
1427				return LDAP_OTHER;
1428			}
1429			for ( --i; i>=0; i-- ) {
1430				if ( myop[i]->getNdbError().code == 0 ) {
1431					for ( j=0; j<=i; j++ )
1432						matched->bv_len += NA->rdns->nr_buf[j][0];
1433					NA->erdns = NA->rdns->nr_num;
1434					NA->rdns->nr_num = j;
1435					matched->bv_len += i;
1436					matched->bv_val = NA->e->e_name.bv_val +
1437						NA->e->e_name.bv_len - matched->bv_len;
1438					if ( !eid[i]->isNULL() )
1439						NA->e->e_id = eid[i]->u_64_value();
1440					if ( !NA->ocs )
1441						NA->ocs = ndb_ref2oclist( ocbuf[i], op->o_tmpmemctx );
1442					break;
1443				}
1444			}
1445		}
1446		return LDAP_NO_SUCH_OBJECT;
1447	default:
1448		return LDAP_OTHER;
1449	}
1450
1451	return 0;
1452}
1453
1454extern "C" int
1455ndb_entry_del_info(
1456	BackendDB *be,
1457	NdbArgs *NA
1458)
1459{
1460	struct ndb_info *ni = (struct ndb_info *) be->be_private;
1461	const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
1462	const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
1463	NdbOperation *myop;
1464
1465	myop = NA->txn->getNdbOperation( myTable );
1466	if ( !myop )
1467		return LDAP_OTHER;
1468	if ( myop->deleteTuple())
1469		return LDAP_OTHER;
1470
1471	if ( ndb_rdns2keys( myop, NA->rdns ))
1472		return LDAP_OTHER;
1473
1474	return 0;
1475}
1476
1477extern "C" int
1478ndb_next_id(
1479	BackendDB *be,
1480	Ndb *ndb,
1481	ID *id
1482)
1483{
1484	struct ndb_info *ni = (struct ndb_info *) be->be_private;
1485	const NdbDictionary::Dictionary *myDict = ndb->getDictionary();
1486	const NdbDictionary::Table *myTable = myDict->getTable( NEXTID_TABLE );
1487	Uint64 nid = 0;
1488	int rc;
1489
1490	if ( !myTable ) {
1491		Debug( LDAP_DEBUG_ANY, "ndb_next_id: " NEXTID_TABLE " table is missing\n",
1492			0, 0, 0 );
1493		return LDAP_OTHER;
1494	}
1495
1496	rc = ndb->getAutoIncrementValue( myTable, nid, 1000 );
1497	if ( !rc )
1498		*id = nid;
1499	return rc;
1500}
1501
1502extern "C" { static void ndb_thread_hfree( void *key, void *data ); };
1503static void
1504ndb_thread_hfree( void *key, void *data )
1505{
1506	Ndb *ndb = (Ndb *)data;
1507	delete ndb;
1508}
1509
1510extern "C" int
1511ndb_thread_handle(
1512	Operation *op,
1513	Ndb **ndb )
1514{
1515	struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
1516	void *data;
1517
1518	if ( ldap_pvt_thread_pool_getkey( op->o_threadctx, ni, &data, NULL )) {
1519		Ndb *myNdb;
1520		int rc;
1521		ldap_pvt_thread_mutex_lock( &ni->ni_conn_mutex );
1522		myNdb = new Ndb( ni->ni_cluster[ni->ni_nextconn++], ni->ni_dbname );
1523		if ( ni->ni_nextconn >= ni->ni_nconns )
1524			ni->ni_nextconn = 0;
1525		ldap_pvt_thread_mutex_unlock( &ni->ni_conn_mutex );
1526		if ( !myNdb ) {
1527			return LDAP_OTHER;
1528		}
1529		rc = myNdb->init(1024);
1530		if ( rc ) {
1531			delete myNdb;
1532			Debug( LDAP_DEBUG_ANY, "ndb_thread_handle: err %d\n",
1533				rc, 0, 0 );
1534			return rc;
1535		}
1536		data = (void *)myNdb;
1537		if (( rc = ldap_pvt_thread_pool_setkey( op->o_threadctx, ni,
1538			data, ndb_thread_hfree, NULL, NULL ))) {
1539			delete myNdb;
1540			Debug( LDAP_DEBUG_ANY, "ndb_thread_handle: err %d\n",
1541				rc, 0, 0 );
1542			return rc;
1543		}
1544	}
1545	*ndb = (Ndb *)data;
1546	return 0;
1547}
1548
1549extern "C" int
1550ndb_entry_get(
1551	Operation *op,
1552	struct berval *ndn,
1553	ObjectClass *oc,
1554	AttributeDescription *ad,
1555	int rw,
1556	Entry **ent )
1557{
1558	struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
1559	NdbArgs NA;
1560	Entry e = {0};
1561	int rc;
1562
1563	/* Get our NDB handle */
1564	rc = ndb_thread_handle( op, &NA.ndb );
1565
1566	NA.txn = NA.ndb->startTransaction();
1567	if( !NA.txn ) {
1568		Debug( LDAP_DEBUG_TRACE,
1569			LDAP_XSTRING(ndb_entry_get) ": startTransaction failed: %s (%d)\n",
1570			NA.ndb->getNdbError().message, NA.ndb->getNdbError().code, 0 );
1571		return 1;
1572	}
1573
1574	e.e_name = *ndn;
1575	NA.e = &e;
1576	/* get entry */
1577	{
1578		NdbRdns rdns;
1579		rdns.nr_num = 0;
1580		NA.ocs = NULL;
1581		NA.rdns = &rdns;
1582		rc = ndb_entry_get_info( op, &NA, rw, NULL );
1583	}
1584	if ( rc == 0 ) {
1585		e.e_name = *ndn;
1586		e.e_nname = *ndn;
1587		rc = ndb_entry_get_data( op, &NA, 0 );
1588		ber_bvarray_free( NA.ocs );
1589		if ( rc == 0 ) {
1590			if ( oc && !is_entry_objectclass_or_sub( &e, oc )) {
1591				attrs_free( e.e_attrs );
1592				rc = 1;
1593			}
1594		}
1595	}
1596	if ( rc == 0 ) {
1597		*ent = entry_alloc();
1598		**ent = e;
1599		ber_dupbv( &(*ent)->e_name, ndn );
1600		ber_dupbv( &(*ent)->e_nname, ndn );
1601	} else {
1602		rc = 1;
1603	}
1604	NA.txn->close();
1605	return rc;
1606}
1607
1608/* Congestion avoidance code
1609 * for Deadlock Rollback
1610 */
1611
1612extern "C" void
1613ndb_trans_backoff( int num_retries )
1614{
1615	int i;
1616	int delay = 0;
1617	int pow_retries = 1;
1618	unsigned long key = 0;
1619	unsigned long max_key = -1;
1620	struct timeval timeout;
1621
1622	lutil_entropy( (unsigned char *) &key, sizeof( unsigned long ));
1623
1624	for ( i = 0; i < num_retries; i++ ) {
1625		if ( i >= 5 ) break;
1626		pow_retries *= 4;
1627	}
1628
1629	delay = 16384 * (key * (double) pow_retries / (double) max_key);
1630	delay = delay ? delay : 1;
1631
1632	Debug( LDAP_DEBUG_TRACE,  "delay = %d, num_retries = %d\n", delay, num_retries, 0 );
1633
1634	timeout.tv_sec = delay / 1000000;
1635	timeout.tv_usec = delay % 1000000;
1636	select( 0, NULL, NULL, NULL, &timeout );
1637}
1638
1639extern "C" void
1640ndb_check_referral( Operation *op, SlapReply *rs, NdbArgs *NA )
1641{
1642	struct berval dn, ndn;
1643	int i, dif;
1644	dif = NA->erdns - NA->rdns->nr_num;
1645
1646	/* Set full DN of matched into entry */
1647	for ( i=0; i<dif; i++ ) {
1648		dnParent( &NA->e->e_name, &dn );
1649		dnParent( &NA->e->e_nname, &ndn );
1650		NA->e->e_name = dn;
1651		NA->e->e_nname = ndn;
1652	}
1653
1654	/* return referral only if "disclose" is granted on the object */
1655	if ( access_allowed( op, NA->e, slap_schema.si_ad_entry,
1656		NULL, ACL_DISCLOSE, NULL )) {
1657		Attribute a;
1658		for ( i=0; !BER_BVISNULL( &NA->ocs[i] ); i++ );
1659		a.a_numvals = i;
1660		a.a_desc = slap_schema.si_ad_objectClass;
1661		a.a_vals = NA->ocs;
1662		a.a_nvals = NA->ocs;
1663		a.a_next = NULL;
1664		NA->e->e_attrs = &a;
1665		if ( is_entry_referral( NA->e )) {
1666			NA->e->e_attrs = NULL;
1667			ndb_entry_get_data( op, NA, 0 );
1668			rs->sr_ref = get_entry_referrals( op, NA->e );
1669			if ( rs->sr_ref ) {
1670				rs->sr_err = LDAP_REFERRAL;
1671				rs->sr_flags |= REP_REF_MUSTBEFREED;
1672			}
1673			attrs_free( NA->e->e_attrs );
1674		}
1675		NA->e->e_attrs = NULL;
1676	}
1677}
1678