Add tests for subprocesses and deserialization
- Add a new section "shell_injection" to the config file. - Read subprocess functions from config in "injection_shell.py". - Check all process-starting calls in "injection_wildcard.py". - Avoid double-counting of subprocess.Popen(shell=True) and others. - Check for cPickle and marshal. - Don't flag `pickle.dumps`, since `import pickle` triggers a message. - Update a handful of examples and tests. Change-Id: I041c7d8a30658b177e88d1664bb9dcf08367d7f7
This commit is contained in:
parent
dc4278c3cb
commit
34144381d0
38
bandit.yaml
38
bandit.yaml
@ -46,27 +46,15 @@ profiles:
|
||||
blacklist_functions:
|
||||
bad_name_sets:
|
||||
- pickle:
|
||||
qualname: pickle.loads, pickle.dumps
|
||||
qualname: pickle.loads, pickle.load, pickle.Unpickler,
|
||||
cPickle.loads, cPickle.load, cPickle.Unpickler
|
||||
message: "Pickle library appears to be in use, possible security issue."
|
||||
- marshal:
|
||||
qualname: marshal.load, marshal.loads
|
||||
message: "Deserialization with the marshal module is possibly dangerous."
|
||||
- md5:
|
||||
qualname: hashlib.md5
|
||||
message: "Use of insecure MD5 hash function."
|
||||
- subprocess_popen:
|
||||
qualname: subprocess.Popen
|
||||
message: "Use of possibly insecure system call function (subprocess.Popen)."
|
||||
- subprocess_call:
|
||||
qualname: subprocess.call
|
||||
message: "Use of possibly insecure system call function (subprocess.call)."
|
||||
- os_exec:
|
||||
qualname: os.exec, os.spawn
|
||||
params: l, le, lp, lpe, v, ve, vp, vpe
|
||||
message: "Use of possibly insecure system call function (os.exec or os.spawn)."
|
||||
- os_popen:
|
||||
qualname: os.popen
|
||||
message: "Use of insecure / deprecated system call function (os.popen)."
|
||||
- os_startfile_q:
|
||||
qualname: os.startfile
|
||||
message: "Use of insecure system function (os.startfile)."
|
||||
- mktemp_q:
|
||||
qualname: tempfile.mktemp
|
||||
message: "Use of insecure and deprecated function (mktemp)."
|
||||
@ -86,6 +74,20 @@ blacklist_functions:
|
||||
qualname: urllib.urlopen, urllib.urlretrieve, urllib.URLopener, urllib.FancyURLopener, urllib2.urlopen, urllib2.Request
|
||||
message: "Audit url open for permitted schemes. Allowing use of file:/ or custom schemes is often unexpected."
|
||||
|
||||
shell_injection:
|
||||
# Start a process using the subprocess module, or one of its wrappers.
|
||||
subprocess: [subprocess.Popen, subprocess.call, subprocess.check_call,
|
||||
subprocess.check_output, utils.execute, utils.execute_with_timeout]
|
||||
# Start a process with a function vulnerable to shell injection.
|
||||
shell: [os.system, os.popen, os.popen2, os.popen3, os.popen4,
|
||||
popen2.popen2, popen2.popen3, popen2.popen4, popen2.Popen3,
|
||||
popen2.Popen4, commands.getoutput, commands.getstatusoutput]
|
||||
# Start a process with a function that is not vulnerable to shell injection.
|
||||
no_shell: [os.execl, os.execle, os.execlp, os.execlpe, os.execv,os.execve,
|
||||
os.execvp, os.execvpe, os.spawnl, os.spawnle, os.spawnlp,
|
||||
os.spawnlpe, os.spawnv, os.spawnve, os.spawnvp, os.spawnvpe,
|
||||
os.startfile]
|
||||
|
||||
blacklist_imports:
|
||||
bad_import_sets:
|
||||
- telnet:
|
||||
@ -93,7 +95,7 @@ blacklist_imports:
|
||||
level: ERROR
|
||||
message: "Telnet is considered insecure. Use SSH or some other encrypted protocol."
|
||||
- info_libs:
|
||||
import: pickle, subprocess, Crypto
|
||||
import: pickle, cPickle, subprocess, Crypto
|
||||
level: INFO
|
||||
message: "Consider possible security implications associated with {module} module."
|
||||
|
||||
|
@ -15,29 +15,47 @@
|
||||
# under the License.
|
||||
|
||||
import bandit
|
||||
from bandit.core.test_properties import *
|
||||
from bandit.core.test_properties import checks
|
||||
from bandit.core.test_properties import takes_config
|
||||
|
||||
|
||||
@takes_config('shell_injection')
|
||||
@checks('Call')
|
||||
def subprocess_popen_with_shell_equals_true(context):
|
||||
if (context.call_function_name_qual == 'subprocess.Popen' or
|
||||
context.call_function_name_qual == 'utils.execute' or
|
||||
context.call_function_name_qual == 'utils.execute_with_timeout'):
|
||||
def subprocess_popen_with_shell_equals_true(context, config):
|
||||
if config and context.call_function_name_qual in config['subprocess']:
|
||||
if context.check_call_arg_value('shell', 'True'):
|
||||
|
||||
return(bandit.ERROR, 'Popen call with shell=True '
|
||||
return(bandit.ERROR, 'subprocess call with shell=True '
|
||||
'identified, security issue. %s' %
|
||||
context.call_args_string)
|
||||
else:
|
||||
return (bandit.INFO, 'subprocess call without a subshell.')
|
||||
|
||||
|
||||
@takes_config('shell_injection')
|
||||
@checks('Call')
|
||||
def any_other_function_with_shell_equals_true(context):
|
||||
# Alerts on any function call that includes a shell=True parameter
|
||||
# (multiple 'helpers' with varying names have been identified across
|
||||
# various OpenStack projects).
|
||||
if context.call_function_name_qual != 'subprocess.Popen':
|
||||
if context.check_call_arg_value('shell', 'True'):
|
||||
def any_other_function_with_shell_equals_true(context, config):
|
||||
'''Alerts on any function call that includes a shell=True parameter.
|
||||
|
||||
Multiple "helpers" with varying names have been identified across
|
||||
various OpenStack projects.
|
||||
'''
|
||||
if config and context.call_function_name_qual not in config['subprocess']:
|
||||
if context.check_call_arg_value('shell', 'True'):
|
||||
return(bandit.WARN, 'Function call with shell=True '
|
||||
'parameter identified, possible security issue. %s' %
|
||||
context.call_args_string)
|
||||
|
||||
|
||||
@takes_config('shell_injection')
|
||||
@checks('Call')
|
||||
def start_process_with_a_shell(context, config):
|
||||
if config and context.call_function_name_qual in config['shell']:
|
||||
return (bandit.ERROR, 'Starting a process with a shell: '
|
||||
'check for injection.')
|
||||
|
||||
|
||||
@takes_config('shell_injection')
|
||||
@checks('Call')
|
||||
def start_process_with_no_shell(context, config):
|
||||
if config and context.call_function_name_qual in config['no_shell']:
|
||||
return (bandit.INFO, 'Starting a process without a shell.')
|
||||
|
@ -18,29 +18,28 @@ import bandit
|
||||
from bandit.core.test_properties import *
|
||||
|
||||
|
||||
@takes_config('shell_injection')
|
||||
@checks('Call')
|
||||
def linux_commands_wildcard_injection(context):
|
||||
system_calls = ['os.system', 'subprocess.Popen', 'os.popen']
|
||||
def linux_commands_wildcard_injection(context, config):
|
||||
vulnerable_funcs = ['chown', 'chmod', 'tar', 'rsync']
|
||||
system_calls = config['subprocess'] + config['shell'] + config['no_shell']
|
||||
if context.call_function_name_qual in system_calls:
|
||||
if context.call_args_count == 1:
|
||||
call_argument = context.get_call_arg_at_position(0)
|
||||
argument_string = ''
|
||||
if isinstance(call_argument, list):
|
||||
for li in call_argument:
|
||||
argument_string = argument_string + ' %s' % li
|
||||
elif isinstance(call_argument, str):
|
||||
argument_string = call_argument
|
||||
|
||||
for system_call in system_calls:
|
||||
if system_call in context.call_function_name_qual:
|
||||
if context.call_args_count == 1:
|
||||
call_argument = context.get_call_arg_at_position(0)
|
||||
argument_string = ''
|
||||
if isinstance(call_argument, list):
|
||||
for li in call_argument:
|
||||
argument_string = argument_string + ' %s' % li
|
||||
elif isinstance(call_argument, str):
|
||||
argument_string = call_argument
|
||||
if argument_string != '':
|
||||
for vulnerable_func in vulnerable_funcs:
|
||||
if(
|
||||
vulnerable_func in argument_string and
|
||||
'*' in argument_string
|
||||
):
|
||||
|
||||
if argument_string != '':
|
||||
for vulnerable_func in vulnerable_funcs:
|
||||
if(
|
||||
vulnerable_func in argument_string and
|
||||
'*' in argument_string
|
||||
):
|
||||
|
||||
return(bandit.ERROR, 'Possible wildcard injection '
|
||||
'in call: %s' %
|
||||
context.call_function_name_qual)
|
||||
return(bandit.ERROR, 'Possible wildcard '
|
||||
'injection in call: %s' %
|
||||
context.call_function_name_qual)
|
||||
|
@ -1,3 +0,0 @@
|
||||
import subprocess
|
||||
|
||||
subprocess.Popen(import_str)(*args, **kwargs)
|
12
examples/marshal_deserialize.py
Normal file
12
examples/marshal_deserialize.py
Normal file
@ -0,0 +1,12 @@
|
||||
import marshal
|
||||
import tempfile
|
||||
|
||||
|
||||
serialized = marshal.dumps({'a': 1})
|
||||
print marshal.loads(serialized)
|
||||
|
||||
file_obj = tempfile.TemporaryFile()
|
||||
marshal.dump(range(5), file_obj)
|
||||
file_obj.seek(0)
|
||||
print marshal.load(file_obj)
|
||||
file_obj.close()
|
3
examples/os_system.py
Normal file
3
examples/os_system.py
Normal file
@ -0,0 +1,3 @@
|
||||
import os
|
||||
|
||||
os.system('echo hi')
|
@ -1,4 +0,0 @@
|
||||
import pickle
|
||||
|
||||
pick = pickle.dumps({'a': 'b', 'c': 'd'})
|
||||
raw = pickle.loads(pick)
|
29
examples/pickle_deserialize.py
Normal file
29
examples/pickle_deserialize.py
Normal file
@ -0,0 +1,29 @@
|
||||
import cPickle
|
||||
import pickle
|
||||
import StringIO
|
||||
|
||||
|
||||
# pickle
|
||||
pick = pickle.dumps({'a': 'b', 'c': 'd'})
|
||||
print pickle.loads(pick)
|
||||
|
||||
file_obj = StringIO.StringIO()
|
||||
pickle.dump([1, 2, '3'], file_obj)
|
||||
file_obj.seek(0)
|
||||
print pickle.load(file_obj)
|
||||
|
||||
file_obj.seek(0)
|
||||
print pickle.Unpickler(file_obj).load()
|
||||
|
||||
# cPickle
|
||||
serialized = cPickle.dumps({(): []})
|
||||
print cPickle.loads(serialized)
|
||||
|
||||
file_obj = StringIO.StringIO()
|
||||
cPickle.dump((1,), file_obj)
|
||||
file_obj.seek(0)
|
||||
print cPickle.load(file_obj)
|
||||
|
||||
file_obj.seek(0)
|
||||
print cPickle.Unpickler(file_obj).load()
|
||||
|
15
examples/popen_wrappers.py
Normal file
15
examples/popen_wrappers.py
Normal file
@ -0,0 +1,15 @@
|
||||
import commands
|
||||
import popen2
|
||||
|
||||
|
||||
print commands.getstatusoutput('echo / | xargs ls')
|
||||
print commands.getoutput('echo / | xargs ls')
|
||||
|
||||
# This one is safe.
|
||||
print commands.getstatus('echo / | xargs ls')
|
||||
|
||||
print popen2.popen2('echo / | xargs ls')[0].read()
|
||||
print popen2.popen3('echo / | xargs ls')[0].read()
|
||||
print popen2.popen4('echo / | xargs ls')[0].read()
|
||||
print popen2.Popen3('echo / | xargs ls').fromchild.read()
|
||||
print popen2.Popen4('echo / | xargs ls').fromchild.read()
|
@ -1,7 +0,0 @@
|
||||
import subprocess
|
||||
|
||||
subprocess.call(["ls", "-l"])
|
||||
|
||||
subprocess.call(["ls",
|
||||
"-l"
|
||||
])
|
@ -1,13 +0,0 @@
|
||||
import subprocess
|
||||
from subprocess import Popen as pop
|
||||
|
||||
|
||||
def Popen():
|
||||
print('hi')
|
||||
|
||||
pop('gcc --version', shell=True)
|
||||
Popen('gcc --version', shell=True)
|
||||
|
||||
subprocess.Popen('gcc --version', shell=True)
|
||||
subprocess.Popen('gcc --version', shell=False)
|
||||
subprocess.Popen('gcc --version')
|
24
examples/subprocess_shell.py
Normal file
24
examples/subprocess_shell.py
Normal file
@ -0,0 +1,24 @@
|
||||
import subprocess
|
||||
from subprocess import Popen as pop
|
||||
|
||||
|
||||
def Popen(*args, **kwargs):
|
||||
print('hi')
|
||||
|
||||
pop('gcc --version', shell=True)
|
||||
Popen('gcc --version', shell=True)
|
||||
|
||||
subprocess.Popen('gcc --version', shell=True)
|
||||
subprocess.Popen(['gcc', '--version'], shell=False)
|
||||
subprocess.Popen(['gcc', '--version'])
|
||||
|
||||
subprocess.call(["ls",
|
||||
"-l"
|
||||
])
|
||||
subprocess.call('ls -l', shell=True)
|
||||
|
||||
subprocess.check_call(['ls', '-l'], shell=False)
|
||||
subprocess.check_call('ls -l', shell=True)
|
||||
|
||||
subprocess.check_output(['ls', '-l'])
|
||||
subprocess.check_output('ls -l', shell=True)
|
@ -5,5 +5,6 @@ u.execute('gcc --version', shell=True)
|
||||
utils.execute('gcc --version', shell=True)
|
||||
u.execute_with_timeout('gcc --version', shell=True)
|
||||
utils.execute_with_timeout('gcc --version', shell=True)
|
||||
utils.execute_with_timeout(['gcc', '--version'], shell=False)
|
||||
|
||||
|
||||
|
@ -4,6 +4,7 @@ import subprocess as subp
|
||||
os.system("tar xvzf *")
|
||||
subprocess.Popen("chmod *")
|
||||
o.system('chown *')
|
||||
o.popen2('chmod *')
|
||||
subp.Popen('rsync *')
|
||||
subprocess.Popen(['chown', '*'])
|
||||
subprocess.Popen(["chmod", sys.argv[1], "*"],
|
||||
|
@ -70,10 +70,6 @@ class FunctionalTests(unittest.TestCase):
|
||||
'''Test the bind-to-0.0.0.0 example.'''
|
||||
self.check_example('binding.py', warn=1)
|
||||
|
||||
def test_call_tests(self):
|
||||
'''Test the `subprocess.call` example.'''
|
||||
self.check_example('call-tests.py', info=1, warn=1)
|
||||
|
||||
def test_crypto_md5(self):
|
||||
'''Test the `hashlib.md5` example.'''
|
||||
self.check_example('crypto-md5.py', warn=5)
|
||||
@ -104,7 +100,7 @@ class FunctionalTests(unittest.TestCase):
|
||||
|
||||
def test_imports_aliases(self):
|
||||
'''Test the `import X as Y` syntax.'''
|
||||
self.check_example('imports-aliases.py', info=3, warn=6, error=1)
|
||||
self.check_example('imports-aliases.py', info=3, warn=5, error=1)
|
||||
|
||||
def test_imports_from(self):
|
||||
'''Test the `from X import Y` syntax.'''
|
||||
@ -139,31 +135,37 @@ class FunctionalTests(unittest.TestCase):
|
||||
'''Test setting file permissions.'''
|
||||
self.check_example('os-chmod.py', warn=1, error=8)
|
||||
|
||||
@unittest.skip('the `os.exec*` tests are currently broken')
|
||||
def test_os_exec(self):
|
||||
'''Test for `os.exec*`.'''
|
||||
self.check_example('os-exec.py')
|
||||
self.check_example('os-exec.py', info=8)
|
||||
|
||||
def test_os_popen(self):
|
||||
'''Test for `os.popen`.'''
|
||||
self.check_example('os-popen.py', warn=4)
|
||||
self.check_example('os-popen.py', error=7)
|
||||
|
||||
@unittest.skip('the `os.spawn*` tests are currently broken')
|
||||
def test_os_spawn(self):
|
||||
'''Test for `os.spawn*`.'''
|
||||
self.check_example('os-spawn.py')
|
||||
self.check_example('os-spawn.py', info=8)
|
||||
|
||||
def test_os_startfile(self):
|
||||
'''Test for `os.startfile`.'''
|
||||
self.check_example('os-startfile.py', warn=3)
|
||||
self.check_example('os-startfile.py', info=3)
|
||||
|
||||
def test_os_system(self):
|
||||
'''Test for `os.system`.'''
|
||||
self.check_example('os_system.py', error=1)
|
||||
|
||||
def test_pickle(self):
|
||||
'''Test for the `pickle` module.'''
|
||||
self.check_example('pickle.py', info=1, warn=2)
|
||||
self.check_example('pickle_deserialize.py', info=2, warn=6)
|
||||
|
||||
def test_random(self):
|
||||
def test_popen_wrappers(self):
|
||||
'''Test the `popen2` and `commands` modules.'''
|
||||
self.check_example('popen_wrappers.py', error=7)
|
||||
|
||||
def test_random_module(self):
|
||||
'''Test for the `random` module.'''
|
||||
self.check_example('random.py', info=3)
|
||||
self.check_example('random_module.py', info=3)
|
||||
|
||||
def test_requests_ssl_verify_disabled(self):
|
||||
'''Test for the `requests` library skipping verification.'''
|
||||
@ -186,13 +188,9 @@ class FunctionalTests(unittest.TestCase):
|
||||
'''Test for insecure SSL protocol versions.'''
|
||||
self.check_example('ssl-insecure-version.py', info=1, warn=10, error=7)
|
||||
|
||||
def test_subprocess_call(self):
|
||||
'''Test for `subprocess.call`.'''
|
||||
self.check_example('subprocess-call.py', info=1, warn=2)
|
||||
|
||||
def test_subprocess_popen_shell(self):
|
||||
def test_subprocess_shell(self):
|
||||
'''Test for `subprocess.Popen` with `shell=True`.'''
|
||||
self.check_example('subprocess-popen-shell.py', info=2, warn=5, error=2)
|
||||
self.check_example('subprocess_shell.py', info=7, warn=1, error=5)
|
||||
|
||||
def test_urlopen(self):
|
||||
'''Test for dangerous URL opening.'''
|
||||
@ -200,11 +198,11 @@ class FunctionalTests(unittest.TestCase):
|
||||
|
||||
def test_utils_shell(self):
|
||||
'''Test for `utils.execute*` with `shell=True`.'''
|
||||
self.check_example('utils-shell.py', warn=4, error=4)
|
||||
self.check_example('utils-shell.py', info=1, error=4)
|
||||
|
||||
def test_wildcard_injection(self):
|
||||
'''Test for wildcard injection in shell commands.'''
|
||||
self.check_example('wildcard-injection.py', info=1, warn=4, error=6)
|
||||
self.check_example('wildcard-injection.py', info=5, error=10)
|
||||
|
||||
def test_yaml(self):
|
||||
'''Test for `yaml.load`.'''
|
||||
@ -219,4 +217,5 @@ class FunctionalTests(unittest.TestCase):
|
||||
self.check_example('secret-config-option.py', info=1, warn=2)
|
||||
|
||||
def test_mako_templating(self):
|
||||
'''Test Mako templates for XSS.'''
|
||||
self.check_example('mako_templating.py', warn=3)
|
||||
|
Loading…
x
Reference in New Issue
Block a user