Coverage of pyspark user defined function

Issue #658 closed
Abdeali Kothari
created an issue

I have some PySpark code in my code base and I am trying to test it. When doing that, I find that any Python UDF I run with Spark does not get covered even though it does run. Note that I am running Spark in local mode.

Reproducible example:

def get_new_col(spark, df):
    def myadd(x, y):
        import sys, os
        print("sys.version_info =", sys.version_info)
        print({k: v for k, v in os.environ.items() if k.lower().startswith('cov')})
        x1 = x
        y1 = y
        return str(float(x1) + float(y1))

    spark.udf.register('myadd', myadd)
    return df.selectExpr(['*', 'myadd(x, y) as newcol'])


def run():
    try:
        import findspark
        findspark.init()
    except ImportError:
        pass
    import pyspark
    spark = pyspark.sql.SparkSession.Builder().master("local[2]").getOrCreate()
    df = spark.createDataFrame([
        [1.0, 1.0],
        [1.0, 2.0],
        [1.0, 2.0]
    ], ['x', 'y'])

    outdf = get_new_col(spark, df)
    outdf.show()
    outdf.printSchema()
    assert outdf.columns == (df.columns + ['newcol'])

    spark.stop()


if __name__ == '__main__':
    run()

The resulting coverage report says the UDF was not covered even though it did run.

Here are the logs when I run it:

$ coverage run example.py
2018-05-04 14:58:29 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2018-05-04 14:58:30 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
[Stage 0:>                                                          (0 + 1) / 1]sys.version_info = sys.version_info(major=3, minor=6, micro=4, releaselevel='final', serial=0)
{'COVERAGE_PROCESS_START': ''}
sys.version_info = sys.version_info(major=3, minor=6, micro=4, releaselevel='final', serial=0)
{'COVERAGE_PROCESS_START': ''}
sys.version_info = sys.version_info(major=3, minor=6, micro=4, releaselevel='final', serial=0)
{'COVERAGE_PROCESS_START': ''}
+---+---+------+
|  x|  y|newcol|
+---+---+------+
|1.0|1.0|   2.0|
|1.0|2.0|   3.0|
|1.0|2.0|   3.0|
+---+---+------+

root
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- newcol: string (nullable = true)

Relevant packages: Python 3.6.4 (Anaconda), coverage 4.5.1

Edit 1: Simplified the reproducible example to remove unittest and pytest.

Comments (11)

  1. Ned Batchelder repo owner

    Thanks for the report. I've never used PySpark. Before I try to reproduce this, what packages do I need to install to be able to run this code? I'm on a Mac, with Python 3.6. Give me the complete details of what I need to do to see the problem.

  2. Abdeali Kothari reporter

    Hm, it may be a bit complicated to set up (Spark can get messy to install).

    To reproduce, install:

    • Apache Spark
      • Java
      • Scala
      • xcode dev installations
    • Python 3.x testing stuff
      • pytest
      • pytest-cov
      • coverage

    For Spark you could try: https://medium.freecodecamp.org/installing-scala-and-apache-spark-on-mac-os-837ae57d283f

    I have not had much luck getting it to work with brew though, but my setup is a little more complicated than just Spark. It's never worked for me in one shot :P We can talk on Gitter or IRC if you run into issues trying to reproduce.

    A quick note if you're not familiar with this system: PySpark uses Py4J, which calls internal Java routines. So the df.selectExpr you see is actually calling a Java function internally, and that Java function calls back into the UDF registered with spark.udf.register().

    Hence the function is definitely running in a different Python process, spawned from that JVM, I believe.

    It's Python Shell > JVM > Python Shell
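
    A quick way to see this from the Python side is to compare process ids between the driver and the UDF. This is just a diagnostic sketch on top of the repro above (os.getpid() in both places), not anything Spark-specific:

        import os

        def myadd(x, y):
            # This runs inside the worker Python process started for the JVM,
            # not in the driver that "coverage run" launched.
            print("worker pid =", os.getpid(), "ppid =", os.getppid())
            return str(float(x) + float(y))

        # In the driver (e.g. at the top of run()):
        print("driver pid =", os.getpid())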

  3. Abdeali Kothari reporter

    Ned, I am trying to understand what exactly Spark does so we can figure this out. Here are the steps:

    • I open a python shell
    • I import pyspark and create a session/context
    • Spark will now call a Popen() to a bash script.
    • The bash script sets up some environment variables
    • Then it calls a Java jar
    • After this, all communication between the Python shell and the Java jar is done over sockets.
    • Over that socket, the Python UDF (myadd, serialized by cloudpickle I think) is sent across and the serialized function is held on the Java side
    • To execute this function, the Java process uses a ProcessBuilder to spawn a new Python process, and runs the code in this second Python process (a rough stand-alone analogy of this hand-off is sketched below).
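
    A rough stand-alone analogy of that last hand-off (just a sketch, not Spark itself; it assumes cloudpickle is installed, and it shows why coverage running in the parent never sees the child's lines unless the child starts its own tracer):

        import base64
        import subprocess
        import sys

        import cloudpickle  # the same serializer Spark uses to ship UDFs


        def myadd(x, y):
            return str(float(x) + float(y))


        # Serialize the function by value, roughly what happens at spark.udf.register().
        payload = base64.b64encode(cloudpickle.dumps(myadd)).decode("ascii")

        # Execute it in a brand-new interpreter, the way the JVM spawns pyspark.daemon
        # workers. Coverage tracing in *this* process does not follow into the child;
        # the child is only measured if coverage.process_startup() runs at its startup.
        child_code = (
            "import base64, pickle; "
            f"fn = pickle.loads(base64.b64decode('{payload}')); "
            "print('child result:', fn(1.0, 2.0))"
        )
        subprocess.run([sys.executable, "-c", child_code], check=True)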

    I ran the following in the background:

    while sleep 0.1; do echo date=$(date) py=$(ps aux | grep pytho[n] | wc -l) java=$(ps aux | grep jav[a] | wc -l) cov=$(ps aux | grep coverag[e] | wc -l); done
    

    And verified that the sequence is:

    • Python process created + Coverage process created
    • Java process created
    • Python process created (second)
    • Python process killed (second)
    • Java process killed
    • Python process killed + Coverage process killed

    So I think the question boils down to how to make all these Python processes use coverage.

    EDIT: The processes are:

    /Users/abdealijk/anaconda3/bin/python /Users/abdealijk/anaconda3/bin/coverage run example.py
    /Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home/bin/java -cp /usr/local/hadoop/spark/conf/:/usr/local/hadoop/spark/jars/*:/usr/local/hadoop/hadoop/etc/hadoop/ -Xmx1g org.apache.spark.deploy.SparkSubmit --conf spark.master=local[1] pyspark-shell
    /Users/abdealijk/anaconda3/bin/python -m pyspark.daemon
    
  4. Abdeali Kothari reporter

    I read over http://coverage.readthedocs.io/en/latest/subprocess.html and tried:

    EDIT: Realized that setting the env variable to 1 was causing issues, as the value is taken as the path of the coverage configuration file to use. export COVERAGE_PROCESS_START= fixed that error, but it didn't cover the UDF :(

    What I did:

    • Set the environment variable export COVERAGE_PROCESS_START=
    • Then added /Users/abdealijk/anaconda3/lib/python3.6/site-packages/sitecustomize.py with import coverage; coverage.process_startup()

    But this didn't increase my coverage. Inside the function, when I print({k: v for k, v in os.environ.items() if k.lower().startswith('cov')}) I can see {'COVERAGE_PROCESS_START': ''}, which does seem correct.
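
    One more diagnostic that might help: sys.gettrace() returns the currently installed trace function (coverage sets one while it is measuring, so it should show up here), so printing it inside the UDF would show whether the process_startup() hook actually started a tracer in the pyspark.daemon workers. A sketch:

        import sys

        def myadd(x, y):
            # None here means no trace function (coverage's or any other)
            # is installed in this worker process.
            print("trace function in worker:", sys.gettrace())
            return str(float(x) + float(y))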

    For debugging I even tried:

        def myadd(x, y):
            import coverage
            cov = coverage.Coverage(config_file=None)
            cov.start()
            import sys, os
            print("sys.version_info =", sys.version_info)
            print({k: v for k, v in os.environ.items() if k.lower().startswith('cov')})
            x1 = x
            y1 = y
            return str(float(x1) + float(y1))
    

    but the coverage did not increase.
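
    One caveat with that in-UDF attempt (if I have the coverage API right): a hand-made Coverage object only writes a data file when save() is called, and the tracer generally only measures frames entered after start(), so the lines in myadd's own frame may never be recorded even if it works. A variant that accounts for both, using a throwaway inner helper (add_impl, just for illustration) and data_suffix=True so each worker writes its own .coverage.* file for a later coverage combine:

        def myadd(x, y):
            import coverage
            cov = coverage.Coverage(config_file=None, data_suffix=True)
            cov.start()
            try:
                def add_impl(a, b):  # a new frame, so the tracer can see it
                    return str(float(a) + float(b))
                return add_impl(x, y)
            finally:
                cov.stop()
                cov.save()  # without save(), nothing reaches disk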

  5. Ned Batchelder repo owner

    @Abdeali Kothari Thanks for doing all this! One thing that looks wrong to me: the COVERAGE_PROCESS_START environment variable needs to refer to the location of the .coveragerc file to use:

    export COVERAGE_PROCESS_START=/path/to/.coveragerc
    

    And you need to create a .pth file in the Python environment that is running the subprocesses.
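
    Concretely, the setup from the subprocess docs looks something like this. A minimal .coveragerc (the file COVERAGE_PROCESS_START has to point at; parallel = true makes each measured process write its own .coverage.* data file):

    [run]
    parallel = true

    Then run with the environment variable set, and combine the per-process data files afterwards:

    export COVERAGE_PROCESS_START=/path/to/.coveragerc
    coverage run example.py
    coverage combine
    coverage report

    And a .pth file (any name ending in .pth) in the site-packages of the Python that runs the subprocesses, containing the single line:

    import coverage; coverage.process_startup()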

  6. Abdeali Kothari reporter

    I am running it with my root Anaconda environment, so I think the sitecustomize.py is in the right place. (Considering it was giving me an error of "invalid config path '1'" when I set COVERAGE_PROCESS_START=1, I believe it is being picked up.)

    And I do not have any .coveragerc file (just default configs)

  7. Abdeali Kothari reporter

    Any thoughts on this, Ned?

    I'm not sure if I'm doing something wrong with the subprocess setup. Or does subprocess measurement only work if the subprocess is created by the Python process that coverage run started?
