Pass Drydock health failure
Bypass a failure due to the health of Drydock on update_site or deploy_site if the continue-on-fail param is entered. Adds unit tests. Changed operator to be more easily testable. Removed the K8s Health Check for preflight checks. We will add the functionality back once we have a clearer view on what should be checked and validated for the workflow. Change-Id: Idd6d6b18d762a0284f2041248faa4040c78def3f
This commit is contained in:
parent
c0f8fc4359
commit
3629245b0c
@ -246,12 +246,17 @@ id of the action invoked so that it can be queried subsequently.
|
||||
|
||||
Example:
|
||||
shipyard create action redeploy_server --param="server-name=mcp"
|
||||
shipyard create action update_site --param="continue-on-fail=true"
|
||||
|
||||
<action_command>
|
||||
The action to invoke.
|
||||
|
||||
\--param=<parameter>
|
||||
A parameter to be provided to the action being invoked. (repeatable)
|
||||
Note that we can pass in different information to the create action
|
||||
workflow, i.e. name of server to be redeployed, whether to continue
|
||||
the workflow if there are failures in Drydock, e.g. failed health
|
||||
checks.
|
||||
|
||||
\--allow-intermediate-commits
|
||||
Allows continuation of a site action, e.g. update_site even when the
|
||||
|
@ -13,7 +13,6 @@
|
||||
# limitations under the License.
|
||||
|
||||
from airflow.models import DAG
|
||||
from airflow.operators import K8sHealthCheckOperator
|
||||
from airflow.operators import UcpHealthCheckOperator
|
||||
|
||||
from config_path import config_path
|
||||
@ -27,22 +26,14 @@ def all_preflight_checks(parent_dag_name, child_dag_name, args):
|
||||
'{}.{}'.format(parent_dag_name, child_dag_name),
|
||||
default_args=args)
|
||||
|
||||
'''
|
||||
The k8s_preflight_check checks that k8s is in a good state
|
||||
for the purposes of the Undercloud Platform to proceed with
|
||||
processing
|
||||
'''
|
||||
k8s = K8sHealthCheckOperator(
|
||||
task_id='k8s_preflight_check',
|
||||
dag=dag)
|
||||
|
||||
'''
|
||||
Check that all UCP components are in good state for the purposes
|
||||
of the Undercloud Platform to proceed with processing
|
||||
of the Undercloud Platform to proceed with processing.
|
||||
'''
|
||||
shipyard = UcpHealthCheckOperator(
|
||||
task_id='ucp_preflight_check',
|
||||
shipyard_conf=config_path,
|
||||
main_dag_name=parent_dag_name,
|
||||
dag=dag)
|
||||
|
||||
return dag
|
||||
|
@ -50,6 +50,7 @@ step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME,
|
||||
|
||||
action_xcom = step_factory.get_action_xcom()
|
||||
concurrency_check = step_factory.get_concurrency_check()
|
||||
preflight = step_factory.get_preflight()
|
||||
get_rendered_doc = step_factory.get_get_rendered_doc()
|
||||
deployment_configuration = step_factory.get_deployment_configuration()
|
||||
validate_site_design = step_factory.get_validate_site_design()
|
||||
@ -62,7 +63,8 @@ create_action_tag = step_factory.get_create_action_tag()
|
||||
|
||||
# DAG Wiring
|
||||
concurrency_check.set_upstream(action_xcom)
|
||||
get_rendered_doc.set_upstream(concurrency_check)
|
||||
preflight.set_upstream(concurrency_check)
|
||||
get_rendered_doc.set_upstream(preflight)
|
||||
deployment_configuration.set_upstream(get_rendered_doc)
|
||||
validate_site_design.set_upstream(deployment_configuration)
|
||||
drydock_build.set_upstream(validate_site_design)
|
||||
|
@ -96,6 +96,18 @@ class DrydockBaseOperator(UcpBaseOperator):
|
||||
# Logs uuid of action performed by the Operator
|
||||
LOG.info("DryDock Operator for action %s", self.action_info['id'])
|
||||
|
||||
# Skip workflow if health checks on Drydock failed and continue-on-fail
|
||||
# option is turned on
|
||||
if self.xcom_puller.get_check_drydock_continue_on_fail():
|
||||
LOG.info("Skipping %s as health checks on Drydock have "
|
||||
"failed and continue-on-fail option has been "
|
||||
"turned on", self.__class__.__name__)
|
||||
|
||||
# Set continue processing to False
|
||||
self.continue_processing = False
|
||||
|
||||
return
|
||||
|
||||
# Retrieve information of the server that we want to redeploy if user
|
||||
# executes the 'redeploy_server' dag
|
||||
# Set node filter to be the server that we want to redeploy
|
||||
|
@ -17,7 +17,10 @@ import time
|
||||
|
||||
from airflow.exceptions import AirflowException
|
||||
|
||||
from service_session import ucp_keystone_session
|
||||
try:
|
||||
from service_session import ucp_keystone_session
|
||||
except ImportError:
|
||||
from shipyard_airflow.plugins.service_session import ucp_keystone_session
|
||||
|
||||
|
||||
def ucp_service_endpoint(self, svc_type):
|
||||
|
@ -48,6 +48,8 @@ class UcpBaseOperator(BaseOperator):
|
||||
*args, **kwargs):
|
||||
"""Initialization of UcpBaseOperator object.
|
||||
|
||||
:param continue_processing: A boolean value on whether to continue
|
||||
with the workflow. Defaults to True.
|
||||
:param main_dag_name: Parent Dag
|
||||
:param pod_selector_pattern: A list containing the information on
|
||||
the patterns of the Pod name and name
|
||||
@ -68,6 +70,7 @@ class UcpBaseOperator(BaseOperator):
|
||||
"""
|
||||
|
||||
super(UcpBaseOperator, self).__init__(*args, **kwargs)
|
||||
self.continue_processing = True
|
||||
self.main_dag_name = main_dag_name
|
||||
self.pod_selector_pattern = pod_selector_pattern or []
|
||||
self.shipyard_conf = shipyard_conf
|
||||
@ -83,8 +86,9 @@ class UcpBaseOperator(BaseOperator):
|
||||
# Execute base function
|
||||
self.run_base(context)
|
||||
|
||||
# Execute child function
|
||||
self.do_execute()
|
||||
if self.continue_processing:
|
||||
# Execute child function
|
||||
self.do_execute()
|
||||
|
||||
def ucp_base(self, context):
|
||||
|
||||
|
@ -11,7 +11,6 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import requests
|
||||
@ -22,6 +21,10 @@ from airflow.plugins_manager import AirflowPlugin
|
||||
from airflow.utils.decorators import apply_defaults
|
||||
|
||||
from service_endpoint import ucp_service_endpoint
|
||||
from xcom_puller import XcomPuller
|
||||
from xcom_pusher import XcomPusher
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class UcpHealthCheckOperator(BaseOperator):
|
||||
@ -31,12 +34,16 @@ class UcpHealthCheckOperator(BaseOperator):
|
||||
|
||||
@apply_defaults
|
||||
def __init__(self,
|
||||
shipyard_conf,
|
||||
shipyard_conf=None,
|
||||
main_dag_name=None,
|
||||
xcom_push=True,
|
||||
*args,
|
||||
**kwargs):
|
||||
|
||||
super(UcpHealthCheckOperator, self).__init__(*args, **kwargs)
|
||||
self.shipyard_conf = shipyard_conf
|
||||
self.main_dag_name = main_dag_name
|
||||
self.xcom_push_flag = xcom_push
|
||||
|
||||
def execute(self, context):
|
||||
|
||||
@ -48,35 +55,64 @@ class UcpHealthCheckOperator(BaseOperator):
|
||||
'physicalprovisioner',
|
||||
'shipyard']
|
||||
|
||||
# Define task_instance
|
||||
self.task_instance = context['task_instance']
|
||||
|
||||
# Set up and retrieve values from xcom
|
||||
self.xcom_puller = XcomPuller(self.main_dag_name, self.task_instance)
|
||||
self.action_info = self.xcom_puller.get_action_info()
|
||||
|
||||
# Set up xcom_pusher to push values to xcom
|
||||
self.xcom_pusher = XcomPusher(self.task_instance)
|
||||
|
||||
# Loop through various UCP Components
|
||||
for i in ucp_components:
|
||||
for component in ucp_components:
|
||||
|
||||
# Retrieve Endpoint Information
|
||||
service_endpoint = ucp_service_endpoint(self, svc_type=i)
|
||||
logging.info("%s endpoint is %s", i, service_endpoint)
|
||||
service_endpoint = ucp_service_endpoint(self,
|
||||
svc_type=component)
|
||||
LOG.info("%s endpoint is %s", component, service_endpoint)
|
||||
|
||||
# Construct Health Check Endpoint
|
||||
healthcheck_endpoint = os.path.join(service_endpoint,
|
||||
'health')
|
||||
|
||||
logging.info("%s healthcheck endpoint is %s", i,
|
||||
healthcheck_endpoint)
|
||||
LOG.info("%s healthcheck endpoint is %s", component,
|
||||
healthcheck_endpoint)
|
||||
|
||||
try:
|
||||
logging.info("Performing Health Check on %s", i)
|
||||
|
||||
LOG.info("Performing Health Check on %s", component)
|
||||
# Set health check timeout to 30 seconds
|
||||
req = requests.get(healthcheck_endpoint, timeout=30)
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise AirflowException(e)
|
||||
|
||||
# UCP Component will return empty response/body to show that
|
||||
# it is healthy
|
||||
if req.status_code == 204:
|
||||
logging.info("%s is alive and healthy", i)
|
||||
else:
|
||||
logging.error(req.text)
|
||||
raise AirflowException("Invalid Response!")
|
||||
# An empty response/body returned by a component means
|
||||
# that it is healthy
|
||||
if req.status_code == 204:
|
||||
LOG.info("%s is alive and healthy", component)
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
self.log_health_exception(component, e)
|
||||
|
||||
def log_health_exception(self, component, error_messages):
    """Handle a failed health check for a single component.

    A failure of Drydock ('physicalprovisioner') is tolerated when the
    action carries the parameter 'continue-on-fail' set to 'true' and the
    dag_id is 'update_site' or 'deploy_site'. In that case a warning is
    logged and the xcom key 'drydock_continue_on_fail' is pushed with the
    value True. Every other failure is logged as an error and raised.

    :param component: Name of the component whose health check failed
    :param error_messages: The exception (or message) describing the
                           failure; included in the raised error
    :raises AirflowException: If the failure is not eligible for the
                              continue-on-fail bypass
    """
    # 'continue-on-fail' is optional, and 'parameters' itself may be
    # missing or None. Default to 'false' so that an absent option ends
    # in the AirflowException below rather than an AttributeError from
    # calling .lower() on None.
    parameters = self.action_info.get('parameters') or {}
    continue_on_fail = str(
        parameters.get('continue-on-fail', 'false')).lower() == 'true'

    # If Drydock health check fails and continue-on-fail, continue
    # and create xcom key 'drydock_continue_on_fail'
    if (component == 'physicalprovisioner' and
            continue_on_fail and
            self.action_info.get('dag_id') in ['update_site',
                                               'deploy_site']):
        LOG.warning('Drydock did not pass health check. Continuing '
                    'as "continue-on-fail" option is enabled.')
        self.xcom_pusher.xcom_push(key='drydock_continue_on_fail',
                                   value=True)

    else:
        LOG.error(error_messages)
        raise AirflowException("Health check failed for %s component on "
                               "dag_id=%s. Details: %s" %
                               (component, self.action_info.get('dag_id'),
                                error_messages))
|
||||
|
||||
|
||||
class UcpHealthCheckPlugin(AirflowPlugin):
|
||||
|
@ -73,3 +73,12 @@ class XcomPuller(object):
|
||||
return self._get_xcom(source_task=source_task,
|
||||
dag_id=source_dag,
|
||||
key=key)
|
||||
|
||||
def get_check_drydock_continue_on_fail(self):
    """Return the xcom value stored under 'drydock_continue_on_fail'.

    The lookup targets the 'ucp_preflight_check' task of the 'preflight'
    dag and returns whatever _get_xcom yields for that key.
    """
    lookup = {
        'source_task': 'ucp_preflight_check',
        'dag_id': 'preflight',
        'key': 'drydock_continue_on_fail',
    }
    return self._get_xcom(**lookup)
|
||||
|
@ -41,6 +41,7 @@ DESC_ACTION = """
|
||||
FORMAT: shipyard create action <action command> --param=<parameter>
|
||||
(repeatable) [--allow-intermediate-commits] \n
|
||||
EXAMPLE: shipyard create action redeploy_server --param="server-name=mcp"
|
||||
shipyard create action update_site --param="continue-on-fail=true"
|
||||
"""
|
||||
|
||||
SHORT_DESC_ACTION = (
|
||||
|
101
tests/unit/plugins/test_ucp_preflight_check_operator.py
Normal file
101
tests/unit/plugins/test_ucp_preflight_check_operator.py
Normal file
@ -0,0 +1,101 @@
|
||||
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import mock
|
||||
import pytest
|
||||
from requests.models import Response
|
||||
|
||||
from airflow.exceptions import AirflowException
|
||||
from shipyard_airflow.plugins.ucp_preflight_check_operator import (
|
||||
UcpHealthCheckOperator)
|
||||
|
||||
ucp_components = [
|
||||
'armada',
|
||||
'deckhand',
|
||||
'kubernetesprovisioner',
|
||||
'physicalprovisioner',
|
||||
'shipyard']
|
||||
|
||||
|
||||
def test_drydock_health_skip_update_site():
    """
    Ensure that an error is not thrown due to Drydock health failing during
    update_site or deploy_site when continue-on-fail is enabled
    """
    expected_log = ('Drydock did not pass health check. Continuing '
                    'as "continue-on-fail" option is enabled.')

    # The operator logs through its module level LOG object, so that is
    # what has to be patched to observe the warning.
    operator_log = ('shipyard_airflow.plugins.'
                    'ucp_preflight_check_operator.LOG')

    error = Exception('Drydock is unreachable')

    for dag_id in ('update_site', 'deploy_site'):
        op = UcpHealthCheckOperator(task_id='test')

        # Assign the action info for each dag_id explicitly; rebinding a
        # local dict without assigning it to the operator would leave the
        # second dag untested.
        op.action_info = {
            "dag_id": dag_id,
            "parameters": {"continue-on-fail": "true"}
        }

        # The bypass path pushes a value to xcom; stand in for the pusher
        # that is otherwise only created while the operator executes.
        op.xcom_pusher = mock.MagicMock()

        with mock.patch(operator_log) as mock_logger:
            op.log_health_exception('physicalprovisioner', error)

        mock_logger.warning.assert_called_with(expected_log)
        op.xcom_pusher.xcom_push.assert_called_with(
            key='drydock_continue_on_fail', value=True)
|
||||
|
||||
|
||||
def test_failure_log_health():
    """ Ensure an error is thrown on failure for all components.
    """
    action_info = {
        "dag_id": "update_site",
        "parameters": {"something-else": "true"}
    }

    error = Exception('health endpoint is unreachable')

    op = UcpHealthCheckOperator(task_id='test')
    op.action_info = action_info

    # 'physicalprovisioner' is deliberately part of the loop: without the
    # 'continue-on-fail' parameter its failure must not be bypassed.
    for component in ucp_components:
        with pytest.raises(AirflowException) as expected_exc:
            op.log_health_exception(component, error)

        # Inspect the raised exception itself rather than the string form
        # of the ExceptionInfo wrapper.
        assert "Health check failed" in str(expected_exc.value)
|
||||
|
||||
|
||||
def test_success_log_health():
    """ Ensure 204 gives correct response for all components
    """
    action_info = {
        "dag_id": "deploy_site",
        "parameters": {"something-else": "true"}
    }

    req = Response()
    req.status_code = 204

    op = UcpHealthCheckOperator(task_id='test')

    # The healthy (204) path is handled while the operator executes, so
    # drive execute() with its collaborators patched out.
    # NOTE(review): assumes execute() iterates the same five components as
    # the ucp_components list of this module - confirm against the operator.
    plugin = 'shipyard_airflow.plugins.ucp_preflight_check_operator'

    with mock.patch(plugin + '.XcomPuller') as mock_puller, \
            mock.patch(plugin + '.XcomPusher'), \
            mock.patch(plugin + '.ucp_service_endpoint',
                       return_value='http://ucp-component/api/v1.0'), \
            mock.patch('requests.get', return_value=req), \
            mock.patch(plugin + '.LOG') as mock_logger:
        mock_puller.return_value.get_action_info.return_value = action_info
        op.execute({'task_instance': mock.MagicMock()})

    for component in ucp_components:
        mock_logger.info.assert_any_call('%s is alive and healthy',
                                         component)
|
Loading…
x
Reference in New Issue
Block a user