1/*
2 * Copyright (c) 2004-2016 Maxim Sobolev <sobomax@FreeBSD.org>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 */
26
27#include <sys/cdefs.h>
28#include <err.h>
29#include <pthread.h>
30#include <stdint.h>
31#include <stdlib.h>
32
33#if defined(MKUZ_DEBUG)
34# include <assert.h>
35#endif
36
37#include "mkuzip.h"
38#include "mkuz_fqueue.h"
39#include "mkuz_conveyor.h"
40#include "mkuz_blk.h"
41#include "mkuz_blk_chain.h"
42
43struct mkuz_fifo_queue *
44mkuz_fqueue_ctor(int wakeup_len)
45{
46    struct mkuz_fifo_queue *fqp;
47
48    fqp = mkuz_safe_zmalloc(sizeof(struct mkuz_fifo_queue));
49    fqp->wakeup_len = wakeup_len;
50    if (pthread_mutex_init(&fqp->mtx, NULL) != 0) {
51        errx(1, "pthread_mutex_init() failed");
52    }
53    if (pthread_cond_init(&fqp->cvar, NULL) != 0) {
54        errx(1, "pthread_cond_init() failed");
55    }
56    return (fqp);
57}
58
59void
60mkuz_fqueue_enq(struct mkuz_fifo_queue *fqp, struct mkuz_blk *bp)
61{
62    struct mkuz_bchain_link *ip;
63
64    ip = mkuz_safe_zmalloc(sizeof(struct mkuz_bchain_link));
65    ip->this = bp;
66
67    pthread_mutex_lock(&fqp->mtx);
68    if (fqp->first != NULL) {
69        fqp->first->prev = ip;
70    } else {
71        fqp->last = ip;
72    }
73    fqp->first = ip;
74    fqp->length += 1;
75    if (fqp->length >= fqp->wakeup_len) {
76        pthread_cond_signal(&fqp->cvar);
77    }
78    pthread_mutex_unlock(&fqp->mtx);
79}
80
#if defined(NOTYET)
/*
 * Splice a ready-made chain of clen links onto the fqp->first end of the
 * queue in one locked operation.
 *
 * cip_l is the link that gets attached to the current head (or becomes the
 * tail if the queue is empty); cip_f becomes the new head.  The links
 * between cip_l and cip_f are not touched, so the caller is expected to
 * have them connected through prev already (assumption -- nothing here
 * verifies it).
 *
 * Returns the queue length after the chain has been added.
 */
int
mkuz_fqueue_enq_all(struct mkuz_fifo_queue *fqp, struct mkuz_bchain_link *cip_f,
  struct mkuz_bchain_link *cip_l, int clen)
{
    int newlen;

    pthread_mutex_lock(&fqp->mtx);
    if (fqp->first == NULL) {
        fqp->last = cip_l;
    } else {
        fqp->first->prev = cip_l;
    }
    fqp->first = cip_f;
    fqp->length += clen;
    if (fqp->length >= fqp->wakeup_len) {
        pthread_cond_signal(&fqp->cvar);
    }
    newlen = fqp->length;
    pthread_mutex_unlock(&fqp->mtx);

    return (newlen);
}
#endif
104
105static int
106mkuz_fqueue_check(struct mkuz_fifo_queue *fqp, cmp_cb_t cmp_cb, void *cap)
107{
108    struct mkuz_bchain_link *ip;
109
110    for (ip = fqp->last; ip != NULL; ip = ip->prev) {
111        if (cmp_cb(ip->this, cap)) {
112            return (1);
113        }
114    }
115    return (0);
116}
117
118struct mkuz_blk *
119mkuz_fqueue_deq_when(struct mkuz_fifo_queue *fqp, cmp_cb_t cmp_cb, void *cap)
120{
121    struct mkuz_bchain_link *ip, *newlast, *newfirst, *mip;
122    struct mkuz_blk *bp;
123
124    pthread_mutex_lock(&fqp->mtx);
125    while (fqp->last == NULL || !mkuz_fqueue_check(fqp, cmp_cb, cap)) {
126        pthread_cond_wait(&fqp->cvar, &fqp->mtx);
127    }
128    if (cmp_cb(fqp->last->this, cap)) {
129        mip = fqp->last;
130        fqp->last = mip->prev;
131        if (fqp->last == NULL) {
132#if defined(MKUZ_DEBUG)
133            assert(fqp->length == 1);
134#endif
135            fqp->first = NULL;
136        }
137    } else {
138#if defined(MKUZ_DEBUG)
139        assert(fqp->length > 1);
140#endif
141        newfirst = newlast = fqp->last;
142        mip = NULL;
143        for (ip = fqp->last->prev; ip != NULL; ip = ip->prev) {
144            if (cmp_cb(ip->this, cap)) {
145                mip = ip;
146                continue;
147            }
148            newfirst->prev = ip;
149            newfirst = ip;
150        }
151        newfirst->prev = NULL;
152        fqp->first = newfirst;
153        fqp->last = newlast;
154    }
155    fqp->length -= 1;
156    pthread_mutex_unlock(&fqp->mtx);
157    bp = mip->this;
158    free(mip);
159
160    return bp;
161}
162
163struct mkuz_blk *
164mkuz_fqueue_deq(struct mkuz_fifo_queue *fqp)
165{
166    struct mkuz_bchain_link *ip;
167    struct mkuz_blk *bp;
168
169    pthread_mutex_lock(&fqp->mtx);
170    while (fqp->last == NULL) {
171        pthread_cond_wait(&fqp->cvar, &fqp->mtx);
172    }
173#if defined(MKUZ_DEBUG)
174    assert(fqp->length > 0);
175#endif
176    ip = fqp->last;
177    fqp->last = ip->prev;
178    if (fqp->last == NULL) {
179#if defined(MKUZ_DEBUG)
180        assert(fqp->length == 1);
181#endif
182        fqp->first = NULL;
183    }
184    fqp->length -= 1;
185    pthread_mutex_unlock(&fqp->mtx);
186    bp = ip->this;
187    free(ip);
188
189    return bp;
190}
191
#if defined(NOTYET)
/*
 * Detach the entire contents of the queue in one locked operation,
 * blocking on the condition variable while the queue is empty.
 *
 * Returns the link that was at fqp->last; the rest of the chain remains
 * reachable from it through prev.  The number of links handed over is
 * stored in *rclen.  The queue is left empty with a length of zero.
 */
struct mkuz_bchain_link *
mkuz_fqueue_deq_all(struct mkuz_fifo_queue *fqp, int *rclen)
{
    struct mkuz_bchain_link *chain;

    pthread_mutex_lock(&fqp->mtx);
    for (;;) {
        if (fqp->last != NULL) {
            break;
        }
        pthread_cond_wait(&fqp->cvar, &fqp->mtx);
    }
#if defined(MKUZ_DEBUG)
    assert(fqp->length > 0);
#endif
    chain = fqp->last;
    *rclen = fqp->length;

    /* Reset the queue to its empty state. */
    fqp->last = NULL;
    fqp->first = NULL;
    fqp->length = 0;
    pthread_mutex_unlock(&fqp->mtx);

    return (chain);
}
#endif
213