1/**
2 * \file
3 * \brief Implementation of a synchronous locking API using the octopus Interface.
4 */
5
6/*
7 * Copyright (c) 2011, ETH Zurich.
8 * All rights reserved.
9 *
10 * This file is distributed under the terms in the attached LICENSE file.
11 * If you do not find this file, copies can be found by writing to:
12 * ETH Zurich D-INFK, Universitaetstr. 6, CH-8092 Zurich. Attn: Systems Group.
13 */
14
15#include <barrelfish/barrelfish.h>
16#include <barrelfish/threads.h>
17
18#include <if/octopus_defs.h>
19#include <if/octopus_thc.h>
20
21#include <octopus/init.h>
22#include <octopus/lock.h>
23#include <octopus/getset.h>
24#include <octopus/trigger.h>
25
26#include "common.h"
27
28/**
29 * \brief Synchronous locking function.
30 *
31 * The lock function will create a new record based on the lock name
32 * using sequential set mode. This means we create a queue of records
33 * with all parties that want to acquire the lock. The lock owner
34 * is the one with the lowest sequence appended to its name.
35 * Once the lock owner deletes its lock_record the next client in
36 * the queue is notified through triggers.
37 *
38 * \note Once a client holds the lock it can be released using oct_unlock.
39 *
40 * \param[in] lock_name Name to identify the lock.
41 * \param[out] lock_record Your current lock record in the queue.
42 * Client needs to free this.
43 */
44errval_t oct_lock(const char* lock_name, char** lock_record)
45{
46    assert(lock_name != NULL);
47
48    errval_t err = SYS_ERR_OK;
49    errval_t exist_err;
50    char** names = NULL;
51    char* name = NULL;
52    size_t len = 0;
53    size_t i = 0;
54    bool found = false;
55    uint64_t mode = 0;
56    octopus_trigger_id_t tid;
57    octopus_trigger_t t = oct_mktrigger(SYS_ERR_OK, octopus_BINDING_RPC,
58            OCT_ON_DEL, NULL, NULL);
59
60    err = oct_set_get(SET_SEQUENTIAL, lock_record, "%s_ { lock: '%s' }",
61            lock_name, lock_name);
62    if (err_is_fail(err)) {
63        goto out;
64    }
65    /// XXX why is there a strdup ?
66    *lock_record = strdup(*lock_record);
67    err = oct_read(*lock_record, "%s", &name);
68    if (err_is_fail(err)) {
69        goto out;
70    }
71
72    while (true) {
73        err = oct_get_names(&names, &len, "_ { lock: '%s' }", lock_name);
74        if (err_is_fail(err)) {
75            goto out;
76        }
77
78        //debug_printf("lock queue:\n");
79        found = false;
80        for (i=0; i < len; i++) {
81            //debug_printf("%s\n", names[i]);
82            if (strcmp(names[i], name) == 0) {
83                found = true;
84                break;
85            }
86        }
87        assert(found);
88
89        if (i == 0) {
90            // We are the lock owner
91            goto out;
92        }
93        else {
94            // Someone else holds the lock
95            struct octopus_thc_client_binding_t* cl = oct_get_thc_client();
96            //printf("%s:%s:%d: does names[i-1] = %s exists\n",
97            //       __FILE__, __FUNCTION__, __LINE__, names[i-1]);
98            err = cl->call_seq.exists(cl, names[i-1], t, &tid, &exist_err);
99            if (err_is_fail(err)) {
100                goto out;
101            }
102
103            if (err_is_ok(exist_err)) {
104                err = cl->recv.trigger(cl, &tid, NULL, &mode, NULL, NULL);
105                assert(err_is_ok(err));
106                assert(mode & OCT_REMOVED);
107            }
108            else if (err_no(exist_err) != OCT_ERR_NO_RECORD) {
109                err = exist_err;
110                goto out;
111            }
112        }
113
114        // If we've come here our predecessor deleted his record;
115        // need to re-check that we are really the lock owner now
116
117        oct_free_names(names, len);
118        names = NULL;
119        len = 0;
120    }
121
122
123out:
124    oct_free_names(names, len);
125    free(name);
126    return err;
127}
128
129/**
130 * \brief Synchronous unlock function.
131 *
132 * Deletes the given lock_record in on the server.
133 *
134 * \param[in] lock_record Record provided by oct_lock.
135 */
136errval_t oct_unlock(const char* lock_record)
137{
138    assert(lock_record != NULL);
139    errval_t err = SYS_ERR_OK;
140    char* name = NULL;
141
142    err = oct_read(lock_record, "%s", &name);
143    if (err_is_ok(err)) {
144        err = oct_del(name);
145    }
146    //debug_printf("unlocking: %s\n", name);
147
148    free(name);
149    return err;
150}
151