Unload Data from Snowflake and Load Them into Greenplum

This document describes a multi-stage example of unloading data from the Snowflake data warehouse and loading them into the Greenplum database, implemented as a set of Apache Camel routes.

The processing consists of four stages:

  1. Unload data from a Snowflake table or view (here, VW_PFX_TRANSACTION_SALES) into multiple gzipped CSV files on the internal user stage (@~) and download them to the integration server.

  2. Copy the data files from the integration server to the Greenplum server via SFTP.

  3. Load the data from the data files into the Greenplum database table.

  4. Clean up the data files on the Snowflake stage and on the Greenplum server.

The code samples cover the following:

Add Snowflake JDBC Driver as Maven Dependency

<dependency>
  <groupId>net.snowflake</groupId>
  <artifactId>snowflake-jdbc</artifactId>
  <version>3.11.1</version>
</dependency>
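The routes in this example also use Camel components for JDBC endpoints (jdbc:), SFTP uploads (sftp:) and Groovy expressions (<groovy>). If your IntegrationManager distribution does not already bundle them, the extra dependencies could be declared roughly as follows. This is a sketch that assumes the versions are managed by a Camel BOM or parent POM, which is why no <version> elements are shown.

```xml
<!-- Assumption: versions come from a Camel BOM/parent POM; add only what your distribution lacks. -->
<!-- Provides the jdbc: endpoint used to run COPY INTO, GET and REMOVE -->
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-jdbc</artifactId>
</dependency>
<!-- Provides the sftp: endpoint used to copy files to the Greenplum server -->
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-ftp</artifactId>
</dependency>
<!-- Provides the <groovy> expression language used to build the column list -->
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-groovy</artifactId>
</dependency>
```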

Configure Snowflake Data Source Bean

<bean id="snowflakeDataSource" class="net.snowflake.client.jdbc.SnowflakeBasicDataSource">
  <property name="url" value="${snowflake.url}" />
  <property name="user" value="${snowflake.username}" />
  <property name="password" value="${snowflake.password}" />
  <property name="databaseName" value="${snowflake.database}" />
  <property name="schema" value="${snowflake.schema}" />
</bean>
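The Camel JDBC endpoints in the routes look up this bean by its ID (jdbc:snowflakeDataSource). Before running the full pipeline, a small smoke-test route can confirm that the data source connects. The sketch below is hypothetical (the route ID and timer name are made up) and simply runs Snowflake's CURRENT_VERSION() function once at startup.

```xml
<routes xmlns="http://camel.apache.org/schema/spring">
  <!-- Hypothetical route: fires once and queries the Snowflake version -->
  <route id="check-snowflake-connection">
    <from uri="timer://checkSnowflake?repeatCount=1"/>
    <setBody><constant>SELECT CURRENT_VERSION()</constant></setBody>
    <!-- "snowflakeDataSource" is the bean ID defined above -->
    <to uri="jdbc:snowflakeDataSource"/>
    <!-- By default the JDBC component returns the result rows as a list of maps -->
    <log message="Snowflake connection OK: [${body}]"/>
  </route>
</routes>
```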

Create Integration Route

<routes xmlns="http://camel.apache.org/schema/spring">

  <route id="unload-data-from-snowflake" autoStartup="true" startupOrder="10">
    <from uri="timer://triggerSelect?repeatCount=1"/>

    <!-- Copy data as gzipped CSV files to the user stage. -->
    <setBody>
      <simple>COPY INTO @~/${exchangeId}
        from (select *, (SUBSIDIARY_CODE || '-' || ORDER_NUMBER || '-' || ORDER_LINE_NUMBER) as TRANSACTION_ID
              from VW_PFX_TRANSACTION_SALES
              where SUBSIDIARY_CODE='BK' and ORDER_INVOICE_DATE > '2020-01-01')
        file_format = (type = 'CSV' field_delimiter = ',' skip_header = 0 field_optionally_enclosed_by = '"' compression='gzip')
        max_file_size=16000000;</simple>
    </setBody>
    <log message="Going to copy data to stage with sql: [${body}]"/>
    <to uri="jdbc:snowflakeDataSource"/>

    <!-- Download data from stage to a local folder (adjust the path to your environment). -->
    <setBody>
      <simple>GET @~/${exchangeId} file:///Users/martinh/workspace/pricefx/pricefx-integration-commons/config/data/${exchangeId};</simple>
    </setBody>
    <log message="Going to download data using sql: [${body}]"/>
    <to uri="jdbc:snowflakeDataSource"/>
    <log message="Data download complete: [${body}]"/>

    <!-- Upload unloaded CSV files to Greenplum server -->
    <to uri="direct:upload-csv-files"/>

    <!-- Load data into Greenplum table -->
    <to uri="direct:load-into-greenplum"/>

    <!-- Cleanup -->
    <onCompletion>
      <!-- Remove data from stage. -->
      <setBody><simple>REMOVE @~/${exchangeId}</simple></setBody>
      <log message="Going to remove data files from stage using sql: [${body}]"/>
      <to uri="jdbc:snowflakeDataSource"/>
      <log message="Cleanup complete"/>
    </onCompletion>
  </route>

  <route id="csv-file-upload" autoStartup="true" startupOrder="20">
    <from uri="direct:upload-csv-files"/>
    <!-- For each row of the GET result, i.e. each downloaded CSV file -->
    <split>
      <simple>${body}</simple>
      <log message="Uploading file [/Users/martinh/workspace/pricefx/pricefx-integration-commons/config/data/${exchangeProperty.CamelCorrelationId}/${body[file]}]"/>
      <!-- Load file bytes into the exchange; inside the split, CamelCorrelationId holds the parent exchange ID -->
      <pollEnrich>
        <simple>file:/Users/martinh/workspace/pricefx/pricefx-integration-commons/config/data/${exchangeProperty.CamelCorrelationId}?fileName=${body[file]}&amp;delete=true</simple>
      </pollEnrich>
      <!-- Upload into Greenplum server -->
      <toD uri="sftp://127.0.0.1:2022/data/${exchangeProperty.CamelCorrelationId}?username=gpadmin&amp;password=gpadmin&amp;strictHostKeyChecking=no&amp;disconnect=false"/>
      <log message="Upload complete [${headers[CamelFileNameProduced]}]"/>
    </split>
    <log message="Upload data files to Greenplum server completed."/>
  </route>

  <route id="load-data-into-greenplum" autoStartup="true" startupOrder="30">
    <from uri="direct:load-into-greenplum"/>
    <!-- Define the CSV column names and Greenplum types (in CSV column order) as a header -->
    <setHeader headerName="PfxGp.InputColumns">
      <groovy>[
        ['SUBSIDIARY_CODE': 'VARCHAR(1024)'], ['ORDER_NUMBER': 'VARCHAR(1024)'], ['ORDER_LINE_NUMBER': 'NUMERIC(38,0)'],
        ['BRANCH_ID': 'VARCHAR(1024)'], ['CUSTOMER_ID': 'VARCHAR(1024)'], ['PRODUCT_ID': 'VARCHAR(1024)'],
        ['NON_STOCK_PRODUCT_NUMBER': 'VARCHAR(1024)'], ['VENDOR_NUMBER': 'VARCHAR(1024)'],
        ['ORDER_INVOICE_DATE': 'DATE'], ['ORDER_TAKEN_DATE': 'DATE'],
        ['ITEM_ORDER_QUANTITY': 'NUMERIC(18,5)'], ['ITEM_SHIPPED_QUANTITY': 'NUMERIC(18,5)'], ['ITEM_BACKORDERED_QUANTITY': 'NUMERIC(18,5)'],
        ['UNIT_PRICE_AMOUNT': 'NUMERIC(18,5)'], ['ORIGINAL_UNIT_PRICE_AMOUNT': 'NUMERIC(18,5)'],
        ['GL_UNIT_COST_AMOUNT': 'NUMERIC(18,5)'], ['FIELD_UNIT_COST_AMOUNT': 'NUMERIC(18,5)'],
        ['EXTENDED_PRICE_AMOUNT': 'NUMERIC(18,2)'], ['GL_EXTENDED_COST_AMOUNT': 'NUMERIC(18,2)'], ['FIELD_EXTENDED_COST_AMOUNT': 'NUMERIC(18,2)'],
        ['ANTICIPATED_REBATE_AMOUNT': 'NUMERIC(18,2)'], ['GL_MARGIN_AMOUNT': 'NUMERIC(18,2)'], ['FIELD_MARGIN_AMOUNT': 'NUMERIC(18,2)'],
        ['FREIGHT_FEE_AMOUNT': 'NUMERIC(18,2)'], ['RESTOCKING_FEE_AMOUNT': 'NUMERIC(18,2)'], ['DELIVERY_FEE_AMOUNT': 'NUMERIC(18,2)'],
        ['HANDLING_CHARGE_AMOUNT': 'NUMERIC(18,2)'], ['WARRANTY_PENALTY_AMOUNT': 'NUMERIC(18,2)'], ['OTHER_CHARGES_AMOUNT': 'NUMERIC(18,2)'],
        ['TOTAL_FEES_AMOUNT': 'NUMERIC(18,2)'], ['SALES_TAX_AMOUNT': 'NUMERIC(18,2)'], ['TOTAL_BILLED_AMOUNT': 'NUMERIC(18,2)'],
        ['ORDER_TYPE_CODE': 'VARCHAR(1024)'], ['ORDER_TYPE_NAME': 'VARCHAR(1024)'],
        ['ORDER_STATUS_CODE': 'VARCHAR(1024)'], ['ORDER_STATUS_NAME': 'VARCHAR(1024)'],
        ['ORDER_LINE_ORDER_TYPE_CODE': 'VARCHAR(1024)'], ['ORDER_LINE_ORDER_TYPE_NAME': 'VARCHAR(1024)'],
        ['ORDER_SHIP_METHOD_CODE': 'VARCHAR(1024)'], ['ORDER_SHIP_METHOD_NAME': 'VARCHAR(1024)'],
        ['ORDER_SOURCE_CODE': 'VARCHAR(1024)'], ['ORDER_SOURCE_NAME': 'VARCHAR(1024)'],
        ['PRICE_SOURCE_CODE': 'VARCHAR(1024)'], ['PRICE_SOURCE_NAME': 'VARCHAR(1024)'],
        ['FINAL_PRICE_SOURCE_CODE': 'VARCHAR(1024)'], ['FINAL_PRICE_SOURCE_NAME': 'VARCHAR(1024)'],
        ['COST_SOURCE_CODE': 'VARCHAR(1024)'], ['COST_SOURCE_NAME': 'VARCHAR(1024)'],
        ['OVERRIDE_REASON_CODE': 'VARCHAR(1024)'], ['OVERRIDE_REASON_NAME': 'VARCHAR(1024)'],
        ['RETURN_REASON_CODE': 'VARCHAR(1024)'], ['RETURN_REASON_NAME': 'VARCHAR(1024)'],
        ['ORIGINAL_ORDER_NUMBER': 'VARCHAR(1024)'], ['REBATE_CONTRACT_NUMBER': 'VARCHAR(1024)'], ['PRICE_RECORD_NUMBER': 'VARCHAR(1024)'],
        ['ENTERED_BY_USER_ID': 'VARCHAR(1024)'], ['ENTERED_BY_USER_NAME': 'VARCHAR(1024)'],
        ['SALESPERSON_ID': 'VARCHAR(1024)'], ['SALESPERSON_NAME': 'VARCHAR(1024)'],
        ['IS_SALES_ITEM_FLAG': 'VARCHAR(1024)'], ['IS_STOCK_ITEM_FLAG': 'VARCHAR(1024)'], ['IS_SPECIAL_ORDER_FLAG': 'VARCHAR(1024)'],
        ['IS_RETURN_FLAG': 'VARCHAR(1024)'], ['IS_NO_CHARGE_FLAG': 'VARCHAR(1024)'], ['IS_ITEM_PRICE_OVERRIDE_FLAG': 'VARCHAR(1024)'],
        ['IS_ITEM_KIT_FLAG': 'VARCHAR(1024)'], ['IS_ITEM_KIT_COMPONENT_FLAG': 'VARCHAR(1024)'], ['IS_INVOICED_FLAG': 'VARCHAR(1024)'],
        ['IS_ECOMMERCE_FLAG': 'VARCHAR(1024)'], ['IS_DIRECT_SHIP_FLAG': 'VARCHAR(1024)'],
        ['TRANSACTION_ID': 'VARCHAR(1024)']
      ]</groovy>
    </setHeader>
    <!-- Set path pattern of CSV files on the Greenplum server -->
    <setBody><simple>/home/gpadmin/data/${exchangeId}/${exchangeId}*</simple></setBody>
    <!-- Load data into Greenplum (keep updateColumns on one line: it is a comma-separated list) -->
    <to uri="pfx-gp:loaddata?database=test&amp;masterHost=localhost&amp;masterPort=5432&amp;dbUser=gpadmin&amp;dbPassword=password&amp;table=transaction_sales&amp;matchColumns=transaction_id&amp;errorLimit=10&amp;logErrors=true&amp;sshHost=127.0.0.1&amp;sshPort=2022&amp;sshUsername=gpadmin&amp;sshPassword=gpadmin&amp;sshDirectory=/home/gpadmin/data&amp;updateColumns=SUBSIDIARY_CODE,ORDER_NUMBER,ORDER_LINE_NUMBER,BRANCH_ID,CUSTOMER_ID,PRODUCT_ID,NON_STOCK_PRODUCT_NUMBER,VENDOR_NUMBER,ORDER_INVOICE_DATE,ORDER_TAKEN_DATE,ITEM_ORDER_QUANTITY,ITEM_SHIPPED_QUANTITY,ITEM_BACKORDERED_QUANTITY,UNIT_PRICE_AMOUNT,ORIGINAL_UNIT_PRICE_AMOUNT,GL_UNIT_COST_AMOUNT,FIELD_UNIT_COST_AMOUNT,EXTENDED_PRICE_AMOUNT,GL_EXTENDED_COST_AMOUNT,FIELD_EXTENDED_COST_AMOUNT,ANTICIPATED_REBATE_AMOUNT,GL_MARGIN_AMOUNT,FIELD_MARGIN_AMOUNT,FREIGHT_FEE_AMOUNT,RESTOCKING_FEE_AMOUNT,DELIVERY_FEE_AMOUNT,HANDLING_CHARGE_AMOUNT,WARRANTY_PENALTY_AMOUNT,OTHER_CHARGES_AMOUNT,TOTAL_FEES_AMOUNT,SALES_TAX_AMOUNT,TOTAL_BILLED_AMOUNT,ORDER_TYPE_CODE,ORDER_TYPE_NAME,ORDER_STATUS_CODE,ORDER_STATUS_NAME,ORDER_LINE_ORDER_TYPE_CODE,ORDER_LINE_ORDER_TYPE_NAME,ORDER_SHIP_METHOD_CODE,ORDER_SHIP_METHOD_NAME,ORDER_SOURCE_CODE,ORDER_SOURCE_NAME,PRICE_SOURCE_CODE,PRICE_SOURCE_NAME,FINAL_PRICE_SOURCE_CODE,FINAL_PRICE_SOURCE_NAME,COST_SOURCE_CODE,COST_SOURCE_NAME,OVERRIDE_REASON_CODE,OVERRIDE_REASON_NAME,RETURN_REASON_CODE,RETURN_REASON_NAME,ORIGINAL_ORDER_NUMBER,REBATE_CONTRACT_NUMBER,PRICE_RECORD_NUMBER,ENTERED_BY_USER_ID,ENTERED_BY_USER_NAME,SALESPERSON_ID,SALESPERSON_NAME,IS_SALES_ITEM_FLAG,IS_STOCK_ITEM_FLAG,IS_SPECIAL_ORDER_FLAG,IS_RETURN_FLAG,IS_NO_CHARGE_FLAG,IS_ITEM_PRICE_OVERRIDE_FLAG,IS_ITEM_KIT_FLAG,IS_ITEM_KIT_COMPONENT_FLAG,IS_INVOICED_FLAG,IS_ECOMMERCE_FLAG,IS_DIRECT_SHIP_FLAG"/>
    <log message="Load into Greenplum completed [${body}]"/>
    <!-- Clean up CSV files on Greenplum server -->
    <onCompletion>
      <setBody><simple>rm -rf /home/gpadmin/data/${exchangeId}</simple></setBody>
      <log message="Cleaning up uploaded CSV files [${body}]"/>
      <!-- The scp component can only copy files; the ssh component executes the message body
           as a command on the remote host (assumes camel-ssh is available). -->
      <to uri="ssh://127.0.0.1:2022?username=gpadmin&amp;password=gpadmin"/>
    </onCompletion>
  </route>

</routes>
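The routes hardcode a developer-specific local download directory (/Users/martinh/workspace/pricefx/pricefx-integration-commons/config/data) in several places. One way to make it configurable is the Camel simple language's ${properties:...} function. The fragments below are a sketch that assumes a hypothetical property named snowflake.download.dir, holding an absolute path, is defined in a properties source Camel can read; only the affected steps are shown, and the "Uploading file" log message can be adjusted the same way.

```xml
<!-- Fragment for the unload-data-from-snowflake route.
     snowflake.download.dir is a hypothetical property holding an absolute path, e.g. /opt/data -->
<setBody>
  <simple>GET @~/${exchangeId} file://${properties:snowflake.download.dir}/${exchangeId};</simple>
</setBody>
<to uri="jdbc:snowflakeDataSource"/>

<!-- Fragment for the csv-file-upload route: read each downloaded file from the same folder -->
<pollEnrich>
  <simple>file:${properties:snowflake.download.dir}/${exchangeProperty.CamelCorrelationId}?fileName=${body[file]}&amp;delete=true</simple>
</pollEnrich>
```

Keeping the path in one property means the GET target and the file consumer cannot drift apart, which would otherwise leave pollEnrich waiting for files that were downloaded elsewhere.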

IntegrationManager version 5.8.0