1/**
2 * \file
3 * \brief Definitions for external C predicates used in Prolog code of
4 * the octopus server implementation.
5 */
6
7/*
8 * Copyright (c) 2011, ETH Zurich.
9 * All rights reserved.
10 *
11 * This file is distributed under the terms in the attached LICENSE file.
12 * If you do not find this file, copies can be found by writing to:
13 * ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group.
14 */
15#define _USE_XOPEN /* for strdup() */
16#include <stdio.h>
17#include <string.h>
18
19#include <eclipse.h>
20#include <barrelfish/barrelfish.h>
21#include <include/skb_server.h>
22#include <collections/hash_table.h>
23
24#include <if/octopus_defs.h>
25#include <octopus_server/debug.h>
26#include <octopus_server/service.h>
27#include <octopus/trigger.h> // for trigger modes
28
29#include "predicates.h"
30#include "skiplist.h"
31#include "bitfield.h"
32#include "fnv.h"
33
34#define HASH_INDEX_BUCKETS 6151
35static collections_hash_table* record_index = NULL;
36
37static collections_hash_table* trigger_index = NULL;
38static struct bitfield* no_attr_triggers = NULL;
39
40static collections_hash_table* subscriber_index = NULL;
41static struct bitfield* no_attr_subscriptions = NULL;
42
43static inline void init_index(void) {
44    if(record_index == NULL) {
45        collections_hash_create_with_buckets(&record_index, HASH_INDEX_BUCKETS, NULL);
46    }
47
48    if(subscriber_index == NULL) {
49        collections_hash_create_with_buckets(&subscriber_index, HASH_INDEX_BUCKETS, NULL);
50        bitfield_create(&no_attr_subscriptions);
51    }
52
53    if(trigger_index == NULL) {
54        collections_hash_create_with_buckets(&trigger_index, HASH_INDEX_BUCKETS, NULL);
55        bitfield_create(&no_attr_triggers);
56    }
57}
58
59
60static int skip_index_insert(collections_hash_table* ht, uint64_t key, char* value)
61{
62    assert(ht != NULL);
63    assert(value != NULL);
64
65    struct skip_list* sl = (struct skip_list*) collections_hash_find(ht, key);
66    if (sl == NULL) {
67        errval_t err = skip_create_list(&sl);
68        if (err_is_fail(err)) {
69            return PFAIL;
70        }
71        collections_hash_insert(ht, key, sl);
72    }
73
74    skip_insert(sl, value);
75    //skip_print_list(sl);
76
77    return PSUCCEED;
78}
79
80static char* skip_index_remove(collections_hash_table* ht, uint64_t key, char* value)
81{
82    assert(ht != NULL);
83    assert(value != NULL);
84
85    struct skip_list* sl = (struct skip_list*) collections_hash_find(ht, key);
86    if (sl == NULL) {
87        return NULL;
88    }
89
90    char* record_name = skip_delete(sl, value);
91
92    //skip_print_list(sl);
93    return record_name;
94}
95
96int p_save_index(void)
97{
98    OCT_DEBUG("p_save_index\n");
99    init_index();
100
101    char* value = NULL;
102    int res = ec_get_string(ec_arg(3), &value);
103    assert(res == PSUCCEED);
104
105    char* record_name = strdup(value);
106    bool inserted = false;
107
108    pword list, cur, rest;
109    pword attribute_term;
110    for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
111        ec_get_arg(1, cur, &attribute_term);
112
113        char* attribute;
114        ec_get_string(attribute_term, &attribute);
115
116        OCT_DEBUG("insert %s(%p) into index[%s]=", record_name, record_name, attribute);
117        uint64_t key = fnv_64a_str(attribute, FNV1A_64_INIT);
118        int res = skip_index_insert(record_index, key, record_name);
119        assert(res == PSUCCEED);
120        inserted = true;
121    }
122
123    if (!inserted) {
124        free(record_name);
125    }
126
127    return PSUCCEED;
128}
129
130int p_remove_index(void)
131{
132    int res;
133    char* to_free = NULL;
134    init_index();
135
136    char* name = NULL;
137    res = ec_get_string(ec_arg(3), &name);
138    assert(res == PSUCCEED);
139
140    pword list, cur, rest;
141    pword attribute_term;
142    for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
143        ec_get_arg(1, cur, &attribute_term);
144
145        char* attribute;
146        res = ec_get_string(attribute_term, &attribute);
147        assert(res == PSUCCEED);
148
149        uint64_t key = fnv_64a_str(attribute, FNV1A_64_INIT);
150        to_free = skip_index_remove(record_index, key, name);
151        OCT_DEBUG("removed %s(%p) from index[%s]=", name, to_free, attribute);
152        //assert(to_free != NULL);
153    }
154
155    free(to_free);
156    return PSUCCEED;
157}
158
159int p_index_intersect(void) /* p_index_intersect(type, -[Attributes], -Current, +Next) */
160{
161    OCT_DEBUG("p_index_intersect\n");
162    static struct skip_list** sets = NULL;
163    static char* next = NULL;
164    static size_t elems = 0;
165
166    int res;
167    char* key;
168
169    init_index();
170
171    char* index_type = NULL;
172    res = ec_get_string(ec_arg(1), &index_type);
173    if (res != PSUCCEED) {
174        return res;
175    }
176    collections_hash_table* ht = record_index;
177
178    res = ec_get_string(ec_arg(3), &next);
179    if (res != PSUCCEED) {
180        OCT_DEBUG("state is not a string, find skip lists\n");
181        free(sets);
182        pword list, cur, rest;
183
184        elems = 0;
185        for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
186            elems++;
187        }
188        sets = malloc(sizeof(struct skip_list*) * elems);
189
190        size_t i = 0;
191        for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
192            res = ec_get_string(cur, &key);
193            if (res != PSUCCEED) {
194                return res;
195            }
196
197            uint64_t hash_key = fnv_64a_str(key, FNV1A_64_INIT);
198            struct skip_list* sl = collections_hash_find(ht, hash_key);
199            if (sl == NULL) {
200                return PFAIL;
201            }
202            OCT_DEBUG("skip_intersect found skip list for key: %s\n", key);
203            //skip_print_list(sl);
204
205            sets[i] = sl;
206            i++;
207        }
208        next = NULL;
209    }
210
211    next = skip_intersect(sets, elems, next);
212    OCT_DEBUG("skip_intersect found next: %s\n", next);
213    if(next != NULL) {
214        dident item = ec_did(next, 0);
215        return ec_unify_arg(4, ec_atom(item));
216    }
217
218    return PFAIL;
219}
220
221int p_index_union(void) /* p_index_union(type, -[Attributes], -Current, +Next) */
222{
223    OCT_DEBUG("p_index_union\n");
224    static collections_hash_table* union_ht = NULL;
225    static char* next = NULL;
226
227    int res;
228    char* key;
229
230    init_index();
231
232    char* index_type = NULL;
233    res = ec_get_string(ec_arg(1), &index_type);
234    if (res != PSUCCEED) {
235        return res;
236    }
237    collections_hash_table* ht = record_index; // TODO broken
238
239    res = ec_get_string(ec_arg(3), &next);
240    if (res != PSUCCEED) {
241        OCT_DEBUG("state is not a string, find skip lists\n");
242        if (union_ht != NULL) {
243            collections_hash_release(union_ht);
244            union_ht = NULL;
245        }
246        collections_hash_create_with_buckets(&union_ht, HASH_INDEX_BUCKETS, NULL);
247
248        pword list, cur, rest;
249        for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
250            res = ec_get_string(cur, &key);
251            if (res != PSUCCEED) {
252                return res;
253            }
254
255            uint64_t hash_key = fnv_64a_str(key, FNV1A_64_INIT);
256            struct skip_list* sl = collections_hash_find(ht, hash_key);
257
258            // Insert all entries in union hash table
259            if (sl != NULL) {
260                OCT_DEBUG("p_index_union found skip list for key: %s\n", key);
261                //skip_print_list(sl);
262
263                struct skip_node* sentry = sl->header->forward[0];
264                while(sentry != NULL) {
265                    uint64_t hash_key = fnv_64a_str(sentry->element, FNV1A_64_INIT);
266                    if(collections_hash_find(union_ht, hash_key) == NULL) {
267                        OCT_DEBUG("p_index_union insert: %s\n", sentry->element);
268                        collections_hash_insert(union_ht, hash_key, sentry->element);
269                    }
270                    sentry = sentry->forward[0];
271                }
272            }
273
274        }
275        next = NULL;
276        collections_hash_traverse_start(union_ht);
277    }
278
279    uint64_t hash_key;
280    next = collections_hash_traverse_next(union_ht, &hash_key);
281    OCT_DEBUG("skip_union found next: %s\n", next);
282    if(next != NULL) {
283        dident item = ec_did(next, 0);
284        return ec_unify_arg(4, ec_atom(item));
285    }
286    else {
287        collections_hash_traverse_end(union_ht);
288        return PFAIL;
289    }
290}
291
292
293
294static int bitfield_index_insert(collections_hash_table* ht, uint64_t key, long int id)
295{
296    assert(ht != NULL);
297
298    struct bitfield* bf = (struct bitfield*) collections_hash_find(ht, key);
299    if (bf == NULL) {
300        errval_t err = bitfield_create(&bf);
301        if (err_is_fail(err)) {
302            return PFAIL;
303        }
304        collections_hash_insert(ht, key, bf);
305    }
306
307    bitfield_on(bf, id);
308    return PSUCCEED;
309}
310
311static int bitfield_index_remove(collections_hash_table* ht, uint64_t key, long int id)
312{
313    assert(ht != NULL);
314
315    struct bitfield* bf = (struct bitfield*) collections_hash_find(ht, key);
316    if (bf != NULL) {
317        bitfield_off(bf, id);
318    }
319
320    return PSUCCEED;
321}
322
323int p_bitfield_add(void) /* p_bitfield_add(Storage, +Name, +[AttributeList], +Id) */
324{
325    init_index();
326    int res = 0;
327    long int id;
328    bool inserted = false;
329
330    collections_hash_table* ht = NULL;
331    struct bitfield* no_attr_bf = NULL;
332
333    char* storage;
334    res = ec_get_string(ec_arg(1), &storage);
335    if (strcmp(storage, "trigger") == 0) {
336        ht = trigger_index;
337        no_attr_bf = no_attr_triggers;
338    }
339    else {
340        ht = subscriber_index;
341        no_attr_bf = no_attr_subscriptions;
342    }
343
344    res = ec_get_long(ec_arg(3), &id);
345    if (res != PSUCCEED) {
346        return PFAIL;
347    }
348
349    pword list, cur, rest;
350    pword attribute_term;
351    for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
352        ec_get_arg(1, cur, &attribute_term);
353
354        char* attribute;
355        ec_get_string(attribute_term, &attribute);
356        uint64_t key = fnv_64a_str(attribute, FNV1A_64_INIT);
357
358        int res = bitfield_index_insert(ht, key, id);
359        assert(res == PSUCCEED);
360        inserted = true;
361    }
362
363    if (!inserted) {
364        bitfield_on(no_attr_bf, id);
365    }
366
367    return PSUCCEED;
368}
369
370int p_bitfield_remove(void) /* p_bitfield_remove(Storage, +Name, +[AttributeList], +Id) */
371{
372    init_index();
373
374    int res = 0;
375    long int id;
376
377    collections_hash_table* ht = NULL;
378    struct bitfield* no_attr_bf = NULL;
379
380    char* storage;
381    res = ec_get_string(ec_arg(1), &storage);
382    if (strcmp(storage, "trigger") == 0) {
383        ht = trigger_index;
384        no_attr_bf = no_attr_triggers;
385    }
386    else {
387        ht = subscriber_index;
388        no_attr_bf = no_attr_subscriptions;
389    }
390
391    res = ec_get_long(ec_arg(3), &id);
392    if (res != PSUCCEED) {
393        return PFAIL;
394    }
395
396    pword list, cur, rest;
397    pword attribute_term;
398    for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
399        ec_get_arg(1, cur, &attribute_term);
400
401        char* attribute;
402        res = ec_get_string(attribute_term, &attribute);
403        assert(res == PSUCCEED);
404
405        uint64_t key = fnv_64a_str(attribute, FNV1A_64_INIT);
406        bitfield_index_remove(ht, key, id);
407    }
408
409    bitfield_off(no_attr_bf, id);
410    return PSUCCEED;
411}
412
413int p_bitfield_union(void) /* p_index_union(Storage, -[Attributes], -Current, +Next) */
414{
415    OCT_DEBUG("p_bitfield_union\n");
416    static struct bitfield** sets = NULL;
417    static long int next = -1;
418    static size_t elems = 0;
419
420    int res;
421    char* key;
422
423    init_index();
424    collections_hash_table* ht = NULL;
425    struct bitfield* no_attr_bf = NULL;
426
427    char* storage = NULL;
428    res = ec_get_string(ec_arg(1), &storage);
429    if (strcmp(storage, "trigger") == 0) {
430        ht = trigger_index;
431        no_attr_bf = no_attr_triggers;
432    }
433    else {
434        ht = subscriber_index;
435        no_attr_bf = no_attr_subscriptions;
436    }
437
438    res = ec_get_long(ec_arg(3), &next);
439    if (res != PSUCCEED) {
440        OCT_DEBUG("state is not a id, find bitmaps\n");
441        free(sets);
442        pword list, cur, rest;
443
444        elems = 0;
445        for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
446            elems++;
447        }
448        sets = calloc(elems+1, sizeof(struct bitfield*));
449        sets[0] = no_attr_bf;
450
451        elems = 1;
452        for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) {
453            res = ec_get_string(cur, &key);
454            if (res != PSUCCEED) {
455                return res;
456            }
457
458            uint64_t hash_key = fnv_64a_str(key, FNV1A_64_INIT);
459            struct bitfield* sl = collections_hash_find(ht, hash_key);
460            if (sl != NULL) {
461                OCT_DEBUG("bitfield_union found bitfield for key: %s\n", key);
462                sets[elems++] = sl;
463            }
464            // else: no record with this attribute, just ignore
465
466        }
467        next = -1;
468    }
469
470    next = bitfield_union(sets, elems, next);
471    OCT_DEBUG("bitfield_union found next: %ld\n", next);
472    if(next != -1) {
473        pword item = ec_long(next);
474        return ec_unify_arg(4, item);
475    }
476
477    return PFAIL;
478}
479
480void oct_rpc_enqueue_reply(struct octopus_binding *b, struct oct_reply_state* st);
481extern struct bitfield* trigger_ids;
482
483int p_trigger_watch(void) /* p_trigger_watch(+String, +Mode, +Recipient, +WatchId, -Retract) */
484{
485    int res;
486    OCT_DEBUG("\n*** p_trigger_watch: start\n");
487
488    // Get arguments
489    char* record = NULL;
490    res = ec_get_string(ec_arg(1), &record);
491    if (res != PSUCCEED) {
492        assert(ec_is_var(ec_arg(1)) == PSUCCEED);
493        // record will be null
494        // can happen in case we send OCT_REMOVED
495    }
496
497    // Action that triggered the event
498    long int action = 0;
499    res = ec_get_long(ec_arg(2), &action);
500    if (res != PSUCCEED) {
501        return res;
502    }
503
504    // Mode of watch
505    long int watch_mode = 0;
506    res = ec_get_long(ec_arg(3), &watch_mode);
507    if (res != PSUCCEED) {
508        return res;
509    }
510
511    struct oct_reply_state* drs = NULL;
512    res = ec_get_long(ec_arg(4), (long int*) &drs);
513    if (res != PSUCCEED) {
514        return res;
515    }
516    assert(drs != NULL);
517    OCT_DEBUG("drs is: %p\n", drs);
518
519    long int watch_id = 0;
520    res = ec_get_long(ec_arg(5), &watch_id);
521    if (res != PSUCCEED) {
522        return res;
523    }
524
525    OCT_DEBUG("p_trigger_watch: %s\n", record);
526    OCT_DEBUG("drs->binding: %p\n", drs->binding);
527    OCT_DEBUG("drs->reply: %p\n", drs->reply);
528
529
530    drs->error = SYS_ERR_OK;
531    bool retract = !(watch_mode & OCT_PERSIST);
532    if (record != NULL) {
533        assert(strlen(record)+1 < MAX_QUERY_LENGTH);
534        strcpy(drs->query_state.std_out.buffer, record);
535    }
536    else {
537        drs->query_state.std_out.buffer[0] = '\0';
538        drs->query_state.std_out.length = 0;
539    }
540
541    if (drs->binding != NULL && drs->reply != NULL) {
542
543        if (!retract) {
544            // Copy reply state because the trigger will stay intact
545            struct oct_reply_state* drs_copy = NULL;
546            errval_t err = new_oct_reply_state(&drs_copy, NULL);
547            assert(err_is_ok(err));
548            memcpy(drs_copy, drs, sizeof(struct oct_reply_state));
549            drs = drs_copy; // overwrite drs
550        }
551        else {
552            assert(trigger_ids != NULL);
553            OCT_DEBUG("turn off trigger id: %lu\n", watch_id);
554            bitfield_off(trigger_ids, watch_id);
555        }
556
557        drs->mode = (retract) ? (action | OCT_REMOVED) : action;
558
559        if (drs->binding->st != NULL) {
560            oct_rpc_enqueue_reply(drs->binding, drs);
561        }
562        else {
563            drs->reply(drs->binding, drs);
564        }
565    }
566    else {
567        USER_PANIC("No binding set for watch_id: %lu", watch_id);
568    }
569
570    OCT_DEBUG("p_trigger_watch: done");
571    return ec_unify_arg(6, ec_long(retract));
572}
573