forked from mirrors/linux
		
	ceph: Implement writev/pwritev for sync operation.
For writev/pwritev sync operations, ceph only handled the first iov. I divided the sync write operation into two functions: one for direct writes, the other for non-direct sync writes. This is because for non-direct sync writes we can merge the iovs into one, but for direct writes we can't merge iovs. Signed-off-by: Jianpeng Ma <majianpeng@gmail.com> Reviewed-by: Yan, Zheng <zheng.z.yan@intel.com> Signed-off-by: Sage Weil <sage@inktank.com>
This commit is contained in:
		
							parent
							
								
									9f12bd119e
								
							
						
					
					
						commit
						e8344e6689
					
				
					 1 changed files with 192 additions and 79 deletions
				
			
		
							
								
								
									
										249
									
								
								fs/ceph/file.c
									
									
									
									
									
								
							
							
						
						
									
										249
									
								
								fs/ceph/file.c
									
									
									
									
									
								
							| 
						 | 
				
			
			@ -489,83 +489,79 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 * Synchronous write, straight from __user pointer or user pages (if
 | 
			
		||||
 * O_DIRECT).
 | 
			
		||||
 * Synchronous write, straight from __user pointer or user pages.
 | 
			
		||||
 *
 | 
			
		||||
 * If write spans object boundary, just do multiple writes.  (For a
 | 
			
		||||
 * correct atomic write, we should e.g. take write locks on all
 | 
			
		||||
 * objects, rollback on failure, etc.)
 | 
			
		||||
 */
 | 
			
		||||
static ssize_t ceph_sync_write(struct file *file, const char __user *data,
 | 
			
		||||
			       size_t left, loff_t pos, loff_t *ppos)
 | 
			
		||||
static ssize_t
 | 
			
		||||
ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov,
 | 
			
		||||
		       unsigned long nr_segs, size_t count)
 | 
			
		||||
{
 | 
			
		||||
	struct file *file = iocb->ki_filp;
 | 
			
		||||
	struct inode *inode = file_inode(file);
 | 
			
		||||
	struct ceph_inode_info *ci = ceph_inode(inode);
 | 
			
		||||
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 | 
			
		||||
	struct ceph_snap_context *snapc;
 | 
			
		||||
	struct ceph_vino vino;
 | 
			
		||||
	struct ceph_osd_request *req;
 | 
			
		||||
	int num_ops = 1;
 | 
			
		||||
	struct page **pages;
 | 
			
		||||
	int num_pages;
 | 
			
		||||
	u64 len;
 | 
			
		||||
	int written = 0;
 | 
			
		||||
	int flags;
 | 
			
		||||
	int check_caps = 0;
 | 
			
		||||
	int page_align, io_align;
 | 
			
		||||
	unsigned long buf_align;
 | 
			
		||||
	int page_align;
 | 
			
		||||
	int ret;
 | 
			
		||||
	struct timespec mtime = CURRENT_TIME;
 | 
			
		||||
	bool own_pages = false;
 | 
			
		||||
	loff_t pos = iocb->ki_pos;
 | 
			
		||||
	struct iov_iter i;
 | 
			
		||||
 | 
			
		||||
	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
 | 
			
		||||
		return -EROFS;
 | 
			
		||||
 | 
			
		||||
	dout("sync_write on file %p %lld~%u %s\n", file, pos,
 | 
			
		||||
	     (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
 | 
			
		||||
	dout("sync_direct_write on file %p %lld~%u\n", file, pos,
 | 
			
		||||
	     (unsigned)count);
 | 
			
		||||
 | 
			
		||||
	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
 | 
			
		||||
	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
 | 
			
		||||
	if (ret < 0)
 | 
			
		||||
		return ret;
 | 
			
		||||
 | 
			
		||||
	ret = invalidate_inode_pages2_range(inode->i_mapping,
 | 
			
		||||
					    pos >> PAGE_CACHE_SHIFT,
 | 
			
		||||
					    (pos + left) >> PAGE_CACHE_SHIFT);
 | 
			
		||||
					    (pos + count) >> PAGE_CACHE_SHIFT);
 | 
			
		||||
	if (ret < 0)
 | 
			
		||||
		dout("invalidate_inode_pages2_range returned %d\n", ret);
 | 
			
		||||
 | 
			
		||||
	flags = CEPH_OSD_FLAG_ORDERSNAP |
 | 
			
		||||
		CEPH_OSD_FLAG_ONDISK |
 | 
			
		||||
		CEPH_OSD_FLAG_WRITE;
 | 
			
		||||
	if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
 | 
			
		||||
		flags |= CEPH_OSD_FLAG_ACK;
 | 
			
		||||
	else
 | 
			
		||||
		num_ops++;	/* Also include a 'startsync' command. */
 | 
			
		||||
 | 
			
		||||
	/*
 | 
			
		||||
	 * we may need to do multiple writes here if we span an object
 | 
			
		||||
	 * boundary.  this isn't atomic, unfortunately.  :(
 | 
			
		||||
	 */
 | 
			
		||||
more:
 | 
			
		||||
	io_align = pos & ~PAGE_MASK;
 | 
			
		||||
	buf_align = (unsigned long)data & ~PAGE_MASK;
 | 
			
		||||
	len = left;
 | 
			
		||||
	iov_iter_init(&i, iov, nr_segs, count, 0);
 | 
			
		||||
 | 
			
		||||
	while (iov_iter_count(&i) > 0) {
 | 
			
		||||
		void __user *data = i.iov->iov_base + i.iov_offset;
 | 
			
		||||
		u64 len = i.iov->iov_len - i.iov_offset;
 | 
			
		||||
 | 
			
		||||
		page_align = (unsigned long)data & ~PAGE_MASK;
 | 
			
		||||
 | 
			
		||||
		snapc = ci->i_snap_realm->cached_context;
 | 
			
		||||
		vino = ceph_vino(inode);
 | 
			
		||||
		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
 | 
			
		||||
				    vino, pos, &len, num_ops,
 | 
			
		||||
					    vino, pos, &len,
 | 
			
		||||
					    2,/*include a 'startsync' command*/
 | 
			
		||||
					    CEPH_OSD_OP_WRITE, flags, snapc,
 | 
			
		||||
				    ci->i_truncate_seq, ci->i_truncate_size,
 | 
			
		||||
					    ci->i_truncate_seq,
 | 
			
		||||
					    ci->i_truncate_size,
 | 
			
		||||
					    false);
 | 
			
		||||
	if (IS_ERR(req))
 | 
			
		||||
		return PTR_ERR(req);
 | 
			
		||||
		if (IS_ERR(req)) {
 | 
			
		||||
			ret = PTR_ERR(req);
 | 
			
		||||
			goto out;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	/* write from beginning of first page, regardless of io alignment */
 | 
			
		||||
	page_align = file->f_flags & O_DIRECT ? buf_align : io_align;
 | 
			
		||||
		num_pages = calc_pages_for(page_align, len);
 | 
			
		||||
	if (file->f_flags & O_DIRECT) {
 | 
			
		||||
		pages = ceph_get_direct_page_vector(data, num_pages, false);
 | 
			
		||||
		if (IS_ERR(pages)) {
 | 
			
		||||
			ret = PTR_ERR(pages);
 | 
			
		||||
| 
						 | 
				
			
			@ -578,27 +574,8 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
 | 
			
		|||
		 */
 | 
			
		||||
		truncate_inode_pages_range(inode->i_mapping, pos,
 | 
			
		||||
				   (pos+len) | (PAGE_CACHE_SIZE-1));
 | 
			
		||||
	} else {
 | 
			
		||||
		pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
 | 
			
		||||
		if (IS_ERR(pages)) {
 | 
			
		||||
			ret = PTR_ERR(pages);
 | 
			
		||||
			goto out;
 | 
			
		||||
		}
 | 
			
		||||
		ret = ceph_copy_user_to_page_vector(pages, data, pos, len);
 | 
			
		||||
		if (ret < 0) {
 | 
			
		||||
			ceph_release_page_vector(pages, num_pages);
 | 
			
		||||
			goto out;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if ((file->f_flags & O_SYNC) == 0) {
 | 
			
		||||
			/* get a second commit callback */
 | 
			
		||||
			req->r_unsafe_callback = ceph_sync_write_unsafe;
 | 
			
		||||
			req->r_inode = inode;
 | 
			
		||||
			own_pages = true;
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
		osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
 | 
			
		||||
					false, own_pages);
 | 
			
		||||
						false, false);
 | 
			
		||||
 | 
			
		||||
		/* BUG_ON(vino.snap != CEPH_NOSNAP); */
 | 
			
		||||
		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
 | 
			
		||||
| 
						 | 
				
			
			@ -607,34 +584,168 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
 | 
			
		|||
		if (!ret)
 | 
			
		||||
			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
 | 
			
		||||
 | 
			
		||||
	if (file->f_flags & O_DIRECT)
 | 
			
		||||
		ceph_put_page_vector(pages, num_pages, false);
 | 
			
		||||
	else if (file->f_flags & O_SYNC)
 | 
			
		||||
		ceph_release_page_vector(pages, num_pages);
 | 
			
		||||
 | 
			
		||||
out:
 | 
			
		||||
		ceph_osdc_put_request(req);
 | 
			
		||||
		if (ret == 0) {
 | 
			
		||||
			pos += len;
 | 
			
		||||
			written += len;
 | 
			
		||||
		left -= len;
 | 
			
		||||
		data += len;
 | 
			
		||||
		if (left)
 | 
			
		||||
			goto more;
 | 
			
		||||
			iov_iter_advance(&i, (size_t)len);
 | 
			
		||||
 | 
			
		||||
		ret = written;
 | 
			
		||||
		*ppos = pos;
 | 
			
		||||
		if (pos > i_size_read(inode))
 | 
			
		||||
			if (pos > i_size_read(inode)) {
 | 
			
		||||
				check_caps = ceph_inode_set_size(inode, pos);
 | 
			
		||||
				if (check_caps)
 | 
			
		||||
			ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
 | 
			
		||||
					ceph_check_caps(ceph_inode(inode),
 | 
			
		||||
							CHECK_CAPS_AUTHONLY,
 | 
			
		||||
							NULL);
 | 
			
		||||
	} else if (ret != -EOLDSNAPC && written > 0) {
 | 
			
		||||
			}
 | 
			
		||||
		} else
 | 
			
		||||
			break;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if (ret != -EOLDSNAPC && written > 0) {
 | 
			
		||||
		iocb->ki_pos = pos;
 | 
			
		||||
		ret = written;
 | 
			
		||||
	}
 | 
			
		||||
	return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 * Synchronous write, straight from __user pointer or user pages.
 | 
			
		||||
 *
 | 
			
		||||
 * If write spans object boundary, just do multiple writes.  (For a
 | 
			
		||||
 * correct atomic write, we should e.g. take write locks on all
 | 
			
		||||
 * objects, rollback on failure, etc.)
 | 
			
		||||
 */
 | 
			
		||||
static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov,
 | 
			
		||||
			       unsigned long nr_segs, size_t count)
 | 
			
		||||
{
 | 
			
		||||
	struct file *file = iocb->ki_filp;
 | 
			
		||||
	struct inode *inode = file_inode(file);
 | 
			
		||||
	struct ceph_inode_info *ci = ceph_inode(inode);
 | 
			
		||||
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 | 
			
		||||
	struct ceph_snap_context *snapc;
 | 
			
		||||
	struct ceph_vino vino;
 | 
			
		||||
	struct ceph_osd_request *req;
 | 
			
		||||
	struct page **pages;
 | 
			
		||||
	u64 len;
 | 
			
		||||
	int num_pages;
 | 
			
		||||
	int written = 0;
 | 
			
		||||
	int flags;
 | 
			
		||||
	int check_caps = 0;
 | 
			
		||||
	int ret;
 | 
			
		||||
	struct timespec mtime = CURRENT_TIME;
 | 
			
		||||
	loff_t pos = iocb->ki_pos;
 | 
			
		||||
	struct iov_iter i;
 | 
			
		||||
 | 
			
		||||
	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
 | 
			
		||||
		return -EROFS;
 | 
			
		||||
 | 
			
		||||
	dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);
 | 
			
		||||
 | 
			
		||||
	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
 | 
			
		||||
	if (ret < 0)
 | 
			
		||||
		return ret;
 | 
			
		||||
 | 
			
		||||
	ret = invalidate_inode_pages2_range(inode->i_mapping,
 | 
			
		||||
					    pos >> PAGE_CACHE_SHIFT,
 | 
			
		||||
					    (pos + count) >> PAGE_CACHE_SHIFT);
 | 
			
		||||
	if (ret < 0)
 | 
			
		||||
		dout("invalidate_inode_pages2_range returned %d\n", ret);
 | 
			
		||||
 | 
			
		||||
	flags = CEPH_OSD_FLAG_ORDERSNAP |
 | 
			
		||||
		CEPH_OSD_FLAG_ONDISK |
 | 
			
		||||
		CEPH_OSD_FLAG_WRITE |
 | 
			
		||||
		CEPH_OSD_FLAG_ACK;
 | 
			
		||||
 | 
			
		||||
	iov_iter_init(&i, iov, nr_segs, count, 0);
 | 
			
		||||
 | 
			
		||||
	while ((len = iov_iter_count(&i)) > 0) {
 | 
			
		||||
		size_t left;
 | 
			
		||||
		int n;
 | 
			
		||||
 | 
			
		||||
		snapc = ci->i_snap_realm->cached_context;
 | 
			
		||||
		vino = ceph_vino(inode);
 | 
			
		||||
		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
 | 
			
		||||
					    vino, pos, &len, 1,
 | 
			
		||||
					    CEPH_OSD_OP_WRITE, flags, snapc,
 | 
			
		||||
					    ci->i_truncate_seq,
 | 
			
		||||
					    ci->i_truncate_size,
 | 
			
		||||
					    false);
 | 
			
		||||
		if (IS_ERR(req)) {
 | 
			
		||||
			ret = PTR_ERR(req);
 | 
			
		||||
			goto out;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		/*
 | 
			
		||||
		 * write from beginning of first page,
 | 
			
		||||
		 * regardless of io alignment
 | 
			
		||||
		 */
 | 
			
		||||
		num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
 | 
			
		||||
 | 
			
		||||
		pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
 | 
			
		||||
		if (IS_ERR(pages)) {
 | 
			
		||||
			ret = PTR_ERR(pages);
 | 
			
		||||
			goto out;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		left = len;
 | 
			
		||||
		for (n = 0; n < num_pages; n++) {
 | 
			
		||||
			size_t plen = min(left, PAGE_SIZE);
 | 
			
		||||
			ret = iov_iter_copy_from_user(pages[n], &i, 0, plen);
 | 
			
		||||
			if (ret != plen) {
 | 
			
		||||
				ret = -EFAULT;
 | 
			
		||||
				break;
 | 
			
		||||
			}
 | 
			
		||||
			left -= ret;
 | 
			
		||||
			iov_iter_advance(&i, ret);
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if (ret < 0) {
 | 
			
		||||
			ceph_release_page_vector(pages, num_pages);
 | 
			
		||||
			goto out;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		/* get a second commit callback */
 | 
			
		||||
		req->r_unsafe_callback = ceph_sync_write_unsafe;
 | 
			
		||||
		req->r_inode = inode;
 | 
			
		||||
 | 
			
		||||
		osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
 | 
			
		||||
						false, true);
 | 
			
		||||
 | 
			
		||||
		/* BUG_ON(vino.snap != CEPH_NOSNAP); */
 | 
			
		||||
		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
 | 
			
		||||
 | 
			
		||||
		ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
 | 
			
		||||
		if (!ret)
 | 
			
		||||
			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
 | 
			
		||||
 | 
			
		||||
out:
 | 
			
		||||
		ceph_osdc_put_request(req);
 | 
			
		||||
		if (ret == 0) {
 | 
			
		||||
			pos += len;
 | 
			
		||||
			written += len;
 | 
			
		||||
 | 
			
		||||
			if (pos > i_size_read(inode)) {
 | 
			
		||||
				check_caps = ceph_inode_set_size(inode, pos);
 | 
			
		||||
				if (check_caps)
 | 
			
		||||
					ceph_check_caps(ceph_inode(inode),
 | 
			
		||||
							CHECK_CAPS_AUTHONLY,
 | 
			
		||||
							NULL);
 | 
			
		||||
			}
 | 
			
		||||
		} else
 | 
			
		||||
			break;
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if (ret != -EOLDSNAPC && written > 0) {
 | 
			
		||||
		ret = written;
 | 
			
		||||
		iocb->ki_pos = pos;
 | 
			
		||||
	}
 | 
			
		||||
	return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 * Wrap generic_file_aio_read with checks for cap bits on the inode.
 | 
			
		||||
 * Atomically grab references, so that those bits are not released
 | 
			
		||||
| 
						 | 
				
			
			@ -772,11 +883,13 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
 | 
			
		|||
	     inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
 | 
			
		||||
 | 
			
		||||
	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
 | 
			
		||||
	    (iocb->ki_filp->f_flags & O_DIRECT) ||
 | 
			
		||||
	    (fi->flags & CEPH_F_SYNC)) {
 | 
			
		||||
	    (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
 | 
			
		||||
		mutex_unlock(&inode->i_mutex);
 | 
			
		||||
		written = ceph_sync_write(file, iov->iov_base, count,
 | 
			
		||||
					  pos, &iocb->ki_pos);
 | 
			
		||||
		if (file->f_flags & O_DIRECT)
 | 
			
		||||
			written = ceph_sync_direct_write(iocb, iov,
 | 
			
		||||
							 nr_segs, count);
 | 
			
		||||
		else
 | 
			
		||||
			written = ceph_sync_write(iocb, iov, nr_segs, count);
 | 
			
		||||
		if (written == -EOLDSNAPC) {
 | 
			
		||||
			dout("aio_write %p %llx.%llx %llu~%u"
 | 
			
		||||
				"got EOLDSNAPC, retrying\n",
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in a new issue