DAG Maintenance - parallelization
Updates the workflows to have considerably more parallelization of the early steps, and unwraps some of the operators from their sub-dags, in an attempt to simplify the representation of the workflow and potentially increase performance. Change-Id: I6ce987b32399e261a2383233bd192b0e49514791
This commit is contained in:
parent
1aa814491b
commit
9f99431fad
@ -33,7 +33,6 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
|
||||
task_id='armada_get_status',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
dag=dag)
|
||||
|
||||
# Armada Apply
|
||||
@ -41,7 +40,6 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
|
||||
task_id='armada_post_apply',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
@ -50,7 +48,6 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
|
||||
task_id='armada_get_releases',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
dag=dag)
|
||||
|
||||
# Define dependencies
|
||||
|
@ -12,15 +12,17 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from airflow.operators import ConcurrencyCheckOperator
|
||||
from airflow.operators import DeckhandRetrieveRenderedDocOperator
|
||||
from airflow.operators import DeploymentConfigurationOperator
|
||||
from airflow.operators import DeckhandCreateSiteActionTagOperator
|
||||
|
||||
from airflow.operators.bash_operator import BashOperator
|
||||
from airflow.operators.python_operator import BranchPythonOperator
|
||||
from airflow.operators.python_operator import PythonOperator
|
||||
from airflow.operators.subdag_operator import SubDagOperator
|
||||
|
||||
from armada_deploy_site import deploy_site_armada
|
||||
from dag_deployment_configuration import get_deployment_configuration
|
||||
from deckhand_create_tag import create_deckhand_tag
|
||||
from deckhand_get_rendered_doc import get_rendered_doc_deckhand
|
||||
from config_path import config_path
|
||||
from destroy_node import destroy_server
|
||||
from drydock_deploy_site import deploy_site_drydock
|
||||
from failure_handlers import step_failure_handler
|
||||
@ -64,7 +66,7 @@ class CommonStepFactory(object):
|
||||
dag=self.dag,
|
||||
python_callable=xcom_push)
|
||||
|
||||
def get_concurrency_check(self, task_id=dn.DAG_CONCURRENCY_CHECK_DAG_NAME):
|
||||
def get_concurrency_check(self, task_id=dn.CONCURRENCY_CHECK):
|
||||
"""Generate the concurrency check step
|
||||
|
||||
Concurrency check prevents simultaneous execution of dags that should
|
||||
@ -95,11 +97,9 @@ class CommonStepFactory(object):
|
||||
Check that we are able to render the docs before proceeding
|
||||
further with the workflow
|
||||
"""
|
||||
return SubDagOperator(
|
||||
subdag=get_rendered_doc_deckhand(
|
||||
self.parent_dag_name,
|
||||
task_id,
|
||||
args=self.default_args),
|
||||
return DeckhandRetrieveRenderedDocOperator(
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=self.parent_dag_name,
|
||||
task_id=task_id,
|
||||
on_failure_callback=step_failure_handler,
|
||||
dag=self.dag)
|
||||
@ -121,17 +121,15 @@ class CommonStepFactory(object):
|
||||
dag=self.dag)
|
||||
|
||||
def get_deployment_configuration(self,
|
||||
task_id=dn.GET_DEPLOY_CONF_DAG_NAME):
|
||||
task_id=dn.DEPLOYMENT_CONFIGURATION):
|
||||
"""Generate the step to retrieve the deployment configuration
|
||||
|
||||
This step provides the timings and strategies that will be used in
|
||||
subsequent steps
|
||||
"""
|
||||
return SubDagOperator(
|
||||
subdag=get_deployment_configuration(
|
||||
self.parent_dag_name,
|
||||
task_id,
|
||||
args=self.default_args),
|
||||
return DeploymentConfigurationOperator(
|
||||
main_dag_name=self.parent_dag_name,
|
||||
shipyard_conf=config_path,
|
||||
task_id=task_id,
|
||||
on_failure_callback=step_failure_handler,
|
||||
dag=self.dag)
|
||||
@ -254,12 +252,11 @@ class CommonStepFactory(object):
|
||||
Note that trigger_rule is set to "all_done" so that this
|
||||
step will run even when upstream tasks are in failed state.
|
||||
"""
|
||||
return SubDagOperator(
|
||||
subdag=create_deckhand_tag(
|
||||
self.parent_dag_name,
|
||||
task_id,
|
||||
args=self.default_args),
|
||||
|
||||
return DeckhandCreateSiteActionTagOperator(
|
||||
task_id=task_id,
|
||||
shipyard_conf=config_path,
|
||||
on_failure_callback=step_failure_handler,
|
||||
trigger_rule="all_done",
|
||||
main_dag_name=self.parent_dag_name,
|
||||
dag=self.dag)
|
||||
|
@ -1,31 +0,0 @@
|
||||
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from airflow.models import DAG
|
||||
from airflow.operators import ConcurrencyCheckOperator
|
||||
|
||||
|
||||
def dag_concurrency_check(parent_dag_name, child_dag_name, args):
|
||||
'''
|
||||
dag_concurrency_check is a sub-DAG that will will allow for a DAG to
|
||||
determine if it is already running, and result in an error if so.
|
||||
'''
|
||||
dag = DAG(
|
||||
'{}.{}'.format(parent_dag_name, child_dag_name),
|
||||
default_args=args, )
|
||||
|
||||
dag_concurrency_check_operator = ConcurrencyCheckOperator(
|
||||
task_id='dag_concurrency_check', dag=dag)
|
||||
|
||||
return dag
|
@ -1,36 +0,0 @@
|
||||
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from airflow.models import DAG
|
||||
from airflow.operators import DeploymentConfigurationOperator
|
||||
|
||||
from config_path import config_path
|
||||
|
||||
|
||||
GET_DEPLOYMENT_CONFIGURATION_NAME = 'get_deployment_configuration'
|
||||
|
||||
|
||||
def get_deployment_configuration(parent_dag_name, child_dag_name, args):
|
||||
"""DAG to retrieve deployment configuration"""
|
||||
dag = DAG(
|
||||
'{}.{}'.format(parent_dag_name, child_dag_name),
|
||||
default_args=args)
|
||||
|
||||
deployment_configuration = DeploymentConfigurationOperator(
|
||||
task_id=GET_DEPLOYMENT_CONFIGURATION_NAME,
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
dag=dag)
|
||||
|
||||
return dag
|
@ -15,16 +15,16 @@
|
||||
# Subdags
|
||||
ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight'
|
||||
ARMADA_BUILD_DAG_NAME = 'armada_build'
|
||||
CREATE_ACTION_TAG = 'create_action_tag'
|
||||
DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check'
|
||||
DESTROY_SERVER_DAG_NAME = 'destroy_server'
|
||||
DRYDOCK_BUILD_DAG_NAME = 'drydock_build'
|
||||
GET_DEPLOY_CONF_DAG_NAME = 'dag_deployment_configuration'
|
||||
GET_RENDERED_DOC = 'get_rendered_doc'
|
||||
VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design'
|
||||
|
||||
# Steps
|
||||
ACTION_XCOM = 'action_xcom'
|
||||
CONCURRENCY_CHECK = 'dag_concurrency_check'
|
||||
CREATE_ACTION_TAG = 'create_action_tag'
|
||||
DECIDE_AIRFLOW_UPGRADE = 'decide_airflow_upgrade'
|
||||
DEPLOYMENT_CONFIGURATION = 'deployment_configuration'
|
||||
GET_RENDERED_DOC = 'get_rendered_doc'
|
||||
SKIP_UPGRADE_AIRFLOW = 'skip_upgrade_airflow'
|
||||
UPGRADE_AIRFLOW = 'upgrade_airflow'
|
||||
|
@ -1,36 +0,0 @@
|
||||
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from airflow.models import DAG
|
||||
from airflow.operators import DeckhandCreateSiteActionTagOperator
|
||||
|
||||
from config_path import config_path
|
||||
|
||||
|
||||
def create_deckhand_tag(parent_dag_name, child_dag_name, args):
|
||||
'''
|
||||
Create Deckhand Revision Tag
|
||||
'''
|
||||
dag = DAG(
|
||||
'{}.{}'.format(parent_dag_name, child_dag_name),
|
||||
default_args=args)
|
||||
|
||||
create_action_tag = DeckhandCreateSiteActionTagOperator(
|
||||
task_id='deckhand_create_action_tag',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
dag=dag)
|
||||
|
||||
return dag
|
@ -1,36 +0,0 @@
|
||||
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from airflow.models import DAG
|
||||
from airflow.operators import DeckhandRetrieveRenderedDocOperator
|
||||
|
||||
from config_path import config_path
|
||||
|
||||
|
||||
def get_rendered_doc_deckhand(parent_dag_name, child_dag_name, args):
|
||||
'''
|
||||
Get rendered documents from Deckhand for the committed revision ID.
|
||||
'''
|
||||
dag = DAG(
|
||||
'{}.{}'.format(parent_dag_name, child_dag_name),
|
||||
default_args=args)
|
||||
|
||||
deckhand_retrieve_rendered_doc = DeckhandRetrieveRenderedDocOperator(
|
||||
task_id='deckhand_retrieve_rendered_doc',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
dag=dag)
|
||||
|
||||
return dag
|
@ -54,11 +54,15 @@ armada_build = step_factory.get_armada_build()
|
||||
create_action_tag = step_factory.get_create_action_tag()
|
||||
|
||||
# DAG Wiring
|
||||
concurrency_check.set_upstream(action_xcom)
|
||||
preflight.set_upstream(concurrency_check)
|
||||
get_rendered_doc.set_upstream(preflight)
|
||||
deployment_configuration.set_upstream(get_rendered_doc)
|
||||
validate_site_design.set_upstream(deployment_configuration)
|
||||
preflight.set_upstream(action_xcom)
|
||||
get_rendered_doc.set_upstream(action_xcom)
|
||||
deployment_configuration.set_upstream(action_xcom)
|
||||
validate_site_design.set_upstream([
|
||||
preflight,
|
||||
get_rendered_doc,
|
||||
concurrency_check,
|
||||
deployment_configuration
|
||||
])
|
||||
drydock_build.set_upstream(validate_site_design)
|
||||
armada_build.set_upstream(drydock_build)
|
||||
create_action_tag.set_upstream(armada_build)
|
||||
|
@ -39,7 +39,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
|
||||
task_id='promenade_drain_node',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
dag=dag)
|
||||
|
||||
# Clear Labels
|
||||
@ -47,7 +46,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
|
||||
task_id='promenade_clear_labels',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
dag=dag)
|
||||
|
||||
# Shutdown Kubelet
|
||||
@ -55,7 +53,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
|
||||
task_id='promenade_shutdown_kubelet',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
dag=dag)
|
||||
|
||||
# ETCD Sanity Check
|
||||
@ -63,7 +60,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
|
||||
task_id='promenade_check_etcd',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
dag=dag)
|
||||
|
||||
# Power down and destroy node using DryDock
|
||||
@ -71,7 +67,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
|
||||
task_id='destroy_node',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
dag=dag)
|
||||
|
||||
# Decommission node from Kubernetes cluster using Promenade
|
||||
@ -79,7 +74,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
|
||||
task_id='promenade_decommission_node',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
dag=dag)
|
||||
|
||||
# Define dependencies
|
||||
|
@ -32,21 +32,18 @@ def deploy_site_drydock(parent_dag_name, child_dag_name, args):
|
||||
task_id='verify_site',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
dag=dag)
|
||||
|
||||
drydock_prepare_site = DrydockPrepareSiteOperator(
|
||||
task_id='prepare_site',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
dag=dag)
|
||||
|
||||
drydock_nodes = DrydockNodesOperator(
|
||||
task_id='prepare_and_deploy_nodes',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
dag=dag)
|
||||
|
||||
# Define dependencies
|
||||
|
@ -62,11 +62,15 @@ skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow()
|
||||
create_action_tag = step_factory.get_create_action_tag()
|
||||
|
||||
# DAG Wiring
|
||||
concurrency_check.set_upstream(action_xcom)
|
||||
preflight.set_upstream(concurrency_check)
|
||||
get_rendered_doc.set_upstream(preflight)
|
||||
deployment_configuration.set_upstream(get_rendered_doc)
|
||||
validate_site_design.set_upstream(deployment_configuration)
|
||||
preflight.set_upstream(action_xcom)
|
||||
get_rendered_doc.set_upstream(action_xcom)
|
||||
deployment_configuration.set_upstream(action_xcom)
|
||||
validate_site_design.set_upstream([
|
||||
preflight,
|
||||
get_rendered_doc,
|
||||
concurrency_check,
|
||||
deployment_configuration
|
||||
])
|
||||
drydock_build.set_upstream(validate_site_design)
|
||||
armada_build.set_upstream(drydock_build)
|
||||
decide_airflow_upgrade.set_upstream(armada_build)
|
||||
|
@ -22,9 +22,10 @@ from config_path import config_path
|
||||
|
||||
|
||||
def validate_site_design(parent_dag_name, child_dag_name, args):
|
||||
'''
|
||||
Subdag to delegate design verification to the UCP components
|
||||
'''
|
||||
"""Subdag to delegate design verification to the UCP components
|
||||
|
||||
There is no wiring of steps - they all execute in parallel
|
||||
"""
|
||||
dag = DAG(
|
||||
'{}.{}'.format(parent_dag_name, child_dag_name),
|
||||
default_args=args)
|
||||
@ -33,32 +34,28 @@ def validate_site_design(parent_dag_name, child_dag_name, args):
|
||||
task_id='deckhand_validate_site_design',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
retries=3,
|
||||
retries=1,
|
||||
dag=dag)
|
||||
|
||||
drydock_validate_docs = DrydockValidateDesignOperator(
|
||||
task_id='drydock_validate_site_design',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
retries=3,
|
||||
retries=1,
|
||||
dag=dag)
|
||||
|
||||
armada_validate_docs = ArmadaValidateDesignOperator(
|
||||
task_id='armada_validate_site_design',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
retries=3,
|
||||
retries=1,
|
||||
dag=dag)
|
||||
|
||||
promenade_validate_docs = PromenadeValidateSiteDesignOperator(
|
||||
task_id='promenade_validate_site_design',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
retries=3,
|
||||
retries=1,
|
||||
dag=dag)
|
||||
|
||||
return dag
|
||||
|
@ -71,7 +71,6 @@ class UcpBaseOperator(BaseOperator):
|
||||
pod_selector_pattern=None,
|
||||
shipyard_conf=None,
|
||||
start_time=None,
|
||||
sub_dag_name=None,
|
||||
xcom_push=True,
|
||||
*args, **kwargs):
|
||||
"""Initialization of UcpBaseOperator object.
|
||||
@ -92,7 +91,6 @@ class UcpBaseOperator(BaseOperator):
|
||||
log-rotate container.
|
||||
:param shipyard_conf: Location of shipyard.conf
|
||||
:param start_time: Time when Operator gets executed
|
||||
:param sub_dag_name: Child Dag
|
||||
:param xcom_push: xcom usage
|
||||
|
||||
"""
|
||||
@ -103,7 +101,6 @@ class UcpBaseOperator(BaseOperator):
|
||||
self.pod_selector_pattern = pod_selector_pattern or []
|
||||
self.shipyard_conf = shipyard_conf
|
||||
self.start_time = datetime.now()
|
||||
self.sub_dag_name = sub_dag_name
|
||||
self.xcom_push_flag = xcom_push
|
||||
self.doc_utils = _get_document_util(self.shipyard_conf)
|
||||
self.endpoints = service_endpoint.ServiceEndpoints(self.shipyard_conf)
|
||||
|
@ -33,7 +33,20 @@ class XcomPuller(object):
|
||||
self.ti = task_instance
|
||||
|
||||
def _get_xcom(self, source_task, dag_id=None, key=None, log_result=True):
|
||||
"""Find a particular xcom value"""
|
||||
"""Find and return an xcom value
|
||||
|
||||
:param source_task: The name of the task that wrote the xcom
|
||||
:param dag_id: The name of the subdag (of the main DAG) that contained
|
||||
the source task. Let this default to None if the task is a direct
|
||||
child of the main dag
|
||||
:param key: The name of the xcom item that was written by the task. If
|
||||
the source task allowed for the step to simply push xcom at the
|
||||
end of the step, leave this None.
|
||||
:param log_result: boolean to indicate if the value of the xcom should
|
||||
be logged upon retreival. This can be nice for investigative
|
||||
purposes, but would likely not be good for large or complex
|
||||
values.
|
||||
"""
|
||||
if dag_id is None:
|
||||
source_dag = self.mdn
|
||||
else:
|
||||
@ -53,8 +66,8 @@ class XcomPuller(object):
|
||||
|
||||
def get_deployment_configuration(self):
|
||||
"""Retrieve the deployment configuration dictionary"""
|
||||
source_task = 'get_deployment_configuration'
|
||||
source_dag = 'dag_deployment_configuration'
|
||||
source_task = 'deployment_configuration'
|
||||
source_dag = None
|
||||
key = None
|
||||
return self._get_xcom(source_task=source_task,
|
||||
dag_id=source_dag,
|
||||
@ -77,7 +90,7 @@ class XcomPuller(object):
|
||||
def get_check_drydock_continue_on_fail(self):
|
||||
"""Check if 'drydock_continue_on_fail' key exists"""
|
||||
source_task = 'ucp_preflight_check'
|
||||
source_dag = 'preflight'
|
||||
source_dag = None
|
||||
key = 'drydock_continue_on_fail'
|
||||
return self._get_xcom(source_task=source_task,
|
||||
dag_id=source_dag,
|
||||
|
Loading…
x
Reference in New Issue
Block a user