1/*
2 * Copyright 2012-2013 Haiku, Inc. All rights reserved.
3 * Distributed under the terms of the MIT License.
4 *
5 * Authors:
 *		Paweł Dziepak, pdziepak@quarnos.org
7 */
8
9
10#include "RPCCallbackServer.h"
11
12#include "NFS4Defs.h"
13#include "RPCCallback.h"
14#include "RPCCallbackReply.h"
15#include "RPCCallbackRequest.h"
16#include "RPCServer.h"
17
18
19using namespace RPC;
20
21
22CallbackServer* gRPCCallbackServer		= NULL;
23CallbackServer* gRPCCallbackServer6		= NULL;
24
25
26CallbackServer::CallbackServer(int networkFamily)
27	:
28	fConnectionList(NULL),
29	fListener(NULL),
30	fThreadRunning(false),
31	fCallbackArray(NULL),
32	fArraySize(0),
33	fFreeSlot(-1),
34	fNetworkFamily(networkFamily)
35{
36	mutex_init(&fConnectionLock, NULL);
37	mutex_init(&fThreadLock, NULL);
38	rw_lock_init(&fArrayLock, NULL);
39}
40
41
42CallbackServer::~CallbackServer()
43{
44	StopServer();
45
46	free(fCallbackArray);
47	rw_lock_destroy(&fArrayLock);
48	mutex_destroy(&fThreadLock);
49	mutex_destroy(&fConnectionLock);
50}
51
52
53CallbackServer*
54CallbackServer::Get(Server* server)
55{
56	ASSERT(server != NULL);
57
58	int family = server->ID().Family();
59	ASSERT(family == AF_INET || family == AF_INET6);
60
61	int idx;
62	switch (family) {
63		case AF_INET:
64			idx = 0;
65			break;
66		case AF_INET6:
67			idx = 1;
68			break;
69		default:
70			return NULL;
71	}
72
73	MutexLocker _(fServerCreationLock);
74	if (fServers[idx] == NULL)
75		fServers[idx] = new CallbackServer(family);
76	return fServers[idx];
77}
78
79
80void
81CallbackServer::ShutdownAll()
82{
83	MutexLocker _(fServerCreationLock);
84	for (unsigned int i = 0; i < sizeof(fServers) / sizeof(fServers[0]); i++)
85		delete fServers[i];
86	memset(&fServers, 0, sizeof(fServers));
87}
88
89
90mutex			CallbackServer::fServerCreationLock = MUTEX_INITIALIZER(NULL);
91CallbackServer*	CallbackServer::fServers[2] = { NULL, NULL };
92
93
94status_t
95CallbackServer::RegisterCallback(Callback* callback)
96{
97	ASSERT(callback != NULL);
98
99	status_t result = StartServer();
100	if (result != B_OK)
101		return result;
102
103	WriteLocker _(fArrayLock);
104	if (fFreeSlot == -1) {
105		uint32 newSize = max_c(fArraySize * 2, 4);
106		uint32 size = newSize * sizeof(CallbackSlot);
107		CallbackSlot* array	= reinterpret_cast<CallbackSlot*>(malloc(size));
108		if (array == NULL)
109			return B_NO_MEMORY;
110
111		if (fCallbackArray != NULL)
112			memcpy(array, fCallbackArray, fArraySize * sizeof(CallbackSlot));
113
114		for (uint32 i = fArraySize; i < newSize; i++)
115			array[i].fNext = i + 1;
116
117		array[newSize - 1].fNext = -1;
118
119		fCallbackArray = array;
120		fFreeSlot = fArraySize;
121		fArraySize = newSize;
122	}
123
124	int32 id = fFreeSlot;
125	fFreeSlot = fCallbackArray[id].fNext;
126
127	fCallbackArray[id].fCallback = callback;
128	callback->SetID(id);
129	callback->SetCBServer(this);
130
131	return B_OK;
132}
133
134
135status_t
136CallbackServer::UnregisterCallback(Callback* callback)
137{
138	ASSERT(callback != NULL);
139	ASSERT(callback->CBServer() == this);
140
141	int32 id = callback->ID();
142
143	WriteLocker _(fArrayLock);
144	fCallbackArray[id].fNext = fFreeSlot;
145	fFreeSlot = id;
146
147	callback->SetCBServer(NULL);
148	return B_OK;
149}
150
151
152status_t
153CallbackServer::StartServer()
154{
155	MutexLocker _(fThreadLock);
156	if (fThreadRunning)
157		return B_OK;
158
159	status_t result = ConnectionListener::Listen(&fListener, fNetworkFamily);
160	if (result != B_OK)
161		return result;
162
163	fThread = spawn_kernel_thread(&CallbackServer::ListenerThreadLauncher,
164		"NFSv4 Callback Listener", B_NORMAL_PRIORITY, this);
165	if (fThread < B_OK)
166		return fThread;
167
168	fThreadRunning = true;
169
170	result = resume_thread(fThread);
171	if (result != B_OK) {
172		kill_thread(fThread);
173		fThreadRunning = false;
174		return result;
175	}
176
177	return B_OK;
178}
179
180
181status_t
182CallbackServer::StopServer()
183{
184	MutexLocker _(&fThreadLock);
185	if (!fThreadRunning)
186		return B_OK;
187
188	fListener->Disconnect();
189	status_t result;
190	wait_for_thread(fThread, &result);
191
192	MutexLocker locker(fConnectionLock);
193	while (fConnectionList != NULL) {
194		ConnectionEntry* entry = fConnectionList;
195		fConnectionList = entry->fNext;
196		entry->fConnection->Disconnect();
197
198		status_t result;
199		wait_for_thread(entry->fThread, &result);
200
201		delete entry->fConnection;
202		delete entry;
203	}
204
205	delete fListener;
206
207	fThreadRunning = false;
208	return B_OK;
209}
210
211
212status_t
213CallbackServer::NewConnection(Connection* connection)
214{
215	ASSERT(connection != NULL);
216
217	ConnectionEntry* entry = new ConnectionEntry;
218	entry->fConnection = connection;
219	entry->fPrev = NULL;
220
221	MutexLocker locker(fConnectionLock);
222	entry->fNext = fConnectionList;
223	if (fConnectionList != NULL)
224		fConnectionList->fPrev = entry;
225	fConnectionList = entry;
226	locker.Unlock();
227
228	void** arguments = reinterpret_cast<void**>(malloc(sizeof(void*) * 2));
229	if (arguments == NULL)
230		return B_NO_MEMORY;
231
232	arguments[0] = this;
233	arguments[1] = entry;
234
235	thread_id thread;
236	thread = spawn_kernel_thread(&CallbackServer::ConnectionThreadLauncher,
237		"NFSv4 Callback Connection", B_NORMAL_PRIORITY, arguments);
238	if (thread < B_OK) {
239		ReleaseConnection(entry);
240		free(arguments);
241		return thread;
242	}
243
244	entry->fThread = thread;
245
246	status_t result = resume_thread(thread);
247	if (result != B_OK) {
248		kill_thread(thread);
249		ReleaseConnection(entry);
250		free(arguments);
251		return result;
252	}
253
254	return B_OK;
255}
256
257
258status_t
259CallbackServer::ReleaseConnection(ConnectionEntry* entry)
260{
261	ASSERT(entry != NULL);
262
263	MutexLocker _(fConnectionLock);
264	if (entry->fNext != NULL)
265		entry->fNext->fPrev = entry->fPrev;
266	if (entry->fPrev != NULL)
267		entry->fPrev->fNext = entry->fNext;
268	else
269		fConnectionList = entry->fNext;
270
271	delete entry->fConnection;
272	delete entry;
273	return B_OK;
274}
275
276
277status_t
278CallbackServer::ConnectionThreadLauncher(void* object)
279{
280	ASSERT(object != NULL);
281
282	void** objects = reinterpret_cast<void**>(object);
283	CallbackServer* server = reinterpret_cast<CallbackServer*>(objects[0]);
284	ConnectionEntry* entry = reinterpret_cast<ConnectionEntry*>(objects[1]);
285	free(objects);
286
287	return server->ConnectionThread(entry);
288}
289
290
291status_t
292CallbackServer::ConnectionThread(ConnectionEntry* entry)
293{
294	ASSERT(entry != NULL);
295
296	Connection* connection = entry->fConnection;
297	CallbackReply* reply;
298
299	while (fThreadRunning) {
300		uint32 size;
301		void* buffer;
302		status_t result = connection->Receive(&buffer, &size);
303		if (result != B_OK) {
304			if (result != ECONNABORTED)
305				ReleaseConnection(entry);
306			return result;
307		}
308
309		CallbackRequest* request
310			= new(std::nothrow) CallbackRequest(buffer, size);
311		if (request == NULL) {
312			free(buffer);
313			continue;
314		} else if (request->Error() != B_OK) {
315			reply = CallbackReply::Create(request->XID(), request->RPCError());
316			if (reply != NULL) {
317				connection->Send(reply->Stream().Buffer(),
318					reply->Stream().Size());
319				delete reply;
320			}
321			delete request;
322			continue;
323		}
324
325		switch (request->Procedure()) {
326			case CallbackProcCompound:
327				GetCallback(request->ID())->EnqueueRequest(request, connection);
328				break;
329
330			case CallbackProcNull:
331				reply = CallbackReply::Create(request->XID());
332				if (reply != NULL) {
333					connection->Send(reply->Stream().Buffer(),
334						reply->Stream().Size());
335					delete reply;
336				}
337
338			default:
339				delete request;
340		}
341	}
342
343	return B_OK;
344}
345
346
347status_t
348CallbackServer::ListenerThreadLauncher(void* object)
349{
350	ASSERT(object != NULL);
351
352	CallbackServer* server = reinterpret_cast<CallbackServer*>(object);
353	return server->ListenerThread();
354}
355
356
357status_t
358CallbackServer::ListenerThread()
359{
360	while (fThreadRunning) {
361		Connection* connection;
362
363		status_t result = fListener->AcceptConnection(&connection);
364		if (result != B_OK) {
365			fThreadRunning = false;
366			return result;
367		}
368		result = NewConnection(connection);
369		if (result != B_OK)
370			delete connection;
371	}
372
373	return B_OK;
374}
375
376