Compare commits

...

4 Commits

Author SHA1 Message Date
66ad4e1666 feat: make scraping much more robust
Sometimes the upstream API will just drop most of the included data. This commit
assumes less of that data will be included, and makes the most of the data that
is provided.

For example, line IDs from upstream are avoided entirely; line names and titles
are used instead to determine lines and their directions.
2023-10-04 15:56:32 +02:00
738d43adea feat: fan out scrapes over time, support different scrape intervals
This should help avoid rate-limits on upstream, as well as let admins determine
update latency manually.
2023-10-04 15:54:07 +02:00
ef19ed40f2 feat: feed data to upstream over REST instead of printing to stdout 2023-09-29 00:53:18 +02:00
bcfeabc9c1 feat: include line id in LineInfo 2023-09-29 00:52:36 +02:00
3 changed files with 120 additions and 41 deletions

BIN
main/main

Binary file not shown.

View File

@@ -1,12 +1,17 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"git.dvdrw.dev/nsmarter/scraper/scraper"
"log"
"net/http"
"os"
"strconv"
"strings"
"time"
"git.dvdrw.dev/nsmarter/scraper/scraper"
)
var Log = log.Default()
@@ -20,6 +25,8 @@ var chunkSize = 5
var apiEndpoint string = "https://online.nsmart.rs/publicapi/v1/announcement/announcement.php"
var apiKey string
var limitQuerySize = 0
var timeoutPeriod = 180
var fillerUrl string = "localhost"
func parseEnvars() {
for _, e := range os.Environ() {
@@ -36,6 +43,13 @@ func parseEnvars() {
Log.Printf("WARN: Invalid value for CHUNK_SIZE. Falling back to default value (%v)\n", defaultChunkSize)
chunkSize = defaultChunkSize
}
case "TIMEOUT_PERIOD":
var err error
timeoutPeriod, err = strconv.Atoi(pair[1])
if err != nil {
Log.Printf("WARN: Invalid value for TIMEOUT_PERIOD. Falling back to default value (%v)\n", 180)
timeoutPeriod = 180
}
case "LIMIT_QUERY_SIZE":
var err error
limitQuerySize, err = strconv.Atoi(pair[1])
@@ -47,6 +61,8 @@ func parseEnvars() {
apiEndpoint = pair[1]
case "API_KEY":
apiKey = pair[1]
case "FILLER_URL":
fillerUrl = pair[1]
}
}
}
@@ -79,14 +95,34 @@ func main() {
}
results := make(chan []scraper.ScrapeResult, 200)
go func() {
for _, chunk := range stationChunks {
go scraper.ScheduleScrape(chunk,
results,
scraper.ApiConfig{Endpoint: apiEndpoint,
Key: apiKey})
Key: apiKey,
Timeout: int64(timeoutPeriod)})
// Fan out scrapes over time so as to avoid upstream rate limits
time.Sleep(time.Millisecond * 100)
}
}()
for r := range results {
fmt.Printf("Received data: %#v\n", r)
json, err := json.Marshal(r)
if err != nil {
fmt.Print("Couldn't serialise struct to JSON: ", err)
}
request, err := http.NewRequest("POST", fillerUrl+"/v1/submit", bytes.NewBuffer(json))
request.Header.Set("Content-Type", "application/json; charset=UTF-8")
go (func(req *http.Request) {
res, err := http.DefaultClient.Do(req)
if err != nil {
fmt.Print("Error while sending data to filler: ", err)
} else if res.StatusCode != 200 {
fmt.Print("Non-200 while sending data to filler!")
}
})(request)
}
}

View File

@@ -3,49 +3,51 @@ package scraper
import (
"encoding/json"
"fmt"
"log"
"net/http"
"strconv"
"strings"
"time"
"log"
)
type Coords struct {
Lat, Lon float64
Lat float64 `json:"lat"`
Lon float64 `json:"lon"`
}
type Vehicle struct {
GarageId string
Coords Coords
AtStationId int
AtStationName string
GarageId string `json:"garageId"`
Coords Coords `json:"coords"`
AtStationId int `json:"atStationId"`
AtStationName string `json:"atStationName"`
}
type StationInfo struct {
Id int
CityId int
CityName string
Name string
Coords Coords
Id int `json:"id"`
CityId int `json:"cityId"`
CityName string `json:"cityName"`
Name string `json:"name"`
Coords Coords `json:"coords"`
}
type LineInfo struct {
Name string
Title string
Stations []StationInfo
Route []Coords
Name string `json:"name"`
Title string `json:"title"`
Direction int `json:"direction"`
Stations []StationInfo `json:"stations"`
Route []Coords `json:"route"`
}
type ScrapeResult struct {
Id int
Success bool
SecondsLeft int
LineInfo LineInfo
Vehicles []Vehicle
Id int `json:"id"`
Success bool `json:"success"`
SecondsLeft int `json:"secondsLeft"`
LineInfo LineInfo `json:"lineInfo"`
Vehicles []Vehicle `json:"vehicles"`
}
type Station struct {
Id int
Id int `json:"id"`
}
var Log = log.Default()
@@ -58,6 +60,7 @@ func scrapeRange(s []Station) string {
type ApiConfig struct {
Endpoint, Key string
Timeout int64
}
func grabData(stations []Station, c ApiConfig) (map[string][]map[string]interface{}, error) {
@@ -143,14 +146,36 @@ func Scrape(stations []Station, c ApiConfig) ([]ScrapeResult, error) {
if !ok {
lineName = ""
}
lineInfo.Name = lineName
lineInfo.Name = strings.TrimSpace(lineName)
lineTitle, ok := bus["line_title"].(string)
if ok {
lineInfo.Title = lineTitle
}
mainLineTitle, ok := bus["main_line_title"].(string)
if ok {
if mainLineTitle == lineTitle {
lineInfo.Direction = 1
} else {
lineInfo.Direction = 0
}
} else {
lineInfo.Direction = -1
}
var vehicles []Vehicle
garageNoStr, ok := bus["garage_no"].(string)
if ok {
vehicles = append(vehicles, Vehicle{
GarageId: garageNoStr,
Coords: Coords{},
AtStationId: -1,
AtStationName: "",
})
}
vehiclesJson, ok := bus["vehicles"].([]interface{})
if !ok {
@@ -165,34 +190,38 @@ func Scrape(stations []Station, c ApiConfig) ([]ScrapeResult, error) {
continue
}
stationNumber, err := strconv.Atoi(vehicle["station_number"].(string))
stationNumberString, ok := vehicle["station_number"].(string)
var stationNumber int = -1
if ok {
stationNumber, err = strconv.Atoi(stationNumberString)
if err != nil {
Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\t%v", id, err)
Log.Printf("WARN: No station number for vehicle stationId=%v\n\t%v", id, err)
continue
}
}
lat, err := strconv.ParseFloat(vehicle["lat"].(string), 64)
if err != nil {
Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\t%v", id, err)
Log.Printf("WARN: Failed to parse vehicle lat for stationId=%v\n\t%v", id, err)
continue
}
lon, err := strconv.ParseFloat(vehicle["lng"].(string), 64)
if err != nil {
Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\t%v", id, err)
Log.Printf("WARN: Failed to parse vehicle lng for stationId=%v\n\t%v", id, err)
continue
}
garageId, ok := vehicle["garageNo"].(string)
if !ok {
Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\tVehicle's garageNo invalid or missing: %v", id, vehicle)
Log.Printf("WARN: Failed to parse vehicle garageNo for stationId=%v\n\tVehicle's garageNo invalid or missing: %v", id, vehicle)
continue
}
atStationName, ok := vehicle["station_name"].(string)
if !ok {
Log.Printf("WARN: Failed to parse vehicle for stationId=%v\n\tVehicle's station_name invalid or missing: %v", id, vehicle)
continue
Log.Printf("WARN: Failed to parse station_name for vehicle at stationId=%v\n\tVehicle's station_name invalid or missing: %v", id, vehicle)
atStationName = ""
}
vehicles = append(vehicles, Vehicle{
@@ -203,7 +232,12 @@ func Scrape(stations []Station, c ApiConfig) ([]ScrapeResult, error) {
})
}
lineInfo.Route = scrapeLineRoute(bus["line_route"].([]interface{}))
lineRouteJson, ok := bus["line_route"].([]interface{})
if ok {
lineInfo.Route = scrapeLineRoute(lineRouteJson)
} else {
lineInfo.Route = []Coords{}
}
all_stations, ok := bus["all_stations"].([]interface{})
if !ok {
@@ -265,6 +299,14 @@ func Scrape(stations []Station, c ApiConfig) ([]ScrapeResult, error) {
}
SKIP_AUX_INFO:
// Sometimes we don't get the vehicles array in the upstream response.
// In that case, we grab the only info about the bus we can get
// (its garageNo from the toplevel garage_no field).
// In case we parse *both* buses, pick the one last added.
if len(vehicles) > 1 {
vehicles = []Vehicle{vehicles[len(vehicles)-1]}
}
results = append(results, ScrapeResult{
Success: true,
Id: id,
@@ -287,6 +329,7 @@ func ScheduleScrape(chunk []Station, c chan []ScrapeResult, a ApiConfig) {
}
c <- r
time.Sleep(time.Minute * 3)
duration := time.Duration(a.Timeout) * time.Second
time.Sleep(duration)
ScheduleScrape(chunk, c, a)
}