Add support for DSA, ECDSA and Ed25519 key types

Add support default key files paths (id_dsa, id_ecdsa, id_ed25519)

Change-Id: I8c754a3584249ec4fb372a3822642b7b95b2600e
This commit is contained in:
Federico Ressi 2022-01-10 12:37:51 +01:00
parent 69afc3aa87
commit 51811b1dbe
4 changed files with 57 additions and 16 deletions

View File

@ -59,6 +59,7 @@ tobiko_config_path = _config.tobiko_config_path
TobikoException = _exception.TobikoException
check_valid_type = _exception.check_valid_type
exc_info = _exception.exc_info
ExceptionInfo = _exception.ExceptionInfo
handle_multiple_exceptions = _exception.handle_multiple_exceptions
list_exc_infos = _exception.list_exc_infos

View File

@ -18,7 +18,6 @@ from __future__ import absolute_import
from collections import abc
import contextlib
import getpass
import io
import os
import subprocess
import time
@ -26,6 +25,7 @@ import threading
import typing
import netaddr
import testtools
from oslo_log import log
import paramiko
from paramiko import common
@ -578,32 +578,63 @@ def ssh_client(host, port=None, username=None, proxy_jump=None,
**connect_parameters)
def load_private_keys(key_filenames: typing.List[str]) \
KEY_CLASSES: typing.List[typing.Type[paramiko.PKey]] = [
paramiko.RSAKey,
paramiko.DSSKey,
paramiko.ECDSAKey,
paramiko.Ed25519Key,
]
def load_private_keys(key_filenames: typing.List[str],
password: str = None) \
-> typing.List[paramiko.PKey]:
pkeys: typing.List[paramiko.PKey] = []
for filename in key_filenames:
if os.path.exists(filename):
try:
with io.open(filename, 'rt') as fd:
pkey: paramiko.PKey = paramiko.RSAKey.from_private_key(fd)
except Exception:
LOG.error('Unable to get RSAKey private key from file: '
f'{filename}', exc_info=1)
pkey = load_private_key(filename, password=password)
except LoadPrivateKeyError as ex:
LOG.exception(f'Error loading key file: {ex}')
else:
pkeys.append(pkey)
else:
LOG.debug(f'Key file not found: {filename}')
return pkeys
class LoadPrivateKeyError(tobiko.TobikoException):
message = "Unable to load private key from file {filename}"
def load_private_key(filename: str,
password: str = None) -> paramiko.PKey:
errors: typing.List[tobiko.ExceptionInfo] = []
for key_class in KEY_CLASSES:
try:
pkey: paramiko.PKey = key_class.from_private_key_file(
filename=filename, password=password)
except (paramiko.SSHException, ValueError):
errors.append(tobiko.exc_info())
else:
LOG.debug(f'Key file loaded: {filename} (key_class={key_class})')
return pkey
cause = testtools.MultipleExceptions(*errors)
raise LoadPrivateKeyError(filename=filename) from cause
def ssh_connect(hostname, username=None, port=None, connection_interval=None,
connection_attempts=None, connection_timeout=None,
proxy_command=None, proxy_client=None, key_filename=None,
password: str = None,
**parameters):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.WarningPolicy())
login = _command.ssh_login(hostname=hostname, username=username, port=port)
assert isinstance(key_filename, list)
pkeys = load_private_keys(key_filename)
pkeys = load_private_keys(key_filename, password=password)
auth_failed: typing.Optional[Exception] = None
for attempt in tobiko.retry(count=connection_attempts,
timeout=connection_timeout,
@ -614,7 +645,7 @@ def ssh_connect(hostname, username=None, port=None, connection_interval=None,
LOG.debug(f"Logging in to '{login}'...\n"
f" - parameters: {parameters}\n"
f" - attempt: {attempt.details}\n")
for pkey in pkeys + [None]:
for pkey in pkeys + [None]: # type: ignore
succeeded = False
proxy_sock = ssh_proxy_sock(
hostname=hostname,

View File

@ -15,6 +15,7 @@
# under the License.
from __future__ import absolute_import
import os.path
import typing # noqa
from oslo_log import log
@ -75,7 +76,8 @@ class GetSSHKeyFileFixture(tobiko.SharedFixture):
return
key_file = tobiko.tobiko_config_path(
f"~/.ssh/id_rsa-{remote_hostname}")
f"~/.ssh/{os.path.basename(self.remote_key_file)}-" +
remote_hostname)
with tobiko.open_output_file(key_file) as fd:
fd.write(private_key.decode())
with tobiko.open_output_file(key_file + '.pub') as fd:

View File

@ -40,8 +40,12 @@ OPTIONS = [
default=['ssh_config', '.ssh/config'],
help="Default user SSH configuration files"),
cfg.ListOpt('key_file',
default=['~/.ssh/id_rsa', '.ssh/id'],
help="Default SSH private key file(s)"),
default=['.ssh/id',
'~/.ssh/id_dsa',
'~/.ssh/id_rsa',
'~/.ssh/id_ecdsa',
'~/.ssh/id_ed25519'],
help="Default SSH private key file(s) wildcard"),
cfg.BoolOpt('allow_agent',
default=False,
help=("Set to False to disable connecting to the "
@ -97,7 +101,10 @@ def setup_tobiko_config(conf):
ssh_proxy_client = _client.ssh_proxy_client()
if ssh_proxy_client:
key_file = _ssh_key_file.get_key_file(ssh_client=ssh_proxy_client)
if key_file and os.path.isfile(key_file):
LOG.info(f"Use SSH proxy server keyfile: {key_file}")
conf.ssh.key_file.append(key_file)
key_file: str
for remote_key_file in conf.ssh.key_file:
key_file = _ssh_key_file.get_key_file(ssh_client=ssh_proxy_client,
key_file=remote_key_file)
if key_file and os.path.isfile(key_file):
LOG.info(f"Use SSH proxy server keyfile: {key_file}")
conf.ssh.key_file.append(key_file)