From 18b1c4df5e37ef492326e5b9c73ff6900dda3f0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=96=87=E5=B3=B0?= Date: Thu, 9 Apr 2026 11:58:53 +0800 Subject: [PATCH] up --- crawler/crawler.go | 160 +++++++++++++++++++++++---------------------- storage/storage.go | 76 +++++++++++++++++++++ 2 files changed, 159 insertions(+), 77 deletions(-) diff --git a/crawler/crawler.go b/crawler/crawler.go index 80b11e0..a7f71e1 100644 --- a/crawler/crawler.go +++ b/crawler/crawler.go @@ -238,16 +238,15 @@ func (c *Crawler) visitURL(rawURL string) (hrefs []string) { if fromHost == "" { continue } - info, _ := c.db.GetSiteInfo(fromHost) - if info.Redirects == nil { - info.Redirects = make(map[string]string) - } - info.Redirects[from] = to - // 重定向映射过多时裁剪到 40 条 - if len(info.Redirects) > 50 { - info.Redirects = truncateMap(info.Redirects, 40) - } - _ = c.db.SetSiteInfo(fromHost, info) + _ = c.db.UpdateSiteInfo(fromHost, func(info *storage.SiteInfo) { + if info.Redirects == nil { + info.Redirects = make(map[string]string) + } + info.Redirects[from] = to + if len(info.Redirects) > 50 { + info.Redirects = truncateMap(info.Redirects, 40) + } + }) } // 限制返回的链接数,防止下一轮队列爆炸 @@ -263,87 +262,94 @@ func (c *Crawler) updateSiteFailure(rawURL string) { if host == "" { return } - info, _ := c.db.GetSiteInfo(host) - if info.SuccessRate == nil { - zero := 0.0 - info.SuccessRate = &zero - } - // 成功率每次失败乘以 0.99(无限趋近 0) - *info.SuccessRate *= 0.99 - _ = c.db.SetSiteInfo(host, info) + _ = c.db.UpdateSiteInfo(host, func(info *storage.SiteInfo) { + if info.SuccessRate == nil { + zero := 0.0 + info.SuccessRate = &zero + } + // 成功率每次失败乘以 0.99(无限趋近 0) + *info.SuccessRate *= 0.99 + }) } // updateSiteSuccess 当某 URL 抓取成功时,更新网站的完整元信息。 +// 使用 UpdateSiteInfo 原子读-改-写,避免并发 goroutine 对同一 host 的 SiteInfo 更新丢失。 func (c *Crawler) updateSiteSuccess(host string, res *FetchResult, title, desc, text string, hrefs []string) { - info, _ := c.db.GetSiteInfo(host) + now := time.Now().Unix() + httpsAvailable := strings.HasPrefix(res.FinalURL, "https://") + serverType := res.ServerType - // 访问计数 +1,更新最后访问时间 - info.VisitCount++ - info.LastVisitTime = time.Now().Unix() + // 语言检测(CPU 密集,在锁外执行) + var detectedLang string + // 检测条件在 UpdateSiteInfo 回调内判断,这里预先计算好 + detectedLang = c.analyzer.DetectLanguage(title + " " + desc + " " + text) - // 成功率更新:EWM(指数加权移动)平滑,每次 +0.01 - one := 1.0 - if info.SuccessRate == nil { - info.SuccessRate = &one - } - *info.SuccessRate = *info.SuccessRate*0.99 + 0.01 - - // 记录是否支持 HTTPS - if strings.HasPrefix(res.FinalURL, "https://") { - t := true - info.HTTPSAvailable = &t - } - - // 记录 HTTP Server 类型(去重,保留最近 5 个) - if res.ServerType != "" { - found := false - for _, s := range info.ServerTypes { - if s == res.ServerType { - found = true - break - } - } - if !found { - info.ServerTypes = append(info.ServerTypes, res.ServerType) - if len(info.ServerTypes) > 5 { - info.ServerTypes = info.ServerTypes[len(info.ServerTypes)-5:] - } + // 收集外链(跨顶级域名的链接) + superHost := superNetloc(res.FinalURL) + var external []string + for _, h := range hrefs { + if superNetloc(h) != superHost { + external = append(external, h) } } + sampled := sampleStrings(external, 10) - // 语言检测和出站链接收集(仅在前 10 次访问或 10% 概率下触发,减少开销) - if info.VisitCount < 10 || rand.Float64() < 0.1 { - lang := c.analyzer.DetectLanguage(title + " " + desc + " " + text) - if lang != "" { - if info.Languages == nil { - info.Languages = make(map[string]float64) - } - // 首次访问强度高,随访问次数增加强度衰减 - intensity := math.Min(0.2, 1/math.Sqrt(float64(info.VisitCount+1))) - for k := range info.Languages { - info.Languages[k] *= (1 - intensity) // 旧语种按 intensity 衰减 - } - info.Languages[lang] += intensity // 新语种增加 + _ = c.db.UpdateSiteInfo(host, func(info *storage.SiteInfo) { + // 访问计数 +1,更新最后访问时间 + info.VisitCount++ + info.LastVisitTime = now + + // 成功率更新:EWM(指数加权移动)平滑,每次 +0.01 + one := 1.0 + if info.SuccessRate == nil { + info.SuccessRate = &one + } + *info.SuccessRate = *info.SuccessRate*0.99 + 0.01 + + // 记录是否支持 HTTPS + if httpsAvailable { + t := true + info.HTTPSAvailable = &t } - // 收集外链(跨顶级域名的链接) - superHost := superNetloc(res.FinalURL) - var external []string - for _, h := range hrefs { - if superNetloc(h) != superHost { - external = append(external, h) + // 记录 HTTP Server 类型(去重,保留最近 5 个) + if serverType != "" { + found := false + for _, s := range info.ServerTypes { + if s == serverType { + found = true + break + } + } + if !found { + info.ServerTypes = append(info.ServerTypes, serverType) + if len(info.ServerTypes) > 5 { + info.ServerTypes = info.ServerTypes[len(info.ServerTypes)-5:] + } } } - // 最多保留 10 条外链 - sampled := sampleStrings(external, 10) - info.OutLinks = append(info.OutLinks, sampled...) - // 外链超过 250 条时采样到 200 条 - if len(info.OutLinks) > 250 { - info.OutLinks = sampleStrings(info.OutLinks, 200) - } - } - _ = c.db.SetSiteInfo(host, info) + // 语言检测和出站链接收集(仅在前 10 次访问或 10% 概率下触发,减少开销) + if info.VisitCount < 10 || rand.Float64() < 0.1 { + if detectedLang != "" { + if info.Languages == nil { + info.Languages = make(map[string]float64) + } + // 首次访问强度高,随访问次数增加强度衰减 + intensity := math.Min(0.2, 1/math.Sqrt(float64(info.VisitCount+1))) + for k := range info.Languages { + info.Languages[k] *= (1 - intensity) // 旧语种按 intensity 衰减 + } + info.Languages[detectedLang] += intensity // 新语种增加 + } + + // 外链 + info.OutLinks = append(info.OutLinks, sampled...) + if len(info.OutLinks) > 250 { + info.OutLinks = sampleStrings(info.OutLinks, 200) + } + } + }) } // sendToHarvester 将关键词索引数据通过 HTTP POST 发送到搜索服务器(/l 端点)。 diff --git a/storage/storage.go b/storage/storage.go index c42bc84..9df51b8 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -410,8 +410,28 @@ type SiteInfo struct { } // GetSiteInfo 根据主机名查询网站元信息。 +// 优先从写缓冲中读取(保证与最近 SetSiteInfo 的数据一致),未命中再读 bbolt。 // 若不存在则返回仅有默认空 map 的空 SiteInfo(不报错,方便调用方直接使用)。 func (d *DB) GetSiteInfo(host string) (*SiteInfo, error) { + // 先检查写缓冲中是否有该 host 的最新待写入数据 + d.writeBufMu.Lock() + if op, ok := d.writeBuf["site:"+host]; ok { + d.writeBufMu.Unlock() + var info SiteInfo + if err := decompressUnmarshal(op.data, &info); err == nil { + if info.Languages == nil { + info.Languages = make(map[string]float64) + } + if info.Redirects == nil { + info.Redirects = make(map[string]string) + } + return &info, nil + } + // 反序列化失败(不应发生),fall through 到读 db + } else { + d.writeBufMu.Unlock() + } + var info SiteInfo err := d.db.View(func(tx *bolt.Tx) error { v := tx.Bucket(bucketSiteGate).Get([]byte(host)) @@ -450,6 +470,62 @@ func (d *DB) SetSiteInfo(host string, info *SiteInfo) error { return nil } +// UpdateSiteInfo 原子地读取当前 SiteInfo 并应用修改函数 fn,然后写回。 +// 整个读-改-写过程在 writeBufMu 锁内完成,消除并发 lost update 竞态。 +// 适用于多个 goroutine 对同一 host 的 SiteInfo 进行读-改-写的场景。 +func (d *DB) UpdateSiteInfo(host string, fn func(*SiteInfo)) error { + d.writeBufMu.Lock() + + // 从缓冲或 db 读取最新 SiteInfo + var info *SiteInfo + if op, ok := d.writeBuf["site:"+host]; ok { + var si SiteInfo + if err := decompressUnmarshal(op.data, &si); err == nil { + info = &si + } + } + + if info == nil { + var si SiteInfo + err := d.db.View(func(tx *bolt.Tx) error { + v := tx.Bucket(bucketSiteGate).Get([]byte(host)) + if v == nil { + return fmt.Errorf("not found") + } + return decompressUnmarshal(v, &si) + }) + if err != nil { + info = &SiteInfo{Languages: make(map[string]float64), Redirects: make(map[string]string)} + } else { + info = &si + } + } + if info.Languages == nil { + info.Languages = make(map[string]float64) + } + if info.Redirects == nil { + info.Redirects = make(map[string]string) + } + + // 在锁内调用修改函数 + fn(info) + + // 序列化并写回缓冲 + data, err := marshalCompress(info) + if err != nil { + d.writeBufMu.Unlock() + return err + } + d.writeBuf["site:"+host] = &writeOp{opType: 1, key: host, data: data} + if len(d.writeBuf) >= 5000 { + d.writeBufMu.Unlock() + d.flushWriteBuf() + return nil + } + d.writeBufMu.Unlock() + return nil +} + // ForEachSite 遍历所有网站元信息条目,对每个条目调用 fn。 func (d *DB) ForEachSite(fn func(host string, info *SiteInfo) error) error { return d.db.View(func(tx *bolt.Tx) error {