forked from mirrors/linux
		
	libceph: have messages point to their connection
When a ceph message is queued for sending it is placed on a list of pending messages (ceph_connection->out_queue). When they are actually sent over the wire, they are moved from that list to another (ceph_connection->out_sent). When acknowledgement for the message is received, it is removed from the sent messages list. During that entire time the message is "in the possession" of a single ceph connection. Keep track of that connection in the message. This will be used in the next patch (and is a helpful bit of information for debugging anyway). Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Sage Weil <sage@inktank.com>
This commit is contained in:
		
							parent
							
								
									1c20f2d267
								
							
						
					
					
						commit
						38941f8031
					
				
					 2 changed files with 28 additions and 2 deletions
				
			
		| 
						 | 
					@ -77,7 +77,10 @@ struct ceph_msg {
 | 
				
			||||||
	unsigned nr_pages;              /* size of page array */
 | 
						unsigned nr_pages;              /* size of page array */
 | 
				
			||||||
	unsigned page_alignment;        /* io offset in first page */
 | 
						unsigned page_alignment;        /* io offset in first page */
 | 
				
			||||||
	struct ceph_pagelist *pagelist; /* instead of pages */
 | 
						struct ceph_pagelist *pagelist; /* instead of pages */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						struct ceph_connection *con;
 | 
				
			||||||
	struct list_head list_head;
 | 
						struct list_head list_head;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	struct kref kref;
 | 
						struct kref kref;
 | 
				
			||||||
	struct bio  *bio;		/* instead of pages/pagelist */
 | 
						struct bio  *bio;		/* instead of pages/pagelist */
 | 
				
			||||||
	struct bio  *bio_iter;		/* bio iterator */
 | 
						struct bio  *bio_iter;		/* bio iterator */
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -414,6 +414,9 @@ static int con_close_socket(struct ceph_connection *con)
 | 
				
			||||||
static void ceph_msg_remove(struct ceph_msg *msg)
 | 
					static void ceph_msg_remove(struct ceph_msg *msg)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	list_del_init(&msg->list_head);
 | 
						list_del_init(&msg->list_head);
 | 
				
			||||||
 | 
						BUG_ON(msg->con == NULL);
 | 
				
			||||||
 | 
						msg->con = NULL;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ceph_msg_put(msg);
 | 
						ceph_msg_put(msg);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
static void ceph_msg_remove_list(struct list_head *head)
 | 
					static void ceph_msg_remove_list(struct list_head *head)
 | 
				
			||||||
| 
						 | 
					@ -433,6 +436,8 @@ static void reset_connection(struct ceph_connection *con)
 | 
				
			||||||
	ceph_msg_remove_list(&con->out_sent);
 | 
						ceph_msg_remove_list(&con->out_sent);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (con->in_msg) {
 | 
						if (con->in_msg) {
 | 
				
			||||||
 | 
							BUG_ON(con->in_msg->con != con);
 | 
				
			||||||
 | 
							con->in_msg->con = NULL;
 | 
				
			||||||
		ceph_msg_put(con->in_msg);
 | 
							ceph_msg_put(con->in_msg);
 | 
				
			||||||
		con->in_msg = NULL;
 | 
							con->in_msg = NULL;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -625,8 +630,10 @@ static void prepare_write_message(struct ceph_connection *con)
 | 
				
			||||||
			&con->out_temp_ack);
 | 
								&con->out_temp_ack);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						BUG_ON(list_empty(&con->out_queue));
 | 
				
			||||||
	m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
 | 
						m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
 | 
				
			||||||
	con->out_msg = m;
 | 
						con->out_msg = m;
 | 
				
			||||||
 | 
						BUG_ON(m->con != con);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/* put message on sent list */
 | 
						/* put message on sent list */
 | 
				
			||||||
	ceph_msg_get(m);
 | 
						ceph_msg_get(m);
 | 
				
			||||||
| 
						 | 
					@ -1806,6 +1813,8 @@ static int read_partial_message(struct ceph_connection *con)
 | 
				
			||||||
				"error allocating memory for incoming message";
 | 
									"error allocating memory for incoming message";
 | 
				
			||||||
			return -ENOMEM;
 | 
								return -ENOMEM;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							BUG_ON(con->in_msg->con != con);
 | 
				
			||||||
		m = con->in_msg;
 | 
							m = con->in_msg;
 | 
				
			||||||
		m->front.iov_len = 0;    /* haven't read it yet */
 | 
							m->front.iov_len = 0;    /* haven't read it yet */
 | 
				
			||||||
		if (m->middle)
 | 
							if (m->middle)
 | 
				
			||||||
| 
						 | 
					@ -1901,6 +1910,8 @@ static void process_message(struct ceph_connection *con)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct ceph_msg *msg;
 | 
						struct ceph_msg *msg;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						BUG_ON(con->in_msg->con != con);
 | 
				
			||||||
 | 
						con->in_msg->con = NULL;
 | 
				
			||||||
	msg = con->in_msg;
 | 
						msg = con->in_msg;
 | 
				
			||||||
	con->in_msg = NULL;
 | 
						con->in_msg = NULL;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -2260,6 +2271,8 @@ static void ceph_fault(struct ceph_connection *con)
 | 
				
			||||||
	con_close_socket(con);
 | 
						con_close_socket(con);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (con->in_msg) {
 | 
						if (con->in_msg) {
 | 
				
			||||||
 | 
							BUG_ON(con->in_msg->con != con);
 | 
				
			||||||
 | 
							con->in_msg->con = NULL;
 | 
				
			||||||
		ceph_msg_put(con->in_msg);
 | 
							ceph_msg_put(con->in_msg);
 | 
				
			||||||
		con->in_msg = NULL;
 | 
							con->in_msg = NULL;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -2378,6 +2391,8 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/* queue */
 | 
						/* queue */
 | 
				
			||||||
	mutex_lock(&con->mutex);
 | 
						mutex_lock(&con->mutex);
 | 
				
			||||||
 | 
						BUG_ON(msg->con != NULL);
 | 
				
			||||||
 | 
						msg->con = con;
 | 
				
			||||||
	BUG_ON(!list_empty(&msg->list_head));
 | 
						BUG_ON(!list_empty(&msg->list_head));
 | 
				
			||||||
	list_add_tail(&msg->list_head, &con->out_queue);
 | 
						list_add_tail(&msg->list_head, &con->out_queue);
 | 
				
			||||||
	dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
 | 
						dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
 | 
				
			||||||
| 
						 | 
					@ -2403,13 +2418,16 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	mutex_lock(&con->mutex);
 | 
						mutex_lock(&con->mutex);
 | 
				
			||||||
	if (!list_empty(&msg->list_head)) {
 | 
						if (!list_empty(&msg->list_head)) {
 | 
				
			||||||
		dout("con_revoke %p msg %p - was on queue\n", con, msg);
 | 
							dout("%s %p msg %p - was on queue\n", __func__, con, msg);
 | 
				
			||||||
		list_del_init(&msg->list_head);
 | 
							list_del_init(&msg->list_head);
 | 
				
			||||||
 | 
							BUG_ON(msg->con == NULL);
 | 
				
			||||||
 | 
							msg->con = NULL;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		ceph_msg_put(msg);
 | 
							ceph_msg_put(msg);
 | 
				
			||||||
		msg->hdr.seq = 0;
 | 
							msg->hdr.seq = 0;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if (con->out_msg == msg) {
 | 
						if (con->out_msg == msg) {
 | 
				
			||||||
		dout("con_revoke %p msg %p - was sending\n", con, msg);
 | 
							dout("%s %p msg %p - was sending\n", __func__, con, msg);
 | 
				
			||||||
		con->out_msg = NULL;
 | 
							con->out_msg = NULL;
 | 
				
			||||||
		if (con->out_kvec_is_msg) {
 | 
							if (con->out_kvec_is_msg) {
 | 
				
			||||||
			con->out_skip = con->out_kvec_bytes;
 | 
								con->out_skip = con->out_kvec_bytes;
 | 
				
			||||||
| 
						 | 
					@ -2478,6 +2496,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 | 
				
			||||||
	if (m == NULL)
 | 
						if (m == NULL)
 | 
				
			||||||
		goto out;
 | 
							goto out;
 | 
				
			||||||
	kref_init(&m->kref);
 | 
						kref_init(&m->kref);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						m->con = NULL;
 | 
				
			||||||
	INIT_LIST_HEAD(&m->list_head);
 | 
						INIT_LIST_HEAD(&m->list_head);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	m->hdr.tid = 0;
 | 
						m->hdr.tid = 0;
 | 
				
			||||||
| 
						 | 
					@ -2598,6 +2618,8 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
 | 
				
			||||||
		mutex_unlock(&con->mutex);
 | 
							mutex_unlock(&con->mutex);
 | 
				
			||||||
		con->in_msg = con->ops->alloc_msg(con, hdr, &skip);
 | 
							con->in_msg = con->ops->alloc_msg(con, hdr, &skip);
 | 
				
			||||||
		mutex_lock(&con->mutex);
 | 
							mutex_lock(&con->mutex);
 | 
				
			||||||
 | 
							if (con->in_msg)
 | 
				
			||||||
 | 
								con->in_msg->con = con;
 | 
				
			||||||
		if (skip)
 | 
							if (skip)
 | 
				
			||||||
			con->in_msg = NULL;
 | 
								con->in_msg = NULL;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -2611,6 +2633,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
 | 
				
			||||||
			       type, front_len);
 | 
								       type, front_len);
 | 
				
			||||||
			return false;
 | 
								return false;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							con->in_msg->con = con;
 | 
				
			||||||
		con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
 | 
							con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
 | 
						memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in a new issue