Lines Matching defs:monc

37 static int __validate_auth(struct ceph_mon_client *monc);
175 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
177 monc->pending_auth = 1;
178 monc->m_auth->front.iov_len = len;
179 monc->m_auth->hdr.front_len = cpu_to_le32(len);
180 ceph_msg_revoke(monc->m_auth);
181 ceph_msg_get(monc->m_auth); /* keep our ref */
182 ceph_con_send(&monc->con, monc->m_auth);
188 static void __close_session(struct ceph_mon_client *monc)
190 dout("__close_session closing mon%d\n", monc->cur_mon);
191 ceph_msg_revoke(monc->m_auth);
192 ceph_msg_revoke_incoming(monc->m_auth_reply);
193 ceph_msg_revoke(monc->m_subscribe);
194 ceph_msg_revoke_incoming(monc->m_subscribe_ack);
195 ceph_con_close(&monc->con);
197 monc->pending_auth = 0;
198 ceph_auth_reset(monc->auth);
205 static void pick_new_mon(struct ceph_mon_client *monc)
207 int old_mon = monc->cur_mon;
209 BUG_ON(monc->monmap->num_mon < 1);
211 if (monc->monmap->num_mon == 1) {
212 monc->cur_mon = 0;
214 int max = monc->monmap->num_mon;
218 if (monc->cur_mon >= 0) {
219 if (monc->cur_mon < monc->monmap->num_mon)
220 o = monc->cur_mon;
229 monc->cur_mon = n;
233 monc->cur_mon, monc->monmap->num_mon);
239 static void __open_session(struct ceph_mon_client *monc)
243 pick_new_mon(monc);
245 monc->hunting = true;
246 if (monc->had_a_connection) {
247 monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
248 if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
249 monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
252 monc->sub_renew_after = jiffies; /* i.e., expired */
253 monc->sub_renew_sent = 0;
255 dout("%s opening mon%d\n", __func__, monc->cur_mon);
256 ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
257 &monc->monmap->mon_inst[monc->cur_mon].addr);
265 ceph_con_keepalive(&monc->con);
266 if (ceph_msgr2(monc->client)) {
267 monc->pending_auth = 1;
272 ret = ceph_auth_build_hello(monc->auth,
273 monc->m_auth->front.iov_base,
274 monc->m_auth->front_alloc_len);
276 __send_prepared_auth_request(monc, ret);
279 static void reopen_session(struct ceph_mon_client *monc)
281 if (!monc->hunting)
283 monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr));
285 __close_session(monc);
286 __open_session(monc);
289 void ceph_monc_reopen_session(struct ceph_mon_client *monc)
291 mutex_lock(&monc->mutex);
292 reopen_session(monc);
293 mutex_unlock(&monc->mutex);
296 static void un_backoff(struct ceph_mon_client *monc)
298 monc->hunt_mult /= 2; /* reduce by 50% */
299 if (monc->hunt_mult < 1)
300 monc->hunt_mult = 1;
301 dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult);
307 static void __schedule_delayed(struct ceph_mon_client *monc)
311 if (monc->hunting)
312 delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
317 mod_delayed_work(system_wq, &monc->delayed_work,
330 * monc->subs.
332 static void __send_subscribe(struct ceph_mon_client *monc)
334 struct ceph_msg *msg = monc->m_subscribe;
340 dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
342 BUG_ON(monc->cur_mon < 0);
344 if (!monc->sub_renew_sent)
345 monc->sub_renew_sent = jiffies | 1; /* never 0 */
349 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
350 if (monc->subs[i].want)
355 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
359 if (!monc->subs[i].want)
364 monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
365 len += sprintf(buf + len, ".%d", monc->fs_cluster_id);
368 le64_to_cpu(monc->subs[i].item.start),
369 monc->subs[i].item.flags);
371 memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
372 p += sizeof(monc->subs[i].item);
379 ceph_con_send(&monc->con, ceph_msg_get(msg));
382 static void handle_subscribe_ack(struct ceph_mon_client *monc,
392 mutex_lock(&monc->mutex);
393 if (monc->sub_renew_sent) {
398 monc->sub_renew_after = monc->sub_renew_sent +
401 monc->sub_renew_sent, seconds, monc->sub_renew_after);
402 monc->sub_renew_sent = 0;
405 monc->sub_renew_sent, monc->sub_renew_after);
407 mutex_unlock(&monc->mutex);
420 static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
429 if (monc->subs[sub].want &&
430 monc->subs[sub].item.start == start &&
431 monc->subs[sub].item.flags == flags)
434 monc->subs[sub].item.start = start;
435 monc->subs[sub].item.flags = flags;
436 monc->subs[sub].want = true;
441 bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
446 mutex_lock(&monc->mutex);
447 need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
448 mutex_unlock(&monc->mutex);
459 static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
464 if (monc->subs[sub].want) {
465 if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
466 monc->subs[sub].want = false;
468 monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
471 monc->subs[sub].have = epoch;
474 void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
476 mutex_lock(&monc->mutex);
477 __ceph_monc_got_map(monc, sub, epoch);
478 mutex_unlock(&monc->mutex);
482 void ceph_monc_renew_subs(struct ceph_mon_client *monc)
484 mutex_lock(&monc->mutex);
485 __send_subscribe(monc);
486 mutex_unlock(&monc->mutex);
496 int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
502 mutex_lock(&monc->mutex);
503 while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
504 mutex_unlock(&monc->mutex);
509 ret = wait_event_interruptible_timeout(monc->client->auth_wq,
510 monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
515 mutex_lock(&monc->mutex);
518 mutex_unlock(&monc->mutex);
527 int ceph_monc_open_session(struct ceph_mon_client *monc)
529 mutex_lock(&monc->mutex);
530 __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
531 __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
532 __open_session(monc);
533 __schedule_delayed(monc);
534 mutex_unlock(&monc->mutex);
539 static void ceph_monc_handle_map(struct ceph_mon_client *monc,
542 struct ceph_client *client = monc->client;
546 mutex_lock(&monc->mutex);
565 kfree(monc->monmap);
566 monc->monmap = monmap;
568 __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
572 mutex_unlock(&monc->mutex);
610 alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
618 req->monc = monc;
629 struct ceph_mon_client *monc = req->monc;
634 req->tid = ++monc->last_tid;
635 insert_generic_request(&monc->generic_request_tree, req);
638 static void send_generic_request(struct ceph_mon_client *monc,
645 ceph_con_send(&monc->con, ceph_msg_get(req->request));
650 struct ceph_mon_client *monc = req->monc;
653 erase_generic_request(&monc->generic_request_tree, req);
676 struct ceph_mon_client *monc = req->monc;
681 mutex_lock(&monc->mutex);
682 lookup_req = lookup_generic_request(&monc->generic_request_tree,
689 mutex_unlock(&monc->mutex);
710 struct ceph_mon_client *monc = con->private;
715 mutex_lock(&monc->mutex);
716 req = lookup_generic_request(&monc->generic_request_tree, tid);
731 mutex_unlock(&monc->mutex);
738 static void handle_statfs_reply(struct ceph_mon_client *monc,
750 mutex_lock(&monc->mutex);
751 req = lookup_generic_request(&monc->generic_request_tree, tid);
753 mutex_unlock(&monc->mutex);
760 mutex_unlock(&monc->mutex);
773 int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
780 req = alloc_generic_request(monc, GFP_NOFS);
796 mutex_lock(&monc->mutex);
803 h->fsid = monc->monmap->fsid;
806 send_generic_request(monc, req);
807 mutex_unlock(&monc->mutex);
816 static void handle_get_version_reply(struct ceph_mon_client *monc,
832 mutex_lock(&monc->mutex);
833 req = lookup_generic_request(&monc->generic_request_tree, handle);
835 mutex_unlock(&monc->mutex);
842 mutex_unlock(&monc->mutex);
853 __ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
858 req = alloc_generic_request(monc, GFP_NOIO);
876 mutex_lock(&monc->mutex);
886 send_generic_request(monc, req);
887 mutex_unlock(&monc->mutex);
901 int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
907 req = __ceph_monc_get_version(monc, what, NULL, 0);
925 int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
930 req = __ceph_monc_get_version(monc, what, cb, private_data);
939 static void handle_command_ack(struct ceph_mon_client *monc,
953 mutex_lock(&monc->mutex);
954 req = lookup_generic_request(&monc->generic_request_tree, tid);
956 mutex_unlock(&monc->mutex);
962 mutex_unlock(&monc->mutex);
973 int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt,
981 req = alloc_generic_request(monc, GFP_NOIO);
994 mutex_lock(&monc->mutex);
1000 h->fsid = monc->monmap->fsid;
1004 send_generic_request(monc, req);
1005 mutex_unlock(&monc->mutex);
1014 int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...)
1020 ret = do_mon_command_vargs(monc, fmt, ap);
1025 int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
1030 ret = do_mon_command(monc,
1042 ret = do_mon_command(monc,
1058 return ceph_wait_for_latest_osdmap(monc->client, 0);
1065 static void __resend_generic_request(struct ceph_mon_client *monc)
1070 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
1074 ceph_con_send(&monc->con, ceph_msg_get(req->request));
1085 struct ceph_mon_client *monc =
1088 dout("monc delayed_work\n");
1089 mutex_lock(&monc->mutex);
1090 if (monc->hunting) {
1092 reopen_session(monc);
1094 int is_auth = ceph_auth_is_authenticated(monc->auth);
1095 if (ceph_con_keepalive_expired(&monc->con,
1097 dout("monc keepalive timeout\n");
1099 reopen_session(monc);
1102 if (!monc->hunting) {
1103 ceph_con_keepalive(&monc->con);
1104 __validate_auth(monc);
1105 un_backoff(monc);
1109 !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
1113 __func__, now, monc->sub_renew_after);
1114 if (time_after_eq(now, monc->sub_renew_after))
1115 __send_subscribe(monc);
1118 __schedule_delayed(monc);
1119 mutex_unlock(&monc->mutex);
1126 static int build_initial_monmap(struct ceph_mon_client *monc)
1128 __le32 my_type = ceph_msgr2(monc->client) ?
1130 struct ceph_options *opt = monc->client->options;
1135 monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon),
1137 if (!monc->monmap)
1139 monc->monmap->num_mon = num_mon;
1142 struct ceph_entity_inst *inst = &monc->monmap->mon_inst[i];
1154 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
1159 memset(monc, 0, sizeof(*monc));
1160 monc->client = cl;
1161 mutex_init(&monc->mutex);
1163 err = build_initial_monmap(monc);
1169 monc->auth = ceph_auth_init(cl->options->name, cl->options->key,
1171 if (IS_ERR(monc->auth)) {
1172 err = PTR_ERR(monc->auth);
1175 monc->auth->want_keys =
1181 monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
1184 if (!monc->m_subscribe_ack)
1187 monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
1189 if (!monc->m_subscribe)
1192 monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
1194 if (!monc->m_auth_reply)
1197 monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
1198 monc->pending_auth = 0;
1199 if (!monc->m_auth)
1202 ceph_con_init(&monc->con, monc, &mon_con_ops,
1203 &monc->client->msgr);
1205 monc->cur_mon = -1;
1206 monc->had_a_connection = false;
1207 monc->hunt_mult = 1;
1209 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
1210 monc->generic_request_tree = RB_ROOT;
1211 monc->last_tid = 0;
1213 monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;
1218 ceph_msg_put(monc->m_auth_reply);
1220 ceph_msg_put(monc->m_subscribe);
1222 ceph_msg_put(monc->m_subscribe_ack);
1224 ceph_auth_destroy(monc->auth);
1226 kfree(monc->monmap);
1232 void ceph_monc_stop(struct ceph_mon_client *monc)
1235 cancel_delayed_work_sync(&monc->delayed_work);
1237 mutex_lock(&monc->mutex);
1238 __close_session(monc);
1239 monc->cur_mon = -1;
1240 mutex_unlock(&monc->mutex);
1250 ceph_auth_destroy(monc->auth);
1252 WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));
1254 ceph_msg_put(monc->m_auth);
1255 ceph_msg_put(monc->m_auth_reply);
1256 ceph_msg_put(monc->m_subscribe);
1257 ceph_msg_put(monc->m_subscribe_ack);
1259 kfree(monc->monmap);
1263 static void finish_hunting(struct ceph_mon_client *monc)
1265 if (monc->hunting) {
1266 dout("%s found mon%d\n", __func__, monc->cur_mon);
1267 monc->hunting = false;
1268 monc->had_a_connection = true;
1269 un_backoff(monc);
1270 __schedule_delayed(monc);
1274 static void finish_auth(struct ceph_mon_client *monc, int auth_err,
1280 monc->pending_auth = 0;
1282 monc->client->auth_err = auth_err;
1283 wake_up_all(&monc->client->auth_wq);
1287 if (!was_authed && ceph_auth_is_authenticated(monc->auth)) {
1289 __func__, monc->auth->global_id);
1291 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
1292 monc->client->msgr.inst.name.num =
1293 cpu_to_le64(monc->auth->global_id);
1295 __send_subscribe(monc);
1296 __resend_generic_request(monc);
1298 pr_info("mon%d %s session established\n", monc->cur_mon,
1299 ceph_pr_addr(&monc->con.peer_addr));
1303 static void handle_auth_reply(struct ceph_mon_client *monc,
1309 mutex_lock(&monc->mutex);
1310 was_authed = ceph_auth_is_authenticated(monc->auth);
1311 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
1313 monc->m_auth->front.iov_base,
1314 monc->m_auth->front_alloc_len);
1316 __send_prepared_auth_request(monc, ret);
1318 finish_auth(monc, ret, was_authed);
1319 finish_hunting(monc);
1321 mutex_unlock(&monc->mutex);
1324 static int __validate_auth(struct ceph_mon_client *monc)
1328 if (monc->pending_auth)
1331 ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
1332 monc->m_auth->front_alloc_len);
1335 __send_prepared_auth_request(monc, ret);
1339 int ceph_monc_validate_auth(struct ceph_mon_client *monc)
1343 mutex_lock(&monc->mutex);
1344 ret = __validate_auth(monc);
1345 mutex_unlock(&monc->mutex);
1354 struct ceph_mon_client *monc = con->private;
1357 mutex_lock(&monc->mutex);
1358 ret = ceph_auth_get_request(monc->auth, buf, *buf_len);
1359 mutex_unlock(&monc->mutex);
1374 struct ceph_mon_client *monc = con->private;
1377 mutex_lock(&monc->mutex);
1378 ret = ceph_auth_handle_reply_more(monc->auth, reply, reply_len,
1380 mutex_unlock(&monc->mutex);
1395 struct ceph_mon_client *monc = con->private;
1399 mutex_lock(&monc->mutex);
1400 WARN_ON(!monc->hunting);
1401 was_authed = ceph_auth_is_authenticated(monc->auth);
1402 ret = ceph_auth_handle_reply_done(monc->auth, global_id,
1406 finish_auth(monc, ret, was_authed);
1408 finish_hunting(monc);
1409 mutex_unlock(&monc->mutex);
1418 struct ceph_mon_client *monc = con->private;
1421 mutex_lock(&monc->mutex);
1422 WARN_ON(!monc->hunting);
1423 was_authed = ceph_auth_is_authenticated(monc->auth);
1424 ceph_auth_handle_bad_method(monc->auth, used_proto, result,
1427 finish_auth(monc, -EACCES, was_authed);
1428 mutex_unlock(&monc->mutex);
1437 struct ceph_mon_client *monc = con->private;
1442 handle_auth_reply(monc, msg);
1446 handle_subscribe_ack(monc, msg);
1450 handle_statfs_reply(monc, msg);
1454 handle_get_version_reply(monc, msg);
1458 handle_command_ack(monc, msg);
1462 ceph_monc_handle_map(monc, msg);
1466 ceph_osdc_handle_map(&monc->client->osdc, msg);
1471 if (monc->client->extra_mon_dispatch &&
1472 monc->client->extra_mon_dispatch(monc->client, msg) == 0)
1488 struct ceph_mon_client *monc = con->private;
1497 m = ceph_msg_get(monc->m_subscribe_ack);
1503 m = ceph_msg_get(monc->m_auth_reply);
1546 struct ceph_mon_client *monc = con->private;
1548 mutex_lock(&monc->mutex);
1549 dout("%s mon%d\n", __func__, monc->cur_mon);
1550 if (monc->cur_mon >= 0) {
1551 if (!monc->hunting) {
1553 reopen_session(monc);
1554 __schedule_delayed(monc);
1559 mutex_unlock(&monc->mutex);