
In order to fix the init of the logging, some refactoring of how the logger is used throughout all modules was performed. Change-Id: If116fb7c17898fdc411fc62015a26b53ac006927 Closes-Bug: #1491938
214 lines
7.3 KiB
Python
214 lines
7.3 KiB
Python
# -*- coding:utf-8 -*-
|
|
#
|
|
# Copyright 2014 Hewlett-Packard Development Company, L.P.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
|
|
"""An object to store/access results associated with Bandit tests."""
|
|
|
|
from collections import OrderedDict
|
|
import datetime
|
|
import linecache
|
|
|
|
from bandit.core import constants
|
|
from bandit.core import extension_loader
|
|
from bandit.core import utils
|
|
|
|
|
|
class BanditResultStore():
|
|
count = 0
|
|
skipped = None
|
|
|
|
def __init__(self, config, agg_type, verbose):
|
|
self.resstore = OrderedDict()
|
|
self.count = 0
|
|
self.skipped = []
|
|
self.config = config
|
|
self.agg_type = agg_type
|
|
self.sev_level = 0
|
|
self.conf_level = 0
|
|
self.max_lines = -1
|
|
self.format = 'txt'
|
|
self.out_file = None
|
|
self.verbose = verbose
|
|
self.generated_time = datetime.datetime.utcnow()
|
|
|
|
def skip(self, filename, reason):
|
|
'''Indicates that the specified file was skipped and why
|
|
|
|
:param filename: The file that was skipped
|
|
:param reason: Why the file was skipped
|
|
:return: -
|
|
'''
|
|
self.skipped.append((filename, reason))
|
|
|
|
def add(self, context, test, issue):
|
|
'''Adds a result, with the context and the issue that was found
|
|
|
|
:param context: Context of the node
|
|
:param test: The type (function name) of the test
|
|
:param issue: Which issue was found
|
|
:return: -
|
|
'''
|
|
filename = context['filename']
|
|
lineno = context['lineno']
|
|
linerange = context['linerange']
|
|
(issue_severity, issue_confidence, issue_text) = issue
|
|
|
|
if self.agg_type == 'vuln':
|
|
key = test
|
|
else:
|
|
key = filename
|
|
|
|
self.resstore.setdefault(key, []).append(
|
|
{'fname': filename,
|
|
'test': test,
|
|
'lineno': lineno,
|
|
'linerange': linerange,
|
|
'issue_severity': issue_severity,
|
|
'issue_confidence': issue_confidence,
|
|
'issue_text': issue_text})
|
|
|
|
self.count += 1
|
|
|
|
def _write_report(self, files_list, scores, excluded_files):
|
|
formatters_mgr = extension_loader.MANAGER.formatters_mgr
|
|
try:
|
|
formatter = formatters_mgr[self.format]
|
|
except KeyError: # Unrecognized format, so use text instead
|
|
formatter = formatters_mgr['txt']
|
|
|
|
if self.format == 'csv':
|
|
self.max_lines = 1
|
|
elif formatter.name == 'txt' and self.out_file:
|
|
self.format = 'plain'
|
|
|
|
report_func = formatter.plugin
|
|
report_func(self, files_list, scores, excluded_files=excluded_files)
|
|
|
|
def report(self, files_list, scores, excluded_files=None, lines=-1,
|
|
sev_level=1, conf_level=1, output_filename=None,
|
|
output_format=None):
|
|
'''Prints the contents of the result store
|
|
|
|
:param scope: Which files were inspected
|
|
:param scores: The scores awarded to each file in the scope
|
|
:param lines: # of lines around the issue line to display (optional)
|
|
:param sev_level: What level of severity to display (optional)
|
|
:param conf_level: What level of confidence to display (optional)
|
|
:param output_filename: File to output the results (optional)
|
|
:param output_format: File type to output (csv|json|txt|xml)
|
|
:return: -
|
|
'''
|
|
|
|
if not excluded_files:
|
|
excluded_files = []
|
|
|
|
if sev_level >= len(constants.RANKING):
|
|
sev_level = len(constants.RANKING) - 1
|
|
if conf_level >= len(constants.RANKING):
|
|
conf_level = len(constants.RANKING) - 1
|
|
|
|
self.sev_level = sev_level
|
|
self.conf_level = conf_level
|
|
self.max_lines = lines
|
|
self.format = output_format
|
|
self.out_file = output_filename
|
|
|
|
try:
|
|
self._write_report(files_list, scores, excluded_files)
|
|
except IOError:
|
|
print("Unable to write to file: %s" % self.out_file)
|
|
|
|
def _get_issue_list(self):
|
|
|
|
collector = list()
|
|
|
|
for group in self.resstore.items():
|
|
issue_list = group[1]
|
|
for issue in issue_list:
|
|
if (self._check_severity(issue['issue_severity']) and
|
|
self._check_confidence(issue['issue_confidence'])):
|
|
code = self._get_code(issue, True)
|
|
holder = dict({
|
|
"filename": issue['fname'],
|
|
"line_number": issue['lineno'],
|
|
"line_range": issue['linerange'],
|
|
"test_name": issue['test'],
|
|
"issue_severity": issue['issue_severity'],
|
|
"issue_confidence": issue['issue_confidence'],
|
|
"code": code,
|
|
"issue_text": issue['issue_text']
|
|
})
|
|
collector.append(holder)
|
|
|
|
return collector
|
|
|
|
def _get_code(self, issue, tabbed=False):
|
|
'''Gets lines of code from a file
|
|
|
|
:param filename: Filename of file with code in it
|
|
:param line_list: A list of integers corresponding to line numbers
|
|
:return: string of code
|
|
'''
|
|
issue_line = []
|
|
prepend = ""
|
|
|
|
file_len = sum(1 for line in open(issue['fname']))
|
|
lines = utils.lines_with_context(issue['lineno'],
|
|
issue['linerange'],
|
|
self.max_lines,
|
|
file_len)
|
|
|
|
for l in lines:
|
|
if l:
|
|
if tabbed:
|
|
prepend = "%s\t" % l
|
|
issue_line.append(prepend + linecache.getline(
|
|
issue['fname'],
|
|
l))
|
|
|
|
return ''.join(issue_line)
|
|
|
|
def _sum_scores(self, scores):
|
|
'''Get total of all scores
|
|
|
|
This just computes the sum of all recorded scores, filtering them
|
|
on the chosen minimum severity level.
|
|
:param score_list: the list of scores to total
|
|
:return: an integer total sum of all scores above the threshold
|
|
'''
|
|
total = 0
|
|
for score_type in scores:
|
|
total = total + sum(scores[score_type][self.sev_level:])
|
|
return total
|
|
|
|
def _check_severity(self, severity):
|
|
'''Check severity level
|
|
|
|
returns true if the issue severity is above the threshold.
|
|
:param severity: the severity of the issue being checked
|
|
:return: boolean result
|
|
'''
|
|
return constants.RANKING.index(severity) >= self.sev_level
|
|
|
|
def _check_confidence(self, confidence):
|
|
'''Check confidence level
|
|
|
|
returns true if the issue confidence is above the threshold.
|
|
:param confidence: the confidence of the issue being checked
|
|
:return: boolean result
|
|
'''
|
|
return constants.RANKING.index(confidence) >= self.conf_level
|