Spark Technical Debt Deep Dive


How Bad is Bad Code: The ROI of Fixing Broken Spark Code

Every so often I come across Spark code that looks like it has been written by a Java developer, and it never fails to make me wince, because it is a missed opportunity to write elegant and efficient code: it is verbose, difficult to read, and full of distributed processing anti-patterns.

One such occurrence happened a few weeks ago when one of my colleagues was trying to make some churn analysis code downloaded from GitHub work.

I was looking for some broken code to add a workshop to our Spark Performance Tuning class and to write a blog post about, and this fitted the bill perfectly.

For convenience purposes I chose to limit the scope of this exercise to a specific function that prepares the data prior to the churn analysis.

Here it is in all its glorious juiciness:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType


def prepare_data_baseline(df):
    '''
    Function to prepare the given dataframe and divide into groups of churn and non churn
    users while returning the original dataframe with a new label column into a spark dataframe.
    Args:
        df - the original dataframe
    Returns:
        df - dataframe of the dataset with new column of churn added
        stayed - dataframe of the non-churn user's activities only.
        all_cancelled - dataframe of the churn user's activities only.
    '''

    #Define a udf for cancelled
    canceled = udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0)

    #define a new column 'churn' where 1 indicates cancellation of subscription, 0 otherwise
    df = df.withColumn('Churn', canceled(df.page))

    #Dataframe of all that cancelled
    cancelled_df = df.select('page', 'userId', 'Churn').where(col('churn') == 1)
    #List of cancelled
    list_cancelled = cancelled_df.select('userId').distinct().collect()  #list of cancelled users

    #Put in a list format
    gb = []  #temporary variable to store lists
    for row in list_cancelled:
        gb.append(row[0])
    canc_list = [x for x in gb if x != '']  #remove the invalid users
    #Total number of users who canceled
    print(f"The number of churned users is: {len(canc_list)}")

    #List of staying users
    all_users = df.select('userId').distinct().collect()
    gh = []  #a temporary variable to store all users
    for row in all_users:
        gh.append(row[0])
    stayed_list = set(gh) - set(gb)  #list of users staying
    stayed_list = [x for x in stayed_list if x != '']  #remove the invalid users

    #Total number of users who did not cancel
    print(f"The number of staying users is: {len(stayed_list)}")

    #Store both canceled and staying users in new dataframes containing all actions they undertook
    all_cancelled = df.select("*").where(col('userId').isin(canc_list))
    stayed = df.select('*').where(col('userId').isin(stayed_list))

    #Redefine a udf for churn
    churned = udf(lambda x: 0 if x in stayed_list else 1, IntegerType())
    #Create new column which will be our label column to track all users that eventually cancelled their subscription
    df = df.withColumn('label', churned(col('userId')))

    return df, stayed, all_cancelled


In this blog post, I will outline the steps I took to fix this code, and then measure the resulting difference in execution performance. In the process, I will explicitly state the best practices I implement.

Let’s jump into this rabbit hole!

Define a non-regression test harness

Stop! 

Resist the temptation to start tweaking the code right away!

You should be able to: 

  • Make sure that you do not introduce a regression when fixing the code
  • Measure the improvements in terms of performance

This is where limiting the scope of the analysis to a single function came in handy: it allowed me to use simple, ad hoc tooling:

  • I isolated the original function in a prepare_data_baseline function in a separate prepareData_baseline.py file
  • I created a new file called prepareData.py with the new version of the prepare_data function
  • I measured the time to perform the processing using the time library
  • And I compared the resulting DataFrames with subtract

Because lazy evaluation defers the time when the code is actually executed, I added code that saves the DataFrames to files, thus forcing their materialization through the execution of the code. I also added those lines within the scope of the time measurement.

And this is what it looks like:

from pyspark.sql import SparkSession
import time, datetime
from prepareData import prepare_data
from prepareData_baseline import prepare_data_baseline


spark = SparkSession \
    .builder \
    .appName("Churn Analysis Data Preparation Test Harness") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

spark.conf.set('spark.sql.adaptive.enabled', 'false')
print(f"AQE enabled: {spark.conf.get('spark.sql.adaptive.enabled')}")

df = spark.read.json('data/mini_sparkify_event_data.json')


# Baseline version

process_time_start = time.perf_counter()                   # Start timer: begin processing
df_baseline, stayed_baseline, all_cancelled_baseline = prepare_data_baseline(df)
df_baseline.write.mode("overwrite").json('data/df_baseline')
stayed_baseline.write.mode("overwrite").json('data/stayed_baseline')
all_cancelled_baseline.write.mode("overwrite").json('data/all_cancelled_baseline')
process_time_end = time.perf_counter()                     # Stop timer: end processing
process_time = process_time_end - process_time_start       # Elapsed time for processing
totalTime = datetime.timedelta(seconds=process_time)

print(f"Preparing data with the baseline version took {totalTime}")


# New version

process_time_start = time.perf_counter()                   # Start timer: begin processing
df, stayed, all_cancelled = prepare_data(df)
df.write.mode("overwrite").json('data/df')
stayed.write.mode("overwrite").json('data/stayed')
all_cancelled.write.mode("overwrite").json('data/all_cancelled')
process_time_end = time.perf_counter()                     # Stop timer: end processing
process_time = process_time_end - process_time_start       # Elapsed time for processing
totalTime = datetime.timedelta(seconds=process_time)

print(f"Preparing data with the new version took {totalTime}")


# Regression Testing

def diffDataFrame(df1, df2):
    return df1.subtract(df2).count()


print(f"New processing introduced {diffDataFrame(df, df_baseline)} differences in df.")
print(f"New processing introduced {diffDataFrame(all_cancelled, all_cancelled_baseline)} differences in all_cancelled.")
print(f"New processing introduced {diffDataFrame(stayed, stayed_baseline)} differences in stayed.")

spark.stop()


Retro-document the requirements

This step was quite easy thanks to the comments that were present in the initial code.

This function: 

  • Takes a DataFrame containing activities from users,
  • splits it into two groups of activities: 
    • activities from users who eventually churned and 
    • activities from users who did not, and 
  • adds a “label” column to the input DataFrame to tag activities that belong to users that eventually churned (1 if the user churned, 0 otherwise).

If that sounds suspiciously redundant to you, I agree. But let’s table that issue for now; we will revisit it once we are satisfied with our new version of the code.

Refactor the code

The main problem with this code is the use of Python lists to achieve the required results. These lists are created by collecting the DataFrames onto the Spark driver, where the for loops will be processed, making this code not scalable: above a certain number of users, the driver memory might become overwhelmed and the program will crash.

Also, this choice prevents the code from leveraging all the optimizations that come with DataFrame operations.

Then the code uses plain PySpark UDFs, for which you incur a performance penalty because of the need to:

  • Deserialize the Spark DataFrame to its Java representation
  • Transfer the resulting Java object to the Python process where the UDF will be executed
  • Serialize the output of the function back to Spark format

Beware of the cost of PySpark UDFs

There are ways to mitigate these issues by using PyArrow and vectorized UDFs if you really need to use them, but this is not one of those times.
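
For reference only, a vectorized (pandas) UDF for the cancellation flag could look like the following minimal sketch (the function name is a placeholder of my own); as the next paragraphs show, a plain column expression makes even this version unnecessary here:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType

@pandas_udf(IntegerType())
def canceled_vectorized(page: pd.Series) -> pd.Series:
    # Operates on whole Arrow-backed batches instead of one row at a time
    return (page == 'Cancellation Confirmation').astype('int32')

# Hypothetical usage, mirroring the original withColumn call:
# df = df.withColumn('Churn', canceled_vectorized(df.page))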

First, the function creates a “Churn” column, which I assume is for convenience purposes. A user is identified as “churned” if they have been to the “Cancellation Confirmation” page.

This is achieved with a withColumn call and a UDF.

    #Define a udf for cancelled
    canceled = udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0)
    #define a new column 'churn' where 1 indicates cancellation of subscription, 0 otherwise
    df = df.withColumn('Churn', canceled(df.page))


There is no need for a UDF in that case; these lines of code can be replaced by a simple column expression, like so:

    #define a new column 'churn' where 1 indicates cancellation of subscription, 0 otherwise
    df = df.withColumn('Churn', (df.page == 'Cancellation Confirmation').cast('integer').cast('string'))

I believe the correct type for that new column would be boolean, but for non-regression purposes I had to cast it to a string of 0 or 1.

Then the author proceeds to create two lists: one for the users that churned and one for the users that stayed. Since my goal is to avoid these lists, I am going to create the corresponding DataFrames instead:

    all_users = df.select(df.userId).distinct().where(df.userId != '')
    churned_users = df.where(df.Churn == '1').select(df.userId).distinct().where(df.userId != '')
    stayed_users = all_users.subtract(churned_users)

First I create a DataFrame of all the non-empty users, then the DataFrame of users that churned, and define the users that stayed as the difference between the two.

The author uses the awkwardly created lists together with UDFs to create the all_cancelled and stayed DataFrames. Here is the code for the first one:

    #List of cancelled
    list_cancelled = cancelled_df.select('userId').distinct().collect()#list of cancelled users

    #Put in a list format
    gb = []#temporary variable to store lists
    for row in list_cancelled:
        gb.append(row[0])
    canc_list = [x for x in gb if x != '']#remove the invalid users

    all_cancelled = df.select("*").where(col('userId').isin(canc_list))

I realize now that the “Put in a list format” loop is probably unnecessary.

To create the same DataFrame I just do the following:

    all_cancelled = df.join(churned_users, 'userId')

The same technique is used to create the stayed DataFrame:

    stayed = df.join(stayed_users, 'userId')


Last, the author adds the “label” column to the main DataFrame by using a UDF:

    #Redefine a udf for churn
    churned = udf(lambda x: 0 if x in stayed_list else 1, IntegerType())
    #Create new column which will be our label column to track all users that eventually cancelled their subscription
    df = df.withColumn('label', churned(col('userId')))

Instead I just use a union:

    df_label = all_cancelled.withColumn('label',lit(1)).union(stayed.withColumn('label',lit(0)))


That triggered a regression because I did not include the null users. I wonder what use could be made of data with null users for training a model to predict churn from users’ behavior, but for non-regression purposes I added these too:

    empty_users = df.where(df.userId.isNull())

    #Add empty users for non-regression purposes
    df_label = df_label.union(empty_users.withColumn('label',lit(1)))

Last, I also had to reorder the columns of my DataFrames for my simple non-regression tests to be successful:

    # Sort the columns
    columns = ['artist','auth','firstName','gender','itemInSession','lastName','length','level','location','method','page','registration','sessionId','song','status','ts','userAgent','userId','Churn','label']
    df_label_sorted = df_label.select(columns)
    columns = ['artist','auth','firstName','gender','itemInSession','lastName','length','level','location','method','page','registration','sessionId','song','status','ts','userAgent','userId','Churn']
    all_cancelled_sorted = all_cancelled.select(columns)
    stayed_sorted = stayed.select(columns)


This is my full version of the function:

from pyspark.sql.functions import lit


def prepare_data(df):
    '''
    Function to prepare the given dataframe and divide into groups of churn and non churn
    users while returning the original DataFrame with a new label column into a spark dataframe.
    Args:
        df - the original dataframe
    Returns:
        df - dataframe of the dataset with new column of churn added
        stayed - dataframe of the non-churn user's activities only.
        all_cancelled - dataframe of the churn user's activities only.
    '''

    #define a new column 'churn' where 1 indicates cancellation of subscription, 0 otherwise
    df = df.withColumn('Churn', (df.page == 'Cancellation Confirmation').cast('integer').cast('string'))

    all_users = df.select(df.userId).distinct().where(df.userId != '')
    churned_users = df.where(df.Churn == '1').select(df.userId).distinct().where(df.userId != '')
    stayed_users = all_users.subtract(churned_users)
    empty_users = df.where(df.userId.isNull())

    #Store both canceled and staying users in new DataFrames containing all actions they undertook
    all_cancelled = df.join(churned_users, 'userId')
    stayed = df.join(stayed_users, 'userId')
    df_label = all_cancelled.withColumn('label', lit(1)).union(stayed.withColumn('label', lit(0)))

    #Add empty users for non-regression purposes
    df_label = df_label.union(empty_users.withColumn('label', lit(1)))

    # Sort the columns
    columns = ['artist','auth','firstName','gender','itemInSession','lastName','length','level','location','method','page','registration','sessionId','song','status','ts','userAgent','userId','Churn','label']
    df_label_sorted = df_label.select(columns)
    columns = ['artist','auth','firstName','gender','itemInSession','lastName','length','level','location','method','page','registration','sessionId','song','status','ts','userAgent','userId','Churn']
    all_cancelled_sorted = all_cancelled.select(columns)
    stayed_sorted = stayed.select(columns)

    #Total number of users who canceled
    print(f"The number of churned users is: {churned_users.count()}")
    #Total number of users who did not cancel
    print(f"The number of staying users is: {stayed_users.count()}")

    return df_label_sorted, stayed_sorted, all_cancelled_sorted


Non-regression and performance

I was able to verify that I had not introduced any regression in my version of the function on my desktop with Spark 3.3.

In order to get meaningful performance measurements, I needed to use the full 12GB JSON dataset. Otherwise, with small data, most of the time is spent on overhead and the results vary wildly.

So I switched to our CML data service using Spark 3.2 and adapted the code accordingly.

CML uses Spark on Kubernetes and the default is dynamic allocation of executors. I had to disable that to get a stable environment and thus meaningful measurements:

from pyspark.sql import SparkSession
import time, datetime
from prepareData import prepare_data
from prepareData_baseline import prepare_data_baseline
from prepareData_improved import prepare_data_improved
import cml.data_v1 as cmldata
from env import S3_ROOT, S3_HOME, CONNECTION_NAME


conn = cmldata.get_connection(CONNECTION_NAME)
spark = (
            SparkSession.builder.appName(conn.app_name)
            .config("spark.sql.hive.hwc.execution.mode", "spark")
            .config("spark.dynamicAllocation.enabled", "false")
            .config("spark.executor.instances", 3)
            .config("spark.executor.memory", "32g")
            .config("spark.executor.cores", 4)
            .config("spark.yarn.access.hadoopFileSystems", conn.hive_external_dir)
            .getOrCreate()
        )

spark.sparkContext.setLogLevel("ERROR")
spark.conf.set('spark.sql.adaptive.enabled', 'true')
print(f"AQE enabled: {spark.conf.get('spark.sql.adaptive.enabled')}")
 

That got me the desired result:

I then found that the full 12GB data set contained a corrupt record that I had to deal with, and while I was at it I converted the file to Parquet format to save myself some time:

Convert early to compressed columnar formats (Parquet, ORC)
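
Here is a minimal sketch of that conversion step, with S3 paths that are placeholders of my own and Spark’s DROPMALFORMED read mode used to discard the corrupt record:

# Assumed placeholder paths; adjust to your environment
event_data_json = S3_ROOT + '/data/sparkify_event_data.json'
event_data_parquet = S3_ROOT + '/data/sparkify_event_data.parquet'

# Drop the corrupt record(s) at read time, then persist a compressed columnar copy
df = (spark.read
      .option("mode", "DROPMALFORMED")
      .json(event_data_json))
df.write.mode("overwrite").parquet(event_data_parquet)

# Later runs read the Parquet copy and skip the costly JSON parsing entirely
df = spark.read.parquet(event_data_parquet)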

To avoid repetitive code, I created a function that performs the tests, in which I also added calls to setJobGroup and setJobDescription to improve the readability of the Spark UI:

def measureDataPreparation(df, f, versionName):
  spark.sparkContext.setJobGroup(versionName, "")
  # Start timer: begin processing
  process_time_start = time.perf_counter()
  df, stayed, all_cancelled = f(df)
  spark.sparkContext.setJobDescription("Write /data/df")
  df.write.mode("overwrite").json(S3_HOME + '/data/df')
  spark.sparkContext.setJobDescription("Write /data/stayed")
  stayed.write.mode("overwrite").json(S3_HOME + '/data/stayed')
  spark.sparkContext.setJobDescription("Write /data/all_cancelled")
  all_cancelled.write.mode("overwrite").json(S3_HOME + '/data/all_cancelled')
  # Stop timer: end processing
  process_time_end = time.perf_counter()
  # Elapsed time for processing
  process_time = process_time_end - process_time_start
  totalTime = datetime.timedelta(seconds=process_time)
  print(f"Preparing data with the {versionName} took {totalTime}")

Use setJobGroup and setJobDescription to improve readability of the Spark UI

And this is how the Spark UI looks as a result:

Since I had established that I had not introduced any regression, I also removed the regression tests.

Here is the relevant part of the session’s output:

 

measureDataPreparation(df, prepare_data_baseline, "baseline version")
The number of churned users is: 4982
The number of staying users is: 17282
Preparing data with the baseline version took 0:09:11.799036


measureDataPreparation(df, prepare_data, "no regression version")
The number of churned users is: 4982
The number of staying users is: 17282
Preparing data with the no regression version took 0:01:48.224514



Great success! The new version is more than 4 times more efficient!

Further improvements

Since I no longer need to test for non-regression, I can remove the sorting of the columns.

I can also remove the code that prints the counts of the churned and stayed users. This code does not belong in a function that will most likely run unattended in a data pipeline. 

It triggers distributed execution to compute results that nobody will see. It should be left to the code that calls the function to decide whether to log that kind of information. 

This is also an instance of breaking the following rule:

Remove code that helped debugging with count(), take() or show() in production

I checked the rest of the initial code, and after exhaustive data exploration, right before splitting the data set for training purposes, the author does remove the rows with null users. There is no point in carrying around this extra baggage all this time. In fact, this breaks another rule of big data processing:

Filter early
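
As a minimal sketch of that rule, and assuming the null and empty users are never needed downstream, the filter can be pushed all the way up to the read (reusing the placeholder Parquet path from the conversion sketch above); the improved function below applies the same idea inside the function:

# Drop null/empty userId rows immediately after the read, so every
# downstream join and shuffle handles less data (placeholder path)
df = (spark.read.parquet(S3_ROOT + '/data/sparkify_event_data.parquet')
        .where("userId IS NOT NULL AND userId != ''"))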

Lastly, I removed the casting of the “Churn” column and left it as a boolean. I also checked that it was not used outside of this function, and renamed it “churn” because I hated that uppercase “C” with all the passion of a thousand white-hot blazing suns.

This is the final version of the code:

from pyspark.sql.functions import lit


def prepare_data_improved(df):
    '''
    Function to prepare the given DataFrame and divide into groups of churn and non churn
    users while returning the original DataFrame with a new label column into a Spark DataFrame.
    Args:
        df - the original DataFrame
    Returns:
        df - DataFrame of the dataset with new column of churn added
        stayed - DataFrame of the non-churn user's activities only.
        all_cancelled - DataFrame of the churn user's activities only.
    '''

    #define a new column 'churn' that is True for a cancellation of subscription, False otherwise
    df = df.where(df.userId != '').withColumn('churn', (df.page == 'Cancellation Confirmation'))

    all_users = df.select(df.userId).distinct()
    churned_users = df.where(df.churn).select(df.userId).distinct()
    stayed_users = all_users.subtract(churned_users)

    #Store both canceled and staying users in new DataFrames containing all actions they undertook
    all_cancelled = df.join(churned_users, 'userId')
    stayed = df.join(stayed_users, 'userId')
    df_label = all_cancelled.withColumn('label', lit(1)).union(stayed.withColumn('label', lit(0)))

    return df_label, stayed, all_cancelled

Conclusion

Now that I have achieved non-regression using DataFrames only, and that I also have an improved version, I should be able to measure the benefits of using the Spark cache and of the Adaptive Query Execution engine.
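
As a minimal sketch, reusing the measureDataPreparation helper above, those two extra measurements could be wired in like this:

# Adaptive Query Execution: toggle the setting before running the measurement
spark.conf.set('spark.sql.adaptive.enabled', 'true')
measureDataPreparation(df, prepare_data_improved, "improved version with AQE")

# Spark cache: persist the input DataFrame so repeated scans are served from memory
df.cache()
df.count()   # materialize the cache before timing
measureDataPreparation(df, prepare_data_improved, "improved version with cached input")
df.unpersist()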

Here are the full results:

In this limited experiment, the main factor that influences the performance of the execution is the refactoring of the Spark code to remove the distributed processing anti-patterns. 

Caching the data, improving the code further, or using AQE all bring marginal improvements compared to the removal of the technical debt.

The return on investment of training is always a thorny issue because of the difficulty of measuring it conveniently in a spreadsheet, but with this experiment I hope I have shown that the lack of skills should be a major concern for any organization running Spark workloads.

If you would like to get hands-on experience with Spark 3.2, as well as other tools and techniques for making your Spark jobs run at peak performance, sign up for Cloudera’s Apache Spark Performance Tuning course.

If you need an introduction to AQE, kindly refer to my previous blog post.
