diff --git a/net/rds/connection.c b/net/rds/connection.c
index 5bb0eec5ada3ee8e3c6bab3bf73298e6a0067584..89871db77f8fbc0dfe0d58ce3677b26677dfbac1 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -148,9 +148,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
 	spin_lock_init(&conn->c_lock);
 	conn->c_next_tx_seq = 1;
 
-	spin_lock_init(&conn->c_send_lock);
-	atomic_set(&conn->c_send_generation, 1);
-	atomic_set(&conn->c_senders, 0);
+	init_waitqueue_head(&conn->c_waitq);
 	INIT_LIST_HEAD(&conn->c_send_queue);
 	INIT_LIST_HEAD(&conn->c_retrans);
 
@@ -275,15 +273,8 @@ void rds_conn_shutdown(struct rds_connection *conn)
 		}
 		mutex_unlock(&conn->c_cm_lock);
 
-		/* verify everybody's out of rds_send_xmit() */
-		spin_lock_irq(&conn->c_send_lock);
-		spin_unlock_irq(&conn->c_send_lock);
-
-		while(atomic_read(&conn->c_senders)) {
-			schedule_timeout(1);
-			spin_lock_irq(&conn->c_send_lock);
-			spin_unlock_irq(&conn->c_send_lock);
-		}
+		wait_event(conn->c_waitq,
+			   !test_bit(RDS_IN_XMIT, &conn->c_flags));
 
 		conn->c_trans->conn_shutdown(conn);
 		rds_conn_reset(conn);
@@ -477,8 +468,8 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
 		sizeof(cinfo->transport));
 	cinfo->flags = 0;
 
-	rds_conn_info_set(cinfo->flags,
-			  spin_is_locked(&conn->c_send_lock), SENDING);
+	rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
+			  SENDING);
 	/* XXX Future: return the state rather than these funky bits */
 	rds_conn_info_set(cinfo->flags,
 			  atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
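
The rds_conn_shutdown() change above swaps the old lock-bounce plus c_senders polling loop for a single sleep: shutdown waits on c_waitq until the transmit path has dropped RDS_IN_XMIT. A rough user-space model of that flag-plus-waitqueue handshake, using C11 atomics and pthreads, follows; every name in it (in_xmit, waitq, xmit_thread) is illustrative and not taken from the RDS code.

	/*
	 * Sketch only: a "shutdown" thread sleeps until an in-xmit flag is
	 * clear, the way rds_conn_shutdown() now waits on c_waitq for
	 * RDS_IN_XMIT.  Not the kernel implementation.
	 */
	#include <pthread.h>
	#include <stdatomic.h>
	#include <stdbool.h>
	#include <stdio.h>
	#include <unistd.h>

	static atomic_bool in_xmit;		/* stands in for RDS_IN_XMIT */
	static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
	static pthread_cond_t waitq = PTHREAD_COND_INITIALIZER;	/* c_waitq */

	static void *xmit_thread(void *arg)
	{
		(void)arg;
		atomic_store(&in_xmit, true);	/* like acquire_in_xmit() */
		usleep(100 * 1000);		/* pretend to push messages */
		atomic_store(&in_xmit, false);	/* like release_in_xmit()... */
		pthread_mutex_lock(&lock);	/* ...then wake any waiter */
		pthread_cond_broadcast(&waitq);
		pthread_mutex_unlock(&lock);
		return NULL;
	}

	int main(void)
	{
		pthread_t t;

		pthread_create(&t, NULL, xmit_thread, NULL);
		usleep(10 * 1000);		/* let the sender get going */

		/* shutdown side: sleep until the flag is clear, like wait_event() */
		pthread_mutex_lock(&lock);
		while (atomic_load(&in_xmit))
			pthread_cond_wait(&waitq, &lock);
		pthread_mutex_unlock(&lock);

		printf("xmit path drained, safe to tear the connection down\n");
		pthread_join(t, NULL);
		return 0;
	}

The broadcast after clearing the flag plays the role of release_in_xmit()'s wake_up_all(), and the waiter re-tests the flag after every wakeup, which is what wait_event() does on the kernel side.
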
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 3f91e794eae931ebfd03b111c70d403ad63d3dbb..e88cb4af009b8b7eff61829fc2312dfce569a3a7 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -321,7 +321,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
  * credits (see rds_ib_send_add_credits below).
  *
  * The RDS send code is essentially single-threaded; rds_send_xmit
- * grabs c_send_lock to ensure exclusive access to the send ring.
+ * sets RDS_IN_XMIT to ensure exclusive access to the send ring.
  * However, the ACK sending code is independent and can race with
  * message SENDs.
  *
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 270ded76fd53115c294773d4d57b46f030cc4afe..4510344ce8ca3a527fa6ea46c790ff78b1fd050f 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -80,6 +80,7 @@ enum {
 /* Bits for c_flags */
 #define RDS_LL_SEND_FULL	0
 #define RDS_RECONNECT_PENDING	1
+#define RDS_IN_XMIT		2
 
 struct rds_connection {
 	struct hlist_node	c_hash_node;
@@ -91,9 +92,6 @@ struct rds_connection {
 	struct rds_cong_map	*c_lcong;
 	struct rds_cong_map	*c_fcong;
 
-	spinlock_t		c_send_lock;	/* protect send ring */
-	atomic_t		c_send_generation;
-	atomic_t		c_senders;
 	struct rds_message	*c_xmit_rm;
 	unsigned long		c_xmit_sg;
 	unsigned int		c_xmit_hdr_off;
@@ -120,6 +118,7 @@ struct rds_connection {
 	struct delayed_work	c_conn_w;
 	struct work_struct	c_down_w;
 	struct mutex		c_cm_lock;	/* protect conn state & cm */
+	wait_queue_head_t	c_waitq;
 
 	struct list_head	c_map_item;
 	unsigned long		c_map_queued;
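
rds.h now carries both halves of the new scheme: the RDS_IN_XMIT bit in c_flags, which gives rds_send_xmit() its exclusive claim on the send ring, and c_waitq, which the shutdown path sleeps on until that bit clears. What keeps the two sides from racing is ordering, as the comment added to rds_send_xmit() below spells out: shutdown moves c_state off RDS_CONN_UP and then tests the bit, while the send path sets the bit and then tests the state, so one side always notices the other. A minimal user-space model of that handshake follows; the names and the sequentially consistent atomics are illustrative simplifications, not the kernel code.

	/* Sketch of the shutdown/xmit ordering.  Illustrative only. */
	#include <stdatomic.h>
	#include <stdbool.h>
	#include <stdio.h>

	enum conn_state { CONN_UP, CONN_DISCONNECTING };

	static _Atomic int state = CONN_UP;	/* stands in for c_state */
	static atomic_bool in_xmit;		/* stands in for RDS_IN_XMIT */

	/* xmit side: set the bit first, then test the connection state */
	static bool try_xmit(void)
	{
		if (atomic_exchange(&in_xmit, true))
			return false;			/* someone else is in xmit */
		if (atomic_load(&state) != CONN_UP) {
			atomic_store(&in_xmit, false);	/* shutdown already started */
			return false;
		}
		/* ...push messages; shutdown must wait for us to finish... */
		atomic_store(&in_xmit, false);
		return true;
	}

	/* shutdown side: change the state first, then wait for the bit */
	static void conn_shutdown(void)
	{
		atomic_store(&state, CONN_DISCONNECTING);
		while (atomic_load(&in_xmit))
			;				/* wait_event() stand-in */
		/* no sender is inside the xmit path any more; safe to tear down */
	}

	int main(void)
	{
		printf("xmit while up: %d\n", try_xmit());	/* prints 1 */
		conn_shutdown();
		printf("xmit after shutdown: %d\n", try_xmit());	/* prints 0 */
		return 0;
	}

Either try_xmit() sees the state change and backs out, or the shutdown side sees in_xmit still set and keeps waiting; there is no interleaving in which the send path keeps running after shutdown believes it has drained.
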
diff --git a/net/rds/send.c b/net/rds/send.c
index b9e41afef32343acfda3cc2d05b2044a65358a0a..81471b25373bef8df7d506b6cb6aed7b640e8776 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -53,14 +53,14 @@ module_param(send_batch_count, int, 0444);
 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
 
 /*
- * Reset the send state. Caller must hold c_send_lock when calling here.
+ * Reset the send state.  Callers must ensure that this doesn't race with
+ * rds_send_xmit().
  */
 void rds_send_reset(struct rds_connection *conn)
 {
 	struct rds_message *rm, *tmp;
 	unsigned long flags;
 
-	spin_lock_irqsave(&conn->c_send_lock, flags);
 	if (conn->c_xmit_rm) {
 		rm = conn->c_xmit_rm;
 		conn->c_xmit_rm = NULL;
@@ -69,11 +69,7 @@ void rds_send_reset(struct rds_connection *conn)
 		 * independently) but as the connection is down, there's
 		 * no ongoing RDMA to/from that memory */
 		rds_message_unmapped(rm);
-		spin_unlock_irqrestore(&conn->c_send_lock, flags);
-
 		rds_message_put(rm);
-	} else {
-		spin_unlock_irqrestore(&conn->c_send_lock, flags);
 	}
 
 	conn->c_xmit_sg = 0;
@@ -98,6 +94,25 @@ void rds_send_reset(struct rds_connection *conn)
 	spin_unlock_irqrestore(&conn->c_lock, flags);
 }
 
+static int acquire_in_xmit(struct rds_connection *conn)
+{
+	return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+}
+
+static void release_in_xmit(struct rds_connection *conn)
+{
+	clear_bit(RDS_IN_XMIT, &conn->c_flags);
+	smp_mb__after_clear_bit();
+	/*
+	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+	 * hot path and finding waiters is very rare.  We don't want to walk
+	 * the system-wide hashed waitqueue buckets in the fast path only to
+	 * almost never find waiters.
+	 */
+	if (waitqueue_active(&conn->c_waitq))
+		wake_up_all(&conn->c_waitq);
+}
+
 /*
  * We're making the conscious trade-off here to only send one message
  * down the connection at a time.
@@ -119,12 +134,9 @@ int rds_send_xmit(struct rds_connection *conn)
 	unsigned int tmp;
 	struct scatterlist *sg;
 	int ret = 0;
-	int gen = 0;
 	LIST_HEAD(to_be_dropped);
 
 restart:
-	if (!rds_conn_up(conn))
-		goto out;
 
 	/*
 	 * sendmsg calls here after having queued its message on the send
@@ -133,18 +145,25 @@ restart:
 	 * avoids blocking the caller and trading per-connection data between
 	 * caches per message.
 	 */
-	if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
+	if (!acquire_in_xmit(conn)) {
 		rds_stats_inc(s_send_lock_contention);
 		ret = -ENOMEM;
 		goto out;
 	}
-	atomic_inc(&conn->c_senders);
+
+	/*
+	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT;
+	 * we do the opposite to avoid races.
+	 */
+	if (!rds_conn_up(conn)) {
+		release_in_xmit(conn);
+		ret = 0;
+		goto out;
+	}
 
 	if (conn->c_trans->xmit_prepare)
 		conn->c_trans->xmit_prepare(conn);
 
-	gen = atomic_inc_return(&conn->c_send_generation);
-
 	/*
 	 * spin trying to push headers and data down the connection until
 	 * the connection doesn't make forward progress.
@@ -178,7 +197,7 @@ restart:
 		if (!rm) {
 			unsigned int len;
 
-			spin_lock(&conn->c_lock);
+			spin_lock_irqsave(&conn->c_lock, flags);
 
 			if (!list_empty(&conn->c_send_queue)) {
 				rm = list_entry(conn->c_send_queue.next,
@@ -193,7 +212,7 @@ restart:
 				list_move_tail(&rm->m_conn_item, &conn->c_retrans);
 			}
 
-			spin_unlock(&conn->c_lock);
+			spin_unlock_irqrestore(&conn->c_lock, flags);
 
 			if (!rm)
 				break;
@@ -207,10 +226,10 @@ restart:
 			 */
 			if (rm->rdma.op_active &&
 			    test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
-				spin_lock(&conn->c_lock);
+				spin_lock_irqsave(&conn->c_lock, flags);
 				if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
 					list_move(&rm->m_conn_item, &to_be_dropped);
-				spin_unlock(&conn->c_lock);
+				spin_unlock_irqrestore(&conn->c_lock, flags);
 				continue;
 			}
 
@@ -336,19 +355,7 @@ restart:
 	if (conn->c_trans->xmit_complete)
 		conn->c_trans->xmit_complete(conn);
 
-	/*
-	 * We might be racing with another sender who queued a message but
-	 * backed off on noticing that we held the c_send_lock.  If we check
-	 * for queued messages after dropping the sem then either we'll
-	 * see the queued message or the queuer will get the sem.  If we
-	 * notice the queued message then we trigger an immediate retry.
-	 *
-	 * We need to be careful only to do this when we stopped processing
-	 * the send queue because it was empty.  It's the only way we
-	 * stop processing the loop when the transport hasn't taken
-	 * responsibility for forward progress.
-	 */
-	spin_unlock_irqrestore(&conn->c_send_lock, flags);
+	release_in_xmit(conn);
 
 	/* Nuke any messages we decided not to retransmit. */
 	if (!list_empty(&to_be_dropped)) {
@@ -358,13 +365,12 @@ restart:
 		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
 	}
 
-	atomic_dec(&conn->c_senders);
-
 	/*
-	 * Other senders will see we have c_send_lock and exit. We
-	 * need to recheck the send queue and race again for c_send_lock
-	 * to make sure messages don't just sit on the send queue, if
-	 * somebody hasn't already beat us into the loop.
+	 * Other senders can queue a message after we last test the send queue
+	 * but before we clear RDS_IN_XMIT.  In that case they'd back off and
+	 * not try to send their newly queued message.  We need to check the
+	 * send queue after having cleared RDS_IN_XMIT so that their message
+	 * doesn't get stuck on the send queue.
 	 *
 	 * If the transport cannot continue (i.e ret != 0), then it must
 	 * call us when more room is available, such as from the tx
@@ -374,9 +380,7 @@ restart:
 		smp_mb();
 		if (!list_empty(&conn->c_send_queue)) {
 			rds_stats_inc(s_send_lock_queue_raced);
-			if (gen == atomic_read(&conn->c_send_generation)) {
-				goto restart;
-			}
+			goto restart;
 		}
 	}
 out:
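
Taken together, the send.c changes replace c_send_lock, c_send_generation and c_senders with the single RDS_IN_XMIT bit: rds_send_xmit() try-acquires the bit, drains the send queue, releases the bit, and then re-checks the queue so that a sender who queued a message and backed off on seeing the bit set cannot leave that message stranded. A rough user-space model of the test-and-set exclusion plus the post-release recheck follows; all names in it are illustrative, not from the RDS code.

	/*
	 * Sketch only: one worker at a time drains a queue; producers that
	 * lose the test-and-set back off, so the worker must recheck the
	 * queue after clearing the bit.
	 */
	#include <pthread.h>
	#include <stdatomic.h>
	#include <stdio.h>

	#define NPRODUCERS	4
	#define PER_PRODUCER	10000

	static atomic_flag in_xmit = ATOMIC_FLAG_INIT;	/* stands in for RDS_IN_XMIT */
	static atomic_int queued;			/* "send queue" depth */
	static atomic_int sent;

	static void do_xmit(void)
	{
	restart:
		if (atomic_flag_test_and_set(&in_xmit))
			return;				/* another sender is in xmit */

		while (atomic_load(&queued) > 0) {	/* drain the queue */
			atomic_fetch_sub(&queued, 1);
			atomic_fetch_add(&sent, 1);
		}

		atomic_flag_clear(&in_xmit);		/* like release_in_xmit() */

		/*
		 * A producer may have queued after our last check but before
		 * the clear; recheck so its message doesn't sit on the queue.
		 */
		if (atomic_load(&queued) > 0)
			goto restart;
	}

	static void *producer(void *arg)
	{
		(void)arg;
		for (int i = 0; i < PER_PRODUCER; i++) {
			atomic_fetch_add(&queued, 1);	/* queue a message... */
			do_xmit();			/* ...then try to send it */
		}
		return NULL;
	}

	int main(void)
	{
		pthread_t t[NPRODUCERS];

		for (int i = 0; i < NPRODUCERS; i++)
			pthread_create(&t[i], NULL, producer, NULL);
		for (int i = 0; i < NPRODUCERS; i++)
			pthread_join(t[i], NULL);

		/* nothing stranded: everything that was queued got sent */
		printf("left on queue: %d, sent: %d\n",
		       atomic_load(&queued), atomic_load(&sent));
		return 0;
	}

The goto restart after the post-clear recheck plays the same role as the s_send_lock_queue_raced path above.
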
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 7a8ca7a1d983ff58240e2ccd11c1c457cfa1b3e9..2bab9bf07b91e48536c9202ae98c8b0e225996f2 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -61,7 +61,7 @@
  *
  * Transition to state DISCONNECTING/DOWN:
  *  -	Inside the shutdown worker; synchronizes with xmit path
- *	through c_send_lock, and with connection management callbacks
+ *	through RDS_IN_XMIT, and with connection management callbacks
  *	via c_cm_lock.
  *
  *	For receive callbacks, we rely on the underlying transport