[LOGGING] Change to use oslo logging.
Co-Authored-By: David Shrewsbury <shrewsbury.dave@gmail.com> Change-Id: I8bb49825ca1a660a0a7bdbe5fc141e995e5e10fb
This commit is contained in:
parent
bfb0fde961
commit
b99bc9f201
35
etc/logging.conf
Normal file
35
etc/logging.conf
Normal file
@ -0,0 +1,35 @@
|
||||
[loggers]
|
||||
keys=root
|
||||
|
||||
[logger_root]
|
||||
level=DEBUG
|
||||
handlers=screen,rotating_file
|
||||
|
||||
[formatters]
|
||||
keys=simple,ts,newline
|
||||
|
||||
[formatter_simple]
|
||||
format=%(name)s - %(levelname)s - %(message)s
|
||||
|
||||
[formatter_ts]
|
||||
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
|
||||
|
||||
[formatter_newline]
|
||||
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
|
||||
class=libra.common.log.NewlineFormatter
|
||||
|
||||
[handlers]
|
||||
keys=rotating_file,screen
|
||||
|
||||
[handler_rotating_file]
|
||||
formatter=newline
|
||||
class=libra.common.log.CompressedTimedRotatingFileHandler
|
||||
level=DEBUG
|
||||
args=('/var/log/libra/libra.log',)
|
||||
|
||||
[handler_screen]
|
||||
class=StreamHandler
|
||||
formatter=ts
|
||||
level=AUDIT
|
||||
args=(sys.stdout,)
|
||||
|
@ -42,9 +42,6 @@ cfg.CONF.register_opts(
|
||||
cfg.StrOpt('host',
|
||||
default='0.0.0.0',
|
||||
help='IP address to bind to, 0.0.0.0 for all IPs'),
|
||||
cfg.StrOpt('logfile',
|
||||
default='/var/log/libra/libra_admin_api.log',
|
||||
help='Log file'),
|
||||
cfg.IntOpt('node_pool_size',
|
||||
default=10,
|
||||
help='Number of hot spare devices to keep in the pool'),
|
||||
|
@ -18,6 +18,7 @@ import daemon
|
||||
import daemon.pidfile
|
||||
import daemon.runner
|
||||
import grp
|
||||
import logging as std_logging
|
||||
import pwd
|
||||
import pecan
|
||||
import sys
|
||||
@ -34,7 +35,12 @@ from libra.admin_api.expunge.expunge import ExpungeScheduler
|
||||
from libra.admin_api import config as api_config
|
||||
from libra.admin_api import model
|
||||
from libra.openstack.common import importutils
|
||||
from libra.common.options import add_common_opts, libra_logging, CONF
|
||||
from libra.openstack.common import log as logging
|
||||
from libra.common.options import add_common_opts, CONF
|
||||
from libra.common.log import get_descriptors
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_pecan_config():
|
||||
@ -83,18 +89,17 @@ def setup_app(pecan_config):
|
||||
|
||||
|
||||
class MaintThreads(object):
|
||||
def __init__(self, logger, drivers):
|
||||
def __init__(self, drivers):
|
||||
self.classes = []
|
||||
self.logger = logger
|
||||
self.drivers = drivers
|
||||
signal.signal(signal.SIGINT, self.exit_handler)
|
||||
signal.signal(signal.SIGTERM, self.exit_handler)
|
||||
self.run_threads()
|
||||
|
||||
def run_threads(self):
|
||||
stats = Stats(self.logger, self.drivers)
|
||||
pool = Pool(self.logger)
|
||||
expunge = ExpungeScheduler(self.logger)
|
||||
stats = Stats(self.drivers)
|
||||
pool = Pool()
|
||||
expunge = ExpungeScheduler()
|
||||
self.classes.append(stats)
|
||||
self.classes.append(pool)
|
||||
self.classes.append(expunge)
|
||||
@ -104,35 +109,46 @@ class MaintThreads(object):
|
||||
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
||||
for function in self.classes:
|
||||
function.shutdown()
|
||||
self.logger.info("Safely shutting down")
|
||||
sys.exit()
|
||||
|
||||
|
||||
class LogStdout(object):
|
||||
def __init__(self, logger):
|
||||
self.logger = logger.info
|
||||
|
||||
def write(self, data):
|
||||
if data.strip() != '':
|
||||
self.logger(data)
|
||||
LOG.info(data)
|
||||
|
||||
|
||||
def main():
|
||||
add_common_opts()
|
||||
CONF(project='libra', version=__version__)
|
||||
|
||||
logging.setup('libra')
|
||||
|
||||
LOG.debug('Configuration:')
|
||||
CONF.log_opt_values(LOG, std_logging.DEBUG)
|
||||
|
||||
drivers = []
|
||||
|
||||
pc = get_pecan_config()
|
||||
|
||||
sock = server.make_socket(CONF['admin_api']['host'],
|
||||
CONF['admin_api']['port'],
|
||||
CONF['admin_api']['ssl_keyfile'],
|
||||
CONF['admin_api']['ssl_certfile'])
|
||||
|
||||
if CONF['daemon']:
|
||||
pidfile = daemon.pidfile.TimeoutPIDLockFile(CONF['admin_api']['pid'],
|
||||
10)
|
||||
if daemon.runner.is_pidfile_stale(pidfile):
|
||||
pidfile.break_lock()
|
||||
|
||||
descriptors = get_descriptors()
|
||||
descriptors.append(sock.fileno())
|
||||
context = daemon.DaemonContext(
|
||||
working_directory='/',
|
||||
umask=0o022,
|
||||
pidfile=pidfile
|
||||
pidfile=pidfile,
|
||||
files_preserve=descriptors
|
||||
)
|
||||
if CONF['user']:
|
||||
context.uid = pwd.getpwnam(CONF['user']).pw_uid
|
||||
@ -141,21 +157,15 @@ def main():
|
||||
context.open()
|
||||
|
||||
# Use the root logger due to lots of services using logger
|
||||
logger = libra_logging('', 'admin_api')
|
||||
logger.info('Starting on {0}:{1}'.format(CONF['admin_api']['host'],
|
||||
CONF['admin_api']['port']))
|
||||
LOG.info('Starting on %s:%d', CONF.admin_api.host, CONF.admin_api.port)
|
||||
api = setup_app(pc)
|
||||
|
||||
for driver in CONF['admin_api']['stats_driver']:
|
||||
drivers.append(importutils.import_class(known_drivers[driver]))
|
||||
|
||||
MaintThreads(logger, drivers)
|
||||
sys.stderr = LogStdout(logger)
|
||||
MaintThreads(drivers)
|
||||
sys.stderr = LogStdout()
|
||||
|
||||
sock = server.make_socket(CONF['admin_api']['host'],
|
||||
CONF['admin_api']['port'],
|
||||
CONF['admin_api']['ssl_keyfile'],
|
||||
CONF['admin_api']['ssl_certfile'])
|
||||
wsgi.server(sock, api, keepalive=False)
|
||||
|
||||
return 0
|
||||
|
@ -14,7 +14,6 @@
|
||||
# under the License.
|
||||
|
||||
# pecan imports
|
||||
import logging
|
||||
from pecan import expose, response, abort
|
||||
from pecan.rest import RestController
|
||||
import wsmeext.pecan as wsme_pecan
|
||||
@ -22,6 +21,9 @@ from wsme.exc import ClientSideError
|
||||
from libra.admin_api.model.validators import DeviceResp, DevicePost, DevicePut
|
||||
from libra.common.api.lbaas import LoadBalancer, Device, db_session
|
||||
from libra.common.api.lbaas import loadbalancers_devices
|
||||
from libra.openstack.common import log
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class DevicesController(RestController):
|
||||
@ -198,8 +200,7 @@ class DevicesController(RestController):
|
||||
session.commit()
|
||||
return return_data
|
||||
except:
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.exception('Error communicating with load balancer pool')
|
||||
LOG.exception('Error communicating with load balancer pool')
|
||||
errstr = 'Error communicating with load balancer pool'
|
||||
session.rollback()
|
||||
raise ClientSideError(errstr)
|
||||
@ -296,8 +297,7 @@ class DevicesController(RestController):
|
||||
return None
|
||||
except:
|
||||
session.rollback()
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.exception('Error deleting device from pool')
|
||||
LOG.exception('Error deleting device from pool')
|
||||
response.status = 500
|
||||
return dict(
|
||||
faultcode="Server",
|
||||
|
@ -22,9 +22,12 @@ from sqlalchemy import func
|
||||
|
||||
from libra.common.api.lbaas import Device, PoolBuilding, Vip, db_session
|
||||
from libra.common.json_gearman import JSONGearmanClient
|
||||
from libra.openstack.common import log
|
||||
|
||||
#TODO: Lots of duplication of code here, need to cleanup
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class Pool(object):
|
||||
|
||||
@ -32,8 +35,7 @@ class Pool(object):
|
||||
PROBE_SECONDS = 30
|
||||
VIPS_SECONDS = 50
|
||||
|
||||
def __init__(self, logger):
|
||||
self.logger = logger
|
||||
def __init__(self):
|
||||
self.probe_timer = None
|
||||
self.delete_timer = None
|
||||
self.vips_time = None
|
||||
@ -58,10 +60,10 @@ class Pool(object):
|
||||
""" Searches for all devices in the DELETED state and removes them """
|
||||
minute = datetime.now().minute
|
||||
if self.server_id != minute % self.number_of_servers:
|
||||
self.logger.info('Not our turn to run delete check, sleeping')
|
||||
LOG.info('Not our turn to run delete check, sleeping')
|
||||
self.start_delete_sched()
|
||||
return
|
||||
self.logger.info('Running device delete check')
|
||||
LOG.info('Running device delete check')
|
||||
try:
|
||||
message = []
|
||||
with db_session() as session:
|
||||
@ -76,29 +78,29 @@ class Pool(object):
|
||||
message.append(dict(task='libra_pool_mgm', data=job_data))
|
||||
session.commit()
|
||||
if not message:
|
||||
self.logger.info("No devices to delete")
|
||||
LOG.info("No devices to delete")
|
||||
else:
|
||||
gear = GearmanWork(self.logger)
|
||||
gear = GearmanWork()
|
||||
gear.send_delete_message(message)
|
||||
except:
|
||||
self.logger.exception("Exception when deleting devices")
|
||||
LOG.exception("Exception when deleting devices")
|
||||
|
||||
self.start_delete_sched()
|
||||
|
||||
def probe_vips(self):
|
||||
minute = datetime.now().minute
|
||||
if self.server_id != minute % self.number_of_servers:
|
||||
self.logger.info('Not our turn to run vips check, sleeping')
|
||||
LOG.info('Not our turn to run vips check, sleeping')
|
||||
self.start_vips_sched()
|
||||
return
|
||||
self.logger.info('Running vips count probe check')
|
||||
LOG.info('Running vips count probe check')
|
||||
try:
|
||||
with db_session() as session:
|
||||
NULL = None # For pep8
|
||||
vip_count = session.query(Vip).\
|
||||
filter(Vip.device == NULL).count()
|
||||
if vip_count >= self.vip_pool_size:
|
||||
self.logger.info("Enough vips exist, no work to do")
|
||||
LOG.info("Enough vips exist, no work to do")
|
||||
session.commit()
|
||||
self.start_vips_sched()
|
||||
return
|
||||
@ -106,7 +108,7 @@ class Pool(object):
|
||||
build_count = self.vip_pool_size - vip_count
|
||||
self._build_vips(build_count)
|
||||
except:
|
||||
self.logger.exception(
|
||||
LOG.exception(
|
||||
"Uncaught exception during vip pool expansion"
|
||||
)
|
||||
self.start_vips_sched()
|
||||
@ -114,10 +116,10 @@ class Pool(object):
|
||||
def probe_devices(self):
|
||||
minute = datetime.now().minute
|
||||
if self.server_id != minute % self.number_of_servers:
|
||||
self.logger.info('Not our turn to run probe check, sleeping')
|
||||
LOG.info('Not our turn to run probe check, sleeping')
|
||||
self.start_probe_sched()
|
||||
return
|
||||
self.logger.info('Running device count probe check')
|
||||
LOG.info('Running device count probe check')
|
||||
try:
|
||||
with db_session() as session:
|
||||
# Double check we have no outstanding builds assigned to us
|
||||
@ -128,7 +130,7 @@ class Pool(object):
|
||||
dev_count = session.query(Device).\
|
||||
filter(Device.status == 'OFFLINE').count()
|
||||
if dev_count >= self.node_pool_size:
|
||||
self.logger.info("Enough devices exist, no work to do")
|
||||
LOG.info("Enough devices exist, no work to do")
|
||||
session.commit()
|
||||
self.start_probe_sched()
|
||||
return
|
||||
@ -140,7 +142,7 @@ class Pool(object):
|
||||
else:
|
||||
built = built[0]
|
||||
if build_count - built <= 0:
|
||||
self.logger.info(
|
||||
LOG.info(
|
||||
"Other servers are building enough nodes"
|
||||
)
|
||||
session.commit()
|
||||
@ -162,7 +164,7 @@ class Pool(object):
|
||||
delete()
|
||||
session.commit()
|
||||
except:
|
||||
self.logger.exception("Uncaught exception during pool expansion")
|
||||
LOG.exception("Uncaught exception during pool expansion")
|
||||
self.start_probe_sched()
|
||||
|
||||
def _build_nodes(self, count):
|
||||
@ -172,7 +174,7 @@ class Pool(object):
|
||||
while it < count:
|
||||
message.append(dict(task='libra_pool_mgm', data=job_data))
|
||||
it += 1
|
||||
gear = GearmanWork(self.logger)
|
||||
gear = GearmanWork()
|
||||
gear.send_create_message(message)
|
||||
|
||||
def _build_vips(self, count):
|
||||
@ -182,7 +184,7 @@ class Pool(object):
|
||||
while it < count:
|
||||
message.append(dict(task='libra_pool_mgm', data=job_data))
|
||||
it += 1
|
||||
gear = GearmanWork(self.logger)
|
||||
gear = GearmanWork()
|
||||
gear.send_vips_message(message)
|
||||
|
||||
def start_probe_sched(self):
|
||||
@ -192,8 +194,7 @@ class Pool(object):
|
||||
else:
|
||||
sleeptime = 60 - (seconds - self.PROBE_SECONDS)
|
||||
|
||||
self.logger.info('Pool probe check timer sleeping for {secs} seconds'
|
||||
.format(secs=sleeptime))
|
||||
LOG.info('Pool probe check timer sleeping for %d seconds', sleeptime)
|
||||
self.probe_timer = threading.Timer(sleeptime, self.probe_devices, ())
|
||||
self.probe_timer.start()
|
||||
|
||||
@ -204,8 +205,7 @@ class Pool(object):
|
||||
else:
|
||||
sleeptime = 60 - (seconds - self.VIPS_SECONDS)
|
||||
|
||||
self.logger.info('Pool vips check timer sleeping for {secs} seconds'
|
||||
.format(secs=sleeptime))
|
||||
LOG.info('Pool vips check timer sleeping for %d seconds', sleeptime)
|
||||
self.vips_timer = threading.Timer(sleeptime, self.probe_vips, ())
|
||||
self.vips_timer.start()
|
||||
|
||||
@ -216,16 +216,14 @@ class Pool(object):
|
||||
else:
|
||||
sleeptime = 60 - (seconds - self.DELETE_SECONDS)
|
||||
|
||||
self.logger.info('Pool delete check timer sleeping for {secs} seconds'
|
||||
.format(secs=sleeptime))
|
||||
LOG.info('Pool delete check timer sleeping for %d seconds', sleeptime)
|
||||
self.delete_timer = threading.Timer(sleeptime, self.delete_devices, ())
|
||||
self.delete_timer.start()
|
||||
|
||||
|
||||
class GearmanWork(object):
|
||||
|
||||
def __init__(self, logger):
|
||||
self.logger = logger
|
||||
def __init__(self):
|
||||
server_list = []
|
||||
for server in cfg.CONF['gearman']['servers']:
|
||||
host, port = server.split(':')
|
||||
@ -242,7 +240,7 @@ class GearmanWork(object):
|
||||
self.gearman_client = JSONGearmanClient(server_list)
|
||||
|
||||
def send_delete_message(self, message):
|
||||
self.logger.info("Sending {0} gearman messages".format(len(message)))
|
||||
LOG.info("Sending %d gearman messages", len(message))
|
||||
job_status = self.gearman_client.submit_multiple_jobs(
|
||||
message, background=False, wait_until_complete=True,
|
||||
max_retries=10, poll_timeout=30.0
|
||||
@ -250,13 +248,13 @@ class GearmanWork(object):
|
||||
delete_count = 0
|
||||
for status in job_status:
|
||||
if status.state == JOB_UNKNOWN:
|
||||
self.logger.error('Gearman Job server fail')
|
||||
LOG.error('Gearman Job server fail')
|
||||
continue
|
||||
if status.timed_out:
|
||||
self.logger.error('Gearman timeout whilst deleting device')
|
||||
LOG.error('Gearman timeout whilst deleting device')
|
||||
continue
|
||||
if status.result['response'] == 'FAIL':
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
'Pool manager failed to delete a device, removing from DB'
|
||||
)
|
||||
|
||||
@ -266,13 +264,11 @@ class GearmanWork(object):
|
||||
filter(Device.name == status.result['name']).delete()
|
||||
session.commit()
|
||||
|
||||
self.logger.info(
|
||||
'{nodes} freed devices delete from pool'.format(nodes=delete_count)
|
||||
)
|
||||
LOG.info('%d freed devices delete from pool', delete_count)
|
||||
|
||||
def send_vips_message(self, message):
|
||||
# TODO: make this gearman part more async, not wait for all builds
|
||||
self.logger.info("Sending {0} gearman messages".format(len(message)))
|
||||
LOG.info("Sending %d gearman messages", len(message))
|
||||
job_status = self.gearman_client.submit_multiple_jobs(
|
||||
message, background=False, wait_until_complete=True,
|
||||
max_retries=10, poll_timeout=3600.0
|
||||
@ -280,30 +276,30 @@ class GearmanWork(object):
|
||||
built_count = 0
|
||||
for status in job_status:
|
||||
if status.state == JOB_UNKNOWN:
|
||||
self.logger.error('Gearman Job server fail')
|
||||
LOG.error('Gearman Job server fail')
|
||||
continue
|
||||
if status.timed_out:
|
||||
self.logger.error('Gearman timeout whilst building vip')
|
||||
LOG.error('Gearman timeout whilst building vip')
|
||||
continue
|
||||
if status.result['response'] == 'FAIL':
|
||||
self.logger.error('Pool manager failed to build a vip')
|
||||
LOG.error('Pool manager failed to build a vip')
|
||||
continue
|
||||
|
||||
built_count += 1
|
||||
try:
|
||||
self._add_vip(status.result)
|
||||
except:
|
||||
self.logger.exception(
|
||||
LOG.exception(
|
||||
'Could not add vip to DB, node data: {0}'
|
||||
.format(status.result)
|
||||
)
|
||||
self.logger.info(
|
||||
LOG.info(
|
||||
'{vips} vips built and added to pool'.format(vips=built_count)
|
||||
)
|
||||
|
||||
def send_create_message(self, message):
|
||||
# TODO: make this gearman part more async, not wait for all builds
|
||||
self.logger.info("Sending {0} gearman messages".format(len(message)))
|
||||
LOG.info("Sending {0} gearman messages".format(len(message)))
|
||||
job_status = self.gearman_client.submit_multiple_jobs(
|
||||
message, background=False, wait_until_complete=True,
|
||||
max_retries=10, poll_timeout=3600.0
|
||||
@ -311,29 +307,29 @@ class GearmanWork(object):
|
||||
built_count = 0
|
||||
for status in job_status:
|
||||
if status.state == JOB_UNKNOWN:
|
||||
self.logger.error('Gearman Job server fail')
|
||||
LOG.error('Gearman Job server fail')
|
||||
continue
|
||||
if status.timed_out:
|
||||
self.logger.error('Gearman timeout whilst building device')
|
||||
LOG.error('Gearman timeout whilst building device')
|
||||
continue
|
||||
if status.result['response'] == 'FAIL':
|
||||
self.logger.error('Pool manager failed to build a device')
|
||||
LOG.error('Pool manager failed to build a device')
|
||||
continue
|
||||
|
||||
built_count += 1
|
||||
try:
|
||||
self._add_node(status.result)
|
||||
except:
|
||||
self.logger.exception(
|
||||
LOG.exception(
|
||||
'Could not add node to DB, node data: {0}'
|
||||
.format(status.result)
|
||||
)
|
||||
self.logger.info(
|
||||
LOG.info(
|
||||
'{nodes} devices built and added to pool'.format(nodes=built_count)
|
||||
)
|
||||
|
||||
def _add_vip(self, data):
|
||||
self.logger.info('Adding vip {0} to DB'.format(data['ip']))
|
||||
LOG.info('Adding vip {0} to DB'.format(data['ip']))
|
||||
vip = Vip()
|
||||
vip.ip = int(ipaddress.IPv4Address(unicode(data['ip'])))
|
||||
with db_session() as session:
|
||||
@ -341,7 +337,7 @@ class GearmanWork(object):
|
||||
session.commit()
|
||||
|
||||
def _add_node(self, data):
|
||||
self.logger.info('Adding device {0} to DB'.format(data['name']))
|
||||
LOG.info('Adding device {0} to DB'.format(data['name']))
|
||||
device = Device()
|
||||
device.name = data['name']
|
||||
device.publicIpAddr = data['addr']
|
||||
|
@ -18,16 +18,19 @@ from datetime import datetime, timedelta
|
||||
from oslo.config import cfg
|
||||
|
||||
from libra.common.api.lbaas import LoadBalancer, db_session
|
||||
from libra.openstack.common import log
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class ExpungeScheduler(object):
|
||||
def __init__(self, logger):
|
||||
def __init__(self):
|
||||
self.expunge_timer = None
|
||||
self.expire_days = cfg.CONF['admin_api']['expire_days']
|
||||
if not self.expire_days:
|
||||
logger.info('Expunge not configured, disabled')
|
||||
LOG.info('Expunge not configured, disabled')
|
||||
return
|
||||
self.logger = logger
|
||||
self.server_id = cfg.CONF['admin_api']['server_id']
|
||||
self.number_of_servers = cfg.CONF['admin_api']['number_of_servers']
|
||||
self.run_expunge()
|
||||
@ -39,7 +42,7 @@ class ExpungeScheduler(object):
|
||||
def run_expunge(self):
|
||||
day = datetime.now().day
|
||||
if self.server_id != day % self.number_of_servers:
|
||||
self.logger.info('Not our turn to run expunge check, sleeping')
|
||||
LOG.info('Not our turn to run expunge check, sleeping')
|
||||
self.expunge_timer = threading.Timer(
|
||||
24 * 60 * 60, self.run_expunge, ()
|
||||
)
|
||||
@ -49,7 +52,7 @@ class ExpungeScheduler(object):
|
||||
days=int(self.expire_days)
|
||||
)
|
||||
exp_time = exp.strftime('%Y-%m-%d %H:%M:%S')
|
||||
self.logger.info(
|
||||
LOG.info(
|
||||
'Expunging deleted loadbalancers older than {0}'
|
||||
.format(exp_time)
|
||||
)
|
||||
@ -58,11 +61,11 @@ class ExpungeScheduler(object):
|
||||
).filter(LoadBalancer.updated < exp_time).\
|
||||
filter(LoadBalancer.status == 'DELETED').delete()
|
||||
session.commit()
|
||||
self.logger.info(
|
||||
LOG.info(
|
||||
'{0} deleted load balancers expunged'.format(count)
|
||||
)
|
||||
except:
|
||||
self.logger.exception('Exception occurred during expunge')
|
||||
self.logger.info('Expunge thread sleeping for 24 hours')
|
||||
LOG.exception('Exception occurred during expunge')
|
||||
LOG.info('Expunge thread sleeping for 24 hours')
|
||||
self.expunge_timer = threading.Timer(
|
||||
24 * 60 * 60, self.run_expunge, ())
|
||||
|
@ -19,9 +19,6 @@ known_drivers = {
|
||||
|
||||
|
||||
class AlertDriver(object):
|
||||
def __init__(self, logger):
|
||||
self.logger = logger
|
||||
|
||||
def send_alert(self, message, device_id):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
@ -11,12 +11,15 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
|
||||
import logging
|
||||
import ipaddress
|
||||
from libra.admin_api.stats.drivers.base import AlertDriver
|
||||
from libra.common.api.lbaas import Device, LoadBalancer, db_session
|
||||
from libra.common.api.lbaas import loadbalancers_devices, Vip
|
||||
from libra.common.api.gearman_client import submit_job, submit_vip_job
|
||||
from libra.admin_api.stats.drivers.base import AlertDriver
|
||||
from libra.openstack.common import log
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class DbDriver(AlertDriver):
|
||||
@ -68,7 +71,6 @@ class DbDriver(AlertDriver):
|
||||
session.commit()
|
||||
|
||||
def _rebuild_device(self, device_id):
|
||||
logger = logging.getLogger(__name__)
|
||||
new_device_id = None
|
||||
new_device_name = None
|
||||
with db_session() as session:
|
||||
@ -82,14 +84,14 @@ class DbDriver(AlertDriver):
|
||||
first()
|
||||
if new_device is None:
|
||||
session.rollback()
|
||||
logger.error(
|
||||
LOG.error(
|
||||
'No spare devices when trying to rebuild device {0}'
|
||||
.format(device_id)
|
||||
)
|
||||
return
|
||||
new_device_id = new_device.id
|
||||
new_device_name = new_device.name
|
||||
logger.info(
|
||||
LOG.info(
|
||||
"Moving device {0} to device {1}"
|
||||
.format(device_id, new_device_id)
|
||||
)
|
||||
@ -118,7 +120,7 @@ class DbDriver(AlertDriver):
|
||||
filter(Device.id == new_device_id).all()
|
||||
for lb in lbs:
|
||||
lb.errmsg = "Load Balancer rebuild on new device"
|
||||
logger.info(
|
||||
LOG.info(
|
||||
"Moving IP {0} and marking device {1} for deletion"
|
||||
.format(str(ipaddress.IPv4Address(vip.ip)), device_id)
|
||||
)
|
||||
|
@ -15,11 +15,15 @@ from dogapi import dog_http_api as api
|
||||
from oslo.config import cfg
|
||||
|
||||
from libra.admin_api.stats.drivers.base import AlertDriver
|
||||
from libra.openstack.common import log
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class DatadogDriver(AlertDriver):
|
||||
def __init__(self, logger):
|
||||
super(DatadogDriver, self).__init__(logger)
|
||||
def __init__(self):
|
||||
super(DatadogDriver, self).__init__()
|
||||
api.api_key = cfg.CONF['admin_api']['datadog_api_key']
|
||||
api.application_key = cfg.CONF['admin_api']['datadog_app_key']
|
||||
self.dd_env = cfg.CONF['admin_api']['datadog_env']
|
||||
@ -35,7 +39,7 @@ class DatadogDriver(AlertDriver):
|
||||
resp = api.event_with_response(
|
||||
title, text, tags=tags, alert_type='error'
|
||||
)
|
||||
self.logger.info('Datadog alert response: {0}'.format(resp))
|
||||
LOG.info('Datadog alert response: {0}'.format(resp))
|
||||
|
||||
def send_delete(self, message, device_id):
|
||||
title = 'Load balancer unreachable in {0}'.\
|
||||
@ -47,4 +51,4 @@ class DatadogDriver(AlertDriver):
|
||||
resp = api.event_with_response(
|
||||
title, text, tags=tags, alert_type='success'
|
||||
)
|
||||
self.logger.info('Datadog alert response: {0}'.format(resp))
|
||||
LOG.info('Datadog alert response: {0}'.format(resp))
|
||||
|
@ -12,14 +12,18 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
|
||||
from libra.admin_api.stats.drivers.base import AlertDriver
|
||||
from libra.openstack.common import log
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class DummyDriver(AlertDriver):
|
||||
def send_alert(self, message, device_id):
|
||||
self.logger.info('Dummy alert of: {0}'.format(message))
|
||||
LOG.info('Dummy alert of: {0}'.format(message))
|
||||
|
||||
def send_delete(self, message, device_id):
|
||||
self.logger.info('Dummy delete of: {0}'.format(message))
|
||||
LOG.info('Dummy delete of: {0}'.format(message))
|
||||
|
||||
def send_node_change(self, message, lbid, degraded):
|
||||
self.logger.info('Dummy node change of: {0}'.format(message))
|
||||
LOG.info('Dummy node change of: {0}'.format(message))
|
||||
|
@ -18,9 +18,13 @@ from datetime import datetime
|
||||
from oslo.config import cfg
|
||||
|
||||
from libra.common.api.lbaas import LoadBalancer, Device, Node, db_session
|
||||
from libra.openstack.common import log
|
||||
from libra.admin_api.stats.stats_gearman import GearJobs
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class NodeNotFound(Exception):
|
||||
pass
|
||||
|
||||
@ -30,8 +34,7 @@ class Stats(object):
|
||||
PING_SECONDS = 15
|
||||
OFFLINE_SECONDS = 45
|
||||
|
||||
def __init__(self, logger, drivers):
|
||||
self.logger = logger
|
||||
def __init__(self, drivers):
|
||||
self.drivers = drivers
|
||||
self.ping_timer = None
|
||||
self.offline_timer = None
|
||||
@ -40,7 +43,7 @@ class Stats(object):
|
||||
self.server_id = cfg.CONF['admin_api']['server_id']
|
||||
self.number_of_servers = cfg.CONF['admin_api']['number_of_servers']
|
||||
self.stats_driver = cfg.CONF['admin_api']['stats_driver']
|
||||
logger.info("Selected stats drivers: {0}".format(self.stats_driver))
|
||||
LOG.info("Selected stats drivers: %s", self.stats_driver)
|
||||
|
||||
self.start_ping_sched()
|
||||
self.start_offline_sched()
|
||||
@ -55,7 +58,7 @@ class Stats(object):
|
||||
# Work out if it is our turn to run
|
||||
minute = datetime.now().minute
|
||||
if self.server_id != minute % self.number_of_servers:
|
||||
self.logger.info('Not our turn to run OFFLINE check, sleeping')
|
||||
LOG.info('Not our turn to run OFFLINE check, sleeping')
|
||||
self.start_offline_sched()
|
||||
return
|
||||
tested = 0
|
||||
@ -63,9 +66,9 @@ class Stats(object):
|
||||
try:
|
||||
tested, failed = self._exec_offline_check()
|
||||
except Exception:
|
||||
self.logger.exception('Uncaught exception during OFFLINE check')
|
||||
LOG.exception('Uncaught exception during OFFLINE check')
|
||||
# Need to restart timer after every ping cycle
|
||||
self.logger.info(
|
||||
LOG.info(
|
||||
'{tested} OFFLINE loadbalancers tested, {failed} failed'
|
||||
.format(tested=tested, failed=failed)
|
||||
)
|
||||
@ -75,7 +78,7 @@ class Stats(object):
|
||||
# Work out if it is our turn to run
|
||||
minute = datetime.now().minute
|
||||
if self.server_id != minute % self.number_of_servers:
|
||||
self.logger.info('Not our turn to run ping check, sleeping')
|
||||
LOG.info('Not our turn to run ping check, sleeping')
|
||||
self.start_ping_sched()
|
||||
return
|
||||
pings = 0
|
||||
@ -83,32 +86,32 @@ class Stats(object):
|
||||
try:
|
||||
pings, failed = self._exec_ping()
|
||||
except Exception:
|
||||
self.logger.exception('Uncaught exception during LB ping')
|
||||
LOG.exception('Uncaught exception during LB ping')
|
||||
# Need to restart timer after every ping cycle
|
||||
self.logger.info('{pings} loadbalancers pinged, {failed} failed'
|
||||
.format(pings=pings, failed=failed))
|
||||
LOG.info('{pings} loadbalancers pinged, {failed} failed'
|
||||
.format(pings=pings, failed=failed))
|
||||
self.start_ping_sched()
|
||||
|
||||
def _exec_ping(self):
|
||||
pings = 0
|
||||
failed = 0
|
||||
node_list = []
|
||||
self.logger.info('Running ping check')
|
||||
LOG.info('Running ping check')
|
||||
with db_session() as session:
|
||||
devices = session.query(
|
||||
Device.id, Device.name
|
||||
).filter(Device.status == 'ONLINE').all()
|
||||
pings = len(devices)
|
||||
if pings == 0:
|
||||
self.logger.info('No LBs to ping')
|
||||
LOG.info('No LBs to ping')
|
||||
return (0, 0)
|
||||
for lb in devices:
|
||||
node_list.append(lb.name)
|
||||
gearman = GearJobs(self.logger)
|
||||
gearman = GearJobs()
|
||||
failed_lbs, node_status = gearman.send_pings(node_list)
|
||||
failed = len(failed_lbs)
|
||||
if failed > self.error_limit:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
'Too many simultaneous Load Balancer Failures.'
|
||||
' Aborting recovery attempt'
|
||||
)
|
||||
@ -127,7 +130,7 @@ class Stats(object):
|
||||
tested = 0
|
||||
failed = 0
|
||||
node_list = []
|
||||
self.logger.info('Running OFFLINE check')
|
||||
LOG.info('Running OFFLINE check')
|
||||
with db_session() as session:
|
||||
# Join to ensure device is in-use
|
||||
devices = session.query(
|
||||
@ -136,15 +139,15 @@ class Stats(object):
|
||||
|
||||
tested = len(devices)
|
||||
if tested == 0:
|
||||
self.logger.info('No OFFLINE Load Balancers to check')
|
||||
LOG.info('No OFFLINE Load Balancers to check')
|
||||
return (0, 0)
|
||||
for lb in devices:
|
||||
node_list.append(lb.name)
|
||||
gearman = GearJobs(self.logger)
|
||||
gearman = GearJobs()
|
||||
failed_lbs = gearman.offline_check(node_list)
|
||||
failed = len(failed_lbs)
|
||||
if failed > self.error_limit:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
'Too many simultaneous Load Balancer Failures.'
|
||||
' Aborting deletion attempt'
|
||||
)
|
||||
@ -169,7 +172,7 @@ class Stats(object):
|
||||
for lb in failed_lbs:
|
||||
data = self._get_lb(lb, session)
|
||||
if not data:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
'Device {0} has no Loadbalancer attached'.
|
||||
format(lb)
|
||||
)
|
||||
@ -184,8 +187,8 @@ class Stats(object):
|
||||
)
|
||||
)
|
||||
for driver in self.drivers:
|
||||
instance = driver(self.logger)
|
||||
self.logger.info(
|
||||
instance = driver()
|
||||
LOG.info(
|
||||
'Sending failure of {0} to {1}'.format(
|
||||
lb, instance.__class__.__name__
|
||||
)
|
||||
@ -202,14 +205,14 @@ class Stats(object):
|
||||
filter(Device.name == lb).first()
|
||||
|
||||
if not data:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
'Device {0} no longer exists'.format(data.id)
|
||||
)
|
||||
continue
|
||||
|
||||
if data.pingCount < self.ping_limit:
|
||||
data.pingCount += 1
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
'Offline Device {0} has failed {1} ping attempts'.
|
||||
format(lb, data.pingCount)
|
||||
)
|
||||
@ -225,8 +228,8 @@ class Stats(object):
|
||||
format(lb)
|
||||
)
|
||||
for driver in self.drivers:
|
||||
instance = driver(self.logger)
|
||||
self.logger.info(
|
||||
instance = driver()
|
||||
LOG.info(
|
||||
'Sending delete request for {0} to {1}'.format(
|
||||
lb, instance.__class__.__name__
|
||||
)
|
||||
@ -252,7 +255,7 @@ class Stats(object):
|
||||
for lb, nodes in node_status.iteritems():
|
||||
data = self._get_lb(lb, session)
|
||||
if not data:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
'Device {0} has no Loadbalancer attached'.
|
||||
format(lb)
|
||||
)
|
||||
@ -266,7 +269,7 @@ class Stats(object):
|
||||
filter(Node.id == int(node['id'])).first()
|
||||
|
||||
if node_data is None:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
'DB error getting node {0} to set status {1}'
|
||||
.format(node['id'], node['status'])
|
||||
)
|
||||
@ -333,8 +336,8 @@ class Stats(object):
|
||||
else:
|
||||
is_degraded = False
|
||||
for driver in self.drivers:
|
||||
instance = driver(self.logger)
|
||||
self.logger.info(
|
||||
instance = driver()
|
||||
LOG.info(
|
||||
'Sending change of node status on LB {0} to {1}'.format(
|
||||
lbid, instance.__class__.__name__)
|
||||
)
|
||||
@ -352,8 +355,7 @@ class Stats(object):
|
||||
else:
|
||||
sleeptime = 60 - (seconds - self.PING_SECONDS)
|
||||
|
||||
self.logger.info('LB ping check timer sleeping for {secs} seconds'
|
||||
.format(secs=sleeptime))
|
||||
LOG.info('LB ping check timer sleeping for %d seconds', sleeptime)
|
||||
self.ping_timer = threading.Timer(sleeptime, self.ping_lbs, ())
|
||||
self.ping_timer.start()
|
||||
|
||||
@ -365,8 +367,7 @@ class Stats(object):
|
||||
else:
|
||||
sleeptime = 60 - (seconds - self.OFFLINE_SECONDS)
|
||||
|
||||
self.logger.info('LB offline check timer sleeping for {secs} seconds'
|
||||
.format(secs=sleeptime))
|
||||
LOG.info('LB offline check timer sleeping for %d seconds', sleeptime)
|
||||
self.offline_timer = threading.Timer(
|
||||
sleeptime, self.check_offline_lbs, ()
|
||||
)
|
||||
|
@ -15,11 +15,14 @@
|
||||
from gearman.constants import JOB_UNKNOWN
|
||||
from oslo.config import cfg
|
||||
from libra.common.json_gearman import JSONGearmanClient
|
||||
from libra.openstack.common import log
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class GearJobs(object):
|
||||
def __init__(self, logger):
|
||||
self.logger = logger
|
||||
def __init__(self):
|
||||
self.poll_timeout = cfg.CONF['admin_api']['stats_poll_timeout']
|
||||
self.poll_retry = cfg.CONF['admin_api']['stats_poll_timeout_retry']
|
||||
|
||||
@ -54,7 +57,7 @@ class GearJobs(object):
|
||||
for ping in submitted_pings:
|
||||
if ping.state == JOB_UNKNOWN:
|
||||
# TODO: Gearman server failed, ignoring for now
|
||||
self.logger.error('Gearman Job server fail')
|
||||
LOG.error('Gearman Job server fail')
|
||||
continue
|
||||
if ping.timed_out:
|
||||
# Ping timeout
|
||||
@ -75,7 +78,7 @@ class GearJobs(object):
|
||||
|
||||
list_of_jobs = []
|
||||
if len(retry_list) > 0:
|
||||
self.logger.info(
|
||||
LOG.info(
|
||||
"{0} pings timed out, retrying".format(len(retry_list))
|
||||
)
|
||||
for node in retry_list:
|
||||
@ -87,7 +90,7 @@ class GearJobs(object):
|
||||
for ping in submitted_pings:
|
||||
if ping.state == JOB_UNKNOWN:
|
||||
# TODO: Gearman server failed, ignoring for now
|
||||
self.logger.error('Gearman Job server fail')
|
||||
LOG.error('Gearman Job server fail')
|
||||
continue
|
||||
if ping.timed_out:
|
||||
# Ping timeout
|
||||
@ -120,7 +123,7 @@ class GearJobs(object):
|
||||
)
|
||||
for ping in submitted_pings:
|
||||
if ping.state == JOB_UNKNOWN:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"Gearman Job server failed during OFFLINE check of {0}".
|
||||
format(ping.job.task)
|
||||
)
|
||||
|
@ -37,9 +37,6 @@ cfg.CONF.register_opts(
|
||||
default='keystoneclient.middleware.auth_token:AuthProtocol',
|
||||
help='A colon separated module and class for keystone '
|
||||
' middleware'),
|
||||
cfg.StrOpt('logfile',
|
||||
default='/var/log/libra/libra_api.log',
|
||||
help='Log file'),
|
||||
cfg.StrOpt('pid',
|
||||
default='/var/run/libra/libra_api.pid',
|
||||
help='PID file'),
|
||||
|
@ -14,19 +14,21 @@
|
||||
|
||||
import ConfigParser
|
||||
import importlib
|
||||
import logging
|
||||
|
||||
from oslo.config import cfg
|
||||
from pecan import request
|
||||
|
||||
from libra.api.library.exp import NotAuthorized
|
||||
from libra.openstack.common import log
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def get_limited_to_project(headers):
|
||||
"""Return the tenant the request should be limited to."""
|
||||
tenant_id = headers.get('X-Tenant-Id')
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.info(
|
||||
LOG.info(
|
||||
'Loadbalancers {0} request {1} ({2}) from {3} tenant {4}'.format(
|
||||
request.environ.get('REQUEST_METHOD'),
|
||||
request.environ.get('PATH_INFO'),
|
||||
|
@ -18,6 +18,7 @@ import daemon
|
||||
import daemon.pidfile
|
||||
import daemon.runner
|
||||
import grp
|
||||
import logging as std_logging
|
||||
import pwd
|
||||
import pecan
|
||||
import sys
|
||||
@ -30,7 +31,12 @@ from libra.api import config as api_config
|
||||
from libra.api import model
|
||||
from libra.api import acl
|
||||
from libra.common.api import server
|
||||
from libra.common.options import add_common_opts, libra_logging, CONF
|
||||
from libra.common.log import get_descriptors
|
||||
from libra.common.options import add_common_opts, CONF
|
||||
from libra.openstack.common import log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Gets rid of pep8 error
|
||||
@ -89,12 +95,9 @@ def setup_app(pecan_config):
|
||||
|
||||
|
||||
class LogStdout(object):
|
||||
def __init__(self, logger):
|
||||
self.logger = logger.info
|
||||
|
||||
def write(self, data):
|
||||
if data.strip() != '':
|
||||
self.logger(data)
|
||||
LOG.info(data)
|
||||
|
||||
# Gearman calls this
|
||||
def flush(self):
|
||||
@ -105,6 +108,11 @@ def main():
|
||||
add_common_opts()
|
||||
CONF(project='libra', version=__version__)
|
||||
|
||||
logging.setup('libra')
|
||||
|
||||
LOG.debug('Configuration:')
|
||||
CONF.log_opt_values(LOG, std_logging.DEBUG)
|
||||
|
||||
pc = get_pecan_config()
|
||||
|
||||
# NOTE: Let's not force anyone to actually have to use SSL, it shouldn't be
|
||||
@ -118,11 +126,15 @@ def main():
|
||||
pidfile = daemon.pidfile.TimeoutPIDLockFile(CONF['api']['pid'], 10)
|
||||
if daemon.runner.is_pidfile_stale(pidfile):
|
||||
pidfile.break_lock()
|
||||
|
||||
descriptors = get_descriptors()
|
||||
descriptors.append(sock.fileno())
|
||||
|
||||
context = daemon.DaemonContext(
|
||||
working_directory='/',
|
||||
umask=0o022,
|
||||
pidfile=pidfile,
|
||||
files_preserve=[sock.fileno()]
|
||||
files_preserve=descriptors
|
||||
)
|
||||
if CONF['user']:
|
||||
context.uid = pwd.getpwnam(CONF['user']).pw_uid
|
||||
@ -130,12 +142,9 @@ def main():
|
||||
context.gid = grp.getgrnam(CONF['group']).gr_gid
|
||||
context.open()
|
||||
|
||||
# Use the root logger due to lots of services using logger
|
||||
logger = libra_logging('', 'api')
|
||||
logger.info('Starting on {0}:{1}'.format(CONF['api']['host'],
|
||||
CONF['api']['port']))
|
||||
LOG.info('Starting on %s:%d', CONF.api.host, CONF.api.port)
|
||||
api = setup_app(pc)
|
||||
sys.stderr = LogStdout(logger)
|
||||
sys.stderr = LogStdout()
|
||||
|
||||
wsgi.server(sock, api, keepalive=False, debug=CONF['debug'])
|
||||
|
||||
|
@ -12,7 +12,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import traceback
|
||||
import functools
|
||||
import inspect
|
||||
@ -27,25 +26,28 @@ import wsmeext.pecan
|
||||
import pecan
|
||||
from libra.api.library.exp import OverLimit, NotFound, NotAuthorized
|
||||
from libra.api.library.exp import ImmutableEntity
|
||||
from libra.openstack.common import log
|
||||
from libra.common.exc import DetailError
|
||||
from wsme.rest.json import tojson
|
||||
from sqlalchemy.exc import OperationalError, ResourceClosedError
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def format_exception(excinfo, debug=False):
|
||||
"""Extract informations that can be sent to the client."""
|
||||
error = excinfo[1]
|
||||
log = logging.getLogger(__name__)
|
||||
if isinstance(error, wsme.exc.ClientSideError):
|
||||
r = dict(message="Bad Request",
|
||||
details=error.faultstring)
|
||||
log.warning("Client-side error: %s" % r['details'])
|
||||
LOG.warning("Client-side error: %s" % r['details'])
|
||||
return r
|
||||
else:
|
||||
faultstring = str(error)
|
||||
debuginfo = "\n".join(traceback.format_exception(*excinfo))
|
||||
|
||||
log.error('Server-side error: "%s". Detail: \n%s' % (
|
||||
LOG.error('Server-side error: "%s". Detail: \n%s' % (
|
||||
faultstring, debuginfo))
|
||||
|
||||
if isinstance(error, DetailError):
|
||||
@ -112,8 +114,7 @@ def wsexpose(*args, **kwargs):
|
||||
|
||||
# ResourceClosedError happens on for_update deadlock
|
||||
except (OperationalError, ResourceClosedError):
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.warning(
|
||||
LOG.warning(
|
||||
"Galera deadlock, retry {0}".format(x + 1)
|
||||
)
|
||||
args = old_args
|
||||
|
@ -14,15 +14,18 @@
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
import logging
|
||||
import ipaddress
|
||||
from libra.common.json_gearman import JSONGearmanClient
|
||||
from libra.common.api.lbaas import LoadBalancer, db_session, Device, Node, Vip
|
||||
from libra.common.api.lbaas import HealthMonitor
|
||||
from libra.common.api.lbaas import loadbalancers_devices
|
||||
from libra.openstack.common import log
|
||||
from pecan import conf
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
gearman_workers = [
|
||||
'UPDATE', # Create/Update a Load Balancer.
|
||||
'SUSPEND', # Suspend a Load Balancer.
|
||||
@ -35,21 +38,19 @@ gearman_workers = [
|
||||
|
||||
|
||||
def submit_job(job_type, host, data, lbid):
|
||||
logger = logging.getLogger(__name__)
|
||||
eventlet.spawn_n(client_job, logger, job_type, str(host), data, lbid)
|
||||
eventlet.spawn_n(client_job, job_type, str(host), data, lbid)
|
||||
|
||||
|
||||
def submit_vip_job(job_type, device, vip):
|
||||
logger = logging.getLogger(__name__)
|
||||
eventlet.spawn_n(
|
||||
client_job, logger, job_type, "libra_pool_mgm", device, vip
|
||||
client_job, job_type, "libra_pool_mgm", device, vip
|
||||
)
|
||||
|
||||
|
||||
def client_job(logger, job_type, host, data, lbid):
|
||||
def client_job(job_type, host, data, lbid):
|
||||
try:
|
||||
client = GearmanClientThread(logger, host, lbid)
|
||||
logger.info(
|
||||
client = GearmanClientThread(host, lbid)
|
||||
LOG.info(
|
||||
"Sending Gearman job {0} to {1} for loadbalancer {2}".format(
|
||||
job_type, host, lbid
|
||||
)
|
||||
@ -70,14 +71,14 @@ def client_job(logger, job_type, host, data, lbid):
|
||||
device = session.query(Device).\
|
||||
filter(Device.name == data).first()
|
||||
if device is None:
|
||||
logger.error(
|
||||
LOG.error(
|
||||
"Device {0} not found in ASSIGN, this shouldn't happen"
|
||||
.format(data)
|
||||
)
|
||||
return
|
||||
|
||||
if not status:
|
||||
logger.error(
|
||||
LOG.error(
|
||||
"Giving up vip assign for device {0}".format(data)
|
||||
)
|
||||
errmsg = 'Floating IP assign failed'
|
||||
@ -99,12 +100,11 @@ def client_job(logger, job_type, host, data, lbid):
|
||||
client.send_remove(data)
|
||||
return
|
||||
except:
|
||||
logger.exception("Gearman thread unhandled exception")
|
||||
LOG.exception("Gearman thread unhandled exception")
|
||||
|
||||
|
||||
class GearmanClientThread(object):
|
||||
def __init__(self, logger, host, lbid):
|
||||
self.logger = logger
|
||||
def __init__(self, host, lbid):
|
||||
self.host = host
|
||||
self.lbid = lbid
|
||||
|
||||
@ -128,7 +128,7 @@ class GearmanClientThread(object):
|
||||
device = session.query(Device).\
|
||||
filter(Device.name == data).first()
|
||||
if device is None:
|
||||
self.logger.error(
|
||||
self.LOG.error(
|
||||
"VIP assign have been given non existent device {0}"
|
||||
.format(data)
|
||||
)
|
||||
@ -141,7 +141,7 @@ class GearmanClientThread(object):
|
||||
first()
|
||||
if vip is None:
|
||||
errmsg = 'Floating IP assign failed (none available)'
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"Failed to assign IP to device {0} (none available)"
|
||||
.format(data)
|
||||
)
|
||||
@ -153,7 +153,7 @@ class GearmanClientThread(object):
|
||||
filter(Vip.id == self.lbid).first()
|
||||
if vip is None:
|
||||
errmsg = 'Cannot find existing floating IP'
|
||||
self.logger.error(
|
||||
self.LOG.error(
|
||||
"Failed to assign IP to device {0}"
|
||||
.format(data)
|
||||
)
|
||||
@ -175,12 +175,12 @@ class GearmanClientThread(object):
|
||||
if status:
|
||||
return True
|
||||
elif self.lbid:
|
||||
self.logger.error(
|
||||
self.LOG.error(
|
||||
"Failed to assign IP {0} to device {1}"
|
||||
.format(ip_str, data)
|
||||
)
|
||||
else:
|
||||
self.logger.error(
|
||||
self.LOG.error(
|
||||
"Failed to assign IP {0} to device {1}"
|
||||
.format(ip_str, data)
|
||||
)
|
||||
@ -201,7 +201,7 @@ class GearmanClientThread(object):
|
||||
ip_int = int(ipaddress.IPv4Address(unicode(self.lbid)))
|
||||
with db_session() as session:
|
||||
if not status:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"Failed to delete IP {0}"
|
||||
.format(self.lbid)
|
||||
)
|
||||
@ -272,7 +272,7 @@ class GearmanClientThread(object):
|
||||
filter(LoadBalancer.id == self.lbid).\
|
||||
first()
|
||||
if not status:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"Failed Gearman delete for LB {0}".format(lb.id)
|
||||
)
|
||||
self._set_error(data, response, session)
|
||||
@ -356,7 +356,7 @@ class GearmanClientThread(object):
|
||||
|
||||
degraded = []
|
||||
if lbs is None:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
'Attempting to send empty LB data for device {0} ({1}), '
|
||||
'something went wrong'.format(data, self.host)
|
||||
)
|
||||
@ -453,15 +453,15 @@ class GearmanClientThread(object):
|
||||
)
|
||||
if job_status.state == 'UNKNOWN':
|
||||
# Gearman server connection failed
|
||||
self.logger.error('Could not talk to gearman server')
|
||||
LOG.error('Could not talk to gearman server')
|
||||
return False, "System error communicating with load balancer"
|
||||
if job_status.timed_out:
|
||||
# Job timed out
|
||||
self.logger.warning(
|
||||
LOG.warning(
|
||||
'Gearman timeout talking to {0}'.format(self.host)
|
||||
)
|
||||
return False, "Timeout error communicating with load balancer"
|
||||
self.logger.debug(job_status.result)
|
||||
LOG.debug(job_status.result)
|
||||
if 'badRequest' in job_status.result:
|
||||
error = job_status.result['badRequest']['validationErrors']
|
||||
return False, error['message']
|
||||
@ -471,9 +471,9 @@ class GearmanClientThread(object):
|
||||
error = job_status.result['hpcs_error']
|
||||
else:
|
||||
error = 'Load Balancer error'
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
'Gearman error response from {0}: {1}'.format(self.host, error)
|
||||
)
|
||||
return False, error
|
||||
self.logger.info('Gearman success from {0}'.format(self.host))
|
||||
LOG.info('Gearman success from {0}'.format(self.host))
|
||||
return True, job_status.result
|
||||
|
@ -14,7 +14,6 @@
|
||||
# under the License.
|
||||
|
||||
import ConfigParser
|
||||
import logging
|
||||
import sqlalchemy.types as types
|
||||
import time
|
||||
|
||||
@ -25,6 +24,10 @@ from sqlalchemy import INTEGER, VARCHAR, BIGINT
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import relationship, backref, sessionmaker, Session
|
||||
|
||||
from libra.openstack.common import log
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
DeclarativeBase = declarative_base()
|
||||
metadata = DeclarativeBase.metadata
|
||||
@ -219,7 +222,6 @@ class RoutingSession(Session):
|
||||
class db_session(object):
|
||||
def __init__(self):
|
||||
self.session = None
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
||||
def __enter__(self):
|
||||
for x in xrange(10):
|
||||
@ -228,7 +230,7 @@ class db_session(object):
|
||||
self.session.execute("SELECT 1")
|
||||
return self.session
|
||||
except:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
'Could not connect to DB server: {0}'.format(
|
||||
RoutingSession.engines[RoutingSession.use_engine].url
|
||||
)
|
||||
@ -237,7 +239,7 @@ class db_session(object):
|
||||
RoutingSession.use_engine += 1
|
||||
if RoutingSession.use_engine == RoutingSession.engines_count:
|
||||
RoutingSession.use_engine = 0
|
||||
self.logger.error('Could not connect to any DB server')
|
||||
LOG.error('Could not connect to any DB server')
|
||||
return None
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
|
@ -19,6 +19,27 @@ import sys
|
||||
import time
|
||||
import glob
|
||||
import codecs
|
||||
from libra.openstack.common import log
|
||||
|
||||
|
||||
def get_descriptors():
|
||||
"""
|
||||
Utility method to get all Oslo logging filedescrptiors.
|
||||
|
||||
Needs to be called after log.setup(...)
|
||||
"""
|
||||
descriptors = []
|
||||
|
||||
def _add(fh):
|
||||
if not fh in descriptors:
|
||||
descriptors.append(fh)
|
||||
|
||||
for logger in log._loggers.values():
|
||||
for handler in logger.handlers:
|
||||
_add(handler.stream)
|
||||
for i in logging.root.handlers:
|
||||
_add(i.stream)
|
||||
return descriptors
|
||||
|
||||
|
||||
class NewlineFormatter(logging.Formatter):
|
@ -11,50 +11,18 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
|
||||
import logging
|
||||
import logging.handlers
|
||||
import logstash
|
||||
import sys
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from logging_handler import CompressedTimedRotatingFileHandler
|
||||
from logging_handler import NewlineFormatter
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
common_opts = [
|
||||
cfg.BoolOpt('syslog',
|
||||
default=False,
|
||||
help='Use syslog for logging output'),
|
||||
cfg.StrOpt('syslog_socket',
|
||||
default='/dev/log',
|
||||
help='Socket to use for syslog connection'),
|
||||
cfg.StrOpt('syslog_facility',
|
||||
default='local7',
|
||||
help='Syslog logging facility'),
|
||||
cfg.StrOpt('logstash',
|
||||
metavar="HOST:PORT",
|
||||
help='Send logs to logstash at "host:port"'),
|
||||
cfg.StrOpt('group',
|
||||
help='Group to use for daemon mode'),
|
||||
cfg.StrOpt('user',
|
||||
help='User to use for daemon mode'),
|
||||
]
|
||||
|
||||
common_cli_opts = [
|
||||
cfg.BoolOpt('daemon',
|
||||
default=True,
|
||||
help='Run as a daemon'),
|
||||
cfg.BoolOpt('debug',
|
||||
short='d',
|
||||
default=False,
|
||||
help='Turn on debug output'),
|
||||
cfg.BoolOpt('verbose',
|
||||
short='v',
|
||||
default=False,
|
||||
help='Turn on verbose output'),
|
||||
cfg.StrOpt('group',
|
||||
help='Group to use for daemon mode'),
|
||||
cfg.StrOpt('user',
|
||||
help='User to use for daemon mode')
|
||||
]
|
||||
|
||||
gearman_opts = [
|
||||
@ -95,64 +63,5 @@ gearman_opts = [
|
||||
|
||||
|
||||
def add_common_opts():
|
||||
CONF.register_opts(common_opts)
|
||||
CONF.register_opts(gearman_opts, group='gearman')
|
||||
CONF.register_cli_opts(common_cli_opts)
|
||||
|
||||
|
||||
def libra_logging(name, section):
|
||||
"""
|
||||
Shared routine for setting up logging. Depends on some common options
|
||||
(nodaemon, logfile, debug, verbose) being set.
|
||||
"""
|
||||
|
||||
debug = CONF['debug']
|
||||
verbose = CONF['verbose']
|
||||
logfile = CONF[section]['logfile']
|
||||
daemon = CONF['daemon']
|
||||
syslog = CONF['syslog']
|
||||
syslog_socket = CONF['syslog_socket']
|
||||
syslog_facility = CONF['syslog_facility']
|
||||
logstash_opt = CONF['logstash']
|
||||
|
||||
if not daemon:
|
||||
logfile = None
|
||||
|
||||
# Timestamped formatter
|
||||
# Use newline formatter to convert /n to ' ' so logstatsh doesn't break
|
||||
# multiline
|
||||
ts_formatter = NewlineFormatter(
|
||||
'%(asctime)-6s: %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
|
||||
# No timestamp, used with syslog
|
||||
simple_formatter = logging.Formatter(
|
||||
'%(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
|
||||
if syslog and daemon:
|
||||
handler = logging.handlers.SysLogHandler(address=syslog_socket,
|
||||
facility=syslog_facility)
|
||||
handler.setFormatter(simple_formatter)
|
||||
elif logstash_opt:
|
||||
logstash_host, logstash_port = logstash_opt.split(':')
|
||||
handler = logstash.LogstashHandler(logstash_host, int(logstash_port))
|
||||
handler.setFormatter(ts_formatter)
|
||||
elif logfile:
|
||||
handler = CompressedTimedRotatingFileHandler(
|
||||
logfile, when='D', interval=1, backupCount=7
|
||||
)
|
||||
handler.setFormatter(ts_formatter)
|
||||
else:
|
||||
handler = logging.StreamHandler(sys.stdout)
|
||||
handler.setFormatter(ts_formatter)
|
||||
|
||||
logger = logging.getLogger(name)
|
||||
logger.addHandler(handler)
|
||||
|
||||
if debug:
|
||||
logger.setLevel(level=logging.DEBUG)
|
||||
elif verbose:
|
||||
logger.setLevel(level=logging.INFO)
|
||||
|
||||
return logger
|
||||
|
@ -25,9 +25,6 @@ cfg.CONF.register_opts(
|
||||
required=True,
|
||||
help='The az the nodes and IPs will reside in (to be '
|
||||
'passed to the API server'),
|
||||
cfg.StrOpt('logfile',
|
||||
default='/var/log/libra/libra_mgm.log',
|
||||
help='Log file'),
|
||||
cfg.StrOpt('pid',
|
||||
default='/var/run/libra/libra_mgm.pid',
|
||||
help='PID file'),
|
||||
@ -38,9 +35,11 @@ cfg.CONF.register_opts(
|
||||
help='the auth URL for the Nova API'),
|
||||
cfg.StrOpt('nova_user',
|
||||
required=True,
|
||||
secret=True,
|
||||
help='the username for the Nova API'),
|
||||
cfg.StrOpt('nova_pass',
|
||||
required=True,
|
||||
secret=True,
|
||||
help='the password for the Nova API'),
|
||||
cfg.StrOpt('nova_region',
|
||||
required=True,
|
||||
|
@ -16,34 +16,37 @@ from time import sleep
|
||||
from novaclient import exceptions
|
||||
from oslo.config import cfg
|
||||
from gearman.constants import JOB_UNKNOWN
|
||||
from libra.openstack.common import log
|
||||
from libra.common.json_gearman import JSONGearmanClient
|
||||
from libra.mgm.nova import Node, BuildError, NotFound
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class BuildController(object):
|
||||
|
||||
RESPONSE_FIELD = 'response'
|
||||
RESPONSE_SUCCESS = 'PASS'
|
||||
RESPONSE_FAILURE = 'FAIL'
|
||||
|
||||
def __init__(self, logger, msg):
|
||||
self.logger = logger
|
||||
def __init__(self, msg):
|
||||
self.msg = msg
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
nova = Node()
|
||||
except Exception:
|
||||
self.logger.exception("Error initialising Nova connection")
|
||||
LOG.exception("Error initialising Nova connection")
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
|
||||
self.logger.info("Building a requested Nova instance")
|
||||
LOG.info("Building a requested Nova instance")
|
||||
try:
|
||||
node_id = nova.build()
|
||||
self.logger.info("Build command sent to Nova")
|
||||
LOG.info("Build command sent to Nova")
|
||||
except BuildError as exc:
|
||||
self.logger.exception(
|
||||
LOG.exception(
|
||||
"{0}, node {1}".format(exc.msg, exc.node_name)
|
||||
)
|
||||
name = exc.node_name
|
||||
@ -51,13 +54,13 @@ class BuildController(object):
|
||||
try:
|
||||
node_id = nova.get_node(name)
|
||||
except NotFound:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"No node found for {0}, giving up on it".format(name)
|
||||
)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
except exceptions.ClientException:
|
||||
self.logger.exception(
|
||||
LOG.exception(
|
||||
'Error getting failed node info from Nova for {0}'
|
||||
.format(name)
|
||||
)
|
||||
@ -71,7 +74,7 @@ class BuildController(object):
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
else:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
'Node build did not return an ID, cannot find it'
|
||||
)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
@ -82,19 +85,19 @@ class BuildController(object):
|
||||
try:
|
||||
resp, status = nova.status(node_id)
|
||||
except NotFound:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
'Node {0} can no longer be found'.format(node_id)
|
||||
)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
except exceptions.ClientException:
|
||||
self.logger.exception(
|
||||
LOG.exception(
|
||||
'Error getting status from Nova'
|
||||
)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
if resp.status_code not in(200, 203):
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
'Error geting status from Nova, error {0}'
|
||||
.format(resp.status_code)
|
||||
)
|
||||
@ -114,12 +117,12 @@ class BuildController(object):
|
||||
)
|
||||
self.msg['az'] = cfg.CONF['mgm']['az']
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS
|
||||
self.logger.info('Node {0} returned'.format(status['name']))
|
||||
LOG.info('Node {0} returned'.format(status['name']))
|
||||
return self.msg
|
||||
sleep(60)
|
||||
|
||||
nova.delete(node_id)
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"Node {0} didn't come up after 10 minutes, deleted".format(node_id)
|
||||
)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
@ -149,12 +152,12 @@ class BuildController(object):
|
||||
if job_status.state == JOB_UNKNOWN:
|
||||
# Gearman server connect fail, count as bad node because we can't
|
||||
# tell if it really is working
|
||||
self.logger.error('Could not talk to gearman server')
|
||||
LOG.error('Could not talk to gearman server')
|
||||
return False
|
||||
if job_status.timed_out:
|
||||
self.logger.warning('Timeout getting diags from {0}'.format(name))
|
||||
LOG.warning('Timeout getting diags from {0}'.format(name))
|
||||
return False
|
||||
self.logger.debug(job_status.result)
|
||||
LOG.debug(job_status.result)
|
||||
# Would only happen if DIAGNOSTICS call not supported
|
||||
if job_status.result['hpcs_response'] == 'FAIL':
|
||||
return True
|
||||
@ -167,7 +170,7 @@ class BuildController(object):
|
||||
for gearman_test in job_status.result['gearman']:
|
||||
gearman_count += 1
|
||||
if gearman_test['status'] == 'FAIL':
|
||||
self.logger.info(
|
||||
LOG.info(
|
||||
'Device {0} cannot talk to gearman {1}'
|
||||
.format(name, gearman_test['host'])
|
||||
)
|
||||
|
@ -13,6 +13,10 @@
|
||||
# under the License.
|
||||
|
||||
from libra.mgm.nova import Node, NotFound
|
||||
from libra.openstack.common import log
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class DeleteController(object):
|
||||
@ -21,32 +25,31 @@ class DeleteController(object):
|
||||
RESPONSE_SUCCESS = 'PASS'
|
||||
RESPONSE_FAILURE = 'FAIL'
|
||||
|
||||
def __init__(self, logger, msg):
|
||||
self.logger = logger
|
||||
def __init__(self, msg):
|
||||
self.msg = msg
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
nova = Node()
|
||||
except Exception:
|
||||
self.logger.exception("Error initialising Nova connection")
|
||||
LOG.exception("Error initialising Nova connection")
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
|
||||
self.logger.info(
|
||||
LOG.info(
|
||||
"Deleting a requested Nova instance {0}".format(self.msg['name'])
|
||||
)
|
||||
try:
|
||||
node_id = nova.get_node(self.msg['name'])
|
||||
except NotFound:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"No node found for {0}".format(self.msg['name'])
|
||||
)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
nova.delete(node_id)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS
|
||||
self.logger.info(
|
||||
LOG.info(
|
||||
'Deleted node {0}, id {1}'.format(self.msg['name'], node_id)
|
||||
)
|
||||
return self.msg
|
||||
|
@ -16,6 +16,10 @@ from libra.mgm.controllers.build import BuildController
|
||||
from libra.mgm.controllers.delete import DeleteController
|
||||
from libra.mgm.controllers.vip import BuildIpController, AssignIpController
|
||||
from libra.mgm.controllers.vip import RemoveIpController, DeleteIpController
|
||||
from libra.openstack.common import log
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class PoolMgmController(object):
|
||||
@ -25,13 +29,12 @@ class PoolMgmController(object):
|
||||
RESPONSE_SUCCESS = 'PASS'
|
||||
RESPONSE_FAILURE = 'FAIL'
|
||||
|
||||
def __init__(self, logger, json_msg):
|
||||
self.logger = logger
|
||||
def __init__(self, json_msg):
|
||||
self.msg = json_msg
|
||||
|
||||
def run(self):
|
||||
if self.ACTION_FIELD not in self.msg:
|
||||
self.logger.error("Missing `{0}` value".format(self.ACTION_FIELD))
|
||||
LOG.error("Missing `{0}` value".format(self.ACTION_FIELD))
|
||||
self.msg[self.RESPONSE_FILED] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
|
||||
@ -39,19 +42,19 @@ class PoolMgmController(object):
|
||||
|
||||
try:
|
||||
if action == 'BUILD_DEVICE':
|
||||
controller = BuildController(self.logger, self.msg)
|
||||
controller = BuildController(self.msg)
|
||||
elif action == 'DELETE_DEVICE':
|
||||
controller = DeleteController(self.logger, self.msg)
|
||||
controller = DeleteController(self.msg)
|
||||
elif action == 'BUILD_IP':
|
||||
controller = BuildIpController(self.logger, self.msg)
|
||||
controller = BuildIpController(self.msg)
|
||||
elif action == 'ASSIGN_IP':
|
||||
controller = AssignIpController(self.logger, self.msg)
|
||||
controller = AssignIpController(self.msg)
|
||||
elif action == 'REMOVE_IP':
|
||||
controller = RemoveIpController(self.logger, self.msg)
|
||||
controller = RemoveIpController(self.msg)
|
||||
elif action == 'DELETE_IP':
|
||||
controller = DeleteIpController(self.logger, self.msg)
|
||||
controller = DeleteIpController(self.msg)
|
||||
else:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"Invalid `{0}` value: {1}".format(
|
||||
self.ACTION_FIELD, action
|
||||
)
|
||||
@ -66,13 +69,11 @@ class PoolMgmController(object):
|
||||
and 'name' in self.msg
|
||||
):
|
||||
delete_msg = {'name': self.msg['name']}
|
||||
controller = DeleteController(
|
||||
self.logger, delete_msg
|
||||
)
|
||||
controller = DeleteController(delete_msg)
|
||||
controller.run()
|
||||
|
||||
return self.msg
|
||||
except Exception:
|
||||
self.logger.exception("Controller exception")
|
||||
LOG.exception("Controller exception")
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
|
@ -18,6 +18,10 @@ from novaclient import exceptions
|
||||
from oslo.config import cfg
|
||||
|
||||
from libra.mgm.nova import Node
|
||||
from libra.openstack.common import log
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class BuildIpController(object):
|
||||
@ -26,28 +30,27 @@ class BuildIpController(object):
|
||||
RESPONSE_SUCCESS = 'PASS'
|
||||
RESPONSE_FAILURE = 'FAIL'
|
||||
|
||||
def __init__(self, logger, msg):
|
||||
self.logger = logger
|
||||
def __init__(self, msg):
|
||||
self.msg = msg
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
nova = Node()
|
||||
except Exception:
|
||||
self.logger.exception("Error initialising Nova connection")
|
||||
LOG.exception("Error initialising Nova connection")
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
|
||||
self.logger.info("Creating a requested floating IP")
|
||||
LOG.info("Creating a requested floating IP")
|
||||
try:
|
||||
ip_info = nova.vip_create()
|
||||
except exceptions.ClientException:
|
||||
self.logger.exception(
|
||||
LOG.exception(
|
||||
'Error getting a Floating IP'
|
||||
)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
self.logger.info("Floating IP {0} created".format(ip_info['id']))
|
||||
LOG.info("Floating IP {0} created".format(ip_info['id']))
|
||||
self.msg['id'] = ip_info['id']
|
||||
self.msg['ip'] = ip_info['ip']
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS
|
||||
@ -60,25 +63,24 @@ class AssignIpController(object):
|
||||
RESPONSE_SUCCESS = 'PASS'
|
||||
RESPONSE_FAILURE = 'FAIL'
|
||||
|
||||
def __init__(self, logger, msg):
|
||||
self.logger = logger
|
||||
def __init__(self, msg):
|
||||
self.msg = msg
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
nova = Node()
|
||||
except Exception:
|
||||
self.logger.exception("Error initialising Nova connection")
|
||||
LOG.exception("Error initialising Nova connection")
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
|
||||
self.logger.info(
|
||||
LOG.info(
|
||||
"Assigning Floating IP {0} to {1}"
|
||||
.format(self.msg['ip'], self.msg['name'])
|
||||
)
|
||||
try:
|
||||
node_id = nova.get_node(self.msg['name'])
|
||||
self.logger.info(
|
||||
LOG.info(
|
||||
'Node name {0} identified as ID {1}'
|
||||
.format(self.msg['name'], node_id)
|
||||
)
|
||||
@ -87,7 +89,7 @@ class AssignIpController(object):
|
||||
self.check_ip(self.msg['ip'],
|
||||
cfg.CONF['mgm']['tcp_check_port'])
|
||||
except:
|
||||
self.logger.exception(
|
||||
LOG.exception(
|
||||
'Error assigning Floating IP {0} to {1}'
|
||||
.format(self.msg['ip'], self.msg['name'])
|
||||
)
|
||||
@ -114,7 +116,7 @@ class AssignIpController(object):
|
||||
pass
|
||||
loop_count += 1
|
||||
if loop_count >= 5:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"TCP connect error after floating IP assign {0}"
|
||||
.format(ip)
|
||||
)
|
||||
@ -128,19 +130,18 @@ class RemoveIpController(object):
|
||||
RESPONSE_SUCCESS = 'PASS'
|
||||
RESPONSE_FAILURE = 'FAIL'
|
||||
|
||||
def __init__(self, logger, msg):
|
||||
self.logger = logger
|
||||
def __init__(self, msg):
|
||||
self.msg = msg
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
nova = Node()
|
||||
except Exception:
|
||||
self.logger.exception("Error initialising Nova connection")
|
||||
LOG.exception("Error initialising Nova connection")
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
|
||||
self.logger.info(
|
||||
LOG.info(
|
||||
"Removing Floating IP {0} from {1}"
|
||||
.format(self.msg['ip'], self.msg['name'])
|
||||
)
|
||||
@ -148,7 +149,7 @@ class RemoveIpController(object):
|
||||
node_id = nova.get_node(self.msg['name'])
|
||||
nova.vip_remove(node_id, self.msg['ip'])
|
||||
except:
|
||||
self.logger.exception(
|
||||
LOG.exception(
|
||||
'Error removing Floating IP {0} from {1}'
|
||||
.format(self.msg['ip'], self.msg['name'])
|
||||
)
|
||||
@ -165,26 +166,25 @@ class DeleteIpController(object):
|
||||
RESPONSE_SUCCESS = 'PASS'
|
||||
RESPONSE_FAILURE = 'FAIL'
|
||||
|
||||
def __init__(self, logger, msg):
|
||||
self.logger = logger
|
||||
def __init__(self, msg):
|
||||
self.msg = msg
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
nova = Node()
|
||||
except Exception:
|
||||
self.logger.exception("Error initialising Nova connection")
|
||||
LOG.exception("Error initialising Nova connection")
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
|
||||
self.logger.info(
|
||||
LOG.info(
|
||||
"Deleting Floating IP {0}"
|
||||
.format(self.msg['ip'])
|
||||
)
|
||||
try:
|
||||
nova.vip_delete(self.msg['ip'])
|
||||
except:
|
||||
self.logger.exception(
|
||||
LOG.exception(
|
||||
'Error deleting Floating IP {0}'
|
||||
.format(self.msg['ip'])
|
||||
)
|
||||
|
@ -21,19 +21,22 @@ from oslo.config import cfg
|
||||
|
||||
from libra.common.json_gearman import JSONGearmanWorker
|
||||
from libra.mgm.controllers.root import PoolMgmController
|
||||
from libra.openstack.common import log
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def handler(worker, job):
|
||||
logger = worker.logger
|
||||
logger.debug("Received JSON message: {0}".format(json.dumps(job.data)))
|
||||
controller = PoolMgmController(logger, job.data)
|
||||
LOG.debug("Received JSON message: {0}".format(json.dumps(job.data)))
|
||||
controller = PoolMgmController(job.data)
|
||||
response = controller.run()
|
||||
logger.debug("Return JSON message: {0}".format(json.dumps(response)))
|
||||
LOG.debug("Return JSON message: {0}".format(json.dumps(response)))
|
||||
return response
|
||||
|
||||
|
||||
def worker_thread(logger):
|
||||
logger.info("Registering task libra_pool_mgm")
|
||||
def worker_thread():
|
||||
LOG.info("Registering task libra_pool_mgm")
|
||||
hostname = socket.gethostname()
|
||||
|
||||
server_list = []
|
||||
@ -52,7 +55,7 @@ def worker_thread(logger):
|
||||
|
||||
worker.set_client_id(hostname)
|
||||
worker.register_task('libra_pool_mgm', handler)
|
||||
worker.logger = logger
|
||||
worker.logger = LOG
|
||||
|
||||
retry = True
|
||||
|
||||
@ -62,11 +65,11 @@ def worker_thread(logger):
|
||||
except KeyboardInterrupt:
|
||||
retry = False
|
||||
except gearman.errors.ServerUnavailable:
|
||||
logger.error("Job server(s) went away. Reconnecting.")
|
||||
LOG.error("Job server(s) went away. Reconnecting.")
|
||||
time.sleep(cfg.CONF['gearman']['reconnect_sleep'])
|
||||
retry = True
|
||||
except Exception:
|
||||
logger.exception("Exception in worker")
|
||||
LOG.exception("Exception in worker")
|
||||
retry = False
|
||||
|
||||
logger.debug("Pool manager process terminated.")
|
||||
LOG.debug("Pool manager process terminated.")
|
||||
|
@ -16,29 +16,31 @@ import daemon
|
||||
import daemon.pidfile
|
||||
import daemon.runner
|
||||
import grp
|
||||
import logging as std_logging
|
||||
import pwd
|
||||
import threading
|
||||
|
||||
from libra import __version__
|
||||
from libra.common.options import add_common_opts, libra_logging, CONF
|
||||
from libra.common.options import add_common_opts, CONF
|
||||
from libra.common.log import get_descriptors
|
||||
from libra.openstack.common import log as logging
|
||||
from libra.mgm.gearman_worker import worker_thread
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Server(object):
|
||||
def __init__(self):
|
||||
self.logger = None
|
||||
|
||||
def main(self):
|
||||
self.logger = libra_logging('libra_mgm', 'mgm')
|
||||
|
||||
self.logger.info(
|
||||
LOG.info(
|
||||
'Libra Pool Manager worker started, spawning {0} threads'
|
||||
.format(CONF['mgm']['threads'])
|
||||
)
|
||||
thread_list = []
|
||||
for x in xrange(0, CONF['mgm']['threads']):
|
||||
thd = threading.Thread(
|
||||
target=worker_thread, args=[self.logger]
|
||||
target=worker_thread, args=[]
|
||||
)
|
||||
thd.daemon = True
|
||||
thread_list.append(thd)
|
||||
@ -51,6 +53,11 @@ def main():
|
||||
add_common_opts()
|
||||
CONF(project='libra', version=__version__)
|
||||
|
||||
logging.setup('libra')
|
||||
|
||||
LOG.debug('Configuration:')
|
||||
CONF.log_opt_values(LOG, std_logging.DEBUG)
|
||||
|
||||
server = Server()
|
||||
|
||||
if not CONF['daemon']:
|
||||
@ -59,10 +66,13 @@ def main():
|
||||
pidfile = daemon.pidfile.TimeoutPIDLockFile(CONF['mgm']['pid'], 10)
|
||||
if daemon.runner.is_pidfile_stale(pidfile):
|
||||
pidfile.break_lock()
|
||||
|
||||
descriptors = get_descriptors()
|
||||
context = daemon.DaemonContext(
|
||||
working_directory='/',
|
||||
umask=0o022,
|
||||
pidfile=pidfile
|
||||
pidfile=pidfile,
|
||||
files_preserve=descriptors
|
||||
)
|
||||
if CONF['user']:
|
||||
context.uid = pwd.getpwnam(CONF['user']).pw_uid
|
||||
|
@ -35,6 +35,7 @@ import logging
|
||||
import logging.config
|
||||
import logging.handlers
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
@ -50,6 +51,24 @@ from libra.openstack.common import local
|
||||
|
||||
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password']
|
||||
|
||||
# NOTE(ldbragst): Let's build a list of regex objects using the list of
|
||||
# _SANITIZE_KEYS we already have. This way, we only have to add the new key
|
||||
# to the list of _SANITIZE_KEYS and we can generate regular expressions
|
||||
# for XML and JSON automatically.
|
||||
_SANITIZE_PATTERNS = []
|
||||
_FORMAT_PATTERNS = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])',
|
||||
r'(<%(key)s>).*?(</%(key)s>)',
|
||||
r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])',
|
||||
r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])']
|
||||
|
||||
for key in _SANITIZE_KEYS:
|
||||
for pattern in _FORMAT_PATTERNS:
|
||||
reg_ex = re.compile(pattern % {'key': key}, re.DOTALL)
|
||||
_SANITIZE_PATTERNS.append(reg_ex)
|
||||
|
||||
|
||||
common_cli_opts = [
|
||||
cfg.BoolOpt('debug',
|
||||
short='d',
|
||||
@ -64,11 +83,13 @@ common_cli_opts = [
|
||||
]
|
||||
|
||||
logging_cli_opts = [
|
||||
cfg.StrOpt('log-config',
|
||||
cfg.StrOpt('log-config-append',
|
||||
metavar='PATH',
|
||||
help='If this option is specified, the logging configuration '
|
||||
'file specified is used and overrides any other logging '
|
||||
'options specified. Please see the Python logging module '
|
||||
deprecated_name='log-config',
|
||||
help='The name of logging configuration file. It does not '
|
||||
'disable existing loggers, but just appends specified '
|
||||
'logging configuration to any other existing logging '
|
||||
'options. Please see the Python logging module '
|
||||
'documentation for details on logging configuration '
|
||||
'files.'),
|
||||
cfg.StrOpt('log-format',
|
||||
@ -212,6 +233,39 @@ def _get_log_file_path(binary=None):
|
||||
return None
|
||||
|
||||
|
||||
def mask_password(message, secret="***"):
|
||||
"""Replace password with 'secret' in message.
|
||||
|
||||
:param message: The string which includes security information.
|
||||
:param secret: value with which to replace passwords, defaults to "***".
|
||||
:returns: The unicode value of message with the password fields masked.
|
||||
|
||||
For example:
|
||||
>>> mask_password("'adminPass' : 'aaaaa'")
|
||||
"'adminPass' : '***'"
|
||||
>>> mask_password("'admin_pass' : 'aaaaa'")
|
||||
"'admin_pass' : '***'"
|
||||
>>> mask_password('"password" : "aaaaa"')
|
||||
'"password" : "***"'
|
||||
>>> mask_password("'original_password' : 'aaaaa'")
|
||||
"'original_password' : '***'"
|
||||
>>> mask_password("u'original_password' : u'aaaaa'")
|
||||
"u'original_password' : u'***'"
|
||||
"""
|
||||
message = six.text_type(message)
|
||||
|
||||
# NOTE(ldbragst): Check to see if anything in message contains any key
|
||||
# specified in _SANITIZE_KEYS, if not then just return the message since
|
||||
# we don't have to mask any passwords.
|
||||
if not any(key in message for key in _SANITIZE_KEYS):
|
||||
return message
|
||||
|
||||
secret = r'\g<1>' + secret + r'\g<2>'
|
||||
for pattern in _SANITIZE_PATTERNS:
|
||||
message = re.sub(pattern, secret, message)
|
||||
return message
|
||||
|
||||
|
||||
class BaseLoggerAdapter(logging.LoggerAdapter):
|
||||
|
||||
def audit(self, msg, *args, **kwargs):
|
||||
@ -355,17 +409,18 @@ class LogConfigError(Exception):
|
||||
err_msg=self.err_msg)
|
||||
|
||||
|
||||
def _load_log_config(log_config):
|
||||
def _load_log_config(log_config_append):
|
||||
try:
|
||||
logging.config.fileConfig(log_config)
|
||||
logging.config.fileConfig(log_config_append,
|
||||
disable_existing_loggers=False)
|
||||
except moves.configparser.Error as exc:
|
||||
raise LogConfigError(log_config, str(exc))
|
||||
raise LogConfigError(log_config_append, str(exc))
|
||||
|
||||
|
||||
def setup(product_name):
|
||||
"""Setup logging."""
|
||||
if CONF.log_config:
|
||||
_load_log_config(CONF.log_config)
|
||||
if CONF.log_config_append:
|
||||
_load_log_config(CONF.log_config_append)
|
||||
else:
|
||||
_setup_logging_from_conf()
|
||||
sys.excepthook = _create_logging_excepthook(product_name)
|
||||
@ -429,7 +484,7 @@ def _setup_logging_from_conf():
|
||||
|
||||
if CONF.publish_errors:
|
||||
handler = importutils.import_object(
|
||||
"libra.openstack.common.log_handler.PublishErrorsHandler",
|
||||
"openstack.common.log_handler.PublishErrorsHandler",
|
||||
logging.ERROR)
|
||||
log_root.addHandler(handler)
|
||||
|
||||
|
@ -77,6 +77,7 @@ class Launcher(object):
|
||||
"""
|
||||
self.services = Services()
|
||||
self.backdoor_port = eventlet_backdoor.initialize_if_enabled()
|
||||
g_
|
||||
|
||||
def launch_service(self, service):
|
||||
"""Load and start the given service.
|
||||
|
@ -17,18 +17,17 @@ from libra.tests.base import TestCase
|
||||
import libra.tests.mock_objects
|
||||
from libra import __version__ as libra_version
|
||||
from libra import __release__ as libra_release
|
||||
from libra.openstack.common import log
|
||||
from libra.worker.controller import LBaaSController as c
|
||||
from libra.worker.drivers.base import LoadBalancerDriver
|
||||
from libra.worker.drivers.haproxy.driver import HAProxyDriver
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class TestWorkerController(TestCase):
|
||||
def setUp(self):
|
||||
super(TestWorkerController, self).setUp()
|
||||
self.logger = logging.getLogger('test_worker_controller')
|
||||
self.lh = libra.tests.mock_objects.MockLoggingHandler()
|
||||
self.logger.setLevel(logging.DEBUG)
|
||||
self.logger.addHandler(self.lh)
|
||||
self.driver = HAProxyDriver('libra.tests.mock_objects.FakeOSServices',
|
||||
None, None, None)
|
||||
|
||||
@ -36,7 +35,7 @@ class TestWorkerController(TestCase):
|
||||
msg = {
|
||||
c.ACTION_FIELD: 'BOGUS'
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn(c.RESPONSE_FIELD, response)
|
||||
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_FAILURE)
|
||||
@ -46,7 +45,7 @@ class TestWorkerController(TestCase):
|
||||
c.ACTION_FIELD: 'UPDATE',
|
||||
'LoAdBaLaNcErS': [{'protocol': 'http'}]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
|
||||
@ -66,7 +65,7 @@ class TestWorkerController(TestCase):
|
||||
}
|
||||
]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn(c.RESPONSE_FIELD, response)
|
||||
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_SUCCESS)
|
||||
@ -75,7 +74,7 @@ class TestWorkerController(TestCase):
|
||||
msg = {
|
||||
c.ACTION_FIELD: 'SUSPEND'
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn(c.RESPONSE_FIELD, response)
|
||||
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_SUCCESS)
|
||||
@ -84,7 +83,7 @@ class TestWorkerController(TestCase):
|
||||
msg = {
|
||||
c.ACTION_FIELD: 'ENABLE'
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn(c.RESPONSE_FIELD, response)
|
||||
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_SUCCESS)
|
||||
@ -93,7 +92,7 @@ class TestWorkerController(TestCase):
|
||||
msg = {
|
||||
c.ACTION_FIELD: 'DELETE'
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn(c.RESPONSE_FIELD, response)
|
||||
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_SUCCESS)
|
||||
@ -113,7 +112,7 @@ class TestWorkerController(TestCase):
|
||||
}
|
||||
]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
@ -135,7 +134,7 @@ class TestWorkerController(TestCase):
|
||||
}
|
||||
]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
@ -145,7 +144,7 @@ class TestWorkerController(TestCase):
|
||||
msg = {
|
||||
c.ACTION_FIELD: 'UPDATE'
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
@ -156,7 +155,7 @@ class TestWorkerController(TestCase):
|
||||
c.ACTION_FIELD: 'UPDATE',
|
||||
c.LBLIST_FIELD: [{'protocol': 'http'}]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
@ -177,7 +176,7 @@ class TestWorkerController(TestCase):
|
||||
}
|
||||
]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
@ -206,7 +205,7 @@ class TestWorkerController(TestCase):
|
||||
}
|
||||
]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertNotIn('badRequest', response)
|
||||
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_SUCCESS)
|
||||
@ -233,7 +232,7 @@ class TestWorkerController(TestCase):
|
||||
}
|
||||
]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
@ -261,7 +260,7 @@ class TestWorkerController(TestCase):
|
||||
}
|
||||
]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
@ -289,7 +288,7 @@ class TestWorkerController(TestCase):
|
||||
}
|
||||
]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
@ -317,7 +316,7 @@ class TestWorkerController(TestCase):
|
||||
}
|
||||
]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
@ -345,7 +344,7 @@ class TestWorkerController(TestCase):
|
||||
}
|
||||
]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn(c.RESPONSE_FIELD, response)
|
||||
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_SUCCESS)
|
||||
@ -367,7 +366,7 @@ class TestWorkerController(TestCase):
|
||||
}
|
||||
]
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn(c.RESPONSE_FIELD, response)
|
||||
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_FAILURE)
|
||||
@ -376,7 +375,7 @@ class TestWorkerController(TestCase):
|
||||
msg = {
|
||||
c.ACTION_FIELD: 'DISCOVER'
|
||||
}
|
||||
controller = c(self.logger, self.driver, msg)
|
||||
controller = c(self.driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('version', response)
|
||||
self.assertIn('release', response)
|
||||
@ -389,7 +388,7 @@ class TestWorkerController(TestCase):
|
||||
c.ACTION_FIELD: 'ARCHIVE'
|
||||
}
|
||||
null_driver = LoadBalancerDriver()
|
||||
controller = c(self.logger, null_driver, msg)
|
||||
controller = c(null_driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
@ -401,7 +400,7 @@ class TestWorkerController(TestCase):
|
||||
c.OBJ_STORE_TYPE_FIELD: 'bad'
|
||||
}
|
||||
null_driver = LoadBalancerDriver()
|
||||
controller = c(self.logger, null_driver, msg)
|
||||
controller = c(null_driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
|
||||
@ -416,7 +415,7 @@ class TestWorkerController(TestCase):
|
||||
c.OBJ_STORE_TOKEN_FIELD: "XXXX",
|
||||
c.LBLIST_FIELD: [{'protocol': 'http', 'id': '123'}]
|
||||
}
|
||||
controller = c(self.logger, null_driver, msg)
|
||||
controller = c(null_driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
@ -431,7 +430,7 @@ class TestWorkerController(TestCase):
|
||||
c.OBJ_STORE_TOKEN_FIELD: "XXXX",
|
||||
c.LBLIST_FIELD: [{'protocol': 'http', 'id': '123'}]
|
||||
}
|
||||
controller = c(self.logger, null_driver, msg)
|
||||
controller = c(null_driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
@ -446,7 +445,7 @@ class TestWorkerController(TestCase):
|
||||
c.OBJ_STORE_ENDPOINT_FIELD: "https://example.com",
|
||||
c.LBLIST_FIELD: [{'protocol': 'http', 'id': '123'}]
|
||||
}
|
||||
controller = c(self.logger, null_driver, msg)
|
||||
controller = c(null_driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
@ -461,7 +460,7 @@ class TestWorkerController(TestCase):
|
||||
c.OBJ_STORE_ENDPOINT_FIELD: "https://example.com",
|
||||
c.OBJ_STORE_TOKEN_FIELD: "XXXX"
|
||||
}
|
||||
controller = c(self.logger, null_driver, msg)
|
||||
controller = c(null_driver, msg)
|
||||
response = controller.run()
|
||||
self.assertIn('badRequest', response)
|
||||
msg = response['badRequest']['validationErrors']['message']
|
||||
@ -477,7 +476,7 @@ class TestWorkerController(TestCase):
|
||||
c.LBLIST_FIELD: [{'protocol': 'http', 'id': '123'}]
|
||||
}
|
||||
null_driver = LoadBalancerDriver()
|
||||
controller = c(self.logger, null_driver, msg)
|
||||
controller = c(null_driver, msg)
|
||||
response = controller.run()
|
||||
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_FAILURE)
|
||||
self.assertIn(c.ERROR_FIELD, response)
|
||||
|
@ -26,9 +26,6 @@ cfg.CONF.register_opts(
|
||||
default='haproxy',
|
||||
choices=known_drivers.keys(),
|
||||
help='Type of device to use'),
|
||||
cfg.StrOpt('logfile',
|
||||
default='/var/log/libra/libra_worker.log',
|
||||
help='Log file'),
|
||||
cfg.StrOpt('pid',
|
||||
default='/var/run/libra/libra_worker.pid',
|
||||
help='PID file'),
|
||||
|
@ -20,8 +20,11 @@ from libra import __version__ as libra_version
|
||||
from libra import __release__ as libra_release
|
||||
from libra.common.exc import DeletedStateError
|
||||
from libra.common.faults import BadRequest
|
||||
from libra.openstack.common import log
|
||||
from libra.worker.drivers.base import LoadBalancerDriver
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class LBaaSController(object):
|
||||
|
||||
@ -38,8 +41,7 @@ class LBaaSController(object):
|
||||
OBJ_STORE_ENDPOINT_FIELD = 'hpcs_object_store_endpoint'
|
||||
OBJ_STORE_TOKEN_FIELD = 'hpcs_object_store_token'
|
||||
|
||||
def __init__(self, logger, driver, json_msg):
|
||||
self.logger = logger
|
||||
def __init__(self, driver, json_msg):
|
||||
self.driver = driver
|
||||
self.msg = json_msg
|
||||
|
||||
@ -49,7 +51,7 @@ class LBaaSController(object):
|
||||
"""
|
||||
|
||||
if self.ACTION_FIELD not in self.msg:
|
||||
self.logger.error("Missing `%s` value" % self.ACTION_FIELD)
|
||||
LOG.error("Missing `%s` value" % self.ACTION_FIELD)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
|
||||
@ -76,13 +78,11 @@ class LBaaSController(object):
|
||||
elif action == 'DIAGNOSTICS':
|
||||
return self._action_diagnostic()
|
||||
else:
|
||||
self.logger.error("Invalid `%s` value: %s" %
|
||||
(self.ACTION_FIELD, action))
|
||||
LOG.error("Invalid `%s` value: %s", self.ACTION_FIELD, action)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
except Exception as e:
|
||||
self.logger.error("Controller exception: %s, %s" %
|
||||
(e.__class__, e))
|
||||
LOG.error("Controller exception: %s, %s", e.__class__, e)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
|
||||
@ -128,7 +128,7 @@ class LBaaSController(object):
|
||||
sock.close()
|
||||
return True
|
||||
except socket.error:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"TCP connect error to gearman server {0}"
|
||||
.format(ip)
|
||||
)
|
||||
@ -161,7 +161,7 @@ class LBaaSController(object):
|
||||
except NotImplementedError:
|
||||
pass
|
||||
except Exception as e:
|
||||
self.logger.error("Selected driver failed initialization.")
|
||||
LOG.error("Selected driver failed initialization.")
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
|
||||
@ -187,13 +187,13 @@ class LBaaSController(object):
|
||||
try:
|
||||
self.driver.add_protocol(current_lb['protocol'], port)
|
||||
except NotImplementedError:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"Selected driver does not support setting protocol."
|
||||
)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
except Exception as e:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"Failure trying to set protocol: %s, %s" %
|
||||
(e.__class__, e)
|
||||
)
|
||||
@ -207,7 +207,7 @@ class LBaaSController(object):
|
||||
elif algo == 'LEAST_CONNECTIONS':
|
||||
algo = LoadBalancerDriver.LEASTCONN
|
||||
else:
|
||||
self.logger.error("Invalid algorithm: %s" % algo)
|
||||
LOG.error("Invalid algorithm: %s" % algo)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
else:
|
||||
@ -216,13 +216,13 @@ class LBaaSController(object):
|
||||
try:
|
||||
self.driver.set_algorithm(current_lb['protocol'], algo)
|
||||
except NotImplementedError:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"Selected driver does not support setting algorithm."
|
||||
)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
except Exception as e:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"Selected driver failed setting algorithm."
|
||||
)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
@ -245,13 +245,13 @@ class LBaaSController(object):
|
||||
monitor['attempts'],
|
||||
monitor['path'])
|
||||
except NotImplementedError:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"Selected driver does not support adding healthchecks."
|
||||
)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
except Exception as e:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"Selected driver failed adding healthchecks: %s, %s" %
|
||||
(e.__class__, e)
|
||||
)
|
||||
@ -296,25 +296,25 @@ class LBaaSController(object):
|
||||
except NotImplementedError:
|
||||
lb_node['condition'] = self.NODE_ERR
|
||||
error = "Selected driver does not support adding a server"
|
||||
self.logger.error(error)
|
||||
LOG.error(error)
|
||||
self.msg[self.ERROR_FIELD] = error
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
except Exception as e:
|
||||
lb_node['condition'] = self.NODE_ERR
|
||||
error = "Failure adding server %s: %s" % (node_id, e)
|
||||
self.logger.error(error)
|
||||
LOG.error(error)
|
||||
self.msg[self.ERROR_FIELD] = error
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
else:
|
||||
self.logger.debug("Added server: %s:%s" % (address, port))
|
||||
LOG.debug("Added server: %s:%s" % (address, port))
|
||||
lb_node['condition'] = self.NODE_OK
|
||||
|
||||
try:
|
||||
self.driver.create()
|
||||
except NotImplementedError:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"Selected driver does not support CREATE action."
|
||||
)
|
||||
for current_lb in lb_list:
|
||||
@ -322,12 +322,12 @@ class LBaaSController(object):
|
||||
lb_node['condition'] = self.NODE_ERR
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
except Exception as e:
|
||||
self.logger.error("CREATE failed: %s, %s" % (e.__class__, e))
|
||||
LOG.error("CREATE failed: %s, %s" % (e.__class__, e))
|
||||
for lb_node in current_lb['nodes']:
|
||||
lb_node['condition'] = self.NODE_ERR
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
else:
|
||||
self.logger.info("Activated load balancer changes")
|
||||
LOG.info("Activated load balancer changes")
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS
|
||||
|
||||
return self.msg
|
||||
@ -337,12 +337,12 @@ class LBaaSController(object):
|
||||
try:
|
||||
self.driver.suspend()
|
||||
except NotImplementedError:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"Selected driver does not support SUSPEND action."
|
||||
)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
except Exception as e:
|
||||
self.logger.error("SUSPEND failed: %s, %s" % (e.__class__, e))
|
||||
LOG.error("SUSPEND failed: %s, %s" % (e.__class__, e))
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
else:
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS
|
||||
@ -353,12 +353,12 @@ class LBaaSController(object):
|
||||
try:
|
||||
self.driver.enable()
|
||||
except NotImplementedError:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"Selected driver does not support ENABLE action."
|
||||
)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
except Exception as e:
|
||||
self.logger.error("ENABLE failed: %s, %s" % (e.__class__, e))
|
||||
LOG.error("ENABLE failed: %s, %s" % (e.__class__, e))
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
else:
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS
|
||||
@ -369,12 +369,12 @@ class LBaaSController(object):
|
||||
try:
|
||||
self.driver.delete()
|
||||
except NotImplementedError:
|
||||
self.logger.error(
|
||||
LOG.error(
|
||||
"Selected driver does not support DELETE action."
|
||||
)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
except Exception as e:
|
||||
self.logger.error("DELETE failed: %s, %s" % (e.__class__, e))
|
||||
LOG.error("DELETE failed: %s, %s" % (e.__class__, e))
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
else:
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS
|
||||
@ -430,11 +430,11 @@ class LBaaSController(object):
|
||||
self.driver.archive(method, params)
|
||||
except NotImplementedError:
|
||||
error = "Selected driver does not support ARCHIVE action."
|
||||
self.logger.error(error)
|
||||
LOG.error(error)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
self.msg[self.ERROR_FIELD] = error
|
||||
except Exception as e:
|
||||
self.logger.error("ARCHIVE failed: %s, %s" % (e.__class__, e))
|
||||
LOG.error("ARCHIVE failed: %s, %s" % (e.__class__, e))
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
self.msg[self.ERROR_FIELD] = str(e)
|
||||
else:
|
||||
@ -454,15 +454,15 @@ class LBaaSController(object):
|
||||
stats = self.driver.get_status()
|
||||
except NotImplementedError:
|
||||
error = "Selected driver does not support PING action."
|
||||
self.logger.error(error)
|
||||
LOG.error(error)
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
self.msg[self.ERROR_FIELD] = error
|
||||
except DeletedStateError:
|
||||
self.logger.info("Invalid operation PING on a deleted LB")
|
||||
LOG.info("Invalid operation PING on a deleted LB")
|
||||
self.msg['status'] = 'DELETED'
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
except Exception as e:
|
||||
self.logger.error("PING failed: %s, %s" % (e.__class__, e))
|
||||
LOG.error("PING failed: %s, %s" % (e.__class__, e))
|
||||
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
|
||||
self.msg[self.ERROR_FIELD] = str(e)
|
||||
else:
|
||||
|
@ -17,18 +17,24 @@ import daemon.pidfile
|
||||
import daemon.runner
|
||||
import getpass
|
||||
import grp
|
||||
import logging as std_logging
|
||||
import pwd
|
||||
import time
|
||||
import threading
|
||||
|
||||
from libra import __version__
|
||||
from libra.openstack.common import importutils
|
||||
from libra.common.options import libra_logging, add_common_opts, CONF
|
||||
from libra.openstack.common import log as logging
|
||||
from libra.common.options import add_common_opts, CONF
|
||||
from libra.common.log import get_descriptors
|
||||
from libra.worker.drivers.base import known_drivers
|
||||
from libra.worker.drivers.haproxy.services_base import haproxy_services
|
||||
from libra.worker.worker import config_thread
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EventServer(object):
|
||||
"""
|
||||
Encapsulates server activity so we can run it in either daemon or
|
||||
@ -44,17 +50,15 @@ class EventServer(object):
|
||||
that function's arguments.
|
||||
"""
|
||||
thread_list = []
|
||||
logger = libra_logging('libra_worker', 'worker')
|
||||
|
||||
driver = CONF['worker']['driver']
|
||||
logger.info("Selected driver: %s" % driver)
|
||||
LOG.info("Selected driver: %s" % driver)
|
||||
if driver == 'haproxy':
|
||||
logger.info("Selected HAProxy service: %s" %
|
||||
CONF['worker:haproxy']['service'])
|
||||
logger.info("Job server list: %s" % CONF['gearman']['servers'])
|
||||
LOG.info("Selected HAProxy service: %s" %
|
||||
CONF['worker:haproxy']['service'])
|
||||
LOG.info("Job server list: %s" % CONF['gearman']['servers'])
|
||||
|
||||
for task, task_args in tasks:
|
||||
task_args = (logger,) + task_args # Make the logger the first arg
|
||||
thd = threading.Thread(target=task, args=task_args)
|
||||
thd.daemon = True
|
||||
thread_list.append(thd)
|
||||
@ -64,10 +68,10 @@ class EventServer(object):
|
||||
try:
|
||||
time.sleep(600)
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Non-daemon session terminated")
|
||||
LOG.info("Non-daemon session terminated")
|
||||
break
|
||||
|
||||
logger.info("Shutting down")
|
||||
LOG.info("Shutting down")
|
||||
|
||||
|
||||
def main():
|
||||
@ -76,6 +80,11 @@ def main():
|
||||
add_common_opts()
|
||||
CONF(project='libra', version=__version__)
|
||||
|
||||
logging.setup('libra')
|
||||
|
||||
LOG.debug('Configuration:')
|
||||
CONF.log_opt_values(LOG, std_logging.DEBUG)
|
||||
|
||||
# Import the device driver we are going to use. This will be sent
|
||||
# along to the Gearman task that will use it to communicate with
|
||||
# the device.
|
||||
@ -112,13 +121,16 @@ def main():
|
||||
if not CONF['daemon']:
|
||||
server.main(task_list)
|
||||
else:
|
||||
|
||||
pidfile = daemon.pidfile.TimeoutPIDLockFile(CONF['worker']['pid'], 10)
|
||||
if daemon.runner.is_pidfile_stale(pidfile):
|
||||
pidfile.break_lock()
|
||||
descriptors = get_descriptors()
|
||||
context = daemon.DaemonContext(
|
||||
working_directory='/etc/haproxy',
|
||||
umask=0o022,
|
||||
pidfile=pidfile
|
||||
pidfile=pidfile,
|
||||
files_preserve=descriptors
|
||||
)
|
||||
if CONF['user']:
|
||||
context.uid = pwd.getpwnam(CONF['user']).pw_uid
|
||||
|
@ -15,38 +15,41 @@
|
||||
import eventlet
|
||||
|
||||
from libra.common.exc import ServiceUnavailable
|
||||
from libra.openstack.common import log
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def record_stats(logger, http_stats, tcp_stats):
|
||||
def record_stats(http_stats, tcp_stats):
|
||||
""" Permanently record load balancer statistics. """
|
||||
logger.debug("[stats] HTTP bytes in/out: (%d, %d)" %
|
||||
(http_stats.bytes_in, http_stats.bytes_out))
|
||||
logger.debug("[stats] TCP bytes in/out: (%d, %d)" %
|
||||
(tcp_stats.bytes_in, tcp_stats.bytes_out))
|
||||
LOG.debug("HTTP bytes in/out: (%d, %d)" %
|
||||
http_stats.bytes_in, http_stats.bytes_out)
|
||||
LOG.debug("[TCP bytes in/out: (%d, %d)" %
|
||||
tcp_stats.bytes_in, tcp_stats.bytes_out)
|
||||
|
||||
|
||||
def stats_thread(logger, driver, stats_poll):
|
||||
def stats_thread(driver, stats_poll):
|
||||
""" Statistics thread function. """
|
||||
logger.debug("[stats] Statistics gathering process started.")
|
||||
logger.debug("[stats] Polling interval: %d" % stats_poll)
|
||||
LOG.debug("Statistics gathering process started.")
|
||||
LOG.debug("Polling interval: %d", stats_poll)
|
||||
|
||||
while True:
|
||||
try:
|
||||
http_stats = driver.get_stats('http')
|
||||
tcp_stats = driver.get_stats('tcp')
|
||||
except NotImplementedError:
|
||||
logger.critical(
|
||||
"[stats] Driver does not implement statisics gathering."
|
||||
LOG.critical(
|
||||
"Driver does not implement statisics gathering."
|
||||
)
|
||||
break
|
||||
except ServiceUnavailable:
|
||||
logger.warn("[stats] Unable to get statistics at this time.")
|
||||
LOG.warn("Unable to get statistics at this time.")
|
||||
except Exception as e:
|
||||
logger.critical("[stats] Exception: %s, %s" % (e.__class__, e))
|
||||
LOG.critical("Exception: %s, %s" % (e.__class__, e))
|
||||
break
|
||||
else:
|
||||
record_stats(logger, http_stats, tcp_stats)
|
||||
record_stats(http_stats, tcp_stats)
|
||||
|
||||
eventlet.sleep(stats_poll)
|
||||
|
||||
logger.info("[stats] Statistics gathering process terminated.")
|
||||
LOG.info("Statistics gathering process terminated.")
|
||||
|
@ -21,11 +21,14 @@ from oslo.config import cfg
|
||||
|
||||
from libra.common.json_gearman import JSONGearmanWorker
|
||||
from libra.worker.controller import LBaaSController
|
||||
from libra.openstack.common import log
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class CustomJSONGearmanWorker(JSONGearmanWorker):
|
||||
""" Custom class we will use to pass arguments to the Gearman task. """
|
||||
logger = None
|
||||
driver = None
|
||||
|
||||
|
||||
@ -37,7 +40,6 @@ def handler(worker, job):
|
||||
from the Gearman job server. It will be executed once per request. Data
|
||||
comes in as a JSON object, and a JSON object is returned in response.
|
||||
"""
|
||||
logger = worker.logger
|
||||
driver = worker.driver
|
||||
|
||||
# Hide information that should not be logged
|
||||
@ -45,9 +47,9 @@ def handler(worker, job):
|
||||
if LBaaSController.OBJ_STORE_TOKEN_FIELD in copy:
|
||||
copy[LBaaSController.OBJ_STORE_TOKEN_FIELD] = "*****"
|
||||
|
||||
logger.debug("Received JSON message: %s" % json.dumps(copy))
|
||||
LOG.debug("Received JSON message: %s" % json.dumps(copy))
|
||||
|
||||
controller = LBaaSController(logger, driver, job.data)
|
||||
controller = LBaaSController(driver, job.data)
|
||||
response = controller.run()
|
||||
|
||||
# Hide information that should not be logged
|
||||
@ -55,15 +57,15 @@ def handler(worker, job):
|
||||
if LBaaSController.OBJ_STORE_TOKEN_FIELD in copy:
|
||||
copy[LBaaSController.OBJ_STORE_TOKEN_FIELD] = "*****"
|
||||
|
||||
logger.debug("Return JSON message: %s" % json.dumps(copy))
|
||||
LOG.debug("Return JSON message: %s" % json.dumps(copy))
|
||||
return copy
|
||||
|
||||
|
||||
def config_thread(logger, driver):
|
||||
def config_thread(driver):
|
||||
""" Worker thread function. """
|
||||
# Hostname should be a unique value, like UUID
|
||||
hostname = socket.gethostname()
|
||||
logger.info("[worker] Registering task %s" % hostname)
|
||||
LOG.info("Registering task %s" % hostname)
|
||||
|
||||
server_list = []
|
||||
for host_port in cfg.CONF['gearman']['servers']:
|
||||
@ -81,7 +83,7 @@ def config_thread(logger, driver):
|
||||
worker = CustomJSONGearmanWorker(server_list)
|
||||
worker.set_client_id(hostname)
|
||||
worker.register_task(hostname, handler)
|
||||
worker.logger = logger
|
||||
worker.logger = LOG
|
||||
worker.driver = driver
|
||||
|
||||
retry = True
|
||||
@ -92,11 +94,11 @@ def config_thread(logger, driver):
|
||||
except KeyboardInterrupt:
|
||||
retry = False
|
||||
except gearman.errors.ServerUnavailable:
|
||||
logger.error("[worker] Job server(s) went away. Reconnecting.")
|
||||
LOG.error("Job server(s) went away. Reconnecting.")
|
||||
time.sleep(cfg.CONF['gearman']['reconnect_sleep'])
|
||||
retry = True
|
||||
except Exception as e:
|
||||
logger.critical("[worker] Exception: %s, %s" % (e.__class__, e))
|
||||
LOG.critical("Exception: %s, %s" % (e.__class__, e))
|
||||
retry = False
|
||||
|
||||
logger.debug("[worker] Worker process terminated.")
|
||||
LOG.debug("Worker process terminated.")
|
||||
|
@ -1,5 +1,6 @@
|
||||
pbr>=0.5.21,<1.0
|
||||
|
||||
Babel>=1.3
|
||||
eventlet
|
||||
gearman>=2.0.2
|
||||
oslo.config>=1.2.0
|
||||
|
@ -6,5 +6,4 @@ python-subunit
|
||||
sphinx>=1.1.2
|
||||
testrepository>=0.0.8
|
||||
testtools>=0.9.22
|
||||
babel
|
||||
mox
|
||||
mox
|
||||
|
Loading…
x
Reference in New Issue
Block a user