From 1a9af21755a78bb8689bd1f3830239f81dadc324 Mon Sep 17 00:00:00 2001
From: Philipp Tanlak
Date: Thu, 17 Aug 2023 20:31:44 +0200
Subject: refactor

---
 scrape.go | 168 +++++++++++++++++++++++++++++++-------------------------------
 1 file changed, 83 insertions(+), 85 deletions(-)

(limited to 'scrape.go')

diff --git a/scrape.go b/scrape.go
index ac75c73..be26e3c 100644
--- a/scrape.go
+++ b/scrape.go
@@ -32,133 +32,125 @@ type ScrapeResult struct {
     Timestamp time.Time `json:"timestamp"`
 }
 
-type (
-    ScrapeFunc func(ScrapeParams) (any, error)
-    FetchFunc  func(url string) (string, error)
-)
+func (s *ScrapeResult) omit() bool {
+    return s.Error == nil && s.Data == nil
+}
+
+type ScrapeFunc func(ScrapeParams) (any, error)
+
+type FetchFunc func(url string) (string, error)
+
+type target struct {
+    url   string
+    depth int
+}
 
 type Scraper struct {
     ScrapeOptions ScrapeOptions
     ScrapeFunc    ScrapeFunc
     FetchFunc     FetchFunc
-    Concurrency   int
 
     visited *hashmap.Map[string, struct{}]
     wg      *sync.WaitGroup
+    jobs    chan target
+    results chan ScrapeResult
 }
 
-type target struct {
-    url   string
-    depth int
-}
-
-type result struct {
-    url   string
-    data  any
-    links []string
-    err   error
-}
+func (s *Scraper) init() {
+    s.visited = hashmap.New[string, struct{}]()
+    s.wg = &sync.WaitGroup{}
+    s.jobs = make(chan target, 1024)
+    s.results = make(chan ScrapeResult)
 
-func (s *Scraper) Scrape() <-chan ScrapeResult {
-    if s.Concurrency == 0 {
-        s.Concurrency = 1
-    }
     if s.FetchFunc == nil {
         s.FetchFunc = Fetch()
     }
+
     if s.ScrapeOptions.Rate == 0 {
         s.ScrapeOptions.Rate = 100
     }
+
     if len(s.ScrapeOptions.AllowedDomains) == 0 {
         u, err := url.Parse(s.ScrapeOptions.URL)
         if err == nil {
             s.ScrapeOptions.AllowedDomains = []string{u.Host()}
         }
     }
+}
 
-    jobs := make(chan target, 1024)
-    results := make(chan result)
-    scraperesults := make(chan ScrapeResult)
-    s.visited = hashmap.New[string, struct{}]()
-    s.wg = &sync.WaitGroup{}
+func (s *Scraper) Scrape() <-chan ScrapeResult {
+    s.init()
+    s.enqueueJob(s.ScrapeOptions.URL, s.ScrapeOptions.Depth)
 
-    go s.worker(jobs, results)
+    go s.worker()
+    go s.waitClose()
 
-    s.wg.Add(1)
-    s.visited.Set(s.ScrapeOptions.URL, struct{}{})
-    jobs <- target{url: s.ScrapeOptions.URL, depth: s.ScrapeOptions.Depth}
+    return s.results
+}
 
-    go func() {
-        s.wg.Wait()
-        close(jobs)
-        close(results)
-    }()
+func (s *Scraper) worker() {
+    var (
+        rate      = time.Duration(float64(time.Second) / s.ScrapeOptions.Rate)
+        leakyjobs = leakychan(s.jobs, rate)
+    )
 
-    go func() {
-        for res := range results {
-            scraperesults <- ScrapeResult{
-                URL:       res.url,
-                Data:      res.data,
-                Links:     res.links,
-                Error:     res.err,
-                Timestamp: time.Now().UTC(),
+    for job := range leakyjobs {
+        go func(job target) {
+            defer s.wg.Done()
+
+            res := s.process(job)
+            if !res.omit() {
+                s.results <- res
             }
-        }
-        close(scraperesults)
-    }()
-    return scraperesults
-}
-
-func (s *Scraper) worker(jobs chan target, results chan<- result) {
-    rate := time.Duration(float64(time.Second) / s.ScrapeOptions.Rate)
-    for j := range leakychan(jobs, rate) {
-        j := j
-        go func() {
-            res := s.process(j)
-
-            if j.depth > 0 {
-                for _, l := range res.links {
-                    if _, ok := s.visited.Get(l); ok {
-                        continue
-                    }
-
-                    if !s.isURLAllowed(l) {
-                        continue
-                    }
-
-                    s.wg.Add(1)
-                    select {
-                    case jobs <- target{url: l, depth: j.depth - 1}:
-                        s.visited.Set(l, struct{}{})
-                    default:
-                        log.Println("queue is full, can't add url:", l)
-                        s.wg.Done()
-                    }
-                }
+            if job.depth <= 0 {
+                return
             }
 
-            if res.err != nil || res.data != nil {
-                results <- res
+            for _, l := range res.Links {
+                if _, ok := s.visited.Get(l); ok {
+                    continue
+                }
+
+                if !s.isURLAllowed(l) {
+                    continue
+                }
+
+                s.enqueueJob(l, job.depth-1)
             }
-            s.wg.Done()
-        }()
+        }(job)
    }
 }
 
-func (s *Scraper) process(job target) result {
+func (s *Scraper) process(job target) (res ScrapeResult) {
+    res.URL = job.url
+    res.Timestamp = time.Now()
+
     html, err := s.FetchFunc(job.url)
     if err != nil {
-        return result{url: job.url, err: err}
+        res.Error = err
+        return
     }
 
-    links := Links(html, job.url)
-    data, err := s.ScrapeFunc(ScrapeParams{HTML: html, URL: job.url})
+    res.Links = links(html, job.url)
+    res.Data, err = s.ScrapeFunc(ScrapeParams{HTML: html, URL: job.url})
     if err != nil {
-        return result{url: job.url, links: links, err: err}
+        res.Error = err
+        return
     }
 
-    return result{url: job.url, data: data, links: links}
+    return
+}
+
+func (s *Scraper) enqueueJob(url string, depth int) {
+    s.wg.Add(1)
+    select {
+    case s.jobs <- target{url: url, depth: depth}:
+        s.visited.Set(url, struct{}{})
+    default:
+        log.Println("queue is full, can't add url:", url)
+        s.wg.Done()
+    }
 }
 
 func (s *Scraper) isURLAllowed(rawurl string) bool {
@@ -187,7 +179,13 @@ func (s *Scraper) isURLAllowed(rawurl string) bool {
     return ok
 }
 
-func Links(html string, origin string) []string {
+func (s *Scraper) waitClose() {
+    s.wg.Wait()
+    close(s.jobs)
+    close(s.results)
+}
+
+func links(html string, origin string) []string {
     var links []string
     doc, err := goquery.NewDocumentFromReader(strings.NewReader(html))
     if err != nil {
--
cgit v1.2.3
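
For orientation, here is a minimal usage sketch (not part of the patch) of the Scraper API as it stands after this commit. It relies only on identifiers visible in the diff above (Scraper, ScrapeOptions, ScrapeParams, ScrapeResult, Scrape); the package name, the file name, the option values and the ScrapeFunc body are illustrative assumptions.

// example_test.go (hypothetical, not part of this commit)
package scrape // assumption: placed in the same package as scrape.go

import "log"

func ExampleScraper() {
    s := &Scraper{
        ScrapeOptions: ScrapeOptions{
            URL:   "https://example.com", // start URL; illustrative value
            Depth: 1,                     // follow links one level deep from the start URL
            Rate:  10,                    // requests per second; init() defaults this to 100 when zero
        },
        // ScrapeFunc receives the fetched page. Returning nil data with a nil
        // error makes ScrapeResult.omit() true, so such pages are skipped.
        ScrapeFunc: func(p ScrapeParams) (any, error) {
            return map[string]string{"url": p.URL}, nil
        },
    }

    // Scrape() runs init(), enqueues the start URL, starts the worker and
    // waitClose() goroutines, and returns the results channel, which is
    // closed once every queued job has finished.
    for res := range s.Scrape() {
        if res.Error != nil {
            log.Println("error:", res.URL, res.Error)
            continue
        }
        log.Println("scraped:", res.URL, res.Data)
    }
}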