Source code for revenge.threads.threads


import logging
from prettytable import PrettyTable
import collections

from revenge import common
from revenge.exceptions import *

logger = logging.getLogger(__name__)


[docs]class Threads(object): def __init__(self, process): self._process = process # tid: list of exceptions, appended as they come in self._exceptions = collections.defaultdict(lambda: list()) # tid: breakpoint_context to keep track of when we hit a breakpoint # and what the thread state is at that point self._breakpoint_context = {} # addr: bytes -- what bytes were in the binary at this location before # we overwrote them with stuff (like Frida Interceptor) self._breakpoint_original_bytes = {} def __len__(self): return len(self.threads) def __iter__(self): return iter(self.threads) def __repr__(self): return "<{} {}>".format(len(self), "Thread" if len(self) == 1 else "Threads") def __str__(self): table = PrettyTable(['id', 'state', 'pc', 'module', 'Trace', 'Breakpoint']) for thread in self: table.add_row([str(thread.id), thread.state, self._process.memory.describe_address(thread.pc).split(":")[-1], thread.module, 'Yes' if thread.trace is not None else 'No', "Yes" if thread.breakpoint else "No" ]) return str(table) def __getitem__(self, elm): if type(elm) is int: try: return next(thread for thread in self.threads if thread.id == elm) except StopIteration: # If this is the Frida thread, it will be hidden. Create a dummy one if self._process.engine.run_script_generic(r"""send(Process.getCurrentThreadId())""", unload=True, raw=True)[0][0] == elm: return Thread(self._process, {'id': elm, 'state': 'waiting', 'context': {'pc': '0'}}) logger.error("Invalid thread id selected.") elif isinstance(elm, Thread): return elm else: logger.error("Not sure how to handle this.")
[docs] def create(self, callback): """Create and start a new thread on the given callback. Args: callback: Pointer to function to start the thread on. This can be created via CModule, NativeCallback or use an existing function in the binary Returns: revenge.threads.Thread: The new thread that was created or None if either the thread create failed or the thread finished before this method returned. Example: .. code-block:: python3 # Create a stupid callback that just spins func = process.memory.create_c_function("void func() { while ( 1 ) { ; } }") # Start the thread t = process.threads.create(func.address) assert isinstance(t, revenge.threads.thread.Thread) # View it running print(process.threads) # Grab the return value (in this case the thread won't end though) return_val = t.join() """ pre = set([t.id for t in self]) out = create_thread(self._process, callback) post = set([t.id for t in self]) diff = post.difference(pre) if diff != {}: diff = list(diff) if len(diff) == 0: # It may just already be done... Not necessarily an error # Create a mock thread new_thread = Thread(self._process, {'context': {'pc': "0x0"}, 'id': 0, 'state': 'completed'}) elif len(diff) > 1: logger.warning("More than one thread has been created... Returning first.") else: new_thread = self[diff[0]] if self._process.device_platform == 'linux': new_thread.pthread_id = out return new_thread
[docs] @common.validate_argument_types(name=str) def _register_plugin(self, plugin, name): """Registers this plugin to be exposed as a thread plugin. Args: plugin (callable): A class constructor. Must take an argument for the current thread name (str): What will this be called? The plugin will be instantiated at most once per thread instance, and done only when referenced. Examples: .. code-block:: python class MyPlugin: @classmethod def _thread_plugin(klass, thread): self = klass() self._thread = module return self process.threads._register_plugin(MyPlugin._thread_plugin, "myplugin") # This first call will instantiate the plugin process.threads[1234].myplugin """ def getter(self): try: return getattr(self, "__" + name) except AttributeError: setattr(self, "__" + name, plugin(self)) return getattr(self, "__" + name) if not callable(plugin): raise RevengeInvalidArgumentType("plugin must be callable") if name in Thread.__dict__: raise RevengeModulePluginAlreadyRegistered("Property name " + name + " is already taken.") # Add the new plugin setattr(Thread, name, property(getter, doc=plugin.__doc__))
@property def threads(self): """Current snapshop of active threads.""" threads = self._process.engine.run_script_generic( "send(Process.enumerateThreadsSync());", raw=True, unload=True)[0][0] return [Thread(self._process, thread) for thread in threads]
from . import Thread from .create import create_thread