Hemanth Nakkina ea5025cbd9 Use tls-certificates-interface library
Use tls-certificates-interface library from
https://charmhub.io/tls-certificates-interface/libraries/tls_certificates

Fix py3 failures due to cryptography package
clashes from zaza. Move zaza dependencies from
test-requirements to tox

Depends-On: https://review.opendev.org/c/openstack/charm-ops-sunbeam/+/865410
Change-Id: I0f211b43bd9a8fec14cccd3699181e7f9bd34542
2023-01-17 22:04:11 +05:30

1262 lines
53 KiB
Python

# Copyright 2021 Canonical Ltd.
# See LICENSE file for licensing details.
"""Library for the tls-certificates relation.
This library contains the Requires and Provides classes for handling the tls-certificates
interface.
## Getting Started
From a charm directory, fetch the library using `charmcraft`:
```shell
charmcraft fetch-lib charms.tls_certificates_interface.v1.tls_certificates
```
Add the following libraries to the charm's `requirements.txt` file:
- jsonschema
- cryptography
Add the following section to the charm's `charmcraft.yaml` file:
```yaml
parts:
charm:
build-packages:
- libffi-dev
- libssl-dev
- rustc
- cargo
```
### Provider charm
The provider charm is the charm providing certificates to another charm that requires them. In
this example, the provider charm is storing its private key using a peer relation interface called
`replicas`.
Example:
```python
from charms.tls_certificates_interface.v1.tls_certificates import (
CertificateCreationRequestEvent,
CertificateRevocationRequestEvent,
TLSCertificatesProvidesV1,
generate_private_key,
)
from ops.charm import CharmBase, InstallEvent
from ops.main import main
from ops.model import ActiveStatus, WaitingStatus
def generate_ca(private_key: bytes, subject: str) -> str:
return "whatever ca content"
def generate_certificate(ca: str, private_key: str, csr: str) -> str:
return "Whatever certificate"
class ExampleProviderCharm(CharmBase):
def __init__(self, *args):
super().__init__(*args)
self.certificates = TLSCertificatesProvidesV1(self, "certificates")
self.framework.observe(
self.certificates.on.certificate_creation_request, self._on_certificate_request
)
self.framework.observe(
self.certificates.on.certificate_revocation_request, self._on_certificate_revocation_request
)
self.framework.observe(self.on.install, self._on_install)
def _on_install(self, event: InstallEvent) -> None:
private_key_password = b"banana"
private_key = generate_private_key(password=private_key_password)
ca_certificate = generate_ca(private_key=private_key, subject="whatever")
replicas_relation = self.model.get_relation("replicas")
if not replicas_relation:
self.unit.status = WaitingStatus("Waiting for peer relation to be created")
event.defer()
return
replicas_relation.data[self.app].update(
{
"private_key_password": "banana",
"private_key": private_key.decode(),
"ca_certificate": ca_certificate,
}
)
self.unit.status = ActiveStatus()
def _on_certificate_request(self, event: CertificateCreationRequestEvent) -> None:
replicas_relation = self.model.get_relation("replicas")
if not replicas_relation:
self.unit.status = WaitingStatus("Waiting for peer relation to be created")
event.defer()
return
ca_certificate = replicas_relation.data[self.app].get("ca_certificate")
private_key = replicas_relation.data[self.app].get("private_key")
certificate = generate_certificate(
ca=ca_certificate,
private_key=private_key,
csr=event.certificate_signing_request,
)
self.certificates.set_relation_certificate(
certificate=certificate,
certificate_signing_request=event.certificate_signing_request,
ca=ca_certificate,
chain=[ca_certificate, certificate],
relation_id=event.relation_id,
)
def _on_certificate_revocation_request(self, event: CertificateRevocationRequestEvent) -> None:
# Do what you want to do with this information
pass
if __name__ == "__main__":
main(ExampleProviderCharm)
```
### Requirer charm
The requirer charm is the charm requiring certificates from another charm that provides them. In
this example, the requirer charm is storing its certificates using a peer relation interface called
`replicas`.
Example:
```python
from charms.tls_certificates_interface.v1.tls_certificates import (
CertificateAvailableEvent,
CertificateExpiringEvent,
TLSCertificatesRequiresV1,
generate_csr,
generate_private_key,
)
from ops.charm import CharmBase, RelationJoinedEvent
from ops.main import main
from ops.model import ActiveStatus, WaitingStatus
class ExampleRequirerCharm(CharmBase):
def __init__(self, *args):
super().__init__(*args)
self.cert_subject = "whatever"
self.certificates = TLSCertificatesRequiresV1(self, "certificates")
self.framework.observe(self.on.install, self._on_install)
self.framework.observe(
self.on.certificates_relation_joined, self._on_certificates_relation_joined
)
self.framework.observe(
self.certificates.on.certificate_available, self._on_certificate_available
)
self.framework.observe(
self.certificates.on.certificate_expiring, self._on_certificate_expiring
)
def _on_install(self, event) -> None:
private_key_password = b"banana"
private_key = generate_private_key(password=private_key_password)
replicas_relation = self.model.get_relation("replicas")
if not replicas_relation:
self.unit.status = WaitingStatus("Waiting for peer relation to be created")
event.defer()
return
replicas_relation.data[self.app].update(
{"private_key_password": "banana", "private_key": private_key.decode()}
)
def _on_certificates_relation_joined(self, event: RelationJoinedEvent) -> None:
replicas_relation = self.model.get_relation("replicas")
if not replicas_relation:
self.unit.status = WaitingStatus("Waiting for peer relation to be created")
event.defer()
return
private_key_password = replicas_relation.data[self.app].get("private_key_password")
private_key = replicas_relation.data[self.app].get("private_key")
csr = generate_csr(
private_key=private_key.encode(),
private_key_password=private_key_password.encode(),
subject=self.cert_subject,
)
replicas_relation.data[self.app].update({"csr": csr.decode()})
self.certificates.request_certificate_creation(certificate_signing_request=csr)
def _on_certificate_available(self, event: CertificateAvailableEvent) -> None:
replicas_relation = self.model.get_relation("replicas")
if not replicas_relation:
self.unit.status = WaitingStatus("Waiting for peer relation to be created")
event.defer()
return
replicas_relation.data[self.app].update({"certificate": event.certificate})
replicas_relation.data[self.app].update({"ca": event.ca})
replicas_relation.data[self.app].update({"chain": event.chain})
self.unit.status = ActiveStatus()
def _on_certificate_expiring(self, event: CertificateExpiringEvent) -> None:
replicas_relation = self.model.get_relation("replicas")
if not replicas_relation:
self.unit.status = WaitingStatus("Waiting for peer relation to be created")
event.defer()
return
old_csr = replicas_relation.data[self.app].get("csr")
private_key_password = replicas_relation.data[self.app].get("private_key_password")
private_key = replicas_relation.data[self.app].get("private_key")
new_csr = generate_csr(
private_key=private_key.encode(),
private_key_password=private_key_password.encode(),
subject=self.cert_subject,
)
self.certificates.request_certificate_renewal(
old_certificate_signing_request=old_csr,
new_certificate_signing_request=new_csr,
)
replicas_relation.data[self.app].update({"csr": new_csr.decode()})
if __name__ == "__main__":
main(ExampleRequirerCharm)
```
""" # noqa: D405, D410, D411, D214, D416
import copy
import json
import logging
import uuid
from datetime import datetime, timedelta
from ipaddress import IPv4Address
from typing import Dict, List, Optional
from cryptography import x509
from cryptography.hazmat._oid import ExtensionOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import pkcs12
from cryptography.x509.extensions import Extension, ExtensionNotFound
from jsonschema import exceptions, validate # type: ignore[import]
from ops.charm import CharmBase, CharmEvents, RelationChangedEvent, UpdateStatusEvent
from ops.framework import EventBase, EventSource, Handle, Object
# --- Charmhub library metadata ---

# The unique Charmhub library identifier, never change it
LIBID = "afd8c2bccf834997afce12c2706d2ede"

# Increment this major API version when introducing breaking changes
LIBAPI = 1

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 10
# JSON schema describing the requirer side of the relation databag: an object
# that must carry a "certificate_signing_requests" array, each item being an
# object with a string "certificate_signing_request". Extra keys are allowed.
REQUIRER_JSON_SCHEMA = {
    "$schema": "http://json-schema.org/draft-04/schema#",
    "$id": "https://canonical.github.io/charm-relation-interfaces/tls_certificates/v1/schemas/requirer.json", # noqa: E501
    "type": "object",
    "title": "`tls_certificates` requirer root schema",
    "description": "The `tls_certificates` root schema comprises the entire requirer databag for this interface.", # noqa: E501
    # Sample databag content; two CSR entries.
    "examples": [
        {
            "certificate_signing_requests": [
                {
                    "certificate_signing_request": "-----BEGIN CERTIFICATE REQUEST-----\\nMIICWjCCAUICAQAwFTETMBEGA1UEAwwKYmFuYW5hLmNvbTCCASIwDQYJKoZIhvcN\\nAQEBBQADggEPADCCAQoCggEBANWlx9wE6cW7Jkb4DZZDOZoEjk1eDBMJ+8R4pyKp\\nFBeHMl1SQSDt6rAWsrfL3KOGiIHqrRY0B5H6c51L8LDuVrJG0bPmyQ6rsBo3gVke\\nDSivfSLtGvHtp8lwYnIunF8r858uYmblAR0tdXQNmnQvm+6GERvURQ6sxpgZ7iLC\\npPKDoPt+4GKWL10FWf0i82FgxWC2KqRZUtNbgKETQuARLig7etBmCnh20zmynorA\\ncY7vrpTPAaeQpGLNqqYvKV9W6yWVY08V+nqARrFrjk3vSioZSu8ZJUdZ4d9++SGl\\nbH7A6e77YDkX9i/dQ3Pa/iDtWO3tXS2MvgoxX1iSWlGNOHcCAwEAAaAAMA0GCSqG\\nSIb3DQEBCwUAA4IBAQCW1fKcHessy/ZhnIwAtSLznZeZNH8LTVOzkhVd4HA7EJW+\\nKVLBx8DnN7L3V2/uPJfHiOg4Rx7fi7LkJPegl3SCqJZ0N5bQS/KvDTCyLG+9E8Y+\\n7wqCmWiXaH1devimXZvazilu4IC2dSks2D8DPWHgsOdVks9bme8J3KjdNMQudegc\\newWZZ1Dtbd+Rn7cpKU3jURMwm4fRwGxbJ7iT5fkLlPBlyM/yFEik4SmQxFYrZCQg\\n0f3v4kBefTh5yclPy5tEH+8G0LMsbbo3dJ5mPKpAShi0QEKDLd7eR1R/712lYTK4\\ndi4XaEfqERgy68O4rvb4PGlJeRGS7AmL7Ss8wfAq\\n-----END CERTIFICATE REQUEST-----\\n" # noqa: E501
                },
                {
                    "certificate_signing_request": "-----BEGIN CERTIFICATE REQUEST-----\\nMIICWjCCAUICAQAwFTETMBEGA1UEAwwKYmFuYW5hLmNvbTCCASIwDQYJKoZIhvcN\\nAQEBBQADggEPADCCAQoCggEBAMk3raaX803cHvzlBF9LC7KORT46z4VjyU5PIaMb\\nQLIDgYKFYI0n5hf2Ra4FAHvOvEmW7bjNlHORFEmvnpcU5kPMNUyKFMTaC8LGmN8z\\nUBH3aK+0+FRvY4afn9tgj5435WqOG9QdoDJ0TJkjJbJI9M70UOgL711oU7ql6HxU\\n4d2ydFK9xAHrBwziNHgNZ72L95s4gLTXf0fAHYf15mDA9U5yc+YDubCKgTXzVySQ\\nUx73VCJLfC/XkZIh559IrnRv5G9fu6BMLEuBwAz6QAO4+/XidbKWN4r2XSq5qX4n\\n6EPQQWP8/nd4myq1kbg6Q8w68L/0YdfjCmbyf2TuoWeImdUCAwEAAaAAMA0GCSqG\\nSIb3DQEBCwUAA4IBAQBIdwraBvpYo/rl5MH1+1Um6HRg4gOdQPY5WcJy9B9tgzJz\\nittRSlRGTnhyIo6fHgq9KHrmUthNe8mMTDailKFeaqkVNVvk7l0d1/B90Kz6OfmD\\nxN0qjW53oP7y3QB5FFBM8DjqjmUnz5UePKoX4AKkDyrKWxMwGX5RoET8c/y0y9jp\\nvSq3Wh5UpaZdWbe1oVY8CqMVUEVQL2DPjtopxXFz2qACwsXkQZxWmjvZnRiP8nP8\\nbdFaEuh9Q6rZ2QdZDEtrU4AodPU3NaukFr5KlTUQt3w/cl+5//zils6G5zUWJ2pN\\ng7+t9PTvXHRkH+LnwaVnmsBFU2e05qADQbfIn7JA\\n-----END CERTIFICATE REQUEST-----\\n" # noqa: E501
                },
            ]
        }
    ],
    "properties": {
        "certificate_signing_requests": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {"certificate_signing_request": {"type": "string"}},
                "required": ["certificate_signing_request"],
            },
        }
    },
    "required": ["certificate_signing_requests"],
    "additionalProperties": True,
}
# JSON schema describing the provider side of the relation databag: an object
# that must carry a "certificates" array, each item requiring the string fields
# "certificate_signing_request", "certificate" and "ca" plus a "chain" array of
# strings. Extra keys are allowed at both levels.
PROVIDER_JSON_SCHEMA = {
    "$schema": "http://json-schema.org/draft-04/schema#",
    "$id": "https://canonical.github.io/charm-relation-interfaces/tls_certificates/v1/schemas/provider.json", # noqa: E501
    "type": "object",
    "title": "`tls_certificates` provider root schema",
    "description": "The `tls_certificates` root schema comprises the entire provider databag for this interface.", # noqa: E501
    # NOTE(review): this key is "example" while REQUIRER_JSON_SCHEMA uses
    # "examples" - confirm which spelling is intended.
    "example": [
        {
            "certificates": [
                {
                    "ca": "-----BEGIN CERTIFICATE-----\\nMIIDJTCCAg2gAwIBAgIUMsSK+4FGCjW6sL/EXMSxColmKw8wDQYJKoZIhvcNAQEL\\nBQAwIDELMAkGA1UEBhMCVVMxETAPBgNVBAMMCHdoYXRldmVyMB4XDTIyMDcyOTIx\\nMTgyN1oXDTIzMDcyOTIxMTgyN1owIDELMAkGA1UEBhMCVVMxETAPBgNVBAMMCHdo\\nYXRldmVyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA55N9DkgFWbJ/\\naqcdQhso7n1kFvt6j/fL1tJBvRubkiFMQJnZFtekfalN6FfRtA3jq+nx8o49e+7t\\nLCKT0xQ+wufXfOnxv6/if6HMhHTiCNPOCeztUgQ2+dfNwRhYYgB1P93wkUVjwudK\\n13qHTTZ6NtEF6EzOqhOCe6zxq6wrr422+ZqCvcggeQ5tW9xSd/8O1vNID/0MTKpy\\nET3drDtBfHmiUEIBR3T3tcy6QsIe4Rz/2sDinAcM3j7sG8uY6drh8jY3PWar9til\\nv2l4qDYSU8Qm5856AB1FVZRLRJkLxZYZNgreShAIYgEd0mcyI2EO/UvKxsIcxsXc\\nd45GhGpKkwIDAQABo1cwVTAfBgNVHQ4EGAQWBBRXBrXKh3p/aFdQjUcT/UcvICBL\\nODAhBgNVHSMEGjAYgBYEFFcGtcqHen9oV1CNRxP9Ry8gIEs4MA8GA1UdEwEB/wQF\\nMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAGmCEvcoFUrT9e133SHkgF/ZAgzeIziO\\nBjfAdU4fvAVTVfzaPm0yBnGqzcHyacCzbZjKQpaKVgc5e6IaqAQtf6cZJSCiJGhS\\nJYeosWrj3dahLOUAMrXRr8G/Ybcacoqc+osKaRa2p71cC3V6u2VvcHRV7HDFGJU7\\noijbdB+WhqET6Txe67rxZCJG9Ez3EOejBJBl2PJPpy7m1Ml4RR+E8YHNzB0lcBzc\\nEoiJKlDfKSO14E2CPDonnUoWBJWjEvJys3tbvKzsRj2fnLilytPFU0gH3cEjCopi\\nzFoWRdaRuNHYCqlBmso1JFDl8h4fMmglxGNKnKRar0WeGyxb4xXBGpI=\\n-----END CERTIFICATE-----\\n", # noqa: E501
                    "chain": [
                        "-----BEGIN CERTIFICATE-----\\nMIIDJTCCAg2gAwIBAgIUMsSK+4FGCjW6sL/EXMSxColmKw8wDQYJKoZIhvcNAQEL\\nBQAwIDELMAkGA1UEBhMCVVMxETAPBgNVBAMMCHdoYXRldmVyMB4XDTIyMDcyOTIx\\nMTgyN1oXDTIzMDcyOTIxMTgyN1owIDELMAkGA1UEBhMCVVMxETAPBgNVBAMMCHdo\\nYXRldmVyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA55N9DkgFWbJ/\\naqcdQhso7n1kFvt6j/fL1tJBvRubkiFMQJnZFtekfalN6FfRtA3jq+nx8o49e+7t\\nLCKT0xQ+wufXfOnxv6/if6HMhHTiCNPOCeztUgQ2+dfNwRhYYgB1P93wkUVjwudK\\n13qHTTZ6NtEF6EzOqhOCe6zxq6wrr422+ZqCvcggeQ5tW9xSd/8O1vNID/0MTKpy\\nET3drDtBfHmiUEIBR3T3tcy6QsIe4Rz/2sDinAcM3j7sG8uY6drh8jY3PWar9til\\nv2l4qDYSU8Qm5856AB1FVZRLRJkLxZYZNgreShAIYgEd0mcyI2EO/UvKxsIcxsXc\\nd45GhGpKkwIDAQABo1cwVTAfBgNVHQ4EGAQWBBRXBrXKh3p/aFdQjUcT/UcvICBL\\nODAhBgNVHSMEGjAYgBYEFFcGtcqHen9oV1CNRxP9Ry8gIEs4MA8GA1UdEwEB/wQF\\nMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAGmCEvcoFUrT9e133SHkgF/ZAgzeIziO\\nBjfAdU4fvAVTVfzaPm0yBnGqzcHyacCzbZjKQpaKVgc5e6IaqAQtf6cZJSCiJGhS\\nJYeosWrj3dahLOUAMrXRr8G/Ybcacoqc+osKaRa2p71cC3V6u2VvcHRV7HDFGJU7\\noijbdB+WhqET6Txe67rxZCJG9Ez3EOejBJBl2PJPpy7m1Ml4RR+E8YHNzB0lcBzc\\nEoiJKlDfKSO14E2CPDonnUoWBJWjEvJys3tbvKzsRj2fnLilytPFU0gH3cEjCopi\\nzFoWRdaRuNHYCqlBmso1JFDl8h4fMmglxGNKnKRar0WeGyxb4xXBGpI=\\n-----END CERTIFICATE-----\\n" # noqa: E501, W505
                    ],
                    "certificate_signing_request": "-----BEGIN CERTIFICATE REQUEST-----\nMIICWjCCAUICAQAwFTETMBEGA1UEAwwKYmFuYW5hLmNvbTCCASIwDQYJKoZIhvcN\nAQEBBQADggEPADCCAQoCggEBANWlx9wE6cW7Jkb4DZZDOZoEjk1eDBMJ+8R4pyKp\nFBeHMl1SQSDt6rAWsrfL3KOGiIHqrRY0B5H6c51L8LDuVrJG0bPmyQ6rsBo3gVke\nDSivfSLtGvHtp8lwYnIunF8r858uYmblAR0tdXQNmnQvm+6GERvURQ6sxpgZ7iLC\npPKDoPt+4GKWL10FWf0i82FgxWC2KqRZUtNbgKETQuARLig7etBmCnh20zmynorA\ncY7vrpTPAaeQpGLNqqYvKV9W6yWVY08V+nqARrFrjk3vSioZSu8ZJUdZ4d9++SGl\nbH7A6e77YDkX9i/dQ3Pa/iDtWO3tXS2MvgoxX1iSWlGNOHcCAwEAAaAAMA0GCSqG\nSIb3DQEBCwUAA4IBAQCW1fKcHessy/ZhnIwAtSLznZeZNH8LTVOzkhVd4HA7EJW+\nKVLBx8DnN7L3V2/uPJfHiOg4Rx7fi7LkJPegl3SCqJZ0N5bQS/KvDTCyLG+9E8Y+\n7wqCmWiXaH1devimXZvazilu4IC2dSks2D8DPWHgsOdVks9bme8J3KjdNMQudegc\newWZZ1Dtbd+Rn7cpKU3jURMwm4fRwGxbJ7iT5fkLlPBlyM/yFEik4SmQxFYrZCQg\n0f3v4kBefTh5yclPy5tEH+8G0LMsbbo3dJ5mPKpAShi0QEKDLd7eR1R/712lYTK4\ndi4XaEfqERgy68O4rvb4PGlJeRGS7AmL7Ss8wfAq\n-----END CERTIFICATE REQUEST-----\n", # noqa: E501
                    "certificate": "-----BEGIN CERTIFICATE-----\nMIICvDCCAaQCFFPAOD7utDTsgFrm0vS4We18OcnKMA0GCSqGSIb3DQEBCwUAMCAx\nCzAJBgNVBAYTAlVTMREwDwYDVQQDDAh3aGF0ZXZlcjAeFw0yMjA3MjkyMTE5Mzha\nFw0yMzA3MjkyMTE5MzhaMBUxEzARBgNVBAMMCmJhbmFuYS5jb20wggEiMA0GCSqG\nSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDVpcfcBOnFuyZG+A2WQzmaBI5NXgwTCfvE\neKciqRQXhzJdUkEg7eqwFrK3y9yjhoiB6q0WNAeR+nOdS/Cw7layRtGz5skOq7Aa\nN4FZHg0or30i7Rrx7afJcGJyLpxfK/OfLmJm5QEdLXV0DZp0L5vuhhEb1EUOrMaY\nGe4iwqTyg6D7fuBili9dBVn9IvNhYMVgtiqkWVLTW4ChE0LgES4oO3rQZgp4dtM5\nsp6KwHGO766UzwGnkKRizaqmLylfVusllWNPFfp6gEaxa45N70oqGUrvGSVHWeHf\nfvkhpWx+wOnu+2A5F/Yv3UNz2v4g7Vjt7V0tjL4KMV9YklpRjTh3AgMBAAEwDQYJ\nKoZIhvcNAQELBQADggEBAChjRzuba8zjQ7NYBVas89Oy7u++MlS8xWxh++yiUsV6\nWMk3ZemsPtXc1YmXorIQohtxLxzUPm2JhyzFzU/sOLmJQ1E/l+gtZHyRCwsb20fX\nmphuJsMVd7qv/GwEk9PBsk2uDqg4/Wix0Rx5lf95juJP7CPXQJl5FQauf3+LSz0y\nwF/j+4GqvrwsWr9hKOLmPdkyKkR6bHKtzzsxL9PM8GnElk2OpaPMMnzbL/vt2IAt\nxK01ZzPxCQCzVwHo5IJO5NR/fIyFbEPhxzG17QsRDOBR9fl9cOIvDeSO04vyZ+nz\n+kA2c3fNrZFAtpIlOOmFh8Q12rVL4sAjI5mVWnNEgvI=\n-----END CERTIFICATE-----\n", # noqa: E501
                }
            ]
        }
    ],
    "properties": {
        "certificates": {
            "$id": "#/properties/certificates",
            "type": "array",
            "items": {
                "$id": "#/properties/certificates/items",
                "type": "object",
                "required": ["certificate_signing_request", "certificate", "ca", "chain"],
                "properties": {
                    "certificate_signing_request": {
                        "$id": "#/properties/certificates/items/certificate_signing_request",
                        "type": "string",
                    },
                    "certificate": {
                        "$id": "#/properties/certificates/items/certificate",
                        "type": "string",
                    },
                    "ca": {"$id": "#/properties/certificates/items/ca", "type": "string"},
                    "chain": {
                        "$id": "#/properties/certificates/items/chain",
                        "type": "array",
                        "items": {
                            "type": "string",
                            "$id": "#/properties/certificates/items/chain/items",
                        },
                    },
                },
                "additionalProperties": True,
            },
        }
    },
    "required": ["certificates"],
    "additionalProperties": True,
}
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class CertificateAvailableEvent(EventBase):
    """Charm Event triggered when a TLS certificate is available."""

    def __init__(
        self,
        handle: Handle,
        certificate: str,
        certificate_signing_request: str,
        ca: str,
        chain: List[str],
    ):
        """Keep the certificate payload on the event instance.

        Args:
            handle (Handle): Framework handle forwarded to ``EventBase``.
            certificate (str): TLS certificate.
            certificate_signing_request (str): Certificate signing request.
            ca (str): CA certificate.
            chain (list): CA chain.
        """
        super().__init__(handle)
        self.certificate, self.certificate_signing_request = (
            certificate,
            certificate_signing_request,
        )
        self.ca, self.chain = ca, chain

    def snapshot(self) -> dict:
        """Return the event payload as a plain dictionary."""
        return dict(
            certificate=self.certificate,
            certificate_signing_request=self.certificate_signing_request,
            ca=self.ca,
            chain=self.chain,
        )

    def restore(self, snapshot: dict):
        """Repopulate the event attributes from a dictionary made by ``snapshot``."""
        for field in ("certificate", "certificate_signing_request", "ca", "chain"):
            setattr(self, field, snapshot[field])
class CertificateExpiringEvent(EventBase):
    """Charm Event triggered when a TLS certificate is almost expired."""

    def __init__(self, handle, certificate: str, expiry: str):
        """CertificateExpiringEvent.

        Args:
            handle (Handle): Juju framework handle
            certificate (str): TLS Certificate
            expiry (str): Datetime string representing the time at which the certificate
                won't be valid anymore.
        """
        super().__init__(handle)
        self.certificate, self.expiry = certificate, expiry

    def snapshot(self) -> dict:
        """Return the event payload as a plain dictionary."""
        return dict(certificate=self.certificate, expiry=self.expiry)

    def restore(self, snapshot: dict):
        """Repopulate the event attributes from a dictionary made by ``snapshot``."""
        for field in ("certificate", "expiry"):
            setattr(self, field, snapshot[field])
class CertificateExpiredEvent(EventBase):
    """Charm Event triggered when a TLS certificate is expired."""

    def __init__(self, handle: Handle, certificate: str):
        """Keep the expired certificate on the event instance.

        Args:
            handle (Handle): Framework handle forwarded to ``EventBase``.
            certificate (str): TLS certificate.
        """
        super().__init__(handle)
        self.certificate = certificate

    def snapshot(self) -> dict:
        """Return the event payload as a plain dictionary."""
        return dict(certificate=self.certificate)

    def restore(self, snapshot: dict):
        """Repopulate the event attribute from a dictionary made by ``snapshot``."""
        restored_certificate = snapshot["certificate"]
        self.certificate = restored_certificate
class CertificateCreationRequestEvent(EventBase):
    """Charm Event triggered when a TLS certificate is required."""

    def __init__(self, handle: Handle, certificate_signing_request: str, relation_id: int):
        """Keep the request payload on the event instance.

        Args:
            handle (Handle): Framework handle forwarded to ``EventBase``.
            certificate_signing_request (str): Certificate signing request.
            relation_id (int): Relation id.
        """
        super().__init__(handle)
        self.relation_id = relation_id
        self.certificate_signing_request = certificate_signing_request

    def snapshot(self) -> dict:
        """Return the event payload as a plain dictionary."""
        return dict(
            certificate_signing_request=self.certificate_signing_request,
            relation_id=self.relation_id,
        )

    def restore(self, snapshot: dict):
        """Repopulate the event attributes from a dictionary made by ``snapshot``."""
        for field in ("certificate_signing_request", "relation_id"):
            setattr(self, field, snapshot[field])
class CertificateRevocationRequestEvent(EventBase):
    """Charm Event triggered when a TLS certificate needs to be revoked."""

    def __init__(
        self,
        handle: Handle,
        certificate: str,
        certificate_signing_request: str,
        ca: str,
        chain: List[str],
    ):
        """Keep the payload of the certificate to revoke on the event instance.

        Args:
            handle (Handle): Framework handle forwarded to ``EventBase``.
            certificate (str): TLS certificate.
            certificate_signing_request (str): Certificate signing request.
            ca (str): CA certificate.
            chain (list): CA chain. Annotated as a list of strings to agree with
                ``CertificateAvailableEvent`` and with the "chain" array declared in
                ``PROVIDER_JSON_SCHEMA``.
        """
        super().__init__(handle)
        self.certificate = certificate
        self.certificate_signing_request = certificate_signing_request
        self.ca = ca
        self.chain = chain

    def snapshot(self) -> dict:
        """Return the event payload as a plain dictionary."""
        return {
            "certificate": self.certificate,
            "certificate_signing_request": self.certificate_signing_request,
            "ca": self.ca,
            "chain": self.chain,
        }

    def restore(self, snapshot: dict):
        """Repopulate the event attributes from a dictionary made by ``snapshot``."""
        self.certificate = snapshot["certificate"]
        self.certificate_signing_request = snapshot["certificate_signing_request"]
        self.ca = snapshot["ca"]
        self.chain = snapshot["chain"]
def _load_relation_data(raw_relation_data: dict) -> dict:
"""Loads relation data from the relation data bag.
Json loads all data.
Args:
raw_relation_data: Relation data from the databag
Returns:
dict: Relation data in dict format.
"""
certificate_data = dict()
for key in raw_relation_data:
try:
certificate_data[key] = json.loads(raw_relation_data[key])
except (json.decoder.JSONDecodeError, TypeError):
certificate_data[key] = raw_relation_data[key]
return certificate_data
def generate_ca(
    private_key: bytes,
    subject: str,
    private_key_password: Optional[bytes] = None,
    validity: int = 365,
    country: str = "US",
) -> bytes:
    """Build a self-signed CA certificate and return it PEM-encoded.

    The same name (country + common name) is used as both subject and issuer, and the
    certificate is signed with the supplied private key using SHA-256.

    Args:
        private_key (bytes): Private key
        subject (str): Certificate subject
        private_key_password (bytes): Private key password
        validity (int): Certificate validity time (in days)
        country (str): Certificate Issuing country

    Returns:
        bytes: CA Certificate.
    """
    ca_key = serialization.load_pem_private_key(private_key, password=private_key_password)
    ca_public_key = ca_key.public_key()
    ca_name = x509.Name(
        [
            x509.NameAttribute(x509.NameOID.COUNTRY_NAME, country),
            x509.NameAttribute(x509.NameOID.COMMON_NAME, subject),
        ]
    )
    # NOTE(review): public_bytes() of the extension object is used as the identifier
    # for both extensions below, not its `digest` attribute - confirm this is intended.
    key_identifier = x509.SubjectKeyIdentifier.from_public_key(
        ca_public_key  # type: ignore[arg-type]
    ).public_bytes()

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(ca_name)
    builder = builder.issuer_name(ca_name)
    builder = builder.public_key(ca_public_key)  # type: ignore[arg-type]
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(datetime.utcnow())
    builder = builder.not_valid_after(datetime.utcnow() + timedelta(days=validity))
    builder = builder.add_extension(
        x509.SubjectKeyIdentifier(digest=key_identifier), critical=False
    )
    builder = builder.add_extension(
        x509.AuthorityKeyIdentifier(
            key_identifier=key_identifier,
            authority_cert_issuer=None,
            authority_cert_serial_number=None,
        ),
        critical=False,
    )
    # Mark the certificate as a CA with no path length limit.
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=None),
        critical=True,
    )
    ca_certificate = builder.sign(ca_key, hashes.SHA256())  # type: ignore[arg-type]
    return ca_certificate.public_bytes(serialization.Encoding.PEM)
def generate_certificate(
    csr: bytes,
    ca: bytes,
    ca_key: bytes,
    ca_key_password: Optional[bytes] = None,
    validity: int = 365,
    alt_names: Optional[List[str]] = None,
) -> bytes:
    """Generates a TLS certificate based on a CSR.

    The CSR's extensions are copied onto the certificate. When ``alt_names`` is given, a
    Subject Alternative Name extension is built from ``alt_names`` plus the DNS names
    already present in the CSR; it replaces the CSR's SAN extension if there is one and
    is added otherwise (including when the CSR carries other, non-SAN extensions).

    Args:
        csr (bytes): CSR
        ca (bytes): CA Certificate
        ca_key (bytes): CA private key
        ca_key_password: CA private key password
        validity (int): Certificate validity (in days)
        alt_names (list): List of alt names to put on cert - prefer putting SANs in CSR

    Returns:
        bytes: Certificate
    """
    csr_object = x509.load_pem_x509_csr(csr)
    issuer = x509.load_pem_x509_certificate(ca).issuer
    private_key = serialization.load_pem_private_key(ca_key, password=ca_key_password)
    not_before = datetime.utcnow()
    certificate_builder = (
        x509.CertificateBuilder()
        .subject_name(csr_object.subject)
        .issuer_name(issuer)
        .public_key(csr_object.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(not_before)
        .not_valid_after(not_before + timedelta(days=validity))
    )

    extensions = list(csr_object.extensions)
    if alt_names:
        dns_names = list(alt_names)
        try:
            csr_san_ext = csr_object.extensions.get_extension_for_class(
                x509.SubjectAlternativeName
            )
        except ExtensionNotFound:
            csr_san_ext = None
        if csr_san_ext is not None:
            dns_names.extend(csr_san_ext.value.get_values_for_type(x509.DNSName))
        san_ext = Extension(
            ExtensionOID.SUBJECT_ALTERNATIVE_NAME,
            False,
            x509.SubjectAlternativeName([x509.DNSName(name) for name in dns_names]),
        )
        if csr_san_ext is None:
            # Previously the SAN was only added when the CSR had no extensions at all,
            # so alt_names were lost for CSRs carrying other extensions.
            extensions.append(san_ext)
        else:
            extensions = [
                san_ext if ext.value.oid == ExtensionOID.SUBJECT_ALTERNATIVE_NAME else ext
                for ext in extensions
            ]

    for extension in extensions:
        certificate_builder = certificate_builder.add_extension(
            extension.value,
            critical=extension.critical,
        )

    # NOTE(review): kept from the original; this writes a private attribute of
    # CertificateBuilder - confirm it is still required.
    certificate_builder._version = x509.Version.v3
    cert = certificate_builder.sign(private_key, hashes.SHA256())  # type: ignore[arg-type]
    return cert.public_bytes(serialization.Encoding.PEM)
def generate_pfx_package(
    certificate: bytes,
    private_key: bytes,
    package_password: str,
    private_key_password: Optional[bytes] = None,
) -> bytes:
    """Bundle a TLS certificate and its private key into a PKCS#12 (PFX) package.

    The package is named after the certificate subject (RFC 4514 string form) and is
    encrypted with ``package_password``; no CA certificates are included.

    Args:
        certificate (bytes): TLS certificate
        private_key (bytes): Private key
        package_password (str): Password to open the PFX package
        private_key_password (bytes): Private key password

    Returns:
        bytes: Serialized PFX package.
    """
    loaded_key = serialization.load_pem_private_key(private_key, password=private_key_password)
    loaded_certificate = x509.load_pem_x509_certificate(certificate)
    friendly_name = loaded_certificate.subject.rfc4514_string().encode()
    package_encryption = serialization.BestAvailableEncryption(package_password.encode())
    return pkcs12.serialize_key_and_certificates(
        name=friendly_name,
        cert=loaded_certificate,
        key=loaded_key,  # type: ignore[arg-type]
        cas=None,
        encryption_algorithm=package_encryption,
    )
def generate_private_key(
    password: Optional[bytes] = None,
    key_size: int = 2048,
    public_exponent: int = 65537,
) -> bytes:
    """Generate an RSA private key and return it PEM-encoded.

    Args:
        password (bytes): Password used to encrypt the serialized key. When it is
            ``None`` or empty the key is written unencrypted.
        key_size (int): Key size in bits, passed to ``rsa.generate_private_key``.
        public_exponent: Public exponent.

    Returns:
        bytes: Private Key
    """
    if password:
        key_encryption = serialization.BestAvailableEncryption(password)
    else:
        key_encryption = serialization.NoEncryption()

    rsa_key = rsa.generate_private_key(
        public_exponent=public_exponent,
        key_size=key_size,
    )
    return rsa_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=key_encryption,
    )
def generate_csr(
    private_key: bytes,
    subject: str,
    add_unique_id_to_subject_name: bool = True,
    organization: Optional[str] = None,
    email_address: Optional[str] = None,
    country_name: Optional[str] = None,
    private_key_password: Optional[bytes] = None,
    sans: Optional[List[str]] = None,
    sans_oid: Optional[List[str]] = None,
    sans_ip: Optional[List[str]] = None,
    sans_dns: Optional[List[str]] = None,
    additional_critical_extensions: Optional[List] = None,
) -> bytes:
    """Generates a CSR using private key and subject.

    Args:
        private_key (bytes): Private key
        subject (str): CSR Subject.
        add_unique_id_to_subject_name (bool): Whether a unique ID must be added to the CSR's
            subject name. Always leave to "True" when the CSR is used to request certificates
            using the tls-certificates relation.
        organization (str): Name of organization.
        email_address (str): Email address.
        country_name (str): Country Name.
        private_key_password (bytes): Private key password
        sans (list): Use sans_dns - this will be deprecated in a future release
            List of DNS subject alternative names (keeping it for now for backward compatibility)
        sans_oid (list): List of registered ID SANs
        sans_dns (list): List of DNS subject alternative names (similar to the arg: sans)
        sans_ip (list): List of IP subject alternative names (IPv4 or IPv6 addresses)
        additional_critical_extensions (list): List of critical additional extension objects.
            Object must be a x509 ExtensionType.

    Returns:
        bytes: CSR
    """
    # Local import: only IPv4Address is imported at file level, and ip_address is
    # needed to accept IPv6 SANs as well.
    from ipaddress import ip_address

    signing_key = serialization.load_pem_private_key(private_key, password=private_key_password)
    subject_name = [x509.NameAttribute(x509.NameOID.COMMON_NAME, subject)]
    if add_unique_id_to_subject_name:
        subject_name.append(
            x509.NameAttribute(x509.NameOID.X500_UNIQUE_IDENTIFIER, str(uuid.uuid4()))
        )
    if organization:
        subject_name.append(x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, organization))
    if email_address:
        subject_name.append(x509.NameAttribute(x509.NameOID.EMAIL_ADDRESS, email_address))
    if country_name:
        subject_name.append(x509.NameAttribute(x509.NameOID.COUNTRY_NAME, country_name))
    csr = x509.CertificateSigningRequestBuilder(subject_name=x509.Name(subject_name))

    _sans: List[x509.GeneralName] = []
    if sans_oid:
        _sans.extend(x509.RegisteredID(x509.ObjectIdentifier(san)) for san in sans_oid)
    if sans_ip:
        _sans.extend(x509.IPAddress(ip_address(san)) for san in sans_ip)
    if sans:
        _sans.extend(x509.DNSName(san) for san in sans)
    if sans_dns:
        _sans.extend(x509.DNSName(san) for san in sans_dns)
    if _sans:
        # Drop duplicates while keeping insertion order so the SAN list is
        # deterministic (a plain set() has arbitrary iteration order).
        unique_sans = list(dict.fromkeys(_sans))
        csr = csr.add_extension(x509.SubjectAlternativeName(unique_sans), critical=False)

    if additional_critical_extensions:
        for extension in additional_critical_extensions:
            csr = csr.add_extension(extension, critical=True)

    signed_certificate = csr.sign(signing_key, hashes.SHA256())  # type: ignore[arg-type]
    return signed_certificate.public_bytes(serialization.Encoding.PEM)
class CertificatesProviderCharmEvents(CharmEvents):
    """List of events that the TLS Certificates provider charm can leverage."""

    # Event source carrying CertificateCreationRequestEvent instances.
    certificate_creation_request = EventSource(CertificateCreationRequestEvent)
    # Event source carrying CertificateRevocationRequestEvent instances.
    certificate_revocation_request = EventSource(CertificateRevocationRequestEvent)
class CertificatesRequirerCharmEvents(CharmEvents):
    """List of events that the TLS Certificates requirer charm can leverage."""

    # Event source carrying CertificateAvailableEvent instances.
    certificate_available = EventSource(CertificateAvailableEvent)
    # Event source carrying CertificateExpiringEvent instances.
    certificate_expiring = EventSource(CertificateExpiringEvent)
    # Event source carrying CertificateExpiredEvent instances.
    certificate_expired = EventSource(CertificateExpiredEvent)
class TLSCertificatesProvidesV1(Object):
"""TLS certificates provider class to be instantiated by TLS certificates providers."""
on = CertificatesProviderCharmEvents()
def __init__(self, charm: CharmBase, relationship_name: str):
    """Register the provider on the charm and watch the relation for changes.

    Args:
        charm (CharmBase): Charm instantiating this object.
        relationship_name (str): Name of the relation whose ``relation_changed``
            event is observed.
    """
    super().__init__(charm, relationship_name)
    self.charm = charm
    self.relationship_name = relationship_name
    relation_events = charm.on[relationship_name]
    self.framework.observe(relation_events.relation_changed, self._on_relation_changed)
def _add_certificate(
    self,
    relation_id: int,
    certificate: str,
    certificate_signing_request: str,
    ca: str,
    chain: List[str],
) -> None:
    """Append a certificate entry to the application relation data.

    Nothing is written when an identical entry is already stored.

    Args:
        relation_id (int): Relation id
        certificate (str): Certificate
        certificate_signing_request (str): Certificate Signing Request
        ca (str): CA Certificate
        chain (list): CA Chain

    Returns:
        None

    Raises:
        RuntimeError: If the relation cannot be found.
    """
    relation = self.model.get_relation(
        relation_name=self.relationship_name, relation_id=relation_id
    )
    if not relation:
        raise RuntimeError(
            f"Relation {self.relationship_name} does not exist - "
            f"The certificate request can't be completed"
        )

    entry = dict(
        certificate=certificate,
        certificate_signing_request=certificate_signing_request,
        ca=ca,
        chain=chain,
    )
    stored_entries = _load_relation_data(relation.data[self.charm.app]).get("certificates", [])
    updated_entries = copy.deepcopy(stored_entries)
    if entry in updated_entries:
        logger.info("Certificate already in relation data - Doing nothing")
        return

    updated_entries.append(entry)
    relation.data[self.model.app]["certificates"] = json.dumps(updated_entries)
def _remove_certificate(
    self,
    relation_id: int,
    certificate: Optional[str] = None,
    certificate_signing_request: Optional[str] = None,
) -> None:
    """Removes certificate from a given relation based on user provided certificate or csr.

    Every stored entry whose certificate or CSR equals the provided value is dropped.
    The "certificates" key is rewritten even when nothing matched.

    Args:
        relation_id (int): Relation id
        certificate (str): Certificate (optional)
        certificate_signing_request: Certificate signing request (optional)

    Returns:
        None

    Raises:
        RuntimeError: If the relation cannot be found.
    """
    relation = self.model.get_relation(
        relation_name=self.relationship_name,
        relation_id=relation_id,
    )
    if not relation:
        raise RuntimeError(
            f"Relation {self.relationship_name} with relation id {relation_id} does not exist"
        )
    provider_relation_data = _load_relation_data(relation.data[self.charm.app])
    provider_certificates = provider_relation_data.get("certificates", [])

    def _matches(certificate_dict: dict) -> bool:
        # An entry matches when either provided identifier equals the stored one.
        if certificate and certificate_dict["certificate"] == certificate:
            return True
        return bool(
            certificate_signing_request
            and certificate_dict["certificate_signing_request"] == certificate_signing_request
        )

    # Build a filtered list instead of removing from the list being iterated:
    # in-place removal skips the element after each match and raises ValueError
    # when one entry matches both the certificate and the CSR.
    remaining = [entry for entry in provider_certificates if not _matches(entry)]
    relation.data[self.model.app]["certificates"] = json.dumps(remaining)
@staticmethod
def _relation_data_is_valid(certificates_data: dict) -> bool:
    """Validates requirer relation data against the requirer JSON schema.

    Args:
        certificates_data (dict): Certificate data dictionary as retrieved from relation data.

    Returns:
        bool: True if the data follows REQUIRER_JSON_SCHEMA, False if validation fails.
    """
    try:
        validate(instance=certificates_data, schema=REQUIRER_JSON_SCHEMA)
    except exceptions.ValidationError:
        return False
    else:
        return True
def revoke_all_certificates(self) -> None:
    """Revokes all certificates of this provider.

    Replaces the "certificates" entry of the provider application relation data with an
    empty JSON list, for every relation registered under this relation name.

    This method is meant to be used when the Root CA has changed.
    """
    relations = self.model.relations[self.relationship_name]
    no_certificates = json.dumps([])
    for rel in relations:
        rel.data[self.model.app]["certificates"] = no_certificates
def set_relation_certificate(
    self,
    certificate: str,
    certificate_signing_request: str,
    ca: str,
    chain: List[str],
    relation_id: int,
) -> None:
    """Adds certificates to relation data.

    Any entry already stored for the same certificate signing request is removed first,
    then the new entry is added. All string values are stripped of surrounding whitespace
    before being stored.

    Args:
        certificate (str): Certificate
        certificate_signing_request (str): Certificate signing request
        ca (str): CA Certificate
        chain (list): CA Chain
        relation_id (int): Juju relation ID

    Returns:
        None

    Raises:
        RuntimeError: If the relation does not exist.
    """
    if not self.model.get_relation(
        relation_name=self.relationship_name, relation_id=relation_id
    ):
        raise RuntimeError(f"Relation {self.relationship_name} does not exist")
    stripped_csr = certificate_signing_request.strip()
    self._remove_certificate(
        certificate_signing_request=stripped_csr,
        relation_id=relation_id,
    )
    self._add_certificate(
        relation_id=relation_id,
        certificate=certificate.strip(),
        certificate_signing_request=stripped_csr,
        ca=ca.strip(),
        chain=[chain_entry.strip() for chain_entry in chain],
    )
def remove_certificate(self, certificate: str) -> None:
    """Removes a given certificate from relation data.

    The certificate is removed from every relation registered under this relation name.

    Args:
        certificate (str): TLS Certificate

    Returns:
        None

    Raises:
        RuntimeError: If there is no relation under this relation name.
    """
    relations = self.model.relations[self.relationship_name]
    if not relations:
        raise RuntimeError(f"Relation {self.relationship_name} does not exist")
    for rel in relations:
        self._remove_certificate(certificate=certificate, relation_id=rel.id)
def _on_relation_changed(self, event: RelationChangedEvent) -> None:
    """Handler triggered on relation changed event.

    Looks at the relation data and either emits:
    - certificate request event: If the unit relation data contains a CSR for which
        a certificate does not exist in the provider relation data.
    - certificate revocation event: If the provider relation data contains a certificate
        whose CSR does not exist in the requirer relation data.

    Events that carry no remote unit, and requirer data that fails JSON schema validation,
    are logged and ignored.

    Args:
        event: Juju event

    Returns:
        None
    """
    # Guard explicitly rather than with `assert`: assertions are stripped under `-O`,
    # and an event without a unit has no requirer unit data to process.
    if event.unit is None:
        logger.warning("Relation changed event does not have a unit - Doing nothing")
        return
    requirer_relation_data = _load_relation_data(event.relation.data[event.unit])
    provider_relation_data = _load_relation_data(event.relation.data[self.charm.app])
    if not self._relation_data_is_valid(requirer_relation_data):
        logger.warning(
            f"Relation data did not pass JSON Schema validation: {requirer_relation_data}"
        )
        return
    provider_certificates = provider_relation_data.get("certificates", [])
    requirer_csrs = requirer_relation_data.get("certificate_signing_requests", [])
    # CSRs for which the provider has already issued a certificate.
    provider_csrs = [
        certificate_creation_request["certificate_signing_request"]
        for certificate_creation_request in provider_certificates
    ]
    # CSRs currently requested by the unit that triggered the event.
    requirer_unit_csrs = [
        certificate_creation_request["certificate_signing_request"]
        for certificate_creation_request in requirer_csrs
    ]
    for certificate_signing_request in requirer_unit_csrs:
        if certificate_signing_request not in provider_csrs:
            self.on.certificate_creation_request.emit(
                certificate_signing_request=certificate_signing_request,
                relation_id=event.relation.id,
            )
    self._revoke_certificates_for_which_no_csr_exists(relation_id=event.relation.id)
def _revoke_certificates_for_which_no_csr_exists(self, relation_id: int) -> None:
    """Revokes certificates for which no unit has a CSR.

    Goes through all generated certificates and compares them against the list of CSRs of
    all units of a given relation. For each certificate without a matching CSR, a
    certificate revocation request event is emitted and the certificate is removed from
    relation data.

    Args:
        relation_id (int): Relation id

    Returns:
        None

    Raises:
        RuntimeError: If the relation does not exist.
    """
    relation = self.model.get_relation(
        relation_name=self.relationship_name, relation_id=relation_id
    )
    if not relation:
        raise RuntimeError(f"Relation {self.relationship_name} does not exist")
    issued_certificates = _load_relation_data(relation.data[self.charm.app]).get(
        "certificates", []
    )
    # Collect the CSRs currently requested by every unit of the relation.
    requested_csrs: List[str] = []
    for unit in relation.units:
        unit_data = _load_relation_data(relation.data[unit])
        for csr_entry in unit_data.get("certificate_signing_requests", []):
            requested_csrs.append(csr_entry["certificate_signing_request"])
    for issued in issued_certificates:
        if issued["certificate_signing_request"] in requested_csrs:
            continue
        self.on.certificate_revocation_request.emit(
            certificate=issued["certificate"],
            certificate_signing_request=issued["certificate_signing_request"],
            ca=issued["ca"],
            chain=issued["chain"],
        )
        self.remove_certificate(certificate=issued["certificate"])
class TLSCertificatesRequiresV1(Object):
    """TLS certificates requirer class to be instantiated by TLS certificates requirers."""

    on = CertificatesRequirerCharmEvents()

    def __init__(
        self,
        charm: CharmBase,
        relationship_name: str,
        expiry_notification_time: int = 168,
    ):
        """Observes the relation changed and update status events.

        Args:
            charm: Charm object
            relationship_name: Juju relation name
            expiry_notification_time (int): Time difference between now and expiry (in hours).
                Used to trigger the CertificateExpiring event. Default: 7 days.
        """
        super().__init__(charm, relationship_name)
        self.relationship_name = relationship_name
        self.charm = charm
        self.expiry_notification_time = expiry_notification_time
        self.framework.observe(
            charm.on[relationship_name].relation_changed, self._on_relation_changed
        )
        self.framework.observe(charm.on.update_status, self._on_update_status)

    @property
    def _requirer_csrs(self) -> List[Dict[str, str]]:
        """Returns list of requirer CSR's from this unit's relation data.

        Raises:
            RuntimeError: If the relation does not exist.
        """
        relation = self.model.get_relation(self.relationship_name)
        if not relation:
            raise RuntimeError(f"Relation {self.relationship_name} does not exist")
        requirer_relation_data = _load_relation_data(relation.data[self.model.unit])
        return requirer_relation_data.get("certificate_signing_requests", [])

    @property
    def _provider_certificates(self) -> List[Dict[str, str]]:
        """Returns list of provider certificates from the remote app relation data.

        Raises:
            RuntimeError: If the relation or its remote application does not exist.
        """
        relation = self.model.get_relation(self.relationship_name)
        if not relation:
            raise RuntimeError(f"Relation {self.relationship_name} does not exist")
        if not relation.app:
            raise RuntimeError(f"Remote app for relation {self.relationship_name} does not exist")
        provider_relation_data = _load_relation_data(relation.data[relation.app])
        return provider_relation_data.get("certificates", [])

    def _add_requirer_csr(self, csr: str) -> None:
        """Adds CSR to relation data.

        Does nothing if the CSR is already present in this unit's relation data.

        Args:
            csr (str): Certificate Signing Request

        Returns:
            None

        Raises:
            RuntimeError: If the relation does not exist.
        """
        relation = self.model.get_relation(self.relationship_name)
        if not relation:
            raise RuntimeError(
                f"Relation {self.relationship_name} does not exist - "
                f"The certificate request can't be completed"
            )
        # Read the property once: every access reloads the relation data.
        requirer_csrs = copy.deepcopy(self._requirer_csrs)
        new_csr_dict = {"certificate_signing_request": csr}
        if new_csr_dict in requirer_csrs:
            logger.info("CSR already in relation data - Doing nothing")
            return
        requirer_csrs.append(new_csr_dict)
        relation.data[self.model.unit]["certificate_signing_requests"] = json.dumps(requirer_csrs)

    def _remove_requirer_csr(self, csr: str) -> None:
        """Removes CSR from relation data.

        Does nothing if the CSR is not present in this unit's relation data.

        Args:
            csr (str): Certificate signing request

        Returns:
            None

        Raises:
            RuntimeError: If the relation does not exist.
        """
        relation = self.model.get_relation(self.relationship_name)
        if not relation:
            raise RuntimeError(
                f"Relation {self.relationship_name} does not exist - "
                f"The certificate request can't be completed"
            )
        requirer_csrs = copy.deepcopy(self._requirer_csrs)
        csr_dict = {"certificate_signing_request": csr}
        if csr_dict not in requirer_csrs:
            logger.info("CSR not in relation data - Doing nothing")
            return
        requirer_csrs.remove(csr_dict)
        relation.data[self.model.unit]["certificate_signing_requests"] = json.dumps(requirer_csrs)

    def request_certificate_creation(self, certificate_signing_request: bytes) -> None:
        """Request TLS certificate to provider charm.

        The CSR is decoded, stripped of surrounding whitespace and added to this unit's
        relation data.

        Args:
            certificate_signing_request (bytes): Certificate Signing Request

        Returns:
            None

        Raises:
            RuntimeError: If the relation does not exist.
        """
        relation = self.model.get_relation(self.relationship_name)
        if not relation:
            message = (
                f"Relation {self.relationship_name} does not exist - "
                f"The certificate request can't be completed"
            )
            logger.error(message)
            raise RuntimeError(message)
        self._add_requirer_csr(certificate_signing_request.decode().strip())
        logger.info("Certificate request sent to provider")

    def request_certificate_revocation(self, certificate_signing_request: bytes) -> None:
        """Removes CSR from relation data.

        The provider of this relation is then expected to remove certificates associated to this
        CSR from the relation data as well and emit a request_certificate_revocation event for the
        provider charm to interpret.

        Args:
            certificate_signing_request (bytes): Certificate Signing Request

        Returns:
            None

        Raises:
            RuntimeError: If the relation does not exist.
        """
        self._remove_requirer_csr(certificate_signing_request.decode().strip())
        logger.info("Certificate revocation sent to provider")

    def request_certificate_renewal(
        self, old_certificate_signing_request: bytes, new_certificate_signing_request: bytes
    ) -> None:
        """Renews certificate.

        Removes old CSR from relation data and adds new one. A failure to remove the old CSR
        is logged and does not prevent the new CSR from being added.

        Args:
            old_certificate_signing_request: Old CSR
            new_certificate_signing_request: New CSR

        Returns:
            None
        """
        try:
            self.request_certificate_revocation(
                certificate_signing_request=old_certificate_signing_request
            )
        except RuntimeError:
            logger.warning("Certificate revocation failed.")
        self.request_certificate_creation(
            certificate_signing_request=new_certificate_signing_request
        )
        logger.info("Certificate renewal request completed.")

    @staticmethod
    def _relation_data_is_valid(certificates_data: dict) -> bool:
        """Checks whether relation data is valid based on json schema.

        Args:
            certificates_data: Certificate data in dict format.

        Returns:
            bool: Whether relation data follows PROVIDER_JSON_SCHEMA.
        """
        try:
            validate(instance=certificates_data, schema=PROVIDER_JSON_SCHEMA)
            return True
        except exceptions.ValidationError:
            return False

    def _on_relation_changed(self, event: RelationChangedEvent) -> None:
        """Handler triggered on relation changed events.

        Emits a certificate available event for every provider certificate whose CSR is
        present in this unit's relation data. A missing relation, a missing remote app or
        invalid provider data is logged and ignored.

        Args:
            event: Juju event

        Returns:
            None
        """
        relation = self.model.get_relation(self.relationship_name)
        if not relation:
            logger.warning(f"No relation: {self.relationship_name}")
            return
        if not relation.app:
            logger.warning(f"No remote app in relation: {self.relationship_name}")
            return
        provider_relation_data = _load_relation_data(relation.data[relation.app])
        if not self._relation_data_is_valid(provider_relation_data):
            logger.warning(
                f"Provider relation data did not pass JSON Schema validation: "
                f"{relation.data[relation.app]}"
            )
            return
        requirer_csrs = [
            certificate_creation_request["certificate_signing_request"]
            for certificate_creation_request in self._requirer_csrs
        ]
        for certificate in self._provider_certificates:
            # Only certificates answering one of this unit's own CSRs are reported.
            if certificate["certificate_signing_request"] in requirer_csrs:
                self.on.certificate_available.emit(
                    certificate_signing_request=certificate["certificate_signing_request"],
                    certificate=certificate["certificate"],
                    ca=certificate["ca"],
                    chain=certificate["chain"],
                )

    def _on_update_status(self, event: UpdateStatusEvent) -> None:
        """Triggered on update status event.

        Goes through each certificate in the "certificates" relation and checks their expiry date.
        If they expire in less than `expiry_notification_time` hours, emits a
        CertificateExpiringEvent event. If they are expired, emits a CertificateExpiredEvent
        and requests revocation of the associated CSR.

        Args:
            event (UpdateStatusEvent): Juju event

        Returns:
            None
        """
        relation = self.model.get_relation(self.relationship_name)
        if not relation:
            logger.warning(f"No relation: {self.relationship_name}")
            return
        if not relation.app:
            logger.warning(f"No remote app in relation: {self.relationship_name}")
            return
        provider_relation_data = _load_relation_data(relation.data[relation.app])
        if not self._relation_data_is_valid(provider_relation_data):
            logger.warning(
                f"Provider relation data did not pass JSON Schema validation: "
                f"{relation.data[relation.app]}"
            )
            return
        for certificate_dict in self._provider_certificates:
            certificate = certificate_dict["certificate"]
            try:
                certificate_object = x509.load_pem_x509_certificate(data=certificate.encode())
            except ValueError:
                logger.warning("Could not load certificate.")
                continue
            # NOTE(review): assumes not_valid_after is a naive UTC datetime, matching
            # utcnow() - confirm against the cryptography documentation.
            time_difference = certificate_object.not_valid_after - datetime.utcnow()
            if time_difference.total_seconds() < 0:
                logger.warning("Certificate is expired")
                self.on.certificate_expired.emit(certificate=certificate)
                # Revocation is keyed on the CSR stored in the requirer relation data, so
                # the CSR of this entry must be passed, not the certificate itself.
                self.request_certificate_revocation(
                    certificate_dict["certificate_signing_request"].encode()
                )
                continue
            if time_difference.total_seconds() < (self.expiry_notification_time * 60 * 60):
                logger.warning("Certificate almost expired")
                self.on.certificate_expiring.emit(
                    certificate=certificate, expiry=certificate_object.not_valid_after.isoformat()
                )