Commit 4f6a7f3e authored by Dmitry Duplyakin's avatar Dmitry Duplyakin

SLURM client: add Chef/knife wrapper

Also add retries to ShellCommand class and push_local_file_to_remote_node() in util
parent d0fe48fb
import time
import re
import ast
import logging
from shutil import copyfile
from elasticslice.managers.core import SimpleElasticSliceHelper, \
from elasticslice.util.util import ShellCommand
from elasticslice.util.log import configure_file_logging
from elasticslice.util.util import get_file_contents
from elasticslice.util.util import get_file_contents, push_local_file_to_remote_node
......@@ -67,6 +68,9 @@ class SlurmDynamicManager(SimpleElasticSliceHelper,SimpleElasticSliceManager):
# Add SLURM related capabilities
self.slurm = SlurmScheduler(config=self.config)
# Add Chef/knife wrapper
self.knife = KnifeWrapper(config=self.config)
# File logger for special events
configure_file_logging('file_log', '/tmp/SlurmDynamicManager.log')
self.file_log = logging.getLogger('file_log')
......@@ -115,10 +119,19 @@ class SlurmDynamicManager(SimpleElasticSliceHelper,SimpleElasticSliceManager):
nd = {}
if self.image_urn:
nd["diskimage"] = self.image_urn
if self.startup_command:
nd["startup"] = escape(self.startup_command)
if self.tarballs:
nd["tarballs"] = tarballs
### nd["tarballs"] = tarballs
# Tarballs list specified in the config file will appear as a string
if isinstance(self.tarballs, str):
# Example of tarballs string in the config file:
# [['','/root']]
# Need to parse the string and and pass it a list of 2-member lists
nd["tarballs"] = ast.literal_eval(self.tarballs.lstrip('\'').rstrip('\''))
if self.startup_command:
nd["startup"] = escape(self.startup_command.lstrip('\'').rstrip('\''))
if self.nodetype:
nd['hardware_type'] = self.nodetype
if len(lans) > 0:
......@@ -127,6 +140,7 @@ class SlurmDynamicManager(SimpleElasticSliceHelper,SimpleElasticSliceManager):
retval[nn] = nd
#self.idx += count
self.file_log.debug("Adding nodes with the following attributes: %s" % retval)
return retval
def get_system_state(self):
......@@ -152,7 +166,19 @@ class SlurmDynamicManager(SimpleElasticSliceHelper,SimpleElasticSliceManager):
self.file_log.debug("Handling added node: %s; Status: %s" % (node, status))
all_nodes = self.get_nodenames()
code,output = self.knife.bootstrap_node(node=node['client_id'], run_list='role[nfs],role[slurm]')
self.file_log.debug("Bootstrapped a new node %s with Chef. Code: %s; Output: %s" % (node['client_id'], code, output))
# Push locally updated files to all nodes (including the new one)
for n in all_nodes:
push_local_file_to_remote_node("/etc/hosts", n)
push_local_file_to_remote_node("/etc/slurm-llnl/slurm.conf", n)
code,output = self.knife.run_cmd_on_all_clients(cmd_str="service slurm-llnl restart",retries=1)
self.file_log.debug("Restarting SLURM deamons. Code: %s; Output: %s" % (code, output))
def handle_deleted_node(self,node):
......@@ -168,12 +194,15 @@ class SlurmDynamicManager(SimpleElasticSliceHelper,SimpleElasticSliceManager):
code,output = self.knife.purge_node(node=node['client_id'])
self.file_log.debug("Purging node %s from Chef. Code: %s; Output: %s" % (node['client_id'], code, output))
def get_nodenames(self, include_IPs=False, include_fullnames=False):
This method prints short (virtual) nodenames for all nodes in the experiment.
It can optionally also print external (physical) nodenames,
This method returns a list of short (virtual) nodenames for all nodes in the experiment.
It can optionally also include external (physical) nodenames,
as well as internal (experiment) IP addresses.
wrapped_pgmanifest = self.server.get_wrapped_manifest()
......@@ -230,7 +259,6 @@ class SlurmDynamicManager(SimpleElasticSliceHelper,SimpleElasticSliceManager):
class SlurmScheduler(object):
"""Define operations for interacting with the SLURM resource manager and scheduler"""
def __init__(self, config=None):
LOG.debug("Starting SlurmScheduler object")
self.config = config
# List of dislayed fields; all flags are documented at:
self.squeue_format = "%i,%P,%j,%u,%T,%M,%l,%D,%R,%p,%C,%D,%e"
......@@ -311,6 +339,9 @@ class SlurmScheduler(object):
def list_running_jobs(self, show_header=True):
return self._get_queue_info(job_type="RUNNING", show_header=show_header)
def get_running_job_ids(self):
return [j.split(',')[0] for j in self.list_running_jobs(show_header=False)]
def get_job_count(self):
pj = self.list_pending_jobs(show_header=False)
LOG.debug("Pending jobs: %s" % pj)
......@@ -375,5 +406,38 @@ class SlurmScheduler(object):
# Replace the old file with the new one
class KnifeWrapper(object):
"""Define operations for interacting with Chef via the knife command line utility installed locally"""
def __init__(self, config=None):
self.config = config
def bootstrap_node(self, node=None, run_list=None, retries=5):
if node:
if run_list:
cmd_str = "knife bootstrap %s -N %s -r '%s'" % (node, node, run_list)
cmd_str = "knife bootstrap %s -N %s" % (node, node)
cmd = ShellCommand(cmd_str, retries=retries)
code,output = cmd.execute()
return (code,output)
return (None,None)
def purge_node(self, node=None, retries=5):
if node:
cmd = ShellCommand("knife node delete -y %s && knife client delete -y %s " % (node, node), retries=retries)
code,output = cmd.execute()
return (code,output)
return (None,None)
def run_cmd_on_all_clients(self, cmd_str=None, retries=5):
if cmd_str:
cmd = ShellCommand("knife ssh 'name:*' '%s'" % (cmd_str), retries=retries)
code,output = cmd.execute()
return (code,output)
return (None,None)
import logging
import os
import sys
import time
import subprocess
from ConfigParser import SafeConfigParser
import argparse
......@@ -299,21 +300,28 @@ class Config(object):
class ShellCommand(object):
"""Wrapper for running shell commands"""
def __init__(self, args=[], executable='/bin/bash'):
def __init__(self, args=[], executable='/bin/bash', retries=1):
self.stdout = None
self.stderr = None
self.args = args
self.executable = executable
self.retries = retries
def execute(self,pause=1):
attempts = 0
retval = 1
while (attempts < self.retries) and retval:
process = subprocess.Popen(self.args, shell=True,
(self.stdout, self.stderr) = process.communicate()
attempts += 1
retval = process.returncode
def execute(self):
process = subprocess.Popen(self.args, shell=True,
(self.stdout, self.stderr) = process.communicate()
retval = process.returncode
if retval:
LOG.error("Command \"%s\" failed with message: %s" % (self.args, self.stdout))
LOG.error("Exceeded number of retries=%d. Command \"%s\" failed with message: %s" % (self.retries,self.args, self.stdout))
return retval, None
return retval, self.stdout
......@@ -375,3 +383,27 @@ def get_file_contents(file_path, cleanup=True):
return filter(None, [' '.join(l.rstrip('\n').replace('\t',' ').strip().split()) for l in contents])
return contents
def push_local_file_to_remote_node(file_source, node, file_dest=None, retries=5):
This method wraps the ShellCommand class to run scp command
and copy the specified local file to a remote node.
If file_dest is not specified, the same path, file_source, will be used as destination.
If specified, file_dest should be absolute path.
Number or retries will be passed to ShellCommand.
if not os.path.isfile(file_source):
LOG.error("Argument file_source is not pointing to a local file.")
if not node:
LOG.warning("Argument node should not be empty for the method to actually work.")
abs_file_source = os.path.abspath(file_source)
if file_dest:
cmd_str = "scp %s %s:%s" % (abs_file_source, node, file_dest)
cmd_str = "scp %s %s:%s" % (abs_file_source, node, abs_file_source)
cmd = ShellCommand(cmd_str, retries=retries)
code,output = cmd.execute()
return (code,output)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment