Switch to cryptography from pycrypto

PyCrypto seems to no longer be maintained and has issues
with Python 3+. This patch switches refstack to use the cryptography
library instead. Cryptography is actively maintained and works with
later versions of Python.

Change-Id: I38baf813caad37c9fa63d2a3ffdacad5301a5139
This commit is contained in:
Paul Van Eck 2017-02-08 18:39:11 -08:00
parent 784926d3a1
commit 1d606b3554
5 changed files with 86 additions and 56 deletions

View File

@ -22,7 +22,8 @@ import requests
import string
import types
from Crypto.PublicKey import RSA
from cryptography.hazmat import backends
from cryptography.hazmat.primitives import serialization
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
@ -389,9 +390,14 @@ def decode_token(request):
pubkeys = db.get_user_pubkeys(openid)
for pubkey in pubkeys:
try:
pem_pubkey = RSA.importKey(
'%s %s' % (pubkey['format'], pubkey['pubkey'])
).exportKey(format='PEM')
pubkey_string = '%s %s' % (pubkey['format'], pubkey['pubkey'])
pubkey_obj = serialization.load_ssh_public_key(
pubkey_string.encode('utf-8'),
backend=backends.default_backend()
)
pem_pubkey = pubkey_obj.public_bytes(
serialization.Encoding.PEM,
serialization.PublicFormat.SubjectPublicKeyInfo)
except (ValueError, IndexError, TypeError, binascii.Error):
pass
else:

View File

@ -21,9 +21,11 @@ import uuid
import json
import jsonschema
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat import backends
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.serialization import load_ssh_public_key
from refstack.api import exceptions as api_exc
@ -125,13 +127,18 @@ class TestResultValidator(BaseValidator):
raise api_exc.ValidationError('Malformed signature', e)
try:
key = RSA.importKey(request.headers.get('X-Public-Key', ''))
key = load_ssh_public_key(
request.headers.get('X-Public-Key', ''),
backend=backends.default_backend()
)
except (binascii.Error, ValueError) as e:
raise api_exc.ValidationError('Malformed public key', e)
signer = PKCS1_v1_5.new(key)
data_hash = SHA256.new()
data_hash.update(request.body.encode('utf-8'))
if not signer.verify(data_hash, sign):
verifier = key.verifier(sign, padding.PKCS1v15(), hashes.SHA256())
verifier.update(request.body.encode('utf-8'))
try:
verifier.verify()
except InvalidSignature:
raise api_exc.ValidationError('Signature verification failed')
if self._is_empty_result(request):
raise api_exc.ValidationError('Uploaded results must contain at '
@ -179,13 +186,16 @@ class PubkeyValidator(BaseValidator):
raise api_exc.ValidationError('Malformed signature', e)
try:
key = RSA.importKey(body['raw_key'])
key = load_ssh_public_key(body['raw_key'].encode('utf-8'),
backend=backends.default_backend())
except (binascii.Error, ValueError) as e:
raise api_exc.ValidationError('Malformed public key', e)
signer = PKCS1_v1_5.new(key)
data_hash = SHA256.new()
data_hash.update('signature'.encode('utf-8'))
if not signer.verify(data_hash, sign):
verifier = key.verifier(sign, padding.PKCS1v15(), hashes.SHA256())
verifier.update('signature'.encode('utf-8'))
try:
verifier.verify()
except InvalidSignature:
raise api_exc.ValidationError('Signature verification failed')

View File

@ -15,9 +15,11 @@
import binascii
import json
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
import mock
import webtest.app
@ -49,12 +51,18 @@ class TestProfileEndpoint(api.FunctionalTest):
def test_pubkeys(self, mock_get_user):
"""Test '/v1/profile/pubkeys' API endpoint."""
url = self.URL + 'pubkeys'
data_hash = SHA256.new()
data_hash.update('signature'.encode('utf-8'))
key = RSA.generate(1024)
signer = PKCS1_v1_5.new(key)
sign = signer.sign(data_hash)
pubkey = key.publickey().exportKey('OpenSSH')
key = rsa.generate_private_key(
public_exponent=65537,
key_size=1024,
backend=default_backend()
)
signer = key.signer(padding.PKCS1v15(), hashes.SHA256())
signer.update('signature'.encode('utf-8'))
sign = signer.finalize()
pubkey = key.public_key().public_bytes(
serialization.Encoding.OpenSSH,
serialization.PublicFormat.OpenSSH
)
body = {'raw_key': pubkey,
'self_signature': binascii.b2a_hex(sign)}
json_params = json.dumps(body)

View File

@ -17,13 +17,15 @@
import binascii
import json
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
import jsonschema
import mock
from oslotest import base
import six
from refstack.api import exceptions as api_exc
from refstack.api import validators
@ -102,18 +104,24 @@ class TestResultValidatorTestCase(base.BaseTestCase):
self.validator.schema)
def test_validation_with_signature(self):
if six.PY3:
self.skip('https://github.com/dlitz/pycrypto/issues/99')
request = mock.Mock()
request.body = json.dumps(self.FAKE_JSON)
data_hash = SHA256.new()
data_hash.update(request.body.encode('utf-8'))
key = RSA.generate(1024)
signer = PKCS1_v1_5.new(key)
sign = signer.sign(data_hash)
key = rsa.generate_private_key(
public_exponent=65537,
key_size=1024,
backend=default_backend()
)
signer = key.signer(padding.PKCS1v15(), hashes.SHA256())
signer.update(request.body.encode('utf-8'))
sign = signer.finalize()
pubkey = key.public_key().public_bytes(
serialization.Encoding.OpenSSH,
serialization.PublicFormat.OpenSSH
)
request.headers = {
'X-Signature': binascii.b2a_hex(sign),
'X-Public-Key': key.publickey().exportKey('OpenSSH')
'X-Public-Key': pubkey
}
self.validator.validate(request)
@ -150,23 +158,25 @@ class TestResultValidatorTestCase(base.BaseTestCase):
@mock.patch('jsonschema.validate')
def test_validation_with_broken_signature(self, mock_validate):
if six.PY3:
self.skip('https://github.com/dlitz/pycrypto/issues/99')
request = mock.Mock()
request.body = json.dumps(self.FAKE_JSON)
key = RSA.generate(2048)
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
pubkey = key.public_key().public_bytes(
serialization.Encoding.OpenSSH,
serialization.PublicFormat.OpenSSH
)
request.headers = {
'X-Signature': binascii.b2a_hex('fake_sign'.encode('utf-8')),
'X-Public-Key': key.publickey().exportKey('OpenSSH')
'X-Signature': binascii.b2a_hex(b'fake_sign'),
'X-Public-Key': pubkey
}
self.assertRaises(api_exc.ValidationError,
self.validator.validate,
request)
request.headers = {
'X-Signature': binascii.b2a_hex('fake_sign'.encode('utf-8')),
'X-Public-Key': key.publickey().exportKey('OpenSSH')
}
try:
self.validator.validate(request)
except api_exc.ValidationError as e:
@ -175,16 +185,16 @@ class TestResultValidatorTestCase(base.BaseTestCase):
request.headers = {
'X-Signature': 'z-z-z-z!!!',
'X-Public-Key': key.publickey().exportKey('OpenSSH')
'X-Public-Key': pubkey
}
try:
self.validator.validate(request)
except api_exc.ValidationError as e:
self.assertIsInstance(e.exc, TypeError)
self.assertEqual(e.title, 'Malformed signature')
request.headers = {
'X-Signature': binascii.b2a_hex('fake_sign'),
'X-Public-Key': 'H--0'
'X-Signature': binascii.b2a_hex(b'fake_sign'),
'X-Public-Key': b'H--0'
}
try:
self.validator.validate(request)
@ -213,8 +223,6 @@ class PubkeyValidatorTestCase(base.BaseTestCase):
self.validator = validators.PubkeyValidator()
def test_validation(self):
if six.PY3:
self.skip('https://github.com/dlitz/pycrypto/issues/99')
request = mock.Mock()
request.body = json.dumps(self.FAKE_JSON)
self.validator.validate(request)
@ -245,8 +253,6 @@ class PubkeyValidatorTestCase(base.BaseTestCase):
@mock.patch('jsonschema.validate')
def test_validation_with_broken_signature(self, mock_validate):
if six.PY3:
self.skip('https://github.com/dlitz/pycrypto/issues/99')
body = self.FAKE_JSON.copy()
body['self_signature'] = 'deadbeef'

View File

@ -1,6 +1,7 @@
SQLAlchemy>=0.8.3
alembic==0.5.0
beaker==1.6.5.post1
cryptography>=1.0,!=1.3.0 # BSD/Apache-2.0
#gunicorn 19.1.1 has a bug with threading module
gunicorn==18
oslo.config>=1.6.0 # Apache-2.0
@ -8,7 +9,6 @@ oslo.db>=1.4.1 # Apache-2.0
oslo.log
pecan>=0.8.2
pyOpenSSL>=0.14
pycrypto>=2.6
requests>=2.2.0,!=2.4.0
requests-cache>=0.4.9
jsonschema>=2.0.0,<3.0.0