#!/bin/sh
set -e

# Exercise the systemd helpers added by the Debian systemd integration
# patch (debian/patches/systemd-receive-wal-integration.patch). These are
# pure functions, so they can be checked deterministically without a running
# systemd or PostgreSQL.

echo "Testing barman.process systemd helpers..."

python3 -c '
import os
from barman.process import (
    SYSTEMD_RECEIVE_WAL_UNIT_PREFIX,
    server_name_is_unit_safe,
    systemd_receive_wal_unit,
    systemd_unit_for_pid,
    _systemd_unit_from_cgroup,
)

unit = systemd_receive_wal_unit("main")
assert unit == "barman-receive-wal@main.service", unit
assert systemd_receive_wal_unit("pg-12").startswith(SYSTEMD_RECEIVE_WAL_UNIT_PREFIX)

# cgroup parser: positive matches (cgroup v2 and the v1 named hierarchy)
v2 = "0::/system.slice/barman-receive-wal@main.service\n"
assert _systemd_unit_from_cgroup(v2) == "barman-receive-wal@main.service"
v1 = "5:name=systemd:/system.slice/barman-receive-wal@pg-12.service\n"
assert _systemd_unit_from_cgroup(v1) == "barman-receive-wal@pg-12.service"
# a server name containing dots is matched as a whole (up to .service)
dotted = "0::/system.slice/barman-receive-wal@pg.prod.service\n"
assert _systemd_unit_from_cgroup(dotted) == "barman-receive-wal@pg.prod.service"
# a cgroup v2 line with a trailing sub-cgroup still resolves to the bare unit
v2sub = "0::/system.slice/barman-receive-wal@main.service/payload\n"
assert _systemd_unit_from_cgroup(v2sub) == "barman-receive-wal@main.service"

# cgroup parser: negative matches (unrelated unit, no unit, empty)
assert _systemd_unit_from_cgroup("0::/system.slice/barman.service\n") is None
assert _systemd_unit_from_cgroup("0::/user.slice/user-1000.slice\n") is None
assert _systemd_unit_from_cgroup("") is None

# This very process is not supervised by a barman-receive-wal@ unit, and a
# non-existent PID must be handled gracefully.
assert systemd_unit_for_pid(os.getpid()) is None
assert systemd_unit_for_pid(2 ** 30) is None

# server-name unit-safety: conventional names are safe; anything that would need
# systemd escaping (slash, whitespace, leading dot, empty) is not.
for safe in ("main", "pg-12", "pg.prod", "testdb"):
    assert server_name_is_unit_safe(safe), safe
for unsafe in ("pg/prod", "has space", ".hidden", "", ":x"):
    assert not server_name_is_unit_safe(unsafe), unsafe

print("All systemd helper tests passed.")
'

# Deterministically exercise Server.start_receive_wal()'s dispatch (systemd unit
# vs forked fallback) without a running systemd, covering the non-systemd path
# and the no-polkit / unsafe-name fallbacks.
echo "Testing Server.start_receive_wal dispatch..."

python3 -c '
import types
from unittest import mock
from barman.server import Server


def run(systemd_running, name, systemctl_ok):
    srv = Server.__new__(Server)
    srv.config = types.SimpleNamespace(name=name)
    calls = []
    srv.systemd_receive_wal = lambda: (calls.append("unit"), systemctl_ok)[1]
    srv.background_receive_wal = lambda keep_descriptors: calls.append("fork")
    with mock.patch("barman.server.systemd_is_running", return_value=systemd_running):
        srv.start_receive_wal(keep_descriptors=False)
    return calls


# No systemd: always fork (covers the non-systemd path).
assert run(False, "main", True) == ["fork"], "non-systemd"
# Systemd and the unit can be managed: supervise via the unit, no fork.
assert run(True, "main", True) == ["unit"], "systemd unit"
# Systemd but the unit cannot be managed (e.g. no polkit): fall back to fork.
assert run(True, "main", False) == ["unit", "fork"], "no-polkit fallback"
# Systemd but the server name is not unit-safe: fork without trying the unit.
assert run(True, "pg/prod", True) == ["fork"], "unsafe name"

print("start_receive_wal dispatch tests passed.")
'
