1/**
2 * \file
3 * \brief producer consumer library
4 *
5 * This file provides a producer consumer protocol
6 */
7
8/*
9 * Copyright (c) 2007-12 ETH Zurich.
10 * All rights reserved.
11 *
12 * This file is distributed under the terms in the attached LICENSE file.
13 * If you do not find this file, copies can be found by writing to:
14 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
15 */
16#include <stdio.h>
17#include <string.h>
18#include <barrelfish/barrelfish.h>
19#include <barrelfish/bulk_transfer.h>
20#include <procon/procon.h>
21
22
23
// Profiling hack: defining DISABLE_MFENCE compiles out all mfence() calls so
// their cost can be measured.  mfence should probably not be disabled in
// normal (non-profiling) builds.
26#define DISABLE_MFENCE    1
27
28#if 0
29static uint64_t sp_atomic_read_reg(union vreg *reg)
30{
31    return reg->value;
32#ifndef DISABLE_MFENCE
33    mfence();
34#endif
35} // end function: sp_atomic_read_reg
36#endif // 0
37
38static void sp_atomic_set_reg(union vreg *reg, uint64_t value)
39{
40    reg->value = value;
41#ifndef DISABLE_MFENCE
42    mfence();
43#endif
44/*
45#if !defined(__i386__)
46        cache_flush_range(reg, CACHESIZE);
47#endif // !defined(__i386__)
48*/
49}
50// 3 mfence
51void sp_reload_regs(struct shared_pool_private *spp)
52{
53    assert(spp != NULL);
54    struct shared_pool *sp = spp->sp;
55    assert(sp != NULL);
56    spp->c_read_id = spp->sp->read_reg.value;
57    spp->c_write_id = spp->sp->write_reg.value;
58    spp->c_size = spp->sp->size_reg.value;
59//    spp->c_read_id = sp_atomic_read_reg(&spp->sp->read_reg);
60//    spp->c_write_id = sp_atomic_read_reg(&spp->sp->write_reg);
61//    spp->c_size = sp_atomic_read_reg(&spp->sp->size_reg);
62}
63
64
65
66// **************************** generic queue based code
// A circular queue holds no elements exactly when both indices coincide.
bool sp_gen_queue_empty(uint64_t read, uint64_t write)
{
    if (read == write) {
        return true;
    }
    return false;
}
71
// Full when advancing the write index by one slot would collide with the
// read index (one slot is always sacrificed to distinguish full/empty).
bool sp_gen_queue_full(uint64_t read, uint64_t write, uint64_t size)
{
    uint64_t next_write = (write + 1) % size;
    return next_write == read;
}
76
// Number of slots in the half-open circular range [start, end) of a
// queue with the given size.
uint64_t sp_c_range_size(uint64_t start, uint64_t end, uint64_t size)
{
    if (start > end) {
        // range wraps past the end of the circular buffer
        return (size - start) + end;
    }
    // contiguous, non-wrapped range (zero when start == end)
    return end - start;
}
88
89
90// checks for (start <= value < end ) in circular queue of size "size"
// checks for (start <= value < end) in circular queue of size "size";
// when start == end (logically empty range) only value == start matches.
bool sp_c_between(uint64_t start, uint64_t value, uint64_t end, uint64_t size)
{
    // any index must lie inside the queue
    if (value >= size) {
        return false;
    }

    // degenerate range: only the single index 'start' is inside
    if (start == end) {
        return value == start;
    }

    // contiguous range
    if (start < end) {
        return (start <= value) && (value < end);
    }

    // wrapped range: inside if in either of the two contiguous pieces
    return (start <= value) || (value < end);
}
121
122// ******************* spp queue code for condition checking
123
124// 4 mfence
125uint64_t sp_get_read_index(struct shared_pool_private *spp)
126{
127    sp_reload_regs(spp);
128    return spp->c_read_id;
129}
130
131uint64_t sp_get_write_index(struct shared_pool_private *spp)
132{
133    sp_reload_regs(spp);
134    return spp->c_write_id;
135}
136
137uint64_t sp_get_queue_size(struct shared_pool_private *spp)
138{
139    sp_reload_regs(spp);
140    return spp->c_size;
141}
142
143
144// 0 mfence
145// Checks for queue empty condition
146bool sp_queue_empty(struct shared_pool_private *spp)
147{
148//    sp_reload_regs(spp);
149    return sp_gen_queue_empty(spp->c_read_id, spp->c_write_id);
150}
151
152
153// Check for queue full condition
154bool sp_queue_full(struct shared_pool_private *spp)
155{
156    return sp_gen_queue_full(spp->c_read_id, spp->c_write_id,
157            spp->c_size);
158}
159
160
161// Checks if given index is peekable or not
162bool sp_read_peekable_index(struct shared_pool_private *spp, uint64_t idx)
163{
164    sp_reload_regs(spp);
165    return sp_c_between(spp->c_read_id, idx, spp->c_write_id, spp->c_size);
166} // end function: sp_read_peekable_index
167
168
169// Checks if given index is settable for not for read_reg
170bool sp_validate_read_index(struct shared_pool_private *spp, uint64_t idx)
171{
172    sp_reload_regs(spp);
173    // Since sp_c_between only checks for value < end and we want <= end, we
174    // check this case manually here
175    if (idx == spp->c_write_id) {
176        return true;
177    }
178    return sp_c_between(spp->c_read_id, idx, spp->c_write_id, spp->c_size);
179}
180
181
182// Returns no. of elements available for consumption
183uint64_t sp_queue_elements_count(struct shared_pool_private *spp)
184{
185//    sp_reload_regs(spp);
186    return sp_c_range_size(spp->c_read_id, spp->c_write_id, spp->c_size);
187} // end function: sp_queue_elements_count
188
189// Checks if given index is write peekable or not
190bool sp_write_peekable_index(struct shared_pool_private *spp, uint64_t idx)
191{
192    sp_reload_regs(spp);
193
194    // Trivial case: index bigger than queue size
195    if (idx >= spp->c_size){
196        return false;
197    }
198
199    // Trivial case: queue empty
200    if (sp_queue_empty(spp)) {
201        return true;
202    }
203
204    return sp_c_between(spp->c_write_id, idx, spp->c_read_id, spp->c_size);
205} // end function: sp_write_peekable_index
206
207
208// Checks if given index is valid for write or not
// A write index is valid exactly under the same conditions that make the
// slot write-peekable.
bool sp_validate_write_index(struct shared_pool_private *spp, uint64_t idx)
{
    return sp_write_peekable_index(spp, idx);
} // end function: sp_validate_write_index
213
214// 4 mfence
215// Returns no. of free slots available for production
216uint64_t sp_queue_free_slots_count(struct shared_pool_private *spp)
217{
218    sp_reload_regs(spp);
219    if (sp_queue_empty(spp)) {
220        return spp->c_size;
221    }
222    return sp_c_range_size(spp->c_write_id, spp->c_read_id, spp->c_size);
223} // end function: sp_queue_free_slots_count
224
225
226// ************* Initialization functions ***********************
227
228static size_t calculate_shared_pool_size(uint64_t slot_no)
229{
230    return (sizeof(struct shared_pool) +
231                ((sizeof(union slot)) * (slot_no - TMP_SLOTS)));
232}
233
234// 4 mfence
235static void sp_reset_pool(struct shared_pool_private *spp, uint64_t slot_count)
236{
237    assert(spp != NULL);
238    struct shared_pool *sp = spp->sp;
239    assert(sp != NULL);
240    assert(slot_count > TMP_SLOTS);
241
242    int i = 0;
243
244    // Esure that slot_count is <= alloted_slots
245    assert(slot_count <= spp->alloted_slots);
246
247    sp_atomic_set_reg(&sp->read_reg, 0);
248    sp_atomic_set_reg(&sp->write_reg, 0);
249    sp_atomic_set_reg(&sp->size_reg, slot_count);
250    for(i = 0; i < slot_count; ++i)  {
251       memset(&sp->slot_list[i], 0, sizeof(union slot));
252    } // end for:
253
254    sp_reload_regs(spp);
255    spp->notify_other_side = 0;
256    spp->ghost_read_id = spp->c_read_id;
257    spp->ghost_write_id = spp->c_write_id;
258    spp->pre_write_id = spp->c_read_id;
259    spp->produce_counter = 0;
260    spp->consume_counter = 0;
261    spp->clear_counter = 0;
262#ifndef DISABLE_MFENCE
263    mfence();
264#endif
265} // sp_reset_pool
266
267
268// Creates a new shared_pool area and initializes it as creator
269struct shared_pool_private *sp_create_shared_pool(uint64_t slot_no,
270        uint8_t role)
271{
272
273    struct shared_pool_private *spp = (struct shared_pool_private *)
274                malloc(sizeof(struct shared_pool_private));
275    assert(spp != NULL);
276
277    errval_t err;
278    assert(slot_no > 2);
279
280    // adding 1 more slot for safety
281    size_t mem_size = calculate_shared_pool_size((slot_no));
282
283    // NOTE: using bulk create here because bulk_create code has
284    // been modified to suit the shared buffer allocation
285    // FIXME: code repetation with mem_barrelfish_alloc_and_register
286    struct bulk_transfer bt_sp;
287    err = bulk_create(mem_size, sizeof(union slot), &(spp->cap), &bt_sp);
288    if (err_is_fail(err)) {
289        DEBUG_ERR(err, "bulk_create failed.");
290        return NULL;
291    }
292    spp->va = bt_sp.mem;
293    spp->sp = (struct shared_pool *)spp->va;
294
295    struct frame_identity f;
296
297    err = frame_identify(spp->cap, &f);
298    if (err_is_fail(err)) {
299        DEBUG_ERR(err, "frame_identify failed");
300        return NULL;
301    }
302    spp->pa = f.base;
303    spp->mem_size = f.bytes;
304    spp->alloted_slots = slot_no;
305    spp->is_creator = true;
306    spp->role = role;
307
308    sp_reset_pool(spp, slot_no);
309    printf("Created shared_pool of size(Req %"PRIu64", Actual %"PRIu64") "
310            "with role [%"PRIu8"] and slots [%"PRIu64"]\n",
311            (uint64_t)mem_size, spp->mem_size, spp->role,
312            spp->alloted_slots);
313
314/*            printf("##### procon sizeof spp[%lu], sizeof sp[%lu]\n",
315                    sizeof(struct shared_pool_private),
316                    sizeof(struct shared_pool) );
317*/
318#ifndef DISABLE_MFENCE
319    mfence();
320#endif
321    return spp;
322} // end function: sp_create_shared_pool
323
324
325// Loads shared_pool area which is already created by some other creator
// Loads shared_pool area which is already created by some other creator.
// Maps the frame 'cap' read-write/uncached, checks it is large enough for
// 'slot_no' slots, and initializes the private (ghost) bookkeeping.
// Returns SYS_ERR_OK on success or the failing sub-call's error.
errval_t sp_map_shared_pool(struct shared_pool_private *spp, struct capref cap,
        uint64_t slot_no, uint8_t role)
{
    errval_t err = SYS_ERR_OK;
    assert(spp != NULL);
    assert(spp->sp == NULL);    // must not already be mapped
    assert(slot_no > 2);
    spp->cap = cap;
    spp->alloted_slots = slot_no;
    spp->role = role;
    spp->is_creator = 0;        // this side only maps, it did not create

    struct frame_identity f;

    err = frame_identify(cap, &f);
    if (err_is_fail(err)) {
        DEBUG_ERR(err, "frame_identify failed");
        return err;
    }
    spp->pa = f.base;
    spp->mem_size = f.bytes;
    size_t mem_size = calculate_shared_pool_size(slot_no);

    // the frame must be big enough for the requested slot count
    assert(spp->mem_size >= mem_size);

    // NOCACHE: the pool is shared with another dispatcher
    err = vspace_map_one_frame_attr(&spp->va, f.bytes, cap,
                  VREGION_FLAGS_READ_WRITE_NOCACHE, NULL, NULL);

    if (err_is_fail(err)) {
        DEBUG_ERR(err, "vspace_map_one_frame failed");
        return err;
    }

    spp->sp = (struct shared_pool *)spp->va;

    sp_reload_regs(spp);
    // creator must have configured the queue with the same slot count
    assert(spp->c_size == spp->alloted_slots);

    // ghost/private indices start out mirroring the shared registers
    spp->ghost_read_id = spp->c_read_id;
    spp->ghost_write_id = spp->c_write_id;
    spp->pre_write_id = spp->c_read_id;
    spp->notify_other_side = 0;
    spp->produce_counter = 0;
    spp->consume_counter = 0;
    spp->clear_counter = 0;

    printf("Mapped shared_pool of size(R %"PRIu64", A %"PRIu64") "
            "with role [%"PRIu8"], slots[%"PRIu64"] and pool len[%"PRIu64"]\n",
            (uint64_t)mem_size, spp->mem_size, spp->role, spp->alloted_slots,
            spp->c_size);

#ifndef DISABLE_MFENCE
    mfence();
#endif
    return SYS_ERR_OK;

} // end function: sp_map_shared_pool
383
384
385// *************************** State modifying functions *************
// A slot descriptor is acceptable only when it is non-NULL.
// FIXME: check if the buffer_id, pbuf_id, len and all are sensible!
static bool validate_slot(struct slot_data *d)
{
    return d != NULL;
} // end function: validate_slot
395
396void copy_data_into_slot(struct shared_pool_private *spp, uint64_t buf_id,
397        uint64_t id, uint64_t offset, uint64_t len, uint64_t no_pbufs,
398        uint64_t client_data, uint64_t ts)
399{
400    assert(id < spp->c_size);
401    spp->sp->slot_list[id].d.buffer_id = buf_id;
402    spp->sp->slot_list[id].d.no_pbufs = no_pbufs;
403    spp->sp->slot_list[id].d.pbuf_id = id;
404    spp->sp->slot_list[id].d.offset = offset;
405    spp->sp->slot_list[id].d.len = len;
406    spp->sp->slot_list[id].d.client_data = client_data;
407    spp->sp->slot_list[id].d.ts = ts;
408
409#ifndef DISABLE_MFENCE
410    mfence();
411#endif
412    // copy the s into shared_pool
413#if 0
414#if !defined(__i386__)
415    cache_flush_range(&spp->sp->slot_list[id], SLOT_SIZE);
416#endif // !defined(__i386__)
417#endif // 0
418}
419
420void sp_copy_slot_data(struct slot_data *d, struct slot_data *s)
421{
422    assert(d != NULL);
423    assert(s != NULL);
424    *d = *s;
425    /*
426    d->buffer_id = s->buffer_id;
427    d->pbuf_id = s->pbuf_id;
428    d->offset = s->offset;
429    d->len = s->len;
430    d->no_pbufs = s->no_pbufs;
431    d->client_data = s->client_data;
432    d->ts = s->ts;
433#ifndef DISABLE_MFENCE
434    mfence();
435#endif
436*/
437}
438
439void sp_copy_slot_data_from_index(struct shared_pool_private *spp,
440        uint64_t idx, struct slot_data *d)
441{
442    sp_copy_slot_data(d, &spp->sp->slot_list[idx].d);
443} // end function: sp_copy_slot_data_index
444
445
// Set the value of read index (consumer side).
// To be used with sp_read_peek_slot.
// Returns false when idx is not in the currently-readable range.
// The full/empty state is sampled before and after the register update
// so that both "space freed" and "queue drained" transitions bump
// notify_other_side for the producer.
bool sp_set_read_index(struct shared_pool_private *spp, uint64_t idx)
{

    sp_reload_regs(spp);
    // Trivial case: the read register already holds the requested value
    if (spp->c_read_id == idx) {
        return true;
    }

    if (!sp_validate_read_index(spp, idx)) {
        // The value in index is invalid!
        return false;
    }

    if (sp_queue_full(spp)) {
        // Producer is assuming that there is no free space in this pool.
        // As we have created some free space by reading, we should inform
        // the producer to produce more!
        // Typically means, I am slow!
        ++spp->notify_other_side;
    }

    sp_atomic_set_reg(&spp->sp->read_reg, idx);
    sp_reload_regs(spp);

//    spp->ghost_read_id = spp->c_read_id;
//    printf("changing read_index!\n");
    if (sp_queue_empty(spp)) {
        // There is nothing more to consume,
        // We should inform producer to produce quickly
        // Typically means, Producer is slow!
        ++spp->notify_other_side;
    }

    ++spp->consume_counter;
    return true;
} // end function: sp_set_read_index
485
486
// 9 mfence
// Set the value of write index (producer side).
// To be used with sp_ghost_produce_slot.
// Returns false when idx is not a valid write position.
// Empty/full state is sampled before and after the register update so
// that "data now available" and "queue filled" transitions both bump
// notify_other_side for the consumer.
bool sp_set_write_index(struct shared_pool_private *spp, uint64_t idx)
{
    sp_reload_regs(spp);

    // Trivial case: the write register already holds the requested value
    if (spp->c_write_id  == idx) {
        return true;
    }

    if (!sp_validate_write_index(spp, idx)) {
        // The value in index is invalid!
        return false;
    }

    if (sp_queue_empty(spp)) {
        // Consumer is assuming that there is no data in the pool
        // As we have created new data, we should inform
        // the consumer to consume more!
        // Typically means, I am slow!
        ++spp->notify_other_side;
    }

    sp_atomic_set_reg(&spp->sp->write_reg, idx);
    sp_reload_regs(spp);
//    spp->ghost_write_id = spp->c_write_id;
    // nearly-drained queue: prod the consumer as well
    if (sp_queue_elements_count(spp) <= 1) {
        ++spp->notify_other_side;
    }

    if (sp_queue_full(spp)) {
        // There no free space left to create new items.
        // We should inform the consumer that it is slow!
        // Typically means, consumer is slow!
        ++spp->notify_other_side;
    }

    ++spp->produce_counter;
    return true;
} // end function: sp_set_write_index
529
// Advances the shared write index by exactly one slot (mod queue size).
// Unlike sp_set_write_index, the new index is not validated and the
// registers are not reloaded after the store (only the cached write id
// is updated directly).  Always returns true.
bool sp_increment_write_index(struct shared_pool_private *spp)
{
    sp_reload_regs(spp);
    uint64_t idx = ((spp->c_write_id + 1) % spp->c_size);

    if (sp_queue_empty(spp)) {
        // Consumer is assuming that there is no data in the pool
        // As we have created new data, we should inform
        // the consumer to consume more!
        // Typically means, I am slow!
        ++spp->notify_other_side;
    }


    sp_atomic_set_reg(&spp->sp->write_reg, idx);
    // update the cached copy directly instead of re-reading all registers
    spp->c_write_id = idx;

     if (sp_queue_full(spp)) {
        // There no free space left to create new items.
        // We should inform the consumer that it is slow!
        // Typically means, consumer is slow!
        ++spp->notify_other_side;
    }

    ++spp->produce_counter;
    return true;
} // end function: sp_increment_write_index
557
558
559uint64_t sp_is_slot_clear(struct shared_pool_private *spp, uint64_t id)
560{
561    sp_reload_regs(spp);
562    if (!sp_queue_empty(spp)) {
563        if (!sp_c_between(spp->c_write_id, id, spp->c_read_id, spp->c_size)) {
564            sp_print_metadata(spp);
565            printf("failed for id %"PRIu64"\n", id);
566/*
567            printf("callstack: %p %p %p %p\n",
568	         __builtin_return_address(0),
569	         __builtin_return_address(1),
570	         __builtin_return_address(2),
571	         __builtin_return_address(3));
572*/
573        }
574        if (!sp_c_between(spp->c_write_id, id, spp->c_read_id, spp->c_size)) {
575            printf("sp_is_slot_clear failed: "
576                    " (%"PRIu64", %"PRIu64", %"PRIu64") S %"PRIu64"\n",
577                    spp->c_write_id, id, spp->c_read_id, spp->c_size);
578//            abort();
579        }
580
581    }
582    /*
583    else {
584        // queue empty!
585        if (id == spp->c_write_id) {
586            sp_print_metadata(spp);
587            printf("failed for id %"PRIu64"\n", id);
588            printf("callstack: %p %p %p %p\n",
589	         __builtin_return_address(0),
590	         __builtin_return_address(1),
591	         __builtin_return_address(2),
592	         __builtin_return_address(3));
593        }
594        assert(id != spp->c_write_id);
595    }
596    */
597    return spp->sp->slot_list[id].d.client_data;
598}
599
// Snapshots slot 'id' into *d and clears its client_data marker.
// Only allowed when the queue is empty or 'id' lies in the free
// (already-consumed) region [write, read).  Returns true on success,
// false when the queue is full or the slot is not clearable.
bool sp_clear_slot(struct shared_pool_private *spp, struct slot_data *d,
        uint64_t id)
{
    sp_reload_regs(spp);

    if (sp_queue_full(spp)) {
        // a full queue has no free region to clear from
        return false;
    }

    if (sp_queue_empty(spp) ||
          sp_c_between(spp->c_write_id, id, spp->c_read_id, spp->c_size)) {

        // hand the old contents back to the caller before clearing
        sp_copy_slot_data(d, &spp->sp->slot_list[id].d);
        spp->pre_write_id = id;

        // mark the slot as cleared
        spp->sp->slot_list[id].d.client_data = 0;
        ++spp->clear_counter;
        return true;
    }

    return false;
} // end function: sp_clear_slot
626
// Marks slot 'produced_slot_id' as produced by advancing the write index
// one position.  The caller must pass the current write index (asserted).
// Returns false when the queue is full; aborts if publishing the new
// write index unexpectedly fails.
bool validate_and_empty_produce_slot(struct shared_pool_private *spp,
        uint64_t produced_slot_id)
{
    sp_reload_regs(spp);

    if (sp_queue_full(spp)) {
        return false;
    }

    uint64_t wi = spp->c_write_id;
    // producing is only legal at the current write position
    assert(spp->c_write_id == produced_slot_id);
    // If needed, mark the slot as produced
    if(!sp_set_write_index(spp, ((wi + 1) % spp->c_size))) {
        printf("ERROR: validate_and_empty_produce_slot: sp_set_write_index "
                "failed\n");
        abort();
    }
    return true;
} // end function: validate_and_empty_produce_slot
646
647
648// Adds the data from parameter d into appropriate slot of shared pool queue
649bool sp_produce_slot(struct shared_pool_private *spp, struct slot_data *d)
650{
651
652    sp_reload_regs(spp);
653
654    if (sp_queue_full(spp)) {
655        return false;
656    }
657
658    uint64_t wi = spp->c_write_id;
659    sp_copy_slot_data(&spp->sp->slot_list[wi].d, d);
660
661#if 0
662#if !defined(__i386__)
663        cache_flush_range(&spp->sp->slot_list[wi], SLOT_SIZE);
664#endif // !defined(__i386__)
665#endif // 0
666
667    // Incrementing write pointer
668    if(!sp_set_write_index(spp, ((wi + 1) % spp->c_size))) {
669        printf("ERROR: sp_produce_slot: sp_set_write_index failed\n");
670        abort();
671    }
672    return true;
673} // end function: sp_produce_slot
674
675
// 9 mfence
// Ghost-add data into shared_pool:
// Add data into free slots, but don't increment write index.
// This allows adding multiple slots and then atomically increment write
// index (via sp_set_write_index) to publish them all at once.
bool sp_ghost_produce_slot(struct shared_pool_private *spp,
        struct slot_data *d, uint64_t idx)
{
    sp_reload_regs(spp);

    // Make sure that slot provided is proper
    assert(d != NULL);

    if (sp_queue_full(spp)) {
//        printf("sp_ghost_produce_slot: queue full\n");
        return false;
    }

    // Check if the requested peek index is valid or not
    if (!sp_write_peekable_index(spp, idx))
    {
        return false;
    }

    sp_copy_slot_data(&spp->sp->slot_list[idx].d, d);
#if 0
#if !defined(__i386__)
        cache_flush_range(&spp->sp->slot_list[idx], SLOT_SIZE);
#endif // !defined(__i386__)
#endif // 0
    // Advance only the private ghost write pointer; the shared write
    // register is untouched until the caller publishes it.
    spp->ghost_write_id = (idx + 1) % spp->c_size;
    return true;
} // end function: sp_ghost_produce_slot
714
// Reads the slot without changing the read pointer, instead changes the
// local ghost_read_id to know how much is read.
// To be used by driver when it adds the packet in hardware queue for
// sending but the packet is not yet sent.
// When the packet is actually done, the read pointer can be changed
// (see sp_ghost_read_confirm / sp_set_read_index).
// Returns false when the queue is empty or the ghost index is not
// currently readable.
bool sp_ghost_read_slot(struct shared_pool_private *spp, struct slot_data *dst)
{
    sp_reload_regs(spp);

    // Make sure that slot provided is proper
    assert(dst != NULL);

    // Make sure that there is slot available for consumption
    if (sp_queue_empty(spp)) {
        return false;
    }

    // Check if the requested peek index is valid or not
    if (!sp_read_peekable_index(spp, spp->ghost_read_id))
    {
        return false;
    }

    // Copy the slot data contents into the provided slot
    sp_copy_slot_data(dst, &spp->sp->slot_list[spp->ghost_read_id].d);
    // advance only the private ghost read pointer
    spp->ghost_read_id = (spp->ghost_read_id + 1) % spp->c_size;
    return true;
} // end function: sp_ghost_read_slot
751
752
753
754// FIXME: not used, may be it should be removed
755bool sp_ghost_read_confirm(struct shared_pool_private *spp)
756{
757    return (sp_set_read_index(spp, spp->ghost_read_id));
758}
759
// Swaps the slot provided in parameter new_slot with the next available
// slot for consumption, then advances the read index.
// To be used by application to receive packet and register new pbuf
// at same time.
// Returns false when new_slot is invalid or the queue is empty; aborts
// if advancing the read index unexpectedly fails.
bool sp_replace_slot(struct shared_pool_private *spp, struct slot_data *new_slot)
{
    sp_reload_regs(spp);

    // Make sure that slot provided is proper
    if (!validate_slot(new_slot)) {
        return false;
    }

    // Make sure that there is slot available for consumption
    if (sp_queue_empty(spp)) {
        return false;
    }

    uint64_t ri = spp->c_read_id;
    // swapping the slot_data contents between ri and new_slot
    struct slot_data tmp;
#if 0
#if !defined(__i386__)
        cache_flush_range(&spp->sp->slot_list[ri], SLOT_SIZE);
#endif // !defined(__i386__)
#endif // 0
    sp_copy_slot_data(&tmp, &spp->sp->slot_list[ri].d);
    sp_copy_slot_data(&spp->sp->slot_list[ri].d, new_slot);
    sp_copy_slot_data(new_slot, &tmp);
#if 0
#if !defined(__i386__)
        cache_flush_range(&spp->sp->slot_list[ri], SLOT_SIZE);
#endif // !defined(__i386__)
#endif // 0
    // Incrementing read index
    if(!sp_set_read_index(spp, ((ri + 1) % spp->c_size))) {
        printf("sp_set_read_index failed\n");
        sp_print_metadata(spp);
        abort();
    }
    return true;
} // end function: sp_replace_slot
802
803
804// ****************** For debugging purposes **************
// Prints the private counters and cached indices of the pool to stdout
// (debugging aid).  Intentionally does not reload registers, so it is
// safe to call from error paths without disturbing cached state.
void sp_print_metadata(struct shared_pool_private *spp)
{
    assert(spp != NULL);
//    sp_reload_regs(spp);
/*    printf("SPP Q C[%"PRIu8"], R[%"PRIu8"], GRI[%"PRIu64"], GWI[%"PRIu64"] "
            "pre_write_id[%"PRIu64"]\n",
            spp->is_creator?1:0, spp->role,
            spp->ghost_read_id, spp->ghost_write_id, spp->pre_write_id);
*/
    printf("SPP S PRO[%"PRIu64"],  CON[%"PRIu64"], CLEAR[%"PRIu64"]\n",
            spp->produce_counter, spp->consume_counter, spp->clear_counter);
    printf("SPP S C C-R[%"PRIu64"],  C-W[%"PRIu64"] C-S[%"PRIu64"]\n",
            spp->c_read_id, spp->c_write_id, spp->c_size);

    struct shared_pool *sp = spp->sp;
    assert(sp != NULL);
/*
    printf("SP Q len[%"PRIu64"], RI[%"PRIu64"], WI[%"PRIu64"], elem[%"PRIu64"]"
            " free[%"PRIu64"]\n",
            sp->size_reg.value, sp->read_reg.value, sp->write_reg.value,
            sp_queue_elements_count(spp),
            sp_queue_free_slots_count(spp));
*/
}
829
830
831void sp_print_slot(struct slot_data *d)
832{
833    printf("@%p, buf[%"PRIu64"], pbuf_id[%"PRIu64"], offset[%"PRIu64"], "
834            "len[%"PRIu64"], n_p[%"PRIu64"], CL[%"PRIu64"], ts[%"PRIu64"]\n",
835            d, d->buffer_id, d->pbuf_id, d->offset, d->len,
836            d->no_pbufs, d->client_data, d->ts);
837}
838
839// Code for testing and debugging the library
840void sp_print_pool(struct shared_pool_private *spp)
841{
842    sp_reload_regs(spp);
843    assert(spp != NULL);
844    struct shared_pool *sp = spp->sp;
845    assert(sp != NULL);
846
847    uint64_t queue_size = sp->size_reg.value;
848    sp_print_metadata(spp);
849    int i = 0;
850    for(i = 0; i < queue_size; ++i)  {
851        sp_print_slot(&sp->slot_list[i].d);
852    }
853}
854
855