1/** 2 * \file 3 * \brief Definitions for external C predicates used in Prolog code of 4 * the octopus server implementation. 5 */ 6 7/* 8 * Copyright (c) 2011, ETH Zurich. 9 * All rights reserved. 10 * 11 * This file is distributed under the terms in the attached LICENSE file. 12 * If you do not find this file, copies can be found by writing to: 13 * ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group. 14 */ 15#define _USE_XOPEN /* for strdup() */ 16#include <stdio.h> 17#include <string.h> 18 19#include <eclipse.h> 20#include <barrelfish/barrelfish.h> 21#include <include/skb_server.h> 22#include <collections/hash_table.h> 23 24#include <if/octopus_defs.h> 25#include <octopus_server/debug.h> 26#include <octopus_server/service.h> 27#include <octopus/trigger.h> // for trigger modes 28 29#include "predicates.h" 30#include "skiplist.h" 31#include "bitfield.h" 32#include "fnv.h" 33 34#define HASH_INDEX_BUCKETS 6151 35static collections_hash_table* record_index = NULL; 36 37static collections_hash_table* trigger_index = NULL; 38static struct bitfield* no_attr_triggers = NULL; 39 40static collections_hash_table* subscriber_index = NULL; 41static struct bitfield* no_attr_subscriptions = NULL; 42 43static inline void init_index(void) { 44 if(record_index == NULL) { 45 collections_hash_create_with_buckets(&record_index, HASH_INDEX_BUCKETS, NULL); 46 } 47 48 if(subscriber_index == NULL) { 49 collections_hash_create_with_buckets(&subscriber_index, HASH_INDEX_BUCKETS, NULL); 50 bitfield_create(&no_attr_subscriptions); 51 } 52 53 if(trigger_index == NULL) { 54 collections_hash_create_with_buckets(&trigger_index, HASH_INDEX_BUCKETS, NULL); 55 bitfield_create(&no_attr_triggers); 56 } 57} 58 59 60static int skip_index_insert(collections_hash_table* ht, uint64_t key, char* value) 61{ 62 assert(ht != NULL); 63 assert(value != NULL); 64 65 struct skip_list* sl = (struct skip_list*) collections_hash_find(ht, key); 66 if (sl == NULL) { 67 errval_t err = skip_create_list(&sl); 68 if (err_is_fail(err)) { 69 return PFAIL; 70 } 71 collections_hash_insert(ht, key, sl); 72 } 73 74 skip_insert(sl, value); 75 //skip_print_list(sl); 76 77 return PSUCCEED; 78} 79 80static char* skip_index_remove(collections_hash_table* ht, uint64_t key, char* value) 81{ 82 assert(ht != NULL); 83 assert(value != NULL); 84 85 struct skip_list* sl = (struct skip_list*) collections_hash_find(ht, key); 86 if (sl == NULL) { 87 return NULL; 88 } 89 90 char* record_name = skip_delete(sl, value); 91 92 //skip_print_list(sl); 93 return record_name; 94} 95 96int p_save_index(void) 97{ 98 OCT_DEBUG("p_save_index\n"); 99 init_index(); 100 101 char* value = NULL; 102 int res = ec_get_string(ec_arg(3), &value); 103 assert(res == PSUCCEED); 104 105 char* record_name = strdup(value); 106 bool inserted = false; 107 108 pword list, cur, rest; 109 pword attribute_term; 110 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) { 111 ec_get_arg(1, cur, &attribute_term); 112 113 char* attribute; 114 ec_get_string(attribute_term, &attribute); 115 116 OCT_DEBUG("insert %s(%p) into index[%s]=", record_name, record_name, attribute); 117 uint64_t key = fnv_64a_str(attribute, FNV1A_64_INIT); 118 int res = skip_index_insert(record_index, key, record_name); 119 assert(res == PSUCCEED); 120 inserted = true; 121 } 122 123 if (!inserted) { 124 free(record_name); 125 } 126 127 return PSUCCEED; 128} 129 130int p_remove_index(void) 131{ 132 int res; 133 char* to_free = NULL; 134 init_index(); 135 136 char* name = NULL; 137 res = ec_get_string(ec_arg(3), &name); 138 assert(res == PSUCCEED); 139 140 pword list, cur, rest; 141 pword attribute_term; 142 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) { 143 ec_get_arg(1, cur, &attribute_term); 144 145 char* attribute; 146 res = ec_get_string(attribute_term, &attribute); 147 assert(res == PSUCCEED); 148 149 uint64_t key = fnv_64a_str(attribute, FNV1A_64_INIT); 150 to_free = skip_index_remove(record_index, key, name); 151 OCT_DEBUG("removed %s(%p) from index[%s]=", name, to_free, attribute); 152 //assert(to_free != NULL); 153 } 154 155 free(to_free); 156 return PSUCCEED; 157} 158 159int p_index_intersect(void) /* p_index_intersect(type, -[Attributes], -Current, +Next) */ 160{ 161 OCT_DEBUG("p_index_intersect\n"); 162 static struct skip_list** sets = NULL; 163 static char* next = NULL; 164 static size_t elems = 0; 165 166 int res; 167 char* key; 168 169 init_index(); 170 171 char* index_type = NULL; 172 res = ec_get_string(ec_arg(1), &index_type); 173 if (res != PSUCCEED) { 174 return res; 175 } 176 collections_hash_table* ht = record_index; 177 178 res = ec_get_string(ec_arg(3), &next); 179 if (res != PSUCCEED) { 180 OCT_DEBUG("state is not a string, find skip lists\n"); 181 free(sets); 182 pword list, cur, rest; 183 184 elems = 0; 185 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) { 186 elems++; 187 } 188 sets = malloc(sizeof(struct skip_list*) * elems); 189 190 size_t i = 0; 191 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) { 192 res = ec_get_string(cur, &key); 193 if (res != PSUCCEED) { 194 return res; 195 } 196 197 uint64_t hash_key = fnv_64a_str(key, FNV1A_64_INIT); 198 struct skip_list* sl = collections_hash_find(ht, hash_key); 199 if (sl == NULL) { 200 return PFAIL; 201 } 202 OCT_DEBUG("skip_intersect found skip list for key: %s\n", key); 203 //skip_print_list(sl); 204 205 sets[i] = sl; 206 i++; 207 } 208 next = NULL; 209 } 210 211 next = skip_intersect(sets, elems, next); 212 OCT_DEBUG("skip_intersect found next: %s\n", next); 213 if(next != NULL) { 214 dident item = ec_did(next, 0); 215 return ec_unify_arg(4, ec_atom(item)); 216 } 217 218 return PFAIL; 219} 220 221int p_index_union(void) /* p_index_union(type, -[Attributes], -Current, +Next) */ 222{ 223 OCT_DEBUG("p_index_union\n"); 224 static collections_hash_table* union_ht = NULL; 225 static char* next = NULL; 226 227 int res; 228 char* key; 229 230 init_index(); 231 232 char* index_type = NULL; 233 res = ec_get_string(ec_arg(1), &index_type); 234 if (res != PSUCCEED) { 235 return res; 236 } 237 collections_hash_table* ht = record_index; // TODO broken 238 239 res = ec_get_string(ec_arg(3), &next); 240 if (res != PSUCCEED) { 241 OCT_DEBUG("state is not a string, find skip lists\n"); 242 if (union_ht != NULL) { 243 collections_hash_release(union_ht); 244 union_ht = NULL; 245 } 246 collections_hash_create_with_buckets(&union_ht, HASH_INDEX_BUCKETS, NULL); 247 248 pword list, cur, rest; 249 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) { 250 res = ec_get_string(cur, &key); 251 if (res != PSUCCEED) { 252 return res; 253 } 254 255 uint64_t hash_key = fnv_64a_str(key, FNV1A_64_INIT); 256 struct skip_list* sl = collections_hash_find(ht, hash_key); 257 258 // Insert all entries in union hash table 259 if (sl != NULL) { 260 OCT_DEBUG("p_index_union found skip list for key: %s\n", key); 261 //skip_print_list(sl); 262 263 struct skip_node* sentry = sl->header->forward[0]; 264 while(sentry != NULL) { 265 uint64_t hash_key = fnv_64a_str(sentry->element, FNV1A_64_INIT); 266 if(collections_hash_find(union_ht, hash_key) == NULL) { 267 OCT_DEBUG("p_index_union insert: %s\n", sentry->element); 268 collections_hash_insert(union_ht, hash_key, sentry->element); 269 } 270 sentry = sentry->forward[0]; 271 } 272 } 273 274 } 275 next = NULL; 276 collections_hash_traverse_start(union_ht); 277 } 278 279 uint64_t hash_key; 280 next = collections_hash_traverse_next(union_ht, &hash_key); 281 OCT_DEBUG("skip_union found next: %s\n", next); 282 if(next != NULL) { 283 dident item = ec_did(next, 0); 284 return ec_unify_arg(4, ec_atom(item)); 285 } 286 else { 287 collections_hash_traverse_end(union_ht); 288 return PFAIL; 289 } 290} 291 292 293 294static int bitfield_index_insert(collections_hash_table* ht, uint64_t key, long int id) 295{ 296 assert(ht != NULL); 297 298 struct bitfield* bf = (struct bitfield*) collections_hash_find(ht, key); 299 if (bf == NULL) { 300 errval_t err = bitfield_create(&bf); 301 if (err_is_fail(err)) { 302 return PFAIL; 303 } 304 collections_hash_insert(ht, key, bf); 305 } 306 307 bitfield_on(bf, id); 308 return PSUCCEED; 309} 310 311static int bitfield_index_remove(collections_hash_table* ht, uint64_t key, long int id) 312{ 313 assert(ht != NULL); 314 315 struct bitfield* bf = (struct bitfield*) collections_hash_find(ht, key); 316 if (bf != NULL) { 317 bitfield_off(bf, id); 318 } 319 320 return PSUCCEED; 321} 322 323int p_bitfield_add(void) /* p_bitfield_add(Storage, +Name, +[AttributeList], +Id) */ 324{ 325 init_index(); 326 int res = 0; 327 long int id; 328 bool inserted = false; 329 330 collections_hash_table* ht = NULL; 331 struct bitfield* no_attr_bf = NULL; 332 333 char* storage; 334 res = ec_get_string(ec_arg(1), &storage); 335 if (strcmp(storage, "trigger") == 0) { 336 ht = trigger_index; 337 no_attr_bf = no_attr_triggers; 338 } 339 else { 340 ht = subscriber_index; 341 no_attr_bf = no_attr_subscriptions; 342 } 343 344 res = ec_get_long(ec_arg(3), &id); 345 if (res != PSUCCEED) { 346 return PFAIL; 347 } 348 349 pword list, cur, rest; 350 pword attribute_term; 351 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) { 352 ec_get_arg(1, cur, &attribute_term); 353 354 char* attribute; 355 ec_get_string(attribute_term, &attribute); 356 uint64_t key = fnv_64a_str(attribute, FNV1A_64_INIT); 357 358 int res = bitfield_index_insert(ht, key, id); 359 assert(res == PSUCCEED); 360 inserted = true; 361 } 362 363 if (!inserted) { 364 bitfield_on(no_attr_bf, id); 365 } 366 367 return PSUCCEED; 368} 369 370int p_bitfield_remove(void) /* p_bitfield_remove(Storage, +Name, +[AttributeList], +Id) */ 371{ 372 init_index(); 373 374 int res = 0; 375 long int id; 376 377 collections_hash_table* ht = NULL; 378 struct bitfield* no_attr_bf = NULL; 379 380 char* storage; 381 res = ec_get_string(ec_arg(1), &storage); 382 if (strcmp(storage, "trigger") == 0) { 383 ht = trigger_index; 384 no_attr_bf = no_attr_triggers; 385 } 386 else { 387 ht = subscriber_index; 388 no_attr_bf = no_attr_subscriptions; 389 } 390 391 res = ec_get_long(ec_arg(3), &id); 392 if (res != PSUCCEED) { 393 return PFAIL; 394 } 395 396 pword list, cur, rest; 397 pword attribute_term; 398 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) { 399 ec_get_arg(1, cur, &attribute_term); 400 401 char* attribute; 402 res = ec_get_string(attribute_term, &attribute); 403 assert(res == PSUCCEED); 404 405 uint64_t key = fnv_64a_str(attribute, FNV1A_64_INIT); 406 bitfield_index_remove(ht, key, id); 407 } 408 409 bitfield_off(no_attr_bf, id); 410 return PSUCCEED; 411} 412 413int p_bitfield_union(void) /* p_index_union(Storage, -[Attributes], -Current, +Next) */ 414{ 415 OCT_DEBUG("p_bitfield_union\n"); 416 static struct bitfield** sets = NULL; 417 static long int next = -1; 418 static size_t elems = 0; 419 420 int res; 421 char* key; 422 423 init_index(); 424 collections_hash_table* ht = NULL; 425 struct bitfield* no_attr_bf = NULL; 426 427 char* storage = NULL; 428 res = ec_get_string(ec_arg(1), &storage); 429 if (strcmp(storage, "trigger") == 0) { 430 ht = trigger_index; 431 no_attr_bf = no_attr_triggers; 432 } 433 else { 434 ht = subscriber_index; 435 no_attr_bf = no_attr_subscriptions; 436 } 437 438 res = ec_get_long(ec_arg(3), &next); 439 if (res != PSUCCEED) { 440 OCT_DEBUG("state is not a id, find bitmaps\n"); 441 free(sets); 442 pword list, cur, rest; 443 444 elems = 0; 445 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) { 446 elems++; 447 } 448 sets = calloc(elems+1, sizeof(struct bitfield*)); 449 sets[0] = no_attr_bf; 450 451 elems = 1; 452 for (list = ec_arg(2); ec_get_list(list, &cur, &rest) == PSUCCEED; list = rest) { 453 res = ec_get_string(cur, &key); 454 if (res != PSUCCEED) { 455 return res; 456 } 457 458 uint64_t hash_key = fnv_64a_str(key, FNV1A_64_INIT); 459 struct bitfield* sl = collections_hash_find(ht, hash_key); 460 if (sl != NULL) { 461 OCT_DEBUG("bitfield_union found bitfield for key: %s\n", key); 462 sets[elems++] = sl; 463 } 464 // else: no record with this attribute, just ignore 465 466 } 467 next = -1; 468 } 469 470 next = bitfield_union(sets, elems, next); 471 OCT_DEBUG("bitfield_union found next: %ld\n", next); 472 if(next != -1) { 473 pword item = ec_long(next); 474 return ec_unify_arg(4, item); 475 } 476 477 return PFAIL; 478} 479 480void oct_rpc_enqueue_reply(struct octopus_binding *b, struct oct_reply_state* st); 481extern struct bitfield* trigger_ids; 482 483int p_trigger_watch(void) /* p_trigger_watch(+String, +Mode, +Recipient, +WatchId, -Retract) */ 484{ 485 int res; 486 OCT_DEBUG("\n*** p_trigger_watch: start\n"); 487 488 // Get arguments 489 char* record = NULL; 490 res = ec_get_string(ec_arg(1), &record); 491 if (res != PSUCCEED) { 492 assert(ec_is_var(ec_arg(1)) == PSUCCEED); 493 // record will be null 494 // can happen in case we send OCT_REMOVED 495 } 496 497 // Action that triggered the event 498 long int action = 0; 499 res = ec_get_long(ec_arg(2), &action); 500 if (res != PSUCCEED) { 501 return res; 502 } 503 504 // Mode of watch 505 long int watch_mode = 0; 506 res = ec_get_long(ec_arg(3), &watch_mode); 507 if (res != PSUCCEED) { 508 return res; 509 } 510 511 struct oct_reply_state* drs = NULL; 512 res = ec_get_long(ec_arg(4), (long int*) &drs); 513 if (res != PSUCCEED) { 514 return res; 515 } 516 assert(drs != NULL); 517 OCT_DEBUG("drs is: %p\n", drs); 518 519 long int watch_id = 0; 520 res = ec_get_long(ec_arg(5), &watch_id); 521 if (res != PSUCCEED) { 522 return res; 523 } 524 525 OCT_DEBUG("p_trigger_watch: %s\n", record); 526 OCT_DEBUG("drs->binding: %p\n", drs->binding); 527 OCT_DEBUG("drs->reply: %p\n", drs->reply); 528 529 530 drs->error = SYS_ERR_OK; 531 bool retract = !(watch_mode & OCT_PERSIST); 532 if (record != NULL) { 533 assert(strlen(record)+1 < MAX_QUERY_LENGTH); 534 strcpy(drs->query_state.std_out.buffer, record); 535 } 536 else { 537 drs->query_state.std_out.buffer[0] = '\0'; 538 drs->query_state.std_out.length = 0; 539 } 540 541 if (drs->binding != NULL && drs->reply != NULL) { 542 543 if (!retract) { 544 // Copy reply state because the trigger will stay intact 545 struct oct_reply_state* drs_copy = NULL; 546 errval_t err = new_oct_reply_state(&drs_copy, NULL); 547 assert(err_is_ok(err)); 548 memcpy(drs_copy, drs, sizeof(struct oct_reply_state)); 549 drs = drs_copy; // overwrite drs 550 } 551 else { 552 assert(trigger_ids != NULL); 553 OCT_DEBUG("turn off trigger id: %lu\n", watch_id); 554 bitfield_off(trigger_ids, watch_id); 555 } 556 557 drs->mode = (retract) ? (action | OCT_REMOVED) : action; 558 559 if (drs->binding->st != NULL) { 560 oct_rpc_enqueue_reply(drs->binding, drs); 561 } 562 else { 563 drs->reply(drs->binding, drs); 564 } 565 } 566 else { 567 USER_PANIC("No binding set for watch_id: %lu", watch_id); 568 } 569 570 OCT_DEBUG("p_trigger_watch: done"); 571 return ec_unify_arg(6, ec_long(retract)); 572} 573