initial aubox skeleton: web-UI, kirin DLOAD, firmware library
- FastAPI Web-UI auf 127.0.0.1:8080 mit Geräte-Live-Erkennung, sandboxed File-Browser, Firmware-Library (SQLite + Auto-Identifikation) - Huawei update.app Parser: extrahiert Hardware-ID, Section-Layout, BOOT/SYSTEM-Vorhandensein direkt aus den Headern - Kirin Download-Mode: hisi-idt-Protokoll-Implementation gegen pyusb - USB-Erkennung für Huawei (DLOAD/Fastboot-D), Google, MediaTek, Qualcomm EDL - Huawei-P10-Lite-Workflow (eRecovery + Testpoint-DLOAD-Pfade) - Docker-Compose mit USB-Passthrough (Major 189) für Re-Enumeration - udev-Regeln + Setup-Script für Debian/Ubuntu/Pi-OS Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,9 @@
|
||||
"""Firmware-Library: SQLite-Index + Format-Identifikation.
|
||||
|
||||
Öffentliche API:
|
||||
db.connect(path) -> sqlite3.Connection
|
||||
db.upsert_firmware(conn, record)
|
||||
db.list_firmware(conn, ...)
|
||||
identify.identify(path) -> FirmwareInfo | None
|
||||
"""
|
||||
from . import db, identify # noqa: F401
|
||||
@@ -0,0 +1,94 @@
|
||||
"""SQLite-Index der Firmware-Library.
|
||||
|
||||
Die DB-Datei wandert standardmäßig nach `<firmware_root>/firmware.db`,
|
||||
damit sie zur Library gehört und beim Mounten automatisch da ist.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
from typing import Iterator
|
||||
|
||||
SCHEMA = """
|
||||
CREATE TABLE IF NOT EXISTS firmware (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
rel_path TEXT NOT NULL UNIQUE, -- relativ zum firmware-Root
|
||||
size INTEGER NOT NULL,
|
||||
mtime REAL NOT NULL,
|
||||
sha256 TEXT, -- erst nach erstem Hash gefüllt
|
||||
format TEXT, -- 'huawei-update.app', 'mtk-scatter', 'unknown', ...
|
||||
vendor TEXT,
|
||||
model TEXT, -- z.B. 'WAS-LX1'
|
||||
soc TEXT, -- z.B. 'kirin-658'
|
||||
version TEXT, -- z.B. '8.0.0.367'
|
||||
region TEXT, -- z.B. 'C432'
|
||||
extra_json TEXT, -- alles weitere als JSON
|
||||
added_at REAL DEFAULT (strftime('%s','now')),
|
||||
last_seen_at REAL DEFAULT (strftime('%s','now'))
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_fw_model ON firmware(model);
|
||||
CREATE INDEX IF NOT EXISTS idx_fw_format ON firmware(format);
|
||||
"""
|
||||
|
||||
|
||||
def connect(db_path: Path) -> sqlite3.Connection:
|
||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.executescript(SCHEMA)
|
||||
return conn
|
||||
|
||||
|
||||
@contextmanager
|
||||
def open_db(db_path: Path) -> Iterator[sqlite3.Connection]:
|
||||
"""Connection-Lifecycle für FastAPI-Handler: öffnet, gibt zurück, schließt."""
|
||||
conn = connect(db_path)
|
||||
try:
|
||||
yield conn
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def transaction(conn: sqlite3.Connection) -> Iterator[sqlite3.Connection]:
|
||||
try:
|
||||
yield conn
|
||||
conn.commit()
|
||||
except Exception:
|
||||
conn.rollback()
|
||||
raise
|
||||
|
||||
|
||||
def upsert(conn: sqlite3.Connection, rec: dict) -> int:
|
||||
"""Einfügen oder bei Konflikt auf rel_path aktualisieren."""
|
||||
cols = ["rel_path", "size", "mtime", "sha256", "format",
|
||||
"vendor", "model", "soc", "version", "region", "extra_json"]
|
||||
placeholders = ",".join(f":{c}" for c in cols)
|
||||
update = ",".join(f"{c}=excluded.{c}" for c in cols if c != "rel_path")
|
||||
sql = (f"INSERT INTO firmware ({','.join(cols)}) VALUES ({placeholders}) "
|
||||
f"ON CONFLICT(rel_path) DO UPDATE SET {update}, "
|
||||
f"last_seen_at=strftime('%s','now')")
|
||||
cur = conn.execute(sql, {c: rec.get(c) for c in cols})
|
||||
return cur.lastrowid or 0
|
||||
|
||||
|
||||
def list_all(conn: sqlite3.Connection) -> list[sqlite3.Row]:
|
||||
return list(conn.execute(
|
||||
"SELECT * FROM firmware ORDER BY vendor, model, version"
|
||||
))
|
||||
|
||||
|
||||
def get_by_id(conn: sqlite3.Connection, fw_id: int) -> sqlite3.Row | None:
|
||||
cur = conn.execute("SELECT * FROM firmware WHERE id = ?", (fw_id,))
|
||||
return cur.fetchone()
|
||||
|
||||
|
||||
def delete_missing(conn: sqlite3.Connection, present_rel_paths: set[str]) -> int:
|
||||
"""Einträge entfernen, die beim letzten Scan nicht mehr da waren."""
|
||||
cur = conn.execute("SELECT id, rel_path FROM firmware")
|
||||
to_delete = [row["id"] for row in cur if row["rel_path"] not in present_rel_paths]
|
||||
if to_delete:
|
||||
conn.executemany("DELETE FROM firmware WHERE id = ?",
|
||||
[(i,) for i in to_delete])
|
||||
return len(to_delete)
|
||||
@@ -0,0 +1,148 @@
|
||||
"""Parser für Huawei `update.app`-Container.
|
||||
|
||||
Format-Beschreibung (öffentlich aus splituapp / Huawei Update Extractor):
|
||||
|
||||
Datei beginnt mit 0x5C Bytes Padding/Header, danach folgen Section-Header.
|
||||
Jeder Section-Header ist 98 Bytes:
|
||||
|
||||
uint32 magic = 0xA55AAA55
|
||||
uint32 header_length = 98
|
||||
uint32 unknown1
|
||||
char[8] hardware_id z.B. "WAS-LX1"
|
||||
uint32 file_sequence
|
||||
uint32 file_size
|
||||
char[16] file_date z.B. "2018.04.16"
|
||||
char[16] file_time z.B. "10:23:45"
|
||||
char[16] file_type z.B. "BOOT", "SYSTEM", "USERDATA", "CRC"
|
||||
char[16] blank1
|
||||
uint16 header_checksum
|
||||
uint16 block_size
|
||||
uint16 blank2
|
||||
|
||||
Danach `file_size` Bytes Section-Daten, gefolgt von einer CRC-Tabelle
|
||||
(eine uint16 pro `block_size` Bytes Daten), dann 4-Byte-Alignment auf
|
||||
die nächste Section-Header-Position.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import struct
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
UPDATE_MAGIC = 0xA55AAA55
|
||||
HEADER_FMT = "<III8sII16s16s16s16sHHH"
|
||||
HEADER_SIZE = 98
|
||||
|
||||
|
||||
@dataclass
|
||||
class Section:
|
||||
sequence: int
|
||||
size: int
|
||||
date: str
|
||||
time: str
|
||||
type: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class UpdateAppInfo:
|
||||
hardware_id: str
|
||||
section_count: int
|
||||
sections: list[Section] = field(default_factory=list)
|
||||
has_boot: bool = False
|
||||
has_system: bool = False
|
||||
earliest_date: str | None = None
|
||||
|
||||
|
||||
def _cstr(b: bytes) -> str:
|
||||
return b.split(b"\x00", 1)[0].decode("ascii", errors="replace").strip()
|
||||
|
||||
|
||||
def parse(path: Path, max_sections: int = 500) -> UpdateAppInfo | None:
|
||||
"""Header der ersten N Sections einlesen, ohne das ganze Image zu lesen."""
|
||||
if not path.is_file():
|
||||
return None
|
||||
if path.stat().st_size < 0x100:
|
||||
return None
|
||||
|
||||
with path.open("rb") as f:
|
||||
# Magic suchen — sitzt meist bei 0x5C, manche Builds variieren
|
||||
head = f.read(0x200)
|
||||
idx = head.find(struct.pack("<I", UPDATE_MAGIC))
|
||||
if idx < 0:
|
||||
return None
|
||||
f.seek(idx)
|
||||
|
||||
sections: list[Section] = []
|
||||
hardware_id = ""
|
||||
earliest: str | None = None
|
||||
has_boot = has_system = False
|
||||
|
||||
for _ in range(max_sections):
|
||||
buf = f.read(HEADER_SIZE)
|
||||
if len(buf) < HEADER_SIZE:
|
||||
break
|
||||
(magic, _hlen, _u, hw, seq, size,
|
||||
date, time_, ftype, _b, _crc, blksz, _b2) = struct.unpack(HEADER_FMT, buf)
|
||||
if magic != UPDATE_MAGIC:
|
||||
break
|
||||
|
||||
hw_str = _cstr(hw)
|
||||
type_str = _cstr(ftype)
|
||||
date_str = _cstr(date)
|
||||
time_str = _cstr(time_)
|
||||
|
||||
if not hardware_id and hw_str:
|
||||
hardware_id = hw_str
|
||||
if earliest is None or (date_str and date_str < earliest):
|
||||
if date_str:
|
||||
earliest = date_str
|
||||
if type_str.upper() in ("BOOT", "FASTBOOT", "XLOADER", "BOOTLOADER"):
|
||||
has_boot = True
|
||||
if type_str.upper() in ("SYSTEM", "SYSTEM_IMAGE"):
|
||||
has_system = True
|
||||
|
||||
sections.append(Section(
|
||||
sequence=seq, size=size, date=date_str,
|
||||
time=time_str, type=type_str,
|
||||
))
|
||||
|
||||
# Daten + CRC + Padding überspringen
|
||||
if blksz <= 0:
|
||||
break
|
||||
crc_size = ((size + blksz - 1) // blksz) * 2
|
||||
cur_after_data = f.tell() + size + crc_size
|
||||
pad = (-cur_after_data) % 4
|
||||
f.seek(size + crc_size + pad, 1)
|
||||
|
||||
if not sections:
|
||||
return None
|
||||
return UpdateAppInfo(
|
||||
hardware_id=hardware_id,
|
||||
section_count=len(sections),
|
||||
sections=sections,
|
||||
has_boot=has_boot,
|
||||
has_system=has_system,
|
||||
earliest_date=earliest,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Modell -> SoC-Mapping. Wird mit der Zeit erweitert; was hier nicht steht,
|
||||
# kommt als 'unknown' raus und der User kann es manuell pflegen.
|
||||
HUAWEI_MODEL_SOC = {
|
||||
"WAS-LX1": "kirin-658",
|
||||
"WAS-LX2": "kirin-658",
|
||||
"WAS-LX3": "kirin-658",
|
||||
"PRA-LX1": "kirin-655", # P8 Lite 2017
|
||||
"VTR-L09": "kirin-960", # P10
|
||||
"VTR-L29": "kirin-960",
|
||||
"VKY-L09": "kirin-960", # P10 Plus
|
||||
"EML-L09": "kirin-970", # P20
|
||||
"EML-L29": "kirin-970",
|
||||
"CLT-L29": "kirin-970", # P20 Pro
|
||||
"ANE-LX1": "kirin-710", # P20 Lite
|
||||
}
|
||||
|
||||
|
||||
def soc_for(hardware_id: str) -> str | None:
|
||||
return HUAWEI_MODEL_SOC.get(hardware_id.upper())
|
||||
@@ -0,0 +1,92 @@
|
||||
"""Format-Erkennung. Dispatcher: schaut auf Magic + Dateinamen, leitet an
|
||||
den passenden Parser weiter. Liefert ein einheitliches Dict mit den Feldern,
|
||||
die in die DB gehen.
|
||||
|
||||
Aktuell unterstützt:
|
||||
- Huawei update.app (vollständig)
|
||||
|
||||
Geplant (Stubs als Hinweis):
|
||||
- MediaTek scatter (txt + bin)
|
||||
- Samsung Odin (.tar.md5 mit AP/BL/CP/CSC)
|
||||
- Qualcomm rawprogram*.xml
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import struct
|
||||
from pathlib import Path
|
||||
|
||||
from . import huawei
|
||||
|
||||
HUAWEI_MAGIC = struct.pack("<I", huawei.UPDATE_MAGIC)
|
||||
|
||||
|
||||
def _peek(path: Path, n: int = 0x200) -> bytes:
|
||||
try:
|
||||
with path.open("rb") as f:
|
||||
return f.read(n)
|
||||
except OSError:
|
||||
return b""
|
||||
|
||||
|
||||
def identify(path: Path, root: Path) -> dict:
|
||||
"""Rückgabe: dict für db.upsert. Enthält mindestens rel_path, size, mtime,
|
||||
format. Erkennungs-Misserfolg -> format='unknown'."""
|
||||
rel = str(path.resolve().relative_to(root.resolve())).replace("\\", "/")
|
||||
stat = path.stat()
|
||||
base: dict = {
|
||||
"rel_path": rel,
|
||||
"size": stat.st_size,
|
||||
"mtime": stat.st_mtime,
|
||||
"sha256": None,
|
||||
"format": "unknown",
|
||||
"vendor": None,
|
||||
"model": None,
|
||||
"soc": None,
|
||||
"version": None,
|
||||
"region": None,
|
||||
"extra_json": None,
|
||||
}
|
||||
|
||||
head = _peek(path)
|
||||
|
||||
# Huawei update.app — Magic taucht innerhalb der ersten 0x100 Bytes auf
|
||||
if HUAWEI_MAGIC in head[:0x200]:
|
||||
info = huawei.parse(path)
|
||||
if info is not None:
|
||||
base.update(
|
||||
format="huawei-update.app",
|
||||
vendor="Huawei",
|
||||
model=info.hardware_id or None,
|
||||
soc=huawei.soc_for(info.hardware_id) if info.hardware_id else None,
|
||||
version=info.earliest_date, # bessere Quelle wenn verfügbar
|
||||
region=_region_from_filename(path.name),
|
||||
extra_json=json.dumps({
|
||||
"section_count": info.section_count,
|
||||
"has_boot": info.has_boot,
|
||||
"has_system": info.has_system,
|
||||
"first_sections": [
|
||||
s.type for s in info.sections[:10]
|
||||
],
|
||||
}),
|
||||
)
|
||||
return base
|
||||
|
||||
# Fallback: Filename-Heuristik (MTK scatter etc. — TODO)
|
||||
name = path.name.lower()
|
||||
if name.startswith("mt") and name.endswith(".txt"):
|
||||
base["format"] = "mtk-scatter-candidate"
|
||||
elif name.endswith(".tar.md5"):
|
||||
base["format"] = "samsung-odin-candidate"
|
||||
base["vendor"] = "Samsung"
|
||||
|
||||
return base
|
||||
|
||||
|
||||
def _region_from_filename(name: str) -> str | None:
|
||||
"""Huawei-Region oft im Filename: '...C432B198...' -> 'C432'."""
|
||||
upper = name.upper()
|
||||
for marker in ("C432", "C636", "C185", "C233", "C605", "C10", "C461"):
|
||||
if marker in upper:
|
||||
return marker
|
||||
return None
|
||||
Reference in New Issue
Block a user