1-- event IDs,  version 1 unless otherwise specified
2local event_id_open = 1
3local event_id_close = 2
4local event_id_milestone = 3
5local event_id_classification = 4
6local event_id_feature = 5
7local event_id_con_open = 6
8local event_id_con_close = 7
9-- 8 much?
10local event_id_oversub_update = 9
11local event_id_oversub_create = 10
12
13-- open, v1
14-- format: "1,time,uuid,flow_id,mac,policy_id"
15local function open_decode_from_csv(event_csv)
16	local s,e,pos
17	local event = { event = "open" }
18
19	pos = 1
20	-- event id
21	s,e = string.find(event_csv,',',pos)
22	local id = tonumber(string.sub(event_csv,pos,e - 1))
23	if id ~= event_id_open then
24		return nil
25	end
26	pos = e + 1
27	-- time
28	s,e = string.find(event_csv,',',pos)
29	event.time = string.sub(event_csv,pos,e - 1)
30	pos = e + 1
31	-- uuid
32	s,e = string.find(event_csv,',',pos)
33	event.uuid = string.sub(event_csv,pos,e - 1)
34	pos = e + 1
35	-- flow_id
36	s,e = string.find(event_csv,',',pos)
37	event.flow_id = string.sub(event_csv,pos,e - 1)
38	pos = e + 1
39	-- mac
40	s,e = string.find(event_csv,',',pos)
41	event.mac = string.sub(event_csv,pos,e - 1)
42	pos = e + 1
43	-- begin details
44	event.details = {}
45	-- policy_id
46	event.details.policy_id = string.sub(event_csv,pos)
47
48	return event
49end
50-- close, v1
51-- format: "2,time,uuid,flow_id,mac,tx_bytes,rx_bytes"
52local function close_decode_from_csv(event_csv)
53	local s,e,pos
54	local event = { event = "close" }
55
56	pos = 1
57	-- event id
58	s,e = string.find(event_csv,',',pos)
59	local id = tonumber(string.sub(event_csv,pos,e - 1))
60	if id ~= event_id_close then
61		return nil
62	end
63	pos = e + 1
64	-- time
65	s,e = string.find(event_csv,',',pos)
66	event.time = string.sub(event_csv,pos,e - 1)
67	pos = e + 1
68	-- uuid
69	s,e = string.find(event_csv,',',pos)
70	event.uuid = string.sub(event_csv,pos,e - 1)
71	pos = e + 1
72	-- flow_id
73	s,e = string.find(event_csv,',',pos)
74	event.flow_id = string.sub(event_csv,pos,e - 1)
75	pos = e + 1
76	-- mac
77	s,e = string.find(event_csv,',',pos)
78	event.mac = string.sub(event_csv,pos,e - 1)
79	pos = e + 1
80	-- begin details
81	event.details = {}
82	-- tx_bytes
83	s,e = string.find(event_csv,',',pos)
84	event.details.tx_bytes = string.sub(event_csv,pos,e - 1)
85	pos = e + 1
86	-- rx_bytes
87	event.details.rx_bytes = string.sub(event_csv,pos)
88
89	return event
90end
91-- milestone, v1
92-- format: "3,time,uuid,flow_id,tx_bytes,tx_packets,tx_qlen,tx_requeues,tx_overlimits,tx_drops,tx_backlog,rx_bytes,rx_packets,rx_qlen,rx_requeues,rx_overlimits,rx_drops,rx_backlog"
93local function milestone_decode_from_csv(event_csv)
94	local s,e,pos
95	local event = { event = "milestone" }
96
97	pos = 1
98	-- event id
99	s,e = string.find(event_csv,',',pos)
100	local id = tonumber(string.sub(event_csv,pos,e - 1))
101	if id ~= event_id_milestone then
102		return nil
103	end
104	pos = e + 1
105	-- time
106	s,e = string.find(event_csv,',',pos)
107	event.time = string.sub(event_csv,pos,e - 1)
108	pos = e + 1
109	-- uuid
110	s,e = string.find(event_csv,',',pos)
111	event.uuid = string.sub(event_csv,pos,e - 1)
112	pos = e + 1
113	-- flow_id
114	s,e = string.find(event_csv,',',pos)
115	event.flow_id = string.sub(event_csv,pos,e - 1)
116	pos = e + 1
117	-- begin details
118	event.details = {}
119	-- tx_bytes
120	s,e = string.find(event_csv,',',pos)
121	event.details.tx_bytes = string.sub(event_csv,pos,e - 1)
122	pos = e + 1
123	-- tx_packets
124	s,e = string.find(event_csv,',',pos)
125	event.details.tx_packets = string.sub(event_csv,pos,e - 1)
126	pos = e + 1
127	-- tx_qlen
128	s,e = string.find(event_csv,',',pos)
129	event.details.tx_qlen = string.sub(event_csv,pos,e - 1)
130	pos = e + 1
131	-- tx_requeues
132	s,e = string.find(event_csv,',',pos)
133	event.details.tx_requeues = string.sub(event_csv,pos,e - 1)
134	pos = e + 1
135	-- tx_overlimits
136	s,e = string.find(event_csv,',',pos)
137	event.details.tx_overlimits = string.sub(event_csv,pos,e - 1)
138	pos = e + 1
139	-- tx_drops
140	s,e = string.find(event_csv,',',pos)
141	event.details.tx_drops = string.sub(event_csv,pos,e - 1)
142	pos = e + 1
143	-- tx_backlog
144	s,e = string.find(event_csv,',',pos)
145	event.details.tx_backlog = string.sub(event_csv,pos,e - 1)
146	pos = e + 1
147	-- rx_bytes
148	s,e = string.find(event_csv,',',pos)
149	event.details.rx_bytes = string.sub(event_csv,pos,e - 1)
150	pos = e + 1
151	-- rx_packets
152	s,e = string.find(event_csv,',',pos)
153	event.details.rx_packets = string.sub(event_csv,pos,e - 1)
154	pos = e + 1
155	-- rx_qlen
156	s,e = string.find(event_csv,',',pos)
157	event.details.rx_qlen = string.sub(event_csv,pos,e - 1)
158	pos = e + 1
159	-- rx_requeues
160	s,e = string.find(event_csv,',',pos)
161	event.details.rx_requeues = string.sub(event_csv,pos,e - 1)
162	pos = e + 1
163	-- rx_overlimits
164	s,e = string.find(event_csv,',',pos)
165	event.details.rx_overlimits = string.sub(event_csv,pos,e - 1)
166	pos = e + 1
167	-- rx_drops
168	s,e = string.find(event_csv,',',pos)
169	event.details.rx_drops = string.sub(event_csv,pos,e - 1)
170	pos = e + 1
171	-- rx_backlog
172	event.details.rx_backlog = string.sub(event_csv,pos)
173
174	return event
175end
176
177-- decode a 5-tuple from the csv, where the first character of the
178-- 5-tuple starts at pos
179-- returns: a 2-tuple of (5-tuple-string, end_of_tuple_pos)
180local function tuple_decode_from_csv(event_csv,pos)
181	local s,e
182	local tstart,tend
183	-- tstart points to proto
184	tstart = pos
185	s,e = string.find(event_csv,',',tstart)
186	-- src_ip is e + 1
187	s,e = string.find(event_csv,',',e + 1)
188	-- sport is e + 1
189	s,e = string.find(event_csv,',',e + 1)
190	-- dest_ip is e + 1
191	s,e = string.find(event_csv,',',e + 1)
192	-- dport is e + 1
193	s,e = string.find(event_csv,',',e + 1)
194	if e then
195		tend = e - 1
196	else
197		tend = nil
198	end
199	return string.sub(event_csv,tstart,tend) , e
200end
201
202-- classification, v1
203-- format: "4,time,5-tuple,class,classifier"
204local function classification_decode_from_csv(event_csv)
205	local s,e,pos
206	local event = { event = "classification" }
207
208	pos = 1
209	-- event id
210	s,e = string.find(event_csv,',',pos)
211	local id = tonumber(string.sub(event_csv,pos,e - 1))
212	if id ~= event_id_classification then
213		return nil
214	end
215	pos = e + 1
216	-- time
217	s,e = string.find(event_csv,',',pos)
218	event.time = string.sub(event_csv,pos,e - 1)
219	pos = e + 1
220	-- 5-tuple
221	event['5-tuple'], e = tuple_decode_from_csv(event_csv,pos)
222	pos = e + 1
223	-- class
224	s,e = string.find(event_csv,',',pos)
225	event.class = string.sub(event_csv,pos,e - 1)
226	pos = e + 1
227	-- classifier
228	event.classifier = string.sub(event_csv,pos)
229
230	return event
231end
232-- feature discovery, v1
233-- format: "5,time,5-tuple,data"
234local function feature_decode_from_csv(event_csv)
235	local s,e,pos
236	local event = { event = "feature" }
237
238	pos = 1
239	-- event id
240	s,e = string.find(event_csv,',',pos)
241	local id = tonumber(string.sub(event_csv,pos,e - 1))
242	if id ~= event_id_feature then
243		return nil
244	end
245	pos = e + 1
246	-- time
247	s,e = string.find(event_csv,',',pos)
248	event.time = string.sub(event_csv,pos,e - 1)
249	pos = e + 1
250	-- 5-tuple
251	event['5-tuple'], e = tuple_decode_from_csv(event_csv,pos)
252	pos = e + 1
253	-- data
254	event.data = string.sub(event_csv,pos)
255
256	return event
257end
258-- con-open, v1
259-- format: "6,time,5-tuple"
260local function con_open_decode_from_csv(event_csv)
261	local s,e,pos
262	local event = { event = "con-open" }
263
264	pos = 1
265	-- event id
266	s,e = string.find(event_csv,',',pos)
267	local id = tonumber(string.sub(event_csv,pos,e - 1))
268	if id ~= event_id_con_open then
269		return nil
270	end
271	pos = e + 1
272	-- time
273	s,e = string.find(event_csv,',',pos)
274	event.time = string.sub(event_csv,pos,e - 1)
275	pos = e + 1
276	-- 5-tuple
277	event['5-tuple'], e = tuple_decode_from_csv(event_csv,pos)
278
279	return event
280end
281-- con-close, v1
282-- format: "7,time,5-tuple,orig-bytes,orig-packets,reply-bytes,reply-packets"
283local function con_close_decode_from_csv(event_csv)
284	local s,e,pos
285	local event = { event = "con-close" }
286
287	pos = 1
288	-- event id
289	s,e = string.find(event_csv,',',pos)
290	local id = tonumber(string.sub(event_csv,pos,e - 1))
291	if id ~= event_id_con_close then
292		return nil
293	end
294	pos = e + 1
295	-- time
296	s,e = string.find(event_csv,',',pos)
297	event.time = string.sub(event_csv,pos,e - 1)
298	pos = e + 1
299	-- 5-tuple
300	event['5-tuple'], e = tuple_decode_from_csv(event_csv,pos)
301	pos = e + 1
302	-- begin details
303	event.details = {}
304	-- orig-bytes
305	s,e = string.find(event_csv,',',pos)
306	event.details['orig-bytes'] = string.sub(event_csv,pos,e - 1)
307	pos = e + 1
308	-- orig-packets
309	s,e = string.find(event_csv,',',pos)
310	event.details['orig-packets'] = string.sub(event_csv,pos,e - 1)
311	pos = e + 1
312	-- reply-bytes
313	s,e = string.find(event_csv,',',pos)
314	event.details['reply-bytes'] = string.sub(event_csv,pos,e - 1)
315	pos = e + 1
316	-- reply-packets
317	event.details['reply-packets'] = string.sub(event_csv,pos)
318
319	return event
320end
321
322-- oversub update and create, v1
323-- format: "[9,10],time,uuid,flow_id,direction,policy_type,stratum,rate,delay,work"
324local function oversub_decode_from_csv(event_csv)
325	local s,e,pos
326	local event = {}
327
328	pos = 1
329	-- event id
330	s,e = string.find(event_csv,',',pos)
331	local id = tonumber(string.sub(event_csv,pos,e - 1))
332	if id ~= event_id_oversub_update and id ~= event_id_oversub_create then
333		return nil
334	end
335	pos = e + 1
336	-- time
337	s,e = string.find(event_csv,',',pos)
338	event.time = string.sub(event_csv,pos,e - 1)
339	pos = e + 1
340	-- uuid
341	s,e = string.find(event_csv,',',pos)
342	event.uuid = string.sub(event_csv,pos,e - 1)
343	pos = e + 1
344	-- flow_id
345	s,e = string.find(event_csv,',',pos)
346	event.flow_id = string.sub(event_csv,pos,e - 1)
347	pos = e + 1
348	-- begin details
349	event.details = {}
350	-- direction
351	s,e = string.find(event_csv,',',pos)
352	event.details.direction = string.sub(event_csv,pos,e - 1)
353	pos = e + 1
354	-- policy_type
355	s,e = string.find(event_csv,',',pos)
356	event.details.policy_type = string.sub(event_csv,pos,e - 1)
357	pos = e + 1
358	-- stratum
359	s,e = string.find(event_csv,',',pos)
360	event.details.stratum = string.sub(event_csv,pos,e - 1)
361	pos = e + 1
362	-- rate
363	s,e = string.find(event_csv,',',pos)
364	event.details.rate = string.sub(event_csv,pos,e - 1)
365	pos = e + 1
366	-- delay
367	s,e = string.find(event_csv,',',pos)
368	event.details.delay = string.sub(event_csv,pos,e - 1)
369	pos = e + 1
370	-- work
371	event.details.work = string.sub(event_csv,pos)
372
373	return event
374end
375-- oversub update, v1
376-- format: "9,time,uuid,flow_id,direction,policy_type,stratum,rate,delay,work"
377local function update_decode_from_csv(event_csv)
378	local event = oversub_decode_from_csv(event_csv)
379	if not event then
380		return nil
381	end
382	event.event = "update"
383	return event
384end
385-- oversub create, v1
386-- format: "10,time,uuid,flow_id,direction,policy_type,stratum,rate,delay,work"
387local function create_decode_from_csv(event_csv)
388	local event = oversub_decode_from_csv(event_csv)
389	if not event then
390		return nil
391	end
392	event.event = "create"
393	return event
394end
395
396-- lookup table that maps decode function of event types
397-- [<event_type_id>] = decode_event_type_fn
398local event_decoder_fn = {
399	[event_id_open] = open_decode_from_csv,
400	[event_id_close] = close_decode_from_csv,
401	[event_id_milestone] = milestone_decode_from_csv,
402	[event_id_classification] = classification_decode_from_csv,
403	[event_id_feature] = feature_decode_from_csv,
404	[event_id_con_open] = con_open_decode_from_csv,
405	[event_id_con_close] = con_close_decode_from_csv,
406	[event_id_oversub_update] = update_decode_from_csv,
407	[event_id_oversub_create] = create_decode_from_csv,
408}
409-- converts a single raw event entry in csv format into a lua table
410local function csv_to_table(event_csv)
411	if type(event_csv) ~= "string" then
412		return nil
413	end
414	-- the first field is always the ID of the event type
415	local s,e = string.find(event_csv,',',1)
416	local etype = string.sub(event_csv,1,e - 1)
417	local decode = event_decoder_fn[tonumber(etype)]
418	if decode then
419		return decode(event_csv)
420	end
421	return nil
422end
423-- converts a lua table containing a list of events into json
424local function table_to_json(events)
425	return cjson.encode(events)
426end
427