1/*-
2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 *
26 */
27
28#include <sys/cdefs.h>
29__FBSDID("$FreeBSD$");
30
31#include <sys/types.h>
32#include <sys/event.h>
33#include <sys/socket.h>
34#include <sys/time.h>
35
36#include <assert.h>
37#include <errno.h>
38#include <nsswitch.h>
39#include <stdio.h>
40#include <stdlib.h>
41#include <string.h>
42
43#include "cachelib.h"
44#include "config.h"
45#include "debug.h"
46#include "log.h"
47#include "query.h"
48#include "mp_rs_query.h"
49#include "mp_ws_query.h"
50#include "singletons.h"
51
52static int on_mp_read_session_close_notification(struct query_state *);
53static void on_mp_read_session_destroy(struct query_state *);
54static int on_mp_read_session_mapper(struct query_state *);
55/* int on_mp_read_session_request_read1(struct query_state *); */
56static int on_mp_read_session_request_read2(struct query_state *);
57static int on_mp_read_session_request_process(struct query_state *);
58static int on_mp_read_session_response_write1(struct query_state *);
59static int on_mp_read_session_read_request_process(struct query_state *);
60static int on_mp_read_session_read_response_write1(struct query_state *);
61static int on_mp_read_session_read_response_write2(struct query_state *);
62
63/*
64 * This function is used as the query_state's destroy_func to make the
65 * proper cleanup in case of errors.
66 */
67static void
68on_mp_read_session_destroy(struct query_state *qstate)
69{
70	TRACE_IN(on_mp_read_session_destroy);
71	finalize_comm_element(&qstate->request);
72	finalize_comm_element(&qstate->response);
73
74	if (qstate->mdata != NULL) {
75		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
76		close_cache_mp_read_session(
77	    		(cache_mp_read_session)qstate->mdata);
78		configuration_unlock_entry(qstate->config_entry,
79			CELT_MULTIPART);
80	}
81	TRACE_OUT(on_mp_read_session_destroy);
82}
83
84/*
85 * The functions below are used to process multipart read session initiation
86 * requests.
87 * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read
88 *   the request itself
89 * - on_mp_read_session_request_process processes it
90 * - on_mp_read_session_response_write1 sends the response
91 */
92int
93on_mp_read_session_request_read1(struct query_state *qstate)
94{
95	struct cache_mp_read_session_request	*c_mp_rs_request;
96	ssize_t	result;
97
98	TRACE_IN(on_mp_read_session_request_read1);
99	if (qstate->kevent_watermark == 0)
100		qstate->kevent_watermark = sizeof(size_t);
101	else {
102		init_comm_element(&qstate->request,
103	    		CET_MP_READ_SESSION_REQUEST);
104		c_mp_rs_request = get_cache_mp_read_session_request(
105	    		&qstate->request);
106
107		result = qstate->read_func(qstate,
108	    		&c_mp_rs_request->entry_length, sizeof(size_t));
109
110		if (result != sizeof(size_t)) {
111			TRACE_OUT(on_mp_read_session_request_read1);
112			return (-1);
113		}
114
115		if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) {
116			TRACE_OUT(on_mp_read_session_request_read1);
117			return (-1);
118		}
119
120		c_mp_rs_request->entry = calloc(1,
121			c_mp_rs_request->entry_length + 1);
122		assert(c_mp_rs_request->entry != NULL);
123
124		qstate->kevent_watermark = c_mp_rs_request->entry_length;
125		qstate->process_func = on_mp_read_session_request_read2;
126	}
127	TRACE_OUT(on_mp_read_session_request_read1);
128	return (0);
129}
130
131static int
132on_mp_read_session_request_read2(struct query_state *qstate)
133{
134	struct cache_mp_read_session_request	*c_mp_rs_request;
135	ssize_t	result;
136
137	TRACE_IN(on_mp_read_session_request_read2);
138	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
139
140	result = qstate->read_func(qstate, c_mp_rs_request->entry,
141		c_mp_rs_request->entry_length);
142
143	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
144		LOG_ERR_3("on_mp_read_session_request_read2",
145			"read failed");
146		TRACE_OUT(on_mp_read_session_request_read2);
147		return (-1);
148	}
149
150	qstate->kevent_watermark = 0;
151	qstate->process_func = on_mp_read_session_request_process;
152	TRACE_OUT(on_mp_read_session_request_read2);
153	return (0);
154}
155
156static int
157on_mp_read_session_request_process(struct query_state *qstate)
158{
159	struct cache_mp_read_session_request	*c_mp_rs_request;
160	struct cache_mp_read_session_response	*c_mp_rs_response;
161	cache_mp_read_session	rs;
162	cache_entry	c_entry;
163	char	*dec_cache_entry_name;
164
165	char *buffer;
166	size_t buffer_size;
167	cache_mp_write_session ws;
168	struct agent	*lookup_agent;
169	struct multipart_agent *mp_agent;
170	void *mdata;
171	int res;
172
173	TRACE_IN(on_mp_read_session_request_process);
174	init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE);
175	c_mp_rs_response = get_cache_mp_read_session_response(
176		&qstate->response);
177	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
178
179	qstate->config_entry = configuration_find_entry(
180		s_configuration, c_mp_rs_request->entry);
181	if (qstate->config_entry == NULL) {
182		c_mp_rs_response->error_code = ENOENT;
183
184		LOG_ERR_2("read_session_request",
185			"can't find configuration entry '%s'."
186			" aborting request", c_mp_rs_request->entry);
187		goto fin;
188	}
189
190	if (qstate->config_entry->enabled == 0) {
191		c_mp_rs_response->error_code = EACCES;
192
193		LOG_ERR_2("read_session_request",
194			"configuration entry '%s' is disabled",
195			c_mp_rs_request->entry);
196		goto fin;
197	}
198
199	if (qstate->config_entry->perform_actual_lookups != 0)
200		dec_cache_entry_name = strdup(
201			qstate->config_entry->mp_cache_params.cep.entry_name);
202	else {
203#ifdef NS_NSCD_EID_CHECKING
204		if (check_query_eids(qstate) != 0) {
205			c_mp_rs_response->error_code = EPERM;
206			goto fin;
207		}
208#endif
209
210		asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
211			qstate->config_entry->mp_cache_params.cep.entry_name);
212	}
213
214	assert(dec_cache_entry_name != NULL);
215
216	configuration_lock_rdlock(s_configuration);
217	c_entry = find_cache_entry(s_cache, dec_cache_entry_name);
218	configuration_unlock(s_configuration);
219
220	if ((c_entry == INVALID_CACHE) &&
221	   (qstate->config_entry->perform_actual_lookups != 0))
222		c_entry = register_new_mp_cache_entry(qstate,
223			dec_cache_entry_name);
224
225	free(dec_cache_entry_name);
226
227	if (c_entry != INVALID_CACHE_ENTRY) {
228		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
229		rs = open_cache_mp_read_session(c_entry);
230		configuration_unlock_entry(qstate->config_entry,
231			CELT_MULTIPART);
232
233		if ((rs == INVALID_CACHE_MP_READ_SESSION) &&
234		   (qstate->config_entry->perform_actual_lookups != 0)) {
235			lookup_agent = find_agent(s_agent_table,
236				c_mp_rs_request->entry, MULTIPART_AGENT);
237
238			if ((lookup_agent != NULL) &&
239			(lookup_agent->type == MULTIPART_AGENT)) {
240				mp_agent = (struct multipart_agent *)
241					lookup_agent;
242				mdata = mp_agent->mp_init_func();
243
244				/*
245				 * Multipart agents read the whole snapshot
246				 * of the data at one time.
247				 */
248				configuration_lock_entry(qstate->config_entry,
249					CELT_MULTIPART);
250				ws = open_cache_mp_write_session(c_entry);
251				configuration_unlock_entry(qstate->config_entry,
252					CELT_MULTIPART);
253				if (ws != NULL) {
254				    do {
255					buffer = NULL;
256					res = mp_agent->mp_lookup_func(&buffer,
257						&buffer_size,
258						mdata);
259
260					if ((res & NS_TERMINATE) &&
261					   (buffer != NULL)) {
262						configuration_lock_entry(
263							qstate->config_entry,
264						   	CELT_MULTIPART);
265						if (cache_mp_write(ws, buffer,
266						    buffer_size) != 0) {
267							abandon_cache_mp_write_session(ws);
268							ws = NULL;
269						}
270						configuration_unlock_entry(
271							qstate->config_entry,
272							CELT_MULTIPART);
273
274						free(buffer);
275						buffer = NULL;
276					} else {
277						configuration_lock_entry(
278							qstate->config_entry,
279							CELT_MULTIPART);
280						close_cache_mp_write_session(ws);
281						configuration_unlock_entry(
282							qstate->config_entry,
283							CELT_MULTIPART);
284
285						free(buffer);
286						buffer = NULL;
287					}
288				    } while ((res & NS_TERMINATE) &&
289				    	    (ws != NULL));
290				}
291
292				configuration_lock_entry(qstate->config_entry,
293					CELT_MULTIPART);
294				rs = open_cache_mp_read_session(c_entry);
295				configuration_unlock_entry(qstate->config_entry,
296					CELT_MULTIPART);
297			}
298		}
299
300		if (rs == INVALID_CACHE_MP_READ_SESSION)
301			c_mp_rs_response->error_code = -1;
302		else {
303		    qstate->mdata = rs;
304		    qstate->destroy_func = on_mp_read_session_destroy;
305
306		    configuration_lock_entry(qstate->config_entry,
307			CELT_MULTIPART);
308		    if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
309		    (qstate->config_entry->mp_query_timeout.tv_usec != 0))
310			memcpy(&qstate->timeout,
311			    &qstate->config_entry->mp_query_timeout,
312			    sizeof(struct timeval));
313		    configuration_unlock_entry(qstate->config_entry,
314			CELT_MULTIPART);
315		}
316	} else
317		c_mp_rs_response->error_code = -1;
318
319fin:
320	qstate->process_func = on_mp_read_session_response_write1;
321	qstate->kevent_watermark = sizeof(int);
322	qstate->kevent_filter = EVFILT_WRITE;
323
324	TRACE_OUT(on_mp_read_session_request_process);
325	return (0);
326}
327
328static int
329on_mp_read_session_response_write1(struct query_state *qstate)
330{
331	struct cache_mp_read_session_response	*c_mp_rs_response;
332	ssize_t	result;
333
334	TRACE_IN(on_mp_read_session_response_write1);
335	c_mp_rs_response = get_cache_mp_read_session_response(
336		&qstate->response);
337	result = qstate->write_func(qstate, &c_mp_rs_response->error_code,
338		sizeof(int));
339
340	if (result != sizeof(int)) {
341		LOG_ERR_3("on_mp_read_session_response_write1",
342			"write failed");
343		TRACE_OUT(on_mp_read_session_response_write1);
344		return (-1);
345	}
346
347	if (c_mp_rs_response->error_code == 0) {
348		qstate->kevent_watermark = sizeof(int);
349		qstate->process_func = on_mp_read_session_mapper;
350		qstate->kevent_filter = EVFILT_READ;
351	} else {
352		qstate->kevent_watermark = 0;
353		qstate->process_func = NULL;
354	}
355	TRACE_OUT(on_mp_read_session_response_write1);
356	return (0);
357}
358
359/*
360 * Mapper function is used to avoid multiple connections for each session
361 * write or read requests. After processing the request, it does not close
362 * the connection, but waits for the next request.
363 */
364static int
365on_mp_read_session_mapper(struct query_state *qstate)
366{
367	ssize_t	result;
368	int elem_type;
369
370	TRACE_IN(on_mp_read_session_mapper);
371	if (qstate->kevent_watermark == 0) {
372		qstate->kevent_watermark = sizeof(int);
373	} else {
374		result = qstate->read_func(qstate, &elem_type, sizeof(int));
375		if (result != sizeof(int)) {
376			LOG_ERR_3("on_mp_read_session_mapper",
377				"read failed");
378			TRACE_OUT(on_mp_read_session_mapper);
379			return (-1);
380		}
381
382		switch (elem_type) {
383		case CET_MP_READ_SESSION_READ_REQUEST:
384			qstate->kevent_watermark = 0;
385			qstate->process_func =
386				on_mp_read_session_read_request_process;
387			break;
388		case CET_MP_READ_SESSION_CLOSE_NOTIFICATION:
389			qstate->kevent_watermark = 0;
390			qstate->process_func =
391				on_mp_read_session_close_notification;
392			break;
393		default:
394			qstate->kevent_watermark = 0;
395			qstate->process_func = NULL;
396			LOG_ERR_3("on_mp_read_session_mapper",
397				"unknown element type");
398			TRACE_OUT(on_mp_read_session_mapper);
399			return (-1);
400		}
401	}
402	TRACE_OUT(on_mp_read_session_mapper);
403	return (0);
404}
405
/*
 * The functions below are used to process multipart read session read
 * requests. The user doesn't have to pass any kind of data besides the
 * request identifier itself, so we don't need any XXX_read functions and
 * start with the XXX_process function.
 * - on_mp_read_session_read_request_process processes it
 * - on_mp_read_session_read_response_write1 and
 *   on_mp_read_session_read_response_write2 send the response
 */
415static int
416on_mp_read_session_read_request_process(struct query_state *qstate)
417{
418	struct cache_mp_read_session_read_response	*read_response;
419
420	TRACE_IN(on_mp_read_session_response_process);
421	init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE);
422	read_response = get_cache_mp_read_session_read_response(
423		&qstate->response);
424
425	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
426	read_response->error_code = cache_mp_read(
427		(cache_mp_read_session)qstate->mdata, NULL,
428		&read_response->data_size);
429
430	if (read_response->error_code == 0) {
431		read_response->data = malloc(read_response->data_size);
432		assert(read_response != NULL);
433		read_response->error_code = cache_mp_read(
434			(cache_mp_read_session)qstate->mdata,
435	    		read_response->data,
436			&read_response->data_size);
437	}
438	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
439
440	if (read_response->error_code == 0)
441		qstate->kevent_watermark = sizeof(size_t) + sizeof(int);
442	else
443		qstate->kevent_watermark = sizeof(int);
444	qstate->process_func = on_mp_read_session_read_response_write1;
445	qstate->kevent_filter = EVFILT_WRITE;
446
447	TRACE_OUT(on_mp_read_session_response_process);
448	return (0);
449}
450
451static int
452on_mp_read_session_read_response_write1(struct query_state *qstate)
453{
454	struct cache_mp_read_session_read_response	*read_response;
455	ssize_t	result;
456
457	TRACE_IN(on_mp_read_session_read_response_write1);
458	read_response = get_cache_mp_read_session_read_response(
459		&qstate->response);
460
461	result = qstate->write_func(qstate, &read_response->error_code,
462		sizeof(int));
463	if (read_response->error_code == 0) {
464		result += qstate->write_func(qstate, &read_response->data_size,
465			sizeof(size_t));
466		if (result < 0 || (size_t)result != qstate->kevent_watermark) {
467			TRACE_OUT(on_mp_read_session_read_response_write1);
468			LOG_ERR_3("on_mp_read_session_read_response_write1",
469				"write failed");
470			return (-1);
471		}
472
473		qstate->kevent_watermark = read_response->data_size;
474		qstate->process_func = on_mp_read_session_read_response_write2;
475	} else {
476		if (result < 0 || (size_t)result != qstate->kevent_watermark) {
477			LOG_ERR_3("on_mp_read_session_read_response_write1",
478				"write failed");
479			TRACE_OUT(on_mp_read_session_read_response_write1);
480			return (-1);
481		}
482
483		qstate->kevent_watermark = 0;
484		qstate->process_func = NULL;
485	}
486
487	TRACE_OUT(on_mp_read_session_read_response_write1);
488	return (0);
489}
490
491static int
492on_mp_read_session_read_response_write2(struct query_state *qstate)
493{
494	struct cache_mp_read_session_read_response *read_response;
495	ssize_t	result;
496
497	TRACE_IN(on_mp_read_session_read_response_write2);
498	read_response = get_cache_mp_read_session_read_response(
499		&qstate->response);
500	result = qstate->write_func(qstate, read_response->data,
501		read_response->data_size);
502	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
503		LOG_ERR_3("on_mp_read_session_read_response_write2",
504			"write failed");
505		TRACE_OUT(on_mp_read_session_read_response_write2);
506		return (-1);
507	}
508
509	finalize_comm_element(&qstate->request);
510	finalize_comm_element(&qstate->response);
511
512	qstate->kevent_watermark = sizeof(int);
513	qstate->process_func = on_mp_read_session_mapper;
514	qstate->kevent_filter = EVFILT_READ;
515
516	TRACE_OUT(on_mp_read_session_read_response_write2);
517	return (0);
518}
519
520/*
521 * Handles session close notification by calling close_cache_mp_read_session
522 * function.
523 */
524static int
525on_mp_read_session_close_notification(struct query_state *qstate)
526{
527
528	TRACE_IN(on_mp_read_session_close_notification);
529	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
530	close_cache_mp_read_session((cache_mp_read_session)qstate->mdata);
531	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
532	qstate->mdata = NULL;
533	qstate->kevent_watermark = 0;
534	qstate->process_func = NULL;
535	TRACE_OUT(on_mp_read_session_close_notification);
536	return (0);
537}
538