1/*
2 * Copyright 2012 Haiku, Inc. All rights reserved.
3 * Distributed under the terms of the MIT License.
4 *
5 * Authors:
6 *		Pawe�� Dziepak, pdziepak@quarnos.org
7 */
8
9
10#include "WorkQueue.h"
11
12#include <io_requests.h>
13
14
15#define	MAX_BUFFER_SIZE			(1024 * 1024)
16
17WorkQueue*		gWorkQueue		= NULL;
18
19
20WorkQueue::WorkQueue()
21	:
22	fQueueSemaphore(create_sem(0, NULL)),
23	fThreadCancel(create_sem(0, NULL))
24{
25	mutex_init(&fQueueLock, NULL);
26
27	fThread = spawn_kernel_thread(&WorkQueue::LaunchWorkingThread,
28		"NFSv4 Work Queue", B_NORMAL_PRIORITY, this);
29	if (fThread < B_OK) {
30		fInitError = fThread;
31		return;
32	}
33
34	status_t result = resume_thread(fThread);
35	if (result != B_OK) {
36		kill_thread(fThread);
37		fInitError = result;
38		return;
39	}
40
41	fInitError = B_OK;
42}
43
44
45WorkQueue::~WorkQueue()
46{
47	release_sem(fThreadCancel);
48
49	status_t result;
50	wait_for_thread(fThread, &result);
51
52	mutex_destroy(&fQueueLock);
53	delete_sem(fThreadCancel);
54	delete_sem(fQueueSemaphore);
55}
56
57
58status_t
59WorkQueue::EnqueueJob(JobType type, void* args)
60{
61	WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry;
62	if (entry == NULL)
63		return B_NO_MEMORY;
64
65	entry->fType = type;
66	entry->fArguments = args;
67	if (type == IORequest)
68		reinterpret_cast<IORequestArgs*>(args)->fInode->BeginAIOOp();
69
70	MutexLocker locker(fQueueLock);
71	fQueue.InsertAfter(fQueue.Tail(), entry);
72	locker.Unlock();
73
74	release_sem(fQueueSemaphore);
75	return B_OK;
76}
77
78
79status_t
80WorkQueue::LaunchWorkingThread(void* object)
81{
82	ASSERT(object != NULL);
83
84	WorkQueue* queue = reinterpret_cast<WorkQueue*>(object);
85	return queue->WorkingThread();
86}
87
88
89status_t
90WorkQueue::WorkingThread()
91{
92	while (true) {
93		object_wait_info object[2];
94		object[0].object = fThreadCancel;
95		object[0].type = B_OBJECT_TYPE_SEMAPHORE;
96		object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
97
98		object[1].object = fQueueSemaphore;
99		object[1].type = B_OBJECT_TYPE_SEMAPHORE;
100		object[1].events = B_EVENT_ACQUIRE_SEMAPHORE;
101
102		status_t result = wait_for_objects(object, 2);
103
104		if (result < B_OK
105			|| (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
106			return result;
107		} else if ((object[1].events & B_EVENT_ACQUIRE_SEMAPHORE) == 0)
108			continue;
109
110		acquire_sem(fQueueSemaphore);
111
112		DequeueJob();
113	}
114
115	return B_OK;
116}
117
118
119void
120WorkQueue::DequeueJob()
121{
122	MutexLocker locker(fQueueLock);
123	WorkQueueEntry* entry = fQueue.RemoveHead();
124	locker.Unlock();
125	ASSERT(entry != NULL);
126
127	void* args = entry->fArguments;
128	switch (entry->fType) {
129		case DelegationRecall:
130			JobRecall(reinterpret_cast<DelegationRecallArgs*>(args));
131			break;
132		case IORequest:
133			JobIO(reinterpret_cast<IORequestArgs*>(args));
134			break;
135	}
136
137	delete entry;
138}
139
140
141void
142WorkQueue::JobRecall(DelegationRecallArgs* args)
143{
144	ASSERT(args != NULL);
145	args->fDelegation->GetInode()->RecallDelegation(args->fTruncate);
146}
147
148
149void
150WorkQueue::JobIO(IORequestArgs* args)
151{
152	ASSERT(args != NULL);
153
154	uint64 offset = io_request_offset(args->fRequest);
155	uint64 length = io_request_length(args->fRequest);
156
157	size_t bufferLength = min_c(MAX_BUFFER_SIZE, length);
158	char* buffer = reinterpret_cast<char*>(malloc(bufferLength));
159	if (buffer == NULL) {
160		notify_io_request(args->fRequest, B_NO_MEMORY);
161		args->fInode->EndAIOOp();
162		return;
163	}
164
165	status_t result;
166	if (io_request_is_write(args->fRequest)) {
167		if (offset + length > args->fInode->MaxFileSize())
168				length = args->fInode->MaxFileSize() - offset;
169
170		uint64 position = 0;
171		do {
172			size_t size = 0;
173			size_t thisBufferLength = min_c(bufferLength, length - position);
174
175			result = read_from_io_request(args->fRequest, buffer,
176				thisBufferLength);
177
178			while (size < thisBufferLength && result == B_OK) {
179				size_t bytesWritten = thisBufferLength - size;
180				result = args->fInode->WriteDirect(NULL,
181					offset + position + size, buffer + size, &bytesWritten);
182				size += bytesWritten;
183			}
184
185			position += thisBufferLength;
186		} while (position < length && result == B_OK);
187	} else {
188		bool eof = false;
189		uint64 position = 0;
190		do {
191			size_t size = 0;
192			size_t thisBufferLength = min_c(bufferLength, length - position);
193
194			do {
195				size_t bytesRead = thisBufferLength - size;
196				result = args->fInode->ReadDirect(NULL,
197					offset + position + size, buffer + size, &bytesRead, &eof);
198				if (result != B_OK)
199					break;
200
201				result = write_to_io_request(args->fRequest, buffer + size,
202					bytesRead);
203				if (result != B_OK)
204					break;
205
206				size += bytesRead;
207			} while (size < length && result == B_OK && !eof);
208
209			position += thisBufferLength;
210		} while (position < length && result == B_OK && !eof);
211	}
212
213	free(buffer);
214
215	notify_io_request(args->fRequest, result);
216	args->fInode->EndAIOOp();
217}
218
219