1133808Spjd/*-
2142727Spjd * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
3133808Spjd * All rights reserved.
4133808Spjd *
5133808Spjd * Redistribution and use in source and binary forms, with or without
6133808Spjd * modification, are permitted provided that the following conditions
7133808Spjd * are met:
8133808Spjd * 1. Redistributions of source code must retain the above copyright
9133808Spjd *    notice, this list of conditions and the following disclaimer.
10133808Spjd * 2. Redistributions in binary form must reproduce the above copyright
11133808Spjd *    notice, this list of conditions and the following disclaimer in the
12133808Spjd *    documentation and/or other materials provided with the distribution.
13133808Spjd *
14133808Spjd * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15133808Spjd * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16133808Spjd * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17133808Spjd * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18133808Spjd * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19133808Spjd * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20133808Spjd * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21133808Spjd * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22133808Spjd * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23133808Spjd * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24133808Spjd * SUCH DAMAGE.
25133808Spjd *
26133808Spjd */
27133808Spjd
28133808Spjd#include <sys/cdefs.h>
29133808Spjd__FBSDID("$FreeBSD: releng/11.0/usr.sbin/nscd/mp_rs_query.c 194104 2009-06-13 13:07:56Z des $");
30133808Spjd
31133808Spjd#include <sys/types.h>
32133808Spjd#include <sys/event.h>
33133808Spjd#include <sys/socket.h>
34133808Spjd#include <sys/time.h>
35133808Spjd
36133808Spjd#include <assert.h>
37133808Spjd#include <errno.h>
38133808Spjd#include <nsswitch.h>
39133808Spjd#include <stdio.h>
40133808Spjd#include <stdlib.h>
41133808Spjd#include <string.h>
42133808Spjd
43133808Spjd#include "cachelib.h"
44133808Spjd#include "config.h"
45133808Spjd#include "debug.h"
46133808Spjd#include "log.h"
47133808Spjd#include "query.h"
48133808Spjd#include "mp_rs_query.h"
49133808Spjd#include "mp_ws_query.h"
50133808Spjd#include "singletons.h"
51133808Spjd
52133808Spjdstatic int on_mp_read_session_close_notification(struct query_state *);
53133808Spjdstatic void on_mp_read_session_destroy(struct query_state *);
54143586Spjdstatic int on_mp_read_session_mapper(struct query_state *);
55143586Spjd/* int on_mp_read_session_request_read1(struct query_state *); */
56143586Spjdstatic int on_mp_read_session_request_read2(struct query_state *);
57133808Spjdstatic int on_mp_read_session_request_process(struct query_state *);
58133808Spjdstatic int on_mp_read_session_response_write1(struct query_state *);
59133808Spjdstatic int on_mp_read_session_read_request_process(struct query_state *);
60133808Spjdstatic int on_mp_read_session_read_response_write1(struct query_state *);
61133808Spjdstatic int on_mp_read_session_read_response_write2(struct query_state *);
62133808Spjd
63134124Spjd/*
64134124Spjd * This function is used as the query_state's destroy_func to make the
65134168Spjd * proper cleanup in case of errors.
66134168Spjd */
67133808Spjdstatic void
68143586Spjdon_mp_read_session_destroy(struct query_state *qstate)
69143586Spjd{
70133808Spjd	TRACE_IN(on_mp_read_session_destroy);
71143586Spjd	finalize_comm_element(&qstate->request);
72143586Spjd	finalize_comm_element(&qstate->response);
73143586Spjd
74133808Spjd	if (qstate->mdata != NULL) {
75133808Spjd		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
76133808Spjd		close_cache_mp_read_session(
77133808Spjd	    		(cache_mp_read_session)qstate->mdata);
78133808Spjd		configuration_unlock_entry(qstate->config_entry,
79143586Spjd			CELT_MULTIPART);
80143586Spjd	}
81133808Spjd	TRACE_OUT(on_mp_read_session_destroy);
82133808Spjd}
83133808Spjd
84133808Spjd/*
85133808Spjd * The functions below are used to process multipart read session initiation
86134124Spjd * requests.
87134168Spjd * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read
88133808Spjd *   the request itself
89143586Spjd * - on_mp_read_session_request_process processes it
90143586Spjd * - on_mp_read_session_response_write1 sends the response
91133808Spjd */
92143586Spjdint
93143586Spjdon_mp_read_session_request_read1(struct query_state *qstate)
94143586Spjd{
95133808Spjd	struct cache_mp_read_session_request	*c_mp_rs_request;
96133808Spjd	ssize_t	result;
97133808Spjd
98133808Spjd	TRACE_IN(on_mp_read_session_request_read1);
99143586Spjd	if (qstate->kevent_watermark == 0)
100143586Spjd		qstate->kevent_watermark = sizeof(size_t);
101133808Spjd	else {
102133808Spjd		init_comm_element(&qstate->request,
103133808Spjd	    		CET_MP_READ_SESSION_REQUEST);
104133808Spjd		c_mp_rs_request = get_cache_mp_read_session_request(
105133808Spjd	    		&qstate->request);
106143586Spjd
107143586Spjd		result = qstate->read_func(qstate,
108133808Spjd	    		&c_mp_rs_request->entry_length, sizeof(size_t));
109133808Spjd
110133808Spjd		if (result != sizeof(size_t)) {
111133808Spjd			TRACE_OUT(on_mp_read_session_request_read1);
112133808Spjd			return (-1);
113133808Spjd		}
114133808Spjd
115133808Spjd		if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) {
116133808Spjd			TRACE_OUT(on_mp_read_session_request_read1);
117133808Spjd			return (-1);
118133808Spjd		}
119133808Spjd
120133808Spjd		c_mp_rs_request->entry = calloc(1,
121133808Spjd			c_mp_rs_request->entry_length + 1);
122133808Spjd		assert(c_mp_rs_request->entry != NULL);
123133808Spjd
124133808Spjd		qstate->kevent_watermark = c_mp_rs_request->entry_length;
125133808Spjd		qstate->process_func = on_mp_read_session_request_read2;
126133808Spjd	}
127133808Spjd	TRACE_OUT(on_mp_read_session_request_read1);
128133808Spjd	return (0);
129133808Spjd}
130133808Spjd
131133808Spjdstatic int
132133808Spjdon_mp_read_session_request_read2(struct query_state *qstate)
133133808Spjd{
134133808Spjd	struct cache_mp_read_session_request	*c_mp_rs_request;
135133808Spjd	ssize_t	result;
136133808Spjd
137133808Spjd	TRACE_IN(on_mp_read_session_request_read2);
138133808Spjd	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
139133808Spjd
140133808Spjd	result = qstate->read_func(qstate, c_mp_rs_request->entry,
141133808Spjd		c_mp_rs_request->entry_length);
142133808Spjd
143133808Spjd	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
144134168Spjd		LOG_ERR_3("on_mp_read_session_request_read2",
145134124Spjd			"read failed");
146134420Spjd		TRACE_OUT(on_mp_read_session_request_read2);
147134420Spjd		return (-1);
148133808Spjd	}
149133808Spjd
150133808Spjd	qstate->kevent_watermark = 0;
151133808Spjd	qstate->process_func = on_mp_read_session_request_process;
152133808Spjd	TRACE_OUT(on_mp_read_session_request_read2);
153133808Spjd	return (0);
154133808Spjd}
155133808Spjd
156133808Spjdstatic int
157133808Spjdon_mp_read_session_request_process(struct query_state *qstate)
158133808Spjd{
159133808Spjd	struct cache_mp_read_session_request	*c_mp_rs_request;
160133808Spjd	struct cache_mp_read_session_response	*c_mp_rs_response;
161133808Spjd	cache_mp_read_session	rs;
162133808Spjd	cache_entry	c_entry;
163133808Spjd	char	*dec_cache_entry_name;
164133808Spjd
165133808Spjd	char *buffer;
166133808Spjd	size_t buffer_size;
167133808Spjd	cache_mp_write_session ws;
168133808Spjd	struct agent	*lookup_agent;
169133808Spjd	struct multipart_agent *mp_agent;
170133808Spjd	void *mdata;
171133808Spjd	int res;
172133808Spjd
173133808Spjd	TRACE_IN(on_mp_read_session_request_process);
174133808Spjd	init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE);
175133808Spjd	c_mp_rs_response = get_cache_mp_read_session_response(
176147947Spjd		&qstate->response);
177133808Spjd	c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
178133808Spjd
179133808Spjd	qstate->config_entry = configuration_find_entry(
180139295Spjd		s_configuration, c_mp_rs_request->entry);
181133808Spjd	if (qstate->config_entry == NULL) {
182133808Spjd		c_mp_rs_response->error_code = ENOENT;
183133808Spjd
184133808Spjd		LOG_ERR_2("read_session_request",
185133808Spjd			"can't find configuration entry '%s'."
186133808Spjd			" aborting request", c_mp_rs_request->entry);
187133808Spjd		goto fin;
188133808Spjd	}
189133808Spjd
190134124Spjd	if (qstate->config_entry->enabled == 0) {
191134124Spjd		c_mp_rs_response->error_code = EACCES;
192134124Spjd
193134124Spjd		LOG_ERR_2("read_session_request",
194134124Spjd			"configuration entry '%s' is disabled",
195134124Spjd			c_mp_rs_request->entry);
196134124Spjd		goto fin;
197134168Spjd	}
198134168Spjd
199134168Spjd	if (qstate->config_entry->perform_actual_lookups != 0)
200134168Spjd		dec_cache_entry_name = strdup(
201134168Spjd			qstate->config_entry->mp_cache_params.cep.entry_name);
202134168Spjd	else {
203134168Spjd#ifdef NS_NSCD_EID_CHECKING
204134168Spjd		if (check_query_eids(qstate) != 0) {
205134168Spjd			c_mp_rs_response->error_code = EPERM;
206134168Spjd			goto fin;
207134168Spjd		}
208133808Spjd#endif
209133808Spjd
210133808Spjd		asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
211133808Spjd			qstate->config_entry->mp_cache_params.cep.entry_name);
212133808Spjd	}
213133808Spjd
214133808Spjd	assert(dec_cache_entry_name != NULL);
215133808Spjd
216133808Spjd	configuration_lock_rdlock(s_configuration);
217133808Spjd	c_entry = find_cache_entry(s_cache, dec_cache_entry_name);
218133808Spjd	configuration_unlock(s_configuration);
219133808Spjd
220133808Spjd	if ((c_entry == INVALID_CACHE) &&
221133808Spjd	   (qstate->config_entry->perform_actual_lookups != 0))
222133808Spjd		c_entry = register_new_mp_cache_entry(qstate,
223133808Spjd			dec_cache_entry_name);
224133808Spjd
225133808Spjd	free(dec_cache_entry_name);
226133808Spjd
227133808Spjd	if (c_entry != INVALID_CACHE_ENTRY) {
228133808Spjd		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
229133808Spjd		rs = open_cache_mp_read_session(c_entry);
230133808Spjd		configuration_unlock_entry(qstate->config_entry,
231133808Spjd			CELT_MULTIPART);
232133808Spjd
233133808Spjd		if ((rs == INVALID_CACHE_MP_READ_SESSION) &&
234133808Spjd		   (qstate->config_entry->perform_actual_lookups != 0)) {
235133808Spjd			lookup_agent = find_agent(s_agent_table,
236133808Spjd				c_mp_rs_request->entry, MULTIPART_AGENT);
237133808Spjd
238133808Spjd			if ((lookup_agent != NULL) &&
239133808Spjd			(lookup_agent->type == MULTIPART_AGENT)) {
240133808Spjd				mp_agent = (struct multipart_agent *)
241133808Spjd					lookup_agent;
242133808Spjd				mdata = mp_agent->mp_init_func();
243133808Spjd
244133808Spjd				/*
245133808Spjd				 * Multipart agents read the whole snapshot
246133808Spjd				 * of the data at one time.
247133808Spjd				 */
248133808Spjd				configuration_lock_entry(qstate->config_entry,
249133808Spjd					CELT_MULTIPART);
250133808Spjd				ws = open_cache_mp_write_session(c_entry);
251133808Spjd				configuration_unlock_entry(qstate->config_entry,
252133808Spjd					CELT_MULTIPART);
253133808Spjd				if (ws != NULL) {
254133808Spjd				    do {
255133808Spjd					buffer = NULL;
256133808Spjd					res = mp_agent->mp_lookup_func(&buffer,
257133808Spjd						&buffer_size,
258133808Spjd						mdata);
259133808Spjd
260133808Spjd					if ((res & NS_TERMINATE) &&
261133808Spjd					   (buffer != NULL)) {
262133808Spjd						configuration_lock_entry(
263133808Spjd							qstate->config_entry,
264142727Spjd						   	CELT_MULTIPART);
265142727Spjd						if (cache_mp_write(ws, buffer,
266142727Spjd						    buffer_size) != 0) {
267134420Spjd							abandon_cache_mp_write_session(ws);
268134420Spjd							ws = NULL;
269142727Spjd						}
270134420Spjd						configuration_unlock_entry(
271134420Spjd							qstate->config_entry,
272133808Spjd							CELT_MULTIPART);
273142727Spjd
274133808Spjd						free(buffer);
275133808Spjd						buffer = NULL;
276133808Spjd					} else {
277133808Spjd						configuration_lock_entry(
278133808Spjd							qstate->config_entry,
279133808Spjd							CELT_MULTIPART);
280133808Spjd						close_cache_mp_write_session(ws);
281134539Spjd						configuration_unlock_entry(
282134539Spjd							qstate->config_entry,
283134539Spjd							CELT_MULTIPART);
284134539Spjd
285134539Spjd						free(buffer);
286134539Spjd						buffer = NULL;
287134539Spjd					}
288133808Spjd				    } while ((res & NS_TERMINATE) &&
289133808Spjd				    	    (ws != NULL));
290133808Spjd				}
291133808Spjd
292133808Spjd				configuration_lock_entry(qstate->config_entry,
293133808Spjd					CELT_MULTIPART);
294133808Spjd				rs = open_cache_mp_read_session(c_entry);
295133808Spjd				configuration_unlock_entry(qstate->config_entry,
296133808Spjd					CELT_MULTIPART);
297133808Spjd			}
298133808Spjd		}
299133808Spjd
300133808Spjd		if (rs == INVALID_CACHE_MP_READ_SESSION)
301133808Spjd			c_mp_rs_response->error_code = -1;
302133808Spjd		else {
303133808Spjd		    qstate->mdata = rs;
304133808Spjd		    qstate->destroy_func = on_mp_read_session_destroy;
305133808Spjd
306133808Spjd		    configuration_lock_entry(qstate->config_entry,
307133808Spjd			CELT_MULTIPART);
308133808Spjd		    if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
309133808Spjd		    (qstate->config_entry->mp_query_timeout.tv_usec != 0))
310133808Spjd			memcpy(&qstate->timeout,
311133808Spjd			    &qstate->config_entry->mp_query_timeout,
312133808Spjd			    sizeof(struct timeval));
313133808Spjd		    configuration_unlock_entry(qstate->config_entry,
314133808Spjd			CELT_MULTIPART);
315133808Spjd		}
316133808Spjd	} else
317133808Spjd		c_mp_rs_response->error_code = -1;
318133808Spjd
319133808Spjdfin:
320133808Spjd	qstate->process_func = on_mp_read_session_response_write1;
321133808Spjd	qstate->kevent_watermark = sizeof(int);
322133808Spjd	qstate->kevent_filter = EVFILT_WRITE;
323133808Spjd
324133808Spjd	TRACE_OUT(on_mp_read_session_request_process);
325133808Spjd	return (0);
326133808Spjd}
327133808Spjd
328133808Spjdstatic int
329133808Spjdon_mp_read_session_response_write1(struct query_state *qstate)
330133808Spjd{
331133808Spjd	struct cache_mp_read_session_response	*c_mp_rs_response;
332133808Spjd	ssize_t	result;
333133808Spjd
334133808Spjd	TRACE_IN(on_mp_read_session_response_write1);
335133808Spjd	c_mp_rs_response = get_cache_mp_read_session_response(
336133808Spjd		&qstate->response);
337133808Spjd	result = qstate->write_func(qstate, &c_mp_rs_response->error_code,
338133808Spjd		sizeof(int));
339133808Spjd
340133808Spjd	if (result != sizeof(int)) {
341133808Spjd		LOG_ERR_3("on_mp_read_session_response_write1",
342133808Spjd			"write failed");
343133808Spjd		TRACE_OUT(on_mp_read_session_response_write1);
344133808Spjd		return (-1);
345133808Spjd	}
346133808Spjd
347133808Spjd	if (c_mp_rs_response->error_code == 0) {
348133808Spjd		qstate->kevent_watermark = sizeof(int);
349133808Spjd		qstate->process_func = on_mp_read_session_mapper;
350133808Spjd		qstate->kevent_filter = EVFILT_READ;
351133808Spjd	} else {
352133808Spjd		qstate->kevent_watermark = 0;
353133808Spjd		qstate->process_func = NULL;
354133808Spjd	}
355133808Spjd	TRACE_OUT(on_mp_read_session_response_write1);
356133808Spjd	return (0);
357133808Spjd}
358133808Spjd
359133808Spjd/*
360133808Spjd * Mapper function is used to avoid multiple connections for each session
361133808Spjd * write or read requests. After processing the request, it does not close
362133808Spjd * the connection, but waits for the next request.
363133808Spjd */
364133808Spjdstatic int
365133808Spjdon_mp_read_session_mapper(struct query_state *qstate)
366133808Spjd{
367133808Spjd	ssize_t	result;
368133808Spjd	int elem_type;
369133808Spjd
370133808Spjd	TRACE_IN(on_mp_read_session_mapper);
371133808Spjd	if (qstate->kevent_watermark == 0) {
372133808Spjd		qstate->kevent_watermark = sizeof(int);
373133808Spjd	} else {
374133808Spjd		result = qstate->read_func(qstate, &elem_type, sizeof(int));
375		if (result != sizeof(int)) {
376			LOG_ERR_3("on_mp_read_session_mapper",
377				"read failed");
378			TRACE_OUT(on_mp_read_session_mapper);
379			return (-1);
380		}
381
382		switch (elem_type) {
383		case CET_MP_READ_SESSION_READ_REQUEST:
384			qstate->kevent_watermark = 0;
385			qstate->process_func =
386				on_mp_read_session_read_request_process;
387			break;
388		case CET_MP_READ_SESSION_CLOSE_NOTIFICATION:
389			qstate->kevent_watermark = 0;
390			qstate->process_func =
391				on_mp_read_session_close_notification;
392			break;
393		default:
394			qstate->kevent_watermark = 0;
395			qstate->process_func = NULL;
396			LOG_ERR_3("on_mp_read_session_mapper",
397				"unknown element type");
398			TRACE_OUT(on_mp_read_session_mapper);
399			return (-1);
400		}
401	}
402	TRACE_OUT(on_mp_read_session_mapper);
403	return (0);
404}
405
406/*
407 * The functions below are used to process multipart read sessions read
408 * requests. User doesn't have to pass any kind of data, besides the
409 * request identificator itself. So we don't need any XXX_read functions and
410 * start with the XXX_process function.
411 * - on_mp_read_session_read_request_process processes it
412 * - on_mp_read_session_read_response_write1 and
413 *   on_mp_read_session_read_response_write2 sends the response
414 */
415static int
416on_mp_read_session_read_request_process(struct query_state *qstate)
417{
418	struct cache_mp_read_session_read_response	*read_response;
419
420	TRACE_IN(on_mp_read_session_response_process);
421	init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE);
422	read_response = get_cache_mp_read_session_read_response(
423		&qstate->response);
424
425	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
426	read_response->error_code = cache_mp_read(
427		(cache_mp_read_session)qstate->mdata, NULL,
428		&read_response->data_size);
429
430	if (read_response->error_code == 0) {
431		read_response->data = malloc(read_response->data_size);
432		assert(read_response != NULL);
433		read_response->error_code = cache_mp_read(
434			(cache_mp_read_session)qstate->mdata,
435	    		read_response->data,
436			&read_response->data_size);
437	}
438	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
439
440	if (read_response->error_code == 0)
441		qstate->kevent_watermark = sizeof(size_t) + sizeof(int);
442	else
443		qstate->kevent_watermark = sizeof(int);
444	qstate->process_func = on_mp_read_session_read_response_write1;
445	qstate->kevent_filter = EVFILT_WRITE;
446
447	TRACE_OUT(on_mp_read_session_response_process);
448	return (0);
449}
450
451static int
452on_mp_read_session_read_response_write1(struct query_state *qstate)
453{
454	struct cache_mp_read_session_read_response	*read_response;
455	ssize_t	result;
456
457	TRACE_IN(on_mp_read_session_read_response_write1);
458	read_response = get_cache_mp_read_session_read_response(
459		&qstate->response);
460
461	result = qstate->write_func(qstate, &read_response->error_code,
462		sizeof(int));
463	if (read_response->error_code == 0) {
464		result += qstate->write_func(qstate, &read_response->data_size,
465			sizeof(size_t));
466		if (result < 0 || (size_t)result != qstate->kevent_watermark) {
467			TRACE_OUT(on_mp_read_session_read_response_write1);
468			LOG_ERR_3("on_mp_read_session_read_response_write1",
469				"write failed");
470			return (-1);
471		}
472
473		qstate->kevent_watermark = read_response->data_size;
474		qstate->process_func = on_mp_read_session_read_response_write2;
475	} else {
476		if (result < 0 || (size_t)result != qstate->kevent_watermark) {
477			LOG_ERR_3("on_mp_read_session_read_response_write1",
478				"write failed");
479			TRACE_OUT(on_mp_read_session_read_response_write1);
480			return (-1);
481		}
482
483		qstate->kevent_watermark = 0;
484		qstate->process_func = NULL;
485	}
486
487	TRACE_OUT(on_mp_read_session_read_response_write1);
488	return (0);
489}
490
491static int
492on_mp_read_session_read_response_write2(struct query_state *qstate)
493{
494	struct cache_mp_read_session_read_response *read_response;
495	ssize_t	result;
496
497	TRACE_IN(on_mp_read_session_read_response_write2);
498	read_response = get_cache_mp_read_session_read_response(
499		&qstate->response);
500	result = qstate->write_func(qstate, read_response->data,
501		read_response->data_size);
502	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
503		LOG_ERR_3("on_mp_read_session_read_response_write2",
504			"write failed");
505		TRACE_OUT(on_mp_read_session_read_response_write2);
506		return (-1);
507	}
508
509	finalize_comm_element(&qstate->request);
510	finalize_comm_element(&qstate->response);
511
512	qstate->kevent_watermark = sizeof(int);
513	qstate->process_func = on_mp_read_session_mapper;
514	qstate->kevent_filter = EVFILT_READ;
515
516	TRACE_OUT(on_mp_read_session_read_response_write2);
517	return (0);
518}
519
520/*
521 * Handles session close notification by calling close_cache_mp_read_session
522 * function.
523 */
524static int
525on_mp_read_session_close_notification(struct query_state *qstate)
526{
527
528	TRACE_IN(on_mp_read_session_close_notification);
529	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
530	close_cache_mp_read_session((cache_mp_read_session)qstate->mdata);
531	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
532	qstate->mdata = NULL;
533	qstate->kevent_watermark = 0;
534	qstate->process_func = NULL;
535	TRACE_OUT(on_mp_read_session_close_notification);
536	return (0);
537}
538