Distributed Spark execution v1.6
Integrate the Postgres Analytics Accelerator (PGAA) with Apache Spark to transform Postgres into a high-performance platform that overcomes the CPU bottlenecks of multi-terabyte datasets.
Offloading query execution to Apache Spark via PGAA provides immediate architectural advantages:
- Intentional hand-off: You maintain full control by explicitly selecting the execution engine; PGAA then packages the SQL query and routes it to a Spark Connect endpoint.
- Distributed processing: By moving workloads to a Spark cluster, you leverage distributed computing power to handle massive datasets that would otherwise bottleneck a single database instance.
- Zero indexing: Achieve peak analytical performance without the need for manual index creation or constant maintenance, significantly reducing operational overhead.
Note
Currently, integration with Apache Spark supports:
- Read-only queries on Parquet files in S3-compatible object storage or a shared POSIX filesystem.
- Read-only queries for Iceberg tables in Iceberg REST catalogs.
The configuration of PGAA with Spark consists of the following steps:
- Configure your Spark environment.
- Connect Postgres to Spark.
- Configure the source data in Postgres.
- Run analytical queries.
- Inspect the query plan.
Configuring your Spark environment
You can use an existing Spark cluster or install a new one, running Spark 3.4 or later.
You must have a running Spark Connect server configured with the following dependencies:
org.apache.spark:spark-connect_2.12:3.5.6,\
io.delta:delta-spark_2.12:3.3.1,\
org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2,\
org.apache.iceberg:iceberg-aws-bundle:1.9.2,\
org.apache.hadoop:hadoop-aws:3.3.4
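As an illustration only, one way to launch such a server with these packages is shown below. It assumes a Spark 3.5.6 distribution unpacked at $SPARK_HOME; the start-connect-server.sh script and the default port 15002 come from the standard Spark distribution.

```shell
# Start a Spark Connect server with the required connector packages.
# The server listens on the default Spark Connect port 15002.
$SPARK_HOME/sbin/start-connect-server.sh \
  --packages org.apache.spark:spark-connect_2.12:3.5.6,\
io.delta:delta-spark_2.12:3.3.1,\
org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2,\
org.apache.iceberg:iceberg-aws-bundle:1.9.2,\
org.apache.hadoop:hadoop-aws:3.3.4
```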
Important
There is a known issue in the Iceberg Spark Library version 1.9.2 when using Spark to read from Iceberg tables. Under certain conditions, equality deletes may occasionally be skipped during concurrent executions.
The current workaround is to disable the Spark setting spark.sql.iceberg.executor-cache.enabled in your spark-defaults.conf file.
Disabling this cache ensures data consistency by correctly processing all deletes, but it may have performance implications for high-volume read workloads.
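For example, the workaround is a single line in spark-defaults.conf:

```
# Workaround for the Iceberg 1.9.2 equality-delete issue: disable the executor cache
spark.sql.iceberg.executor-cache.enabled false
```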
Connecting Postgres to Spark
Set the execution engine and define your Spark Connect URL in your Postgres session:
SET pgaa.executor_engine = 'spark_connect';
SET pgaa.spark_connect_url = 'sc://spark-connect:15002';
Where spark-connect points to your Spark Connect service address.
Validate the connection by running a simple version check through the PGAA interface:
SELECT pgaa.spark_sql('SELECT version()');
If successful, the command returns the version string of your Spark cluster.
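The SET commands above apply to the current session only. As a sketch, assuming a database named analytics_db, you could persist the settings at the database level instead:

```sql
-- Persist the PGAA Spark settings for all new sessions on this database
ALTER DATABASE analytics_db SET pgaa.executor_engine = 'spark_connect';
ALTER DATABASE analytics_db SET pgaa.spark_connect_url = 'sc://spark-connect:15002';
```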
Configuring the source data
PGAA can read Parquet files from S3-compatible object storage or a local file system, or read Iceberg tables in Iceberg REST catalogs.
Option 1: reading from Parquet files
Create a PGFS storage location in your Postgres database that points to the bucket containing your analytical data. For example, with a public bucket:
SELECT pgfs.create_storage_location(
    'my-sample-data',
    's3://beacon-analytics-demo-data-us-east-1-prod',
    '{"skip_signature": "true", "region": "us-east-1"}'
);
Create tables using the PGAA access method. Specify the storage location created in the previous step, the path to either a single Parquet file or a directory containing multiple Parquet files, and the format (currently Parquet). For example:
CREATE TABLE customer () USING PGAA WITH (pgaa.storage_location = 'my-sample-data', pgaa.path = 'tpch_sf_1/customer', pgaa.format = 'parquet');
CREATE TABLE orders () USING PGAA WITH (pgaa.storage_location = 'my-sample-data', pgaa.path = 'tpch_sf_1/orders', pgaa.format = 'parquet');
CREATE TABLE lineitem () USING PGAA WITH (pgaa.storage_location = 'my-sample-data', pgaa.path = 'tpch_sf_1/lineitem', pgaa.format = 'parquet');
CREATE TABLE nation () USING PGAA WITH (pgaa.storage_location = 'my-sample-data', pgaa.path = 'tpch_sf_1/nation', pgaa.format = 'parquet');
If you specify a directory containing multiple Parquet files, PGAA automatically unions all files into a single table for processing.
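As a quick sanity check (assuming the sample TPC-H tables defined above), you can confirm that the column definitions were inferred from the Parquet metadata and that all files under a directory are exposed as one table:

```sql
-- Columns are inferred from the Parquet files; no explicit column list was given
SELECT * FROM customer LIMIT 5;

-- All Parquet files under tpch_sf_1/lineitem are unioned into a single table
SELECT count(*) FROM lineitem;
```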
Option 2: reading from Iceberg tables
Configure a catalog connection. For example:
SELECT pgaa.add_catalog(
    'my_iceberg_catalog',
    'iceberg-rest',
    '{
        "url": "https://iceberg.example.com/api/v1",
        "warehouse": "a1b2c3d4-e5f6-g7h8",
        "token": "secret_token_abc123"
    }'
);
Create the necessary catalog-managed tables. Alternatively, attach or import the catalog.
CREATE TABLE customer () USING PGAA WITH (pgaa.format = 'iceberg', pgaa.managed_by = 'my_iceberg_catalog', pgaa.catalog_namespace = 'public', pgaa.catalog_table = 'customer');
CREATE TABLE orders () USING PGAA WITH (pgaa.format = 'iceberg', pgaa.managed_by = 'my_iceberg_catalog', pgaa.catalog_namespace = 'public', pgaa.catalog_table = 'orders');
CREATE TABLE lineitem () USING PGAA WITH (pgaa.format = 'iceberg', pgaa.managed_by = 'my_iceberg_catalog', pgaa.catalog_namespace = 'public', pgaa.catalog_table = 'lineitem');
CREATE TABLE nation () USING PGAA WITH (pgaa.format = 'iceberg', pgaa.managed_by = 'my_iceberg_catalog', pgaa.catalog_namespace = 'public', pgaa.catalog_table = 'nation');
See Integrating with Iceberg catalogs for details on how to configure access to tables with an Iceberg catalog.
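Assuming the namespace and table names above exist in your catalog, a quick query confirms that the catalog-managed tables resolve through the REST catalog:

```sql
-- Reads the table's current Iceberg snapshot via the REST catalog
SELECT count(*) FROM orders;
```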
Running analytical queries
This example query generates a Top 20 Revenue Report by identifying customers who achieved the highest revenue within a specific quarter. By offloading this to Spark, PGAA parallelizes complex conditional aggregations and utilizes predicate pushdown to minimize data transfer from object storage.
SELECT
    c_custkey,
    c_name,
    SUM(l_extendedprice * (1 - l_discount)) AS revenue,
    c_acctbal,
    n_name,
    c_address,
    c_phone,
    c_comment
FROM customer, orders, lineitem, nation
WHERE c_custkey = o_custkey
  AND l_orderkey = o_orderkey
  AND o_orderdate >= '1993-10-01'
  AND o_orderdate < '1994-01-01'
  AND l_returnflag = 'R'
  AND c_nationkey = n_nationkey
GROUP BY c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment
ORDER BY revenue DESC, c_custkey
LIMIT 20;
Inspecting the query plan
When Spark is not enabled, PGAA defaults to the Seafowl engine for local vectorized execution. Prepend EXPLAIN to your statement to capture the query plan produced by each engine.
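For example, the following sketch captures both plans in one session by toggling the engine setting; the abbreviated query here merely stands in for the full report query above:

```sql
-- Plan from the local Seafowl engine (the default when Spark is not enabled)
RESET pgaa.executor_engine;
EXPLAIN
SELECT c_custkey, SUM(l_extendedprice * (1 - l_discount)) AS revenue
FROM customer, orders, lineitem, nation
WHERE c_custkey = o_custkey AND l_orderkey = o_orderkey AND c_nationkey = n_nationkey
GROUP BY c_custkey ORDER BY revenue DESC LIMIT 20;

-- Plan offloaded to Spark via Spark Connect
SET pgaa.executor_engine = 'spark_connect';
EXPLAIN
SELECT c_custkey, SUM(l_extendedprice * (1 - l_discount)) AS revenue
FROM customer, orders, lineitem, nation
WHERE c_custkey = o_custkey AND l_orderkey = o_orderkey AND c_nationkey = n_nationkey
GROUP BY c_custkey ORDER BY revenue DESC LIMIT 20;
```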
Comparing these two query plans highlights the transition from local, single-node execution to a remote, distributed processing model via Spark Connect:
- Join strategy: Seafowl uses a "nested-loop" approach (Cross Join + Filter), which forces the database to compare rows sequentially. Spark upgrades this to a BroadcastHashJoin, where small tables are sent to every worker node to process the large fact table in parallel.
- Aggregation logic: Instead of a single-node sequential aggregate, Spark uses a two-stage HashAggregate. It calculates a partial_sum on each individual worker node and then performs an Exchange (shuffle) to combine the results, providing horizontal scalability for large-scale data.
- Data access: Seafowl performs a standard TableScan on all available data. Spark uses a metadata-aware BatchScan via the Iceberg REST Catalog, which reads Iceberg manifest files to skip irrelevant data blocks entirely before they are even pulled from S3.
The Spark plan is a sophisticated, distributed blueprint designed to scale horizontally across a cluster for massive datasets. Conversely, the Seafowl plan is a high-speed, localized blueprint optimized for smaller datasets where the overhead of network latency and cluster coordination would outweigh the actual processing time.