// Package harvester implements the index-writing server (port 5000). // // It receives (url, keywords) payloads from the crawler, accumulates them in // memory, then flushes to the persistent inverted index when the in-memory // row count exceeds the configured threshold. package harvester import ( "encoding/json" "log" "math/rand" "net/http" "strings" "sync" "sync/atomic" "sese-engine/config" "sese-engine/info" "sese-engine/storage" ) // Server is the harvester HTTP server. type Server struct { db *storage.DB // in-memory accumulator: keyword → [(weight, url)] mem map[string][]storage.IndexEntry memMu sync.Mutex rowCount int64 // approximate total in-memory rows flushMu sync.Mutex // only one flush at a time infoSvc *info.Service } // New creates a harvester Server. func New(db *storage.DB, infoSvc *info.Service) *Server { return &Server{ db: db, mem: make(map[string][]storage.IndexEntry), infoSvc: infoSvc, } } // ingestPayload is the JSON body sent by the crawler. type ingestPayload struct { URL string `json:"url"` Keywords []struct { Word string `json:"word"` Weight float32 `json:"weight"` } `json:"keywords"` } // Handler returns the http.Handler for the harvester. func (s *Server) Handler() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/l", s.handleIngest) return mux } func (s *Server) handleIngest(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } var payload ingestPayload if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { http.Error(w, "bad json: "+err.Error(), http.StatusBadRequest) return } // Sanitise URL payload.URL = strings.ReplaceAll(payload.URL, "\n", "") if payload.URL == "" { http.Error(w, "empty url", http.StatusBadRequest) return } s.memMu.Lock() for _, kw := range payload.Keywords { key := kw.Word entries := s.mem[key] // Threshold-based early discard if len(entries) > 15 { low := s.lowThreshold(key) if float64(kw.Weight) < low { continue } } s.mem[key] = append(entries, storage.IndexEntry{ Weight: kw.Weight, URL: payload.URL, }) atomic.AddInt64(&s.rowCount, 1) } s.memMu.Unlock() // Check if we should flush if atomic.LoadInt64(&s.rowCount) > int64(config.BigCleanThreshold) { go s.flush() } w.Write([]byte("ok")) } // lowThreshold returns the minimum weight needed to enter the index for key. func (s *Server) lowThreshold(key string) float64 { existing, _ := s.db.GetIndex(key) if len(existing) < config.MaxURLsPerKey { return -1 } // Find the config.MaxURLsPerKey-th highest weight weights := make([]float64, len(existing)) for i, e := range existing { weights[i] = float64(e.Weight) } // Partial sort: find threshold at position MaxURLsPerKey-1 return nthLargest(weights, config.MaxURLsPerKey-1) * 0.05 } // flush merges the in-memory accumulator into the persistent index. func (s *Server) flush() { if !s.flushMu.TryLock() { return // another flush is running } defer s.flushMu.Unlock() s.memMu.Lock() snapshot := s.mem s.mem = make(map[string][]storage.IndexEntry) atomic.StoreInt64(&s.rowCount, 0) s.memMu.Unlock() log.Printf("[harvester] flushing %d keys", len(snapshot)) items := make([]struct { key string entries []storage.IndexEntry }, 0, len(snapshot)) for k, v := range snapshot { items = append(items, struct { key string entries []storage.IndexEntry }{k, v}) } rand.Shuffle(len(items), func(i, j int) { items[i], items[j] = items[j], items[i] }) // Parallel merge type result struct { key string entries []storage.IndexEntry } results := make(chan result, len(items)) sem := make(chan struct{}, 8) for _, item := range items { sem <- struct{}{} go func(k string, newEntries []storage.IndexEntry) { defer func() { <-sem }() merged := s.mergeKey(k, newEntries) results <- result{k, merged} }(item.key, item.entries) } // Collect batch := make(map[string][]storage.IndexEntry, len(items)) for range items { r := <-results batch[r.key] = r.entries } if err := s.db.BatchSetIndex(batch); err != nil { log.Printf("[harvester] flush write error: %v", err) } log.Printf("[harvester] flush done, %d keys written", len(batch)) } // mergeKey merges new entries with existing index entries for a key. func (s *Server) mergeKey(key string, newEntries []storage.IndexEntry) []storage.IndexEntry { existing, _ := s.db.GetIndex(key) // Discard new key if too few URLs if len(existing) == 0 && len(newEntries) < config.MinURLsForNewKey { return nil } merged := dedup(append(newEntries, existing...)) // Occasional URL normalisation dedup if rand.Float64() < 0.02 { merged = dedupNormalised(merged) } // Trim if over limit if float64(len(merged)) > float64(config.MaxURLsPerKey)*1.1 || rand.Float64() < 0.02 { merged = trim(merged, s.infoSvc, config.MaxURLsPerKey, config.MaxSameDomainPerKey) } return merged } // ---- helpers ---- func dedup(entries []storage.IndexEntry) []storage.IndexEntry { seen := make(map[string]bool, len(entries)) out := make([]storage.IndexEntry, 0, len(entries)) for _, e := range entries { if seen[e.URL] { continue } seen[e.URL] = true out = append(out, e) } return out } func dedupNormalised(entries []storage.IndexEntry) []storage.IndexEntry { // Sort by URL length descending, then dedup by normalised URL (strip scheme, trailing slash) sorted := make([]storage.IndexEntry, len(entries)) copy(sorted, entries) for i := 0; i < len(sorted)-1; i++ { for j := i + 1; j < len(sorted); j++ { if len(sorted[j].URL) > len(sorted[i].URL) { sorted[i], sorted[j] = sorted[j], sorted[i] } } } seen := make(map[string]bool) out := make([]storage.IndexEntry, 0, len(sorted)) for _, e := range sorted { k := normaliseURL(e.URL) if seen[k] { continue } seen[k] = true out = append(out, e) } return out } func normaliseURL(u string) string { if strings.HasPrefix(u, "https://") { u = u[8:] } else if strings.HasPrefix(u, "http://") { u = u[7:] } return strings.TrimRight(u, "/") } // trim reduces entries to at most limit, keeping at most sameDomainLimit per domain. func trim(entries []storage.IndexEntry, infoSvc *info.Service, limit, sameDomainLimit int) []storage.IndexEntry { // Sort by effective score: weight * (1 + backlink) scored := make([]storage.IndexEntry, len(entries)) copy(scored, entries) for i := 0; i < len(scored)-1; i++ { for j := i + 1; j < len(scored); j++ { si := float64(scored[i].Weight) * (1 + infoSvc.Prosper(scored[i].URL)) sj := float64(scored[j].Weight) * (1 + infoSvc.Prosper(scored[j].URL)) if sj > si { scored[i], scored[j] = scored[j], scored[i] } } } // Per-domain cap domainCount := make(map[string]int) out := make([]storage.IndexEntry, 0, limit) for _, e := range scored { host := netloc(e.URL) if host == "" { host = e.URL } host = strings.ToLower(host) // Allow homepage URLs regardless of limit isHome := isHomepage(e.URL) if !isHome && domainCount[host] >= sameDomainLimit { continue } domainCount[host]++ out = append(out, e) if len(out) >= limit { break } } return out } func isHomepage(u string) bool { u = strings.TrimPrefix(u, "https://") u = strings.TrimPrefix(u, "http://") return strings.Count(strings.TrimRight(u, "/"), "/") == 0 } func netloc(rawURL string) string { parts := strings.SplitN(rawURL, "/", 4) if len(parts) >= 3 && (parts[0] == "http:" || parts[0] == "https:") && parts[1] == "" { return parts[2] } return "" } // nthLargest returns the n-th largest value in a slice (0-indexed). func nthLargest(values []float64, n int) float64 { if n >= len(values) { return 0 } cp := make([]float64, len(values)) copy(cp, values) // Partial sort descending for i := 0; i <= n; i++ { maxIdx := i for j := i + 1; j < len(cp); j++ { if cp[j] > cp[maxIdx] { maxIdx = j } } cp[i], cp[maxIdx] = cp[maxIdx], cp[i] } return cp[n] } // ListenAndServe starts the harvester on the given address. func (s *Server) ListenAndServe(addr string) error { log.Printf("[harvester] listening on %s", addr) return http.ListenAndServe(addr, s.Handler()) }