Send Env to Conductor for deletion

This commit is contained in:
Serg Melikyan 2013-03-26 13:36:05 +04:00
parent bac6e8e3db
commit a006952080
3 changed files with 46 additions and 21 deletions

View File

@ -1,10 +1,16 @@
from amqplib.client_0_8 import Message
import anyjson
import eventlet
from webob import exc from webob import exc
from portas.common import config
from portas.api.v1 import get_env_status from portas.api.v1 import get_env_status
from portas.db.session import get_session from portas.db.session import get_session
from portas.db.models import Environment from portas.db.models import Environment
from portas.openstack.common import wsgi from portas.openstack.common import wsgi
from portas.openstack.common import log as logging from portas.openstack.common import log as logging
amqp = eventlet.patcher.import_patched('amqplib.client_0_8')
rabbitmq = config.CONF.rabbitmq
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -85,6 +91,24 @@ class Controller(object):
with session.begin(): with session.begin():
session.delete(environment) session.delete(environment)
#preparing data for removal from conductor
env = environment.description
env['services'] = []
env['deleted'] = True
connection = amqp.Connection('{0}:{1}'.
format(rabbitmq.host, rabbitmq.port),
virtual_host=rabbitmq.virtual_host,
userid=rabbitmq.userid,
password=rabbitmq.password,
ssl=rabbitmq.use_ssl, insist=True)
channel = connection.channel()
channel.exchange_declare('tasks', 'direct', durable=True,
auto_delete=False)
channel.basic_publish(Message(body=anyjson.serialize(env)), 'tasks',
'tasks')
return None return None

View File

@ -1,7 +1,6 @@
from amqplib.client_0_8 import Message from amqplib.client_0_8 import Message
import anyjson import anyjson
import eventlet import eventlet
from eventlet.semaphore import Semaphore
from webob import exc from webob import exc
from portas.common import config from portas.common import config
from portas.db.models import Session, Status, Environment from portas.db.models import Session, Status, Environment
@ -15,18 +14,6 @@ log = logging.getLogger(__name__)
class Controller(object): class Controller(object):
def __init__(self):
self.write_lock = Semaphore(1)
connection = amqp.Connection('{0}:{1}'.
format(rabbitmq.host, rabbitmq.port),
virtual_host=rabbitmq.virtual_host,
userid=rabbitmq.userid,
password=rabbitmq.password,
ssl=rabbitmq.use_ssl, insist=True)
self.ch = connection.channel()
self.ch.exchange_declare('tasks', 'direct', durable=True,
auto_delete=False)
def index(self, request, environment_id): def index(self, request, environment_id):
log.debug(_('Session:List <EnvId: {0}>'.format(environment_id))) log.debug(_('Session:List <EnvId: {0}>'.format(environment_id)))
@ -121,14 +108,22 @@ class Controller(object):
session.state = 'deploying' session.state = 'deploying'
session.save(unit) session.save(unit)
#Set X-Auth-Tokenconductor for conductor #Set X-Auth-Token for conductor
env = session.description env = session.description
env['token'] = request.context.auth_token env['token'] = request.context.auth_token
with self.write_lock: connection = amqp.Connection('{0}:{1}'.
self.ch.basic_publish(Message(body=anyjson. format(rabbitmq.host, rabbitmq.port),
serialize(env)), virtual_host=rabbitmq.virtual_host,
'tasks', 'tasks') userid=rabbitmq.userid,
password=rabbitmq.password,
ssl=rabbitmq.use_ssl, insist=True)
channel = connection.channel()
channel.exchange_declare('tasks', 'direct', durable=True,
auto_delete=False)
channel.basic_publish(Message(body=anyjson.serialize(env)), 'tasks',
'tasks')
def create_resource(): def create_resource():

View File

@ -55,7 +55,7 @@ class TaskResultHandlerService(service.Service):
def handle_report(msg): def handle_report(msg):
log.debug(_('Got report message from orchestration engine:\n{0}'. log.debug(_('Got report message from orchestration engine:\n{0}'.
format(msg.body))) format(msg.body)))
params = anyjson.deserialize(msg.body) params = anyjson.deserialize(msg.body)
params['entity_id'] = params['id'] params['entity_id'] = params['id']
@ -76,10 +76,16 @@ def handle_report(msg):
def handle_result(msg): def handle_result(msg):
log.debug(_('Got result message from orchestration engine:\n{0}'. log.debug(_('Got result message from '
format(msg.body))) 'orchestration engine:\n{0}'.format(msg.body)))
environment_result = anyjson.deserialize(msg.body) environment_result = anyjson.deserialize(msg.body)
if environment_result['deleted']:
log.debug(_('Result for environment {0} is dropped. '
'Environment is deleted'.format(environment_result['id'])))
msg.channel.basic_ack(msg.delivery_tag)
return
session = get_session() session = get_session()
environment = session.query(Environment).get(environment_result['id']) environment = session.query(Environment).get(environment_result['id'])