
tKuduOutput

Creates, updates or deletes data in a Cloudera Kudu table.

tKuduOutput properties for Apache Spark Batch

These properties are used to configure tKuduOutput running in the Spark Batch Job framework.

The Spark Batch
tKuduOutput component belongs to the Databases family.

The component in this framework is available in all subscription-based Talend products with Big Data
and Talend Data Fabric.

Basic settings

Use an existing configuration

Select this check box and, in the Component List, click the relevant connection component to reuse the connection details you have already defined.

Server connection

Click the [+] button to add as many rows as there are Kudu masters to be used, one row per master.

Then enter the locations and the listening ports of the master nodes of the Kudu service to be used.

This component supports only the Apache Kudu service installed on Cloudera.

For compatibility information between Apache Kudu and Cloudera, see the related Cloudera documentation: Compatibility Matrix for Apache Kudu.
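
The master list you define here corresponds to the comma-separated master addresses expected by the Apache Kudu Java client. As a point of reference outside the Studio, the following minimal sketch shows such a multi-master connection; the host names and ports are placeholders, and this is not the code generated by the component.

    import org.apache.kudu.client.KuduClient;
    import org.apache.kudu.client.KuduException;

    public class KuduConnectionSketch {
        public static void main(String[] args) throws KuduException {
            // Hypothetical master addresses; use the same hosts and listening ports
            // you list in the Server connection table (the default Kudu port is 7051).
            String masters = "kudu-master-1.example.com:7051,"
                    + "kudu-master-2.example.com:7051,"
                    + "kudu-master-3.example.com:7051";

            KuduClient client = new KuduClient.KuduClientBuilder(masters).build();
            try {
                // A simple call that exercises the connection to the masters.
                System.out.println("Tables: " + client.getTablesList().getTablesList());
            } finally {
                client.close();
            }
        }
    }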

Schema and Edit schema

A schema is a row description. It defines the number of fields
(columns) to be processed and passed on to the next component. When you create a Spark
Job, avoid the reserved word line when naming the
fields.

  • Built-In: You create and store the schema locally for this component
    only.

  • Repository: You have already created the schema and stored it in the
    Repository. You can reuse it in various projects and Job designs.

 

Click Edit schema to make changes to the schema. If the current schema is of the Repository type, three options are available:

  • View schema: choose this
    option to view the schema only.

  • Change to built-in property:
    choose this option to change the schema to Built-in for local changes.

  • Update repository connection:
    choose this option to change the schema stored in the repository and decide whether
    to propagate the changes to all the Jobs upon completion. If you just want to
    propagate the changes to the current Job, you can select No upon completion and choose this schema metadata
    again in the Repository Content
    window.

Kudu table

Enter the name of the table to be created, changed or removed.

Action on table

Select an operation to be performed on the table defined.

  • None: No operation is carried out.

  • Drop and create table: The table is removed
    and created again.

  • Create table: The table does not exist and
    gets created.

  • Create table if does not exist: The table is
    created if it does not exist.

  • Drop table if exists and create: The table is removed if it already exists and created again.
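
As a rough illustration of the Drop table if exists and create behavior, and not of the component's internal implementation, the sequence below uses the Apache Kudu Java client to check for the table, drop it when present and recreate it. The master address, table name and two-column schema are invented for the sketch.

    import java.util.Arrays;

    import org.apache.kudu.ColumnSchema;
    import org.apache.kudu.Schema;
    import org.apache.kudu.Type;
    import org.apache.kudu.client.CreateTableOptions;
    import org.apache.kudu.client.KuduClient;
    import org.apache.kudu.client.KuduException;

    public class DropAndCreateSketch {
        public static void main(String[] args) throws KuduException {
            KuduClient client = new KuduClient.KuduClientBuilder("kudu-master.example.com:7051").build();
            String tableName = "persons";  // hypothetical table name

            // Minimal schema: an integer primary key plus one value column.
            Schema schema = new Schema(Arrays.asList(
                    new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).key(true).build(),
                    new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build()));

            // "Drop table if exists and create": remove the table when present, then recreate it.
            if (client.tableExists(tableName)) {
                client.deleteTable(tableName);
            }
            client.createTable(tableName, schema,
                    new CreateTableOptions().setRangePartitionColumns(Arrays.asList("age")));

            client.close();
        }
    }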

Action on data

Select an action to be performed on data of the table defined.

  • Insert: Add new entries to the table. If duplicates are found, the Job stops.

  • Update: Make changes to existing
    entries.

  • Upsert: Update the record with the given reference. If the record does not exist, a new record is inserted.

  • Delete: Remove entries corresponding to
    the input flow.
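
To make these actions concrete, here is a hedged sketch of an Upsert expressed directly with the Apache Kudu Java client; the master address, table name and column values are invented, and the other actions map to newInsert(), newUpdate() and newDelete() on the same table object.

    import org.apache.kudu.client.KuduClient;
    import org.apache.kudu.client.KuduException;
    import org.apache.kudu.client.KuduSession;
    import org.apache.kudu.client.KuduTable;
    import org.apache.kudu.client.PartialRow;
    import org.apache.kudu.client.Upsert;

    public class ActionOnDataSketch {
        public static void main(String[] args) throws KuduException {
            KuduClient client = new KuduClient.KuduClientBuilder("kudu-master.example.com:7051").build();
            KuduTable table = client.openTable("persons");  // hypothetical table
            KuduSession session = client.newSession();

            // Upsert: insert the row if its key is new, update it otherwise.
            Upsert upsert = table.newUpsert();
            PartialRow row = upsert.getRow();
            row.addInt("age", 30);          // primary key column
            row.addString("name", "ychen");
            session.apply(upsert);

            // Insert, Update and Delete correspond to table.newInsert(),
            // table.newUpdate() and table.newDelete(), applied the same way.
            session.flush();
            session.close();
            client.close();
        }
    }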

Replicas

Enter, without double quotation marks, the replication factor of this table
to create copies of your table and its tablets.

For further information about Kudu tablets and Kudu replication policies,
see Distribution and Fault Tolerance.

Hash partitions

When you are creating a Kudu table, it is recommended to define how this
table is partitioned. By default, your table is not partitioned.

  1. Click the [+] button to add the columns based on
    which rows are hash-partitioned, for example, add a
    host column and a
    metric column.

    These columns must exist in
    your data and in the schema you have defined in
    tKuduOutput.

  2. In Number of buckets, enter, without double quotation marks, the number of buckets to be used to store the partitions. These buckets are created on the fly.

At runtime, rows are distributed by hash value in one of those buckets. If you leave this
Hash partitions table empty, hash partitioning is
not applied during the creation of the table.

For further information about hash partitioning in Kudu, see Hash partitioning.
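
Taking the host and metric example above together with the Replicas setting, the sketch below shows how hash bucketing and a replication factor are expressed with the Apache Kudu Java client. The master address, table name, columns and bucket count are assumptions made for illustration, not the component's generated code.

    import java.util.Arrays;

    import org.apache.kudu.ColumnSchema;
    import org.apache.kudu.Schema;
    import org.apache.kudu.Type;
    import org.apache.kudu.client.CreateTableOptions;
    import org.apache.kudu.client.KuduClient;
    import org.apache.kudu.client.KuduException;

    public class HashPartitionSketch {
        public static void main(String[] args) throws KuduException {
            KuduClient client = new KuduClient.KuduClientBuilder("kudu-master.example.com:7051").build();

            // Hypothetical metrics table keyed on (host, metric).
            Schema schema = new Schema(Arrays.asList(
                    new ColumnSchema.ColumnSchemaBuilder("host", Type.STRING).key(true).build(),
                    new ColumnSchema.ColumnSchemaBuilder("metric", Type.STRING).key(true).build(),
                    new ColumnSchema.ColumnSchemaBuilder("value", Type.DOUBLE).nullable(true).build()));

            CreateTableOptions options = new CreateTableOptions()
                    .addHashPartitions(Arrays.asList("host", "metric"), 4)  // 4 buckets, as in Number of buckets
                    .setNumReplicas(3);                                     // matches the Replicas field

            client.createTable("metrics", schema, options);
            client.close();
        }
    }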

Range partitions

When you are creating a Kudu table, it is recommended to define how this
table is partitioned. By default, your table is not partitioned.

  1. Click the [+] button to add the primary key
    columns based on which rows are partitioned to contiguous segments,
    for example, add a time column.

    These
    columns must exist in your data and in the schema you have defined
    in tKuduOutput.

  2. In N. Partition, enter, without double
    quotation marks, a number to be used as the ID number of the
    partition to be created. For example, enter 1
    to create Partition 1.
  3. In Lower boundary and Upper
    boundary
    , enter the boundaries between which rows
    are partitioned. The upper boundary is excluded and only the data
    between the boundaries (including the lower boundary) is written to
    the Kudu table.

    For example, if you are partitioning your data based on a time column of Date type and the format of your time data is yyyy-MM-dd, you can set the lower boundary to 2016-01-01 by entering, without double quotation marks, TalendDate.parseDate("yyyy-MM-dd", "2016-01-01"), and do the same to set the upper boundary to 2018-01-01. Then only rows with dates between the two boundaries enter this partition.

At runtime, rows are distributed to the range partitions using the values of the columns you have added to this Range partitions table. If you leave this table empty, range partitioning is not applied during the creation of the table.

For further information about range partitioning in Kudu, see Range partitioning.
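
For comparison, the sketch below expresses one range partition with the Apache Kudu Java client, using an integer age column like the one in the scenario later on this page: the lower boundary (20) is included and the upper boundary (60) is excluded, matching the behavior described above. The master address, table and columns are assumptions for illustration.

    import java.util.Arrays;

    import org.apache.kudu.ColumnSchema;
    import org.apache.kudu.Schema;
    import org.apache.kudu.Type;
    import org.apache.kudu.client.CreateTableOptions;
    import org.apache.kudu.client.KuduClient;
    import org.apache.kudu.client.KuduException;
    import org.apache.kudu.client.PartialRow;

    public class RangePartitionSketch {
        public static void main(String[] args) throws KuduException {
            KuduClient client = new KuduClient.KuduClientBuilder("kudu-master.example.com:7051").build();

            // Hypothetical table keyed on an integer column.
            Schema schema = new Schema(Arrays.asList(
                    new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).key(true).build(),
                    new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build()));

            CreateTableOptions options = new CreateTableOptions()
                    .setRangePartitionColumns(Arrays.asList("age"));

            // One range partition: the lower boundary (20) is included, the upper boundary (60) is excluded.
            PartialRow lower = schema.newPartialRow();
            lower.addInt("age", 20);
            PartialRow upper = schema.newPartialRow();
            upper.addInt("age", 60);
            options.addRangePartition(lower, upper);

            client.createTable("persons", schema, options);
            client.close();
        }
    }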

Die on error

Select the check box to stop the execution of the Job when an error
occurs.

Usage

Usage rule

This component is used as an end component and requires an input link.

Spark Connection

In the Spark
Configuration
tab in the Run
view, define the connection to a given Spark cluster for the whole Job. In
addition, since the Job expects its dependent jar files for execution, you must
specify the directory in the file system to which these jar files are
transferred so that Spark can access these files:

  • Yarn mode (Yarn client or Yarn cluster):

    • When using Google Dataproc, specify a bucket in the
      Google Storage staging bucket
      field in the Spark configuration
      tab.

    • When using HDInsight, specify the blob to be used for Job
      deployment in the Windows Azure Storage
      configuration
      area in the Spark
      configuration
      tab.

    • When using Altus, specify the S3 bucket or the Azure
      Data Lake Storage for Job deployment in the Spark
      configuration
      tab.
    • When using Qubole, add a
      tS3Configuration to your Job to write
      your actual business data in the S3 system with Qubole. Without
      tS3Configuration, this business data is
      written in the Qubole HDFS system and destroyed once you shut
      down your cluster.
    • When using on-premise
      distributions, use the configuration component corresponding
      to the file system your cluster is using. Typically, this
      system is HDFS and so use tHDFSConfiguration.

  • Standalone mode: use the
    configuration component corresponding to the file system your cluster is
    using, such as tHDFSConfiguration or
    tS3Configuration.

    If you are using Databricks without any configuration component present
    in your Job, your business data is written directly in DBFS (Databricks
    Filesystem).

This connection is effective on a per-Job basis.

Writing and reading data from Cloudera Kudu using a Spark Batch Job

In this scenario, you create a Spark Batch Job using the Kudu components to partition and write data in a Kudu table and then read some of the data from Kudu.

This scenario applies only to subscription-based Talend products with Big
Data
.

tKuduOutput_1.png
The sample data contains the names of some persons, the ID numbers assigned to these persons, and their ages.

The ages are purposely made distinct from one another because this column is the primary key column and is used for range partitioning in this scenario.

Note that the sample data is created for demonstration purposes only.

Prerequisites:

  • Ensure that the Spark cluster and the
    Cloudera Kudu database to be used have been properly installed and are running.

  • Ensure that the client machine on which the Talend Jobs are
    executed can recognize the host names of the nodes of the Hadoop cluster to be
    used. For this purpose, add the IP address/hostname mapping entries for the
    services of that Hadoop cluster in the hosts
    file of the client machine.

    For example, if the host name of the Hadoop NameNode server is talend-cdh550.weave.local and its IP address is 192.168.x.x, the mapping entry reads 192.168.x.x talend-cdh550.weave.local.

  • If the cluster to be used is secured with Kerberos, ensure
    that you have properly installed and configured Kerberos on the machine on
    which your Talend Job is
    executed. Depending on the Kerberos mode being used, the Kerberos kinit
    ticket or keytab must be available on that machine.

    For further information, search for
    how to use Kerberos in the Studio with Big Data on Talend Help Center (https://help.talend.com).

Recommendation:

  • Define the Hadoop connection metadata from the Hadoop cluster node in Repository. This way, you can not only reuse
    this connection in different Jobs but also ensure that the connection to
    your Hadoop cluster has been properly set up and is working well when you
    use this connection in your Jobs.

    For further information about
    setting up a reusable Hadoop connection, search for centralizing a Hadoop
    connection on Talend Help Center
    (https://help.talend.com).

Designing the data flow of the Kudu Job

  1. In the
    Integration
    perspective of the Studio, create an empty
    Spark Batch Job from the Job Designs node in
    the Repository tree view.

    For further information about how to create a Spark Batch Job, see Talend Open Studio for Big Data Getting Started Guide.
  2. In the workspace, enter the name of the component to be used and select this
    component from the list that appears. In this scenario, the components are
    tHDFSConfiguration, tKuduConfiguration, tFixedFlowInput, tKuduOutput,
    tKuduInput and tLogRow.

    The tFixedFlowInput component is used to load the
    sample data into the data flow. In the real-world practice, you can use other
    components such as tFileInputDelimited, alone or even
    with a tMap, in the place of
    tFixedFlowInput to design a sophisticated process to
    prepare your data to be processed.
  3. Connect tFixedFlowInput to tKuduOutput using the Row > Main link.
  4. Connect tKuduInput to tLogRow using the Row > Main link.
  5. Connect tFixedFlowInput to tKuduInput using the Trigger > OnSubjobOk link.
  6. Leave tHDFSConfiguration and tKuduConfiguration alone without any
    connection.

Defining the Cloudera connection parameters

Complete the Cloudera connection configuration in the Spark
configuration
tab of the Run view of your Job.
This configuration is effective on a per-Job basis.

If you cannot find the Cloudera version to be used in the version drop-down list of the Spark configuration tab, you can add your distribution via the dynamic distribution settings in the Studio.

  1. Select the type of the Spark cluster you need to connect to.

    Standalone

    The Studio connects to a Spark-enabled cluster to run the Job from this
    cluster.

    If you are using the Standalone mode, you need to
    set the following parameters:

    • In the Spark host field, enter the URI
      of the Spark Master of the Hadoop cluster to be used.

    • In the Spark home field, enter the
      location of the Spark executable installed in the Hadoop cluster to be used.

    • If the Spark cluster cannot recognize the machine on which the Job is launched, select the Define the driver hostname or IP address check box and enter the host name or the IP address of this machine. This allows the Spark master and its workers to recognize this machine to find the Job and thus its driver.

      Note that in this situation, you also need to add the name and the IP address of this machine to its hosts file.

    Yarn client

    The Studio runs the Spark driver to orchestrate how the Job should be performed and then sends the orchestration to the Yarn service of a given Hadoop cluster so that the Resource Manager of this Yarn service requests execution resources accordingly.

    If you are using the Yarn client
    mode, you need to set the following parameters in their corresponding
    fields (if you leave the check box of a service clear, then at runtime,
    the configuration about this parameter in the Hadoop cluster to be used
    will be ignored):

    • In the Resource manager field, enter the address of the ResourceManager service of the Hadoop cluster to be used.

    • Select the Set resourcemanager
      scheduler address
      check box and enter the Scheduler address in
      the field that appears.

    • Select the Set jobhistory
      address
      check box and enter the location of the JobHistory
      server of the Hadoop cluster to be used. This allows the metrics information of
      the current Job to be stored in that JobHistory server.

    • Select the Set staging
      directory
      check box and enter this directory defined in your
      Hadoop cluster for temporary files created by running programs. Typically, this
      directory can be found under the yarn.app.mapreduce.am.staging-dir property in the configuration files
      such as yarn-site.xml or mapred-site.xml of your distribution.

    • If you are accessing the Hadoop cluster running with Kerberos security, select this check box and then enter the Kerberos principal names for the ResourceManager service and the JobHistory service in the displayed fields. This enables you to use your user name to authenticate against the credentials stored in Kerberos. These principals can be found in the configuration files of your distribution, such as in yarn-site.xml and in mapred-site.xml.

      If you need to use a Kerberos keytab file to log in, select Use a keytab to authenticate. A keytab file contains
      pairs of Kerberos principals and encrypted keys. You need to enter the principal to
      be used in the Principal field and the access
      path to the keytab file itself in the Keytab
      field. This keytab file must be stored in the machine in which your Job actually
      runs, for example, on a Talend
      Jobserver.

      Note that the user that executes a keytab-enabled Job is not necessarily
      the one a principal designates but must have the right to read the keytab file being
      used. For example, the user name you are using to execute a Job is user1 and the principal to be used is guest; in this
      situation, ensure that user1 has the right to read the keytab
      file to be used.

    • The User name field is available when you are not using
      Kerberos to authenticate. In the User name field, enter the
      login user name for your distribution. If you leave it empty, the user name of the machine
      hosting the Studio will be used.

    • If the Spark cluster cannot recognize the machine on which the Job is launched, select the Define the driver hostname or IP address check box and enter the host name or the IP address of this machine. This allows the Spark master and its workers to recognize this machine to find the Job and thus its driver.

      Note that in this situation, you also need to add the name and the IP address of this machine to its hosts file.

    Yarn cluster

    The Spark driver runs in your Yarn cluster to orchestrate how the Job
    should be performed.

    If you are using the Yarn cluster mode, you need
    to define the following parameters in their corresponding fields (if you
    leave the check box of a service clear, then at runtime, the
    configuration about this parameter in the Hadoop cluster to be used will
    be ignored):

    • In the Resource manager field, enter the address of the ResourceManager service of the Hadoop cluster to be used.

    • Select the Set resourcemanager
      scheduler address
      check box and enter the Scheduler address in
      the field that appears.

    • Select the Set jobhistory
      address
      check box and enter the location of the JobHistory
      server of the Hadoop cluster to be used. This allows the metrics information of
      the current Job to be stored in that JobHistory server.

    • Select the Set staging
      directory
      check box and enter this directory defined in your
      Hadoop cluster for temporary files created by running programs. Typically, this
      directory can be found under the yarn.app.mapreduce.am.staging-dir property in the configuration files
      such as yarn-site.xml or mapred-site.xml of your distribution.

    • Set path to custom Hadoop configuration JAR: if you are using connections defined in Repository to connect to your Cloudera or Hortonworks cluster, you can select this check box in the Repository wizard and, in the field that is displayed, specify the path to the JAR file that provides the connection parameters of your Hadoop environment. Note that this file must be accessible from the machine where your Job is launched.

      This kind of Hadoop configuration JAR file is automatically generated when you build a Big Data Job from the Studio and is named following a default pattern. You can also download this JAR file from the web console of your cluster, or simply create a JAR file yourself by putting the configuration files at the root of the JAR file.

      The parameters from your custom JAR file override the parameters
      you put in the Spark configuration field.
      They also override the configuration you set in the
      configuration components such as
      tHDFSConfiguration or
      tHBaseConfiguration when the related
      storage systems such as HDFS, HBase or Hive are native to Hadoop.
      But they do not override the configuration set in the
      configuration components for the third-party storage system such
      as tAzureFSConfiguration.

    • If you are accessing the Hadoop cluster running with Kerberos security, select this check box and then enter the Kerberos principal names for the ResourceManager service and the JobHistory service in the displayed fields. This enables you to use your user name to authenticate against the credentials stored in Kerberos. These principals can be found in the configuration files of your distribution, such as in yarn-site.xml and in mapred-site.xml.

      If you need to use a Kerberos keytab file to log in, select Use a keytab to authenticate. A keytab file contains
      pairs of Kerberos principals and encrypted keys. You need to enter the principal to
      be used in the Principal field and the access
      path to the keytab file itself in the Keytab
      field. This keytab file must be stored in the machine in which your Job actually
      runs, for example, on a Talend
      Jobserver.

      Note that the user that executes a keytab-enabled Job is not necessarily
      the one a principal designates but must have the right to read the keytab file being
      used. For example, the user name you are using to execute a Job is user1 and the principal to be used is guest; in this
      situation, ensure that user1 has the right to read the keytab
      file to be used.

    • The User name field is available when you are not using
      Kerberos to authenticate. In the User name field, enter the
      login user name for your distribution. If you leave it empty, the user name of the machine
      hosting the Studio will be used.

    • Select the Wait for the Job to complete check box to make your Studio or,
      if you use Talend
      Jobserver, your Job JVM keep monitoring the Job until the execution of the Job
      is over. By selecting this check box, you actually set the spark.yarn.submit.waitAppCompletion property to be true. While
      it is generally useful to select this check box when running a Spark Batch Job,
      it makes more sense to keep this check box clear when running a Spark Streaming
      Job.

    Ensure that the user name in the Yarn
    client
    mode is the same one you put in
    tHDFSConfiguration, the component used to provide HDFS
    connection information to Spark.


  2. With the Yarn client mode, the
    Property type list is displayed to allow you
    to select an established Hadoop connection from the Repository, on the condition that you have created this connection
    in the Repository. Then the Studio will reuse
    that set of connection information for this Job.


  3. If you need to launch your Job from Windows, it is recommended to specify where the winutils.exe program to be used is stored.

    • If you know where to find your winutils.exe file and you want to use it, select the Define the Hadoop home directory check box
      and enter the directory where your winutils.exe is
      stored.

    • Otherwise, leave this check box clear; the Studio generates one by itself and automatically uses it for this Job.


  4. In the Spark “scratch” directory
    field, enter the directory in which the Studio stores in the local system the
    temporary files such as the jar files to be transferred. If you launch the Job
    on Windows, the default disk is C:. So if you leave /tmp in this field, this directory is C:/tmp.

  • After the connection is configured, you can tune the Spark performance, although not required, by following the performance tuning documentation for Spark Batch Jobs and for Spark Streaming Jobs on Talend Help Center (https://help.talend.com).

  • It is recommended to activate the Spark logging and checkpointing system in the Spark configuration tab of the Run view of your Spark Job, in order to help debug and resume your Spark Job when issues arise.

  • If you are using Cloudera V5.5+ to run your MapReduce or Apache Spark Batch Jobs, you can make use of Cloudera Navigator to trace the lineage of a given data flow to discover how this data flow was generated by a Job.

Configuring the connection to the file system to be used by Spark

Skip this section if you are using Google Dataproc or HDInsight, as for these two
distributions, this connection is configured in the Spark
configuration
tab.

  1. Double-click tHDFSConfiguration to open its Component view.

    Spark uses this component to connect to the HDFS system to which the jar
    files dependent on the Job are transferred.

  2. If you have defined the HDFS connection metadata under the Hadoop
    cluster
    node in Repository, select
    Repository from the Property
    type
    drop-down list and then click the
    […] button to select the HDFS connection you have
    defined from the Repository content wizard.

    For further information about setting up a reusable
    HDFS connection, search for centralizing HDFS metadata on Talend Help Center
    (https://help.talend.com).

    If you complete this step, you can skip the following steps about configuring
    tHDFSConfiguration because all the required fields
    should have been filled automatically.

  3. In the Version area, select
    the Hadoop distribution you need to connect to and its version.
  4. In the NameNode URI field,
    enter the location of the machine hosting the NameNode service of the cluster.
    If you are using WebHDFS, the location should be
    webhdfs://masternode:portnumber; WebHDFS with SSL is not
    supported yet.
  5. In the Username field, enter the authentication information used to connect to the HDFS system to be used. Note that the user name must be the same as the one you put in the Spark configuration tab.

Configuring the connection to the Kudu database to be used by Spark

  1. Double-click tKuduConfiguration to open its
    Component view.

    tKuduOutput_2.png

  2. Click the [+] button to add one row to the Server connection table.
  3. Enter, within double quotation marks, the node name of your Kudu master and its listening port.

    If your Kudu cluster has multiple masters, add them all to this table, each in
    one row.

Partitioning the sample data and writing it to Kudu

  1. Double-click the tFixedFlowInput component to open its Component view.

    tKuduOutput_3.png

  2. Click the […] button next to Edit schema to open the schema editor.
  3. Click the [+] button to add the schema
    columns as shown in this image.

    tKuduOutput_4.png

  4. In the Type column, select
    Integer for the age column.
  5. In the Key column, select the check box for the age column to define this column as the primary key column.
  6. Click OK to validate these changes and accept
    the propagation prompted by the pop-up dialog box.
  7. In the Mode area, select the Use Inline
    Content
    radio button and paste the previously mentioned sample data
    into the Content field that is displayed.
  8. In the Field separator field, enter a
    semicolon (;).
  9. Double-click the tKuduOutput component to
    open its Component view.

    tKuduOutput_5.png
  10. Select the Use an existing configuration check box and
    then select the Kudu configuration you configured in the previous steps from the
    Component list drop-down list.
  11. Click Sync columns to ensure that
    tKuduOutput has the same schema as
    tFixedFlowInput.
  12. In the Table field, enter the name of the table you want
    to create in Kudu.
  13. From the Action on table drop-down list, select
    Drop table if exists and create.
  14. In Range partitions, add one row by clicking the
    [+] button and do the following:

    1. In Partition No, enter, without double quotation
      marks, the number to be used as the ID of the partition to be created.
      For example, enter 1 to create Partition 1.
    2. In Partition column, select the primary key
      column to be used for partitioning. In this scenario, select
      age.
    3. In Lower boundary, enter
      20 without double quotation marks because the
      type of the age data is integer.
    4. In Upper boundary, do the same to set it to
      60.
    This partitioning definition creates a single range partition on the age column, covering values from 20 (included) to 60 (excluded). According to this partition schema, the record falling on the lower boundary, the age 20, is included in this partition and thus is written to Kudu, but the record falling on the upper boundary, the age 60, is excluded and is not written to Kudu.

    In the real-world practice, if you need to write all the data to the Kudu table, define more partitions with proper boundaries to receive the data.

  15. From the Action on data drop-down list, select
    Insert.

Scanning data from Kudu

  1. Double-click tKuduInput to open its
    Component view.

    tKuduOutput_6.png

  2. Click the […] button next to Edit
    schema
    to open the schema editor.
  3. Click the [+] button to add the schema columns for
    output as shown in this image.

    tKuduOutput_7.png

  4. In the Type column, select Integer as the data type for the age column.
  5. In the Key column, select the check box for the
    age column because this is the primary key
    column.
  6. Click OK to validate these changes and accept the
    propagation prompted by the pop-up dialog box.
  7. In the Table name field, enter the name of the table
    from which you need to read data. In this scenario, it is
    ychen_kudu.
  8. In the Query mode area, select the Use
    scan
    radio button to read all the data from the Kudu
    table.
  9. Double-click tLogRow to open its Component view and select the Table radio button to present the result in a table.
  10. Press F6 to run this Job.

Once done, in the console of the Run view, you
can check the data read from the Kudu table.

tKuduOutput_8.png

The record 04;tom;60 is not written to the table because it falls outside the partition boundaries.
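
For reference, a full scan equivalent to the Use scan option looks as follows with the Apache Kudu Java client, outside the Studio. The master address is a placeholder, and the column names (id, name, age) and their types are assumptions based on sample records such as 04;tom;60.

    import java.util.Arrays;

    import org.apache.kudu.client.KuduClient;
    import org.apache.kudu.client.KuduException;
    import org.apache.kudu.client.KuduScanner;
    import org.apache.kudu.client.KuduTable;
    import org.apache.kudu.client.RowResult;
    import org.apache.kudu.client.RowResultIterator;

    public class KuduScanSketch {
        public static void main(String[] args) throws KuduException {
            KuduClient client = new KuduClient.KuduClientBuilder("kudu-master.example.com:7051").build();
            KuduTable table = client.openTable("ychen_kudu");  // table name from the scenario

            // A full scan projecting the three columns assumed from the sample data.
            KuduScanner scanner = client.newScannerBuilder(table)
                    .setProjectedColumnNames(Arrays.asList("id", "name", "age"))
                    .build();

            while (scanner.hasMoreRows()) {
                RowResultIterator rows = scanner.nextRows();
                for (RowResult row : rows) {
                    System.out.println(row.getString("id") + ";" + row.getString("name") + ";" + row.getInt("age"));
                }
            }
            scanner.close();
            client.close();
        }
    }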

In the real-world practice, upon the
success of the execution, you could deploy and launch your Job on a Talend JobServer if you have one.

For related
information, search for running a Job remotely on Talend Help Center (https://help.talend.com).

