Aching Notes

技术笔记、折腾记录和长期项目整理

外联连接审计:把外部链接收集成 JSON 和 Excel 报表

目标

外联审计不是简单地看“谁连了外网”,而是把连接数据变成可检索、可留档的数据。

这篇我直接给一份能复用的最小脚本,目标很明确:

  1. 采集当前连接
  2. 提取进程、目的地址和端口
  3. 去掉内网和保留地址
  4. 按主机、进程、目标域名汇总
  5. 同时导出 JSON 和 Excel

采集先做薄

不要一上来就写大脚本,先从 ss 开始。

ss -Htanp state established

如果要看更完整的进程信息,可以补一层:

lsof -nP -iTCP -sTCP:ESTABLISHED

直接可用的脚本

下面这份脚本可以直接保存成 egress_audit.py,在 Linux 主机上跑。

#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
import re
import socket
import subprocess
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Iterable


PRIVATE_PREFIXES = (
    "10.",
    "127.",
    "169.254.",
    "172.16.",
    "172.17.",
    "172.18.",
    "172.19.",
    "172.20.",
    "172.21.",
    "172.22.",
    "172.23.",
    "172.24.",
    "172.25.",
    "172.26.",
    "172.27.",
    "172.28.",
    "172.29.",
    "172.30.",
    "172.31.",
    "192.168.",
)


@dataclass
class Record:
    host: str
    pid: int | None
    process: str
    dst_ip: str
    dst_port: int
    category: str
    reverse_dns: str


def is_public_ip(ip: str) -> bool:
    if ip == "0.0.0.0":
        return False
    if ip.startswith(PRIVATE_PREFIXES):
        return False
    if ":" in ip:
        return not (ip.startswith("fe80:") or ip.startswith("fc") or ip.startswith("fd") or ip == "::1")
    return True


def reverse_dns(ip: str) -> str:
    try:
        return socket.gethostbyaddr(ip)[0]
    except Exception:
        return ""


def run_cmd(cmd: list[str]) -> str:
    proc = subprocess.run(cmd, check=True, text=True, capture_output=True)
    return proc.stdout


def parse_ss() -> Iterable[Record]:
    host = socket.gethostname()
    for line in run_cmd(["ss", "-Htanp", "state", "established"]).splitlines():
        parts = line.split()
        if len(parts) < 5:
            continue
        local = parts[3]
        remote = parts[4]
        if ":" not in remote:
            continue
        dst_ip, dst_port = remote.rsplit(":", 1)
        if dst_ip == "*" or not dst_port.isdigit():
            continue

        proc = "unknown"
        pid = None
        m = re.search(r'users:(("(?P<proc>[^"]+)",pid=(?P<pid>d+)', line)
        if m:
            proc = m.group("proc")
            pid = int(m.group("pid"))

        category = "public" if is_public_ip(dst_ip) else "private"
        yield Record(
            host=host,
            pid=pid,
            process=proc,
            dst_ip=dst_ip,
            dst_port=int(dst_port),
            category=category,
            reverse_dns=reverse_dns(dst_ip) if category == "public" else "",
        )


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("-o", "--output", default="egress.json")
    parser.add_argument("--excel", default="egress.xlsx")
    args = parser.parse_args()

    rows = [asdict(item) for item in parse_ss()]
    Path(args.output).write_text(json.dumps(rows, ensure_ascii=False, indent=2), encoding="utf-8")

    try:
        import pandas as pd
        with pd.ExcelWriter(args.excel) as writer:
            pd.DataFrame(rows).to_excel(writer, index=False, sheet_name="all")
            public_rows = [row for row in rows if row["category"] == "public"]
            pd.DataFrame(public_rows).to_excel(writer, index=False, sheet_name="public")
    except Exception as exc:
        print(f"excel export skipped: {exc}")

    print(f"saved {len(rows)} rows to {args.output}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

运行方式很直接:

chmod +x egress_audit.py
./egress_audit.py -o /tmp/egress.json --excel /tmp/egress.xlsx

如果机器上没有 pandas,脚本仍然会把 JSON 先落下来,Excel 导出会自动跳过,不影响主流程。

需要 Excel 输出的话,先装依赖:

python3 -m pip install pandas openpyxl

结构化输出怎么设计

我会把每条连接整理成固定字段:

{
  "host": "node-01",
  "pid": 1234,
  "process": "nginx",
  "dst_ip": "1.2.3.4",
  "dst_port": 443,
  "category": "public"
}

只要字段固定,后面要切 Excel、Markdown、CSV 都很容易。上面那个脚本已经把这几个字段都准备好了。

过滤规则先定

不先定过滤规则,结果会被本地回环和内网地址淹没。

def is_public(ip: str) -> bool:
    private_prefixes = ("10.", "172.16.", "172.17.", "192.168.", "127.")
    return not ip.startswith(private_prefixes)

IPv6 也建议补上 fc00::/7fe80::/10

报表怎么出

JSON

给后续自动化接,脚本已经默认生成。

Excel

给人翻,脚本会默认写一个 all sheet 和一个 public sheet。

import pandas as pd

df = pd.DataFrame(rows)
with pd.ExcelWriter("egress.xlsx") as writer:
    df.to_excel(writer, index=False, sheet_name="all")
    df[df["category"] == "public"].to_excel(writer, index=False, sheet_name="public")

定时跑时怎么避免脏结果

采集时刻和输出目录都要固定,不要每次乱放。

base="/var/log/egress-audit/$(date +%F)"
mkdir -p "$base"
./collect.py > "$base/raw.json"
./render.py "$base/raw.json" > "$base/report.md"

公开文章里可以写什么

可以写:

  • 字段设计
  • 过滤逻辑
  • JSON/Excel 双出口
  • 定时任务目录结构
  • 可直接复用的脚本

不要写:

  • 真实连接目标
  • 真实业务系统
  • 真实敏感域名

结论

外联审计真正有用的地方,不是“查到了一次连接”,而是能连续跑、能回看、能导出。只要数据结构固定,这件事就能从一次脚本升级成一个长期动作。


已发布

分类

来自

标签:

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注