Source code for revenge.modules.modules


import logging
logger = logging.getLogger(__name__)

import os
import collections
from prettytable import PrettyTable
import datetime
from fnmatch import fnmatch
import time

from .. import common, types

[docs]class Modules(object): def __init__(self, process): self._process = process # key == module name, value == dict of symbol->address self._symbol_to_address = {} # key == address, value == symbol self._address_to_symbol = {} # key == symbol, value = address (resolved global symbols) self._global_symbol_to_address = {} self.__last_update = datetime.datetime(1970,1,1)
[docs] def lookup_offset(self, symbol): """Lookup raw file offset to symbol. Returns: tuple: (module_name, offset) or None if cannot resolve See examples from modules.lookup_symbol """ # Resolve symbol a = self._process.memory[symbol] if a is None: logger.error("lookup_offset: Cannot resolve '{}'.".format(symbol)) return m = self[a.address] if m is None: return None return (m.name, a.address - m.base)
[docs] def lookup_symbol(self, symbol): """Generically resolve a symbol. Examples: resolve_symbol(":strlen") -> returns address of strlen resolved globally. resolve_symbol("strlen") -> equivalent to above resolve_symbol("strlen+0xf") -> strlen offset by 0xf resolve_symbol("a.out:main") -> returns address of main resolved to a.out. resolve_symbol(0x12345) -> returns symbol at that address. """ # Resolve address to symbol if isinstance(symbol, int): try: return self._address_to_symbol[symbol] except KeyError: return None module, offset, symbol = common.parse_location_string(symbol) # Check the caches first if module != "": # First try to resolve with local symbol table try: return self._symbol_to_address[module][symbol] + int(offset, 16) except KeyError: pass else: try: return self._global_symbol_to_address[symbol] + int(offset, 16) except KeyError: pass # Fall back to asking Frida to resolve it replace_vars = { "FUNCTION_SYMBOL_HERE": symbol, "FUNCTION_MODULE_HERE": module, "FUNCTION_OFFSET_HERE": offset, } location_resolved = self._process.engine.run_script_generic("resolve_location_address.js", replace=replace_vars, unload=True)[0] if location_resolved == []: raise RevengeSymbolLookupFailure("Cannot resolve symbol.") location_resolved = common.auto_int(location_resolved[0]) # Update the caches, but only with the base symbol, not with offset if module == "": self._global_symbol_to_address[symbol] = location_resolved - int(offset,16) return types.Pointer(location_resolved)
[docs] @common.validate_argument_types(library=str) def load_library(self, library): """Dynamically load a library into the program. Args: library (str): The full path to the library on the process machine Returns: revenge.modules.Module: RetuRns the new loaded module or None on error. Examples: .. code-block:: python3 selinux = process.modules.load_library("/lib/x86_64-linux-gnu/libselinux.so.1") This will eventually be implemented across all platforms. For now, it only works on linux platforms. """ def load_linux(self, library): dlopen = self._process.memory[':dlopen'] dlopen.argument_types = types.Pointer, types.Int32 if dlopen is None: logger.error("Unable to locate dlopen. Cannot dynamically load.") return # Assuming non-lazy load and exporting symbols for now. out = dlopen(library, 0x102) # dlopen is reporting an error if out == 0: return False self._flush_cache() return self[os.path.basename(library)] if self._process.device_platform == "linux": return load_linux(self, library) else: logger.error("Not yet supported platform for load_library: {}".format(self._process.device_platform))
[docs] @common.validate_argument_types(name=str) def _register_plugin(self, plugin, name): """Registers this plugin to be exposed as a module plugin. Args: plugin (callable): A class constructor. Must take an argument for the current module name (str): What will this be called? The plugin will be instantiated at most once per module instance, and done only when referenced. Examples: .. code-block:: python class MyPlugin: @classmethod def _modules_plugin(klass, module): self = klass() self._module = module return self process.modules._register_plugin(MyPlugin._modules_plugin, "myplugin") # This first call will instantiate the plugin process.modules['proc_name'].myplugin """ def getter(self): try: return getattr(self, "__" + name) except AttributeError: setattr(self, "__" + name, plugin(self)) return getattr(self, "__" + name) if not callable(plugin): raise RevengeInvalidArgumentType("plugin must be callable") if name in Module.__dict__: raise RevengeModulePluginAlreadyRegistered("Property name " + name + " is already taken.") # Add the new plugin setattr(Module, name, property(getter, doc=plugin.__doc__))
[docs] def _flush_cache(self): """Make sure the next time we're hit is a full one.""" self.__last_update = datetime.datetime(1970, 1, 1)
def __iter__(self): return self.modules.__iter__() def __len__(self): return len(self.modules) def __repr__(self): attrs = ['Modules', str(len(self))] return "<{}>".format(' '.join(attrs)) def __str__(self): table = PrettyTable(['name', 'base', 'size', 'path']) for module in self: table.add_row([module.name, hex(module.base), hex(module.size), module.path]) table.align['path'] = 'l' return str(table) def __getitem__(self, item): # Resolve module by it's name if isinstance(item, str): return next(mod for mod in self if fnmatch(mod.name, item)) if isinstance(item, types.Telescope): item = int(item) # Resolve module by the address if isinstance(item, int): for mod in self: if item >= mod.base and item <= mod.base + mod.size: return mod return None raise NotImplementedError @property def modules(self): """list: Return list of modules.""" # Time to update the cache if datetime.datetime.now() - self.__last_update > datetime.timedelta(seconds=0.5): mods = self._process.engine.run_script_generic("""send(Process.enumerateModulesSync());""", raw=True, unload=True)[0][0] self.__modules = [Module(self._process, name=mod['name'], base=mod['base'], size=mod['size'], path=mod['path']) for mod in mods] self.__last_update = datetime.datetime.now() return self.__modules
from .module import Module from ..exceptions import *