Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

![Rattle](logo_s.png)

Rattle is an EVM binary static analysis framework designed to work on deployed smart contracts. Rattle takes EVM byte strings, uses a flow-sensitive analysis to recover the original control flow graph, lifts the control flow graph into an SSA/infinite register form, and optimizes the SSA – removing DUPs, SWAPs, PUSHs, and POPs. The conversion from a stack machine to SSA form removes 60%+ of all EVM instructions and presents a much friendlier interface to those who wish to read the smart contracts they’re interacting with.
Rattle is an EVM binary static analysis framework designed to work on deployed smart contracts. Rattle takes EVM byte strings, uses a flow-sensitive analysis to recover the original control flow graph, lifts the control flow graph into an SSA/infinite register form, and optimizes the SSA – removing DUPs, SWAPs, PUSHs, and POPs. The conversion from a stack machine to SSA form removes 60%+ of all EVM instructions and presents a much friendlier interface to those who wish to read the smart contracts they’re interacting with.

**Note:** Rattle currently supports only macOS and Linux; Windows users may experience permission issues. See [issue #40](https://github.com/crytic/rattle/issues/40) for details.

## Example

Expand All @@ -24,6 +24,12 @@ If you're running rattle on a contract you can compile with solidity, use the `-
```console
$ solc --bin-runtime KingOfTheEtherThrone.sol 2>/dev/null | tail -n1 > contract.bin
```
## Export as JSON file

To run an analysis that writes the control flow graphs as PNG images and also saves the detailed analysis results to a JSON file, pass `--output_json` with the destination path (it defaults to `output.json` when omitted):

```bash
python3 rattle-cli.py --input inputs/kingofether/KingOfTheEtherThrone.bin --output_json output_jsons/0x00b2c0d8ca00002ea0d4881b58500a0470a336cc.json -O
```

## Dependencies

Expand Down
200 changes: 82 additions & 118 deletions rattle/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import json
import logging
Expand All @@ -9,29 +8,24 @@
import sys
import tempfile
from typing import Sequence

import rattle

# This might not be true, but I have a habit of running the wrong python version and this is to save me frustration
# Ensure Python 3.6 or higher
# Compare against a tuple rather than testing major and minor separately:
# the component-wise test would wrongly reject a future X.0 release
# (e.g. 4.0 has minor == 0 yet is newer than 3.6).
assert sys.version_info >= (3, 6), "rattle requires Python 3.6 or newer"

logger = logging.getLogger(__name__)


def main(argv: Sequence[str] = tuple(sys.argv)) -> None: # run me with python3, fool
parser = argparse.ArgumentParser(
description='rattle ethereum evm binary analysis')
def main(argv: Sequence[str] = tuple(sys.argv)) -> None:
parser = argparse.ArgumentParser(description='rattle ethereum evm binary analysis')
parser.add_argument('--input', '-i', type=argparse.FileType('rb'), help='input evm file')
parser.add_argument('--optimize', '-O', action='store_true', help='optimize resulting SSA form')
parser.add_argument('--no-split-functions', '-nsf', action='store_false', help='split functions')
parser.add_argument('--log', type=argparse.FileType('w'), default=sys.stdout,
help='log output file (default stdout)')
parser.add_argument('--verbosity', '-v', type=str, default="None",
help='log output verbosity (None, Critical, Error, Warning, Info, Debug)')
parser.add_argument('--log', type=argparse.FileType('w'), default=sys.stdout, help='log output file (default stdout)')
parser.add_argument('--verbosity', '-v', type=str, default="None", help='log output verbosity (None, Critical, Error, Warning, Info, Debug)')
parser.add_argument('--supplemental_cfg_file', type=argparse.FileType('rb'), default=None, help='optional cfg file')
parser.add_argument('--stdout_to', type=argparse.FileType('wt'), default=None, help='redirect stdout to file')
parser.add_argument('--output_json', type=str, default="output.json", help='output JSON file')
args = parser.parse_args(argv[1:])

if args.input is None:
parser.print_usage()
sys.exit(1)
Expand All @@ -52,160 +46,130 @@ def main(argv: Sequence[str] = tuple(sys.argv)) -> None: # run me with python3,
logging.basicConfig(stream=args.log, level=loglevel)
logger.info(f"Rattle running on input: {args.input.name}")

ssa = rattle.Recover(args.input.read(), edges=edges, optimize=args.optimize,
split_functions=args.no_split_functions)

print(ssa)

print("Identified Functions:")
for function in sorted(ssa.functions, key=lambda f: f.offset):
print(f'\t{function.desc()} argument offsets:{function.arguments()}')

print("")

print("Storage Locations: " + repr(ssa.storage))
print("Memory Locations: " + repr(ssa.memory))
# Perform analysis
ssa = rattle.Recover(args.input.read(), edges=edges, optimize=args.optimize, split_functions=args.no_split_functions)

for location in [x for x in ssa.memory if x > 0x20]:
print(f"Analyzing Memory Location: {location}\n")
for insn in sorted(ssa.memory_at(location), key=lambda i: i.offset):
print(f'\t{insn.offset:#x}: {insn}')
print('\n\n')
# Prepare JSON output
output_data = {
"functions": [],
"storage_locations": [],
"memory_locations": [],
"can_send_ether": False,
"calls": []
}

# Collect function information
for function in sorted(ssa.functions, key=lambda f: f.offset):
print(f"Function {function.desc()} storage:")
func_data = {
"name": function.desc(),
"offset": function.offset,
"arguments": function.arguments(),
"storage": []
}
for location in function.storage:
print(f"\tAnalyzing Storage Location: {location}")
storage_data = {
"location": location,
"instructions": []
}
for insn in sorted(ssa.storage_at(location), key=lambda i: i.offset):
print(f'\t\t{insn.offset:#x}: {insn}')
print('\n')

'''
print("Tracing SLOAD(0) (ignoring ANDs)")
for insn in ssa.storage_at(0):
print(insn)
if insn.insn.name == 'SLOAD':
g = rattle.DefUseGraph(insn.return_value)
print(g.dot(lambda x: x.insn.name in ('AND', )))
print('\n')
'''
storage_data["instructions"].append({
"offset": insn.offset,
"instruction": str(insn)
})
func_data["storage"].append(storage_data)
output_data["functions"].append(func_data)

# Collect storage locations
output_data["storage_locations"] = list(ssa.storage)

# Collect memory locations
output_data["memory_locations"] = list(ssa.memory)

# Check if the contract can send ether
can_send, functions_that_can_send = ssa.can_send_ether()
output_data["can_send_ether"] = can_send
if can_send:
print("[+] Contract can send ether from following functions:")
output_data["functions_that_can_send"] = []
for function in functions_that_can_send:
print(f"\t- {function.desc()}")

func_data = {
"name": function.desc(),
"calls": []
}
_, insns = function.can_send_ether()
for insn in insns:

print(f"\t\t{insn}")

call_data = {
"instruction": str(insn),
"details": {}
}
if insn.insn.name == 'SELFDESTRUCT':
address = insn.arguments[0]
print(f'\t\t\t{address.writer}')

call_data["details"]["address_writer"] = str(address.writer)
elif insn.insn.name == 'CALL':
address = insn.arguments[1]
value = insn.arguments[2]
print(f'\t\t\tTo:\t{address.writer}')

call_data["details"]["to_writer"] = str(address.writer)
try:
if value.writer:
print(f'\t\t\tValue:\t{value.writer}')
call_data["details"]["value_writer"] = str(value.writer)
else:
value_in_eth = int(value) * 1.0 / 10 ** 18
print(f'\t\t\tValue:\t{value} {value_in_eth}ETH')
call_data["details"]["value"] = f"{value} ({value_in_eth} ETH)"
except Exception as e:
print(e)
call_data["details"]["error"] = str(e)
func_data["calls"].append(call_data)
output_data["functions_that_can_send"].append(func_data)

print("")
else:
print("[+] Contract can not send ether.")

print("[+] Contract calls:")
# Collect contract calls
for call in ssa.calls():
print(f"\t{call}")
call_data = {
"instruction": str(call),
"details": {}
}
if call.insn.name == 'DELEGATECALL':
gas, to, in_offset, in_size, out_offset, out_size = call.arguments
value = None
else:
gas, to, value, in_offset, in_size, out_offset, out_size = call.arguments

print(f"\t\tGas: {gas}", end='')
if gas.writer:
print(f'\t\t\t{gas.writer}')
else:
print("\n", end='')

print(f"\t\tTo: {to} ", end='')
if to.writer:
print(f'\t\t\t{to.writer}')
else:
print("\n", end='')

call_data["details"]["gas"] = str(gas)
call_data["details"]["to"] = str(to)
call_data["details"]["in_offset"] = str(in_offset)
call_data["details"]["in_size"] = str(in_size)
call_data["details"]["out_offset"] = str(out_offset)
call_data["details"]["out_size"] = str(out_size)
if value:
print(f"\t\tValue: {value}", end='')
if value.writer:
print(f'\t\t\t{value.writer}')
else:
print("\n", end='')

print(f"\t\tIn Data Offset: {in_offset}", end='')
if in_offset.writer:
print(f'\t\t{in_offset.writer}')
else:
print("\n", end='')

print(f"\t\tIn Data Size: {in_size}", end='')
if in_size.writer:
print(f'\t\t{in_size.writer}')
else:
print("\n", end='')

print(f"\t\tOut Data Offset: {out_offset}", end='')
if out_offset.writer:
print(f'\t\t{out_offset.writer}')
else:
print("\n", end='')
call_data["details"]["value"] = str(value)
output_data["calls"].append(call_data)

print(f"\t\tOut Data Size: {out_size}", end='')
if out_size.writer:
print(f'\t\t{out_size.writer}')
else:
print("\n", end='')

print("")
# Write JSON output to file
with open(args.output_json, 'w') as json_file:
json.dump(output_data, json_file, indent=4)
print(f"[+] Wrote analysis results to {args.output_json}")

# Generate PNG outputs (optional)
for function in sorted(ssa.functions, key=lambda f: f.offset):
g = rattle.ControlFlowGraph(function)
t = tempfile.NamedTemporaryFile(suffix='.dot', mode='w')
t.write(g.dot())
t.flush()

try:
os.makedirs('output')
except:
pass

out_file = f'output/{function.desc()}.png'

subprocess.call(['dot', '-Tpng', f'-o{out_file}', t.name])
print(f'[+] Wrote {function.desc()} to {out_file}')

try:
# This is mac specific
# This is macOS-specific
subprocess.call(['open', out_file])
except OSError as e:
pass

# Maybe a way to query the current value of a storage location out of some api (can infra do that?)
# print(loc0.value.top())
# print(loc0.value.attx(012323213))

# Cleanup
if args.stdout_to:
sys.stdout = orig_stdout
args.stdout_to.close()

if args.input:
args.input.close()
args.input.close()

if __name__ == "__main__":
main()