
In addition to importing the project, this commit also includes fixes to pass PEP8 and unit tests on OpenStack CI. Commit history that was imported from the launchpad project: . The first commit's message is: first Synergy implementation . This is the 2nd commit message: added README file . This is the 3rd commit message: added new logging configuration support . This is the 4th commit message: added new logging configuration support . This is the 5th commit message: Synergy repository restructured . This is the 6th commit message: Synergy repository restructured . This is the 7th commit message: TimeManager . This is the 8th commit message: first service implementation . This is the 9th commit message: package synergy.managers.scheduler removed . This is the 10th commit message: Keystone and MYSQL configuration removed . This is the 11th commit message: Clean up setup.py - remove all directory creation, chmoding, etc. This will be handled by the system package. - rename `scripts` as `bin` to follow best practices. . This is the 12th commit message: Fix systemd unit file - remove all the directory creation since it will be done by the system package. . This is the 13th commit message: Move synergy source to parent directory Follow the standard of having the source package directory at the root. This also make it easier to work with setuptools. . This is the 14th commit message: Add dependencies in setup.py . This is the 15th commit message: Add RPM spec file for packaging . This is the 16th commit message: debian/ubuntu package . This is the 17th commit message: add oslo.log as a requirement . This is the 18th commit message: deb pkg: explicit naming for oslo.* . This is the 19th commit message: rename deb package to python-synergy-service . This is the 20th commit message: fix debian package configure step . This is the 21st commit message: remove unecessary init & upstart scripts We will target Ubuntu >= 15.04 and EL >= 7. Both use systemd by default, so we do not need initv and upstart scripts. . This is the 22nd commit message: changed license field: GPL->Apache 2 . This is the 23rd commit message: change license to Apache 2 for rpm & deb . This is the 24th commit message: rename python package to "synergy-service" This is the name that will be used on PyPI. . This is the 25th commit message: oslo_log dependence removed . This is the 26th commit message: remove oslo.log dependency from packaging . This is the 27th commit message: remove oslo package renaming Both python-oslo-* and python-oslo.* are supported on Ubuntu >= 15.04, but only python-oslo.* are supported on Ubuntu 14.04. . This is the 28th commit message: fix oslo imports for Ubuntu14.04 This concerns 2 python packages: - oslo config - oslo messaging Both packages cannot be imported using their "oslo_PKG" name in Ubuntu 14.04. These packages can be imported on both Ubuntu 14.04 and 15.04 using "oslo.PKG". . This is the 29th commit message: fix manager config setup We check for manager in the conf file using the oslo config package. Previously, we relied on a private variable to list the managers. However, this private variable is not present on the oslo.config packaged shipped with ubuntu 14.04. In this commit we change the way we list managers to be compatible with both Ubuntu 14.04 and 15.04. . This is the 30th commit message: Revert "fix oslo imports for Ubuntu14.04" This reverts commit 6ee3b4d54765993d165169a56d54efdfb2653c89. We are going to use try/except imports to deal with oslo{.,_}PKG . This is the 31st commit message: use try/except imports for oslo{.,_} packages The oslo packages are imported using "oslo.PKG" for versions < 2.0.0, but are imported using "oslo_PKG" for versions >= 2.0.0. We use try/except statements to try to import "oslo_PKG" first, and fail over to "oslo.PKG". . This is the 32nd commit message: add packaging README . This is the 33rd commit message: add cleaning functions in packaging scripts This way when a container as finished building, when can easily do a rebuild with: docker start -a -i container_id . This is the 34th commit message: various fixes on packaging - deb: postinst depends on "adduser" package - deb: add upstart & systemv init scripts in deb package - deb: postrm purge now removes init script - deb: override lintian error about systemd/init script - rpm: fix the cleaning stage - rpm: fix /var/lib/synergy not being created and preventing systemd from starting the synergy service . This is the 35th commit message: use entry points to discover managers Also add the TimerManager to the manager entry point. fix centos docker builder The .git repo would not get cleaned up after a failed build, resulting in incapacity of starting the container again. add base files for OpenStack CI Most of the files were created with cookiecutter. http://docs.openstack.org/infra/manual/creators.html#preparing-a-new-git-repository-using-cookiecutter fix centos docker builder The .git repo would not get cleaned up after a failed build, resulting in incapacity of starting the container again. fix setup.py failing to build due to pbr rename service to "synergy" Previously it was either "python-synergy-service" or "synergy" depending on the platform. Change-Id: Iebded1d0712a710d9f71913144bf82be31e6765b
477 lines
17 KiB
Python
477 lines
17 KiB
Python
import eventlet
|
|
import json
|
|
import sys
|
|
|
|
from cgi import escape
|
|
from cgi import parse_qs
|
|
from pkg_resources import iter_entry_points
|
|
from synergy.common import config
|
|
from synergy.common import log as logging
|
|
from synergy.common import serializer
|
|
from synergy.common import service
|
|
from synergy.common import wsgi
|
|
|
|
try:
|
|
from oslo_config import cfg
|
|
except ImportError:
|
|
from oslo.config import cfg
|
|
|
|
|
|
__author__ = "Lisa Zangrando"
|
|
__email__ = "lisa.zangrando[AT]pd.infn.it"
|
|
__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud
|
|
All Rights Reserved
|
|
|
|
Licensed under the Apache License, Version 2.0;
|
|
you may not use this file except in compliance with the
|
|
License. You may obtain a copy of the License at:
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing,
|
|
software distributed under the License is distributed on an
|
|
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
|
|
either express or implied.
|
|
See the License for the specific language governing
|
|
permissions and limitations under the License."""
|
|
|
|
|
|
CONF = cfg.CONF
|
|
LOG = None
|
|
MANAGER_ENTRY_POINT = "synergy.managers" # used to discover Synergy managers
|
|
|
|
|
|
class ManagerRPC(object):
|
|
|
|
def __init__(self, managers):
|
|
self.managers = managers
|
|
|
|
def list(self, ctx, **args):
|
|
result = []
|
|
|
|
for name, manager in self.managers.items():
|
|
result.append(name)
|
|
|
|
return result
|
|
|
|
def start(self, ctx, **args):
|
|
manager_name = args.get("arg").get("manager", None)
|
|
result = {}
|
|
|
|
for name, manager in self.managers.items():
|
|
if manager.getStatus() == "ACTIVE" \
|
|
and (not manager_name or manager_name == name):
|
|
LOG.info("starting the %s manager" % (name))
|
|
try:
|
|
# self.managers[name].start()
|
|
self.managers[name].setStatus("RUNNING")
|
|
LOG.info("%s manager started" % (name))
|
|
|
|
result[name] = manager.getStatus()
|
|
except Exception as ex:
|
|
self.managers[name].setStatus("ERROR")
|
|
LOG.error("error occurred during the manager start-up %s"
|
|
% (ex))
|
|
|
|
result[name] = manager.getStatus()
|
|
pass
|
|
|
|
return result
|
|
|
|
def stop(self, ctx, **args):
|
|
manager_name = args.get("arg").get("manager", None)
|
|
result = {}
|
|
|
|
for name, manager in self.managers.items():
|
|
if manager.getStatus() == "RUNNING" \
|
|
and (not manager_name or manager_name == name):
|
|
LOG.info("stopping the %s manager" % (name))
|
|
try:
|
|
# self.managers[name].stop()
|
|
self.managers[name].setStatus("ACTIVE")
|
|
LOG.info("%s manager stopped" % (name))
|
|
|
|
result[name] = manager.getStatus()
|
|
except Exception as ex:
|
|
self.managers[name].setStatus("ERROR")
|
|
LOG.error("error occurred during the manager stop %s"
|
|
% (ex))
|
|
|
|
result[name] = manager.getStatus()
|
|
pass
|
|
|
|
return result
|
|
|
|
def execute(self, ctx, **args):
|
|
manager_name = args.get("arg").get("manager", None)
|
|
command = args.get("arg").get("command", None)
|
|
result = {}
|
|
|
|
if not manager_name:
|
|
result["error"] = "manager name not defined!"
|
|
|
|
if not command:
|
|
result["error"] = "command not defined!"
|
|
|
|
if manager_name in self.managers:
|
|
manager = self.managers[manager_name]
|
|
manager.execute(cmd=command)
|
|
result["command"] = "OK"
|
|
|
|
return result
|
|
|
|
def status(self, ctx, **args):
|
|
manager_name = args.get("arg").get("manager", None)
|
|
result = {}
|
|
|
|
for name, manager in self.managers.items():
|
|
if not manager_name or manager_name == name:
|
|
result[name] = manager.getStatus()
|
|
|
|
return result
|
|
|
|
|
|
class Synergy(service.Service):
|
|
"""Service object for binaries running on hosts.
|
|
|
|
A service takes a manager and enables rpc by listening to queues based
|
|
on topic. It also periodically runs tasks on the manager and reports
|
|
it state to the database services table.
|
|
"""
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super(Synergy, self).__init__("Synergy")
|
|
|
|
self.managers = {}
|
|
|
|
for entry in iter_entry_points(MANAGER_ENTRY_POINT):
|
|
LOG.info("loading manager %r", entry.name)
|
|
|
|
try:
|
|
"""
|
|
found = False
|
|
|
|
try:
|
|
CONF.get(entry.name)
|
|
found = True
|
|
except Exception as ex:
|
|
LOG.info("missing section [%s] in synergy.conf for manager"
|
|
" %r: using the default values"
|
|
% (entry.name, entry.name))
|
|
"""
|
|
|
|
CONF.register_opts(config.manager_opts, group=entry.name)
|
|
|
|
manager_conf = CONF.get(entry.name)
|
|
manager_class = entry.load()
|
|
|
|
manager_obj = manager_class(*args, **kwargs)
|
|
LOG.info("manager instance %r created!", entry.name)
|
|
|
|
manager_obj.setAutoStart(manager_conf.autostart)
|
|
manager_obj.setRate(manager_conf.rate)
|
|
|
|
self.managers[manager_obj.getName()] = manager_obj
|
|
|
|
CONF.register_opts(manager_obj.getOptions(), group=entry.name)
|
|
except Exception as ex:
|
|
LOG.error("Exception has occured", exc_info=1)
|
|
|
|
LOG.error("manager %r instantiation error: %s"
|
|
% (entry.name, ex))
|
|
self.managers[manager_obj.getName()].setStatus("ERROR")
|
|
|
|
raise Exception("manager %r instantiation error: %s"
|
|
% (entry.name, ex))
|
|
|
|
for name, manager in self.managers.items():
|
|
manager.managers = self.managers
|
|
|
|
try:
|
|
manager.setup()
|
|
manager.setStatus("ACTIVE")
|
|
|
|
LOG.info("manager '%s' initialized!" % (manager.getName()))
|
|
except Exception as ex:
|
|
LOG.error("manager '%s' instantiation error: %s" % (name, ex))
|
|
self.managers[manager.getName()].setStatus("ERROR")
|
|
raise ex
|
|
|
|
self.saved_args, self.saved_kwargs = args, kwargs
|
|
|
|
def listManagers(self, environ, start_response):
|
|
result = []
|
|
|
|
for name, manager in self.managers.items():
|
|
result.append(name)
|
|
|
|
start_response("200 OK", [("Content-Type", "text/html")])
|
|
return ["%s" % json.dumps(result)]
|
|
|
|
def getManagerStatus(self, environ, start_response):
|
|
manager_list = None
|
|
result = {}
|
|
|
|
query = environ.get("QUERY_STRING", None)
|
|
|
|
if query:
|
|
parameters = parse_qs(query)
|
|
|
|
if "manager" in parameters:
|
|
if isinstance(parameters['manager'], (list, tuple)):
|
|
manager_list = parameters['manager']
|
|
else:
|
|
manager_list = [parameters['manager']]
|
|
|
|
for manager in manager_list:
|
|
escape(manager)
|
|
|
|
for name, manager in self.managers.items():
|
|
if not manager_list or name in manager_list:
|
|
result[name] = manager.getStatus()
|
|
|
|
if manager_list and len(manager_list) == 1 and len(result) == 0:
|
|
start_response("404 NOT FOUND", [("Content-Type", "text/plain")])
|
|
return ["manager %r not found!" % manager_list[0]]
|
|
else:
|
|
start_response("200 OK", [("Content-Type", "text/html")])
|
|
return ["%s" % json.dumps(result)]
|
|
|
|
def executeCommand(self, environ, start_response):
|
|
manager_name = None
|
|
command = None
|
|
|
|
synergySerializer = serializer.SynergySerializer()
|
|
query = environ.get("QUERY_STRING", None)
|
|
# LOG.info("QUERY_STRING %s" % query)
|
|
if query:
|
|
parameters = parse_qs(query)
|
|
|
|
if "manager" in parameters:
|
|
manager_name = escape(parameters['manager'][0])
|
|
|
|
if "command" in parameters:
|
|
command_string = escape(parameters['command'][0])
|
|
command_string = command_string.replace("'", "\"")
|
|
entity = json.loads(command_string)
|
|
command = synergySerializer.deserialize_entity(context=None,
|
|
entity=entity)
|
|
|
|
if not query or not manager_name or not command:
|
|
start_response("404 NOT FOUND", [("Content-Type", "text/plain")])
|
|
return ["wrong query"]
|
|
|
|
if manager_name in self.managers:
|
|
manager = self.managers[manager_name]
|
|
try:
|
|
manager.execute(cmd=command)
|
|
result = synergySerializer.serialize_entity(context=None,
|
|
entity=command)
|
|
# LOG.info("command result %s" % result)
|
|
|
|
start_response("200 OK", [("Content-Type", "text/html")])
|
|
return ["%s" % json.dumps(result)]
|
|
except Exception as ex:
|
|
LOG.info("executeCommand error: %s" % ex)
|
|
start_response("404 NOT FOUND",
|
|
[("Content-Type", "text/plain")])
|
|
return ["error: %s" % ex]
|
|
else:
|
|
start_response("404 NOT FOUND", [("Content-Type", "text/plain")])
|
|
return ["manager %r not found!" % manager_name]
|
|
|
|
def startManager(self, environ, start_response):
|
|
manager_list = None
|
|
result = {}
|
|
|
|
# synergySerializer = serializer.SynergySerializer()
|
|
query = environ.get("QUERY_STRING", None)
|
|
|
|
if query:
|
|
parameters = parse_qs(query)
|
|
|
|
if "manager" in parameters:
|
|
if isinstance(parameters['manager'], (list, tuple)):
|
|
manager_list = parameters['manager']
|
|
else:
|
|
manager_list = [parameters['manager']]
|
|
|
|
for manager in manager_list:
|
|
escape(manager)
|
|
|
|
for name, manager in self.managers.items():
|
|
if not manager_list or name in manager_list:
|
|
result[name] = {}
|
|
|
|
if manager.getStatus() == "ACTIVE":
|
|
LOG.info("starting the %r manager" % (name))
|
|
try:
|
|
# self.managers[name].start()
|
|
self.managers[name].setStatus("RUNNING")
|
|
LOG.info("%r manager started!" % (name))
|
|
|
|
result[name]["message"] = "started successfully"
|
|
except Exception as ex:
|
|
self.managers[name].setStatus("ERROR")
|
|
LOG.error("error occurred during the manager start-up"
|
|
"%s" % (ex))
|
|
|
|
result[name]["message"] = "ERROR: %s" % ex
|
|
pass
|
|
else:
|
|
result[name]["message"] = "WARN: already started"
|
|
|
|
result[name]["status"] = manager.getStatus()
|
|
|
|
if manager_list and len(manager_list) == 1 and len(result) == 0:
|
|
start_response("404 NOT FOUND", [("Content-Type", "text/plain")])
|
|
return ["manager %r not found!" % manager_list[0]]
|
|
else:
|
|
start_response("200 OK", [("Content-Type", "text/html")])
|
|
return ["%s" % json.dumps(result)]
|
|
|
|
def stopManager(self, environ, start_response):
|
|
manager_list = None
|
|
result = {}
|
|
|
|
query = environ.get("QUERY_STRING", None)
|
|
|
|
if query:
|
|
parameters = parse_qs(query)
|
|
|
|
if "manager" in parameters:
|
|
if isinstance(parameters['manager'], (list, tuple)):
|
|
manager_list = parameters['manager']
|
|
else:
|
|
manager_list = [parameters['manager']]
|
|
|
|
for manager in manager_list:
|
|
escape(manager)
|
|
|
|
for name, manager in self.managers.items():
|
|
if not manager_list or name in manager_list:
|
|
result[name] = {}
|
|
|
|
if manager.getStatus() == "RUNNING":
|
|
LOG.info("stopping the %r manager" % (name))
|
|
try:
|
|
# self.managers[name].stop()
|
|
self.managers[name].setStatus("ACTIVE")
|
|
LOG.info("%r manager stopped!" % (name))
|
|
|
|
result[name]["message"] = "stopped successfully"
|
|
except Exception as ex:
|
|
self.managers[name].setStatus("ERROR")
|
|
LOG.error("error occurred during the manager stop: %s"
|
|
% (ex))
|
|
|
|
result[name]["message"] = "ERROR: %s" % ex
|
|
pass
|
|
else:
|
|
result[name]["message"] = "WARN: already stopped"
|
|
|
|
result[name]["status"] = manager.getStatus()
|
|
|
|
if manager_list and len(manager_list) == 1 and len(result) == 0:
|
|
start_response("404 NOT FOUND", [("Content-Type", "text/plain")])
|
|
return ["manager %r not found!" % manager_list[0]]
|
|
else:
|
|
start_response("200 OK", [("Content-Type", "text/html")])
|
|
return ["%s" % json.dumps(result)]
|
|
|
|
def start(self):
|
|
self.model_disconnected = False
|
|
|
|
for name, manager in self.managers.items():
|
|
if manager.getStatus() != "ERROR" and manager.isAutoStart():
|
|
try:
|
|
LOG.info("starting the %r manager" % (name))
|
|
manager.start()
|
|
manager.setStatus("RUNNING")
|
|
LOG.info("%r manager started! (rate=%s min)"
|
|
% (name, manager.getRate()))
|
|
except Exception as ex:
|
|
LOG.error("error occurred during the manager start %s"
|
|
% (ex))
|
|
manager.setStatus("ERROR")
|
|
raise ex
|
|
|
|
self.wsgi_server = wsgi.Server(
|
|
name="WSGI server",
|
|
host_name=CONF.WSGI.host,
|
|
host_port=CONF.WSGI.port,
|
|
threads=CONF.WSGI.threads,
|
|
use_ssl=CONF.WSGI.use_ssl,
|
|
ssl_ca_file=CONF.WSGI.ssl_ca_file,
|
|
ssl_cert_file=CONF.WSGI.ssl_cert_file,
|
|
ssl_key_file=CONF.WSGI.ssl_key_file,
|
|
max_header_line=CONF.WSGI.max_header_line,
|
|
retry_until_window=CONF.WSGI.retry_until_window,
|
|
tcp_keepidle=CONF.WSGI.tcp_keepidle,
|
|
backlog=CONF.WSGI.backlog)
|
|
|
|
self.wsgi_server.register(r'^$', wsgi.index)
|
|
self.wsgi_server.register(r'synergy/list', self.listManagers)
|
|
self.wsgi_server.register(r'synergy/status', self.getManagerStatus)
|
|
self.wsgi_server.register(r'synergy/execute', self.executeCommand)
|
|
self.wsgi_server.register(r'synergy/start', self.startManager)
|
|
self.wsgi_server.register(r'synergy/stop', self.stopManager)
|
|
self.wsgi_server.start()
|
|
|
|
LOG.info("STARTED!")
|
|
self.wsgi_server.wait()
|
|
|
|
def kill(self):
|
|
"""Destroy the service object in the datastore."""
|
|
LOG.warn("killing service")
|
|
self.stop()
|
|
LOG.warn("Service killed")
|
|
|
|
def stop(self):
|
|
for name, manager in self.managers.items():
|
|
LOG.info("destroying the %s manager" % (name))
|
|
try:
|
|
manager.setStatus("DESTROYED")
|
|
manager.destroy()
|
|
# manager.join()
|
|
# LOG.info("%s manager destroyed" % (name))
|
|
except Exception as ex:
|
|
manager.setStatus("ERROR")
|
|
LOG.error("error occurred during the manager destruction: %s"
|
|
% ex)
|
|
|
|
if self.wsgi_server:
|
|
self.wsgi_server.stop()
|
|
|
|
LOG.info("STOPPED!")
|
|
|
|
|
|
def main():
|
|
try:
|
|
eventlet.monkey_patch(os=False)
|
|
|
|
# the configuration will be into the cfg.CONF global data structure
|
|
config.parse_args(args=sys.argv[1:],
|
|
default_config_files=["/etc/synergy/synergy.conf"])
|
|
|
|
if not cfg.CONF.config_file:
|
|
sys.exit("ERROR: Unable to find configuration file via the "
|
|
"default search paths (~/.synergy/, ~/, /etc/synergy/"
|
|
", /etc/) and the '--config-file' option!")
|
|
|
|
global LOG
|
|
# LOG = logging.getLogger(None)
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
LOG.info("Starting Synergy...")
|
|
|
|
# set session ID to this process so we can kill group in sigterm
|
|
# os.setsid()
|
|
|
|
server = Synergy()
|
|
server.start()
|
|
|
|
LOG.info("Synergy started")
|
|
except Exception as ex:
|
|
LOG.error("unrecoverable error: %s" % ex)
|