Refactor account class as class and add tests.

This commit is contained in:
Chmouel Boudjnah 2013-03-26 13:51:37 +01:00
parent 53f95305e0
commit b5d3bb83c9
6 changed files with 370 additions and 79 deletions

View File

View File

@ -1,39 +1,57 @@
# -*- encoding: utf-8 -*- # -*- coding: utf-8 -*-
import datetime # Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
import os #
import sys # Author: Chmouel Boudjnah <chmouel@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time import time
import datetime
from keystoneclient.v2_0 import client as ksclient
import dateutil.relativedelta
import swiftclient import swiftclient
import dateutil.relativedelta
import keystoneclient.v2_0.client
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from utils import get_config
from common.utils import get_config, get_auth from containers import sync_container
from sync.containers import sync_container
def get_ks_auth_orig(): class Accounts(object):
"""Process Keystone Accounts."""
def __init__(self):
self.keystone_cnx = None
def get_swift_auth(self, auth_url, tenant, user, password):
"""Get swift connexion from args"""
return swiftclient.client.Connection(
auth_url,
'%s:%s' % (tenant, user),
password,
auth_version=2).get_auth()
def get_ks_auth_orig(self):
"""Get keystone cnx from config"""
orig_auth_url = get_config('auth', 'keystone_origin') orig_auth_url = get_config('auth', 'keystone_origin')
tenant_name, username, password = \ cfg = get_config('auth', 'keystone_origin_admin_credentials')
get_config('auth', 'keystone_origin_admin_credentials').split(':') (tenant_name, username, password) = cfg.split(':')
return ksclient.Client(auth_url=orig_auth_url, return keystoneclient.v2_0.client.Client(auth_url=orig_auth_url,
username=username, username=username,
password=password, password=password,
tenant_name=tenant_name) tenant_name=tenant_name)
def sync_account(self, orig_storage_url, orig_token,
def list_accounts(cnx): dest_storage_url, dest_token):
for x in cnx.tenants.list(): """Sync a single account with url/tok to dest_url/dest_tok."""
yield x
def sync_an_account(orig_storage_url,
orig_token,
dest_storage_url,
dest_token):
orig_storage_cnx = swiftclient.http_connection(orig_storage_url) orig_storage_cnx = swiftclient.http_connection(orig_storage_url)
dest_storage_cnx = swiftclient.http_connection(dest_storage_url) dest_storage_cnx = swiftclient.http_connection(dest_storage_url)
@ -43,42 +61,55 @@ def sync_an_account(orig_storage_url,
for container in orig_containers: for container in orig_containers:
print container print container
dt1 = datetime.datetime.fromtimestamp(time.time()) dt1 = datetime.datetime.fromtimestamp(time.time())
sync_container(orig_storage_cnx, orig_storage_url, orig_token, sync_container(orig_storage_cnx,
dest_storage_cnx, dest_storage_url, dest_token, orig_storage_url,
orig_token,
dest_storage_cnx,
dest_storage_url, dest_token,
container['name']) container['name'])
dt2 = datetime.datetime.fromtimestamp(time.time()) dt2 = datetime.datetime.fromtimestamp(time.time())
rd = dateutil.relativedelta.relativedelta(dt2, dt1) rd = dateutil.relativedelta.relativedelta(dt2, dt1)
print "%d hours, %d minutes and %d seconds" % (rd.hours, rd.minutes, #TODO(chmou): use logging
print "%d hours, %d minutes and %d seconds" % (rd.hours,
rd.minutes,
rd.seconds) rd.seconds)
print print
def process(self):
def sync_accounts(): """Process all keystone accounts to sync."""
orig_auth_url = get_config('auth', 'keystone_origin') orig_auth_url = get_config('auth', 'keystone_origin')
orig_admin_tenant, orig_admin_user, orig_admin_password = ( orig_admin_tenant, orig_admin_user, orig_admin_password = (
get_config('auth', 'keystone_origin_admin_credentials').split(':')) get_config('auth', 'keystone_origin_admin_credentials').split(':'))
oa_st_url, orig_admin_token = (get_auth(orig_auth_url, orig_admin_tenant, oa_st_url, orig_admin_token = self.get_swift_auth(
orig_admin_user, orig_auth_url, orig_admin_tenant,
orig_admin_password)) orig_admin_user, orig_admin_password)
dest_auth_url = get_config('auth', 'keystone_dest') dest_auth_url = get_config('auth', 'keystone_dest')
# we assume orig and dest passwd are the same obv synchronized. # we assume orig and dest passwd are the same obv synchronized.
dst_st_url, dest_admin_token = (get_auth(dest_auth_url, orig_admin_tenant, dst_st_url, dest_admin_token = self.get_swift_auth(
orig_admin_user, dest_auth_url, orig_admin_tenant,
orig_admin_password)) orig_admin_user, orig_admin_password)
bare_oa_st_url = oa_st_url[:oa_st_url.find('AUTH_')] + "AUTH_" bare_oa_st_url = oa_st_url[:oa_st_url.find('AUTH_')] + "AUTH_"
bare_dst_st_url = dst_st_url[:dst_st_url.find('AUTH_')] + "AUTH_" bare_dst_st_url = dst_st_url[:dst_st_url.find('AUTH_')] + "AUTH_"
for x in list_accounts(get_ks_auth_orig()): self.keystone_cnx = self.get_ks_auth_orig()
user_orig_st_url = bare_oa_st_url + x.id
user_dst_st_url = bare_dst_st_url + x.id
sync_an_account(user_orig_st_url, for tenant in self.keystone_cnx.tenants.list():
user_orig_st_url = bare_oa_st_url + tenant.id
user_dst_st_url = bare_dst_st_url + tenant.id
self.sync_account(user_orig_st_url,
orig_admin_token, orig_admin_token,
user_dst_st_url, user_dst_st_url,
dest_admin_token) dest_admin_token)
def main():
acc = Accounts()
acc.process()
if __name__ == '__main__': if __name__ == '__main__':
sync_accounts() main()

View File

@ -42,12 +42,5 @@ def get_config(section, option, default=None):
"section/option: %s/%s" % (section, option)) "section/option: %s/%s" % (section, option))
def get_auth(auth_url, tenant, user, password):
return swclient.Connection(
auth_url,
'%s:%s' % (tenant, user),
password,
auth_version=2).get_auth()
if __name__ == '__main__': if __name__ == '__main__':
get_config("foo", "bar") get_config("foo", "bar")

70
tests/fakes.py Normal file
View File

@ -0,0 +1,70 @@
# -*- encoding: utf-8 -*-
__author__ = "Chmouel Boudjnah <chmouel@chmouel.com>"
import uuid
TENANTS_LIST = {'foo1': {'id': uuid.uuid4().hex},
'foo2': {'id': uuid.uuid4().hex},
'foo3': {'id': uuid.uuid4().hex}}
CONFIGDICT = {'auth':
{'keystone_origin': 'http://keystone-origin.com',
'keystone_origin_admin_credentials': 'foo1:bar:kernel',
'keystone_dest': 'http://storage-dest.com'}}
STORAGE_DEST = 'http://storage-dest.com'
def fake_get_config(section, option):
return CONFIGDICT[section][option]
def fake_get_auth(auth_url, tenant, user, password):
return FakeSWConnection(
auth_url,
'%s:%s' % (tenant, user),
password,
auth_version=2).get_auth()
class FakeSWConnection(object):
def __init__(self, *args, **kwargs):
self.mainargs = args
self.mainkwargs = kwargs
def get_auth(self, *args, **kwargs):
tenant, user = self.mainargs[1].split(':')
tenant_id = TENANTS_LIST[tenant]['id']
return ('%s/v1/AUTH_%s' % (STORAGE_DEST, tenant_id), 'token')
class FakeKSTenant(object):
def __init__(self, tenant_name):
self.tenant_name = tenant_name
@property
def id(self):
return TENANTS_LIST[self.tenant_name]['id']
def __str__(self):
return self.tenant_name
class FakeKSClientTenant(object):
def list(self):
for t in list(TENANTS_LIST):
yield FakeKSTenant(t)
class FakeKSClient(object):
def __init__(self, *args):
self.args = args
self.tenants = FakeKSClientTenant()
def __call__(self):
return self.args
class FakeKS(object):
@staticmethod
def Client(*args, **kwargs):
return FakeKSClient(args, kwargs)

73
tests/test_accounts.py Normal file
View File

@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Chmouel Boudjnah <chmouel@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest2
import swiftclient
import keystoneclient
import sync.accounts
from fakes import (FakeSWConnection, TENANTS_LIST,
STORAGE_DEST, FakeKS, CONFIGDICT,
fake_get_config)
class TestAccount(unittest2.TestCase):
def setUp(self):
self.accounts_cls = sync.accounts.Accounts()
self._monkey_patch()
def _monkey_patch(self):
keystoneclient.v2_0.client = FakeKS
swiftclient.client.Connection = FakeSWConnection
sync.accounts.get_config = fake_get_config
def test_get_swift_auth(self):
tenant_name = 'foo1'
ret = self.accounts_cls.get_swift_auth(
"http://test.com", tenant_name, "user", "password")
tenant_id = TENANTS_LIST[tenant_name]['id']
self.assertEquals(ret[0], "%s/v1/AUTH_%s" % (STORAGE_DEST,
tenant_id))
def test_get_ks_auth_orig(self):
_, kwargs = self.accounts_cls.get_ks_auth_orig()()
k = CONFIGDICT['auth']['keystone_origin_admin_credentials']
tenant_name, username, password = k.split(':')
self.assertEquals(kwargs['tenant_name'], tenant_name)
self.assertEquals(kwargs['username'], username)
self.assertEquals(kwargs['password'], password)
k = CONFIGDICT['auth']['keystone_origin']
self.assertEquals(k, kwargs['auth_url'])
def test_process(self):
ret = []
def sync_account(orig_storage_url,
orig_token,
dest_storage_url,
dest_token):
ret.append((orig_storage_url, dest_storage_url))
self.accounts_cls.sync_account = sync_account
self.accounts_cls.process()
tenant_list_ids = sorted(TENANTS_LIST[x]['id'] for x in TENANTS_LIST)
ret_orig_storage_id = sorted(
x[0][x[0].find('AUTH_') + 5:] for x in ret)
self.assertEquals(tenant_list_ids, ret_orig_storage_id)
[self.assertTrue(x[1].startswith(STORAGE_DEST)) for x in ret]
def test_sync_account(self):
pass

124
tests/test_accounts.py.old Normal file
View File

@ -0,0 +1,124 @@
# -*- encoding: utf-8 -*-
import os
import sys
import uuid
import unittest2
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
import sync.accounts as sync_accounts
TENANTS_LIST = {'foo1': {'id': uuid.uuid4().hex},
'foo2': {'id': uuid.uuid4().hex},
'foo3': {'id': uuid.uuid4().hex}}
CONFIGDICT = {'auth':
{'keystone_origin': 'http://keystone-origin.com',
'keystone_origin_admin_credentials': 'foo1:bar:kernel',
'keystone_dest': 'http://storage-dest.com'}}
STORAGE_DEST = 'http://storage-dest.com'
def fake_get_config(section, option):
return CONFIGDICT[section][option]
def fake_get_auth(auth_url, tenant, user, password):
return FakeSWConnection(
auth_url,
'%s:%s' % (tenant, user),
password,
auth_version=2).get_auth()
class FakeSWConnection(object):
def __init__(self, *args, **kwargs):
self.mainargs = args
self.mainkwargs = kwargs
def get_auth(self, *args, **kwargs):
tenant, user = self.mainargs[1].split(':')
tenant_id = TENANTS_LIST[tenant]['id']
return ('%s/v1/AUTH_%s' % (STORAGE_DEST, tenant_id), 'token')
class FakeKSTenant(object):
def __init__(self, tenant_name):
self.tenant_name = tenant_name
@property
def id(self):
return TENANTS_LIST[self.tenant_name]['id']
def __str__(self):
return self.tenant_name
class FakeKSClientTenant(object):
def list(self):
for t in list(TENANTS_LIST):
yield FakeKSTenant(t)
class FakeKSClient(object):
def __init__(self, *args):
self.args = args
self.tenants = FakeKSClientTenant()
def __call__(self):
return self.args
class FakeKS(object):
@staticmethod
def Client(*args, **kwargs):
return FakeKSClient(args, kwargs)
class TestAccount(unittest2.TestCase):
def setUp(self):
sync_accounts.ksclient = FakeKS
sync_accounts.get_config = fake_get_config
sync_accounts.get_auth = fake_get_auth
def test_list_accounts(self):
cnx = FakeKSClient()
tenant_list = [str(x) for x in sync_accounts.list_accounts(cnx)]
self.assertEquals(sorted(TENANTS_LIST), tenant_list)
def test_get_ks_auth_orig(self):
args, kwargs = sync_accounts.get_ks_auth_orig()()
k = CONFIGDICT['auth']['keystone_origin_admin_credentials']
tenant_name, username, password = k.split(':')
self.assertEquals(kwargs['tenant_name'], tenant_name)
self.assertEquals(kwargs['username'], username)
self.assertEquals(kwargs['password'], password)
k = CONFIGDICT['auth']['keystone_origin']
self.assertEquals(k, kwargs['auth_url'])
def test_sync_accounts(self):
ret = []
def sync_an_account(orig_storage_url,
orig_token,
dest_storage_url,
dest_token):
ret.append((orig_storage_url, dest_storage_url))
sync_accounts.sync_an_account = sync_an_account
sync_accounts.sync_accounts()
tenant_list_ids = sorted(TENANTS_LIST[x]['id'] for x in TENANTS_LIST)
ret_orig_storage_id = sorted(
x[0][x[0].find('AUTH_') + 5:] for x in ret)
self.assertEquals(tenant_list_ids, ret_orig_storage_id)
[self.assertTrue(x[1].startswith(STORAGE_DEST)) for x in ret]
def test_sync_an_account(self):
pass
if __name__ == '__main__':
unittest2.main()