tMongoDBLookupInput – Docs for ESB 6.x

tMongoDBLookupInput

Executes a database query with a strictly defined order that must correspond to
the schema definition.

tMongoDBLookupInput reads a database and extracts fields based on a query.

tMongoDBLookupInput properties for Apache Spark Streaming

These properties are used to configure tMongoDBLookupInput running in the Spark Streaming Job framework.

The Spark Streaming tMongoDBLookupInput component belongs to the Databases
family.

The component in this framework is available only if you have subscribed to
Talend Real-Time Big Data Platform or Talend Data Fabric.

Basic settings

Property type

Either Built-In or Repository.

Built-In: No property data stored centrally.

Repository: Select the repository file where the
properties are stored.

MongoDB configuration

Select this check box and, in the Component List, click the relevant connection
component to reuse the connection details you already defined.

Schema and Edit schema

A schema is a row description. It defines the number of fields (columns) to
be processed and passed on to the next component. The schema is either Built-In or stored remotely in the Repository.

Click Edit schema to make changes to the schema.
If the current schema is of the Repository type, three
options are available:

  • View schema: choose this option to view the
    schema only.

  • Change to built-in property: choose this
    option to change the schema to Built-in for
    local changes.

  • Update repository connection: choose this
    option to change the schema stored in the repository and decide whether to propagate
    the changes to all the Jobs upon completion. If you just want to propagate the
    changes to the current Job, you can select No
    upon completion and choose this schema metadata again in the [Repository Content] window.

If a column in the database is a JSON document and you need to read the entire
document, enter an asterisk (*) in the DB column column, without quotation
marks.
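For instance, a minimal sketch (the doc column name is an assumption): with a
schema column named doc whose DB column is set to *, a document such as

    {
      "_id": ObjectId("..."),
      "person": { "first": "John", "last": "Smith" }
    }

is returned whole, as a single JSON string, in the doc column.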

Collection

Enter the name of the collection to be used.

A MongoDB collection is the equivalent of an RDBMS table and contains
documents.

Set read preference

Select this check box and from the Read preference
drop-down list that is displayed, select the member to which you need to direct the read
operations.

If you leave this check box clear, the Job uses the default read preference,
that is, it reads from the primary member of a replica set.
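For reference, MongoDB defines five read preference modes: primary,
primaryPreferred, secondary, secondaryPreferred and nearest; the drop-down list
presumably offers these modes.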

For further information, see MongoDB’s documentation about Replication and its Read
preferences.

Query

Specify the query statement to select documents from the collection specified in the
Collection field.

For example, assuming the main flow link into tMap carries a directorID column
(the column name is an assumption; the scenario later on this page uses a
similar query):
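    "{'id':" + row1.directorID + "}"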

In this query, row1 is not the label of the link to tMongoDBLookupInput, but
represents the main row entering tMap.

The result of the query must contain only records that match the join key you
need to use in tMap. In other words, you must use the schema of the main flow
to tMap to construct the query statement here, in order to load only the
matched records into the lookup flow.

This approach ensures that no redundant records are loaded into memory and
output to the component that follows.

Mapping

Each column of the schema defined for this component represents a field of the documents
to be read. In this table, you need to specify the parent nodes of these fields, if
any.

For example, in a document in which the first and the last fields have person
as their parent node while the _id field has no parent node, the Mapping table,
once completed, should read as sketched below.
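A document of this shape might read (the field values are illustrative):

    {
      "_id": ObjectId("..."),
      "person": {
        "first": "John",
        "last": "Smith"
      }
    }

With this structure, the completed Mapping table reads:

    Column | Parent node path
    _id    |
    first  | "person"
    last   | "person"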

Limit

Enter the maximum number of records to be retrieved.

Advanced settings

No query timeout

Select this check box to prevent MongoDB servers from stopping idle
cursors at the end of 10-minute inactivity of these cursors. In this
situation, an idle cursor will stay open until either the results of
this cursor are exhausted or you manually close it using the
cursor.close() method.

A cursor for MongoDB is a pointer to the result set of a query. By default,
that is, when this check box is clear, a MongoDB server automatically stops
idle cursors after a given period of inactivity to avoid excess memory use.
For further information about MongoDB cursors, see
https://docs.mongodb.org/manual/core/cursors/.
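For illustration, a minimal sketch of the same behavior with the MongoDB Java
driver (the connection string, database and collection names are assumptions
for this sketch):

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.MongoCursor;
    import org.bson.Document;

    public class NoTimeoutExample {
        public static void main(String[] args) {
            // Connection details are placeholders for this sketch.
            try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
                MongoCollection<Document> collection =
                        client.getDatabase("talend").getCollection("directors");

                // noCursorTimeout(true) corresponds to the "No query timeout"
                // option: the server no longer closes this cursor after its
                // default 10-minute idle period.
                try (MongoCursor<Document> cursor =
                             collection.find().noCursorTimeout(true).iterator()) {
                    while (cursor.hasNext()) {
                        System.out.println(cursor.next().toJson());
                    }
                } // closing the cursor here is the equivalent of cursor.close()
            }
        }
    }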

Usage

Usage rule

This component is used as a start component and requires an output link.

This component should use a tMongoDBConfiguration component present in the same Job to connect
to a MongoDB database. You need to drop a tMongoDBConfiguration component alongside this component and
configure the Basic settings of this component
to use tMongoDBConfiguration.

This component, along with the Spark Streaming component Palette it belongs to, appears
only when you are creating a Spark Streaming Job.

Note that in this documentation, unless otherwise explicitly stated, a scenario
presents only Standard Jobs, that is to say traditional Talend data integration
Jobs.

Spark Connection

You need to use the Spark Configuration tab in
the Run view to define the connection to a given
Spark cluster for the whole Job. In addition, since the Job expects its dependent jar
files for execution, you must specify the directory in the file system to which these
jar files are transferred so that Spark can access these files:

  • Yarn mode: when using Google Dataproc, specify a bucket in the
    Google Storage staging bucket field in the Spark configuration tab;
    when using other distributions, use a tHDFSConfiguration component to
    specify the directory.

  • Standalone mode: you need to choose the configuration component
    depending on the file system you are using, such as tHDFSConfiguration
    or tS3Configuration.

This connection is effective on a per-Job basis.

Reading and writing data in MongoDB using a Spark Streaming Job

This scenario applies only to Talend Real-Time Big Data Platform or Talend Data Fabric.

In this scenario, you create a Spark Streaming Job to extract data about given movie
directors from MongoDB, use this data to filter and complete movie information and then
write the result into a MongoDB collection.

use_case-mongodblookupinput-spark_stream1.png
The sample data about movie directors contains the names of these directors
and the ID numbers assigned to them.

The structure of this data in MongoDB reads as follows (a representative
sketch; the field values are illustrative):
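    {
      "_id": ObjectId("..."),
      "person": {
        "id": 4,
        "name": "John Smith"
      }
    }

The person field nests the id and name fields, which is why the scenario later
queries person.id and declares person as the parent node of the extracted
columns.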

Note that the sample data is created for demonstration purposes only.

Prerequisites:

  • The Spark cluster and the MongoDB database to be used have been properly
    installed and are running.

  • The above-mentioned data has been loaded in the MongoDB collection to be
    used.

To replicate this scenario, proceed as follows:

Linking the components

  1. In the Integration perspective of the Studio, create an empty
    Spark Streaming Job from the Job Designs node in
    the Repository tree view.

    For further information about how to create a Spark Streaming Job, see
    Talend Open Studio for Big Data Getting Started Guide.
  2. In the workspace, enter the name of the component to be used and select this
    component from the list that appears. In this scenario, the components are
    tHDFSConfiguration, tMongoDBConfiguration, tFixedFlowInput, tMongoDBOutput,
    tMongoDBLookupInput, tMap and tLogRow.

    The tFixedFlowInput component is used to load the data about movies into
    the data flow. In real-world practice, you can use other components, such
    as tFileInputDelimited, instead to design a sophisticated process to
    prepare your data to be processed.
  3. Connect tFixedFlowInput to tMap using the Row > Main link.

    This way, the main flow to tMap is created.
    The movie information is sent via this flow.
  4. Connect tMongoDBLookupInput to tMap using the Row > Main link.

    This way, the lookup flow to tMap is created.
    The movie director information is sent via this flow.
  5. Connect tMap to tMongoDBOutput using the Row > Main link and name this
    connection in the dialog box that is displayed. For example, name it
    out1.
  6. Do the same to connect tMap to tLogRow and name this connection
    reject.
  7. Leave tHDFSConfiguration and tMongoDBConfiguration alone without any
    connection.

Selecting the Spark mode

Depending on the Spark cluster to be used, select a Spark mode for your Job.
  1. Click Run to open its view and then click the
    Spark Configuration tab to display its view
    for configuring the Spark connection.
  2. Select the Use local mode check box to test your Job locally.

    In the local mode, the Studio builds the Spark environment in itself on
    the fly in order to run the Job. Each processor of the local machine is
    used as a Spark worker to perform the computations.

    In this mode, your local file system is used; therefore, deactivate the
    configuration components, such as tS3Configuration or tHDFSConfiguration,
    that provide connection information to a remote file system, if you have
    placed these components in your Job.

    You can launch your Job without any further configuration.

  3. Clear the Use local mode check box to display the
    list of the available Hadoop distributions and from this list, select
    the distribution corresponding to your Spark cluster to be used.

    If you cannot find the distribution corresponding to yours in this
    drop-down list, the distribution you want to connect to is not officially
    supported by Talend. In this situation, you can select Custom, then
    select the Spark version of the cluster to be connected, and click the
    [+] button to display the dialog box in which you can alternatively:

    1. Select Import from existing version to import an officially supported
      distribution as base and then add the other required jar files that the
      base distribution does not provide.

    2. Select Import from zip to import the configuration zip for the custom
      distribution to be used. This zip file should contain the libraries of
      the different Hadoop/Spark elements and the index file of these
      libraries.

      In Talend Exchange, members of the Talend community have shared
      ready-for-use configuration zip files, which you can download from this
      Hadoop configuration list and use directly in your connection. However,
      because of the ongoing evolution of the different Hadoop-related
      projects, you might not be able to find the configuration zip
      corresponding to your distribution in this list; in that case, it is
      recommended to use the Import from existing version option to take an
      existing distribution as base and add the jars required by your
      distribution.

      Note that custom versions are not officially supported by Talend.
      Talend and its community provide you with the opportunity to connect to
      custom versions from the Studio but cannot guarantee that the
      configuration of whichever version you choose will be easy. As such,
      you should only attempt to set up such a connection if you have
      sufficient Hadoop and Spark experience to handle any issues on your
      own.

    For a step-by-step example about how to connect to a custom
    distribution and share this connection, see Connecting to a custom Hadoop distribution.

Configuring a Spark stream for your Apache Spark streaming Job

Define how often your Spark Job creates and processes micro batches.
  1. In the Batch size field, enter the time
    interval at the end of which the Job reviews the source data to identify changes and
    processes the new micro batches.
  2. If need be, select the Define a streaming timeout check box and, in
    the field that is displayed, enter the time frame at the end of which
    the streaming Job automatically stops running.

Configuring the connection to the file system to be used by Spark

  1. Double-click tHDFSConfiguration to open its
    Component view. Note that tHDFSConfiguration is used because the Spark Yarn client mode is used to run Spark Jobs in this scenario.

    Spark uses this component to connect to the HDFS system to which the jar
    files dependent on the Job are transferred.

  2. In the Version area, select the Hadoop distribution
    you need to connect to and its version.
  3. In the NameNode URI field, enter the location of the
    machine hosting the NameNode service of the cluster. If you are using WebHDFS, the location should be
    webhdfs://masternode:portnumber; if this WebHDFS is secured
    with SSL, the scheme should be swebhdfs and you need to use
    a tLibraryLoad in the Job to load the library required by
    the secured WebHDFS.
  4. In the Username field, enter the authentication information used to
    connect to the HDFS system to be used. Note that the user name must be
    the same as the one you entered in the Spark configuration tab.

Configuring the connection to the MongoDB database to be used by Spark

  1. Double-click tMongoDBConfiguration to open its
    Component view.


    use_case-mongodbinput_spark_batch2.png
  2. From the DB Version list, select the version
    of the MongoDB database to be used.
  3. In the Server field and the Port field, enter the corresponding
    connection information of the MongoDB database.
  4. In the Database field, enter the name of the
    database. This database must already exist.

Loading the movie data

  1. Double-click the tFixedFlowInput component to
    open its Component view.

    use_case-mongodblookupinput-spark_stream2.png

  2. Click the […] button next to Edit schema to open the schema editor.
  3. Click the [+] button to add the schema
    columns as shown in this image.

    use_case-mongodblookupinput-spark_stream3.png
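    Based on the movie data used in this scenario, these columns are
    presumably movieID, title, release, url and directorID, the directorID
    column carrying the join key used later in tMap.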

  4. Click OK to validate these changes and accept
    the propagation prompted by the pop-up dialog box.
  5. In the Input repetition interval field, enter the time interval at the
    end of which tFixedFlowInput sends the movie data again. This allows
    you to generate a stream of data.
  6. In the Mode area, select the Use Inline Content radio button and paste the following
    data into the Content field that is
    displayed.
    691;Dark City;1998;http://us.imdb.com/M/title-exact?imdb-title-118929;4
    1654;Chairman of the Board;1998;http://us.imdb.com/Title?Chairman+of+the+Board+(1998);6
    903;Afterglow;1997;http://us.imdb.com/M/title-exact?imdb-title-118566;3
    255;My Best Friend's Wedding;1997;http://us.imdb.com/M/title-exact?My+Best+Friend%27s+Wedding+(1997);2
    1538;All Over Me;1997;http://us.imdb.com/M/title-exact?All%20Over%20Me%20%281997%29;5

  7. In the Field separator field, enter a
    semicolon (;).

Extracting director data from MongoDB

  1. Double-click tMongoDBLookupInput to open its
    Component view.

    use_case-mongodblookupinput-spark_stream4.png

  2. Click the […] button next to Edit schema to open the schema editor.
  3. Click the [+] button to add the schema
    columns as shown in this image.

    use_case-mongodblookupinput-spark_stream5.png
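    Based on the Mapping table configured later in this procedure, these
    columns are presumably id and name.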

  4. In the Collection field, enter the name of
    the collection from which tMongoDBLookupInput
    extracts data.
  5. In the Query field, enter the following query:

    "{'person.id':" + row2.directorID + "}"

    In this statement, row2 represents the main flow to tMap and
    row2.directorID the directorID column of this flow. You need to adapt
    row2 to the label of the main flow link in your Job. The whole statement
    selects every record in which the id field within the person field has
    the same value as this directorID column; for instance, for a movie
    record whose directorID is 4, the resolved query is {'person.id':4}.
    The example above shows how to use the schema of the main flow to
    construct the query statement so that only the matched records are
    loaded into the lookup flow. This approach ensures that no redundant
    records are stored in memory before being sent to tMap.
  6. In the Mapping table, the id and the name
    columns have been automatically added. Enter, within double quotation marks,
    person in the Parent
    node path
    column for each row.

    This table defines how the hierarchical construct of the data from MongoDB
    should be interpreted in order to fit the schema of tMongoDBLookupInput.
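    Once completed, the Mapping table reads, in sketch form:

        Column | Parent node path
        id     | "person"
        name   | "person"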

Configuring the transformation in tMap

  1. Double-click tMap to open its
    Map Editor view.

Creating the output schema

  1. On the input side (left side) of the Map Editor, each of the two
    tables represents one of the input flows, the upper one for the main
    flow and the lower one for the lookup flow.

    On the output side (right side), the two tables represent the output
    flows that you named out1 and reject previously. From the main flow
    table, drop the movieID, title, release and url columns onto each of
    the output flow tables.
  2. Drop as well the directorID column from the main flow table onto the
    reject output table.
  3. From the lookup flow, drop the name column onto each of the output
    flow tables.

    Then, in the Schema editor view, you can see that the schemas of both
    sides have been completed.

Setting the mapping conditions

  1. From the main flow table, drop the directorID column onto the lookup table, in the Expr. key column of the id row.

    This defines the column used to provide join keys.
  2. On the lookup flow table, click the

    Map_setting.png

    button to open the setting panel in this table.

  3. Click the Value column of the Lookup model row to display the […] button and click this button to open the [Options] window.
  4. Select Reload at each row and click OK to validate this choice.
  5. Do the same in the Join model row to display
    the corresponding [Options] window.
  6. Select Inner Join to ensure that only the
    matched records between the main flow and the lookup flow are outputted.
  7. On the reject output flow table, click
    the

    Map_setting.png

    button to open the setting panel.

  8. In the Catch lookup inner join reject row,
    click the Value column to display the […] button and click this button to open the [Options] window.
  9. Select true to send the records filtered out
    by the inner join into the reject flow and
    click OK to validate this choice.
  10. Click Apply, then click OK to validate these changes and accept the
    propagation prompted by the pop-up dialog box.

Writing processed data to MongoDB

  1. Double-click tMongoDBOutput to open its
    Component view.

    use_case-mongodblookupinput-spark_stream7.png

  2. If this component does not have the same schema as the preceding
    component, a warning icon appears. In this situation, click the Sync
    columns button to retrieve the schema from the preceding component;
    once done, the warning icon disappears.
  3. In the Collection field, enter the name of
    the collection to which you need to write data. If this collection does not
    exist, it will be automatically created at runtime.
  4. From the Action on data list, select the
    operation to be performed on the data. In this example, select Insert, which creates documents in MongoDB whether these
    documents already exist or not and in either case, generates a new technical ID
    for each of the new documents.
  5. Leave the Mapping table as is. This adds each
    record to the root of each document.
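    With these settings, each record is written to the root of a new
    document. For the first sample movie, the created document presumably
    looks as follows (the name value depends on the director data you
    loaded):

        {
          "_id": ObjectId("..."),
          "movieID": 691,
          "title": "Dark City",
          "release": 1998,
          "url": "http://us.imdb.com/M/title-exact?imdb-title-118929",
          "name": "..."
        }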

Writing rejected data to tLogRow

  1. Double-click tLogRow to open its
    Component view.

    use_case-mongodblookupinput-spark_stream8.png

  2. If this component does not have the same schema as the preceding
    component, a warning icon appears. In this situation, click the Sync
    columns button to retrieve the schema from the preceding component;
    once done, the warning icon disappears.
  3. Select the Table radio button to present the
    result in a table.

Executing the Job

Then you can press F6 to run this Job.

Once done, in the console of the Run view, you can
see the data rejected by inner join.

use_case-mongodblookupinput-spark_stream9.png

This data is displayed several times because tFixedFlowInput has created a data
stream by regularly sending out the same records.

Note that you can manage the level of the execution information output in this
console by selecting the log4jLevel check box in the Advanced settings tab and
then selecting the level of the information you want to display.

For more information on the log4j logging levels, see the Apache documentation at http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/Level.html.

In the default MongoDB database, you can check the
documents that have been created in the movie
collection.

The movie information now contains the names of the directors instead of their
IDs, and the same records have been written several times to the collection,
but their technical IDs (the _id field) are all distinct.

