#!/usr/bin/env python3
"""Docker Backup Web UI — Flask application: auth, config, and app bootstrap."""
import json
import os
import secrets
import sys
import tarfile
import threading
import time
import uuid
from datetime import datetime
from functools import wraps
from pathlib import Path

import docker
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from docker.errors import NotFound, APIError
from flask import Flask, Response, jsonify, render_template, request, session, stream_with_context
from werkzeug.security import check_password_hash, generate_password_hash

sys.path.insert(0, str(Path(__file__).parent.parent))

app = Flask(__name__)

# ── Secret key (stable across restarts) ──────────────────────────────────────
# Persist the session-signing key next to the app so sessions survive restarts.
_KEY_FILE = Path(__file__).parent / ".secret_key"
if _KEY_FILE.exists():
    app.secret_key = _KEY_FILE.read_text().strip()
else:
    _key = secrets.token_hex(32)
    _KEY_FILE.write_text(_key)
    app.secret_key = _key

CONFIG_FILE = Path(__file__).parent / "config.json"
DEFAULT_CONFIG = {
    "hosts": ["local"],
    "backup_dir": str(Path(__file__).parent / "backups"),
    "auth": {},
    "schedules": [],
}

# In-memory job registry: job id -> {id, label, status, logs, created}.
# Guarded by jobs_lock; worker threads append logs while SSE readers poll.
jobs: dict[str, dict] = {}
jobs_lock = threading.Lock()

# Friendly names accepted wherever a 5-field cron expression is expected.
CRON_ALIASES = {
    "hourly": "0 * * * *",
    "daily": "0 2 * * *",
    "weekly": "0 2 * * 0",
    "monthly": "0 2 1 * *",
}


# ── Config ────────────────────────────────────────────────────────────────────
def load_config() -> dict:
    """Read config.json, back-filling any keys missing from older files."""
    if CONFIG_FILE.exists():
        data = json.loads(CONFIG_FILE.read_text())
        for k, v in DEFAULT_CONFIG.items():
            data.setdefault(k, v)
        return data
    return DEFAULT_CONFIG.copy()


def save_config(cfg: dict):
    """Persist the full config dict to config.json (pretty-printed)."""
    CONFIG_FILE.write_text(json.dumps(cfg, indent=2))


# ── Auth ──────────────────────────────────────────────────────────────────────
def auth_enabled() -> bool:
    """True once a password hash has been configured via /api/auth/setup."""
    return bool(load_config().get("auth", {}).get("password_hash"))


def login_required(f):
    """Decorator: reject unauthenticated requests with a JSON 401 when auth is on.

    FIX: the original branched on `request.is_json or request.path.startswith("/api/")`
    but both branches returned the identical JSON 401 — dead conditional removed.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        if auth_enabled() and not session.get("authenticated"):
            return jsonify({"error": "unauthorized"}), 401
        return f(*args, **kwargs)
    return decorated


@app.route("/api/auth/status")
def api_auth_status():
    """Report whether auth is configured and whether this session is logged in."""
    return jsonify({
        "enabled": auth_enabled(),
        "authenticated": session.get("authenticated", False),
        "username": load_config().get("auth", {}).get("username", ""),
    })


@app.route("/api/auth/login", methods=["POST"])
def api_auth_login():
    """Validate username + password against the stored hash; mark session authed."""
    data = request.json or {}
    cfg = load_config()
    auth = cfg.get("auth", {})
    if not auth.get("password_hash"):
        return jsonify({"error": "Auth not configured"}), 400
    if data.get("username") != auth.get("username"):
        return jsonify({"error": "Invalid credentials"}), 401
    if not check_password_hash(auth["password_hash"], data.get("password", "")):
        return jsonify({"error": "Invalid credentials"}), 401
    session["authenticated"] = True
    session.permanent = True
    return jsonify({"ok": True})


@app.route("/api/auth/logout", methods=["POST"])
def api_auth_logout():
    """Drop the session entirely."""
    session.clear()
    return jsonify({"ok": True})


@app.route("/api/auth/setup", methods=["POST"])
def api_auth_setup():
    """Create or replace the auth credentials.

    If auth is already configured, the caller must already be authenticated;
    otherwise anyone may claim the fresh install.
    """
    cfg = load_config()
    auth = cfg.get("auth", {})
    data = request.json or {}
    if auth.get("password_hash"):
        if not session.get("authenticated"):
            return jsonify({"error": "unauthorized"}), 401
    username = data.get("username", "").strip()
    password = data.get("password", "")
    if not username or not password:
        return jsonify({"error": "username and password required"}), 400
    if len(password) < 6:
        return jsonify({"error": "password must be at least 6 characters"}), 400
    cfg["auth"] = {
        "username": username,
        "password_hash": generate_password_hash(password),
    }
    save_config(cfg)
    session["authenticated"] = True
    return jsonify({"ok": True})


@app.route("/api/auth/disable", methods=["POST"])
@login_required
def api_auth_disable():
    """Turn authentication off and log the current session out."""
    cfg = load_config()
    cfg["auth"] = {}
    save_config(cfg)
    session.clear()
    return jsonify({"ok": True})
# ── Job helpers ───────────────────────────────────────────────────────────────
def new_job(label: str) -> str:
    """Register a new in-memory job and return its 8-char id."""
    jid = str(uuid.uuid4())[:8]
    with jobs_lock:
        jobs[jid] = {"id": jid, "label": label, "status": "running",
                     "logs": [], "created": datetime.now().isoformat()}
    return jid


def job_log(jid: str, msg: str):
    """Append a log line to a job; silently ignores unknown job ids."""
    with jobs_lock:
        if jid in jobs:
            jobs[jid]["logs"].append(msg)


def job_done(jid: str, error: str | None = None):
    """Finish a job. The trailing '[DONE]' sentinel terminates SSE streams."""
    with jobs_lock:
        if jid in jobs:
            jobs[jid]["status"] = "error" if error else "done"
            if error:
                jobs[jid]["logs"].append(f"ERROR: {error}")
            jobs[jid]["logs"].append("[DONE]")


# ── Docker client helper ──────────────────────────────────────────────────────
def make_client(host: str) -> docker.DockerClient:
    """Build a Docker client for 'local', an ssh:///tcp:// URL, or bare host:port."""
    if host in ("local", ""):
        return docker.from_env()
    url = host if "://" in host else f"tcp://{host}"
    return docker.DockerClient(base_url=url, use_ssh_client=host.startswith("ssh://"))


# ── Retention ─────────────────────────────────────────────────────────────────
def apply_retention(backup_dir: Path, container_name: str,
                    keep_count: int | None, keep_days: int | None) -> int:
    """Delete old backups for a container by count and/or age.

    Returns the number of files removed. Deletion is best-effort.
    """
    safe = container_name.lstrip("/").replace("/", "_")
    # Newest first so files[keep_count:] are the expendable tail.
    files = sorted(backup_dir.glob(f"{safe}_*.tar"),
                   key=lambda x: x.stat().st_mtime, reverse=True)
    to_delete: set[Path] = set()
    if keep_days and keep_days > 0:
        cutoff = time.time() - keep_days * 86400
        to_delete.update(f for f in files if f.stat().st_mtime < cutoff)
    if keep_count and keep_count > 0:
        to_delete.update(files[keep_count:])
    for f in to_delete:
        try:
            f.unlink()
        except Exception:
            pass  # best-effort cleanup; never fail the backup over retention
    return len(to_delete)


# ── Backup worker ─────────────────────────────────────────────────────────────
def _run_backup(jid: str, host: str, container_name: str, backup_dir: str,
                save_image: bool, pre_hook: str = "",
                retention_count: int | None = None,
                retention_days: int | None = None):
    """Worker thread: archive a container's config, mounts, and optionally image.

    Produces <backup_dir>/<name>_<timestamp>.tar containing config.json,
    volumes/<label>.tar entries, and optionally image.tar. Errors are reported
    through job_done(); this function never raises.
    """
    from io import BytesIO
    try:
        client = make_client(host)
        client.ping()
        job_log(jid, f"Connected to {host}")
        try:
            container = client.containers.get(container_name)
        except NotFound:
            job_done(jid, f"Container not found: {container_name}")
            return

        # Optional pre-hook inside the container (e.g. a DB dump); failures
        # are logged but do not abort the backup.
        if pre_hook and pre_hook.strip():
            job_log(jid, f"Running pre-hook: {pre_hook}")
            try:
                result = container.exec_run(["sh", "-c", pre_hook], stdout=True, stderr=True)
                output = result.output.decode(errors="replace").strip()
                if result.exit_code != 0:
                    job_log(jid, f"⚠ Pre-hook exited {result.exit_code}: {output}")
                else:
                    job_log(jid, f"✓ Pre-hook OK{': ' + output[:200] if output else ''}")
            except Exception as e:
                job_log(jid, f"⚠ Pre-hook failed: {e}")

        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_name = container.name.lstrip("/").replace("/", "_")
        out_file = Path(backup_dir) / f"{safe_name}_{ts}.tar"
        Path(backup_dir).mkdir(parents=True, exist_ok=True)
        inspect = client.api.inspect_container(container.id)
        job_log(jid, f"Backing up: {container.name}")

        with tarfile.open(out_file, "w") as tar:
            # Full inspect output doubles as the restore manifest.
            config_bytes = json.dumps(inspect, indent=2).encode()
            info = tarfile.TarInfo(name="config.json")
            info.size = len(config_bytes)
            info.mtime = int(time.time())
            tar.addfile(info, BytesIO(config_bytes))
            job_log(jid, "✓ config.json")

            for mount in inspect.get("Mounts", []):
                mtype = mount.get("Type", "")
                if mtype not in ("volume", "bind"):
                    continue
                dest = mount.get("Destination", "")
                if mtype == "volume":
                    label = mount.get("Name") or dest.replace("/", "_")
                else:
                    label = mount.get("Source", "").replace("/", "_").lstrip("_")
                archive_name = f"volumes/{label}.tar"
                job_log(jid, f"Archiving volume: {label} ({dest})")
                try:
                    # get_archive streams a tar of the path inside the container.
                    stream, _ = client.api.get_archive(container.id, dest)
                    vol_data = b"".join(stream)
                    vi = tarfile.TarInfo(name=archive_name)
                    vi.size = len(vol_data)
                    vi.mtime = int(time.time())
                    tar.addfile(vi, BytesIO(vol_data))
                    job_log(jid, f"✓ {archive_name} ({len(vol_data)//1024} KB)")
                except Exception as e:
                    job_log(jid, f"⚠ Could not archive {label}: {e}")

            if save_image:
                image_tag = inspect["Config"].get("Image", "")
                job_log(jid, f"Saving image: {image_tag}…")
                try:
                    img_data = b"".join(client.images.get(image_tag).save())
                    ii = tarfile.TarInfo(name="image.tar")
                    ii.size = len(img_data)
                    ii.mtime = int(time.time())
                    tar.addfile(ii, BytesIO(img_data))
                    job_log(jid, f"✓ image.tar ({len(img_data)//1024//1024} MB)")
                except Exception as e:
                    job_log(jid, f"⚠ Image save failed: {e}")

        job_log(jid, f"✓ Saved: {out_file.name}")

        if retention_count or retention_days:
            deleted = apply_retention(Path(backup_dir), container.name,
                                      retention_count, retention_days)
            if deleted:
                job_log(jid, f"Retention: removed {deleted} old backup(s)")
        job_done(jid)
    except Exception as e:
        job_done(jid, str(e))


# ── Restore worker ────────────────────────────────────────────────────────────
def _run_restore(jid: str, host: str, backup_path: str, new_name: str | None,
                 start: bool, load_image: bool):
    """Worker thread: recreate a container (volumes, config, optional image) from a backup tar.

    Errors are reported through job_done(); this function never raises.
    """
    from io import BytesIO
    try:
        client = make_client(host)
        client.ping()
        job_log(jid, f"Connected to {host}")
        bp = Path(backup_path)
        if not bp.exists():
            job_done(jid, f"File not found: {backup_path}")
            return

        with tarfile.open(bp, "r") as tar:
            config = json.loads(tar.extractfile("config.json").read())
        orig_name = config["Name"].lstrip("/")
        container_name = new_name or orig_name
        image_name = config["Config"]["Image"]
        job_log(jid, f"Restoring '{orig_name}' → '{container_name}' (image: {image_name})")

        if load_image:
            with tarfile.open(bp, "r") as tar:
                try:
                    img_data = tar.extractfile("image.tar").read()
                    job_log(jid, "Loading image from backup…")
                    client.images.load(img_data)
                    job_log(jid, "✓ Image loaded")
                except KeyError:  # no image.tar member in the archive
                    job_log(jid, "⚠ No image.tar in backup — will pull")

        try:
            client.images.get(image_name)
            job_log(jid, f"✓ Image {image_name} present")
        except NotFound:
            job_log(jid, f"Pulling {image_name}…")
            client.images.pull(image_name)
            job_log(jid, f"✓ Pulled {image_name}")

        # A name clash would make create() fail, so drop any existing container.
        try:
            existing = client.containers.get(container_name)
            job_log(jid, f"Removing existing container {container_name}")
            existing.remove(force=True)
        except NotFound:
            pass

        mounts = config.get("Mounts", [])

        # Pre-read every volumes/<label>.tar payload from the backup.
        vol_data_map: dict[str, bytes] = {}
        with tarfile.open(bp, "r") as tar:
            for m in tar.getmembers():
                if m.name.startswith("volumes/") and m.name.endswith(".tar"):
                    label = m.name[len("volumes/"):-len(".tar")]
                    vol_data_map[label] = tar.extractfile(m).read()

        for mount in mounts:
            if mount.get("Type") != "volume":
                continue
            vol_name = mount.get("Name", "")
            dest = mount.get("Destination", "")
            label = vol_name or dest.replace("/", "_")
            try:
                client.volumes.get(vol_name)
                job_log(jid, f"Volume {vol_name} exists")
            except NotFound:
                client.volumes.create(name=vol_name)
                job_log(jid, f"✓ Created volume {vol_name}")
            if label in vol_data_map:
                job_log(jid, f"Restoring volume data: {vol_name}")
                try:
                    # FIX: the original did containers.run(detach=True) and
                    # put_archive afterwards — the helper raced the upload, and
                    # put_archive *extracts* its payload, so /tmp/vol.tar never
                    # existed. Now: create the helper stopped, upload the inner
                    # archive wrapped as a literal file "vol.tar", then start.
                    rc = client.containers.create(
                        "alpine",
                        command="sh -c 'cd /t && tar xf /tmp/vol.tar --strip-components=1'",
                        volumes={vol_name: {"bind": "/t", "mode": "rw"}},
                    )
                    wrapper = BytesIO()
                    with tarfile.open(fileobj=wrapper, mode="w") as wtar:
                        wi = tarfile.TarInfo(name="vol.tar")
                        wi.size = len(vol_data_map[label])
                        wi.mtime = int(time.time())
                        wtar.addfile(wi, BytesIO(vol_data_map[label]))
                    client.api.put_archive(rc.id, "/tmp", wrapper.getvalue())
                    rc.start()
                    rc.wait()
                    rc.remove()
                    job_log(jid, f"✓ Volume restored: {vol_name}")
                except Exception as e:
                    job_log(jid, f"⚠ Volume restore failed {vol_name}: {e}")

        # Rebuild create() kwargs from the saved inspect data.
        cfg_c = config["Config"]
        hcfg = config["HostConfig"]
        kwargs: dict = {
            "image": image_name,
            "name": container_name,
            "command": cfg_c.get("Cmd"),
            "entrypoint": cfg_c.get("Entrypoint"),
            "environment": cfg_c.get("Env") or [],
            "working_dir": cfg_c.get("WorkingDir") or None,
            "user": cfg_c.get("User") or None,
            "hostname": cfg_c.get("Hostname") or None,
            "labels": cfg_c.get("Labels") or {},
            "tty": cfg_c.get("Tty", False),
            "stdin_open": cfg_c.get("OpenStdin", False),
        }

        exposed = cfg_c.get("ExposedPorts") or {}
        port_bindings = hcfg.get("PortBindings") or {}
        if exposed or port_bindings:
            ports = {}
            for pp in exposed:
                binds = port_bindings.get(pp)
                ports[pp] = [b["HostPort"] for b in binds if b.get("HostPort")] if binds else None
            kwargs["ports"] = ports

        restart = hcfg.get("RestartPolicy", {})
        if restart.get("Name"):
            kwargs["restart_policy"] = {"Name": restart["Name"],
                                        "MaximumRetryCount": restart.get("MaximumRetryCount", 0)}

        net_mode = hcfg.get("NetworkMode", "default")
        if net_mode not in ("default", "bridge"):
            kwargs["network_mode"] = net_mode

        vol_binds = []
        for m in mounts:
            dest = m.get("Destination", "")
            mode = "rw" if m.get("RW") is not False else "ro"
            if m.get("Type") == "volume":
                vol_binds.append(f"{m['Name']}:{dest}:{mode}")
            elif m.get("Type") == "bind":
                vol_binds.append(f"{m['Source']}:{dest}:{mode}")
        if vol_binds:
            kwargs["volumes"] = vol_binds
        if hcfg.get("Privileged"):
            kwargs["privileged"] = True
        if hcfg.get("CapAdd"):
            kwargs["cap_add"] = hcfg["CapAdd"]

        ctr = client.containers.create(**{k: v for k, v in kwargs.items() if v is not None})
        job_log(jid, f"✓ Container created: {ctr.name} ({ctr.short_id})")
        if start:
            ctr.start()
            job_log(jid, "✓ Container started")
        job_done(jid)
    except Exception as e:
        job_done(jid, str(e))


# ── Scheduler ─────────────────────────────────────────────────────────────────
scheduler = BackgroundScheduler(timezone="UTC")
scheduler.start()


def run_scheduled_backup(schedule_id: str) -> str | None:
    """Kick off the backup for a stored schedule; returns the job id, or None
    if the schedule is missing or disabled."""
    cfg = load_config()
    sched = next((s for s in cfg.get("schedules", []) if s["id"] == schedule_id), None)
    if not sched or not sched.get("enabled", True):
        return None
    backup_dir = cfg["backup_dir"]
    host = sched["host"]
    container = sched["container"]
    save_image = sched.get("save_image", False)
    pre_hook = sched.get("pre_hook", "")
    r_count = sched.get("retention_count") or None
    r_days = sched.get("retention_days") or None
    jid = new_job(f"[Scheduled] {container} on {host}")

    def worker():
        _run_backup(jid, host, container, backup_dir, save_image,
                    pre_hook=pre_hook, retention_count=r_count, retention_days=r_days)
        # FIX: read the shared jobs dict under its lock.
        with jobs_lock:
            status = jobs.get(jid, {}).get("status", "unknown")
        cfg2 = load_config()
        for s in cfg2.get("schedules", []):
            if s["id"] == schedule_id:
                s["last_run"] = datetime.now().isoformat()
                s["last_status"] = status
                break
        save_config(cfg2)

    threading.Thread(target=worker, daemon=True).start()
    return jid


def _cron_kwargs(expr: str) -> dict:
    """Translate a 5-field cron expression (or alias) into CronTrigger kwargs.

    Raises ValueError on a malformed expression.
    """
    expr = CRON_ALIASES.get(expr.lower(), expr)
    parts = expr.split()
    if len(parts) != 5:
        raise ValueError(f"Invalid cron: {expr}")
    minute, hour, day, month, dow = parts
    return dict(minute=minute, hour=hour, day=day, month=month, day_of_week=dow)


def _register_schedule(sched: dict):
    """Add/replace the APScheduler job backing a stored schedule (best-effort)."""
    try:
        kwargs = _cron_kwargs(sched["cron"])
        scheduler.add_job(run_scheduled_backup, CronTrigger(**kwargs),
                          id=sched["id"], args=[sched["id"]], replace_existing=True)
    except Exception as e:
        print(f"Warning: schedule {sched['id']} not registered: {e}")


def _unregister_schedule(schedule_id: str):
    """Remove an APScheduler job; missing jobs are ignored."""
    try:
        scheduler.remove_job(schedule_id)
    except Exception:
        pass


# Load existing schedules on startup
for _s in load_config().get("schedules", []):
    if _s.get("enabled", True):
        _register_schedule(_s)


# ── Routes: main page ─────────────────────────────────────────────────────────
@app.route("/")
def index():
    return render_template("index.html")


# ── Routes: config ────────────────────────────────────────────────────────────
@app.route("/api/config", methods=["GET", "POST"])
@login_required
def api_config():
    """GET returns config (sans auth); POST updates backup_dir."""
    if request.method == "POST":
        cfg = load_config()
        data = request.json or {}
        if "backup_dir" in data:
            cfg["backup_dir"] = data["backup_dir"]
        save_config(cfg)
        return jsonify(cfg)
    cfg = load_config()
    cfg.pop("auth", None)  # never send password hash to browser
    return jsonify(cfg)


# ── Routes: hosts ─────────────────────────────────────────────────────────────
@app.route("/api/hosts")
@login_required
def api_hosts_list():
    """Ping every configured host and report reachability + basic info."""
    cfg = load_config()
    result = []
    for host in cfg["hosts"]:
        try:
            c = make_client(host)
            c.ping()
            info = c.info()
            result.append({"host": host, "ok": True,
                           "name": info.get("Name", ""),
                           "version": info.get("ServerVersion", ""),
                           "containers": info.get("Containers", 0)})
        except Exception as e:
            result.append({"host": host, "ok": False, "error": str(e)})
    return jsonify(result)


@app.route("/api/hosts", methods=["POST"])
@login_required
def api_hosts_add():
    host = (request.json or {}).get("host", "").strip()
    if not host:
        return jsonify({"error": "host required"}), 400
    cfg = load_config()
    if host not in cfg["hosts"]:
        cfg["hosts"].append(host)
        save_config(cfg)
    return jsonify(cfg)


# FIX: route had no URL variable but the view takes `host` (request would raise
# TypeError). `path` converter because hosts may contain '://' and slashes.
@app.route("/api/hosts/<path:host>", methods=["DELETE"])
@login_required
def api_hosts_delete(host):
    cfg = load_config()
    cfg["hosts"] = [h for h in cfg["hosts"] if h != host]
    save_config(cfg)
    return jsonify(cfg)


# ── Routes: containers ────────────────────────────────────────────────────────
@app.route("/api/containers")
@login_required
def api_containers():
    """List containers per host; ?all=true includes stopped, ?host= filters."""
    cfg = load_config()
    show_all = request.args.get("all", "false").lower() == "true"
    host_filter = request.args.get("host")
    hosts = [host_filter] if host_filter else cfg["hosts"]
    result = []
    for host in hosts:
        try:
            client = make_client(host)
            for c in client.containers.list(all=show_all):
                ports = {k: v[0]["HostPort"] if v else None
                         for k, v in (c.ports or {}).items()}
                result.append({"host": host, "id": c.short_id, "name": c.name,
                               "image": c.image.tags[0] if c.image.tags else c.image.short_id,
                               "status": c.status, "ports": ports})
        except Exception as e:
            result.append({"host": host, "id": None, "name": None, "image": None,
                           "status": "error", "error": str(e)})
    return jsonify(result)


# ── Routes: backups ───────────────────────────────────────────────────────────
@app.route("/api/backups")
@login_required
def api_backups():
    """List backup tars (newest first) with metadata parsed from each archive."""
    cfg = load_config()
    backup_dir = Path(cfg["backup_dir"])
    if not backup_dir.exists():
        return jsonify([])
    result = []
    for f in sorted(backup_dir.glob("*.tar"), key=lambda x: x.stat().st_mtime, reverse=True):
        stat = f.stat()
        try:
            with tarfile.open(f, "r") as tar:
                members = tar.getmembers()
                config = json.loads(tar.extractfile("config.json").read())
            orig_name = config["Name"].lstrip("/")
            image = config["Config"]["Image"]
            has_image = any(m.name == "image.tar" for m in members)
            volumes = [m.name[len("volumes/"):-len(".tar")] for m in members
                       if m.name.startswith("volumes/") and m.name.endswith(".tar")]
        except Exception:
            # Unreadable/foreign tar: still list it with placeholder metadata.
            orig_name = f.stem
            image = "unknown"
            has_image = False
            volumes = []
        result.append({
            "file": f.name,
            "path": str(f),
            "size_mb": round(stat.st_size / 1024 / 1024, 1),
            "mtime": datetime.fromtimestamp(stat.st_mtime).isoformat(),
            "container": orig_name,
            "image": image,
            "has_image": has_image,
            "volumes": volumes,
        })
    return jsonify(result)


# FIX: route had no URL variable but the view takes `filename`.
@app.route("/api/backups/<filename>", methods=["DELETE"])
@login_required
def api_backup_delete(filename):
    # Reject anything that is not a plain file name (path-traversal guard).
    if Path(filename).name != filename:
        return jsonify({"error": "invalid filename"}), 400
    cfg = load_config()
    path = Path(cfg["backup_dir"]) / filename
    if not path.exists():
        return jsonify({"error": "not found"}), 404
    path.unlink()
    return jsonify({"ok": True})


# ── Routes: backup / restore / bulk ──────────────────────────────────────────
@app.route("/api/backup", methods=["POST"])
@login_required
def api_backup_start():
    """Start one backup in a background thread; returns the job id."""
    data = request.json or {}
    host = data.get("host", "local")
    container = data.get("container")
    save_image = data.get("save_image", False)
    pre_hook = data.get("pre_hook", "")
    r_count = data.get("retention_count") or None
    r_days = data.get("retention_days") or None
    if not container:
        return jsonify({"error": "container required"}), 400
    cfg = load_config()
    jid = new_job(f"Backup {container} on {host}")
    threading.Thread(target=_run_backup,
                     args=(jid, host, container, cfg["backup_dir"],
                           save_image, pre_hook, r_count, r_days),
                     daemon=True).start()
    return jsonify({"job_id": jid})


@app.route("/api/bulk-backup", methods=["POST"])
@login_required
def api_bulk_backup():
    """Start one backup job per running container on the given host."""
    data = request.json or {}
    host = data.get("host", "local")
    save_image = data.get("save_image", False)
    pre_hook = data.get("pre_hook", "")
    try:
        client = make_client(host)
        client.ping()
        containers = client.containers.list(all=False)  # running only
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    if not containers:
        return jsonify({"error": "No running containers on host"}), 400
    cfg = load_config()
    job_ids = []
    for c in containers:
        jid = new_job(f"Bulk: {c.name} on {host}")
        threading.Thread(target=_run_backup,
                         args=(jid, host, c.name, cfg["backup_dir"], save_image, pre_hook),
                         daemon=True).start()
        job_ids.append({"job_id": jid, "container": c.name})
    return jsonify({"jobs": job_ids})


@app.route("/api/restore", methods=["POST"])
@login_required
def api_restore_start():
    """Start a restore in a background thread; returns the job id."""
    data = request.json or {}
    host = data.get("host", "local")
    backup_path = data.get("backup_path")
    new_name = data.get("new_name") or None
    start = data.get("start", False)
    load_image = data.get("load_image", False)
    if not backup_path:
        return jsonify({"error": "backup_path required"}), 400
    jid = new_job(f"Restore {Path(backup_path).name} → {host}")
    threading.Thread(target=_run_restore,
                     args=(jid, host, backup_path, new_name, start, load_image),
                     daemon=True).start()
    return jsonify({"job_id": jid})


# ── Routes: schedules ─────────────────────────────────────────────────────────
@app.route("/api/schedules")
@login_required
def api_schedules_list():
    cfg = load_config()
    schedules = cfg.get("schedules", [])
    # Attach next_run from the live scheduler for display.
    for s in schedules:
        job = scheduler.get_job(s["id"])
        s["next_run"] = job.next_run_time.isoformat() if (job and job.next_run_time) else None
    return jsonify(schedules)


@app.route("/api/schedules", methods=["POST"])
@login_required
def api_schedules_create():
    data = request.json or {}
    required = ("host", "container", "cron")
    if not all(data.get(k) for k in required):
        return jsonify({"error": "host, container, cron required"}), 400
    try:
        _cron_kwargs(data["cron"])  # validate before persisting
    except ValueError as e:
        return jsonify({"error": str(e)}), 400
    sched = {
        "id": str(uuid.uuid4())[:8],
        "host": data["host"],
        "container": data["container"],
        "cron": data["cron"],
        "pre_hook": data.get("pre_hook", ""),
        "save_image": data.get("save_image", False),
        "retention_count": data.get("retention_count") or None,
        "retention_days": data.get("retention_days") or None,
        "enabled": data.get("enabled", True),
        "last_run": None,
        "last_status": None,
    }
    cfg = load_config()
    cfg.setdefault("schedules", []).append(sched)
    save_config(cfg)
    if sched["enabled"]:
        _register_schedule(sched)
    return jsonify(sched), 201


# FIX: route had no URL variable but the view takes `sid`.
@app.route("/api/schedules/<sid>", methods=["PUT"])
@login_required
def api_schedules_update(sid):
    data = request.json or {}
    cfg = load_config()
    schedules = cfg.get("schedules", [])
    sched = next((s for s in schedules if s["id"] == sid), None)
    if not sched:
        return jsonify({"error": "not found"}), 404
    if "cron" in data:
        try:
            _cron_kwargs(data["cron"])  # validate before applying
        except ValueError as e:
            return jsonify({"error": str(e)}), 400
    updatable = ("host", "container", "cron", "pre_hook", "save_image",
                 "retention_count", "retention_days", "enabled")
    for k in updatable:
        if k in data:
            sched[k] = data[k]
    save_config(cfg)
    _unregister_schedule(sid)
    if sched.get("enabled", True):
        _register_schedule(sched)
    return jsonify(sched)


# FIX: route had no URL variable but the view takes `sid`.
@app.route("/api/schedules/<sid>", methods=["DELETE"])
@login_required
def api_schedules_delete(sid):
    cfg = load_config()
    cfg["schedules"] = [s for s in cfg.get("schedules", []) if s["id"] != sid]
    save_config(cfg)
    _unregister_schedule(sid)
    return jsonify({"ok": True})


# FIX: route was "/api/schedules//run" with no variable for `sid`.
@app.route("/api/schedules/<sid>/run", methods=["POST"])
@login_required
def api_schedules_run_now(sid):
    jid = run_scheduled_backup(sid)
    if not jid:
        return jsonify({"error": "schedule not found or disabled"}), 404
    return jsonify({"job_id": jid})


# ── Routes: jobs ──────────────────────────────────────────────────────────────
@app.route("/api/jobs")
@login_required
def api_jobs():
    with jobs_lock:
        return jsonify(list(jobs.values()))


# FIX: route was "/api/jobs//stream" with no variable for `jid`.
@app.route("/api/jobs/<jid>/stream")
def api_job_stream(jid):
    # Auth check without decorator (SSE doesn't send JSON)
    if auth_enabled() and not session.get("authenticated"):
        return Response("data: unauthorized\n\n", mimetype="text/event-stream", status=401)

    def generate():
        """Poll the job's log list and emit each new line as an SSE event."""
        last = 0
        while True:
            with jobs_lock:
                job = jobs.get(jid)
            if not job:
                yield f"data: Job {jid} not found\n\n"
                break
            logs = job["logs"]
            while last < len(logs):
                line = logs[last].replace("\n", " ")
                yield f"data: {line}\n\n"
                last += 1
                if line == "[DONE]":
                    return
            if job["status"] in ("done", "error"):
                break
            time.sleep(0.15)

    return Response(stream_with_context(generate()),
                    mimetype="text/event-stream",
                    headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})


if __name__ == "__main__":
    import argparse
    p = argparse.ArgumentParser()
    p.add_argument("--host", default="0.0.0.0")
    p.add_argument("--port", type=int, default=5999)
    p.add_argument("--debug", action="store_true")
    args = p.parse_args()
    print(f"Docker Backup UI → http://{args.host}:{args.port}")
    app.run(host=args.host, port=args.port, debug=args.debug)