Haproxy container for galera
We add an HAProxy sidecar container to the galera pod. Galera is bound to port 33306 and HAProxy takes its place on port 3306. HAProxy starts with a non-working backend. After it starts, it checks for the current leader in etcd; if there is none, it sets the current leader to itself. After that it updates its backend to connect to the current leader. It repeats this logic every 5 seconds and, if the leader has changed, it updates the backend again. The other nodes do the same, so in the end every node is connected to the same single galera node. Change-Id: Ieb611661857de1828259b28f54f5a0390b1dd196
This commit is contained in:
parent
613e47413a
commit
a248697bc0
13
docker/galera-haproxy/Dockerfile.j2
Normal file
13
docker/galera-haproxy/Dockerfile.j2
Normal file
@ -0,0 +1,13 @@
|
||||
FROM {{ image_spec("base-tools") }}
|
||||
MAINTAINER {{ maintainer }}
|
||||
|
||||
COPY {{ render('sources.list.debian.j2') }} /etc/apt/sources.list.d/testing.list
|
||||
COPY sudoers /etc/sudoers.d/haproxy_sudoers
|
||||
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y -t testing haproxy \
|
||||
&& apt-get clean \
|
||||
&& chown -R haproxy: /etc/haproxy /var/lib/haproxy \
|
||||
&& usermod -a -G microservices haproxy
|
||||
|
||||
USER haproxy
|
2
docker/galera-haproxy/sources.list.debian.j2
Normal file
2
docker/galera-haproxy/sources.list.debian.j2
Normal file
@ -0,0 +1,2 @@
|
||||
# Testing repos
|
||||
deb {{ url.debian }} testing main
|
1
docker/galera-haproxy/sudoers
Normal file
1
docker/galera-haproxy/sudoers
Normal file
@ -0,0 +1 @@
|
||||
haproxy ALL=(root) NOPASSWD: /bin/chown -R haproxy\: /run/haproxy, /bin/mkdir /run/haproxy
|
@ -76,7 +76,7 @@ def get_etcd_client():
|
||||
@retry
|
||||
def get_mysql_client():
|
||||
mysql_client = pymysql.connect(host='127.0.0.1',
|
||||
port=3306,
|
||||
port=33306,
|
||||
user='monitor',
|
||||
password=MONITOR_PASSWORD,
|
||||
connect_timeout=1,
|
||||
|
33
service/files/haproxy.conf.j2
Normal file
33
service/files/haproxy.conf.j2
Normal file
@ -0,0 +1,33 @@
|
||||
global
|
||||
# No syslog in containers
|
||||
#log /dev/log local0
|
||||
stats socket /run/haproxy/admin.sock mode 660 level admin
|
||||
stats timeout 30s
|
||||
|
||||
# Tunes from MOS
|
||||
tune.bufsize 32768
|
||||
tune.maxrewrite 1024
|
||||
|
||||
# Default SSL material locations
|
||||
ca-base /etc/ssl/certs
|
||||
crt-base /etc/ssl/private
|
||||
ssl-default-bind-ciphers ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:RSA+AESGCM:RSA+AES:!aNULL:!MD5:!DSS
|
||||
ssl-default-bind-options no-sslv3
|
||||
|
||||
defaults
|
||||
log global
|
||||
mode tcp
|
||||
option tcplog
|
||||
option logasap
|
||||
option dontlognull
|
||||
option mysql-check
|
||||
option tcpka
|
||||
timeout connect 10s
|
||||
timeout client 28801s
|
||||
timeout server 28801s
|
||||
|
||||
listen galera-cluster
|
||||
bind 0.0.0.0:{{ percona.port.cont }}
|
||||
# We start with a non-working configuration and update it via the admin socket at runtime
|
||||
server primary 127.0.0.1:11111 check
|
||||
|
291
service/files/haproxy_entrypoint.py
Normal file
291
service/files/haproxy_entrypoint.py
Normal file
@ -0,0 +1,291 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import argparse
|
||||
import functools
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
import etcd
|
||||
|
||||
# Identity of the node this script runs on.
HOSTNAME = socket.getfqdn()
IPADDR = socket.gethostbyname(HOSTNAME)

# Haproxy proxy section and server entry addressed by "set server" commands.
BACKEND_NAME = "galera-cluster"
SERVER_NAME = "primary"

# JSON file the deployment settings are read from (see get_config).
GLOBALS_PATH = '/etc/ccp/globals/globals.json'

# Logging: timestamps with millisecond precision, everything from DEBUG up.
LOG_DATEFMT = "%Y-%m-%d %H:%M:%S"
LOG_FORMAT = "%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s"
logging.basicConfig(format=LOG_FORMAT, datefmt=LOG_DATEFMT)
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)

# Placeholders; the real values are assigned by set_globals().
CONNECTION_ATTEMPTS = CONNECTION_DELAY = None
ETCD_PATH = ETCD_HOST = ETCD_PORT = None

# Haproxy constant for health checks
SRV_STATE_RUNNING = 2
SRV_CHK_RES_PASSED = 3
|
||||
|
||||
|
||||
def retry(f):
    """Decorator: re-run ``f`` while it raises ``etcd.EtcdException``.

    Up to ``CONNECTION_ATTEMPTS`` calls are made in total, sleeping
    ``CONNECTION_DELAY`` seconds after each failed one. The final call is
    made outside of any ``try`` so its exception reaches the caller.
    """
    @functools.wraps(f)
    def wrap(*args, **kwargs):
        remaining = CONNECTION_ATTEMPTS
        pause = CONNECTION_DELAY
        while True:
            if not remaining > 1:
                # Last attempt: let any error propagate.
                return f(*args, **kwargs)
            try:
                return f(*args, **kwargs)
            except etcd.EtcdException as e:
                LOG.warning('Etcd is not ready: %s', str(e))
                LOG.warning('Retrying in %d seconds...', pause)
                time.sleep(pause)
            remaining -= 1
    return wrap
|
||||
|
||||
|
||||
def get_config():
    """Read ``GLOBALS_PATH`` and return the sections used by this script.

    Returns a dict holding the ``percona``, ``etcd`` and ``namespace``
    entries of the JSON document. A missing entry raises ``KeyError``.
    """
    LOG.info("Getting global variables from %s", GLOBALS_PATH)
    with open(GLOBALS_PATH) as globals_file:
        global_conf = json.load(globals_file)
    variables = {
        section: global_conf[section]
        for section in ('percona', 'etcd', 'namespace')
    }
    LOG.debug(variables)
    return variables
|
||||
|
||||
|
||||
def set_globals():
    """Populate the module-level etcd settings from the loaded config."""
    global CONNECTION_ATTEMPTS, CONNECTION_DELAY
    global ETCD_PATH, ETCD_HOST, ETCD_PORT

    config = get_config()
    etcd_conf = config['etcd']

    CONNECTION_ATTEMPTS = etcd_conf['connection_attempts']
    CONNECTION_DELAY = etcd_conf['connection_delay']
    # All keys of this cluster live under /galera/<cluster_name>.
    ETCD_PATH = "/galera/%s" % config['percona']['cluster_name']
    ETCD_HOST = "etcd.%s" % config['namespace']
    ETCD_PORT = int(etcd_conf['client_port']['cont'])
|
||||
|
||||
|
||||
def get_etcd_client():
    """Build an etcd client for ``ETCD_HOST``:``ETCD_PORT``."""
    options = {
        'host': ETCD_HOST,
        'port': ETCD_PORT,
        'allow_reconnect': True,
        'read_timeout': 2,
    }
    return etcd.Client(**options)
|
||||
|
||||
|
||||
def get_socket(path='/var/run/haproxy/admin.sock'):
    """Open a connected UNIX stream socket to the haproxy admin socket.

    :param path: filesystem path of the socket; defaults to the location
                 used by this deployment.
    :returns: a connected socket with a 5 second timeout. The caller is
              responsible for closing it.
    :raises socket.error: if the connection cannot be established; the
                          half-open socket is closed before re-raising.
    """
    unix_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        unix_socket.settimeout(5)
        unix_socket.connect(path)
    except Exception:
        # Do not leak the descriptor when haproxy is not listening yet.
        unix_socket.close()
        raise
    return unix_socket
|
||||
|
||||
|
||||
def run_haproxy():
    """Start haproxy as a child process and return its ``Popen`` handle."""
    haproxy_cmd = ["haproxy", "-f", "/etc/haproxy/haproxy.conf"]
    LOG.info("Executing cmd:\n%s", haproxy_cmd)
    return subprocess.Popen(haproxy_cmd)
|
||||
|
||||
|
||||
def check_haproxy(proc):
    """Exit this script with haproxy's return code if haproxy has stopped.

    :param proc: the ``Popen`` object returned by ``run_haproxy``.
    """
    if proc.poll() is None:
        # Still running, nothing to do.
        return
    LOG.error("Haproxy was terminated, exit code was: %s",
              proc.returncode)
    sys.exit(proc.returncode)
|
||||
|
||||
|
||||
@retry
def etcd_set(etcd_client, key, value, ttl, dir=False, append=False, **kwargs):
    """Write ``value`` under ``key`` in etcd and log the assignment.

    The positional arguments are forwarded to ``etcd_client.write`` in the
    order (key, value, ttl, dir, append); extra keyword arguments are passed
    through untouched.
    """
    write_args = (key, value, ttl, dir, append)
    etcd_client.write(*write_args, **kwargs)
    LOG.info("Set %s with value '%s'", key, value)
|
||||
|
||||
|
||||
@retry
def etcd_refresh(etcd_client, path, ttl):
    """Refresh the TTL of the key ``path`` located under ``ETCD_PATH``.

    ``path`` is joined with ``os.path.join``, so an absolute ``path``
    replaces ``ETCD_PATH`` instead of being appended to it.
    """
    full_key = os.path.join(ETCD_PATH, path)
    etcd_client.refresh(full_key, ttl)
    LOG.info("Refreshed %s ttl. New ttl is '%s'", full_key, ttl)
|
||||
|
||||
|
||||
def send_command(cmd):
    """Send one command to the haproxy admin socket and return its output.

    :param cmd: command text without the trailing newline.
    :returns: list of response lines (read until haproxy closes the
              connection).

    The socket and its file wrapper are always closed, even when sending or
    reading fails.
    """
    LOG.debug("Sending '%s' cmd to haproxy", cmd)
    sock = get_socket()
    try:
        # sendall() guarantees the whole command is written; encoding keeps
        # the call valid on interpreters where sockets only accept bytes.
        sock.sendall((cmd + '\n').encode('utf-8'))
        file_handle = sock.makefile()
        try:
            return file_handle.read().splitlines()
        finally:
            file_handle.close()
    finally:
        sock.close()
|
||||
|
||||
|
||||
def get_haproxy_status():
    """Return the state of the haproxy server entry as a dict.

    The dict maps the column names printed by ``show servers state`` to the
    values of the server row, plus a ``backend`` entry holding the
    ``ip:port`` haproxy currently points at. ``backend`` is ``None`` when
    haproxy reports no address, so callers see it as "differs from the
    leader" instead of this function crashing on an unbound variable.
    """
    state_data = send_command("show servers state galera-cluster")
    stat_data = send_command("show stat typed")

    # we need to parse string which looks like this:
    # 'S.2.1.73.addr.1:CGS:str:10.233.76.104:33306'
    backend = None
    for line in stat_data:
        if "addr" in line:
            ip, port = line.split(':')[-2:]
            backend = "%s:%s" % (ip, port)
    if backend is None:
        LOG.warning("Haproxy did not report a server address")

    # state_data is expected to hold three lines; the second is the column
    # header (its first token is dropped) and the third is the server row.
    keys = state_data[1].split(' ')[1:]
    values = state_data[2].split(' ')
    data_dict = dict(zip(keys, values))
    data_dict['backend'] = backend
    return data_dict
|
||||
|
||||
|
||||
def get_cluster_state(etcd_client):
    """Return the value of the cluster ``state`` key, or None if unset."""
    state_key = os.path.join(ETCD_PATH, 'state')
    try:
        return etcd_client.read(state_key).value
    except etcd.EtcdKeyNotFound:
        return None
|
||||
|
||||
|
||||
def wait_for_cluster_to_be_steady(etcd_client, haproxy_proc):
    """Block until the cluster state stored in etcd equals ``STEADY``.

    While waiting, haproxy is checked before every 5 second sleep so that a
    dead haproxy still terminates this script.
    """
    while get_cluster_state(etcd_client) != 'STEADY':
        check_haproxy(haproxy_proc)
        LOG.warning("Cluster is not in the STEADY state, waiting...")
        time.sleep(5)
|
||||
|
||||
|
||||
def set_server_addr(leader_ip):
    """Point the haproxy server entry (address and check port) at the leader.

    Two admin commands are issued in order: one sets the server address and
    port, the other sets the health-check port. Each command is repeated
    until haproxy stops answering with the port conversion error.

    :param leader_ip: IP address of the galera node to forward traffic to.
    """
    cmds = ["set server %s/%s addr %s port 33306" % (
                BACKEND_NAME, SERVER_NAME, leader_ip),
            "set server %s/%s check-port 33306" % (
                BACKEND_NAME, SERVER_NAME)]
    for cmd in cmds:
        # Bug in haproxy. Sometimes, haproxy can't convert port str to int.
        # Will be fixed in 1.7.2
        while True:
            response = send_command(cmd)
            # An empty response carries no error, so treat it as success.
            if not (response and "problem converting port" in response[0]):
                # Only leave the retry loop: the next command must still run.
                break
            LOG.error("Port convertation failed, trying again...")
            time.sleep(1)
    LOG.info("Successfuly set backend to %s:33306", leader_ip)
|
||||
|
||||
|
||||
def get_leader(etcd_client):
    """Return the value of the ``leader`` key, or None when it is missing.

    The result is logged in both cases.
    """
    leader_key = os.path.join(ETCD_PATH, 'leader')
    try:
        current = etcd_client.read(leader_key).value
    except etcd.EtcdKeyNotFound:
        current = None
    LOG.info("Current leader is: %s", current)
    return current
|
||||
|
||||
|
||||
def set_leader(etcd_client, ttl, **kwargs):
    """Write this node's IP into the ``leader`` key with the given TTL.

    Extra keyword arguments are forwarded to ``etcd_set``.
    """
    etcd_set(etcd_client, os.path.join(ETCD_PATH, 'leader'), IPADDR, ttl,
             **kwargs)
|
||||
|
||||
|
||||
def refresh_leader(etcd_client, ttl):
    """Extend the TTL of the ``leader`` key.

    ``etcd_refresh`` prefixes ``ETCD_PATH`` itself, so only the relative key
    name is passed; joining the prefix here as well relied on
    ``os.path.join`` silently discarding the first of two absolute paths.
    """
    etcd_refresh(etcd_client, 'leader', ttl)
|
||||
|
||||
|
||||
def do_we_need_to_reconfigure_haproxy(leader):
    """Tell whether haproxy points somewhere other than ``leader``:33306.

    :param leader: IP address of the current leader.
    :returns: True when the address reported by haproxy differs.
    """
    current_backend = get_haproxy_status()['backend']
    expected_backend = leader + ":33306"
    LOG.debug("Haproxy server is: %s. Current leader is: %s",
              current_backend, expected_backend)
    return current_backend != expected_backend
|
||||
|
||||
|
||||
def run_daemon(ttl):
    """Run haproxy and keep it pointed at the current galera leader.

    Every iteration waits for the cluster to be STEADY, reads the leader
    from etcd (claiming or refreshing leadership when appropriate),
    reconfigures haproxy if it points elsewhere, verifies haproxy is still
    alive and sleeps for 5 seconds.

    :param ttl: time-to-live, in seconds, of the ``leader`` key.
    """
    LOG.debug("My IP is: %s", IPADDR)
    haproxy_proc = run_haproxy()
    etcd_client = get_etcd_client()
    while True:
        wait_for_cluster_to_be_steady(etcd_client, haproxy_proc)
        leader = get_leader(etcd_client)
        try:
            if not leader:
                set_leader(etcd_client, ttl, prevExist=False)
                leader = IPADDR
            elif leader == IPADDR:
                refresh_leader(etcd_client, ttl)
        except etcd.EtcdAlreadyExist:
            # Another node claimed leadership between our read and write.
            # Start the iteration over to pick up the real leader instead
            # of crashing the daemon.
            LOG.info("Leader was set by another node, re-reading it")
            check_haproxy(haproxy_proc)
            continue
        except etcd.EtcdKeyNotFound:
            # Our leader key expired before it could be refreshed; the next
            # iteration will run a fresh election.
            LOG.warning("Leader key expired before refresh, re-electing")
            check_haproxy(haproxy_proc)
            continue

        if do_we_need_to_reconfigure_haproxy(leader):
            LOG.info("Updating haproxy configuration")
            set_server_addr(leader)
        check_haproxy(haproxy_proc)
        LOG.info("Sleeping for 5 sec...")
        time.sleep(5)
|
||||
|
||||
|
||||
def run_readiness():
    """Readiness probe: exit 0 when haproxy serves a live leader, else 1.

    The probe fails when the cluster is not STEADY, when no leader is
    registered, when haproxy points at a different address than the leader,
    or when haproxy does not consider the server both running and passing
    its health check.
    """
    etcd_client = get_etcd_client()
    state = get_cluster_state(etcd_client)
    if state != 'STEADY':
        LOG.error("Cluster is not in the STEADY state")
        sys.exit(1)

    leader = get_leader(etcd_client)
    if not leader:
        LOG.error("No leader found")
        sys.exit(1)

    if do_we_need_to_reconfigure_haproxy(leader):
        LOG.error("Haproxy configuration is wrong")
        sys.exit(1)

    haproxy_stat = get_haproxy_status()
    LOG.debug(haproxy_stat)
    # The leader is alive only if BOTH indicators are good, so either one
    # being off must fail the probe ("or", not "and").
    if (int(haproxy_stat['srv_op_state']) != SRV_STATE_RUNNING or
            int(haproxy_stat['srv_check_result']) != SRV_CHK_RES_PASSED):
        LOG.error("Current leader is not alive")
        sys.exit(1)

    LOG.info("Service is ready")
    sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Entry point: "daemon" runs haproxy and follows the leader,
    # "readiness" performs a one-shot probe.
    parser = argparse.ArgumentParser()
    parser.add_argument('type', choices=['daemon', 'readiness'])
    args = parser.parse_args()

    # set_globals() loads the configuration itself; a separate get_config()
    # call here only read and logged the same file a second time.
    set_globals()
    if args.type == 'daemon':
        run_daemon(ttl=20)
    elif args.type == 'readiness':
        run_readiness()
|
||||
|
||||
# vim: set ts=4 sw=4 tw=0 et :
|
@ -1,6 +1,6 @@
|
||||
[mysqld]
|
||||
bind-address = 0.0.0.0
|
||||
port = {{ percona.port.cont }}
|
||||
port = 33306
|
||||
|
||||
datadir = /var/lib/mysql
|
||||
pid-file = /var/lib/mysql/mysqld.pid
|
||||
|
@ -255,11 +255,14 @@ def etcd_set_seqno(etcd_client, ttl):
|
||||
_etcd_set(etcd_client, key, seqno, ttl)
|
||||
|
||||
|
||||
def etcd_deregister_in_path(etcd_client, path):
|
||||
def etcd_deregister_in_path(etcd_client, path, prevValue=False):
|
||||
|
||||
key = os.path.join(ETCD_PATH, path, IPADDR)
|
||||
try:
|
||||
etcd_client.delete(key, recursive=True)
|
||||
if prevValue:
|
||||
etcd_client.delete(key, prevValue=prevValue)
|
||||
else:
|
||||
etcd_client.delete(key, recursive=True)
|
||||
LOG.warning("Deleted key %s", key)
|
||||
except etcd.EtcdKeyNotFound:
|
||||
LOG.warning("Key %s not exist", key)
|
||||
@ -632,6 +635,7 @@ def main(ttl):
|
||||
etcd_deregister_in_path(etcd_client, 'queue')
|
||||
etcd_deregister_in_path(etcd_client, 'nodes')
|
||||
etcd_deregister_in_path(etcd_client, 'seqno')
|
||||
etcd_deregister_in_path(etcd_client, 'leader', prevValue=IPADDR)
|
||||
release_lock(lock)
|
||||
|
||||
|
||||
|
@ -18,6 +18,22 @@ service:
|
||||
dependencies:
|
||||
- etcd
|
||||
command: "/opt/ccp/bin/galera_checker.py liveness"
|
||||
- name: galera-haproxy
|
||||
image: galera-haproxy
|
||||
probes:
|
||||
readiness: "/opt/ccp/bin/haproxy_entrypoint.py readiness"
|
||||
pre:
|
||||
- name: mkdir-run
|
||||
command: "sudo /bin/mkdir /run/haproxy"
|
||||
- name: chown-run
|
||||
command: "sudo /bin/chown -R haproxy: /run/haproxy"
|
||||
daemon:
|
||||
files:
|
||||
- haproxy-conf
|
||||
- haproxy_entrypoint
|
||||
dependencies:
|
||||
- etcd
|
||||
command: "/opt/ccp/bin/haproxy_entrypoint.py daemon"
|
||||
- name: galera
|
||||
image: percona
|
||||
probes:
|
||||
@ -63,3 +79,10 @@ files:
|
||||
path: /opt/ccp/bin/galera_checker.py
|
||||
content: galera_checker.py
|
||||
perm: "0755"
|
||||
haproxy-conf:
|
||||
path: /etc/haproxy/haproxy.conf
|
||||
content: haproxy.conf.j2
|
||||
haproxy_entrypoint:
|
||||
path: /opt/ccp/bin/haproxy_entrypoint.py
|
||||
content: haproxy_entrypoint.py
|
||||
perm: "0755"
|
||||
|
Loading…
x
Reference in New Issue
Block a user