Enhance Error Logging for Drydock Operator

There is currently minimal error logging for the DryDock
Operator.

This patch set will provide more information with regards
to the parent and child tasks in the event of a failure
or time out.

Change-Id: Ief0f23031334d10300d09c6318537b02d8b3236b
This commit is contained in:
Anthony Lin 2018-03-13 09:40:38 +00:00 committed by Bryan Strassner
parent 4fd8d79574
commit e604118dfb

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import os
import time
@ -93,7 +94,6 @@ class DrydockBaseOperator(BaseOperator):
self.node_filter = node_filter
self.redeploy_server = redeploy_server
self.shipyard_conf = shipyard_conf
self.svc_token = svc_token
self.sub_dag_name = sub_dag_name
self.svc_session = svc_session
self.svc_token = svc_token
@ -142,6 +142,8 @@ class DrydockBaseOperator(BaseOperator):
self.drydock_svc_endpoint = ucp_service_endpoint(
self, svc_type=self.drydock_svc_type)
logging.info("Drydock endpoint is %s", self.drydock_svc_endpoint)
# Parse DryDock Service Endpoint
drydock_url = urlparse(self.drydock_svc_endpoint)
@ -263,7 +265,7 @@ class DrydockBaseOperator(BaseOperator):
# Raise Time Out Exception
if task_status == 'running' and i == end_range:
raise AirflowException("Task Execution Timed Out!")
self.task_failure(False)
# Exit 'for' loop if the task is in 'complete' or 'terminated'
# state
@ -278,7 +280,87 @@ class DrydockBaseOperator(BaseOperator):
logging.info('Task id %s has been successfully completed',
self.drydock_task_id)
else:
raise AirflowException("Failed to execute/complete task!")
self.task_failure(True)
def task_failure(self, _task_failure):
logging.info('Retrieving all tasks records from Drydock...')
try:
# Get all tasks records
all_tasks = self.drydock_client.get_tasks()
# Create a dictionary of tasks records with 'task_id' as key
all_task_ids = {t['task_id']: t for t in all_tasks}
except errors.ClientError as client_error:
raise AirflowException(client_error)
# Retrieve the failed parent task and assign it to list
failed_task = (
[x for x in all_tasks if x['task_id'] == self.drydock_task_id])
# Print detailed information of failed parent task in json output
# Since there is only 1 failed parent task, we will print index 0
# of the list
if failed_task:
logging.error('%s task has either failed or timed out',
failed_task[0]['action'])
logging.error(json.dumps(failed_task[0],
indent=4,
sort_keys=True))
# Get the list of subtasks belonging to the failed parent task
subtask_id_list = failed_task[0]['subtask_id_list']
logging.info("Printing information of failed sub-tasks...")
# Print detailed information of failed step(s) under each subtask
# This will help to provide additional information for troubleshooting
# purpose.
for subtask_id in subtask_id_list:
logging.info("Retrieving details of subtask %s...",
subtask_id)
# Retrieve task information
task = all_task_ids.get(subtask_id)
if task:
# Print subtask action and state
logging.info("%s subtask is in %s state",
task['action'],
task['result']['status'])
# Print list containing steps in failure state
if task['result']['failures']:
logging.error("The following steps have failed:")
logging.error(task['result']['failures'])
message_list = (
task['result']['details']['messageList'] or [])
# Print information of failed steps
for message in message_list:
is_error = message['error'] is True
if is_error:
logging.error(json.dumps(message,
indent=4,
sort_keys=True))
else:
logging.info("No failed step detected for subtask %s",
subtask_id)
else:
raise AirflowException("Unable to retrieve subtask info!")
# Raise Exception to terminate workflow
if _task_failure:
raise AirflowException("Failed to Execute/Complete Task!")
else:
raise AirflowException("Task Execution Timed Out!")
class DrydockBaseOperatorPlugin(AirflowPlugin):