Compare commits

...

2 Commits

Author SHA1 Message Date
dvdrw 66ad4e1666
feat: make scraping much more robust
Sometimes the upstream API simply drops most of the data from its responses.
This commit assumes less of that data will be present and makes the most of
whatever does arrive.

For example, line IDs from upstream are avoided entirely; line names and
titles are used instead to determine lines and their directions.
2023-10-04 15:56:32 +02:00
dvdrw 738d43adea
feat: fan out scrapes over time, support different scrape intervals
This should help avoid rate limits on upstream, and it lets admins tune
update latency manually.
2023-10-04 15:54:07 +02:00
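
Condensed, the scheduling change in this second commit is a staggered launch
loop: each station chunk gets its own long-lived scraper goroutine, launches
are spaced out, and each goroutine then re-scrapes on a configurable interval.
A minimal sketch using the identifiers from the diffs below (surrounding
setup elided):

    // Sketch: stagger goroutine launches so chunks don't hit upstream at
    // once; ScheduleScrape then waits ApiConfig.Timeout seconds (default
    // 180, overridable via the TIMEOUT_PERIOD envar) between scrapes.
    for _, chunk := range stationChunks {
        go scraper.ScheduleScrape(chunk, results, scraper.ApiConfig{
            Endpoint: apiEndpoint,
            Key:      apiKey,
            Timeout:  int64(timeoutPeriod),
        })
        time.Sleep(100 * time.Millisecond) // fan out to dodge rate limits
    }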
2 changed files with 78 additions and 32 deletions
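
The first commit's headline change, deriving a line's direction from titles
instead of trusting upstream line IDs, is spread across several hunks of the
second file below. Condensed into a hypothetical helper (the actual code
inlines this logic):

    // Hypothetical helper; the diff below inlines this. Direction is
    // inferred by comparing the vehicle's line_title against
    // main_line_title: 1 = main direction, 0 = opposite,
    // -1 = unknown (main_line_title missing from the response).
    func inferDirection(bus map[string]interface{}) int {
        lineTitle, _ := bus["line_title"].(string)
        mainLineTitle, ok := bus["main_line_title"].(string)
        if !ok {
            return -1
        }
        if mainLineTitle == lineTitle {
            return 1
        }
        return 0
    }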

View File

@@ -9,6 +9,7 @@ import (
    "os"
    "strconv"
    "strings"
+   "time"

    "git.dvdrw.dev/nsmarter/scraper/scraper"
)
@@ -24,6 +25,7 @@ var chunkSize = 5
var apiEndpoint string = "https://online.nsmart.rs/publicapi/v1/announcement/announcement.php"
var apiKey string
var limitQuerySize = 0
+var timeoutPeriod = 180
var fillerUrl string = "localhost"

func parseEnvars() {
@@ -41,6 +43,13 @@ func parseEnvars() {
            Log.Printf("WARN: Invalid value for CHUNK_SIZE. Falling back to default value (%v)\n", defaultChunkSize)
            chunkSize = defaultChunkSize
        }
+   case "TIMEOUT_PERIOD":
+       var err error
+       timeoutPeriod, err = strconv.Atoi(pair[1])
+       if err != nil {
+           Log.Printf("WARN: Invalid value for TIMEOUT_PERIOD. Falling back to default value (%v)\n", 180)
+           timeoutPeriod = 180
+       }
    case "LIMIT_QUERY_SIZE":
        var err error
        limitQuerySize, err = strconv.Atoi(pair[1])
@@ -86,16 +95,19 @@ func main() {
    }

    results := make(chan []scraper.ScrapeResult, 200)
    go func() {
        for _, chunk := range stationChunks {
            go scraper.ScheduleScrape(chunk,
                results,
                scraper.ApiConfig{Endpoint: apiEndpoint,
-                   Key: apiKey})
+                   Key:     apiKey,
+                   Timeout: int64(timeoutPeriod)})
+           // Fan out scrapes over time so as to avoid upstream rate limits
+           time.Sleep(time.Millisecond * 100)
        }
    }()

    for r := range results {
        fmt.Printf("Received data: %#v\n", r)
        json, err := json.Marshal(r)
        if err != nil {
            fmt.Print("Couldn't serialise struct to JSON: ", err)

View File

@@ -31,9 +31,9 @@ type StationInfo struct {
}

type LineInfo struct {
    Id        int           `json:"id"`
    Name      string        `json:"name"`
+   Title     string        `json:"title"`
+   Direction int           `json:"direction"`
    Stations  []StationInfo `json:"stations"`
    Route     []Coords      `json:"route"`
}
@@ -60,6 +60,7 @@ func scrapeRange(s []Station) string {
type ApiConfig struct {
    Endpoint, Key string
+   Timeout       int64
}

func grabData(stations []Station, c ApiConfig) (map[string][]map[string]interface{}, error) {
@@ -141,25 +142,40 @@ func Scrape(stations []Station, c ApiConfig) ([]ScrapeResult, error) {
        var lineInfo LineInfo

        lineId, err := strconv.Atoi(bus["id"].(string))
        if err != nil {
            Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\t%v", id, err)
            continue
        }
        lineInfo.Id = lineId

        lineName, ok := bus["line_number"].(string)
        if !ok {
            lineName = ""
        }
-       lineInfo.Name = lineName
+       lineInfo.Name = strings.TrimSpace(lineName)
+
+       lineTitle, ok := bus["line_title"].(string)
+       if ok {
+           lineInfo.Title = lineTitle
+       }
+       mainLineTitle, ok := bus["main_line_title"].(string)
+       if ok {
+           if mainLineTitle == lineTitle {
+               lineInfo.Direction = 1
+           } else {
+               lineInfo.Direction = 0
+           }
+       } else {
+           lineInfo.Direction = -1
+       }

        var vehicles []Vehicle
+       garageNoStr, ok := bus["garage_no"].(string)
+       if ok {
+           vehicles = append(vehicles, Vehicle{
+               GarageId:      garageNoStr,
+               Coords:        Coords{},
+               AtStationId:   -1,
+               AtStationName: "",
+           })
+       }

        vehiclesJson, ok := bus["vehicles"].([]interface{})
        if !ok {
@@ -174,34 +190,38 @@ func Scrape(stations []Station, c ApiConfig) ([]ScrapeResult, error) {
                continue
            }

-           stationNumber, err := strconv.Atoi(vehicle["station_number"].(string))
+           stationNumberString, ok := vehicle["station_number"].(string)
+           var stationNumber int = -1
+           if ok {
+               stationNumber, err = strconv.Atoi(stationNumberString)
                if err != nil {
-                   Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\t%v", id, err)
+                   Log.Printf("WARN: No station number for vehicle stationId=%v\n\t%v", id, err)
                    continue
                }
+           }

            lat, err := strconv.ParseFloat(vehicle["lat"].(string), 64)
            if err != nil {
-               Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\t%v", id, err)
+               Log.Printf("WARN: Failed to parse vehicle lat for stationId=%v\n\t%v", id, err)
                continue
            }

            lon, err := strconv.ParseFloat(vehicle["lng"].(string), 64)
            if err != nil {
-               Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\t%v", id, err)
+               Log.Printf("WARN: Failed to parse vehicle lng for stationId=%v\n\t%v", id, err)
                continue
            }

            garageId, ok := vehicle["garageNo"].(string)
            if !ok {
-               Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\tVehicle's garageNo invalid or missing: %v", id, vehicle)
+               Log.Printf("WARN: Failed to parse vehicle garageNo for stationId=%v\n\tVehicle's garageNo invalid or missing: %v", id, vehicle)
                continue
            }

            atStationName, ok := vehicle["station_name"].(string)
            if !ok {
-               Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\tVehicle's station_name invalid or missing: %v", id, vehicle)
-               continue
+               Log.Printf("WARN: Failed to parse station_name for vehicle at stationId=%v\n\tVehicle's station_name invalid or missing: %v", id, vehicle)
+               atStationName = ""
            }

            vehicles = append(vehicles, Vehicle{
@@ -212,7 +232,12 @@ func Scrape(stations []Station, c ApiConfig) ([]ScrapeResult, error) {
            })
        }

-       lineInfo.Route = scrapeLineRoute(bus["line_route"].([]interface{}))
+       lineRouteJson, ok := bus["line_route"].([]interface{})
+       if ok {
+           lineInfo.Route = scrapeLineRoute(lineRouteJson)
+       } else {
+           lineInfo.Route = []Coords{}
+       }

        all_stations, ok := bus["all_stations"].([]interface{})
        if !ok {
@@ -274,6 +299,14 @@ func Scrape(stations []Station, c ApiConfig) ([]ScrapeResult, error) {
        }

    SKIP_AUX_INFO:
+       // Sometimes we don't get the vehicles array in the upstream response.
+       // In that case, we grab the only info about the bus we can get
+       // (its garageNo from the toplevel garage_no field).
+       // In case we parse *both* buses, pick the one last added.
+       if len(vehicles) > 1 {
+           vehicles = []Vehicle{vehicles[len(vehicles)-1]}
+       }

        results = append(results, ScrapeResult{
            Success: true,
            Id:      id,
@@ -296,6 +329,7 @@ func ScheduleScrape(chunk []Station, c chan []ScrapeResult, a ApiConfig) {
    }
    c <- r

-   time.Sleep(time.Minute * 3)
+   duration := time.Duration(a.Timeout) * time.Second
+   time.Sleep(duration)

    ScheduleScrape(chunk, c, a)
}
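
Note that ScheduleScrape reschedules itself by sleeping for the configured
interval and then recursing. Go does not eliminate tail calls, so every cycle
leaves a frame on the stack; on a long-running deployment an equivalent loop
keeps the stack flat. A sketch, with the Scrape call and error handling
approximated since the diff elides the top of the function:

    // Loop-form equivalent of the recursive ScheduleScrape; the Scrape
    // call and its error handling approximate the body elided above.
    func ScheduleScrape(chunk []Station, c chan []ScrapeResult, a ApiConfig) {
        for {
            r, err := Scrape(chunk, a)
            if err == nil {
                c <- r
            }
            time.Sleep(time.Duration(a.Timeout) * time.Second)
        }
    }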