Commit f8df3b39 authored by Josh Kunz's avatar Josh Kunz

Add through step 2 of the joint computation protocol

parent 4e2c79f4
Pipeline #1787 passed with stage
in 6 seconds
import capnet
import capnet.util
import util
DATA_PROXY_NODE = "A-data-proxy"
def split_nodes(n):
"""Split the nodes into those that are for A_data, and those
for A_compute"""
...
return (A_data, A_compute)
create(Flow)
# Where "p" is the protocol object
def setup(p):
# get the basic bootstrapping objects
rp0 = p.rp0()
me = rp0.recv()
broker = rp0.recv()
# Get the nodes + A_data and A_compute
nodes = []
for n in rp0.recv_iter_exn():
nodes.append(n)
A_data, A_compute = split_nodes(nodes)
# Wait till B has registered its broker RP, then
# it and continue
b_rp = broker.lookup_wait("B")
# Get the sealed RP and the update flow from the first step.
aaas_result_gen = util.aaas_send(p, b_rp, A_data)
a_seal_unseal = p.create(capnet.SealerUnsealer)
with util.aaas_recv_context(p, b_rp) as (w_rp, A_data_w):
nodes = util.build_node_db(p, A_data_w)
# ... Do any kind of setup actions here
# ... Add A's data onto the nodes for example
# Give the proxy node our sealer/unsealer
nodes[DATA_PROXY_NODE].rp0.send(a_seal_unseal)
# Get the proxy RP from the proxy node
sealed_proxy_rp = nodes[DATA_PROXY_NODE].rp0.recv_wait()
# create a proxy flow
data_proxy_flow = nodes[DATA_PROXY_NODE].grant.flow()
# send back our data-proxy flow and the proxy rp
w_rp.send(data_proxy_flow)
w_rp.send(sealed_proxy_rp)
update_flow, double_sealed_rp = aaas_result_gen
# Finished Step 2
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_option("interface")
args = parser.parse_args()
p = capnet.Protocol(args.interface)
setup(p)
import capnet
p = capnet.Protocol("...intf...")
rp0 = p.rp0()
a_seal_unseal = rp0.recv_wait()
data_rp = p.create(capnet.RP)
sealed_data_rp = a_seal_unseal.seal(data_rp)
rp0.send(sealed_data_rp)
# Wait till step 5
a_compute_pxy = rp0.recv_wait()
# Run proxy to begin receiving requests
import capnet
import capnet.util
import util
from collections import namedtuple
NodeEntry = namedtuple("NodeEntry", ["node", "info", "rp", "grant"])
def setup(p):
# get the basic bootstrapping objects
rp0 = p.rp0()
me = rp0.recv()
broker = rp0.recv()
b_update_pxy, b_compute_pxy = rp0.recv_iter_exn()
broker_rp = p.create(capnet.RP)
broker.register("B", broker_rp)
b_seal_unseal = p.create(capnet.SealerUnsealer)
with util.aaas_recv_context(p, broker_rp) as (w_rp, A_data):
update_flow, sealed_rp = util.aaas_send(p, w_rp, A_data)
# Somehow need to wrap the b_update_proxy
b_update_pxy_rp0 = p.create(capnet.RP)
b_update_pxy_grant = b_update_pxy.reset(b_update_pxy_rp0)
update_flow = b_update_pxy_grant.flow()
# Run the correct capability agent on b_proxy
# See: B_data_proxy_for_a.py
b_update_pxy_rp0.send(b_seal_unseal)
b_update_pxy_rp0.send(update_flow)
b_update_pxy_rp0.send(sealed_rp)
double_sealed_rp = b_update_pxy_rp0.recv_wait()
# Send the results
w_rp.send(update_flow)
w_rp.send(double_sealed_rp)
# Finished Step 2
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_option("interface")
args = parser.parse_args()
p = capnet.Protocol(args.interface)
setup(p)
import capnet
p = capnet.Protocol("... intf ...")
rp0 = p.rp0()
b_seal_unseal = rp0.recv_wait()
A_data_update_flow = rp0.recv_wait()
A_data_sealed_rp = rp0.recv_wait()
A_data_ab_sealed_rp = b_seal_unseal.seal(A_data_sealed_rp)
rp0.send(A_data_ab_sealed_rp)
# Run proxy to begin receiving requests
import capnet
from contextlib import contextmanager
def build_node_db(p, nodes):
db = {}
for node in nodes:
info = node.info()
rp = p.create(capnet.RP)
grant = node.reset(rp)
db[info.name] = NodeEntry(node, info, rp, grant)
return db
def aaas_send(p, init_rp, nodes):
"""AaaS (Appliance as a Service) Reciver
This call is used by the party that is trying to isolate their capabilities
from the party that wants to temporarily use those capabilites. """
membrane = p.create(capnet.Membrane)
membrane_ext = membrane.external()
init_rp.send(membrane_ext)
membrane_rp = p.create(capnet.RP)
membrane.send(membrane_rp)
with membrane_rp.send_iter_context():
for node in nodes: membrane_rp.send(node)
for result in membrane_rp.recv_iter():
yield result
membrane.clear()
membrane_rp.revoke()
membrane_rp.delete()
membrane.revoke()
membrane.delete()
@contextmanager
def aaas_recv_context(p, init_rp):
"""AaaS (Appliance as a Service) Receiver.
This call is used to receive capabili..."""
membrane_ext = init_rp.recv_wait()
membrane_rp = membrane_ext.recv_wait()
wrapped_caps = list(membrane_rp.recv_iter())
with membrane_rp.send_iter_context():
yield (membrane_rp, wrapped_caps)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment