Diffstat (limited to 'scrape.go')
-rw-r--r--  scrape.go  168
1 file changed, 83 insertions(+), 85 deletions(-)
diff --git a/scrape.go b/scrape.go
index ac75c73..be26e3c 100644
--- a/scrape.go
+++ b/scrape.go
@@ -32,133 +32,125 @@ type ScrapeResult struct {
Timestamp time.Time `json:"timestamp"`
}
-type (
- ScrapeFunc func(ScrapeParams) (any, error)
- FetchFunc func(url string) (string, error)
-)
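+// omit reports whether the result carries neither data nor an error,
+// in which case there is nothing worth sending to the caller.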
+func (s *ScrapeResult) omit() bool {
+ return s.Error == nil && s.Data == nil
+}
+
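+// ScrapeFunc extracts the caller's structured data from a fetched page.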
+type ScrapeFunc func(ScrapeParams) (any, error)
+
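+// FetchFunc returns the raw HTML body for a URL.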
+type FetchFunc func(url string) (string, error)
+
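+// target is a single crawl job: a URL and its remaining link-follow depth.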
+type target struct {
+ url string
+ depth int
+}
type Scraper struct {
ScrapeOptions ScrapeOptions
ScrapeFunc ScrapeFunc
FetchFunc FetchFunc
- Concurrency int
visited *hashmap.Map[string, struct{}]
wg *sync.WaitGroup
+ jobs chan target
+ results chan ScrapeResult
}
-type target struct {
- url string
- depth int
-}
-
-type result struct {
- url string
- data any
- links []string
- err error
-}
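+// init fills in option defaults (fetcher, rate, allowed domains) and
+// allocates the state shared by one crawl run.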
+func (s *Scraper) init() {
+ s.visited = hashmap.New[string, struct{}]()
+ s.wg = &sync.WaitGroup{}
+ s.jobs = make(chan target, 1024)
+ s.results = make(chan ScrapeResult)
-func (s *Scraper) Scrape() <-chan ScrapeResult {
- if s.Concurrency == 0 {
- s.Concurrency = 1
- }
if s.FetchFunc == nil {
s.FetchFunc = Fetch()
}
+
if s.ScrapeOptions.Rate == 0 {
s.ScrapeOptions.Rate = 100
}
+
if len(s.ScrapeOptions.AllowedDomains) == 0 {
u, err := url.Parse(s.ScrapeOptions.URL)
if err == nil {
			s.ScrapeOptions.AllowedDomains = []string{u.Host}
}
}
+}
- jobs := make(chan target, 1024)
- results := make(chan result)
- scraperesults := make(chan ScrapeResult)
- s.visited = hashmap.New[string, struct{}]()
- s.wg = &sync.WaitGroup{}
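+// Scrape starts crawling at ScrapeOptions.URL and returns a channel of
+// results; the channel is closed once all queued work has finished.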
+func (s *Scraper) Scrape() <-chan ScrapeResult {
+ s.init()
+ s.enqueueJob(s.ScrapeOptions.URL, s.ScrapeOptions.Depth)
- go s.worker(jobs, results)
+ go s.worker()
+ go s.waitClose()
- s.wg.Add(1)
- s.visited.Set(s.ScrapeOptions.URL, struct{}{})
- jobs <- target{url: s.ScrapeOptions.URL, depth: s.ScrapeOptions.Depth}
+ return s.results
+}
- go func() {
- s.wg.Wait()
- close(jobs)
- close(results)
- }()
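+// worker drains the job queue via leakychan, which paces delivery to the
+// configured Rate, and processes each job in its own goroutine.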
+func (s *Scraper) worker() {
+ var (
+ rate = time.Duration(float64(time.Second) / s.ScrapeOptions.Rate)
+ leakyjobs = leakychan(s.jobs, rate)
+ )
- go func() {
- for res := range results {
- scraperesults <- ScrapeResult{
- URL: res.url,
- Data: res.data,
- Links: res.links,
- Error: res.err,
- Timestamp: time.Now().UTC(),
+ for job := range leakyjobs {
+ go func(job target) {
+ defer s.wg.Done()
+
+ res := s.process(job)
+ if !res.omit() {
+ s.results <- res
}
- }
- close(scraperesults)
- }()
- return scraperesults
-}
-
-func (s *Scraper) worker(jobs chan target, results chan<- result) {
- rate := time.Duration(float64(time.Second) / s.ScrapeOptions.Rate)
- for j := range leakychan(jobs, rate) {
- j := j
- go func() {
- res := s.process(j)
-
- if j.depth > 0 {
- for _, l := range res.links {
- if _, ok := s.visited.Get(l); ok {
- continue
- }
-
- if !s.isURLAllowed(l) {
- continue
- }
-
- s.wg.Add(1)
- select {
- case jobs <- target{url: l, depth: j.depth - 1}:
- s.visited.Set(l, struct{}{})
- default:
- log.Println("queue is full, can't add url:", l)
- s.wg.Done()
- }
- }
+ if job.depth <= 0 {
+ return
}
- if res.err != nil || res.data != nil {
- results <- res
+ for _, l := range res.Links {
+ if _, ok := s.visited.Get(l); ok {
+ continue
+ }
+
+ if !s.isURLAllowed(l) {
+ continue
+ }
+
+ s.enqueueJob(l, job.depth-1)
}
- s.wg.Done()
- }()
+ }(job)
}
}
-func (s *Scraper) process(job target) result {
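+// process fetches job.url, extracts its links, and runs ScrapeFunc on the
+// HTML, recording any error on the result.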
+func (s *Scraper) process(job target) (res ScrapeResult) {
+ res.URL = job.url
+	res.Timestamp = time.Now().UTC()
+
html, err := s.FetchFunc(job.url)
if err != nil {
- return result{url: job.url, err: err}
+ res.Error = err
+ return
}
- links := Links(html, job.url)
- data, err := s.ScrapeFunc(ScrapeParams{HTML: html, URL: job.url})
+ res.Links = links(html, job.url)
+ res.Data, err = s.ScrapeFunc(ScrapeParams{HTML: html, URL: job.url})
if err != nil {
- return result{url: job.url, links: links, err: err}
+ res.Error = err
+ return
}
- return result{url: job.url, data: data, links: links}
+ return
+}
+
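+// enqueueJob queues url for processing, marking it visited on success; when
+// the buffered queue is full the job is dropped and logged rather than
+// blocking the worker.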
+func (s *Scraper) enqueueJob(url string, depth int) {
+ s.wg.Add(1)
+ select {
+ case s.jobs <- target{url: url, depth: depth}:
+ s.visited.Set(url, struct{}{})
+ default:
+ log.Println("queue is full, can't add url:", url)
+ s.wg.Done()
+ }
}
func (s *Scraper) isURLAllowed(rawurl string) bool {
@@ -187,7 +179,13 @@ func (s *Scraper) isURLAllowed(rawurl string) bool {
return ok
}
-func Links(html string, origin string) []string {
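+// waitClose blocks until every in-flight job is done, then closes the job
+// and result channels so the worker and the results consumer can exit.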
+func (s *Scraper) waitClose() {
+ s.wg.Wait()
+ close(s.jobs)
+ close(s.results)
+}
+
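+// links collects the anchor hrefs found in html, resolving them against
+// origin.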
+func links(html string, origin string) []string {
var links []string
doc, err := goquery.NewDocumentFromReader(strings.NewReader(html))
if err != nil {