global { spark-options { "spark.sql.shuffle.partitions" = 2 } } dataObjects { ext-departures { type = WebserviceFileDataObject url = "https://opensky-network.org/api/flights/departure?airport=LSZB&begin=1696854853&end=1697027653" readTimeoutMs=200000 } stg-departures { type = JsonFileDataObject path = "~{id}" } ext-airports { type = WebserviceFileDataObject # uses redirects to the URL below url = "https://ourairports.com/data/airports.csv" # url = "https://davidmegginson.github.io/ourairports-data/airports.csv" followRedirects = true readTimeoutMs=200000 } stg-airports { type = CsvFileDataObject path = "~{id}" } int-airports { type = CsvFileDataObject path = "~{id}" } btl-departures-arrivals-airports { type = CsvFileDataObject path = "~{id}" } btl-distances { type = CsvFileDataObject path = "~{id}" } } actions { download-departures { type = FileTransferAction inputId = ext-departures outputId = stg-departures metadata { feed = download } } download-airports { type = FileTransferAction inputId = ext-airports outputId = stg-airports metadata { feed = download } } select-airport-cols { type = CopyAction inputId = stg-airports outputId = int-airports transformers = [{ type = SQLDfTransformer code = "select ident, name, latitude_deg, longitude_deg from stg_airports" }] metadata { feed = compute } } join-departures-airports { type = CustomDataFrameAction inputIds = [stg-departures, int-airports] outputIds = [btl-departures-arrivals-airports] transformers = [{ type = SQLDfsTransformer code = { btl-connected-airports = """select stg_departures.estdepartureairport, stg_departures.estarrivalairport, airports.* from stg_departures join int_airports airports on stg_departures.estArrivalAirport = airports.ident""" }}, { type = SQLDfsTransformer code = { btl-departures-arrivals-airports = """select btl_connected_airports.estdepartureairport, btl_connected_airports.estarrivalairport, btl_connected_airports.name as arr_name, btl_connected_airports.latitude_deg as arr_latitude_deg, btl_connected_airports.longitude_deg as arr_longitude_deg, airports.name as dep_name, airports.latitude_deg as dep_latitude_deg, airports.longitude_deg as dep_longitude_deg from btl_connected_airports join int_airports airports on btl_connected_airports.estdepartureairport = airports.ident""" } } ] metadata { feed = compute } } compute-distances { type = CopyAction inputId = btl-departures-arrivals-airports outputId = btl-distances transformers = [{ type = ScalaClassSparkDfTransformer className = com.sample.ComputeDistanceTransformer }] metadata { feed = compute } } }