Commit c1baec23 authored by Michael Quigley's avatar Michael Quigley
Browse files

got fully functional async example working

parent cf15758d
......@@ -93,7 +93,7 @@ static struct task_struct *attach_data_to_channel(void *chan_data,
int CPU_PIN,
int (*threadfn)(void *data)) {
struct cpumask cpu_core;
struct task_struct* thread;
if (!chan_data)
return NULL;
......@@ -102,7 +102,7 @@ static struct task_struct *attach_data_to_channel(void *chan_data,
return NULL;
}
struct task_struct* thread = kthread_create(threadfn, chan_data,
thread = kthread_create(threadfn, chan_data,
"AsyncIPC.%u", CPU_PIN);
if (IS_ERR(thread)) {
......@@ -126,7 +126,8 @@ struct task_struct * attach_channels_to_thread(ttd_ring_channel_group_t *chan_gr
int CPU_PIN,
int (*threadfn)(void *data))
{
return attach_data_to_channel((void *)chan_group, CPU_PIN, threadfn);
chan_group->thread = attach_data_to_channel((void *)chan_group, CPU_PIN, threadfn);
return chan_group->thread;
}
EXPORT_SYMBOL(attach_channels_to_thread);
......@@ -227,15 +228,15 @@ the function is finished will be the index of the ipc that has a message.
NOTE: right now this just checks the first rx slot for each channel that previously didn't have a message.
To make this check for everything where there could be a message, it would need to check the interval [rx, tx]
*/
bool poll_recv(struct ttd_ring_channel** rx_chans, int chans_num, int* curr_ind, struct ipc_message** msg)
bool poll_recv(struct ttd_ring_channel_group* rx_group, int* curr_ind, struct ipc_message** msg)
{
struct ttd_ring_channel* curr_chan;
struct ipc_message *recv_msg;
int i;
for( i = 0; i < chans_num; i++ )
for( i = 0; i < rx_group->chans_length; i++ )
{
*curr_ind = ((*curr_ind) + i) % chans_num;
curr_chan = rx_chans[*curr_ind];
*curr_ind = ((*curr_ind) + i) % (rx_group->chans_length);
curr_chan = rx_group->chans[*curr_ind];
recv_msg = get_rx_rec(curr_chan, sizeof(struct ipc_message));
if( !check_rx_slot_available(recv_msg) ) //if message exists
......
......@@ -51,7 +51,7 @@ struct task_struct *attach_channels_to_thread(ttd_ring_channel_group_t *chan_gro
void free_channel(struct ttd_ring_channel *channel);
void send(struct ttd_ring_channel *tx, struct ipc_message *trans);
struct ipc_message *recv(struct ttd_ring_channel *rx);
bool poll_recv(struct ttd_ring_channel** rx_chans, int chans_num, int* curr_ind, struct ipc_message** msg);
bool poll_recv(struct ttd_ring_channel_group* rx_group, int* curr_ind, struct ipc_message** msg);
struct ipc_message *async_recv(struct ttd_ring_channel *rx, unsigned long msg_id);
struct ipc_message *get_send_slot(struct ttd_ring_channel *tx);
void transaction_complete(struct ipc_message *msg);
......
......@@ -17,6 +17,7 @@
#include <linux/string.h>
#include <linux/types.h>
#include <linux/slab.h>
#include "../IPC/ipc.h"
struct ttd_buf {
......@@ -39,11 +40,31 @@ struct ttd_ring_channel {
struct ttd_ring_channel_group
{
struct ttd_ring_channel **chans;
int chans_length;
size_t chans_length;
struct task_struct *thread;
};
static inline void channel_group_alloc(struct ttd_ring_channel_group* channel_group, size_t chans_length)
{
struct ttd_ring_channel **chans_arr = (struct ttd_ring_channel **)kzalloc(
sizeof(struct ttd_ring_channel*)*chans_length,
GFP_KERNEL);
if( !chans_arr )
{
pr_err("could not allocate memory for ring channel group\n");
return;
}
channel_group->chans = chans_arr;
channel_group->chans_length = chans_length;
}
static inline void channel_group_free(struct ttd_ring_channel_group* channel_group)
{
kfree(channel_group->chans);
}
static inline void ttd_ring_channel_init(struct ttd_ring_channel *ring_channel)
{
......
......@@ -9,7 +9,7 @@ CFLAGS_ipc.o = -O2 -DPOLL
CFLAGS_ring-channel.o = -I$(INC_DIR)
DEPS= $(INC_DIR)/ipc.h
async_dispatch-objs := $(ASYNC_SRC)/awe-mapper.o ../../IPC/ipc.o ../../ring-chan/ring-channel.o $(ASYNC_SRC)/thc.o $(ASYNC_SRC)/thcsync.o ipc_dispatch.o thread1_fn1.o thread2_fn1.o async_tester.o
async_dispatch-objs := $(ASYNC_SRC)/awe-mapper.o ../../IPC/ipc.o ../../ring-chan/ring-channel.o $(ASYNC_SRC)/thc.o $(ASYNC_SRC)/thcsync.o ipc_dispatch.o thread1_fn1.o thread2_fn1.o thread3_fn1.o async_tester.o
KDIR := /lib/modules/`uname -r`/build
......
......@@ -21,43 +21,74 @@
#include "thread_fn_util.h"
#include "../../IPC/ipc.h"
#define CHAN_NUM_PAGES 4
#define CHAN1_CPU 1
#define CHAN2_CPU 3
#define THREAD1_CPU 1
#define THREAD2_CPU 3
#define THREAD3_CPU 4
MODULE_LICENSE("GPL");
static struct ttd_ring_channel *chan1;
static struct ttd_ring_channel *chan2;
static struct ttd_ring_channel_group group1;
static struct ttd_ring_channel_group group2;
static struct ttd_ring_channel_group group3;
static void alloc_chans(struct ttd_ring_channel_group* chan_group, int num_pages)
{
int i;
for(i = 0; i < chan_group->chans_length; i++)
{
chan_group->chans[i] = create_channel(num_pages);
if ( !chan_group->chans[i] )
{
pr_err("Failed to create channel\n");
return;
}
}
}
static void setup_tests(void)
{
chan1 = create_channel(CHAN_NUM_PAGES);
if (!chan1) {
pr_err("Failed to create channel 1");
return;
}
chan2 = create_channel(CHAN_NUM_PAGES);
if (!chan2) {
pr_err("Failed to create channel 2");
free_channel(chan1);
return;
}
connect_channels(chan1, chan2);
/* Create a thread for each channel to utilize, pin it to a core.
* Pass a function pointer to call on wakeup.
*/
struct task_struct *thread1 = attach_thread_to_channel(chan1, CHAN1_CPU, thread1_fn1);
struct task_struct *thread2 = attach_thread_to_channel(chan2, CHAN2_CPU, thread2_fn1);
if ( thread1 == NULL || thread2 == NULL ) {
ttd_ring_channel_free(chan1);
ttd_ring_channel_free(chan2);
kfree(chan1);
kfree(chan2);
return;
}
ipc_start_thread(thread1);
ipc_start_thread(thread2);
const size_t thread1_chans = 1;
const size_t thread2_chans = 2;
const size_t thread3_chans = 1;
channel_group_alloc(&group1, thread1_chans);
channel_group_alloc(&group2, thread2_chans);
channel_group_alloc(&group3, thread3_chans);
alloc_chans(&group1, CHAN_NUM_PAGES);
alloc_chans(&group2, CHAN_NUM_PAGES);
alloc_chans(&group3, CHAN_NUM_PAGES);
connect_channels(group1.chans[0], group2.chans[0]);
connect_channels(group2.chans[1], group3.chans[0]);
/* Create a thread for each channel to utilize, pin it to a core.
* Pass a function pointer to call on wakeup.
*/
attach_channels_to_thread(&group1,
THREAD1_CPU,
thread1_fn1);
attach_channels_to_thread(&group2,
THREAD2_CPU,
thread2_fn1);
attach_channels_to_thread(&group3,
THREAD3_CPU,
thread3_fn1);
if ( group1.thread == NULL || group2.thread == NULL || group3.thread == NULL) {
ttd_ring_channel_free(group1.chans[0]);
ttd_ring_channel_free(group2.chans[0]);
ttd_ring_channel_free(group3.chans[0]);
channel_group_free(&group1);
channel_group_free(&group2);
channel_group_free(&group3);
return;
}
ipc_start_thread(group1.thread);
ipc_start_thread(group2.thread);
ipc_start_thread(group3.thread);
}
static int __init async_dispatch_start(void)
......
......@@ -7,7 +7,8 @@
#include "thread_fn_util.h"
int ipc_dispatch_loop(ipc_local_fn_t* fns, int fns_length,struct ttd_ring_channel** rx_chans, int chans_num)
//max_recv_ct just for testing
int ipc_dispatch_loop(ipc_local_fn_t* fns, int fns_length,struct ttd_ring_channel_group* rx_group, int max_recv_ct)
{
volatile void ** frame = (volatile void**)__builtin_frame_address(0);
volatile void *ret_addr = *(frame + 1);
......@@ -15,16 +16,16 @@ int ipc_dispatch_loop(ipc_local_fn_t* fns, int fns_length,struct ttd_ring_channe
*(frame + 1) = NULL;
//NOTE:recv_ct is just for testing
DO_FINISH({
DO_FINISH_(0,{
int curr_ind = 0;
int* curr_ind_pt = &curr_ind;
struct ipc_message* curr_msg;
uint32_t do_finish_awe_id = awe_mapper_create_id();
while( recv_ct < TRANSACTIONS )
while( recv_ct < max_recv_ct )
{
curr_ind = 0;
if( poll_recv(rx_chans, chans_num, curr_ind_pt, &curr_msg) )
if( poll_recv(rx_group, curr_ind_pt, &curr_msg) )
{
recv_ct++;
......@@ -41,11 +42,12 @@ int ipc_dispatch_loop(ipc_local_fn_t* fns, int fns_length,struct ttd_ring_channe
int i;
for(i = 0; i < fns_length; i++)
{
printk(KERN_ERR "fn_type: %d\n", curr_msg->fn_type);
if( curr_msg->fn_type == fns[i].fn_type )
{
printk(KERN_ERR "calling fn: %d\n", fns[i].fn_type);
fns[i].local_fn(rx_chans[*curr_ind_pt], curr_msg, NULL);
ASYNC_({
fns[i].local_fn(rx_group, *curr_ind_pt, curr_msg);
},0xdeadbeef);
}
}
}
......
......@@ -4,8 +4,8 @@
typedef struct ipc_local_fn{
int fn_type;
int (*local_fn)(struct ttd_ring_channel*,struct ipc_message*, void*);
int (*local_fn)(struct ttd_ring_channel_group*, int, struct ipc_message*);
} ipc_local_fn_t;
int ipc_dispatch_loop(ipc_local_fn_t* fns, int fns_length,struct ttd_ring_channel** rx_chans, int chans_num);
int ipc_dispatch_loop(ipc_local_fn_t* fns, int fns_length,struct ttd_ring_channel_group* rx_group, int max_recv_ct);
#endif
......@@ -7,12 +7,12 @@
static struct ttd_ring_channel* channel;
static unsigned long add_2_nums_async(unsigned long lhs, unsigned long rhs, unsigned long msg_id)
static unsigned long add_nums_async(unsigned long lhs, unsigned long rhs, unsigned long msg_id, int fn_type)
{
struct ipc_message *msg;
unsigned long result;
msg = get_send_slot(channel);
msg->fn_type = ADD_2_FN;
msg->fn_type = fn_type;
msg->reg1 = lhs;
msg->reg2 = rhs;
msg->msg_id = msg_id;
......@@ -28,43 +28,43 @@ static unsigned long add_2_nums_async(unsigned long lhs, unsigned long rhs, unsi
int thread1_fn1(void* chan)
int thread1_fn1(void* group)
{
volatile void ** frame = (volatile void**)__builtin_frame_address(0);
volatile void *ret_addr = *(frame + 1);
int num_transactions = 0;
uint32_t id_num;
*(frame + 1) = NULL;
channel = chan;
struct ttd_ring_channel_group *rcg = (struct ttd_ring_channel_group*)group;
channel = rcg->chans[0];
thc_init();
DO_FINISH(
while (num_transactions < TRANSACTIONS / 3) {
if((num_transactions * 3) % 10 == 0)
{
printk(KERN_ERR "num_transactions: %d\n", num_transactions * 3);
}
DO_FINISH_(1,{
while (num_transactions < TRANSACTIONS) {
printk(KERN_ERR "num_transactions: %d\n", num_transactions);
ASYNC(
id_num = awe_mapper_create_id();
printk(KERN_ERR "ID_NUM: %d\n", id_num);
add_2_nums_async(num_transactions, 1,(unsigned long) id_num);
add_nums_async(num_transactions, 1,(unsigned long) id_num, ADD_2_FN);
);
ASYNC(
id_num = awe_mapper_create_id();
printk(KERN_ERR "ID_NUM: %d\n", id_num);
add_2_nums_async(num_transactions, 2,(unsigned long) id_num);
add_nums_async(num_transactions, 2,(unsigned long) id_num, ADD_10_FN);
);
ASYNC(
id_num = awe_mapper_create_id();
printk(KERN_ERR "ID_NUM: %d\n", id_num);
add_2_nums_async(num_transactions, 3,(unsigned long) id_num);
add_nums_async(num_transactions, 3,(unsigned long) id_num, ADD_2_FN);
);
num_transactions++;
});
}});
pr_err("Complete\n");
printk(KERN_ERR "lcd async exiting module and deleting ptstate");
thc_done();
......
......@@ -3,12 +3,14 @@
#include "thcinternal.h"
#include "ipc_dispatch.h"
#include "thread_fn_util.h"
#include "awe-mapper.h"
#define THREAD2_FNS_LENGTH 2
#define FNS_LENGTH 1
#define CHANS_NUM 1
int add_2_fn(struct ttd_ring_channel* chan, struct ipc_message* msg, void* params)
//Just returns a value back to thread 1
static int add_2_fn(struct ttd_ring_channel_group* group, int channel_index, struct ipc_message* msg)
{
struct ttd_ring_channel * chan = group->chans[channel_index];
unsigned long result = msg->reg1 + msg->reg2;
struct ipc_message* out_msg = get_send_slot(chan);
out_msg->reg1 = result;
......@@ -20,15 +22,53 @@ int add_2_fn(struct ttd_ring_channel* chan, struct ipc_message* msg, void* param
return 0;
}
ipc_local_fn_t functions[FNS_LENGTH] = {
{ADD_2_FN, add_2_fn}
//Receives a value from thread1, then passes it to thread 3 and returns that result to thread 1
static int add_10_fn(struct ttd_ring_channel_group* group, int channel_index, struct ipc_message* msg)
{
struct ttd_ring_channel * thread1_chan = group->chans[0];
struct ttd_ring_channel * thread3_chan = group->chans[1];
struct ipc_message* thread3_msg = get_send_slot(thread3_chan);
struct ipc_message* thread1_result;
unsigned long saved_msg_id = msg->msg_id;
unsigned long new_msg_id = awe_mapper_create_id();
if( channel_index != 0 )
{
printk(KERN_ERR "CHANNEL INDEX WRONG\n");
}
thread3_msg->fn_type = msg->fn_type;
thread3_msg->reg1 = msg->reg1;
thread3_msg->reg2 = msg->reg2;
thread3_msg->msg_id = new_msg_id;
thread3_msg->msg_type = msg_type_request;
send(thread3_chan,thread3_msg);
msg = async_recv(thread3_chan, new_msg_id);
thread1_result = get_send_slot(thread1_chan);
thread1_result->fn_type = msg->fn_type;
thread1_result->reg1 = msg->reg1;
thread1_result->reg2 = msg->reg2;
thread1_result->msg_id = saved_msg_id;
thread1_result->msg_type = msg_type_response;
send(thread1_chan,thread1_result);
return 0;
}
static ipc_local_fn_t functions[THREAD2_FNS_LENGTH] = {
{ADD_2_FN, add_2_fn},
{ADD_10_FN, add_10_fn}
};
int thread2_fn1(void* chan)
int thread2_fn1(void* group)
{
struct ttd_ring_channel* rx_chan = chan;
struct ttd_ring_channel_group* rx_group = group;
thc_init();
ipc_dispatch_loop(functions, FNS_LENGTH, &rx_chan, CHANS_NUM);
ipc_dispatch_loop(functions, THREAD2_FNS_LENGTH, rx_group, 3);
thc_done();
return 1;
......
......@@ -2,9 +2,11 @@
#define THREAD_FN_UTIL_H
#define ADD_2_FN 1
#define TRANSACTIONS 6
#define ADD_10_FN 2
#define TRANSACTIONS 1
int thread1_fn1(void* chan);
int thread2_fn1(void* chan);
int thread1_fn1(void* data);
int thread2_fn1(void* data);
int thread3_fn1(void* data);
#endif
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment