Files
2026-04-20 18:26:54 +08:00

624 lines
17 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package backlink computes backlink (prosperity) scores for all known domains,
// using a PageRank-like algorithm over the site-level link graph.
// backlink 包实现 PageRank 类似的反向链接评分算法,在网站级链接图上迭代计算繁荣分数。
//
// 每 48 小时运行一次,将结果写入 savedata/prosper.json,供爬虫调度和搜索排序使用。
package backlink
import (
"encoding/json" // JSON 序列化(输出 prosper.json 和 cos map
"log" // 日志
"math" // 数学运算(Log、开方、幂)
"math/rand" // 随机数(对高频域名采样降权)
"os" // 文件写入
"path/filepath" // 路径拼接
"strings" // 字符串操作
"sync" // 互斥锁(保护手动触发的并发安全)
"time" // 时间计算(下次运行时间、睡眠)
"sese-engine/storage" // 持久化存储
)
// Runner 管理反向链接计算循环。
type Runner struct {
store *storage.RedisStoreV2
storagePath string // 存储根目录(用于写入 prosper.json
mu sync.Mutex
running bool // 是否正在计算中
nextRun time.Time // 下次计划执行时间
lastRunAt *time.Time // 上次完成时间(nil 表示尚未运行)
lastError string // 上次错误信息(空字符串表示无错误)
}
// New 创建一个 Runner 实例。
func New(store *storage.RedisStoreV2, storagePath string) *Runner {
return &Runner{store: store, storagePath: storagePath}
}
// Status 返回反链计算器的当前状态快照。
func (r *Runner) Status() map[string]interface{} {
r.mu.Lock()
defer r.mu.Unlock()
m := map[string]interface{}{
"running": r.running,
"next_run": r.nextRun.Format(time.RFC3339),
}
if r.lastRunAt != nil {
m["last_run"] = r.lastRunAt.Format(time.RFC3339)
}
if r.lastError != "" {
m["last_error"] = r.lastError
}
return m
}
// Run 无限循环,每 48 小时执行一次反向链接计算。
// 每次运行对齐到凌晨 2:00(便于在低峰期执行重计算)。
func (r *Runner) Run() {
for {
// 计算距离下次运行(凌晨 2:00)的睡眠时长
now := time.Now()
target := time.Date(now.Year(), now.Month(), now.Day(), 2, 0, 0, 0, now.Location())
if !target.After(now) {
target = target.Add(48 * time.Hour) // 已过凌晨 2 点,则等明天的 2 点
}
r.mu.Lock()
r.nextRun = target
r.mu.Unlock()
sleep := target.Sub(now)
log.Printf("[backlink] next run at %v (in %v)", target.Format(time.RFC3339), sleep.Round(time.Minute))
time.Sleep(sleep)
r.doCompute()
}
}
// RunNow 立即执行一次计算(用于手动触发)。
// 如果已有计算正在运行则返回 nil(幂等)。
// 执行后重新计算下次自动运行的定时。
func (r *Runner) RunNow() error {
r.mu.Lock()
if r.running {
r.mu.Unlock()
log.Printf("[backlink] RunNow: already running, skipping")
return nil
}
r.mu.Unlock()
r.doCompute()
return nil
}
// doCompute 执行一次完整的计算,更新状态字段。
func (r *Runner) doCompute() {
r.mu.Lock()
r.running = true
r.lastError = ""
r.mu.Unlock()
log.Printf("[backlink] starting computation at %v", time.Now().Format(time.RFC3339))
err := r.compute()
now := time.Now()
r.mu.Lock()
r.running = false
r.lastRunAt = &now
if err != nil {
r.lastError = err.Error()
}
// 重新计算下次自动运行时间
target := time.Date(now.Year(), now.Month(), now.Day(), 2, 0, 0, 0, now.Location())
if !target.After(now) {
target = target.Add(48 * time.Hour)
}
r.nextRun = target
r.mu.Unlock()
if err != nil {
log.Printf("[backlink] error: %v", err)
} else {
log.Printf("[backlink] done, next run at %v", target.Format(time.RFC3339))
}
}
// ---- 计算核心 ----
// siteStats 存放网站图的统计信息,用于多维度过滤和加权。
type siteStats struct {
subdomainCount map[string]int // 顶级域名 → 子域名数量(识别同一组织的多个子站)
templateCount map[string]int // HTML 结构特征 → 出现次数(识别姊妹站点/镜像)
sameIPCount map[string]int // IP 前缀 → 网站数量(识别同 IP 上的多个网站)
serverCount map[string]int // Server 类型组合 → 出现次数(识别同服务器部署的网站)
}
// compute 执行完整的反向链接计算流程。
// 包含:统计收集 → HTTPS/HTTP 分别迭代 → 合并 → 写入文件。
func (r *Runner) compute() error {
stats := r.collectStats()
// 阶段一:HTTPS 网站的 PageRank 迭代
d1 := r.aggregate(func(info *storage.SiteInfo) bool {
return info.HTTPSAvailable != nil && *info.HTTPSAvailable
}, stats, "https_backlink")
// 阶段一增强(Echo):用 d1 结果加权再做一轮迭代,放大已有繁荣值的域名
d1a := r.aggregateWithScores(d1, stats, "echo")
// 阶段二:HTTP only 网站的迭代(独立计算,不混入 HTTPS 分数)
d2 := r.aggregate(func(info *storage.SiteInfo) bool {
return info.HTTPSAvailable == nil || !*info.HTTPSAvailable
}, stats, "http_backlink")
// 三路合并:HTTPS 分数主导,Echo 辅助,HTTP 补充
merged := make(map[string]float64)
for k := range union(d1, d2, d1a) {
// 混合公式:HTTPS × 1 + Echo × 1 + min(HTTPS×0.5 + HTTP×0.1, HTTP)
v := d1[k] + d1a[k] + math.Min(d1[k]*0.5+d2[k]*0.1, d2[k])
if v > 0.16 {
merged[k] = v
}
}
// 写入文件
path := filepath.Join(r.storagePath, "prosper.json")
if err := writeJSON(path, merged); err != nil {
return err
}
log.Printf("[backlink] wrote %d entries to %s", len(merged), path)
return nil
}
// collectStats 遍历所有网站元信息,统计子域名、HTML 模板、IP、Server 类型分布。
// 低于阈值(4)的统计项被剔除,以减少噪声影响。
func (r *Runner) collectStats() *siteStats {
stats := &siteStats{
subdomainCount: make(map[string]int),
templateCount: make(map[string]int),
sameIPCount: make(map[string]int),
serverCount: make(map[string]int),
}
_ = r.store.ForEachSite(func(host string, info *storage.SiteInfo) error {
super := superDomain(host)
stats.subdomainCount[super]++
if info.HTMLStructure != "" {
stats.templateCount[info.HTMLStructure]++
}
if len(info.IPs) > 0 {
ipStr := ipPrefix(info.IPs)
stats.sameIPCount[ipStr]++
}
if len(info.ServerTypes) > 0 {
s := strings.Join(sortedStrings(info.ServerTypes), ",")
stats.serverCount[s]++
}
return nil
})
// 剔除低频统计项
for k, v := range stats.subdomainCount {
if v < 4 {
delete(stats.subdomainCount, k)
}
}
for k, v := range stats.templateCount {
if v < 4 {
delete(stats.templateCount, k)
}
}
for k, v := range stats.sameIPCount {
if v < 4 {
delete(stats.sameIPCount, k)
}
}
return stats
}
// aggregate 执行一轮 PageRank 风格的链接权重迭代。
// filter 筛选纳入计算的目标网站集合;desc 为日志标识。
func (r *Runner) aggregate(filter func(*storage.SiteInfo) bool, stats *siteStats, desc string) map[string]float64 {
log.Printf("[backlink] aggregating: %s", desc)
d := make(map[string]float64)
ipSource := make(map[string]float64)
// 建立 Server 类型的 ID 映射表(最多 63 种,用于构建向量)
serverTable := buildServerTable(stats.serverCount)
type vectorEntry struct {
domain string
vec []float32
}
vectors := make(map[string][]float32)
pruneThreshold := 0.02
i := 0
_ = r.store.ForEachSite(func(host string, info *storage.SiteInfo) error {
if filter != nil && !filter(info) {
return nil
}
mul := computeMul(host, info, stats) // 计算域名综合乘数(时间衰减 + 子域名降权)
if mul == 0 {
return nil
}
n := len(info.OutLinks)
if n == 0 {
return nil
}
// 每条出站链接的初始权重:1/max(n, 50),出站越多每条分得越少
w := 1.0 / math.Max(float64(n), 50)
xd := make(map[string]float64)
for _, link := range info.OutLinks {
for _, seg := range decomposeURL(link) {
if _, exists := xd[seg]; !exists {
xd[seg] = w
} else {
xd[seg] += w
}
}
}
ipStr := ipPrefix(info.IPs)
serverType := ""
if len(info.ServerTypes) > 0 {
serverType = info.ServerTypes[0]
}
serverID := serverTable[serverType]
for seg, segW := range xd {
fw := math.Min(segW, 0.15) * mul // 截断上限 0.15,防止单链接权重过高
prev := d[seg]
d[seg] = prev + fw
// IP 来源去重:来自同一 IP 段的高权重链接在超过 0.4 后跳过,防止 IP 污染
if prev > 0.2 {
if _, sameIP := stats.sameIPCount[ipStr]; ipStr != "" && sameIP {
key := seg + "-" + ipStr
if ipSource[key] > 0.4 {
continue
}
ipSource[key] += fw
}
}
// 构建向量:域名 → Server 类型向量(用于余弦相似度过滤)
if prev > 0.21 && !strings.Contains(seg, "/") && serverType != "" {
if vectors[seg] == nil {
vectors[seg] = make([]float32, 64)
}
vectors[seg][serverID] += float32(fw)
}
}
i++
// 每 20 万条遍历后清理低分条目,防止内存膨胀
if i%200000 == 0 {
for k, v := range d {
if v < pruneThreshold {
delete(d, k)
}
}
pruneThreshold *= 1.1
}
if i%400000 == 0 {
for k, v := range ipSource {
if v < 0.04 {
delete(ipSource, k)
}
}
}
return nil
})
// 向量余弦过滤:去除 Server 类型特征偏离核心向量的域名(可能是噪音/作弊)
d = vectorFilter(d, vectors, desc, r.storagePath)
// 最终清理:分数 ≤ 0.16 的域名不写入(低于此阈值认为不繁荣)
for k, v := range d {
if v <= 0.16 {
delete(d, k)
}
}
log.Printf("[backlink] %s: %d entries", desc, len(d))
return d
}
// aggregateWithScores 在已有繁荣分数的基础上加权再做一轮迭代(Echo 阶段)。
// 对已有分数的域名给予更高权重(乘以 log2(2+score)),使强者更强。
func (r *Runner) aggregateWithScores(scores map[string]float64, stats *siteStats, desc string) map[string]float64 {
log.Printf("[backlink] aggregating with scores: %s", desc)
d := make(map[string]float64)
serverTable := buildServerTable(stats.serverCount)
vectors := make(map[string][]float32)
_ = r.store.ForEachSite(func(host string, info *storage.SiteInfo) error {
score, ok := scores[host]
if !ok || strings.Contains(host, "/") {
return nil
}
mul := computeMul(host, info, stats)
if mul == 0 {
return nil
}
// 已有分数的域名获得加权乘数(上限 2×)
trueMul := math.Min(2, mul*math.Log2(2+score))
n := len(info.OutLinks)
if n == 0 {
return nil
}
w := 1.0 / math.Max(float64(n), 50)
xd := make(map[string]float64)
for _, link := range info.OutLinks {
for _, seg := range decomposeURL(link) {
xd[seg] += w
}
}
serverType := ""
if len(info.ServerTypes) > 0 {
serverType = info.ServerTypes[0]
}
serverID := serverTable[serverType]
for seg, segW := range xd {
fw := math.Min(segW, 0.15) * trueMul
d[seg] += fw
if d[seg] > 0.21 && !strings.Contains(seg, "/") && serverType != "" {
if vectors[seg] == nil {
vectors[seg] = make([]float32, 64)
}
vectors[seg][serverID] += float32(fw)
}
}
return nil
})
d = vectorFilter(d, vectors, desc, r.storagePath)
for k, v := range d {
if v <= 0.16 {
delete(d, k)
}
}
return d
}
// ---- 向量余弦过滤 ----
// vectorFilter 使用余弦相似度过滤域名分数:保留与核心 Server 类型向量相似的域名。
// 与核心方向偏离的域名可能是噪音(如作弊农场、链接买卖)。
func vectorFilter(d map[string]float64, vectors map[string][]float32, desc string, storagePath string) map[string]float64 {
// 计算全网站的 Server 类型核心向量(所有向量求和)
core := make([]float64, 64)
for _, vec := range vectors {
for j, v := range vec {
core[j] += float64(v)
}
}
coreNorm := norm64(core)
if coreNorm == 0 {
return d
}
newD := make(map[string]float64, len(d))
for k, v := range d {
baseK := strings.Split(k, "/")[0]
if v > 0.21 && vectors[baseK] != nil {
vec := vectors[baseK]
vecNorm := float64(norm32(vec))
if vecNorm == 0 {
newD[k] = v
continue
}
// 余弦相似度:范围 [-1, 1]
cos := dot32_64(vec, core) / (vecNorm * coreNorm)
if cos > 1.01 {
cos = 1.01
}
// cos × 0.75 + 0.25:确保最低也有 0.25 的权重,不完全剔除
newV := math.Max(v*(0.25+cos*0.75), 0.21)
newD[k] = newV
} else {
newD[k] = v
}
}
// 保存 cos map 用于诊断
cosMap := make(map[string]float64)
for k, vec := range vectors {
vn := float64(norm32(vec))
if vn > 0 {
cosMap[k] = dot32_64(vec, core) / (vn * coreNorm)
}
}
cosPath := filepath.Join(storagePath, desc+"_cos.json")
_ = writeJSON(cosPath, cosMap)
return newD
}
// ---- 辅助函数 ----
// computeMul 计算某网站在繁荣值计算中的综合乘数。
// 综合考虑:最后访问时间(超过 180 天排除)、子域名数量(越多平均分越低)。
func computeMul(host string, info *storage.SiteInfo, stats *siteStats) float64 {
if len(info.OutLinks) == 0 {
return 0
}
t := info.LastVisitTime
if t == 0 {
t = 1640000000
}
days := (time.Now().Unix() - t) / (3600 * 24)
if days > 180 {
return 0 // 半年未更新,排除
}
timeMul := math.Pow(0.99, float64(days))
super := superDomain(host)
subCount := max(stats.subdomainCount[super], 1)
tplCount := 1
if info.HTMLStructure != "" {
tplCount = max(stats.templateCount[info.HTMLStructure], 1)
}
count := max(subCount, int(float64(tplCount)*1.5))
// 高频域名随机丢弃:保持最多 1000 个域名参与计算(减少重复镜像的投票)
if count > 1000 {
if rand.Float64() > 1000.0/float64(count) {
return 0
}
count = 1000
}
domainMul := 1.0 / math.Pow(math.Max(float64(count), 5)/5, 0.6)
return timeMul * domainMul
}
// superDomain 提取顶级域名(去除子域名)。
func superDomain(host string) string {
parts := strings.Split(host, ".")
if len(parts) >= 2 {
return strings.Join(parts[len(parts)-2:], ".")
}
return host
}
// ipPrefix 将 IP 列表去重排序后返回逗号拼接的 /24 前缀(用于识别同 C 段主机)。
func ipPrefix(ips []string) string {
if len(ips) == 0 {
return ""
}
sorted := sortedStrings(ips)
parts := make([]string, len(sorted))
for i, ip := range sorted {
idx := strings.LastIndex(ip, ".")
if idx > 0 {
parts[i] = ip[:idx] // 取 /24 前缀
} else {
parts[i] = ip
}
}
return strings.Join(parts, ",")
}
// decomposeURL 将 URL 分解为递增路径段(同 info 包)。
func decomposeURL(rawURL string) []string {
u := strings.ToLower(rawURL)
if strings.HasPrefix(u, "https://") {
u = u[8:]
} else if strings.HasPrefix(u, "http://") {
u = u[7:]
} else {
return nil
}
u = strings.ReplaceAll(u, "?", "/")
u = strings.ReplaceAll(u, "#", "/")
u = strings.TrimRight(u, "/")
if u == "" || u[0] == '/' || u[0] == '%' {
return nil
}
parts := strings.Split(u, "/")
var out []string
current := parts[0]
out = append(out, current)
for _, p := range parts[1:] {
current = current + "/" + p
out = append(out, current)
}
return out
}
// buildServerTable 将 Server 类型按频率降序排列,取前 63 种分配 ID(0 不用)。
func buildServerTable(serverCount map[string]int) map[string]int {
type kv struct {
k string
v int
}
var sorted []kv
for k, v := range serverCount {
sorted = append(sorted, kv{k: k, v: v})
}
for i := 0; i < len(sorted)-1; i++ {
for j := i + 1; j < len(sorted); j++ {
if sorted[j].v > sorted[i].v {
sorted[i], sorted[j] = sorted[j], sorted[i]
}
}
}
table := make(map[string]int, 63)
limit := 63
if len(sorted) < limit {
limit = len(sorted)
}
for i := 0; i < limit; i++ {
table[sorted[i].k] = i + 1
}
return table
}
func sortedStrings(s []string) []string {
cp := make([]string, len(s))
copy(cp, s)
for i := 0; i < len(cp)-1; i++ {
for j := i + 1; j < len(cp); j++ {
if cp[j] < cp[i] {
cp[i], cp[j] = cp[j], cp[i]
}
}
}
return cp
}
func norm64(v []float64) float64 {
s := 0.0
for _, x := range v {
s += x * x
}
return math.Sqrt(s)
}
func norm32(v []float32) float32 {
s := float32(0)
for _, x := range v {
s += x * x
}
return float32(math.Sqrt(float64(s)))
}
func dot32_64(a []float32, b []float64) float64 {
s := 0.0
for i := range a {
s += float64(a[i]) * b[i]
}
return s
}
func union(maps ...map[string]float64) map[string]bool {
out := make(map[string]bool)
for _, m := range maps {
for k := range m {
out[k] = true
}
}
return out
}
func writeJSON(path string, data interface{}) error {
_ = os.MkdirAll(filepath.Dir(path), 0o755)
b, err := json.MarshalIndent(data, "", " ")
if err != nil {
return err
}
return os.WriteFile(path, b, 0o644)
}
func max(a, b int) int {
if a > b {
return a
}
return b
}