Add UCP Base Operator
1) Refactor Drydock Base Operator to make use of the UCP Base Operator instead
2) Dump logs from Drydock Pods when there are Exceptions

Change-Id: I3fbe03d13b5fc89a503cfb2c3c25751076718554
parent 83d91689aa
commit b9b0e27de0
@@ -389,6 +389,8 @@ conf:
   airflow:
     worker_endpoint_scheme: 'http'
     worker_port: 8793
+  k8s_logs:
+    ucp_namespace: 'ucp'
   airflow_config_file:
     path: /usr/local/airflow/airflow.cfg
   airflow:
@@ -249,6 +249,16 @@
 #auth_section = <None>
 
 
+[k8s_logs]
+
+#
+# From shipyard_airflow
+#
+
+# The namespace of the UCP Pods (string value)
+#ucp_namespace = ucp
+
+
 [logging]
 
 #
@@ -202,6 +202,17 @@ SECTIONS = [
             ),
         ]
     ),
+    ConfigSection(
+        name='k8s_logs',
+        title='Parameters for K8s Pods Logs',
+        options=[
+            cfg.StrOpt(
+                'ucp_namespace',
+                default='ucp',
+                help='Namespace of UCP Pods'
+            ),
+        ]
+    ),
 ]
 
 
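Note that the ConfigSection above drives the generated sample configuration; at runtime the new option is read with configparser, as the new ucp_base_operator.py further down does. A minimal sketch, with the conf file path as a placeholder:

import configparser

# Read [k8s_logs]/ucp_namespace the same way UcpBaseOperator.ucp_base() does;
# '/path/to/shipyard.conf' is a placeholder for this sketch.
config = configparser.ConfigParser()
config.read('/path/to/shipyard.conf')
print(config.get('k8s_logs', 'ucp_namespace'))  # 'ucp' with the shipped default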
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import json
 import logging
 import os
@@ -19,7 +18,6 @@ import time
 from urllib.parse import urlparse
 
 from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
 from airflow.plugins_manager import AirflowPlugin
 from airflow.utils.decorators import apply_defaults
 
@@ -28,10 +26,12 @@ import drydock_provisioner.drydock_client.session as session
 from drydock_provisioner import error as errors
 from service_endpoint import ucp_service_endpoint
 from service_token import shipyard_service_token
-from xcom_puller import XcomPuller
+from ucp_base_operator import UcpBaseOperator
+
+LOG = logging.getLogger(__name__)
 
 
-class DrydockBaseOperator(BaseOperator):
+class DrydockBaseOperator(UcpBaseOperator):
 
     """Drydock Base Operator
 
@@ -49,14 +49,10 @@ class DrydockBaseOperator(BaseOperator):
                  drydock_svc_endpoint=None,
                  drydock_svc_type='physicalprovisioner',
                  drydock_task_id=None,
-                 main_dag_name=None,
                  node_filter=None,
                  redeploy_server=None,
-                 shipyard_conf=None,
-                 sub_dag_name=None,
                  svc_session=None,
                  svc_token=None,
-                 xcom_push=True,
                  *args, **kwargs):
         """Initialization of DrydockBaseOperator object.
 
@@ -66,62 +62,39 @@ class DrydockBaseOperator(BaseOperator):
         :param drydock_svc_endpoint: Drydock Service Endpoint
         :param drydock_svc_type: Drydock Service Type
         :param drydock_task_id: Drydock Task ID
-        :param main_dag_name: Parent Dag
         :param node_filter: A filter for narrowing the scope of the task.
                             Valid fields are 'node_names', 'rack_names',
                             'node_tags'. Note that node filter is turned
                             off by default, i.e. all nodes will be deployed.
         :param redeploy_server: Server to be redeployed
-        :param shipyard_conf: Location of shipyard.conf
-        :param sub_dag_name: Child Dag
         :param svc_session: Keystone Session
        :param svc_token: Keystone Token
-        :param xcom_push: xcom usage
 
         The Drydock operator assumes that prior steps have set xcoms for
         the action and the deployment configuration
 
         """
 
-        super(DrydockBaseOperator, self).__init__(*args, **kwargs)
+        super(DrydockBaseOperator,
+              self).__init__(
+                  pod_selector_pattern=[{'pod_pattern': 'drydock-api',
+                                         'container': 'drydock-api'}],
+                  *args, **kwargs)
         self.deckhand_design_ref = deckhand_design_ref
         self.deckhand_svc_type = deckhand_svc_type
         self.drydock_client = drydock_client
         self.drydock_svc_endpoint = drydock_svc_endpoint
         self.drydock_svc_type = drydock_svc_type
         self.drydock_task_id = drydock_task_id
-        self.main_dag_name = main_dag_name
         self.node_filter = node_filter
         self.redeploy_server = redeploy_server
-        self.shipyard_conf = shipyard_conf
-        self.sub_dag_name = sub_dag_name
         self.svc_session = svc_session
         self.svc_token = svc_token
-        self.xcom_push_flag = xcom_push
 
-    def execute(self, context):
-
-        # Execute drydock base function
-        self.drydock_base(context)
-
-        # Execute child function
-        self.do_execute()
-
-    def drydock_base(self, context):
-        # Initialize Variables
-        drydock_url = None
-        dd_session = None
-
-        # Define task_instance
-        task_instance = context['task_instance']
-
-        # Set up and retrieve values from xcom
-        self.xcom_puller = XcomPuller(self.main_dag_name, task_instance)
-        self.action_info = self.xcom_puller.get_action_info()
-        self.dc = self.xcom_puller.get_deployment_configuration()
+    def run_base(self, context):
 
         # Logs uuid of action performed by the Operator
-        logging.info("DryDock Operator for action %s", self.action_info['id'])
+        LOG.info("DryDock Operator for action %s", self.action_info['id'])
 
         # Retrieve information of the server that we want to redeploy if user
         # executes the 'redeploy_server' dag
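The only Drydock-specific piece of the new constructor is the selector list it hands to the base class; any other operator family can opt in to exception-time log collection the same way. A hypothetical example (the 'promenade-api' pod and container names are invented for illustration and are not part of this commit):

from ucp_base_operator import UcpBaseOperator


class SomePromenadeOperator(UcpBaseOperator):

    def __init__(self, *args, **kwargs):
        # Invented selector: collect logs from the promenade-api container
        # of pods whose names match 'promenade-api'
        super(SomePromenadeOperator,
              self).__init__(
                  pod_selector_pattern=[{'pod_pattern': 'promenade-api',
                                         'container': 'promenade-api'}],
                  *args, **kwargs)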
@@ -131,18 +104,19 @@ class DrydockBaseOperator(BaseOperator):
                 self.action_info['parameters']['server-name'])
 
             if self.redeploy_server:
-                logging.info("Server to be redeployed is %s",
-                             self.redeploy_server)
+                LOG.info("Server to be redeployed is %s",
+                         self.redeploy_server)
                 self.node_filter = self.redeploy_server
             else:
-                raise AirflowException('Unable to retrieve information of '
-                                       'node to be redeployed!')
+                raise AirflowException('%s was unable to retrieve the '
+                                       'server to be redeployed.'
+                                       % self.__class__.__name__)
 
         # Retrieve Endpoint Information
         self.drydock_svc_endpoint = ucp_service_endpoint(
             self, svc_type=self.drydock_svc_type)
 
-        logging.info("Drydock endpoint is %s", self.drydock_svc_endpoint)
+        LOG.info("Drydock endpoint is %s", self.drydock_svc_endpoint)
 
         # Parse DryDock Service Endpoint
         drydock_url = urlparse(self.drydock_svc_endpoint)
@@ -151,25 +125,25 @@ class DrydockBaseOperator(BaseOperator):
         # information.
         # The DrydockSession will care for TCP connection pooling
         # and header management
-        logging.info("Build DryDock Session")
+        LOG.info("Build DryDock Session")
         dd_session = session.DrydockSession(drydock_url.hostname,
                                             port=drydock_url.port,
                                             auth_gen=self._auth_gen)
 
         # Raise Exception if we are not able to set up the session
         if dd_session:
-            logging.info("Successfully Set Up DryDock Session")
+            LOG.info("Successfully Set Up DryDock Session")
         else:
             raise AirflowException("Failed to set up Drydock Session!")
 
         # Use the DrydockSession to build a DrydockClient that can
         # be used to make one or more API calls
-        logging.info("Create DryDock Client")
+        LOG.info("Create DryDock Client")
         self.drydock_client = client.DrydockClient(dd_session)
 
         # Raise Exception if we are not able to build the client
         if self.drydock_client:
-            logging.info("Successfully Set Up DryDock client")
+            LOG.info("Successfully Set Up DryDock client")
         else:
             raise AirflowException("Failed to set up Drydock Client!")
 
@@ -177,7 +151,7 @@ class DrydockBaseOperator(BaseOperator):
         deckhand_svc_endpoint = ucp_service_endpoint(
             self, svc_type=self.deckhand_svc_type)
 
-        logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
+        LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
 
         # Retrieve last committed revision id
         committed_revision_id = self.xcom_puller.get_design_version()
@@ -190,8 +164,8 @@ class DrydockBaseOperator(BaseOperator):
             str(committed_revision_id),
             "rendered-documents")
         if self.deckhand_design_ref:
-            logging.info("Design YAMLs will be retrieved from %s",
-                         self.deckhand_design_ref)
+            LOG.info("Design YAMLs will be retrieved from %s",
+                     self.deckhand_design_ref)
         else:
             raise AirflowException("Unable to Retrieve Design Reference!")
 
@@ -207,7 +181,7 @@ class DrydockBaseOperator(BaseOperator):
         create_task_response = {}
 
         # Node Filter
-        logging.info("Nodes Filter List: %s", self.node_filter)
+        LOG.info("Nodes Filter List: %s", self.node_filter)
 
         try:
             # Create Task
@@ -217,12 +191,15 @@ class DrydockBaseOperator(BaseOperator):
                 node_filter=self.node_filter)
 
         except errors.ClientError as client_error:
+            # Dump logs from Drydock pods
+            self.get_k8s_logs()
+
             raise AirflowException(client_error)
 
         # Retrieve Task ID
         self.drydock_task_id = create_task_response['task_id']
-        logging.info('Drydock %s task ID is %s',
-                     task_action, self.drydock_task_id)
+        LOG.info('Drydock %s task ID is %s',
+                 task_action, self.drydock_task_id)
 
         # Raise Exception if we are not able to get the task_id from
         # Drydock
@@ -239,7 +216,7 @@ class DrydockBaseOperator(BaseOperator):
         # We will round off to nearest whole number
         end_range = round(int(time_out) / int(interval))
 
-        logging.info('Task ID is %s', self.drydock_task_id)
+        LOG.info('Task ID is %s', self.drydock_task_id)
 
         # Query task status
         for i in range(0, end_range + 1):
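For a sense of the polling budget computed above, a small worked example; the numbers are illustrative, and the real values come from the physical_provisioner entries in the deployment configuration:

# Illustrative values only (the real ones come from self.dc)
time_out = 1800   # e.g. a physical_provisioner timeout, in seconds
interval = 30     # e.g. a physical_provisioner query interval, in seconds
end_range = round(int(time_out) / int(interval))
print(end_range)  # 60, so the loop queries task state at most 61 times,
                  # sleeping `interval` seconds between queries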
@@ -251,17 +228,20 @@ class DrydockBaseOperator(BaseOperator):
                 task_status = task_state['status']
                 task_result = task_state['result']['status']
 
-                logging.info("Current status of task id %s is %s",
-                             self.drydock_task_id, task_status)
+                LOG.info("Current status of task id %s is %s",
+                         self.drydock_task_id, task_status)
 
             except errors.ClientError as client_error:
+                # Dump logs from Drydock pods
+                self.get_k8s_logs()
+
                 raise AirflowException(client_error)
 
             except:
                 # There can be situations where there are intermittent network
                 # issues that prevents us from retrieving the task state. We
                 # will want to retry in such situations.
-                logging.warning("Unable to retrieve task state. Retrying...")
+                LOG.warning("Unable to retrieve task state. Retrying...")
 
         # Raise Time Out Exception
         if task_status == 'running' and i == end_range:
@@ -270,21 +250,23 @@ class DrydockBaseOperator(BaseOperator):
             # Exit 'for' loop if the task is in 'complete' or 'terminated'
             # state
             if task_status in ['complete', 'terminated']:
-                logging.info('Task result is %s', task_result)
+                LOG.info('Task result is %s', task_result)
                 break
             else:
                 time.sleep(int(interval))
 
         # Get final task result
         if task_result == 'success':
-            logging.info('Task id %s has been successfully completed',
-                         self.drydock_task_id)
+            LOG.info('Task id %s has been successfully completed',
+                     self.drydock_task_id)
         else:
             self.task_failure(True)
 
     def task_failure(self, _task_failure):
+        # Dump logs from Drydock pods
+        self.get_k8s_logs()
 
-        logging.info('Retrieving all tasks records from Drydock...')
+        LOG.info('Retrieving all tasks records from Drydock...')
 
         try:
             # Get all tasks records
@@ -304,39 +286,38 @@ class DrydockBaseOperator(BaseOperator):
         # Since there is only 1 failed parent task, we will print index 0
         # of the list
         if failed_task:
-            logging.error('%s task has either failed or timed out',
-                          failed_task[0]['action'])
+            LOG.error('%s task has either failed or timed out',
+                      failed_task[0]['action'])
 
-            logging.error(json.dumps(failed_task[0],
-                                     indent=4,
-                                     sort_keys=True))
+            LOG.error(json.dumps(failed_task[0],
+                                 indent=4,
+                                 sort_keys=True))
 
         # Get the list of subtasks belonging to the failed parent task
         subtask_id_list = failed_task[0]['subtask_id_list']
 
-        logging.info("Printing information of failed sub-tasks...")
+        LOG.info("Printing information of failed sub-tasks...")
 
         # Print detailed information of failed step(s) under each subtask
         # This will help to provide additional information for troubleshooting
         # purpose.
         for subtask_id in subtask_id_list:
 
-            logging.info("Retrieving details of subtask %s...",
-                         subtask_id)
+            LOG.info("Retrieving details of subtask %s...", subtask_id)
 
             # Retrieve task information
             task = all_task_ids.get(subtask_id)
 
             if task:
                 # Print subtask action and state
-                logging.info("%s subtask is in %s state",
-                             task['action'],
-                             task['result']['status'])
+                LOG.info("%s subtask is in %s state",
+                         task['action'],
+                         task['result']['status'])
 
                 # Print list containing steps in failure state
                 if task['result']['failures']:
-                    logging.error("The following steps have failed:")
-                    logging.error(task['result']['failures'])
+                    LOG.error("The following steps have failed:")
+                    LOG.error(task['result']['failures'])
 
                 message_list = (
                     task['result']['details']['messageList'] or [])
@@ -346,12 +327,12 @@ class DrydockBaseOperator(BaseOperator):
                     is_error = message['error'] is True
 
                     if is_error:
-                        logging.error(json.dumps(message,
-                                                 indent=4,
-                                                 sort_keys=True))
+                        LOG.error(json.dumps(message,
+                                             indent=4,
+                                             sort_keys=True))
                     else:
-                        logging.info("No failed step detected for subtask %s",
-                                     subtask_id)
+                        LOG.info("No failed step detected for subtask %s",
+                                 subtask_id)
 
             else:
                 raise AirflowException("Unable to retrieve subtask info!")
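The structure that the failure path walks can be inferred from the keys it touches. A made-up task record for illustration, using only the fields referenced by task_failure() above (the values are invented):

import json

task = {
    'action': 'prepare_nodes',
    'result': {
        'status': 'failure',
        'failures': ['node01'],
        'details': {'messageList': [
            {'error': True, 'message': 'kernel install failed'},
            {'error': False, 'message': 'node selected for deployment'},
        ]},
    },
}

# Mirrors the filtering above: only messages flagged as errors get dumped
for message in (task['result']['details']['messageList'] or []):
    if message['error'] is True:
        print(json.dumps(message, indent=4, sort_keys=True))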
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import logging
 import time
 
@@ -20,6 +19,8 @@ from airflow.plugins_manager import AirflowPlugin
 from check_k8s_node_status import check_node_status
 from drydock_base_operator import DrydockBaseOperator
 
+LOG = logging.getLogger(__name__)
+
 
 class DrydockDeployNodesOperator(DrydockBaseOperator):
 
@@ -47,9 +48,9 @@ class DrydockDeployNodesOperator(DrydockBaseOperator):
         # and wait before checking the state of the cluster join process.
         join_wait = self.dc['physical_provisioner.join_wait']
 
-        logging.info("All nodes deployed in MAAS")
-        logging.info("Wait for %d seconds before checking node state...",
-                     join_wait)
+        LOG.info("All nodes deployed in MAAS")
+        LOG.info("Wait for %d seconds before checking node state...",
+                 join_wait)
 
         time.sleep(join_wait)
 
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import logging
 import time
 
@@ -19,6 +18,8 @@ from airflow.plugins_manager import AirflowPlugin
 
 from drydock_base_operator import DrydockBaseOperator
 
+LOG = logging.getLogger(__name__)
+
 
 class DrydockDestroyNodeOperator(DrydockBaseOperator):
 
@@ -31,17 +32,13 @@ class DrydockDestroyNodeOperator(DrydockBaseOperator):
 
     def do_execute(self):
 
-        # Retrieve query interval and timeout
-        q_interval = self.dc['physical_provisioner.destroy_interval']
-        task_timeout = self.dc['physical_provisioner.destroy_timeout']
-
         # NOTE: This is a PlaceHolder function. The 'destroy_node'
         # functionalities in DryDock is being worked on and is not
         # ready at the moment.
-        logging.info("Destroying node %s from cluster...",
-                     self.redeploy_server)
+        LOG.info("Destroying node %s from cluster...",
+                 self.redeploy_server)
         time.sleep(15)
-        logging.info("Successfully deleted node %s", self.redeploy_server)
+        LOG.info("Successfully deleted node %s", self.redeploy_server)
 
 
 class DrydockDestroyNodeOperatorPlugin(AirflowPlugin):
 
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from airflow.plugins_manager import AirflowPlugin
 
 from drydock_base_operator import DrydockBaseOperator
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from airflow.plugins_manager import AirflowPlugin
 
 from drydock_base_operator import DrydockBaseOperator
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import json
 import logging
 import os
@@ -22,6 +21,8 @@ from airflow.exceptions import AirflowException
 
 from drydock_base_operator import DrydockBaseOperator
 
+LOG = logging.getLogger(__name__)
+
 
 class DrydockValidateDesignOperator(DrydockBaseOperator):
 
@@ -38,7 +39,7 @@ class DrydockValidateDesignOperator(DrydockBaseOperator):
         validation_endpoint = os.path.join(self.drydock_svc_endpoint,
                                            'validatedesign')
 
-        logging.info("Validation Endpoint is %s", validation_endpoint)
+        LOG.info("Validation Endpoint is %s", validation_endpoint)
 
         # Define Headers and Payload
         headers = {
@@ -53,7 +54,7 @@ class DrydockValidateDesignOperator(DrydockBaseOperator):
         }
 
         # Requests DryDock to validate site design
-        logging.info("Waiting for DryDock to validate site design...")
+        LOG.info("Waiting for DryDock to validate site design...")
 
         try:
             design_validate_response = requests.post(validation_endpoint,
@@ -67,15 +68,18 @@ class DrydockValidateDesignOperator(DrydockBaseOperator):
         validate_site_design = design_validate_response.text
 
         # Print response
-        logging.info("Retrieving DryDock validate site design response...")
-        logging.info(json.loads(validate_site_design))
+        LOG.info("Retrieving DryDock validate site design response...")
+        LOG.info(json.loads(validate_site_design))
 
         # Check if site design is valid
         status = str(json.loads(validate_site_design).get('status',
                                                           'unspecified'))
         if status.lower() == 'success':
-            logging.info("DryDock Site Design has been successfully validated")
+            LOG.info("DryDock Site Design has been successfully validated")
         else:
+            # Dump logs from Drydock pods
+            self.get_k8s_logs()
+
             raise AirflowException("DryDock Site Design Validation Failed "
                                    "with status: {}!".format(status))
 
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from airflow.plugins_manager import AirflowPlugin
 
 from drydock_base_operator import DrydockBaseOperator
shipyard_airflow/plugins/ucp_base_operator.py (new file, 143 lines)
@@ -0,0 +1,143 @@
+# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import configparser
+import logging
+import math
+from datetime import datetime
+
+from airflow.models import BaseOperator
+from airflow.plugins_manager import AirflowPlugin
+from airflow.utils.decorators import apply_defaults
+
+from get_k8s_logs import get_pod_logs
+from get_k8s_logs import K8sLoggingException
+from xcom_puller import XcomPuller
+
+LOG = logging.getLogger(__name__)
+
+
+class UcpBaseOperator(BaseOperator):
+
+    """UCP Base Operator
+
+    All UCP related workflow operators will use the UCP base
+    operator as the parent and inherit attributes and methods
+    from this class
+
+    """
+
+    @apply_defaults
+    def __init__(self,
+                 main_dag_name=None,
+                 pod_selector_pattern=None,
+                 shipyard_conf=None,
+                 start_time=None,
+                 sub_dag_name=None,
+                 xcom_push=True,
+                 *args, **kwargs):
+        """Initialization of UcpBaseOperator object.
+
+        :param main_dag_name: Parent Dag
+        :param pod_selector_pattern: A list containing the information on
+                                     the patterns of the Pod name and name
+                                     of the associated container for log
+                                     queries. This will allow us to query
+                                     multiple components, e.g. MAAS and
+                                     Drydock at the same time. It also allows
+                                     us to query the logs of specific container
+                                     in Pods with multiple containers. For
+                                     instance the Airflow worker pod contains
+                                     both the airflow-worker container and the
+                                     log-rotate container.
+        :param shipyard_conf: Location of shipyard.conf
+        :param start_time: Time when Operator gets executed
+        :param sub_dag_name: Child Dag
+        :param xcom_push: xcom usage
+
+        """
+
+        super(UcpBaseOperator, self).__init__(*args, **kwargs)
+        self.main_dag_name = main_dag_name
+        self.pod_selector_pattern = pod_selector_pattern or []
+        self.shipyard_conf = shipyard_conf
+        self.start_time = datetime.now()
+        self.sub_dag_name = sub_dag_name
+        self.xcom_push_flag = xcom_push
+
+    def execute(self, context):
+
+        # Execute UCP base function
+        self.ucp_base(context)
+
+        # Execute base function
+        self.run_base(context)
+
+        # Execute child function
+        self.do_execute()
+
+    def ucp_base(self, context):
+
+        LOG.info("Running UCP Base Operator...")
+
+        # Read and parse shipyard.conf
+        config = configparser.ConfigParser()
+        config.read(self.shipyard_conf)
+
+        # Initialize variable
+        self.ucp_namespace = config.get('k8s_logs', 'ucp_namespace')
+
+        # Define task_instance
+        task_instance = context['task_instance']
+
+        # Set up and retrieve values from xcom
+        self.xcom_puller = XcomPuller(self.main_dag_name, task_instance)
+        self.action_info = self.xcom_puller.get_action_info()
+        self.dc = self.xcom_puller.get_deployment_configuration()
+
+    def get_k8s_logs(self):
+        """Retrieve Kubernetes pod/container logs specified by an operator
+
+        This method is "best effort" and should not prevent the progress of
+        the workflow processing
+        """
+        if self.pod_selector_pattern:
+            for selector in self.pod_selector_pattern:
+                # Get difference in current time and time when the
+                # operator was first executed (in seconds)
+                t_diff = (datetime.now() - self.start_time).total_seconds()
+
+                # Note that we will end up with a floating number for
+                # 't_diff' and will need to round it up to the nearest
+                # integer
+                t_diff_int = int(math.ceil(t_diff))
+
+                try:
+                    get_pod_logs(selector['pod_pattern'],
+                                 self.ucp_namespace,
+                                 selector['container'],
+                                 t_diff_int)
+
+                except K8sLoggingException as e:
+                    LOG.error(e)
+
+        else:
+            LOG.debug("There are no pod logs specified to retrieve")
+
+
+class UcpBaseOperatorPlugin(AirflowPlugin):
+
+    """Creates UcpBaseOperator in Airflow."""
+
+    name = 'ucp_base_operator_plugin'
+    operators = [UcpBaseOperator]
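Putting the new base class to use: a child operator only needs shipyard_conf and main_dag_name to reach the xcom values and the [k8s_logs] namespace; on failure it calls self.get_k8s_logs(), which pulls the last N seconds of logs (operator runtime, rounded up) for each pod/container pair in pod_selector_pattern. A hypothetical DAG wiring; the dag name and conf path are placeholders, not taken from this commit:

from datetime import datetime

from airflow import DAG

# Assumes the plugin-provided DrydockValidateDesignOperator is importable
# in the DAG file, as Shipyard's Airflow plugins arrange.
dag = DAG('example_dag', start_date=datetime(2018, 1, 1))

validate_design = DrydockValidateDesignOperator(
    task_id='validate_design',
    shipyard_conf='/path/to/shipyard.conf',
    main_dag_name='example_dag',
    sub_dag_name='validate_design',
    dag=dag)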
@@ -40,6 +40,8 @@ project_domain_name = default
 project_name = service
 user_domain_name = default
 username = shipyard
+[k8s_logs]
+ucp_namespace = ucp
 [requests_config]
 airflow_log_connect_timeout = 5
 airflow_log_read_timeout = 300