
tKafkaInput – Docs for ESB 5.x

tKafkaInput

tkafkainput_icon32_white.png

Warning

This component is available in the Palette of
Talend Studio only if you have subscribed to one of
the Talend
solutions with Big Data.

tKafkaInput properties

Component family

Storm/Input

 

Function

The tKafkaInput component transmits messages you need
to process to the components that follow in the topology you are
designing.

Purpose

tKafkaInput is a generic message
broker that transmits messages to the Storm process that runs
transformations over these messages.

Basic settings

Schema and Edit
schema

A schema is a row description. It defines the number of fields to be processed and passed on
to the next component. The schema is either Built-In or
stored remotely in the Repository.

Note that the schema of this component is read-only. It stores the
messages sent from the message producer.

 

Zookeeper host

Enter the address of the Zookeeper service of the Kafka system to be used.

 

Port

Enter the number of the client listening port of the Zookeeper
service to be used.

 

Topic name

Enter the name of the topic from which tKafkaInput
receives the feed of messages.

Usage

In a Talend Storm Job, it is used as a start component. The other
components used along with it must be Storm components, too. They generate native Storm code
that can be executed directly in a Storm system.

For further information about a Talend Storm Job, see the sections
describing how to create and configure a Talend Storm Job of the Talend Big Data Getting Started Guide.

Note that in this documentation, unless otherwise explicitly stated, a scenario presents
only Standard Jobs, that is to say traditional Talend data
integration Jobs.

Storm Connection

You need to use the Storm Configuration tab in the
Run view to define the connection to a given Storm
system for the whole Job.

This connection is effective on a per-Job basis.

Scenario: analyzing people’s activities using a Storm topology

In this scenario, a four-component Storm Job (a topology) is created to receive
messages about the activities of some given people and to analyze the
popularity of those activities.

use_case-tkafkainput1.png

This Job subscribes to the related topic created by the Kafka topic producer, which means
you need to install the Kafka cluster in your messaging system to maintain the feeds of
messages. For further information about the Kafka messaging service, see Apache’s
documentation about Kafka.

On the other hand, since this Job runs on top of Storm, you need to ensure that your Storm
system is ready for use. For further information about Storm, see Apache’s documentation
about Storm.

Note that when you use the Storm system installed in the Hortonworks Data Platform 2.1
(HDP 2.1), ensure that the Storm DRPC (distributed remote procedure call) servers' names
have been properly defined in the Custom storm.yaml section of the Config tab of Storm
in Ambari's web console. For example, if you need to use two Storm DRPC servers,
Server1 and Server2, you must define them in the Custom storm.yaml section as
follows: [Server1,Server2].
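
In Storm's own configuration, the DRPC servers are listed under the
drpc.servers property. Assuming the two hypothetical servers above, the
storm.yaml entry would read:

    drpc.servers: [Server1,Server2]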

To replicate this scenario, proceed as follows.

Producing the sample messages

In real-world practice, the system that produces messages to Kafka is completely
decoupled from the system that processes them. In this scenario, however, Kafka
itself is used to produce the sample messages. You need to perform the following
operations to produce these messages:

  1. Create the Kafka topic to be used to categorize the messages. The
    following command is used for demonstration purposes only. If you need
    further information about the creation of a Kafka topic, see Apache’s
    documentation for
    Kafka.
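
    For example, on a Kafka 0.8 installation with a single broker, a command of
    the following form can be used (run from the Kafka installation directory;
    adjust the partition and replication settings to your environment):

    bin/kafka-topics.sh --create --zookeeper localhost:2181 \
      --replication-factor 1 --partitions 1 --topic activities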

    This command creates a topic named activities, using
    the Kafka brokers managed by the Zookeeper service on the localhost machine.

  2. Publish the messages you want to analyze to the activities topic you have just created. In this scenario,
    Kafka is used to perform this publication using, for example, the following
    command:
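
    Assuming the messages use the semicolon-separated
    firstname;gender;activity format expected later in this scenario and that a
    Kafka broker listens on localhost:9092 (both of these are assumptions), the
    console producer can publish ten illustrative messages like this:

    printf '%s\n' "Ryan;M;Drink" "Pierre;M;Drink" "Thomas;M;Drink" \
      "Nicole;F;Drink" "Remy;M;Eat" "Irene;F;Eat" "Alice;F;Read" \
      "Bruno;M;Read" "Clara;F;Sleep" "David;M;Work" |
      bin/kafka-console-producer.sh --broker-list localhost:9092 --topic activities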

    This command publishes 10 simple messages.

    As explained previously, you can use your actual message producer system instead to
    perform the publication.

Linking the components

  1. In the Integration perspective
    of the Studio, create an empty Storm Job from the Job
    Designs
    node in the Repository tree view.

    For further information about how to create a Storm Job, see Talend Big Data Getting Started Guide.

  2. In the workspace, enter the name of the component to be used and select
    this component from the list that appears. In this scenario, the components
    are tKafkaInput, tJavaStorm, tAggregateRow,
    and tLogRow.

  3. Connect them using Row > Main links.

Configuring the connection

  1. Click Run to open its view and then click
    Storm Configuration to set up the
    connection to the Storm system to be used.

    use_case-tkafkainput2.png
  2. In the Storm node roles area, indicate
    the locations of the Nimbus server and the DRPC endpoint of the Storm
    cluster to be used:

    • Local mode: select this check
      box to build the Storm environment within the Studio. In this
      situation, the Storm Job you are designing is run locally within
      the Studio and you do not need to define any specific Nimbus and
      DRPC endpoint addresses.

    • Nimbus host and Port: in these two fields, enter the
      location of the Storm Nimbus server and its port number,
      respectively.

    • DRPC endpoint and Port: in these two fields, enter the
      location of the Storm DRPC endpoint and its port number,
      respectively.
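
    For example, with a hypothetical cluster whose master host is
    storm-master.example.com, and using Storm's default ports (6627 for the
    Nimbus Thrift service, 3772 for DRPC), the fields could be filled in as:

    Nimbus host: "storm-master.example.com"    Port: 6627
    DRPC endpoint: "storm-master.example.com"  Port: 3772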

  3. In the Topology name field, enter the
    name you want the Storm system to use for the Storm Job, or topology in
    Storm terms, that you are designing. It is recommended to use the same
    name as the Storm Job itself so that you can easily recognize this
    Job even within the Storm system.

  4. Select the Kill existing topology check box to make the
    Storm system stop any topology that has the same name as the Job you are
    designing and want to run.

    If you clear this check box and a topology of the same name is already running in the Storm
    system, the current Job will fail when you submit it to Storm to run.

  5. Select the Submit topology check box to submit the
    current Job to the Storm system. This feature is used when you need to send
    a new topology to Storm, or together with the Kill
    existing topology feature when you need to update a running
    topology in the Storm system.

    If you clear this check box, when you run the current Job, the submission
    of this Job is ignored while the other configuration information is still
    taken into account, for example, killing an existing topology.

  6. Select the Monitor topology after
    submission check box to monitor the current Storm Job in the
    console of the Run view.

    If you clear this check box, you cannot read the monitoring information in
    the console.

  7. In the Stop monitoring after timeout
    reached field, enter, without double quotation marks, a
    numeric value defining the timeout after which the monitoring of a running
    topology stops. The default value is -1, which means no timeout is applied.

  8. Select the Kill topology on quitting Talend Job check
    box to let the Studio kill the topology of the current Job in the Storm
    system, if it is still running, when you stop this Job from the Studio.

    If you clear this check box, the topology for this Job continues to run in the Storm
    system even after you kill the Job within the Studio or shut the
    Studio down.

  9. If you need to use any other Storm properties specific to your situation,
    add them to the Storm configuration
    table.
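
    For example, to make the topology use two worker processes, you could add
    Storm's standard topology.workers property to this table (the value here is
    illustrative):

    Property: "topology.workers"    Value: "2"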

Receiving the message from the Kafka channel

  1. Double-click tKafkaInput to open its
    Component view.

    use_case-tkafkainput3.png
  2. In the Zookeeper host field, enter the location of the
    Zookeeper service used to coordinate the Kafka cluster to be used.

  3. In the Port field, enter the port number of this
    Zookeeper service.

  4. In the Topic name field, enter the name of the topic in
    which you need to receive messages. In this scenario, the topic is activities.
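
    For example, with the single-machine setup used earlier to create the
    topic, and given that 2181 is ZooKeeper's default client port, the fields
    could read:

    Zookeeper host: "localhost"
    Port: 2181
    Topic name: "activities"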

Extracting information from the message

  1. Double-click tJavaStorm to open its
    Component view.

    use_case-tkafkainput4.png
  2. Click the […] button next to Edit schema to open the schema editor.

  3. On the output side (right), click the [+] button three
    times to add three rows and in the Column
    column, rename them to firstname,
    gender and activity, respectively. These columns correspond to the
    information you can extract from the sample message.

    use_case-tkafkainput5.png
  4. Click OK to validate these changes and
    accept the propagation prompted by the pop-up dialog box.

  5. In the Bolt code area, enter the main
    method of the bolt to be executed. In this scenario, the code is as
    follows:
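
    The exact code depends on the format of your messages. A minimal sketch,
    assuming each incoming message is a semicolon-separated
    firstname;gender;activity string and that the code area exposes the usual
    Storm input tuple and collector variables (an assumption about
    tJavaStorm), could be:

    // Split the raw message (for example "Ryan;M;Drink") into its
    // three fields and emit them as firstname, gender and activity.
    String[] fields = input.getString(0).split(";");
    collector.emit(new backtype.storm.tuple.Values(
            fields[0],   // firstname
            fields[1],   // gender
            fields[2])); // activity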

Aggregating the extracted information

  1. Double-click tAggregateRow to open its
    Component view. This component allows
    you to find out the most popular activity recorded in the received
    messages.

    use_case-tkafkainput6.png
  2. Click the […] button next to Edit schema to open the schema editor.

  3. On the output side (right), click the [+]
    button three times to add three rows and in the Column column, rename them to activity, gender and
    popularity, respectively.

    use_case-tkafkainput7.png
  4. In the Type column of the popularity row of the output side, select
    Double.

  5. Click OK to validate these changes and
    accept the propagation prompted by the pop-up dialog box.

  6. In the Group by table, add two rows by
    clicking the [+] button twice and configure
    these two rows as follows to group the outputted data:

    • Output column: select the
      columns from the output schema to be used as the conditions to
      group the outputted data. In this example, they are activity and gender.

    • Input column position: select
      the columns from the input schema to send data to the output
      columns you have selected in the Output
      column
      column. In this scenario, they are
      activity and gender.

  7. In the Operations table, add one row by
    clicking the [+] button once and configure
    this row as follows to calculate the popularity of each activity:

    • Output column: select the
      column from the output schema to carry the calculated results.
      In this scenario, it is popularity.

    • Function: select the function
      to be used to process the incoming data. In this scenario,
      select count. It counts the
      frequency of each activity in the received messages.

    • Input column position: select
      the column from the input schema to provide the data to be
      processed. In this scenario, it is activity.
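
As a plain-Java illustration of what this configuration computes (independent
of Talend; the class name and sample rows are hypothetical), the component
groups the rows by the (activity, gender) pair and counts the rows in each
group:

    import java.util.*;
    import java.util.stream.*;

    public class PopularityDemo {
        public static void main(String[] args) {
            // Sample rows in firstname;gender;activity form (illustrative only).
            List<String> rows = Arrays.asList(
                    "Ryan;M;Drink", "Pierre;M;Drink", "Nicole;F;Drink");

            // Group by (activity, gender) and count, as tAggregateRow does above.
            Map<String, Long> popularity = rows.stream()
                    .map(r -> r.split(";"))
                    .collect(Collectors.groupingBy(
                            f -> f[2] + "|" + f[1],   // key: activity|gender
                            Collectors.counting()));  // popularity = count(activity)

            popularity.forEach((k, v) -> System.out.println(k + " -> " + v));
        }
    }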

Executing the Job

Then you can run this Job.

The tLogRow component is used to present the
execution result of the Job.

Press F6 to run this Job.

Once done, the Run view is opened automatically,
where you can check the execution result.

use_case-tkafkainput8.png

You can read that the activity Drink is the most
popular with 3 occurrences for the gender M
(Male) and 1 occurrence for the gender F (Female)
in the messages.

The Storm topology continues to run, waiting for messages to appear on the Kafka
message broker until you kill the Job. In this scenario, because the Kill topology on quitting Talend Job check box is
selected, the Storm topology will be stopped and removed from the cluster when this
Job is stopped.


Source: Talend documentation, https://help.talend.com