[fix] Upgrade airflow worker aggressively
Changes to a more aggressive approach when deciding to restart the airflow worker - if any retries of the armada_post_apply step have occurred, err on the side of restarting the worker. Change-Id: Ic9b96f7a2ce729535028f019540523d895f5b384
This commit is contained in:
parent
d40e9776d3
commit
ac4dac972d
@ -92,15 +92,15 @@ class ArmadaBaseOperator(BaseOperator):
|
||||
def armada_base(self, context):
|
||||
|
||||
# Define task_instance
|
||||
task_instance = context['task_instance']
|
||||
self.task_instance = context['task_instance']
|
||||
|
||||
# Set up and retrieve values from xcom
|
||||
self.xcom_puller = XcomPuller(self.main_dag_name, task_instance)
|
||||
self.xcom_puller = XcomPuller(self.main_dag_name, self.task_instance)
|
||||
self.action_info = self.xcom_puller.get_action_info()
|
||||
self.dc = self.xcom_puller.get_deployment_configuration()
|
||||
|
||||
# Set up xcom_pusher to push values to xcom
|
||||
self.xcom_pusher = XcomPusher(context['task_instance'])
|
||||
self.xcom_pusher = XcomPusher(self.task_instance)
|
||||
|
||||
# Logs uuid of action performed by the Operator
|
||||
logging.info("Armada Operator for action %s", self.action_info['id'])
|
||||
|
@ -64,40 +64,35 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator):
|
||||
timeout=timeout)
|
||||
|
||||
except errors.ClientError as client_error:
|
||||
# Set 'get_attempted_failed_install_upgrade' xcom to 'true'
|
||||
self.xcom_pusher.xcom_push(
|
||||
key='get_attempted_failed_install_upgrade',
|
||||
value='true')
|
||||
|
||||
raise AirflowException(client_error)
|
||||
|
||||
# Retrieve xcom for 'get_attempted_failed_install_upgrade'
|
||||
# NOTE: The key will only be set to 'true' if there was a failed
|
||||
# attempt to upgrade or update the Helm charts. It does not hold
|
||||
# any value by default.
|
||||
if self.xcom_puller.get_attempted_failed_install_upgrade() == 'true':
|
||||
# NOTE: It is possible for Armada to return a HTTP 500 response
|
||||
# even though the Helm charts have been upgraded/updated. The
|
||||
# workflow will treat the 'Armada Apply' task as a failed attempt
|
||||
# in such situation and proceed to schedule and run the task for
|
||||
# a second time (the default is 3 retries). As the relevant Helm
|
||||
# Charts would have already been updated, we will get an empty
|
||||
# list from Armada for that second retry. As a workaround, we will
|
||||
# need to treat such response as a successful upgrade/update.
|
||||
# A long term solution will be in place in the future.
|
||||
if (not armada_post_apply['message']['install'] and
|
||||
not armada_post_apply['message']['upgrade']):
|
||||
upgrade_airflow_worker = True
|
||||
|
||||
# Search for Shipyard deployment in the list of chart upgrades
|
||||
# NOTE: It is possible for the chart name to take on different
|
||||
# values, e.g. 'aic-ucp-shipyard', 'ucp-shipyard'. Hence we
|
||||
# will search for the word 'shipyard', which should exist as
|
||||
# part of the name of the Shipyard Helm Chart.
|
||||
for i in armada_post_apply['message']['upgrade']:
|
||||
if 'shipyard' in i:
|
||||
upgrade_airflow_worker = True
|
||||
break
|
||||
# if this is a retry, assume that the airflow worker needs to be
|
||||
# updated at the end of the workflow.
|
||||
# TODO(bryan-strassner) need to persist the decision to restart the
|
||||
# airflow worker outside of the xcom structure. This is a work-
|
||||
# around that will restart the worker more often than it
|
||||
# needs to. Problem with xcom is that it is cleared for the task
|
||||
# on retry, which means we can't use it as a flag reliably.
|
||||
if self.task_instance.try_number > 1:
|
||||
logging.info(
|
||||
"Airflow Worker will be upgraded because retry may obfuscate "
|
||||
"an upgrade of shipyard/airflow."
|
||||
)
|
||||
upgrade_airflow_worker = True
|
||||
else:
|
||||
# Search for Shipyard deployment in the list of chart upgrades
|
||||
# NOTE: It is possible for the chart name to take on different
|
||||
# values, e.g. 'aic-ucp-shipyard', 'ucp-shipyard'. Hence we
|
||||
# will search for the word 'shipyard', which should exist as
|
||||
# part of the name of the Shipyard Helm Chart.
|
||||
for i in armada_post_apply['message']['upgrade']:
|
||||
if 'shipyard' in i:
|
||||
logging.info(
|
||||
"Shipyard was upgraded. Airflow worker must be "
|
||||
"restarted to reflect any workflow changes."
|
||||
)
|
||||
upgrade_airflow_worker = True
|
||||
break
|
||||
|
||||
# Create xcom key 'upgrade_airflow_worker'
|
||||
# Value of key will depend on whether an upgrade has been
|
||||
|
@ -82,13 +82,3 @@ class XcomPuller(object):
|
||||
return self._get_xcom(source_task=source_task,
|
||||
dag_id=source_dag,
|
||||
key=key)
|
||||
|
||||
def get_attempted_failed_install_upgrade(self):
|
||||
"""Retrieve information on whether there was a failed attempt
|
||||
of Armada Apply"""
|
||||
source_task = 'armada_post_apply'
|
||||
source_dag = 'armada_build'
|
||||
key = 'get_attempted_failed_install_upgrade'
|
||||
return self._get_xcom(source_task=source_task,
|
||||
dag_id=source_dag,
|
||||
key=key)
|
||||
|
Loading…
x
Reference in New Issue
Block a user