import logging
logger = logging.getLogger(__name__)
from termcolor import cprint, colored
from prettytable import PrettyTable
[docs]class NativeTimelessTrace(object):
def __init__(self, process, thread):
"""Container for NativeTimelessTrace items
Args:
process (revenge.process.Process): Active process object.
thread (int): Thread ID for this trace or Thread object
"""
self._process = process
self._thread = self._process.threads[thread]
self._trace = [] # List of actual trace items
self._script = None
[docs] def start(self):
"""Start tracing."""
s = "timeless_trace({})".format(self._thread.id)
self._process.engine.run_script_generic(s,
raw=True,
include_js=("dispose.js", "send_batch.js", "stalk.js", "telescope.js", "timeless.js"),
unload=False,
on_message=self._parse_items_cb,
runtime='v8'
)
self._script = self._process.engine._scripts.pop(0)
self._process.techniques._active_stalks[self._thread.id] = self
[docs] def stop(self):
"""Stop tracing."""
if self._script is not None:
self._script[0].exports.unfollow()
# As with NativeInstructionTracer, the script unload always causes a process crash for some reason
# TODO: Figure out why the f that happens
#self._script[0].unload()
self._process.techniques._active_stalks.pop(self._thread.id)
self._script = None
[docs] def wait_for(self, address):
"""Don't return until the given address is hit in the trace."""
address = self._process.memory[address].address
# TODO: Optimize this so I don't keep checking the same IPs over and over
while True:
try:
return next(x for x in self if int(x.context.pc) == address)
except StopIteration:
continue
def __str__(self):
table = PrettyTable(["inst"])
table.border = False
table.header = False
table.align = 'l'
for item in self:
table.add_row([" "*item.depth + str(item.context.pc.next.thing)])
return str(table)
def __len__(self):
return len(self._trace)
def __repr__(self):
l = len(self)
return "<NativeTimelessTrace {} {}>".format(
l,
"item" if l == 1 else "items"
)
def __iter__(self):
return self._trace.__iter__()
def __getitem__(self, item):
if isinstance(item, int):
return self._trace.__getitem__(item)
if isinstance(item, slice):
ret = NativeTimelessTrace(self._process, self._thread)
ret._trace = self._trace[item]
return ret
raise Exception("Unhandled getitem type of {}".format(type(item)))
def __len__(self):
return len(self._trace)
def _parse_items_cb(self, items, data):
"""Parse incoming trace items.
Args:
items (list): List of dicts.
This method will be called directly as a callback from the js.
"""
if items["type"] == "error":
logger.error("_parse_items_cb error: " + items["description"])
else:
for item in items["payload"]:
# Previous item used for diff generation
previous_item = self._trace[-1] if self._trace != [] else None
self._trace.append(NativeTimelessTraceItem.from_snapshot(self._process, item, previous=previous_item))
from pprint import pprint
from .timeless_trace_item import NativeTimelessTraceItem