Java DSL (Resequencer)
Another common scenario is putting a series of out-of-sequence messages back into the correct order. We receive a stream of related messages, but for a variety of reasons they arrive out of order. In this situation, we can use the Resequencer pattern to collect and reorder the messages before they are transmitted to the recipient.
Camel supports this pattern through the resequence node. It uses a stateful batch processor that is capable of reordering related messages and supports two alternative resequencing algorithms:
Batch: collects messages into a batch, sorts them, and then publishes them.
Stream: reorders message streams continuously, based on the detection of gaps between messages.
The Batch option is similar to the Aggregator pattern with the addition of sorting, while the Stream option is the traditional Resequencer pattern using gap detection.
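The Batch idea (collect, sort, publish) can be illustrated with a minimal plain-Java sketch. This is not Camel's implementation: the class name BatchResequencerSketch and its methods are hypothetical, messages are reduced to their sort key (a stock symbol string), and the batch is flushed by hand rather than by a timeout or size limit.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative sketch only; names are hypothetical and not part of Camel.
final class BatchResequencerSketch {
    private final List<String> batch = new ArrayList<>();

    // Collect a message (represented here by its sort key) into the current batch.
    void add(String key) {
        batch.add(key);
    }

    // Sort the collected messages in ascending order, hand them back, and start a new batch.
    List<String> flush() {
        List<String> sorted = new ArrayList<>(batch);
        sorted.sort(Comparator.naturalOrder());
        batch.clear();
        return sorted;
    }

    public static void main(String[] args) {
        BatchResequencerSketch resequencer = new BatchResequencerSketch();
        resequencer.add("IBM");
        resequencer.add("AAPL");
        resequencer.add("MSFT");
        System.out.println(resequencer.flush()); // prints [AAPL, IBM, MSFT]
    }
}
```

In a real route, the flush would be triggered by the batch timeout or a batch size limit instead of an explicit call.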
Stream requires a Long number as the sequence value, which allows Camel to compute whether gaps exist. A gap is detected when a number in a series is missing, e.g. 3, 4, 6 with number 5 missing. Camel will hold back message 6 until number 5 arrives or the configured timeout expires.
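Gap detection can be sketched in plain Java as follows. This is a simplified illustration under stated assumptions, not Camel's stream resequencer: the class StreamResequencerSketch and its offer method are hypothetical, the first expected sequence number is supplied up front, late messages are simply discarded, and the timeout and capacity handling that a real resequencer needs are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Illustrative sketch only; names are hypothetical and not part of Camel.
final class StreamResequencerSketch {
    // Messages waiting for a gap to close, keyed by sequence number.
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private long nextExpected;

    StreamResequencerSketch(long firstExpected) {
        this.nextExpected = firstExpected;
    }

    // Accept one message and return every message that can now be released in order.
    List<String> offer(long sequence, String body) {
        List<String> released = new ArrayList<>();
        if (sequence < nextExpected) {
            return released; // late message: this sketch discards it
        }
        pending.put(sequence, body);
        // Release messages while there is no gap at the head of the queue.
        while (!pending.isEmpty() && pending.firstKey() == nextExpected) {
            released.add(pending.pollFirstEntry().getValue());
            nextExpected++;
        }
        return released;
    }

    public static void main(String[] args) {
        StreamResequencerSketch resequencer = new StreamResequencerSketch(3);
        System.out.println(resequencer.offer(3, "m3")); // prints [m3]
        System.out.println(resequencer.offer(4, "m4")); // prints [m4]
        System.out.println(resequencer.offer(6, "m6")); // prints [] because 5 is missing
        System.out.println(resequencer.offer(5, "m5")); // prints [m5, m6]
    }
}
```

The sorted map keeps held-back messages ordered, so closing a gap releases every consecutive message behind it in one pass.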
Java DSL with Batch Example
As an example, suppose we need to process stock quotes every minute and have them sequenced by stock symbol. We can use an XPath expression to select the stock symbol, which also serves as the sort value.
from("jms:topic:stock:quote")
    .resequence().xpath("/quote/@symbol")
    .timeout(60 * 1000)
    .to("acct:quotes");
NOTE: By default, messages are ordered in ascending sequence. We can supply our own comparison for sorting if required.
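To illustrate what a custom comparison might look like, here is a small plain-Java sketch that orders stock symbols case-insensitively in descending order. The class QuoteOrderingSketch and the DESCENDING_SYMBOL comparator are hypothetical names. How a comparison is attached to the resequence node depends on the Camel version and is deliberately not shown here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative sketch only; names are hypothetical and not part of Camel.
final class QuoteOrderingSketch {
    // Assumed custom ordering: ignore case and sort from Z to A.
    static final Comparator<String> DESCENDING_SYMBOL =
            String.CASE_INSENSITIVE_ORDER.reversed();

    // Return a sorted copy, leaving the input list untouched.
    static List<String> sort(List<String> symbols) {
        List<String> sorted = new ArrayList<>(symbols);
        sorted.sort(DESCENDING_SYMBOL);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(sort(List.of("ibm", "AAPL", "MSFT"))); // prints [MSFT, ibm, AAPL]
    }
}
```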
Java DSL with Stream Example
As an example of streaming, suppose we need to continuously poll a file directory for inventory updates on our products, with the additional requirement that they are processed in sequence by product id. To solve this, we can enable streaming and use one hour as the timeout.
from("file://inventory")
    .resequence().xpath("/inventory/@sku")
    .stream().timeout(60 * 60 * 1000)
    .to("acct:inventoryUpdates");