In some situations, an incoming message does not contain all of the data needed to process it correctly. The missing information must be acquired and added to the message, which is the purpose of the Content Enricher integration pattern.
When transmitting messages from one application to another, the target system often requires more information than the source system provides. For example, an incoming Address message may contain only a ZIP code because storing a redundant state code would be considered superfluous. Another system may not use state codes at all, but spell out the state name because it uses free-form addresses in order to support international addresses.
There are many examples where our messages must be enriched with data derived from other systems.
Content Enrichment Support
There are several ways to enrich content:
- A message translator with an arbitrary processor in the routing logic.
- The enrich() method, which obtains additional data from an external resource by sending a copy of the current exchange to a producer endpoint and then using the data in the resulting reply. The exchange created by the enricher is always an InOut exchange.
- The pollEnrich() method, which obtains additional data by polling a consumer endpoint. The consumer endpoint of the main route and the consumer endpoint in the pollEnrich() operation are therefore coupled: an incoming message on the initial consumer in the route triggers the pollEnrich() method, which polls its own consumer endpoint.
Use of Enrich Method
The content enricher can use the enrich option to retrieve additional data from a resource endpoint in order to enrich an incoming message (contained in the original exchange). We can use an aggregation strategy to combine the original exchange and the resource exchange.
    AggregationStrategy aggregationStrategy = ...

    from("direct:start")
        .enrich("direct:resource", aggregationStrategy)
        .to("direct:result");

    from("direct:resource")
NOTE: The first parameter of the AggregationStrategy.aggregate(Exchange, Exchange) method corresponds to the original exchange, and the second parameter to the resource exchange. The results from the resource endpoint are stored in the resource exchange's Out message.
Here is an example of an Aggregation Strategy class:
    public class ExampleAggregationStrategy implements AggregationStrategy {

        public Exchange aggregate(Exchange original, Exchange resource) {
            Object origBody = original.getIn().getBody();
            Object theResponse = resource.getOut().getBody();
            Object mergeResult = ... // combine original body and resource response
            if (original.getPattern().isOutCapable()) {
                original.getOut().setBody(mergeResult);
            } else {
                original.getIn().setBody(mergeResult);
            }
            return original;
        }
    }
The same solution is implemented in XML DSL:
    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
      <route>
        <from uri="direct:start"/>
        <enrich strategyRef="aggregationStrategy">
          <constant>direct:resource</constant>
        </enrich>
        <to uri="direct:result"/>
      </route>
      <route>
        <from uri="direct:resource"/>
        ...
      </route>
    </camelContext>

    <bean id="aggregationStrategy" class="..." />
Default Aggregation with Enrichment
Using an aggregation strategy is optional. If none is provided, Apache Camel uses the body obtained from the resource by default:
    <route>
      <from uri="direct:start"/>
      <enrich uri="direct:resource"/>
      <to uri="direct:result"/>
    </route>
NOTE: The message sent to the direct:result endpoint will contain the output from the direct:resource endpoint, because this route does not use any custom aggregation.
Enrich Options
Name | Default Value | Description |
---|---|---|
expression | None | Specify an expression for configuring the URI of the external service to enrich from. |
uri | | This option has been deprecated. Specify the |
ref | | Refers to the endpoint for the external service to enrich from. You must use either |
strategyRef | | Refers to an AggregationStrategy to be used to merge the reply from the external service into a single outgoing message. |
strategyMethodName | | When using POJOs as the |
strategyMethodAllowNull | false | The default behavior is that the aggregate method is not used if there is no data to enrich. |
aggregateOnException | false | The default behavior is that the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich. |
shareUnitOfWork | false | Default behavior is that the enrich operation does not share the unit of work between the parent exchange and the resource exchange. |
cacheSize | 1000 | Option to configure the cache size for the |
ignoreInvalidEndpoint | false | Option indicates whether or not to ignore an endpoint URI that cannot be resolved |
Aggregation Strategy with Enrich
By default, the enrich() method will retrieve additional data from a resource endpoint to enrich an incoming message that is contained in the original exchange. You can use an aggregation strategy to combine the original exchange and the resource exchange.
    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
      <route>
        <from uri="direct:start"/>
        <enrich strategyRef="aggregationStrategy">
          <constant>direct:resource</constant>
        </enrich>
        <to uri="direct:result"/>
      </route>
      <route>
        <from uri="direct:resource"/>
        ...
      </route>
    </camelContext>

    <bean id="aggregationStrategy" class="..." />
The Aggregation Strategy could be:
    public class ExampleAggregationStrategy implements AggregationStrategy {

        public Exchange aggregate(Exchange original, Exchange resource) {
            Object origBody = original.getIn().getBody();
            Object theResponse = resource.getIn().getBody();
            Object mergeResult = ... // combine original body and resource response
            if (original.getPattern().isOutCapable()) {
                original.getOut().setBody(mergeResult);
            } else {
                original.getIn().setBody(mergeResult);
            }
            return original;
        }
    }
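The merge step in the class above is left open. As a rough, Camel-free sketch of one possible merge, assume both bodies are maps of field names to values (an assumption made only for illustration; MergeSketch and its merge method are hypothetical names, not part of Camel). Under that assumption, the resource's fields could be added to the original without overwriting anything the original already carries:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: models the "combine original body and
// resource response" step with plain maps instead of Camel exchanges.
class MergeSketch {

    // Returns a new map holding every original field plus any resource
    // field that the original does not already define.
    static Map<String, String> merge(Map<String, String> original,
                                     Map<String, String> resource) {
        Map<String, String> merged = new LinkedHashMap<>(original);
        if (resource != null) {
            for (Map.Entry<String, String> e : resource.entrySet()) {
                merged.putIfAbsent(e.getKey(), e.getValue());
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> address = new LinkedHashMap<>();
        address.put("zip", "94105");

        Map<String, String> lookup = new LinkedHashMap<>();
        lookup.put("zip", "00000");   // ignored: the original already has a zip
        lookup.put("state", "CA");    // added: the original has no state

        System.out.println(merge(address, lookup));
    }
}
```

Preferring the original's values is only one design choice; a strategy could just as well let the resource win, or fail on conflicting fields.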
Using Dynamic URIs with Enrich
Both the enrich() and pollEnrich() methods support the use of dynamic URIs that are computed based on information from the current exchange. For example, to enrich from an HTTP endpoint where the header with the orderId key is used as part of the content path of the HTTP URL:
    from("direct:start")
        .enrich().simple("http:myserver/${header.orderId}/order")
        .to("direct:result");
Or in XML DSL as:
    <route>
      <from uri="direct:start"/>
      <enrich>
        <simple>http:myserver/${header.orderId}/order</simple>
      </enrich>
      <to uri="direct:result"/>
    </route>
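To make the effect of the dynamic URI concrete, here is a minimal, Camel-free sketch of how a ${header.orderId} placeholder could be filled in from a header value. It is not Camel's Simple language implementation; DynamicUriSketch and resolve are hypothetical names, and the header value 123 is made up for the example.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only: substitutes ${header.<name>} placeholders
// with values from a header map. Camel's Simple language does far more.
class DynamicUriSketch {

    private static final Pattern HEADER =
            Pattern.compile("\\$\\{header\\.(\\w+)\\}");

    static String resolve(String template, Map<String, ?> headers) {
        Matcher m = HEADER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            Object value = headers.get(m.group(1));
            // Unknown headers become empty; quoteReplacement protects
            // against '$' and '\' characters inside header values.
            String text = (value == null) ? "" : value.toString();
            m.appendReplacement(out, Matcher.quoteReplacement(text));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String uri = resolve("http:myserver/${header.orderId}/order",
                Collections.singletonMap("orderId", 123));
        System.out.println(uri);
    }
}
```

With the made-up header value 123, the resolved URI is http:myserver/123/order, so each exchange can be enriched from a different resource.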
Use of pollEnrich Method
The pollEnrich command uses the resource endpoint as a consumer. Instead of sending an exchange to the resource endpoint, it polls the endpoint. By default, the poll returns immediately if no exchange is available from the resource endpoint.
    from("activemq:queue:order")
        .pollEnrich("file://order/data/additional?fileName=orderId")
        .to("bean:processOrder");
We can limit the time to wait for the file to be ready:
    from("activemq:queue:order")
        .pollEnrich("file://order/data/additional?fileName=orderId", 20000) // timeout is in milliseconds
        .to("bean:processOrder");
Polling Methods
The pollEnrich() method polls the consumer endpoint by using one of the following polling methods:
- receiveNoWait() (This is the default.)
- receive()
- receive(long timeout)
The timeout parameter of the pollEnrich() command is specified in milliseconds, and its value determines which method is called:
- When the timeout is 0 or not specified, pollEnrich() calls receiveNoWait.
- When the timeout is negative, pollEnrich() calls receive.
- Otherwise, pollEnrich() calls receive(timeout).
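The timeout rule above can be restated as a small decision function. This is only a sketch of the rule as described here, not Camel's source code; PollTimeoutSketch and pollingMethodFor are hypothetical names, and the returned strings simply name the polling method that would be chosen.

```java
// Hypothetical illustration only: restates the timeout rule as code.
class PollTimeoutSketch {

    static String pollingMethodFor(long timeoutMillis) {
        if (timeoutMillis == 0) {
            // Zero (or no timeout given): return immediately.
            return "receiveNoWait()";
        } else if (timeoutMillis < 0) {
            // Negative: wait until a message is available.
            return "receive()";
        } else {
            // Positive: wait at most this many milliseconds.
            return "receive(" + timeoutMillis + ")";
        }
    }

    public static void main(String[] args) {
        for (long t : new long[] {0, -1, 3000}) {
            System.out.println(t + " -> " + pollingMethodFor(t));
        }
    }
}
```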
pollEnrich Examples
The following example enriches the message by loading the content from the inbox/data.txt file:
    from("direct:start")
        .pollEnrich("file:inbox?fileName=data.txt")
        .to("direct:result");
Or, this example in XML DSL:
    <route>
      <from uri="direct:start"/>
      <pollEnrich>
        <constant>file:inbox?fileName=data.txt</constant>
      </pollEnrich>
      <to uri="direct:result"/>
    </route>
NOTE: If the specified file does not exist then the message is empty.
pollEnrich Timeout
We can specify a timeout to wait (potentially forever) until a file exists or to wait up to a particular length of time. In the following example, the command waits no more than 3 seconds:
    <route>
      <from uri="direct:start"/>
      <pollEnrich timeout="3000">
        <constant>file:inbox?fileName=data.txt</constant>
      </pollEnrich>
      <to uri="direct:result"/>
    </route>