Ensure pod logs are fetched in case of exception in any operator
This patch covers some edge cases that can occur during Shipyard Airflow operator execution. At the moment, all operators interact with other services, i.e. k8s pods. If an exception is raised during execution of an operator, logs will be fetched from the appropriate pod, and if the operator has a "fetch_failure_details" method (see DrydockBaseOperator), it will be called as well. What exceptions can occur during an operator's execution? Besides those explicitly defined in code, such as DrydockClientUseFailureException, other exceptions, e.g. KeyError or similar, may be raised. In that case it is not clear whether the culprit is the client side (Shipyard) or the server side (Drydock, Armada, Deckhand, Promenade). So this patch takes a defensive approach: it gets logs from the pods and gets additional details for any exceptional situation. To do that, the do_execute method is wrapped with try..except in UcpBaseOperator.execute. While fetching logs from a pod and fetching failure details, the wrapper does the appropriate logging itself and finally re-raises the original exception. Change-Id: If1501e9a24b05edb6eb32c7b1b2d27f24f3ee063
This commit is contained in:
parent
d937a165e2
commit
4164518502
@ -58,9 +58,6 @@ class ArmadaGetReleasesOperator(ArmadaBaseOperator):
|
||||
LOG.info("Successfully retrieved Helm charts releases")
|
||||
LOG.info(armada_get_releases)
|
||||
else:
|
||||
# Dump logs from Armada API pods
|
||||
self.get_k8s_logs()
|
||||
|
||||
raise AirflowException("Failed to retrieve Helm charts releases!")
|
||||
|
||||
|
||||
|
@ -61,9 +61,6 @@ class ArmadaGetStatusOperator(ArmadaBaseOperator):
|
||||
armada_get_status['tiller']['version'])
|
||||
|
||||
else:
|
||||
# Dump logs from Armada API pods
|
||||
self.get_k8s_logs()
|
||||
|
||||
raise AirflowException("Please check Tiller!")
|
||||
|
||||
|
||||
|
@ -69,9 +69,6 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator):
|
||||
timeout=timeout)
|
||||
|
||||
except errors.ClientError as client_error:
|
||||
# Dump logs from Armada API pods
|
||||
self.get_k8s_logs()
|
||||
|
||||
raise AirflowException(client_error)
|
||||
|
||||
# if this is a retry, assume that the airflow worker needs to be
|
||||
|
@ -50,9 +50,6 @@ class ArmadaValidateDesignOperator(ArmadaBaseOperator):
|
||||
manifest=self.design_ref, timeout=timeout)
|
||||
|
||||
except errors.ClientError as client_error:
|
||||
# Dump logs from Armada API pods
|
||||
self.get_k8s_logs()
|
||||
|
||||
raise AirflowException(client_error)
|
||||
|
||||
# Print results
|
||||
@ -65,9 +62,6 @@ class ArmadaValidateDesignOperator(ArmadaBaseOperator):
|
||||
if status.lower() == 'success':
|
||||
LOG.info("Site Design has been successfully validated")
|
||||
else:
|
||||
# Dump logs from Armada API pods
|
||||
self.get_k8s_logs()
|
||||
|
||||
raise AirflowException("Site Design Validation Failed "
|
||||
"with status: {}!".format(status))
|
||||
|
||||
|
@ -70,9 +70,6 @@ class DeckhandCreateSiteActionTagOperator(DeckhandBaseOperator):
|
||||
self.revision_id)
|
||||
|
||||
except:
|
||||
# Dump logs from Deckhand pods
|
||||
self.get_k8s_logs()
|
||||
|
||||
raise AirflowException("Failed to create revision tag!")
|
||||
|
||||
def check_task_result(self, task_id):
|
||||
|
@ -46,9 +46,6 @@ class DeckhandRetrieveRenderedDocOperator(DeckhandBaseOperator):
|
||||
LOG.info("Successfully Retrieved Rendered Document")
|
||||
|
||||
except:
|
||||
# Dump logs from Deckhand pods
|
||||
self.get_k8s_logs()
|
||||
|
||||
raise AirflowException("Failed to Retrieve Rendered Document!")
|
||||
|
||||
|
||||
|
@ -57,16 +57,10 @@ class DeckhandValidateSiteDesignOperator(DeckhandBaseOperator):
|
||||
timeout=self.validation_read_timeout).text)
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
# Dump logs from Deckhand pods
|
||||
self.get_k8s_logs()
|
||||
|
||||
raise AirflowException(e)
|
||||
|
||||
if (any([str(v.get('status', 'unspecified')).lower() == 'failure'
|
||||
for v in retrieved_list.get('results', [])])):
|
||||
# Dump logs from Deckhand pods
|
||||
self.get_k8s_logs()
|
||||
|
||||
raise AirflowException("DeckHand Site Design Validation Failed!")
|
||||
else:
|
||||
LOG.info("Revision %d has been successfully validated",
|
||||
|
@ -179,9 +179,6 @@ class DrydockBaseOperator(UcpBaseOperator):
|
||||
node_filter=self.node_filter)
|
||||
|
||||
except errors.ClientError as client_error:
|
||||
# Dump logs from Drydock pods
|
||||
self.get_k8s_logs()
|
||||
|
||||
raise DrydockClientUseFailureException(client_error)
|
||||
|
||||
# Retrieve Task ID
|
||||
@ -221,7 +218,6 @@ class DrydockBaseOperator(UcpBaseOperator):
|
||||
LOG.info("Current status of task id %s is %s",
|
||||
self.drydock_task_id, task_status)
|
||||
except DrydockClientUseFailureException:
|
||||
self.get_k8s_logs()
|
||||
raise
|
||||
except:
|
||||
# There can be situations where there are intermittent network
|
||||
@ -234,10 +230,7 @@ class DrydockBaseOperator(UcpBaseOperator):
|
||||
# TODO(bryan-strassner) If Shipyard has timed out waiting for
|
||||
# this task to complete, and Drydock has provided a means
|
||||
# to cancel a task, that cancellation should be done here.
|
||||
|
||||
# task_failure only exits with an exception, so this is the
|
||||
# end of processing in the case of a timeout.
|
||||
self.task_failure(False)
|
||||
raise DrydockTaskTimeoutException("Task Execution Timed Out!")
|
||||
|
||||
# Exit 'for' loop if the task is in 'complete' or 'terminated'
|
||||
# state
|
||||
@ -252,7 +245,8 @@ class DrydockBaseOperator(UcpBaseOperator):
|
||||
LOG.info('Task id %s has been successfully completed',
|
||||
self.drydock_task_id)
|
||||
else:
|
||||
self.task_failure(True)
|
||||
raise DrydockTaskFailedException(
|
||||
"Failed to Execute/Complete Task!")
|
||||
|
||||
def get_task_dict(self, task_id):
|
||||
"""Retrieve task output in its raw dictionary format
|
||||
@ -268,10 +262,7 @@ class DrydockBaseOperator(UcpBaseOperator):
|
||||
except errors.ClientError as client_error:
|
||||
raise DrydockClientUseFailureException(client_error)
|
||||
|
||||
def task_failure(self, _task_failure):
|
||||
# Dump logs from Drydock pods
|
||||
self.get_k8s_logs()
|
||||
|
||||
def fetch_failure_details(self):
|
||||
LOG.info('Retrieving all tasks records from Drydock...')
|
||||
|
||||
try:
|
||||
@ -297,21 +288,14 @@ class DrydockBaseOperator(UcpBaseOperator):
|
||||
|
||||
LOG.error(pprint.pprint(failed_parent_task[0]))
|
||||
|
||||
# Get the list of subtasks belonging to the failed parent task
|
||||
parent_subtask_id_list = failed_parent_task[0]['subtask_id_list']
|
||||
# Get the list of subtasks belonging to the failed parent task
|
||||
parent_subtask_id_list = failed_parent_task[0]['subtask_id_list']
|
||||
|
||||
# Check for failed subtasks
|
||||
self.check_subtask_failure(parent_subtask_id_list)
|
||||
|
||||
# Raise Exception to terminate workflow
|
||||
if _task_failure:
|
||||
raise DrydockTaskFailedException(
|
||||
"Failed to Execute/Complete Task!"
|
||||
)
|
||||
# Check for failed subtasks
|
||||
self.check_subtask_failure(parent_subtask_id_list)
|
||||
else:
|
||||
raise DrydockTaskTimeoutException(
|
||||
"Task Execution Timed Out!"
|
||||
)
|
||||
LOG.info("No failed parent task found for task_id %s",
|
||||
self.drydock_task_id)
|
||||
|
||||
def check_subtask_failure(self, subtask_id_list):
|
||||
|
||||
|
@ -81,9 +81,6 @@ class DrydockValidateDesignOperator(DrydockBaseOperator):
|
||||
if status.lower() == 'success':
|
||||
LOG.info("DryDock Site Design has been successfully validated")
|
||||
else:
|
||||
# Dump logs from Drydock pods
|
||||
self.get_k8s_logs()
|
||||
|
||||
raise AirflowException("DryDock Site Design Validation Failed "
|
||||
"with status: {}!".format(status))
|
||||
|
||||
|
@ -89,9 +89,6 @@ class PromenadeValidateSiteDesignOperator(PromenadeBaseOperator):
|
||||
LOG.info("Promenade Site Design has been successfully validated")
|
||||
|
||||
else:
|
||||
# Dump logs from Promenade pods
|
||||
self.get_k8s_logs()
|
||||
|
||||
raise AirflowException("Promenade Site Design Validation Failed "
|
||||
"with status: {}!".format(status))
|
||||
|
||||
|
@ -102,8 +102,18 @@ class UcpBaseOperator(BaseOperator):
|
||||
self.run_base(context)
|
||||
|
||||
if self.continue_processing:
|
||||
# Exeute child function
|
||||
self.do_execute()
|
||||
# Execute child function
|
||||
try:
|
||||
self.do_execute()
|
||||
except Exception:
|
||||
LOG.exception(
|
||||
'Exception happened during %s execution, '
|
||||
'will try to log additional details',
|
||||
self.__class__.__name__)
|
||||
self.get_k8s_logs()
|
||||
if hasattr(self, 'fetch_failure_details'):
|
||||
self.fetch_failure_details()
|
||||
raise
|
||||
|
||||
def ucp_base(self, context):
|
||||
|
||||
|
@ -13,3 +13,6 @@ project_domain_name = default
|
||||
project_name = service
|
||||
user_domain_name = default
|
||||
username = shipyard
|
||||
|
||||
[k8s_logs]
|
||||
ucp_namespace = fake_ucp
|
||||
|
@ -0,0 +1,62 @@
|
||||
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Tests for drydock base operator functions"""
|
||||
import os
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
|
||||
from shipyard_airflow.plugins.drydock_verify_site import (
|
||||
DrydockVerifySiteOperator
|
||||
)
|
||||
from shipyard_airflow.plugins.drydock_errors import (
|
||||
DrydockTaskFailedException,
|
||||
)
|
||||
|
||||
CONF_FILE = os.path.join(os.path.dirname(__file__), 'test.conf')
|
||||
|
||||
|
||||
@mock.patch('shipyard_airflow.plugins.ucp_base_operator.get_pod_logs')
def test_logs_fetched_if_exception_in_create_task(get_pod_logs):
    """Check diagnostics are collected when ``create_task`` raises.

    The mocked Drydock client's ``create_task`` is set to raise a
    ``ValueError``. The test expects that error to propagate out of
    ``execute`` and, afterwards, that both the patched ``get_pod_logs``
    and the client's ``get_tasks`` have been called.
    """
    failure_message = 'Fake create task method failed'
    drydock_client = mock.MagicMock()
    drydock_client.create_task.side_effect = ValueError(failure_message)

    operator = DrydockVerifySiteOperator(
        task_id="t1",
        shipyard_conf=CONF_FILE,
        drydock_client=drydock_client)

    # Replace operator internals with mocks; presumably this lets execute()
    # reach do_execute() without real services -- confirm against the
    # base operator implementation.
    operator._setup_drydock_client = mock.MagicMock()
    operator._continue_processing_flag = mock.MagicMock(return_value=True)
    operator._deckhand_design_ref = mock.MagicMock()

    airflow_context = mock.MagicMock()
    with pytest.raises(ValueError, match=failure_message):
        operator.execute(airflow_context)

    # Checked after the raises-block so the assertions actually run.
    assert get_pod_logs.called
    assert drydock_client.get_tasks.called
|
||||
|
||||
|
||||
@mock.patch('time.sleep', mock.MagicMock())
@mock.patch('shipyard_airflow.plugins.ucp_base_operator.get_pod_logs')
def test_logs_fetched_if_exception_in_query_task(get_pod_logs):
    """Check diagnostics are collected when the task run fails.

    The Drydock client is a plain ``MagicMock`` with no configured
    responses, and ``time.sleep`` is patched out. The test expects
    ``execute`` to raise ``DrydockTaskFailedException`` and, afterwards,
    that both the patched ``get_pod_logs`` and the client's ``get_tasks``
    have been called.
    """
    drydock_client = mock.MagicMock()

    operator = DrydockVerifySiteOperator(
        task_id="t1",
        shipyard_conf=CONF_FILE,
        drydock_client=drydock_client)

    # Replace operator internals with mocks; presumably this lets execute()
    # reach do_execute() without real services -- confirm against the
    # base operator implementation.
    operator._setup_drydock_client = mock.MagicMock()
    operator._continue_processing_flag = mock.MagicMock(return_value=True)
    operator._deckhand_design_ref = mock.MagicMock()

    airflow_context = mock.MagicMock()
    with pytest.raises(DrydockTaskFailedException):
        operator.execute(airflow_context)

    # Checked after the raises-block so the assertions actually run.
    assert get_pod_logs.called
    assert drydock_client.get_tasks.called
|
Loading…
x
Reference in New Issue
Block a user