mirror of
				https://github.com/torvalds/linux.git
				synced 2025-11-04 02:30:34 +02:00 
			
		
		
		
	libceph: make message data be a pointer
Begin the transition from a single message data item to a list of
them by replacing the "data" structure in a message with a pointer
to a ceph_msg_data structure.
A null pointer will indicate the message has no data; replace the
use of ceph_msg_has_data() with a simple check for a null pointer.
Create functions ceph_msg_data_create() and ceph_msg_data_destroy()
to dynamically allocate and free a data item structure of a given type.
When a message has its data item "set," allocate one of these to
hold the data description, and free it when the last reference to
the message is dropped.
This partially resolves:
    http://tracker.ceph.com/issues/4429
Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
			
			
This commit is contained in:
		
							parent
							
								
									8ea299bcbc
								
							
						
					
					
						commit
						6644ed7b7e
					
				
					 2 changed files with 63 additions and 38 deletions
				
			
		| 
						 | 
					@ -64,8 +64,6 @@ struct ceph_messenger {
 | 
				
			||||||
	u32 required_features;
 | 
						u32 required_features;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#define ceph_msg_has_data(m)		((m)->data.type != CEPH_MSG_DATA_NONE)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
enum ceph_msg_data_type {
 | 
					enum ceph_msg_data_type {
 | 
				
			||||||
	CEPH_MSG_DATA_NONE,	/* message contains no data payload */
 | 
						CEPH_MSG_DATA_NONE,	/* message contains no data payload */
 | 
				
			||||||
	CEPH_MSG_DATA_PAGES,	/* data source/destination is a page array */
 | 
						CEPH_MSG_DATA_PAGES,	/* data source/destination is a page array */
 | 
				
			||||||
| 
						 | 
					@ -141,8 +139,7 @@ struct ceph_msg {
 | 
				
			||||||
	struct kvec front;              /* unaligned blobs of message */
 | 
						struct kvec front;              /* unaligned blobs of message */
 | 
				
			||||||
	struct ceph_buffer *middle;
 | 
						struct ceph_buffer *middle;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/* data payload */
 | 
						struct ceph_msg_data	*data;	/* data payload */
 | 
				
			||||||
	struct ceph_msg_data	data;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	struct ceph_connection *con;
 | 
						struct ceph_connection *con;
 | 
				
			||||||
	struct list_head list_head;	/* links for connection lists */
 | 
						struct list_head list_head;	/* links for connection lists */
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1086,7 +1086,7 @@ static void prepare_message_data(struct ceph_msg *msg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/* Initialize data cursor */
 | 
						/* Initialize data cursor */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ceph_msg_data_cursor_init(&msg->data, data_len);
 | 
						ceph_msg_data_cursor_init(msg->data, data_len);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/*
 | 
					/*
 | 
				
			||||||
| 
						 | 
					@ -1406,13 +1406,13 @@ static u32 ceph_crc32c_page(u32 crc, struct page *page,
 | 
				
			||||||
static int write_partial_message_data(struct ceph_connection *con)
 | 
					static int write_partial_message_data(struct ceph_connection *con)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct ceph_msg *msg = con->out_msg;
 | 
						struct ceph_msg *msg = con->out_msg;
 | 
				
			||||||
	struct ceph_msg_data_cursor *cursor = &msg->data.cursor;
 | 
						struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
 | 
				
			||||||
	bool do_datacrc = !con->msgr->nocrc;
 | 
						bool do_datacrc = !con->msgr->nocrc;
 | 
				
			||||||
	u32 crc;
 | 
						u32 crc;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	dout("%s %p msg %p\n", __func__, con, msg);
 | 
						dout("%s %p msg %p\n", __func__, con, msg);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (WARN_ON(!ceph_msg_has_data(msg)))
 | 
						if (WARN_ON(!msg->data))
 | 
				
			||||||
		return -EINVAL;
 | 
							return -EINVAL;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/*
 | 
						/*
 | 
				
			||||||
| 
						 | 
					@ -1432,7 +1432,7 @@ static int write_partial_message_data(struct ceph_connection *con)
 | 
				
			||||||
		bool need_crc;
 | 
							bool need_crc;
 | 
				
			||||||
		int ret;
 | 
							int ret;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		page = ceph_msg_data_next(&msg->data, &page_offset, &length,
 | 
							page = ceph_msg_data_next(msg->data, &page_offset, &length,
 | 
				
			||||||
							&last_piece);
 | 
												&last_piece);
 | 
				
			||||||
		ret = ceph_tcp_sendpage(con->sock, page, page_offset,
 | 
							ret = ceph_tcp_sendpage(con->sock, page, page_offset,
 | 
				
			||||||
				      length, last_piece);
 | 
									      length, last_piece);
 | 
				
			||||||
| 
						 | 
					@ -1444,7 +1444,7 @@ static int write_partial_message_data(struct ceph_connection *con)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		if (do_datacrc && cursor->need_crc)
 | 
							if (do_datacrc && cursor->need_crc)
 | 
				
			||||||
			crc = ceph_crc32c_page(crc, page, page_offset, length);
 | 
								crc = ceph_crc32c_page(crc, page, page_offset, length);
 | 
				
			||||||
		need_crc = ceph_msg_data_advance(&msg->data, (size_t) ret);
 | 
							need_crc = ceph_msg_data_advance(msg->data, (size_t)ret);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	dout("%s %p msg %p done\n", __func__, con, msg);
 | 
						dout("%s %p msg %p done\n", __func__, con, msg);
 | 
				
			||||||
| 
						 | 
					@ -2104,7 +2104,7 @@ static int read_partial_message_section(struct ceph_connection *con,
 | 
				
			||||||
static int read_partial_msg_data(struct ceph_connection *con)
 | 
					static int read_partial_msg_data(struct ceph_connection *con)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct ceph_msg *msg = con->in_msg;
 | 
						struct ceph_msg *msg = con->in_msg;
 | 
				
			||||||
	struct ceph_msg_data_cursor *cursor = &msg->data.cursor;
 | 
						struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
 | 
				
			||||||
	const bool do_datacrc = !con->msgr->nocrc;
 | 
						const bool do_datacrc = !con->msgr->nocrc;
 | 
				
			||||||
	struct page *page;
 | 
						struct page *page;
 | 
				
			||||||
	size_t page_offset;
 | 
						size_t page_offset;
 | 
				
			||||||
| 
						 | 
					@ -2113,13 +2113,13 @@ static int read_partial_msg_data(struct ceph_connection *con)
 | 
				
			||||||
	int ret;
 | 
						int ret;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	BUG_ON(!msg);
 | 
						BUG_ON(!msg);
 | 
				
			||||||
	if (WARN_ON(!ceph_msg_has_data(msg)))
 | 
						if (!msg->data)
 | 
				
			||||||
		return -EIO;
 | 
							return -EIO;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (do_datacrc)
 | 
						if (do_datacrc)
 | 
				
			||||||
		crc = con->in_data_crc;
 | 
							crc = con->in_data_crc;
 | 
				
			||||||
	while (cursor->resid) {
 | 
						while (cursor->resid) {
 | 
				
			||||||
		page = ceph_msg_data_next(&msg->data, &page_offset, &length,
 | 
							page = ceph_msg_data_next(msg->data, &page_offset, &length,
 | 
				
			||||||
							NULL);
 | 
												NULL);
 | 
				
			||||||
		ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
 | 
							ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
 | 
				
			||||||
		if (ret <= 0) {
 | 
							if (ret <= 0) {
 | 
				
			||||||
| 
						 | 
					@ -2131,7 +2131,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if (do_datacrc)
 | 
							if (do_datacrc)
 | 
				
			||||||
			crc = ceph_crc32c_page(crc, page, page_offset, ret);
 | 
								crc = ceph_crc32c_page(crc, page, page_offset, ret);
 | 
				
			||||||
		(void) ceph_msg_data_advance(&msg->data, (size_t) ret);
 | 
							(void) ceph_msg_data_advance(msg->data, (size_t)ret);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if (do_datacrc)
 | 
						if (do_datacrc)
 | 
				
			||||||
		con->in_data_crc = crc;
 | 
							con->in_data_crc = crc;
 | 
				
			||||||
| 
						 | 
					@ -2947,44 +2947,80 @@ void ceph_con_keepalive(struct ceph_connection *con)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
EXPORT_SYMBOL(ceph_con_keepalive);
 | 
					EXPORT_SYMBOL(ceph_con_keepalive);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static void ceph_msg_data_init(struct ceph_msg_data *data)
 | 
					static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	data->type = CEPH_MSG_DATA_NONE;
 | 
						struct ceph_msg_data *data;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if (WARN_ON(!ceph_msg_data_type_valid(type)))
 | 
				
			||||||
 | 
							return NULL;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						data = kzalloc(sizeof (*data), GFP_NOFS);
 | 
				
			||||||
 | 
						if (data)
 | 
				
			||||||
 | 
							data->type = type;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return data;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static void ceph_msg_data_destroy(struct ceph_msg_data *data)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						if (!data)
 | 
				
			||||||
 | 
							return;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if (data->type == CEPH_MSG_DATA_PAGELIST) {
 | 
				
			||||||
 | 
							ceph_pagelist_release(data->pagelist);
 | 
				
			||||||
 | 
							kfree(data->pagelist);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						kfree(data);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
 | 
					void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
 | 
				
			||||||
		size_t length, size_t alignment)
 | 
							size_t length, size_t alignment)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
 | 
						struct ceph_msg_data *data;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	BUG_ON(!pages);
 | 
						BUG_ON(!pages);
 | 
				
			||||||
	BUG_ON(!length);
 | 
						BUG_ON(!length);
 | 
				
			||||||
	BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
 | 
						BUG_ON(msg->data != NULL);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	msg->data.type = CEPH_MSG_DATA_PAGES;
 | 
						data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
 | 
				
			||||||
	msg->data.pages = pages;
 | 
						BUG_ON(!data);
 | 
				
			||||||
	msg->data.length = length;
 | 
						data->pages = pages;
 | 
				
			||||||
	msg->data.alignment = alignment & ~PAGE_MASK;
 | 
						data->length = length;
 | 
				
			||||||
 | 
						data->alignment = alignment & ~PAGE_MASK;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						msg->data = data;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
EXPORT_SYMBOL(ceph_msg_data_set_pages);
 | 
					EXPORT_SYMBOL(ceph_msg_data_set_pages);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
 | 
					void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
 | 
				
			||||||
				struct ceph_pagelist *pagelist)
 | 
									struct ceph_pagelist *pagelist)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
 | 
						struct ceph_msg_data *data;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	BUG_ON(!pagelist);
 | 
						BUG_ON(!pagelist);
 | 
				
			||||||
	BUG_ON(!pagelist->length);
 | 
						BUG_ON(!pagelist->length);
 | 
				
			||||||
	BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
 | 
						BUG_ON(msg->data != NULL);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	msg->data.type = CEPH_MSG_DATA_PAGELIST;
 | 
						data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
 | 
				
			||||||
	msg->data.pagelist = pagelist;
 | 
						BUG_ON(!data);
 | 
				
			||||||
 | 
						data->pagelist = pagelist;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						msg->data = data;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
 | 
					EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
 | 
					void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	BUG_ON(!bio);
 | 
						struct ceph_msg_data *data;
 | 
				
			||||||
	BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	msg->data.type = CEPH_MSG_DATA_BIO;
 | 
						BUG_ON(!bio);
 | 
				
			||||||
	msg->data.bio = bio;
 | 
						BUG_ON(msg->data != NULL);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
 | 
				
			||||||
 | 
						BUG_ON(!data);
 | 
				
			||||||
 | 
						data->bio = bio;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						msg->data = data;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
EXPORT_SYMBOL(ceph_msg_data_set_bio);
 | 
					EXPORT_SYMBOL(ceph_msg_data_set_bio);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -3008,8 +3044,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 | 
				
			||||||
	INIT_LIST_HEAD(&m->list_head);
 | 
						INIT_LIST_HEAD(&m->list_head);
 | 
				
			||||||
	kref_init(&m->kref);
 | 
						kref_init(&m->kref);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ceph_msg_data_init(&m->data);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	/* front */
 | 
						/* front */
 | 
				
			||||||
	m->front_max = front_len;
 | 
						m->front_max = front_len;
 | 
				
			||||||
	if (front_len) {
 | 
						if (front_len) {
 | 
				
			||||||
| 
						 | 
					@ -3163,14 +3197,8 @@ void ceph_msg_last_put(struct kref *kref)
 | 
				
			||||||
		ceph_buffer_put(m->middle);
 | 
							ceph_buffer_put(m->middle);
 | 
				
			||||||
		m->middle = NULL;
 | 
							m->middle = NULL;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if (ceph_msg_has_data(m)) {
 | 
						ceph_msg_data_destroy(m->data);
 | 
				
			||||||
		if (m->data.type == CEPH_MSG_DATA_PAGELIST) {
 | 
						m->data = NULL;
 | 
				
			||||||
			ceph_pagelist_release(m->data.pagelist);
 | 
					 | 
				
			||||||
			kfree(m->data.pagelist);
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		memset(&m->data, 0, sizeof m->data);
 | 
					 | 
				
			||||||
		ceph_msg_data_init(&m->data);
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (m->pool)
 | 
						if (m->pool)
 | 
				
			||||||
		ceph_msgpool_put(m->pool, m);
 | 
							ceph_msgpool_put(m->pool, m);
 | 
				
			||||||
| 
						 | 
					@ -3182,7 +3210,7 @@ EXPORT_SYMBOL(ceph_msg_last_put);
 | 
				
			||||||
void ceph_msg_dump(struct ceph_msg *msg)
 | 
					void ceph_msg_dump(struct ceph_msg *msg)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
 | 
						pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
 | 
				
			||||||
		 msg->front_max, msg->data.length);
 | 
							 msg->front_max, msg->data->length);
 | 
				
			||||||
	print_hex_dump(KERN_DEBUG, "header: ",
 | 
						print_hex_dump(KERN_DEBUG, "header: ",
 | 
				
			||||||
		       DUMP_PREFIX_OFFSET, 16, 1,
 | 
							       DUMP_PREFIX_OFFSET, 16, 1,
 | 
				
			||||||
		       &msg->hdr, sizeof(msg->hdr), true);
 | 
							       &msg->hdr, sizeof(msg->hdr), true);
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in a new issue