Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 6 Current »

In some situations, the incoming message may not contain all of the data that will be necessary to be processed correctly, therefore additional information will need to be acquired and added to the message using the Content Enricher integration pattern.

When transmitting messages from one application to another it can be common that the target system requires more information than the source system has provided. As an example, an incoming Address message may only contain a ZIP code because storing a redundant state code would be considered superfluous. Yet another system may not actually use state codes, but spell the state name out because it uses free-form addresses in order to support international addresses.

There are many examples where our messages must be enriched with data derived from other systems.

Topics in this Section:

Content Enrichment Support

There exist several manners to enrich content:

  • Message translator with arbitrary processor in the routing logic

  • The enrich() method, this method will obtain additional data from the external resource by sending a copy of the current exchange to a producer endpoint and then using the data in the resulting reply. The exchange created by the enricher is always an InOut exchange.

  • The pollEnrich() method, this method will obtain additional data by polling a consumer endpoint for additional information. Thus, the implication is that the consumer endpoint (main route) and the consumer endpoint in pollEnrich() operation are coupled. Therefore, an incoming message on the initial consumer in the route triggers the pollEnrich() method on the consumer to be polled.

Use of Enrich Method

The content enriched can use the (enrich) option to retrieve additional data from a resource endpoint in order to enrich an incoming message (contained in the original exchange). We can use an aggregation strategy to combine the original exchange and the resource exchange.

AggregationStrategy aggregationStrategy = ...

from("direct:start")
  .enrich("direct:resource", aggregationStrategy)
  .to("direct:result");

from("direct:resource")

NOTE: The first parameter of the AggregationStrategy.aggregate(Exchange, Exchange) method corresponds to the original exchange, and the second parameter to the resource exchange. The results from the resource endpoint are stored in the resource exchange’s Out message.

Here is an example of an Aggregation Strategy class:

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object origBody = original.getIn().getBody();
        Object theResponse = resource.getOut().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }

}

The same solution is implemented in XML DSL:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>
 <bean id="aggregationStrategy" class="..." />

Default Aggregation with Enrichment

The use of aggregation strategy in our integration is optional. If one is not provided, then Apache Camel will use the body obtained from the resource by default:

<route>
    <from uri="direct:start"/>
    <enrich uri="direct:resource"/>
    <to uri="direct:result"/>
</route>

NOTE: The message sent to the direct:result endpoint will contain the output from the direct:resource, because in this scenario the contenxt does not use any custom aggregation.

Enrich Options

Name

Default Value

Description

expression

None

Specify an expression for configuring the URI of the external service to enrich from.

uri

This options have been deprecated. Specify the expression option instead.

ref

Refers to the endpoint for the external service to enrich from. You must use either uri or ref.

strategyRef

Refers to an AggregationStrategy to be used to merge the reply from the external service into a single outgoing message.

StrategyMethodName

When using POJOs as the AggregationStrategy, specify this option to explicitly declare the name of the aggregation method

strategyMethodAllowNull

false

The default behavior is that the aggregate method is not used if there is no data to enrich.

aggregateOnException

false

The default behavior is that the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich.

shareUnitOfWork

false

Default behavior is that the enrich operation does not share the unit of work between the parent exchange and the resource exchange.

cacheSize

1000

Option to configure the cache size for the ProducerCache, which caches producers for reuse in the enrich operation

ignoreInvalidEndpoint

false

Option indicates whether or not to ignore an endpoint URI that cannot be resolved

Aggregation Strategy with Enrich

By default, the enrich() method will retrieve additional data from a resource endpoint to enrich an incoming message that is contained in the original exchange. You can use an aggregation strategy to combine the original exchange and the resource exchange.

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <enrich strategyRef="aggregationStrategy">
      <constant>direct:resource</constant>
    </enrich>
    <to uri="direct:result"/>
  </route>
  <route>
    <from uri="direct:resource"/>
    ...
  </route>
</camelContext>

<bean id="aggregationStrategy" class="..." />

The Aggregation Strategy could be:

public class ExampleAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange original, Exchange resource) {
        Object origBody = original.getIn().getBody();
        Object theResponse = resource.getIn().getBody();
        Object mergeResult = ... // combine original body and resource response
        if (original.getPattern().isOutCapable()) {
            original.getOut().setBody(mergeResult);
        } else {
            original.getIn().setBody(mergeResult);
        }
        return original;
    }

}

Using Dynamic URIs with Enrich

Both the enrich() and pollEnrich() methods support the use of dynamic URIs that are computed based on information from the current exchange. For example, to enrich from an HTTP endpoint where the header with the orderId key is used as part of the content path of the HTTP URL:

from("direct:start")
  .enrich().simple("http:myserver/${header.orderId}/order")
  .to("direct:result");

Or in XML DSL as:

<route>
   <from uri="direct:start"/>
   <enrich>
      <simple>http:myserver/${header.orderId}/order</simple>
   </enrich>
   <to uri="direct:result"/>
</route>

Use of pollEnrich Method

The pollEnrich command will utilize the resource endpoint as a consumer. Therefore, instead of sending an exchange to the resource endpoint, it polls the endpoint and (by default) the poll returns immediately if there is no exchange available from the resource endpoint.

from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId")
   .to("bean:processOrder");

We can limit the time to wait for the file to be ready:

from("activemq:queue:order")
   .pollEnrich("file://order/data/additional?fileName=orderId", 20000) // timeout is in milliseconds
   .to("bean:processOrder");

Polling Methods

The pollEnrich() method will poll our consumer endpoints by using one of the following poling methods:

  • receiveNoWait()(This is the default.)

  • receive()

  • receive(long timeout)

Additionally, the timeout parameter of pollEnrich() command is specified in milliseconds and the evaluation of its value will determine which method to call:

  • When the timeout is 0 or not specified, pollEnrich() calls receiveNoWait.

  • When the timeout is negative, pollEnrich() calls receive.

  • Otherwise, pollEnrich() calls receive(timeout).

pollEnrich Examples

The following shows enrichment of the message by loading the content from the inbox/data.txt file:

 from("direct:start")
   .pollEnrich("file:inbox?fileName=data.txt")
   .to("direct:result");

Or, this example in XML DSL:

<route>
   <from uri="direct:start"/>
   <pollEnrich>
      <constant>file:inbox?fileName=data.txt"</constant>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

NOTE: If the specified file does not exist then the message is empty.

pollEnrich Timeout

We can specify a timeout to wait (potentially forever) until a file exists or to wait up to a particular length of time. In the following example, the command waits no more than 3 seconds:

<route>
   <from uri="direct:start"/>
   <pollEnrich timeout="3000">
      <constant>file:inbox?fileName=data.txt"</constant>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

Dynamic URI with pollEnrich

Both the enrich() and pollEnrich() methods support the use of dynamic URIs. The value of this type of URI is computed based on information from the current exchange. As an illustration, to poll enrich from an endpoint that uses a header to indicate a SEDA queue name, you can do something like this:

from("direct:start")
  .pollEnrich().simple("seda:${header.name}")
  .to("direct:result");

Or, shown here in XML DSL:

<route>
   <from uri="direct:start"/>
   <pollEnrich>
      <simple>seda${header.name}</simple>
   </pollEnrich>
   <to uri="direct:result"/>
</route>

pollEnrich Options

Name

Default Value

Description

expression

None

Specify an expression for configuring the URI of the external service to enrich from.

uri

This options have been deprecated. Specify the expression option instead.

ref

Refers to the endpoint for the external service to enrich from. You must use either uri or ref.

strategyRef

Refers to an AggregationStrategy to be used to merge the reply from the external service into a single outgoing message.

StrategyMethodName

When using POJOs as the AggregationStrategy, specify this option to explicitly declare the name of the aggregation method

strategyMethodAllowNull

false

The default behavior is that the aggregate method is not used if there is no data to enrich.

aggregateOnException

false

The default behavior is that the aggregate method is not used if there was an exception thrown while trying to retrieve the data to enrich.

shareUnitOfWork

false

Default behavior is that the enrich operation does not share the unit of work between the parent exchange and the resource exchange.

cacheSize

1000

Option to configure the cache size for the ProducerCache, which caches producers for reuse in the enrich operation

ignoreInvalidEndpoint

false

Option indicates whether or not to ignore an endpoint URI that cannot be resolved

  • No labels