# -*- coding: utf-8 -*-
# vim: set filetype=python:

# Cross-platform launcher for InfluxDB 3
#
# This Python script is intended to be invoked explicitly via the bundled
# Python interpreter. It is designed to work across Linux, macOS, and Windows
# without modification.
#
# EXAMPLE USAGE:
#   Linux:   /usr/lib/influxdb3/python/bin/python3 /usr/lib/influxdb3/influxdb3-launcher [args]
#   macOS:   /opt/homebrew/lib/influxdb3/python/bin/python3 /opt/homebrew/lib/influxdb3/influxdb3-launcher [args]
#   Windows: "C:\Program Files\influxdb3\python\python.exe" "C:\Program Files\influxdb3\influxdb3-launcher" [args]
#
# Service managers (systemd, launchd, Windows Services) should explicitly
# invoke the bundled Python interpreter with this script as the first argument.
#
# This script uses os.execve() to replace the current process to ensure
# stdin/stdout/stderr, signals, and exit codes work correctly.

import sys

sys.dont_write_bytecode = True  # don't create __pycache__ files

import argparse
import os
import signal
import tempfile
import tomllib  # this is in cpython 3.11+ standard library
from typing import Any, Dict, List

# Platform detection for daemonization
PLATFORM_SUPPORTS_FORK = hasattr(os, "fork") and hasattr(os, "setsid")


# InfluxDB 3 doesn't yet support TOML configuration, so map TOML keys to their
# environment variable names. Only includes keys that don't follow the standard
# pattern of: INFLUXDB3_ + key.replace("-", "_").upper()
TOML_KEY_ENVVAR: Dict[str, Dict[str, str]] = {
    "common": {  # core and enterprise
        # Node configuration
        "http-bind": "INFLUXDB3_HTTP_BIND_ADDR",
        "node-id": "INFLUXDB3_NODE_IDENTIFIER_PREFIX",
        "node-id-from-env": "INFLUXDB3_NODE_IDENTIFIER_FROM_ENV",
        # Admin token recovery
        "admin-token-recovery-http-bind": "INFLUXDB3_ADMIN_TOKEN_RECOVERY_HTTP_BIND_ADDR",
        # Authorization
        "without-auth": "INFLUXDB3_START_WITHOUT_AUTH",
        # Datafusion
        "num-datafusion-threads": "INFLUXDB3_DATAFUSION_NUM_THREADS",
        # IO
        "num-io-threads": "INFLUXDB3_IO_NUM_THREADS",
        # Object storage
        "data-dir": "INFLUXDB3_DB_DIR",
        # AWS (AWS_ prefix, not INFLUXDB3_)
        "aws-access-key-id": "AWS_ACCESS_KEY_ID",
        "aws-allow-http": "AWS_ALLOW_HTTP",
        "aws-credentials-file": "AWS_CREDENTIALS_FILE",
        "aws-default-region": "AWS_DEFAULT_REGION",
        "aws-endpoint": "AWS_ENDPOINT",
        "aws-s3-custom-backend": "AWS_S3_CUSTOM_BACKEND",
        "aws-secret-access-key": "AWS_SECRET_ACCESS_KEY",
        "aws-session-token": "AWS_SESSION_TOKEN",
        "aws-skip-signature": "AWS_SKIP_SIGNATURE",
        # Azure (AZURE_ prefix, not INFLUXDB3_)
        "azure-allow-http": "AZURE_ALLOW_HTTP",
        "azure-endpoint": "AZURE_ENDPOINT",
        "azure-storage-access-key": "AZURE_STORAGE_ACCESS_KEY",
        "azure-storage-account": "AZURE_STORAGE_ACCOUNT",
        # Google Cloud (GOOGLE_ prefix, not INFLUXDB3_)
        "google-service-account": "GOOGLE_SERVICE_ACCOUNT",
        # Object store (OBJECT_STORE_ prefix, not INFLUXDB3_)
        "object-store-cache-endpoint": "OBJECT_STORE_CACHE_ENDPOINT",
        "object-store-connection-limit": "OBJECT_STORE_CONNECTION_LIMIT",
        "object-store-http2-only": "OBJECT_STORE_HTTP2_ONLY",
        "object-store-http2-max-frame-size": "OBJECT_STORE_HTTP2_MAX_FRAME_SIZE",
        "object-store-max-retries": "OBJECT_STORE_MAX_RETRIES",
        "object-store-request-timeout": "OBJECT_STORE_REQUEST_TIMEOUT",
        "object-store-retry-timeout": "OBJECT_STORE_RETRY_TIMEOUT",
        "object-store-tls-allow-insecure": "OBJECT_STORE_TLS_ALLOW_INSECURE",
        "object-store-tls-ca": "OBJECT_STORE_TLS_CA",
        # Processing engine
        "virtual-env-location": "VIRTUAL_ENV",
        # WAL
        "snapshotted-wal-files-to-keep": "INFLUXDB3_NUM_WAL_FILES_TO_KEEP",
        # Tokio console (TOKIO_CONSOLE_ prefix, not INFLUXDB3_)
        "tokio-console-enabled": "TOKIO_CONSOLE_ENABLED",
        "tokio-console-client-buffer-capacity": "TOKIO_CONSOLE_CLIENT_BUFFER_CAPACITY",
        "tokio-console-event-buffer-capacity": "TOKIO_CONSOLE_EVENT_BUFFER_CAPACITY",
        # Tracing (TRACES_ prefix, not INFLUXDB3_)
        "traces-exporter": "TRACES_EXPORTER",
        "traces-exporter-jaeger-agent-host": "TRACES_EXPORTER_JAEGER_AGENT_HOST",
        "traces-exporter-jaeger-agent-port": "TRACES_EXPORTER_JAEGER_AGENT_PORT",
        "traces-exporter-jaeger-debug-name": "TRACES_EXPORTER_JAEGER_DEBUG_NAME",
        "traces-exporter-jaeger-service-name": "TRACES_EXPORTER_JAEGER_SERVICE_NAME",
        "traces-exporter-jaeger-trace-context-header-name": "TRACES_EXPORTER_JAEGER_TRACE_CONTEXT_HEADER_NAME",
        "traces-jaeger-max-msgs-per-second": "TRACES_JAEGER_MAX_MSGS_PER_SECOND",
        # Tracing aliases (shorter TOML keys mapping to longer env var names).
        # These keys previously appeared twice in this dict with different
        # values; since later duplicate dict keys silently win in Python, only
        # the INFLUXDB3_-prefixed entries below ever took effect. The shadowed
        # TRACES_*-valued duplicates have been removed (no behavior change).
        # NOTE(review): the aliases use the INFLUXDB3_ prefix while the
        # long-form keys above use a bare TRACES_ prefix -- confirm which one
        # the server actually reads.
        "traces-jaeger-debug-name": "INFLUXDB3_TRACES_EXPORTER_JAEGER_DEBUG_NAME",
        "traces-jaeger-tags": "INFLUXDB3_TRACES_EXPORTER_JAEGER_TAGS",
    },
    "core": {
        # Core-specific mappings (currently none - all are in common)
    },
    "enterprise": {
        # Enterprise cluster configuration
        "cluster-id": "INFLUXDB3_ENTERPRISE_CLUSTER_ID",
        "conn-info": "INFLUXDB3_ENTERPRISE_CONN_INFO",
        "mode": "INFLUXDB3_ENTERPRISE_MODE",
        "num-cores": "INFLUXDB3_ENTERPRISE_NUM_CORES",
        # Compaction
        "compaction-check-interval": "INFLUXDB3_ENTERPRISE_COMPACTION_CHECK_INTERVAL",
        "compaction-cleanup-wait": "INFLUXDB3_ENTERPRISE_COMPACTION_CLEANUP_WAIT",
        "compaction-gen2-duration": "INFLUXDB3_ENTERPRISE_COMPACTION_GEN2_DURATION",
        "compaction-max-num-files-per-plan": "INFLUXDB3_ENTERPRISE_COMPACTION_MAX_NUM_FILES_PER_PLAN",
        "compaction-multipliers": "INFLUXDB3_ENTERPRISE_COMPACTION_MULTIPLIERS",
        "compaction-row-limit": "INFLUXDB3_ENTERPRISE_COMPACTION_ROW_LIMIT",
        "max-compact-destination": "INFLUXDB3_ENTERPRISE_MAX_COMPACT_DESTINATION",
        # Last Value & Distinct Value Caches
        "preemptive-cache-age": "INFLUXDB3_ENTERPRISE_PREEMPTIVE_CACHE_AGE",
        # Licensing
        "license-email": "INFLUXDB3_ENTERPRISE_LICENSE_EMAIL",
        "license-file": "INFLUXDB3_ENTERPRISE_LICENSE_FILE",
        "license-type": "INFLUXDB3_ENTERPRISE_LICENSE_TYPE",
        # Resource limits
        "catalog-sync-interval": "INFLUXDB3_ENTERPRISE_CATALOG_SYNC_INTERVAL",
        "database-split-level": "INFLUXDB3_ENTERPRISE_DATABASE_SPLIT_LEVEL",
        "max-columns": "INFLUXDB3_ENTERPRISE_PACHA_TREE_MAX_COLUMNS",
        "num-database-limit": "INFLUXDB3_ENTERPRISE_NUM_DATABASE_LIMIT",
        "num-table-limit": "INFLUXDB3_ENTERPRISE_NUM_TABLE_LIMIT",
        "num-total-columns-per-table-limit": "INFLUXDB3_ENTERPRISE_NUM_TOTAL_COLUMNS_PER_TABLE_LIMIT",
        "replication-interval": "INFLUXDB3_ENTERPRISE_REPLICATION_INTERVAL",
        "table-split-level": "INFLUXDB3_ENTERPRISE_TABLE_SPLIT_LEVEL",
    },
}

# Stamp file configuration: name of the marker file (stored under --stamp-dir)
# recording which flavor last ran, used to detect core<->enterprise changes.
STAMP_FILENAME = ".influxdb3-launcher"

# SSL certificate bundle path for RHEL-based systems. SSL_CERT_FILE will be
# conditionally set to this on systems known to need it for the
# python-build-standalone environment.
SSL_CERT_BUNDLE_PATH = "/etc/pki/tls/certs/ca-bundle.crt"

# PLATFORM_ID values that need SSL_CERT_FILE workaround (RHEL7 isn't currently
# supported by the database but is by the embedded python, so we'll set this in
# case the database is updated in the future).
SSL_CERT_AFFECTED_PLATFORMS = frozenset(
    [
        "platform:el7",
        "platform:el8",
        "platform:ol7",
        "platform:ol8",
    ]
)

# Required configuration keys for validation (consumed by
# _validate_required_keys). Each list element is a "group" of comma-separated
# keys: a config is valid when ALL keys in AT LEAST ONE group are present and
# non-empty. E.g. enterprise needs object-store plus either license-file or
# both license-email and license-type.
REQUIRED_TOML_KEYS = {
    "core": ["object-store"],
    "enterprise": [
        "object-store,license-file",
        "object-store,license-email,license-type",
    ],
}


def read_stamp(stamp_path: str) -> str | None:
    """Read flavor from stamp file. Returns None if doesn't exist or can't read."""
    if not os.path.exists(stamp_path):
        return None

    try:
        with open(stamp_path, "r", encoding="utf-8") as f:
            return f.read().strip()
    except Exception as e:  # pragma: nocover
        print(f"W: Could not read stamp file {stamp_path}: {e}", file=sys.stderr)
        return None


def write_stamp(stamp_path: str, flavor: str) -> None:
    """Atomically record *flavor* in the stamp file (best-effort; warns on failure)."""
    directory = os.path.dirname(stamp_path)
    try:
        # Stage the contents in a temp file in the same directory, then rename
        # into place so readers never see a partially-written stamp.
        fd, tmp_path = tempfile.mkstemp(dir=directory or ".", prefix=".tmp.stamp.")
        try:
            os.write(fd, f"{flavor}\n".encode("utf-8"))
            os.close(fd)
            os.chmod(tmp_path, 0o644)
            os.rename(tmp_path, stamp_path)
        except Exception:  # pragma: nocover
            # Best-effort cleanup of the staged file before re-raising.
            for undo in (lambda: os.close(fd), lambda: os.unlink(tmp_path)):
                try:
                    undo()
                except Exception:
                    pass
            raise
    except Exception as e:  # pragma: nocover
        print(f"W: Could not write stamp file {stamp_path}: {e}", file=sys.stderr)


def get_platform_id(os_release_path: str = "/etc/os-release") -> str | None:
    """
    Read PLATFORM_ID from /etc/os-release.

    Returns the PLATFORM_ID value (e.g., "platform:el8") or None if not found.
    """
    if not os.path.exists(os_release_path):
        return None

    try:
        with open(os_release_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line.startswith("PLATFORM_ID="):
                    # Handle both quoted and unquoted values
                    value = line.split("=", 1)[1]
                    # Remove surrounding quotes if present
                    if (
                        len(value) >= 2
                        and value[0] == value[-1]
                        and value[0] in ('"', "'")
                    ):
                        value = value[1:-1]
                    return value
    except Exception:
        pass

    return None


def get_ssl_cert_env(
    os_release_path: str = "/etc/os-release",
    cert_bundle_path: str = SSL_CERT_BUNDLE_PATH,
) -> Dict[str, str]:
    """
    Build the SSL_CERT_FILE override for affected RHEL-based platforms.

    Args:
        os_release_path: Path to os-release file (for testing)
        cert_bundle_path: Path to certificate bundle (for testing)

    Returns:
        {"SSL_CERT_FILE": cert_bundle_path} when the platform needs the
        workaround and the bundle exists; {} otherwise.
    """
    # Never clobber a value the user has already exported.
    if os.environ.get("SSL_CERT_FILE"):
        return {}

    # Apply the workaround only on known-affected platforms (a None
    # platform_id is never in the frozenset) and only when the bundle exists.
    needs_override = (
        get_platform_id(os_release_path) in SSL_CERT_AFFECTED_PLATFORMS
        and os.path.exists(cert_bundle_path)
    )
    return {"SSL_CERT_FILE": cert_bundle_path} if needs_override else {}


def check_flavor_migration(stamp_path: str, current_flavor: str) -> None:
    """
    React to flavor changes recorded in the stamp file.

    Behavior:
    - same flavor as stamped: no-op (fast path)
    - no stamp: fresh install, record current flavor
    - core -> enterprise: print a migration notice (stamp left in place)
    - enterprise -> core: refuse to start with instructions, exit(1)
    """
    previous_flavor = read_stamp(stamp_path)

    # Normal operation: stamp present and unchanged.
    if previous_flavor == current_flavor and previous_flavor is not None:
        return

    # Fresh install: record the flavor for future runs.
    if previous_flavor is None:
        write_stamp(stamp_path, current_flavor)
        return

    transition = (previous_flavor, current_flavor)

    if transition == ("core", "enterprise"):
        print(
            "I: Detected previous run of InfluxDB 3 Core. If this is the first run of Enterprise,"
        )
        print(
            "I: your data will be migrated in the background. After a successful migration,"
        )
        print(f"I: remove '{stamp_path}' to suppress this message.")
        sys.stdout.flush()  # Ensure message is printed before exec
        # Keep the stamp; the user deletes it once migration is confirmed.
        return

    if transition == ("enterprise", "core"):
        print(
            "E: Cannot downgrade from InfluxDB 3 Enterprise to Core.", file=sys.stderr
        )
        print("E: To proceed with Core:", file=sys.stderr)
        print("E:   1. Back up your data", file=sys.stderr)
        print(f"E:   2. Remove stamp file: rm {stamp_path}", file=sys.stderr)
        print("E:   3. Ensure all data is removed from object store", file=sys.stderr)
        print("E:   4. Restart the service", file=sys.stderr)
        sys.exit(1)


def _validate_required_keys(
    config: Dict[str, str], required: List[str], key_type: str
) -> None:
    """
    Validate required keys are present in the configuration.

    Args:
        config: Dictionary of configuration key-value pairs
        required: List of required key groups. Each element can be:
                  - A single key (e.g., 'license-file')
                  - Comma-separated keys (e.g., 'license-email,license-type')
                  At least one group must be satisfied.
        key_type: Description of key type for error messages (e.g., 'TOML key', 'environment variable')

    Exits with error if validation fails.
    """
    if not required:
        return

    # Check if at least one required group is satisfied
    satisfied = False
    for group in required:
        keys = [k.strip() for k in group.split(",")]
        # Check if ALL keys in this group are present, non-empty, and scalar
        group_ok = True
        for key in keys:
            val = config.get(key, "")
            if isinstance(val, (str, int, float, bool)):
                if str(val).strip():
                    continue
                group_ok = False
                break
            else:
                print(
                    f"E: {key_type} '{key}' must be a scalar (flat TOML expected)",
                    file=sys.stderr,
                )
                sys.exit(1)
        if group_ok:
            satisfied = True
            break

    if not satisfied:
        print(
            f"E: Required configuration not found. At least one of the following {key_type} groups must be set:",
            file=sys.stderr,
        )
        for group in required:
            keys = [k.strip() for k in group.split(",")]
            if len(keys) == 1:
                print(f"E: - {keys[0]}", file=sys.stderr)
            else:
                print(f"E: - All of: {', '.join(keys)}", file=sys.stderr)
        sys.exit(1)


def _validate_file_path(path: str, description: str = "file") -> str:
    """
    Validate that a path exists and is a file.

    Args:
        path: Path to validate
        description: Description of the file type for error messages (e.g., "config file")

    Returns:
        Absolute path if valid

    Exits with error code 1 if validation fails.
    """
    abs_path: str = os.path.abspath(path)

    if not os.path.exists(abs_path):
        print(f"E: {description} {path} does not exist", file=sys.stderr)
        sys.exit(1)

    if not os.path.isfile(abs_path):
        print(f"E: {description} path {path} is not a file", file=sys.stderr)
        sys.exit(1)

    return abs_path


def check_executable(path: str) -> str:
    """
    Resolve *path* to an absolute path, exiting unless it is an executable file.
    """
    abs_path: str = _validate_file_path(path, "executable")

    # Windows decides executability by extension/association, so the
    # permission-bit check only applies on POSIX systems.
    if os.name != "nt" and not os.access(abs_path, os.X_OK):
        print(f"E: {path} is not executable", file=sys.stderr)
        sys.exit(1)

    return abs_path


def read_config_toml(path: str, flavor: str) -> Dict[str, str]:
    """
    Read the TOML configuration file and translate it to environment variables.

    Top-level scalar keys map to env var names via TOML_KEY_ENVVAR (common
    entries overridden by flavor-specific ones); unmapped keys use the default
    INFLUXDB3_<KEY> pattern. Nested tables/arrays are ignored.

    Args:
        path: Path to the TOML configuration file
        flavor: The InfluxDB 3 flavor ('core' or 'enterprise')

    Returns:
        Dict of environment variables to set. Exits with code 1 on any
        read/parse/validation failure.
    """
    abs_path: str = _validate_file_path(path, "config file")

    try:
        with open(abs_path, "rb") as f:
            toml_data: Dict[str, Any] = tomllib.load(f)

        # Enforce flavor-specific required keys before translating anything.
        required = REQUIRED_TOML_KEYS[flavor]
        if required:
            _validate_required_keys(toml_data, required, "TOML key")

        # Flavor-specific mappings take precedence over the common ones.
        mapping: Dict[str, str] = {
            **TOML_KEY_ENVVAR.get("common", {}),
            **TOML_KEY_ENVVAR.get(flavor, {}),
        }

        env_vars: Dict[str, str] = {}
        for key, value in toml_data.items():
            # Only flat scalar entries become env vars; nested structures
            # (tables, arrays of tables, ...) are skipped.
            if not isinstance(value, (str, int, float, bool)):
                continue
            # Booleans are rendered lowercase ("true"/"false") for the server.
            rendered = str(value).lower() if isinstance(value, bool) else str(value)
            env_name = mapping.get(
                key, "INFLUXDB3_" + key.replace("-", "_").upper()
            )
            env_vars[env_name] = rendered

        return env_vars
    except Exception as e:
        print(f"E: problem reading TOML config file {path}: {e}", file=sys.stderr)
        sys.exit(1)


def write_pidfile(pidfile: str, pid: int | None = None) -> None:
    """
    Write a PID to a file atomically.

    Args:
        pidfile: Path to the PID file to write
        pid: PID to write (defaults to current process PID)

    This uses a write-temp-then-rename strategy for atomicity.
    """
    if pid is None:
        pid = os.getpid()

    piddir = os.path.dirname(pidfile)
    if piddir and not os.path.exists(piddir):
        try:
            os.makedirs(piddir, mode=0o755)
        except OSError as e:  # pragma: nocover
            print(f"E: cannot create PID directory {piddir}: {e}", file=sys.stderr)
            sys.exit(1)

    try:
        # Write to temp file in same directory for atomic rename
        fd, tmp = tempfile.mkstemp(dir=piddir or ".", prefix=".tmp.pid.")
        try:
            os.write(fd, f"{pid}\n".encode("ascii"))
            os.close(fd)
            os.chmod(tmp, 0o644)
            # Atomic rename - even if we crash here, no corruption
            os.rename(tmp, pidfile)
        except Exception:  # pragma: nocover
            try:
                os.close(fd)
            except Exception:
                pass
            try:
                os.unlink(tmp)
            except Exception:
                pass
            raise
    except Exception as e:  # pragma: nocover
        print(f"E: cannot write PID file {pidfile}: {e}", file=sys.stderr)
        sys.exit(1)


def daemonize(pidfile: str | None = None, log_file: str | None = None) -> None:
    """
    Daemonize the current process using the Unix double-fork technique.

    This function:
    1. Forks once - parent exits immediately
    2. First child becomes session leader via setsid()
    3. Forks again - first child (session leader) exits
    4. Grandchild writes its PID to pidfile
    5. Redirects stdin to /dev/null
    6. Redirects stdout/stderr to log_file (or /dev/null if not specified)

    The double-fork ensures the final daemon process is not a session leader,
    so it can never acquire a controlling terminal even if it opens a tty.

    Args:
        pidfile: Path to write the daemon's PID (written by grandchild)
        log_file: Path to redirect stdout/stderr (defaults to /dev/null)

    Note: This function only returns in the grandchild process. Parent and
    intermediate child both exit. On platforms without fork/setsid (e.g.
    Windows) it exits with code 1 instead of returning.
    """
    if not PLATFORM_SUPPORTS_FORK:
        print(
            "E: --daemonize is not supported on this platform (requires fork/setsid)",
            file=sys.stderr,
        )
        sys.exit(1)

    # Flush any pending output before forking so buffered text is not
    # duplicated by both the parent and the child.
    sys.stdout.flush()
    sys.stderr.flush()

    # First fork - parent exits, child continues
    try:
        pid = os.fork()
    except OSError as e:  # pragma: nocover
        print(f"E: fork failed: {e}", file=sys.stderr)
        sys.exit(1)

    if pid > 0:
        # Parent exits cleanly; child continues with daemonization.
        # os._exit skips atexit handlers and stdio flushing, which the
        # parent must not run on the child's behalf.
        os._exit(0)

    # First child: become session leader to detach from controlling terminal
    os.setsid()

    # Ignore SIGHUP so we don't get killed when the session leader exits.
    # Note: The exec'd process can override this with sigaction() if it needs
    # to handle SIGHUP (e.g., for config reload). tokio::signal does this.
    signal.signal(signal.SIGHUP, signal.SIG_IGN)

    # Second fork - ensures we can never acquire a controlling terminal.
    # Not strictly needed for a database server (which won't open tty devices),
    # but we do it defensively to follow the traditional double-fork pattern.
    try:
        pid = os.fork()
    except OSError as e:  # pragma: nocover
        print(f"E: second fork failed: {e}", file=sys.stderr)
        sys.exit(1)

    if pid > 0:
        # Intermediate process (session leader) exits; grandchild continues
        os._exit(0)

    # Grandchild: write our PID to pidfile (we are the final daemon process).
    # Note: write_pidfile() exits with code 1 on failure.
    if pidfile:
        write_pidfile(pidfile)

    # Redirect standard file descriptors
    # stdin -> /dev/null
    # stdout/stderr -> log_file or /dev/null
    devnull = os.open("/dev/null", os.O_RDWR)

    # Redirect stdin to /dev/null
    os.dup2(devnull, sys.stdin.fileno())

    if log_file:
        # Open log file for stdout/stderr (append mode, create if needed)
        try:
            log_fd = os.open(log_file, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644)
            os.dup2(log_fd, sys.stdout.fileno())
            os.dup2(log_fd, sys.stderr.fileno())
            # fds 1/2 now reference the log file; close the original
            # descriptor unless it IS one of the standard fds.
            if log_fd > 2:
                os.close(log_fd)
        except OSError as e:  # pragma: nocover
            # Fall back to /dev/null if log file can't be opened
            print(f"W: cannot open log file {log_file}: {e}", file=sys.stderr)
            os.dup2(devnull, sys.stdout.fileno())
            os.dup2(devnull, sys.stderr.fileno())
    else:
        # Redirect stdout/stderr to /dev/null
        os.dup2(devnull, sys.stdout.fileno())
        os.dup2(devnull, sys.stderr.fileno())

    # Same guard as above: don't close devnull if it landed on fd 0-2.
    if devnull > 2:
        os.close(devnull)

    # Child continues execution - caller should proceed to exec


def run(
    exec_path: str,
    exec_args: List[str],
    env_vars: Dict[str, str],
    pidfile: str | None = None,
) -> None:
    """
    Replace the current process with the target executable via os.execve(),
    so stdin/stdout/stderr, signals, and exit codes behave as if the program
    had been invoked directly. On Unix this keeps the same PID; Windows uses
    a different underlying mechanism with similar semantics.

    Args:
        exec_path: Absolute path to the executable
        exec_args: List of arguments to pass to the executable
        env_vars: Dict of environment variables to add/override
        pidfile: Optional path to write PID file before exec
    """
    # With exec-replace semantics the PID survives a successful exec, so the
    # pidfile can be written up front. The caller using this option (e.g. a
    # SysV initscript) owns the pidfile's lifecycle.
    if pidfile:
        write_pidfile(pidfile)

    # Precedence (highest first): command-line args (handled by influxdb
    # itself), the parent environment, then config-file values as the base.
    env: Dict[str, str] = {**env_vars, **os.environ}

    # Undo the SIG_IGN that daemonize() may have installed (SysV path) so
    # the exec'd program starts with default SIGHUP disposition.
    if hasattr(signal, "SIGHUP"):
        try:
            if signal.getsignal(signal.SIGHUP) == signal.SIG_IGN:
                signal.signal(signal.SIGHUP, signal.SIG_DFL)
        except Exception:
            pass

    # argv[0] is conventionally the program path itself.
    argv: List[str] = [exec_path, *exec_args]

    # On success this call never returns.
    try:
        os.execve(exec_path, argv, env)
    except OSError as e:  # pragma: nocover
        print(f"E: problem executing {exec_path}: {e}", file=sys.stderr)
        sys.exit(1)

    # Unreachable when exec succeeds; defensive guard otherwise.
    print("E: exec failed to replace process", file=sys.stderr)  # pragma: nocover
    sys.exit(1)  # pragma: nocover


def main(argv: List[str] | None = None) -> None:
    """
    Parse launcher arguments, prepare the environment, and exec InfluxDB 3.

    Args:
        argv: Command-line arguments (defaults to sys.argv if None)
    """
    if argv is None:  # pragma: nocover
        argv = sys.argv

    # A literal '--' splits launcher options from pass-through program args.
    try:
        sep = argv.index("--")
        launcher_args: List[str] = argv[1:sep]
        exec_args: List[str] = argv[sep + 1 :]
    except ValueError:
        launcher_args = argv[1:]
        exec_args = []

    parser = argparse.ArgumentParser(
        description="InfluxDB 3 cross-platform launcher",
        epilog="Arguments after '--' are passed to the executable",
    )
    parser.add_argument(
        "--flavor",
        required=True,
        choices=["core", "enterprise"],
        help="Specify the InfluxDB 3 flavor",
    )
    parser.add_argument(
        "--exec", required=True, metavar="PATH", help="Path to the influxdb3 executable"
    )
    parser.add_argument(
        "--config-toml", metavar="PATH", help="Path to the TOML configuration file"
    )
    parser.add_argument(
        "--pidfile",
        metavar="PATH",
        help="Path to write PID file before exec (for sysv init compatibility)",
    )
    parser.add_argument(
        "--stamp-dir",
        required=True,
        metavar="PATH",
        help="Directory for flavor stamp file (e.g., /var/lib/influxdb3)",
    )
    parser.add_argument(
        "--daemonize",
        action="store_true",
        help="Fork to background before exec (for init systems that expect it; not supported on Windows)",
    )
    parser.add_argument(
        "--log-file",
        metavar="PATH",
        help="Path to redirect stdout/stderr when daemonizing (defaults to /dev/null)",
    )

    args = parser.parse_args(launcher_args)

    # A configuration source must be supplied; enforced here rather than via
    # required=True so the failure goes through parser.error with usage text.
    if not args.config_toml:
        parser.error("--config-toml is required")

    if args.log_file and not args.daemonize:
        print("W: --log-file has no effect without --daemonize", file=sys.stderr)

    # Assemble the child environment from the config file, then layer on the
    # RHEL SSL_CERT_FILE workaround when applicable.
    env_vars = read_config_toml(args.config_toml, args.flavor)
    env_vars.update(get_ssl_cert_env())

    # Detect core<->enterprise transitions before replacing this process.
    stamp_path = os.path.join(args.stamp_dir, STAMP_FILENAME)
    check_flavor_migration(stamp_path, args.flavor)

    # Fail early if the target binary is missing or not executable.
    exec_path = check_executable(args.exec)

    if args.daemonize:
        # daemonize() returns only in the final daemon process, which has
        # already written the pidfile itself, so run() must not write it again.
        daemonize(args.pidfile, args.log_file)
        run(exec_path, exec_args, env_vars, pidfile=None)
    else:
        # exec-in-place so signals and exit codes behave as if the program
        # had been invoked directly.
        run(exec_path, exec_args, env_vars, args.pidfile)


if __name__ == "__main__":  # pragma: nocover
    main()
