Commit 73ee247e authored by Leigh B. Stoller's avatar Leigh B. Stoller

Pull in Tim's changes for persistent connections over ssh.

parent f226d7ef
......@@ -3,22 +3,171 @@
# Copyright (c) 2004 University of Utah and the Flux Group.
# All rights reserved.
#
import os
import sys
import types
import urllib
import popen2
import rfc822
import xmlrpclib
##
# Base class for exceptions in this module.
#
class SSHException(Exception):
pass
##
# Indicates a poorly formatted response from the server.
#
class BadResponse(SSHException):
##
# @param host The server host name.
# @param handler The handler being accessed on the server.
# @param arg Description of the problem.
#
def __init__(self, host, handler, msg):
self.args = host, handler, msg,
return
pass
##
# Class used to decode headers.
#
class SSHMessage(rfc822.Message):
pass
##
# An SSH based connection class.
#
class SSHConnection:
##
# @param host The peer host name.
# @param handler The handler being accessed.
# @param streams A pair containing the input and output files respectively.
# If this value is not given, ssh will be used to connect to the host.
# @param ssh_config The ssh config file to use when initiating a new
# connection.
#
def __init__(self, host, handler, streams=None, ssh_config=None):
# Store information about the peer and
self.handler = handler
self.host = host
# ... initialize the read and write file objects.
if streams:
self.myChild = None
self.rfile = streams[0]
self.wfile = streams[1]
pass
else:
self.user, self.host = urllib.splituser(self.host)
print self.user + " " + self.host + " " + handler
flags = ""
if self.user:
flags = flags + " -l " + self.user
pass
if ssh_config:
flags = flags + " -F " + ssh_config
pass
self.myChild = popen2.Popen3("ssh -x "
+ flags
+ " "
+ self.host
+ " "
+ handler,
1)
self.rfile = self.myChild.fromchild
self.wfile = self.myChild.tochild
pass
return
##
# @param len The amount of data to read. (Default: 1024)
# @return The amount of data read.
#
def read(self, len=1024):
return self.rfile.read(len)
##
# @return A line of data or None if there is no more input.
#
def readline(self):
return self.rfile.readline()
##
# @param stuff The data to send to the other side.
# @return The amount of data written.
#
def write(self, stuff):
return self.wfile.write(stuff)
##
# Flush any write buffers.
#
def flush(self):
self.wfile.flush()
return
##
# Close the connection.
#
def close(self):
self.rfile.close()
self.wfile.close()
if self.myChild:
self.myChild.wait()
self.myChild = None
pass
return
##
# Send an rfc822 style header to the other side.
#
# @param key The header key.
# @param value The value paired with the given key.
#
def putheader(self, key, value):
self.write("%s: %s\r\n" % (key, str(value)))
return
##
# Terminate the list of headers so the body can follow.
#
def endheaders(self):
self.write("\r\n")
self.flush()
return
def __repr__(self):
return "<SSHConnection %s%s>" % (self.host, self.handler)
__str__ = __repr__
pass
##
# Use SSH to transport XML-RPC requests/responses
#
class SSHTransport:
##
# @param ssh_config The ssh config file to use when making new connections.
#
def __init__(self, ssh_config=None):
self.connections = {}
self.ssh_config = ssh_config
return
##
# Send a request to the destination.
#
# @param host The host name on which to execute the request
# @param handler The python module that will handle the request.
# @param handler The python file that will handle the request.
# @param request_body The XML-RPC encoded request.
# @param verbose unused.
# @return The value returned
......@@ -29,25 +178,22 @@ class SSHTransport:
handler = handler[1:]
pass
self.user, self.realhost = urllib.splituser(host)
print self.user + " " + self.realhost + " " + handler
# SSH to the host and call python on the handler.
self.myChild = popen2.Popen3("ssh -x -l " + self.user + " "
+ self.realhost + " "
+ handler)
# Send the request over SSH's stdin,
self.myChild.tochild.write(request_body)
# ... close to signal the end of the request,
self.myChild.tochild.close() # XXX Do something smarter here.
# Try to get a new connection,
if not self.connections.has_key((host,handler)):
sys.stderr.write("New connection for %s %s\n" %
(host, handler))
self.connections[(host,handler)] = SSHConnection(host, handler)
pass
connection = self.connections[(host,handler)]
# ... parse the response, and
retval = self.parse_response()
# ... send our request, and
connection.putheader("content-length", len(request_body))
connection.endheaders()
connection.write(request_body)
connection.flush()
# ... wait for SSH to terminate.
self.myChild.wait()
# ... parse the response.
retval = self.parse_response(connection)
return retval
......@@ -62,15 +208,25 @@ class SSHTransport:
#
# @return The python value returned by the server method.
#
def parse_response(self):
def parse_response(self, connection):
parser, unmarshaller = self.getparser()
while True:
response = self.myChild.fromchild.read(1024)
if not response:
break
parser.feed(response)
try:
# Get the headers,
headers = SSHMessage(connection, False)
# ... the length of the body, and
length = int(headers['content-length'])
# ... read in the body.
response = connection.read(length)
pass
except KeyError, e:
# Bad header, drop the connection, and
del self.connections[(connection.host,connection.handler)]
connection.close()
# ... tell the user.
raise BadResponse(connection.host, connection.handler, e.args)
parser.feed(response)
return unmarshaller.close()
......@@ -90,6 +246,7 @@ class SSHServerWrapper:
# @param object The object to wrap.
#
def __init__(self, object):
self.ssh_connection = os.environ['SSH_CONNECTION'].split()
self.myObject = object
return
......@@ -98,50 +255,86 @@ class SSHServerWrapper:
# from the client, dispatch the method, and write the response back to the
# client.
#
# @param streams A pair containing the input and output streams.
# @param connection An initialized SSHConnection object.
#
def handle_request(self, streams):
def handle_request(self, connection):
retval = False
try:
# Read the request,
params, method = xmlrpclib.loads(streams[0].read())
# ... find the corresponding method in the wrapped object,
meth = getattr(self.myObject, method)
# ... dispatch the method, and
if type(meth) == type(self.handle_request):
response = apply(meth, params) # It is really a method.
hdrs = SSHMessage(connection, False)
length = int(hdrs['content-length'])
params, method = xmlrpclib.loads(connection.read(length))
try:
# ... find the corresponding method in the wrapped object,
meth = getattr(self.myObject, method)
# ... dispatch the method, and
if type(meth) == type(self.handle_request):
response = apply(meth, params) # It is really a method.
pass
else:
response = str(meth) # Is is just a plain variable.
pass
# ... ensure there was a valid response.
if type(response) != type(( )):
response = (response,)
pass
pass
else:
response = str(meth) # Is is just a plain variable.
except:
# Some other exception happened, convert it to an XML-RPC fault
response = xmlrpclib.dumps(
xmlrpclib.Fault(1,
"%s:%s" % (sys.exc_type, sys.exc_value)))
pass
# ... ensure there was a valid response.
if type(response) != type(( )):
response = (response,)
else:
# Everything worked, encode the real response.
response = xmlrpclib.dumps(response, methodresponse=1)
pass
pass
except xmlrpclib.Fault, faultobj:
# An XML-RPC related fault occurred, just encode the response.
response = xmlrpclib.dumps(faultobj)
retval = True
pass
except:
# Some other exception happened, convert it to an XML-RPC fault.
response = xmlrpclib.dumps(
xmlrpclib.Fault(1, "%s:%s" % (sys.exc_type, sys.exc_value)))
pass
else:
# Everything worked, encode the real response.
response = xmlrpclib.dumps(response, methodresponse=1)
retval = True
pass
# Finally, send the reply to the client.
streams[1].write(response)
connection.putheader("content-length", len(response))
connection.endheaders()
connection.write(response)
connection.flush()
return
return retval
##
# Handle all of the user requests.
#
# @param streams A pair containing the input and output streams.
#
def serve_forever(self, streams):
# Make a new connection from the streams and handle requests until the
# streams are closed or there is a protocol error.
connection = SSHConnection(self.ssh_connection[0], '', streams=streams)
try:
done = False
while not done:
done = self.handle_request(connection)
pass
pass
finally:
connection.close()
pass
return
pass
##
# A proxy for XML-RPC servers that are accessible via SSH.
# A client-side proxy for XML-RPC servers that are accessible via SSH.
#
class SSHServerProxy:
......@@ -208,3 +401,4 @@ class SSHServerProxy:
# magic method dispatcher
return xmlrpclib._Method(self.__request, name)
pass
......@@ -58,6 +58,61 @@ def usage():
+ " -s boss.emulab.net echo \"Hello World!\"")
return
#
# Process a single command line
#
def do_method(server, method_and_args):
# Get a pointer to the function we want to invoke.
meth = getattr(server, method_and_args[0])
# Pop off the method, and then convert the rest of the arguments.
# Be sure to add the version.
method_and_args.pop(0)
#
# Convert all params (name=value) into a Dictionary.
#
params = {}
for param in method_and_args:
plist = string.split(param, "=")
if len(plist) != 2:
print "Parameters are of the form: param=value!"
return -1
params[plist[0]] = plist[1]
pass
meth_args = [ PACKAGE_VERSION, params ]
#
# Make the call.
#
response = apply(meth, meth_args)
#
# Parse the Response, which is a Dictionary. See ResponseBlock in the
# emulabclient.py module. The XML standard converts classes to a plain
# Dictionary, hence the code below.
#
if len(response["output"]):
print response["output"]
pass
rval = response["code"]
#
# If the code indicates failure, look for a "value". Use that as the
# return value instead of the code.
#
if rval != RESPONSE_SUCCESS:
if response["value"]:
rval = response["value"]
pass
pass
return rval
#
# Process program arguments.
#
try:
# Parse the options,
opts, req_args = getopt.getopt(sys.argv[1:],
......@@ -86,9 +141,6 @@ try:
debug = 1
pass
pass
# ... make sure the required arguments are there.
if len(req_args) < 1:
raise getopt.error("Required arguments not given", "")
pass
except getopt.error, e:
print e.args[0]
......@@ -100,43 +152,27 @@ except getopt.error, e:
server = SSHServerProxy("ssh://" + login_id + "@" + xmlrpc_server +
"/xmlrpc/" + module)
# Get a pointer to the function we want to invoke.
meth = getattr(server, req_args[0])
# Pop off the method, and then convert the rest of the arguments. Be sure to
# add the version.
req_args.pop(0)
params = {}
for param in req_args:
plist = string.split(param, "=")
if len(plist) != 2:
print "Parameters are of the form: param=value!"
sys.exit(-1)
params[plist[0]] = plist[1]
pass
meth_args = [ PACKAGE_VERSION, params ]
#
# Make the call.
#
response = apply(meth, meth_args)
#
# Parse the Response, which is a Dictionary. See ResponseBlock in the
# emulabclient.py module. The XML standard converts classes to a plain
# Dictionary, hence the code below.
#
if len(response["output"]):
print response["output"]
pass
if response["code"] != RESPONSE_SUCCESS:
if response["value"]:
sys.exit(response["value"])
else:
sys.exit(response["code"])
if len(req_args):
# Method and args are on the command line.
sys.exit(do_method(server, req_args))
else:
# Prompt the user for input.
try:
while True:
line = raw_input("$ ")
tokens = line.split(" ", 1)
if len(tokens) >= 1 and len(tokens[0]) > 0:
try:
print str(do_method(server, tokens))
pass
except xmlrpclib.Fault, e:
print e.faultString
pass
pass
pass
pass
except EOFError:
pass
print
pass
sys.exit(RESPONSE_SUCCESS)
......@@ -18,5 +18,5 @@ from emulabserver import emulabserver
# Construct and wrap our object.
wrapper = sshxmlrpc.SSHServerWrapper(emulabserver())
# Handle the request on stdin and send the response to stdout.
wrapper.handle_request((sys.stdin, sys.stdout))
wrapper.serve_forever((sys.stdin, sys.stdout))
sys.exit(0)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment