/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/kmemcheck.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>

#include <asm/local.h>

static void update_pages_handler(struct work_struct *work);

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	trace_seq_puts(s, "# compressed entry header\n");
	trace_seq_puts(s, "\ttype_len    :    5 bits\n");
	trace_seq_puts(s, "\ttime_delta  :   27 bits\n");
	trace_seq_puts(s, "\tarray       :   32 bits\n");
	trace_seq_putc(s, '\n');
	trace_seq_printf(s, "\tpadding     : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\tdata max type_len  == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return !trace_seq_has_overflowed(s);
}

/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on.  A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do
 * whatever it wants with that page. The writer will never write to that
 * page again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */
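
/*
 * Editor's note: reduced to a sketch, the swap drawn above looks like
 * this (illustrative only; the real work is done later in this file by
 * rb_get_reader_page(), which uses cmpxchg so it can race safely with
 * the writer):
 *
 *	new = cpu_buffer->head_page;
 *	reader->list.next = new->list.next;	// splice the old reader page
 *	reader->list.prev = new->list.prev;	// in where the head page was
 *	new->list.prev->next = &reader->list;
 *	new->list.next->prev = &reader->list;
 *	cpu_buffer->reader_page = new;		// the old head page is now ours
 *
 * Once the old head page is out of the ring, the writer can never touch
 * it again, so the reader (or the splice code) owns it until the next swap.
 */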

/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF		(1 << 20)

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
# define RB_FORCE_8BYTE_ALIGNMENT	0
# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT	1
# define RB_ARCH_ALIGNMENT		8U
#endif

#define RB_ALIGN_DATA		__aligned(RB_ARCH_ALIGNMENT)

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 16,
};

#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

static inline int rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}

/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return  event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		BUG();
	}
	/* not hit */
	return 0;
}
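
/*
 * Editor's note: a worked example of the length encoding handled above
 * (illustrative, not part of the original file).  An 8 byte payload is
 * stored with type_len = 2 (2 * RB_ALIGNMENT), so the whole event takes
 * RB_EVNT_HDR_SIZE + 8 = 12 bytes.  A 200 byte payload does not fit in
 * the 5 bit type_len field, so type_len = 0, the length goes into
 * array[0] and the data starts at array[1].  Anything up to
 * RB_MAX_SMALL_DATA (28 * RB_ALIGNMENT = 112 bytes) avoids that extra
 * length word.
 */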

/*
 * Return total length of time extend and data,
 *   or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
		/* time extends include the data event after it */
		len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return len + rb_event_length(event);
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
		event = skip_time_extend(event);

	length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static void *
rb_event_data(struct ring_buffer_event *event)
{
	if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
		event = skip_time_extend(event);
	BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

struct buffer_data_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 commit;	/* write committed index */
	unsigned char	 data[] RB_ALIGN_DATA;	/* data of buffer page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	struct buffer_data_page *page;	/* Actual data page */
};
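
/*
 * Editor's note (illustrative, not part of the original file): because
 * each buffer_page is allocated at the start of a cache line, a page at
 * e.g. 0xffff880012345680 satisfies
 *
 *	((unsigned long)&bpage->list & 3) == 0
 *
 * which is what leaves the two low bits of the list pointers free for
 * the HEAD/UPDATE flags used by the lockless scheme further down.
 */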

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)
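
/*
 * Editor's note: the helpers below are an illustrative sketch added for
 * clarity; they are not part of the original file.  They show how the
 * 20/12 split above decodes.  For example, a raw write value of
 * 0x00300014 means the next write goes 0x14 bytes into the page and
 * three nested updaters are currently in flight.
 */
static inline unsigned long rb_example_write_index(unsigned long write)
{
	/* low 20 bits: byte index of the next write on the page */
	return write & RB_WRITE_MASK;
}

static inline unsigned long rb_example_write_updaters(unsigned long write)
{
	/* remaining bits: how many nested updaters have bumped the count */
	return write / RB_WRITE_INTCNT;
}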

static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

/**
 * ring_buffer_page_len - the size of data on the page.
 * @page: The page to read
 *
 * Returns the amount of data on the page, including buffer page header.
 */
size_t ring_buffer_page_len(void *page)
{
	return local_read(&((struct buffer_data_page *)page)->commit)
		+ BUF_PAGE_HDR_SIZE;
}

/*
 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 * this issue out.
 */
static void free_buffer_page(struct buffer_page *bpage)
{
	free_page((unsigned long)bpage->page);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline int test_time_stamp(u64 delta)
{
	if (delta & TS_DELTA_TEST)
		return 1;
	return 0;
}
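
/*
 * Editor's note (illustrative): with a nanosecond clock, TS_SHIFT = 27
 * means any delta below 1 << 27 ns (roughly 134 ms) is stored directly
 * in the event header:
 *
 *	test_time_stamp(0x07ffffff)	== 0	(fits, stored inline)
 *	test_time_stamp(0x08000000)	!= 0	(needs a RINGBUF_TYPE_TIME_EXTEND)
 */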

#define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)

/* Max payload is BUF_PAGE_SIZE - header (8bytes) */
#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))

int ring_buffer_print_page_header(struct trace_seq *s)
{
	struct buffer_data_page field;

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 (unsigned int)sizeof(field.commit),
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: int overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 1,
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), data),
			 (unsigned int)BUF_PAGE_SIZE,
			 (unsigned int)is_signed_type(char));

	return !trace_seq_has_overflowed(s);
}

struct rb_irq_work {
	struct irq_work			work;
	wait_queue_head_t		waiters;
	wait_queue_head_t		full_waiters;
	bool				waiters_pending;
	bool				full_waiters_pending;
	bool				wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64			ts;
	u64			delta;
	unsigned long		length;
	struct buffer_page	*tail_page;
	int			add_timestamp;
};

/*
 * Used for which event context the event is in.
 *  NMI     = 0
 *  IRQ     = 1
 *  SOFTIRQ = 2
 *  NORMAL  = 3
 *
 * See trace_recursive_lock() comment below for more details.
 */
enum {
	RB_CTX_NMI,
	RB_CTX_IRQ,
	RB_CTX_SOFTIRQ,
	RB_CTX_NORMAL,
	RB_CTX_MAX
};

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	struct ring_buffer		*buffer;
	raw_spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	unsigned int			nr_pages;
	unsigned int			current_context;
	struct list_head		*pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	local_t				entries_bytes;
	local_t				entries;
	local_t				overrun;
	local_t				commit_overrun;
	local_t				dropped_events;
	local_t				committing;
	local_t				commits;
	unsigned long			read;
	unsigned long			read_bytes;
	u64				write_stamp;
	u64				read_stamp;
	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	int				nr_pages_to_update;
	struct list_head		new_pages; /* new pages to add */
	struct work_struct		update_pages_work;
	struct completion		update_done;

	struct rb_irq_work		irq_work;
};

struct ring_buffer {
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	atomic_t			resize_disabled;
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;

#ifdef CONFIG_HOTPLUG_CPU
	struct notifier_block		cpu_notify;
#endif
	u64				(*clock)(void);

	struct rb_irq_work		irq_work;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	u64				read_stamp;
};

/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Schedules a delayed work to wake up any task that is blocked on the
 * ring buffer waiters queue.
 */
static void rb_wake_up_waiters(struct irq_work *work)
{
	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

	wake_up_all(&rbwork->waiters);
	if (rbwork->wakeup_full) {
		rbwork->wakeup_full = false;
		wake_up_all(&rbwork->full_waiters);
	}
}

/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until a full page is available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 */
int ring_buffer_wait(struct ring_buffer *buffer, int cpu, bool full)
{
	struct ring_buffer_per_cpu *uninitialized_var(cpu_buffer);
	DEFINE_WAIT(wait);
	struct rb_irq_work *work;
	int ret = 0;

	/*
	 * Depending on what the caller is waiting for, either any
	 * data in any cpu buffer, or a specific buffer, put the
	 * caller on the appropriate wait queue.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		work = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = false;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		cpu_buffer = buffer->buffers[cpu];
		work = &cpu_buffer->irq_work;
	}


	while (true) {
		if (full)
			prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
		else
			prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);

		/*
		 * The events can happen in critical sections where
		 * checking a work queue can cause deadlocks.
		 * After adding a task to the queue, this flag is set
		 * only to notify events to try to wake up the queue
		 * using irq_work.
		 *
		 * We don't clear it even if the buffer is no longer
		 * empty. The flag only causes the next event to run
		 * irq_work to do the work queue wake up. The worst
		 * that can happen if we race with !trace_empty() is that
		 * an event will cause an irq_work to try to wake up
		 * an empty queue.
		 *
		 * There's no reason to protect this flag either, as
		 * the work queue and irq_work logic will do the necessary
		 * synchronization for the wake ups. The only thing
		 * that is necessary is that the wake up happens after
		 * a task has been queued. It's OK for spurious wake ups.
		 */
		if (full)
			work->full_waiters_pending = true;
		else
			work->waiters_pending = true;

		if (signal_pending(current)) {
			ret = -EINTR;
			break;
		}

		if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer))
			break;

		if (cpu != RING_BUFFER_ALL_CPUS &&
		    !ring_buffer_empty_cpu(buffer, cpu)) {
			unsigned long flags;
			bool pagebusy;

			if (!full)
				break;

			raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
			pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

			if (!pagebusy)
				break;
		}

		schedule();
	}

	if (full)
		finish_wait(&work->full_waiters, &wait);
	else
		finish_wait(&work->waiters, &wait);

	return ret;
}

/**
 * ring_buffer_poll_wait - poll on buffer input
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @filp: the file descriptor
 * @poll_table: The poll descriptor
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns POLLIN | POLLRDNORM if data exists in the buffers,
 * zero otherwise.
 */
int ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu,
			  struct file *filp, poll_table *poll_table)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *work;

	if (cpu == RING_BUFFER_ALL_CPUS)
		work = &buffer->irq_work;
	else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -EINVAL;

		cpu_buffer = buffer->buffers[cpu];
		work = &cpu_buffer->irq_work;
	}

	poll_wait(filp, &work->waiters, poll_table);
	work->waiters_pending = true;
	/*
	 * There's a tight race between setting the waiters_pending and
	 * checking if the ring buffer is empty.  Once the waiters_pending bit
	 * is set, the next event will wake the task up, but we can get stuck
	 * if there's only a single event in.
	 *
	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
	 * but adding a memory barrier to all events will cause too much of a
	 * performance hit in the fast path.  We only need a memory barrier when
	 * the buffer goes from empty to having content.  But as this race is
	 * extremely small, and it's not a problem if another event comes in, we
	 * will fix it later.
	 */
	smp_mb();

	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
		return POLLIN | POLLRDNORM;
	return 0;
}

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct ring_buffer *buffer)
{
	/* shift to debug/test normalization and TIME_EXTENTS */
	return buffer->clock() << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_no_resched_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things tricky.
 * Writes only happen on the CPU that they are on, so writers
 * only need to worry about interrupts. Reads, however, can
 * happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 * head->list->prev->next        bit 1          bit 0
 *                              -------        -------
 * Normal page                     0              0
 * Points to head page             0              1
 * New head page                   1              0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 *  What the above shows is that the reader just swapped out
 *  the reader page with a page in the buffer, but before it
 *  could make the new header point back to the new page added
 *  it was preempted by a writer. The writer moved forward onto
 *  the new page added by the reader and is about to move forward
 *  again.
 *
 *  As you can see, it is legitimate for the previous pointer of
 *  the head (or any page) not to point back to itself. But only
 *  temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL
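
/*
 * Editor's note: an illustrative sketch (not part of the original file)
 * of how the flag bits are read back.  If the head page sits at
 * 0xffff880012345600, the ->next pointer of the page before it is
 * stored as 0xffff880012345601 (address | RB_PAGE_HEAD).  Masking with
 * RB_FLAG_MASK recovers the state, while rb_list_head() below recovers
 * the real pointer.
 */
static inline unsigned long rb_example_list_flags(struct list_head *list)
{
	/* low two bits of ->next: RB_PAGE_NORMAL, RB_PAGE_HEAD or RB_PAGE_UPDATE */
	return (unsigned long)list->next & RB_FLAG_MASK;
}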

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
				struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(cpu_buffer, head->list.prev);
}

static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(cpu_buffer, &page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

static int rb_head_page_replace(struct buffer_page *old,
				struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;
	unsigned long ret;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	ret = cmpxchg(ptr, val, (unsigned long)&new->list);

	return ret == val;
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page *tail_page,
			       struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * it only can increment when a commit takes place. But that
		 * only happens in the outer most nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Again, either we update tail_page or an interrupt does */
		(void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
	}
}

static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			  struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
		return 1;

	return 0;
}

/**
 * rb_check_list - make sure a pointer to a list has the last bits zero
 */
static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
			 struct list_head *list)
{
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
		return 1;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
		return 1;
	return 0;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	/* Reset the head page if it exists */
	if (cpu_buffer->head_page)
		rb_set_head_page(cpu_buffer);

	rb_head_page_deactivate(cpu_buffer);

	if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
		return -1;
	if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
		return -1;

	if (rb_check_list(cpu_buffer, head))
		return -1;

	list_for_each_entry_safe(bpage, tmp, head, list) {
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.next->prev != &bpage->list))
			return -1;
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.prev->next != &bpage->list))
			return -1;
		if (rb_check_list(cpu_buffer, &bpage->list))
			return -1;
	}

	rb_head_page_activate(cpu_buffer);

	return 0;
}

static int __rb_allocate_pages(int nr_pages, struct list_head *pages, int cpu)
{
	int i;
	struct buffer_page *bpage, *tmp;

	for (i = 0; i < nr_pages; i++) {
		struct page *page;
		/*
		 * __GFP_NORETRY flag makes sure that the allocation fails
		 * gracefully without invoking oom-killer and the system is
		 * not destabilized.
		 */
		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
				    GFP_KERNEL | __GFP_NORETRY,
				    cpu_to_node(cpu));
		if (!bpage)
			goto free_pages;

		list_add(&bpage->list, pages);

		page = alloc_pages_node(cpu_to_node(cpu),
					GFP_KERNEL | __GFP_NORETRY, 0);
		if (!page)
			goto free_pages;
		bpage->page = page_address(page);
		rb_init_page(bpage->page);
	}

	return 0;

free_pages:
	list_for_each_entry_safe(bpage, tmp, pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}

	return -ENOMEM;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned nr_pages)
{
	LIST_HEAD(pages);

	WARN_ON(!nr_pages);

	if (__rb_allocate_pages(nr_pages, &pages, cpu_buffer->cpu))
		return -ENOMEM;

	/*
	 * The ring buffer page list is a circular list that does not
	 * start and end with a list head. All page list items point to
	 * other pages.
	 */
	cpu_buffer->pages = pages.next;
	list_del(&pages);

	cpu_buffer->nr_pages = nr_pages;

	rb_check_pages(cpu_buffer);

	return 0;
}

static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct ring_buffer *buffer, int nr_pages, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage;
	struct page *page;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	raw_spin_lock_init(&cpu_buffer->reader_lock);
	lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
	INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
	init_completion(&cpu_buffer->update_done);
	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			    GFP_KERNEL, cpu_to_node(cpu));
	if (!bpage)
		goto fail_free_buffer;

	rb_check_bpage(cpu_buffer, bpage);

	cpu_buffer->reader_page = bpage;
	page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
	if (!page)
		goto fail_free_reader;
	bpage->page = page_address(page);
	rb_init_page(bpage->page);

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	INIT_LIST_HEAD(&cpu_buffer->new_pages);

	ret = rb_allocate_pages(cpu_buffer, nr_pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	rb_head_page_activate(cpu_buffer);

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	free_buffer_page(cpu_buffer->reader_page);

	rb_head_page_deactivate(cpu_buffer);

	if (head) {
		list_for_each_entry_safe(bpage, tmp, head, list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
		bpage = list_entry(head, struct buffer_page, list);
		free_buffer_page(bpage);
	}

	kfree(cpu_buffer);
}

#ifdef CONFIG_HOTPLUG_CPU
static int rb_cpu_notify(struct notifier_block *self,
			 unsigned long action, void *hcpu);
#endif

/**
 * __ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
					struct lock_class_key *key)
{
	struct ring_buffer *buffer;
	int bsize;
	int cpu, nr_pages;

	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
		goto fail_free_buffer;

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	buffer->flags = flags;
	buffer->clock = trace_clock_local;
	buffer->reader_lock_key = key;

	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&buffer->irq_work.waiters);

	/* need at least two pages */
	if (nr_pages < 2)
		nr_pages = 2;

	/*
	 * In case of non-hotplug cpu, if the ring-buffer is allocated
	 * in early initcall, it will not be notified of secondary cpus.
	 * In that case, we need to allocate for all possible cpus.
	 */
#ifdef CONFIG_HOTPLUG_CPU
	cpu_notifier_register_begin();
	cpumask_copy(buffer->cpumask, cpu_online_mask);