The following examples illustrate how to load different formats of data into WarehousePG (WHPG) using FlowServer.

Loading CSV data from RabbitMQ using insert mode

In this example, you load data from a RabbitMQ queue named queue_for_whpg into a WHPG table named data_from_rabbitmq. The RabbitMQ queue uses CSV format that include a customer identifier (integer), the month (integer), and an expense amount (decimal). The target table will incorporate a new column that calculates the tax for each expense.

The data will be processed in batches and committed every second. It will be loaded into WHPG using the insert mode.

This example assumes that all prerequisites are met.

Create a configuration file named rabbitmq_data.yaml which contains the following:

version: v1.0
source:
  rabbitmq:
    server: gpadmin:changeme@localhost:5672
    vhost: fs_dev
    queue: queue_for_whpg
    format:
      csv:
        columns:
        - name: cust_id
          type: int
        - name: month
          type: int
        - name: expense
          type: decimal (9,2)
        delimiter: '&'
        quote: "'"
        escape: "\x2c" # ,
    task:
      batch_size:
        interval_ms: 1000
target:
  database:
    host: localhost
    port: 15432
    user: gpadmin
    password: password
    dbname: testdb
    error_limit: 5
    tables:
    - table: data_from_rabbitmq
      schema: public
      mode:
        insert: {}
      mapping:
        customer: cust_id
        month: month
        cost: expense
        tax: expense * .25

Create the target WHPG table:

CREATE TABLE data_from_rabbitmq (customer int, month int, expenses decimal(9,2), tax decimal (7,2));

Run FlowCLI to submit and start a job that loads the data from the RabbitMQ queue into the WHPG table:

flowcli submit --name rabbitmq_job rabbitmq_data.yaml -s

Verify that the WHPG target table has been populated with the data loaded from the RabbitMQ queue:

SELECT * from data_from_rabbitmq;

Loading Avro data from Kafka in merge mode

In this example, you load Avro-format value data as JSON from a Kafka topic named kafka_topic into a WarehousePG (WHPG) table named data_from_kafka.

Th Kafka producer emits customer order messages in JSON format that include the customer name (text), and the amount purchased (integer).

The data will be processed in batches and committed every second. It will be loaded into WHPG using the merge mode:

  • It will insert new rows and update existing rows when the value of qty from the target table data_from_kafka is bigger than 20.
  • It will delete rows when the value of name from the target table data_from_kafka is equal to the input data, and the value of name is customer-3.

This example assumes that all prerequisites are met.

Create a configuration file named kafka_data.yaml which contains the following:

version: v1.0
source:
  kafka:
    brokers:
      - kafka_host:9092
    topic: test_avro
    value:
      avro:
        column:
          name: value
          type: JSON
        schema_registry: http://kafka_host:8081
    task:
      batch_size:
        interval_ms: 1000
target:
  database:
    host: localhost
    port: 15432
    user: gpadmin
    password: password
    dbname: testdb
    error_limit: "5"
    tables:
    - table: data_from_kafka
      schema: public
      mode:
        merge:
          match_columns:
            - name
          update_columns:
            - qty
          update_condition: data_from_kafka.qty > 20
          delete_condition: (value->>'name')::text = 'customer-3'
      mapping:
        name: (value->>'name')::text
        qty: (value->>'qty')::int

Create the target WHPG table:

CREATE TABLE data_from_kafka (name text, qty int);

Run FlowCLI to submit and start a job that loads the data from the Kafka topic into the WHPG table:

flowcli submit --name kafka_job kafka_data.yaml -s

Verify that the WHPG target table has been populated with the data loaded from the Kafka topic:

SELECT * from data_from_kafka;

Could this page be better? Report a problem or suggest an addition!