July 30, 2023

tDeltaLakeInput – Docs for ESB 7.x

tDeltaLakeInput

Extracts the latest version or a given snapshot of records from the Delta Lake layer of your Data Lake system and
sends the data to the next component for further processing.

This Delta Lake layer is built on top of your Data Lake system; connect to it as part of your Data
Lake system using the configuration component that corresponds to your Data Lake system, for
example, tAzureFSConfiguration.

tDeltaLakeInput properties for Apache Spark Batch

These properties are used to configure tDeltaLakeInput running in the Spark Batch Job framework.

The Spark Batch
tDeltaLakeInput component belongs to the Technical family.

The component in this framework is available in all subscription-based Talend products with Big Data
and Talend Data Fabric.

Basic settings

Define a storage configuration
component

Select the configuration component to be used to provide the configuration
information for the connection to the target file system such as HDFS.

If you leave this check box clear, the target file system is the local
system.

The configuration component to be used must be present in the same Job.
For example, if you have dropped a tHDFSConfiguration component in the Job, you can select it to write
the result in a given HDFS system.

Property type

Either Built-In or Repository.

 

Built-In: No property data stored centrally.

 

Repository: Select the repository file where the
properties are stored.

The properties are stored centrally under the Hadoop
Cluster
node of the Repository
tree.

The fields that follow are pre-filled using the fetched
data.

For further information about the Hadoop
Cluster
node, see the Getting Started Guide.

Schema and Edit
Schema

A schema is a row description. It defines the number of fields
(columns) to be processed and passed on to the next component. When you create a Spark
Job, avoid the reserved word line when naming the
fields.

Click Edit
schema
to make changes to the schema. If the current schema is of the Repository type, three options are available:

  • View schema: choose this
    option to view the schema only.

  • Change to built-in property:
    choose this option to change the schema to Built-in for local changes.

  • Update repository connection:
    choose this option to change the schema stored in the repository and decide whether
    to propagate the changes to all the Jobs upon completion. If you just want to
    propagate the changes to the current Job, you can select No upon completion and choose this schema metadata
    again in the Repository Content
    window.

Spark automatically infers
data types for the columns in a PARQUET schema. In a Talend Job for Apache Spark, the Date type is inferred
and stored as int96.

 

Built-In: You create and store the schema locally for this component
only.

 

Repository: You have already created the schema and stored it in the
Repository. You can reuse it in various projects and Job designs.
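
For reference, outside the Studio you can check which column types Spark inferred for Parquet-backed data and which encoding it uses for timestamp-like values when writing Parquet. This is a minimal PySpark sketch under assumptions not stated on this page: the path is illustrative, the Delta Lake connector is assumed to be available, and spark.sql.parquet.outputTimestampType is a standard Spark setting rather than an option exposed by the component.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Inspect the types Spark inferred for a Parquet-backed (here, Delta) dataset.
    df = spark.read.format("delta").load("/user/talend/in")  # illustrative path
    df.printSchema()

    # Standard Spark setting that controls how timestamp values are encoded in
    # Parquet when writing; INT96 matches the behaviour described above.
    spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")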

Folder/File

Browse to, or enter the path pointing to the data to be used in the file system.

If the path you set points to a folder, this component will
read all of the files stored in that folder, for example,
/user/talend/in; if sub-folders exist, the sub-folders are automatically
ignored unless you define the property
spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive to be
true in the Advanced properties table in the
Spark configuration tab.

  • Depending on the filesystem to be used, properly configure the corresponding
    configuration component placed in your Job, for example, a
    tHDFSConfiguration component for HDFS, a
    tS3Configuration component for S3 and a
    tAzureFSConfiguration for Azure Storage and Azure Data Lake
    Storage.

If you want to specify more than one file or directory in this
field, separate each path using a comma (,).

The button for browsing does not work with the Spark
Local mode; if you are
using the other Spark Yarn
modes that the Studio supports with your distribution, ensure that you have properly
configured the connection in a configuration component in the same Job, such as
tHDFSConfiguration. Use the
configuration component that corresponds to the filesystem to be used.
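
For reference, the recursive-read property mentioned above corresponds to a standard Hadoop input setting that can also be placed directly in the Spark configuration. A minimal PySpark sketch, assuming the Delta Lake connector is available on the cluster; the application name and folder path are illustrative:

    from pyspark.sql import SparkSession

    # The Advanced properties table in the Spark configuration tab roughly
    # corresponds to adding extra properties to the Spark session, for example:
    spark = (
        SparkSession.builder
        .appName("folder_read_sketch")  # hypothetical application name
        .config(
            "spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive",
            "true",
        )
        .getOrCreate()
    )

    # Reading a folder, as in the Folder/File field.
    df = spark.read.format("delta").load("/user/talend/in")  # illustrative path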

Specify Time Travel timestamp

Select this check box to read a given timestamp-defined snapshot of the
datasets to be used.

The format used by Delta Lake is yyyy-MM-dd HH:mm:ss.

Delta Lake systematically creates slight differences between the upload time of a file and the metadata timestamp of this file. Bear in mind these differences when you need to filter data.

Specify Time Travel version

Select this check box to read a versioned snapshot of the datasets to be
used.
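
Outside the Studio, these two check boxes map onto the standard Delta Lake time-travel read options. A minimal PySpark sketch, assuming the Delta Lake connector is available; the timestamp and path are illustrative:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Snapshot as of a given timestamp (format yyyy-MM-dd HH:mm:ss).
    df_at_time = (
        spark.read.format("delta")
        .option("timestampAsOf", "2023-07-30 00:00:00")  # illustrative timestamp
        .load("/user/talend/in")                          # illustrative path
    )

    # Snapshot identified by its version number.
    df_at_version = (
        spark.read.format("delta")
        .option("versionAsOf", 0)
        .load("/user/talend/in")
    )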

Usage

Usage rule

This component is used as a start component and requires an output link.

Spark Connection

In the Spark
Configuration
tab in the Run
view, define the connection to a given Spark cluster for the whole Job. In
addition, since the Job expects its dependent jar files for execution, you must
specify the directory in the file system to which these jar files are
transferred so that Spark can access these files:

  • Yarn mode (Yarn client or Yarn cluster):

    • When using Google Dataproc, specify a bucket in the
      Google Storage staging bucket
      field in the Spark configuration
      tab.

    • When using HDInsight, specify the blob to be used for Job
      deployment in the Windows Azure Storage
      configuration
      area in the Spark
      configuration
      tab.

    • When using Altus, specify the S3 bucket or the Azure
      Data Lake Storage for Job deployment in the Spark
      configuration
      tab.
    • When using Qubole, add a
      tS3Configuration to your Job to write
      your actual business data in the S3 system with Qubole. Without
      tS3Configuration, this business data is
      written in the Qubole HDFS system and destroyed once you shut
      down your cluster.
    • When using on-premise
      distributions, use the configuration component corresponding
      to the file system your cluster is using. Typically, this
      system is HDFS and so use tHDFSConfiguration.

  • Standalone mode: use the
    configuration component corresponding to the file system your cluster is
    using, such as tHDFSConfiguration or
    tS3Configuration.

    If you are using Databricks without any configuration component present
    in your Job, your business data is written directly in DBFS (Databricks
    Filesystem).

This connection is effective on a per-Job basis.

Computing the day-over-day evolution of US flights using a related Delta Lake dataset (technical preview)

The Job in this scenario uses a sample Delta Lake dataset to calculate the day-over-day Key Performance Indicator (KPI) of US flights.

tDeltaLakeInput_1.png

Prerequisite:

  • The filesystem to be used with Delta Lake must be S3, Azure or HDFS.
  • Ensure that the credentials to be used have the read/write rights and permissions to this filesystem.
  • The sample Delta Lake dataset to be used has been downloaded from Talend Help Center and stored on your filesystem.
    This dataset is used for demonstration purposes only; it contains two snapshots
    of US flights per date, reflecting the evolution of these flights on each date.

    tDeltaLakeInput_2.png

    Download this dataset from the
    Download tab of the online version of this page
    at Talend Help Center.

Although not always required, it is recommended to install a Talend Jobserver on the edge node of
your Hadoop cluster. Then, in the Preferences
dialog box in your Studio, or in Talend Administration Center if it is available to you, define this Jobserver as the remote execution
server of your Jobs.

Linking the components to design the flow of Delta Lake data

Drop and link the components to be used to read and process your Delta Lake data.
  1. In the
    Integration
    perspective of the Studio, create an empty Spark Batch Job from the Job
    Designs
    node in the Repository tree view.
  2. In the workspace, enter the name of the component to be used and select this
    component from the list that appears. In this scenario, the components are
    tS3Configuration (labeled s3_flights), two tDeltaLakeInput components (labeled flights_latest_version and flights_first_version, respectively), two tAggregateRow components (labeled count_per_flights), two tPartition components (labeled repart), one tMap and one tFileOutputDelimited.
  3. Connect these components using the Row > Main link as shown in the image above.
  4. Leave the tS3Configuration component alone
    without any connection.

Reading delta data from the filesystem

Configure tDeltaLakeInput to read the different snapshots of the data about US flights so that your Job can then easily calculate the evolution of the flights.

Each snapshot was given a version number when it was written to the Delta Lake dataset to be used.

  1. Configure the storage configuration component to be used to provide the connection information to your filesystem. In this example, it is a tS3Configuration component.
  2. Double-click the tDeltaLakeInput component labeled flights_latest_version to open its Component view.

    tDeltaLakeInput_3.png
  3. Select the Define a storage configuration component check box to reuse the connection information defined in tS3Configuration.
  4. Click Edit schema to open the schema editor. In this editor, define the schema of the input data.
  5. In the Folder/File field, enter the directory where the flight dataset is stored, in the S3 bucket specified in tS3Configuration.
  6. Do the same to configure the other tDeltaLakeInput component, but select the Specify time travel version check box and enter 0, in double quotation marks, in the Version field that is displayed; in this scenario, this means reading the first version of the data about US flights.

    Without using the time travel feature, tDeltaLakeInput reads the latest snapshot of your data; the time
    travel feature allows you to specify the snapshot to be read.
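
As a point of comparison, the two tDeltaLakeInput components configured above roughly correspond to the following PySpark reads. This is a sketch only; the S3 path is a hypothetical placeholder for the bucket and directory defined in tS3Configuration:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()  # in a Job, the Spark connection is generated for you

    flights_path = "s3a://my-bucket/delta/flights"  # hypothetical location of the dataset

    # flights_latest_version: without time travel, the latest snapshot is read.
    flights_latest = spark.read.format("delta").load(flights_path)

    # flights_first_version: version 0, the first snapshot written to the dataset.
    flights_first = (
        spark.read.format("delta")
        .option("versionAsOf", 0)
        .load(flights_path)
    )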

Calculating the flight performance

  1. Configure each tAggregateRow to calculate the number of flights per date in each version of the Delta Lake dataset.
  2. Configure each tPartition to group each of the calculation results by date.
  3. Double-click tMap to open the editor to join the calculation results from the two versions to visualize the performance.

    tDeltaLakeInput_4.png
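
For reference, the aggregation, partitioning, and join performed by these components roughly amount to the following PySpark sketch, continuing the reads above. The column name date and the evolution calculation are assumptions made for illustration, not taken from the sample dataset:

    from pyspark.sql import functions as F

    # tAggregateRow (count_per_flights): number of flights per date in each version.
    count_latest = flights_latest.groupBy("date").agg(F.count("*").alias("flights_latest"))
    count_first = flights_first.groupBy("date").agg(F.count("*").alias("flights_first"))

    # tPartition (repart): group each result by date, then tMap: join the two
    # versions to put the counts for the same date side by side.
    kpi = (
        count_latest.repartition("date")
        .join(count_first.repartition("date"), on="date", how="inner")
        .withColumn("evolution", F.col("flights_latest") - F.col("flights_first"))
    )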

Selecting the Spark mode

Depending on the Spark cluster to be used, select a Spark mode for your Job.

The Spark documentation provides an exhaustive list of Spark properties and
their default values at Spark Configuration. A Spark Job designed in the Studio uses
this default configuration except for the properties you explicitly defined in the
Spark Configuration tab or the components
used in your Job.

  1. Click Run to open its view and then click the
    Spark Configuration tab to display its view
    for configuring the Spark connection.
  2. Select the Use local mode check box to test your Job locally.

    In the local mode, the Studio builds the Spark environment within itself on the fly in order to
    run the Job. Each processor of the local machine is used as a Spark
    worker to perform the computations (see the sketch after this list).

    In this mode, your local file system is used; therefore, deactivate the
    configuration components such as tS3Configuration or
    tHDFSConfiguration that provide connection
    information to a remote file system, if you have placed these components
    in your Job.

    You can launch
    your Job without any further configuration.

  3. Clear the Use local mode check box to display the
    list of the available Hadoop distributions and from this list, select
    the distribution corresponding to your Spark cluster to be used.

    This distribution could be:

    • Databricks

    • Qubole

    • Amazon EMR

      For this distribution, Talend supports:

      • Yarn client

      • Yarn cluster

    • Cloudera

      For this distribution, Talend supports:

      • Standalone

      • Yarn client

      • Yarn cluster

    • Google Cloud
      Dataproc

      For this distribution, Talend supports:

      • Yarn client

    • Hortonworks

      For this distribution, Talend supports:

      • Yarn client

      • Yarn cluster

    • MapR

      For this distribution, Talend supports:

      • Standalone

      • Yarn client

      • Yarn cluster

    • Microsoft HDInsight

      For this distribution, Talend supports:

      • Yarn cluster

    • Cloudera Altus

      For this distribution, Talend supports:

      • Yarn cluster

        Your Altus cluster should run on the following Cloud
        providers:

        • Azure

          The support for Altus on Azure is a technical
          preview feature.

        • AWS

    As a Job relies on Avro to move data among its components, it is recommended to set your
    cluster to use Kryo to handle the Avro types. This not only helps avoid
    this known Avro issue but also
    brings inherent performance gains. The Spark property to set in your
    cluster is spark.serializer, with the value org.apache.spark.serializer.KryoSerializer.

    If you cannot find the distribution corresponding to yours from this
    drop-down list, this means the distribution you want to connect to is not officially
    supported by Talend. In this situation, you can select Custom, then select the Spark
    version of the cluster to be connected to and click the
    [+] button to display the dialog box in which you can
    alternatively:

    1. Select Import from existing
      version
      to import an officially supported distribution as base
      and then add other required jar files which the base distribution does not
      provide.

    2. Select Import from zip to
      import the configuration zip for the custom distribution to be used. This zip
      file should contain the libraries of the different Hadoop/Spark elements and the
      index file of these libraries.

      In Talend Exchange, members of the Talend
      community have shared ready-for-use configuration zip files,
      which you can download from this Hadoop configuration
      list and use directly in your connection. However, because of
      the ongoing evolution of the different Hadoop-related projects, you might not be
      able to find the configuration zip corresponding to your distribution in this
      list; in that case, it is recommended to use the Import from
      existing version
      option to take an existing distribution as a base
      and add the jars required by your distribution.

      Note that custom versions are not officially supported by Talend.
      Talend and its community provide you with the opportunity to connect to
      custom versions from the Studio but cannot guarantee that the configuration of
      whichever version you choose will be easy. As such, you should only attempt to
      set up such a connection if you have sufficient Hadoop and Spark experience to
      handle any issues on your own.

    For a step-by-step example about how to connect to a custom
    distribution and share this connection, see Hortonworks.
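
For reference (as mentioned in the local mode step above), running in the local mode roughly corresponds to building a Spark session with a local master, where each core of the local machine acts as a Spark worker and the local file system is used. A minimal PySpark sketch; the application name is illustrative:

    from pyspark.sql import SparkSession

    # local[*] uses every available core of the local machine as a Spark worker;
    # the local file system is used instead of a remote one.
    spark = (
        SparkSession.builder
        .master("local[*]")
        .appName("local_mode_sketch")  # hypothetical application name
        .getOrCreate()
    )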

Writing the flight KPI results in your filesystem

  1. Double-click tFileOutputDelimited (labeled kpi_output) to open its Component view.

    tDeltaLakeInput_5.png
  2. Select the Define a storage configuration component check box to reuse the connection information defined in tS3Configuration.
  3. Click Sync columns to ensure that the data schema defined in tMap is properly retrieved by tFileOutputDelimited.
  4. In the Folder field, enter the directory in which you
    want to store the calculated KPI results, in the S3 bucket defined in
    tS3Configuration.
  5. From the Action drop-down list, select
    Create if the folder to be used does not exist yet;
    otherwise, select Overwrite.
  6. Select the Include Header check box to output the column names
    with the calculation results.
  7. Select the Merge result to single file check box to merge the
    part files into one single file and, in the
    Merge File Path field that is displayed, enter the
    directory and the filename in which the merged data is written, in the same S3
    bucket.
  8. If the target file already exists, select Override target file.
  9. In the Run tab, click Run to execute the Job.
Once the execution is successfully done, download this merged file from S3 to consult
the KPI information of the US flights.
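
For reference, steps 4 to 8 roughly correspond to the following PySpark write, continuing the KPI sketch above; the output path is a hypothetical placeholder for the folder defined in the component:

    # Write the KPI rows as delimited text with a header, merged into a single file.
    (
        kpi.coalesce(1)                     # roughly what Merge result to single file achieves
        .write.mode("overwrite")            # Action: Overwrite / Override target file
        .option("header", "true")           # Include Header
        .csv("s3a://my-bucket/kpi_output")  # hypothetical output folder
    )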

tDeltaLakeInput_6.png

Document retrieved from Talend Help Center: https://help.talend.com