Apache Hive logo
Preview / Apache Hive logo

Background

Dalam sebuah use case, saya mendapati ada beberapa table di Hive yang harus diproses menggunakan PySpark (Hive dan PySpark di sini berada dalam satu cluster yang sama, sehingga harusnya performance bukan merupakan issue) ternyata merupakan view. Penggunaan view ini menjadi masalah karena saya tidak memiliki akses read pada table aslinya. Masalah ini muncul dikarenakan PySpark ternyata tidak mengakses view yang ada di Hive Metastore, melainkan langsung mengakses files pada source table, padahal saya tidak mempunyai akses ke table tersebut. Umumnya, untuk mengatasi masalah seperti ini, yang akan dilakukan adalah menggunakan koneksi JDBC ke HiveServer2 atau melalui Impala JDBC, sehingga PySpark akan mengakses view melalui koneksi JDBC tersebut.

Masalah lain muncul, yaitu dikarenakan satu dan lain hal, performa JDBC melalui HiveServer2 terasa kurang baik. Nah, tulisan berikut adalah sebuah workaround untuk mengatasi drop performance tersebut.

High-level Flow

Let’s revisit how we access Hive tables through JDBC. Umumnya, ketika menggunakan koneksi Hive JDBC pada Spark/PySpark, kita menggunakan method berikut:

df = (spark
    .read
    .format("jdbc")
    .option(...)
    ...
    .load())

Dengan menggunakan method tersebut, Spark akan melempar query ke HiveServer2 dan kemudian fetch data melalui JDBC connection tersebut. Metode ini sangat tidak efisien untuk digunakan pada table berukuran jumbo. Apalagi, menggunakan koneksi JDBC untuk mengakses table Hive yang berada pada satu cluster sebenarnya tidak direkomendasikan (JDBC connection harusnya dipakai untuk querying data antar cluster).

Seorang rekan kemudian menyarankan untuk melakukan CTAS (create table as select) menggunakan koneksi JDBC untuk dumping data dari view menjadi materialized temporary table. Permasalah lain muncul: PySpark tidak punya native method untuk membuat cursor dan execute query ke JDBC (tentu saja masalah ini tidak akan muncul jika kita menggunakan Java atau Scala).

Nah, solusinya adalah dengan menggunakan JVM dari driver Spark job yang kita jalankan. Seperti yang kita tahu, meskipun API-nya menggunakan Python, PySpark tetap berjalan di atas JVM. Maka, kita dapat invoke Java methods melalui JVM dari driver Spark job yang berjalan tersebut.

Implementation

PySpark menggunakan module Py4J untuk berkomunikasi dengan JVM. Untuk mengakses JVM pada PySpark, kita akan menggunakan gateway yang ada pada SparkContext dengan cara berikut (asumsi variabel spark adalah SparkSession yang sedang berjalan):

jvm = spark.sparkContext._gateway.jvm

Kemudian, untuk melakukan query pada JVM, kita harus menggunakan DriverManager untuk membuat koneksi ke JDBC server, membuat statement atau preparedStatement, dan kemudian melakukan execute. Menggunakan Py4J, kita dapat melakukan import DriverManager menggunakan function java_import seperti berikut:

from py4j.java_gateway import java_import
java_import(jvm, "java.sql.Drivermanager")

Kemudian selanjutnya untuk membuat connection dan execute query:

JDBC_URL = "jdbc:hive2://<jdbc_server>:<port>/<schema>"
VIEW_NAME = "<view_name>"
TEMP_TABLE = "<temp_table_name>"

con = jvm.DriverManager.getConnection(JDBC_URL)
stmt = con.createStatement()

query = """
    CREATE TABLE {temp} AS
        SELECT *
        FROM {view}
""".format(
    temp=TEMP_TABLE,
    view=VIEW_NAME
)

stmt.executeUpdate(query)

Done! Materialized temporary table sudah dapat diakses secara normal menggunakan PySpark:

df = spark.table(TEMP_TABLE)

Caveats and Potential Issues

  1. Jika jar JDBC driver berada di HDFS, maka sparkContext tidak akan secara otomatis menambah jar tersebut pada classpath. Workaround untuk issue ini adalah dengan melakukan query ringan terlebih dahulu melalui:

    _df = (spark
        .read
        .format("jdbc")
        .option("dbtable", "(SELECT * FROM <small_table> LIMIT 1) example")
        ...
        .load())
    
  2. Untuk kerberised cluster, JDBC_URL harus disesuaikan dengan AuthMethod yang tersedia pada HiveServer2. Cara untuk membuat koneksi ke kerberised Hive cluster akan dijelaskan pada kesempatan lainnya.


Note: This article was previously posted under https://ramottamado.dev/cara-menggunakan-jvm-pada-pyspark-untuk-melakukan-query-via-jdbc/