Commit 5cf9cb01 authored by Vikram Narayanan

lcd/nullnet: Integrate sender side dispatch loop



For the loop below, a single ASYNC takes 625 cycles with the sender-side
dispatch loop:

for (1..10^6)
  DO_FINISH
    ASYNC(xmit())
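
In libasync terms the loop above is roughly the following sketch (the exact
macro syntax is assumed; xmit() stands in for the bare-async xmit path):

    int i;
    for (i = 0; i < 1000000; i++) {
            DO_FINISH({
                    ASYNC({
                            xmit();
                    });
            });
    }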

ndo_start_xmit_bare_async, 1000000 ch3 transactions took 623
Signed-off-by: Vikram Narayanan <vikram186@gmail.com>
parent cff93fbe
......@@ -39,6 +39,8 @@ struct lcd_sk_buff_container {
struct cptr my_ref, other_ref;
struct sk_buff skbuff;
uint64_t tid;
void *chnl;
unsigned int cookie;
};
int glue_nullnet_init(void)
......@@ -986,6 +988,36 @@ fail_insert:
TS_DECL(ipc_send);
TS_DECL(hlookup);
#ifdef CONSUME_SKB_NO_HASHING
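/*
 * No-hashing path: the xmit callee stashed the channel (and, on the async
 * path, the request cookie) in the skb container, so CONSUME_SKB can be sent
 * straight back over that channel without a hash-table lookup.
 */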
void consume_skb(struct sk_buff *skb)
{
int ret;
struct fipc_message *request;
struct lcd_sk_buff_container *skb_c;
struct thc_channel *channel;
skb_c = container_of(skb,
struct lcd_sk_buff_container, skbuff);
channel = (struct thc_channel*) skb_c->chnl;
ret = async_msg_blocking_send_start(channel, &request);
if (ret) {
LIBLCD_ERR("failed to get a send slot");
goto fail_async;
}
async_msg_set_fn_type(request, CONSUME_SKB);
thc_set_msg_type(request, msg_type_request);
thc_set_msg_id(request, skb_c->cookie);
fipc_send_msg_end (thc_channel_to_fipc(channel), request);
fail_async:
return;
}
#else
void consume_skb(struct sk_buff *skb)
{
#ifdef LCD_SKB_CONTAINER
......@@ -1080,6 +1112,8 @@ fail_virt:
return;
}
#endif /* CONSUME_SKB_NO_HASHING */
// DONE
int ndo_init_callee(struct fipc_message *request, struct thc_channel *channel, struct glue_cspace *cspace, struct cptr sync_ep)
{
......@@ -1234,6 +1268,80 @@ int ndo_start_xmit_bare_callee(struct fipc_message *_request, struct thc_channel
return 0;
}
#ifdef CONSUME_SKB_NO_HASHING
int ndo_start_xmit_noawe_callee(struct fipc_message *_request, struct thc_channel *channel, struct glue_cspace *cspace, struct cptr sync_ep)
{
struct lcd_sk_buff_container static_skb_c;
struct lcd_sk_buff_container *skb_c = &static_skb_c;
struct sk_buff *skb = &skb_c->skbuff;
struct fipc_message *response;
int ret;
#ifdef COPY
struct skbuff_members *skb_lcd;
#endif
#ifndef NO_MARSHAL
xmit_type_t xmit_type;
unsigned long skbh_offset, skb_end;
__be16 proto;
u32 len;
cptr_t skb_ref;
xmit_type = fipc_get_reg0(_request);
skb_ref = __cptr(fipc_get_reg2(_request));
skbh_offset = fipc_get_reg3(_request);
skb_end = fipc_get_reg4(_request);
proto = fipc_get_reg5(_request);
len = fipc_get_reg6(_request);
#endif
fipc_recv_msg_end(thc_channel_to_fipc(channel),
_request);
#ifndef NO_MARSHAL
skb->head = (char*)data_pool + skbh_offset;
skb->end = skb_end;
skb->len = len;
skb->private = true;
#endif
#ifdef COPY
skb_lcd = SKB_LCD_MEMBERS(skb);
P(len);
P(data_len);
P(queue_mapping);
P(xmit_more);
P(tail);
P(truesize);
P(ip_summed);
P(csum_start);
P(network_header);
P(csum_offset);
P(transport_header);
skb->data = skb->head + skb_lcd->head_data_off;
#endif
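/* Remember the channel this xmit arrived on so consume_skb() can send
 * CONSUME_SKB back over it without a hash lookup. */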
skb_c->chnl = channel;
ret = dummy_xmit(skb, NULL);
if (async_msg_blocking_send_start(channel, &response)) {
LIBLCD_ERR("error getting response msg");
return -EIO;
}
fipc_set_reg1(response, ret);
thc_set_msg_type(response, msg_type_response);
fipc_send_msg_end(thc_channel_to_fipc(channel), response);
return ret;
}
#else
int ndo_start_xmit_noawe_callee(struct fipc_message *_request, struct thc_channel *channel, struct glue_cspace *cspace, struct cptr sync_ep)
{
#if defined(NO_HASHING) && defined(STATIC_SKB)
......@@ -1329,6 +1437,7 @@ int ndo_start_xmit_noawe_callee(struct fipc_message *_request, struct thc_channe
//printk("%s, response sent\n", __func__);
return ret;
}
#endif /* CONSUME_SKB_NO_HASHING */
/* Function to test bare async. This function receives the IPC and
* sends back a response
......@@ -1337,13 +1446,19 @@ int ndo_start_xmit_async_bare_callee(struct fipc_message *_request, struct thc_c
{
struct fipc_message *response;
unsigned int request_cookie;
struct lcd_sk_buff_container static_skb_c;
struct lcd_sk_buff_container *skb_c = &static_skb_c;
struct sk_buff *skb = &skb_c->skbuff;
request_cookie = thc_get_request_cookie(_request);
fipc_recv_msg_end(thc_channel_to_fipc(channel),
_request);
//dummy_xmit(skb, NULL);
skb_c->chnl = channel;
skb_c->cookie = request_cookie;
dummy_xmit(skb, NULL);
if (async_msg_blocking_send_start(channel, &response)) {
LIBLCD_ERR("error getting response msg");
......@@ -1374,7 +1489,7 @@ int ndo_start_xmit_callee(struct fipc_message *_request, struct thc_channel *cha
#else
struct sk_buff_container *skb_c;
#endif
uint64_t pid;
uint64_t pid = 0;
#endif /* NO_HASHING */
#ifndef NOLOOKUP
......@@ -1450,7 +1565,7 @@ int ndo_start_xmit_callee(struct fipc_message *_request, struct thc_channel *cha
LIBLCD_MSG("no memory");
#endif /* STATIC_SKB */
#ifdef LCD_SKB_CONTAINER
#if defined(LCD_SKB_CONTAINER)
skb = &skb_c->skbuff;
skb_c->tid = pid;
#else
......
......@@ -79,8 +79,11 @@ struct skbuff_members {
#define P(x) skb->x = skb_lcd->x
#define CONSUME_SKB_SEND_ONLY
#define CONSUME_SKB_NO_HASHING
#define SENDER_DISPATCH_LOOP
//#define NO_AWE
#define NO_HASHING
//#define NO_HASHING
//#define NO_MARSHAL
//#define DOUBLE_HASHING
......
#ifndef IPC_HELPER_H
#define IPC_HELPER_H
#define FIPC_MSG_STATUS_AVAILABLE 0xdeaddeadUL
#define FIPC_MSG_STATUS_SENT 0xfeedfeedUL
#define fipc_test_pause() asm volatile ( "pause\n": : :"memory" );
static inline unsigned long inc_rx_slot(struct fipc_ring_channel *rc)
{
return (rc->rx.slot++);
}
static inline unsigned long get_rx_idx(struct fipc_ring_channel *rc)
{
return rc->rx.slot & rc->rx.order_two_mask;
}
static inline struct fipc_message*
get_current_rx_slot(struct fipc_ring_channel *rc)
{
return &rc->rx.buffer[get_rx_idx(rc)];
}
static inline int check_rx_slot_msg_waiting(struct fipc_message *slot)
{
return slot->msg_status == FIPC_MSG_STATUS_SENT;
}
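/*
 * Inline variant of thc_ipc_recv_response(): spin on the channel's current
 * rx slot. If nothing is waiting, yield to other AWEs (or just pause when
 * pts->reached_dofin is set); if the waiting message carries another
 * request's cookie, yield to the AWE owning that cookie and retry.
 */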
static inline int
thc_ipc_recv_response_inline(struct thc_channel* channel, uint32_t id,
struct fipc_message** out)
{
int ret;
int received_cookie;
PTState_t *pts = PTS();
retry:
while ( 1 )
{
// Poll until we get a message or error
*out = get_current_rx_slot( thc_channel_to_fipc(channel));
if ( ! check_rx_slot_msg_waiting( *out ) )
{
// No messages to receive, yield to next async
#ifndef BASE_CASE_NOASYNC
#if 1
if (pts->reached_dofin) {
fipc_test_pause();
continue;
}
#endif
THCYieldAndSave(id);
#else
fipc_test_pause();
#endif
continue;
}
break;
}
#ifndef BASE_CASE_NOASYNC
received_cookie = thc_get_msg_id(*out);
if (received_cookie == id) {
#endif
inc_rx_slot( thc_channel_to_fipc(channel) );
return 0;
#ifndef BASE_CASE_NOASYNC
}
#endif
#ifndef BASE_CASE_NOASYNC
ret = THCYieldToIdAndSave(received_cookie, id);
if (ret) {
printk("ALERT: wrong id\n");
return ret;
}
#endif
// We came back here but maybe we're the last AWE and
// we're re-started by do finish
fipc_test_pause();
goto retry;
}
#endif /* IPC_HELPER_H */
......@@ -15,6 +15,8 @@
#include "../../perf_counter_helper.h"
#include <linux/vmalloc.h>
#include "../../ipc_helper.h"
#include <lcd_config/post_hook.h>
#define NUM_PACKETS (1000000)
......@@ -1245,6 +1247,14 @@ int setup_once(struct trampoline_hidden_args *hidden_args)
return 0;
}
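/*
 * Dispatch callback handed to thc_ipc_recv_req_resp(): requests that arrive
 * on the channel while we are waiting for our xmit response (e.g. CONSUME_SKB
 * from the LCD) are serviced here on the sender side; the skb being
 * transmitted is threaded through via 'arg'.
 */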
int sender_dispatch(struct thc_channel *chnl, struct fipc_message *out, void *arg)
{
/* we receive the skb pointer via arg, pass it to consume skb via reg 0 */
fipc_set_reg0(out, (uint64_t) arg);
// printk("%s, called\n", __func__);
return dispatch_async_loop(chnl, out, c_cspace, sync_ep);
}
int ndo_start_xmit_async(struct sk_buff *skb, struct net_device *dev, struct trampoline_hidden_args *hidden_args)
{
int ret;
......@@ -1321,10 +1331,20 @@ int ndo_start_xmit_async(struct sk_buff *skb, struct net_device *dev, struct tra
// ret = thc_ipc_recv_response(async_chnl, request_cookie, &_response);
ret = thc_ipc_recv_response_new(async_chnl, request_cookie,
#ifdef SENDER_DISPATCH_LOOP
// printk("%s, server dispatch %p\n", __func__, skb);
ret = thc_ipc_recv_req_resp(async_chnl, &_response, request_cookie, sender_dispatch, (void*)skb);
fipc_recv_msg_end(thc_channel_to_fipc(async_chnl), _response);
// printk("%s, got response\n", __func__);
#else
ret = thc_ipc_recv_response_inline(async_chnl, request_cookie,
&_response);
awe_mapper_remove_id(request_cookie);
#endif
//printk("%s, ipc call returned %d\n", __func__, ret);
if (unlikely(ret)) {
LIBLCD_ERR("thc_ipc_call");
......@@ -1361,7 +1381,7 @@ int ndo_start_xmit_async_landing(struct sk_buff *first, struct net_device *dev,
}
if (!skb->chain_skb) {
return ndo_start_xmit_dummy(skb, dev, hidden_args);
return ndo_start_xmit_bare_async(skb, dev, hidden_args);
} else {
/* chain skb */
if (!current->ptstate) {
......@@ -1568,12 +1588,16 @@ int ndo_start_xmit_async2(struct sk_buff *skb, struct net_device *dev, struct tr
#ifdef OLD_RESPONSE
ret = thc_ipc_recv_response(async_chnl, request_cookie,
&_response);
#elif defined(SENDER_DISPATCH_LOOP)
ret = thc_ipc_recv_req_resp(async_chnl, &_response, request_cookie, sender_dispatch, (void*)skb);
fipc_recv_msg_end(thc_channel_to_fipc(async_chnl), _response);
#else
ret = thc_ipc_recv_response_new(async_chnl, request_cookie,
ret = thc_ipc_recv_response_inline(async_chnl, request_cookie,
&_response);
awe_mapper_remove_id(request_cookie);
#endif /* OLD_RESPONSE */
#endif /* OLD_RESPONSE */
#else /* NO_ASYNC */
thc_set_msg_type(_request, msg_type_request);
......@@ -1605,7 +1629,7 @@ fail_ipc:
return ret;
}
int NUM_INNER_ASYNCS = 2;
int NUM_INNER_ASYNCS = 1;
//#define TS
int ndo_start_xmit_bare_async(struct sk_buff *skb, struct net_device *dev, struct trampoline_hidden_args *hidden_args)
......@@ -1691,8 +1715,11 @@ int ndo_start_xmit_bare_async(struct sk_buff *skb, struct net_device *dev, struc
NUM_TRANSACTIONS, _TS_DIFF(xmit)/NUM_TRANSACTIONS);
#endif
free:
#ifdef NO_HASHING
#ifdef CONSUME_SKB_NO_HASHING
dev_kfree_skb(skb);
#endif
#ifdef NO_HASHING
// dev_kfree_skb(skb);
#endif
return NETDEV_TX_OK;
}
......@@ -1829,10 +1856,17 @@ free:
#endif
return NETDEV_TX_OK;
}
#if 0
int dispatch_request(struct thc_channel *channel, struct fipc_message *req)
{
}
#endif
int ndo_start_xmit_bare2(struct sk_buff *skb, struct net_device *dev, struct trampoline_hidden_args *hidden_args)
{
struct fipc_message *_request;
struct fipc_message *_request1;
struct fipc_message *_response;
xmit_type_t xmit_type;
struct thc_channel *async_chnl;
......@@ -1888,6 +1922,24 @@ int ndo_start_xmit_bare2(struct sk_buff *skb, struct net_device *dev, struct tra
fipc_send_msg_end(thc_channel_to_fipc(
async_chnl), _request);
/* to receive consume_skb */
fipc_test_blocking_recv_start(
async_chnl,
&_request1);
/* TODO: replace this with a proper identifier mechanism, e.g., a bitmap:
 * store this skb pointer in a slot table, send the bit number across the
 * domain, and have consume_skb send the same number back so the
 * corresponding skb pointer can be looked up here (see the slot-table
 * sketch after consume_skb_callee()). The pointer-passing scheme below is
 * in no way inferior to a bitmap, except that the bitmap mechanism is more
 * generalizable.
 */
fipc_set_reg0(_request1, (uint64_t) skb);
/* call consume_skb */
dispatch_async_loop(async_chnl, _request1, hidden_args->cspace,
hidden_args->sync_ep);
/* guard nonlcd case with all macros */
fipc_test_blocking_recv_start(
async_chnl,
......@@ -4274,6 +4326,25 @@ fail_alloc:
return ret;
}
#ifdef CONSUME_SKB_NO_HASHING
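/*
 * No-hashing path on the caller side: the sender-side dispatcher passes the
 * raw skb pointer in reg0, so no container lookup is needed here.
 */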
int consume_skb_callee(struct fipc_message *request, struct thc_channel *channel, struct glue_cspace *cspace, struct cptr sync_ep)
{
int ret = 0;
struct sk_buff *skb;
skb = (struct sk_buff *) fipc_get_reg0(request);
fipc_recv_msg_end(thc_channel_to_fipc(channel),
request);
// printk("%s, freeing %p\n", __func__, skb);
// consume_skb(skb);
return ret;
}
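/*
 * A minimal sketch of the bitmap mechanism suggested in the TODO in
 * ndo_start_xmit_bare2(); the names (MAX_INFLIGHT_SKBS, skb_slot_map,
 * skb_slots, skb_slot_alloc, skb_slot_free) are hypothetical and not part
 * of this glue code.
 */
#define MAX_INFLIGHT_SKBS	64
static DECLARE_BITMAP(skb_slot_map, MAX_INFLIGHT_SKBS);
static struct sk_buff *skb_slots[MAX_INFLIGHT_SKBS];

/* Grab a free slot, remember the skb, return the slot number to send across. */
static int skb_slot_alloc(struct sk_buff *skb)
{
	int slot = find_first_zero_bit(skb_slot_map, MAX_INFLIGHT_SKBS);
	if (slot >= MAX_INFLIGHT_SKBS)
		return -ENOSPC;
	set_bit(slot, skb_slot_map);
	skb_slots[slot] = skb;
	return slot;
}

/* Look up the skb for a slot number echoed back by consume_skb, release the slot. */
static struct sk_buff *skb_slot_free(int slot)
{
	struct sk_buff *skb = skb_slots[slot];
	clear_bit(slot, skb_slot_map);
	return skb;
}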
#else
//TODO:
int consume_skb_callee(struct fipc_message *request, struct thc_channel *channel, struct glue_cspace *cspace, struct cptr sync_ep)
{
......@@ -4375,6 +4446,7 @@ fail_async:
#endif
return ret;
}
#endif /* CONSUME_SKB_NO_HASHING */
int trigger_exit_to_lcd(struct thc_channel *_channel, enum dispatch_t disp)
{
......