
Are you frustrated by excessive logging in PySpark? Do you find those logs unhelpful? Do you wish to log things your own way when something goes wrong in your data pipelines? Then hopefully this article will help you write your own logs in PySpark.

Disabling Excessive Logs

As PySpark runs the actual work inside a JVM, the logs are Log4j logs. This means we can set the log level programmatically in our app. To do this, we need to access the underlying JVM gateway and look up the Log4j package. The code looks like this:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("My Data Pipeline")
    .getOrCreate())

# Reach the Log4j package through the Py4J JVM gateway (spark._jvm is a private API)
log4j = spark._jvm.org.apache.log4j  # noqa

To disable Log4j logs for all classes under the org prefix (like org.apache.spark), we need to add this line of code:

log4j.LogManager.getLogger("org").setLevel(log4j.Level.OFF)

This will tell the Log4j log manager to set the logging level of classes with the org prefix to OFF. If you are not comfortable with turning off the logs completely, you can use Level.ERROR or Level.FATAL instead.
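
For example, if you would rather keep Spark's own error messages, this minimal variation of the line above only silences everything below ERROR:

log4j.LogManager.getLogger("org").setLevel(log4j.Level.ERROR)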

Creating Custom Log4j Logger

To create a custom Log4j logger, this code will do the trick:

my_logger = log4j.LogManager.getLogger("my_logger")
my_logger.setLevel(log4j.Level.INFO)  # or whichever level you prefer

And to use it, you just call the method that matches the level you want whenever you need to log something:

# DEBUG logging
my_logger.debug("This type of logs should be enabled explicitly.")

# INFO logging
my_logger.info("Trying to write to database...")

# WARN logging
my_logger.warn("Cannot find database 'boombox', will use 'default' instead.")

# ERROR logging
my_logger.error("This should not happen.")

# FATAL logging
my_logger.fatal("Your cluster has been hit by cosmic radiation.")
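
Only messages at or above the logger's configured level are actually emitted. Log4j's standard levels, from most to least verbose, are ALL, TRACE, DEBUG, INFO, WARN, ERROR, FATAL and OFF. Here is a minimal sketch of the filtering, reusing the my_logger handle from above:

my_logger.setLevel(log4j.Level.WARN)
my_logger.info("Suppressed: INFO is below the WARN threshold.")
my_logger.error("Emitted: ERROR is at or above the WARN threshold.")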

Creating Custom Logger by Class Name

To create a custom logger for each class you have, you will need to make a small helper class like this:

class LoggerProvider:
    """
    Mixin that provides a Log4j logger named after the implementing class.
    """

    def get_logger(self, log4j):
        return log4j.LogManager.getLogger(
            self.__full_name__()
        )

    def __full_name__(self):
        klass = self.__class__
        module = klass.__module__

        if module == "builtins":
            return klass.__name__  # avoid outputs like 'builtins.str'

        return module + "." + klass.__name__
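
Because Log4j hands back the same logger instance for a given name, get_logger is cheap to call and every instance of the same class ends up sharing one logger named after that class.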

Then, your classes will need to extend the LoggerProvider class like this:

class MyPipeline(LoggerProvider):
    """
    Example pipeline that logs under its own class name.
    """

    def __init__(self):
        # Reuses the SparkSession created earlier to reach the JVM gateway
        log4j = spark._jvm.org.apache.log4j  # noqa
        self.logger = self.get_logger(log4j)
        self.logger.setLevel(log4j.Level.INFO)  # or whichever level you prefer

    def run(self):
        self.logger.info("Running")
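
A quick usage sketch: instantiating the class and calling run() logs the message under the module-qualified class name (for example, __main__.MyPipeline when the pipeline is defined in a script):

pipeline = MyPipeline()
pipeline.run()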

And that’s how to create custom logging in PySpark.