Commit 915880ff authored by Charlie Jacobsen's avatar Charlie Jacobsen
Browse files

lcd: First draft of libfipc implementation.

Moved old implementation into old2.
parent 45c40870
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/miscdevice.h>
#include <linux/fs.h>
#include <linux/string.h>
#include <linux/slab.h>
#include <linux/sched.h>
#include <linux/irqflags.h>
#include <linux/kthread.h>
#include <linux/cpumask.h>
#include <linux/preempt.h>
#include <asm/uaccess.h>
#include <asm/mwait.h>
#include <asm/page_types.h>
#include <asm/cpufeature.h>
#include <linux/ktime.h>
#include <linux/sort.h>
#include <asm/tsc.h>
#include <lcd-domains/thc.h>
#include <lcd-domains/thcinternal.h>
#include <lcd-domains/awe-mapper.h>
#include "../ring-chan/ring-channel.h"
#include "ipc.h"
/* Legacy slot-status sentinels from the old async IPC implementation.
 * A tx slot cycles: tx_slot_avail -> send_message (see send()) ->
 * trans_complete (see transaction_complete()).
 * NOTE(review): rx_msg_avail shares send_message's bit pattern (0xBADBEEF)
 * and trans_complete shares tx_slot_avail's (0xC1346BAD), so "message sent"
 * and "rx available" are the same value, as are "tx available" and
 * "transaction complete" -- presumably intentional; confirm before reuse. */
static unsigned int tx_slot_avail = 0xC1346BAD;
static unsigned int send_message = 0xBADBEEF;
static unsigned int rx_msg_avail = 0xBADBEEF;
static unsigned int trans_complete = 0xC1346BAD;
/*
 * ipc.c
 *
 * Authors: Anton Burtsev, Scotty Bauer
 * Date: October 2011, February 2015
 *
 * Copyright: University of Utah
 */
/**
 * get_awe_from_msg_id -- Recover an awe pointer smuggled through a message id.
 * @msg_id: an awe_t* that was previously cast to unsigned long
 *
 * The async IPC layer passes awe pointers around as integer message ids;
 * this is the inverse cast. The round trip is only lossless when
 * unsigned long and awe_t* are the same width, so warn (but still cast)
 * if they are not.
 */
awe_t* get_awe_from_msg_id(unsigned long msg_id)
{
	if (sizeof(unsigned long) != sizeof(awe_t*))
		printk(KERN_WARNING "mismatched sizes in get_awe_from_msg_id\n");

	return (awe_t*)msg_id;
}
EXPORT_SYMBOL(get_awe_from_msg_id);
#include <libfipc.h>
/**
 * monitor_mwait -- Arm a hardware monitor on @rax and mwait on it.
 * @rcx: extensions argument passed through to __mwait
 * @rax: address to monitor for writes (the slot status word)
 * @wait_type: mwait hint (e.g. target C-state), passed through to __mwait
 *
 * Used by the wait loops to sleep until another CPU writes the monitored
 * status word, instead of spinning.
 */
static inline void monitor_mwait(unsigned long rcx, volatile uint32_t *rax,
				unsigned long wait_type)
{
	__monitor((void *)rax, 0, 0);
	/* NOTE(review): barrier between arming the monitor and mwait;
	 * presumably orders the monitor setup against the subsequent
	 * status re-check -- TODO confirm and document precisely. */
	mb();
	__mwait(wait_type, rcx);
}
#define FIPC_MSG_STATUS_AVAILABLE 0xdeaddeadUL
#define FIPC_MSG_STATUS_SENT 0xfeedfeedUL
static inline int check_rx_slot_available(struct ipc_message *loc)
static inline unsigned long get_tx_slot(struct fipc_ring_channel *rc)
{
return (likely(loc->msg_status != rx_msg_avail));
return rc->tx.slot;
}
static inline int check_tx_slot_available(struct ipc_message *loc)
static inline unsigned long get_rx_slot(struct fipc_ring_channel *rc)
{
return (unlikely(loc->msg_status != tx_slot_avail));
return rc->rx.slot;
}
static int wait_for_tx_slot(struct ipc_message *imsg)
static inline void set_tx_slot(struct fipc_ring_channel *rc, unsigned long num)
{
while (check_tx_slot_available(imsg)) {
#if defined(USE_MWAIT)
cpu_relax();
monitor_mwait(ecx, &imsg->msg_status, cstate_wait);
#endif//usemwait
#if defined(POLL)
cpu_relax();
#endif
}
return 0;
rc->tx.slot = num;
}
static int wait_for_rx_slot(struct ipc_message *imsg)
static inline void set_rx_slot(struct fipc_ring_channel *rc, unsigned long num)
{
while (check_rx_slot_available(imsg)) { //while a slot is not available
#if defined(USE_MWAIT)
monitor_mwait(ecx, &imsg->msg_status, cstate_wait);
#endif//usemwait
#if defined(POLL)
cpu_relax();
#endif
}
return 0;
rc->rx.slot = num;
}
/* Post-increment the tx counter, returning its pre-increment value.
 * The counter is never masked here; indexing masks on use. */
static inline unsigned long inc_tx_slot(struct fipc_ring_channel *rc)
{
	return (rc->tx.slot++);
}
/* Post-increment the rx counter, returning its pre-increment value. */
static inline unsigned long inc_rx_slot(struct fipc_ring_channel *rc)
{
	return (rc->rx.slot++);
}
/* Current tx message slot: counter masked into the power-of-2 buffer. */
static inline struct fipc_message*
get_current_tx_slot(struct fipc_ring_channel *rc)
{
	unsigned long idx = rc->tx.slot & rc->tx.order_two_mask;
	return &rc->tx.buffer[idx];
}
/* Current rx message slot: counter masked into the power-of-2 buffer. */
static inline struct fipc_message*
get_current_rx_slot(struct fipc_ring_channel *rc)
{
	unsigned long idx = rc->rx.slot & rc->rx.order_two_mask;
	return &rc->rx.buffer[idx];
}
/* Nonzero when @slot holds a message ready to be received.
 *
 * Fix: the draft returned (status != FIPC_MSG_STATUS_SENT), i.e. true when
 * there is NO message -- the negated sense of the old wait loops. The call
 * site (recv_msg_peek) treats nonzero as "go ahead and receive", so the
 * correct condition is status == FIPC_MSG_STATUS_SENT. Parameter type also
 * fixed from the old struct ipc_message to struct fipc_message, matching
 * its sibling below. */
static inline int check_rx_slot_available(struct fipc_message *slot)
{
	return (likely(slot->msg_status == FIPC_MSG_STATUS_SENT));
}
/* Nonzero when @slot is free and a new message may be written into it.
 *
 * Fix: same inversion as above -- the draft returned
 * unlikely(status != FIPC_MSG_STATUS_AVAILABLE), but fipc_send_msg_start
 * proceeds when this is nonzero, so true must mean "slot free". */
static inline int check_tx_slot_available(struct fipc_message *slot)
{
	return likely(slot->msg_status == FIPC_MSG_STATUS_AVAILABLE);
}
struct task_struct * attach_channels_to_thread(ttd_ring_channel_group_t *chan_group,
int CPU_PIN,
int (*threadfn)(void *data))
static inline unsigned long nr_slots(unsigned int buf_order)
{
chan_group->thread = attach_data_to_channel((void *)chan_group, CPU_PIN, threadfn);
return chan_group->thread;
return (1UL << buf_order) / sizeof(struct fipc_ring_buf);
}
EXPORT_SYMBOL(attach_channels_to_thread);
static inline unsigned long order_two_mask(unsigned int buf_order)
{
return nr_slots(buf_order) - 1;
}
int fipc_prep_buffers(unsigned int buf_order, void *buffer_1, void *buffer_2);
{
unsigned long i;
struct ipc_message *msg_buffer_1 = buffer_1;
struct ipc_message *msg_buffer_2 = buffer_2;
/*
* Buffers must be at least as big as one ipc message slot
*/
if ((1UL << buf_order) < sizeof(struct fipc_message))
return -EINVAL;
/*
* Initialize slots as available
*/
for (i = 0; i < nr_slots(buf_order); i++) {
msg_buffer_1[i].msg_status = FIPC_MSG_STATUS_AVAILABLE;
msg_buffer_2[i].msg_status = FIPC_MSG_STATUS_AVAILABLE;
}
return 0;
}
/* Initialize one direction (tx or rx) of a channel header: its lock,
 * buffer pointer, and index mask derived from the buffer size. */
static void ring_buf_init(struct fipc_ring_buf *ring_buf,
			unsigned int buf_order,
			void *buffer)
{
	fipc_mutex_init(&ring_buf->lock);
	ring_buf->buffer = buffer;
	ring_buf->order_two_mask = order_two_mask(buf_order);
}
/**
 * fipc_ring_channel_init -- Initialize a thread-private channel header.
 * @chnl: the header to initialize (fully overwritten)
 * @buf_order: each buffer is 2^buf_order bytes
 * @buffer_tx: shared buffer this side transmits into
 * @buffer_rx: shared buffer this side receives from
 *
 * Returns -EINVAL if the buffers cannot hold even one message.
 */
int fipc_ring_channel_init(struct fipc_ring_channel *chnl,
			unsigned int buf_order,
			void *buffer_tx, void *buffer_rx)
{
	/*
	 * Checks at compile time: the slot math assumes both structs are
	 * exactly one cache line, and the line size is a power of 2.
	 */
	BUILD_BUG_ON_NOT_POWER_OF_2(FIPC_CACHE_LINE_SIZE);
	BUILD_BUG_ON(sizeof(struct fipc_ring_buf) != FIPC_CACHE_LINE_SIZE);
	BUILD_BUG_ON(sizeof(struct fipc_message) != FIPC_CACHE_LINE_SIZE);
	/*
	 * Buffers must be as big as one ipc message slot
	 */
	if ((1UL << buf_order) < sizeof(struct fipc_message))
		return -EINVAL;
	/*
	 * Initialize tx and rx
	 */
	memset(chnl, 0, sizeof(*chnl));
	ring_buf_init(&chnl->tx, buf_order, buffer_tx);
	ring_buf_init(&chnl->rx, buf_order, buffer_rx);
	return 0;
}
/*
* Create a channel with a ring-buffer of size pages
*/
struct ttd_ring_channel *create_channel(unsigned long size_pages)
int fipc_send_msg_start(struct fipc_ring_channel *chnl,
struct fipc_message **msg)
{
int ret = -EWOULDBLOCK;
int i,ret;
struct ttd_ring_channel *channel;
fipc_mutex_lock(&chnl->tx.lock);
if (check_tx_slot_available(get_current_tx_slot(chnl))) {
*msg = get_current_tx_slot(chnl);
inc_tx_slot(chnl);
ret = 0;
if (((sizeof(unsigned long) * CHAR_BITS) -
(__builtin_clzl(size_pages*PAGE_SIZE)-1)) % 2 != 0) {
pr_err("buffers _MUST_ be on order 2 size, ie 2^2 or 2^4 etc");
return NULL;
}
channel = kzalloc(sizeof(*channel), GFP_KERNEL);
fipc_mutex_unlock(&chnl->tx.lock);
if (!channel) {
pr_err("could not alloc space for channel");
return NULL;
}
return ret;
}
ret = ttd_ring_channel_alloc(channel,
size_pages,
sizeof(struct ipc_message));
int fipc_send_msg_end(struct fipc_ring_channel *chnl,
struct fipc_message *msg)
{
msg->msg_status = FIPC_MSG_STATUS_SENT;
return 0;
}
if (ret != 0) {
pr_err("Failed to alloc/Init ring channel\n");
return NULL;
}
/* Expects rx to be locked! */
static int recv_msg_peek(struct fipc_ring_channel *chnl,
struct fipc_message **msg)
{
int ret = -EWOULDBLOCK;
pr_debug("Channel is at %p, recs are %p to %p\n", (void*)channel,
channel->tx.recs,
channel->tx.recs + (size_pages * PAGE_SIZE));
if (check_rx_slot_available(get_current_rx_slot(chnl))) {
/* We init the buffer to say each slot is free */
for (i = 0; i < (size_pages * PAGE_SIZE)/sizeof(int); i++)
*((int *)channel->tx.recs+i) = tx_slot_avail;
*msg = get_current_rx_slot(chnl);
ret = 0;
return channel;
}
EXPORT_SYMBOL(create_channel);
}
void free_channel(struct ttd_ring_channel *channel)
{
ttd_ring_channel_free(channel);
kfree(channel);
return ret;
}
EXPORT_SYMBOL(free_channel);
void free_thread(struct task_struct *thread)
int fipc_recv_msg_start(struct fipc_ring_channel *chnl,
struct fipc_message **msg)
{
put_task_struct(thread);
}
EXPORT_SYMBOL(free_thread);
int ret;
struct fipc_message *m;
fipc_mutex_lock(&chnl->rx.lock);
void send(struct ttd_ring_channel *tx, struct ipc_message *trans)
{
trans->msg_status = send_message;
inc_tx_slot(tx);
}
EXPORT_SYMBOL(send);
ret = recv_msg_peek(chnl, &m);
if (!ret) {
/* Message waiting to be received */
*msg = m;
inc_rx_slot(chnl);
}
struct ipc_message *recv(struct ttd_ring_channel *rx)
{
struct ipc_message *recv_msg;
fipc_mutex_unlock(&chnl->rx.lock);
recv_msg = get_rx_rec(rx, sizeof(struct ipc_message));
inc_rx_slot(rx);
wait_for_rx_slot(recv_msg);
return recv_msg;
return ret;
}
EXPORT_SYMBOL(recv);
/*
Takes an array of rx channels to iterate over. This function does one
loop over the array and populates 'msg' with a received message and returns true
if there is a message, else it returns false.
curr_ind: the index to start iterating and wrap around to. The value of this when
the function is finished will be the index of the ipc that has a message.
NOTE: right now this just checks the first rx slot for each channel that previously didn't have a message.
To make this check for everything where there could be a message, it would need to check the interval [rx, tx]
*/
bool poll_recv(struct ttd_ring_channel_group* rx_group, int* curr_ind, struct ipc_message** msg)
int fipc_recv_msg_if(struct fipc_ring_channel *chnl,
int (*pred)(struct fipc_message *, void *),
void *data,
struct fipc_message **msg)
{
struct ttd_ring_channel* curr_chan;
struct ipc_message *recv_msg;
int i;
for( i = 0; i < rx_group->chans_length; i++ )
{
*curr_ind = ((*curr_ind) + i) % (rx_group->chans_length);
curr_chan = rx_group->chans[*curr_ind];
recv_msg = get_rx_rec(curr_chan, sizeof(struct ipc_message));
if( !check_rx_slot_available(recv_msg) ) //if message exists
{
*msg = recv_msg;
if( recv_msg->msg_type == msg_type_request )
{
inc_rx_slot(curr_chan);
}
return true;
}
}
return false;
}
EXPORT_SYMBOL(poll_recv);
int ret;
struct fipc_message *m;
noinline struct ipc_message *async_recv(struct ttd_ring_channel *rx, unsigned long msg_id)
{
struct ipc_message *recv_msg;
while( true )
{
recv_msg = get_rx_rec(rx, sizeof(struct ipc_message));
if( !check_rx_slot_available(recv_msg) ) //if slot is available
{
if( recv_msg->msg_id == msg_id )
{
break;
}
else
{
if( recv_msg->msg_type == msg_type_response )
{
printk(KERN_ERR "CALLING YIELD TO\n");
THCYieldToId((uint32_t) recv_msg->msg_id, (uint32_t) msg_id);
}
else
{
THCYieldAndSave((uint32_t) msg_id);
}
}
}
else
{
THCYieldAndSave((uint32_t) msg_id);
fipc_mutex_lock(&chnl->rx.lock);
ret = recv_msg_peek(chnl, &m);
if (!ret) {
/* Message waiting to be received; query predicate */
if (pred(m, data)) {
/* Caller wants the message */
*msg = m;
inc_rx_slot(chnl);
} else {
ret = -ENOMSG;
}
}
printk(KERN_ERR "REMOVING ID: %d\n", (uint32_t) msg_id);
awe_mapper_remove_id((uint32_t)msg_id);
inc_rx_slot(rx);
return recv_msg;
}
EXPORT_SYMBOL(async_recv);
fipc_mutex_unlock(&chnl->rx.lock);
struct ipc_message *get_send_slot(struct ttd_ring_channel *tx)
{
struct ipc_message *msg =
(struct ipc_message *) get_tx_rec(tx, sizeof(struct ipc_message));
wait_for_tx_slot(msg);
return msg;
return ret;
}
EXPORT_SYMBOL(get_send_slot);
void connect_channels(struct ttd_ring_channel *chan1,
struct ttd_ring_channel *chan2)
int fipc_recv_msg_end(struct fipc_ring_channel *chnl,
struct fipc_message *msg)
{
/* exchange pointers and sizes */
memcpy(&chan1->rx, &chan2->tx, sizeof(struct ttd_buf));
memcpy(&chan2->rx, &chan1->tx, sizeof(struct ttd_buf));
msg->msg_status = FIPC_MSG_STATUS_AVAILABLE;
return 0;
}
EXPORT_SYMBOL(connect_channels);
/**
 * fipc_init -- Library-wide initialization; call before any other libfipc use.
 *
 * Currently a no-op (returns 0); kept so init code (caches, etc.) can be
 * added later without changing callers.
 */
int fipc_init(void)
{
	return 0;
}
/**
 * fipc_fini -- Library-wide teardown; call when finished with libfipc.
 *
 * Currently a no-op, mirroring fipc_init.
 */
void fipc_fini(void)
{
	return;
}
......@@ -50,7 +50,7 @@
*
* There are few steps:
*
* 1 - Allocate the shared memory buffers
* 1 - Allocate and initialize the shared memory buffers
*
* 2 - Allocate the headers (struct fipc_ring_channel's). These
* can be statically allocated (e.g. global variables).
......@@ -61,7 +61,7 @@
* share them with Thread 2 (how this is done depends on the environment).
* Thread 1 and Thread 2 will allocate their private headers, and initialize
* them to point to the allocated memory buffers. Here is how this looks
* for Thread 1 using the libfipc interface:
* for Thread 1 using the libfipc interface (return values ignored):
*
* --------
* Thread 1
......@@ -70,14 +70,17 @@
* struct fipc_ring_channel t1_chnl_header;
*
* // Allocate shared memory buffers
* unsigned int buf_nr_pages_order = .. buffers are 2^buf_nr_pages_order ..
* unsigned int buf_order = .. buffers are 2^buf_order bytes ..
* char *buffer_1 = ... alloc memory buffer 1 ...
 * char *buffer_2 = ... alloc memory buffer 2 ...
*
* // Initialize the shared buffers (*required*)
* fipc_prep_buffers(buf_order, buffer_1, buffer_2);
*
* .... share buffers with Thread 2 ...
*
* // Initialize my struct fipc_ring_channel
* fipc_ring_channel_init(&t1_chnl_header, buf_nr_pages_order,
* // Initialize my struct fipc_ring_channel header
* fipc_ring_channel_init(&t1_chnl_header, buf_order,
* buffer_1,
* buffer_2);
*
......@@ -151,6 +154,8 @@
*
* This should be invoked before any use of libfipc functions.
*
* Returns non-zero on initialization failure.
*
* Note: This is a no-op for now, but gives us a chance later to have
* some init code if necessary (internal caches, whatever).
*/
......@@ -161,6 +166,21 @@ int fipc_init(void);
* This should be invoked when finished using libfipc functions.
*/
void fipc_fini(void);
/**
* fipc_prep_buffers -- Initialize the shared memory buffers for ipc
* @buf_order: both buffers are 2^buf_order bytes
* @buffer_1, @buffer_2: buffers used for channel
*
* This *must* be called by exactly one of the sides of the channel
* before using the channel (probably the same thread that allocated the
* buffers themselves should call this). It initializes the slots in
* the shared buffers.
*
* @buffer_1 and @buffer_2 *must* be exactly 2^buf_order bytes (if not,
* your memory will be corrupted), and the buffers must be big enough
* to fit at least one struct fipc_message.
*/
int fipc_prep_buffers(unsigned int buf_order, void *buffer_1, void *buffer_2);
/**
* fipc_ring_channel_init -- Initialize ring channel header with buffers
* @chnl: the struct fipc_ring_channel to initialize
......
......@@ -8,6 +8,13 @@
#ifndef LIBFIPC_TYPES_H
#define LIBFIPC_TYPES_H
#include <libfipc_platform_types.h>
/**
* Assumed cacheline size, in bytes.
*/
#define FIPC_CACHE_LINE_SIZE 64
/**
* struct fipc_message
*
......@@ -17,7 +24,7 @@
* to track the status of individual message slots in the IPC ring buffer.
*
* XXX: This probably needs to be arch-specific in order to fit in a
* cache line.
* cache line, and to ensure that msg_status can be updated atomically.
*
* XXX: The size of the message must be a power of 2.
*/
......@@ -48,36 +55,44 @@ struct fipc_message {
* above.)
*