Update Armada Operator

This patch set adds a 'validate site design' workflow to the
Armada Operator. It uses the /validatedesign endpoint in Armada
for this purpose.

The operator now uses a Deckhand reference URL, instead of the
nginx server, to retrieve the YAMLs. The existing 'armada_validate'
step is removed because that validation is now done during the
initial stage, where the site design is validated.

The reference to the genesis node in the Drydock operator will
be removed as it is no longer needed.

Change-Id: I63e9bcd44d814d12db866032dd56fdfdc28d7b4d
This commit is contained in:
Anthony Lin 2017-12-26 05:55:10 +00:00 committed by Bryan Strassner
parent 411f999826
commit a90d9fa737
8 changed files with 142 additions and 105 deletions

View File

@ -47,15 +47,6 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
sub_dag_name=child_dag_name,
dag=dag)
# Validate Armada YAMLs
armada_validate = ArmadaOperator(
task_id='armada_validate',
shipyard_conf=config_path,
action='armada_validate',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Armada Apply
armada_apply = ArmadaOperator(
task_id='armada_apply',
@ -77,8 +68,7 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
# Define dependencies
armada_status.set_upstream(armada_client)
armada_validate.set_upstream(armada_status)
armada_apply.set_upstream(armada_validate)
armada_apply.set_upstream(armada_status)
armada_get_releases.set_upstream(armada_apply)
return dag

View File

@ -13,9 +13,9 @@
# limitations under the License.
from airflow.models import DAG
from airflow.operators import ArmadaOperator
from airflow.operators import DeckhandOperator
from airflow.operators import DryDockOperator
from airflow.operators import PlaceholderOperator
# Location of shipyard.conf
# Note that the shipyard.conf file needs to be placed on a volume
@ -45,10 +45,16 @@ def validate_site_design(parent_dag_name, child_dag_name, args):
action='validate_site_design',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
retries=3,
dag=dag)
# TODO () use the real operator here
armada_validate_docs = PlaceholderOperator(
task_id='armada_validate_site_design', dag=dag)
armada_validate_docs = ArmadaOperator(
task_id='armada_validate_site_design',
shipyard_conf=config_path,
action='validate_site_design',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
retries=3,
dag=dag)
return dag

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import os
import requests
@ -58,8 +59,8 @@ class ArmadaOperator(BaseOperator):
def execute(self, context):
# Initialize Variables
context['svc_type'] = 'armada'
armada_client = None
design_ref = None
# Define task_instance
task_instance = context['task_instance']
@ -78,7 +79,9 @@ class ArmadaOperator(BaseOperator):
# Create Armada Client
if self.action == 'create_armada_client':
# Retrieve Endpoint Information
context['svc_endpoint'] = ucp_service_endpoint(self, context)
svc_type = 'armada'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Armada endpoint is %s", context['svc_endpoint'])
# Set up Armada Client
@ -86,6 +89,35 @@ class ArmadaOperator(BaseOperator):
return session_client
# Retrieve Deckhand Design Reference
design_ref = self.get_deckhand_design_ref(context)
if design_ref:
logging.info("Design YAMLs will be retrieved from %s",
design_ref)
else:
raise AirflowException("Unable to Retrieve Design Reference!")
# Validate Site Design
if self.action == 'validate_site_design':
# Initialize variable
site_design_validity = 'invalid'
# Retrieve Endpoint Information
svc_type = 'armada'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
site_design_validity = self.armada_validate_site_design(context,
design_ref)
if site_design_validity == 'valid':
logging.info("Site Design has been successfully validated")
else:
raise AirflowException("Site Design Validation Failed!")
return site_design_validity
# Retrieve armada_client via XCOM so as to perform other tasks
armada_client = task_instance.xcom_pull(
task_ids='create_armada_client',
@ -94,21 +126,14 @@ class ArmadaOperator(BaseOperator):
# Retrieve Tiller Information and assign to context 'query'
context['query'] = self.get_tiller_info(context)
# Retrieve Genesis Node IP and assign it to context 'genesis_ip'
context['genesis_ip'] = self.get_genesis_node_info(context)
# Armada API Call
# Armada Status
if self.action == 'armada_status':
self.get_armada_status(context, armada_client)
# Armada Validate
elif self.action == 'armada_validate':
self.armada_validate(context, armada_client)
# Armada Apply
elif self.action == 'armada_apply':
self.armada_apply(context, armada_client)
self.armada_apply(context, armada_client, design_ref)
# Armada Get Releases
elif self.action == 'armada_get_releases':
@ -130,9 +155,11 @@ class ArmadaOperator(BaseOperator):
# Build a ArmadaSession with credentials and target host
# information.
logging.info("Build Armada Session")
a_session = session.ArmadaSession(armada_url.hostname,
a_session = session.ArmadaSession(host=armada_url.hostname,
port=armada_url.port,
token=context['svc_token'])
scheme='http',
token=context['svc_token'],
marker=None)
# Raise Exception if we are not able to get armada session
if a_session:
@ -183,42 +210,21 @@ class ArmadaOperator(BaseOperator):
else:
raise AirflowException("Please check Tiller!")
def armada_validate(self, context, armada_client):
# Initialize Variables
armada_manifest = None
valid_armada_yaml = {}
# Retrieve Armada Manifest
armada_manifest = self.get_armada_yaml(context)
# Validate armada yaml file
logging.info("Armada Validate")
valid_armada_yaml = armada_client.post_validate(armada_manifest)
# The response will be a dictionary indicating whether the yaml
# file is valid or invalid. We will check the Boolean value in
# this case.
if valid_armada_yaml['valid']:
logging.info("Armada Yaml File is Valid")
else:
raise AirflowException("Invalid Armada Yaml File!")
def armada_apply(self, context, armada_client):
def armada_apply(self, context, armada_client, design_ref):
# Initialize Variables
armada_manifest = None
armada_ref = design_ref
armada_post_apply = {}
override_values = []
chart_set = []
# Retrieve Armada Manifest
armada_manifest = self.get_armada_yaml(context)
# Execute Armada Apply to install the helm charts in sequence
logging.info("Armada Apply")
armada_post_apply = armada_client.post_apply(armada_manifest,
override_values,
chart_set,
context['query'])
armada_post_apply = armada_client.post_apply(manifest=armada_manifest,
manifest_ref=armada_ref,
values=override_values,
set=chart_set,
query=context['query'])
# We will expect Armada to return the releases that it is
# deploying. An empty value for 'install' means that armada
@ -247,44 +253,82 @@ class ArmadaOperator(BaseOperator):
else:
raise AirflowException("Failed to retrieve Armada Releases")
@get_pod_port_ip('maas-rack')
def get_genesis_node_info(self, context, *args):
def get_deckhand_design_ref(self, context):
# Get IP and port information of Pods from context
k8s_pods_ip_port = context['pods_ip_port']
# Retrieve DeckHand Endpoint Information
svc_type = 'deckhand'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Deckhand endpoint is %s", context['svc_endpoint'])
# The maas-rack pod has the same IP as the genesis node
# We will retieve that IP and return the value
return k8s_pods_ip_port['maas-rack'].get('ip')
# Retrieve revision_id from xcom
# Note that in the case of 'deploy_site', the dag_id will
# be 'deploy_site.deckhand_get_design_version' for the
# 'deckhand_get_design_version' task. We need to extract
# the xcom value from it in order to get the value of the
# last committed revision ID
committed_revision_id = context['task_instance'].xcom_pull(
task_ids='deckhand_get_design_version',
dag_id=self.main_dag_name + '.deckhand_get_design_version')
def get_armada_yaml(self, context):
# Initialize Variables
genesis_node_ip = None
# Form Design Reference Path that we will use to retrieve
# the Design YAMLs
deckhand_path = "deckhand+" + context['svc_endpoint']
deckhand_design_ref = os.path.join(deckhand_path,
"revisions",
str(committed_revision_id),
"rendered-documents")
# At this point in time, testing of the operator is being done by
# retrieving the armada.yaml from the nginx container on the Genesis
# node and feeding it to Armada as a string. We will assume that the
# file name is fixed and will always be 'armada_site.yaml'. This file
# will always be under the osh directory. This will change in the near
# future when Armada is integrated with DeckHand.
genesis_node_ip = context['genesis_ip']
return deckhand_design_ref
# Form Endpoint
schema = 'http://'
nginx_host_port = genesis_node_ip + ':6880'
armada_yaml = 'osh/armada.yaml'
design_ref = os.path.join(schema, nginx_host_port, armada_yaml)
@shipyard_service_token
def armada_validate_site_design(self, context, design_ref):
logging.info("Armada YAML will be retrieved from %s", design_ref)
# Form Validation Endpoint
validation_endpoint = os.path.join(context['svc_endpoint'],
'validatedesign')
logging.info("Validation Endpoint is %s", validation_endpoint)
# Define Headers and Payload
headers = {
'Content-Type': 'application/json',
'X-Auth-Token': context['svc_token']
}
payload = {
'rel': "design",
'href': design_ref,
'type': "application/x-yaml"
}
# Requests Armada to validate site design
logging.info("Waiting for Armada to validate site design...")
# TODO: We will implement the new approach when Armada and DeckHand
# integration is completed.
try:
armada_manifest = requests.get(design_ref, timeout=30).text
design_validate_response = requests.post(validation_endpoint,
headers=headers,
data=json.dumps(payload))
except requests.exceptions.RequestException as e:
raise AirflowException(e)
return armada_manifest
# Convert response to string
validate_site_design = design_validate_response.text
# Print response
logging.info("Retrieving Armada validate site design response...")
try:
validate_site_design_dict = json.loads(validate_site_design)
logging.info(validate_site_design_dict)
except json.JSONDecodeError as e:
raise AirflowException(e)
# Check if site design is valid
if validate_site_design_dict.get('status') == 'Success':
return 'valid'
else:
return 'invalid'
class ArmadaOperatorPlugin(AirflowPlugin):

View File

@ -55,7 +55,6 @@ class DeckhandOperator(BaseOperator):
def execute(self, context):
# Initialize Variables
context['svc_type'] = 'deckhand'
deckhand_design_version = None
# Define task_instance
@ -73,7 +72,9 @@ class DeckhandOperator(BaseOperator):
logging.info("DeckHand Operator for action %s", workflow_info['id'])
# Retrieve Endpoint Information
context['svc_endpoint'] = ucp_service_endpoint(self, context)
svc_type = 'deckhand'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Deckhand endpoint is %s", context['svc_endpoint'])
# Deckhand API Call

View File

@ -68,9 +68,6 @@ class DryDockOperator(BaseOperator):
self.xcom_push_flag = xcom_push
def execute(self, context):
# Initialize Variables
context['svc_type'] = 'physicalprovisioner'
genesis_node_ip = None
# Placeholder definition
# TODO: Need to decide how to pass the required value from Shipyard to
@ -94,7 +91,9 @@ class DryDockOperator(BaseOperator):
# DrydockClient
if self.action == 'create_drydock_client':
# Retrieve Endpoint Information
context['svc_endpoint'] = ucp_service_endpoint(self, context)
svc_type = 'physicalprovisioner'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("DryDock endpoint is %s", context['svc_endpoint'])
# Set up DryDock Client
@ -116,11 +115,10 @@ class DryDockOperator(BaseOperator):
# Initialize variable
site_design_validity = 'invalid'
# Reset 'svc_type' to DryDock instead of DeckHand
context['svc_type'] = 'physicalprovisioner'
# Retrieve Endpoint Information
context['svc_endpoint'] = ucp_service_endpoint(self, context)
svc_type = 'physicalprovisioner'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
site_design_validity = self.drydock_validate_design(context)
@ -372,8 +370,9 @@ class DryDockOperator(BaseOperator):
def get_deckhand_design_ref(self, context):
# Retrieve DeckHand Endpoint Information
context['svc_type'] = 'deckhand'
context['svc_endpoint'] = ucp_service_endpoint(self, context)
svc_type = 'deckhand'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Deckhand endpoint is %s", context['svc_endpoint'])
# Retrieve revision_id from xcom

View File

@ -20,14 +20,14 @@ from airflow.exceptions import AirflowException
from service_session import ucp_keystone_session
def ucp_service_endpoint(self, context):
def ucp_service_endpoint(self, svc_type):
# Initialize variables
retry = 0
int_endpoint = None
# Retrieve Keystone Session
sess = ucp_keystone_session(self, context)
sess = ucp_keystone_session(self, svc_type)
# We will allow 1 retry in getting the Keystone Endpoint with a
# backoff interval of 10 seconds in case there is a temporary
@ -38,7 +38,7 @@ def ucp_service_endpoint(self, context):
# We will make use of internal endpoint
logging.info("Get Keystone Endpoint")
int_endpoint = sess.get_endpoint(interface='internal',
service_type=context['svc_type'])
service_type=svc_type)
# Retry if we fail to get keystone endpoint
if int_endpoint:

View File

@ -23,7 +23,7 @@ from service_session import ucp_keystone_session
def shipyard_service_token(func):
@wraps(func)
def keystone_token_get(self, context):
def keystone_token_get(self, context, *args):
"""This function retrieves Keystone token for UCP Services
:param context: Information on the current workflow
@ -71,6 +71,6 @@ def shipyard_service_token(func):
if not token:
raise AirflowException("Unable to get Keystone Token!")
else:
return func(self, context)
return func(self, context, *args)
return keystone_token_get

View File

@ -51,15 +51,12 @@ class UcpHealthCheckOperator(BaseOperator):
# Loop through various UCP Components
for i in ucp_components:
# Define context 'svc_type'
context['svc_type'] = i
# Retrieve Endpoint Information
context['svc_endpoint'] = ucp_service_endpoint(self, context)
logging.info("%s endpoint is %s", i, context['svc_endpoint'])
service_endpoint = ucp_service_endpoint(self, svc_type=i)
logging.info("%s endpoint is %s", i, service_endpoint)
# Construct Health Check Endpoint
healthcheck_endpoint = os.path.join(context['svc_endpoint'],
healthcheck_endpoint = os.path.join(service_endpoint,
'health')
logging.info("%s healthcheck endpoint is %s", i,