buffered_file.c 6.11 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
/*
 * QEMU buffered QEMUFile
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 */

#include "qemu-common.h"
#include "hw/hw.h"
#include "qemu-timer.h"
#include "sysemu.h"
#include "qemu-char.h"
#include "buffered_file.h"

//#define DEBUG_BUFFERED_FILE

typedef struct QEMUFileBuffered
{
    BufferedPutFunc *put_buffer;
    BufferedPutReadyFunc *put_ready;
    BufferedWaitForUnfreezeFunc *wait_for_unfreeze;
    BufferedCloseFunc *close;
    void *opaque;
    QEMUFile *file;
    int has_error;
    int freeze_output;
    size_t bytes_xfer;
    size_t xfer_limit;
    uint8_t *buffer;
    size_t buffer_size;
    size_t buffer_capacity;
    QEMUTimer *timer;
} QEMUFileBuffered;

#ifdef DEBUG_BUFFERED_FILE
malc's avatar
malc committed
42
#define DPRINTF(fmt, ...) \
43 44
    do { printf("buffered-file: " fmt, ## __VA_ARGS__); } while (0)
#else
malc's avatar
malc committed
45
#define DPRINTF(fmt, ...) \
46 47 48 49 50 51 52 53 54
    do { } while (0)
#endif

static void buffered_append(QEMUFileBuffered *s,
                            const uint8_t *buf, size_t size)
{
    if (size > (s->buffer_capacity - s->buffer_size)) {
        void *tmp;

malc's avatar
malc committed
55
        DPRINTF("increasing buffer capacity from %zu by %zu\n",
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
                s->buffer_capacity, size + 1024);

        s->buffer_capacity += size + 1024;

        tmp = qemu_realloc(s->buffer, s->buffer_capacity);
        if (tmp == NULL) {
            fprintf(stderr, "qemu file buffer expansion failed\n");
            exit(1);
        }

        s->buffer = tmp;
    }

    memcpy(s->buffer + s->buffer_size, buf, size);
    s->buffer_size += size;
}

static void buffered_flush(QEMUFileBuffered *s)
{
    size_t offset = 0;

    if (s->has_error) {
malc's avatar
malc committed
78
        DPRINTF("flush when error, bailing\n");
79 80 81
        return;
    }

malc's avatar
malc committed
82
    DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
83 84 85 86 87 88 89

    while (offset < s->buffer_size) {
        ssize_t ret;

        ret = s->put_buffer(s->opaque, s->buffer + offset,
                            s->buffer_size - offset);
        if (ret == -EAGAIN) {
malc's avatar
malc committed
90
            DPRINTF("backend not ready, freezing\n");
91 92 93 94 95
            s->freeze_output = 1;
            break;
        }

        if (ret <= 0) {
malc's avatar
malc committed
96
            DPRINTF("error flushing data, %zd\n", ret);
97 98 99
            s->has_error = 1;
            break;
        } else {
malc's avatar
malc committed
100
            DPRINTF("flushed %zd byte(s)\n", ret);
101 102 103 104
            offset += ret;
        }
    }

malc's avatar
malc committed
105
    DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
106 107 108 109 110 111 112 113 114 115
    memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
    s->buffer_size -= offset;
}

static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
{
    QEMUFileBuffered *s = opaque;
    int offset = 0;
    ssize_t ret;

malc's avatar
malc committed
116
    DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
117 118

    if (s->has_error) {
malc's avatar
malc committed
119
        DPRINTF("flush when error, bailing\n");
120 121 122
        return -EINVAL;
    }

malc's avatar
malc committed
123
    DPRINTF("unfreezing output\n");
124 125 126 127 128 129
    s->freeze_output = 0;

    buffered_flush(s);

    while (!s->freeze_output && offset < size) {
        if (s->bytes_xfer > s->xfer_limit) {
malc's avatar
malc committed
130
            DPRINTF("transfer limit exceeded when putting\n");
131 132 133 134 135
            break;
        }

        ret = s->put_buffer(s->opaque, buf + offset, size - offset);
        if (ret == -EAGAIN) {
malc's avatar
malc committed
136
            DPRINTF("backend not ready, freezing\n");
137 138 139 140 141
            s->freeze_output = 1;
            break;
        }

        if (ret <= 0) {
malc's avatar
malc committed
142
            DPRINTF("error putting\n");
143 144 145 146 147
            s->has_error = 1;
            offset = -EINVAL;
            break;
        }

malc's avatar
malc committed
148
        DPRINTF("put %zd byte(s)\n", ret);
149 150 151 152 153
        offset += ret;
        s->bytes_xfer += ret;
    }

    if (offset >= 0) {
malc's avatar
malc committed
154
        DPRINTF("buffering %d bytes\n", size - offset);
155 156 157 158 159 160 161 162 163 164 165 166
        buffered_append(s, buf + offset, size - offset);
        offset = size;
    }

    return offset;
}

static int buffered_close(void *opaque)
{
    QEMUFileBuffered *s = opaque;
    int ret;

malc's avatar
malc committed
167
    DPRINTF("closing\n");
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200

    while (!s->has_error && s->buffer_size) {
        buffered_flush(s);
        if (s->freeze_output)
            s->wait_for_unfreeze(s);
    }

    ret = s->close(s->opaque);

    qemu_del_timer(s->timer);
    qemu_free_timer(s->timer);
    qemu_free(s->buffer);
    qemu_free(s);

    return ret;
}

static int buffered_rate_limit(void *opaque)
{
    QEMUFileBuffered *s = opaque;

    if (s->has_error)
        return 0;

    if (s->freeze_output)
        return 1;

    if (s->bytes_xfer > s->xfer_limit)
        return 1;

    return 0;
}

201 202 203 204 205 206 207 208 209 210 211 212 213
static size_t buffered_set_rate_limit(void *opaque, size_t new_rate)
{
    QEMUFileBuffered *s = opaque;

    if (s->has_error)
        goto out;

    s->xfer_limit = new_rate / 10;
    
out:
    return s->xfer_limit;
}

lirans@il.ibm.com's avatar
lirans@il.ibm.com committed
214 215 216 217 218 219 220
static size_t buffered_get_rate_limit(void *opaque)
{
    QEMUFileBuffered *s = opaque;
  
    return s->xfer_limit;
}

221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
static void buffered_rate_tick(void *opaque)
{
    QEMUFileBuffered *s = opaque;

    if (s->has_error)
        return;

    qemu_mod_timer(s->timer, qemu_get_clock(rt_clock) + 100);

    if (s->freeze_output)
        return;

    s->bytes_xfer = 0;

    buffered_flush(s);

    /* Add some checks around this */
    s->put_ready(s->opaque);
}

QEMUFile *qemu_fopen_ops_buffered(void *opaque,
                                  size_t bytes_per_sec,
                                  BufferedPutFunc *put_buffer,
                                  BufferedPutReadyFunc *put_ready,
                                  BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
                                  BufferedCloseFunc *close)
{
    QEMUFileBuffered *s;

    s = qemu_mallocz(sizeof(*s));

    s->opaque = opaque;
    s->xfer_limit = bytes_per_sec / 10;
    s->put_buffer = put_buffer;
    s->put_ready = put_ready;
    s->wait_for_unfreeze = wait_for_unfreeze;
    s->close = close;

    s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
260
                             buffered_close, buffered_rate_limit,
lirans@il.ibm.com's avatar
lirans@il.ibm.com committed
261 262
                             buffered_set_rate_limit,
			     buffered_get_rate_limit);
263 264 265 266 267 268 269

    s->timer = qemu_new_timer(rt_clock, buffered_rate_tick, s);

    qemu_mod_timer(s->timer, qemu_get_clock(rt_clock) + 100);

    return s->file;
}