diff --git a/whitebox_neutron_tempest_plugin/tests/scenario/test_security_group_logging.py b/whitebox_neutron_tempest_plugin/tests/scenario/test_security_group_logging.py index 494cbbf..dda641b 100644 --- a/whitebox_neutron_tempest_plugin/tests/scenario/test_security_group_logging.py +++ b/whitebox_neutron_tempest_plugin/tests/scenario/test_security_group_logging.py @@ -69,6 +69,7 @@ class BaseSecGroupLoggingTest( socket.timeout, ConnectionResetError, EOFError) + ENTRY_PTN = 'acl_log' @classmethod def resource_setup(cls): @@ -206,8 +207,8 @@ class BaseSecGroupLoggingTest( if start_track: # tracks A value, before test traffic sent to be logged _track_value = int(hypervisor_ssh.exec_command( - "sudo grep ovn_pinctrl0 {} | tail -n1 | cut -d '|' -f 2" - .format(self.SG_LOG_FILE), timeout=120)) + "sudo grep {} {} | tail -n1 | cut -d '|' -f 2" + .format(self.ENTRY_PTN, self.SG_LOG_FILE), timeout=120)) self._hypervisors_counts[hypervisor_ssh.host]['A'] = _track_value LOG.debug("Start log count value A on '%s' is %d", hypervisor_ssh.host, _track_value) @@ -216,7 +217,7 @@ class BaseSecGroupLoggingTest( # (wait time for logging to be done fully). time.sleep(5) cmd_outputs = hypervisor_ssh.exec_command( - "sudo grep ovn_pinctrl0 {0}".format(self.SG_LOG_FILE), + "sudo grep {} {}".format(self.ENTRY_PTN, self.SG_LOG_FILE), timeout=120).splitlines() b = self._hypervisors_counts[hypervisor_ssh.host]['B'] = int( cmd_outputs[-1].split("|")[1]) @@ -227,18 +228,18 @@ class BaseSecGroupLoggingTest( a = self._hypervisors_counts[hypervisor_ssh.host]['A'] new_logs = [output for output in cmd_outputs if a < int(output.split("|")[1]) <= b] + new_logs_str = "\n".join(new_logs) self._hypervisors_counts[hypervisor_ssh.host][ - 'tested_logs'] = "\n".join(new_logs) - # log in tempest the retrieved entries amount - _test_logs_amount = b - a - mess = "Unexpected num of lines with {} < acl_log_ID <= {}".format( - a, b) - mess += "\n" + "\n".join(cmd_outputs) - self.assertEqual(_test_logs_amount, len(new_logs), mess) + 'tested_logs'] = new_logs_str + # log in tempest the retrieved entries and amount + mess = "The following SGL entries fetched: %d < log ID <= %d\n\n%s" + LOG.debug(mess, a, b, new_logs_str) + # SGL entries amount - used to verify rate/burst limits applied + _test_logs_amount = len(new_logs) self._hypervisors_counts[hypervisor_ssh.host][ 'test_logs_amount'] = _test_logs_amount LOG.debug( - "Retrieved %d log entries for test assertions from '%s'.", + "Retrieved %d SGL entries for test assertions from '%s'.", _test_logs_amount, hypervisor_ssh.host) def start_track_log(self, hypervisor_ssh): @@ -273,13 +274,13 @@ class BaseSecGroupLoggingTest( fail_msg.format('should not')) def check_log_tcp(self, should_log, hypervisor_ssh, port=9999): - pattern = 'acl_log.*tcp.*tp_dst={}'.format(port) + pattern = '{}.*tcp.*tp_dst={}'.format(self.ENTRY_PTN, port) fail_msg = ('TCP traffic to port {} {{}} ' 'be logged in test log entries.').format(port) self._check_log(should_log, pattern, fail_msg, hypervisor_ssh) def check_log_udp(self, should_log, hypervisor_ssh, port=9999): - pattern = 'acl_log.*udp.*tp_dst={}'.format(port) + pattern = '{}.*udp.*tp_dst={}'.format(self.ENTRY_PTN, port) fail_msg = ('UDP traffic to port {} {{}} ' 'be logged in test log entries.').format(port) self._check_log(should_log, pattern, fail_msg, hypervisor_ssh) @@ -289,16 +290,18 @@ class BaseSecGroupLoggingTest( fail_msg = 'ICMP {} traffic {{}} be logged in tested log entries.' # pairs of logging pattern, and matching failure message patterns = [ - (r'acl_log.*icmp.*icmp_type=8', fail_msg.format('request'))] + ('{}.*icmp.*icmp_type=8'.format(self.ENTRY_PTN), + fail_msg.format('request'))] if both_directions: patterns.append( - (r'acl_log.*icmp.*icmp_type=0', fail_msg.format('reply'))) + ('{}.*icmp.*icmp_type=0'.format(self.ENTRY_PTN), + fail_msg.format('reply'))) for ptn in patterns: self._check_log(should_log, *ptn, hypervisor_ssh) def check_log_ssh(self, should_log, hypervisor_ssh): # in RFE all test cases use drop verdict for ssh, therefore hardcoded - pattern = r'acl_log.*verdict=drop.*tcp.*tp_dst=22' + pattern = '{}.*verdict=drop.*tcp.*tp_dst=22'.format(self.ENTRY_PTN) fail_msg = 'ssh traffic {} be logged in tested log entries.' self._check_log(should_log, pattern, fail_msg, hypervisor_ssh)