mp_rs_query.c revision 183770
1/*-
2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 *
26 */
27
28#include <sys/cdefs.h>
29__FBSDID("$FreeBSD: head/usr.sbin/nscd/mp_rs_query.c 183770 2008-10-12 00:44:27Z delphij $");
30
31#include <sys/socket.h>
32#include <sys/time.h>
33#include <sys/types.h>
34#include <sys/event.h>
35#include <assert.h>
36#include <errno.h>
37#include <stdlib.h>
38#include <string.h>
39#include <stdio.h>
40
41#include "cachelib.h"
42#include "config.h"
43#include "debug.h"
44#include "log.h"
45#include "query.h"
46#include "mp_rs_query.h"
47#include "mp_ws_query.h"
48#include "singletons.h"
49
50static int on_mp_read_session_close_notification(struct query_state *);
51static void on_mp_read_session_destroy(struct query_state *);
52static int on_mp_read_session_mapper(struct query_state *);
53/* int on_mp_read_session_request_read1(struct query_state *); */
54static int on_mp_read_session_request_read2(struct query_state *);
55static int on_mp_read_session_request_process(struct query_state *);
56static int on_mp_read_session_response_write1(struct query_state *);
57static int on_mp_read_session_read_request_process(struct query_state *);
58static int on_mp_read_session_read_response_write1(struct query_state *);
59static int on_mp_read_session_read_response_write2(struct query_state *);
60
61/*
62 * This function is used as the query_state's destroy_func to make the
63 * proper cleanup in case of errors.
64 */
65static void
66on_mp_read_session_destroy(struct query_state *qstate)
67{
68	TRACE_IN(on_mp_read_session_destroy);
69	finalize_comm_element(&qstate->request);
70	finalize_comm_element(&qstate->response);
71
72	if (qstate->mdata != NULL) {
73		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
74		close_cache_mp_read_session(
75	    		(cache_mp_read_session)qstate->mdata);
76		configuration_unlock_entry(qstate->config_entry,
77			CELT_MULTIPART);
78	}
79	TRACE_OUT(on_mp_read_session_destroy);
80}
81
82/*
83 * The functions below are used to process multipart read session initiation
84 * requests.
85 * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read
86 *   the request itself
87 * - on_mp_read_session_request_process processes it
88 * - on_mp_read_session_response_write1 sends the response
89 */
90int
91on_mp_read_session_request_read1(struct query_state *qstate)
92{
93	struct cache_mp_read_session_request	*c_mp_rs_request;
94	ssize_t	result;
95
96	TRACE_IN(on_mp_read_session_request_read1);
97	if (qstate->kevent_watermark == 0)
98		qstate->kevent_watermark = sizeof(size_t);
99	else {
100		init_comm_element(&qstate->request,
101	    		CET_MP_READ_SESSION_REQUEST);
102		c_mp_rs_request = get_cache_mp_read_session_request(
103	    		&qstate->request);
104
105		result = qstate->read_func(qstate,
106	    		&c_mp_rs_request->entry_length, sizeof(size_t));
107
108		if (result != sizeof(size_t)) {
109			TRACE_OUT(on_mp_read_session_request_read1);
110			return (-1);
111		}
112
113		if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) {
114			TRACE_OUT(on_mp_read_session_request_read1);
115			return (-1);
116		}
117
118		c_mp_rs_request->entry = (char *)calloc(1,
119			c_mp_rs_request->entry_length + 1);
120		assert(c_mp_rs_request->entry != NULL);
121
122		qstate->kevent_watermark = c_mp_rs_request->entry_length;
123		qstate->process_func = on_mp_read_session_request_read2;
124	}
125	TRACE_OUT(on_mp_read_session_request_read1);
126	return (0);
127}
128
129static int
130on_mp_read_session_request_read2(struct query_state *qstate)
131{
132	struct cache_mp_read_session_request	*c_mp_rs_request;
133	ssize_t	result;
134
135	TRACE_IN(on_mp_read_session_request_read2);
136	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
137
138	result = qstate->read_func(qstate, c_mp_rs_request->entry,
139		c_mp_rs_request->entry_length);
140
141	if (result != qstate->kevent_watermark) {
142		LOG_ERR_3("on_mp_read_session_request_read2",
143			"read failed");
144		TRACE_OUT(on_mp_read_session_request_read2);
145		return (-1);
146	}
147
148	qstate->kevent_watermark = 0;
149	qstate->process_func = on_mp_read_session_request_process;
150	TRACE_OUT(on_mp_read_session_request_read2);
151	return (0);
152}
153
154static int
155on_mp_read_session_request_process(struct query_state *qstate)
156{
157	struct cache_mp_read_session_request	*c_mp_rs_request;
158	struct cache_mp_read_session_response	*c_mp_rs_response;
159	cache_mp_read_session	rs;
160	cache_entry	c_entry;
161	char	*dec_cache_entry_name;
162
163	char *buffer;
164	size_t buffer_size;
165	cache_mp_write_session ws;
166	struct agent	*lookup_agent;
167	struct multipart_agent *mp_agent;
168	void *mdata;
169	int res;
170
171	TRACE_IN(on_mp_read_session_request_process);
172	init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE);
173	c_mp_rs_response = get_cache_mp_read_session_response(
174		&qstate->response);
175	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
176
177	qstate->config_entry = configuration_find_entry(
178		s_configuration, c_mp_rs_request->entry);
179	if (qstate->config_entry == NULL) {
180		c_mp_rs_response->error_code = ENOENT;
181
182		LOG_ERR_2("read_session_request",
183			"can't find configuration entry '%s'."
184			" aborting request", c_mp_rs_request->entry);
185		goto fin;
186	}
187
188	if (qstate->config_entry->enabled == 0) {
189		c_mp_rs_response->error_code = EACCES;
190
191		LOG_ERR_2("read_session_request",
192			"configuration entry '%s' is disabled",
193			c_mp_rs_request->entry);
194		goto fin;
195	}
196
197	if (qstate->config_entry->perform_actual_lookups != 0)
198		dec_cache_entry_name = strdup(
199			qstate->config_entry->mp_cache_params.entry_name);
200	else {
201#ifdef NS_NSCD_EID_CHECKING
202		if (check_query_eids(qstate) != 0) {
203			c_mp_rs_response->error_code = EPERM;
204			goto fin;
205		}
206#endif
207
208		asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
209			qstate->config_entry->mp_cache_params.entry_name);
210	}
211
212	assert(dec_cache_entry_name != NULL);
213
214	configuration_lock_rdlock(s_configuration);
215	c_entry = find_cache_entry(s_cache, dec_cache_entry_name);
216	configuration_unlock(s_configuration);
217
218	if ((c_entry == INVALID_CACHE) &&
219	   (qstate->config_entry->perform_actual_lookups != 0))
220		c_entry = register_new_mp_cache_entry(qstate,
221			dec_cache_entry_name);
222
223	free(dec_cache_entry_name);
224
225	if (c_entry != INVALID_CACHE_ENTRY) {
226		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
227		rs = open_cache_mp_read_session(c_entry);
228		configuration_unlock_entry(qstate->config_entry,
229			CELT_MULTIPART);
230
231		if ((rs == INVALID_CACHE_MP_READ_SESSION) &&
232		   (qstate->config_entry->perform_actual_lookups != 0)) {
233			lookup_agent = find_agent(s_agent_table,
234				c_mp_rs_request->entry, MULTIPART_AGENT);
235
236			if ((lookup_agent != NULL) &&
237			(lookup_agent->type == MULTIPART_AGENT)) {
238				mp_agent = (struct multipart_agent *)
239					lookup_agent;
240				mdata = mp_agent->mp_init_func();
241
242				/*
243				 * Multipart agents read the whole snapshot
244				 * of the data at one time.
245				 */
246				configuration_lock_entry(qstate->config_entry,
247					CELT_MULTIPART);
248				ws = open_cache_mp_write_session(c_entry);
249				configuration_unlock_entry(qstate->config_entry,
250					CELT_MULTIPART);
251				if (ws != NULL) {
252				    do {
253					buffer = NULL;
254					res = mp_agent->mp_lookup_func(&buffer,
255						&buffer_size,
256						mdata);
257
258					if ((res & NS_TERMINATE) &&
259					   (buffer != NULL)) {
260						configuration_lock_entry(
261							qstate->config_entry,
262						   	CELT_MULTIPART);
263						if (cache_mp_write(ws, buffer,
264						    buffer_size) != 0) {
265							abandon_cache_mp_write_session(ws);
266							ws = NULL;
267						}
268						configuration_unlock_entry(
269							qstate->config_entry,
270							CELT_MULTIPART);
271
272						free(buffer);
273						buffer = NULL;
274					} else {
275						configuration_lock_entry(
276							qstate->config_entry,
277							CELT_MULTIPART);
278						close_cache_mp_write_session(ws);
279						configuration_unlock_entry(
280							qstate->config_entry,
281							CELT_MULTIPART);
282
283						free(buffer);
284						buffer = NULL;
285					}
286				    } while ((res & NS_TERMINATE) &&
287				    	    (ws != NULL));
288				}
289
290				configuration_lock_entry(qstate->config_entry,
291					CELT_MULTIPART);
292				rs = open_cache_mp_read_session(c_entry);
293				configuration_unlock_entry(qstate->config_entry,
294					CELT_MULTIPART);
295			}
296		}
297
298		if (rs == INVALID_CACHE_MP_READ_SESSION)
299			c_mp_rs_response->error_code = -1;
300		else {
301		    qstate->mdata = rs;
302		    qstate->destroy_func = on_mp_read_session_destroy;
303
304		    configuration_lock_entry(qstate->config_entry,
305			CELT_MULTIPART);
306		    if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
307		    (qstate->config_entry->mp_query_timeout.tv_usec != 0))
308			memcpy(&qstate->timeout,
309			    &qstate->config_entry->mp_query_timeout,
310			    sizeof(struct timeval));
311		    configuration_unlock_entry(qstate->config_entry,
312			CELT_MULTIPART);
313		}
314	} else
315		c_mp_rs_response->error_code = -1;
316
317fin:
318	qstate->process_func = on_mp_read_session_response_write1;
319	qstate->kevent_watermark = sizeof(int);
320	qstate->kevent_filter = EVFILT_WRITE;
321
322	TRACE_OUT(on_mp_read_session_request_process);
323	return (0);
324}
325
326static int
327on_mp_read_session_response_write1(struct query_state *qstate)
328{
329	struct cache_mp_read_session_response	*c_mp_rs_response;
330	ssize_t	result;
331
332	TRACE_IN(on_mp_read_session_response_write1);
333	c_mp_rs_response = get_cache_mp_read_session_response(
334		&qstate->response);
335	result = qstate->write_func(qstate, &c_mp_rs_response->error_code,
336		sizeof(int));
337
338	if (result != sizeof(int)) {
339		LOG_ERR_3("on_mp_read_session_response_write1",
340			"write failed");
341		TRACE_OUT(on_mp_read_session_response_write1);
342		return (-1);
343	}
344
345	if (c_mp_rs_response->error_code == 0) {
346		qstate->kevent_watermark = sizeof(int);
347		qstate->process_func = on_mp_read_session_mapper;
348		qstate->kevent_filter = EVFILT_READ;
349	} else {
350		qstate->kevent_watermark = 0;
351		qstate->process_func = NULL;
352	}
353	TRACE_OUT(on_mp_read_session_response_write1);
354	return (0);
355}
356
357/*
358 * Mapper function is used to avoid multiple connections for each session
359 * write or read requests. After processing the request, it does not close
360 * the connection, but waits for the next request.
361 */
362static int
363on_mp_read_session_mapper(struct query_state *qstate)
364{
365	ssize_t	result;
366	int elem_type;
367
368	TRACE_IN(on_mp_read_session_mapper);
369	if (qstate->kevent_watermark == 0) {
370		qstate->kevent_watermark = sizeof(int);
371	} else {
372		result = qstate->read_func(qstate, &elem_type, sizeof(int));
373		if (result != sizeof(int)) {
374			LOG_ERR_3("on_mp_read_session_mapper",
375				"read failed");
376			TRACE_OUT(on_mp_read_session_mapper);
377			return (-1);
378		}
379
380		switch (elem_type) {
381		case CET_MP_READ_SESSION_READ_REQUEST:
382			qstate->kevent_watermark = 0;
383			qstate->process_func =
384				on_mp_read_session_read_request_process;
385			break;
386		case CET_MP_READ_SESSION_CLOSE_NOTIFICATION:
387			qstate->kevent_watermark = 0;
388			qstate->process_func =
389				on_mp_read_session_close_notification;
390			break;
391		default:
392			qstate->kevent_watermark = 0;
393			qstate->process_func = NULL;
394			LOG_ERR_3("on_mp_read_session_mapper",
395				"unknown element type");
396			TRACE_OUT(on_mp_read_session_mapper);
397			return (-1);
398		}
399	}
400	TRACE_OUT(on_mp_read_session_mapper);
401	return (0);
402}
403
404/*
405 * The functions below are used to process multipart read sessions read
406 * requests. User doesn't have to pass any kind of data, besides the
407 * request identificator itself. So we don't need any XXX_read functions and
408 * start with the XXX_process function.
409 * - on_mp_read_session_read_request_process processes it
410 * - on_mp_read_session_read_response_write1 and
411 *   on_mp_read_session_read_response_write2 sends the response
412 */
413static int
414on_mp_read_session_read_request_process(struct query_state *qstate)
415{
416	struct cache_mp_read_session_read_response	*read_response;
417
418	TRACE_IN(on_mp_read_session_response_process);
419	init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE);
420	read_response = get_cache_mp_read_session_read_response(
421		&qstate->response);
422
423	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
424	read_response->error_code = cache_mp_read(
425		(cache_mp_read_session)qstate->mdata, NULL,
426		&read_response->data_size);
427
428	if (read_response->error_code == 0) {
429		read_response->data = (char *)malloc(read_response->data_size);
430		assert(read_response != NULL);
431		read_response->error_code = cache_mp_read(
432			(cache_mp_read_session)qstate->mdata,
433	    		read_response->data,
434			&read_response->data_size);
435	}
436	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
437
438	if (read_response->error_code == 0)
439		qstate->kevent_watermark = sizeof(size_t) + sizeof(int);
440	else
441		qstate->kevent_watermark = sizeof(int);
442	qstate->process_func = on_mp_read_session_read_response_write1;
443	qstate->kevent_filter = EVFILT_WRITE;
444
445	TRACE_OUT(on_mp_read_session_response_process);
446	return (0);
447}
448
449static int
450on_mp_read_session_read_response_write1(struct query_state *qstate)
451{
452	struct cache_mp_read_session_read_response	*read_response;
453	ssize_t	result;
454
455	TRACE_IN(on_mp_read_session_read_response_write1);
456	read_response = get_cache_mp_read_session_read_response(
457		&qstate->response);
458
459	result = qstate->write_func(qstate, &read_response->error_code,
460		sizeof(int));
461	if (read_response->error_code == 0) {
462		result += qstate->write_func(qstate, &read_response->data_size,
463			sizeof(size_t));
464		if (result != qstate->kevent_watermark) {
465			TRACE_OUT(on_mp_read_session_read_response_write1);
466			LOG_ERR_3("on_mp_read_session_read_response_write1",
467				"write failed");
468			return (-1);
469		}
470
471		qstate->kevent_watermark = read_response->data_size;
472		qstate->process_func = on_mp_read_session_read_response_write2;
473	} else {
474		if (result != qstate->kevent_watermark) {
475			LOG_ERR_3("on_mp_read_session_read_response_write1",
476				"write failed");
477			TRACE_OUT(on_mp_read_session_read_response_write1);
478			return (-1);
479		}
480
481		qstate->kevent_watermark = 0;
482		qstate->process_func = NULL;
483	}
484
485	TRACE_OUT(on_mp_read_session_read_response_write1);
486	return (0);
487}
488
489static int
490on_mp_read_session_read_response_write2(struct query_state *qstate)
491{
492	struct cache_mp_read_session_read_response *read_response;
493	ssize_t	result;
494
495	TRACE_IN(on_mp_read_session_read_response_write2);
496	read_response = get_cache_mp_read_session_read_response(
497		&qstate->response);
498	result = qstate->write_func(qstate, read_response->data,
499		read_response->data_size);
500	if (result != qstate->kevent_watermark) {
501		LOG_ERR_3("on_mp_read_session_read_response_write2",
502			"write failed");
503		TRACE_OUT(on_mp_read_session_read_response_write2);
504		return (-1);
505	}
506
507	finalize_comm_element(&qstate->request);
508	finalize_comm_element(&qstate->response);
509
510	qstate->kevent_watermark = sizeof(int);
511	qstate->process_func = on_mp_read_session_mapper;
512	qstate->kevent_filter = EVFILT_READ;
513
514	TRACE_OUT(on_mp_read_session_read_response_write2);
515	return (0);
516}
517
518/*
519 * Handles session close notification by calling close_cache_mp_read_session
520 * function.
521 */
522static int
523on_mp_read_session_close_notification(struct query_state *qstate)
524{
525
526	TRACE_IN(on_mp_read_session_close_notification);
527	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
528	close_cache_mp_read_session((cache_mp_read_session)qstate->mdata);
529	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
530	qstate->mdata = NULL;
531	qstate->kevent_watermark = 0;
532	qstate->process_func = NULL;
533	TRACE_OUT(on_mp_read_session_close_notification);
534	return (0);
535}
536