Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

NOTE: Exchanges from each of the input endpoints, URI1, URI2, and URI3, are processed independently of each other and in separate threads

Segmented Routes

In certain situations, we may want to merge multiple incoming messages from different messaging systems and process them using the same route. In most cases, you can deal with multiple inputs by dividing your route into segments:

...

The initial segments of this route example will take their inputs from external queues  (ie. nyse and nasdaq),  and then send the incoming exchanges to an internal endpoint (ie. combinedURL). The next route segment will then merge these incoming exchanges, extracting from the internal endpoint and sending to the destination queue.

Direct Endpoints

The direct component provides us with the simplest mechanism for linking together routes. The event model for the direct component is synchronous, so that subsequent segments of the route run in the same thread as the first segment. The standard format of a direct URL is direct:EndpointID, where the endpoint ID is simply a unique alphanumeric string that identifies the endpoint instance.

Code Block
<from uri="nyse" />
<to uri="combinedURL" />
<from uri="nasdaq" />
<to uri="combinedURL" />

<from uri="combinedURL" />
<to uri="allTxns" />

The implementation of the direct endpoint will behave as follows: whenever an exchange arrives at a producer endpoint (ie. to(combinedURL)), the direct endpoint passes the exchange directly to all of the consumer's endpoints that have the same endpoint ID (ie. from(combinedURL),

NOTE: Direct endpoints can only be used to communicate between routes that belong to the same CamelContext in the same Java virtual machine (JVM) instance.

SEDA Endpoints

The SEDA (Staged Event Driven Architecture) component provides an alternative mechanism for linking together routes. It can be used similary to the direct component, however, there is a different underlying event and threading model:

  • The processing of a SEDA endpoint is not synchronous. Therefore, when you send an exchange to a SEDA producer endpoint, control immediately returns to the preceding processor in the route.

    Each of these SEDA endpoints will contain a queue buffer (of java.util.concurrent.BlockingQueue type), which stores all of the incoming exchanges prior to processing by the next route segment.

  • Each of these SEDA consumer endpoints will create a thread pool (the default size is 5) to process exchange objects from the blocking queue.

  • The SEDA component supports the competing consumers pattern, which guarantees that each incoming exchange is processed only once, even if there are multiple consumers attached to a specific endpoint.

A primary advantage of using a SEDA endpoint is it will allow our routes to be more responsive based on the use of the built-in consumer thread pool. The stock transactions example can be re-written to use SEDA endpoints instead of direct endpoints, as follows:

Code Block
<from uri="nyse" />
<to uri="seda:combinedURL" />
<from uri="nasdaq" />
<to uri="seda:combinedURL" />

<from uri="seda: combinedURL" />
<to uri="allTxns" />

NOTE: The primary difference between this scenario and the direct scenario is that with SEDA, the second route segment (from seda:combinedURL) is processed by a pool of five threads.

Content Enricher Pattern

The content enricher pattern defines another different approach to deal with multiple inputs to a route. When an exchange enters the enricher processor, the enricher contacts an external resource to retrieve information, which is then added to the original message. In this type of situation and pattern, this external resource will effectively represent a second input to the message.

For example, suppose you are writing an application that processes credit card requests. Before processing a CC request, you will need to enhance it with the assignment a credit rating to the customer. This credit rating information will be stored in a file in the directory, src/data/ratings. You can combine the incoming credit request with data from the rating file using the pollEnrich() pattern:

Code Block
from("jms:queue:creditRequests")
    .pollEnrich("file:src/data/ratings?noop=true", new GroupedExchangeAggregationStrategy())
    .bean(new MergeCreditRequestAndRatings(), "merge")
    .to("jms:queue:reformatRequest");

Code Block
<route>
    <from uri="jms:queue:creditRequest"/>
    <pollEnrich timeout="10000" strategyRef="myStrategy">
      <constant>"file:src/data/ratings?fileName=data.txt"</constant>
    </pollEnrich>
    <bean ref="MergeCreditRequestAndRating" method="merge"/>
    <to uri="jms:queue:reformatRequest"/>
  </route>