/*
 * Copyright 2012-2015 Samy Al Bahra.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#define CK_HT_IM
#include <ck_ht.h>

/*
 * This implementation borrows several techniques from Josh Dybnis's
 * nbds library, which can be found at http://code.google.com/p/nbds
 *
 * This release currently only includes support for 64-bit platforms.
 * We will address 32-bit platforms in a future release.
 */
#include <ck_cc.h>
#include <ck_md.h>
#include <ck_pr.h>
#include <ck_stdint.h>
#include <ck_stdbool.h>
#include <ck_string.h>

#include "ck_ht_hash.h"
#include "ck_internal.h"

#ifndef CK_HT_BUCKET_LENGTH

#ifdef CK_HT_PP
#define CK_HT_BUCKET_SHIFT 2ULL
#else
#define CK_HT_BUCKET_SHIFT 1ULL
#endif

#define CK_HT_BUCKET_LENGTH (1U << CK_HT_BUCKET_SHIFT)
#define CK_HT_BUCKET_MASK (CK_HT_BUCKET_LENGTH - 1)
#endif

#ifndef CK_HT_PROBE_DEFAULT
#define CK_HT_PROBE_DEFAULT 64ULL
#endif

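/*
 * Per-bucket probe bounds are stored in the narrowest word type the
 * platform can load and store atomically. A bound saturates at
 * CK_HT_WORD_MAX, in which case readers fall back to the table-wide
 * probe_maximum (see ck_ht_map_bound_get below).
 */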
#if defined(CK_F_PR_LOAD_8) && defined(CK_F_PR_STORE_8)
#define CK_HT_WORD	    uint8_t
#define CK_HT_WORD_MAX	    UINT8_MAX
#define CK_HT_STORE(x, y)   ck_pr_store_8(x, y)
#define CK_HT_LOAD(x)	    ck_pr_load_8(x)
#elif defined(CK_F_PR_LOAD_16) && defined(CK_F_PR_STORE_16)
#define CK_HT_WORD	    uint16_t
#define CK_HT_WORD_MAX	    UINT16_MAX
#define CK_HT_STORE(x, y)   ck_pr_store_16(x, y)
#define CK_HT_LOAD(x)	    ck_pr_load_16(x)
#elif defined(CK_F_PR_LOAD_32) && defined(CK_F_PR_STORE_32)
#define CK_HT_WORD	    uint32_t
#define CK_HT_WORD_MAX	    UINT32_MAX
#define CK_HT_STORE(x, y)   ck_pr_store_32(x, y)
#define CK_HT_LOAD(x)	    ck_pr_load_32(x)
#else
#error "ck_ht is not supported on your platform."
#endif

struct ck_ht_map {
	unsigned int mode;
	CK_HT_TYPE deletions;
	CK_HT_TYPE probe_maximum;
	CK_HT_TYPE probe_length;
	CK_HT_TYPE probe_limit;
	CK_HT_TYPE size;
	CK_HT_TYPE n_entries;
	CK_HT_TYPE mask;
	CK_HT_TYPE capacity;
	CK_HT_TYPE step;
	CK_HT_WORD *probe_bound;
	struct ck_ht_entry *entries;
};

void
ck_ht_stat(struct ck_ht *table,
    struct ck_ht_stat *st)
{
	struct ck_ht_map *map = table->map;

	st->n_entries = map->n_entries;
	st->probe_maximum = map->probe_maximum;
	return;
}

void
ck_ht_hash(struct ck_ht_hash *h,
    struct ck_ht *table,
    const void *key,
    uint16_t key_length)
{

	table->h(h, key, key_length, table->seed);
	return;
}

void
ck_ht_hash_direct(struct ck_ht_hash *h,
    struct ck_ht *table,
    uintptr_t key)
{

	ck_ht_hash(h, table, &key, sizeof(key));
	return;
}

static void
ck_ht_hash_wrapper(struct ck_ht_hash *h,
    const void *key,
    size_t length,
    uint64_t seed)
{

	h->value = (unsigned long)MurmurHash64A(key, length, seed);
	return;
}

static struct ck_ht_map *
ck_ht_map_create(struct ck_ht *table, CK_HT_TYPE entries)
{
	struct ck_ht_map *map;
	CK_HT_TYPE size;
	uintptr_t prefix;
	uint32_t n_entries;

	n_entries = ck_internal_power_2(entries);
	if (n_entries < CK_HT_BUCKET_LENGTH)
		n_entries = CK_HT_BUCKET_LENGTH;

	size = sizeof(struct ck_ht_map) +
		   (sizeof(struct ck_ht_entry) * n_entries + CK_MD_CACHELINE - 1);

	if (table->mode & CK_HT_WORKLOAD_DELETE) {
		prefix = sizeof(CK_HT_WORD) * n_entries;
		size += prefix;
	} else {
		prefix = 0;
	}

	map = table->m->malloc(size);
	if (map == NULL)
		return NULL;

	map->mode = table->mode;
	map->size = size;
	map->probe_limit = ck_internal_max_64(n_entries >>
	    (CK_HT_BUCKET_SHIFT + 2), CK_HT_PROBE_DEFAULT);

	map->deletions = 0;
	map->probe_maximum = 0;
	map->capacity = n_entries;
	map->step = ck_cc_ffsll(map->capacity);
	map->mask = map->capacity - 1;
	map->n_entries = 0;
	map->entries = (struct ck_ht_entry *)(((uintptr_t)&map[1] + prefix +
	    CK_MD_CACHELINE - 1) & ~(CK_MD_CACHELINE - 1));

	if (table->mode & CK_HT_WORKLOAD_DELETE) {
		map->probe_bound = (CK_HT_WORD *)&map[1];
		memset(map->probe_bound, 0, prefix);
	} else {
		map->probe_bound = NULL;
	}

	memset(map->entries, 0, sizeof(struct ck_ht_entry) * n_entries);
	ck_pr_fence_store();
	return map;
}

static inline void
ck_ht_map_bound_set(struct ck_ht_map *m,
    struct ck_ht_hash h,
    CK_HT_TYPE n_probes)
{
	CK_HT_TYPE offset = h.value & m->mask;

	if (n_probes > m->probe_maximum)
		CK_HT_TYPE_STORE(&m->probe_maximum, n_probes);

	if (m->probe_bound != NULL && m->probe_bound[offset] < n_probes) {
		if (n_probes >= CK_HT_WORD_MAX)
			n_probes = CK_HT_WORD_MAX;

		CK_HT_STORE(&m->probe_bound[offset], n_probes);
		ck_pr_fence_store();
	}

	return;
}

static inline CK_HT_TYPE
ck_ht_map_bound_get(struct ck_ht_map *m, struct ck_ht_hash h)
{
	CK_HT_TYPE offset = h.value & m->mask;
	CK_HT_TYPE r = CK_HT_WORD_MAX;

	if (m->probe_bound != NULL) {
		r = CK_HT_LOAD(&m->probe_bound[offset]);
		if (r == CK_HT_WORD_MAX)
			r = CK_HT_TYPE_LOAD(&m->probe_maximum);
	} else {
		r = CK_HT_TYPE_LOAD(&m->probe_maximum);
	}

	return r;
}

static void
ck_ht_map_destroy(struct ck_malloc *m, struct ck_ht_map *map, bool defer)
{

	m->free(map, map->size, defer);
	return;
}

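/*
 * Computes the next probe location. Probes advance in whole buckets
 * (cache lines): the upper bits of the hash, exposed by shifting out
 * map->step bits, perturb the stride so that keys colliding on their
 * initial bucket diverge on later probes, while OR-ing in
 * CK_HT_BUCKET_LENGTH guarantees forward progress of at least one
 * bucket per probe.
 */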
static inline size_t
ck_ht_map_probe_next(struct ck_ht_map *map, size_t offset, ck_ht_hash_t h, size_t probes)
{
	ck_ht_hash_t r;
	size_t stride;
	unsigned long level = (unsigned long)probes >> CK_HT_BUCKET_SHIFT;

	r.value = (h.value >> map->step) >> level;
	stride = (r.value & ~CK_HT_BUCKET_MASK) << 1
		     | (r.value & CK_HT_BUCKET_MASK);

	return (offset + level +
	    (stride | CK_HT_BUCKET_LENGTH)) & map->mask;
}

bool
ck_ht_init(struct ck_ht *table,
    unsigned int mode,
    ck_ht_hash_cb_t *h,
    struct ck_malloc *m,
    CK_HT_TYPE entries,
    uint64_t seed)
{

	if (m == NULL || m->malloc == NULL || m->free == NULL)
		return false;

	table->m = m;
	table->mode = mode;
	table->seed = seed;

	if (h == NULL) {
		table->h = ck_ht_hash_wrapper;
	} else {
		table->h = h;
	}

	table->map = ck_ht_map_create(table, entries);
	return table->map != NULL;
}

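/*
 * Example usage, as a minimal sketch (the allocator wrappers and the
 * initial capacity and seed below are illustrative assumptions, not
 * part of this file):
 *
 *	static void *
 *	ht_malloc(size_t r)
 *	{
 *
 *		return malloc(r);
 *	}
 *
 *	static void
 *	ht_free(void *p, size_t b, bool r)
 *	{
 *
 *		(void)b;
 *		(void)r;
 *		free(p);
 *		return;
 *	}
 *
 *	static struct ck_malloc my_allocator = {
 *		.malloc = ht_malloc,
 *		.free = ht_free
 *	};
 *
 *	struct ck_ht ht;
 *
 *	if (ck_ht_init(&ht, CK_HT_MODE_BYTESTRING, NULL, &my_allocator,
 *	    1024, 6602834) == false)
 *		abort();
 */
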
static struct ck_ht_entry *
ck_ht_map_probe_wr(struct ck_ht_map *map,
    ck_ht_hash_t h,
    ck_ht_entry_t *snapshot,
    ck_ht_entry_t **available,
    const void *key,
    uint16_t key_length,
    CK_HT_TYPE *probe_limit,
    CK_HT_TYPE *probe_wr)
{
	struct ck_ht_entry *bucket, *cursor;
	struct ck_ht_entry *first = NULL;
	size_t offset, i, j;
	CK_HT_TYPE probes = 0;
	CK_HT_TYPE limit;

	if (probe_limit == NULL) {
		limit = ck_ht_map_bound_get(map, h);
	} else {
		limit = CK_HT_TYPE_MAX;
	}

	offset = h.value & map->mask;
	for (i = 0; i < map->probe_limit; i++) {
		/*
		 * Probe on a complete cache line first. Scan forward and wrap around to
		 * the beginning of the cache line. Only when the complete cache line has
		 * been scanned do we move on to the next row.
		 */
		bucket = (void *)((uintptr_t)(map->entries + offset) &
			     ~(CK_MD_CACHELINE - 1));

		for (j = 0; j < CK_HT_BUCKET_LENGTH; j++) {
			uint16_t k;

			if (probes++ > limit)
				break;

			cursor = bucket + ((j + offset) & (CK_HT_BUCKET_LENGTH - 1));

			/*
			 * It is probably worth it to encapsulate probe state
			 * in order to prevent a complete reprobe sequence in
			 * the case of intermittent writers.
			 */
			if (cursor->key == CK_HT_KEY_TOMBSTONE) {
				if (first == NULL) {
					first = cursor;
					*probe_wr = probes;
				}

				continue;
			}

			if (cursor->key == CK_HT_KEY_EMPTY)
				goto leave;

			if (cursor->key == (uintptr_t)key)
				goto leave;

			if (map->mode & CK_HT_MODE_BYTESTRING) {
				void *pointer;

				/*
				 * Check memoized portion of hash value before
				 * expensive full-length comparison.
				 */
				k = ck_ht_entry_key_length(cursor);
				if (k != key_length)
					continue;

#ifdef CK_HT_PP
				if ((cursor->value >> CK_MD_VMA_BITS) != ((h.value >> 32) & CK_HT_KEY_MASK))
					continue;
#else
				if (cursor->hash != h.value)
					continue;
#endif

				pointer = ck_ht_entry_key(cursor);
				if (memcmp(pointer, key, key_length) == 0)
					goto leave;
			}
		}

		offset = ck_ht_map_probe_next(map, offset, h, probes);
	}

	cursor = NULL;

leave:
	if (probe_limit != NULL) {
		*probe_limit = probes;
	} else if (first == NULL) {
		*probe_wr = probes;
	}

	*available = first;

	if (cursor != NULL) {
		*snapshot = *cursor;
	}

	return cursor;
}

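/*
 * Garbage-collects tombstones and recomputes probe bounds. A cycles
 * value of 0 requests a full pass over the table; a non-zero value
 * caps the number of occupied slots processed per call so that the
 * cost can be amortized across invocations. The seed perturbs the
 * starting slot of the scan.
 */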
bool
ck_ht_gc(struct ck_ht *ht, unsigned long cycles, unsigned long seed)
{
	CK_HT_WORD *bounds = NULL;
	struct ck_ht_map *map = ht->map;
	CK_HT_TYPE maximum, i;
	CK_HT_TYPE size = 0;

	if (map->n_entries == 0) {
		CK_HT_TYPE_STORE(&map->probe_maximum, 0);
		if (map->probe_bound != NULL)
			memset(map->probe_bound, 0, sizeof(CK_HT_WORD) * map->capacity);

		return true;
	}

	if (cycles == 0) {
		maximum = 0;

		if (map->probe_bound != NULL) {
			size = sizeof(CK_HT_WORD) * map->capacity;
			bounds = ht->m->malloc(size);
			if (bounds == NULL)
				return false;

			memset(bounds, 0, size);
		}
	} else {
		maximum = map->probe_maximum;
	}

	for (i = 0; i < map->capacity; i++) {
		struct ck_ht_entry *entry, *priority, snapshot;
		struct ck_ht_hash h;
		CK_HT_TYPE probes_wr;
		CK_HT_TYPE offset;

		entry = &map->entries[(i + seed) & map->mask];
		if (entry->key == CK_HT_KEY_EMPTY ||
		    entry->key == CK_HT_KEY_TOMBSTONE) {
			continue;
		}

		if (ht->mode & CK_HT_MODE_BYTESTRING) {
#ifndef CK_HT_PP
			h.value = entry->hash;
#else
			ht->h(&h, ck_ht_entry_key(entry), ck_ht_entry_key_length(entry),
			    ht->seed);
#endif
			entry = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    ck_ht_entry_key(entry),
			    ck_ht_entry_key_length(entry),
			    NULL, &probes_wr);
		} else {
#ifndef CK_HT_PP
			h.value = entry->hash;
#else
			ht->h(&h, &entry->key, sizeof(entry->key), ht->seed);
#endif
			entry = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    (void *)entry->key,
			    sizeof(entry->key),
			    NULL, &probes_wr);
		}

		offset = h.value & map->mask;

		if (priority != NULL) {
			CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
			ck_pr_fence_store();
#ifndef CK_HT_PP
			CK_HT_TYPE_STORE(&priority->key_length, entry->key_length);
			CK_HT_TYPE_STORE(&priority->hash, entry->hash);
#endif
			ck_pr_store_ptr_unsafe(&priority->value, (void *)entry->value);
			ck_pr_fence_store();
			ck_pr_store_ptr_unsafe(&priority->key, (void *)entry->key);
			ck_pr_fence_store();
			CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
			ck_pr_fence_store();
			ck_pr_store_ptr_unsafe(&entry->key, (void *)CK_HT_KEY_TOMBSTONE);
			ck_pr_fence_store();
		}

		if (cycles == 0) {
			if (probes_wr > maximum)
				maximum = probes_wr;

			if (probes_wr >= CK_HT_WORD_MAX)
				probes_wr = CK_HT_WORD_MAX;

			if (bounds != NULL && probes_wr > bounds[offset])
				bounds[offset] = probes_wr;
		} else if (--cycles == 0)
			break;
	}

	if (maximum != map->probe_maximum)
		CK_HT_TYPE_STORE(&map->probe_maximum, maximum);

	if (bounds != NULL) {
		for (i = 0; i < map->capacity; i++)
			CK_HT_STORE(&map->probe_bound[i], bounds[i]);

		ht->m->free(bounds, size, false);
	}

	return true;
}

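/*
 * Read-side probe. On platforms without packed pointer support the
 * key, key_length and hash fields of an entry cannot be read
 * atomically as a unit, so the deletions counter is sampled and
 * compared around the memoized hash check; a change initiates a
 * retry from the beginning of the probe sequence.
 */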
static struct ck_ht_entry *
ck_ht_map_probe_rd(struct ck_ht_map *map,
    ck_ht_hash_t h,
    ck_ht_entry_t *snapshot,
    const void *key,
    uint16_t key_length)
{
	struct ck_ht_entry *bucket, *cursor;
	size_t offset, i, j;
	CK_HT_TYPE probes = 0;
	CK_HT_TYPE probe_maximum;

#ifndef CK_HT_PP
	CK_HT_TYPE d = 0;
	CK_HT_TYPE d_prime = 0;
retry:
#endif

	probe_maximum = ck_ht_map_bound_get(map, h);
	offset = h.value & map->mask;

	for (i = 0; i < map->probe_limit; i++) {
		/*
		 * Probe on a complete cache line first. Scan forward and wrap around to
		 * the beginning of the cache line. Only when the complete cache line has
		 * been scanned do we move on to the next row.
		 */
		bucket = (void *)((uintptr_t)(map->entries + offset) &
			     ~(CK_MD_CACHELINE - 1));

		for (j = 0; j < CK_HT_BUCKET_LENGTH; j++) {
			uint16_t k;

			if (probes++ > probe_maximum)
				return NULL;

			cursor = bucket + ((j + offset) & (CK_HT_BUCKET_LENGTH - 1));

#ifdef CK_HT_PP
			snapshot->key = (uintptr_t)ck_pr_load_ptr(&cursor->key);
			ck_pr_fence_load();
			snapshot->value = (uintptr_t)ck_pr_load_ptr(&cursor->value);
#else
			d = CK_HT_TYPE_LOAD(&map->deletions);
			snapshot->key = (uintptr_t)ck_pr_load_ptr(&cursor->key);
			ck_pr_fence_load();
			snapshot->key_length = CK_HT_TYPE_LOAD(&cursor->key_length);
			snapshot->hash = CK_HT_TYPE_LOAD(&cursor->hash);
			snapshot->value = (uintptr_t)ck_pr_load_ptr(&cursor->value);
#endif

			/*
			 * It is probably worth it to encapsulate probe state
			 * in order to prevent a complete reprobe sequence in
			 * the case of intermittent writers.
			 */
			if (snapshot->key == CK_HT_KEY_TOMBSTONE)
				continue;

			if (snapshot->key == CK_HT_KEY_EMPTY)
				goto leave;

			if (snapshot->key == (uintptr_t)key)
				goto leave;

			if (map->mode & CK_HT_MODE_BYTESTRING) {
				void *pointer;

				/*
				 * Check memoized portion of hash value before
				 * expensive full-length comparison.
				 */
				k = ck_ht_entry_key_length(snapshot);
				if (k != key_length)
					continue;
#ifdef CK_HT_PP
				if ((snapshot->value >> CK_MD_VMA_BITS) != ((h.value >> 32) & CK_HT_KEY_MASK))
					continue;
#else
				if (snapshot->hash != h.value)
					continue;

				d_prime = CK_HT_TYPE_LOAD(&map->deletions);

				/*
				 * It is possible that the slot was
				 * replaced, initiate a re-probe.
				 */
				if (d != d_prime)
					goto retry;
#endif

				pointer = ck_ht_entry_key(snapshot);
				if (memcmp(pointer, key, key_length) == 0)
					goto leave;
			}
		}

		offset = ck_ht_map_probe_next(map, offset, h, probes);
	}

	return NULL;

leave:
	return cursor;
}

CK_HT_TYPE
ck_ht_count(struct ck_ht *table)
{
	struct ck_ht_map *map = ck_pr_load_ptr(&table->map);

	return CK_HT_TYPE_LOAD(&map->n_entries);
}

bool
ck_ht_next(struct ck_ht *table,
    struct ck_ht_iterator *i,
    struct ck_ht_entry **entry)
{
	struct ck_ht_map *map = table->map;
	uintptr_t key;

	if (i->offset >= map->capacity)
		return false;

	do {
		key = map->entries[i->offset].key;
		if (key != CK_HT_KEY_EMPTY && key != CK_HT_KEY_TOMBSTONE)
			break;
	} while (++i->offset < map->capacity);

	if (i->offset >= map->capacity)
		return false;

	*entry = map->entries + i->offset++;
	return true;
}

bool
ck_ht_reset_size_spmc(struct ck_ht *table, CK_HT_TYPE size)
{
	struct ck_ht_map *map, *update;

	map = table->map;
	update = ck_ht_map_create(table, size);
	if (update == NULL)
		return false;

	ck_pr_store_ptr_unsafe(&table->map, update);
	ck_ht_map_destroy(table->m, map, true);
	return true;
}

bool
ck_ht_reset_spmc(struct ck_ht *table)
{
	struct ck_ht_map *map = table->map;

	return ck_ht_reset_size_spmc(table, map->capacity);
}

bool
ck_ht_grow_spmc(struct ck_ht *table, CK_HT_TYPE capacity)
{
	struct ck_ht_map *map, *update;
	struct ck_ht_entry *bucket, *previous;
	struct ck_ht_hash h;
	size_t k, i, j, offset;
	CK_HT_TYPE probes;

restart:
	map = table->map;

	if (map->capacity >= capacity)
		return false;

	update = ck_ht_map_create(table, capacity);
	if (update == NULL)
		return false;

	for (k = 0; k < map->capacity; k++) {
		previous = &map->entries[k];

		if (previous->key == CK_HT_KEY_EMPTY || previous->key == CK_HT_KEY_TOMBSTONE)
			continue;

		if (table->mode & CK_HT_MODE_BYTESTRING) {
#ifdef CK_HT_PP
			void *key;
			uint16_t key_length;

			key = ck_ht_entry_key(previous);
			key_length = ck_ht_entry_key_length(previous);
#endif

#ifndef CK_HT_PP
			h.value = previous->hash;
#else
			table->h(&h, key, key_length, table->seed);
#endif
		} else {
#ifndef CK_HT_PP
			h.value = previous->hash;
#else
			table->h(&h, &previous->key, sizeof(previous->key), table->seed);
#endif
		}

		offset = h.value & update->mask;
		probes = 0;

		for (i = 0; i < update->probe_limit; i++) {
			bucket = (void *)((uintptr_t)(update->entries + offset) & ~(CK_MD_CACHELINE - 1));

			for (j = 0; j < CK_HT_BUCKET_LENGTH; j++) {
				struct ck_ht_entry *cursor = bucket + ((j + offset) & (CK_HT_BUCKET_LENGTH - 1));

				probes++;
				if (CK_CC_LIKELY(cursor->key == CK_HT_KEY_EMPTY)) {
					*cursor = *previous;
					update->n_entries++;
					ck_ht_map_bound_set(update, h, probes);
					break;
				}
			}

			if (j < CK_HT_BUCKET_LENGTH)
				break;

			offset = ck_ht_map_probe_next(update, offset, h, probes);
		}

		if (i == update->probe_limit) {
			/*
			 * We have hit the probe limit, the map needs to be even
			 * larger.
			 */
			ck_ht_map_destroy(table->m, update, false);
			capacity <<= 1;
			goto restart;
		}
	}

	ck_pr_fence_store();
	ck_pr_store_ptr_unsafe(&table->map, update);
	ck_ht_map_destroy(table->m, map, true);
	return true;
}

bool
ck_ht_remove_spmc(struct ck_ht *table,
    ck_ht_hash_t h,
    ck_ht_entry_t *entry)
{
	struct ck_ht_map *map;
	struct ck_ht_entry *candidate, snapshot;

	map = table->map;

	if (table->mode & CK_HT_MODE_BYTESTRING) {
		candidate = ck_ht_map_probe_rd(map, h, &snapshot,
		    ck_ht_entry_key(entry),
		    ck_ht_entry_key_length(entry));
	} else {
		candidate = ck_ht_map_probe_rd(map, h, &snapshot,
		    (void *)entry->key,
		    sizeof(entry->key));
	}

	/* No matching entry was found. */
	if (candidate == NULL || snapshot.key == CK_HT_KEY_EMPTY)
		return false;

	*entry = snapshot;

	ck_pr_store_ptr_unsafe(&candidate->key, (void *)CK_HT_KEY_TOMBSTONE);
	ck_pr_fence_store();
	CK_HT_TYPE_STORE(&map->n_entries, map->n_entries - 1);
	return true;
}

bool
ck_ht_get_spmc(struct ck_ht *table,
    ck_ht_hash_t h,
    ck_ht_entry_t *entry)
{
	struct ck_ht_entry *candidate, snapshot;
	struct ck_ht_map *map;
	CK_HT_TYPE d, d_prime;

restart:
	map = ck_pr_load_ptr(&table->map);

	/*
	 * Platforms that cannot read key and key_length atomically must reprobe
	 * on the scan of any single entry.
	 */
	d = CK_HT_TYPE_LOAD(&map->deletions);

	if (table->mode & CK_HT_MODE_BYTESTRING) {
		candidate = ck_ht_map_probe_rd(map, h, &snapshot,
		    ck_ht_entry_key(entry), ck_ht_entry_key_length(entry));
	} else {
		candidate = ck_ht_map_probe_rd(map, h, &snapshot,
		    (void *)entry->key, sizeof(entry->key));
	}

	d_prime = CK_HT_TYPE_LOAD(&map->deletions);
	if (d != d_prime) {
		/*
		 * It is possible we have read (K, V'). The only valid states
		 * are (K, V), (K', V') and (T, V). Restart the load operation
		 * in the face of concurrent deletions or replacements.
		 */
		goto restart;
	}

	if (candidate == NULL || snapshot.key == CK_HT_KEY_EMPTY)
		return false;

	*entry = snapshot;
	return true;
}

bool
ck_ht_set_spmc(struct ck_ht *table,
    ck_ht_hash_t h,
    ck_ht_entry_t *entry)
{
	struct ck_ht_entry snapshot, *candidate, *priority;
	struct ck_ht_map *map;
	CK_HT_TYPE probes, probes_wr;
	bool empty = false;

	for (;;) {
		map = table->map;

		if (table->mode & CK_HT_MODE_BYTESTRING) {
			candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    ck_ht_entry_key(entry),
			    ck_ht_entry_key_length(entry),
			    &probes, &probes_wr);
		} else {
			candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    (void *)entry->key,
			    sizeof(entry->key),
			    &probes, &probes_wr);
		}

		if (priority != NULL) {
			probes = probes_wr;
			break;
		}

		if (candidate != NULL)
			break;

		if (ck_ht_grow_spmc(table, map->capacity << 1) == false)
			return false;
	}

	if (candidate == NULL) {
		candidate = priority;
		empty = true;
	}

	if (candidate->key != CK_HT_KEY_EMPTY &&
	    priority != NULL && candidate != priority) {
		/*
		 * Entry is moved into another position in probe sequence.
		 * We avoid a state of (K, B) (where [K, B] -> [K', B]) by
		 * guaranteeing a forced reprobe before transitioning from K to
		 * T. (K, B) implies (K, B, D') so we will reprobe successfully
		 * from this transient state.
		 */
		probes = probes_wr;

#ifndef CK_HT_PP
		CK_HT_TYPE_STORE(&priority->key_length, entry->key_length);
		CK_HT_TYPE_STORE(&priority->hash, entry->hash);
#endif

		/*
		 * Readers must observe version counter change before they
		 * observe re-use. If they observe re-use, it is at most
		 * a tombstone.
		 */
		if (priority->value == CK_HT_KEY_TOMBSTONE) {
			CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
			ck_pr_fence_store();
		}

		ck_pr_store_ptr_unsafe(&priority->value, (void *)entry->value);
		ck_pr_fence_store();
		ck_pr_store_ptr_unsafe(&priority->key, (void *)entry->key);
		ck_pr_fence_store();

		/*
		 * Make sure that readers who observe the tombstone would
		 * also observe counter change.
		 */
		CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
		ck_pr_fence_store();

		ck_pr_store_ptr_unsafe(&candidate->key, (void *)CK_HT_KEY_TOMBSTONE);
		ck_pr_fence_store();
	} else {
		/*
		 * In this case we are inserting a new entry or replacing
		 * an existing entry. Yes, this could be folded into the branch
		 * above, but it isn't because you are looking at dying code
		 * (ck_ht is effectively deprecated and is being replaced soon).
		 */
		bool replace = candidate->key != CK_HT_KEY_EMPTY &&
		    candidate->key != CK_HT_KEY_TOMBSTONE;

		if (priority != NULL) {
			if (priority->key == CK_HT_KEY_TOMBSTONE) {
				CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
				ck_pr_fence_store();
			}

			candidate = priority;
			probes = probes_wr;
		}

#ifdef CK_HT_PP
		ck_pr_store_ptr_unsafe(&candidate->value, (void *)entry->value);
		ck_pr_fence_store();
		ck_pr_store_ptr_unsafe(&candidate->key, (void *)entry->key);
#else
		CK_HT_TYPE_STORE(&candidate->key_length, entry->key_length);
		CK_HT_TYPE_STORE(&candidate->hash, entry->hash);
		ck_pr_store_ptr_unsafe(&candidate->value, (void *)entry->value);
		ck_pr_fence_store();
		ck_pr_store_ptr_unsafe(&candidate->key, (void *)entry->key);
#endif

		/*
		 * If we are inserting a new entry then increment the number
		 * of entries associated with the map.
		 */
		if (replace == false)
			CK_HT_TYPE_STORE(&map->n_entries, map->n_entries + 1);
	}

	ck_ht_map_bound_set(map, h, probes);

	/* Enforce a load factor of 0.5. */
	if (map->n_entries * 2 > map->capacity)
		ck_ht_grow_spmc(table, map->capacity << 1);

	if (empty == true) {
		entry->key = CK_HT_KEY_EMPTY;
	} else {
		*entry = snapshot;
	}

	return true;
}

bool
ck_ht_put_spmc(struct ck_ht *table,
    ck_ht_hash_t h,
    ck_ht_entry_t *entry)
{
	struct ck_ht_entry snapshot, *candidate, *priority;
	struct ck_ht_map *map;
	CK_HT_TYPE probes, probes_wr;

	for (;;) {
		map = table->map;

		if (table->mode & CK_HT_MODE_BYTESTRING) {
			candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    ck_ht_entry_key(entry),
			    ck_ht_entry_key_length(entry),
			    &probes, &probes_wr);
		} else {
			candidate = ck_ht_map_probe_wr(map, h, &snapshot, &priority,
			    (void *)entry->key,
			    sizeof(entry->key),
			    &probes, &probes_wr);
		}

		if (candidate != NULL || priority != NULL)
			break;

		if (ck_ht_grow_spmc(table, map->capacity << 1) == false)
			return false;
	}

	if (priority != NULL) {
		/* Version counter is updated before re-use. */
		CK_HT_TYPE_STORE(&map->deletions, map->deletions + 1);
		ck_pr_fence_store();

		/* Re-use tombstone if one was found. */
		candidate = priority;
		probes = probes_wr;
	} else if (candidate->key != CK_HT_KEY_EMPTY &&
	    candidate->key != CK_HT_KEY_TOMBSTONE) {
		/*
		 * If the snapshot key is non-empty and is not a tombstone
		 * then an identical key was found. As put does not implement
		 * replacement, we will fail.
		 */
		return false;
	}

	ck_ht_map_bound_set(map, h, probes);

#ifdef CK_HT_PP
	ck_pr_store_ptr_unsafe(&candidate->value, (void *)entry->value);
	ck_pr_fence_store();
	ck_pr_store_ptr_unsafe(&candidate->key, (void *)entry->key);
#else
	CK_HT_TYPE_STORE(&candidate->key_length, entry->key_length);
	CK_HT_TYPE_STORE(&candidate->hash, entry->hash);
	ck_pr_store_ptr_unsafe(&candidate->value, (void *)entry->value);
	ck_pr_fence_store();
	ck_pr_store_ptr_unsafe(&candidate->key, (void *)entry->key);
#endif

	CK_HT_TYPE_STORE(&map->n_entries, map->n_entries + 1);

	/* Enforce a load factor of 0.5. */
	if (map->n_entries * 2 > map->capacity)
		ck_ht_grow_spmc(table, map->capacity << 1);

	return true;
}
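
/*
 * Example write and read, as a sketch (ck_ht_entry_set,
 * ck_ht_entry_key_set and ck_ht_entry_value are provided by the
 * public ck_ht.h interface; the key and value below are
 * illustrative):
 *
 *	ck_ht_entry_t entry;
 *	ck_ht_hash_t h;
 *
 *	ck_ht_hash(&h, &ht, "key", 3);
 *	ck_ht_entry_set(&entry, h, "key", 3, "value");
 *	ck_ht_set_spmc(&ht, h, &entry);
 *
 *	ck_ht_entry_key_set(&entry, "key", 3);
 *	if (ck_ht_get_spmc(&ht, h, &entry) == true)
 *		(void)ck_ht_entry_value(&entry);
 */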

void
ck_ht_destroy(struct ck_ht *table)
{

	ck_ht_map_destroy(table->m, table->map, false);
	return;
}
