Constructing the configuration file
FlowServer requires a YAML-formatted configuration file that defines the FlowServer job that loads data from a Kafka or RabbitMQ source into WarehousePG (WHPG).
It includes properties that identify the data source and format, and information about the WHPG connection and target table. You can also schedule the job to run for a specific duration of time and automatically stop and restart based on your defined intervals.
The configuration file is composed of two main sections: source and target. The source section defines the data origin, which is either Kafka or RabbitMQ. The target section defines the WHPG target table and how the job runs.
The column names defined under <data_format> in the source section must match the column names defined as <source_column_name> under mapping in the target section, which map to the columns in the target WHPG database defined by <target_column_name>.
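As an illustration, suppose the source data is in CSV format with columns named id and name, and the target table has columns cust_id and cust_name (all names here are hypothetical). The two sections then reference the columns as follows:

# In the source section, the <data_format> block declares the source column names:
csv:
  columns:
    - name: id
      type: int
    - name: name
      type: text

# In the target section, mapping pairs each target column with a source column:
mapping:
  cust_id: id
  cust_name: name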
Construct the source section for a Kafka source
Below is a template configuration file for Kafka data:
version: v1.0
source:
  kafka:
    brokers:
      - <kafka_broker_host:broker_port>
    topic: <kafka_topic>
    value:
      <data_format>:
        <column_spec>
        <other_props>
    control:
      consistency: at-most-once | at-least-once | exactly-once | none
      fallback_offset: earliest | latest
    task:
      batch_size:
        interval_ms: <wait_time>
        max_count: <number_of_rows>
      window_size: <num_batches>
      window_statement: <udf_or_sql_to_run>
      prepare_statement: <udf_or_sql_to_run>
      teardown_statement: <udf_or_sql_to_run>
    properties:
      <source_property_name>: <source_property_value>
Where:
- version: The version of the configuration format (FlowServer uses v1.0).
- source: The source configuration for the job.
- kafka: Configuration for the Kafka source.
- brokers: List of Kafka broker addresses.
- topic: The Kafka topic to consume messages from.
- value: Configuration for the value data format of the Kafka messages.
- <data_format>: The format of the data (json, avro, or csv). See below for the different format options.
- control: Control settings for the Kafka source.
- consistency: The consistency level for message processing. The valid values are at-most-once, at-least-once, exactly-once, and none. The default is exactly-once.
  - exactly-once: The job stores the offsets in the database and does not reprocess messages that have already been processed.
  - at-most-once: The job does not store the offsets in the database and commits the offset to Kafka before committing the data loading transaction in the database.
  - at-least-once: The job does not store the offsets in the database and commits the offset to Kafka after committing the data loading transaction in the database.
  - none: The job does not store the offsets in the database and auto-acks the offset.
- fallback_offset: The offset to use if there is an offset gap between the record in the history store and the Kafka watermark. The valid values are earliest and latest. If a value is not specified, FlowServer raises an error when there is an offset gap.
- task: Task management settings.
- batch_size: Controls the batch size for processing messages.
  - interval_ms: The time interval to wait for new messages before committing loading transactions. The default is 1000 milliseconds.
  - max_count: The maximum number of rows to process in a batch. The default is 0, which means FlowServer ignores this setting. If both interval_ms and max_count are set, the job waits for the specified time interval or until the maximum number of rows is reached, whichever comes first.
- window_size: The number of batches to process before running window_statement.
- window_statement: SQL or UDF to run for each window_size of batch data.
- prepare_statement: SQL or UDF to run before starting the job.
- teardown_statement: SQL or UDF to run after the job stops.
- properties: Additional properties for the source.
  - <source_property_name>: The name of the property.
  - <source_property_value>: The value of the property.
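For reference, a minimal source section for a hypothetical Kafka deployment might look like the following sketch. The broker addresses, topic, column name, and statement are placeholders, not values from an actual environment:

version: v1.0
source:
  kafka:
    brokers:
      - kafka-broker1.example.com:9092
      - kafka-broker2.example.com:9092
    topic: orders
    value:
      json:
        column:
          name: payload
          type: jsonb
    control:
      consistency: exactly-once
      fallback_offset: earliest
    task:
      batch_size:
        interval_ms: 2000      # commit a batch after waiting up to 2 seconds
        max_count: 5000        # or after 5000 rows, whichever comes first
      window_size: 10
      window_statement: SELECT refresh_order_summary()   # hypothetical UDF run every 10 batches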
The different options for each <data_format> mode are as follows:
JSON format: FlowServer can read JSON data as a single object or can read a single JSON record per line.
json:
  column:
    name: <column_name>
    type: json | jsonb
Where:
- column: Configuration for the external table for the Kafka message.
  - name: Name of the column of the external table for the Kafka message.
  - type: Type of the column of the external table for the Kafka message. Valid values are json and jsonb.
Avro format:
avro:
  column:
    name: <column_name>
    type: json
  schema_registry: <schema_registry_url> | schema_file: <schema_file_path>
Where:
- column: Configuration for the external table for the Kafka message.
  - name: Name of the column of the external table for the Kafka message.
  - type: Type of the column of the external table for the Kafka message. The value must be json.
- schema_registry: If the Avro schema of the JSON data you want to load is registered in the Confluent Schema Registry, you must identify the host name and port number of each Confluent Schema Registry server in your Kafka cluster.
- schema_file: If you are not using a schema registered in the Confluent Schema Registry, specify the local path to a text-based Avro schema file.
When you specify the Avro data format, you must define only a single json type column.
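For example, an Avro value specification that resolves the schema from a Confluent Schema Registry might look like the following sketch; the column name and registry URL are placeholders:

avro:
  column:
    name: event_data
    type: json
  schema_registry: http://schema-registry.example.com:8081
  # schema_file: /tmp/event_schema.avsc   # alternative when not using a schema registry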
CSV format:
csv:
  columns:
    - name: <column_name>
      type: <column_data_type>
    ...
  delimiter: <delim_char>
  quote: <quote_char>
  null_string: <nullstr_val>
  escape: <escape_char>
  newline: <newline_val>
  force_not_null: <columns>
  force_quote: <columns>
  fill_missing_fields: <boolean>
Where:
- columns: Configuration for the external table for the Kafka message.
  - name: Name of the column of the external table for the Kafka message.
  - type: Type of the column of the external table for the Kafka message.
- delimiter: Specifies a single ASCII character that separates columns within each message or row of data. The default delimiter is a comma (,).
- quote: Specifies the quotation character.
- null_string: Specifies the string that represents the null value.
- escape: Specifies the single character used for escaping data characters in the content that might otherwise be interpreted as row or column delimiters. Ensure that your escape character is not used anywhere in your actual column data.
- newline: Specifies the new line character(s) that end each record.
- force_not_null: Specifies a comma-separated list of column names to process as though each column were quoted and hence not a NULL value.
- force_quote: Specifies a list of columns that must always be wrapped in quotation marks, even if the data inside does not technically require them. This ensures that the database treats those specific fields as literal strings.
- fill_missing_fields: Specifies how FlowServer handles a row of data that has missing trailing field values (the row has missing data fields at the end of a line or row). The default is false, and FlowServer returns an error. If set to true, FlowServer sets missing trailing field values to NULL. Blank rows, fields with a NOT NULL constraint, and trailing delimiters on a line still generate an error.
When you specify the CSV data format, the message content cannot contain line ending characters (CR and LF).
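As a sketch, a CSV specification for a hypothetical pipe-delimited feed might look like the following; this block appears under value in a Kafka source (or under format in a RabbitMQ source), and the column names and option values are illustrative:

csv:
  columns:
    - name: id
      type: int
    - name: name
      type: text
    - name: amount
      type: numeric
  delimiter: "|"
  quote: '"'
  null_string: "NULL"
  fill_missing_fields: true   # set missing trailing fields to NULL instead of raising an error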
Construct the source section for a RabbitMQ source
Below is a template configuration file for RabbitMQ data:
version: v1.0
source:
  rabbitmq:
    server: <rmq_user>:<rmq_password>@<rmq_host>:<rmq_port>
    vhost: <vhost_name>
    queue: <queue_name>
    format:
      <data_format>:
        <column_spec>
        <other_props>
    control:
      consistency: at-most-once | at-least-once
    task:
      batch_size:
        interval_ms: <wait_time>
        max_count: <number_of_rows>
      window_size: <num_batches>
      window_statement: <udf_or_sql_to_run>
      prepare_statement: <udf_or_sql_to_run>
      teardown_statement: <udf_or_sql_to_run>
    properties:
      <source_property_name>: <source_property_value>
Where:
- version: The version of the configuration format (FlowServer uses v1.0).
- source: The source configuration for the job.
- rabbitmq: Configuration for the RabbitMQ source.
- server: The RabbitMQ server connection string.
- vhost: The RabbitMQ virtual host to use.
- queue: The RabbitMQ queue to consume messages from.
- format: Configuration for the data format of the RabbitMQ messages.
- <data_format>: The format of the data (json or csv). See below for specific format sub-options.
- control: Control settings for the RabbitMQ source.
- consistency: The consistency level for message processing. The valid values are at-most-once and at-least-once. The default is at-least-once.
  - at-least-once: The job stores the offsets in the database and does not reprocess messages that have already been processed.
  - at-most-once: The job does not store the offsets in the database and commits the offset to RabbitMQ before committing the data loading transaction in the database.
- task: Task management settings.
- batch_size: Controls the batch size for processing messages.
  - interval_ms: The time interval to wait for new messages before committing loading transactions. The default is 1000 milliseconds.
  - max_count: The maximum number of rows to process in a batch. The default is 0, which means FlowServer ignores this setting. If both interval_ms and max_count are set, the job waits for the specified time interval or until the maximum number of rows is reached, whichever comes first.
- window_size: The number of batches to process before running window_statement.
- window_statement: SQL or UDF to run for each window_size of batch data.
- prepare_statement: SQL or UDF to run before starting the job.
- teardown_statement: SQL or UDF to run after the job stops.
- properties: Additional properties for the source.
  - <source_property_name>: The name of the property.
  - <source_property_value>: The value of the property.
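As an illustration, a source section for a hypothetical RabbitMQ queue might look like the following sketch. The credentials, host, vhost, queue, and column names are placeholders:

version: v1.0
source:
  rabbitmq:
    server: loaduser:changeme@rabbitmq.example.com:5672
    vhost: /
    queue: orders_queue
    format:
      csv:
        columns:
          - name: id
            type: int
          - name: status
            type: text
    control:
      consistency: at-least-once
    task:
      batch_size:
        interval_ms: 1000   # the default wait time
        max_count: 1000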
The different options for each <data_format> mode are as follows:
JSON format: FlowServer can read JSON data as a single object or can read a single JSON record per line.
json:
  column:
    name: <column_name>
    type: json | jsonb
Where:
- column: Configuration for the external table for the RabbitMQ message.
  - name: Name of the column of the external table for the RabbitMQ message.
  - type: Type of the column of the external table for the RabbitMQ message. Valid values are json and jsonb.
CSV format:
csv:
  columns:
    - name: <column_name>
      type: <column_data_type>
    ...
  delimiter: <delim_char>
  quote: <quote_char>
  null_string: <nullstr_val>
  escape: <escape_char>
  newline: <newline_val>
  force_not_null: <columns>
  force_quote: <columns>
  fill_missing_fields: <boolean>
Where:
- columns: Configuration for the external table for the RabbitMQ message.
  - name: Name of the column of the external table for the RabbitMQ message.
  - type: Type of the column of the external table for the RabbitMQ message.
- delimiter: Specifies a single ASCII character that separates columns within each message or row of data. The default delimiter is a comma (,).
- quote: Specifies the quotation character.
- null_string: Specifies the string that represents the null value.
- escape: Specifies the single character used for escaping data characters in the content that might otherwise be interpreted as row or column delimiters. Ensure that your escape character is not used anywhere in your actual column data.
- newline: Specifies the new line character(s) that end each record.
- force_not_null: Specifies a comma-separated list of column names to process as though each column were quoted and hence not a NULL value.
- force_quote: Specifies a list of columns that must always be wrapped in quotation marks, even if the data inside does not technically require them. This ensures that the database treats those specific fields as literal strings.
- fill_missing_fields: Specifies how FlowServer handles a row of data that has missing trailing field values (the row has missing data fields at the end of a line or row). The default is false, and FlowServer returns an error. If set to true, FlowServer sets missing trailing field values to NULL. Blank rows, fields with a NOT NULL constraint, and trailing delimiters on a line still generate an error.
When you specify the CSV data format, the message content cannot contain line ending characters (CR and LF).
Construct the target section
target:
  database:
    host: <whpg_host>
    port: <whpg_port>
    user: <user_name>
    password: <password>
    dbname: <dbname>
    staging_schema: <staging_schema>
    error_limit: <num_errors> | <percentage_errors>
    filter_expression: <filter_string>
    tables:
      - table: <table_name>
        schema: <schema_name>
        mode:
          # one of the below insert, update, or merge modes
          insert: {}
          update:
            <mode_specific_property>: <value>
            ...
          merge:
            <mode_specific_property>: <value>
            ...
        mapping:
          <target_column_name>: <source_column_name> | <expression>
        filter: <output_filter_string>
option:
  schedule:
    running_duration: <run_time>
    auto_stop_restart_interval: <restart_time>
    max_restart_times: <num_restarts>
Where:
- target: The target database configuration for the job.
- database: Configuration for the target database.
  - host: The hostname or IP address of the WarehousePG coordinator.
  - port: The port number of the WarehousePG cluster.
  - user: The username to connect to the WarehousePG cluster.
  - password: The password for the user.
  - dbname: The name of the database to connect to.
- staging_schema: The schema to use for creating internal tables. The default is public.
- error_limit: The limit for errors before stopping the job. It can be a number or a percentage.
- filter_expression: The filter to apply to the input data before loading data into the target tables. A valid filter expression is a SQL WHERE clause without the WHERE keyword.
- tables: List of tables to write data to.
  - table: The name of the target table.
  - schema: The schema of the target table.
  - mode: The mode for writing data to the table. Valid values are insert, update, or merge. The default is insert. See below for specific mode sub-options.
  - mapping: Column mapping between source and target.
    - <target_column_name>: The name of the target column.
    - <source_column_name>: The name of the source column or an expression to compute the value.
  - filter: A filter expression to apply to the target table. A valid filter expression is a SQL WHERE clause without the WHERE keyword, and it may reference one or more <source_column_name>s.
- option: Additional options for the job.
- schedule: Scheduling options for the job.
  - running_duration: The duration the job should run for.
  - auto_stop_restart_interval: The amount of time after which FlowServer restarts a job that it stopped due to reaching running_duration.
  - max_restart_times: The maximum number of times the job can be automatically restarted.
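Putting these settings together, a target section for a hypothetical orders table might look like the following sketch. The connection details, table, and column names are placeholders, and the duration values assume a <number><unit> form; the source columns referenced in mapping and filter must be declared in the source section:

target:
  database:
    host: whpg-coordinator.example.com
    port: 5432
    user: flow_user
    password: changeme
    dbname: sales
    staging_schema: flow_staging
    error_limit: 25
    filter_expression: id IS NOT NULL   # applied to the input data before loading
    tables:
      - table: orders
        schema: public
        mode:
          insert: {}
        mapping:
          order_id: id                  # target column: source column or expression
          order_status: upper(status)
        filter: status <> 'CANCELLED'   # only rows matching this predicate are loaded
option:
  schedule:
    running_duration: 4h                # stop the job after four hours
    auto_stop_restart_interval: 30m     # restart 30 minutes after an automatic stop
    max_restart_times: 3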
The different options for each writing mode are as follows:
Insert mode inserts new data into the WHPG table and has no options.
Update mode updates the target table columns listed in update_columns when the input columns identified in match_columns match the named target table columns and the optional update_condition is true.

update:
  match_columns: [<match_column_names>]
  order_columns: [<order_column_names>]
  update_columns: [<update_column_names>]
  update_condition: <update_condition>
Merge mode has two different behaviors:
- Inserts new rows and updates existing rows when columns are listed in update_columns, the match_columns target table column values are equal to the input data, and an optional update_condition is specified and met.
- Deletes rows when the match_columns target table column values are equal to the input data, and an optional delete_condition is specified and met.

merge:
  match_columns: [<match_column_names>]
  update_columns: [<update_column_names>]
  order_columns: [<order_column_names>]
  update_condition: <update_condition>
  delete_condition: <delete_condition>
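For example, a merge specification that matches rows on a hypothetical id column could look like the following; the column names and conditions are illustrative:

merge:
  match_columns: [id]
  update_columns: [status, amount]
  order_columns: [event_time]
  update_condition: status <> 'CANCELLED'   # only update matched rows that satisfy this condition
  delete_condition: status = 'DELETED'      # delete matched rows that satisfy this condition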