Source code for revenge.threads.thread

import logging
logger = logging.getLogger(__name__)

from prettytable import PrettyTable

from .. import common, types

[docs]class Thread(object): def __init__(self, process, info): """Defines a process thread. Args: info (dict): frida thread info dict Examples: .. code-block:: python # Grab your thread thread = process.threads[tid] # Wait for this thread to return thread.join() # Check out any exceptions that may have been thrown on this thread thread.exceptions # Check out the attached trace object thread.trace """ self._process = process self.pthread_id = None self._info = info self._context = CPUContext(self._process, **self._info['context'])
[docs] def join(self): """Traditional thread join. Wait for thread to exit and return the thread's return value.""" if self.pthread_id is not None: # TODO: pthread_out cache pool # TODO: generalize memory cache pools pthread_join = self._process.memory['pthread_join'] pthread_join.argument_types = types.Int64, types.Pointer pthread_out = self._process.memory.alloc(8) pthread_join(self.pthread_id, pthread_out.address) val = pthread_out.cast(types.Pointer) pthread_out.free() return val else: logger.error("Thread join not yet supported on {}".format(self.device_platform))
[docs] def kill(self): """Attempts to kill this thread. Note: If you're having trouble killing the thread, be sure your thread is killable. For pthreads, that means: pthread_setcancelstate(0, 0); pthread_setcanceltype(1,0) """ if self.pthread_id is not None: pthread_cancel = self._process.memory['pthread_cancel'] pthread_cancel(self.pthread_id) else: logger.error("Thread kill not yet supported on {}".format(self.device_platform))
def __repr__(self): attrs = ['Thread', hex(self.id), '@', hex(self.pc), self.state, self.module] if self.trace is not None: attrs.append('tracing') return "<{}>".format(' '.join(attrs)) def __getattr__(self, elm): # return common.auto_int(self._info['context'][elm]) if elm.startswith("__"): raise AttributeError return getattr(self.context, elm) def __str__(self): table = PrettyTable(['attr', 'value']) table.add_row(['TID', str(self.id)]) table.add_row(["State", self.state]) table.add_row(["Module", self.module]) table.add_row(["Tracing?", "Yes" if self.trace is not None else "No"]) table.add_row(["Breakpoint?", "Yes" if self.breakpoint else "No"]) """ for reg in self._info['context']: table.add_row([reg, hex(getattr(self, reg))]) """ table.header = False table.align = "l" return str(table) + '\n' + str(self.context) @property def breakpoint(self): """bool: Is this thread at a breakpoint?""" return self.id in self._process.threads._breakpoint_context @breakpoint.setter @common.validate_argument_types(breakpoint=bool) def breakpoint(self, breakpoint): if breakpoint is not False: logger.error("Can only unset breakpoints via thread. Try: breakpoint = False") return if self.breakpoint is not True: return self._process.memory[self.pc].breakpoint = False @property def id(self) -> int: """Thread ID""" return self._info['id'] @property def state(self) -> str: """Thread state, such as 'waiting', 'suspended'""" return self._info['state'] @property def pc(self) -> int: """The current program counter/instruction pointer.""" return self.context.pc @property def module(self) -> str: """What module is the thread's program counter in? i.e.: libc-2.27.so.""" mod = self._process.modules[self.pc] return mod.name if mod is not None else "Unknown" @property def trace(self): """revenge.tracer.instruction_tracer.Trace: Returns Trace object if this thread is currently being traced, otherwise None.""" if self.id in self._process.techniques._active_stalks: return self._process.techniques._active_stalks[self.id] @property def exceptions(self): """list: Exceptions that have been caught generically for this thread.""" return self._process.threads._exceptions[self.id] @property def context(self): """revenge.cpu.contexts.CPUContext: The current context for this thread.""" # self.context = CPUContext(self._process, **self._info['context']) # First, if we're at a breakpoint, use that context instead as it is # more accurate if self.breakpoint: return self._process.threads._breakpoint_context[self.id] return self._context
from ..cpu import CPUContext # Doc fixup Thread.__doc__ = Thread.__init__.__doc__