使用Java的Databricks Delta Lake


Delta Lake是Databricks的开源版本,在数据湖的顶部提供了一个事务存储层。在实时系统中,数据湖可以是Amazon S3,Azure数据湖存储/ Azure Blob存储,Google Cloud Storage或Hadoop分布式文件系统。

Delta Lake充当位于Data Lake之上的存储层。Delta Lake带来了Data Lake无法提供的其他功能。

Delta Lak的主要特征如下

ACID事务:在实时数据工程应用程序中,有许多针对不同业务数据域的并发管道,这些并发管道在数据湖上进行操作,以进行并发操作以读取和更新数据。由于Data Lake中缺少事务功能,这将导致数据完整性问题。Delta Lake具有遵循ACID属性的事务处理功能,从而使数据保持一致。

数据版本控制:Delta Lake为所有事务性操作维护多个数据版本,这使开发人员可以在需要时使用特定版本。

模式实施:此功能也称为模式验证。Delta Lake在更新数据之前会验证架构。确保数据类型正确并且存在必填列,以防止无效数据并维护元数据质量检查

模式演变:模式演变是一项功能,使用户可以轻松更改表的当前模式,以适应随时间变化的数据。最常见的是,在执行追加或覆盖操作时使用它来自动调整架构以包括一个或多个新列。数据工程师和科学家可以使用此选项将新列添加到现有的机器学习生产表中,而无需破坏依赖旧列的现有模型

高效的数据格式:数据湖中的所有数据均以Apache Parquet格式存储,从而使delta湖能够利用本机木地板格式的压缩和编码

可伸缩的元数据处理:在大数据应用程序中,元数据也可以是大数据。Delta Lake将元数据像数据一样对待,利用Spark的分布式处理能力来处理其所有元数据

批处理和流处理的统一层: Delta Lake存储可以充当批处理和流处理的源和接收器。这将是对现有Lambda体系结构的改进,该体系结构具有用于批处理和流处理的单独管道。

与Spark框架完全兼容: Delta Lake与高度分散且可扩展的大数据框架Spark完全兼容并轻松集成。

逻辑视图如下所示

logical-view.png

通常,我们使用Scala开发Spark应用程序。由于Java社区中有大量开发人员,并且Java仍然统治着企业界,因此我们也可以使用Java开发Spark应用程序,因为Spark为Java程序提供了完全的兼容性并利用了JVM环境。

我们将看到我们如何使用Spark使用Java中的Delta Lake功能。

SparkSession spark = SparkSession.builder().appName("Spark Excel file conversion")
                .config("spark.master", "local").config("spark.sql.warehouse.dir", "file://./sparkapp").getOrCreate();

        // create Employee POJO and add objects to the list
        List<Employee> empList = new ArrayList<Employee>();
        Employee emp = new Employee();
        emp.setEmpId("1234");
        emp.setEmpName("kiran");
        emp.setDeptName("Design dept");
        empList.add(emp);

        emp = new Employee();
        emp.setEmpId("3567");
        emp.setEmpName("raju");
        emp.setDeptName("IT");
        empList.add(emp);

        // Encoders convert JVM object of type T to and from the internal SQL
        // representation
        Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
        Dataset<Row> empDF = spark.createDataset(empList, employeeEncoder).toDF();

        // the format should be delta to leverage delta lake features while
        // saving the data to table
        empDF.write().format("delta").save("/tmp/delta-table20");

        // the format should be delta to leverage delta lake features while
        // reading the data from the table
        empDF = spark.read().format("delta").load("/tmp/delta-table20");

        // show the data from the dataframe
        empDF.show();

上面的Java程序使用Spark框架,该框架读取员工数据并将数据保存在Delta Lake中。

为了利用三角洲湖泊功能,必须将火花读取格式和写入格式从上述程序中的“镶木地板”更改为“三角洲”。

在此示例中,我们将创建Employee POJO并使用一些测试数据初始化POJO。实时地,这将填充来自Data Lake或外部数据源的文件。

该程序的输出如下

untitled.jpg

为了将其作为Maven项目运行,请在pom.xml中添加以下依赖项

<dependency>
           <groupId>io.delta</groupId>
            <artifactId>delta-core_2.11</artifactId>
            <version>0.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.3</version>
        </dependency>

使用“ append”选项将新数据追加到现有的增量表中。

SparkSession spark = SparkSession.builder().appName("Spark Excel file conversion")
                .config("spark.master", "local").config("spark.sql.warehouse.dir", "file://./sparkapp").getOrCreate();

        // create Employee POJO and add objects to the list
        List<Employee> empList = new ArrayList<Employee>();
        Employee emp = new Employee();
        emp.setEmpId("1234");
        emp.setEmpName("kiran");
        emp.setDeptName("IT");
        empList.add(emp);


        emp = new Employee();
        emp.setEmpId("4862");
        emp.setEmpName("david");
        emp.setDeptName("Engineering");
        empList.add(emp);

        // Encoders convert JVM object of type T to and from the internal SQL
        // representation
        Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
        Dataset<Row> empDF = spark.createDataset(empList, employeeEncoder).toDF();

        // the format should be delta to leverage delta lake features while
        // saving the data to table
        //append the data to the table
        empDF.write().format("delta").mode("append").save("/tmp/delta-table20");

        // the format should be delta to leverage delta lake features while
        // reading the data from the table
        empDF = spark.read().format("delta").load("/tmp/delta-table20");

        // show the data from the dataframe
        empDF.show();

上面程序的输出如下所示:

untitled-1.jpg

新数据将追加到表中。现在有2个版本的数据,其中第一个版本是原始数据,附加的数据是新版本的数据

为了获得与第一个版本有关的数据,下面的程序说明了用法。选项“ versionAsOf”将从Delta Lake表中获取数据的相应版本

SparkSession spark = SparkSession.builder().appName("Spark Excel file conversion")
                .config("spark.master", "local").config("spark.sql.warehouse.dir", "file://./sparkapp").getOrCreate();

        // the format should be delta to leverage delta lake features while
        // reading the data from the table.
        // The property "versionAsOf" loads the corresponding version of data.
        // Version 0 is the first version of data
        Dataset<Row> empDF = spark.read().format("delta").option("versionAsOf", 0).load("/tmp/delta-table20");

        // show the data from the dataframe
        empDF.show();

输出对应于数据的第一个版本:

untitled-3.jpg

我们还可以使用“覆盖”模式使用增量选项覆盖。这将覆盖旧数据并在表中创建新数据,但是仍然保留版本,并且可以使用版本号获取旧数据

empDF.write().format("delta").mode("overwrite").save("/tmp/delta-table20");

以上示例中使用的Employee POJO如下所示

public class Employee {
    private String empName;
    private String deptName;
    private String empId;

    public String getEmpName() {
        return empName;
    }

    public void setEmpName(String empName) {
        this.empName = empName;
    }

    public String getDeptName() {
        return deptName;
    }

    public void setDeptName(String deptName) {
       this.deptName = deptName;
    }

    public String getEmpId() {
        return empId;
    }

    public void setEmpId(String empId) {
        this.empId = empId;
    }

}

现在,我们在Employee POJO中更改字段名称之一,例如将“ deptName”字段更改为“ deptId”字段

新的POJO如下

public class Employee {
    private String empName;
    private String deptId;
    private String empId;
    public String getDeptId() {
        return deptId;
    }

    public void setDeptId(String deptId) {
        this.deptId = deptId;
    }

    public String getEmpName() {
        return empName;
    }

    public void setEmpName(String empName) {
        this.empName = empName;
    }

    public String getEmpId() {
        return empId;
    }

    public void setEmpId(String empId) {
        this.empId = empId;
    }
}

现在,我们尝试使用修改后的架构将数据追加到现有的增量表中

SparkSession spark = SparkSession.builder().appName("Spark Excel file conversion")
                .config("spark.master", "local").config("spark.sql.warehouse.dir", "file://./sparkapp").getOrCreate();

        // create Employee POJO and add objects to the list
        List<Employee> empList = new ArrayList<Employee>();
        Employee emp = new Employee();
        emp.setEmpId("6798");
        emp.setEmpName("kumar");
        emp.setDeptId("IT");
        empList.add(emp);

        // Encoders convert JVM object of type T to and from the internal SQL
        // representation
        Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
        Dataset<Row> empDF = spark.createDataset(empList, employeeEncoder).toDF();

        // the format should be delta to leverage delta lake features while
        // saving the data to table
        //append the data to the table
        empDF.write().format("delta").mode("append").save("/tmp/delta-table20");

        // the format should be delta to leverage delta lake features while
        // reading the data from the table
        empDF = spark.read().format("delta").load("/tmp/delta-table20");

        // show the data from the dataframe
        empDF.show();

当我们尝试运行上述程序时,架构强制功能将阻止将数据写入增量表。Delta Lake会在将新数据附加到现有的delta lake表之前验证架构。如果新数据中的架构存在任何不匹配,则会引发有关架构不匹配的错误

当我们尝试运行上面的程序时,发生以下错误

untitled-4.jpg

现在,我们将在Employee表的现有旧模式中添加一个新字段,即“ sectionName”,并尝试将数据追加到现有的Delta Lake表中。

Delta Lake将利用架构演化功能,该功能可容纳正在动态保存的数据的新架构或经修改的架构,而无需任何明确的DDL操作。这是通过使用“ mergeSchema”选项实现的,同时将数据保存到Delta表中,如以下程序所示

SparkSession spark = SparkSession.builder().appName("Spark Excel file conversion")
                .config("spark.master", "local").config("spark.sql.warehouse.dir", "file://./sparkapp").getOrCreate();

        // create Employee POJO and add objects to the list
        List<Employee> empList = new ArrayList<Employee>();
        Employee emp = new Employee();
        emp.setEmpId("6798");
        emp.setEmpName("kumar");
        emp.setDeptName("IT");
        emp.setSectionName("Big Data");
        empList.add(emp);

        // Encoders convert JVM object of type T to and from the internal SQL
       // representation
        Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
        Dataset<Row> empDF = spark.createDataset(empList, employeeEncoder).toDF();

        // the format should be delta to leverage delta lake features while
        // saving the data to table
        // append the data to the table

        empDF.write().format("delta").mode("append").option("mergeSchema", "true").save("/tmp/delta-table20");

        // the format should be delta to leverage delta lake features while
        // reading the data from the table
        empDF = spark.read().format("delta").load("/tmp/delta-table20");

        // show the data from the dataframe
        empDF.show();

上面程序的输出如下所示:

untitled-5.jpg

根据上面的输出,“ sectionName”字段与Delta Lake表的现有架构合并,并且正在为该新列中的表中的现有记录更新null值。

这样,我们可以在实时应用程序中利用Delta Lake功能,从而为现有Data Lakes提供事务性,历史数据维护,模式管理功能。


原文链接:http://codingdict.com