# Configuration for Smart Data Lake Builder.
# It reads the chess table from SQL Server in two ways: a plain JDBC read and
# an Airbyte CDC stream. Each read is historized into its own Delta Lake table.

global {
  spark-options {
    # JDO connection settings for the metastore database: a Derby network
    # server on host "metastore", port 1527.
    "spark.hadoop.javax.jdo.option.ConnectionURL" = "jdbc:derby://metastore:1527/db;create=true"
    "spark.hadoop.javax.jdo.option.ConnectionDriverName" = "org.apache.derby.jdbc.ClientDriver"
    "spark.hadoop.javax.jdo.option.ConnectionUserName" = "sa"
    # NOTE(review): plain-text password, presumably a local/test value -- confirm before reuse.
    "spark.hadoop.javax.jdo.option.ConnectionPassword" = "1234"
    # Small partition counts; presumably sized for a small local dataset.
    "spark.sql.shuffle.partitions" = 2
    "spark.databricks.delta.snapshotPartitions" = 2
  }
}

connections {
  # SQL Server on host "mssqlserver", port 1433.
  # encrypt=true is combined with trustServerCertificate=true, so the server
  # certificate is not validated (see the SQL Server JDBC driver docs).
  localSql {
    type = JdbcTableConnection
    url = "jdbc:sqlserver://mssqlserver:1433;encrypt=true;trustServerCertificate=true"
    driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    authMode {
      type = BasicAuthMode
      # The "CLEAR#" prefix presumably means the rest of the value is used
      # literally -- confirm against the SDLB secret-provider docs.
      # NOTE(review): plain-text credentials; consider a secret provider outside local testing.
      userVariable = "CLEAR#sa"
      passwordVariable = "CLEAR#%abcd1234%"
    }
  }
}

dataObjects {
  # Source: table dbo.chess in database "foobar", read through connection localSql.
  ext-chess {
    type = JdbcTableDataObject
    connectionId = localSql
    table {
      name = "dbo.chess"
      db = "foobar"
    }
  }

  # Delta Lake target for the JDBC load, keyed by "id".
  # "~{id}" is an SDLB placeholder, presumably resolved to this data object's id.
  int-chess {
    type = DeltaLakeTableDataObject
    path = "~{id}"
    table {
      db = "default"
      name = "int_chess"
      primaryKey = ["id"]
    }
  }

  # Source: Airbyte MSSQL connector in CDC replication mode.
  # It is launched through a Docker run script and reads stream "chess_cdc".
  ext-chess-cdc {
    type = AirbyteDataObject
    config {
      host = "mssqlserver"
      port = 1433
      database = "foobar"
      username = "sa"
      # NOTE(review): repeats the plain-text password used by connections.localSql.
      password = "%abcd1234%"
      replication_method = "CDC"
    }
    streamName = "chess_cdc"
    cmd {
      type = DockerRunScript
      name = "airbyte_source-mssql"
      image = "airbyte-mssql"
      linuxDockerCmd = "bash /mnt/config/start_buildah.sh"
    }
  }

  # Delta Lake target for the CDC load, keyed by "id".
  int-chess-cdc {
    type = DeltaLakeTableDataObject
    path = "~{id}"
    table {
      db = "default"
      name = "int_chess_cdc"
      primaryKey = ["id"]
    }
  }
}

actions {
  # Historize ext-chess into int-chess with merge mode enabled.
  # Rows are first de-duplicated on column "id".
  histData {
    type = HistorizeAction
    mergeModeEnable = true
    inputId = "ext-chess"
    outputId = "int-chess"
    transformers = [{
      type = ScalaCodeSparkDfTransformer
      # NOTE(review): `import session.implicits._` below appears unused by dropDuplicates.
      code = """
        import org.apache.spark.sql.{DataFrame, SparkSession}
        (session:SparkSession, options:Map[String, String], df:DataFrame, dataObjectId:String) => {
          import session.implicits._
          df.dropDuplicates(Seq("id"))
        }
      """
    }]
    metadata {
      feed = "download"
    }
  }

  # Historize the Airbyte CDC stream into int-chess-cdc.
  # It uses merge mode and state-based incremental execution.
  # A row is treated as deleted when the derived column cdc_deleted is true.
  histDataAirbyte {
    type = HistorizeAction
    mergeModeEnable = true
    executionMode {
      type = DataObjectStateIncrementalMode
    }
    inputId = "ext-chess-cdc"
    outputId = "int-chess-cdc"
    mergeModeCDCColumn = "cdc_deleted"
    mergeModeCDCDeletedValue = true
    transformers = [{
      type = AdditionalColumnsTransformer
      # cdc_deleted is derived from the Airbyte column _ab_cdc_deleted_at:
      # it is true whenever that column is non-null.
      additionalDerivedColumns {
        cdc_deleted = "_ab_cdc_deleted_at is not null"
      }
    }]
    metadata {
      feed = "download"
    }
  }
}