1286705Smav/*
2286705Smav * CDDL HEADER START
3286705Smav *
4286705Smav * This file and its contents are supplied under the terms of the
5286705Smav * Common Development and Distribution License ("CDDL"), version 1.0.
6286705Smav * You may only use this file in accordance with the terms of version
7286705Smav * 1.0 of the CDDL.
8286705Smav *
9286705Smav * A full copy of the text of the CDDL should have accompanied this
10286705Smav * source.  A copy of the CDDL is also available via the Internet at
11286705Smav * http://www.illumos.org/license/CDDL.
12286705Smav *
13286705Smav * CDDL HEADER END
14286705Smav */
15286705Smav/*
16286705Smav * Copyright (c) 2014 by Delphix. All rights reserved.
17286705Smav */
18286705Smav
19286705Smav#include	<sys/bqueue.h>
20286705Smav#include	<sys/zfs_context.h>
21286705Smav
22286705Smavstatic inline bqueue_node_t *
23286705Smavobj2node(bqueue_t *q, void *data)
24286705Smav{
25286705Smav	return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
26286705Smav}
27286705Smav
28286705Smav/*
29286705Smav * Initialize a blocking queue  The maximum capacity of the queue is set to
30286705Smav * size.  Types that want to be stored in a bqueue must contain a bqueue_node_t,
31286705Smav * and offset should give its offset from the start of the struct.  Return 0 on
32286705Smav * success, or -1 on failure.
33286705Smav */
34286705Smavint
35286705Smavbqueue_init(bqueue_t *q, uint64_t size, size_t node_offset)
36286705Smav{
37286705Smav	list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
38286705Smav	    node_offset + offsetof(bqueue_node_t, bqn_node));
39286705Smav	cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
40286705Smav	cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
41286705Smav	mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
42286705Smav	q->bq_node_offset = node_offset;
43286705Smav	q->bq_size = 0;
44286705Smav	q->bq_maxsize = size;
45286705Smav	return (0);
46286705Smav}
47286705Smav
48286705Smav/*
49286705Smav * Destroy a blocking queue.  This function asserts that there are no
50286705Smav * elements in the queue, and no one is blocked on the condition
51286705Smav * variables.
52286705Smav */
53286705Smavvoid
54286705Smavbqueue_destroy(bqueue_t *q)
55286705Smav{
56286705Smav	ASSERT0(q->bq_size);
57286705Smav	cv_destroy(&q->bq_add_cv);
58286705Smav	cv_destroy(&q->bq_pop_cv);
59286705Smav	mutex_destroy(&q->bq_lock);
60286705Smav	list_destroy(&q->bq_list);
61286705Smav}
62286705Smav
63286705Smav/*
64286705Smav * Add data to q, consuming size units of capacity.  If there is insufficient
65286705Smav * capacity to consume size units, block until capacity exists.  Asserts size is
66286705Smav * > 0.
67286705Smav */
68286705Smavvoid
69286705Smavbqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size)
70286705Smav{
71286705Smav	ASSERT3U(item_size, >, 0);
72286705Smav	ASSERT3U(item_size, <, q->bq_maxsize);
73286705Smav	mutex_enter(&q->bq_lock);
74286705Smav	obj2node(q, data)->bqn_size = item_size;
75286705Smav	while (q->bq_size + item_size > q->bq_maxsize) {
76286705Smav		cv_wait(&q->bq_add_cv, &q->bq_lock);
77286705Smav	}
78286705Smav	q->bq_size += item_size;
79286705Smav	list_insert_tail(&q->bq_list, data);
80286705Smav	cv_signal(&q->bq_pop_cv);
81286705Smav	mutex_exit(&q->bq_lock);
82286705Smav}
83286705Smav/*
84286705Smav * Take the first element off of q.  If there are no elements on the queue, wait
85286705Smav * until one is put there.  Return the removed element.
86286705Smav */
87286705Smavvoid *
88286705Smavbqueue_dequeue(bqueue_t *q)
89286705Smav{
90286705Smav	void *ret;
91286705Smav	uint64_t item_size;
92286705Smav	mutex_enter(&q->bq_lock);
93286705Smav	while (q->bq_size == 0) {
94286705Smav		cv_wait(&q->bq_pop_cv, &q->bq_lock);
95286705Smav	}
96286705Smav	ret = list_remove_head(&q->bq_list);
97286705Smav	item_size = obj2node(q, ret)->bqn_size;
98286705Smav	q->bq_size -= item_size;
99286705Smav	mutex_exit(&q->bq_lock);
100286705Smav	cv_signal(&q->bq_add_cv);
101286705Smav	return (ret);
102286705Smav}
103286705Smav
104286705Smav/*
105286705Smav * Returns true if the space used is 0.
106286705Smav */
107286705Smavboolean_t
108286705Smavbqueue_empty(bqueue_t *q)
109286705Smav{
110286705Smav	return (q->bq_size == 0);
111286705Smav}
112