Commit

Add YARA scanning functionality
ashariyar committed Oct 3, 2022
1 parent ba21a79 commit 49e812a
Showing 28 changed files with 1,596 additions and 267 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
# NEXT RELEASE


# 1.6.0
* Integrate YARA scanning - all the rules I could dig up relating to PDFs
* Add MD5, SHA1, SHA256 to document info section
* `print_pdfalyzer_theme_colors` script shows the theme
* Make `README` more PyPi friendly

# 1.5.0
Expand Down
13 changes: 9 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ A PDF analysis tool geared towards visualizing the inner tree-like data structur

#### Quick Start
```sh
pip install pdfalyzer
pipx install pdfalyzer
pdfalyze the_heidiggerian_themes_expressed_in_illmatic.pdf
```
See [Installation](#installation) and [Usage](#usage) for more details.
`pip install pdfalyzer` also works. See [Installation](#installation) and [Usage](#usage) for more details.

### What It Do
1. **Generate a summary as well as in-depth visualizations of a PDF's tree structure**[^1] with helpful color themes that conceptually link objects of similar type. See [the examples below](#example-output) to get an idea.
1. **Display text representations of the PDF's embedded binary data**. Adobe calls these "streams" and they hold things like images, fonts, etc.
1. **Scan for malicious content in the PDF**, including in-depth scans of the embedded font binaries where other tools don't look. This is accomplished by iterating over all the matches for various predefined binary regexes (e.g. the binary representation of the string `/JavaScript`) but is extensible to digging through the PDF for any kind of binary data pattern.
1. **Scan for malicious content in the PDF**, both with PDF-related [YARA](https://github.com/VirusTotal/yara-python) rules collected from around the internet and with custom in-depth scans of the embedded font binaries where other tools don't look. These scans are run both against the overall final binary and against each of the PDF's embedded binary streams _post decode/decrypt_. Most PDFs have many such streams.
1. **Show the results of attempting to decode suspicious byte patterns with many different character encodings.** In particular quoted bytes: those between delimiters like front slashes (hint: think regexes) as well as regular quote characters. Several encodings are configured as defaults to try, but [the `chardet` library](https://github.com/chardet/chardet) is also leveraged to detect whether the binary could be in an unconfigured encoding.
1. **Be used as a library for your own PDF related code.** All[^2] the inner PDF objects are guaranteed to be available in a searchable tree data structure.
1. **Ease the extraction of all the binary data in a PDF** (fonts, images, etc) to separate files for further analysis. (The heavy lifting is actually done by [Didier Stevens's tools](#installing-didier-stevenss-pdf-analysis-tools) - the pdfalyzer automates what would otherwise be a lot of typing into a single command.)
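The quoted-bytes scan and multi-encoding decode described above can be sketched with the standard library alone. This is a simplified illustration, not The Pdfalyzer's actual implementation: the regex, the encoding list, and the `scan_binary` function name are all assumptions for the sake of the example.

```python
import re

# Binary regex for bytes quoted between front slashes, e.g. the
# binary representation of the string /JavaScript
QUOTED_BYTES = re.compile(rb'/([A-Za-z0-9#]+)')

# Hypothetical default list of encodings to attempt on each match
ENCODINGS_TO_ATTEMPT = ['ascii', 'utf-8', 'utf-16', 'latin-1']

def scan_binary(data: bytes):
    """Yield (captured bytes, {encoding: decoded string or None}) per match."""
    for match in QUOTED_BYTES.finditer(data):
        captured = match.group(1)
        decodings = {}
        for encoding in ENCODINGS_TO_ATTEMPT:
            try:
                decodings[encoding] = captured.decode(encoding)
            except UnicodeDecodeError:
                decodings[encoding] = None  # undecodable with this encoding
        yield captured, decodings

# A toy "stream": raw bytes with two quoted tokens embedded in junk
stream = b'\x00\x01/JavaScript\xff/AA\x80'
for raw, decoded in scan_binary(stream):
    print(raw, decoded['ascii'])
```

A real scanner would also record per-encoding success/failure counts (as the `RegexMatchMetrics` changes in this commit do) and fall back to `chardet` when none of the configured encodings decode cleanly.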

Expand Down Expand Up @@ -87,9 +88,13 @@ Some simple counts of some properties of the internal PDF objects. Not the most


# Installation

```
pip install pdfalyzer
pipx install pdfalyzer
```

[pipx](https://pypa.github.io/pipx/) is a tool that basically runs `pip install` for a python package but in such a way that the installed package's requirements are isolated from your system's python packages. If you don't feel like installing `pipx` then `pip install` should work fine as long as there are no conflicts between The Pdfalyzer's required packages and those on your system already. (If you aren't using other python based command line tools then your odds of a conflict are basically 0%.)

For info on how to setup a dev environment, see [Contributing](#contributing) section at the end of this file.

### Troubleshooting The Installation
Expand Down
6 changes: 2 additions & 4 deletions pdfalyzer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
import sys
from os import environ, getcwd, path

from dotenv import load_dotenv

# load_dotenv() should be called as soon as possible (before parsing local classes) but not for pytest
if not environ.get('INVOKED_BY_PYTEST', False):
for dotenv_file in [path.join(dir, '.pdfalyzer') for dir in [getcwd(), path.expanduser('~')]]:
if path.exists(dotenv_file):
from dotenv import load_dotenv
print(f"Loading config: {dotenv_file}")
load_dotenv(dotenv_path=dotenv_file)
break

Expand All @@ -30,7 +30,6 @@ def pdfalyze():
log_and_print(f"Binary stream extraction complete, files written to '{args.output_dir}'.\nExiting.\n")
sys.exit()


def get_output_basepath(export_method):
"""Build the path to an output file - everything but the extension"""
export_type = export_method.__name__.removeprefix('print_')
Expand All @@ -50,7 +49,6 @@ def get_output_basepath(export_method):
output_basename += args.file_suffix
return path.join(args.output_dir, output_basename + f"___pdfalyzed_{args.invoked_at_str}")


# Analysis exports wrap themselves around the methods that actually generate the analyses
for (arg, method) in output_sections(args, walker):
if args.output_dir:
Expand Down
57 changes: 33 additions & 24 deletions pdfalyzer/binary/binary_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
from pdfalyzer.detection.constants.character_encodings import BOMS
from pdfalyzer.detection.constants.binary_regexes import DANGEROUS_INSTRUCTIONS
from pdfalyzer.detection.regex_match_metrics import RegexMatchMetrics
from pdfalyzer.helpers.bytes_helper import clean_byte_string, get_bytes_before_and_after_match, print_bytes
from pdfalyzer.detection.yara_scanner import YaraScanner
from pdfalyzer.helpers.bytes_helper import clean_byte_string, print_bytes
from pdfalyzer.helpers.rich_text_helper import (CENTER, DANGER_HEADER, NOT_FOUND_MSG, console, console_width,
generate_subtable, na_txt, pad_header, prefix_with_plain_text_obj, subheading_width)
generate_subtable, get_label_style, na_txt, pad_header, prefix_with_plain_text_obj, subheading_width)
from pdfalyzer.helpers.string_helper import generate_hyphen_line, print_section_header
from pdfalyzer.util.adobe_strings import CURRENTFILE_EEXEC
from pdfalyzer.util.logging import log
Expand All @@ -31,12 +32,19 @@


class BinaryScanner:
def __init__(self, _bytes: bytes, owner: Any = None):
def __init__(self, _bytes: bytes, owner: Any = None, label: Any = None):
"""owner is an optional link back to the object containing this binary"""
self.bytes = _bytes
self.label = label
self.owner = owner

if label is None and owner is not None:
self.label = Text(owner.label, get_label_style(owner.label))

self.stream_length = len(_bytes)
self.regex_extraction_stats = defaultdict(lambda: RegexMatchMetrics())
self.suppression_notice_queue = []
self.yara_scanner = YaraScanner(_bytes, label)

def check_for_dangerous_instructions(self) -> None:
"""Scan for all the strings in DANGEROUS_INSTRUCTIONS list and decode bytes around them"""
Expand All @@ -59,8 +67,7 @@ def force_decode_all_quoted_bytes(self) -> None:
def extract_regex_capture_bytes(self, regex: Pattern[bytes]) -> Iterator[BytesMatch]:
"""Finds all matches of regex_with_one_capture in self.bytes and calls yield() with BytesMatch tuples"""
for i, match in enumerate(regex.finditer(self.bytes, self._eexec_idx())):
surrounding_bytes = get_bytes_before_and_after_match(self.bytes, match)
yield(BytesMatch(match, surrounding_bytes, i))
yield(BytesMatch.from_regex_match(self.bytes, match, i + 1))


# -------------------------------------------------------------------------------
Expand Down Expand Up @@ -92,7 +99,7 @@ def print_stream_preview(self, num_bytes=None, title_suffix=None) -> None:
title = f"First and last {num_bytes} bytes of {self.stream_length} byte stream"

title += title_suffix if title_suffix is not None else ''
console.print(Panel(title, style='bytes_title', expand=False))
console.print(Panel(title, style='bytes.title', expand=False))
console.print(generate_hyphen_line(title='BEGIN BYTES'), style='dim')

if snipped_byte_count < 0:
Expand All @@ -107,13 +114,13 @@ def print_stream_preview(self, num_bytes=None, title_suffix=None) -> None:

def print_decoding_stats_table(self) -> None:
"""Diplay aggregate results on the decoding attempts we made on subsets of self.bytes"""
stats_table = new_decoding_stats_table(f"{self.owner or ''}")
stats_table = new_decoding_stats_table(self.label.plain if self.label else '')
regexes_not_found_in_stream = []

for regex, stats in self.regex_extraction_stats.items():
for matcher, stats in self.regex_extraction_stats.items():
# Set aside the regexes we didn't find so that the ones we did find are at the top of the table
if stats.match_count == 0:
regexes_not_found_in_stream.append([str(regex.pattern), NOT_FOUND_MSG, na_txt()])
regexes_not_found_in_stream.append([str(matcher), NOT_FOUND_MSG, na_txt()])
continue

regex_subtable = generate_subtable(cols=['Metric', 'Value'])
Expand All @@ -124,14 +131,13 @@ def print_decoding_stats_table(self) -> None:
regex_subtable.add_row(metric, str(measure))

for i, (encoding, count) in enumerate(stats.was_match_decodable.items()):
style = f"color({CHAR_ENCODING_1ST_COLOR_NUMBER + 2 * i})"
decodes_subtable.add_row(
Text(encoding, style=style),
Text(encoding, style=f"color({CHAR_ENCODING_1ST_COLOR_NUMBER + 2 * i})"),
str(count),
str(self.regex_extraction_stats[regex].was_match_force_decoded[encoding]),
str(self.regex_extraction_stats[regex].was_match_undecodable[encoding]))
str(self.regex_extraction_stats[matcher].was_match_force_decoded[encoding]),
str(self.regex_extraction_stats[matcher].was_match_undecodable[encoding]))

stats_table.add_row(str(regex.pattern), regex_subtable, decodes_subtable)
stats_table.add_row(str(matcher), regex_subtable, decodes_subtable)

for row in regexes_not_found_in_stream:
row[0] = Text(row[0], style='color(235)')
Expand All @@ -148,11 +154,11 @@ def _process_regex_matches(self, regex: Pattern[bytes], label: str, force: bool=
"""Decide whether to attempt to decode the matched bytes, track stats. force param ignores min/max length"""
for bytes_match in self.extract_regex_capture_bytes(regex):
self.regex_extraction_stats[regex].match_count += 1
self.regex_extraction_stats[regex].bytes_matched += bytes_match.capture_len
self.regex_extraction_stats[regex].bytes_matched += bytes_match.match_length
self.regex_extraction_stats[regex].bytes_match_objs.append(bytes_match)

# Send suppressed decodes to a queue and track the reason for the suppression in the stats
if not (force or PdfalyzerConfig.MIN_DECODE_LENGTH < bytes_match.capture_len < PdfalyzerConfig.MAX_DECODE_LENGTH):
if not (force or PdfalyzerConfig.MIN_DECODE_LENGTH < bytes_match.match_length < PdfalyzerConfig.MAX_DECODE_LENGTH):
self._queue_suppression_notice(bytes_match, label)
continue

Expand All @@ -170,26 +176,26 @@ def _attempt_binary_decodes(self, bytes_match: BytesMatch, label: str) -> None:
console.line()

# Track stats on whether the bytes were decodable or not w/a given encoding
self.regex_extraction_stats[bytes_match.regex].matches_decoded += 1
self.regex_extraction_stats[bytes_match.label].matches_decoded += 1

for encoding, count in decoder.was_match_decodable.items():
decode_stats = self.regex_extraction_stats[bytes_match.regex].was_match_decodable
decode_stats = self.regex_extraction_stats[bytes_match.label].was_match_decodable
decode_stats[encoding] = decode_stats.get(encoding, 0) + count

for encoding, count in decoder.was_match_undecodable.items():
failure_stats = self.regex_extraction_stats[bytes_match.regex].was_match_undecodable
failure_stats = self.regex_extraction_stats[bytes_match.label].was_match_undecodable
failure_stats[encoding] = failure_stats.get(encoding, 0) + count

for encoding, count in decoder.was_match_force_decoded.items():
forced_stats = self.regex_extraction_stats[bytes_match.regex].was_match_force_decoded
forced_stats = self.regex_extraction_stats[bytes_match.label].was_match_force_decoded
forced_stats[encoding] = forced_stats.get(encoding, 0) + count

def _queue_suppression_notice(self, bytes_match: BytesMatch, quote_type: str) -> None:
"""Print a message indicating that we are not going to decode a given block of bytes"""
self.regex_extraction_stats[bytes_match.regex].skipped_matches_lengths[bytes_match.capture_len] += 1
self.regex_extraction_stats[bytes_match.label].skipped_matches_lengths[bytes_match.match_length] += 1
txt = bytes_match.__rich__()

if bytes_match.capture_len < PdfalyzerConfig.MIN_DECODE_LENGTH:
if bytes_match.match_length < PdfalyzerConfig.MIN_DECODE_LENGTH:
txt = Text('Too little to actually attempt decode at ', style='grey') + txt
else:
txt.append(" is too large to decode ")
Expand All @@ -215,11 +221,14 @@ def _eexec_idx(self) -> int:

def new_decoding_stats_table(title) -> Table:
"""Build an empty table for displaying decoding stats"""
title = prefix_with_plain_text_obj(title, style='blue underline')
title.append(": Decoding Attempts Summary Statistics", style='bright_white bold')

table = Table(
title=prefix_with_plain_text_obj(title, style='blue underline') + Text(f": Decoding Attempts Summary Statistics"),
title=title,
min_width=subheading_width(),
show_lines=True,
padding=[0, 1],
padding=(0, 1),
style='color(18)',
border_style='color(111) dim',
header_style='color(235) on color(249) reverse',
Expand Down
24 changes: 12 additions & 12 deletions pdfalyzer/binary/bytes_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from collections import defaultdict
from operator import attrgetter
from typing import List
from typing import List, Optional

from rich.panel import Panel
from rich.table import Table
Expand Down Expand Up @@ -35,11 +35,11 @@


class BytesDecoder:
def __init__(self, bytes_match: BytesMatch, label=None) -> None:
def __init__(self, bytes_match: BytesMatch, label: Optional[str] = None) -> None:
"""Instantiated with _bytes as the whole stream; :bytes_seq tells it how to pull the bytes it will decode"""
self.bytes_match = bytes_match
self.bytes = bytes_match.surrounding_bytes
self.label = label or clean_byte_string(bytes_match.regex.pattern)
self.label = label or bytes_match.label

# Empty table/metrics/etc
self.table = empty_decoding_attempts_table(bytes_match)
Expand All @@ -52,6 +52,13 @@ def __init__(self, bytes_match: BytesMatch, label=None) -> None:
# Note we send both the match and surrounding bytes used when detecting the encoding
self.encoding_detector = EncodingDetector(self.bytes)

def print_decode_attempts(self) -> None:
if not PdfalyzerConfig.SUPPRESS_CHARDET_OUTPUT:
console.print(self.encoding_detector)

self._print_decode_attempt_subheading()
console.print(self._generate_decodings_table())

def _generate_decodings_table(self) -> Table:
"""First rows are the raw / hex views of the bytes"""
self.decodings = [DecodingAttempt(self.bytes_match, encoding) for encoding in ENCODINGS_TO_ATTEMPT.keys()]
Expand All @@ -75,13 +82,6 @@ def _generate_decodings_table(self) -> Table:

return self.table

def print_decode_attempts(self) -> None:
if not PdfalyzerConfig.SUPPRESS_CHARDET_OUTPUT:
console.print(self.encoding_detector)

self._print_decode_attempt_subheading()
console.print(self._generate_decodings_table())

def _forced_displays(self) -> List[EncodingAssessment]:
"""Returns assessments over the display threshold that are not yet decoded"""
return self._undecoded_assessments(self.encoding_detector.force_display_assessments)
Expand All @@ -96,8 +96,8 @@ def _was_decoded(self, encoding: str) -> bool:

def _print_decode_attempt_subheading(self) -> None:
"""Generate a rich.Panel for decode attempts"""
headline = Text(f"Found {self.label.lower()} ", style='decode_subheading') + self.bytes_match.__rich__()
panel = Panel(headline, style='decode_subheading', expand=False)
headline = Text(f"Found {self.label.lower()} ", style='decode.subheading') + self.bytes_match.__rich__()
panel = Panel(headline, style='decode.subheading', expand=False)
console.print(panel, justify=CENTER)

def _track_decode_stats(self):
Expand Down
