1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2006,2008 Oracle. All rights reserved. 5 * 6 * $Id: repmgr_queue.c,v 1.12 2008/01/08 20:58:48 bostic Exp $ 7 */ 8 9#include "db_config.h" 10 11#define __INCLUDE_NETWORKING 1 12#include "db_int.h" 13 14typedef STAILQ_HEAD(__repmgr_q_header, __repmgr_message) QUEUE_HEADER; 15struct __repmgr_queue { 16 int size; 17 QUEUE_HEADER header; 18}; 19 20/* 21 * PUBLIC: int __repmgr_queue_create __P((ENV *, DB_REP *)); 22 */ 23int 24__repmgr_queue_create(env, db_rep) 25 ENV *env; 26 DB_REP *db_rep; 27{ 28 REPMGR_QUEUE *q; 29 int ret; 30 31 if ((ret = __os_calloc(env, 1, sizeof(REPMGR_QUEUE), &q)) != 0) 32 return (ret); 33 q->size = 0; 34 STAILQ_INIT(&q->header); 35 db_rep->input_queue = q; 36 return (0); 37} 38 39/* 40 * Frees not only the queue header, but also any messages that may be on it, 41 * along with their data buffers. 42 * 43 * PUBLIC: void __repmgr_queue_destroy __P((ENV *)); 44 */ 45void 46__repmgr_queue_destroy(env) 47 ENV *env; 48{ 49 REPMGR_MESSAGE *m; 50 REPMGR_QUEUE *q; 51 52 if ((q = env->rep_handle->input_queue) == NULL) 53 return; 54 55 while (!STAILQ_EMPTY(&q->header)) { 56 m = STAILQ_FIRST(&q->header); 57 STAILQ_REMOVE_HEAD(&q->header, entries); 58 __os_free(env, m); 59 } 60 __os_free(env, q); 61} 62 63/* 64 * PUBLIC: int __repmgr_queue_get __P((ENV *, REPMGR_MESSAGE **)); 65 * 66 * Get the first input message from the queue and return it to the caller. The 67 * caller hereby takes responsibility for the entire message buffer, and should 68 * free it when done. 69 * 70 * Note that caller is NOT expected to hold the mutex. This is asymmetric with 71 * put(), because put() is expected to be called in a loop after select, where 72 * it's already necessary to be holding the mutex. 73 */ 74int 75__repmgr_queue_get(env, msgp) 76 ENV *env; 77 REPMGR_MESSAGE **msgp; 78{ 79 DB_REP *db_rep; 80 REPMGR_MESSAGE *m; 81 REPMGR_QUEUE *q; 82 int ret; 83 84 ret = 0; 85 db_rep = env->rep_handle; 86 q = db_rep->input_queue; 87 88 LOCK_MUTEX(db_rep->mutex); 89 while (STAILQ_EMPTY(&q->header) && !db_rep->finished) { 90#ifdef DB_WIN32 91 if (!ResetEvent(db_rep->queue_nonempty)) { 92 ret = GetLastError(); 93 goto err; 94 } 95 if (SignalObjectAndWait(db_rep->mutex, db_rep->queue_nonempty, 96 INFINITE, FALSE) != WAIT_OBJECT_0) { 97 ret = GetLastError(); 98 goto err; 99 } 100 LOCK_MUTEX(db_rep->mutex); 101#else 102 if ((ret = pthread_cond_wait(&db_rep->queue_nonempty, 103 &db_rep->mutex)) != 0) 104 goto err; 105#endif 106 } 107 if (db_rep->finished) 108 ret = DB_REP_UNAVAIL; 109 else { 110 m = STAILQ_FIRST(&q->header); 111 STAILQ_REMOVE_HEAD(&q->header, entries); 112 q->size--; 113 *msgp = m; 114 } 115 116err: 117 UNLOCK_MUTEX(db_rep->mutex); 118 return (ret); 119} 120 121/* 122 * PUBLIC: int __repmgr_queue_put __P((ENV *, REPMGR_MESSAGE *)); 123 * 124 * !!! 125 * Caller must hold repmgr->mutex. 126 */ 127int 128__repmgr_queue_put(env, msg) 129 ENV *env; 130 REPMGR_MESSAGE *msg; 131{ 132 DB_REP *db_rep; 133 REPMGR_QUEUE *q; 134 135 db_rep = env->rep_handle; 136 q = db_rep->input_queue; 137 138 STAILQ_INSERT_TAIL(&q->header, msg, entries); 139 q->size++; 140 141 return (__repmgr_signal(&db_rep->queue_nonempty)); 142} 143 144/* 145 * PUBLIC: int __repmgr_queue_size __P((ENV *)); 146 * 147 * !!! 148 * Caller must hold repmgr->mutex. 149 */ 150int 151__repmgr_queue_size(env) 152 ENV *env; 153{ 154 return (env->rep_handle->input_queue->size); 155} 156