Commit 4525c033 authored by David Johnson

Ensure wfagents are running when agent restarts; cleanup stale wfa netnses.

(Also strip out some old code that I had thought about using, but won't.)
parent 04033e0a
Pipeline #1146 skipped
......@@ -952,6 +952,7 @@ class CapnetWorkflowAgentManager(object):
self.agents = {}
self.wfapp_launcher = conf.CAPNET.wfapp_launcher
self.rundir = conf.CAPNET.capnet_controller_rundir
self.ip = ip_lib.IPWrapper()
pass
def get_interface_name(self,port):
......@@ -997,21 +998,21 @@ class CapnetWorkflowAgentManager(object):
port = self.agent.capnet_ext_plugin_rpc.wfagent_port_create(port_dict)
return port
## def _cleanup_stale_devices(self, network, dhcp_port):
## LOG.debug("Cleaning stale devices for network %s", network.id)
## dev_name = self.ifdriver.get_device_name(dhcp_port)
## ns_ip = ip_lib.IPWrapper(namespace=network.namespace)
## for d in ns_ip.get_devices(exclude_loopback=True):
## # delete all devices except current active DHCP port device
## if d.name != dev_name:
## LOG.debug("Found stale device %s, deleting", d.name)
## self.ifdriver.unplug(d.name, namespace=network.namespace)
def cleanup_stale_namespaces(self):
    """
    Garbage-collect empty workflow-agent network namespaces.

    Iterates over the namespaces reported by self.ip, considers only
    those whose name starts with 'qwfa', and for each one that
    namespace_is_empty() reports as empty, calls
    garbage_collect_namespace() on it.
    """
    LOG.debug("Cleaning stale wfa namespaces")
    # Lazily filter so namespaces are examined in the same order, one
    # at a time, as they are reported.
    wfa_names = (name for name in self.ip.get_namespaces()
                 if name.startswith('qwfa'))
    for name in wfa_names:
        wrapper = ip_lib.IPWrapper(namespace=name)
        if not wrapper.namespace_is_empty():
            # Still has devices in it; leave it alone.
            continue
        LOG.debug("Cleaning stale wfa namespace %s" % (name,))
        wrapper.garbage_collect_namespace()
def _setup(self,context,wfagent):
"""
Create and initialize a device for the wfagent on this host.
"""
port = self._setup_wfagent_port(context,wfagent)
def ensure_running(self,wfagent,port):
self.__add_namespace(wfagent)
network_id = port.network_id
network = self.agent.get_network(network_id)
interface_name = self.get_interface_name(port)
......@@ -1037,12 +1038,6 @@ class CapnetWorkflowAgentManager(object):
self.ifdriver.init_l3(interface_name, ip_cidrs,
namespace=wfagent.namespace)
# ensure that the dhcp interface is first in the list
if wfagent.namespace is None:
device = ip_lib.IPDevice(interface_name)
device.route.pullup_route(interface_name,
ip_version=constants.IP_VERSION_4)
## if self.use_namespaces:
## self._set_default_route(network, interface_name)
## try:
......@@ -1055,8 +1050,8 @@ class CapnetWorkflowAgentManager(object):
# actually run the agent program... need a rootwrap wrapper.
LOG.debug("launching workflow app:")
pidfile = "%s/wfagent.%s.pid" % (self.rundir,wfagent.id)
logfile = "%s/wfagent.%s.log" % (self.rundir,wfagent.id)
pidfile = "%s/wfagent.%s.%s.pid" % (self.rundir,wfagent.name,wfagent.id)
logfile = "%s/wfagent.%s.%s.log" % (self.rundir,wfagent.name,wfagent.id)
cmd = [ 'ip','netns','exec',str(wfagent.namespace),
self.wfapp_launcher,pidfile,logfile,interface_name,
wfagent.wfapp_path ]
......@@ -1070,6 +1065,20 @@ class CapnetWorkflowAgentManager(object):
LOG.debug("calling wfagent_update: %s" % (str(wfagent),))
self.agent.capnet_ext_plugin_rpc.wfagent_update(wfagent)
return interface_name
def _setup(self,context,wfagent):
    """
    Create and initialize a device for the wfagent on this host.

    Stale wfa namespaces are cleaned up first on a best-effort basis;
    a failure there is logged and does not prevent the setup.  The port
    is then created via _setup_wfagent_port() and handed to
    ensure_running().

    :param context: passed through to _setup_wfagent_port().
    :param wfagent: the workflow agent to set up.
    :returns: the interface name returned by ensure_running().
    """
    try:
        self.cleanup_stale_namespaces()
    except Exception:
        # Best effort only.  Catch Exception rather than using a bare
        # except so that SystemExit/KeyboardInterrupt still propagate,
        # and log with the traceback so the failure can be diagnosed.
        LOG.exception(_LE("Exception during stale wfa namespace cleanup"))
    port = self._setup_wfagent_port(context,wfagent)
    return self.ensure_running(wfagent,port)
......@@ -1094,6 +1103,13 @@ class CapnetWorkflowAgentManager(object):
if wfagent.id in self.agents:
del self.agents[wfagent.id]
pass
try:
self.cleanup_stale_namespaces()
except:
LOG.error(_LE("Exception during stale wfa namespace cleanup"))
pass
pass
def _bind(self,context,wfagent):
......@@ -1148,83 +1164,6 @@ class CapnetWorkflowAgentManager(object):
return self._setup(context,wfagent)
pass
###
### XXX YYY ZZZ
###
def enable(self):
"""Enables DHCP for this network by spawning a local process."""
if self.active:
self.restart()
elif self._enable_dhcp():
commonutils.ensure_dir(self.network_conf_dir)
interface_name = self.device_manager.setup(self.network)
self.interface_name = interface_name
self.spawn_process()
def _get_process_manager(self, cmd_callback=None):
return external_process.ProcessManager(
conf=self.conf,
uuid=self.network.id,
namespace=self.network.namespace,
default_cmd_callback=cmd_callback,
pid_file=self.get_conf_file_name('pid'),
run_as_root=True)
def disable(self, retain_port=False):
"""Disable DHCP for this network by killing the local process."""
self.process_monitor.unregister(self.network.id, DNSMASQ_SERVICE_NAME)
self._get_process_manager().disable()
if not retain_port:
self._destroy_namespace_and_port()
self._remove_config_files()
def _destroy_namespace_and_port(self):
try:
self.device_manager.destroy(self.network, self.interface_name)
except RuntimeError:
LOG.warning(_LW('Failed trying to delete interface: %s'),
self.interface_name)
if self.conf.dhcp_delete_namespaces and self.network.namespace:
ns_ip = ip_lib.IPWrapper(namespace=self.network.namespace)
try:
ns_ip.netns.delete(self.network.namespace)
except RuntimeError:
LOG.warning(_LW('Failed trying to delete namespace: %s'),
self.network.namespace)
@property
def active(self):
return self._get_process_manager().active
def spawn_process(self):
"""Spawn the process, if it's not spawned already."""
# we only need to generate the lease file the first time dnsmasq starts
# rather than on every reload since dnsmasq will keep the file current
self._output_init_lease_file()
self._spawn_or_reload_process(reload_with_HUP=False)
def _spawn_or_reload_process(self, reload_with_HUP):
"""Spawns or reloads a Dnsmasq process for the network.
When reload_with_HUP is True, dnsmasq receives a HUP signal,
or it's reloaded if the process is not running.
"""
self._output_config_files()
pm = self._get_process_manager(
cmd_callback=self._build_cmdline_callback)
pm.enable(reload_cfg=reload_with_HUP)
self.process_monitor.register(uuid=self.network.id,
service_name=DNSMASQ_SERVICE_NAME,
monitored_process=pm)
pass
......@@ -1550,6 +1489,20 @@ class CapnetNeutronAgent(service.Service):
# metadata written.
self.run_physical_bridge_controllers()
try:
self.wfagent_manager.cleanup_stale_namespaces()
except:
LOG.error(_LE("Exception during stale wfa namespace cleanup"))
pass
# Fire off any workflow agents that need to be running...
for binding in bindings:
if 'wfagent' in binding and 'port' in binding:
self.wfagent_manager.ensure_running(binding['wfagent'],
binding['port'])
pass
pass
self.daemon_loop()
@lockutils.synchronized('capnet_agent_stop')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment