diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index e791b8e4635391ecd1b628d7edb8b93c9c2f72f9..f88eacb111d4151dc5491797fe0e03faaadc26a9 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -32,6 +32,7 @@ struct ceph_osd {
 	struct rb_node o_node;
 	struct ceph_connection o_con;
 	struct list_head o_requests;
+	struct list_head o_linger_requests;
 	struct list_head o_osd_lru;
 	struct ceph_authorizer *o_authorizer;
 	void *o_authorizer_buf, *o_authorizer_reply_buf;
@@ -47,6 +48,8 @@ struct ceph_osd_request {
 	struct rb_node  r_node;
 	struct list_head r_req_lru_item;
 	struct list_head r_osd_item;
+	struct list_head r_linger_item;
+	struct list_head r_linger_osd;
 	struct ceph_osd *r_osd;
 	struct ceph_pg   r_pgid;
 	int              r_pg_osds[CEPH_PG_MAX_SIZE];
@@ -59,6 +62,7 @@ struct ceph_osd_request {
 	int               r_flags;     /* any additional flags for the osd */
 	u32               r_sent;      /* >0 if r_request is sending/sent */
 	int               r_got_reply;
+	int		  r_linger;
 
 	struct ceph_osd_client *r_osdc;
 	struct kref       r_kref;
@@ -89,6 +93,26 @@ struct ceph_osd_request {
 	struct ceph_pagelist *r_trail;	      /* trailing part of the data */
 };
 
+struct ceph_osd_event {
+	u64 cookie;
+	int one_shot;
+	struct ceph_osd_client *osdc;
+	void (*cb)(u64, u64, u8, void *);
+	void *data;
+	struct rb_node node;
+	struct list_head osd_node;
+	struct kref kref;
+	struct completion completion;
+};
+
+struct ceph_osd_event_work {
+	struct work_struct work;
+	struct ceph_osd_event *event;
+        u64 ver;
+        u64 notify_id;
+        u8 opcode;
+};
+
 struct ceph_osd_client {
 	struct ceph_client     *client;
 
@@ -106,6 +130,7 @@ struct ceph_osd_client {
 	struct list_head       req_lru;	      /* in-flight lru */
 	struct list_head       req_unsent;    /* unsent/need-resend queue */
 	struct list_head       req_notarget;  /* map to no osd */
+	struct list_head       req_linger;    /* lingering requests */
 	int                    num_requests;
 	struct delayed_work    timeout_work;
 	struct delayed_work    osds_timeout_work;
@@ -117,6 +142,12 @@ struct ceph_osd_client {
 
 	struct ceph_msgpool	msgpool_op;
 	struct ceph_msgpool	msgpool_op_reply;
+
+	spinlock_t		event_lock;
+	struct rb_root		event_tree;
+	u64			event_count;
+
+	struct workqueue_struct	*notify_wq;
 };
 
 struct ceph_osd_req_op {
@@ -151,6 +182,13 @@ struct ceph_osd_req_op {
 	        struct {
 		        u64 snapid;
 	        } snap;
+		struct {
+			u64 cookie;
+			u64 ver;
+			__u8 flag;
+			u32 prot_ver;
+			u32 timeout;
+		} watch;
 	};
 	u32 payload_len;
 };
@@ -199,6 +237,11 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
 				      bool use_mempool, int num_reply,
 				      int page_align);
 
+extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
+					 struct ceph_osd_request *req);
+extern void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
+						struct ceph_osd_request *req);
+
 static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
 {
 	kref_get(&req->r_kref);
@@ -234,5 +277,14 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
 				struct page **pages, int nr_pages,
 				int flags, int do_sync, bool nofail);
 
+/* watch/notify events */
+extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
+				  void (*event_cb)(u64, u64, u8, void *),
+				  int one_shot, void *data,
+				  struct ceph_osd_event **pevent);
+extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
+extern int ceph_osdc_wait_event(struct ceph_osd_event *event,
+				unsigned long timeout);
+extern void ceph_osdc_put_event(struct ceph_osd_event *event);
 #endif
 
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index f3e4a13fea0c8a5337ce5383951fb389ea24144a..95f96ab94bba12b57052705d385be749d6183c53 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -62,6 +62,7 @@ const char *ceph_msg_type_name(int type)
 	case CEPH_MSG_OSD_MAP: return "osd_map";
 	case CEPH_MSG_OSD_OP: return "osd_op";
 	case CEPH_MSG_OSD_OPREPLY: return "osd_opreply";
+	case CEPH_MSG_WATCH_NOTIFY: return "watch_notify";
 	default: return "unknown";
 	}
 }
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index b85ed5a5503dde0640c3eb17003c5e736ffef7c5..02212ed50852eaa41b5d0c75c7f121b77f074ce3 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -25,6 +25,12 @@ static const struct ceph_connection_operations osd_con_ops;
 
 static void send_queued(struct ceph_osd_client *osdc);
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
+static void __register_request(struct ceph_osd_client *osdc,
+			       struct ceph_osd_request *req);
+static void __unregister_linger_request(struct ceph_osd_client *osdc,
+					struct ceph_osd_request *req);
+static int __send_request(struct ceph_osd_client *osdc,
+			  struct ceph_osd_request *req);
 
 static int op_needs_trail(int op)
 {
@@ -33,6 +39,7 @@ static int op_needs_trail(int op)
 	case CEPH_OSD_OP_SETXATTR:
 	case CEPH_OSD_OP_CMPXATTR:
 	case CEPH_OSD_OP_CALL:
+	case CEPH_OSD_OP_NOTIFY:
 		return 1;
 	default:
 		return 0;
@@ -208,6 +215,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	init_completion(&req->r_completion);
 	init_completion(&req->r_safe_completion);
 	INIT_LIST_HEAD(&req->r_unsafe_item);
+	INIT_LIST_HEAD(&req->r_linger_item);
+	INIT_LIST_HEAD(&req->r_linger_osd);
 	req->r_flags = flags;
 
 	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
@@ -314,6 +323,24 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
 		break;
 	case CEPH_OSD_OP_STARTSYNC:
 		break;
+	case CEPH_OSD_OP_NOTIFY:
+		{
+			__le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
+			__le32 timeout = cpu_to_le32(src->watch.timeout);
+
+			BUG_ON(!req->r_trail);
+
+			ceph_pagelist_append(req->r_trail,
+						&prot_ver, sizeof(prot_ver));
+			ceph_pagelist_append(req->r_trail,
+						&timeout, sizeof(timeout));
+		}
+	case CEPH_OSD_OP_NOTIFY_ACK:
+	case CEPH_OSD_OP_WATCH:
+		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
+		dst->watch.ver = cpu_to_le64(src->watch.ver);
+		dst->watch.flag = src->watch.flag;
+		break;
 	default:
 		pr_err("unrecognized osd opcode %d\n", dst->op);
 		WARN_ON(1);
@@ -534,7 +561,7 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
 static void __kick_osd_requests(struct ceph_osd_client *osdc,
 				struct ceph_osd *osd)
 {
-	struct ceph_osd_request *req;
+	struct ceph_osd_request *req, *nreq;
 	int err;
 
 	dout("__kick_osd_requests osd%d\n", osd->o_osd);
@@ -546,7 +573,17 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
 		list_move(&req->r_req_lru_item, &osdc->req_unsent);
 		dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
 		     osd->o_osd);
-		req->r_flags |= CEPH_OSD_FLAG_RETRY;
+		if (!req->r_linger)
+			req->r_flags |= CEPH_OSD_FLAG_RETRY;
+	}
+
+	list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
+				 r_linger_osd) {
+		__unregister_linger_request(osdc, req);
+		__register_request(osdc, req);
+		list_move(&req->r_req_lru_item, &osdc->req_unsent);
+		dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
+		     osd->o_osd);
 	}
 }
 
@@ -590,6 +627,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
 	atomic_set(&osd->o_ref, 1);
 	osd->o_osdc = osdc;
 	INIT_LIST_HEAD(&osd->o_requests);
+	INIT_LIST_HEAD(&osd->o_linger_requests);
 	INIT_LIST_HEAD(&osd->o_osd_lru);
 	osd->o_incarnation = 1;
 
@@ -679,7 +717,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 	int ret = 0;
 
 	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
-	if (list_empty(&osd->o_requests)) {
+	if (list_empty(&osd->o_requests) &&
+	    list_empty(&osd->o_linger_requests)) {
 		__remove_osd(osdc, osd);
 	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
 			  &osd->o_con.peer_addr,
@@ -752,10 +791,9 @@ static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
  * Register request, assign tid.  If this is the first request, set up
  * the timeout event.
  */
-static void register_request(struct ceph_osd_client *osdc,
-			     struct ceph_osd_request *req)
+static void __register_request(struct ceph_osd_client *osdc,
+			       struct ceph_osd_request *req)
 {
-	mutex_lock(&osdc->request_mutex);
 	req->r_tid = ++osdc->last_tid;
 	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
 	INIT_LIST_HEAD(&req->r_req_lru_item);
@@ -769,6 +807,13 @@ static void register_request(struct ceph_osd_client *osdc,
 		dout(" first request, scheduling timeout\n");
 		__schedule_osd_timeout(osdc);
 	}
+}
+
+static void register_request(struct ceph_osd_client *osdc,
+			     struct ceph_osd_request *req)
+{
+	mutex_lock(&osdc->request_mutex);
+	__register_request(osdc, req);
 	mutex_unlock(&osdc->request_mutex);
 }
 
@@ -787,9 +832,14 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
 
 		list_del_init(&req->r_osd_item);
-		if (list_empty(&req->r_osd->o_requests))
+		if (list_empty(&req->r_osd->o_requests) &&
+		    list_empty(&req->r_osd->o_linger_requests)) {
+			dout("moving osd to %p lru\n", req->r_osd);
 			__move_osd_to_lru(osdc, req->r_osd);
-		req->r_osd = NULL;
+		}
+		if (list_empty(&req->r_osd_item) &&
+		    list_empty(&req->r_linger_item))
+			req->r_osd = NULL;
 	}
 
 	ceph_osdc_put_request(req);
@@ -812,6 +862,58 @@ static void __cancel_request(struct ceph_osd_request *req)
 	}
 }
 
+static void __register_linger_request(struct ceph_osd_client *osdc,
+				    struct ceph_osd_request *req)
+{
+	dout("__register_linger_request %p\n", req);
+	list_add_tail(&req->r_linger_item, &osdc->req_linger);
+	list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
+}
+
+static void __unregister_linger_request(struct ceph_osd_client *osdc,
+					struct ceph_osd_request *req)
+{
+	dout("__unregister_linger_request %p\n", req);
+	if (req->r_osd) {
+		list_del_init(&req->r_linger_item);
+		list_del_init(&req->r_linger_osd);
+
+		if (list_empty(&req->r_osd->o_requests) &&
+		    list_empty(&req->r_osd->o_linger_requests)) {
+			dout("moving osd to %p lru\n", req->r_osd);
+			__move_osd_to_lru(osdc, req->r_osd);
+		}
+		req->r_osd = NULL;
+	}
+}
+
+void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
+					 struct ceph_osd_request *req)
+{
+	mutex_lock(&osdc->request_mutex);
+	if (req->r_linger) {
+		__unregister_linger_request(osdc, req);
+		ceph_osdc_put_request(req);
+	}
+	mutex_unlock(&osdc->request_mutex);
+}
+EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
+
+void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
+				  struct ceph_osd_request *req)
+{
+	if (!req->r_linger) {
+		dout("set_request_linger %p\n", req);
+		req->r_linger = 1;
+		/*
+		 * caller is now responsible for calling
+		 * unregister_linger_request
+		 */
+		ceph_osdc_get_request(req);
+	}
+}
+EXPORT_SYMBOL(ceph_osdc_set_request_linger);
+
 /*
  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
  * (as needed), and set the request r_osd appropriately.  If there is
@@ -958,7 +1060,6 @@ static void handle_timeout(struct work_struct *work)
 		osdc->client->options->osd_keepalive_timeout * HZ;
 	unsigned long last_stamp = 0;
 	struct list_head slow_osds;
-
 	dout("timeout\n");
 	down_read(&osdc->map_sem);
 
@@ -1060,7 +1161,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 	    numops * sizeof(struct ceph_osd_op))
 		goto bad;
 	dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
-
 	/* lookup */
 	mutex_lock(&osdc->request_mutex);
 	req = __lookup_request(osdc, tid);
@@ -1104,6 +1204,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 
 	dout("handle_reply tid %llu flags %d\n", tid, flags);
 
+	if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
+		__register_linger_request(osdc, req);
+
 	/* either this is a read, or we got the safe response */
 	if (result < 0 ||
 	    (flags & CEPH_OSD_FLAG_ONDISK) ||
@@ -1124,6 +1227,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 	}
 
 done:
+	dout("req=%p req->r_linger=%d\n", req, req->r_linger);
 	ceph_osdc_put_request(req);
 	return;
 
@@ -1159,7 +1263,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
  */
 static void kick_requests(struct ceph_osd_client *osdc)
 {
-	struct ceph_osd_request *req;
+	struct ceph_osd_request *req, *nreq;
 	struct rb_node *p;
 	int needmap = 0;
 	int err;
@@ -1177,8 +1281,30 @@ static void kick_requests(struct ceph_osd_client *osdc)
 		} else if (err > 0) {
 			dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
 			     req->r_osd ? req->r_osd->o_osd : -1);
-			req->r_flags |= CEPH_OSD_FLAG_RETRY;
+			if (!req->r_linger)
+				req->r_flags |= CEPH_OSD_FLAG_RETRY;
+		}
+	}
+
+	list_for_each_entry_safe(req, nreq, &osdc->req_linger,
+				 r_linger_item) {
+		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
+
+		err = __map_request(osdc, req);
+		if (err == 0)
+			continue;  /* no change and no osd was specified */
+		if (err < 0)
+			continue;  /* hrm! */
+		if (req->r_osd == NULL) {
+			dout("tid %llu maps to no valid osd\n", req->r_tid);
+			needmap++;  /* request a newer map */
+			continue;
 		}
+
+		dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
+		     req->r_osd ? req->r_osd->o_osd : -1);
+		__unregister_linger_request(osdc, req);
+		__register_request(osdc, req);
 	}
 	mutex_unlock(&osdc->request_mutex);
 
@@ -1301,6 +1427,223 @@ bad:
 	return;
 }
 
+/*
+ * watch/notify callback event infrastructure
+ *
+ * These callbacks are used both for watch and notify operations.
+ */
+static void __release_event(struct kref *kref)
+{
+	struct ceph_osd_event *event =
+		container_of(kref, struct ceph_osd_event, kref);
+
+	dout("__release_event %p\n", event);
+	kfree(event);
+}
+
+static void get_event(struct ceph_osd_event *event)
+{
+	kref_get(&event->kref);
+}
+
+void ceph_osdc_put_event(struct ceph_osd_event *event)
+{
+	kref_put(&event->kref, __release_event);
+}
+EXPORT_SYMBOL(ceph_osdc_put_event);
+
+static void __insert_event(struct ceph_osd_client *osdc,
+			     struct ceph_osd_event *new)
+{
+	struct rb_node **p = &osdc->event_tree.rb_node;
+	struct rb_node *parent = NULL;
+	struct ceph_osd_event *event = NULL;
+
+	while (*p) {
+		parent = *p;
+		event = rb_entry(parent, struct ceph_osd_event, node);
+		if (new->cookie < event->cookie)
+			p = &(*p)->rb_left;
+		else if (new->cookie > event->cookie)
+			p = &(*p)->rb_right;
+		else
+			BUG();
+	}
+
+	rb_link_node(&new->node, parent, p);
+	rb_insert_color(&new->node, &osdc->event_tree);
+}
+
+static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
+					        u64 cookie)
+{
+	struct rb_node **p = &osdc->event_tree.rb_node;
+	struct rb_node *parent = NULL;
+	struct ceph_osd_event *event = NULL;
+
+	while (*p) {
+		parent = *p;
+		event = rb_entry(parent, struct ceph_osd_event, node);
+		if (cookie < event->cookie)
+			p = &(*p)->rb_left;
+		else if (cookie > event->cookie)
+			p = &(*p)->rb_right;
+		else
+			return event;
+	}
+	return NULL;
+}
+
+static void __remove_event(struct ceph_osd_event *event)
+{
+	struct ceph_osd_client *osdc = event->osdc;
+
+	if (!RB_EMPTY_NODE(&event->node)) {
+		dout("__remove_event removed %p\n", event);
+		rb_erase(&event->node, &osdc->event_tree);
+		ceph_osdc_put_event(event);
+	} else {
+		dout("__remove_event didn't remove %p\n", event);
+	}
+}
+
+int ceph_osdc_create_event(struct ceph_osd_client *osdc,
+			   void (*event_cb)(u64, u64, u8, void *),
+			   int one_shot, void *data,
+			   struct ceph_osd_event **pevent)
+{
+	struct ceph_osd_event *event;
+
+	event = kmalloc(sizeof(*event), GFP_NOIO);
+	if (!event)
+		return -ENOMEM;
+
+	dout("create_event %p\n", event);
+	event->cb = event_cb;
+	event->one_shot = one_shot;
+	event->data = data;
+	event->osdc = osdc;
+	INIT_LIST_HEAD(&event->osd_node);
+	kref_init(&event->kref);   /* one ref for us */
+	kref_get(&event->kref);    /* one ref for the caller */
+	init_completion(&event->completion);
+
+	spin_lock(&osdc->event_lock);
+	event->cookie = ++osdc->event_count;
+	__insert_event(osdc, event);
+	spin_unlock(&osdc->event_lock);
+
+	*pevent = event;
+	return 0;
+}
+EXPORT_SYMBOL(ceph_osdc_create_event);
+
+void ceph_osdc_cancel_event(struct ceph_osd_event *event)
+{
+	struct ceph_osd_client *osdc = event->osdc;
+
+	dout("cancel_event %p\n", event);
+	spin_lock(&osdc->event_lock);
+	__remove_event(event);
+	spin_unlock(&osdc->event_lock);
+	ceph_osdc_put_event(event); /* caller's */
+}
+EXPORT_SYMBOL(ceph_osdc_cancel_event);
+
+
+static void do_event_work(struct work_struct *work)
+{
+	struct ceph_osd_event_work *event_work =
+		container_of(work, struct ceph_osd_event_work, work);
+	struct ceph_osd_event *event = event_work->event;
+	u64 ver = event_work->ver;
+	u64 notify_id = event_work->notify_id;
+	u8 opcode = event_work->opcode;
+
+	dout("do_event_work completing %p\n", event);
+	event->cb(ver, notify_id, opcode, event->data);
+	complete(&event->completion);
+	dout("do_event_work completed %p\n", event);
+	ceph_osdc_put_event(event);
+	kfree(event_work);
+}
+
+
+/*
+ * Process osd watch notifications
+ */
+void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+{
+	void *p, *end;
+	u8 proto_ver;
+	u64 cookie, ver, notify_id;
+	u8 opcode;
+	struct ceph_osd_event *event;
+	struct ceph_osd_event_work *event_work;
+
+	p = msg->front.iov_base;
+	end = p + msg->front.iov_len;
+
+	ceph_decode_8_safe(&p, end, proto_ver, bad);
+	ceph_decode_8_safe(&p, end, opcode, bad);
+	ceph_decode_64_safe(&p, end, cookie, bad);
+	ceph_decode_64_safe(&p, end, ver, bad);
+	ceph_decode_64_safe(&p, end, notify_id, bad);
+
+	spin_lock(&osdc->event_lock);
+	event = __find_event(osdc, cookie);
+	if (event) {
+		get_event(event);
+		if (event->one_shot)
+			__remove_event(event);
+	}
+	spin_unlock(&osdc->event_lock);
+	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
+	     cookie, ver, event);
+	if (event) {
+		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
+		INIT_WORK(&event_work->work, do_event_work);
+		if (!event_work) {
+			dout("ERROR: could not allocate event_work\n");
+			goto done_err;
+		}
+		event_work->event = event;
+		event_work->ver = ver;
+		event_work->notify_id = notify_id;
+		event_work->opcode = opcode;
+		if (!queue_work(osdc->notify_wq, &event_work->work)) {
+			dout("WARNING: failed to queue notify event work\n");
+			goto done_err;
+		}
+	}
+
+	return;
+
+done_err:
+	complete(&event->completion);
+	ceph_osdc_put_event(event);
+	return;
+
+bad:
+	pr_err("osdc handle_watch_notify corrupt msg\n");
+	return;
+}
+
+int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
+{
+	int err;
+
+	dout("wait_event %p\n", event);
+	err = wait_for_completion_interruptible_timeout(&event->completion,
+							timeout * HZ);
+	ceph_osdc_put_event(event);
+	if (err > 0)
+		err = 0;
+	dout("wait_event %p returns %d\n", event, err);
+	return err;
+}
+EXPORT_SYMBOL(ceph_osdc_wait_event);
+
 /*
  * Register request, send initial attempt.
  */
@@ -1430,9 +1773,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
 	INIT_LIST_HEAD(&osdc->req_lru);
 	INIT_LIST_HEAD(&osdc->req_unsent);
 	INIT_LIST_HEAD(&osdc->req_notarget);
+	INIT_LIST_HEAD(&osdc->req_linger);
 	osdc->num_requests = 0;
 	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
 	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
+	spin_lock_init(&osdc->event_lock);
+	osdc->event_tree = RB_ROOT;
+	osdc->event_count = 0;
 
 	schedule_delayed_work(&osdc->osds_timeout_work,
 	   round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
@@ -1452,6 +1799,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
 				"osd_op_reply");
 	if (err < 0)
 		goto out_msgpool;
+
+	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
+	if (IS_ERR(osdc->notify_wq)) {
+		err = PTR_ERR(osdc->notify_wq);
+		osdc->notify_wq = NULL;
+		goto out_msgpool;
+	}
 	return 0;
 
 out_msgpool:
@@ -1465,6 +1819,8 @@ EXPORT_SYMBOL(ceph_osdc_init);
 
 void ceph_osdc_stop(struct ceph_osd_client *osdc)
 {
+	flush_workqueue(osdc->notify_wq);
+	destroy_workqueue(osdc->notify_wq);
 	cancel_delayed_work_sync(&osdc->timeout_work);
 	cancel_delayed_work_sync(&osdc->osds_timeout_work);
 	if (osdc->osdmap) {
@@ -1472,6 +1828,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
 		osdc->osdmap = NULL;
 	}
 	remove_old_osds(osdc, 1);
+	WARN_ON(!RB_EMPTY_ROOT(&osdc->osds));
 	mempool_destroy(osdc->req_mempool);
 	ceph_msgpool_destroy(&osdc->msgpool_op);
 	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
@@ -1580,6 +1937,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 	case CEPH_MSG_OSD_OPREPLY:
 		handle_reply(osdc, msg, con);
 		break;
+	case CEPH_MSG_WATCH_NOTIFY:
+		handle_watch_notify(osdc, msg);
+		break;
 
 	default:
 		pr_err("received unknown message type %d %s\n", type,
@@ -1673,6 +2033,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
 
 	switch (type) {
 	case CEPH_MSG_OSD_MAP:
+	case CEPH_MSG_WATCH_NOTIFY:
 		return ceph_msg_new(type, front, GFP_NOFS);
 	case CEPH_MSG_OSD_OPREPLY:
 		return get_reply(con, hdr, skip);