1/*	$NetBSD: pgfs_subs.c,v 1.2 2011/10/12 16:24:39 yamt Exp $	*/
2
3/*-
4 * Copyright (c)2010,2011 YAMAMOTO Takashi,
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 *    notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 *    notice, this list of conditions and the following disclaimer in the
14 *    documentation and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 * SUCH DAMAGE.
27 */
28
29/*
30 * a file system server which stores the data in a PostgreSQL database.
31 */
32
33/*
34 * we use large objects to store file contents.  there are a few XXXs wrt it.
35 *
36 * - large objects don't obey the normal transaction semantics.
37 *
38 * - we use large object server-side functions directly (instead of via the
39 *   libpq large object api) because:
40 *	- we want to use asynchronous (in the sense of PQsendFoo) operations
41 *	  which is not available with the libpq large object api.
42 *	- with the libpq large object api, there's no way to know details of
43 *	  an error because PGresult is freed in the library without saving
44 *	  PG_DIAG_SQLSTATE etc.
45 */
46
47#include <sys/cdefs.h>
48#ifndef lint
49__RCSID("$NetBSD: pgfs_subs.c,v 1.2 2011/10/12 16:24:39 yamt Exp $");
50#endif /* not lint */
51
52#include <assert.h>
53#include <err.h>
54#include <errno.h>
55#include <puffs.h>
56#include <inttypes.h>
57#include <stdarg.h>
58#include <stdbool.h>
59#include <stdio.h>
60#include <stdlib.h>
61#include <time.h>
62#include <util.h>
63
64#include <libpq-fe.h>
65#include <libpq/libpq-fs.h>	/* INV_* */
66
67#include "pgfs.h"
68#include "pgfs_db.h"
69#include "pgfs_debug.h"
70#include "pgfs_waitq.h"
71#include "pgfs_subs.h"
72
/*
 * vtype_table: textual names of the supported file types, indexed by
 * enum vtype value.  tovtype() and fromvtype() translate through this
 * table.  indices without an initializer here are NULL.
 */
const char * const vtype_table[] = {
	[VREG] = "regular",
	[VDIR] = "directory",
	[VLNK] = "link",
};
78
79static unsigned int
80tovtype(const char *type)
81{
82	unsigned int i;
83
84	for (i = 0; i < __arraycount(vtype_table); i++) {
85		if (vtype_table[i] == NULL) {
86			continue;
87		}
88		if (!strcmp(type, vtype_table[i])) {
89			return i;
90		}
91	}
92	assert(0);
93	return 0;
94}
95
96static const char *
97fromvtype(enum vtype vtype)
98{
99
100	if (vtype < __arraycount(vtype_table)) {
101		assert(vtype_table[vtype] != NULL);
102		return vtype_table[vtype];
103	}
104	return NULL;
105}
106
107/*
108 * fileid_lock stuff below is to keep ordering of operations for a file.
109 * it is a workaround for the lack of operation barriers in the puffs
110 * protocol.
111 *
112 * currently we do this locking only for SETATTR, GETATTR, and WRITE as
113 * they are known to be reorder-unsafe.  they are sensitive to the file
114 * attributes, mainly the file size.  note that as the kernel issues async
115 * SETATTR/WRITE requests, vnode lock doesn't prevent GETATTR from seeing
116 * the stale attributes.
117 *
118 * we are relying on waiton/wakeup being a FIFO.
119 */
120
/*
 * fileid_lock_handle: one entry per fileid which is currently locked.
 * allocated by fileid_lock() and freed by fileid_unlock() when no
 * waiter is left.
 */
struct fileid_lock_handle {
	TAILQ_ENTRY(fileid_lock_handle) list;	/* on fileid_lock_list */
	fileid_t fileid;			/* the file being serialized */
	struct puffs_cc *owner;	/* diagnostic only */
	struct waitq waitq;			/* waiters for this fileid */
};
127
/* the list of all fileid locks which are currently held. */
TAILQ_HEAD(, fileid_lock_handle) fileid_lock_list =
    TAILQ_HEAD_INITIALIZER(fileid_lock_list);
/*
 * NOTE(review): fileid_lock_waitq is not referenced by the functions
 * visible here -- confirm whether it is still used elsewhere.
 */
struct waitq fileid_lock_waitq = TAILQ_HEAD_INITIALIZER(fileid_lock_waitq);
131
132/*
133 * fileid_lock: serialize requests for the fileid.
134 *
135 * this function should be the first yieldable point in a puffs callback.
136 */
137
138struct fileid_lock_handle *
139fileid_lock(fileid_t fileid, struct puffs_cc *cc)
140{
141	struct fileid_lock_handle *lock;
142
143	TAILQ_FOREACH(lock, &fileid_lock_list, list) {
144		if (lock->fileid == fileid) {
145			DPRINTF("fileid wait %" PRIu64 " cc %p\n", fileid, cc);
146			assert(lock->owner != cc);
147			waiton(&lock->waitq, cc);	/* enter FIFO */
148			assert(lock->owner == cc);
149			return lock;
150		}
151	}
152	lock = emalloc(sizeof(*lock));
153	lock->fileid = fileid;
154	lock->owner = cc;
155	DPRINTF("fileid lock %" PRIu64 " cc %p\n", lock->fileid, cc);
156	waitq_init(&lock->waitq);
157	TAILQ_INSERT_HEAD(&fileid_lock_list, lock, list);
158	return lock;
159}
160
161void
162fileid_unlock(struct fileid_lock_handle *lock)
163{
164
165	DPRINTF("fileid unlock %" PRIu64 "\n", lock->fileid);
166	assert(lock != NULL);
167	assert(lock->owner != NULL);
168	/*
169	 * perform direct-handoff to the first waiter.
170	 *
171	 * a handoff is essential to keep the order of requests.
172	 */
173	lock->owner = wakeup_one(&lock->waitq);
174	if (lock->owner != NULL) {
175		return;
176	}
177	/*
178	 * no one is waiting this fileid.
179	 */
180	TAILQ_REMOVE(&fileid_lock_list, lock, list);
181	free(lock);
182}
183
184/*
185 * timespec_to_pgtimestamp: create a text representation of timestamp which
186 * can be recognized by the database server.
187 *
188 * it's caller's responsibility to free(3) the result.
189 */
190
int
timespec_to_pgtimestamp(const struct timespec *tv, char **resultp)
{
	/*
	 * XXX is there any smarter way?
	 */
	char buf1[1024];
	char buf2[1024];
	struct tm tm_store;
	struct tm *tm;

	/*
	 * the broken-down time is in UTC.
	 * NOTE(review): no zone designator is emitted in the result;
	 * confirm how the server side is expected to interpret it.
	 */
	tm = gmtime_r(&tv->tv_sec, &tm_store);
	if (tm == NULL) {
		assert(errno != 0);
		return errno;
	}
	strftime(buf1, sizeof(buf1), "%Y%m%dT%H%M%S", tm);
	/*
	 * the result looks like "YYYYMMDDTHHMMSS.uuuuuu".
	 *
	 * the microseconds are a decimal fraction of a second, thus they
	 * must be zero-padded to 6 digits.  otherwise, eg. 5 microseconds
	 * would be written as ".5" and read back as half a second.
	 */
	snprintf(buf2, sizeof(buf2), "%s.%06ju", buf1,
	    (uintmax_t)tv->tv_nsec / 1000);
	*resultp = estrdup(buf2);
	return 0;
}
213
214int
215my_lo_truncate(struct Xconn *xc, int32_t fd, int32_t size)
216{
217	static struct cmd *c;
218	int32_t ret;
219	int error;
220
221	CREATECMD(c, "SELECT lo_truncate($1, $2)", INT4OID, INT4OID);
222	error = sendcmd(xc, c, fd, size);
223	if (error != 0) {
224		return error;
225	}
226	error = simplefetch(xc, INT4OID, &ret);
227	if (error != 0) {
228		if (error == EEXIST) {
229			/*
230			 * probably the insertion of the new-sized page
231			 * caused a duplicated key error.  retry.
232			 */
233			DPRINTF("map EEXIST to EAGAIN\n");
234			error = EAGAIN;
235		}
236		return error;
237	}
238	assert(ret == 0);
239	return 0;
240}
241
242int
243my_lo_lseek(struct Xconn *xc, int32_t fd, int32_t offset, int32_t whence,
244    int32_t *retp)
245{
246	static struct cmd *c;
247	int32_t ret;
248	int error;
249
250	CREATECMD(c, "SELECT lo_lseek($1, $2, $3)", INT4OID, INT4OID, INT4OID);
251	error = sendcmd(xc, c, fd, offset, whence);
252	if (error != 0) {
253		return error;
254	}
255	error = simplefetch(xc, INT4OID, &ret);
256	if (error != 0) {
257		return error;
258	}
259	if (retp != NULL) {
260		*retp = ret;
261	}
262	return 0;
263}
264
265int
266my_lo_read(struct Xconn *xc, int32_t fd, void *buf, size_t size,
267    size_t *resultsizep)
268{
269	static struct cmd *c;
270	size_t resultsize;
271	int error;
272
273	CREATECMD(c, "SELECT loread($1, $2)", INT4OID, INT4OID);
274	error = sendcmdx(xc, 1, c, fd, (int32_t)size);
275	if (error != 0) {
276		return error;
277	}
278	error = simplefetch(xc, BYTEA, buf, &resultsize);
279	if (error != 0) {
280		return error;
281	}
282	*resultsizep = resultsize;
283	if (size != resultsize) {
284		DPRINTF("shortread? %zu != %zu\n", size, resultsize);
285	}
286	return 0;
287}
288
289int
290my_lo_write(struct Xconn *xc, int32_t fd, const void *buf, size_t size,
291    size_t *resultsizep)
292{
293	static struct cmd *c;
294	int32_t resultsize;
295	int error;
296
297	CREATECMD(c, "SELECT lowrite($1, $2)", INT4OID, BYTEA);
298	error = sendcmd(xc, c, fd, buf, (int32_t)size);
299	if (error != 0) {
300		return error;
301	}
302	error = simplefetch(xc, INT4OID, &resultsize);
303	if (error != 0) {
304		if (error == EEXIST) {
305			/*
306			 * probably the insertion of the new data page
307			 * caused a duplicated key error.  retry.
308			 */
309			DPRINTF("map EEXIST to EAGAIN\n");
310			error = EAGAIN;
311		}
312		return error;
313	}
314	*resultsizep = resultsize;
315	if (size != (size_t)resultsize) {
316		DPRINTF("shortwrite? %zu != %zu\n", size, (size_t)resultsize);
317	}
318	return 0;
319}
320
321int
322my_lo_open(struct Xconn *xc, Oid loid, int32_t mode, int32_t *fdp)
323{
324	static struct cmd *c;
325	int error;
326
327	CREATECMD(c, "SELECT lo_open($1, $2)", OIDOID, INT4OID);
328	error = sendcmd(xc, c, loid, mode);
329	if (error != 0) {
330		return error;
331	}
332	return simplefetch(xc, INT4OID, fdp);
333}
334
335int
336my_lo_close(struct Xconn *xc, int32_t fd)
337{
338	static struct cmd *c;
339	int32_t ret;
340	int error;
341
342	CREATECMD(c, "SELECT lo_close($1)", INT4OID);
343	error = sendcmd(xc, c, fd);
344	if (error != 0) {
345		return error;
346	}
347	error = simplefetch(xc, INT4OID, &ret);
348	if (error != 0) {
349		return error;
350	}
351	assert(ret == 0);
352	return 0;
353}
354
355static int
356lo_lookup_by_fileid(struct Xconn *xc, fileid_t fileid, Oid *idp)
357{
358	static struct cmd *c;
359	static const Oid types[] = { OIDOID, };
360	struct fetchstatus s;
361	int error;
362
363	CREATECMD(c, "SELECT loid FROM datafork WHERE fileid = $1", INT8OID);
364	error = sendcmd(xc, c, fileid);
365	if (error != 0) {
366		return error;
367	}
368	fetchinit(&s, xc);
369	error = FETCHNEXT(&s, types, idp);
370	fetchdone(&s);
371	DPRINTF("error %d\n", error);
372	return error;
373}
374
375int
376lo_open_by_fileid(struct Xconn *xc, fileid_t fileid, int mode, int *fdp)
377{
378	Oid loid;
379	int fd;
380	int error;
381
382	error = lo_lookup_by_fileid(xc, fileid, &loid);
383	if (error != 0) {
384		return error;
385	}
386	error = my_lo_open(xc, loid, mode, &fd);
387	if (error != 0) {
388		return error;
389	}
390	*fdp = fd;
391	return 0;
392}
393
394static int
395getsize(struct Xconn *xc, fileid_t fileid, int *resultp)
396{
397	int32_t size;
398	int fd;
399	int error;
400
401	error = lo_open_by_fileid(xc, fileid, INV_READ, &fd);
402	if (error != 0) {
403		return error;
404	}
405	error = my_lo_lseek(xc, fd, 0, SEEK_END, &size);
406	if (error != 0) {
407		return error;
408	}
409	error = my_lo_close(xc, fd);
410	if (error != 0) {
411		return error;
412	}
413	*resultp = size;
414	return 0;
415}
416
/*
 * bits for the "mask" argument of getattr().
 *
 * getattr() itself examines only GETATTR_TIME (fetch the timestamps)
 * and GETATTR_SIZE (fetch the size), besides checking if the mask is
 * empty.
 */
#define	GETATTR_TYPE	0x00000001
#define	GETATTR_NLINK	0x00000002
#define	GETATTR_SIZE	0x00000004
#define	GETATTR_MODE	0x00000008
#define	GETATTR_UID	0x00000010
#define	GETATTR_GID	0x00000020
#define	GETATTR_TIME	0x00000040
/* all of the above. */
#define	GETATTR_ALL	\
	(GETATTR_TYPE|GETATTR_NLINK|GETATTR_SIZE|GETATTR_MODE| \
	GETATTR_UID|GETATTR_GID|GETATTR_TIME)
427
428int
429getattr(struct Xconn *xc, fileid_t fileid, struct vattr *va, unsigned int mask)
430{
431	char *type;
432	long long atime_s;
433	long long atime_us;
434	long long ctime_s;
435	long long ctime_us;
436	long long mtime_s;
437	long long mtime_us;
438	long long btime_s;
439	long long btime_us;
440	uint64_t mode;
441	long long uid;
442	long long gid;
443	long long nlink;
444	long long rev;
445	struct fetchstatus s;
446	int error;
447
448	if (mask == 0) {
449		return 0;
450	}
451	/*
452	 * unless explicitly requested, avoid fetching timestamps as they
453	 * are a little more expensive than other simple attributes.
454	 */
455	if ((mask & GETATTR_TIME) != 0) {
456		static struct cmd *c;
457		static const Oid types[] = {
458			TEXTOID,
459			INT8OID,
460			INT8OID,
461			INT8OID,
462			INT8OID,
463			INT8OID,
464			INT8OID,
465			INT8OID,
466			INT8OID,
467			INT8OID,
468			INT8OID,
469			INT8OID,
470			INT8OID,
471			INT8OID,
472		};
473
474		CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev, "
475		    "extract(epoch from date_trunc('second', atime))::int8, "
476		    "extract(microseconds from atime)::int8, "
477		    "extract(epoch from date_trunc('second', ctime))::int8, "
478		    "extract(microseconds from ctime)::int8, "
479		    "extract(epoch from date_trunc('second', mtime))::int8, "
480		    "extract(microseconds from mtime)::int8, "
481		    "extract(epoch from date_trunc('second', btime))::int8, "
482		    "extract(microseconds from btime)::int8 "
483		    "FROM file "
484		    "WHERE fileid = $1", INT8OID);
485		error = sendcmd(xc, c, fileid);
486		if (error != 0) {
487			return error;
488		}
489		fetchinit(&s, xc);
490		error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
491		    &rev,
492		    &atime_s, &atime_us,
493		    &ctime_s, &ctime_us,
494		    &mtime_s, &mtime_us,
495		    &btime_s, &btime_us);
496	} else {
497		static struct cmd *c;
498		static const Oid types[] = {
499			TEXTOID,
500			INT8OID,
501			INT8OID,
502			INT8OID,
503			INT8OID,
504			INT8OID,
505		};
506
507		CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev "
508		    "FROM file "
509		    "WHERE fileid = $1", INT8OID);
510		error = sendcmd(xc, c, fileid);
511		if (error != 0) {
512			return error;
513		}
514		fetchinit(&s, xc);
515		error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
516		    &rev);
517	}
518	fetchdone(&s);
519	if (error != 0) {
520		return error;
521	}
522	memset(va, 0xaa, sizeof(*va)); /* fill with garbage for debug */
523	va->va_type = tovtype(type);
524	free(type);
525	va->va_mode = mode;
526	va->va_uid = uid;
527	va->va_gid = gid;
528	if (nlink > 0 && va->va_type == VDIR) {
529		nlink++; /* "." */
530	}
531	va->va_nlink = nlink;
532	va->va_fileid = fileid;
533	va->va_atime.tv_sec = atime_s;
534	va->va_atime.tv_nsec = atime_us * 1000;
535	va->va_ctime.tv_sec = ctime_s;
536	va->va_ctime.tv_nsec = ctime_us * 1000;
537	va->va_mtime.tv_sec = mtime_s;
538	va->va_mtime.tv_nsec = mtime_us * 1000;
539	va->va_birthtime.tv_sec = btime_s;
540	va->va_birthtime.tv_nsec = btime_us * 1000;
541	va->va_blocksize = LOBLKSIZE;
542	va->va_gen = 1;
543	va->va_filerev = rev;
544	if ((mask & GETATTR_SIZE) != 0) {
545		int size;
546
547		size = 0;
548		if (va->va_type == VREG || va->va_type == VLNK) {
549			error = getsize(xc, fileid, &size);
550			if (error != 0) {
551				return error;
552			}
553		} else if (va->va_type == VDIR) {
554			size = 100; /* XXX */
555		}
556		va->va_size = size;
557	}
558	/*
559	 * XXX va_bytes: likely wrong due to toast compression.
560	 * there's no cheap way to get the compressed size of LO.
561	 */
562	va->va_bytes = va->va_size;
563	va->va_flags = 0;
564	return 0;
565}
566
567int
568update_mctime(struct Xconn *xc, fileid_t fileid)
569{
570	static struct cmd *c;
571
572	CREATECMD(c,
573	    "UPDATE file "
574	    "SET mtime = current_timestamp, ctime = current_timestamp, "
575		"rev = rev + 1 "
576	    "WHERE fileid = $1", INT8OID);
577	return simplecmd(xc, c, fileid);
578}
579
580int
581update_atime(struct Xconn *xc, fileid_t fileid)
582{
583	static struct cmd *c;
584
585	CREATECMD(c,
586	    "UPDATE file SET atime = current_timestamp WHERE fileid = $1",
587	    INT8OID);
588	return simplecmd(xc, c, fileid);
589}
590
591int
592update_mtime(struct Xconn *xc, fileid_t fileid)
593{
594	static struct cmd *c;
595
596	CREATECMD(c,
597	    "UPDATE file "
598	    "SET mtime = current_timestamp, rev = rev + 1 "
599	    "WHERE fileid = $1", INT8OID);
600	return simplecmd(xc, c, fileid);
601}
602
603int
604update_ctime(struct Xconn *xc, fileid_t fileid)
605{
606	static struct cmd *c;
607
608	CREATECMD(c,
609	    "UPDATE file SET ctime = current_timestamp WHERE fileid = $1",
610	    INT8OID);
611	return simplecmd(xc, c, fileid);
612}
613
614int
615update_nlink(struct Xconn *xc, fileid_t fileid, int delta)
616{
617	static struct cmd *c;
618
619	CREATECMD(c,
620	    "UPDATE file "
621	    "SET nlink = nlink + $1 "
622	    "WHERE fileid = $2",
623	    INT8OID, INT8OID);
624	return simplecmd(xc, c, (int64_t)delta, fileid);
625}
626
627int
628lookupp(struct Xconn *xc, fileid_t fileid, fileid_t *parent)
629{
630	static struct cmd *c;
631	static const Oid types[] = { INT8OID, };
632	struct fetchstatus s;
633	int error;
634
635	CREATECMD(c, "SELECT parent_fileid FROM dirent "
636		"WHERE child_fileid = $1 LIMIT 1", INT8OID);
637	error = sendcmd(xc, c, fileid);
638	if (error != 0) {
639		return error;
640	}
641	fetchinit(&s, xc);
642	error = FETCHNEXT(&s, types, parent);
643	fetchdone(&s);
644	if (error != 0) {
645		return error;
646	}
647	return 0;
648}
649
650int
651mkfile(struct Xconn *xc, enum vtype vtype, mode_t mode, uid_t uid, gid_t gid,
652    fileid_t *idp)
653{
654	static struct cmd *c;
655	const char *type;
656	int error;
657
658	type = fromvtype(vtype);
659	if (type == NULL) {
660		return EOPNOTSUPP;
661	}
662	CREATECMD(c,
663		"INSERT INTO file "
664		"(fileid, type, mode, uid, gid, nlink, rev, "
665		"atime, ctime, mtime, btime) "
666		"VALUES(nextval('fileid_seq'), $1::filetype, $2, $3, $4, 0, 0, "
667		"current_timestamp, "
668		"current_timestamp, "
669		"current_timestamp, "
670		"current_timestamp) "
671		"RETURNING fileid", TEXTOID, INT8OID, INT8OID, INT8OID);
672	error = sendcmd(xc, c, type, (uint64_t)mode, (uint64_t)uid,
673	    (uint64_t)gid);
674	if (error != 0) {
675		return error;
676	}
677	return simplefetch(xc, INT8OID, idp);
678}
679
680int
681linkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
682{
683	static struct cmd *c;
684	int error;
685
686	CREATECMD(c,
687		"INSERT INTO dirent "
688		"(parent_fileid, name, child_fileid) "
689		"VALUES($1, $2, $3)", INT8OID, TEXTOID, INT8OID);
690	error = simplecmd(xc, c, parent, name, child);
691	if (error != 0) {
692		return error;
693	}
694	error = update_nlink(xc, child, 1);
695	if (error != 0) {
696		return error;
697	}
698	return update_mtime(xc, parent);
699}
700
701int
702unlinkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
703{
704	static struct cmd *c;
705	int error;
706
707	/*
708	 * in addition to the primary key, we check child_fileid as well here
709	 * to avoid removing an entry which was appeared after our VOP_LOOKUP.
710	 */
711	CREATECMD(c,
712		"DELETE FROM dirent "
713		"WHERE parent_fileid = $1 AND name = $2 AND child_fileid = $3",
714		INT8OID, TEXTOID, INT8OID);
715	error = simplecmd(xc, c, parent, name, child);
716	if (error != 0) {
717		return error;
718	}
719	error = update_nlink(xc, child, -1);
720	if (error != 0) {
721		return error;
722	}
723	error = update_mtime(xc, parent);
724	if (error != 0) {
725		return error;
726	}
727	return update_ctime(xc, child);
728}
729
730int
731mklinkfile(struct Xconn *xc, fileid_t parent, const char *name,
732    enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *idp)
733{
734	fileid_t fileid;
735	int error;
736
737	error = mkfile(xc, vtype, mode, uid, gid, &fileid);
738	if (error != 0) {
739		return error;
740	}
741	error = linkfile(xc, parent, name, fileid);
742	if (error != 0) {
743		return error;
744	}
745	if (idp != NULL) {
746		*idp = fileid;
747	}
748	return 0;
749}
750
751int
752mklinkfile_lo(struct Xconn *xc, fileid_t parent_fileid, const char *name,
753    enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *fileidp,
754    int *loidp)
755{
756	static struct cmd *c;
757	fileid_t new_fileid;
758	int loid;
759	int error;
760
761	error = mklinkfile(xc, parent_fileid, name, vtype, mode, uid, gid,
762	    &new_fileid);
763	if (error != 0) {
764		return error;
765	}
766	CREATECMD(c,
767		"INSERT INTO datafork (fileid, loid) "
768		"VALUES($1, lo_creat(-1)) "
769		"RETURNING loid", INT8OID);
770	error = sendcmd(xc, c, new_fileid);
771	if (error != 0) {
772		return error;
773	}
774	error = simplefetch(xc, OIDOID, &loid);
775	if (error != 0) {
776		return error;
777	}
778	if (fileidp != NULL) {
779		*fileidp = new_fileid;
780	}
781	if (loidp != NULL) {
782		*loidp = loid;
783	}
784	return 0;
785}
786
787int
788cleanupfile(struct Xconn *xc, fileid_t fileid, struct vattr *va)
789{
790	static struct cmd *c;
791
792	/*
793	 * XXX what to do when the filesystem is shared?
794	 */
795
796	if (va->va_type == VREG || va->va_type == VLNK) {
797		static struct cmd *c_datafork;
798		int32_t ret;
799		int error;
800
801		CREATECMD(c_datafork,
802			"WITH loids AS (DELETE FROM datafork WHERE fileid = $1 "
803			"RETURNING loid) SELECT lo_unlink(loid) FROM loids",
804			INT8OID);
805		error = sendcmd(xc, c_datafork, fileid);
806		if (error != 0) {
807			return error;
808		}
809		error = simplefetch(xc, INT4OID, &ret);
810		if (error != 0) {
811			return error;
812		}
813		if (ret != 1) {
814			return EIO; /* lo_unlink failed */
815		}
816	}
817	CREATECMD(c, "DELETE FROM file WHERE fileid = $1", INT8OID);
818	return simplecmd(xc, c, fileid);
819}
820
821/*
822 * check_path: do locking and check to prevent a rename from creating loop.
823 *
824 * lock the dirents between child_fileid and the root directory.
 * if gate_fileid appears in the path, return EINVAL.
826 * caller should ensure that child_fileid is of VDIR beforehand.
827 *
 * we use FOR SHARE row level locks as poor man's predicate locks.
829 *
830 * the following is an example to show why we need to lock the path.
831 *
832 * consider:
833 * "mkdir -p /a/b/c/d/e/f && mkdir -p /1/2/3/4/5/6"
834 * and then
835 * thread 1 is doing "mv /a/b /1/2/3/4/5/6"
836 * thread 2 is doing "mv /1/2 /a/b/c/d/e/f"
837 *
838 * a possible consequence:
839 *	thread 1: check_path -> success
840 *	thread 2: check_path -> success
841 *	thread 1: modify directories -> block on row-level lock
842 *	thread 2: modify directories -> block on row-level lock
843 *			-> deadlock detected
844 *			-> rollback and retry
845 *
846 * another possible consequence:
847 *	thread 1: check_path -> success
848 *	thread 1: modify directory entries -> success
849 *	thread 2: check_path -> block on row-level lock
850 *	thread 1: commit
851 *	thread 2: acquire the lock and notices the row is updated
852 *			-> serialization error
853 *			-> rollback and retry
854 *
855 * XXX it might be better to use real serializable transactions,
856 * which will be available for PostgreSQL 9.1
857 */
858
859int
860check_path(struct Xconn *xc, fileid_t gate_fileid, fileid_t child_fileid)
861{
862	static struct cmd *c;
863	fileid_t parent_fileid;
864	struct fetchstatus s;
865	int error;
866
867	CREATECMD(c,
868		"WITH RECURSIVE r AS "
869		"( "
870				"SELECT parent_fileid, cookie, child_fileid "
871				"FROM dirent "
872				"WHERE child_fileid = $1 "
873			"UNION ALL "
874				"SELECT d.parent_fileid, d.cookie, "
875				"d.child_fileid "
876				"FROM dirent AS d INNER JOIN r "
877				"ON d.child_fileid = r.parent_fileid "
878		") "
879		"SELECT d.parent_fileid "
880		"FROM dirent d "
881		"JOIN r "
882		"ON d.cookie = r.cookie "
883		"FOR SHARE", INT8OID);
884	error = sendcmd(xc, c, child_fileid);
885	if (error != 0) {
886		return error;
887	}
888	fetchinit(&s, xc);
889	do {
890		static const Oid types[] = { INT8OID, };
891
892		error = FETCHNEXT(&s, types, &parent_fileid);
893		if (error == ENOENT) {
894			fetchdone(&s);
895			return 0;
896		}
897		if (error != 0) {
898			fetchdone(&s);
899			return error;
900		}
901	} while (gate_fileid != parent_fileid);
902	fetchdone(&s);
903	return EINVAL;
904}
905
906int
907isempty(struct Xconn *xc, fileid_t fileid, bool *emptyp)
908{
909	int32_t dummy;
910	static struct cmd *c;
911	int error;
912
913	CREATECMD(c,
914		"SELECT 1 FROM dirent "
915		"WHERE parent_fileid = $1 LIMIT 1", INT8OID);
916	error = sendcmd(xc, c, fileid);
917	if (error != 0) {
918		return error;
919	}
920	error = simplefetch(xc, INT4OID, &dummy);
921	assert(error != 0 || dummy == 1);
922	if (error == ENOENT) {
923		*emptyp = true;
924		error = 0;
925	} else {
926		*emptyp = false;
927	}
928	return error;
929}
930