Source code for revenge.plugins.radare2.radare2


import logging
from collections.abc import Iterable
from ... import common
from .. import Plugin

# TODO: For now, if you use both process.radare2 and module[main_object].radare2,
# you will open up a duplicate r2 session. This shouldn't hurt anything
# but it's definitely not optimal


[docs]class Radare2(Plugin): _R2WARNING = False def __init__(self, process, module=None): """Use radare2 to enrich reversing information. Examples: .. code-block:: python # # Normal enrichment works without connection # Radare2 plugin can enrich a remote instance of r2 with more # information as well. # In different window, open r2 # r2 -A /bin/ls # Start up web server # =h& 12345 # Connect up to it with revenge process.radare2.connect("http://127.0.0.1:12345") # Highlight paths that have executed timeless = process.techniques.NativeTimelessTracer() timeless.apply() # Do whatever t = list(timeless)[0] process.radare2.highlight(t) """ self._process = process self._module = module or list(self._process.modules)[0] self._r2exe = None self._r2 = None self._find_r2() if self._r2exe is None: # Looks like we don't have r2 installed if not Radare2._R2WARNING: LOGGER.warning("Cannot find radare2 in your path. Disabling this functionality.") Radare2._R2WARNING = True return try: self._process.modules._register_plugin(Radare2._modules_plugin, "radare2") except RevengeModulePluginAlreadyRegistered: # This will error out if we're already registered pass self._load_file() # Register myself as a decompiler if Ghidra plugin is present if isinstance(self.decompiler, GhidraDecompiler): try: self._process.decompiler._register_decompiler(self.decompiler, 70) except RevengeDecompilerAlreadyRegisteredError: pass # Always clean ourselves up at exit self._process._register_cleanup(self.disconnect) # TODO: Add base information about file
[docs] def analyze(self): """Ask radare2 to run some auto analysis on this file. Note: This is NOT run by default due to the fact that it may take a while to run. If you connect to a remote session that has already run analysis, you do NOT need to run this. """ if self._r2 is None: return False self._r2.cmd("aaa") return True
def _load_file(self): """Attempt to load this revenge file.""" """ if not os.path.isfile(self._process.argv[0]): LOGGER.warning("Cannot find file to load.") return """ self._r2 = r2pipe.open(common.load_file(self._process, self._module.path).name) try: self._r2.cmd('i') except BrokenPipeError: self._r2 = None LOGGER.error("Error opening file " + self._module.name + " with r2.") def _find_r2(self): """Locate and save what radare2 we're dealing with.""" r2_names = ["radare2", "r2", "radare2.exe", "r2.exe"] self._r2exe = None for name in r2_names: path = shutil.which(name) if path is not None: self._r2exe = path LOGGER.debug("Found r2 at: " + path) return LOGGER.debug("Couldn't find r2...") @classmethod def _modules_plugin(klass, module): return klass(module._process, module) @common.validate_argument_types(address=int, color=str) def _highlight_address(self, address, color): """Highlights this address with the given color. Args: address (int): What address to color? color (str): What to color? The address will NOT be adjusted nor checked for correctness. This is meant to be called from highlight. Color can be found in r2 by typing "ecs". """ if self._r2 is None: LOGGER.warning("Can't find connected r2 instance...") return # First gotta clear any existing color # ecH- is broken atm. ecHi __should__ overwrite it though... #self._r2.cmd("ecH-@" + hex(address)) # Now add the new color self._r2.cmd("ecHi " + color + "@" + hex(address))
[docs] def highlight(self, what): """Highlights an instruction or list of instructions. Args: what (int, list, tuple): Address to highlight. Note: The addresses should be instantiated from this revenge process. Highlight will determine the correct offset to use for highlighting automatically. This is likely only useful when you have connected to a remote r2 session as you won't see the color locally. """ # Standardize to list if not isinstance(what, Iterable): what = [what] # Standardize addrs addrs = [] for thing in what: if isinstance(thing, int): addrs.append(int(thing)) elif hasattr(thing, 'context'): addrs.append(int(thing.context.ip)) else: LOGGER.error("Unhandled thing type of " + type(thing)) # Color-up for addr in addrs: out = self._process.modules.lookup_offset(addr) # Addr didn't end up in something mapped... if out is None: continue name, addr = out # Don't both coloring things that aren't in our opened binary... if name != self.file: continue self._highlight_address(self.base_address + addr, "cyan")
[docs] @common.validate_argument_types(web_server=str) def connect(self, web_server): """Connect to a separate session to work in tandem. Args: web_server (str): Web server to connect to. Examples: .. code-block:: python # On existing r2 instance, start web listener on port 12345 # =h& 12345 # Now tell this r2 plugin to connect to it process.radare2.connect("http://127.0.0.1:12345") """ orig_r2 = self._r2 # Can't have trailing slash web_server = web_server.rstrip("/") print("Connecting to " + web_server + " ... ", end='', flush=True) self._r2 = r2pipe.open(web_server) # r2pipe doesn't actually error on open, we gotta check if self._r2.cmd('i') is None: cprint("[ Error ]", color='red') self._r2.quit() self._r2 = orig_r2 else: # We've got a good connection, kill the old if orig_r2 is not None: orig_r2.quit() # Verify name if self.file.lower() != self._process.file_name.lower(): cprint("[ File Names Differ! ]", color='yellow') else: cprint("[ OK ]", color='green')
[docs] def disconnect(self): """Disconnect from web server.""" if self._r2 is not None: try: self._r2.quit() except ConnectionResetError: pass self._r2 = None
def __repr__(self): return "<Radare2 Plugin>" @property def _is_valid(self): return self._r2exe is not None @property def _has_ghidra(self): """bool: Does this instance of r2 have the ghidra decompiler?""" return "Ghidra" in self._r2.cmd('pdg?') @property def decompiler(self): """Either returns an instance of a decompiler (if one is valid) or None.""" try: return self.__decompiler except AttributeError: pass if self._r2 is not None: if self._has_ghidra: self.__decompiler = GhidraDecompiler(self) else: self.__decompiler = None else: self.__decompiler = None return self.__decompiler @decompiler.setter @common.validate_argument_types(decompiler=str) def decompiler(self, decompiler): if decompiler.lower() == "ghidra": self.__decompiler = GhidraDecompiler(self) else: raise RevengeInvalidArgumentType("Radare2 Invalid decompiler selected. Options are: ghidra") @property def file(self): if self._r2 is None: return return os.path.basename(self._r2.cmdj("ij")['core']['file']) @property def base_address(self): if self._r2 is None: return return self._r2.cmdj('ij')['bin']['baddr']
import os import shutil import r2pipe from termcolor import colored, cprint from ...exceptions import * from .decompilers import GhidraDecompiler LOGGER = logging.getLogger(__name__) Radare2.__doc__ = Radare2.__init__.__doc__