Add Site Design Validation to DeckHand Operator
This P.S. is meant to extend the functionalities of the DeckHand Operator to support 'validate_site_design' Change-Id: Id0d142044a99f9db234282d3991f861433a772a3
This commit is contained in:
parent
28e112dfdd
commit
ab3ae1a86c
@ -15,6 +15,46 @@
|
||||
from airflow.models import DAG
|
||||
from airflow.operators import DeckhandOperator
|
||||
from airflow.operators import PlaceholderOperator
|
||||
from airflow.operators.subdag_operator import SubDagOperator
|
||||
|
||||
|
||||
'''
|
||||
Note that in the event where the 'deploy_site' Action is triggered
|
||||
from Shipyard, the 'parent_dag_name' variable gets assigned with
|
||||
'deploy_site.validate_site_design'. The name of the main dag will
|
||||
be the front part of that value, i.e. 'deploy_site'. Hence we will
|
||||
extract the front part and assign it to main_dag for the functions
|
||||
defined below
|
||||
'''
|
||||
# Location of shiyard.conf
|
||||
config_path = '/usr/local/airflow/plugins/shipyard.conf'
|
||||
|
||||
# Names used for sub-subdags in UCP components design verification
|
||||
DECKHAND_VALIDATE_DOCS_DAG_NAME = 'deckhand_validate_site_design'
|
||||
|
||||
|
||||
def deckhand_validate_site_design(parent_dag_name, child_dag_name, args):
|
||||
'''
|
||||
Validate Site Design - Deckhand
|
||||
'''
|
||||
dag = DAG(
|
||||
'{}.{}'.format(parent_dag_name, child_dag_name),
|
||||
default_args=args, )
|
||||
|
||||
# Assigns value 'deploy_site.deckhand_validate_site_design' to
|
||||
# the sub_dag
|
||||
child_dag = parent_dag_name[0:parent_dag_name.find('.')] + \
|
||||
'.' + DECKHAND_VALIDATE_DOCS_DAG_NAME
|
||||
|
||||
operator = DeckhandOperator(
|
||||
task_id=DECKHAND_VALIDATE_DOCS_DAG_NAME,
|
||||
shipyard_conf=config_path,
|
||||
action=DECKHAND_VALIDATE_DOCS_DAG_NAME,
|
||||
main_dag_name=parent_dag_name[0:parent_dag_name.find('.')],
|
||||
sub_dag_name=child_dag,
|
||||
dag=dag)
|
||||
|
||||
return dag
|
||||
|
||||
|
||||
def validate_site_design(parent_dag_name, child_dag_name, args):
|
||||
@ -23,10 +63,14 @@ def validate_site_design(parent_dag_name, child_dag_name, args):
|
||||
'''
|
||||
dag = DAG(
|
||||
'{}.{}'.format(parent_dag_name, child_dag_name),
|
||||
default_args=args, )
|
||||
default_args=args)
|
||||
|
||||
deckhand_validate_docs = DeckhandOperator(
|
||||
task_id='deckhand_validate_site_design', dag=dag)
|
||||
deckhand_validate_docs = SubDagOperator(
|
||||
subdag=deckhand_validate_site_design(dag.dag_id,
|
||||
DECKHAND_VALIDATE_DOCS_DAG_NAME,
|
||||
args),
|
||||
task_id=DECKHAND_VALIDATE_DOCS_DAG_NAME,
|
||||
dag=dag)
|
||||
|
||||
# TODO () use the real operator here
|
||||
drydock_validate_docs = PlaceholderOperator(
|
||||
|
@ -13,6 +13,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import requests
|
||||
import yaml
|
||||
|
||||
@ -70,12 +71,13 @@ class DeckhandOperator(BaseOperator):
|
||||
# Logs uuid of action performed by the Operator
|
||||
logging.info("DeckHand Operator for action %s", workflow_info['id'])
|
||||
|
||||
# Deckhand API Call
|
||||
if self.action == 'deckhand_get_design_version':
|
||||
# Retrieve Endpoint Information
|
||||
context['svc_endpoint'] = ucp_service_endpoint(self, context)
|
||||
logging.info("Deckhand endpoint is %s", context['svc_endpoint'])
|
||||
# Retrieve Endpoint Information
|
||||
context['svc_endpoint'] = ucp_service_endpoint(self, context)
|
||||
logging.info("Deckhand endpoint is %s", context['svc_endpoint'])
|
||||
|
||||
# Deckhand API Call
|
||||
# Retrieve Design Version from DeckHand
|
||||
if self.action == 'deckhand_get_design_version':
|
||||
# Retrieve DeckHand Design Version
|
||||
deckhand_design_version = self.deckhand_get_design(context)
|
||||
|
||||
@ -83,12 +85,30 @@ class DeckhandOperator(BaseOperator):
|
||||
return deckhand_design_version
|
||||
else:
|
||||
raise AirflowException('Failed to retrieve revision ID!')
|
||||
|
||||
# Validate Design using DeckHand
|
||||
elif self.action == 'deckhand_validate_site_design':
|
||||
# Retrieve revision_id from xcom
|
||||
# Note that in the case of 'deploy_site', the dag_id will be
|
||||
# 'deploy_site.deckhand_get_design_version.deckhand_get_design_version'
|
||||
# for the 'deckhand_get_design_version' task. We need to extract
|
||||
# the xcom value from it in order to get the value of the last
|
||||
# committed revision ID
|
||||
context['revision_id'] = task_instance.xcom_pull(
|
||||
task_ids='deckhand_get_design_version',
|
||||
dag_id=self.main_dag_name + '.deckhand_get_design_version' * 2)
|
||||
|
||||
logging.info("Revision ID is %d", context['revision_id'])
|
||||
self.deckhand_validate_site(context)
|
||||
|
||||
# No action to perform
|
||||
else:
|
||||
logging.info('No Action to Perform')
|
||||
|
||||
def deckhand_get_design(self, context):
|
||||
# Form Revision Endpoint
|
||||
revision_endpoint = context['svc_endpoint'] + '/revisions'
|
||||
revision_endpoint = os.path.join(context['svc_endpoint'],
|
||||
'revisions')
|
||||
|
||||
# Retrieve Revision
|
||||
logging.info("Retrieving revisions information...")
|
||||
@ -98,11 +118,11 @@ class DeckhandOperator(BaseOperator):
|
||||
# DeckHand
|
||||
logging.info("The number of revisions is %s", revisions['count'])
|
||||
|
||||
# Initialize Revision ID
|
||||
# Initialize Committed Version
|
||||
committed_ver = None
|
||||
|
||||
# Construct revision_list
|
||||
revision_list = revisions.get('results')
|
||||
revision_list = revisions.get('results', [])
|
||||
|
||||
# Search for the last committed version and save it as xcom
|
||||
for revision in reversed(revision_list):
|
||||
@ -116,6 +136,36 @@ class DeckhandOperator(BaseOperator):
|
||||
else:
|
||||
raise AirflowException("Failed to retrieve committed revision!")
|
||||
|
||||
def deckhand_validate_site(self, context):
|
||||
# Form Validation Endpoint
|
||||
validation_endpoint = os.path.join(context['svc_endpoint'],
|
||||
str(context['revision_id']),
|
||||
'validations')
|
||||
logging.info(validation_endpoint)
|
||||
|
||||
# Retrieve Validation list
|
||||
logging.info("Retrieving validation list...")
|
||||
retrieved_list = yaml.safe_load(requests.get(validation_endpoint).text)
|
||||
|
||||
# Initialize Validation Status
|
||||
validation_status = True
|
||||
|
||||
# Construct validation_list
|
||||
validation_list = retrieved_list.get('results', [])
|
||||
|
||||
# Assigns 'False' to validation_status if result status
|
||||
# is 'failure'
|
||||
for validation in validation_list:
|
||||
if validation.get('status') == 'failure':
|
||||
validation_status = False
|
||||
break
|
||||
|
||||
if validation_status:
|
||||
logging.info("Revision %d has been successfully validated",
|
||||
context['revision_id'])
|
||||
else:
|
||||
raise AirflowException("DeckHand Site Design Validation Failed!")
|
||||
|
||||
|
||||
class DeckhandOperatorPlugin(AirflowPlugin):
|
||||
name = 'deckhand_operator_plugin'
|
||||
|
Loading…
x
Reference in New Issue
Block a user