Examples
The following examples illustrate how to use FlowServer to load data in different formats into WarehousePG (WHPG).
Loading CSV data from RabbitMQ using insert mode
In this example, you load data from a RabbitMQ queue named queue_for_whpg into a WHPG table named data_from_rabbitmq. The RabbitMQ messages are in CSV format and include a customer identifier (integer), the month (integer), and an expense amount (decimal). The target table adds a column that calculates the tax due on each expense.
The data will be processed in batches and committed every second. It will be loaded into WHPG using the insert mode.
This example assumes that all prerequisites are met.
Create a configuration file named rabbitmq_data.yaml that contains the following:
version: v1.0
source:
  rabbitmq:
    server: gpadmin:changeme@localhost:5672
    vhost: fs_dev
    queue: queue_for_whpg
    format:
      csv:
        columns:
          - name: cust_id
            type: int
          - name: month
            type: int
          - name: expense
            type: decimal(9,2)
        delimiter: '&'
        quote: "'"
        escape: "\x2c"  # comma
task:
  batch_size:
    interval_ms: 1000
target:
  database:
    host: localhost
    port: 15432
    user: gpadmin
    password: password
    dbname: testdb
  error_limit: 5
  tables:
    - table: data_from_rabbitmq
      schema: public
      mode:
        insert: {}
      mapping:
        customer: cust_id
        month: month
        expenses: expense
        tax: expense * .25
Create the target WHPG table:
CREATE TABLE data_from_rabbitmq (customer int, month int, expenses decimal(9,2), tax decimal(7,2));
Run FlowCLI to submit and start a job that loads the data from the RabbitMQ queue into the WHPG table:
flowcli submit --name rabbitmq_job rabbitmq_data.yaml -s
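If the queue is empty and you have RabbitMQ's rabbitmqadmin tool available (an assumption; any AMQP client can publish), you could send a hypothetical test message that matches the configured delimiter (&) and quote (') characters:

rabbitmqadmin --vhost=fs_dev --username=gpadmin --password=changeme \
  publish routing_key=queue_for_whpg payload="123&7&'1034.56'"

Publishing through the default exchange with the queue name as the routing key delivers the message directly to queue_for_whpg; this sample payload represents cust_id 123, month 7, and expense 1034.56.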
Verify that the WHPG target table has been populated with the data loaded from the RabbitMQ queue:
SELECT * FROM data_from_rabbitmq;
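Optionally, you can also spot-check the computed tax column. Assuming the .25 multiplier from the mapping and that the decimal(7,2) cast rounds to two decimal places, this query should return no rows:

SELECT customer, month, expenses, tax
FROM data_from_rabbitmq
WHERE tax <> round(expenses * .25, 2);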
Loading Avro data from Kafka using merge mode
In this example, you load Avro-format value data as JSON from a Kafka topic named test_avro into a WarehousePG (WHPG) table named data_from_kafka.
The Kafka producer emits customer order messages in Avro format that include the customer name (text) and the quantity purchased (integer).
The data will be processed in batches and committed every second. It will be loaded into WHPG using the merge mode, which behaves as follows (a conceptual SQL sketch appears after this list):
- It inserts new rows, and updates existing rows when the value of qty in the target table data_from_kafka is greater than 20.
- It deletes rows whose name in the target table data_from_kafka equals the name in the input data, when that value is customer-3.
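Conceptually, each batch behaves like the SQL below, where incoming stands for a hypothetical staging table that holds the decoded rows of one batch. This is an illustrative sketch of the merge semantics under that assumption, not the literal statements that FlowServer executes:

-- Update matched rows, but only where the target row's qty exceeds 20.
UPDATE data_from_kafka t
SET qty = i.qty
FROM incoming i
WHERE t.name = i.name        -- match_columns: name
  AND t.qty > 20;            -- update_condition

-- Delete matched rows when the incoming name is 'customer-3'.
DELETE FROM data_from_kafka t
USING incoming i
WHERE t.name = i.name        -- match_columns: name
  AND i.name = 'customer-3'; -- delete_condition

-- Insert incoming rows that match no existing row.
INSERT INTO data_from_kafka (name, qty)
SELECT i.name, i.qty
FROM incoming i
WHERE NOT EXISTS (
  SELECT 1 FROM data_from_kafka t WHERE t.name = i.name
);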
This example assumes that all prerequisites are met.
Create a configuration file named kafka_data.yaml that contains the following:
version: v1.0
source:
  kafka:
    brokers:
      - kafka_host:9092
    topic: test_avro
    value:
      avro:
        column:
          name: value
          type: JSON
        schema_registry: http://kafka_host:8081
task:
  batch_size:
    interval_ms: 1000
target:
  database:
    host: localhost
    port: 15432
    user: gpadmin
    password: password
    dbname: testdb
  error_limit: "5"
  tables:
    - table: data_from_kafka
      schema: public
      mode:
        merge:
          match_columns:
            - name
          update_columns:
            - qty
          update_condition: data_from_kafka.qty > 20
          delete_condition: (value->>'name')::text = 'customer-3'
      mapping:
        name: (value->>'name')::text
        qty: (value->>'qty')::int
Create the target WHPG table:
CREATE TABLE data_from_kafka (name text, qty int);
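Merge updates and deletes affect only rows that already exist in the target table. To observe all three behaviors, and not just inserts, you could optionally seed the table with a few hypothetical rows before starting the job:

INSERT INTO data_from_kafka (name, qty) VALUES
  ('customer-1', 25),  -- qty > 20, so a matching message updates this row
  ('customer-2', 10),  -- qty <= 20, so the update_condition is not met
  ('customer-3', 40);  -- deleted when a message with name customer-3 arrives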
Run FlowCLI to submit and start a job that loads the data from the Kafka topic into the WHPG table:
flowcli submit --name kafka_job kafka_data.yaml -s
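If the topic does not yet contain messages and you have the Confluent Platform command-line tools installed (an assumption; any producer that writes Avro and registers its schema with the schema registry will do), you could publish a few hypothetical test records:

kafka-avro-console-producer \
  --broker-list kafka_host:9092 \
  --topic test_avro \
  --property schema.registry.url=http://kafka_host:8081 \
  --property value.schema='{"type":"record","name":"order","fields":[{"name":"name","type":"string"},{"name":"qty","type":"int"}]}'

Each record is then typed as one JSON object per line, for example {"name": "customer-1", "qty": 30}. The record name order and the sample values are illustrative only.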
Verify that the WHPG target table has been populated with the data loaded from the Kafka topic:
SELECT * FROM data_from_kafka;