From 0d6494d164cc490d62473eae0fbd79d5573bb380 Mon Sep 17 00:00:00 2001
From: Philipp Tanlak
Date: Wed, 7 Feb 2024 23:20:55 +0100
Subject: Add retry module and change rate to requests per minute (#37)

---
 modules/ratelimit/ratelimit.go | 62 ++++++++++++++++++++++++++----------------
 1 file changed, 39 insertions(+), 23 deletions(-)

(limited to 'modules/ratelimit/ratelimit.go')

diff --git a/modules/ratelimit/ratelimit.go b/modules/ratelimit/ratelimit.go
index b23cd7a..152c6fd 100644
--- a/modules/ratelimit/ratelimit.go
+++ b/modules/ratelimit/ratelimit.go
@@ -5,6 +5,7 @@
 package ratelimit
 
 import (
+	"math"
 	"net/http"
 	"time"
 
@@ -16,10 +17,12 @@ func init() {
 }
 
 type Module struct {
-	Rate float64 `json:"rate"`
+	Rate        int `json:"rate"`
+	Concurrency int `json:"concurrency"`
 
-	ticker    *time.Ticker
-	semaphore chan struct{}
+	ticker      *time.Ticker
+	ratelimit   chan struct{}
+	concurrency chan struct{}
 }
 
 func (Module) ModuleInfo() flyscrape.ModuleInfo {
@@ -30,41 +33,54 @@ func (Module) ModuleInfo() flyscrape.ModuleInfo {
 }
 
 func (m *Module) Provision(v flyscrape.Context) {
-	if m.disabled() {
-		return
-	}
-
-	rate := time.Duration(float64(time.Second) / m.Rate)
+	if m.rateLimitEnabled() {
+		rate := time.Duration(float64(time.Minute) / float64(m.Rate))
+		m.ticker = time.NewTicker(rate)
+		m.ratelimit = make(chan struct{}, int(math.Max(float64(m.Rate)/10, 1)))
 
-	m.ticker = time.NewTicker(rate)
-	m.semaphore = make(chan struct{}, 1)
+		go func() {
+			m.ratelimit <- struct{}{}
+			for range m.ticker.C {
+				m.ratelimit <- struct{}{}
+			}
+		}()
+	}
 
-	go func() {
-		for range m.ticker.C {
-			m.semaphore <- struct{}{}
+	if m.concurrencyEnabled() {
+		m.concurrency = make(chan struct{}, m.Concurrency)
+		for i := 0; i < m.Concurrency; i++ {
+			m.concurrency <- struct{}{}
 		}
-	}()
+	}
 }
 
 func (m *Module) AdaptTransport(t http.RoundTripper) http.RoundTripper {
-	if m.disabled() {
-		return t
-	}
 	return flyscrape.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
-		<-m.semaphore
+		if m.rateLimitEnabled() {
+			<-m.ratelimit
+		}
+
+		if m.concurrencyEnabled() {
+			<-m.concurrency
+			defer func() { m.concurrency <- struct{}{} }()
+		}
+
 		return t.RoundTrip(r)
 	})
 }
 
 func (m *Module) Finalize() {
-	if m.disabled() {
-		return
+	if m.rateLimitEnabled() {
+		m.ticker.Stop()
 	}
-	m.ticker.Stop()
 }
 
-func (m *Module) disabled() bool {
-	return m.Rate == 0
+func (m *Module) rateLimitEnabled() bool {
+	return m.Rate != 0
+}
+
+func (m *Module) concurrencyEnabled() bool {
+	return m.Concurrency > 0
 }
 
 var (
-- 
cgit v1.2.3