/*
 * Copyright 2012-2015 Samy Al Bahra.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#define CK_HT_IM
#include <ck_ht.h>

/*
 * This implementation borrows several techniques from Josh Dybnis's
 * nbds library, which can be found at http://code.google.com/p/nbds
 */
#include <ck_cc.h>
#include <ck_md.h>
#include <ck_pr.h>
#include <ck_stdint.h>
#include <ck_stdbool.h>
#include <ck_string.h>

#include "ck_ht_hash.h"
#include "ck_internal.h"
#ifndef CK_HT_BUCKET_LENGTH

#ifdef CK_HT_PP
#define CK_HT_BUCKET_SHIFT 2ULL
#else
#define CK_HT_BUCKET_SHIFT 1ULL
#endif

#define CK_HT_BUCKET_LENGTH (1U << CK_HT_BUCKET_SHIFT)
#define CK_HT_BUCKET_MASK (CK_HT_BUCKET_LENGTH - 1)
#endif

#ifndef CK_HT_PROBE_DEFAULT
#define CK_HT_PROBE_DEFAULT 64ULL
#endif

#if defined(CK_F_PR_LOAD_8) && defined(CK_F_PR_STORE_8)
#define CK_HT_WORD	    uint8_t
#define CK_HT_WORD_MAX	    UINT8_MAX
#define CK_HT_STORE(x, y)   ck_pr_store_8(x, y)
#define CK_HT_LOAD(x)	    ck_pr_load_8(x)
#elif defined(CK_F_PR_LOAD_16) && defined(CK_F_PR_STORE_16)
#define CK_HT_WORD	    uint16_t
#define CK_HT_WORD_MAX	    UINT16_MAX
#define CK_HT_STORE(x, y)   ck_pr_store_16(x, y)
#define CK_HT_LOAD(x)	    ck_pr_load_16(x)
#elif defined(CK_F_PR_LOAD_32) && defined(CK_F_PR_STORE_32)
#define CK_HT_WORD	    uint32_t
#define CK_HT_WORD_MAX	    UINT32_MAX
#define CK_HT_STORE(x, y)   ck_pr_store_32(x, y)
#define CK_HT_LOAD(x)	    ck_pr_load_32(x)
#else
#error "ck_ht is not supported on your platform."
#endif

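/*
 * Notes on the map metadata below (summarizing the logic that follows):
 *
 *   - probe_bound is a per-slot bound on probe sequence length, stored in
 *     the smallest word for which atomic loads and stores are available.
 *     A slot's bound saturates at CK_HT_WORD_MAX, in which case readers
 *     fall back to the table-wide probe_maximum.
 *
 *   - deletions is a version counter that writers bump around entry re-use
 *     and relocation so that readers can detect and retry torn snapshots.
 */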
struct ck_ht_map {
	unsigned int mode;
	CK_HT_TYPE deletions;
	CK_HT_TYPE probe_maximum;
	CK_HT_TYPE probe_length;
	CK_HT_TYPE probe_limit;
	CK_HT_TYPE size;
	CK_HT_TYPE n_entries;
	CK_HT_TYPE mask;
	CK_HT_TYPE capacity;
	CK_HT_TYPE step;
	CK_HT_WORD *probe_bound;
	struct ck_ht_entry *entries;
};

void
ck_ht_stat(struct ck_ht *table,
    struct ck_ht_stat *st)
{
	struct ck_ht_map *map = table->map;

	st->n_entries = map->n_entries;
	st->probe_maximum = map->probe_maximum;
	return;
}

void
ck_ht_hash(struct ck_ht_hash *h,
    struct ck_ht *table,
    const void *key,
    uint16_t key_length)
{

	table->h(h, key, key_length, table->seed);
	return;
}

void
ck_ht_hash_direct(struct ck_ht_hash *h,
    struct ck_ht *table,
    uintptr_t key)
{

	ck_ht_hash(h, table, &key, sizeof(key));
	return;
}

static void
ck_ht_hash_wrapper(struct ck_ht_hash *h,
    const void *key,
    size_t length,
    uint64_t seed)
{

	h->value = (unsigned long)MurmurHash64A(key, length, seed);
	return;
}

static struct ck_ht_map *
ck_ht_map_create(struct ck_ht *table, CK_HT_TYPE entries)
{
	struct ck_ht_map *map;
	CK_HT_TYPE size;
	uintptr_t prefix;
	uint32_t n_entries;

	n_entries = ck_internal_power_2(entries);
	if (n_entries < CK_HT_BUCKET_LENGTH)
		n_entries = CK_HT_BUCKET_LENGTH;

	size = sizeof(struct ck_ht_map) +
		   (sizeof(struct ck_ht_entry) * n_entries + CK_MD_CACHELINE - 1);

	if (table->mode & CK_HT_WORKLOAD_DELETE) {
		prefix = sizeof(CK_HT_WORD) * n_entries;
		size += prefix;
	} else {
		prefix = 0;
	}

	map = table->m->malloc(size);
	if (map == NULL)
		return NULL;

	map->mode = table->mode;
	map->size = size;
	map->probe_limit = ck_internal_max_64(n_entries >>
	    (CK_HT_BUCKET_SHIFT + 2), CK_HT_PROBE_DEFAULT);

	map->deletions = 0;
	map->probe_maximum = 0;
	map->capacity = n_entries;
	map->step = ck_cc_ffsll(map->capacity);
	map->mask = map->capacity - 1;
	map->n_entries = 0;
	map->entries = (struct ck_ht_entry *)(((uintptr_t)&map[1] + prefix +
	    CK_MD_CACHELINE - 1) & ~(CK_MD_CACHELINE - 1));

	if (table->mode & CK_HT_WORKLOAD_DELETE) {
		map->probe_bound = (CK_HT_WORD *)&map[1];
		memset(map->probe_bound, 0, prefix);
	} else {
		map->probe_bound = NULL;
	}

	memset(map->entries, 0, sizeof(struct ck_ht_entry) * n_entries);
	ck_pr_fence_store();
	return map;
}
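
/*
 * Resulting allocation layout (sketch, assuming CK_HT_WORKLOAD_DELETE):
 *
 *   [struct ck_ht_map][probe_bound words][padding][cache-line-aligned entries]
 *
 * The probe_bound array, when present, lives at &map[1], and the entry array
 * is rounded up to the next cache line boundary so that the bucket scans
 * below can mask an entry pointer down to its enclosing cache line.
 */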

static inline void
ck_ht_map_bound_set(struct ck_ht_map *m,
    struct ck_ht_hash h,
    CK_HT_TYPE n_probes)
{
	CK_HT_TYPE offset = h.value & m->mask;

	if (n_probes > m->probe_maximum)
		CK_HT_TYPE_STORE(&m->probe_maximum, n_probes);

	if (m->probe_bound != NULL && m->probe_bound[offset] < n_probes) {
		if (n_probes >= CK_HT_WORD_MAX)
			n_probes = CK_HT_WORD_MAX;

		CK_HT_STORE(&m->probe_bound[offset], n_probes);
		ck_pr_fence_store();
	}

	return;
}

static inline CK_HT_TYPE
ck_ht_map_bound_get(struct ck_ht_map *m, struct ck_ht_hash h)
{
	CK_HT_TYPE offset = h.value & m->mask;
	CK_HT_TYPE r = CK_HT_WORD_MAX;

	if (m->probe_bound != NULL) {
		r = CK_HT_LOAD(&m->probe_bound[offset]);
		if (r == CK_HT_WORD_MAX)
			r = CK_HT_TYPE_LOAD(&m->probe_maximum);
	} else {
		r = CK_HT_TYPE_LOAD(&m->probe_maximum);
	}

	return r;
}

static void
ck_ht_map_destroy(struct ck_malloc *m, struct ck_ht_map *map, bool defer)
{

	m->free(map, map->size, defer);
	return;
}

static inline size_t
ck_ht_map_probe_next(struct ck_ht_map *map, size_t offset, ck_ht_hash_t h, size_t probes)
{
	ck_ht_hash_t r;
	size_t stride;
	unsigned long level = (unsigned long)probes >> CK_HT_BUCKET_SHIFT;

	r.value = (h.value >> map->step) >> level;
	stride = (r.value & ~CK_HT_BUCKET_MASK) << 1
		     | (r.value & CK_HT_BUCKET_MASK);

	return (offset + level +
	    (stride | CK_HT_BUCKET_LENGTH)) & map->mask;
}
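
/*
 * Note on the displacement computed above: level increases by one for every
 * CK_HT_BUCKET_LENGTH probes, so each row of probing draws on higher-order
 * hash bits. The stride is taken from the hash bits above those used to
 * select the initial slot (h.value >> map->step), and OR-ing in
 * CK_HT_BUCKET_LENGTH guarantees a non-zero displacement past the cache
 * line that was just scanned.
 */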

bool
ck_ht_init(struct ck_ht *table,
    unsigned int mode,
    ck_ht_hash_cb_t *h,
    struct ck_malloc *m,
    CK_HT_TYPE entries,
    uint64_t seed)
{

	if (m == NULL || m->malloc == NULL || m->free == NULL)
		return false;

	table->m = m;
	table->mode = mode;
	table->seed = seed;

	if (h == NULL) {
		table->h = ck_ht_hash_wrapper;
	} else {
		table->h = h;
	}

	table->map = ck_ht_map_create(table, entries);
	return table->map != NULL;
}
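
/*
 * A minimal single-writer initialization and insertion sketch (illustrative
 * only, not compiled into the library): ht_malloc and ht_free are
 * hypothetical caller-supplied allocator wrappers, and the seed is
 * arbitrary. Requires <stdlib.h> and <string.h>.
 */
#if 0
static void *
ht_malloc(size_t r)
{

	return malloc(r);
}

static void
ht_free(void *p, size_t b, bool r)
{

	(void)b;
	(void)r;
	free(p);
	return;
}

static struct ck_malloc ht_allocator = {
	.malloc = ht_malloc,
	.free = ht_free
};

static void
ht_example(void)
{
	ck_ht_t ht;
	ck_ht_entry_t entry;
	ck_ht_hash_t h;
	const char *key = "example";
	uint16_t key_length = (uint16_t)strlen(key);

	/* Request an initial capacity of 8 slots. */
	if (ck_ht_init(&ht, CK_HT_MODE_BYTESTRING, NULL, &ht_allocator,
	    8, 6602834) == false)
		return;

	ck_ht_hash(&h, &ht, key, key_length);
	ck_ht_entry_set(&entry, h, key, key_length, "value");
	ck_ht_put_spmc(&ht, h, &entry);
	ck_ht_destroy(&ht);
	return;
}
#endif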

static struct ck_ht_entry *
ck_ht_map_probe_wr(struct ck_ht_map *map,
    ck_ht_hash_t h,
    ck_ht_entry_t *snapshot,
    ck_ht_entry_t **available,
    const void *key,
    uint16_t key_length,
    CK_HT_TYPE *probe_limit,
    CK_HT_TYPE *probe_wr)
{
	struct ck_ht_entry *bucket, *cursor;
	struct ck_ht_entry *first = NULL;
	size_t offset, i, j;
	CK_HT_TYPE probes = 0;
	CK_HT_TYPE limit;

	if (probe_limit == NULL) {
		limit = ck_ht_map_bound_get(map, h);
	} else {
		limit = CK_HT_TYPE_MAX;
	}

	offset = h.value & map->mask;
	for (i = 0; i < map->probe_limit; i++) {
		/*
		 * Probe on a complete cache line first. Scan forward and wrap around to
		 * the beginning of the cache line. Only when the complete cache line has
		 * been scanned do we move on to the next row.
		 */
		bucket = (void *)((uintptr_t)(map->entries + offset) &
			     ~(CK_MD_CACHELINE - 1));

		for (j = 0; j < CK_HT_BUCKET_LENGTH; j++) {
			uint16_t k;

			if (probes++ > limit)
				break;

			cursor = bucket + ((j + offset) & (CK_HT_BUCKET_LENGTH - 1));

			/*
			 * It is probably worth it to encapsulate probe state
			 * in order to prevent a complete reprobe sequence in
			 * the case of intermittent writers.
			 */
			if (cursor->key == CK_HT_KEY_TOMBSTONE) {
				if (first == NULL) {
					first = cursor;
					*probe_wr = probes;
				}

				continue;
			}

			if (cursor->key == CK_HT_KEY_EMPTY)
				goto leave;

			if (cursor->key == (uintptr_t)key)
				goto leave;

			if (map->mode & CK_HT_MODE_BYTESTRING) {
				void *pointer;

				/*
				 * Check memoized portion of hash value before
				 * expensive full-length comparison.
				 */
				k = ck_ht_entry_key_length(cursor);
				if (k != key_length)
					continue;

#ifdef CK_HT_PP
				if ((cursor->value >> CK_MD_VMA_BITS) != ((h.value >> 32) & CK_HT_KEY_MASK))
					continue;
#else
				if (cursor->hash != h.value)
					continue;
#endif

				pointer = ck_ht_entry_key(cursor);
				if (memcmp(pointer, key, key_length) == 0)
					goto leave;
			}
		}

		offset = ck_ht_map_probe_next(map, offset, h, probes);
	}

	cursor = NULL;

leave:
	if (probe_limit != NULL) {
		*probe_limit = probes;
	} else if (first == NULL) {
		*probe_wr = probes;
	}

	*available = first;

	if (cursor != NULL) {
		*snapshot = *cursor;
	}

	return cursor;
}
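
/*
 * Contract of ck_ht_map_probe_wr, as relied upon by the writer-side
 * operations below: the return value is the matching or first-empty slot
 * (NULL if the probe limit was exhausted), *available receives the first
 * tombstone seen along the probe sequence, *snapshot is filled when a slot
 * is returned, and probe counts are reported through probe_limit or
 * probe_wr.
 */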

bool
ck_ht_gc(struct ck_ht *ht, unsigned long cycles, unsigned long seed)
{
	CK_HT_WORD *bounds = NULL;
	struct ck_ht_map *map = ht->map;
	CK_HT_TYPE maximum, i;
	CK_HT_TYPE size = 0;

	if (map->n_entries == 0) {
		CK_HT_TYPE_STORE(&map->probe_maximum, 0);
		if (map->probe_bound != NULL)
			memset(map->probe_bound, 0, sizeof(CK_HT_WORD) * map->capacity);

		return true;
	}

	if (cycles == 0) {
		maximum = 0;

		if (map->probe_bound != NULL) {
			size = sizeof(CK_HT_WORD) * map->capacity;
			bounds = ht->m->malloc(size);
			if (bounds == NULL)
				return false;

			memset(bounds, 0, size);
		}
	} else {
		maximum = map->probe_maximum;
	}

	for (i = 0; i < map->capacity; i++) {
		struct ck_ht_entry *entry, *priority, snapshot;
		struct ck_ht_hash h;
		CK_HT_TYPE probes_wr;
		CK_HT_TYPE offset;

		entry = &map->entries[(i + seed) & map->mask];
		if (entry->key == CK_HT_KEY_EMPTY ||
		    entry->key == CK_HT_KEY_TOMBSTONE) {
			continue;
		}

		if (ht->mode & CK_HT_MODE_BYTESTRING) {
#ifndef CK_HT_PP
			h.value = entry->hash;
#else
			ht->h(&h, ck_ht_entry_key(entry), ck_ht_entry_key_length(entry),
			    ht->seed);
#endif
			entry = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    ck_ht_entry_key(entry),
			    ck_ht_entry_key_length(entry),
			    NULL, &probes_wr);
		} else {
#ifndef CK_HT_PP
			h.value = entry->hash;
#else
			ht->h(&h, &entry->key, sizeof(entry->key), ht->seed);
#endif
			entry = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    (void *)entry->key,
			    sizeof(entry->key),
			    NULL, &probes_wr);
		}

		offset = h.value & map->mask;

		if (priority != NULL) {
			CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
			ck_pr_fence_store();
#ifndef CK_HT_PP
			CK_HT_TYPE_STORE(&priority->key_length, entry->key_length);
			CK_HT_TYPE_STORE(&priority->hash, entry->hash);
#endif
			ck_pr_store_ptr_unsafe(&priority->value, (void *)entry->value);
			ck_pr_fence_store();
			ck_pr_store_ptr_unsafe(&priority->key, (void *)entry->key);
			ck_pr_fence_store();
			CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
			ck_pr_fence_store();
			ck_pr_store_ptr_unsafe(&entry->key, (void *)CK_HT_KEY_TOMBSTONE);
			ck_pr_fence_store();
		}

		if (cycles == 0) {
			if (probes_wr > maximum)
				maximum = probes_wr;

			if (probes_wr >= CK_HT_WORD_MAX)
				probes_wr = CK_HT_WORD_MAX;

			if (bounds != NULL && probes_wr > bounds[offset])
				bounds[offset] = probes_wr;
		} else if (--cycles == 0)
			break;
	}

	if (maximum != map->probe_maximum)
		CK_HT_TYPE_STORE(&map->probe_maximum, maximum);

	if (bounds != NULL) {
		for (i = 0; i < map->capacity; i++)
			CK_HT_STORE(&map->probe_bound[i], bounds[i]);

		ht->m->free(bounds, size, false);
	}

	return true;
}
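
/*
 * In other words, ck_ht_gc compacts entries toward the front of their probe
 * sequences by moving them into earlier tombstones. A cycles value of 0
 * performs a full pass and recomputes probe_maximum and the per-slot probe
 * bounds from scratch; a non-zero cycles value bounds the amount of work
 * done, allowing incremental garbage collection. The seed offsets the
 * starting slot so that repeated partial passes need not revisit the same
 * region.
 */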

static struct ck_ht_entry *
ck_ht_map_probe_rd(struct ck_ht_map *map,
    ck_ht_hash_t h,
    ck_ht_entry_t *snapshot,
    const void *key,
    uint16_t key_length)
{
	struct ck_ht_entry *bucket, *cursor;
	size_t offset, i, j;
	CK_HT_TYPE probes = 0;
	CK_HT_TYPE probe_maximum;

#ifndef CK_HT_PP
	CK_HT_TYPE d = 0;
	CK_HT_TYPE d_prime = 0;
retry:
#endif

	probe_maximum = ck_ht_map_bound_get(map, h);
	offset = h.value & map->mask;

	for (i = 0; i < map->probe_limit; i++) {
		/*
		 * Probe on a complete cache line first. Scan forward and wrap around to
		 * the beginning of the cache line. Only when the complete cache line has
		 * been scanned do we move on to the next row.
		 */
		bucket = (void *)((uintptr_t)(map->entries + offset) &
			     ~(CK_MD_CACHELINE - 1));

		for (j = 0; j < CK_HT_BUCKET_LENGTH; j++) {
			uint16_t k;

			if (probes++ > probe_maximum)
				return NULL;

			cursor = bucket + ((j + offset) & (CK_HT_BUCKET_LENGTH - 1));

#ifdef CK_HT_PP
			snapshot->key = (uintptr_t)ck_pr_load_ptr(&cursor->key);
			ck_pr_fence_load();
			snapshot->value = (uintptr_t)ck_pr_load_ptr(&cursor->value);
#else
			d = CK_HT_TYPE_LOAD(&map->deletions);
			snapshot->key = (uintptr_t)ck_pr_load_ptr(&cursor->key);
			ck_pr_fence_load();
			snapshot->key_length = CK_HT_TYPE_LOAD(&cursor->key_length);
			snapshot->hash = CK_HT_TYPE_LOAD(&cursor->hash);
			snapshot->value = (uintptr_t)ck_pr_load_ptr(&cursor->value);
#endif

			/*
			 * It is probably worth it to encapsulate probe state
			 * in order to prevent a complete reprobe sequence in
			 * the case of intermittent writers.
			 */
			if (snapshot->key == CK_HT_KEY_TOMBSTONE)
				continue;

			if (snapshot->key == CK_HT_KEY_EMPTY)
				goto leave;

			if (snapshot->key == (uintptr_t)key)
				goto leave;

			if (map->mode & CK_HT_MODE_BYTESTRING) {
				void *pointer;

				/*
				 * Check memoized portion of hash value before
				 * expensive full-length comparison.
				 */
				k = ck_ht_entry_key_length(snapshot);
				if (k != key_length)
					continue;
#ifdef CK_HT_PP
				if ((snapshot->value >> CK_MD_VMA_BITS) != ((h.value >> 32) & CK_HT_KEY_MASK))
					continue;
#else
				if (snapshot->hash != h.value)
					continue;

				d_prime = CK_HT_TYPE_LOAD(&map->deletions);

				/*
				 * It is possible that the slot was
				 * replaced; initiate a re-probe.
				 */
				if (d != d_prime)
					goto retry;
#endif

				pointer = ck_ht_entry_key(snapshot);
				if (memcmp(pointer, key, key_length) == 0)
					goto leave;
			}
		}

		offset = ck_ht_map_probe_next(map, offset, h, probes);
	}

	return NULL;

leave:
	return cursor;
}

CK_HT_TYPE
ck_ht_count(struct ck_ht *table)
{
	struct ck_ht_map *map = ck_pr_load_ptr(&table->map);

	return CK_HT_TYPE_LOAD(&map->n_entries);
}

bool
ck_ht_next(struct ck_ht *table,
    struct ck_ht_iterator *i,
    struct ck_ht_entry **entry)
{
	struct ck_ht_map *map = table->map;
	uintptr_t key;

	if (i->offset >= map->capacity)
		return false;

	do {
		key = map->entries[i->offset].key;
		if (key != CK_HT_KEY_EMPTY && key != CK_HT_KEY_TOMBSTONE)
			break;
	} while (++i->offset < map->capacity);

	if (i->offset >= map->capacity)
		return false;

	*entry = map->entries + i->offset++;
	return true;
}
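
/*
 * Iteration sketch (illustrative; this assumes iteration is serialized with
 * respect to writers, since entries are read without atomic snapshots here,
 * and consume is a placeholder for caller logic):
 *
 *	ck_ht_iterator_t it = CK_HT_ITERATOR_INITIALIZER;
 *	ck_ht_entry_t *cursor;
 *
 *	while (ck_ht_next(&ht, &it, &cursor) == true)
 *		consume(ck_ht_entry_key(cursor), ck_ht_entry_value(cursor));
 */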

bool
ck_ht_reset_size_spmc(struct ck_ht *table, CK_HT_TYPE size)
{
	struct ck_ht_map *map, *update;

	map = table->map;
	update = ck_ht_map_create(table, size);
	if (update == NULL)
		return false;

	ck_pr_store_ptr_unsafe(&table->map, update);
	ck_ht_map_destroy(table->m, map, true);
	return true;
}
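
/*
 * Note that the previous map is destroyed with defer set to true: the
 * caller-provided ck_malloc free callback is expected to delay actual
 * reclamation (for example, through a safe memory reclamation scheme) until
 * concurrent readers can no longer hold a reference to the old map.
 */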

bool
ck_ht_reset_spmc(struct ck_ht *table)
{
	struct ck_ht_map *map = table->map;

	return ck_ht_reset_size_spmc(table, map->capacity);
}

bool
ck_ht_grow_spmc(struct ck_ht *table, CK_HT_TYPE capacity)
{
	struct ck_ht_map *map, *update;
	struct ck_ht_entry *bucket, *previous;
	struct ck_ht_hash h;
	size_t k, i, j, offset;
	CK_HT_TYPE probes;

restart:
	map = table->map;

	if (map->capacity >= capacity)
		return false;

	update = ck_ht_map_create(table, capacity);
	if (update == NULL)
		return false;

	for (k = 0; k < map->capacity; k++) {
		previous = &map->entries[k];

		if (previous->key == CK_HT_KEY_EMPTY || previous->key == CK_HT_KEY_TOMBSTONE)
			continue;

		if (table->mode & CK_HT_MODE_BYTESTRING) {
#ifdef CK_HT_PP
			void *key;
			uint16_t key_length;

			key = ck_ht_entry_key(previous);
			key_length = ck_ht_entry_key_length(previous);
#endif

#ifndef CK_HT_PP
			h.value = previous->hash;
#else
			table->h(&h, key, key_length, table->seed);
#endif
		} else {
#ifndef CK_HT_PP
			h.value = previous->hash;
#else
			table->h(&h, &previous->key, sizeof(previous->key), table->seed);
#endif
		}

		offset = h.value & update->mask;
		probes = 0;

		for (i = 0; i < update->probe_limit; i++) {
			bucket = (void *)((uintptr_t)(update->entries + offset) & ~(CK_MD_CACHELINE - 1));

			for (j = 0; j < CK_HT_BUCKET_LENGTH; j++) {
				struct ck_ht_entry *cursor = bucket + ((j + offset) & (CK_HT_BUCKET_LENGTH - 1));

				probes++;
				if (CK_CC_LIKELY(cursor->key == CK_HT_KEY_EMPTY)) {
					*cursor = *previous;
					update->n_entries++;
					ck_ht_map_bound_set(update, h, probes);
					break;
				}
			}

			if (j < CK_HT_BUCKET_LENGTH)
				break;

			offset = ck_ht_map_probe_next(update, offset, h, probes);
		}

		if (i == update->probe_limit) {
			/*
			 * We have hit the probe limit; the map needs to be even
			 * larger.
			 */
			ck_ht_map_destroy(table->m, update, false);
			capacity <<= 1;
			goto restart;
		}
	}

	ck_pr_fence_store();
	ck_pr_store_ptr_unsafe(&table->map, update);
	ck_ht_map_destroy(table->m, map, true);
	return true;
}

bool
ck_ht_remove_spmc(struct ck_ht *table,
    ck_ht_hash_t h,
    ck_ht_entry_t *entry)
{
	struct ck_ht_map *map;
	struct ck_ht_entry *candidate, snapshot;

	map = table->map;

	if (table->mode & CK_HT_MODE_BYTESTRING) {
		candidate = ck_ht_map_probe_rd(map, h, &snapshot,
		    ck_ht_entry_key(entry),
		    ck_ht_entry_key_length(entry));
	} else {
		candidate = ck_ht_map_probe_rd(map, h, &snapshot,
		    (void *)entry->key,
		    sizeof(entry->key));
	}

	/* No matching entry was found. */
	if (candidate == NULL || snapshot.key == CK_HT_KEY_EMPTY)
		return false;

	*entry = snapshot;

	ck_pr_store_ptr_unsafe(&candidate->key, (void *)CK_HT_KEY_TOMBSTONE);
	ck_pr_fence_store();
	CK_HT_TYPE_STORE(&map->n_entries, map->n_entries - 1);
	return true;
}

bool
ck_ht_get_spmc(struct ck_ht *table,
    ck_ht_hash_t h,
    ck_ht_entry_t *entry)
{
	struct ck_ht_entry *candidate, snapshot;
	struct ck_ht_map *map;
	CK_HT_TYPE d, d_prime;

restart:
	map = ck_pr_load_ptr(&table->map);

	/*
	 * Platforms that cannot read key and key_length atomically must reprobe
	 * on the scan of any single entry.
	 */
	d = CK_HT_TYPE_LOAD(&map->deletions);

	if (table->mode & CK_HT_MODE_BYTESTRING) {
		candidate = ck_ht_map_probe_rd(map, h, &snapshot,
		    ck_ht_entry_key(entry), ck_ht_entry_key_length(entry));
	} else {
		candidate = ck_ht_map_probe_rd(map, h, &snapshot,
		    (void *)entry->key, sizeof(entry->key));
	}

	d_prime = CK_HT_TYPE_LOAD(&map->deletions);
	if (d != d_prime) {
		/*
		 * It is possible we have read (K, V'). The only valid states
		 * are (K, V), (K', V') and (T, V). Restart the load operation
		 * in the face of concurrent deletions or replacements.
		 */
		goto restart;
	}

	if (candidate == NULL || snapshot.key == CK_HT_KEY_EMPTY)
		return false;

	*entry = snapshot;
	return true;
}
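
/*
 * Example of the race the version check above guards against: a writer
 * relocating key K may store K's value into a new slot, publish K there,
 * and tombstone the old slot while a reader is mid-probe, so the reader
 * can pair K with a stale or unrelated value. Because every such
 * transition bumps map->deletions, comparing the counter before and after
 * the probe detects the interleaving and forces a restart.
 */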

bool
ck_ht_set_spmc(struct ck_ht *table,
    ck_ht_hash_t h,
    ck_ht_entry_t *entry)
{
	struct ck_ht_entry snapshot, *candidate, *priority;
	struct ck_ht_map *map;
	CK_HT_TYPE probes, probes_wr;
	bool empty = false;

	for (;;) {
		map = table->map;

		if (table->mode & CK_HT_MODE_BYTESTRING) {
			candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    ck_ht_entry_key(entry),
			    ck_ht_entry_key_length(entry),
			    &probes, &probes_wr);
		} else {
			candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    (void *)entry->key,
			    sizeof(entry->key),
			    &probes, &probes_wr);
		}

		if (priority != NULL) {
			probes = probes_wr;
			break;
		}

		if (candidate != NULL)
			break;

		if (ck_ht_grow_spmc(table, map->capacity << 1) == false)
			return false;
	}

	if (candidate == NULL) {
		candidate = priority;
		empty = true;
	}

	if (candidate->key != CK_HT_KEY_EMPTY &&
	    priority != NULL && candidate != priority) {
		/*
		 * The entry is being moved to another position in the probe
		 * sequence. We avoid a state of (K, B) (where [K, B] -> [K', B]) by
		 * guaranteeing a forced reprobe before transitioning from K to
		 * T. (K, B) implies (K, B, D') so we will reprobe successfully
		 * from this transient state.
		 */
		probes = probes_wr;

#ifndef CK_HT_PP
		CK_HT_TYPE_STORE(&priority->key_length, entry->key_length);
		CK_HT_TYPE_STORE(&priority->hash, entry->hash);
#endif

		/*
		 * Readers must observe the version counter change before they
		 * observe re-use. If they observe re-use, it is at most
		 * a tombstone.
		 */
		if (priority->value == CK_HT_KEY_TOMBSTONE) {
			CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
			ck_pr_fence_store();
		}

		ck_pr_store_ptr_unsafe(&priority->value, (void *)entry->value);
		ck_pr_fence_store();
		ck_pr_store_ptr_unsafe(&priority->key, (void *)entry->key);
		ck_pr_fence_store();

		/*
		 * Make sure that readers who observe the tombstone would
		 * also observe the counter change.
		 */
		CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
		ck_pr_fence_store();

		ck_pr_store_ptr_unsafe(&candidate->key, (void *)CK_HT_KEY_TOMBSTONE);
		ck_pr_fence_store();
	} else {
		/*
		 * In this case we are inserting a new entry or replacing
		 * an existing entry. Yes, this could be folded into the branch
		 * above, but it isn't because you are looking at dying code
		 * (ck_ht is effectively deprecated and is being replaced soon).
		 */
		bool replace = candidate->key != CK_HT_KEY_EMPTY &&
		    candidate->key != CK_HT_KEY_TOMBSTONE;

		if (priority != NULL) {
			if (priority->key == CK_HT_KEY_TOMBSTONE) {
				CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
				ck_pr_fence_store();
			}

			candidate = priority;
			probes = probes_wr;
		}

#ifdef CK_HT_PP
		ck_pr_store_ptr_unsafe(&candidate->value, (void *)entry->value);
		ck_pr_fence_store();
		ck_pr_store_ptr_unsafe(&candidate->key, (void *)entry->key);
#else
		CK_HT_TYPE_STORE(&candidate->key_length, entry->key_length);
		CK_HT_TYPE_STORE(&candidate->hash, entry->hash);
		ck_pr_store_ptr_unsafe(&candidate->value, (void *)entry->value);
		ck_pr_fence_store();
		ck_pr_store_ptr_unsafe(&candidate->key, (void *)entry->key);
#endif

		/*
		 * If we are inserting a new entry, increment the number
		 * of entries associated with the map.
		 */
		if (replace == false)
			CK_HT_TYPE_STORE(&map->n_entries, map->n_entries + 1);
	}

	ck_ht_map_bound_set(map, h, probes);

	/* Enforce a load factor of 0.5. */
	if (map->n_entries * 2 > map->capacity)
		ck_ht_grow_spmc(table, map->capacity << 1);

	if (empty == true) {
		entry->key = CK_HT_KEY_EMPTY;
	} else {
		*entry = snapshot;
	}

	return true;
}
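
/*
 * Replace-or-insert sketch (illustrative): on success, ck_ht_set_spmc
 * writes the displaced entry back through *entry, so a caller can detect
 * whether an insertion or a replacement took place.
 *
 *	ck_ht_entry_set(&entry, h, key, key_length, new_value);
 *	if (ck_ht_set_spmc(&ht, h, &entry) == true &&
 *	    ck_ht_entry_empty(&entry) == false)
 *		old_value = ck_ht_entry_value(&entry);
 */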

bool
ck_ht_put_spmc(struct ck_ht *table,
    ck_ht_hash_t h,
    ck_ht_entry_t *entry)
{
	struct ck_ht_entry snapshot, *candidate, *priority;
	struct ck_ht_map *map;
	CK_HT_TYPE probes, probes_wr;

	for (;;) {
		map = table->map;

		if (table->mode & CK_HT_MODE_BYTESTRING) {
			candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    ck_ht_entry_key(entry),
			    ck_ht_entry_key_length(entry),
			    &probes, &probes_wr);
		} else {
			candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    (void *)entry->key,
			    sizeof(entry->key),
			    &probes, &probes_wr);
		}

		if (candidate != NULL || priority != NULL)
			break;

		if (ck_ht_grow_spmc(table, map->capacity << 1) == false)
			return false;
	}

	if (priority != NULL) {
		/* The version counter is updated before re-use. */
		CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
		ck_pr_fence_store();

		/* Re-use the tombstone if one was found. */
		candidate = priority;
		probes = probes_wr;
	} else if (candidate->key != CK_HT_KEY_EMPTY &&
	    candidate->key != CK_HT_KEY_TOMBSTONE) {
		/*
		 * If the candidate key is non-empty and is not a tombstone,
		 * then an identical key was found. As put does not implement
		 * replacement, we fail.
		 */
		return false;
	}

	ck_ht_map_bound_set(map, h, probes);

#ifdef CK_HT_PP
	ck_pr_store_ptr_unsafe(&candidate->value, (void *)entry->value);
	ck_pr_fence_store();
	ck_pr_store_ptr_unsafe(&candidate->key, (void *)entry->key);
#else
	CK_HT_TYPE_STORE(&candidate->key_length, entry->key_length);
	CK_HT_TYPE_STORE(&candidate->hash, entry->hash);
	ck_pr_store_ptr_unsafe(&candidate->value, (void *)entry->value);
	ck_pr_fence_store();
	ck_pr_store_ptr_unsafe(&candidate->key, (void *)entry->key);
#endif

	CK_HT_TYPE_STORE(&map->n_entries, map->n_entries + 1);

	/* Enforce a load factor of 0.5. */
	if (map->n_entries * 2 > map->capacity)
		ck_ht_grow_spmc(table, map->capacity << 1);

	return true;
}

void
ck_ht_destroy(struct ck_ht *table)
{

	ck_ht_map_destroy(table->m, table->map, false);
	return;
}