1286705Smav/* 2286705Smav * CDDL HEADER START 3286705Smav * 4286705Smav * This file and its contents are supplied under the terms of the 5286705Smav * Common Development and Distribution License ("CDDL"), version 1.0. 6286705Smav * You may only use this file in accordance with the terms of version 7286705Smav * 1.0 of the CDDL. 8286705Smav * 9286705Smav * A full copy of the text of the CDDL should have accompanied this 10286705Smav * source. A copy of the CDDL is also available via the Internet at 11286705Smav * http://www.illumos.org/license/CDDL. 12286705Smav * 13286705Smav * CDDL HEADER END 14286705Smav */ 15286705Smav/* 16286705Smav * Copyright (c) 2014 by Delphix. All rights reserved. 17286705Smav */ 18286705Smav 19286705Smav#include <sys/bqueue.h> 20286705Smav#include <sys/zfs_context.h> 21286705Smav 22286705Smavstatic inline bqueue_node_t * 23286705Smavobj2node(bqueue_t *q, void *data) 24286705Smav{ 25286705Smav return ((bqueue_node_t *)((char *)data + q->bq_node_offset)); 26286705Smav} 27286705Smav 28286705Smav/* 29286705Smav * Initialize a blocking queue The maximum capacity of the queue is set to 30286705Smav * size. Types that want to be stored in a bqueue must contain a bqueue_node_t, 31286705Smav * and offset should give its offset from the start of the struct. Return 0 on 32286705Smav * success, or -1 on failure. 33286705Smav */ 34286705Smavint 35286705Smavbqueue_init(bqueue_t *q, uint64_t size, size_t node_offset) 36286705Smav{ 37286705Smav list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t), 38286705Smav node_offset + offsetof(bqueue_node_t, bqn_node)); 39286705Smav cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL); 40286705Smav cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL); 41286705Smav mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL); 42286705Smav q->bq_node_offset = node_offset; 43286705Smav q->bq_size = 0; 44286705Smav q->bq_maxsize = size; 45286705Smav return (0); 46286705Smav} 47286705Smav 48286705Smav/* 49286705Smav * Destroy a blocking queue. This function asserts that there are no 50286705Smav * elements in the queue, and no one is blocked on the condition 51286705Smav * variables. 52286705Smav */ 53286705Smavvoid 54286705Smavbqueue_destroy(bqueue_t *q) 55286705Smav{ 56286705Smav ASSERT0(q->bq_size); 57286705Smav cv_destroy(&q->bq_add_cv); 58286705Smav cv_destroy(&q->bq_pop_cv); 59286705Smav mutex_destroy(&q->bq_lock); 60286705Smav list_destroy(&q->bq_list); 61286705Smav} 62286705Smav 63286705Smav/* 64286705Smav * Add data to q, consuming size units of capacity. If there is insufficient 65286705Smav * capacity to consume size units, block until capacity exists. Asserts size is 66286705Smav * > 0. 67286705Smav */ 68286705Smavvoid 69286705Smavbqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size) 70286705Smav{ 71286705Smav ASSERT3U(item_size, >, 0); 72286705Smav ASSERT3U(item_size, <, q->bq_maxsize); 73286705Smav mutex_enter(&q->bq_lock); 74286705Smav obj2node(q, data)->bqn_size = item_size; 75286705Smav while (q->bq_size + item_size > q->bq_maxsize) { 76286705Smav cv_wait(&q->bq_add_cv, &q->bq_lock); 77286705Smav } 78286705Smav q->bq_size += item_size; 79286705Smav list_insert_tail(&q->bq_list, data); 80286705Smav cv_signal(&q->bq_pop_cv); 81286705Smav mutex_exit(&q->bq_lock); 82286705Smav} 83286705Smav/* 84286705Smav * Take the first element off of q. If there are no elements on the queue, wait 85286705Smav * until one is put there. Return the removed element. 86286705Smav */ 87286705Smavvoid * 88286705Smavbqueue_dequeue(bqueue_t *q) 89286705Smav{ 90286705Smav void *ret; 91286705Smav uint64_t item_size; 92286705Smav mutex_enter(&q->bq_lock); 93286705Smav while (q->bq_size == 0) { 94286705Smav cv_wait(&q->bq_pop_cv, &q->bq_lock); 95286705Smav } 96286705Smav ret = list_remove_head(&q->bq_list); 97286705Smav item_size = obj2node(q, ret)->bqn_size; 98286705Smav q->bq_size -= item_size; 99286705Smav mutex_exit(&q->bq_lock); 100286705Smav cv_signal(&q->bq_add_cv); 101286705Smav return (ret); 102286705Smav} 103286705Smav 104286705Smav/* 105286705Smav * Returns true if the space used is 0. 106286705Smav */ 107286705Smavboolean_t 108286705Smavbqueue_empty(bqueue_t *q) 109286705Smav{ 110286705Smav return (q->bq_size == 0); 111286705Smav} 112