1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22/*
23 * Portions copyright (c) 2013, Joyent, Inc. All rights reserved.
24 */
25
26/*
27 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
28 * Use is subject to license terms.
29 */
30
31#pragma ident	"@(#)dt_aggregate.c	1.14	08/01/29 SMI"
32
33#include <stdlib.h>
34#include <strings.h>
35#include <errno.h>
36#include <unistd.h>
37#include <dt_impl.h>
38#include <assert.h>
39#include <alloca.h>
40#include <limits.h>
41
#define	DTRACE_AHASHSIZE	32779		/* big 'ol prime */

/*
 * Because qsort(3C) does not allow an argument to be passed to a comparison
 * function, the variables that affect comparison must regrettably be global;
 * they are protected by a global static lock, dt_qsort_lock.
 */
static pthread_mutex_t dt_qsort_lock = PTHREAD_MUTEX_INITIALIZER;

static int dt_revsort;	/* non-zero: invert sort order (see macros below) */
static int dt_keysort;	/* NOTE(review): sort-by-key flag; consumers outside this view */
static int dt_keypos;	/* key record position at which key comparison starts */

#define	DT_LESSTHAN	(dt_revsort == 0 ? -1 : 1)
#define	DT_GREATERTHAN	(dt_revsort == 0 ? 1 : -1)
57
/*
 * Aggregating action for count()/sum()/avg()/quantize()-style data: add
 * every 64-bit slot of the incoming buffer into the corresponding slot of
 * the accumulated buffer.  size is the record size in bytes and is assumed
 * to be a multiple of sizeof (int64_t).
 *
 * Fix: the loop index is now size_t, avoiding a signed/unsigned comparison
 * against size / sizeof (int64_t).
 */
static void
dt_aggregate_count(int64_t *existing, int64_t *new, size_t size)
{
	size_t i;

	for (i = 0; i < size / sizeof (int64_t); i++)
		existing[i] += new[i];
}
66
67static int
68dt_aggregate_countcmp(int64_t *lhs, int64_t *rhs)
69{
70	int64_t lvar = *lhs;
71	int64_t rvar = *rhs;
72
73	if (lvar < rvar)
74		return (DT_LESSTHAN);
75
76	if (lvar > rvar)
77		return (DT_GREATERTHAN);
78
79	return (0);
80}
81
/*ARGSUSED*/
/*
 * Aggregating action for min(): keep the smaller of the accumulated and
 * incoming values.  Only the first 64-bit slot participates; size is unused.
 */
static void
dt_aggregate_min(int64_t *existing, int64_t *new, size_t size)
{
	int64_t incoming = *new;

	if (incoming < *existing)
		*existing = incoming;
}
89
/*ARGSUSED*/
/*
 * Aggregating action for max(): keep the larger of the accumulated and
 * incoming values.  Only the first 64-bit slot participates; size is unused.
 */
static void
dt_aggregate_max(int64_t *existing, int64_t *new, size_t size)
{
	int64_t incoming = *new;

	if (incoming > *existing)
		*existing = incoming;
}
97
98static int
99dt_aggregate_averagecmp(int64_t *lhs, int64_t *rhs)
100{
101	int64_t lavg = lhs[0] ? (lhs[1] / lhs[0]) : 0;
102	int64_t ravg = rhs[0] ? (rhs[1] / rhs[0]) : 0;
103
104	if (lavg < ravg)
105		return (DT_LESSTHAN);
106
107	if (lavg > ravg)
108		return (DT_GREATERTHAN);
109
110	return (0);
111}
112
/*
 * Value comparator for stddev() aggregations: compare the standard
 * deviations that dt_stddev() derives from the raw aggregation buffers,
 * honoring the global sort direction via DT_LESSTHAN/DT_GREATERTHAN.
 */
static int
dt_aggregate_stddevcmp(int64_t *lhs, int64_t *rhs)
{
	uint64_t lsd = dt_stddev((uint64_t *)lhs, 1);
	uint64_t rsd = dt_stddev((uint64_t *)rhs, 1);

	if (lsd < rsd)
		return (DT_LESSTHAN);

	if (lsd > rsd)
		return (DT_GREATERTHAN);

	return (0);
}
127
/*ARGSUSED*/
/*
 * Aggregating action for lquantize(): the first slot is the encoded
 * lquantize() argument word; the remaining levels + 2 slots (underflow,
 * the in-range buckets, overflow) are bucket counts that are summed
 * pairwise into the accumulated buffer.
 */
static void
dt_aggregate_lquantize(int64_t *existing, int64_t *new, size_t size)
{
	uint16_t levels = DTRACE_LQUANTIZE_LEVELS(*existing);
	int64_t *buckets = existing + 1;
	int i;

	for (i = 0; i < levels + 2; i++)
		buckets[i] += new[i + 1];
}
139
/*
 * Compute the weighted total of an lquantize() aggregation for sorting
 * purposes.  The first slot encodes base/step/levels; each bucket count is
 * weighted by a representative value: base - 1 for the underflow bucket,
 * the bucket's base value for each in-range bucket, and the final base + 1
 * for the overflow bucket.
 */
static long double
dt_aggregate_lquantizedsum(int64_t *lquanta)
{
	int64_t arg = *lquanta++;
	int32_t base = DTRACE_LQUANTIZE_BASE(arg);
	uint16_t step = DTRACE_LQUANTIZE_STEP(arg);
	uint16_t levels = DTRACE_LQUANTIZE_LEVELS(arg), i;
	long double total = (long double)lquanta[0] * (long double)(base - 1);

	for (i = 0; i < levels; base += step, i++)
		total += (long double)lquanta[i + 1] * (long double)base;

	return (total + (long double)lquanta[levels + 1] *
	    (long double)(base + 1));
}
155
/*
 * Return the count in the lquantize() bucket that contains the value zero,
 * or 0 if zero lies outside the quantization's range.  Zero may land in
 * the underflow bucket (base - 1 == 0), an in-range bucket, or the
 * overflow bucket (last base + 1 == 0).
 */
static int64_t
dt_aggregate_lquantizedzero(int64_t *lquanta)
{
	int64_t arg = *lquanta++;
	int32_t base = DTRACE_LQUANTIZE_BASE(arg);
	uint16_t step = DTRACE_LQUANTIZE_STEP(arg);
	uint16_t levels = DTRACE_LQUANTIZE_LEVELS(arg), i;

	if (base - 1 == 0)
		return (lquanta[0]);

	for (i = 0; i < levels; base += step, i++) {
		if (base != 0)
			continue;

		return (lquanta[i + 1]);
	}

	if (base + 1 == 0)
		return (lquanta[levels + 1]);

	return (0);
}
179
180static int
181dt_aggregate_lquantizedcmp(int64_t *lhs, int64_t *rhs)
182{
183	long double lsum = dt_aggregate_lquantizedsum(lhs);
184	long double rsum = dt_aggregate_lquantizedsum(rhs);
185	int64_t lzero, rzero;
186
187	if (lsum < rsum)
188		return (DT_LESSTHAN);
189
190	if (lsum > rsum)
191		return (DT_GREATERTHAN);
192
193	/*
194	 * If they're both equal, then we will compare based on the weights at
195	 * zero.  If the weights at zero are equal (or if zero is not within
196	 * the range of the linear quantization), then this will be judged a
197	 * tie and will be resolved based on the key comparison.
198	 */
199	lzero = dt_aggregate_lquantizedzero(lhs);
200	rzero = dt_aggregate_lquantizedzero(rhs);
201
202	if (lzero < rzero)
203		return (DT_LESSTHAN);
204
205	if (lzero > rzero)
206		return (DT_GREATERTHAN);
207
208	return (0);
209}
210
/*ARGSUSED*/
/*
 * Aggregating action for llquantize(): sum the incoming bucket counts into
 * the accumulated buffer.  Slot 0 is the encoded llquantize() argument
 * word and must not be summed, so the loop starts at index 1.
 *
 * Fixes: the element size is computed with sizeof (int64_t) to match the
 * actual data type (the old sizeof (uint64_t) happened to agree in size
 * only), and the loop index is size_t to avoid a signed/unsigned
 * comparison.  Indentation normalized to the file's tab convention.
 */
static void
dt_aggregate_llquantize(int64_t *existing, int64_t *new, size_t size)
{
	size_t i;

	for (i = 1; i < size / sizeof (int64_t); i++)
		existing[i] += new[i];
}
219
/*
 * Compute the weighted total of an llquantize() aggregation for sorting
 * purposes.  The first slot encodes factor/low/high/nsteps; each bucket
 * count is weighted by its bucket's starting value (value - 1 for the
 * lowest bucket, value for each subsequent bucket, and the final value
 * for the overflow bucket).
 */
static long double
dt_aggregate_llquantizedsum(int64_t *llquanta)
{
		int64_t arg = *llquanta++;
		int16_t factor  = DTRACE_LLQUANTIZE_FACTOR(arg);
		int16_t low     = DTRACE_LLQUANTIZE_LOW(arg);
		int16_t high    = DTRACE_LLQUANTIZE_HIGH(arg);
		int16_t nsteps  = DTRACE_LLQUANTIZE_NSTEP(arg);
		int64_t value = 1, next, step;
		int bin = 0, order;
		long double total;

		assert(nsteps >= factor);
		assert(nsteps % factor == 0);

		/* Look for the first value of the series: factor^low */
		for (order = 0; order < low; order++)
				value *= factor;

		total = (long double)llquanta[bin++] * (long double)(value - 1);
		next = value * factor;
		step = next > nsteps ? next / nsteps : 1;

		/* Go through all the bins of each order of magnitude */
		while (order <= high)
		{
				assert(value < next);
				total += (long double)(llquanta[bin++]) * (long double)(value);

				if ((value += step) != next)
						continue;

				/* crossed into the next order: recompute the step size */
				next = value * factor;
				step = next > nsteps ? next / nsteps : 1;
				++order;
		}

		return (total + (long double)llquanta[bin] * (long double)value);
}
259
260static int
261dt_aggregate_llquantizedcmp(int64_t *lhs, int64_t *rhs)
262{
263		long double lsum = dt_aggregate_llquantizedsum(lhs);
264		long double rsum = dt_aggregate_llquantizedsum(rhs);
265		int64_t lzero, rzero;
266
267		if (lsum < rsum)
268				return (DT_LESSTHAN);
269		if (lsum > rsum)
270				return (DT_GREATERTHAN);
271
272		/*
273		 * If they're both equal, then we will compare based on the weights at
274		 * zero.  If the weights at zero are equal (or if zero is not within
275		 * the range of the linear quantization), then this will be judged a
276		 * tie and will be resolved based on the key comparison.
277		 */
278		lzero = lhs[1];
279		rzero = rhs[1];
280
281		if (lzero < rzero)
282				return (DT_LESSTHAN);
283		if (lzero > rzero)
284				return (DT_GREATERTHAN);
285
286		return (0);
287}
288
289static int
290dt_aggregate_quantizedcmp(int64_t *lhs, int64_t *rhs)
291{
292	int nbuckets = DTRACE_QUANTIZE_NBUCKETS, i;
293	long double ltotal = 0, rtotal = 0;
294	int64_t lzero, rzero;
295
296	for (i = 0; i < nbuckets; i++) {
297		int64_t bucketval = DTRACE_QUANTIZE_BUCKETVAL(i);
298
299		if (bucketval == 0) {
300			lzero = lhs[i];
301			rzero = rhs[i];
302		}
303
304		ltotal += (long double)bucketval * (long double)lhs[i];
305		rtotal += (long double)bucketval * (long double)rhs[i];
306	}
307
308	if (ltotal < rtotal)
309		return (DT_LESSTHAN);
310
311	if (ltotal > rtotal)
312		return (DT_GREATERTHAN);
313
314	/*
315	 * If they're both equal, then we will compare based on the weights at
316	 * zero.  If the weights at zero are equal, then this will be judged a
317	 * tie and will be resolved based on the key comparison.
318	 */
319	if (lzero < rzero)
320		return (DT_LESSTHAN);
321
322	if (lzero > rzero)
323		return (DT_GREATERTHAN);
324
325	return (0);
326}
327
/*
 * Normalize a user-space PC for aggregation.  data[0] is the pid and
 * data[1] the PC; if the process can be grabbed read-only and the PC
 * resolves to a symbol, the PC is replaced with the symbol's start address
 * so that all PCs within one function aggregate together.  No-op for
 * vectored opens (no live process access).
 */
static void
dt_aggregate_usym(dtrace_hdl_t *dtp, uint64_t *data)
{
	uint64_t pid = data[0];
	uint64_t *pc = &data[1];
	struct ps_prochandle *P;
	GElf_Sym sym;

	if (dtp->dt_vector != NULL)
		return;

	if ((P = dt_proc_grab(dtp, pid, PGRAB_RDONLY | PGRAB_FORCE, 0)) == NULL)
		return;

	dt_proc_lock(dtp, P);

	if (Plookup_by_addr(P, *pc, NULL, 0, &sym) == 0)
		*pc = sym.st_value;

	dt_proc_unlock(dtp, P);
	dt_proc_release(dtp, P);
}
350
/*
 * Normalize a user-space PC to its mapping's base address.  data[0] is the
 * pid and data[1] the PC; if the process can be grabbed read-only and the
 * PC falls within a known mapping, the PC is replaced with the mapping's
 * base (pr_vaddr) so that all PCs within one module aggregate together.
 * No-op for vectored opens.
 */
static void
dt_aggregate_umod(dtrace_hdl_t *dtp, uint64_t *data)
{
	uint64_t pid = data[0];
	uint64_t *pc = &data[1];
	struct ps_prochandle *P;
	prmap_t thread_local_map;
	const prmap_t *map;

	if (dtp->dt_vector != NULL)
		return;

	if ((P = dt_proc_grab(dtp, pid, PGRAB_RDONLY | PGRAB_FORCE, 0)) == NULL)
		return;

	dt_proc_lock(dtp, P);

	if ((map = Paddr_to_map(P, *pc, &thread_local_map)) != NULL)
		*pc = map->pr_vaddr;

	dt_proc_unlock(dtp, P);
	dt_proc_release(dtp, P);
}
374
/*
 * Normalize a kernel PC for aggregation: if the address resolves to a
 * symbol, replace it with the symbol's start address so that all PCs
 * within one function aggregate together.
 */
static void
dt_aggregate_sym(dtrace_hdl_t *dtp, uint64_t *data)
{
	GElf_Sym sym;
	uint64_t *pc = data;

	if (dtrace_lookup_by_addr(dtp, *pc, NULL, 0, &sym, NULL) == 0)
		*pc = sym.st_value;
}
384
/*
 * Normalize a kernel PC to its module's text base address: walk the module
 * list and, if the PC falls within a module's text segment, replace it
 * with that segment's base so that all PCs within one module aggregate
 * together.
 */
static void
dt_aggregate_mod(dtrace_hdl_t *dtp, uint64_t *data)
{
	uint64_t *pc = data;
	dt_module_t *dmp;

	if (dtp->dt_vector != NULL) {
		/*
		 * We don't have a way of just getting the module for a
		 * vectored open, and it doesn't seem to be worth defining
		 * one.  This means that use of mod() won't get true
		 * aggregation in the postmortem case (some modules may
		 * appear more than once in aggregation output).  It seems
		 * unlikely that anyone will ever notice or care...
		 */
		return;
	}

	for (dmp = dt_list_next(&dtp->dt_modlist); dmp != NULL;
	    dmp = dt_list_next(dmp)) {
		/* unsigned subtraction checks both bounds in one compare */
		if (*pc - dmp->dm_text_va < dmp->dm_text_size) {
			*pc = dmp->dm_text_va;
			return;
		}
	}
}
411
/*
 * Return the aggregation variable ID for a hash entry, caching the
 * compiler-generated ID from the entry's first record into the aggdesc if
 * the aggdesc does not already carry a valid one.
 */
static dtrace_aggvarid_t
dt_aggregate_aggvarid(dt_ahashent_t *ent)
{
	dtrace_aggdesc_t *agg = ent->dtahe_data.dtada_desc;
	caddr_t data = ent->dtahe_data.dtada_data;
	dtrace_recdesc_t *rec = agg->dtagd_rec;

	/*
	 * First, we'll check the variable ID in the aggdesc.  If it's valid,
	 * we'll return it.  If not, we'll use the compiler-generated ID
	 * present as the first record.
	 */
	if (agg->dtagd_varid != DTRACE_AGGVARIDNONE)
		return (agg->dtagd_varid);

	agg->dtagd_varid = *((dtrace_aggvarid_t *)(uintptr_t)(data +
	    rec->dtrd_offset));

	return (agg->dtagd_varid);
}
432
433#pragma mark vvv modified functions. separate collect/analyze phases vvv
434
435static int
436dt_aggregate_collect_cpu(dtrace_hdl_t *dtp, dtrace_bufdesc_t **bufr, processorid_t cpu)
437{
438	//dtrace_epid_t id;
439	//uint64_t hashval;
440	//size_t offs, roffs, size, ndx;
441	//int i, j, rval;
442	//caddr_t addr, data;
443	//dtrace_recdesc_t *rec;
444	//dt_aggregate_t *agp = &dtp->dt_aggregate;
445	//dtrace_aggdesc_t *agg;
446	//dt_ahash_t *hash = &agp->dtat_hash;
447	//dt_ahashent_t *h;
448////	dtrace_bufdesc_t b = agp->dtat_buf, *buf = &b;
449	dtrace_bufdesc_t *buf = *bufr;
450	//dtrace_aggdata_t *aggdata;
451	//int flags = agp->dtat_flags;
452
453	buf->dtbd_cpu = cpu;
454
455	if (dt_ioctl(dtp, DTRACEIOC_AGGSNAP, buf) == -1) {
456		if (errno == ENOENT) {
457			/*
458			 * If that failed with ENOENT, it may be because the
459			 * CPU was unconfigured.  This is okay; we'll just
460			 * do nothing but return success.
461			 */
462			return (0);
463		}
464
465		return (dt_set_errno(dtp, errno));
466	}
467////printf(">>> buf size AFTER agg collect cpu(%d)=%d\n",cpu,buf->dtbd_size);
468	if (buf->dtbd_drops != 0) {
469		if (dt_handle_cpudrop(dtp, cpu,
470		    DTRACEDROP_AGGREGATION, buf->dtbd_drops) == -1)
471			return (-1);
472	}
473
474	return ((int)(buf->dtbd_size == 0));
475}
476
477static int
478dt_aggregate_hash_cpu(dtrace_hdl_t *dtp, processorid_t cpu, dtrace_bufdesc_t *cpubuf)
479{
480	dtrace_epid_t id;
481	uint64_t hashval;
482	size_t offs, roffs, size, ndx;
483	int i, j, rval;
484	caddr_t addr, data;
485	dtrace_recdesc_t *rec;
486	dt_aggregate_t *agp = &dtp->dt_aggregate;
487	dtrace_aggdesc_t *agg;
488	dt_ahash_t *hash = &agp->dtat_hash;
489	dt_ahashent_t *h;
490////	dtrace_bufdesc_t b = agp->dtat_buf, *buf = &b; /**/
491	dtrace_bufdesc_t *buf = cpubuf;
492	dtrace_aggdata_t *aggdata;
493	int flags = agp->dtat_flags;
494
495	buf->dtbd_cpu = cpu;
496
497	if (hash->dtah_hash == NULL) {
498		size_t size;
499
500		hash->dtah_size = DTRACE_AHASHSIZE;
501		size = hash->dtah_size * sizeof (dt_ahashent_t *);
502
503		if ((hash->dtah_hash = malloc(size)) == NULL)
504			return (dt_set_errno(dtp, EDT_NOMEM));
505		bzero(hash->dtah_hash, size);
506	}
507////printf(">>>> hash->dtah_hash = %u\n",hash->dtah_hash);
508
509	for (offs = 0; offs < buf->dtbd_size; ) {
510		/*
511		 * We're guaranteed to have an ID.
512		 */
513		id = *((dtrace_epid_t *)((uintptr_t)buf->dtbd_data +
514		    (uintptr_t)offs));
515
516		if (id == DTRACE_AGGIDNONE) {
517			/*
518			 * This is filler to assure proper alignment of the
519			 * next record; we simply ignore it.
520			 */
521			offs += sizeof (id);
522			continue;
523		}
524
525		if ((rval = dt_aggid_lookup(dtp, id, &agg)) != 0)
526			return (rval);
527
528		addr = buf->dtbd_data + offs;
529		size = agg->dtagd_size;
530		hashval = 0;
531
532		for (j = 0; j < agg->dtagd_nrecs - 1; j++) {
533			rec = &agg->dtagd_rec[j];
534			roffs = rec->dtrd_offset;
535
536			switch (rec->dtrd_action) {
537			case DTRACEACT_USYM:
538				dt_aggregate_usym(dtp,
539				    /* LINTED - alignment */
540				    (uint64_t *)&addr[roffs]);
541				break;
542
543			case DTRACEACT_UMOD:
544				dt_aggregate_umod(dtp,
545				    /* LINTED - alignment */
546				    (uint64_t *)&addr[roffs]);
547				break;
548
549			case DTRACEACT_SYM:
550				/* LINTED - alignment */
551				dt_aggregate_sym(dtp, (uint64_t *)&addr[roffs]);
552				break;
553
554			case DTRACEACT_MOD:
555				/* LINTED - alignment */
556				dt_aggregate_mod(dtp, (uint64_t *)&addr[roffs]);
557				break;
558
559			default:
560				break;
561			}
562
563			for (i = 0; i < rec->dtrd_size; i++)
564				hashval += addr[roffs + i];
565		}
566
567		ndx = hashval % hash->dtah_size;
568
569		for (h = hash->dtah_hash[ndx]; h != NULL; h = h->dtahe_next) {
570			if (h->dtahe_hashval != hashval)
571				continue;
572
573			if (h->dtahe_size != size)
574				continue;
575
576			aggdata = &h->dtahe_data;
577			data = aggdata->dtada_data;
578
579			for (j = 0; j < agg->dtagd_nrecs - 1; j++) {
580				rec = &agg->dtagd_rec[j];
581				roffs = rec->dtrd_offset;
582
583				for (i = 0; i < rec->dtrd_size; i++)
584					if (addr[roffs + i] != data[roffs + i])
585						goto hashnext;
586			}
587
588			/*
589			 * We found it.  Now we need to apply the aggregating
590			 * action on the data here.
591			 */
592			rec = &agg->dtagd_rec[agg->dtagd_nrecs - 1];
593			roffs = rec->dtrd_offset;
594			/* LINTED - alignment */
595			h->dtahe_aggregate((int64_t *)&data[roffs],
596			    /* LINTED - alignment */
597			    (int64_t *)&addr[roffs], rec->dtrd_size);
598
599			/*
600			 * If we're keeping per CPU data, apply the aggregating
601			 * action there as well.
602			 */
603			if (aggdata->dtada_percpu != NULL) {
604				data = aggdata->dtada_percpu[cpu];
605
606				/* LINTED - alignment */
607				h->dtahe_aggregate((int64_t *)data,
608				    /* LINTED - alignment */
609				    (int64_t *)&addr[roffs], rec->dtrd_size);
610			}
611
612			goto bufnext;
613hashnext:
614			continue;
615		}
616
617		/*
618		 * If we're here, we couldn't find an entry for this record.
619		 */
620		if ((h = malloc(sizeof (dt_ahashent_t))) == NULL)
621			return (dt_set_errno(dtp, EDT_NOMEM));
622		bzero(h, sizeof (dt_ahashent_t));
623		aggdata = &h->dtahe_data;
624
625		if ((aggdata->dtada_data = malloc(size)) == NULL) {
626			free(h);
627			return (dt_set_errno(dtp, EDT_NOMEM));
628		}
629
630		bcopy(addr, aggdata->dtada_data, size);
631		aggdata->dtada_size = size;
632		aggdata->dtada_desc = agg;
633		aggdata->dtada_handle = dtp;
634		(void) dt_epid_lookup(dtp, agg->dtagd_epid,
635		    &aggdata->dtada_edesc, &aggdata->dtada_pdesc);
636		aggdata->dtada_normal = 1;
637
638		h->dtahe_hashval = hashval;
639		h->dtahe_size = size;
640		(void) dt_aggregate_aggvarid(h);
641
642		rec = &agg->dtagd_rec[agg->dtagd_nrecs - 1];
643
644		if (flags & DTRACE_A_PERCPU) {
645			int max_cpus = agp->dtat_maxcpu;
646			caddr_t *percpu = malloc(max_cpus * sizeof (caddr_t));
647
648			if (percpu == NULL) {
649				free(aggdata->dtada_data);
650				free(h);
651				return (dt_set_errno(dtp, EDT_NOMEM));
652			}
653
654			for (j = 0; j < max_cpus; j++) {
655				percpu[j] = malloc(rec->dtrd_size);
656
657				if (percpu[j] == NULL) {
658					while (--j >= 0)
659						free(percpu[j]);
660
661					free(aggdata->dtada_data);
662					free(h);
663					return (dt_set_errno(dtp, EDT_NOMEM));
664				}
665
666				if (j == cpu) {
667					bcopy(&addr[rec->dtrd_offset],
668					    percpu[j], rec->dtrd_size);
669				} else {
670					bzero(percpu[j], rec->dtrd_size);
671				}
672			}
673
674			aggdata->dtada_percpu = percpu;
675		}
676
677		switch (rec->dtrd_action) {
678		case DTRACEAGG_MIN:
679			h->dtahe_aggregate = dt_aggregate_min;
680			break;
681
682		case DTRACEAGG_MAX:
683			h->dtahe_aggregate = dt_aggregate_max;
684			break;
685
686		case DTRACEAGG_LQUANTIZE:
687			h->dtahe_aggregate = dt_aggregate_lquantize;
688			break;
689
690		case DTRACEAGG_LLQUANTIZE:
691		  h->dtahe_aggregate = dt_aggregate_llquantize;
692		  break;
693
694		case DTRACEAGG_COUNT:
695		case DTRACEAGG_SUM:
696		case DTRACEAGG_AVG:
697		case DTRACEAGG_QUANTIZE:
698			h->dtahe_aggregate = dt_aggregate_count;
699			break;
700
701		default:
702			return (dt_set_errno(dtp, EDT_BADAGG));
703		}
704
705		if (hash->dtah_hash[ndx] != NULL)
706			hash->dtah_hash[ndx]->dtahe_prev = h;
707
708		h->dtahe_next = hash->dtah_hash[ndx];
709		hash->dtah_hash[ndx] = h;
710
711		if (hash->dtah_all != NULL)
712			hash->dtah_all->dtahe_prevall = h;
713
714		h->dtahe_nextall = hash->dtah_all;
715		hash->dtah_all = h;
716bufnext:
717		offs += agg->dtagd_size;
718	}
719////printf(">>>2> hash->dtah_hash = %u\n",hash->dtah_hash);
720
721	return (0);
722}
723
724int
725dtrace_aggregate_collect(dtrace_hdl_t *dtp, dtrace_bufdesc_t ***agg_bufs)
726{
727	int i, rval=0, cpuct=0;
728	dt_aggregate_t *agp = &dtp->dt_aggregate;
729	hrtime_t now = gethrtime();
730	dtrace_optval_t interval = dtp->dt_options[DTRACEOPT_AGGRATE];
731//	dtrace_bufdesc_t b = agp->dtat_buf, *buf = &b;
732	dtrace_bufdesc_t *buf;
733
734	if (dtp->dt_lastagg != 0) {
735		if (now - dtp->dt_lastagg < interval)
736			return (0);
737
738		dtp->dt_lastagg += interval;
739	} else {
740		dtp->dt_lastagg = now;
741	}
742
743	if (!dtp->dt_active)
744		return (dt_set_errno(dtp, EINVAL));
745
746
747	for (i = 0; i < agp->dtat_ncpus; i++) {
748		buf = (*agg_bufs)[i]; /// using the same buffer all the time
749////		if (buf->dtbd_size == 0)
750////			return (0);
751////printf(">>> %d: buf size BEFORE agg collect cpu(%d)=%d\n",i,agp->dtat_cpus[i],buf->dtbd_size);
752		rval = dt_aggregate_collect_cpu(dtp, &buf, agp->dtat_cpus[i]);
753		if(rval > 0)
754			cpuct++;
755	}
756	if(cpuct)
757		return (cpuct);
758	return (rval);
759}
760
761int
762dtrace_aggregate_hash(dtrace_hdl_t *dtp, dtrace_bufdesc_t **agg_bufs)
763{
764	int i, rval=0;
765	dt_aggregate_t *agp = &dtp->dt_aggregate;
766/***
767 *** again, analysis is no longer time dependent...
768	hrtime_t now = gethrtime();
769	dtrace_optval_t interval = dtp->dt_options[DTRACEOPT_AGGRATE];
770
771	if (dtp->dt_lastagg != 0) {
772		if (now - dtp->dt_lastagg < interval)
773			return (0);
774
775		dtp->dt_lastagg += interval;
776	} else {
777		dtp->dt_lastagg = now;
778	}
779 *** this is performed in the collection phase to maintain the timing
780 ***
781 *** ...and whether the facility is active doesn't matter
782 ***
783 	if (!dtp->dt_active)
784		return (dt_set_errno(dtp, EINVAL));
785
786 ***/
787
788////printf(">>>2> agp->dtat_ncpus=%d\n",agp->dtat_ncpus);
789	for (i = 0; i < agp->dtat_ncpus; i++) { //using the same buffer all the time
790		if (rval = dt_aggregate_hash_cpu(dtp, agp->dtat_cpus[i], agg_bufs[i]))
791			return (rval);
792	}
793
794	return (0);
795}
796
797#pragma mark ^^^ modified functions. separate collect/analyze phases ^^^
798
/*
 * Snapshot and merge the aggregation buffer for a single CPU: perform the
 * DTRACEIOC_AGGSNAP ioctl into the shared snapshot buffer, report any
 * aggregation drops, then merge each record into the aggregate's hash
 * table -- normalizing symbol/module keys, applying the aggregating action
 * to a matching entry, or inserting a new entry (optionally with per-CPU
 * data when DTRACE_A_PERCPU is set).  This is the original combined
 * snap+hash path; see dt_aggregate_collect_cpu()/dt_aggregate_hash_cpu()
 * for the split variant.
 *
 * Returns 0 on success (including an unconfigured CPU or an empty buffer)
 * and a non-zero failure code (with dt_errno set where applicable)
 * otherwise.
 */
static int
dt_aggregate_snap_cpu(dtrace_hdl_t *dtp, processorid_t cpu)
{
	dtrace_epid_t id;
	uint64_t hashval;
	size_t offs, roffs, size, ndx;
	int i, j, rval;
	caddr_t addr, data;
	dtrace_recdesc_t *rec;
	dt_aggregate_t *agp = &dtp->dt_aggregate;
	dtrace_aggdesc_t *agg;
	dt_ahash_t *hash = &agp->dtat_hash;
	dt_ahashent_t *h;
	dtrace_bufdesc_t b = agp->dtat_buf, *buf = &b;
	dtrace_aggdata_t *aggdata;
	int flags = agp->dtat_flags;

	buf->dtbd_cpu = cpu;

	if (dt_ioctl(dtp, DTRACEIOC_AGGSNAP, buf) == -1) {
		if (errno == ENOENT) {
			/*
			 * If that failed with ENOENT, it may be because the
			 * CPU was unconfigured.  This is okay; we'll just
			 * do nothing but return success.
			 */
			return (0);
		}

		return (dt_set_errno(dtp, errno));
	}

	if (buf->dtbd_drops != 0) {
		if (dt_handle_cpudrop(dtp, cpu,
		    DTRACEDROP_AGGREGATION, buf->dtbd_drops) == -1)
			return (-1);
	}

	if (buf->dtbd_size == 0)
		return (0);

	/* Lazily allocate the hash table on first use. */
	if (hash->dtah_hash == NULL) {
		size_t size;

		hash->dtah_size = DTRACE_AHASHSIZE;
		size = hash->dtah_size * sizeof (dt_ahashent_t *);

		if ((hash->dtah_hash = malloc(size)) == NULL)
			return (dt_set_errno(dtp, EDT_NOMEM));

		bzero(hash->dtah_hash, size);
	}

	for (offs = 0; offs < buf->dtbd_size; ) {
		/*
		 * We're guaranteed to have an ID.
		 */
		id = *((dtrace_epid_t *)((uintptr_t)buf->dtbd_data +
		    (uintptr_t)offs));

		if (id == DTRACE_AGGIDNONE) {
			/*
			 * This is filler to assure proper alignment of the
			 * next record; we simply ignore it.
			 */
			offs += sizeof (id);
			continue;
		}

		if ((rval = dt_aggid_lookup(dtp, id, &agg)) != 0)
			return (rval);

		addr = buf->dtbd_data + offs;
		size = agg->dtagd_size;
		hashval = 0;

		/*
		 * Normalize symbol/module key values and hash all key
		 * records (every record but the last, which is the data).
		 */
		for (j = 0; j < agg->dtagd_nrecs - 1; j++) {
			rec = &agg->dtagd_rec[j];
			roffs = rec->dtrd_offset;

			switch (rec->dtrd_action) {
			case DTRACEACT_USYM:
				dt_aggregate_usym(dtp,
				    /* LINTED - alignment */
				    (uint64_t *)&addr[roffs]);
				break;

			case DTRACEACT_UMOD:
				dt_aggregate_umod(dtp,
				    /* LINTED - alignment */
				    (uint64_t *)&addr[roffs]);
				break;

			case DTRACEACT_SYM:
				/* LINTED - alignment */
				dt_aggregate_sym(dtp, (uint64_t *)&addr[roffs]);
				break;

			case DTRACEACT_MOD:
				/* LINTED - alignment */
				dt_aggregate_mod(dtp, (uint64_t *)&addr[roffs]);
				break;

			default:
				break;
			}

			for (i = 0; i < rec->dtrd_size; i++)
				hashval += addr[roffs + i];
		}

		ndx = hashval % hash->dtah_size;

		/* Search the chain for an entry with an identical key. */
		for (h = hash->dtah_hash[ndx]; h != NULL; h = h->dtahe_next) {
			if (h->dtahe_hashval != hashval)
				continue;

			if (h->dtahe_size != size)
				continue;

			aggdata = &h->dtahe_data;
			data = aggdata->dtada_data;

			for (j = 0; j < agg->dtagd_nrecs - 1; j++) {
				rec = &agg->dtagd_rec[j];
				roffs = rec->dtrd_offset;

				for (i = 0; i < rec->dtrd_size; i++)
					if (addr[roffs + i] != data[roffs + i])
						goto hashnext;
			}

			/*
			 * We found it.  Now we need to apply the aggregating
			 * action on the data here.
			 */
			rec = &agg->dtagd_rec[agg->dtagd_nrecs - 1];
			roffs = rec->dtrd_offset;
			/* LINTED - alignment */
			h->dtahe_aggregate((int64_t *)&data[roffs],
			    /* LINTED - alignment */
			    (int64_t *)&addr[roffs], rec->dtrd_size);

			/*
			 * If we're keeping per CPU data, apply the aggregating
			 * action there as well.
			 */
			if (aggdata->dtada_percpu != NULL) {
				data = aggdata->dtada_percpu[cpu];

				/* LINTED - alignment */
				h->dtahe_aggregate((int64_t *)data,
				    /* LINTED - alignment */
				    (int64_t *)&addr[roffs], rec->dtrd_size);
			}

			goto bufnext;
hashnext:
			continue;
		}

		/*
		 * If we're here, we couldn't find an entry for this record.
		 */
		if ((h = malloc(sizeof (dt_ahashent_t))) == NULL)
			return (dt_set_errno(dtp, EDT_NOMEM));
		bzero(h, sizeof (dt_ahashent_t));
		aggdata = &h->dtahe_data;

		if ((aggdata->dtada_data = malloc(size)) == NULL) {
			free(h);
			return (dt_set_errno(dtp, EDT_NOMEM));
		}

		bcopy(addr, aggdata->dtada_data, size);
		aggdata->dtada_size = size;
		aggdata->dtada_desc = agg;
		aggdata->dtada_handle = dtp;
		(void) dt_epid_lookup(dtp, agg->dtagd_epid,
		    &aggdata->dtada_edesc, &aggdata->dtada_pdesc);
		aggdata->dtada_normal = 1;

		h->dtahe_hashval = hashval;
		h->dtahe_size = size;
		(void) dt_aggregate_aggvarid(h);

		rec = &agg->dtagd_rec[agg->dtagd_nrecs - 1];

		if (flags & DTRACE_A_PERCPU) {
			int max_cpus = agp->dtat_maxcpu;
			caddr_t *percpu = malloc(max_cpus * sizeof (caddr_t));

			if (percpu == NULL) {
				free(aggdata->dtada_data);
				free(h);
				return (dt_set_errno(dtp, EDT_NOMEM));
			}

			for (j = 0; j < max_cpus; j++) {
				percpu[j] = malloc(rec->dtrd_size);

				if (percpu[j] == NULL) {
					while (--j >= 0)
						free(percpu[j]);

					free(aggdata->dtada_data);
					free(h);
					return (dt_set_errno(dtp, EDT_NOMEM));
				}

				if (j == cpu) {
					bcopy(&addr[rec->dtrd_offset],
					    percpu[j], rec->dtrd_size);
				} else {
					bzero(percpu[j], rec->dtrd_size);
				}
			}

			aggdata->dtada_percpu = percpu;
		}

		/* Bind the aggregating action for this entry's data record. */
		switch (rec->dtrd_action) {
		case DTRACEAGG_MIN:
			h->dtahe_aggregate = dt_aggregate_min;
			break;

		case DTRACEAGG_MAX:
			h->dtahe_aggregate = dt_aggregate_max;
			break;

		case DTRACEAGG_LQUANTIZE:
			h->dtahe_aggregate = dt_aggregate_lquantize;
			break;

		case DTRACEAGG_LLQUANTIZE:
		  h->dtahe_aggregate = dt_aggregate_llquantize;
		  break;

		case DTRACEAGG_COUNT:
		case DTRACEAGG_SUM:
		case DTRACEAGG_AVG:
		case DTRACEAGG_STDDEV:
		case DTRACEAGG_QUANTIZE:
			h->dtahe_aggregate = dt_aggregate_count;
			break;

		default:
			return (dt_set_errno(dtp, EDT_BADAGG));
		}

		/* Link the new entry into its hash chain and the all-list. */
		if (hash->dtah_hash[ndx] != NULL)
			hash->dtah_hash[ndx]->dtahe_prev = h;

		h->dtahe_next = hash->dtah_hash[ndx];
		hash->dtah_hash[ndx] = h;

		if (hash->dtah_all != NULL)
			hash->dtah_all->dtahe_prevall = h;

		h->dtahe_nextall = hash->dtah_all;
		hash->dtah_all = h;
bufnext:
		offs += agg->dtagd_size;
	}

	return (0);
}
1066
1067int
1068dtrace_aggregate_snap(dtrace_hdl_t *dtp)
1069{
1070	int i, rval;
1071	dt_aggregate_t *agp = &dtp->dt_aggregate;
1072	hrtime_t now = gethrtime();
1073	dtrace_optval_t interval = dtp->dt_options[DTRACEOPT_AGGRATE];
1074
1075	if (dtp->dt_lastagg != 0) {
1076		if (now - dtp->dt_lastagg < interval)
1077			return (0);
1078
1079		dtp->dt_lastagg += interval;
1080	} else {
1081		dtp->dt_lastagg = now;
1082	}
1083
1084	if (!dtp->dt_active)
1085		return (dt_set_errno(dtp, EINVAL));
1086
1087	if (agp->dtat_buf.dtbd_size == 0)
1088		return (0);
1089
1090	for (i = 0; i < agp->dtat_ncpus; i++) {
1091		if (rval = dt_aggregate_snap_cpu(dtp, agp->dtat_cpus[i]))
1092			return (rval);
1093	}
1094
1095	return (0);
1096}
1097
1098static int
1099dt_aggregate_hashcmp(const void *lhs, const void *rhs)
1100{
1101	dt_ahashent_t *lh = *((dt_ahashent_t **)lhs);
1102	dt_ahashent_t *rh = *((dt_ahashent_t **)rhs);
1103	dtrace_aggdesc_t *lagg = lh->dtahe_data.dtada_desc;
1104	dtrace_aggdesc_t *ragg = rh->dtahe_data.dtada_desc;
1105
1106	if (lagg->dtagd_nrecs < ragg->dtagd_nrecs)
1107		return (DT_LESSTHAN);
1108
1109	if (lagg->dtagd_nrecs > ragg->dtagd_nrecs)
1110		return (DT_GREATERTHAN);
1111
1112	return (0);
1113}
1114
1115static int
1116dt_aggregate_varcmp(const void *lhs, const void *rhs)
1117{
1118	dt_ahashent_t *lh = *((dt_ahashent_t **)lhs);
1119	dt_ahashent_t *rh = *((dt_ahashent_t **)rhs);
1120	dtrace_aggvarid_t lid, rid;
1121
1122	lid = dt_aggregate_aggvarid(lh);
1123	rid = dt_aggregate_aggvarid(rh);
1124
1125	if (lid < rid)
1126		return (DT_LESSTHAN);
1127
1128	if (lid > rid)
1129		return (DT_GREATERTHAN);
1130
1131	return (0);
1132}
1133
1134static int
1135dt_aggregate_keycmp(const void *lhs, const void *rhs)
1136{
1137	dt_ahashent_t *lh = *((dt_ahashent_t **)lhs);
1138	dt_ahashent_t *rh = *((dt_ahashent_t **)rhs);
1139	dtrace_aggdesc_t *lagg = lh->dtahe_data.dtada_desc;
1140	dtrace_aggdesc_t *ragg = rh->dtahe_data.dtada_desc;
1141	dtrace_recdesc_t *lrec, *rrec;
1142	char *ldata, *rdata;
1143	int rval, i, j, keypos, nrecs;
1144
1145	if ((rval = dt_aggregate_hashcmp(lhs, rhs)) != 0)
1146		return (rval);
1147
1148	nrecs = lagg->dtagd_nrecs - 1;
1149	assert(nrecs == ragg->dtagd_nrecs - 1);
1150
1151	keypos = dt_keypos + 1 >= nrecs ? 0 : dt_keypos;
1152
1153	for (i = 1; i < nrecs; i++) {
1154		uint64_t lval, rval;
1155		int ndx = i + keypos;
1156
1157		if (ndx >= nrecs)
1158			ndx = ndx - nrecs + 1;
1159
1160		lrec = &lagg->dtagd_rec[ndx];
1161		rrec = &ragg->dtagd_rec[ndx];
1162
1163		ldata = lh->dtahe_data.dtada_data + lrec->dtrd_offset;
1164		rdata = rh->dtahe_data.dtada_data + rrec->dtrd_offset;
1165
1166		if (lrec->dtrd_size < rrec->dtrd_size)
1167			return (DT_LESSTHAN);
1168
1169		if (lrec->dtrd_size > rrec->dtrd_size)
1170			return (DT_GREATERTHAN);
1171
1172		switch (lrec->dtrd_size) {
1173		case sizeof (uint64_t):
1174			/* LINTED - alignment */
1175			lval = *((uint64_t *)ldata);
1176			/* LINTED - alignment */
1177			rval = *((uint64_t *)rdata);
1178			break;
1179
1180		case sizeof (uint32_t):
1181			/* LINTED - alignment */
1182			lval = *((uint32_t *)ldata);
1183			/* LINTED - alignment */
1184			rval = *((uint32_t *)rdata);
1185			break;
1186
1187		case sizeof (uint16_t):
1188			/* LINTED - alignment */
1189			lval = *((uint16_t *)ldata);
1190			/* LINTED - alignment */
1191			rval = *((uint16_t *)rdata);
1192			break;
1193
1194		case sizeof (uint8_t):
1195			lval = *((uint8_t *)ldata);
1196			rval = *((uint8_t *)rdata);
1197			break;
1198
1199		default:
1200			switch (lrec->dtrd_action) {
1201			case DTRACEACT_UMOD:
1202			case DTRACEACT_UADDR:
1203			case DTRACEACT_USYM:
1204				for (j = 0; j < 2; j++) {
1205					/* LINTED - alignment */
1206					lval = ((uint64_t *)ldata)[j];
1207					/* LINTED - alignment */
1208					rval = ((uint64_t *)rdata)[j];
1209
1210					if (lval < rval)
1211						return (DT_LESSTHAN);
1212
1213					if (lval > rval)
1214						return (DT_GREATERTHAN);
1215				}
1216
1217				break;
1218
1219			default:
1220			for (j = 0; j < lrec->dtrd_size; j++) {
1221				lval = ((uint8_t *)ldata)[j];
1222				rval = ((uint8_t *)rdata)[j];
1223
1224				if (lval < rval)
1225					return (DT_LESSTHAN);
1226
1227				if (lval > rval)
1228					return (DT_GREATERTHAN);
1229			}
1230			}
1231
1232			continue;
1233		}
1234
1235		if (lval < rval)
1236			return (DT_LESSTHAN);
1237
1238		if (lval > rval)
1239			return (DT_GREATERTHAN);
1240	}
1241
1242	return (0);
1243}
1244
1245static int
1246dt_aggregate_valcmp(const void *lhs, const void *rhs)
1247{
1248	dt_ahashent_t *lh = *((dt_ahashent_t **)lhs);
1249	dt_ahashent_t *rh = *((dt_ahashent_t **)rhs);
1250	dtrace_aggdesc_t *lagg = lh->dtahe_data.dtada_desc;
1251	dtrace_aggdesc_t *ragg = rh->dtahe_data.dtada_desc;
1252	caddr_t ldata = lh->dtahe_data.dtada_data;
1253	caddr_t rdata = rh->dtahe_data.dtada_data;
1254	dtrace_recdesc_t *lrec, *rrec;
1255	int64_t *laddr, *raddr;
1256	int rval, i;
1257
1258	if ((rval = dt_aggregate_hashcmp(lhs, rhs)) != 0)
1259		return (rval);
1260
1261	if (lagg->dtagd_nrecs > ragg->dtagd_nrecs)
1262		return (DT_GREATERTHAN);
1263
1264	if (lagg->dtagd_nrecs < ragg->dtagd_nrecs)
1265		return (DT_LESSTHAN);
1266
1267	for (i = 0; i < lagg->dtagd_nrecs; i++) {
1268		lrec = &lagg->dtagd_rec[i];
1269		rrec = &ragg->dtagd_rec[i];
1270
1271		if (lrec->dtrd_offset < rrec->dtrd_offset)
1272			return (DT_LESSTHAN);
1273
1274		if (lrec->dtrd_offset > rrec->dtrd_offset)
1275			return (DT_GREATERTHAN);
1276
1277		if (lrec->dtrd_action < rrec->dtrd_action)
1278			return (DT_LESSTHAN);
1279
1280		if (lrec->dtrd_action > rrec->dtrd_action)
1281			return (DT_GREATERTHAN);
1282	}
1283
1284	laddr = (int64_t *)(uintptr_t)(ldata + lrec->dtrd_offset);
1285	raddr = (int64_t *)(uintptr_t)(rdata + rrec->dtrd_offset);
1286
1287	switch (lrec->dtrd_action) {
1288	case DTRACEAGG_AVG:
1289		rval = dt_aggregate_averagecmp(laddr, raddr);
1290		break;
1291
1292	case DTRACEAGG_STDDEV:
1293		rval = dt_aggregate_stddevcmp(laddr, raddr);
1294		break;
1295
1296	case DTRACEAGG_QUANTIZE:
1297		rval = dt_aggregate_quantizedcmp(laddr, raddr);
1298		break;
1299
1300	case DTRACEAGG_LQUANTIZE:
1301		rval = dt_aggregate_lquantizedcmp(laddr, raddr);
1302		break;
1303
1304  case DTRACEAGG_LLQUANTIZE:
1305		rval = dt_aggregate_llquantizedcmp(laddr, raddr);
1306		break;
1307
1308	case DTRACEAGG_COUNT:
1309	case DTRACEAGG_SUM:
1310	case DTRACEAGG_MIN:
1311	case DTRACEAGG_MAX:
1312		rval = dt_aggregate_countcmp(laddr, raddr);
1313		break;
1314
1315	default:
1316		assert(0);
1317	}
1318
1319	return (rval);
1320}
1321
/*
 * Compare by value, breaking ties by comparing the keys.  (If the values
 * compare equal we already know the key layouts agree, so the keys
 * themselves serve as the tie-breaker.)
 */
static int
dt_aggregate_valkeycmp(const void *lhs, const void *rhs)
{
	int cmp = dt_aggregate_valcmp(lhs, rhs);

	return (cmp != 0 ? cmp : dt_aggregate_keycmp(lhs, rhs));
}
1337
/*
 * Compare by key, breaking ties by comparing the aggregation variable.
 */
static int
dt_aggregate_keyvarcmp(const void *lhs, const void *rhs)
{
	int cmp = dt_aggregate_keycmp(lhs, rhs);

	return (cmp != 0 ? cmp : dt_aggregate_varcmp(lhs, rhs));
}
1348
/*
 * Compare by aggregation variable, breaking ties by comparing the key.
 */
static int
dt_aggregate_varkeycmp(const void *lhs, const void *rhs)
{
	int cmp = dt_aggregate_varcmp(lhs, rhs);

	return (cmp != 0 ? cmp : dt_aggregate_keycmp(lhs, rhs));
}
1359
/*
 * Compare by value (with key as its tie-breaker), then by aggregation
 * variable.
 */
static int
dt_aggregate_valvarcmp(const void *lhs, const void *rhs)
{
	int cmp = dt_aggregate_valkeycmp(lhs, rhs);

	return (cmp != 0 ? cmp : dt_aggregate_varcmp(lhs, rhs));
}
1370
/*
 * Compare by aggregation variable, then by value (with key as its
 * tie-breaker).
 */
static int
dt_aggregate_varvalcmp(const void *lhs, const void *rhs)
{
	int cmp = dt_aggregate_varcmp(lhs, rhs);

	return (cmp != 0 ? cmp : dt_aggregate_valkeycmp(lhs, rhs));
}
1381
/*
 * Reversed ordering:  dt_aggregate_keyvarcmp() with its operands exchanged.
 */
static int
dt_aggregate_keyvarrevcmp(const void *lhs, const void *rhs)
{
	int cmp = dt_aggregate_keyvarcmp(rhs, lhs);

	return (cmp);
}
1387
/*
 * Reversed ordering:  dt_aggregate_varkeycmp() with its operands exchanged.
 */
static int
dt_aggregate_varkeyrevcmp(const void *lhs, const void *rhs)
{
	int cmp = dt_aggregate_varkeycmp(rhs, lhs);

	return (cmp);
}
1393
/*
 * Reversed ordering:  dt_aggregate_valvarcmp() with its operands exchanged.
 */
static int
dt_aggregate_valvarrevcmp(const void *lhs, const void *rhs)
{
	int cmp = dt_aggregate_valvarcmp(rhs, lhs);

	return (cmp);
}
1399
/*
 * Reversed ordering:  dt_aggregate_varvalcmp() with its operands exchanged.
 */
static int
dt_aggregate_varvalrevcmp(const void *lhs, const void *rhs)
{
	int cmp = dt_aggregate_varvalcmp(rhs, lhs);

	return (cmp);
}
1405
/*
 * Compare two "bundles" of aggregation elements.  A bundle is a
 * NULL-terminated array of dt_ahashent_t pointers:  one element per
 * aggregation value, followed by one representative-key element (the
 * terminating-NULL scans below rely on this layout).  Both bundles are
 * assumed to be the same length.
 */
static int
dt_aggregate_bundlecmp(const void *lhs, const void *rhs)
{
	dt_ahashent_t **lh = *((dt_ahashent_t ***)lhs);
	dt_ahashent_t **rh = *((dt_ahashent_t ***)rhs);
	int i, rval;

	if (dt_keysort) {
		/*
		 * If we're sorting on keys, we need to scan until we find the
		 * last entry -- that's the representative key.  (The order of
		 * the bundle is values followed by key to accommodate the
		 * default behavior of sorting by value.)  If the keys are
		 * equal, we'll fall into the value comparison loop, below.
		 */
		for (i = 0; lh[i + 1] != NULL; i++)
			continue;

		assert(i != 0);
		assert(rh[i + 1] == NULL);

		if ((rval = dt_aggregate_keycmp(&lh[i], &rh[i])) != 0)
			return (rval);
	}

	/* Compare value elements in order until one differs or we run out. */
	for (i = 0; ; i++) {
		if (lh[i + 1] == NULL) {
			/*
			 * All of the values are equal; if we're sorting on
			 * keys, then we're only here because the keys were
			 * found to be equal and these records are therefore
			 * equal.  If we're not sorting on keys, we'll use the
			 * key comparison from the representative key as the
			 * tie-breaker.
			 */
			if (dt_keysort)
				return (0);

			assert(i != 0);
			assert(rh[i + 1] == NULL);
			return (dt_aggregate_keycmp(&lh[i], &rh[i]));
		} else {
			if ((rval = dt_aggregate_valcmp(&lh[i], &rh[i])) != 0)
				return (rval);
		}
	}
}
1453
1454#pragma mark vvv modified functions. separate collect/analyze phases vvv
1455
1456int
1457dt_aggregate_setup(dtrace_hdl_t *dtp, dtrace_bufdesc_t ***agg_bufs)
1458{
1459	dt_aggregate_t *agp = &dtp->dt_aggregate;
1460	dtrace_optval_t size, cpu;
1461	dtrace_bufdesc_t *buf; //= &agp->dtat_buf;
1462	int rval, i;
1463
1464	assert(agp->dtat_maxcpu == 0);
1465	assert(agp->dtat_ncpu == 0);
1466	assert(agp->dtat_cpus == NULL);
1467
1468	agp->dtat_maxcpu = dt_sysconf(dtp, _SC_CPUID_MAX) + 1;
1469	agp->dtat_ncpu = dt_sysconf(dtp, _SC_NPROCESSORS_MAX);
1470	agp->dtat_cpus = malloc(agp->dtat_ncpu * sizeof (processorid_t));
1471
1472	if (agp->dtat_cpus == NULL)
1473		return (dt_set_errno(dtp, EDT_NOMEM));
1474
1475	/*
1476	 * Use the aggregation buffer size as reloaded from the kernel.
1477	 */
1478	size = dtp->dt_options[DTRACEOPT_AGGSIZE];
1479
1480	rval = dtrace_getopt(dtp, "aggsize", &size);
1481	assert(rval == 0);
1482
1483	if (size == 0 || size == DTRACEOPT_UNSET)
1484		return (0);
1485
1486/*** original allocation of single buffer ***
1487	buf = &agp->dtat_buf;
1488	buf->dtbd_size = size;
1489
1490	if ((buf->dtbd_data = malloc(buf->dtbd_size)) == NULL)
1491		return (dt_set_errno(dtp, EDT_NOMEM));
1492***/
1493	/*
1494	 * Now query for the CPUs enabled.
1495	 */
1496	rval = dtrace_getopt(dtp, "cpu", &cpu);
1497	assert(rval == 0 && cpu != DTRACEOPT_UNSET);
1498
1499	if (cpu != DTRACE_CPUALL) {
1500		assert(cpu < agp->dtat_ncpu);
1501		agp->dtat_cpus[agp->dtat_ncpus++] = (processorid_t)cpu;
1502
1503		return (0);
1504	}
1505
1506	agp->dtat_ncpus = 0;
1507	for (i = 0; i < agp->dtat_maxcpu; i++) {
1508		if (dt_status(dtp, i) == -1)
1509			continue;
1510
1511		agp->dtat_cpus[agp->dtat_ncpus++] = i;
1512	}
1513////printf(">>> agp->dtat_ncpus=%d\n",agp->dtat_ncpus);
1514
1515/*** make an array of buffers instead ***/
1516	*agg_bufs = calloc(agp->dtat_maxcpu,sizeof(dtrace_bufdesc_t *));
1517	for (i = 0; i < agp->dtat_maxcpu; i++) {
1518		buf = calloc(1,sizeof(dtrace_bufdesc_t));
1519		buf->dtbd_cpu = i;
1520		buf->dtbd_size = size;
1521		if ((buf->dtbd_data = malloc(buf->dtbd_size)) == NULL)
1522			return (dt_set_errno(dtp, EDT_NOMEM));
1523		(*agg_bufs)[i] = buf;
1524	}
1525/***/
1526	return (0);
1527}
1528#pragma mark ^^^ modified functions. separate collect/analyze phases ^^^
1529
1530int
1531dt_aggregate_go(dtrace_hdl_t *dtp)
1532{
1533	dt_aggregate_t *agp = &dtp->dt_aggregate;
1534	dtrace_optval_t size, cpu;
1535	dtrace_bufdesc_t *buf = &agp->dtat_buf;
1536	int rval, i;
1537
1538	assert(agp->dtat_maxcpu == 0);
1539	assert(agp->dtat_ncpu == 0);
1540	assert(agp->dtat_cpus == NULL);
1541
1542	agp->dtat_maxcpu = dt_sysconf(dtp, _SC_CPUID_MAX) + 1;
1543	agp->dtat_ncpu = dt_sysconf(dtp, _SC_NPROCESSORS_MAX);
1544	agp->dtat_cpus = malloc(agp->dtat_ncpu * sizeof (processorid_t));
1545
1546	if (agp->dtat_cpus == NULL)
1547		return (dt_set_errno(dtp, EDT_NOMEM));
1548
1549	/*
1550	 * Use the aggregation buffer size as reloaded from the kernel.
1551	 */
1552	size = dtp->dt_options[DTRACEOPT_AGGSIZE];
1553
1554	rval = dtrace_getopt(dtp, "aggsize", &size);
1555	assert(rval == 0);
1556
1557	if (size == 0 || size == DTRACEOPT_UNSET)
1558		return (0);
1559
1560	buf = &agp->dtat_buf;
1561	buf->dtbd_size = size;
1562
1563	if ((buf->dtbd_data = malloc(buf->dtbd_size)) == NULL)
1564		return (dt_set_errno(dtp, EDT_NOMEM));
1565
1566	/*
1567	 * Now query for the CPUs enabled.
1568	 */
1569	rval = dtrace_getopt(dtp, "cpu", &cpu);
1570	assert(rval == 0 && cpu != DTRACEOPT_UNSET);
1571
1572	if (cpu != DTRACE_CPUALL) {
1573		assert(cpu < agp->dtat_ncpu);
1574		agp->dtat_cpus[agp->dtat_ncpus++] = (processorid_t)cpu;
1575
1576		return (0);
1577	}
1578
1579	agp->dtat_ncpus = 0;
1580	for (i = 0; i < agp->dtat_maxcpu; i++) {
1581		if (dt_status(dtp, i) == -1)
1582			continue;
1583
1584		agp->dtat_cpus[agp->dtat_ncpus++] = i;
1585	}
1586
1587	return (0);
1588}
1589
/*
 * Apply a consumer callback's return value (a DTRACE_AGGWALK_* directive)
 * to the hash entry the callback was just handed.  Returns 0 on success or
 * -1 with the dtrace errno set.  DTRACE_AGGWALK_REMOVE unlinks and frees
 * the entry, so callers must not touch h afterward in that case.
 */
static int
dt_aggwalk_rval(dtrace_hdl_t *dtp, dt_ahashent_t *h, int rval)
{
	dt_aggregate_t *agp = &dtp->dt_aggregate;
	dtrace_aggdata_t *data;
	dtrace_aggdesc_t *aggdesc;
	dtrace_recdesc_t *rec;
	int i;

	switch (rval) {
	case DTRACE_AGGWALK_NEXT:
		break;

	case DTRACE_AGGWALK_CLEAR: {
		uint32_t size, offs = 0;

		/* Zero the aggregating (final) record's data in place. */
		aggdesc = h->dtahe_data.dtada_desc;
		rec = &aggdesc->dtagd_rec[aggdesc->dtagd_nrecs - 1];
		size = rec->dtrd_size;
		data = &h->dtahe_data;

		if (rec->dtrd_action == DTRACEAGG_LQUANTIZE) {
			/*
			 * For lquantize(), skip over the leading uint64_t --
			 * presumably the encoded lquantize() parameters,
			 * which must survive the clear.
			 */
			offs = sizeof (uint64_t);
			size -= sizeof (uint64_t);
		}

		bzero(&data->dtada_data[rec->dtrd_offset] + offs, size);

		if (data->dtada_percpu == NULL)
			break;

		/* Also clear any per-CPU copies of the data. */
		for (i = 0; i < dtp->dt_aggregate.dtat_maxcpu; i++)
			bzero(data->dtada_percpu[i] + offs, size);
		break;
	}

	case DTRACE_AGGWALK_ERROR:
		/*
		 * We assume that errno is already set in this case.
		 */
		return (dt_set_errno(dtp, errno));

	case DTRACE_AGGWALK_ABORT:
		return (dt_set_errno(dtp, EDT_DIRABORT));

	case DTRACE_AGGWALK_DENORMALIZE:
		h->dtahe_data.dtada_normal = 1;
		return (0);

	case DTRACE_AGGWALK_NORMALIZE:
		/* It is an error to normalize without a prior normal value. */
		if (h->dtahe_data.dtada_normal == 0) {
			h->dtahe_data.dtada_normal = 1;
			return (dt_set_errno(dtp, EDT_BADRVAL));
		}

		return (0);

	case DTRACE_AGGWALK_REMOVE: {
		dtrace_aggdata_t *aggdata = &h->dtahe_data;
		int i, max_cpus = agp->dtat_maxcpu;

		/*
		 * First, remove this hash entry from its hash chain.
		 */
		if (h->dtahe_prev != NULL) {
			h->dtahe_prev->dtahe_next = h->dtahe_next;
		} else {
			/* Head of the chain: repoint the bucket itself. */
			dt_ahash_t *hash = &agp->dtat_hash;
			size_t ndx = h->dtahe_hashval % hash->dtah_size;

			assert(hash->dtah_hash[ndx] == h);
			hash->dtah_hash[ndx] = h->dtahe_next;
		}

		if (h->dtahe_next != NULL)
			h->dtahe_next->dtahe_prev = h->dtahe_prev;

		/*
		 * Now remove it from the list of all hash entries.
		 */
		if (h->dtahe_prevall != NULL) {
			h->dtahe_prevall->dtahe_nextall = h->dtahe_nextall;
		} else {
			dt_ahash_t *hash = &agp->dtat_hash;

			assert(hash->dtah_all == h);
			hash->dtah_all = h->dtahe_nextall;
		}

		if (h->dtahe_nextall != NULL)
			h->dtahe_nextall->dtahe_prevall = h->dtahe_prevall;

		/*
		 * We're unlinked.  We can safely destroy the data.
		 */
		if (aggdata->dtada_percpu != NULL) {
			for (i = 0; i < max_cpus; i++)
				free(aggdata->dtada_percpu[i]);
			free(aggdata->dtada_percpu);
		}

		free(aggdata->dtada_data);
		free(h);

		return (0);
	}

	default:
		return (dt_set_errno(dtp, EDT_BADRVAL));
	}

	return (0);
}
1703
/*
 * Sort an array of aggregation elements, honoring the "aggsortrev",
 * "aggsortkey" and "aggsortkeypos" options.  Because qsort(3C) comparison
 * functions take no user argument, the sort parameters are communicated
 * through the dt_revsort/dt_keysort/dt_keypos globals, which are saved on
 * entry and restored on exit.  NOTE(review):  this routine does not itself
 * acquire dt_qsort_lock; callers (e.g. dt_aggregate_walk_sorted()) are
 * expected to hold it while the globals are in use.
 */
void
dt_aggregate_qsort(dtrace_hdl_t *dtp, void *base, size_t nel, size_t width,
    int (*compar)(const void *, const void *))
{
	int rev = dt_revsort, key = dt_keysort, keypos = dt_keypos;
	dtrace_optval_t keyposopt = dtp->dt_options[DTRACEOPT_AGGSORTKEYPOS];

	dt_revsort = (dtp->dt_options[DTRACEOPT_AGGSORTREV] != DTRACEOPT_UNSET);
	dt_keysort = (dtp->dt_options[DTRACEOPT_AGGSORTKEY] != DTRACEOPT_UNSET);

	/* Clamp the key position to int range; default to 0 when unset. */
	if (keyposopt != DTRACEOPT_UNSET && keyposopt <= INT_MAX) {
		dt_keypos = (int)keyposopt;
	} else {
		dt_keypos = 0;
	}

	/*
	 * With no explicit comparator, default to variable-major ordering:
	 * by value (valkey) normally, or by key when "aggsortkey" is set.
	 */
	if (compar == NULL) {
		if (!dt_keysort) {
			compar = dt_aggregate_varvalcmp;
		} else {
			compar = dt_aggregate_varkeycmp;
		}
	}

	qsort(base, nel, width, compar);

	/* Restore the previous sort parameters. */
	dt_revsort = rev;
	dt_keysort = key;
	dt_keypos = keypos;
}
1734
1735int
1736dtrace_aggregate_walk(dtrace_hdl_t *dtp, dtrace_aggregate_f *func, void *arg)
1737{
1738	dt_ahashent_t *h, *next;
1739	dt_ahash_t *hash = &dtp->dt_aggregate.dtat_hash;
1740
1741	for (h = hash->dtah_all; h != NULL; h = next) {
1742		/*
1743		 * dt_aggwalk_rval() can potentially remove the current hash
1744		 * entry; we need to load the next hash entry before calling
1745		 * into it.
1746		 */
1747		next = h->dtahe_nextall;
1748
1749		if (dt_aggwalk_rval(dtp, h, func(&h->dtahe_data, arg)) == -1)
1750			return (-1);
1751	}
1752
1753	return (0);
1754}
1755
/*
 * Compute the per-aggregation-variable totals used by the "agghist" option,
 * setting each element's dtada_total and the DTRACE_A_TOTAL /
 * DTRACE_A_HASPOSITIVES / DTRACE_A_HASNEGATIVES flags.  When clear is
 * B_TRUE, only the reset pass runs.  Returns 0 on success, -1 on
 * allocation failure.
 */
static int
dt_aggregate_total(dtrace_hdl_t *dtp, boolean_t clear)
{
	dt_ahashent_t *h;
	dtrace_aggdata_t **total;
	dtrace_aggid_t max = DTRACE_AGGVARIDNONE, id;
	dt_aggregate_t *agp = &dtp->dt_aggregate;
	dt_ahash_t *hash = &agp->dtat_hash;
	uint32_t tflags;

	tflags = DTRACE_A_TOTAL | DTRACE_A_HASNEGATIVES | DTRACE_A_HASPOSITIVES;

	/*
	 * If we need to deliver per-aggregation totals, we're going to take
	 * three passes over the aggregate:  one to clear everything out and
	 * determine our maximum aggregation ID, one to actually total
	 * everything up, and a final pass to assign the totals to the
	 * individual elements.
	 */
	for (h = hash->dtah_all; h != NULL; h = h->dtahe_nextall) {
		dtrace_aggdata_t *aggdata = &h->dtahe_data;

		if ((id = dt_aggregate_aggvarid(h)) > max)
			max = id;

		aggdata->dtada_total = 0;
		aggdata->dtada_flags &= ~tflags;
	}

	if (clear || max == DTRACE_AGGVARIDNONE)
		return (0);

	/* One slot per aggregation variable ID, indexed by dtagd_varid. */
	total = dt_zalloc(dtp, (max + 1) * sizeof (dtrace_aggdata_t *));

	if (total == NULL)
		return (-1);

	for (h = hash->dtah_all; h != NULL; h = h->dtahe_nextall) {
		dtrace_aggdata_t *aggdata = &h->dtahe_data;
		dtrace_aggdesc_t *agg = aggdata->dtada_desc;
		dtrace_recdesc_t *rec;
		caddr_t data;
		int64_t val, *addr;

		/* The final record is the aggregating action's data. */
		rec = &agg->dtagd_rec[agg->dtagd_nrecs - 1];
		data = aggdata->dtada_data;
		addr = (int64_t *)(uintptr_t)(data + rec->dtrd_offset);

		switch (rec->dtrd_action) {
			case DTRACEAGG_STDDEV:
				val = dt_stddev((uint64_t *)addr, 1);
				break;

			case DTRACEAGG_SUM:
			case DTRACEAGG_COUNT:
				val = *addr;
				break;

			case DTRACEAGG_AVG:
				val = addr[0] ? (addr[1] / addr[0]) : 0;
				break;

			default:
				/* Quantizations et al. don't contribute. */
				continue;
		}

		/*
		 * The first element seen for each variable becomes the
		 * accumulator; later elements fold into it.
		 */
		if (total[agg->dtagd_varid] == NULL) {
			total[agg->dtagd_varid] = aggdata;
			aggdata->dtada_flags |= DTRACE_A_TOTAL;
		} else {
			aggdata = total[agg->dtagd_varid];
		}

		if (val > 0)
			aggdata->dtada_flags |= DTRACE_A_HASPOSITIVES;

		if (val < 0) {
			aggdata->dtada_flags |= DTRACE_A_HASNEGATIVES;
			val = -val;
		}

		if (dtp->dt_options[DTRACEOPT_AGGZOOM] != DTRACEOPT_UNSET) {
			/*
			 * With "aggzoom", the total is the scaled maximum
			 * value rather than the sum.
			 */
			val = (int64_t)((long double)val *
					(1 / DTRACE_AGGZOOM_MAX));

			if (val > aggdata->dtada_total)
				aggdata->dtada_total = val;
		} else {
			aggdata->dtada_total += val;
		}
	}

	/*
	 * And now one final pass to set everyone's total.
	 */
	for (h = hash->dtah_all; h != NULL; h = h->dtahe_nextall) {
		dtrace_aggdata_t *aggdata = &h->dtahe_data, *t;
		dtrace_aggdesc_t *agg = aggdata->dtada_desc;

		if ((t = total[agg->dtagd_varid]) == NULL || aggdata == t)
			continue;

		aggdata->dtada_total = t->dtada_total;
		aggdata->dtada_flags |= (t->dtada_flags & tflags);
	}

	dt_free(dtp, total);

	return (0);
}
1866
/*
 * Compute per-aggregation-variable minimum and maximum buckets (used when
 * the "aggpack" option is set), propagating a common dtada_minbin /
 * dtada_maxbin and the DTRACE_A_MINMAXBIN flag to every element of each
 * variable.  When clear is B_TRUE, only the reset pass runs.  Returns 0 on
 * success, -1 on allocation failure.
 */
static int
dt_aggregate_minmaxbin(dtrace_hdl_t *dtp, boolean_t clear)
{
	dt_ahashent_t *h;
	dtrace_aggdata_t **minmax;
	dtrace_aggid_t max = DTRACE_AGGVARIDNONE, id;
	dt_aggregate_t *agp = &dtp->dt_aggregate;
	dt_ahash_t *hash = &agp->dtat_hash;

	/* First pass:  reset state and find the maximum variable ID. */
	for (h = hash->dtah_all; h != NULL; h = h->dtahe_nextall) {
		dtrace_aggdata_t *aggdata = &h->dtahe_data;

		if ((id = dt_aggregate_aggvarid(h)) > max)
			max = id;

		aggdata->dtada_minbin = 0;
		aggdata->dtada_maxbin = 0;
		aggdata->dtada_flags &= ~DTRACE_A_MINMAXBIN;
	}

	if (clear || max == DTRACE_AGGVARIDNONE)
		return (0);

	/* One slot per aggregation variable ID, indexed by dtagd_varid. */
	minmax = dt_zalloc(dtp, (max + 1) * sizeof (dtrace_aggdata_t *));

	if (minmax == NULL)
		return (-1);

	for (h = hash->dtah_all; h != NULL; h = h->dtahe_nextall) {
		dtrace_aggdata_t *aggdata = &h->dtahe_data;
		dtrace_aggdesc_t *agg = aggdata->dtada_desc;
		dtrace_recdesc_t *rec;
		caddr_t data;
		int64_t *addr;
		int minbin = -1, maxbin = -1, i;
		int start = 0, size;

		/* The final record is the aggregating action's data. */
		rec = &agg->dtagd_rec[agg->dtagd_nrecs - 1];
		size = rec->dtrd_size / sizeof (int64_t);
		data = aggdata->dtada_data;
		addr = (int64_t *)(uintptr_t)(data + rec->dtrd_offset);

		switch (rec->dtrd_action) {
		case DTRACEAGG_LQUANTIZE:
			/*
			 * For lquantize(), we always display the entire range
			 * of the aggregation when aggpack is set.
			 */
			start = 1;
			minbin = start;
			maxbin = size - 1 - start;
			break;

		case DTRACEAGG_QUANTIZE:
			/* Find the first and last non-empty buckets. */
			for (i = start; i < size; i++) {
				if (!addr[i])
					continue;

				if (minbin == -1)
					minbin = i - start;

				maxbin = i - start;
			}

			if (minbin == -1) {
				/*
				 * If we have no data (e.g., due to a clear()
				 * or negative increments), we'll use the
				 * zero bucket as both our min and max.
				 */
				minbin = maxbin = DTRACE_QUANTIZE_ZEROBUCKET;
			}

			break;

		default:
			/* Other aggregating actions don't participate. */
			continue;
		}

		/*
		 * The first element seen for each variable becomes the
		 * accumulator; later elements widen its min/max range.
		 */
		if (minmax[agg->dtagd_varid] == NULL) {
			minmax[agg->dtagd_varid] = aggdata;
			aggdata->dtada_flags |= DTRACE_A_MINMAXBIN;
			aggdata->dtada_minbin = minbin;
			aggdata->dtada_maxbin = maxbin;
			continue;
		}

		if (minbin < minmax[agg->dtagd_varid]->dtada_minbin)
			minmax[agg->dtagd_varid]->dtada_minbin = minbin;

		if (maxbin > minmax[agg->dtagd_varid]->dtada_maxbin)
			minmax[agg->dtagd_varid]->dtada_maxbin = maxbin;
	}

	/*
	 * And now one final pass to set everyone's minbin and maxbin.
	 */
	for (h = hash->dtah_all; h != NULL; h = h->dtahe_nextall) {
		dtrace_aggdata_t *aggdata = &h->dtahe_data, *mm;
		dtrace_aggdesc_t *agg = aggdata->dtada_desc;

		if ((mm = minmax[agg->dtagd_varid]) == NULL || aggdata == mm)
			continue;

		aggdata->dtada_minbin = mm->dtada_minbin;
		aggdata->dtada_maxbin = mm->dtada_maxbin;
		aggdata->dtada_flags |= DTRACE_A_MINMAXBIN;
	}

	dt_free(dtp, minmax);

	return (0);
}
1980
/*
 * Walk the aggregate in sorted order:  snapshot all hash entries into an
 * array, sort it (with sfunc if given, else per the "aggsort*" options via
 * dt_aggregate_qsort()), and invoke func on each element.  The global sort
 * state is protected by taking dt_qsort_lock around the sort.  Returns 0
 * on success or -1 on failure; total/minmaxbin state computed for the
 * "agghist"/"aggpack" options is always cleared before returning.
 */
static int
dt_aggregate_walk_sorted(dtrace_hdl_t *dtp,
    dtrace_aggregate_f *func, void *arg,
    int (*sfunc)(const void *, const void *))
{
	dt_aggregate_t *agp = &dtp->dt_aggregate;
	dt_ahashent_t *h, **sorted;
	dt_ahash_t *hash = &agp->dtat_hash;
	size_t i, nentries = 0;
	int rval = -1;

	agp->dtat_flags &= ~(DTRACE_A_TOTAL | DTRACE_A_MINMAXBIN);

	if (dtp->dt_options[DTRACEOPT_AGGHIST] != DTRACEOPT_UNSET) {
		agp->dtat_flags |= DTRACE_A_TOTAL;

		if (dt_aggregate_total(dtp, B_FALSE) != 0)
			return (-1);
	}

	if (dtp->dt_options[DTRACEOPT_AGGPACK] != DTRACEOPT_UNSET) {
		agp->dtat_flags |= DTRACE_A_MINMAXBIN;

		if (dt_aggregate_minmaxbin(dtp, B_FALSE) != 0)
			return (-1);
	}

	/* Count the entries, then gather them into a sortable array. */
	for (h = hash->dtah_all; h != NULL; h = h->dtahe_nextall)
		nentries++;

	sorted = dt_alloc(dtp, nentries * sizeof (dt_ahashent_t *));

	if (sorted == NULL)
		goto out;

	for (h = hash->dtah_all, i = 0; h != NULL; h = h->dtahe_nextall)
		sorted[i++] = h;

	(void) pthread_mutex_lock(&dt_qsort_lock);

	if (sfunc == NULL) {
		dt_aggregate_qsort(dtp, sorted, nentries,
				sizeof (dt_ahashent_t *), NULL);
	} else {
		/*
		 * If we've been explicitly passed a sorting function,
		 * we'll use that -- ignoring the values of the "aggsortrev",
		 * "aggsortkey" and "aggsortkeypos" options.
		 */
		qsort(sorted, nentries, sizeof (dt_ahashent_t *), sfunc);
	}

	(void) pthread_mutex_unlock(&dt_qsort_lock);

	/* Deliver the elements to the consumer in sorted order. */
	for (i = 0; i < nentries; i++) {
		h = sorted[i];

		if (dt_aggwalk_rval(dtp, h, func(&h->dtahe_data, arg)) == -1) {
			goto out;
		}
	}

	rval = 0;
out:
	if (agp->dtat_flags & DTRACE_A_TOTAL)
		(void) dt_aggregate_total(dtp, B_TRUE);
	if (agp->dtat_flags & DTRACE_A_MINMAXBIN)
		(void) dt_aggregate_minmaxbin(dtp, B_TRUE);

	dt_free(dtp, sorted);
	return (rval);
}
2053
2054int
2055dtrace_aggregate_walk_sorted(dtrace_hdl_t *dtp,
2056    dtrace_aggregate_f *func, void *arg)
2057{
2058	return (dt_aggregate_walk_sorted(dtp, func, arg, NULL));
2059}
2060
2061int
2062dtrace_aggregate_walk_keysorted(dtrace_hdl_t *dtp,
2063    dtrace_aggregate_f *func, void *arg)
2064{
2065	return (dt_aggregate_walk_sorted(dtp, func,
2066	    arg, dt_aggregate_varkeycmp));
2067}
2068
2069int
2070dtrace_aggregate_walk_valsorted(dtrace_hdl_t *dtp,
2071    dtrace_aggregate_f *func, void *arg)
2072{
2073	return (dt_aggregate_walk_sorted(dtp, func,
2074	    arg, dt_aggregate_varvalcmp));
2075}
2076
2077int
2078dtrace_aggregate_walk_keyvarsorted(dtrace_hdl_t *dtp,
2079    dtrace_aggregate_f *func, void *arg)
2080{
2081	return (dt_aggregate_walk_sorted(dtp, func,
2082	    arg, dt_aggregate_keyvarcmp));
2083}
2084
2085int
2086dtrace_aggregate_walk_valvarsorted(dtrace_hdl_t *dtp,
2087    dtrace_aggregate_f *func, void *arg)
2088{
2089	return (dt_aggregate_walk_sorted(dtp, func,
2090	    arg, dt_aggregate_valvarcmp));
2091}
2092
2093int
2094dtrace_aggregate_walk_keyrevsorted(dtrace_hdl_t *dtp,
2095    dtrace_aggregate_f *func, void *arg)
2096{
2097	return (dt_aggregate_walk_sorted(dtp, func,
2098	    arg, dt_aggregate_varkeyrevcmp));
2099}
2100
2101int
2102dtrace_aggregate_walk_valrevsorted(dtrace_hdl_t *dtp,
2103    dtrace_aggregate_f *func, void *arg)
2104{
2105	return (dt_aggregate_walk_sorted(dtp, func,
2106	    arg, dt_aggregate_varvalrevcmp));
2107}
2108
2109int
2110dtrace_aggregate_walk_keyvarrevsorted(dtrace_hdl_t *dtp,
2111    dtrace_aggregate_f *func, void *arg)
2112{
2113	return (dt_aggregate_walk_sorted(dtp, func,
2114	    arg, dt_aggregate_keyvarrevcmp));
2115}
2116
2117int
2118dtrace_aggregate_walk_valvarrevsorted(dtrace_hdl_t *dtp,
2119    dtrace_aggregate_f *func, void *arg)
2120{
2121	return (dt_aggregate_walk_sorted(dtp, func,
2122	    arg, dt_aggregate_valvarrevcmp));
2123}
2124
2125int
2126dtrace_aggregate_walk_joined(dtrace_hdl_t *dtp, dtrace_aggvarid_t *aggvars,
2127    int naggvars, dtrace_aggregate_walk_joined_f *func, void *arg)
2128{
2129	dt_aggregate_t *agp = &dtp->dt_aggregate;
2130	dt_ahashent_t *h, **sorted = NULL, ***bundle, **nbundle;
2131	const dtrace_aggdata_t **data;
2132	dt_ahashent_t *zaggdata = NULL;
2133	dt_ahash_t *hash = &agp->dtat_hash;
2134	size_t nentries = 0, nbundles = 0, start, zsize = 0, bundlesize;
2135	dtrace_aggvarid_t max = 0, aggvar;
2136	int rval = -1, *map, *remap = NULL;
2137	int i, j;
2138	dtrace_optval_t sortpos = dtp->dt_options[DTRACEOPT_AGGSORTPOS];
2139
2140	/*
2141	 * If the sorting position is greater than the number of aggregation
2142	 * variable IDs, we silently set it to 0.
2143	 */
2144	if (sortpos == DTRACEOPT_UNSET || sortpos >= naggvars)
2145		sortpos = 0;
2146
2147	/*
2148	 * First we need to translate the specified aggregation variable IDs
2149	 * into a linear map that will allow us to translate an aggregation
2150	 * variable ID into its position in the specified aggvars.
2151	 */
2152	for (i = 0; i < naggvars; i++) {
2153		if (aggvars[i] == DTRACE_AGGVARIDNONE || aggvars[i] < 0)
2154			return (dt_set_errno(dtp, EDT_BADAGGVAR));
2155
2156		if (aggvars[i] > max)
2157			max = aggvars[i];
2158	}
2159
2160	if ((map = dt_zalloc(dtp, (max + 1) * sizeof (int))) == NULL)
2161		return (-1);
2162
2163	zaggdata = dt_zalloc(dtp, naggvars * sizeof (dt_ahashent_t));
2164
2165	if (zaggdata == NULL)
2166		goto out;
2167
2168	for (i = 0; i < naggvars; i++) {
2169		int ndx = i + sortpos;
2170
2171		if (ndx >= naggvars)
2172			ndx -= naggvars;
2173
2174		aggvar = aggvars[ndx];
2175		assert(aggvar <= max);
2176
2177		if (map[aggvar]) {
2178			/*
2179			 * We have an aggregation variable that is present
2180			 * more than once in the array of aggregation
2181			 * variables.  While it's unclear why one might want
2182			 * to do this, it's legal.  To support this construct,
2183			 * we will allocate a remap that will indicate the
2184			 * position from which this aggregation variable
2185			 * should be pulled.  (That is, where the remap will
2186			 * map from one position to another.)
2187			 */
2188			if (remap == NULL) {
2189				remap = dt_zalloc(dtp, naggvars * sizeof (int));
2190
2191				if (remap == NULL)
2192					goto out;
2193			}
2194
2195			/*
2196			 * Given that the variable is already present, assert
2197			 * that following through the mapping and adjusting
2198			 * for the sort position yields the same aggregation
2199			 * variable ID.
2200			 */
2201			assert(aggvars[(map[aggvar] - 1 + sortpos) %
2202			    naggvars] == aggvars[ndx]);
2203
2204			remap[i] = map[aggvar];
2205			continue;
2206		}
2207
2208		map[aggvar] = i + 1;
2209	}
2210
2211	/*
2212	 * We need to take two passes over the data to size our allocation, so
2213	 * we'll use the first pass to also fill in the zero-filled data to be
2214	 * used to properly format a zero-valued aggregation.
2215	 */
2216	for (h = hash->dtah_all; h != NULL; h = h->dtahe_nextall) {
2217		dtrace_aggvarid_t id;
2218		int ndx;
2219
2220		if ((id = dt_aggregate_aggvarid(h)) > max || !(ndx = map[id]))
2221			continue;
2222
2223		if (zaggdata[ndx - 1].dtahe_size == 0) {
2224			zaggdata[ndx - 1].dtahe_size = h->dtahe_size;
2225			zaggdata[ndx - 1].dtahe_data = h->dtahe_data;
2226		}
2227
2228		nentries++;
2229	}
2230
2231	if (nentries == 0) {
2232		/*
2233		 * We couldn't find any entries; there is nothing else to do.
2234		 */
2235		rval = 0;
2236		goto out;
2237	}
2238
2239	/*
2240	 * Before we sort the data, we're going to look for any holes in our
2241	 * zero-filled data.  This will occur if an aggregation variable that
2242	 * we are being asked to print has not yet been assigned the result of
2243	 * any aggregating action for _any_ tuple.  The issue becomes that we
2244	 * would like a zero value to be printed for all columns for this
2245	 * aggregation, but without any record description, we don't know the
2246	 * aggregating action that corresponds to the aggregation variable.  To
2247	 * try to find a match, we're simply going to lookup aggregation IDs
2248	 * (which are guaranteed to be contiguous and to start from 1), looking
2249	 * for the specified aggregation variable ID.  If we find a match,
2250	 * we'll use that.  If we iterate over all aggregation IDs and don't
2251	 * find a match, then we must be an anonymous enabling.  (Anonymous
2252	 * enablings can't currently derive either aggregation variable IDs or
2253	 * aggregation variable names given only an aggregation ID.)  In this
2254	 * obscure case (anonymous enabling, multiple aggregation printa() with
2255	 * some aggregations not represented for any tuple), our defined
2256	 * behavior is that the zero will be printed in the format of the first
2257	 * aggregation variable that contains any non-zero value.
2258	 */
2259	for (i = 0; i < naggvars; i++) {
2260		if (zaggdata[i].dtahe_size == 0) {
2261			dtrace_aggvarid_t aggvar;
2262
2263			aggvar = aggvars[(i - sortpos + naggvars) % naggvars];
2264			assert(zaggdata[i].dtahe_data.dtada_data == NULL);
2265
2266			for (j = DTRACE_AGGIDNONE + 1; ; j++) {
2267				dtrace_aggdesc_t *agg;
2268				dtrace_aggdata_t *aggdata;
2269
2270				if (dt_aggid_lookup(dtp, j, &agg) != 0)
2271					break;
2272
2273				if (agg->dtagd_varid != aggvar)
2274					continue;
2275
2276				/*
2277				 * We have our description -- now we need to
2278				 * cons up the zaggdata entry for it.
2279				 */
2280				aggdata = &zaggdata[i].dtahe_data;
2281				aggdata->dtada_size = agg->dtagd_size;
2282				aggdata->dtada_desc = agg;
2283				aggdata->dtada_handle = dtp;
2284				(void) dt_epid_lookup(dtp, agg->dtagd_epid,
2285				    &aggdata->dtada_edesc,
2286				    &aggdata->dtada_pdesc);
2287				aggdata->dtada_normal = 1;
2288				zaggdata[i].dtahe_hashval = 0;
2289				zaggdata[i].dtahe_size = agg->dtagd_size;
2290				break;
2291			}
2292
2293			if (zaggdata[i].dtahe_size == 0) {
2294				caddr_t data;
2295
2296				/*
2297				 * We couldn't find this aggregation, meaning
2298				 * that we have never seen it before for any
2299				 * tuple _and_ this is an anonymous enabling.
2300				 * That is, we're in the obscure case outlined
2301				 * above.  In this case, our defined behavior
2302				 * is to format the data in the format of the
2303				 * first non-zero aggregation -- of which, of
2304				 * course, we know there to be at least one
2305				 * (or nentries would have been zero).
2306				 */
2307				for (j = 0; j < naggvars; j++) {
2308					if (zaggdata[j].dtahe_size != 0)
2309						break;
2310				}
2311
2312				assert(j < naggvars);
2313				zaggdata[i] = zaggdata[j];
2314
2315				data = zaggdata[i].dtahe_data.dtada_data;
2316				assert(data != NULL);
2317			}
2318		}
2319	}
2320
2321	/*
2322	 * Now we need to allocate our zero-filled data for use for
2323	 * aggregations that don't have a value corresponding to a given key.
2324	 */
2325	for (i = 0; i < naggvars; i++) {
2326		dtrace_aggdata_t *aggdata = &zaggdata[i].dtahe_data;
2327		dtrace_aggdesc_t *aggdesc = aggdata->dtada_desc;
2328		dtrace_recdesc_t *rec;
2329		uint64_t larg;
2330		caddr_t zdata;
2331
2332		zsize = zaggdata[i].dtahe_size;
2333		assert(zsize != 0);
2334
2335		if ((zdata = dt_zalloc(dtp, zsize)) == NULL) {
2336			/*
2337			 * If we failed to allocated some zero-filled data, we
2338			 * need to zero out the remaining dtada_data pointers
2339			 * to prevent the wrong data from being freed below.
2340			 */
2341			for (j = i; j < naggvars; j++)
2342				zaggdata[j].dtahe_data.dtada_data = NULL;
2343			goto out;
2344		}
2345
2346		aggvar = aggvars[(i - sortpos + naggvars) % naggvars];
2347
2348		/*
2349		 * First, the easy bit.  To maintain compatibility with
2350		 * consumers that pull the compiler-generated ID out of the
2351		 * data, we put that ID at the top of the zero-filled data.
2352		 */
2353		rec = &aggdesc->dtagd_rec[0];
2354		/* LINTED - alignment */
2355		*((dtrace_aggvarid_t *)(zdata + rec->dtrd_offset)) = aggvar;
2356
2357		rec = &aggdesc->dtagd_rec[aggdesc->dtagd_nrecs - 1];
2358
2359		/*
2360		 * Now for the more complicated part.  If (and only if) this
2361		 * is an lquantize() aggregating action, zero-filled data is
2362		 * not equivalent to an empty record:  we must also get the
2363		 * parameters for the lquantize().
2364		 */
2365		if (rec->dtrd_action == DTRACEAGG_LQUANTIZE) {
2366			if (aggdata->dtada_data != NULL) {
2367				/*
2368				 * The easier case here is if we actually have
2369				 * some prototype data -- in which case we
2370				 * manually dig it out of the aggregation
2371				 * record.
2372				 */
2373				/* LINTED - alignment */
2374				larg = *((uint64_t *)(aggdata->dtada_data +
2375				    rec->dtrd_offset));
2376			} else {
2377				/*
2378				 * We don't have any prototype data.  As a
2379				 * result, we know that we _do_ have the
2380				 * compiler-generated information.  (If this
2381				 * were an anonymous enabling, all of our
2382				 * zero-filled data would have prototype data
2383				 * -- either directly or indirectly.) So as
2384				 * gross as it is, we'll grovel around in the
2385				 * compiler-generated information to find the
2386				 * lquantize() parameters.
2387				 */
2388				dtrace_stmtdesc_t *sdp;
2389				dt_ident_t *aid;
2390				dt_idsig_t *isp;
2391
2392				sdp = (dtrace_stmtdesc_t *)(uintptr_t)
2393				    aggdesc->dtagd_rec[0].dtrd_uarg;
2394				aid = sdp->dtsd_aggdata;
2395				isp = (dt_idsig_t *)aid->di_data;
2396				assert(isp->dis_auxinfo != 0);
2397				larg = isp->dis_auxinfo;
2398			}
2399
2400			/* LINTED - alignment */
2401			*((uint64_t *)(zdata + rec->dtrd_offset)) = larg;
2402		}
2403
2404		aggdata->dtada_data = zdata;
2405	}
2406
2407	/*
2408	 * Now that we've dealt with setting up our zero-filled data, we can
2409	 * allocate our sorted array, and take another pass over the data to
2410	 * fill it.
2411	 */
2412	sorted = dt_alloc(dtp, nentries * sizeof (dt_ahashent_t *));
2413
2414	if (sorted == NULL)
2415		goto out;
2416
2417	for (h = hash->dtah_all, i = 0; h != NULL; h = h->dtahe_nextall) {
2418		dtrace_aggvarid_t id;
2419
2420		if ((id = dt_aggregate_aggvarid(h)) > max || !map[id])
2421			continue;
2422
2423		sorted[i++] = h;
2424	}
2425
2426	assert(i == nentries);
2427
2428	/*
2429	 * We've loaded our array; now we need to sort by value to allow us
2430	 * to create bundles of like value.  We're going to acquire the
2431	 * dt_qsort_lock here, and hold it across all of our subsequent
2432	 * comparison and sorting.
2433	 */
2434	(void) pthread_mutex_lock(&dt_qsort_lock);
2435
2436	qsort(sorted, nentries, sizeof (dt_ahashent_t *),
2437	    dt_aggregate_keyvarcmp);
2438
2439	/*
2440	 * Now we need to go through and create bundles.  Because the number
2441	 * of bundles is bounded by the size of the sorted array, we're going
2442	 * to reuse the underlying storage.  And note that "bundle" is an
2443	 * array of pointers to arrays of pointers to dt_ahashent_t -- making
2444	 * its type (regrettably) "dt_ahashent_t ***".  (Regrettable because
2445	 * '*' -- like '_' and 'X' -- should never appear in triplicate in
2446	 * an ideal world.)
2447	 */
2448	bundle = (dt_ahashent_t ***)sorted;
2449
2450	for (i = 1, start = 0; i <= nentries; i++) {
2451		if (i < nentries &&
2452		    dt_aggregate_keycmp(&sorted[i], &sorted[i - 1]) == 0)
2453			continue;
2454
2455		/*
2456		 * We have a bundle boundary.  Everything from start to
2457		 * (i - 1) belongs in one bundle.
2458		 */
2459		assert(i - start <= naggvars);
2460		bundlesize = (naggvars + 2) * sizeof (dt_ahashent_t *);
2461
2462		if ((nbundle = dt_zalloc(dtp, bundlesize)) == NULL) {
2463			(void) pthread_mutex_unlock(&dt_qsort_lock);
2464			goto out;
2465		}
2466
2467		for (j = start; j < i; j++) {
2468			dtrace_aggvarid_t id = dt_aggregate_aggvarid(sorted[j]);
2469
2470			assert(id <= max);
2471			assert(map[id] != 0);
2472			assert(map[id] - 1 < naggvars);
2473			assert(nbundle[map[id] - 1] == NULL);
2474			nbundle[map[id] - 1] = sorted[j];
2475
2476			if (nbundle[naggvars] == NULL)
2477				nbundle[naggvars] = sorted[j];
2478		}
2479
2480		for (j = 0; j < naggvars; j++) {
2481			if (nbundle[j] != NULL)
2482				continue;
2483
2484			/*
2485			 * Before we assume that this aggregation variable
2486			 * isn't present (and fall back to using the
2487			 * zero-filled data allocated earlier), check the
2488			 * remap.  If we have a remapping, we'll drop it in
2489			 * here.  Note that we might be remapping an
2490			 * aggregation variable that isn't present for this
2491			 * key; in this case, the aggregation data that we
2492			 * copy will point to the zeroed data.
2493			 */
2494			if (remap != NULL && remap[j]) {
2495				assert(remap[j] - 1 < j);
2496				assert(nbundle[remap[j] - 1] != NULL);
2497				nbundle[j] = nbundle[remap[j] - 1];
2498			} else {
2499				nbundle[j] = &zaggdata[j];
2500			}
2501		}
2502
2503		bundle[nbundles++] = nbundle;
2504		start = i;
2505	}
2506
2507	/*
2508	 * Now we need to re-sort based on the first value.
2509	 */
2510	dt_aggregate_qsort(dtp, bundle, nbundles, sizeof (dt_ahashent_t **),
2511	    dt_aggregate_bundlecmp);
2512
2513	(void) pthread_mutex_unlock(&dt_qsort_lock);
2514
2515	/*
2516	 * We're done!  Now we just need to go back over the sorted bundles,
2517	 * calling the function.
2518	 */
2519	data = alloca((naggvars + 1) * sizeof (dtrace_aggdata_t *));
2520
2521	for (i = 0; i < nbundles; i++) {
2522		for (j = 0; j < naggvars; j++)
2523			data[j + 1] = NULL;
2524
2525		for (j = 0; j < naggvars; j++) {
2526			int ndx = j - sortpos;
2527
2528			if (ndx < 0)
2529				ndx += naggvars;
2530
2531			assert(bundle[i][ndx] != NULL);
2532			data[j + 1] = &bundle[i][ndx]->dtahe_data;
2533		}
2534
2535		for (j = 0; j < naggvars; j++)
2536			assert(data[j + 1] != NULL);
2537
2538		/*
2539		 * The representative key is the last element in the bundle.
2540		 * Assert that we have one, and then set it to be the first
2541		 * element of data.
2542		 */
2543		assert(bundle[i][j] != NULL);
2544		data[0] = &bundle[i][j]->dtahe_data;
2545
2546		if ((rval = func(data, naggvars + 1, arg)) == -1)
2547			goto out;
2548	}
2549
2550	rval = 0;
2551out:
2552	for (i = 0; i < nbundles; i++)
2553		dt_free(dtp, bundle[i]);
2554
2555	if (zaggdata != NULL) {
2556		for (i = 0; i < naggvars; i++)
2557			dt_free(dtp, zaggdata[i].dtahe_data.dtada_data);
2558	}
2559
2560	dt_free(dtp, zaggdata);
2561	dt_free(dtp, sorted);
2562	dt_free(dtp, remap);
2563	dt_free(dtp, map);
2564
2565	return (rval);
2566}
2567
2568int
2569dtrace_aggregate_print(dtrace_hdl_t *dtp, FILE *fp,
2570    dtrace_aggregate_walk_f *func)
2571{
2572	dt_print_aggdata_t pd;
2573
2574	bzero(&pd, sizeof (pd));
2575
2576	pd.dtpa_dtp = dtp;
2577	pd.dtpa_fp = fp;
2578	pd.dtpa_allunprint = 1;
2579
2580	if (func == NULL)
2581		func = dtrace_aggregate_walk_sorted;
2582
2583	if ((*func)(dtp, dt_print_agg, &pd) == -1)
2584		return (dt_set_errno(dtp, dtp->dt_errno));
2585
2586	return (0);
2587}
2588
2589void
2590dtrace_aggregate_clear(dtrace_hdl_t *dtp)
2591{
2592	dt_aggregate_t *agp = &dtp->dt_aggregate;
2593	dt_ahash_t *hash = &agp->dtat_hash;
2594	dt_ahashent_t *h;
2595	dtrace_aggdata_t *data;
2596	dtrace_aggdesc_t *aggdesc;
2597	dtrace_recdesc_t *rec;
2598	int i, max_cpus = agp->dtat_maxcpu;
2599
2600	for (h = hash->dtah_all; h != NULL; h = h->dtahe_nextall) {
2601		aggdesc = h->dtahe_data.dtada_desc;
2602		rec = &aggdesc->dtagd_rec[aggdesc->dtagd_nrecs - 1];
2603		data = &h->dtahe_data;
2604
2605		bzero(&data->dtada_data[rec->dtrd_offset], rec->dtrd_size);
2606
2607		if (data->dtada_percpu == NULL)
2608			continue;
2609
2610		for (i = 0; i < max_cpus; i++)
2611			bzero(data->dtada_percpu[i], rec->dtrd_size);
2612	}
2613}
2614
2615void
2616dt_aggregate_destroy(dtrace_hdl_t *dtp)
2617{
2618	dt_aggregate_t *agp = &dtp->dt_aggregate;
2619	dt_ahash_t *hash = &agp->dtat_hash;
2620	dt_ahashent_t *h, *next;
2621	dtrace_aggdata_t *aggdata;
2622	int i, max_cpus = agp->dtat_maxcpu;
2623
2624	if (hash->dtah_hash == NULL) {
2625		assert(hash->dtah_all == NULL);
2626	} else {
2627		free(hash->dtah_hash);
2628
2629		for (h = hash->dtah_all; h != NULL; h = next) {
2630			next = h->dtahe_nextall;
2631
2632			aggdata = &h->dtahe_data;
2633
2634			if (aggdata->dtada_percpu != NULL) {
2635				for (i = 0; i < max_cpus; i++)
2636					free(aggdata->dtada_percpu[i]);
2637				free(aggdata->dtada_percpu);
2638			}
2639
2640			free(aggdata->dtada_data);
2641			free(h);
2642		}
2643
2644		hash->dtah_hash = NULL;
2645		hash->dtah_all = NULL;
2646		hash->dtah_size = 0;
2647	}
2648
2649	free(agp->dtat_buf.dtbd_data);
2650	free(agp->dtat_cpus);
2651}
2652