cAggregate
cAggregate Standard properties
These properties are used to configure cAggregate running in the Standard Job framework.
The Standard cAggregate component belongs to the Routing family.
Basic settings
Language |
Select the language of the expression you want to use to filter your messages, from Select CorrelationID to use the |
Correlation expression/Expression |
Type in the expression that evaluates the correlation key to be This field disappears when CorrelationID is selected in the Language list. In this case, the existing correlation |
Correlation expression/Add Namespaces |
This option appears when XPath is selected in the Language list. Select this check box to add namespaces for the XPath expression. |
Strategy |
Specify a Java bean to use as the aggregation strategy. |
Completion conditions/Number of messages |
Select this check box to specify the number of messages to Note:
By default, this check box is selected and the number of |
Completion conditions/Inactivity timeout (in milliseconds) |
Select this check box to specify the time (in milliseconds) that Note:
You cannot use this option together with Scheduled interval. Only one of them |
Completion conditions/Scheduled interval (in milliseconds) |
Select this check box to specify a repeating period (in Note:
You cannot use this option together with Inactivity timeout. Only one of them can be used |
Completion conditions/Predicate matched |
Select this check box to specify a predicate to indicate when an aggregated exchange is complete. |
Completion conditions/Batch consumer |
Select this check box to aggregate all files consumed from a file endpoint in a given poll. |
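The Predicate matched condition can be evaluated against either the aggregated exchange or the incoming exchange, depending on the Check completion before aggregating option described in the advanced settings. The plain-Java sketch below models that difference on simple strings. It is a simplified illustration, not Camel's implementation: `EagerCheckDemo`, `run` and the `eager` flag are hypothetical names, and message bodies stand in for exchanges.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of a completion predicate; not part of Camel or Talend.
class EagerCheckDemo {

    // Concatenates bodies and emits the aggregate whenever the predicate matches.
    // eager == true:  the predicate sees the incoming body.
    // eager == false: the predicate sees the aggregated body.
    static List<String> run(List<String> bodies, Predicate<String> done, boolean eager) {
        List<String> completed = new ArrayList<>();
        String aggregated = null;
        for (String incoming : bodies) {
            aggregated = (aggregated == null) ? incoming : aggregated + incoming;
            String seenByPredicate = eager ? incoming : aggregated;
            if (done.test(seenByPredicate)) {
                completed.add(aggregated);
                aggregated = null; // start a new group
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        List<String> bodies = Arrays.asList("A", "B", "END", "C", "END");
        // Predicate on the incoming body: complete when an "END" marker arrives.
        System.out.println(run(bodies, s -> s.equals("END"), true));
        // Predicate on the aggregated body: complete once it holds at least two characters.
        System.out.println(run(bodies, s -> s.length() >= 2, false));
    }
}
```

In this model, a marker-style predicate only makes sense on the incoming message, while a predicate that inspects accumulated state needs the aggregated one.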
Advanced settings
Check completion before aggregating |
Select this check box to check for completion when a new incoming exchange has been received. This option influences the behavior of the Predicate matched option as the exchange being passed in changes accordingly. When this option is disabled, the exchange passed in the predicate is the aggregated exchange which means any information you may store on the aggregated exchange from the aggregation strategy is available for the predicate. When this option is enabled, the exchange passed in the predicate is the incoming exchange, which means you can access data from the incoming exchange. |
Close correlation group |
Select this check box to indicate that if a correlation key has already been completed, then any new exchanges with the same correlation key will be denied. When using this option, enter a number in the Maximum bound field to keep that last number of closed correlation keys. |
Ignore invalid correlation key |
Select this check box to ignore invalid correlation keys, that is, keys that cannot be evaluated to a value. By default, Camel throws an exception when it encounters an invalid correlation key. |
Group arriving exchange |
Select this check box to group all aggregated exchanges into a single combined holder class. As a result, the aggregator sends out only one exchange, so this option can be used to combine many incoming exchanges into a single output exchange. |
Use persistence |
Select this check box to plug in your own implementation of the repository which keeps track of the current in-flight aggregated exchanges. By default, Camel uses a memory based implementation. |
Repository |
This field appears when the Use
AggregationRepository:
HawtDBAggregationRepository:
Use persistent file: Select this
Recovery/Use recovery: Select
RecoverableAggregationRepository: With this repository selected, the following options
Recovery/Use recovery: Select |
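The Close correlation group option, with its Maximum bound field, amounts to remembering a bounded number of completed correlation keys and rejecting new exchanges that reuse one of them. The sketch below shows one way such a bounded memory can work in plain Java. It is an assumption-laden illustration rather than Camel's code: `ClosedKeysDemo`, `close` and `accepts` are hypothetical names, and eviction here is by insertion order.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical bounded memory of closed correlation keys; not part of Camel or Talend.
class ClosedKeysDemo {

    private final Set<String> closed;

    ClosedKeysDemo(final int maximumBound) {
        // A LinkedHashMap that drops its oldest entry once the bound is exceeded.
        this.closed = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maximumBound;
            }
        });
    }

    // Marks a correlation key as completed.
    void close(String key) {
        closed.add(key);
    }

    // Returns false if the key is still remembered as closed.
    boolean accepts(String key) {
        return !closed.contains(key);
    }

    public static void main(String[] args) {
        ClosedKeysDemo demo = new ClosedKeysDemo(2);
        demo.close("a");
        demo.close("b");
        demo.close("c"); // "a" is forgotten because only two keys are kept
        System.out.println(demo.accepts("a")); // true
        System.out.println(demo.accepts("c")); // false
    }
}
```

A small bound keeps memory use low, at the cost of letting long-forgotten keys open a new group again.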
Usage
Usage rule |
cAggregate is used as a middle or |
Connections |
Aggregate: select this link to route
Route: select this link to route all the messages from the |
Aggregating three messages into one
This scenario applies only to Talend Open Studio for ESB, Talend Data Services Platform and Talend Data Fabric.
In this scenario, the cAggregate component combines
three messages from the local file system into one and prints the messages in the
console. A Java bean will be used as the aggregation strategy.
Creating a Java bean as the aggregation strategy
To aggregate the messages, we will use a Java bean that will help us build an
aggregation strategy.
- From the repository tree view, expand the Code node and right-click the Beans node. In the contextual menu, select Create Bean.
- The New Bean wizard opens. In the Name field, type in a name for the bean, for example, AggregateBody. Click Finish to close the wizard.
- Type in the code shown below. In this use case, we just want to aggregate all messages into a single message.

      package beans;

      import org.apache.camel.Exchange;
      import org.apache.camel.processor.aggregate.AggregationStrategy;

      public class AggregateBody implements AggregationStrategy {

          // Called for each incoming exchange; oldEx is null for the first one in a group.
          public Exchange aggregate(Exchange oldEx, Exchange newEx) {
              if (oldEx == null) {
                  return newEx;
              }
              // Append the new body to the body aggregated so far.
              String oldBody = oldEx.getIn().getBody(String.class);
              String newBody = newEx.getIn().getBody(String.class);
              newEx.getIn().setBody(oldBody + newBody);
              return newEx;
          }
      }

- Press Ctrl+S to save your bean.
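To see how a strategy of this shape behaves when it is called once per incoming message, the following plain-Java sketch replays the same logic without Camel. `StrategyDemo`, `Msg` and `fold` are hypothetical stand-ins introduced for illustration only; `Msg` plays the role of the exchange body, and the caller is assumed to pass `null` as the old message for the first call.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the demo; not part of Camel or Talend.
class StrategyDemo {

    // Minimal mutable message holder standing in for an exchange body.
    static final class Msg {
        String body;

        Msg(String body) {
            this.body = body;
        }
    }

    // Same logic as the AggregateBody bean: the first message passes through,
    // later messages get the accumulated body prepended.
    static Msg aggregate(Msg oldMsg, Msg newMsg) {
        if (oldMsg == null) {
            return newMsg;
        }
        newMsg.body = oldMsg.body + newMsg.body;
        return newMsg;
    }

    // Calls the strategy once per body, feeding back the previous result.
    static String fold(List<String> bodies) {
        Msg current = null;
        for (String body : bodies) {
            current = aggregate(current, new Msg(body));
        }
        return current == null ? null : current.body;
    }

    public static void main(String[] args) {
        // prints "This is a!This is b!"
        System.out.println(fold(Arrays.asList("This is a!", "This is b!")));
    }
}
```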
Dropping and linking the components
- From the Palette, expand the Connectivity folder, and drop a cFile component onto the design workspace.
- Expand the Routing folder, and drop a cAggregate component onto the design workspace.
- Expand the Custom folder, and drop two cProcessor components onto the design workspace.
- Right-click the cFile component, select Row > Route from the contextual menu and click the first cProcessor component.
- Repeat this operation to connect the first cProcessor component to the cAggregate component.
- Right-click the cAggregate component, select Row > Aggregate from the contextual menu and click the second cProcessor component.
- Label the components to better identify their functionality: File_source for the cFile component, Monitor_before and Monitor_after for the two cProcessor components, and Aggregator for the cAggregate component.
Configuring the components
- Double-click the cFile component, which is labelled File_source, to display its Basic settings view in the Component tab.
- In the Path field, browse to or enter the input file path, and leave the other parameters as they are.
  In this scenario, there are four text files in the specified directory: a.txt, b.txt, c.txt and d.txt, the contents of which are This is a!, This is b!, This is c!, and This is d! respectively.
- Double-click the cAggregate component, which is labelled Aggregator, to display its Basic settings view in the Component tab.
- In the Language field, select Constant or Simple as the expression language.
  In the Expression field, enter the expression "getBody(String.class)" to retrieve the body of the message.
  In the Strategy field, enter the name of the Java bean AggregateBody you just created.
  Select the Number of messages check box and enter 2 in the field.
- Double-click the cProcessor component labelled Monitor_before to display its Basic settings view in the Component tab.
- In the Code box, customize the code as follows so that the Run console displays the message contents before an aggregation takes place:

      System.out.println("Before aggregation: " + exchange.getIn().getBody(String.class));

- In the same way, configure the cProcessor component labelled Monitor_after so that the Run console displays the message contents after an aggregation takes place:

      System.out.println("After aggregation: " + exchange.getIn().getBody(String.class));

- Press Ctrl+S to save your route.
Viewing code and executing the Route
- Click the Code tab at the bottom of the design workspace to have a look at the generated code.
  As shown in the code, a message from the File_source endpoint is routed via cProcessor_1 and then aggregated according to the condition .aggregate.
- Click the Run view to display it and click the Run button to launch the execution of your route. You can also press F6 to execute it.
  RESULT: The four messages are aggregated in two batches, with the two messages in each batch combined into one.
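The batching in this result follows from the Number of messages setting: a group is released as soon as two messages with the same correlation key have arrived. The sketch below reproduces that arithmetic in plain Java under stated assumptions. `BatchDemo` and `aggregateBySize` are hypothetical names, all messages are assumed to share one constant correlation key, and the files are assumed to arrive in alphabetical order, which a real file endpoint does not guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of size-based completion; not part of Camel or Talend.
class BatchDemo {

    // Groups bodies by correlation key and releases a concatenated body
    // each time a group reaches completionSize messages.
    static List<String> aggregateBySize(List<String> bodies,
                                        Function<String, String> correlationKey,
                                        int completionSize) {
        Map<String, StringBuilder> pending = new HashMap<>();
        Map<String, Integer> counts = new HashMap<>();
        List<String> completed = new ArrayList<>();
        for (String body : bodies) {
            String key = correlationKey.apply(body);
            pending.computeIfAbsent(key, k -> new StringBuilder()).append(body);
            int count = counts.merge(key, 1, Integer::sum);
            if (count == completionSize) {
                completed.add(pending.remove(key).toString());
                counts.remove(key); // the next message with this key starts a new group
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        List<String> bodies = Arrays.asList("This is a!", "This is b!", "This is c!", "This is d!");
        // Every message maps to the same key, so groups are formed purely by arrival order.
        for (String batch : aggregateBySize(bodies, body -> "same-key", 2)) {
            System.out.println("After aggregation: " + batch);
        }
    }
}
```

With a completion size of 2 and four inputs, the model releases two aggregated messages; with an odd number of inputs, the last message would stay pending until another completion condition fires.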