[W.I.P] Refactor Promenade Operator

- Move away from usage of context for keystone endpoint and
  token
- Refactor Promenade Operator

Change-Id: Id32592d4f5f2d32e479344c7b859493b6bb450cb
This commit is contained in:
Anthony Lin 2018-02-12 07:05:11 +00:00
parent b8bb66d412
commit 604258a824
7 changed files with 83 additions and 73 deletions

View File

@ -44,6 +44,7 @@ class ArmadaOperator(BaseOperator):
action=None,
main_dag_name=None,
shipyard_conf=None,
svc_token=None,
sub_dag_name=None,
workflow_info={},
xcom_push=True,
@ -53,6 +54,7 @@ class ArmadaOperator(BaseOperator):
self.action = action
self.main_dag_name = main_dag_name
self.shipyard_conf = shipyard_conf
self.svc_token = svc_token
self.sub_dag_name = sub_dag_name
self.workflow_info = workflow_info
self.xcom_push_flag = xcom_push
@ -88,15 +90,16 @@ class ArmadaOperator(BaseOperator):
# Validate Site Design
if self.action == 'validate_site_design':
# Initialize variable
armada_svc_endpoint = None
site_design_validity = 'invalid'
# Retrieve Endpoint Information
svc_type = 'armada'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
armada_svc_endpoint = ucp_service_endpoint(self,
svc_type=svc_type)
site_design_validity = self.armada_validate_site_design(context,
design_ref)
site_design_validity = self.armada_validate_site_design(
armada_svc_endpoint, design_ref)
if site_design_validity == 'valid':
logging.info("Site Design has been successfully validated")
@ -108,12 +111,12 @@ class ArmadaOperator(BaseOperator):
# Create Armada Client
# Retrieve Endpoint Information
svc_type = 'armada'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Armada endpoint is %s", context['svc_endpoint'])
armada_svc_endpoint = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Armada endpoint is %s", armada_svc_endpoint)
# Set up Armada Client
armada_client = self.armada_session_client(context)
armada_client = self.armada_session_client(armada_svc_endpoint)
# Retrieve Tiller Information and assign to context 'query'
context['query'] = self.get_tiller_info(context)
@ -135,14 +138,14 @@ class ArmadaOperator(BaseOperator):
logging.info('No Action to Perform')
@shipyard_service_token
def armada_session_client(self, context):
def armada_session_client(self, armada_svc_endpoint):
# Initialize Variables
armada_url = None
a_session = None
a_client = None
# Parse Armada Service Endpoint
armada_url = urlparse(context['svc_endpoint'])
armada_url = urlparse(armada_svc_endpoint)
# Build a ArmadaSession with credentials and target host
# information.
@ -150,7 +153,7 @@ class ArmadaOperator(BaseOperator):
a_session = session.ArmadaSession(host=armada_url.hostname,
port=armada_url.port,
scheme='http',
token=context['svc_token'],
token=self.svc_token,
marker=None)
# Raise Exception if we are not able to get armada session
@ -234,6 +237,7 @@ class ArmadaOperator(BaseOperator):
def armada_get_releases(self, context, armada_client):
# Initialize Variables
armada_releases = {}
deckhand_svc_endpoint = None
# Retrieve Armada Releases after deployment
logging.info("Retrieving Armada Releases after deployment..")
@ -249,9 +253,9 @@ class ArmadaOperator(BaseOperator):
# Retrieve DeckHand Endpoint Information
svc_type = 'deckhand'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Deckhand endpoint is %s", context['svc_endpoint'])
deckhand_svc_endpoint = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
# Retrieve revision_id from xcom
# Note that in the case of 'deploy_site', the dag_id will
@ -265,7 +269,7 @@ class ArmadaOperator(BaseOperator):
# Form Design Reference Path that we will use to retrieve
# the Design YAMLs
deckhand_path = "deckhand+" + context['svc_endpoint']
deckhand_path = "deckhand+" + deckhand_svc_endpoint
deckhand_design_ref = os.path.join(deckhand_path,
"revisions",
str(committed_revision_id),
@ -274,10 +278,10 @@ class ArmadaOperator(BaseOperator):
return deckhand_design_ref
@shipyard_service_token
def armada_validate_site_design(self, context, design_ref):
def armada_validate_site_design(self, armada_svc_endpoint, design_ref):
# Form Validation Endpoint
validation_endpoint = os.path.join(context['svc_endpoint'],
validation_endpoint = os.path.join(armada_svc_endpoint,
'validatedesign')
logging.info("Validation Endpoint is %s", validation_endpoint)
@ -285,7 +289,7 @@ class ArmadaOperator(BaseOperator):
# Define Headers and Payload
headers = {
'Content-Type': 'application/json',
'X-Auth-Token': context['svc_token']
'X-Auth-Token': self.svc_token
}
payload = {

View File

@ -45,6 +45,7 @@ class DeckhandOperator(BaseOperator):
main_dag_name=None,
shipyard_conf=None,
sub_dag_name=None,
svc_token=None,
workflow_info={},
xcom_push=True,
*args, **kwargs):
@ -54,6 +55,7 @@ class DeckhandOperator(BaseOperator):
self.main_dag_name = main_dag_name
self.shipyard_conf = shipyard_conf
self.sub_dag_name = sub_dag_name
self.svc_token = svc_token
self.workflow_info = workflow_info
self.xcom_push_flag = xcom_push
@ -90,9 +92,9 @@ class DeckhandOperator(BaseOperator):
# Retrieve Endpoint Information
svc_type = 'deckhand'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Deckhand endpoint is %s", context['svc_endpoint'])
deckhand_svc_endpoint = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
# Deckhand API Call
# Retrieve Design Version from DeckHand
@ -138,12 +140,12 @@ class DeckhandOperator(BaseOperator):
logging.info('No Action to Perform')
@shipyard_service_token
def deckhand_get_design(self, context):
def deckhand_get_design(self, deckhand_svc_endpoint):
# Retrieve Keystone Token and assign to X-Auth-Token Header
x_auth_token = {"X-Auth-Token": context['svc_token']}
x_auth_token = {"X-Auth-Token": self.svc_token}
# Form Revision Endpoint
revision_endpoint = os.path.join(context['svc_endpoint'],
revision_endpoint = os.path.join(deckhand_svc_endpoint,
'revisions')
# Retrieve Revision
@ -178,12 +180,12 @@ class DeckhandOperator(BaseOperator):
raise AirflowException("Failed to retrieve committed revision!")
@shipyard_service_token
def deckhand_validate_site(self, context, revision_id):
def deckhand_validate_site(self, deckhand_svc_endpoint, revision_id):
# Retrieve Keystone Token and assign to X-Auth-Token Header
x_auth_token = {"X-Auth-Token": context['svc_token']}
x_auth_token = {"X-Auth-Token": self.svc_token}
# Form Validation Endpoint
validation_endpoint = os.path.join(context['svc_endpoint'],
validation_endpoint = os.path.join(deckhand_svc_endpoint,
'revisions',
str(revision_id),
'validations')

View File

@ -52,6 +52,7 @@ class DryDockOperator(BaseOperator):
main_dag_name=None,
node_filter=None,
shipyard_conf=None,
svc_token=None,
sub_dag_name=None,
workflow_info={},
xcom_push=True,
@ -63,6 +64,7 @@ class DryDockOperator(BaseOperator):
self.main_dag_name = main_dag_name
self.node_filter = node_filter
self.shipyard_conf = shipyard_conf
self.svc_token = svc_token
self.sub_dag_name = sub_dag_name
self.workflow_info = workflow_info
self.xcom_push_flag = xcom_push
@ -119,22 +121,23 @@ class DryDockOperator(BaseOperator):
# Retrieve Endpoint Information
svc_type = 'physicalprovisioner'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
drydock_svc_endpoint = ucp_service_endpoint(self,
svc_type=svc_type)
site_design_validity = self.drydock_validate_design(context)
site_design_validity = self.drydock_validate_design(
drydock_svc_endpoint)
return site_design_validity
# DrydockClient
# Retrieve Endpoint Information
svc_type = 'physicalprovisioner'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("DryDock endpoint is %s", context['svc_endpoint'])
drydock_svc_endpoint = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("DryDock endpoint is %s", drydock_svc_endpoint)
# Set up DryDock Client
drydock_client = self.drydock_session_client(context)
drydock_client = self.drydock_session_client(drydock_svc_endpoint)
# Read shipyard.conf
config = configparser.ConfigParser()
@ -215,7 +218,7 @@ class DryDockOperator(BaseOperator):
task_timeout = config.get('drydock', 'destroy_node_task_timeout')
logging.info("Destroying node %s from cluster...", redeploy_server)
time.sleep(30)
time.sleep(15)
logging.info("Successfully deleted node %s", redeploy_server)
# TODO: Uncomment when the function to destroy/delete node is
@ -228,21 +231,21 @@ class DryDockOperator(BaseOperator):
logging.info('No Action to Perform')
@shipyard_service_token
def drydock_session_client(self, context):
def drydock_session_client(self, drydock_svc_endpoint):
# Initialize Variables
drydock_url = None
dd_session = None
dd_client = None
# Parse DryDock Service Endpoint
drydock_url = urlparse(context['svc_endpoint'])
drydock_url = urlparse(drydock_svc_endpoint)
# Build a DrydockSession with credentials and target host
# information.
logging.info("Build DryDock Session")
dd_session = session.DrydockSession(drydock_url.hostname,
port=drydock_url.port,
token=context['svc_token'])
token=self.svc_token)
# Raise Exception if we are not able to get a drydock session
if dd_session:
@ -350,10 +353,11 @@ class DryDockOperator(BaseOperator):
# Set up new drydock client with new keystone token
logging.info("Setting up new drydock session...")
context['svc_endpoint'] = ucp_service_endpoint(
drydock_svc_endpoint = ucp_service_endpoint(
self, svc_type='physicalprovisioner')
new_dd_client = self.drydock_session_client(context)
new_dd_client = self.drydock_session_client(
drydock_svc_endpoint)
except errors.ClientForbiddenError as forbidden_error:
raise AirflowException(forbidden_error)
@ -398,9 +402,9 @@ class DryDockOperator(BaseOperator):
# Retrieve DeckHand Endpoint Information
svc_type = 'deckhand'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Deckhand endpoint is %s", context['svc_endpoint'])
deckhand_svc_endpoint = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
# Retrieve revision_id from xcom
# Note that in the case of 'deploy_site', the dag_id will
@ -414,7 +418,7 @@ class DryDockOperator(BaseOperator):
# Form DeckHand Design Reference Path that we will use to retrieve
# the DryDock YAMLs
deckhand_path = "deckhand+" + context['svc_endpoint']
deckhand_path = "deckhand+" + deckhand_svc_endpoint
deckhand_design_ref = os.path.join(deckhand_path,
"revisions",
str(committed_revision_id),
@ -423,10 +427,10 @@ class DryDockOperator(BaseOperator):
return deckhand_design_ref
@shipyard_service_token
def drydock_validate_design(self, context):
def drydock_validate_design(self, drydock_svc_endpoint):
# Form Validation Endpoint
validation_endpoint = os.path.join(context['svc_endpoint'],
validation_endpoint = os.path.join(drydock_svc_endpoint,
'validatedesign')
logging.info("Validation Endpoint is %s", validation_endpoint)
@ -434,7 +438,7 @@ class DryDockOperator(BaseOperator):
# Define Headers and Payload
headers = {
'Content-Type': 'application/json',
'X-Auth-Token': context['svc_token']
'X-Auth-Token': self.svc_token
}
payload = {

View File

@ -39,6 +39,7 @@ class PromenadeOperator(BaseOperator):
main_dag_name=None,
shipyard_conf=None,
sub_dag_name=None,
svc_token=None,
workflow_info={},
xcom_push=True,
*args, **kwargs):
@ -48,6 +49,7 @@ class PromenadeOperator(BaseOperator):
self.main_dag_name = main_dag_name
self.shipyard_conf = shipyard_conf
self.sub_dag_name = sub_dag_name
self.svc_token = svc_token
self.workflow_info = workflow_info
self.xcom_push_flag = xcom_push
@ -94,8 +96,7 @@ class PromenadeOperator(BaseOperator):
# Promenade API Call
# Drain node using Promenade
if self.action == 'promenade_drain_node':
node_drained = self.promenade_drain_node(context,
redeploy_server)
node_drained = self.promenade_drain_node(redeploy_server)
if node_drained:
logging.info("Node %s has been successfully drained",
@ -106,8 +107,7 @@ class PromenadeOperator(BaseOperator):
# Remove labels using Promenade
elif self.action == 'promenade_remove_labels':
labels_removed = self.promenade_drain_node(context,
redeploy_server)
labels_removed = self.promenade_remove_labels(redeploy_server)
if labels_removed:
logging.info("Successfully removed labels on %s",
@ -118,8 +118,7 @@ class PromenadeOperator(BaseOperator):
# Stops kubelet on node using Promenade
elif self.action == 'promenade_stop_kubelet':
stop_kubelet = self.promenade_stop_kubelet(context,
redeploy_server)
stop_kubelet = self.promenade_stop_kubelet(redeploy_server)
if stop_kubelet:
logging.info("Successfully stopped kubelet on %s",
@ -130,7 +129,7 @@ class PromenadeOperator(BaseOperator):
# Performs etcd sanity check using Promenade
elif self.action == 'promenade_check_etcd':
check_etcd = self.promenade_check_etcd(context)
check_etcd = self.promenade_check_etcd()
if check_etcd:
logging.info("The etcd cluster is healthy and ready")
@ -139,8 +138,7 @@ class PromenadeOperator(BaseOperator):
# Delete node from cluster using Promenade
elif self.action == 'promenade_delete_node':
delete_node = self.promenade_delete_node(context,
redeploy_server)
delete_node = self.promenade_delete_node(redeploy_server)
if delete_node:
logging.info("Succesfully deleted node %s from cluster",
@ -154,44 +152,48 @@ class PromenadeOperator(BaseOperator):
logging.info('No Action to Perform')
@shipyard_service_token
def promenade_drain_node(self, context, redeploy_server):
def promenade_drain_node(self, redeploy_server):
# Placeholder function. Updates will be made when the Promenade
# API is ready for consumption.
logging.info("The token is %s", self.svc_token)
logging.info("Draining node...")
time.sleep(15)
return True
@shipyard_service_token
def promenade_remove_labels(self, context, redeploy_server):
def promenade_remove_labels(self, redeploy_server):
# Placeholder function. Updates will be made when the Promenade
# API is ready for consumption.
logging.info("Removing labels on node...")
time.sleep(15)
return True
@shipyard_service_token
def promenade_stop_kubelet(self, context, redeploy_server):
def promenade_stop_kubelet(self, redeploy_server):
# Placeholder function. Updates will be made when the Promenade
# API is ready for consumption.
logging.info("Stopping kubelet on node...")
time.sleep(15)
return True
@shipyard_service_token
def promenade_check_etcd(self, context):
def promenade_check_etcd(self):
# Placeholder function. Updates will be made when the Promenade
# API is ready for consumption.
logging.info("Performing health check on etcd...")
time.sleep(15)
return True
@shipyard_service_token
def promenade_delete_node(self, context, redeploy_server):
def promenade_delete_node(self, redeploy_server):
# Placeholder function. Updates will be made when the Promenade
# API is ready for consumption.
logging.info("Deleting node from cluster...")
time.sleep(30)
logging.info("Successfully deleted node %s", redeploy_server)
time.sleep(15)
return True

View File

@ -27,7 +27,7 @@ def ucp_service_endpoint(self, svc_type):
int_endpoint = None
# Retrieve Keystone Session
sess = ucp_keystone_session(self, svc_type)
sess = ucp_keystone_session(self)
# We will allow 1 retry in getting the Keystone Endpoint with a
# backoff interval of 10 seconds in case there is a temporary

View File

@ -22,7 +22,7 @@ from keystoneauth1.identity import v3 as keystone_v3
from keystoneauth1 import session as keystone_session
def ucp_keystone_session(self, context):
def ucp_keystone_session(self):
# Read and parse shiyard.conf
config = configparser.ConfigParser()

View File

@ -23,7 +23,7 @@ from service_session import ucp_keystone_session
def shipyard_service_token(func):
@wraps(func)
def keystone_token_get(self, context, *args):
def keystone_token_get(self, *args):
"""This function retrieves Keystone token for UCP Services
:param context: Information on the current workflow
@ -42,10 +42,9 @@ def shipyard_service_token(func):
"""
# Initialize variables
retry = 0
token = None
# Retrieve Keystone Session
sess = ucp_keystone_session(self, context)
sess = ucp_keystone_session(self)
# We will allow 1 retry in getting the Keystone Token with a
# backoff interval of 10 seconds in case there is a temporary
@ -54,12 +53,11 @@ def shipyard_service_token(func):
while retry <= 1:
# Retrieve Keystone Token
logging.info("Get Keystone Token")
token = sess.get_auth_headers().get('X-Auth-Token')
self.svc_token = sess.get_auth_headers().get('X-Auth-Token')
# Retry if we fail to get the keystone token
if token:
if self.svc_token:
logging.info("Successfully Retrieved Keystone Token")
context['svc_token'] = token
break
else:
logging.info("Unable to get Keystone Token on first attempt")
@ -68,9 +66,9 @@ def shipyard_service_token(func):
retry += 1
# Raise Execptions if we fail to get a proper response
if not token:
if not self.svc_token:
raise AirflowException("Unable to get Keystone Token!")
else:
return func(self, context, *args)
return func(self, *args)
return keystone_token_get