From 6c2f5ad9782c9a28b9c4d3741eb7e37d5e034e54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=96=87=E5=B3=B0?= Date: Wed, 8 Apr 2026 17:29:39 +0800 Subject: [PATCH] =?UTF-8?q?Signed-off-by:=20=E5=90=B4=E6=96=87=E5=B3=B0=20?= =?UTF-8?q??= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + README.md | 89 ++++++ analyzer/analyzer.go | 250 +++++++++++++++ backlink/backlink.go | 533 +++++++++++++++++++++++++++++++ config/config.go | 53 ++++ crawler/crawler.go | 588 ++++++++++++++++++++++++++++++++++ crawler/fetcher.go | 313 +++++++++++++++++++ go.mod | 19 ++ go.sum | 36 +++ harvester/harvester.go | 327 +++++++++++++++++++ info/info.go | 206 ++++++++++++ main.go | 90 ++++++ parser/parser.go | 153 +++++++++ search/server.go | 693 +++++++++++++++++++++++++++++++++++++++++ storage/storage.go | 300 ++++++++++++++++++ 15 files changed, 3651 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 analyzer/analyzer.go create mode 100644 backlink/backlink.go create mode 100644 config/config.go create mode 100644 crawler/crawler.go create mode 100644 crawler/fetcher.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 harvester/harvester.go create mode 100644 info/info.go create mode 100644 main.go create mode 100644 parser/parser.go create mode 100644 search/server.go create mode 100644 storage/storage.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0a1b365 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +savedata diff --git a/README.md b/README.md new file mode 100644 index 0000000..270dd29 --- /dev/null +++ b/README.md @@ -0,0 +1,89 @@ +# sese-engine Go 重构版 + +Python 原版的 Go 语言重构,使用标准英文命名,单二进制部署。 + +## 目录结构 + +``` +golang/ +├── main.go # 主入口,goroutine 启动所有模块 +├── go.mod +├── config/ +│ └── config.go # 全局配置参数(对应 配置.py) +├── storage/ +│ └── storage.go # bbolt 持久化层(对应 存储.py,替换 rimo_storage) +├── crawler/ +│ ├── crawler.go # BFS 爬虫调度(对应 
上网.py) +│ └── fetcher.go # HTTP 获取 + robots.txt + 限流(对应 虫.py) +├── parser/ +│ └── parser.go # HTML 解析(对应 文.py) +├── analyzer/ +│ └── analyzer.go # 分词 + 关键词权重(对应 分析.py + utils.py 分词部分) +│ 使用 gojieba(中文)+ lingua-go(语言检测) +├── harvester/ +│ └── harvester.go # 索引写入服务,监听 :5000(对应 收获服务器.py) +├── search/ +│ └── server.go # 搜索 API,监听 :80(对应 人服务器.py) +├── backlink/ +│ └── backlink.go # 反向链接计算,每 48h 运行(对应 回.py) +└── info/ + └── info.go # 繁荣表 / 调整表 / 屏蔽词加载(对应 信息.py) +``` + +## 依赖项 + +| Go 包 | 替代 Python 包 | 用途 | +|-------|--------------|------| +| `github.com/yanyiwu/gojieba` | `jieba` | 中文分词 | +| `github.com/pemistahl/lingua-go` | `fasttext` | 语言检测 | +| `go.etcd.io/bbolt` | `rimo_storage` | KV 存储 / 倒排索引 | +| `github.com/andybalholm/brotli` | `brotli` | 压缩 | +| `golang.org/x/net/html` | `lxml` | HTML 解析 | +| `golang.org/x/net/html/charset` | chardet | 编码检测 | + +## 构建与运行 + +```bash +cd golang + +# 下载依赖(需要 CGo 编译器,用于 gojieba) +go mod tidy + +# 构建 +go build -o sese-engine . + +# 运行(在 sese-engine 项目根目录下) +cd .. +./golang/sese-engine \ + --storage ./savedata \ + --entry https://zh.wikipedia.org/ \ + --fasttext ./lid.176.ftz \ + --stopwords ./data/标点符号.json +``` + +一个进程启动所有模块: +- `:5000` — 收获服务器(爬虫推送关键词) +- `:80` — 搜索 API(`GET /search?q=关键词`) +- 后台 goroutine — BFS 爬虫 +- 后台 goroutine — 每 48 小时反向链接计算 + +## 与 Python 版的主要差异 + +| 方面 | Python 版 | Go 版 | +|------|---------|-------| +| 并发 | GIL + 线程池(假并发) | goroutine 真并发 | +| 存储 | rimo_storage(自研)| bbolt(嵌入式 KV) | +| 部署 | 需要 Python 环境 | 单二进制,无运行时依赖 | +| 命名 | 全中文 | 标准英文 | +| 进程数 | 3~4 个进程 | 1 个进程多 goroutine | +| 编码检测 | requests 自动检测 | `golang.org/x/net/html/charset` | +| Prometheus | 可选 | 暂未集成(可后续添加) | + +## 注意事项 + +1. **CGo 依赖**:gojieba 需要 C/C++ 编译器(gcc/clang);语言检测使用纯 Go 的 lingua-go,无需 CGo。 + Windows 下建议使用 MinGW 或 WSL。 +2. **fasttext 模型**:语言检测已改用 lingua-go(使用内置数据),`analyzer.New` 的 modelPath 参数会被忽略,仅为接口兼容而保留;`lid.176.ftz` 不再是语言检测所必需。 +3. **数据迁移**:存储格式(bbolt JSON)与 Python 版(rimo_storage 二进制)不兼容, + 需要全新爬取,或编写迁移脚本。 +4. 
**stop words 文件**:复用 Python 版的 `data/标点符号.json`。 diff --git a/analyzer/analyzer.go b/analyzer/analyzer.go new file mode 100644 index 0000000..fd1f1ef --- /dev/null +++ b/analyzer/analyzer.go @@ -0,0 +1,250 @@ +// Package analyzer provides keyword extraction and language detection. +// +// Keyword extraction uses gojieba for Chinese segmentation and simple token +// splitting for ASCII words. Language detection uses lingua-go (pure Go, no CGo). +package analyzer + +import ( + "encoding/json" + "math" + "os" + "strings" + "sync" + "unicode" + + "github.com/pemistahl/lingua-go" + "github.com/yanyiwu/gojieba" +) + +// Keyword holds a (word, weight) pair. +type Keyword struct { + Word string `json:"word"` + Weight float32 `json:"weight"` +} + +// Analyzer wraps jieba and lingua into a thread-safe analysis pipeline. +type Analyzer struct { + jieba *gojieba.Jieba + detector lingua.LanguageDetector + stopWords map[string]bool + mu sync.Mutex // gojieba is not goroutine-safe +} + +// New creates an Analyzer. +// stopWordsPath is the JSON file with punctuation/stop words (may be empty string). +// modelPath is ignored (kept for API compatibility; lingua-go uses built-in data). +func New(modelPath, stopWordsPath string) (*Analyzer, error) { + j := gojieba.NewJieba() + + // Build a lingua detector that covers the languages we care about. + // AllLanguages() covers 75 languages including Chinese, Japanese, Korean, etc. + detector := lingua.NewLanguageDetectorBuilder(). + FromAllLanguages(). + WithMinimumRelativeDistance(0.15). + Build() + + stopWords := loadStopWords(stopWordsPath) + + return &Analyzer{ + jieba: j, + detector: detector, + stopWords: stopWords, + }, nil +} + +// Close releases resources held by the analyzer. +func (a *Analyzer) Close() { + a.jieba.Free() +} + +// loadStopWords reads a JSON array of stop-word strings. 
+func loadStopWords(path string) map[string]bool { + if path == "" { + return map[string]bool{} + } + f, err := os.Open(path) + if err != nil { + return map[string]bool{} + } + defer f.Close() + var words []string + if err := json.NewDecoder(f).Decode(&words); err != nil { + return map[string]bool{} + } + m := make(map[string]bool, len(words)) + for _, w := range words { + m[strings.ToLower(w)] = true + } + return m +} + +// Tokenize segments a string into tokens using jieba for CJK and space-split for ASCII. +func (a *Analyzer) Tokenize(s string, searchMode bool) []string { + if len(s) > 10000 { + s = s[:10000] + } + // Sanitize: replace invalid UTF-8 sequences so gojieba (C++) never sees decode errors. + s = strings.ToValidUTF8(s, "") + var result []string + for _, part := range strings.Fields(s) { + if isASCIIAlnum(part) { + result = append(result, part) + } else { + a.mu.Lock() + var tokens []string + if searchMode { + tokens = a.jieba.CutForSearch(part, true) + } else { + tokens = a.jieba.Cut(part, true) + } + a.mu.Unlock() + result = append(result, tokens...) + } + } + return result +} + +// Normalize strips non-alphanumeric, non-CJK characters and lowercases. +func Normalize(s string) string { + var b strings.Builder + for _, r := range s { + if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || (r >= 0x4e00 && r <= 0x9fa5) { + if r >= 'A' && r <= 'Z' { + b.WriteRune(unicode.ToLower(r)) + } else { + b.WriteRune(r) + } + } + } + return b.String() +} + +// weightedTokens builds a map of token→weight from a text with an optional weight multiplier. 
+func (a *Analyzer) weightedTokens(text string, w float32) map[string]float32 { + tokens := a.Tokenize(text, false) + d := make(map[string]float32) + n := math.Max(8, float64(len(tokens))) + counts := make(map[string]int) + for _, t := range tokens { + t = Normalize(t) + if t == "" || a.stopWords[t] || len(t) > 32 { + continue + } + counts[t]++ + } + for k, v := range counts { + d[k] = float32(math.Min(0.2, float64(v)/n)) * w + } + return d +} + +// Analyze extracts weighted keywords from title, description, and body text. +// Returns a slice sorted by weight descending. +func (a *Analyzer) Analyze(title, description, text string) []Keyword { + maps := []map[string]float32{ + a.weightedTokens(title, 1.0), + a.weightedTokens(description, 0.5), + a.weightedTokens(text, 1.0), + } + + combined := make(map[string]float32) + for _, m := range maps { + for k := range m { + combined[k] = 0 + } + } + for k := range combined { + for _, m := range maps { + combined[k] += m[k] + } + } + + result := make([]Keyword, 0, len(combined)) + for k, v := range combined { + result = append(result, Keyword{Word: k, Weight: v}) + } + + sortKeywords(result) + return result +} + +// Segment returns search-mode tokens for a query string. +func (a *Analyzer) Segment(query string, searchMode bool) []string { + tokens := a.Tokenize(query, searchMode) + var result []string + for _, t := range tokens { + t = Normalize(t) + if t == "" || a.stopWords[t] || len(t) > 32 { + continue + } + result = append(result, t) + } + return result +} + +// linguaToISO639 maps lingua.Language to the ISO 639-1 code used by the rest of the engine. +// Returns "" for unknown or unsupported languages. 
+var linguaToISO639 = map[lingua.Language]string{ + lingua.Chinese: "zh", + lingua.English: "en", + lingua.Japanese: "ja", + lingua.Korean: "ko", + lingua.French: "fr", + lingua.German: "de", + lingua.Spanish: "es", + lingua.Portuguese: "pt", + lingua.Italian: "it", + lingua.Russian: "ru", + lingua.Arabic: "ar", + lingua.Hindi: "hi", + lingua.Dutch: "nl", + lingua.Polish: "pl", + lingua.Swedish: "sv", + lingua.Turkish: "tr", + lingua.Vietnamese: "vi", + lingua.Thai: "th", + lingua.Indonesian: "id", + lingua.Malay: "ms", +} + +// DetectLanguage returns the ISO 639-1 language code for the text, or "". +func (a *Analyzer) DetectLanguage(text string) string { + text = strings.ReplaceAll(text, "\n", " ") + if len(text) > 2000 { + text = text[:2000] + } + if text == "" { + return "" + } + lang, exists := a.detector.DetectLanguageOf(text) + if !exists { + return "" + } + if code, ok := linguaToISO639[lang]; ok { + return code + } + return "" +} + +// ---- sorting ---- + +func sortKeywords(kws []Keyword) { + for i := 1; i < len(kws); i++ { + key := kws[i] + j := i - 1 + for j >= 0 && kws[j].Weight < key.Weight { + kws[j+1] = kws[j] + j-- + } + kws[j+1] = key + } +} + +func isASCIIAlnum(s string) bool { + for _, r := range s { + if !((r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9')) { + return false + } + } + return len(s) > 0 +} diff --git a/backlink/backlink.go b/backlink/backlink.go new file mode 100644 index 0000000..ae842db --- /dev/null +++ b/backlink/backlink.go @@ -0,0 +1,533 @@ +// Package backlink computes backlink (prosperity) scores for all known domains, +// using a PageRank-like algorithm over the site-level link graph. +// +// It runs every 48 hours and writes savedata/prosper.json. +package backlink + +import ( + "encoding/json" + "log" + "math" + "math/rand" + "os" + "path/filepath" + "strings" + "time" + + "sese-engine/storage" +) + +// Runner runs the backlink calculation loop. 
+type Runner struct { + db *storage.DB + storagePath string +} + +// New creates a Runner. +func New(db *storage.DB, storagePath string) *Runner { + return &Runner{db: db, storagePath: storagePath} +} + +// Run loops forever, recalculating every 48 hours. +func (r *Runner) Run() { + for { + // Sleep until next scheduled run (aligned to 2am) + now := time.Now() + target := time.Date(now.Year(), now.Month(), now.Day(), 2, 0, 0, 0, now.Location()) + if !target.After(now) { + target = target.Add(48 * time.Hour) + } + sleep := target.Sub(now) + log.Printf("[backlink] next run at %v (in %v)", target.Format(time.RFC3339), sleep.Round(time.Minute)) + time.Sleep(sleep) + + log.Printf("[backlink] starting computation at %v", time.Now().Format(time.RFC3339)) + if err := r.compute(); err != nil { + log.Printf("[backlink] error: %v", err) + } else { + log.Printf("[backlink] done") + } + } +} + +// RunNow runs one computation cycle immediately (for testing / manual trigger). +func (r *Runner) RunNow() error { + return r.compute() +} + +// ---- computation ---- + +type siteStats struct { + subdomainCount map[string]int // superDomain → count + templateCount map[string]int // htmlStructure → count + sameIPCount map[string]int // ipPrefix → count + serverCount map[string]int // serverType → count +} + +func (r *Runner) compute() error { + stats := r.collectStats() + + // Phase 1: HTTPS sites + d1 := r.aggregate(func(info *storage.SiteInfo) bool { + return info.HTTPSAvailable != nil && *info.HTTPSAvailable + }, stats, "https_backlink") + + // Phase 1a: second pass (echo) using d1 scores + d1a := r.aggregateWithScores(d1, stats, "echo") + + // Phase 2: HTTP-only sites + d2 := r.aggregate(func(info *storage.SiteInfo) bool { + return info.HTTPSAvailable == nil || !*info.HTTPSAvailable + }, stats, "http_backlink") + + // Merge + merged := make(map[string]float64) + for k := range union(d1, d2, d1a) { + v := d1[k] + d1a[k] + math.Min(d1[k]*0.5+d2[k]*0.1, d2[k]) + if v > 0.16 { + 
merged[k] = v + } + } + + // Save + path := filepath.Join(r.storagePath, "prosper.json") + if err := writeJSON(path, merged); err != nil { + return err + } + log.Printf("[backlink] wrote %d entries to %s", len(merged), path) + return nil +} + +// collectStats builds statistics about the site graph. +func (r *Runner) collectStats() *siteStats { + stats := &siteStats{ + subdomainCount: make(map[string]int), + templateCount: make(map[string]int), + sameIPCount: make(map[string]int), + serverCount: make(map[string]int), + } + + _ = r.db.ForEachSite(func(host string, info *storage.SiteInfo) error { + super := superDomain(host) + stats.subdomainCount[super]++ + + if info.HTMLStructure != "" { + stats.templateCount[info.HTMLStructure]++ + } + if len(info.IPs) > 0 { + ipStr := ipPrefix(info.IPs) + stats.sameIPCount[ipStr]++ + } + if len(info.ServerTypes) > 0 { + s := strings.Join(sortedStrings(info.ServerTypes), ",") + stats.serverCount[s]++ + } + return nil + }) + + // Prune counts below threshold + for k, v := range stats.subdomainCount { + if v < 4 { + delete(stats.subdomainCount, k) + } + } + for k, v := range stats.templateCount { + if v < 4 { + delete(stats.templateCount, k) + } + } + for k, v := range stats.sameIPCount { + if v < 4 { + delete(stats.sameIPCount, k) + } + } + return stats +} + +// aggregate computes a backlink score map for sites matching the filter. 
+func (r *Runner) aggregate(filter func(*storage.SiteInfo) bool, stats *siteStats, desc string) map[string]float64 { + log.Printf("[backlink] aggregating: %s", desc) + d := make(map[string]float64) + ipSource := make(map[string]float64) + + // Build server type index (top 63 most common) + serverTable := buildServerTable(stats.serverCount) + + type vectorEntry struct { + domain string + vec []float32 + } + vectors := make(map[string][]float32) + + pruneThreshold := 0.02 + i := 0 + + _ = r.db.ForEachSite(func(host string, info *storage.SiteInfo) error { + if filter != nil && !filter(info) { + return nil + } + mul := computeMul(host, info, stats) + if mul == 0 { + return nil + } + + n := len(info.OutLinks) + if n == 0 { + return nil + } + + w := 1.0 / math.Max(float64(n), 50) + xd := make(map[string]float64) + for _, link := range info.OutLinks { + for _, seg := range decomposeURL(link) { + if _, exists := xd[seg]; !exists { + xd[seg] = w + } else { + xd[seg] += w + } + } + } + + ipStr := ipPrefix(info.IPs) + serverType := "" + if len(info.ServerTypes) > 0 { + serverType = info.ServerTypes[0] + } + serverID := serverTable[serverType] + + for seg, segW := range xd { + fw := math.Min(segW, 0.15) * mul + prev := d[seg] + d[seg] = prev + fw + + if prev > 0.2 { + if _, sameIP := stats.sameIPCount[ipStr]; ipStr != "" && sameIP { + key := seg + "-" + ipStr + if ipSource[key] > 0.4 { + continue + } + ipSource[key] += fw + } + } + + if prev > 0.21 && !strings.Contains(seg, "/") && serverType != "" { + if vectors[seg] == nil { + vectors[seg] = make([]float32, 64) + } + vectors[seg][serverID] += float32(fw) + } + } + + i++ + if i%200000 == 0 { + // Prune low-score entries + for k, v := range d { + if v < pruneThreshold { + delete(d, k) + } + } + pruneThreshold *= 1.1 + } + if i%400000 == 0 { + for k, v := range ipSource { + if v < 0.04 { + delete(ipSource, k) + } + } + } + return nil + }) + + // Vectorised cosine filtering + d = vectorFilter(d, vectors, desc) + + // Prune + for 
k, v := range d { + if v <= 0.16 { + delete(d, k) + } + } + + log.Printf("[backlink] %s: %d entries", desc, len(d)) + return d +} + +// aggregateWithScores does a second pass weighted by existing scores. +func (r *Runner) aggregateWithScores(scores map[string]float64, stats *siteStats, desc string) map[string]float64 { + log.Printf("[backlink] aggregating with scores: %s", desc) + d := make(map[string]float64) + serverTable := buildServerTable(stats.serverCount) + vectors := make(map[string][]float32) + + _ = r.db.ForEachSite(func(host string, info *storage.SiteInfo) error { + score, ok := scores[host] + if !ok || strings.Contains(host, "/") { + return nil + } + mul := computeMul(host, info, stats) + if mul == 0 { + return nil + } + trueMul := math.Min(2, mul*math.Log2(2+score)) + + n := len(info.OutLinks) + if n == 0 { + return nil + } + w := 1.0 / math.Max(float64(n), 50) + xd := make(map[string]float64) + for _, link := range info.OutLinks { + for _, seg := range decomposeURL(link) { + xd[seg] += w + } + } + serverType := "" + if len(info.ServerTypes) > 0 { + serverType = info.ServerTypes[0] + } + serverID := serverTable[serverType] + + for seg, segW := range xd { + fw := math.Min(segW, 0.15) * trueMul + d[seg] += fw + if d[seg] > 0.21 && !strings.Contains(seg, "/") && serverType != "" { + if vectors[seg] == nil { + vectors[seg] = make([]float32, 64) + } + vectors[seg][serverID] += float32(fw) + } + } + return nil + }) + + d = vectorFilter(d, vectors, desc) + for k, v := range d { + if v <= 0.16 { + delete(d, k) + } + } + return d +} + +// ---- vector cosine filtering ---- + +func vectorFilter(d map[string]float64, vectors map[string][]float32, desc string) map[string]float64 { + // Compute core vector (sum of all) + core := make([]float64, 64) + for _, vec := range vectors { + for j, v := range vec { + core[j] += float64(v) + } + } + coreNorm := norm64(core) + if coreNorm == 0 { + return d + } + + newD := make(map[string]float64, len(d)) + for k, v := range d { 
+ baseK := strings.Split(k, "/")[0] + if v > 0.21 && vectors[baseK] != nil { + vec := vectors[baseK] + vecNorm := float64(norm32(vec)) + if vecNorm == 0 { + newD[k] = v + continue + } + cos := dot32_64(vec, core) / (vecNorm * coreNorm) + if cos > 1.01 { + cos = 1.01 + } + newV := math.Max(v*(0.25+cos*0.75), 0.21) + newD[k] = newV + } else { + newD[k] = v + } + } + + // Save cos map for diagnostics + cosMap := make(map[string]float64) + for k, vec := range vectors { + vn := float64(norm32(vec)) + if vn > 0 { + cosMap[k] = dot32_64(vec, core) / (vn * coreNorm) + } + } + _ = writeJSON(desc+"_cos.json", cosMap) + + return newD +} + +// ---- helpers ---- + +func computeMul(host string, info *storage.SiteInfo, stats *siteStats) float64 { + if len(info.OutLinks) == 0 { + return 0 + } + t := info.LastVisitTime + if t == 0 { + t = 1640000000 + } + days := (time.Now().Unix() - t) / (3600 * 24) + if days > 180 { + return 0 + } + timeMul := math.Pow(0.99, float64(days)) + + super := superDomain(host) + subCount := max(stats.subdomainCount[super], 1) + tplCount := 1 + if info.HTMLStructure != "" { + tplCount = max(stats.templateCount[info.HTMLStructure], 1) + } + count := max(subCount, int(float64(tplCount)*1.5)) + if count > 1000 { + if rand.Float64() > 1000.0/float64(count) { + return 0 + } + count = 1000 + } + domainMul := 1.0 / math.Pow(math.Max(float64(count), 5)/5, 0.6) + return timeMul * domainMul +} + +func superDomain(host string) string { + parts := strings.Split(host, ".") + if len(parts) >= 2 { + return strings.Join(parts[len(parts)-2:], ".") + } + return host +} + +func ipPrefix(ips []string) string { + if len(ips) == 0 { + return "" + } + sorted := sortedStrings(ips) + parts := make([]string, len(sorted)) + for i, ip := range sorted { + idx := strings.LastIndex(ip, ".") + if idx > 0 { + parts[i] = ip[:idx] + } else { + parts[i] = ip + } + } + return strings.Join(parts, ",") +} + +func decomposeURL(rawURL string) []string { + u := strings.ToLower(rawURL) + if 
strings.HasPrefix(u, "https://") { + u = u[8:] + } else if strings.HasPrefix(u, "http://") { + u = u[7:] + } else { + return nil + } + u = strings.ReplaceAll(u, "?", "/") + u = strings.ReplaceAll(u, "#", "/") + u = strings.TrimRight(u, "/") + if u == "" || u[0] == '/' || u[0] == '%' { + return nil + } + parts := strings.Split(u, "/") + var out []string + current := parts[0] + out = append(out, current) + for _, p := range parts[1:] { + current = current + "/" + p + out = append(out, current) + } + return out +} + +func buildServerTable(serverCount map[string]int) map[string]int { + type kv struct { + k string + v int + } + var sorted []kv + for k, v := range serverCount { + sorted = append(sorted, kv{k, v}) + } + for i := 0; i < len(sorted)-1; i++ { + for j := i + 1; j < len(sorted); j++ { + if sorted[j].v > sorted[i].v { + sorted[i], sorted[j] = sorted[j], sorted[i] + } + } + } + table := make(map[string]int, 63) + limit := 63 + if len(sorted) < limit { + limit = len(sorted) + } + for i := 0; i < limit; i++ { + table[sorted[i].k] = i + 1 + } + return table +} + +func sortedStrings(s []string) []string { + cp := make([]string, len(s)) + copy(cp, s) + for i := 0; i < len(cp)-1; i++ { + for j := i + 1; j < len(cp); j++ { + if cp[j] < cp[i] { + cp[i], cp[j] = cp[j], cp[i] + } + } + } + return cp +} + +func norm64(v []float64) float64 { + s := 0.0 + for _, x := range v { + s += x * x + } + return math.Sqrt(s) +} + +func norm32(v []float32) float32 { + s := float32(0) + for _, x := range v { + s += x * x + } + return float32(math.Sqrt(float64(s))) +} + +func dot32_64(a []float32, b []float64) float64 { + s := 0.0 + for i := range a { + s += float64(a[i]) * b[i] + } + return s +} + +func union(maps ...map[string]float64) map[string]bool { + out := make(map[string]bool) + for _, m := range maps { + for k := range m { + out[k] = true + } + } + return out +} + +func writeJSON(path string, data interface{}) error { + _ = os.MkdirAll(filepath.Dir(path), 0o755) + b, err := 
json.MarshalIndent(data, "", " ") + if err != nil { + return err + } + return os.WriteFile(path, b, 0o644) +} + +func max(a, b int) int { + if a > b { + return a + } + return b +} diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..f0a2337 --- /dev/null +++ b/config/config.go @@ -0,0 +1,53 @@ +// Package config holds all global configuration parameters for sese-engine. +package config + +// Index / storage limits +const ( + MaxURLsPerKey = 11000 // max URLs stored per index key + MaxSameDomainPerKey = 20 // max URLs from the same domain per key + BigCleanThreshold = 10000000 // flush in-memory index after this many rows + MaxNewURLsPerKey = 10000 // cap on new URLs added per key per flush + MinURLsForNewKey = 3 // discard new keys with fewer than this many URLs +) + +// Crawler settings +const ( + SpiderName = "loli_spider" + CrawlerCooldown = 3 // seconds between requests to the same host + CrawlerWorkers = 22 // goroutine pool size for crawling + CrawlFocus = 0.7 // concentration factor — higher = more focused on single domain + MaxKeywordsPerPage = 250 + MaxEpoch = 100 + ExpectedProsperRatio = 0.6 // fraction of queue that should be "prosperous" (high backlink) domains + EntryURL = "https://zh.wikipedia.org/" +) + +// Search / ranking weights +const ( + UseOnlineSnippet = true + OnlineSnippetTimeout = 3 // seconds + WeightDailyDecay = 0.996 + LanguageWeight = 0.5 + ConsecutiveKeyWeight = 1.3 + BacklinkWeight = 1.0 + SearchServerPort = 80 +) + +// Backlink computation +const ( + BacklinkBaseline = 200000 // normalization divisor for backlink scores +) + +// Storage path (relative to process working directory) +const StoragePath = "./savedata" + +// Prometheus ports +const ( + PromPortCrawler = 14950 + PromPortHarvester = 14951 + PromPortBacklink = 14952 + PromPortSearch = 14953 +) + +// Harvester HTTP endpoint +const HarvesterAddr = "http://127.0.0.1:5000" diff --git a/crawler/crawler.go b/crawler/crawler.go new file mode 100644 
index 0000000..17457e1 --- /dev/null +++ b/crawler/crawler.go @@ -0,0 +1,588 @@ +// crawler.go — BFS crawl loop, URL scheduling, and site-info updating. +package crawler + +import ( + "bytes" + "encoding/json" + "log" + "math" + "math/rand" + "net/http" + "net/url" + "strings" + "sync" + "sync/atomic" + "time" + + "sese-engine/analyzer" + "sese-engine/config" + "sese-engine/parser" + "sese-engine/storage" +) + + +// Stats holds real-time crawl counters (read with atomic). +type Stats struct { + VisitedURLs int64 + SuccessURLs int64 + KeywordsFetched int64 +} + +// Crawler orchestrates the BFS crawl. +type Crawler struct { + fetcher *Fetcher + db *storage.DB + analyzer *analyzer.Analyzer + prosperMap map[string]float64 // domain → backlink score (loaded from info) + stats Stats +} + +// New creates a Crawler. +func New(db *storage.DB, a *analyzer.Analyzer, prosperMap map[string]float64) *Crawler { + return &Crawler{ + fetcher: NewFetcher(config.SpiderName, config.CrawlerCooldown*time.Second), + db: db, + analyzer: a, + prosperMap: prosperMap, + } +} + +// URLWeight pairs a URL with its discovery weight. +type URLWeight struct { + URL string + Weight float64 +} + +// Run starts the BFS crawl from entryURL, running for maxEpoch rounds. +// It blocks until completion. 
+func (c *Crawler) Run(entryURL string, maxEpoch int) { + visited := make(map[string]bool) + queue := []string{entryURL} + + for ep := 0; ep < maxEpoch; ep++ { + log.Printf("[crawler] epoch %d/%d queue=%d", ep+1, maxEpoch, len(queue)) + for _, u := range queue { + visited[u] = true + } + + var ( + newLinks []URLWeight + mu sync.Mutex + wg sync.WaitGroup + ) + + sem := make(chan struct{}, config.CrawlerWorkers) + for _, u := range queue { + wg.Add(1) + sem <- struct{}{} + go func(rawURL string) { + defer wg.Done() + defer func() { <-sem }() + hrefs := c.visitURL(rawURL) + n := len(hrefs) + if n > 0 { + w := 1.0 / float64(n) + mu.Lock() + for _, h := range hrefs { + if !visited[h] { + newLinks = append(newLinks, URLWeight{URL: h, Weight: w}) + } + } + mu.Unlock() + } + }(u) + } + wg.Wait() + + if len(newLinks) == 0 { + log.Println("[crawler] empty queue — stopping") + return + } + + queue = c.schedule(newLinks) + } +} + +// visitURL fetches a URL, stores keywords, updates site info, returns discovered hrefs. 
+func (c *Crawler) visitURL(rawURL string) []string { + atomic.AddInt64(&c.stats.VisitedURLs, 1) + + res, err := c.fetcher.fetchWithHistory(rawURL, true, 10*time.Second, 0) + if err != nil || res == nil { + c.updateSiteFailure(rawURL) + return nil + } + + atomic.AddInt64(&c.stats.SuccessURLs, 1) + + title, desc, text, hrefs := parser.ParseHTML(res.Body, res.FinalURL) + + // Cache snippet + if len(res.FinalURL) < 250 { + _ = c.db.SetSnippet(res.FinalURL, &storage.SnippetEntry{ + Title: title, + Description: truncate(desc, 256), + Text: truncate(text, 256), + Timestamp: time.Now().Unix(), + }) + } + + // Keyword extraction → send to harvester + kws := c.analyzer.Analyze(title, desc, text) + if len(kws) > 0 { + if len(kws) > config.MaxKeywordsPerPage { + kws = kws[:config.MaxKeywordsPerPage] + } + atomic.AddInt64(&c.stats.KeywordsFetched, int64(len(kws))) + go c.sendToHarvester(res.FinalURL, kws) + } + + // Update site info + host := netloc(res.FinalURL) + c.updateSiteSuccess(host, res, title, desc, text, hrefs) + + // Handle permanent redirects in site info + for from, to := range res.Redirects { + fromHost := netloc(from) + if fromHost == "" { + continue + } + info, _ := c.db.GetSiteInfo(fromHost) + if info.Redirects == nil { + info.Redirects = make(map[string]string) + } + info.Redirects[from] = to + if len(info.Redirects) > 50 { + // keep most important (just truncate randomly for now) + info.Redirects = truncateMap(info.Redirects, 40) + } + _ = c.db.SetSiteInfo(fromHost, info) + } + + // Trim hrefs + if len(hrefs) > 100 { + hrefs = sampleStrings(hrefs, 100) + } + return hrefs +} + +func (c *Crawler) updateSiteFailure(rawURL string) { + host := netloc(rawURL) + if host == "" { + return + } + info, _ := c.db.GetSiteInfo(host) + if info.SuccessRate == nil { + zero := 0.0 + info.SuccessRate = &zero + } + *info.SuccessRate *= 0.99 + _ = c.db.SetSiteInfo(host, info) +} + +func (c *Crawler) updateSiteSuccess(host string, res *FetchResult, title, desc, text string, hrefs 
[]string) { + info, _ := c.db.GetSiteInfo(host) + + info.VisitCount++ + info.LastVisitTime = time.Now().Unix() + + one := 1.0 + if info.SuccessRate == nil { + info.SuccessRate = &one + } + *info.SuccessRate = *info.SuccessRate*0.99 + 0.01 + + if strings.HasPrefix(res.FinalURL, "https://") { + t := true + info.HTTPSAvailable = &t + } + + if res.ServerType != "" { + found := false + for _, s := range info.ServerTypes { + if s == res.ServerType { + found = true + break + } + } + if !found { + info.ServerTypes = append(info.ServerTypes, res.ServerType) + if len(info.ServerTypes) > 5 { + info.ServerTypes = info.ServerTypes[len(info.ServerTypes)-5:] + } + } + } + + // Language detection — sample 10% or first 10 visits + if info.VisitCount < 10 || rand.Float64() < 0.1 { + lang := c.analyzer.DetectLanguage(title + " " + desc + " " + text) + if lang != "" { + if info.Languages == nil { + info.Languages = make(map[string]float64) + } + intensity := math.Min(0.2, 1/math.Sqrt(float64(info.VisitCount+1))) + for k := range info.Languages { + info.Languages[k] *= (1 - intensity) + } + info.Languages[lang] += intensity + } + // Collect external links + superHost := superNetloc(res.FinalURL) + var external []string + for _, h := range hrefs { + if superNetloc(h) != superHost { + external = append(external, h) + } + } + sampled := sampleStrings(external, 10) + info.OutLinks = append(info.OutLinks, sampled...) + if len(info.OutLinks) > 250 { + info.OutLinks = sampleStrings(info.OutLinks, 200) + } + } + + _ = c.db.SetSiteInfo(host, info) +} + +// sendToHarvester POSTs keyword data to the harvester service. 
+func (c *Crawler) sendToHarvester(finalURL string, kws []analyzer.Keyword) { + type payload struct { + URL string `json:"url"` + Keywords []analyzer.Keyword `json:"keywords"` + } + p := payload{URL: finalURL, Keywords: kws} + data, err := json.Marshal(p) + if err != nil { + return + } + resp, err := http.Post(config.HarvesterAddr+"/l", "application/json", bytes.NewReader(data)) + if err != nil { + log.Printf("[crawler] harvester post failed: %v", err) + return + } + resp.Body.Close() +} + +// schedule selects and prioritises the next BFS queue from raw discovered links. +func (c *Crawler) schedule(links []URLWeight) []string { + if len(links) > 100000 { + links = sampleURLWeights(links, 100000) + } + + // Pre-fetch site info for all involved domains + domains := make(map[string]bool) + for _, lw := range links { + if h := netloc(lw.URL); h != "" { + domains[h] = true + } + if h := superNetloc(lw.URL); h != "" { + domains[h] = true + } + } + siteCache := make(map[string]*storage.SiteInfo, len(domains)) + var mu sync.Mutex + var wg sync.WaitGroup + for d := range domains { + wg.Add(1) + go func(host string) { + defer wg.Done() + info, _ := c.db.GetSiteInfo(host) + mu.Lock() + siteCache[host] = info + mu.Unlock() + }(d) + } + wg.Wait() + + // Score each URL + scored_list := make([]scoredURL, len(links)) + for i, lw := range links { + scored_list[i] = scoredURL{url: lw.URL, score: c.scoreURL(lw, siteCache)} + } + + // Weighted random sample (45000 or 1/3+250 whichever smaller) + k := min(45000, len(scored_list)/3+250) + selected := weightedSample(scored_list, k) + + // Domain concentration filtering + selected = concentrationFilter(selected, config.CrawlFocus) + + // Separate https/http, cap http at 1/4 of https count + var httpsURLs, httpURLs []string + for _, s := range selected { + if strings.HasPrefix(s, "https://") { + httpsURLs = append(httpsURLs, s) + } else { + httpURLs = append(httpURLs, s) + } + } + maxHTTP := len(httpsURLs) / 4 + if len(httpURLs) > maxHTTP 
{ + httpURLs = sampleStrings(httpURLs, maxHTTP) + } + + // Separate prosperous / non-prosperous + var prosperURLs, otherURLs []string + for _, u := range append(httpsURLs, httpURLs...) { + if c.prosperMap[netloc(u)] > 0 { + prosperURLs = append(prosperURLs, u) + } else { + otherURLs = append(otherURLs, u) + } + } + n := int(float64(len(prosperURLs)) * (1-config.ExpectedProsperRatio) / config.ExpectedProsperRatio) + if len(otherURLs) > n { + keep := max(len(otherURLs)-len(selected)/10, n) + if keep < len(otherURLs) { + otherURLs = sampleStrings(otherURLs, keep) + } + } + + result := append(prosperURLs, otherURLs...) + rand.Shuffle(len(result), func(i, j int) { result[i], result[j] = result[j], result[i] }) + return result +} + +// scoreURL computes the scheduling priority for a URL. +func (c *Crawler) scoreURL(lw URLWeight, siteCache map[string]*storage.SiteInfo) float64 { + host := netloc(lw.URL) + super := superNetloc(lw.URL) + + info := siteCache[host] + if info == nil { + info = &storage.SiteInfo{} + } + + // Chinese-ness + var chineseness float64 = 0.5 + if len(info.Languages) > 0 { + total := 0.0 + for _, v := range info.Languages { + total += v + } + if total > 0 { + chineseness = info.Languages["zh"] / total + } + } + + // Interest decay based on visit count + prosper := math.Min(62, c.prosperMap[host]) + limit := prosper*500 + 50 + b := math.Pow(0.1, 1/limit) + interest := math.Pow(b, float64(info.VisitCount)) + + var interest2 float64 = 1.0 + if super != host { + superInfo := siteCache[super] + if superInfo != nil { + limit2 := math.Min(62, c.prosperMap[super])*500 + 50 + b2 := math.Pow(0.1, 1/limit2) + interest2 = math.Pow(b2, float64(superInfo.VisitCount)) + } + } + + quality := 1.0 + if info.Quality != nil { + quality = *info.Quality + } + + prosperity := prosper + if prosperity > 0 { + prosperity += 0.5 + } + prosperity = math.Log2(2+prosperity) + 1 + + bad := badURL(lw.URL) + return (0.1 + chineseness) * math.Min(0.05+interest, 0.05+interest2) * 
quality * (1 - bad) * lw.Weight * prosperity +} + +// ---- helper functions ---- + +func netloc(rawURL string) string { + parts := strings.SplitN(rawURL, "/", 4) + if len(parts) >= 3 && (parts[0] == "http:" || parts[0] == "https:") && parts[1] == "" { + return parts[2] + } + u, err := url.Parse(rawURL) + if err != nil { + return "" + } + return u.Host +} + +// superNetloc returns "domain.tld" (strips subdomains). +func superNetloc(rawURL string) string { + host := netloc(rawURL) + parts := strings.Split(host, ".") + if len(parts) >= 2 { + return strings.Join(parts[len(parts)-2:], ".") + } + return host +} + +func badURL(u string) float64 { + s := math.Max(0, float64(len(u)-30)/200.0) + if strings.Contains(u, ".htm") || strings.Contains(u, ".php") { + s += (1 - s) * 0.3 + } + if strings.Count(strings.TrimRight(u, "/"), "/") > 2 { + s += (1 - s) * 0.1 + } + if len(u) < 5 || u[4] == ':' { + s += (1 - s) * 0.3 + } + return math.Min(s, 0.9) +} + +func truncate(s string, n int) string { + if len(s) <= n { + return s + } + return s[:n] +} + +func sampleStrings(s []string, n int) []string { + if len(s) <= n { + return s + } + perm := rand.Perm(len(s)) + out := make([]string, n) + for i := range out { + out[i] = s[perm[i]] + } + return out +} + +func sampleURLWeights(s []URLWeight, n int) []URLWeight { + if len(s) <= n { + return s + } + perm := rand.Perm(len(s)) + out := make([]URLWeight, n) + for i := range out { + out[i] = s[perm[i]] + } + return out +} + +type scoredURL struct { + url string + score float64 +} + +func weightedSample(items []scoredURL, k int) []string { + if k >= len(items) { + out := make([]string, len(items)) + for i, s := range items { + out[i] = s.url + } + return out + } + // Simple weighted sampling without replacement using alias method approximation + totalWeight := 0.0 + for _, s := range items { + totalWeight += s.score + } + selected := make(map[int]bool) + out := make([]string, 0, k) + for len(out) < k && len(selected) < len(items) { + r := 
rand.Float64() * totalWeight + cum := 0.0 + for i, s := range items { + if selected[i] { + continue + } + cum += s.score + if cum >= r { + selected[i] = true + out = append(out, s.url) + totalWeight -= s.score + break + } + } + } + return out +} + +func concentrationFilter(urls []string, k float64) []string { + domainGroups := make(map[string][]string) + shuffled := make([]string, len(urls)) + copy(shuffled, urls) + rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] }) + + for _, u := range shuffled { + d := superNetloc(u) + domainGroups[d] = append(domainGroups[d], u) + } + + limit := 10 + if len(domainGroups) > 1 { + sizes := make([]int, 0, len(domainGroups)) + for _, g := range domainGroups { + sizes = append(sizes, int(math.Pow(float64(len(g)), k))) + } + // sort sizes ascending, drop last (largest) + for i := 0; i < len(sizes)-1; i++ { + for j := i + 1; j < len(sizes)-1; j++ { + if sizes[j] < sizes[i] { + sizes[i], sizes[j] = sizes[j], sizes[i] + } + } + } + total := 0 + for _, s := range sizes[:len(sizes)-1] { + total += s + } + limit = max(10, int(float64(total)*0.6)) + } + + var result []string + for _, g := range domainGroups { + sn := 1 + min(limit, int(math.Pow(float64(len(g)), k))) + if sn > len(g) { + sn = len(g) + } + result = append(result, g[:sn]...) + } + rand.Shuffle(len(result), func(i, j int) { result[i], result[j] = result[j], result[i] }) + return result +} + +func truncateMap(m map[string]string, n int) map[string]string { + if len(m) <= n { + return m + } + out := make(map[string]string, n) + i := 0 + for k, v := range m { + if i >= n { + break + } + out[k] = v + i++ + } + return out +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} + +func max(a, b int) int { + if a > b { + return a + } + return b +} + +// Expose Stats for monitoring. 
+func (c *Crawler) GetStats() Stats { + return Stats{ + VisitedURLs: atomic.LoadInt64(&c.stats.VisitedURLs), + SuccessURLs: atomic.LoadInt64(&c.stats.SuccessURLs), + KeywordsFetched: atomic.LoadInt64(&c.stats.KeywordsFetched), + } +} diff --git a/crawler/fetcher.go b/crawler/fetcher.go new file mode 100644 index 0000000..2860b5b --- /dev/null +++ b/crawler/fetcher.go @@ -0,0 +1,313 @@ +// Package crawler implements the HTTP fetching layer with robots.txt compliance, +// per-host rate limiting, redirect tracking, and encoding detection. +package crawler + +import ( + "fmt" + "io" + "net/http" + "net/url" + "strings" + "sync" + "time" + + "golang.org/x/net/html/charset" +) + +// ErrCrawl is returned for expected crawl failures (404, disallowed, wrong content type…). +type ErrCrawl struct { + Msg string +} + +func (e *ErrCrawl) Error() string { return e.Msg } + +// FetchResult bundles the result of a successful fetch. +type FetchResult struct { + Body string // decoded HTML body + FinalURL string // URL after redirects + Redirects map[string]string // permanent redirects: from → to + ServerType string +} + +// Fetcher is a reusable HTTP client with robots.txt awareness and rate limiting. +type Fetcher struct { + client *http.Client + userAgent string + cooldown time.Duration + + rateMu sync.Mutex + lastHit map[string]time.Time // host → last request time + + robotsMu sync.Mutex + robots map[string]*robotsEntry // host → parsed robots +} + +type robotsEntry struct { + rules []robotsRule + fetchedAt time.Time +} + +type robotsRule struct { + userAgent string + disallow []string + allow []string +} + +// NewFetcher creates a Fetcher with the given user-agent and per-host cooldown. 
+func NewFetcher(userAgent string, cooldown time.Duration) *Fetcher { + return &Fetcher{ + client: &http.Client{ + Timeout: 30 * time.Second, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + if len(via) >= 10 { + return fmt.Errorf("too many redirects") + } + return nil + }, + }, + userAgent: userAgent, + cooldown: cooldown, + lastHit: make(map[string]time.Time), + robots: make(map[string]*robotsEntry), + } +} + +// Fetch fetches url, respecting robots.txt and rate limits. +// polite=false skips both checks (used by search server snippet fetcher). +func (f *Fetcher) Fetch(rawURL string, polite bool, timeout time.Duration, sizeLimit int) (*FetchResult, error) { + return f.fetchWithHistory(rawURL, polite, timeout, sizeLimit) +} + +// FetchSafe wraps Fetch and returns (nil, nil) on expected errors. +func (f *Fetcher) FetchSafe(rawURL string, polite bool, timeout time.Duration, sizeLimit int) (*FetchResult, error) { + res, err := f.fetchWithHistory(rawURL, polite, timeout, sizeLimit) + if _, ok := err.(*ErrCrawl); ok { + return nil, nil + } + return res, err +} + +// fetchWithHistory does the actual request and populates redirect history. 
+func (f *Fetcher) fetchWithHistory(rawURL string, polite bool, timeout time.Duration, sizeLimit int) (*FetchResult, error) { + parsed, err := url.Parse(rawURL) + if err != nil { + return nil, &ErrCrawl{Msg: "invalid url: " + err.Error()} + } + host := parsed.Host + + if polite { + f.rateLimit(host) + if !f.robotsAllowed(rawURL, host) { + return nil, &ErrCrawl{Msg: "disallowed by robots.txt"} + } + } + + redirects := make(map[string]string) + client := &http.Client{ + Timeout: timeout, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + if len(via) >= 10 { + return fmt.Errorf("too many redirects") + } + if req.Response != nil && (req.Response.StatusCode == 301 || req.Response.StatusCode == 308) { + from := via[len(via)-1].URL.String() + to := req.URL.String() + redirects[from] = to + } + return nil + }, + } + + req, _ := http.NewRequest("GET", rawURL, nil) + req.Header.Set("User-Agent", f.userAgent) + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode == 404 { + return nil, &ErrCrawl{Msg: "404 not found"} + } + if resp.StatusCode >= 400 { + return nil, &ErrCrawl{Msg: fmt.Sprintf("HTTP %d", resp.StatusCode)} + } + ct := resp.Header.Get("Content-Type") + if !strings.Contains(ct, "text/html") { + return nil, &ErrCrawl{Msg: "not html: " + ct} + } + + body, err := decodeBody(resp.Body, ct, sizeLimit) + if err != nil { + return nil, err + } + + return &FetchResult{ + Body: body, + FinalURL: resp.Request.URL.String(), + Redirects: redirects, + ServerType: resp.Header.Get("Server"), + }, nil +} + +// rateLimit sleeps if the last request to host was too recent. 
+func (f *Fetcher) rateLimit(host string) { + f.rateMu.Lock() + last, ok := f.lastHit[host] + now := time.Now() + f.lastHit[host] = now + // Periodically prune the map + if len(f.lastHit) > 10000 { + cutoff := now.Add(-f.cooldown * 2) + for k, v := range f.lastHit { + if v.Before(cutoff) { + delete(f.lastHit, k) + } + } + } + f.rateMu.Unlock() + + if ok { + elapsed := now.Sub(last) + if elapsed < f.cooldown { + time.Sleep(f.cooldown - elapsed) + } + } +} + +// robotsAllowed returns true if rawURL is crawlable. +func (f *Fetcher) robotsAllowed(rawURL, host string) bool { + f.robotsMu.Lock() + entry, ok := f.robots[host] + f.robotsMu.Unlock() + + if !ok || time.Since(entry.fetchedAt) > 24*time.Hour { + entry = f.fetchRobots(host, rawURL) + f.robotsMu.Lock() + f.robots[host] = entry + f.robotsMu.Unlock() + } + + parsed, err := url.Parse(rawURL) + if err != nil { + return false + } + path := parsed.Path + if path == "" { + path = "/" + } + + for _, rule := range entry.rules { + if rule.userAgent != "*" && !strings.EqualFold(rule.userAgent, f.userAgent) { + continue + } + // Check allow first (higher priority) + for _, a := range rule.allow { + if strings.HasPrefix(path, a) { + return true + } + } + for _, dis := range rule.disallow { + if dis != "" && strings.HasPrefix(path, dis) { + return false + } + } + } + return true +} + +// fetchRobots downloads and parses robots.txt for a host. 
+func (f *Fetcher) fetchRobots(host, exampleURL string) *robotsEntry { + entry := &robotsEntry{fetchedAt: time.Now()} + scheme := "https" + if strings.HasPrefix(exampleURL, "http://") { + scheme = "http" + } + robotsURL := fmt.Sprintf("%s://%s/robots.txt", scheme, host) + + client := &http.Client{Timeout: 5 * time.Second} + req, _ := http.NewRequest("GET", robotsURL, nil) + req.Header.Set("User-Agent", f.userAgent) + resp, err := client.Do(req) + if err != nil || resp.StatusCode != 200 { + return entry // allow all if robots.txt unavailable + } + defer resp.Body.Close() + + body, err := io.ReadAll(io.LimitReader(resp.Body, 256*1024)) + if err != nil { + return entry + } + entry.rules = parseRobots(string(body)) + return entry +} + +// parseRobots is a minimal robots.txt parser. +func parseRobots(content string) []robotsRule { + var rules []robotsRule + var current *robotsRule + for _, line := range strings.Split(content, "\n") { + line = strings.TrimSpace(line) + if idx := strings.Index(line, "#"); idx >= 0 { + line = line[:idx] + } + if line == "" { + if current != nil { + rules = append(rules, *current) + current = nil + } + continue + } + parts := strings.SplitN(line, ":", 2) + if len(parts) != 2 { + continue + } + key := strings.TrimSpace(strings.ToLower(parts[0])) + val := strings.TrimSpace(parts[1]) + switch key { + case "user-agent": + if current == nil { + current = &robotsRule{userAgent: val} + } else { + current.userAgent = val + } + case "disallow": + if current != nil { + current.disallow = append(current.disallow, val) + } + case "allow": + if current != nil { + current.allow = append(current.allow, val) + } + } + } + if current != nil { + rules = append(rules, *current) + } + return rules +} + +// decodeBody reads at most sizeLimit bytes from r, auto-detecting charset. 
+func decodeBody(r io.Reader, contentType string, sizeLimit int) (string, error) { + var reader io.Reader = r + if sizeLimit > 0 { + reader = io.LimitReader(r, int64(sizeLimit)) + } + + // Use golang.org/x/net/html/charset for auto-detection + utf8Reader, err := charset.NewReader(reader, contentType) + if err != nil { + // Fall back to reading raw and hoping for UTF-8 + data, readErr := io.ReadAll(reader) + if readErr != nil { + return "", readErr + } + return string(data), nil + } + data, err := io.ReadAll(utf8Reader) + if err != nil { + return "", err + } + return string(data), nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..8561871 --- /dev/null +++ b/go.mod @@ -0,0 +1,19 @@ +module sese-engine + +go 1.21 + +require ( + github.com/andybalholm/brotli v1.1.0 + github.com/pemistahl/lingua-go v1.4.0 + github.com/yanyiwu/gojieba v1.4.4 + go.etcd.io/bbolt v1.3.9 + golang.org/x/net v0.23.0 +) + +require ( + github.com/shopspring/decimal v1.3.1 // indirect + golang.org/x/exp v0.0.0-20221106115401-f9659909a136 // indirect + golang.org/x/sys v0.18.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d83cb28 --- /dev/null +++ b/go.sum @@ -0,0 +1,36 @@ +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod 
h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/pemistahl/lingua-go v1.4.0 h1:ifYhthrlW7iO4icdubwlduYnmwU37V1sbNrwhKBR4rM= +github.com/pemistahl/lingua-go v1.4.0/go.mod h1:ECuM1Hp/3hvyh7k8aWSqNCPlTxLemFZsRjocUf3KgME= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= +github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/yanyiwu/gojieba v1.4.4 h1:Iukkf8WlIfqAKtsGZjUhGR1ArKa7DtLDNmW8bvUI8JI= +github.com/yanyiwu/gojieba v1.4.4/go.mod h1:JUq4DddFVGdHXJHxxepxRmhrKlDpaBxR8O28v6fKYLY= +go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI= +go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE= +golang.org/x/exp v0.0.0-20221106115401-f9659909a136 h1:Fq7F/w7MAa1KJ5bt2aJ62ihqp9HDcRuyILskkpIAurw= +golang.org/x/exp v0.0.0-20221106115401-f9659909a136/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/harvester/harvester.go b/harvester/harvester.go new file mode 100644 index 0000000..7b13d7c --- /dev/null +++ b/harvester/harvester.go @@ -0,0 +1,327 @@ +// Package harvester implements the index-writing server (port 5000). +// +// It receives (url, keywords) payloads from the crawler, accumulates them in +// memory, then flushes to the persistent inverted index when the in-memory +// row count exceeds the configured threshold. +package harvester + +import ( + "encoding/json" + "log" + "math/rand" + "net/http" + "strings" + "sync" + "sync/atomic" + + "sese-engine/config" + "sese-engine/info" + "sese-engine/storage" +) + +// Server is the harvester HTTP server. +type Server struct { + db *storage.DB + + // in-memory accumulator: keyword → [(weight, url)] + mem map[string][]storage.IndexEntry + memMu sync.Mutex + + rowCount int64 // approximate total in-memory rows + flushMu sync.Mutex // only one flush at a time + + infoSvc *info.Service +} + +// New creates a harvester Server. +func New(db *storage.DB, infoSvc *info.Service) *Server { + return &Server{ + db: db, + mem: make(map[string][]storage.IndexEntry), + infoSvc: infoSvc, + } +} + +// ingestPayload is the JSON body sent by the crawler. +type ingestPayload struct { + URL string `json:"url"` + Keywords []struct { + Word string `json:"word"` + Weight float32 `json:"weight"` + } `json:"keywords"` +} + +// Handler returns the http.Handler for the harvester. 
+func (s *Server) Handler() http.Handler { + mux := http.NewServeMux() + mux.HandleFunc("/l", s.handleIngest) + return mux +} + +func (s *Server) handleIngest(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + var payload ingestPayload + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + http.Error(w, "bad json: "+err.Error(), http.StatusBadRequest) + return + } + + // Sanitise URL + payload.URL = strings.ReplaceAll(payload.URL, "\n", "") + if payload.URL == "" { + http.Error(w, "empty url", http.StatusBadRequest) + return + } + + s.memMu.Lock() + for _, kw := range payload.Keywords { + key := kw.Word + entries := s.mem[key] + + // Threshold-based early discard + if len(entries) > 15 { + low := s.lowThreshold(key) + if float64(kw.Weight) < low { + continue + } + } + s.mem[key] = append(entries, storage.IndexEntry{ + Weight: kw.Weight, + URL: payload.URL, + }) + atomic.AddInt64(&s.rowCount, 1) + } + s.memMu.Unlock() + + // Check if we should flush + if atomic.LoadInt64(&s.rowCount) > int64(config.BigCleanThreshold) { + go s.flush() + } + + w.Write([]byte("ok")) +} + +// lowThreshold returns the minimum weight needed to enter the index for key. +func (s *Server) lowThreshold(key string) float64 { + existing, _ := s.db.GetIndex(key) + if len(existing) < config.MaxURLsPerKey { + return -1 + } + // Find the config.MaxURLsPerKey-th highest weight + weights := make([]float64, len(existing)) + for i, e := range existing { + weights[i] = float64(e.Weight) + } + // Partial sort: find threshold at position MaxURLsPerKey-1 + return nthLargest(weights, config.MaxURLsPerKey-1) * 0.05 +} + +// flush merges the in-memory accumulator into the persistent index. 
+func (s *Server) flush() { + if !s.flushMu.TryLock() { + return // another flush is running + } + defer s.flushMu.Unlock() + + s.memMu.Lock() + snapshot := s.mem + s.mem = make(map[string][]storage.IndexEntry) + atomic.StoreInt64(&s.rowCount, 0) + s.memMu.Unlock() + + log.Printf("[harvester] flushing %d keys", len(snapshot)) + + items := make([]struct { + key string + entries []storage.IndexEntry + }, 0, len(snapshot)) + for k, v := range snapshot { + items = append(items, struct { + key string + entries []storage.IndexEntry + }{k, v}) + } + rand.Shuffle(len(items), func(i, j int) { items[i], items[j] = items[j], items[i] }) + + // Parallel merge + type result struct { + key string + entries []storage.IndexEntry + } + results := make(chan result, len(items)) + sem := make(chan struct{}, 8) + + for _, item := range items { + sem <- struct{}{} + go func(k string, newEntries []storage.IndexEntry) { + defer func() { <-sem }() + merged := s.mergeKey(k, newEntries) + results <- result{k, merged} + }(item.key, item.entries) + } + + // Collect + batch := make(map[string][]storage.IndexEntry, len(items)) + for range items { + r := <-results + batch[r.key] = r.entries + } + + if err := s.db.BatchSetIndex(batch); err != nil { + log.Printf("[harvester] flush write error: %v", err) + } + log.Printf("[harvester] flush done, %d keys written", len(batch)) +} + +// mergeKey merges new entries with existing index entries for a key. 
+func (s *Server) mergeKey(key string, newEntries []storage.IndexEntry) []storage.IndexEntry { + existing, _ := s.db.GetIndex(key) + + // Discard new key if too few URLs + if len(existing) == 0 && len(newEntries) < config.MinURLsForNewKey { + return nil + } + + merged := dedup(append(newEntries, existing...)) + + // Occasional URL normalisation dedup + if rand.Float64() < 0.02 { + merged = dedupNormalised(merged) + } + + // Trim if over limit + if float64(len(merged)) > float64(config.MaxURLsPerKey)*1.1 || rand.Float64() < 0.02 { + merged = trim(merged, s.infoSvc, config.MaxURLsPerKey, config.MaxSameDomainPerKey) + } + + return merged +} + +// ---- helpers ---- + +func dedup(entries []storage.IndexEntry) []storage.IndexEntry { + seen := make(map[string]bool, len(entries)) + out := make([]storage.IndexEntry, 0, len(entries)) + for _, e := range entries { + if seen[e.URL] { + continue + } + seen[e.URL] = true + out = append(out, e) + } + return out +} + +func dedupNormalised(entries []storage.IndexEntry) []storage.IndexEntry { + // Sort by URL length descending, then dedup by normalised URL (strip scheme, trailing slash) + sorted := make([]storage.IndexEntry, len(entries)) + copy(sorted, entries) + for i := 0; i < len(sorted)-1; i++ { + for j := i + 1; j < len(sorted); j++ { + if len(sorted[j].URL) > len(sorted[i].URL) { + sorted[i], sorted[j] = sorted[j], sorted[i] + } + } + } + seen := make(map[string]bool) + out := make([]storage.IndexEntry, 0, len(sorted)) + for _, e := range sorted { + k := normaliseURL(e.URL) + if seen[k] { + continue + } + seen[k] = true + out = append(out, e) + } + return out +} + +func normaliseURL(u string) string { + if strings.HasPrefix(u, "https://") { + u = u[8:] + } else if strings.HasPrefix(u, "http://") { + u = u[7:] + } + return strings.TrimRight(u, "/") +} + +// trim reduces entries to at most limit, keeping at most sameDomainLimit per domain. 
+func trim(entries []storage.IndexEntry, infoSvc *info.Service, limit, sameDomainLimit int) []storage.IndexEntry { + // Sort by effective score: weight * (1 + backlink) + scored := make([]storage.IndexEntry, len(entries)) + copy(scored, entries) + for i := 0; i < len(scored)-1; i++ { + for j := i + 1; j < len(scored); j++ { + si := float64(scored[i].Weight) * (1 + infoSvc.Prosper(scored[i].URL)) + sj := float64(scored[j].Weight) * (1 + infoSvc.Prosper(scored[j].URL)) + if sj > si { + scored[i], scored[j] = scored[j], scored[i] + } + } + } + + // Per-domain cap + domainCount := make(map[string]int) + out := make([]storage.IndexEntry, 0, limit) + for _, e := range scored { + host := netloc(e.URL) + if host == "" { + host = e.URL + } + host = strings.ToLower(host) + // Allow homepage URLs regardless of limit + isHome := isHomepage(e.URL) + if !isHome && domainCount[host] >= sameDomainLimit { + continue + } + domainCount[host]++ + out = append(out, e) + if len(out) >= limit { + break + } + } + return out +} + +func isHomepage(u string) bool { + u = strings.TrimPrefix(u, "https://") + u = strings.TrimPrefix(u, "http://") + return strings.Count(strings.TrimRight(u, "/"), "/") == 0 +} + +func netloc(rawURL string) string { + parts := strings.SplitN(rawURL, "/", 4) + if len(parts) >= 3 && (parts[0] == "http:" || parts[0] == "https:") && parts[1] == "" { + return parts[2] + } + return "" +} + +// nthLargest returns the n-th largest value in a slice (0-indexed). +func nthLargest(values []float64, n int) float64 { + if n >= len(values) { + return 0 + } + cp := make([]float64, len(values)) + copy(cp, values) + // Partial sort descending + for i := 0; i <= n; i++ { + maxIdx := i + for j := i + 1; j < len(cp); j++ { + if cp[j] > cp[maxIdx] { + maxIdx = j + } + } + cp[i], cp[maxIdx] = cp[maxIdx], cp[i] + } + return cp[n] +} + +// ListenAndServe starts the harvester on the given address. 
+func (s *Server) ListenAndServe(addr string) error { + log.Printf("[harvester] listening on %s", addr) + return http.ListenAndServe(addr, s.Handler()) +} diff --git a/info/info.go b/info/info.go new file mode 100644 index 0000000..c2a13f5 --- /dev/null +++ b/info/info.go @@ -0,0 +1,206 @@ +// Package info loads and serves auxiliary data: backlink scores, adjustment +// table, and blocked query words. +package info + +import ( + "encoding/json" + "math" + "os" + "path/filepath" + "strings" + "sync" +) + +// Service loads the prosperity map, adjustment table, and blocked words. +type Service struct { + mu sync.RWMutex + prosperMap map[string]float64 // normalised backlink scores + adjustTable map[string]float64 // per-domain manual weight adjustments + blockedWords map[string]bool + storagePath string +} + +// New creates and loads the info service from storagePath. +func New(storagePath string) *Service { + s := &Service{storagePath: storagePath} + s.Reload() + return s +} + +// Reload re-reads all data files from disk. +func (s *Service) Reload() { + s.mu.Lock() + defer s.mu.Unlock() + s.prosperMap = loadProsperMap(s.storagePath) + s.adjustTable = loadAdjustTable() + s.blockedWords = loadBlockedWords() +} + +// Prosper returns the backlink score for a URL (sum of its path components). +func (s *Service) Prosper(rawURL string) float64 { + s.mu.RLock() + defer s.mu.RUnlock() + return prosperFor(rawURL, s.prosperMap) +} + +// ProsperMap returns the full prosperity map (read-only snapshot). +func (s *Service) ProsperMap() map[string]float64 { + s.mu.RLock() + defer s.mu.RUnlock() + out := make(map[string]float64, len(s.prosperMap)) + for k, v := range s.prosperMap { + out[k] = v + } + return out +} + +// Adjust returns the manual weight multiplier for a hostname (default 1.0). 
+func (s *Service) Adjust(host string) float64 { + s.mu.RLock() + defer s.mu.RUnlock() + if v, ok := s.adjustTable[host]; ok { + return v + } + return 1.0 +} + +// IsBlocked returns true if the word is in the blocked list. +func (s *Service) IsBlocked(word string) bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.blockedWords[word] +} + +// ---- loaders ---- + +const backlinkBaseline = 200000.0 + +func loadProsperMap(storagePath string) map[string]float64 { + path := filepath.Join(storagePath, "prosper.json") + f, err := os.Open(path) + if err != nil { + return map[string]float64{} + } + defer f.Close() + var raw map[string]float64 + if err := json.NewDecoder(f).Decode(&raw); err != nil { + return map[string]float64{} + } + return normalise(raw) +} + +func normalise(d map[string]float64) map[string]float64 { + total := 0.0 + for k, v := range d { + if !strings.Contains(k, "/") { + total += v + } + } + if total == 0 { + return d + } + factor := backlinkBaseline / total + out := make(map[string]float64, len(d)) + for k, v := range d { + out[k] = v * factor + } + // Propagate max score up the domain tree + for k, v := range out { + now := k + for { + idx := strings.Index(now, ".") + if idx < 0 { + break + } + now = now[idx+1:] + if cur, ok := out[now]; ok && cur < v { + out[now] = v + } else if !ok { + break + } + } + } + return out +} + +func loadAdjustTable() map[string]float64 { + // Try loading from data/adjust.json — fallback if absent + f, err := os.Open(filepath.Join("data", "adjust.json")) + if err != nil { + return map[string]float64{} + } + defer f.Close() + var m map[string]float64 + json.NewDecoder(f).Decode(&m) + return m +} + +func loadBlockedWords() map[string]bool { + f, err := os.Open(filepath.Join("data", "blocked_words.json")) + if err != nil { + return map[string]bool{} + } + defer f.Close() + var words []string + json.NewDecoder(f).Decode(&words) + m := make(map[string]bool, len(words)) + for _, w := range words { + m[w] = true + } + return 
m +} + +// prosperFor computes the prosperity score for a URL by decomposing it. +func prosperFor(rawURL string, pm map[string]float64) float64 { + segments := decomposeURL(rawURL) + s := 0.0 + for _, seg := range segments { + t, ok := pm[seg] + if !ok { + t = 0 + } + l := 0.0 + if t > 0 { + l = math.Log2(2+t*2) - 1 + } + if s == 0 { + if l == 0 { + return 0 + } + s = l + } else { + s = l + math.Log((s-l)/2+1) + } + } + if s > 0 { + return 0.1 + s + } + return 0 +} + +// decomposeURL yields "domain.tld", "domain.tld/path", "domain.tld/path/sub", ... +func decomposeURL(rawURL string) []string { + u := strings.ToLower(rawURL) + if strings.HasPrefix(u, "https://") { + u = u[8:] + } else if strings.HasPrefix(u, "http://") { + u = u[7:] + } else { + return nil + } + u = strings.ReplaceAll(u, "?", "/") + u = strings.ReplaceAll(u, "#", "/") + u = strings.TrimRight(u, "/") + if u == "" || u[0] == '/' || u[0] == '%' || u[0] == ' ' { + return nil + } + parts := strings.Split(u, "/") + var out []string + current := parts[0] + out = append(out, current) + for _, p := range parts[1:] { + current = current + "/" + p + out = append(out, current) + } + return out +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..ac3ba55 --- /dev/null +++ b/main.go @@ -0,0 +1,90 @@ +// sese-engine — Go rewrite +// +// All modules (harvester, search server, crawler, backlink calculator) are +// launched as goroutines from this single binary. The binary blocks until +// interrupted (Ctrl-C / SIGTERM). +// +// Usage: +// +// cd golang && go run . 
[--storage ./savedata] [--entry https://zh.wikipedia.org/] +package main + +import ( + "flag" + "fmt" + "log" + "os" + "os/signal" + "syscall" + + "sese-engine/analyzer" + "sese-engine/backlink" + "sese-engine/config" + "sese-engine/crawler" + "sese-engine/harvester" + "sese-engine/info" + "sese-engine/search" + "sese-engine/storage" +) + +func main() { + storageDir := flag.String("storage", config.StoragePath, "path to savedata directory") + entryURL := flag.String("entry", config.EntryURL, "BFS crawl entry URL") + stopWords := flag.String("stopwords", "../data/标点符号.json", "path to stop-words JSON") + flag.Parse() + + log.SetFlags(log.LstdFlags | log.Lshortfile) + log.Printf("sese-engine starting storage=%s entry=%s", *storageDir, *entryURL) + + // ---- 1. Storage ---- + db, err := storage.Open(*storageDir) + if err != nil { + log.Fatalf("failed to open storage: %v", err) + } + defer db.Close() + + // ---- 2. Info service ---- + infoSvc := info.New(*storageDir) + + // ---- 3. Analyzer ---- + // modelPath is unused (lingua-go uses built-in language models, no external file needed) + anal, err := analyzer.New("", *stopWords) + if err != nil { + log.Fatalf("failed to init analyzer: %v", err) + } + defer anal.Close() + + // ---- 4. Harvester (index write server on :5000) ---- + harvSrv := harvester.New(db, infoSvc) + go func() { + if err := harvSrv.ListenAndServe(":5000"); err != nil { + log.Fatalf("[harvester] fatal: %v", err) + } + }() + + // ---- 5. Search server ---- + searchSrv := search.New(db, infoSvc, anal) + go func() { + addr := fmt.Sprintf(":%d", config.SearchServerPort) + if err := searchSrv.ListenAndServe(addr); err != nil { + log.Fatalf("[search] fatal: %v", err) + } + }() + + // ---- 6. Backlink calculator (runs every 48 h) ---- + bl := backlink.New(db, *storageDir) + go bl.Run() + + // ---- 7. 
Crawler ---- + prosperMap := infoSvc.ProsperMap() + crawl := crawler.New(db, anal, prosperMap) + go crawl.Run(*entryURL, config.MaxEpoch) + + log.Println("all modules started — press Ctrl-C to stop") + + // ---- Graceful shutdown ---- + quit := make(chan os.Signal, 1) + signal.Notify(quit, os.Interrupt, syscall.SIGTERM) + <-quit + log.Println("shutdown signal received, exiting...") +} diff --git a/parser/parser.go b/parser/parser.go new file mode 100644 index 0000000..0ce3d70 --- /dev/null +++ b/parser/parser.go @@ -0,0 +1,153 @@ +// Package parser extracts title, description, text content, and links from HTML. +package parser + +import ( + "path" + "regexp" + "strings" + + "golang.org/x/net/html" +) + +var wsRe = regexp.MustCompile(`\s+`) + +// ParseHTML parses an HTML document and returns title, meta description, body text, and href list. +func ParseHTML(body, baseURL string) (title, description, text string, hrefs []string) { + // Determine base scheme+host + base := baseFromURL(baseURL) + basePath := pathFromURL(baseURL) + + doc, err := html.Parse(strings.NewReader(body)) + if err != nil { + return + } + + var textParts []string + + var dfs func(n *html.Node) + dfs = func(n *html.Node) { + if n.Type == html.ElementNode { + tag := strings.ToLower(n.Data) + if tag == "script" || tag == "style" || tag == "svg" { + return + } + if tag == "meta" { + name := "" + content := "" + for _, a := range n.Attr { + switch strings.ToLower(a.Key) { + case "name": + name = strings.ToLower(a.Val) + case "content": + content = a.Val + } + } + if name == "description" && description == "" { + description = content + } + } + if tag == "a" { + href := attrVal(n, "href") + if href != "" { + href = strings.SplitN(href, "#", 2)[0] + if href != "" { + href = resolveURL(base, basePath, href) + if href != "" { + hrefs = append(hrefs, href) + } + } + } + } + } + + if n.Type == html.TextNode && n.Parent != nil { + parentTag := "" + if n.Parent.Type == html.ElementNode { + parentTag = 
strings.ToLower(n.Parent.Data) + } + if parentTag == "script" || parentTag == "style" || parentTag == "svg" { + goto children + } + s := wsRe.ReplaceAllString(n.Data, " ") + s = strings.TrimSpace(s) + if s != "" { + if parentTag == "title" { + title = s + } else { + textParts = append(textParts, s) + } + } + } + + children: + for c := n.FirstChild; c != nil; c = c.NextSibling { + dfs(c) + } + } + dfs(doc) + + text = strings.Join(textParts, " ") + return +} + +func attrVal(n *html.Node, key string) string { + for _, a := range n.Attr { + if strings.ToLower(a.Key) == key { + return a.Val + } + } + return "" +} + +func baseFromURL(rawURL string) string { + idx := strings.Index(rawURL, "://") + if idx < 0 { + return "" + } + rest := rawURL[idx+3:] + slash := strings.Index(rest, "/") + if slash < 0 { + return rawURL + } + return rawURL[:idx+3+slash] +} + +func pathFromURL(rawURL string) string { + idx := strings.Index(rawURL, "://") + if idx < 0 { + return "/" + } + rest := rawURL[idx+3:] + slash := strings.Index(rest, "/") + if slash < 0 { + return "/" + } + p := rest[slash:] + // strip query/fragment + p = strings.SplitN(p, "?", 2)[0] + p = strings.SplitN(p, "#", 2)[0] + return p +} + +func resolveURL(base, basePath, href string) string { + // Absolute URL + if strings.HasPrefix(href, "http://") || strings.HasPrefix(href, "https://") { + return href + } + // Protocol-relative + if strings.HasPrefix(href, "//") { + // extract scheme from base + idx := strings.Index(base, "://") + if idx < 0 { + return "" + } + return base[:idx+1] + href + } + // Absolute path + if strings.HasPrefix(href, "/") { + return base + href + } + // Relative path + dir := path.Dir(basePath) + return base + path.Clean(dir+"/"+href) +} diff --git a/search/server.go b/search/server.go new file mode 100644 index 0000000..23aa7ed --- /dev/null +++ b/search/server.go @@ -0,0 +1,693 @@ +// Package search implements the user-facing search HTTP server. 
+package search + +import ( + "container/heap" + "encoding/json" + "log" + "math" + "net/http" + "net/url" + "regexp" + "sort" + "strings" + "sync" + "time" + + "sese-engine/analyzer" + "sese-engine/config" + "sese-engine/info" + "sese-engine/parser" + "sese-engine/storage" +) + +// Server is the search HTTP server. +type Server struct { + db *storage.DB + infoSvc *info.Service + analyzer *analyzer.Analyzer + httpCli *http.Client // for online snippet fetching +} + +// New creates a search Server. +func New(db *storage.DB, infoSvc *info.Service, a *analyzer.Analyzer) *Server { + return &Server{ + db: db, + infoSvc: infoSvc, + analyzer: a, + httpCli: &http.Client{ + Timeout: time.Duration(config.OnlineSnippetTimeout) * time.Second, + }, + } +} + +// Handler returns the http.Handler. +func (s *Server) Handler() http.Handler { + mux := http.NewServeMux() + mux.HandleFunc("/search", s.handleSearch) + return mux +} + +// ListenAndServe starts the search server. +func (s *Server) ListenAndServe(addr string) error { + log.Printf("[search] listening on %s", addr) + return http.ListenAndServe(addr, s.Handler()) +} + +// ---- search handler ---- + +type searchResponse struct { + Tokens []string `json:"tokens"` + Counts map[string]int `json:"counts"` + Results []searchResult `json:"results"` + Total int `json:"total"` +} + +type searchResult struct { + Score float64 `json:"score"` + URL string `json:"url"` + Snippet *snippetInfo `json:"snippet,omitempty"` + Relevance map[string]float64 `json:"relevance"` + DomainCount int `json:"domain_count"` + Factors map[string]float64 `json:"factors,omitempty"` +} + +type snippetInfo struct { + Title string `json:"title"` + Description string `json:"description"` + Text string `json:"text"` +} + +var siteRe = regexp.MustCompile(`^site:(.+)$`) + +func (s *Server) handleSearch(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Content-Type", "application/json; charset=utf-8") + + 
q := r.URL.Query().Get("q") + if q == "" { + if qh := r.URL.Query().Get("qh"); qh != "" { + decoded, err := url.PathUnescape(qh) + if err == nil { + q = decoded + } + } + } + + // Parse slice param "0:10" + sliceStr := r.URL.Query().Get("slice") + sliceFrom, sliceTo := 0, 10 + if sliceStr != "" { + parts := strings.SplitN(sliceStr, ":", 2) + if len(parts) == 2 { + a := atoi(parts[0]) + b := atoi(parts[1]) + if a >= 0 && b > a && b-a <= 20 { + sliceFrom, sliceTo = a, b + } + } + } + + // Parse tokens and site filter + var tokens []string + var siteFilter string + for _, part := range strings.Fields(q) { + if m := siteRe.FindStringSubmatch(part); len(m) > 1 { + siteFilter = m[1] + } else { + segs := s.analyzer.Segment(part, false) + for _, t := range segs { + if !s.infoSvc.IsBlocked(t) { + tokens = append(tokens, t) + } + } + } + } + + if len(tokens) > 20 { + tokens = tokens[:20] + } + + results, total := s.query(tokens, sliceFrom, sliceTo, siteFilter) + + // Count per keyword + counts := make(map[string]int, len(tokens)) + for _, t := range tokens { + entries, _ := s.db.GetIndex(t) + counts[t] = len(entries) + } + + resp := searchResponse{ + Tokens: tokens, + Counts: counts, + Results: results, + Total: total, + } + json.NewEncoder(w).Encode(resp) +} + +// query executes the multi-keyword search and returns ranked results. 
+func (s *Server) query(tokens []string, from, to int, siteFilter string) ([]searchResult, int) { + if len(tokens) == 0 { + return nil, 0 + } + + // Load inverted index for each token + type tokenIndex struct { + token string + entries []storage.IndexEntry + defVal float64 + } + tokenIndexes := make([]tokenIndex, 0, len(tokens)) + for _, t := range tokens { + entries, _ := s.db.GetIndex(t) + defVal := 1.0 / 10000 * float64(max(100, len(entries))) / float64(config.MaxURLsPerKey) + if len(entries) >= config.MaxURLsPerKey { + weights := make([]float64, len(entries)) + for i, e := range entries { + weights[i] = float64(e.Weight) + } + sort.Sort(sort.Reverse(sort.Float64Slice(weights))) + defVal = math.Max(1.0/10000, weights[config.MaxURLsPerKey-1]/2) + } + tokenIndexes = append(tokenIndexes, tokenIndex{t, entries, defVal}) + } + + // Build URL → per-token weights map + urlWeights := make(map[string]map[string]float64) + for _, ti := range tokenIndexes { + for _, e := range ti.entries { + if urlWeights[e.URL] == nil { + urlWeights[e.URL] = make(map[string]float64) + } + urlWeights[e.URL][ti.token] = float64(e.Weight) + } + } + + // Site filter + total := len(urlWeights) + if siteFilter != "" { + filtered := make(map[string]map[string]float64) + for u, vs := range urlWeights { + h := netloc(u) + if matchSite(h, siteFilter) { + filtered[u] = vs + } + } + urlWeights = filtered + total = len(urlWeights) + } + + // Build default value map + defVals := make(map[string]float64, len(tokenIndexes)) + for _, ti := range tokenIndexes { + defVals[ti.token] = ti.defVal + } + + // Compute relevance + initial score for each URL + candidates := make([]candidate, 0, len(urlWeights)) + for u, vs := range urlWeights { + rel := 1.0 + for _, ti := range tokenIndexes { + vp := vs[ti.token] + if vp == 0 { + vp = defVals[ti.token] + } + if vp > 0.06 { + vp = math.Log((vp-0.06)*40+1)/40 + 0.06 + } + rel *= vp + } + prosper := 1 + s.infoSvc.Prosper(u)*config.BacklinkWeight + bad := badURL(u) + 
adjust := s.infoSvc.Adjust(netloc(u)) + score := rel * prosper * (1 - bad) * adjust * 0.1 + + var vec [12]float64 + vec[0] = score + vec[1] = rel + vec[2] = prosper + vec[3] = 1 - bad + vec[4] = 1 // language multiplier placeholder + vec[5] = 1 // repetition placeholder + vec[6] = adjust + vec[7] = 1 // time multiplier placeholder + vec[8] = 1 // consecutive keyword placeholder + vec[9] = 1 // keyword content placeholder + vec[10] = 1 // URL time placeholder + vec[11] = 0.1 + + candidates = append(candidates, candidate{u, rel, vec}) + } + + // Early relevance threshold + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].scoreVec[0] > candidates[j].scoreVec[0] + }) + + // Apply site info factors to top 256 + now := time.Now().Unix() + limit256 := 256 + if len(candidates) < 256 { + limit256 = len(candidates) + } + + var wg sync.WaitGroup + for i := 0; i < limit256; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + c := &candidates[idx] + h := netloc(c.url) + siteInfo, _ := s.db.GetSiteInfo(h) + langMul := languageMultiplier(siteInfo) + timeMul := timeMul(siteInfo, now) + urlTimeMul := urlTimeMul(s.db, c.url, now) + + c.scoreVec[0] = c.scoreVec[0] * 10 * langMul * timeMul * urlTimeMul + c.scoreVec[4] = langMul + c.scoreVec[7] = timeMul + c.scoreVec[10] = urlTimeMul + }(i) + } + wg.Wait() + + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].scoreVec[0] > candidates[j].scoreVec[0] + }) + + // Apply consecutive-keyword and repetition bonuses to top 80 + limit80 := 80 + if len(candidates) < 80 { + limit80 = len(candidates) + } + + titles := make([]string, limit80) + for i := 0; i < limit80; i++ { + if snippet, err := s.db.GetSnippet(candidates[i].url); err == nil { + titles[i] = snippet.Title + } + } + + // Repetition penaliser + for i := 0; i < limit80; i++ { + h := repetitionSimilarity(titles, i) + consecutive := consecutiveCount(titles[i], tokens) + repMul := 1.0 + if h > 0.5 { + repMul = 1 - (h - 0.5) + } + consMul := 
math.Pow(config.ConsecutiveKeyWeight, float64(consecutive)) + candidates[i].scoreVec[0] *= repMul * consMul + candidates[i].scoreVec[5] = repMul + candidates[i].scoreVec[8] = consMul + } + + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].scoreVec[0] > candidates[j].scoreVec[0] + }) + + // Re-rank: interleave domains + reranked := rerank(candidates, from, to) + + // Fetch snippets and build output + results := make([]searchResult, 0, len(reranked)) + var snippetMu sync.Mutex + var snippetWg sync.WaitGroup + + for _, c := range reranked { + snippetWg.Add(1) + go func(cand candidate) { + defer snippetWg.Done() + snip := s.getSnippet(cand.url) + r := searchResult{ + Score: cand.scoreVec[0], + URL: unescapeURL(cand.url), + Snippet: snip, + Relevance: make(map[string]float64), + DomainCount: 0, + Factors: map[string]float64{ + "relevance": cand.scoreVec[1], + "backlink": cand.scoreVec[2], + "url_quality": cand.scoreVec[3], + "language": cand.scoreVec[4], + "repetition": cand.scoreVec[5], + "adjust": cand.scoreVec[6], + "site_time": cand.scoreVec[7], + "consecutive": cand.scoreVec[8], + "url_time": cand.scoreVec[10], + }, + } + for _, ti := range tokenIndexes { + r.Relevance[ti.token] = urlWeights[cand.url][ti.token] + } + snippetMu.Lock() + results = append(results, r) + snippetMu.Unlock() + }(c) + } + snippetWg.Wait() + + // Preserve order (goroutines may reorder) + urlOrder := make(map[string]int) + for i, c := range reranked { + urlOrder[c.url] = i + } + sort.Slice(results, func(i, j int) bool { + return urlOrder[results[i].URL] < urlOrder[results[j].URL] + }) + + return results, total +} + +// getSnippet fetches (or caches) a snippet for a URL. 
+func (s *Server) getSnippet(rawURL string) *snippetInfo { + // Try cache first + if entry, err := s.db.GetSnippet(rawURL); err == nil { + snip := buildSnippet(entry) + return snip + } + if !config.UseOnlineSnippet { + return nil + } + // Fetch online with a simple HTTP client (no robots.txt check for search snippets) + req, err := http.NewRequest("GET", rawURL, nil) + if err != nil { + return nil + } + req.Header.Set("User-Agent", config.SpiderName) + resp, err := s.httpCli.Do(req) + if err != nil || resp.StatusCode != 200 { + return nil + } + defer resp.Body.Close() + + ct := resp.Header.Get("Content-Type") + if !strings.Contains(ct, "text/html") { + return nil + } + body := readBodyLimited(resp, 60000) + title, desc, text, _ := parser.ParseHTML(body, resp.Request.URL.String()) + entry := &storage.SnippetEntry{ + Title: title, + Description: truncate(desc, 256), + Text: truncate(text, 256), + Timestamp: time.Now().Unix(), + } + _ = s.db.SetSnippet(rawURL, entry) + return buildSnippet(entry) +} + +func buildSnippet(entry *storage.SnippetEntry) *snippetInfo { + if entry == nil || (entry.Title == "" && entry.Description == "" && entry.Text == "") { + return nil + } + return &snippetInfo{ + Title: entry.Title, + Description: entry.Description, + Text: entry.Text, + } +} + +// ---- scoring helpers ---- + +func languageMultiplier(si *storage.SiteInfo) float64 { + if si == nil || len(si.Languages) == 0 { + return 1.0 + } + total := 0.0 + for _, v := range si.Languages { + total += v + } + chinese := si.Languages["zh"] / total + weird := (total - si.Languages["zh"] - si.Languages["en"] - si.Languages["ja"]) / total + return 1 + chinese*config.LanguageWeight - weird*config.LanguageWeight +} + +func timeMul(si *storage.SiteInfo, now int64) float64 { + if si == nil { + return 1.0 + } + t := si.LastVisitTime + if t == 0 { + t = 1648000000 + } + days := (now - t) / (3600 * 24) + if days < 0 { + days = 0 + } + if days > 180 { + days = 180 + } + if days > 0 { + days-- + } + 
return math.Pow(config.WeightDailyDecay, float64(days)) +} + +func urlTimeMul(db *storage.DB, rawURL string, now int64) float64 { + entry, err := db.GetSnippet(rawURL) + if err != nil || entry == nil { + return 1.0 + } + days := (now - entry.Timestamp) / (3600 * 24) + if days <= 30 { + return 1.0 + } + return math.Pow((2+config.WeightDailyDecay)/3, float64(days)) +} + +func badURL(u string) float64 { + s := math.Max(0, float64(len(u)-30)/200.0) + if strings.Contains(u, ".htm") || strings.Contains(u, ".php") { + s += (1 - s) * 0.3 + } + if strings.Count(strings.TrimRight(u, "/"), "/") > 2 { + s += (1 - s) * 0.1 + } + if len(u) < 5 || u[4] == ':' { + s += (1 - s) * 0.3 + } + return math.Min(s, 0.9) +} + +func netloc(rawURL string) string { + parts := strings.SplitN(rawURL, "/", 4) + if len(parts) >= 3 && (parts[0] == "http:" || parts[0] == "https:") && parts[1] == "" { + return parts[2] + } + return rawURL +} + +func matchSite(host, pattern string) bool { + if host == pattern { + return true + } + if strings.HasSuffix(host, "."+pattern) { + return true + } + return false +} + +func consecutiveCount(title string, tokens []string) int { + c := 0 + for i := 0; i < len(tokens)-1; i++ { + if strings.Contains(title, tokens[i]+tokens[i+1]) { + c++ + } + } + return c +} + +func repetitionSimilarity(titles []string, idx int) float64 { + if idx == 0 { + return 0 + } + t := titles[idx] + if t == "" { + return 0 + } + best := 0.0 + for _, prev := range titles[:idx] { + if prev == "" { + continue + } + sim := 1 - float64(levenshtein(t, prev))/float64(max(len(t), len(prev))) + if sim > best { + best = sim + } + } + return best +} + +func levenshtein(a, b string) int { + ra := []rune(a) + rb := []rune(b) + la, lb := len(ra), len(rb) + if la == 0 { + return lb + } + if lb == 0 { + return la + } + prev := make([]int, lb+1) + curr := make([]int, lb+1) + for j := 0; j <= lb; j++ { + prev[j] = j + } + for i := 1; i <= la; i++ { + curr[0] = i + for j := 1; j <= lb; j++ { + cost := 1 + if 
ra[i-1] == rb[j-1] { + cost = 0 + } + curr[j] = min3(curr[j-1]+1, prev[j]+1, prev[j-1]+cost) + } + prev, curr = curr, prev + } + return prev[lb] +} + +func min3(a, b, c int) int { + if a < b { + if a < c { + return a + } + return c + } + if b < c { + return b + } + return c +} + +// rerank interleaves results from different domains. +type domainHeap []rerankItem + +type rerankItem struct { + score float64 + url string + domainMul float64 + vec [12]float64 +} + +func (h domainHeap) Len() int { return len(h) } +func (h domainHeap) Less(i, j int) bool { return h[i].score*h[i].domainMul > h[j].score*h[j].domainMul } +func (h domainHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *domainHeap) Push(x interface{}) { *h = append(*h, x.(rerankItem)) } +func (h *domainHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[:n-1] + return x +} + +type candidate struct { + url string + relevance float64 + scoreVec [12]float64 +} + +func rerank(candidates []candidate, from, to int) []candidate { + domainItems := make(map[string][]candidate) + for _, c := range candidates { + h := netloc(c.url) + domainItems[h] = append(domainItems[h], c) + } + + h := &domainHeap{} + heap.Init(h) + domainMul := make(map[string]float64) + + for domain, items := range domainItems { + domainMul[domain] = 1.0 + // Sort items within domain + sort.Slice(items, func(i, j int) bool { + return items[i].scoreVec[0] < items[j].scoreVec[0] + }) + top := items[len(items)-1] + domainItems[domain] = items[:len(items)-1] + heap.Push(h, rerankItem{top.scoreVec[0], top.url, domainMul[domain], top.scoreVec}) + } + + var result []candidate + for h.Len() > 0 && len(result) < to { + item := heap.Pop(h).(rerankItem) + if len(result) >= from { + result = append(result, candidate{url: item.url, scoreVec: item.vec}) + } + domain := netloc(item.url) + domainMul[domain] /= 8 + remaining := domainItems[domain] + if len(remaining) > 0 { + next := remaining[len(remaining)-1] + 
domainItems[domain] = remaining[:len(remaining)-1] + heap.Push(h, rerankItem{next.scoreVec[0], next.url, domainMul[domain], next.scoreVec}) + } + } + return result +} + +// ---- misc ---- + +func readBodyLimited(resp *http.Response, limit int64) string { + data := make([]byte, 0, limit) + buf := make([]byte, 4096) + var total int64 + for { + n, err := resp.Body.Read(buf) + if n > 0 { + data = append(data, buf[:n]...) + total += int64(n) + if total >= limit { + break + } + } + if err != nil { + break + } + } + return string(data) +} + +func truncate(s string, n int) string { + if len(s) <= n { + return s + } + return s[:n] +} + +func unescapeURL(u string) string { + decoded, err := url.PathUnescape(u) + if err != nil { + return u + } + return decoded +} + +func atoi(s string) int { + n := 0 + for _, c := range s { + if c < '0' || c > '9' { + return n + } + n = n*10 + int(c-'0') + } + return n +} + +func max(a, b int) int { + if a > b { + return a + } + return b +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/storage/storage.go b/storage/storage.go new file mode 100644 index 0000000..d32c672 --- /dev/null +++ b/storage/storage.go @@ -0,0 +1,300 @@ +// Package storage provides the persistent index and site-info storage backed by bbolt. +// +// Index space → a single bbolt bucket "index" where key = keyword (string), +// value = brotli-compressed JSON array of [weight, url] pairs. +// +// Gate (门) → a bbolt bucket "gate" where key = URL (string), +// value = brotli-compressed JSON array [title, desc, text, timestamp]. +// +// SiteGate (网站之门) → a bbolt bucket "site_gate" where key = hostname (string), +// value = brotli-compressed JSON of SiteInfo struct. +// +// The Python version used a custom hash-bucket scheme; here bbolt handles it natively. 
+package storage + +import ( + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + + "github.com/andybalholm/brotli" + bolt "go.etcd.io/bbolt" +) + +// IndexEntry is a single entry in the inverted index. +type IndexEntry struct { + Weight float32 `json:"w"` + URL string `json:"u"` +} + +// SnippetEntry is cached snippet data for a URL. +type SnippetEntry struct { + Title string `json:"title"` + Description string `json:"desc"` + Text string `json:"text"` + Timestamp int64 `json:"ts"` +} + +var ( + bucketIndex = []byte("index") + bucketGate = []byte("gate") + bucketSiteGate = []byte("site_gate") +) + +// DB wraps a bbolt database and exposes typed access methods. +// bbolt handles its own locking internally. +type DB struct { + db *bolt.DB +} + +// Open creates or opens the bbolt database at the given directory path. +func Open(dir string) (*DB, error) { + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, fmt.Errorf("storage.Open mkdir: %w", err) + } + path := filepath.Join(dir, "sese.db") + db, err := bolt.Open(path, 0o600, nil) + if err != nil { + return nil, fmt.Errorf("storage.Open bolt: %w", err) + } + // Ensure buckets exist + err = db.Update(func(tx *bolt.Tx) error { + for _, b := range [][]byte{bucketIndex, bucketGate, bucketSiteGate} { + if _, err := tx.CreateBucketIfNotExists(b); err != nil { + return err + } + } + return nil + }) + if err != nil { + return nil, fmt.Errorf("storage.Open create buckets: %w", err) + } + return &DB{db: db}, nil +} + +// Close closes the underlying bbolt database. 
+func (d *DB) Close() error { + return d.db.Close() +} + +// ---- helpers ---- + +func compress(data []byte) ([]byte, error) { + buf := make([]byte, 0, len(data)) + w := brotli.NewWriterLevel((*appendWriter)(&buf), 6) + if _, err := w.Write(data); err != nil { + return nil, err + } + if err := w.Close(); err != nil { + return nil, err + } + return buf, nil +} + +func decompress(data []byte) ([]byte, error) { + r := brotli.NewReader( + (*byteReader)(&data), + ) + out := make([]byte, 0, len(data)*3) + tmp := make([]byte, 4096) + for { + n, err := r.Read(tmp) + out = append(out, tmp[:n]...) + if err != nil { + if err == io.EOF { + break + } + return out, err + } + } + return out, nil +} + +// appendWriter implements io.Writer on top of a *[]byte. +type appendWriter []byte + +func (a *appendWriter) Write(p []byte) (int, error) { + *a = append(*a, p...) + return len(p), nil +} + +// byteReader wraps []byte as io.Reader. +type byteReader []byte + +func (b *byteReader) Read(p []byte) (int, error) { + if len(*b) == 0 { + return 0, io.EOF + } + n := copy(p, *b) + *b = (*b)[n:] + return n, nil +} + +func marshalCompress(v any) ([]byte, error) { + raw, err := json.Marshal(v) + if err != nil { + return nil, err + } + return compress(raw) +} + +func decompressUnmarshal(data []byte, v any) error { + raw, err := decompress(data) + if err != nil { + return err + } + return json.Unmarshal(raw, v) +} + +// ---- Index (inverted index) ---- + +// GetIndex retrieves all IndexEntry values for a keyword. +func (d *DB) GetIndex(keyword string) ([]IndexEntry, error) { + var entries []IndexEntry + err := d.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketIndex) + v := b.Get([]byte(keyword)) + if v == nil { + return nil + } + return decompressUnmarshal(v, &entries) + }) + return entries, err +} + +// SetIndex overwrites the IndexEntry list for a keyword. 
+func (d *DB) SetIndex(keyword string, entries []IndexEntry) error { + data, err := marshalCompress(entries) + if err != nil { + return err + } + return d.db.Update(func(tx *bolt.Tx) error { + return tx.Bucket(bucketIndex).Put([]byte(keyword), data) + }) +} + +// BatchSetIndex writes multiple keyword→entries pairs in one transaction. +func (d *DB) BatchSetIndex(batch map[string][]IndexEntry) error { + return d.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketIndex) + for keyword, entries := range batch { + data, err := marshalCompress(entries) + if err != nil { + return err + } + if err := b.Put([]byte(keyword), data); err != nil { + return err + } + } + return nil + }) +} + +// ForEachIndex iterates over all index entries. fn receives keyword and entries. +func (d *DB) ForEachIndex(fn func(keyword string, entries []IndexEntry) error) error { + return d.db.View(func(tx *bolt.Tx) error { + return tx.Bucket(bucketIndex).ForEach(func(k, v []byte) error { + var entries []IndexEntry + if err := decompressUnmarshal(v, &entries); err != nil { + return nil // skip corrupted entries + } + return fn(string(k), entries) + }) + }) +} + +// ---- Gate (URL snippet cache) ---- + +// GetSnippet retrieves the cached snippet for a URL. +func (d *DB) GetSnippet(url string) (*SnippetEntry, error) { + var entry SnippetEntry + err := d.db.View(func(tx *bolt.Tx) error { + v := tx.Bucket(bucketGate).Get([]byte(url)) + if v == nil { + return fmt.Errorf("not found") + } + return decompressUnmarshal(v, &entry) + }) + if err != nil { + return nil, err + } + return &entry, nil +} + +// SetSnippet stores a cached snippet for a URL. +func (d *DB) SetSnippet(url string, entry *SnippetEntry) error { + data, err := marshalCompress(entry) + if err != nil { + return err + } + return d.db.Update(func(tx *bolt.Tx) error { + return tx.Bucket(bucketGate).Put([]byte(url), data) + }) +} + +// ---- SiteGate (site metadata) ---- + +// SiteInfo mirrors the Python 网站 dataclass. 
+type SiteInfo struct { + VisitCount int `json:"visit_count"` + LastVisitTime int64 `json:"last_visit_time"` + Fingerprint any `json:"fingerprint,omitempty"` + SuccessRate *float64 `json:"success_rate,omitempty"` + HTMLStructure string `json:"html_structure,omitempty"` + IPs []string `json:"ips,omitempty"` + Quality *float64 `json:"quality,omitempty"` + HTTPSAvailable *bool `json:"https_available,omitempty"` + Keywords []string `json:"keywords,omitempty"` + OutLinks []string `json:"out_links,omitempty"` + Languages map[string]float64 `json:"languages,omitempty"` + Redirects map[string]string `json:"redirects,omitempty"` + ServerTypes []string `json:"server_types,omitempty"` +} + +// GetSiteInfo retrieves metadata for a hostname. +func (d *DB) GetSiteInfo(host string) (*SiteInfo, error) { + var info SiteInfo + err := d.db.View(func(tx *bolt.Tx) error { + v := tx.Bucket(bucketSiteGate).Get([]byte(host)) + if v == nil { + return fmt.Errorf("not found") + } + return decompressUnmarshal(v, &info) + }) + if err != nil { + return &SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)}, nil + } + if info.Languages == nil { + info.Languages = make(map[string]float64) + } + if info.Redirects == nil { + info.Redirects = make(map[string]string) + } + return &info, nil +} + +// SetSiteInfo stores metadata for a hostname. +func (d *DB) SetSiteInfo(host string, info *SiteInfo) error { + data, err := marshalCompress(info) + if err != nil { + return err + } + return d.db.Update(func(tx *bolt.Tx) error { + return tx.Bucket(bucketSiteGate).Put([]byte(host), data) + }) +} + +// ForEachSite iterates over all site metadata entries. +func (d *DB) ForEachSite(fn func(host string, info *SiteInfo) error) error { + return d.db.View(func(tx *bolt.Tx) error { + return tx.Bucket(bucketSiteGate).ForEach(func(k, v []byte) error { + var info SiteInfo + if err := decompressUnmarshal(v, &info); err != nil { + return nil + } + return fn(string(k), &info) + }) + }) +}