add support for add/remove CA Certificates via cloud-config (LP: #915232)

This commit is contained in:
Scott Moser 2012-01-17 16:38:01 -05:00
commit 4ba79280a6
8 changed files with 540 additions and 6 deletions

View File

@ -19,6 +19,7 @@
in the payload parameter. (LP: #874342) in the payload parameter. (LP: #874342)
- add test case framework [Mike Milner] (LP: #890851) - add test case framework [Mike Milner] (LP: #890851)
- fix pylint warnings [Juerg Haefliger] (LP: #914739) - fix pylint warnings [Juerg Haefliger] (LP: #914739)
- add support for adding and deleting CA Certificates [Mike Milner] (LP: #915232)
0.6.2: 0.6.2:
- fix bug where update was not done unless update was explicitly set. - fix bug where update was not done unless update was explicitly set.
It would not be run if 'upgrade' or packages were set to be installed It would not be run if 'upgrade' or packages were set to be installed

View File

@ -0,0 +1,88 @@
# vi: ts=4 expandtab
#
# Author: Mike Milner <mike.milner@canonical.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3, as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
from subprocess import check_call
from cloudinit.util import (write_file, get_cfg_option_list_or_str,
delete_dir_contents)
CA_CERT_PATH = "/usr/share/ca-certificates/"
CA_CERT_FILENAME = "cloud-init-ca-certs.crt"
CA_CERT_CONFIG = "/etc/ca-certificates.conf"
CA_CERT_SYSTEM_PATH = "/etc/ssl/certs/"
def update_ca_certs():
"""
Updates the CA certificate cache on the current machine.
"""
check_call(["update-ca-certificates"])
def add_ca_certs(certs):
"""
Adds certificates to the system. To actually apply the new certificates
you must also call L{update_ca_certs}.
@param certs: A list of certificate strings.
"""
if certs:
cert_file_contents = "\n".join(certs)
cert_file_fullpath = os.path.join(CA_CERT_PATH, CA_CERT_FILENAME)
write_file(cert_file_fullpath, cert_file_contents, mode=0644)
# Append cert filename to CA_CERT_CONFIG file.
write_file(CA_CERT_CONFIG, "\n%s" % CA_CERT_FILENAME, omode="a")
def remove_default_ca_certs():
"""
Removes all default trusted CA certificates from the system. To actually
apply the change you must also call L{update_ca_certs}.
"""
delete_dir_contents(CA_CERT_PATH)
delete_dir_contents(CA_CERT_SYSTEM_PATH)
write_file(CA_CERT_CONFIG, "", mode=0644)
def handle(_name, cfg, _cloud, log, _args):
"""
Call to handle ca-cert sections in cloud-config file.
@param name: The module name "ca-cert" from cloud.cfg
@param cfg: A nested dict containing the entire cloud config contents.
@param cloud: The L{CloudInit} object in use.
@param log: Pre-initialized Python logger object to use for logging.
@param args: Any module arguments from cloud.cfg
"""
# If there isn't a ca-certs section in the configuration don't do anything
if "ca-certs" not in cfg:
return
ca_cert_cfg = cfg['ca-certs']
# If there is a remove-defaults option set to true, remove the system
# default trusted CA certs first.
if ca_cert_cfg.get("remove-defaults", False):
log.debug("removing default certificates")
remove_default_ca_certs()
# If we are given any new trusted CA certs to add, add them.
if "trusted" in ca_cert_cfg:
trusted_certs = get_cfg_option_list_or_str(ca_cert_cfg, "trusted")
if trusted_certs:
log.debug("adding %d certificates" % len(trusted_certs))
add_ca_certs(trusted_certs)
# Update the system with the new cert configuration.
update_ca_certs()

View File

@ -18,6 +18,7 @@
import yaml import yaml
import os import os
import os.path import os.path
import shutil
import errno import errno
import subprocess import subprocess
from Cheetah.Template import Template from Cheetah.Template import Template
@ -94,13 +95,24 @@ def get_cfg_option_str(yobj, key, default=None):
def get_cfg_option_list_or_str(yobj, key, default=None): def get_cfg_option_list_or_str(yobj, key, default=None):
if key not in yobj: """
Gets the C{key} config option from C{yobj} as a list of strings. If the
key is present as a single string it will be returned as a list with one
string arg.
@param yobj: The configuration object.
@param key: The configuration key to get.
@param default: The default to return if key is not found.
@return: The configuration option as a list of strings or default if key
is not found.
"""
if not key in yobj:
return default return default
if yobj[key] is None: if yobj[key] is None:
return [] return []
if isinstance(yobj[key], list): if isinstance(yobj[key], list):
return yobj[key] return yobj[key]
return([yobj[key]]) return [yobj[key]]
# get a cfg entry by its path array # get a cfg entry by its path array
@ -114,9 +126,11 @@ def get_cfg_by_path(yobj, keyp, default=None):
return(cur) return(cur)
# merge values from cand into source
# if src has a key, cand will not override
def mergedict(src, cand): def mergedict(src, cand):
"""
Merge values from C{cand} into C{src}. If C{src} has a key C{cand} will
not override. Nested dictionaries are merged recursively.
"""
if isinstance(src, dict) and isinstance(cand, dict): if isinstance(src, dict) and isinstance(cand, dict):
for k, v in cand.iteritems(): for k, v in cand.iteritems():
if k not in src: if k not in src:
@ -126,7 +140,30 @@ def mergedict(src, cand):
return src return src
def delete_dir_contents(dirname):
"""
Deletes all contents of a directory without deleting the directory itself.
@param dirname: The directory whose contents should be deleted.
"""
for node in os.listdir(dirname):
node_fullpath = os.path.join(dirname, node)
if os.path.isdir(node_fullpath):
shutil.rmtree(node_fullpath)
else:
os.unlink(node_fullpath)
def write_file(filename, content, mode=0644, omode="wb"): def write_file(filename, content, mode=0644, omode="wb"):
"""
Writes a file with the given content and sets the file mode as specified.
Resotres the SELinux context if possible.
@param filename: The full path of the file to write.
@param content: The content to write to the file.
@param mode: The filesystem mode to set on the file.
@param omode: The open mode used when opening the file (r, rb, a, etc.)
"""
try: try:
os.makedirs(os.path.dirname(filename)) os.makedirs(os.path.dirname(filename))
except OSError as e: except OSError as e:
@ -134,7 +171,7 @@ def write_file(filename, content, mode=0644, omode="wb"):
raise e raise e
f = open(filename, omode) f = open(filename, omode)
if mode != None: if mode is not None:
os.chmod(filename, mode) os.chmod(filename, mode)
f.write(content) f.write(content)
f.close() f.close()

View File

@ -9,6 +9,7 @@ cloud_init_modules:
- set_hostname - set_hostname
- update_hostname - update_hostname
- update_etc_hosts - update_etc_hosts
- ca-certs
- rsyslog - rsyslog
- ssh - ssh

View File

@ -8,6 +8,7 @@ Build-Depends: cdbs,
python-nose, python-nose,
pyflakes, pyflakes,
pylint, pylint,
python-mocker,
XS-Python-Version: all XS-Python-Version: all
Standards-Version: 3.9.1 Standards-Version: 3.9.1

View File

@ -0,0 +1,31 @@
#cloud-config
#
# This is an example file to configure an instance's trusted CA certificates
# system-wide for SSL/TLS trust establishment when the instance boots for the
# first time.
#
# Make sure that this file is valid yaml before starting instances.
# It should be passed as user-data when starting the instance.
ca-certs:
# If present and set to True, the 'remove-defaults' parameter will remove
# all the default trusted CA certificates that are normally shipped with
# Ubuntu.
# This is mainly for paranoid admins - most users will not need this
# functionality.
remove-defaults: true
# If present, the 'trusted' parameter should contain a certificate (or list
# of certificates) to add to the system as trusted CA certificates.
# Pay close attention to the YAML multiline list syntax. The example shown
# here is for a list of multiline certificates.
trusted:
- |
-----BEGIN CERTIFICATE-----
YOUR-ORGS-TRUSTED-CA-CERT-HERE
-----END CERTIFICATE-----
- |
-----BEGIN CERTIFICATE-----
YOUR-ORGS-TRUSTED-CA-CERT-HERE
-----END CERTIFICATE-----

View File

@ -0,0 +1,178 @@
from mocker import MockerTestCase
from cloudinit.util import write_file, delete_dir_contents
from cloudinit.CloudConfig.cc_ca_certs import (
handle, update_ca_certs, add_ca_certs, remove_default_ca_certs)
from logging import getLogger
class TestNoConfig(MockerTestCase):
def setUp(self):
super(TestNoConfig, self).setUp()
self.name = "ca-certs"
self.cloud_init = None
self.log = getLogger("TestNoConfig")
self.args = []
def test_no_config(self):
"""
Test that nothing is done if no ca-certs configuration is provided.
"""
config = {"unknown-key": "value"}
self.mocker.replace(write_file, passthrough=False)
self.mocker.replace(update_ca_certs, passthrough=False)
self.mocker.replay()
handle(self.name, config, self.cloud_init, self.log, self.args)
class TestConfig(MockerTestCase):
def setUp(self):
super(TestConfig, self).setUp()
self.name = "ca-certs"
self.cloud_init = None
self.log = getLogger("TestNoConfig")
self.args = []
# Mock out the functions that actually modify the system
self.mock_add = self.mocker.replace(add_ca_certs, passthrough=False)
self.mock_update = self.mocker.replace(update_ca_certs,
passthrough=False)
self.mock_remove = self.mocker.replace(remove_default_ca_certs,
passthrough=False)
# Order must be correct
self.mocker.order()
def test_no_trusted_list(self):
"""
Test that no certificates are written if the 'trusted' key is not
present.
"""
config = {"ca-certs": {}}
# No functions should be called
self.mock_update()
self.mocker.replay()
handle(self.name, config, self.cloud_init, self.log, self.args)
def test_empty_trusted_list(self):
"""Test that no certificate are written if 'trusted' list is empty"""
config = {"ca-certs": {"trusted": []}}
# No functions should be called
self.mock_update()
self.mocker.replay()
handle(self.name, config, self.cloud_init, self.log, self.args)
def test_single_trusted(self):
"""Test that a single cert gets passed to add_ca_certs"""
config = {"ca-certs": {"trusted": ["CERT1"]}}
self.mock_add(["CERT1"])
self.mock_update()
self.mocker.replay()
handle(self.name, config, self.cloud_init, self.log, self.args)
def test_multiple_trusted(self):
"""Test that multiple certs get passed to add_ca_certs"""
config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}}
self.mock_add(["CERT1", "CERT2"])
self.mock_update()
self.mocker.replay()
handle(self.name, config, self.cloud_init, self.log, self.args)
def test_remove_default_ca_certs(self):
"""Test remove_defaults works as expected"""
config = {"ca-certs": {"remove-defaults": True}}
self.mock_remove()
self.mock_update()
self.mocker.replay()
handle(self.name, config, self.cloud_init, self.log, self.args)
def test_no_remove_defaults_if_false(self):
"""Test remove_defaults is not called when config value is False"""
config = {"ca-certs": {"remove-defaults": False}}
self.mock_update()
self.mocker.replay()
handle(self.name, config, self.cloud_init, self.log, self.args)
def test_correct_order_for_remove_then_add(self):
"""Test remove_defaults is not called when config value is False"""
config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}}
self.mock_remove()
self.mock_add(["CERT1"])
self.mock_update()
self.mocker.replay()
handle(self.name, config, self.cloud_init, self.log, self.args)
class TestAddCaCerts(MockerTestCase):
def test_no_certs_in_list(self):
"""Test that no certificate are written if not provided."""
self.mocker.replace(write_file, passthrough=False)
self.mocker.replay()
add_ca_certs([])
def test_single_cert(self):
"""Test adding a single certificate to the trusted CAs"""
cert = "CERT1\nLINE2\nLINE3"
mock_write = self.mocker.replace(write_file, passthrough=False)
mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
cert, mode=0644)
mock_write("/etc/ca-certificates.conf",
"\ncloud-init-ca-certs.crt", omode="a")
self.mocker.replay()
add_ca_certs([cert])
def test_multiple_certs(self):
"""Test adding multiple certificates to the trusted CAs"""
certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"]
expected_cert_file = "\n".join(certs)
mock_write = self.mocker.replace(write_file, passthrough=False)
mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
expected_cert_file, mode=0644)
mock_write("/etc/ca-certificates.conf",
"\ncloud-init-ca-certs.crt", omode="a")
self.mocker.replay()
add_ca_certs(certs)
class TestUpdateCaCerts(MockerTestCase):
def test_commands(self):
mock_check_call = self.mocker.replace("subprocess.check_call",
passthrough=False)
mock_check_call(["update-ca-certificates"])
self.mocker.replay()
update_ca_certs()
class TestRemoveDefaultCaCerts(MockerTestCase):
def test_commands(self):
mock_delete_dir_contents = self.mocker.replace(delete_dir_contents,
passthrough=False)
mock_write = self.mocker.replace(write_file, passthrough=False)
mock_delete_dir_contents("/usr/share/ca-certificates/")
mock_delete_dir_contents("/etc/ssl/certs/")
mock_write("/etc/ca-certificates.conf", "", mode=0644)
self.mocker.replay()
remove_default_ca_certs()

View File

@ -1,15 +1,24 @@
from unittest import TestCase from unittest import TestCase
from mocker import MockerTestCase
from tempfile import mkdtemp
from shutil import rmtree
import os
import stat
from cloudinit.util import (mergedict, get_cfg_option_list_or_str, write_file,
delete_dir_contents)
from cloudinit.util import mergedict
class TestMergeDict(TestCase): class TestMergeDict(TestCase):
def test_simple_merge(self): def test_simple_merge(self):
"""Test simple non-conflict merge."""
source = {"key1": "value1"} source = {"key1": "value1"}
candidate = {"key2": "value2"} candidate = {"key2": "value2"}
result = mergedict(source, candidate) result = mergedict(source, candidate)
self.assertEqual({"key1": "value1", "key2": "value2"}, result) self.assertEqual({"key1": "value1", "key2": "value2"}, result)
def test_nested_merge(self): def test_nested_merge(self):
"""Test nested merge."""
source = {"key1": {"key1.1": "value1.1"}} source = {"key1": {"key1.1": "value1.1"}}
candidate = {"key1": {"key1.2": "value1.2"}} candidate = {"key1": {"key1.2": "value1.2"}}
result = mergedict(source, candidate) result = mergedict(source, candidate)
@ -17,37 +26,225 @@ class TestMergeDict(TestCase):
{"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result) {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result)
def test_merge_does_not_override(self): def test_merge_does_not_override(self):
"""Test that candidate doesn't override source."""
source = {"key1": "value1", "key2": "value2"} source = {"key1": "value1", "key2": "value2"}
candidate = {"key2": "value2", "key2": "NEW VALUE"} candidate = {"key2": "value2", "key2": "NEW VALUE"}
result = mergedict(source, candidate) result = mergedict(source, candidate)
self.assertEqual(source, result) self.assertEqual(source, result)
def test_empty_candidate(self): def test_empty_candidate(self):
"""Test empty candidate doesn't change source."""
source = {"key": "value"} source = {"key": "value"}
candidate = {} candidate = {}
result = mergedict(source, candidate) result = mergedict(source, candidate)
self.assertEqual(source, result) self.assertEqual(source, result)
def test_empty_source(self): def test_empty_source(self):
"""Test empty source is replaced by candidate."""
source = {} source = {}
candidate = {"key": "value"} candidate = {"key": "value"}
result = mergedict(source, candidate) result = mergedict(source, candidate)
self.assertEqual(candidate, result) self.assertEqual(candidate, result)
def test_non_dict_candidate(self): def test_non_dict_candidate(self):
"""Test non-dict candidate is discarded."""
source = {"key": "value"} source = {"key": "value"}
candidate = "not a dict" candidate = "not a dict"
result = mergedict(source, candidate) result = mergedict(source, candidate)
self.assertEqual(source, result) self.assertEqual(source, result)
def test_non_dict_source(self): def test_non_dict_source(self):
"""Test non-dict source is not modified with a dict candidate."""
source = "not a dict" source = "not a dict"
candidate = {"key": "value"} candidate = {"key": "value"}
result = mergedict(source, candidate) result = mergedict(source, candidate)
self.assertEqual(source, result) self.assertEqual(source, result)
def test_neither_dict(self): def test_neither_dict(self):
"""Test if neither candidate or source is dict source wins."""
source = "source" source = "source"
candidate = "candidate" candidate = "candidate"
result = mergedict(source, candidate) result = mergedict(source, candidate)
self.assertEqual(source, result) self.assertEqual(source, result)
class TestGetCfgOptionListOrStr(TestCase):
def test_not_found_no_default(self):
"""None is returned if key is not found and no default given."""
config = {}
result = get_cfg_option_list_or_str(config, "key")
self.assertIsNone(result)
def test_not_found_with_default(self):
"""Default is returned if key is not found."""
config = {}
result = get_cfg_option_list_or_str(config, "key", default=["DEFAULT"])
self.assertEqual(["DEFAULT"], result)
def test_found_with_default(self):
"""Default is not returned if key is found."""
config = {"key": ["value1"]}
result = get_cfg_option_list_or_str(config, "key", default=["DEFAULT"])
self.assertEqual(["value1"], result)
def test_found_convert_to_list(self):
"""Single string is converted to one element list."""
config = {"key": "value1"}
result = get_cfg_option_list_or_str(config, "key")
self.assertEqual(["value1"], result)
def test_value_is_none(self):
"""If value is None empty list is returned."""
config = {"key": None}
result = get_cfg_option_list_or_str(config, "key")
self.assertEqual([], result)
class TestWriteFile(MockerTestCase):
def setUp(self):
super(TestWriteFile, self).setUp()
# Make a temp directoy for tests to use.
self.tmp = mkdtemp(prefix="unittest_")
def tearDown(self):
super(TestWriteFile, self).tearDown()
# Clean up temp directory
rmtree(self.tmp)
def test_basic_usage(self):
"""Verify basic usage with default args."""
path = os.path.join(self.tmp, "NewFile.txt")
contents = "Hey there"
write_file(path, contents)
self.assertTrue(os.path.exists(path))
self.assertTrue(os.path.isfile(path))
with open(path) as f:
create_contents = f.read()
self.assertEqual(contents, create_contents)
file_stat = os.stat(path)
self.assertEqual(0644, stat.S_IMODE(file_stat.st_mode))
def test_dir_is_created_if_required(self):
"""Verifiy that directories are created is required."""
dirname = os.path.join(self.tmp, "subdir")
path = os.path.join(dirname, "NewFile.txt")
contents = "Hey there"
write_file(path, contents)
self.assertTrue(os.path.isdir(dirname))
self.assertTrue(os.path.isfile(path))
def test_custom_mode(self):
"""Verify custom mode works properly."""
path = os.path.join(self.tmp, "NewFile.txt")
contents = "Hey there"
write_file(path, contents, mode=0666)
self.assertTrue(os.path.exists(path))
self.assertTrue(os.path.isfile(path))
file_stat = os.stat(path)
self.assertEqual(0666, stat.S_IMODE(file_stat.st_mode))
def test_custom_omode(self):
"""Verify custom omode works properly."""
path = os.path.join(self.tmp, "NewFile.txt")
contents = "Hey there"
# Create file first with basic content
with open(path, "wb") as f:
f.write("LINE1\n")
write_file(path, contents, omode="a")
self.assertTrue(os.path.exists(path))
self.assertTrue(os.path.isfile(path))
with open(path) as f:
create_contents = f.read()
self.assertEqual("LINE1\nHey there", create_contents)
def test_restorecon_if_possible_is_called(self):
"""Make sure the restorecon_if_possible is called correctly."""
path = os.path.join(self.tmp, "NewFile.txt")
contents = "Hey there"
# Mock out the restorecon_if_possible call to test if it's called.
mock_restorecon = self.mocker.replace(
"cloudinit.util.restorecon_if_possible", passthrough=False)
mock_restorecon(path)
self.mocker.replay()
write_file(path, contents)
class TestDeleteDirContents(TestCase):
def setUp(self):
super(TestDeleteDirContents, self).setUp()
# Make a temp directoy for tests to use.
self.tmp = mkdtemp(prefix="unittest_")
def tearDown(self):
super(TestDeleteDirContents, self).tearDown()
# Clean up temp directory
rmtree(self.tmp)
def assertDirEmpty(self, dirname):
self.assertEqual([], os.listdir(dirname))
def test_does_not_delete_dir(self):
"""Ensure directory itself is not deleted."""
delete_dir_contents(self.tmp)
self.assertTrue(os.path.isdir(self.tmp))
self.assertDirEmpty(self.tmp)
def test_deletes_files(self):
"""Single file should be deleted."""
with open(os.path.join(self.tmp, "new_file.txt"), "wb") as f:
f.write("DELETE ME")
delete_dir_contents(self.tmp)
self.assertDirEmpty(self.tmp)
def test_deletes_empty_dirs(self):
"""Empty directories should be deleted."""
os.mkdir(os.path.join(self.tmp, "new_dir"))
delete_dir_contents(self.tmp)
self.assertDirEmpty(self.tmp)
def test_deletes_nested_dirs(self):
"""Nested directories should be deleted."""
os.mkdir(os.path.join(self.tmp, "new_dir"))
os.mkdir(os.path.join(self.tmp, "new_dir", "new_subdir"))
delete_dir_contents(self.tmp)
self.assertDirEmpty(self.tmp)
def test_deletes_non_empty_dirs(self):
"""Non-empty directories should be deleted."""
os.mkdir(os.path.join(self.tmp, "new_dir"))
f_name = os.path.join(self.tmp, "new_dir", "new_file.txt")
with open(f_name, "wb") as f:
f.write("DELETE ME")
delete_dir_contents(self.tmp)
self.assertDirEmpty(self.tmp)
def test_deletes_symlinks(self):
"""Symlinks should be deleted."""
file_name = os.path.join(self.tmp, "new_file.txt")
link_name = os.path.join(self.tmp, "new_file_link.txt")
with open(file_name, "wb") as f:
f.write("DELETE ME")
os.symlink(file_name, link_name)
delete_dir_contents(self.tmp)
self.assertDirEmpty(self.tmp)