/* * Copyright (c) 2004-2007 Marcus Overhagen * * Permission is hereby granted, free of charge, to any person * obtaining a copy of this software and associated documentation * files (the "Software"), to deal in the Software without restriction, * including without limitation the rights to use, copy, modify, * merge, publish, distribute, sublicense, and/or sell copies of * the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR * OTHER DEALINGS IN THE SOFTWARE. */ #include #include #include "Packet.h" #include "PacketQueue.h" #define TRACE_PACKET_QUEUE #ifdef TRACE_PACKET_QUEUE #define TRACE printf #else #define TRACE(a...) #endif PacketQueue::PacketQueue(int max_packets) : fQueue(new Packet* [max_packets]) , fSem(create_sem(0, "packet queue sem")) , fTerminate(false) , fWriteIndex(0) , fReadIndex(0) , fMaxPackets(max_packets) , fPacketCount(0) { } PacketQueue::~PacketQueue() { Empty(); delete_sem(fSem); delete [] fQueue; } void PacketQueue::Empty() { while (fPacketCount--) { delete fQueue[fReadIndex]; fReadIndex = (fReadIndex + 1) % fMaxPackets; } } status_t PacketQueue::Insert(Packet *packet) { if (fTerminate) { return B_NOT_ALLOWED; } if (atomic_add(&fPacketCount, 1) == fMaxPackets) { atomic_add(&fPacketCount, -1); return B_WOULD_BLOCK; } fQueue[fWriteIndex] = packet; fWriteIndex = (fWriteIndex + 1) % fMaxPackets; release_sem(fSem); return B_OK; } status_t PacketQueue::Remove(Packet **packet) { if (fTerminate) { return B_NOT_ALLOWED; } if (acquire_sem(fSem) != B_OK) return B_ERROR; if (fTerminate) { return B_NOT_ALLOWED; } *packet = fQueue[fReadIndex]; atomic_add(&fPacketCount, -1); fReadIndex = (fReadIndex + 1) % fMaxPackets; return B_OK; } void PacketQueue::Flush(bigtime_t timeout) { if (fTerminate) { return; } timeout += system_time(); while (acquire_sem_etc(fSem, 1, B_ABSOLUTE_TIMEOUT, timeout) == B_OK) { if (fTerminate) { return; } Packet *packet = fQueue[fReadIndex]; atomic_add(&fPacketCount, -1); fReadIndex = (fReadIndex + 1) % fMaxPackets; delete packet; } } // peeks into queue and delivers a copy status_t PacketQueue::Peek(Packet **packet) { if (fTerminate) { return B_NOT_ALLOWED; } if (acquire_sem(fSem) != B_OK) return B_ERROR; if (fTerminate) { return B_NOT_ALLOWED; } *packet = new Packet(*fQueue[fReadIndex]); release_sem(fSem); return B_OK; } void PacketQueue::Terminate() { fTerminate = true; release_sem(fSem); } void PacketQueue::Restart() { Empty(); delete_sem(fSem); fSem = create_sem(0, "packet queue sem"); fTerminate = false; fWriteIndex = 0; fReadIndex = 0; fPacketCount = 0; }