thread-pool.c 9.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * QEMU block layer thread pool
 *
 * Copyright IBM, Corp. 2008
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
#include "qemu-common.h"
18 19 20
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/osdep.h"
21
#include "block/coroutine.h"
22
#include "trace.h"
23
#include "block/thread-pool.h"
24
#include "qemu/main-loop.h"
25

26
static void do_spawn_thread(ThreadPool *pool);
27 28 29 30 31 32 33 34 35 36 37

typedef struct ThreadPoolElement ThreadPoolElement;

/* Lifecycle of a request: QUEUED -> ACTIVE -> DONE when a worker runs it,
 * or QUEUED -> DONE directly when it is cancelled before being picked up.
 */
enum ThreadState {
    THREAD_QUEUED = 0,  /* on pool->request_list, not claimed by a worker */
    THREAD_ACTIVE = 1,  /* a worker thread is running req->func */
    THREAD_DONE   = 2,  /* req->ret is valid; waiting for completion_bh */
};

struct ThreadPoolElement {
    BlockDriverAIOCB common;
38
    ThreadPool *pool;
39 40
    ThreadPoolFunc *func;
    void *arg;
41 42 43 44 45

    /* Moving state out of THREAD_QUEUED is protected by lock.  After
     * that, only the worker thread can write to it.  Reads and writes
     * of state and ret are ordered with memory barriers.
     */
46 47 48 49 50 51 52 53 54 55
    enum ThreadState state;
    int ret;

    /* Access to this list is protected by lock.  */
    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* Access to this list is protected by the global mutex.  */
    QLIST_ENTRY(ThreadPoolElement) all;
};

56
struct ThreadPool {
57
    AioContext *ctx;
58
    QEMUBH *completion_bh;
59
    QemuMutex lock;
60
    QemuCond worker_stopped;
61 62 63 64 65 66 67 68 69 70 71 72 73
    QemuSemaphore sem;
    int max_threads;
    QEMUBH *new_thread_bh;

    /* The following variables are only accessed from one AioContext. */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock.  */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
74
    bool stopping;
75 76 77
};

static void *worker_thread(void *opaque)
78
{
79 80 81 82 83
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    pool->pending_threads--;
    do_spawn_thread(pool);
84

85
    while (!pool->stopping) {
86 87 88 89
        ThreadPoolElement *req;
        int ret;

        do {
90 91 92 93 94 95
            pool->idle_threads++;
            qemu_mutex_unlock(&pool->lock);
            ret = qemu_sem_timedwait(&pool->sem, 10000);
            qemu_mutex_lock(&pool->lock);
            pool->idle_threads--;
        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
96
        if (ret == -1 || pool->stopping) {
97 98 99
            break;
        }

100 101
        req = QTAILQ_FIRST(&pool->request_list);
        QTAILQ_REMOVE(&pool->request_list, req, reqs);
102
        req->state = THREAD_ACTIVE;
103
        qemu_mutex_unlock(&pool->lock);
104 105 106 107

        ret = req->func(req->arg);

        req->ret = ret;
108 109 110 111
        /* Write ret before state.  */
        smp_wmb();
        req->state = THREAD_DONE;

112
        qemu_mutex_lock(&pool->lock);
113

114
        qemu_bh_schedule(pool->completion_bh);
115 116
    }

117
    pool->cur_threads--;
118
    qemu_cond_signal(&pool->worker_stopped);
119
    qemu_mutex_unlock(&pool->lock);
120 121 122
    return NULL;
}

123
static void do_spawn_thread(ThreadPool *pool)
124 125 126 127
{
    QemuThread t;

    /* Runs with lock taken.  */
128
    if (!pool->new_threads) {
129 130 131
        return;
    }

132 133
    pool->new_threads--;
    pool->pending_threads++;
134

135
    qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED);
136 137 138 139
}

static void spawn_thread_bh_fn(void *opaque)
{
140 141 142 143 144
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    do_spawn_thread(pool);
    qemu_mutex_unlock(&pool->lock);
145 146
}

147
static void spawn_thread(ThreadPool *pool)
148
{
149 150
    pool->cur_threads++;
    pool->new_threads++;
151 152 153 154 155 156 157
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
158 159
    if (!pool->pending_threads) {
        qemu_bh_schedule(pool->new_thread_bh);
160 161 162
    }
}

163
static void thread_pool_completion_bh(void *opaque)
164
{
165
    ThreadPool *pool = opaque;
166 167 168
    ThreadPoolElement *elem, *next;

restart:
169
    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
170
        if (elem->state != THREAD_DONE) {
171 172 173
            continue;
        }
        if (elem->state == THREAD_DONE) {
174 175
            trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                       elem->ret);
176 177 178
        }
        if (elem->state == THREAD_DONE && elem->common.cb) {
            QLIST_REMOVE(elem, all);
179 180
            /* Read state before ret.  */
            smp_rmb();
181 182 183 184 185 186

            /* Schedule ourselves in case elem->common.cb() calls aio_poll() to
             * wait for another request that completed at the same time.
             */
            qemu_bh_schedule(pool->completion_bh);

187
            elem->common.cb(elem->common.opaque, elem->ret);
188
            qemu_aio_unref(elem);
189 190 191 192
            goto restart;
        } else {
            /* remove the request */
            QLIST_REMOVE(elem, all);
193
            qemu_aio_unref(elem);
194 195 196 197 198 199 200
        }
    }
}

/* cancel_async hook for thread-pool requests.
 *
 * Only a request that is still THREAD_QUEUED, and whose semaphore token can
 * be taken without blocking, is cancelled.  It is completed with -ECANCELED
 * by the completion bottom half.  Otherwise nothing is changed and the
 * request completes through the worker as usual.
 */
static void thread_pool_cancel(BlockDriverAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    trace_thread_pool_cancel(elem, elem->common.opaque);

    qemu_mutex_lock(&pool->lock);
    if (elem->state == THREAD_QUEUED &&
        /* No thread has yet started working on elem. we can try to "steal"
         * the item from the worker if we can get a signal from the
         * semaphore.  Because this is non-blocking, we can do it with
         * the lock taken and ensure that elem will remain THREAD_QUEUED.
         */
        qemu_sem_timedwait(&pool->sem, 0) == 0) {
        QTAILQ_REMOVE(&pool->request_list, elem, reqs);

        /* Same protocol as worker_thread(): publish ret before state, so the
         * completion bottom half never sees THREAD_DONE with a stale ret.
         * Schedule the bottom half only after the element is marked done,
         * so that a run starting right away cannot skip it.
         */
        elem->ret = -ECANCELED;
        smp_wmb();
        elem->state = THREAD_DONE;
        qemu_bh_schedule(pool->completion_bh);
    }
    qemu_mutex_unlock(&pool->lock);
}

/* get_aio_context hook: a request belongs to the AioContext of its pool. */
static AioContext *thread_pool_get_aio_context(BlockDriverAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;
    return pool->ctx;
}

230
static const AIOCBInfo thread_pool_aiocb_info = {
231
    .aiocb_size         = sizeof(ThreadPoolElement),
232 233
    .cancel_async       = thread_pool_cancel,
    .get_aio_context    = thread_pool_get_aio_context,
234 235
};

236 237
BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
        ThreadPoolFunc *func, void *arg,
238 239 240 241
        BlockDriverCompletionFunc *cb, void *opaque)
{
    ThreadPoolElement *req;

242
    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
243 244 245
    req->func = func;
    req->arg = arg;
    req->state = THREAD_QUEUED;
246
    req->pool = pool;
247

248
    QLIST_INSERT_HEAD(&pool->head, req, all);
249

250
    trace_thread_pool_submit(pool, req, arg);
251

252 253 254
    qemu_mutex_lock(&pool->lock);
    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
        spawn_thread(pool);
255
    }
256 257 258
    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
    qemu_mutex_unlock(&pool->lock);
    qemu_sem_post(&pool->sem);
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
    return &req->common;
}

/* State shared by thread_pool_submit_co() and thread_pool_co_cb(): the
 * coroutine to wake up and the slot that receives the request's result.
 */
typedef struct ThreadPoolCo ThreadPoolCo;
struct ThreadPoolCo {
    Coroutine *co;  /* coroutine waiting in thread_pool_submit_co() */
    int ret;        /* result stored by thread_pool_co_cb() */
};

/* Completion callback used by thread_pool_submit_co(): record the result,
 * then re-enter the coroutine that is waiting for it.
 */
static void thread_pool_co_cb(void *opaque, int ret)
{
    ThreadPoolCo *waiter = opaque;

    waiter->ret = ret;
    qemu_coroutine_enter(waiter->co, NULL);
}

275 276
int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
                                       void *arg)
277 278 279
{
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
    assert(qemu_in_coroutine());
280
    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
281 282 283 284
    qemu_coroutine_yield();
    return tpc.ret;
}

285
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
286
{
287
    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
288 289
}

290 291 292 293 294 295 296
static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    if (!ctx) {
        ctx = qemu_get_aio_context();
    }

    memset(pool, 0, sizeof(*pool));
297
    pool->ctx = ctx;
298
    pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
299
    qemu_mutex_init(&pool->lock);
300
    qemu_cond_init(&pool->worker_stopped);
301 302 303 304 305 306 307 308
    qemu_sem_init(&pool->sem, 0);
    pool->max_threads = 64;
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);
}

309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
ThreadPool *thread_pool_new(AioContext *ctx)
{
    ThreadPool *pool = g_new(ThreadPool, 1);
    thread_pool_init_one(pool, ctx);
    return pool;
}

/* Stop all worker threads and free pool.  A NULL pool is ignored.
 *
 * Every request must already have been reaped (pool->head empty).
 */
void thread_pool_free(ThreadPool *pool)
{
    if (!pool) {
        return;
    }

    assert(QLIST_EMPTY(&pool->head));

    qemu_mutex_lock(&pool->lock);

    /* Stop new threads from spawning: drop the not-yet-created backlog from
     * cur_threads so that only threads that actually exist are waited for.
     */
    qemu_bh_delete(pool->new_thread_bh);
    pool->cur_threads -= pool->new_threads;
    pool->new_threads = 0;

    /* Wait for worker threads to terminate.  Each post wakes a worker,
     * which sees 'stopping', exits and signals worker_stopped.
     */
    pool->stopping = true;
    while (pool->cur_threads > 0) {
        qemu_sem_post(&pool->sem);
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }

    qemu_mutex_unlock(&pool->lock);

    qemu_bh_delete(pool->completion_bh);
    qemu_sem_destroy(&pool->sem);
    qemu_cond_destroy(&pool->worker_stopped);
    qemu_mutex_destroy(&pool->lock);
    g_free(pool);
}