diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig
index bc87b9c1d27ea8e253f5a1b9b395a287ede4106e..0fcd2640c23fdda2c7fd99415b730837c591eba8 100644
--- a/fs/ceph/Kconfig
+++ b/fs/ceph/Kconfig
@@ -3,6 +3,7 @@ config CEPH_FS
 	depends on INET && EXPERIMENTAL
 	select LIBCRC32C
 	select CRYPTO_AES
+	select CRYPTO
 	help
 	  Choose Y or M here to include support for mounting the
 	  experimental Ceph distributed file system.  Ceph is an extremely
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 4cfce1ee31faaf4f2f6aab966acd5d6001753940..efbc604001c8bbfa3b96eb107529f0f03256b1a1 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -411,8 +411,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 	if (i_size < page_off + len)
 		len = i_size - page_off;
 
-	dout("writepage %p page %p index %lu on %llu~%u\n",
-	     inode, page, page->index, page_off, len);
+	dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
+	     inode, page, page->index, page_off, len, snapc);
 
 	writeback_stat = atomic_long_inc_return(&client->writeback_count);
 	if (writeback_stat >
@@ -766,7 +766,8 @@ get_more_pages:
 			/* ok */
 			if (locked_pages == 0) {
 				/* prepare async write request */
-				offset = page->index << PAGE_CACHE_SHIFT;
+				offset = (unsigned long long)page->index
+					<< PAGE_CACHE_SHIFT;
 				len = wsize;
 				req = ceph_osdc_new_request(&client->osdc,
 					    &ci->i_layout,
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index a2069b6680aed83eb0be0af7c584b4a619ae239a..73c153092f7292f20616108a3f2e61cc0630659e 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -814,7 +814,7 @@ int __ceph_caps_used(struct ceph_inode_info *ci)
 		used |= CEPH_CAP_PIN;
 	if (ci->i_rd_ref)
 		used |= CEPH_CAP_FILE_RD;
-	if (ci->i_rdcache_ref || ci->i_rdcache_gen)
+	if (ci->i_rdcache_ref || ci->vfs_inode.i_data.nrpages)
 		used |= CEPH_CAP_FILE_CACHE;
 	if (ci->i_wr_ref)
 		used |= CEPH_CAP_FILE_WR;
@@ -1195,10 +1195,14 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
  * asynchronously back to the MDS once sync writes complete and dirty
  * data is written out.
  *
+ * Unless @again is true, skip cap_snaps that were already sent to
+ * the MDS (i.e., during this session).
+ *
  * Called under i_lock.  Takes s_mutex as needed.
  */
 void __ceph_flush_snaps(struct ceph_inode_info *ci,
-			struct ceph_mds_session **psession)
+			struct ceph_mds_session **psession,
+			int again)
 		__releases(ci->vfs_inode->i_lock)
 		__acquires(ci->vfs_inode->i_lock)
 {
@@ -1227,7 +1231,7 @@ retry:
 		 * pages to be written out.
 		 */
 		if (capsnap->dirty_pages || capsnap->writing)
-			continue;
+			break;
 
 		/*
 		 * if cap writeback already occurred, we should have dropped
@@ -1240,6 +1244,13 @@ retry:
 			dout("no auth cap (migrating?), doing nothing\n");
 			goto out;
 		}
+
+		/* only flush each capsnap once */
+		if (!again && !list_empty(&capsnap->flushing_item)) {
+			dout("already flushed %p, skipping\n", capsnap);
+			continue;
+		}
+
 		mds = ci->i_auth_cap->session->s_mds;
 		mseq = ci->i_auth_cap->mseq;
 
@@ -1276,8 +1287,8 @@ retry:
 			      &session->s_cap_snaps_flushing);
 		spin_unlock(&inode->i_lock);
 
-		dout("flush_snaps %p cap_snap %p follows %lld size %llu\n",
-		     inode, capsnap, next_follows, capsnap->size);
+		dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n",
+		     inode, capsnap, capsnap->follows, capsnap->flush_tid);
 		send_cap_msg(session, ceph_vino(inode).ino, 0,
 			     CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
 			     capsnap->dirty, 0, capsnap->flush_tid, 0, mseq,
@@ -1314,7 +1325,7 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci)
 	struct inode *inode = &ci->vfs_inode;
 
 	spin_lock(&inode->i_lock);
-	__ceph_flush_snaps(ci, NULL);
+	__ceph_flush_snaps(ci, NULL, 0);
 	spin_unlock(&inode->i_lock);
 }
 
@@ -1477,7 +1488,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
 
 	/* flush snaps first time around only */
 	if (!list_empty(&ci->i_cap_snaps))
-		__ceph_flush_snaps(ci, &session);
+		__ceph_flush_snaps(ci, &session, 0);
 	goto retry_locked;
 retry:
 	spin_lock(&inode->i_lock);
@@ -1894,7 +1905,7 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc,
 		if (cap && cap->session == session) {
 			dout("kick_flushing_caps %p cap %p capsnap %p\n", inode,
 			     cap, capsnap);
-			__ceph_flush_snaps(ci, &session);
+			__ceph_flush_snaps(ci, &session, 1);
 		} else {
 			pr_err("%p auth cap %p not mds%d ???\n", inode,
 			       cap, session->s_mds);
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 6e4f43ff23ec587050eab1b0e735e8d519827c85..a1986eb52045b8184d6ab31e46b8a2497295e9ba 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -1021,11 +1021,15 @@ out_touch:
 static void ceph_dentry_release(struct dentry *dentry)
 {
 	struct ceph_dentry_info *di = ceph_dentry(dentry);
-	struct inode *parent_inode = dentry->d_parent->d_inode;
-	u64 snapid = ceph_snap(parent_inode);
+	struct inode *parent_inode = NULL;
+	u64 snapid = CEPH_NOSNAP;
 
+	if (!IS_ROOT(dentry)) {
+		parent_inode = dentry->d_parent->d_inode;
+		if (parent_inode)
+			snapid = ceph_snap(parent_inode);
+	}
 	dout("dentry_release %p parent %p\n", dentry, parent_inode);
-
 	if (parent_inode && snapid != CEPH_SNAPDIR) {
 		struct ceph_inode_info *ci = ceph_inode(parent_inode);
 
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e7cca414da03bcbd7549889a5ecb00d05ee11901..62377ec37edf05c14cf8567c0d22e19e02762253 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -845,7 +845,7 @@ static void ceph_set_dentry_offset(struct dentry *dn)
  * the caller) if we fail.
  */
 static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,
-				    bool *prehash)
+				    bool *prehash, bool set_offset)
 {
 	struct dentry *realdn;
 
@@ -877,7 +877,8 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,
 	}
 	if ((!prehash || *prehash) && d_unhashed(dn))
 		d_rehash(dn);
-	ceph_set_dentry_offset(dn);
+	if (set_offset)
+		ceph_set_dentry_offset(dn);
 out:
 	return dn;
 }
@@ -1062,7 +1063,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
 				d_delete(dn);
 				goto done;
 			}
-			dn = splice_dentry(dn, in, &have_lease);
+			dn = splice_dentry(dn, in, &have_lease, true);
 			if (IS_ERR(dn)) {
 				err = PTR_ERR(dn);
 				goto done;
@@ -1105,7 +1106,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
 			goto done;
 		}
 		dout(" linking snapped dir %p to dn %p\n", in, dn);
-		dn = splice_dentry(dn, in, NULL);
+		dn = splice_dentry(dn, in, NULL, true);
 		if (IS_ERR(dn)) {
 			err = PTR_ERR(dn);
 			goto done;
@@ -1237,7 +1238,7 @@ retry_lookup:
 				err = PTR_ERR(in);
 				goto out;
 			}
-			dn = splice_dentry(dn, in, NULL);
+			dn = splice_dentry(dn, in, NULL, false);
 			if (IS_ERR(dn))
 				dn = NULL;
 		}
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index f091b1351786368de18757d8cb262a19d1006bf1..fad95f8f2608bc4e86c0876bedd81cb5be46a4ef 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -2374,6 +2374,8 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
 						num_fcntl_locks,
 						num_flock_locks);
 		unlock_kernel();
+	} else {
+		err = ceph_pagelist_append(pagelist, &rec, reclen);
 	}
 
 out_free:
diff --git a/fs/ceph/pagelist.c b/fs/ceph/pagelist.c
index b6859f47d364ba2134121d48fa6cc720f3b6b1a4..46a368b6dce5378f3b7a486b60700c52f65a809b 100644
--- a/fs/ceph/pagelist.c
+++ b/fs/ceph/pagelist.c
@@ -5,10 +5,18 @@
 
 #include "pagelist.h"
 
+static void ceph_pagelist_unmap_tail(struct ceph_pagelist *pl)
+{
+	struct page *page = list_entry(pl->head.prev, struct page,
+				       lru);
+	kunmap(page);
+}
+
 int ceph_pagelist_release(struct ceph_pagelist *pl)
 {
 	if (pl->mapped_tail)
-		kunmap(pl->mapped_tail);
+		ceph_pagelist_unmap_tail(pl);
+
 	while (!list_empty(&pl->head)) {
 		struct page *page = list_first_entry(&pl->head, struct page,
 						     lru);
@@ -26,7 +34,7 @@ static int ceph_pagelist_addpage(struct ceph_pagelist *pl)
 	pl->room += PAGE_SIZE;
 	list_add_tail(&page->lru, &pl->head);
 	if (pl->mapped_tail)
-		kunmap(pl->mapped_tail);
+		ceph_pagelist_unmap_tail(pl);
 	pl->mapped_tail = kmap(page);
 	return 0;
 }
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 4868b9dcac5a6cc7a4d00610780572f335ef68f2..190b6c4a6f2b91aace658f8de7664f75cc0bdcc4 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -119,6 +119,7 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
 	INIT_LIST_HEAD(&realm->children);
 	INIT_LIST_HEAD(&realm->child_item);
 	INIT_LIST_HEAD(&realm->empty_item);
+	INIT_LIST_HEAD(&realm->dirty_item);
 	INIT_LIST_HEAD(&realm->inodes_with_caps);
 	spin_lock_init(&realm->inodes_with_caps_lock);
 	__insert_snap_realm(&mdsc->snap_realms, realm);
@@ -467,7 +468,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 		INIT_LIST_HEAD(&capsnap->ci_item);
 		INIT_LIST_HEAD(&capsnap->flushing_item);
 
-		capsnap->follows = snapc->seq - 1;
+		capsnap->follows = snapc->seq;
 		capsnap->issued = __ceph_caps_issued(ci, NULL);
 		capsnap->dirty = dirty;
 
@@ -604,6 +605,7 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
 	struct ceph_snap_realm *realm;
 	int invalidate = 0;
 	int err = -ENOMEM;
+	LIST_HEAD(dirty_realms);
 
 	dout("update_snap_trace deletion=%d\n", deletion);
 more:
@@ -626,24 +628,6 @@ more:
 		}
 	}
 
-	if (le64_to_cpu(ri->seq) > realm->seq) {
-		dout("update_snap_trace updating %llx %p %lld -> %lld\n",
-		     realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
-		/*
-		 * if the realm seq has changed, queue a cap_snap for every
-		 * inode with open caps.  we do this _before_ we update
-		 * the realm info so that we prepare for writeback under the
-		 * _previous_ snap context.
-		 *
-		 * ...unless it's a snap deletion!
-		 */
-		if (!deletion)
-			queue_realm_cap_snaps(realm);
-	} else {
-		dout("update_snap_trace %llx %p seq %lld unchanged\n",
-		     realm->ino, realm, realm->seq);
-	}
-
 	/* ensure the parent is correct */
 	err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent));
 	if (err < 0)
@@ -651,6 +635,8 @@ more:
 	invalidate += err;
 
 	if (le64_to_cpu(ri->seq) > realm->seq) {
+		dout("update_snap_trace updating %llx %p %lld -> %lld\n",
+		     realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
 		/* update realm parameters, snap lists */
 		realm->seq = le64_to_cpu(ri->seq);
 		realm->created = le64_to_cpu(ri->created);
@@ -668,9 +654,17 @@ more:
 		if (err < 0)
 			goto fail;
 
+		/* queue realm for cap_snap creation */
+		list_add(&realm->dirty_item, &dirty_realms);
+
 		invalidate = 1;
 	} else if (!realm->cached_context) {
+		dout("update_snap_trace %llx %p seq %lld new\n",
+		     realm->ino, realm, realm->seq);
 		invalidate = 1;
+	} else {
+		dout("update_snap_trace %llx %p seq %lld unchanged\n",
+		     realm->ino, realm, realm->seq);
 	}
 
 	dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
@@ -683,6 +677,14 @@ more:
 	if (invalidate)
 		rebuild_snap_realms(realm);
 
+	/*
+	 * queue cap snaps _after_ we've built the new snap contexts,
+	 * so that i_head_snapc can be set appropriately.
+	 */
+	list_for_each_entry(realm, &dirty_realms, dirty_item) {
+		queue_realm_cap_snaps(realm);
+	}
+
 	__cleanup_empty_realms(mdsc);
 	return 0;
 
@@ -715,7 +717,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
 		igrab(inode);
 		spin_unlock(&mdsc->snap_flush_lock);
 		spin_lock(&inode->i_lock);
-		__ceph_flush_snaps(ci, &session);
+		__ceph_flush_snaps(ci, &session, 0);
 		spin_unlock(&inode->i_lock);
 		iput(inode);
 		spin_lock(&mdsc->snap_flush_lock);
@@ -816,6 +818,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
 			};
 			struct inode *inode = ceph_find_inode(sb, vino);
 			struct ceph_inode_info *ci;
+			struct ceph_snap_realm *oldrealm;
 
 			if (!inode)
 				continue;
@@ -841,18 +844,19 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
 			dout(" will move %p to split realm %llx %p\n",
 			     inode, realm->ino, realm);
 			/*
-			 * Remove the inode from the realm's inode
-			 * list, but don't add it to the new realm
-			 * yet.  We don't want the cap_snap to be
-			 * queued (again) by ceph_update_snap_trace()
-			 * below.  Queue it _now_, under the old context.
+			 * Move the inode to the new realm
 			 */
 			spin_lock(&realm->inodes_with_caps_lock);
 			list_del_init(&ci->i_snap_realm_item);
+			list_add(&ci->i_snap_realm_item,
+				 &realm->inodes_with_caps);
+			oldrealm = ci->i_snap_realm;
+			ci->i_snap_realm = realm;
 			spin_unlock(&realm->inodes_with_caps_lock);
 			spin_unlock(&inode->i_lock);
 
-			ceph_queue_cap_snap(ci);
+			ceph_get_snap_realm(mdsc, realm);
+			ceph_put_snap_realm(mdsc, oldrealm);
 
 			iput(inode);
 			continue;
@@ -880,43 +884,9 @@ skip_inode:
 	ceph_update_snap_trace(mdsc, p, e,
 			       op == CEPH_SNAP_OP_DESTROY);
 
-	if (op == CEPH_SNAP_OP_SPLIT) {
-		/*
-		 * ok, _now_ add the inodes into the new realm.
-		 */
-		for (i = 0; i < num_split_inos; i++) {
-			struct ceph_vino vino = {
-				.ino = le64_to_cpu(split_inos[i]),
-				.snap = CEPH_NOSNAP,
-			};
-			struct inode *inode = ceph_find_inode(sb, vino);
-			struct ceph_inode_info *ci;
-
-			if (!inode)
-				continue;
-			ci = ceph_inode(inode);
-			spin_lock(&inode->i_lock);
-			if (list_empty(&ci->i_snap_realm_item)) {
-				struct ceph_snap_realm *oldrealm =
-					ci->i_snap_realm;
-
-				dout(" moving %p to split realm %llx %p\n",
-				     inode, realm->ino, realm);
-				spin_lock(&realm->inodes_with_caps_lock);
-				list_add(&ci->i_snap_realm_item,
-					 &realm->inodes_with_caps);
-				ci->i_snap_realm = realm;
-				spin_unlock(&realm->inodes_with_caps_lock);
-				ceph_get_snap_realm(mdsc, realm);
-				ceph_put_snap_realm(mdsc, oldrealm);
-			}
-			spin_unlock(&inode->i_lock);
-			iput(inode);
-		}
-
+	if (op == CEPH_SNAP_OP_SPLIT)
 		/* we took a reference when we created the realm, above */
 		ceph_put_snap_realm(mdsc, realm);
-	}
 
 	__cleanup_empty_realms(mdsc);
 
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index c33897ae5725e82ca269606b78d54214d8abf7af..b87638e84c4bc266e9b325f12e7fb98fc78e8986 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -690,6 +690,8 @@ struct ceph_snap_realm {
 
 	struct list_head empty_item;     /* if i have ref==0 */
 
+	struct list_head dirty_item;     /* if realm needs new context */
+
 	/* the current set of snaps for this realm */
 	struct ceph_snap_context *cached_context;
 
@@ -826,7 +828,8 @@ extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
 extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
 				       struct ceph_snap_context *snapc);
 extern void __ceph_flush_snaps(struct ceph_inode_info *ci,
-			       struct ceph_mds_session **psession);
+			       struct ceph_mds_session **psession,
+			       int again);
 extern void ceph_check_caps(struct ceph_inode_info *ci, int flags,
 			    struct ceph_mds_session *session);
 extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc);