/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
 */

#include <fmd_alloc.h>
#include <fmd_eventq.h>
#include <fmd_module.h>
#include <fmd_dispq.h>
#include <fmd_subr.h>

#include <fmd.h>

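/*
 * An fmd_eventq_t is a per-module queue of pending events.  Each queue is
 * created with a pointer to the owning module's event queue statistics and
 * the lock that protects them, along with a maximum depth; a limit of zero
 * turns the queue into a sink that silently discards insertions (see the
 * /dev/null comments below).  The queue is also assigned a subscriber group
 * id from the global dispatch queue so that events can be routed to it.
 *
 * A minimal creation sketch, with illustrative names (the stats area and
 * its lock normally live in the owning module's state):
 *
 *	eq = fmd_eventq_create(mp, &mod_stats, &mod_stats_lock, 256);
 */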
fmd_eventq_t *
fmd_eventq_create(fmd_module_t *mp, fmd_eventqstat_t *stats,
    pthread_mutex_t *stats_lock, uint_t limit)
{
	fmd_eventq_t *eq = fmd_zalloc(sizeof (fmd_eventq_t), FMD_SLEEP);

	(void) pthread_mutex_init(&eq->eq_lock, NULL);
	(void) pthread_cond_init(&eq->eq_cv, NULL);

	eq->eq_mod = mp;
	eq->eq_stats = stats;
	eq->eq_stats_lock = stats_lock;
	eq->eq_limit = limit;
	eq->eq_sgid = fmd_dispq_getgid(fmd.d_disp, eq);

	return (eq);
}

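/*
 * Tear down an event queue: release any events still on the list, return
 * our subscriber group id to the dispatch queue, and free the queue itself.
 * The queue lock is not taken here, so the caller is presumably responsible
 * for ensuring that no producers or consumers remain.
 */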
void
fmd_eventq_destroy(fmd_eventq_t *eq)
{
	fmd_eventqelem_t *eqe;

	while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) {
		fmd_list_delete(&eq->eq_list, eqe);
		fmd_event_rele(eqe->eqe_event);
		fmd_free(eqe, sizeof (fmd_eventqelem_t));
	}

	fmd_dispq_delgid(fmd.d_disp, eq->eq_sgid);
	fmd_free(eq, sizeof (fmd_eventq_t));
}

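/*
 * Discard a queue element that will not be delivered: count the drop in the
 * module's statistics, release the hold placed on the event at insert time,
 * and free the element.
 */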
static void
fmd_eventq_drop(fmd_eventq_t *eq, fmd_eventqelem_t *eqe)
{
	(void) pthread_mutex_lock(eq->eq_stats_lock);
	eq->eq_stats->eqs_dropped.fmds_value.ui64++;
	(void) pthread_mutex_unlock(eq->eq_stats_lock);

	fmd_event_rele(eqe->eqe_event);
	fmd_free(eqe, sizeof (fmd_eventqelem_t));
}

void
fmd_eventq_drop_topo(fmd_eventq_t *eq)
{
	fmd_eventqelem_t *eqe, *tmp;
	boolean_t got_fm_events = B_FALSE;

	/*
	 * Here we iterate through the per-module event queue in order to
	 * remove redundant FMD_EVT_TOPO events.  The trick is not to drop
	 * a given topo event if there are any FM protocol events in the
	 * queue after it, as those events need to be processed with the
	 * correct topology.  We scan from the tail (newest) to the head
	 * (oldest): for example, if the queue holds (oldest first) TOPO1,
	 * PROTO1, TOPO2, TOPO3, we drop TOPO3 and TOPO2 but keep TOPO1,
	 * since only PROTO1 depends on a queued topology.
	 */
	(void) pthread_mutex_lock(&eq->eq_lock);
	eqe = fmd_list_prev(&eq->eq_list);
	while (eqe) {
		if (FMD_EVENT_TYPE(eqe->eqe_event) == FMD_EVT_TOPO) {
			if (!got_fm_events) {
				/*
				 * No FM protocol events follow this topo
				 * event, so it is redundant: unlink and
				 * drop it.
				 */
				tmp = eqe;
				eqe = fmd_list_prev(eqe);
				fmd_list_delete(&eq->eq_list, tmp);
				eq->eq_size--;
				fmd_eventq_drop(eq, tmp);
			} else {
				/*
				 * Protocol events follow this topo event, so
				 * keep it and reset the flag for any older
				 * topo events we have yet to visit.
				 */
				got_fm_events = B_FALSE;
				eqe = fmd_list_prev(eqe);
			}
		} else if (FMD_EVENT_TYPE(eqe->eqe_event) == FMD_EVT_PROTOCOL) {
			got_fm_events = B_TRUE;
			eqe = fmd_list_prev(eqe);
		} else
			eqe = fmd_list_prev(eqe);
	}
	(void) pthread_mutex_unlock(&eq->eq_lock);
}

/*
 * Update statistics when an event is dispatched and placed on a module's
 * event queue.  This is essentially the same code as kstat_waitq_enter(9F).
 */
static void
fmd_eventqstat_dispatch(fmd_eventq_t *eq)
{
	fmd_eventqstat_t *eqs = eq->eq_stats;
	hrtime_t new, delta;
	uint32_t wcnt;

	(void) pthread_mutex_lock(eq->eq_stats_lock);

	new = gethrtime();
	delta = new - eqs->eqs_wlastupdate.fmds_value.ui64;
	eqs->eqs_wlastupdate.fmds_value.ui64 = new;
	wcnt = eqs->eqs_wcnt.fmds_value.ui32++;

	if (wcnt != 0) {
		/*
		 * If events were already waiting, integrate the queue length
		 * (wlentime) and busy time (wtime) over the elapsed interval.
		 */
		eqs->eqs_wlentime.fmds_value.ui64 += delta * wcnt;
		eqs->eqs_wtime.fmds_value.ui64 += delta;
	}

	eqs->eqs_dispatched.fmds_value.ui64++;
	(void) pthread_mutex_unlock(eq->eq_stats_lock);
}

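/*
 * Insert an event at the head of the queue, making it the next event the
 * consumer will receive from fmd_eventq_delete().
 */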
void
fmd_eventq_insert_at_head(fmd_eventq_t *eq, fmd_event_t *ep)
{
	uint_t evt = FMD_EVENT_TYPE(ep);
	fmd_eventqelem_t *eqe;
	int ok;

	/*
	 * If this event queue is acting as /dev/null, bounce the reference
	 * count to free an unreferenced event and just return immediately.
	 */
	if (eq->eq_limit == 0) {
		fmd_event_hold(ep);
		fmd_event_rele(ep);
		return;
	}

	eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP);
	fmd_event_hold(ep);
	eqe->eqe_event = ep;

	(void) pthread_mutex_lock(&eq->eq_lock);

	/*
	 * Only FM protocol events count against the queue depth limit:
	 * anything else (e.g. control events) is enqueued unconditionally.
	 */
	if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) {
		if (evt != FMD_EVT_CTL)
			fmd_eventqstat_dispatch(eq);

		fmd_list_prepend(&eq->eq_list, eqe);
		eq->eq_size++;
	}

	(void) pthread_cond_broadcast(&eq->eq_cv);
	(void) pthread_mutex_unlock(&eq->eq_lock);

	if (!ok)
		fmd_eventq_drop(eq, eqe);
}

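/*
 * Insert an event into the queue in event-time order (see the comment in
 * the body below).  For example, a timer expiry event stamped at time t is
 * placed ahead of any pending event stamped later than t, even if the
 * expiry arrived afterward.
 */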
void
fmd_eventq_insert_at_time(fmd_eventq_t *eq, fmd_event_t *ep)
{
	uint_t evt = FMD_EVENT_TYPE(ep);
	hrtime_t hrt = fmd_event_hrtime(ep);
	fmd_eventqelem_t *eqe, *oqe;
	int ok;

	/*
	 * If this event queue is acting as /dev/null, bounce the reference
	 * count to free an unreferenced event and just return immediately.
	 */
	if (eq->eq_limit == 0) {
		fmd_event_hold(ep);
		fmd_event_rele(ep);
		return;
	}

	eqe = fmd_alloc(sizeof (fmd_eventqelem_t), FMD_SLEEP);
	fmd_event_hold(ep);
	eqe->eqe_event = ep;

	(void) pthread_mutex_lock(&eq->eq_lock);

	/*
	 * fmd makes no guarantees that events will be delivered in time order
	 * because its transport can make no such guarantees.  Instead we make
	 * a looser guarantee that an enqueued event will be dequeued before
	 * any newer *pending* events according to event time.  This permits us
	 * to state, for example, that a timer expiry event will be delivered
	 * prior to any enqueued event whose time is after the timer expired.
	 * We use a simple insertion sort for this task, as queue lengths are
	 * typically short and events do *tend* to be received chronologically.
	 */
	for (oqe = fmd_list_prev(&eq->eq_list); oqe; oqe = fmd_list_prev(oqe)) {
		if (hrt >= fmd_event_hrtime(oqe->eqe_event))
			break; /* 'ep' is newer than the event in 'oqe' */
	}

	/*
	 * As above, only FM protocol events count against the queue depth
	 * limit; everything else is enqueued unconditionally.
	 */
	if ((ok = eq->eq_size < eq->eq_limit || evt != FMD_EVT_PROTOCOL) != 0) {
		if (evt != FMD_EVT_CTL)
			fmd_eventqstat_dispatch(eq);

		if (oqe == NULL)
			fmd_list_prepend(&eq->eq_list, eqe); /* ep is oldest */
		else
			fmd_list_insert_after(&eq->eq_list, oqe, eqe);
		eq->eq_size++;
	}

	(void) pthread_cond_broadcast(&eq->eq_cv);
	(void) pthread_mutex_unlock(&eq->eq_lock);

	if (!ok)
		fmd_eventq_drop(eq, eqe);
}

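/*
 * Dequeue the oldest pending event, blocking while the queue is empty or
 * suspended.  Returns NULL once the queue has been aborted.  The event is
 * returned still holding the reference taken at insert time; the caller is
 * expected to call fmd_eventq_done() and fmd_event_rele() when finished.
 */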
fmd_event_t *
fmd_eventq_delete(fmd_eventq_t *eq)
{
	fmd_eventqstat_t *eqs = eq->eq_stats;
	hrtime_t new, delta;
	uint32_t wcnt;

	fmd_eventqelem_t *eqe;
	fmd_event_t *ep;
top:
	(void) pthread_mutex_lock(&eq->eq_lock);

	while (!(eq->eq_flags & FMD_EVENTQ_ABORT) &&
	    (eq->eq_size == 0 || (eq->eq_flags & FMD_EVENTQ_SUSPEND)))
		(void) pthread_cond_wait(&eq->eq_cv, &eq->eq_lock);

	if (eq->eq_flags & FMD_EVENTQ_ABORT) {
		(void) pthread_mutex_unlock(&eq->eq_lock);
		return (NULL);
	}

	eqe = fmd_list_next(&eq->eq_list);
	fmd_list_delete(&eq->eq_list, eqe);
	eq->eq_size--;

	(void) pthread_mutex_unlock(&eq->eq_lock);

	ep = eqe->eqe_event;
	fmd_free(eqe, sizeof (fmd_eventqelem_t));

	/*
	 * If we dequeued a control event, release it and go back to sleep.
	 * fmd_event_rele() on the event will block as described in fmd_ctl.c.
	 * This effectively renders control events invisible to our callers
	 * as well as to statistics and observability tools (e.g. fmstat(1M)).
	 */
	if (FMD_EVENT_TYPE(ep) == FMD_EVT_CTL) {
		fmd_event_rele(ep);
		goto top;
	}

	/*
	 * Before returning, update our statistics.  This code is essentially
	 * kstat_waitq_to_runq(9F), except simplified because our queues are
	 * always consumed by a single thread (i.e. runq len == 1).
	 */
	(void) pthread_mutex_lock(eq->eq_stats_lock);

	new = gethrtime();
	delta = new - eqs->eqs_wlastupdate.fmds_value.ui64;

	eqs->eqs_wlastupdate.fmds_value.ui64 = new;
	eqs->eqs_dlastupdate.fmds_value.ui64 = new;

	ASSERT(eqs->eqs_wcnt.fmds_value.ui32 != 0);
	wcnt = eqs->eqs_wcnt.fmds_value.ui32--;

	eqs->eqs_wlentime.fmds_value.ui64 += delta * wcnt;
	eqs->eqs_wtime.fmds_value.ui64 += delta;

	if (FMD_EVENT_TYPE(ep) == FMD_EVT_PROTOCOL)
		eqs->eqs_prdequeued.fmds_value.ui64++;

	eqs->eqs_dequeued.fmds_value.ui64++;
	(void) pthread_mutex_unlock(eq->eq_stats_lock);

	return (ep);
}

/*
 * Update statistics when an event is done being processed by the eventq's
 * consumer thread.  This is essentially kstat_runq_exit(9F), simplified
 * because a single thread consumes the queue (i.e. runq len == 1).
 */
void
fmd_eventq_done(fmd_eventq_t *eq)
{
	fmd_eventqstat_t *eqs = eq->eq_stats;
	hrtime_t new, delta;

	(void) pthread_mutex_lock(eq->eq_stats_lock);

	new = gethrtime();
	delta = new - eqs->eqs_dlastupdate.fmds_value.ui64;

	eqs->eqs_dlastupdate.fmds_value.ui64 = new;
	eqs->eqs_dtime.fmds_value.ui64 += delta;

	(void) pthread_mutex_unlock(eq->eq_stats_lock);
}

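/*
 * A typical consumer loop, sketched with an illustrative process() standing
 * in for whatever work the caller performs on each event:
 *
 *	while ((ep = fmd_eventq_delete(eq)) != NULL) {
 *		process(ep);
 *		fmd_eventq_done(eq);
 *		fmd_event_rele(ep);
 *	}
 *
 * fmd_eventq_delete() returns NULL only after fmd_eventq_abort() has been
 * called, which is what terminates a loop like the one above.
 */
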
/*
 * Remove and discard any queued events that match the specified (type, data)
 * pair, releasing the hold taken on each at insert time.  Unlike
 * fmd_eventq_drop(), cancellation does not count against the module's
 * dropped-event statistics.
 */
void
fmd_eventq_cancel(fmd_eventq_t *eq, uint_t type, void *data)
{
	fmd_eventqelem_t *eqe, *nqe;

	(void) pthread_mutex_lock(&eq->eq_lock);

	for (eqe = fmd_list_next(&eq->eq_list); eqe != NULL; eqe = nqe) {
		nqe = fmd_list_next(eqe);

		if (fmd_event_match(eqe->eqe_event, type, data)) {
			fmd_list_delete(&eq->eq_list, eqe);
			eq->eq_size--;
			fmd_event_rele(eqe->eqe_event);
			fmd_free(eqe, sizeof (fmd_eventqelem_t));
		}
	}

	(void) pthread_mutex_unlock(&eq->eq_lock);
}

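/*
 * Suspend consumption of the queue: while FMD_EVENTQ_SUSPEND is set,
 * fmd_eventq_delete() blocks even if events are pending.  fmd_eventq_resume()
 * clears the flag and wakes any blocked consumers.
 */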
void
fmd_eventq_suspend(fmd_eventq_t *eq)
{
	(void) pthread_mutex_lock(&eq->eq_lock);
	eq->eq_flags |= FMD_EVENTQ_SUSPEND;
	(void) pthread_mutex_unlock(&eq->eq_lock);
}

void
fmd_eventq_resume(fmd_eventq_t *eq)
{
	(void) pthread_mutex_lock(&eq->eq_lock);
	eq->eq_flags &= ~FMD_EVENTQ_SUSPEND;
	(void) pthread_cond_broadcast(&eq->eq_cv);
	(void) pthread_mutex_unlock(&eq->eq_lock);
}

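/*
 * Abort the queue: discard everything still pending, then set the ABORT
 * flag so that current and future fmd_eventq_delete() callers return NULL.
 */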
void
fmd_eventq_abort(fmd_eventq_t *eq)
{
	fmd_eventqelem_t *eqe;

	(void) pthread_mutex_lock(&eq->eq_lock);

	while ((eqe = fmd_list_next(&eq->eq_list)) != NULL) {
		fmd_list_delete(&eq->eq_list, eqe);
		fmd_event_rele(eqe->eqe_event);
		fmd_free(eqe, sizeof (fmd_eventqelem_t));
	}

	eq->eq_flags |= FMD_EVENTQ_ABORT;
	(void) pthread_cond_broadcast(&eq->eq_cv);
	(void) pthread_mutex_unlock(&eq->eq_lock);
}