Add ability to set Spoofguard profiles to Segments

Change-Id: Ic0ea9f5873b884f7a6730f91eb4a4bf36ff46e9d
This commit is contained in:
Abhishek Raut 2019-07-08 18:33:28 -07:00
parent 91a6cb566e
commit ffd3e9bedf
4 changed files with 213 additions and 1 deletions

View File

@ -4123,9 +4123,100 @@ class TestPolicySegmentSecurityProfile(TestPolicySegmentProfileBase):
class TestPolicySegmentSecProfilesBinding(NsxPolicyLibTestCase):
def setUp(self, resource_api_name='segment_security_profile_maps',
resource_def=core_defs.SegmentSecProfilesBindingMapDef):
super(TestPolicySegmentSecProfilesBinding, self).setUp()
self.resourceApi = getattr(self.policy_lib, resource_api_name)
self.resourceDef = resource_def
def test_create(self):
name = 'test'
segment_id = 'seg1'
prf1 = '1'
prf2 = '2'
with mock.patch.object(self.policy_api,
"create_or_update") as api_call:
result = self.resourceApi.create_or_overwrite(
name, segment_id,
segment_security_profile_id=prf1,
spoofguard_profile_id=prf2,
tenant=TEST_TENANT)
expected_def = self.resourceDef(
segment_id=segment_id,
map_id=core_resources.DEFAULT_MAP_ID,
name=name,
segment_security_profile_id=prf1,
spoofguard_profile_id=prf2,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
self.assertIsNotNone(result)
def test_delete(self):
segment_id = 'seg1'
with mock.patch.object(self.policy_api, "delete") as api_call:
self.resourceApi.delete(segment_id, tenant=TEST_TENANT)
expected_def = self.resourceDef(
segment_id=segment_id,
map_id=core_resources.DEFAULT_MAP_ID,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
def test_get(self):
segment_id = 'seg1'
with mock.patch.object(self.policy_api, "get",
return_value={'id': segment_id}) as api_call:
result = self.resourceApi.get(segment_id,
tenant=TEST_TENANT)
expected_def = self.resourceDef(
segment_id=segment_id,
map_id=core_resources.DEFAULT_MAP_ID,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
self.assertEqual(segment_id, result['id'])
def test_list(self):
segment_id = 'seg1'
with mock.patch.object(self.policy_api, "list",
return_value={'results': []}) as api_call:
result = self.resourceApi.list(segment_id,
tenant=TEST_TENANT)
expected_def = self.resourceDef(
segment_id=segment_id,
tenant=TEST_TENANT)
self.assert_called_with_def(api_call, expected_def)
self.assertEqual([], result)
def test_update(self):
name = 'new name'
segment_id = 'seg1'
prf1 = '1'
prf2 = '2'
with self.mock_get(segment_id, name), \
self.mock_create_update() as update_call:
self.resourceApi.update(
segment_id=segment_id,
name=name,
segment_security_profile_id=prf1,
spoofguard_profile_id=prf2,
tenant=TEST_TENANT)
expected_def = self.resourceDef(
segment_id=segment_id,
map_id=core_resources.DEFAULT_MAP_ID,
name=name,
segment_security_profile_id=prf1,
spoofguard_profile_id=prf2,
tenant=TEST_TENANT)
self.assert_called_with_def(
update_call, expected_def)
class TestPolicySegmentPortSecProfilesBinding(NsxPolicyLibTestCase):
def setUp(self, resource_api_name='segment_port_security_profiles',
resource_def=core_defs.SegmentPortSecProfilesBindingMapDef):
super(TestPolicySegmentSecProfilesBinding, self).setUp()
super(TestPolicySegmentPortSecProfilesBinding, self).setUp()
self.resourceApi = getattr(self.policy_lib, resource_api_name)
self.resourceDef = resource_def

View File

@ -103,6 +103,9 @@ class NsxPolicyLib(lib.NsxLibBase):
core_resources.NsxMacDiscoveryProfileApi(*args))
self.waf_profile = (
core_resources.NsxWAFProfileApi(*args))
self.segment_security_profile_maps = (
core_resources.SegmentSecurityProfilesBindingMapApi(
*args))
self.segment_port_security_profiles = (
core_resources.SegmentPortSecurityProfilesBindingMapApi(
*args))

View File

@ -804,6 +804,56 @@ class SegmentPortDef(ResourceDef):
return body
class SegmentBindingMapDefBase(ResourceDef):
@property
def path_ids(self):
return ('tenant', 'segment_id', 'map_id')
def path_defs(self):
return (TenantDef, SegmentDef)
class SegmentSecProfilesBindingMapDef(SegmentBindingMapDefBase):
@property
def path_pattern(self):
return (SEGMENTS_PATH_PATTERN +
"%s/segment-security-profile-binding-maps/")
@staticmethod
def resource_type():
return 'SegmentSecurityProfileBindingMap'
def get_obj_dict(self):
body = super(SegmentSecProfilesBindingMapDef, self).get_obj_dict()
if self.has_attr('segment_security_profile_id'):
path = None
if self.get_attr('segment_security_profile_id'):
profile = SegmentSecurityProfileDef(
profile_id=self.get_attr('segment_security_profile_id'),
tenant=self.get_tenant())
path = profile.get_resource_full_path()
self._set_attr_if_specified(
body, 'segment_security_profile_id',
body_attr='segment_security_profile_path',
value=path)
if self.has_attr('spoofguard_profile_id'):
path = None
if self.get_attr('spoofguard_profile_id'):
profile = SpoofguardProfileDef(
profile_id=self.get_attr('spoofguard_profile_id'),
tenant=self.get_tenant())
path = profile.get_resource_full_path()
self._set_attr_if_specified(
body, 'spoofguard_profile_id',
body_attr='spoofguard_profile_path',
value=path)
return body
class SegmentPortBindingMapDefBase(ResourceDef):
@property

View File

@ -2040,6 +2040,74 @@ class NsxPolicySegmentPortApi(NsxPolicyResourceBase):
admin_state=admin_state)
class SegmentProfilesBindingMapBaseApi(NsxPolicyResourceBase):
def delete(self, segment_id, map_id=DEFAULT_MAP_ID,
tenant=constants.POLICY_INFRA_TENANT):
map_def = self.entry_def(segment_id=segment_id,
map_id=map_id,
tenant=tenant)
self.policy_api.delete(map_def)
def get(self, segment_id, map_id=DEFAULT_MAP_ID,
tenant=constants.POLICY_INFRA_TENANT):
map_def = self.entry_def(segment_id=segment_id,
map_id=map_id,
tenant=tenant)
return self.policy_api.get(map_def)
def list(self, segment_id,
tenant=constants.POLICY_INFRA_TENANT):
map_def = self.entry_def(segment_id=segment_id,
tenant=tenant)
return self._list(map_def)
class SegmentSecurityProfilesBindingMapApi(SegmentProfilesBindingMapBaseApi):
@property
def entry_def(self):
return core_defs.SegmentSecProfilesBindingMapDef
def create_or_overwrite(self, name, segment_id,
map_id=DEFAULT_MAP_ID,
description=IGNORE,
segment_security_profile_id=IGNORE,
spoofguard_profile_id=IGNORE,
tags=IGNORE,
tenant=constants.POLICY_INFRA_TENANT):
map_id = self._init_obj_uuid(map_id)
map_def = self._init_def(
segment_id=segment_id,
map_id=map_id,
name=name,
description=description,
segment_security_profile_id=segment_security_profile_id,
spoofguard_profile_id=spoofguard_profile_id,
tags=tags,
tenant=tenant)
self._create_or_store(map_def)
return map_id
def update(self, segment_id,
map_id=DEFAULT_MAP_ID,
name=IGNORE,
description=IGNORE,
segment_security_profile_id=IGNORE,
spoofguard_profile_id=IGNORE,
tags=IGNORE,
tenant=constants.POLICY_INFRA_TENANT):
self._update(
segment_id=segment_id,
map_id=map_id,
name=name,
description=description,
segment_security_profile_id=segment_security_profile_id,
spoofguard_profile_id=spoofguard_profile_id,
tags=tags,
tenant=tenant)
class SegmentPortProfilesBindingMapBaseApi(NsxPolicyResourceBase):
def delete(self, segment_id, port_id, map_id=DEFAULT_MAP_ID,