ceph: put unused osd connections on lru
Instead of removing osd connection immediately when the
requests list is empty, put the osd connection on an lru.
Only if that osd has not been used for more than a specified
time, will it be removed.
Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 35c8afe..7f8a26f 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -389,6 +389,7 @@
atomic_set(&osd->o_ref, 1);
osd->o_osdc = osdc;
INIT_LIST_HEAD(&osd->o_requests);
+ INIT_LIST_HEAD(&osd->o_osd_lru);
osd->o_incarnation = 1;
ceph_con_init(osdc->client->msgr, &osd->o_con);
@@ -422,25 +423,56 @@
/*
* remove an osd from our map
*/
-static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
- dout("remove_osd %p\n", osd);
+ dout("__remove_osd %p\n", osd);
BUG_ON(!list_empty(&osd->o_requests));
rb_erase(&osd->o_node, &osdc->osds);
+ list_del_init(&osd->o_osd_lru);
ceph_con_close(&osd->o_con);
put_osd(osd);
}
+static void __move_osd_to_lru(struct ceph_osd_client *osdc,
+ struct ceph_osd *osd)
+{
+ dout("__move_osd_to_lru %p\n", osd);
+ BUG_ON(!list_empty(&osd->o_osd_lru));
+ list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
+ osd->lru_ttl = jiffies + osdc->client->mount_args->osd_idle_ttl * HZ;
+}
+
+static void __remove_osd_from_lru(struct ceph_osd *osd)
+{
+ dout("__remove_osd_from_lru %p\n", osd);
+ if (!list_empty(&osd->o_osd_lru))
+ list_del_init(&osd->o_osd_lru);
+}
+
+static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
+{
+ struct ceph_osd *osd, *nosd;
+
+ dout("__remove_old_osds %p\n", osdc);
+ mutex_lock(&osdc->request_mutex);
+ list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
+ if (!remove_all && time_before(jiffies, osd->lru_ttl))
+ break;
+ __remove_osd(osdc, osd);
+ }
+ mutex_unlock(&osdc->request_mutex);
+}
+
/*
* reset osd connect
*/
-static int reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
int ret = 0;
- dout("reset_osd %p osd%d\n", osd, osd->o_osd);
+ dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
if (list_empty(&osd->o_requests)) {
- remove_osd(osdc, osd);
+ __remove_osd(osdc, osd);
} else {
ceph_con_close(&osd->o_con);
ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
@@ -533,7 +565,7 @@
list_del_init(&req->r_osd_item);
if (list_empty(&req->r_osd->o_requests))
- remove_osd(osdc, req->r_osd);
+ __move_osd_to_lru(osdc, req->r_osd);
req->r_osd = NULL;
}
@@ -611,7 +643,7 @@
if (list_empty(&req->r_osd->o_requests)) {
/* try to re-use r_osd if possible */
newosd = get_osd(req->r_osd);
- remove_osd(osdc, newosd);
+ __remove_osd(osdc, newosd);
}
req->r_osd = NULL;
}
@@ -636,8 +668,10 @@
ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
}
- if (req->r_osd)
+ if (req->r_osd) {
+ __remove_osd_from_lru(req->r_osd);
list_add(&req->r_osd_item, &req->r_osd->o_requests);
+ }
err = 1; /* osd changed */
out:
@@ -744,6 +778,23 @@
up_read(&osdc->map_sem);
}
+static void handle_osds_timeout(struct work_struct *work)
+{
+ struct ceph_osd_client *osdc =
+ container_of(work, struct ceph_osd_client,
+ osds_timeout_work.work);
+ unsigned long delay =
+ osdc->client->mount_args->osd_idle_ttl * HZ >> 2;
+
+ dout("osds timeout\n");
+ down_read(&osdc->map_sem);
+ remove_old_osds(osdc, 0);
+ up_read(&osdc->map_sem);
+
+ schedule_delayed_work(&osdc->osds_timeout_work,
+ round_jiffies_relative(delay));
+}
+
/*
* handle osd op reply. either call the callback if it is specified,
* or do the completion to wake up the waiting thread.
@@ -881,7 +932,7 @@
ceph_osd_addr(osdc->osdmap,
osd->o_osd),
sizeof(struct ceph_entity_addr)) != 0)
- reset_osd(osdc, osd);
+ __reset_osd(osdc, osd);
}
}
@@ -1195,9 +1246,14 @@
osdc->timeout_tid = 0;
osdc->last_tid = 0;
osdc->osds = RB_ROOT;
+ INIT_LIST_HEAD(&osdc->osd_lru);
osdc->requests = RB_ROOT;
osdc->num_requests = 0;
INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
+ INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
+
+ schedule_delayed_work(&osdc->osds_timeout_work,
+ round_jiffies_relative(osdc->client->mount_args->osd_idle_ttl * HZ));
err = -ENOMEM;
osdc->req_mempool = mempool_create_kmalloc_pool(10,
@@ -1219,10 +1275,12 @@
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
cancel_delayed_work_sync(&osdc->timeout_work);
+ cancel_delayed_work_sync(&osdc->osds_timeout_work);
if (osdc->osdmap) {
ceph_osdmap_destroy(osdc->osdmap);
osdc->osdmap = NULL;
}
+ remove_old_osds(osdc, 1);
mempool_destroy(osdc->req_mempool);
ceph_msgpool_destroy(&osdc->msgpool_op);
}