1#include "betalk.h"
2#include "sysdepdefs.h"
3#include "beserved_rpc.h"
4
5#include "netdb.h"
6
// Name passed to spawn_thread() for the thread that runs btRPCReceive().
#define BT_RPC_THREAD_NAME			"BeServed RPC Marshaller"
// btRPCInvoke() abandons a call once this many send/wait attempts have failed.
#define BT_MAX_TOTAL_ATTEMPTS		4
// btRPCInvoke() calls btReconnect() when its attempt counter reaches this value.
#define BT_ATTEMPTS_BEFORE_RESTART	2

// Forward declarations for functions that are used before they are defined.
bool btReconnect(bt_rpcinfo *info);
int32 btRPCReceive(void *data);

// Single recv()/send() wrappers that retry when interrupted (EINTR).
int btRecv(int sock, void *data, int dataLen, int flags);
int btSend(int sock, void *data, int dataLen, int flags);

// Bookkeeping for outstanding calls and for reply packets.
// NOTE(review): btRPCFreeCall() has no definition in the visible part of this
// file -- confirm whether it is still needed.
void btRPCRecordCall(bt_rpccall *call);
void btRPCFreeCall(bt_rpccall *call);
void btCreateInPacket(bt_rpccall *call, char *buffer, unsigned int length);
void btDestroyInPacket(bt_inPacket *packet);

// Head of the doubly-linked list (next/prev) of calls awaiting a reply.
bt_rpccall *rootCall;
// Acquired around every traversal or modification of the rootCall list.
sem_id callSem;
// Held by btReconnect() while a reconnect attempt is in progress.
sem_id connectSem;
// Source of transaction ids; btRPCInvoke() advances it with atomic_add().
int32 nextXID = 1;
26
27
28void btRPCInit(bt_rpcinfo *info)
29{
30	info->s = INVALID_SOCKET;
31	info->rpcThread = 0;
32	info->quitXID = 0;
33
34	callSem = create_sem(1, "rpcCall");
35//	set_sem_owner(callSem, B_SYSTEM_TEAM);
36
37	connectSem = create_sem(1, "rpcConnection");
38//	set_sem_owner(connectSem, B_SYSTEM_TEAM);
39
40	info->rpcThread = spawn_thread(btRPCReceive, BT_RPC_THREAD_NAME, B_NORMAL_PRIORITY, info);
41	resume_thread(info->rpcThread);
42}
43
44void btRPCClose(bt_rpcinfo *info)
45{
46	if (info->rpcThread > 0)
47	{
48		status_t exitVal;
49		wait_for_thread(info->rpcThread, &exitVal);
50	}
51
52	// Close the socket used for all file system RPC communications,
53	// now that we know the RPC recipient thread is dead.
54	closesocket(info->s);
55
56	delete_sem(connectSem);
57	delete_sem(callSem);
58}
59
// btRecvMsg()
//
// Receives exactly dataLen bytes into `data` by calling btRecv() repeatedly.
// btRecv() is always called at least once.  Returns the number of bytes
// received, or -1 as soon as btRecv() reports a failure.
int btRecvMsg(int sock, void *data, int dataLen, int flags)
{
	char *cursor = (char *) data;
	int total = 0;

	for (;;)
	{
		int got = btRecv(sock, cursor + total, dataLen - total, flags);
		if (got == -1)
			return -1;

		total += got;
		if (total >= dataLen)
			return total;
	}
}
74
// btRecv()
//
// Performs one recv(), retrying for as long as the call fails with EINTR.
// Returns the byte count on success.  Returns -1 on any other error and
// also when recv() returns 0.
int btRecv(int sock, void *data, int dataLen, int flags)
{
	int received;

	do
	{
		received = recv(sock, data, dataLen, flags);
	}
	while (received == -1 && errno == EINTR);

	// A zero-byte read is reported to the caller as a failure.
	return (received == 0) ? -1 : received;
}
95
// btSendMsg()
//
// Sends all dataLen bytes of `data` by calling btSend() repeatedly.
// btSend() is always called at least once.  Returns the number of bytes
// sent, or -1 as soon as btSend() reports a failure.
int btSendMsg(int sock, void *data, int dataLen, int flags)
{
	char *cursor = (char *) data;
	int total = 0;

	for (;;)
	{
		int sent = btSend(sock, cursor + total, dataLen - total, flags);
		if (sent == -1)
			return -1;

		total += sent;
		if (total >= dataLen)
			return total;
	}
}
110
// btSend()
//
// Performs one send(), retrying for as long as the call fails with EINTR.
// Returns whatever send() finally returned: a byte count, or -1 on error.
int btSend(int sock, void *data, int dataLen, int flags)
{
	int sent;

	do
	{
		sent = send(sock, data, dataLen, flags);
	}
	while (sent == -1 && errno == EINTR);

	return sent;
}
129
130int32 btRPCReceive(void *data)
131{
132	bt_rpcinfo *info = (bt_rpcinfo *) data;
133	bt_rpccall *call;
134	char signature[20], *buffer;
135	int32 xid, length, sigLen;
136	int bytes;
137	bool failure = false;
138
139	while (info->s == INVALID_SOCKET)
140		snooze(100);
141
142	int sock = info->s;
143
144#ifdef BONE_VERSION
145	fd_set sockSet;
146	struct timeval timeout;
147
148	FD_ZERO(&sockSet);
149	timeout.tv_sec = 30;
150	timeout.tv_usec = 0;
151#endif
152
153	while (!failure)
154	{
155#ifdef BONE_VERSION
156		FD_SET(sock, &sockSet);
157		select(sock + 1, &sockSet, NULL, NULL, &timeout);
158
159		if (FD_ISSET(sock, &sockSet))
160		{
161#endif
162
163		// Receive the signature.  If a socket error occurs, break out of the loop and
164		// effectively exit this thread because the socket is closed.
165		sigLen = strlen(BT_RPC_SIGNATURE);
166		memset(signature, 0, sigLen);
167		if (btRecvMsg(sock, signature, sigLen, 0) == -1)
168			break;
169
170		// Make sure the signature is correct.  Otherwise, ignore it and start over.
171		signature[sigLen] = 0;
172		if (strcmp(signature, BT_RPC_SIGNATURE))
173			continue;
174
175		// Now read the transaction id (XID) and the length of the packet body.
176		bytes = btRecvMsg(sock, &xid, sizeof(int32), 0);
177		if (bytes > 0)
178			bytes = btRecvMsg(sock, &length, sizeof(int32), 0);
179
180		xid = B_LENDIAN_TO_HOST_INT32(xid);
181		length = B_LENDIAN_TO_HOST_INT32(length);
182		if (length <= 0 || length >= BT_RPC_MAX_PACKET_SIZE)
183			continue;
184
185		buffer = (char *) malloc(length + 1);
186		if (!buffer)
187			continue;
188
189		// Read the remainder of the packet.
190		if (btRecvMsg(sock, buffer, length, 0) == -1)
191			failure = true;
192
193		buffer[length] = 0;
194
195		while (acquire_sem(callSem) == B_INTERRUPTED);
196
197		call = rootCall;
198		while (call)
199		{
200			if (call->xid == xid)
201			{
202				btCreateInPacket(call, buffer, length);
203				release_sem(call->sem);
204				break;
205			}
206
207			call = call->next;
208		}
209
210		release_sem(callSem);
211
212		// The originating RPC call was not found.  This is probably not a very
213		// good sign.
214		if (!call)
215			free(buffer);
216
217		// If a valid quit XID has been defined, and it's equal to the current
218		// XID, quit.
219		if (info->quitXID)
220			if (info->quitXID == xid)
221				break;
222
223#ifdef BONE_VERSION
224	}
225#endif
226
227	}
228}
229
230void btCreateInPacket(bt_rpccall *call, char *buffer, unsigned int length)
231{
232	bt_inPacket *packet;
233
234	packet = (bt_inPacket *) malloc(sizeof(bt_inPacket));
235	if (!packet)
236		return;
237
238	packet->buffer = buffer;
239	packet->length = length;
240	packet->offset = 0;
241
242	call->inPacket = packet;
243}
244
245void btDestroyInPacket(bt_inPacket *packet)
246{
247	if (packet)
248	{
249		if (packet->buffer)
250			free(packet->buffer);
251
252		free(packet);
253	}
254}
255
256void btRPCRecordCall(bt_rpccall *call)
257{
258	bt_rpccall *curCall, *lastCall;
259
260	while (acquire_sem(callSem) == B_INTERRUPTED);
261
262	curCall = lastCall = rootCall;
263	while (curCall)
264	{
265		lastCall = curCall;
266		curCall = curCall->next;
267	}
268
269	call->next = NULL;
270	call->prev = lastCall;
271
272	if (lastCall == NULL)
273		rootCall = call;
274	else
275		lastCall->next = call;
276
277	release_sem(callSem);
278}
279
280// btRPCRemoveCall()
281//
282void btRPCRemoveCall(bt_rpccall *call)
283{
284	if (call)
285	{
286		if (call->sem > 0)
287			delete_sem(call->sem);
288
289		while (acquire_sem(callSem) == B_INTERRUPTED);
290
291		// Make this entry's predecessor point to its successor.
292		if (call->prev)
293			call->prev->next = call->next;
294
295		// Make this entry's successor point to its predecessor.
296		if (call->next)
297			call->next->prev = call->prev;
298
299		// If we just deleted the root node of the list, then the next node
300		// has become the root.
301		if (call == rootCall)
302			rootCall = call->next;
303
304		release_sem(callSem);
305
306		// Now we can release the memory allocated for this packet.
307		btDestroyInPacket(call->inPacket);
308		free(call);
309	}
310}
311
312// btConnect()
313//
314int btConnect(bt_rpcinfo *info, unsigned int serverIP, int port)
315{
316	struct sockaddr_in serverAddr;
317	int session, addrLen;
318#ifdef BONE_VERSION
319	int flags;
320#endif
321
322	info->serverIP = serverIP;
323	info->serverPort = port;
324
325	// Store the length of the socket addressing structure for accept().
326	addrLen = sizeof(struct sockaddr_in);
327
328	// Initialize the server address structure.
329	memset(&serverAddr, 0, sizeof(serverAddr));
330	serverAddr.sin_port = htons(port);
331	serverAddr.sin_family = AF_INET;
332	serverAddr.sin_addr.s_addr = htonl(serverIP);
333
334	// Create a new socket to receive incoming requests.
335	session = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
336	if (session == INVALID_SOCKET)
337		return INVALID_SOCKET;
338
339	// Bind that socket to the address constructed above.
340	if (connect(session, (struct sockaddr *) &serverAddr, sizeof(serverAddr)))
341		return INVALID_SOCKET;
342
343	// Enabled periodic keep-alive messages, so we stay connected during times of
344	// no activity.  This isn't supported by ksocketd, only in BONE.  :-(
345#ifdef BONE_VERSION
346	flags = 1;
347	setsockopt(session, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags));
348#endif
349
350	return session;
351}
352
353void btDisconnect(bt_rpcinfo *info)
354{
355	bt_outPacket *packet;
356	bt_rpccall *call;
357
358	packet = btRPCPutHeader(BT_CMD_QUIT, 0, 0);
359	call = btRPCInvoke(info, packet, true);
360	if (call)
361		btRPCRemoveCall(call);
362}
363
364bt_outPacket *btRPCPutHeader(unsigned char command, unsigned char argc, int32 length)
365{
366	bt_outPacket *packet;
367
368	packet = (bt_outPacket *) malloc(sizeof(bt_outPacket));
369	if (!packet)
370		return NULL;
371
372	packet->size = BT_RPC_MIN_PACKET_SIZE;
373	packet->buffer = (char *) malloc(packet->size);
374	packet->length = 0;
375
376	if (!packet->buffer)
377	{
378		free(packet);
379		return NULL;
380	}
381
382	strcpy(packet->buffer, BT_RPC_SIGNATURE);
383	packet->length += strlen(BT_RPC_SIGNATURE);
384
385//	btRPCPutChar(packet, BT_RPC_VERSION_HI);
386//	btRPCPutChar(packet, BT_RPC_VERSION_LO);
387	btRPCPutInt32(packet, 7 + (8 * argc) + length);
388	btRPCPutChar(packet, command);
389	btRPCPutChar(packet, argc);
390
391	return packet;
392}
393
394void btRPCPutArg(bt_outPacket *packet, unsigned int type, void *data, int length)
395{
396	btRPCPutInt32(packet, type);
397	btRPCPutInt32(packet, length);
398	btRPCPutBinary(packet, data, length);
399}
400
401bt_rpccall *btRPCInvoke(bt_rpcinfo *info, bt_outPacket *packet, bool lastPkt)
402{
403	status_t result;
404	bt_rpccall *call;
405	int attempts;
406	bool failure;
407
408	call = (bt_rpccall *) malloc(sizeof(bt_rpccall));
409	if (!call)
410		return NULL;
411
412	attempts = 0;
413
414	call->inPacket = NULL;
415	call->xid = atomic_add(&nextXID, 1);
416	if ((call->sem = create_sem(0, "rpc call")) < B_OK)
417	{
418		free(call);
419		return NULL;
420	}
421
422	btRPCRecordCall(call);
423
424	btRPCPutInt32(packet, call->xid);
425	btRPCPutChar(packet, BT_CMD_TERMINATOR);
426
427	// If this is the last RPC packet that will be transmitted, store
428	// its XID so the RPC recipient thread will know when to quit.
429	if (lastPkt)
430		info->quitXID = call->xid;
431
432doSend:
433	failure = false;
434	if (btSendMsg(info->s, packet->buffer, packet->length, 0) == -1)
435		failure = true;
436
437	if (!failure)
438		do
439		{
440			result = acquire_sem_etc(call->sem, 1, B_RELATIVE_TIMEOUT, 2500000);
441		}
442		while (result == B_INTERRUPTED);
443
444	if (failure || result == B_TIMED_OUT)
445	{
446		attempts++;
447		if (attempts >= BT_MAX_TOTAL_ATTEMPTS)
448		{
449			free(packet->buffer);
450			free(packet);
451			btRPCRemoveCall(call);
452			return NULL;
453		}
454		else if (attempts == BT_ATTEMPTS_BEFORE_RESTART)
455			btReconnect(info);
456
457		goto doSend;
458	}
459
460	free(packet->buffer);
461	free(packet);
462	return call;
463}
464
465bool btReconnect(bt_rpcinfo *info)
466{
467	static int counter = 0;
468	int curCount = counter;
469	bool connected = true;
470
471	while (acquire_sem(connectSem) == B_INTERRUPTED);
472
473	if (curCount == counter)
474	{
475		connected = false;
476
477		closesocket(info->s);
478		if (info->rpcThread > 0)
479			kill_thread(info->rpcThread);
480
481		info->s = btConnect(info, info->serverIP, info->serverPort);
482		if (info->s != INVALID_SOCKET)
483		{
484			info->rpcThread = spawn_thread(btRPCReceive, BT_RPC_THREAD_NAME, B_NORMAL_PRIORITY, info);
485			resume_thread(info->rpcThread);
486			connected = true;
487		}
488
489		counter++;
490	}
491
492	release_sem(connectSem);
493	return connected;
494}
495
496void btRPCGrowPacket(bt_outPacket *packet, int bytes)
497{
498	if (packet->length + bytes > packet->size)
499	{
500		int growth = ((bytes / BT_RPC_MIN_PACKET_SIZE) + 1) * BT_RPC_MIN_PACKET_SIZE;
501		packet->buffer = (char *) realloc(packet->buffer, packet->size + growth);
502		packet->size += growth;
503	}
504}
505
506unsigned int btRPCGetInt32(bt_inPacket *packet)
507{
508	int32 value;
509
510	if (packet->offset < packet->length)
511		value = B_LENDIAN_TO_HOST_INT32(*((int32 *) &packet->buffer[packet->offset]));
512	else
513		value = 0;
514
515	packet->offset += sizeof(value);
516	return value;
517}
518
519int64 btRPCGetInt64(bt_inPacket *packet)
520{
521	int64 value;
522
523	if (packet->offset < packet->length)
524		value = B_LENDIAN_TO_HOST_INT64(*((int64 *) &packet->buffer[packet->offset]));
525	else
526		value = 0;
527
528	packet->offset += sizeof(value);
529	return value;
530}
531
532char *btRPCGetNewString(bt_inPacket *packet)
533{
534	char *str;
535	unsigned int bytes;
536
537	if (packet->offset >= packet->length)
538		return NULL;
539
540	bytes = B_LENDIAN_TO_HOST_INT32(*((int32 *) &packet->buffer[packet->offset]));
541	packet->offset += sizeof(bytes);
542	if (!bytes)
543		return NULL;
544
545	str = (char *) malloc(bytes + 1);
546	if (!str)
547		return NULL;
548
549	memcpy(str, &packet->buffer[packet->offset], bytes);
550	str[bytes] = 0;
551
552	packet->offset += bytes;
553
554	return str;
555}
556
557int btRPCGetString(bt_inPacket *packet, char *buffer, int length)
558{
559	unsigned int bytes;
560
561	if (packet->offset >= packet->length)
562		return 0;
563
564	bytes = B_LENDIAN_TO_HOST_INT64(*((int32 *) &packet->buffer[packet->offset]));
565	packet->offset += sizeof(bytes);
566	if (!bytes)
567		return 0L;
568
569	if (length < bytes)
570		return ERANGE;
571
572	memcpy(buffer, &packet->buffer[packet->offset], bytes);
573	packet->offset += bytes;
574	return bytes;
575}
576
577void btRPCPutChar(bt_outPacket *packet, char value)
578{
579	btRPCGrowPacket(packet, sizeof(value));
580	packet->buffer[packet->length] = value;
581	packet->length += sizeof(value);
582}
583
584void btRPCPutInt32(bt_outPacket *packet, int32 value)
585{
586	btRPCGrowPacket(packet, sizeof(value));
587	*(int32 *)(&packet->buffer[packet->length]) = B_HOST_TO_LENDIAN_INT32(value);
588	packet->length += sizeof(value);
589}
590
591void btRPCPutInt64(bt_outPacket *packet, int64 value)
592{
593	btRPCGrowPacket(packet, sizeof(value));
594	*(int64 *)(&packet->buffer[packet->length]) = B_HOST_TO_LENDIAN_INT64(value);
595	packet->length += sizeof(value);
596}
597
598void btRPCPutString(bt_outPacket *packet, char *buffer, int length)
599{
600	if (packet && buffer)
601	{
602		btRPCGrowPacket(packet, sizeof(length) + length);
603		btRPCPutInt32(packet, length);
604		memcpy(&packet->buffer[packet->length], buffer, length);
605		packet->length += length;
606	}
607}
608
609void btRPCPutBinary(bt_outPacket *packet, void *buffer, int length)
610{
611	if (packet && buffer)
612	{
613		btRPCGrowPacket(packet, length);
614		memcpy(&packet->buffer[packet->length], buffer, length);
615		packet->length += length;
616	}
617}
618
619int btRPCGetStat(bt_inPacket *packet, struct stat *st)
620{
621	st->st_dev = 0;
622	st->st_nlink = btRPCGetInt32(packet);
623	st->st_uid = btRPCGetInt32(packet);
624	st->st_gid = btRPCGetInt32(packet);
625	st->st_size = btRPCGetInt64(packet);
626	st->st_blksize = btRPCGetInt32(packet);
627	st->st_rdev = btRPCGetInt32(packet);
628	st->st_ino = btRPCGetInt64(packet);
629	st->st_mode = btRPCGetInt32(packet);
630	st->st_atime = btRPCGetInt32(packet);
631	st->st_mtime = btRPCGetInt32(packet);
632	st->st_ctime = btRPCGetInt32(packet);
633}
634
635void btRPCPutStat(bt_outPacket *packet, struct stat *st)
636{
637	if (packet && st)
638	{
639		btRPCPutInt32(packet, (int) st->st_nlink);
640		btRPCPutInt32(packet, (int) st->st_uid);
641		btRPCPutInt32(packet, (int) st->st_gid);
642		btRPCPutInt64(packet, (int64) st->st_size);
643		btRPCPutInt32(packet, (int) st->st_blksize);
644		btRPCPutInt32(packet, (int) st->st_rdev);
645		btRPCPutInt64(packet, (int64) st->st_ino);
646		btRPCPutInt32(packet, (int) st->st_mode);
647		btRPCPutInt32(packet, (int) st->st_atime);
648		btRPCPutInt32(packet, (int) st->st_mtime);
649		btRPCPutInt32(packet, (int) st->st_ctime);
650	}
651}
652