...
There are many examples where our messages must be enriched with data derived from other systems.
...
Content Enrichment Support
There are several ways to enrich content:
Message translator with an arbitrary processor in the routing logic.
The enrich() method, which obtains additional data from an external resource by sending a copy of the current exchange to a producer endpoint and then using the data in the resulting reply. The exchange created by the enricher is always an InOut exchange.
The pollEnrich() method, which obtains additional data by polling a consumer endpoint. The consumer endpoint of the main route and the consumer endpoint in the pollEnrich() operation are therefore coupled: an incoming message on the initial consumer in the route triggers the pollEnrich() method, which then polls the second consumer.
Use of Enrich Method
The content enricher can use the enrich option to retrieve additional data from a resource endpoint in order to enrich an incoming message (contained in the original exchange). We can use an aggregation strategy to combine the original exchange and the resource exchange.
Code Block |
---|
AggregationStrategy aggregationStrategy = ...
from("direct:start")
.enrich("direct:resource", aggregationStrategy)
.to("direct:result");
from("direct:resource") |
NOTE: The first parameter of the AggregationStrategy.aggregate(Exchange, Exchange)
method corresponds to the original exchange, and the second parameter to the resource exchange. The results from the resource endpoint are stored in the resource exchange’s Out message.
Here is an example of an Aggregation Strategy class:
Code Block |
---|
public class ExampleAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange original, Exchange resource) {
Object origBody = original.getIn().getBody();
Object theResponse = resource.getOut().getBody();
Object mergeResult = ... // combine original body and resource response
if (original.getPattern().isOutCapable()) {
original.getOut().setBody(mergeResult);
} else {
original.getIn().setBody(mergeResult);
}
return original;
}
} |
The same solution is implemented in XML DSL:
Code Block |
---|
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<enrich strategyRef="aggregationStrategy">
<constant>direct:resource</constant>
</enrich>
<to uri="direct:result"/>
</route>
<route>
<from uri="direct:resource"/>
...
</route>
</camelContext>
<bean id="aggregationStrategy" class="..." /> |
Default Aggregation with Enrichment
Using an aggregation strategy is optional. If none is provided, Apache Camel uses the body obtained from the resource by default:
Code Block |
---|
<route>
<from uri="direct:start"/>
<enrich uri="direct:resource"/>
<to uri="direct:result"/>
</route> |
NOTE: The message sent to the direct:result endpoint will contain the output from the direct:resource endpoint, because this scenario does not use any custom aggregation strategy.
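The default behavior can be pictured with a small Camel-free model. This is only a sketch of the semantics described above, not Camel's implementation: the class `EnrichModel`, its `enrich` method, and the use of plain strings for message bodies are all assumptions made for illustration.

```java
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

/** Hypothetical, Camel-free model of enrich(): request-reply to a resource, then an optional merge. */
final class EnrichModel {
    private EnrichModel() {}

    /**
     * @param originalBody body of the incoming message
     * @param resource     stands in for the resource endpoint (request in, reply out)
     * @param strategy     merge function, or null to model the default behavior
     */
    static String enrich(String originalBody,
                         UnaryOperator<String> resource,
                         BinaryOperator<String> strategy) {
        // Send a copy of the original body to the resource and wait for the reply.
        String reply = resource.apply(originalBody);
        if (strategy == null) {
            // No strategy configured: the resource reply becomes the message body.
            return reply;
        }
        // Custom strategy: first argument is the original, second is the resource reply.
        return strategy.apply(originalBody, reply);
    }
}
```

Calling `EnrichModel.enrich("42", body -> "details-for-" + body, null)` returns the resource reply `details-for-42` unchanged, which mirrors what arrives at direct:result when no strategy is set.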
Enrich Options
Name | Default Value | Description |
---|---|---|
expression | None | Specify an expression for configuring the URI of the external service to enrich from. |
uri | | This option has been deprecated. Specify the |
ref | | Refers to the endpoint for the external service to enrich from. You must use either |
strategyRef | | Refers to an AggregationStrategy to be used to merge the reply from the external service into a single outgoing message. |
strategyMethodName | | When using POJOs as the |
strategyMethodAllowNull | false | The default behavior is that the aggregate method is not used if there is no data to enrich. |
aggregateOnException | false | The default behavior is that the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich. |
shareUnitOfWork | false | Default behavior is that the enrich operation does not share the unit of work between the parent exchange and the resource exchange. |
cacheSize | 1000 | Option to configure the cache size for the |
ignoreInvalidEndpoint | false | Option indicates whether or not to ignore an endpoint URI that cannot be resolved |
Aggregation Strategy with Enrich
By default, the enrich()
method will retrieve additional data from a resource endpoint to enrich an incoming message that is contained in the original exchange. You can use an aggregation strategy to combine the original exchange and the resource exchange.
Code Block |
---|
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<enrich strategyRef="aggregationStrategy">
<constant>direct:resource</constant>
</enrich>
<to uri="direct:result"/>
</route>
<route>
<from uri="direct:resource"/>
...
</route>
</camelContext>
<bean id="aggregationStrategy" class="..." /> |
The Aggregation Strategy could be:
Code Block |
---|
public class ExampleAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange original, Exchange resource) {
Object origBody = original.getIn().getBody();
Object theResponse = resource.getIn().getBody();
Object mergeResult = ... // combine original body and resource response
if (original.getPattern().isOutCapable()) {
original.getOut().setBody(mergeResult);
} else {
original.getIn().setBody(mergeResult);
}
return original;
}
} |
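The merge step in the strategy class above is left open. As a minimal sketch of what it might do, assuming both bodies are plain strings (real bodies can be of any type), the following Camel-free helper joins the two values. The class name `BodyMerger`, the `|` separator, and the fallback to the original body are hypothetical choices, not part of Camel.

```java
/** Hypothetical helper showing one possible merge of original and resource bodies. */
final class BodyMerger {
    private BodyMerger() {}

    /**
     * Joins the original body and the resource body with a '|' separator.
     * Falls back to the original body when the resource returned nothing.
     */
    static String merge(String originalBody, String resourceBody) {
        if (resourceBody == null || resourceBody.isEmpty()) {
            return originalBody; // nothing to enrich with
        }
        return originalBody + "|" + resourceBody;
    }
}
```

Inside an AggregationStrategy, the result of such a helper would be assigned to `mergeResult` before it is set on the original exchange.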
Using Dynamic URIs with Enrich
Both the enrich()
and pollEnrich()
methods support the use of dynamic URIs that are computed based on information from the current exchange. For example, to enrich from an HTTP endpoint where the header with the orderId
key is used as part of the content path of the HTTP URL:
Code Block |
---|
from("direct:start")
.enrich().simple("http:myserver/${header.orderId}/order")
.to("direct:result"); |
Or in XML DSL as:
Code Block |
---|
<route>
<from uri="direct:start"/>
<enrich>
<simple>http:myserver/${header.orderId}/order</simple>
</enrich>
<to uri="direct:result"/>
</route> |
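To make the idea of a computed URI concrete, here is a small Camel-free sketch that substitutes `${header.name}` placeholders from a map of headers. It is an illustration only and does not reproduce Camel's Simple language; the class `HeaderUriTemplate`, its `resolve` method, and the choice to fail on a missing header are assumptions.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical resolver for ${header.xxx} placeholders in an endpoint URI template. */
final class HeaderUriTemplate {
    // Matches ${header.someName} and captures the header name.
    private static final Pattern HEADER_REF =
            Pattern.compile("\\$\\{header\\.([A-Za-z0-9_]+)\\}");

    private HeaderUriTemplate() {}

    /** Replaces every placeholder with the matching header value. */
    static String resolve(String template, Map<String, String> headers) {
        Matcher matcher = HEADER_REF.matcher(template);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String name = matcher.group(1);
            String value = headers.get(name);
            if (value == null) {
                throw new IllegalArgumentException("Missing header: " + name);
            }
            // quoteReplacement keeps '$' and '\' in header values literal.
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }
}
```

With a header `orderId` set to `123`, resolving the template from the route above gives `http:myserver/123/order`, so each exchange can target a different URL.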
Use of pollEnrich Method
The pollEnrich() method uses the resource endpoint as a consumer. Instead of sending an exchange to the resource endpoint, it polls the endpoint. By default, the poll returns immediately if no exchange is available from the resource endpoint.
Code Block |
---|
from("activemq:queue:order")
.pollEnrich("file://order/data/additional?fileName=orderId")
.to("bean:processOrder"); |
We can limit the time to wait for the file to be ready:
Code Block |
---|
from("activemq:queue:order")
.pollEnrich("file://order/data/additional?fileName=orderId", 20000) // timeout is in milliseconds
.to("bean:processOrder"); |
Polling Methods
The pollEnrich() method polls the consumer endpoint by using one of the following polling methods:
receiveNoWait() (This is the default.)
receive()
receive(long timeout)
The timeout parameter of the pollEnrich() method is specified in milliseconds, and its value determines which method is called:
When the timeout is 0 or not specified, pollEnrich() calls receiveNoWait().
When the timeout is negative, pollEnrich() calls receive().
Otherwise, pollEnrich() calls receive(timeout).
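The timeout rules above can be sketched without Camel by letting a `BlockingQueue` stand in for the polled consumer endpoint. This is a model of the selection logic only; the class `PollTimeoutModel` and its methods are hypothetical, and the queue operations merely mimic the blocking behavior of the three polling methods.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of how the pollEnrich() timeout selects a polling method. */
final class PollTimeoutModel {
    private PollTimeoutModel() {}

    /** Names the polling method that the rules select for a given timeout. */
    static String selectMethod(long timeoutMillis) {
        if (timeoutMillis == 0) {
            return "receiveNoWait";
        } else if (timeoutMillis < 0) {
            return "receive";
        }
        return "receive(timeout)";
    }

    /** Polls the queue with the matching behavior; returns null when nothing arrived. */
    static <T> T poll(BlockingQueue<T> source, long timeoutMillis) {
        try {
            if (timeoutMillis == 0) {
                return source.poll();   // like receiveNoWait(): return immediately
            } else if (timeoutMillis < 0) {
                return source.take();   // like receive(): block until data is available
            }
            // like receive(timeout): wait at most timeoutMillis
            return source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }
}
```

A negative timeout blocks until data arrives, so it is only appropriate when the resource is certain to produce a message eventually.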
pollEnrich Examples
The following shows enrichment of the message by loading the content from the inbox/data.txt
file:
Code Block |
---|
from("direct:start")
.pollEnrich("file:inbox?fileName=data.txt")
.to("direct:result"); |
Or, this example in XML DSL:
Code Block |
---|
<route>
<from uri="direct:start"/>
<pollEnrich>
<constant>file:inbox?fileName=data.txt</constant>
</pollEnrich>
<to uri="direct:result"/>
</route> |
NOTE: If the specified file does not exist then the message is empty.
pollEnrich Timeout
We can specify a timeout to wait (potentially forever) until a file exists or to wait up to a particular length of time. In the following example, the command waits no more than 3 seconds:
Code Block |
---|
<route>
<from uri="direct:start"/>
<pollEnrich timeout="3000">
<constant>file:inbox?fileName=data.txt</constant>
</pollEnrich>
<to uri="direct:result"/>
</route> |
Dynamic URI with pollEnrich
Both the enrich()
and pollEnrich()
methods support the use of dynamic URIs. The value of this type of URI is computed based on information from the current exchange. As an illustration, to poll enrich from an endpoint that uses a header to indicate a SEDA queue name, you can do something like this:
Code Block |
---|
from("direct:start")
.pollEnrich().simple("seda:${header.name}")
.to("direct:result"); |
Or, shown here in XML DSL:
Code Block |
---|
<route>
<from uri="direct:start"/>
<pollEnrich>
<simple>seda:${header.name}</simple>
</pollEnrich>
<to uri="direct:result"/>
</route> |
pollEnrich Options
Name | Default Value | Description |
---|---|---|
expression | None | Specify an expression for configuring the URI of the external service to enrich from. |
uri | | This option has been deprecated. Specify the |
ref | | Refers to the endpoint for the external service to enrich from. You must use either |
strategyRef | | Refers to an AggregationStrategy to be used to merge the reply from the external service into a single outgoing message. |
strategyMethodName | | When using POJOs as the |
strategyMethodAllowNull | false | The default behavior is that the aggregate method is not used if there is no data to enrich. |
aggregateOnException | false | The default behavior is that the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich. |
shareUnitOfWork | false | Default behavior is that the enrich operation does not share the unit of work between the parent exchange and the resource exchange. |
cacheSize | 1000 | Option to configure the cache size for the |
ignoreInvalidEndpoint | false | Option indicates whether or not to ignore an endpoint URI that cannot be resolved |