Commit 6cccc7d3 authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull ceph updates from Sage Weil:
 "This includes both the first pile of Ceph patches (which I sent to
  torvalds@vger, sigh) and a few new patches that add support for
  fscache for Ceph.  That includes a few fscache core fixes that David
  Howells asked go through the Ceph tree.  (Thanks go to Milosz Tanski
  for putting this feature together)

  This first batch of patches (included here) had (has) several
  important RBD bug fixes, hole punch support, several different
  cleanups in the page cache interactions, improvements in the truncate
  code (new truncate mutex to avoid shenanigans with i_mutex), and a
  series of fixes in the synchronous striping read/write code.

  On top of that is a random collection of small fixes all across the
  tree (error code checks and error path cleanup, obsolete wq flags,
  etc)"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (43 commits)
  ceph: use d_invalidate() to invalidate aliases
  ceph: remove ceph_lookup_inode()
  ceph: trivial buildbot warnings fix
  ceph: Do not do invalidate if the filesystem is mounted nofsc
  ceph: page still marked private_2
  ceph: ceph_readpage_to_fscache didn't check if marked
  ceph: clean PgPrivate2 on returning from readpages
  ceph: use fscache as a local presisent cache
  fscache: Netfs function for cleanup post readpages
  FS-Cache: Fix heading in documentation
  CacheFiles: Implement interface to check cache consistency
  FS-Cache: Add interface to check consistency of a cached object
  rbd: fix null dereference in dout
  rbd: fix buffer size for writes to images with snapshots
  libceph: use pg_num_mask instead of pgp_num_mask for pg.seed calc
  rbd: fix I/O error propagation for reads
  ceph: use vfs __set_page_dirty_nobuffers interface instead of doing it inside filesystem
  ceph: allow sync_read/write return partial successed size of read/write.
  ceph: fix bugs about handling short-read for sync read mode.
  ceph: remove useless variable revoked_rdcache
  ...
parents 255ae3fb a8d436f0
......@@ -299,6 +299,15 @@ performed on the denizens of the cache. These are held in a structure of type:
enough space in the cache to permit this.
(*) Check coherency state of an object [mandatory]:
int (*check_consistency)(struct fscache_object *object)
This method is called to have the cache check the saved auxiliary data of
the object against the netfs's idea of the state. 0 should be returned
if they're consistent and -ESTALE otherwise. -ENOMEM and -ERESTARTSYS
may also be returned.
(*) Update object [mandatory]:
int (*update_object)(struct fscache_object *object)
......
......@@ -32,7 +32,7 @@ This document contains the following sections:
(9) Setting the data file size
(10) Page alloc/read/write
(11) Page uncaching
(12) Index and data file update
(12) Index and data file consistency
(13) Miscellaneous cookie operations
(14) Cookie unregistration
(15) Index invalidation
......@@ -433,7 +433,7 @@ to the caller. The attribute adjustment excludes read and write operations.
=====================
PAGE READ/ALLOC/WRITE
PAGE ALLOC/READ/WRITE
=====================
And the sixth step is to store and retrieve pages in the cache. There are
......@@ -499,7 +499,7 @@ Else if there's a copy of the page resident in the cache:
(*) An argument that's 0 on success or negative for an error code.
If an error occurs, it should be assumed that the page contains no usable
data.
data. fscache_readpages_cancel() may need to be called.
end_io_func() will be called in process context if the read is results in
an error, but it might be called in interrupt context if the read is
......@@ -623,6 +623,22 @@ some of the pages being read and some being allocated. Those pages will have
been marked appropriately and will need uncaching.
CANCELLATION OF UNREAD PAGES
----------------------------
If one or more pages are passed to fscache_read_or_alloc_pages() but not then
read from the cache and also not read from the underlying filesystem then
those pages will need to have any marks and reservations removed. This can be
done by calling:
void fscache_readpages_cancel(struct fscache_cookie *cookie,
struct list_head *pages);
prior to returning to the caller. The cookie argument should be as passed to
fscache_read_or_alloc_pages(). Every page in the pages list will be examined
and any that have PG_fscache set will be uncached.
==============
PAGE UNCACHING
==============
......@@ -690,9 +706,18 @@ written to the cache and for the cache to finish with the page generally. No
error is returned.
==========================
INDEX AND DATA FILE UPDATE
==========================
===============================
INDEX AND DATA FILE CONSISTENCY
===============================
To find out whether auxiliary data for an object is up to data within the
cache, the following function can be called:
int fscache_check_consistency(struct fscache_cookie *cookie)
This will call back to the netfs to check whether the auxiliary data associated
with a cookie is correct. It returns 0 if it is and -ESTALE if it isn't; it
may also return -ENOMEM and -ERESTARTSYS.
To request an update of the index data for an index or other object, the
following function should be called:
......
......@@ -1561,11 +1561,12 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
obj_request, obj_request->img_request, obj_request->result,
xferred, length);
/*
* ENOENT means a hole in the image. We zero-fill the
* entire length of the request. A short read also implies
* zero-fill to the end of the request. Either way we
* update the xferred count to indicate the whole request
* was satisfied.
* ENOENT means a hole in the image. We zero-fill the entire
* length of the request. A short read also implies zero-fill
* to the end of the request. An error requires the whole
* length of the request to be reported finished with an error
* to the block layer. In each case we update the xferred
* count to indicate the whole request was satisfied.
*/
rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
if (obj_request->result == -ENOENT) {
......@@ -1574,14 +1575,13 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
else
zero_pages(obj_request->pages, 0, length);
obj_request->result = 0;
obj_request->xferred = length;
} else if (xferred < length && !obj_request->result) {
if (obj_request->type == OBJ_REQUEST_BIO)
zero_bio_chain(obj_request->bio_list, xferred);
else
zero_pages(obj_request->pages, xferred, length);
obj_request->xferred = length;
}
obj_request->xferred = length;
obj_request_done_set(obj_request);
}
......@@ -2167,9 +2167,9 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
struct rbd_obj_request *obj_request = NULL;
struct rbd_obj_request *next_obj_request;
bool write_request = img_request_write_test(img_request);
struct bio *bio_list = 0;
struct bio *bio_list = NULL;
unsigned int bio_offset = 0;
struct page **pages = 0;
struct page **pages = NULL;
u64 img_offset;
u64 resid;
u16 opcode;
......@@ -2207,6 +2207,11 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
rbd_segment_name_free(object_name);
if (!obj_request)
goto out_unwind;
/*
* set obj_request->img_request before creating the
* osd_request so that it gets the right snapc
*/
rbd_img_obj_request_add(img_request, obj_request);
if (type == OBJ_REQUEST_BIO) {
unsigned int clone_size;
......@@ -2248,11 +2253,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
obj_request->pages, length,
offset & ~PAGE_MASK, false, false);
/*
* set obj_request->img_request before formatting
* the osd_request so that it gets the right snapc
*/
rbd_img_obj_request_add(img_request, obj_request);
if (write_request)
rbd_osd_req_format_write(obj_request);
else
......@@ -3706,12 +3706,14 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
if (ret < sizeof (size_buf))
return -ERANGE;
if (order)
if (order) {
*order = size_buf.order;
dout(" order %u", (unsigned int)*order);
}
*snap_size = le64_to_cpu(size_buf.size);
dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
(unsigned long long)snap_id, (unsigned int)*order,
dout(" snap_id 0x%016llx snap_size = %llu\n",
(unsigned long long)snap_id,
(unsigned long long)*snap_size);
return 0;
......
......@@ -377,6 +377,31 @@ static void cachefiles_sync_cache(struct fscache_cache *_cache)
ret);
}
/*
* check if the backing cache is updated to FS-Cache
* - called by FS-Cache when evaluates if need to invalidate the cache
*/
static bool cachefiles_check_consistency(struct fscache_operation *op)
{
struct cachefiles_object *object;
struct cachefiles_cache *cache;
const struct cred *saved_cred;
int ret;
_enter("{OBJ%x}", op->object->debug_id);
object = container_of(op->object, struct cachefiles_object, fscache);
cache = container_of(object->fscache.cache,
struct cachefiles_cache, cache);
cachefiles_begin_secure(cache, &saved_cred);
ret = cachefiles_check_auxdata(object);
cachefiles_end_secure(cache, saved_cred);
_leave(" = %d", ret);
return ret;
}
/*
* notification the attributes on an object have changed
* - called with reads/writes excluded by FS-Cache
......@@ -522,4 +547,5 @@ const struct fscache_cache_ops cachefiles_cache_ops = {
.write_page = cachefiles_write_page,
.uncache_page = cachefiles_uncache_page,
.dissociate_pages = cachefiles_dissociate_pages,
.check_consistency = cachefiles_check_consistency,
};
......@@ -235,6 +235,7 @@ extern int cachefiles_set_object_xattr(struct cachefiles_object *object,
struct cachefiles_xattr *auxdata);
extern int cachefiles_update_object_xattr(struct cachefiles_object *object,
struct cachefiles_xattr *auxdata);
extern int cachefiles_check_auxdata(struct cachefiles_object *object);
extern int cachefiles_check_object_xattr(struct cachefiles_object *object,
struct cachefiles_xattr *auxdata);
extern int cachefiles_remove_object_xattr(struct cachefiles_cache *cache,
......
......@@ -156,6 +156,42 @@ int cachefiles_update_object_xattr(struct cachefiles_object *object,
return ret;
}
/*
* check the consistency between the backing cache and the FS-Cache cookie
*/
int cachefiles_check_auxdata(struct cachefiles_object *object)
{
struct cachefiles_xattr *auxbuf;
struct dentry *dentry = object->dentry;
unsigned int dlen;
int ret;
ASSERT(dentry);
ASSERT(dentry->d_inode);
ASSERT(object->fscache.cookie->def->check_aux);
auxbuf = kmalloc(sizeof(struct cachefiles_xattr) + 512, GFP_KERNEL);
if (!auxbuf)
return -ENOMEM;
auxbuf->len = vfs_getxattr(dentry, cachefiles_xattr_cache,
&auxbuf->type, 512 + 1);
if (auxbuf->len < 1)
return -ESTALE;
if (auxbuf->type != object->fscache.cookie->def->type)
return -ESTALE;
dlen = auxbuf->len - 1;
ret = fscache_check_aux(&object->fscache, &auxbuf->data, dlen);
kfree(auxbuf);
if (ret != FSCACHE_CHECKAUX_OKAY)
return -ESTALE;
return 0;
}
/*
* check the state xattr on a cache file
* - return -ESTALE if the object should be deleted
......
......@@ -16,3 +16,12 @@ config CEPH_FS
If unsure, say N.
if CEPH_FS
config CEPH_FSCACHE
bool "Enable Ceph client caching support"
depends on CEPH_FS=m && FSCACHE || CEPH_FS=y && FSCACHE=y
help
Choose Y here to enable persistent, read-only local
caching support for Ceph clients using FS-Cache
endif
......@@ -9,3 +9,4 @@ ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
mds_client.o mdsmap.o strings.o ceph_frag.o \
debugfs.o
ceph-$(CONFIG_CEPH_FSCACHE) += cache.o
......@@ -11,6 +11,7 @@
#include "super.h"
#include "mds_client.h"
#include "cache.h"
#include <linux/ceph/osd_client.h>
/*
......@@ -70,15 +71,16 @@ static int ceph_set_page_dirty(struct page *page)
struct address_space *mapping = page->mapping;
struct inode *inode;
struct ceph_inode_info *ci;
int undo = 0;
struct ceph_snap_context *snapc;
int ret;
if (unlikely(!mapping))
return !TestSetPageDirty(page);
if (TestSetPageDirty(page)) {
if (PageDirty(page)) {
dout("%p set_page_dirty %p idx %lu -- already dirty\n",
mapping->host, page, page->index);
BUG_ON(!PagePrivate(page));
return 0;
}
......@@ -107,35 +109,19 @@ static int ceph_set_page_dirty(struct page *page)
snapc, snapc->seq, snapc->num_snaps);
spin_unlock(&ci->i_ceph_lock);
/* now adjust page */
spin_lock_irq(&mapping->tree_lock);
if (page->mapping) { /* Race with truncate? */
WARN_ON_ONCE(!PageUptodate(page));
account_page_dirtied(page, page->mapping);
radix_tree_tag_set(&mapping->page_tree,
page_index(page), PAGECACHE_TAG_DIRTY);
/*
* Reference snap context in page->private. Also set
* PagePrivate so that we get invalidatepage callback.
*/
page->private = (unsigned long)snapc;
SetPagePrivate(page);
} else {
dout("ANON set_page_dirty %p (raced truncate?)\n", page);
undo = 1;
}
spin_unlock_irq(&mapping->tree_lock);
if (undo)
/* whoops, we failed to dirty the page */
ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
/*
* Reference snap context in page->private. Also set
* PagePrivate so that we get invalidatepage callback.
*/
BUG_ON(PagePrivate(page));
page->private = (unsigned long)snapc;
SetPagePrivate(page);
__mark_inode_dirty(mapping->host, I_DIRTY_PAGES);
ret = __set_page_dirty_nobuffers(page);
WARN_ON(!PageLocked(page));
WARN_ON(!page->mapping);
BUG_ON(!PageDirty(page));
return 1;
return ret;
}
/*
......@@ -150,11 +136,19 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
struct ceph_inode_info *ci;
struct ceph_snap_context *snapc = page_snap_context(page);
BUG_ON(!PageLocked(page));
BUG_ON(!PagePrivate(page));
BUG_ON(!page->mapping);
inode = page->mapping->host;
ci = ceph_inode(inode);
if (offset != 0 || length != PAGE_CACHE_SIZE) {
dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n",
inode, page, page->index, offset, length);
return;
}
ceph_invalidate_fscache_page(inode, page);
if (!PagePrivate(page))
return;
/*
* We can get non-dirty pages here due to races between
......@@ -164,31 +158,28 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
if (!PageDirty(page))
pr_err("%p invalidatepage %p page not dirty\n", inode, page);
if (offset == 0 && length == PAGE_CACHE_SIZE)
ClearPageChecked(page);
ClearPageChecked(page);
ci = ceph_inode(inode);
if (offset == 0 && length == PAGE_CACHE_SIZE) {
dout("%p invalidatepage %p idx %lu full dirty page\n",
inode, page, page->index);
ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
ceph_put_snap_context(snapc);
page->private = 0;
ClearPagePrivate(page);
} else {
dout("%p invalidatepage %p idx %lu partial dirty page %u(%u)\n",
inode, page, page->index, offset, length);
}
dout("%p invalidatepage %p idx %lu full dirty page\n",
inode, page, page->index);
ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
ceph_put_snap_context(snapc);
page->private = 0;
ClearPagePrivate(page);
}
/* just a sanity check */
static int ceph_releasepage(struct page *page, gfp_t g)
{
struct inode *inode = page->mapping ? page->mapping->host : NULL;
dout("%p releasepage %p idx %lu\n", inode, page, page->index);
WARN_ON(PageDirty(page));
WARN_ON(PagePrivate(page));
return 0;
/* Can we release the page from the cache? */
if (!ceph_release_fscache_page(page, g))
return 0;
return !PagePrivate(page);
}
/*
......@@ -198,11 +189,16 @@ static int readpage_nounlock(struct file *filp, struct page *page)
{
struct inode *inode = file_inode(filp);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_osd_client *osdc =
struct ceph_osd_client *osdc =
&ceph_inode_to_client(inode)->client->osdc;
int err = 0;
u64 len = PAGE_CACHE_SIZE;
err = ceph_readpage_from_fscache(inode, page);
if (err == 0)
goto out;
dout("readpage inode %p file %p page %p index %lu\n",
inode, filp, page, page->index);
err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
......@@ -220,6 +216,9 @@ static int readpage_nounlock(struct file *filp, struct page *page)
}
SetPageUptodate(page);
if (err == 0)
ceph_readpage_to_fscache(inode, page);
out:
return err < 0 ? err : 0;
}
......@@ -262,6 +261,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
page->index);
flush_dcache_page(page);
SetPageUptodate(page);
ceph_readpage_to_fscache(inode, page);
unlock_page(page);
page_cache_release(page);
bytes -= PAGE_CACHE_SIZE;
......@@ -331,11 +331,12 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
page = list_entry(page_list->prev, struct page, lru);
BUG_ON(PageLocked(page));
list_del(&page->lru);
dout("start_read %p adding %p idx %lu\n", inode, page,
page->index);
if (add_to_page_cache_lru(page, &inode->i_data, page->index,
GFP_NOFS)) {
ceph_fscache_uncache_page(inode, page);
page_cache_release(page);
dout("start_read %p add_to_page_cache failed %p\n",
inode, page);
......@@ -378,6 +379,12 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
int rc = 0;
int max = 0;
rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
&nr_pages);
if (rc == 0)
goto out;
if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE)
max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
>> PAGE_SHIFT;
......@@ -392,6 +399,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
BUG_ON(rc == 0);
}
out:
ceph_fscache_readpages_cancel(inode, page_list);
dout("readpages %p file %p ret %d\n", inode, file, rc);
return rc;
}
......@@ -497,6 +506,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);
ceph_readpage_to_fscache(inode, page);
set_page_writeback(page);
err = ceph_osdc_writepages(osdc, ceph_vino(inode),
&ci->i_layout, snapc,
......@@ -552,7 +563,6 @@ static void ceph_release_pages(struct page **pages, int num)
pagevec_release(&pvec);
}
/*
* async writeback completion handler.
*
......
/*
* Ceph cache definitions.
*
* Copyright (C) 2013 by Adfin Solutions, Inc. All Rights Reserved.
* Written by Milosz Tanski (milosz@adfin.com)
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to:
* Free Software Foundation
* 51 Franklin Street, Fifth Floor
* Boston, MA 02111-1301 USA
*
*/
#include "super.h"
#include "cache.h"
struct ceph_aux_inode {
struct timespec mtime;
loff_t size;
};
struct fscache_netfs ceph_cache_netfs = {
.name = "ceph",
.version = 0,
};
static uint16_t ceph_fscache_session_get_key(const void *cookie_netfs_data,
void *buffer, uint16_t maxbuf)
{
const struct ceph_fs_client* fsc = cookie_netfs_data;
uint16_t klen;
klen = sizeof(fsc->client->fsid);
if (klen > maxbuf)
return 0;
memcpy(buffer, &fsc->client->fsid, klen);
return klen;
}
static const struct fscache_cookie_def ceph_fscache_fsid_object_def = {
.name = "CEPH.fsid",
.type = FSCACHE_COOKIE_TYPE_INDEX,
.get_key = ceph_fscache_session_get_key,
};
int ceph_fscache_register(void)
{
return fscache_register_netfs(&ceph_cache_netfs);
}
void ceph_fscache_unregister(void)
{
fscache_unregister_netfs(&ceph_cache_netfs);
}
int ceph_fscache_register_fs(struct ceph_fs_client* fsc)
{
fsc->fscache = fscache_acquire_cookie(ceph_cache_netfs.primary_index,
&ceph_fscache_fsid_object_def,
fsc);
if (fsc->fscache == NULL) {
pr_err("Unable to resgister fsid: %p fscache cookie", fsc);
return 0;
}
fsc->revalidate_wq = alloc_workqueue("ceph-revalidate", 0, 1);
if (fsc->revalidate_wq == NULL)
return -ENOMEM;
return 0;
}
static uint16_t ceph_fscache_inode_get_key(const void *cookie_netfs_data,
void *buffer, uint16_t maxbuf)
{
const struct ceph_inode_info* ci = cookie_netfs_data;
uint16_t klen;
/* use ceph virtual inode (id + snaphot) */
klen = sizeof(ci->i_vino);
if (klen > maxbuf)
return 0;
memcpy(buffer, &ci->i_vino, klen);
return klen;
}
static uint16_t ceph_fscache_inode_get_aux(const void *cookie_netfs_data,
void *buffer, uint16_t bufmax)
{
struct ceph_aux_inode aux;
const struct ceph_inode_info* ci = cookie_netfs_data;
const struct inode* inode = &ci->vfs_inode;
memset(&aux, 0, sizeof(aux));
aux.mtime = inode->i_mtime;
aux.size = inode->i_size;
memcpy(buffer, &aux, sizeof(aux));
return sizeof(aux);