global {
  spark-options {
    "spark.hadoop.javax.jdo.option.ConnectionURL" = "jdbc:derby://metastore:1527/db;create=true"
    "spark.hadoop.javax.jdo.option.ConnectionDriverName" = "org.apache.derby.jdbc.ClientDriver"
    "spark.hadoop.javax.jdo.option.ConnectionUserName" = "sa"
    "spark.hadoop.javax.jdo.option.ConnectionPassword" = "1234"
    "spark.sql.shuffle.partitions" = 2
    "spark.databricks.delta.snapshotPartitions" = 2
  }
}

dataObjects {

  ext-departures {
    type = CustomWebserviceDataObject
    schema = """array<
        struct<
          icao24: string,
          firstSeen: integer,
          estDepartureAirport: string,
          lastSeen: integer,
          estArrivalAirport: string,
          callsign: string,
          estDepartureAirportHorizDistance: integer,
          estDepartureAirportVertDistance: integer,
          estArrivalAirportHorizDistance: integer,
          estArrivalAirportVertDistance: integer,
          departureAirportCandidatesCount: integer,
          arrivalAirportCandidatesCount: integer
        >
      >"""
    baseUrl = "https://opensky-network.org/api/flights/departure"
    nRetry = 5
    queryParameters = [{
      airport = "LSZB"
    },{
      airport = "EDDF"
    }]
    timeouts {
      connectionTimeoutMs = 200000
      readTimeoutMs = 200000
    }
  }

  int-departures {
    type = DeltaLakeTableDataObject
    path = "~{id}"
    table {
      db = "default"
      name = "int_departures"
      primaryKey = [icao24, estdepartureairport, dt]
    }
  }

  ext-airports {
    type = WebserviceFileDataObject
    # uses redirects to the URL below
    url = "https://ourairports.com/data/airports.csv"
    # url = "https://davidmegginson.github.io/ourairports-data/airports.csv"
    followRedirects = true
    readTimeoutMs = 200000
  }

  stg-airports {
    type = CsvFileDataObject
    path = "~{id}"
  }

  int-airports {
    type = DeltaLakeTableDataObject
    path = "~{id}"
    table {
      db = "default"
      name = "int_airports"
      primaryKey = [ident]
    }
  }

  btl-departures-arrivals-airports {
    type = DeltaLakeTableDataObject
    path = "~{id}"
    table {
      db = "default"
      name = "btl_departures_arrivals_airports"
    }
  }

  btl-distances {
    type = DeltaLakeTableDataObject
    path = "~{id}"
    table {
      db = "default"
      name = "btl_distances"
    }
  }
}

actions {

  download-airports {
    type = FileTransferAction
    inputId = ext-airports
    outputId = stg-airports
    metadata {
      feed = download
    }
  }

  download-deduplicate-departures {
    type = DeduplicateAction
    inputId = ext-departures
    outputId = int-departures
    transformers = [{
      type = SQLDfTransformer
      code = "select ext_departures.*, date_format(from_unixtime(firstseen),'yyyyMMdd') dt from ext_departures"
    },{
      type = ScalaCodeSparkDfTransformer
      code = """
        import org.apache.spark.sql.{DataFrame, SparkSession}
        def transform(session: SparkSession, options: Map[String,String], df: DataFrame, dataObjectId: String): DataFrame = {
          import session.implicits._
          df.dropDuplicates("icao24", "estdepartureairport", "dt")
        }
        // return as function
        transform _
      """
    }]
    metadata {
      feed = compute
    }
  }

  historize-airports {
    type = HistorizeAction
    inputId = stg-airports
    outputId = int-airports
    transformers = [{
      type = SQLDfTransformer
      code = "select ident, name, latitude_deg, longitude_deg from stg_airports"
    }]
    metadata {
      feed = compute
    }
  }

  join-departures-airports {
    type = CustomDataFrameAction
    inputIds = [int-departures, int-airports]
    outputIds = [btl-departures-arrivals-airports]
    transformers = [{
      type = SQLDfsTransformer
      code = {
        btl-connected-airports = """
          select int_departures.estdepartureairport, int_departures.estarrivalairport, airports.*
          from int_departures
          join int_airports airports on int_departures.estArrivalAirport = airports.ident
        """
      }
    },{
      type = SQLDfsTransformer
      code = {
        btl-departures-arrivals-airports = """
          select btl_connected_airports.estdepartureairport, btl_connected_airports.estarrivalairport,
                 btl_connected_airports.name as arr_name, btl_connected_airports.latitude_deg as arr_latitude_deg, btl_connected_airports.longitude_deg as arr_longitude_deg,
                 airports.name as dep_name, airports.latitude_deg as dep_latitude_deg, airports.longitude_deg as dep_longitude_deg
          from btl_connected_airports
          join int_airports airports on btl_connected_airports.estdepartureairport = airports.ident
        """
      }
    }]
    metadata {
      feed = compute
    }
  }

  compute-distances {
    type = CopyAction
    inputId = btl-departures-arrivals-airports
    outputId = btl-distances
    transformers = [{
      type = ScalaClassSparkDfTransformer
      className = com.sample.ComputeDistanceTransformer
    }]
    metadata {
      feed = compute
    }
  }
}
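
# Note: the compute-distances action above references the Scala class
# com.sample.ComputeDistanceTransformer, which must be compiled and available on the job's
# classpath. A minimal sketch of such a class is shown below (commented out so this file stays
# valid HOCON), assuming SDLB's CustomDfTransformer trait (its import path differs between SDLB
# versions) and the dep_*/arr_* coordinate columns produced by join-departures-airports; the
# haversine formula, column name "distance", and method names are illustrative assumptions,
# not prescribed by this config. The transform signature matches the one used by the
# ScalaCodeSparkDfTransformer in download-deduplicate-departures.
#
#   package com.sample
#
#   import io.smartdatalake.workflow.action.sparktransformer.CustomDfTransformer
#   import org.apache.spark.sql.functions.{col, udf}
#   import org.apache.spark.sql.{DataFrame, SparkSession}
#
#   class ComputeDistanceTransformer extends CustomDfTransformer {
#
#     // haversine great-circle distance between two coordinate pairs, in kilometers
#     def distanceKm(depLat: Double, depLng: Double, arrLat: Double, arrLng: Double): Double = {
#       val earthRadiusKm = 6371
#       val latDist = math.toRadians(arrLat - depLat)
#       val lngDist = math.toRadians(arrLng - depLng)
#       val a = math.pow(math.sin(latDist / 2), 2) +
#         math.cos(math.toRadians(depLat)) * math.cos(math.toRadians(arrLat)) * math.pow(math.sin(lngDist / 2), 2)
#       earthRadiusKm * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
#     }
#
#     // called by ScalaClassSparkDfTransformer with the DataFrame of btl-departures-arrivals-airports
#     def transform(session: SparkSession, options: Map[String, String], df: DataFrame, dataObjectId: String): DataFrame = {
#       val distanceUdf = udf(distanceKm _)
#       df.withColumn("distance",
#         distanceUdf(col("dep_latitude_deg"), col("dep_longitude_deg"), col("arr_latitude_deg"), col("arr_longitude_deg")))
#     }
#   }
#
# The feed metadata on each action (download vs. compute) allows the two stages to be selected
# and run separately via SDLB's feed selection.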