mds_client.c revision fba97e80
1// SPDX-License-Identifier: GPL-2.0
2#include <linux/ceph/ceph_debug.h>
3
4#include <linux/fs.h>
5#include <linux/wait.h>
6#include <linux/slab.h>
7#include <linux/gfp.h>
8#include <linux/sched.h>
9#include <linux/debugfs.h>
10#include <linux/seq_file.h>
11#include <linux/ratelimit.h>
12#include <linux/bits.h>
13#include <linux/ktime.h>
14
15#include "super.h"
16#include "mds_client.h"
17
18#include <linux/ceph/ceph_features.h>
19#include <linux/ceph/messenger.h>
20#include <linux/ceph/decode.h>
21#include <linux/ceph/pagelist.h>
22#include <linux/ceph/auth.h>
23#include <linux/ceph/debugfs.h>
24
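/*
 * Cap on the payload of a single session reconnect message; larger
 * reconnects are split across multiple messages when the MDS allows it.
 */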
25#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
26
27/*
28 * A cluster of MDS (metadata server) daemons is responsible for
29 * managing the file system namespace (the directory hierarchy and
30 * inodes) and for coordinating shared access to storage.  Metadata is
31 * partitioned hierarchically across a number of servers, and that
32 * partition varies over time as the cluster adjusts the distribution
33 * in order to balance load.
34 *
35 * The MDS client is primarily responsible for managing synchronous
36 * metadata requests for operations like open, unlink, and so forth.
37 * If there is an MDS failure, we find out about it when we (possibly
38 * request and) receive a new MDS map, and can resubmit affected
39 * requests.
40 *
41 * For the most part, though, we take advantage of a lossless
42 * communications channel to the MDS, and do not need to worry about
43 * timing out or resubmitting requests.
44 *
45 * We maintain a stateful "session" with each MDS we interact with.
46 * Within each session, we send periodic heartbeat messages to ensure
47 * any capabilities or leases we have been issued remain valid.  If
48 * the session times out and goes stale, our leases and capabilities
49 * are no longer valid.
50 */
51
52struct ceph_reconnect_state {
53	struct ceph_mds_session *session;
54	int nr_caps, nr_realms;
55	struct ceph_pagelist *pagelist;
56	unsigned msg_version;
57	bool allow_multi;
58};
59
60static void __wake_requests(struct ceph_mds_client *mdsc,
61			    struct list_head *head);
62static void ceph_cap_release_work(struct work_struct *work);
63static void ceph_cap_reclaim_work(struct work_struct *work);
64
65static const struct ceph_connection_operations mds_con_ops;
66
67
68/*
69 * mds reply parsing
70 */
71
72static int parse_reply_info_quota(void **p, void *end,
73				  struct ceph_mds_reply_info_in *info)
74{
75	u8 struct_v, struct_compat;
76	u32 struct_len;
77
78	ceph_decode_8_safe(p, end, struct_v, bad);
79	ceph_decode_8_safe(p, end, struct_compat, bad);
80	/* struct_v is expected to be >= 1. we only
81	 * understand encoding with struct_compat == 1. */
82	if (!struct_v || struct_compat != 1)
83		goto bad;
84	ceph_decode_32_safe(p, end, struct_len, bad);
85	ceph_decode_need(p, end, struct_len, bad);
86	end = *p + struct_len;
87	ceph_decode_64_safe(p, end, info->max_bytes, bad);
88	ceph_decode_64_safe(p, end, info->max_files, bad);
89	*p = end;
90	return 0;
91bad:
92	return -EIO;
93}
94
95/*
96 * parse individual inode info
97 */
98static int parse_reply_info_in(void **p, void *end,
99			       struct ceph_mds_reply_info_in *info,
100			       u64 features)
101{
102	int err = 0;
103	u8 struct_v = 0;
104
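	/*
	 * A features value of (u64)-1 means the sending MDS supports the
	 * new, versioned reply encoding (struct_v/struct_compat/struct_len
	 * framing); otherwise fall back to the legacy per-feature-bit
	 * checks below.
	 */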
105	if (features == (u64)-1) {
106		u32 struct_len;
107		u8 struct_compat;
108		ceph_decode_8_safe(p, end, struct_v, bad);
109		ceph_decode_8_safe(p, end, struct_compat, bad);
110		/* struct_v is expected to be >= 1. we only understand
111		 * encoding with struct_compat == 1. */
112		if (!struct_v || struct_compat != 1)
113			goto bad;
114		ceph_decode_32_safe(p, end, struct_len, bad);
115		ceph_decode_need(p, end, struct_len, bad);
116		end = *p + struct_len;
117	}
118
119	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
120	info->in = *p;
121	*p += sizeof(struct ceph_mds_reply_inode) +
122		sizeof(*info->in->fragtree.splits) *
123		le32_to_cpu(info->in->fragtree.nsplits);
124
125	ceph_decode_32_safe(p, end, info->symlink_len, bad);
126	ceph_decode_need(p, end, info->symlink_len, bad);
127	info->symlink = *p;
128	*p += info->symlink_len;
129
130	ceph_decode_copy_safe(p, end, &info->dir_layout,
131			      sizeof(info->dir_layout), bad);
132	ceph_decode_32_safe(p, end, info->xattr_len, bad);
133	ceph_decode_need(p, end, info->xattr_len, bad);
134	info->xattr_data = *p;
135	*p += info->xattr_len;
136
137	if (features == (u64)-1) {
138		/* inline data */
139		ceph_decode_64_safe(p, end, info->inline_version, bad);
140		ceph_decode_32_safe(p, end, info->inline_len, bad);
141		ceph_decode_need(p, end, info->inline_len, bad);
142		info->inline_data = *p;
143		*p += info->inline_len;
144		/* quota */
145		err = parse_reply_info_quota(p, end, info);
146		if (err < 0)
147			goto out_bad;
148		/* pool namespace */
149		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
150		if (info->pool_ns_len > 0) {
151			ceph_decode_need(p, end, info->pool_ns_len, bad);
152			info->pool_ns_data = *p;
153			*p += info->pool_ns_len;
154		}
155
156		/* btime */
157		ceph_decode_need(p, end, sizeof(info->btime), bad);
158		ceph_decode_copy(p, &info->btime, sizeof(info->btime));
159
160		/* change attribute */
161		ceph_decode_64_safe(p, end, info->change_attr, bad);
162
163		/* dir pin */
164		if (struct_v >= 2) {
165			ceph_decode_32_safe(p, end, info->dir_pin, bad);
166		} else {
167			info->dir_pin = -ENODATA;
168		}
169
170		/* snapshot birth time, remains zero for v<=2 */
171		if (struct_v >= 3) {
172			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
173			ceph_decode_copy(p, &info->snap_btime,
174					 sizeof(info->snap_btime));
175		} else {
176			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
177		}
178
179		/* snapshot count, remains zero for v<=3 */
180		if (struct_v >= 4) {
181			ceph_decode_64_safe(p, end, info->rsnaps, bad);
182		} else {
183			info->rsnaps = 0;
184		}
185
186		*p = end;
187	} else {
188		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
189			ceph_decode_64_safe(p, end, info->inline_version, bad);
190			ceph_decode_32_safe(p, end, info->inline_len, bad);
191			ceph_decode_need(p, end, info->inline_len, bad);
192			info->inline_data = *p;
193			*p += info->inline_len;
194		} else
195			info->inline_version = CEPH_INLINE_NONE;
196
197		if (features & CEPH_FEATURE_MDS_QUOTA) {
198			err = parse_reply_info_quota(p, end, info);
199			if (err < 0)
200				goto out_bad;
201		} else {
202			info->max_bytes = 0;
203			info->max_files = 0;
204		}
205
206		info->pool_ns_len = 0;
207		info->pool_ns_data = NULL;
208		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
209			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
210			if (info->pool_ns_len > 0) {
211				ceph_decode_need(p, end, info->pool_ns_len, bad);
212				info->pool_ns_data = *p;
213				*p += info->pool_ns_len;
214			}
215		}
216
217		if (features & CEPH_FEATURE_FS_BTIME) {
218			ceph_decode_need(p, end, sizeof(info->btime), bad);
219			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
220			ceph_decode_64_safe(p, end, info->change_attr, bad);
221		}
222
223		info->dir_pin = -ENODATA;
224		/* info->snap_btime and info->rsnaps remain zero */
225	}
226	return 0;
227bad:
228	err = -EIO;
229out_bad:
230	return err;
231}
232
233static int parse_reply_info_dir(void **p, void *end,
234				struct ceph_mds_reply_dirfrag **dirfrag,
235				u64 features)
236{
237	if (features == (u64)-1) {
238		u8 struct_v, struct_compat;
239		u32 struct_len;
240		ceph_decode_8_safe(p, end, struct_v, bad);
241		ceph_decode_8_safe(p, end, struct_compat, bad);
242		/* struct_v is expected to be >= 1. we only understand
243		 * encoding whose struct_compat == 1. */
244		if (!struct_v || struct_compat != 1)
245			goto bad;
246		ceph_decode_32_safe(p, end, struct_len, bad);
247		ceph_decode_need(p, end, struct_len, bad);
248		end = *p + struct_len;
249	}
250
251	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
252	*dirfrag = *p;
253	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
254	if (unlikely(*p > end))
255		goto bad;
256	if (features == (u64)-1)
257		*p = end;
258	return 0;
259bad:
260	return -EIO;
261}
262
263static int parse_reply_info_lease(void **p, void *end,
264				  struct ceph_mds_reply_lease **lease,
265				  u64 features)
266{
267	if (features == (u64)-1) {
268		u8 struct_v, struct_compat;
269		u32 struct_len;
270		ceph_decode_8_safe(p, end, struct_v, bad);
271		ceph_decode_8_safe(p, end, struct_compat, bad);
272		/* struct_v is expected to be >= 1. we only understand
273		 * encoding whose struct_compat == 1. */
274		if (!struct_v || struct_compat != 1)
275			goto bad;
276		ceph_decode_32_safe(p, end, struct_len, bad);
277		ceph_decode_need(p, end, struct_len, bad);
278		end = *p + struct_len;
279	}
280
281	ceph_decode_need(p, end, sizeof(**lease), bad);
282	*lease = *p;
283	*p += sizeof(**lease);
284	if (features == (u64)-1)
285		*p = end;
286	return 0;
287bad:
288	return -EIO;
289}
290
291/*
292 * parse a normal reply, which may contain a (dir+)dentry and/or a
293 * target inode.
294 */
295static int parse_reply_info_trace(void **p, void *end,
296				  struct ceph_mds_reply_info_parsed *info,
297				  u64 features)
298{
299	int err;
300
301	if (info->head->is_dentry) {
302		err = parse_reply_info_in(p, end, &info->diri, features);
303		if (err < 0)
304			goto out_bad;
305
306		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
307		if (err < 0)
308			goto out_bad;
309
310		ceph_decode_32_safe(p, end, info->dname_len, bad);
311		ceph_decode_need(p, end, info->dname_len, bad);
312		info->dname = *p;
313		*p += info->dname_len;
314
315		err = parse_reply_info_lease(p, end, &info->dlease, features);
316		if (err < 0)
317			goto out_bad;
318	}
319
320	if (info->head->is_target) {
321		err = parse_reply_info_in(p, end, &info->targeti, features);
322		if (err < 0)
323			goto out_bad;
324	}
325
326	if (unlikely(*p != end))
327		goto bad;
328	return 0;
329
330bad:
331	err = -EIO;
332out_bad:
333	pr_err("problem parsing mds trace %d\n", err);
334	return err;
335}
336
337/*
338 * parse readdir results
339 */
340static int parse_reply_info_readdir(void **p, void *end,
341				struct ceph_mds_reply_info_parsed *info,
342				u64 features)
343{
344	u32 num, i = 0;
345	int err;
346
347	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
348	if (err < 0)
349		goto out_bad;
350
351	ceph_decode_need(p, end, sizeof(num) + 2, bad);
352	num = ceph_decode_32(p);
353	{
354		u16 flags = ceph_decode_16(p);
355		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
356		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
357		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
358		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
359	}
360	if (num == 0)
361		goto done;
362
363	BUG_ON(!info->dir_entries);
364	if ((unsigned long)(info->dir_entries + num) >
365	    (unsigned long)info->dir_entries + info->dir_buf_size) {
366		pr_err("dir contents are larger than expected\n");
367		WARN_ON(1);
368		goto bad;
369	}
370
371	info->dir_nr = num;
372	while (num) {
373		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
374		/* dentry */
375		ceph_decode_32_safe(p, end, rde->name_len, bad);
376		ceph_decode_need(p, end, rde->name_len, bad);
377		rde->name = *p;
378		*p += rde->name_len;
379		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
380
381		/* dentry lease */
382		err = parse_reply_info_lease(p, end, &rde->lease, features);
383		if (err)
384			goto out_bad;
385		/* inode */
386		err = parse_reply_info_in(p, end, &rde->inode, features);
387		if (err < 0)
388			goto out_bad;
389		/* ceph_readdir_prepopulate() will update it */
390		rde->offset = 0;
391		i++;
392		num--;
393	}
394
395done:
396	/* Skip over any unrecognized fields */
397	*p = end;
398	return 0;
399
400bad:
401	err = -EIO;
402out_bad:
403	pr_err("problem parsing dir contents %d\n", err);
404	return err;
405}
406
407/*
408 * parse fcntl F_GETLK results
409 */
410static int parse_reply_info_filelock(void **p, void *end,
411				     struct ceph_mds_reply_info_parsed *info,
412				     u64 features)
413{
414	if (*p + sizeof(*info->filelock_reply) > end)
415		goto bad;
416
417	info->filelock_reply = *p;
418
419	/* Skip over any unrecognized fields */
420	*p = end;
421	return 0;
422bad:
423	return -EIO;
424}
425
426
427#if BITS_PER_LONG == 64
428
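/*
 * Delegated inode numbers received from the MDS are stored in the session's
 * s_delegated_inos xarray; an entry with this marker value is free for the
 * client to claim (see ceph_get_deleg_ino() below).
 */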
429#define DELEGATED_INO_AVAILABLE		xa_mk_value(1)
430
431static int ceph_parse_deleg_inos(void **p, void *end,
432				 struct ceph_mds_session *s)
433{
434	u32 sets;
435
436	ceph_decode_32_safe(p, end, sets, bad);
437	dout("got %u sets of delegated inodes\n", sets);
438	while (sets--) {
439		u64 start, len, ino;
440
441		ceph_decode_64_safe(p, end, start, bad);
442		ceph_decode_64_safe(p, end, len, bad);
443
444		/* Don't accept a delegation of system inodes */
445		if (start < CEPH_INO_SYSTEM_BASE) {
446			pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
447					start, len);
448			continue;
449		}
450		while (len--) {
451			int err = xa_insert(&s->s_delegated_inos, ino = start++,
452					    DELEGATED_INO_AVAILABLE,
453					    GFP_KERNEL);
454			if (!err) {
455				dout("added delegated inode 0x%llx\n",
456				     start - 1);
457			} else if (err == -EBUSY) {
458				pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
459					start - 1);
460			} else {
461				return err;
462			}
463		}
464	}
465	return 0;
466bad:
467	return -EIO;
468}
469
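/*
 * Claim an available delegated inode number from the session, removing it
 * from the xarray. Returns 0 if none are available.
 */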
470u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
471{
472	unsigned long ino;
473	void *val;
474
475	xa_for_each(&s->s_delegated_inos, ino, val) {
476		val = xa_erase(&s->s_delegated_inos, ino);
477		if (val == DELEGATED_INO_AVAILABLE)
478			return ino;
479	}
480	return 0;
481}
482
483int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
484{
485	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
486			 GFP_KERNEL);
487}
488#else /* BITS_PER_LONG == 64 */
489/*
490 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
491 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
492 * and bottom words?
493 */
494static int ceph_parse_deleg_inos(void **p, void *end,
495				 struct ceph_mds_session *s)
496{
497	u32 sets;
498
499	ceph_decode_32_safe(p, end, sets, bad);
500	if (sets)
501		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
502	return 0;
503bad:
504	return -EIO;
505}
506
507u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
508{
509	return 0;
510}
511
512int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
513{
514	return 0;
515}
516#endif /* BITS_PER_LONG == 64 */
517
518/*
519 * parse create results
520 */
521static int parse_reply_info_create(void **p, void *end,
522				  struct ceph_mds_reply_info_parsed *info,
523				  u64 features, struct ceph_mds_session *s)
524{
525	int ret;
526
527	if (features == (u64)-1 ||
528	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
529		if (*p == end) {
530			/* Malformed reply? */
531			info->has_create_ino = false;
532		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
533			info->has_create_ino = true;
534			/* struct_v, struct_compat, and len */
535			ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
536			ceph_decode_64_safe(p, end, info->ino, bad);
537			ret = ceph_parse_deleg_inos(p, end, s);
538			if (ret)
539				return ret;
540		} else {
541			/* legacy */
542			ceph_decode_64_safe(p, end, info->ino, bad);
543			info->has_create_ino = true;
544		}
545	} else {
546		if (*p != end)
547			goto bad;
548	}
549
550	/* Skip over any unrecognized fields */
551	*p = end;
552	return 0;
553bad:
554	return -EIO;
555}
556
557/*
558 * parse extra results
559 */
560static int parse_reply_info_extra(void **p, void *end,
561				  struct ceph_mds_reply_info_parsed *info,
562				  u64 features, struct ceph_mds_session *s)
563{
564	u32 op = le32_to_cpu(info->head->op);
565
566	if (op == CEPH_MDS_OP_GETFILELOCK)
567		return parse_reply_info_filelock(p, end, info, features);
568	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
569		return parse_reply_info_readdir(p, end, info, features);
570	else if (op == CEPH_MDS_OP_CREATE)
571		return parse_reply_info_create(p, end, info, features, s);
572	else
573		return -EIO;
574}
575
576/*
577 * parse entire mds reply
578 */
579static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
580			    struct ceph_mds_reply_info_parsed *info,
581			    u64 features)
582{
583	void *p, *end;
584	u32 len;
585	int err;
586
587	info->head = msg->front.iov_base;
588	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
589	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
590
591	/* trace */
592	ceph_decode_32_safe(&p, end, len, bad);
593	if (len > 0) {
594		ceph_decode_need(&p, end, len, bad);
595		err = parse_reply_info_trace(&p, p+len, info, features);
596		if (err < 0)
597			goto out_bad;
598	}
599
600	/* extra */
601	ceph_decode_32_safe(&p, end, len, bad);
602	if (len > 0) {
603		ceph_decode_need(&p, end, len, bad);
604		err = parse_reply_info_extra(&p, p+len, info, features, s);
605		if (err < 0)
606			goto out_bad;
607	}
608
609	/* snap blob */
610	ceph_decode_32_safe(&p, end, len, bad);
611	info->snapblob_len = len;
612	info->snapblob = p;
613	p += len;
614
615	if (p != end)
616		goto bad;
617	return 0;
618
619bad:
620	err = -EIO;
621out_bad:
622	pr_err("mds parse_reply err %d\n", err);
623	return err;
624}
625
626static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
627{
628	if (!info->dir_entries)
629		return;
630	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
631}
632
633
634/*
635 * sessions
636 */
637const char *ceph_session_state_name(int s)
638{
639	switch (s) {
640	case CEPH_MDS_SESSION_NEW: return "new";
641	case CEPH_MDS_SESSION_OPENING: return "opening";
642	case CEPH_MDS_SESSION_OPEN: return "open";
643	case CEPH_MDS_SESSION_HUNG: return "hung";
644	case CEPH_MDS_SESSION_CLOSING: return "closing";
645	case CEPH_MDS_SESSION_CLOSED: return "closed";
646	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
647	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
648	case CEPH_MDS_SESSION_REJECTED: return "rejected";
649	default: return "???";
650	}
651}
652
653struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
654{
655	if (refcount_inc_not_zero(&s->s_ref)) {
656		dout("mdsc get_session %p %d -> %d\n", s,
657		     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
658		return s;
659	} else {
660		dout("mdsc get_session %p 0 -- FAIL\n", s);
661		return NULL;
662	}
663}
664
665void ceph_put_mds_session(struct ceph_mds_session *s)
666{
667	if (IS_ERR_OR_NULL(s))
668		return;
669
670	dout("mdsc put_session %p %d -> %d\n", s,
671	     refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
672	if (refcount_dec_and_test(&s->s_ref)) {
673		if (s->s_auth.authorizer)
674			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
675		WARN_ON(mutex_is_locked(&s->s_mutex));
676		xa_destroy(&s->s_delegated_inos);
677		kfree(s);
678	}
679}
680
681/*
682 * called under mdsc->mutex
683 */
684struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
685						   int mds)
686{
687	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
688		return NULL;
689	return ceph_get_mds_session(mdsc->sessions[mds]);
690}
691
692static bool __have_session(struct ceph_mds_client *mdsc, int mds)
693{
694	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
695		return false;
696	else
697		return true;
698}
699
700static int __verify_registered_session(struct ceph_mds_client *mdsc,
701				       struct ceph_mds_session *s)
702{
703	if (s->s_mds >= mdsc->max_sessions ||
704	    mdsc->sessions[s->s_mds] != s)
705		return -ENOENT;
706	return 0;
707}
708
709/*
710 * create+register a new session for given mds.
711 * called under mdsc->mutex.
712 */
713static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
714						 int mds)
715{
716	struct ceph_mds_session *s;
717
718	if (mds >= mdsc->mdsmap->possible_max_rank)
719		return ERR_PTR(-EINVAL);
720
721	s = kzalloc(sizeof(*s), GFP_NOFS);
722	if (!s)
723		return ERR_PTR(-ENOMEM);
724
725	if (mds >= mdsc->max_sessions) {
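		/* grow the sessions array to the next power of two that can hold this rank */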
726		int newmax = 1 << get_count_order(mds + 1);
727		struct ceph_mds_session **sa;
728
729		dout("%s: realloc to %d\n", __func__, newmax);
730		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
731		if (!sa)
732			goto fail_realloc;
733		if (mdsc->sessions) {
734			memcpy(sa, mdsc->sessions,
735			       mdsc->max_sessions * sizeof(void *));
736			kfree(mdsc->sessions);
737		}
738		mdsc->sessions = sa;
739		mdsc->max_sessions = newmax;
740	}
741
742	dout("%s: mds%d\n", __func__, mds);
743	s->s_mdsc = mdsc;
744	s->s_mds = mds;
745	s->s_state = CEPH_MDS_SESSION_NEW;
746	s->s_ttl = 0;
747	s->s_seq = 0;
748	mutex_init(&s->s_mutex);
749
750	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
751
752	atomic_set(&s->s_cap_gen, 1);
753	s->s_cap_ttl = jiffies - 1;
754
755	spin_lock_init(&s->s_cap_lock);
756	s->s_renew_requested = 0;
757	s->s_renew_seq = 0;
758	INIT_LIST_HEAD(&s->s_caps);
759	s->s_nr_caps = 0;
760	refcount_set(&s->s_ref, 1);
761	INIT_LIST_HEAD(&s->s_waiting);
762	INIT_LIST_HEAD(&s->s_unsafe);
763	xa_init(&s->s_delegated_inos);
764	s->s_num_cap_releases = 0;
765	s->s_cap_reconnect = 0;
766	s->s_cap_iterator = NULL;
767	INIT_LIST_HEAD(&s->s_cap_releases);
768	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
769
770	INIT_LIST_HEAD(&s->s_cap_dirty);
771	INIT_LIST_HEAD(&s->s_cap_flushing);
772
773	mdsc->sessions[mds] = s;
774	atomic_inc(&mdsc->num_sessions);
775	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
776
777	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
778		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
779
780	return s;
781
782fail_realloc:
783	kfree(s);
784	return ERR_PTR(-ENOMEM);
785}
786
787/*
788 * called under mdsc->mutex
789 */
790static void __unregister_session(struct ceph_mds_client *mdsc,
791			       struct ceph_mds_session *s)
792{
793	dout("__unregister_session mds%d %p\n", s->s_mds, s);
794	BUG_ON(mdsc->sessions[s->s_mds] != s);
795	mdsc->sessions[s->s_mds] = NULL;
796	ceph_con_close(&s->s_con);
797	ceph_put_mds_session(s);
798	atomic_dec(&mdsc->num_sessions);
799}
800
801/*
802 * drop session refs in request.
803 *
804 * should be last request ref, or hold mdsc->mutex
805 */
806static void put_request_session(struct ceph_mds_request *req)
807{
808	if (req->r_session) {
809		ceph_put_mds_session(req->r_session);
810		req->r_session = NULL;
811	}
812}
813
814void ceph_mdsc_release_request(struct kref *kref)
815{
816	struct ceph_mds_request *req = container_of(kref,
817						    struct ceph_mds_request,
818						    r_kref);
819	ceph_mdsc_release_dir_caps_no_check(req);
820	destroy_reply_info(&req->r_reply_info);
821	if (req->r_request)
822		ceph_msg_put(req->r_request);
823	if (req->r_reply)
824		ceph_msg_put(req->r_reply);
825	if (req->r_inode) {
826		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
827		iput(req->r_inode);
828	}
829	if (req->r_parent) {
830		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
831		iput(req->r_parent);
832	}
833	iput(req->r_target_inode);
834	if (req->r_dentry)
835		dput(req->r_dentry);
836	if (req->r_old_dentry)
837		dput(req->r_old_dentry);
838	if (req->r_old_dentry_dir) {
839		/*
840		 * track (and drop pins for) r_old_dentry_dir
841		 * separately, since r_old_dentry's d_parent may have
842		 * changed between the dir mutex being dropped and
843		 * this request being freed.
844		 */
845		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
846				  CEPH_CAP_PIN);
847		iput(req->r_old_dentry_dir);
848	}
849	kfree(req->r_path1);
850	kfree(req->r_path2);
851	put_cred(req->r_cred);
852	if (req->r_pagelist)
853		ceph_pagelist_release(req->r_pagelist);
854	put_request_session(req);
855	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
856	WARN_ON_ONCE(!list_empty(&req->r_wait));
857	kmem_cache_free(ceph_mds_request_cachep, req);
858}
859
860DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
861
862/*
863 * lookup request, bump ref if found.
864 *
865 * called under mdsc->mutex.
866 */
867static struct ceph_mds_request *
868lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
869{
870	struct ceph_mds_request *req;
871
872	req = lookup_request(&mdsc->request_tree, tid);
873	if (req)
874		ceph_mdsc_get_request(req);
875
876	return req;
877}
878
879/*
880 * Register an in-flight request, and assign a tid.  Link to directory
881 * Register an in-flight request, and assign a tid.  Link to the directory
882 * we are modifying (if any).
883 * Called under mdsc->mutex.
884 */
885static void __register_request(struct ceph_mds_client *mdsc,
886			       struct ceph_mds_request *req,
887			       struct inode *dir)
888{
889	int ret = 0;
890
891	req->r_tid = ++mdsc->last_tid;
892	if (req->r_num_caps) {
893		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
894					req->r_num_caps);
895		if (ret < 0) {
896			pr_err("__register_request %p "
897			       "failed to reserve caps: %d\n", req, ret);
898			/* set req->r_err to fail early from __do_request */
899			req->r_err = ret;
900			return;
901		}
902	}
903	dout("__register_request %p tid %lld\n", req, req->r_tid);
904	ceph_mdsc_get_request(req);
905	insert_request(&mdsc->request_tree, req);
906
907	req->r_cred = get_current_cred();
908
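	/*
	 * setfilelock requests can block indefinitely waiting for the
	 * lock, so don't let them hold back the oldest tid we report
	 * to the MDS (which the MDS uses to trim completed requests).
	 */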
909	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
910		mdsc->oldest_tid = req->r_tid;
911
912	if (dir) {
913		struct ceph_inode_info *ci = ceph_inode(dir);
914
915		ihold(dir);
916		req->r_unsafe_dir = dir;
917		spin_lock(&ci->i_unsafe_lock);
918		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
919		spin_unlock(&ci->i_unsafe_lock);
920	}
921}
922
923static void __unregister_request(struct ceph_mds_client *mdsc,
924				 struct ceph_mds_request *req)
925{
926	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
927
928	/* Never leave an unregistered request on an unsafe list! */
929	list_del_init(&req->r_unsafe_item);
930
931	if (req->r_tid == mdsc->oldest_tid) {
932		struct rb_node *p = rb_next(&req->r_node);
933		mdsc->oldest_tid = 0;
934		while (p) {
935			struct ceph_mds_request *next_req =
936				rb_entry(p, struct ceph_mds_request, r_node);
937			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
938				mdsc->oldest_tid = next_req->r_tid;
939				break;
940			}
941			p = rb_next(p);
942		}
943	}
944
945	erase_request(&mdsc->request_tree, req);
946
947	if (req->r_unsafe_dir) {
948		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
949		spin_lock(&ci->i_unsafe_lock);
950		list_del_init(&req->r_unsafe_dir_item);
951		spin_unlock(&ci->i_unsafe_lock);
952	}
953	if (req->r_target_inode &&
954	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
955		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
956		spin_lock(&ci->i_unsafe_lock);
957		list_del_init(&req->r_unsafe_target_item);
958		spin_unlock(&ci->i_unsafe_lock);
959	}
960
961	if (req->r_unsafe_dir) {
962		iput(req->r_unsafe_dir);
963		req->r_unsafe_dir = NULL;
964	}
965
966	complete_all(&req->r_safe_completion);
967
968	ceph_mdsc_put_request(req);
969}
970
971/*
972 * Walk back up the dentry tree until we hit a dentry representing a
973 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
974 * when calling this) to ensure that the objects won't disappear while we're
975 * working with them. Once we hit a candidate dentry, we attempt to take a
976 * reference to it, and return that as the result.
977 */
978static struct inode *get_nonsnap_parent(struct dentry *dentry)
979{
980	struct inode *inode = NULL;
981
982	while (dentry && !IS_ROOT(dentry)) {
983		inode = d_inode_rcu(dentry);
984		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
985			break;
986		dentry = dentry->d_parent;
987	}
988	if (inode)
989		inode = igrab(inode);
990	return inode;
991}
992
993/*
994 * Choose mds to send request to next.  If there is a hint set in the
995 * request (e.g., due to a prior forward hint from the mds), use that.
996 * Otherwise, consult frag tree and/or caps to identify the
997 * appropriate mds.  If all else fails, choose randomly.
998 *
999 * Called under mdsc->mutex.
1000 */
1001static int __choose_mds(struct ceph_mds_client *mdsc,
1002			struct ceph_mds_request *req,
1003			bool *random)
1004{
1005	struct inode *inode;
1006	struct ceph_inode_info *ci;
1007	struct ceph_cap *cap;
1008	int mode = req->r_direct_mode;
1009	int mds = -1;
1010	u32 hash = req->r_direct_hash;
1011	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
1012
1013	if (random)
1014		*random = false;
1015
1016	/*
1017	 * is there a specific mds we should try?  ignore hint if we have
1018	 * no session and the mds is not up (active or recovering).
1019	 */
1020	if (req->r_resend_mds >= 0 &&
1021	    (__have_session(mdsc, req->r_resend_mds) ||
1022	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
1023		dout("%s using resend_mds mds%d\n", __func__,
1024		     req->r_resend_mds);
1025		return req->r_resend_mds;
1026	}
1027
1028	if (mode == USE_RANDOM_MDS)
1029		goto random;
1030
1031	inode = NULL;
1032	if (req->r_inode) {
1033		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
1034			inode = req->r_inode;
1035			ihold(inode);
1036		} else {
1037			/* req->r_dentry is non-null for LSSNAP request */
1038			rcu_read_lock();
1039			inode = get_nonsnap_parent(req->r_dentry);
1040			rcu_read_unlock();
1041			dout("%s using snapdir's parent %p\n", __func__, inode);
1042		}
1043	} else if (req->r_dentry) {
1044		/* ignore race with rename; old or new d_parent is okay */
1045		struct dentry *parent;
1046		struct inode *dir;
1047
1048		rcu_read_lock();
1049		parent = READ_ONCE(req->r_dentry->d_parent);
1050		dir = req->r_parent ? : d_inode_rcu(parent);
1051
1052		if (!dir || dir->i_sb != mdsc->fsc->sb) {
1053			/*  not this fs or parent went negative */
1054			inode = d_inode(req->r_dentry);
1055			if (inode)
1056				ihold(inode);
1057		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
1058			/* direct snapped/virtual snapdir requests
1059			 * based on parent dir inode */
1060			inode = get_nonsnap_parent(parent);
1061			dout("%s using nonsnap parent %p\n", __func__, inode);
1062		} else {
1063			/* dentry target */
1064			inode = d_inode(req->r_dentry);
1065			if (!inode || mode == USE_AUTH_MDS) {
1066				/* dir + name */
1067				inode = igrab(dir);
1068				hash = ceph_dentry_hash(dir, req->r_dentry);
1069				is_hash = true;
1070			} else {
1071				ihold(inode);
1072			}
1073		}
1074		rcu_read_unlock();
1075	}
1076
1077	dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
1078	     hash, mode);
1079	if (!inode)
1080		goto random;
1081	ci = ceph_inode(inode);
1082
1083	if (is_hash && S_ISDIR(inode->i_mode)) {
1084		struct ceph_inode_frag frag;
1085		int found;
1086
1087		ceph_choose_frag(ci, hash, &frag, &found);
1088		if (found) {
1089			if (mode == USE_ANY_MDS && frag.ndist > 0) {
1090				u8 r;
1091
1092				/* choose a random replica */
1093				get_random_bytes(&r, 1);
1094				r %= frag.ndist;
1095				mds = frag.dist[r];
1096				dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
1097				     __func__, inode, ceph_vinop(inode),
1098				     frag.frag, mds, (int)r, frag.ndist);
1099				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1100				    CEPH_MDS_STATE_ACTIVE &&
1101				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
1102					goto out;
1103			}
1104
1105			/* since this file/dir wasn't known to be
1106			 * replicated, we want to look for the
1107			 * authoritative mds. */
1108			if (frag.mds >= 0) {
1109				/* choose auth mds */
1110				mds = frag.mds;
1111				dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
1112				     __func__, inode, ceph_vinop(inode),
1113				     frag.frag, mds);
1114				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1115				    CEPH_MDS_STATE_ACTIVE) {
1116					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
1117								  mds))
1118						goto out;
1119				}
1120			}
1121			mode = USE_AUTH_MDS;
1122		}
1123	}
1124
1125	spin_lock(&ci->i_ceph_lock);
1126	cap = NULL;
1127	if (mode == USE_AUTH_MDS)
1128		cap = ci->i_auth_cap;
1129	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
1130		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
1131	if (!cap) {
1132		spin_unlock(&ci->i_ceph_lock);
1133		iput(inode);
1134		goto random;
1135	}
1136	mds = cap->session->s_mds;
1137	dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
1138	     inode, ceph_vinop(inode), mds,
1139	     cap == ci->i_auth_cap ? "auth " : "", cap);
1140	spin_unlock(&ci->i_ceph_lock);
1141out:
1142	iput(inode);
1143	return mds;
1144
1145random:
1146	if (random)
1147		*random = true;
1148
1149	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
1150	dout("%s chose random mds%d\n", __func__, mds);
1151	return mds;
1152}
1153
1154
1155/*
1156 * session messages
1157 */
1158struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
1159{
1160	struct ceph_msg *msg;
1161	struct ceph_mds_session_head *h;
1162
1163	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1164			   false);
1165	if (!msg) {
1166		pr_err("ENOMEM creating session %s msg\n",
1167		       ceph_session_op_name(op));
1168		return NULL;
1169	}
1170	h = msg->front.iov_base;
1171	h->op = cpu_to_le32(op);
1172	h->seq = cpu_to_le64(seq);
1173
1174	return msg;
1175}
1176
1177static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
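/*
 * FEATURE_BYTES(c): bytes needed for a bitmap whose highest set bit is
 * feature_bits[c - 1], rounded up to a whole number of 64-bit words.
 */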
1178#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
1179static int encode_supported_features(void **p, void *end)
1180{
1181	static const size_t count = ARRAY_SIZE(feature_bits);
1182
1183	if (count > 0) {
1184		size_t i;
1185		size_t size = FEATURE_BYTES(count);
1186
1187		if (WARN_ON_ONCE(*p + 4 + size > end))
1188			return -ERANGE;
1189
1190		ceph_encode_32(p, size);
1191		memset(*p, 0, size);
1192		for (i = 0; i < count; i++)
1193			((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8);
1194		*p += size;
1195	} else {
1196		if (WARN_ON_ONCE(*p + 4 > end))
1197			return -ERANGE;
1198
1199		ceph_encode_32(p, 0);
1200	}
1201
1202	return 0;
1203}
1204
1205static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
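/* METRIC_BYTES(cnt): same rounding as FEATURE_BYTES(), but for the metric bitmap */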
1206#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
1207static int encode_metric_spec(void **p, void *end)
1208{
1209	static const size_t count = ARRAY_SIZE(metric_bits);
1210
1211	/* header */
1212	if (WARN_ON_ONCE(*p + 2 > end))
1213		return -ERANGE;
1214
1215	ceph_encode_8(p, 1); /* version */
1216	ceph_encode_8(p, 1); /* compat */
1217
1218	if (count > 0) {
1219		size_t i;
1220		size_t size = METRIC_BYTES(count);
1221
1222		if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
1223			return -ERANGE;
1224
1225		/* metric spec info length */
1226		ceph_encode_32(p, 4 + size);
1227
1228		/* metric spec */
1229		ceph_encode_32(p, size);
1230		memset(*p, 0, size);
1231		for (i = 0; i < count; i++)
1232			((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
1233		*p += size;
1234	} else {
1235		if (WARN_ON_ONCE(*p + 4 + 4 > end))
1236			return -ERANGE;
1237
1238		/* metric spec info length */
1239		ceph_encode_32(p, 4);
1240		/* metric spec */
1241		ceph_encode_32(p, 0);
1242	}
1243
1244	return 0;
1245}
1246
1247/*
1248 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
1249 * to include additional client metadata fields.
1250 */
1251static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
1252{
1253	struct ceph_msg *msg;
1254	struct ceph_mds_session_head *h;
1255	int i;
1256	int extra_bytes = 0;
1257	int metadata_key_count = 0;
1258	struct ceph_options *opt = mdsc->fsc->client->options;
1259	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
1260	size_t size, count;
1261	void *p, *end;
1262	int ret;
1263
1264	const char* metadata[][2] = {
1265		{"hostname", mdsc->nodename},
1266		{"kernel_version", init_utsname()->release},
1267		{"entity_id", opt->name ? : ""},
1268		{"root", fsopt->server_path ? : "/"},
1269		{NULL, NULL}
1270	};
1271
1272	/* Calculate serialized length of metadata */
1273	extra_bytes = 4;  /* map length */
1274	for (i = 0; metadata[i][0]; ++i) {
1275		extra_bytes += 8 + strlen(metadata[i][0]) +
1276			strlen(metadata[i][1]);
1277		metadata_key_count++;
1278	}
1279
1280	/* supported feature */
1281	size = 0;
1282	count = ARRAY_SIZE(feature_bits);
1283	if (count > 0)
1284		size = FEATURE_BYTES(count);
1285	extra_bytes += 4 + size;
1286
1287	/* metric spec */
1288	size = 0;
1289	count = ARRAY_SIZE(metric_bits);
1290	if (count > 0)
1291		size = METRIC_BYTES(count);
1292	extra_bytes += 2 + 4 + 4 + size;
1293
1294	/* Allocate the message */
1295	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
1296			   GFP_NOFS, false);
1297	if (!msg) {
1298		pr_err("ENOMEM creating session open msg\n");
1299		return ERR_PTR(-ENOMEM);
1300	}
1301	p = msg->front.iov_base;
1302	end = p + msg->front.iov_len;
1303
1304	h = p;
1305	h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
1306	h->seq = cpu_to_le64(seq);
1307
1308	/*
1309	 * Serialize client metadata into waiting buffer space, using
1310	 * the format that userspace expects for map<string, string>
1311	 *
1312	 * ClientSession messages with metadata are v4
1313	 */
1314	msg->hdr.version = cpu_to_le16(4);
1315	msg->hdr.compat_version = cpu_to_le16(1);
1316
1317	/* The write pointer, following the session_head structure */
1318	p += sizeof(*h);
1319
1320	/* Number of entries in the map */
1321	ceph_encode_32(&p, metadata_key_count);
1322
1323	/* Two length-prefixed strings for each entry in the map */
1324	for (i = 0; metadata[i][0]; ++i) {
1325		size_t const key_len = strlen(metadata[i][0]);
1326		size_t const val_len = strlen(metadata[i][1]);
1327
1328		ceph_encode_32(&p, key_len);
1329		memcpy(p, metadata[i][0], key_len);
1330		p += key_len;
1331		ceph_encode_32(&p, val_len);
1332		memcpy(p, metadata[i][1], val_len);
1333		p += val_len;
1334	}
1335
1336	ret = encode_supported_features(&p, end);
1337	if (ret) {
1338		pr_err("encode_supported_features failed!\n");
1339		ceph_msg_put(msg);
1340		return ERR_PTR(ret);
1341	}
1342
1343	ret = encode_metric_spec(&p, end);
1344	if (ret) {
1345		pr_err("encode_metric_spec failed!\n");
1346		ceph_msg_put(msg);
1347		return ERR_PTR(ret);
1348	}
1349
1350	msg->front.iov_len = p - msg->front.iov_base;
1351	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1352
1353	return msg;
1354}
1355
1356/*
1357 * send session open request.
1358 *
1359 * called under mdsc->mutex
1360 */
1361static int __open_session(struct ceph_mds_client *mdsc,
1362			  struct ceph_mds_session *session)
1363{
1364	struct ceph_msg *msg;
1365	int mstate;
1366	int mds = session->s_mds;
1367
1368	/* wait for mds to go active? */
1369	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
1370	dout("open_session to mds%d (%s)\n", mds,
1371	     ceph_mds_state_name(mstate));
1372	session->s_state = CEPH_MDS_SESSION_OPENING;
1373	session->s_renew_requested = jiffies;
1374
1375	/* send connect message */
1376	msg = create_session_open_msg(mdsc, session->s_seq);
1377	if (IS_ERR(msg))
1378		return PTR_ERR(msg);
1379	ceph_con_send(&session->s_con, msg);
1380	return 0;
1381}
1382
1383/*
1384 * open sessions for any export targets for the given mds
1385 *
1386 * called under mdsc->mutex
1387 */
1388static struct ceph_mds_session *
1389__open_export_target_session(struct ceph_mds_client *mdsc, int target)
1390{
1391	struct ceph_mds_session *session;
1392	int ret;
1393
1394	session = __ceph_lookup_mds_session(mdsc, target);
1395	if (!session) {
1396		session = register_session(mdsc, target);
1397		if (IS_ERR(session))
1398			return session;
1399	}
1400	if (session->s_state == CEPH_MDS_SESSION_NEW ||
1401	    session->s_state == CEPH_MDS_SESSION_CLOSING) {
1402		ret = __open_session(mdsc, session);
1403		if (ret)
1404			return ERR_PTR(ret);
1405	}
1406
1407	return session;
1408}
1409
1410struct ceph_mds_session *
1411ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1412{
1413	struct ceph_mds_session *session;
1414
1415	dout("open_export_target_session to mds%d\n", target);
1416
1417	mutex_lock(&mdsc->mutex);
1418	session = __open_export_target_session(mdsc, target);
1419	mutex_unlock(&mdsc->mutex);
1420
1421	return session;
1422}
1423
1424static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1425					  struct ceph_mds_session *session)
1426{
1427	struct ceph_mds_info *mi;
1428	struct ceph_mds_session *ts;
1429	int i, mds = session->s_mds;
1430
1431	if (mds >= mdsc->mdsmap->possible_max_rank)
1432		return;
1433
1434	mi = &mdsc->mdsmap->m_info[mds];
1435	dout("open_export_target_sessions for mds%d (%d targets)\n",
1436	     session->s_mds, mi->num_export_targets);
1437
1438	for (i = 0; i < mi->num_export_targets; i++) {
1439		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1440		ceph_put_mds_session(ts);
1441	}
1442}
1443
1444void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1445					   struct ceph_mds_session *session)
1446{
1447	mutex_lock(&mdsc->mutex);
1448	__open_export_target_sessions(mdsc, session);
1449	mutex_unlock(&mdsc->mutex);
1450}
1451
1452/*
1453 * session caps
1454 */
1455
1456static void detach_cap_releases(struct ceph_mds_session *session,
1457				struct list_head *target)
1458{
1459	lockdep_assert_held(&session->s_cap_lock);
1460
1461	list_splice_init(&session->s_cap_releases, target);
1462	session->s_num_cap_releases = 0;
1463	dout("%s mds%d\n", __func__, session->s_mds);
1464}
1465
1466static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1467				 struct list_head *dispose)
1468{
1469	while (!list_empty(dispose)) {
1470		struct ceph_cap *cap;
1471		/* zero out the in-progress message */
1472		/* drop each queued cap release without sending it */
1473		list_del(&cap->session_caps);
1474		ceph_put_cap(mdsc, cap);
1475	}
1476}
1477
1478static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1479				     struct ceph_mds_session *session)
1480{
1481	struct ceph_mds_request *req;
1482	struct rb_node *p;
1483	struct ceph_inode_info *ci;
1484
1485	dout("cleanup_session_requests mds%d\n", session->s_mds);
1486	mutex_lock(&mdsc->mutex);
1487	while (!list_empty(&session->s_unsafe)) {
1488		req = list_first_entry(&session->s_unsafe,
1489				       struct ceph_mds_request, r_unsafe_item);
1490		pr_warn_ratelimited(" dropping unsafe request %llu\n",
1491				    req->r_tid);
1492		if (req->r_target_inode) {
1493			/* dropping unsafe change of inode's attributes */
1494			ci = ceph_inode(req->r_target_inode);
1495			errseq_set(&ci->i_meta_err, -EIO);
1496		}
1497		if (req->r_unsafe_dir) {
1498			/* dropping unsafe directory operation */
1499			ci = ceph_inode(req->r_unsafe_dir);
1500			errseq_set(&ci->i_meta_err, -EIO);
1501		}
1502		__unregister_request(mdsc, req);
1503	}
1504	/* zero r_attempts, so kick_requests() will re-send requests */
1505	p = rb_first(&mdsc->request_tree);
1506	while (p) {
1507		req = rb_entry(p, struct ceph_mds_request, r_node);
1508		p = rb_next(p);
1509		if (req->r_session &&
1510		    req->r_session->s_mds == session->s_mds)
1511			req->r_attempts = 0;
1512	}
1513	mutex_unlock(&mdsc->mutex);
1514}
1515
1516/*
1517 * Helper to safely iterate over all caps associated with a session, with
1518 * special care taken to handle a racing __ceph_remove_cap().
1519 *
1520 * Caller must hold session s_mutex.
1521 */
1522int ceph_iterate_session_caps(struct ceph_mds_session *session,
1523			      int (*cb)(struct inode *, struct ceph_cap *,
1524					void *), void *arg)
1525{
1526	struct list_head *p;
1527	struct ceph_cap *cap;
1528	struct inode *inode, *last_inode = NULL;
1529	struct ceph_cap *old_cap = NULL;
1530	int ret;
1531
1532	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1533	spin_lock(&session->s_cap_lock);
1534	p = session->s_caps.next;
1535	while (p != &session->s_caps) {
1536		cap = list_entry(p, struct ceph_cap, session_caps);
1537		inode = igrab(&cap->ci->vfs_inode);
1538		if (!inode) {
1539			p = p->next;
1540			continue;
1541		}
1542		session->s_cap_iterator = cap;
1543		spin_unlock(&session->s_cap_lock);
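		/*
		 * With s_cap_iterator set, a racing __ceph_remove_cap()
		 * leaves this cap on the session list (with cap->ci
		 * cleared) for us to finish removing once we retake
		 * s_cap_lock below.
		 */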
1544
1545		if (last_inode) {
1546			iput(last_inode);
1547			last_inode = NULL;
1548		}
1549		if (old_cap) {
1550			ceph_put_cap(session->s_mdsc, old_cap);
1551			old_cap = NULL;
1552		}
1553
1554		ret = cb(inode, cap, arg);
1555		last_inode = inode;
1556
1557		spin_lock(&session->s_cap_lock);
1558		p = p->next;
1559		if (!cap->ci) {
1560			dout("iterate_session_caps  finishing cap %p removal\n",
1561			     cap);
1562			BUG_ON(cap->session != session);
1563			cap->session = NULL;
1564			list_del_init(&cap->session_caps);
1565			session->s_nr_caps--;
1566			atomic64_dec(&session->s_mdsc->metric.total_caps);
1567			if (cap->queue_release)
1568				__ceph_queue_cap_release(session, cap);
1569			else
1570				old_cap = cap;  /* put_cap it w/o locks held */
1571		}
1572		if (ret < 0)
1573			goto out;
1574	}
1575	ret = 0;
1576out:
1577	session->s_cap_iterator = NULL;
1578	spin_unlock(&session->s_cap_lock);
1579
1580	iput(last_inode);
1581	if (old_cap)
1582		ceph_put_cap(session->s_mdsc, old_cap);
1583
1584	return ret;
1585}
1586
1587static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1588				  void *arg)
1589{
1590	struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1591	struct ceph_inode_info *ci = ceph_inode(inode);
1592	LIST_HEAD(to_remove);
1593	bool dirty_dropped = false;
1594	bool invalidate = false;
1595
1596	dout("removing cap %p, ci is %p, inode is %p\n",
1597	     cap, ci, &ci->vfs_inode);
1598	spin_lock(&ci->i_ceph_lock);
1599	__ceph_remove_cap(cap, false);
1600	if (!ci->i_auth_cap) {
1601		struct ceph_cap_flush *cf;
1602		struct ceph_mds_client *mdsc = fsc->mdsc;
1603
1604		if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
1605			if (inode->i_data.nrpages > 0)
1606				invalidate = true;
1607			if (ci->i_wrbuffer_ref > 0)
1608				mapping_set_error(&inode->i_data, -EIO);
1609		}
1610
1611		while (!list_empty(&ci->i_cap_flush_list)) {
1612			cf = list_first_entry(&ci->i_cap_flush_list,
1613					      struct ceph_cap_flush, i_list);
1614			list_move(&cf->i_list, &to_remove);
1615		}
1616
1617		spin_lock(&mdsc->cap_dirty_lock);
1618
1619		list_for_each_entry(cf, &to_remove, i_list)
1620			list_del_init(&cf->g_list);
1621
1622		if (!list_empty(&ci->i_dirty_item)) {
1623			pr_warn_ratelimited(
1624				" dropping dirty %s state for %p %lld\n",
1625				ceph_cap_string(ci->i_dirty_caps),
1626				inode, ceph_ino(inode));
1627			ci->i_dirty_caps = 0;
1628			list_del_init(&ci->i_dirty_item);
1629			dirty_dropped = true;
1630		}
1631		if (!list_empty(&ci->i_flushing_item)) {
1632			pr_warn_ratelimited(
1633				" dropping dirty+flushing %s state for %p %lld\n",
1634				ceph_cap_string(ci->i_flushing_caps),
1635				inode, ceph_ino(inode));
1636			ci->i_flushing_caps = 0;
1637			list_del_init(&ci->i_flushing_item);
1638			mdsc->num_cap_flushing--;
1639			dirty_dropped = true;
1640		}
1641		spin_unlock(&mdsc->cap_dirty_lock);
1642
1643		if (dirty_dropped) {
1644			errseq_set(&ci->i_meta_err, -EIO);
1645
1646			if (ci->i_wrbuffer_ref_head == 0 &&
1647			    ci->i_wr_ref == 0 &&
1648			    ci->i_dirty_caps == 0 &&
1649			    ci->i_flushing_caps == 0) {
1650				ceph_put_snap_context(ci->i_head_snapc);
1651				ci->i_head_snapc = NULL;
1652			}
1653		}
1654
1655		if (atomic_read(&ci->i_filelock_ref) > 0) {
1656			/* make further file lock syscall return -EIO */
1657			ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
1658			pr_warn_ratelimited(" dropping file locks for %p %lld\n",
1659					    inode, ceph_ino(inode));
1660		}
1661
1662		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1663			list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
1664			ci->i_prealloc_cap_flush = NULL;
1665		}
1666	}
1667	spin_unlock(&ci->i_ceph_lock);
1668	while (!list_empty(&to_remove)) {
1669		struct ceph_cap_flush *cf;
1670		cf = list_first_entry(&to_remove,
1671				      struct ceph_cap_flush, i_list);
1672		list_del_init(&cf->i_list);
1673		if (!cf->is_capsnap)
1674			ceph_free_cap_flush(cf);
1675	}
1676
1677	wake_up_all(&ci->i_cap_wq);
1678	if (invalidate)
1679		ceph_queue_invalidate(inode);
1680	if (dirty_dropped)
1681		iput(inode);
1682	return 0;
1683}
1684
1685/*
1686 * caller must hold session s_mutex
1687 */
1688static void remove_session_caps(struct ceph_mds_session *session)
1689{
1690	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1691	struct super_block *sb = fsc->sb;
1692	LIST_HEAD(dispose);
1693
1694	dout("remove_session_caps on %p\n", session);
1695	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1696
1697	wake_up_all(&fsc->mdsc->cap_flushing_wq);
1698
1699	spin_lock(&session->s_cap_lock);
1700	if (session->s_nr_caps > 0) {
1701		struct inode *inode;
1702		struct ceph_cap *cap, *prev = NULL;
1703		struct ceph_vino vino;
1704		/*
1705		 * iterate_session_caps() skips inodes that are being
1706		 * deleted; we need to wait until those deletions are
1707		 * complete.  __wait_on_freeing_inode() is designed for
1708		 * the job, but it is not exported, so use the inode
1709		 * lookup function to get the same effect.
1710		 */
1711		while (!list_empty(&session->s_caps)) {
1712			cap = list_entry(session->s_caps.next,
1713					 struct ceph_cap, session_caps);
1714			if (cap == prev)
1715				break;
1716			prev = cap;
1717			vino = cap->ci->i_vino;
1718			spin_unlock(&session->s_cap_lock);
1719
1720			inode = ceph_find_inode(sb, vino);
1721			iput(inode);
1722
1723			spin_lock(&session->s_cap_lock);
1724		}
1725	}
1726
1727	/* detach queued cap releases (disposed of below, outside s_cap_lock) */
1728	detach_cap_releases(session, &dispose);
1729
1730	BUG_ON(session->s_nr_caps > 0);
1731	BUG_ON(!list_empty(&session->s_cap_flushing));
1732	spin_unlock(&session->s_cap_lock);
1733	dispose_cap_releases(session->s_mdsc, &dispose);
1734}
1735
1736enum {
1737	RECONNECT,
1738	RENEWCAPS,
1739	FORCE_RO,
1740};
1741
1742/*
1743 * wake up any threads waiting on this session's caps.  if the cap is
1744 * old (didn't get renewed on the client reconnect), remove it now.
1745 *
1746 * caller must hold s_mutex.
1747 */
1748static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1749			      void *arg)
1750{
1751	struct ceph_inode_info *ci = ceph_inode(inode);
1752	unsigned long ev = (unsigned long)arg;
1753
1754	if (ev == RECONNECT) {
1755		spin_lock(&ci->i_ceph_lock);
1756		ci->i_wanted_max_size = 0;
1757		ci->i_requested_max_size = 0;
1758		spin_unlock(&ci->i_ceph_lock);
1759	} else if (ev == RENEWCAPS) {
1760		if (cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) {
1761			/* mds did not re-issue stale cap */
1762			spin_lock(&ci->i_ceph_lock);
1763			cap->issued = cap->implemented = CEPH_CAP_PIN;
1764			spin_unlock(&ci->i_ceph_lock);
1765		}
1766	} else if (ev == FORCE_RO) {
1767	}
1768	wake_up_all(&ci->i_cap_wq);
1769	return 0;
1770}
1771
1772static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1773{
1774	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1775	ceph_iterate_session_caps(session, wake_up_session_cb,
1776				  (void *)(unsigned long)ev);
1777}
1778
1779/*
1780 * Send periodic message to MDS renewing all currently held caps.  The
1781 * ack will reset the expiration for all caps from this session.
1782 *
1783 * caller holds s_mutex
1784 */
1785static int send_renew_caps(struct ceph_mds_client *mdsc,
1786			   struct ceph_mds_session *session)
1787{
1788	struct ceph_msg *msg;
1789	int state;
1790
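	/*
	 * Only log "caps stale" on the first renewal attempt after the
	 * ttl expires; once s_renew_requested is newer than s_cap_ttl,
	 * the second check fails until the caps are renewed again.
	 */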
1791	if (time_after_eq(jiffies, session->s_cap_ttl) &&
1792	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1793		pr_info("mds%d caps stale\n", session->s_mds);
1794	session->s_renew_requested = jiffies;
1795
1796	/* do not try to renew caps until a recovering mds has reconnected
1797	 * with its clients. */
1798	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1799	if (state < CEPH_MDS_STATE_RECONNECT) {
1800		dout("send_renew_caps ignoring mds%d (%s)\n",
1801		     session->s_mds, ceph_mds_state_name(state));
1802		return 0;
1803	}
1804
1805	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1806		ceph_mds_state_name(state));
1807	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1808				      ++session->s_renew_seq);
1809	if (!msg)
1810		return -ENOMEM;
1811	ceph_con_send(&session->s_con, msg);
1812	return 0;
1813}
1814
1815static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1816			     struct ceph_mds_session *session, u64 seq)
1817{
1818	struct ceph_msg *msg;
1819
1820	dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1821	     session->s_mds, ceph_session_state_name(session->s_state), seq);
1822	msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1823	if (!msg)
1824		return -ENOMEM;
1825	ceph_con_send(&session->s_con, msg);
1826	return 0;
1827}
1828
1829
1830/*
1831 * Note the new cap ttl, and any transition from stale -> fresh.
1832 *
1833 * Called under session->s_mutex
1834 */
1835static void renewed_caps(struct ceph_mds_client *mdsc,
1836			 struct ceph_mds_session *session, int is_renew)
1837{
1838	int was_stale;
1839	int wake = 0;
1840
1841	spin_lock(&session->s_cap_lock);
1842	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1843
1844	session->s_cap_ttl = session->s_renew_requested +
1845		mdsc->mdsmap->m_session_timeout*HZ;
1846
1847	if (was_stale) {
1848		if (time_before(jiffies, session->s_cap_ttl)) {
1849			pr_info("mds%d caps renewed\n", session->s_mds);
1850			wake = 1;
1851		} else {
1852			pr_info("mds%d caps still stale\n", session->s_mds);
1853		}
1854	}
1855	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1856	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1857	     time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
1858	spin_unlock(&session->s_cap_lock);
1859
1860	if (wake)
1861		wake_up_session_caps(session, RENEWCAPS);
1862}
1863
1864/*
1865 * send a session close request
1866 */
1867static int request_close_session(struct ceph_mds_session *session)
1868{
1869	struct ceph_msg *msg;
1870
1871	dout("request_close_session mds%d state %s seq %lld\n",
1872	     session->s_mds, ceph_session_state_name(session->s_state),
1873	     session->s_seq);
1874	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
1875				      session->s_seq);
1876	if (!msg)
1877		return -ENOMEM;
1878	ceph_con_send(&session->s_con, msg);
1879	return 1;
1880}
1881
1882/*
1883 * Called with s_mutex held.
1884 */
1885static int __close_session(struct ceph_mds_client *mdsc,
1886			 struct ceph_mds_session *session)
1887{
1888	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1889		return 0;
1890	session->s_state = CEPH_MDS_SESSION_CLOSING;
1891	return request_close_session(session);
1892}
1893
1894static bool drop_negative_children(struct dentry *dentry)
1895{
1896	struct dentry *child;
1897	bool all_negative = true;
1898
1899	if (!d_is_dir(dentry))
1900		goto out;
1901
1902	spin_lock(&dentry->d_lock);
1903	list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1904		if (d_really_is_positive(child)) {
1905			all_negative = false;
1906			break;
1907		}
1908	}
1909	spin_unlock(&dentry->d_lock);
1910
1911	if (all_negative)
1912		shrink_dcache_parent(dentry);
1913out:
1914	return all_negative;
1915}
1916
1917/*
1918 * Trim old(er) caps.
1919 *
1920 * Because we can't cache an inode without one or more caps, we do
1921 * this indirectly: if a cap is unused, we prune its aliases, at which
1922 * point the inode will hopefully get dropped too.
1923 *
1924 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1925 * memory pressure from the MDS, though, so it needn't be perfect.
1926 */
1927static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1928{
1929	int *remaining = arg;
1930	struct ceph_inode_info *ci = ceph_inode(inode);
1931	int used, wanted, oissued, mine;
1932
1933	if (*remaining <= 0)
1934		return -1;
1935
1936	spin_lock(&ci->i_ceph_lock);
1937	mine = cap->issued | cap->implemented;
1938	used = __ceph_caps_used(ci);
1939	wanted = __ceph_caps_file_wanted(ci);
1940	oissued = __ceph_caps_issued_other(ci, cap);
1941
1942	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1943	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1944	     ceph_cap_string(used), ceph_cap_string(wanted));
1945	if (cap == ci->i_auth_cap) {
1946		if (ci->i_dirty_caps || ci->i_flushing_caps ||
1947		    !list_empty(&ci->i_cap_snaps))
1948			goto out;
1949		if ((used | wanted) & CEPH_CAP_ANY_WR)
1950			goto out;
1951		/* Note: it's possible that i_filelock_ref becomes non-zero
1952		 * after dropping auth caps. It doesn't hurt because the reply
1953		 * to the lock MDS request will re-add the auth caps. */
1954		if (atomic_read(&ci->i_filelock_ref) > 0)
1955			goto out;
1956	}
1957	/* The inode has cached pages, but it's no longer used.
1958	 * We can safely drop it. */
1959	if (S_ISREG(inode->i_mode) &&
1960	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1961	    !(oissued & CEPH_CAP_FILE_CACHE)) {
1962		used = 0;
1963		oissued = 0;
1964	}
1965	if ((used | wanted) & ~oissued & mine)
1966		goto out;   /* we need these caps */
1967
1968	if (oissued) {
1969		/* we aren't the only cap.. just remove us */
1970		__ceph_remove_cap(cap, true);
1971		(*remaining)--;
1972	} else {
1973		struct dentry *dentry;
1974		/* try dropping referring dentries */
1975		spin_unlock(&ci->i_ceph_lock);
1976		dentry = d_find_any_alias(inode);
1977		if (dentry && drop_negative_children(dentry)) {
1978			int count;
1979			dput(dentry);
1980			d_prune_aliases(inode);
1981			count = atomic_read(&inode->i_count);
1982			if (count == 1)
1983				(*remaining)--;
1984			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
1985			     inode, cap, count);
1986		} else {
1987			dput(dentry);
1988		}
1989		return 0;
1990	}
1991
1992out:
1993	spin_unlock(&ci->i_ceph_lock);
1994	return 0;
1995}
1996
1997/*
1998 * Trim session cap count down to some max number.
1999 */
2000int ceph_trim_caps(struct ceph_mds_client *mdsc,
2001		   struct ceph_mds_session *session,
2002		   int max_caps)
2003{
2004	int trim_caps = session->s_nr_caps - max_caps;
2005
2006	dout("trim_caps mds%d start: %d / %d, trim %d\n",
2007	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
2008	if (trim_caps > 0) {
2009		int remaining = trim_caps;
2010
2011		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2012		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
2013		     session->s_mds, session->s_nr_caps, max_caps,
2014			trim_caps - remaining);
2015	}
2016
2017	ceph_flush_cap_releases(mdsc, session);
2018	return 0;
2019}
2020
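/*
 * Check whether all cap flushes with tid <= @want_flush_tid have completed.
 * Used as the wait_event() condition in wait_caps_flush().
 */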
2021static int check_caps_flush(struct ceph_mds_client *mdsc,
2022			    u64 want_flush_tid)
2023{
2024	int ret = 1;
2025
2026	spin_lock(&mdsc->cap_dirty_lock);
2027	if (!list_empty(&mdsc->cap_flush_list)) {
2028		struct ceph_cap_flush *cf =
2029			list_first_entry(&mdsc->cap_flush_list,
2030					 struct ceph_cap_flush, g_list);
2031		if (cf->tid <= want_flush_tid) {
2032			dout("check_caps_flush still flushing tid "
2033			     "%llu <= %llu\n", cf->tid, want_flush_tid);
2034			ret = 0;
2035		}
2036	}
2037	spin_unlock(&mdsc->cap_dirty_lock);
2038	return ret;
2039}
2040
2041/*
2042 * Wait for all dirty cap flushes to complete.
2043 *
2044 * Returns once we've flushed through want_flush_tid.
2045 */
2046static void wait_caps_flush(struct ceph_mds_client *mdsc,
2047			    u64 want_flush_tid)
2048{
2049	dout("check_caps_flush want %llu\n", want_flush_tid);
2050
2051	wait_event(mdsc->cap_flushing_wq,
2052		   check_caps_flush(mdsc, want_flush_tid));
2053
2054	dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
2055}
2056
2057/*
2058 * called under s_mutex
2059 */
2060static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
2061				   struct ceph_mds_session *session)
2062{
2063	struct ceph_msg *msg = NULL;
2064	struct ceph_mds_cap_release *head;
2065	struct ceph_mds_cap_item *item;
2066	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
2067	struct ceph_cap *cap;
2068	LIST_HEAD(tmp_list);
2069	int num_cap_releases;
2070	__le32	barrier, *cap_barrier;
2071
2072	down_read(&osdc->lock);
2073	barrier = cpu_to_le32(osdc->epoch_barrier);
2074	up_read(&osdc->lock);
2075
2076	spin_lock(&session->s_cap_lock);
2077again:
2078	list_splice_init(&session->s_cap_releases, &tmp_list);
2079	num_cap_releases = session->s_num_cap_releases;
2080	session->s_num_cap_releases = 0;
2081	spin_unlock(&session->s_cap_lock);
2082
2083	while (!list_empty(&tmp_list)) {
2084		if (!msg) {
2085			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
2086					PAGE_SIZE, GFP_NOFS, false);
2087			if (!msg)
2088				goto out_err;
2089			head = msg->front.iov_base;
2090			head->num = cpu_to_le32(0);
2091			msg->front.iov_len = sizeof(*head);
2092
2093			msg->hdr.version = cpu_to_le16(2);
2094			msg->hdr.compat_version = cpu_to_le16(1);
2095		}
2096
2097		cap = list_first_entry(&tmp_list, struct ceph_cap,
2098					session_caps);
2099		list_del(&cap->session_caps);
2100		num_cap_releases--;
2101
2102		head = msg->front.iov_base;
2103		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2104				   &head->num);
2105		item = msg->front.iov_base + msg->front.iov_len;
2106		item->ino = cpu_to_le64(cap->cap_ino);
2107		item->cap_id = cpu_to_le64(cap->cap_id);
2108		item->migrate_seq = cpu_to_le32(cap->mseq);
2109		item->seq = cpu_to_le32(cap->issue_seq);
2110		msg->front.iov_len += sizeof(*item);
2111
2112		ceph_put_cap(mdsc, cap);
2113
2114		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
2115			/* Append cap_barrier field */
2116			cap_barrier = msg->front.iov_base + msg->front.iov_len;
2117			*cap_barrier = barrier;
2118			msg->front.iov_len += sizeof(*cap_barrier);
2119
2120			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2121			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2122			ceph_con_send(&session->s_con, msg);
2123			msg = NULL;
2124		}
2125	}
2126
2127	BUG_ON(num_cap_releases != 0);
2128
2129	spin_lock(&session->s_cap_lock);
2130	if (!list_empty(&session->s_cap_releases))
2131		goto again;
2132	spin_unlock(&session->s_cap_lock);
2133
2134	if (msg) {
2135		/* Append cap_barrier field */
2136		cap_barrier = msg->front.iov_base + msg->front.iov_len;
2137		*cap_barrier = barrier;
2138		msg->front.iov_len += sizeof(*cap_barrier);
2139
2140		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2141		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2142		ceph_con_send(&session->s_con, msg);
2143	}
2144	return;
2145out_err:
2146	pr_err("send_cap_releases mds%d, failed to allocate message\n",
2147		session->s_mds);
2148	spin_lock(&session->s_cap_lock);
2149	list_splice(&tmp_list, &session->s_cap_releases);
2150	session->s_num_cap_releases += num_cap_releases;
2151	spin_unlock(&session->s_cap_lock);
2152}
2153
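/*
 * Work function: send any queued cap releases for this session while it
 * is still open (or hung), then drop the reference taken when the work
 * was queued.
 */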
2154static void ceph_cap_release_work(struct work_struct *work)
2155{
2156	struct ceph_mds_session *session =
2157		container_of(work, struct ceph_mds_session, s_cap_release_work);
2158
2159	mutex_lock(&session->s_mutex);
2160	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2161	    session->s_state == CEPH_MDS_SESSION_HUNG)
2162		ceph_send_cap_releases(session->s_mdsc, session);
2163	mutex_unlock(&session->s_mutex);
2164	ceph_put_mds_session(session);
2165}
2166
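/*
 * Queue the cap release work for this session, taking a session reference
 * that the work function drops when it is done.
 */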
2167void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
2168		             struct ceph_mds_session *session)
2169{
2170	if (mdsc->stopping)
2171		return;
2172
2173	ceph_get_mds_session(session);
2174	if (queue_work(mdsc->fsc->cap_wq,
2175		       &session->s_cap_release_work)) {
2176		dout("cap release work queued\n");
2177	} else {
2178		ceph_put_mds_session(session);
2179		dout("failed to queue cap release work\n");
2180	}
2181}
2182
2183/*
2184 * caller holds session->s_cap_lock
2185 */
2186void __ceph_queue_cap_release(struct ceph_mds_session *session,
2187			      struct ceph_cap *cap)
2188{
2189	list_add_tail(&cap->session_caps, &session->s_cap_releases);
2190	session->s_num_cap_releases++;
2191
2192	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2193		ceph_flush_cap_releases(session->s_mdsc, session);
2194}
2195
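/*
 * Work function: trim unused dentries, requeueing ourselves if
 * ceph_trim_dentries() asks us to try again.
 */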
2196static void ceph_cap_reclaim_work(struct work_struct *work)
2197{
2198	struct ceph_mds_client *mdsc =
2199		container_of(work, struct ceph_mds_client, cap_reclaim_work);
2200	int ret = ceph_trim_dentries(mdsc);
2201	if (ret == -EAGAIN)
2202		ceph_queue_cap_reclaim_work(mdsc);
2203}
2204
2205void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2206{
2207	if (mdsc->stopping)
2208		return;
2209
2210	if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2211		dout("caps reclaim work queued\n");
2212	} else {
2213		dout("failed to queue caps reclaim work\n");
2214	}
2215}
2216
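/*
 * Account @nr newly reclaimable caps and kick the reclaim work once the
 * pending count crosses a CEPH_CAPS_PER_RELEASE boundary.
 */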
2217void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2218{
2219	int val;
2220	if (!nr)
2221		return;
2222	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
2223	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2224		atomic_set(&mdsc->cap_reclaim_pending, 0);
2225		ceph_queue_cap_reclaim_work(mdsc);
2226	}
2227}
2228
2229/*
2230 * requests
2231 */
2232
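/*
 * Size the readdir reply buffer from the directory's entry count, capped
 * by the max_readdir mount option, falling back to a smaller allocation
 * under memory pressure.
 */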
2233int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2234				    struct inode *dir)
2235{
2236	struct ceph_inode_info *ci = ceph_inode(dir);
2237	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2238	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2239	size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2240	unsigned int num_entries;
2241	int order;
2242
2243	spin_lock(&ci->i_ceph_lock);
2244	num_entries = ci->i_files + ci->i_subdirs;
2245	spin_unlock(&ci->i_ceph_lock);
2246	num_entries = max(num_entries, 1U);
2247	num_entries = min(num_entries, opt->max_readdir);
2248
2249	order = get_order(size * num_entries);
2250	while (order >= 0) {
2251		rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2252							     __GFP_NOWARN,
2253							     order);
2254		if (rinfo->dir_entries)
2255			break;
2256		order--;
2257	}
2258	if (!rinfo->dir_entries)
2259		return -ENOMEM;
2260
2261	num_entries = (PAGE_SIZE << order) / size;
2262	num_entries = min(num_entries, opt->max_readdir);
2263
2264	rinfo->dir_buf_size = PAGE_SIZE << order;
2265	req->r_num_caps = num_entries + 1;
2266	req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2267	req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2268	return 0;
2269}
2270
2271/*
2272 * Create an mds request.
2273 */
2274struct ceph_mds_request *
2275ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2276{
2277	struct ceph_mds_request *req;
2278
2279	req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
2280	if (!req)
2281		return ERR_PTR(-ENOMEM);
2282
2283	mutex_init(&req->r_fill_mutex);
2284	req->r_mdsc = mdsc;
2285	req->r_started = jiffies;
2286	req->r_start_latency = ktime_get();
2287	req->r_resend_mds = -1;
2288	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2289	INIT_LIST_HEAD(&req->r_unsafe_target_item);
2290	req->r_fmode = -1;
2291	kref_init(&req->r_kref);
2292	RB_CLEAR_NODE(&req->r_node);
2293	INIT_LIST_HEAD(&req->r_wait);
2294	init_completion(&req->r_completion);
2295	init_completion(&req->r_safe_completion);
2296	INIT_LIST_HEAD(&req->r_unsafe_item);
2297
2298	ktime_get_coarse_real_ts64(&req->r_stamp);
2299
2300	req->r_op = op;
2301	req->r_direct_mode = mode;
2302	return req;
2303}
2304
2305/*
2306 * Return the oldest (lowest tid) request in the request tree, or NULL if none.
2307 *
2308 * called under mdsc->mutex.
2309 */
2310static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2311{
2312	if (RB_EMPTY_ROOT(&mdsc->request_tree))
2313		return NULL;
2314	return rb_entry(rb_first(&mdsc->request_tree),
2315			struct ceph_mds_request, r_node);
2316}
2317
2318static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2319{
2320	return mdsc->oldest_tid;
2321}
2322
2323/*
2324 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
2325 * on build_path_from_dentry in fs/cifs/dir.c.
2326 *
2327 * If @stop_on_nosnap, generate path relative to the first non-snapped
2328 * inode.
2329 *
2330 * Encode hidden .snap dirs as a double /, i.e.
2331 *   foo/.snap/bar -> foo//bar
2332 */
2333char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2334			   int stop_on_nosnap)
2335{
2336	struct dentry *temp;
2337	char *path;
2338	int pos;
2339	unsigned seq;
2340	u64 base;
2341
2342	if (!dentry)
2343		return ERR_PTR(-EINVAL);
2344
2345	path = __getname();
2346	if (!path)
2347		return ERR_PTR(-ENOMEM);
2348retry:
2349	pos = PATH_MAX - 1;
2350	path[pos] = '\0';
2351
2352	seq = read_seqbegin(&rename_lock);
2353	rcu_read_lock();
2354	temp = dentry;
2355	for (;;) {
2356		struct inode *inode;
2357
2358		spin_lock(&temp->d_lock);
2359		inode = d_inode(temp);
2360		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2361			dout("build_path path+%d: %p SNAPDIR\n",
2362			     pos, temp);
2363		} else if (stop_on_nosnap && inode && dentry != temp &&
2364			   ceph_snap(inode) == CEPH_NOSNAP) {
2365			spin_unlock(&temp->d_lock);
2366			pos++; /* get rid of any prepended '/' */
2367			break;
2368		} else {
2369			pos -= temp->d_name.len;
2370			if (pos < 0) {
2371				spin_unlock(&temp->d_lock);
2372				break;
2373			}
2374			memcpy(path + pos, temp->d_name.name, temp->d_name.len);
2375		}
2376		spin_unlock(&temp->d_lock);
2377		temp = READ_ONCE(temp->d_parent);
2378
2379		/* Are we at the root? */
2380		if (IS_ROOT(temp))
2381			break;
2382
2383		/* Are we out of buffer? */
2384		if (--pos < 0)
2385			break;
2386
2387		path[pos] = '/';
2388	}
2389	base = ceph_ino(d_inode(temp));
2390	rcu_read_unlock();
2391
2392	if (read_seqretry(&rename_lock, seq))
2393		goto retry;
2394
2395	if (pos < 0) {
2396		/*
2397		 * A rename didn't occur, but somehow we didn't end up where
2398		 * we thought we would. Throw a warning and try again.
2399		 */
2400		pr_warn("build_path did not end path lookup where "
2401			"expected, pos is %d\n", pos);
2402		goto retry;
2403	}
2404
2405	*pbase = base;
2406	*plen = PATH_MAX - 1 - pos;
2407	dout("build_path on %p %d built %llx '%.*s'\n",
2408	     dentry, d_count(dentry), base, *plen, path + pos);
2409	return path + pos;
2410}
2411
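/*
 * Build the path for a dentry.  If the parent is locked and not snapped,
 * the dentry name relative to the parent's ino is sufficient; otherwise
 * build a full path.
 */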
2412static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2413			     const char **ppath, int *ppathlen, u64 *pino,
2414			     bool *pfreepath, bool parent_locked)
2415{
2416	char *path;
2417
2418	rcu_read_lock();
2419	if (!dir)
2420		dir = d_inode_rcu(dentry->d_parent);
2421	if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
2422		*pino = ceph_ino(dir);
2423		rcu_read_unlock();
2424		*ppath = dentry->d_name.name;
2425		*ppathlen = dentry->d_name.len;
2426		return 0;
2427	}
2428	rcu_read_unlock();
2429	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2430	if (IS_ERR(path))
2431		return PTR_ERR(path);
2432	*ppath = path;
2433	*pfreepath = true;
2434	return 0;
2435}
2436
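/*
 * Build the path for an inode.  A non-snapped inode can be referred to by
 * ino alone; otherwise build a path from one of its aliases.
 */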
2437static int build_inode_path(struct inode *inode,
2438			    const char **ppath, int *ppathlen, u64 *pino,
2439			    bool *pfreepath)
2440{
2441	struct dentry *dentry;
2442	char *path;
2443
2444	if (ceph_snap(inode) == CEPH_NOSNAP) {
2445		*pino = ceph_ino(inode);
2446		*ppathlen = 0;
2447		return 0;
2448	}
2449	dentry = d_find_alias(inode);
2450	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2451	dput(dentry);
2452	if (IS_ERR(path))
2453		return PTR_ERR(path);
2454	*ppath = path;
2455	*pfreepath = true;
2456	return 0;
2457}
2458
2459/*
2460 * request arguments may be specified via an inode *, a dentry *, or
2461 * an explicit ino+path.
2462 */
2463static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2464				  struct inode *rdiri, const char *rpath,
2465				  u64 rino, const char **ppath, int *pathlen,
2466				  u64 *ino, bool *freepath, bool parent_locked)
2467{
2468	int r = 0;
2469
2470	if (rinode) {
2471		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2472		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2473		     ceph_snap(rinode));
2474	} else if (rdentry) {
2475		r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2476					freepath, parent_locked);
2477		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2478		     *ppath);
2479	} else if (rpath || rino) {
2480		*ino = rino;
2481		*ppath = rpath;
2482		*pathlen = rpath ? strlen(rpath) : 0;
2483		dout(" path %.*s\n", *pathlen, rpath);
2484	}
2485
2486	return r;
2487}
2488
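/*
 * Encode the request timestamp and the caller's supplementary group list
 * into the request message.
 */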
2489static void encode_timestamp_and_gids(void **p,
2490				      const struct ceph_mds_request *req)
2491{
2492	struct ceph_timespec ts;
2493	int i;
2494
2495	ceph_encode_timespec64(&ts, &req->r_stamp);
2496	ceph_encode_copy(p, &ts, sizeof(ts));
2497
2498	/* gid_list */
2499	ceph_encode_32(p, req->r_cred->group_info->ngroups);
2500	for (i = 0; i < req->r_cred->group_info->ngroups; i++)
2501		ceph_encode_64(p, from_kgid(&init_user_ns,
2502					    req->r_cred->group_info->gid[i]));
2503}
2504
2505/*
2506 * called under mdsc->mutex
2507 */
2508static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
2509					       struct ceph_mds_request *req,
2510					       bool drop_cap_releases)
2511{
2512	int mds = session->s_mds;
2513	struct ceph_mds_client *mdsc = session->s_mdsc;
2514	struct ceph_msg *msg;
2515	struct ceph_mds_request_head_old *head;
2516	const char *path1 = NULL;
2517	const char *path2 = NULL;
2518	u64 ino1 = 0, ino2 = 0;
2519	int pathlen1 = 0, pathlen2 = 0;
2520	bool freepath1 = false, freepath2 = false;
2521	int len;
2522	u16 releases;
2523	void *p, *end;
2524	int ret;
2525	bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
2526
2527	ret = set_request_path_attr(req->r_inode, req->r_dentry,
2528			      req->r_parent, req->r_path1, req->r_ino1.ino,
2529			      &path1, &pathlen1, &ino1, &freepath1,
2530			      test_bit(CEPH_MDS_R_PARENT_LOCKED,
2531					&req->r_req_flags));
2532	if (ret < 0) {
2533		msg = ERR_PTR(ret);
2534		goto out;
2535	}
2536
2537	/* If r_old_dentry is set, then assume that its parent is locked */
2538	ret = set_request_path_attr(NULL, req->r_old_dentry,
2539			      req->r_old_dentry_dir,
2540			      req->r_path2, req->r_ino2.ino,
2541			      &path2, &pathlen2, &ino2, &freepath2, true);
2542	if (ret < 0) {
2543		msg = ERR_PTR(ret);
2544		goto out_free1;
2545	}
2546
2547	len = legacy ? sizeof(*head) : sizeof(struct ceph_mds_request_head);
2548	len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2549		sizeof(struct ceph_timespec);
2550	len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);
2551
2552	/* calculate (max) length for cap releases */
2553	len += sizeof(struct ceph_mds_request_release) *
2554		(!!req->r_inode_drop + !!req->r_dentry_drop +
2555		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2556
2557	if (req->r_dentry_drop)
2558		len += pathlen1;
2559	if (req->r_old_dentry_drop)
2560		len += pathlen2;
2561
2562	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2563	if (!msg) {
2564		msg = ERR_PTR(-ENOMEM);
2565		goto out_free2;
2566	}
2567
2568	msg->hdr.tid = cpu_to_le64(req->r_tid);
2569
2570	/*
2571	 * The old ceph_mds_request_head didn't contain a version field, and
2572	 * one was added when we moved the message version from 3->4.
2573	 */
2574	if (legacy) {
2575		msg->hdr.version = cpu_to_le16(3);
2576		head = msg->front.iov_base;
2577		p = msg->front.iov_base + sizeof(*head);
2578	} else {
2579		struct ceph_mds_request_head *new_head = msg->front.iov_base;
2580
2581		msg->hdr.version = cpu_to_le16(4);
2582		new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
2583		head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
2584		p = msg->front.iov_base + sizeof(*new_head);
2585	}
2586
2587	end = msg->front.iov_base + msg->front.iov_len;
2588
2589	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2590	head->op = cpu_to_le32(req->r_op);
2591	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
2592						 req->r_cred->fsuid));
2593	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
2594						 req->r_cred->fsgid));
2595	head->ino = cpu_to_le64(req->r_deleg_ino);
2596	head->args = req->r_args;
2597
2598	ceph_encode_filepath(&p, end, ino1, path1);
2599	ceph_encode_filepath(&p, end, ino2, path2);
2600
2601	/* make note of release offset, in case we need to replay */
2602	req->r_request_release_offset = p - msg->front.iov_base;
2603
2604	/* cap releases */
2605	releases = 0;
2606	if (req->r_inode_drop)
2607		releases += ceph_encode_inode_release(&p,
2608		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2609		      mds, req->r_inode_drop, req->r_inode_unless,
2610		      req->r_op == CEPH_MDS_OP_READDIR);
2611	if (req->r_dentry_drop)
2612		releases += ceph_encode_dentry_release(&p, req->r_dentry,
2613				req->r_parent, mds, req->r_dentry_drop,
2614				req->r_dentry_unless);
2615	if (req->r_old_dentry_drop)
2616		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2617				req->r_old_dentry_dir, mds,
2618				req->r_old_dentry_drop,
2619				req->r_old_dentry_unless);
2620	if (req->r_old_inode_drop)
2621		releases += ceph_encode_inode_release(&p,
2622		      d_inode(req->r_old_dentry),
2623		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2624
2625	if (drop_cap_releases) {
2626		releases = 0;
2627		p = msg->front.iov_base + req->r_request_release_offset;
2628	}
2629
2630	head->num_releases = cpu_to_le16(releases);
2631
2632	encode_timestamp_and_gids(&p, req);
2633
2634	if (WARN_ON_ONCE(p > end)) {
2635		ceph_msg_put(msg);
2636		msg = ERR_PTR(-ERANGE);
2637		goto out_free2;
2638	}
2639
2640	msg->front.iov_len = p - msg->front.iov_base;
2641	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2642
2643	if (req->r_pagelist) {
2644		struct ceph_pagelist *pagelist = req->r_pagelist;
2645		ceph_msg_data_add_pagelist(msg, pagelist);
2646		msg->hdr.data_len = cpu_to_le32(pagelist->length);
2647	} else {
2648		msg->hdr.data_len = 0;
2649	}
2650
2651	msg->hdr.data_off = cpu_to_le16(0);
2652
2653out_free2:
2654	if (freepath2)
2655		ceph_mdsc_free_path((char *)path2, pathlen2);
2656out_free1:
2657	if (freepath1)
2658		ceph_mdsc_free_path((char *)path1, pathlen1);
2659out:
2660	return msg;
2661}
2662
2663/*
2664 * called under mdsc->mutex if error, under no mutex if
2665 * success.
2666 */
2667static void complete_request(struct ceph_mds_client *mdsc,
2668			     struct ceph_mds_request *req)
2669{
2670	req->r_end_latency = ktime_get();
2671
2672	if (req->r_callback)
2673		req->r_callback(mdsc, req);
2674	complete_all(&req->r_completion);
2675}
2676
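/*
 * Return a pointer to the legacy request head.  For peers without
 * CEPH_FEATURE_FS_BTIME the message starts with the old head; otherwise
 * the old fields live inside the new head, starting at oldest_client_tid.
 */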
2677static struct ceph_mds_request_head_old *
2678find_old_request_head(void *p, u64 features)
2679{
2680	bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
2681	struct ceph_mds_request_head *new_head;
2682
2683	if (legacy)
2684		return (struct ceph_mds_request_head_old *)p;
2685	new_head = (struct ceph_mds_request_head *)p;
2686	return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
2687}
2688
2689/*
2690 * called under mdsc->mutex
2691 */
2692static int __prepare_send_request(struct ceph_mds_session *session,
2693				  struct ceph_mds_request *req,
2694				  bool drop_cap_releases)
2695{
2696	int mds = session->s_mds;
2697	struct ceph_mds_client *mdsc = session->s_mdsc;
2698	struct ceph_mds_request_head_old *rhead;
2699	struct ceph_msg *msg;
2700	int flags = 0;
2701
2702	req->r_attempts++;
2703	if (req->r_inode) {
2704		struct ceph_cap *cap =
2705			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2706
2707		if (cap)
2708			req->r_sent_on_mseq = cap->mseq;
2709		else
2710			req->r_sent_on_mseq = -1;
2711	}
2712	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2713	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2714
2715	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2716		void *p;
2717
2718		/*
2719		 * Replay.  Do not regenerate message (and rebuild
2720		 * paths, etc.); just use the original message.
2721		 * Rebuilding paths will break for renames because
2722		 * d_move mangles the src name.
2723		 */
2724		msg = req->r_request;
2725		rhead = find_old_request_head(msg->front.iov_base,
2726					      session->s_con.peer_features);
2727
2728		flags = le32_to_cpu(rhead->flags);
2729		flags |= CEPH_MDS_FLAG_REPLAY;
2730		rhead->flags = cpu_to_le32(flags);
2731
2732		if (req->r_target_inode)
2733			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2734
2735		rhead->num_retry = req->r_attempts - 1;
2736
2737		/* remove cap/dentry releases from message */
2738		rhead->num_releases = 0;
2739
2740		p = msg->front.iov_base + req->r_request_release_offset;
2741		encode_timestamp_and_gids(&p, req);
2742
2743		msg->front.iov_len = p - msg->front.iov_base;
2744		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2745		return 0;
2746	}
2747
2748	if (req->r_request) {
2749		ceph_msg_put(req->r_request);
2750		req->r_request = NULL;
2751	}
2752	msg = create_request_message(session, req, drop_cap_releases);
2753	if (IS_ERR(msg)) {
2754		req->r_err = PTR_ERR(msg);
2755		return PTR_ERR(msg);
2756	}
2757	req->r_request = msg;
2758
2759	rhead = find_old_request_head(msg->front.iov_base,
2760				      session->s_con.peer_features);
2761	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2762	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2763		flags |= CEPH_MDS_FLAG_REPLAY;
2764	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
2765		flags |= CEPH_MDS_FLAG_ASYNC;
2766	if (req->r_parent)
2767		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2768	rhead->flags = cpu_to_le32(flags);
2769	rhead->num_fwd = req->r_num_fwd;
2770	rhead->num_retry = req->r_attempts - 1;
2771
2772	dout(" r_parent = %p\n", req->r_parent);
2773	return 0;
2774}
2775
2776/*
2777 * called under mdsc->mutex
2778 */
2779static int __send_request(struct ceph_mds_session *session,
2780			  struct ceph_mds_request *req,
2781			  bool drop_cap_releases)
2782{
2783	int err;
2784
2785	err = __prepare_send_request(session, req, drop_cap_releases);
2786	if (!err) {
2787		ceph_msg_get(req->r_request);
2788		ceph_con_send(&session->s_con, req->r_request);
2789	}
2790
2791	return err;
2792}
2793
2794/*
2795 * send request, or put it on the appropriate wait list.
2796 */
2797static void __do_request(struct ceph_mds_client *mdsc,
2798			struct ceph_mds_request *req)
2799{
2800	struct ceph_mds_session *session = NULL;
2801	int mds = -1;
2802	int err = 0;
2803	bool random;
2804
2805	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2806		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2807			__unregister_request(mdsc, req);
2808		return;
2809	}
2810
2811	if (req->r_timeout &&
2812	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2813		dout("do_request timed out\n");
2814		err = -ETIMEDOUT;
2815		goto finish;
2816	}
2817	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2818		dout("do_request forced umount\n");
2819		err = -EIO;
2820		goto finish;
2821	}
2822	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2823		if (mdsc->mdsmap_err) {
2824			err = mdsc->mdsmap_err;
2825			dout("do_request mdsmap err %d\n", err);
2826			goto finish;
2827		}
2828		if (mdsc->mdsmap->m_epoch == 0) {
2829			dout("do_request no mdsmap, waiting for map\n");
2830			list_add(&req->r_wait, &mdsc->waiting_for_map);
2831			return;
2832		}
2833		if (!(mdsc->fsc->mount_options->flags &
2834		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
2835		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2836			err = -EHOSTUNREACH;
2837			goto finish;
2838		}
2839	}
2840
2841	put_request_session(req);
2842
2843	mds = __choose_mds(mdsc, req, &random);
2844	if (mds < 0 ||
2845	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2846		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2847			err = -EJUKEBOX;
2848			goto finish;
2849		}
2850		dout("do_request no mds or not active, waiting for map\n");
2851		list_add(&req->r_wait, &mdsc->waiting_for_map);
2852		return;
2853	}
2854
2855	/* get, open session */
2856	session = __ceph_lookup_mds_session(mdsc, mds);
2857	if (!session) {
2858		session = register_session(mdsc, mds);
2859		if (IS_ERR(session)) {
2860			err = PTR_ERR(session);
2861			goto finish;
2862		}
2863	}
2864	req->r_session = ceph_get_mds_session(session);
2865
2866	dout("do_request mds%d session %p state %s\n", mds, session,
2867	     ceph_session_state_name(session->s_state));
2868	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2869	    session->s_state != CEPH_MDS_SESSION_HUNG) {
2870		/*
2871		 * We cannot queue async requests since the caps and delegated
2872		 * inodes are bound to the session. Just return -EJUKEBOX and
2873		 * let the caller retry a sync request in that case.
2874		 */
2875		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2876			err = -EJUKEBOX;
2877			goto out_session;
2878		}
2879
2880		/*
2881		 * If the session has been REJECTED, then return a hard error,
2882		 * unless it's a CLEANRECOVER mount, in which case we'll queue
2883		 * it to the mdsc queue.
2884		 */
2885		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2886			if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
2887				list_add(&req->r_wait, &mdsc->waiting_for_map);
2888			else
2889				err = -EACCES;
2890			goto out_session;
2891		}
2892
2893		if (session->s_state == CEPH_MDS_SESSION_NEW ||
2894		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
2895			err = __open_session(mdsc, session);
2896			if (err)
2897				goto out_session;
2898			/* retry the same mds later */
2899			if (random)
2900				req->r_resend_mds = mds;
2901		}
2902		list_add(&req->r_wait, &session->s_waiting);
2903		goto out_session;
2904	}
2905
2906	/* send request */
2907	req->r_resend_mds = -1;   /* forget any previous mds hint */
2908
2909	if (req->r_request_started == 0)   /* note request start time */
2910		req->r_request_started = jiffies;
2911
2912	err = __send_request(session, req, false);
2913
2914out_session:
2915	ceph_put_mds_session(session);
2916finish:
2917	if (err) {
2918		dout("__do_request early error %d\n", err);
2919		req->r_err = err;
2920		complete_request(mdsc, req);
2921		__unregister_request(mdsc, req);
2922	}
2923	return;
2924}
2925
2926/*
2927 * called under mdsc->mutex
2928 */
2929static void __wake_requests(struct ceph_mds_client *mdsc,
2930			    struct list_head *head)
2931{
2932	struct ceph_mds_request *req;
2933	LIST_HEAD(tmp_list);
2934
2935	list_splice_init(head, &tmp_list);
2936
2937	while (!list_empty(&tmp_list)) {
2938		req = list_entry(tmp_list.next,
2939				 struct ceph_mds_request, r_wait);
2940		list_del_init(&req->r_wait);
2941		dout(" wake request %p tid %llu\n", req, req->r_tid);
2942		__do_request(mdsc, req);
2943	}
2944}
2945
2946/*
2947 * Wake up threads with requests pending for @mds, so that they can
2948 * resubmit their requests to a possibly different mds.
2949 */
2950static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2951{
2952	struct ceph_mds_request *req;
2953	struct rb_node *p = rb_first(&mdsc->request_tree);
2954
2955	dout("kick_requests mds%d\n", mds);
2956	while (p) {
2957		req = rb_entry(p, struct ceph_mds_request, r_node);
2958		p = rb_next(p);
2959		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2960			continue;
2961		if (req->r_attempts > 0)
2962			continue; /* only new requests */
2963		if (req->r_session &&
2964		    req->r_session->s_mds == mds) {
2965			dout(" kicking tid %llu\n", req->r_tid);
2966			list_del_init(&req->r_wait);
2967			__do_request(mdsc, req);
2968		}
2969	}
2970}
2971
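/*
 * Register a request and (try to) send it: take CAP_PIN references on the
 * inodes involved, wait for any pending async creates, then hand the
 * request to __do_request().
 */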
2972int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
2973			      struct ceph_mds_request *req)
2974{
2975	int err = 0;
2976
2977	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
2978	if (req->r_inode)
2979		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2980	if (req->r_parent) {
2981		struct ceph_inode_info *ci = ceph_inode(req->r_parent);
2982		int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
2983			    CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
2984		spin_lock(&ci->i_ceph_lock);
2985		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
2986		__ceph_touch_fmode(ci, mdsc, fmode);
2987		spin_unlock(&ci->i_ceph_lock);
2988	}
2989	if (req->r_old_dentry_dir)
2990		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2991				  CEPH_CAP_PIN);
2992
2993	if (req->r_inode) {
2994		err = ceph_wait_on_async_create(req->r_inode);
2995		if (err) {
2996			dout("%s: wait for async create returned: %d\n",
2997			     __func__, err);
2998			return err;
2999		}
3000	}
3001
3002	if (!err && req->r_old_inode) {
3003		err = ceph_wait_on_async_create(req->r_old_inode);
3004		if (err) {
3005			dout("%s: wait for async create returned: %d\n",
3006			     __func__, err);
3007			return err;
3008		}
3009	}
3010
3011	dout("submit_request on %p for inode %p\n", req, dir);
3012	mutex_lock(&mdsc->mutex);
3013	__register_request(mdsc, req, dir);
3014	__do_request(mdsc, req);
3015	err = req->r_err;
3016	mutex_unlock(&mdsc->mutex);
3017	return err;
3018}
3019
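/*
 * Wait for a submitted request to complete (or time out, or be killed),
 * aborting it cleanly if we gave up before a real reply arrived.
 */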
3020static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3021				  struct ceph_mds_request *req)
3022{
3023	int err;
3024
3025	/* wait */
3026	dout("do_request waiting\n");
3027	if (!req->r_timeout && req->r_wait_for_completion) {
3028		err = req->r_wait_for_completion(mdsc, req);
3029	} else {
3030		long timeleft = wait_for_completion_killable_timeout(
3031					&req->r_completion,
3032					ceph_timeout_jiffies(req->r_timeout));
3033		if (timeleft > 0)
3034			err = 0;
3035		else if (!timeleft)
3036			err = -ETIMEDOUT;  /* timed out */
3037		else
3038			err = timeleft;  /* killed */
3039	}
3040	dout("do_request waited, got %d\n", err);
3041	mutex_lock(&mdsc->mutex);
3042
3043	/* only abort if we didn't race with a real reply */
3044	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3045		err = le32_to_cpu(req->r_reply_info.head->result);
3046	} else if (err < 0) {
3047		dout("aborted request %lld with %d\n", req->r_tid, err);
3048
3049		/*
3050		 * ensure we aren't running concurrently with
3051		 * ceph_fill_trace or ceph_readdir_prepopulate, which
3052		 * rely on locks (dir mutex) held by our caller.
3053		 */
3054		mutex_lock(&req->r_fill_mutex);
3055		req->r_err = err;
3056		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3057		mutex_unlock(&req->r_fill_mutex);
3058
3059		if (req->r_parent &&
3060		    (req->r_op & CEPH_MDS_OP_WRITE))
3061			ceph_invalidate_dir_request(req);
3062	} else {
3063		err = req->r_err;
3064	}
3065
3066	mutex_unlock(&mdsc->mutex);
3067	return err;
3068}
3069
3070/*
3071 * Synchronously perform an mds request.  Take care of all of the
3072 * session setup, forwarding, retry details.
3073 */
3074int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3075			 struct inode *dir,
3076			 struct ceph_mds_request *req)
3077{
3078	int err;
3079
3080	dout("do_request on %p\n", req);
3081
3082	/* issue */
3083	err = ceph_mdsc_submit_request(mdsc, dir, req);
3084	if (!err)
3085		err = ceph_mdsc_wait_request(mdsc, req);
3086	dout("do_request %p done, result %d\n", req, err);
3087	return err;
3088}
3089
3090/*
3091 * Invalidate dir's completeness, dentry lease state on an aborted MDS
3092 * namespace request.
3093 */
3094void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3095{
3096	struct inode *dir = req->r_parent;
3097	struct inode *old_dir = req->r_old_dentry_dir;
3098
3099	dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
3100
3101	ceph_dir_clear_complete(dir);
3102	if (old_dir)
3103		ceph_dir_clear_complete(old_dir);
3104	if (req->r_dentry)
3105		ceph_invalidate_dentry_lease(req->r_dentry);
3106	if (req->r_old_dentry)
3107		ceph_invalidate_dentry_lease(req->r_old_dentry);
3108}
3109
3110/*
3111 * Handle mds reply.
3112 *
3113 * We take the session mutex and parse and process the reply immediately.
3114 * This preserves the logical ordering of replies, capabilities, etc., sent
3115 * by the MDS as they are applied to our local cache.
3116 */
3117static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
3118{
3119	struct ceph_mds_client *mdsc = session->s_mdsc;
3120	struct ceph_mds_request *req;
3121	struct ceph_mds_reply_head *head = msg->front.iov_base;
3122	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
3123	struct ceph_snap_realm *realm;
3124	u64 tid;
3125	int err, result;
3126	int mds = session->s_mds;
3127
3128	if (msg->front.iov_len < sizeof(*head)) {
3129		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
3130		ceph_msg_dump(msg);
3131		return;
3132	}
3133
3134	/* get request, session */
3135	tid = le64_to_cpu(msg->hdr.tid);
3136	mutex_lock(&mdsc->mutex);
3137	req = lookup_get_request(mdsc, tid);
3138	if (!req) {
3139		dout("handle_reply on unknown tid %llu\n", tid);
3140		mutex_unlock(&mdsc->mutex);
3141		return;
3142	}
3143	dout("handle_reply %p\n", req);
3144
3145	/* correct session? */
3146	if (req->r_session != session) {
3147		pr_err("mdsc_handle_reply got %llu on session mds%d"
3148		       " not mds%d\n", tid, session->s_mds,
3149		       req->r_session ? req->r_session->s_mds : -1);
3150		mutex_unlock(&mdsc->mutex);
3151		goto out;
3152	}
3153
3154	/* dup? */
3155	if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3156	    (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3157		pr_warn("got a dup %s reply on %llu from mds%d\n",
3158			   head->safe ? "safe" : "unsafe", tid, mds);
3159		mutex_unlock(&mdsc->mutex);
3160		goto out;
3161	}
3162	if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3163		pr_warn("got unsafe after safe on %llu from mds%d\n",
3164			   tid, mds);
3165		mutex_unlock(&mdsc->mutex);
3166		goto out;
3167	}
3168
3169	result = le32_to_cpu(head->result);
3170
3171	/*
3172	 * Handle an ESTALE:
3173	 * if we're not talking to the authority, send to them;
3174	 * if the authority has changed while we weren't looking,
3175	 * send to the new authority.
3176	 * Otherwise we just have to return an ESTALE.
3177	 */
3178	if (result == -ESTALE) {
3179		dout("got ESTALE on request %llu\n", req->r_tid);
3180		req->r_resend_mds = -1;
3181		if (req->r_direct_mode != USE_AUTH_MDS) {
3182			dout("not using auth, setting for that now\n");
3183			req->r_direct_mode = USE_AUTH_MDS;
3184			__do_request(mdsc, req);
3185			mutex_unlock(&mdsc->mutex);
3186			goto out;
3187		} else  {
3188			int mds = __choose_mds(mdsc, req, NULL);
3189			if (mds >= 0 && mds != req->r_session->s_mds) {
3190				dout("but auth changed, so resending\n");
3191				__do_request(mdsc, req);
3192				mutex_unlock(&mdsc->mutex);
3193				goto out;
3194			}
3195		}
3196		dout("have to return ESTALE on request %llu\n", req->r_tid);
3197	}
3198
3199
3200	if (head->safe) {
3201		set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3202		__unregister_request(mdsc, req);
3203
3204		/* last request during umount? */
3205		if (mdsc->stopping && !__get_oldest_req(mdsc))
3206			complete_all(&mdsc->safe_umount_waiters);
3207
3208		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3209			/*
3210			 * We already handled the unsafe response, now do the
3211			 * cleanup.  No need to examine the response; the MDS
3212			 * doesn't include any result info in the safe
3213			 * response.  And even if it did, there is nothing
3214			 * useful we could do with a revised return value.
3215			 */
3216			dout("got safe reply %llu, mds%d\n", tid, mds);
3217
3218			mutex_unlock(&mdsc->mutex);
3219			goto out;
3220		}
3221	} else {
3222		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3223		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3224	}
3225
3226	dout("handle_reply tid %lld result %d\n", tid, result);
3227	rinfo = &req->r_reply_info;
3228	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3229		err = parse_reply_info(session, msg, rinfo, (u64)-1);
3230	else
3231		err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
3232	mutex_unlock(&mdsc->mutex);
3233
3234	/* Must find target inode outside of mutexes to avoid deadlocks */
3235	if ((err >= 0) && rinfo->head->is_target) {
3236		struct inode *in;
3237		struct ceph_vino tvino = {
3238			.ino  = le64_to_cpu(rinfo->targeti.in->ino),
3239			.snap = le64_to_cpu(rinfo->targeti.in->snapid)
3240		};
3241
3242		in = ceph_get_inode(mdsc->fsc->sb, tvino);
3243		if (IS_ERR(in)) {
3244			err = PTR_ERR(in);
3245			mutex_lock(&session->s_mutex);
3246			goto out_err;
3247		}
3248		req->r_target_inode = in;
3249	}
3250
3251	mutex_lock(&session->s_mutex);
3252	if (err < 0) {
3253		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
3254		ceph_msg_dump(msg);
3255		goto out_err;
3256	}
3257
3258	/* snap trace */
3259	realm = NULL;
3260	if (rinfo->snapblob_len) {
3261		down_write(&mdsc->snap_rwsem);
3262		ceph_update_snap_trace(mdsc, rinfo->snapblob,
3263				rinfo->snapblob + rinfo->snapblob_len,
3264				le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3265				&realm);
3266		downgrade_write(&mdsc->snap_rwsem);
3267	} else {
3268		down_read(&mdsc->snap_rwsem);
3269	}
3270
3271	/* insert trace into our cache */
3272	mutex_lock(&req->r_fill_mutex);
3273	current->journal_info = req;
3274	err = ceph_fill_trace(mdsc->fsc->sb, req);
3275	if (err == 0) {
3276		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
3277				    req->r_op == CEPH_MDS_OP_LSSNAP))
3278			ceph_readdir_prepopulate(req, req->r_session);
3279	}
3280	current->journal_info = NULL;
3281	mutex_unlock(&req->r_fill_mutex);
3282
3283	up_read(&mdsc->snap_rwsem);
3284	if (realm)
3285		ceph_put_snap_realm(mdsc, realm);
3286
3287	if (err == 0) {
3288		if (req->r_target_inode &&
3289		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3290			struct ceph_inode_info *ci =
3291				ceph_inode(req->r_target_inode);
3292			spin_lock(&ci->i_unsafe_lock);
3293			list_add_tail(&req->r_unsafe_target_item,
3294				      &ci->i_unsafe_iops);
3295			spin_unlock(&ci->i_unsafe_lock);
3296		}
3297
3298		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
3299	}
3300out_err:
3301	mutex_lock(&mdsc->mutex);
3302	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3303		if (err) {
3304			req->r_err = err;
3305		} else {
3306			req->r_reply =  ceph_msg_get(msg);
3307			set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
3308		}
3309	} else {
3310		dout("reply arrived after request %lld was aborted\n", tid);
3311	}
3312	mutex_unlock(&mdsc->mutex);
3313
3314	mutex_unlock(&session->s_mutex);
3315
3316	/* kick calling process */
3317	complete_request(mdsc, req);
3318
3319	ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
3320				     req->r_end_latency, err);
3321out:
3322	ceph_mdsc_put_request(req);
3323	return;
3324}
3325
3326
3327
3328/*
3329 * handle mds notification that our request has been forwarded.
3330 */
3331static void handle_forward(struct ceph_mds_client *mdsc,
3332			   struct ceph_mds_session *session,
3333			   struct ceph_msg *msg)
3334{
3335	struct ceph_mds_request *req;
3336	u64 tid = le64_to_cpu(msg->hdr.tid);
3337	u32 next_mds;
3338	u32 fwd_seq;
3339	int err = -EINVAL;
3340	void *p = msg->front.iov_base;
3341	void *end = p + msg->front.iov_len;
3342
3343	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3344	next_mds = ceph_decode_32(&p);
3345	fwd_seq = ceph_decode_32(&p);
3346
3347	mutex_lock(&mdsc->mutex);
3348	req = lookup_get_request(mdsc, tid);
3349	if (!req) {
3350		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
3351		goto out;  /* dup reply? */
3352	}
3353
3354	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3355		dout("forward tid %llu aborted, unregistering\n", tid);
3356		__unregister_request(mdsc, req);
3357	} else if (fwd_seq <= req->r_num_fwd) {
3358		dout("forward tid %llu to mds%d - old seq %d <= %d\n",
3359		     tid, next_mds, req->r_num_fwd, fwd_seq);
3360	} else {
3361		/* resend. forward race not possible; mds would drop */
3362		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3363		BUG_ON(req->r_err);
3364		BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3365		req->r_attempts = 0;
3366		req->r_num_fwd = fwd_seq;
3367		req->r_resend_mds = next_mds;
3368		put_request_session(req);
3369		__do_request(mdsc, req);
3370	}
3371	ceph_mdsc_put_request(req);
3372out:
3373	mutex_unlock(&mdsc->mutex);
3374	return;
3375
3376bad:
3377	pr_err("mdsc_handle_forward decode error err=%d\n", err);
3378}
3379
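/*
 * Decode the session metadata map<string,string>, noting whether the MDS
 * reported that this client has been blocklisted.
 */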
3380static int __decode_session_metadata(void **p, void *end,
3381				     bool *blocklisted)
3382{
3383	/* map<string,string> */
3384	u32 n;
3385	bool err_str;
3386	ceph_decode_32_safe(p, end, n, bad);
3387	while (n-- > 0) {
3388		u32 len;
3389		ceph_decode_32_safe(p, end, len, bad);
3390		ceph_decode_need(p, end, len, bad);
3391		err_str = !strncmp(*p, "error_string", len);
3392		*p += len;
3393		ceph_decode_32_safe(p, end, len, bad);
3394		ceph_decode_need(p, end, len, bad);
3395		/*
3396		 * Match "blocklisted (blacklisted)" from newer MDSes,
3397		 * or "blacklisted" from older MDSes.
3398		 */
3399		if (err_str && strnstr(*p, "blacklisted", len))
3400			*blocklisted = true;
3401		*p += len;
3402	}
3403	return 0;
3404bad:
3405	return -1;
3406}
3407
3408/*
3409 * handle a mds session control message
3410 */
3411static void handle_session(struct ceph_mds_session *session,
3412			   struct ceph_msg *msg)
3413{
3414	struct ceph_mds_client *mdsc = session->s_mdsc;
3415	int mds = session->s_mds;
3416	int msg_version = le16_to_cpu(msg->hdr.version);
3417	void *p = msg->front.iov_base;
3418	void *end = p + msg->front.iov_len;
3419	struct ceph_mds_session_head *h;
3420	u32 op;
3421	u64 seq, features = 0;
3422	int wake = 0;
3423	bool blocklisted = false;
3424
3425	/* decode */
3426	ceph_decode_need(&p, end, sizeof(*h), bad);
3427	h = p;
3428	p += sizeof(*h);
3429
3430	op = le32_to_cpu(h->op);
3431	seq = le64_to_cpu(h->seq);
3432
3433	if (msg_version >= 3) {
3434		u32 len;
3435		/* version >= 2, metadata */
3436		if (__decode_session_metadata(&p, end, &blocklisted) < 0)
3437			goto bad;
3438		/* version >= 3, feature bits */
3439		ceph_decode_32_safe(&p, end, len, bad);
3440		if (len) {
3441			ceph_decode_64_safe(&p, end, features, bad);
3442			p += len - sizeof(features);
3443		}
3444	}
3445
3446	mutex_lock(&mdsc->mutex);
3447	if (op == CEPH_SESSION_CLOSE) {
3448		ceph_get_mds_session(session);
3449		__unregister_session(mdsc, session);
3450	}
3451	/* FIXME: this ttl calculation is generous */
3452	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3453	mutex_unlock(&mdsc->mutex);
3454
3455	mutex_lock(&session->s_mutex);
3456
3457	dout("handle_session mds%d %s %p state %s seq %llu\n",
3458	     mds, ceph_session_op_name(op), session,
3459	     ceph_session_state_name(session->s_state), seq);
3460
3461	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3462		session->s_state = CEPH_MDS_SESSION_OPEN;
3463		pr_info("mds%d came back\n", session->s_mds);
3464	}
3465
3466	switch (op) {
3467	case CEPH_SESSION_OPEN:
3468		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3469			pr_info("mds%d reconnect success\n", session->s_mds);
3470		session->s_state = CEPH_MDS_SESSION_OPEN;
3471		session->s_features = features;
3472		renewed_caps(mdsc, session, 0);
3473		if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features))
3474			metric_schedule_delayed(&mdsc->metric);
3475		wake = 1;
3476		if (mdsc->stopping)
3477			__close_session(mdsc, session);
3478		break;
3479
3480	case CEPH_SESSION_RENEWCAPS:
3481		if (session->s_renew_seq == seq)
3482			renewed_caps(mdsc, session, 1);
3483		break;
3484
3485	case CEPH_SESSION_CLOSE:
3486		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3487			pr_info("mds%d reconnect denied\n", session->s_mds);
3488		session->s_state = CEPH_MDS_SESSION_CLOSED;
3489		cleanup_session_requests(mdsc, session);
3490		remove_session_caps(session);
3491		wake = 2; /* for good measure */
3492		wake_up_all(&mdsc->session_close_wq);
3493		break;
3494
3495	case CEPH_SESSION_STALE:
3496		pr_info("mds%d caps went stale, renewing\n",
3497			session->s_mds);
3498		atomic_inc(&session->s_cap_gen);
3499		session->s_cap_ttl = jiffies - 1;
3500		send_renew_caps(mdsc, session);
3501		break;
3502
3503	case CEPH_SESSION_RECALL_STATE:
3504		ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
3505		break;
3506
3507	case CEPH_SESSION_FLUSHMSG:
3508		send_flushmsg_ack(mdsc, session, seq);
3509		break;
3510
3511	case CEPH_SESSION_FORCE_RO:
3512		dout("force_session_readonly %p\n", session);
3513		spin_lock(&session->s_cap_lock);
3514		session->s_readonly = true;
3515		spin_unlock(&session->s_cap_lock);
3516		wake_up_session_caps(session, FORCE_RO);
3517		break;
3518
3519	case CEPH_SESSION_REJECT:
3520		WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3521		pr_info("mds%d rejected session\n", session->s_mds);
3522		session->s_state = CEPH_MDS_SESSION_REJECTED;
3523		cleanup_session_requests(mdsc, session);
3524		remove_session_caps(session);
3525		if (blocklisted)
3526			mdsc->fsc->blocklisted = true;
3527		wake = 2; /* for good measure */
3528		break;
3529
3530	default:
3531		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
3532		WARN_ON(1);
3533	}
3534
3535	mutex_unlock(&session->s_mutex);
3536	if (wake) {
3537		mutex_lock(&mdsc->mutex);
3538		__wake_requests(mdsc, &session->s_waiting);
3539		if (wake == 2)
3540			kick_requests(mdsc, mds);
3541		mutex_unlock(&mdsc->mutex);
3542	}
3543	if (op == CEPH_SESSION_CLOSE)
3544		ceph_put_mds_session(session);
3545	return;
3546
3547bad:
3548	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3549	       (int)msg->front.iov_len);
3550	ceph_msg_dump(msg);
3551	return;
3552}
3553
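/*
 * Drop any directory cap references (r_dir_caps) still held on the
 * request's parent directory.
 */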
3554void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
3555{
3556	int dcaps;
3557
3558	dcaps = xchg(&req->r_dir_caps, 0);
3559	if (dcaps) {
3560		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3561		ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
3562	}
3563}
3564
3565void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
3566{
3567	int dcaps;
3568
3569	dcaps = xchg(&req->r_dir_caps, 0);
3570	if (dcaps) {
3571		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3572		ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
3573						dcaps);
3574	}
3575}
3576
3577/*
3578 * called under session->mutex.
3579 */
3580static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3581				   struct ceph_mds_session *session)
3582{
3583	struct ceph_mds_request *req, *nreq;
3584	struct rb_node *p;
3585
3586	dout("replay_unsafe_requests mds%d\n", session->s_mds);
3587
3588	mutex_lock(&mdsc->mutex);
3589	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
3590		__send_request(session, req, true);
3591
3592	/*
3593	 * Also re-send old requests when the MDS enters the reconnect stage, so
3594	 * that it can process completed requests in the clientreplay stage.
3595	 */
3596	p = rb_first(&mdsc->request_tree);
3597	while (p) {
3598		req = rb_entry(p, struct ceph_mds_request, r_node);
3599		p = rb_next(p);
3600		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3601			continue;
3602		if (req->r_attempts == 0)
3603			continue; /* only old requests */
3604		if (!req->r_session)
3605			continue;
3606		if (req->r_session->s_mds != session->s_mds)
3607			continue;
3608
3609		ceph_mdsc_release_dir_caps_no_check(req);
3610
3611		__send_request(session, req, true);
3612	}
3613	mutex_unlock(&mdsc->mutex);
3614}
3615
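/*
 * The reconnect payload has grown too large for a single message: fix up
 * the placeholder counts, send what we have so far, and start a fresh
 * pagelist for the remainder.
 */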
3616static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3617{
3618	struct ceph_msg *reply;
3619	struct ceph_pagelist *_pagelist;
3620	struct page *page;
3621	__le32 *addr;
3622	int err = -ENOMEM;
3623
3624	if (!recon_state->allow_multi)
3625		return -ENOSPC;
3626
3627	/* can't handle message that contains both caps and realm */
3628	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3629
3630	/* pre-allocate new pagelist */
3631	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
3632	if (!_pagelist)
3633		return -ENOMEM;
3634
3635	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3636	if (!reply)
3637		goto fail_msg;
3638
3639	/* placeholder for nr_caps */
3640	err = ceph_pagelist_encode_32(_pagelist, 0);
3641	if (err < 0)
3642		goto fail;
3643
3644	if (recon_state->nr_caps) {
3645		/* currently encoding caps */
3646		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3647		if (err)
3648			goto fail;
3649	} else {
3650		/* placeholder for nr_realms (currently encoding relams) */
3651		/* placeholder for nr_realms (currently encoding realms) */
3652		if (err < 0)
3653			goto fail;
3654	}
3655
3656	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3657	if (err)
3658		goto fail;
3659
3660	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3661	addr = kmap_atomic(page);
3662	if (recon_state->nr_caps) {
3663		/* currently encoding caps */
3664		*addr = cpu_to_le32(recon_state->nr_caps);
3665	} else {
3666		/* currently encoding realms */
3667		*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3668	}
3669	kunmap_atomic(addr);
3670
3671	reply->hdr.version = cpu_to_le16(5);
3672	reply->hdr.compat_version = cpu_to_le16(4);
3673
3674	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3675	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3676
3677	ceph_con_send(&recon_state->session->s_con, reply);
3678	ceph_pagelist_release(recon_state->pagelist);
3679
3680	recon_state->pagelist = _pagelist;
3681	recon_state->nr_caps = 0;
3682	recon_state->nr_realms = 0;
3683	recon_state->msg_version = 5;
3684	return 0;
3685fail:
3686	ceph_msg_put(reply);
3687fail_msg:
3688	ceph_pagelist_release(_pagelist);
3689	return err;
3690}
3691
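/*
 * Find the primary-link dentry for an inode: the (non-root) alias of a
 * directory, or the alias flagged CEPH_DENTRY_PRIMARY_LINK otherwise.
 */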
3692static struct dentry *d_find_primary(struct inode *inode)
3693{
3694	struct dentry *alias, *dn = NULL;
3695
3696	if (hlist_empty(&inode->i_dentry))
3697		return NULL;
3698
3699	spin_lock(&inode->i_lock);
3700	if (hlist_empty(&inode->i_dentry))
3701		goto out_unlock;
3702
3703	if (S_ISDIR(inode->i_mode)) {
3704		alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
3705		if (!IS_ROOT(alias))
3706			dn = dget(alias);
3707		goto out_unlock;
3708	}
3709
3710	hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
3711		spin_lock(&alias->d_lock);
3712		if (!d_unhashed(alias) &&
3713		    (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
3714			dn = dget_dlock(alias);
3715		}
3716		spin_unlock(&alias->d_lock);
3717		if (dn)
3718			break;
3719	}
3720out_unlock:
3721	spin_unlock(&inode->i_lock);
3722	return dn;
3723}
3724
3725/*
3726 * Encode information about a cap for a reconnect with the MDS.
3727 */
3728static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
3729			  void *arg)
3730{
3731	union {
3732		struct ceph_mds_cap_reconnect v2;
3733		struct ceph_mds_cap_reconnect_v1 v1;
3734	} rec;
3735	struct ceph_inode_info *ci = cap->ci;
3736	struct ceph_reconnect_state *recon_state = arg;
3737	struct ceph_pagelist *pagelist = recon_state->pagelist;
3738	struct dentry *dentry;
3739	char *path;
3740	int pathlen, err;
3741	u64 pathbase;
3742	u64 snap_follows;
3743
3744	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3745	     inode, ceph_vinop(inode), cap, cap->cap_id,
3746	     ceph_cap_string(cap->issued));
3747
3748	dentry = d_find_primary(inode);
3749	if (dentry) {
3750		/* set pathbase to parent dir when msg_version >= 2 */
3751		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
3752					    recon_state->msg_version >= 2);
3753		dput(dentry);
3754		if (IS_ERR(path)) {
3755			err = PTR_ERR(path);
3756			goto out_err;
3757		}
3758	} else {
3759		path = NULL;
3760		pathlen = 0;
3761		pathbase = 0;
3762	}
3763
3764	spin_lock(&ci->i_ceph_lock);
3765	cap->seq = 0;        /* reset cap seq */
3766	cap->issue_seq = 0;  /* and issue_seq */
3767	cap->mseq = 0;       /* and migrate_seq */
3768	cap->cap_gen = atomic_read(&cap->session->s_cap_gen);
3769
3770	/* These are lost when the session goes away */
3771	if (S_ISDIR(inode->i_mode)) {
3772		if (cap->issued & CEPH_CAP_DIR_CREATE) {
3773			ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
3774			memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
3775		}
3776		cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
3777	}
3778
3779	if (recon_state->msg_version >= 2) {
3780		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3781		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3782		rec.v2.issued = cpu_to_le32(cap->issued);
3783		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3784		rec.v2.pathbase = cpu_to_le64(pathbase);
3785		rec.v2.flock_len = (__force __le32)
3786			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
3787	} else {
3788		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3789		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3790		rec.v1.issued = cpu_to_le32(cap->issued);
3791		rec.v1.size = cpu_to_le64(i_size_read(inode));
3792		ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
3793		ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
3794		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3795		rec.v1.pathbase = cpu_to_le64(pathbase);
3796	}
3797
3798	if (list_empty(&ci->i_cap_snaps)) {
3799		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3800	} else {
3801		struct ceph_cap_snap *capsnap =
3802			list_first_entry(&ci->i_cap_snaps,
3803					 struct ceph_cap_snap, ci_item);
3804		snap_follows = capsnap->follows;
3805	}
3806	spin_unlock(&ci->i_ceph_lock);
3807
3808	if (recon_state->msg_version >= 2) {
3809		int num_fcntl_locks, num_flock_locks;
3810		struct ceph_filelock *flocks = NULL;
3811		size_t struct_len, total_len = sizeof(u64);
3812		u8 struct_v = 0;
3813
3814encode_again:
3815		if (rec.v2.flock_len) {
3816			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
3817		} else {
3818			num_fcntl_locks = 0;
3819			num_flock_locks = 0;
3820		}
3821		if (num_fcntl_locks + num_flock_locks > 0) {
3822			flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
3823					       sizeof(struct ceph_filelock),
3824					       GFP_NOFS);
3825			if (!flocks) {
3826				err = -ENOMEM;
3827				goto out_err;
3828			}
3829			err = ceph_encode_locks_to_buffer(inode, flocks,
3830							  num_fcntl_locks,
3831							  num_flock_locks);
3832			if (err) {
3833				kfree(flocks);
3834				flocks = NULL;
3835				if (err == -ENOSPC)
3836					goto encode_again;
3837				goto out_err;
3838			}
3839		} else {
3840			kfree(flocks);
3841			flocks = NULL;
3842		}
3843
3844		if (recon_state->msg_version >= 3) {
3845			/* version, compat_version and struct_len */
3846			total_len += 2 * sizeof(u8) + sizeof(u32);
3847			struct_v = 2;
3848		}
3849		/*
3850		 * number of encoded locks is stable, so copy to pagelist
3851		 */
3852		struct_len = 2 * sizeof(u32) +
3853			    (num_fcntl_locks + num_flock_locks) *
3854			    sizeof(struct ceph_filelock);
3855		rec.v2.flock_len = cpu_to_le32(struct_len);
3856
3857		struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);
3858
3859		if (struct_v >= 2)
3860			struct_len += sizeof(u64); /* snap_follows */
3861
3862		total_len += struct_len;
3863
3864		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
3865			err = send_reconnect_partial(recon_state);
3866			if (err)
3867				goto out_freeflocks;
3868			pagelist = recon_state->pagelist;
3869		}
3870
3871		err = ceph_pagelist_reserve(pagelist, total_len);
3872		if (err)
3873			goto out_freeflocks;
3874
3875		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3876		if (recon_state->msg_version >= 3) {
3877			ceph_pagelist_encode_8(pagelist, struct_v);
3878			ceph_pagelist_encode_8(pagelist, 1);
3879			ceph_pagelist_encode_32(pagelist, struct_len);
3880		}
3881		ceph_pagelist_encode_string(pagelist, path, pathlen);
3882		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3883		ceph_locks_to_pagelist(flocks, pagelist,
3884				       num_fcntl_locks, num_flock_locks);
3885		if (struct_v >= 2)
3886			ceph_pagelist_encode_64(pagelist, snap_follows);
3887out_freeflocks:
3888		kfree(flocks);
3889	} else {
3890		err = ceph_pagelist_reserve(pagelist,
3891					    sizeof(u64) + sizeof(u32) +
3892					    pathlen + sizeof(rec.v1));
3893		if (err)
3894			goto out_err;
3895
3896		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3897		ceph_pagelist_encode_string(pagelist, path, pathlen);
3898		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3899	}
3900
3901out_err:
3902	ceph_mdsc_free_path(path, pathlen);
3903	if (!err)
3904		recon_state->nr_caps++;
3905	return err;
3906}
3907
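/*
 * Encode a record (ino, seq, parent) for each snap realm we know about
 * into the reconnect pagelist, splitting into a partial message (for
 * v4+ encodings) when RECONNECT_MAX_SIZE would be exceeded.
 */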
3908static int encode_snap_realms(struct ceph_mds_client *mdsc,
3909			      struct ceph_reconnect_state *recon_state)
3910{
3911	struct rb_node *p;
3912	struct ceph_pagelist *pagelist = recon_state->pagelist;
3913	int err = 0;
3914
3915	if (recon_state->msg_version >= 4) {
3916		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
3917		if (err < 0)
3918			goto fail;
3919	}
3920
3921	/*
3922	 * snaprealms.  we provide mds with the ino, seq (version), and
3923	 * parent for all of our realms.  If the mds has any newer info,
3924	 * it will tell us.
3925	 */
3926	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3927		struct ceph_snap_realm *realm =
3928		       rb_entry(p, struct ceph_snap_realm, node);
3929		struct ceph_mds_snaprealm_reconnect sr_rec;
3930
3931		if (recon_state->msg_version >= 4) {
3932			size_t need = sizeof(u8) * 2 + sizeof(u32) +
3933				      sizeof(sr_rec);
3934
3935			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
3936				err = send_reconnect_partial(recon_state);
3937				if (err)
3938					goto fail;
3939				pagelist = recon_state->pagelist;
3940			}
3941
3942			err = ceph_pagelist_reserve(pagelist, need);
3943			if (err)
3944				goto fail;
3945
3946			ceph_pagelist_encode_8(pagelist, 1);
3947			ceph_pagelist_encode_8(pagelist, 1);
3948			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
3949		}
3950
3951		dout(" adding snap realm %llx seq %lld parent %llx\n",
3952		     realm->ino, realm->seq, realm->parent_ino);
3953		sr_rec.ino = cpu_to_le64(realm->ino);
3954		sr_rec.seq = cpu_to_le64(realm->seq);
3955		sr_rec.parent = cpu_to_le64(realm->parent_ino);
3956
3957		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3958		if (err)
3959			goto fail;
3960
3961		recon_state->nr_realms++;
3962	}
3963fail:
3964	return err;
3965}
3966
3967
3968/*
3969 * If an MDS fails and recovers, clients need to reconnect in order to
3970 * reestablish shared state.  This includes all caps issued through
3971 * this session _and_ the snap_realm hierarchy.  Because it's not
3972 * clear which snap realms the mds cares about, we send everything we
3973 * know about.. that ensures we'll then get any new info the
3974 * recovering MDS might have.
3975 *
3976 * This is a relatively heavyweight operation, but it's rare.
3977 */
3978static void send_mds_reconnect(struct ceph_mds_client *mdsc,
3979			       struct ceph_mds_session *session)
3980{
3981	struct ceph_msg *reply;
3982	int mds = session->s_mds;
3983	int err = -ENOMEM;
3984	struct ceph_reconnect_state recon_state = {
3985		.session = session,
3986	};
3987	LIST_HEAD(dispose);
3988
3989	pr_info("mds%d reconnect start\n", mds);
3990
3991	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
3992	if (!recon_state.pagelist)
3993		goto fail_nopagelist;
3994
3995	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3996	if (!reply)
3997		goto fail_nomsg;
3998
3999	xa_destroy(&session->s_delegated_inos);
4000
4001	mutex_lock(&session->s_mutex);
4002	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
4003	session->s_seq = 0;
4004
4005	dout("session %p state %s\n", session,
4006	     ceph_session_state_name(session->s_state));
4007
4008	atomic_inc(&session->s_cap_gen);
4009
4010	spin_lock(&session->s_cap_lock);
4011	/* don't know if session is readonly */
4012	session->s_readonly = 0;
4013	/*
4014	 * notify __ceph_remove_cap() that we are composing cap reconnect.
4015	 * If a cap gets released before being added to the cap reconnect,
4016	 * __ceph_remove_cap() should skip queuing cap release.
4017	 */
4018	session->s_cap_reconnect = 1;
4019	/* drop old cap expires; we're about to reestablish that state */
4020	detach_cap_releases(session, &dispose);
4021	spin_unlock(&session->s_cap_lock);
4022	dispose_cap_releases(mdsc, &dispose);
4023
4024	/* trim unused caps to reduce MDS's cache rejoin time */
4025	if (mdsc->fsc->sb->s_root)
4026		shrink_dcache_parent(mdsc->fsc->sb->s_root);
4027
4028	ceph_con_close(&session->s_con);
4029	ceph_con_open(&session->s_con,
4030		      CEPH_ENTITY_TYPE_MDS, mds,
4031		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
4032
4033	/* replay unsafe requests */
4034	replay_unsafe_requests(mdsc, session);
4035
4036	ceph_early_kick_flushing_caps(mdsc, session);
4037
4038	down_read(&mdsc->snap_rwsem);
4039
4040	/* placeholder for nr_caps */
4041	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
4042	if (err)
4043		goto fail;
4044
4045	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
4046		recon_state.msg_version = 3;
4047		recon_state.allow_multi = true;
4048	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4049		recon_state.msg_version = 3;
4050	} else {
4051		recon_state.msg_version = 2;
4052	}
4053	/* traverse this session's caps */
4054	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
4055
4056	spin_lock(&session->s_cap_lock);
4057	session->s_cap_reconnect = 0;
4058	spin_unlock(&session->s_cap_lock);
4059
4060	if (err < 0)
4061		goto fail;
4062
4063	/* check if all realms can be encoded into current message */
4064	if (mdsc->num_snap_realms) {
4065		size_t total_len =
4066			recon_state.pagelist->length +
4067			mdsc->num_snap_realms *
4068			sizeof(struct ceph_mds_snaprealm_reconnect);
4069		if (recon_state.msg_version >= 4) {
4070			/* number of realms */
4071			total_len += sizeof(u32);
4072			/* version, compat_version and struct_len */
4073			total_len += mdsc->num_snap_realms *
4074				     (2 * sizeof(u8) + sizeof(u32));
4075		}
4076		if (total_len > RECONNECT_MAX_SIZE) {
4077			if (!recon_state.allow_multi) {
4078				err = -ENOSPC;
4079				goto fail;
4080			}
4081			if (recon_state.nr_caps) {
4082				err = send_reconnect_partial(&recon_state);
4083				if (err)
4084					goto fail;
4085			}
4086			recon_state.msg_version = 5;
4087		}
4088	}
4089
4090	err = encode_snap_realms(mdsc, &recon_state);
4091	if (err < 0)
4092		goto fail;
4093
4094	if (recon_state.msg_version >= 5) {
4095		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
4096		if (err < 0)
4097			goto fail;
4098	}
4099
4100	if (recon_state.nr_caps || recon_state.nr_realms) {
4101		struct page *page =
4102			list_first_entry(&recon_state.pagelist->head,
4103					struct page, lru);
4104		__le32 *addr = kmap_atomic(page);
4105		if (recon_state.nr_caps) {
4106			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
4107			*addr = cpu_to_le32(recon_state.nr_caps);
4108		} else if (recon_state.msg_version >= 4) {
4109			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
4110		}
4111		kunmap_atomic(addr);
4112	}
4113
4114	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
4115	if (recon_state.msg_version >= 4)
4116		reply->hdr.compat_version = cpu_to_le16(4);
4117
4118	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
4119	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
4120
4121	ceph_con_send(&session->s_con, reply);
4122
4123	mutex_unlock(&session->s_mutex);
4124
4125	mutex_lock(&mdsc->mutex);
4126	__wake_requests(mdsc, &session->s_waiting);
4127	mutex_unlock(&mdsc->mutex);
4128
4129	up_read(&mdsc->snap_rwsem);
4130	ceph_pagelist_release(recon_state.pagelist);
4131	return;
4132
4133fail:
4134	ceph_msg_put(reply);
4135	up_read(&mdsc->snap_rwsem);
4136	mutex_unlock(&session->s_mutex);
4137fail_nomsg:
4138	ceph_pagelist_release(recon_state.pagelist);
4139fail_nopagelist:
4140	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
4141	return;
4142}
4143
4144
4145/*
4146 * compare old and new mdsmaps, kicking requests
4147 * and closing out old connections as necessary
4148 *
4149 * called under mdsc->mutex.
4150 */
4151static void check_new_map(struct ceph_mds_client *mdsc,
4152			  struct ceph_mdsmap *newmap,
4153			  struct ceph_mdsmap *oldmap)
4154{
4155	int i;
4156	int oldstate, newstate;
4157	struct ceph_mds_session *s;
4158
4159	dout("check_new_map new %u old %u\n",
4160	     newmap->m_epoch, oldmap->m_epoch);
4161
4162	for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4163		if (!mdsc->sessions[i])
4164			continue;
4165		s = mdsc->sessions[i];
4166		oldstate = ceph_mdsmap_get_state(oldmap, i);
4167		newstate = ceph_mdsmap_get_state(newmap, i);
4168
4169		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
4170		     i, ceph_mds_state_name(oldstate),
4171		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
4172		     ceph_mds_state_name(newstate),
4173		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
4174		     ceph_session_state_name(s->s_state));
4175
4176		if (i >= newmap->possible_max_rank) {
4177			/* force close session for stopped mds */
4178			ceph_get_mds_session(s);
4179			__unregister_session(mdsc, s);
4180			__wake_requests(mdsc, &s->s_waiting);
4181			mutex_unlock(&mdsc->mutex);
4182
4183			mutex_lock(&s->s_mutex);
4184			cleanup_session_requests(mdsc, s);
4185			remove_session_caps(s);
4186			mutex_unlock(&s->s_mutex);
4187
4188			ceph_put_mds_session(s);
4189
4190			mutex_lock(&mdsc->mutex);
4191			kick_requests(mdsc, i);
4192			continue;
4193		}
4194
4195		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
4196			   ceph_mdsmap_get_addr(newmap, i),
4197			   sizeof(struct ceph_entity_addr))) {
4198			/* just close it */
4199			mutex_unlock(&mdsc->mutex);
4200			mutex_lock(&s->s_mutex);
4201			mutex_lock(&mdsc->mutex);
4202			ceph_con_close(&s->s_con);
4203			mutex_unlock(&s->s_mutex);
4204			s->s_state = CEPH_MDS_SESSION_RESTARTING;
4205		} else if (oldstate == newstate) {
4206			continue;  /* nothing new with this mds */
4207		}
4208
4209		/*
4210		 * send reconnect?
4211		 */
4212		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
4213		    newstate >= CEPH_MDS_STATE_RECONNECT) {
4214			mutex_unlock(&mdsc->mutex);
4215			send_mds_reconnect(mdsc, s);
4216			mutex_lock(&mdsc->mutex);
4217		}
4218
4219		/*
4220		 * kick requests on any mds that has gone active.
4221		 */
4222		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
4223		    newstate >= CEPH_MDS_STATE_ACTIVE) {
4224			if (oldstate != CEPH_MDS_STATE_CREATING &&
4225			    oldstate != CEPH_MDS_STATE_STARTING)
4226				pr_info("mds%d recovery completed\n", s->s_mds);
4227			kick_requests(mdsc, i);
4228			mutex_unlock(&mdsc->mutex);
4229			mutex_lock(&s->s_mutex);
4230			mutex_lock(&mdsc->mutex);
4231			ceph_kick_flushing_caps(mdsc, s);
4232			mutex_unlock(&s->s_mutex);
4233			wake_up_session_caps(s, RECONNECT);
4234		}
4235	}
4236
4237	for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4238		s = mdsc->sessions[i];
4239		if (!s)
4240			continue;
4241		if (!ceph_mdsmap_is_laggy(newmap, i))
4242			continue;
4243		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4244		    s->s_state == CEPH_MDS_SESSION_HUNG ||
4245		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
4246			dout(" connecting to export targets of laggy mds%d\n",
4247			     i);
4248			__open_export_target_sessions(mdsc, s);
4249		}
4250	}
4251}
4252
4253
4254
4255/*
4256 * leases
4257 */
4258
4259/*
4260 * caller must hold session s_mutex, dentry->d_lock
4261 */
4262void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
4263{
4264	struct ceph_dentry_info *di = ceph_dentry(dentry);
4265
4266	ceph_put_mds_session(di->lease_session);
4267	di->lease_session = NULL;
4268}
4269
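/*
 * Handle a lease message from the MDS: revoke or renew the dentry
 * lease it refers to, and ack revocations by reusing the incoming
 * message.
 */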
4270static void handle_lease(struct ceph_mds_client *mdsc,
4271			 struct ceph_mds_session *session,
4272			 struct ceph_msg *msg)
4273{
4274	struct super_block *sb = mdsc->fsc->sb;
4275	struct inode *inode;
4276	struct dentry *parent, *dentry;
4277	struct ceph_dentry_info *di;
4278	int mds = session->s_mds;
4279	struct ceph_mds_lease *h = msg->front.iov_base;
4280	u32 seq;
4281	struct ceph_vino vino;
4282	struct qstr dname;
4283	int release = 0;
4284
4285	dout("handle_lease from mds%d\n", mds);
4286
4287	/* decode */
4288	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
4289		goto bad;
4290	vino.ino = le64_to_cpu(h->ino);
4291	vino.snap = CEPH_NOSNAP;
4292	seq = le32_to_cpu(h->seq);
4293	dname.len = get_unaligned_le32(h + 1);
4294	if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
4295		goto bad;
4296	dname.name = (void *)(h + 1) + sizeof(u32);
4297
4298	/* lookup inode */
4299	inode = ceph_find_inode(sb, vino);
4300	dout("handle_lease %s, ino %llx %p %.*s\n",
4301	     ceph_lease_op_name(h->action), vino.ino, inode,
4302	     dname.len, dname.name);
4303
4304	mutex_lock(&session->s_mutex);
4305	inc_session_sequence(session);
4306
4307	if (!inode) {
4308		dout("handle_lease no inode %llx\n", vino.ino);
4309		goto release;
4310	}
4311
4312	/* dentry */
4313	parent = d_find_alias(inode);
4314	if (!parent) {
4315		dout("no parent dentry on inode %p\n", inode);
4316		WARN_ON(1);
4317		goto release;  /* hrm... */
4318	}
4319	dname.hash = full_name_hash(parent, dname.name, dname.len);
4320	dentry = d_lookup(parent, &dname);
4321	dput(parent);
4322	if (!dentry)
4323		goto release;
4324
4325	spin_lock(&dentry->d_lock);
4326	di = ceph_dentry(dentry);
4327	switch (h->action) {
4328	case CEPH_MDS_LEASE_REVOKE:
4329		if (di->lease_session == session) {
4330			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
4331				h->seq = cpu_to_le32(di->lease_seq);
4332			__ceph_mdsc_drop_dentry_lease(dentry);
4333		}
4334		release = 1;
4335		break;
4336
4337	case CEPH_MDS_LEASE_RENEW:
4338		if (di->lease_session == session &&
4339		    di->lease_gen == atomic_read(&session->s_cap_gen) &&
4340		    di->lease_renew_from &&
4341		    di->lease_renew_after == 0) {
4342			unsigned long duration =
4343				msecs_to_jiffies(le32_to_cpu(h->duration_ms));
4344
4345			di->lease_seq = seq;
4346			di->time = di->lease_renew_from + duration;
4347			di->lease_renew_after = di->lease_renew_from +
4348				(duration >> 1);
4349			di->lease_renew_from = 0;
4350		}
4351		break;
4352	}
4353	spin_unlock(&dentry->d_lock);
4354	dput(dentry);
4355
4356	if (!release)
4357		goto out;
4358
4359release:
4360	/* let's just reuse the same message */
4361	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
4362	ceph_msg_get(msg);
4363	ceph_con_send(&session->s_con, msg);
4364
4365out:
4366	mutex_unlock(&session->s_mutex);
4367	iput(inode);
4368	return;
4369
4370bad:
4371	pr_err("corrupt lease message\n");
4372	ceph_msg_dump(msg);
4373}
4374
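/*
 * Build and send a lease message (e.g. a preemptive RELEASE) for the
 * given dentry to the MDS session.
 */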
4375void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
4376			      struct dentry *dentry, char action,
4377			      u32 seq)
4378{
4379	struct ceph_msg *msg;
4380	struct ceph_mds_lease *lease;
4381	struct inode *dir;
4382	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
4383
4384	dout("lease_send_msg dentry %p %s to mds%d\n",
4385	     dentry, ceph_lease_op_name(action), session->s_mds);
4386
4387	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
4388	if (!msg)
4389		return;
4390	lease = msg->front.iov_base;
4391	lease->action = action;
4392	lease->seq = cpu_to_le32(seq);
4393
4394	spin_lock(&dentry->d_lock);
4395	dir = d_inode(dentry->d_parent);
4396	lease->ino = cpu_to_le64(ceph_ino(dir));
4397	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
4398
4399	put_unaligned_le32(dentry->d_name.len, lease + 1);
4400	memcpy((void *)(lease + 1) + 4,
4401	       dentry->d_name.name, dentry->d_name.len);
4402	spin_unlock(&dentry->d_lock);
4403	/*
4404	 * if this is a preemptive lease RELEASE, no need to
4405	 * flush request stream, since the actual request will
4406	 * soon follow.
4407	 */
4408	msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
4409
4410	ceph_con_send(&session->s_con, msg);
4411}
4412
4413/*
4414 * lock and unlock each session, to wait for ongoing session activity to finish
4415 */
4416static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
4417{
4418	int i;
4419
4420	mutex_lock(&mdsc->mutex);
4421	for (i = 0; i < mdsc->max_sessions; i++) {
4422		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4423		if (!s)
4424			continue;
4425		mutex_unlock(&mdsc->mutex);
4426		mutex_lock(&s->s_mutex);
4427		mutex_unlock(&s->s_mutex);
4428		ceph_put_mds_session(s);
4429		mutex_lock(&mdsc->mutex);
4430	}
4431	mutex_unlock(&mdsc->mutex);
4432}
4433
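/*
 * If we have been blocklisted and the CLEANRECOVER mount option is
 * set, automatically force a reconnect to the cluster.
 */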
4434static void maybe_recover_session(struct ceph_mds_client *mdsc)
4435{
4436	struct ceph_fs_client *fsc = mdsc->fsc;
4437
4438	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
4439		return;
4440
4441	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
4442		return;
4443
4444	if (!READ_ONCE(fsc->blocklisted))
4445		return;
4446
4447	pr_info("auto reconnect after blocklisted\n");
4448	ceph_force_reconnect(fsc->sb);
4449}
4450
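/*
 * Return true if the session is still usable (marking an OPEN session
 * HUNG once its ttl expires); return false for sessions that are new,
 * closing, closed, restarting or rejected.
 */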
4451bool check_session_state(struct ceph_mds_session *s)
4452{
4453	switch (s->s_state) {
4454	case CEPH_MDS_SESSION_OPEN:
4455		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
4456			s->s_state = CEPH_MDS_SESSION_HUNG;
4457			pr_info("mds%d hung\n", s->s_mds);
4458		}
4459		break;
4460	case CEPH_MDS_SESSION_CLOSING:
4461		/* Should never reach this when we're unmounting */
4462		WARN_ON_ONCE(s->s_ttl);
4463		fallthrough;
4464	case CEPH_MDS_SESSION_NEW:
4465	case CEPH_MDS_SESSION_RESTARTING:
4466	case CEPH_MDS_SESSION_CLOSED:
4467	case CEPH_MDS_SESSION_REJECTED:
4468		return false;
4469	}
4470
4471	return true;
4472}
4473
4474/*
4475 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
4476 * then we need to retransmit that request.
4477 */
4478void inc_session_sequence(struct ceph_mds_session *s)
4479{
4480	lockdep_assert_held(&s->s_mutex);
4481
4482	s->s_seq++;
4483
4484	if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4485		int ret;
4486
4487		dout("resending session close request for mds%d\n", s->s_mds);
4488		ret = request_close_session(s);
4489		if (ret < 0)
4490			pr_err("unable to close session to mds%d: %d\n",
4491			       s->s_mds, ret);
4492	}
4493}
4494
4495/*
4496 * delayed work -- periodically trim expired leases, renew caps with mds.  If
4497 * the @delay parameter is set to 0 or if it's more than 5 secs, the default
4498 * workqueue delay value of 5 secs will be used.
4499 */
4500static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
4501{
4502	unsigned long max_delay = HZ * 5;
4503
4504	/* 5 secs default delay */
4505	if (!delay || (delay > max_delay))
4506		delay = max_delay;
4507	schedule_delayed_work(&mdsc->delayed_work,
4508			      round_jiffies_relative(delay));
4509}
4510
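/*
 * Periodic housekeeping: renew caps or send a keepalive on each
 * session, send queued cap releases, flush delayed caps, trim the
 * snapid map and re-arm the work.
 */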
4511static void delayed_work(struct work_struct *work)
4512{
4513	struct ceph_mds_client *mdsc =
4514		container_of(work, struct ceph_mds_client, delayed_work.work);
4515	unsigned long delay;
4516	int renew_interval;
4517	int renew_caps;
4518	int i;
4519
4520	dout("mdsc delayed_work\n");
4521
4522	if (mdsc->stopping)
4523		return;
4524
4525	mutex_lock(&mdsc->mutex);
4526	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
4527	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
4528				   mdsc->last_renew_caps);
4529	if (renew_caps)
4530		mdsc->last_renew_caps = jiffies;
4531
4532	for (i = 0; i < mdsc->max_sessions; i++) {
4533		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4534		if (!s)
4535			continue;
4536
4537		if (!check_session_state(s)) {
4538			ceph_put_mds_session(s);
4539			continue;
4540		}
4541		mutex_unlock(&mdsc->mutex);
4542
4543		mutex_lock(&s->s_mutex);
4544		if (renew_caps)
4545			send_renew_caps(mdsc, s);
4546		else
4547			ceph_con_keepalive(&s->s_con);
4548		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4549		    s->s_state == CEPH_MDS_SESSION_HUNG)
4550			ceph_send_cap_releases(mdsc, s);
4551		mutex_unlock(&s->s_mutex);
4552		ceph_put_mds_session(s);
4553
4554		mutex_lock(&mdsc->mutex);
4555	}
4556	mutex_unlock(&mdsc->mutex);
4557
4558	delay = ceph_check_delayed_caps(mdsc);
4559
4560	ceph_queue_cap_reclaim_work(mdsc);
4561
4562	ceph_trim_snapid_map(mdsc);
4563
4564	maybe_recover_session(mdsc);
4565
4566	schedule_delayed(mdsc, delay);
4567}
4568
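/*
 * Allocate and initialize the mds client state for this fs client.
 */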
4569int ceph_mdsc_init(struct ceph_fs_client *fsc)
4571{
4572	struct ceph_mds_client *mdsc;
4573	int err;
4574
4575	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
4576	if (!mdsc)
4577		return -ENOMEM;
4578	mdsc->fsc = fsc;
4579	mutex_init(&mdsc->mutex);
4580	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
4581	if (!mdsc->mdsmap) {
4582		err = -ENOMEM;
4583		goto err_mdsc;
4584	}
4585
4586	init_completion(&mdsc->safe_umount_waiters);
4587	init_waitqueue_head(&mdsc->session_close_wq);
4588	INIT_LIST_HEAD(&mdsc->waiting_for_map);
4589	mdsc->sessions = NULL;
4590	atomic_set(&mdsc->num_sessions, 0);
4591	mdsc->max_sessions = 0;
4592	mdsc->stopping = 0;
4593	atomic64_set(&mdsc->quotarealms_count, 0);
4594	mdsc->quotarealms_inodes = RB_ROOT;
4595	mutex_init(&mdsc->quotarealms_inodes_mutex);
4596	mdsc->last_snap_seq = 0;
4597	init_rwsem(&mdsc->snap_rwsem);
4598	mdsc->snap_realms = RB_ROOT;
4599	INIT_LIST_HEAD(&mdsc->snap_empty);
4600	mdsc->num_snap_realms = 0;
4601	spin_lock_init(&mdsc->snap_empty_lock);
4602	mdsc->last_tid = 0;
4603	mdsc->oldest_tid = 0;
4604	mdsc->request_tree = RB_ROOT;
4605	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
4606	mdsc->last_renew_caps = jiffies;
4607	INIT_LIST_HEAD(&mdsc->cap_delay_list);
4608	INIT_LIST_HEAD(&mdsc->cap_wait_list);
4609	spin_lock_init(&mdsc->cap_delay_lock);
4610	INIT_LIST_HEAD(&mdsc->snap_flush_list);
4611	spin_lock_init(&mdsc->snap_flush_lock);
4612	mdsc->last_cap_flush_tid = 1;
4613	INIT_LIST_HEAD(&mdsc->cap_flush_list);
4614	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
4615	mdsc->num_cap_flushing = 0;
4616	spin_lock_init(&mdsc->cap_dirty_lock);
4617	init_waitqueue_head(&mdsc->cap_flushing_wq);
4618	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
4619	atomic_set(&mdsc->cap_reclaim_pending, 0);
4620	err = ceph_metric_init(&mdsc->metric);
4621	if (err)
4622		goto err_mdsmap;
4623
4624	spin_lock_init(&mdsc->dentry_list_lock);
4625	INIT_LIST_HEAD(&mdsc->dentry_leases);
4626	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
4627
4628	ceph_caps_init(mdsc);
4629	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
4630
4631	spin_lock_init(&mdsc->snapid_map_lock);
4632	mdsc->snapid_map_tree = RB_ROOT;
4633	INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4634
4635	init_rwsem(&mdsc->pool_perm_rwsem);
4636	mdsc->pool_perm_tree = RB_ROOT;
4637
4638	strscpy(mdsc->nodename, utsname()->nodename,
4639		sizeof(mdsc->nodename));
4640
4641	fsc->mdsc = mdsc;
4642	return 0;
4643
4644err_mdsmap:
4645	kfree(mdsc->mdsmap);
4646err_mdsc:
4647	kfree(mdsc);
4648	return err;
4649}
4650
4651/*
4652 * Wait for safe replies on open mds requests.  If we time out, drop
4653 * all requests from the tree to avoid dangling dentry refs.
4654 */
4655static void wait_requests(struct ceph_mds_client *mdsc)
4656{
4657	struct ceph_options *opts = mdsc->fsc->client->options;
4658	struct ceph_mds_request *req;
4659
4660	mutex_lock(&mdsc->mutex);
4661	if (__get_oldest_req(mdsc)) {
4662		mutex_unlock(&mdsc->mutex);
4663
4664		dout("wait_requests waiting for requests\n");
4665		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
4666				    ceph_timeout_jiffies(opts->mount_timeout));
4667
4668		/* tear down remaining requests */
4669		mutex_lock(&mdsc->mutex);
4670		while ((req = __get_oldest_req(mdsc))) {
4671			dout("wait_requests timed out on tid %llu\n",
4672			     req->r_tid);
4673			list_del_init(&req->r_wait);
4674			__unregister_request(mdsc, req);
4675		}
4676	}
4677	mutex_unlock(&mdsc->mutex);
4678	dout("wait_requests done\n");
4679}
4680
4681/*
4682 * called before mount is ro, and before dentries are torn down.
4683 * (hmm, does this still race with new lookups?)
4684 */
4685void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
4686{
4687	dout("pre_umount\n");
4688	mdsc->stopping = 1;
4689
4690	lock_unlock_sessions(mdsc);
4691	ceph_flush_dirty_caps(mdsc);
4692	wait_requests(mdsc);
4693
4694	/*
4695	 * wait for reply handlers to drop their request refs and
4696	 * their inode/dcache refs
4697	 */
4698	ceph_msgr_flush();
4699
4700	ceph_cleanup_quotarealms_inodes(mdsc);
4701}
4702
4703/*
4704 * wait for all write mds requests to flush.
4705 */
4706static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
4707{
4708	struct ceph_mds_request *req = NULL, *nextreq;
4709	struct rb_node *n;
4710
4711	mutex_lock(&mdsc->mutex);
4712	dout("wait_unsafe_requests want %lld\n", want_tid);
4713restart:
4714	req = __get_oldest_req(mdsc);
4715	while (req && req->r_tid <= want_tid) {
4716		/* find next request */
4717		n = rb_next(&req->r_node);
4718		if (n)
4719			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
4720		else
4721			nextreq = NULL;
4722		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
4723		    (req->r_op & CEPH_MDS_OP_WRITE)) {
4724			/* write op */
4725			ceph_mdsc_get_request(req);
4726			if (nextreq)
4727				ceph_mdsc_get_request(nextreq);
4728			mutex_unlock(&mdsc->mutex);
4729			dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
4730			     req->r_tid, want_tid);
4731			wait_for_completion(&req->r_safe_completion);
4732			mutex_lock(&mdsc->mutex);
4733			ceph_mdsc_put_request(req);
4734			if (!nextreq)
4735				break;  /* next request didn't exist, so we're done! */
4736			if (RB_EMPTY_NODE(&nextreq->r_node)) {
4737				/* next request was removed from tree */
4738				ceph_mdsc_put_request(nextreq);
4739				goto restart;
4740			}
4741			ceph_mdsc_put_request(nextreq);  /* won't go away */
4742		}
4743		req = nextreq;
4744	}
4745	mutex_unlock(&mdsc->mutex);
4746	dout("wait_unsafe_requests done\n");
4747}
4748
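/*
 * Flush dirty caps and wait for all outstanding write requests and cap
 * flushes (up to this point) to be acknowledged by the MDS.
 */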
4749void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
4750{
4751	u64 want_tid, want_flush;
4752
4753	if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
4754		return;
4755
4756	dout("sync\n");
4757	mutex_lock(&mdsc->mutex);
4758	want_tid = mdsc->last_tid;
4759	mutex_unlock(&mdsc->mutex);
4760
4761	ceph_flush_dirty_caps(mdsc);
4762	spin_lock(&mdsc->cap_dirty_lock);
4763	want_flush = mdsc->last_cap_flush_tid;
4764	if (!list_empty(&mdsc->cap_flush_list)) {
4765		struct ceph_cap_flush *cf =
4766			list_last_entry(&mdsc->cap_flush_list,
4767					struct ceph_cap_flush, g_list);
4768		cf->wake = true;
4769	}
4770	spin_unlock(&mdsc->cap_dirty_lock);
4771
4772	dout("sync want tid %lld flush_seq %lld\n",
4773	     want_tid, want_flush);
4774
4775	wait_unsafe_requests(mdsc, want_tid);
4776	wait_caps_flush(mdsc, want_flush);
4777}
4778
4779/*
4780 * true if all sessions are closed, or we force unmount
4781 */
4782static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
4783{
4784	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4785		return true;
4786	return atomic_read(&mdsc->num_sessions) <= skipped;
4787}
4788
4789/*
4790 * called after sb is ro.
4791 */
4792void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
4793{
4794	struct ceph_options *opts = mdsc->fsc->client->options;
4795	struct ceph_mds_session *session;
4796	int i;
4797	int skipped = 0;
4798
4799	dout("close_sessions\n");
4800
4801	/* close sessions */
4802	mutex_lock(&mdsc->mutex);
4803	for (i = 0; i < mdsc->max_sessions; i++) {
4804		session = __ceph_lookup_mds_session(mdsc, i);
4805		if (!session)
4806			continue;
4807		mutex_unlock(&mdsc->mutex);
4808		mutex_lock(&session->s_mutex);
4809		if (__close_session(mdsc, session) <= 0)
4810			skipped++;
4811		mutex_unlock(&session->s_mutex);
4812		ceph_put_mds_session(session);
4813		mutex_lock(&mdsc->mutex);
4814	}
4815	mutex_unlock(&mdsc->mutex);
4816
4817	dout("waiting for sessions to close\n");
4818	wait_event_timeout(mdsc->session_close_wq,
4819			   done_closing_sessions(mdsc, skipped),
4820			   ceph_timeout_jiffies(opts->mount_timeout));
4821
4822	/* tear down remaining sessions */
4823	mutex_lock(&mdsc->mutex);
4824	for (i = 0; i < mdsc->max_sessions; i++) {
4825		if (mdsc->sessions[i]) {
4826			session = ceph_get_mds_session(mdsc->sessions[i]);
4827			__unregister_session(mdsc, session);
4828			mutex_unlock(&mdsc->mutex);
4829			mutex_lock(&session->s_mutex);
4830			remove_session_caps(session);
4831			mutex_unlock(&session->s_mutex);
4832			ceph_put_mds_session(session);
4833			mutex_lock(&mdsc->mutex);
4834		}
4835	}
4836	WARN_ON(!list_empty(&mdsc->cap_delay_list));
4837	mutex_unlock(&mdsc->mutex);
4838
4839	ceph_cleanup_snapid_map(mdsc);
4840	ceph_cleanup_empty_realms(mdsc);
4841
4842	cancel_work_sync(&mdsc->cap_reclaim_work);
4843	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4844
4845	dout("stopped\n");
4846}
4847
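/*
 * Force-close all sessions and kick any waiting requests; used when
 * the mount is being forcibly torn down.
 */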
4848void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
4849{
4850	struct ceph_mds_session *session;
4851	int mds;
4852
4853	dout("force umount\n");
4854
4855	mutex_lock(&mdsc->mutex);
4856	for (mds = 0; mds < mdsc->max_sessions; mds++) {
4857		session = __ceph_lookup_mds_session(mdsc, mds);
4858		if (!session)
4859			continue;
4860
4861		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
4862			__unregister_session(mdsc, session);
4863		__wake_requests(mdsc, &session->s_waiting);
4864		mutex_unlock(&mdsc->mutex);
4865
4866		mutex_lock(&session->s_mutex);
4867		__close_session(mdsc, session);
4868		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
4869			cleanup_session_requests(mdsc, session);
4870			remove_session_caps(session);
4871		}
4872		mutex_unlock(&session->s_mutex);
4873		ceph_put_mds_session(session);
4874
4875		mutex_lock(&mdsc->mutex);
4876		kick_requests(mdsc, mds);
4877	}
4878	__wake_requests(mdsc, &mdsc->waiting_for_map);
4879	mutex_unlock(&mdsc->mutex);
4880}
4881
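/*
 * Stop the delayed work and release the mdsmap, session array, cap
 * state and pool permission cache.
 */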
4882static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
4883{
4884	dout("stop\n");
4885	/*
4886	 * Make sure the delayed work stopped before releasing
4887	 * the resources.
4888	 *
4889	 * Because the cancel_delayed_work_sync() will only
4890	 * guarantee that the work finishes executing. But the
4891	 * delayed work will re-arm itself again after that.
4892	 */
4893	flush_delayed_work(&mdsc->delayed_work);
4894
4895	if (mdsc->mdsmap)
4896		ceph_mdsmap_destroy(mdsc->mdsmap);
4897	kfree(mdsc->sessions);
4898	ceph_caps_finalize(mdsc);
4899	ceph_pool_perm_destroy(mdsc);
4900}
4901
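/*
 * Final teardown of the mds client: flush connection work that may
 * still reference us, stop the client and free it.
 */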
4902void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
4903{
4904	struct ceph_mds_client *mdsc = fsc->mdsc;
4905	dout("mdsc_destroy %p\n", mdsc);
4906
4907	if (!mdsc)
4908		return;
4909
4910	/* flush out any connection work with references to us */
4911	ceph_msgr_flush();
4912
4913	ceph_mdsc_stop(mdsc);
4914
4915	ceph_metric_destroy(&mdsc->metric);
4916
4917	flush_delayed_work(&mdsc->metric.delayed_work);
4918	fsc->mdsc = NULL;
4919	kfree(mdsc);
4920	dout("mdsc_destroy %p done\n", mdsc);
4921}
4922
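/*
 * Handle an fsmap message from the monitor: look up the fscid for the
 * requested mds_namespace and subscribe to the corresponding mdsmap.
 */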
4923void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4924{
4925	struct ceph_fs_client *fsc = mdsc->fsc;
4926	const char *mds_namespace = fsc->mount_options->mds_namespace;
4927	void *p = msg->front.iov_base;
4928	void *end = p + msg->front.iov_len;
4929	u32 epoch;
4930	u32 num_fs;
4931	u32 mount_fscid = (u32)-1;
4932	int err = -EINVAL;
4933
4934	ceph_decode_need(&p, end, sizeof(u32), bad);
4935	epoch = ceph_decode_32(&p);
4936
4937	dout("handle_fsmap epoch %u\n", epoch);
4938
4939	/* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
4940	ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);
4941
4942	ceph_decode_32_safe(&p, end, num_fs, bad);
4943	while (num_fs-- > 0) {
4944		void *info_p, *info_end;
4945		u32 info_len;
4946		u32 fscid, namelen;
4947
4948		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4949		p += 2;		/* info_v, info_cv */
4950		info_len = ceph_decode_32(&p);
4951		ceph_decode_need(&p, end, info_len, bad);
4952		info_p = p;
4953		info_end = p + info_len;
4954		p = info_end;
4955
4956		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
4957		fscid = ceph_decode_32(&info_p);
4958		namelen = ceph_decode_32(&info_p);
4959		ceph_decode_need(&info_p, info_end, namelen, bad);
4960
4961		if (mds_namespace &&
4962		    strlen(mds_namespace) == namelen &&
4963		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
4964			mount_fscid = fscid;
4965			break;
4966		}
4967	}
4968
4969	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
4970	if (mount_fscid != (u32)-1) {
4971		fsc->client->monc.fs_cluster_id = mount_fscid;
4972		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
4973				   0, true);
4974		ceph_monc_renew_subs(&fsc->client->monc);
4975	} else {
4976		err = -ENOENT;
4977		goto err_out;
4978	}
4979	return;
4980
4981bad:
4982	pr_err("error decoding fsmap\n");
4983err_out:
4984	mutex_lock(&mdsc->mutex);
4985	mdsc->mdsmap_err = err;
4986	__wake_requests(mdsc, &mdsc->waiting_for_map);
4987	mutex_unlock(&mdsc->mutex);
4988}
4989
4990/*
4991 * handle mds map update.
4992 */
4993void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4994{
4995	u32 epoch;
4996	u32 maplen;
4997	void *p = msg->front.iov_base;
4998	void *end = p + msg->front.iov_len;
4999	struct ceph_mdsmap *newmap, *oldmap;
5000	struct ceph_fsid fsid;
5001	int err = -EINVAL;
5002
5003	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
5004	ceph_decode_copy(&p, &fsid, sizeof(fsid));
5005	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
5006		return;
5007	epoch = ceph_decode_32(&p);
5008	maplen = ceph_decode_32(&p);
5009	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
5010
5011	/* do we need it? */
5012	mutex_lock(&mdsc->mutex);
5013	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
5014		dout("handle_map epoch %u <= our %u\n",
5015		     epoch, mdsc->mdsmap->m_epoch);
5016		mutex_unlock(&mdsc->mutex);
5017		return;
5018	}
5019
5020	newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client));
5021	if (IS_ERR(newmap)) {
5022		err = PTR_ERR(newmap);
5023		goto bad_unlock;
5024	}
5025
5026	/* swap into place */
5027	if (mdsc->mdsmap) {
5028		oldmap = mdsc->mdsmap;
5029		mdsc->mdsmap = newmap;
5030		check_new_map(mdsc, newmap, oldmap);
5031		ceph_mdsmap_destroy(oldmap);
5032	} else {
5033		mdsc->mdsmap = newmap;  /* first mds map */
5034	}
5035	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
5036					MAX_LFS_FILESIZE);
5037
5038	__wake_requests(mdsc, &mdsc->waiting_for_map);
5039	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
5040			  mdsc->mdsmap->m_epoch);
5041
5042	mutex_unlock(&mdsc->mutex);
5043	schedule_delayed(mdsc, 0);
5044	return;
5045
5046bad_unlock:
5047	mutex_unlock(&mdsc->mutex);
5048bad:
5049	pr_err("error decoding mdsmap %d\n", err);
5050	return;
5051}
5052
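/*
 * Connection refcounting: the messenger pins the MDS session while it
 * holds a reference on the connection.
 */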
5053static struct ceph_connection *mds_get_con(struct ceph_connection *con)
5054{
5055	struct ceph_mds_session *s = con->private;
5056
5057	if (ceph_get_mds_session(s))
5058		return con;
5059	return NULL;
5060}
5061
5062static void mds_put_con(struct ceph_connection *con)
5063{
5064	struct ceph_mds_session *s = con->private;
5065
5066	ceph_put_mds_session(s);
5067}
5068
5069/*
5070 * if the client is unresponsive for long enough, the mds will kill
5071 * the session entirely.
5072 */
5073static void mds_peer_reset(struct ceph_connection *con)
5074{
5075	struct ceph_mds_session *s = con->private;
5076	struct ceph_mds_client *mdsc = s->s_mdsc;
5077
5078	pr_warn("mds%d closed our session\n", s->s_mds);
5079	send_mds_reconnect(mdsc, s);
5080}
5081
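/*
 * Dispatch an incoming MDS message to the appropriate handler; drop it
 * if the session is no longer registered.
 */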
5082static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5083{
5084	struct ceph_mds_session *s = con->private;
5085	struct ceph_mds_client *mdsc = s->s_mdsc;
5086	int type = le16_to_cpu(msg->hdr.type);
5087
5088	mutex_lock(&mdsc->mutex);
5089	if (__verify_registered_session(mdsc, s) < 0) {
5090		mutex_unlock(&mdsc->mutex);
5091		goto out;
5092	}
5093	mutex_unlock(&mdsc->mutex);
5094
5095	switch (type) {
5096	case CEPH_MSG_MDS_MAP:
5097		ceph_mdsc_handle_mdsmap(mdsc, msg);
5098		break;
5099	case CEPH_MSG_FS_MAP_USER:
5100		ceph_mdsc_handle_fsmap(mdsc, msg);
5101		break;
5102	case CEPH_MSG_CLIENT_SESSION:
5103		handle_session(s, msg);
5104		break;
5105	case CEPH_MSG_CLIENT_REPLY:
5106		handle_reply(s, msg);
5107		break;
5108	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
5109		handle_forward(mdsc, s, msg);
5110		break;
5111	case CEPH_MSG_CLIENT_CAPS:
5112		ceph_handle_caps(s, msg);
5113		break;
5114	case CEPH_MSG_CLIENT_SNAP:
5115		ceph_handle_snap(mdsc, s, msg);
5116		break;
5117	case CEPH_MSG_CLIENT_LEASE:
5118		handle_lease(mdsc, s, msg);
5119		break;
5120	case CEPH_MSG_CLIENT_QUOTA:
5121		ceph_handle_quota(mdsc, s, msg);
5122		break;
5123
5124	default:
5125		pr_err("received unknown message type %d %s\n", type,
5126		       ceph_msg_type_name(type));
5127	}
5128out:
5129	ceph_msg_put(msg);
5130}
5131
5132/*
5133 * authentication
5134 */
5135
5136/*
5137 * Note: returned pointer is the address of a structure that's
5138 * managed separately.  Caller must *not* attempt to free it.
5139 */
5140static struct ceph_auth_handshake *
5141mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
5142{
5143	struct ceph_mds_session *s = con->private;
5144	struct ceph_mds_client *mdsc = s->s_mdsc;
5145	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5146	struct ceph_auth_handshake *auth = &s->s_auth;
5147	int ret;
5148
5149	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
5150					 force_new, proto, NULL, NULL);
5151	if (ret)
5152		return ERR_PTR(ret);
5153
5154	return auth;
5155}
5156
5157static int mds_add_authorizer_challenge(struct ceph_connection *con,
5158				    void *challenge_buf, int challenge_buf_len)
5159{
5160	struct ceph_mds_session *s = con->private;
5161	struct ceph_mds_client *mdsc = s->s_mdsc;
5162	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5163
5164	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
5165					    challenge_buf, challenge_buf_len);
5166}
5167
5168static int mds_verify_authorizer_reply(struct ceph_connection *con)
5169{
5170	struct ceph_mds_session *s = con->private;
5171	struct ceph_mds_client *mdsc = s->s_mdsc;
5172	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5173	struct ceph_auth_handshake *auth = &s->s_auth;
5174
5175	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
5176		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
5177		NULL, NULL, NULL, NULL);
5178}
5179
5180static int mds_invalidate_authorizer(struct ceph_connection *con)
5181{
5182	struct ceph_mds_session *s = con->private;
5183	struct ceph_mds_client *mdsc = s->s_mdsc;
5184	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5185
5186	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
5187
5188	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
5189}
5190
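/*
 * Authentication exchange hooks used by the msgr2 protocol.
 */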
5191static int mds_get_auth_request(struct ceph_connection *con,
5192				void *buf, int *buf_len,
5193				void **authorizer, int *authorizer_len)
5194{
5195	struct ceph_mds_session *s = con->private;
5196	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5197	struct ceph_auth_handshake *auth = &s->s_auth;
5198	int ret;
5199
5200	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
5201				       buf, buf_len);
5202	if (ret)
5203		return ret;
5204
5205	*authorizer = auth->authorizer_buf;
5206	*authorizer_len = auth->authorizer_buf_len;
5207	return 0;
5208}
5209
5210static int mds_handle_auth_reply_more(struct ceph_connection *con,
5211				      void *reply, int reply_len,
5212				      void *buf, int *buf_len,
5213				      void **authorizer, int *authorizer_len)
5214{
5215	struct ceph_mds_session *s = con->private;
5216	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5217	struct ceph_auth_handshake *auth = &s->s_auth;
5218	int ret;
5219
5220	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
5221					      buf, buf_len);
5222	if (ret)
5223		return ret;
5224
5225	*authorizer = auth->authorizer_buf;
5226	*authorizer_len = auth->authorizer_buf_len;
5227	return 0;
5228}
5229
5230static int mds_handle_auth_done(struct ceph_connection *con,
5231				u64 global_id, void *reply, int reply_len,
5232				u8 *session_key, int *session_key_len,
5233				u8 *con_secret, int *con_secret_len)
5234{
5235	struct ceph_mds_session *s = con->private;
5236	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5237	struct ceph_auth_handshake *auth = &s->s_auth;
5238
5239	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
5240					       session_key, session_key_len,
5241					       con_secret, con_secret_len);
5242}
5243
5244static int mds_handle_auth_bad_method(struct ceph_connection *con,
5245				      int used_proto, int result,
5246				      const int *allowed_protos, int proto_cnt,
5247				      const int *allowed_modes, int mode_cnt)
5248{
5249	struct ceph_mds_session *s = con->private;
5250	struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
5251	int ret;
5252
5253	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
5254					    used_proto, result,
5255					    allowed_protos, proto_cnt,
5256					    allowed_modes, mode_cnt)) {
5257		ret = ceph_monc_validate_auth(monc);
5258		if (ret)
5259			return ret;
5260	}
5261
5262	return -EACCES;
5263}
5264
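/*
 * Allocate a message for an incoming frame, unless the messenger
 * already has one in progress.
 */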
5265static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
5266				struct ceph_msg_header *hdr, int *skip)
5267{
5268	struct ceph_msg *msg;
5269	int type = (int) le16_to_cpu(hdr->type);
5270	int front_len = (int) le32_to_cpu(hdr->front_len);
5271
5272	if (con->in_msg)
5273		return con->in_msg;
5274
5275	*skip = 0;
5276	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
5277	if (!msg) {
5278		pr_err("unable to allocate msg type %d len %d\n",
5279		       type, front_len);
5280		return NULL;
5281	}
5282
5283	return msg;
5284}
5285
5286static int mds_sign_message(struct ceph_msg *msg)
5287{
5288	struct ceph_mds_session *s = msg->con->private;
5289	struct ceph_auth_handshake *auth = &s->s_auth;
5290
5291	return ceph_auth_sign_message(auth, msg);
5292}
5293
5294static int mds_check_message_signature(struct ceph_msg *msg)
5295{
5296	struct ceph_mds_session *s = msg->con->private;
5297	struct ceph_auth_handshake *auth = &s->s_auth;
5298
5299	return ceph_auth_check_message_signature(auth, msg);
5300}
5301
5302static const struct ceph_connection_operations mds_con_ops = {
5303	.get = mds_get_con,
5304	.put = mds_put_con,
5305	.alloc_msg = mds_alloc_msg,
5306	.dispatch = mds_dispatch,
5307	.peer_reset = mds_peer_reset,
5308	.get_authorizer = mds_get_authorizer,
5309	.add_authorizer_challenge = mds_add_authorizer_challenge,
5310	.verify_authorizer_reply = mds_verify_authorizer_reply,
5311	.invalidate_authorizer = mds_invalidate_authorizer,
5312	.sign_message = mds_sign_message,
5313	.check_message_signature = mds_check_message_signature,
5314	.get_auth_request = mds_get_auth_request,
5315	.handle_auth_reply_more = mds_handle_auth_reply_more,
5316	.handle_auth_done = mds_handle_auth_done,
5317	.handle_auth_bad_method = mds_handle_auth_bad_method,
5318};
5319
5320/* eof */
5321