Commit 3972a7d6 authored by Robert Ricci's avatar Robert Ricci

Merge in version 0.2.2.1 of dslice.

parent 8f2e1cc3
......@@ -45,7 +45,7 @@ authentication and encryption over SSL.
AUTHOR: Brent Chun (bnc@intel-research.net)
$Id: agentsslsvr.py,v 1.1 2003-08-19 17:17:19 aclement Exp $
$Id: agentsslsvr.py,v 1.2 2003-09-13 00:23:03 ricci Exp $
"""
import threading
......@@ -109,8 +109,8 @@ class agentsslsvr(SSL.ThreadingSSLServer):
raise "Static slice %s already exists" % slice
if not slicename.isvalidlogin(slice):
raise "Slice name %s is a bad name" % slice
if agent.dbthr.hastickets(slice):
raise "Outstanding tickets for slice %s" % slice
# if agent.dbthr.hastickets(slice):
# raise "Outstanding tickets for slice %s" % slice
if agent.gmetadthr.leaseexists(slice):
raise "Outstanding leases for slice %s" % slice
if agent.gmetadthr.sliceexists(slice):
......
......@@ -47,7 +47,7 @@ publish their existence in Ganglia through their local gmond).
AUTHOR: Brent Chun (bnc@intel-research.net)
$Id: gmetadthr.py,v 1.1 2003-08-19 17:17:20 aclement Exp $
$Id: gmetadthr.py,v 1.2 2003-09-13 00:23:03 ricci Exp $
"""
import threading
......@@ -66,6 +66,13 @@ class gmetadthr(threading.Thread):
self.leases_tag = agent.conf.leases_tag
self.slivers_tag = agent.conf.slivers_tag
def reset(self):
self.ips = []
self.ipstoleases = {}
self.leasestoips = {}
self.ipstoslices = {}
self.slicestoips = {}
def run(self):
self.ips = []
self.ipstoleases = {}
......@@ -80,7 +87,7 @@ class gmetadthr(threading.Thread):
self.updateleases()
self.updateslices()
except:
pass # Ganglia XML SAX badness
self.reset() # Ganglia XML SAX badness
time.sleep(self.pollint)
def updateips(self):
......
......@@ -47,7 +47,7 @@ commands for probing agent and node manager state.
AUTHOR: Brent Chun (bnc@intel-research.net)
$Id: svm.py,v 1.1 2003-08-19 17:17:23 aclement Exp $
$Id: svm.py,v 1.2 2003-09-13 00:23:03 ricci Exp $
"""
import agent, agentproxy, broker, calendar, digest, fileutil, lease
......@@ -55,13 +55,14 @@ import nodemgr, nodemgrproxy, os, re, shutil, sys, threading, ticket
import time, xmlconf
class svmthread(threading.Thread):
def __init__(self, ip, method, inargs, outargs, flags):
def __init__(self, ip, method, inargs, outargs, flags, sem):
threading.Thread.__init__(self)
self.ip = ip
self.method = method
self.inargs = inargs
self.outargs = outargs
self.flags = flags
self.sem = sem
def run(self):
from tcppinger import tcppinger
......@@ -76,6 +77,7 @@ class svmthread(threading.Thread):
print "Error on %s: %s" % (self.ip, sys.exc_info())
else:
print "Success on %s" % self.ip
self.sem.release()
class svm:
def __init__(self, clientconf, brokerconf):
......@@ -384,34 +386,27 @@ class svm:
"""
Parallel XML-RPC to nodes. inargs and outargs are dictionaries
of input/output arguments indexed by node IP addresses. Wait
at most self.maxwait seconds for each thread group to complete.
at most self.maxwait seconds for all threads to complete.
"""
maxthreads = flags["parallelism"]
ngroups = len(ips)/maxthreads + 1
for i in range(ngroups):
start = i * maxthreads
end = (i + 1) * maxthreads
if start > len(ips):
start = len(ips)
if end > len(ips):
end = len(ips)
threads = []
for ip in ips[start:end]:
proxy = self.nodemgrproxy(ip)
method = getattr(proxy, methodname)
thread = svmthread(ip, method, inargs, outargs, flags)
thread.start()
threads.append(thread)
wait = self.maxwait
start = time.time()
for thread in threads:
thread.join(wait)
now = time.time()
diff = now - start
if diff > self.maxwait:
break
else:
wait = self.maxwait - diff
sem = threading.Semaphore(flags["parallelism"])
threads = []
for ip in ips:
proxy = self.nodemgrproxy(ip)
method = getattr(proxy, methodname)
sem.acquire()
thread = svmthread(ip, method, inargs, outargs, flags, sem)
thread.start()
threads.append(thread)
wait = self.maxwait
start = time.time()
for thread in threads:
thread.join(wait)
now = time.time()
diff = now - start
if diff > self.maxwait:
break
else:
wait = self.maxwait - diff
if len(inargs) == len(outargs):
return 0
else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment