Allow precise #nosec placement
allow #nosec in exactly the same place the error was reported rather than at the beginning of a function call. For example the error is reported on the second line of: Popen("foo *", shell=True) so #nosec on the same line should be interpreted correctly. The original behaviour of #nosec at the start of function call is still allowed for backwards compatibility. Plugins which check keyword arguments must explicitly pass the line of the argument to the Issue constructor now. Closes-bug: 1477739 Change-Id: I71f25e2920e0533649ad8dc65b9883559fc31311
This commit is contained in:
parent
c29fdccad9
commit
c5e2eb9974
@ -273,8 +273,6 @@ class Context():
|
||||
argument_values = list((argument_values,))
|
||||
for val in argument_values:
|
||||
if arg_value == val:
|
||||
# if matched, fix up the context lineno for reporting
|
||||
self.set_lineno_for_call_arg(argument_name)
|
||||
return True
|
||||
return False
|
||||
else:
|
||||
@ -282,19 +280,17 @@ class Context():
|
||||
# eventuality
|
||||
return None
|
||||
|
||||
def set_lineno_for_call_arg(self, argument_name):
|
||||
'''Updates the line number for a specific named argument
|
||||
def get_lineno_for_call_arg(self, argument_name):
|
||||
'''Get the line number for a specific named argument
|
||||
|
||||
If a call is split over multiple lines, when a keyword arg is found
|
||||
the issue will be reported with the line number of the start of the
|
||||
call. This function updates the line number in the current context
|
||||
copy to match the actual line where the match occurs.
|
||||
:call_node: the call to find the argument in
|
||||
In case the call is split over multiple lines, get the correct one for
|
||||
the argument.
|
||||
:param argument_name: A string - name of the argument to look for
|
||||
:return: Integer - the line number of the found argument, or -1
|
||||
'''
|
||||
for key in self.node.keywords:
|
||||
if key.arg == argument_name:
|
||||
self._context['lineno'] = key.value.lineno
|
||||
return key.value.lineno
|
||||
|
||||
def get_call_arg_at_position(self, position_num):
|
||||
'''Returns positional argument at the specified position (if it exists)
|
||||
|
@ -25,14 +25,14 @@ import linecache
|
||||
|
||||
class Issue(object):
|
||||
def __init__(self, severity, confidence=constants.CONFIDENCE_DEFAULT,
|
||||
text="", ident=None):
|
||||
text="", ident=None, lineno=None):
|
||||
self.severity = severity
|
||||
self.confidence = confidence
|
||||
self.text = text
|
||||
self.ident = ident
|
||||
self.fname = ""
|
||||
self.test = ""
|
||||
self.lineno = -1
|
||||
self.lineno = lineno
|
||||
self.linerange = []
|
||||
|
||||
def __str__(self):
|
||||
|
@ -248,7 +248,15 @@ class BanditManager():
|
||||
lines = data.splitlines()
|
||||
self.metrics.begin(fname)
|
||||
self.metrics.count_locs(lines)
|
||||
score = self._execute_ast_visitor(fname, data, lines)
|
||||
if self.ignore_nosec:
|
||||
nosec_lines = set()
|
||||
else:
|
||||
nosec_lines = set(
|
||||
lineno + 1 for
|
||||
(lineno, line) in enumerate(lines)
|
||||
if b'#nosec' in line or b'# nosec' in line)
|
||||
score = self._execute_ast_visitor(fname, data,
|
||||
nosec_lines)
|
||||
self.scores.append(score)
|
||||
self.metrics.count_issues([score, ])
|
||||
except KeyboardInterrupt as e:
|
||||
@ -271,7 +279,7 @@ class BanditManager():
|
||||
# do final aggregation of metrics
|
||||
self.metrics.aggregate()
|
||||
|
||||
def _execute_ast_visitor(self, fname, data, lines):
|
||||
def _execute_ast_visitor(self, fname, data, nosec_lines):
|
||||
'''Execute AST parse on each file
|
||||
|
||||
:param fname: The name of the file being parsed
|
||||
@ -282,9 +290,9 @@ class BanditManager():
|
||||
score = []
|
||||
res = b_node_visitor.BanditNodeVisitor(fname, self.b_conf, self.b_ma,
|
||||
self.b_ts, self.debug,
|
||||
self.ignore_nosec, self.metrics)
|
||||
nosec_lines, self.metrics)
|
||||
|
||||
score = res.process(data, lines)
|
||||
score = res.process(data)
|
||||
self.results.extend(res.tester.results)
|
||||
return score
|
||||
|
||||
|
@ -29,9 +29,9 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
class BanditNodeVisitor(object):
|
||||
def __init__(self, fname, config, metaast, testset,
|
||||
debug, ignore_nosec, metrics):
|
||||
debug, nosec_lines, metrics):
|
||||
self.debug = debug
|
||||
self.ignore_nosec = ignore_nosec
|
||||
self.nosec_lines = nosec_lines
|
||||
self.seen = 0
|
||||
self.scores = {
|
||||
'SEVERITY': [0] * len(constants.RANKING),
|
||||
@ -45,7 +45,7 @@ class BanditNodeVisitor(object):
|
||||
self.imports = set()
|
||||
self.import_aliases = {}
|
||||
self.tester = b_tester.BanditTester(
|
||||
self.config, self.testset, self.debug
|
||||
self.config, self.testset, self.debug, nosec_lines,
|
||||
)
|
||||
|
||||
# in some cases we can't determine a qualified name
|
||||
@ -56,7 +56,6 @@ class BanditNodeVisitor(object):
|
||||
self.fname)
|
||||
self.namespace = ""
|
||||
logger.debug('Module qualified name: %s', self.namespace)
|
||||
self.lines = []
|
||||
self.metrics = metrics
|
||||
|
||||
def visit_ClassDef(self, node):
|
||||
@ -194,12 +193,10 @@ class BanditNodeVisitor(object):
|
||||
if hasattr(node, 'lineno'):
|
||||
self.context['lineno'] = node.lineno
|
||||
|
||||
if not self.ignore_nosec:
|
||||
if (b"# nosec" in self.lines[node.lineno - 1] or
|
||||
b"#nosec" in self.lines[node.lineno - 1]):
|
||||
logger.debug("skipped, nosec")
|
||||
self.metrics.note_nosec()
|
||||
return False
|
||||
if node.lineno in self.nosec_lines:
|
||||
logger.debug("skipped, nosec")
|
||||
self.metrics.note_nosec()
|
||||
return False
|
||||
|
||||
self.context['node'] = node
|
||||
self.context['linerange'] = b_utils.linerange_fix(node)
|
||||
@ -273,14 +270,13 @@ class BanditNodeVisitor(object):
|
||||
operator.add, self.scores[score_type], scores[score_type]
|
||||
))
|
||||
|
||||
def process(self, data, lines):
|
||||
def process(self, data):
|
||||
'''Main process loop
|
||||
|
||||
Build and process the AST
|
||||
:param lines: lines code to process
|
||||
:return score: the aggregated score for the current file
|
||||
'''
|
||||
self.lines = lines
|
||||
f_ast = ast.parse(data)
|
||||
self.generic_visit(f_ast)
|
||||
return self.scores
|
||||
|
@ -29,12 +29,13 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BanditTester():
|
||||
def __init__(self, config, testset, debug):
|
||||
def __init__(self, config, testset, debug, nosec_lines):
|
||||
self.config = config
|
||||
self.results = []
|
||||
self.testset = testset
|
||||
self.last_result = None
|
||||
self.debug = debug
|
||||
self.nosec_lines = nosec_lines
|
||||
|
||||
def run_tests(self, raw_context, checktype):
|
||||
'''Runs all tests for a certain type of check, for example
|
||||
@ -44,6 +45,7 @@ class BanditTester():
|
||||
|
||||
:param raw_context: Raw context dictionary
|
||||
:param checktype: The type of checks to run
|
||||
:param nosec_lines: Lines which should be skipped because of nosec
|
||||
:return: a score based on the number and type of test results
|
||||
'''
|
||||
|
||||
@ -64,9 +66,12 @@ class BanditTester():
|
||||
result = test(context)
|
||||
|
||||
# if we have a result, record it and update scores
|
||||
if result is not None:
|
||||
if (result is not None and
|
||||
result.lineno not in self.nosec_lines and
|
||||
temp_context['lineno'] not in self.nosec_lines):
|
||||
result.fname = temp_context['filename']
|
||||
result.lineno = temp_context['lineno']
|
||||
if result.lineno is None:
|
||||
result.lineno = temp_context['lineno']
|
||||
result.linerange = temp_context['linerange']
|
||||
result.test = test.__name__
|
||||
|
||||
|
@ -66,5 +66,6 @@ def flask_debug_true(context):
|
||||
confidence=bandit.MEDIUM,
|
||||
text="A Flask app appears to be run with debug=True, "
|
||||
"which exposes the Werkzeug debugger and allows "
|
||||
"the execution of arbitrary code."
|
||||
"the execution of arbitrary code.",
|
||||
lineno=context.get_lineno_for_call_arg('debug'),
|
||||
)
|
||||
|
@ -63,10 +63,11 @@ def request_with_no_cert_validation(context):
|
||||
if ('requests' in context.call_function_name_qual and
|
||||
context.call_function_name in http_verbs):
|
||||
if context.check_call_arg_value('verify', 'False'):
|
||||
|
||||
return bandit.Issue(
|
||||
issue = bandit.Issue(
|
||||
severity=bandit.HIGH,
|
||||
confidence=bandit.HIGH,
|
||||
text="Requests call with verify=False disabling SSL "
|
||||
"certificate checks, security issue."
|
||||
"certificate checks, security issue.",
|
||||
lineno=context.get_lineno_for_call_arg('verify'),
|
||||
)
|
||||
return issue
|
||||
|
@ -87,5 +87,6 @@ def execute_with_run_as_root_equals_true(context, config):
|
||||
severity=bandit.LOW,
|
||||
confidence=bandit.MEDIUM,
|
||||
text="Execute with run_as_root=True identified, possible "
|
||||
"security issue."
|
||||
"security issue.",
|
||||
lineno=context.get_lineno_for_call_arg('run_as_root'),
|
||||
)
|
||||
|
@ -201,7 +201,8 @@ def subprocess_popen_with_shell_equals_true(context, config):
|
||||
confidence=bandit.HIGH,
|
||||
text='subprocess call with shell=True seems safe, but '
|
||||
'may be changed in the future, consider '
|
||||
'rewriting without shell'
|
||||
'rewriting without shell',
|
||||
lineno=context.get_lineno_for_call_arg('shell'),
|
||||
)
|
||||
elif sev == bandit.MEDIUM:
|
||||
return bandit.Issue(
|
||||
@ -209,14 +210,16 @@ def subprocess_popen_with_shell_equals_true(context, config):
|
||||
confidence=bandit.HIGH,
|
||||
text='call with shell=True contains special shell '
|
||||
'characters, consider moving extra logic into '
|
||||
'Python code'
|
||||
'Python code',
|
||||
lineno=context.get_lineno_for_call_arg('shell'),
|
||||
)
|
||||
else:
|
||||
return bandit.Issue(
|
||||
severity=bandit.HIGH,
|
||||
confidence=bandit.HIGH,
|
||||
text='subprocess call with shell=True identified, '
|
||||
'security issue.'
|
||||
'security issue.',
|
||||
lineno=context.get_lineno_for_call_arg('shell'),
|
||||
)
|
||||
|
||||
|
||||
@ -295,7 +298,8 @@ def subprocess_without_shell_equals_true(context, config):
|
||||
severity=bandit.LOW,
|
||||
confidence=bandit.HIGH,
|
||||
text='subprocess call - check for execution of untrusted '
|
||||
'input.'
|
||||
'input.',
|
||||
lineno=context.get_lineno_for_call_arg('shell'),
|
||||
)
|
||||
|
||||
|
||||
@ -377,7 +381,8 @@ def any_other_function_with_shell_equals_true(context, config):
|
||||
severity=bandit.MEDIUM,
|
||||
confidence=bandit.LOW,
|
||||
text='Function call with shell=True parameter identifed, '
|
||||
'possible security issue.'
|
||||
'possible security issue.',
|
||||
lineno=context.get_lineno_for_call_arg('shell'),
|
||||
)
|
||||
|
||||
|
||||
|
@ -142,5 +142,6 @@ def linux_commands_wildcard_injection(context, config):
|
||||
severity=bandit.HIGH,
|
||||
confidence=bandit.MEDIUM,
|
||||
text="Possible wildcard injection in call: %s" %
|
||||
context.call_function_name_qual
|
||||
context.call_function_name_qual,
|
||||
lineno=context.get_lineno_for_call_arg('shell'),
|
||||
)
|
||||
|
@ -123,7 +123,8 @@ def ssl_with_bad_version(context, config):
|
||||
severity=bandit.HIGH,
|
||||
confidence=bandit.HIGH,
|
||||
text="ssl.wrap_socket call with insecure SSL/TLS protocol "
|
||||
"version identified, security issue."
|
||||
"version identified, security issue.",
|
||||
lineno=context.get_lineno_for_call_arg('ssl_version'),
|
||||
)
|
||||
elif (context.call_function_name_qual == 'pyOpenSSL.SSL.Context'):
|
||||
if context.check_call_arg_value('method', bad_ssl_versions):
|
||||
@ -131,18 +132,22 @@ def ssl_with_bad_version(context, config):
|
||||
severity=bandit.HIGH,
|
||||
confidence=bandit.HIGH,
|
||||
text="SSL.Context call with insecure SSL/TLS protocol "
|
||||
"version identified, security issue."
|
||||
"version identified, security issue.",
|
||||
lineno=context.get_lineno_for_call_arg('method'),
|
||||
)
|
||||
|
||||
elif (context.call_function_name_qual != 'ssl.wrap_socket' and
|
||||
context.call_function_name_qual != 'pyOpenSSL.SSL.Context'):
|
||||
if (context.check_call_arg_value('method', bad_ssl_versions) or
|
||||
context.check_call_arg_value('ssl_version', bad_ssl_versions)):
|
||||
lineno = (context.get_lineno_for_call_arg('method') or
|
||||
context.get_lineno_for_call_arg('ssl_version'))
|
||||
return bandit.Issue(
|
||||
severity=bandit.MEDIUM,
|
||||
confidence=bandit.MEDIUM,
|
||||
text="Function call with insecure SSL/TLS protocol "
|
||||
"identified, possible security issue."
|
||||
"identified, possible security issue.",
|
||||
lineno=lineno,
|
||||
)
|
||||
|
||||
|
||||
@ -267,5 +272,6 @@ def ssl_with_no_version(context):
|
||||
confidence=bandit.MEDIUM,
|
||||
text="ssl.wrap_socket call with no SSL/TLS protocol version "
|
||||
"specified, the default SSLv23 could be insecure, "
|
||||
"possible security issue."
|
||||
"possible security issue.",
|
||||
lineno=context.get_lineno_for_call_arg('ssl_version'),
|
||||
)
|
||||
|
@ -97,7 +97,8 @@ def password_config_option_not_marked_secret(context, config):
|
||||
severity=bandit.MEDIUM,
|
||||
confidence=bandit.MEDIUM,
|
||||
text="oslo config option not marked secret=True "
|
||||
"identifed, security issue."
|
||||
"identifed, security issue.",
|
||||
lineno=context.get_lineno_for_call_arg('secret'),
|
||||
)
|
||||
# Checks whether secret is not True, for example when its set to a
|
||||
# variable, secret=secret.
|
||||
@ -106,5 +107,6 @@ def password_config_option_not_marked_secret(context, config):
|
||||
severity=bandit.MEDIUM,
|
||||
confidence=bandit.LOW,
|
||||
text="oslo config option possibly not marked secret=True "
|
||||
"identified."
|
||||
"identified.",
|
||||
lineno=context.get_lineno_for_call_arg('secret'),
|
||||
)
|
||||
|
5
examples/nosec.py
Normal file
5
examples/nosec.py
Normal file
@ -0,0 +1,5 @@
|
||||
subprocess.Popen('/bin/ls *', shell=True) #nosec (on the line)
|
||||
subprocess.Popen('/bin/ls *', #nosec (at the start of function call)
|
||||
shell=True)
|
||||
subprocess.Popen('/bin/ls *',
|
||||
shell=True) #nosec (on the specific kwarg line)
|
@ -479,6 +479,13 @@ class FunctionalTests(testtools.TestCase):
|
||||
}
|
||||
self.check_example('flask_debug.py', expect)
|
||||
|
||||
def test_nosec(self):
|
||||
expect = {
|
||||
'SEVERITY': {},
|
||||
'CONFIDENCE': {}
|
||||
}
|
||||
self.check_example('nosec.py', expect)
|
||||
|
||||
def test_baseline_filter(self):
|
||||
json = """{
|
||||
"results": [
|
||||
|
@ -189,14 +189,11 @@ class ContextTests(testtools.TestCase):
|
||||
|
||||
self.assertIsNone(new_context._get_literal_value(None))
|
||||
|
||||
@mock.patch('bandit.core.context.Context.set_lineno_for_call_arg')
|
||||
@mock.patch('bandit.core.context.Context.call_keywords',
|
||||
new_callable=mock.PropertyMock)
|
||||
def test_check_call_arg_value(self, call_keywords,
|
||||
set_lineno_for_call_args):
|
||||
def test_check_call_arg_value(self, call_keywords):
|
||||
new_context = context.Context()
|
||||
call_keywords.return_value = dict(spam='eggs')
|
||||
set_lineno_for_call_args.return_value = 42
|
||||
self.assertTrue(new_context.check_call_arg_value('spam', 'eggs'))
|
||||
self.assertTrue(new_context.check_call_arg_value('spam',
|
||||
['spam', 'eggs']))
|
||||
@ -209,18 +206,18 @@ class ContextTests(testtools.TestCase):
|
||||
|
||||
@mock.patch('bandit.core.context.Context.node',
|
||||
new_callable=mock.PropertyMock)
|
||||
def test_set_lineno_for_call_arg(self, node):
|
||||
def test_get_lineno_for_call_arg(self, node):
|
||||
expected_lineno = 42
|
||||
keyword1 = mock.Mock(arg='spam',
|
||||
value=mock.Mock(lineno=expected_lineno))
|
||||
node.return_value = mock.Mock(keywords=[keyword1])
|
||||
new_context = context.Context()
|
||||
new_context.set_lineno_for_call_arg('spam')
|
||||
self.assertEqual(expected_lineno, new_context._context['lineno'])
|
||||
actual_lineno = new_context.get_lineno_for_call_arg('spam')
|
||||
self.assertEqual(expected_lineno, actual_lineno)
|
||||
|
||||
new_context = context.Context()
|
||||
new_context.set_lineno_for_call_arg('eggs')
|
||||
self.assertNotIn('lineno', new_context._context)
|
||||
missing_lineno = new_context.get_lineno_for_call_arg('eggs')
|
||||
self.assertIsNone(missing_lineno)
|
||||
|
||||
def test_get_call_arg_at_position(self):
|
||||
expected_arg = 'spam'
|
||||
|
Loading…
x
Reference in New Issue
Block a user