Using Apache Hive or Apache Spark v2

You can use the Hadoop Foreign Data Wrapper with either Apache Hive or Apache Spark. Both Hive and Spark store metadata in the configured metastore, where you create databases and tables using HiveQL.
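
For example, the following HiveQL statements, run from Beeline against either HiveServer2 or Spark Thrift Server, create objects whose definitions are recorded in the metastore. This is a minimal sketch; reporting_db and events are hypothetical names, not part of the examples that follow:

    CREATE DATABASE IF NOT EXISTS reporting_db;
    USE reporting_db;
    CREATE TABLE events (id INT, payload STRING);
    -- the table definition is now recorded in the metastore; SHOW TABLES reads it back
    SHOW TABLES;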

Using HDFS FDW with Apache Hive on top of Hadoop

Apache Hive data warehouse software helps with querying and managing large datasets residing in distributed storage. Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL. At the same time, this language allows traditional map/reduce programmers to plug in their custom mappers and reducers when it's inconvenient or inefficient to express this logic in HiveQL.
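
For example, HiveQL's TRANSFORM ... USING clause lets you stream rows through an external mapper script when plain HiveQL isn't enough. The sketch below is illustrative only; my_mapper.py and the src table are hypothetical placeholders:

    -- make the custom mapper script available to Hive
    ADD FILE /tmp/my_mapper.py;

    -- stream each input row through the script and read back its tab-separated output
    SELECT TRANSFORM (client_ip, uri)
        USING 'python my_mapper.py'
        AS (client_ip STRING, uri_category STRING)
    FROM src;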

Two versions of the Hive server, HiveServer1 and HiveServer2, are available for download from the Apache Hive website.

Note

The Hadoop Foreign Data Wrapper supports only HiveServer2.

To use HDFS FDW with Apache Hive on top of Hadoop:

  1. Download the weblogs_parse dataset and follow the instructions on the Pentaho wiki website.

  2. Upload the weblogs_parse.txt file using these commands:

    hadoop fs -mkdir /weblogs
    hadoop fs -mkdir /weblogs/parse
    hadoop fs -put weblogs_parse.txt /weblogs/parse/part-00000
  3. Start HiveServer2, if it isn't already running, using the following command:

    $HIVE_HOME/bin/hiveserver2

    or

    $HIVE_HOME/bin/hive --service hiveserver2
  4. Connect to HiveServer2 using the hive beeline client. For example:

    $ beeline
    Beeline version 1.0.1 by Apache Hive
    beeline> !connect jdbc:hive2://localhost:10000/default;auth=noSasl
  5. Create a table in Hive. The example creates a table named weblogs:

    CREATE TABLE weblogs (
        client_ip           STRING,
        full_request_date   STRING,
        day                 STRING,
        month               STRING,
        month_num           INT,
        year                STRING,
        hour                STRING,
        minute              STRING,
        second              STRING,
        timezone            STRING,
        http_verb           STRING,
        uri                 STRING,
        http_status_code    STRING,
        bytes_returned      STRING,
        referrer            STRING,
        user_agent          STRING)
    row format delimited
    fields terminated by '\t';
  6. Load data into the table:

    hadoop fs -cp /weblogs/parse/part-00000 /user/hive/warehouse/weblogs/
  7. Access your data from Postgres. You can now use the weblogs table. Once you're connected using psql, follow these steps:

    -- set the GUC variables appropriately, for example:
    hdfs_fdw.jvmpath='/home/edb/Projects/hadoop_fdw/jdk1.8.0_111/jre/lib/amd64/server/'
    hdfs_fdw.classpath='/usr/local/edbas/lib/postgresql/HiveJdbcClient-1.0.jar:/home/edb/Projects/hadoop_fdw/hadoop/share/hadoop/common/hadoop-common-2.6.4.jar:/home/edb/Projects/hadoop_fdw/apache-hive-1.0.1-bin/lib/hive-jdbc-1.0.1-standalone.jar'
    
    -- load extension first time after install
    CREATE EXTENSION hdfs_fdw;
    
    -- create server object
    CREATE SERVER hdfs_server
         FOREIGN DATA WRAPPER hdfs_fdw
         OPTIONS (host '127.0.0.1');
    
    -- create user mapping
    CREATE USER MAPPING FOR postgres
    SERVER hdfs_server OPTIONS (username 'hive_username', password 'hive_password');
    
    -- create foreign table
    CREATE FOREIGN TABLE weblogs
    (
     client_ip                TEXT,
     full_request_date        TEXT,
     day                      TEXT,
     month                    TEXT,
     month_num                INTEGER,
     year                     TEXT,
     hour                     TEXT,
     minute                   TEXT,
     second                   TEXT,
     timezone                 TEXT,
     http_verb                TEXT,
     uri                      TEXT,
     http_status_code         TEXT,
     bytes_returned           TEXT,
     referrer                 TEXT,
     user_agent               TEXT
    )
    SERVER hdfs_server
         OPTIONS (dbname 'default', table_name 'weblogs');
    
    -- select from table
    postgres=# SELECT DISTINCT client_ip IP, count(*)
           FROM weblogs GROUP BY IP HAVING count(*) > 5000 ORDER BY 1;
           ip        | count
    -----------------+-------
     13.53.52.13     |  5494
     14.323.74.653   | 16194
     322.6.648.325   | 13242
     325.87.75.336   |  6500
     325.87.75.36    |  6498
     361.631.17.30   | 64979
     363.652.18.65   | 10561
     683.615.622.618 | 13505
    (8 rows)
    
    -- EXPLAIN output showing WHERE clause being pushed down to remote server.
    EXPLAIN (VERBOSE, COSTS OFF) SELECT client_ip, full_request_date, uri FROM weblogs WHERE http_status_code = 200;
                                                   QUERY PLAN
    ----------------------------------------------------------------------------------------------------------------
     Foreign Scan on public.weblogs
       Output: client_ip, full_request_date, uri
       Remote SQL: SELECT client_ip, full_request_date, uri FROM default.weblogs WHERE ((http_status_code = '200'))
    (3 rows)
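
    The foreign table behaves like any other Postgres table, so you can also join it with local tables. The following sketch assumes a hypothetical local lookup table named status_codes; it isn't part of the sample data:

    -- hypothetical local lookup table joined against the foreign table
    CREATE TABLE status_codes (code TEXT PRIMARY KEY, description TEXT);
    INSERT INTO status_codes VALUES ('200', 'OK'), ('404', 'Not Found');

    SELECT s.description, count(*)
      FROM weblogs w
      JOIN status_codes s ON s.code = w.http_status_code
     GROUP BY s.description;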

Using HDFS FDW with Apache Spark on top of Hadoop

Apache Spark is a general-purpose distributed computing framework that supports a wide variety of use cases. It provides real-time streaming as well as batch processing with speed, ease of use, and sophisticated analytics. Spark doesn't provide a storage layer, relying instead on third-party storage providers such as Hadoop, HBase, Cassandra, and S3. Spark integrates seamlessly with Hadoop and can process existing data. Spark SQL is 100% compatible with HiveQL, and you can use Spark Thrift Server as a replacement for HiveServer2.

To use HDFS FDW with Apache Spark on top of Hadoop:

  1. Download and install Apache Spark in local mode.

  2. In the folder $SPARK_HOME/conf, create a file spark-defaults.conf containing the following line:

    spark.sql.warehouse.dir hdfs://localhost:9000/user/hive/warehouse

    By default, Spark uses Derby for both the metadata and the data itself (called the warehouse in Spark). To have Spark use Hadoop (HDFS) as the warehouse, add this property.

  3. Start Spark Thrift Server.

    ./start-thriftserver.sh
  4. Make sure Spark Thrift Server is running and writing to a log file.

  5. Create a local file (names.txt) that contains the following entries:

    $ cat /tmp/names.txt
    1,abcd
    2,pqrs
    3,wxyz
    4,a_b_c
    5,p_q_r
    ,
  6. Connect to Spark Thrift Server using the Spark beeline client. For example:

    $ beeline
    Beeline version 1.2.1.spark2 by Apache Hive
    beeline> !connect jdbc:hive2://localhost:10000/default;auth=noSasl org.apache.hive.jdbc.HiveDriver
  7. Prepare the sample data on Spark. Run the following commands in the beeline command line tool:

    ./beeline
    Beeline version 1.2.1.spark2 by Apache Hive
    beeline> !connect jdbc:hive2://localhost:10000/default;auth=noSasl org.apache.hive.jdbc.HiveDriver
    Connecting to jdbc:hive2://localhost:10000/default;auth=noSasl
    Enter password for jdbc:hive2://localhost:10000/default;auth=noSasl:
    Connected to: Spark SQL (version 2.1.1)
    Driver: Hive JDBC (version 1.2.1.spark2)
    Transaction isolation: TRANSACTION_REPEATABLE_READ
    0: jdbc:hive2://localhost:10000> create database my_test_db;
    +---------+--+
    | Result  |
    +---------+--+
    +---------+--+
    No rows selected (0.379 seconds)
    0: jdbc:hive2://localhost:10000> use my_test_db;
    +---------+--+
    | Result  |
    +---------+--+
    +---------+--+
    No rows selected (0.03 seconds)
    0: jdbc:hive2://localhost:10000> create table my_names_tab(a int, name string)
                                     row format delimited fields terminated by ',';
    +---------+--+
    | Result  |
    +---------+--+
    +---------+--+
    No rows selected (0.11 seconds)
    0: jdbc:hive2://localhost:10000>
    
    0: jdbc:hive2://localhost:10000> load data local inpath '/tmp/names.txt'
                                     into table my_names_tab;
    +---------+--+
    | Result  |
    +---------+--+
    +---------+--+
    No rows selected (0.33 seconds)
    0: jdbc:hive2://localhost:10000> select * from my_names_tab;
    +-------+---------+--+
    |   a   |  name   |
    +-------+---------+--+
    | 1     | abcd    |
    | 2     | pqrs    |
    | 3     | wxyz    |
    | 4     | a_b_c   |
    | 5     | p_q_r   |
    | NULL  | NULL    |
    +-------+---------+--+

    The following commands list the corresponding files in Hadoop:

    $ hadoop fs -ls /user/hive/warehouse/
    Found 1 items
    drwxrwxrwx   - org.apache.hive.jdbc.HiveDriver supergroup 0 2020-06-12 17:03 /user/hive/warehouse/my_test_db.db
    
    $ hadoop fs -ls /user/hive/warehouse/my_test_db.db/
    Found 1 items
    drwxrwxrwx   - org.apache.hive.jdbc.HiveDriver supergroup 0 2020-06-12 17:03 /user/hive/warehouse/my_test_db.db/my_names_tab
  8. Access your data from Postgres using psql:

    -- set the GUC variables appropriately, for example:
    hdfs_fdw.jvmpath='/home/edb/Projects/hadoop_fdw/jdk1.8.0_111/jre/lib/amd64/server/'
    hdfs_fdw.classpath='/usr/local/edbas/lib/postgresql/HiveJdbcClient-1.0.jar:/home/edb/Projects/hadoop_fdw/hadoop/share/hadoop/common/hadoop-common-2.6.4.jar:/home/edb/Projects/hadoop_fdw/apache-hive-1.0.1-bin/lib/hive-jdbc-1.0.1-standalone.jar'
    
    -- load extension first time after install
    CREATE EXTENSION hdfs_fdw;
    
    -- create server object
    CREATE SERVER hdfs_server
       FOREIGN DATA WRAPPER hdfs_fdw
       OPTIONS (host '127.0.0.1', port '10000', client_type 'spark', auth_type 'NOSASL');
    
    -- create user mapping
    CREATE USER MAPPING FOR postgres
      SERVER hdfs_server OPTIONS (username 'spark_username', password 'spark_password');
    
    -- create foreign table
    CREATE FOREIGN TABLE f_names_tab( a int, name varchar(255)) SERVER hdfs_server
      OPTIONS (dbname 'my_test_db', table_name 'my_names_tab');
    
    -- select the data from foreign server
    select * from f_names_tab;
     a |  name
    ---+--------
     1 | abcd
     2 | pqrs
     3 | wxyz
     4 | a_b_c
     5 | p_q_r
     0 |
    (6 rows)
    
    -- EXPLAIN output showing WHERE clause being pushed down to remote server.
    EXPLAIN (verbose, costs off) SELECT name FROM f_names_tab WHERE a > 3;
                                  QUERY PLAN
    --------------------------------------------------------------------------
     Foreign Scan on public.f_names_tab
       Output: name
       Remote SQL: SELECT name FROM my_test_db.my_names_tab WHERE ((a > '3'))
    (3 rows)

Note

This example uses the same port when creating the foreign server because Spark Thrift Server is compatible with Hive Thrift Server. Applications that work with HiveServer2 also work with Spark, except for the behavior of the ANALYZE command and the connection string when using NOSASL. If you replace Hive with Spark, we recommend using ALTER SERVER to change the client_type option.
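
A minimal sketch of that change, assuming the hdfs_server definition from the Hive example earlier in this topic:

    -- switch an existing Hive server definition to Spark;
    -- use ADD for options not set when the server was created, and SET for options being changed
    ALTER SERVER hdfs_server
        OPTIONS (ADD client_type 'spark', ADD auth_type 'NOSASL');

Foreign tables and user mappings already defined against the server reference it by name, so they don't need to be re-created after the change.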