Merge "Add 'collect_end_time' param for distil-collector command"
This commit is contained in:
commit
e2dbdd4b50
@ -25,7 +25,6 @@ from distil.db import api as db_api
|
||||
from distil import exceptions as exc
|
||||
from distil import transformer as d_transformer
|
||||
from distil.common import constants
|
||||
from distil.common import general
|
||||
from distil.common import openstack
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -46,7 +45,7 @@ class BaseCollector(object):
|
||||
def get_meter(self, project, meter, start, end):
|
||||
raise NotImplementedError
|
||||
|
||||
def collect_usage(self, project, start, end):
|
||||
def collect_usage(self, project, windows):
|
||||
"""Collect usage for specific tenant.
|
||||
|
||||
:return: True if no error happened otherwise return False.
|
||||
@ -54,15 +53,6 @@ class BaseCollector(object):
|
||||
LOG.info('collect_usage by %s for project: %s(%s)' %
|
||||
(self.__class__.__name__, project['id'], project['name']))
|
||||
|
||||
windows = list(general.generate_windows(start, end))
|
||||
|
||||
if CONF.collector.max_windows_per_cycle > 0:
|
||||
windows = windows[:CONF.collector.max_windows_per_cycle]
|
||||
|
||||
if not windows:
|
||||
LOG.info("Skipped project %s(%s), less than 1 hour since last "
|
||||
"collection time.", project['id'], project['name'])
|
||||
|
||||
for window_start, window_end in windows:
|
||||
LOG.info("Project %s(%s) slice %s %s", project['id'],
|
||||
project['name'], window_start, window_end)
|
||||
|
@ -46,14 +46,22 @@ def get_transformer_config():
|
||||
return _TRANS_CONFIG
|
||||
|
||||
|
||||
def generate_windows(start, end):
|
||||
"""Generator for configured hour windows in a given range."""
|
||||
def get_windows(start, end):
|
||||
"""Get configured hour windows in a given range."""
|
||||
windows = []
|
||||
window_size = timedelta(hours=CONF.collector.collect_window)
|
||||
|
||||
while start + window_size <= end:
|
||||
window_end = start + window_size
|
||||
yield start, window_end
|
||||
windows.append((start, window_end))
|
||||
|
||||
if len(windows) >= CONF.collector.max_windows_per_cycle:
|
||||
break
|
||||
|
||||
start = window_end
|
||||
|
||||
return windows
|
||||
|
||||
|
||||
def log_and_time_it(f):
|
||||
def decorator(*args, **kwargs):
|
||||
|
@ -52,7 +52,7 @@ COLLECTOR_OPTS = [
|
||||
help=('Window of usage collection in hours.')),
|
||||
cfg.StrOpt('collector_backend', default='ceilometer',
|
||||
help=('Data collector.')),
|
||||
cfg.IntOpt('max_windows_per_cycle', default=0,
|
||||
cfg.IntOpt('max_windows_per_cycle', default=1,
|
||||
help=('The maximum number of windows per collecting cycle.')),
|
||||
cfg.StrOpt('meter_mappings_file', default='/etc/distil/meter_mappings.yml',
|
||||
help=('The meter mappings configuration.')),
|
||||
@ -110,6 +110,15 @@ RATER_OPTS = [
|
||||
'is "file".'),
|
||||
]
|
||||
|
||||
CLI_OPTS = [
|
||||
cfg.StrOpt(
|
||||
'collect-end-time',
|
||||
help=('The end date of usage to collect before distil-collector is '
|
||||
'stopped. If not provided, distil-collector will keep running. '
|
||||
'Time format is %Y-%m-%dT%H:%M:%S')
|
||||
),
|
||||
]
|
||||
|
||||
AUTH_GROUP = 'keystone_authtoken'
|
||||
ODOO_GROUP = 'odoo'
|
||||
COLLECTOR_GROUP = 'collector'
|
||||
@ -120,6 +129,7 @@ CONF.register_opts(DEFAULT_OPTIONS)
|
||||
CONF.register_opts(ODOO_OPTS, group=ODOO_GROUP)
|
||||
CONF.register_opts(COLLECTOR_OPTS, group=COLLECTOR_GROUP)
|
||||
CONF.register_opts(RATER_OPTS, group=RATER_GROUP)
|
||||
CONF.register_cli_opts(CLI_OPTS)
|
||||
|
||||
|
||||
def list_opts():
|
||||
|
@ -13,6 +13,7 @@
|
||||
# under the License.
|
||||
|
||||
from datetime import datetime
|
||||
import os
|
||||
from random import shuffle
|
||||
|
||||
from oslo_config import cfg
|
||||
@ -23,6 +24,7 @@ from stevedore import driver
|
||||
|
||||
from distil.db import api as db_api
|
||||
from distil import exceptions
|
||||
from distil.common import constants
|
||||
from distil.common import general
|
||||
from distil.common import openstack
|
||||
|
||||
@ -110,6 +112,11 @@ class CollectorService(service.Service):
|
||||
def collect_usage(self):
|
||||
LOG.info("Starting to collect usage...")
|
||||
|
||||
if CONF.collector.max_windows_per_cycle <= 0:
|
||||
LOG.info("Finished collecting usage with configuration "
|
||||
"max_windows_per_cycle<=0.")
|
||||
return
|
||||
|
||||
projects = openstack.get_projects()
|
||||
project_ids = [p['id'] for p in projects]
|
||||
valid_projects = filter_projects(projects)
|
||||
@ -119,7 +126,15 @@ class CollectorService(service.Service):
|
||||
last_collect = db_api.get_last_collect(project_ids).last_collected
|
||||
|
||||
end = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
|
||||
count = 0
|
||||
if CONF.collect_end_time:
|
||||
end = datetime.strptime(CONF.collect_end_time, constants.iso_time)
|
||||
|
||||
# Number of projects updated successfully.
|
||||
success_count = 0
|
||||
# Number of projects processed actually.
|
||||
processed_count = 0
|
||||
# Number of projects already up-to-date.
|
||||
updated_count = 0
|
||||
|
||||
valid_projects = self._get_projects_by_order(valid_projects)
|
||||
for project in valid_projects:
|
||||
@ -127,7 +142,6 @@ class CollectorService(service.Service):
|
||||
# instance. If no, will get a lock and continue processing,
|
||||
# otherwise just skip it.
|
||||
locks = db_api.get_project_locks(project['id'])
|
||||
|
||||
if locks and locks[0].owner != self.identifier:
|
||||
LOG.debug(
|
||||
"Project %s is being processed by collector %s." %
|
||||
@ -137,13 +151,31 @@ class CollectorService(service.Service):
|
||||
|
||||
try:
|
||||
with db_api.project_lock(project['id'], self.identifier):
|
||||
processed_count += 1
|
||||
|
||||
# Add a project or get last_collected of existing project.
|
||||
db_project = db_api.project_add(project, last_collect)
|
||||
start = db_project.last_collected
|
||||
|
||||
if self.collector.collect_usage(project, start, end):
|
||||
count = count + 1
|
||||
windows = general.get_windows(start, end)
|
||||
if not windows:
|
||||
LOG.info(
|
||||
"project %s(%s) already up-to-date.",
|
||||
project['id'], project['name']
|
||||
)
|
||||
updated_count += 1
|
||||
continue
|
||||
|
||||
if self.collector.collect_usage(project, windows):
|
||||
success_count += 1
|
||||
except Exception:
|
||||
LOG.warning('Get lock failed. Process: %s' % self.identifier)
|
||||
|
||||
LOG.info("Finished collecting usage for %s projects." % count)
|
||||
LOG.info("Finished collecting usage for %s projects." % success_count)
|
||||
|
||||
# If we start distil-collector manually with 'collect_end_time' param
|
||||
# specified, the service should be stopped automatically after all
|
||||
# projects usage collection is up-to-date.
|
||||
if CONF.collect_end_time and updated_count == processed_count:
|
||||
self.stop()
|
||||
os.kill(os.getpid(), 9)
|
||||
|
@ -97,6 +97,7 @@ class DistilTestCase(base.BaseTestCase):
|
||||
"""
|
||||
for k, v in kw.items():
|
||||
self.conf.set_override(k, v, group)
|
||||
self.addCleanup(self.conf.clear_override, k, group)
|
||||
|
||||
def _my_dir(self):
|
||||
return os.path.abspath(os.path.dirname(__file__))
|
||||
|
@ -148,8 +148,7 @@ class CollectorBaseTest(base.DistilWithDbTestCase):
|
||||
srv = collector.CollectorService()
|
||||
ret = srv.collector.collect_usage(
|
||||
{'name': 'fake_project', 'id': '123'},
|
||||
datetime.utcnow() - timedelta(hours=1.5),
|
||||
datetime.utcnow()
|
||||
[(datetime.utcnow() - timedelta(hours=1), datetime.utcnow())]
|
||||
)
|
||||
|
||||
self.assertFalse(ret)
|
||||
|
@ -13,6 +13,7 @@
|
||||
# under the License.
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
@ -20,6 +21,8 @@ import os
|
||||
import mock
|
||||
|
||||
from distil.collector import base as collector_base
|
||||
from distil.common import constants
|
||||
from distil import config
|
||||
from distil.db.sqlalchemy import api as db_api
|
||||
from distil.service import collector
|
||||
from distil.tests.unit import base
|
||||
@ -86,7 +89,7 @@ class CollectorTest(base.DistilWithDbTestCase):
|
||||
]
|
||||
|
||||
collector = collector_base.BaseCollector()
|
||||
collector.collect_usage(project, start_time, end_time)
|
||||
collector.collect_usage(project, [(start_time, end_time)])
|
||||
|
||||
resources = db_api.resource_get_by_ids(project_id, [resource_id_hash])
|
||||
res_info = json.loads(resources[0].info)
|
||||
@ -144,22 +147,17 @@ class CollectorTest(base.DistilWithDbTestCase):
|
||||
project_2_collect
|
||||
)
|
||||
|
||||
end = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
|
||||
|
||||
svc = collector.CollectorService()
|
||||
svc.collect_usage()
|
||||
|
||||
mock_collect_usage.assert_called_once_with(
|
||||
{'id': '333', 'name': 'project_3', 'description': ''},
|
||||
project_1_collect,
|
||||
end
|
||||
[(project_1_collect, project_1_collect + timedelta(hours=1))]
|
||||
)
|
||||
|
||||
@mock.patch('distil.db.api.get_project_locks')
|
||||
@mock.patch('distil.common.openstack.get_ceilometer_client')
|
||||
@mock.patch('distil.common.openstack.get_projects')
|
||||
def test_project_order_ascending(self, mock_get_projects, mock_cclient,
|
||||
mock_getlocks):
|
||||
def test_project_order_ascending(self, mock_get_projects, mock_cclient):
|
||||
mock_get_projects.return_value = [
|
||||
{'id': '111', 'name': 'project_1', 'description': ''},
|
||||
{'id': '222', 'name': 'project_2', 'description': ''},
|
||||
@ -174,23 +172,25 @@ class CollectorTest(base.DistilWithDbTestCase):
|
||||
'name': 'project_1',
|
||||
'description': '',
|
||||
},
|
||||
datetime(2017, 5, 17, 19)
|
||||
datetime.utcnow() - timedelta(hours=2)
|
||||
)
|
||||
|
||||
svc = collector.CollectorService()
|
||||
svc.collector = mock.Mock()
|
||||
svc.collect_usage()
|
||||
|
||||
expected_projects = []
|
||||
for call in svc.collector.collect_usage.call_args_list:
|
||||
expected_projects.append(call[0][0]['id'])
|
||||
|
||||
self.assertEqual(
|
||||
[mock.call('111'), mock.call('222'), mock.call('333'),
|
||||
mock.call('444')],
|
||||
mock_getlocks.call_args_list
|
||||
['111', '222', '333', '444'],
|
||||
expected_projects
|
||||
)
|
||||
|
||||
@mock.patch('distil.db.api.get_project_locks')
|
||||
@mock.patch('distil.common.openstack.get_ceilometer_client')
|
||||
@mock.patch('distil.common.openstack.get_projects')
|
||||
def test_project_order_descending(self, mock_get_projects, mock_cclient,
|
||||
mock_getlocks):
|
||||
def test_project_order_descending(self, mock_get_projects, mock_cclient):
|
||||
self.override_config('collector', project_order='descending')
|
||||
|
||||
mock_get_projects.return_value = [
|
||||
@ -207,23 +207,25 @@ class CollectorTest(base.DistilWithDbTestCase):
|
||||
'name': 'project_1',
|
||||
'description': '',
|
||||
},
|
||||
datetime(2017, 5, 17, 19)
|
||||
datetime.utcnow() - timedelta(hours=2)
|
||||
)
|
||||
|
||||
svc = collector.CollectorService()
|
||||
svc.collector = mock.Mock()
|
||||
svc.collect_usage()
|
||||
|
||||
expected_projects = []
|
||||
for call in svc.collector.collect_usage.call_args_list:
|
||||
expected_projects.append(call[0][0]['id'])
|
||||
|
||||
self.assertEqual(
|
||||
[mock.call('444'), mock.call('333'), mock.call('222'),
|
||||
mock.call('111')],
|
||||
mock_getlocks.call_args_list
|
||||
['444', '333', '222', '111'],
|
||||
expected_projects
|
||||
)
|
||||
|
||||
@mock.patch('distil.db.api.get_project_locks')
|
||||
@mock.patch('distil.common.openstack.get_ceilometer_client')
|
||||
@mock.patch('distil.common.openstack.get_projects')
|
||||
def test_project_order_random(self, mock_get_projects, mock_cclient,
|
||||
mock_getlocks):
|
||||
def test_project_order_random(self, mock_get_projects, mock_cclient):
|
||||
self.override_config('collector', project_order='random')
|
||||
|
||||
mock_get_projects.return_value = [
|
||||
@ -240,14 +242,51 @@ class CollectorTest(base.DistilWithDbTestCase):
|
||||
'name': 'project_1',
|
||||
'description': '',
|
||||
},
|
||||
datetime(2017, 5, 17, 19)
|
||||
datetime.utcnow() - timedelta(hours=2)
|
||||
)
|
||||
|
||||
svc = collector.CollectorService()
|
||||
svc.collector = mock.Mock()
|
||||
svc.collect_usage()
|
||||
|
||||
expected_projects = []
|
||||
for call in svc.collector.collect_usage.call_args_list:
|
||||
expected_projects.append(call[0][0]['id'])
|
||||
|
||||
self.assertNotEqual(
|
||||
[mock.call('111'), mock.call('222'), mock.call('333'),
|
||||
mock.call('444')],
|
||||
mock_getlocks.call_args_list
|
||||
['111', '222', '333', '444'],
|
||||
expected_projects
|
||||
)
|
||||
|
||||
@mock.patch('os.kill')
|
||||
@mock.patch('distil.common.openstack.get_ceilometer_client')
|
||||
@mock.patch('distil.common.openstack.get_projects')
|
||||
def test_collect_with_end_time(self, mock_get_projects, mock_cclient,
|
||||
mock_kill):
|
||||
end_time = datetime.utcnow() + timedelta(hours=0.5)
|
||||
end_time_str = end_time.strftime(constants.iso_time)
|
||||
self.override_config(collect_end_time=end_time_str)
|
||||
|
||||
mock_get_projects.return_value = [
|
||||
{
|
||||
'id': '111',
|
||||
'name': 'project_1',
|
||||
'description': 'description'
|
||||
}
|
||||
]
|
||||
# Insert the project info in the database.
|
||||
db_api.project_add(
|
||||
{
|
||||
'id': '111',
|
||||
'name': 'project_1',
|
||||
'description': '',
|
||||
},
|
||||
datetime.utcnow()
|
||||
)
|
||||
|
||||
srv = collector.CollectorService()
|
||||
srv.thread_grp = mock.Mock()
|
||||
srv.collect_usage()
|
||||
|
||||
self.assertEqual(1, srv.thread_grp.stop.call_count)
|
||||
self.assertEqual(1, mock_kill.call_count)
|
||||
|
Loading…
x
Reference in New Issue
Block a user