Commit 05f39972 authored by Michael Quigley's avatar Michael Quigley
Browse files

got simple test for dispatch loop with two threads working.

parent 889568ba
......@@ -36,6 +36,7 @@ awe_t* get_awe_from_msg_id(unsigned long msg_id)
return (awe_t*)msg_id;
}
EXPORT_SYMBOL(get_awe_from_msg_id);
static inline void monitor_mwait(unsigned long rcx, volatile uint32_t *rax,
unsigned long wait_type)
......@@ -121,6 +122,7 @@ struct task_struct *attach_thread_to_channel(struct ttd_ring_channel *chan,
return chan->thread;
}
EXPORT_SYMBOL(attach_thread_to_channel);
/*
* Create a channel with a ring-buffer of size pages
......@@ -201,25 +203,25 @@ EXPORT_SYMBOL(recv);
Takes an array of rx channels to iterate over. This function does one
loop over the array and populates 'msg' with a received message and returns true
if there is a message, else it returns false.
start_ind: the index to start iterating and wrap around to
curr_ind: the index to start iterating and wrap around to. The value of this when
the function is finished will be the index of the ipc that has a message.
NOTE: right now this just checks the first rx slot for each channel that previously didn't have a message.
To make this check for everything where there could be a message, it would need to check the interval [rx, tx]
*/
bool poll_recv(struct ttd_ring_channel** rx_chans, int chans_num, int start_ind, struct ipc_message* msg)
bool poll_recv(struct ttd_ring_channel** rx_chans, int chans_num, int* curr_ind, struct ipc_message** msg)
{
int curr_ind;
struct ttd_ring_channel* curr_chan;
struct ipc_message *recv_msg;
for( int i = 0; i < chans_num; i++ )
int i;
for( i = 0; i < chans_num; i++ )
{
curr_ind = (start_ind + i) % chans_num;
curr_chan = rx_chans[curr_ind];
*curr_ind = ((*curr_ind) + i) % chans_num;
curr_chan = rx_chans[*curr_ind];
recv_msg = get_rx_rec(curr_chan, sizeof(struct ipc_message));
if( !check_rx_slot_available(imsg) ) //if message exists
if( !check_rx_slot_available(recv_msg) ) //if message exists
{
msg = recv_msg;
*msg = recv_msg;
inc_rx_slot(curr_chan);
return true;
}
......@@ -232,7 +234,7 @@ EXPORT_SYMBOL(poll_recv);
noinline struct ipc_message *async_recv(struct ttd_ring_channel *rx, unsigned long msg_id)
{
struct ipc_message *recv_msg;
void* pts = current->ptstate;
while( true )
{
recv_msg = get_rx_rec(rx, sizeof(struct ipc_message));
......@@ -248,7 +250,14 @@ noinline struct ipc_message *async_recv(struct ttd_ring_channel *rx, unsigned lo
//printk(KERN_ERR "MESSAGE ID RECEIVED IS: %lx\n", recv_msg->msg_id);
//printk(KERN_ERR "MESSAGE ID FOR CONTEXT IS: %lx\n", msg_id);
//printk(KERN_ERR "CALLING YIELD TO\n");
THCYieldToId((uint32_t) recv_msg->msg_id, (uint32_t) msg_id);
if( recv_msg->pts == pts )
{
THCYieldToId((uint32_t) recv_msg->msg_id, (uint32_t) msg_id);
}
else
{
THCYieldAndSave((uint32_t) msg_id);
}
}
}
else
......
......@@ -23,7 +23,11 @@ struct ipc_message{
unsigned long reg3;
unsigned long reg4;
unsigned long reg5;
unsigned long reg6;
#ifdef USE_ASYNC
unsigned long pts;
#else
unsigned long reg6;
#endif
unsigned long msg_id;
volatile uint32_t msg_status;
}__attribute__((packed));
......@@ -35,7 +39,7 @@ struct task_struct *attach_thread_to_channel(struct ttd_ring_channel *chan,
void free_channel(struct ttd_ring_channel *channel);
void send(struct ttd_ring_channel *tx, struct ipc_message *trans);
struct ipc_message *recv(struct ttd_ring_channel *rx);
bool poll_recv(struct ttd_ring_channel** rx_chans, int chans_num, int start_ind, struct ipc_message* msg);
bool poll_recv(struct ttd_ring_channel** rx_chans, int chans_num, int* curr_ind, struct ipc_message** msg);
struct ipc_message *async_recv(struct ttd_ring_channel *rx, unsigned long msg_id);
struct ipc_message *get_send_slot(struct ttd_ring_channel *tx);
void transaction_complete(struct ipc_message *msg);
......
obj-m := async_dispatch.o
INC_BASE_PATH=/local/sda4/xcap-async-module
ASYNC_SRC=../../../../libasync/src/common
ccflags-y += -O0 -I$(INC_BASE_PATH)/fast-ipc-module/current/IPC -I$(INC_BASE_PATH)/libasync/src/include -fno-ipa-cp -fno-ipa-sra -DUSE_ASYNC
EXTRA_CFLAGS=-DDEBUG_OUTPUT
INC_DIR = ../../IPC
CFLAGS_ipc.o = -O2 -DPOLL
CFLAGS_ring-channel.o = -I$(INC_DIR)
DEPS= $(INC_DIR)/ipc.h
async_dispatch-objs := $(ASYNC_SRC)/awe-mapper.o ../../IPC/ipc.o ../../ring-chan/ring-channel.o $(ASYNC_SRC)/thc.o $(ASYNC_SRC)/thcsync.o ipc_dispatch.o thread1_fn1.o thread2_fn1.o async_tester.o
KDIR := /lib/modules/`uname -r`/build
default:
make -C $(KDIR) M=$(PWD) modules
clean:
make -C $(KDIR) M=$(PWD) clean
rm ../../IPC/ipc.o
rm ../../ring-chan/ring-channel.o
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/miscdevice.h>
#include <linux/fs.h>
#include <linux/string.h>
#include <linux/slab.h>
#include <linux/sched.h>
#include <linux/irqflags.h>
#include <linux/kthread.h>
#include <linux/cpumask.h>
#include <linux/preempt.h>
#include <asm/uaccess.h>
#include <asm/mwait.h>
#include <asm/page_types.h>
#include <asm/cpufeature.h>
#include <linux/ktime.h>
#include <linux/sort.h>
#include <asm/tsc.h>
#include "thread_fn_util.h"
#include "../../IPC/ipc.h"
#define CHAN_NUM_PAGES 4
#define CHAN1_CPU 1
#define CHAN2_CPU 3
MODULE_LICENSE("GPL");
static struct ttd_ring_channel *chan1;
static struct ttd_ring_channel *chan2;
static void setup_tests(void)
{
chan1 = create_channel(CHAN_NUM_PAGES);
if (!chan1) {
pr_err("Failed to create channel 1");
return;
}
chan2 = create_channel(CHAN_NUM_PAGES);
if (!chan2) {
pr_err("Failed to create channel 2");
free_channel(chan1);
return;
}
connect_channels(chan1, chan2);
/* Create a thread for each channel to utilize, pin it to a core.
* Pass a function pointer to call on wakeup.
*/
if (attach_thread_to_channel(chan1, CHAN1_CPU, thread1_fn1) == NULL ||
attach_thread_to_channel(chan2, CHAN2_CPU, thread2_fn1) == NULL ) {
ttd_ring_channel_free(chan1);
ttd_ring_channel_free(chan2);
kfree(chan1);
kfree(chan2);
return;
}
ipc_start_thread(chan1);
ipc_start_thread(chan2);
}
static int __init async_dispatch_start(void)
{
#if defined(USE_MWAIT)
if (!this_cpu_has(X86_FEATURE_MWAIT))
{
printk(KERN_ERR "CPU does not have X86_FEATURE_MWAIT ");
return -EPERM;
}
#endif
setup_tests();
return 0;
}
static int __exit async_dispatch_end(void)
{
printk(KERN_ERR "done\n");
return 0;
}
module_init(async_dispatch_start);
module_exit(async_dispatch_end);
#include <stdio.h>
#include <thc.h>
#include <thcinternal.h>
#include "ipc.h"
#include "ipc_dispatch.h"
#include <awe-mapper.h>
#include <linux/types.h>
static int foo(unsigned long* data, int length)
{
for(int i = 0; i < length; i++)
{
printf("%lu\n", data[i]);
}
}
#include "thread_fn_util.h"
int ipc_dispatch_loop(ipc_local_fn_t* fns, int fns_length,struct ttd_ring_channel** rx_chans, int chans_num)
//ipc_channels, channel_length
{
int start_ind = 0;
struct ipc_message* curr_msg;
DO_FINISH(
while( true )
volatile void ** frame = (volatile void**)__builtin_frame_address(0);
volatile void *ret_addr = *(frame + 1);
*(frame + 1) = NULL;
//NOTE:recv_ct is just for testing
int recv_ct = 0;
DO_FINISH({
int curr_ind = 0;
int* curr_ind_pt = &curr_ind;
struct ipc_message* curr_msg;
uint32_t do_finish_awe_id = awe_mapper_create_id();
while( recv_ct < TRANSACTIONS )
{
if( poll_recv(rx_chans, chans_num, start_ind, curr_msg) )
curr_ind = 0;
if( poll_recv(rx_chans, chans_num, curr_ind_pt, &curr_msg) )
{
//check if curr_msg corresponds to existing awe,
recv_ct++;
printk(KERN_ERR "poll_recv returned\n");
//check if curr_msg corresponds to existing awe in this thread
if( curr_msg->pts == current->ptstate )
{
printk(KERN_ERR "yielding to\n");
THCYieldToId(curr_msg->msg_id, do_finish_awe_id);
}
//else find corresponding function and execute.
else
{
int i;
for(i = 0; i < fns_length; i++)
{
printk(KERN_ERR "fn_type: %d\n", curr_msg->fn_type);
if( curr_msg->fn_type == fns[i].fn_type )
{
printk(KERN_ERR "calling fn: %lu\n", fns[i].fn_type);
fns[i].local_fn(rx_chans[*curr_ind_pt], curr_msg, NULL);
}
}
}
}
}
);
return 0;
}
});
*(frame + 1) = ret_addr;
int main(void)
{
ipc_local_fn_t test_fn = {1, foo};
unsigned long data[] = {0,1,2,3};
test_fn.local_fn(&data[1], 2);
return 0;
}
EXPORT_SYMBOL(ipc_dispatch_loop);
#ifndef IPC_DISPATCH_H
#define IPC_DISPATCH_H
typedef struct ipc_local_fn{
unsigned long fn_type;
int (*local_fn)(unsigned long*, int);
int fn_type;
int (*local_fn)(struct ttd_ring_channel*,struct ipc_message*, void*);
} ipc_local_fn_t;
int ipc_dispatch_loop(ipc_local_fn_t* fns, int fns_length,struct ttd_ring_channel** rx_chans, int chans_num);
#endif
#include "thc.h"
#include "ipc.h"
#include "thcinternal.h"
#include "ipc_dispatch.h"
#include "thread_fn_util.h"
#include <awe-mapper.h>
static struct ttd_ring_channel* channel;
static unsigned long add_2_nums_async(unsigned long lhs, unsigned long rhs, unsigned long msg_id)
{
struct ipc_message *msg;
unsigned long result;
msg = get_send_slot(channel);
msg->fn_type = ADD_2_FN;
msg->reg1 = lhs;
msg->reg2 = rhs;
msg->msg_id = msg_id;
msg->pts = (unsigned long)current->ptstate;
send(channel,msg);
msg = async_recv(channel, msg_id);
result = msg->reg1;
printk(KERN_ERR "result is %lu\n", result);
transaction_complete(msg);
return result;
}
int thread1_fn1(void* chan)
{
volatile void ** frame = (volatile void**)__builtin_frame_address(0);
volatile void *ret_addr = *(frame + 1);
*(frame + 1) = NULL;
int num_transactions = 0;
channel = chan;
thc_init();
DO_FINISH(
uint32_t id_num;
while (num_transactions < TRANSACTIONS / 3) {
if((num_transactions * 3) % 10 == 0)
{
printk(KERN_ERR "num_transactions: %d\n", num_transactions * 3);
}
ASYNC(
id_num = awe_mapper_create_id();
printk(KERN_ERR "ID_NUM: %d\n", id_num);
add_2_nums_async(num_transactions, 1,(unsigned long) id_num);
);
ASYNC(
id_num = awe_mapper_create_id();
printk(KERN_ERR "ID_NUM: %d\n", id_num);
add_2_nums_async(num_transactions, 2,(unsigned long) id_num);
);
ASYNC(
id_num = awe_mapper_create_id();
printk(KERN_ERR "ID_NUM: %d\n", id_num);
add_2_nums_async(num_transactions, 3,(unsigned long) id_num);
);
num_transactions++;
});
pr_err("Complete\n");
printk(KERN_ERR "lcd async exiting module and deleting ptstate");
thc_done();
*(frame + 1) = ret_addr;
return 1;
}
#include "thc.h"
#include "ipc.h"
#include "thcinternal.h"
#include "ipc_dispatch.h"
#include "thread_fn_util.h"
#define FNS_LENGTH 1
#define CHANS_NUM 1
int add_2_fn(struct ttd_ring_channel* chan, struct ipc_message* msg, void* params)
{
unsigned long result = msg->reg1 + msg->reg2;
struct ipc_message* out_msg = get_send_slot(chan);
out_msg->reg1 = result;
out_msg->msg_id = msg->msg_id;
out_msg->fn_type = ADD_2_FN;
out_msg->pts = msg->pts;
send(chan, out_msg);
return 0;
}
ipc_local_fn_t functions[FNS_LENGTH] = {
{ADD_2_FN, add_2_fn}
};
int thread2_fn1(void* chan)
{
struct ttd_ring_channel* rx_chan = chan;
thc_init();
ipc_dispatch_loop(functions, FNS_LENGTH, &rx_chan, CHANS_NUM);
thc_done();
return 1;
}
#ifndef THREAD_FN_UTIL_H
#define THREAD_FN_UTIL_H
#define ADD_2_FN 1
#define TRANSACTIONS 6
int thread1_fn1(void* chan);
int thread2_fn1(void* chan);
#endif
obj-m := rpc_testing.o
ccflags-y += -O0 -I/users/mquigley/fast-ipc-module/current/IPC -fno-ipa-cp -fno-ipa-sra
ccflags-y += -O0 -I../../IPC -fno-ipa-cp -fno-ipa-sra -DUSE_ASYNC
EXTRA_CFLAGS=-DDEBUG_OUTPUT
INC_DIR = ../../IPC
CFLAGS_ipc.o = -O2 -DPOLL
......
......@@ -93,7 +93,7 @@ static unsigned long add_nums_async(unsigned long trans, unsigned long res1, uns
msg->reg3 = res1;
msg->reg4 = res1;
msg->reg5 = res1;
msg->reg6 = res1;
//msg->reg6 = res1;
msg->msg_id = msg_id;
send(channel,msg);
msg = async_recv(channel, msg_id);
......@@ -178,7 +178,7 @@ static unsigned long add_6_nums(unsigned long trans, unsigned long res1,
msg->reg3 = res2;
msg->reg4 = res3;
msg->reg5 = res4;
msg->reg6 = res5;
msg->pts = res5;
send(channel,msg);
msg = recv(channel);
if (msg->fn_type != ADD_6_NUMS)
......
......@@ -112,7 +112,7 @@ int caller(void *channel)
break;
case ADD_6_NUMS:
temp_res = add_6_nums(msg->reg1, msg->reg2, msg->reg3,
msg->reg4, msg->reg5, msg->reg6);
msg->reg4, msg->reg5, msg->pts);
transaction_complete(msg);
msg = get_send_slot(chan);
msg->fn_type = ADD_6_NUMS;
......
......@@ -6,7 +6,7 @@ typedef enum {NULL_INVOCATION, ADD_CONSTANT, ADD_NUMS, ADD_3_NUMS,
ADD_4_NUMS, ADD_5_NUMS, ADD_6_NUMS} fn_types;
/* must be divisible by 6... because I call 6 functions in the Callee.c */
#define TRANSACTIONS 6000
#define TRANSACTIONS 60
int callee(void *chan);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment