Warning
This component is available in the Palette of Talend Studio only if you have subscribed to one of the Talend solutions with Big Data.
Component family
Storm/Input

Function
The tKafkaInput component transmits the messages you need to process to the components that follow it in the Job.

Purpose
tKafkaInput is a generic message broker that transmits messages to the Job that processes them.

Basic settings

Schema and Edit schema
A schema is a row description. It defines the number of fields to be processed and passed on. Note that the schema of this component is read-only. It stores the messages sent from the message producer.

Zookeeper host
Enter the address of the Zookeeper service of the Kafka system to be used.

Port
Enter the number of the client listening port of the Zookeeper service.

Topic name
Enter the name of the topic from which tKafkaInput receives the feed of messages.

Usage
In a Talend Storm Job, tKafkaInput is used as a start component; the other components used along with it must be Storm components, too. For further information about a Talend Storm Job, see the Talend Big Data Getting Started Guide. Note that in this documentation, unless otherwise explicitly stated, a scenario presents only Standard Jobs, that is to say traditional Talend data integration Jobs.

Storm Connection
You need to use the Storm Configuration tab in the Run view to define the connection to a given Storm system. This connection is effective on a per-Job basis.
|
In this scenario, a four-component Storm Job (a topology) is created to transmit
messages about the activities of some given people to the topology you are designing,
in order to analyze the popularity of those activities.

This Job subscribes to the related topic created by the Kafka topic producer, which means
you need to install the Kafka cluster in your messaging system to maintain the feeds of
messages. For further information about the Kafka messaging service, see Apache's
documentation about Kafka.
On the other hand, since this Job runs on top of Storm, you need to ensure that your Storm
system is ready for use. For further information about Storm, see Apache’s documentation
about Storm.
Note that when you use the Storm system installed in the Hortonworks Data Platform 2.1
(HDP 2.1), ensure that the Storm DRPC (distributed remote procedure call) servers' names
have been properly defined in the Custom storm.yaml section of the Config tab of Storm
in Ambari's web console. For example, if you need to use two Storm DRPC servers,
Server1 and Server2, you must define them in the Custom storm.yaml section as
follows: [Server1,Server2].
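Assuming the setting behind that section is Storm's standard drpc.servers property (verify this against your HDP version), the Custom storm.yaml entry might look like the following sketch; the server names are the hypothetical ones from the example above:

```yaml
# Hypothetical Custom storm.yaml entry (Ambari > Storm > Configs).
# drpc.servers is Storm's standard property listing the DRPC server hosts.
drpc.servers:
  - "Server1"
  - "Server2"
```

The inline list form [Server1,Server2] shown above is equivalent YAML for the same list.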
To replicate this scenario, proceed as follows.
In real-world practice, the system that produces messages to Kafka is completely
decoupled from the system that consumes them. In this scenario, however, Kafka itself
is used to produce the sample messages. You need to perform the following operations
to produce these messages:
-
Create the Kafka topic to be used to categorize the messages. The
following command is used for demonstration purposes only. If you need
further information about the creation of a Kafka topic, see Apache's
documentation for Kafka.

/usr/lib/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic activities --partitions 1 --replication-factor 1

This command creates a topic named activities, using the Kafka brokers
managed by the Zookeeper service on the localhost machine.
-
Publish the messages you want to analyze to the activities topic you have just created.
In this scenario, Kafka is used to perform this publication using, for example, the
following command:

echo 'Ryan|M|Pool
Remy|M|Drink
Remy|M|Hiking
Irene|F|Drink
Pierre|M|Movie
Irene|F|Pool
Thomas|M|Drink
Ryan|M|Cooking
Wang|F|Cooking
Chen|M|Drink' | /usr/lib/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic activities

This command publishes 10 simple messages:

Ryan|M|Pool
Remy|M|Drink
Remy|M|Hiking
Irene|F|Drink
Pierre|M|Movie
Irene|F|Pool
Thomas|M|Drink
Ryan|M|Cooking
Wang|F|Cooking
Chen|M|Drink

As explained previously, you can use your actual message producer system instead to
perform the publication.
-
In the Integration perspective
of the Studio, create an empty Storm Job from the Job
Designs node in the Repository tree view.For further information about how to create a Storm Job, see Talend Big Data Getting Started Guide.
-
In the workspace, enter the name of the component to be used and select
this component from the list that appears. In this scenario, the components
are tKafkaInput, tJavaStorm, tAggregateRow,
and tLogRow. -
Connect them using Row > Main links.
-
Click Run to open its view and then click
Storm configuration to set up the
connection to the Storm system to be used.
-
In the Storm node roles area, indicate
the locations of the Nimbus server and the DRPC endpoint of the Storm
cluster to be used:-
Local mode: select this check
box to build the Storm environment within the Studio. In this
situation, the Storm Job you are designing is run locally within
the Studio and you do not need to define any specific Nimbus and
DRPC endpoint addresses. -
Nimbus host and Port: in these two fields, enter the
location of the Storm Nimbus server and its port number,
respectively. -
DRPC endpoint and Port: in these two fields, enter the
location of the Storm DRPC endpoint and its port number,
respectively.
-
-
In the Topology name field, enter the
name you want the Storm system to use for the Storm Job, or topology in
Storm terms, that you are designing. It is recommended to use the same
name as this Storm Job so that you can easily recognize the Job even
within the Storm system. -
Select the Kill existing topology check box to make the
Storm system stop any topology that has the same name as the Job you are
designing and want to run.
If you clear this check box and a topology of the same name is already running in the
Storm system, the current Job will fail when you submit it to Storm to run. -
Select the Submit topology check box to submit the
current Job to the Storm system. This feature is used when you need to send
a new topology to Storm, or together with the Kill
existing topology feature when you need to update a running
topology in the Storm system.
If you clear this check box, when you run the current Job, the submission
of this Job is ignored while the other configuration information is still
taken into account, for example, killing an existing topology. -
Select the Monitor topology after
submission check box to monitor the current Storm Job in the
console of the Run view.
If you clear this check box, you cannot read the monitoring information in
the console. -
In the Stop monitoring after timeout
reached field, enter, without double quotation marks, the
numeric value that determines when to stop monitoring a running
topology. The default value is -1, which means no timeout is applied. -
Select the Kill topology on quitting Talend Job check
box to allow the Studio to kill the current Job's topology in the Storm
system, if it is still running, when you stop this Job from the Studio.
If you clear this check box, the topology for this Job continues to run in the Storm
system even after you kill the Job within the Studio or shut down the
Studio. -
If you need to use any other Storm properties specific to your situation,
add them to the Storm configuration
table.
-
Double-click tKafkaInput to open its
Component view.
-
In the Zookeeper host field, enter the location of the
Zookeeper service used to coordinate the Kafka cluster to be used. -
In the Port field, enter the port number of this
Zookeeper service. -
In the Topic name field, enter the name of the topic in
which you need to receive messages. In this scenario, the topic is activities.
-
Double-click tJavaStorm to open its
Component view.
-
Click the […] button next to Edit schema to open the schema editor.
-
On the output side (right), click the [+] button three
times to add three rows and in the Column
column, rename them to firstname,
gender and activity, respectively. These columns correspond to the
information you can extract from the sample message.
-
Click OK to validate these changes and
accept the propagation prompted by the pop-up dialog box. -
In the Bolt code area, enter the main
method of the bolt to be executed. In this scenario, the code is as
follows:

String[] tokens = input.get_str().split("\\|");
collector.emit(new Values(tokens[0], tokens[1], tokens[2]));
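Note that String.split takes a regular expression and the pipe character is a regex metacharacter, so it must be escaped as \\| in the Java code. A minimal plain-Java check of this split logic, outside Storm, might look like this (the class name is illustrative):

```java
public class SplitCheck {
    public static void main(String[] args) {
        // Sample message in the firstname|gender|activity format used in this scenario.
        String message = "Ryan|M|Pool";
        // "\\|" escapes the pipe so split() treats it literally,
        // not as regex alternation.
        String[] tokens = message.split("\\|");
        System.out.println(tokens[0] + ", " + tokens[1] + ", " + tokens[2]);
        // prints: Ryan, M, Pool
    }
}
```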
-
Double-click tAggregateRow to open its
Component view. This component allows
you to find out the most popular activity recorded in the received
messages.
-
Click the […] button next to Edit schema to open the schema editor.
-
On the output side (right), click the [+]
button three times to add three rows and in the Column column, rename them to activity, gender and
popularity, respectively.
-
In the Type column of the popularity row of the output side, select
Double. -
Click OK to validate these changes and
accept the propagation prompted by the pop-up dialog box. -
In the Group by table, add two rows by
clicking the [+] button twice and configure
these two rows as follows to group the outputted data:-
Output column: select the
columns from the output schema to be used as the conditions to
group the outputted data. In this example, they are activity and gender. -
Input column position: select
the columns from the input schema to send data to the output
columns you have selected in the Output
column column. In this scenario, they are
activity and gender.
-
-
In the Operations table, add one row by
clicking the [+] button once and configure
this row as follows to calculate the popularity of each activity:-
Output column: select the
column from the output schema to carry the calculated results.
In this scenario, it is popularity. -
Function: select the function
to be used to process the incoming data. In this scenario,
select count. It counts the
frequency of each activity in the received messages. -
Input column position: select
the column from the input schema to provide the data to be
processed. In this scenario, it is activity.
-
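If you want to sanity-check the expected result before running the Job, the grouping and counting that tAggregateRow is configured to perform can be sketched in plain Java over the ten sample messages (the class and variable names here are illustrative, not part of the generated Job code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PopularityDemo {
    public static void main(String[] args) {
        // The ten sample messages published to the activities topic.
        String[] messages = {
            "Ryan|M|Pool", "Remy|M|Drink", "Remy|M|Hiking", "Irene|F|Drink",
            "Pierre|M|Movie", "Irene|F|Pool", "Thomas|M|Drink",
            "Ryan|M|Cooking", "Wang|F|Cooking", "Chen|M|Drink"
        };
        // Group by (activity, gender) and count occurrences,
        // as the Group by and Operations tables configure tAggregateRow to do.
        Map<String, Integer> popularity = new LinkedHashMap<>();
        for (String message : messages) {
            String[] tokens = message.split("\\|"); // firstname|gender|activity
            String key = tokens[2] + "|" + tokens[1];
            popularity.merge(key, 1, Integer::sum);
        }
        popularity.forEach((key, count) -> System.out.println(key + " -> " + count));
        // The most frequent combination is Drink|M, with 3 occurrences.
    }
}
```

This matches the result the scenario reports below: Drink is the most popular activity, with 3 occurrences for gender M and 1 for gender F.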
Then you can run this Job.
The tLogRow component is used to present the
execution result of the Job.
Press F6 to run this Job.
Once done, the Run view is opened automatically,
where you can check the execution result.

You can read that the activity Drink is the most
popular with 3 occurrences for the gender M
(Male) and 1 occurrence for the gender F (Female)
in the messages.
The Storm topology continues to run, waiting for messages to appear on the Kafka
message broker until you kill the Job. In this scenario, because the Kill topology on quitting Talend Job check box is
selected, the Storm topology will be stopped and removed from the cluster when this
Job is stopped.