initial vpn ipsec resources

Supporting new vpn ipsec objects, which were added at NSX 2.2
- IkeProfile
- IPSecTunnelProfile
- IPSecDpdProfile
- IPSecPeerEndpoint
- IPSecLocalEndpoint
- VPN service
- VPN Session with rules (policy based only)

Change-Id: I48139f9f0cc7e1b998efcf6fc7f50ac8e596bc6c
This commit is contained in:
Adit Sarfaty 2017-11-07 13:04:21 +02:00
parent 094cb0bcb4
commit bc57bc7037
4 changed files with 802 additions and 0 deletions

View File

@ -449,3 +449,40 @@ FAKE_CERT_PEM = (
"Nd3coEUMwd16vr57QJatJbTo/wVMMbvW3vqVy0AuXReHCPVTDF5+vnsMGXK/IV7w\n"
"LzulLswFmA==\n"
"-----END CERTIFICATE-----\n")
FAKE_DPD_ID = "c933402b-f111-4634-9d66-cc8fffde0f65"
FAKE_DPD = {
"resource_type": "IPSecVPNDPDProfile",
"description": "neutron dpd profile",
"id": FAKE_DPD_ID,
"display_name": "con1-dpd-profile",
"enabled": True,
"timeout": 120,
}
FAKE_PEP_ID = "a7b2915c-2041-4a33-9ea7-9d22b67bf38e"
FAKE_PEP = {
"resource_type": "IPSecVPNPeerEndpoint",
"id": FAKE_PEP_ID,
"display_name": "con1",
"connection_initiation_mode": "INITIATOR",
"authentication_mode": "PSK",
"ipsec_tunnel_profile_id": "76e3707d-22e5-4e36-a9ef-b568215e2481",
"dpd_profile_id": "04191f5f-3bdd-4ec1-ae56-154b06778d4f",
"ike_profile_id": "df386534-5cec-49b4-9c21-4c212cba3cbf",
"peer_address": "172.24.4.233",
"peer_id": "172.24.4.233"
}
FAKE_LEP_ID = "cb57de72-4adb-4dad-9abc-685f9f1d0265"
FAKE_LEP = {
"resource_type": "IPSecVPNLocalEndpoint",
"description": "XXX",
"id": FAKE_LEP_ID,
"display_name": "XXX",
"local_id": "1.1.1.1",
"ipsec_vpn_service_id": "aca38a11-981b-46d8-9e2c-9bedc0d96794",
"local_address": "1.1.1.1",
"trust_ca_ids": [],
"trust_crl_ids": [],
}

View File

@ -0,0 +1,311 @@
# Copyright 2017 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_serialization import jsonutils
from vmware_nsxlib.tests.unit.v3 import test_client
from vmware_nsxlib.tests.unit.v3 import test_constants
from vmware_nsxlib.tests.unit.v3 import test_resources
from vmware_nsxlib.v3 import vpn_ipsec
class TestIkeProfile(test_resources.BaseTestResource):
def setUp(self):
super(TestIkeProfile, self).setUp(
vpn_ipsec.IkeProfile)
def test_ike_profile_create(self):
mocked_resource = self.get_mocked_resource()
name = 'ike_profile'
description = 'desc'
enc_alg = vpn_ipsec.EncryptionAlgorithmTypes.ENCRYPTION_ALGORITHM_128
dig_alg = vpn_ipsec.DigestAlgorithmTypes.DIGEST_ALGORITHM_SHA1
ike_ver = vpn_ipsec.IkeVersionTypes.IKE_VERSION_V1
dh_group = vpn_ipsec.DHGroupTypes.DH_GROUP_14
lifetime = 100
mocked_resource.create(name, description=description,
encryption_algorithm=enc_alg,
digest_algorithm=dig_alg,
ike_version=ike_ver,
pfs=True,
dh_group=dh_group,
sa_life_time=lifetime)
test_client.assert_json_call(
'post', mocked_resource,
'https://1.2.3.4/api/v1/%s' % mocked_resource.uri_segment,
data=jsonutils.dumps({
'display_name': name,
'description': description,
'encryption_algorithms': [enc_alg],
'digest_algorithms': [dig_alg],
'ike_version': ike_ver,
'enable_perfect_forward_secrecy': True,
'dh_groups': [dh_group],
'sa_life_time': {'unit': 'SEC', 'value': lifetime}
}, sort_keys=True),
headers=self.default_headers())
class TestIPSecTunnelProfile(test_resources.BaseTestResource):
def setUp(self):
super(TestIPSecTunnelProfile, self).setUp(
vpn_ipsec.IPSecTunnelProfile)
def test_ipsec_profile_create(self):
mocked_resource = self.get_mocked_resource()
name = 'ipsec_profile'
description = 'desc'
enc_alg = vpn_ipsec.EncryptionAlgorithmTypes.ENCRYPTION_ALGORITHM_128
dig_alg = vpn_ipsec.DigestAlgorithmTypes.DIGEST_ALGORITHM_SHA1
dh_group = vpn_ipsec.DHGroupTypes.DH_GROUP_14
lifetime = 100
mocked_resource.create(name, description=description,
encryption_algorithm=enc_alg,
digest_algorithm=dig_alg,
pfs=True,
dh_group=dh_group,
sa_life_time=lifetime)
test_client.assert_json_call(
'post', mocked_resource,
'https://1.2.3.4/api/v1/%s' % mocked_resource.uri_segment,
data=jsonutils.dumps({
'display_name': name,
'description': description,
'encryption_algorithms': [enc_alg],
'digest_algorithms': [dig_alg],
'enable_perfect_forward_secrecy': True,
'dh_groups': [dh_group],
'sa_life_time': {'unit': 'SEC', 'value': lifetime}
}, sort_keys=True),
headers=self.default_headers())
class TestIPSecDpdProfile(test_resources.BaseTestResource):
def setUp(self):
super(TestIPSecDpdProfile, self).setUp(
vpn_ipsec.IPSecDpdProfile)
def test_dpd_profile_create(self):
mocked_resource = self.get_mocked_resource()
name = 'dpd_profile'
description = 'desc'
timeout = 100
enabled = True
mocked_resource.create(name, description=description,
timeout=timeout,
enabled=enabled)
test_client.assert_json_call(
'post', mocked_resource,
'https://1.2.3.4/api/v1/%s' % mocked_resource.uri_segment,
data=jsonutils.dumps({
'display_name': name,
'description': description,
'timeout': timeout,
'enabled': enabled
}, sort_keys=True),
headers=self.default_headers())
def test_dpd_profile_update(self):
fake_dpd = test_constants.FAKE_DPD.copy()
new_timeout = 1000
uuid = test_constants.FAKE_DPD_ID
mocked_resource = self.get_mocked_resource(response=fake_dpd)
mocked_resource.update(uuid, timeout=new_timeout)
fake_dpd['timeout'] = new_timeout
test_client.assert_json_call(
'put', mocked_resource,
'https://1.2.3.4/api/v1/%s/%s' % (mocked_resource.uri_segment,
uuid),
data=jsonutils.dumps(fake_dpd, sort_keys=True),
headers=self.default_headers())
class TestIPSecPeerEndpoint(test_resources.BaseTestResource):
def setUp(self):
super(TestIPSecPeerEndpoint, self).setUp(
vpn_ipsec.IPSecPeerEndpoint)
def test_peer_endpoint_create(self):
mocked_resource = self.get_mocked_resource()
name = 'peerep'
description = 'desc'
peer_address = peer_id = '1.1.1.1'
authentication_mode = 'PSK'
dpd_profile_id = 'uuid1'
ike_profile_id = 'uuid2'
ipsec_profile_id = 'uuid3'
initiation_mode = 'INITIATOR'
psk = 'secret'
mocked_resource.create(name, peer_address, peer_id,
description=description,
authentication_mode=authentication_mode,
dpd_profile_id=dpd_profile_id,
ike_profile_id=ike_profile_id,
ipsec_tunnel_profile_id=ipsec_profile_id,
connection_initiation_mode=initiation_mode,
psk=psk)
test_client.assert_json_call(
'post', mocked_resource,
'https://1.2.3.4/api/v1/%s' % mocked_resource.uri_segment,
data=jsonutils.dumps({
'display_name': name,
'peer_address': peer_address,
'peer_id': peer_id,
'description': description,
'authentication_mode': authentication_mode,
'dpd_profile_id': dpd_profile_id,
'ike_profile_id': ike_profile_id,
'ipsec_tunnel_profile_id': ipsec_profile_id,
'connection_initiation_mode': initiation_mode,
'psk': psk
}, sort_keys=True),
headers=self.default_headers())
def test_peer_endpoint_update(self):
fake_pep = test_constants.FAKE_PEP.copy()
new_desc = 'updated'
new_name = 'new'
new_psk = 'psk12'
uuid = test_constants.FAKE_PEP_ID
mocked_resource = self.get_mocked_resource(response=fake_pep)
mocked_resource.update(uuid, name=new_name, description=new_desc,
psk=new_psk)
fake_pep['description'] = new_desc
fake_pep['display_name'] = new_name
fake_pep['psk'] = new_psk
test_client.assert_json_call(
'put', mocked_resource,
'https://1.2.3.4/api/v1/%s/%s' % (mocked_resource.uri_segment,
uuid),
data=jsonutils.dumps(fake_pep, sort_keys=True),
headers=self.default_headers())
class TestLocalEndpoint(test_resources.BaseTestResource):
def setUp(self):
super(TestLocalEndpoint, self).setUp(
vpn_ipsec.LocalEndpoint)
def test_local_endpoint_create(self):
mocked_resource = self.get_mocked_resource()
name = 'localep'
description = 'desc'
local_address = local_id = '1.1.1.1'
ipsec_vpn_service_id = 'uuid1'
mocked_resource.create(name, local_address, ipsec_vpn_service_id,
description=description,
local_id=local_id)
test_client.assert_json_call(
'post', mocked_resource,
'https://1.2.3.4/api/v1/%s' % mocked_resource.uri_segment,
data=jsonutils.dumps({
'display_name': name,
'local_address': local_address,
'local_id': local_id,
'description': description,
'ipsec_vpn_service_id': ipsec_vpn_service_id
}, sort_keys=True),
headers=self.default_headers())
def test_local_endpoint_update(self):
fake_pep = test_constants.FAKE_LEP.copy()
new_desc = 'updated'
new_name = 'new'
new_addr = '2.2.2.2'
uuid = test_constants.FAKE_LEP_ID
mocked_resource = self.get_mocked_resource(response=fake_pep)
mocked_resource.update(uuid, name=new_name, description=new_desc,
local_address=new_addr,
local_id=new_addr)
fake_pep['description'] = new_desc
fake_pep['display_name'] = new_name
fake_pep['local_address'] = new_addr
fake_pep['local_id'] = new_addr
test_client.assert_json_call(
'put', mocked_resource,
'https://1.2.3.4/api/v1/%s/%s' % (mocked_resource.uri_segment,
uuid),
data=jsonutils.dumps(fake_pep, sort_keys=True),
headers=self.default_headers())
class TestSession(test_resources.BaseTestResource):
def setUp(self):
super(TestSession, self).setUp(
vpn_ipsec.Session)
def test_session_create(self):
mocked_resource = self.get_mocked_resource()
name = 'session'
description = 'desc'
local_ep_id = 'uuid1'
peer_ep_id = 'uuid2'
policy_rules = []
mocked_resource.create(name, local_ep_id, peer_ep_id, policy_rules,
description=description)
test_client.assert_json_call(
'post', mocked_resource,
'https://1.2.3.4/api/v1/%s' % mocked_resource.uri_segment,
data=jsonutils.dumps({
'display_name': name,
'description': description,
'local_endpoint_id': local_ep_id,
'peer_endpoint_id': peer_ep_id,
'enabled': True,
'resource_type': mocked_resource.resource_type,
'policy_rules': policy_rules,
}, sort_keys=True),
headers=self.default_headers())
# TODO(asarfaty): add tests for update & rules
class TestService(test_resources.BaseTestResource):
def setUp(self):
super(TestService, self).setUp(
vpn_ipsec.Service)
def test_service_create(self):
mocked_resource = self.get_mocked_resource()
router_id = 'abcd'
enabled = True
log_level = "DEBUG"
name = 'service'
mocked_resource.create(name, router_id, ike_log_level=log_level,
enabled=enabled)
test_client.assert_json_call(
'post', mocked_resource,
'https://1.2.3.4/api/v1/%s' % mocked_resource.uri_segment,
data=jsonutils.dumps({
'display_name': name,
'logical_router_id': {'target_id': router_id},
'ike_log_level': log_level,
'enabled': enabled
}, sort_keys=True),
headers=self.default_headers())

View File

@ -34,6 +34,7 @@ from vmware_nsxlib.v3 import router
from vmware_nsxlib.v3 import security
from vmware_nsxlib.v3 import trust_management
from vmware_nsxlib.v3 import utils
from vmware_nsxlib.v3 import vpn_ipsec
LOG = log.getLogger(__name__)
@ -272,6 +273,9 @@ class NsxLib(NsxLibBase):
self.client, self.nsxlib_config, nsxlib=self)
self.vif = core_resources.NsxLibFabricVirtualInterface(
self.client, self.nsxlib_config, nsxlib=self)
self.vpn_ipsec = vpn_ipsec.VpnIpSec(
self.client, self.nsxlib_config, nsxlib=self)
# Update tag limits
self.tag_limits = self.get_tag_limits()
utils.update_tag_limits(self.tag_limits)

View File

@ -0,0 +1,450 @@
# Copyright 2017 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from vmware_nsxlib.v3 import utils
LOG = logging.getLogger(__name__)
VPN_IPSEC_PATH = 'vpn/ipsec/'
# TODO(asarfaty) Add update for tags
class IkeVersionTypes(object):
"""Supported IKE versions (NSX default is V2)"""
IKE_VERSION_V1 = 'IKE_V1'
IKE_VERSION_V2 = 'IKE_V2'
IKE_VERSION_Flex = 'IKE_FLEX'
class EncryptionAlgorithmTypes(object):
"""Supported encryption algorithms (NSX default is GCM)"""
ENCRYPTION_ALGORITHM_128 = 'AES128'
ENCRYPTION_ALGORITHM_256 = 'AES256'
ENCRYPTION_ALGORITHM_GCM = 'AESGCM'
class DigestAlgorithmTypes(object):
"""Supported digest (auth) algorithms (NSX default is None)"""
DIGEST_ALGORITHM_SHA1 = 'SHA1'
DIGEST_ALGORITHM_SHA256 = 'SHA256'
DIGEST_ALGORITHM_GMAC_128 = 'GMAC_128'
DIGEST_ALGORITHM_GMAC_256 = 'GMAC_256'
class DHGroupTypes(object):
"""Supported DH groups for Perfect Forward Secrecy"""
DH_GROUP_2 = 'GROUP2'
DH_GROUP_5 = 'GROUP5'
DH_GROUP_14 = 'GROUP14'
DH_GROUP_15 = 'GROUP15'
DH_GROUP_16 = 'GROUP16'
class EncapsulationModeTypes(object):
"""Supported encapsulation modes for ipsec tunnel profile"""
ENCAPSULATION_MODE_TUNNEL = 'TUNNEL_MODE'
class TransformProtocolTypes(object):
"""Supported transform protocols for ipsec tunnel profile"""
TRANSFORM_PROTOCOL_ESP = 'ESP'
class AuthenticationModeTypes(object):
"""Supported authentication modes for ipsec peer endpoint (default PSK)"""
AUTH_MODE_PSK = 'PSK'
AUTH_MODE_CERT = 'CERTIFICATE'
class DpdProfileActionTypes(object):
"""Supported DPD profile actions"""
DPD_PROFILE_ACTION_HOLD = 'HOLD'
class DpdProfileTimeoutLimits(object):
"""Supported DPD timeout range"""
DPD_TIMEOUT_MIN = 10
DPD_TIMEOUT_MAX = 3600
class SALifetimeLimits(object):
"""Limits to the allowed SA lifetime in seconds"""
SA_LIFETIME_MIN = 90
SA_LIFETIME_MAX = 365 * 24 * 3600
class ConnectionInitiationModeTypes(object):
"""Supported connection initiation mode type"""
INITIATION_MODE_INITIATOR = 'INITIATOR'
INITIATION_MODE_RESPOND_ONLY = 'RESPOND_ONLY'
class IkeLogLevelTypes(object):
"""Supported service IKE log levels (default ERROR)"""
LOG_LEVEL_DEBUG = 'DEBUG'
LOG_LEVEL_INFO = 'INFO'
LOG_LEVEL_WARN = 'WARN'
LOG_LEVEL_ERROR = 'ERROR'
class PolicyRuleActionTypes(object):
POLICY_RULE_ACTION_BYPASS = 'BYPASS'
POLICY_RULE_ACTION_PROTECT = 'PROTECT'
class IkeProfile(utils.NsxLibApiBase):
@property
def resource_type(self):
return 'IPSecVPNIKEProfile'
@property
def uri_segment(self):
return VPN_IPSEC_PATH + 'ike-profiles'
def create(self, name, description=None,
encryption_algorithm=None,
digest_algorithm=None,
ike_version=None,
pfs=None,
dh_group=None,
sa_life_time=None,
tags=None):
# mandatory parameters
body = {'display_name': name}
# optional parameters
if description:
body['description'] = description
if encryption_algorithm:
body['encryption_algorithms'] = [encryption_algorithm]
if digest_algorithm:
body['digest_algorithms'] = [digest_algorithm]
if ike_version:
body['ike_version'] = ike_version
if sa_life_time:
body['sa_life_time'] = {'unit': 'SEC', 'value': sa_life_time}
if dh_group:
body['dh_groups'] = [dh_group]
if tags:
body['tags'] = tags
# Boolean parameters
if pfs is not None:
body['enable_perfect_forward_secrecy'] = pfs
return self.client.create(self.get_path(), body=body)
class IPSecTunnelProfile(utils.NsxLibApiBase):
@property
def resource_type(self):
return 'IPSecVPNTunnelProfile'
@property
def uri_segment(self):
return VPN_IPSEC_PATH + 'tunnel-profiles'
def create(self, name, description=None,
encryption_algorithm=None,
digest_algorithm=None,
pfs=None,
dh_group=None,
sa_life_time=None,
tags=None):
# mandatory parameters
body = {'display_name': name}
# optional parameters
if description:
body['description'] = description
if encryption_algorithm:
body['encryption_algorithms'] = [encryption_algorithm]
if digest_algorithm:
body['digest_algorithms'] = [digest_algorithm]
if sa_life_time:
body['sa_life_time'] = {'unit': 'SEC', 'value': sa_life_time}
if dh_group:
body['dh_groups'] = [dh_group]
if tags:
body['tags'] = tags
# Boolean parameters
if pfs is not None:
body['enable_perfect_forward_secrecy'] = pfs
return self.client.create(self.get_path(), body=body)
class IPSecDpdProfile(utils.NsxLibApiBase):
@property
def resource_type(self):
return 'IPSecVPNDPDProfile'
@property
def uri_segment(self):
return VPN_IPSEC_PATH + 'dpd-profiles'
def create(self, name, description=None,
enabled=None,
timeout=None,
tags=None):
# mandatory parameters
body = {'display_name': name}
# optional parameters
if description:
body['description'] = description
if timeout:
body['timeout'] = timeout
# Boolean parameters
if enabled is not None:
body['enabled'] = enabled
if tags:
body['tags'] = tags
return self.client.create(self.get_path(), body=body)
def update(self, profile_id, enabled=None,
timeout=None, tags=None):
body = self.get(profile_id)
if timeout:
body['timeout'] = timeout
if enabled is not None:
body['enabled'] = enabled
if tags is not None:
body['tags'] = tags
return self.client.update(self.get_path(profile_id), body=body)
class IPSecPeerEndpoint(utils.NsxLibApiBase):
@property
def resource_type(self):
return 'IPSecVPNPeerEndpoint'
@property
def uri_segment(self):
return VPN_IPSEC_PATH + 'peer-endpoints'
def create(self, name, peer_address, peer_id,
description=None,
authentication_mode=None,
dpd_profile_id=None,
ike_profile_id=None,
ipsec_tunnel_profile_id=None,
connection_initiation_mode=None,
psk=None, tags=None):
# mandatory parameters
body = {'display_name': name,
'peer_address': peer_address,
'peer_id': peer_id}
# optional parameters
if description:
body['description'] = description
if authentication_mode:
body['authentication_mode'] = authentication_mode
if dpd_profile_id:
body['dpd_profile_id'] = dpd_profile_id
if ike_profile_id:
body['ike_profile_id'] = ike_profile_id
if ipsec_tunnel_profile_id:
body['ipsec_tunnel_profile_id'] = ipsec_tunnel_profile_id
if psk:
body['psk'] = psk
if connection_initiation_mode:
body['connection_initiation_mode'] = connection_initiation_mode
if tags:
body['tags'] = tags
return self.client.create(self.get_path(), body=body)
def update(self, uuid, name=None, description=None, peer_address=None,
peer_id=None, connection_initiation_mode=None, psk=None,
tags=None):
body = self.get(uuid)
if description:
body['description'] = description
if name:
body['display_name'] = name
if psk:
body['psk'] = psk
if connection_initiation_mode:
body['connection_initiation_mode'] = connection_initiation_mode
if peer_address:
body['peer_address'] = peer_address
if peer_id:
body['peer_id'] = peer_id
if tags is not None:
body['tags'] = tags
return self.client.update(self.get_path(uuid), body=body)
class LocalEndpoint(utils.NsxLibApiBase):
@property
def resource_type(self):
return 'IPSecVPNLocalEndpoint'
@property
def uri_segment(self):
return VPN_IPSEC_PATH + 'local-endpoints'
def create(self, name, local_address, ipsec_vpn_service_id,
description=None,
local_id=None,
certificate_id=None,
trust_ca_ids=None,
trust_crl_ids=None,
tags=None):
# mandatory parameters
body = {'display_name': name,
'local_address': local_address,
'ipsec_vpn_service_id': ipsec_vpn_service_id}
# optional parameters
if description:
body['description'] = description
if local_id:
body['local_id'] = local_id
if certificate_id:
body['certificate_id'] = certificate_id
if trust_ca_ids:
body['trust_ca_ids'] = trust_ca_ids
if trust_crl_ids:
body['trust_crl_ids'] = trust_crl_ids
if tags:
body['tags'] = tags
return self.client.create(self.get_path(), body=body)
def update(self, uuid, name=None, description=None, local_address=None,
ipsec_vpn_service_id=None, local_id=None,
certificate_id=None,
trust_ca_ids=None,
trust_crl_ids=None,
tags=None):
body = self.get(uuid)
if description:
body['description'] = description
if name:
body['display_name'] = name
if local_address:
body['local_address'] = local_address
if ipsec_vpn_service_id:
body['ipsec_vpn_service_id'] = ipsec_vpn_service_id
if local_id:
body['local_id'] = local_id
if certificate_id:
body['certificate_id'] = certificate_id
if trust_ca_ids:
body['trust_ca_ids'] = trust_ca_ids
if trust_crl_ids:
body['trust_crl_ids'] = trust_crl_ids
if tags is not None:
body['tags'] = tags
return self.client.update(self.get_path(uuid), body=body)
class Session(utils.NsxLibApiBase):
@property
def resource_type(self):
return 'PolicyBasedIPSecVPNSession'
@property
def uri_segment(self):
return VPN_IPSEC_PATH + 'sessions'
def create(self, name, local_endpoint_id, peer_endpoint_id,
policy_rules, description=None,
enabled=True, tags=None):
# mandatory parameters
body = {'display_name': name,
'description': description,
'local_endpoint_id': local_endpoint_id,
'peer_endpoint_id': peer_endpoint_id,
'enabled': enabled,
'resource_type': self.resource_type,
'policy_rules': policy_rules}
if tags:
body['tags'] = tags
return self.client.create(self.get_path(), body=body)
def get_rule_obj(self, sources, destinations,
action=PolicyRuleActionTypes.POLICY_RULE_ACTION_PROTECT):
src_subnets = [{'subnet': src} for src in sources]
dst_subnets = [{'subnet': dst} for dst in destinations]
return {
'sources': src_subnets,
'destinations': dst_subnets,
'action': action
}
def update(self, uuid, name=None, description=None, policy_rules=None,
tags=None):
body = self.get(uuid)
if description:
body['description'] = description
if name:
body['display_name'] = name
if name:
body['display_name'] = name
if policy_rules is not None:
body['policy_rules'] = policy_rules
return self.client.update(self.get_path(uuid), body=body)
class Service(utils.NsxLibApiBase):
@property
def resource_type(self):
return 'IPSecVPNService'
@property
def uri_segment(self):
return VPN_IPSEC_PATH + 'services'
def create(self, name, logical_router_id,
enabled=True, ike_log_level="ERROR",
tags=None):
# mandatory parameters
body = {'display_name': name,
'logical_router_id': {'target_id': logical_router_id}}
# optional parameters
if ike_log_level:
body['ike_log_level'] = ike_log_level
if enabled is not None:
body['enabled'] = enabled
if tags:
body['tags'] = tags
return self.client.create(self.get_path(), body=body)
class VpnIpSec(object):
"""This is the class that have all vpn ipsec resource clients"""
def __init__(self, client, nsxlib_config, nsxlib=None):
self.ike_profile = IkeProfile(client, nsxlib_config, nsxlib=nsxlib)
self.tunnel_profile = IPSecTunnelProfile(client, nsxlib_config,
nsxlib=nsxlib)
self.dpd_profile = IPSecDpdProfile(client, nsxlib_config,
nsxlib=nsxlib)
self.peer_endpoint = IPSecPeerEndpoint(client, nsxlib_config,
nsxlib=nsxlib)
self.local_endpoint = LocalEndpoint(client, nsxlib_config,
nsxlib=nsxlib)
self.session = Session(client, nsxlib_config, nsxlib=nsxlib)
self.service = Service(client, nsxlib_config, nsxlib=nsxlib)