api serve rnow uses gear library; remove yelp's gearman
Change-Id: Iaa81fc34161992a98a9449b8c58e80484d048d16
This commit is contained in:
parent
771f6bd76d
commit
dc82de4a3e
@ -13,19 +13,22 @@
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
import gear
|
||||
import json
|
||||
|
||||
eventlet.monkey_patch()
|
||||
import ipaddress
|
||||
from libra.common.json_gearman import JSONGearmanClient
|
||||
from libra.common.api.lbaas import LoadBalancer, db_session, Device, Node, Vip
|
||||
from libra.common.api.lbaas import HealthMonitor, Counters
|
||||
from libra.common.api.lbaas import loadbalancers_devices
|
||||
from libra.common.api.mnb import update_mnb
|
||||
from libra.openstack.common import log
|
||||
from pecan import conf
|
||||
|
||||
from time import sleep
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
POLL_COUNT = 10
|
||||
POLL_SLEEP = 10
|
||||
|
||||
gearman_workers = [
|
||||
'UPDATE', # Create/Update a Load Balancer.
|
||||
@ -39,6 +42,17 @@ gearman_workers = [
|
||||
]
|
||||
|
||||
|
||||
class DisconnectClient(gear.Client):
|
||||
def handleDisconnect(self, job):
|
||||
job.disconnect = True
|
||||
|
||||
|
||||
class DisconnectJob(gear.Job):
|
||||
def __init__(self, name, arguments):
|
||||
super(DisconnectJob, self).__init__(name, arguments)
|
||||
self.disconnect = False
|
||||
|
||||
|
||||
def submit_job(job_type, host, data, lbid):
|
||||
eventlet.spawn_n(client_job, job_type, str(host), data, lbid)
|
||||
|
||||
@ -122,19 +136,15 @@ class GearmanClientThread(object):
|
||||
self.host = host
|
||||
self.lbid = lbid
|
||||
|
||||
server_list = []
|
||||
self.gear_client = DisconnectClient()
|
||||
|
||||
for server in conf.gearman.server:
|
||||
ghost, gport = server.split(':')
|
||||
server_list.append({'host': ghost,
|
||||
'port': int(gport),
|
||||
'keyfile': conf.gearman.ssl_key,
|
||||
'certfile': conf.gearman.ssl_cert,
|
||||
'ca_certs': conf.gearman.ssl_ca,
|
||||
'keepalive': conf.gearman.keepalive,
|
||||
'keepcnt': conf.gearman.keepcnt,
|
||||
'keepidle': conf.gearman.keepidle,
|
||||
'keepintvl': conf.gearman.keepintvl})
|
||||
self.gearman_client = JSONGearmanClient(server_list)
|
||||
self.gear_client.addServer(ghost,
|
||||
int(gport),
|
||||
conf.gearman.ssl_key,
|
||||
conf.gearman.ssl_cert,
|
||||
conf.gearman.ssl_ca)
|
||||
|
||||
def send_assign(self, data):
|
||||
NULL = None # For pep8
|
||||
@ -522,28 +532,40 @@ class GearmanClientThread(object):
|
||||
mnb_data["tenantid"])
|
||||
|
||||
def _send_message(self, message, response_name):
|
||||
job_status = self.gearman_client.submit_job(
|
||||
self.host, message, background=False, wait_until_complete=True,
|
||||
max_retries=10, poll_timeout=120.0
|
||||
)
|
||||
if job_status.state == 'UNKNOWN':
|
||||
# Gearman server connection failed
|
||||
LOG.error('Could not talk to gearman server')
|
||||
return False, "System error communicating with load balancer"
|
||||
if job_status.timed_out:
|
||||
# Job timed out
|
||||
LOG.warning(
|
||||
'Gearman timeout talking to {0}'.format(self.host)
|
||||
)
|
||||
|
||||
self.gear_client.waitForServer()
|
||||
|
||||
job = DisconnectJob(self.host, json.dumps(message))
|
||||
|
||||
self.gear_client.submitJob(job)
|
||||
|
||||
pollcount = 0
|
||||
# Would like to make these config file settings
|
||||
while not job.complete and pollcount < POLL_COUNT:
|
||||
sleep(POLL_SLEEP)
|
||||
pollcount += 1
|
||||
|
||||
if job.disconnect:
|
||||
LOG.error('Gearman Job server fail - disconnect')
|
||||
return False, "Gearman Job server fail - "\
|
||||
"disconnect communicating with load balancer"
|
||||
|
||||
# We timed out waiting for the job to finish
|
||||
if not job.complete:
|
||||
LOG.warning('Gearman timeout talking to {0}'.format(self.host))
|
||||
return False, "Timeout error communicating with load balancer"
|
||||
LOG.debug(job_status.result)
|
||||
if 'badRequest' in job_status.result:
|
||||
error = job_status.result['badRequest']['validationErrors']
|
||||
|
||||
result = json.loads(job.data[0])
|
||||
|
||||
LOG.debug(result)
|
||||
|
||||
if 'badRequest' in result:
|
||||
error = result['badRequest']['validationErrors']
|
||||
return False, error['message']
|
||||
if job_status.result[response_name] == 'FAIL':
|
||||
if result[response_name] == 'FAIL':
|
||||
# Worker says 'no'
|
||||
if 'hpcs_error' in job_status.result:
|
||||
error = job_status.result['hpcs_error']
|
||||
if 'hpcs_error' in result:
|
||||
error = result['hpcs_error']
|
||||
else:
|
||||
error = 'Load Balancer error'
|
||||
LOG.error(
|
||||
@ -551,4 +573,4 @@ class GearmanClientThread(object):
|
||||
)
|
||||
return False, error
|
||||
LOG.info('Gearman success from {0}'.format(self.host))
|
||||
return True, job_status.result
|
||||
return True, result
|
@ -12,35 +12,9 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from gearman import GearmanClient, GearmanWorker, DataEncoder
|
||||
import json
|
||||
import gear
|
||||
|
||||
|
||||
class JSONDataEncoder(DataEncoder):
|
||||
""" Class to transform data that the worker either receives or sends. """
|
||||
|
||||
@classmethod
|
||||
def encode(cls, encodable_object):
|
||||
""" Encode JSON object as string """
|
||||
return json.dumps(encodable_object)
|
||||
|
||||
@classmethod
|
||||
def decode(cls, decodable_string):
|
||||
""" Decode string to JSON object """
|
||||
return json.loads(decodable_string)
|
||||
|
||||
|
||||
class JSONGearmanWorker(GearmanWorker):
|
||||
""" Overload the Gearman worker class so we can set the data encoder. """
|
||||
data_encoder = JSONDataEncoder
|
||||
|
||||
|
||||
class JSONGearmanClient(GearmanClient):
|
||||
""" Overload the Gearman client class so we can set the data encoder. """
|
||||
data_encoder = JSONDataEncoder
|
||||
|
||||
|
||||
# Here is the good stuff
|
||||
class JsonJob(gear.Job):
|
||||
def __init__(self, name, msg, unique=None):
|
||||
|
@ -4,7 +4,6 @@ Babel>=0.9.6
|
||||
eventlet
|
||||
# put back once it's patched
|
||||
# gear
|
||||
gearman>=2.0.2
|
||||
oslo.config>=1.2.0
|
||||
python-daemon>=1.6
|
||||
python_novaclient>=2.14.1,<2.14.2
|
||||
|
Loading…
x
Reference in New Issue
Block a user