Prevent override of bearer token by .netrc

If there is a matching .netrc entry requests will silently override
already existing authorization headers by that entry with basic auth
[1]. This breaks the authorized admin api and can be solved by using a
custom authentication helper class.

[1] https://github.com/psf/requests/issues/3929

Change-Id: Ic3c7f4a2e8496b4e7cc57335187b935bb245c0d2
This commit is contained in:
Tobias Henkel 2020-12-04 12:24:49 +01:00
parent b371dc2f40
commit 66f87085d9
No known key found for this signature in database
GPG Key ID: 03750DEC158E5FA2
2 changed files with 21 additions and 5 deletions

View File

@ -17,7 +17,7 @@ from tests.unit import FakeRequestResponse
from unittest.mock import MagicMock
from zuulclient.api import ZuulRESTClient
from zuulclient.api import ZuulRESTClient, BearerAuth
from zuulclient.api import ZuulRESTException
@ -42,8 +42,8 @@ class TestApi(BaseTestCase):
self.assertEqual('https://fake.zuul/', client.url)
self.assertEqual('https://fake.zuul/api/', client.base_url)
self.assertEqual(True, client.session.verify)
self.assertEqual('Bearer %s' % token,
client.session.headers.get('Authorization'))
self.assertTrue(isinstance(client.session.auth, BearerAuth))
self.assertEqual(token, client.session.auth._token)
def _test_status_check(self, client, verb, func, *args, **kwargs):
# validate request errors

View File

@ -21,6 +21,23 @@ class ZuulRESTException(Exception):
pass
class BearerAuth(requests.auth.AuthBase):
"""Custom authentication helper class.
Authentication helper class to work around requests' default behavior
of using ~/.netrc to authenticate despite having set an explicit
authorization header.
See also https://github.com/psf/requests/issues/3929
"""
def __init__(self, token):
self._token = token
def __call__(self, r):
r.headers["Authorization"] = 'Bearer %s' % self._token
return r
class ZuulRESTClient(object):
"""Basic client for Zuul's REST API"""
def __init__(self, url, verify=False, auth_token=None):
@ -33,8 +50,7 @@ class ZuulRESTClient(object):
self.session = requests.Session()
self.session.verify = self.verify
if self.auth_token:
self.session.headers.update(
dict(Authorization='Bearer %s' % self.auth_token))
self.session.auth = BearerAuth(self.auth_token)
def _check_request_status(self, req):
try: