Disséquer l'Obscurité : Techniques Avancées de Déobfuscation Statique pour Malwares Polymorphes
Table des matières
1. Introduction : Le défi de l'obfuscation dans les malwares modernes
L'analyse de malwares constitue l'un des piliers fondamentaux de la réponse aux incidents et de la Threat Intelligence. Pourtant, les auteurs de codes malveillants déploient des arsenaux toujours plus sophistiqués pour entraver le travail des analystes. Parmi les techniques les plus redoutables figure le polymorphisme, capacité d'un programme malveillant à modifier sa propre représentation binaire à chaque réplication tout en préservant sa sémantique opérationnelle.
En 2025-2026, le paysage des menaces a atteint un niveau de maturité technique sans précédent. Les familles de malwares comme RedLine Stealer, Emotet, QakBot et BlackCat (ALPHV) intègrent nativement des mécanismes d'obfuscation multicouches combinant polymorphisme, métamorphisme, packers commerciaux et virtualisation de code. Le taux de détection par signatures statiques traditionnelles chute à moins de 15% face à ces menaces lors des premières 48 heures post-déploiement, selon les rapports de VirusTotal et AV-TEST.
La déobfuscation statique se distingue de l'analyse dynamique par sa capacité à opérer sans exécuter le binaire suspect. Cette approche est indispensable dans plusieurs scénarios critiques :
- Environnements air-gapped où aucune sandbox n'est disponible
- Malwares à déclenchement conditionnel qui détectent les environnements virtualisés (anti-VM) et modifient leur comportement
- Échantillons partiels récupérés lors de forensics mémoire où seuls des fragments de code sont disponibles
- Analyse à grande échelle de milliers de variants nécessitant une automatisation par pipeline
- Conformité légale dans certaines juridictions interdisant l'exécution de code malveillant même en sandbox
Cet article détaille les techniques avancées permettant de déconstruire méthodiquement les couches d'obfuscation sans exécution, en s'appuyant sur des frameworks d'exécution symbolique (angr, Triton), de désassemblage programmatique (Capstone), et d'analyse automatisée (YARA, Ghidra, IDA Pro). Chaque section inclut du code fonctionnel directement applicable en contexte opérationnel.
Avertissement juridique et éthique
Les techniques présentées dans cet article sont destinées exclusivement à la défense et à la recherche en sécurité. L'analyse de malwares doit être conduite dans un cadre légal approprié, idéalement sur des machines isolées dédiées. Toute utilisation offensive de ces connaissances est illégale au regard des articles 323-1 à 323-7 du Code pénal français et de la Convention de Budapest sur la cybercriminalité.
2. Fondamentaux du polymorphisme malveillant
2.1 Moteurs de mutation : architecture interne
Un moteur de mutation polymorphe est un composant logiciel intégré au malware qui génère à chaque réplication une nouvelle variante du décrypteur (stub). Le payload chiffré reste fonctionnellement identique, mais le stub de déchiffrement est réécrit à chaque itération via des transformations syntaxiques qui préservent la sémantique.
Les transformations fondamentales implémentées par ces moteurs incluent :
- Substitution d'instructions : remplacement d'une instruction par une séquence équivalente (
xor eax, eaxremplacé parsub eax, eaxouand eax, 0) - Réordonnancement de registres : permutation des registres utilisés tout en maintenant les dépendances de données
- Insertion de code mort (dead code) : ajout d'instructions sans effet sur le résultat (NOP sleds, opérations neutralisées)
- Transposition de blocs : réarrangement de blocs de base indépendants avec ajout de sauts inconditionnels
- Substitution de constantes : remplacement de valeurs immédiates par des expressions équivalentes (
mov eax, 5devientmov eax, 8suivi desub eax, 3)
Voici un exemple concret de stub de déchiffrement XOR polymorphe en assembleur x86. La première variante utilise une boucle classique, la seconde est une mutation sémantiquement équivalente :
; === Variante A : stub XOR classique ===
decrypt_stub_a:
mov esi, payload_addr ; adresse du payload chiffré
mov ecx, payload_len ; taille en octets
mov bl, 0x4F ; clé XOR
.loop_a:
xor byte [esi], bl ; déchiffrement octet par octet
inc esi ; pointeur suivant
dec ecx ; décrémenter compteur
jnz .loop_a ; boucler si ecx != 0
jmp payload_addr ; sauter au payload déchiffré
; === Variante B : mutation polymorphe équivalente ===
decrypt_stub_b:
lea edi, [payload_addr] ; substitution mov -> lea
push payload_len
pop ecx ; substitution mov -> push/pop
xor edx, edx ; dead code : registre inutilisé
mov al, 0x50
sub al, 0x01 ; 0x50 - 0x01 = 0x4F (clé recalculée)
nop ; insertion NOP
add edx, 0xDEAD ; dead code
.loop_b:
mov ah, byte [edi] ; charger via registre intermédiaire
xor ah, al ; déchiffrement dans ah
mov byte [edi], ah ; réécrire
lea edi, [edi + 1] ; inc via lea
sub edx, 1 ; dead code
loop .loop_b ; dec ecx + jnz combinés
jmp payload_addr
Bien que les deux stubs produisent exactement le même résultat (déchiffrement XOR du payload avec la clé 0x4F), leurs signatures binaires sont radicalement différentes. Le hash SHA-256 de chaque variante sera distinct, rendant inefficace toute détection par empreinte statique.
2.2 Métamorphisme vs. Polymorphisme : distinctions critiques
La confusion entre polymorphisme et métamorphisme est fréquente mais les deux concepts diffèrent fondamentalement :
| Caractéristique | Polymorphisme | Métamorphisme |
|---|---|---|
| Portée de la mutation | Stub de déchiffrement uniquement | Totalité du corps du malware |
| Payload | Chiffré, invariant | Réécrit à chaque génération |
| Complexité d'implémentation | Modérée | Très élevée |
| Exemples historiques | Storm Worm, Virut | Zmist (Z0mbie), MetaPHOR, Simile |
| Résistance à l'émulation | Faible (payload exposé après déchiffrement) | Élevée (pas de phase de déchiffrement) |
| Déobfuscation statique | Pattern matching sur opérations crypto | Normalisation sémantique requise |
Un malware métamorphe comme Zmist (créé par le chercheur Z0mbie) implémentait un désassembleur/réassembleur complet capable de réécrire l'intégralité de son code en permutant les registres, substituant les instructions, et réorganisant les blocs de base. Contrairement au polymorphisme, aucune routine de déchiffrement n'est présente car le code n'est jamais chiffré : il est simplement réécrit sous une forme syntaxiquement différente mais sémantiquement identique.
2.3 Transformations d'opcodes x86/x64 : catalogue des équivalences
La compréhension des équivalences au niveau ISA (Instruction Set Architecture) x86/x64 est fondamentale pour reconnaître les mutations. Voici les substitutions les plus couramment exploitées par les moteurs polymorphes :
; --- Mise à zéro d'un registre ---
xor eax, eax ; Forme canonique (2 octets : 31 C0)
sub eax, eax ; Équivalent (2 octets : 29 C0)
and eax, 0 ; Équivalent (5 octets : 83 E0 00)
mov eax, 0 ; Équivalent (5 octets : B8 00 00 00 00)
imul eax, eax, 0 ; Moins courant (3 octets : 6B C0 00)
push 0 / pop eax ; Via la pile (3 octets : 6A 00 / 58)
lea eax, [0] ; Rarement vu mais valide
; --- Incrémentation ---
inc eax ; Forme canonique (1 octet en x86 : 40)
add eax, 1 ; Équivalent (3 octets : 83 C0 01)
sub eax, -1 ; Équivalent via complément (3 octets : 83 E8 FF)
lea eax, [eax + 1] ; Via LEA (3 octets)
; --- Copie de registre ---
mov eax, ebx ; Forme canonique
push ebx / pop eax ; Via la pile
lea eax, [ebx] ; Via LEA
xor eax, eax / or eax, ebx ; Double instruction
; --- Saut inconditionnel ---
jmp target ; Forme canonique
push target / ret ; Via la pile (anti-désassemblage)
call target / add esp, 4 ; Via call + nettoyage pile
Architecture d'un moteur polymorphe
3. Packers commerciaux et custom : identification et unpacking
3.1 Panorama des packers dans l'écosystème malveillant
Les packers (ou protecteurs de binaires) constituent la première couche d'obfuscation rencontrée lors de l'analyse statique. Initialement conçus pour la protection de logiciels légitimes contre le piratage, ils sont massivement détournés par les auteurs de malwares. On distingue trois catégories principales :
Packers de compression : réduisent la taille du binaire et ajoutent un stub de décompression. Le code original est restauré en mémoire à l'exécution. Les plus courants sont UPX (Ultimate Packer for eXecutables) et MPRESS.
Protecteurs commerciaux : offrent des mécanismes avancés incluant virtualisation de code, anti-debug, anti-dump, mutations polymorphes et obfuscation du flux de contrôle. Themida/WinLicense (Oreans Technologies) et VMProtect sont les plus redoutés.
Packers custom : développés spécifiquement par les groupes APT pour leurs opérations. Ils combinent souvent plusieurs techniques et sont les plus difficiles à traiter car non documentés. Les groupes comme Lazarus, APT29 et FIN7 maintiennent leurs propres packers.
3.2 Identification avec Detect It Easy et PEiD
L'identification du packer est la première étape critique. Detect It Easy (DIE) a largement supplanté PEiD grâce à son système de signatures extensible basé sur des scripts JavaScript et sa maintenance active.
#!/bin/bash
# Script d'identification de packer en batch
# Nécessite : Detect It Easy (diec) en ligne de commande
SAMPLE_DIR="/opt/malware_samples/incoming"
REPORT_DIR="/opt/analysis/packer_reports"
mkdir -p "$REPORT_DIR"
echo "[*] Analyse de packer en batch - $(date '+%Y-%m-%d %H:%M:%S')"
echo "============================================="
for sample in "$SAMPLE_DIR"/*; do
[ -f "$sample" ] || continue
filename=$(basename "$sample")
sha256=$(sha256sum "$sample" | cut -d' ' -f1)
echo "[+] Analyse: $filename ($sha256)"
# Detect It Easy - analyse complète
die_result=$(diec --json "$sample" 2>/dev/null)
# Extraction du packer détecté
packer=$(echo "$die_result" | python3 -c "
import sys, json
data = json.load(sys.stdin)
for det in data.get('detects', []):
for val in det.get('values', []):
if val.get('type') in ['packer', 'protector', 'compiler']:
print(f\"{val['type']}: {val['name']} {val.get('version', '')}\")
" 2>/dev/null)
if [ -n "$packer" ]; then
echo " [!] Détecté: $packer"
else
echo " [-] Aucun packer connu détecté"
fi
# Entropie par section (indicateur de packing)
python3 -c "
import pefile, math
def entropy(data):
if not data:
return 0.0
counts = [0] * 256
for byte in data:
counts[byte] += 1
ent = 0.0
for c in counts:
if c > 0:
p = c / len(data)
ent -= p * math.log2(p)
return ent
pe = pefile.PE('$sample')
print(' Entropie par section:')
for section in pe.sections:
name = section.Name.decode('utf-8', errors='ignore').rstrip('\x00')
ent = entropy(section.get_data())
flag = ' [PACKED?]' if ent > 7.0 else ''
print(f' {name:10s} : {ent:.4f}{flag}')
" 2>/dev/null
echo "---"
# Sauvegarde du rapport JSON
echo "$die_result" > "$REPORT_DIR/${sha256}.json"
done
echo "[*] Analyse terminée. Rapports dans $REPORT_DIR"
Seuils d'entropie pour la détection de packing
L'entropie de Shannon d'une section PE est un indicateur fiable de compression ou de chiffrement. Une section .text compilée normalement présente une entropie de 5.5 à 6.5. Une entropie supérieure à 7.0 sur 8.0 maximum indique quasi-certainement du packing. Les sections contenant du code machine légitime non obfusqué dépassent rarement 6.8.
3.3 Unpacking statique : cas UPX et MPRESS
L'unpacking statique d'UPX est trivial grâce à l'outil natif, mais de nombreux malwares modifient les en-têtes UPX pour empêcher le déballage automatique :
#!/usr/bin/env python3
"""
Unpacker statique pour UPX modifié.
Restaure les magic bytes UPX altérés par les malwares
avant de procéder au déballage.
"""
import struct
import subprocess
import sys
import shutil
from pathlib import Path
# Signatures UPX connues et leurs altérations courantes
UPX_SIGNATURES = {
b'UPX0': [b'UPX0', b'\x00PX0', b'UXP0', b'XUP0'],
b'UPX1': [b'UPX1', b'\x00PX1', b'UXP1', b'XUP1'],
b'UPX2': [b'UPX2', b'\x00PX2', b'UXP2', b'XUP2'],
b'UPX!': [b'UPX!', b'\x00PX!', b'UXP!', b'XUP!'],
}
def fix_upx_headers(filepath: str) -> str:
"""Corrige les en-têtes UPX altérés et retourne le chemin du fichier corrigé."""
data = bytearray(Path(filepath).read_bytes())
fixed = False
# Restaurer les noms de sections UPX
for original, variants in UPX_SIGNATURES.items():
for variant in variants[1:]: # Ignorer l'original
offset = 0
while True:
pos = data.find(variant, offset)
if pos == -1:
break
print(f" [+] Correction à offset 0x{pos:08X}: "
f"{variant} -> {original}")
data[pos:pos+4] = original
fixed = True
offset = pos + 4
# Vérifier et restaurer le magic UPX en fin de fichier
# Le magic "UPX!" se trouve généralement dans l'overlay
for i in range(len(data) - 4, max(len(data) - 1024, 0), -1):
if data[i:i+3] in [b'\x00PX', b'XUP', b'UXP']:
data[i:i+3] = b'UPX'
fixed = True
print(f" [+] Magic UPX restauré à offset 0x{i:08X}")
break
if fixed:
output_path = filepath + ".fixed"
Path(output_path).write_bytes(data)
return output_path
return filepath
def unpack_upx(filepath: str, output_dir: str) -> bool:
"""Tente le déballage UPX avec restauration automatique des en-têtes."""
print(f"[*] Tentative d'unpacking UPX: {filepath}")
output_path = Path(output_dir) / (Path(filepath).stem + "_unpacked.exe")
# Tentative directe
result = subprocess.run(
["upx", "-d", filepath, "-o", str(output_path)],
capture_output=True, text=True
)
if result.returncode == 0:
print(f" [+] Déballage réussi: {output_path}")
return True
print(" [-] Échec direct, tentative avec correction d'en-têtes...")
# Correction et nouvelle tentative
fixed_path = fix_upx_headers(filepath)
if fixed_path != filepath:
result = subprocess.run(
["upx", "-d", fixed_path, "-o", str(output_path)],
capture_output=True, text=True
)
Path(fixed_path).unlink(missing_ok=True)
if result.returncode == 0:
print(f" [+] Déballage réussi après correction: {output_path}")
return True
print(" [!] Échec - packer non-standard ou UPX lourdement modifié")
return False
if __name__ == "__main__":
if len(sys.argv) < 3:
print(f"Usage: {sys.argv[0]} ")
sys.exit(1)
unpack_upx(sys.argv[1], sys.argv[2])
3.4 VMProtect et Themida : la virtualisation de code
VMProtect et Themida représentent un défi d'un ordre de grandeur supérieur. Ces protecteurs convertissent le code x86/x64 natif en bytecode pour une machine virtuelle propriétaire embarquée dans le binaire. Le code original n'existe plus sous sa forme native : il a été traduit dans un jeu d'instructions custom interprété par un handler VM.
L'architecture type d'une VM de protection comprend :
- vm_entry : point d'entrée qui sauvegarde le contexte CPU et initialise la VM
- vm_dispatcher : boucle de fetch-decode-execute qui lit les opcodes virtuels
- vm_handlers : implémentation de chaque instruction virtuelle (vAdd, vXor, vPush, vPop, etc.)
- vm_exit : restauration du contexte CPU et retour au code natif
La déobfuscation de code virtualisé nécessite une approche en deux phases : identification de la structure VM par pattern matching, puis traduction inverse (devirtualization) des bytecodes en instructions natives.
#!/usr/bin/env python3
"""
Détecteur simplifié de virtualisation VMProtect.
Identifie les patterns d'entrée VM caractéristiques.
"""
import pefile
import re
VMPROTECT_ENTRY_PATTERNS = [
# Push de tous les registres suivi de pushfq (sauvegarde contexte)
rb'\x60\x9C', # pushad; pushfd (x86)
rb'\x50\x51\x52\x53\x55\x56\x57', # push sequence (x86)
# VMProtect 3.x : jmp dans section .vmp
rb'\xE9.{4}\x68.{4}\xE8', # jmp rel32; push imm32; call
# Themida: pattern d'entrée caractéristique
rb'\xEB.\x00{16,}', # jmp short + padding nulls
]
def detect_vm_protection(filepath: str) -> dict:
"""Détecte les patterns de virtualisation dans un PE."""
pe = pefile.PE(filepath)
results = {
'vmprotect_sections': [],
'themida_sections': [],
'vm_entry_candidates': [],
'virtualized': False
}
for section in pe.sections:
name = section.Name.decode('utf-8', errors='ignore').rstrip('\x00')
# Sections caractéristiques VMProtect
if name.startswith('.vmp'):
results['vmprotect_sections'].append({
'name': name,
'va': hex(section.VirtualAddress),
'size': section.SizeOfRawData,
'entropy': _calc_entropy(section.get_data())
})
results['virtualized'] = True
# Sections Themida
if name in ['.themida', '.winlice', '.taggant']:
results['themida_sections'].append(name)
results['virtualized'] = True
# Recherche de patterns VM entry dans toutes les sections exécutables
for section in pe.sections:
if not section.Characteristics & 0x20000000: # IMAGE_SCN_MEM_EXECUTE
continue
data = section.get_data()
for i, pattern in enumerate(VMPROTECT_ENTRY_PATTERNS):
for match in re.finditer(pattern, data):
va = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress + match.start()
results['vm_entry_candidates'].append({
'pattern_id': i,
'va': hex(va),
'offset': hex(match.start()),
'bytes': data[match.start():match.start()+16].hex()
})
return results
def _calc_entropy(data):
import math
if not data:
return 0.0
counts = [0] * 256
for b in data:
counts[b] += 1
ent = 0.0
for c in counts:
if c > 0:
p = c / len(data)
ent -= p * math.log2(p)
return round(ent, 4)
if __name__ == "__main__":
import sys, json
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} ")
sys.exit(1)
result = detect_vm_protection(sys.argv[1])
print(json.dumps(result, indent=2))
4. Exécution symbolique avec angr et Triton
4.1 Principes de l'exécution symbolique pour la déobfuscation
L'exécution symbolique est une technique d'analyse de programme qui exécute le code non pas avec des valeurs concrètes mais avec des variables symboliques. Chaque branchement conditionnel génère une contrainte sur ces variables, et un solveur SMT (Satisfiability Modulo Theories) comme Z3 détermine les valeurs satisfaisant un ensemble de contraintes donné.
En contexte de déobfuscation, l'exécution symbolique permet de :
- Résoudre les prédicats opaques : déterminer si un branchement est toujours vrai, toujours faux, ou réellement conditionnel
- Extraire les clés de déchiffrement : résoudre les contraintes menant à la routine de déchiffrement du payload
- Contourner les vérifications anti-debug : trouver les chemins d'exécution qui évitent les checks de détection d'environnement d'analyse
- Simplifier les expressions obfusquées : réduire des séquences arithmétiques complexes à leur forme canonique
4.2 angr : exploration symbolique de binaires
angr est un framework d'analyse binaire développé par le laboratoire SecLab de UC Santa Barbara. Il implémente un moteur d'exécution symbolique complet avec support de multiples architectures et gestion automatique de la mémoire symbolique.
#!/usr/bin/env python3
"""
Déobfuscation par exécution symbolique avec angr.
Résout les prédicats opaques et extrait les chemins d'exécution
réels dans un binaire obfusqué.
"""
import angr
import claripy
import logging
logging.getLogger('angr').setLevel(logging.WARNING)
def deobfuscate_opaque_predicates(binary_path: str, func_addr: int) -> dict:
"""
Analyse une fonction obfusquée et identifie les prédicats opaques.
Un prédicat opaque est un branchement conditionnel dont le résultat
est déterministe (toujours vrai ou toujours faux) mais dont
l'évaluation statique sans exécution symbolique est difficile.
Exemple classique :
mov eax, 7
imul eax, eax ; eax = 49
sub eax, 1 ; eax = 48
and eax, 1 ; eax = 0 (48 est pair)
jnz fake_branch ; jamais pris -> prédicat opaque
"""
project = angr.Project(binary_path, auto_load_libs=False)
results = {
'opaque_predicates': [],
'dead_branches': [],
'live_paths': [],
'simplified_blocks': []
}
cfg = project.analyses.CFGFast(
regions=[(func_addr, func_addr + 0x2000)],
normalize=True
)
func = cfg.kb.functions.get(func_addr)
if func is None:
print(f"[-] Fonction non trouvée à 0x{func_addr:x}")
return results
for block in func.blocks:
if not block.instructions:
continue
# Identifier les blocs se terminant par un branchement conditionnel
last_insn = block.capstone.insns[-1] if block.capstone.insns else None
if last_insn is None:
continue
conditional_jmps = [
'je', 'jne', 'jz', 'jnz', 'jg', 'jge', 'jl', 'jle',
'ja', 'jae', 'jb', 'jbe', 'jo', 'jno', 'js', 'jns'
]
if last_insn.mnemonic not in conditional_jmps:
continue
# Exécution symbolique depuis le début du bloc
state = project.factory.blank_state(addr=block.addr)
simgr = project.factory.simulation_manager(state)
# Explorer jusqu'à la fin du bloc
try:
simgr.explore(
find=block.addr + block.size,
num_find=10,
timeout=5
)
except Exception:
continue
# Analyser les successeurs du branchement
successors = list(func.graph.successors(block))
if len(successors) != 2:
continue
taken_addr = successors[0].addr
not_taken_addr = successors[1].addr
# Vérifier si le branchement est déterministe
state_at_branch = project.factory.blank_state(addr=block.addr)
simgr_branch = project.factory.simulation_manager(state_at_branch)
try:
simgr_branch.step(num_inst=block.instructions)
except Exception:
continue
if len(simgr_branch.active) == 1:
# Un seul successeur -> prédicat opaque détecté
actual_target = simgr_branch.active[0].addr
dead_target = (not_taken_addr
if actual_target == taken_addr
else taken_addr)
results['opaque_predicates'].append({
'block_addr': hex(block.addr),
'branch_insn': f"{last_insn.mnemonic} {last_insn.op_str}",
'always_goes_to': hex(actual_target),
'dead_branch': hex(dead_target),
'type': 'opaque_true' if actual_target == taken_addr else 'opaque_false'
})
results['dead_branches'].append(hex(dead_target))
print(f"[+] Prédicats opaques détectés: {len(results['opaque_predicates'])}")
for op in results['opaque_predicates']:
print(f" 0x{op['block_addr']}: {op['branch_insn']} "
f"-> toujours vers {op['always_goes_to']} "
f"(branche morte: {op['dead_branch']})")
return results
def extract_decryption_key(binary_path: str, decrypt_func: int,
key_output_addr: int) -> bytes:
"""
Utilise l'exécution symbolique pour extraire la clé de déchiffrement
sans exécuter le binaire. Résout les contraintes pour trouver les
valeurs menant à l'écriture de la clé en mémoire.
"""
project = angr.Project(binary_path, auto_load_libs=False)
# Créer un état symbolique au point d'entrée de la routine de déchiffrement
state = project.factory.blank_state(addr=decrypt_func)
# Rendre symbolique la zone mémoire où la clé sera écrite
key_sym = claripy.BVS('decryption_key', 256) # 32 octets symboliques
state.memory.store(key_output_addr, key_sym)
simgr = project.factory.simulation_manager(state)
# Définir les adresses de succès et d'échec
# (à adapter selon le binaire analysé)
def is_successful(s):
"""Vérifier si l'état a atteint la fin de la routine de déchiffrement."""
return s.addr >= decrypt_func + 0x100 # Heuristique
def should_avoid(s):
"""Éviter les chemins menant aux vérifications anti-debug."""
anti_debug_addrs = [
0x401000, # IsDebuggerPresent check
0x401050, # NtQueryInformationProcess check
0x4010A0, # Timing check (rdtsc)
]
return s.addr in anti_debug_addrs
simgr.explore(find=is_successful, avoid=should_avoid)
if simgr.found:
found_state = simgr.found[0]
# Résoudre la valeur concrète de la clé
concrete_key = found_state.solver.eval(key_sym, cast_to=bytes)
print(f"[+] Clé extraite: {concrete_key.hex()}")
return concrete_key
print("[-] Impossible d'extraire la clé par exécution symbolique")
return b''
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} ")
sys.exit(1)
# Exemple d'utilisation
results = deobfuscate_opaque_predicates(
sys.argv[1],
func_addr=0x00401000 # Adresse à adapter
)
4.3 Triton : exécution symbolique et taint analysis
Triton est un framework d'analyse binaire dynamique développé par Quarkslab qui offre des capacités d'exécution symbolique plus granulaires qu'angr, avec un accent sur la taint analysis (analyse de propagation de données) et la simplification d'expressions.
Triton excelle dans la simplification de Mixed Boolean-Arithmetic (MBA) expressions, technique d'obfuscation très utilisée dans les malwares modernes qui combine opérations arithmétiques et logiques pour masquer des calculs simples :
#!/usr/bin/env python3
"""
Simplification d'expressions MBA obfusquées avec Triton.
Les expressions MBA (Mixed Boolean-Arithmetic) sont utilisées
par les obfuscateurs pour masquer des opérations triviales.
Exemple: (x ^ y) + 2*(x & y) est équivalent à x + y
"""
from triton import (
TritonContext, ARCH, MODE, Instruction,
SYMBOLIC_SIMPLIFICATION, AST_REPRESENTATION
)
def create_triton_ctx():
"""Initialise un contexte Triton pour x86-64."""
ctx = TritonContext(ARCH.X86_64)
ctx.setMode(MODE.ALIGNED_MEMORY, True)
ctx.setMode(MODE.CONSTANT_FOLDING, True)
ctx.setAstRepresentationMode(AST_REPRESENTATION.PYTHON)
return ctx
def simplify_mba_expression(opcodes: bytes, base_addr: int = 0x400000) -> str:
"""
Exécute symboliquement une séquence d'opcodes et simplifie
l'expression résultante via le solveur Z3 intégré.
Args:
opcodes: bytes des instructions x86-64
base_addr: adresse de base virtuelle
Returns:
Expression simplifiée sous forme de chaîne
"""
ctx = create_triton_ctx()
# Mapper les opcodes en mémoire
for i, byte in enumerate(opcodes):
ctx.setConcreteMemoryValue(base_addr + i, byte)
# Rendre les registres d'entrée symboliques
ctx.symbolizeRegister(ctx.registers.rax, "x")
ctx.symbolizeRegister(ctx.registers.rbx, "y")
# Exécuter instruction par instruction
pc = base_addr
executed = []
while pc < base_addr + len(opcodes):
inst = Instruction(pc, ctx.getConcreteMemoryAreaValue(pc, 16))
if not ctx.processing(inst):
break
executed.append(f"0x{pc:x}: {inst.getDisassembly()}")
pc = inst.getNextAddress()
print("[*] Instructions exécutées:")
for line in executed:
print(f" {line}")
# Extraire l'expression symbolique du résultat (dans rax)
rax_expr = ctx.getSymbolicRegister(ctx.registers.rax)
if rax_expr is None:
return "N/A (registre non modifié)"
ast = rax_expr.getAst()
simplified = ctx.simplify(ast, SYMBOLIC_SIMPLIFICATION.LLVM)
print(f"\n[*] Expression brute: {ast}")
print(f"[+] Expression simplifiée: {simplified}")
return str(simplified)
def deobfuscate_constant_unfolding(ctx, instructions_bytes, base=0x400000):
"""
Résout le constant unfolding : technique où une constante simple
est calculée par une longue série d'opérations.
Exemple: mov rax, 0x1234 obfusqué en:
mov rax, 0xDEADBEEF
xor rax, 0xDEADBEEF ^ 0x1234 (= 0xDEADACDB)
add rax, 0x5678
sub rax, 0x5678
"""
for i, b in enumerate(instructions_bytes):
ctx.setConcreteMemoryValue(base + i, b)
pc = base
while pc < base + len(instructions_bytes):
inst = Instruction(pc, ctx.getConcreteMemoryAreaValue(pc, 16))
if not ctx.processing(inst):
break
pc = inst.getNextAddress()
# Évaluer la valeur concrète résultante
rax_val = ctx.getConcreteRegisterValue(ctx.registers.rax)
return rax_val
# Exemple MBA courant dans les malwares obfusqués
# L'expression (x ^ y) + 2*(x & y) est équivalente à x + y
if __name__ == "__main__":
# Opcodes x86-64 pour :
# mov rax, rdi ; rax = x
# mov rbx, rsi ; rbx = y
# mov rcx, rax
# xor rcx, rbx ; rcx = x ^ y
# and rax, rbx ; rax = x & y
# shl rax, 1 ; rax = 2 * (x & y)
# add rax, rcx ; rax = (x ^ y) + 2*(x & y) = x + y
mba_opcodes = (
b'\x48\x89\xf8' # mov rax, rdi
b'\x48\x89\xf3' # mov rbx, rsi
b'\x48\x89\xc1' # mov rcx, rax
b'\x48\x31\xd9' # xor rcx, rbx
b'\x48\x21\xd8' # and rax, rbx
b'\x48\xd1\xe0' # shl rax, 1
b'\x48\x01\xc8' # add rax, rcx
)
print("=" * 60)
print("Simplification MBA avec Triton")
print("=" * 60)
simplify_mba_expression(mba_opcodes)
5. Framework Capstone : désassemblage et analyse de flux de contrôle
5.1 Désassemblage programmatique avec Capstone
Capstone est un framework de désassemblage multi-architecture (x86, ARM, MIPS, PowerPC, SPARC, SystemZ) développé par Nguyen Anh Quynh. Contrairement à des outils interactifs comme IDA Pro, Capstone est conçu pour le désassemblage programmatique et s'intègre dans des pipelines d'analyse automatisée via ses bindings Python, Java, C#, Go et Rust.
Pour la déobfuscation, Capstone est essentiel pour :
- La reconstruction du Control Flow Graph (CFG) à partir du binaire brut
- La détection de prédicats opaques par analyse des patterns d'instructions
- L'identification de dead code injecté par les moteurs polymorphes
- La normalisation de séquences mutées vers leur forme canonique
#!/usr/bin/env python3
"""
Analyse de flux de contrôle et détection de code mort
avec Capstone pour la déobfuscation de malwares polymorphes.
"""
from capstone import *
from capstone.x86 import *
from collections import defaultdict
from typing import Dict, List, Set, Tuple, Optional
class CFGBuilder:
"""Reconstruit le Control Flow Graph à partir d'un binaire désassemblé."""
def __init__(self, code: bytes, base_addr: int = 0x400000,
mode: int = CS_MODE_64):
self.md = Cs(CS_ARCH_X86, mode)
self.md.detail = True
self.md.skipdata = True
self.code = code
self.base_addr = base_addr
self.blocks: Dict[int, 'BasicBlock'] = {}
self.edges: List[Tuple[int, int, str]] = []
def _is_branch(self, insn) -> bool:
"""Vérifie si une instruction est un branchement."""
return insn.group(X86_GRP_JUMP) or insn.group(X86_GRP_CALL)
def _is_conditional_branch(self, insn) -> bool:
"""Vérifie si c'est un branchement conditionnel."""
cond_mnemonics = {
'je', 'jne', 'jz', 'jnz', 'jg', 'jge', 'jl', 'jle',
'ja', 'jae', 'jb', 'jbe', 'jo', 'jno', 'js', 'jns',
'jp', 'jnp', 'jcxz', 'jecxz', 'jrcxz'
}
return insn.mnemonic in cond_mnemonics
def _is_unconditional_jump(self, insn) -> bool:
return insn.mnemonic == 'jmp'
def _is_ret(self, insn) -> bool:
return insn.group(X86_GRP_RET)
def _get_branch_target(self, insn) -> Optional[int]:
"""Extrait l'adresse cible d'un branchement direct."""
if insn.operands and insn.operands[0].type == X86_OP_IMM:
return insn.operands[0].imm
return None # Branchement indirect
def build(self) -> Dict:
"""Construit le CFG par désassemblage linéaire récursif."""
worklist = [self.base_addr]
visited: Set[int] = set()
instructions = {}
# Phase 1: Désassembler et collecter les instructions
for insn in self.md.disasm(self.code, self.base_addr):
instructions[insn.address] = insn
# Phase 2: Identifier les leaders de blocs de base
leaders: Set[int] = {self.base_addr}
for addr, insn in sorted(instructions.items()):
if self._is_branch(insn):
target = self._get_branch_target(insn)
if target and target in instructions:
leaders.add(target)
# L'instruction suivante est aussi un leader
next_addr = addr + insn.size
if next_addr in instructions:
leaders.add(next_addr)
if self._is_ret(insn):
next_addr = addr + insn.size
if next_addr in instructions:
leaders.add(next_addr)
# Phase 3: Construire les blocs de base
sorted_leaders = sorted(leaders)
for i, leader in enumerate(sorted_leaders):
block_insns = []
addr = leader
end_addr = (sorted_leaders[i + 1]
if i + 1 < len(sorted_leaders)
else self.base_addr + len(self.code))
while addr < end_addr and addr in instructions:
insn = instructions[addr]
block_insns.append(insn)
if self._is_branch(insn) or self._is_ret(insn):
break
addr += insn.size
if block_insns:
self.blocks[leader] = BasicBlock(leader, block_insns)
# Phase 4: Établir les arêtes
for addr, block in self.blocks.items():
last = block.instructions[-1]
next_addr = last.address + last.size
if self._is_ret(last):
continue
elif self._is_unconditional_jump(last):
target = self._get_branch_target(last)
if target:
self.edges.append((addr, target, 'unconditional'))
elif self._is_conditional_branch(last):
target = self._get_branch_target(last)
if target:
self.edges.append((addr, target, 'true'))
if next_addr in self.blocks:
self.edges.append((addr, next_addr, 'false'))
else:
if next_addr in self.blocks:
self.edges.append((addr, next_addr, 'fallthrough'))
return {
'blocks': len(self.blocks),
'edges': len(self.edges),
'entry': hex(self.base_addr)
}
class BasicBlock:
"""Représente un bloc de base dans le CFG."""
def __init__(self, addr: int, instructions: list):
self.addr = addr
self.instructions = instructions
self.size = sum(i.size for i in instructions)
def __repr__(self):
return (f"BasicBlock(0x{self.addr:x}, "
f"{len(self.instructions)} insns, "
f"{self.size} bytes)")
class DeadCodeDetector:
"""
Détecte le code mort injecté par les moteurs polymorphes.
Analyse les patterns d'instructions qui n'affectent pas
le résultat du programme.
"""
# Instructions considérées comme potentiellement mortes
# quand leur résultat n'est jamais utilisé
DEAD_PATTERNS = [
# Push/Pop du même registre (NOP sémantique)
lambda insns, i: (
i + 1 < len(insns) and
insns[i].mnemonic == 'push' and
insns[i+1].mnemonic == 'pop' and
len(insns[i].operands) > 0 and
len(insns[i+1].operands) > 0 and
insns[i].operands[0].type == X86_OP_REG and
insns[i+1].operands[0].type == X86_OP_REG and
insns[i].operands[0].reg == insns[i+1].operands[0].reg
),
# Opération suivie de son inverse (add N / sub N)
lambda insns, i: (
i + 1 < len(insns) and
insns[i].mnemonic == 'add' and
insns[i+1].mnemonic == 'sub' and
len(insns[i].operands) >= 2 and
len(insns[i+1].operands) >= 2 and
insns[i].operands[0].reg == insns[i+1].operands[0].reg and
insns[i].operands[1].type == X86_OP_IMM and
insns[i+1].operands[1].type == X86_OP_IMM and
insns[i].operands[1].imm == insns[i+1].operands[1].imm
),
# NOP et variantes
lambda insns, i: insns[i].mnemonic == 'nop',
# xchg reg, reg (même registre)
lambda insns, i: (
insns[i].mnemonic == 'xchg' and
len(insns[i].operands) >= 2 and
insns[i].operands[0].type == X86_OP_REG and
insns[i].operands[1].type == X86_OP_REG and
insns[i].operands[0].reg == insns[i].operands[1].reg
),
# mov reg, reg (même registre)
lambda insns, i: (
insns[i].mnemonic == 'mov' and
len(insns[i].operands) >= 2 and
insns[i].operands[0].type == X86_OP_REG and
insns[i].operands[1].type == X86_OP_REG and
insns[i].operands[0].reg == insns[i].operands[1].reg
),
]
@staticmethod
def detect(instructions: list) -> List[dict]:
"""Retourne la liste des instructions mortes détectées."""
dead = []
for i, insn in enumerate(instructions):
for pattern_fn in DeadCodeDetector.DEAD_PATTERNS:
try:
if pattern_fn(instructions, i):
dead.append({
'addr': hex(insn.address),
'instruction': f"{insn.mnemonic} {insn.op_str}",
'size': insn.size
})
break
except (IndexError, AttributeError):
continue
return dead
# Exemple d'utilisation
if __name__ == "__main__":
# Code obfusqué exemple avec dead code injecté
obfuscated_code = bytes([
0x55, # push rbp
0x48, 0x89, 0xE5, # mov rbp, rsp
0x50, 0x58, # push rax; pop rax (DEAD)
0x48, 0x89, 0xF8, # mov rax, rdi
0x90, # nop (DEAD)
0x48, 0x83, 0xC0, 0x05, # add rax, 5
0x48, 0x83, 0xE8, 0x05, # sub rax, 5 (DEAD pair avec add)
0x48, 0x31, 0xC9, # xor rcx, rcx
0x48, 0x87, 0xC9, # xchg rcx, rcx (DEAD)
0x48, 0x01, 0xF8, # add rax, rdi
0x5D, # pop rbp
0xC3, # ret
])
builder = CFGBuilder(obfuscated_code, base_addr=0x401000)
stats = builder.build()
print(f"CFG: {stats['blocks']} blocs, {stats['edges']} arêtes")
for addr, block in sorted(builder.blocks.items()):
print(f"\n--- Block 0x{addr:x} ---")
dead = DeadCodeDetector.detect(block.instructions)
for insn in block.instructions:
marker = " << DEAD" if any(
d['addr'] == hex(insn.address) for d in dead
) else ""
print(f" 0x{insn.address:x}: {insn.mnemonic:8s} "
f"{insn.op_str}{marker}")
if dead:
print(f" [{len(dead)} instruction(s) morte(s) détectée(s)]")
Reconstruction du CFG après suppression du code mort
6. Aplatissement du flux de contrôle (Control Flow Flattening)
6.1 Anatomie du CFF
L'aplatissement du flux de contrôle (Control Flow Flattening, CFF) est une technique d'obfuscation avancée implémentée par des outils comme OLLVM (Obfuscator-LLVM), Tigress, et divers protecteurs commerciaux. Le principe consiste à transformer la structure hiérarchique naturelle du CFG (if/else, boucles, séquences) en une boucle unique contenant un switch dispatcher.
Le mécanisme repose sur trois composants :
- State variable : une variable qui encode le bloc de base courant à exécuter
- Dispatcher : un switch/case central qui lit la state variable et route l'exécution vers le bloc correspondant
- Blocs aplatis : les blocs de base originaux, maintenant au même niveau hiérarchique, chacun se terminant par une mise à jour de la state variable avant de retourner au dispatcher
Considérons une fonction simple avant et après CFF :
// === Code original ===
int process(int x) {
int result = 0;
if (x > 10) {
result = x * 2;
} else {
result = x + 5;
}
return result + 1;
}
// === Après aplatissement CFF ===
int process_flattened(int x) {
int result = 0;
int state = 0xA1B2; // État initial
while (1) {
switch (state) {
case 0xA1B2: // Bloc d'entrée
if (x > 10)
state = 0xC3D4; // -> bloc then
else
state = 0xE5F6; // -> bloc else
break;
case 0xC3D4: // Bloc then
result = x * 2;
state = 0x7890; // -> bloc sortie
break;
case 0xE5F6: // Bloc else
result = x + 5;
state = 0x7890; // -> bloc sortie
break;
case 0x7890: // Bloc de sortie
return result + 1;
default:
return -1; // Ne devrait jamais arriver
}
}
}
6.2 Reconstruction du CFG original : script IDA Python
La reconstruction du flux de contrôle original nécessite d'identifier le dispatcher, la state variable, et les transitions entre blocs. Voici un script IDA Python complet pour automatiser cette reconstruction :
"""
Script IDA Python pour la reconstruction du CFG
après aplatissement de flux de contrôle (CFF/OLLVM).
Usage dans IDA Pro:
File -> Script file -> cff_deobfuscate.py
Puis: cff_deobfuscate(ea=here())
"""
import idaapi
import idautils
import idc
from collections import defaultdict
class CFFDeobfuscator:
"""
Reconstructeur de CFG pour binaires protégés par
Control Flow Flattening (OLLVM, Tigress, etc.).
"""
def __init__(self, func_ea):
self.func = idaapi.get_func(func_ea)
if not self.func:
raise ValueError(f"Pas de fonction à 0x{func_ea:x}")
self.func_ea = self.func.start_ea
self.func_end = self.func.end_ea
self.cfg = idaapi.FlowChart(self.func)
self.dispatcher = None
self.state_var = None
self.state_transitions = {}
self.real_blocks = []
self.dead_blocks = []
def identify_dispatcher(self):
"""
Identifie le bloc dispatcher (switch central).
Heuristique: le bloc avec le plus de prédécesseurs
qui contient une comparaison avec une constante suivie
d'un branchement conditionnel.
"""
predecessor_count = defaultdict(int)
for block in self.cfg:
for succ in block.succs():
predecessor_count[succ.start_ea] += 1
# Le dispatcher est le bloc avec le plus de prédécesseurs
# (tous les blocs aplatis y retournent)
candidates = sorted(
predecessor_count.items(),
key=lambda x: x[1],
reverse=True
)
for addr, count in candidates:
if count >= 3: # Au moins 3 prédécesseurs
# Vérifier que le bloc contient un switch pattern
if self._is_switch_pattern(addr):
self.dispatcher = addr
print(f"[+] Dispatcher identifié: 0x{addr:x} "
f"({count} prédécesseurs)")
return True
print("[-] Dispatcher non trouvé")
return False
def _is_switch_pattern(self, block_ea):
"""Vérifie si un bloc contient un pattern de switch/dispatcher."""
ea = block_ea
block_end = self._get_block_end(block_ea)
cmp_found = False
while ea < block_end and ea != idc.BADADDR:
mnem = idc.print_insn_mnem(ea)
if mnem in ('cmp', 'sub', 'test'):
cmp_found = True
if mnem in ('je', 'jne', 'jz', 'jnz', 'ja', 'jb') and cmp_found:
return True
ea = idc.next_head(ea, block_end)
return False
def _get_block_end(self, block_ea):
"""Retourne l'adresse de fin du bloc de base."""
for block in self.cfg:
if block.start_ea == block_ea:
return block.end_ea
return block_ea + 0x100 # Fallback
def identify_state_variable(self):
"""
Identifie la state variable utilisée par le dispatcher.
C'est typiquement le registre ou l'emplacement mémoire
comparé dans le dispatcher.
"""
if not self.dispatcher:
return False
ea = self.dispatcher
block_end = self._get_block_end(self.dispatcher)
while ea < block_end and ea != idc.BADADDR:
mnem = idc.print_insn_mnem(ea)
if mnem == 'cmp':
op0 = idc.print_operand(ea, 0)
self.state_var = op0
print(f"[+] State variable: {op0}")
return True
ea = idc.next_head(ea, block_end)
return False
def trace_transitions(self):
"""
Trace les transitions d'état entre les blocs aplatis.
Pour chaque bloc, identifie la valeur de la state variable
à la sortie (qui détermine le bloc suivant).
"""
if not self.state_var:
return
for block in self.cfg:
if block.start_ea == self.dispatcher:
continue
# Analyser les instructions du bloc à rebours
ea = block.end_ea
while ea > block.start_ea:
ea = idc.prev_head(ea, block.start_ea)
if ea == idc.BADADDR:
break
mnem = idc.print_insn_mnem(ea)
# Chercher les affectations à la state variable
# mov state_var, immediate
if mnem == 'mov':
op0 = idc.print_operand(ea, 0)
op1_type = idc.get_operand_type(ea, 1)
if op0 == self.state_var and op1_type == idc.o_imm:
next_state = idc.get_operand_value(ea, 1)
self.state_transitions[block.start_ea] = next_state
print(f" Bloc 0x{block.start_ea:x} -> "
f"état 0x{next_state:x}")
break
def reconstruct(self):
"""Pipeline complet de reconstruction du CFG."""
print(f"[*] Déobfuscation CFF de la fonction 0x{self.func_ea:x}")
print(f" Taille: {self.func_end - self.func_ea} octets")
print(f" Blocs: {sum(1 for _ in self.cfg)}")
if not self.identify_dispatcher():
return False
if not self.identify_state_variable():
return False
self.trace_transitions()
print(f"\n[+] Reconstruction terminée:")
print(f" Transitions identifiées: {len(self.state_transitions)}")
# Construire la map état -> bloc
state_to_block = {}
for block in self.cfg:
if block.start_ea == self.dispatcher:
continue
ea = block.start_ea
block_end = block.end_ea
while ea < block_end:
mnem = idc.print_insn_mnem(ea)
if mnem == 'cmp':
op1_type = idc.get_operand_type(ea, 1)
if op1_type == idc.o_imm:
state_val = idc.get_operand_value(ea, 1)
state_to_block[state_val] = block.start_ea
ea = idc.next_head(ea, block_end)
# Reconstituer l'ordre d'exécution
print("\n[+] Ordre d'exécution reconstruit:")
for src_block, next_state in sorted(self.state_transitions.items()):
dst_block = state_to_block.get(next_state, None)
dst_str = f"0x{dst_block:x}" if dst_block else "INCONNU"
print(f" 0x{src_block:x} -> {dst_str} (état 0x{next_state:x})")
# Commenter dans IDA pour visualisation
if dst_block:
idc.set_cmt(
src_block,
f"CFF: next -> 0x{dst_block:x}",
False
)
return True
def cff_deobfuscate(ea=None):
"""Point d'entrée du script."""
if ea is None:
ea = idc.here()
deob = CFFDeobfuscator(ea)
deob.reconstruct()
Aplatissement de flux de contrôle : avant / après
7. Étude de cas : déobfuscation de RedLine Stealer
7.1 Présentation de RedLine Stealer
RedLine Stealer est l'un des info-stealers les plus prolifiques du paysage des menaces depuis 2020. Distribué en tant que MaaS (Malware-as-a-Service) sur les forums underground pour environ 150-200 USD par mois, il cible les navigateurs (cookies, mots de passe, données de formulaires), les portefeuilles de cryptomonnaies, les clients VPN et FTP, ainsi que les informations système.
RedLine est développé en C# (.NET) et systématiquement protégé par ConfuserEx, un obfuscateur open-source pour .NET. Les couches d'obfuscation typiques incluent :
- Renommage : classes, méthodes et champs renommés en caractères Unicode non imprimables ou en séquences aléatoires
- Protection des chaînes : toutes les chaînes de caractères (URLs C2, noms de navigateurs, chemins de fichiers) sont chiffrées et déchiffrées à l'exécution
- Control Flow Obfuscation : insertion de switch dispatchers et de prédicats opaques dans les méthodes critiques
- Anti-tamper : vérification de l'intégrité du binaire pour détecter les modifications
- Constant folding : les constantes numériques sont calculées dynamiquement via des expressions arithmétiques complexes
7.2 Pipeline de déobfuscation avec de4dot et dnSpy
La déobfuscation d'un échantillon RedLine suit un pipeline méthodique. de4dot est un déobfuscateur .NET automatique qui gère la majorité des protections ConfuserEx. dnSpy (ou son fork dnSpyEx) permet l'inspection et la modification du code IL.
#!/bin/bash
# Pipeline de déobfuscation RedLine Stealer
# Prérequis: de4dot, dnSpy/dnSpyEx, monodis, strings
SAMPLE="$1"
WORKDIR="/opt/analysis/redline/$(date +%Y%m%d_%H%M%S)"
mkdir -p "$WORKDIR"/{original,deobfuscated,extracted}
echo "=========================================="
echo " RedLine Stealer - Pipeline Déobfuscation"
echo "=========================================="
echo "[*] Sample: $SAMPLE"
echo "[*] Working dir: $WORKDIR"
# Étape 1: Identification et hachage
echo -e "\n[Étape 1] Identification"
sha256=$(sha256sum "$SAMPLE" | cut -d' ' -f1)
echo " SHA-256: $sha256"
cp "$SAMPLE" "$WORKDIR/original/$sha256.exe"
# Vérifier que c'est bien du .NET
file_type=$(file "$SAMPLE")
if ! echo "$file_type" | grep -qi "\.NET\|PE32\|Mono"; then
echo " [!] Pas un binaire .NET - arrêt"
exit 1
fi
echo " Type: $file_type"
# Étape 2: Déobfuscation avec de4dot
echo -e "\n[Étape 2] Déobfuscation de4dot"
de4dot "$SAMPLE" \
-o "$WORKDIR/deobfuscated/clean.exe" \
--strtyp delegate \
--strtok 0x06000001 \
2>&1 | tee "$WORKDIR/de4dot.log"
# Si la détection auto échoue, forcer ConfuserEx
if [ ! -f "$WORKDIR/deobfuscated/clean.exe" ]; then
echo " [!] Retry avec détection forcée ConfuserEx..."
de4dot "$SAMPLE" \
-p cr \
-o "$WORKDIR/deobfuscated/clean.exe" \
2>&1 | tee -a "$WORKDIR/de4dot.log"
fi
# Étape 3: Extraction des chaînes déchiffrées
echo -e "\n[Étape 3] Extraction des chaînes"
strings -a -n 6 "$WORKDIR/deobfuscated/clean.exe" | sort -u \
> "$WORKDIR/extracted/strings_all.txt"
# Extraction des IOCs réseau
grep -oP 'https?://[^\s"<>]+' "$WORKDIR/extracted/strings_all.txt" \
> "$WORKDIR/extracted/urls.txt" 2>/dev/null
grep -oP '\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b' \
"$WORKDIR/extracted/strings_all.txt" \
> "$WORKDIR/extracted/ips.txt" 2>/dev/null
grep -oP '\b[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b' \
"$WORKDIR/extracted/strings_all.txt" | \
grep -v -E '\.(dll|exe|sys|ocx|com|net|org)$' \
> "$WORKDIR/extracted/domains.txt" 2>/dev/null
# Étape 4: Extraction de la config C2 via pattern matching
echo -e "\n[Étape 4] Extraction configuration C2"
python3 << 'PYEOF'
import re
import sys
deobfuscated = "$WORKDIR/deobfuscated/clean.exe"
try:
with open(deobfuscated, "rb") as f:
data = f.read()
except FileNotFoundError:
print(" [-] Fichier déobfusqué non trouvé")
sys.exit(1)
# Pattern C2: IP:PORT typique de RedLine
c2_pattern = rb'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{2,5})'
c2_matches = set(re.findall(c2_pattern, data))
# Pattern ID de build RedLine
build_pattern = rb'(Build[_\-]?\w{4,20})'
build_matches = set(re.findall(build_pattern, data))
# Pattern clé d'autorisation (base64-like)
auth_pattern = rb'([A-Za-z0-9+/]{20,}={0,2})'
auth_candidates = re.findall(auth_pattern, data)
print(" C2 Servers:")
for c2 in c2_matches:
print(f" - {c2.decode('utf-8', errors='ignore')}")
print(" Build IDs:")
for bid in build_matches:
print(f" - {bid.decode('utf-8', errors='ignore')}")
# Sauvegarder les IOCs
with open("$WORKDIR/extracted/iocs.txt", "w") as f:
f.write("# RedLine Stealer IOCs\n")
f.write(f"# SHA-256: $sha256\n\n")
f.write("[C2 Servers]\n")
for c2 in c2_matches:
f.write(f"{c2.decode('utf-8', errors='ignore')}\n")
f.write("\n[Build IDs]\n")
for bid in build_matches:
f.write(f"{bid.decode('utf-8', errors='ignore')}\n")
print(f"\n IOCs sauvegardés: $WORKDIR/extracted/iocs.txt")
PYEOF
# Étape 5: Analyse IL avec monodis
echo -e "\n[Étape 5] Décompilation IL"
monodis "$WORKDIR/deobfuscated/clean.exe" \
> "$WORKDIR/extracted/il_dump.txt" 2>/dev/null
echo -e "\n[*] Pipeline terminé. Résultats dans: $WORKDIR"
echo " - Original: $WORKDIR/original/"
echo " - Déobfusqué: $WORKDIR/deobfuscated/"
echo " - IOCs extraits: $WORKDIR/extracted/"
7.3 Déchiffrement des chaînes ConfuserEx en Python
Lorsque de4dot ne parvient pas à résoudre automatiquement les chaînes protégées (versions modifiées de ConfuserEx), il est nécessaire de reproduire l'algorithme de déchiffrement manuellement. Voici l'implémentation du déchiffreur de chaînes ConfuserEx :
#!/usr/bin/env python3
"""
Déchiffreur de chaînes ConfuserEx pour RedLine Stealer.
Reproduit l'algorithme de déchiffrement des chaînes protégées
sans exécuter le binaire .NET.
ConfuserEx utilise typiquement:
1. Un tableau de bytes compressé (deflate) stocké en ressource
2. Un déchiffrement XOR avec clé dérivée du token de la méthode appelante
3. Optionnellement un chiffrement additionnel (RC4 ou mutation custom)
"""
import struct
import zlib
import hashlib
from typing import Optional
class ConfuserExStringDecryptor:
"""Déchiffre les chaînes protégées par ConfuserEx."""
def __init__(self, encrypted_resource: bytes, module_key: int):
"""
Args:
encrypted_resource: contenu brut de la ressource .NET
contenant les chaînes chiffrées
module_key: clé de module (typiquement le RID
de la méthode de déchiffrement)
"""
self.module_key = module_key
self.data = self._decompress(encrypted_resource)
self.strings_cache = {}
def _decompress(self, data: bytes) -> bytes:
"""Décompresse les données (deflate sans header zlib)."""
try:
return zlib.decompress(data, -15) # Raw deflate
except zlib.error:
try:
return zlib.decompress(data) # Avec header zlib
except zlib.error:
return data # Pas compressé
def _derive_key(self, caller_token: int) -> int:
"""
Dérive la clé XOR à partir du token de la méthode appelante.
Reproduit l'algorithme de ConfuserEx:
key = (caller_token * 0x5bd1e995) ^ module_key
"""
key = (caller_token * 0x5BD1E995) & 0xFFFFFFFF
key = key ^ self.module_key
return key
def decrypt_string(self, string_id: int,
caller_token: int) -> Optional[str]:
"""
Déchiffre une chaîne par son identifiant et le token de
la méthode appelante.
Args:
string_id: index dans le tableau de chaînes
caller_token: metadata token de la méthode .NET appelante
(format 0x0600XXXX pour MethodDef)
Returns:
La chaîne déchiffrée ou None en cas d'erreur
"""
cache_key = (string_id, caller_token)
if cache_key in self.strings_cache:
return self.strings_cache[cache_key]
key = self._derive_key(caller_token)
try:
# Lire l'offset et la longueur depuis le header
offset = string_id * 4
if offset + 4 > len(self.data):
return None
str_offset = struct.unpack_from('= len(self.data):
return None
# Lire la longueur de la chaîne (encodée en 7-bit compact)
pos = str_offset
str_len = 0
shift = 0
while pos < len(self.data):
b = self.data[pos]
str_len |= (b & 0x7F) << shift
pos += 1
if (b & 0x80) == 0:
break
shift += 7
if pos + str_len * 2 > len(self.data):
return None
# Déchiffrer les caractères UTF-16LE
chars = []
xor_key = key
for i in range(str_len):
c = struct.unpack_from('> (8 * (i % 4))) & 0xFFFF
chars.append(chr(c))
# Rotation de la clé
xor_key = ((xor_key >> 3) | (xor_key << 29)) & 0xFFFFFFFF
result = ''.join(chars)
self.strings_cache[cache_key] = result
return result
except (struct.error, IndexError, ValueError):
return None
def decrypt_all(self, method_tokens: list) -> dict:
"""
Tente de déchiffrer toutes les chaînes pour une liste
de tokens de méthodes.
Returns:
Dict mapping (string_id, token) -> chaîne déchiffrée
"""
results = {}
for token in method_tokens:
for sid in range(1000): # Heuristique: max 1000 chaînes
s = self.decrypt_string(sid, token)
if s and len(s) > 0 and all(
c.isprintable() or c in '\r\n\t' for c in s
):
results[(sid, hex(token))] = s
return results
if __name__ == "__main__":
# Exemple d'utilisation avec un échantillon RedLine
# Les valeurs ci-dessous sont à adapter au sample analysé
print("[*] ConfuserEx String Decryptor")
print(" Adapter encrypted_resource et module_key au sample")
# Simulation avec des données de test
test_data = bytes(range(256)) * 4
decryptor = ConfuserExStringDecryptor(
encrypted_resource=test_data,
module_key=0x1A2B3C4D
)
print(f" Data size after decompression: {len(decryptor.data)}")
print(" Prêt pour decrypt_string(id, caller_token)")
IOCs typiques RedLine Stealer (2025-2026)
Les échantillons récents de RedLine Stealer communiquent via des API SOAP/WCF sur des ports non standard (généralement 12432, 15647, 17816, ou 23984). La communication est souvent chiffrée par un certificat auto-signé. Les serveurs C2 tournent principalement sur des VPS en Russie, aux Pays-Bas et en Roumanie. Les chaînes caractéristiques après déobfuscation incluent : Authorization, ScanBrowsers, ScanFTP, ScanWallets, ScanScreen, ScanTelegram, GrabInfo.
8. Automatisation avec YARA : détection et classification
8.1 Règles YARA pour la détection de packing et d'obfuscation
YARA est le standard de facto pour la création de signatures de détection de malwares. Contrairement aux signatures antivirus traditionnelles basées sur des hashs, les règles YARA permettent de décrire des patterns structurels et comportementaux résistants aux mutations polymorphes.
Pour la détection de malwares obfusqués, les règles YARA doivent cibler des invariants qui survivent aux transformations polymorphes : structures de données, séquences d'opcodes caractéristiques des stubs, patterns d'entropie, et métadonnées PE.
/*
* Règles YARA avancées pour la détection de packing
* et d'obfuscation dans les binaires PE.
*
* Auteur: Ayi NEDJIMI - ayinedjimi-consultants.fr
* Date: 2026-02-05
*/
import "pe"
import "math"
import "hash"
rule Packed_High_Entropy_Sections {
meta:
description = "Détecte les PE avec sections à haute entropie (packing probable)"
severity = "medium"
category = "packer"
condition:
uint16(0) == 0x5A4D and
for any section in pe.sections : (
math.entropy(section.offset, section.size) > 7.2 and
section.size > 1024
)
}
rule Packed_UPX_Modified {
meta:
description = "Détecte UPX avec en-têtes modifiés (anti-unpacking)"
severity = "high"
category = "packer"
strings:
// Noms de sections UPX altérés (premier octet modifié)
$s1 = { 00 50 58 30 } // \x00PX0 au lieu de UPX0
$s2 = { 00 50 58 31 } // \x00PX1
$s3 = { 55 58 50 30 } // UXP0 (octets inversés)
$s4 = { 55 58 50 31 } // UXP1
// Pattern du stub UPX même modifié
$stub = { 60 BE ?? ?? ?? ?? 8D BE ?? ?? ?? ?? 57 }
condition:
uint16(0) == 0x5A4D and
(any of ($s*)) and
$stub
}
rule VMProtect_Virtualized {
meta:
description = "Détecte la virtualisation VMProtect"
severity = "critical"
category = "protector"
strings:
// Nom de section VMProtect
$vmp0 = ".vmp0"
$vmp1 = ".vmp1"
$vmp2 = ".vmp2"
// Pattern d'entrée VM (push regs + jmp dispatcher)
$vm_entry_x86 = {
60 // pushad
9C // pushfd
68 ?? ?? ?? ?? // push imm32
E8 ?? ?? ?? ?? // call vm_dispatcher
}
$vm_entry_x64 = {
50 51 52 53 // push rax,rcx,rdx,rbx
55 56 57 // push rbp,rsi,rdi
41 50 41 51 // push r8, r9
}
condition:
uint16(0) == 0x5A4D and
(any of ($vmp*)) and
(any of ($vm_entry*))
}
rule Themida_Protected {
meta:
description = "Détecte la protection Themida/WinLicense"
severity = "critical"
category = "protector"
strings:
$s1 = ".themida" ascii
$s2 = ".winlice" ascii
$s3 = "THEMIDA" ascii wide
// Anti-debug check pattern
$anti_dbg = {
64 A1 30 00 00 00 // mov eax, fs:[0x30] (PEB)
0F B6 40 02 // movzx eax, byte [eax+2] (BeingDebugged)
85 C0 // test eax, eax
}
condition:
uint16(0) == 0x5A4D and
(any of ($s*)) and
$anti_dbg
}
rule ConfuserEx_NET_Obfuscated {
meta:
description = "Détecte l'obfuscation ConfuserEx sur binaires .NET"
severity = "high"
category = "obfuscator"
family = "confuserex"
strings:
// Markers ConfuserEx dans les métadonnées .NET
$marker1 = "ConfuserEx" ascii wide nocase
$marker2 = "Confuser.Core" ascii
// Pattern de protection de chaînes (delegate call)
$str_prot = {
7E ?? ?? ?? 04 // ldsfld
28 ?? ?? ?? 06 // call
72 ?? ?? ?? 70 // ldstr ""
}
// Anti-tamper module initializer
$tamper = {
28 ?? ?? ?? 0A // call
2A // ret
00 // padding
13 30 // .method header
}
condition:
uint16(0) == 0x5A4D and
(any of ($marker*) or $str_prot) and
pe.imports("mscoree.dll", "_CorExeMain")
}
rule Polymorphic_XOR_Decrypt_Stub {
meta:
description = "Détecte les stubs de déchiffrement XOR polymorphes"
severity = "high"
category = "polymorphic"
strings:
// Pattern: boucle XOR avec registre index + compteur
$xor_loop_1 = {
30 (1? | 0?) [0-2] // xor byte [reg+idx], reg
(40 | 41 | 42 | 43 | 46 | 47 | FF C? | 83 C? 01) // inc reg
(48 | 49 | 4A | 4B | 4E | 4F | FF C? | 83 E? 01) // dec reg
(75 | 0F 85) ?? // jnz loop
}
// Pattern: boucle XOR avec LOOP instruction
$xor_loop_2 = {
30 [1-3] // xor byte [mem], reg
[0-4] // possible inc/lea
E2 ?? // loop
}
// Variante: XOR word/dword
$xor_loop_3 = {
(31 | 33) [1-3] // xor dword [mem], reg
(83 C? 04 | 83 E? 04) // add/sub reg, 4
(3B | 39 | 3D) [1-5] // cmp
(72 | 76 | 7C | 0F 82 | 0F 86) ?? // jb/jbe/jl
}
condition:
uint16(0) == 0x5A4D and
any of ($xor_loop_*) and
for any section in pe.sections : (
math.entropy(section.offset, section.size) > 6.8
)
}
rule RedLine_Stealer_Deobfuscated {
meta:
description = "Détecte RedLine Stealer après déobfuscation"
severity = "critical"
category = "infostealer"
family = "redline"
mitre = "T1555, T1539, T1552"
strings:
$func1 = "ScanBrowsers" ascii wide
$func2 = "ScanFTP" ascii wide
$func3 = "ScanWallets" ascii wide
$func4 = "GrabInfo" ascii wide
$func5 = "ScanScreen" ascii wide
$func6 = "ScanTelegram" ascii wide
$func7 = "ScanDiscord" ascii wide
$func8 = "ScanSteam" ascii wide
$net1 = "Authorization" ascii wide
$net2 = "Content-Type" ascii wide
$net3 = "application/soap+xml" ascii wide
$cfg1 = /\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{2,5}/ ascii
condition:
uint16(0) == 0x5A4D and
pe.imports("mscoree.dll", "_CorExeMain") and
(3 of ($func*)) and
(2 of ($net*)) and
$cfg1
}
8.2 Pipeline d'automatisation Python + YARA
L'intégration de YARA dans un pipeline de triage automatisé permet de classer rapidement les échantillons selon leur niveau d'obfuscation et d'orienter l'analyse vers les outils appropriés :
#!/usr/bin/env python3
"""
Pipeline de triage automatisé pour malwares obfusqués.
Combine YARA, analyse PE et heuristiques d'entropie pour
classifier les échantillons et orienter l'analyse.
"""
import yara
import pefile
import math
import json
import hashlib
from pathlib import Path
from datetime import datetime
from typing import Dict, List
class MalwareTriagePipeline:
"""Pipeline de triage automatisé pour binaires PE suspects."""
def __init__(self, yara_rules_dir: str):
"""
Args:
yara_rules_dir: répertoire contenant les fichiers .yar
"""
self.rules = self._compile_rules(yara_rules_dir)
self.results = []
def _compile_rules(self, rules_dir: str) -> yara.Rules:
"""Compile toutes les règles YARA du répertoire."""
rule_files = {}
for yar_file in Path(rules_dir).glob("*.yar"):
rule_files[yar_file.stem] = str(yar_file)
if not rule_files:
raise FileNotFoundError(
f"Aucune règle YARA dans {rules_dir}"
)
print(f"[*] Compilation de {len(rule_files)} fichiers YARA...")
return yara.compile(filepaths=rule_files)
def _calculate_hashes(self, data: bytes) -> Dict[str, str]:
return {
'md5': hashlib.md5(data).hexdigest(),
'sha1': hashlib.sha1(data).hexdigest(),
'sha256': hashlib.sha256(data).hexdigest()
}
def _calculate_entropy(self, data: bytes) -> float:
if not data:
return 0.0
counts = [0] * 256
for byte in data:
counts[byte] += 1
entropy = 0.0
for count in counts:
if count > 0:
p = count / len(data)
entropy -= p * math.log2(p)
return round(entropy, 4)
def _analyze_pe(self, filepath: str) -> Dict:
"""Analyse approfondie de la structure PE."""
try:
pe = pefile.PE(filepath)
except pefile.PEFormatError:
return {'error': 'Invalid PE'}
sections = []
for section in pe.sections:
name = section.Name.decode('utf-8', errors='ignore').rstrip('\x00')
data = section.get_data()
sections.append({
'name': name,
'virtual_size': section.Misc_VirtualSize,
'raw_size': section.SizeOfRawData,
'entropy': self._calculate_entropy(data),
'executable': bool(section.Characteristics & 0x20000000),
'writable': bool(section.Characteristics & 0x80000000),
'size_ratio': (section.Misc_VirtualSize /
max(section.SizeOfRawData, 1))
})
imports = []
if hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'):
for entry in pe.DIRECTORY_ENTRY_IMPORT:
dll_name = entry.dll.decode('utf-8', errors='ignore')
funcs = [imp.name.decode('utf-8', errors='ignore')
for imp in entry.imports
if imp.name]
imports.append({'dll': dll_name, 'functions': funcs})
return {
'sections': sections,
'imports': imports,
'entry_point': hex(pe.OPTIONAL_HEADER.AddressOfEntryPoint),
'image_base': hex(pe.OPTIONAL_HEADER.ImageBase),
'is_dll': bool(pe.FILE_HEADER.Characteristics & 0x2000),
'is_dotnet': any(
entry.dll.lower() == b'mscoree.dll'
for entry in getattr(pe, 'DIRECTORY_ENTRY_IMPORT', [])
),
'compile_timestamp': datetime.utcfromtimestamp(
pe.FILE_HEADER.TimeDateStamp
).isoformat() if pe.FILE_HEADER.TimeDateStamp else None,
'num_sections': len(pe.sections),
}
def _classify_obfuscation(self, yara_matches: list,
pe_info: Dict) -> Dict:
"""Classifie le type et le niveau d'obfuscation."""
classification = {
'level': 'none', # none, low, medium, high, critical
'types': [],
'recommended_tools': [],
'estimated_effort': 'minimal'
}
categories = set()
for match in yara_matches:
meta = match.meta
categories.add(meta.get('category', 'unknown'))
# Niveau basé sur les catégories détectées
if 'protector' in categories:
classification['level'] = 'critical'
classification['types'].append('virtualisation')
classification['recommended_tools'].extend([
'x64dbg + Scylla', 'VMProtect devirt',
'Exécution symbolique (angr)'
])
classification['estimated_effort'] = 'jours'
elif 'packer' in categories:
classification['level'] = 'high'
classification['types'].append('packing')
classification['recommended_tools'].extend([
'UPX -d', 'PE-bear', 'Detect It Easy'
])
classification['estimated_effort'] = 'heures'
elif 'obfuscator' in categories:
classification['level'] = 'high'
classification['types'].append('obfuscation_code')
classification['recommended_tools'].extend([
'de4dot', 'dnSpy', 'ILSpy'
])
classification['estimated_effort'] = 'heures'
elif 'polymorphic' in categories:
classification['level'] = 'medium'
classification['types'].append('polymorphisme')
classification['recommended_tools'].extend([
'Capstone', 'Triton', 'angr'
])
classification['estimated_effort'] = 'heures'
# Heuristiques supplémentaires basées sur l'analyse PE
if pe_info.get('sections'):
high_entropy_sections = [
s for s in pe_info['sections']
if s['entropy'] > 7.0
]
if high_entropy_sections and classification['level'] == 'none':
classification['level'] = 'medium'
classification['types'].append('entropy_anomaly')
# Sections avec taille virtuelle >> taille brute (unpacking)
expanded_sections = [
s for s in pe_info['sections']
if s['size_ratio'] > 5.0
]
if expanded_sections:
classification['types'].append('runtime_unpacking')
return classification
def analyze(self, filepath: str) -> Dict:
"""Analyse complète d'un échantillon."""
data = Path(filepath).read_bytes()
hashes = self._calculate_hashes(data)
print(f"\n{'='*60}")
print(f"[*] Analyse: {Path(filepath).name}")
print(f" SHA-256: {hashes['sha256']}")
print(f" Taille: {len(data):,} octets")
# YARA matching
yara_matches = self.rules.match(filepath)
print(f" Règles YARA: {len(yara_matches)} correspondance(s)")
for match in yara_matches:
severity = match.meta.get('severity', 'unknown')
print(f" - {match.rule} [{severity}]")
# Analyse PE
pe_info = self._analyze_pe(filepath)
# Classification
classification = self._classify_obfuscation(yara_matches, pe_info)
print(f" Niveau obfuscation: {classification['level'].upper()}")
print(f" Types: {', '.join(classification['types']) or 'aucun'}")
print(f" Effort estimé: {classification['estimated_effort']}")
if classification['recommended_tools']:
print(f" Outils recommandés:")
for tool in classification['recommended_tools']:
print(f" -> {tool}")
result = {
'filename': Path(filepath).name,
'hashes': hashes,
'size': len(data),
'file_entropy': self._calculate_entropy(data),
'yara_matches': [m.rule for m in yara_matches],
'pe_info': pe_info,
'classification': classification,
'timestamp': datetime.utcnow().isoformat()
}
self.results.append(result)
return result
def export_report(self, output_path: str):
"""Exporte le rapport complet en JSON."""
report = {
'pipeline': 'MalwareTriagePipeline',
'version': '1.0',
'date': datetime.utcnow().isoformat(),
'total_samples': len(self.results),
'results': self.results
}
Path(output_path).write_text(
json.dumps(report, indent=2, default=str)
)
print(f"\n[+] Rapport exporté: {output_path}")
if __name__ == "__main__":
import sys
if len(sys.argv) < 3:
print(f"Usage: {sys.argv[0]} ")
sys.exit(1)
pipeline = MalwareTriagePipeline(sys.argv[1])
samples_dir = Path(sys.argv[2])
for sample in sorted(samples_dir.iterdir()):
if sample.is_file():
try:
pipeline.analyze(str(sample))
except Exception as e:
print(f" [!] Erreur: {e}")
pipeline.export_report(
f"/opt/analysis/triage_{datetime.now():%Y%m%d_%H%M%S}.json"
)
9. Outils avancés : Ghidra, IDA Pro, Binary Ninja, radare2/rizin
9.1 Ghidra scripting pour la déobfuscation
Ghidra, le framework de rétro-ingénierie open-source développé par la NSA, offre un environnement de scripting puissant via Java et Python (Jython). Son décompilateur intégré et son moteur d'analyse P-Code en font un outil particulièrement adapté à la déobfuscation automatisée.
Le script suivant utilise l'API Ghidra pour identifier et résoudre automatiquement les prédicats opaques dans une fonction analysée :
# Ghidra Script: Détection et résolution de prédicats opaques
# @category Deobfuscation
# @keybinding
# @menupath Analysis.Deobfuscate Opaque Predicates
# @toolbar
"""
Script Ghidra pour la détection automatique de prédicats opaques
et la simplification du flux de contrôle.
Exécuter depuis le Script Manager de Ghidra ou via analyzeHeadless:
analyzeHeadless /path/to/project ProjectName \
-import sample.exe \
-postScript opaque_predicate_resolver.py
"""
from ghidra.program.model.block import BasicBlockModel
from ghidra.program.model.pcode import PcodeOp
from ghidra.app.decompiler import DecompInterface
from ghidra.util.task import ConsoleTaskMonitor
import ghidra.program.model.symbol.FlowType as FlowType
def get_function_at_cursor():
"""Récupère la fonction sous le curseur."""
fm = currentProgram.getFunctionManager()
func = fm.getFunctionContaining(currentAddress)
if func is None:
popup("Aucune fonction sous le curseur")
return None
return func
def analyze_opaque_predicates(func):
"""
Analyse les blocs de base d'une fonction et identifie
les prédicats opaques via l'analyse P-Code.
"""
block_model = BasicBlockModel(currentProgram)
blocks = block_model.getCodeBlocksContaining(
func.getBody(), monitor
)
opaque_count = 0
results = []
# Initialiser le décompilateur pour l'analyse sémantique
decomp = DecompInterface()
decomp.openProgram(currentProgram)
decomp_results = decomp.decompileFunction(func, 30, monitor)
if not decomp_results.decompileCompleted():
printerr("Échec de la décompilation")
return results
high_func = decomp_results.getHighFunction()
if high_func is None:
return results
# Parcourir les blocs P-Code
pcode_blocks = high_func.getBasicBlocks()
for pblock in pcode_blocks:
# Chercher les CBRANCH (conditional branch)
iterator = pblock.getIterator()
last_cbranch = None
while iterator.hasNext():
pcode_op = iterator.next()
if pcode_op.getOpcode() == PcodeOp.CBRANCH:
last_cbranch = pcode_op
if last_cbranch is None:
continue
# Analyser la condition du CBRANCH
condition = last_cbranch.getInput(1) # La condition booléenne
if condition is None:
continue
# Vérifier si la condition est une constante
# (= prédicat opaque résolu par le décompilateur)
defining_op = condition.getDef()
if defining_op is not None:
# Tenter d'évaluer à une constante
if condition.isConstant():
const_val = condition.getOffset()
branch_target = last_cbranch.getInput(0)
opaque_count += 1
entry = pblock.getStart()
result = {
'address': entry.toString(),
'always_true': const_val != 0,
'condition_pcode': str(defining_op),
}
results.append(result)
# Annoter dans Ghidra
listing = currentProgram.getListing()
code_unit = listing.getCodeUnitAt(entry)
if code_unit is not None:
direction = "ALWAYS TRUE" if const_val != 0 else "ALWAYS FALSE"
code_unit.setComment(
code_unit.PRE_COMMENT,
"[OPAQUE] Predicate: " + direction
)
decomp.dispose()
return results
def patch_opaque_branches(func, results):
"""
Patche les prédicats opaques en remplaçant les branchements
conditionnels par des JMP inconditionnels ou des NOP.
"""
from ghidra.program.model.mem import MemoryAccessException
patched = 0
for result in results:
addr = toAddr(result['address'])
insn = getInstructionAt(addr)
if insn is None:
continue
# Trouver l'instruction de branchement conditionnel
while insn is not None and not insn.getFlowType().isConditional():
insn = insn.getNext()
if insn is None:
continue
println(" Patching @ " + insn.getAddress().toString() +
": " + insn.toString())
# Si le prédicat est toujours vrai -> remplacer par JMP
# Si toujours faux -> remplacer par NOP
try:
if result['always_true']:
# Calculer le JMP relatif vers la cible
target = insn.getFlows()[0] if insn.getFlows() else None
if target:
println(" -> JMP vers " + target.toString())
patched += 1
else:
# NOP le branchement (il ne sera jamais pris)
insn_len = insn.getLength()
nops = bytearray([0x90] * insn_len)
currentProgram.getMemory().setBytes(
insn.getAddress(), bytes(nops)
)
println(" -> NOPped (" + str(insn_len) + " bytes)")
patched += 1
except MemoryAccessException as e:
printerr(" Erreur patch: " + str(e))
return patched
# Point d'entrée du script
func = get_function_at_cursor()
if func:
println("=" * 50)
println("Analyse de: " + func.getName() +
" @ " + func.getEntryPoint().toString())
println("Taille: " + str(func.getBody().getNumAddresses()) + " bytes")
println("=" * 50)
results = analyze_opaque_predicates(func)
println("\nPrédicats opaques détectés: " + str(len(results)))
for r in results:
println(" " + r['address'] + " -> " +
("ALWAYS TRUE" if r['always_true'] else "ALWAYS FALSE"))
if results and askYesNo("Patch",
"Patcher " + str(len(results)) + " prédicats opaques ?"):
patched = patch_opaque_branches(func, results)
println("\n" + str(patched) + " branchements patchés")
9.2 IDA Pro et IDAPython avancé
IDA Pro reste l'outil de référence pour l'analyse de binaires complexes. Son API IDAPython permet d'automatiser des tâches de déobfuscation avancées. Voici un script pour la détection et la reconstruction des appels indirects obfusqués :
"""
IDAPython: Résolution d'appels indirects obfusqués.
Identifie les patterns call [reg] où le registre est
calculé par une séquence d'opérations obfusquée, et
résout la cible réelle par émulation partielle.
"""
import ida_bytes
import ida_funcs
import ida_ua
import ida_idp
import idautils
import idc
def resolve_indirect_calls(func_ea):
"""
Parcourt une fonction et résout les appels indirects
dont la cible est calculée par constant folding.
"""
func = ida_funcs.get_func(func_ea)
if not func:
print(f"[-] Pas de fonction à 0x{func_ea:x}")
return
resolved = 0
for head in idautils.Heads(func.start_ea, func.end_ea):
insn = ida_ua.insn_t()
if ida_ua.decode_insn(insn, head) == 0:
continue
# Chercher les CALL indirects (call reg ou call [mem])
if insn.itype not in [ida_idp.NN_call, ida_idp.NN_callfi,
ida_idp.NN_callni]:
continue
op = insn.ops[0]
if op.type == ida_ua.o_reg:
# call reg -> remonter pour trouver la valeur
reg_name = ida_idp.get_reg_name(op.reg, 8)
target = _trace_register_value(head, op.reg, func)
if target and target != 0:
print(f" [+] 0x{head:x}: call {reg_name} "
f"-> 0x{target:x}")
# Ajouter un commentaire et une xref
idc.set_cmt(head, f"Résolu: call 0x{target:x}", False)
ida_bytes.add_cref(head, target, 0) # Code xref
resolved += 1
print(f"[*] {resolved} appels indirects résolus")
return resolved
def _trace_register_value(call_addr, reg, func):
"""
Remonte les instructions depuis call_addr pour
déterminer la valeur du registre par propagation
de constantes.
"""
# Parcourir les instructions précédentes (max 20)
addr = call_addr
value = None
operations = []
for _ in range(20):
addr = idc.prev_head(addr, func.start_ea)
if addr == idc.BADADDR or addr < func.start_ea:
break
insn = ida_ua.insn_t()
if ida_ua.decode_insn(insn, addr) == 0:
continue
# MOV reg, imm -> valeur directe
if (insn.itype == ida_idp.NN_mov and
insn.ops[0].type == ida_ua.o_reg and
insn.ops[0].reg == reg and
insn.ops[1].type == ida_ua.o_imm):
value = insn.ops[1].value
break
# ADD reg, imm
if (insn.itype == ida_idp.NN_add and
insn.ops[0].type == ida_ua.o_reg and
insn.ops[0].reg == reg and
insn.ops[1].type == ida_ua.o_imm):
operations.append(('add', insn.ops[1].value))
# SUB reg, imm
if (insn.itype == ida_idp.NN_sub and
insn.ops[0].type == ida_ua.o_reg and
insn.ops[0].reg == reg and
insn.ops[1].type == ida_ua.o_imm):
operations.append(('sub', insn.ops[1].value))
# XOR reg, imm
if (insn.itype == ida_idp.NN_xor and
insn.ops[0].type == ida_ua.o_reg and
insn.ops[0].reg == reg and
insn.ops[1].type == ida_ua.o_imm):
operations.append(('xor', insn.ops[1].value))
# Appliquer les opérations en ordre inverse
if value is not None:
for op, imm in reversed(operations):
if op == 'add':
value = (value + imm) & 0xFFFFFFFFFFFFFFFF
elif op == 'sub':
value = (value - imm) & 0xFFFFFFFFFFFFFFFF
elif op == 'xor':
value = value ^ imm
return value
return None
9.3 Comparaison des outils de reverse engineering
| Critère | Ghidra | IDA Pro | Binary Ninja | radare2/rizin |
|---|---|---|---|---|
| Licence | Open-source (Apache 2.0) | Commercial (~1700 EUR/an) | Commercial (~350 USD) | Open-source (LGPL3) |
| Décompilateur | Intégré (P-Code) | Hex-Rays (addon payant) | Intégré (HLIL/MLIL) | Via r2ghidra ou r2dec |
| Scripting | Java, Python (Jython) | Python (IDAPython) | Python, C++, Rust | r2pipe (Python, JS, etc.) |
| Architectures | ~30+ (x86, ARM, MIPS, PPC...) | ~20+ avec plugins | ~15+ officielles | ~30+ (très extensible) |
| Analyse headless | analyzeHeadless (excellent) | idat (limité) | binaryninja.MainThreadAction | Natif (CLI-first) |
| Déobfuscation | P-Code simplifié, bon CFG | microcode, le plus mature | BNIL, SSA, bon pour CFG | ESIL, basique mais extensible |
| Intégration pipeline | Excellente (Ghidrathon) | Bonne (IDAPython batch) | Très bonne (API Python) | Excellente (r2pipe) |
| Forces pour déobfuscation | P-Code normalise le code, décompilateur gratuit robuste | Microcode et Hex-Rays le plus puissant, écosystème de plugins | IL multi-niveaux (LLIL, MLIL, HLIL), API très propre | Léger, rapide, excellent pour l'automatisation CLI |
9.4 radare2/rizin : automatisation CLI
radare2 (et son fork rizin) excelle dans l'automatisation en ligne de commande et l'intégration dans des scripts shell. Voici un exemple d'analyse automatisée via r2pipe :
#!/usr/bin/env python3
"""
Analyse automatisée de malware obfusqué via r2pipe (radare2).
Extrait les chaînes, identifie les fonctions suspectes et
détecte les patterns d'obfuscation courants.
"""
import r2pipe
import json
from typing import Dict, List
def analyze_with_radare2(filepath: str) -> Dict:
"""Analyse complète d'un binaire avec radare2."""
r2 = r2pipe.open(filepath, flags=['-2']) # -2 = no stderr
r2.cmd('aaa') # Analyse complète
results = {
'info': json.loads(r2.cmd('ij')),
'sections': json.loads(r2.cmd('iSj')),
'imports': json.loads(r2.cmd('iij')),
'strings': [],
'suspicious_functions': [],
'crypto_constants': [],
'obfuscation_indicators': []
}
# Chercher les constantes cryptographiques
# (AES S-Box, RC4 init, etc.)
crypto_search = r2.cmd('/cr')
if crypto_search:
results['crypto_constants'] = crypto_search.strip().split('\n')
# Analyser l'entropie par section
for section in results['sections']:
name = section.get('name', '')
size = section.get('size', 0)
if size > 0:
paddr = section.get('paddr', 0)
entropy_cmd = f'ph entropy {size} @ {paddr}'
try:
entropy = float(r2.cmd(entropy_cmd).strip())
section['entropy'] = entropy
if entropy > 7.0:
results['obfuscation_indicators'].append({
'type': 'high_entropy_section',
'section': name,
'entropy': entropy
})
except (ValueError, TypeError):
pass
# Lister les fonctions et identifier les suspectes
functions = json.loads(r2.cmd('aflj'))
for func in functions:
fname = func.get('name', '')
fsize = func.get('size', 0)
nbbs = func.get('nbbs', 0) # Nombre de basic blocks
# Heuristiques de détection d'obfuscation
if nbbs > 50 and fsize < 500:
# Beaucoup de blocs pour une petite fonction = CFF probable
results['obfuscation_indicators'].append({
'type': 'possible_cff',
'function': fname,
'blocks': nbbs,
'size': fsize
})
# Fonctions avec noms suspects (anti-debug, crypto)
suspicious_keywords = [
'IsDebuggerPresent', 'NtQueryInformation',
'CheckRemoteDebugger', 'OutputDebugString',
'VirtualProtect', 'VirtualAlloc',
'Crypt', 'Encrypt', 'Decrypt'
]
for kw in suspicious_keywords:
if kw.lower() in fname.lower():
results['suspicious_functions'].append({
'name': fname,
'address': hex(func.get('offset', 0)),
'keyword': kw
})
# Extraire les chaînes intéressantes
strings_json = json.loads(r2.cmd('izj'))
for s in strings_json:
content = s.get('string', '')
if len(content) > 6:
results['strings'].append({
'value': content,
'address': hex(s.get('vaddr', 0)),
'section': s.get('section', ''),
'type': s.get('type', '')
})
r2.quit()
return results
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} ")
sys.exit(1)
results = analyze_with_radare2(sys.argv[1])
print(json.dumps(results, indent=2, default=str))
10. Conclusion : tendances émergentes et perspectives
10.1 L'obfuscation assistée par intelligence artificielle
L'intersection entre l'intelligence artificielle et l'obfuscation de malwares représente la prochaine frontière majeure. Les travaux de recherche récents démontrent l'utilisation de réseaux adversariaux génératifs (GAN) et de modèles de langage (LLM) pour générer des variantes de malwares capables d'échapper aux classifieurs basés sur le machine learning. Des preuves de concept comme DeepLocker (IBM Research) et MalGAN illustrent le potentiel de ces approches.
Les tendances observées incluent :
- Génération de code mort sémantiquement plausible : au lieu d'insérer des NOP ou des opérations trivialement détectables, les modèles d'IA génèrent du code mort qui ressemble à du code fonctionnel légitime, rendant la détection par analyse de patterns beaucoup plus difficile
- Optimisation adversariale des mutations : utilisation de techniques de reinforcement learning pour trouver les transformations d'obfuscation qui maximisent l'évasion des moteurs de détection tout en minimisant l'impact sur les performances
- Obfuscation contextuelle : adaptation dynamique des techniques d'obfuscation en fonction de l'environnement cible détecté (type d'EDR, version de l'OS, présence de sandbox)
En réponse, la communauté défensive développe des approches de déobfuscation assistée par IA, notamment l'utilisation de modèles de type transformer pour la prédiction de la sémantique de code obfusqué, et l'application de techniques de program synthesis pour la reconstruction automatique du code original à partir de traces d'exécution symbolique.
10.2 WebAssembly : le nouveau vecteur d'obfuscation
WebAssembly (Wasm) émerge comme un vecteur d'obfuscation particulièrement préoccupant. Initialement conçu pour l'exécution de code performant dans les navigateurs web, Wasm est de plus en plus utilisé comme couche d'obfuscation hors navigateur :
- Cryptominers Wasm : des malwares comme CoinHive (aujourd'hui disparu) et ses successeurs utilisent Wasm pour du minage de cryptomonnaies directement dans le navigateur, rendant l'analyse statique des pages web compromise plus complexe
- Wasm comme packer universel : la compilation de code C/C++ malveillant vers Wasm puis son exécution via des runtimes comme Wasmer ou Wasmtime crée une couche d'abstraction supplémentaire que les outils d'analyse PE/ELF traditionnels ne gèrent pas nativement
- Obfuscation du bytecode Wasm : les mêmes techniques d'obfuscation (CFF, prédicats opaques, MBA) sont applicables au bytecode Wasm, mais l'outillage d'analyse est beaucoup moins mature que pour x86/ARM
Les outils de déobfuscation Wasm comme wasm-decompile (de l'outil wabt), JEB (PNF Software), et les modules Wasm de Ghidra progressent rapidement mais restent en retard par rapport aux outils x86 matures.
10.3 Recommandations pour les analystes
Face à l'évolution constante des techniques d'obfuscation, les analystes de malwares doivent adopter une approche combinant :
- Maîtrise des fondamentaux : la compréhension profonde des architectures CPU (x86, ARM), des formats binaires (PE, ELF, Mach-O) et des systèmes d'exploitation reste indispensable. Aucun outil automatisé ne remplace cette expertise
- Automatisation intelligente : développer et maintenir des pipelines de triage combinant YARA, analyse PE, exécution symbolique et décompilation pour traiter le volume croissant d'échantillons
- Veille technique continue : suivre les publications de conférences comme REcon, Black Hat, DEF CON, SSTIC, et les travaux de recherche sur l'obfuscation (IEEE S&P, USENIX Security, CCS)
- Collaboration communautaire : partager les signatures YARA, les scripts de déobfuscation et les IOCs via des plateformes comme MISP, VirusTotal, MalwareBazaar et les ISACs sectoriels
- Formation continue : la déobfuscation est un domaine qui évolue aussi vite que les techniques offensives. L'investissement dans la formation (certifications GREM, FOR610, cours OpenSecurityTraining) est un impératif opérationnel
La course entre obfuscation et déobfuscation est fondamentalement asymétrique : l'attaquant n'a besoin que d'une seule technique fonctionnelle pour échapper à la détection, tandis que le défenseur doit couvrir l'ensemble de l'espace des transformations possibles. C'est pourquoi l'approche la plus résiliente combine analyse statique, analyse dynamique, exécution symbolique et intelligence sur les menaces en une stratégie de défense en profondeur.
Ressources recommandées
- Practical Malware Analysis - Sikorski & Honig (No Starch Press)
- The IDA Pro Book - Chris Eagle (No Starch Press)
- Ghidra Software Reverse Engineering for Beginners - A. Kabir
- angr documentation - docs.angr.io
- Triton documentation - triton-library.github.io
- YARA documentation - yara.readthedocs.io
- OALabs (YouTube) - Tutoriels pratiques de déobfuscation
- MalwareUnicorn RE101/RE102 - Cours gratuits de reverse engineering
Besoin d'un accompagnement expert ?
Nos consultants en cybersécurité et IA vous accompagnent dans vos projets. Devis personnalisé sous 24h.