support time to live on event database for MongoDB
event database will grow gradually, currently, even though we can dump data to 3rd storage, the old data cannot be cleaned. This patch adds a new option [database].event_time_to_live, and create ttl index on event collection for MongoDB backend. Also change previous [database].time_to_live to precise [database].metering_time_to_live. To enable test_bin, this patch also simply implements log:// partially implements blueprint event-database-ttl DocImpact Change-Id: I0d67cf212f37d3fa5555870a8e7cbb65af205d63
This commit is contained in:
parent
5a6aeb3bf0
commit
ef3d5612ab
@ -34,10 +34,21 @@ def dbsync():
|
|||||||
|
|
||||||
def expirer():
|
def expirer():
|
||||||
service.prepare_service()
|
service.prepare_service()
|
||||||
if cfg.CONF.database.time_to_live > 0:
|
|
||||||
|
if cfg.CONF.database.metering_time_to_live > 0:
|
||||||
LOG.debug(_("Clearing expired metering data"))
|
LOG.debug(_("Clearing expired metering data"))
|
||||||
storage_conn = storage.get_connection_from_config(cfg.CONF)
|
storage_conn = storage.get_connection_from_config(cfg.CONF)
|
||||||
storage_conn.clear_expired_metering_data(
|
storage_conn.clear_expired_metering_data(
|
||||||
cfg.CONF.database.time_to_live)
|
cfg.CONF.database.metering_time_to_live)
|
||||||
else:
|
else:
|
||||||
LOG.info(_("Nothing to clean, database time to live is disabled"))
|
LOG.info(_("Nothing to clean, database metering time to live "
|
||||||
|
"is disabled"))
|
||||||
|
|
||||||
|
if cfg.CONF.database.event_time_to_live > 0:
|
||||||
|
LOG.debug(_("Clearing expired event data"))
|
||||||
|
event_conn = storage.get_connection_from_config(cfg.CONF, 'event')
|
||||||
|
event_conn.clear_expired_event_data(
|
||||||
|
cfg.CONF.database.event_time_to_live)
|
||||||
|
else:
|
||||||
|
LOG.info(_("Nothing to clean, database event time to live "
|
||||||
|
"is disabled"))
|
||||||
|
@ -87,3 +87,13 @@ class Connection(object):
|
|||||||
This is needed to evaluate the performance of each driver.
|
This is needed to evaluate the performance of each driver.
|
||||||
"""
|
"""
|
||||||
return cls.STORAGE_CAPABILITIES
|
return cls.STORAGE_CAPABILITIES
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def clear_expired_event_data(ttl):
|
||||||
|
"""Clear expired data from the backend storage system.
|
||||||
|
|
||||||
|
Clearing occurs according to the time-to-live.
|
||||||
|
|
||||||
|
:param ttl: Number of seconds to keep records for.
|
||||||
|
"""
|
||||||
|
raise ceilometer.NotImplementedError('Clearing events not implemented')
|
||||||
|
@ -9,9 +9,24 @@
|
|||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.i
|
# under the License.
|
||||||
|
|
||||||
from ceilometer.event.storage import base
|
from ceilometer.event.storage import base
|
||||||
|
from ceilometer.i18n import _LI
|
||||||
|
from ceilometer.openstack.common import log
|
||||||
|
|
||||||
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class Connection(base.Connection):
|
class Connection(base.Connection):
|
||||||
"""Log event data."""
|
"""Log event data."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def clear_expired_event_data(ttl):
|
||||||
|
"""Clear expired data from the backend storage system.
|
||||||
|
|
||||||
|
Clearing occurs according to the time-to-live.
|
||||||
|
|
||||||
|
:param ttl: Number of seconds to keep records for.
|
||||||
|
"""
|
||||||
|
LOG.info(_LI("Dropping event data with TTL %d"), ttl)
|
||||||
|
@ -11,12 +11,18 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
"""MongoDB storage backend"""
|
"""MongoDB storage backend"""
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
import pymongo
|
import pymongo
|
||||||
|
|
||||||
from ceilometer.event.storage import pymongo_base
|
from ceilometer.event.storage import pymongo_base
|
||||||
|
from ceilometer.openstack.common import log
|
||||||
from ceilometer import storage
|
from ceilometer import storage
|
||||||
|
from ceilometer.storage import impl_mongodb
|
||||||
from ceilometer.storage.mongo import utils as pymongo_utils
|
from ceilometer.storage.mongo import utils as pymongo_utils
|
||||||
|
|
||||||
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class Connection(pymongo_base.Connection):
|
class Connection(pymongo_base.Connection):
|
||||||
"""Put the event data into a MongoDB database."""
|
"""Put the event data into a MongoDB database."""
|
||||||
@ -46,7 +52,24 @@ class Connection(pymongo_base.Connection):
|
|||||||
# needed.
|
# needed.
|
||||||
self.upgrade()
|
self.upgrade()
|
||||||
|
|
||||||
|
def upgrade(self):
|
||||||
|
# Establish indexes
|
||||||
|
ttl = cfg.CONF.database.event_time_to_live
|
||||||
|
impl_mongodb.Connection.update_ttl(ttl, 'event_ttl', 'timestamp',
|
||||||
|
self.db.event)
|
||||||
|
|
||||||
def clear(self):
|
def clear(self):
|
||||||
self.conn.drop_database(self.db.name)
|
self.conn.drop_database(self.db.name)
|
||||||
# Connection will be reopened automatically if needed
|
# Connection will be reopened automatically if needed
|
||||||
self.conn.close()
|
self.conn.close()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def clear_expired_event_data(ttl):
|
||||||
|
"""Clear expired data from the backend storage system.
|
||||||
|
|
||||||
|
Clearing occurs according to the time-to-live.
|
||||||
|
|
||||||
|
:param ttl: Number of seconds to keep records for.
|
||||||
|
"""
|
||||||
|
LOG.debug("Clearing expired event data is based on native "
|
||||||
|
"MongoDB time to live feature and going in background.")
|
||||||
|
@ -40,10 +40,16 @@ cfg.CONF.register_opts(OLD_OPTS)
|
|||||||
|
|
||||||
|
|
||||||
OPTS = [
|
OPTS = [
|
||||||
cfg.IntOpt('time_to_live',
|
cfg.IntOpt('metering_time_to_live',
|
||||||
default=-1,
|
default=-1,
|
||||||
help="Number of seconds that samples are kept "
|
help="Number of seconds that samples are kept "
|
||||||
"in the database for (<= 0 means forever)."),
|
"in the database for (<= 0 means forever).",
|
||||||
|
deprecated_opts=[cfg.DeprecatedOpt('time_to_live',
|
||||||
|
'database')]),
|
||||||
|
cfg.IntOpt('event_time_to_live',
|
||||||
|
default=-1,
|
||||||
|
help=("Number of seconds that events are kept "
|
||||||
|
"in the database for (<= 0 means forever).")),
|
||||||
cfg.StrOpt('metering_connection',
|
cfg.StrOpt('metering_connection',
|
||||||
default=None,
|
default=None,
|
||||||
help='The connection string used to connect to the metering '
|
help='The connection string used to connect to the metering '
|
||||||
|
@ -49,7 +49,7 @@ class Connection(base.Connection):
|
|||||||
Clearing occurs according to the time-to-live.
|
Clearing occurs according to the time-to-live.
|
||||||
:param ttl: Number of seconds to keep records for.
|
:param ttl: Number of seconds to keep records for.
|
||||||
"""
|
"""
|
||||||
LOG.info(_("Dropping data with TTL %d"), ttl)
|
LOG.info(_("Dropping metering data with TTL %d"), ttl)
|
||||||
|
|
||||||
def get_resources(self, user=None, project=None, source=None,
|
def get_resources(self, user=None, project=None, source=None,
|
||||||
start_timestamp=None, start_timestamp_op=None,
|
start_timestamp=None, start_timestamp_op=None,
|
||||||
|
@ -410,14 +410,14 @@ class Connection(pymongo_base.Connection):
|
|||||||
self.upgrade()
|
self.upgrade()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def update_ttl(ttl_index_name, index_field, coll):
|
def update_ttl(ttl, ttl_index_name, index_field, coll):
|
||||||
"""Update or ensure time_to_live indexes.
|
"""Update or ensure time_to_live indexes.
|
||||||
|
|
||||||
|
:param ttl: time to live in seconds.
|
||||||
:param ttl_index_name: name of the index we want to update or ensure.
|
:param ttl_index_name: name of the index we want to update or ensure.
|
||||||
:param index_field: field with the index that we need to update.
|
:param index_field: field with the index that we need to update.
|
||||||
:param coll: collection which indexes need to be updated.
|
:param coll: collection which indexes need to be updated.
|
||||||
"""
|
"""
|
||||||
ttl = cfg.CONF.database.time_to_live
|
|
||||||
indexes = coll.index_information()
|
indexes = coll.index_information()
|
||||||
if ttl <= 0:
|
if ttl <= 0:
|
||||||
if ttl_index_name in indexes:
|
if ttl_index_name in indexes:
|
||||||
@ -471,8 +471,9 @@ class Connection(pymongo_base.Connection):
|
|||||||
self.db.project.drop()
|
self.db.project.drop()
|
||||||
|
|
||||||
# update or ensure time_to_live index
|
# update or ensure time_to_live index
|
||||||
self.update_ttl('meter_ttl', 'timestamp', self.db.meter)
|
ttl = cfg.CONF.database.metering_time_to_live
|
||||||
self.update_ttl('resource_ttl', 'last_sample_timestamp',
|
self.update_ttl(ttl, 'meter_ttl', 'timestamp', self.db.meter)
|
||||||
|
self.update_ttl(ttl, 'resource_ttl', 'last_sample_timestamp',
|
||||||
self.db.resource)
|
self.db.resource)
|
||||||
|
|
||||||
def clear(self):
|
def clear(self):
|
||||||
|
@ -86,33 +86,50 @@ class MongoDBTestMarkerBase(test_storage_scenarios.DBTestBase,
|
|||||||
@tests_db.run_with('mongodb')
|
@tests_db.run_with('mongodb')
|
||||||
class IndexTest(tests_db.TestBase,
|
class IndexTest(tests_db.TestBase,
|
||||||
tests_db.MixinTestsWithBackendScenarios):
|
tests_db.MixinTestsWithBackendScenarios):
|
||||||
def test_meter_ttl_index_absent(self):
|
|
||||||
# create a fake index and check it is deleted
|
|
||||||
self.conn.db.meter.ensure_index('foo', name='meter_ttl')
|
|
||||||
self.CONF.set_override('time_to_live', -1, group='database')
|
|
||||||
self.conn.upgrade()
|
|
||||||
self.assertTrue(self.conn.db.meter.ensure_index('foo',
|
|
||||||
name='meter_ttl'))
|
|
||||||
self.conn.db.meter.drop_index('meter_ttl')
|
|
||||||
|
|
||||||
self.CONF.set_override('time_to_live', 456789, group='database')
|
def _test_ttl_index_absent(self, conn, coll_name, ttl_opt):
|
||||||
self.conn.upgrade()
|
# create a fake index and check it is deleted
|
||||||
self.assertFalse(self.conn.db.meter.ensure_index('foo',
|
coll = getattr(conn.db, coll_name)
|
||||||
name='meter_ttl'))
|
index_name = '%s_ttl' % coll_name
|
||||||
|
coll.ensure_index('foo', name=index_name)
|
||||||
|
self.CONF.set_override(ttl_opt, -1, group='database')
|
||||||
|
conn.upgrade()
|
||||||
|
self.assertTrue(coll.ensure_index('foo', name=index_name))
|
||||||
|
coll.drop_index(index_name)
|
||||||
|
|
||||||
|
self.CONF.set_override(ttl_opt, 456789, group='database')
|
||||||
|
conn.upgrade()
|
||||||
|
self.assertFalse(coll.ensure_index('foo', name=index_name))
|
||||||
|
|
||||||
|
def test_meter_ttl_index_absent(self):
|
||||||
|
self._test_ttl_index_absent(self.conn, 'meter',
|
||||||
|
'metering_time_to_live')
|
||||||
|
|
||||||
|
def test_event_ttl_index_absent(self):
|
||||||
|
self._test_ttl_index_absent(self.event_conn, 'event',
|
||||||
|
'event_time_to_live')
|
||||||
|
|
||||||
|
def _test_ttl_index_present(self, conn, coll_name, ttl_opt):
|
||||||
|
coll = getattr(conn.db, coll_name)
|
||||||
|
self.CONF.set_override(ttl_opt, 456789, group='database')
|
||||||
|
conn.upgrade()
|
||||||
|
index_name = '%s_ttl' % coll_name
|
||||||
|
self.assertFalse(coll.ensure_index('foo', name=index_name))
|
||||||
|
self.assertEqual(456789,
|
||||||
|
coll.index_information()
|
||||||
|
[index_name]['expireAfterSeconds'])
|
||||||
|
|
||||||
|
self.CONF.set_override(ttl_opt, -1, group='database')
|
||||||
|
conn.upgrade()
|
||||||
|
self.assertTrue(coll.ensure_index('foo', name=index_name))
|
||||||
|
|
||||||
def test_meter_ttl_index_present(self):
|
def test_meter_ttl_index_present(self):
|
||||||
self.CONF.set_override('time_to_live', 456789, group='database')
|
self._test_ttl_index_present(self.conn, 'meter',
|
||||||
self.conn.upgrade()
|
'metering_time_to_live')
|
||||||
self.assertFalse(self.conn.db.meter.ensure_index('foo',
|
|
||||||
name='meter_ttl'))
|
|
||||||
self.assertEqual(456789,
|
|
||||||
self.conn.db.meter.index_information()
|
|
||||||
['meter_ttl']['expireAfterSeconds'])
|
|
||||||
|
|
||||||
self.CONF.set_override('time_to_live', -1, group='database')
|
def test_event_ttl_index_present(self):
|
||||||
self.conn.upgrade()
|
self._test_ttl_index_present(self.event_conn, 'event',
|
||||||
self.assertTrue(self.conn.db.meter.ensure_index('foo',
|
'event_time_to_live')
|
||||||
name='meter_ttl'))
|
|
||||||
|
|
||||||
|
|
||||||
@tests_db.run_with('mongodb')
|
@tests_db.run_with('mongodb')
|
||||||
|
@ -3581,7 +3581,7 @@ class MongoAutoReconnectTest(DBTestBase,
|
|||||||
class MongoTimeToLiveTest(DBTestBase, tests_db.MixinTestsWithBackendScenarios):
|
class MongoTimeToLiveTest(DBTestBase, tests_db.MixinTestsWithBackendScenarios):
|
||||||
|
|
||||||
def test_ensure_index(self):
|
def test_ensure_index(self):
|
||||||
cfg.CONF.set_override('time_to_live', 5, group='database')
|
cfg.CONF.set_override('metering_time_to_live', 5, group='database')
|
||||||
self.conn.upgrade()
|
self.conn.upgrade()
|
||||||
self.assertEqual(5, self.conn.db.resource.index_information()
|
self.assertEqual(5, self.conn.db.resource.index_information()
|
||||||
['resource_ttl']['expireAfterSeconds'])
|
['resource_ttl']['expireAfterSeconds'])
|
||||||
@ -3589,9 +3589,9 @@ class MongoTimeToLiveTest(DBTestBase, tests_db.MixinTestsWithBackendScenarios):
|
|||||||
['meter_ttl']['expireAfterSeconds'])
|
['meter_ttl']['expireAfterSeconds'])
|
||||||
|
|
||||||
def test_modification_of_index(self):
|
def test_modification_of_index(self):
|
||||||
cfg.CONF.set_override('time_to_live', 5, group='database')
|
cfg.CONF.set_override('metering_time_to_live', 5, group='database')
|
||||||
self.conn.upgrade()
|
self.conn.upgrade()
|
||||||
cfg.CONF.set_override('time_to_live', 15, group='database')
|
cfg.CONF.set_override('metering_time_to_live', 15, group='database')
|
||||||
self.conn.upgrade()
|
self.conn.upgrade()
|
||||||
self.assertEqual(15, self.conn.db.resource.index_information()
|
self.assertEqual(15, self.conn.db.resource.index_information()
|
||||||
['resource_ttl']['expireAfterSeconds'])
|
['resource_ttl']['expireAfterSeconds'])
|
||||||
|
@ -56,12 +56,13 @@ class BinTestCase(base.BaseTestCase):
|
|||||||
self.assertEqual(0, subp.poll())
|
self.assertEqual(0, subp.poll())
|
||||||
self.assertIn("Nothing to clean", err)
|
self.assertIn("Nothing to clean", err)
|
||||||
|
|
||||||
def test_run_expirer_ttl_enabled(self):
|
def _test_run_expirer_ttl_enabled(self, metering_ttl_name):
|
||||||
content = ("[DEFAULT]\n"
|
content = ("[DEFAULT]\n"
|
||||||
"rpc_backend=fake\n"
|
"rpc_backend=fake\n"
|
||||||
"[database]\n"
|
"[database]\n"
|
||||||
"time_to_live=1\n"
|
"%s=1\n"
|
||||||
"connection=log://localhost\n")
|
"event_time_to_live=1\n"
|
||||||
|
"connection=log://localhost\n" % metering_ttl_name)
|
||||||
self.tempfile = fileutils.write_to_tempfile(content=content,
|
self.tempfile = fileutils.write_to_tempfile(content=content,
|
||||||
prefix='ceilometer',
|
prefix='ceilometer',
|
||||||
suffix='.conf')
|
suffix='.conf')
|
||||||
@ -71,7 +72,14 @@ class BinTestCase(base.BaseTestCase):
|
|||||||
stderr=subprocess.PIPE)
|
stderr=subprocess.PIPE)
|
||||||
__, err = subp.communicate()
|
__, err = subp.communicate()
|
||||||
self.assertEqual(0, subp.poll())
|
self.assertEqual(0, subp.poll())
|
||||||
self.assertIn("Dropping data with TTL 1", err)
|
self.assertIn("Dropping metering data with TTL 1", err)
|
||||||
|
self.assertIn("Dropping event data with TTL 1", err)
|
||||||
|
|
||||||
|
def test_run_expirer_ttl_enabled(self):
|
||||||
|
self._test_run_expirer_ttl_enabled('metering_time_to_live')
|
||||||
|
|
||||||
|
def test_run_expirer_ttl_enabled_with_deprecated_opt_name(self):
|
||||||
|
self._test_run_expirer_ttl_enabled('time_to_live')
|
||||||
|
|
||||||
|
|
||||||
class BinSendSampleTestCase(base.BaseTestCase):
|
class BinSendSampleTestCase(base.BaseTestCase):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user