/*-
 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
25133808Spjd * 26133808Spjd */ 27133808Spjd 28133808Spjd#include <sys/cdefs.h> 29133808Spjd__FBSDID("$FreeBSD: releng/11.0/usr.sbin/nscd/mp_rs_query.c 194104 2009-06-13 13:07:56Z des $"); 30133808Spjd 31133808Spjd#include <sys/types.h> 32133808Spjd#include <sys/event.h> 33133808Spjd#include <sys/socket.h> 34133808Spjd#include <sys/time.h> 35133808Spjd 36133808Spjd#include <assert.h> 37133808Spjd#include <errno.h> 38133808Spjd#include <nsswitch.h> 39133808Spjd#include <stdio.h> 40133808Spjd#include <stdlib.h> 41133808Spjd#include <string.h> 42133808Spjd 43133808Spjd#include "cachelib.h" 44133808Spjd#include "config.h" 45133808Spjd#include "debug.h" 46133808Spjd#include "log.h" 47133808Spjd#include "query.h" 48133808Spjd#include "mp_rs_query.h" 49133808Spjd#include "mp_ws_query.h" 50133808Spjd#include "singletons.h" 51133808Spjd 52133808Spjdstatic int on_mp_read_session_close_notification(struct query_state *); 53133808Spjdstatic void on_mp_read_session_destroy(struct query_state *); 54143586Spjdstatic int on_mp_read_session_mapper(struct query_state *); 55143586Spjd/* int on_mp_read_session_request_read1(struct query_state *); */ 56143586Spjdstatic int on_mp_read_session_request_read2(struct query_state *); 57133808Spjdstatic int on_mp_read_session_request_process(struct query_state *); 58133808Spjdstatic int on_mp_read_session_response_write1(struct query_state *); 59133808Spjdstatic int on_mp_read_session_read_request_process(struct query_state *); 60133808Spjdstatic int on_mp_read_session_read_response_write1(struct query_state *); 61133808Spjdstatic int on_mp_read_session_read_response_write2(struct query_state *); 62133808Spjd 63134124Spjd/* 64134124Spjd * This function is used as the query_state's destroy_func to make the 65134168Spjd * proper cleanup in case of errors. 
66134168Spjd */ 67133808Spjdstatic void 68143586Spjdon_mp_read_session_destroy(struct query_state *qstate) 69143586Spjd{ 70133808Spjd TRACE_IN(on_mp_read_session_destroy); 71143586Spjd finalize_comm_element(&qstate->request); 72143586Spjd finalize_comm_element(&qstate->response); 73143586Spjd 74133808Spjd if (qstate->mdata != NULL) { 75133808Spjd configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 76133808Spjd close_cache_mp_read_session( 77133808Spjd (cache_mp_read_session)qstate->mdata); 78133808Spjd configuration_unlock_entry(qstate->config_entry, 79143586Spjd CELT_MULTIPART); 80143586Spjd } 81133808Spjd TRACE_OUT(on_mp_read_session_destroy); 82133808Spjd} 83133808Spjd 84133808Spjd/* 85133808Spjd * The functions below are used to process multipart read session initiation 86134124Spjd * requests. 87134168Spjd * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read 88133808Spjd * the request itself 89143586Spjd * - on_mp_read_session_request_process processes it 90143586Spjd * - on_mp_read_session_response_write1 sends the response 91133808Spjd */ 92143586Spjdint 93143586Spjdon_mp_read_session_request_read1(struct query_state *qstate) 94143586Spjd{ 95133808Spjd struct cache_mp_read_session_request *c_mp_rs_request; 96133808Spjd ssize_t result; 97133808Spjd 98133808Spjd TRACE_IN(on_mp_read_session_request_read1); 99143586Spjd if (qstate->kevent_watermark == 0) 100143586Spjd qstate->kevent_watermark = sizeof(size_t); 101133808Spjd else { 102133808Spjd init_comm_element(&qstate->request, 103133808Spjd CET_MP_READ_SESSION_REQUEST); 104133808Spjd c_mp_rs_request = get_cache_mp_read_session_request( 105133808Spjd &qstate->request); 106143586Spjd 107143586Spjd result = qstate->read_func(qstate, 108133808Spjd &c_mp_rs_request->entry_length, sizeof(size_t)); 109133808Spjd 110133808Spjd if (result != sizeof(size_t)) { 111133808Spjd TRACE_OUT(on_mp_read_session_request_read1); 112133808Spjd return (-1); 113133808Spjd } 114133808Spjd 
115133808Spjd if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) { 116133808Spjd TRACE_OUT(on_mp_read_session_request_read1); 117133808Spjd return (-1); 118133808Spjd } 119133808Spjd 120133808Spjd c_mp_rs_request->entry = calloc(1, 121133808Spjd c_mp_rs_request->entry_length + 1); 122133808Spjd assert(c_mp_rs_request->entry != NULL); 123133808Spjd 124133808Spjd qstate->kevent_watermark = c_mp_rs_request->entry_length; 125133808Spjd qstate->process_func = on_mp_read_session_request_read2; 126133808Spjd } 127133808Spjd TRACE_OUT(on_mp_read_session_request_read1); 128133808Spjd return (0); 129133808Spjd} 130133808Spjd 131133808Spjdstatic int 132133808Spjdon_mp_read_session_request_read2(struct query_state *qstate) 133133808Spjd{ 134133808Spjd struct cache_mp_read_session_request *c_mp_rs_request; 135133808Spjd ssize_t result; 136133808Spjd 137133808Spjd TRACE_IN(on_mp_read_session_request_read2); 138133808Spjd c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request); 139133808Spjd 140133808Spjd result = qstate->read_func(qstate, c_mp_rs_request->entry, 141133808Spjd c_mp_rs_request->entry_length); 142133808Spjd 143133808Spjd if (result < 0 || (size_t)result != qstate->kevent_watermark) { 144134168Spjd LOG_ERR_3("on_mp_read_session_request_read2", 145134124Spjd "read failed"); 146134420Spjd TRACE_OUT(on_mp_read_session_request_read2); 147134420Spjd return (-1); 148133808Spjd } 149133808Spjd 150133808Spjd qstate->kevent_watermark = 0; 151133808Spjd qstate->process_func = on_mp_read_session_request_process; 152133808Spjd TRACE_OUT(on_mp_read_session_request_read2); 153133808Spjd return (0); 154133808Spjd} 155133808Spjd 156133808Spjdstatic int 157133808Spjdon_mp_read_session_request_process(struct query_state *qstate) 158133808Spjd{ 159133808Spjd struct cache_mp_read_session_request *c_mp_rs_request; 160133808Spjd struct cache_mp_read_session_response *c_mp_rs_response; 161133808Spjd cache_mp_read_session rs; 162133808Spjd cache_entry c_entry; 163133808Spjd 
char *dec_cache_entry_name; 164133808Spjd 165133808Spjd char *buffer; 166133808Spjd size_t buffer_size; 167133808Spjd cache_mp_write_session ws; 168133808Spjd struct agent *lookup_agent; 169133808Spjd struct multipart_agent *mp_agent; 170133808Spjd void *mdata; 171133808Spjd int res; 172133808Spjd 173133808Spjd TRACE_IN(on_mp_read_session_request_process); 174133808Spjd init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE); 175133808Spjd c_mp_rs_response = get_cache_mp_read_session_response( 176147947Spjd &qstate->response); 177133808Spjd c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request); 178133808Spjd 179133808Spjd qstate->config_entry = configuration_find_entry( 180139295Spjd s_configuration, c_mp_rs_request->entry); 181133808Spjd if (qstate->config_entry == NULL) { 182133808Spjd c_mp_rs_response->error_code = ENOENT; 183133808Spjd 184133808Spjd LOG_ERR_2("read_session_request", 185133808Spjd "can't find configuration entry '%s'." 186133808Spjd " aborting request", c_mp_rs_request->entry); 187133808Spjd goto fin; 188133808Spjd } 189133808Spjd 190134124Spjd if (qstate->config_entry->enabled == 0) { 191134124Spjd c_mp_rs_response->error_code = EACCES; 192134124Spjd 193134124Spjd LOG_ERR_2("read_session_request", 194134124Spjd "configuration entry '%s' is disabled", 195134124Spjd c_mp_rs_request->entry); 196134124Spjd goto fin; 197134168Spjd } 198134168Spjd 199134168Spjd if (qstate->config_entry->perform_actual_lookups != 0) 200134168Spjd dec_cache_entry_name = strdup( 201134168Spjd qstate->config_entry->mp_cache_params.cep.entry_name); 202134168Spjd else { 203134168Spjd#ifdef NS_NSCD_EID_CHECKING 204134168Spjd if (check_query_eids(qstate) != 0) { 205134168Spjd c_mp_rs_response->error_code = EPERM; 206134168Spjd goto fin; 207134168Spjd } 208133808Spjd#endif 209133808Spjd 210133808Spjd asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str, 211133808Spjd qstate->config_entry->mp_cache_params.cep.entry_name); 212133808Spjd } 
213133808Spjd 214133808Spjd assert(dec_cache_entry_name != NULL); 215133808Spjd 216133808Spjd configuration_lock_rdlock(s_configuration); 217133808Spjd c_entry = find_cache_entry(s_cache, dec_cache_entry_name); 218133808Spjd configuration_unlock(s_configuration); 219133808Spjd 220133808Spjd if ((c_entry == INVALID_CACHE) && 221133808Spjd (qstate->config_entry->perform_actual_lookups != 0)) 222133808Spjd c_entry = register_new_mp_cache_entry(qstate, 223133808Spjd dec_cache_entry_name); 224133808Spjd 225133808Spjd free(dec_cache_entry_name); 226133808Spjd 227133808Spjd if (c_entry != INVALID_CACHE_ENTRY) { 228133808Spjd configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 229133808Spjd rs = open_cache_mp_read_session(c_entry); 230133808Spjd configuration_unlock_entry(qstate->config_entry, 231133808Spjd CELT_MULTIPART); 232133808Spjd 233133808Spjd if ((rs == INVALID_CACHE_MP_READ_SESSION) && 234133808Spjd (qstate->config_entry->perform_actual_lookups != 0)) { 235133808Spjd lookup_agent = find_agent(s_agent_table, 236133808Spjd c_mp_rs_request->entry, MULTIPART_AGENT); 237133808Spjd 238133808Spjd if ((lookup_agent != NULL) && 239133808Spjd (lookup_agent->type == MULTIPART_AGENT)) { 240133808Spjd mp_agent = (struct multipart_agent *) 241133808Spjd lookup_agent; 242133808Spjd mdata = mp_agent->mp_init_func(); 243133808Spjd 244133808Spjd /* 245133808Spjd * Multipart agents read the whole snapshot 246133808Spjd * of the data at one time. 
247133808Spjd */ 248133808Spjd configuration_lock_entry(qstate->config_entry, 249133808Spjd CELT_MULTIPART); 250133808Spjd ws = open_cache_mp_write_session(c_entry); 251133808Spjd configuration_unlock_entry(qstate->config_entry, 252133808Spjd CELT_MULTIPART); 253133808Spjd if (ws != NULL) { 254133808Spjd do { 255133808Spjd buffer = NULL; 256133808Spjd res = mp_agent->mp_lookup_func(&buffer, 257133808Spjd &buffer_size, 258133808Spjd mdata); 259133808Spjd 260133808Spjd if ((res & NS_TERMINATE) && 261133808Spjd (buffer != NULL)) { 262133808Spjd configuration_lock_entry( 263133808Spjd qstate->config_entry, 264142727Spjd CELT_MULTIPART); 265142727Spjd if (cache_mp_write(ws, buffer, 266142727Spjd buffer_size) != 0) { 267134420Spjd abandon_cache_mp_write_session(ws); 268134420Spjd ws = NULL; 269142727Spjd } 270134420Spjd configuration_unlock_entry( 271134420Spjd qstate->config_entry, 272133808Spjd CELT_MULTIPART); 273142727Spjd 274133808Spjd free(buffer); 275133808Spjd buffer = NULL; 276133808Spjd } else { 277133808Spjd configuration_lock_entry( 278133808Spjd qstate->config_entry, 279133808Spjd CELT_MULTIPART); 280133808Spjd close_cache_mp_write_session(ws); 281134539Spjd configuration_unlock_entry( 282134539Spjd qstate->config_entry, 283134539Spjd CELT_MULTIPART); 284134539Spjd 285134539Spjd free(buffer); 286134539Spjd buffer = NULL; 287134539Spjd } 288133808Spjd } while ((res & NS_TERMINATE) && 289133808Spjd (ws != NULL)); 290133808Spjd } 291133808Spjd 292133808Spjd configuration_lock_entry(qstate->config_entry, 293133808Spjd CELT_MULTIPART); 294133808Spjd rs = open_cache_mp_read_session(c_entry); 295133808Spjd configuration_unlock_entry(qstate->config_entry, 296133808Spjd CELT_MULTIPART); 297133808Spjd } 298133808Spjd } 299133808Spjd 300133808Spjd if (rs == INVALID_CACHE_MP_READ_SESSION) 301133808Spjd c_mp_rs_response->error_code = -1; 302133808Spjd else { 303133808Spjd qstate->mdata = rs; 304133808Spjd qstate->destroy_func = on_mp_read_session_destroy; 305133808Spjd 
306133808Spjd configuration_lock_entry(qstate->config_entry, 307133808Spjd CELT_MULTIPART); 308133808Spjd if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) || 309133808Spjd (qstate->config_entry->mp_query_timeout.tv_usec != 0)) 310133808Spjd memcpy(&qstate->timeout, 311133808Spjd &qstate->config_entry->mp_query_timeout, 312133808Spjd sizeof(struct timeval)); 313133808Spjd configuration_unlock_entry(qstate->config_entry, 314133808Spjd CELT_MULTIPART); 315133808Spjd } 316133808Spjd } else 317133808Spjd c_mp_rs_response->error_code = -1; 318133808Spjd 319133808Spjdfin: 320133808Spjd qstate->process_func = on_mp_read_session_response_write1; 321133808Spjd qstate->kevent_watermark = sizeof(int); 322133808Spjd qstate->kevent_filter = EVFILT_WRITE; 323133808Spjd 324133808Spjd TRACE_OUT(on_mp_read_session_request_process); 325133808Spjd return (0); 326133808Spjd} 327133808Spjd 328133808Spjdstatic int 329133808Spjdon_mp_read_session_response_write1(struct query_state *qstate) 330133808Spjd{ 331133808Spjd struct cache_mp_read_session_response *c_mp_rs_response; 332133808Spjd ssize_t result; 333133808Spjd 334133808Spjd TRACE_IN(on_mp_read_session_response_write1); 335133808Spjd c_mp_rs_response = get_cache_mp_read_session_response( 336133808Spjd &qstate->response); 337133808Spjd result = qstate->write_func(qstate, &c_mp_rs_response->error_code, 338133808Spjd sizeof(int)); 339133808Spjd 340133808Spjd if (result != sizeof(int)) { 341133808Spjd LOG_ERR_3("on_mp_read_session_response_write1", 342133808Spjd "write failed"); 343133808Spjd TRACE_OUT(on_mp_read_session_response_write1); 344133808Spjd return (-1); 345133808Spjd } 346133808Spjd 347133808Spjd if (c_mp_rs_response->error_code == 0) { 348133808Spjd qstate->kevent_watermark = sizeof(int); 349133808Spjd qstate->process_func = on_mp_read_session_mapper; 350133808Spjd qstate->kevent_filter = EVFILT_READ; 351133808Spjd } else { 352133808Spjd qstate->kevent_watermark = 0; 353133808Spjd qstate->process_func = NULL; 
354133808Spjd } 355133808Spjd TRACE_OUT(on_mp_read_session_response_write1); 356133808Spjd return (0); 357133808Spjd} 358133808Spjd 359133808Spjd/* 360133808Spjd * Mapper function is used to avoid multiple connections for each session 361133808Spjd * write or read requests. After processing the request, it does not close 362133808Spjd * the connection, but waits for the next request. 363133808Spjd */ 364133808Spjdstatic int 365133808Spjdon_mp_read_session_mapper(struct query_state *qstate) 366133808Spjd{ 367133808Spjd ssize_t result; 368133808Spjd int elem_type; 369133808Spjd 370133808Spjd TRACE_IN(on_mp_read_session_mapper); 371133808Spjd if (qstate->kevent_watermark == 0) { 372133808Spjd qstate->kevent_watermark = sizeof(int); 373133808Spjd } else { 374133808Spjd result = qstate->read_func(qstate, &elem_type, sizeof(int)); 375 if (result != sizeof(int)) { 376 LOG_ERR_3("on_mp_read_session_mapper", 377 "read failed"); 378 TRACE_OUT(on_mp_read_session_mapper); 379 return (-1); 380 } 381 382 switch (elem_type) { 383 case CET_MP_READ_SESSION_READ_REQUEST: 384 qstate->kevent_watermark = 0; 385 qstate->process_func = 386 on_mp_read_session_read_request_process; 387 break; 388 case CET_MP_READ_SESSION_CLOSE_NOTIFICATION: 389 qstate->kevent_watermark = 0; 390 qstate->process_func = 391 on_mp_read_session_close_notification; 392 break; 393 default: 394 qstate->kevent_watermark = 0; 395 qstate->process_func = NULL; 396 LOG_ERR_3("on_mp_read_session_mapper", 397 "unknown element type"); 398 TRACE_OUT(on_mp_read_session_mapper); 399 return (-1); 400 } 401 } 402 TRACE_OUT(on_mp_read_session_mapper); 403 return (0); 404} 405 406/* 407 * The functions below are used to process multipart read sessions read 408 * requests. User doesn't have to pass any kind of data, besides the 409 * request identificator itself. So we don't need any XXX_read functions and 410 * start with the XXX_process function. 
411 * - on_mp_read_session_read_request_process processes it 412 * - on_mp_read_session_read_response_write1 and 413 * on_mp_read_session_read_response_write2 sends the response 414 */ 415static int 416on_mp_read_session_read_request_process(struct query_state *qstate) 417{ 418 struct cache_mp_read_session_read_response *read_response; 419 420 TRACE_IN(on_mp_read_session_response_process); 421 init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE); 422 read_response = get_cache_mp_read_session_read_response( 423 &qstate->response); 424 425 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 426 read_response->error_code = cache_mp_read( 427 (cache_mp_read_session)qstate->mdata, NULL, 428 &read_response->data_size); 429 430 if (read_response->error_code == 0) { 431 read_response->data = malloc(read_response->data_size); 432 assert(read_response != NULL); 433 read_response->error_code = cache_mp_read( 434 (cache_mp_read_session)qstate->mdata, 435 read_response->data, 436 &read_response->data_size); 437 } 438 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 439 440 if (read_response->error_code == 0) 441 qstate->kevent_watermark = sizeof(size_t) + sizeof(int); 442 else 443 qstate->kevent_watermark = sizeof(int); 444 qstate->process_func = on_mp_read_session_read_response_write1; 445 qstate->kevent_filter = EVFILT_WRITE; 446 447 TRACE_OUT(on_mp_read_session_response_process); 448 return (0); 449} 450 451static int 452on_mp_read_session_read_response_write1(struct query_state *qstate) 453{ 454 struct cache_mp_read_session_read_response *read_response; 455 ssize_t result; 456 457 TRACE_IN(on_mp_read_session_read_response_write1); 458 read_response = get_cache_mp_read_session_read_response( 459 &qstate->response); 460 461 result = qstate->write_func(qstate, &read_response->error_code, 462 sizeof(int)); 463 if (read_response->error_code == 0) { 464 result += qstate->write_func(qstate, &read_response->data_size, 465 
sizeof(size_t)); 466 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 467 TRACE_OUT(on_mp_read_session_read_response_write1); 468 LOG_ERR_3("on_mp_read_session_read_response_write1", 469 "write failed"); 470 return (-1); 471 } 472 473 qstate->kevent_watermark = read_response->data_size; 474 qstate->process_func = on_mp_read_session_read_response_write2; 475 } else { 476 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 477 LOG_ERR_3("on_mp_read_session_read_response_write1", 478 "write failed"); 479 TRACE_OUT(on_mp_read_session_read_response_write1); 480 return (-1); 481 } 482 483 qstate->kevent_watermark = 0; 484 qstate->process_func = NULL; 485 } 486 487 TRACE_OUT(on_mp_read_session_read_response_write1); 488 return (0); 489} 490 491static int 492on_mp_read_session_read_response_write2(struct query_state *qstate) 493{ 494 struct cache_mp_read_session_read_response *read_response; 495 ssize_t result; 496 497 TRACE_IN(on_mp_read_session_read_response_write2); 498 read_response = get_cache_mp_read_session_read_response( 499 &qstate->response); 500 result = qstate->write_func(qstate, read_response->data, 501 read_response->data_size); 502 if (result < 0 || (size_t)result != qstate->kevent_watermark) { 503 LOG_ERR_3("on_mp_read_session_read_response_write2", 504 "write failed"); 505 TRACE_OUT(on_mp_read_session_read_response_write2); 506 return (-1); 507 } 508 509 finalize_comm_element(&qstate->request); 510 finalize_comm_element(&qstate->response); 511 512 qstate->kevent_watermark = sizeof(int); 513 qstate->process_func = on_mp_read_session_mapper; 514 qstate->kevent_filter = EVFILT_READ; 515 516 TRACE_OUT(on_mp_read_session_read_response_write2); 517 return (0); 518} 519 520/* 521 * Handles session close notification by calling close_cache_mp_read_session 522 * function. 
523 */ 524static int 525on_mp_read_session_close_notification(struct query_state *qstate) 526{ 527 528 TRACE_IN(on_mp_read_session_close_notification); 529 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART); 530 close_cache_mp_read_session((cache_mp_read_session)qstate->mdata); 531 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART); 532 qstate->mdata = NULL; 533 qstate->kevent_watermark = 0; 534 qstate->process_func = NULL; 535 TRACE_OUT(on_mp_read_session_close_notification); 536 return (0); 537} 538