Make the node pool manager read from the API server
Change-Id: I118ebb15f24ceaafa0bad7631fcffed2abf7e675 TODO: make it write too!
This commit is contained in:
parent
117ffdfade
commit
8e9970ac07
@ -45,7 +45,6 @@ nova_region = region
|
||||
nova_keyname = default
|
||||
nova_secgroup = default
|
||||
haproxy_image = 12345
|
||||
api_servers = 10.0.0.1:1234 10.0.0.2:4321
|
||||
nodes = 1
|
||||
api_servers = 10.0.0.1:8889 10.0.0.2:8889
|
||||
nodes = 10
|
||||
check_interval = 5
|
||||
sync_interval = 60
|
||||
|
@ -21,6 +21,7 @@ import time
|
||||
import sys
|
||||
import threading
|
||||
|
||||
from libra.mgm.rest import APIClient
|
||||
from libra.common.options import Options, setup_logging
|
||||
|
||||
|
||||
@ -39,17 +40,12 @@ class Server(object):
|
||||
signal.signal(signal.SIGINT, self.exit_handler)
|
||||
signal.signal(signal.SIGTERM, self.exit_handler)
|
||||
|
||||
# make initial sync and then run scheduler
|
||||
# make initial check and then run scheduler
|
||||
self.logger.info(
|
||||
'Scheduling node sync for {sync} minutes'
|
||||
.format(sync=self.args.sync_interval)
|
||||
)
|
||||
self.logger.info(
|
||||
'and node check for {check} minutes'
|
||||
'Scheduling node check for {check} minutes'
|
||||
.format(check=self.args.check_interval)
|
||||
)
|
||||
self.rlock = threading.RLock()
|
||||
self.sync_nodes()
|
||||
self.check_nodes()
|
||||
while True:
|
||||
time.sleep(1)
|
||||
@ -58,17 +54,35 @@ class Server(object):
|
||||
""" check if known nodes are used """
|
||||
with self.rlock:
|
||||
self.logger.info('Checking if new nodes are needed')
|
||||
self.ct = threading.Timer(60 * int(self.args.check_interval),
|
||||
self.check_nodes, ())
|
||||
self.ct.start()
|
||||
api = APIClient(self.args.api_servers, self.logger)
|
||||
if api.is_online:
|
||||
self.logger.info('Connected to {url}'.format(url=api.url))
|
||||
status, usage = api.get_usage()
|
||||
if not status:
|
||||
self.reset_scheduler()
|
||||
return
|
||||
if usage['free'] < self.args.nodes:
|
||||
# we need to build new nodes
|
||||
self.logger.info(
|
||||
'Building {nodes} nodes'
|
||||
.format(nodes=self.args.nodes - usage['free'])
|
||||
)
|
||||
# TODO:
|
||||
# build nodes
|
||||
# send to API server
|
||||
# deal with case where node is created but not sent to API
|
||||
else:
|
||||
self.logger.info('No new nodes required')
|
||||
else:
|
||||
self.logger.error('No working API server found')
|
||||
self.reset_scheduler()
|
||||
|
||||
def sync_nodes(self):
|
||||
""" sync list of known nodes """
|
||||
with self.rlock:
|
||||
self.logger.info('Syncing internal nodes list')
|
||||
self.st = threading.Timer(60 * int(self.args.sync_interval),
|
||||
self.sync_nodes, ())
|
||||
self.st.start()
|
||||
def reset_scheduler(self):
|
||||
self.logger.info('Sleeping for {mins} minutes'
|
||||
.format(mins=self.args.check_interval))
|
||||
self.ct = threading.Timer(60 * int(self.args.check_interval),
|
||||
self.check_nodes, ())
|
||||
self.ct.start()
|
||||
|
||||
def exit_handler(self, signum, frame):
|
||||
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||
@ -99,10 +113,6 @@ def main():
|
||||
'--check_interval', type=int, default=5,
|
||||
help='how often to check if new nodes are needed (in minutes)'
|
||||
)
|
||||
options.parser.add_argument(
|
||||
'--sync_interval', type=int, default=60,
|
||||
help='how often to sync node lost (in minutes)'
|
||||
)
|
||||
args = options.run()
|
||||
|
||||
logger = setup_logging('libra_mgm', args)
|
||||
|
@ -17,6 +17,8 @@ import time
|
||||
|
||||
from novaclient import client
|
||||
|
||||
LIBRA_VERSION = 'v1'
|
||||
|
||||
|
||||
class Node(object):
|
||||
def __init__(self, username, password, tenant, auth_url, region, keyname,
|
||||
@ -73,7 +75,7 @@ class Node(object):
|
||||
""" create a nova node """
|
||||
url = "/servers"
|
||||
body = {"server": {
|
||||
"name": 'libra_{0}'.format(node_id),
|
||||
"name": 'lbaas-{0}-{1}'.format(LIBRA_VERSION, node_id),
|
||||
"imageRef": self.image,
|
||||
"key_name": self.keyname,
|
||||
"flavorRef": self.node_type,
|
||||
|
@ -14,31 +14,40 @@
|
||||
|
||||
import requests
|
||||
import json
|
||||
import random
|
||||
import sys
|
||||
|
||||
API_VERSION = 'v1'
|
||||
|
||||
|
||||
class APIClient(object):
|
||||
def __init__(self, url):
|
||||
self.url = url
|
||||
def __init__(self, addresses, logger):
|
||||
self.logger = logger
|
||||
addresses = addresses.split(' ')
|
||||
random.shuffle(addresses)
|
||||
for address in addresses:
|
||||
self.url = 'https://{0}/{1}'.format(address, API_VERSION)
|
||||
logger.info('Trying {url}'.format(url=self.url))
|
||||
status, data = self._get('{url}/devices/usage'
|
||||
.format(url=self.url))
|
||||
if status:
|
||||
logger.info('API Server is online')
|
||||
self.is_online = True
|
||||
return
|
||||
|
||||
# if we get this far all API servers are down
|
||||
self.is_online = False
|
||||
|
||||
def get_node_list(self, limit, marker):
|
||||
if marker:
|
||||
marker = '&marker={id}'.format(id=id)
|
||||
return self._get('{url}/devices'.format(url=self.url))
|
||||
|
||||
r = requests.get(
|
||||
'{url}/devices/?limit={limit}{marker}'
|
||||
.format(url=self.url, limit=limit, marker=marker)
|
||||
)
|
||||
return r.json
|
||||
|
||||
def get_metrics(self):
|
||||
r = requests.get('{url}/devices/metrics'.format(url=self.url))
|
||||
return r.json
|
||||
def get_usage(self):
|
||||
return self._get('{url}/devices/usage'.format(url=self.url))
|
||||
|
||||
def get_node(self, node_id):
|
||||
r = requests.get(
|
||||
return self._get(
|
||||
'{url}/devices/{nid}'.format(url=self.url, nid=node_id)
|
||||
)
|
||||
return r.json
|
||||
|
||||
def add_node(self, node_data):
|
||||
requests.post('{url}/devices', json.dumps(node_data))
|
||||
@ -53,3 +62,17 @@ class APIClient(object):
|
||||
'{url}/devices/{nid}'.format(url=self.url, nid=node_id),
|
||||
json.dumps(node_data)
|
||||
)
|
||||
|
||||
def _get(self, url):
|
||||
try:
|
||||
r = requests.get(url, verify=False)
|
||||
except:
|
||||
self.logger.error('Exception communicating to server: {exc}'
|
||||
.format(exc=sys.exc_info()[0]))
|
||||
return False, None
|
||||
|
||||
if r.status_code != 200:
|
||||
self.logger.error('Server returned error {code}'
|
||||
.format(code=r.status_code))
|
||||
return False, r.json
|
||||
return True, r.json
|
||||
|
Loading…
x
Reference in New Issue
Block a user