Merge "switch to sqlalchemy core"

This commit is contained in:
Jenkins 2014-09-12 20:13:58 +00:00 committed by Gerrit Code Review
commit ae828f2686

View File

@ -28,6 +28,7 @@ from oslo.db.sqlalchemy import migration
from oslo.db.sqlalchemy import session as db_session
from oslo.utils import timeutils
import six
import sqlalchemy as sa
from sqlalchemy import and_
from sqlalchemy import distinct
from sqlalchemy import func
@ -238,50 +239,58 @@ class Connection(base.Connection):
engine.dispose()
@staticmethod
def _create_meter(session, name, type, unit):
def _create_meter(conn, name, type, unit):
# TODO(gordc): implement lru_cache to improve performance
try:
nested = session.connection().dialect.name != 'sqlite'
with session.begin(nested=nested,
subtransactions=not nested):
obj = (session.query(models.Meter)
.filter(models.Meter.name == name)
.filter(models.Meter.type == type)
.filter(models.Meter.unit == unit).first())
if obj is None:
obj = models.Meter(name=name, type=type, unit=unit)
session.add(obj)
meter = models.Meter.__table__
trans = conn.begin_nested()
if conn.dialect.name == 'sqlite':
trans = conn.begin()
with trans:
meter_row = conn.execute(
sa.select([meter.c.id])
.where(sa.and_(meter.c.name == name,
meter.c.type == type,
meter.c.unit == unit))).first()
meter_id = meter_row[0] if meter_row else None
if meter_id is None:
result = conn.execute(meter.insert(), name=name,
type=type, unit=unit)
meter_id = result.inserted_primary_key[0]
except dbexc.DBDuplicateEntry:
# retry function to pick up duplicate committed object
obj = Connection._create_meter(session, name, type, unit)
meter_id = Connection._create_meter(conn, name, type, unit)
return obj
return meter_id
@staticmethod
def _create_resource(session, res_id, user_id, project_id, source_id,
def _create_resource(conn, res_id, user_id, project_id, source_id,
rmeta):
# TODO(gordc): implement lru_cache to improve performance
try:
nested = session.connection().dialect.name != 'sqlite'
m_hash = jsonutils.dumps(rmeta, sort_keys=True)
with session.begin(nested=nested,
subtransactions=not nested):
obj = (session.query(models.Resource.internal_id)
.filter(models.Resource.resource_id == res_id)
.filter(models.Resource.user_id == user_id)
.filter(models.Resource.project_id == project_id)
.filter(models.Resource.source_id == source_id)
.filter(models.Resource.metadata_hash ==
hashlib.md5(m_hash).hexdigest()).first())
obj_id = obj[0] if obj else None
if obj_id is None:
obj = models.Resource(resource_id=res_id, user_id=user_id,
res = models.Resource.__table__
m_hash = hashlib.md5(jsonutils.dumps(rmeta,
sort_keys=True)).hexdigest()
trans = conn.begin_nested()
if conn.dialect.name == 'sqlite':
trans = conn.begin()
with trans:
res_row = conn.execute(
sa.select([res.c.internal_id])
.where(sa.and_(res.c.resource_id == res_id,
res.c.user_id == user_id,
res.c.project_id == project_id,
res.c.source_id == source_id,
res.c.metadata_hash == m_hash))).first()
internal_id = res_row[0] if res_row else None
if internal_id is None:
result = conn.execute(res.insert(), resource_id=res_id,
user_id=user_id,
project_id=project_id,
source_id=source_id,
resource_metadata=rmeta)
session.add(obj)
session.flush()
obj_id = obj.internal_id
resource_metadata=rmeta,
metadata_hash=m_hash)
internal_id = result.inserted_primary_key[0]
if rmeta and isinstance(rmeta, dict):
meta_map = {}
for key, v in utils.dict_to_keyval(rmeta):
@ -290,21 +299,21 @@ class Connection(base.Connection):
if meta_map.get(_model) is None:
meta_map[_model] = []
meta_map[_model].append(
{'id': obj_id, 'meta_key': key,
{'id': internal_id, 'meta_key': key,
'value': v})
except KeyError:
LOG.warn(_("Unknown metadata type. Key (%s) "
"will not be queryable."), key)
"will not be queryable."), key)
for _model in meta_map.keys():
session.execute(_model.__table__.insert(),
meta_map[_model])
conn.execute(_model.__table__.insert(),
meta_map[_model])
except dbexc.DBDuplicateEntry:
# retry function to pick up duplicate committed object
obj_id = Connection._create_resource(session, res_id, user_id,
project_id, source_id, rmeta)
internal_id = Connection._create_resource(
conn, res_id, user_id, project_id, source_id, rmeta)
return obj_id
return internal_id
def record_metering_data(self, data):
"""Write the data to the backend storage system.
@ -312,27 +321,26 @@ class Connection(base.Connection):
:param data: a dictionary such as returned by
ceilometer.meter.meter_message_from_counter
"""
session = self._engine_facade.get_session()
with session.begin():
engine = self._engine_facade.get_engine()
with engine.begin() as conn:
# Record the raw data for the sample.
meter = self._create_meter(session,
data['counter_name'],
data['counter_type'],
data['counter_unit'])
res_id = self._create_resource(session,
m_id = self._create_meter(conn,
data['counter_name'],
data['counter_type'],
data['counter_unit'])
res_id = self._create_resource(conn,
data['resource_id'],
data['user_id'],
data['project_id'],
data['source'],
data['resource_metadata'])
sample = models.Sample(
meter_id=meter.id,
resource_id=res_id,
timestamp=data['timestamp'],
volume=data['counter_volume'],
message_signature=data['message_signature'],
message_id=data['message_id'])
session.add(sample)
sample = models.Sample.__table__
conn.execute(sample.insert(), meter_id=m_id,
resource_id=res_id,
timestamp=data['timestamp'],
volume=data['counter_volume'],
message_signature=data['message_signature'],
message_id=data['message_id'])
def clear_expired_metering_data(self, ttl):
"""Clear expired data from the backend storage system.