Compare commits

...

2 Commits

Author SHA1 Message Date
dvdrw 66ad4e1666
feat: make scraping much more robust
Sometimes the upstream API will simply drop most of the data from its
responses. This commit assumes less of that data will be present, and makes
the most of whatever data does arrive.

For example, upstream line IDs are no longer used at all; line names and
titles are used instead to identify lines and their directions.
2023-10-04 15:56:32 +02:00
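
In practice (see the second file's diff below), the direction heuristic compares each bus's line_title against the upstream main_line_title. A minimal sketch of that logic, pulled out into a hypothetical helper for illustration (the commit itself inlines it in Scrape):

    // direction infers a line's direction from upstream titles: 1 when the
    // bus reports the line's main title, 0 when it differs, and -1 when the
    // field is missing. Hypothetical helper; names follow the diff below.
    func direction(bus map[string]interface{}, lineTitle string) int {
        mainLineTitle, ok := bus["main_line_title"].(string)
        if !ok {
            return -1 // upstream omitted main_line_title: direction unknown
        }
        if mainLineTitle == lineTitle {
            return 1
        }
        return 0
    }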
dvdrw 738d43adea
feat: fan out scrapes over time, support different scrape intervals
This should help avoid upstream rate limits, and lets admins tune update
latency by hand.
2023-10-04 15:54:07 +02:00
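
The fan-out itself is small: instead of launching every chunk's scraper at once, a single goroutine starts them 100ms apart. A minimal sketch of the pattern as it appears in the first file's diff below (apiConfig stands in for the full scraper.ApiConfig literal):

    // Stagger one ScheduleScrape goroutine per chunk so the initial
    // requests don't all hit upstream at the same instant.
    go func() {
        for _, chunk := range stationChunks {
            go scraper.ScheduleScrape(chunk, results, apiConfig)
            time.Sleep(100 * time.Millisecond) // fan out over time
        }
    }()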
2 changed files with 78 additions and 32 deletions

View File

@@ -9,6 +9,7 @@ import (
     "os"
     "strconv"
     "strings"
+    "time"

     "git.dvdrw.dev/nsmarter/scraper/scraper"
 )
@@ -24,6 +25,7 @@ var chunkSize = 5
 var apiEndpoint string = "https://online.nsmart.rs/publicapi/v1/announcement/announcement.php"
 var apiKey string
 var limitQuerySize = 0
+var timeoutPeriod = 180
 var fillerUrl string = "localhost"

 func parseEnvars() {
@@ -41,6 +43,13 @@ func parseEnvars() {
             Log.Printf("WARN: Invalid value for CHUNK_SIZE. Falling back to default value (%v)\n", defaultChunkSize)
             chunkSize = defaultChunkSize
         }
+    case "TIMEOUT_PERIOD":
+        var err error
+        timeoutPeriod, err = strconv.Atoi(pair[1])
+        if err != nil {
+            Log.Printf("WARN: Invalid value for TIMEOUT_PERIOD. Falling back to default value (%v)\n", 180)
+            timeoutPeriod = 180
+        }
     case "LIMIT_QUERY_SIZE":
         var err error
         limitQuerySize, err = strconv.Atoi(pair[1])
@@ -86,16 +95,19 @@ func main() {
     }

     results := make(chan []scraper.ScrapeResult, 200)
-    for _, chunk := range stationChunks {
-        go scraper.ScheduleScrape(chunk,
-            results,
-            scraper.ApiConfig{Endpoint: apiEndpoint,
-                Key: apiKey})
-    }
+    go func() {
+        for _, chunk := range stationChunks {
+            go scraper.ScheduleScrape(chunk,
+                results,
+                scraper.ApiConfig{Endpoint: apiEndpoint,
+                    Key:     apiKey,
+                    Timeout: int64(timeoutPeriod)})
+            // Fan out scrapes over time so as to avoid upstream rate limits
+            time.Sleep(time.Millisecond * 100)
+        }
+    }()

     for r := range results {
-        fmt.Printf("Received data: %#v\n", r)
         json, err := json.Marshal(r)
         if err != nil {
             fmt.Print("Couldn't serialise struct to JSON: ", err)

View File

@@ -31,11 +31,11 @@ type StationInfo struct {
 }

 type LineInfo struct {
-    Id        int           `json:"id"`
     Name      string        `json:"name"`
     Title     string        `json:"title"`
+    Direction int           `json:"direction"`
     Stations  []StationInfo `json:"stations"`
     Route     []Coords      `json:"route"`
 }

 type ScrapeResult struct {
@@ -60,6 +60,7 @@ func scrapeRange(s []Station) string {

 type ApiConfig struct {
     Endpoint, Key string
+    Timeout       int64
 }

 func grabData(stations []Station, c ApiConfig) (map[string][]map[string]interface{}, error) {
@@ -141,25 +142,40 @@ func Scrape(stations []Station, c ApiConfig) ([]ScrapeResult, error) {
             var lineInfo LineInfo

-            lineId, err := strconv.Atoi(bus["id"].(string))
-            if err != nil {
-                Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\t%v", id, err)
-                continue
-            }
-            lineInfo.Id = lineId
-
             lineName, ok := bus["line_number"].(string)
             if !ok {
                 lineName = ""
             }
-            lineInfo.Name = lineName
+            lineInfo.Name = strings.TrimSpace(lineName)

             lineTitle, ok := bus["line_title"].(string)
             if ok {
                 lineInfo.Title = lineTitle
             }

+            mainLineTitle, ok := bus["main_line_title"].(string)
+            if ok {
+                if mainLineTitle == lineTitle {
+                    lineInfo.Direction = 1
+                } else {
+                    lineInfo.Direction = 0
+                }
+            } else {
+                lineInfo.Direction = -1
+            }
+
             var vehicles []Vehicle
+            garageNoStr, ok := bus["garage_no"].(string)
+            if ok {
+                vehicles = append(vehicles, Vehicle{
+                    GarageId:      garageNoStr,
+                    Coords:        Coords{},
+                    AtStationId:   -1,
+                    AtStationName: "",
+                })
+            }

             vehiclesJson, ok := bus["vehicles"].([]interface{})
             if !ok {
@@ -174,34 +190,38 @@ func Scrape(stations []Station, c ApiConfig) ([]ScrapeResult, error) {
                     continue
                 }

-                stationNumber, err := strconv.Atoi(vehicle["station_number"].(string))
-                if err != nil {
-                    Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\t%v", id, err)
-                    continue
+                stationNumberString, ok := vehicle["station_number"].(string)
+                var stationNumber int = -1
+                if ok {
+                    stationNumber, err = strconv.Atoi(stationNumberString)
+                    if err != nil {
+                        Log.Printf("WARN: No station number for vehicle stationId=%v\n\t%v", id, err)
+                        continue
+                    }
                 }

                 lat, err := strconv.ParseFloat(vehicle["lat"].(string), 64)
                 if err != nil {
-                    Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\t%v", id, err)
+                    Log.Printf("WARN: Failed to parse vehicle lat for stationId=%v\n\t%v", id, err)
                     continue
                 }

                 lon, err := strconv.ParseFloat(vehicle["lng"].(string), 64)
                 if err != nil {
-                    Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\t%v", id, err)
+                    Log.Printf("WARN: Failed to parse vehicle lng for stationId=%v\n\t%v", id, err)
                     continue
                 }

                 garageId, ok := vehicle["garageNo"].(string)
                 if !ok {
-                    Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\tVehicle's garageNo invalid or missing: %v", id, vehicle)
+                    Log.Printf("WARN: Failed to parse vehicle garageNo for stationId=%v\n\tVehicle's garageNo invalid or missing: %v", id, vehicle)
                     continue
                 }

                 atStationName, ok := vehicle["station_name"].(string)
                 if !ok {
-                    Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\tVehicle's station_name invalid or missing: %v", id, vehicle)
-                    continue
+                    Log.Printf("WARN: Failed to parse station_name for vehicle at stationId=%v\n\tVehicle's station_name invalid or missing: %v", id, vehicle)
+                    atStationName = ""
                 }

                 vehicles = append(vehicles, Vehicle{
@@ -212,7 +232,12 @@ func Scrape(stations []Station, c ApiConfig) ([]ScrapeResult, error) {
                 })
             }

-            lineInfo.Route = scrapeLineRoute(bus["line_route"].([]interface{}))
+            lineRouteJson, ok := bus["line_route"].([]interface{})
+            if ok {
+                lineInfo.Route = scrapeLineRoute(lineRouteJson)
+            } else {
+                lineInfo.Route = []Coords{}
+            }

             all_stations, ok := bus["all_stations"].([]interface{})
             if !ok {
@@ -274,6 +299,14 @@ func Scrape(stations []Station, c ApiConfig) ([]ScrapeResult, error) {
             }

         SKIP_AUX_INFO:
+            // Sometimes we don't get the vehicles array in the upstream response.
+            // In that case, we grab the only info about the bus we can get
+            // (its garageNo from the toplevel garage_no field).
+            // In case we parse *both* buses, pick the one last added.
+            if len(vehicles) > 1 {
+                vehicles = []Vehicle{vehicles[len(vehicles)-1]}
+            }
+
             results = append(results, ScrapeResult{
                 Success: true,
                 Id:      id,
@@ -296,6 +329,7 @@ func ScheduleScrape(chunk []Station, c chan []ScrapeResult, a ApiConfig) {
     }

     c <- r
-    time.Sleep(time.Minute * 3)
+    duration := time.Duration(a.Timeout) * time.Second
+    time.Sleep(duration)
     ScheduleScrape(chunk, c, a)
 }
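
Taken together with the TIMEOUT_PERIOD envar in the first file, each chunk now rescrapes every a.Timeout seconds instead of a hard-coded three minutes. A sketch of the resulting schedule, rewritten as an explicit loop for illustration only (the commit keeps the tail-recursive call, and the error handling around c <- r is an assumption):

    // Hypothetical iterative equivalent of ScheduleScrape: scrape the
    // chunk, publish the result, then wait the configured interval.
    func scheduleLoop(chunk []Station, c chan []ScrapeResult, a ApiConfig) {
        for {
            r, err := Scrape(chunk, a)
            if err == nil {
                c <- r
            }
            time.Sleep(time.Duration(a.Timeout) * time.Second)
        }
    }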