tMongoDBInput
Retrieves records from a collection in the MongoDB database and transfers them to
the following component for display or storage.
tMongoDBInput retrieves specific documents from a MongoDB database collection based on a query document that describes the fields the desired documents must match.
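For example, a query document as simple as the following (illustrative only) matches all the documents whose author field equals Anderson:
{ "author" : "Anderson" }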
Depending on the Talend
product you are using, this component can be used in one, some or all of the following
Job frameworks:
- Standard: see tMongoDBInput Standard properties.
The component in this framework is available in all Talend products with Big Data and in Talend Data Fabric.
- Spark Batch: see tMongoDBInput properties for Apache Spark Batch.
The component in this framework is available in all subscription-based Talend products with Big Data and Talend Data Fabric.
- Spark Streaming: see tMongoDBInput properties for Apache Spark Streaming.
In this type of Job, tMongoDBInput is used to provide lookup data when the size of the lookup data fits the amount of memory allocated for the execution of the Job. It is executed once to read data from MongoDB and store the data in memory so that the micro-batches from the main flow can easily access the data. If the lookup data is too large to be stored in memory, it is recommended to use tMongoDBLookupInput instead, which loads only the data matching the lookup join key.
The component in this framework is available in Talend Real Time Big Data Platform and Talend Data Fabric.
tMongoDBInput Standard properties
These properties are used to configure tMongoDBInput running in the Standard Job framework.
The Standard
tMongoDBInput component belongs to the Big Data and the Databases NoSQL families.
The component in this framework is available in all Talend products with Big Data
and in Talend Data Fabric.
Basic settings
Use existing connection |
Select this check box and in the Component List click the relevant connection component to reuse the connection details you already defined. |
||||
DB Version |
List of the database versions. Available when the Use existing connection check box is not selected. |
||||
Use replica set address |
Select this check box to show the Replica address table. In the Replica address table, you can define the addresses of the MongoDB servers of the replica set to be connected to. Available when the Use existing connection check box is not selected. |
||||
Server and Port |
IP address and listening port of the database server. Available when the Use existing connection check box is not selected. |
||||
Database |
Name of the database. |
||||
Use SSL connection |
Select this check box to enable the SSL or TLS encrypted connection. Then you need to use the tSetKeystore component in the same Job to specify the encryption information. Note that the SSL connection is available only for version 2.4 + of MongoDB. |
||||
Set read preference |
Select this check box and from the Read preference drop-down list that is displayed, select the member to which you need to direct the read operations. If you leave this check box clear, the Job uses the default Read preference, that is to say, it uses the primary member of the replica set. For further information, see MongoDB’s documentation about Replication and its Read preferences. |
||||
Required authentication |
Select this check box to enable the database authentication. Among the mechanisms listed on the Authentication mechanism drop-down list, select the one to be used. For details about the other mechanisms in this list, see MongoDB Authentication from the MongoDB documentation. |
||||
Set Authentication database |
If the username to be used to connect to MongoDB has been created in a specific Authentication database of MongoDB, select this check box to enter the name of this Authentication database in the Authentication database field that is displayed. For further information about the MongoDB Authentication database, see User Authentication database. |
||||
Username and Password |
DB user authentication data. To enter the password, click the […] button next to the password field, and then in the pop-up dialog box enter the password between double quotes and click OK to save the settings. Available when the Required authentication check box is selected. If the security system you have selected from the Authentication mechanism drop-down list is Kerberos, you need to enter the User principal, the Realm and the KDC server values instead of the Username and the Password. |
||||
Collection |
Name of the collection in the MongoDB database. |
||||
Schema and Edit schema |
A schema is a row description. It defines the number of fields (columns) to be processed and passed on to the next component. Click Edit schema to make changes to the schema.
If a column in the database is a JSON document and you need to read the entire document, put an asterisk (*) in the DB column column, without quotation marks around it. |
||||
Query |
Specify the query condition. This field is available only when you have selected Find query from the Query type drop-down list.
For example, type in "{'id':4}" to retrieve the record whose id is 4 from the collection (a fuller illustrative example is given after this table). Note:
Different from the query statements required in the MongoDB client software, the query here refers only to the contents of the query document itself, for example {'id':4} in this field as opposed to db.blog.find({id:4}) in the client. |
||||
Aggregation stages |
Create a MongoDB aggregation pipeline by adding the stages you want the documents to go through. Only one stage is allowed per row in this Aggregation stages table and the stages are executed one after another, in the order in which they are listed.
For example, if you want to aggregate documents about your customers using the $match and the $group stages, you need to add two rows to this Aggregation stages table and define the two stages, one per row (an illustrative pair of stages is sketched after this table).
In this aggregation, the customer documents with status A are selected; then among the selected documents, further grouping and aggregation are performed. For a full list of the stages you can use and their related operators, and for further information about the MongoDB aggregation pipeline, see Aggregation pipeline. |
||||
Mapping |
Each column of the schema defined for this component represents a field of the documents to be read. In this table, you need to specify the parent node of each of these fields, if any.
For example, in a document where the first and the last fields have person as their parent node while the _id field has no parent node, you would enter person (within double quotation marks) in the Parent node path column for first and last and leave that column empty for _id (see the example after this table).
|
||||
Sort by |
Specify the column and choose the order for the sort operation. This field is available only when you have selected Find query from the Query type drop-down list. |
||||
Limit |
Type in the maximum number of records to be retrieved. This field is available only when you have selected Find query from the Query type drop-down list. |
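To illustrate how the Mapping, Query and Aggregation stages fields relate to the stored documents, here is a minimal sketch; the field names and values are purely illustrative and not taken from a real collection. Assume the documents are shaped as follows:
{ "_id" : ObjectId("..."), "person" : { "first" : "Anna", "last" : "Moore" }, "status" : "A", "amount" : 25 }
With first and last mapped to the parent node "person" (within double quotation marks) and _id left without a parent node, a Find query such as the following returns only the documents whose person.last field is Moore:
"{'person.last' : 'Moore'}"
For an aggregation query, the two Aggregation stages rows of the customer example above could read, one stage per row:
"{$match : {status : 'A'}}"
"{$group : {_id : '$person.last', total : {$sum : '$amount'}}}"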
Advanced settings
tStatCatcher Statistics |
Select this check box to collect the log data at the component level. |
No query timeout |
Select this check box to prevent MongoDB servers from stopping idle cursors after their default period of inactivity. In this situation, an idle cursor stays open until either its results are exhausted or you close it manually.
A cursor for MongoDB is a pointer to the result set of a query. By default, that is to say with this check box clear, a MongoDB server stops idle cursors after a given inactivity period to avoid excess memory use. |
Enable external sort |
Since the aggregation pipeline stages have a maximum memory use limit, select this check box to enable the external sort so that large sort operations can use disk space instead of failing once this limit is exceeded. For further information about this external sort, see Large sort operation with external sort. |
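This option presumably maps to MongoDB's allowDiskUse flag for aggregations. For reference, the equivalent request in the MongoDB shell, assuming a collection named blog sorted on its author field, would look like this:
db.blog.aggregate( [ { $sort : { author : 1 } } ], { allowDiskUse : true } )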
Global Variables
Global Variables |
NB_LINE: the number of rows read by an input component or transferred to an output component. This is an After variable and it returns an integer.
ERROR_MESSAGE: the error message generated by the component when an error occurs. This is an After variable and it returns a string. A Flow variable functions during the execution of a component while an After variable functions after the execution of the component. To fill up a field or expression with a variable, press Ctrl + Space to access the variable list and choose the variable to use from it. For further information about variables, see Talend Studio User Guide. |
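For example, to reuse the number of retrieved rows later in the Job, you could type the following expression in a field of a downstream component; the instance name tMongoDBInput_1 is an assumption, so use the label of your own component:
((Integer)globalMap.get("tMongoDBInput_1_NB_LINE"))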
Usage
Usage rule |
As a start component, tMongoDBInput retrieves records from a MongoDB collection and sends them to the following component over a Row > Main connection. |
Retrieving data from a collection by advanced queries
This scenario applies only to Talend products with Big Data.
In this scenario, advanced MongoDB queries are used to retrieve the post by the author
Anderson.
The posts to be queried are stored in the collection blog of the MongoDB database talend.
To insert data into the database, see Creating a collection and writing data to it.
Linking the components
- Drop tMongoDBConnection, tMongoDBClose, tMongoDBInput and tLogRow onto the workspace.
- Link tMongoDBConnection to tMongoDBInput using the OnSubjobOk trigger.
- Link tMongoDBInput to tMongoDBClose using the OnSubjobOk trigger.
- Link tMongoDBInput to tLogRow using a Row > Main connection.
Configuring the components
- Double-click tMongoDBConnection to open its Basic settings view.
- From the DB Version list, select the MongoDB version you are using.
- In the Server and Port fields, enter the connection details.
- In the Database field, enter the name of the MongoDB database.
- Double-click tMongoDBInput to open its Basic settings view.
- Select the Use existing connection option.
- In the Collection field, enter the name of the collection, namely blog.
- Click the […] button next to Edit schema to open the schema editor.
- Click the [+] button to add five columns, namely id, author, title, keywords and contents, with id of the Integer type and the other four of the String type.
- Click OK to close the editor. The columns now appear in the left part of the Mapping area.
- For the columns author, title, keywords and contents, enter their parent node post so that the data can be retrieved from the correct positions.
- In the Query box, enter the advanced query statement to retrieve the posts whose author is Anderson (two alternative query forms are shown after this list for illustration):
"{post.author : 'Anderson'}"
This statement requires that the sub-node of post, the node author, should have the value “Anderson”.
- Double-click tLogRow to open its Basic settings view. Select Table (print values in cells of a table) for a better display of the results.
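For illustration only, the same Query box also accepts other advanced query forms; neither of the following is required by this scenario:
"{post.author : {$in : ['Anderson', 'Hogan']}}"
"{post.author : {$regex : '^And'}}"
The first form matches posts written by either of the two listed authors; the second matches posts whose author name starts with And.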
Executing the Job
- Press Ctrl+S to save the Job.
- Press F6 to run the Job.
In the console of the Run view, you can see that the post by Anderson is retrieved.
Related scenarios
For related scenarios, see:
tMongoDBInput properties for Apache Spark Batch
These properties are used to configure tMongoDBInput running in the Spark Batch Job framework.
The Spark Batch
tMongoDBInput component belongs to the Databases family.
The component in this framework is available in all subscription-based Talend products with Big Data
and Talend Data Fabric.
Basic settings
Property type |
Either Built-In or Repository.
Built-In: No property data stored centrally.
Repository: Select the repository file where the properties are stored. |
||||
MongoDB configuration |
Select this check box and in the Component List click the relevant connection component to reuse the connection details you already defined. |
||||
Schema and Edit schema |
A schema is a row description. It defines the number of fields (columns) to be processed and passed on to the next component. Click Edit schema to make changes to the schema.
If a column in the database is a JSON document and you need to read the entire document, put an asterisk (*) in the DB column column, without quotation marks around it. |
||||
Collection |
Enter the name of the collection to be used. A MongoDB collection is the equivalent of an RDBMS table and contains documents.
If the collection to be used is not sharded, it is recommended to add the mongo.input.split_size property to the Advanced Hadoop MongoDB properties table. This parameter determines how the collection is going to be partitioned and read by the Spark executors. The number of partitions of the input collection can be calculated using the following formula:
Number of partitions = Collection size in MB / mongo.input.split_size
Without this property, Spark uses the default value, 8 MB, for the partition size.
For example, with mongo.input.split_size set to 1, Spark dispatches 1 MB to each Spark executor in order to read the non-sharded collection in parallel. If the collection size is 10 MB, 10 executors are employed. |
||||
Set read preference |
Select this check box and from the Read preference drop-down list that is displayed, select the member to which you need to direct the read operations. If you leave this check box clear, the Job uses the default Read preference, that is to say, it uses the primary member of the replica set. For further information, see MongoDB’s documentation about Replication and its Read preferences. |
||||
Query |
Specify the query statement to select documents from the collection specified in the Collection field. The default query, "{}", selects all of the documents in the collection.
Different from the query statements required in the MongoDB client software, the query here refers only to the contents of the query document itself, for example {'id':4} in this field as opposed to db.blog.find({id:4}) in the client. |
||||
Mapping |
Each column of the schema defined for this component represents a field of the documents to be read. In this table, you need to specify the parent node of each of these fields, if any.
For example, in a document where the first and the last fields have person as their parent node while the _id field has no parent node, you would enter person (within double quotation marks) in the Parent node path column for first and last and leave that column empty for _id.
|
||||
Limit |
Enter the maximum number of records to be retrieved. |
Advanced settings
Advanced Hadoop MongoDB |
Add properties to define extra operations you need tMongoDBInput to perform when reading data. The available properties are listed and explained in the documentation of the MongoDB Connector for Hadoop.
If the collection to be used is not sharded, it is recommended to add the mongo.input.split_size property to this Advanced Hadoop MongoDB properties table. This parameter determines how the collection is going to be partitioned and read by the Spark executors. The number of partitions of the input collection can be calculated using the following formula:
Number of partitions = Collection size in MB / mongo.input.split_size
Without this property, Spark uses the default value, 8 MB, for the partition size.
For example, with mongo.input.split_size set to 1, Spark dispatches 1 MB to each Spark executor in order to read the non-sharded collection in parallel. If the collection size is 10 MB, 10 executors are employed (a short worked example of this arithmetic is given after this table). |
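As a quick worked example of this arithmetic, consider a hypothetical 24 MB non-sharded collection:
Without mongo.input.split_size : 24 MB / 8 MB (default) = 3 partitions, read by 3 Spark executors
With mongo.input.split_size = 1 : 24 MB / 1 MB = 24 partitions, read by 24 Spark executors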
Usage
Usage rule |
This component is used as a start component and requires an output link.
This component should use a tMongoDBConfiguration component present in the same Job to connect to MongoDB.
This component, along with the Spark Batch component Palette it belongs to, appears only when you are creating a Spark Batch Job.
Note that in this documentation, unless otherwise explicitly stated, a scenario presents only Standard Jobs, that is to say traditional Talend data integration Jobs. |
Spark Connection |
In the Spark
Configuration tab in the Run view, define the connection to a given Spark cluster for the whole Job. In addition, since the Job expects its dependent jar files for execution, you must specify the directory in the file system to which these jar files are transferred so that Spark can access these files.
This connection is effective on a per-Job basis. |
Writing and reading data from MongoDB using a Spark Batch Job
This scenario applies only to subscription-based Talend products with Big
Data.
In this scenario, you create a Spark Batch Job to write data about some movie
directors into the MongoDB default database and then read the data from
this database.
The sample data used in this scenario reads as follows:
1;Gregg Araki
2;P.J. Hogan
3;Alan Rudolph
4;Alex Proyas
5;Alex Sichel
This data contains the names of these directors and the ID numbers distributed
to them.
Note that the sample data is created for demonstration purposes only.
tHDFSConfiguration is used in this scenario by Spark to connect
to the HDFS system where the jar files dependent on the Job are transferred.
In the Spark Configuration tab in the Run view, define the connection to a given Spark cluster for the whole Job. In addition, since the Job expects its dependent jar files for execution, you must specify the directory in the file system to which these jar files are transferred so that Spark can access these files:
- Yarn mode (Yarn client or Yarn cluster):
  - When using Google Dataproc, specify a bucket in the Google Storage staging bucket field in the Spark configuration tab.
  - When using HDInsight, specify the blob to be used for Job deployment in the Windows Azure Storage configuration area in the Spark configuration tab.
  - When using Altus, specify the S3 bucket or the Azure Data Lake Storage for Job deployment in the Spark configuration tab.
  - When using Qubole, add a tS3Configuration to your Job to write your actual business data in the S3 system with Qubole. Without tS3Configuration, this business data is written in the Qubole HDFS system and destroyed once you shut down your cluster.
  - When using on-premise distributions, use the configuration component corresponding to the file system your cluster is using. Typically, this system is HDFS and so use tHDFSConfiguration.
- Standalone mode: use the configuration component corresponding to the file system your cluster is using, such as tHDFSConfiguration or tS3Configuration.
If you are using Databricks without any configuration component present in your Job, your business data is written directly in DBFS (Databricks Filesystem).
Prerequisite: ensure that the Spark cluster and the
MongoDB database to be used have been properly installed and are running.
To replicate this scenario, proceed as follows:
Linking the components
- In the Integration perspective of the Studio, create an empty Spark Batch Job from the Job Designs node in the Repository tree view.
For further information about how to create a Spark Batch Job, see Talend Open Studio for Big Data Getting Started Guide.
- In the workspace, enter the name of the component to be used and select this component from the list that appears. In this scenario, the components are tHDFSConfiguration, tMongoDBConfiguration, tFixedFlowInput, tMongoDBOutput, tMongoDBInput and tLogRow.
The tFixedFlowInput component is used to load the sample data into the data flow. In real-world practice, you can use other components such as tFileInputDelimited, alone or even with a tMap, in the place of tFixedFlowInput to design a sophisticated process to prepare your data to be processed.
- Connect tFixedFlowInput to tMongoDBOutput using the Row > Main link.
- Connect tMongoDBInput to tLogRow using the Row > Main link.
- Connect tFixedFlowInput to tMongoDBInput using the Trigger > OnSubjobOk link.
- Leave tHDFSConfiguration and tMongoDBConfiguration alone without any connection.
Selecting the Spark mode
Depending on the Spark cluster to be used, select a Spark mode for your Job.
The Spark documentation provides an exhaustive list of Spark properties and
their default values at Spark Configuration. A Spark Job designed in the Studio uses
this default configuration except for the properties you explicitly defined in the
Spark Configuration tab or the components
used in your Job.
- Click Run to open its view and then click the Spark Configuration tab to display its view for configuring the Spark connection.
- Select the Use local mode check box to test your Job locally.
In the local mode, the Studio builds the Spark environment in itself on the fly in order to run the Job in it. Each processor of the local machine is used as a Spark worker to perform the computations.
In this mode, your local file system is used; therefore, deactivate the configuration components, such as tS3Configuration or tHDFSConfiguration, that provide connection information to a remote file system, if you have placed these components in your Job.
You can launch your Job without any further configuration.
- Clear the Use local mode check box to display the list of the available Hadoop distributions and, from this list, select the distribution corresponding to your Spark cluster to be used.
Depending on the distribution you select, Talend supports some or all of the following Spark modes: Standalone, Yarn client and Yarn cluster.
For Cloudera Altus, Talend supports the Yarn cluster mode only, and your Altus cluster should run on one of the following Cloud providers:
  - Azure (the support for Altus on Azure is a technical preview feature)
  - AWS
As a Job relies on Avro to move data among its components, it is recommended to set your cluster to use Kryo to handle the Avro types. This not only helps avoid this Avro known issue but also brings inherent performance gains. The Spark property to be set in your cluster is:
spark.serializer org.apache.spark.serializer.KryoSerializer
If you cannot find the distribution corresponding to yours from this drop-down list, this means the distribution you want to connect to is not officially supported by Talend. In this situation, you can select Custom, then select the Spark version of the cluster to be connected and click the [+] button to display the dialog box in which you can alternatively:
- Select Import from existing version to import an officially supported distribution as base and then add other required jar files which the base distribution does not provide.
- Select Import from zip to import the configuration zip for the custom distribution to be used. This zip file should contain the libraries of the different Hadoop/Spark elements and the index file of these libraries.
In Talend Exchange, members of the Talend community have shared some ready-for-use configuration zip files which you can download from this Hadoop configuration list and directly use in your connection accordingly. However, because of the ongoing evolution of the different Hadoop-related projects, you might not be able to find the configuration zip corresponding to your distribution from this list; it is then recommended to use the Import from existing version option to take an existing distribution as base and add the jars required by your distribution.
Note that custom versions are not officially supported by Talend. Talend and its community provide you with the opportunity to connect to custom versions from the Studio but cannot guarantee that the configuration of whichever version you choose will be easy. As such, you should only attempt to set up such a connection if you have sufficient Hadoop and Spark experience to handle any issues on your own.
For a step-by-step example about how to connect to a custom distribution and share this connection, see Hortonworks.
Configuring the connection to the file system to be used by Spark
Skip this section if you are using Google Dataproc or HDInsight, as for these two
distributions, this connection is configured in the Spark
configuration tab.
- Double-click tHDFSConfiguration to open its Component view.
Spark uses this component to connect to the HDFS system to which the jar files dependent on the Job are transferred.
- If you have defined the HDFS connection metadata under the Hadoop cluster node in Repository, select Repository from the Property type drop-down list and then click the […] button to select the HDFS connection you have defined from the Repository content wizard.
For further information about setting up a reusable HDFS connection, search for centralizing HDFS metadata on Talend Help Center (https://help.talend.com).
If you complete this step, you can skip the following steps about configuring tHDFSConfiguration because all the required fields should have been filled automatically.
- In the Version area, select the Hadoop distribution you need to connect to and its version.
- In the NameNode URI field, enter the location of the machine hosting the NameNode service of the cluster. If you are using WebHDFS, the location should be webhdfs://masternode:portnumber; WebHDFS with SSL is not supported yet.
- In the Username field, enter the authentication information used to connect to the HDFS system to be used. Note that the user name must be the same as the one you have put in the Spark configuration tab.
Configuring the connection to the MongoDB database to be used by Spark
- Double-click tMongoDBConfiguration to open its Component view.
- From the DB Version list, select the version of the MongoDB database to be used.
- In the Server field and the Port field, enter the corresponding information of the MongoDB database.
- In the Database field, enter the name of the database. This database must already exist (example values are shown after this list).
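For a local test installation, these fields could look like the following; the values are assumptions (27017 is simply the default MongoDB port) and the database name must refer to a database that already exists:
Server   : "localhost"
Port     : "27017"
Database : "test"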
Loading the sample data
- Double-click the tFixedFlowInput component to open its Component view.
- Click the […] button next to Edit schema to open the schema editor.
- Click the [+] button to add the schema columns, namely id of the Integer type and name of the String type.
- Click OK to validate these changes and accept the propagation prompted by the pop-up dialog box.
- In the Mode area, select the Use Inline Content radio button and paste the above-mentioned sample data about movie directors into the Content field that is displayed.
- In the Field separator field, enter a semicolon (;).
Writing data to MongoDB
- Double-click tMongoDBOutput to open its Component view.
- If this component does not have the same schema as the preceding component, a warning icon appears. In this situation, click the Sync columns button to retrieve the schema from the preceding component; once done, the warning icon disappears.
- In the Collection field, enter the name of the collection to which you need to write data. If this collection does not exist, it will be automatically created at runtime.
- From the Action on data list, select the operation to be performed on the data. In this example, select Insert, which creates documents in MongoDB whether these documents already exist or not and, in either case, generates a new technical ID for each of the new documents.
- In the Mapping table, the id and the name columns have been automatically added. You need to define how the data from these two columns should be transformed into a hierarchical construct in MongoDB.
In this example, enter, within double quotation marks, person in the Parent node path column for each row. This way, each director record is added to a node called person. If you leave this Parent node path column empty, the records are added to the root of each document (the resulting document shape is sketched after this list).
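With this mapping, each document written to MongoDB should roughly take the following shape; the technical _id value is generated by MongoDB at runtime, so the one shown here is only a placeholder:
{ "_id" : ObjectId("..."), "person" : { "id" : 1, "name" : "Gregg Araki" } }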
Reading data from MongoDB
- Double-click tMongoDBInput to open its Component view.
- Click the […] button next to Edit schema to open the schema editor.
- Click the [+] button to add the schema columns for output, namely _id, id and name.
If you want to extract the technical ID of each document, add a column called _id to the schema. In this example, this column is added. These technical IDs were generated at random by MongoDB when the sample data was written to the database.
- In the Collection field, enter the name of the collection from which you need to read data. In this example, it is the director collection used previously in tMongoDBOutput.
- In the Mapping table, the three output columns have been automatically added. You need to add the parent nodes they belong to in the MongoDB documents. In this example, enter, within double quotation marks, person in the Parent node path column for the id and the name columns and leave the _id column as is, meaning that the _id field is at the root of each document.
The tMongoDBInput component parses the extracted documents according to this mapping and writes the data in the corresponding columns.
Executing the Job
Then you can run this Job.
The tLogRow component is used to present the
execution result of the Job.
- Double-click the tLogRow component to open the Component view.
- Select the Table radio button to present the result in a table.
- Press F6 to run this Job.
Once done, in the console of the Run view, you can
check the execution result.
Note that you can manage the level of the execution information to be outputted in
this console by selecting the log4jLevel check box in
the Advanced settings tab and then selecting the level
of the information you want to display.
For more information on the log4j logging levels, see the Apache documentation at http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/Level.html.
tMongoDBInput properties for Apache Spark Streaming
These properties are used to configure tMongoDBInput running in the Spark Streaming Job framework.
The Spark Streaming
tMongoDBInput component belongs to the Databases family.
In this type of Job, tMongoDBInput is used to provide
lookup data, when the size of the lookup data fits the amount of memory allocated for the
execution of the Job. It is executed once to read data from MongoDB and store the data in
memory so that the micro-batches from the main flow can easily access the data. If the
lookup data is too large to be stored in memory, it is recommended to use tMongoDBLookupInput instead, which loads only the data matching the lookup
join key.
This component is available in Talend Real Time Big Data Platform and Talend Data Fabric.
Basic settings
Property type |
Either Built-In or Repository.
Built-In: No property data stored centrally.
Repository: Select the repository file where the properties are stored. |
||||
MongoDB configuration |
Select this check box and in the Component List click the relevant connection component to reuse the connection details you already defined. |
||||
Schema and Edit schema |
A schema is a row description. It defines the number of fields (columns) to be processed and passed on to the next component. Click Edit schema to make changes to the schema.
If a column in the database is a JSON document and you need to read the entire document, put an asterisk (*) in the DB column column, without quotation marks around it. |
||||
Collection |
Enter the name of the collection to be used. A MongoDB collection is the equivalent of an RDBMS table and contains documents.
If the collection to be used is not sharded, it is recommended to add the mongo.input.split_size property to the Advanced Hadoop MongoDB properties table. This parameter determines how the collection is going to be partitioned and read by the Spark executors. The number of partitions of the input collection can be calculated using the following formula:
Number of partitions = Collection size in MB / mongo.input.split_size
Without this property, Spark uses the default value, 8 MB, for the partition size.
For example, with mongo.input.split_size set to 1, Spark dispatches 1 MB to each Spark executor in order to read the non-sharded collection in parallel. If the collection size is 10 MB, 10 executors are employed. |
||||
Set read preference |
Select this check box and from the Read preference drop-down list that is displayed, select the member to which you need to direct the read operations. If you leave this check box clear, the Job uses the default Read preference, that is to say, it uses the primary member of the replica set. For further information, see MongoDB’s documentation about Replication and its Read preferences. |
||||
Query |
Specify the query statement to select documents from the collection specified in the Collection field. The default query, "{}", selects all of the documents in the collection.
Different from the query statements required in the MongoDB client software, the query here refers only to the contents of the query document itself, for example {'id':4} in this field as opposed to db.blog.find({id:4}) in the client. |
||||
Mapping |
Each column of the schema defined for this component represents a field of the documents to be read. In this table, you need to specify the parent node of each of these fields, if any.
For example, in a document where the first and the last fields have person as their parent node while the _id field has no parent node, you would enter person (within double quotation marks) in the Parent node path column for first and last and leave that column empty for _id.
|
||||
Limit |
Enter the maximum number of records to be retrieved. |
Advanced settings
Advanced Hadoop MongoDB |
Add properties to define extra operations you need tMongoDBInput to perform when reading data. The available properties are listed and explained in the documentation of the MongoDB Connector for Hadoop.
If the collection to be used is not sharded, it is recommended to add the mongo.input.split_size property to this Advanced Hadoop MongoDB properties table. This parameter determines how the collection is going to be partitioned and read by the Spark executors. The number of partitions of the input collection can be calculated using the following formula:
Number of partitions = Collection size in MB / mongo.input.split_size
Without this property, Spark uses the default value, 8 MB, for the partition size.
For example, with mongo.input.split_size set to 1, Spark dispatches 1 MB to each Spark executor in order to read the non-sharded collection in parallel. If the collection size is 10 MB, 10 executors are employed. |
Usage
Usage rule |
This component is used to provide lookup data, when the size of the lookup data fits the amount of memory allocated for the execution of the Job.
This component is used as a start component and requires an output link.
This component should use a tMongoDBConfiguration component present in the same Job to connect to MongoDB.
This component, along with the Spark Streaming component Palette it belongs to, appears only when you are creating a Spark Streaming Job.
Note that in this documentation, unless otherwise explicitly stated, a scenario presents only Standard Jobs, that is to say traditional Talend data integration Jobs. |
Spark Connection |
In the Spark
Configuration tab in the Run view, define the connection to a given Spark cluster for the whole Job. In addition, since the Job expects its dependent jar files for execution, you must specify the directory in the file system to which these jar files are transferred so that Spark can access these files.
This connection is effective on a per-Job basis. |
Related scenarios
No scenario is available for the Spark Streaming version of this component
yet.