Upsert Data from Non-CSV Format
This step illustrates how to easily upsert data from a non-CSV format to an Oracle database table.
Data import can be done in two insertion modes – insert and upsert. It is similar to upserting data from CSV to a database storage; the only change is that we don't use the flatpack CSV format and net.pricefx.integration.route.CsvToDbRoute in Step 6 but we have to use our own implementation of net.pricefx.integration.route.core.AbstractBatchRoute.
Step by Step
Prepare the database connection as described in Configure Database Connection.
Create a database table which will hold the data. It is good practice to save SQL DDL commands to a file which is versioned as well.
Add flatpackDataFormat which describes the CSV file format to the camel-context.xml file.
<pfx:flatpackDataFormat id="csvFormat" delimiter=","/>
Create loadMapper for the transition between CSV columns and database table columns. The best practice is to put it to a mappers-filter.xml file.
Mapper example
<loadMapper id="customerToDbMapper"> <body out="CUSTOMER_ID" in="customerId"/> <body out="SALES_ORG" in="salesOrg"/> <body out="DIVISION" in="division"/> </loadMapper>
Our own implementation of net.pricefx.integration.route.core.AbstractBatchRoute to suit the data format will be used in the next step as a class of the customerBatchImportToDbRoute bean.
ListToDbRoute example
package net.pricefx.integration.utils; import net.pricefx.integration.aggregation.AggregateToList; import net.pricefx.integration.db.bean.StatementAndParamsProvider; import net.pricefx.integration.db.bean.sqlprovider.StatementWithParamNames; import net.pricefx.integration.mapper.Mapper; import net.pricefx.integration.mapper.MapperProcessor; import net.pricefx.integration.route.core.AbstractBatchRoute; import org.apache.camel.Processor; import org.apache.camel.model.AggregateDefinition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ListToDbRoute extends AbstractBatchRoute { public static final Logger log = LoggerFactory.getLogger(ListToDbRoute.class); private StatementAndParamsProvider statementAndParamsProvider; private Mapper mapper; public void configure() { Processor mapperProcessorToUse = this.getMapperToUse(); StatementWithParamNames stmtWithParamNames = this.statementAndParamsProvider.provideSqlQuery(); String sqlQuery = stmtWithParamNames.getStatement(); if (log.isDebugEnabled()) { log.debug("SQL STATEMENT TO BE RUN: {}", sqlQuery); } ((AggregateDefinition) ((this.from(this.getRouteName()).routeId(this.getRouteName())).split(this.body()).aggregate(this.constant(true), new AggregateToList()).completionSize(this.getBatchSize().intValue()).completionTimeout((long) this.getCompletionTimeout().intValue()).aggregateController(this.aggregateController).process(mapperProcessorToUse)).to("sql://" + sqlQuery + "?batch=true")).end().end().process(this.getForceCompletionOnAllGroupsProcessor(this.aggregateController)); } private Processor getMapperToUse() { if (this.getMapper() != null) { MapperProcessor mapperProcessor = new MapperProcessor(); mapperProcessor.setMapper(this.getMapper()); return mapperProcessor; } else { return exchange -> { }; } } public Mapper getMapper() { return this.mapper; } public void setMapper(Mapper mapper) { this.mapper = mapper; } public StatementAndParamsProvider getStatementAndParamsProvider() { return this.statementAndParamsProvider; } public void setStatementAndParamsProvider(StatementAndParamsProvider statementAndParamsProvider) { this.statementAndParamsProvider = statementAndParamsProvider; } }
Create a route builder bean which defines transition between the CSV file and database table in camel-context.xml.
Mapper example
Apply the route builder in camelContext in the camel-context.xml file.
Apply route builder
Usage of the route is quite simple. Once you have the CSV file in the route, you can call something like this:
Use import route
IntegrationManager version 5.8.0