July 30, 2023

tMongoDBLookupInput – Docs for ESB 7.x

tMongoDBLookupInput

Executes a database query with a strictly defined order which must correspond to
the schema definition.

It passes on the extracted data to tMap in order to
provide the lookup data to the main flow. It must be directly connected to a tMap component and requires this tMap to use Reload at each row or Reload at each row (cache) for the lookup flow.

tMongoDBLookupInput properties for Apache Spark Streaming

These properties are used to configure tMongoDBLookupInput running in the Spark Streaming Job framework.

The Spark Streaming
tMongoDBLookupInput component belongs to the Databases family.

The component in this framework is available in Talend Real Time Big Data Platform and in Talend Data Fabric.

Basic settings

Property type

Either Built-In or Repository.

Built-In: No property data stored centrally.

Repository: Select the repository file where the
properties are stored.

MongoDB configuration

Select this check box and in the Component List click the relevant connection component to
reuse the connection details you already defined.

Schema and Edit
Schema

A schema is a row description. It defines the number of fields
(columns) to be processed and passed on to the next component. When you create a Spark
Job, avoid the reserved word line when naming the
fields.

Click Edit
schema
to make changes to the schema. If the current schema is of the Repository type, three options are available:

  • View schema: choose this
    option to view the schema only.

  • Change to built-in property:
    choose this option to change the schema to Built-in for local changes.

  • Update repository connection:
    choose this option to change the schema stored in the repository and decide whether
    to propagate the changes to all the Jobs upon completion. If you just want to
    propagate the changes to the current Job, you can select No upon completion and choose this schema metadata
    again in the Repository Content
    window.

If a column in the database is a JSON document and you need to read
the entire document, put an asterisk (*) in the DB
column
column, without quotation marks around.

Collection

Enter the name of the collection to be used.

A MongoDB collection is the equivalent of an RDBMS table and contains
documents.

Set read preference

Select this check box and from the Read preference
drop-down list that is displayed, select the member to which you need to direct the read
operations.

If you leave this check box clear, the Job uses the default Read preference, that is to
say, uses the primary member in a replica set.

For further information, see MongoDB’s documentation about Replication and its Read
preferences.

Query

Specify the query statement to select documents from the collection specified in the
Collection field.

For
example

In this code, row1 is not the label of the link to
tMongoDBLookupInput, but represents the main row
entering into tMap.

The result of the query must contain only records that match join key you need to use in
tMap. In other words, you must use the schema of the
main flow to tMap to construct the SQL statement here in
order to load only the matched records into the lookup flow.

This approach ensures that no redundant records are loaded into memory and outputted to
the component that follows.

Mapping

Each column of the schema defined for this component represents a field of the documents
to be read. In this table, you need to specify the parent nodes of these fields, if
any.

For example, in the document reading as
follows
The
first and the last
fields have person as their parent node but the _id field does not have any parent node. So once completed, this
Mapping table should read as
follows:

Limit

Enter the maximum number of records to be retrieved.

Advanced settings

No query timeout

Select this check box to prevent MongoDB servers from stopping idle
cursors at the end of 10-minute inactivity of these cursors. In this
situation, an idle cursor will stay open until either the results of
this cursor are exhausted or you manually close it using the
cursor.close() method.

A cursor for MongoDB is a pointer to the result set of a query. By
default, that is to say, with this check box being clear, a MongoDB
server automatically stops idle cursors after a given inactivity period
to avoid excess memory use. For further information about MongoDB
cursors, see https://docs.mongodb.org/manual/core/cursors/.

Usage

Usage rule

This component is used as a start component and requires an output link.

This component should use a tMongoDBConfiguration component present in the same Job to connect
to a MongoDB database. You need to drop a tMongoDBConfiguration component alongside this component and
configure the Basic settings of this component
to use tMongoDBConfiguration.

This component, along with the Spark Streaming component Palette it belongs to, appears
only when you are creating a Spark Streaming Job.

Note that in this documentation, unless otherwise explicitly stated, a scenario presents
only Standard Jobs, that is to say traditional
Talend
data
integration Jobs.

Spark Connection

In the Spark
Configuration
tab in the Run
view, define the connection to a given Spark cluster for the whole Job. In
addition, since the Job expects its dependent jar files for execution, you must
specify the directory in the file system to which these jar files are
transferred so that Spark can access these files:

  • Yarn mode (Yarn client or Yarn cluster):

    • When using Google Dataproc, specify a bucket in the
      Google Storage staging bucket
      field in the Spark configuration
      tab.

    • When using HDInsight, specify the blob to be used for Job
      deployment in the Windows Azure Storage
      configuration
      area in the Spark
      configuration
      tab.

    • When using Altus, specify the S3 bucket or the Azure
      Data Lake Storage for Job deployment in the Spark
      configuration
      tab.
    • When using Qubole, add a
      tS3Configuration to your Job to write
      your actual business data in the S3 system with Qubole. Without
      tS3Configuration, this business data is
      written in the Qubole HDFS system and destroyed once you shut
      down your cluster.
    • When using on-premise
      distributions, use the configuration component corresponding
      to the file system your cluster is using. Typically, this
      system is HDFS and so use tHDFSConfiguration.

  • Standalone mode: use the
    configuration component corresponding to the file system your cluster is
    using, such as tHDFSConfiguration or
    tS3Configuration.

    If you are using Databricks without any configuration component present
    in your Job, your business data is written directly in DBFS (Databricks
    Filesystem).

This connection is effective on a per-Job basis.

Reading and writing data in MongoDB using a Spark Streaming Job

This scenario applies only to Talend Real Time Big Data Platform and Talend Data Fabric.

In this scenario, you create a Spark Streaming Job to extract data about
given movie directors from MongoDB, use this data to filter and complete movie
information and then write the result into a MongoDB collection.

tMongoDBLookupInput_1.png
The sample data about movie directors reads as
follows:

This data contains the names of these directors and the ID numbers
distributed to them.

The structure of this data in MongoDB reads as
follows:

Note that the sample data is created for demonstration purposes only.

tHDFSConfiguration is used in this scenario by Spark to connect
to the HDFS system where the jar files dependent on the Job are transferred.

In the Spark
Configuration
tab in the Run
view, define the connection to a given Spark cluster for the whole Job. In
addition, since the Job expects its dependent jar files for execution, you must
specify the directory in the file system to which these jar files are
transferred so that Spark can access these files:

  • Yarn mode (Yarn client or Yarn cluster):

    • When using Google Dataproc, specify a bucket in the
      Google Storage staging bucket
      field in the Spark configuration
      tab.

    • When using HDInsight, specify the blob to be used for Job
      deployment in the Windows Azure Storage
      configuration
      area in the Spark
      configuration
      tab.

    • When using Altus, specify the S3 bucket or the Azure
      Data Lake Storage for Job deployment in the Spark
      configuration
      tab.
    • When using Qubole, add a
      tS3Configuration to your Job to write
      your actual business data in the S3 system with Qubole. Without
      tS3Configuration, this business data is
      written in the Qubole HDFS system and destroyed once you shut
      down your cluster.
    • When using on-premise
      distributions, use the configuration component corresponding
      to the file system your cluster is using. Typically, this
      system is HDFS and so use tHDFSConfiguration.

  • Standalone mode: use the
    configuration component corresponding to the file system your cluster is
    using, such as tHDFSConfiguration or
    tS3Configuration.

    If you are using Databricks without any configuration component present
    in your Job, your business data is written directly in DBFS (Databricks
    Filesystem).

Prerequisites:

  • The Spark cluster and the MongoDB database to be used have
    been properly installed and are running.

  • The above-mentioned data has been loaded in the MongoDB
    collection to be used.

To replicate this scenario, proceed as follows:

Linking the components

  1. In the
    Integration
    perspective of the Studio, create an empty
    Spark Batch Job from the Job Designs node in
    the Repository tree view.

    For further information about how to create a Spark Streaming Job, see

    Talend Open Studio for Big Data Getting Started Guide
    .
  2. In the workspace, enter the name of the component to be used and select this
    component from the list that appears. In this scenario, the components are
    tHDFSConfiguration, tMongoDBConfiguration, tFixedFlowInput, tMongoDBOutput,
    tMongoDBLookupInput, tMap and tLogRow.

    The tFixedFlowInput components are used to
    load the data about movies into the data flow. In the real-world practice, you
    can use other components such as tFileInputDelimited instead to design a sophisticated process to
    prepare your data to be processed.
  3. Connect tFixedFlowInput to tMap using the Row >
    Main
    link.

    This way, the main flow to tMap is created.
    The movie information is sent via this flow.
  4. Connect tMongoDBLookupInput to tMap using the Row >
    Main
    link.

    This way, the lookup flow to tMap is created.
    The movie director information is sent via this flow.
  5. Connect tMap to tMongoDBOutput using the Row > Main
    link and name this connection in the dialog box that is displayed. For example,
    name it to out1.
  6. Do the same to connect tMap to tLogRow and name this connection to reject.
  7. Leave tHDFSConfiguration and tMongoDBConfiguration alone without any
    connection.

Selecting the Spark mode

Depending on the Spark cluster to be used, select a Spark mode for your Job.

The Spark documentation provides an exhaustive list of Spark properties and
their default values at Spark Configuration. A Spark Job designed in the Studio uses
this default configuration except for the properties you explicitly defined in the
Spark Configuration tab or the components
used in your Job.

  1. Click Run to open its view and then click the
    Spark Configuration tab to display its view
    for configuring the Spark connection.
  2. Select the Use local mode check box to test your Job locally.

    In the local mode, the Studio builds the Spark environment in itself on the fly in order to
    run the Job in. Each processor of the local machine is used as a Spark
    worker to perform the computations.

    In this mode, your local file system is used; therefore, deactivate the
    configuration components such as tS3Configuration or
    tHDFSConfiguration that provides connection
    information to a remote file system, if you have placed these components
    in your Job.

    You can launch
    your Job without any further configuration.

  3. Clear the Use local mode check box to display the
    list of the available Hadoop distributions and from this list, select
    the distribution corresponding to your Spark cluster to be used.

    This distribution could be:

    • Databricks

    • Qubole

    • Amazon EMR

      For this distribution, Talend supports:

      • Yarn client

      • Yarn cluster

    • Cloudera

      For this distribution, Talend supports:

      • Standalone

      • Yarn client

      • Yarn cluster

    • Google Cloud
      Dataproc

      For this distribution, Talend supports:

      • Yarn client

    • Hortonworks

      For this distribution, Talend supports:

      • Yarn client

      • Yarn cluster

    • MapR

      For this distribution, Talend supports:

      • Standalone

      • Yarn client

      • Yarn cluster

    • Microsoft HD
      Insight

      For this distribution, Talend supports:

      • Yarn cluster

    • Cloudera Altus

      For this distribution, Talend supports:

      • Yarn cluster

        Your Altus cluster should run on the following Cloud
        providers:

        • Azure

          The support for Altus on Azure is a technical
          preview feature.

        • AWS

    As a Job relies on Avro to move data among its components, it is recommended to set your
    cluster to use Kryo to handle the Avro types. This not only helps avoid
    this Avro known issue but also
    brings inherent preformance gains. The Spark property to be set in your
    cluster is:

    If you cannot find the distribution corresponding to yours from this
    drop-down list, this means the distribution you want to connect to is not officially
    supported by
    Talend
    . In this situation, you can select Custom, then select the Spark
    version
    of the cluster to be connected and click the
    [+] button to display the dialog box in which you can
    alternatively:

    1. Select Import from existing
      version
      to import an officially supported distribution as base
      and then add other required jar files which the base distribution does not
      provide.

    2. Select Import from zip to
      import the configuration zip for the custom distribution to be used. This zip
      file should contain the libraries of the different Hadoop/Spark elements and the
      index file of these libraries.

      In
      Talend

      Exchange, members of
      Talend
      community have shared some ready-for-use configuration zip files
      which you can download from this Hadoop configuration
      list and directly use them in your connection accordingly. However, because of
      the ongoing evolution of the different Hadoop-related projects, you might not be
      able to find the configuration zip corresponding to your distribution from this
      list; then it is recommended to use the Import from
      existing version
      option to take an existing distribution as base
      to add the jars required by your distribution.

      Note that custom versions are not officially supported by

      Talend
      .
      Talend
      and its community provide you with the opportunity to connect to
      custom versions from the Studio but cannot guarantee that the configuration of
      whichever version you choose will be easy. As such, you should only attempt to
      set up such a connection if you have sufficient Hadoop and Spark experience to
      handle any issues on your own.

    For a step-by-step example about how to connect to a custom
    distribution and share this connection, see Hortonworks.

Configuring a Spark stream for your Apache Spark streaming Job

Define how often your Spark Job creates and processes micro batches.
  1. In the Batch size
    field, enter the time interval at the end of which the Job reviews the source
    data to identify changes and processes the new micro batches.
  2. If needs be, select the Define a streaming
    timeout
    check box and in the field that is displayed, enter the
    time frame at the end of which the streaming Job automatically stops
    running.

Configuring the connection to the file system to be used by Spark

Skip this section if you are using Google Dataproc or HDInsight, as for these two
distributions, this connection is configured in the Spark
configuration
tab.

  1. Double-click tHDFSConfiguration to open its Component view.

    Spark uses this component to connect to the HDFS system to which the jar
    files dependent on the Job are transferred.

  2. If you have defined the HDFS connection metadata under the Hadoop
    cluster
    node in Repository, select
    Repository from the Property
    type
    drop-down list and then click the
    […] button to select the HDFS connection you have
    defined from the Repository content wizard.

    For further information about setting up a reusable
    HDFS connection, search for centralizing HDFS metadata on Talend Help Center
    (https://help.talend.com).

    If you complete this step, you can skip the following steps about configuring
    tHDFSConfiguration because all the required fields
    should have been filled automatically.

  3. In the Version area, select
    the Hadoop distribution you need to connect to and its version.
  4. In the NameNode URI field,
    enter the location of the machine hosting the NameNode service of the cluster.
    If you are using WebHDFS, the location should be
    webhdfs://masternode:portnumber; WebHDFS with SSL is not
    supported yet.
  5. In the Username field, enter
    the authentication information used to connect to the HDFS system to be used.
    Note that the user name must be the same as you have put in the Spark configuration tab.

Configuring the connection to the MongoDB database to be used by Spark

  1. Double-click tMongoDBConfiguration to open its
    Component view.


    tMongoDBLookupInput_2.png
  2. From the DB Version list, select the version
    of the MongoDB database to be used.
  3. In the Server field and the Port field, enter corresponding information of the
    MongoDB database.
  4. In the Database field, enter the name of the
    database. This database must already exist.

Loading the movie data

  1. Double-click the tFixedFlowIput component to
    open its Component view.

    tMongoDBLookupInput_3.png

  2. Click the […] button next to Edit schema to open the schema editor.
  3. Click the [+] button to add the schema
    columns as shown in this image.

    tMongoDBLookupInput_4.png

  4. Click OK to validate these changes and accept
    the propagation prompted by the pop-up dialog box.
  5. In the Input repetition interval field, enter
    the time interval at the end of which tFixedFlowInput sends the movie data another time. This allows you
    to generate a stream of data.
  6. In the Mode area, select the Use Inline Content radio button and paste the following
    data into the Content field that is
    displayed.
    691;Dark City;1998;http://us.imdb.com/M/title-exact?imdb-title-118929;4
    1654;Chairman of the Board;1998;http://us.imdb.com/Title?Chairman+of+the+Board+(1998);6
    903;Afterglow;1997;http://us.imdb.com/M/title-exact?imdb-title-118566;3
    255;My Best Friend's Wedding;1997;http://us.imdb.com/M/title-exact?My+Best+Friend%27s+Wedding+(1997);2
    1538;All Over Me;1997;http://us.imdb.com/M/title-exact?All%20Over%20Me%20%281997%29;5

  7. In the Field separator field, enter a
    semicolon (;).

Extracting director data from MongoDB

  1. Double-click tMongoDBLookupInput to open its
    Component view.

    tMongoDBLookupInput_5.png

  2. Click the […] button next to Edit schema to open the schema editor.
  3. Click the [+] button to add the schema
    columns as shown in this image.

    tMongoDBLookupInput_6.png

  4. In the Collection field, enter the name of
    the collection from which tMongoDBLookupInput
    extracts data.
  5. In the Query field, enter the following
    query."{'person.id':" + row2.directorID +"}"

    In this statement, row2 represents the main
    flow to tMap and
    row2.directorID
    the directorID
    column of this flow. You need to adapt this row2 to the label of the main flow link in your Job.
    The whole statement means to select every record in which the id field within the person field has the same value as this directorID column.
    The example above shows how to use the schema of the main flow to construct
    the SQL statement to load only the matched records into the lookup flow. This
    approach ensures that no redundant records are stored in memory before being
    sent to tMap.
  6. In the Mapping table, the id and the name
    columns have been automatically added. Enter, within double quotation marks,
    person in the Parent
    node path
    column for each row.

    This table defines how the hierarchical construct of the data from MongoDB
    should be interpreted in order to fit the schema of tMongoDBLookupInput.

Configuring the transformation in tMap

  1. Double-click tMap to open its
    Map Editor view.

Creating the output schema

  1. On the input side (left side) of the Map
    Editor
    , each of the two tables represents one of the input
    flow, the upper one for the main flow and the lower one for the lookup
    flow.

    On the output side (right side), the two tables represent the output flows
    that you named as out1 and reject previously.
    From the main flow table, drop the movieID,
    the title, the release and the url columns onto
    each of the output flow table.
  2. Drop as well the directorID column from the
    main flow table to the reject output
    table.
  3. From the lookup flow, drop the name column
    onto each of the output flow table.

    Then from the Schema editor view, you can see
    the schemas of the both sides have been completed.

Setting the mapping conditions

  1. From the main flow table, drop the directorID column onto the lookup table, in the Expr. key column of the id row.

    This defines the column used to provide join keys.
  2. On the lookup flow table, click the

    tMongoDBLookupInput_7.png

    button to open the setting panel in this table.

  3. Click the Value column of the Lookup model row to display the […] button and click this button to open the Options window.
  4. Select Reload at each row and click OK to validate this choice.
  5. Do the same in the Join model row to display
    the corresponding Options window.
  6. Select Inner Join to ensure that only the
    matched records between the main flow and the lookup flow are outputted.
  7. On the reject output flow table, click
    the

    tMongoDBLookupInput_7.png

    button to open the setting panel.

  8. In the Catch lookup inner join reject row,
    click the Value column to display the […] button and click this button to open the Options window.
  9. Select true to send the records filtered out
    by the inner join into the reject flow and
    click OK to validate this choice.
  10. Click Apply, then click OK to validate these changes and accept the
    propagation prompted by the pop-up dialog box.

Writing processed data to MongoDB

  1. Double-click tMongoDBOutput to open its
    Component view.

    tMongoDBLookupInput_9.png

  2. If this component does not have the same schema of the preceding
    component, a warning icon appears. In this situation, click the Sync columns button to retrieve the schema from the
    preceding one and once done, the warning icon disappears.
  3. In the Collection field, enter the name of
    the collection to which you need to write data. If this collection does not
    exist, it will be automatically created at runtime.
  4. From the Action on data list, select the
    operation to be performed on the data. In this example, select Insert, which creates documents in MongoDB whether these
    documents already exist or not and in either case, generates a new technical ID
    for each of the new documents.
  5. Leave the Mapping table as is. This adds each
    record to the root of each document.

Writing rejected data to tLogRow

  1. Double-click tLogRow to open its
    Component view.

    tMongoDBLookupInput_10.png

  2. If this component does not have the same schema of the preceding
    component, a warning icon appears. In this situation, click the Sync columns button to retrieve the schema from the
    preceding one and once done, the warning icon disappears.
  3. Select the Table radio button to present the
    result in a table.

Executing the Job

Then you can press F6 to run this Job.

Once done, in the console of the Run view, you can
see the data rejected by inner join.

tMongoDBLookupInput_11.png

This data is displayed for several times because tFixedFlowInput has created a data stream by regularly sending out the same
records.

Note that you can manage the level of the execution information to be outputted in
this console by selecting the log4jLevel check box in
the Advanced settings tab and then selecting the level
of the information you want to display.

For more information on the log4j logging levels, see the Apache documentation at http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/Level.html.

In the default MongoDB database, you can check the
documents that have been created in the movie
collection.

The movie information now contains the names instead of the IDs of their directors and
the same records have been written several times in the collection but their technical
IDs (the _id field) are all distinct.


Document get from Talend https://help.talend.com
Thank you for watching.
Subscribe
Notify of
guest
0 Comments
Inline Feedbacks
View all comments
0
Would love your thoughts, please comment.x
()
x