Update pyinstxtractor.py

This commit is contained in:
doomedraven 2025-02-07 11:04:14 +01:00
commit fa9c1bcca3

View file

@ -90,6 +90,10 @@ import marshal
import zlib
import sys
from uuid import uuid4 as uniquename
from contextlib import suppress
import logging
log = logging.getLogger()
class CTOCEntry:
@ -103,106 +107,93 @@ class CTOCEntry:
class PyInstArchive:
    """Reader/extractor for an executable produced by PyInstaller."""

    PYINST20_COOKIE_SIZE = 24  # For pyinstaller 2.0
    PYINST21_COOKIE_SIZE = 24 + 64  # For pyinstaller 2.1+
    MAGIC = b"MEI\014\013\012\013\016"  # Magic number which identifies pyinstaller

    def __init__(self, kwargs):
        """Store the extraction settings.

        Args:
            kwargs: dict of options.
                "file" (required): path of the executable to process.
                "destination_folder" (optional): directory receiving the
                    extracted files; when missing or empty it defaults to
                    ``<cwd>/<basename of file>_extracted``.
                "entry_points" (optional, default False): when true only the
                    possible entry-point scripts are written.

        Raises:
            KeyError: if "file" is missing from ``kwargs``.
        """
        self.filePath = kwargs["file"]
        # A missing/None destination would otherwise surface later as a
        # TypeError inside os.path.exists()/os.path.join().
        self.destination_folder = kwargs.get("destination_folder") or os.path.join(
            os.getcwd(), os.path.basename(self.filePath) + "_extracted"
        )
        self.only_entrypoints = kwargs.get("entry_points", False)
        self.pycMagic = b"\0" * 4  # Placeholder until a real pyc magic is discovered
        self.barePycList = []  # List of pyc's whose headers have to be fixed
def open(self):
    """Open the target file for binary reading and record its size.

    Sets ``self.fPtr`` (file object) and ``self.fileSize`` (bytes).

    Returns:
        bool: True on success, False if the file could not be opened or
        stat'ed (the error is logged, not raised).
    """
    try:
        # Open exactly once; a second open() would leak the first handle.
        self.fPtr = open(self.filePath, "rb")
        self.fileSize = os.stat(self.filePath).st_size
    except Exception as e:
        log.error("[!] Could not open: %s. Error: %s", self.filePath, str(e))
        return False
    return True
def close(self):
    """Close the underlying file handle, ignoring any error.

    Best-effort: a missing ``fPtr`` attribute or a failing ``close()`` is
    silently suppressed so this can always be called during cleanup.
    """
    with suppress(Exception):
        self.fPtr.close()
def checkFile(self):
    """Locate the PyInstaller cookie and detect the archive format version.

    The file is scanned backwards in 8 KiB chunks for ``self.MAGIC``; the
    offset of the last occurrence is stored in ``self.cookiePos``. The 64
    bytes following the 2.0-sized cookie are then inspected: if they contain
    ``b"python"`` (case-insensitive) the archive is treated as 2.1+
    (``self.pyinstVer = 21``), otherwise as 2.0 (``self.pyinstVer = 20``).

    Requires ``self.fPtr`` and ``self.fileSize`` to be set beforehand.

    Returns:
        bool: True when a cookie was found, False otherwise (error logged).
    """
    log.debug("[+] Processing %s", self.filePath)

    searchChunkSize = 8192
    magicLen = len(self.MAGIC)
    endPos = self.fileSize
    self.cookiePos = -1

    if endPos < magicLen:
        log.error("[!] File is too short or truncated")
        return False

    while True:
        startPos = max(endPos - searchChunkSize, 0)
        chunkSize = endPos - startPos
        if chunkSize < magicLen:
            break

        self.fPtr.seek(startPos, os.SEEK_SET)
        data = self.fPtr.read(chunkSize)

        offs = data.rfind(self.MAGIC)
        if offs != -1:
            self.cookiePos = startPos + offs
            break

        # Overlap the next chunk by magicLen - 1 bytes so a magic value that
        # straddles a chunk boundary is still found.
        endPos = startPos + magicLen - 1
        if startPos == 0:
            break

    if self.cookiePos == -1:
        log.error("[!] Missing cookie, unsupported pyinstaller version or not a pyinstaller archive")
        return False

    self.fPtr.seek(self.cookiePos + self.PYINST20_COOKIE_SIZE, os.SEEK_SET)

    if b"python" in self.fPtr.read(64).lower():
        log.debug("[+] Pyinstaller version: 2.1+")
        self.pyinstVer = 21  # pyinstaller 2.1+
    else:
        self.pyinstVer = 20  # pyinstaller 2.0
        log.debug("[+] Pyinstaller version: 2.0")

    return True
def getCArchiveInfo(self):
# Read the CArchive cookie found at self.cookiePos, derive the Python version
# (self.pymaj / self.pymin) and compute the overlay size plus the position and
# size of the table of contents. Returns False when the cookie cannot be read
# or unpacked, True otherwise.
# NOTE(review): this text is a rendered diff; changed statements appear twice
# (apparently the pre-change line first, then its replacement).
try:
if self.pyinstVer == 20:
self.fPtr.seek(self.cookiePos, os.SEEK_SET)
# Read CArchive cookie
(magic, lengthofPackage, toc, tocLen, pyver) = \
struct.unpack('!8siiii', self.fPtr.read(self.PYINST20_COOKIE_SIZE))
(magic, lengthofPackage, toc, tocLen, pyver) = struct.unpack("!8siiii", self.fPtr.read(self.PYINST20_COOKIE_SIZE))
elif self.pyinstVer == 21:
self.fPtr.seek(self.cookiePos, os.SEEK_SET)
# Read CArchive cookie
# The 2.1+ cookie carries an extra 64-byte field (pylibname) after pyver.
(magic, lengthofPackage, toc, tocLen, pyver, pylibname) = \
struct.unpack('!8sIIii64s', self.fPtr.read(self.PYINST21_COOKIE_SIZE))
except:
print('[!] Error : The file is not a pyinstaller archive')
(magic, lengthofPackage, toc, tocLen, pyver, pylibname) = struct.unpack(
"!8sIIii64s", self.fPtr.read(self.PYINST21_COOKIE_SIZE)
)
# Any failure while seeking/reading/unpacking is reported as "not a
# pyinstaller archive" and turned into a False return value.
except Exception as e:
log.error("[!] The file is not a pyinstaller archive: %s", str(e))
return False
# pyver encodes major/minor as digits: values >= 100 are split with 100
# (e.g. 310 -> 3.10), smaller ones with 10 (e.g. 27 -> 2.7).
self.pymaj, self.pymin = (pyver//100, pyver%100) if pyver >= 100 else (pyver//10, pyver%10)
print('[+] Python version: {0}.{1}'.format(self.pymaj, self.pymin))
self.pymaj, self.pymin = (pyver // 100, pyver % 100) if pyver >= 100 else (pyver // 10, pyver % 10)
# NOTE(review): this call formats eagerly with str.format while the other
# log calls in this method pass lazy %-style arguments.
log.debug("[+] Python version: {0}.{1}".format(self.pymaj, self.pymin))
# Additional data after the cookie
tailBytes = self.fileSize - self.cookiePos - (self.PYINST20_COOKIE_SIZE if self.pyinstVer == 20 else self.PYINST21_COOKIE_SIZE)
tailBytes = (
self.fileSize - self.cookiePos - (self.PYINST20_COOKIE_SIZE if self.pyinstVer == 20 else self.PYINST21_COOKIE_SIZE)
)
# Overlay is the data appended at the end of the PE
self.overlaySize = lengthofPackage + tailBytes
@ -210,10 +201,9 @@ class PyInstArchive:
# NOTE(review): hunk boundary - the unchanged lines between the two hunks are
# not shown; presumably self.overlayPos (used just below) is assigned there.
self.tableOfContentsPos = self.overlayPos + toc
self.tableOfContentsSize = tocLen
print('[+] Length of package: {0} bytes'.format(lengthofPackage))
log.debug("[+] Length of package: %d bytes", lengthofPackage)
return True
def parseTOC(self):
# Walk the CArchive table of contents starting at self.tableOfContentsPos and
# append one CTOCEntry per record to self.tocList until
# self.tableOfContentsSize bytes have been consumed.
# Go to the table of contents
self.fPtr.seek(self.tableOfContentsPos, os.SEEK_SET)
@ -223,63 +213,55 @@ class PyInstArchive:
# NOTE(review): hunk boundary - the lines initialising self.tocList and
# parsedLen are not visible in this view.
# Parse table of contents
while parsedLen < self.tableOfContentsSize:
(entrySize, ) = struct.unpack('!i', self.fPtr.read(4))
nameLen = struct.calcsize('!iIIIBc')
# Each record starts with its own total size (4 bytes, big-endian).
# NOTE(review): a truncated read or an entrySize smaller than the fixed
# header would make struct.unpack raise; nothing here guards against that.
(entrySize,) = struct.unpack("!i", self.fPtr.read(4))
# Despite its name, nameLen is the size of the fixed part of a record
# (including the leading size field); entrySize - nameLen is the name length.
nameLen = struct.calcsize("!iIIIBc")
(entryPos, cmprsdDataSize, uncmprsdDataSize, cmprsFlag, typeCmprsData, name) = \
struct.unpack( \
'!IIIBc{0}s'.format(entrySize - nameLen), \
self.fPtr.read(entrySize - 4))
(entryPos, cmprsdDataSize, uncmprsdDataSize, cmprsFlag, typeCmprsData, name) = struct.unpack(
"!IIIBc{0}s".format(entrySize - nameLen), self.fPtr.read(entrySize - 4)
)
# Names are NUL-padded UTF-8; undecodable names are replaced by a random UUID.
try:
name = name.decode("utf-8").rstrip("\0")
except UnicodeDecodeError:
newName = str(uniquename())
print('[!] Warning: File name {0} contains invalid bytes. Using random name {1}'.format(name, newName))
log.warning("[!] File name %s contains invalid bytes. Using random name %s", name, newName)
name = newName
# Prevent writing outside the extraction directory
# NOTE(review): only leading forward slashes are stripped here; backslash or
# drive-letter prefixes are not handled at this point.
if name.startswith("/"):
name = name.lstrip("/")
if len(name) == 0:
name = str(uniquename())
print('[!] Warning: Found an unamed file in CArchive. Using random name {0}'.format(name))
log.warning("[!] Found an unamed file in CArchive. Using random name %s", name)
self.tocList.append( \
CTOCEntry( \
self.overlayPos + entryPos, \
cmprsdDataSize, \
uncmprsdDataSize, \
cmprsFlag, \
typeCmprsData, \
name \
))
# entryPos is relative to the overlay, hence the self.overlayPos offset.
self.tocList.append(
CTOCEntry(self.overlayPos + entryPos, cmprsdDataSize, uncmprsdDataSize, cmprsFlag, typeCmprsData, name)
)
parsedLen += entrySize
print('[+] Found {0} files in CArchive'.format(len(self.tocList)))
log.info("[+] Found %d files in CArchive", len(self.tocList))
def _writeRawData(self, filepath, data):
    """Write ``data`` (bytes) to ``filepath``, creating parent directories.

    Before writing, both ``\\`` and ``/`` are normalised to the platform
    separator and every ``..`` sequence is replaced by ``__`` so that a
    crafted name cannot climb out of the target directory.

    NOTE(review): the ``..`` replacement is applied to the whole path, so a
    caller passing a path whose directory part legitimately contains ``..``
    will have that part rewritten as well.
    """
    nm = filepath.replace("\\", os.path.sep).replace("/", os.path.sep).replace("..", "__")
    nmDir = os.path.dirname(nm)
    if nmDir:
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(nmDir, exist_ok=True)

    with open(nm, "wb") as f:
        f.write(data)
def extractFiles(self):
# Extract every entry of self.tocList into self.destination_folder: read the
# entry's bytes from the archive, handle them according to the entry type
# code, and finally patch pyc files that were written before the pyc magic
# was known.
print('[+] Beginning extraction...please standby')
extractionDir = os.path.join(os.getcwd(), os.path.basename(self.filePath) + '_extracted')
log.debug("[+] Beginning extraction...please standby")
# extractionDir = os.path.join(os.getcwd(), os.path.basename(self.filePath) + "_extracted")
extractionDir = self.destination_folder
# NOTE(review): os.mkdir creates a single level only, so a destination whose
# parent does not exist fails here; a None destination fails in os.path.exists.
if not os.path.exists(extractionDir):
os.mkdir(extractionDir)
os.chdir(extractionDir)
# The chdir is disabled in the replacement line below, so relative paths
# further down resolve against the caller's working directory.
# os.chdir(extractionDir)
for entry in self.tocList:
destination_entry = os.path.join(self.destination_folder, entry.name)
self.fPtr.seek(entry.position, os.SEEK_SET)
data = self.fPtr.read(entry.cmprsdDataSize)
@ -287,127 +269,125 @@ class PyInstArchive:
# NOTE(review): hunk boundary - the condition guarding the decompression
# below is not visible in this view.
try:
data = zlib.decompress(data)
except zlib.error:
print('[!] Error : Failed to decompress {0}'.format(entry.name))
log.error("[!] Failed to decompress %s", entry.name)
continue
# Malware may tamper with the uncompressed size
# Comment out the assertion in such a case
# NOTE(review): assert is stripped under `python -O` and otherwise aborts the
# whole extraction with AssertionError on a size mismatch.
assert len(data) == entry.uncmprsdDataSize # Sanity Check
assert len(data) == entry.uncmprsdDataSize  # Sanity Check
if entry.typeCmprsData == b'd' or entry.typeCmprsData == b'o':
if entry.typeCmprsData in (b"d", b"o"):
# d -> ARCHIVE_ITEM_DEPENDENCY
# o -> ARCHIVE_ITEM_RUNTIME_OPTION
# These are runtime options, not files
continue
# NOTE(review): basePath is still derived from entry.name, not from
# destination_entry; with the chdir disabled the directories are created
# relative to the current working directory rather than inside
# self.destination_folder - looks unintended, confirm.
basePath = os.path.dirname(entry.name)
if basePath != '':
if basePath != "":
# Check if path exists, create if not
if not os.path.exists(basePath):
os.makedirs(basePath)
if entry.typeCmprsData == b's':
if entry.typeCmprsData == b"s":
# s -> ARCHIVE_ITEM_PYSOURCE
# Entry point are expected to be python scripts
print('[+] Possible entry point: {0}.pyc'.format(entry.name))
log.info("[+] Possible entry point: %s.pyc", entry.name)
if self.pycMagic == b'\0' * 4:
if self.pycMagic == b"\0" * 4:
# if we don't have the pyc header yet, fix them in a later pass
self.barePycList.append(entry.name + '.pyc')
self._writePyc(entry.name + '.pyc', data)
self.barePycList.append(destination_entry + ".pyc")
self._writePyc(destination_entry + ".pyc", data)
elif entry.typeCmprsData == b'M' or entry.typeCmprsData == b'm':
# NOTE(review): the replacement condition compares a bytes value to a tuple
# with `==`, which is never true, so this branch is unreachable and M/m
# entries fall through to the final else; `in (b"M", b"m")` was presumably
# intended.
elif entry.typeCmprsData == (b"M", b"m") and not self.only_entrypoints:
# M -> ARCHIVE_ITEM_PYPACKAGE
# m -> ARCHIVE_ITEM_PYMODULE
# packages and modules are pyc files with their header intact
# From PyInstaller 5.3 and above pyc headers are no longer stored
# https://github.com/pyinstaller/pyinstaller/commit/a97fdf
# Bytes 2-3 of a pyc header are "\r\n"; their presence means the header is intact.
if data[2:4] == b'\r\n':
if data[2:4] == b"\r\n":
# < pyinstaller 5.3
if self.pycMagic == b'\0' * 4:
if self.pycMagic == b"\0" * 4:
self.pycMagic = data[0:4]
self._writeRawData(entry.name + '.pyc', data)
self._writeRawData(destination_entry + ".pyc", data)
else:
# >= pyinstaller 5.3
if self.pycMagic == b'\0' * 4:
if self.pycMagic == b"\0" * 4:
# if we don't have the pyc header yet, fix them in a later pass
self.barePycList.append(entry.name + '.pyc')
self._writePyc(entry.name + '.pyc', data)
self.barePycList.append(destination_entry + ".pyc")
self._writePyc(destination_entry + ".pyc", data)
else:
self._writeRawData(entry.name, data)
if not self.only_entrypoints:
self._writeRawData(destination_entry, data)
if entry.typeCmprsData == b'z' or entry.typeCmprsData == b'Z':
self._extractPyz(entry.name)
# NOTE(review): indentation is lost in this view; if the check below is not
# nested under `if not self.only_entrypoints`, _extractPyz would be asked to
# open a file that was never written when only entry points are extracted.
if entry.typeCmprsData in (b"z", b"Z"):
self._extractPyz(destination_entry)
# Fix bare pyc's if any
self._fixBarePycs()
def _fixBarePycs(self):
    """Patch the pyc magic into files that were written before it was known.

    Every path in ``self.barePycList`` is opened in place and its first four
    bytes are overwritten with ``self.pycMagic``.
    """
    for pycPath in self.barePycList:
        # Use a distinct name for the handle instead of rebinding the loop
        # variable, and open each file exactly once.
        with open(pycPath, "r+b") as pycFile:
            # Overwrite the first four bytes
            pycFile.write(self.pycMagic)
def _writePyc(self, filename, data):
    """Write ``data`` to ``filename`` preceded by a synthetic pyc header.

    Header layout, chosen from the archive's Python version
    (``self.pymaj``, ``self.pymin``):
      * >= 3.7 (PEP 552): magic, 4-byte bitfield, 8 bytes timestamp+size/hash
      * >= 3.3: magic, 4-byte timestamp, 4-byte size
      * older:  magic, 4-byte timestamp
    All header fields other than the magic are zero-filled.

    Args:
        filename: path of the pyc file to create (parent directory must exist).
        data: marshalled code object bytes.
    """
    # Compare (major, minor) as a tuple: testing the two numbers separately
    # misclassifies any X.Y with X > 3 and a small Y.
    pyVersion = (self.pymaj, self.pymin)

    with open(filename, "wb") as pycFile:
        pycFile.write(self.pycMagic)  # pyc magic

        if pyVersion >= (3, 7):  # PEP 552 -- Deterministic pycs
            pycFile.write(b"\0" * 4)  # Bitfield
            pycFile.write(b"\0" * 8)  # (Timestamp + size) || hash
        else:
            pycFile.write(b"\0" * 4)  # Timestamp
            if pyVersion >= (3, 3):
                pycFile.write(b"\0" * 4)  # Size parameter added in Python 3.3

        pycFile.write(data)
def _extractPyz(self, name):
# Unpack the PYZ archive stored at path `name` into the sibling directory
# `name + "_extracted"`, writing each contained module as a .pyc file (or as
# `<path>.encrypted` when it cannot be decompressed).
dirName = name + '_extracted'
dirName = name + "_extracted"
# Create a directory for the contents of the pyz
if not os.path.exists(dirName):
os.mkdir(dirName)
with open(name, 'rb') as f:
with open(name, "rb") as f:
pyzMagic = f.read(4)
# NOTE(review): assert is stripped under `python -O`; a non-PYZ file would
# then be parsed anyway.
assert pyzMagic == b'PYZ\0' # Sanity Check
assert pyzMagic == b"PYZ\0"  # Sanity Check
pyzPycMagic = f.read(4) # Python magic value
if self.pycMagic == b'\0' * 4:
pyzPycMagic = f.read(4)  # Python magic value
# The magic stored in the PYZ header always ends up in self.pycMagic; a
# mismatch with a previously known value only triggers a warning.
if self.pycMagic == b"\0" * 4:
self.pycMagic = pyzPycMagic
elif self.pycMagic != pyzPycMagic:
self.pycMagic = pyzPycMagic
print('[!] Warning: pyc magic of files inside PYZ archive are different from those in CArchive')
log.warning("[!] pyc magic of files inside PYZ archive are different from those in CArchive")
# Skip PYZ extraction if not running under the same python version
if self.pymaj != sys.version_info.major or self.pymin != sys.version_info.minor:
print('[!] Warning: This script is running in a different Python version than the one used to build the executable.')
print('[!] Please run this script in Python {0}.{1} to prevent extraction errors during unmarshalling'.format(self.pymaj, self.pymin))
print('[!] Skipping pyz extraction')
log.warning(
"[!] Warning: This script is running in a different Python version than the one used to build the executable."
)
log.info(
"[!] Please run this script in Python %d.%d to prevent extraction errors during unmarshalling.\nSkipping pyz extraction",
self.pymaj,
self.pymin,
)
return
(tocPosition, ) = struct.unpack('!i', f.read(4))
(tocPosition,) = struct.unpack("!i", f.read(4))
f.seek(tocPosition, os.SEEK_SET)
# NOTE(review): marshal.load runs on data taken from the analysed executable;
# the marshal module is documented as unsafe for untrusted input.
try:
toc = marshal.load(f)
except:
print('[!] Unmarshalling FAILED. Cannot extract {0}. Extracting remaining files.'.format(name))
except Exception as e:
log.error("[!] Unmarshalling FAILED. Cannot extract %s. Extracting remaining files. Error: %s", name, str(e))
return
print('[+] Found {0} files in PYZ archive'.format(len(toc)))
log.debug("[+] Found %d files in PYZ archive", len(toc))
# From pyinstaller 3.1+ toc is a list of tuples
if type(toc) == list:
if isinstance(toc, list):
toc = dict(toc)
for key in toc.keys():
@ -415,20 +395,16 @@ class PyInstArchive:
# NOTE(review): hunk boundary - the start of the loop body (where ispkg, pos
# and length are presumably unpacked from toc[key]) is not visible here.
f.seek(pos, os.SEEK_SET)
fileName = key
try:
with suppress(Exception):
# for Python > 3.3 some keys are bytes object some are str object
fileName = fileName.decode('utf-8')
except:
pass
fileName = fileName.decode("utf-8")
# Prevent writing outside dirName
# ".." is neutralised first, then each remaining "." of the dotted module
# name becomes a path separator.
fileName = fileName.replace('..', '__').replace('.', os.path.sep)
fileName = fileName.replace("..", "__").replace(".", os.path.sep)
if ispkg == 1:
filePath = os.path.join(dirName, fileName, '__init__.pyc')
filePath = os.path.join(dirName, fileName, "__init__.pyc")
else:
filePath = os.path.join(dirName, fileName + '.pyc')
filePath = os.path.join(dirName, fileName + ".pyc")
fileDir = os.path.dirname(filePath)
if not os.path.exists(fileDir):
@ -437,32 +413,46 @@ class PyInstArchive:
# NOTE(review): hunk boundary - the statement creating fileDir is not visible here.
try:
data = f.read(length)
data = zlib.decompress(data)
except:
print('[!] Error: Failed to decompress {0}, probably encrypted. Extracting as is.'.format(filePath))
open(filePath + '.encrypted', 'wb').write(data)
except Exception as e:
# NOTE(review): the replacement line still calls print() but passes %s-style
# arguments, so the placeholders are printed literally followed by the values;
# log.error was presumably intended.
print("[!] Error: Failed to decompress %s, probably encrypted. Extracting as is. Error: %s", filePath, str(e))
# NOTE(review): the file object below is never closed explicitly.
open(filePath + ".encrypted", "wb").write(data)
else:
self._writePyc(filePath, data)
def main(kwargs):
    """Extract the PyInstaller archive described by ``kwargs``.

    Args:
        kwargs: dict of the form
            {"file": "path_to_file",
             "destination_folder": "path_where_to_extract",
             "entry_points": False/True}

    The archive is opened, validated, parsed and extracted in that order;
    if any of the validation steps reports failure nothing is extracted.
    """
    arch = PyInstArchive(kwargs)
    try:
        if arch.open() and arch.checkFile() and arch.getCArchiveInfo():
            arch.parseTOC()
            arch.extractFiles()
            log.debug(
                "[+] Successfully extracted pyinstaller archive: %s\nYou can now use a python decompiler on the pyc files within the extracted directory",
                kwargs["file"],
            )
    finally:
        # Release the file handle even when parsing or extraction raises.
        arch.close()
if __name__ == "__main__":
    # Command-line entry point: parse the options, validate the input file and
    # hand the options to main() as a plain dict.
    import argparse

    parser = argparse.ArgumentParser(
        prog="PyInstaller Extractor",
        description="PyInstaller Extractor is a Python script to extract the contents of a PyInstaller generated executable file.",
    )
    parser.add_argument("-f", "--file", action="store")
    parser.add_argument("-d", "--destination-folder", action="store", help="Folder to store extracted files")
    parser.add_argument("-e", "--entry-points", action="store_true", help="Extract only possibble entry points")
    options = parser.parse_args()

    if not options.file or not os.path.exists(options.file):
        parser.print_help()
        # Missing/invalid input is a usage error: exit with a non-zero status.
        sys.exit(1)

    if not options.destination_folder:
        # -d is optional; fall back to "<basename>_extracted" in the current
        # directory so the extractor never receives None as destination.
        options.destination_folder = os.path.join(os.getcwd(), os.path.basename(options.file) + "_extracted")

    logging.basicConfig()
    log.setLevel(logging.DEBUG)
    # Convert the argparse namespace to the dict main() expects.
    main(vars(options))