feat: fan out scrapes over time, support different scrape intervals

This should help avoid upstream rate limits, and lets admins tune update
latency themselves.
dvdrw 2023-10-04 15:54:07 +02:00
parent ef19ed40f2
commit 738d43adea
Signed by: dvdrw
GPG Key ID: 4756FA53D8797D7F
2 changed files with 23 additions and 9 deletions
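The scheduling pattern the diff below introduces is simple: instead of launching every chunk's scraper goroutine at once (each of which previously slept a hard-coded three minutes between scrapes), the launches are staggered 100 ms apart and each goroutine re-scrapes on a configurable interval read from the new TIMEOUT_PERIOD variable (default 180 seconds). The following is a minimal, self-contained sketch of that pattern only; the Station and ScrapeResult types and the scrapeChunk function are illustrative stand-ins, not the repository's actual API.

package main

import (
	"fmt"
	"time"
)

type Station struct{ ID int }
type ScrapeResult struct{ StationID int }

// scrapeChunk stands in for scraper.ScheduleScrape: scrape the chunk, publish
// the results, then sleep for the configured interval and go again.
func scrapeChunk(chunk []Station, out chan<- []ScrapeResult, interval time.Duration) {
	for {
		results := make([]ScrapeResult, 0, len(chunk))
		for _, s := range chunk {
			results = append(results, ScrapeResult{StationID: s.ID})
		}
		out <- results
		time.Sleep(interval) // per-chunk scrape interval (TIMEOUT_PERIOD seconds)
	}
}

func main() {
	chunks := [][]Station{{{1}, {2}}, {{3}, {4}}, {{5}}}
	out := make(chan []ScrapeResult, 200)
	interval := 180 * time.Second // default TIMEOUT_PERIOD

	go func() {
		for _, chunk := range chunks {
			go scrapeChunk(chunk, out, interval)
			// Stagger goroutine start-up so the chunks don't all hit upstream at once.
			time.Sleep(100 * time.Millisecond)
		}
	}()

	for r := range out {
		fmt.Printf("received %d results\n", len(r))
	}
}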


@@ -9,6 +9,7 @@ import (
 	"os"
 	"strconv"
 	"strings"
+	"time"
 
 	"git.dvdrw.dev/nsmarter/scraper/scraper"
 )
@@ -24,6 +25,7 @@ var chunkSize = 5
 var apiEndpoint string = "https://online.nsmart.rs/publicapi/v1/announcement/announcement.php"
 var apiKey string
 var limitQuerySize = 0
+var timeoutPeriod = 180
 var fillerUrl string = "localhost"
 
 func parseEnvars() {
@@ -41,6 +43,13 @@ func parseEnvars() {
 			Log.Printf("WARN: Invalid value for CHUNK_SIZE. Falling back to default value (%v)\n", defaultChunkSize)
 			chunkSize = defaultChunkSize
 		}
+	case "TIMEOUT_PERIOD":
+		var err error
+		timeoutPeriod, err = strconv.Atoi(pair[1])
+		if err != nil {
+			Log.Printf("WARN: Invalid value for TIMEOUT_PERIOD. Falling back to default value (%v)\n", 180)
+			timeoutPeriod = 180
+		}
 	case "LIMIT_QUERY_SIZE":
 		var err error
 		limitQuerySize, err = strconv.Atoi(pair[1])
@@ -86,16 +95,19 @@ func main() {
 	}
 	results := make(chan []scraper.ScrapeResult, 200)
-	for _, chunk := range stationChunks {
-		go scraper.ScheduleScrape(chunk,
-			results,
-			scraper.ApiConfig{Endpoint: apiEndpoint,
-				Key: apiKey})
-	}
+	go func() {
+		for _, chunk := range stationChunks {
+			go scraper.ScheduleScrape(chunk,
+				results,
+				scraper.ApiConfig{Endpoint: apiEndpoint,
+					Key: apiKey,
+					Timeout: int64(timeoutPeriod)})
+
+			// Fan out scrapes over time so as to avoid upstream rate limits
+			time.Sleep(time.Millisecond * 100)
+		}
+	}()
 
 	for r := range results {
-		fmt.Printf("Received data: %#v\n", r)
 		json, err := json.Marshal(r)
 		if err != nil {
 			fmt.Print("Couldn't serialise struct to JSON: ", err)


@@ -60,6 +60,7 @@ func scrapeRange(s []Station) string {
 
 type ApiConfig struct {
 	Endpoint, Key string
+	Timeout int64
 }
 
 func grabData(stations []Station, c ApiConfig) (map[string][]map[string]interface{}, error) {
@@ -296,6 +297,7 @@ func ScheduleScrape(chunk []Station, c chan []ScrapeResult, a ApiConfig) {
 	}
 	c <- r
 
-	time.Sleep(time.Minute * 3)
+	duration := time.Duration(a.Timeout) * time.Second
+	time.Sleep(duration)
 	ScheduleScrape(chunk, c, a)
 }