from . import Colorer
import logging
# Configure the root logger at WARN level as soon as this module is imported.
logging.basicConfig(level=logging.WARN)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
import frida
import colorama
# Initialise colorama at import time.
colorama.init()
import os
import sys
from termcolor import cprint, colored
from prettytable import PrettyTable
import time
import atexit
import signal
import json
import pprint
import collections
import importlib
from . import common
# Absolute path of the directory that contains this source file.
here = os.path.dirname(os.path.abspath(__file__))
class Process(object):
    # NOTE: ``Process.__doc__`` is assigned from ``Process.__init__.__doc__``
    # by the "Doc fixups" statements near the end of this module, so the
    # class-level documentation lives in the ``__init__`` docstring.

    def __init__(self, target, resume=False, verbose=False, load_symbols=None,
                 envp=None, engine=None, ignore_exceptions=False):
        """Represents a process.

        Args:
            target (str, int, list): File name or pid to attach to. If target
                is a list, it will be set as argv.
            resume (bool, optional): Resume the binary if need be after loading?
            verbose (bool, optional): Enable verbose logging
            load_symbols (list, optional): Only load symbols from those modules
                in the list. Saves some startup time. Can use glob ('libc*')
            envp (dict, optional): Specify what you want the environment
                pointer list to look like. Defaults to whatever the current
                envp is.
            engine (revenge.engines.Engine): Instantiated Engine for this
                process
            ignore_exceptions (bool): Should we not attempt to generically
                catch process exceptions? Default is False.

        Examples:
            .. code-block:: python3

                # Kick off ls
                p = revenge.Process("/bin/ls")

                # Kick off ls for /tmp with custom environment
                p = revenge.Process(["/bin/ls","/tmp/"], envp={'var1':'thing1'})

                #
                # Interaction
                #

                # Write to stdin
                p.stdin(b"hello\n")

                # Read from stdout
                p.stdout(16)

                # Read up to expected output in stdout
                p.stdout("expected")

                # Interact like a shell
                p.interactive()
        """

        # Private caches. These must exist before the property setters below
        # run (the ``target`` setter writes ``__file_name``).
        self.__engine = engine
        self.__file_name = None
        self.__file_type = None
        self.__entrypoint = None
        self._resume_addr = None
        self.__endianness = None
        self.__bits = None
        self.__device_platform = None
        self._spawn_target = None

        # Order matters: ``target`` goes through a setter that uses the engine.
        self.verbose = verbose
        self._envp = envp
        self.target = target
        self._ignore_exceptions = ignore_exceptions
        self._registered_cleanup = []

        # Normalise load_symbols to ``None`` or a list. A tuple is treated as
        # a sequence of patterns (not wrapped as a single element); any other
        # scalar (e.g. a single glob string) becomes a one-element list.
        if isinstance(load_symbols, tuple):
            load_symbols = list(load_symbols)
        elif not isinstance(load_symbols, (list, type(None))):
            load_symbols = [load_symbols]
        self._load_symbols = load_symbols

        self.memory = self.engine.memory
        self.threads = Threads(self)
        self.modules = Modules(self)
        self.techniques = Techniques(self)

        atexit.register(self._at_exit)
        self.engine.start_session()
        self._register_plugins()

        # Cache arch right away so we don't have resource locks
        self.arch

        # NOTE(review): ``_spawned_pid`` is never assigned in this class;
        # presumably the engine sets it during start_session() -- confirm.

        # TODO: move this into frida engine
        # ELF binaries start up in ptrace, which causes some issues, shim at
        # entrypoint so we can remove ptrace
        if self._spawned_pid is not None and self.file_type == 'ELF':

            # Set breakpoint at entry
            self.memory[self.entrypoint].breakpoint = True

            # Set breakpoints at exit calls
            for c in [':exit', ':_exit']:
                self.memory[c].breakpoint = True

            # Resume to remove ptrace
            self.engine.resume(self._spawned_pid)

        if self.device.platform == "windows":
            # Set exit breakpoints
            for c in ['msvcrt.dll:exit', 'msvcrt.dll:_exit']:
                self.memory[c].breakpoint = True

        if self.device_platform == 'linux':
            try:
                # Stringifying the thread list forces an enumeration.
                str(self.threads)
            except IndexError:
                logger.error("Can't enumerate threads. Please check sysctl kernel.yama.ptrace_scope=0 or run as root.")

        # Resume file if need be
        if resume:

            # If we are using a resume variable
            if self.memory[self.entrypoint].breakpoint:
                self.memory[self.entrypoint].breakpoint = False

            else:
                self.engine.resume(self._spawned_pid)

    def _register_plugins(self):
        """Figures out which plugins to load and loads them.

        Every public attribute of ``self.engine.plugins`` is scanned for
        ``Plugin`` subclasses; each one is instantiated with this process and,
        when the instance reports ``_is_valid``, attached to this object under
        the lower-cased class attribute name.
        """
        for plugin_name in dir(self.engine.plugins):
            if plugin_name.startswith("_"):
                continue

            plugin_mod = getattr(self.engine.plugins, plugin_name)

            for item, value in plugin_mod.__dict__.items():
                if inspect.isclass(value) and issubclass(value, Plugin):
                    # Instantiate the plugin
                    plugin = value(self)

                    if plugin._is_valid:
                        setattr(self, item.lower(), plugin)

    def quit(self):
        """Call to quit your session without exiting. Do NOT continue to use this object after.

        If you spawned the process, it will be killed. If you attached to the
        process, frida will be cleaned out, detatched, and the process should
        continue normally.
        """
        # Run user/plugin registered cleanup callbacks first, then the engine.
        for c in self._registered_cleanup:
            c()

        self._at_exit()

    def resume(self):
        """Resume execution of any current breakpoint hit or suspended thread.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("resume has not been implemented yet in this backend.")

    def _register_cleanup(self, c):
        """Register a zero-argument callable to be invoked by :meth:`quit`."""
        self._registered_cleanup.append(c)

    def _at_exit(self):
        """Called to clean-up at exit."""
        self.engine._at_exit()

    def target_type(self, x):
        """Coerce ``x`` to an int (pid) when possible, else return it as-is.

        Args:
            x: Candidate target; anything ``int()`` accepts is treated as a pid.

        Returns:
            int if ``int(x)`` succeeds, otherwise ``x`` unchanged (probably a
            process name).
        """
        # Maybe it's PID
        try:
            return int(x)

        # Probably process name
        except Exception:
            return x

    @common.implement_in_engine()
    def stdout(self, n):
        """Read n bytes from stdout.

        Args:
            n (int, str, bytes): Number of bytes to read or string to expect.
                If no value is given, it's presumed you are trying to read
                all currently queued output.

        Returns:
            bytes: Output of stdout
        """
        pass

    @common.implement_in_engine()
    def stderr(self, n):
        """Read n bytes from stderr.

        Args:
            n (int, str, bytes): Number of bytes to read or string to expect.
                If no value is given, it's presumed you are trying to read
                all currently queued output.

        Returns:
            bytes: Output of stderr
        """
        pass

    @common.implement_in_engine()
    def stdin(self, thing):
        """Write thing to stdin.

        Args:
            thing (str, bytes): If str, it will be encoded as latin-1.

        Note: There's no newline auto appended. Remember to add one if you want it.
        """
        pass

    @common.implement_in_engine()
    def interactive(self):
        """Go interactive. Return back to your shell with ctrl-c."""
        pass

    def __repr__(self):
        return '<Process {}:{}>'.format(self.file_name, self.pid)

    ############
    # Property #
    ############

    @property
    def argv(self):
        """list: argv for this process instantitation."""
        return self.__argv

    @argv.setter
    @common.validate_argument_types(argv=(list, tuple))
    def argv(self, argv):
        self.__argv = argv

    @property
    def device_platform(self):
        """Wrapper to discover the device's platform.

        The value is queried from the engine once and then cached; a value
        assigned through the setter is returned as-is.
        """
        if self.__device_platform is None:
            self.device_platform = self.engine.run_script_generic(r"send(Process.platform)", raw=True, unload=True)[0][0]
        return self.__device_platform

    @device_platform.setter
    def device_platform(self, platform):
        self.__device_platform = platform

    @property
    def pid(self):
        """The pid as reported by the engine's session object."""
        return self.engine.session._impl.pid

    @property
    def entrypoint(self):
        """int: Returns the entrypoint for this running program.

        Returns None (after logging a warning) when the file type is neither
        ELF nor PE. The computed address is cached.
        """
        if self.__entrypoint is not None:
            return self.__entrypoint

        file_type = self.file_type

        if file_type == 'ELF':
            mod = self.modules[self.file_name]
            # e_entry lives at offset 0x18 of the ELF header.
            entry = self.memory[mod.base + 0x18].pointer

            # Position independent binaries store a base-relative entry.
            if mod.elf.type_str == 'DYN':
                entry = entry + mod.base

            # Only cache once fully computed, so a failure above cannot
            # leave an un-relocated address behind.
            self.__entrypoint = entry

        elif file_type == "PE":
            mod = self.modules[self.file_name]
            self.__entrypoint = mod.base + mod.pe.OPTIONAL_HEADER.AddressOfEntryPoint

        else:
            logger.warning('entrypoint not implemented for file of type {}'.format(file_type))
            return None

        # TODO: Mac?
        return self.__entrypoint

    @property
    def endianness(self):
        """Determine which endianness this binary is. (little, big)

        Falls back to 'little' (with a warning) when neither the Windows nor
        the ELF check applies. The result is cached.
        """
        if self.__endianness is not None:
            return self.__endianness

        if self.device_platform == 'windows':
            # TODO: Technically assumption, but like 99% of the time it's right.
            self.__endianness = 'little'

        elif self.file_type == 'ELF':
            # Byte 5 of the ELF ident (EI_DATA): 1 means little endian.
            endianness = self.engine.run_script_generic("""send(ptr(Number(Process.enumerateModulesSync()[0].base) + 5).readS8())""", raw=True, unload=True)[0][0]
            self.__endianness = 'little' if endianness == 1 else 'big'

        else:
            logger.warning("Unhandled endianness check for ({}, {}), assuming little".format(self.file_type, self.device_platform))
            # Actually apply the assumption announced in the warning instead
            # of returning None.
            self.__endianness = 'little'

        return self.__endianness

    @property
    def file_type(self):
        """Guesses the file type.

        Returns:
            str: 'ELF', 'PE' or 'Unknown', based on the magic bytes at the
            base of the first module (always 'ELF' for Android devices).
        """
        # TODO: Android processes we attach to can't getModuleByName for their file name...
        # Maybe use "app_process64" instead... Or resolve the first module and go with that..
        if isinstance(self.device, devices.AndroidDevice):
            return "ELF"

        # TODO: Update this with other formats. PE/COFF/MACHO/etc
        if self.__file_type is None:

            # Sometimes the file name and module name differ (such as /bin/sh -> dash)
            # To handle this, we're assuming the core module will always come back first
            # in the list.
            me = list(self.modules)[0]
            b = self.memory[me.base:me.base + 16].bytes

            if b.startswith(b'\x7fELF'):
                self.__file_type = 'ELF'
            elif b.startswith(b'MZ'):
                self.__file_type = "PE"
            else:
                self.__file_type = 'Unknown'

        return self.__file_type

    @property
    def file_name(self):
        """str: The base file name."""
        # TODO: This assumes the base module is always first...
        if self.__file_name is None:
            self.__file_name = self.engine.run_script_generic("""send(Process.enumerateModulesSync())""", raw=True, unload=True)[0][0][0]['name']

        return self.__file_name

    @property
    def bits(self):
        """int: How many bits is the CPU?"""
        if self.__bits is None:
            self.__bits = self.engine.run_script_generic("""send(Process.pointerSize);""", raw=True, unload=True)[0][0] * 8

        return self.__bits

    @property
    def arch(self):
        """str: What architecture? (x64, ia32, arm, others?)

        Raises:
            Exception: If the engine reports an architecture that is not in
                the known list.
        """
        # ``__arch`` is only created on first successful lookup, so a missing
        # attribute (and nothing else) means "not cached yet".
        try:
            return self.__arch
        except AttributeError:
            pass

        known_arch = ['x64', 'ia32', 'arm']
        arch = self.engine.run_script_generic("""send(Process.arch);""", raw=True, unload=True)[0][0]

        if arch not in known_arch:
            raise Exception("Unknown arch returned from Frida: {}".format(arch))

        self.__arch = arch
        return self.__arch

    @property
    def verbose(self):
        """bool: Output extra debugging information."""
        return self.__verbose

    @verbose.setter
    def verbose(self, verbose):
        # Enabling verbose raises the ROOT logger to DEBUG; disabling it does
        # not lower the level again.
        if verbose:
            logging.getLogger().setLevel(logging.DEBUG)

        self.__verbose = verbose

    @property
    def target(self):
        """str, int: Target for this session."""
        return self.__target

    @target.setter
    def target(self, target):

        # target set will implicitly set argv
        if isinstance(target, (list, tuple)):
            self.argv = list(target)
            target = target[0]
        else:
            # Place holder until we resolve target
            self.argv = [target]

        # Check if this is a pid
        try:
            # Convert once instead of once per enumerated process. A
            # non-numeric target raises ValueError here and is handled below.
            wanted_pid = common.auto_int(target)
            p = next(x for x in self.engine._frida_device.enumerate_processes() if x.pid == wanted_pid)
            target = p.pid
            self.__file_name = p.name
            self.argv[0] = p.name
        except (StopIteration, ValueError):
            # Not a (running) pid; fall through and treat it as a name/path.
            pass

        if isinstance(target, str):
            full_path = os.path.abspath(target)
            self.__file_name = os.path.basename(full_path)

            # If this string points to an actual file, we will launch it later
            if os.path.isfile(full_path):
                self._spawn_target = full_path

        self.__target = target

    @property
    def alive(self):
        """bool: Is this process still alive?"""
        my_pid = self.pid
        return any(x.pid == my_pid for x in self.engine._frida_device.enumerate_processes())

    @property
    def device(self):
        """revenge.devices.BaseDevice: What device is this process associated with?"""
        return self.engine.device

    @property
    def BatchContext(self):
        """Returns a BatchContext class for this process.

        Example:
            .. code-block:: python3

                with process.BatchContext() as context:
                    something(context=context)
        """
        return lambda *args, **kwargs: BatchContext(self, *args, **kwargs)

    @property
    def _envp(self):
        """dict: This holds the USER SPECIFIED environment variables. If you
        do not specify envp, this will be empty but your program will still
        have the default environment."""
        return self.__envp

    @_envp.setter
    @common.validate_argument_types(envp=(dict, type(None)))
    def _envp(self, envp):
        self.__envp = envp

    @property
    def engine(self):
        """The current engine revenge is using."""
        # Lazily link the engine back to this process the first time the
        # engine is accessed through this property.
        try:
            self.__engine._process
        except AttributeError:
            self.__engine._process = self

        return self.__engine
import inspect
from . import types, config, devices
from .memory import Memory
from .threads import Threads
from .modules import Modules
from .contexts import BatchContext
from .exceptions import *
from .techniques import Techniques
from .plugins import Plugin
from .engines import Engine
# Doc fixups
# Docstrings are None when stripped (e.g. ``python -OO``) or simply absent;
# ``+=`` on None would raise TypeError at import time, so only concatenate
# when both sides are real strings.
if Process.BatchContext.__doc__ is not None and BatchContext.__init__.__doc__ is not None:
    Process.BatchContext.__doc__ += BatchContext.__init__.__doc__
Process.__doc__ = Process.__init__.__doc__
def sigint_handler(sig, frame):
    """SIGINT handler that ends the interpreter through ``sys.exit()``.

    Args:
        sig (int): Signal number passed by the signal machinery (unused).
        frame: Stack frame that was executing when the signal arrived (unused).

    Raises:
        SystemExit: Always.
    """
    sys.exit()


# Install the handler at import time. signal.signal() is only permitted in
# the main thread and raises ValueError elsewhere; importing this module from
# a worker thread should not fail because of that.
try:
    signal.signal(signal.SIGINT, sigint_handler)
except ValueError:
    logging.getLogger(__name__).debug("Not in the main thread; SIGINT handler not installed.")
def main():
    """Script entry point: build a module-global ``process`` and idle.

    Installs ``sigint_handler`` for SIGINT, stores a new ``Process`` in the
    module-level name ``process`` and then sleeps in one second steps until
    the interpreter is terminated.

    NOTE(review): ``Process()`` is called without arguments here although
    ``Process.__init__`` declares ``target`` as a required parameter --
    confirm whether this entry point is still in use.
    """
    global process

    signal.signal(signal.SIGINT, sigint_handler)
    process = Process()

    # Keep the interpreter alive; SIGINT ends it through sigint_handler.
    while True:
        time.sleep(1)


if __name__ == '__main__':
    main()