11832Swollman/*	$NetBSD$	*/
239429Sobrien
31832Swollman/*-
41832Swollman * Copyright (c)2010,2011 YAMAMOTO Takashi,
51832Swollman * All rights reserved.
632624Swpaul *
71832Swollman * Redistribution and use in source and binary forms, with or without
826208Swpaul * modification, are permitted provided that the following conditions
916123Swpaul * are met:
1026208Swpaul * 1. Redistributions of source code must retain the above copyright
1126208Swpaul *    notice, this list of conditions and the following disclaimer.
1226208Swpaul * 2. Redistributions in binary form must reproduce the above copyright
131832Swollman *    notice, this list of conditions and the following disclaimer in the
1426208Swpaul *    documentation and/or other materials provided with the distribution.
1526208Swpaul *
1626208Swpaul * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
171832Swollman * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
181832Swollman * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
191832Swollman * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
201832Swollman * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
211832Swollman * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
2229504Sbde * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
2326248Swpaul * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
2417953Speter * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
2517953Speter * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
2628542Sbde * SUCH DAMAGE.
2728524Sjmg */
2833298Sbde
2933298Sbde/*
3028524Sjmg * backend db operations
311832Swollman */
3239429Sobrien
331832Swollman#include <sys/cdefs.h>
3432551Sbde#ifndef lint
3532551Sbde__RCSID("$NetBSD$");
361832Swollman#endif /* not lint */
37
#include <assert.h>
#include <err.h>
#include <errno.h>
#include <inttypes.h>
#include <puffs.h>
#include <stdbool.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <util.h>
48
49#include <libpq-fe.h>
50
51#include "pgfs_db.h"
52#include "pgfs_waitq.h"
53#include "pgfs_debug.h"
54
/*
 * Set by pgfs_connectdb() when synchronous mode is requested.  When false,
 * every session runs "SET SESSION SYNCHRONOUS_COMMIT TO OFF" and
 * durability is requested explicitly via commit_sync()/flush_xacts(),
 * both of which assert that this is false.
 */
bool pgfs_dosync = false;

/*
 * One backend connection to the PostgreSQL server.
 */
struct Xconn {
	TAILQ_ENTRY(Xconn) list;	/* linkage on xclist */
	PGconn *conn;			/* the libpq connection */
	struct puffs_cc *blocker;	/* cc yielded in pqwait(), or NULL */
	struct puffs_cc *owner;		/* cc that got it from getxc(), or NULL */
	bool in_trans;			/* set by begin*(), cleared by relxc() */
	int id;				/* bit index into cmd prepared_mask (< 32) */
};
65
66static void
67dumperror(struct Xconn *xc, const PGresult *res)
68{
69	static const struct {
70		const char *name;
71		int code;
72	} fields[] = {
73#define F(x)	{ .name = #x, .code = x, }
74		F(PG_DIAG_SEVERITY),
75		F(PG_DIAG_SQLSTATE),
76		F(PG_DIAG_MESSAGE_PRIMARY),
77		F(PG_DIAG_MESSAGE_DETAIL),
78		F(PG_DIAG_MESSAGE_HINT),
79		F(PG_DIAG_STATEMENT_POSITION),
80		F(PG_DIAG_INTERNAL_POSITION),
81		F(PG_DIAG_INTERNAL_QUERY),
82		F(PG_DIAG_CONTEXT),
83		F(PG_DIAG_SOURCE_FILE),
84		F(PG_DIAG_SOURCE_LINE),
85		F(PG_DIAG_SOURCE_FUNCTION),
86#undef F
87	};
88	unsigned int i;
89
90	if (!pgfs_dodprintf) {
91		return;
92	}
93	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR ||
94	    PQresultStatus(res) == PGRES_FATAL_ERROR);
95	for (i = 0; i < __arraycount(fields); i++) {
96		const char *val = PQresultErrorField(res, fields[i].code);
97
98		if (val == NULL) {
99			continue;
100		}
101		fprintf(stderr, "%s: %s\n", fields[i].name, val);
102	}
103}
104
/* all backend connections opened by pgfs_connectdb() */
TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist);
/* ccs sleeping in getxc() until relxc() releases a connection */
struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq);
107
108static struct Xconn *
109getxc(struct puffs_cc *cc)
110{
111	struct Xconn *xc;
112
113	assert(cc != NULL);
114retry:
115	TAILQ_FOREACH(xc, &xclist, list) {
116		if (xc->blocker == NULL) {
117			assert(xc->owner == NULL);
118			xc->owner = cc;
119			DPRINTF("xc %p acquire %p\n", xc, cc);
120			return xc;
121		} else {
122			assert(xc->owner == xc->blocker);
123		}
124	}
125	DPRINTF("no free conn %p\n", cc);
126	waiton(&xcwaitq, cc);
127	goto retry;
128}
129
130static void
131relxc(struct Xconn *xc)
132{
133
134	assert(xc->in_trans);
135	assert(xc->owner != NULL);
136	xc->in_trans = false;
137	xc->owner = NULL;
138	wakeup_one(&xcwaitq);
139}
140
/*
 * pqwait: make sure a result can be read from xc without blocking.
 *
 * Flushes pending output (a nonzero PQflush() return is fatal).  If libpq
 * still reports the connection busy, the owning cc is recorded as the
 * blocker and yields; pgfs_readframe() schedules it again once the input
 * it consumed leaves the connection no longer busy.
 */
static void
pqwait(struct Xconn *xc)
{
	PGconn *conn = xc->conn;
	struct puffs_cc *cc = xc->owner;

	if (PQflush(conn)) {
		errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn));
	}
	if (!PQisBusy(conn)) {
		return;
	}
	assert(xc->blocker == NULL);
	/* tells pgfs_readframe() whom to schedule */
	xc->blocker = cc;
	DPRINTF("yielding %p\n", cc);
	/* XXX is it safe to yield before entering mainloop? */
	puffs_cc_yield(cc);
	DPRINTF("yield returned %p\n", cc);
	assert(xc->owner == cc);
	assert(xc->blocker == cc);
	xc->blocker = NULL;
}
163
/*
 * sqltoerrno: map a SQLSTATE code (the PG_DIAG_SQLSTATE field of a
 * result) to an errno value.
 *
 * Returns 0 for successful completion, the mapped value for the codes
 * listed below, and EIO for unknown codes or when no SQLSTATE is
 * available (sqlstate == NULL).  Aborts on a check violation, which is
 * taken to indicate a bug.
 */
static int
sqltoerrno(const char *sqlstate)
{
	/*
	 * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle
	 * "tuple concurrently updated" errors for lowrite/lo_truncate.
	 *
	 * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN?
	 */
	static const struct {
		char sqlstate[5];
		int error;
	} map[] = {
		{ "00000", 0, },	/* ERRCODE_SUCCESSFUL_COMPLETION */
		{ "02000", ENOENT, },	/* ERRCODE_NO_DATA */
		{ "23505", EEXIST, },	/* ERRCODE_UNIQUE_VIOLATION */
		{ "23514", EINVAL, },	/* ERRCODE_CHECK_VIOLATION */
		{ "40001", EAGAIN, },	/* ERRCODE_T_R_SERIALIZATION_FAILURE */
		{ "40P01", EAGAIN, },	/* ERRCODE_T_R_DEADLOCK_DETECTED */
		{ "42704", ENOENT, },	/* ERRCODE_UNDEFINED_OBJECT */
		{ "53100", ENOSPC, },	/* ERRCODE_DISK_FULL */
		{ "53200", ENOMEM, },	/* ERRCODE_OUT_OF_MEMORY */
		{ "XX000", EAGAIN, },	/* ERRCODE_INTERNAL_ERROR */
	};
	unsigned int i;

	if (sqlstate == NULL) {
		/*
		 * PQresultErrorField() returns NULL when the field is not
		 * present, e.g. for errors generated inside libpq itself.
		 */
		DPRINTF("missing sqlstate mapped to EIO\n");
		return EIO;
	}
	for (i = 0; i < __arraycount(map); i++) {
		/*
		 * map[].sqlstate is not NUL-terminated.  strncmp() reads at
		 * most 5 bytes of it and stops at the end of a short
		 * sqlstate, where memcmp() would read past it.
		 */
		if (strncmp(map[i].sqlstate, sqlstate,
		    sizeof(map[i].sqlstate)) == 0) {
			const int error = map[i].error;

			if (error != 0) {
				DPRINTF("sqlstate %5s mapped to error %d\n",
				    sqlstate, error);
			}
			if (error == EINVAL) {
				/*
				 * sounds like a bug.
				 */
				abort();
			}
			return error;
		}
	}
	DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate);
	return EIO;
}
210
/*
 * A SQL command created by createcmd().  Unless CMD_NOPREPARE is set, it
 * is prepared as a named statement lazily, once per connection, by
 * preparecmd().
 */
struct cmd {
	char name[32];		/* name of prepared statement */
	char *cmd;		/* query string */
	unsigned int nparams;	/* number of '$' characters in cmd */
	Oid *paramtypes;	/* nparams parameter type OIDs */
	uint32_t prepared_mask;	/* for which connections this is prepared? */
				/* (bit 1 << Xconn id) */
	unsigned int flags;	/* CMD_ flags */
};

#define	CMD_NOPREPARE	1	/* don't prepare this command */
221
222struct cmd *
223createcmd(const char *cmd, unsigned int flags, ...)
224{
225	struct cmd *c;
226	va_list ap;
227	const char *cp;
228	unsigned int i;
229	static unsigned int cmdid;
230
231	c = emalloc(sizeof(*c));
232	c->cmd = estrdup(cmd);
233	c->nparams = 0;
234	va_start(ap, flags);
235	for (cp = cmd; *cp != 0; cp++) {
236		if (*cp == '$') { /* XXX */
237			c->nparams++;
238		}
239	}
240	c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes));
241	for (i = 0; i < c->nparams; i++) {
242		Oid type = va_arg(ap, Oid);
243		assert(type == BYTEA ||
244		    type == INT4OID || type == INT8OID || type == OIDOID ||
245		    type == TEXTOID || type == TIMESTAMPTZOID);
246		c->paramtypes[i] = type;
247	}
248	va_end(ap);
249	snprintf(c->name, sizeof(c->name), "%u", cmdid++);
250	if ((flags & CMD_NOPREPARE) != 0) {
251		c->prepared_mask = ~0;
252	} else {
253		c->prepared_mask = 0;
254	}
255	c->flags = flags;
256	return c;
257}
258
259static void
260freecmd(struct cmd *c)
261{
262
263	free(c->paramtypes);
264	free(c->cmd);
265	free(c);
266}
267
268static int
269fetch_noresult(struct Xconn *xc)
270{
271	PGresult *res;
272	ExecStatusType status;
273	PGconn *conn = xc->conn;
274	int error;
275
276	pqwait(xc);
277	res = PQgetResult(conn);
278	if (res == NULL) {
279		return ENOENT;
280	}
281	status = PQresultStatus(res);
282	if (status == PGRES_COMMAND_OK) {
283		assert(PQnfields(res) == 0);
284		assert(PQntuples(res) == 0);
285		if (!strcmp(PQcmdTuples(res), "0")) {
286			error = ENOENT;
287		} else {
288			error = 0;
289		}
290	} else if (status == PGRES_FATAL_ERROR) {
291		error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE));
292		assert(error != 0);
293		dumperror(xc, res);
294	} else {
295		errx(1, "%s not command_ok: %d: %s", __func__,
296		    (int)status,
297		    PQerrorMessage(conn));
298	}
299	PQclear(res);
300	res = PQgetResult(conn);
301	assert(res == NULL);
302	if (error != 0) {
303		DPRINTF("error %d\n", error);
304	}
305	return error;
306}
307
308static int
309preparecmd(struct Xconn *xc, struct cmd *c)
310{
311	PGconn *conn = xc->conn;
312	const uint32_t mask = 1 << xc->id;
313	int error;
314	int ret;
315
316	if ((c->prepared_mask & mask) != 0) {
317		return 0;
318	}
319	assert((c->flags & CMD_NOPREPARE) == 0);
320	DPRINTF("PREPARE: '%s'\n", c->cmd);
321	ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes);
322	if (!ret) {
323		errx(EXIT_FAILURE, "PQsendPrepare: %s",
324		    PQerrorMessage(conn));
325	}
326	error = fetch_noresult(xc);
327	if (error != 0) {
328		return error;
329	}
330	c->prepared_mask |= mask;
331	return 0;
332}
333
334/*
335 * vsendcmd:
336 *
337 * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared.
338 * 0 for text and 1 for binary.
339 */
340
341static int
342vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap)
343{
344	PGconn *conn = xc->conn;
345	char **paramvalues;
346	int *paramlengths;
347	int *paramformats;
348	unsigned int i;
349	int error;
350	int ret;
351
352	assert(xc->owner != NULL);
353	assert(xc->blocker == NULL);
354	error = preparecmd(xc, c);
355	if (error != 0) {
356		return error;
357	}
358	paramvalues = emalloc(c->nparams * sizeof(*paramvalues));
359	paramlengths = NULL;
360	paramformats = NULL;
361	DPRINTF("CMD: '%s'\n", c->cmd);
362	for (i = 0; i < c->nparams; i++) {
363		Oid type = c->paramtypes[i];
364		char tmpstore[1024];
365		const char *buf = NULL;
366		intmax_t v = 0; /* XXXgcc */
367		int sz;
368		bool binary = false;
369
370		switch (type) {
371		case BYTEA:
372			buf = va_arg(ap, const void *);
373			sz = (int)va_arg(ap, size_t);
374			binary = true;
375			break;
376		case INT8OID:
377		case OIDOID:
378		case INT4OID:
379			switch (type) {
380			case INT8OID:
381				v = (intmax_t)va_arg(ap, int64_t);
382				break;
383			case OIDOID:
384				v = (intmax_t)va_arg(ap, Oid);
385				break;
386			case INT4OID:
387				v = (intmax_t)va_arg(ap, int32_t);
388				break;
389			default:
390				errx(EXIT_FAILURE, "unknown integer oid %u",
391				    type);
392			}
393			buf = tmpstore;
394			sz = snprintf(tmpstore, sizeof(tmpstore),
395			    "%jd", v);
396			assert(sz != -1);
397			assert((size_t)sz < sizeof(tmpstore));
398			sz += 1;
399			break;
400		case TEXTOID:
401		case TIMESTAMPTZOID:
402			buf = va_arg(ap, char *);
403			sz = strlen(buf) + 1;
404			break;
405		default:
406			errx(EXIT_FAILURE, "%s: unknown param type %u",
407			    __func__, type);
408		}
409		if (binary) {
410			if (paramlengths == NULL) {
411				paramlengths =
412				    emalloc(c->nparams * sizeof(*paramformats));
413			}
414			if (paramformats == NULL) {
415				paramformats = ecalloc(1,
416				    c->nparams * sizeof(*paramformats));
417			}
418			paramformats[i] = 1;
419			paramlengths[i] = sz;
420		}
421		paramvalues[i] = emalloc(sz);
422		memcpy(paramvalues[i], buf, sz);
423		if (binary) {
424			DPRINTF("\t[%u]=<BINARY>\n", i);
425		} else {
426			DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]);
427		}
428	}
429	if ((c->flags & CMD_NOPREPARE) != 0) {
430		ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes,
431		    (const char * const *)paramvalues, paramlengths,
432		    paramformats, resultmode);
433	} else {
434		ret = PQsendQueryPrepared(conn, c->name, c->nparams,
435		    (const char * const *)paramvalues, paramlengths,
436		    paramformats, resultmode);
437	}
438	for (i = 0; i < c->nparams; i++) {
439		free(paramvalues[i]);
440	}
441	free(paramvalues);
442	free(paramlengths);
443	free(paramformats);
444	if (!ret) {
445		errx(EXIT_FAILURE, "PQsendQueryPrepared: %s",
446		    PQerrorMessage(conn));
447	}
448	return 0;
449}
450
/*
 * sendcmd: send command c, with its parameters in the variable arguments
 * (laid out as vsendcmd() consumes them), requesting text results.
 * Returns 0 or the error from preparing the command.
 */
int
sendcmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);
	return rv;
}

/*
 * sendcmdx: like sendcmd(), but with an explicit resultmode
 * (0 for text, 1 for binary results).
 */
int
sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, resultmode, c, args);
	va_end(args);
	return rv;
}
474
475/*
476 * simplecmd: a convenient routine to execute a command which returns
477 * no rows synchronously.
478 */
479
480int
481simplecmd(struct Xconn *xc, struct cmd *c, ...)
482{
483	va_list ap;
484	int error;
485
486	va_start(ap, c);
487	error = vsendcmd(xc, 0, c, ap);
488	va_end(ap);
489	if (error != 0) {
490		return error;
491	}
492	return fetch_noresult(xc);
493}
494
495void
496fetchinit(struct fetchstatus *s, struct Xconn *xc)
497{
498	s->xc = xc;
499	s->res = NULL;
500	s->cur = 0;
501	s->nrows = 0;
502	s->done = false;
503}
504
/*
 * getint: parse a base-10 integer, as libpq returns integer columns in
 * text format.  The string must be non-empty, fully consumed and in
 * range for intmax_t; anything else trips an assertion.
 */
static intmax_t
getint(const char *str)
{
	char *end;
	intmax_t val;

	errno = 0;
	val = strtoimax(str, &end, 10);
	assert(errno == 0);
	assert(*str != '\0');
	assert(*end == '\0');
	return val;
}
518
519static int
520vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap)
521{
522	PGconn *conn = s->xc->conn;
523	unsigned int i;
524
525	assert(conn != NULL);
526	if (s->res == NULL) {
527		ExecStatusType status;
528		int error;
529
530		pqwait(s->xc);
531		s->res = PQgetResult(conn);
532		if (s->res == NULL) {
533			s->done = true;
534			return ENOENT;
535		}
536		status = PQresultStatus(s->res);
537		if (status == PGRES_FATAL_ERROR) {
538			error = sqltoerrno(
539			    PQresultErrorField(s->res, PG_DIAG_SQLSTATE));
540			assert(error != 0);
541			dumperror(s->xc, s->res);
542			return error;
543		}
544		if (status != PGRES_TUPLES_OK) {
545			errx(1, "not tuples_ok: %s",
546			    PQerrorMessage(conn));
547		}
548		assert((unsigned int)PQnfields(s->res) == n);
549		s->nrows = PQntuples(s->res);
550		if (s->nrows == 0) {
551			DPRINTF("no rows\n");
552			return ENOENT;
553		}
554		assert(s->nrows >= 1);
555		s->cur = 0;
556	}
557	for (i = 0; i < n; i++) {
558		size_t size;
559
560		assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0));
561		DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n",
562		    i, PQftype(s->res, i), types[i],
563		    PQgetisnull(s->res, s->cur, i) ? "<NULL>" :
564		    PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) :
565		    "<BINARY>");
566		assert(PQftype(s->res, i) == types[i]);
567		assert(!PQgetisnull(s->res, s->cur, i));
568		switch(types[i]) {
569		case INT8OID:
570			*va_arg(ap, int64_t *) =
571			    getint(PQgetvalue(s->res, s->cur, i));
572			break;
573		case OIDOID:
574			*va_arg(ap, Oid *) =
575			    getint(PQgetvalue(s->res, s->cur, i));
576			break;
577		case INT4OID:
578			*va_arg(ap, int32_t *) =
579			    getint(PQgetvalue(s->res, s->cur, i));
580			break;
581		case TEXTOID:
582			*va_arg(ap, char **) =
583			    estrdup(PQgetvalue(s->res, s->cur, i));
584			break;
585		case BYTEA:
586			size = PQgetlength(s->res, s->cur, i);
587			memcpy(va_arg(ap, void *),
588			    PQgetvalue(s->res, s->cur, i), size);
589			*va_arg(ap, size_t *) = size;
590			break;
591		default:
592			errx(EXIT_FAILURE, "%s unknown type %u", __func__,
593			    types[i]);
594		}
595	}
596	s->cur++;
597	if (s->cur == s->nrows) {
598		PQclear(s->res);
599		s->res = NULL;
600	}
601	return 0;
602}
603
604int
605fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...)
606{
607	va_list ap;
608	int error;
609
610	va_start(ap, types);
611	error = vfetchnext(s, n, types, ap);
612	va_end(ap);
613	return error;
614}
615
616void
617fetchdone(struct fetchstatus *s)
618{
619
620	if (s->res != NULL) {
621		PQclear(s->res);
622		s->res = NULL;
623	}
624	if (!s->done) {
625		PGresult *res;
626		unsigned int n;
627
628		n = 0;
629		while ((res = PQgetResult(s->xc->conn)) != NULL) {
630			PQclear(res);
631			n++;
632		}
633		if (n > 0) {
634			DPRINTF("%u rows dropped\n", n);
635		}
636	}
637}
638
639int
640simplefetch(struct Xconn *xc, Oid type, ...)
641{
642	struct fetchstatus s;
643	va_list ap;
644	int error;
645
646	fetchinit(&s, xc);
647	va_start(ap, type);
648	error = vfetchnext(&s, 1, &type, ap);
649	va_end(ap);
650	assert(error != 0 || s.res == NULL);
651	fetchdone(&s);
652	return error;
653}
654
655struct Xconn *
656begin(struct puffs_usermount *pu)
657{
658	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
659	static struct cmd *c;
660	int error;
661
662	CREATECMD_NOPARAM(c, "BEGIN");
663	assert(!xc->in_trans);
664	error = simplecmd(xc, c);
665	assert(error == 0);
666	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
667	xc->in_trans = true;
668	return xc;
669}
670
671struct Xconn *
672begin_readonly(struct puffs_usermount *pu)
673{
674	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
675	static struct cmd *c;
676	int error;
677
678	CREATECMD_NOPARAM(c, "BEGIN READ ONLY");
679	assert(!xc->in_trans);
680	error = simplecmd(xc, c);
681	assert(error == 0);
682	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
683	xc->in_trans = true;
684	return xc;
685}
686
687void
688rollback(struct Xconn *xc)
689{
690	PGTransactionStatusType status;
691
692	/*
693	 * check the status as we are not sure the status of our transaction
694	 * after a failed commit.
695	 */
696	status = PQtransactionStatus(xc->conn);
697	assert(status != PQTRANS_ACTIVE);
698	assert(status != PQTRANS_UNKNOWN);
699	if (status != PQTRANS_IDLE) {
700		static struct cmd *c;
701		int error;
702
703		assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR);
704		CREATECMD_NOPARAM(c, "ROLLBACK");
705		error = simplecmd(xc, c);
706		assert(error == 0);
707	}
708	DPRINTF("xc %p rollback %p\n", xc, xc->owner);
709	relxc(xc);
710}
711
712int
713commit(struct Xconn *xc)
714{
715	static struct cmd *c;
716	int error;
717
718	CREATECMD_NOPARAM(c, "COMMIT");
719	error = simplecmd(xc, c);
720	if (error == 0) {
721		DPRINTF("xc %p commit %p\n", xc, xc->owner);
722		relxc(xc);
723	}
724	return error;
725}
726
727int
728commit_sync(struct Xconn *xc)
729{
730	static struct cmd *c;
731	int error;
732
733	assert(!pgfs_dosync);
734	CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON");
735	error = simplecmd(xc, c);
736	assert(error == 0);
737	return commit(xc);
738}
739
740static void
741pgfs_notice_receiver(void *vp, const PGresult *res)
742{
743	struct Xconn *xc = vp;
744
745	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR);
746	fprintf(stderr, "got a notice on %p\n", xc);
747	dumperror(xc, res);
748}
749
750static int
751pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf,
752    int fd, int *done)
753{
754	struct Xconn *xc;
755	PGconn *conn;
756
757	TAILQ_FOREACH(xc, &xclist, list) {
758		if (PQsocket(xc->conn) == fd) {
759			break;
760		}
761	}
762	assert(xc != NULL);
763	conn = xc->conn;
764	PQconsumeInput(conn);
765	if (!PQisBusy(conn)) {
766		if (xc->blocker != NULL) {
767			DPRINTF("schedule %p\n", xc->blocker);
768			puffs_cc_schedule(xc->blocker);
769		} else {
770			DPRINTF("no blockers\n");
771		}
772	}
773	*done = 0;
774	return 0;
775}
776
777int
778pgfs_connectdb(struct puffs_usermount *pu, const char *dbname,
779    const char *dbuser, bool debug, bool synchronous, unsigned int nconn)
780{
781	const char *keywords[3+1];
782	const char *values[3];
783	unsigned int i;
784
785	if (nconn > 32) {
786		/*
787		 * limit from sizeof(cmd->prepared_mask)
788		 */
789		return EINVAL;
790	}
791	if (debug) {
792		pgfs_dodprintf = true;
793	}
794	if (synchronous) {
795		pgfs_dosync = true;
796	}
797	i = 0;
798	if (dbname != NULL) {
799		keywords[i] = "dbname";
800		values[i] = dbname;
801		i++;
802	}
803	if (dbuser != NULL) {
804		keywords[i] = "user";
805		values[i] = dbuser;
806		i++;
807	}
808	keywords[i] = "application_name";
809	values[i] = "pgfs";
810	i++;
811	keywords[i] = NULL;
812	puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL);
813	for (i = 0; i < nconn; i++) {
814		struct Xconn *xc;
815		struct Xconn *xc2;
816		static int xcid;
817		PGconn *conn;
818		struct cmd *c;
819		int error;
820
821		conn = PQconnectdbParams(keywords, values, 0);
822		if (conn == NULL) {
823			errx(EXIT_FAILURE,
824			    "PQconnectdbParams: unknown failure");
825		}
826		if (PQstatus(conn) != CONNECTION_OK) {
827			/*
828			 * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW
829			 */
830			errx(EXIT_FAILURE, "PQconnectdbParams: %s",
831			    PQerrorMessage(conn));
832		}
833		DPRINTF("protocol version %d\n", PQprotocolVersion(conn));
834		puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ);
835		xc = emalloc(sizeof(*xc));
836		xc->conn = conn;
837		xc->blocker = NULL;
838		xc->owner = NULL;
839		xc->in_trans = false;
840		xc->id = xcid++;
841		assert(xc->id < 32);
842		PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc);
843		TAILQ_INSERT_HEAD(&xclist, xc, list);
844		xc2 = begin(pu);
845		assert(xc2 == xc);
846		c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE);
847		error = simplecmd(xc, c);
848		assert(error == 0);
849		freecmd(c);
850		c = createcmd("SET SESSION CHARACTERISTICS AS "
851		    "TRANSACTION ISOLATION LEVEL REPEATABLE READ",
852		    CMD_NOPREPARE);
853		error = simplecmd(xc, c);
854		assert(error == 0);
855		freecmd(c);
856		c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE);
857		error = simplecmd(xc, c);
858		assert(error == 0);
859		freecmd(c);
860		if (!pgfs_dosync) {
861			c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF",
862			    CMD_NOPREPARE);
863			error = simplecmd(xc, c);
864			assert(error == 0);
865			freecmd(c);
866		}
867		if (debug) {
868			struct fetchstatus s;
869			static const Oid types[] = { INT8OID, };
870			uint64_t pid;
871
872			c = createcmd("SELECT pg_backend_pid()::int8;",
873			    CMD_NOPREPARE);
874			error = sendcmd(xc, c);
875			assert(error == 0);
876			fetchinit(&s, xc);
877			error = FETCHNEXT(&s, types, &pid);
878			fetchdone(&s);
879			assert(error == 0);
880			DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid);
881		}
882		error = commit(xc);
883		assert(error == 0);
884		assert(xc->owner == NULL);
885	}
886	/*
887	 * XXX cleanup unlinked files here?  what to do when the filesystem
888	 * is shared?
889	 */
890	return 0;
891}
892
893struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq);
894struct puffs_cc *flusher = NULL;
895
896int
897flush_xacts(struct puffs_usermount *pu)
898{
899	struct puffs_cc *cc = puffs_cc_getcc(pu);
900	struct Xconn *xc;
901	static struct cmd *c;
902	uint64_t dummy;
903	int error;
904
905	/*
906	 * flush all previously issued asynchronous transactions.
907	 *
908	 * XXX
909	 * unfortunately it seems that there is no clean way to tell
910	 * PostgreSQL flush XLOG.  we could perform a CHECKPOINT but it's
911	 * too expensive and overkill for our purpose.
912	 * besides, PostgreSQL has an optimization to skip XLOG flushing
913	 * for transactions which didn't produce WAL records.
914	 * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2)
915	 * it means that an empty transaction ("BEGIN; COMMIT;"), which
916	 * doesn't produce any WAL records, doesn't flush the XLOG even if
917	 * synchronous_commit=on.  we issues a dummy setval() to avoid the
918	 * optimization.
919	 * on the other hand, we try to avoid creating unnecessary WAL activity
920	 * by serializing flushing and checking XLOG locations.
921	 */
922
923	assert(!pgfs_dosync);
924	if (flusher != NULL) { /* serialize flushers */
925		DPRINTF("%p flush in progress %p\n", cc, flusher);
926		waiton(&flushwaitq, cc);
927		assert(flusher == NULL);
928	}
929	DPRINTF("%p start flushing\n", cc);
930	flusher = cc;
931retry:
932	xc = begin(pu);
933	CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE "
934	    "pg_current_xlog_insert_location() <> pg_current_xlog_location()");
935	error = sendcmd(xc, c);
936	if (error != 0) {
937		goto got_error;
938	}
939	error = simplefetch(xc, INT8OID, &dummy);
940	assert(error != 0 || dummy == 1);
941	if (error == ENOENT) {
942		/*
943		 * there seems to be nothing to flush.
944		 */
945		DPRINTF("%p no sync\n", cc);
946		error = 0;
947	}
948	if (error != 0) {
949		goto got_error;
950	}
951	error = commit_sync(xc);
952	if (error != 0) {
953		goto got_error;
954	}
955	goto done;
956got_error:
957	rollback(xc);
958	if (error == EAGAIN) {
959		goto retry;
960	}
961done:
962	assert(flusher == cc);
963	flusher = NULL;
964	wakeup_one(&flushwaitq);
965	DPRINTF("%p end flushing error=%d\n", cc, error);
966	return error;
967}
968