1/** 2 * \file 3 * \brief RPC implementation 4 */ 5 6/* 7 * Copyright (c) 2008, ETH Zurich. 8 * All rights reserved. 9 * 10 * This file is distributed under the terms in the attached LICENSE file. 11 * If you do not find this file, copies can be found by writing to: 12 * ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group. 13 */ 14 15#include <nfs/xdr.h> 16#include <assert.h> 17#include <net_sockets/net_sockets.h> 18 19#include <barrelfish/barrelfish.h> 20#include <bench/bench.h> 21#include <netbench/netbench.h> 22 23// XXX: kludge making it possible to use bench_tsc without -lbench 24#if defined(__i386__) || defined(__x86_64__) 25bool rdtscp_flag; 26#endif 27 28#define FALSE false 29#define TRUE true 30 31#include "rpc.h" 32#include "rpc_debug.h" 33#include "xdr_pbuf.h" 34 35/// RPC authentication flavour 36enum rpc_auth_flavor { 37 RPC_AUTH_NULL = 0, 38 RPC_AUTH_UNIX = 1, 39 RPC_AUTH_SHORT = 2, 40 RPC_AUTH_DES = 3 41}; 42 43/// RPC message type 44enum rpc_msg_type { 45 RPC_CALL = 0, 46 RPC_REPLY = 1 47}; 48 49//#define RPC_TIMER_PERIOD (5000 * 1000) ///< Time between RPC timer firings (us) 50#define RPC_TIMER_PERIOD (200 * 1000) ///< Time between RPC timer firings (us) 51#define RPC_RETRANSMIT_AFTER 3 ///< Number of timer firings before a retransmit 52#define RPC_MAX_RETRANSMIT 60 ///< Max number of retransmissions before giving up 53 54/* XXX: hardcoded data for authentication info */ 55#define AUTH_MACHINE_NAME "barrelfish" 56#define AUTH_MACHINE_NAME_LEN 10 /* XXX: must match string above */ 57#define AUTH_UID 0 58#define AUTH_GID 0 59#define AUTH_SIZE (RNDUP(AUTH_MACHINE_NAME_LEN) + BYTES_PER_XDR_UNIT * 5) 60 61/// bytes needed for full RPC call header 62#define RPC_CALL_HEADER_LEN (10 * BYTES_PER_XDR_UNIT + AUTH_SIZE) 63 64static uint8_t net_debug_state = 0; 65 66static int hash_function(uint32_t xid) 67{ 68 return (xid % RPC_HTABLE_SIZE); 69} 70 71/// Data for an outstanding (unreplied) RPC call 72struct rpc_call { 73 uint32_t xid; ///< Transaction ID (XID) 74 uint16_t timers, retries; ///< Number of timer expiries and retries 75 uint8_t *data; ///< Pointer for packet data 76 size_t size; 77 rpc_callback_t callback; ///< Callback function pointer 78 void *cbarg1, *cbarg2; ///< Callback function opaque arguments 79 struct rpc_call *next; ///< Next call in queue 80}; 81 82/// Utility function to prepare an outgoing packet buffer with the RPC call header 83static errval_t rpc_call_init(XDR *xdr, uint32_t xid, uint32_t prog, uint32_t vers, 84 uint32_t proc) 85{ 86 int32_t *buf; 87 bool rb; 88 89 /* reserve space for the first part of the header */ 90 if ((buf = XDR_INLINE(xdr, 9 * BYTES_PER_XDR_UNIT)) == NULL) { 91 return LWIP_ERR_BUF; 92 } 93 94 // XID comes first 95 IXDR_PUT_UINT32(buf, xid); 96 97 // message type: call 98 IXDR_PUT_UINT32(buf, RPC_CALL); 99 100 // RPC version 101 IXDR_PUT_UINT32(buf, 2); 102 103 // program number, version number, procedure number 104 IXDR_PUT_UINT32(buf, prog); 105 IXDR_PUT_UINT32(buf, vers); 106 IXDR_PUT_UINT32(buf, proc); 107 108 // CRED: auth_unix (hardcoded) 109 IXDR_PUT_UINT32(buf, RPC_AUTH_UNIX); 110 IXDR_PUT_UINT32(buf, AUTH_SIZE); 111 IXDR_PUT_UINT32(buf, 0); // stamp 112 113 /* here endeth the reserved space. now the machine name string */ 114 char *machname = AUTH_MACHINE_NAME; 115 rb = xdr_string(xdr, &machname, AUTH_MACHINE_NAME_LEN); 116 if (!rb) { 117 return LWIP_ERR_BUF; 118 } 119 120 /* reserve some more space for the rest, which is done inline again */ 121 if ((buf = XDR_INLINE(xdr, 5 * BYTES_PER_XDR_UNIT)) == NULL) { 122 return LWIP_ERR_BUF; 123 } 124 125 // Rest of the CRED 126 IXDR_PUT_UINT32(buf, AUTH_UID); 127 IXDR_PUT_UINT32(buf, AUTH_GID); 128 IXDR_PUT_UINT32(buf, 0); // GIDs 129 130 // VERF: auth_null 131 IXDR_PUT_UINT32(buf, RPC_AUTH_NULL); 132 IXDR_PUT_UINT32(buf, 0); 133 134 return SYS_ERR_OK; 135} 136 137/// Utility function to skip over variable-sized authentication data in a reply 138static errval_t xdr_skip_auth(XDR *xdr) 139{ 140 int32_t *buf; 141 142 if ((buf = XDR_INLINE(xdr, 2 * BYTES_PER_XDR_UNIT)) == NULL) { 143 return LWIP_ERR_BUF; 144 } 145 146 (void) IXDR_GET_UINT32(buf); // skip auth flavour 147 148 size_t auth_size = IXDR_GET_UINT32(buf); 149 150 /* skip over auth data bytes */ 151 if (auth_size > 0) { 152 buf = XDR_INLINE(xdr, auth_size); 153 if (buf == NULL) { 154 return LWIP_ERR_BUF; 155 } 156 } 157 158 return SYS_ERR_OK; 159} 160 161/// Generic handler for all incoming RPC messages. Finds the appropriate call 162/// instance, checks arguments, and notifies the callback. 163static void rpc_recv_handler(void *user_state, struct net_socket *socket, 164 void *data, size_t size, struct in_addr ip_address, uint16_t port) 165{ 166 167// uint64_t ts = rdtsc(); 168 uint32_t replystat, acceptstat; 169 XDR xdr; 170 errval_t r; 171 bool rb; 172 struct rpc_client *client = user_state; 173 struct rpc_call *call = NULL; 174 175 xdr_create_recv(&xdr, data, size); 176 177 int32_t *buf; 178 if ((buf = XDR_INLINE(&xdr, 3 * BYTES_PER_XDR_UNIT)) == NULL) { 179 fprintf(stderr, "RPC: packet too small, dropped\n"); 180 goto out; 181 } 182 183 // XID comes first 184 uint32_t xid = IXDR_GET_UINT32(buf); 185 // message type 186 uint32_t msgtype = IXDR_GET_UINT32(buf); 187 if (msgtype != RPC_REPLY) { 188 fprintf(stderr, "RPC: Received non-reply message, dropped\n"); 189 goto out; 190 } 191 RPC_DEBUGP("rpc_recv_call: RPC callback for xid %u x0%x\n", xid, xid); 192 193 194 int hid = hash_function(xid); 195 struct rpc_call *hash_list = client->call_hash[hid]; 196 // find matching call and dequeue it 197 struct rpc_call *prev = NULL; 198 for (call = hash_list; call && call->xid != xid; call = call->next){ 199 prev = call; 200 } 201 if (call == NULL) { 202 fprintf(stderr, "RPC: Unknown XID 0x%" PRIx32 " in reply, dropped\n", xid); 203/* fprintf(stderr, "RPC:[%d:%s] Unknown XID 0x%x in reply, dropped\n", 204 disp_get_domain_id(), disp_name(), xid); 205*/ 206 goto out; 207 } else if (prev == NULL) { 208 client->call_hash[hid] = call->next; 209 } else { 210 prev->next = call->next; 211 } 212 213 replystat = IXDR_GET_UINT32(buf); 214 if (replystat == RPC_MSG_ACCEPTED) { 215 r = xdr_skip_auth(&xdr); 216 if (r != SYS_ERR_OK) { 217 fprintf(stderr, "RPC: Error in incoming auth data, dropped\n"); 218 goto out; 219 } 220 221 rb = xdr_uint32_t(&xdr, &acceptstat); 222 if (!rb) { 223 fprintf(stderr, "RPC, Error decoding accept status, dropped\n"); 224 goto out; 225 } 226 } else { 227 acceptstat = -1; 228 } 229 230 call->callback(client, call->cbarg1, call->cbarg2, replystat, acceptstat, 231 &xdr); 232 233out: 234 if (call != NULL) { 235 // We got reply, so there is not need for keeping TX packet saved 236 // here for retransmission. Lets free it up. 237 net_free(call->data); 238 free(call); 239 } 240} 241 242static void traverse_hash_bucket(int hid, struct rpc_client *client) 243{ 244 struct rpc_call *call, *next, *prev = NULL; 245 246 for (call = client->call_hash[hid]; call != NULL; call = next) { 247 next = call->next; 248 bool freed_call = false; 249 if (++call->timers >= RPC_RETRANSMIT_AFTER) { 250 if (call->retries++ == RPC_MAX_RETRANSMIT) { 251 /* admit failure */ 252 printf("##### [%d][%"PRIuDOMAINID"] " 253 "RPC: timeout for XID 0x%"PRIu32"\n", 254 disp_get_core_id(), disp_get_domain_id(), call->xid); 255 free(call->data); 256 if (prev == NULL) { 257 client->call_hash[hid] = call->next; 258 } else { 259 prev->next = call->next; 260 } 261 call->callback(client, call->cbarg1, call->cbarg2, -1, -1, NULL); 262 free(call); 263 freed_call = true; 264 } else { 265 /* 266 if(net_debug_state == 0) { 267 net_debug_state = 1; 268 printf("starting the debug in network driver\n"); 269 lwip_benchmark_control(0, BMS_START_REQUEST, 270 0, rdtsc()); 271 lwip_benchmark_control(1, BMS_START_REQUEST, 272 0, rdtsc()); 273 } else { 274 printf("already started the debug in network driver\n"); 275 } 276 */ 277 278 /* retransmit */ 279 RPC_DEBUGP("###### [%d][%"PRIuDOMAINID"] " 280 "RPC: retransmit XID 0x%"PRIu32"\n", 281 disp_get_core_id(), disp_get_domain_id(), call->xid); 282 283 // throw away (hide) UDP/IP/ARP headers from previous transmission 284 // err_t e = pbuf_header(call->pbuf, 285 // -UDP_HLEN - IP_HLEN - PBUF_LINK_HLEN); 286 // assert(e == SYS_ERR_OK); 287 288 errval_t e = net_send_to(client->socket, call->data, call->size, client->connected_address, client->connected_port); 289 if (e != SYS_ERR_OK) { 290 /* XXX: assume that this is a transient condition, retry */ 291 fprintf(stderr, "RPC: retransmit failed! will retry...\n"); 292 call->timers--; 293 } else { 294 call->timers = 0; 295 } 296 } 297 } 298 if (!freed_call) { 299 prev = call; 300 } 301 } /* end for: */ 302} 303 304/// Timer callback: walk the queue of pending calls, and retransmit/expire them 305static void rpc_timer(void *arg) 306{ 307 struct rpc_client *client = arg; 308 RPC_DEBUGP("rpc_timer fired\n"); 309 for (int i = 0; i < RPC_HTABLE_SIZE; ++i) { 310 if (client->call_hash[i] != NULL) { 311 traverse_hash_bucket(i, client); 312 } 313 } 314} 315 316 317/** 318 * \brief Initialise a new RPC client instance 319 * 320 * \param client Pointer to memory for RPC client data, to be initialised 321 * \param server IP address of server to be called 322 * 323 * \returns Error code (SYS_ERR_OK on success) 324 */ 325errval_t rpc_init(struct rpc_client *client, struct in_addr server) 326{ 327 errval_t err; 328 329 client->socket = net_udp_socket(); 330 assert(client->socket); 331 net_set_user_state(client->socket, client); 332 net_set_on_received(client->socket, rpc_recv_handler); 333 334 net_debug_state = 0; 335 336 client->server = server; 337 client->connected_address.s_addr = INADDR_NONE; 338 client->connected_port = 0; 339 340 for (int i = 0; i < RPC_HTABLE_SIZE; ++i) { 341 client->call_hash[i] = NULL; 342 } 343 344 /* XXX: (very) pseudo-random number for initial XID */ 345 client->nextxid = (uint32_t)bench_tsc(); 346 347 RPC_DEBUGP("###### Initial sequence no. is %"PRIu32" 0x%"PRIx32"\n", 348 client->nextxid, client->nextxid); 349 err = periodic_event_create(&client->timer, get_default_waitset(), 350 RPC_TIMER_PERIOD, MKCLOSURE(rpc_timer, client)); 351 assert(err_is_ok(err)); 352 if (err_is_fail(err)) { 353 printf("rpc timer creation failed\n"); 354 net_close(client->socket); 355 return LWIP_ERR_MEM; 356 } 357 RPC_DEBUGP("rpc timer created\n"); 358 359 return SYS_ERR_OK; 360} 361 362 363 364/** 365 * \brief Initiate an RPC Call 366 * 367 * \param client RPC client, previously initialised by a call to rpc_init() 368 * \param port UDP port on server to call 369 * \param prog RPC program number 370 * \param vers RPC program version 371 * \param proc RPC procedure number 372 * \param args_xdrproc XDR serialisation function for arguments to call 373 * \param args Argument data to be passed to #args_xdrproc 374 * \param args_size Upper bound on size of serialised argument data 375 * \param callback Callback function to be invoked when call either completes or fails 376 * \param cbarg1,cbarg2 Opaque arguments to be passed to callback function 377 * 378 * \returns Error code (SYS_ERR_OK on success) 379 */ 380errval_t rpc_call(struct rpc_client *client, uint16_t port, uint32_t prog, 381 uint32_t vers, uint32_t proc, xdrproc_t args_xdrproc, void *args, 382 size_t args_size, rpc_callback_t callback, void *cbarg1, 383 void *cbarg2) 384{ 385 386 XDR xdr; 387 errval_t r; 388 bool rb; 389 uint32_t xid; 390 391 RPC_DEBUGP("rpc_call: calling xdr_create_send\n"); 392 rb = xdr_create_send(&xdr, args_size + RPC_CALL_HEADER_LEN); 393 if (!rb) { 394 return LWIP_ERR_MEM; 395 } 396 397 xid = client->nextxid++; 398 399 RPC_DEBUGP("rpc_call: calling rpc_call_init\n"); 400 r = rpc_call_init(&xdr, xid, prog, vers, proc); 401 if (r != SYS_ERR_OK) { 402 XDR_DESTROY(&xdr); 403 return r; 404 } 405 RPC_DEBUGP("rpc_call: rpc_call_init done\n"); 406 407 rb = args_xdrproc(&xdr, args); 408 if (!rb) { 409 XDR_DESTROY(&xdr); 410 return LWIP_ERR_BUF; 411 } 412 413 struct rpc_call *call = malloc(sizeof(struct rpc_call)); 414 if (call == NULL) { 415 XDR_DESTROY(&xdr); 416 return LWIP_ERR_MEM; 417 } 418 call->xid = xid; 419 call->retries = call->timers = 0; 420 call->data = xdr.x_private; 421 call->size = xdr.size; 422 call->callback = callback; 423 call->cbarg1 = cbarg1; 424 call->cbarg2 = cbarg2; 425 call->next = NULL; 426 427 RPC_DEBUGP("rpc_call: RPC call for xid %u x0%x\n", xid, xid); 428 RPC_DEBUGP("rpc_call: calling UPD_connect\n"); 429 client->connected_address = client->server; 430 client->connected_port = port; 431 432 /* enqueue */ 433 int hid = hash_function(xid); 434 call->next = client->call_hash[hid]; 435 client->call_hash[hid] = call; 436 437 RPC_DEBUGP("rpc_call: calling UPD_send\n"); 438 r = net_send_to(client->socket, call->data, call->size, client->connected_address, client->connected_port); 439 if (r != SYS_ERR_OK) { 440 /* dequeue */ 441 assert(client->call_hash[hid] == call); 442 client->call_hash[hid] = call->next; 443 /* destroy */ 444 XDR_DESTROY(&xdr); 445 free(call); 446 } 447 448 RPC_DEBUGP("rpc_call: rpc_call done\n"); 449 return r; 450} 451 452/// Destroy the given client, freeing any associated memory 453void rpc_destroy(struct rpc_client *client) 454{ 455 periodic_event_cancel(&client->timer); 456 457 /* go through list of pending requests and free them */ 458 struct rpc_call *call, *next; 459 for(int i = 0; i < RPC_HTABLE_SIZE; ++i) { 460 for (call = client->call_hash[i]; call != NULL; call = next) { 461 free(call->data); 462 next = call->next; 463 free(call); 464 } 465 client->call_hash[i] = NULL; 466 } 467 net_close(client->socket); 468} 469