add a config option to put sql queries into annotations instead of spans
This commit is contained in:
parent
b135473ba2
commit
0b7f10cac1
@ -33,6 +33,7 @@ zipkin_debug_scribe_sender=False
|
||||
|
||||
debug = False
|
||||
db_tracing_enabled = True
|
||||
db_trace_as_spans = True
|
||||
|
||||
def set_backends(backends):
|
||||
"""
|
||||
|
@ -26,9 +26,9 @@ span_stack = corolocal.local()
|
||||
|
||||
def start(service_name, name, address, port, trace_info=None):
|
||||
parent_id = None
|
||||
if hasattr(span_stack, 'trace_id'):
|
||||
if tracing_started():
|
||||
trace_id = span_stack.trace_id
|
||||
parent_id = span_stack.spans[-1].id
|
||||
parent_id = cur_span().id
|
||||
else:
|
||||
if trace_info is None:
|
||||
trace_id = span_stack.trace_id = getId()
|
||||
@ -41,8 +41,19 @@ def start(service_name, name, address, port, trace_info=None):
|
||||
span_stack.spans.append(span)
|
||||
annotate('start', service_name, address, port)
|
||||
|
||||
def tracing_started():
|
||||
return hasattr(span_stack, 'trace_id')
|
||||
|
||||
def cur_span():
|
||||
if not tracing_started():
|
||||
start('orphan', 'orphan', '127.0.0.1', '1')
|
||||
return span_stack.spans[-1]
|
||||
|
||||
def get_trace_info():
|
||||
return (span_stack.trace_id, span_stack.spans[-1].id)
|
||||
if tracing_started():
|
||||
return (span_stack.trace_id, cur_span().id)
|
||||
else:
|
||||
return None
|
||||
|
||||
def stop(name):
|
||||
annotate('stop')
|
||||
@ -55,33 +66,31 @@ def stop(name):
|
||||
|
||||
def annotate(value, service_name=None, address=None, port=None, duration=None):
|
||||
"""add an annotation at a particular point in time (with an optional duration)"""
|
||||
cur_span = span_stack.spans[-1]
|
||||
# attempt to default some values
|
||||
if service_name is None:
|
||||
service_name = cur_span.notes[0].service_name
|
||||
service_name = cur_span().notes[0].service_name
|
||||
if address is None:
|
||||
address = cur_span.notes[0].address
|
||||
address = cur_span().notes[0].address
|
||||
if port is None:
|
||||
port = cur_span.notes[0].port
|
||||
port = cur_span().notes[0].port
|
||||
if duration is None:
|
||||
duration = 0
|
||||
note = Note(time.time(), str(value), service_name, address, int(port),
|
||||
int(duration))
|
||||
span_stack.spans[-1].notes.append(note)
|
||||
cur_span().notes.append(note)
|
||||
|
||||
def tag(key, value, service_name=None, address=None, port=None):
|
||||
"""add a key/value tag to the current span. values can be int,
|
||||
float, or string."""
|
||||
assert isinstance(value, str) or isinstance(value, int) or isinstance(value, float)
|
||||
cur_span = span_stack.spans[-1]
|
||||
if service_name is None:
|
||||
service_name = cur_span.notes[0].service_name
|
||||
service_name = cur_span().notes[0].service_name
|
||||
if address is None:
|
||||
address = cur_span.notes[0].address
|
||||
address = cur_span().notes[0].address
|
||||
if port is None:
|
||||
port = cur_span.notes[0].port
|
||||
port = cur_span().notes[0].port
|
||||
tag = Tag(str(key), value, service_name, address, port)
|
||||
span_stack.spans[-1].dimensions.append(tag)
|
||||
cur_span().dimensions.append(tag)
|
||||
|
||||
def getId():
|
||||
return random.randrange(sys.maxint >> 10)
|
||||
@ -122,7 +131,8 @@ def before_execute(name):
|
||||
port = conn.connection.connection.port
|
||||
#print >>sys.stderr, 'connection is {0}:{1}'.format(h, port)
|
||||
#print >>sys.stderr, 'sql statement is {0}'.format(clauseelement)
|
||||
start(str(name) + 'db client', 'execute', h, port)
|
||||
if config.db_trace_as_spans:
|
||||
start(str(name) + 'db client', 'execute', h, port)
|
||||
annotate(clauseelement)
|
||||
return handler
|
||||
|
||||
@ -131,7 +141,15 @@ def after_execute(name):
|
||||
def handler(conn, clauseelement, multiparams, params, result):
|
||||
if not config.db_tracing_enabled:
|
||||
return
|
||||
stop('execute')
|
||||
annotate(clauseelement)
|
||||
# fix up the duration on the annotation for the sql query
|
||||
start_time = cur_span().notes[0].time
|
||||
last_note = cur_span().notes.pop()
|
||||
cur_span().notes.append(Note(last_note.time, last_note.value,
|
||||
last_note.service_name, last_note.address,
|
||||
last_note.port, time.time() - start_time))
|
||||
if config.db_trace_as_spans:
|
||||
stop('execute')
|
||||
return handler
|
||||
|
||||
def dbapi_error(name):
|
||||
@ -153,7 +171,9 @@ def start_http(service_name, name, request):
|
||||
start(service_name, name, host, port, trace_info)
|
||||
|
||||
def add_trace_info_header(headers):
|
||||
headers['X-Trace-Info'] = base64.b64encode(pickle.dumps(get_trace_info()))
|
||||
trace_info = get_trace_info()
|
||||
if trace_info:
|
||||
headers['X-Trace-Info'] = base64.b64encode(pickle.dumps(trace_info))
|
||||
|
||||
|
||||
## WSGI middleware
|
||||
|
Loading…
x
Reference in New Issue
Block a user