ceph: mount non-default filesystem by name

To mount non-default filesytem, user currently needs to provide mds
namespace ID. This is inconvenience.

This patch makes user be able to mount filesystem by name. If user
wants to mount non-default filesystem. Client first subscribes to
fsmap.user. Subscribe to mdsmap.<ID> after getting ID of filesystem.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 78a3495..e555745 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -2166,6 +2166,11 @@
 	mds = __choose_mds(mdsc, req);
 	if (mds < 0 ||
 	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
+		if (mdsc->mdsmap_err) {
+			err = mdsc->mdsmap_err;
+			dout("do_request mdsmap err %d\n", err);
+			goto finish;
+		}
 		dout("do_request no mds or not active, waiting for map\n");
 		list_add(&req->r_wait, &mdsc->waiting_for_map);
 		goto out;
@@ -3683,11 +3688,86 @@
 	dout("mdsc_destroy %p done\n", mdsc);
 }
 
+void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+{
+	struct ceph_fs_client *fsc = mdsc->fsc;
+	const char *mds_namespace = fsc->mount_options->mds_namespace;
+	void *p = msg->front.iov_base;
+	void *end = p + msg->front.iov_len;
+	u32 epoch;
+	u32 map_len;
+	u32 num_fs;
+	u32 mount_fscid = (u32)-1;
+	u8 struct_v, struct_cv;
+	int err = -EINVAL;
+
+	ceph_decode_need(&p, end, sizeof(u32), bad);
+	epoch = ceph_decode_32(&p);
+
+	dout("handle_fsmap epoch %u\n", epoch);
+
+	ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
+	struct_v = ceph_decode_8(&p);
+	struct_cv = ceph_decode_8(&p);
+	map_len = ceph_decode_32(&p);
+
+	ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
+	p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
+
+	num_fs = ceph_decode_32(&p);
+	while (num_fs-- > 0) {
+		void *info_p, *info_end;
+		u32 info_len;
+		u8 info_v, info_cv;
+		u32 fscid, namelen;
+
+		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
+		info_v = ceph_decode_8(&p);
+		info_cv = ceph_decode_8(&p);
+		info_len = ceph_decode_32(&p);
+		ceph_decode_need(&p, end, info_len, bad);
+		info_p = p;
+		info_end = p + info_len;
+		p = info_end;
+
+		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
+		fscid = ceph_decode_32(&info_p);
+		namelen = ceph_decode_32(&info_p);
+		ceph_decode_need(&info_p, info_end, namelen, bad);
+
+		if (mds_namespace &&
+		    strlen(mds_namespace) == namelen &&
+		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
+			mount_fscid = fscid;
+			break;
+		}
+	}
+
+	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
+	if (mount_fscid != (u32)-1) {
+		fsc->client->monc.fs_cluster_id = mount_fscid;
+		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
+				   0, true);
+		ceph_monc_renew_subs(&fsc->client->monc);
+	} else {
+		err = -ENOENT;
+		goto err_out;
+	}
+	return;
+bad:
+	pr_err("error decoding fsmap\n");
+err_out:
+	mutex_lock(&mdsc->mutex);
+	mdsc->mdsmap_err = -ENOENT;
+	__wake_requests(mdsc, &mdsc->waiting_for_map);
+	mutex_unlock(&mdsc->mutex);
+	return;
+}
 
 /*
  * handle mds map update.
  */
-void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 {
 	u32 epoch;
 	u32 maplen;
@@ -3794,7 +3874,10 @@
 
 	switch (type) {
 	case CEPH_MSG_MDS_MAP:
-		ceph_mdsc_handle_map(mdsc, msg);
+		ceph_mdsc_handle_mdsmap(mdsc, msg);
+		break;
+	case CEPH_MSG_FS_MAP_USER:
+		ceph_mdsc_handle_fsmap(mdsc, msg);
 		break;
 	case CEPH_MSG_CLIENT_SESSION:
 		handle_session(s, msg);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 9dd2c82..3c154b8 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -293,6 +293,7 @@
 	struct completion       safe_umount_waiters;
 	wait_queue_head_t       session_close_wq;
 	struct list_head        waiting_for_map;
+	int 			mdsmap_err;
 
 	struct ceph_mds_session **sessions;    /* NULL for mds if no session */
 	atomic_t		num_sessions;
@@ -419,8 +420,10 @@
 				     struct dentry *dentry, char action,
 				     u32 seq);
 
-extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
-				 struct ceph_msg *msg);
+extern void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc,
+				    struct ceph_msg *msg);
+extern void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc,
+				   struct ceph_msg *msg);
 
 extern struct ceph_mds_session *
 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target);
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index a5b2275..7736a931 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -108,7 +108,6 @@
  * mount options
  */
 enum {
-	Opt_mds_namespace,
 	Opt_wsize,
 	Opt_rsize,
 	Opt_rasize,
@@ -121,6 +120,7 @@
 	Opt_last_int,
 	/* int args above */
 	Opt_snapdirname,
+	Opt_mds_namespace,
 	Opt_last_string,
 	/* string args above */
 	Opt_dirstat,
@@ -144,7 +144,6 @@
 };
 
 static match_table_t fsopt_tokens = {
-	{Opt_mds_namespace, "mds_namespace=%d"},
 	{Opt_wsize, "wsize=%d"},
 	{Opt_rsize, "rsize=%d"},
 	{Opt_rasize, "rasize=%d"},
@@ -156,6 +155,7 @@
 	{Opt_congestion_kb, "write_congestion_kb=%d"},
 	/* int args above */
 	{Opt_snapdirname, "snapdirname=%s"},
+	{Opt_mds_namespace, "mds_namespace=%s"},
 	/* string args above */
 	{Opt_dirstat, "dirstat"},
 	{Opt_nodirstat, "nodirstat"},
@@ -212,11 +212,14 @@
 		if (!fsopt->snapdir_name)
 			return -ENOMEM;
 		break;
-
-		/* misc */
 	case Opt_mds_namespace:
-		fsopt->mds_namespace = intval;
+		fsopt->mds_namespace = kstrndup(argstr[0].from,
+						argstr[0].to-argstr[0].from,
+						GFP_KERNEL);
+		if (!fsopt->mds_namespace)
+			return -ENOMEM;
 		break;
+		/* misc */
 	case Opt_wsize:
 		fsopt->wsize = intval;
 		break;
@@ -302,6 +305,7 @@
 {
 	dout("destroy_mount_options %p\n", args);
 	kfree(args->snapdir_name);
+	kfree(args->mds_namespace);
 	kfree(args->server_path);
 	kfree(args);
 }
@@ -333,6 +337,9 @@
 	ret = strcmp_null(fsopt1->snapdir_name, fsopt2->snapdir_name);
 	if (ret)
 		return ret;
+	ret = strcmp_null(fsopt1->mds_namespace, fsopt2->mds_namespace);
+	if (ret)
+		return ret;
 
 	ret = strcmp_null(fsopt1->server_path, fsopt2->server_path);
 	if (ret)
@@ -376,7 +383,6 @@
 	fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
 	fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
 	fsopt->congestion_kb = default_congestion_kb();
-	fsopt->mds_namespace = CEPH_FS_CLUSTER_ID_NONE;
 
 	/*
 	 * Distinguish the server list from the path in "dev_name".
@@ -469,8 +475,8 @@
 		seq_puts(m, ",noacl");
 #endif
 
-	if (fsopt->mds_namespace != CEPH_FS_CLUSTER_ID_NONE)
-		seq_printf(m, ",mds_namespace=%d", fsopt->mds_namespace);
+	if (fsopt->mds_namespace)
+		seq_printf(m, ",mds_namespace=%s", fsopt->mds_namespace);
 	if (fsopt->wsize)
 		seq_printf(m, ",wsize=%d", fsopt->wsize);
 	if (fsopt->rsize != CEPH_RSIZE_DEFAULT)
@@ -509,9 +515,11 @@
 
 	switch (type) {
 	case CEPH_MSG_MDS_MAP:
-		ceph_mdsc_handle_map(fsc->mdsc, msg);
+		ceph_mdsc_handle_mdsmap(fsc->mdsc, msg);
 		return 0;
-
+	case CEPH_MSG_FS_MAP_USER:
+		ceph_mdsc_handle_fsmap(fsc->mdsc, msg);
+		return 0;
 	default:
 		return -1;
 	}
@@ -543,8 +551,14 @@
 		goto fail;
 	}
 	fsc->client->extra_mon_dispatch = extra_mon_dispatch;
-	fsc->client->monc.fs_cluster_id = fsopt->mds_namespace;
-	ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true);
+
+	if (fsopt->mds_namespace == NULL) {
+		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
+				   0, true);
+	} else {
+		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_FSMAP,
+				   0, false);
+	}
 
 	fsc->mount_options = fsopt;
 
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 5aa3158..9e82e29 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -62,7 +62,6 @@
 	int cap_release_safety;
 	int max_readdir;       /* max readdir result (entires) */
 	int max_readdir_bytes; /* max readdir result (bytes) */
-	int mds_namespace;
 
 	/*
 	 * everything above this point can be memcmp'd; everything below
@@ -70,6 +69,7 @@
 	 */
 
 	char *snapdir_name;   /* default ".snap" */
+	char *mds_namespace;  /* default NULL */
 	char *server_path;    /* default  "/" */
 };