Support Swift log file archive for HAProxy driver.
Fixes bug #1156852. This adds the support for archiving HAProxy log files to Swift. Files will be organized into containers based on load balancer ID. Object names will be 'haproxy-YYYMMDD-HHMMSS.log.gz' where the timestamp portion is current UTC time when the file is stored. Files are Gzip compressed. Change-Id: I5e9007ca816f4834d477be42e1cfa1624d33b54e
This commit is contained in:
parent
f781ee9350
commit
d52b510350
@ -39,7 +39,7 @@ def check_request_status(job_request):
|
|||||||
|
|
||||||
def main():
|
def main():
|
||||||
hostname = socket.gethostname()
|
hostname = socket.gethostname()
|
||||||
task = "lbaas-1.0-%s" % hostname
|
task = hostname
|
||||||
client = JSONGearmanClient(['localhost:4730'])
|
client = JSONGearmanClient(['localhost:4730'])
|
||||||
data = """
|
data = """
|
||||||
{
|
{
|
||||||
|
@ -40,7 +40,7 @@ prompted for a password. It is suggested that you run the worker as
|
|||||||
the `haproxy` user and `haproxy` group on Ubuntu systems. Then add the
|
the `haproxy` user and `haproxy` group on Ubuntu systems. Then add the
|
||||||
following line to /etc/sudoers::
|
following line to /etc/sudoers::
|
||||||
|
|
||||||
%haproxy ALL = NOPASSWD: /usr/sbin/service, /bin/cp, /bin/mv, /bin/rm, /usr/bin/socat
|
%haproxy ALL = NOPASSWD: /usr/sbin/service, /bin/cp, /bin/mv, /bin/rm, /bin/chown, /usr/bin/socat
|
||||||
|
|
||||||
The above lets everyone in the *haproxy* group run those commands
|
The above lets everyone in the *haproxy* group run those commands
|
||||||
as root without being prompted for a password.
|
as root without being prompted for a password.
|
||||||
|
@ -312,7 +312,7 @@ class LBaaSController(object):
|
|||||||
).to_json()
|
).to_json()
|
||||||
|
|
||||||
lb_list = self.msg[self.LBLIST_FIELD]
|
lb_list = self.msg[self.LBLIST_FIELD]
|
||||||
params['protocol'] = lb_list[0]['protocol']
|
params['proto'] = lb_list[0]['protocol']
|
||||||
params['lbid'] = lb_list[0]['id']
|
params['lbid'] = lb_list[0]['id']
|
||||||
params['basepath'] = self.msg[self.OBJ_STORE_BASEPATH_FIELD]
|
params['basepath'] = self.msg[self.OBJ_STORE_BASEPATH_FIELD]
|
||||||
params['endpoint'] = self.msg[self.OBJ_STORE_ENDPOINT_FIELD]
|
params['endpoint'] = self.msg[self.OBJ_STORE_ENDPOINT_FIELD]
|
||||||
|
@ -12,6 +12,13 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import gzip
|
||||||
|
import hashlib
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
from datetime import datetime
|
||||||
|
from swiftclient import client as sc
|
||||||
|
|
||||||
from libra.openstack.common import importutils
|
from libra.openstack.common import importutils
|
||||||
from libra.worker.drivers.base import LoadBalancerDriver
|
from libra.worker.drivers.base import LoadBalancerDriver
|
||||||
from libra.worker.drivers.haproxy.services_base import ServicesBase
|
from libra.worker.drivers.haproxy.services_base import ServicesBase
|
||||||
@ -19,7 +26,9 @@ from libra.worker.drivers.haproxy.services_base import ServicesBase
|
|||||||
|
|
||||||
class HAProxyDriver(LoadBalancerDriver):
|
class HAProxyDriver(LoadBalancerDriver):
|
||||||
|
|
||||||
def __init__(self, ossvc):
|
def __init__(self, ossvc, user, group):
|
||||||
|
self.user = user
|
||||||
|
self.group = group
|
||||||
ossvc_driver = importutils.import_class(ossvc)
|
ossvc_driver = importutils.import_class(ossvc)
|
||||||
self.ossvc = ossvc_driver()
|
self.ossvc = ossvc_driver()
|
||||||
if not isinstance(self.ossvc, ServicesBase):
|
if not isinstance(self.ossvc, ServicesBase):
|
||||||
@ -43,7 +52,6 @@ class HAProxyDriver(LoadBalancerDriver):
|
|||||||
output.append('global')
|
output.append('global')
|
||||||
output.append(' daemon')
|
output.append(' daemon')
|
||||||
output.append(' log /dev/log local0')
|
output.append(' log /dev/log local0')
|
||||||
output.append(' log /dev/log local1 notice')
|
|
||||||
output.append(' maxconn 4096')
|
output.append(' maxconn 4096')
|
||||||
output.append(' user haproxy')
|
output.append(' user haproxy')
|
||||||
output.append(' group haproxy')
|
output.append(' group haproxy')
|
||||||
@ -99,6 +107,84 @@ class HAProxyDriver(LoadBalancerDriver):
|
|||||||
|
|
||||||
return '\n'.join(output) + '\n'
|
return '\n'.join(output) + '\n'
|
||||||
|
|
||||||
|
def _archive_swift(self, endpoint, token, basepath, lbid, proto):
|
||||||
|
"""
|
||||||
|
Archive HAProxy log files into swift.
|
||||||
|
|
||||||
|
endpoint - Object store endpoint
|
||||||
|
token - Authorization token
|
||||||
|
basepath - Container base path
|
||||||
|
lbid - Load balancer ID
|
||||||
|
proto - Protocol of the load balancer we are archiving
|
||||||
|
|
||||||
|
Note: It should be acceptable for exceptions to be thrown here as
|
||||||
|
the controller should wrap these up nicely in a message back to the
|
||||||
|
API server.
|
||||||
|
"""
|
||||||
|
|
||||||
|
proto = proto.lower()
|
||||||
|
|
||||||
|
reallog = '/mnt/log/haproxy.log'
|
||||||
|
|
||||||
|
if not os.path.exists(reallog):
|
||||||
|
raise Exception('No HAProxy logs found')
|
||||||
|
|
||||||
|
# We need a copy we can read
|
||||||
|
reallog_copy = '/tmp/haproxy.log'
|
||||||
|
self.ossvc.sudo_copy(reallog, reallog_copy)
|
||||||
|
self.ossvc.sudo_chown(reallog_copy, self.user, self.group)
|
||||||
|
|
||||||
|
# Extract contents from the log based on protocol. This is
|
||||||
|
# because each protocol (tcp or http) represents a separate
|
||||||
|
# load balancer in Libra. See _config_to_string() for the
|
||||||
|
# frontend and backend names we search for below.
|
||||||
|
|
||||||
|
filtered_log = '/tmp/haproxy-' + proto + '.log'
|
||||||
|
fh = open(filtered_log, 'wb')
|
||||||
|
for line in open(reallog_copy, 'rb'):
|
||||||
|
if re.search(proto + '-in', line):
|
||||||
|
fh.write(line)
|
||||||
|
elif re.search(proto + '-servers', line):
|
||||||
|
fh.write(line)
|
||||||
|
fh.close()
|
||||||
|
os.remove(reallog_copy)
|
||||||
|
|
||||||
|
# Compress the filtered log and generate the MD5 checksum value.
|
||||||
|
# We generate object name using UTC timestamp. The MD5 checksum of
|
||||||
|
# the compressed file is used to guarantee Swift properly receives
|
||||||
|
# the file contents.
|
||||||
|
|
||||||
|
ts = datetime.utcnow().strftime('%Y%m%d-%H%M%S')
|
||||||
|
objname = 'haproxy-' + ts + '.log.gz'
|
||||||
|
compressed_file = '/tmp/' + objname
|
||||||
|
|
||||||
|
gzip_in = open(filtered_log, 'rb')
|
||||||
|
gzip_out = gzip.open(compressed_file, 'wb')
|
||||||
|
gzip_out.writelines(gzip_in)
|
||||||
|
gzip_out.close()
|
||||||
|
gzip_in.close()
|
||||||
|
os.remove(filtered_log)
|
||||||
|
|
||||||
|
etag = hashlib.md5(open(compressed_file, 'rb').read()).hexdigest()
|
||||||
|
|
||||||
|
# We now have a file to send to Swift for storage. We'll connect
|
||||||
|
# using the pre-authorized token passed to use for the given endpoint.
|
||||||
|
# Then make sure that we have a proper container name for this load
|
||||||
|
# balancer, and place the compressed file in that container. Creating
|
||||||
|
# containers is idempotent so no need to check if it already exists.
|
||||||
|
|
||||||
|
container = '/'.join([basepath, lbid])
|
||||||
|
conn = sc.Connection(preauthurl=endpoint, preauthtoken=token)
|
||||||
|
conn.put_container(basepath)
|
||||||
|
conn.put_container(container)
|
||||||
|
logfh = open(compressed_file, 'rb')
|
||||||
|
conn.put_object(container=container,
|
||||||
|
obj=objname,
|
||||||
|
etag=etag,
|
||||||
|
contents=logfh)
|
||||||
|
logfh.close()
|
||||||
|
os.remove(compressed_file)
|
||||||
|
|
||||||
####################
|
####################
|
||||||
# Driver API Methods
|
# Driver API Methods
|
||||||
####################
|
####################
|
||||||
@ -166,3 +252,25 @@ class HAProxyDriver(LoadBalancerDriver):
|
|||||||
|
|
||||||
def get_stats(self, protocol):
|
def get_stats(self, protocol):
|
||||||
return self.ossvc.get_stats(protocol)
|
return self.ossvc.get_stats(protocol)
|
||||||
|
|
||||||
|
def archive(self, method, params):
|
||||||
|
"""
|
||||||
|
Implementation of the archive() API call.
|
||||||
|
|
||||||
|
method
|
||||||
|
Method we use for archiving the files.
|
||||||
|
|
||||||
|
params
|
||||||
|
Dictionary with parameters needed for archiving. The keys of
|
||||||
|
the dictionary will vary based on the value of 'method'.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if method == 'swift':
|
||||||
|
return self._archive_swift(params['endpoint'],
|
||||||
|
params['token'],
|
||||||
|
params['basepath'],
|
||||||
|
params['lbid'],
|
||||||
|
params['proto'])
|
||||||
|
else:
|
||||||
|
raise Exception("Driver does not support archive method '%s'" %
|
||||||
|
method)
|
||||||
|
@ -45,3 +45,11 @@ class ServicesBase:
|
|||||||
def get_stats(self):
|
def get_stats(self):
|
||||||
""" Get the stats from HAProxy. """
|
""" Get the stats from HAProxy. """
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def sudo_copy(self, from_file, to_file):
|
||||||
|
""" Do a privileged file copy. """
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def sudo_chown(self, file, user, group):
|
||||||
|
""" Do a privileged file ownership change. """
|
||||||
|
raise NotImplementedError()
|
||||||
|
@ -51,6 +51,27 @@ class UbuntuServices(ServicesBase):
|
|||||||
raise Exception("%s does not exist. Start failed." %
|
raise Exception("%s does not exist. Start failed." %
|
||||||
self._haproxy_pid)
|
self._haproxy_pid)
|
||||||
|
|
||||||
|
def sudo_copy(self, from_file, to_file):
|
||||||
|
cmd = "/usr/bin/sudo -n /bin/cp %s %s" % (from_file, to_file)
|
||||||
|
try:
|
||||||
|
subprocess.check_output(cmd.split(),
|
||||||
|
stderr=subprocess.STDOUT)
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
|
raise Exception("Failed to copy file: %s\n%s"
|
||||||
|
% (e, e.output.rstrip('\n')))
|
||||||
|
|
||||||
|
def sudo_chown(self, file, user, group):
|
||||||
|
if group is None:
|
||||||
|
cmd = "/usr/bin/sudo -n /bin/chown %s %s" % (user, file)
|
||||||
|
else:
|
||||||
|
cmd = "/usr/bin/sudo -n /bin/chown %s:%s %s" % (user, group, file)
|
||||||
|
try:
|
||||||
|
subprocess.check_output(cmd.split(),
|
||||||
|
stderr=subprocess.STDOUT)
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
|
raise Exception("Failed to change file ownership: %s\n%s"
|
||||||
|
% (e, e.output.rstrip('\n')))
|
||||||
|
|
||||||
def write_config(self, config_str):
|
def write_config(self, config_str):
|
||||||
"""
|
"""
|
||||||
Generate the new config and replace the current config file.
|
Generate the new config and replace the current config file.
|
||||||
|
@ -22,6 +22,7 @@ import daemon.pidfile
|
|||||||
import daemon.runner
|
import daemon.runner
|
||||||
import grp
|
import grp
|
||||||
import pwd
|
import pwd
|
||||||
|
import getpass
|
||||||
|
|
||||||
from libra.openstack.common import importutils
|
from libra.openstack.common import importutils
|
||||||
from libra.common.options import Options, setup_logging
|
from libra.common.options import Options, setup_logging
|
||||||
@ -112,7 +113,18 @@ def main():
|
|||||||
|
|
||||||
if args.driver == 'haproxy':
|
if args.driver == 'haproxy':
|
||||||
logger.info("Selected HAProxy service: %s" % args.haproxy_service)
|
logger.info("Selected HAProxy service: %s" % args.haproxy_service)
|
||||||
driver = driver_class(haproxy_services[args.haproxy_service])
|
if args.user:
|
||||||
|
user = args.user
|
||||||
|
else:
|
||||||
|
user = getpass.getuser()
|
||||||
|
|
||||||
|
if args.group:
|
||||||
|
group = args.group
|
||||||
|
else:
|
||||||
|
group = None
|
||||||
|
|
||||||
|
driver = driver_class(haproxy_services[args.haproxy_service],
|
||||||
|
user, group)
|
||||||
else:
|
else:
|
||||||
driver = driver_class()
|
driver = driver_class()
|
||||||
|
|
||||||
|
@ -6,7 +6,8 @@ from libra.worker.drivers.haproxy.driver import HAProxyDriver
|
|||||||
class TestHAProxyDriver(testtools.TestCase):
|
class TestHAProxyDriver(testtools.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestHAProxyDriver, self).setUp()
|
super(TestHAProxyDriver, self).setUp()
|
||||||
self.driver = HAProxyDriver('tests.mock_objects.FakeOSServices')
|
self.driver = HAProxyDriver('tests.mock_objects.FakeOSServices',
|
||||||
|
None, None)
|
||||||
|
|
||||||
def testInit(self):
|
def testInit(self):
|
||||||
""" Test the HAProxy init() method """
|
""" Test the HAProxy init() method """
|
||||||
@ -88,3 +89,13 @@ class TestHAProxyDriver(testtools.TestCase):
|
|||||||
Exception,
|
Exception,
|
||||||
self.driver.add_server, proto, '1.2.3.4', 7777, "abc")
|
self.driver.add_server, proto, '1.2.3.4', 7777, "abc")
|
||||||
self.assertEqual("Non-integer 'weight' value: 'abc'", e.message)
|
self.assertEqual("Non-integer 'weight' value: 'abc'", e.message)
|
||||||
|
|
||||||
|
def testArchive(self):
|
||||||
|
""" Test the HAProxy archive() method """
|
||||||
|
|
||||||
|
# Test an invalid archive method
|
||||||
|
method = 'invalid'
|
||||||
|
e = self.assertRaises(Exception, self.driver.archive, method, None)
|
||||||
|
self.assertEqual(
|
||||||
|
"Driver does not support archive method '%s'" % method,
|
||||||
|
e.message)
|
||||||
|
@ -13,7 +13,8 @@ class TestWorkerController(testtools.TestCase):
|
|||||||
self.lh = tests.mock_objects.MockLoggingHandler()
|
self.lh = tests.mock_objects.MockLoggingHandler()
|
||||||
self.logger.setLevel(logging.DEBUG)
|
self.logger.setLevel(logging.DEBUG)
|
||||||
self.logger.addHandler(self.lh)
|
self.logger.addHandler(self.lh)
|
||||||
self.driver = HAProxyDriver('tests.mock_objects.FakeOSServices')
|
self.driver = HAProxyDriver('tests.mock_objects.FakeOSServices',
|
||||||
|
None, None)
|
||||||
|
|
||||||
def testBadAction(self):
|
def testBadAction(self):
|
||||||
msg = {
|
msg = {
|
||||||
|
@ -3,3 +3,4 @@ gearman
|
|||||||
python-daemon
|
python-daemon
|
||||||
requests>=1.0.0
|
requests>=1.0.0
|
||||||
python_novaclient>=2.11.1
|
python_novaclient>=2.11.1
|
||||||
|
python_swiftclient>=1.3.0
|
||||||
|
Loading…
x
Reference in New Issue
Block a user