From 68993e593253ea1dda8a7aed713a11474cba85ff Mon Sep 17 00:00:00 2001 From: Peter Sabaini Date: Fri, 20 Dec 2024 17:54:28 +0100 Subject: [PATCH] SSL handling: check cert/key before attempting to set Also update some dependencies and fix tox config Change-Id: Ief48e2f67130f45b4aa83840636b11313a44bb96 Signed-off-by: Peter Sabaini --- src/ceph_dashboard_commands.py | 71 ++++++++++- src/charm.py | 10 ++ test-requirements.txt | 2 +- tox.ini | 13 +- unit_tests/test_ceph_dashboard_commands.py | 136 +++++++++++++++++++++ 5 files changed, 226 insertions(+), 6 deletions(-) create mode 100644 unit_tests/test_ceph_dashboard_commands.py diff --git a/src/ceph_dashboard_commands.py b/src/ceph_dashboard_commands.py index d047e89..6dfbf49 100644 --- a/src/ceph_dashboard_commands.py +++ b/src/ceph_dashboard_commands.py @@ -5,8 +5,10 @@ # Learn more at: https://juju.is/docs/sdk import json +import os import socket -from typing import List +import tempfile +from typing import List, Tuple from functools import partial import subprocess @@ -150,3 +152,70 @@ def check_ceph_dashboard_ssl_configured( return False return True + + +def validate_ssl_keypair(cert: bytes, key: bytes) -> Tuple[bool, str]: + """Validates if a private key matches a certificate + + Args: + cert, key (str): SSL material + + Returns: + Tuple[bool, str]: bool for validaity and err message + """ + try: + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as cert_temp: + cert_temp.write(cert) + cert_path = cert_temp.name + + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as key_temp: + key_temp.write(key) + key_path = key_temp.name + except IOError as e: + return False, f"Failed to create temporary files: {str(e)}" + + try: + # check if pubkeys from cert and key match + try: + cert_pubkey_cmd = subprocess.run( + ["openssl", "x509", "-in", cert_path, "-noout", "-pubkey"], + capture_output=True, + text=True, + check=True, + ) + cert_pubkey = cert_pubkey_cmd.stdout.strip() + except subprocess.CalledProcessError as e: + return ( + False, + f"Failed to extract pubkey from cert: {e.stderr.strip()}", + ) + + try: + key_pubkey_cmd = subprocess.run( + ["openssl", "rsa", "-in", key_path, "-pubout"], + capture_output=True, + text=True, + check=True, + ) + key_pubkey = key_pubkey_cmd.stdout.strip() + except subprocess.CalledProcessError as e: + return ( + False, + f"Failed to extract pubkey from priv key: {e.stderr.strip()}", + ) + + if cert_pubkey != key_pubkey: + return False, "Certificate and private key do not match" + + return ( + True, + "Certificate and private key match and certificate is valid", + ) + + finally: + # Best effort clean up + try: + os.unlink(cert_path) + os.unlink(key_path) + except Exception: + pass diff --git a/src/charm.py b/src/charm.py index fe8eb69..9313f5c 100755 --- a/src/charm.py +++ b/src/charm.py @@ -569,6 +569,10 @@ class CephDashboardCharm(ops_openstack.core.OSBaseCharm): def _configure_tls(self, key, cert, ca_cert, ca_cert_path) -> None: """Configure TLS using provided credentials""" + is_valid, msg = cmds.validate_ssl_keypair(cert, key) + if not is_valid: + logging.error("Invalid SSL key/cert: %s", msg) + return self.TLS_KEY_PATH.write_bytes(key) self.TLS_CERT_PATH.write_bytes(cert) if ca_cert: @@ -697,6 +701,12 @@ class CephDashboardCharm(ops_openstack.core.OSBaseCharm): "config is ignored. Remove conflicting source to proceed." ) + # Check for ssl material validity. + is_valid, msg = cmds.validate_ssl_keypair(cert, key) + if not is_valid: + return BlockedStatus( + "Invalid SSL key/cert: {}".format(msg) + ) return BlockedStatus("Unknown SSL source.") def _configure_tls_from_charm_config(self) -> None: diff --git a/test-requirements.txt b/test-requirements.txt index 90c3691..63b7249 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -3,7 +3,7 @@ # requirements management in charms via bot-control. Thank you. charm-tools>=2.4.4 coverage>=3.6 -flake8>=2.2.4,<=2.4.1 +flake8 pyflakes==2.1.1 stestr>=2.2.0 requests>=2.18.4 diff --git a/tox.ini b/tox.ini index aa5eec8..5483caa 100644 --- a/tox.ini +++ b/tox.ini @@ -18,9 +18,9 @@ skip_missing_interpreters = False # * It is necessary to declare setuptools as a dependency otherwise tox will # fail very early at not being able to load it. The version pinning is in # line with `pip.sh`. -requires = pip < 20.3 - virtualenv < 20.0 - setuptools < 50.0.0 +requires = pip + virtualenv + setuptools # NOTE: https://wiki.canonical.com/engineering/OpenStack/InstallLatestToxOnOsci minversion = 3.2.0 @@ -40,7 +40,12 @@ allowlist_externals = rename.sh ls pwd -passenv = HOME TERM CS_* OS_* TEST_* +passenv = + HOME + TERM + CS_* + OS_* + TEST_* deps = -r{toxinidir}/test-requirements.txt [testenv:py35] diff --git a/unit_tests/test_ceph_dashboard_commands.py b/unit_tests/test_ceph_dashboard_commands.py new file mode 100644 index 0000000..aa595a3 --- /dev/null +++ b/unit_tests/test_ceph_dashboard_commands.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical +# See LICENSE file for licensing details. +# +# Learn more at: https://juju.is/docs/sdk +import unittest +import subprocess +import tempfile +import os + +from ceph_dashboard_commands import validate_ssl_keypair + + +class TestSSLValidation(unittest.TestCase): + @classmethod + def setUpClass(cls): + """Generate test certificates and keys for all test cases""" + cls.valid_cert, cls.valid_key = cls._generate_cert_key_pair() + cls.another_cert, cls.another_key = cls._generate_cert_key_pair() + cls.malformed_cert = ( + b"-----BEGIN CERTIFICATE-----\nMalform\n-----END CERTIFICATE-----" + ) + cls.malformed_key = ( + b"-----BEGIN PRIVATE KEY-----\nMalform\n-----END PRIVATE KEY-----" + ) + + @staticmethod + def _generate_cert_key_pair(days=1): + """Generate a test certificate and private key pair""" + # create a key tmpfile + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as key_file: + subprocess.run( + [ + "openssl", + "genpkey", + "-algorithm", + "RSA", + "-out", + key_file.name, + ], + check=True, + capture_output=True, + ) + # openssl config file + with tempfile.NamedTemporaryFile( + mode="w", delete=False + ) as config_file: + config_content = """ + [req] + default_bits = 2048 + prompt = no + default_md = sha256 + distinguished_name = dn + x509_extensions = v3_req + + [dn] + CN = test.local + + [v3_req] + basicConstraints = CA:FALSE + keyUsage = nonRepudiation, digitalSignature, keyEncipherment + subjectAltName = @alt_names + + [alt_names] + DNS.1 = test.local + """ + config_file.write(config_content) + config_file.flush() + + # create certificate with config file + with tempfile.NamedTemporaryFile( + delete=False, mode="wb" + ) as cert_file: + subprocess.run( + [ + "openssl", + "req", + "-new", + "-x509", + "-key", + key_file.name, + "-out", + cert_file.name, + "-config", + config_file.name, + ], + check=True, + capture_output=True, + ) + with open(cert_file.name, "rb") as cert_f: + cert_content = cert_f.read() + with open(key_file.name, "rb") as key_f: + key_content = key_f.read() + + os.unlink(cert_file.name) + os.unlink(config_file.name) + os.unlink(key_file.name) + + return cert_content, key_content + + def test_valid_cert_key_pair(self): + """Test validation of a valid certificate and key pair""" + is_valid, message = validate_ssl_keypair( + self.valid_cert, self.valid_key + ) + self.assertTrue(is_valid) + + def test_mismatched_pair(self): + """Test validation with mismatched certificate and key""" + is_valid, message = validate_ssl_keypair( + self.valid_cert, self.another_key + ) + self.assertFalse(is_valid) + + def test_malformed_cert(self): + """Test validation with malformed certificate""" + is_valid, message = validate_ssl_keypair( + self.malformed_cert, self.valid_key + ) + self.assertFalse(is_valid) + + def test_malformed_key(self): + """Test validation with malformed key""" + is_valid, message = validate_ssl_keypair( + self.valid_cert, self.malformed_key + ) + self.assertFalse(is_valid) + + def test_empty_inputs(self): + """Test validation with empty inputs""" + is_valid, message = validate_ssl_keypair(b"", b"") + self.assertFalse(is_valid) + + +if __name__ == "__main__": + unittest.main()