mds_client.c revision c80dc3ae
1// SPDX-License-Identifier: GPL-2.0
2#include <linux/ceph/ceph_debug.h>
3
4#include <linux/fs.h>
5#include <linux/wait.h>
6#include <linux/slab.h>
7#include <linux/gfp.h>
8#include <linux/sched.h>
9#include <linux/debugfs.h>
10#include <linux/seq_file.h>
11#include <linux/ratelimit.h>
12#include <linux/bits.h>
13#include <linux/ktime.h>
14
15#include "super.h"
16#include "mds_client.h"
17
18#include <linux/ceph/ceph_features.h>
19#include <linux/ceph/messenger.h>
20#include <linux/ceph/decode.h>
21#include <linux/ceph/pagelist.h>
22#include <linux/ceph/auth.h>
23#include <linux/ceph/debugfs.h>
24
25#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
26
27/*
28 * A cluster of MDS (metadata server) daemons is responsible for
29 * managing the file system namespace (the directory hierarchy and
30 * inodes) and for coordinating shared access to storage.  Metadata is
31 * partitioned hierarchically across a number of servers, and that
32 * partition varies over time as the cluster adjusts the distribution
33 * in order to balance load.
34 *
35 * The MDS client is primarily responsible for managing synchronous
36 * metadata requests for operations like open, unlink, and so forth.
37 * If there is an MDS failure, we find out about it when we (possibly
38 * request and) receive a new MDS map, and can resubmit affected
39 * requests.
40 *
41 * For the most part, though, we take advantage of a lossless
42 * communications channel to the MDS, and do not need to worry about
43 * timing out or resubmitting requests.
44 *
45 * We maintain a stateful "session" with each MDS we interact with.
46 * Within each session, we send periodic heartbeat messages to ensure
47 * any capabilities or leases we have been issued remain valid.  If
48 * the session times out and goes stale, our leases and capabilities
49 * are no longer valid.
50 */
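/*
 * Illustrative sketch (hypothetical, not part of this file): how a caller
 * such as __ceph_do_getattr() in inode.c typically drives one of the
 * synchronous requests described above, using the helpers declared in
 * mds_client.h.  The real callers do more error and cap handling; the
 * example_* name below is purely illustrative.
 */
static int __maybe_unused example_sync_getattr(struct inode *inode, int mask)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
	struct ceph_mds_request *req;
	int err;

	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
	if (IS_ERR(req))
		return PTR_ERR(req);
	req->r_inode = inode;
	ihold(inode);		/* the request owns a reference until release */
	req->r_num_caps = 1;
	req->r_args.getattr.mask = cpu_to_le32(mask);

	/* blocks until the MDS replies (or the request is aborted) */
	err = ceph_mdsc_do_request(mdsc, NULL, req);
	ceph_mdsc_put_request(req);
	return err;
}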
51
52struct ceph_reconnect_state {
53	struct ceph_mds_session *session;
54	int nr_caps, nr_realms;
55	struct ceph_pagelist *pagelist;
56	unsigned msg_version;
57	bool allow_multi;
58};
59
60static void __wake_requests(struct ceph_mds_client *mdsc,
61			    struct list_head *head);
62static void ceph_cap_release_work(struct work_struct *work);
63static void ceph_cap_reclaim_work(struct work_struct *work);
64
65static const struct ceph_connection_operations mds_con_ops;
66
67
68/*
69 * mds reply parsing
70 */
71
72static int parse_reply_info_quota(void **p, void *end,
73				  struct ceph_mds_reply_info_in *info)
74{
75	u8 struct_v, struct_compat;
76	u32 struct_len;
77
78	ceph_decode_8_safe(p, end, struct_v, bad);
79	ceph_decode_8_safe(p, end, struct_compat, bad);
80	/* struct_v is expected to be >= 1. we only
81	 * understand encoding with struct_compat == 1. */
82	if (!struct_v || struct_compat != 1)
83		goto bad;
84	ceph_decode_32_safe(p, end, struct_len, bad);
85	ceph_decode_need(p, end, struct_len, bad);
86	end = *p + struct_len;
87	ceph_decode_64_safe(p, end, info->max_bytes, bad);
88	ceph_decode_64_safe(p, end, info->max_files, bad);
89	*p = end;
90	return 0;
91bad:
92	return -EIO;
93}
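/*
 * Hypothetical sketch of the matching wire framing (not part of this file):
 * the versioned-struct decode above expects a 1-byte struct_v, a 1-byte
 * struct_compat and a 32-bit struct_len, followed by struct_len bytes of
 * payload.  The real encoder lives in the userspace MDS; bounds checks are
 * omitted here for brevity.
 */
static void __maybe_unused example_encode_quota(void **p, u64 max_bytes,
						u64 max_files)
{
	ceph_encode_8(p, 1);			/* struct_v */
	ceph_encode_8(p, 1);			/* struct_compat */
	ceph_encode_32(p, 2 * sizeof(u64));	/* struct_len */
	ceph_encode_64(p, max_bytes);
	ceph_encode_64(p, max_files);
}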
94
95/*
96 * parse individual inode info
97 */
98static int parse_reply_info_in(void **p, void *end,
99			       struct ceph_mds_reply_info_in *info,
100			       u64 features)
101{
102	int err = 0;
103	u8 struct_v = 0;
104
105	if (features == (u64)-1) {
106		u32 struct_len;
107		u8 struct_compat;
108		ceph_decode_8_safe(p, end, struct_v, bad);
109		ceph_decode_8_safe(p, end, struct_compat, bad);
110		/* struct_v is expected to be >= 1. we only understand
111		 * encoding with struct_compat == 1. */
112		if (!struct_v || struct_compat != 1)
113			goto bad;
114		ceph_decode_32_safe(p, end, struct_len, bad);
115		ceph_decode_need(p, end, struct_len, bad);
116		end = *p + struct_len;
117	}
118
119	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
120	info->in = *p;
121	*p += sizeof(struct ceph_mds_reply_inode) +
122		sizeof(*info->in->fragtree.splits) *
123		le32_to_cpu(info->in->fragtree.nsplits);
124
125	ceph_decode_32_safe(p, end, info->symlink_len, bad);
126	ceph_decode_need(p, end, info->symlink_len, bad);
127	info->symlink = *p;
128	*p += info->symlink_len;
129
130	ceph_decode_copy_safe(p, end, &info->dir_layout,
131			      sizeof(info->dir_layout), bad);
132	ceph_decode_32_safe(p, end, info->xattr_len, bad);
133	ceph_decode_need(p, end, info->xattr_len, bad);
134	info->xattr_data = *p;
135	*p += info->xattr_len;
136
137	if (features == (u64)-1) {
138		/* inline data */
139		ceph_decode_64_safe(p, end, info->inline_version, bad);
140		ceph_decode_32_safe(p, end, info->inline_len, bad);
141		ceph_decode_need(p, end, info->inline_len, bad);
142		info->inline_data = *p;
143		*p += info->inline_len;
144		/* quota */
145		err = parse_reply_info_quota(p, end, info);
146		if (err < 0)
147			goto out_bad;
148		/* pool namespace */
149		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
150		if (info->pool_ns_len > 0) {
151			ceph_decode_need(p, end, info->pool_ns_len, bad);
152			info->pool_ns_data = *p;
153			*p += info->pool_ns_len;
154		}
155
156		/* btime */
157		ceph_decode_need(p, end, sizeof(info->btime), bad);
158		ceph_decode_copy(p, &info->btime, sizeof(info->btime));
159
160		/* change attribute */
161		ceph_decode_64_safe(p, end, info->change_attr, bad);
162
163		/* dir pin */
164		if (struct_v >= 2) {
165			ceph_decode_32_safe(p, end, info->dir_pin, bad);
166		} else {
167			info->dir_pin = -ENODATA;
168		}
169
170		/* snapshot birth time, remains zero for v<=2 */
171		if (struct_v >= 3) {
172			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
173			ceph_decode_copy(p, &info->snap_btime,
174					 sizeof(info->snap_btime));
175		} else {
176			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
177		}
178
179		/* snapshot count, remains zero for v<=3 */
180		if (struct_v >= 4) {
181			ceph_decode_64_safe(p, end, info->rsnaps, bad);
182		} else {
183			info->rsnaps = 0;
184		}
185
186		*p = end;
187	} else {
188		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
189			ceph_decode_64_safe(p, end, info->inline_version, bad);
190			ceph_decode_32_safe(p, end, info->inline_len, bad);
191			ceph_decode_need(p, end, info->inline_len, bad);
192			info->inline_data = *p;
193			*p += info->inline_len;
194		} else
195			info->inline_version = CEPH_INLINE_NONE;
196
197		if (features & CEPH_FEATURE_MDS_QUOTA) {
198			err = parse_reply_info_quota(p, end, info);
199			if (err < 0)
200				goto out_bad;
201		} else {
202			info->max_bytes = 0;
203			info->max_files = 0;
204		}
205
206		info->pool_ns_len = 0;
207		info->pool_ns_data = NULL;
208		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
209			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
210			if (info->pool_ns_len > 0) {
211				ceph_decode_need(p, end, info->pool_ns_len, bad);
212				info->pool_ns_data = *p;
213				*p += info->pool_ns_len;
214			}
215		}
216
217		if (features & CEPH_FEATURE_FS_BTIME) {
218			ceph_decode_need(p, end, sizeof(info->btime), bad);
219			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
220			ceph_decode_64_safe(p, end, info->change_attr, bad);
221		}
222
223		info->dir_pin = -ENODATA;
224		/* info->snap_btime and info->rsnaps remain zero */
225	}
226	return 0;
227bad:
228	err = -EIO;
229out_bad:
230	return err;
231}
232
233static int parse_reply_info_dir(void **p, void *end,
234				struct ceph_mds_reply_dirfrag **dirfrag,
235				u64 features)
236{
237	if (features == (u64)-1) {
238		u8 struct_v, struct_compat;
239		u32 struct_len;
240		ceph_decode_8_safe(p, end, struct_v, bad);
241		ceph_decode_8_safe(p, end, struct_compat, bad);
242		/* struct_v is expected to be >= 1. we only understand
243		 * encoding whose struct_compat == 1. */
244		if (!struct_v || struct_compat != 1)
245			goto bad;
246		ceph_decode_32_safe(p, end, struct_len, bad);
247		ceph_decode_need(p, end, struct_len, bad);
248		end = *p + struct_len;
249	}
250
251	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
252	*dirfrag = *p;
253	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
254	if (unlikely(*p > end))
255		goto bad;
256	if (features == (u64)-1)
257		*p = end;
258	return 0;
259bad:
260	return -EIO;
261}
262
263static int parse_reply_info_lease(void **p, void *end,
264				  struct ceph_mds_reply_lease **lease,
265				  u64 features)
266{
267	if (features == (u64)-1) {
268		u8 struct_v, struct_compat;
269		u32 struct_len;
270		ceph_decode_8_safe(p, end, struct_v, bad);
271		ceph_decode_8_safe(p, end, struct_compat, bad);
272		/* struct_v is expected to be >= 1. we only understand
273		 * encoding whose struct_compat == 1. */
274		if (!struct_v || struct_compat != 1)
275			goto bad;
276		ceph_decode_32_safe(p, end, struct_len, bad);
277		ceph_decode_need(p, end, struct_len, bad);
278		end = *p + struct_len;
279	}
280
281	ceph_decode_need(p, end, sizeof(**lease), bad);
282	*lease = *p;
283	*p += sizeof(**lease);
284	if (features == (u64)-1)
285		*p = end;
286	return 0;
287bad:
288	return -EIO;
289}
290
291/*
292 * parse a normal reply, which may contain a (dir+)dentry and/or a
293 * target inode.
294 */
295static int parse_reply_info_trace(void **p, void *end,
296				  struct ceph_mds_reply_info_parsed *info,
297				  u64 features)
298{
299	int err;
300
301	if (info->head->is_dentry) {
302		err = parse_reply_info_in(p, end, &info->diri, features);
303		if (err < 0)
304			goto out_bad;
305
306		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
307		if (err < 0)
308			goto out_bad;
309
310		ceph_decode_32_safe(p, end, info->dname_len, bad);
311		ceph_decode_need(p, end, info->dname_len, bad);
312		info->dname = *p;
313		*p += info->dname_len;
314
315		err = parse_reply_info_lease(p, end, &info->dlease, features);
316		if (err < 0)
317			goto out_bad;
318	}
319
320	if (info->head->is_target) {
321		err = parse_reply_info_in(p, end, &info->targeti, features);
322		if (err < 0)
323			goto out_bad;
324	}
325
326	if (unlikely(*p != end))
327		goto bad;
328	return 0;
329
330bad:
331	err = -EIO;
332out_bad:
333	pr_err("problem parsing mds trace %d\n", err);
334	return err;
335}
336
337/*
338 * parse readdir results
339 */
340static int parse_reply_info_readdir(void **p, void *end,
341				struct ceph_mds_reply_info_parsed *info,
342				u64 features)
343{
344	u32 num, i = 0;
345	int err;
346
347	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
348	if (err < 0)
349		goto out_bad;
350
351	ceph_decode_need(p, end, sizeof(num) + 2, bad);
352	num = ceph_decode_32(p);
353	{
354		u16 flags = ceph_decode_16(p);
355		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
356		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
357		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
358		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
359	}
360	if (num == 0)
361		goto done;
362
363	BUG_ON(!info->dir_entries);
364	if ((unsigned long)(info->dir_entries + num) >
365	    (unsigned long)info->dir_entries + info->dir_buf_size) {
366		pr_err("dir contents are larger than expected\n");
367		WARN_ON(1);
368		goto bad;
369	}
370
371	info->dir_nr = num;
372	while (num) {
373		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
374		/* dentry */
375		ceph_decode_32_safe(p, end, rde->name_len, bad);
376		ceph_decode_need(p, end, rde->name_len, bad);
377		rde->name = *p;
378		*p += rde->name_len;
379		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
380
381		/* dentry lease */
382		err = parse_reply_info_lease(p, end, &rde->lease, features);
383		if (err)
384			goto out_bad;
385		/* inode */
386		err = parse_reply_info_in(p, end, &rde->inode, features);
387		if (err < 0)
388			goto out_bad;
389		/* ceph_readdir_prepopulate() will update it */
390		rde->offset = 0;
391		i++;
392		num--;
393	}
394
395done:
396	/* Skip over any unrecognized fields */
397	*p = end;
398	return 0;
399
400bad:
401	err = -EIO;
402out_bad:
403	pr_err("problem parsing dir contents %d\n", err);
404	return err;
405}
406
407/*
408 * parse fcntl F_GETLK results
409 */
410static int parse_reply_info_filelock(void **p, void *end,
411				     struct ceph_mds_reply_info_parsed *info,
412				     u64 features)
413{
414	if (*p + sizeof(*info->filelock_reply) > end)
415		goto bad;
416
417	info->filelock_reply = *p;
418
419	/* Skip over any unrecognized fields */
420	*p = end;
421	return 0;
422bad:
423	return -EIO;
424}
425
426
427#if BITS_PER_LONG == 64
428
429#define DELEGATED_INO_AVAILABLE		xa_mk_value(1)
430
431static int ceph_parse_deleg_inos(void **p, void *end,
432				 struct ceph_mds_session *s)
433{
434	u32 sets;
435
436	ceph_decode_32_safe(p, end, sets, bad);
437	dout("got %u sets of delegated inodes\n", sets);
438	while (sets--) {
439		u64 start, len, ino;
440
441		ceph_decode_64_safe(p, end, start, bad);
442		ceph_decode_64_safe(p, end, len, bad);
443
444		/* Don't accept a delegation of system inodes */
445		if (start < CEPH_INO_SYSTEM_BASE) {
446			pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
447					start, len);
448			continue;
449		}
450		while (len--) {
451			int err = xa_insert(&s->s_delegated_inos, ino = start++,
452					    DELEGATED_INO_AVAILABLE,
453					    GFP_KERNEL);
454			if (!err) {
455				dout("added delegated inode 0x%llx\n",
456				     start - 1);
457			} else if (err == -EBUSY) {
458				pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
459					start - 1);
460			} else {
461				return err;
462			}
463		}
464	}
465	return 0;
466bad:
467	return -EIO;
468}
469
470u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
471{
472	unsigned long ino;
473	void *val;
474
475	xa_for_each(&s->s_delegated_inos, ino, val) {
476		val = xa_erase(&s->s_delegated_inos, ino);
477		if (val == DELEGATED_INO_AVAILABLE)
478			return ino;
479	}
480	return 0;
481}
482
483int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
484{
485	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
486			 GFP_KERNEL);
487}
488#else /* BITS_PER_LONG == 64 */
489/*
490 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
491 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
492 * and bottom words?
493 */
494static int ceph_parse_deleg_inos(void **p, void *end,
495				 struct ceph_mds_session *s)
496{
497	u32 sets;
498
499	ceph_decode_32_safe(p, end, sets, bad);
500	if (sets)
501		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
502	return 0;
503bad:
504	return -EIO;
505}
506
507u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
508{
509	return 0;
510}
511
512int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
513{
514	return 0;
515}
516#endif /* BITS_PER_LONG == 64 */
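/*
 * Illustrative sketch (hypothetical helper, not part of this file): how a
 * caller such as the async create path in file.c typically consumes a
 * delegated inode number, handing it back with ceph_restore_deleg_ino()
 * if the request fails before it ever reaches the MDS.
 */
static u64 __maybe_unused example_claim_deleg_ino(struct ceph_mds_session *s,
						  int err)
{
	u64 ino = ceph_get_deleg_ino(s);	/* 0 if nothing was delegated */

	if (ino && err) {
		/* request never sent: return the ino to the delegated pool */
		if (ceph_restore_deleg_ino(s, ino))
			pr_warn("ceph: failed to restore delegated ino 0x%llx\n",
				ino);
		ino = 0;
	}
	return ino;
}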
517
518/*
519 * parse create results
520 */
521static int parse_reply_info_create(void **p, void *end,
522				  struct ceph_mds_reply_info_parsed *info,
523				  u64 features, struct ceph_mds_session *s)
524{
525	int ret;
526
527	if (features == (u64)-1 ||
528	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
529		if (*p == end) {
530			/* Malformed reply? */
531			info->has_create_ino = false;
532		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
533			info->has_create_ino = true;
534			/* struct_v, struct_compat, and len */
535			ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
536			ceph_decode_64_safe(p, end, info->ino, bad);
537			ret = ceph_parse_deleg_inos(p, end, s);
538			if (ret)
539				return ret;
540		} else {
541			/* legacy */
542			ceph_decode_64_safe(p, end, info->ino, bad);
543			info->has_create_ino = true;
544		}
545	} else {
546		if (*p != end)
547			goto bad;
548	}
549
550	/* Skip over any unrecognized fields */
551	*p = end;
552	return 0;
553bad:
554	return -EIO;
555}
556
557/*
558 * parse extra results
559 */
560static int parse_reply_info_extra(void **p, void *end,
561				  struct ceph_mds_reply_info_parsed *info,
562				  u64 features, struct ceph_mds_session *s)
563{
564	u32 op = le32_to_cpu(info->head->op);
565
566	if (op == CEPH_MDS_OP_GETFILELOCK)
567		return parse_reply_info_filelock(p, end, info, features);
568	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
569		return parse_reply_info_readdir(p, end, info, features);
570	else if (op == CEPH_MDS_OP_CREATE)
571		return parse_reply_info_create(p, end, info, features, s);
572	else
573		return -EIO;
574}
575
576/*
577 * parse entire mds reply
578 */
579static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
580			    struct ceph_mds_reply_info_parsed *info,
581			    u64 features)
582{
583	void *p, *end;
584	u32 len;
585	int err;
586
587	info->head = msg->front.iov_base;
588	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
589	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
590
591	/* trace */
592	ceph_decode_32_safe(&p, end, len, bad);
593	if (len > 0) {
594		ceph_decode_need(&p, end, len, bad);
595		err = parse_reply_info_trace(&p, p+len, info, features);
596		if (err < 0)
597			goto out_bad;
598	}
599
600	/* extra */
601	ceph_decode_32_safe(&p, end, len, bad);
602	if (len > 0) {
603		ceph_decode_need(&p, end, len, bad);
604		err = parse_reply_info_extra(&p, p+len, info, features, s);
605		if (err < 0)
606			goto out_bad;
607	}
608
609	/* snap blob */
610	ceph_decode_32_safe(&p, end, len, bad);
611	info->snapblob_len = len;
612	info->snapblob = p;
613	p += len;
614
615	if (p != end)
616		goto bad;
617	return 0;
618
619bad:
620	err = -EIO;
621out_bad:
622	pr_err("mds parse_reply err %d\n", err);
623	return err;
624}
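/*
 * For reference, the reply payload consumed above is laid out as:
 *
 *   struct ceph_mds_reply_head | __le32 trace_len | trace bytes
 *                              | __le32 extra_len | extra bytes
 *                              | __le32 snap_len  | snap blob
 *
 * parse_reply_info_trace() and parse_reply_info_extra() handle the first two
 * length-prefixed blobs; the snap blob is only recorded here and is decoded
 * by the snap realm code later.
 */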
625
626static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
627{
628	if (!info->dir_entries)
629		return;
630	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
631}
632
633
634/*
635 * sessions
636 */
637const char *ceph_session_state_name(int s)
638{
639	switch (s) {
640	case CEPH_MDS_SESSION_NEW: return "new";
641	case CEPH_MDS_SESSION_OPENING: return "opening";
642	case CEPH_MDS_SESSION_OPEN: return "open";
643	case CEPH_MDS_SESSION_HUNG: return "hung";
644	case CEPH_MDS_SESSION_CLOSING: return "closing";
645	case CEPH_MDS_SESSION_CLOSED: return "closed";
646	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
647	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
648	case CEPH_MDS_SESSION_REJECTED: return "rejected";
649	default: return "???";
650	}
651}
652
653struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
654{
655	if (refcount_inc_not_zero(&s->s_ref)) {
656		dout("mdsc get_session %p %d -> %d\n", s,
657		     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
658		return s;
659	} else {
660		dout("mdsc get_session %p 0 -- FAIL\n", s);
661		return NULL;
662	}
663}
664
665void ceph_put_mds_session(struct ceph_mds_session *s)
666{
667	if (IS_ERR_OR_NULL(s))
668		return;
669
670	dout("mdsc put_session %p %d -> %d\n", s,
671	     refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
672	if (refcount_dec_and_test(&s->s_ref)) {
673		if (s->s_auth.authorizer)
674			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
675		WARN_ON(mutex_is_locked(&s->s_mutex));
676		xa_destroy(&s->s_delegated_inos);
677		kfree(s);
678	}
679}
680
681/*
682 * called under mdsc->mutex
683 */
684struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
685						   int mds)
686{
687	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
688		return NULL;
689	return ceph_get_mds_session(mdsc->sessions[mds]);
690}
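/*
 * Illustrative sketch (hypothetical, not part of this file): the usual
 * lookup/use/put pattern.  __ceph_lookup_mds_session() must be called with
 * mdsc->mutex held and returns a referenced session (or NULL); the caller
 * drops that reference with ceph_put_mds_session() when done.
 */
static void __maybe_unused example_use_session(struct ceph_mds_client *mdsc,
					       int mds)
{
	struct ceph_mds_session *s;

	mutex_lock(&mdsc->mutex);
	s = __ceph_lookup_mds_session(mdsc, mds);
	mutex_unlock(&mdsc->mutex);
	if (!s)
		return;

	/* ... use the session without holding mdsc->mutex ... */

	ceph_put_mds_session(s);
}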
691
692static bool __have_session(struct ceph_mds_client *mdsc, int mds)
693{
694	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
695		return false;
696	else
697		return true;
698}
699
700static int __verify_registered_session(struct ceph_mds_client *mdsc,
701				       struct ceph_mds_session *s)
702{
703	if (s->s_mds >= mdsc->max_sessions ||
704	    mdsc->sessions[s->s_mds] != s)
705		return -ENOENT;
706	return 0;
707}
708
709/*
710 * create+register a new session for given mds.
711 * called under mdsc->mutex.
712 */
713static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
714						 int mds)
715{
716	struct ceph_mds_session *s;
717
718	if (mds >= mdsc->mdsmap->possible_max_rank)
719		return ERR_PTR(-EINVAL);
720
721	s = kzalloc(sizeof(*s), GFP_NOFS);
722	if (!s)
723		return ERR_PTR(-ENOMEM);
724
725	if (mds >= mdsc->max_sessions) {
726		int newmax = 1 << get_count_order(mds + 1);
727		struct ceph_mds_session **sa;
728
729		dout("%s: realloc to %d\n", __func__, newmax);
730		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
731		if (!sa)
732			goto fail_realloc;
733		if (mdsc->sessions) {
734			memcpy(sa, mdsc->sessions,
735			       mdsc->max_sessions * sizeof(void *));
736			kfree(mdsc->sessions);
737		}
738		mdsc->sessions = sa;
739		mdsc->max_sessions = newmax;
740	}
741
742	dout("%s: mds%d\n", __func__, mds);
743	s->s_mdsc = mdsc;
744	s->s_mds = mds;
745	s->s_state = CEPH_MDS_SESSION_NEW;
746	mutex_init(&s->s_mutex);
747
748	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
749
750	atomic_set(&s->s_cap_gen, 1);
751	s->s_cap_ttl = jiffies - 1;
752
753	spin_lock_init(&s->s_cap_lock);
754	INIT_LIST_HEAD(&s->s_caps);
755	refcount_set(&s->s_ref, 1);
756	INIT_LIST_HEAD(&s->s_waiting);
757	INIT_LIST_HEAD(&s->s_unsafe);
758	xa_init(&s->s_delegated_inos);
759	INIT_LIST_HEAD(&s->s_cap_releases);
760	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
761
762	INIT_LIST_HEAD(&s->s_cap_dirty);
763	INIT_LIST_HEAD(&s->s_cap_flushing);
764
765	mdsc->sessions[mds] = s;
766	atomic_inc(&mdsc->num_sessions);
767	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
768
769	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
770		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
771
772	return s;
773
774fail_realloc:
775	kfree(s);
776	return ERR_PTR(-ENOMEM);
777}
778
779/*
780 * called under mdsc->mutex
781 */
782static void __unregister_session(struct ceph_mds_client *mdsc,
783			       struct ceph_mds_session *s)
784{
785	dout("__unregister_session mds%d %p\n", s->s_mds, s);
786	BUG_ON(mdsc->sessions[s->s_mds] != s);
787	mdsc->sessions[s->s_mds] = NULL;
788	ceph_con_close(&s->s_con);
789	ceph_put_mds_session(s);
790	atomic_dec(&mdsc->num_sessions);
791}
792
793/*
794 * drop session refs in request.
795 *
796 * should be last request ref, or hold mdsc->mutex
797 */
798static void put_request_session(struct ceph_mds_request *req)
799{
800	if (req->r_session) {
801		ceph_put_mds_session(req->r_session);
802		req->r_session = NULL;
803	}
804}
805
806void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
807				void (*cb)(struct ceph_mds_session *),
808				bool check_state)
809{
810	int mds;
811
812	mutex_lock(&mdsc->mutex);
813	for (mds = 0; mds < mdsc->max_sessions; ++mds) {
814		struct ceph_mds_session *s;
815
816		s = __ceph_lookup_mds_session(mdsc, mds);
817		if (!s)
818			continue;
819
820		if (check_state && !check_session_state(s)) {
821			ceph_put_mds_session(s);
822			continue;
823		}
824
825		mutex_unlock(&mdsc->mutex);
826		cb(s);
827		ceph_put_mds_session(s);
828		mutex_lock(&mdsc->mutex);
829	}
830	mutex_unlock(&mdsc->mutex);
831}
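/*
 * Illustrative usage (hypothetical, not from this file): callers hand
 * ceph_mdsc_iterate_sessions() a helper that operates on one referenced
 * session at a time; the iterator takes care of the lookup, the optional
 * state check and the reference drop, e.g.
 *
 *	static void example_touch_session(struct ceph_mds_session *s)
 *	{
 *		dout("touching mds%d\n", s->s_mds);
 *	}
 *
 *	ceph_mdsc_iterate_sessions(mdsc, example_touch_session, true);
 */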
832
833void ceph_mdsc_release_request(struct kref *kref)
834{
835	struct ceph_mds_request *req = container_of(kref,
836						    struct ceph_mds_request,
837						    r_kref);
838	ceph_mdsc_release_dir_caps_no_check(req);
839	destroy_reply_info(&req->r_reply_info);
840	if (req->r_request)
841		ceph_msg_put(req->r_request);
842	if (req->r_reply)
843		ceph_msg_put(req->r_reply);
844	if (req->r_inode) {
845		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
846		iput(req->r_inode);
847	}
848	if (req->r_parent) {
849		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
850		iput(req->r_parent);
851	}
852	iput(req->r_target_inode);
853	if (req->r_dentry)
854		dput(req->r_dentry);
855	if (req->r_old_dentry)
856		dput(req->r_old_dentry);
857	if (req->r_old_dentry_dir) {
858		/*
859		 * track (and drop pins for) r_old_dentry_dir
860		 * separately, since r_old_dentry's d_parent may have
861		 * changed between the dir mutex being dropped and
862		 * this request being freed.
863		 */
864		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
865				  CEPH_CAP_PIN);
866		iput(req->r_old_dentry_dir);
867	}
868	kfree(req->r_path1);
869	kfree(req->r_path2);
870	put_cred(req->r_cred);
871	if (req->r_pagelist)
872		ceph_pagelist_release(req->r_pagelist);
873	put_request_session(req);
874	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
875	WARN_ON_ONCE(!list_empty(&req->r_wait));
876	kmem_cache_free(ceph_mds_request_cachep, req);
877}
878
879DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
880
881/*
882 * lookup request, bump ref if found.
883 *
884 * called under mdsc->mutex.
885 */
886static struct ceph_mds_request *
887lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
888{
889	struct ceph_mds_request *req;
890
891	req = lookup_request(&mdsc->request_tree, tid);
892	if (req)
893		ceph_mdsc_get_request(req);
894
895	return req;
896}
897
898/*
899 * Register an in-flight request, and assign a tid.  Link to the directory
900 * we are modifying (if any).
901 *
902 * Called under mdsc->mutex.
903 */
904static void __register_request(struct ceph_mds_client *mdsc,
905			       struct ceph_mds_request *req,
906			       struct inode *dir)
907{
908	int ret = 0;
909
910	req->r_tid = ++mdsc->last_tid;
911	if (req->r_num_caps) {
912		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
913					req->r_num_caps);
914		if (ret < 0) {
915			pr_err("__register_request %p "
916			       "failed to reserve caps: %d\n", req, ret);
917			/* set req->r_err to fail early from __do_request */
918			req->r_err = ret;
919			return;
920		}
921	}
922	dout("__register_request %p tid %lld\n", req, req->r_tid);
923	ceph_mdsc_get_request(req);
924	insert_request(&mdsc->request_tree, req);
925
926	req->r_cred = get_current_cred();
927
928	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
929		mdsc->oldest_tid = req->r_tid;
930
931	if (dir) {
932		struct ceph_inode_info *ci = ceph_inode(dir);
933
934		ihold(dir);
935		req->r_unsafe_dir = dir;
936		spin_lock(&ci->i_unsafe_lock);
937		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
938		spin_unlock(&ci->i_unsafe_lock);
939	}
940}
941
942static void __unregister_request(struct ceph_mds_client *mdsc,
943				 struct ceph_mds_request *req)
944{
945	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
946
947	/* Never leave an unregistered request on an unsafe list! */
948	list_del_init(&req->r_unsafe_item);
949
950	if (req->r_tid == mdsc->oldest_tid) {
951		struct rb_node *p = rb_next(&req->r_node);
952		mdsc->oldest_tid = 0;
953		while (p) {
954			struct ceph_mds_request *next_req =
955				rb_entry(p, struct ceph_mds_request, r_node);
956			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
957				mdsc->oldest_tid = next_req->r_tid;
958				break;
959			}
960			p = rb_next(p);
961		}
962	}
963
964	erase_request(&mdsc->request_tree, req);
965
966	if (req->r_unsafe_dir) {
967		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
968		spin_lock(&ci->i_unsafe_lock);
969		list_del_init(&req->r_unsafe_dir_item);
970		spin_unlock(&ci->i_unsafe_lock);
971	}
972	if (req->r_target_inode &&
973	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
974		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
975		spin_lock(&ci->i_unsafe_lock);
976		list_del_init(&req->r_unsafe_target_item);
977		spin_unlock(&ci->i_unsafe_lock);
978	}
979
980	if (req->r_unsafe_dir) {
981		iput(req->r_unsafe_dir);
982		req->r_unsafe_dir = NULL;
983	}
984
985	complete_all(&req->r_safe_completion);
986
987	ceph_mdsc_put_request(req);
988}
989
990/*
991 * Walk back up the dentry tree until we hit a dentry representing a
992 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
993 * when calling this) to ensure that the objects won't disappear while we're
994 * working with them. Once we hit a candidate dentry, we attempt to take a
995 * reference to it, and return that as the result.
996 */
997static struct inode *get_nonsnap_parent(struct dentry *dentry)
998{
999	struct inode *inode = NULL;
1000
1001	while (dentry && !IS_ROOT(dentry)) {
1002		inode = d_inode_rcu(dentry);
1003		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
1004			break;
1005		dentry = dentry->d_parent;
1006	}
1007	if (inode)
1008		inode = igrab(inode);
1009	return inode;
1010}
1011
1012/*
1013 * Choose mds to send request to next.  If there is a hint set in the
1014 * request (e.g., due to a prior forward hint from the mds), use that.
1015 * Otherwise, consult frag tree and/or caps to identify the
1016 * appropriate mds.  If all else fails, choose randomly.
1017 *
1018 * Called under mdsc->mutex.
1019 */
1020static int __choose_mds(struct ceph_mds_client *mdsc,
1021			struct ceph_mds_request *req,
1022			bool *random)
1023{
1024	struct inode *inode;
1025	struct ceph_inode_info *ci;
1026	struct ceph_cap *cap;
1027	int mode = req->r_direct_mode;
1028	int mds = -1;
1029	u32 hash = req->r_direct_hash;
1030	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
1031
1032	if (random)
1033		*random = false;
1034
1035	/*
1036	 * is there a specific mds we should try?  ignore hint if we have
1037	 * no session and the mds is not up (active or recovering).
1038	 */
1039	if (req->r_resend_mds >= 0 &&
1040	    (__have_session(mdsc, req->r_resend_mds) ||
1041	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
1042		dout("%s using resend_mds mds%d\n", __func__,
1043		     req->r_resend_mds);
1044		return req->r_resend_mds;
1045	}
1046
1047	if (mode == USE_RANDOM_MDS)
1048		goto random;
1049
1050	inode = NULL;
1051	if (req->r_inode) {
1052		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
1053			inode = req->r_inode;
1054			ihold(inode);
1055		} else {
1056			/* req->r_dentry is non-null for LSSNAP request */
1057			rcu_read_lock();
1058			inode = get_nonsnap_parent(req->r_dentry);
1059			rcu_read_unlock();
1060			dout("%s using snapdir's parent %p\n", __func__, inode);
1061		}
1062	} else if (req->r_dentry) {
1063		/* ignore race with rename; old or new d_parent is okay */
1064		struct dentry *parent;
1065		struct inode *dir;
1066
1067		rcu_read_lock();
1068		parent = READ_ONCE(req->r_dentry->d_parent);
1069		dir = req->r_parent ? : d_inode_rcu(parent);
1070
1071		if (!dir || dir->i_sb != mdsc->fsc->sb) {
1072			/*  not this fs or parent went negative */
1073			inode = d_inode(req->r_dentry);
1074			if (inode)
1075				ihold(inode);
1076		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
1077			/* direct snapped/virtual snapdir requests
1078			 * based on parent dir inode */
1079			inode = get_nonsnap_parent(parent);
1080			dout("%s using nonsnap parent %p\n", __func__, inode);
1081		} else {
1082			/* dentry target */
1083			inode = d_inode(req->r_dentry);
1084			if (!inode || mode == USE_AUTH_MDS) {
1085				/* dir + name */
1086				inode = igrab(dir);
1087				hash = ceph_dentry_hash(dir, req->r_dentry);
1088				is_hash = true;
1089			} else {
1090				ihold(inode);
1091			}
1092		}
1093		rcu_read_unlock();
1094	}
1095
1096	dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
1097	     hash, mode);
1098	if (!inode)
1099		goto random;
1100	ci = ceph_inode(inode);
1101
1102	if (is_hash && S_ISDIR(inode->i_mode)) {
1103		struct ceph_inode_frag frag;
1104		int found;
1105
1106		ceph_choose_frag(ci, hash, &frag, &found);
1107		if (found) {
1108			if (mode == USE_ANY_MDS && frag.ndist > 0) {
1109				u8 r;
1110
1111				/* choose a random replica */
1112				get_random_bytes(&r, 1);
1113				r %= frag.ndist;
1114				mds = frag.dist[r];
1115				dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
1116				     __func__, inode, ceph_vinop(inode),
1117				     frag.frag, mds, (int)r, frag.ndist);
1118				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1119				    CEPH_MDS_STATE_ACTIVE &&
1120				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
1121					goto out;
1122			}
1123
1124			/* since this file/dir wasn't known to be
1125			 * replicated, then we want to look for the
1126			 * authoritative mds. */
1127			if (frag.mds >= 0) {
1128				/* choose auth mds */
1129				mds = frag.mds;
1130				dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
1131				     __func__, inode, ceph_vinop(inode),
1132				     frag.frag, mds);
1133				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1134				    CEPH_MDS_STATE_ACTIVE) {
1135					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
1136								  mds))
1137						goto out;
1138				}
1139			}
1140			mode = USE_AUTH_MDS;
1141		}
1142	}
1143
1144	spin_lock(&ci->i_ceph_lock);
1145	cap = NULL;
1146	if (mode == USE_AUTH_MDS)
1147		cap = ci->i_auth_cap;
1148	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
1149		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
1150	if (!cap) {
1151		spin_unlock(&ci->i_ceph_lock);
1152		iput(inode);
1153		goto random;
1154	}
1155	mds = cap->session->s_mds;
1156	dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
1157	     inode, ceph_vinop(inode), mds,
1158	     cap == ci->i_auth_cap ? "auth " : "", cap);
1159	spin_unlock(&ci->i_ceph_lock);
1160out:
1161	iput(inode);
1162	return mds;
1163
1164random:
1165	if (random)
1166		*random = true;
1167
1168	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
1169	dout("%s chose random mds%d\n", __func__, mds);
1170	return mds;
1171}
1172
1173
1174/*
1175 * session messages
1176 */
1177struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
1178{
1179	struct ceph_msg *msg;
1180	struct ceph_mds_session_head *h;
1181
1182	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1183			   false);
1184	if (!msg) {
1185		pr_err("ENOMEM creating session %s msg\n",
1186		       ceph_session_op_name(op));
1187		return NULL;
1188	}
1189	h = msg->front.iov_base;
1190	h->op = cpu_to_le32(op);
1191	h->seq = cpu_to_le64(seq);
1192
1193	return msg;
1194}
1195
1196static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
1197#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
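/*
 * For example, if the largest bit in feature_bits were 17, FEATURE_BYTES()
 * would yield DIV_ROUND_UP(18, 64) * 8 == 8, i.e. one 64-bit word of bitmap;
 * a largest bit of 64 would bump that to 16 bytes.
 */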
1198static int encode_supported_features(void **p, void *end)
1199{
1200	static const size_t count = ARRAY_SIZE(feature_bits);
1201
1202	if (count > 0) {
1203		size_t i;
1204		size_t size = FEATURE_BYTES(count);
1205
1206		if (WARN_ON_ONCE(*p + 4 + size > end))
1207			return -ERANGE;
1208
1209		ceph_encode_32(p, size);
1210		memset(*p, 0, size);
1211		for (i = 0; i < count; i++)
1212			((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8);
1213		*p += size;
1214	} else {
1215		if (WARN_ON_ONCE(*p + 4 > end))
1216			return -ERANGE;
1217
1218		ceph_encode_32(p, 0);
1219	}
1220
1221	return 0;
1222}
1223
1224static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
1225#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
1226static int encode_metric_spec(void **p, void *end)
1227{
1228	static const size_t count = ARRAY_SIZE(metric_bits);
1229
1230	/* header */
1231	if (WARN_ON_ONCE(*p + 2 > end))
1232		return -ERANGE;
1233
1234	ceph_encode_8(p, 1); /* version */
1235	ceph_encode_8(p, 1); /* compat */
1236
1237	if (count > 0) {
1238		size_t i;
1239		size_t size = METRIC_BYTES(count);
1240
1241		if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
1242			return -ERANGE;
1243
1244		/* metric spec info length */
1245		ceph_encode_32(p, 4 + size);
1246
1247		/* metric spec */
1248		ceph_encode_32(p, size);
1249		memset(*p, 0, size);
1250		for (i = 0; i < count; i++)
1251			((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
1252		*p += size;
1253	} else {
1254		if (WARN_ON_ONCE(*p + 4 + 4 > end))
1255			return -ERANGE;
1256
1257		/* metric spec info length */
1258		ceph_encode_32(p, 4);
1259		/* metric spec */
1260		ceph_encode_32(p, 0);
1261	}
1262
1263	return 0;
1264}
1265
1266/*
1267 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
1268 * to include additional client metadata fields.
1269 */
1270static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
1271{
1272	struct ceph_msg *msg;
1273	struct ceph_mds_session_head *h;
1274	int i;
1275	int extra_bytes = 0;
1276	int metadata_key_count = 0;
1277	struct ceph_options *opt = mdsc->fsc->client->options;
1278	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
1279	size_t size, count;
1280	void *p, *end;
1281	int ret;
1282
1283	const char* metadata[][2] = {
1284		{"hostname", mdsc->nodename},
1285		{"kernel_version", init_utsname()->release},
1286		{"entity_id", opt->name ? : ""},
1287		{"root", fsopt->server_path ? : "/"},
1288		{NULL, NULL}
1289	};
1290
1291	/* Calculate serialized length of metadata */
1292	extra_bytes = 4;  /* map length */
1293	for (i = 0; metadata[i][0]; ++i) {
1294		extra_bytes += 8 + strlen(metadata[i][0]) +
1295			strlen(metadata[i][1]);
1296		metadata_key_count++;
1297	}
1298
1299	/* supported feature */
1300	size = 0;
1301	count = ARRAY_SIZE(feature_bits);
1302	if (count > 0)
1303		size = FEATURE_BYTES(count);
1304	extra_bytes += 4 + size;
1305
1306	/* metric spec */
1307	size = 0;
1308	count = ARRAY_SIZE(metric_bits);
1309	if (count > 0)
1310		size = METRIC_BYTES(count);
1311	extra_bytes += 2 + 4 + 4 + size;
1312
1313	/* Allocate the message */
1314	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
1315			   GFP_NOFS, false);
1316	if (!msg) {
1317		pr_err("ENOMEM creating session open msg\n");
1318		return ERR_PTR(-ENOMEM);
1319	}
1320	p = msg->front.iov_base;
1321	end = p + msg->front.iov_len;
1322
1323	h = p;
1324	h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
1325	h->seq = cpu_to_le64(seq);
1326
1327	/*
1328	 * Serialize client metadata into waiting buffer space, using
1329	 * the format that userspace expects for map<string, string>
1330	 *
1331	 * ClientSession messages with metadata are v4
1332	 */
1333	msg->hdr.version = cpu_to_le16(4);
1334	msg->hdr.compat_version = cpu_to_le16(1);
1335
1336	/* The write pointer, following the session_head structure */
1337	p += sizeof(*h);
1338
1339	/* Number of entries in the map */
1340	ceph_encode_32(&p, metadata_key_count);
1341
1342	/* Two length-prefixed strings for each entry in the map */
1343	for (i = 0; metadata[i][0]; ++i) {
1344		size_t const key_len = strlen(metadata[i][0]);
1345		size_t const val_len = strlen(metadata[i][1]);
1346
1347		ceph_encode_32(&p, key_len);
1348		memcpy(p, metadata[i][0], key_len);
1349		p += key_len;
1350		ceph_encode_32(&p, val_len);
1351		memcpy(p, metadata[i][1], val_len);
1352		p += val_len;
1353	}
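	/*
	 * For reference, each entry above is encoded as
	 *   __le32 key_len | key bytes | __le32 val_len | value bytes
	 * which is why the size estimate earlier charges
	 * 8 + strlen(key) + strlen(value) per entry, e.g. {"root", "/"}
	 * costs 4 + 4 + 4 + 1 = 13 bytes.
	 */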
1354
1355	ret = encode_supported_features(&p, end);
1356	if (ret) {
1357		pr_err("encode_supported_features failed!\n");
1358		ceph_msg_put(msg);
1359		return ERR_PTR(ret);
1360	}
1361
1362	ret = encode_metric_spec(&p, end);
1363	if (ret) {
1364		pr_err("encode_metric_spec failed!\n");
1365		ceph_msg_put(msg);
1366		return ERR_PTR(ret);
1367	}
1368
1369	msg->front.iov_len = p - msg->front.iov_base;
1370	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1371
1372	return msg;
1373}
1374
1375/*
1376 * send session open request.
1377 *
1378 * called under mdsc->mutex
1379 */
1380static int __open_session(struct ceph_mds_client *mdsc,
1381			  struct ceph_mds_session *session)
1382{
1383	struct ceph_msg *msg;
1384	int mstate;
1385	int mds = session->s_mds;
1386
1387	/* wait for mds to go active? */
1388	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
1389	dout("open_session to mds%d (%s)\n", mds,
1390	     ceph_mds_state_name(mstate));
1391	session->s_state = CEPH_MDS_SESSION_OPENING;
1392	session->s_renew_requested = jiffies;
1393
1394	/* send connect message */
1395	msg = create_session_open_msg(mdsc, session->s_seq);
1396	if (IS_ERR(msg))
1397		return PTR_ERR(msg);
1398	ceph_con_send(&session->s_con, msg);
1399	return 0;
1400}
1401
1402/*
1403 * open sessions for any export targets for the given mds
1404 *
1405 * called under mdsc->mutex
1406 */
1407static struct ceph_mds_session *
1408__open_export_target_session(struct ceph_mds_client *mdsc, int target)
1409{
1410	struct ceph_mds_session *session;
1411	int ret;
1412
1413	session = __ceph_lookup_mds_session(mdsc, target);
1414	if (!session) {
1415		session = register_session(mdsc, target);
1416		if (IS_ERR(session))
1417			return session;
1418	}
1419	if (session->s_state == CEPH_MDS_SESSION_NEW ||
1420	    session->s_state == CEPH_MDS_SESSION_CLOSING) {
1421		ret = __open_session(mdsc, session);
1422		if (ret)
1423			return ERR_PTR(ret);
1424	}
1425
1426	return session;
1427}
1428
1429struct ceph_mds_session *
1430ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1431{
1432	struct ceph_mds_session *session;
1433
1434	dout("open_export_target_session to mds%d\n", target);
1435
1436	mutex_lock(&mdsc->mutex);
1437	session = __open_export_target_session(mdsc, target);
1438	mutex_unlock(&mdsc->mutex);
1439
1440	return session;
1441}
1442
1443static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1444					  struct ceph_mds_session *session)
1445{
1446	struct ceph_mds_info *mi;
1447	struct ceph_mds_session *ts;
1448	int i, mds = session->s_mds;
1449
1450	if (mds >= mdsc->mdsmap->possible_max_rank)
1451		return;
1452
1453	mi = &mdsc->mdsmap->m_info[mds];
1454	dout("open_export_target_sessions for mds%d (%d targets)\n",
1455	     session->s_mds, mi->num_export_targets);
1456
1457	for (i = 0; i < mi->num_export_targets; i++) {
1458		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1459		ceph_put_mds_session(ts);
1460	}
1461}
1462
1463void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1464					   struct ceph_mds_session *session)
1465{
1466	mutex_lock(&mdsc->mutex);
1467	__open_export_target_sessions(mdsc, session);
1468	mutex_unlock(&mdsc->mutex);
1469}
1470
1471/*
1472 * session caps
1473 */
1474
1475static void detach_cap_releases(struct ceph_mds_session *session,
1476				struct list_head *target)
1477{
1478	lockdep_assert_held(&session->s_cap_lock);
1479
1480	list_splice_init(&session->s_cap_releases, target);
1481	session->s_num_cap_releases = 0;
1482	dout("dispose_cap_releases mds%d\n", session->s_mds);
1483}
1484
1485static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1486				 struct list_head *dispose)
1487{
1488	while (!list_empty(dispose)) {
1489		struct ceph_cap *cap;
1490		/* zero out the in-progress message */
1491		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1492		list_del(&cap->session_caps);
1493		ceph_put_cap(mdsc, cap);
1494	}
1495}
1496
1497static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1498				     struct ceph_mds_session *session)
1499{
1500	struct ceph_mds_request *req;
1501	struct rb_node *p;
1502	struct ceph_inode_info *ci;
1503
1504	dout("cleanup_session_requests mds%d\n", session->s_mds);
1505	mutex_lock(&mdsc->mutex);
1506	while (!list_empty(&session->s_unsafe)) {
1507		req = list_first_entry(&session->s_unsafe,
1508				       struct ceph_mds_request, r_unsafe_item);
1509		pr_warn_ratelimited(" dropping unsafe request %llu\n",
1510				    req->r_tid);
1511		if (req->r_target_inode) {
1512			/* dropping unsafe change of inode's attributes */
1513			ci = ceph_inode(req->r_target_inode);
1514			errseq_set(&ci->i_meta_err, -EIO);
1515		}
1516		if (req->r_unsafe_dir) {
1517			/* dropping unsafe directory operation */
1518			ci = ceph_inode(req->r_unsafe_dir);
1519			errseq_set(&ci->i_meta_err, -EIO);
1520		}
1521		__unregister_request(mdsc, req);
1522	}
1523	/* zero r_attempts, so kick_requests() will re-send requests */
1524	p = rb_first(&mdsc->request_tree);
1525	while (p) {
1526		req = rb_entry(p, struct ceph_mds_request, r_node);
1527		p = rb_next(p);
1528		if (req->r_session &&
1529		    req->r_session->s_mds == session->s_mds)
1530			req->r_attempts = 0;
1531	}
1532	mutex_unlock(&mdsc->mutex);
1533}
1534
1535/*
1536 * Helper to safely iterate over all caps associated with a session, with
1537 * special care taken to handle a racing __ceph_remove_cap().
1538 *
1539 * Caller must hold session s_mutex.
1540 */
1541int ceph_iterate_session_caps(struct ceph_mds_session *session,
1542			      int (*cb)(struct inode *, struct ceph_cap *,
1543					void *), void *arg)
1544{
1545	struct list_head *p;
1546	struct ceph_cap *cap;
1547	struct inode *inode, *last_inode = NULL;
1548	struct ceph_cap *old_cap = NULL;
1549	int ret;
1550
1551	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1552	spin_lock(&session->s_cap_lock);
1553	p = session->s_caps.next;
1554	while (p != &session->s_caps) {
1555		cap = list_entry(p, struct ceph_cap, session_caps);
1556		inode = igrab(&cap->ci->vfs_inode);
1557		if (!inode) {
1558			p = p->next;
1559			continue;
1560		}
1561		session->s_cap_iterator = cap;
1562		spin_unlock(&session->s_cap_lock);
1563
1564		if (last_inode) {
1565			iput(last_inode);
1566			last_inode = NULL;
1567		}
1568		if (old_cap) {
1569			ceph_put_cap(session->s_mdsc, old_cap);
1570			old_cap = NULL;
1571		}
1572
1573		ret = cb(inode, cap, arg);
1574		last_inode = inode;
1575
1576		spin_lock(&session->s_cap_lock);
1577		p = p->next;
1578		if (!cap->ci) {
1579			dout("iterate_session_caps  finishing cap %p removal\n",
1580			     cap);
1581			BUG_ON(cap->session != session);
1582			cap->session = NULL;
1583			list_del_init(&cap->session_caps);
1584			session->s_nr_caps--;
1585			atomic64_dec(&session->s_mdsc->metric.total_caps);
1586			if (cap->queue_release)
1587				__ceph_queue_cap_release(session, cap);
1588			else
1589				old_cap = cap;  /* put_cap it w/o locks held */
1590		}
1591		if (ret < 0)
1592			goto out;
1593	}
1594	ret = 0;
1595out:
1596	session->s_cap_iterator = NULL;
1597	spin_unlock(&session->s_cap_lock);
1598
1599	iput(last_inode);
1600	if (old_cap)
1601		ceph_put_cap(session->s_mdsc, old_cap);
1602
1603	return ret;
1604}
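/*
 * Illustrative sketch (hypothetical, not part of this file): a trivial
 * ceph_iterate_session_caps() callback.  Real callbacks
 * (remove_session_caps_cb, trim_caps_cb, ...) follow the same shape: do
 * their work, typically under ci->i_ceph_lock, and return 0 to continue the
 * walk or a negative value to stop it.  Usage:
 *
 *	int count = 0;
 *	ceph_iterate_session_caps(session, example_count_cap_cb, &count);
 */
static int __maybe_unused example_count_cap_cb(struct inode *inode,
					       struct ceph_cap *cap, void *arg)
{
	int *count = arg;

	(*count)++;
	return 0;
}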
1605
1606static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1607				  void *arg)
1608{
1609	struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1610	struct ceph_inode_info *ci = ceph_inode(inode);
1611	LIST_HEAD(to_remove);
1612	bool dirty_dropped = false;
1613	bool invalidate = false;
1614
1615	dout("removing cap %p, ci is %p, inode is %p\n",
1616	     cap, ci, &ci->vfs_inode);
1617	spin_lock(&ci->i_ceph_lock);
1618	__ceph_remove_cap(cap, false);
1619	if (!ci->i_auth_cap) {
1620		struct ceph_cap_flush *cf;
1621		struct ceph_mds_client *mdsc = fsc->mdsc;
1622
1623		if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
1624			if (inode->i_data.nrpages > 0)
1625				invalidate = true;
1626			if (ci->i_wrbuffer_ref > 0)
1627				mapping_set_error(&inode->i_data, -EIO);
1628		}
1629
1630		while (!list_empty(&ci->i_cap_flush_list)) {
1631			cf = list_first_entry(&ci->i_cap_flush_list,
1632					      struct ceph_cap_flush, i_list);
1633			list_move(&cf->i_list, &to_remove);
1634		}
1635
1636		spin_lock(&mdsc->cap_dirty_lock);
1637
1638		list_for_each_entry(cf, &to_remove, i_list)
1639			list_del_init(&cf->g_list);
1640
1641		if (!list_empty(&ci->i_dirty_item)) {
1642			pr_warn_ratelimited(
1643				" dropping dirty %s state for %p %lld\n",
1644				ceph_cap_string(ci->i_dirty_caps),
1645				inode, ceph_ino(inode));
1646			ci->i_dirty_caps = 0;
1647			list_del_init(&ci->i_dirty_item);
1648			dirty_dropped = true;
1649		}
1650		if (!list_empty(&ci->i_flushing_item)) {
1651			pr_warn_ratelimited(
1652				" dropping dirty+flushing %s state for %p %lld\n",
1653				ceph_cap_string(ci->i_flushing_caps),
1654				inode, ceph_ino(inode));
1655			ci->i_flushing_caps = 0;
1656			list_del_init(&ci->i_flushing_item);
1657			mdsc->num_cap_flushing--;
1658			dirty_dropped = true;
1659		}
1660		spin_unlock(&mdsc->cap_dirty_lock);
1661
1662		if (dirty_dropped) {
1663			errseq_set(&ci->i_meta_err, -EIO);
1664
1665			if (ci->i_wrbuffer_ref_head == 0 &&
1666			    ci->i_wr_ref == 0 &&
1667			    ci->i_dirty_caps == 0 &&
1668			    ci->i_flushing_caps == 0) {
1669				ceph_put_snap_context(ci->i_head_snapc);
1670				ci->i_head_snapc = NULL;
1671			}
1672		}
1673
1674		if (atomic_read(&ci->i_filelock_ref) > 0) {
1675			/* make further file lock syscall return -EIO */
1676			ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
1677			pr_warn_ratelimited(" dropping file locks for %p %lld\n",
1678					    inode, ceph_ino(inode));
1679		}
1680
1681		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1682			list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
1683			ci->i_prealloc_cap_flush = NULL;
1684		}
1685	}
1686	spin_unlock(&ci->i_ceph_lock);
1687	while (!list_empty(&to_remove)) {
1688		struct ceph_cap_flush *cf;
1689		cf = list_first_entry(&to_remove,
1690				      struct ceph_cap_flush, i_list);
1691		list_del_init(&cf->i_list);
1692		if (!cf->is_capsnap)
1693			ceph_free_cap_flush(cf);
1694	}
1695
1696	wake_up_all(&ci->i_cap_wq);
1697	if (invalidate)
1698		ceph_queue_invalidate(inode);
1699	if (dirty_dropped)
1700		iput(inode);
1701	return 0;
1702}
1703
1704/*
1705 * caller must hold session s_mutex
1706 */
1707static void remove_session_caps(struct ceph_mds_session *session)
1708{
1709	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1710	struct super_block *sb = fsc->sb;
1711	LIST_HEAD(dispose);
1712
1713	dout("remove_session_caps on %p\n", session);
1714	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1715
1716	wake_up_all(&fsc->mdsc->cap_flushing_wq);
1717
1718	spin_lock(&session->s_cap_lock);
1719	if (session->s_nr_caps > 0) {
1720		struct inode *inode;
1721		struct ceph_cap *cap, *prev = NULL;
1722		struct ceph_vino vino;
1723		/*
1724		 * iterate_session_caps() skips inodes that are being
1725		 * deleted, so we need to wait until deletions are complete.
1726		 * __wait_on_freeing_inode() is designed for the job,
1727		 * but it is not exported, so use lookup inode function
1728		 * to access it.
1729		 */
1730		while (!list_empty(&session->s_caps)) {
1731			cap = list_entry(session->s_caps.next,
1732					 struct ceph_cap, session_caps);
1733			if (cap == prev)
1734				break;
1735			prev = cap;
1736			vino = cap->ci->i_vino;
1737			spin_unlock(&session->s_cap_lock);
1738
1739			inode = ceph_find_inode(sb, vino);
1740			iput(inode);
1741
1742			spin_lock(&session->s_cap_lock);
1743		}
1744	}
1745
1746	// detach pending cap releases; s_cap_lock is dropped below before disposing them
1747	detach_cap_releases(session, &dispose);
1748
1749	BUG_ON(session->s_nr_caps > 0);
1750	BUG_ON(!list_empty(&session->s_cap_flushing));
1751	spin_unlock(&session->s_cap_lock);
1752	dispose_cap_releases(session->s_mdsc, &dispose);
1753}
1754
1755enum {
1756	RECONNECT,
1757	RENEWCAPS,
1758	FORCE_RO,
1759};
1760
1761/*
1762 * wake up any threads waiting on this session's caps.  if the cap is
1763 * old (didn't get renewed on the client reconnect), remove it now.
1764 *
1765 * caller must hold s_mutex.
1766 */
1767static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1768			      void *arg)
1769{
1770	struct ceph_inode_info *ci = ceph_inode(inode);
1771	unsigned long ev = (unsigned long)arg;
1772
1773	if (ev == RECONNECT) {
1774		spin_lock(&ci->i_ceph_lock);
1775		ci->i_wanted_max_size = 0;
1776		ci->i_requested_max_size = 0;
1777		spin_unlock(&ci->i_ceph_lock);
1778	} else if (ev == RENEWCAPS) {
1779		if (cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) {
1780			/* mds did not re-issue stale cap */
1781			spin_lock(&ci->i_ceph_lock);
1782			cap->issued = cap->implemented = CEPH_CAP_PIN;
1783			spin_unlock(&ci->i_ceph_lock);
1784		}
1785	} else if (ev == FORCE_RO) {
1786	}
1787	wake_up_all(&ci->i_cap_wq);
1788	return 0;
1789}
1790
1791static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1792{
1793	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1794	ceph_iterate_session_caps(session, wake_up_session_cb,
1795				  (void *)(unsigned long)ev);
1796}
1797
1798/*
1799 * Send periodic message to MDS renewing all currently held caps.  The
1800 * ack will reset the expiration for all caps from this session.
1801 *
1802 * caller holds s_mutex
1803 */
1804static int send_renew_caps(struct ceph_mds_client *mdsc,
1805			   struct ceph_mds_session *session)
1806{
1807	struct ceph_msg *msg;
1808	int state;
1809
1810	if (time_after_eq(jiffies, session->s_cap_ttl) &&
1811	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1812		pr_info("mds%d caps stale\n", session->s_mds);
1813	session->s_renew_requested = jiffies;
1814
1815	/* do not try to renew caps until a recovering mds has reconnected
1816	 * with its clients. */
1817	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1818	if (state < CEPH_MDS_STATE_RECONNECT) {
1819		dout("send_renew_caps ignoring mds%d (%s)\n",
1820		     session->s_mds, ceph_mds_state_name(state));
1821		return 0;
1822	}
1823
1824	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1825		ceph_mds_state_name(state));
1826	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1827				      ++session->s_renew_seq);
1828	if (!msg)
1829		return -ENOMEM;
1830	ceph_con_send(&session->s_con, msg);
1831	return 0;
1832}
1833
1834static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1835			     struct ceph_mds_session *session, u64 seq)
1836{
1837	struct ceph_msg *msg;
1838
1839	dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
1840	     session->s_mds, ceph_session_state_name(session->s_state), seq);
1841	msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1842	if (!msg)
1843		return -ENOMEM;
1844	ceph_con_send(&session->s_con, msg);
1845	return 0;
1846}
1847
1848
1849/*
1850 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1851 *
1852 * Called under session->s_mutex
1853 */
1854static void renewed_caps(struct ceph_mds_client *mdsc,
1855			 struct ceph_mds_session *session, int is_renew)
1856{
1857	int was_stale;
1858	int wake = 0;
1859
1860	spin_lock(&session->s_cap_lock);
1861	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1862
1863	session->s_cap_ttl = session->s_renew_requested +
1864		mdsc->mdsmap->m_session_timeout*HZ;
1865
1866	if (was_stale) {
1867		if (time_before(jiffies, session->s_cap_ttl)) {
1868			pr_info("mds%d caps renewed\n", session->s_mds);
1869			wake = 1;
1870		} else {
1871			pr_info("mds%d caps still stale\n", session->s_mds);
1872		}
1873	}
1874	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1875	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1876	     time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
1877	spin_unlock(&session->s_cap_lock);
1878
1879	if (wake)
1880		wake_up_session_caps(session, RENEWCAPS);
1881}
1882
1883/*
1884 * send a session close request
1885 */
1886static int request_close_session(struct ceph_mds_session *session)
1887{
1888	struct ceph_msg *msg;
1889
1890	dout("request_close_session mds%d state %s seq %lld\n",
1891	     session->s_mds, ceph_session_state_name(session->s_state),
1892	     session->s_seq);
1893	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
1894				      session->s_seq);
1895	if (!msg)
1896		return -ENOMEM;
1897	ceph_con_send(&session->s_con, msg);
1898	return 1;
1899}
1900
1901/*
1902 * Called with s_mutex held.
1903 */
1904static int __close_session(struct ceph_mds_client *mdsc,
1905			 struct ceph_mds_session *session)
1906{
1907	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1908		return 0;
1909	session->s_state = CEPH_MDS_SESSION_CLOSING;
1910	return request_close_session(session);
1911}
1912
1913static bool drop_negative_children(struct dentry *dentry)
1914{
1915	struct dentry *child;
1916	bool all_negative = true;
1917
1918	if (!d_is_dir(dentry))
1919		goto out;
1920
1921	spin_lock(&dentry->d_lock);
1922	list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1923		if (d_really_is_positive(child)) {
1924			all_negative = false;
1925			break;
1926		}
1927	}
1928	spin_unlock(&dentry->d_lock);
1929
1930	if (all_negative)
1931		shrink_dcache_parent(dentry);
1932out:
1933	return all_negative;
1934}
1935
1936/*
1937 * Trim old(er) caps.
1938 *
1939 * Because we can't cache an inode without one or more caps, we do
1940 * this indirectly: if a cap is unused, we prune its aliases, at which
1941 * point the inode will hopefully get dropped too.
1942 *
1943 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1944 * memory pressure from the MDS, though, so it needn't be perfect.
1945 */
1946static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1947{
1948	int *remaining = arg;
1949	struct ceph_inode_info *ci = ceph_inode(inode);
1950	int used, wanted, oissued, mine;
1951
1952	if (*remaining <= 0)
1953		return -1;
1954
1955	spin_lock(&ci->i_ceph_lock);
1956	mine = cap->issued | cap->implemented;
1957	used = __ceph_caps_used(ci);
1958	wanted = __ceph_caps_file_wanted(ci);
1959	oissued = __ceph_caps_issued_other(ci, cap);
1960
1961	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1962	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1963	     ceph_cap_string(used), ceph_cap_string(wanted));
1964	if (cap == ci->i_auth_cap) {
1965		if (ci->i_dirty_caps || ci->i_flushing_caps ||
1966		    !list_empty(&ci->i_cap_snaps))
1967			goto out;
1968		if ((used | wanted) & CEPH_CAP_ANY_WR)
1969			goto out;
1970		/* Note: it's possible that i_filelock_ref becomes non-zero
1971		 * after dropping auth caps. It doesn't hurt because reply
1972		 * of lock mds request will re-add auth caps. */
1973		if (atomic_read(&ci->i_filelock_ref) > 0)
1974			goto out;
1975	}
1976	/* The inode has cached pages, but it's no longer used.
1977	 * We can safely drop it. */
1978	if (S_ISREG(inode->i_mode) &&
1979	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1980	    !(oissued & CEPH_CAP_FILE_CACHE)) {
1981		used = 0;
1982		oissued = 0;
1983	}
1984	if ((used | wanted) & ~oissued & mine)
1985		goto out;   /* we need these caps */
1986
1987	if (oissued) {
1988		/* we aren't the only cap.. just remove us */
1989		__ceph_remove_cap(cap, true);
1990		(*remaining)--;
1991	} else {
1992		struct dentry *dentry;
1993		/* try dropping referring dentries */
1994		spin_unlock(&ci->i_ceph_lock);
1995		dentry = d_find_any_alias(inode);
1996		if (dentry && drop_negative_children(dentry)) {
1997			int count;
1998			dput(dentry);
1999			d_prune_aliases(inode);
2000			count = atomic_read(&inode->i_count);
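			/* if we now hold the only remaining reference, the
			 * inode (and this cap) should go away once the
			 * iteration drops it, so count it as trimmed */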
2001			if (count == 1)
2002				(*remaining)--;
2003			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
2004			     inode, cap, count);
2005		} else {
2006			dput(dentry);
2007		}
2008		return 0;
2009	}
2010
2011out:
2012	spin_unlock(&ci->i_ceph_lock);
2013	return 0;
2014}
2015
2016/*
2017 * Trim session cap count down to some max number.
2018 */
2019int ceph_trim_caps(struct ceph_mds_client *mdsc,
2020		   struct ceph_mds_session *session,
2021		   int max_caps)
2022{
2023	int trim_caps = session->s_nr_caps - max_caps;
2024
2025	dout("trim_caps mds%d start: %d / %d, trim %d\n",
2026	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
2027	if (trim_caps > 0) {
2028		int remaining = trim_caps;
2029
2030		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2031		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
2032		     session->s_mds, session->s_nr_caps, max_caps,
2033			trim_caps - remaining);
2034	}
2035
2036	ceph_flush_cap_releases(mdsc, session);
2037	return 0;
2038}
2039
2040static int check_caps_flush(struct ceph_mds_client *mdsc,
2041			    u64 want_flush_tid)
2042{
2043	int ret = 1;
2044
2045	spin_lock(&mdsc->cap_dirty_lock);
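	/* cap flushes are appended to the global list in tid order, so
	 * checking the first (oldest) entry is sufficient */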
2046	if (!list_empty(&mdsc->cap_flush_list)) {
2047		struct ceph_cap_flush *cf =
2048			list_first_entry(&mdsc->cap_flush_list,
2049					 struct ceph_cap_flush, g_list);
2050		if (cf->tid <= want_flush_tid) {
2051			dout("check_caps_flush still flushing tid "
2052			     "%llu <= %llu\n", cf->tid, want_flush_tid);
2053			ret = 0;
2054		}
2055	}
2056	spin_unlock(&mdsc->cap_dirty_lock);
2057	return ret;
2058}
2059
2060/*
2061 * Wait for dirty caps to be flushed back to the MDS.
2062 *
2063 * Blocks until we have flushed everything up through want_flush_tid.
2064 */
2065static void wait_caps_flush(struct ceph_mds_client *mdsc,
2066			    u64 want_flush_tid)
2067{
2068	dout("check_caps_flush want %llu\n", want_flush_tid);
2069
2070	wait_event(mdsc->cap_flushing_wq,
2071		   check_caps_flush(mdsc, want_flush_tid));
2072
2073	dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
2074}
2075
2076/*
2077 * called under s_mutex
2078 */
2079static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
2080				   struct ceph_mds_session *session)
2081{
2082	struct ceph_msg *msg = NULL;
2083	struct ceph_mds_cap_release *head;
2084	struct ceph_mds_cap_item *item;
2085	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
2086	struct ceph_cap *cap;
2087	LIST_HEAD(tmp_list);
2088	int num_cap_releases;
2089	__le32	barrier, *cap_barrier;
2090
2091	down_read(&osdc->lock);
2092	barrier = cpu_to_le32(osdc->epoch_barrier);
2093	up_read(&osdc->lock);
2094
2095	spin_lock(&session->s_cap_lock);
2096again:
2097	list_splice_init(&session->s_cap_releases, &tmp_list);
2098	num_cap_releases = session->s_num_cap_releases;
2099	session->s_num_cap_releases = 0;
2100	spin_unlock(&session->s_cap_lock);
2101
2102	while (!list_empty(&tmp_list)) {
2103		if (!msg) {
2104			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
2105					PAGE_SIZE, GFP_NOFS, false);
2106			if (!msg)
2107				goto out_err;
2108			head = msg->front.iov_base;
2109			head->num = cpu_to_le32(0);
2110			msg->front.iov_len = sizeof(*head);
2111
2112			msg->hdr.version = cpu_to_le16(2);
2113			msg->hdr.compat_version = cpu_to_le16(1);
2114		}
2115
2116		cap = list_first_entry(&tmp_list, struct ceph_cap,
2117					session_caps);
2118		list_del(&cap->session_caps);
2119		num_cap_releases--;
2120
2121		head = msg->front.iov_base;
2122		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2123				   &head->num);
2124		item = msg->front.iov_base + msg->front.iov_len;
2125		item->ino = cpu_to_le64(cap->cap_ino);
2126		item->cap_id = cpu_to_le64(cap->cap_id);
2127		item->migrate_seq = cpu_to_le32(cap->mseq);
2128		item->seq = cpu_to_le32(cap->issue_seq);
2129		msg->front.iov_len += sizeof(*item);
2130
2131		ceph_put_cap(mdsc, cap);
2132
2133		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
2134			// Append cap_barrier field
2135			cap_barrier = msg->front.iov_base + msg->front.iov_len;
2136			*cap_barrier = barrier;
2137			msg->front.iov_len += sizeof(*cap_barrier);
2138
2139			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2140			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2141			ceph_con_send(&session->s_con, msg);
2142			msg = NULL;
2143		}
2144	}
2145
2146	BUG_ON(num_cap_releases != 0);
2147
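	/* more releases may have been queued while s_cap_lock was dropped;
	 * if so, go around again */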
2148	spin_lock(&session->s_cap_lock);
2149	if (!list_empty(&session->s_cap_releases))
2150		goto again;
2151	spin_unlock(&session->s_cap_lock);
2152
2153	if (msg) {
2154		// Append cap_barrier field
2155		cap_barrier = msg->front.iov_base + msg->front.iov_len;
2156		*cap_barrier = barrier;
2157		msg->front.iov_len += sizeof(*cap_barrier);
2158
2159		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2160		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2161		ceph_con_send(&session->s_con, msg);
2162	}
2163	return;
2164out_err:
2165	pr_err("send_cap_releases mds%d, failed to allocate message\n",
2166		session->s_mds);
2167	spin_lock(&session->s_cap_lock);
2168	list_splice(&tmp_list, &session->s_cap_releases);
2169	session->s_num_cap_releases += num_cap_releases;
2170	spin_unlock(&session->s_cap_lock);
2171}
2172
2173static void ceph_cap_release_work(struct work_struct *work)
2174{
2175	struct ceph_mds_session *session =
2176		container_of(work, struct ceph_mds_session, s_cap_release_work);
2177
2178	mutex_lock(&session->s_mutex);
2179	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2180	    session->s_state == CEPH_MDS_SESSION_HUNG)
2181		ceph_send_cap_releases(session->s_mdsc, session);
2182	mutex_unlock(&session->s_mutex);
2183	ceph_put_mds_session(session);
2184}
2185
2186void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
2187		             struct ceph_mds_session *session)
2188{
2189	if (mdsc->stopping)
2190		return;
2191
2192	ceph_get_mds_session(session);
2193	if (queue_work(mdsc->fsc->cap_wq,
2194		       &session->s_cap_release_work)) {
2195		dout("cap release work queued\n");
2196	} else {
2197		ceph_put_mds_session(session);
2198		dout("failed to queue cap release work\n");
2199	}
2200}
2201
2202/*
2203 * caller holds session->s_cap_lock
2204 */
2205void __ceph_queue_cap_release(struct ceph_mds_session *session,
2206			      struct ceph_cap *cap)
2207{
2208	list_add_tail(&cap->session_caps, &session->s_cap_releases);
2209	session->s_num_cap_releases++;
2210
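	/* kick the release work each time we accumulate a full message's
	 * worth (CEPH_CAPS_PER_RELEASE) of entries */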
2211	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2212		ceph_flush_cap_releases(session->s_mdsc, session);
2213}
2214
2215static void ceph_cap_reclaim_work(struct work_struct *work)
2216{
2217	struct ceph_mds_client *mdsc =
2218		container_of(work, struct ceph_mds_client, cap_reclaim_work);
2219	int ret = ceph_trim_dentries(mdsc);
2220	if (ret == -EAGAIN)
2221		ceph_queue_cap_reclaim_work(mdsc);
2222}
2223
2224void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2225{
2226	if (mdsc->stopping)
2227		return;
2228
2229	if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2230		dout("caps reclaim work queued\n");
2231	} else {
2232		dout("failed to queue caps reclaim work\n");
2233	}
2234}
2235
2236void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2237{
2238	int val;
2239	if (!nr)
2240		return;
2241	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
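	/* queue the reclaim work roughly once per CEPH_CAPS_PER_RELEASE
	 * dentries; the modulo test catches the add that crosses a multiple */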
2242	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2243		atomic_set(&mdsc->cap_reclaim_pending, 0);
2244		ceph_queue_cap_reclaim_work(mdsc);
2245	}
2246}
2247
2248/*
2249 * requests
2250 */
2251
2252int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2253				    struct inode *dir)
2254{
2255	struct ceph_inode_info *ci = ceph_inode(dir);
2256	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2257	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2258	size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2259	unsigned int num_entries;
2260	int order;
2261
2262	spin_lock(&ci->i_ceph_lock);
2263	num_entries = ci->i_files + ci->i_subdirs;
2264	spin_unlock(&ci->i_ceph_lock);
2265	num_entries = max(num_entries, 1U);
2266	num_entries = min(num_entries, opt->max_readdir);
2267
2268	order = get_order(size * num_entries);
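	/* try to allocate enough pages for a full batch of entries, falling
	 * back to progressively smaller allocations if memory is tight */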
2269	while (order >= 0) {
2270		rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2271							     __GFP_NOWARN,
2272							     order);
2273		if (rinfo->dir_entries)
2274			break;
2275		order--;
2276	}
2277	if (!rinfo->dir_entries)
2278		return -ENOMEM;
2279
2280	num_entries = (PAGE_SIZE << order) / size;
2281	num_entries = min(num_entries, opt->max_readdir);
2282
2283	rinfo->dir_buf_size = PAGE_SIZE << order;
2284	req->r_num_caps = num_entries + 1;
2285	req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2286	req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2287	return 0;
2288}
2289
2290/*
2291 * Create an mds request.
2292 */
2293struct ceph_mds_request *
2294ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2295{
2296	struct ceph_mds_request *req;
2297
2298	req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
2299	if (!req)
2300		return ERR_PTR(-ENOMEM);
2301
2302	mutex_init(&req->r_fill_mutex);
2303	req->r_mdsc = mdsc;
2304	req->r_started = jiffies;
2305	req->r_start_latency = ktime_get();
2306	req->r_resend_mds = -1;
2307	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2308	INIT_LIST_HEAD(&req->r_unsafe_target_item);
2309	req->r_fmode = -1;
2310	kref_init(&req->r_kref);
2311	RB_CLEAR_NODE(&req->r_node);
2312	INIT_LIST_HEAD(&req->r_wait);
2313	init_completion(&req->r_completion);
2314	init_completion(&req->r_safe_completion);
2315	INIT_LIST_HEAD(&req->r_unsafe_item);
2316
2317	ktime_get_coarse_real_ts64(&req->r_stamp);
2318
2319	req->r_op = op;
2320	req->r_direct_mode = mode;
2321	return req;
2322}
2323
2324/*
2325 * return oldest (lowest tid) request in the request tree, or NULL if none.
2326 *
2327 * called under mdsc->mutex.
2328 */
2329static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2330{
2331	if (RB_EMPTY_ROOT(&mdsc->request_tree))
2332		return NULL;
2333	return rb_entry(rb_first(&mdsc->request_tree),
2334			struct ceph_mds_request, r_node);
2335}
2336
2337static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2338{
2339	return mdsc->oldest_tid;
2340}
2341
2342/*
2343 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
2344 * on build_path_from_dentry in fs/cifs/dir.c.
2345 *
2346 * If @stop_on_nosnap, generate path relative to the first non-snapped
2347 * inode.
2348 *
2349 * Encode hidden .snap dirs as a double /, i.e.
2350 *   foo/.snap/bar -> foo//bar
2351 */
2352char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2353			   int stop_on_nosnap)
2354{
2355	struct dentry *temp;
2356	char *path;
2357	int pos;
2358	unsigned seq;
2359	u64 base;
2360
2361	if (!dentry)
2362		return ERR_PTR(-EINVAL);
2363
2364	path = __getname();
2365	if (!path)
2366		return ERR_PTR(-ENOMEM);
2367retry:
2368	pos = PATH_MAX - 1;
2369	path[pos] = '\0';
2370
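	/* build the path backwards, from the end of the buffer toward the
	 * front, walking up from the dentry toward the root */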
2371	seq = read_seqbegin(&rename_lock);
2372	rcu_read_lock();
2373	temp = dentry;
2374	for (;;) {
2375		struct inode *inode;
2376
2377		spin_lock(&temp->d_lock);
2378		inode = d_inode(temp);
2379		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2380			dout("build_path path+%d: %p SNAPDIR\n",
2381			     pos, temp);
2382		} else if (stop_on_nosnap && inode && dentry != temp &&
2383			   ceph_snap(inode) == CEPH_NOSNAP) {
2384			spin_unlock(&temp->d_lock);
2385			pos++; /* get rid of any prepended '/' */
2386			break;
2387		} else {
2388			pos -= temp->d_name.len;
2389			if (pos < 0) {
2390				spin_unlock(&temp->d_lock);
2391				break;
2392			}
2393			memcpy(path + pos, temp->d_name.name, temp->d_name.len);
2394		}
2395		spin_unlock(&temp->d_lock);
2396		temp = READ_ONCE(temp->d_parent);
2397
2398		/* Are we at the root? */
2399		if (IS_ROOT(temp))
2400			break;
2401
2402		/* Are we out of buffer? */
2403		if (--pos < 0)
2404			break;
2405
2406		path[pos] = '/';
2407	}
2408	base = ceph_ino(d_inode(temp));
2409	rcu_read_unlock();
2410
2411	if (read_seqretry(&rename_lock, seq))
2412		goto retry;
2413
2414	if (pos < 0) {
2415		/*
2416		 * A rename didn't occur, but somehow we didn't end up where
2417		 * we thought we would. Throw a warning and try again.
2418		 */
2419		pr_warn("build_path did not end path lookup where "
2420			"expected, pos is %d\n", pos);
2421		goto retry;
2422	}
2423
2424	*pbase = base;
2425	*plen = PATH_MAX - 1 - pos;
2426	dout("build_path on %p %d built %llx '%.*s'\n",
2427	     dentry, d_count(dentry), base, *plen, path + pos);
2428	return path + pos;
2429}
2430
2431static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2432			     const char **ppath, int *ppathlen, u64 *pino,
2433			     bool *pfreepath, bool parent_locked)
2434{
2435	char *path;
2436
2437	rcu_read_lock();
2438	if (!dir)
2439		dir = d_inode_rcu(dentry->d_parent);
2440	if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
2441		*pino = ceph_ino(dir);
2442		rcu_read_unlock();
2443		*ppath = dentry->d_name.name;
2444		*ppathlen = dentry->d_name.len;
2445		return 0;
2446	}
2447	rcu_read_unlock();
2448	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2449	if (IS_ERR(path))
2450		return PTR_ERR(path);
2451	*ppath = path;
2452	*pfreepath = true;
2453	return 0;
2454}
2455
2456static int build_inode_path(struct inode *inode,
2457			    const char **ppath, int *ppathlen, u64 *pino,
2458			    bool *pfreepath)
2459{
2460	struct dentry *dentry;
2461	char *path;
2462
2463	if (ceph_snap(inode) == CEPH_NOSNAP) {
2464		*pino = ceph_ino(inode);
2465		*ppathlen = 0;
2466		return 0;
2467	}
2468	dentry = d_find_alias(inode);
2469	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2470	dput(dentry);
2471	if (IS_ERR(path))
2472		return PTR_ERR(path);
2473	*ppath = path;
2474	*pfreepath = true;
2475	return 0;
2476}
2477
2478/*
2479 * request arguments may be specified via an inode *, a dentry *, or
2480 * an explicit ino+path.
2481 */
2482static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2483				  struct inode *rdiri, const char *rpath,
2484				  u64 rino, const char **ppath, int *pathlen,
2485				  u64 *ino, bool *freepath, bool parent_locked)
2486{
2487	int r = 0;
2488
2489	if (rinode) {
2490		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2491		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2492		     ceph_snap(rinode));
2493	} else if (rdentry) {
2494		r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2495					freepath, parent_locked);
2496		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2497		     *ppath);
2498	} else if (rpath || rino) {
2499		*ino = rino;
2500		*ppath = rpath;
2501		*pathlen = rpath ? strlen(rpath) : 0;
2502		dout(" path %.*s\n", *pathlen, rpath);
2503	}
2504
2505	return r;
2506}
2507
2508static void encode_timestamp_and_gids(void **p,
2509				      const struct ceph_mds_request *req)
2510{
2511	struct ceph_timespec ts;
2512	int i;
2513
2514	ceph_encode_timespec64(&ts, &req->r_stamp);
2515	ceph_encode_copy(p, &ts, sizeof(ts));
2516
2517	/* gid_list */
2518	ceph_encode_32(p, req->r_cred->group_info->ngroups);
2519	for (i = 0; i < req->r_cred->group_info->ngroups; i++)
2520		ceph_encode_64(p, from_kgid(&init_user_ns,
2521					    req->r_cred->group_info->gid[i]));
2522}
2523
2524/*
2525 * called under mdsc->mutex
2526 */
2527static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
2528					       struct ceph_mds_request *req,
2529					       bool drop_cap_releases)
2530{
2531	int mds = session->s_mds;
2532	struct ceph_mds_client *mdsc = session->s_mdsc;
2533	struct ceph_msg *msg;
2534	struct ceph_mds_request_head_old *head;
2535	const char *path1 = NULL;
2536	const char *path2 = NULL;
2537	u64 ino1 = 0, ino2 = 0;
2538	int pathlen1 = 0, pathlen2 = 0;
2539	bool freepath1 = false, freepath2 = false;
2540	int len;
2541	u16 releases;
2542	void *p, *end;
2543	int ret;
2544	bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
2545
2546	ret = set_request_path_attr(req->r_inode, req->r_dentry,
2547			      req->r_parent, req->r_path1, req->r_ino1.ino,
2548			      &path1, &pathlen1, &ino1, &freepath1,
2549			      test_bit(CEPH_MDS_R_PARENT_LOCKED,
2550					&req->r_req_flags));
2551	if (ret < 0) {
2552		msg = ERR_PTR(ret);
2553		goto out;
2554	}
2555
2556	/* If r_old_dentry is set, then assume that its parent is locked */
2557	ret = set_request_path_attr(NULL, req->r_old_dentry,
2558			      req->r_old_dentry_dir,
2559			      req->r_path2, req->r_ino2.ino,
2560			      &path2, &pathlen2, &ino2, &freepath2, true);
2561	if (ret < 0) {
2562		msg = ERR_PTR(ret);
2563		goto out_free1;
2564	}
2565
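	/*
	 * Front length: request head, two encoded filepaths (a version
	 * byte, u64 ino and u32 string length each, plus the path bytes),
	 * the timestamp, and the supplementary gid list.
	 */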
2566	len = legacy ? sizeof(*head) : sizeof(struct ceph_mds_request_head);
2567	len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2568		sizeof(struct ceph_timespec);
2569	len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);
2570
2571	/* calculate (max) length for cap releases */
2572	len += sizeof(struct ceph_mds_request_release) *
2573		(!!req->r_inode_drop + !!req->r_dentry_drop +
2574		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2575
2576	if (req->r_dentry_drop)
2577		len += pathlen1;
2578	if (req->r_old_dentry_drop)
2579		len += pathlen2;
2580
2581	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2582	if (!msg) {
2583		msg = ERR_PTR(-ENOMEM);
2584		goto out_free2;
2585	}
2586
2587	msg->hdr.tid = cpu_to_le64(req->r_tid);
2588
2589	/*
2590	 * The old ceph_mds_request_head didn't contain a version field, and
2591	 * one was added when we moved the message version from 3->4.
2592	 */
2593	if (legacy) {
2594		msg->hdr.version = cpu_to_le16(3);
2595		head = msg->front.iov_base;
2596		p = msg->front.iov_base + sizeof(*head);
2597	} else {
2598		struct ceph_mds_request_head *new_head = msg->front.iov_base;
2599
2600		msg->hdr.version = cpu_to_le16(4);
2601		new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
2602		head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
2603		p = msg->front.iov_base + sizeof(*new_head);
2604	}
2605
2606	end = msg->front.iov_base + msg->front.iov_len;
2607
2608	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2609	head->op = cpu_to_le32(req->r_op);
2610	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
2611						 req->r_cred->fsuid));
2612	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
2613						 req->r_cred->fsgid));
2614	head->ino = cpu_to_le64(req->r_deleg_ino);
2615	head->args = req->r_args;
2616
2617	ceph_encode_filepath(&p, end, ino1, path1);
2618	ceph_encode_filepath(&p, end, ino2, path2);
2619
2620	/* make note of release offset, in case we need to replay */
2621	req->r_request_release_offset = p - msg->front.iov_base;
2622
2623	/* cap releases */
2624	releases = 0;
2625	if (req->r_inode_drop)
2626		releases += ceph_encode_inode_release(&p,
2627		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2628		      mds, req->r_inode_drop, req->r_inode_unless,
2629		      req->r_op == CEPH_MDS_OP_READDIR);
2630	if (req->r_dentry_drop)
2631		releases += ceph_encode_dentry_release(&p, req->r_dentry,
2632				req->r_parent, mds, req->r_dentry_drop,
2633				req->r_dentry_unless);
2634	if (req->r_old_dentry_drop)
2635		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2636				req->r_old_dentry_dir, mds,
2637				req->r_old_dentry_drop,
2638				req->r_old_dentry_unless);
2639	if (req->r_old_inode_drop)
2640		releases += ceph_encode_inode_release(&p,
2641		      d_inode(req->r_old_dentry),
2642		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2643
2644	if (drop_cap_releases) {
2645		releases = 0;
2646		p = msg->front.iov_base + req->r_request_release_offset;
2647	}
2648
2649	head->num_releases = cpu_to_le16(releases);
2650
2651	encode_timestamp_and_gids(&p, req);
2652
2653	if (WARN_ON_ONCE(p > end)) {
2654		ceph_msg_put(msg);
2655		msg = ERR_PTR(-ERANGE);
2656		goto out_free2;
2657	}
2658
2659	msg->front.iov_len = p - msg->front.iov_base;
2660	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2661
2662	if (req->r_pagelist) {
2663		struct ceph_pagelist *pagelist = req->r_pagelist;
2664		ceph_msg_data_add_pagelist(msg, pagelist);
2665		msg->hdr.data_len = cpu_to_le32(pagelist->length);
2666	} else {
2667		msg->hdr.data_len = 0;
2668	}
2669
2670	msg->hdr.data_off = cpu_to_le16(0);
2671
2672out_free2:
2673	if (freepath2)
2674		ceph_mdsc_free_path((char *)path2, pathlen2);
2675out_free1:
2676	if (freepath1)
2677		ceph_mdsc_free_path((char *)path1, pathlen1);
2678out:
2679	return msg;
2680}
2681
2682/*
2683 * called under mdsc->mutex if error, under no mutex if
2684 * success.
2685 */
2686static void complete_request(struct ceph_mds_client *mdsc,
2687			     struct ceph_mds_request *req)
2688{
2689	req->r_end_latency = ktime_get();
2690
2691	if (req->r_callback)
2692		req->r_callback(mdsc, req);
2693	complete_all(&req->r_completion);
2694}
2695
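/*
 * The newer ceph_mds_request_head is laid out so that the old header
 * begins at its oldest_client_tid field; return a pointer to whichever
 * layout this peer speaks.
 */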
2696static struct ceph_mds_request_head_old *
2697find_old_request_head(void *p, u64 features)
2698{
2699	bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
2700	struct ceph_mds_request_head *new_head;
2701
2702	if (legacy)
2703		return (struct ceph_mds_request_head_old *)p;
2704	new_head = (struct ceph_mds_request_head *)p;
2705	return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
2706}
2707
2708/*
2709 * called under mdsc->mutex
2710 */
2711static int __prepare_send_request(struct ceph_mds_session *session,
2712				  struct ceph_mds_request *req,
2713				  bool drop_cap_releases)
2714{
2715	int mds = session->s_mds;
2716	struct ceph_mds_client *mdsc = session->s_mdsc;
2717	struct ceph_mds_request_head_old *rhead;
2718	struct ceph_msg *msg;
2719	int flags = 0;
2720
2721	req->r_attempts++;
2722	if (req->r_inode) {
2723		struct ceph_cap *cap =
2724			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2725
2726		if (cap)
2727			req->r_sent_on_mseq = cap->mseq;
2728		else
2729			req->r_sent_on_mseq = -1;
2730	}
2731	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2732	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2733
2734	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2735		void *p;
2736
2737		/*
2738		 * Replay.  Do not regenerate message (and rebuild
2739		 * paths, etc.); just use the original message.
2740		 * Rebuilding paths will break for renames because
2741		 * d_move mangles the src name.
2742		 */
2743		msg = req->r_request;
2744		rhead = find_old_request_head(msg->front.iov_base,
2745					      session->s_con.peer_features);
2746
2747		flags = le32_to_cpu(rhead->flags);
2748		flags |= CEPH_MDS_FLAG_REPLAY;
2749		rhead->flags = cpu_to_le32(flags);
2750
2751		if (req->r_target_inode)
2752			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2753
2754		rhead->num_retry = req->r_attempts - 1;
2755
2756		/* remove cap/dentry releases from message */
2757		rhead->num_releases = 0;
2758
2759		p = msg->front.iov_base + req->r_request_release_offset;
2760		encode_timestamp_and_gids(&p, req);
2761
2762		msg->front.iov_len = p - msg->front.iov_base;
2763		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2764		return 0;
2765	}
2766
2767	if (req->r_request) {
2768		ceph_msg_put(req->r_request);
2769		req->r_request = NULL;
2770	}
2771	msg = create_request_message(session, req, drop_cap_releases);
2772	if (IS_ERR(msg)) {
2773		req->r_err = PTR_ERR(msg);
2774		return PTR_ERR(msg);
2775	}
2776	req->r_request = msg;
2777
2778	rhead = find_old_request_head(msg->front.iov_base,
2779				      session->s_con.peer_features);
2780	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2781	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2782		flags |= CEPH_MDS_FLAG_REPLAY;
2783	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
2784		flags |= CEPH_MDS_FLAG_ASYNC;
2785	if (req->r_parent)
2786		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2787	rhead->flags = cpu_to_le32(flags);
2788	rhead->num_fwd = req->r_num_fwd;
2789	rhead->num_retry = req->r_attempts - 1;
2790
2791	dout(" r_parent = %p\n", req->r_parent);
2792	return 0;
2793}
2794
2795/*
2796 * called under mdsc->mutex
2797 */
2798static int __send_request(struct ceph_mds_session *session,
2799			  struct ceph_mds_request *req,
2800			  bool drop_cap_releases)
2801{
2802	int err;
2803
2804	err = __prepare_send_request(session, req, drop_cap_releases);
2805	if (!err) {
2806		ceph_msg_get(req->r_request);
2807		ceph_con_send(&session->s_con, req->r_request);
2808	}
2809
2810	return err;
2811}
2812
2813/*
2814 * send request, or put it on the appropriate wait list.
2815 */
2816static void __do_request(struct ceph_mds_client *mdsc,
2817			struct ceph_mds_request *req)
2818{
2819	struct ceph_mds_session *session = NULL;
2820	int mds = -1;
2821	int err = 0;
2822	bool random;
2823
2824	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2825		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2826			__unregister_request(mdsc, req);
2827		return;
2828	}
2829
2830	if (req->r_timeout &&
2831	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2832		dout("do_request timed out\n");
2833		err = -ETIMEDOUT;
2834		goto finish;
2835	}
2836	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2837		dout("do_request forced umount\n");
2838		err = -EIO;
2839		goto finish;
2840	}
2841	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2842		if (mdsc->mdsmap_err) {
2843			err = mdsc->mdsmap_err;
2844			dout("do_request mdsmap err %d\n", err);
2845			goto finish;
2846		}
2847		if (mdsc->mdsmap->m_epoch == 0) {
2848			dout("do_request no mdsmap, waiting for map\n");
2849			list_add(&req->r_wait, &mdsc->waiting_for_map);
2850			return;
2851		}
2852		if (!(mdsc->fsc->mount_options->flags &
2853		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
2854		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2855			err = -EHOSTUNREACH;
2856			goto finish;
2857		}
2858	}
2859
2860	put_request_session(req);
2861
2862	mds = __choose_mds(mdsc, req, &random);
2863	if (mds < 0 ||
2864	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2865		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2866			err = -EJUKEBOX;
2867			goto finish;
2868		}
2869		dout("do_request no mds or not active, waiting for map\n");
2870		list_add(&req->r_wait, &mdsc->waiting_for_map);
2871		return;
2872	}
2873
2874	/* get, open session */
2875	session = __ceph_lookup_mds_session(mdsc, mds);
2876	if (!session) {
2877		session = register_session(mdsc, mds);
2878		if (IS_ERR(session)) {
2879			err = PTR_ERR(session);
2880			goto finish;
2881		}
2882	}
2883	req->r_session = ceph_get_mds_session(session);
2884
2885	dout("do_request mds%d session %p state %s\n", mds, session,
2886	     ceph_session_state_name(session->s_state));
2887	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2888	    session->s_state != CEPH_MDS_SESSION_HUNG) {
2889		/*
2890		 * We cannot queue async requests since the caps and delegated
2891		 * inodes are bound to the session. Just return -EJUKEBOX and
2892		 * let the caller retry a sync request in that case.
2893		 */
2894		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2895			err = -EJUKEBOX;
2896			goto out_session;
2897		}
2898
2899		/*
2900		 * If the session has been REJECTED, then return a hard error,
2901		 * unless it's a CLEANRECOVER mount, in which case we'll queue
2902		 * it to the mdsc queue.
2903		 */
2904		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2905			if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
2906				list_add(&req->r_wait, &mdsc->waiting_for_map);
2907			else
2908				err = -EACCES;
2909			goto out_session;
2910		}
2911
2912		if (session->s_state == CEPH_MDS_SESSION_NEW ||
2913		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
2914			err = __open_session(mdsc, session);
2915			if (err)
2916				goto out_session;
2917			/* retry the same mds later */
2918			if (random)
2919				req->r_resend_mds = mds;
2920		}
2921		list_add(&req->r_wait, &session->s_waiting);
2922		goto out_session;
2923	}
2924
2925	/* send request */
2926	req->r_resend_mds = -1;   /* forget any previous mds hint */
2927
2928	if (req->r_request_started == 0)   /* note request start time */
2929		req->r_request_started = jiffies;
2930
2931	err = __send_request(session, req, false);
2932
2933out_session:
2934	ceph_put_mds_session(session);
2935finish:
2936	if (err) {
2937		dout("__do_request early error %d\n", err);
2938		req->r_err = err;
2939		complete_request(mdsc, req);
2940		__unregister_request(mdsc, req);
2941	}
2942	return;
2943}
2944
2945/*
2946 * called under mdsc->mutex
2947 */
2948static void __wake_requests(struct ceph_mds_client *mdsc,
2949			    struct list_head *head)
2950{
2951	struct ceph_mds_request *req;
2952	LIST_HEAD(tmp_list);
2953
2954	list_splice_init(head, &tmp_list);
2955
2956	while (!list_empty(&tmp_list)) {
2957		req = list_entry(tmp_list.next,
2958				 struct ceph_mds_request, r_wait);
2959		list_del_init(&req->r_wait);
2960		dout(" wake request %p tid %llu\n", req, req->r_tid);
2961		__do_request(mdsc, req);
2962	}
2963}
2964
2965/*
2966 * Wake up threads with requests pending for @mds, so that they can
2967 * resubmit their requests to a possibly different mds.
2968 */
2969static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2970{
2971	struct ceph_mds_request *req;
2972	struct rb_node *p = rb_first(&mdsc->request_tree);
2973
2974	dout("kick_requests mds%d\n", mds);
2975	while (p) {
2976		req = rb_entry(p, struct ceph_mds_request, r_node);
2977		p = rb_next(p);
2978		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2979			continue;
2980		if (req->r_attempts > 0)
2981			continue; /* only new requests */
2982		if (req->r_session &&
2983		    req->r_session->s_mds == mds) {
2984			dout(" kicking tid %llu\n", req->r_tid);
2985			list_del_init(&req->r_wait);
2986			__do_request(mdsc, req);
2987		}
2988	}
2989}
2990
2991int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
2992			      struct ceph_mds_request *req)
2993{
2994	int err = 0;
2995
2996	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
2997	if (req->r_inode)
2998		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2999	if (req->r_parent) {
3000		struct ceph_inode_info *ci = ceph_inode(req->r_parent);
3001		int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
3002			    CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
3003		spin_lock(&ci->i_ceph_lock);
3004		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
3005		__ceph_touch_fmode(ci, mdsc, fmode);
3006		spin_unlock(&ci->i_ceph_lock);
3007	}
3008	if (req->r_old_dentry_dir)
3009		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
3010				  CEPH_CAP_PIN);
3011
3012	if (req->r_inode) {
3013		err = ceph_wait_on_async_create(req->r_inode);
3014		if (err) {
3015			dout("%s: wait for async create returned: %d\n",
3016			     __func__, err);
3017			return err;
3018		}
3019	}
3020
3021	if (!err && req->r_old_inode) {
3022		err = ceph_wait_on_async_create(req->r_old_inode);
3023		if (err) {
3024			dout("%s: wait for async create returned: %d\n",
3025			     __func__, err);
3026			return err;
3027		}
3028	}
3029
3030	dout("submit_request on %p for inode %p\n", req, dir);
3031	mutex_lock(&mdsc->mutex);
3032	__register_request(mdsc, req, dir);
3033	__do_request(mdsc, req);
3034	err = req->r_err;
3035	mutex_unlock(&mdsc->mutex);
3036	return err;
3037}
3038
3039static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3040				  struct ceph_mds_request *req)
3041{
3042	int err;
3043
3044	/* wait */
3045	dout("do_request waiting\n");
3046	if (!req->r_timeout && req->r_wait_for_completion) {
3047		err = req->r_wait_for_completion(mdsc, req);
3048	} else {
3049		long timeleft = wait_for_completion_killable_timeout(
3050					&req->r_completion,
3051					ceph_timeout_jiffies(req->r_timeout));
3052		if (timeleft > 0)
3053			err = 0;
3054		else if (!timeleft)
3055			err = -ETIMEDOUT;  /* timed out */
3056		else
3057			err = timeleft;  /* killed */
3058	}
3059	dout("do_request waited, got %d\n", err);
3060	mutex_lock(&mdsc->mutex);
3061
3062	/* only abort if we didn't race with a real reply */
3063	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3064		err = le32_to_cpu(req->r_reply_info.head->result);
3065	} else if (err < 0) {
3066		dout("aborted request %lld with %d\n", req->r_tid, err);
3067
3068		/*
3069		 * ensure we aren't running concurrently with
3070		 * ceph_fill_trace or ceph_readdir_prepopulate, which
3071		 * rely on locks (dir mutex) held by our caller.
3072		 */
3073		mutex_lock(&req->r_fill_mutex);
3074		req->r_err = err;
3075		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3076		mutex_unlock(&req->r_fill_mutex);
3077
3078		if (req->r_parent &&
3079		    (req->r_op & CEPH_MDS_OP_WRITE))
3080			ceph_invalidate_dir_request(req);
3081	} else {
3082		err = req->r_err;
3083	}
3084
3085	mutex_unlock(&mdsc->mutex);
3086	return err;
3087}
3088
3089/*
3090 * Synchronously perform an mds request.  Take care of all of the
3091 * session setup, forwarding, and retry details.
3092 */
3093int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3094			 struct inode *dir,
3095			 struct ceph_mds_request *req)
3096{
3097	int err;
3098
3099	dout("do_request on %p\n", req);
3100
3101	/* issue */
3102	err = ceph_mdsc_submit_request(mdsc, dir, req);
3103	if (!err)
3104		err = ceph_mdsc_wait_request(mdsc, req);
3105	dout("do_request %p done, result %d\n", req, err);
3106	return err;
3107}
3108
3109/*
3110 * Invalidate dir's completeness, dentry lease state on an aborted MDS
3111 * namespace request.
3112 */
3113void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3114{
3115	struct inode *dir = req->r_parent;
3116	struct inode *old_dir = req->r_old_dentry_dir;
3117
3118	dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
3119
3120	ceph_dir_clear_complete(dir);
3121	if (old_dir)
3122		ceph_dir_clear_complete(old_dir);
3123	if (req->r_dentry)
3124		ceph_invalidate_dentry_lease(req->r_dentry);
3125	if (req->r_old_dentry)
3126		ceph_invalidate_dentry_lease(req->r_old_dentry);
3127}
3128
3129/*
3130 * Handle mds reply.
3131 *
3132 * We take the session mutex and parse and process the reply immediately.
3133 * This preserves the logical ordering of replies, capabilities, etc., sent
3134 * by the MDS as they are applied to our local cache.
3135 */
3136static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
3137{
3138	struct ceph_mds_client *mdsc = session->s_mdsc;
3139	struct ceph_mds_request *req;
3140	struct ceph_mds_reply_head *head = msg->front.iov_base;
3141	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
3142	struct ceph_snap_realm *realm;
3143	u64 tid;
3144	int err, result;
3145	int mds = session->s_mds;
3146
3147	if (msg->front.iov_len < sizeof(*head)) {
3148		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
3149		ceph_msg_dump(msg);
3150		return;
3151	}
3152
3153	/* get request, session */
3154	tid = le64_to_cpu(msg->hdr.tid);
3155	mutex_lock(&mdsc->mutex);
3156	req = lookup_get_request(mdsc, tid);
3157	if (!req) {
3158		dout("handle_reply on unknown tid %llu\n", tid);
3159		mutex_unlock(&mdsc->mutex);
3160		return;
3161	}
3162	dout("handle_reply %p\n", req);
3163
3164	/* correct session? */
3165	if (req->r_session != session) {
3166		pr_err("mdsc_handle_reply got %llu on session mds%d"
3167		       " not mds%d\n", tid, session->s_mds,
3168		       req->r_session ? req->r_session->s_mds : -1);
3169		mutex_unlock(&mdsc->mutex);
3170		goto out;
3171	}
3172
3173	/* dup? */
3174	if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3175	    (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3176		pr_warn("got a dup %s reply on %llu from mds%d\n",
3177			   head->safe ? "safe" : "unsafe", tid, mds);
3178		mutex_unlock(&mdsc->mutex);
3179		goto out;
3180	}
3181	if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3182		pr_warn("got unsafe after safe on %llu from mds%d\n",
3183			   tid, mds);
3184		mutex_unlock(&mdsc->mutex);
3185		goto out;
3186	}
3187
3188	result = le32_to_cpu(head->result);
3189
3190	/*
3191	 * Handle an ESTALE:
3192	 * - if we're not talking to the authority, send to them
3193	 * - if the authority has changed while we weren't looking,
3194	 *   send to the new authority
3195	 * - otherwise we just have to return an ESTALE
3196	 */
3197	if (result == -ESTALE) {
3198		dout("got ESTALE on request %llu\n", req->r_tid);
3199		req->r_resend_mds = -1;
3200		if (req->r_direct_mode != USE_AUTH_MDS) {
3201			dout("not using auth, setting for that now\n");
3202			req->r_direct_mode = USE_AUTH_MDS;
3203			__do_request(mdsc, req);
3204			mutex_unlock(&mdsc->mutex);
3205			goto out;
3206		} else  {
3207			int mds = __choose_mds(mdsc, req, NULL);
3208			if (mds >= 0 && mds != req->r_session->s_mds) {
3209				dout("but auth changed, so resending\n");
3210				__do_request(mdsc, req);
3211				mutex_unlock(&mdsc->mutex);
3212				goto out;
3213			}
3214		}
3215		dout("have to return ESTALE on request %llu\n", req->r_tid);
3216	}
3217
3218
3219	if (head->safe) {
3220		set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3221		__unregister_request(mdsc, req);
3222
3223		/* last request during umount? */
3224		if (mdsc->stopping && !__get_oldest_req(mdsc))
3225			complete_all(&mdsc->safe_umount_waiters);
3226
3227		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3228			/*
3229			 * We already handled the unsafe response, now do the
3230			 * cleanup.  No need to examine the response; the MDS
3231			 * doesn't include any result info in the safe
3232			 * response.  And even if it did, there is nothing
3233			 * useful we could do with a revised return value.
3234			 */
3235			dout("got safe reply %llu, mds%d\n", tid, mds);
3236
3237			mutex_unlock(&mdsc->mutex);
3238			goto out;
3239		}
3240	} else {
3241		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3242		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3243	}
3244
3245	dout("handle_reply tid %lld result %d\n", tid, result);
3246	rinfo = &req->r_reply_info;
3247	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3248		err = parse_reply_info(session, msg, rinfo, (u64)-1);
3249	else
3250		err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
3251	mutex_unlock(&mdsc->mutex);
3252
3253	/* Must find target inode outside of mutexes to avoid deadlocks */
3254	if ((err >= 0) && rinfo->head->is_target) {
3255		struct inode *in;
3256		struct ceph_vino tvino = {
3257			.ino  = le64_to_cpu(rinfo->targeti.in->ino),
3258			.snap = le64_to_cpu(rinfo->targeti.in->snapid)
3259		};
3260
3261		in = ceph_get_inode(mdsc->fsc->sb, tvino);
3262		if (IS_ERR(in)) {
3263			err = PTR_ERR(in);
3264			mutex_lock(&session->s_mutex);
3265			goto out_err;
3266		}
3267		req->r_target_inode = in;
3268	}
3269
3270	mutex_lock(&session->s_mutex);
3271	if (err < 0) {
3272		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
3273		ceph_msg_dump(msg);
3274		goto out_err;
3275	}
3276
3277	/* snap trace */
3278	realm = NULL;
3279	if (rinfo->snapblob_len) {
3280		down_write(&mdsc->snap_rwsem);
3281		ceph_update_snap_trace(mdsc, rinfo->snapblob,
3282				rinfo->snapblob + rinfo->snapblob_len,
3283				le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3284				&realm);
3285		downgrade_write(&mdsc->snap_rwsem);
3286	} else {
3287		down_read(&mdsc->snap_rwsem);
3288	}
3289
3290	/* insert trace into our cache */
3291	mutex_lock(&req->r_fill_mutex);
3292	current->journal_info = req;
3293	err = ceph_fill_trace(mdsc->fsc->sb, req);
3294	if (err == 0) {
3295		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
3296				    req->r_op == CEPH_MDS_OP_LSSNAP))
3297			ceph_readdir_prepopulate(req, req->r_session);
3298	}
3299	current->journal_info = NULL;
3300	mutex_unlock(&req->r_fill_mutex);
3301
3302	up_read(&mdsc->snap_rwsem);
3303	if (realm)
3304		ceph_put_snap_realm(mdsc, realm);
3305
3306	if (err == 0) {
3307		if (req->r_target_inode &&
3308		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3309			struct ceph_inode_info *ci =
3310				ceph_inode(req->r_target_inode);
3311			spin_lock(&ci->i_unsafe_lock);
3312			list_add_tail(&req->r_unsafe_target_item,
3313				      &ci->i_unsafe_iops);
3314			spin_unlock(&ci->i_unsafe_lock);
3315		}
3316
3317		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
3318	}
3319out_err:
3320	mutex_lock(&mdsc->mutex);
3321	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3322		if (err) {
3323			req->r_err = err;
3324		} else {
3325			req->r_reply = ceph_msg_get(msg);
3326			set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
3327		}
3328	} else {
3329		dout("reply arrived after request %lld was aborted\n", tid);
3330	}
3331	mutex_unlock(&mdsc->mutex);
3332
3333	mutex_unlock(&session->s_mutex);
3334
3335	/* kick calling process */
3336	complete_request(mdsc, req);
3337
3338	ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
3339				     req->r_end_latency, err);
3340out:
3341	ceph_mdsc_put_request(req);
3342	return;
3343}
3344
3345
3346
3347/*
3348 * handle mds notification that our request has been forwarded.
3349 */
3350static void handle_forward(struct ceph_mds_client *mdsc,
3351			   struct ceph_mds_session *session,
3352			   struct ceph_msg *msg)
3353{
3354	struct ceph_mds_request *req;
3355	u64 tid = le64_to_cpu(msg->hdr.tid);
3356	u32 next_mds;
3357	u32 fwd_seq;
3358	int err = -EINVAL;
3359	void *p = msg->front.iov_base;
3360	void *end = p + msg->front.iov_len;
3361
3362	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3363	next_mds = ceph_decode_32(&p);
3364	fwd_seq = ceph_decode_32(&p);
3365
3366	mutex_lock(&mdsc->mutex);
3367	req = lookup_get_request(mdsc, tid);
3368	if (!req) {
3369		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
3370		goto out;  /* dup reply? */
3371	}
3372
3373	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3374		dout("forward tid %llu aborted, unregistering\n", tid);
3375		__unregister_request(mdsc, req);
3376	} else if (fwd_seq <= req->r_num_fwd) {
3377		dout("forward tid %llu to mds%d - old seq %d <= %d\n",
3378		     tid, next_mds, req->r_num_fwd, fwd_seq);
3379	} else {
3380		/* resend. forward race not possible; mds would drop */
3381		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3382		BUG_ON(req->r_err);
3383		BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3384		req->r_attempts = 0;
3385		req->r_num_fwd = fwd_seq;
3386		req->r_resend_mds = next_mds;
3387		put_request_session(req);
3388		__do_request(mdsc, req);
3389	}
3390	ceph_mdsc_put_request(req);
3391out:
3392	mutex_unlock(&mdsc->mutex);
3393	return;
3394
3395bad:
3396	pr_err("mdsc_handle_forward decode error err=%d\n", err);
3397}
3398
3399static int __decode_session_metadata(void **p, void *end,
3400				     bool *blocklisted)
3401{
3402	/* map<string,string> */
3403	u32 n;
3404	bool err_str;
3405	ceph_decode_32_safe(p, end, n, bad);
3406	while (n-- > 0) {
3407		u32 len;
3408		ceph_decode_32_safe(p, end, len, bad);
3409		ceph_decode_need(p, end, len, bad);
3410		err_str = !strncmp(*p, "error_string", len);
3411		*p += len;
3412		ceph_decode_32_safe(p, end, len, bad);
3413		ceph_decode_need(p, end, len, bad);
3414		/*
3415		 * Match "blocklisted (blacklisted)" from newer MDSes,
3416		 * or "blacklisted" from older MDSes.
3417		 */
3418		if (err_str && strnstr(*p, "blacklisted", len))
3419			*blocklisted = true;
3420		*p += len;
3421	}
3422	return 0;
3423bad:
3424	return -1;
3425}
3426
3427/*
3428 * handle a mds session control message
3429 */
3430static void handle_session(struct ceph_mds_session *session,
3431			   struct ceph_msg *msg)
3432{
3433	struct ceph_mds_client *mdsc = session->s_mdsc;
3434	int mds = session->s_mds;
3435	int msg_version = le16_to_cpu(msg->hdr.version);
3436	void *p = msg->front.iov_base;
3437	void *end = p + msg->front.iov_len;
3438	struct ceph_mds_session_head *h;
3439	u32 op;
3440	u64 seq, features = 0;
3441	int wake = 0;
3442	bool blocklisted = false;
3443
3444	/* decode */
3445	ceph_decode_need(&p, end, sizeof(*h), bad);
3446	h = p;
3447	p += sizeof(*h);
3448
3449	op = le32_to_cpu(h->op);
3450	seq = le64_to_cpu(h->seq);
3451
3452	if (msg_version >= 3) {
3453		u32 len;
3454		/* version >= 2, metadata */
3455		if (__decode_session_metadata(&p, end, &blocklisted) < 0)
3456			goto bad;
3457		/* version >= 3, feature bits */
3458		ceph_decode_32_safe(&p, end, len, bad);
3459		if (len) {
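			/* only the low 64 feature bits are used; skip any
			 * additional bytes the MDS may have sent */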
3460			ceph_decode_64_safe(&p, end, features, bad);
3461			p += len - sizeof(features);
3462		}
3463	}
3464
3465	mutex_lock(&mdsc->mutex);
3466	if (op == CEPH_SESSION_CLOSE) {
3467		ceph_get_mds_session(session);
3468		__unregister_session(mdsc, session);
3469	}
3470	/* FIXME: this ttl calculation is generous */
3471	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3472	mutex_unlock(&mdsc->mutex);
3473
3474	mutex_lock(&session->s_mutex);
3475
3476	dout("handle_session mds%d %s %p state %s seq %llu\n",
3477	     mds, ceph_session_op_name(op), session,
3478	     ceph_session_state_name(session->s_state), seq);
3479
3480	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3481		session->s_state = CEPH_MDS_SESSION_OPEN;
3482		pr_info("mds%d came back\n", session->s_mds);
3483	}
3484
3485	switch (op) {
3486	case CEPH_SESSION_OPEN:
3487		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3488			pr_info("mds%d reconnect success\n", session->s_mds);
3489		session->s_state = CEPH_MDS_SESSION_OPEN;
3490		session->s_features = features;
3491		renewed_caps(mdsc, session, 0);
3492		if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features))
3493			metric_schedule_delayed(&mdsc->metric);
3494		wake = 1;
3495		if (mdsc->stopping)
3496			__close_session(mdsc, session);
3497		break;
3498
3499	case CEPH_SESSION_RENEWCAPS:
3500		if (session->s_renew_seq == seq)
3501			renewed_caps(mdsc, session, 1);
3502		break;
3503
3504	case CEPH_SESSION_CLOSE:
3505		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3506			pr_info("mds%d reconnect denied\n", session->s_mds);
3507		session->s_state = CEPH_MDS_SESSION_CLOSED;
3508		cleanup_session_requests(mdsc, session);
3509		remove_session_caps(session);
3510		wake = 2; /* for good measure */
3511		wake_up_all(&mdsc->session_close_wq);
3512		break;
3513
3514	case CEPH_SESSION_STALE:
3515		pr_info("mds%d caps went stale, renewing\n",
3516			session->s_mds);
3517		atomic_inc(&session->s_cap_gen);
3518		session->s_cap_ttl = jiffies - 1;
3519		send_renew_caps(mdsc, session);
3520		break;
3521
3522	case CEPH_SESSION_RECALL_STATE:
3523		ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
3524		break;
3525
3526	case CEPH_SESSION_FLUSHMSG:
3527		send_flushmsg_ack(mdsc, session, seq);
3528		break;
3529
3530	case CEPH_SESSION_FORCE_RO:
3531		dout("force_session_readonly %p\n", session);
3532		spin_lock(&session->s_cap_lock);
3533		session->s_readonly = true;
3534		spin_unlock(&session->s_cap_lock);
3535		wake_up_session_caps(session, FORCE_RO);
3536		break;
3537
3538	case CEPH_SESSION_REJECT:
3539		WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3540		pr_info("mds%d rejected session\n", session->s_mds);
3541		session->s_state = CEPH_MDS_SESSION_REJECTED;
3542		cleanup_session_requests(mdsc, session);
3543		remove_session_caps(session);
3544		if (blocklisted)
3545			mdsc->fsc->blocklisted = true;
3546		wake = 2; /* for good measure */
3547		break;
3548
3549	default:
3550		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
3551		WARN_ON(1);
3552	}
3553
3554	mutex_unlock(&session->s_mutex);
3555	if (wake) {
3556		mutex_lock(&mdsc->mutex);
3557		__wake_requests(mdsc, &session->s_waiting);
3558		if (wake == 2)
3559			kick_requests(mdsc, mds);
3560		mutex_unlock(&mdsc->mutex);
3561	}
3562	if (op == CEPH_SESSION_CLOSE)
3563		ceph_put_mds_session(session);
3564	return;
3565
3566bad:
3567	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3568	       (int)msg->front.iov_len);
3569	ceph_msg_dump(msg);
3570	return;
3571}
3572
3573void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
3574{
3575	int dcaps;
3576
3577	dcaps = xchg(&req->r_dir_caps, 0);
3578	if (dcaps) {
3579		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3580		ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
3581	}
3582}
3583
3584void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
3585{
3586	int dcaps;
3587
3588	dcaps = xchg(&req->r_dir_caps, 0);
3589	if (dcaps) {
3590		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3591		ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
3592						dcaps);
3593	}
3594}
3595
3596/*
3597 * called under session->mutex.
3598 */
3599static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3600				   struct ceph_mds_session *session)
3601{
3602	struct ceph_mds_request *req, *nreq;
3603	struct rb_node *p;
3604
3605	dout("replay_unsafe_requests mds%d\n", session->s_mds);
3606
3607	mutex_lock(&mdsc->mutex);
3608	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
3609		__send_request(session, req, true);
3610
3611	/*
3612	 * Also re-send old requests when the MDS enters the reconnect stage,
3613	 * so that it can process completed requests during its clientreplay stage.
3614	 */
3615	p = rb_first(&mdsc->request_tree);
3616	while (p) {
3617		req = rb_entry(p, struct ceph_mds_request, r_node);
3618		p = rb_next(p);
3619		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3620			continue;
3621		if (req->r_attempts == 0)
3622			continue; /* only old requests */
3623		if (!req->r_session)
3624			continue;
3625		if (req->r_session->s_mds != session->s_mds)
3626			continue;
3627
3628		ceph_mdsc_release_dir_caps_no_check(req);
3629
3630		__send_request(session, req, true);
3631	}
3632	mutex_unlock(&mdsc->mutex);
3633}
3634
3635static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3636{
3637	struct ceph_msg *reply;
3638	struct ceph_pagelist *_pagelist;
3639	struct page *page;
3640	__le32 *addr;
3641	int err = -ENOMEM;
3642
3643	if (!recon_state->allow_multi)
3644		return -ENOSPC;
3645
3646	/* can't handle message that contains both caps and realm */
3647	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3648
3649	/* pre-allocate new pagelist */
3650	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
3651	if (!_pagelist)
3652		return -ENOMEM;
3653
3654	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3655	if (!reply)
3656		goto fail_msg;
3657
3658	/* placeholder for nr_caps */
3659	err = ceph_pagelist_encode_32(_pagelist, 0);
3660	if (err < 0)
3661		goto fail;
3662
3663	if (recon_state->nr_caps) {
3664		/* currently encoding caps */
3665		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3666		if (err)
3667			goto fail;
3668	} else {
3669		/* placeholder for nr_realms (currently encoding relams) */
3670		/* placeholder for nr_realms (currently encoding realms) */
3671		if (err < 0)
3672			goto fail;
3673	}
3674
3675	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3676	if (err)
3677		goto fail;
3678
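	/* fill in the count placeholder(s) at the head of the pagelist now
	 * that the final values are known */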
3679	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3680	addr = kmap_atomic(page);
3681	if (recon_state->nr_caps) {
3682		/* currently encoding caps */
3683		*addr = cpu_to_le32(recon_state->nr_caps);
3684	} else {
3685		/* currently encoding relams */
3686		/* currently encoding realms */
3687	}
3688	kunmap_atomic(addr);
3689
3690	reply->hdr.version = cpu_to_le16(5);
3691	reply->hdr.compat_version = cpu_to_le16(4);
3692
3693	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3694	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3695
3696	ceph_con_send(&recon_state->session->s_con, reply);
3697	ceph_pagelist_release(recon_state->pagelist);
3698
3699	recon_state->pagelist = _pagelist;
3700	recon_state->nr_caps = 0;
3701	recon_state->nr_realms = 0;
3702	recon_state->msg_version = 5;
3703	return 0;
3704fail:
3705	ceph_msg_put(reply);
3706fail_msg:
3707	ceph_pagelist_release(_pagelist);
3708	return err;
3709}
3710
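/*
 * Find a "primary" dentry for the inode: for a directory, its first
 * alias (unless that is a root dentry); otherwise, a hashed alias
 * marked CEPH_DENTRY_PRIMARY_LINK.  Returns a referenced dentry or NULL.
 */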
3711static struct dentry* d_find_primary(struct inode *inode)
3712{
3713	struct dentry *alias, *dn = NULL;
3714
3715	if (hlist_empty(&inode->i_dentry))
3716		return NULL;
3717
3718	spin_lock(&inode->i_lock);
3719	if (hlist_empty(&inode->i_dentry))
3720		goto out_unlock;
3721
3722	if (S_ISDIR(inode->i_mode)) {
3723		alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
3724		if (!IS_ROOT(alias))
3725			dn = dget(alias);
3726		goto out_unlock;
3727	}
3728
3729	hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
3730		spin_lock(&alias->d_lock);
3731		if (!d_unhashed(alias) &&
3732		    (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
3733			dn = dget_dlock(alias);
3734		}
3735		spin_unlock(&alias->d_lock);
3736		if (dn)
3737			break;
3738	}
3739out_unlock:
3740	spin_unlock(&inode->i_lock);
3741	return dn;
3742}
3743
3744/*
3745 * Encode information about a cap for a reconnect with the MDS.
3746 */
3747static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
3748			  void *arg)
3749{
3750	union {
3751		struct ceph_mds_cap_reconnect v2;
3752		struct ceph_mds_cap_reconnect_v1 v1;
3753	} rec;
3754	struct ceph_inode_info *ci = cap->ci;
3755	struct ceph_reconnect_state *recon_state = arg;
3756	struct ceph_pagelist *pagelist = recon_state->pagelist;
3757	struct dentry *dentry;
3758	char *path;
3759	int pathlen, err;
3760	u64 pathbase;
3761	u64 snap_follows;
3762
3763	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3764	     inode, ceph_vinop(inode), cap, cap->cap_id,
3765	     ceph_cap_string(cap->issued));
3766
3767	dentry = d_find_primary(inode);
3768	if (dentry) {
3769		/* set pathbase to parent dir when msg_version >= 2 */
3770		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
3771					    recon_state->msg_version >= 2);
3772		dput(dentry);
3773		if (IS_ERR(path)) {
3774			err = PTR_ERR(path);
3775			goto out_err;
3776		}
3777	} else {
3778		path = NULL;
3779		pathlen = 0;
3780		pathbase = 0;
3781	}
3782
3783	spin_lock(&ci->i_ceph_lock);
3784	cap->seq = 0;        /* reset cap seq */
3785	cap->issue_seq = 0;  /* and issue_seq */
3786	cap->mseq = 0;       /* and migrate_seq */
3787	cap->cap_gen = atomic_read(&cap->session->s_cap_gen);
3788
3789	/* These are lost when the session goes away */
3790	if (S_ISDIR(inode->i_mode)) {
3791		if (cap->issued & CEPH_CAP_DIR_CREATE) {
3792			ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
3793			memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
3794		}
3795		cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
3796	}
3797
3798	if (recon_state->msg_version >= 2) {
3799		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3800		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3801		rec.v2.issued = cpu_to_le32(cap->issued);
3802		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3803		rec.v2.pathbase = cpu_to_le64(pathbase);
3804		rec.v2.flock_len = (__force __le32)
3805			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
3806	} else {
3807		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3808		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3809		rec.v1.issued = cpu_to_le32(cap->issued);
3810		rec.v1.size = cpu_to_le64(i_size_read(inode));
3811		ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
3812		ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
3813		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3814		rec.v1.pathbase = cpu_to_le64(pathbase);
3815	}
3816
3817	if (list_empty(&ci->i_cap_snaps)) {
3818		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3819	} else {
3820		struct ceph_cap_snap *capsnap =
3821			list_first_entry(&ci->i_cap_snaps,
3822					 struct ceph_cap_snap, ci_item);
3823		snap_follows = capsnap->follows;
3824	}
3825	spin_unlock(&ci->i_ceph_lock);
3826
3827	if (recon_state->msg_version >= 2) {
3828		int num_fcntl_locks, num_flock_locks;
3829		struct ceph_filelock *flocks = NULL;
3830		size_t struct_len, total_len = sizeof(u64);
3831		u8 struct_v = 0;
3832
3833encode_again:
3834		if (rec.v2.flock_len) {
3835			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
3836		} else {
3837			num_fcntl_locks = 0;
3838			num_flock_locks = 0;
3839		}
3840		if (num_fcntl_locks + num_flock_locks > 0) {
3841			flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
3842					       sizeof(struct ceph_filelock),
3843					       GFP_NOFS);
3844			if (!flocks) {
3845				err = -ENOMEM;
3846				goto out_err;
3847			}
3848			err = ceph_encode_locks_to_buffer(inode, flocks,
3849							  num_fcntl_locks,
3850							  num_flock_locks);
3851			if (err) {
3852				kfree(flocks);
3853				flocks = NULL;
3854				if (err == -ENOSPC)
3855					goto encode_again;
3856				goto out_err;
3857			}
3858		} else {
3859			kfree(flocks);
3860			flocks = NULL;
3861		}
3862
3863		if (recon_state->msg_version >= 3) {
3864			/* version, compat_version and struct_len */
3865			total_len += 2 * sizeof(u8) + sizeof(u32);
3866			struct_v = 2;
3867		}
3868		/*
3869		 * number of encoded locks is stable, so copy to pagelist
3870		 */
3871		struct_len = 2 * sizeof(u32) +
3872			    (num_fcntl_locks + num_flock_locks) *
3873			    sizeof(struct ceph_filelock);
3874		rec.v2.flock_len = cpu_to_le32(struct_len);
3875
3876		struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);
3877
3878		if (struct_v >= 2)
3879			struct_len += sizeof(u64); /* snap_follows */
3880
3881		total_len += struct_len;
3882
3883		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
3884			err = send_reconnect_partial(recon_state);
3885			if (err)
3886				goto out_freeflocks;
3887			pagelist = recon_state->pagelist;
3888		}
3889
3890		err = ceph_pagelist_reserve(pagelist, total_len);
3891		if (err)
3892			goto out_freeflocks;
3893
3894		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3895		if (recon_state->msg_version >= 3) {
3896			ceph_pagelist_encode_8(pagelist, struct_v);
3897			ceph_pagelist_encode_8(pagelist, 1);
3898			ceph_pagelist_encode_32(pagelist, struct_len);
3899		}
3900		ceph_pagelist_encode_string(pagelist, path, pathlen);
3901		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3902		ceph_locks_to_pagelist(flocks, pagelist,
3903				       num_fcntl_locks, num_flock_locks);
3904		if (struct_v >= 2)
3905			ceph_pagelist_encode_64(pagelist, snap_follows);
3906out_freeflocks:
3907		kfree(flocks);
3908	} else {
3909		err = ceph_pagelist_reserve(pagelist,
3910					    sizeof(u64) + sizeof(u32) +
3911					    pathlen + sizeof(rec.v1));
3912		if (err)
3913			goto out_err;
3914
3915		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3916		ceph_pagelist_encode_string(pagelist, path, pathlen);
3917		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3918	}
3919
3920out_err:
3921	ceph_mdsc_free_path(path, pathlen);
3922	if (!err)
3923		recon_state->nr_caps++;
3924	return err;
3925}
3926
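/*
 * Encode all snap realms we know about into the reconnect pagelist,
 * splitting into another message if the payload would exceed
 * RECONNECT_MAX_SIZE.
 */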
3927static int encode_snap_realms(struct ceph_mds_client *mdsc,
3928			      struct ceph_reconnect_state *recon_state)
3929{
3930	struct rb_node *p;
3931	struct ceph_pagelist *pagelist = recon_state->pagelist;
3932	int err = 0;
3933
3934	if (recon_state->msg_version >= 4) {
3935		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
3936		if (err < 0)
3937			goto fail;
3938	}
3939
3940	/*
3941	 * snaprealms.  we provide mds with the ino, seq (version), and
3942	 * parent for all of our realms.  If the mds has any newer info,
3943	 * it will tell us.
3944	 */
3945	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3946		struct ceph_snap_realm *realm =
3947		       rb_entry(p, struct ceph_snap_realm, node);
3948		struct ceph_mds_snaprealm_reconnect sr_rec;
3949
3950		if (recon_state->msg_version >= 4) {
3951			size_t need = sizeof(u8) * 2 + sizeof(u32) +
3952				      sizeof(sr_rec);
3953
3954			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
3955				err = send_reconnect_partial(recon_state);
3956				if (err)
3957					goto fail;
3958				pagelist = recon_state->pagelist;
3959			}
3960
3961			err = ceph_pagelist_reserve(pagelist, need);
3962			if (err)
3963				goto fail;
3964
3965			ceph_pagelist_encode_8(pagelist, 1);
3966			ceph_pagelist_encode_8(pagelist, 1);
3967			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
3968		}
3969
3970		dout(" adding snap realm %llx seq %lld parent %llx\n",
3971		     realm->ino, realm->seq, realm->parent_ino);
3972		sr_rec.ino = cpu_to_le64(realm->ino);
3973		sr_rec.seq = cpu_to_le64(realm->seq);
3974		sr_rec.parent = cpu_to_le64(realm->parent_ino);
3975
3976		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3977		if (err)
3978			goto fail;
3979
3980		recon_state->nr_realms++;
3981	}
3982fail:
3983	return err;
3984}
3985
3986
3987/*
3988 * If an MDS fails and recovers, clients need to reconnect in order to
3989 * reestablish shared state.  This includes all caps issued through
3990 * this session _and_ the snap_realm hierarchy.  Because it's not
3991 * clear which snap realms the mds cares about, we send everything we
3992 * know about.. that ensures we'll then get any new info the
3993 * recovering MDS might have.
3994 *
3995 * This is a relatively heavyweight operation, but it's rare.
3996 */
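/*
 * A rough sketch of the reconnect payload assembled below, assuming
 * msg_version >= 3 (reconnect_caps_cb() and encode_snap_realms() are
 * authoritative):
 *
 *   le32 nr_caps                    placeholder, patched in at the end
 *   for each cap:
 *     le64 ino
 *     u8 struct_v, u8 compat, le32 struct_len
 *     string path
 *     struct ceph_mds_cap_reconnect (v2)
 *     fcntl/flock lock counts and records
 *     le64 snap_follows
 *   le32 nr_realms                  (msg_version >= 4)
 *   for each snap realm:
 *     u8 ver, u8 compat, le32 len   (msg_version >= 4)
 *     struct ceph_mds_snaprealm_reconnect
 *   u8 flag                         (msg_version >= 5)
 */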
3997static void send_mds_reconnect(struct ceph_mds_client *mdsc,
3998			       struct ceph_mds_session *session)
3999{
4000	struct ceph_msg *reply;
4001	int mds = session->s_mds;
4002	int err = -ENOMEM;
4003	struct ceph_reconnect_state recon_state = {
4004		.session = session,
4005	};
4006	LIST_HEAD(dispose);
4007
4008	pr_info("mds%d reconnect start\n", mds);
4009
4010	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
4011	if (!recon_state.pagelist)
4012		goto fail_nopagelist;
4013
4014	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
4015	if (!reply)
4016		goto fail_nomsg;
4017
4018	xa_destroy(&session->s_delegated_inos);
4019
4020	mutex_lock(&session->s_mutex);
4021	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
4022	session->s_seq = 0;
4023
4024	dout("session %p state %s\n", session,
4025	     ceph_session_state_name(session->s_state));
4026
4027	atomic_inc(&session->s_cap_gen);
4028
4029	spin_lock(&session->s_cap_lock);
4030	/* don't know if session is readonly */
4031	session->s_readonly = 0;
4032	/*
4033	 * notify __ceph_remove_cap() that we are composing cap reconnect.
4034	 * If a cap gets released before being added to the cap reconnect,
4035	 * __ceph_remove_cap() should skip queuing cap release.
4036	 */
4037	session->s_cap_reconnect = 1;
4038	/* drop old cap expires; we're about to reestablish that state */
4039	detach_cap_releases(session, &dispose);
4040	spin_unlock(&session->s_cap_lock);
4041	dispose_cap_releases(mdsc, &dispose);
4042
4043	/* trim unused caps to reduce MDS's cache rejoin time */
4044	if (mdsc->fsc->sb->s_root)
4045		shrink_dcache_parent(mdsc->fsc->sb->s_root);
4046
4047	ceph_con_close(&session->s_con);
4048	ceph_con_open(&session->s_con,
4049		      CEPH_ENTITY_TYPE_MDS, mds,
4050		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
4051
4052	/* replay unsafe requests */
4053	replay_unsafe_requests(mdsc, session);
4054
4055	ceph_early_kick_flushing_caps(mdsc, session);
4056
4057	down_read(&mdsc->snap_rwsem);
4058
4059	/* placeholder for nr_caps */
4060	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
4061	if (err)
4062		goto fail;
4063
4064	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
4065		recon_state.msg_version = 3;
4066		recon_state.allow_multi = true;
4067	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4068		recon_state.msg_version = 3;
4069	} else {
4070		recon_state.msg_version = 2;
4071	}
4072	/* traverse this session's caps */
4073	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
4074
4075	spin_lock(&session->s_cap_lock);
4076	session->s_cap_reconnect = 0;
4077	spin_unlock(&session->s_cap_lock);
4078
4079	if (err < 0)
4080		goto fail;
4081
4082	/* check if all realms can be encoded into current message */
4083	if (mdsc->num_snap_realms) {
4084		size_t total_len =
4085			recon_state.pagelist->length +
4086			mdsc->num_snap_realms *
4087			sizeof(struct ceph_mds_snaprealm_reconnect);
4088		if (recon_state.msg_version >= 4) {
4089			/* number of realms */
4090			total_len += sizeof(u32);
4091			/* version, compat_version and struct_len */
4092			total_len += mdsc->num_snap_realms *
4093				     (2 * sizeof(u8) + sizeof(u32));
4094		}
4095		if (total_len > RECONNECT_MAX_SIZE) {
4096			if (!recon_state.allow_multi) {
4097				err = -ENOSPC;
4098				goto fail;
4099			}
4100			if (recon_state.nr_caps) {
4101				err = send_reconnect_partial(&recon_state);
4102				if (err)
4103					goto fail;
4104			}
4105			recon_state.msg_version = 5;
4106		}
4107	}
4108
4109	err = encode_snap_realms(mdsc, &recon_state);
4110	if (err < 0)
4111		goto fail;
4112
4113	if (recon_state.msg_version >= 5) {
4114		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
4115		if (err < 0)
4116			goto fail;
4117	}
4118
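	/*
	 * Patch the nr_caps placeholder at the start of the pagelist
	 * (and, for a v4+ message that carries only realms, the
	 * nr_realms slot that follows it).
	 */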
4119	if (recon_state.nr_caps || recon_state.nr_realms) {
4120		struct page *page =
4121			list_first_entry(&recon_state.pagelist->head,
4122					struct page, lru);
4123		__le32 *addr = kmap_atomic(page);
4124		if (recon_state.nr_caps) {
4125			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
4126			*addr = cpu_to_le32(recon_state.nr_caps);
4127		} else if (recon_state.msg_version >= 4) {
4128			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
4129		}
4130		kunmap_atomic(addr);
4131	}
4132
4133	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
4134	if (recon_state.msg_version >= 4)
4135		reply->hdr.compat_version = cpu_to_le16(4);
4136
4137	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
4138	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
4139
4140	ceph_con_send(&session->s_con, reply);
4141
4142	mutex_unlock(&session->s_mutex);
4143
4144	mutex_lock(&mdsc->mutex);
4145	__wake_requests(mdsc, &session->s_waiting);
4146	mutex_unlock(&mdsc->mutex);
4147
4148	up_read(&mdsc->snap_rwsem);
4149	ceph_pagelist_release(recon_state.pagelist);
4150	return;
4151
4152fail:
4153	ceph_msg_put(reply);
4154	up_read(&mdsc->snap_rwsem);
4155	mutex_unlock(&session->s_mutex);
4156fail_nomsg:
4157	ceph_pagelist_release(recon_state.pagelist);
4158fail_nopagelist:
4159	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
4160	return;
4161}
4162
4163
4164/*
4165 * compare old and new mdsmaps, kicking requests
4166 * and closing out old connections as necessary
4167 *
4168 * called under mdsc->mutex.
4169 */
4170static void check_new_map(struct ceph_mds_client *mdsc,
4171			  struct ceph_mdsmap *newmap,
4172			  struct ceph_mdsmap *oldmap)
4173{
4174	int i;
4175	int oldstate, newstate;
4176	struct ceph_mds_session *s;
4177
4178	dout("check_new_map new %u old %u\n",
4179	     newmap->m_epoch, oldmap->m_epoch);
4180
4181	for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4182		if (!mdsc->sessions[i])
4183			continue;
4184		s = mdsc->sessions[i];
4185		oldstate = ceph_mdsmap_get_state(oldmap, i);
4186		newstate = ceph_mdsmap_get_state(newmap, i);
4187
4188		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
4189		     i, ceph_mds_state_name(oldstate),
4190		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
4191		     ceph_mds_state_name(newstate),
4192		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
4193		     ceph_session_state_name(s->s_state));
4194
4195		if (i >= newmap->possible_max_rank) {
4196			/* force close session for stopped mds */
4197			ceph_get_mds_session(s);
4198			__unregister_session(mdsc, s);
4199			__wake_requests(mdsc, &s->s_waiting);
4200			mutex_unlock(&mdsc->mutex);
4201
4202			mutex_lock(&s->s_mutex);
4203			cleanup_session_requests(mdsc, s);
4204			remove_session_caps(s);
4205			mutex_unlock(&s->s_mutex);
4206
4207			ceph_put_mds_session(s);
4208
4209			mutex_lock(&mdsc->mutex);
4210			kick_requests(mdsc, i);
4211			continue;
4212		}
4213
4214		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
4215			   ceph_mdsmap_get_addr(newmap, i),
4216			   sizeof(struct ceph_entity_addr))) {
4217			/* just close it */
4218			mutex_unlock(&mdsc->mutex);
4219			mutex_lock(&s->s_mutex);
4220			mutex_lock(&mdsc->mutex);
4221			ceph_con_close(&s->s_con);
4222			mutex_unlock(&s->s_mutex);
4223			s->s_state = CEPH_MDS_SESSION_RESTARTING;
4224		} else if (oldstate == newstate) {
4225			continue;  /* nothing new with this mds */
4226		}
4227
4228		/*
4229		 * send reconnect?
4230		 */
4231		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
4232		    newstate >= CEPH_MDS_STATE_RECONNECT) {
4233			mutex_unlock(&mdsc->mutex);
4234			send_mds_reconnect(mdsc, s);
4235			mutex_lock(&mdsc->mutex);
4236		}
4237
4238		/*
4239		 * kick request on any mds that has gone active.
4240		 */
4241		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
4242		    newstate >= CEPH_MDS_STATE_ACTIVE) {
4243			if (oldstate != CEPH_MDS_STATE_CREATING &&
4244			    oldstate != CEPH_MDS_STATE_STARTING)
4245				pr_info("mds%d recovery completed\n", s->s_mds);
4246			kick_requests(mdsc, i);
4247			mutex_unlock(&mdsc->mutex);
4248			mutex_lock(&s->s_mutex);
4249			mutex_lock(&mdsc->mutex);
4250			ceph_kick_flushing_caps(mdsc, s);
4251			mutex_unlock(&s->s_mutex);
4252			wake_up_session_caps(s, RECONNECT);
4253		}
4254	}
4255
4256	for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4257		s = mdsc->sessions[i];
4258		if (!s)
4259			continue;
4260		if (!ceph_mdsmap_is_laggy(newmap, i))
4261			continue;
4262		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4263		    s->s_state == CEPH_MDS_SESSION_HUNG ||
4264		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
4265			dout(" connecting to export targets of laggy mds%d\n",
4266			     i);
4267			__open_export_target_sessions(mdsc, s);
4268		}
4269	}
4270}
4271
4272
4273
4274/*
4275 * leases
4276 */
4277
4278/*
4279 * caller must hold session s_mutex, dentry->d_lock
4280 */
4281void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
4282{
4283	struct ceph_dentry_info *di = ceph_dentry(dentry);
4284
4285	ceph_put_mds_session(di->lease_session);
4286	di->lease_session = NULL;
4287}
4288
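/*
 * Handle a lease message from the MDS: a REVOKE drops our lease on
 * the named dentry (and is acked), a RENEW updates the lease timeout
 * if we had asked for a renewal.
 */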
4289static void handle_lease(struct ceph_mds_client *mdsc,
4290			 struct ceph_mds_session *session,
4291			 struct ceph_msg *msg)
4292{
4293	struct super_block *sb = mdsc->fsc->sb;
4294	struct inode *inode;
4295	struct dentry *parent, *dentry;
4296	struct ceph_dentry_info *di;
4297	int mds = session->s_mds;
4298	struct ceph_mds_lease *h = msg->front.iov_base;
4299	u32 seq;
4300	struct ceph_vino vino;
4301	struct qstr dname;
4302	int release = 0;
4303
4304	dout("handle_lease from mds%d\n", mds);
4305
4306	/* decode */
4307	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
4308		goto bad;
4309	vino.ino = le64_to_cpu(h->ino);
4310	vino.snap = CEPH_NOSNAP;
4311	seq = le32_to_cpu(h->seq);
4312	dname.len = get_unaligned_le32(h + 1);
4313	if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
4314		goto bad;
4315	dname.name = (void *)(h + 1) + sizeof(u32);
4316
4317	/* lookup inode */
4318	inode = ceph_find_inode(sb, vino);
4319	dout("handle_lease %s, ino %llx %p %.*s\n",
4320	     ceph_lease_op_name(h->action), vino.ino, inode,
4321	     dname.len, dname.name);
4322
4323	mutex_lock(&session->s_mutex);
4324	inc_session_sequence(session);
4325
4326	if (!inode) {
4327		dout("handle_lease no inode %llx\n", vino.ino);
4328		goto release;
4329	}
4330
4331	/* dentry */
4332	parent = d_find_alias(inode);
4333	if (!parent) {
4334		dout("no parent dentry on inode %p\n", inode);
4335		WARN_ON(1);
4336		goto release;  /* hrm... */
4337	}
4338	dname.hash = full_name_hash(parent, dname.name, dname.len);
4339	dentry = d_lookup(parent, &dname);
4340	dput(parent);
4341	if (!dentry)
4342		goto release;
4343
4344	spin_lock(&dentry->d_lock);
4345	di = ceph_dentry(dentry);
4346	switch (h->action) {
4347	case CEPH_MDS_LEASE_REVOKE:
4348		if (di->lease_session == session) {
4349			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
4350				h->seq = cpu_to_le32(di->lease_seq);
4351			__ceph_mdsc_drop_dentry_lease(dentry);
4352		}
4353		release = 1;
4354		break;
4355
4356	case CEPH_MDS_LEASE_RENEW:
4357		if (di->lease_session == session &&
4358		    di->lease_gen == atomic_read(&session->s_cap_gen) &&
4359		    di->lease_renew_from &&
4360		    di->lease_renew_after == 0) {
4361			unsigned long duration =
4362				msecs_to_jiffies(le32_to_cpu(h->duration_ms));
4363
4364			di->lease_seq = seq;
4365			di->time = di->lease_renew_from + duration;
4366			di->lease_renew_after = di->lease_renew_from +
4367				(duration >> 1);
4368			di->lease_renew_from = 0;
4369		}
4370		break;
4371	}
4372	spin_unlock(&dentry->d_lock);
4373	dput(dentry);
4374
4375	if (!release)
4376		goto out;
4377
4378release:
4379	/* let's just reuse the same message */
4380	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
4381	ceph_msg_get(msg);
4382	ceph_con_send(&session->s_con, msg);
4383
4384out:
4385	mutex_unlock(&session->s_mutex);
4386	iput(inode);
4387	return;
4388
4389bad:
4390	pr_err("corrupt lease message\n");
4391	ceph_msg_dump(msg);
4392}
4393
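/*
 * Build and send a CEPH_MSG_CLIENT_LEASE message (e.g. a lease
 * release) for the given dentry to the MDS.
 */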
4394void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
4395			      struct dentry *dentry, char action,
4396			      u32 seq)
4397{
4398	struct ceph_msg *msg;
4399	struct ceph_mds_lease *lease;
4400	struct inode *dir;
4401	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
4402
4403	dout("lease_send_msg dentry %p %s to mds%d\n",
4404	     dentry, ceph_lease_op_name(action), session->s_mds);
4405
4406	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
4407	if (!msg)
4408		return;
4409	lease = msg->front.iov_base;
4410	lease->action = action;
4411	lease->seq = cpu_to_le32(seq);
4412
4413	spin_lock(&dentry->d_lock);
4414	dir = d_inode(dentry->d_parent);
4415	lease->ino = cpu_to_le64(ceph_ino(dir));
4416	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
4417
4418	put_unaligned_le32(dentry->d_name.len, lease + 1);
4419	memcpy((void *)(lease + 1) + 4,
4420	       dentry->d_name.name, dentry->d_name.len);
4421	spin_unlock(&dentry->d_lock);
4422	/*
4423	 * if this is a preemptive lease RELEASE, no need to
4424	 * flush request stream, since the actual request will
4425	 * soon follow.
4426	 */
4427	msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
4428
4429	ceph_con_send(&session->s_con, msg);
4430}
4431
4432/*
4433 * lock and unlock the session, to wait for ongoing session activity to finish
4434 */
4435static void lock_unlock_session(struct ceph_mds_session *s)
4436{
4437	mutex_lock(&s->s_mutex);
4438	mutex_unlock(&s->s_mutex);
4439}
4440
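/*
 * If we have been blocklisted and the CLEANRECOVER mount option is
 * set, automatically force a reconnect to recover the mount.
 */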
4441static void maybe_recover_session(struct ceph_mds_client *mdsc)
4442{
4443	struct ceph_fs_client *fsc = mdsc->fsc;
4444
4445	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
4446		return;
4447
4448	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
4449		return;
4450
4451	if (!READ_ONCE(fsc->blocklisted))
4452		return;
4453
4454	pr_info("auto reconnect after blocklisted\n");
4455	ceph_force_reconnect(fsc->sb);
4456}
4457
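/*
 * Return true if this session is worth keeping alive (marking an OPEN
 * session HUNG if it has outlived s_ttl); false if it is closed,
 * closing or otherwise defunct.
 */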
4458bool check_session_state(struct ceph_mds_session *s)
4459{
4460	switch (s->s_state) {
4461	case CEPH_MDS_SESSION_OPEN:
4462		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
4463			s->s_state = CEPH_MDS_SESSION_HUNG;
4464			pr_info("mds%d hung\n", s->s_mds);
4465		}
4466		break;
4467	case CEPH_MDS_SESSION_CLOSING:
4468		/* Should never reach this when we're unmounting */
4469		WARN_ON_ONCE(s->s_ttl);
4470		fallthrough;
4471	case CEPH_MDS_SESSION_NEW:
4472	case CEPH_MDS_SESSION_RESTARTING:
4473	case CEPH_MDS_SESSION_CLOSED:
4474	case CEPH_MDS_SESSION_REJECTED:
4475		return false;
4476	}
4477
4478	return true;
4479}
4480
4481/*
4482 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
4483 * then we need to retransmit that request.
4484 */
4485void inc_session_sequence(struct ceph_mds_session *s)
4486{
4487	lockdep_assert_held(&s->s_mutex);
4488
4489	s->s_seq++;
4490
4491	if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4492		int ret;
4493
4494		dout("resending session close request for mds%d\n", s->s_mds);
4495		ret = request_close_session(s);
4496		if (ret < 0)
4497			pr_err("unable to close session to mds%d: %d\n",
4498			       s->s_mds, ret);
4499	}
4500}
4501
4502/*
4503 * delayed work -- periodically trim expired leases, renew caps with mds.  If
4504 * the @delay parameter is set to 0 or if it's more than 5 secs, the default
4505 * workqueue delay value of 5 secs will be used.
4506 */
4507static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
4508{
4509	unsigned long max_delay = HZ * 5;
4510
4511	/* 5 secs default delay */
4512	if (!delay || (delay > max_delay))
4513		delay = max_delay;
4514	schedule_delayed_work(&mdsc->delayed_work,
4515			      round_jiffies_relative(delay));
4516}
4517
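/*
 * Periodic housekeeping: renew caps (or just send a keepalive) on each
 * session, send any queued cap releases, check delayed caps, trim the
 * snapid map, maybe recover a blocklisted mount, and re-arm the work.
 */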
4518static void delayed_work(struct work_struct *work)
4519{
4520	struct ceph_mds_client *mdsc =
4521		container_of(work, struct ceph_mds_client, delayed_work.work);
4522	unsigned long delay;
4523	int renew_interval;
4524	int renew_caps;
4525	int i;
4526
4527	dout("mdsc delayed_work\n");
4528
4529	if (mdsc->stopping)
4530		return;
4531
4532	mutex_lock(&mdsc->mutex);
4533	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
4534	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
4535				   mdsc->last_renew_caps);
4536	if (renew_caps)
4537		mdsc->last_renew_caps = jiffies;
4538
4539	for (i = 0; i < mdsc->max_sessions; i++) {
4540		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4541		if (!s)
4542			continue;
4543
4544		if (!check_session_state(s)) {
4545			ceph_put_mds_session(s);
4546			continue;
4547		}
4548		mutex_unlock(&mdsc->mutex);
4549
4550		mutex_lock(&s->s_mutex);
4551		if (renew_caps)
4552			send_renew_caps(mdsc, s);
4553		else
4554			ceph_con_keepalive(&s->s_con);
4555		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4556		    s->s_state == CEPH_MDS_SESSION_HUNG)
4557			ceph_send_cap_releases(mdsc, s);
4558		mutex_unlock(&s->s_mutex);
4559		ceph_put_mds_session(s);
4560
4561		mutex_lock(&mdsc->mutex);
4562	}
4563	mutex_unlock(&mdsc->mutex);
4564
4565	delay = ceph_check_delayed_caps(mdsc);
4566
4567	ceph_queue_cap_reclaim_work(mdsc);
4568
4569	ceph_trim_snapid_map(mdsc);
4570
4571	maybe_recover_session(mdsc);
4572
4573	schedule_delayed(mdsc, delay);
4574}
4575
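/*
 * Allocate and initialize the MDS client state for this ceph_fs_client.
 */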
4576int ceph_mdsc_init(struct ceph_fs_client *fsc)
4578{
4579	struct ceph_mds_client *mdsc;
4580	int err;
4581
4582	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
4583	if (!mdsc)
4584		return -ENOMEM;
4585	mdsc->fsc = fsc;
4586	mutex_init(&mdsc->mutex);
4587	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
4588	if (!mdsc->mdsmap) {
4589		err = -ENOMEM;
4590		goto err_mdsc;
4591	}
4592
4593	init_completion(&mdsc->safe_umount_waiters);
4594	init_waitqueue_head(&mdsc->session_close_wq);
4595	INIT_LIST_HEAD(&mdsc->waiting_for_map);
4596	mdsc->quotarealms_inodes = RB_ROOT;
4597	mutex_init(&mdsc->quotarealms_inodes_mutex);
4598	init_rwsem(&mdsc->snap_rwsem);
4599	mdsc->snap_realms = RB_ROOT;
4600	INIT_LIST_HEAD(&mdsc->snap_empty);
4601	spin_lock_init(&mdsc->snap_empty_lock);
4602	mdsc->request_tree = RB_ROOT;
4603	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
4604	mdsc->last_renew_caps = jiffies;
4605	INIT_LIST_HEAD(&mdsc->cap_delay_list);
4606	INIT_LIST_HEAD(&mdsc->cap_wait_list);
4607	spin_lock_init(&mdsc->cap_delay_lock);
4608	INIT_LIST_HEAD(&mdsc->snap_flush_list);
4609	spin_lock_init(&mdsc->snap_flush_lock);
4610	mdsc->last_cap_flush_tid = 1;
4611	INIT_LIST_HEAD(&mdsc->cap_flush_list);
4612	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
4613	spin_lock_init(&mdsc->cap_dirty_lock);
4614	init_waitqueue_head(&mdsc->cap_flushing_wq);
4615	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
4616	err = ceph_metric_init(&mdsc->metric);
4617	if (err)
4618		goto err_mdsmap;
4619
4620	spin_lock_init(&mdsc->dentry_list_lock);
4621	INIT_LIST_HEAD(&mdsc->dentry_leases);
4622	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
4623
4624	ceph_caps_init(mdsc);
4625	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
4626
4627	spin_lock_init(&mdsc->snapid_map_lock);
4628	mdsc->snapid_map_tree = RB_ROOT;
4629	INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4630
4631	init_rwsem(&mdsc->pool_perm_rwsem);
4632	mdsc->pool_perm_tree = RB_ROOT;
4633
4634	strscpy(mdsc->nodename, utsname()->nodename,
4635		sizeof(mdsc->nodename));
4636
4637	fsc->mdsc = mdsc;
4638	return 0;
4639
4640err_mdsmap:
4641	kfree(mdsc->mdsmap);
4642err_mdsc:
4643	kfree(mdsc);
4644	return err;
4645}
4646
4647/*
4648 * Wait for safe replies on open mds requests.  If we time out, drop
4649 * all requests from the tree to avoid dangling dentry refs.
4650 */
4651static void wait_requests(struct ceph_mds_client *mdsc)
4652{
4653	struct ceph_options *opts = mdsc->fsc->client->options;
4654	struct ceph_mds_request *req;
4655
4656	mutex_lock(&mdsc->mutex);
4657	if (__get_oldest_req(mdsc)) {
4658		mutex_unlock(&mdsc->mutex);
4659
4660		dout("wait_requests waiting for requests\n");
4661		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
4662				    ceph_timeout_jiffies(opts->mount_timeout));
4663
4664		/* tear down remaining requests */
4665		mutex_lock(&mdsc->mutex);
4666		while ((req = __get_oldest_req(mdsc))) {
4667			dout("wait_requests timed out on tid %llu\n",
4668			     req->r_tid);
4669			list_del_init(&req->r_wait);
4670			__unregister_request(mdsc, req);
4671		}
4672	}
4673	mutex_unlock(&mdsc->mutex);
4674	dout("wait_requests done\n");
4675}
4676
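/*
 * Ask the MDS to flush its journal (mdlog) so that the metadata
 * updates we are waiting on can be committed sooner; used while
 * preparing to unmount.
 */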
4677void send_flush_mdlog(struct ceph_mds_session *s)
4678{
4679	struct ceph_msg *msg;
4680
4681	/*
4682	 * Pre-luminous MDS crashes when it sees an unknown session request
4683	 */
4684	if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
4685		return;
4686
4687	mutex_lock(&s->s_mutex);
4688	dout("request mdlog flush to mds%d (%s) seq %lld\n", s->s_mds,
4689	     ceph_session_state_name(s->s_state), s->s_seq);
4690	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
4691				      s->s_seq);
4692	if (!msg) {
4693		pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n",
4694		       s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
4695	} else {
4696		ceph_con_send(&s->s_con, msg);
4697	}
4698	mutex_unlock(&s->s_mutex);
4699}
4700
4701/*
4702 * called before the mount goes read-only, and before dentries are torn down.
4703 * (hmm, does this still race with new lookups?)
4704 */
4705void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
4706{
4707	dout("pre_umount\n");
4708	mdsc->stopping = 1;
4709
4710	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
4711	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
4712	ceph_flush_dirty_caps(mdsc);
4713	wait_requests(mdsc);
4714
4715	/*
4716	 * wait for reply handlers to drop their request refs and
4717	 * their inode/dcache refs
4718	 */
4719	ceph_msgr_flush();
4720
4721	ceph_cleanup_quotarealms_inodes(mdsc);
4722}
4723
4724/*
4725 * wait for all write mds requests to flush.
4726 */
4727static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
4728{
4729	struct ceph_mds_request *req = NULL, *nextreq;
4730	struct rb_node *n;
4731
4732	mutex_lock(&mdsc->mutex);
4733	dout("wait_unsafe_requests want %lld\n", want_tid);
4734restart:
4735	req = __get_oldest_req(mdsc);
4736	while (req && req->r_tid <= want_tid) {
4737		/* find next request */
4738		n = rb_next(&req->r_node);
4739		if (n)
4740			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
4741		else
4742			nextreq = NULL;
4743		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
4744		    (req->r_op & CEPH_MDS_OP_WRITE)) {
4745			/* write op */
4746			ceph_mdsc_get_request(req);
4747			if (nextreq)
4748				ceph_mdsc_get_request(nextreq);
4749			mutex_unlock(&mdsc->mutex);
4750			dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
4751			     req->r_tid, want_tid);
4752			wait_for_completion(&req->r_safe_completion);
4753			mutex_lock(&mdsc->mutex);
4754			ceph_mdsc_put_request(req);
4755			if (!nextreq)
4756				break;  /* no next request existed before, so we're done! */
4757			if (RB_EMPTY_NODE(&nextreq->r_node)) {
4758				/* next request was removed from tree */
4759				ceph_mdsc_put_request(nextreq);
4760				goto restart;
4761			}
4762			ceph_mdsc_put_request(nextreq);  /* won't go away */
4763		}
4764		req = nextreq;
4765	}
4766	mutex_unlock(&mdsc->mutex);
4767	dout("wait_unsafe_requests done\n");
4768}
4769
4770void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
4771{
4772	u64 want_tid, want_flush;
4773
4774	if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
4775		return;
4776
4777	dout("sync\n");
4778	mutex_lock(&mdsc->mutex);
4779	want_tid = mdsc->last_tid;
4780	mutex_unlock(&mdsc->mutex);
4781
4782	ceph_flush_dirty_caps(mdsc);
4783	spin_lock(&mdsc->cap_dirty_lock);
4784	want_flush = mdsc->last_cap_flush_tid;
4785	if (!list_empty(&mdsc->cap_flush_list)) {
4786		struct ceph_cap_flush *cf =
4787			list_last_entry(&mdsc->cap_flush_list,
4788					struct ceph_cap_flush, g_list);
4789		cf->wake = true;
4790	}
4791	spin_unlock(&mdsc->cap_dirty_lock);
4792
4793	dout("sync want tid %lld flush_seq %lld\n",
4794	     want_tid, want_flush);
4795
4796	wait_unsafe_requests(mdsc, want_tid);
4797	wait_caps_flush(mdsc, want_flush);
4798}
4799
4800/*
4801 * true if all sessions are closed, or we force unmount
4802 */
4803static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
4804{
4805	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4806		return true;
4807	return atomic_read(&mdsc->num_sessions) <= skipped;
4808}
4809
4810/*
4811 * called after sb is ro.
4812 */
4813void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
4814{
4815	struct ceph_options *opts = mdsc->fsc->client->options;
4816	struct ceph_mds_session *session;
4817	int i;
4818	int skipped = 0;
4819
4820	dout("close_sessions\n");
4821
4822	/* close sessions */
4823	mutex_lock(&mdsc->mutex);
4824	for (i = 0; i < mdsc->max_sessions; i++) {
4825		session = __ceph_lookup_mds_session(mdsc, i);
4826		if (!session)
4827			continue;
4828		mutex_unlock(&mdsc->mutex);
4829		mutex_lock(&session->s_mutex);
4830		if (__close_session(mdsc, session) <= 0)
4831			skipped++;
4832		mutex_unlock(&session->s_mutex);
4833		ceph_put_mds_session(session);
4834		mutex_lock(&mdsc->mutex);
4835	}
4836	mutex_unlock(&mdsc->mutex);
4837
4838	dout("waiting for sessions to close\n");
4839	wait_event_timeout(mdsc->session_close_wq,
4840			   done_closing_sessions(mdsc, skipped),
4841			   ceph_timeout_jiffies(opts->mount_timeout));
4842
4843	/* tear down remaining sessions */
4844	mutex_lock(&mdsc->mutex);
4845	for (i = 0; i < mdsc->max_sessions; i++) {
4846		if (mdsc->sessions[i]) {
4847			session = ceph_get_mds_session(mdsc->sessions[i]);
4848			__unregister_session(mdsc, session);
4849			mutex_unlock(&mdsc->mutex);
4850			mutex_lock(&session->s_mutex);
4851			remove_session_caps(session);
4852			mutex_unlock(&session->s_mutex);
4853			ceph_put_mds_session(session);
4854			mutex_lock(&mdsc->mutex);
4855		}
4856	}
4857	WARN_ON(!list_empty(&mdsc->cap_delay_list));
4858	mutex_unlock(&mdsc->mutex);
4859
4860	ceph_cleanup_snapid_map(mdsc);
4861	ceph_cleanup_empty_realms(mdsc);
4862
4863	cancel_work_sync(&mdsc->cap_reclaim_work);
4864	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4865
4866	dout("stopped\n");
4867}
4868
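/*
 * Forcibly close all sessions and wake/kick any waiting requests;
 * used when the mount is being torn down forcibly.
 */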
4869void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
4870{
4871	struct ceph_mds_session *session;
4872	int mds;
4873
4874	dout("force umount\n");
4875
4876	mutex_lock(&mdsc->mutex);
4877	for (mds = 0; mds < mdsc->max_sessions; mds++) {
4878		session = __ceph_lookup_mds_session(mdsc, mds);
4879		if (!session)
4880			continue;
4881
4882		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
4883			__unregister_session(mdsc, session);
4884		__wake_requests(mdsc, &session->s_waiting);
4885		mutex_unlock(&mdsc->mutex);
4886
4887		mutex_lock(&session->s_mutex);
4888		__close_session(mdsc, session);
4889		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
4890			cleanup_session_requests(mdsc, session);
4891			remove_session_caps(session);
4892		}
4893		mutex_unlock(&session->s_mutex);
4894		ceph_put_mds_session(session);
4895
4896		mutex_lock(&mdsc->mutex);
4897		kick_requests(mdsc, mds);
4898	}
4899	__wake_requests(mdsc, &mdsc->waiting_for_map);
4900	mutex_unlock(&mdsc->mutex);
4901}
4902
4903static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
4904{
4905	dout("stop\n");
4906	/*
4907	 * Make sure the delayed work has stopped before releasing
4908	 * the resources.
4909	 *
4910	 * cancel_delayed_work_sync() only guarantees that the work
4911	 * finishes executing, but the delayed work may re-arm itself
4912	 * afterward; hence the flush_delayed_work() below.
4913	 */
4914	flush_delayed_work(&mdsc->delayed_work);
4915
4916	if (mdsc->mdsmap)
4917		ceph_mdsmap_destroy(mdsc->mdsmap);
4918	kfree(mdsc->sessions);
4919	ceph_caps_finalize(mdsc);
4920	ceph_pool_perm_destroy(mdsc);
4921}
4922
4923void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
4924{
4925	struct ceph_mds_client *mdsc = fsc->mdsc;
4926	dout("mdsc_destroy %p\n", mdsc);
4927
4928	if (!mdsc)
4929		return;
4930
4931	/* flush out any connection work with references to us */
4932	ceph_msgr_flush();
4933
4934	ceph_mdsc_stop(mdsc);
4935
4936	ceph_metric_destroy(&mdsc->metric);
4937
4938	fsc->mdsc = NULL;
4939	kfree(mdsc);
4940	dout("mdsc_destroy %p done\n", mdsc);
4941}
4942
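/*
 * Handle an FSMAP message: look up the fscid of the file system named
 * by the mds_namespace mount option and subscribe to its MDS map.
 */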
4943void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4944{
4945	struct ceph_fs_client *fsc = mdsc->fsc;
4946	const char *mds_namespace = fsc->mount_options->mds_namespace;
4947	void *p = msg->front.iov_base;
4948	void *end = p + msg->front.iov_len;
4949	u32 epoch;
4950	u32 num_fs;
4951	u32 mount_fscid = (u32)-1;
4952	int err = -EINVAL;
4953
4954	ceph_decode_need(&p, end, sizeof(u32), bad);
4955	epoch = ceph_decode_32(&p);
4956
4957	dout("handle_fsmap epoch %u\n", epoch);
4958
4959	/* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
4960	ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);
4961
4962	ceph_decode_32_safe(&p, end, num_fs, bad);
4963	while (num_fs-- > 0) {
4964		void *info_p, *info_end;
4965		u32 info_len;
4966		u32 fscid, namelen;
4967
4968		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4969		p += 2;		// info_v, info_cv
4970		info_len = ceph_decode_32(&p);
4971		ceph_decode_need(&p, end, info_len, bad);
4972		info_p = p;
4973		info_end = p + info_len;
4974		p = info_end;
4975
4976		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
4977		fscid = ceph_decode_32(&info_p);
4978		namelen = ceph_decode_32(&info_p);
4979		ceph_decode_need(&info_p, info_end, namelen, bad);
4980
4981		if (mds_namespace &&
4982		    strlen(mds_namespace) == namelen &&
4983		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
4984			mount_fscid = fscid;
4985			break;
4986		}
4987	}
4988
4989	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
4990	if (mount_fscid != (u32)-1) {
4991		fsc->client->monc.fs_cluster_id = mount_fscid;
4992		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
4993				   0, true);
4994		ceph_monc_renew_subs(&fsc->client->monc);
4995	} else {
4996		err = -ENOENT;
4997		goto err_out;
4998	}
4999	return;
5000
5001bad:
5002	pr_err("error decoding fsmap\n");
5003err_out:
5004	mutex_lock(&mdsc->mutex);
5005	mdsc->mdsmap_err = err;
5006	__wake_requests(mdsc, &mdsc->waiting_for_map);
5007	mutex_unlock(&mdsc->mutex);
5008}
5009
5010/*
5011 * handle mds map update.
5012 */
5013void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
5014{
5015	u32 epoch;
5016	u32 maplen;
5017	void *p = msg->front.iov_base;
5018	void *end = p + msg->front.iov_len;
5019	struct ceph_mdsmap *newmap, *oldmap;
5020	struct ceph_fsid fsid;
5021	int err = -EINVAL;
5022
5023	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
5024	ceph_decode_copy(&p, &fsid, sizeof(fsid));
5025	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
5026		return;
5027	epoch = ceph_decode_32(&p);
5028	maplen = ceph_decode_32(&p);
5029	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
5030
5031	/* do we need it? */
5032	mutex_lock(&mdsc->mutex);
5033	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
5034		dout("handle_map epoch %u <= our %u\n",
5035		     epoch, mdsc->mdsmap->m_epoch);
5036		mutex_unlock(&mdsc->mutex);
5037		return;
5038	}
5039
5040	newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client));
5041	if (IS_ERR(newmap)) {
5042		err = PTR_ERR(newmap);
5043		goto bad_unlock;
5044	}
5045
5046	/* swap into place */
5047	if (mdsc->mdsmap) {
5048		oldmap = mdsc->mdsmap;
5049		mdsc->mdsmap = newmap;
5050		check_new_map(mdsc, newmap, oldmap);
5051		ceph_mdsmap_destroy(oldmap);
5052	} else {
5053		mdsc->mdsmap = newmap;  /* first mds map */
5054	}
5055	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
5056					MAX_LFS_FILESIZE);
5057
5058	__wake_requests(mdsc, &mdsc->waiting_for_map);
5059	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
5060			  mdsc->mdsmap->m_epoch);
5061
5062	mutex_unlock(&mdsc->mutex);
5063	schedule_delayed(mdsc, 0);
5064	return;
5065
5066bad_unlock:
5067	mutex_unlock(&mdsc->mutex);
5068bad:
5069	pr_err("error decoding mdsmap %d\n", err);
5070	return;
5071}
5072
5073static struct ceph_connection *mds_get_con(struct ceph_connection *con)
5074{
5075	struct ceph_mds_session *s = con->private;
5076
5077	if (ceph_get_mds_session(s))
5078		return con;
5079	return NULL;
5080}
5081
5082static void mds_put_con(struct ceph_connection *con)
5083{
5084	struct ceph_mds_session *s = con->private;
5085
5086	ceph_put_mds_session(s);
5087}
5088
5089/*
5090 * if the client is unresponsive for long enough, the mds will kill
5091 * the session entirely.
5092 */
5093static void mds_peer_reset(struct ceph_connection *con)
5094{
5095	struct ceph_mds_session *s = con->private;
5096	struct ceph_mds_client *mdsc = s->s_mdsc;
5097
5098	pr_warn("mds%d closed our session\n", s->s_mds);
5099	send_mds_reconnect(mdsc, s);
5100}
5101
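/*
 * Dispatch an incoming message from the MDS to the appropriate
 * handler, dropping it if the session is no longer registered.
 */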
5102static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5103{
5104	struct ceph_mds_session *s = con->private;
5105	struct ceph_mds_client *mdsc = s->s_mdsc;
5106	int type = le16_to_cpu(msg->hdr.type);
5107
5108	mutex_lock(&mdsc->mutex);
5109	if (__verify_registered_session(mdsc, s) < 0) {
5110		mutex_unlock(&mdsc->mutex);
5111		goto out;
5112	}
5113	mutex_unlock(&mdsc->mutex);
5114
5115	switch (type) {
5116	case CEPH_MSG_MDS_MAP:
5117		ceph_mdsc_handle_mdsmap(mdsc, msg);
5118		break;
5119	case CEPH_MSG_FS_MAP_USER:
5120		ceph_mdsc_handle_fsmap(mdsc, msg);
5121		break;
5122	case CEPH_MSG_CLIENT_SESSION:
5123		handle_session(s, msg);
5124		break;
5125	case CEPH_MSG_CLIENT_REPLY:
5126		handle_reply(s, msg);
5127		break;
5128	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
5129		handle_forward(mdsc, s, msg);
5130		break;
5131	case CEPH_MSG_CLIENT_CAPS:
5132		ceph_handle_caps(s, msg);
5133		break;
5134	case CEPH_MSG_CLIENT_SNAP:
5135		ceph_handle_snap(mdsc, s, msg);
5136		break;
5137	case CEPH_MSG_CLIENT_LEASE:
5138		handle_lease(mdsc, s, msg);
5139		break;
5140	case CEPH_MSG_CLIENT_QUOTA:
5141		ceph_handle_quota(mdsc, s, msg);
5142		break;
5143
5144	default:
5145		pr_err("received unknown message type %d %s\n", type,
5146		       ceph_msg_type_name(type));
5147	}
5148out:
5149	ceph_msg_put(msg);
5150}
5151
5152/*
5153 * authentication
5154 */
5155
5156/*
5157 * Note: returned pointer is the address of a structure that's
5158 * managed separately.  Caller must *not* attempt to free it.
5159 */
5160static struct ceph_auth_handshake *
5161mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
5162{
5163	struct ceph_mds_session *s = con->private;
5164	struct ceph_mds_client *mdsc = s->s_mdsc;
5165	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5166	struct ceph_auth_handshake *auth = &s->s_auth;
5167	int ret;
5168
5169	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
5170					 force_new, proto, NULL, NULL);
5171	if (ret)
5172		return ERR_PTR(ret);
5173
5174	return auth;
5175}
5176
5177static int mds_add_authorizer_challenge(struct ceph_connection *con,
5178				    void *challenge_buf, int challenge_buf_len)
5179{
5180	struct ceph_mds_session *s = con->private;
5181	struct ceph_mds_client *mdsc = s->s_mdsc;
5182	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5183
5184	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
5185					    challenge_buf, challenge_buf_len);
5186}
5187
5188static int mds_verify_authorizer_reply(struct ceph_connection *con)
5189{
5190	struct ceph_mds_session *s = con->private;
5191	struct ceph_mds_client *mdsc = s->s_mdsc;
5192	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5193	struct ceph_auth_handshake *auth = &s->s_auth;
5194
5195	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
5196		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
5197		NULL, NULL, NULL, NULL);
5198}
5199
5200static int mds_invalidate_authorizer(struct ceph_connection *con)
5201{
5202	struct ceph_mds_session *s = con->private;
5203	struct ceph_mds_client *mdsc = s->s_mdsc;
5204	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5205
5206	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
5207
5208	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
5209}
5210
5211static int mds_get_auth_request(struct ceph_connection *con,
5212				void *buf, int *buf_len,
5213				void **authorizer, int *authorizer_len)
5214{
5215	struct ceph_mds_session *s = con->private;
5216	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5217	struct ceph_auth_handshake *auth = &s->s_auth;
5218	int ret;
5219
5220	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
5221				       buf, buf_len);
5222	if (ret)
5223		return ret;
5224
5225	*authorizer = auth->authorizer_buf;
5226	*authorizer_len = auth->authorizer_buf_len;
5227	return 0;
5228}
5229
5230static int mds_handle_auth_reply_more(struct ceph_connection *con,
5231				      void *reply, int reply_len,
5232				      void *buf, int *buf_len,
5233				      void **authorizer, int *authorizer_len)
5234{
5235	struct ceph_mds_session *s = con->private;
5236	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5237	struct ceph_auth_handshake *auth = &s->s_auth;
5238	int ret;
5239
5240	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
5241					      buf, buf_len);
5242	if (ret)
5243		return ret;
5244
5245	*authorizer = auth->authorizer_buf;
5246	*authorizer_len = auth->authorizer_buf_len;
5247	return 0;
5248}
5249
5250static int mds_handle_auth_done(struct ceph_connection *con,
5251				u64 global_id, void *reply, int reply_len,
5252				u8 *session_key, int *session_key_len,
5253				u8 *con_secret, int *con_secret_len)
5254{
5255	struct ceph_mds_session *s = con->private;
5256	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5257	struct ceph_auth_handshake *auth = &s->s_auth;
5258
5259	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
5260					       session_key, session_key_len,
5261					       con_secret, con_secret_len);
5262}
5263
5264static int mds_handle_auth_bad_method(struct ceph_connection *con,
5265				      int used_proto, int result,
5266				      const int *allowed_protos, int proto_cnt,
5267				      const int *allowed_modes, int mode_cnt)
5268{
5269	struct ceph_mds_session *s = con->private;
5270	struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
5271	int ret;
5272
5273	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
5274					    used_proto, result,
5275					    allowed_protos, proto_cnt,
5276					    allowed_modes, mode_cnt)) {
5277		ret = ceph_monc_validate_auth(monc);
5278		if (ret)
5279			return ret;
5280	}
5281
5282	return -EACCES;
5283}
5284
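/*
 * Allocate a message to receive the incoming frame described by @hdr,
 * unless one has already been set up on the connection.
 */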
5285static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
5286				struct ceph_msg_header *hdr, int *skip)
5287{
5288	struct ceph_msg *msg;
5289	int type = (int) le16_to_cpu(hdr->type);
5290	int front_len = (int) le32_to_cpu(hdr->front_len);
5291
5292	if (con->in_msg)
5293		return con->in_msg;
5294
5295	*skip = 0;
5296	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
5297	if (!msg) {
5298		pr_err("unable to allocate msg type %d len %d\n",
5299		       type, front_len);
5300		return NULL;
5301	}
5302
5303	return msg;
5304}
5305
5306static int mds_sign_message(struct ceph_msg *msg)
5307{
5308	struct ceph_mds_session *s = msg->con->private;
5309	struct ceph_auth_handshake *auth = &s->s_auth;
5310
5311	return ceph_auth_sign_message(auth, msg);
5312}
5313
5314static int mds_check_message_signature(struct ceph_msg *msg)
5315{
5316	struct ceph_mds_session *s = msg->con->private;
5317	struct ceph_auth_handshake *auth = &s->s_auth;
5318
5319	return ceph_auth_check_message_signature(auth, msg);
5320}
5321
5322static const struct ceph_connection_operations mds_con_ops = {
5323	.get = mds_get_con,
5324	.put = mds_put_con,
5325	.alloc_msg = mds_alloc_msg,
5326	.dispatch = mds_dispatch,
5327	.peer_reset = mds_peer_reset,
5328	.get_authorizer = mds_get_authorizer,
5329	.add_authorizer_challenge = mds_add_authorizer_challenge,
5330	.verify_authorizer_reply = mds_verify_authorizer_reply,
5331	.invalidate_authorizer = mds_invalidate_authorizer,
5332	.sign_message = mds_sign_message,
5333	.check_message_signature = mds_check_message_signature,
5334	.get_auth_request = mds_get_auth_request,
5335	.handle_auth_reply_more = mds_handle_auth_reply_more,
5336	.handle_auth_done = mds_handle_auth_done,
5337	.handle_auth_bad_method = mds_handle_auth_bad_method,
5338};
5339
5340/* eof */
5341