1/*-
2 * Copyright (c) 2011, 2012, 2013, 2014 Spectra Logic Corporation
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions, and the following disclaimer,
10 *    without modification.
11 * 2. Redistributions in binary form must reproduce at minimum a disclaimer
12 *    substantially similar to the "NO WARRANTY" disclaimer below
13 *    ("Disclaimer") and any redistribution must be conditioned upon
14 *    including a substantially similar Disclaimer requirement for further
15 *    binary redistribution.
16 *
17 * NO WARRANTY
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTIBILITY AND FITNESS FOR
21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 * HOLDERS OR CONTRIBUTORS BE LIABLE FOR SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
26 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
27 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGES.
29 *
30 * Authors: Justin T. Gibbs     (Spectra Logic Corporation)
31 */
32
33/**
34 * \file consumer.cc
35 */
36
37#include <sys/cdefs.h>
38#include <sys/poll.h>
39#include <sys/socket.h>
40#include <sys/un.h>
41
42#include <err.h>
43#include <errno.h>
44#include <fcntl.h>
45#include <syslog.h>
46#include <unistd.h>
47
48#include <cstdarg>
49#include <cstring>
50#include <list>
51#include <map>
52#include <string>
53
54#include "guid.h"
55#include "event.h"
56#include "event_factory.h"
57#include "exception.h"
58
59#include "consumer.h"
60
61__FBSDID("$FreeBSD: releng/11.0/lib/libdevdctl/consumer.cc 301087 2016-05-31 23:26:45Z asomers $");
62
63/*================================== Macros ==================================*/
64#define NUM_ELEMENTS(x) (sizeof(x) / sizeof(*x))
65
66/*============================ Namespace Control =============================*/
67using std::string;
68namespace DevdCtl
69{
70
71/*============================= Class Definitions ============================*/
72/*----------------------------- DevdCtl::Consumer ----------------------------*/
73//- Consumer Static Private Data -----------------------------------------------
74const char Consumer::s_devdSockPath[] = "/var/run/devd.seqpacket.pipe";
75
76//- Consumer Public Methods ----------------------------------------------------
77Consumer::Consumer(Event::BuildMethod *defBuilder,
78		   EventFactory::Record *regEntries,
79		   size_t numEntries)
80 : m_devdSockFD(-1),
81   m_eventFactory(defBuilder),
82   m_replayingEvents(false)
83{
84	m_eventFactory.UpdateRegistry(regEntries, numEntries);
85}
86
87Consumer::~Consumer()
88{
89	DisconnectFromDevd();
90}
91
92bool
93Consumer::ConnectToDevd()
94{
95	struct sockaddr_un devdAddr;
96	int		   sLen;
97	int		   result;
98
99	if (m_devdSockFD != -1) {
100		/* Already connected. */
101		syslog(LOG_DEBUG, "%s: Already connected.", __func__);
102		return (true);
103	}
104	syslog(LOG_INFO, "%s: Connecting to devd.", __func__);
105
106	memset(&devdAddr, 0, sizeof(devdAddr));
107	devdAddr.sun_family= AF_UNIX;
108	strlcpy(devdAddr.sun_path, s_devdSockPath, sizeof(devdAddr.sun_path));
109	sLen = SUN_LEN(&devdAddr);
110
111	m_devdSockFD = socket(AF_UNIX, SOCK_SEQPACKET, 0);
112	if (m_devdSockFD == -1)
113		err(1, "Unable to create socket");
114        if (fcntl(m_devdSockFD, F_SETFL, O_NONBLOCK) < 0)
115                err(1, "fcntl");
116	result = connect(m_devdSockFD,
117			 reinterpret_cast<sockaddr *>(&devdAddr),
118			 sLen);
119	if (result == -1) {
120		syslog(LOG_INFO, "Unable to connect to devd");
121		DisconnectFromDevd();
122		return (false);
123	}
124
125	syslog(LOG_INFO, "Connection to devd successful");
126	return (true);
127}
128
129void
130Consumer::DisconnectFromDevd()
131{
132	if (m_devdSockFD != -1) {
133		syslog(LOG_INFO, "Disconnecting from devd.");
134		close(m_devdSockFD);
135	}
136	m_devdSockFD = -1;
137}
138
139std::string
140Consumer::ReadEvent()
141{
142	char buf[MAX_EVENT_SIZE + 1];
143	ssize_t len;
144
145	len = ::recv(m_devdSockFD, buf, MAX_EVENT_SIZE, MSG_WAITALL);
146	if (len == -1)
147		return (std::string(""));
148	else {
149		/* NULL-terminate the result */
150		buf[len] = '\0';
151		return (std::string(buf));
152	}
153}
154
155void
156Consumer::ReplayUnconsumedEvents(bool discardUnconsumed)
157{
158	EventList::iterator event(m_unconsumedEvents.begin());
159	bool replayed_any = (event != m_unconsumedEvents.end());
160
161	m_replayingEvents = true;
162	if (replayed_any)
163		syslog(LOG_INFO, "Started replaying unconsumed events");
164	while (event != m_unconsumedEvents.end()) {
165		bool consumed((*event)->Process());
166		if (consumed || discardUnconsumed) {
167			delete *event;
168			event = m_unconsumedEvents.erase(event);
169		} else {
170			event++;
171		}
172	}
173	if (replayed_any)
174		syslog(LOG_INFO, "Finished replaying unconsumed events");
175	m_replayingEvents = false;
176}
177
178bool
179Consumer::SaveEvent(const Event &event)
180{
181        if (m_replayingEvents)
182                return (false);
183        m_unconsumedEvents.push_back(event.DeepCopy());
184        return (true);
185}
186
187Event *
188Consumer::NextEvent()
189{
190	if (!Connected())
191		return(NULL);
192
193	Event *event(NULL);
194	try {
195		string evString;
196
197		evString = ReadEvent();
198		if (! evString.empty()) {
199			Event::TimestampEventString(evString);
200			event = Event::CreateEvent(m_eventFactory, evString);
201		}
202	} catch (const Exception &exp) {
203		exp.Log();
204		DisconnectFromDevd();
205	}
206	return (event);
207}
208
209/* Capture and process buffered events. */
210void
211Consumer::ProcessEvents()
212{
213	Event *event;
214	while ((event = NextEvent()) != NULL) {
215		if (event->Process())
216			SaveEvent(*event);
217		delete event;
218	}
219}
220
221void
222Consumer::FlushEvents()
223{
224	std::string s;
225
226	do
227		s = ReadEvent();
228	while (! s.empty()) ;
229}
230
231bool
232Consumer::EventsPending()
233{
234	struct pollfd fds[1];
235	int	      result;
236
237	do {
238		fds->fd      = m_devdSockFD;
239		fds->events  = POLLIN;
240		fds->revents = 0;
241		result = poll(fds, NUM_ELEMENTS(fds), /*timeout*/0);
242	} while (result == -1 && errno == EINTR);
243
244	if (result == -1)
245		err(1, "Polling for devd events failed");
246
247	if ((fds->revents & POLLERR) != 0)
248		throw Exception("Consumer::EventsPending(): "
249				"POLLERR detected on devd socket.");
250
251	if ((fds->revents & POLLHUP) != 0)
252		throw Exception("Consumer::EventsPending(): "
253				"POLLHUP detected on devd socket.");
254
255	return ((fds->revents & POLLIN) != 0);
256}
257
258} // namespace DevdCtl
259