127 lines
4.4 KiB
Python
127 lines
4.4 KiB
Python
# Copyright (c) 2014 ITOCHU Techno-Solutions Corporation.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import cPickle
|
|
import json
|
|
import logging
|
|
import pika
|
|
import requests
|
|
|
|
from rackclient import exceptions
|
|
from rackclient.client import Client
|
|
|
|
|
|
# Address of the instance metadata document read by get_rack_context().
META_URL = 'http://169.254.169.254/openstack/latest/meta_data.json'

# Module logger; its level is forced to DEBUG when the module is imported.
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)
|
|
|
|
|
|
def get_rack_context(proxy_port=8088, client_version="1", api_version="v1"):
    """Build the RACK context for the calling process.

    The instance metadata document is fetched from ``META_URL`` and its
    ``"meta"`` entry is used to locate the RACK proxy, create an API
    client for it and look up the proxy's endpoints.  When the metadata
    carries a non-empty ``ppid``, a message is sent to that parent
    process and the call blocks until a message whose ``pid`` equals the
    parent's pid is received.

    :param proxy_port: TCP port used to build the proxy URL.
    :param client_version: version argument passed to ``Client``.
    :param api_version: path segment appended to the proxy URL.
    :returns: a dynamically created class (not an instance) whose class
        attributes are the entries of the ``"meta"`` dict plus ``ppid``,
        ``client``, ``fs_endpoint``, ``ipc_endpoint`` and
        ``shm_endpoint``.
    :raises exceptions.ProcessInitError: for any ``Exception`` raised
        during initialization; the original error is only kept as text
        in the message.
    """
    try:
        # NOTE(review): no timeout is passed, so this call can block for
        # as long as the metadata service keeps the connection open.
        resp = requests.get(META_URL)
        metadata = json.loads(resp.text)["meta"]
        proxy_url = 'http://%s:%d/%s' % (
            metadata["proxy_ip"], proxy_port, api_version)
        client = Client(client_version, rack_url=proxy_url)
        proxy_info = client.proxy.get(metadata["gid"])

        # An anonymous class serves as an attribute bag: every metadata
        # key becomes a class attribute of the context.
        rack_ctx = type('', (object,), metadata)
        # A missing "ppid" is normalized to an empty string.
        rack_ctx.ppid = getattr(rack_ctx, "ppid", "")
        rack_ctx.client = client
        rack_ctx.fs_endpoint = proxy_info.fs_endpoint
        rack_ctx.ipc_endpoint = proxy_info.ipc_endpoint
        rack_ctx.shm_endpoint = proxy_info.shm_endpoint

        try:
            # Check if this process is not recognized by RACK.
            rack_ctx.client.processes.get(rack_ctx.gid, rack_ctx.pid)
        except exceptions.NotFound:
            # Raised inside the outer ``try``, so the handler below is
            # what callers normally see (assuming InvalidProcessError
            # derives from Exception).
            raise exceptions.InvalidProcessError(
                "This process is not recognized by RACK")

        if rack_ctx.ppid:
            LOG.debug("Messaging: send message to %s", rack_ctx.ppid)
            messaging = _Messaging(rack_ctx)
            messaging.send_msg(rack_ctx.ppid)
            timeout_limit = getattr(rack_ctx, "msg_limit_time", 180)
            # NOTE(review): a timed-out receive returns None and the loop
            # simply waits again, so the limit bounds each wait rather
            # than the whole handshake -- confirm this is intended.
            while True:
                receive_msg = messaging.receive_msg(timeout_limit)
                if receive_msg and rack_ctx.ppid == receive_msg.get("pid"):
                    LOG.debug(
                        "Messaging: receive message from %s",
                        receive_msg.get("pid"))
                    break

    except Exception as e:
        # ``e.message`` is deprecated, empty for multi-argument
        # exceptions and absent on Python 3; fall back to the exception
        # itself so the real cause is never lost or masked.
        detail = getattr(e, "message", "") or e
        raise exceptions.ProcessInitError(
            "Failed to initialize the process: %s." % detail)

    return rack_ctx
|
|
|
|
|
|
class _Messaging(object):
|
|
|
|
def __init__(self, ctx):
|
|
self.ctx = ctx
|
|
connection_param = pika.ConnectionParameters(self.ctx.proxy_ip)
|
|
if self.ctx.ipc_endpoint:
|
|
connection_param = pika.ConnectionParameters(self.ctx.ipc_endpoint)
|
|
self.connection = pika.BlockingConnection(connection_param)
|
|
self.channel = self.connection.channel()
|
|
self.channel.exchange_declare(exchange=ctx.gid, type='topic')
|
|
self.channel.queue_declare(queue=ctx.pid)
|
|
self.channel.queue_bind(
|
|
exchange=ctx.gid,
|
|
queue=ctx.pid,
|
|
routing_key=ctx.gid + '.' + ctx.pid)
|
|
|
|
def receive_msg(self, timeout_limit=180):
|
|
queue_name = self.ctx.pid
|
|
self.channel = self.connection.channel()
|
|
receive = self.Receive()
|
|
self.connection.add_timeout(
|
|
deadline=int(timeout_limit),
|
|
callback_method=receive.time_out)
|
|
self.channel.basic_consume(
|
|
receive.get_msg,
|
|
queue=queue_name,
|
|
no_ack=False)
|
|
receive.channel = self.channel
|
|
self.channel.start_consuming()
|
|
return receive.message
|
|
|
|
def send_msg(self, target):
|
|
routing_key = self.ctx.gid + '.' + target
|
|
send_dict = {'pid': self.ctx.pid}
|
|
send_msg = cPickle.dumps(send_dict)
|
|
self.channel.basic_publish(
|
|
exchange=self.ctx.gid,
|
|
routing_key=routing_key,
|
|
body=send_msg)
|
|
|
|
class Receive(object):
|
|
|
|
def __init__(self):
|
|
self.channel = None
|
|
self.message = None
|
|
|
|
def get_msg(self, ch, method, properties, body):
|
|
self.message = cPickle.loads(body)
|
|
ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
ch.stop_consuming()
|
|
|
|
def time_out(self):
|
|
self.channel.stop_consuming()
|