Commit e2ab1860 authored by Anmol Vatsa's avatar Anmol Vatsa

multiple changes

+send local cap over remote rp
+recv remote cap over local rp
+send remote cap over remote rp
+receive remote cap over remote rp
+send remote cap over local rp
parent bf34ba4b
......@@ -137,6 +137,15 @@ void *intercomm_rpc_server_handler(void *thread_data) {
break;
}
case CONTROLLER_RPC__RPCMESSAGE__TYPE_RP_RECV_REMOTE_CAP: {
if (handle_rp_recv_remote_cap_messages(client, request) < 0) {
c_log_err("Could not fulfill RPRecvRemoteCap req");
client_reply_error(client->transport, "Could not fulfill RPRecvRemoteCap req");
return;
}
break;
}
default:
/* Nevermind a response if the request type is unrecognized */
c_log_err("Undefined RPC request");
......
......@@ -1277,12 +1277,22 @@ int cn_dispatch_invoke(cn_principal_t *as, cn_dispatch_result_t *result,
switch (method) {
case CN_RP_SEND: {
struct timespec starttime;
clock_gettime(CLOCK_REALTIME, &starttime);
c_log_debug("DEBUG:%s:%lu.%lu:RP_SEND Start", __func__, starttime.tv_sec, starttime.tv_nsec);
if( (res = cn_dispatch_rp_send(obj_cnode, args, result)) ) {
goto fail1;
}
struct timespec endtime;
clock_gettime(CLOCK_REALTIME, &endtime);
c_log_debug("DEBUG:%s:%lu.%lu:RP_SEND End", __func__, endtime.tv_sec, endtime.tv_nsec);
c_log_debug("DEBUG:%s:End-Start RP_SEND:%lu", __func__, mytimediff(starttime, endtime));
}
break;
case CN_RP_RECV: {
struct timespec starttime;
clock_gettime(CLOCK_REALTIME, &starttime);
c_log_debug("DEBUG:%s:%lu.%lu:RP_RECV Start", __func__, starttime.tv_sec, starttime.tv_nsec);
cn_objtype_t recvd_type;
cptr_t recvd_cap;
struct cn_rp_elem_message msg;
......@@ -1299,6 +1309,10 @@ int cn_dispatch_invoke(cn_principal_t *as, cn_dispatch_result_t *result,
res = fail_invoke(result, "couldn't allocate recv reply items");
goto fail1;
}
struct timespec endtime;
clock_gettime(CLOCK_REALTIME, &endtime);
c_log_debug("DEBUG:%s:%lu.%lu:RP_RECV End", __func__, endtime.tv_sec, endtime.tv_nsec);
c_log_debug("DEBUG:%s:End-Start RP_RECV:%lu", __func__, mytimediff(starttime, endtime));
}
break;
case CN_RP_RECV_WAIT: {
......
......@@ -485,16 +485,6 @@ void cn_node_free(cn_node_t *node) {
free(node);
}
/* subsec: RP Objects */
enum cn_rp_elem_source {
CN_RP_ELEM_SOURCE_INJECTED,
CN_RP_ELEM_SOURCE_NORMAL,
CN_RP_ELEM_SOURCE_MEMBRANE,
CN_RP_ELEM_SOURCE_MESSAGE,
CN_RP_ELEM_SOURCE_REMOTE
};
enum cn_membrane_type cn_membrane_type_opposite(enum cn_membrane_type type) {
switch (type) {
case CN_MEMBRANE_TYPE_EXTERNAL:
......@@ -506,29 +496,6 @@ enum cn_membrane_type cn_membrane_type_opposite(enum cn_membrane_type type) {
cn_abort("unreachable");
}
struct cn_rp_elem {
enum cn_rp_elem_source source;
bool has_msg;
struct cn_rp_elem_message msg;
union {
struct {
cn_principal_t *cap_owner;
cptr_t cap;
} normal;
struct {
cn_obj_t * object;
} injected;
struct {
cn_principal_t *cap_owner;
cptr_t cap;
cn_cnode_meta_t * sender_meta;
} membrane;
struct {
char* remote_end;
} remote;
};
};
static inline bool cn_rp_elem_is_normal(struct cn_rp_elem * elem) {
return elem->source == CN_RP_ELEM_SOURCE_NORMAL;
}
......@@ -541,7 +508,6 @@ static inline bool cn_rp_elem_is_membrane(struct cn_rp_elem * elem) {
return elem->source == CN_RP_ELEM_SOURCE_MEMBRANE;
}
static void __cn_rp_elem_release_refs(void * who, struct cn_rp_elem *elem);
static void __cn_rp_elem_hold_refs(void * who, struct cn_rp_elem *elem);
void cn_rp_elem_free(struct cn_rp_elem * elem) {
......@@ -560,7 +526,7 @@ void cn_rp_elem_free(struct cn_rp_elem * elem) {
static void __cn_rp_queue_free(cn_rp_t *rp) {
while (! g_queue_is_empty(rp->queue)) {
struct cn_rp_elem *elem = (struct cn_rp_elem *) g_queue_pop_head(rp->queue);
__cn_rp_elem_release_refs(rp, elem);
cn_rp_elem_release_refs(rp, elem);
cn_rp_elem_free(elem);
}
g_queue_free(rp->queue);
......@@ -1801,6 +1767,16 @@ static int __cn_rp_elem_message_set(struct cn_rp_elem *elem,
}
/* make a new rp_elem */
int cn_rp_elem_remote(char* remote_end, struct cn_rp_elem ** o_elem) {
struct cn_rp_elem * elem = g_slice_alloc(sizeof(struct cn_rp_elem));
if (elem == NULL) { c_log_err("could not allocate elem");return -1; }
elem->source = CN_RP_ELEM_SOURCE_REMOTE;
elem->remote.remote_end = strdup(remote_end);
*o_elem = elem;
return 0;
}
int cn_rp_elem_normal(cn_principal_t *p, cptr_t cap,
struct cn_rp_elem_message * msg,
struct cn_rp_elem ** o_elem) {
......@@ -1882,7 +1858,7 @@ static void __cn_rp_elem_hold_refs(void * who, struct cn_rp_elem *elem) {
}
}
static void __cn_rp_elem_release_refs(void * who, struct cn_rp_elem *elem) {
void cn_rp_elem_release_refs(void * who, struct cn_rp_elem *elem) {
if (cn_rp_elem_is_injected(elem)) {
RPUTs_OBJ(elem->injected.object, who);
} else if (cn_rp_elem_is_membrane(elem)) {
......@@ -2093,11 +2069,19 @@ int cn_rp_recv(struct cnode *rp,
goto recover;
}
*flags |= CN_RP_RECV_RESULT_TYPE_CPTR;
} else if (e->source == CN_RP_ELEM_SOURCE_REMOTE) {
cncinfo("[rp recv] remote element");
if(proxy_rp_recv_remote_cap(e->remote.remote_end,
rp, recv_cap, type) < 0) {
cncerr("remote grant failed\n");
goto recover;
}
*flags |= CN_RP_RECV_RESULT_TYPE_CPTR;
} else {
cn_abort("Unrecognized rp elem source: %d", e->source);
}
__cn_rp_elem_release_refs(_rp, e);
cn_rp_elem_release_refs(_rp, e);
cn_rp_elem_free(e);
cn_obj_unlock(_rp);
return 0;
......@@ -2109,7 +2093,7 @@ recover:
recover1:
// If the grant fails, put it back on the queue, in the front
cn_rp_pushfront(_rp, e);
__cn_rp_elem_release_refs(_rp, e);
cn_rp_elem_release_refs(_rp, e);
cn_obj_unlock(_rp);
return -1;
}
......@@ -2511,7 +2495,7 @@ int cn_membrane_recv(struct cnode *m, cptr_t *o_cap, cn_objtype_t *type) {
goto recover1;
}
__cn_rp_elem_release_refs(membrane->rp, e);
cn_rp_elem_release_refs(membrane->rp, e);
cn_rp_elem_free(e);
cn_obj_unlock(membrane);
return 0;
......@@ -2519,7 +2503,7 @@ int cn_membrane_recv(struct cnode *m, cptr_t *o_cap, cn_objtype_t *type) {
recover1:
/* Push first in case these are the last references to this object */
cn_rp_pushfront(membrane->rp, e);
__cn_rp_elem_release_refs(membrane->rp, e);
cn_rp_elem_release_refs(membrane->rp, e);
recover:
cn_obj_unlock(membrane);
return res;
......
......@@ -674,6 +674,30 @@ struct cn_node_grant {
cn_node_t * node;
};
struct cn_rp_elem {
enum cn_rp_elem_source source;
bool has_msg;
struct cn_rp_elem_message msg;
union {
struct {
cn_principal_t *cap_owner;
cptr_t cap;
} normal;
struct {
cn_obj_t * object;
} injected;
struct {
cn_principal_t *cap_owner;
cptr_t cap;
cn_cnode_meta_t * sender_meta;
} membrane;
struct {
char* remote_end;
} remote;
};
};
/* -------------- Setup ----------------- */
/* Initialize the object/capability system */
......@@ -1017,6 +1041,11 @@ int cn_node_grant_rp0(struct cnode * grant_cnode, cptr_t *o_rp_cap);
/* --------- RP Manipulation --------- */
/* Creates a "remote" rp_elem struct that consists of a remote_end
* identifier for the remote controller which has the actual element
*/
int cn_rp_elem_remote(char* remote_end, struct cn_rp_elem ** o_elem);
/* Creates a "normal" rp_elem struct that consistes of a principal and a
* cptr. When this elem is recevied, it will trigger a "grant" operation
* from the "owner" of the given "cap" to the receiver. Returns 0 on success. */
......@@ -1102,6 +1131,8 @@ int cn_rp_recv(struct cnode *rp,
* they called recv_wait. */
int cn_rp_recv_wait(struct cnode *rp, uint64_t uuid);
void cn_rp_elem_release_refs(void * who, struct cn_rp_elem *elem);
/* --------- Broker Manipulation --------- */
/*
......
......@@ -34,6 +34,14 @@ typedef struct cn_shadow cn_shadow_t;
struct cn_rp_elem;
enum cn_rp_elem_source {
CN_RP_ELEM_SOURCE_INJECTED,
CN_RP_ELEM_SOURCE_NORMAL,
CN_RP_ELEM_SOURCE_MEMBRANE,
CN_RP_ELEM_SOURCE_MESSAGE,
CN_RP_ELEM_SOURCE_REMOTE
};
struct cn_rp_elem_message {
uint8_t * data;
size_t len;
......
This diff is collapsed.
......@@ -15,6 +15,7 @@ int handle_create_object_request(struct RPCEndPoint* endpoint, ControllerRPC__RP
int handle_remote_invoke_messages(struct RPCEndPoint* endpoint, ControllerRPC__RPCMessage *request);
int handle_fulfill_embedded_messages(struct RPCEndPoint* endpoint, ControllerRPC__RPCMessage *request);
int proxy_rp_recv_remote_cap(char* remote_end, struct cnode* rp_cnode, cptr_t* recvd, cn_objtype_t* recvd_type);
int proxy_node_grant_remote_cap(struct cnode* node_grant_cnode, cn_principal_t* as, cptr_t cptr,
cptr_t* granted, cn_objtype_t* granted_type);
int proxy_remote_revoke(cn_principal_t* as, cptr_t cptr);
......
......@@ -18,9 +18,16 @@ message RPCMessage {
RemoteLookup remote_lookup = 13;
FulfillEmbedded fulfill_embedded = 14;
GrantRemoteCap grant_remote_cap = 15;
RPRecvRemoteCap rp_recv_remote_cap = 16;
}
}
message RPRecvRemoteCap {
required fixed64 requester_id = 1;
required NodeInfo requester_info = 2;
required fixed64 rp_obj_id = 3;
}
message GrantRemoteCap {
required fixed64 requester_id = 1;
required fixed64 grant_to_node_id = 2;
......@@ -175,13 +182,13 @@ message MethodInvokeRequest {
message MethodInvokeResponse {
message RPSend {
required fixed64 cap_id = 1;
required string remote_end = 1;
}
message RPRecv {
required ShadowReturnResponse shadow_cap = 1;
required int32 flags = 2;
required string message = 3;
required int32 message_len = 4;
optional string message = 3;
optional int32 message_len = 4;
}
message NodeReset {
required ShadowReturnResponse node_grant = 1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment