Compare commits

...

4 Commits

Author SHA1 Message Date
66ad4e1666 feat: make scraping much more robust
Sometimes the upstream API will simply drop most of the data it normally
includes. This commit assumes less of that data will be present, and makes the
most of whatever data is included.

For example, line IDs from upstream are avoided entirely; line names and titles
are used instead to determine lines and their directions.
2023-10-04 15:56:32 +02:00
738d43adea feat: fan out scrapes over time, support different scrape intervals
This should help avoid rate-limits on upstream, as well as let admins determine
update latency manually.
2023-10-04 15:54:07 +02:00
ef19ed40f2 feat: feed data to upstream over REST instead of printing to stdout 2023-09-29 00:53:18 +02:00
bcfeabc9c1 feat: include line id in LineInfo 2023-09-29 00:52:36 +02:00
3 changed files with 120 additions and 41 deletions

BIN
main/main

Binary file not shown.

View File

@@ -1,12 +1,17 @@
package main package main
import ( import (
"bytes"
"encoding/json"
"fmt" "fmt"
"git.dvdrw.dev/nsmarter/scraper/scraper"
"log" "log"
"net/http"
"os" "os"
"strconv" "strconv"
"strings" "strings"
"time"
"git.dvdrw.dev/nsmarter/scraper/scraper"
) )
var Log = log.Default() var Log = log.Default()
@@ -20,6 +25,8 @@ var chunkSize = 5
var apiEndpoint string = "https://online.nsmart.rs/publicapi/v1/announcement/announcement.php" var apiEndpoint string = "https://online.nsmart.rs/publicapi/v1/announcement/announcement.php"
var apiKey string var apiKey string
var limitQuerySize = 0 var limitQuerySize = 0
var timeoutPeriod = 180
var fillerUrl string = "localhost"
func parseEnvars() { func parseEnvars() {
for _, e := range os.Environ() { for _, e := range os.Environ() {
@@ -36,6 +43,13 @@ func parseEnvars() {
Log.Printf("WARN: Invalid value for CHUNK_SIZE. Falling back to default value (%v)\n", defaultChunkSize) Log.Printf("WARN: Invalid value for CHUNK_SIZE. Falling back to default value (%v)\n", defaultChunkSize)
chunkSize = defaultChunkSize chunkSize = defaultChunkSize
} }
case "TIMEOUT_PERIOD":
var err error
timeoutPeriod, err = strconv.Atoi(pair[1])
if err != nil {
Log.Printf("WARN: Invalid value for TIMEOUT_PERIOD. Falling back to default value (%v)\n", 180)
timeoutPeriod = 180
}
case "LIMIT_QUERY_SIZE": case "LIMIT_QUERY_SIZE":
var err error var err error
limitQuerySize, err = strconv.Atoi(pair[1]) limitQuerySize, err = strconv.Atoi(pair[1])
@@ -47,6 +61,8 @@ func parseEnvars() {
apiEndpoint = pair[1] apiEndpoint = pair[1]
case "API_KEY": case "API_KEY":
apiKey = pair[1] apiKey = pair[1]
case "FILLER_URL":
fillerUrl = pair[1]
} }
} }
} }
@@ -79,14 +95,34 @@ func main() {
} }
results := make(chan []scraper.ScrapeResult, 200) results := make(chan []scraper.ScrapeResult, 200)
for _, chunk := range stationChunks { go func() {
go scraper.ScheduleScrape(chunk, for _, chunk := range stationChunks {
results, go scraper.ScheduleScrape(chunk,
scraper.ApiConfig{Endpoint: apiEndpoint, results,
Key: apiKey}) scraper.ApiConfig{Endpoint: apiEndpoint,
} Key: apiKey,
Timeout: int64(timeoutPeriod)})
// Fan out scrapes over time so as to avoid upstream rate limits
time.Sleep(time.Millisecond * 100)
}
}()
for r := range results { for r := range results {
fmt.Printf("Received data: %#v\n", r) json, err := json.Marshal(r)
if err != nil {
fmt.Print("Couldn't serialise struct to JSON: ", err)
}
request, err := http.NewRequest("POST", fillerUrl+"/v1/submit", bytes.NewBuffer(json))
request.Header.Set("Content-Type", "application/json; charset=UTF-8")
go (func(req *http.Request) {
res, err := http.DefaultClient.Do(req)
if err != nil {
fmt.Print("Error while sending data to filler: ", err)
} else if res.StatusCode != 200 {
fmt.Print("Non-200 while sending data to filler!")
}
})(request)
} }
} }

View File

@@ -3,49 +3,51 @@ package scraper
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"log"
) )
type Coords struct { type Coords struct {
Lat, Lon float64 Lat float64 `json:"lat"`
Lon float64 `json:"lon"`
} }
type Vehicle struct { type Vehicle struct {
GarageId string GarageId string `json:"garageId"`
Coords Coords Coords Coords `json:"coords"`
AtStationId int AtStationId int `json:"atStationId"`
AtStationName string AtStationName string `json:"atStationName"`
} }
type StationInfo struct { type StationInfo struct {
Id int Id int `json:"id"`
CityId int CityId int `json:"cityId"`
CityName string CityName string `json:"cityName"`
Name string Name string `json:"name"`
Coords Coords Coords Coords `json:"coords"`
} }
type LineInfo struct { type LineInfo struct {
Name string Name string `json:"name"`
Title string Title string `json:"title"`
Stations []StationInfo Direction int `json:"direction"`
Route []Coords Stations []StationInfo `json:"stations"`
Route []Coords `json:"route"`
} }
type ScrapeResult struct { type ScrapeResult struct {
Id int Id int `json:"id"`
Success bool Success bool `json:"success"`
SecondsLeft int SecondsLeft int `json:"secondsLeft"`
LineInfo LineInfo LineInfo LineInfo `json:"lineInfo"`
Vehicles []Vehicle Vehicles []Vehicle `json:"vehicles"`
} }
type Station struct { type Station struct {
Id int Id int `json:"id"`
} }
var Log = log.Default() var Log = log.Default()
@@ -58,6 +60,7 @@ func scrapeRange(s []Station) string {
type ApiConfig struct { type ApiConfig struct {
Endpoint, Key string Endpoint, Key string
Timeout int64
} }
func grabData(stations []Station, c ApiConfig) (map[string][]map[string]interface{}, error) { func grabData(stations []Station, c ApiConfig) (map[string][]map[string]interface{}, error) {
@@ -143,14 +146,36 @@ func Scrape(stations []Station, c ApiConfig) ([]ScrapeResult, error) {
if !ok { if !ok {
lineName = "" lineName = ""
} }
lineInfo.Name = lineName lineInfo.Name = strings.TrimSpace(lineName)
lineTitle, ok := bus["line_title"].(string) lineTitle, ok := bus["line_title"].(string)
if ok { if ok {
lineInfo.Title = lineTitle lineInfo.Title = lineTitle
} }
mainLineTitle, ok := bus["main_line_title"].(string)
if ok {
if mainLineTitle == lineTitle {
lineInfo.Direction = 1
} else {
lineInfo.Direction = 0
}
} else {
lineInfo.Direction = -1
}
var vehicles []Vehicle var vehicles []Vehicle
garageNoStr, ok := bus["garage_no"].(string)
if ok {
vehicles = append(vehicles, Vehicle{
GarageId: garageNoStr,
Coords: Coords{},
AtStationId: -1,
AtStationName: "",
})
}
vehiclesJson, ok := bus["vehicles"].([]interface{}) vehiclesJson, ok := bus["vehicles"].([]interface{})
if !ok { if !ok {
@@ -165,34 +190,38 @@ func Scrape(stations []Station, c ApiConfig) ([]ScrapeResult, error) {
continue continue
} }
stationNumber, err := strconv.Atoi(vehicle["station_number"].(string)) stationNumberString, ok := vehicle["station_number"].(string)
if err != nil { var stationNumber int = -1
Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\t%v", id, err) if ok {
continue stationNumber, err = strconv.Atoi(stationNumberString)
if err != nil {
Log.Printf("WARN: No station number for vehicle stationId=%v\n\t%v", id, err)
continue
}
} }
lat, err := strconv.ParseFloat(vehicle["lat"].(string), 64) lat, err := strconv.ParseFloat(vehicle["lat"].(string), 64)
if err != nil { if err != nil {
Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\t%v", id, err) Log.Printf("WARN: Failed to parse vehicle lat for stationId=%v\n\t%v", id, err)
continue continue
} }
lon, err := strconv.ParseFloat(vehicle["lng"].(string), 64) lon, err := strconv.ParseFloat(vehicle["lng"].(string), 64)
if err != nil { if err != nil {
Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\t%v", id, err) Log.Printf("WARN: Failed to parse vehicle lng for stationId=%v\n\t%v", id, err)
continue continue
} }
garageId, ok := vehicle["garageNo"].(string) garageId, ok := vehicle["garageNo"].(string)
if !ok { if !ok {
Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\tVehicle's garageNo invalid or missing: %v", id, vehicle) Log.Printf("WARN: Failed to parse vehicle garageNo for stationId=%v\n\tVehicle's garageNo invalid or missing: %v", id, vehicle)
continue continue
} }
atStationName, ok := vehicle["station_name"].(string) atStationName, ok := vehicle["station_name"].(string)
if !ok { if !ok {
Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\tVehicle's station_name invalid or missing: %v", id, vehicle) Log.Printf("WARN: Failed to parse station_name for vehicle at stationId=%v\n\tVehicle's station_name invalid or missing: %v", id, vehicle)
continue atStationName = ""
} }
vehicles = append(vehicles, Vehicle{ vehicles = append(vehicles, Vehicle{
@@ -203,7 +232,12 @@ func Scrape(stations []Station, c ApiConfig) ([]ScrapeResult, error) {
}) })
} }
lineInfo.Route = scrapeLineRoute(bus["line_route"].([]interface{})) lineRouteJson, ok := bus["line_route"].([]interface{})
if ok {
lineInfo.Route = scrapeLineRoute(lineRouteJson)
} else {
lineInfo.Route = []Coords{}
}
all_stations, ok := bus["all_stations"].([]interface{}) all_stations, ok := bus["all_stations"].([]interface{})
if !ok { if !ok {
@@ -265,6 +299,14 @@ func Scrape(stations []Station, c ApiConfig) ([]ScrapeResult, error) {
} }
SKIP_AUX_INFO: SKIP_AUX_INFO:
// Sometimes we don't get the vehicles array in the upstream response.
// In that case, we grab the only info about the bus we can get
// (its garageNo from the toplevel garage_no field).
// In case we parse *both* buses, pick the one last added.
if len(vehicles) > 1 {
vehicles = []Vehicle{vehicles[len(vehicles)-1]}
}
results = append(results, ScrapeResult{ results = append(results, ScrapeResult{
Success: true, Success: true,
Id: id, Id: id,
@@ -287,6 +329,7 @@ func ScheduleScrape(chunk []Station, c chan []ScrapeResult, a ApiConfig) {
} }
c <- r c <- r
time.Sleep(time.Minute * 3) duration := time.Duration(a.Timeout) * time.Second
time.Sleep(duration)
ScheduleScrape(chunk, c, a) ScheduleScrape(chunk, c, a)
} }