SSL handling: check cert/key before attempting to set

Also update some dependencies and fix tox config

Change-Id: Ief48e2f67130f45b4aa83840636b11313a44bb96
Signed-off-by: Peter Sabaini <peter.sabaini@canonical.com>
This commit is contained in:
Peter Sabaini 2024-12-20 17:54:28 +01:00
parent aee0db62e9
commit 68993e5932
5 changed files with 226 additions and 6 deletions

View File

@ -5,8 +5,10 @@
# Learn more at: https://juju.is/docs/sdk
import json
import os
import socket
from typing import List
import tempfile
from typing import List, Tuple
from functools import partial
import subprocess
@ -150,3 +152,70 @@ def check_ceph_dashboard_ssl_configured(
return False
return True
def validate_ssl_keypair(cert: bytes, key: bytes) -> Tuple[bool, str]:
"""Validates if a private key matches a certificate
Args:
cert, key (str): SSL material
Returns:
Tuple[bool, str]: bool for validaity and err message
"""
try:
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as cert_temp:
cert_temp.write(cert)
cert_path = cert_temp.name
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as key_temp:
key_temp.write(key)
key_path = key_temp.name
except IOError as e:
return False, f"Failed to create temporary files: {str(e)}"
try:
# check if pubkeys from cert and key match
try:
cert_pubkey_cmd = subprocess.run(
["openssl", "x509", "-in", cert_path, "-noout", "-pubkey"],
capture_output=True,
text=True,
check=True,
)
cert_pubkey = cert_pubkey_cmd.stdout.strip()
except subprocess.CalledProcessError as e:
return (
False,
f"Failed to extract pubkey from cert: {e.stderr.strip()}",
)
try:
key_pubkey_cmd = subprocess.run(
["openssl", "rsa", "-in", key_path, "-pubout"],
capture_output=True,
text=True,
check=True,
)
key_pubkey = key_pubkey_cmd.stdout.strip()
except subprocess.CalledProcessError as e:
return (
False,
f"Failed to extract pubkey from priv key: {e.stderr.strip()}",
)
if cert_pubkey != key_pubkey:
return False, "Certificate and private key do not match"
return (
True,
"Certificate and private key match and certificate is valid",
)
finally:
# Best effort clean up
try:
os.unlink(cert_path)
os.unlink(key_path)
except Exception:
pass

View File

@ -569,6 +569,10 @@ class CephDashboardCharm(ops_openstack.core.OSBaseCharm):
def _configure_tls(self, key, cert, ca_cert, ca_cert_path) -> None:
"""Configure TLS using provided credentials"""
is_valid, msg = cmds.validate_ssl_keypair(cert, key)
if not is_valid:
logging.error("Invalid SSL key/cert: %s", msg)
return
self.TLS_KEY_PATH.write_bytes(key)
self.TLS_CERT_PATH.write_bytes(cert)
if ca_cert:
@ -697,6 +701,12 @@ class CephDashboardCharm(ops_openstack.core.OSBaseCharm):
"config is ignored. Remove conflicting source to proceed."
)
# Check for ssl material validity.
is_valid, msg = cmds.validate_ssl_keypair(cert, key)
if not is_valid:
return BlockedStatus(
"Invalid SSL key/cert: {}".format(msg)
)
return BlockedStatus("Unknown SSL source.")
def _configure_tls_from_charm_config(self) -> None:

View File

@ -3,7 +3,7 @@
# requirements management in charms via bot-control. Thank you.
charm-tools>=2.4.4
coverage>=3.6
flake8>=2.2.4,<=2.4.1
flake8
pyflakes==2.1.1
stestr>=2.2.0
requests>=2.18.4

13
tox.ini
View File

@ -18,9 +18,9 @@ skip_missing_interpreters = False
# * It is necessary to declare setuptools as a dependency otherwise tox will
# fail very early at not being able to load it. The version pinning is in
# line with `pip.sh`.
requires = pip < 20.3
virtualenv < 20.0
setuptools < 50.0.0
requires = pip
virtualenv
setuptools
# NOTE: https://wiki.canonical.com/engineering/OpenStack/InstallLatestToxOnOsci
minversion = 3.2.0
@ -40,7 +40,12 @@ allowlist_externals =
rename.sh
ls
pwd
passenv = HOME TERM CS_* OS_* TEST_*
passenv =
HOME
TERM
CS_*
OS_*
TEST_*
deps = -r{toxinidir}/test-requirements.txt
[testenv:py35]

View File

@ -0,0 +1,136 @@
#!/usr/bin/env python3
# Copyright 2024 Canonical
# See LICENSE file for licensing details.
#
# Learn more at: https://juju.is/docs/sdk
import unittest
import subprocess
import tempfile
import os
from ceph_dashboard_commands import validate_ssl_keypair
class TestSSLValidation(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Generate test certificates and keys for all test cases"""
cls.valid_cert, cls.valid_key = cls._generate_cert_key_pair()
cls.another_cert, cls.another_key = cls._generate_cert_key_pair()
cls.malformed_cert = (
b"-----BEGIN CERTIFICATE-----\nMalform\n-----END CERTIFICATE-----"
)
cls.malformed_key = (
b"-----BEGIN PRIVATE KEY-----\nMalform\n-----END PRIVATE KEY-----"
)
@staticmethod
def _generate_cert_key_pair(days=1):
"""Generate a test certificate and private key pair"""
# create a key tmpfile
with tempfile.NamedTemporaryFile(mode="wb", delete=False) as key_file:
subprocess.run(
[
"openssl",
"genpkey",
"-algorithm",
"RSA",
"-out",
key_file.name,
],
check=True,
capture_output=True,
)
# openssl config file
with tempfile.NamedTemporaryFile(
mode="w", delete=False
) as config_file:
config_content = """
[req]
default_bits = 2048
prompt = no
default_md = sha256
distinguished_name = dn
x509_extensions = v3_req
[dn]
CN = test.local
[v3_req]
basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
subjectAltName = @alt_names
[alt_names]
DNS.1 = test.local
"""
config_file.write(config_content)
config_file.flush()
# create certificate with config file
with tempfile.NamedTemporaryFile(
delete=False, mode="wb"
) as cert_file:
subprocess.run(
[
"openssl",
"req",
"-new",
"-x509",
"-key",
key_file.name,
"-out",
cert_file.name,
"-config",
config_file.name,
],
check=True,
capture_output=True,
)
with open(cert_file.name, "rb") as cert_f:
cert_content = cert_f.read()
with open(key_file.name, "rb") as key_f:
key_content = key_f.read()
os.unlink(cert_file.name)
os.unlink(config_file.name)
os.unlink(key_file.name)
return cert_content, key_content
def test_valid_cert_key_pair(self):
"""Test validation of a valid certificate and key pair"""
is_valid, message = validate_ssl_keypair(
self.valid_cert, self.valid_key
)
self.assertTrue(is_valid)
def test_mismatched_pair(self):
"""Test validation with mismatched certificate and key"""
is_valid, message = validate_ssl_keypair(
self.valid_cert, self.another_key
)
self.assertFalse(is_valid)
def test_malformed_cert(self):
"""Test validation with malformed certificate"""
is_valid, message = validate_ssl_keypair(
self.malformed_cert, self.valid_key
)
self.assertFalse(is_valid)
def test_malformed_key(self):
"""Test validation with malformed key"""
is_valid, message = validate_ssl_keypair(
self.valid_cert, self.malformed_key
)
self.assertFalse(is_valid)
def test_empty_inputs(self):
"""Test validation with empty inputs"""
is_valid, message = validate_ssl_keypair(b"", b"")
self.assertFalse(is_valid)
if __name__ == "__main__":
unittest.main()