
tMongoDBInput

Retrieves records from a collection in the MongoDB database and transfers them to
the following component for display or storage.

tMongoDBInput retrieves specific documents from a MongoDB collection by
applying a query document that contains the fields the desired documents must
match.

Depending on the Talend
product you are using, this component can be used in one, some or all of the following
Job frameworks:

  • Standard

  • Spark Batch

  • Spark Streaming

tMongoDBInput Standard properties

These properties are used to configure tMongoDBInput running in the Standard Job framework.

The Standard
tMongoDBInput component belongs to the Big Data and the Databases NoSQL families.

The component in this framework is available in all Talend products with Big Data
and in Talend Data Fabric.

Basic settings

Use existing connection

Select this check box and in the Component List click the relevant connection component to
reuse the connection details you already defined.

DB Version

List of the database versions.

Available when the Use existing connection check box is not selected.

Use replica set address

Select this check box to show the Replica address table.

In the Replica address table, you can
define multiple MongoDB database servers for failover.

Available when the Use existing connection check box is not selected.

Server and Port

IP address and listening port of the database server.

Available when neither the Use existing connection check box nor the Use replica set
address check box is selected.

Database

Name of the database.

Use SSL connection

Select this check box to enable the SSL or TLS encrypted connection.

Then you need to use the tSetKeystore
component in the same Job to specify the encryption information.

Note that the SSL connection is available only for MongoDB version 2.4 and
later.

Set read preference

Select this check box and from the Read preference
drop-down list that is displayed, select the member to which you need to direct the read
operations.

If you leave this check box clear, the Job uses the default Read preference, that is to
say, uses the primary member in a replica set.

For further information, see MongoDB’s documentation about Replication and its Read
preferences.

Required authentication

Select this check box to enable the database authentication.

Among the mechanisms listed on the Authentication mechanism
drop-down list, the NEGOTIATE one is recommended if
you are not using Kerberos, because it automatically selects the authentication mechanism
best suited to the MongoDB version you are using.

For details about the other mechanisms in this list, see MongoDB Authentication from the MongoDB
documentation.

Set Authentication database

If the username to be used to connect to MongoDB has been created in a specific
Authentication database of MongoDB, select this check box to enter the name of this
Authentication database in the Authentication database
field that is displayed.

For further information about the MongoDB Authentication database, see User Authentication database.

Username and Password

DB user authentication data.

To enter the password, click the […] button next to the
password field, and then in the pop-up dialog box enter the password between double quotes
and click OK to save the settings.

Available when the Required authentication check box is selected.

If the security system you have selected from the Authentication mechanism drop-down list is Kerberos, you need to
fill in the User principal, Realm and KDC server fields instead of the
Username and Password fields.

Collection

Name of the collection in the MongoDB database.

Schema and Edit Schema

A schema is a row description. It defines the number of fields
(columns) to be processed and passed on to the next component. When you create a Spark
Job, avoid the reserved word line when naming the
fields.

Click Edit schema to make changes to the schema. If the current schema is of the Repository type, three options are available:

  • View schema: choose this
    option to view the schema only.

  • Change to built-in property:
    choose this option to change the schema to Built-in for local changes.

  • Update repository connection:
    choose this option to change the schema stored in the repository and decide whether
    to propagate the changes to all the Jobs upon completion. If you just want to
    propagate the changes to the current Job, you can select No upon completion and choose this schema metadata
    again in the Repository Content
    window.

If a column in the database is a JSON document and you need to read
the entire document, enter an asterisk (*) in the DB column column, without
quotation marks around it.

Query

Specify the query condition. This field is available only when you
have selected Find query from the
Query type drop-down list.

For example, type in "{id:4}" to retrieve the record
whose id is 4 from the collection specified in the Collection field.

Note:

Unlike the query statements required in the MongoDB client software, the query
here refers only to the contents inside find(). For example, the query
{id:4} here corresponds to the MongoDB client query
db.blog.find({id:4}).

Aggregation stages

Create a MongoDB aggregation pipeline by adding the stages you want
the documents to pass through so as to obtain aggregated results from
these documents. This table is available only when you have selected
Aggregation pipeline query from the
Query type drop-down list.

Only one stage is allowed per row in this Aggregation stages table and the stages are executed one
by one in the order you place them in this table.

For example, if you want to aggregate documents about your customers
using the $match and the $group stages, you need to add two rows to
this Aggregation stages table and
define the two stages as
follows:
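
A minimal sketch of these two stage rows, assuming each stage is entered as a
quoted string in the single column of the Aggregation stages table and that the
customer documents carry status, cust_id and amount fields (adapt the field
names to your own data):

    "{ $match: { status: 'A' } }"
    "{ $group: { _id: '$cust_id', total: { $sum: '$amount' } } }"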

In this aggregation, the customer documents with status A are selected; then among the selected
customers, those using the same customer id are grouped and the values
from the amount fields of the same
customer are summed up.

For a full list of the stages you can use and their related operators,
see Aggregation pipeline operators.

For further information about MongoDB aggregation pipeline, see Aggregation pipeline.

Mapping

Each column of the schema defined for this component represents a field of the documents
to be read. In this table, you need to specify the parent nodes of these fields, if
any.

For example, consider a document in which the first and last fields are nested
under a person node while the _id field sits at the root: the first and last
fields have person as their parent node and the _id field has no parent node. A
sketch of such a document and of the completed Mapping table follows.
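
A minimal sketch, with hypothetical field values:

    {
      "_id": ObjectId("..."),
      "person": { "first": "John", "last": "Smith" }
    }

For a schema with the columns _id, first and last, the completed Mapping table
would then read along these lines:

    Column    Parent node path
    _id
    first     "person"
    last      "person"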

Sort by

Specify the column and choose the order for the sort operation.

This field is available only when you have selected Find query from the Query type drop-down list.

Limit

Type in the maximum number of records to be retrieved.

This field is available only when you have selected Find query from the Query type drop-down list.

Advanced settings

tStatCatcher Statistics

Select this check box to collect the log data at the component
level.

No query timeout

Select this check box to prevent MongoDB servers from stopping idle
cursors after 10 minutes of inactivity. In this situation, an idle cursor
stays open until either the results of this cursor are exhausted or you
manually close it using the cursor.close() method.

A cursor for MongoDB is a pointer to the result set of a query. By
default, that is to say, with this check box being clear, a MongoDB
server automatically stops idle cursors after a given inactivity period
to avoid excess memory use. For further information about MongoDB
cursors, see https://docs.mongodb.org/manual/core/cursors/.

Enable external sort

Aggregation pipeline stages have a maximum memory limit of 100 megabytes, and a
stage that exceeds this limit produces an error. When handling large datasets,
select this check box to enable external sorting so that such stages do not
fail on this limit.

For further information about this external sort, see Large sort operation with external sort.
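
The linked MongoDB documentation describes this behavior in terms of the
allowDiskUse option. As a point of reference, the equivalent in the mongo shell
looks like the following sketch (collection name and stage are illustrative):

    db.customers.aggregate(
      [ { $sort: { amount: -1 } } ],
      { allowDiskUse: true }
    )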

Global Variables

Global Variables

NB_LINE: the number of rows read by an input component or
transferred to an output component. This is an After variable and it returns an
integer.

ERROR_MESSAGE: the error message generated by the
component when an error occurs. This is an After variable and it returns a string. This
variable functions only if the Die on error check box is
cleared, if the component has this check box.

A Flow variable functions during the execution of a component while an After variable
functions after the execution of the component.

To fill in a field or expression with a variable, press Ctrl + Space
to access the variable list and choose the variable to use from it.

For further information about variables, see the Talend Studio User Guide.

Usage

Usage rule

As a start component, tMongoDBInput
allows you to retrieve records from a collection in the MongoDB database
and transfer them to the following component for display or
storage.

Retrieving data from a collection by advanced queries

This scenario applies only to Talend products with Big Data.

In this scenario, advanced MongoDB queries are used to retrieve the post by the author
Anderson.

The collection blog of the MongoDB database talend contains posts such as the
following:

tMongoDBInput_1.png

To insert data into the database, see Creating a collection and writing data to it.

Linking the components

  1. Drop tMongoDBConnection, tMongoDBClose, tMongoDBInput and tLogRow
    onto the workspace.
  2. Link tMongoDBConnection to tMongoDBInput using the OnSubjobOk trigger.
  3. Link tMongoDBInput to tMongoDBClose using the OnSubjobOk trigger.
  4. Link tMongoDBInput to tLogRow using a Row > Main
    connection.

    tMongoDBInput_2.png

Configuring the components

  1. Double-click tMongoDBConnection to open
    its Basic settings view.

    tMongoDBInput_3.png

  2. From the DB Version list, select the
    MongoDB version you are using.
  3. In the Server and Port fields, enter the connection details.
  4. In the Database field, enter the name of the MongoDB
    database.
  5. Double-click tMongoDBInput to open its
    Basic settings view.

    tMongoDBInput_4.png

  6. Select the Use existing connection
    option.
  7. In the Collection field, enter the name
    of the collection, namely blog.
  8. Click the […] button next to Edit schema to open the schema editor.

    tMongoDBInput_5.png

  9. Click the [+] button to add five columns, namely id, author, title, keywords and
    contents, setting the type to Integer for id and String for the other four columns.
  10. Click OK to close the editor.
  11. The columns now appear in the left part of the Mapping area.
  12. For columns author, title, keywords and contents,
    enter their parent node post so that the
    data can be retrieved from the correct positions.
  13. In the Query box, enter the advanced query statement to retrieve the posts whose
    author is Anderson (a sketch of such a query is given after this list).

    This statement requires that the sub-node author of post should have the
    value “Anderson”.
  14. Double-click tLogRow to open its
    Basic settings view.

    tMongoDBInput_6.png

    Select Table (print values in cells of a
    table)
    for a better display of the results.
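
A sketch of the query for step 13, assuming the author field is reached through
its parent node post using MongoDB dot notation (adapt the field names to your
own collection):

    "{'post.author':'Anderson'}"

As with the Query field described earlier, this is only the content of find();
the equivalent query in the MongoDB client would be
db.blog.find({'post.author':'Anderson'}).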

Executing the Job

  1. Press Ctrl+S to save the Job.
  2. Press F6 to run the Job.

    tMongoDBInput_7.png

    As shown above, the post by Anderson is retrieved.

tMongoDBInput properties for Apache Spark Batch

These properties are used to configure tMongoDBInput running in the Spark Batch Job framework.

The Spark Batch
tMongoDBInput component belongs to the Databases family.

The component in this framework is available in all subscription-based Talend products with Big Data
and Talend Data Fabric.

Basic settings

Property type

Either Built-In or Repository.

Built-In: No property data stored centrally.

Repository: Select the repository file where the
properties are stored.

MongoDB configuration

Select this check box and in the Component List click the relevant connection component to
reuse the connection details you already defined.

Schema and Edit Schema

A schema is a row description. It defines the number of fields
(columns) to be processed and passed on to the next component. When you create a Spark
Job, avoid the reserved word line when naming the
fields.

Click Edit schema to make changes to the schema. If the current schema is of the Repository type, three options are available:

  • View schema: choose this
    option to view the schema only.

  • Change to built-in property:
    choose this option to change the schema to Built-in for local changes.

  • Update repository connection:
    choose this option to change the schema stored in the repository and decide whether
    to propagate the changes to all the Jobs upon completion. If you just want to
    propagate the changes to the current Job, you can select No upon completion and choose this schema metadata
    again in the Repository Content
    window.

If a column in the database is a JSON document and you need to read
the entire document, enter an asterisk (*) in the DB column column, without
quotation marks around it.

Collection

Enter the name of the collection to be used.

A MongoDB collection is the equivalent of an RDBMS table and contains
documents.

If the collection to be used is not sharded, it is recommended to add the
mongo.input.split_size property to the Advanced Hadoop MongoDB properties table. This
parameter determines how the collection is partitioned and read by the Spark executors:
the number of partitions of the input collection is approximately the collection size
divided by the split size.

Without this property, Spark uses the default value of 8 MB for the partition size. For
example, with a split size of 1 MB, Spark dispatches 1 MB to each Spark executor in order
to read the non-sharded collection in parallel; if the collection size is 10 MB, 10
executors are employed.
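
A minimal sketch of this property in the Advanced Hadoop MongoDB properties
table, assuming the table takes quoted property/value pairs and that the split
size is expressed in megabytes, as in the MongoDB Connector for Hadoop:

    Property                    Value
    "mongo.input.split_size"    "1"

With this value, a 10 MB non-sharded collection is split into roughly
10 MB / 1 MB = 10 partitions, one per Spark task.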

Set read preference

Select this check box and from the Read preference
drop-down list that is displayed, select the member to which you need to direct the read
operations.

If you leave this check box clear, the Job uses the default Read preference, that is to
say, uses the primary member in a replica set.

For further information, see MongoDB’s documentation about Replication and its Read
preferences.

Query

Specify the query statement to select documents from the collection specified in the
Collection field. For example, type in "{'id':'4'}" to retrieve the record whose
id is 4 from the collection.

The default query, {} within double quotation marks provided with this component,
selects all of the documents. You can also apply a regular expression, for example
{'filename':{'$regex':'REGEX_PATTERN'}}, to define the values (here, file names) to be
matched.

Unlike the query statements required in the MongoDB client software, the query here
refers only to the contents inside find(). For example, the query
{'filename':{'$regex':'REGEX_PATTERN'}} here is the equivalent of
db.blog.find({filename:{$regex:REGEX_PATTERN}}) in the MongoDB client.

Mapping

Each column of the schema defined for this component represents a field of the documents
to be read. In this table, you need to specify the parent nodes of these fields, if
any.

For example, in a document where the first and last fields are nested under a
person node while the _id field sits at the root, the first and last fields
have person as their parent node and the _id field has none. Once completed,
the Mapping table therefore lists person as the parent node path for first and
last and leaves the parent node path of _id empty.

Limit

Enter the maximum number of records to be retrieved.

Advanced settings

Advanced Hadoop MongoDB properties

Add properties to define extra operations you need tMongoDBInput to perform when reading data.

The available properties are listed and explained in MongoDB Connector for Hadoop.

If the collection to be used is not sharded, it is recommended to add the
mongo.input.split_size property to the Advanced Hadoop MongoDB properties table. This
parameter determines how the collection is partitioned and read by the Spark executors:
the number of partitions of the input collection is approximately the collection size
divided by the split size.

Without this property, Spark uses the default value of 8 MB for the partition size. For
example, with a split size of 1 MB, Spark dispatches 1 MB to each Spark executor in order
to read the non-sharded collection in parallel; if the collection size is 10 MB, 10
executors are employed.

Usage

Usage rule

This component is used as a start component and requires an output link.

This component should use a tMongoDBConfiguration component present in the same Job to connect
to a MongoDB database. You need to drop a tMongoDBConfiguration component alongside this component and
configure the Basic settings of this component
to use tMongoDBConfiguration.

This component, along with the Spark Batch component Palette it belongs to,
appears only when you are creating a Spark Batch Job.

Note that in this documentation, unless otherwise explicitly stated, a
scenario presents only Standard Jobs, that is to
say traditional
Talend
data integration Jobs.

Spark Connection

In the Spark Configuration tab in the Run view, define the connection to a given Spark
cluster for the whole Job. In addition, since the Job expects its dependent jar files for
execution, you must specify the directory in the file system to which these jar files are
transferred so that Spark can access these files:

  • Yarn mode (Yarn client or Yarn cluster):

    • When using Google Dataproc, specify a bucket in the
      Google Storage staging bucket
      field in the Spark configuration
      tab.

    • When using HDInsight, specify the blob to be used for Job
      deployment in the Windows Azure Storage
      configuration
      area in the Spark
      configuration
      tab.

    • When using Altus, specify the S3 bucket or the Azure
      Data Lake Storage for Job deployment in the Spark
      configuration
      tab.
    • When using Qubole, add a
      tS3Configuration to your Job to write
      your actual business data in the S3 system with Qubole. Without
      tS3Configuration, this business data is
      written in the Qubole HDFS system and destroyed once you shut
      down your cluster.
    • When using on-premise
      distributions, use the configuration component corresponding
      to the file system your cluster is using. Typically, this
      system is HDFS and so use tHDFSConfiguration.

  • Standalone mode: use the
    configuration component corresponding to the file system your cluster is
    using, such as tHDFSConfiguration or
    tS3Configuration.

    If you are using Databricks without any configuration component present
    in your Job, your business data is written directly in DBFS (Databricks
    Filesystem).

This connection is effective on a per-Job basis.

Writing and reading data from MongoDB using a Spark Batch Job

This scenario applies only to subscription-based Talend products with Big Data.

In this scenario, you create a Spark Batch Job to write data about some movie
directors into the MongoDB default database and then read the data from
this database.

tMongoDBInput_8.png
The sample data about movie directors reads as follows:
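
For illustration, assuming semicolon-separated ID and name pairs matching the
separator configured later in tFixedFlowInput (the IDs and director names below
are hypothetical and used only for demonstration):

    1;Gregg Araki
    2;P.J. Hogan
    3;Alan Rudolph
    4;Alex Proyas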

This data contains the names of these directors and the ID numbers distributed
to them.

Note that the sample data is created for demonstration purposes only.

tHDFSConfiguration is used in this scenario by Spark to connect
to the HDFS system where the jar files dependent on the Job are transferred.

In the Spark Configuration tab in the Run view, define the connection to a given Spark
cluster for the whole Job. In addition, since the Job expects its dependent jar files for
execution, you must specify the directory in the file system to which these jar files are
transferred so that Spark can access these files:

  • Yarn mode (Yarn client or Yarn cluster):

    • When using Google Dataproc, specify a bucket in the
      Google Storage staging bucket
      field in the Spark configuration
      tab.

    • When using HDInsight, specify the blob to be used for Job
      deployment in the Windows Azure Storage
      configuration
      area in the Spark
      configuration
      tab.

    • When using Altus, specify the S3 bucket or the Azure
      Data Lake Storage for Job deployment in the Spark
      configuration
      tab.
    • When using Qubole, add a
      tS3Configuration to your Job to write
      your actual business data in the S3 system with Qubole. Without
      tS3Configuration, this business data is
      written in the Qubole HDFS system and destroyed once you shut
      down your cluster.
    • When using on-premise
      distributions, use the configuration component corresponding
      to the file system your cluster is using. Typically, this
      system is HDFS and so use tHDFSConfiguration.

  • Standalone mode: use the
    configuration component corresponding to the file system your cluster is
    using, such as tHDFSConfiguration or
    tS3Configuration.

    If you are using Databricks without any configuration component present
    in your Job, your business data is written directly in DBFS (Databricks
    Filesystem).

Prerequisite: ensure that the Spark cluster and the
MongoDB database to be used have been properly installed and are running.

To replicate this scenario, proceed as follows:

Linking the components

  1. In the
    Integration
    perspective of the Studio, create an empty
    Spark Batch Job from the Job Designs node in
    the Repository tree view.

    For further information about how to create a Spark Batch Job, see
    Talend Open Studio for Big Data Getting Started Guide
    .
  2. In the workspace, enter the name of the component to be used and select this
    component from the list that appears. In this scenario, the components are
    tHDFSConfiguration, tMongoDBConfiguration, tFixedFlowInput, tMongoDBOutput,
    tMongoDBInput and tLogRow.

    The tFixedFlowInput component is used to load the sample data into the data flow. In
    real-world practice, you can use other components, such as tFileInputDelimited, alone
    or with a tMap, in place of tFixedFlowInput to design a more sophisticated process for
    preparing the data to be processed.
  3. Connect tFixedFlowInput to tMongoDBOutput using the Row >
    Main
    link.
  4. Connect tMongoDBInput to tLogRow using the Row >
    Main
    link.
  5. Connect tFixedFlowInput to tMongoDBInput using the Trigger >
    OnSubjobOk
    link.
  6. Leave tHDFSConfiguration and tMongoDBConfiguration alone without any
    connection.

Selecting the Spark mode

Depending on the Spark cluster to be used, select a Spark mode for your Job.

The Spark documentation provides an exhaustive list of Spark properties and
their default values at Spark Configuration. A Spark Job designed in the Studio uses
this default configuration except for the properties you explicitly defined in the
Spark Configuration tab or the components
used in your Job.

  1. Click Run to open its view and then click the
    Spark Configuration tab to display its view
    for configuring the Spark connection.
  2. Select the Use local mode check box to test your Job locally.

    In local mode, the Studio builds the Spark environment itself on the fly in order to
    run the Job. Each processor of the local machine is used as a Spark worker to perform
    the computations.

    In this mode, your local file system is used; therefore, deactivate the configuration
    components, such as tS3Configuration or tHDFSConfiguration, that provide connection
    information to a remote file system, if you have placed these components in your Job.

    You can launch your Job without any further configuration.

  3. Clear the Use local mode check box to display the
    list of the available Hadoop distributions and from this list, select
    the distribution corresponding to your Spark cluster to be used.

    This distribution could be:

    • Databricks

    • Qubole

    • Amazon EMR

      For this distribution, Talend supports:

      • Yarn client

      • Yarn cluster

    • Cloudera

      For this distribution, Talend supports:

      • Standalone

      • Yarn client

      • Yarn cluster

    • Google Cloud
      Dataproc

      For this distribution, Talend supports:

      • Yarn client

    • Hortonworks

      For this distribution, Talend supports:

      • Yarn client

      • Yarn cluster

    • MapR

      For this distribution, Talend supports:

      • Standalone

      • Yarn client

      • Yarn cluster

    • Microsoft HD
      Insight

      For this distribution, Talend supports:

      • Yarn cluster

    • Cloudera Altus

      For this distribution, Talend supports:

      • Yarn cluster

        Your Altus cluster should run on the following Cloud
        providers:

        • Azure

          The support for Altus on Azure is a technical
          preview feature.

        • AWS

    As a Job relies on Avro to move data among its components, it is recommended to set
    your cluster to use Kryo to handle the Avro types. This not only helps avoid a known
    Avro issue but also brings inherent performance gains. The Spark property to be set
    in your cluster is shown in the sketch after this list.

    If you cannot find the distribution corresponding to yours from this drop-down list,
    this means the distribution you want to connect to is not officially supported by
    Talend. In this situation, you can select Custom, then select the Spark version of
    the cluster to be connected and click the [+] button to display the dialog box in
    which you can alternatively:

    1. Select Import from existing
      version
      to import an officially supported distribution as base
      and then add other required jar files which the base distribution does not
      provide.

    2. Select Import from zip to
      import the configuration zip for the custom distribution to be used. This zip
      file should contain the libraries of the different Hadoop/Spark elements and the
      index file of these libraries.

      In Talend Exchange, members of the Talend community have shared ready-for-use
      configuration zip files which you can download from this Hadoop configuration list
      and use directly in your connection. However, because of the ongoing evolution of
      the different Hadoop-related projects, you might not be able to find the
      configuration zip corresponding to your distribution in this list; it is then
      recommended to use the Import from existing version option to take an existing
      distribution as a base and add the jars required by your distribution.

      Note that custom versions are not officially supported by Talend. Talend and its
      community provide you with the opportunity to connect to custom versions from the
      Studio but cannot guarantee that the configuration of whichever version you choose
      will be easy. As such, you should only attempt to set up such a connection if you
      have sufficient Hadoop and Spark experience to handle any issues on your own.

    For a step-by-step example about how to connect to a custom
    distribution and share this connection, see Hortonworks.
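
As a sketch, the Kryo setting mentioned in step 3 of the list above is typically
applied through Spark's serializer property in your cluster's Spark
configuration, for example:

    spark.serializer=org.apache.spark.serializer.KryoSerializer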

Configuring the connection to the file system to be used by Spark

Skip this section if you are using Google Dataproc or HDInsight, as for these two
distributions, this connection is configured in the Spark configuration tab.

  1. Double-click tHDFSConfiguration to open its Component view.

    Spark uses this component to connect to the HDFS system to which the jar
    files dependent on the Job are transferred.

  2. If you have defined the HDFS connection metadata under the Hadoop
    cluster
    node in Repository, select
    Repository from the Property
    type
    drop-down list and then click the
    […] button to select the HDFS connection you have
    defined from the Repository content wizard.

    For further information about setting up a reusable
    HDFS connection, search for centralizing HDFS metadata on Talend Help Center
    (https://help.talend.com).

    If you complete this step, you can skip the following steps about configuring
    tHDFSConfiguration because all the required fields
    should have been filled automatically.

  3. In the Version area, select
    the Hadoop distribution you need to connect to and its version.
  4. In the NameNode URI field,
    enter the location of the machine hosting the NameNode service of the cluster.
    If you are using WebHDFS, the location should be
    webhdfs://masternode:portnumber; WebHDFS with SSL is not
    supported yet.
  5. In the Username field, enter
    the authentication information used to connect to the HDFS system to be used.
    Note that the user name must be the same as you have put in the Spark configuration tab.
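
As an illustration of the NameNode URI in step 4, the value typically takes one
of the following forms (host names and ports here are hypothetical placeholders;
use the values of your own cluster):

    hdfs://namenode-host:8020
    webhdfs://namenode-host:50070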

Configuring the connection to the MongoDB database to be used by Spark

  1. Double-click tMongoDBConfiguration to open its
    Component view.


    tMongoDBInput_9.png
  2. From the DB Version list, select the version
    of the MongoDB database to be used.
  3. In the Server field and the Port field, enter corresponding information of the
    MongoDB database.
  4. In the Database field, enter the name of the
    database. This database must already exist.

Loading the sample data

  1. Double-click the tFixedFlowInput component to open its Component view.

    tMongoDBInput_10.png

  2. Click the […] button next to Edit schema to open the schema editor.
  3. Click the [+] button to add the schema
    columns as shown in this image.

    tMongoDBInput_11.png

  4. Click OK to validate these changes and accept
    the propagation prompted by the pop-up dialog box.
  5. In the Mode area, select the Use Inline Content radio button and paste the
    above-mentioned sample data about movie directors into the Content field that is displayed.
  6. In the Field separator field, enter a
    semicolon (;).

Writing data to MongoDB

  1. Double-click tMongoDBOutput to open its
    Component view.

    tMongoDBInput_12.png

  2. If this component does not have the same schema as the preceding component, a
    warning icon appears. In this situation, click the Sync columns button to retrieve
    the schema from the preceding component; once done, the warning icon disappears.
  3. In the Collection field, enter the name of
    the collection to which you need to write data. If this collection does not
    exist, it will be automatically created at runtime.
  4. From the Action on data list, select the
    operation to be performed on the data. In this example, select Insert, which creates documents in MongoDB whether
    these documents already exist or not and in either case, generates a new
    technical ID for each of the new documents.
  5. In the Mapping table, the id and the name
    columns have been automatically added. You need to define how the data from
    these two columns should be transformed into a hierarchical construct in
    MongoDB.

    In this example, enter, within double quotation marks, person in the Parent node path
    column for each row. This way, each director record is added to a node called
    person. If you leave this Parent node path column empty, these records are added to
    the root of each document.
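
With this mapping, each director record written by tMongoDBOutput should produce
a MongoDB document roughly shaped as follows (the _id value is generated by
MongoDB; the id and name values shown here are hypothetical placeholders taken
from the illustrative sample data):

    {
      "_id": ObjectId("..."),
      "person": { "id": 1, "name": "Gregg Araki" }
    }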

Reading data from MongoDB

  1. Double-click tMongoDBInput to open its
    Component view.

    tMongoDBInput_13.png

  2. Click the […] button next to Edit schema to open the schema editor.
  3. Click the [+] button to add the schema
    columns for output as shown in this image.

    tMongoDBInput_14.png

    If you want to extract the technical ID of each document, add a column called
    _id to the schema. In this example, this
    column is added. These technical IDs were generated at random by MongoDB when
    the sample data was written to the database.
  4. In the Collection field, enter the name of
    the collection from which you need to read data. In this example, it is the
    director one used previously in tMongoDBOutput.
  5. In the Mapping table, the three output
    columns have been automatically added. You need to add the parent nodes they
    belong to in the MongoDB documents. In this example, enter, within double
    quotation marks, person in the Parent node path column for the id and the name columns and leave
    the _id column as is, meaning that the
    _id field is at the root of each
    document.

    The tMongoDBInput component parses the extracted documents according to this mapping
    and writes the data in the corresponding columns.

Executing the Job

Then you can run this Job.

The tLogRow component is used to present the
execution result of the Job.

  1. Double-click the tLogRow component to open
    the Component view.
  2. Select the Table radio button to present the
    result in a table.
  3. Press F6 to run this Job.

Once done, in the console of the Run view, you can
check the execution result.

tMongoDBInput_15.png

Note that you can manage the level of the execution information output to this console
by selecting the log4jLevel check box in the Advanced settings tab and then selecting
the level of information you want to display.

For more information on the log4j logging levels, see the Apache documentation at http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/Level.html.

tMongoDBInput properties for Apache Spark Streaming

These properties are used to configure tMongoDBInput running in the Spark Streaming Job framework.

The Spark Streaming
tMongoDBInput component belongs to the Databases family.

In this type of Job, tMongoDBInput is used to provide
lookup data, when the size of the lookup data fits the amount of memory allocated for the
execution of the Job. It is executed once to read data from MongoDB and store the data in
memory so that the micro-batches from the main flow can easily access the data. If the
lookup data is too large to be stored in memory, it is recommended to use tMongoDBLookupInput instead, which loads only the data matching the lookup
join key.

This component is available in Talend Real Time Big Data Platform and Talend Data Fabric.

Basic settings

Property type

Either Built-In or Repository.

Built-In: No property data stored centrally.

Repository: Select the repository file where the
properties are stored.

MongoDB configuration

Select this check box and in the Component List click the relevant connection component to
reuse the connection details you already defined.

Schema and Edit Schema

A schema is a row description. It defines the number of fields
(columns) to be processed and passed on to the next component. When you create a Spark
Job, avoid the reserved word line when naming the
fields.

Click Edit schema to make changes to the schema. If the current schema is of the Repository type, three options are available:

  • View schema: choose this
    option to view the schema only.

  • Change to built-in property:
    choose this option to change the schema to Built-in for local changes.

  • Update repository connection:
    choose this option to change the schema stored in the repository and decide whether
    to propagate the changes to all the Jobs upon completion. If you just want to
    propagate the changes to the current Job, you can select No upon completion and choose this schema metadata
    again in the Repository Content
    window.

If a column in the database is a JSON document and you need to read
the entire document, enter an asterisk (*) in the DB column column, without
quotation marks around it.

Collection

Enter the name of the collection to be used.

A MongoDB collection is the equivalent of an RDBMS table and contains
documents.

If the collection to be used is not sharded, it is recommended to add the
mongo.input.split_size property to the Advanced Hadoop MongoDB properties table. This
parameter determines how the collection is partitioned and read by the Spark executors:
the number of partitions of the input collection is approximately the collection size
divided by the split size.

Without this property, Spark uses the default value of 8 MB for the partition size. For
example, with a split size of 1 MB, Spark dispatches 1 MB to each Spark executor in order
to read the non-sharded collection in parallel; if the collection size is 10 MB, 10
executors are employed.

Set read preference

Select this check box and from the Read preference
drop-down list that is displayed, select the member to which you need to direct the read
operations.

If you leave this check box clear, the Job uses the default Read preference, that is to
say, uses the primary member in a replica set.

For further information, see MongoDB’s documentation about Replication and its Read
preferences.

Query

Specify the query statement to select documents from the collection specified in the
Collection field. For example, type in "{'id':'4'}" to retrieve the record whose
id is 4 from the collection.

The default query, {} within double quotation marks provided with this component,
selects all of the documents. You can also apply a regular expression, for example
{'filename':{'$regex':'REGEX_PATTERN'}}, to define the values (here, file names) to be
matched.

Unlike the query statements required in the MongoDB client software, the query here
refers only to the contents inside find(). For example, the query
{'filename':{'$regex':'REGEX_PATTERN'}} here is the equivalent of
db.blog.find({filename:{$regex:REGEX_PATTERN}}) in the MongoDB client.

Mapping

Each column of the schema defined for this component represents a field of the documents
to be read. In this table, you need to specify the parent nodes of these fields, if
any.

For example, in a document where the first and last fields are nested under a
person node while the _id field sits at the root, the first and last fields
have person as their parent node and the _id field has none. Once completed,
the Mapping table therefore lists person as the parent node path for first and
last and leaves the parent node path of _id empty.

Limit

Enter the maximum number of records to be retrieved.

Advanced settings

Advanced Hadoop MongoDB properties

Add properties to define extra operations you need tMongoDBInput to perform when reading data.

The available properties are listed and explained in MongoDB Connector for Hadoop.

If the collection to be used is not sharded, it is recommended to add the
mongo.input.split_size property to the Advanced Hadoop MongoDB properties table. This
parameter determines how the collection is partitioned and read by the Spark executors:
the number of partitions of the input collection is approximately the collection size
divided by the split size.

Without this property, Spark uses the default value of 8 MB for the partition size. For
example, with a split size of 1 MB, Spark dispatches 1 MB to each Spark executor in order
to read the non-sharded collection in parallel; if the collection size is 10 MB, 10
executors are employed.

Usage

Usage rule

This component is used to provide lookup data, when the size of the lookup data fits the
amount of memory allocated for the execution of the Job. It is executed once to read data
from MongoDB and store the data in memory so that the micro-batches from the main flow can
easily access the data. If the lookup data is too large to be stored in memory, it is
recommended to use tMongoDBLookupInput instead, which loads only the data
matching the lookup join key.

This component is used as a start component and requires an output link.

This component should use a tMongoDBConfiguration component present in the same Job to connect
to a MongoDB database. You need to drop a tMongoDBConfiguration component alongside this component and
configure the Basic settings of this component
to use tMongoDBConfiguration.

This component, along with the Spark Streaming component Palette it belongs to, appears
only when you are creating a Spark Streaming Job.

Note that in this documentation, unless otherwise explicitly stated, a scenario presents
only Standard Jobs, that is to say traditional
Talend
data
integration Jobs.

Spark Connection

In the Spark Configuration tab in the Run view, define the connection to a given Spark
cluster for the whole Job. In addition, since the Job expects its dependent jar files for
execution, you must specify the directory in the file system to which these jar files are
transferred so that Spark can access these files:

  • Yarn mode (Yarn client or Yarn cluster):

    • When using Google Dataproc, specify a bucket in the
      Google Storage staging bucket
      field in the Spark configuration
      tab.

    • When using HDInsight, specify the blob to be used for Job
      deployment in the Windows Azure Storage
      configuration
      area in the Spark
      configuration
      tab.

    • When using Altus, specify the S3 bucket or the Azure
      Data Lake Storage for Job deployment in the Spark
      configuration
      tab.
    • When using Qubole, add a
      tS3Configuration to your Job to write
      your actual business data in the S3 system with Qubole. Without
      tS3Configuration, this business data is
      written in the Qubole HDFS system and destroyed once you shut
      down your cluster.
    • When using on-premise
      distributions, use the configuration component corresponding
      to the file system your cluster is using. Typically, this
      system is HDFS and so use tHDFSConfiguration.

  • Standalone mode: use the
    configuration component corresponding to the file system your cluster is
    using, such as tHDFSConfiguration or
    tS3Configuration.

    If you are using Databricks without any configuration component present
    in your Job, your business data is written directly in DBFS (Databricks
    Filesystem).

This connection is effective on a per-Job basis.

Related scenarios

No scenario is available for the Spark Streaming version of this component
yet.


Document retrieved from the Talend Help Center: https://help.talend.com