1/**
2 * \file
3 * \brief RPC implementation
4 */
5
6/*
7 * Copyright (c) 2008, ETH Zurich.
8 * All rights reserved.
9 *
10 * This file is distributed under the terms in the attached LICENSE file.
11 * If you do not find this file, copies can be found by writing to:
12 * ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group.
13 */
14
15#include <nfs/xdr.h>
16#include <assert.h>
17#include <net_sockets/net_sockets.h>
18
19#include <barrelfish/barrelfish.h>
20#include <bench/bench.h>
21#include <netbench/netbench.h>
22
/*
 * XXX: kludge -- defining this flag locally makes it possible to use
 * bench_tsc() without linking against -lbench.
 */
#if defined(__x86_64__) || defined(__i386__)
bool rdtscp_flag;
#endif

/* Upper-case aliases for the C boolean constants. */
#define TRUE    true
#define FALSE   false
30
31#include "rpc.h"
32#include "rpc_debug.h"
33#include "xdr_pbuf.h"
34
/**
 * RPC authentication flavour, i.e. the value written into the flavour word
 * of a credential or verifier.
 */
enum rpc_auth_flavor {
    RPC_AUTH_NULL  = 0, ///< Flavour this file puts in the verifier of calls
    RPC_AUTH_UNIX  = 1, ///< Flavour this file puts in the credential of calls
    RPC_AUTH_SHORT = 2, ///< Not used in this file
    RPC_AUTH_DES   = 3, ///< Not used in this file
};
42
/**
 * RPC message type, carried in the second word of every message.
 */
enum rpc_msg_type {
    RPC_CALL  = 0,  ///< Written into every outgoing call header
    RPC_REPLY = 1,  ///< The only type the receive handler accepts
};
48
/* ---- Retransmission timing ---- */

/// Time between RPC timer firings (us).
/// An alternative setting of (5000 * 1000) was left disabled by the author.
#define RPC_TIMER_PERIOD        (200 * 1000)
/// Number of timer firings before a retransmit
#define RPC_RETRANSMIT_AFTER    3
/// Max number of retransmissions before giving up
#define RPC_MAX_RETRANSMIT      60

/* ---- XXX: hardcoded data for authentication info ---- */

/// Machine name placed in the credential of every call
#define AUTH_MACHINE_NAME       "barrelfish"
/// Length of AUTH_MACHINE_NAME. XXX: must match the string above
#define AUTH_MACHINE_NAME_LEN   10
/// User ID placed in the credential
#define AUTH_UID    0
/// Group ID placed in the credential
#define AUTH_GID    0
/// Credential body size announced in the call header: the padded machine
/// name plus five further XDR units.
#define AUTH_SIZE   (RNDUP(AUTH_MACHINE_NAME_LEN) + BYTES_PER_XDR_UNIT * 5)

/// bytes needed for full RPC call header (ten fixed XDR units plus the
/// credential body)
#define RPC_CALL_HEADER_LEN (10 * BYTES_PER_XDR_UNIT + AUTH_SIZE)
63
/// Network-driver debug flag. Starts out as zero (static storage) and is
/// only ever reset to zero by rpc_init(); no live code in this file reads it.
static uint8_t net_debug_state;
65
66static int hash_function(uint32_t xid)
67{
68    return (xid % RPC_HTABLE_SIZE);
69}
70
71/// Data for an outstanding (unreplied) RPC call
72struct rpc_call {
73    uint32_t xid;               ///< Transaction ID (XID)
74    uint16_t timers, retries;   ///< Number of timer expiries and retries
75    uint8_t *data;              ///< Pointer for packet data
76    size_t size;
77    rpc_callback_t callback;    ///< Callback function pointer
78    void *cbarg1, *cbarg2;      ///< Callback function opaque arguments
79    struct rpc_call *next;      ///< Next call in queue
80};
81
82/// Utility function to prepare an outgoing packet buffer with the RPC call header
83static errval_t rpc_call_init(XDR *xdr, uint32_t xid, uint32_t prog, uint32_t vers,
84                           uint32_t proc)
85{
86    int32_t *buf;
87    bool rb;
88
89    /* reserve space for the first part of the header */
90    if ((buf = XDR_INLINE(xdr, 9 * BYTES_PER_XDR_UNIT)) == NULL) {
91        return LWIP_ERR_BUF;
92    }
93
94    // XID comes first
95    IXDR_PUT_UINT32(buf, xid);
96
97    // message type: call
98    IXDR_PUT_UINT32(buf, RPC_CALL);
99
100    // RPC version
101    IXDR_PUT_UINT32(buf, 2);
102
103    // program number, version number, procedure number
104    IXDR_PUT_UINT32(buf, prog);
105    IXDR_PUT_UINT32(buf, vers);
106    IXDR_PUT_UINT32(buf, proc);
107
108    // CRED: auth_unix (hardcoded)
109    IXDR_PUT_UINT32(buf, RPC_AUTH_UNIX);
110    IXDR_PUT_UINT32(buf, AUTH_SIZE);
111    IXDR_PUT_UINT32(buf, 0); // stamp
112
113    /* here endeth the reserved space. now the machine name string */
114    char *machname = AUTH_MACHINE_NAME;
115    rb = xdr_string(xdr, &machname, AUTH_MACHINE_NAME_LEN);
116    if (!rb) {
117        return LWIP_ERR_BUF;
118    }
119
120    /* reserve some more space for the rest, which is done inline again */
121    if ((buf = XDR_INLINE(xdr, 5 * BYTES_PER_XDR_UNIT)) == NULL) {
122        return LWIP_ERR_BUF;
123    }
124
125    // Rest of the CRED
126    IXDR_PUT_UINT32(buf, AUTH_UID);
127    IXDR_PUT_UINT32(buf, AUTH_GID);
128    IXDR_PUT_UINT32(buf, 0); // GIDs
129
130    // VERF: auth_null
131    IXDR_PUT_UINT32(buf, RPC_AUTH_NULL);
132    IXDR_PUT_UINT32(buf, 0);
133
134    return SYS_ERR_OK;
135}
136
137/// Utility function to skip over variable-sized authentication data in a reply
138static errval_t xdr_skip_auth(XDR *xdr)
139{
140    int32_t *buf;
141
142    if ((buf = XDR_INLINE(xdr, 2 * BYTES_PER_XDR_UNIT)) == NULL) {
143        return LWIP_ERR_BUF;
144    }
145
146    (void) IXDR_GET_UINT32(buf); // skip auth flavour
147
148    size_t auth_size = IXDR_GET_UINT32(buf);
149
150    /* skip over auth data bytes */
151    if (auth_size > 0) {
152        buf = XDR_INLINE(xdr, auth_size);
153        if (buf == NULL) {
154            return LWIP_ERR_BUF;
155        }
156    }
157
158    return SYS_ERR_OK;
159}
160
161/// Generic handler for all incoming RPC messages. Finds the appropriate call
162/// instance, checks arguments, and notifies the callback.
163static void rpc_recv_handler(void *user_state, struct net_socket *socket,
164    void *data, size_t size, struct in_addr ip_address, uint16_t port)
165{
166
167//    uint64_t ts = rdtsc();
168    uint32_t replystat, acceptstat;
169    XDR xdr;
170    errval_t r;
171    bool rb;
172    struct rpc_client *client = user_state;
173    struct rpc_call *call = NULL;
174
175    xdr_create_recv(&xdr, data, size);
176
177    int32_t *buf;
178    if ((buf = XDR_INLINE(&xdr, 3 * BYTES_PER_XDR_UNIT)) == NULL) {
179        fprintf(stderr, "RPC: packet too small, dropped\n");
180        goto out;
181    }
182
183    // XID comes first
184    uint32_t xid = IXDR_GET_UINT32(buf);
185    // message type
186    uint32_t msgtype = IXDR_GET_UINT32(buf);
187    if (msgtype != RPC_REPLY) {
188        fprintf(stderr, "RPC: Received non-reply message, dropped\n");
189        goto out;
190    }
191    RPC_DEBUGP("rpc_recv_call: RPC callback for xid %u x0%x\n", xid, xid);
192
193
194    int hid = hash_function(xid);
195    struct rpc_call *hash_list = client->call_hash[hid];
196    // find matching call and dequeue it
197    struct rpc_call *prev = NULL;
198    for (call = hash_list; call && call->xid != xid; call = call->next){
199        prev = call;
200    }
201    if (call == NULL) {
202        fprintf(stderr, "RPC: Unknown XID 0x%" PRIx32 " in reply, dropped\n", xid);
203/*        fprintf(stderr, "RPC:[%d:%s] Unknown XID 0x%x in reply, dropped\n",
204                disp_get_domain_id(), disp_name(), xid);
205*/
206        goto out;
207    } else if (prev == NULL) {
208    	client->call_hash[hid] = call->next;
209    } else {
210        prev->next = call->next;
211    }
212
213    replystat = IXDR_GET_UINT32(buf);
214    if (replystat == RPC_MSG_ACCEPTED) {
215        r = xdr_skip_auth(&xdr);
216        if (r != SYS_ERR_OK) {
217            fprintf(stderr, "RPC: Error in incoming auth data, dropped\n");
218            goto out;
219        }
220
221        rb = xdr_uint32_t(&xdr, &acceptstat);
222        if (!rb) {
223            fprintf(stderr, "RPC, Error decoding accept status, dropped\n");
224            goto out;
225        }
226    } else {
227        acceptstat = -1;
228    }
229
230    call->callback(client, call->cbarg1, call->cbarg2, replystat, acceptstat,
231                   &xdr);
232
233out:
234    if (call != NULL) {
235        // We got reply, so there is not need for keeping TX packet saved
236        // here for retransmission.  Lets free it up.
237        net_free(call->data);
238        free(call);
239    }
240}
241
242static void traverse_hash_bucket(int hid, struct rpc_client *client)
243{
244    struct rpc_call *call, *next, *prev = NULL;
245
246    for (call = client->call_hash[hid]; call != NULL; call = next) {
247        next = call->next;
248        bool freed_call = false;
249        if (++call->timers >= RPC_RETRANSMIT_AFTER) {
250            if (call->retries++ == RPC_MAX_RETRANSMIT) {
251                /* admit failure */
252                printf("##### [%d][%"PRIuDOMAINID"] "
253                       "RPC: timeout for XID 0x%"PRIu32"\n",
254                       disp_get_core_id(), disp_get_domain_id(), call->xid);
255                free(call->data);
256                if (prev == NULL) {
257                    client->call_hash[hid] = call->next;
258                } else {
259                    prev->next = call->next;
260                }
261                call->callback(client, call->cbarg1, call->cbarg2, -1, -1, NULL);
262                free(call);
263                freed_call = true;
264            } else {
265                /*
266                if(net_debug_state == 0) {
267                    net_debug_state = 1;
268                    printf("starting the debug in network driver\n");
269                    lwip_benchmark_control(0, BMS_START_REQUEST,
270                            0, rdtsc());
271                    lwip_benchmark_control(1, BMS_START_REQUEST,
272                            0, rdtsc());
273                } else {
274                    printf("already started the debug in network driver\n");
275                }
276                */
277
278                /* retransmit */
279                RPC_DEBUGP("###### [%d][%"PRIuDOMAINID"] "
280                       "RPC: retransmit XID 0x%"PRIu32"\n",
281                       disp_get_core_id(), disp_get_domain_id(), call->xid);
282
283                // throw away (hide) UDP/IP/ARP headers from previous transmission
284                // err_t e = pbuf_header(call->pbuf,
285                //                       -UDP_HLEN - IP_HLEN - PBUF_LINK_HLEN);
286                // assert(e == SYS_ERR_OK);
287
288                errval_t e = net_send_to(client->socket, call->data, call->size, client->connected_address, client->connected_port);
289                if (e != SYS_ERR_OK) {
290                    /* XXX: assume that this is a transient condition, retry */
291                    fprintf(stderr, "RPC: retransmit failed! will retry...\n");
292                    call->timers--;
293                } else {
294                    call->timers = 0;
295                }
296            }
297        }
298        if (!freed_call) {
299            prev = call;
300        }
301    } /* end for: */
302}
303
304/// Timer callback: walk the queue of pending calls, and retransmit/expire them
305static void rpc_timer(void *arg)
306{
307    struct rpc_client *client = arg;
308    RPC_DEBUGP("rpc_timer fired\n");
309    for (int i = 0; i < RPC_HTABLE_SIZE; ++i) {
310    	if (client->call_hash[i] != NULL) {
311            traverse_hash_bucket(i, client);
312    	}
313    }
314}
315
316
317/**
318 * \brief Initialise a new RPC client instance
319 *
320 * \param client Pointer to memory for RPC client data, to be initialised
321 * \param server IP address of server to be called
322 *
323 * \returns Error code (SYS_ERR_OK on success)
324 */
325errval_t rpc_init(struct rpc_client *client, struct in_addr server)
326{
327    errval_t err;
328
329    client->socket = net_udp_socket();
330    assert(client->socket);
331    net_set_user_state(client->socket, client);
332    net_set_on_received(client->socket, rpc_recv_handler);
333
334    net_debug_state = 0;
335
336    client->server = server;
337    client->connected_address.s_addr = INADDR_NONE;
338    client->connected_port = 0;
339
340    for (int i = 0; i < RPC_HTABLE_SIZE; ++i) {
341    	client->call_hash[i] = NULL;
342    }
343
344    /* XXX: (very) pseudo-random number for initial XID */
345    client->nextxid = (uint32_t)bench_tsc();
346
347    RPC_DEBUGP("###### Initial sequence no. is %"PRIu32" 0x%"PRIx32"\n",
348    		client->nextxid, client->nextxid);
349    err = periodic_event_create(&client->timer, get_default_waitset(),
350                                RPC_TIMER_PERIOD, MKCLOSURE(rpc_timer, client));
351    assert(err_is_ok(err));
352    if (err_is_fail(err)) {
353    	printf("rpc timer creation failed\n");
354        net_close(client->socket);
355        return LWIP_ERR_MEM;
356    }
357    RPC_DEBUGP("rpc timer created\n");
358
359    return SYS_ERR_OK;
360}
361
362
363
364/**
365 * \brief Initiate an RPC Call
366 *
367 * \param client RPC client, previously initialised by a call to rpc_init()
368 * \param port UDP port on server to call
369 * \param prog RPC program number
370 * \param vers RPC program version
371 * \param proc RPC procedure number
372 * \param args_xdrproc XDR serialisation function for arguments to call
373 * \param args Argument data to be passed to #args_xdrproc
374 * \param args_size Upper bound on size of serialised argument data
375 * \param callback Callback function to be invoked when call either completes or fails
376 * \param cbarg1,cbarg2 Opaque arguments to be passed to callback function
377 *
378 * \returns Error code (SYS_ERR_OK on success)
379 */
380errval_t rpc_call(struct rpc_client *client, uint16_t port, uint32_t prog,
381               uint32_t vers, uint32_t proc, xdrproc_t args_xdrproc, void *args,
382               size_t args_size, rpc_callback_t callback, void *cbarg1,
383               void *cbarg2)
384{
385
386    XDR xdr;
387    errval_t r;
388    bool rb;
389    uint32_t xid;
390
391    RPC_DEBUGP("rpc_call:  calling xdr_create_send\n");
392    rb = xdr_create_send(&xdr, args_size + RPC_CALL_HEADER_LEN);
393    if (!rb) {
394        return LWIP_ERR_MEM;
395    }
396
397    xid = client->nextxid++;
398
399    RPC_DEBUGP("rpc_call: calling rpc_call_init\n");
400    r = rpc_call_init(&xdr, xid, prog, vers, proc);
401    if (r != SYS_ERR_OK) {
402        XDR_DESTROY(&xdr);
403        return r;
404    }
405    RPC_DEBUGP("rpc_call: rpc_call_init done\n");
406
407    rb = args_xdrproc(&xdr, args);
408    if (!rb) {
409        XDR_DESTROY(&xdr);
410        return LWIP_ERR_BUF;
411    }
412
413    struct rpc_call *call = malloc(sizeof(struct rpc_call));
414    if (call == NULL) {
415        XDR_DESTROY(&xdr);
416        return LWIP_ERR_MEM;
417    }
418    call->xid = xid;
419    call->retries = call->timers = 0;
420    call->data = xdr.x_private;
421    call->size = xdr.size;
422    call->callback = callback;
423    call->cbarg1 = cbarg1;
424    call->cbarg2 = cbarg2;
425    call->next = NULL;
426
427    RPC_DEBUGP("rpc_call: RPC call for xid %u x0%x\n", xid, xid);
428    RPC_DEBUGP("rpc_call: calling UPD_connect\n");
429    client->connected_address = client->server;
430    client->connected_port = port;
431
432    /* enqueue */
433    int hid = hash_function(xid);
434    call->next = client->call_hash[hid];
435    client->call_hash[hid] = call;
436
437    RPC_DEBUGP("rpc_call: calling UPD_send\n");
438    r = net_send_to(client->socket, call->data, call->size, client->connected_address, client->connected_port);
439    if (r != SYS_ERR_OK) {
440        /* dequeue */
441        assert(client->call_hash[hid] == call);
442        client->call_hash[hid] = call->next;
443        /* destroy */
444        XDR_DESTROY(&xdr);
445        free(call);
446    }
447
448    RPC_DEBUGP("rpc_call: rpc_call done\n");
449    return r;
450}
451
452/// Destroy the given client, freeing any associated memory
453void rpc_destroy(struct rpc_client *client)
454{
455    periodic_event_cancel(&client->timer);
456
457    /* go through list of pending requests and free them */
458    struct rpc_call *call, *next;
459    for(int i = 0; i < RPC_HTABLE_SIZE; ++i) {
460        for (call = client->call_hash[i]; call != NULL; call = next) {
461            free(call->data);
462            next = call->next;
463            free(call);
464        }
465        client->call_hash[i] = NULL;
466    }
467    net_close(client->socket);
468}
469