diff --git a/fs/direct-io.c b/fs/direct-io.c
index 71f4aeac7632328ead009dbe7d2411f670e3cf2f..d9d0833444f59da15dc8be42420c13366429d402 100644
--- a/fs/direct-io.c
+++ b/fs/direct-io.c
@@ -121,8 +121,8 @@ struct dio {
 	int page_errors;		/* errno from get_user_pages() */
 
 	/* BIO completion state */
-	atomic_t refcount;		/* direct_io_worker() and bios */
 	spinlock_t bio_lock;		/* protects BIO fields below */
+	unsigned long refcount;		/* direct_io_worker() and bios */
 	struct bio *bio_list;		/* singly linked via bi_private */
 	struct task_struct *waiter;	/* waiting task (NULL if none) */
 
@@ -267,8 +267,8 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio);
 static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error)
 {
 	struct dio *dio = bio->bi_private;
-	int waiter_holds_ref = 0;
-	int remaining;
+	unsigned long remaining;
+	unsigned long flags;
 
 	if (bio->bi_size)
 		return 1;
@@ -276,10 +276,11 @@ static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error)
 	/* cleanup the bio */
 	dio_bio_complete(dio, bio);
 
-	waiter_holds_ref = !!dio->waiter;
-	remaining = atomic_sub_return(1, (&dio->refcount));
-	if (remaining == 1 && waiter_holds_ref)
+	spin_lock_irqsave(&dio->bio_lock, flags);
+	remaining = --dio->refcount;
+	if (remaining == 1 && dio->waiter)
 		wake_up_process(dio->waiter);
+	spin_unlock_irqrestore(&dio->bio_lock, flags);
 
 	if (remaining == 0) {
 		int ret = dio_complete(dio, dio->iocb->ki_pos, 0);
@@ -308,7 +309,7 @@ static int dio_bio_end_io(struct bio *bio, unsigned int bytes_done, int error)
 	spin_lock_irqsave(&dio->bio_lock, flags);
 	bio->bi_private = dio->bio_list;
 	dio->bio_list = bio;
-	if ((atomic_sub_return(1, &dio->refcount) == 1) && dio->waiter)
+	if (--dio->refcount == 1 && dio->waiter)
 		wake_up_process(dio->waiter);
 	spin_unlock_irqrestore(&dio->bio_lock, flags);
 	return 0;
@@ -345,11 +346,17 @@ dio_bio_alloc(struct dio *dio, struct block_device *bdev,
 static void dio_bio_submit(struct dio *dio)
 {
 	struct bio *bio = dio->bio;
+	unsigned long flags;
 
 	bio->bi_private = dio;
-	atomic_inc(&dio->refcount);
+
+	spin_lock_irqsave(&dio->bio_lock, flags);
+	dio->refcount++;
+	spin_unlock_irqrestore(&dio->bio_lock, flags);
+
 	if (dio->is_async && dio->rw == READ)
 		bio_set_pages_dirty(bio);
+
 	submit_bio(dio->rw, bio);
 
 	dio->bio = NULL;
@@ -365,13 +372,6 @@ static void dio_cleanup(struct dio *dio)
 		page_cache_release(dio_get_page(dio));
 }
 
-static int wait_for_more_bios(struct dio *dio)
-{
-	assert_spin_locked(&dio->bio_lock);
-
-	return (atomic_read(&dio->refcount) > 1) && (dio->bio_list == NULL);
-}
-
 /*
  * Wait for the next BIO to complete.  Remove it and return it.  NULL is
  * returned once all BIOs have been completed.  This must only be called once
@@ -384,16 +384,21 @@ static struct bio *dio_await_one(struct dio *dio)
 	struct bio *bio = NULL;
 
 	spin_lock_irqsave(&dio->bio_lock, flags);
-	while (wait_for_more_bios(dio)) {
-		set_current_state(TASK_UNINTERRUPTIBLE);
-		if (wait_for_more_bios(dio)) {
-			dio->waiter = current;
-			spin_unlock_irqrestore(&dio->bio_lock, flags);
-			io_schedule();
-			spin_lock_irqsave(&dio->bio_lock, flags);
-			dio->waiter = NULL;
-		}
-		set_current_state(TASK_RUNNING);
+
+	/*
+	 * Wait as long as the list is empty and there are bios in flight.  bio
+	 * completion drops the count, maybe adds to the list, and wakes while
+	 * holding the bio_lock so we don't need set_current_state()'s barrier
+	 * and can call it after testing our condition.
+	 */
+	while (dio->refcount > 1 && dio->bio_list == NULL) {
+		__set_current_state(TASK_UNINTERRUPTIBLE);
+		dio->waiter = current;
+		spin_unlock_irqrestore(&dio->bio_lock, flags);
+		io_schedule();
+		/* wake up sets us TASK_RUNNING */
+		spin_lock_irqsave(&dio->bio_lock, flags);
+		dio->waiter = NULL;
 	}
 	if (dio->bio_list) {
 		bio = dio->bio_list;
@@ -951,6 +956,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
 	struct dio *dio)
 {
 	unsigned long user_addr; 
+	unsigned long flags;
 	int seg;
 	ssize_t ret = 0;
 	ssize_t ret2;
@@ -981,8 +987,8 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
 	dio->iocb = iocb;
 	dio->i_size = i_size_read(inode);
 
-	atomic_set(&dio->refcount, 1);
 	spin_lock_init(&dio->bio_lock);
+	dio->refcount = 1;
 	dio->bio_list = NULL;
 	dio->waiter = NULL;
 
@@ -1092,12 +1098,20 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
 
 	/*
 	 * Sync will always be dropping the final ref and completing the
-	 * operation.  AIO can if it was a broken operation described above
-	 * or in fact if all the bios race to complete before we get here.
-	 * In that case dio_complete() translates the EIOCBQUEUED into
-	 * the proper return code that the caller will hand to aio_complete().
+	 * operation.  AIO can if it was a broken operation described above or
+	 * in fact if all the bios race to complete before we get here.  In
+	 * that case dio_complete() translates the EIOCBQUEUED into the proper
+	 * return code that the caller will hand to aio_complete().
+	 *
+	 * This is managed by the bio_lock instead of being an atomic_t so that
+	 * completion paths can drop their ref and use the remaining count to
+	 * decide to wake the submission path atomically.
 	 */
-	if (atomic_dec_and_test(&dio->refcount)) {
+	spin_lock_irqsave(&dio->bio_lock, flags);
+	ret2 = --dio->refcount;
+	spin_unlock_irqrestore(&dio->bio_lock, flags);
+	BUG_ON(!dio->is_async && ret2 != 0);
+	if (ret2 == 0) {
 		ret = dio_complete(dio, offset, ret);
 		kfree(dio);
 	} else