#!/usr/bin/env python
__description__ = 'Tool to test a PDF file'
__author__ = 'Didier Stevens'
__version__ = '0.2.9'
__date__ = '2024/10/26'
"""
Tool to test a PDF file
Source code put in public domain by Didier Stevens, no Copyright
https://DidierStevens.com
Use at your own risk
History:
2009/03/27: start
2009/03/28: scan option
2009/03/29: V0.0.2: xml output
2009/03/31: V0.0.3: /ObjStm suggested by Dion
2009/04/02: V0.0.4: added ErrorMessage
2009/04/20: V0.0.5: added Dates
2009/04/21: V0.0.6: added entropy
2009/04/22: added disarm
2009/04/29: finished disarm
2009/05/13: V0.0.7: added cPDFEOF
2009/07/24: V0.0.8: added /AcroForm and /RichMedia, simplified %PDF header regex, extra date format (without TZ)
2009/07/25: added input redirection, option --force
2009/10/13: V0.0.9: added detection for CVE-2009-3459; added /RichMedia to disarm
2010/01/11: V0.0.10: relaxed %PDF header checking
2010/04/28: V0.0.11: added /Launch
2010/09/21: V0.0.12: fixed cntCharsAfterLastEOF bug; fix by Russell Holloway
2011/12/29: updated for Python 3, added keyword /EmbeddedFile
2012/03/03: added PDFiD2JSON; coded by Brandon Dixon
2013/02/10: V0.1.0: added http/https support; added support for ZIP file with password 'infected'
2013/03/11: V0.1.1: fixes for Python 3
2013/03/13: V0.1.2: Added error handling for files; added /XFA
2013/11/01: V0.2.0: Added @file & plugins
2013/11/02: continue
2013/11/04: added options -c, -m, -v
2013/11/06: added option -S
2013/11/08: continue
2013/11/09: added option -o
2013/11/15: refactoring
2014/09/30: added CSV header
2014/10/16: V0.2.1: added output when plugin & file not pdf
2014/10/18: some fixes for Python 3
2015/08/12: V0.2.2: added option pluginoptions
2015/08/13: added plugin Instructions method
2016/04/12: added option literal
2017/10/29: added pdfid.ini support
2017/11/05: V0.2.3: added option -n
2018/01/03: V0.2.4: bugfix entropy calculation for PDFs without streams; sample 28cb208d976466b295ee879d2d233c8a https://twitter.com/DubinRan/status/947783629123416069
2018/01/15: bugfix ConfigParser privately reported
2018/01/29: bugfix oPDFEOF.cntCharsAfterLastEOF when no %%EOF
2018/07/05: V0.2.5 introduced cExpandFilenameArguments; renamed option literal to literalfilenames
2019/09/30: V0.2.6 color bugfix, thanks to Leo
2019/11/05: V0.2.7 fixed plugin path when compiled with pyinstaller
2020/11/21: V0.2.8 added data argument to PDFiD function
2024/10/26: V0.2.9 added pyzipper support
Todo:
- update XML example (entropy, EOF)
- code review, cleanup
"""
import optparse
import os
import re
import xml.dom.minidom
import traceback
import math
import operator
import os.path
import sys
import json
import collections
import glob
import fnmatch
if sys.version_info[0] >= 3:
import urllib.request as urllib23
else:
import urllib2 as urllib23
if sys.version_info[0] >= 3:
import configparser as ConfigParser
else:
import ConfigParser
if sys.version_info[0] >= 3:
from io import BytesIO as DataIO
else:
from cStringIO import StringIO as DataIO
try:
import pyzipper as zipfile
except ImportError:
import zipfile
#Convert 2 Bytes If Python 3
def C2BIP3(string):
if sys.version_info[0] > 2:
return bytes([ord(x) for x in string])
else:
return string
def CreateZipFileObject(arg1, arg2):
if 'AESZipFile' in dir(zipfile):
return zipfile.AESZipFile(arg1, arg2)
else:
return zipfile.ZipFile(arg1, arg2)
class cBinaryFile:
def __init__(self, file, data=None):
self.file = file
if data != None:
self.infile = DataIO(data)
elif file == '':
self.infile = sys.stdin
elif file.lower().startswith('http://') or file.lower().startswith('https://'):
try:
if sys.hexversion >= 0x020601F0:
self.infile = urllib23.urlopen(file, timeout=5)
else:
self.infile = urllib23.urlopen(file)
except urllib23.HTTPError:
print('Error accessing URL %s' % file)
print(sys.exc_info()[1])
sys.exit()
elif file.lower().endswith('.zip'):
try:
self.zipfile = CreateZipFileObject(file, 'r')
self.infile = self.zipfile.open(self.zipfile.infolist()[0], 'r', C2BIP3('infected'))
except:
print('Error opening file %s' % file)
print(sys.exc_info()[1])
sys.exit()
else:
try:
self.infile = open(file, 'rb')
except:
print('Error opening file %s' % file)
print(sys.exc_info()[1])
sys.exit()
self.ungetted = []
def byte(self):
if len(self.ungetted) != 0:
return self.ungetted.pop()
inbyte = self.infile.read(1)
if not inbyte or inbyte == '':
self.infile.close()
return None
return ord(inbyte)
def bytes(self, size):
if size <= len(self.ungetted):
result = self.ungetted[0:size]
del self.ungetted[0:size]
return result
inbytes = self.infile.read(size - len(self.ungetted))
if inbytes == '':
self.infile.close()
if type(inbytes) == type(''):
result = self.ungetted + [ord(b) for b in inbytes]
else:
result = self.ungetted + [b for b in inbytes]
self.ungetted = []
return result
def unget(self, byte):
self.ungetted.append(byte)
def ungets(self, bytes):
bytes.reverse()
self.ungetted.extend(bytes)
class cPDFDate:
def __init__(self):
self.state = 0
def parse(self, char):
if char == 'D':
self.state = 1
return None
elif self.state == 1:
if char == ':':
self.state = 2
self.digits1 = ''
else:
self.state = 0
return None
elif self.state == 2:
if len(self.digits1) < 14:
if char >= '0' and char <= '9':
self.digits1 += char
return None
else:
self.state = 0
return None
elif char == '+' or char == '-' or char == 'Z':
self.state = 3
self.digits2 = ''
self.TZ = char
return None
elif char == '"':
self.state = 0
self.date = 'D:' + self.digits1
return self.date
elif char < '0' or char > '9':
self.state = 0
self.date = 'D:' + self.digits1
return self.date
else:
self.state = 0
return None
elif self.state == 3:
if len(self.digits2) < 2:
if char >= '0' and char <= '9':
self.digits2 += char
return None
else:
self.state = 0
return None
elif len(self.digits2) == 2:
if char == "'":
self.digits2 += char
return None
else:
self.state = 0
return None
elif len(self.digits2) < 5:
if char >= '0' and char <= '9':
self.digits2 += char
if len(self.digits2) == 5:
self.state = 0
self.date = 'D:' + self.digits1 + self.TZ + self.digits2
return self.date
else:
return None
else:
self.state = 0
return None
def fEntropy(countByte, countTotal):
x = float(countByte) / countTotal
if x > 0:
return - x * math.log(x, 2)
else:
return 0.0
class cEntropy:
def __init__(self):
self.allBucket = [0 for i in range(0, 256)]
self.streamBucket = [0 for i in range(0, 256)]
def add(self, byte, insideStream):
self.allBucket[byte] += 1
if insideStream:
self.streamBucket[byte] += 1
def removeInsideStream(self, byte):
if self.streamBucket[byte] > 0:
self.streamBucket[byte] -= 1
def calc(self):
self.nonStreamBucket = list(map(operator.sub, self.allBucket, self.streamBucket))
allCount = sum(self.allBucket)
streamCount = sum(self.streamBucket)
nonStreamCount = sum(self.nonStreamBucket)
if streamCount == 0:
return (allCount, sum(map(lambda x: fEntropy(x, allCount), self.allBucket)), streamCount, None, nonStreamCount, sum(map(lambda x: fEntropy(x, nonStreamCount), self.nonStreamBucket)))
else:
return (allCount, sum(map(lambda x: fEntropy(x, allCount), self.allBucket)), streamCount, sum(map(lambda x: fEntropy(x, streamCount), self.streamBucket)), nonStreamCount, sum(map(lambda x: fEntropy(x, nonStreamCount), self.nonStreamBucket)))
class cPDFEOF:
def __init__(self):
self.token = ''
self.cntEOFs = 0
def parse(self, char):
if self.cntEOFs > 0:
self.cntCharsAfterLastEOF += 1
if self.token == '' and char == '%':
self.token += char
return
elif self.token == '%' and char == '%':
self.token += char
return
elif self.token == '%%' and char == 'E':
self.token += char
return
elif self.token == '%%E' and char == 'O':
self.token += char
return
elif self.token == '%%EO' and char == 'F':
self.token += char
return
elif self.token == '%%EOF' and (char == '\n' or char == '\r' or char == ' ' or char == '\t'):
self.cntEOFs += 1
self.cntCharsAfterLastEOF = 0
if char == '\n':
self.token = ''
else:
self.token += char
return
elif self.token == '%%EOF\r':
if char == '\n':
self.cntCharsAfterLastEOF = 0
self.token = ''
else:
self.token = ''
def FindPDFHeaderRelaxed(oBinaryFile):
bytes = oBinaryFile.bytes(1024)
index = ''.join([chr(byte) for byte in bytes]).find('%PDF')
if index == -1:
oBinaryFile.ungets(bytes)
return ([], None)
for endHeader in range(index + 4, index + 4 + 10):
if bytes[endHeader] == 10 or bytes[endHeader] == 13:
break
oBinaryFile.ungets(bytes[endHeader:])
return (bytes[0:endHeader], ''.join([chr(byte) for byte in bytes[index:endHeader]]))
def Hexcode2String(char):
if type(char) == int:
return '#%02x' % char
else:
return char
def SwapCase(char):
if type(char) == int:
return ord(chr(char).swapcase())
else:
return char.swapcase()
def HexcodeName2String(hexcodeName):
return ''.join(map(Hexcode2String, hexcodeName))
def SwapName(wordExact):
return map(SwapCase, wordExact)
def UpdateWords(word, wordExact, slash, words, hexcode, allNames, lastName, insideStream, oEntropy, fOut):
if word != '':
if slash + word in words:
words[slash + word][0] += 1
if hexcode:
words[slash + word][1] += 1
elif slash == '/' and allNames:
words[slash + word] = [1, 0]
if hexcode:
words[slash + word][1] += 1
if slash == '/':
lastName = slash + word
if slash == '':
if word == 'stream':
insideStream = True
if word == 'endstream':
if insideStream == True and oEntropy != None:
for char in 'endstream':
oEntropy.removeInsideStream(ord(char))
insideStream = False
if fOut != None:
if slash == '/' and '/' + word in ('/JS', '/JavaScript', '/AA', '/OpenAction', '/JBIG2Decode', '/RichMedia', '/Launch'):
wordExactSwapped = HexcodeName2String(SwapName(wordExact))
fOut.write(C2BIP3(wordExactSwapped))
print('/%s -> /%s' % (HexcodeName2String(wordExact), wordExactSwapped))
else:
fOut.write(C2BIP3(HexcodeName2String(wordExact)))
return ('', [], False, lastName, insideStream)
class cCVE_2009_3459:
def __init__(self):
self.count = 0
def Check(self, lastName, word):
if (lastName == '/Colors' and word.isdigit() and int(word) > 2**24): # decided to alert when the number of colors is expressed with more than 3 bytes
self.count += 1
def XMLAddAttribute(xmlDoc, name, value=None):
att = xmlDoc.createAttribute(name)
xmlDoc.documentElement.setAttributeNode(att)
if value != None:
att.nodeValue = value
return att
def GetScriptPath():
if getattr(sys, 'frozen', False):
return os.path.dirname(sys.executable)
else:
return os.path.dirname(sys.argv[0])
def ParseINIFile():
oConfigParser = ConfigParser.ConfigParser(allow_no_value=True)
oConfigParser.optionxform = str
oConfigParser.read(os.path.join(GetScriptPath(), 'pdfid.ini'))
keywords = []
if oConfigParser.has_section('keywords'):
for key, value in oConfigParser.items('keywords'):
if not key in keywords:
keywords.append(key)
return keywords
def PDFiD(file, allNames=False, extraData=False, disarm=False, force=False, data=None):
"""Example of XML output:
"""
word = ''
wordExact = []
hexcode = False
lastName = ''
insideStream = False
keywords = ['obj',
'endobj',
'stream',
'endstream',
'xref',
'trailer',
'startxref',
'/Page',
'/Encrypt',
'/ObjStm',
'/JS',
'/JavaScript',
'/AA',
'/OpenAction',
'/AcroForm',
'/JBIG2Decode',
'/RichMedia',
'/Launch',
'/EmbeddedFile',
'/XFA',
]
words = {}
dates = []
for extrakeyword in ParseINIFile():
if not extrakeyword in keywords:
keywords.append(extrakeyword)
for keyword in keywords:
words[keyword] = [0, 0]
slash = ''
xmlDoc = xml.dom.minidom.getDOMImplementation().createDocument(None, 'PDFiD', None)
XMLAddAttribute(xmlDoc, 'Version', __version__)
XMLAddAttribute(xmlDoc, 'Filename', file)
attErrorOccured = XMLAddAttribute(xmlDoc, 'ErrorOccured', 'False')
attErrorMessage = XMLAddAttribute(xmlDoc, 'ErrorMessage', '')
oPDFDate = None
oEntropy = None
oPDFEOF = None
oCVE_2009_3459 = cCVE_2009_3459()
try:
attIsPDF = xmlDoc.createAttribute('IsPDF')
xmlDoc.documentElement.setAttributeNode(attIsPDF)
oBinaryFile = cBinaryFile(file, data)
if extraData:
oPDFDate = cPDFDate()
oEntropy = cEntropy()
oPDFEOF = cPDFEOF()
(bytesHeader, pdfHeader) = FindPDFHeaderRelaxed(oBinaryFile)
if disarm:
(pathfile, extension) = os.path.splitext(file)
fOut = open(pathfile + '.disarmed' + extension, 'wb')
for byteHeader in bytesHeader:
fOut.write(C2BIP3(chr(byteHeader)))
else:
fOut = None
if oEntropy != None:
for byteHeader in bytesHeader:
oEntropy.add(byteHeader, insideStream)
if pdfHeader == None and not force:
attIsPDF.nodeValue = 'False'
return xmlDoc
else:
if pdfHeader == None:
attIsPDF.nodeValue = 'False'
pdfHeader = ''
else:
attIsPDF.nodeValue = 'True'
att = xmlDoc.createAttribute('Header')
att.nodeValue = repr(pdfHeader[0:10]).strip("'")
xmlDoc.documentElement.setAttributeNode(att)
byte = oBinaryFile.byte()
while byte != None:
char = chr(byte)
charUpper = char.upper()
if charUpper >= 'A' and charUpper <= 'Z' or charUpper >= '0' and charUpper <= '9':
word += char
wordExact.append(char)
elif slash == '/' and char == '#':
d1 = oBinaryFile.byte()
if d1 != None:
d2 = oBinaryFile.byte()
if d2 != None and (chr(d1) >= '0' and chr(d1) <= '9' or chr(d1).upper() >= 'A' and chr(d1).upper() <= 'F') and (chr(d2) >= '0' and chr(d2) <= '9' or chr(d2).upper() >= 'A' and chr(d2).upper() <= 'F'):
word += chr(int(chr(d1) + chr(d2), 16))
wordExact.append(int(chr(d1) + chr(d2), 16))
hexcode = True
if oEntropy != None:
oEntropy.add(d1, insideStream)
oEntropy.add(d2, insideStream)
if oPDFEOF != None:
oPDFEOF.parse(d1)
oPDFEOF.parse(d2)
else:
oBinaryFile.unget(d2)
oBinaryFile.unget(d1)
(word, wordExact, hexcode, lastName, insideStream) = UpdateWords(word, wordExact, slash, words, hexcode, allNames, lastName, insideStream, oEntropy, fOut)
if disarm:
fOut.write(C2BIP3(char))
else:
oBinaryFile.unget(d1)
(word, wordExact, hexcode, lastName, insideStream) = UpdateWords(word, wordExact, slash, words, hexcode, allNames, lastName, insideStream, oEntropy, fOut)
if disarm:
fOut.write(C2BIP3(char))
else:
oCVE_2009_3459.Check(lastName, word)
(word, wordExact, hexcode, lastName, insideStream) = UpdateWords(word, wordExact, slash, words, hexcode, allNames, lastName, insideStream, oEntropy, fOut)
if char == '/':
slash = '/'
else:
slash = ''
if disarm:
fOut.write(C2BIP3(char))
if oPDFDate != None and oPDFDate.parse(char) != None:
dates.append([oPDFDate.date, lastName])
if oEntropy != None:
oEntropy.add(byte, insideStream)
if oPDFEOF != None:
oPDFEOF.parse(char)
byte = oBinaryFile.byte()
(word, wordExact, hexcode, lastName, insideStream) = UpdateWords(word, wordExact, slash, words, hexcode, allNames, lastName, insideStream, oEntropy, fOut)
# check to see if file ended with %%EOF. If so, we can reset charsAfterLastEOF and add one to EOF count. This is never performed in
# the parse function because it never gets called due to hitting the end of file.
if byte == None and oPDFEOF != None:
if oPDFEOF.token == '%%EOF':
oPDFEOF.cntEOFs += 1
oPDFEOF.cntCharsAfterLastEOF = 0
oPDFEOF.token = ''
except SystemExit:
sys.exit()
except:
attErrorOccured.nodeValue = 'True'
attErrorMessage.nodeValue = traceback.format_exc()
if disarm:
fOut.close()
attEntropyAll = xmlDoc.createAttribute('TotalEntropy')
xmlDoc.documentElement.setAttributeNode(attEntropyAll)
attCountAll = xmlDoc.createAttribute('TotalCount')
xmlDoc.documentElement.setAttributeNode(attCountAll)
attEntropyStream = xmlDoc.createAttribute('StreamEntropy')
xmlDoc.documentElement.setAttributeNode(attEntropyStream)
attCountStream = xmlDoc.createAttribute('StreamCount')
xmlDoc.documentElement.setAttributeNode(attCountStream)
attEntropyNonStream = xmlDoc.createAttribute('NonStreamEntropy')
xmlDoc.documentElement.setAttributeNode(attEntropyNonStream)
attCountNonStream = xmlDoc.createAttribute('NonStreamCount')
xmlDoc.documentElement.setAttributeNode(attCountNonStream)
if oEntropy != None:
(countAll, entropyAll , countStream, entropyStream, countNonStream, entropyNonStream) = oEntropy.calc()
attEntropyAll.nodeValue = '%f' % entropyAll
attCountAll.nodeValue = '%d' % countAll
if entropyStream == None:
attEntropyStream.nodeValue = 'N/A '
else:
attEntropyStream.nodeValue = '%f' % entropyStream
attCountStream.nodeValue = '%d' % countStream
attEntropyNonStream.nodeValue = '%f' % entropyNonStream
attCountNonStream.nodeValue = '%d' % countNonStream
else:
attEntropyAll.nodeValue = ''
attCountAll.nodeValue = ''
attEntropyStream.nodeValue = ''
attCountStream.nodeValue = ''
attEntropyNonStream.nodeValue = ''
attCountNonStream.nodeValue = ''
attCountEOF = xmlDoc.createAttribute('CountEOF')
xmlDoc.documentElement.setAttributeNode(attCountEOF)
attCountCharsAfterLastEOF = xmlDoc.createAttribute('CountCharsAfterLastEOF')
xmlDoc.documentElement.setAttributeNode(attCountCharsAfterLastEOF)
if oPDFEOF != None:
attCountEOF.nodeValue = '%d' % oPDFEOF.cntEOFs
if oPDFEOF.cntEOFs > 0:
attCountCharsAfterLastEOF.nodeValue = '%d' % oPDFEOF.cntCharsAfterLastEOF
else:
attCountCharsAfterLastEOF.nodeValue = ''
else:
attCountEOF.nodeValue = ''
attCountCharsAfterLastEOF.nodeValue = ''
eleKeywords = xmlDoc.createElement('Keywords')
xmlDoc.documentElement.appendChild(eleKeywords)
for keyword in keywords:
eleKeyword = xmlDoc.createElement('Keyword')
eleKeywords.appendChild(eleKeyword)
att = xmlDoc.createAttribute('Name')
att.nodeValue = keyword
eleKeyword.setAttributeNode(att)
att = xmlDoc.createAttribute('Count')
att.nodeValue = str(words[keyword][0])
eleKeyword.setAttributeNode(att)
att = xmlDoc.createAttribute('HexcodeCount')
att.nodeValue = str(words[keyword][1])
eleKeyword.setAttributeNode(att)
eleKeyword = xmlDoc.createElement('Keyword')
eleKeywords.appendChild(eleKeyword)
att = xmlDoc.createAttribute('Name')
att.nodeValue = '/Colors > 2^24'
eleKeyword.setAttributeNode(att)
att = xmlDoc.createAttribute('Count')
att.nodeValue = str(oCVE_2009_3459.count)
eleKeyword.setAttributeNode(att)
att = xmlDoc.createAttribute('HexcodeCount')
att.nodeValue = str(0)
eleKeyword.setAttributeNode(att)
if allNames:
keys = sorted(words.keys())
for word in keys:
if not word in keywords:
eleKeyword = xmlDoc.createElement('Keyword')
eleKeywords.appendChild(eleKeyword)
att = xmlDoc.createAttribute('Name')
att.nodeValue = word
eleKeyword.setAttributeNode(att)
att = xmlDoc.createAttribute('Count')
att.nodeValue = str(words[word][0])
eleKeyword.setAttributeNode(att)
att = xmlDoc.createAttribute('HexcodeCount')
att.nodeValue = str(words[word][1])
eleKeyword.setAttributeNode(att)
eleDates = xmlDoc.createElement('Dates')
xmlDoc.documentElement.appendChild(eleDates)
dates.sort(key=lambda x: x[0])
for date in dates:
eleDate = xmlDoc.createElement('Date')
eleDates.appendChild(eleDate)
att = xmlDoc.createAttribute('Value')
att.nodeValue = date[0]
eleDate.setAttributeNode(att)
att = xmlDoc.createAttribute('Name')
att.nodeValue = date[1]
eleDate.setAttributeNode(att)
return xmlDoc
def PDFiD2String(xmlDoc, nozero, force):
result = 'PDFiD %s %s\n' % (xmlDoc.documentElement.getAttribute('Version'), xmlDoc.documentElement.getAttribute('Filename'))
if xmlDoc.documentElement.getAttribute('ErrorOccured') == 'True':
return result + '***Error occured***\n%s\n' % xmlDoc.documentElement.getAttribute('ErrorMessage')
if not force and xmlDoc.documentElement.getAttribute('IsPDF') == 'False':
return result + ' Not a PDF document\n'
result += ' PDF Header: %s\n' % xmlDoc.documentElement.getAttribute('Header')
for node in xmlDoc.documentElement.getElementsByTagName('Keywords')[0].childNodes:
if not nozero or nozero and int(node.getAttribute('Count')) > 0:
result += ' %-16s %7d' % (node.getAttribute('Name'), int(node.getAttribute('Count')))
if int(node.getAttribute('HexcodeCount')) > 0:
result += '(%d)' % int(node.getAttribute('HexcodeCount'))
result += '\n'
if xmlDoc.documentElement.getAttribute('CountEOF') != '':
result += ' %-16s %7d\n' % ('%%EOF', int(xmlDoc.documentElement.getAttribute('CountEOF')))
if xmlDoc.documentElement.getAttribute('CountCharsAfterLastEOF') != '':
result += ' %-16s %7d\n' % ('After last %%EOF', int(xmlDoc.documentElement.getAttribute('CountCharsAfterLastEOF')))
for node in xmlDoc.documentElement.getElementsByTagName('Dates')[0].childNodes:
result += ' %-23s %s\n' % (node.getAttribute('Value'), node.getAttribute('Name'))
if xmlDoc.documentElement.getAttribute('TotalEntropy') != '':
result += ' Total entropy: %s (%10s bytes)\n' % (xmlDoc.documentElement.getAttribute('TotalEntropy'), xmlDoc.documentElement.getAttribute('TotalCount'))
if xmlDoc.documentElement.getAttribute('StreamEntropy') != '':
result += ' Entropy inside streams: %s (%10s bytes)\n' % (xmlDoc.documentElement.getAttribute('StreamEntropy'), xmlDoc.documentElement.getAttribute('StreamCount'))
if xmlDoc.documentElement.getAttribute('NonStreamEntropy') != '':
result += ' Entropy outside streams: %s (%10s bytes)\n' % (xmlDoc.documentElement.getAttribute('NonStreamEntropy'), xmlDoc.documentElement.getAttribute('NonStreamCount'))
return result
class cCount():
def __init__(self, count, hexcode):
self.count = count
self.hexcode = hexcode
class cPDFiD():
def __init__(self, xmlDoc, force):
self.version = xmlDoc.documentElement.getAttribute('Version')
self.filename = xmlDoc.documentElement.getAttribute('Filename')
self.errorOccured = xmlDoc.documentElement.getAttribute('ErrorOccured') == 'True'
self.errorMessage = xmlDoc.documentElement.getAttribute('ErrorMessage')
self.isPDF = None
if self.errorOccured:
return
self.isPDF = xmlDoc.documentElement.getAttribute('IsPDF') == 'True'
if not force and not self.isPDF:
return
self.header = xmlDoc.documentElement.getAttribute('Header')
self.keywords = {}
for node in xmlDoc.documentElement.getElementsByTagName('Keywords')[0].childNodes:
self.keywords[node.getAttribute('Name')] = cCount(int(node.getAttribute('Count')), int(node.getAttribute('HexcodeCount')))
self.obj = self.keywords['obj']
self.endobj = self.keywords['endobj']
self.stream = self.keywords['stream']
self.endstream = self.keywords['endstream']
self.xref = self.keywords['xref']
self.trailer = self.keywords['trailer']
self.startxref = self.keywords['startxref']
self.page = self.keywords['/Page']
self.encrypt = self.keywords['/Encrypt']
self.objstm = self.keywords['/ObjStm']
self.js = self.keywords['/JS']
self.javascript = self.keywords['/JavaScript']
self.aa = self.keywords['/AA']
self.openaction = self.keywords['/OpenAction']
self.acroform = self.keywords['/AcroForm']
self.jbig2decode = self.keywords['/JBIG2Decode']
self.richmedia = self.keywords['/RichMedia']
self.launch = self.keywords['/Launch']
self.embeddedfile = self.keywords['/EmbeddedFile']
self.xfa = self.keywords['/XFA']
self.colors_gt_2_24 = self.keywords['/Colors > 2^24']
def Print(lines, options):
print(lines)
filename = None
if options.scan:
filename = 'PDFiD.log'
if options.output != '':
filename = options.output
if filename:
logfile = open(filename, 'a')
logfile.write(lines + '\n')
logfile.close()
def Quote(value, separator, quote):
if isinstance(value, str):
if separator in value:
return quote + value + quote
return value
def MakeCSVLine(fields, separator=';', quote='"'):
formatstring = separator.join([field[0] for field in fields])
strings = [Quote(field[1], separator, quote) for field in fields]
return formatstring % tuple(strings)
def ProcessFile(filename, options, plugins):
xmlDoc = PDFiD(filename, options.all, options.extra, options.disarm, options.force)
if plugins == [] and options.select == '':
Print(PDFiD2String(xmlDoc, options.nozero, options.force), options)
return
oPDFiD = cPDFiD(xmlDoc, options.force)
if options.select:
if options.force or not oPDFiD.errorOccured and oPDFiD.isPDF:
pdf = oPDFiD
try:
selected = eval(options.select)
except Exception as e:
Print('Error evaluating select expression: %s' % options.select, options)
if options.verbose:
raise e
return
if selected:
if options.csv:
Print(filename, options)
else:
Print(PDFiD2String(xmlDoc, options.nozero, options.force), options)
else:
for cPlugin in plugins:
if not cPlugin.onlyValidPDF or not oPDFiD.errorOccured and oPDFiD.isPDF:
try:
oPlugin = cPlugin(oPDFiD, options.pluginoptions)
except Exception as e:
Print('Error instantiating plugin: %s' % cPlugin.name, options)
if options.verbose:
raise e
return
try:
score = oPlugin.Score()
except Exception as e:
Print('Error running plugin: %s' % cPlugin.name, options)
if options.verbose:
raise e
return
if options.csv:
if score >= options.minimumscore:
Print(MakeCSVLine((('%s', filename), ('%s', cPlugin.name), ('%.02f', score))), options)
else:
if score >= options.minimumscore:
Print(PDFiD2String(xmlDoc, options.nozero, options.force), options)
Print('%s score: %.02f' % (cPlugin.name, score), options)
try:
Print('%s instructions: %s' % (cPlugin.name, oPlugin.Instructions(score)), options)
except AttributeError:
pass
else:
if options.csv:
if oPDFiD.errorOccured:
Print(MakeCSVLine((('%s', filename), ('%s', cPlugin.name), ('%s', 'Error occured'))), options)
if not oPDFiD.isPDF:
Print(MakeCSVLine((('%s', filename), ('%s', cPlugin.name), ('%s', 'Not a PDF document'))), options)
else:
Print(PDFiD2String(xmlDoc, options.nozero, options.force), options)
def Scan(directory, options, plugins):
try:
if os.path.isdir(directory):
for entry in os.listdir(directory):
Scan(os.path.join(directory, entry), options, plugins)
else:
ProcessFile(directory, options, plugins)
except Exception as e:
# print directory
print(e)
# print(sys.exc_info()[2])
# print traceback.format_exc()
#function derived from: http://blog.9bplus.com/pdfidpy-output-to-json
def PDFiD2JSON(xmlDoc, force):
#Get Top Layer Data
errorOccured = xmlDoc.documentElement.getAttribute('ErrorOccured')
errorMessage = xmlDoc.documentElement.getAttribute('ErrorMessage')
filename = xmlDoc.documentElement.getAttribute('Filename')
header = xmlDoc.documentElement.getAttribute('Header')
isPdf = xmlDoc.documentElement.getAttribute('IsPDF')
version = xmlDoc.documentElement.getAttribute('Version')
entropy = xmlDoc.documentElement.getAttribute('Entropy')
#extra data
countEof = xmlDoc.documentElement.getAttribute('CountEOF')
countChatAfterLastEof = xmlDoc.documentElement.getAttribute('CountCharsAfterLastEOF')
totalEntropy = xmlDoc.documentElement.getAttribute('TotalEntropy')
streamEntropy = xmlDoc.documentElement.getAttribute('StreamEntropy')
nonStreamEntropy = xmlDoc.documentElement.getAttribute('NonStreamEntropy')
keywords = []
dates = []
#grab all keywords
for node in xmlDoc.documentElement.getElementsByTagName('Keywords')[0].childNodes:
name = node.getAttribute('Name')
count = int(node.getAttribute('Count'))
if int(node.getAttribute('HexcodeCount')) > 0:
hexCount = int(node.getAttribute('HexcodeCount'))
else:
hexCount = 0
keyword = { 'count':count, 'hexcodecount':hexCount, 'name':name }
keywords.append(keyword)
#grab all date information
for node in xmlDoc.documentElement.getElementsByTagName('Dates')[0].childNodes:
name = node.getAttribute('Name')
value = node.getAttribute('Value')
date = { 'name':name, 'value':value }
dates.append(date)
data = { 'countEof':countEof, 'countChatAfterLastEof':countChatAfterLastEof, 'totalEntropy':totalEntropy, 'streamEntropy':streamEntropy, 'nonStreamEntropy':nonStreamEntropy, 'errorOccured':errorOccured, 'errorMessage':errorMessage, 'filename':filename, 'header':header, 'isPdf':isPdf, 'version':version, 'entropy':entropy, 'keywords': { 'keyword': keywords }, 'dates': { 'date':dates} }
complete = [ { 'pdfid' : data} ]
result = json.dumps(complete)
return result
def File2Strings(filename):
try:
f = open(filename, 'r')
except:
return None
try:
return list(map(lambda line:line.rstrip('\n'), f.readlines()))
except:
return None
finally:
f.close()
def ProcessAt(argument):
if argument.startswith('@'):
strings = File2Strings(argument[1:])
if strings == None:
raise Exception('Error reading %s' % argument)
else:
return strings
else:
return [argument]
def AddPlugin(cClass):
global plugins
plugins.append(cClass)
class cExpandFilenameArguments():
def __init__(self, filenames, literalfilenames=False, recursedir=False, checkfilenames=False, expressionprefix=None):
self.containsUnixShellStyleWildcards = False
self.warning = False
self.message = ''
self.filenameexpressions = []
self.expressionprefix = expressionprefix
self.literalfilenames = literalfilenames
expression = ''
if len(filenames) == 0:
self.filenameexpressions = [['', '']]
elif literalfilenames:
self.filenameexpressions = [[filename, ''] for filename in filenames]
elif recursedir:
for dirwildcard in filenames:
if expressionprefix != None and dirwildcard.startswith(expressionprefix):
expression = dirwildcard[len(expressionprefix):]
else:
if dirwildcard.startswith('@'):
for filename in ProcessAt(dirwildcard):
self.filenameexpressions.append([filename, expression])
elif os.path.isfile(dirwildcard):
self.filenameexpressions.append([dirwildcard, expression])
else:
if os.path.isdir(dirwildcard):
dirname = dirwildcard
basename = '*'
else:
dirname, basename = os.path.split(dirwildcard)
if dirname == '':
dirname = '.'
for path, dirs, files in os.walk(dirname):
for filename in fnmatch.filter(files, basename):
self.filenameexpressions.append([os.path.join(path, filename), expression])
else:
for filename in list(collections.OrderedDict.fromkeys(sum(map(self.Glob, sum(map(ProcessAt, filenames), [])), []))):
if expressionprefix != None and filename.startswith(expressionprefix):
expression = filename[len(expressionprefix):]
else:
self.filenameexpressions.append([filename, expression])
self.warning = self.containsUnixShellStyleWildcards and len(self.filenameexpressions) == 0
if self.warning:
self.message = "Your filename argument(s) contain Unix shell-style wildcards, but no files were matched.\nCheck your wildcard patterns or use option literalfilenames if you don't want wildcard pattern matching."
return
if self.filenameexpressions == [] and expression != '':
self.filenameexpressions = [['', expression]]
if checkfilenames:
self.CheckIfFilesAreValid()
def Glob(self, filename):
if not ('?' in filename or '*' in filename or ('[' in filename and ']' in filename)):
return [filename]
self.containsUnixShellStyleWildcards = True
return glob.glob(filename)
def CheckIfFilesAreValid(self):
valid = []
doesnotexist = []
isnotafile = []
for filename, expression in self.filenameexpressions:
hashfile = False
try:
hashfile = FilenameCheckHash(filename, self.literalfilenames)[0] == FCH_DATA
except:
pass
if filename == '' or hashfile:
valid.append([filename, expression])
elif not os.path.exists(filename):
doesnotexist.append(filename)
elif not os.path.isfile(filename):
isnotafile.append(filename)
else:
valid.append([filename, expression])
self.filenameexpressions = valid
if len(doesnotexist) > 0:
self.warning = True
self.message += 'The following files do not exist and will be skipped: ' + ' '.join(doesnotexist) + '\n'
if len(isnotafile) > 0:
self.warning = True
self.message += 'The following files are not regular files and will be skipped: ' + ' '.join(isnotafile) + '\n'
def Filenames(self):
if self.expressionprefix == None:
return [filename for filename, expression in self.filenameexpressions]
else:
return self.filenameexpressions
class cPluginParent():
onlyValidPDF = True
def LoadPlugins(plugins, verbose):
if plugins == '':
return
scriptPath = GetScriptPath()
for plugin in sum(map(ProcessAt, plugins.split(',')), []):
try:
if not plugin.lower().endswith('.py'):
plugin += '.py'
if os.path.dirname(plugin) == '':
if not os.path.exists(plugin):
scriptPlugin = os.path.join(scriptPath, plugin)
if os.path.exists(scriptPlugin):
plugin = scriptPlugin
exec(open(plugin, 'r').read())
except Exception as e:
print('Error loading plugin: %s' % plugin)
if verbose:
raise e
def PDFiDMain(filenames, options):
global plugins
plugins = []
LoadPlugins(options.plugins, options.verbose)
if options.csv:
if plugins != []:
Print(MakeCSVLine((('%s', 'Filename'), ('%s', 'Plugin-name'), ('%s', 'Score'))), options)
elif options.select != '':
Print('Filename', options)
for filename in filenames:
if options.scan:
Scan(filename, options, plugins)
else:
ProcessFile(filename, options, plugins)
def Main():
moredesc = '''
Arguments:
pdf-file and zip-file can be a single file, several files, and/or @file
@file: run PDFiD on each file listed in the text file specified
wildcards are supported
Source code put in the public domain by Didier Stevens, no Copyright
Use at your own risk
https://DidierStevens.com'''
oParser = optparse.OptionParser(usage='usage: %prog [options] [pdf-file|zip-file|url|@file] ...\n' + __description__ + moredesc, version='%prog ' + __version__)
oParser.add_option('-s', '--scan', action='store_true', default=False, help='scan the given directory')
oParser.add_option('-a', '--all', action='store_true', default=False, help='display all the names')
oParser.add_option('-e', '--extra', action='store_true', default=False, help='display extra data, like dates')
oParser.add_option('-f', '--force', action='store_true', default=False, help='force the scan of the file, even without proper %PDF header')
oParser.add_option('-d', '--disarm', action='store_true', default=False, help='disable JavaScript and auto launch')
oParser.add_option('-p', '--plugins', type=str, default='', help='plugins to load (separate plugins with a comma , ; @file supported)')
oParser.add_option('-c', '--csv', action='store_true', default=False, help='output csv data when using plugins')
oParser.add_option('-m', '--minimumscore', type=float, default=0.0, help='minimum score for plugin results output')
oParser.add_option('-v', '--verbose', action='store_true', default=False, help='verbose (will also raise catched exceptions)')
oParser.add_option('-S', '--select', type=str, default='', help='selection expression')
oParser.add_option('-n', '--nozero', action='store_true', default=False, help='supress output for counts equal to zero')
oParser.add_option('-o', '--output', type=str, default='', help='output to log file')
oParser.add_option('--pluginoptions', type=str, default='', help='options for the plugin')
oParser.add_option('-l', '--literalfilenames', action='store_true', default=False, help='take filenames literally, no wildcard matching')
oParser.add_option('--recursedir', action='store_true', default=False, help='Recurse directories (wildcards and here files (@...) allowed)')
(options, args) = oParser.parse_args()
if len(args) == 0:
if options.disarm:
print('Option disarm not supported with stdin')
options.disarm = False
if options.scan:
print('Option scan not supported with stdin')
options.scan = False
filenames = ['']
else:
try:
oExpandFilenameArguments = cExpandFilenameArguments(args, options.literalfilenames, options.recursedir, False)
filenames = oExpandFilenameArguments.Filenames()
if oExpandFilenameArguments.warning:
print(oExpandFilenameArguments.message)
except Exception as e:
print(e)
return
PDFiDMain(filenames, options)
if __name__ == '__main__':
Main()