
This change is going to upgrade ranger to use Python 3.x Change-Id: I563661e071c56c2df7e0e1a6e365aecd4158b6cd
217 lines
8.2 KiB
Python
Executable File
217 lines
8.2 KiB
Python
Executable File
"""audit module."""
|
|
|
|
import json
|
|
import logging
|
|
import threading
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
from orm.common.client.audit.audit_client.api.exceptions.audit_exception import AuditException
|
|
from orm.common.client.audit.audit_client.api.model.get_audits_result import AuditsResult
|
|
from orm.common.client.audit.audit_client.api.model.transaction import Transaction
|
|
|
|
logger = logging.getLogger(__name__)
|
|
config = {
|
|
'AUDIT_SERVER_URL': None,
|
|
'NUM_OF_SEND_RETRIES': None,
|
|
'TIME_WAIT_BETWEEN_RETRIES': None,
|
|
'SERVICE_NAME': None}
|
|
|
|
|
|
def init(audit_server_url, num_of_send_retries, time_wait_between_retries,
|
|
service_name=''):
|
|
"""Initialize the audit client.
|
|
|
|
:param audit_server_url: audit server url
|
|
:param num_of_send_retries: number of times to try and send the record
|
|
in case of failures.
|
|
:param time_wait_between_retries: how much time to wait (in seconds)
|
|
between each retry.
|
|
"""
|
|
if not audit_server_url \
|
|
or not num_of_send_retries \
|
|
or not time_wait_between_retries:
|
|
error_msg = "Error: Fail to initialize audit using following inputs " \
|
|
"AUDIT_SERVER_URL={}, NUM_OF_SEND_RETRIES={}, " \
|
|
"TIME_WAIT_BETWEEN_RETRIES={}. " \
|
|
"One of them is possibly None" \
|
|
.format(audit_server_url, num_of_send_retries,
|
|
time_wait_between_retries)
|
|
logger.error(error_msg)
|
|
raise AuditException(error_msg)
|
|
|
|
config['AUDIT_SERVER_URL'] = audit_server_url
|
|
config['NUM_OF_SEND_RETRIES'] = num_of_send_retries
|
|
config['TIME_WAIT_BETWEEN_RETRIES'] = time_wait_between_retries
|
|
config['SERVICE_NAME'] = service_name
|
|
|
|
|
|
def _validate():
|
|
"""# Validate proper initialization of the audit client."""
|
|
# if not AUDIT_SERVER_URL or not NUM_OF_SEND_RETRIES
|
|
# or not TIME_WAIT_BETWEEN_RETRIES:
|
|
if not config['AUDIT_SERVER_URL'] \
|
|
or not config['NUM_OF_SEND_RETRIES'] \
|
|
or not config['TIME_WAIT_BETWEEN_RETRIES']:
|
|
raise AuditException(
|
|
"Error: Audit was not initialize correctly. You must first "
|
|
"run audit.init(audit_server_url, num_of_send_retries, "
|
|
"time_wait_between_retries)")
|
|
|
|
|
|
def audit(timestamp, application_id, tracking_id, transaction_id,
|
|
transaction_type, resource_id, service_name, user_id=None,
|
|
external_id=None, event_details=None):
|
|
"""The method is used to audit transactions.
|
|
|
|
:param timestamp:
|
|
:param application_id:
|
|
:param tracking_id:
|
|
:param transaction_id:
|
|
:param transaction_type:
|
|
:param resource_id:
|
|
:param service_name:
|
|
:param user_id:
|
|
:param external_id:
|
|
:param event_details:
|
|
:return:
|
|
"""
|
|
thread = threading.Thread(target=_audit_thread, args=(
|
|
timestamp, application_id, tracking_id, transaction_id,
|
|
transaction_type,
|
|
resource_id, service_name, user_id, external_id, event_details))
|
|
thread.start()
|
|
|
|
|
|
def get_audits(timestamp_from=None, timestamp_to=None, application_id=None,
|
|
tracking_id=None, transaction_id=None, transaction_type=None,
|
|
resource_id=None, service_name=None, user_id=None,
|
|
external_id=None, event_details=None, limit=None):
|
|
"""The method is used to audit transactions.
|
|
|
|
:param timestamp_from:
|
|
:param timestamp_to:
|
|
:param application_id:
|
|
:param tracking_id:
|
|
:param transaction_id:
|
|
:param transaction_type:
|
|
:param resource_id:
|
|
:param service_name:
|
|
:param user_id:
|
|
:param external_id:
|
|
:param event_details:
|
|
"""
|
|
query = ""
|
|
query = _build_query(query, "q.timestamp_from", timestamp_from)
|
|
query = _build_query(query, "q.timestamp_to", timestamp_to)
|
|
query = _build_query(query, "q.application_id", application_id)
|
|
query = _build_query(query, "q.tracking_id", tracking_id)
|
|
query = _build_query(query, "q.transaction_id", transaction_id)
|
|
query = _build_query(query, "q.transaction_type", transaction_type)
|
|
query = _build_query(query, "q.resource_id", resource_id)
|
|
query = _build_query(query, "q.service_name", service_name)
|
|
query = _build_query(query, "q.user_id", user_id)
|
|
query = _build_query(query, "q.external_id", external_id)
|
|
query = _build_query(query, "q.event_details", event_details)
|
|
query = _build_query(query, "limit", limit)
|
|
payload = _get_data(query)
|
|
data = json.load(payload)
|
|
transactions = []
|
|
for transaction in data['transactions']:
|
|
timestamp = transaction['timestamp']
|
|
user_id = transaction['user_id']
|
|
application_id = transaction['application_id']
|
|
tracking_id = transaction['tracking_id']
|
|
external_id = transaction['external_id']
|
|
transaction_id = transaction['transaction_id']
|
|
resource_id = transaction['resource_id']
|
|
service_name = transaction['service_name']
|
|
transaction_type = transaction['transaction_type']
|
|
event_details = transaction['event_details']
|
|
transactions.append(
|
|
Transaction(timestamp, user_id, application_id, tracking_id,
|
|
external_id, transaction_id, transaction_type,
|
|
event_details, resource_id, service_name))
|
|
return AuditsResult(transactions)
|
|
|
|
|
|
def _build_query(query, arg_name, arg_value):
|
|
if arg_value is not None:
|
|
query = query + "%s=%s&" % (arg_name, arg_value)
|
|
return query
|
|
|
|
|
|
# A thread method for sending data to the audit server
|
|
# This method is asynchronic in order to prevent blocking
|
|
def _audit_thread(timestamp, application_id, tracking_id, transaction_id,
|
|
transaction_type, resource_id, service_name, user_id,
|
|
external_id, event_details):
|
|
# Prepare the data for the audit server
|
|
data = {
|
|
"timestamp": timestamp,
|
|
"application_id": application_id,
|
|
"tracking_id": tracking_id,
|
|
"transaction_id": transaction_id,
|
|
"transaction_type": transaction_type,
|
|
"resource_id": resource_id,
|
|
"service_name": service_name,
|
|
"user_id": user_id,
|
|
"external_id": external_id,
|
|
"event_details": event_details,
|
|
"resource_id": resource_id,
|
|
"service_name": service_name
|
|
}
|
|
_post_data(data)
|
|
|
|
|
|
def _post_data(data):
|
|
# Validate that the configuration was initialized
|
|
_validate()
|
|
# Send the data
|
|
req = urllib.request.Request(config['AUDIT_SERVER_URL']) # nosec
|
|
req.add_header('Content-Type', 'application/json')
|
|
# Retry to send the data to the audit server
|
|
success = False
|
|
for retry_number in range(config['NUM_OF_SEND_RETRIES']):
|
|
try:
|
|
urllib.request.urlopen(req, json.dumps(data).encode('utf-8')) # nosec
|
|
success = True
|
|
break
|
|
except Exception as error:
|
|
time.sleep(config['TIME_WAIT_BETWEEN_RETRIES'])
|
|
error_msg = "ERROR|CON{}AUDIT001|Fail to send data to [{}]. Tried " \
|
|
"a couple of times with no success. Last attempt " \
|
|
"error: [{}]".format(config['SERVICE_NAME'],
|
|
config['AUDIT_SERVER_URL'],
|
|
str(error))
|
|
logger.error(error_msg)
|
|
raise AuditException(error_msg)
|
|
|
|
|
|
def _get_data(query):
|
|
# Validate that the configuration was initialized
|
|
_validate()
|
|
# Send the data
|
|
audit_server_url_with_query = "{}?{}".format(config['AUDIT_SERVER_URL'],
|
|
query)
|
|
req = urllib.request.Request(audit_server_url_with_query) # nosec
|
|
# Retry to get the data from the audit server
|
|
success = False
|
|
response = None
|
|
error = None
|
|
for retry_number in range(config['NUM_OF_SEND_RETRIES']):
|
|
try:
|
|
response = urllib.request.urlopen(req) # nosec
|
|
success = True
|
|
break
|
|
except Exception as error:
|
|
time.sleep(config['TIME_WAIT_BETWEEN_RETRIES'])
|
|
error_msg = "Fail to get data from [{}]. Tried a couple of times " \
|
|
"with no success. Last attempt error: [{}]".\
|
|
format(audit_server_url_with_query, str(error))
|
|
logger.error(error_msg)
|
|
raise AuditException(error_msg)
|
|
return response
|