mds_client.c revision a76d0a9c
1// SPDX-License-Identifier: GPL-2.0
2#include <linux/ceph/ceph_debug.h>
3
4#include <linux/fs.h>
5#include <linux/wait.h>
6#include <linux/slab.h>
7#include <linux/gfp.h>
8#include <linux/sched.h>
9#include <linux/debugfs.h>
10#include <linux/seq_file.h>
11#include <linux/ratelimit.h>
12#include <linux/bits.h>
13#include <linux/ktime.h>
14#include <linux/bitmap.h>
15
16#include "super.h"
17#include "mds_client.h"
18
19#include <linux/ceph/ceph_features.h>
20#include <linux/ceph/messenger.h>
21#include <linux/ceph/decode.h>
22#include <linux/ceph/pagelist.h>
23#include <linux/ceph/auth.h>
24#include <linux/ceph/debugfs.h>
25
26#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
27
28/*
29 * A cluster of MDS (metadata server) daemons is responsible for
30 * managing the file system namespace (the directory hierarchy and
31 * inodes) and for coordinating shared access to storage.  Metadata is
32 * partitioning hierarchically across a number of servers, and that
33 * partition varies over time as the cluster adjusts the distribution
34 * in order to balance load.
35 *
36 * The MDS client is primarily responsible to managing synchronous
37 * metadata requests for operations like open, unlink, and so forth.
38 * If there is a MDS failure, we find out about it when we (possibly
39 * request and) receive a new MDS map, and can resubmit affected
40 * requests.
41 *
42 * For the most part, though, we take advantage of a lossless
43 * communications channel to the MDS, and do not need to worry about
44 * timing out or resubmitting requests.
45 *
46 * We maintain a stateful "session" with each MDS we interact with.
47 * Within each session, we sent periodic heartbeat messages to ensure
48 * any capabilities or leases we have been issues remain valid.  If
49 * the session times out and goes stale, our leases and capabilities
50 * are no longer valid.
51 */
52
53struct ceph_reconnect_state {
54	struct ceph_mds_session *session;
55	int nr_caps, nr_realms;
56	struct ceph_pagelist *pagelist;
57	unsigned msg_version;
58	bool allow_multi;
59};
60
61static void __wake_requests(struct ceph_mds_client *mdsc,
62			    struct list_head *head);
63static void ceph_cap_release_work(struct work_struct *work);
64static void ceph_cap_reclaim_work(struct work_struct *work);
65
66static const struct ceph_connection_operations mds_con_ops;
67
68
69/*
70 * mds reply parsing
71 */
72
73static int parse_reply_info_quota(void **p, void *end,
74				  struct ceph_mds_reply_info_in *info)
75{
76	u8 struct_v, struct_compat;
77	u32 struct_len;
78
79	ceph_decode_8_safe(p, end, struct_v, bad);
80	ceph_decode_8_safe(p, end, struct_compat, bad);
81	/* struct_v is expected to be >= 1. we only
82	 * understand encoding with struct_compat == 1. */
83	if (!struct_v || struct_compat != 1)
84		goto bad;
85	ceph_decode_32_safe(p, end, struct_len, bad);
86	ceph_decode_need(p, end, struct_len, bad);
87	end = *p + struct_len;
88	ceph_decode_64_safe(p, end, info->max_bytes, bad);
89	ceph_decode_64_safe(p, end, info->max_files, bad);
90	*p = end;
91	return 0;
92bad:
93	return -EIO;
94}
95
96/*
97 * parse individual inode info
98 */
99static int parse_reply_info_in(void **p, void *end,
100			       struct ceph_mds_reply_info_in *info,
101			       u64 features)
102{
103	int err = 0;
104	u8 struct_v = 0;
105
106	if (features == (u64)-1) {
107		u32 struct_len;
108		u8 struct_compat;
109		ceph_decode_8_safe(p, end, struct_v, bad);
110		ceph_decode_8_safe(p, end, struct_compat, bad);
111		/* struct_v is expected to be >= 1. we only understand
112		 * encoding with struct_compat == 1. */
113		if (!struct_v || struct_compat != 1)
114			goto bad;
115		ceph_decode_32_safe(p, end, struct_len, bad);
116		ceph_decode_need(p, end, struct_len, bad);
117		end = *p + struct_len;
118	}
119
120	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
121	info->in = *p;
122	*p += sizeof(struct ceph_mds_reply_inode) +
123		sizeof(*info->in->fragtree.splits) *
124		le32_to_cpu(info->in->fragtree.nsplits);
125
126	ceph_decode_32_safe(p, end, info->symlink_len, bad);
127	ceph_decode_need(p, end, info->symlink_len, bad);
128	info->symlink = *p;
129	*p += info->symlink_len;
130
131	ceph_decode_copy_safe(p, end, &info->dir_layout,
132			      sizeof(info->dir_layout), bad);
133	ceph_decode_32_safe(p, end, info->xattr_len, bad);
134	ceph_decode_need(p, end, info->xattr_len, bad);
135	info->xattr_data = *p;
136	*p += info->xattr_len;
137
138	if (features == (u64)-1) {
139		/* inline data */
140		ceph_decode_64_safe(p, end, info->inline_version, bad);
141		ceph_decode_32_safe(p, end, info->inline_len, bad);
142		ceph_decode_need(p, end, info->inline_len, bad);
143		info->inline_data = *p;
144		*p += info->inline_len;
145		/* quota */
146		err = parse_reply_info_quota(p, end, info);
147		if (err < 0)
148			goto out_bad;
149		/* pool namespace */
150		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
151		if (info->pool_ns_len > 0) {
152			ceph_decode_need(p, end, info->pool_ns_len, bad);
153			info->pool_ns_data = *p;
154			*p += info->pool_ns_len;
155		}
156
157		/* btime */
158		ceph_decode_need(p, end, sizeof(info->btime), bad);
159		ceph_decode_copy(p, &info->btime, sizeof(info->btime));
160
161		/* change attribute */
162		ceph_decode_64_safe(p, end, info->change_attr, bad);
163
164		/* dir pin */
165		if (struct_v >= 2) {
166			ceph_decode_32_safe(p, end, info->dir_pin, bad);
167		} else {
168			info->dir_pin = -ENODATA;
169		}
170
171		/* snapshot birth time, remains zero for v<=2 */
172		if (struct_v >= 3) {
173			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
174			ceph_decode_copy(p, &info->snap_btime,
175					 sizeof(info->snap_btime));
176		} else {
177			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
178		}
179
180		/* snapshot count, remains zero for v<=3 */
181		if (struct_v >= 4) {
182			ceph_decode_64_safe(p, end, info->rsnaps, bad);
183		} else {
184			info->rsnaps = 0;
185		}
186
187		*p = end;
188	} else {
189		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
190			ceph_decode_64_safe(p, end, info->inline_version, bad);
191			ceph_decode_32_safe(p, end, info->inline_len, bad);
192			ceph_decode_need(p, end, info->inline_len, bad);
193			info->inline_data = *p;
194			*p += info->inline_len;
195		} else
196			info->inline_version = CEPH_INLINE_NONE;
197
198		if (features & CEPH_FEATURE_MDS_QUOTA) {
199			err = parse_reply_info_quota(p, end, info);
200			if (err < 0)
201				goto out_bad;
202		} else {
203			info->max_bytes = 0;
204			info->max_files = 0;
205		}
206
207		info->pool_ns_len = 0;
208		info->pool_ns_data = NULL;
209		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
210			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
211			if (info->pool_ns_len > 0) {
212				ceph_decode_need(p, end, info->pool_ns_len, bad);
213				info->pool_ns_data = *p;
214				*p += info->pool_ns_len;
215			}
216		}
217
218		if (features & CEPH_FEATURE_FS_BTIME) {
219			ceph_decode_need(p, end, sizeof(info->btime), bad);
220			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
221			ceph_decode_64_safe(p, end, info->change_attr, bad);
222		}
223
224		info->dir_pin = -ENODATA;
225		/* info->snap_btime and info->rsnaps remain zero */
226	}
227	return 0;
228bad:
229	err = -EIO;
230out_bad:
231	return err;
232}
233
234static int parse_reply_info_dir(void **p, void *end,
235				struct ceph_mds_reply_dirfrag **dirfrag,
236				u64 features)
237{
238	if (features == (u64)-1) {
239		u8 struct_v, struct_compat;
240		u32 struct_len;
241		ceph_decode_8_safe(p, end, struct_v, bad);
242		ceph_decode_8_safe(p, end, struct_compat, bad);
243		/* struct_v is expected to be >= 1. we only understand
244		 * encoding whose struct_compat == 1. */
245		if (!struct_v || struct_compat != 1)
246			goto bad;
247		ceph_decode_32_safe(p, end, struct_len, bad);
248		ceph_decode_need(p, end, struct_len, bad);
249		end = *p + struct_len;
250	}
251
252	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
253	*dirfrag = *p;
254	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
255	if (unlikely(*p > end))
256		goto bad;
257	if (features == (u64)-1)
258		*p = end;
259	return 0;
260bad:
261	return -EIO;
262}
263
264static int parse_reply_info_lease(void **p, void *end,
265				  struct ceph_mds_reply_lease **lease,
266				  u64 features)
267{
268	if (features == (u64)-1) {
269		u8 struct_v, struct_compat;
270		u32 struct_len;
271		ceph_decode_8_safe(p, end, struct_v, bad);
272		ceph_decode_8_safe(p, end, struct_compat, bad);
273		/* struct_v is expected to be >= 1. we only understand
274		 * encoding whose struct_compat == 1. */
275		if (!struct_v || struct_compat != 1)
276			goto bad;
277		ceph_decode_32_safe(p, end, struct_len, bad);
278		ceph_decode_need(p, end, struct_len, bad);
279		end = *p + struct_len;
280	}
281
282	ceph_decode_need(p, end, sizeof(**lease), bad);
283	*lease = *p;
284	*p += sizeof(**lease);
285	if (features == (u64)-1)
286		*p = end;
287	return 0;
288bad:
289	return -EIO;
290}
291
292/*
293 * parse a normal reply, which may contain a (dir+)dentry and/or a
294 * target inode.
295 */
296static int parse_reply_info_trace(void **p, void *end,
297				  struct ceph_mds_reply_info_parsed *info,
298				  u64 features)
299{
300	int err;
301
302	if (info->head->is_dentry) {
303		err = parse_reply_info_in(p, end, &info->diri, features);
304		if (err < 0)
305			goto out_bad;
306
307		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
308		if (err < 0)
309			goto out_bad;
310
311		ceph_decode_32_safe(p, end, info->dname_len, bad);
312		ceph_decode_need(p, end, info->dname_len, bad);
313		info->dname = *p;
314		*p += info->dname_len;
315
316		err = parse_reply_info_lease(p, end, &info->dlease, features);
317		if (err < 0)
318			goto out_bad;
319	}
320
321	if (info->head->is_target) {
322		err = parse_reply_info_in(p, end, &info->targeti, features);
323		if (err < 0)
324			goto out_bad;
325	}
326
327	if (unlikely(*p != end))
328		goto bad;
329	return 0;
330
331bad:
332	err = -EIO;
333out_bad:
334	pr_err("problem parsing mds trace %d\n", err);
335	return err;
336}
337
338/*
339 * parse readdir results
340 */
341static int parse_reply_info_readdir(void **p, void *end,
342				struct ceph_mds_reply_info_parsed *info,
343				u64 features)
344{
345	u32 num, i = 0;
346	int err;
347
348	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
349	if (err < 0)
350		goto out_bad;
351
352	ceph_decode_need(p, end, sizeof(num) + 2, bad);
353	num = ceph_decode_32(p);
354	{
355		u16 flags = ceph_decode_16(p);
356		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
357		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
358		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
359		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
360	}
361	if (num == 0)
362		goto done;
363
364	BUG_ON(!info->dir_entries);
365	if ((unsigned long)(info->dir_entries + num) >
366	    (unsigned long)info->dir_entries + info->dir_buf_size) {
367		pr_err("dir contents are larger than expected\n");
368		WARN_ON(1);
369		goto bad;
370	}
371
372	info->dir_nr = num;
373	while (num) {
374		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
375		/* dentry */
376		ceph_decode_32_safe(p, end, rde->name_len, bad);
377		ceph_decode_need(p, end, rde->name_len, bad);
378		rde->name = *p;
379		*p += rde->name_len;
380		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
381
382		/* dentry lease */
383		err = parse_reply_info_lease(p, end, &rde->lease, features);
384		if (err)
385			goto out_bad;
386		/* inode */
387		err = parse_reply_info_in(p, end, &rde->inode, features);
388		if (err < 0)
389			goto out_bad;
390		/* ceph_readdir_prepopulate() will update it */
391		rde->offset = 0;
392		i++;
393		num--;
394	}
395
396done:
397	/* Skip over any unrecognized fields */
398	*p = end;
399	return 0;
400
401bad:
402	err = -EIO;
403out_bad:
404	pr_err("problem parsing dir contents %d\n", err);
405	return err;
406}
407
408/*
409 * parse fcntl F_GETLK results
410 */
411static int parse_reply_info_filelock(void **p, void *end,
412				     struct ceph_mds_reply_info_parsed *info,
413				     u64 features)
414{
415	if (*p + sizeof(*info->filelock_reply) > end)
416		goto bad;
417
418	info->filelock_reply = *p;
419
420	/* Skip over any unrecognized fields */
421	*p = end;
422	return 0;
423bad:
424	return -EIO;
425}
426
427
428#if BITS_PER_LONG == 64
429
430#define DELEGATED_INO_AVAILABLE		xa_mk_value(1)
431
432static int ceph_parse_deleg_inos(void **p, void *end,
433				 struct ceph_mds_session *s)
434{
435	u32 sets;
436
437	ceph_decode_32_safe(p, end, sets, bad);
438	dout("got %u sets of delegated inodes\n", sets);
439	while (sets--) {
440		u64 start, len, ino;
441
442		ceph_decode_64_safe(p, end, start, bad);
443		ceph_decode_64_safe(p, end, len, bad);
444
445		/* Don't accept a delegation of system inodes */
446		if (start < CEPH_INO_SYSTEM_BASE) {
447			pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
448					start, len);
449			continue;
450		}
451		while (len--) {
452			int err = xa_insert(&s->s_delegated_inos, ino = start++,
453					    DELEGATED_INO_AVAILABLE,
454					    GFP_KERNEL);
455			if (!err) {
456				dout("added delegated inode 0x%llx\n",
457				     start - 1);
458			} else if (err == -EBUSY) {
459				pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
460					start - 1);
461			} else {
462				return err;
463			}
464		}
465	}
466	return 0;
467bad:
468	return -EIO;
469}
470
471u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
472{
473	unsigned long ino;
474	void *val;
475
476	xa_for_each(&s->s_delegated_inos, ino, val) {
477		val = xa_erase(&s->s_delegated_inos, ino);
478		if (val == DELEGATED_INO_AVAILABLE)
479			return ino;
480	}
481	return 0;
482}
483
484int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
485{
486	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
487			 GFP_KERNEL);
488}
489#else /* BITS_PER_LONG == 64 */
490/*
491 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
492 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
493 * and bottom words?
494 */
495static int ceph_parse_deleg_inos(void **p, void *end,
496				 struct ceph_mds_session *s)
497{
498	u32 sets;
499
500	ceph_decode_32_safe(p, end, sets, bad);
501	if (sets)
502		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
503	return 0;
504bad:
505	return -EIO;
506}
507
508u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
509{
510	return 0;
511}
512
513int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
514{
515	return 0;
516}
517#endif /* BITS_PER_LONG == 64 */
518
519/*
520 * parse create results
521 */
522static int parse_reply_info_create(void **p, void *end,
523				  struct ceph_mds_reply_info_parsed *info,
524				  u64 features, struct ceph_mds_session *s)
525{
526	int ret;
527
528	if (features == (u64)-1 ||
529	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
530		if (*p == end) {
531			/* Malformed reply? */
532			info->has_create_ino = false;
533		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
534			info->has_create_ino = true;
535			/* struct_v, struct_compat, and len */
536			ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
537			ceph_decode_64_safe(p, end, info->ino, bad);
538			ret = ceph_parse_deleg_inos(p, end, s);
539			if (ret)
540				return ret;
541		} else {
542			/* legacy */
543			ceph_decode_64_safe(p, end, info->ino, bad);
544			info->has_create_ino = true;
545		}
546	} else {
547		if (*p != end)
548			goto bad;
549	}
550
551	/* Skip over any unrecognized fields */
552	*p = end;
553	return 0;
554bad:
555	return -EIO;
556}
557
558/*
559 * parse extra results
560 */
561static int parse_reply_info_extra(void **p, void *end,
562				  struct ceph_mds_reply_info_parsed *info,
563				  u64 features, struct ceph_mds_session *s)
564{
565	u32 op = le32_to_cpu(info->head->op);
566
567	if (op == CEPH_MDS_OP_GETFILELOCK)
568		return parse_reply_info_filelock(p, end, info, features);
569	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
570		return parse_reply_info_readdir(p, end, info, features);
571	else if (op == CEPH_MDS_OP_CREATE)
572		return parse_reply_info_create(p, end, info, features, s);
573	else
574		return -EIO;
575}
576
577/*
578 * parse entire mds reply
579 */
580static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
581			    struct ceph_mds_reply_info_parsed *info,
582			    u64 features)
583{
584	void *p, *end;
585	u32 len;
586	int err;
587
588	info->head = msg->front.iov_base;
589	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
590	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
591
592	/* trace */
593	ceph_decode_32_safe(&p, end, len, bad);
594	if (len > 0) {
595		ceph_decode_need(&p, end, len, bad);
596		err = parse_reply_info_trace(&p, p+len, info, features);
597		if (err < 0)
598			goto out_bad;
599	}
600
601	/* extra */
602	ceph_decode_32_safe(&p, end, len, bad);
603	if (len > 0) {
604		ceph_decode_need(&p, end, len, bad);
605		err = parse_reply_info_extra(&p, p+len, info, features, s);
606		if (err < 0)
607			goto out_bad;
608	}
609
610	/* snap blob */
611	ceph_decode_32_safe(&p, end, len, bad);
612	info->snapblob_len = len;
613	info->snapblob = p;
614	p += len;
615
616	if (p != end)
617		goto bad;
618	return 0;
619
620bad:
621	err = -EIO;
622out_bad:
623	pr_err("mds parse_reply err %d\n", err);
624	return err;
625}
626
627static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
628{
629	if (!info->dir_entries)
630		return;
631	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
632}
633
634
635/*
636 * sessions
637 */
638const char *ceph_session_state_name(int s)
639{
640	switch (s) {
641	case CEPH_MDS_SESSION_NEW: return "new";
642	case CEPH_MDS_SESSION_OPENING: return "opening";
643	case CEPH_MDS_SESSION_OPEN: return "open";
644	case CEPH_MDS_SESSION_HUNG: return "hung";
645	case CEPH_MDS_SESSION_CLOSING: return "closing";
646	case CEPH_MDS_SESSION_CLOSED: return "closed";
647	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
648	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
649	case CEPH_MDS_SESSION_REJECTED: return "rejected";
650	default: return "???";
651	}
652}
653
654struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
655{
656	if (refcount_inc_not_zero(&s->s_ref)) {
657		dout("mdsc get_session %p %d -> %d\n", s,
658		     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
659		return s;
660	} else {
661		dout("mdsc get_session %p 0 -- FAIL\n", s);
662		return NULL;
663	}
664}
665
666void ceph_put_mds_session(struct ceph_mds_session *s)
667{
668	if (IS_ERR_OR_NULL(s))
669		return;
670
671	dout("mdsc put_session %p %d -> %d\n", s,
672	     refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
673	if (refcount_dec_and_test(&s->s_ref)) {
674		if (s->s_auth.authorizer)
675			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
676		WARN_ON(mutex_is_locked(&s->s_mutex));
677		xa_destroy(&s->s_delegated_inos);
678		kfree(s);
679	}
680}
681
682/*
683 * called under mdsc->mutex
684 */
685struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
686						   int mds)
687{
688	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
689		return NULL;
690	return ceph_get_mds_session(mdsc->sessions[mds]);
691}
692
693static bool __have_session(struct ceph_mds_client *mdsc, int mds)
694{
695	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
696		return false;
697	else
698		return true;
699}
700
701static int __verify_registered_session(struct ceph_mds_client *mdsc,
702				       struct ceph_mds_session *s)
703{
704	if (s->s_mds >= mdsc->max_sessions ||
705	    mdsc->sessions[s->s_mds] != s)
706		return -ENOENT;
707	return 0;
708}
709
710/*
711 * create+register a new session for given mds.
712 * called under mdsc->mutex.
713 */
714static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
715						 int mds)
716{
717	struct ceph_mds_session *s;
718
719	if (mds >= mdsc->mdsmap->possible_max_rank)
720		return ERR_PTR(-EINVAL);
721
722	s = kzalloc(sizeof(*s), GFP_NOFS);
723	if (!s)
724		return ERR_PTR(-ENOMEM);
725
726	if (mds >= mdsc->max_sessions) {
727		int newmax = 1 << get_count_order(mds + 1);
728		struct ceph_mds_session **sa;
729
730		dout("%s: realloc to %d\n", __func__, newmax);
731		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
732		if (!sa)
733			goto fail_realloc;
734		if (mdsc->sessions) {
735			memcpy(sa, mdsc->sessions,
736			       mdsc->max_sessions * sizeof(void *));
737			kfree(mdsc->sessions);
738		}
739		mdsc->sessions = sa;
740		mdsc->max_sessions = newmax;
741	}
742
743	dout("%s: mds%d\n", __func__, mds);
744	s->s_mdsc = mdsc;
745	s->s_mds = mds;
746	s->s_state = CEPH_MDS_SESSION_NEW;
747	mutex_init(&s->s_mutex);
748
749	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
750
751	atomic_set(&s->s_cap_gen, 1);
752	s->s_cap_ttl = jiffies - 1;
753
754	spin_lock_init(&s->s_cap_lock);
755	INIT_LIST_HEAD(&s->s_caps);
756	refcount_set(&s->s_ref, 1);
757	INIT_LIST_HEAD(&s->s_waiting);
758	INIT_LIST_HEAD(&s->s_unsafe);
759	xa_init(&s->s_delegated_inos);
760	INIT_LIST_HEAD(&s->s_cap_releases);
761	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
762
763	INIT_LIST_HEAD(&s->s_cap_dirty);
764	INIT_LIST_HEAD(&s->s_cap_flushing);
765
766	mdsc->sessions[mds] = s;
767	atomic_inc(&mdsc->num_sessions);
768	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
769
770	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
771		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
772
773	return s;
774
775fail_realloc:
776	kfree(s);
777	return ERR_PTR(-ENOMEM);
778}
779
780/*
781 * called under mdsc->mutex
782 */
783static void __unregister_session(struct ceph_mds_client *mdsc,
784			       struct ceph_mds_session *s)
785{
786	dout("__unregister_session mds%d %p\n", s->s_mds, s);
787	BUG_ON(mdsc->sessions[s->s_mds] != s);
788	mdsc->sessions[s->s_mds] = NULL;
789	ceph_con_close(&s->s_con);
790	ceph_put_mds_session(s);
791	atomic_dec(&mdsc->num_sessions);
792}
793
794/*
795 * drop session refs in request.
796 *
797 * should be last request ref, or hold mdsc->mutex
798 */
799static void put_request_session(struct ceph_mds_request *req)
800{
801	if (req->r_session) {
802		ceph_put_mds_session(req->r_session);
803		req->r_session = NULL;
804	}
805}
806
807void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
808				void (*cb)(struct ceph_mds_session *),
809				bool check_state)
810{
811	int mds;
812
813	mutex_lock(&mdsc->mutex);
814	for (mds = 0; mds < mdsc->max_sessions; ++mds) {
815		struct ceph_mds_session *s;
816
817		s = __ceph_lookup_mds_session(mdsc, mds);
818		if (!s)
819			continue;
820
821		if (check_state && !check_session_state(s)) {
822			ceph_put_mds_session(s);
823			continue;
824		}
825
826		mutex_unlock(&mdsc->mutex);
827		cb(s);
828		ceph_put_mds_session(s);
829		mutex_lock(&mdsc->mutex);
830	}
831	mutex_unlock(&mdsc->mutex);
832}
833
834void ceph_mdsc_release_request(struct kref *kref)
835{
836	struct ceph_mds_request *req = container_of(kref,
837						    struct ceph_mds_request,
838						    r_kref);
839	ceph_mdsc_release_dir_caps_no_check(req);
840	destroy_reply_info(&req->r_reply_info);
841	if (req->r_request)
842		ceph_msg_put(req->r_request);
843	if (req->r_reply)
844		ceph_msg_put(req->r_reply);
845	if (req->r_inode) {
846		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
847		iput(req->r_inode);
848	}
849	if (req->r_parent) {
850		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
851		iput(req->r_parent);
852	}
853	iput(req->r_target_inode);
854	if (req->r_dentry)
855		dput(req->r_dentry);
856	if (req->r_old_dentry)
857		dput(req->r_old_dentry);
858	if (req->r_old_dentry_dir) {
859		/*
860		 * track (and drop pins for) r_old_dentry_dir
861		 * separately, since r_old_dentry's d_parent may have
862		 * changed between the dir mutex being dropped and
863		 * this request being freed.
864		 */
865		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
866				  CEPH_CAP_PIN);
867		iput(req->r_old_dentry_dir);
868	}
869	kfree(req->r_path1);
870	kfree(req->r_path2);
871	put_cred(req->r_cred);
872	if (req->r_pagelist)
873		ceph_pagelist_release(req->r_pagelist);
874	put_request_session(req);
875	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
876	WARN_ON_ONCE(!list_empty(&req->r_wait));
877	kmem_cache_free(ceph_mds_request_cachep, req);
878}
879
880DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
881
882/*
883 * lookup session, bump ref if found.
884 *
885 * called under mdsc->mutex.
886 */
887static struct ceph_mds_request *
888lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
889{
890	struct ceph_mds_request *req;
891
892	req = lookup_request(&mdsc->request_tree, tid);
893	if (req)
894		ceph_mdsc_get_request(req);
895
896	return req;
897}
898
899/*
900 * Register an in-flight request, and assign a tid.  Link to directory
901 * are modifying (if any).
902 *
903 * Called under mdsc->mutex.
904 */
905static void __register_request(struct ceph_mds_client *mdsc,
906			       struct ceph_mds_request *req,
907			       struct inode *dir)
908{
909	int ret = 0;
910
911	req->r_tid = ++mdsc->last_tid;
912	if (req->r_num_caps) {
913		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
914					req->r_num_caps);
915		if (ret < 0) {
916			pr_err("__register_request %p "
917			       "failed to reserve caps: %d\n", req, ret);
918			/* set req->r_err to fail early from __do_request */
919			req->r_err = ret;
920			return;
921		}
922	}
923	dout("__register_request %p tid %lld\n", req, req->r_tid);
924	ceph_mdsc_get_request(req);
925	insert_request(&mdsc->request_tree, req);
926
927	req->r_cred = get_current_cred();
928
929	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
930		mdsc->oldest_tid = req->r_tid;
931
932	if (dir) {
933		struct ceph_inode_info *ci = ceph_inode(dir);
934
935		ihold(dir);
936		req->r_unsafe_dir = dir;
937		spin_lock(&ci->i_unsafe_lock);
938		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
939		spin_unlock(&ci->i_unsafe_lock);
940	}
941}
942
943static void __unregister_request(struct ceph_mds_client *mdsc,
944				 struct ceph_mds_request *req)
945{
946	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
947
948	/* Never leave an unregistered request on an unsafe list! */
949	list_del_init(&req->r_unsafe_item);
950
951	if (req->r_tid == mdsc->oldest_tid) {
952		struct rb_node *p = rb_next(&req->r_node);
953		mdsc->oldest_tid = 0;
954		while (p) {
955			struct ceph_mds_request *next_req =
956				rb_entry(p, struct ceph_mds_request, r_node);
957			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
958				mdsc->oldest_tid = next_req->r_tid;
959				break;
960			}
961			p = rb_next(p);
962		}
963	}
964
965	erase_request(&mdsc->request_tree, req);
966
967	if (req->r_unsafe_dir) {
968		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
969		spin_lock(&ci->i_unsafe_lock);
970		list_del_init(&req->r_unsafe_dir_item);
971		spin_unlock(&ci->i_unsafe_lock);
972	}
973	if (req->r_target_inode &&
974	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
975		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
976		spin_lock(&ci->i_unsafe_lock);
977		list_del_init(&req->r_unsafe_target_item);
978		spin_unlock(&ci->i_unsafe_lock);
979	}
980
981	if (req->r_unsafe_dir) {
982		iput(req->r_unsafe_dir);
983		req->r_unsafe_dir = NULL;
984	}
985
986	complete_all(&req->r_safe_completion);
987
988	ceph_mdsc_put_request(req);
989}
990
991/*
992 * Walk back up the dentry tree until we hit a dentry representing a
993 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
994 * when calling this) to ensure that the objects won't disappear while we're
995 * working with them. Once we hit a candidate dentry, we attempt to take a
996 * reference to it, and return that as the result.
997 */
998static struct inode *get_nonsnap_parent(struct dentry *dentry)
999{
1000	struct inode *inode = NULL;
1001
1002	while (dentry && !IS_ROOT(dentry)) {
1003		inode = d_inode_rcu(dentry);
1004		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
1005			break;
1006		dentry = dentry->d_parent;
1007	}
1008	if (inode)
1009		inode = igrab(inode);
1010	return inode;
1011}
1012
1013/*
1014 * Choose mds to send request to next.  If there is a hint set in the
1015 * request (e.g., due to a prior forward hint from the mds), use that.
1016 * Otherwise, consult frag tree and/or caps to identify the
1017 * appropriate mds.  If all else fails, choose randomly.
1018 *
1019 * Called under mdsc->mutex.
1020 */
1021static int __choose_mds(struct ceph_mds_client *mdsc,
1022			struct ceph_mds_request *req,
1023			bool *random)
1024{
1025	struct inode *inode;
1026	struct ceph_inode_info *ci;
1027	struct ceph_cap *cap;
1028	int mode = req->r_direct_mode;
1029	int mds = -1;
1030	u32 hash = req->r_direct_hash;
1031	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
1032
1033	if (random)
1034		*random = false;
1035
1036	/*
1037	 * is there a specific mds we should try?  ignore hint if we have
1038	 * no session and the mds is not up (active or recovering).
1039	 */
1040	if (req->r_resend_mds >= 0 &&
1041	    (__have_session(mdsc, req->r_resend_mds) ||
1042	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
1043		dout("%s using resend_mds mds%d\n", __func__,
1044		     req->r_resend_mds);
1045		return req->r_resend_mds;
1046	}
1047
1048	if (mode == USE_RANDOM_MDS)
1049		goto random;
1050
1051	inode = NULL;
1052	if (req->r_inode) {
1053		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
1054			inode = req->r_inode;
1055			ihold(inode);
1056		} else {
1057			/* req->r_dentry is non-null for LSSNAP request */
1058			rcu_read_lock();
1059			inode = get_nonsnap_parent(req->r_dentry);
1060			rcu_read_unlock();
1061			dout("%s using snapdir's parent %p\n", __func__, inode);
1062		}
1063	} else if (req->r_dentry) {
1064		/* ignore race with rename; old or new d_parent is okay */
1065		struct dentry *parent;
1066		struct inode *dir;
1067
1068		rcu_read_lock();
1069		parent = READ_ONCE(req->r_dentry->d_parent);
1070		dir = req->r_parent ? : d_inode_rcu(parent);
1071
1072		if (!dir || dir->i_sb != mdsc->fsc->sb) {
1073			/*  not this fs or parent went negative */
1074			inode = d_inode(req->r_dentry);
1075			if (inode)
1076				ihold(inode);
1077		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
1078			/* direct snapped/virtual snapdir requests
1079			 * based on parent dir inode */
1080			inode = get_nonsnap_parent(parent);
1081			dout("%s using nonsnap parent %p\n", __func__, inode);
1082		} else {
1083			/* dentry target */
1084			inode = d_inode(req->r_dentry);
1085			if (!inode || mode == USE_AUTH_MDS) {
1086				/* dir + name */
1087				inode = igrab(dir);
1088				hash = ceph_dentry_hash(dir, req->r_dentry);
1089				is_hash = true;
1090			} else {
1091				ihold(inode);
1092			}
1093		}
1094		rcu_read_unlock();
1095	}
1096
1097	dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
1098	     hash, mode);
1099	if (!inode)
1100		goto random;
1101	ci = ceph_inode(inode);
1102
1103	if (is_hash && S_ISDIR(inode->i_mode)) {
1104		struct ceph_inode_frag frag;
1105		int found;
1106
1107		ceph_choose_frag(ci, hash, &frag, &found);
1108		if (found) {
1109			if (mode == USE_ANY_MDS && frag.ndist > 0) {
1110				u8 r;
1111
1112				/* choose a random replica */
1113				get_random_bytes(&r, 1);
1114				r %= frag.ndist;
1115				mds = frag.dist[r];
1116				dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
1117				     __func__, inode, ceph_vinop(inode),
1118				     frag.frag, mds, (int)r, frag.ndist);
1119				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1120				    CEPH_MDS_STATE_ACTIVE &&
1121				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
1122					goto out;
1123			}
1124
1125			/* since this file/dir wasn't known to be
1126			 * replicated, then we want to look for the
1127			 * authoritative mds. */
1128			if (frag.mds >= 0) {
1129				/* choose auth mds */
1130				mds = frag.mds;
1131				dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
1132				     __func__, inode, ceph_vinop(inode),
1133				     frag.frag, mds);
1134				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1135				    CEPH_MDS_STATE_ACTIVE) {
1136					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
1137								  mds))
1138						goto out;
1139				}
1140			}
1141			mode = USE_AUTH_MDS;
1142		}
1143	}
1144
1145	spin_lock(&ci->i_ceph_lock);
1146	cap = NULL;
1147	if (mode == USE_AUTH_MDS)
1148		cap = ci->i_auth_cap;
1149	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
1150		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
1151	if (!cap) {
1152		spin_unlock(&ci->i_ceph_lock);
1153		iput(inode);
1154		goto random;
1155	}
1156	mds = cap->session->s_mds;
1157	dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
1158	     inode, ceph_vinop(inode), mds,
1159	     cap == ci->i_auth_cap ? "auth " : "", cap);
1160	spin_unlock(&ci->i_ceph_lock);
1161out:
1162	iput(inode);
1163	return mds;
1164
1165random:
1166	if (random)
1167		*random = true;
1168
1169	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
1170	dout("%s chose random mds%d\n", __func__, mds);
1171	return mds;
1172}
1173
1174
1175/*
1176 * session messages
1177 */
1178struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
1179{
1180	struct ceph_msg *msg;
1181	struct ceph_mds_session_head *h;
1182
1183	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1184			   false);
1185	if (!msg) {
1186		pr_err("ENOMEM creating session %s msg\n",
1187		       ceph_session_op_name(op));
1188		return NULL;
1189	}
1190	h = msg->front.iov_base;
1191	h->op = cpu_to_le32(op);
1192	h->seq = cpu_to_le64(seq);
1193
1194	return msg;
1195}
1196
1197static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
1198#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
1199static int encode_supported_features(void **p, void *end)
1200{
1201	static const size_t count = ARRAY_SIZE(feature_bits);
1202
1203	if (count > 0) {
1204		size_t i;
1205		size_t size = FEATURE_BYTES(count);
1206
1207		if (WARN_ON_ONCE(*p + 4 + size > end))
1208			return -ERANGE;
1209
1210		ceph_encode_32(p, size);
1211		memset(*p, 0, size);
1212		for (i = 0; i < count; i++)
1213			((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8);
1214		*p += size;
1215	} else {
1216		if (WARN_ON_ONCE(*p + 4 > end))
1217			return -ERANGE;
1218
1219		ceph_encode_32(p, 0);
1220	}
1221
1222	return 0;
1223}
1224
1225static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
1226#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
1227static int encode_metric_spec(void **p, void *end)
1228{
1229	static const size_t count = ARRAY_SIZE(metric_bits);
1230
1231	/* header */
1232	if (WARN_ON_ONCE(*p + 2 > end))
1233		return -ERANGE;
1234
1235	ceph_encode_8(p, 1); /* version */
1236	ceph_encode_8(p, 1); /* compat */
1237
1238	if (count > 0) {
1239		size_t i;
1240		size_t size = METRIC_BYTES(count);
1241
1242		if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
1243			return -ERANGE;
1244
1245		/* metric spec info length */
1246		ceph_encode_32(p, 4 + size);
1247
1248		/* metric spec */
1249		ceph_encode_32(p, size);
1250		memset(*p, 0, size);
1251		for (i = 0; i < count; i++)
1252			((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
1253		*p += size;
1254	} else {
1255		if (WARN_ON_ONCE(*p + 4 + 4 > end))
1256			return -ERANGE;
1257
1258		/* metric spec info length */
1259		ceph_encode_32(p, 4);
1260		/* metric spec */
1261		ceph_encode_32(p, 0);
1262	}
1263
1264	return 0;
1265}
1266
1267/*
1268 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
1269 * to include additional client metadata fields.
1270 */
1271static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
1272{
1273	struct ceph_msg *msg;
1274	struct ceph_mds_session_head *h;
1275	int i;
1276	int extra_bytes = 0;
1277	int metadata_key_count = 0;
1278	struct ceph_options *opt = mdsc->fsc->client->options;
1279	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
1280	size_t size, count;
1281	void *p, *end;
1282	int ret;
1283
1284	const char* metadata[][2] = {
1285		{"hostname", mdsc->nodename},
1286		{"kernel_version", init_utsname()->release},
1287		{"entity_id", opt->name ? : ""},
1288		{"root", fsopt->server_path ? : "/"},
1289		{NULL, NULL}
1290	};
1291
1292	/* Calculate serialized length of metadata */
1293	extra_bytes = 4;  /* map length */
1294	for (i = 0; metadata[i][0]; ++i) {
1295		extra_bytes += 8 + strlen(metadata[i][0]) +
1296			strlen(metadata[i][1]);
1297		metadata_key_count++;
1298	}
1299
1300	/* supported feature */
1301	size = 0;
1302	count = ARRAY_SIZE(feature_bits);
1303	if (count > 0)
1304		size = FEATURE_BYTES(count);
1305	extra_bytes += 4 + size;
1306
1307	/* metric spec */
1308	size = 0;
1309	count = ARRAY_SIZE(metric_bits);
1310	if (count > 0)
1311		size = METRIC_BYTES(count);
1312	extra_bytes += 2 + 4 + 4 + size;
1313
1314	/* Allocate the message */
1315	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
1316			   GFP_NOFS, false);
1317	if (!msg) {
1318		pr_err("ENOMEM creating session open msg\n");
1319		return ERR_PTR(-ENOMEM);
1320	}
1321	p = msg->front.iov_base;
1322	end = p + msg->front.iov_len;
1323
1324	h = p;
1325	h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
1326	h->seq = cpu_to_le64(seq);
1327
1328	/*
1329	 * Serialize client metadata into waiting buffer space, using
1330	 * the format that userspace expects for map<string, string>
1331	 *
1332	 * ClientSession messages with metadata are v4
1333	 */
1334	msg->hdr.version = cpu_to_le16(4);
1335	msg->hdr.compat_version = cpu_to_le16(1);
1336
1337	/* The write pointer, following the session_head structure */
1338	p += sizeof(*h);
1339
1340	/* Number of entries in the map */
1341	ceph_encode_32(&p, metadata_key_count);
1342
1343	/* Two length-prefixed strings for each entry in the map */
1344	for (i = 0; metadata[i][0]; ++i) {
1345		size_t const key_len = strlen(metadata[i][0]);
1346		size_t const val_len = strlen(metadata[i][1]);
1347
1348		ceph_encode_32(&p, key_len);
1349		memcpy(p, metadata[i][0], key_len);
1350		p += key_len;
1351		ceph_encode_32(&p, val_len);
1352		memcpy(p, metadata[i][1], val_len);
1353		p += val_len;
1354	}
1355
1356	ret = encode_supported_features(&p, end);
1357	if (ret) {
1358		pr_err("encode_supported_features failed!\n");
1359		ceph_msg_put(msg);
1360		return ERR_PTR(ret);
1361	}
1362
1363	ret = encode_metric_spec(&p, end);
1364	if (ret) {
1365		pr_err("encode_metric_spec failed!\n");
1366		ceph_msg_put(msg);
1367		return ERR_PTR(ret);
1368	}
1369
1370	msg->front.iov_len = p - msg->front.iov_base;
1371	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1372
1373	return msg;
1374}
1375
1376/*
1377 * send session open request.
1378 *
1379 * called under mdsc->mutex
1380 */
1381static int __open_session(struct ceph_mds_client *mdsc,
1382			  struct ceph_mds_session *session)
1383{
1384	struct ceph_msg *msg;
1385	int mstate;
1386	int mds = session->s_mds;
1387
1388	/* wait for mds to go active? */
1389	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
1390	dout("open_session to mds%d (%s)\n", mds,
1391	     ceph_mds_state_name(mstate));
1392	session->s_state = CEPH_MDS_SESSION_OPENING;
1393	session->s_renew_requested = jiffies;
1394
1395	/* send connect message */
1396	msg = create_session_open_msg(mdsc, session->s_seq);
1397	if (IS_ERR(msg))
1398		return PTR_ERR(msg);
1399	ceph_con_send(&session->s_con, msg);
1400	return 0;
1401}
1402
1403/*
1404 * open sessions for any export targets for the given mds
1405 *
1406 * called under mdsc->mutex
1407 */
1408static struct ceph_mds_session *
1409__open_export_target_session(struct ceph_mds_client *mdsc, int target)
1410{
1411	struct ceph_mds_session *session;
1412	int ret;
1413
1414	session = __ceph_lookup_mds_session(mdsc, target);
1415	if (!session) {
1416		session = register_session(mdsc, target);
1417		if (IS_ERR(session))
1418			return session;
1419	}
1420	if (session->s_state == CEPH_MDS_SESSION_NEW ||
1421	    session->s_state == CEPH_MDS_SESSION_CLOSING) {
1422		ret = __open_session(mdsc, session);
1423		if (ret)
1424			return ERR_PTR(ret);
1425	}
1426
1427	return session;
1428}
1429
1430struct ceph_mds_session *
1431ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1432{
1433	struct ceph_mds_session *session;
1434
1435	dout("open_export_target_session to mds%d\n", target);
1436
1437	mutex_lock(&mdsc->mutex);
1438	session = __open_export_target_session(mdsc, target);
1439	mutex_unlock(&mdsc->mutex);
1440
1441	return session;
1442}
1443
1444static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1445					  struct ceph_mds_session *session)
1446{
1447	struct ceph_mds_info *mi;
1448	struct ceph_mds_session *ts;
1449	int i, mds = session->s_mds;
1450
1451	if (mds >= mdsc->mdsmap->possible_max_rank)
1452		return;
1453
1454	mi = &mdsc->mdsmap->m_info[mds];
1455	dout("open_export_target_sessions for mds%d (%d targets)\n",
1456	     session->s_mds, mi->num_export_targets);
1457
1458	for (i = 0; i < mi->num_export_targets; i++) {
1459		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1460		ceph_put_mds_session(ts);
1461	}
1462}
1463
1464void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1465					   struct ceph_mds_session *session)
1466{
1467	mutex_lock(&mdsc->mutex);
1468	__open_export_target_sessions(mdsc, session);
1469	mutex_unlock(&mdsc->mutex);
1470}
1471
1472/*
1473 * session caps
1474 */
1475
1476static void detach_cap_releases(struct ceph_mds_session *session,
1477				struct list_head *target)
1478{
1479	lockdep_assert_held(&session->s_cap_lock);
1480
1481	list_splice_init(&session->s_cap_releases, target);
1482	session->s_num_cap_releases = 0;
1483	dout("dispose_cap_releases mds%d\n", session->s_mds);
1484}
1485
1486static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1487				 struct list_head *dispose)
1488{
1489	while (!list_empty(dispose)) {
1490		struct ceph_cap *cap;
1491		/* zero out the in-progress message */
1492		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1493		list_del(&cap->session_caps);
1494		ceph_put_cap(mdsc, cap);
1495	}
1496}
1497
1498static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1499				     struct ceph_mds_session *session)
1500{
1501	struct ceph_mds_request *req;
1502	struct rb_node *p;
1503	struct ceph_inode_info *ci;
1504
1505	dout("cleanup_session_requests mds%d\n", session->s_mds);
1506	mutex_lock(&mdsc->mutex);
1507	while (!list_empty(&session->s_unsafe)) {
1508		req = list_first_entry(&session->s_unsafe,
1509				       struct ceph_mds_request, r_unsafe_item);
1510		pr_warn_ratelimited(" dropping unsafe request %llu\n",
1511				    req->r_tid);
1512		if (req->r_target_inode) {
1513			/* dropping unsafe change of inode's attributes */
1514			ci = ceph_inode(req->r_target_inode);
1515			errseq_set(&ci->i_meta_err, -EIO);
1516		}
1517		if (req->r_unsafe_dir) {
1518			/* dropping unsafe directory operation */
1519			ci = ceph_inode(req->r_unsafe_dir);
1520			errseq_set(&ci->i_meta_err, -EIO);
1521		}
1522		__unregister_request(mdsc, req);
1523	}
1524	/* zero r_attempts, so kick_requests() will re-send requests */
1525	p = rb_first(&mdsc->request_tree);
1526	while (p) {
1527		req = rb_entry(p, struct ceph_mds_request, r_node);
1528		p = rb_next(p);
1529		if (req->r_session &&
1530		    req->r_session->s_mds == session->s_mds)
1531			req->r_attempts = 0;
1532	}
1533	mutex_unlock(&mdsc->mutex);
1534}
1535
1536/*
1537 * Helper to safely iterate over all caps associated with a session, with
1538 * special care taken to handle a racing __ceph_remove_cap().
1539 *
1540 * Caller must hold session s_mutex.
1541 */
1542int ceph_iterate_session_caps(struct ceph_mds_session *session,
1543			      int (*cb)(struct inode *, struct ceph_cap *,
1544					void *), void *arg)
1545{
1546	struct list_head *p;
1547	struct ceph_cap *cap;
1548	struct inode *inode, *last_inode = NULL;
1549	struct ceph_cap *old_cap = NULL;
1550	int ret;
1551
1552	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1553	spin_lock(&session->s_cap_lock);
1554	p = session->s_caps.next;
1555	while (p != &session->s_caps) {
1556		cap = list_entry(p, struct ceph_cap, session_caps);
1557		inode = igrab(&cap->ci->vfs_inode);
1558		if (!inode) {
1559			p = p->next;
1560			continue;
1561		}
1562		session->s_cap_iterator = cap;
1563		spin_unlock(&session->s_cap_lock);
1564
1565		if (last_inode) {
1566			iput(last_inode);
1567			last_inode = NULL;
1568		}
1569		if (old_cap) {
1570			ceph_put_cap(session->s_mdsc, old_cap);
1571			old_cap = NULL;
1572		}
1573
1574		ret = cb(inode, cap, arg);
1575		last_inode = inode;
1576
1577		spin_lock(&session->s_cap_lock);
1578		p = p->next;
1579		if (!cap->ci) {
1580			dout("iterate_session_caps  finishing cap %p removal\n",
1581			     cap);
1582			BUG_ON(cap->session != session);
1583			cap->session = NULL;
1584			list_del_init(&cap->session_caps);
1585			session->s_nr_caps--;
1586			atomic64_dec(&session->s_mdsc->metric.total_caps);
1587			if (cap->queue_release)
1588				__ceph_queue_cap_release(session, cap);
1589			else
1590				old_cap = cap;  /* put_cap it w/o locks held */
1591		}
1592		if (ret < 0)
1593			goto out;
1594	}
1595	ret = 0;
1596out:
1597	session->s_cap_iterator = NULL;
1598	spin_unlock(&session->s_cap_lock);
1599
1600	iput(last_inode);
1601	if (old_cap)
1602		ceph_put_cap(session->s_mdsc, old_cap);
1603
1604	return ret;
1605}
1606
1607static int remove_capsnaps(struct ceph_mds_client *mdsc, struct inode *inode)
1608{
1609	struct ceph_inode_info *ci = ceph_inode(inode);
1610	struct ceph_cap_snap *capsnap;
1611	int capsnap_release = 0;
1612
1613	lockdep_assert_held(&ci->i_ceph_lock);
1614
1615	dout("removing capsnaps, ci is %p, inode is %p\n", ci, inode);
1616
1617	while (!list_empty(&ci->i_cap_snaps)) {
1618		capsnap = list_first_entry(&ci->i_cap_snaps,
1619					   struct ceph_cap_snap, ci_item);
1620		__ceph_remove_capsnap(inode, capsnap, NULL, NULL);
1621		ceph_put_snap_context(capsnap->context);
1622		ceph_put_cap_snap(capsnap);
1623		capsnap_release++;
1624	}
1625	wake_up_all(&ci->i_cap_wq);
1626	wake_up_all(&mdsc->cap_flushing_wq);
1627	return capsnap_release;
1628}
1629
1630static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1631				  void *arg)
1632{
1633	struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1634	struct ceph_mds_client *mdsc = fsc->mdsc;
1635	struct ceph_inode_info *ci = ceph_inode(inode);
1636	LIST_HEAD(to_remove);
1637	bool dirty_dropped = false;
1638	bool invalidate = false;
1639	int capsnap_release = 0;
1640
1641	dout("removing cap %p, ci is %p, inode is %p\n",
1642	     cap, ci, &ci->vfs_inode);
1643	spin_lock(&ci->i_ceph_lock);
1644	__ceph_remove_cap(cap, false);
1645	if (!ci->i_auth_cap) {
1646		struct ceph_cap_flush *cf;
1647
1648		if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
1649			if (inode->i_data.nrpages > 0)
1650				invalidate = true;
1651			if (ci->i_wrbuffer_ref > 0)
1652				mapping_set_error(&inode->i_data, -EIO);
1653		}
1654
1655		while (!list_empty(&ci->i_cap_flush_list)) {
1656			cf = list_first_entry(&ci->i_cap_flush_list,
1657					      struct ceph_cap_flush, i_list);
1658			list_move(&cf->i_list, &to_remove);
1659		}
1660
1661		spin_lock(&mdsc->cap_dirty_lock);
1662
1663		list_for_each_entry(cf, &to_remove, i_list)
1664			list_del_init(&cf->g_list);
1665
1666		if (!list_empty(&ci->i_dirty_item)) {
1667			pr_warn_ratelimited(
1668				" dropping dirty %s state for %p %lld\n",
1669				ceph_cap_string(ci->i_dirty_caps),
1670				inode, ceph_ino(inode));
1671			ci->i_dirty_caps = 0;
1672			list_del_init(&ci->i_dirty_item);
1673			dirty_dropped = true;
1674		}
1675		if (!list_empty(&ci->i_flushing_item)) {
1676			pr_warn_ratelimited(
1677				" dropping dirty+flushing %s state for %p %lld\n",
1678				ceph_cap_string(ci->i_flushing_caps),
1679				inode, ceph_ino(inode));
1680			ci->i_flushing_caps = 0;
1681			list_del_init(&ci->i_flushing_item);
1682			mdsc->num_cap_flushing--;
1683			dirty_dropped = true;
1684		}
1685		spin_unlock(&mdsc->cap_dirty_lock);
1686
1687		if (dirty_dropped) {
1688			errseq_set(&ci->i_meta_err, -EIO);
1689
1690			if (ci->i_wrbuffer_ref_head == 0 &&
1691			    ci->i_wr_ref == 0 &&
1692			    ci->i_dirty_caps == 0 &&
1693			    ci->i_flushing_caps == 0) {
1694				ceph_put_snap_context(ci->i_head_snapc);
1695				ci->i_head_snapc = NULL;
1696			}
1697		}
1698
1699		if (atomic_read(&ci->i_filelock_ref) > 0) {
1700			/* make further file lock syscall return -EIO */
1701			ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
1702			pr_warn_ratelimited(" dropping file locks for %p %lld\n",
1703					    inode, ceph_ino(inode));
1704		}
1705
1706		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1707			list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
1708			ci->i_prealloc_cap_flush = NULL;
1709		}
1710
1711		if (!list_empty(&ci->i_cap_snaps))
1712			capsnap_release = remove_capsnaps(mdsc, inode);
1713	}
1714	spin_unlock(&ci->i_ceph_lock);
1715	while (!list_empty(&to_remove)) {
1716		struct ceph_cap_flush *cf;
1717		cf = list_first_entry(&to_remove,
1718				      struct ceph_cap_flush, i_list);
1719		list_del_init(&cf->i_list);
1720		if (!cf->is_capsnap)
1721			ceph_free_cap_flush(cf);
1722	}
1723
1724	wake_up_all(&ci->i_cap_wq);
1725	if (invalidate)
1726		ceph_queue_invalidate(inode);
1727	if (dirty_dropped)
1728		iput(inode);
1729	while (capsnap_release--)
1730		iput(inode);
1731	return 0;
1732}
1733
1734/*
1735 * caller must hold session s_mutex
1736 */
1737static void remove_session_caps(struct ceph_mds_session *session)
1738{
1739	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1740	struct super_block *sb = fsc->sb;
1741	LIST_HEAD(dispose);
1742
1743	dout("remove_session_caps on %p\n", session);
1744	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1745
1746	wake_up_all(&fsc->mdsc->cap_flushing_wq);
1747
1748	spin_lock(&session->s_cap_lock);
1749	if (session->s_nr_caps > 0) {
1750		struct inode *inode;
1751		struct ceph_cap *cap, *prev = NULL;
1752		struct ceph_vino vino;
1753		/*
1754		 * iterate_session_caps() skips inodes that are being
1755		 * deleted, we need to wait until deletions are complete.
1756		 * __wait_on_freeing_inode() is designed for the job,
1757		 * but it is not exported, so use lookup inode function
1758		 * to access it.
1759		 */
1760		while (!list_empty(&session->s_caps)) {
1761			cap = list_entry(session->s_caps.next,
1762					 struct ceph_cap, session_caps);
1763			if (cap == prev)
1764				break;
1765			prev = cap;
1766			vino = cap->ci->i_vino;
1767			spin_unlock(&session->s_cap_lock);
1768
1769			inode = ceph_find_inode(sb, vino);
1770			iput(inode);
1771
1772			spin_lock(&session->s_cap_lock);
1773		}
1774	}
1775
1776	// drop cap expires and unlock s_cap_lock
1777	detach_cap_releases(session, &dispose);
1778
1779	BUG_ON(session->s_nr_caps > 0);
1780	BUG_ON(!list_empty(&session->s_cap_flushing));
1781	spin_unlock(&session->s_cap_lock);
1782	dispose_cap_releases(session->s_mdsc, &dispose);
1783}
1784
1785enum {
1786	RECONNECT,
1787	RENEWCAPS,
1788	FORCE_RO,
1789};
1790
1791/*
1792 * wake up any threads waiting on this session's caps.  if the cap is
1793 * old (didn't get renewed on the client reconnect), remove it now.
1794 *
1795 * caller must hold s_mutex.
1796 */
1797static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1798			      void *arg)
1799{
1800	struct ceph_inode_info *ci = ceph_inode(inode);
1801	unsigned long ev = (unsigned long)arg;
1802
1803	if (ev == RECONNECT) {
1804		spin_lock(&ci->i_ceph_lock);
1805		ci->i_wanted_max_size = 0;
1806		ci->i_requested_max_size = 0;
1807		spin_unlock(&ci->i_ceph_lock);
1808	} else if (ev == RENEWCAPS) {
1809		if (cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) {
1810			/* mds did not re-issue stale cap */
1811			spin_lock(&ci->i_ceph_lock);
1812			cap->issued = cap->implemented = CEPH_CAP_PIN;
1813			spin_unlock(&ci->i_ceph_lock);
1814		}
1815	} else if (ev == FORCE_RO) {
1816	}
1817	wake_up_all(&ci->i_cap_wq);
1818	return 0;
1819}
1820
1821static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1822{
1823	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1824	ceph_iterate_session_caps(session, wake_up_session_cb,
1825				  (void *)(unsigned long)ev);
1826}
1827
1828/*
1829 * Send periodic message to MDS renewing all currently held caps.  The
1830 * ack will reset the expiration for all caps from this session.
1831 *
1832 * caller holds s_mutex
1833 */
1834static int send_renew_caps(struct ceph_mds_client *mdsc,
1835			   struct ceph_mds_session *session)
1836{
1837	struct ceph_msg *msg;
1838	int state;
1839
1840	if (time_after_eq(jiffies, session->s_cap_ttl) &&
1841	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1842		pr_info("mds%d caps stale\n", session->s_mds);
1843	session->s_renew_requested = jiffies;
1844
1845	/* do not try to renew caps until a recovering mds has reconnected
1846	 * with its clients. */
1847	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1848	if (state < CEPH_MDS_STATE_RECONNECT) {
1849		dout("send_renew_caps ignoring mds%d (%s)\n",
1850		     session->s_mds, ceph_mds_state_name(state));
1851		return 0;
1852	}
1853
1854	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1855		ceph_mds_state_name(state));
1856	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1857				      ++session->s_renew_seq);
1858	if (!msg)
1859		return -ENOMEM;
1860	ceph_con_send(&session->s_con, msg);
1861	return 0;
1862}
1863
1864static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1865			     struct ceph_mds_session *session, u64 seq)
1866{
1867	struct ceph_msg *msg;
1868
1869	dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1870	     session->s_mds, ceph_session_state_name(session->s_state), seq);
1871	msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1872	if (!msg)
1873		return -ENOMEM;
1874	ceph_con_send(&session->s_con, msg);
1875	return 0;
1876}
1877
1878
1879/*
1880 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1881 *
1882 * Called under session->s_mutex
1883 */
1884static void renewed_caps(struct ceph_mds_client *mdsc,
1885			 struct ceph_mds_session *session, int is_renew)
1886{
1887	int was_stale;
1888	int wake = 0;
1889
1890	spin_lock(&session->s_cap_lock);
1891	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1892
1893	session->s_cap_ttl = session->s_renew_requested +
1894		mdsc->mdsmap->m_session_timeout*HZ;
1895
1896	if (was_stale) {
1897		if (time_before(jiffies, session->s_cap_ttl)) {
1898			pr_info("mds%d caps renewed\n", session->s_mds);
1899			wake = 1;
1900		} else {
1901			pr_info("mds%d caps still stale\n", session->s_mds);
1902		}
1903	}
1904	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1905	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1906	     time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1907	spin_unlock(&session->s_cap_lock);
1908
1909	if (wake)
1910		wake_up_session_caps(session, RENEWCAPS);
1911}
1912
1913/*
1914 * send a session close request
1915 */
1916static int request_close_session(struct ceph_mds_session *session)
1917{
1918	struct ceph_msg *msg;
1919
1920	dout("request_close_session mds%d state %s seq %lld\n",
1921	     session->s_mds, ceph_session_state_name(session->s_state),
1922	     session->s_seq);
1923	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
1924				      session->s_seq);
1925	if (!msg)
1926		return -ENOMEM;
1927	ceph_con_send(&session->s_con, msg);
1928	return 1;
1929}
1930
1931/*
1932 * Called with s_mutex held.
1933 */
1934static int __close_session(struct ceph_mds_client *mdsc,
1935			 struct ceph_mds_session *session)
1936{
1937	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1938		return 0;
1939	session->s_state = CEPH_MDS_SESSION_CLOSING;
1940	return request_close_session(session);
1941}
1942
1943static bool drop_negative_children(struct dentry *dentry)
1944{
1945	struct dentry *child;
1946	bool all_negative = true;
1947
1948	if (!d_is_dir(dentry))
1949		goto out;
1950
1951	spin_lock(&dentry->d_lock);
1952	list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1953		if (d_really_is_positive(child)) {
1954			all_negative = false;
1955			break;
1956		}
1957	}
1958	spin_unlock(&dentry->d_lock);
1959
1960	if (all_negative)
1961		shrink_dcache_parent(dentry);
1962out:
1963	return all_negative;
1964}
1965
1966/*
1967 * Trim old(er) caps.
1968 *
1969 * Because we can't cache an inode without one or more caps, we do
1970 * this indirectly: if a cap is unused, we prune its aliases, at which
1971 * point the inode will hopefully get dropped to.
1972 *
1973 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1974 * memory pressure from the MDS, though, so it needn't be perfect.
1975 */
1976static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1977{
1978	int *remaining = arg;
1979	struct ceph_inode_info *ci = ceph_inode(inode);
1980	int used, wanted, oissued, mine;
1981
1982	if (*remaining <= 0)
1983		return -1;
1984
1985	spin_lock(&ci->i_ceph_lock);
1986	mine = cap->issued | cap->implemented;
1987	used = __ceph_caps_used(ci);
1988	wanted = __ceph_caps_file_wanted(ci);
1989	oissued = __ceph_caps_issued_other(ci, cap);
1990
1991	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1992	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1993	     ceph_cap_string(used), ceph_cap_string(wanted));
1994	if (cap == ci->i_auth_cap) {
1995		if (ci->i_dirty_caps || ci->i_flushing_caps ||
1996		    !list_empty(&ci->i_cap_snaps))
1997			goto out;
1998		if ((used | wanted) & CEPH_CAP_ANY_WR)
1999			goto out;
2000		/* Note: it's possible that i_filelock_ref becomes non-zero
2001		 * after dropping auth caps. It doesn't hurt because reply
2002		 * of lock mds request will re-add auth caps. */
2003		if (atomic_read(&ci->i_filelock_ref) > 0)
2004			goto out;
2005	}
2006	/* The inode has cached pages, but it's no longer used.
2007	 * we can safely drop it */
2008	if (S_ISREG(inode->i_mode) &&
2009	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
2010	    !(oissued & CEPH_CAP_FILE_CACHE)) {
2011	  used = 0;
2012	  oissued = 0;
2013	}
2014	if ((used | wanted) & ~oissued & mine)
2015		goto out;   /* we need these caps */
2016
2017	if (oissued) {
2018		/* we aren't the only cap.. just remove us */
2019		ceph_remove_cap(cap, true);
2020		(*remaining)--;
2021	} else {
2022		struct dentry *dentry;
2023		/* try dropping referring dentries */
2024		spin_unlock(&ci->i_ceph_lock);
2025		dentry = d_find_any_alias(inode);
2026		if (dentry && drop_negative_children(dentry)) {
2027			int count;
2028			dput(dentry);
2029			d_prune_aliases(inode);
2030			count = atomic_read(&inode->i_count);
2031			if (count == 1)
2032				(*remaining)--;
2033			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
2034			     inode, cap, count);
2035		} else {
2036			dput(dentry);
2037		}
2038		return 0;
2039	}
2040
2041out:
2042	spin_unlock(&ci->i_ceph_lock);
2043	return 0;
2044}
2045
2046/*
2047 * Trim session cap count down to some max number.
2048 */
2049int ceph_trim_caps(struct ceph_mds_client *mdsc,
2050		   struct ceph_mds_session *session,
2051		   int max_caps)
2052{
2053	int trim_caps = session->s_nr_caps - max_caps;
2054
2055	dout("trim_caps mds%d start: %d / %d, trim %d\n",
2056	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
2057	if (trim_caps > 0) {
2058		int remaining = trim_caps;
2059
2060		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2061		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
2062		     session->s_mds, session->s_nr_caps, max_caps,
2063			trim_caps - remaining);
2064	}
2065
2066	ceph_flush_cap_releases(mdsc, session);
2067	return 0;
2068}
2069
2070static int check_caps_flush(struct ceph_mds_client *mdsc,
2071			    u64 want_flush_tid)
2072{
2073	int ret = 1;
2074
2075	spin_lock(&mdsc->cap_dirty_lock);
2076	if (!list_empty(&mdsc->cap_flush_list)) {
2077		struct ceph_cap_flush *cf =
2078			list_first_entry(&mdsc->cap_flush_list,
2079					 struct ceph_cap_flush, g_list);
2080		if (cf->tid <= want_flush_tid) {
2081			dout("check_caps_flush still flushing tid "
2082			     "%llu <= %llu\n", cf->tid, want_flush_tid);
2083			ret = 0;
2084		}
2085	}
2086	spin_unlock(&mdsc->cap_dirty_lock);
2087	return ret;
2088}
2089
2090/*
2091 * flush all dirty inode data to disk.
2092 *
2093 * returns true if we've flushed through want_flush_tid
2094 */
2095static void wait_caps_flush(struct ceph_mds_client *mdsc,
2096			    u64 want_flush_tid)
2097{
2098	dout("check_caps_flush want %llu\n", want_flush_tid);
2099
2100	wait_event(mdsc->cap_flushing_wq,
2101		   check_caps_flush(mdsc, want_flush_tid));
2102
2103	dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
2104}
2105
2106/*
2107 * called under s_mutex
2108 */
2109static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
2110				   struct ceph_mds_session *session)
2111{
2112	struct ceph_msg *msg = NULL;
2113	struct ceph_mds_cap_release *head;
2114	struct ceph_mds_cap_item *item;
2115	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
2116	struct ceph_cap *cap;
2117	LIST_HEAD(tmp_list);
2118	int num_cap_releases;
2119	__le32	barrier, *cap_barrier;
2120
2121	down_read(&osdc->lock);
2122	barrier = cpu_to_le32(osdc->epoch_barrier);
2123	up_read(&osdc->lock);
2124
2125	spin_lock(&session->s_cap_lock);
2126again:
2127	list_splice_init(&session->s_cap_releases, &tmp_list);
2128	num_cap_releases = session->s_num_cap_releases;
2129	session->s_num_cap_releases = 0;
2130	spin_unlock(&session->s_cap_lock);
2131
2132	while (!list_empty(&tmp_list)) {
2133		if (!msg) {
2134			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
2135					PAGE_SIZE, GFP_NOFS, false);
2136			if (!msg)
2137				goto out_err;
2138			head = msg->front.iov_base;
2139			head->num = cpu_to_le32(0);
2140			msg->front.iov_len = sizeof(*head);
2141
2142			msg->hdr.version = cpu_to_le16(2);
2143			msg->hdr.compat_version = cpu_to_le16(1);
2144		}
2145
2146		cap = list_first_entry(&tmp_list, struct ceph_cap,
2147					session_caps);
2148		list_del(&cap->session_caps);
2149		num_cap_releases--;
2150
2151		head = msg->front.iov_base;
2152		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2153				   &head->num);
2154		item = msg->front.iov_base + msg->front.iov_len;
2155		item->ino = cpu_to_le64(cap->cap_ino);
2156		item->cap_id = cpu_to_le64(cap->cap_id);
2157		item->migrate_seq = cpu_to_le32(cap->mseq);
2158		item->seq = cpu_to_le32(cap->issue_seq);
2159		msg->front.iov_len += sizeof(*item);
2160
2161		ceph_put_cap(mdsc, cap);
2162
2163		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
2164			// Append cap_barrier field
2165			cap_barrier = msg->front.iov_base + msg->front.iov_len;
2166			*cap_barrier = barrier;
2167			msg->front.iov_len += sizeof(*cap_barrier);
2168
2169			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2170			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2171			ceph_con_send(&session->s_con, msg);
2172			msg = NULL;
2173		}
2174	}
2175
2176	BUG_ON(num_cap_releases != 0);
2177
2178	spin_lock(&session->s_cap_lock);
2179	if (!list_empty(&session->s_cap_releases))
2180		goto again;
2181	spin_unlock(&session->s_cap_lock);
2182
2183	if (msg) {
2184		// Append cap_barrier field
2185		cap_barrier = msg->front.iov_base + msg->front.iov_len;
2186		*cap_barrier = barrier;
2187		msg->front.iov_len += sizeof(*cap_barrier);
2188
2189		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2190		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2191		ceph_con_send(&session->s_con, msg);
2192	}
2193	return;
2194out_err:
2195	pr_err("send_cap_releases mds%d, failed to allocate message\n",
2196		session->s_mds);
2197	spin_lock(&session->s_cap_lock);
2198	list_splice(&tmp_list, &session->s_cap_releases);
2199	session->s_num_cap_releases += num_cap_releases;
2200	spin_unlock(&session->s_cap_lock);
2201}
2202
2203static void ceph_cap_release_work(struct work_struct *work)
2204{
2205	struct ceph_mds_session *session =
2206		container_of(work, struct ceph_mds_session, s_cap_release_work);
2207
2208	mutex_lock(&session->s_mutex);
2209	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2210	    session->s_state == CEPH_MDS_SESSION_HUNG)
2211		ceph_send_cap_releases(session->s_mdsc, session);
2212	mutex_unlock(&session->s_mutex);
2213	ceph_put_mds_session(session);
2214}
2215
2216void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
2217		             struct ceph_mds_session *session)
2218{
2219	if (mdsc->stopping)
2220		return;
2221
2222	ceph_get_mds_session(session);
2223	if (queue_work(mdsc->fsc->cap_wq,
2224		       &session->s_cap_release_work)) {
2225		dout("cap release work queued\n");
2226	} else {
2227		ceph_put_mds_session(session);
2228		dout("failed to queue cap release work\n");
2229	}
2230}
2231
2232/*
2233 * caller holds session->s_cap_lock
2234 */
2235void __ceph_queue_cap_release(struct ceph_mds_session *session,
2236			      struct ceph_cap *cap)
2237{
2238	list_add_tail(&cap->session_caps, &session->s_cap_releases);
2239	session->s_num_cap_releases++;
2240
2241	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2242		ceph_flush_cap_releases(session->s_mdsc, session);
2243}
2244
2245static void ceph_cap_reclaim_work(struct work_struct *work)
2246{
2247	struct ceph_mds_client *mdsc =
2248		container_of(work, struct ceph_mds_client, cap_reclaim_work);
2249	int ret = ceph_trim_dentries(mdsc);
2250	if (ret == -EAGAIN)
2251		ceph_queue_cap_reclaim_work(mdsc);
2252}
2253
2254void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2255{
2256	if (mdsc->stopping)
2257		return;
2258
2259        if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2260                dout("caps reclaim work queued\n");
2261        } else {
2262                dout("failed to queue caps release work\n");
2263        }
2264}
2265
2266void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2267{
2268	int val;
2269	if (!nr)
2270		return;
2271	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
2272	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2273		atomic_set(&mdsc->cap_reclaim_pending, 0);
2274		ceph_queue_cap_reclaim_work(mdsc);
2275	}
2276}
2277
2278/*
2279 * requests
2280 */
2281
2282int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2283				    struct inode *dir)
2284{
2285	struct ceph_inode_info *ci = ceph_inode(dir);
2286	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2287	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2288	size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2289	unsigned int num_entries;
2290	int order;
2291
2292	spin_lock(&ci->i_ceph_lock);
2293	num_entries = ci->i_files + ci->i_subdirs;
2294	spin_unlock(&ci->i_ceph_lock);
2295	num_entries = max(num_entries, 1U);
2296	num_entries = min(num_entries, opt->max_readdir);
2297
2298	order = get_order(size * num_entries);
2299	while (order >= 0) {
2300		rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2301							     __GFP_NOWARN,
2302							     order);
2303		if (rinfo->dir_entries)
2304			break;
2305		order--;
2306	}
2307	if (!rinfo->dir_entries)
2308		return -ENOMEM;
2309
2310	num_entries = (PAGE_SIZE << order) / size;
2311	num_entries = min(num_entries, opt->max_readdir);
2312
2313	rinfo->dir_buf_size = PAGE_SIZE << order;
2314	req->r_num_caps = num_entries + 1;
2315	req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2316	req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2317	return 0;
2318}
2319
2320/*
2321 * Create an mds request.
2322 */
2323struct ceph_mds_request *
2324ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2325{
2326	struct ceph_mds_request *req;
2327
2328	req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
2329	if (!req)
2330		return ERR_PTR(-ENOMEM);
2331
2332	mutex_init(&req->r_fill_mutex);
2333	req->r_mdsc = mdsc;
2334	req->r_started = jiffies;
2335	req->r_start_latency = ktime_get();
2336	req->r_resend_mds = -1;
2337	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2338	INIT_LIST_HEAD(&req->r_unsafe_target_item);
2339	req->r_fmode = -1;
2340	kref_init(&req->r_kref);
2341	RB_CLEAR_NODE(&req->r_node);
2342	INIT_LIST_HEAD(&req->r_wait);
2343	init_completion(&req->r_completion);
2344	init_completion(&req->r_safe_completion);
2345	INIT_LIST_HEAD(&req->r_unsafe_item);
2346
2347	ktime_get_coarse_real_ts64(&req->r_stamp);
2348
2349	req->r_op = op;
2350	req->r_direct_mode = mode;
2351	return req;
2352}
2353
2354/*
2355 * return oldest (lowest) request, tid in request tree, 0 if none.
2356 *
2357 * called under mdsc->mutex.
2358 */
2359static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2360{
2361	if (RB_EMPTY_ROOT(&mdsc->request_tree))
2362		return NULL;
2363	return rb_entry(rb_first(&mdsc->request_tree),
2364			struct ceph_mds_request, r_node);
2365}
2366
2367static inline  u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2368{
2369	return mdsc->oldest_tid;
2370}
2371
2372/*
2373 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
2374 * on build_path_from_dentry in fs/cifs/dir.c.
2375 *
2376 * If @stop_on_nosnap, generate path relative to the first non-snapped
2377 * inode.
2378 *
2379 * Encode hidden .snap dirs as a double /, i.e.
2380 *   foo/.snap/bar -> foo//bar
2381 */
2382char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2383			   int stop_on_nosnap)
2384{
2385	struct dentry *temp;
2386	char *path;
2387	int pos;
2388	unsigned seq;
2389	u64 base;
2390
2391	if (!dentry)
2392		return ERR_PTR(-EINVAL);
2393
2394	path = __getname();
2395	if (!path)
2396		return ERR_PTR(-ENOMEM);
2397retry:
2398	pos = PATH_MAX - 1;
2399	path[pos] = '\0';
2400
2401	seq = read_seqbegin(&rename_lock);
2402	rcu_read_lock();
2403	temp = dentry;
2404	for (;;) {
2405		struct inode *inode;
2406
2407		spin_lock(&temp->d_lock);
2408		inode = d_inode(temp);
2409		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2410			dout("build_path path+%d: %p SNAPDIR\n",
2411			     pos, temp);
2412		} else if (stop_on_nosnap && inode && dentry != temp &&
2413			   ceph_snap(inode) == CEPH_NOSNAP) {
2414			spin_unlock(&temp->d_lock);
2415			pos++; /* get rid of any prepended '/' */
2416			break;
2417		} else {
2418			pos -= temp->d_name.len;
2419			if (pos < 0) {
2420				spin_unlock(&temp->d_lock);
2421				break;
2422			}
2423			memcpy(path + pos, temp->d_name.name, temp->d_name.len);
2424		}
2425		spin_unlock(&temp->d_lock);
2426		temp = READ_ONCE(temp->d_parent);
2427
2428		/* Are we at the root? */
2429		if (IS_ROOT(temp))
2430			break;
2431
2432		/* Are we out of buffer? */
2433		if (--pos < 0)
2434			break;
2435
2436		path[pos] = '/';
2437	}
2438	base = ceph_ino(d_inode(temp));
2439	rcu_read_unlock();
2440
2441	if (read_seqretry(&rename_lock, seq))
2442		goto retry;
2443
2444	if (pos < 0) {
2445		/*
2446		 * A rename didn't occur, but somehow we didn't end up where
2447		 * we thought we would. Throw a warning and try again.
2448		 */
2449		pr_warn("build_path did not end path lookup where "
2450			"expected, pos is %d\n", pos);
2451		goto retry;
2452	}
2453
2454	*pbase = base;
2455	*plen = PATH_MAX - 1 - pos;
2456	dout("build_path on %p %d built %llx '%.*s'\n",
2457	     dentry, d_count(dentry), base, *plen, path + pos);
2458	return path + pos;
2459}
2460
2461static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2462			     const char **ppath, int *ppathlen, u64 *pino,
2463			     bool *pfreepath, bool parent_locked)
2464{
2465	char *path;
2466
2467	rcu_read_lock();
2468	if (!dir)
2469		dir = d_inode_rcu(dentry->d_parent);
2470	if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
2471		*pino = ceph_ino(dir);
2472		rcu_read_unlock();
2473		*ppath = dentry->d_name.name;
2474		*ppathlen = dentry->d_name.len;
2475		return 0;
2476	}
2477	rcu_read_unlock();
2478	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2479	if (IS_ERR(path))
2480		return PTR_ERR(path);
2481	*ppath = path;
2482	*pfreepath = true;
2483	return 0;
2484}
2485
2486static int build_inode_path(struct inode *inode,
2487			    const char **ppath, int *ppathlen, u64 *pino,
2488			    bool *pfreepath)
2489{
2490	struct dentry *dentry;
2491	char *path;
2492
2493	if (ceph_snap(inode) == CEPH_NOSNAP) {
2494		*pino = ceph_ino(inode);
2495		*ppathlen = 0;
2496		return 0;
2497	}
2498	dentry = d_find_alias(inode);
2499	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2500	dput(dentry);
2501	if (IS_ERR(path))
2502		return PTR_ERR(path);
2503	*ppath = path;
2504	*pfreepath = true;
2505	return 0;
2506}
2507
2508/*
2509 * request arguments may be specified via an inode *, a dentry *, or
2510 * an explicit ino+path.
2511 */
2512static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2513				  struct inode *rdiri, const char *rpath,
2514				  u64 rino, const char **ppath, int *pathlen,
2515				  u64 *ino, bool *freepath, bool parent_locked)
2516{
2517	int r = 0;
2518
2519	if (rinode) {
2520		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2521		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2522		     ceph_snap(rinode));
2523	} else if (rdentry) {
2524		r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2525					freepath, parent_locked);
2526		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2527		     *ppath);
2528	} else if (rpath || rino) {
2529		*ino = rino;
2530		*ppath = rpath;
2531		*pathlen = rpath ? strlen(rpath) : 0;
2532		dout(" path %.*s\n", *pathlen, rpath);
2533	}
2534
2535	return r;
2536}
2537
2538static void encode_timestamp_and_gids(void **p,
2539				      const struct ceph_mds_request *req)
2540{
2541	struct ceph_timespec ts;
2542	int i;
2543
2544	ceph_encode_timespec64(&ts, &req->r_stamp);
2545	ceph_encode_copy(p, &ts, sizeof(ts));
2546
2547	/* gid_list */
2548	ceph_encode_32(p, req->r_cred->group_info->ngroups);
2549	for (i = 0; i < req->r_cred->group_info->ngroups; i++)
2550		ceph_encode_64(p, from_kgid(&init_user_ns,
2551					    req->r_cred->group_info->gid[i]));
2552}
2553
2554/*
2555 * called under mdsc->mutex
2556 */
2557static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
2558					       struct ceph_mds_request *req,
2559					       bool drop_cap_releases)
2560{
2561	int mds = session->s_mds;
2562	struct ceph_mds_client *mdsc = session->s_mdsc;
2563	struct ceph_msg *msg;
2564	struct ceph_mds_request_head_old *head;
2565	const char *path1 = NULL;
2566	const char *path2 = NULL;
2567	u64 ino1 = 0, ino2 = 0;
2568	int pathlen1 = 0, pathlen2 = 0;
2569	bool freepath1 = false, freepath2 = false;
2570	int len;
2571	u16 releases;
2572	void *p, *end;
2573	int ret;
2574	bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
2575
2576	ret = set_request_path_attr(req->r_inode, req->r_dentry,
2577			      req->r_parent, req->r_path1, req->r_ino1.ino,
2578			      &path1, &pathlen1, &ino1, &freepath1,
2579			      test_bit(CEPH_MDS_R_PARENT_LOCKED,
2580					&req->r_req_flags));
2581	if (ret < 0) {
2582		msg = ERR_PTR(ret);
2583		goto out;
2584	}
2585
2586	/* If r_old_dentry is set, then assume that its parent is locked */
2587	ret = set_request_path_attr(NULL, req->r_old_dentry,
2588			      req->r_old_dentry_dir,
2589			      req->r_path2, req->r_ino2.ino,
2590			      &path2, &pathlen2, &ino2, &freepath2, true);
2591	if (ret < 0) {
2592		msg = ERR_PTR(ret);
2593		goto out_free1;
2594	}
2595
2596	len = legacy ? sizeof(*head) : sizeof(struct ceph_mds_request_head);
2597	len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2598		sizeof(struct ceph_timespec);
2599	len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);
2600
2601	/* calculate (max) length for cap releases */
2602	len += sizeof(struct ceph_mds_request_release) *
2603		(!!req->r_inode_drop + !!req->r_dentry_drop +
2604		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2605
2606	if (req->r_dentry_drop)
2607		len += pathlen1;
2608	if (req->r_old_dentry_drop)
2609		len += pathlen2;
2610
2611	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2612	if (!msg) {
2613		msg = ERR_PTR(-ENOMEM);
2614		goto out_free2;
2615	}
2616
2617	msg->hdr.tid = cpu_to_le64(req->r_tid);
2618
2619	/*
2620	 * The old ceph_mds_request_head didn't contain a version field, and
2621	 * one was added when we moved the message version from 3->4.
2622	 */
2623	if (legacy) {
2624		msg->hdr.version = cpu_to_le16(3);
2625		head = msg->front.iov_base;
2626		p = msg->front.iov_base + sizeof(*head);
2627	} else {
2628		struct ceph_mds_request_head *new_head = msg->front.iov_base;
2629
2630		msg->hdr.version = cpu_to_le16(4);
2631		new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
2632		head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
2633		p = msg->front.iov_base + sizeof(*new_head);
2634	}
2635
2636	end = msg->front.iov_base + msg->front.iov_len;
2637
2638	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2639	head->op = cpu_to_le32(req->r_op);
2640	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
2641						 req->r_cred->fsuid));
2642	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
2643						 req->r_cred->fsgid));
2644	head->ino = cpu_to_le64(req->r_deleg_ino);
2645	head->args = req->r_args;
2646
2647	ceph_encode_filepath(&p, end, ino1, path1);
2648	ceph_encode_filepath(&p, end, ino2, path2);
2649
2650	/* make note of release offset, in case we need to replay */
2651	req->r_request_release_offset = p - msg->front.iov_base;
2652
2653	/* cap releases */
2654	releases = 0;
2655	if (req->r_inode_drop)
2656		releases += ceph_encode_inode_release(&p,
2657		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2658		      mds, req->r_inode_drop, req->r_inode_unless,
2659		      req->r_op == CEPH_MDS_OP_READDIR);
2660	if (req->r_dentry_drop)
2661		releases += ceph_encode_dentry_release(&p, req->r_dentry,
2662				req->r_parent, mds, req->r_dentry_drop,
2663				req->r_dentry_unless);
2664	if (req->r_old_dentry_drop)
2665		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2666				req->r_old_dentry_dir, mds,
2667				req->r_old_dentry_drop,
2668				req->r_old_dentry_unless);
2669	if (req->r_old_inode_drop)
2670		releases += ceph_encode_inode_release(&p,
2671		      d_inode(req->r_old_dentry),
2672		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2673
2674	if (drop_cap_releases) {
2675		releases = 0;
2676		p = msg->front.iov_base + req->r_request_release_offset;
2677	}
2678
2679	head->num_releases = cpu_to_le16(releases);
2680
2681	encode_timestamp_and_gids(&p, req);
2682
2683	if (WARN_ON_ONCE(p > end)) {
2684		ceph_msg_put(msg);
2685		msg = ERR_PTR(-ERANGE);
2686		goto out_free2;
2687	}
2688
2689	msg->front.iov_len = p - msg->front.iov_base;
2690	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2691
2692	if (req->r_pagelist) {
2693		struct ceph_pagelist *pagelist = req->r_pagelist;
2694		ceph_msg_data_add_pagelist(msg, pagelist);
2695		msg->hdr.data_len = cpu_to_le32(pagelist->length);
2696	} else {
2697		msg->hdr.data_len = 0;
2698	}
2699
2700	msg->hdr.data_off = cpu_to_le16(0);
2701
2702out_free2:
2703	if (freepath2)
2704		ceph_mdsc_free_path((char *)path2, pathlen2);
2705out_free1:
2706	if (freepath1)
2707		ceph_mdsc_free_path((char *)path1, pathlen1);
2708out:
2709	return msg;
2710}
2711
2712/*
2713 * called under mdsc->mutex if error, under no mutex if
2714 * success.
2715 */
2716static void complete_request(struct ceph_mds_client *mdsc,
2717			     struct ceph_mds_request *req)
2718{
2719	req->r_end_latency = ktime_get();
2720
2721	if (req->r_callback)
2722		req->r_callback(mdsc, req);
2723	complete_all(&req->r_completion);
2724}
2725
2726static struct ceph_mds_request_head_old *
2727find_old_request_head(void *p, u64 features)
2728{
2729	bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
2730	struct ceph_mds_request_head *new_head;
2731
2732	if (legacy)
2733		return (struct ceph_mds_request_head_old *)p;
2734	new_head = (struct ceph_mds_request_head *)p;
2735	return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
2736}
2737
2738/*
2739 * called under mdsc->mutex
2740 */
2741static int __prepare_send_request(struct ceph_mds_session *session,
2742				  struct ceph_mds_request *req,
2743				  bool drop_cap_releases)
2744{
2745	int mds = session->s_mds;
2746	struct ceph_mds_client *mdsc = session->s_mdsc;
2747	struct ceph_mds_request_head_old *rhead;
2748	struct ceph_msg *msg;
2749	int flags = 0;
2750
2751	req->r_attempts++;
2752	if (req->r_inode) {
2753		struct ceph_cap *cap =
2754			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2755
2756		if (cap)
2757			req->r_sent_on_mseq = cap->mseq;
2758		else
2759			req->r_sent_on_mseq = -1;
2760	}
2761	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2762	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2763
2764	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2765		void *p;
2766
2767		/*
2768		 * Replay.  Do not regenerate message (and rebuild
2769		 * paths, etc.); just use the original message.
2770		 * Rebuilding paths will break for renames because
2771		 * d_move mangles the src name.
2772		 */
2773		msg = req->r_request;
2774		rhead = find_old_request_head(msg->front.iov_base,
2775					      session->s_con.peer_features);
2776
2777		flags = le32_to_cpu(rhead->flags);
2778		flags |= CEPH_MDS_FLAG_REPLAY;
2779		rhead->flags = cpu_to_le32(flags);
2780
2781		if (req->r_target_inode)
2782			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2783
2784		rhead->num_retry = req->r_attempts - 1;
2785
2786		/* remove cap/dentry releases from message */
2787		rhead->num_releases = 0;
2788
2789		p = msg->front.iov_base + req->r_request_release_offset;
2790		encode_timestamp_and_gids(&p, req);
2791
2792		msg->front.iov_len = p - msg->front.iov_base;
2793		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2794		return 0;
2795	}
2796
2797	if (req->r_request) {
2798		ceph_msg_put(req->r_request);
2799		req->r_request = NULL;
2800	}
2801	msg = create_request_message(session, req, drop_cap_releases);
2802	if (IS_ERR(msg)) {
2803		req->r_err = PTR_ERR(msg);
2804		return PTR_ERR(msg);
2805	}
2806	req->r_request = msg;
2807
2808	rhead = find_old_request_head(msg->front.iov_base,
2809				      session->s_con.peer_features);
2810	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2811	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2812		flags |= CEPH_MDS_FLAG_REPLAY;
2813	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
2814		flags |= CEPH_MDS_FLAG_ASYNC;
2815	if (req->r_parent)
2816		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2817	rhead->flags = cpu_to_le32(flags);
2818	rhead->num_fwd = req->r_num_fwd;
2819	rhead->num_retry = req->r_attempts - 1;
2820
2821	dout(" r_parent = %p\n", req->r_parent);
2822	return 0;
2823}
2824
2825/*
2826 * called under mdsc->mutex
2827 */
2828static int __send_request(struct ceph_mds_session *session,
2829			  struct ceph_mds_request *req,
2830			  bool drop_cap_releases)
2831{
2832	int err;
2833
2834	err = __prepare_send_request(session, req, drop_cap_releases);
2835	if (!err) {
2836		ceph_msg_get(req->r_request);
2837		ceph_con_send(&session->s_con, req->r_request);
2838	}
2839
2840	return err;
2841}
2842
2843/*
2844 * send request, or put it on the appropriate wait list.
2845 */
2846static void __do_request(struct ceph_mds_client *mdsc,
2847			struct ceph_mds_request *req)
2848{
2849	struct ceph_mds_session *session = NULL;
2850	int mds = -1;
2851	int err = 0;
2852	bool random;
2853
2854	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2855		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2856			__unregister_request(mdsc, req);
2857		return;
2858	}
2859
2860	if (req->r_timeout &&
2861	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2862		dout("do_request timed out\n");
2863		err = -ETIMEDOUT;
2864		goto finish;
2865	}
2866	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2867		dout("do_request forced umount\n");
2868		err = -EIO;
2869		goto finish;
2870	}
2871	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2872		if (mdsc->mdsmap_err) {
2873			err = mdsc->mdsmap_err;
2874			dout("do_request mdsmap err %d\n", err);
2875			goto finish;
2876		}
2877		if (mdsc->mdsmap->m_epoch == 0) {
2878			dout("do_request no mdsmap, waiting for map\n");
2879			list_add(&req->r_wait, &mdsc->waiting_for_map);
2880			return;
2881		}
2882		if (!(mdsc->fsc->mount_options->flags &
2883		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
2884		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2885			err = -EHOSTUNREACH;
2886			goto finish;
2887		}
2888	}
2889
2890	put_request_session(req);
2891
2892	mds = __choose_mds(mdsc, req, &random);
2893	if (mds < 0 ||
2894	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2895		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2896			err = -EJUKEBOX;
2897			goto finish;
2898		}
2899		dout("do_request no mds or not active, waiting for map\n");
2900		list_add(&req->r_wait, &mdsc->waiting_for_map);
2901		return;
2902	}
2903
2904	/* get, open session */
2905	session = __ceph_lookup_mds_session(mdsc, mds);
2906	if (!session) {
2907		session = register_session(mdsc, mds);
2908		if (IS_ERR(session)) {
2909			err = PTR_ERR(session);
2910			goto finish;
2911		}
2912	}
2913	req->r_session = ceph_get_mds_session(session);
2914
2915	dout("do_request mds%d session %p state %s\n", mds, session,
2916	     ceph_session_state_name(session->s_state));
2917	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2918	    session->s_state != CEPH_MDS_SESSION_HUNG) {
2919		/*
2920		 * We cannot queue async requests since the caps and delegated
2921		 * inodes are bound to the session. Just return -EJUKEBOX and
2922		 * let the caller retry a sync request in that case.
2923		 */
2924		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2925			err = -EJUKEBOX;
2926			goto out_session;
2927		}
2928
2929		/*
2930		 * If the session has been REJECTED, then return a hard error,
2931		 * unless it's a CLEANRECOVER mount, in which case we'll queue
2932		 * it to the mdsc queue.
2933		 */
2934		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2935			if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
2936				list_add(&req->r_wait, &mdsc->waiting_for_map);
2937			else
2938				err = -EACCES;
2939			goto out_session;
2940		}
2941
2942		if (session->s_state == CEPH_MDS_SESSION_NEW ||
2943		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
2944			err = __open_session(mdsc, session);
2945			if (err)
2946				goto out_session;
2947			/* retry the same mds later */
2948			if (random)
2949				req->r_resend_mds = mds;
2950		}
2951		list_add(&req->r_wait, &session->s_waiting);
2952		goto out_session;
2953	}
2954
2955	/* send request */
2956	req->r_resend_mds = -1;   /* forget any previous mds hint */
2957
2958	if (req->r_request_started == 0)   /* note request start time */
2959		req->r_request_started = jiffies;
2960
2961	err = __send_request(session, req, false);
2962
2963out_session:
2964	ceph_put_mds_session(session);
2965finish:
2966	if (err) {
2967		dout("__do_request early error %d\n", err);
2968		req->r_err = err;
2969		complete_request(mdsc, req);
2970		__unregister_request(mdsc, req);
2971	}
2972	return;
2973}
2974
2975/*
2976 * called under mdsc->mutex
2977 */
2978static void __wake_requests(struct ceph_mds_client *mdsc,
2979			    struct list_head *head)
2980{
2981	struct ceph_mds_request *req;
2982	LIST_HEAD(tmp_list);
2983
2984	list_splice_init(head, &tmp_list);
2985
2986	while (!list_empty(&tmp_list)) {
2987		req = list_entry(tmp_list.next,
2988				 struct ceph_mds_request, r_wait);
2989		list_del_init(&req->r_wait);
2990		dout(" wake request %p tid %llu\n", req, req->r_tid);
2991		__do_request(mdsc, req);
2992	}
2993}
2994
2995/*
2996 * Wake up threads with requests pending for @mds, so that they can
2997 * resubmit their requests to a possibly different mds.
2998 */
2999static void kick_requests(struct ceph_mds_client *mdsc, int mds)
3000{
3001	struct ceph_mds_request *req;
3002	struct rb_node *p = rb_first(&mdsc->request_tree);
3003
3004	dout("kick_requests mds%d\n", mds);
3005	while (p) {
3006		req = rb_entry(p, struct ceph_mds_request, r_node);
3007		p = rb_next(p);
3008		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3009			continue;
3010		if (req->r_attempts > 0)
3011			continue; /* only new requests */
3012		if (req->r_session &&
3013		    req->r_session->s_mds == mds) {
3014			dout(" kicking tid %llu\n", req->r_tid);
3015			list_del_init(&req->r_wait);
3016			__do_request(mdsc, req);
3017		}
3018	}
3019}
3020
3021int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
3022			      struct ceph_mds_request *req)
3023{
3024	int err = 0;
3025
3026	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
3027	if (req->r_inode)
3028		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
3029	if (req->r_parent) {
3030		struct ceph_inode_info *ci = ceph_inode(req->r_parent);
3031		int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
3032			    CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
3033		spin_lock(&ci->i_ceph_lock);
3034		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
3035		__ceph_touch_fmode(ci, mdsc, fmode);
3036		spin_unlock(&ci->i_ceph_lock);
3037	}
3038	if (req->r_old_dentry_dir)
3039		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
3040				  CEPH_CAP_PIN);
3041
3042	if (req->r_inode) {
3043		err = ceph_wait_on_async_create(req->r_inode);
3044		if (err) {
3045			dout("%s: wait for async create returned: %d\n",
3046			     __func__, err);
3047			return err;
3048		}
3049	}
3050
3051	if (!err && req->r_old_inode) {
3052		err = ceph_wait_on_async_create(req->r_old_inode);
3053		if (err) {
3054			dout("%s: wait for async create returned: %d\n",
3055			     __func__, err);
3056			return err;
3057		}
3058	}
3059
3060	dout("submit_request on %p for inode %p\n", req, dir);
3061	mutex_lock(&mdsc->mutex);
3062	__register_request(mdsc, req, dir);
3063	__do_request(mdsc, req);
3064	err = req->r_err;
3065	mutex_unlock(&mdsc->mutex);
3066	return err;
3067}
3068
3069static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3070				  struct ceph_mds_request *req)
3071{
3072	int err;
3073
3074	/* wait */
3075	dout("do_request waiting\n");
3076	if (!req->r_timeout && req->r_wait_for_completion) {
3077		err = req->r_wait_for_completion(mdsc, req);
3078	} else {
3079		long timeleft = wait_for_completion_killable_timeout(
3080					&req->r_completion,
3081					ceph_timeout_jiffies(req->r_timeout));
3082		if (timeleft > 0)
3083			err = 0;
3084		else if (!timeleft)
3085			err = -ETIMEDOUT;  /* timed out */
3086		else
3087			err = timeleft;  /* killed */
3088	}
3089	dout("do_request waited, got %d\n", err);
3090	mutex_lock(&mdsc->mutex);
3091
3092	/* only abort if we didn't race with a real reply */
3093	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3094		err = le32_to_cpu(req->r_reply_info.head->result);
3095	} else if (err < 0) {
3096		dout("aborted request %lld with %d\n", req->r_tid, err);
3097
3098		/*
3099		 * ensure we aren't running concurrently with
3100		 * ceph_fill_trace or ceph_readdir_prepopulate, which
3101		 * rely on locks (dir mutex) held by our caller.
3102		 */
3103		mutex_lock(&req->r_fill_mutex);
3104		req->r_err = err;
3105		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3106		mutex_unlock(&req->r_fill_mutex);
3107
3108		if (req->r_parent &&
3109		    (req->r_op & CEPH_MDS_OP_WRITE))
3110			ceph_invalidate_dir_request(req);
3111	} else {
3112		err = req->r_err;
3113	}
3114
3115	mutex_unlock(&mdsc->mutex);
3116	return err;
3117}
3118
3119/*
3120 * Synchrously perform an mds request.  Take care of all of the
3121 * session setup, forwarding, retry details.
3122 */
3123int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3124			 struct inode *dir,
3125			 struct ceph_mds_request *req)
3126{
3127	int err;
3128
3129	dout("do_request on %p\n", req);
3130
3131	/* issue */
3132	err = ceph_mdsc_submit_request(mdsc, dir, req);
3133	if (!err)
3134		err = ceph_mdsc_wait_request(mdsc, req);
3135	dout("do_request %p done, result %d\n", req, err);
3136	return err;
3137}
3138
3139/*
3140 * Invalidate dir's completeness, dentry lease state on an aborted MDS
3141 * namespace request.
3142 */
3143void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3144{
3145	struct inode *dir = req->r_parent;
3146	struct inode *old_dir = req->r_old_dentry_dir;
3147
3148	dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
3149
3150	ceph_dir_clear_complete(dir);
3151	if (old_dir)
3152		ceph_dir_clear_complete(old_dir);
3153	if (req->r_dentry)
3154		ceph_invalidate_dentry_lease(req->r_dentry);
3155	if (req->r_old_dentry)
3156		ceph_invalidate_dentry_lease(req->r_old_dentry);
3157}
3158
3159/*
3160 * Handle mds reply.
3161 *
3162 * We take the session mutex and parse and process the reply immediately.
3163 * This preserves the logical ordering of replies, capabilities, etc., sent
3164 * by the MDS as they are applied to our local cache.
3165 */
3166static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
3167{
3168	struct ceph_mds_client *mdsc = session->s_mdsc;
3169	struct ceph_mds_request *req;
3170	struct ceph_mds_reply_head *head = msg->front.iov_base;
3171	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
3172	struct ceph_snap_realm *realm;
3173	u64 tid;
3174	int err, result;
3175	int mds = session->s_mds;
3176
3177	if (msg->front.iov_len < sizeof(*head)) {
3178		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
3179		ceph_msg_dump(msg);
3180		return;
3181	}
3182
3183	/* get request, session */
3184	tid = le64_to_cpu(msg->hdr.tid);
3185	mutex_lock(&mdsc->mutex);
3186	req = lookup_get_request(mdsc, tid);
3187	if (!req) {
3188		dout("handle_reply on unknown tid %llu\n", tid);
3189		mutex_unlock(&mdsc->mutex);
3190		return;
3191	}
3192	dout("handle_reply %p\n", req);
3193
3194	/* correct session? */
3195	if (req->r_session != session) {
3196		pr_err("mdsc_handle_reply got %llu on session mds%d"
3197		       " not mds%d\n", tid, session->s_mds,
3198		       req->r_session ? req->r_session->s_mds : -1);
3199		mutex_unlock(&mdsc->mutex);
3200		goto out;
3201	}
3202
3203	/* dup? */
3204	if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3205	    (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3206		pr_warn("got a dup %s reply on %llu from mds%d\n",
3207			   head->safe ? "safe" : "unsafe", tid, mds);
3208		mutex_unlock(&mdsc->mutex);
3209		goto out;
3210	}
3211	if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3212		pr_warn("got unsafe after safe on %llu from mds%d\n",
3213			   tid, mds);
3214		mutex_unlock(&mdsc->mutex);
3215		goto out;
3216	}
3217
3218	result = le32_to_cpu(head->result);
3219
3220	/*
3221	 * Handle an ESTALE
3222	 * if we're not talking to the authority, send to them
3223	 * if the authority has changed while we weren't looking,
3224	 * send to new authority
3225	 * Otherwise we just have to return an ESTALE
3226	 */
3227	if (result == -ESTALE) {
3228		dout("got ESTALE on request %llu\n", req->r_tid);
3229		req->r_resend_mds = -1;
3230		if (req->r_direct_mode != USE_AUTH_MDS) {
3231			dout("not using auth, setting for that now\n");
3232			req->r_direct_mode = USE_AUTH_MDS;
3233			__do_request(mdsc, req);
3234			mutex_unlock(&mdsc->mutex);
3235			goto out;
3236		} else  {
3237			int mds = __choose_mds(mdsc, req, NULL);
3238			if (mds >= 0 && mds != req->r_session->s_mds) {
3239				dout("but auth changed, so resending\n");
3240				__do_request(mdsc, req);
3241				mutex_unlock(&mdsc->mutex);
3242				goto out;
3243			}
3244		}
3245		dout("have to return ESTALE on request %llu\n", req->r_tid);
3246	}
3247
3248
3249	if (head->safe) {
3250		set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3251		__unregister_request(mdsc, req);
3252
3253		/* last request during umount? */
3254		if (mdsc->stopping && !__get_oldest_req(mdsc))
3255			complete_all(&mdsc->safe_umount_waiters);
3256
3257		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3258			/*
3259			 * We already handled the unsafe response, now do the
3260			 * cleanup.  No need to examine the response; the MDS
3261			 * doesn't include any result info in the safe
3262			 * response.  And even if it did, there is nothing
3263			 * useful we could do with a revised return value.
3264			 */
3265			dout("got safe reply %llu, mds%d\n", tid, mds);
3266
3267			mutex_unlock(&mdsc->mutex);
3268			goto out;
3269		}
3270	} else {
3271		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3272		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3273	}
3274
3275	dout("handle_reply tid %lld result %d\n", tid, result);
3276	rinfo = &req->r_reply_info;
3277	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3278		err = parse_reply_info(session, msg, rinfo, (u64)-1);
3279	else
3280		err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
3281	mutex_unlock(&mdsc->mutex);
3282
3283	/* Must find target inode outside of mutexes to avoid deadlocks */
3284	if ((err >= 0) && rinfo->head->is_target) {
3285		struct inode *in;
3286		struct ceph_vino tvino = {
3287			.ino  = le64_to_cpu(rinfo->targeti.in->ino),
3288			.snap = le64_to_cpu(rinfo->targeti.in->snapid)
3289		};
3290
3291		in = ceph_get_inode(mdsc->fsc->sb, tvino);
3292		if (IS_ERR(in)) {
3293			err = PTR_ERR(in);
3294			mutex_lock(&session->s_mutex);
3295			goto out_err;
3296		}
3297		req->r_target_inode = in;
3298	}
3299
3300	mutex_lock(&session->s_mutex);
3301	if (err < 0) {
3302		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
3303		ceph_msg_dump(msg);
3304		goto out_err;
3305	}
3306
3307	/* snap trace */
3308	realm = NULL;
3309	if (rinfo->snapblob_len) {
3310		down_write(&mdsc->snap_rwsem);
3311		ceph_update_snap_trace(mdsc, rinfo->snapblob,
3312				rinfo->snapblob + rinfo->snapblob_len,
3313				le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3314				&realm);
3315		downgrade_write(&mdsc->snap_rwsem);
3316	} else {
3317		down_read(&mdsc->snap_rwsem);
3318	}
3319
3320	/* insert trace into our cache */
3321	mutex_lock(&req->r_fill_mutex);
3322	current->journal_info = req;
3323	err = ceph_fill_trace(mdsc->fsc->sb, req);
3324	if (err == 0) {
3325		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
3326				    req->r_op == CEPH_MDS_OP_LSSNAP))
3327			ceph_readdir_prepopulate(req, req->r_session);
3328	}
3329	current->journal_info = NULL;
3330	mutex_unlock(&req->r_fill_mutex);
3331
3332	up_read(&mdsc->snap_rwsem);
3333	if (realm)
3334		ceph_put_snap_realm(mdsc, realm);
3335
3336	if (err == 0) {
3337		if (req->r_target_inode &&
3338		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3339			struct ceph_inode_info *ci =
3340				ceph_inode(req->r_target_inode);
3341			spin_lock(&ci->i_unsafe_lock);
3342			list_add_tail(&req->r_unsafe_target_item,
3343				      &ci->i_unsafe_iops);
3344			spin_unlock(&ci->i_unsafe_lock);
3345		}
3346
3347		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
3348	}
3349out_err:
3350	mutex_lock(&mdsc->mutex);
3351	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3352		if (err) {
3353			req->r_err = err;
3354		} else {
3355			req->r_reply =  ceph_msg_get(msg);
3356			set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
3357		}
3358	} else {
3359		dout("reply arrived after request %lld was aborted\n", tid);
3360	}
3361	mutex_unlock(&mdsc->mutex);
3362
3363	mutex_unlock(&session->s_mutex);
3364
3365	/* kick calling process */
3366	complete_request(mdsc, req);
3367
3368	ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
3369				     req->r_end_latency, err);
3370out:
3371	ceph_mdsc_put_request(req);
3372	return;
3373}
3374
3375
3376
3377/*
3378 * handle mds notification that our request has been forwarded.
3379 */
3380static void handle_forward(struct ceph_mds_client *mdsc,
3381			   struct ceph_mds_session *session,
3382			   struct ceph_msg *msg)
3383{
3384	struct ceph_mds_request *req;
3385	u64 tid = le64_to_cpu(msg->hdr.tid);
3386	u32 next_mds;
3387	u32 fwd_seq;
3388	int err = -EINVAL;
3389	void *p = msg->front.iov_base;
3390	void *end = p + msg->front.iov_len;
3391
3392	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3393	next_mds = ceph_decode_32(&p);
3394	fwd_seq = ceph_decode_32(&p);
3395
3396	mutex_lock(&mdsc->mutex);
3397	req = lookup_get_request(mdsc, tid);
3398	if (!req) {
3399		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
3400		goto out;  /* dup reply? */
3401	}
3402
3403	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3404		dout("forward tid %llu aborted, unregistering\n", tid);
3405		__unregister_request(mdsc, req);
3406	} else if (fwd_seq <= req->r_num_fwd) {
3407		dout("forward tid %llu to mds%d - old seq %d <= %d\n",
3408		     tid, next_mds, req->r_num_fwd, fwd_seq);
3409	} else {
3410		/* resend. forward race not possible; mds would drop */
3411		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3412		BUG_ON(req->r_err);
3413		BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3414		req->r_attempts = 0;
3415		req->r_num_fwd = fwd_seq;
3416		req->r_resend_mds = next_mds;
3417		put_request_session(req);
3418		__do_request(mdsc, req);
3419	}
3420	ceph_mdsc_put_request(req);
3421out:
3422	mutex_unlock(&mdsc->mutex);
3423	return;
3424
3425bad:
3426	pr_err("mdsc_handle_forward decode error err=%d\n", err);
3427}
3428
3429static int __decode_session_metadata(void **p, void *end,
3430				     bool *blocklisted)
3431{
3432	/* map<string,string> */
3433	u32 n;
3434	bool err_str;
3435	ceph_decode_32_safe(p, end, n, bad);
3436	while (n-- > 0) {
3437		u32 len;
3438		ceph_decode_32_safe(p, end, len, bad);
3439		ceph_decode_need(p, end, len, bad);
3440		err_str = !strncmp(*p, "error_string", len);
3441		*p += len;
3442		ceph_decode_32_safe(p, end, len, bad);
3443		ceph_decode_need(p, end, len, bad);
3444		/*
3445		 * Match "blocklisted (blacklisted)" from newer MDSes,
3446		 * or "blacklisted" from older MDSes.
3447		 */
3448		if (err_str && strnstr(*p, "blacklisted", len))
3449			*blocklisted = true;
3450		*p += len;
3451	}
3452	return 0;
3453bad:
3454	return -1;
3455}
3456
3457/*
3458 * handle a mds session control message
3459 */
3460static void handle_session(struct ceph_mds_session *session,
3461			   struct ceph_msg *msg)
3462{
3463	struct ceph_mds_client *mdsc = session->s_mdsc;
3464	int mds = session->s_mds;
3465	int msg_version = le16_to_cpu(msg->hdr.version);
3466	void *p = msg->front.iov_base;
3467	void *end = p + msg->front.iov_len;
3468	struct ceph_mds_session_head *h;
3469	u32 op;
3470	u64 seq, features = 0;
3471	int wake = 0;
3472	bool blocklisted = false;
3473
3474	/* decode */
3475	ceph_decode_need(&p, end, sizeof(*h), bad);
3476	h = p;
3477	p += sizeof(*h);
3478
3479	op = le32_to_cpu(h->op);
3480	seq = le64_to_cpu(h->seq);
3481
3482	if (msg_version >= 3) {
3483		u32 len;
3484		/* version >= 2, metadata */
3485		if (__decode_session_metadata(&p, end, &blocklisted) < 0)
3486			goto bad;
3487		/* version >= 3, feature bits */
3488		ceph_decode_32_safe(&p, end, len, bad);
3489		if (len) {
3490			ceph_decode_64_safe(&p, end, features, bad);
3491			p += len - sizeof(features);
3492		}
3493	}
3494
3495	mutex_lock(&mdsc->mutex);
3496	if (op == CEPH_SESSION_CLOSE) {
3497		ceph_get_mds_session(session);
3498		__unregister_session(mdsc, session);
3499	}
3500	/* FIXME: this ttl calculation is generous */
3501	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3502	mutex_unlock(&mdsc->mutex);
3503
3504	mutex_lock(&session->s_mutex);
3505
3506	dout("handle_session mds%d %s %p state %s seq %llu\n",
3507	     mds, ceph_session_op_name(op), session,
3508	     ceph_session_state_name(session->s_state), seq);
3509
3510	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3511		session->s_state = CEPH_MDS_SESSION_OPEN;
3512		pr_info("mds%d came back\n", session->s_mds);
3513	}
3514
3515	switch (op) {
3516	case CEPH_SESSION_OPEN:
3517		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3518			pr_info("mds%d reconnect success\n", session->s_mds);
3519		session->s_state = CEPH_MDS_SESSION_OPEN;
3520		session->s_features = features;
3521		renewed_caps(mdsc, session, 0);
3522		if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features))
3523			metric_schedule_delayed(&mdsc->metric);
3524		wake = 1;
3525		if (mdsc->stopping)
3526			__close_session(mdsc, session);
3527		break;
3528
3529	case CEPH_SESSION_RENEWCAPS:
3530		if (session->s_renew_seq == seq)
3531			renewed_caps(mdsc, session, 1);
3532		break;
3533
3534	case CEPH_SESSION_CLOSE:
3535		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3536			pr_info("mds%d reconnect denied\n", session->s_mds);
3537		session->s_state = CEPH_MDS_SESSION_CLOSED;
3538		cleanup_session_requests(mdsc, session);
3539		remove_session_caps(session);
3540		wake = 2; /* for good measure */
3541		wake_up_all(&mdsc->session_close_wq);
3542		break;
3543
3544	case CEPH_SESSION_STALE:
3545		pr_info("mds%d caps went stale, renewing\n",
3546			session->s_mds);
3547		atomic_inc(&session->s_cap_gen);
3548		session->s_cap_ttl = jiffies - 1;
3549		send_renew_caps(mdsc, session);
3550		break;
3551
3552	case CEPH_SESSION_RECALL_STATE:
3553		ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
3554		break;
3555
3556	case CEPH_SESSION_FLUSHMSG:
3557		send_flushmsg_ack(mdsc, session, seq);
3558		break;
3559
3560	case CEPH_SESSION_FORCE_RO:
3561		dout("force_session_readonly %p\n", session);
3562		spin_lock(&session->s_cap_lock);
3563		session->s_readonly = true;
3564		spin_unlock(&session->s_cap_lock);
3565		wake_up_session_caps(session, FORCE_RO);
3566		break;
3567
3568	case CEPH_SESSION_REJECT:
3569		WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3570		pr_info("mds%d rejected session\n", session->s_mds);
3571		session->s_state = CEPH_MDS_SESSION_REJECTED;
3572		cleanup_session_requests(mdsc, session);
3573		remove_session_caps(session);
3574		if (blocklisted)
3575			mdsc->fsc->blocklisted = true;
3576		wake = 2; /* for good measure */
3577		break;
3578
3579	default:
3580		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
3581		WARN_ON(1);
3582	}
3583
3584	mutex_unlock(&session->s_mutex);
3585	if (wake) {
3586		mutex_lock(&mdsc->mutex);
3587		__wake_requests(mdsc, &session->s_waiting);
3588		if (wake == 2)
3589			kick_requests(mdsc, mds);
3590		mutex_unlock(&mdsc->mutex);
3591	}
3592	if (op == CEPH_SESSION_CLOSE)
3593		ceph_put_mds_session(session);
3594	return;
3595
3596bad:
3597	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3598	       (int)msg->front.iov_len);
3599	ceph_msg_dump(msg);
3600	return;
3601}
3602
3603void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
3604{
3605	int dcaps;
3606
3607	dcaps = xchg(&req->r_dir_caps, 0);
3608	if (dcaps) {
3609		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3610		ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
3611	}
3612}
3613
3614void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
3615{
3616	int dcaps;
3617
3618	dcaps = xchg(&req->r_dir_caps, 0);
3619	if (dcaps) {
3620		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3621		ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
3622						dcaps);
3623	}
3624}
3625
3626/*
3627 * called under session->mutex.
3628 */
3629static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3630				   struct ceph_mds_session *session)
3631{
3632	struct ceph_mds_request *req, *nreq;
3633	struct rb_node *p;
3634
3635	dout("replay_unsafe_requests mds%d\n", session->s_mds);
3636
3637	mutex_lock(&mdsc->mutex);
3638	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
3639		__send_request(session, req, true);
3640
3641	/*
3642	 * also re-send old requests when MDS enters reconnect stage. So that MDS
3643	 * can process completed request in clientreplay stage.
3644	 */
3645	p = rb_first(&mdsc->request_tree);
3646	while (p) {
3647		req = rb_entry(p, struct ceph_mds_request, r_node);
3648		p = rb_next(p);
3649		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3650			continue;
3651		if (req->r_attempts == 0)
3652			continue; /* only old requests */
3653		if (!req->r_session)
3654			continue;
3655		if (req->r_session->s_mds != session->s_mds)
3656			continue;
3657
3658		ceph_mdsc_release_dir_caps_no_check(req);
3659
3660		__send_request(session, req, true);
3661	}
3662	mutex_unlock(&mdsc->mutex);
3663}
3664
3665static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3666{
3667	struct ceph_msg *reply;
3668	struct ceph_pagelist *_pagelist;
3669	struct page *page;
3670	__le32 *addr;
3671	int err = -ENOMEM;
3672
3673	if (!recon_state->allow_multi)
3674		return -ENOSPC;
3675
3676	/* can't handle message that contains both caps and realm */
3677	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3678
3679	/* pre-allocate new pagelist */
3680	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
3681	if (!_pagelist)
3682		return -ENOMEM;
3683
3684	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3685	if (!reply)
3686		goto fail_msg;
3687
3688	/* placeholder for nr_caps */
3689	err = ceph_pagelist_encode_32(_pagelist, 0);
3690	if (err < 0)
3691		goto fail;
3692
3693	if (recon_state->nr_caps) {
3694		/* currently encoding caps */
3695		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3696		if (err)
3697			goto fail;
3698	} else {
3699		/* placeholder for nr_realms (currently encoding relams) */
3700		err = ceph_pagelist_encode_32(_pagelist, 0);
3701		if (err < 0)
3702			goto fail;
3703	}
3704
3705	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3706	if (err)
3707		goto fail;
3708
3709	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3710	addr = kmap_atomic(page);
3711	if (recon_state->nr_caps) {
3712		/* currently encoding caps */
3713		*addr = cpu_to_le32(recon_state->nr_caps);
3714	} else {
3715		/* currently encoding relams */
3716		*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3717	}
3718	kunmap_atomic(addr);
3719
3720	reply->hdr.version = cpu_to_le16(5);
3721	reply->hdr.compat_version = cpu_to_le16(4);
3722
3723	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3724	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3725
3726	ceph_con_send(&recon_state->session->s_con, reply);
3727	ceph_pagelist_release(recon_state->pagelist);
3728
3729	recon_state->pagelist = _pagelist;
3730	recon_state->nr_caps = 0;
3731	recon_state->nr_realms = 0;
3732	recon_state->msg_version = 5;
3733	return 0;
3734fail:
3735	ceph_msg_put(reply);
3736fail_msg:
3737	ceph_pagelist_release(_pagelist);
3738	return err;
3739}
3740
3741static struct dentry* d_find_primary(struct inode *inode)
3742{
3743	struct dentry *alias, *dn = NULL;
3744
3745	if (hlist_empty(&inode->i_dentry))
3746		return NULL;
3747
3748	spin_lock(&inode->i_lock);
3749	if (hlist_empty(&inode->i_dentry))
3750		goto out_unlock;
3751
3752	if (S_ISDIR(inode->i_mode)) {
3753		alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
3754		if (!IS_ROOT(alias))
3755			dn = dget(alias);
3756		goto out_unlock;
3757	}
3758
3759	hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
3760		spin_lock(&alias->d_lock);
3761		if (!d_unhashed(alias) &&
3762		    (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
3763			dn = dget_dlock(alias);
3764		}
3765		spin_unlock(&alias->d_lock);
3766		if (dn)
3767			break;
3768	}
3769out_unlock:
3770	spin_unlock(&inode->i_lock);
3771	return dn;
3772}
3773
3774/*
3775 * Encode information about a cap for a reconnect with the MDS.
3776 */
3777static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
3778			  void *arg)
3779{
3780	union {
3781		struct ceph_mds_cap_reconnect v2;
3782		struct ceph_mds_cap_reconnect_v1 v1;
3783	} rec;
3784	struct ceph_inode_info *ci = cap->ci;
3785	struct ceph_reconnect_state *recon_state = arg;
3786	struct ceph_pagelist *pagelist = recon_state->pagelist;
3787	struct dentry *dentry;
3788	char *path;
3789	int pathlen, err;
3790	u64 pathbase;
3791	u64 snap_follows;
3792
3793	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3794	     inode, ceph_vinop(inode), cap, cap->cap_id,
3795	     ceph_cap_string(cap->issued));
3796
3797	dentry = d_find_primary(inode);
3798	if (dentry) {
3799		/* set pathbase to parent dir when msg_version >= 2 */
3800		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
3801					    recon_state->msg_version >= 2);
3802		dput(dentry);
3803		if (IS_ERR(path)) {
3804			err = PTR_ERR(path);
3805			goto out_err;
3806		}
3807	} else {
3808		path = NULL;
3809		pathlen = 0;
3810		pathbase = 0;
3811	}
3812
3813	spin_lock(&ci->i_ceph_lock);
3814	cap->seq = 0;        /* reset cap seq */
3815	cap->issue_seq = 0;  /* and issue_seq */
3816	cap->mseq = 0;       /* and migrate_seq */
3817	cap->cap_gen = atomic_read(&cap->session->s_cap_gen);
3818
3819	/* These are lost when the session goes away */
3820	if (S_ISDIR(inode->i_mode)) {
3821		if (cap->issued & CEPH_CAP_DIR_CREATE) {
3822			ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
3823			memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
3824		}
3825		cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
3826	}
3827
3828	if (recon_state->msg_version >= 2) {
3829		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3830		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3831		rec.v2.issued = cpu_to_le32(cap->issued);
3832		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3833		rec.v2.pathbase = cpu_to_le64(pathbase);
3834		rec.v2.flock_len = (__force __le32)
3835			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
3836	} else {
3837		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3838		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3839		rec.v1.issued = cpu_to_le32(cap->issued);
3840		rec.v1.size = cpu_to_le64(i_size_read(inode));
3841		ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
3842		ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
3843		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3844		rec.v1.pathbase = cpu_to_le64(pathbase);
3845	}
3846
3847	if (list_empty(&ci->i_cap_snaps)) {
3848		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3849	} else {
3850		struct ceph_cap_snap *capsnap =
3851			list_first_entry(&ci->i_cap_snaps,
3852					 struct ceph_cap_snap, ci_item);
3853		snap_follows = capsnap->follows;
3854	}
3855	spin_unlock(&ci->i_ceph_lock);
3856
3857	if (recon_state->msg_version >= 2) {
3858		int num_fcntl_locks, num_flock_locks;
3859		struct ceph_filelock *flocks = NULL;
3860		size_t struct_len, total_len = sizeof(u64);
3861		u8 struct_v = 0;
3862
3863encode_again:
3864		if (rec.v2.flock_len) {
3865			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
3866		} else {
3867			num_fcntl_locks = 0;
3868			num_flock_locks = 0;
3869		}
3870		if (num_fcntl_locks + num_flock_locks > 0) {
3871			flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
3872					       sizeof(struct ceph_filelock),
3873					       GFP_NOFS);
3874			if (!flocks) {
3875				err = -ENOMEM;
3876				goto out_err;
3877			}
3878			err = ceph_encode_locks_to_buffer(inode, flocks,
3879							  num_fcntl_locks,
3880							  num_flock_locks);
3881			if (err) {
3882				kfree(flocks);
3883				flocks = NULL;
3884				if (err == -ENOSPC)
3885					goto encode_again;
3886				goto out_err;
3887			}
3888		} else {
3889			kfree(flocks);
3890			flocks = NULL;
3891		}
3892
3893		if (recon_state->msg_version >= 3) {
3894			/* version, compat_version and struct_len */
3895			total_len += 2 * sizeof(u8) + sizeof(u32);
3896			struct_v = 2;
3897		}
3898		/*
3899		 * number of encoded locks is stable, so copy to pagelist
3900		 */
3901		struct_len = 2 * sizeof(u32) +
3902			    (num_fcntl_locks + num_flock_locks) *
3903			    sizeof(struct ceph_filelock);
3904		rec.v2.flock_len = cpu_to_le32(struct_len);
3905
3906		struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);
3907
3908		if (struct_v >= 2)
3909			struct_len += sizeof(u64); /* snap_follows */
3910
3911		total_len += struct_len;
3912
3913		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
3914			err = send_reconnect_partial(recon_state);
3915			if (err)
3916				goto out_freeflocks;
3917			pagelist = recon_state->pagelist;
3918		}
3919
3920		err = ceph_pagelist_reserve(pagelist, total_len);
3921		if (err)
3922			goto out_freeflocks;
3923
3924		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3925		if (recon_state->msg_version >= 3) {
3926			ceph_pagelist_encode_8(pagelist, struct_v);
3927			ceph_pagelist_encode_8(pagelist, 1);
3928			ceph_pagelist_encode_32(pagelist, struct_len);
3929		}
3930		ceph_pagelist_encode_string(pagelist, path, pathlen);
3931		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3932		ceph_locks_to_pagelist(flocks, pagelist,
3933				       num_fcntl_locks, num_flock_locks);
3934		if (struct_v >= 2)
3935			ceph_pagelist_encode_64(pagelist, snap_follows);
3936out_freeflocks:
3937		kfree(flocks);
3938	} else {
3939		err = ceph_pagelist_reserve(pagelist,
3940					    sizeof(u64) + sizeof(u32) +
3941					    pathlen + sizeof(rec.v1));
3942		if (err)
3943			goto out_err;
3944
3945		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3946		ceph_pagelist_encode_string(pagelist, path, pathlen);
3947		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3948	}
3949
3950out_err:
3951	ceph_mdsc_free_path(path, pathlen);
3952	if (!err)
3953		recon_state->nr_caps++;
3954	return err;
3955}
3956
3957static int encode_snap_realms(struct ceph_mds_client *mdsc,
3958			      struct ceph_reconnect_state *recon_state)
3959{
3960	struct rb_node *p;
3961	struct ceph_pagelist *pagelist = recon_state->pagelist;
3962	int err = 0;
3963
3964	if (recon_state->msg_version >= 4) {
3965		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
3966		if (err < 0)
3967			goto fail;
3968	}
3969
3970	/*
3971	 * snaprealms.  we provide mds with the ino, seq (version), and
3972	 * parent for all of our realms.  If the mds has any newer info,
3973	 * it will tell us.
3974	 */
3975	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3976		struct ceph_snap_realm *realm =
3977		       rb_entry(p, struct ceph_snap_realm, node);
3978		struct ceph_mds_snaprealm_reconnect sr_rec;
3979
3980		if (recon_state->msg_version >= 4) {
3981			size_t need = sizeof(u8) * 2 + sizeof(u32) +
3982				      sizeof(sr_rec);
3983
3984			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
3985				err = send_reconnect_partial(recon_state);
3986				if (err)
3987					goto fail;
3988				pagelist = recon_state->pagelist;
3989			}
3990
3991			err = ceph_pagelist_reserve(pagelist, need);
3992			if (err)
3993				goto fail;
3994
3995			ceph_pagelist_encode_8(pagelist, 1);
3996			ceph_pagelist_encode_8(pagelist, 1);
3997			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
3998		}
3999
4000		dout(" adding snap realm %llx seq %lld parent %llx\n",
4001		     realm->ino, realm->seq, realm->parent_ino);
4002		sr_rec.ino = cpu_to_le64(realm->ino);
4003		sr_rec.seq = cpu_to_le64(realm->seq);
4004		sr_rec.parent = cpu_to_le64(realm->parent_ino);
4005
4006		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
4007		if (err)
4008			goto fail;
4009
4010		recon_state->nr_realms++;
4011	}
4012fail:
4013	return err;
4014}
4015
4016
4017/*
4018 * If an MDS fails and recovers, clients need to reconnect in order to
4019 * reestablish shared state.  This includes all caps issued through
4020 * this session _and_ the snap_realm hierarchy.  Because it's not
4021 * clear which snap realms the mds cares about, we send everything we
4022 * know about.. that ensures we'll then get any new info the
4023 * recovering MDS might have.
4024 *
4025 * This is a relatively heavyweight operation, but it's rare.
4026 */
4027static void send_mds_reconnect(struct ceph_mds_client *mdsc,
4028			       struct ceph_mds_session *session)
4029{
4030	struct ceph_msg *reply;
4031	int mds = session->s_mds;
4032	int err = -ENOMEM;
4033	struct ceph_reconnect_state recon_state = {
4034		.session = session,
4035	};
4036	LIST_HEAD(dispose);
4037
4038	pr_info("mds%d reconnect start\n", mds);
4039
4040	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
4041	if (!recon_state.pagelist)
4042		goto fail_nopagelist;
4043
4044	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
4045	if (!reply)
4046		goto fail_nomsg;
4047
4048	xa_destroy(&session->s_delegated_inos);
4049
4050	mutex_lock(&session->s_mutex);
4051	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
4052	session->s_seq = 0;
4053
4054	dout("session %p state %s\n", session,
4055	     ceph_session_state_name(session->s_state));
4056
4057	atomic_inc(&session->s_cap_gen);
4058
4059	spin_lock(&session->s_cap_lock);
4060	/* don't know if session is readonly */
4061	session->s_readonly = 0;
4062	/*
4063	 * notify __ceph_remove_cap() that we are composing cap reconnect.
4064	 * If a cap get released before being added to the cap reconnect,
4065	 * __ceph_remove_cap() should skip queuing cap release.
4066	 */
4067	session->s_cap_reconnect = 1;
4068	/* drop old cap expires; we're about to reestablish that state */
4069	detach_cap_releases(session, &dispose);
4070	spin_unlock(&session->s_cap_lock);
4071	dispose_cap_releases(mdsc, &dispose);
4072
4073	/* trim unused caps to reduce MDS's cache rejoin time */
4074	if (mdsc->fsc->sb->s_root)
4075		shrink_dcache_parent(mdsc->fsc->sb->s_root);
4076
4077	ceph_con_close(&session->s_con);
4078	ceph_con_open(&session->s_con,
4079		      CEPH_ENTITY_TYPE_MDS, mds,
4080		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
4081
4082	/* replay unsafe requests */
4083	replay_unsafe_requests(mdsc, session);
4084
4085	ceph_early_kick_flushing_caps(mdsc, session);
4086
4087	down_read(&mdsc->snap_rwsem);
4088
4089	/* placeholder for nr_caps */
4090	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
4091	if (err)
4092		goto fail;
4093
4094	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
4095		recon_state.msg_version = 3;
4096		recon_state.allow_multi = true;
4097	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4098		recon_state.msg_version = 3;
4099	} else {
4100		recon_state.msg_version = 2;
4101	}
4102	/* trsaverse this session's caps */
4103	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
4104
4105	spin_lock(&session->s_cap_lock);
4106	session->s_cap_reconnect = 0;
4107	spin_unlock(&session->s_cap_lock);
4108
4109	if (err < 0)
4110		goto fail;
4111
4112	/* check if all realms can be encoded into current message */
4113	if (mdsc->num_snap_realms) {
4114		size_t total_len =
4115			recon_state.pagelist->length +
4116			mdsc->num_snap_realms *
4117			sizeof(struct ceph_mds_snaprealm_reconnect);
4118		if (recon_state.msg_version >= 4) {
4119			/* number of realms */
4120			total_len += sizeof(u32);
4121			/* version, compat_version and struct_len */
4122			total_len += mdsc->num_snap_realms *
4123				     (2 * sizeof(u8) + sizeof(u32));
4124		}
4125		if (total_len > RECONNECT_MAX_SIZE) {
4126			if (!recon_state.allow_multi) {
4127				err = -ENOSPC;
4128				goto fail;
4129			}
4130			if (recon_state.nr_caps) {
4131				err = send_reconnect_partial(&recon_state);
4132				if (err)
4133					goto fail;
4134			}
4135			recon_state.msg_version = 5;
4136		}
4137	}
4138
4139	err = encode_snap_realms(mdsc, &recon_state);
4140	if (err < 0)
4141		goto fail;
4142
4143	if (recon_state.msg_version >= 5) {
4144		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
4145		if (err < 0)
4146			goto fail;
4147	}
4148
4149	if (recon_state.nr_caps || recon_state.nr_realms) {
4150		struct page *page =
4151			list_first_entry(&recon_state.pagelist->head,
4152					struct page, lru);
4153		__le32 *addr = kmap_atomic(page);
4154		if (recon_state.nr_caps) {
4155			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
4156			*addr = cpu_to_le32(recon_state.nr_caps);
4157		} else if (recon_state.msg_version >= 4) {
4158			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
4159		}
4160		kunmap_atomic(addr);
4161	}
4162
4163	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
4164	if (recon_state.msg_version >= 4)
4165		reply->hdr.compat_version = cpu_to_le16(4);
4166
4167	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
4168	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
4169
4170	ceph_con_send(&session->s_con, reply);
4171
4172	mutex_unlock(&session->s_mutex);
4173
4174	mutex_lock(&mdsc->mutex);
4175	__wake_requests(mdsc, &session->s_waiting);
4176	mutex_unlock(&mdsc->mutex);
4177
4178	up_read(&mdsc->snap_rwsem);
4179	ceph_pagelist_release(recon_state.pagelist);
4180	return;
4181
4182fail:
4183	ceph_msg_put(reply);
4184	up_read(&mdsc->snap_rwsem);
4185	mutex_unlock(&session->s_mutex);
4186fail_nomsg:
4187	ceph_pagelist_release(recon_state.pagelist);
4188fail_nopagelist:
4189	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
4190	return;
4191}
4192
4193
4194/*
4195 * compare old and new mdsmaps, kicking requests
4196 * and closing out old connections as necessary
4197 *
4198 * called under mdsc->mutex.
4199 */
4200static void check_new_map(struct ceph_mds_client *mdsc,
4201			  struct ceph_mdsmap *newmap,
4202			  struct ceph_mdsmap *oldmap)
4203{
4204	int i, j, err;
4205	int oldstate, newstate;
4206	struct ceph_mds_session *s;
4207	unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0};
4208
4209	dout("check_new_map new %u old %u\n",
4210	     newmap->m_epoch, oldmap->m_epoch);
4211
4212	if (newmap->m_info) {
4213		for (i = 0; i < newmap->possible_max_rank; i++) {
4214			for (j = 0; j < newmap->m_info[i].num_export_targets; j++)
4215				set_bit(newmap->m_info[i].export_targets[j], targets);
4216		}
4217	}
4218
4219	for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4220		if (!mdsc->sessions[i])
4221			continue;
4222		s = mdsc->sessions[i];
4223		oldstate = ceph_mdsmap_get_state(oldmap, i);
4224		newstate = ceph_mdsmap_get_state(newmap, i);
4225
4226		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
4227		     i, ceph_mds_state_name(oldstate),
4228		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
4229		     ceph_mds_state_name(newstate),
4230		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
4231		     ceph_session_state_name(s->s_state));
4232
4233		if (i >= newmap->possible_max_rank) {
4234			/* force close session for stopped mds */
4235			ceph_get_mds_session(s);
4236			__unregister_session(mdsc, s);
4237			__wake_requests(mdsc, &s->s_waiting);
4238			mutex_unlock(&mdsc->mutex);
4239
4240			mutex_lock(&s->s_mutex);
4241			cleanup_session_requests(mdsc, s);
4242			remove_session_caps(s);
4243			mutex_unlock(&s->s_mutex);
4244
4245			ceph_put_mds_session(s);
4246
4247			mutex_lock(&mdsc->mutex);
4248			kick_requests(mdsc, i);
4249			continue;
4250		}
4251
4252		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
4253			   ceph_mdsmap_get_addr(newmap, i),
4254			   sizeof(struct ceph_entity_addr))) {
4255			/* just close it */
4256			mutex_unlock(&mdsc->mutex);
4257			mutex_lock(&s->s_mutex);
4258			mutex_lock(&mdsc->mutex);
4259			ceph_con_close(&s->s_con);
4260			mutex_unlock(&s->s_mutex);
4261			s->s_state = CEPH_MDS_SESSION_RESTARTING;
4262		} else if (oldstate == newstate) {
4263			continue;  /* nothing new with this mds */
4264		}
4265
4266		/*
4267		 * send reconnect?
4268		 */
4269		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
4270		    newstate >= CEPH_MDS_STATE_RECONNECT) {
4271			mutex_unlock(&mdsc->mutex);
4272			clear_bit(i, targets);
4273			send_mds_reconnect(mdsc, s);
4274			mutex_lock(&mdsc->mutex);
4275		}
4276
4277		/*
4278		 * kick request on any mds that has gone active.
4279		 */
4280		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
4281		    newstate >= CEPH_MDS_STATE_ACTIVE) {
4282			if (oldstate != CEPH_MDS_STATE_CREATING &&
4283			    oldstate != CEPH_MDS_STATE_STARTING)
4284				pr_info("mds%d recovery completed\n", s->s_mds);
4285			kick_requests(mdsc, i);
4286			mutex_unlock(&mdsc->mutex);
4287			mutex_lock(&s->s_mutex);
4288			mutex_lock(&mdsc->mutex);
4289			ceph_kick_flushing_caps(mdsc, s);
4290			mutex_unlock(&s->s_mutex);
4291			wake_up_session_caps(s, RECONNECT);
4292		}
4293	}
4294
4295	/*
4296	 * Only open and reconnect sessions that don't exist yet.
4297	 */
4298	for (i = 0; i < newmap->possible_max_rank; i++) {
4299		/*
4300		 * In case the import MDS is crashed just after
4301		 * the EImportStart journal is flushed, so when
4302		 * a standby MDS takes over it and is replaying
4303		 * the EImportStart journal the new MDS daemon
4304		 * will wait the client to reconnect it, but the
4305		 * client may never register/open the session yet.
4306		 *
4307		 * Will try to reconnect that MDS daemon if the
4308		 * rank number is in the export targets array and
4309		 * is the up:reconnect state.
4310		 */
4311		newstate = ceph_mdsmap_get_state(newmap, i);
4312		if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
4313			continue;
4314
4315		/*
4316		 * The session maybe registered and opened by some
4317		 * requests which were choosing random MDSes during
4318		 * the mdsc->mutex's unlock/lock gap below in rare
4319		 * case. But the related MDS daemon will just queue
4320		 * that requests and be still waiting for the client's
4321		 * reconnection request in up:reconnect state.
4322		 */
4323		s = __ceph_lookup_mds_session(mdsc, i);
4324		if (likely(!s)) {
4325			s = __open_export_target_session(mdsc, i);
4326			if (IS_ERR(s)) {
4327				err = PTR_ERR(s);
4328				pr_err("failed to open export target session, err %d\n",
4329				       err);
4330				continue;
4331			}
4332		}
4333		dout("send reconnect to export target mds.%d\n", i);
4334		mutex_unlock(&mdsc->mutex);
4335		send_mds_reconnect(mdsc, s);
4336		ceph_put_mds_session(s);
4337		mutex_lock(&mdsc->mutex);
4338	}
4339
4340	for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4341		s = mdsc->sessions[i];
4342		if (!s)
4343			continue;
4344		if (!ceph_mdsmap_is_laggy(newmap, i))
4345			continue;
4346		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4347		    s->s_state == CEPH_MDS_SESSION_HUNG ||
4348		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
4349			dout(" connecting to export targets of laggy mds%d\n",
4350			     i);
4351			__open_export_target_sessions(mdsc, s);
4352		}
4353	}
4354}
4355
4356
4357
4358/*
4359 * leases
4360 */
4361
4362/*
4363 * caller must hold session s_mutex, dentry->d_lock
4364 */
4365void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
4366{
4367	struct ceph_dentry_info *di = ceph_dentry(dentry);
4368
4369	ceph_put_mds_session(di->lease_session);
4370	di->lease_session = NULL;
4371}
4372
4373static void handle_lease(struct ceph_mds_client *mdsc,
4374			 struct ceph_mds_session *session,
4375			 struct ceph_msg *msg)
4376{
4377	struct super_block *sb = mdsc->fsc->sb;
4378	struct inode *inode;
4379	struct dentry *parent, *dentry;
4380	struct ceph_dentry_info *di;
4381	int mds = session->s_mds;
4382	struct ceph_mds_lease *h = msg->front.iov_base;
4383	u32 seq;
4384	struct ceph_vino vino;
4385	struct qstr dname;
4386	int release = 0;
4387
4388	dout("handle_lease from mds%d\n", mds);
4389
4390	/* decode */
4391	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
4392		goto bad;
4393	vino.ino = le64_to_cpu(h->ino);
4394	vino.snap = CEPH_NOSNAP;
4395	seq = le32_to_cpu(h->seq);
4396	dname.len = get_unaligned_le32(h + 1);
4397	if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
4398		goto bad;
4399	dname.name = (void *)(h + 1) + sizeof(u32);
4400
4401	/* lookup inode */
4402	inode = ceph_find_inode(sb, vino);
4403	dout("handle_lease %s, ino %llx %p %.*s\n",
4404	     ceph_lease_op_name(h->action), vino.ino, inode,
4405	     dname.len, dname.name);
4406
4407	mutex_lock(&session->s_mutex);
4408	inc_session_sequence(session);
4409
4410	if (!inode) {
4411		dout("handle_lease no inode %llx\n", vino.ino);
4412		goto release;
4413	}
4414
4415	/* dentry */
4416	parent = d_find_alias(inode);
4417	if (!parent) {
4418		dout("no parent dentry on inode %p\n", inode);
4419		WARN_ON(1);
4420		goto release;  /* hrm... */
4421	}
4422	dname.hash = full_name_hash(parent, dname.name, dname.len);
4423	dentry = d_lookup(parent, &dname);
4424	dput(parent);
4425	if (!dentry)
4426		goto release;
4427
4428	spin_lock(&dentry->d_lock);
4429	di = ceph_dentry(dentry);
4430	switch (h->action) {
4431	case CEPH_MDS_LEASE_REVOKE:
4432		if (di->lease_session == session) {
4433			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
4434				h->seq = cpu_to_le32(di->lease_seq);
4435			__ceph_mdsc_drop_dentry_lease(dentry);
4436		}
4437		release = 1;
4438		break;
4439
4440	case CEPH_MDS_LEASE_RENEW:
4441		if (di->lease_session == session &&
4442		    di->lease_gen == atomic_read(&session->s_cap_gen) &&
4443		    di->lease_renew_from &&
4444		    di->lease_renew_after == 0) {
4445			unsigned long duration =
4446				msecs_to_jiffies(le32_to_cpu(h->duration_ms));
4447
4448			di->lease_seq = seq;
4449			di->time = di->lease_renew_from + duration;
4450			di->lease_renew_after = di->lease_renew_from +
4451				(duration >> 1);
4452			di->lease_renew_from = 0;
4453		}
4454		break;
4455	}
4456	spin_unlock(&dentry->d_lock);
4457	dput(dentry);
4458
4459	if (!release)
4460		goto out;
4461
4462release:
4463	/* let's just reuse the same message */
4464	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
4465	ceph_msg_get(msg);
4466	ceph_con_send(&session->s_con, msg);
4467
4468out:
4469	mutex_unlock(&session->s_mutex);
4470	iput(inode);
4471	return;
4472
4473bad:
4474	pr_err("corrupt lease message\n");
4475	ceph_msg_dump(msg);
4476}
4477
4478void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
4479			      struct dentry *dentry, char action,
4480			      u32 seq)
4481{
4482	struct ceph_msg *msg;
4483	struct ceph_mds_lease *lease;
4484	struct inode *dir;
4485	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
4486
4487	dout("lease_send_msg identry %p %s to mds%d\n",
4488	     dentry, ceph_lease_op_name(action), session->s_mds);
4489
4490	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
4491	if (!msg)
4492		return;
4493	lease = msg->front.iov_base;
4494	lease->action = action;
4495	lease->seq = cpu_to_le32(seq);
4496
4497	spin_lock(&dentry->d_lock);
4498	dir = d_inode(dentry->d_parent);
4499	lease->ino = cpu_to_le64(ceph_ino(dir));
4500	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
4501
4502	put_unaligned_le32(dentry->d_name.len, lease + 1);
4503	memcpy((void *)(lease + 1) + 4,
4504	       dentry->d_name.name, dentry->d_name.len);
4505	spin_unlock(&dentry->d_lock);
4506	/*
4507	 * if this is a preemptive lease RELEASE, no need to
4508	 * flush request stream, since the actual request will
4509	 * soon follow.
4510	 */
4511	msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
4512
4513	ceph_con_send(&session->s_con, msg);
4514}
4515
4516/*
4517 * lock unlock the session, to wait ongoing session activities
4518 */
4519static void lock_unlock_session(struct ceph_mds_session *s)
4520{
4521	mutex_lock(&s->s_mutex);
4522	mutex_unlock(&s->s_mutex);
4523}
4524
4525static void maybe_recover_session(struct ceph_mds_client *mdsc)
4526{
4527	struct ceph_fs_client *fsc = mdsc->fsc;
4528
4529	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
4530		return;
4531
4532	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
4533		return;
4534
4535	if (!READ_ONCE(fsc->blocklisted))
4536		return;
4537
4538	pr_info("auto reconnect after blocklisted\n");
4539	ceph_force_reconnect(fsc->sb);
4540}
4541
4542bool check_session_state(struct ceph_mds_session *s)
4543{
4544	struct ceph_fs_client *fsc = s->s_mdsc->fsc;
4545
4546	switch (s->s_state) {
4547	case CEPH_MDS_SESSION_OPEN:
4548		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
4549			s->s_state = CEPH_MDS_SESSION_HUNG;
4550			pr_info("mds%d hung\n", s->s_mds);
4551		}
4552		break;
4553	case CEPH_MDS_SESSION_CLOSING:
4554		/* Should never reach this when not force unmounting */
4555		WARN_ON_ONCE(s->s_ttl &&
4556			     READ_ONCE(fsc->mount_state) != CEPH_MOUNT_SHUTDOWN);
4557		fallthrough;
4558	case CEPH_MDS_SESSION_NEW:
4559	case CEPH_MDS_SESSION_RESTARTING:
4560	case CEPH_MDS_SESSION_CLOSED:
4561	case CEPH_MDS_SESSION_REJECTED:
4562		return false;
4563	}
4564
4565	return true;
4566}
4567
4568/*
4569 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
4570 * then we need to retransmit that request.
4571 */
4572void inc_session_sequence(struct ceph_mds_session *s)
4573{
4574	lockdep_assert_held(&s->s_mutex);
4575
4576	s->s_seq++;
4577
4578	if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4579		int ret;
4580
4581		dout("resending session close request for mds%d\n", s->s_mds);
4582		ret = request_close_session(s);
4583		if (ret < 0)
4584			pr_err("unable to close session to mds%d: %d\n",
4585			       s->s_mds, ret);
4586	}
4587}
4588
4589/*
4590 * delayed work -- periodically trim expired leases, renew caps with mds.  If
4591 * the @delay parameter is set to 0 or if it's more than 5 secs, the default
4592 * workqueue delay value of 5 secs will be used.
4593 */
4594static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
4595{
4596	unsigned long max_delay = HZ * 5;
4597
4598	/* 5 secs default delay */
4599	if (!delay || (delay > max_delay))
4600		delay = max_delay;
4601	schedule_delayed_work(&mdsc->delayed_work,
4602			      round_jiffies_relative(delay));
4603}
4604
4605static void delayed_work(struct work_struct *work)
4606{
4607	struct ceph_mds_client *mdsc =
4608		container_of(work, struct ceph_mds_client, delayed_work.work);
4609	unsigned long delay;
4610	int renew_interval;
4611	int renew_caps;
4612	int i;
4613
4614	dout("mdsc delayed_work\n");
4615
4616	if (mdsc->stopping)
4617		return;
4618
4619	mutex_lock(&mdsc->mutex);
4620	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
4621	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
4622				   mdsc->last_renew_caps);
4623	if (renew_caps)
4624		mdsc->last_renew_caps = jiffies;
4625
4626	for (i = 0; i < mdsc->max_sessions; i++) {
4627		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4628		if (!s)
4629			continue;
4630
4631		if (!check_session_state(s)) {
4632			ceph_put_mds_session(s);
4633			continue;
4634		}
4635		mutex_unlock(&mdsc->mutex);
4636
4637		mutex_lock(&s->s_mutex);
4638		if (renew_caps)
4639			send_renew_caps(mdsc, s);
4640		else
4641			ceph_con_keepalive(&s->s_con);
4642		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4643		    s->s_state == CEPH_MDS_SESSION_HUNG)
4644			ceph_send_cap_releases(mdsc, s);
4645		mutex_unlock(&s->s_mutex);
4646		ceph_put_mds_session(s);
4647
4648		mutex_lock(&mdsc->mutex);
4649	}
4650	mutex_unlock(&mdsc->mutex);
4651
4652	delay = ceph_check_delayed_caps(mdsc);
4653
4654	ceph_queue_cap_reclaim_work(mdsc);
4655
4656	ceph_trim_snapid_map(mdsc);
4657
4658	maybe_recover_session(mdsc);
4659
4660	schedule_delayed(mdsc, delay);
4661}
4662
4663int ceph_mdsc_init(struct ceph_fs_client *fsc)
4664
4665{
4666	struct ceph_mds_client *mdsc;
4667	int err;
4668
4669	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
4670	if (!mdsc)
4671		return -ENOMEM;
4672	mdsc->fsc = fsc;
4673	mutex_init(&mdsc->mutex);
4674	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
4675	if (!mdsc->mdsmap) {
4676		err = -ENOMEM;
4677		goto err_mdsc;
4678	}
4679
4680	init_completion(&mdsc->safe_umount_waiters);
4681	init_waitqueue_head(&mdsc->session_close_wq);
4682	INIT_LIST_HEAD(&mdsc->waiting_for_map);
4683	mdsc->quotarealms_inodes = RB_ROOT;
4684	mutex_init(&mdsc->quotarealms_inodes_mutex);
4685	init_rwsem(&mdsc->snap_rwsem);
4686	mdsc->snap_realms = RB_ROOT;
4687	INIT_LIST_HEAD(&mdsc->snap_empty);
4688	spin_lock_init(&mdsc->snap_empty_lock);
4689	mdsc->request_tree = RB_ROOT;
4690	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
4691	mdsc->last_renew_caps = jiffies;
4692	INIT_LIST_HEAD(&mdsc->cap_delay_list);
4693	INIT_LIST_HEAD(&mdsc->cap_wait_list);
4694	spin_lock_init(&mdsc->cap_delay_lock);
4695	INIT_LIST_HEAD(&mdsc->snap_flush_list);
4696	spin_lock_init(&mdsc->snap_flush_lock);
4697	mdsc->last_cap_flush_tid = 1;
4698	INIT_LIST_HEAD(&mdsc->cap_flush_list);
4699	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
4700	spin_lock_init(&mdsc->cap_dirty_lock);
4701	init_waitqueue_head(&mdsc->cap_flushing_wq);
4702	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
4703	err = ceph_metric_init(&mdsc->metric);
4704	if (err)
4705		goto err_mdsmap;
4706
4707	spin_lock_init(&mdsc->dentry_list_lock);
4708	INIT_LIST_HEAD(&mdsc->dentry_leases);
4709	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
4710
4711	ceph_caps_init(mdsc);
4712	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
4713
4714	spin_lock_init(&mdsc->snapid_map_lock);
4715	mdsc->snapid_map_tree = RB_ROOT;
4716	INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4717
4718	init_rwsem(&mdsc->pool_perm_rwsem);
4719	mdsc->pool_perm_tree = RB_ROOT;
4720
4721	strscpy(mdsc->nodename, utsname()->nodename,
4722		sizeof(mdsc->nodename));
4723
4724	fsc->mdsc = mdsc;
4725	return 0;
4726
4727err_mdsmap:
4728	kfree(mdsc->mdsmap);
4729err_mdsc:
4730	kfree(mdsc);
4731	return err;
4732}
4733
4734/*
4735 * Wait for safe replies on open mds requests.  If we time out, drop
4736 * all requests from the tree to avoid dangling dentry refs.
4737 */
4738static void wait_requests(struct ceph_mds_client *mdsc)
4739{
4740	struct ceph_options *opts = mdsc->fsc->client->options;
4741	struct ceph_mds_request *req;
4742
4743	mutex_lock(&mdsc->mutex);
4744	if (__get_oldest_req(mdsc)) {
4745		mutex_unlock(&mdsc->mutex);
4746
4747		dout("wait_requests waiting for requests\n");
4748		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
4749				    ceph_timeout_jiffies(opts->mount_timeout));
4750
4751		/* tear down remaining requests */
4752		mutex_lock(&mdsc->mutex);
4753		while ((req = __get_oldest_req(mdsc))) {
4754			dout("wait_requests timed out on tid %llu\n",
4755			     req->r_tid);
4756			list_del_init(&req->r_wait);
4757			__unregister_request(mdsc, req);
4758		}
4759	}
4760	mutex_unlock(&mdsc->mutex);
4761	dout("wait_requests done\n");
4762}
4763
4764void send_flush_mdlog(struct ceph_mds_session *s)
4765{
4766	struct ceph_msg *msg;
4767
4768	/*
4769	 * Pre-luminous MDS crashes when it sees an unknown session request
4770	 */
4771	if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
4772		return;
4773
4774	mutex_lock(&s->s_mutex);
4775	dout("request mdlog flush to mds%d (%s)s seq %lld\n", s->s_mds,
4776	     ceph_session_state_name(s->s_state), s->s_seq);
4777	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
4778				      s->s_seq);
4779	if (!msg) {
4780		pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n",
4781		       s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
4782	} else {
4783		ceph_con_send(&s->s_con, msg);
4784	}
4785	mutex_unlock(&s->s_mutex);
4786}
4787
4788/*
4789 * called before mount is ro, and before dentries are torn down.
4790 * (hmm, does this still race with new lookups?)
4791 */
4792void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
4793{
4794	dout("pre_umount\n");
4795	mdsc->stopping = 1;
4796
4797	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
4798	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
4799	ceph_flush_dirty_caps(mdsc);
4800	wait_requests(mdsc);
4801
4802	/*
4803	 * wait for reply handlers to drop their request refs and
4804	 * their inode/dcache refs
4805	 */
4806	ceph_msgr_flush();
4807
4808	ceph_cleanup_quotarealms_inodes(mdsc);
4809}
4810
4811/*
4812 * wait for all write mds requests to flush.
4813 */
4814static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
4815{
4816	struct ceph_mds_request *req = NULL, *nextreq;
4817	struct rb_node *n;
4818
4819	mutex_lock(&mdsc->mutex);
4820	dout("wait_unsafe_requests want %lld\n", want_tid);
4821restart:
4822	req = __get_oldest_req(mdsc);
4823	while (req && req->r_tid <= want_tid) {
4824		/* find next request */
4825		n = rb_next(&req->r_node);
4826		if (n)
4827			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
4828		else
4829			nextreq = NULL;
4830		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
4831		    (req->r_op & CEPH_MDS_OP_WRITE)) {
4832			/* write op */
4833			ceph_mdsc_get_request(req);
4834			if (nextreq)
4835				ceph_mdsc_get_request(nextreq);
4836			mutex_unlock(&mdsc->mutex);
4837			dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
4838			     req->r_tid, want_tid);
4839			wait_for_completion(&req->r_safe_completion);
4840			mutex_lock(&mdsc->mutex);
4841			ceph_mdsc_put_request(req);
4842			if (!nextreq)
4843				break;  /* next dne before, so we're done! */
4844			if (RB_EMPTY_NODE(&nextreq->r_node)) {
4845				/* next request was removed from tree */
4846				ceph_mdsc_put_request(nextreq);
4847				goto restart;
4848			}
4849			ceph_mdsc_put_request(nextreq);  /* won't go away */
4850		}
4851		req = nextreq;
4852	}
4853	mutex_unlock(&mdsc->mutex);
4854	dout("wait_unsafe_requests done\n");
4855}
4856
4857void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
4858{
4859	u64 want_tid, want_flush;
4860
4861	if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
4862		return;
4863
4864	dout("sync\n");
4865	mutex_lock(&mdsc->mutex);
4866	want_tid = mdsc->last_tid;
4867	mutex_unlock(&mdsc->mutex);
4868
4869	ceph_flush_dirty_caps(mdsc);
4870	spin_lock(&mdsc->cap_dirty_lock);
4871	want_flush = mdsc->last_cap_flush_tid;
4872	if (!list_empty(&mdsc->cap_flush_list)) {
4873		struct ceph_cap_flush *cf =
4874			list_last_entry(&mdsc->cap_flush_list,
4875					struct ceph_cap_flush, g_list);
4876		cf->wake = true;
4877	}
4878	spin_unlock(&mdsc->cap_dirty_lock);
4879
4880	dout("sync want tid %lld flush_seq %lld\n",
4881	     want_tid, want_flush);
4882
4883	wait_unsafe_requests(mdsc, want_tid);
4884	wait_caps_flush(mdsc, want_flush);
4885}
4886
4887/*
4888 * true if all sessions are closed, or we force unmount
4889 */
4890static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
4891{
4892	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4893		return true;
4894	return atomic_read(&mdsc->num_sessions) <= skipped;
4895}
4896
4897/*
4898 * called after sb is ro.
4899 */
4900void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
4901{
4902	struct ceph_options *opts = mdsc->fsc->client->options;
4903	struct ceph_mds_session *session;
4904	int i;
4905	int skipped = 0;
4906
4907	dout("close_sessions\n");
4908
4909	/* close sessions */
4910	mutex_lock(&mdsc->mutex);
4911	for (i = 0; i < mdsc->max_sessions; i++) {
4912		session = __ceph_lookup_mds_session(mdsc, i);
4913		if (!session)
4914			continue;
4915		mutex_unlock(&mdsc->mutex);
4916		mutex_lock(&session->s_mutex);
4917		if (__close_session(mdsc, session) <= 0)
4918			skipped++;
4919		mutex_unlock(&session->s_mutex);
4920		ceph_put_mds_session(session);
4921		mutex_lock(&mdsc->mutex);
4922	}
4923	mutex_unlock(&mdsc->mutex);
4924
4925	dout("waiting for sessions to close\n");
4926	wait_event_timeout(mdsc->session_close_wq,
4927			   done_closing_sessions(mdsc, skipped),
4928			   ceph_timeout_jiffies(opts->mount_timeout));
4929
4930	/* tear down remaining sessions */
4931	mutex_lock(&mdsc->mutex);
4932	for (i = 0; i < mdsc->max_sessions; i++) {
4933		if (mdsc->sessions[i]) {
4934			session = ceph_get_mds_session(mdsc->sessions[i]);
4935			__unregister_session(mdsc, session);
4936			mutex_unlock(&mdsc->mutex);
4937			mutex_lock(&session->s_mutex);
4938			remove_session_caps(session);
4939			mutex_unlock(&session->s_mutex);
4940			ceph_put_mds_session(session);
4941			mutex_lock(&mdsc->mutex);
4942		}
4943	}
4944	WARN_ON(!list_empty(&mdsc->cap_delay_list));
4945	mutex_unlock(&mdsc->mutex);
4946
4947	ceph_cleanup_snapid_map(mdsc);
4948	ceph_cleanup_empty_realms(mdsc);
4949
4950	cancel_work_sync(&mdsc->cap_reclaim_work);
4951	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4952
4953	dout("stopped\n");
4954}
4955
4956void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
4957{
4958	struct ceph_mds_session *session;
4959	int mds;
4960
4961	dout("force umount\n");
4962
4963	mutex_lock(&mdsc->mutex);
4964	for (mds = 0; mds < mdsc->max_sessions; mds++) {
4965		session = __ceph_lookup_mds_session(mdsc, mds);
4966		if (!session)
4967			continue;
4968
4969		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
4970			__unregister_session(mdsc, session);
4971		__wake_requests(mdsc, &session->s_waiting);
4972		mutex_unlock(&mdsc->mutex);
4973
4974		mutex_lock(&session->s_mutex);
4975		__close_session(mdsc, session);
4976		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
4977			cleanup_session_requests(mdsc, session);
4978			remove_session_caps(session);
4979		}
4980		mutex_unlock(&session->s_mutex);
4981		ceph_put_mds_session(session);
4982
4983		mutex_lock(&mdsc->mutex);
4984		kick_requests(mdsc, mds);
4985	}
4986	__wake_requests(mdsc, &mdsc->waiting_for_map);
4987	mutex_unlock(&mdsc->mutex);
4988}
4989
4990static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
4991{
4992	dout("stop\n");
4993	/*
4994	 * Make sure the delayed work stopped before releasing
4995	 * the resources.
4996	 *
4997	 * Because the cancel_delayed_work_sync() will only
4998	 * guarantee that the work finishes executing. But the
4999	 * delayed work will re-arm itself again after that.
5000	 */
5001	flush_delayed_work(&mdsc->delayed_work);
5002
5003	if (mdsc->mdsmap)
5004		ceph_mdsmap_destroy(mdsc->mdsmap);
5005	kfree(mdsc->sessions);
5006	ceph_caps_finalize(mdsc);
5007	ceph_pool_perm_destroy(mdsc);
5008}
5009
5010void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
5011{
5012	struct ceph_mds_client *mdsc = fsc->mdsc;
5013	dout("mdsc_destroy %p\n", mdsc);
5014
5015	if (!mdsc)
5016		return;
5017
5018	/* flush out any connection work with references to us */
5019	ceph_msgr_flush();
5020
5021	ceph_mdsc_stop(mdsc);
5022
5023	ceph_metric_destroy(&mdsc->metric);
5024
5025	fsc->mdsc = NULL;
5026	kfree(mdsc);
5027	dout("mdsc_destroy %p done\n", mdsc);
5028}
5029
5030void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
5031{
5032	struct ceph_fs_client *fsc = mdsc->fsc;
5033	const char *mds_namespace = fsc->mount_options->mds_namespace;
5034	void *p = msg->front.iov_base;
5035	void *end = p + msg->front.iov_len;
5036	u32 epoch;
5037	u32 num_fs;
5038	u32 mount_fscid = (u32)-1;
5039	int err = -EINVAL;
5040
5041	ceph_decode_need(&p, end, sizeof(u32), bad);
5042	epoch = ceph_decode_32(&p);
5043
5044	dout("handle_fsmap epoch %u\n", epoch);
5045
5046	/* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
5047	ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);
5048
5049	ceph_decode_32_safe(&p, end, num_fs, bad);
5050	while (num_fs-- > 0) {
5051		void *info_p, *info_end;
5052		u32 info_len;
5053		u32 fscid, namelen;
5054
5055		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
5056		p += 2;		// info_v, info_cv
5057		info_len = ceph_decode_32(&p);
5058		ceph_decode_need(&p, end, info_len, bad);
5059		info_p = p;
5060		info_end = p + info_len;
5061		p = info_end;
5062
5063		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
5064		fscid = ceph_decode_32(&info_p);
5065		namelen = ceph_decode_32(&info_p);
5066		ceph_decode_need(&info_p, info_end, namelen, bad);
5067
5068		if (mds_namespace &&
5069		    strlen(mds_namespace) == namelen &&
5070		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
5071			mount_fscid = fscid;
5072			break;
5073		}
5074	}
5075
5076	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
5077	if (mount_fscid != (u32)-1) {
5078		fsc->client->monc.fs_cluster_id = mount_fscid;
5079		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
5080				   0, true);
5081		ceph_monc_renew_subs(&fsc->client->monc);
5082	} else {
5083		err = -ENOENT;
5084		goto err_out;
5085	}
5086	return;
5087
5088bad:
5089	pr_err("error decoding fsmap\n");
5090err_out:
5091	mutex_lock(&mdsc->mutex);
5092	mdsc->mdsmap_err = err;
5093	__wake_requests(mdsc, &mdsc->waiting_for_map);
5094	mutex_unlock(&mdsc->mutex);
5095}
5096
5097/*
5098 * handle mds map update.
5099 */
5100void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
5101{
5102	u32 epoch;
5103	u32 maplen;
5104	void *p = msg->front.iov_base;
5105	void *end = p + msg->front.iov_len;
5106	struct ceph_mdsmap *newmap, *oldmap;
5107	struct ceph_fsid fsid;
5108	int err = -EINVAL;
5109
5110	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
5111	ceph_decode_copy(&p, &fsid, sizeof(fsid));
5112	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
5113		return;
5114	epoch = ceph_decode_32(&p);
5115	maplen = ceph_decode_32(&p);
5116	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
5117
5118	/* do we need it? */
5119	mutex_lock(&mdsc->mutex);
5120	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
5121		dout("handle_map epoch %u <= our %u\n",
5122		     epoch, mdsc->mdsmap->m_epoch);
5123		mutex_unlock(&mdsc->mutex);
5124		return;
5125	}
5126
5127	newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client));
5128	if (IS_ERR(newmap)) {
5129		err = PTR_ERR(newmap);
5130		goto bad_unlock;
5131	}
5132
5133	/* swap into place */
5134	if (mdsc->mdsmap) {
5135		oldmap = mdsc->mdsmap;
5136		mdsc->mdsmap = newmap;
5137		check_new_map(mdsc, newmap, oldmap);
5138		ceph_mdsmap_destroy(oldmap);
5139	} else {
5140		mdsc->mdsmap = newmap;  /* first mds map */
5141	}
5142	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
5143					MAX_LFS_FILESIZE);
5144
5145	__wake_requests(mdsc, &mdsc->waiting_for_map);
5146	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
5147			  mdsc->mdsmap->m_epoch);
5148
5149	mutex_unlock(&mdsc->mutex);
5150	schedule_delayed(mdsc, 0);
5151	return;
5152
5153bad_unlock:
5154	mutex_unlock(&mdsc->mutex);
5155bad:
5156	pr_err("error decoding mdsmap %d\n", err);
5157	return;
5158}
5159
5160static struct ceph_connection *mds_get_con(struct ceph_connection *con)
5161{
5162	struct ceph_mds_session *s = con->private;
5163
5164	if (ceph_get_mds_session(s))
5165		return con;
5166	return NULL;
5167}
5168
5169static void mds_put_con(struct ceph_connection *con)
5170{
5171	struct ceph_mds_session *s = con->private;
5172
5173	ceph_put_mds_session(s);
5174}
5175
5176/*
5177 * if the client is unresponsive for long enough, the mds will kill
5178 * the session entirely.
5179 */
5180static void mds_peer_reset(struct ceph_connection *con)
5181{
5182	struct ceph_mds_session *s = con->private;
5183	struct ceph_mds_client *mdsc = s->s_mdsc;
5184
5185	pr_warn("mds%d closed our session\n", s->s_mds);
5186	send_mds_reconnect(mdsc, s);
5187}
5188
5189static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5190{
5191	struct ceph_mds_session *s = con->private;
5192	struct ceph_mds_client *mdsc = s->s_mdsc;
5193	int type = le16_to_cpu(msg->hdr.type);
5194
5195	mutex_lock(&mdsc->mutex);
5196	if (__verify_registered_session(mdsc, s) < 0) {
5197		mutex_unlock(&mdsc->mutex);
5198		goto out;
5199	}
5200	mutex_unlock(&mdsc->mutex);
5201
5202	switch (type) {
5203	case CEPH_MSG_MDS_MAP:
5204		ceph_mdsc_handle_mdsmap(mdsc, msg);
5205		break;
5206	case CEPH_MSG_FS_MAP_USER:
5207		ceph_mdsc_handle_fsmap(mdsc, msg);
5208		break;
5209	case CEPH_MSG_CLIENT_SESSION:
5210		handle_session(s, msg);
5211		break;
5212	case CEPH_MSG_CLIENT_REPLY:
5213		handle_reply(s, msg);
5214		break;
5215	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
5216		handle_forward(mdsc, s, msg);
5217		break;
5218	case CEPH_MSG_CLIENT_CAPS:
5219		ceph_handle_caps(s, msg);
5220		break;
5221	case CEPH_MSG_CLIENT_SNAP:
5222		ceph_handle_snap(mdsc, s, msg);
5223		break;
5224	case CEPH_MSG_CLIENT_LEASE:
5225		handle_lease(mdsc, s, msg);
5226		break;
5227	case CEPH_MSG_CLIENT_QUOTA:
5228		ceph_handle_quota(mdsc, s, msg);
5229		break;
5230
5231	default:
5232		pr_err("received unknown message type %d %s\n", type,
5233		       ceph_msg_type_name(type));
5234	}
5235out:
5236	ceph_msg_put(msg);
5237}
5238
5239/*
5240 * authentication
5241 */
5242
5243/*
5244 * Note: returned pointer is the address of a structure that's
5245 * managed separately.  Caller must *not* attempt to free it.
5246 */
5247static struct ceph_auth_handshake *
5248mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
5249{
5250	struct ceph_mds_session *s = con->private;
5251	struct ceph_mds_client *mdsc = s->s_mdsc;
5252	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5253	struct ceph_auth_handshake *auth = &s->s_auth;
5254	int ret;
5255
5256	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
5257					 force_new, proto, NULL, NULL);
5258	if (ret)
5259		return ERR_PTR(ret);
5260
5261	return auth;
5262}
5263
5264static int mds_add_authorizer_challenge(struct ceph_connection *con,
5265				    void *challenge_buf, int challenge_buf_len)
5266{
5267	struct ceph_mds_session *s = con->private;
5268	struct ceph_mds_client *mdsc = s->s_mdsc;
5269	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5270
5271	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
5272					    challenge_buf, challenge_buf_len);
5273}
5274
5275static int mds_verify_authorizer_reply(struct ceph_connection *con)
5276{
5277	struct ceph_mds_session *s = con->private;
5278	struct ceph_mds_client *mdsc = s->s_mdsc;
5279	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5280	struct ceph_auth_handshake *auth = &s->s_auth;
5281
5282	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
5283		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
5284		NULL, NULL, NULL, NULL);
5285}
5286
5287static int mds_invalidate_authorizer(struct ceph_connection *con)
5288{
5289	struct ceph_mds_session *s = con->private;
5290	struct ceph_mds_client *mdsc = s->s_mdsc;
5291	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5292
5293	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
5294
5295	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
5296}
5297
5298static int mds_get_auth_request(struct ceph_connection *con,
5299				void *buf, int *buf_len,
5300				void **authorizer, int *authorizer_len)
5301{
5302	struct ceph_mds_session *s = con->private;
5303	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5304	struct ceph_auth_handshake *auth = &s->s_auth;
5305	int ret;
5306
5307	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
5308				       buf, buf_len);
5309	if (ret)
5310		return ret;
5311
5312	*authorizer = auth->authorizer_buf;
5313	*authorizer_len = auth->authorizer_buf_len;
5314	return 0;
5315}
5316
5317static int mds_handle_auth_reply_more(struct ceph_connection *con,
5318				      void *reply, int reply_len,
5319				      void *buf, int *buf_len,
5320				      void **authorizer, int *authorizer_len)
5321{
5322	struct ceph_mds_session *s = con->private;
5323	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5324	struct ceph_auth_handshake *auth = &s->s_auth;
5325	int ret;
5326
5327	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
5328					      buf, buf_len);
5329	if (ret)
5330		return ret;
5331
5332	*authorizer = auth->authorizer_buf;
5333	*authorizer_len = auth->authorizer_buf_len;
5334	return 0;
5335}
5336
5337static int mds_handle_auth_done(struct ceph_connection *con,
5338				u64 global_id, void *reply, int reply_len,
5339				u8 *session_key, int *session_key_len,
5340				u8 *con_secret, int *con_secret_len)
5341{
5342	struct ceph_mds_session *s = con->private;
5343	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5344	struct ceph_auth_handshake *auth = &s->s_auth;
5345
5346	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
5347					       session_key, session_key_len,
5348					       con_secret, con_secret_len);
5349}
5350
5351static int mds_handle_auth_bad_method(struct ceph_connection *con,
5352				      int used_proto, int result,
5353				      const int *allowed_protos, int proto_cnt,
5354				      const int *allowed_modes, int mode_cnt)
5355{
5356	struct ceph_mds_session *s = con->private;
5357	struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
5358	int ret;
5359
5360	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
5361					    used_proto, result,
5362					    allowed_protos, proto_cnt,
5363					    allowed_modes, mode_cnt)) {
5364		ret = ceph_monc_validate_auth(monc);
5365		if (ret)
5366			return ret;
5367	}
5368
5369	return -EACCES;
5370}
5371
5372static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
5373				struct ceph_msg_header *hdr, int *skip)
5374{
5375	struct ceph_msg *msg;
5376	int type = (int) le16_to_cpu(hdr->type);
5377	int front_len = (int) le32_to_cpu(hdr->front_len);
5378
5379	if (con->in_msg)
5380		return con->in_msg;
5381
5382	*skip = 0;
5383	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
5384	if (!msg) {
5385		pr_err("unable to allocate msg type %d len %d\n",
5386		       type, front_len);
5387		return NULL;
5388	}
5389
5390	return msg;
5391}
5392
5393static int mds_sign_message(struct ceph_msg *msg)
5394{
5395       struct ceph_mds_session *s = msg->con->private;
5396       struct ceph_auth_handshake *auth = &s->s_auth;
5397
5398       return ceph_auth_sign_message(auth, msg);
5399}
5400
5401static int mds_check_message_signature(struct ceph_msg *msg)
5402{
5403       struct ceph_mds_session *s = msg->con->private;
5404       struct ceph_auth_handshake *auth = &s->s_auth;
5405
5406       return ceph_auth_check_message_signature(auth, msg);
5407}
5408
5409static const struct ceph_connection_operations mds_con_ops = {
5410	.get = mds_get_con,
5411	.put = mds_put_con,
5412	.alloc_msg = mds_alloc_msg,
5413	.dispatch = mds_dispatch,
5414	.peer_reset = mds_peer_reset,
5415	.get_authorizer = mds_get_authorizer,
5416	.add_authorizer_challenge = mds_add_authorizer_challenge,
5417	.verify_authorizer_reply = mds_verify_authorizer_reply,
5418	.invalidate_authorizer = mds_invalidate_authorizer,
5419	.sign_message = mds_sign_message,
5420	.check_message_signature = mds_check_message_signature,
5421	.get_auth_request = mds_get_auth_request,
5422	.handle_auth_reply_more = mds_handle_auth_reply_more,
5423	.handle_auth_done = mds_handle_auth_done,
5424	.handle_auth_bad_method = mds_handle_auth_bad_method,
5425};
5426
5427/* eof */
5428