Menggunakan PySpark untuk melakukan Hive CTAS via JDBC
Background
Dalam sebuah use case, saya mendapati ada beberapa table di Hive yang harus diproses menggunakan PySpark (Hive dan PySpark di sini berada dalam satu cluster yang sama, sehingga harusnya performance bukan merupakan issue) ternyata merupakan view. Penggunaan view ini menjadi masalah karena saya tidak memiliki akses read pada table aslinya. Masalah ini muncul dikarenakan PySpark ternyata tidak mengakses view yang ada di Hive Metastore, melainkan langsung mengakses files pada source table, padahal saya tidak mempunyai akses ke table tersebut. Umumnya, untuk mengatasi masalah seperti ini, yang akan dilakukan adalah menggunakan koneksi JDBC ke HiveServer2 atau melalui Impala JDBC, sehingga PySpark akan mengakses view melalui koneksi JDBC tersebut.
Masalah lain muncul, yaitu dikarenakan satu dan lain hal, performa JDBC melalui HiveServer2 terasa kurang baik. Nah, tulisan berikut adalah sebuah workaround untuk mengatasi drop performance tersebut.
High-level flow
Let’s revisit how we access Hive tables through JDBC. Umumnya, ketika menggunakan koneksi Hive JDBC pada Spark/PySpark, kita menggunakan method berikut:
df = (spark
.read
.format("jdbc")
.option(...)
...
.load())
Dengan menggunakan method tersebut, Spark akan melempar query ke HiveServer2 dan kemudian fetch data melalui JDBC connection tersebut. Metode ini sangat tidak efisien untuk digunakan pada table berukuran jumbo. Apalagi, menggunakan koneksi JDBC untuk mengakses table Hive yang berada pada satu cluster sebenarnya tidak direkomendasikan (JDBC connection harusnya dipakai untuk querying data antar cluster).
Seorang rekan kemudian menyarankan untuk melakukan CTAS (create table as select) menggunakan koneksi JDBC untuk dumping data dari view menjadi materialized temporary table. Permasalah lain muncul: PySpark tidak punya native method untuk membuat cursor dan execute query ke JDBC (tentu saja masalah ini tidak akan muncul jika kita menggunakan Java atau Scala).
Nah, solusinya adalah dengan menggunakan JVM dari driver Spark job yang kita jalankan. Seperti yang kita tahu, meskipun API-nya menggunakan Python, PySpark tetap berjalan di atas JVM. Maka, kita dapat invoke Java methods melalui JVM dari driver Spark job yang berjalan tersebut.
Implementation
PySpark menggunakan module Py4J untuk berkomunikasi dengan JVM. Untuk mengakses JVM pada PySpark, kita
akan menggunakan gateway yang ada pada SparkContext dengan cara berikut (asumsi variabel spark
adalah
SparkSession yang sedang berjalan):
jvm = spark.sparkContext._gateway.jvm
Kemudian, untuk melakukan query pada JVM, kita harus menggunakan DriverManager
untuk membuat koneksi ke JDBC server,
membuat statement
atau preparedStatement
, dan kemudian melakukan execute
. Menggunakan Py4J, kita dapat
melakukan import DriverManager
menggunakan function java_import
seperti berikut:
from py4j.java_gateway import java_import
java_import(jvm, "java.sql.Drivermanager")
Kemudian selanjutnya untuk membuat connection dan execute query:
JDBC_URL = "jdbc:hive2://<jdbc_server>:<port>/<schema>"
VIEW_NAME = "<view_name>"
TEMP_TABLE = "<temp_table_name>"
con = jvm.DriverManager.getConnection(JDBC_URL)
stmt = con.createStatement()
query = """
CREATE TABLE {temp} AS
SELECT *
FROM {view}
""".format(
temp=TEMP_TABLE,
view=VIEW_NAME
)
stmt.executeUpdate(query)
Done! Materialized temporary table sudah dapat diakses secara normal menggunakan PySpark:
df = spark.table(TEMP_TABLE)
Caveats and potential issues
-
Jika jar JDBC driver berada di HDFS, maka
sparkContext
tidak akan secara otomatis menambah jar tersebut pada classpath. Workaround untuk issue ini adalah dengan melakukan query ringan terlebih dahulu melalui:_df = (spark .read .format("jdbc") .option("dbtable", "(SELECT * FROM <small_table> LIMIT 1) example") ... .load())
-
Untuk kerberised cluster,
JDBC_URL
harus disesuaikan denganAuthMethod
yang tersedia pada HiveServer2. Cara untuk membuat koneksi ke kerberised Hive cluster akan dijelaskan pada kesempatan lainnya.
Note: This article was previously posted under https://ramottamado.dev/cara-menggunakan-jvm-pada-pyspark-untuk-melakukan-query-via-jdbc/