/*
 * Copyright 2012 Haiku, Inc. All rights reserved.
 * Distributed under the terms of the MIT License.
 *
 * Authors:
 *		Paweł Dziepak, pdziepak@quarnos.org
 */


#include "WorkQueue.h"

#include <io_requests.h>


// Upper bound, in bytes, of the bounce buffer JobIO() allocates per request.
#define MAX_BUFFER_SIZE (1024 * 1024)


WorkQueue* gWorkQueue = NULL;


/*!	Creates the two semaphores and the queue lock, then spawns and resumes
	the worker thread.

	The outcome is stored in fInitError: B_OK on success, otherwise the error
	reported by create_sem(), spawn_kernel_thread() or resume_thread().
*/
WorkQueue::WorkQueue()
	:
	fQueueSemaphore(create_sem(0, NULL)),
	fThreadCancel(create_sem(0, NULL))
{
	mutex_init(&fQueueLock, NULL);

	// Without both semaphores the worker can neither be woken up for a job
	// nor be asked to quit, so do not start it at all.
	if (fQueueSemaphore < B_OK || fThreadCancel < B_OK) {
		fThread = -1;
		fInitError = fQueueSemaphore < B_OK ? fQueueSemaphore : fThreadCancel;
		return;
	}

	fThread = spawn_kernel_thread(&WorkQueue::LaunchWorkingThread,
		"NFSv4 Work Queue", B_NORMAL_PRIORITY, this);
	if (fThread < B_OK) {
		fInitError = fThread;
		return;
	}

	status_t result = resume_thread(fThread);
	if (result != B_OK) {
		kill_thread(fThread);
		fInitError = result;
		return;
	}

	fInitError = B_OK;
}


/*!	Signals the worker thread to quit via fThreadCancel, waits for it, and
	then releases the lock and both semaphores.
*/
WorkQueue::~WorkQueue()
{
	release_sem(fThreadCancel);

	status_t result;
	wait_for_thread(fThread, &result);

	mutex_destroy(&fQueueLock);
	delete_sem(fThreadCancel);
	delete_sem(fQueueSemaphore);
}


/*!	Appends a job to the tail of the queue and wakes the worker thread.

	\param type Selects the handler DequeueJob() dispatches to.
	\param args Handler argument; must match \a type (IORequestArgs* for
		IORequest, DelegationRecallArgs* for DelegationRecall).
	\return B_OK, or B_NO_MEMORY if the queue entry could not be allocated.
*/
status_t
WorkQueue::EnqueueJob(JobType type, void* args)
{
	WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry;
	if (entry == NULL)
		return B_NO_MEMORY;

	entry->fType = type;
	entry->fArguments = args;

	// Paired with the EndAIOOp() call at the end of JobIO().
	if (type == IORequest)
		reinterpret_cast<IORequestArgs*>(args)->fInode->BeginAIOOp();

	MutexLocker locker(fQueueLock);
	fQueue.InsertAfter(fQueue.Tail(), entry);
	locker.Unlock();

	// One semaphore unit per queued entry.
	release_sem(fQueueSemaphore);
	return B_OK;
}


/*!	Thread entry point: forwards to WorkingThread() of the WorkQueue passed
	in \a object.
*/
status_t
WorkQueue::LaunchWorkingThread(void* object)
{
	ASSERT(object != NULL);

	WorkQueue* queue = reinterpret_cast<WorkQueue*>(object);
	return queue->WorkingThread();
}


/*!	Worker loop: waits on both semaphores and processes one job each time
	fQueueSemaphore becomes acquirable.

	\return The wait_for_objects() result once that call fails or
		fThreadCancel becomes acquirable.
*/
status_t
WorkQueue::WorkingThread()
{
	while (true) {
		// The events fields are read back as the result of the wait, so
		// the descriptors are set up afresh for every iteration.
		object_wait_info object[2];
		object[0].object = fThreadCancel;
		object[0].type = B_OBJECT_TYPE_SEMAPHORE;
		object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;

		object[1].object = fQueueSemaphore;
		object[1].type = B_OBJECT_TYPE_SEMAPHORE;
		object[1].events = B_EVENT_ACQUIRE_SEMAPHORE;

		status_t result = wait_for_objects(object, 2);

		if (result < B_OK
			|| (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
			return result;
		} else if ((object[1].events & B_EVENT_ACQUIRE_SEMAPHORE) == 0)
			continue;

		// Consume the unit EnqueueJob() released for this entry.
		acquire_sem(fQueueSemaphore);

		DequeueJob();
	}

	return B_OK;
}


/*!	Removes the entry at the head of the queue, runs the handler matching
	its type with the lock released, and deletes the entry.
*/
void
WorkQueue::DequeueJob()
{
	MutexLocker locker(fQueueLock);
	WorkQueueEntry* entry = fQueue.RemoveHead();
	locker.Unlock();
	ASSERT(entry != NULL);

	void* args = entry->fArguments;
	switch (entry->fType) {
		case DelegationRecall:
			JobRecall(reinterpret_cast<DelegationRecallArgs*>(args));
			break;
		case IORequest:
			JobIO(reinterpret_cast<IORequestArgs*>(args));
			break;
	}

	delete entry;
}


/*!	Handles a DelegationRecall job by calling RecallDelegation() on the
	inode of the delegation named in \a args.
*/
void
WorkQueue::JobRecall(DelegationRecallArgs* args)
{
	ASSERT(args != NULL);
	args->fDelegation->GetInode()->RecallDelegation(args->fTruncate);
}


/*!	Handles an IORequest job: transfers the data of args->fRequest between
	the I/O request and the inode through a temporary buffer of at most
	MAX_BUFFER_SIZE bytes, one chunk at a time.

	When done (or on failure) the request is completed with
	notify_io_request() and the inode's EndAIOOp() is called. \a args itself
	is not freed here.
*/
void
WorkQueue::JobIO(IORequestArgs* args)
{
	ASSERT(args != NULL);

	uint64 offset = io_request_offset(args->fRequest);
	uint64 length = io_request_length(args->fRequest);

	size_t bufferLength = min_c(MAX_BUFFER_SIZE, length);
	char* buffer = reinterpret_cast<char*>(malloc(bufferLength));
	if (buffer == NULL) {
		notify_io_request(args->fRequest, B_NO_MEMORY);
		args->fInode->EndAIOOp();
		return;
	}

	status_t result;
	if (io_request_is_write(args->fRequest)) {
		// Do not write past the largest file size the inode reports.
		// NOTE(review): if offset is already beyond MaxFileSize() this
		// subtraction wraps around -- confirm callers rule that out.
		if (offset + length > args->fInode->MaxFileSize())
			length = args->fInode->MaxFileSize() - offset;

		uint64 position = 0;
		do {
			size_t size = 0;
			size_t thisBufferLength = min_c(bufferLength, length - position);

			// Fetch the next chunk from the request ...
			result = read_from_io_request(args->fRequest, buffer,
				thisBufferLength);

			// ... and keep writing until all of it has been accepted.
			while (size < thisBufferLength && result == B_OK) {
				size_t bytesWritten = thisBufferLength - size;
				result = args->fInode->WriteDirect(NULL,
					offset + position + size, buffer + size, &bytesWritten);
				size += bytesWritten;
			}

			position += thisBufferLength;
		} while (position < length && result == B_OK);
	} else {
		bool eof = false;
		uint64 position = 0;
		do {
			size_t size = 0;
			size_t thisBufferLength = min_c(bufferLength, length - position);

			do {
				size_t bytesRead = thisBufferLength - size;
				result = args->fInode->ReadDirect(NULL,
					offset + position + size, buffer + size, &bytesRead, &eof);
				if (result != B_OK)
					break;

				result = write_to_io_request(args->fRequest, buffer + size,
					bytesRead);
				if (result != B_OK)
					break;

				size += bytesRead;
				// Bounded by the current chunk, not the whole request:
				// otherwise a request larger than the buffer would call
				// ReadDirect() again with a zero-byte size once the chunk
				// is full.
			} while (size < thisBufferLength && result == B_OK && !eof);

			position += thisBufferLength;
		} while (position < length && result == B_OK && !eof);
	}

	free(buffer);

	notify_io_request(args->fRequest, result);
	args->fInode->EndAIOOp();
}