Ensure Presence of Committed Doc prior to Workflow Execution
Shipyard should (1) validate that there is a current committed version of the documents, (2) Pass that committed version as a parameter to the workflow, that is then used for the entire workflow. This applies to all the current workflow, i.e. deploy_site, update_site, redeploy_server Note that We will remove the step to retrieve deckhand design version from the workflow as it will be handled as part of the Shipyard Create Action with this change. Change-Id: Ifdbdd8f1ce1b2c6afa26fdfaee86cbb2776ca715
This commit is contained in:
parent
769d0ded47
commit
dee8887c88
@ -25,6 +25,8 @@ from shipyard_airflow import policy
|
||||
from shipyard_airflow.control.action.action_helper import (determine_lifecycle,
|
||||
format_action_steps)
|
||||
from shipyard_airflow.control.base import BaseResource
|
||||
from shipyard_airflow.control.configdocs.configdocs_helper import (
|
||||
ConfigdocsHelper)
|
||||
from shipyard_airflow.control.json_schemas import ACTION
|
||||
from shipyard_airflow.db.db import AIRFLOW_DB, SHIPYARD_DB
|
||||
from shipyard_airflow.errors import ApiError
|
||||
@ -97,6 +99,10 @@ class ActionsResource(BaseResource):
|
||||
dag = SUPPORTED_ACTION_MAPPINGS.get(action['name'])['dag']
|
||||
action['dag_id'] = dag
|
||||
|
||||
# Retrieve last committed design revision
|
||||
self.configdocs_helper = ConfigdocsHelper(context)
|
||||
action['committed_rev_id'] = self.get_committed_design_version()
|
||||
|
||||
# populate action parameters if they are not set
|
||||
if 'parameters' not in action:
|
||||
action['parameters'] = {}
|
||||
@ -315,3 +321,21 @@ class ActionsResource(BaseResource):
|
||||
}],
|
||||
retry=True,
|
||||
)
|
||||
|
||||
def get_committed_design_version(self):
|
||||
|
||||
LOG.info("Checking for committed revision in Deckhand...")
|
||||
committed_rev_id = self.configdocs_helper._get_committed_rev_id()
|
||||
|
||||
if committed_rev_id:
|
||||
LOG.info("The committed revision in Deckhand is %d",
|
||||
committed_rev_id)
|
||||
|
||||
return committed_rev_id
|
||||
|
||||
else:
|
||||
raise ApiError(
|
||||
title='Unable to locate any committed revision in Deckhand',
|
||||
description='No committed version found in Deckhand',
|
||||
status=falcon.HTTP_404,
|
||||
retry=False)
|
||||
|
@ -19,7 +19,7 @@ from airflow.operators.subdag_operator import SubDagOperator
|
||||
|
||||
from armada_deploy_site import deploy_site_armada
|
||||
import dag_names as dn
|
||||
from deckhand_get_design import get_design_deckhand
|
||||
from deckhand_get_rendered_doc import get_rendered_doc_deckhand
|
||||
from destroy_node import destroy_server
|
||||
from drydock_deploy_site import deploy_site_drydock
|
||||
from failure_handlers import step_failure_handler
|
||||
@ -88,13 +88,14 @@ class CommonStepFactory(object):
|
||||
on_failure_callback=step_failure_handler,
|
||||
dag=self.dag)
|
||||
|
||||
def get_get_design_version(self, task_id=dn.GET_DESIGN_VERSION):
|
||||
"""Generate the get design version step
|
||||
def get_get_rendered_doc(self, task_id=dn.GET_RENDERED_DOC):
|
||||
"""Generate the get deckhand rendered doc step
|
||||
|
||||
Retrieves the version of the design to use from deckhand
|
||||
Check that we are able to render the docs before proceeding
|
||||
further with the workflow
|
||||
"""
|
||||
return SubDagOperator(
|
||||
subdag=get_design_deckhand(
|
||||
subdag=get_rendered_doc_deckhand(
|
||||
self.parent_dag_name,
|
||||
task_id,
|
||||
args=self.default_args),
|
||||
|
@ -16,7 +16,7 @@
|
||||
ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight'
|
||||
ARMADA_BUILD_DAG_NAME = 'armada_build'
|
||||
DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check'
|
||||
GET_DESIGN_VERSION = 'get_design_version'
|
||||
GET_RENDERED_DOC = 'get_rendered_doc'
|
||||
GET_DEPLOY_CONF_DAG_NAME = 'dag_deployment_configuration'
|
||||
DRYDOCK_BUILD_DAG_NAME = 'drydock_build'
|
||||
VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design'
|
||||
|
@ -13,35 +13,24 @@
|
||||
# limitations under the License.
|
||||
|
||||
from airflow.models import DAG
|
||||
from airflow.operators import DeckhandGetDesignOperator
|
||||
from airflow.operators import DeckhandRetrieveRenderedDocOperator
|
||||
|
||||
from config_path import config_path
|
||||
|
||||
|
||||
def get_design_deckhand(parent_dag_name, child_dag_name, args):
|
||||
def get_rendered_doc_deckhand(parent_dag_name, child_dag_name, args):
|
||||
'''
|
||||
Get Deckhand Design Version
|
||||
Get rendered documents from Deckhand for the committed revision ID.
|
||||
'''
|
||||
dag = DAG(
|
||||
'{}.{}'.format(parent_dag_name, child_dag_name),
|
||||
default_args=args)
|
||||
|
||||
deckhand_design = DeckhandGetDesignOperator(
|
||||
task_id='deckhand_get_design_version',
|
||||
deckhand_retrieve_rendered_doc = DeckhandRetrieveRenderedDocOperator(
|
||||
task_id='deckhand_retrieve_rendered_doc',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
dag=dag)
|
||||
|
||||
shipyard_retrieve_rendered_doc = DeckhandRetrieveRenderedDocOperator(
|
||||
task_id='shipyard_retrieve_rendered_doc',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
sub_dag_name=child_dag_name,
|
||||
dag=dag)
|
||||
|
||||
# Define dependencies
|
||||
shipyard_retrieve_rendered_doc.set_upstream(deckhand_design)
|
||||
|
||||
return dag
|
@ -46,7 +46,7 @@ step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME,
|
||||
action_xcom = step_factory.get_action_xcom()
|
||||
concurrency_check = step_factory.get_concurrency_check()
|
||||
preflight = step_factory.get_preflight()
|
||||
get_design_version = step_factory.get_get_design_version()
|
||||
get_rendered_doc = step_factory.get_get_rendered_doc()
|
||||
deployment_configuration = step_factory.get_deployment_configuration()
|
||||
validate_site_design = step_factory.get_validate_site_design()
|
||||
drydock_build = step_factory.get_drydock_build()
|
||||
@ -55,8 +55,8 @@ armada_build = step_factory.get_armada_build()
|
||||
# DAG Wiring
|
||||
concurrency_check.set_upstream(action_xcom)
|
||||
preflight.set_upstream(concurrency_check)
|
||||
get_design_version.set_upstream(preflight)
|
||||
deployment_configuration.set_upstream(get_design_version)
|
||||
get_rendered_doc.set_upstream(preflight)
|
||||
deployment_configuration.set_upstream(get_rendered_doc)
|
||||
validate_site_design.set_upstream(deployment_configuration)
|
||||
drydock_build.set_upstream(validate_site_design)
|
||||
armada_build.set_upstream(drydock_build)
|
||||
|
@ -47,7 +47,7 @@ step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME,
|
||||
action_xcom = step_factory.get_action_xcom()
|
||||
concurrency_check = step_factory.get_concurrency_check()
|
||||
preflight = step_factory.get_preflight()
|
||||
get_design_version = step_factory.get_get_design_version()
|
||||
get_rendered_doc = step_factory.get_get_rendered_doc()
|
||||
deployment_configuration = step_factory.get_deployment_configuration()
|
||||
validate_site_design = step_factory.get_validate_site_design()
|
||||
destroy_server = step_factory.get_destroy_server()
|
||||
@ -56,8 +56,8 @@ drydock_build = step_factory.get_drydock_build()
|
||||
# DAG Wiring
|
||||
concurrency_check.set_upstream(action_xcom)
|
||||
preflight.set_upstream(concurrency_check)
|
||||
get_design_version.set_upstream(preflight)
|
||||
deployment_configuration.set_upstream(get_design_version)
|
||||
get_rendered_doc.set_upstream(preflight)
|
||||
deployment_configuration.set_upstream(get_rendered_doc)
|
||||
validate_site_design.set_upstream(deployment_configuration)
|
||||
destroy_server.set_upstream(validate_site_design)
|
||||
drydock_build.set_upstream(destroy_server)
|
||||
|
@ -50,7 +50,7 @@ step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME,
|
||||
|
||||
action_xcom = step_factory.get_action_xcom()
|
||||
concurrency_check = step_factory.get_concurrency_check()
|
||||
get_design_version = step_factory.get_get_design_version()
|
||||
get_rendered_doc = step_factory.get_get_rendered_doc()
|
||||
deployment_configuration = step_factory.get_deployment_configuration()
|
||||
validate_site_design = step_factory.get_validate_site_design()
|
||||
drydock_build = step_factory.get_drydock_build()
|
||||
@ -61,8 +61,8 @@ skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow()
|
||||
|
||||
# DAG Wiring
|
||||
concurrency_check.set_upstream(action_xcom)
|
||||
get_design_version.set_upstream(concurrency_check)
|
||||
deployment_configuration.set_upstream(get_design_version)
|
||||
get_rendered_doc.set_upstream(concurrency_check)
|
||||
deployment_configuration.set_upstream(get_rendered_doc)
|
||||
validate_site_design.set_upstream(deployment_configuration)
|
||||
drydock_build.set_upstream(validate_site_design)
|
||||
armada_build.set_upstream(drydock_build)
|
||||
|
@ -93,13 +93,9 @@ class ArmadaBaseOperator(UcpBaseOperator):
|
||||
deckhand_svc_endpoint = ucp_service_endpoint(
|
||||
self, svc_type=self.deckhand_svc_type)
|
||||
|
||||
# Retrieve last committed revision id
|
||||
committed_revision_id = self.xcom_puller.get_design_version()
|
||||
|
||||
# Get deckhand design reference url
|
||||
self.deckhand_design_ref = self._init_deckhand_design_ref(
|
||||
deckhand_svc_endpoint,
|
||||
committed_revision_id)
|
||||
deckhand_svc_endpoint)
|
||||
|
||||
@staticmethod
|
||||
def _init_armada_client(armada_svc_endpoint, svc_token):
|
||||
@ -137,9 +133,7 @@ class ArmadaBaseOperator(UcpBaseOperator):
|
||||
else:
|
||||
raise AirflowException("Failed to set up Armada client!")
|
||||
|
||||
@staticmethod
|
||||
def _init_deckhand_design_ref(deckhand_svc_endpoint,
|
||||
committed_revision_id):
|
||||
def _init_deckhand_design_ref(self, deckhand_svc_endpoint):
|
||||
|
||||
LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
|
||||
|
||||
@ -148,7 +142,7 @@ class ArmadaBaseOperator(UcpBaseOperator):
|
||||
deckhand_path = "deckhand+" + deckhand_svc_endpoint
|
||||
_deckhand_design_ref = os.path.join(deckhand_path,
|
||||
"revisions",
|
||||
str(committed_revision_id),
|
||||
str(self.revision_id),
|
||||
"rendered-documents")
|
||||
|
||||
if _deckhand_design_ref:
|
||||
|
@ -113,24 +113,6 @@ class DeckhandBaseOperator(UcpBaseOperator):
|
||||
if not self.deckhandclient:
|
||||
raise AirflowException('Failed to set up deckhand client!')
|
||||
|
||||
# Retrieve 'revision_id' from xcom for tasks other than
|
||||
# 'deckhand_get_design_version'
|
||||
#
|
||||
# NOTE: In the case of 'deploy_site', the dag_id will
|
||||
# be 'deploy_site.get_design_version' for the
|
||||
# 'deckhand_get_design_version' task. We need to extract
|
||||
# the xcom value from it in order to get the value of the
|
||||
# last committed revision ID
|
||||
if self.task_id != 'deckhand_get_design_version':
|
||||
|
||||
# Retrieve 'revision_id' from xcom
|
||||
self.revision_id = self.xcom_puller.get_design_version()
|
||||
|
||||
if self.revision_id:
|
||||
LOG.info("Revision ID is %d", self.revision_id)
|
||||
else:
|
||||
raise AirflowException('Failed to retrieve Revision ID!')
|
||||
|
||||
|
||||
class DeckhandBaseOperatorPlugin(AirflowPlugin):
|
||||
|
||||
|
@ -103,9 +103,13 @@ class DeploymentConfigurationOperator(BaseOperator):
|
||||
if task_instance:
|
||||
LOG.debug("task_instance found, extracting design version")
|
||||
# Set the revision_id to the revision on the xcom
|
||||
revision_id = task_instance.xcom_pull(
|
||||
task_ids='deckhand_get_design_version',
|
||||
dag_id=self.main_dag_name + '.get_design_version')
|
||||
action_info = task_instance.xcom_pull(
|
||||
task_ids='action_xcom',
|
||||
dag_id=self.main_dag_name,
|
||||
key='action')
|
||||
|
||||
revision_id = action_info['committed_rev_id']
|
||||
|
||||
if revision_id:
|
||||
LOG.info("Revision is set to: %s for deployment configuration",
|
||||
revision_id)
|
||||
|
@ -153,15 +153,12 @@ class DrydockBaseOperator(UcpBaseOperator):
|
||||
|
||||
LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
|
||||
|
||||
# Retrieve last committed revision id
|
||||
committed_revision_id = self.xcom_puller.get_design_version()
|
||||
|
||||
# Form DeckHand Design Reference Path
|
||||
# This URL will be used to retrieve the Site Design YAMLs
|
||||
deckhand_path = "deckhand+" + deckhand_svc_endpoint
|
||||
self.deckhand_design_ref = os.path.join(deckhand_path,
|
||||
"revisions",
|
||||
str(committed_revision_id),
|
||||
str(self.revision_id),
|
||||
"rendered-documents")
|
||||
if self.deckhand_design_ref:
|
||||
LOG.info("Design YAMLs will be retrieved from %s",
|
||||
|
@ -86,11 +86,6 @@ class UcpBaseOperator(BaseOperator):
|
||||
# Exeute child function
|
||||
self.do_execute()
|
||||
|
||||
# Push last committed version to xcom for the
|
||||
# 'get_design_version' subdag
|
||||
if self.sub_dag_name == 'get_design_version':
|
||||
return self.committed_ver
|
||||
|
||||
def ucp_base(self, context):
|
||||
|
||||
LOG.info("Running UCP Base Operator...")
|
||||
@ -109,6 +104,7 @@ class UcpBaseOperator(BaseOperator):
|
||||
self.xcom_puller = XcomPuller(self.main_dag_name, self.task_instance)
|
||||
self.action_info = self.xcom_puller.get_action_info()
|
||||
self.dc = self.xcom_puller.get_deployment_configuration()
|
||||
self.revision_id = self.action_info['committed_rev_id']
|
||||
|
||||
def get_k8s_logs(self):
|
||||
"""Retrieve Kubernetes pod/container logs specified by an opererator
|
||||
|
@ -73,12 +73,3 @@ class XcomPuller(object):
|
||||
return self._get_xcom(source_task=source_task,
|
||||
dag_id=source_dag,
|
||||
key=key)
|
||||
|
||||
def get_design_version(self):
|
||||
"""Retrieve the design version being used for this workflow"""
|
||||
source_task = 'deckhand_get_design_version'
|
||||
source_dag = 'get_design_version'
|
||||
key = None
|
||||
return self._get_xcom(source_task=source_task,
|
||||
dag_id=source_dag,
|
||||
key=key)
|
||||
|
@ -11,18 +11,16 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime
|
||||
import mock
|
||||
from falcon import testing
|
||||
from mock import patch
|
||||
from oslo_config import cfg
|
||||
import pytest
|
||||
|
||||
import falcon
|
||||
from falcon import testing
|
||||
import json
|
||||
import logging
|
||||
import mock
|
||||
import os
|
||||
import pytest
|
||||
import responses
|
||||
|
||||
from shipyard_airflow.control.action import actions_api
|
||||
@ -35,6 +33,7 @@ DATE_ONE = datetime(2017, 9, 13, 11, 13, 3, 57000)
|
||||
DATE_TWO = datetime(2017, 9, 13, 11, 13, 5, 57000)
|
||||
DATE_ONE_STR = DATE_ONE.strftime('%Y-%m-%dT%H:%M:%S')
|
||||
DATE_TWO_STR = DATE_TWO.strftime('%Y-%m-%dT%H:%M:%S')
|
||||
DESIGN_VERSION = 1
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -299,6 +298,7 @@ def test_create_action():
|
||||
action_resource.invoke_airflow_dag = airflow_stub
|
||||
action_resource.insert_action = insert_action_stub
|
||||
action_resource.audit_control_command_db = audit_control_command_db
|
||||
action_resource.get_committed_design_version = lambda: DESIGN_VERSION
|
||||
|
||||
# with invalid input. fail.
|
||||
try:
|
||||
@ -326,6 +326,7 @@ def test_create_action():
|
||||
assert len(action['id']) == 26
|
||||
assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402'
|
||||
assert action['dag_status'] == 'SCHEDULED'
|
||||
assert action['committed_rev_id'] == 1
|
||||
except ApiError:
|
||||
assert False, 'Should not raise an ApiError'
|
||||
|
||||
@ -338,6 +339,7 @@ def test_create_action():
|
||||
assert len(action['id']) == 26
|
||||
assert action['dag_execution_date'] == '2017-09-06 14:10:08.528402'
|
||||
assert action['dag_status'] == 'SCHEDULED'
|
||||
assert action['committed_rev_id'] == 1
|
||||
except ApiError:
|
||||
assert False, 'Should not raise an ApiError'
|
||||
|
||||
|
@ -18,6 +18,25 @@ import yaml
|
||||
import airflow
|
||||
from airflow.exceptions import AirflowException
|
||||
|
||||
ACTION_INFO = {
|
||||
'committed_rev_id': 2,
|
||||
'dag_id': 'deploy_site',
|
||||
'id': '01CBGWY1GXQVXVCXRJKM9V71AT',
|
||||
'name': 'deploy_site',
|
||||
'parameters': {},
|
||||
'timestamp': '2018-04-20 06:47:43.905047',
|
||||
'user': 'shipyard'}
|
||||
|
||||
ACTION_INFO_NO_COMMIT = {
|
||||
'committed_rev_id': None,
|
||||
'dag_id': 'deploy_site',
|
||||
'id': '01CBGWY1GXQVXVCXRJKM9V71AT',
|
||||
'name': 'deploy_site',
|
||||
'parameters': {},
|
||||
'timestamp': '2018-04-20 06:47:43.905047',
|
||||
'user': 'shipyard'}
|
||||
|
||||
|
||||
try:
|
||||
from deployment_configuration_operator import (
|
||||
DeploymentConfigurationOperator
|
||||
@ -62,7 +81,7 @@ def test_execute_no_client(p1):
|
||||
|
||||
|
||||
@mock.patch.object(airflow.models.TaskInstance, 'xcom_pull',
|
||||
return_value=99)
|
||||
return_value=ACTION_INFO)
|
||||
def test_get_revision_id(ti):
|
||||
"""Test that get revision id follows desired exits"""
|
||||
dco = DeploymentConfigurationOperator(main_dag_name="main",
|
||||
@ -71,11 +90,11 @@ def test_get_revision_id(ti):
|
||||
ti = airflow.models.TaskInstance(task=mock.MagicMock(),
|
||||
execution_date="no")
|
||||
rid = dco.get_revision_id(ti)
|
||||
assert rid == 99
|
||||
assert rid == 2
|
||||
|
||||
|
||||
@mock.patch.object(airflow.models.TaskInstance, 'xcom_pull',
|
||||
return_value=None)
|
||||
return_value=ACTION_INFO_NO_COMMIT)
|
||||
def test_get_revision_id_none(ti):
|
||||
"""Test that get revision id follows desired exits"""
|
||||
dco = DeploymentConfigurationOperator(main_dag_name="main",
|
||||
|
Loading…
x
Reference in New Issue
Block a user