Java DSL (Aggregator)
One of the more common scenarios is the ability to combine different messages together based on certain relationships so they can then be processed as a composite. When we need to combine multiple messages together we will use the Aggregator pattern.
In the following examples, we will use a stateful filter (an Aggregator), to collect and persist individual messages until it receives a complete set of related messages. Once all messages have been received it will then publish the composite message.
Java DSL with Up-to-Date Example
As an example, suppose we want to update our website every five minutes with the latest stock quotes. Since we can receive multiple quotes for the same stock within our designated time period, we only want to keep the last one (since it is the most up-to-date. We can accomplish this with the aggregator:
from("jms:topic:stock:quote")
.aggregate().xpath("/quote/@symbol")
.batchTimeout(5 * 60 * 1000).to("jsmith:quotes");
NOTE: For the correlation expression we are using an XPath expression to fetch the stock symbol from the message body. With the aggregation strategy, we use the default option that picks the latest message, and thus also the most up-to-date.
The time period is set as a timeout value in milliseconds and in this scenario, we are using a JMS Topic queue
Java DSL with Bean Example
As an example, suppose we needed to aggregate responses from several different financial institutions to gather their quote for a specific loan request. Our strategy is to pick the institution with the best quote. We will need to implement some criteria to determine the cheapest loan and use our aggregation strategy to perform it.
from("jms:topic:loan:quote")
.aggregate().header("loanId")
.aggregationStrategy(bestQuote)
.completionPredicate(header(Exchange.AGGREGATED_SIZE)
.isGreaterThan(2))
.to("jsmith:bestLoanQuote")
In our bean, our criteria are to receive at least 3 quotes. Thus, we will use a completion predicate to signal when we have received more than 2 quotes for a given loan. The following code snippet will show the aggregation strategy we must implement:
public class BestQuoteStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
double oldQuote = oldExchange.getIn().getBody(Double.class);
double newQuote = newExchange.getIn().getBody(Double.class);
// return the "winner" that has the lowest quote
return newQuote < oldQuote ? newExchange : oldExchange;
}
}