diff --git a/stackalytics/processor/dump.py b/stackalytics/processor/dump.py index 957161ad4..891b033ed 100644 --- a/stackalytics/processor/dump.py +++ b/stackalytics/processor/dump.py @@ -111,9 +111,20 @@ def export_data(memcached_inst, fd): key_prefix = key + ':' for record_id_set in utils.make_range(0, count, BULK_READ_SIZE): - for k, v in six.iteritems(memcached_inst.get_multi( - record_id_set, key_prefix)): - pickle.dump((key_prefix + str(k), v), fd) + # memcache limits the size of returned data to specific yet unknown + # chunk size, the code should verify that all requested records are + # returned and be able to fall back to one-by-one retrieval + + chunk = memcached_inst.get_multi(record_id_set, key_prefix) + if len(chunk) < len(record_id_set): + # retrieve one-by-one + for record_id in record_id_set: + key = key_prefix + str(record_id) + pickle.dump((key, memcached_inst.get(key)), fd) + else: + # dump the whole chunk + for k, v in six.iteritems(chunk): + pickle.dump((key_prefix + str(k), v), fd) for user_seq in range(memcached_inst.get('user:count') or 0): user = memcached_inst.get('user:%s' % user_seq) @@ -126,25 +137,6 @@ def export_data(memcached_inst, fd): pickle.dump(('user:%s' % email, user), fd) -def export_data_universal(memcached_inst, fd): - LOG.info('Exporting data from memcached') - slabs = memcached_inst.get_slabs() - for slab_number, slab in six.iteritems(slabs[0][1]): - count = int(slab['number']) - keys = memcached_inst.get_stats( - 'cachedump %s %s' % (slab_number, count))[0][1].keys() - - n = 0 - while n < count: - LOG.debug('Dumping slab %s, start record %s', slab_number, n) - - for k, v in six.iteritems(memcached_inst.get_multi( - keys[n: min(count, n + BULK_READ_SIZE)])): - pickle.dump((k, v), fd) - - n += BULK_READ_SIZE - - def _connect_to_memcached(uri): stripped = re.sub(MEMCACHED_URI_PREFIX, '', uri) if stripped: diff --git a/tests/unit/test_dump.py b/tests/unit/test_dump.py new file mode 100644 index 000000000..0741a2c7f --- 
/dev/null +++ b/tests/unit/test_dump.py @@ -0,0 +1,68 @@ +# Copyright (c) 2014 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import memcache +import mock +import testtools + +from stackalytics.processor import dump + + +class TestDump(testtools.TestCase): + + def _make_data(self, record_count): + data = {'record:count': record_count} + for i in range(record_count): + data['record:%d' % i] = i + return data + + def test_export_data_records(self): + record_count = 153 + data = self._make_data(record_count) + memcache_inst = mock.Mock(memcache.Client) + memcache_inst.get = lambda x: data.get(x) + memcache_inst.get_multi = lambda keys, key_prefix: dict( + ('%s' % n, data.get(key_prefix + '%s' % n)) for n in keys) + + with mock.patch('pickle.dump') as pickle_dump: + fd = mock.Mock() + dump.export_data(memcache_inst, fd) + # self.assertEquals(total, pickle_dump.call_count) + + expected_calls = [mock.call(('record:count', record_count), fd)] + for i in range(record_count): + expected_calls.append(mock.call(('record:%d' % i, + data['record:%d' % i]), fd)) + pickle_dump.assert_has_calls(expected_calls, any_order=True) + + def test_export_data_records_get_multi_truncates_chunk(self): + record_count = 153 + data = self._make_data(record_count) + memcache_inst = mock.Mock(memcache.Client) + memcache_inst.get = lambda x: data.get(x) + memcache_inst.get_multi = lambda keys, key_prefix: dict( + ('%s' % n, data.get(key_prefix + '%s' % n)) + for n in [k for k, v 
in zip(keys, range(len(keys) - 1))]) + + with mock.patch('pickle.dump') as pickle_dump: + fd = mock.Mock() + dump.export_data(memcache_inst, fd) + # self.assertEquals(total, pickle_dump.call_count) + + expected_calls = [mock.call(('record:count', record_count), fd)] + for i in range(record_count): + expected_calls.append(mock.call(('record:%d' % i, + data['record:%d' % i]), fd)) + pickle_dump.assert_has_calls(expected_calls, any_order=True)