EDB Blog

Hadoop to Postgres – Bridging the Gap

Author: Ahsan Hadi
02/20/2015

This blog was co-written by Ibrar Ahmed.

Advances in Postgres in recent releases have opened up opportunities for expanding features that support data integration. The most compelling of these features are Foreign Data Wrappers (FDWs). FDWs essentially act as a pipeline for moving data between Postgres and other kinds of databases, as if the different solutions were a single database. EnterpriseDB has developed new FDWs for MongoDB and MySQL. Now, we have released one for Hadoop.

The FDW for Hadoop can be found on EDB’s GitHub page.

Foreign Data Wrappers enable Postgres to act as the central hub, a federated database, in the enterprise. And the features have emerged at a good time as integration becomes more of a challenge. Database administrators (DBAs) have begun using more and more specialized NoSQL-only solutions to address specific problems. Now DBAs need to address getting the data in all of these different solutions to tell one single story, or lend value to one another.

Since EDB released the new FDWs with both read and write capability to the open source PostgreSQL community, usage has grown tremendously. More and more developers are contributing features and fixes.

Now that we’ve established some context for the FDWs, let’s explore the Hadoop FDW in more detail. There are a few popular terms worth exploring first:

Hadoop Distributed File System (HDFS) – HDFS is a distributed file system for large data storage and processing. It’s Java-based and provides scalable and reliable storage that is designed to span large clusters of commodity servers. The data is stored in flat files with no imposed format. It’s used for very large data sets, and a typical HDFS file is gigabytes to terabytes in size. Applications that run on HDFS require streaming access to their data sets.

MapReduce - MapReduce is a programming model and associated implementation for processing and generating large data sets. The model was inspired by the map and reduce functions commonly used in functional programming. MapReduce jobs are written in Java and are used for performing statistical analysis, aggregates or other complex processing on large data sets stored in HDFS.

Hive server - Apache Hive is data warehouse software that facilitates querying and managing large data sets residing in distributed storage, i.e. HDFS. Hive defines a simple query language called QL, which is used for querying and manipulating large data sets stored in HDFS. QL is similar to SQL and provides similar constructs for retrieving data. Hive server is an optional service that allows remote clients to send requests to Hive using various programming languages and retrieve results.

Foreign Data Wrapper (FDW) – While we have introduced FDWs already, it’s good to know they are based on the Postgres implementation of SQL/MED (SQL Management of External Data), which is part of the SQL standard. It is a standard way of accessing external data stores ranging from SQL and NoSQL-only databases to flat files. FDWs provide a SQL interface for accessing remote objects and large data objects stored in remote data stores. The FDWs supported by EnterpriseDB are postgres_fdw, oracle_fdw, mongo_fdw and mysql_fdw, and now we’re adding hdfs_fdw to the list.

Using the HDFS FDW

The following components need to be installed in order to use the HDFS FDW:

* PostgreSQL or EDB’s Postgres Plus Advanced Server

* Hadoop

* Hive server 1 or Hive server 2

* The HDFS FDW extension

(The HDFS FDW GitHub page provides clear instructions on how to set up the HDFS FDW and its required components.)

[Diagram: query flow from the HDFS FDW through Hive server to HDFS]

This is the first version and it supports the following functionality:

1. Support for hive server 1 and hive server 2

Queries are routed to HDFS through Hive server 1 or Hive server 2. (You can run one at a time but not both.) As shown in the above diagram, the HDFS FDW sends queries to Hive server, and Hive server communicates with HDFS, returning the results back to the client. Hive supports a number of built-in MapReduce jobs that are executed for faster processing of data. Hive server calls these jobs automatically for aggregates, mathematical operations and other complex operations. Hive server 2 can be seen as a superset of Hive server 1, with additional features such as concurrency and security.

As explained above, data is stored in flat files in HDFS. Using the Hive client, the user needs to create a table that maps to the flat files in HDFS. In the table definition created in Hive, the user specifies the data delimiter. The table can then be queried in the Hive client using QL, which is similar to SQL. The working example below shows how this is done.
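To make the mapping from flat file to table a little more concrete, here is a minimal sketch in plain Python. It is not Hive code and does not use any Hive API; the column list, the tab delimiter and the sample values are all assumptions made up for illustration. It only shows the idea that a table definition is essentially a list of column names plus a delimiter applied to each line of the file.

```python
import csv
import io

# Hypothetical column list and delimiter, standing in for a Hive table definition.
COLUMNS = ["client_ip", "year", "http_status_code"]
DELIMITER = "\t"


def read_rows(flat_file, columns=COLUMNS, delimiter=DELIMITER):
    """Yield one dict per line, pairing each delimited field with a column name."""
    reader = csv.reader(flat_file, delimiter=delimiter)
    for fields in reader:
        yield dict(zip(columns, fields))


# A two-line sample "flat file"; the values are invented for this example.
sample = io.StringIO("10.0.0.1\t2011\t200\n10.0.0.2\t2012\t404\n")
rows = list(read_rows(sample))
print(rows[0]["client_ip"], rows[1]["http_status_code"])
```

The file itself carries no schema; the column names exist only in the table definition, which is why the delimiter has to be declared when the table is created.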

2. Select functionality

The first version of the HDFS FDW is read-only. It provides complete support for SELECT over foreign tables, which map to the corresponding table in Hive, which in turn maps to the corresponding flat file in HDFS. Query clauses that can’t be pushed to the remote server (joins, functions in the target list, etc.) are processed locally in PostgreSQL or Postgres Plus Advanced Server.

3. Map Reduce Jobs

Hive server executes built-in MapReduce jobs for aggregates or other complex operations used in the WHERE clause of the SELECT query. MapReduce jobs are used for querying and manipulating large data sets in HDFS. In future releases of the HDFS FDW, we will allow the user to write custom MapReduce jobs for manipulating and processing data stored in HDFS. The user will be able to trigger a custom MapReduce job by using a heavy processing function in the WHERE clause, which will kick off the corresponding job.
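For readers new to the model, here is a rough sketch in plain Python of how a count-per-client_ip aggregate breaks down into map, shuffle and reduce steps. It runs in a single process and is not Hadoop's or Hive's implementation; the function names and the sample records are assumptions chosen for illustration.

```python
from collections import defaultdict


def map_phase(records):
    """Map: emit a (key, 1) pair for every input record."""
    for record in records:
        yield record["client_ip"], 1


def shuffle(pairs):
    """Shuffle: group the emitted values by key between the two phases."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_phase(grouped):
    """Reduce: collapse each key's list of values into a single count."""
    return {key: sum(values) for key, values in grouped.items()}


# Invented sample records; a real job would read them from files in HDFS.
records = [
    {"client_ip": "10.0.0.1"},
    {"client_ip": "10.0.0.2"},
    {"client_ip": "10.0.0.1"},
]
counts = reduce_phase(shuffle(map_phase(records)))
print(counts)
```

In a real cluster the map and reduce steps run in parallel across many nodes, which is where the speed-up on large data sets comes from.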

4. Push-down

Push-down means sending the appropriate qualifiers (quals) of the query to the remote server for faster processing. Currently the Postgres FDW machinery only allows the WHERE clause and target columns to be pushed down to the remote server. The HDFS FDW can likewise push down the WHERE clause and the target column list to the foreign server.
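The idea behind push-down can be sketched in a few lines of Python. This is not how the HDFS FDW is implemented; it is a simplified illustration in which each WHERE condition carries a hand-set flag saying whether it may be sent to the remote server. The function name build_remote_sql, the flag and my_local_function are all hypothetical.

```python
def build_remote_sql(table, target_columns, quals):
    """Build the remote query from the target columns and the pushable quals.

    Each qual is a (sql_text, pushable) pair. The pushable flag is a stand-in
    for whatever analysis a real FDW performs on the query.
    Returns the remote SQL plus the quals that must be evaluated locally.
    """
    remote_quals = [sql for sql, pushable in quals if pushable]
    local_quals = [sql for sql, pushable in quals if not pushable]
    sql = "SELECT {} FROM {}".format(", ".join(target_columns), table)
    if remote_quals:
        sql += " WHERE " + " AND ".join("({})".format(q) for q in remote_quals)
    return sql, local_quals


remote_sql, local_quals = build_remote_sql(
    "weblogs",
    ["client_ip"],
    # my_local_function is a hypothetical function that exists only locally.
    [("year = '2011'", True), ("my_local_function(uri)", False)],
)
print(remote_sql)
print(local_quals)
```

Only the requested column and the pushable condition travel to the remote side, so less data comes back over the wire; anything left in local_quals is applied by Postgres after the rows arrive.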

HDFS_FDW in Action

Here is how it works. For more detailed instructions, please visit the HDFS_FDW GitHub page.

-- load extension first time after install
CREATE EXTENSION hdfs_fdw;

-- create server object
CREATE SERVER hdfs_server
    FOREIGN DATA WRAPPER hdfs_fdw
    OPTIONS (host '127.0.0.1');

-- create user mapping
CREATE USER MAPPING FOR postgres
    SERVER hdfs_server;

-- create foreign table
CREATE FOREIGN TABLE weblogs
(
    client_ip          text,
    full_request_date  text,
    day                text,
    month              text,
    month_num          int,
    year               text,
    hour               text,
    minute             text,
    second             text,
    timezone           text,
    http_verb          text,
    uri                text,
    http_status_code   text,
    bytes_returned     text,
    referrer           text,
    user_agent         text
)
SERVER hdfs_server
OPTIONS (dbname 'db', table_name 'weblogs');

-- select from table
postgres=# select distinct client_ip IP,count(*) from weblogs group by IP having count(*) > 5000;
       ip        | count
-----------------+-------
 683.615.622.618 | 13505
 14.323.74.653   | 16194
 13.53.52.13     |  5494
 361.631.17.30   | 64979
 363.652.18.65   | 10561
 325.87.75.36    |  6498
 322.6.648.325   | 13242
 325.87.75.336   |  6500
(8 rows)

create table premium_ip
(
    client_ip text, category text
);

insert into premium_ip values ('683.615.622.618','Category A');
insert into premium_ip values ('14.323.74.653','Category A');
insert into premium_ip values ('13.53.52.13','Category A');
insert into premium_ip values ('361.631.17.30','Category A');
insert into premium_ip values ('361.631.17.30','Category A');
insert into premium_ip values ('325.87.75.336','Category B');

postgres=# select hd.client_ip IP,pr.category,count(hd.client_ip)
from weblogs hd, premium_ip pr
where hd.client_ip = pr.client_ip
and hd.year = '2011'
group by hd.client_ip,pr.category;
       ip        |  category  | count
-----------------+------------+-------
 14.323.74.653   | Category A |  9459
 361.631.17.30   | Category A | 76386
 683.615.622.618 | Category A | 11463
 13.53.52.13     | Category A |  3288
 325.87.75.336   | Category B |  3816
(5 rows)

postgres=# explain verbose select hd.client_ip IP,pr.category,count(hd.client_ip)
from weblogs hd, premium_ip pr
where hd.client_ip = pr.client_ip
and hd.year = '2011'
group by hd.client_ip,pr.category;
                                          QUERY PLAN
----------------------------------------------------------------------------------------------
 HashAggregate  (cost=221.40..264.40 rows=4300 width=64)
   Output: hd.client_ip, pr.category, count(hd.client_ip)
   Group Key: hd.client_ip, pr.category
   ->  Merge Join  (cost=120.35..189.15 rows=4300 width=64)
         Output: hd.client_ip, pr.category
         Merge Cond: (pr.client_ip = hd.client_ip)
         ->  Sort  (cost=60.52..62.67 rows=860 width=64)
               Output: pr.category, pr.client_ip
               Sort Key: pr.client_ip
               ->  Seq Scan on public.premium_ip pr  (cost=0.00..18.60 rows=860 width=64)
                     Output: pr.category, pr.client_ip
         ->  Sort  (cost=59.83..62.33 rows=1000 width=32)
               Output: hd.client_ip
               Sort Key: hd.client_ip
               ->  Foreign Scan on public.weblogs hd  (cost=100.00..10.00 rows=1000 width=32)
                     Output: hd.client_ip
                     Remote SQL: SELECT client_ip FROM weblogs WHERE ((year = '2011'))
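The plan shows the division of labor: only the Remote SQL runs on the Hive side, while the join and the aggregate run locally in Postgres. As a rough illustration of that local half, here is a small Python sketch. It uses a dictionary lookup rather than the sort-and-merge strategy the planner chose, and the function name and sample values are assumptions made up for this example.

```python
from collections import Counter


def local_join_and_count(foreign_rows, premium_rows):
    """Join remote client_ip values to local categories, then count per group."""
    # Build a lookup from client_ip to every category row it has locally.
    categories = {}
    for client_ip, category in premium_rows:
        categories.setdefault(client_ip, []).append(category)

    # Inner join: each remote row is counted once per matching local row.
    counts = Counter()
    for client_ip in foreign_rows:
        for category in categories.get(client_ip, []):
            counts[(client_ip, category)] += 1
    return dict(counts)


# Stand-in for the rows returned by the Remote SQL (already filtered by year).
foreign_rows = ["10.0.0.1", "10.0.0.2", "10.0.0.1", "10.0.0.3"]
# Stand-in for the local premium_ip table.
premium_rows = [("10.0.0.1", "Category A"), ("10.0.0.2", "Category B")]
result = local_join_and_count(foreign_rows, premium_rows)
print(result)
```

Because this is an inner join, a client_ip that appears twice in the local table contributes each matching remote row twice to the count.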

Future Enhancements

This is the first release of HDFS_FDW, and it is read-only. We are already planning future releases with the following functionality:

  • Support for writeability via HBase
  • Bulk load of data
  • More push-down features (joins, aggregates, sort, etc.)
  • Custom MapReduce jobs
  • Support for Flume/Impala server
  • Authentication support

So stay tuned for more to come from EDB!

Please visit our site for more information or contact us with any questions.

Ahsan Hadi is Senior Director, Product Development at EnterpriseDB.

Ibrar Ahmed is a Database Architect at EnterpriseDB.
