From 219d6c1b231be8b4a78fc607553780da5347e623 Mon Sep 17 00:00:00 2001 From: wanghao Date: Wed, 6 May 2015 17:55:39 +0800 Subject: [PATCH] Add sqlalchemy support to db Add sqlalchemy support to db Change-Id: Ibea0b3d71f05dbcd59311baa2e78b69bae616663 --- terracotta/db/api.py | 195 ++++++++++ terracotta/db/sqlalchemy/__init__.py | 0 terracotta/db/sqlalchemy/api.py | 341 ++++++++++++++++++ .../db/sqlalchemy/migrate_repo/__init__.py | 0 .../migrate_repo/versions/__init__.py | 0 terracotta/db/sqlalchemy/models.py | 84 +++++ 6 files changed, 620 insertions(+) create mode 100644 terracotta/db/sqlalchemy/__init__.py create mode 100644 terracotta/db/sqlalchemy/api.py create mode 100644 terracotta/db/sqlalchemy/migrate_repo/__init__.py create mode 100644 terracotta/db/sqlalchemy/migrate_repo/versions/__init__.py create mode 100644 terracotta/db/sqlalchemy/models.py diff --git a/terracotta/db/api.py b/terracotta/db/api.py index 0500aaf..83efd43 100644 --- a/terracotta/db/api.py +++ b/terracotta/db/api.py @@ -1,3 +1,4 @@ +<<<<<<< .mine # Copyright (c) 2015 - 2016 Huawei Technologies Co., Ltd. # All Rights Reserved. # @@ -190,3 +191,197 @@ def insert_vm_migration(vm, hostname): :param hostname: A host name. """ IMPL.insert_vm_migration(vm, hostname) +======= +# Copyright (c) 2015 Huawei Technologies Co., Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +Defines interface for DB access. +""" + +from oslo_config import cfg +from oslo_db import concurrency as db_concurrency + + +CONF = cfg.CONF + +_BACKEND_MAPPING = {'sqlalchemy': 'terracotta.db.sqlalchemy.api'} + +IMPL = db_concurrency.TpoolDbapiWrapper(CONF, backend_mapping=_BACKEND_MAPPING) + + +def select_cpu_mhz_for_vm(uuid, limit): + """ Select n last values of CPU MHz for a VM UUID. + + :param uuid: The UUID of a VM. + :param limit: The number of last values to select. + :return: The list of n last CPU Mhz values. + """ + return IMPL.select_cpu_mhz_for_vm(uuid, limit) + + +def select_last_cpu_mhz_for_vms(): + """ Select the last value of CPU MHz for all the VMs. + + :return: A dict of VM UUIDs to the last CPU MHz values. + """ + return IMPL.select_last_cpu_mhz_for_vms() + + +def select_vm_id(uuid): + """ Select the ID of a VM by the VM UUID, or insert a new record. + + :param uuid: The UUID of a VM. + :return: The ID of the VM. + """ + return IMPL.select_vm_id(uuid) + + +def insert_vm_cpu_mhz(data): + """ Insert a set of CPU MHz values for a set of VMs. + + :param data: A dictionary of VM UUIDs and CPU MHz values. + """ + return IMPL.insert_vm_cpu_mhz() + + +def update_host(hostname, cpu_mhz, cpu_cores, ram): + """ Insert new or update the corresponding host record. + + :param hostname: A host name. + :param cpu_mhz: The total CPU frequency of the host in MHz. + :param cpu_cores: The number of physical CPU cores. + :param ram: The total amount of RAM of the host in MB. + :return: The ID of the host. + """ + return update_host(hostname, cpu_mhz, cpu_cores, ram) + + +def insert_host_cpu_mhz(hostname, cpu_mhz): + """ Insert a CPU MHz value for a host. + + :param hostname: A host name. + :param cpu_mhz: The CPU usage of the host in MHz. + """ + return IMPL.insert_host_cpu_mhz(hostname, cpu_mhz) + + +def select_cpu_mhz_for_host(hostname, limit): + """ Select n last values of CPU MHz for a host. + + :param hostname: A host name. + :param limit: The number of last values to select. + :return: The list of n last CPU Mhz values. + """ + return IMPL.select_cpu_mhz_for_host(hostname, limit) + + +def select_last_cpu_mhz_for_hosts(): + """ Select the last value of CPU MHz for all the hosts. + + :return: A dict of host names to the last CPU MHz values. + """ + return IMPL.select_last_cpu_mhz_for_hosts() + + +def select_host_characteristics(self): + """ Select the characteristics of all the hosts. + + :return: Three dicts of hostnames to CPU MHz, cores, and RAM. + """ + return IMPL.select_host_characteristics() + + +def select_host_id(hostname): + """ Select the ID of a host. + + :param hostname: A host name. + :return: The ID of the host. + """ + return IMPL.select_host_id(hostname) + + +def select_host_ids(): + """ Select the IDs of all the hosts. + + :return: A dict of host names to IDs. + """ + return IMPL.select_host_ids() + + +def cleanup_vm_resource_usage(datetime_threshold): + """ Delete VM resource usage data older than the threshold. + + :param datetime_threshold: A datetime threshold. + """ + IMPL.cleanup_vm_resource_usage(datetime_threshold) + + +def cleanup_host_resource_usage(sdatetime_threshold): + """ Delete host resource usage data older than the threshold. + + :param datetime_threshold: A datetime threshold. + """ + IMPL.cleanup_host_resource_usage() + + +def insert_host_states(hosts): + """ Insert host states for a set of hosts. + + :param hosts: A dict of hostnames to states (0, 1). + """ + IMPL.insert_host_states(hosts) + + +def select_host_states(): + """ Select the current states of all the hosts. + + :return: A dict of host names to states. + """ + return IMPL.select_host_states() + + +def select_active_hosts(): + """ Select the currently active hosts. + + :return: A list of host names. + """ + return IMPL.select_active_hosts() + + +def select_inactive_hosts(): + """ Select the currently inactive hosts. + + :return: A list of host names. + """ + return IMPL.select_inactive_hosts() + + +def insert_host_overload(hostname, overload): + """ Insert whether a host is overloaded. + + :param hostname: A host name. + :param overload: Whether the host is overloaded. + """ + IMPL.insert_host_overload(hostname, overload) + + +def insert_vm_migration(vm, hostname): + """ Insert a VM migration. + + :param hostname: A VM UUID. + :param hostname: A host name. + """ + IMPL.insert_vm_migration(vm, hostname) +>>>>>>> .theirs diff --git a/terracotta/db/sqlalchemy/__init__.py b/terracotta/db/sqlalchemy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/terracotta/db/sqlalchemy/api.py b/terracotta/db/sqlalchemy/api.py new file mode 100644 index 0000000..ee5985f --- /dev/null +++ b/terracotta/db/sqlalchemy/api.py @@ -0,0 +1,341 @@ +# Copyright (c) 2015 Huawei Technologies Co., Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Implementation of SQLAlchemy backend.""" + +import sys +import threading + +from oslo_config import cfg +from oslo_db import exception as db_exc +from oslo_db import options +from oslo_db.sqlalchemy import session as db_session +from oslo_log import log as logging + + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + +options.set_defaults(CONF, connection='sqlite:///$state_path/cinder.sqlite') + +_LOCK = threading.Lock() +_FACADE = None + + +def _create_facade_lazily(): + global _LOCK + with _LOCK: + global _FACADE + if _FACADE is None: + _FACADE = db_session.EngineFacade( + CONF.database.connection, + **dict(CONF.database.iteritems()) + ) + + return _FACADE + + +def get_engine(): + facade = _create_facade_lazily() + return facade.get_engine() + + +def get_session(**kwargs): + facade = _create_facade_lazily() + return facade.get_session(**kwargs) + + +def dispose_engine(): + get_engine().dispose() + + +def get_backend(): + """The backend is this module itself.""" + + return sys.modules[__name__] + + +def select_cpu_mhz_for_vm(self, uuid, n): + """ Select n last values of CPU MHz for a VM UUID. + + :param uuid: The UUID of a VM. + :param n: The number of last values to select. + :return: The list of n last CPU Mhz values. + """ + sel = select([self.vm_resource_usage.c.cpu_mhz]). \ + where(and_( + self.vms.c.id == self.vm_resource_usage.c.vm_id, + self.vms.c.uuid == uuid)). \ + order_by(self.vm_resource_usage.c.id.desc()). \ + limit(n) + res = self.connection.execute(sel).fetchall() + return list(reversed([int(x[0]) for x in res])) + +def select_last_cpu_mhz_for_vms(self): + """ Select the last value of CPU MHz for all the VMs. + + :return: A dict of VM UUIDs to the last CPU MHz values. + """ + vru1 = self.vm_resource_usage + vru2 = self.vm_resource_usage.alias() + sel = select([vru1.c.vm_id, vru1.c.cpu_mhz], from_obj=[ + vru1.outerjoin(vru2, and_( + vru1.c.vm_id == vru2.c.vm_id, + vru1.c.id < vru2.c.id))]). \ + where(vru2.c.id == None) + vms_cpu_mhz = dict(self.connection.execute(sel).fetchall()) + vms_uuids = dict(self.vms.select().execute().fetchall()) + + vms_last_mhz = {} + for id, uuid in vms_uuids.items(): + if id in vms_cpu_mhz: + vms_last_mhz[str(uuid)] = int(vms_cpu_mhz[id]) + else: + vms_last_mhz[str(uuid)] = 0 + return vms_last_mhz + +def select_vm_id(self, uuid): + """ Select the ID of a VM by the VM UUID, or insert a new record. + + :param uuid: The UUID of a VM. + :return: The ID of the VM. + """ + sel = select([self.vms.c.id]).where(self.vms.c.uuid == uuid) + row = self.connection.execute(sel).fetchone() + if row is None: + id = self.vms.insert().execute(uuid=uuid).inserted_primary_key[0] + LOG.info('Created a new DB record for a VM %s, id=%d', uuid, id) + return int(id) + else: + return int(row['id']) + +def insert_vm_cpu_mhz(self, data): + """ Insert a set of CPU MHz values for a set of VMs. + + :param data: A dictionary of VM UUIDs and CPU MHz values. + """ + if data: + query = [] + for uuid, cpu_mhz in data.items(): + vm_id = self.select_vm_id(uuid) + query.append({'vm_id': vm_id, + 'cpu_mhz': cpu_mhz}) + self.vm_resource_usage.insert().execute(query) + +def update_host(self, hostname, cpu_mhz, cpu_cores, ram): + """ Insert new or update the corresponding host record. + + :param hostname: A host name. + :param cpu_mhz: The total CPU frequency of the host in MHz. + :param cpu_cores: The number of physical CPU cores. + :param ram: The total amount of RAM of the host in MB. + :return: The ID of the host. + """ + sel = select([self.hosts.c.id]). \ + where(self.hosts.c.hostname == hostname) + row = self.connection.execute(sel).fetchone() + if row is None: + id = self.hosts.insert().execute( + hostname=hostname, + cpu_mhz=cpu_mhz, + cpu_cores=cpu_cores, + ram=ram).inserted_primary_key[0] + LOG.info('Created a new DB record for a host %s, id=%d', + hostname, id) + return int(id) + else: + self.connection.execute(self.hosts.update(). + where(self.hosts.c.id == row['id']). + values(cpu_mhz=cpu_mhz, + cpu_cores=cpu_cores, + ram=ram)) + return int(row['id']) + +def insert_host_cpu_mhz(self, hostname, cpu_mhz): + """ Insert a CPU MHz value for a host. + + :param hostname: A host name. + :param cpu_mhz: The CPU usage of the host in MHz. + """ + self.host_resource_usage.insert().execute( + host_id=self.select_host_id(hostname), + cpu_mhz=cpu_mhz) + +def select_cpu_mhz_for_host(self, hostname, n): + """ Select n last values of CPU MHz for a host. + + :param hostname: A host name. + :param n: The number of last values to select. + :return: The list of n last CPU Mhz values. + """ + sel = select([self.host_resource_usage.c.cpu_mhz]). \ + where(and_( + self.hosts.c.id == self.host_resource_usage.c.host_id, + self.hosts.c.hostname == hostname)). \ + order_by(self.host_resource_usage.c.id.desc()). \ + limit(n) + res = self.connection.execute(sel).fetchall() + return list(reversed([int(x[0]) for x in res])) + +def select_last_cpu_mhz_for_hosts(self): + """ Select the last value of CPU MHz for all the hosts. + + :return: A dict of host names to the last CPU MHz values. + """ + hru1 = self.host_resource_usage + hru2 = self.host_resource_usage.alias() + sel = select([hru1.c.host_id, hru1.c.cpu_mhz], from_obj=[ + hru1.outerjoin(hru2, and_( + hru1.c.host_id == hru2.c.host_id, + hru1.c.id < hru2.c.id))]). \ + where(hru2.c.id == None) + hosts_cpu_mhz = dict(self.connection.execute(sel).fetchall()) + + sel = select([self.hosts.c.id, self.hosts.c.hostname]) + hosts_names = dict(self.connection.execute(sel).fetchall()) + + hosts_last_mhz = {} + for id, hostname in hosts_names.items(): + if id in hosts_cpu_mhz: + hosts_last_mhz[str(hostname)] = int(hosts_cpu_mhz[id]) + else: + hosts_last_mhz[str(hostname)] = 0 + return hosts_last_mhz + +def select_host_characteristics(self): + """ Select the characteristics of all the hosts. + + :return: Three dicts of hostnames to CPU MHz, cores, and RAM. + """ + hosts_cpu_mhz = {} + hosts_cpu_cores = {} + hosts_ram = {} + for x in self.hosts.select().execute().fetchall(): + hostname = str(x[1]) + hosts_cpu_mhz[hostname] = int(x[2]) + hosts_cpu_cores[hostname] = int(x[3]) + hosts_ram[hostname] = int(x[4]) + return hosts_cpu_mhz, hosts_cpu_cores, hosts_ram + +def select_host_id(self, hostname): + """ Select the ID of a host. + + :param hostname: A host name. + :return: The ID of the host. + """ + sel = select([self.hosts.c.id]). \ + where(self.hosts.c.hostname == hostname) + row = self.connection.execute(sel).fetchone() + if not row: + raise LookupError('No host found for hostname: %s', hostname) + return int(row['id']) + +def select_host_ids(self): + """ Select the IDs of all the hosts. + + :return: A dict of host names to IDs. + """ + return dict((str(x[1]), int(x[0])) + for x in self.hosts.select().execute().fetchall()) + +def cleanup_vm_resource_usage(self, datetime_threshold): + """ Delete VM resource usage data older than the threshold. + + :param datetime_threshold: A datetime threshold. + """ + self.connection.execute( + self.vm_resource_usage.delete().where( + self.vm_resource_usage.c.timestamp < datetime_threshold)) + +def cleanup_host_resource_usage(self, datetime_threshold): + """ Delete host resource usage data older than the threshold. + + :param datetime_threshold: A datetime threshold. + """ + self.connection.execute( + self.host_resource_usage.delete().where( + self.host_resource_usage.c.timestamp < datetime_threshold)) + +def insert_host_states(self, hosts): + """ Insert host states for a set of hosts. + + :param hosts: A dict of hostnames to states (0, 1). + """ + host_ids = self.select_host_ids() + to_insert = [{'host_id': host_ids[k], + 'state': v} + for k, v in hosts.items()] + self.connection.execute( + self.host_states.insert(), to_insert) + +def select_host_states(self): + """ Select the current states of all the hosts. + + :return: A dict of host names to states. + """ + hs1 = self.host_states + hs2 = self.host_states.alias() + sel = select([hs1.c.host_id, hs1.c.state], from_obj=[ + hs1.outerjoin(hs2, and_( + hs1.c.host_id == hs2.c.host_id, + hs1.c.id < hs2.c.id))]). \ + where(hs2.c.id == None) + data = dict(self.connection.execute(sel).fetchall()) + host_ids = self.select_host_ids() + host_states = {} + for host, id in host_ids.items(): + if id in data: + host_states[str(host)] = int(data[id]) + else: + host_states[str(host)] = 1 + return host_states + +def select_active_hosts(self): + """ Select the currently active hosts. + + :return: A list of host names. + """ + return [host + for host, state in self.select_host_states().items() + if state == 1] + +def select_inactive_hosts(self): + """ Select the currently inactive hosts. + + :return: A list of host names. + """ + return [host + for host, state in self.select_host_states().items() + if state == 0] + +def insert_host_overload(self, hostname, overload): + """ Insert whether a host is overloaded. + + :param hostname: A host name. + :param overload: Whether the host is overloaded. + """ + self.host_overload.insert().execute( + host_id=self.select_host_id(hostname), + overload=int(overload)) + +def insert_vm_migration(self, vm, hostname): + """ Insert a VM migration. + + :param hostname: A VM UUID. + :param hostname: A host name. + """ + self.vm_migrations.insert().execute( + vm_id=self.select_vm_id(vm), + host_id=self.select_host_id(hostname)) diff --git a/terracotta/db/sqlalchemy/migrate_repo/__init__.py b/terracotta/db/sqlalchemy/migrate_repo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/terracotta/db/sqlalchemy/migrate_repo/versions/__init__.py b/terracotta/db/sqlalchemy/migrate_repo/versions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/terracotta/db/sqlalchemy/models.py b/terracotta/db/sqlalchemy/models.py new file mode 100644 index 0000000..4af983a --- /dev/null +++ b/terracotta/db/sqlalchemy/models.py @@ -0,0 +1,84 @@ +# Copyright (c) 2015 Huawei Technologies Co., Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +SQLAlchemy models for cinder data. +""" + +from oslo_config import cfg +from oslo_db.sqlalchemy import models +from oslo_utils import timeutils +from sqlalchemy import Column, Integer, String, Text, schema +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy import ForeignKey, DateTime, Boolean +from sqlalchemy.orm import relationship, backref, validates + + +CONF = cfg.CONF +BASE = declarative_base() + +class TerracottaBase(models.TimestampMixin, + models.ModelBase): + """Base class for TerracottaBase Models.""" + + __table_args__ = {'mysql_engine': 'InnoDB'} + + deleted_at = Column(DateTime) + deleted = Column(Boolean, default=False) + metadata = None + + def delete(self, session): + """Delete this object.""" + self.deleted = True + self.deleted_at = timeutils.utcnow() + self.save(session=session) + + +class Host(BASE, TerracottaBase): + __tablename__ = 'hosts' + id = Column(Integer, primary_key=True) + host_name = Column(String(255)) + cpu_mhz = Column(String(255)) + cpu_cores = Column(String(255)) + cpu_vendor = Column(String(255)) + ram = Column(Integer, nullable=False, default=0) + disabled = Column(Boolean, default=False) + disabled_reason = Column(String(255)) + + +class HostResourceUsage(BASE, TerracottaBase): + __tablename__ = 'host_resource_usage' + id = Column(Integer, primary_key=True) + host_id = Column(String(255)) + cpu_mhz = Column(String(255)) + + host = relationship(Host, backref="host_resource_usage", + foreign_keys=host_id, + primaryjoin='HostResourceUsage.host_id == Host.id') + +class VM(BASE, TerracottaBase): + __tablename__ = 'vms' + id = Column(Integer, primary_key=True) + uuid = Column(String(36), nullable=False) + + +class VmResourceUsage(BASE, TerracottaBase): + __tablename__ = 'vm_resource_usage' + id = Column(Integer, primary_key=True) + vm_id = Column(String(255)) + cpu_mhz = Column(String(255)) + + host = relationship(Host, backref="vm_resource_usage", + foreign_keys=vm_id, + primaryjoin='VmResourceUsage.vm_id == VM.id')