1158115Sume/*- 2158115Sume * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru> 3158115Sume * All rights reserved. 4158115Sume * 5158115Sume * Redistribution and use in source and binary forms, with or without 6158115Sume * modification, are permitted provided that the following conditions 7158115Sume * are met: 8158115Sume * 1. Redistributions of source code must retain the above copyright 9158115Sume * notice, this list of conditions and the following disclaimer. 10158115Sume * 2. Redistributions in binary form must reproduce the above copyright 11158115Sume * notice, this list of conditions and the following disclaimer in the 12158115Sume * documentation and/or other materials provided with the distribution. 13158115Sume * 14158115Sume * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15158115Sume * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16158115Sume * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17158115Sume * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18158115Sume * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19158115Sume * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20158115Sume * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21158115Sume * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22158115Sume * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23158115Sume * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24158115Sume * SUCH DAMAGE. 25158115Sume * 26158115Sume */ 27158115Sume 28158115Sume#include <sys/cdefs.h> 29158115Sume__FBSDID("$FreeBSD$"); 30158115Sume 31194089Sdes#include <sys/types.h> 32194089Sdes#include <sys/event.h> 33158115Sume#include <sys/socket.h> 34158115Sume#include <sys/time.h> 35194089Sdes 36158115Sume#include <assert.h> 37158115Sume#include <errno.h> 38194089Sdes#include <nsswitch.h> 39194089Sdes#include <stdio.h> 40158115Sume#include <stdlib.h> 41158115Sume#include <string.h> 42158115Sume 43158115Sume#include "cachelib.h" 44158115Sume#include "config.h" 45158115Sume#include "debug.h" 46158115Sume#include "log.h" 47158115Sume#include "query.h" 48158115Sume#include "mp_rs_query.h" 49158115Sume#include "mp_ws_query.h" 50158115Sume#include "singletons.h" 51158115Sume 52158115Sumestatic int on_mp_read_session_close_notification(struct query_state *); 53158115Sumestatic void on_mp_read_session_destroy(struct query_state *); 54158115Sumestatic int on_mp_read_session_mapper(struct query_state *); 55158115Sume/* int on_mp_read_session_request_read1(struct query_state *); */ 56158115Sumestatic int on_mp_read_session_request_read2(struct query_state *); 57158115Sumestatic int on_mp_read_session_request_process(struct query_state *); 58158115Sumestatic int on_mp_read_session_response_write1(struct query_state *); 59158115Sumestatic int on_mp_read_session_read_request_process(struct query_state *); 60158115Sumestatic int on_mp_read_session_read_response_write1(struct query_state *); 61158115Sumestatic int on_mp_read_session_read_response_write2(struct query_state *); 62158115Sume 63158115Sume/* 64158115Sume * This function is used as the query_state's destroy_func to make the 65158115Sume * proper cleanup in case of errors. 66158115Sume */ 67158115Sumestatic void 68158115Sumeon_mp_read_session_destroy(struct query_state *qstate) 69158115Sume{ 70158115Sume TRACE_IN(on_mp_read_session_destroy); 71158115Sume finalize_comm_element(&qstate->request); 72158115Sume finalize_comm_element(&qstate->response); 73158115Sume 74158115Sume if (qstate->mdata != NULL) { 75158115Sume configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 76158115Sume close_cache_mp_read_session( 77158115Sume (cache_mp_read_session)qstate->mdata); 78158115Sume configuration_unlock_entry(qstate->config_entry, 79158115Sume CELT_MULTIPART); 80158115Sume } 81158115Sume TRACE_OUT(on_mp_read_session_destroy); 82158115Sume} 83158115Sume 84158115Sume/* 85158115Sume * The functions below are used to process multipart read session initiation 86158115Sume * requests. 87158115Sume * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read 88158115Sume * the request itself 89158115Sume * - on_mp_read_session_request_process processes it 90158115Sume * - on_mp_read_session_response_write1 sends the response 91158115Sume */ 92158115Sumeint 93158115Sumeon_mp_read_session_request_read1(struct query_state *qstate) 94158115Sume{ 95158115Sume struct cache_mp_read_session_request *c_mp_rs_request; 96158115Sume ssize_t result; 97158115Sume 98158115Sume TRACE_IN(on_mp_read_session_request_read1); 99158115Sume if (qstate->kevent_watermark == 0) 100158115Sume qstate->kevent_watermark = sizeof(size_t); 101158115Sume else { 102158115Sume init_comm_element(&qstate->request, 103158115Sume CET_MP_READ_SESSION_REQUEST); 104158115Sume c_mp_rs_request = get_cache_mp_read_session_request( 105158115Sume &qstate->request); 106158115Sume 107158115Sume result = qstate->read_func(qstate, 108158115Sume &c_mp_rs_request->entry_length, sizeof(size_t)); 109158115Sume 110158115Sume if (result != sizeof(size_t)) { 111158115Sume TRACE_OUT(on_mp_read_session_request_read1); 112158115Sume return (-1); 113158115Sume } 114158115Sume 115158115Sume if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) { 116158115Sume TRACE_OUT(on_mp_read_session_request_read1); 117158115Sume return (-1); 118158115Sume } 119158115Sume 120194104Sdes c_mp_rs_request->entry = calloc(1, 121158115Sume c_mp_rs_request->entry_length + 1); 122158115Sume assert(c_mp_rs_request->entry != NULL); 123158115Sume 124158115Sume qstate->kevent_watermark = c_mp_rs_request->entry_length; 125158115Sume qstate->process_func = on_mp_read_session_request_read2; 126158115Sume } 127158115Sume TRACE_OUT(on_mp_read_session_request_read1); 128158115Sume return (0); 129158115Sume} 130158115Sume 131158115Sumestatic int 132158115Sumeon_mp_read_session_request_read2(struct query_state *qstate) 133158115Sume{ 134158115Sume struct cache_mp_read_session_request *c_mp_rs_request; 135158115Sume ssize_t result; 136158115Sume 137158115Sume TRACE_IN(on_mp_read_session_request_read2); 138158115Sume c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request); 139158115Sume 140158115Sume result = qstate->read_func(qstate, c_mp_rs_request->entry, 141158115Sume c_mp_rs_request->entry_length); 142158115Sume 143194096Sdes if (result < 0 || (size_t)result != qstate->kevent_watermark) { 144158115Sume LOG_ERR_3("on_mp_read_session_request_read2", 145158115Sume "read failed"); 146158115Sume TRACE_OUT(on_mp_read_session_request_read2); 147158115Sume return (-1); 148158115Sume } 149158115Sume 150158115Sume qstate->kevent_watermark = 0; 151158115Sume qstate->process_func = on_mp_read_session_request_process; 152158115Sume TRACE_OUT(on_mp_read_session_request_read2); 153158115Sume return (0); 154158115Sume} 155158115Sume 156158115Sumestatic int 157158115Sumeon_mp_read_session_request_process(struct query_state *qstate) 158158115Sume{ 159158115Sume struct cache_mp_read_session_request *c_mp_rs_request; 160158115Sume struct cache_mp_read_session_response *c_mp_rs_response; 161158115Sume cache_mp_read_session rs; 162158115Sume cache_entry c_entry; 163158115Sume char *dec_cache_entry_name; 164158115Sume 165158115Sume char *buffer; 166158115Sume size_t buffer_size; 167158115Sume cache_mp_write_session ws; 168158115Sume struct agent *lookup_agent; 169158115Sume struct multipart_agent *mp_agent; 170158115Sume void *mdata; 171158115Sume int res; 172158115Sume 173158115Sume TRACE_IN(on_mp_read_session_request_process); 174158115Sume init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE); 175158115Sume c_mp_rs_response = get_cache_mp_read_session_response( 176158115Sume &qstate->response); 177158115Sume c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request); 178158115Sume 179158115Sume qstate->config_entry = configuration_find_entry( 180158115Sume s_configuration, c_mp_rs_request->entry); 181158115Sume if (qstate->config_entry == NULL) { 182158115Sume c_mp_rs_response->error_code = ENOENT; 183158115Sume 184158115Sume LOG_ERR_2("read_session_request", 185158115Sume "can't find configuration entry '%s'." 186158115Sume " aborting request", c_mp_rs_request->entry); 187158115Sume goto fin; 188158115Sume } 189158115Sume 190158115Sume if (qstate->config_entry->enabled == 0) { 191158115Sume c_mp_rs_response->error_code = EACCES; 192158115Sume 193158115Sume LOG_ERR_2("read_session_request", 194158115Sume "configuration entry '%s' is disabled", 195158115Sume c_mp_rs_request->entry); 196158115Sume goto fin; 197158115Sume } 198158115Sume 199158115Sume if (qstate->config_entry->perform_actual_lookups != 0) 200158115Sume dec_cache_entry_name = strdup( 201194097Sdes qstate->config_entry->mp_cache_params.cep.entry_name); 202158115Sume else { 203171795Sbushman#ifdef NS_NSCD_EID_CHECKING 204158115Sume if (check_query_eids(qstate) != 0) { 205158115Sume c_mp_rs_response->error_code = EPERM; 206158115Sume goto fin; 207158115Sume } 208158115Sume#endif 209158115Sume 210158115Sume asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str, 211194097Sdes qstate->config_entry->mp_cache_params.cep.entry_name); 212158115Sume } 213158115Sume 214158115Sume assert(dec_cache_entry_name != NULL); 215158115Sume 216158115Sume configuration_lock_rdlock(s_configuration); 217158115Sume c_entry = find_cache_entry(s_cache, dec_cache_entry_name); 218158115Sume configuration_unlock(s_configuration); 219158115Sume 220158115Sume if ((c_entry == INVALID_CACHE) && 221158115Sume (qstate->config_entry->perform_actual_lookups != 0)) 222158115Sume c_entry = register_new_mp_cache_entry(qstate, 223158115Sume dec_cache_entry_name); 224158115Sume 225158115Sume free(dec_cache_entry_name); 226158115Sume 227158115Sume if (c_entry != INVALID_CACHE_ENTRY) { 228158115Sume configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 229158115Sume rs = open_cache_mp_read_session(c_entry); 230158115Sume configuration_unlock_entry(qstate->config_entry, 231158115Sume CELT_MULTIPART); 232158115Sume 233158115Sume if ((rs == INVALID_CACHE_MP_READ_SESSION) && 234158115Sume (qstate->config_entry->perform_actual_lookups != 0)) { 235158115Sume lookup_agent = find_agent(s_agent_table, 236158115Sume c_mp_rs_request->entry, MULTIPART_AGENT); 237158115Sume 238158115Sume if ((lookup_agent != NULL) && 239158115Sume (lookup_agent->type == MULTIPART_AGENT)) { 240158115Sume mp_agent = (struct multipart_agent *) 241158115Sume lookup_agent; 242158115Sume mdata = mp_agent->mp_init_func(); 243158115Sume 244158115Sume /* 245158115Sume * Multipart agents read the whole snapshot 246158115Sume * of the data at one time. 247158115Sume */ 248158115Sume configuration_lock_entry(qstate->config_entry, 249158115Sume CELT_MULTIPART); 250158115Sume ws = open_cache_mp_write_session(c_entry); 251158115Sume configuration_unlock_entry(qstate->config_entry, 252158115Sume CELT_MULTIPART); 253158115Sume if (ws != NULL) { 254158115Sume do { 255158115Sume buffer = NULL; 256158115Sume res = mp_agent->mp_lookup_func(&buffer, 257158115Sume &buffer_size, 258158115Sume mdata); 259158115Sume 260158115Sume if ((res & NS_TERMINATE) && 261158115Sume (buffer != NULL)) { 262158115Sume configuration_lock_entry( 263158115Sume qstate->config_entry, 264158115Sume CELT_MULTIPART); 265158115Sume if (cache_mp_write(ws, buffer, 266158115Sume buffer_size) != 0) { 267158115Sume abandon_cache_mp_write_session(ws); 268158115Sume ws = NULL; 269158115Sume } 270158115Sume configuration_unlock_entry( 271158115Sume qstate->config_entry, 272158115Sume CELT_MULTIPART); 273158115Sume 274158115Sume free(buffer); 275158115Sume buffer = NULL; 276158115Sume } else { 277158115Sume configuration_lock_entry( 278158115Sume qstate->config_entry, 279158115Sume CELT_MULTIPART); 280158115Sume close_cache_mp_write_session(ws); 281158115Sume configuration_unlock_entry( 282158115Sume qstate->config_entry, 283158115Sume CELT_MULTIPART); 284158115Sume 285158115Sume free(buffer); 286158115Sume buffer = NULL; 287158115Sume } 288158115Sume } while ((res & NS_TERMINATE) && 289158115Sume (ws != NULL)); 290158115Sume } 291158115Sume 292158115Sume configuration_lock_entry(qstate->config_entry, 293158115Sume CELT_MULTIPART); 294158115Sume rs = open_cache_mp_read_session(c_entry); 295158115Sume configuration_unlock_entry(qstate->config_entry, 296158115Sume CELT_MULTIPART); 297158115Sume } 298158115Sume } 299158115Sume 300158115Sume if (rs == INVALID_CACHE_MP_READ_SESSION) 301158115Sume c_mp_rs_response->error_code = -1; 302158115Sume else { 303158115Sume qstate->mdata = rs; 304158115Sume qstate->destroy_func = on_mp_read_session_destroy; 305158115Sume 306158115Sume configuration_lock_entry(qstate->config_entry, 307158115Sume CELT_MULTIPART); 308158115Sume if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) || 309158115Sume (qstate->config_entry->mp_query_timeout.tv_usec != 0)) 310158115Sume memcpy(&qstate->timeout, 311158115Sume &qstate->config_entry->mp_query_timeout, 312158115Sume sizeof(struct timeval)); 313158115Sume configuration_unlock_entry(qstate->config_entry, 314158115Sume CELT_MULTIPART); 315158115Sume } 316158115Sume } else 317158115Sume c_mp_rs_response->error_code = -1; 318158115Sume 319158115Sumefin: 320158115Sume qstate->process_func = on_mp_read_session_response_write1; 321158115Sume qstate->kevent_watermark = sizeof(int); 322158115Sume qstate->kevent_filter = EVFILT_WRITE; 323158115Sume 324158115Sume TRACE_OUT(on_mp_read_session_request_process); 325158115Sume return (0); 326158115Sume} 327158115Sume 328158115Sumestatic int 329158115Sumeon_mp_read_session_response_write1(struct query_state *qstate) 330158115Sume{ 331158115Sume struct cache_mp_read_session_response *c_mp_rs_response; 332158115Sume ssize_t result; 333158115Sume 334158115Sume TRACE_IN(on_mp_read_session_response_write1); 335158115Sume c_mp_rs_response = get_cache_mp_read_session_response( 336158115Sume &qstate->response); 337158115Sume result = qstate->write_func(qstate, &c_mp_rs_response->error_code, 338158115Sume sizeof(int)); 339158115Sume 340158115Sume if (result != sizeof(int)) { 341158115Sume LOG_ERR_3("on_mp_read_session_response_write1", 342158115Sume "write failed"); 343158115Sume TRACE_OUT(on_mp_read_session_response_write1); 344158115Sume return (-1); 345158115Sume } 346158115Sume 347158115Sume if (c_mp_rs_response->error_code == 0) { 348158115Sume qstate->kevent_watermark = sizeof(int); 349158115Sume qstate->process_func = on_mp_read_session_mapper; 350158115Sume qstate->kevent_filter = EVFILT_READ; 351158115Sume } else { 352158115Sume qstate->kevent_watermark = 0; 353158115Sume qstate->process_func = NULL; 354158115Sume } 355158115Sume TRACE_OUT(on_mp_read_session_response_write1); 356158115Sume return (0); 357158115Sume} 358158115Sume 359158115Sume/* 360158115Sume * Mapper function is used to avoid multiple connections for each session 361158115Sume * write or read requests. After processing the request, it does not close 362158115Sume * the connection, but waits for the next request. 363158115Sume */ 364158115Sumestatic int 365158115Sumeon_mp_read_session_mapper(struct query_state *qstate) 366158115Sume{ 367158115Sume ssize_t result; 368158115Sume int elem_type; 369158115Sume 370158115Sume TRACE_IN(on_mp_read_session_mapper); 371158115Sume if (qstate->kevent_watermark == 0) { 372158115Sume qstate->kevent_watermark = sizeof(int); 373158115Sume } else { 374158115Sume result = qstate->read_func(qstate, &elem_type, sizeof(int)); 375158115Sume if (result != sizeof(int)) { 376158115Sume LOG_ERR_3("on_mp_read_session_mapper", 377158115Sume "read failed"); 378158115Sume TRACE_OUT(on_mp_read_session_mapper); 379158115Sume return (-1); 380158115Sume } 381158115Sume 382158115Sume switch (elem_type) { 383158115Sume case CET_MP_READ_SESSION_READ_REQUEST: 384158115Sume qstate->kevent_watermark = 0; 385158115Sume qstate->process_func = 386158115Sume on_mp_read_session_read_request_process; 387158115Sume break; 388158115Sume case CET_MP_READ_SESSION_CLOSE_NOTIFICATION: 389158115Sume qstate->kevent_watermark = 0; 390158115Sume qstate->process_func = 391158115Sume on_mp_read_session_close_notification; 392158115Sume break; 393158115Sume default: 394158115Sume qstate->kevent_watermark = 0; 395158115Sume qstate->process_func = NULL; 396158115Sume LOG_ERR_3("on_mp_read_session_mapper", 397158115Sume "unknown element type"); 398158115Sume TRACE_OUT(on_mp_read_session_mapper); 399158115Sume return (-1); 400158115Sume } 401158115Sume } 402158115Sume TRACE_OUT(on_mp_read_session_mapper); 403158115Sume return (0); 404158115Sume} 405158115Sume 406158115Sume/* 407158115Sume * The functions below are used to process multipart read sessions read 408158115Sume * requests. User doesn't have to pass any kind of data, besides the 409158115Sume * request identificator itself. So we don't need any XXX_read functions and 410158115Sume * start with the XXX_process function. 411158115Sume * - on_mp_read_session_read_request_process processes it 412158115Sume * - on_mp_read_session_read_response_write1 and 413158115Sume * on_mp_read_session_read_response_write2 sends the response 414158115Sume */ 415158115Sumestatic int 416158115Sumeon_mp_read_session_read_request_process(struct query_state *qstate) 417158115Sume{ 418158115Sume struct cache_mp_read_session_read_response *read_response; 419158115Sume 420158115Sume TRACE_IN(on_mp_read_session_response_process); 421158115Sume init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE); 422158115Sume read_response = get_cache_mp_read_session_read_response( 423158115Sume &qstate->response); 424158115Sume 425158115Sume configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 426158115Sume read_response->error_code = cache_mp_read( 427158115Sume (cache_mp_read_session)qstate->mdata, NULL, 428158115Sume &read_response->data_size); 429158115Sume 430158115Sume if (read_response->error_code == 0) { 431194104Sdes read_response->data = malloc(read_response->data_size); 432158115Sume assert(read_response != NULL); 433158115Sume read_response->error_code = cache_mp_read( 434158115Sume (cache_mp_read_session)qstate->mdata, 435158115Sume read_response->data, 436158115Sume &read_response->data_size); 437158115Sume } 438158115Sume configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 439158115Sume 440158115Sume if (read_response->error_code == 0) 441158115Sume qstate->kevent_watermark = sizeof(size_t) + sizeof(int); 442158115Sume else 443158115Sume qstate->kevent_watermark = sizeof(int); 444158115Sume qstate->process_func = on_mp_read_session_read_response_write1; 445158115Sume qstate->kevent_filter = EVFILT_WRITE; 446158115Sume 447158115Sume TRACE_OUT(on_mp_read_session_response_process); 448158115Sume return (0); 449158115Sume} 450158115Sume 451158115Sumestatic int 452158115Sumeon_mp_read_session_read_response_write1(struct query_state *qstate) 453158115Sume{ 454158115Sume struct cache_mp_read_session_read_response *read_response; 455158115Sume ssize_t result; 456158115Sume 457158115Sume TRACE_IN(on_mp_read_session_read_response_write1); 458158115Sume read_response = get_cache_mp_read_session_read_response( 459158115Sume &qstate->response); 460158115Sume 461158115Sume result = qstate->write_func(qstate, &read_response->error_code, 462158115Sume sizeof(int)); 463158115Sume if (read_response->error_code == 0) { 464158115Sume result += qstate->write_func(qstate, &read_response->data_size, 465158115Sume sizeof(size_t)); 466194096Sdes if (result < 0 || (size_t)result != qstate->kevent_watermark) { 467158115Sume TRACE_OUT(on_mp_read_session_read_response_write1); 468158115Sume LOG_ERR_3("on_mp_read_session_read_response_write1", 469158115Sume "write failed"); 470158115Sume return (-1); 471158115Sume } 472158115Sume 473158115Sume qstate->kevent_watermark = read_response->data_size; 474158115Sume qstate->process_func = on_mp_read_session_read_response_write2; 475158115Sume } else { 476194096Sdes if (result < 0 || (size_t)result != qstate->kevent_watermark) { 477158115Sume LOG_ERR_3("on_mp_read_session_read_response_write1", 478158115Sume "write failed"); 479158115Sume TRACE_OUT(on_mp_read_session_read_response_write1); 480158115Sume return (-1); 481158115Sume } 482158115Sume 483158115Sume qstate->kevent_watermark = 0; 484158115Sume qstate->process_func = NULL; 485158115Sume } 486158115Sume 487158115Sume TRACE_OUT(on_mp_read_session_read_response_write1); 488158115Sume return (0); 489158115Sume} 490158115Sume 491158115Sumestatic int 492158115Sumeon_mp_read_session_read_response_write2(struct query_state *qstate) 493158115Sume{ 494158115Sume struct cache_mp_read_session_read_response *read_response; 495158115Sume ssize_t result; 496158115Sume 497158115Sume TRACE_IN(on_mp_read_session_read_response_write2); 498158115Sume read_response = get_cache_mp_read_session_read_response( 499158115Sume &qstate->response); 500158115Sume result = qstate->write_func(qstate, read_response->data, 501158115Sume read_response->data_size); 502194096Sdes if (result < 0 || (size_t)result != qstate->kevent_watermark) { 503158115Sume LOG_ERR_3("on_mp_read_session_read_response_write2", 504158115Sume "write failed"); 505158115Sume TRACE_OUT(on_mp_read_session_read_response_write2); 506158115Sume return (-1); 507158115Sume } 508158115Sume 509158115Sume finalize_comm_element(&qstate->request); 510158115Sume finalize_comm_element(&qstate->response); 511158115Sume 512158115Sume qstate->kevent_watermark = sizeof(int); 513158115Sume qstate->process_func = on_mp_read_session_mapper; 514158115Sume qstate->kevent_filter = EVFILT_READ; 515158115Sume 516158115Sume TRACE_OUT(on_mp_read_session_read_response_write2); 517158115Sume return (0); 518158115Sume} 519158115Sume 520158115Sume/* 521158115Sume * Handles session close notification by calling close_cache_mp_read_session 522158115Sume * function. 523158115Sume */ 524158115Sumestatic int 525158115Sumeon_mp_read_session_close_notification(struct query_state *qstate) 526158115Sume{ 527158115Sume 528158115Sume TRACE_IN(on_mp_read_session_close_notification); 529158115Sume configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 530158115Sume close_cache_mp_read_session((cache_mp_read_session)qstate->mdata); 531158115Sume configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 532158115Sume qstate->mdata = NULL; 533158115Sume qstate->kevent_watermark = 0; 534158115Sume qstate->process_func = NULL; 535158115Sume TRACE_OUT(on_mp_read_session_close_notification); 536158115Sume return (0); 537158115Sume} 538