Spark horror stories from the field - Guenia Izquierdo
Transcript generated with OpenAI Whisper large-v2.
My name is Guenia Izquierdo.
I am a field engineering manager at Databricks,
managing the SSA team.
I was trained as a software engineer,
but I turned to data engineering and I've really loved it.
My framework of choice has been Spark
for the past seven years.
So here we are today.
So today I want to talk about things I've seen people do
using the Spark framework to develop
their data applications, which have horrified me.
And I hope to scare you enough to prevent the world
from committing these Spark crimes again.
So I will show you how I would have done this differently,
and what, from my very opinionated view,
is the right way.
So I've grouped these stories into testing,
performance and monitoring groups.
So let's dig in.
Okay, testing stories first.
A while ago, I opened a very important repo
to make some changes needed to the utils functions
of a very important project I had just been assigned to.
And to my horror, there was no arrow next
to the Scala folder under the test folder,
which means no tests.
So I decided to open the utils file
and I see many functions in there.
And all of them, of course, were untested.
Needless to say, I flipped the table.
But then, on a different occasion,
I found myself in a similar project.
But this time when I opened the repo, there were tests.
So I happily opened the file to see what the test coverage
looks like and I find this.
What is this?
This is only testing that Spark can read data in.
Come on, we can do better.
There really isn't a need to test the Spark internals.
You can be assured that that code has been tested thoroughly.
So without tests, it is very hard to change production code
and know the implications of the changes
to the already existing code.
So how do we make our code less spooky,
our future lives easier and our products better?
Well, step one, the first step heading
in the right direction, is to modularize your code,
with particular emphasis on reusable code,
non-trivial transformations, UDFs, and utility functions.
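For illustration, a modular, reusable transformation might look
something like this minimal sketch (the function name and columns
are invented for the example, not from the original repo):

    from pyspark.sql import DataFrame
    from pyspark.sql import functions as F

    def add_order_totals(orders: DataFrame) -> DataFrame:
        # Hypothetical reusable transformation:
        # compute a line total for every order row.
        return orders.withColumn(
            "line_total", F.col("quantity") * F.col("unit_price")
        )

A small, pure function like this is easy to import from a utils
module and, more importantly, easy to test in isolation.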
Step two, we write the actual tests.
At a minimum, you should write
a happy path test per function.
Then if you want to go further,
focus on what's important for each function
you want to test.
You can start with covering common corner cases,
and then check whether there is complex arithmetic
that could be prone to error,
whether there is any possibility of getting
into a null pointer error,
whether you want to ensure the function returns gracefully
if one of the conditions is not met,
or whether you require the output to be of a specific format
or schema, et cetera.
So there are many things to consider in testing.
So when you decide to dive in,
and I encourage you to read more about it
and actually do dive in, there's a lot of docs online.
There's a lot of information online that you can check out,
but at the very least,
please write those happy path tests.
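For reference, a happy path test for a function like the one
sketched earlier could look roughly like this, using pytest
and a local SparkSession (the module path and names are illustrative):

    import pytest
    from pyspark.sql import SparkSession

    from my_project.utils import add_order_totals  # hypothetical path to the function under test

    @pytest.fixture(scope="session")
    def spark():
        # A small local session is enough for unit tests
        return (SparkSession.builder
                .master("local[2]")
                .appName("tests")
                .getOrCreate())

    def test_add_order_totals_happy_path(spark):
        input_df = spark.createDataFrame(
            [(1, 2, 10.0), (2, 3, 5.0)],
            ["order_id", "quantity", "unit_price"],
        )
        result = add_order_totals(input_df).collect()
        # The new column should hold quantity * unit_price for every row
        assert [row.line_total for row in result] == [20.0, 15.0]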
Okay.
So next up for my second group of stories,
I want to tell you some performance impacting horror stories.
For these, I decided to tell you about
things I see every day that are just low hanging fruit,
very normie, that can have an immediate impact
on the performance of your jobs.
So keep in mind that these are all simplifications
of examples I've seen in production,
and also that my recommendations can be applied
to exploratory work and analysis,
but I'm more interested in helping you put this into action
on your production code.
Okay, with that said,
in order for you to see the horror in the next few examples,
let's do a quick refresh of what Spark is and how it works.
So under the hood of a Spark cluster, we have a driver.
The driver is the machine in which the application runs.
It is responsible for three main things:
maintaining information about the Spark application;
responding to the user's program;
and analyzing, distributing,
and scheduling work across the executors.
And then we have the worker nodes.
The worker node hosts the executor process.
It has a fixed number of executors
allocated at any point in time.
Each executor holds a chunk of data to be processed.
This chunk is called a Spark partition.
It is a collection of rows
that sits on one physical machine in the cluster.
Executors are responsible for carrying out the work
assigned by the driver.
And each executor is responsible for two things.
One, execute the code that has been assigned by the driver.
And second, report the state of the computation
back to the driver.
In each executor, we have a number of cores,
which you can also think of as slots or threads.
And Spark parallelizes at two levels.
One is splitting the work among the executors.
And the other one is the slots in each of them.
So each executor has a number of slots
and each slot can be assigned a task.
In this diagram, for example,
some slots have been filled by a task
and some slots are open.
Now let's look at how Spark executes your program.
So using one of these clusters that I just mentioned,
Spark processes your data by breaking up a large task
into smaller ones and distributing the work
among several machines, which are called workers.
At the core of every Spark application
is the Spark driver program.
The secret to Spark's performance is parallelism.
Each parallelized action is referred to as a job.
The driver converts your Spark application
into one or more Spark jobs.
And each job is broken down into stages.
Stages are created based on what operations
can be performed serially or in parallel.
And not all Spark operations can happen in a single stage.
So they may be divided into multiple stages.
So each stage then is broken down into Spark tasks,
which are then federated across each Spark executor.
And each task maps to a single core
and works on a single partition of data.
For example, an executor with 16 cores can have 16 tasks
running in parallel at any given point.
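As a rough sketch of how those knobs show up in code,
here is one way to set executor counts and cores
and check how many partitions, and therefore tasks,
a DataFrame will produce (the numbers and path are just examples,
not a recommendation):

    from pyspark.sql import SparkSession

    # Example only: 4 executors with 16 cores each
    # means up to 64 tasks running in parallel
    spark = (SparkSession.builder
             .appName("parallelism-example")
             .config("spark.executor.instances", "4")
             .config("spark.executor.cores", "16")
             .getOrCreate())

    df = spark.read.parquet("/path/to/input")
    # Each partition of this DataFrame becomes one task per stage
    print(df.rdd.getNumPartitions())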
So with that background in mind,
that one was for you, Jesse,
let's go back to the stories.
So one day someone brings to my attention
that the job they just deployed to production
is really slow.
So I go in, look at the code, within three seconds,
I tell them how to fix it, they do it,
their job runs smoothly and they happily move along.
So what did I see?
So I saw what I've seen so many times now
that I just tell people to look for it
without even looking at their code.
So there's a few things going on in this particular version
that I created here for you.
So in rows one and two, we load the data.
In row five, we filter the data
so that we only look at today's date, good.
In row six, we perform a join,
in row seven and eight, we group the data
and perform an aggregation, fine.
In row 10, we print a count of the resulting data frame
and whoa, let's go back.
We print a count of the resulting data frame
and we print that in a production job.
So that's really bad, we shouldn't do that.
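The slide itself isn't captured in this transcript,
so here's a rough reconstruction of that version of the code
(table and column names are invented):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    orders = spark.read.parquet("/data/orders")        # rows 1-2: load the data
    customers = spark.read.parquet("/data/customers")

    daily = orders.filter(F.col("date") == F.current_date())  # row 5: today only
    joined = daily.join(customers, "customer_id")              # row 6: join
    summary = (joined                                          # rows 7-8: group + agg
               .groupBy("region")
               .agg(F.sum("amount").alias("total")))

    print(summary.count())   # row 10: an action left in a production job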
And now why is that bad?
Let's unpack Spark a little bit more.
In Spark, there are two types of operations.
There's transformations and there's actions.
Transformations are lazily evaluated,
which means that the evaluation of your code is delayed
until that result is needed.
There are two types of transformations.
There's narrow transformations and wide.
Narrow transformations are those that you can compute
using a single input partition.
For example, think of a filter.
Wide transformations usually require data to be shuffled
or moved between your worker nodes,
because the operation involves data from multiple partitions.
For example, think of a group by.
Your key might be in any of the input partitions.
So then the second type of operations are actions.
And these are the ones that trigger the physical evaluation
of the code written before,
which is usually a set of transformations.
Here at the bottom, we have some examples
of what narrow and wide transformations are
as well as actions.
And there's a lot more
and you can see those in the Spark documentation.
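A tiny sketch of that laziness, assuming an existing
SparkSession called spark:

    from pyspark.sql import functions as F

    df = spark.range(1_000_000)

    # Narrow transformation: each output partition
    # depends on a single input partition
    filtered = df.filter(F.col("id") % 2 == 0)

    # Wide transformation: a shuffle moves rows between partitions
    grouped = filtered.groupBy((F.col("id") % 10).alias("bucket")).count()

    # Nothing has actually run yet; only an action like this triggers the work
    grouped.show()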
So with that, let's take a look at our code again.
So we see that we have a single narrow transformation,
which is the filter.
And then we have two wide transformations,
the join and the group by.
And then we have a single action, which is the count.
When we decide to print a count on line 10,
Spark will actually create the execution plan
and your code will be run on your input data,
which means that data that needs to shuffle for the join
and the group by will do so now.
This might seem insignificant with a small amount of data
or during a one-off analysis.
But when you deploy this code to production,
you are adding unnecessary overhead to your job.
Some scenarios where this can be particularly bad
are if you have really large data sets, of course,
or if you are streaming
and doing some sort of unnecessary action
on every micro-batch,
then that adds latency to your stream.
So remember, when you go to prod,
remove all unnecessary displays
and prints that trigger an action.
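One way to keep that kind of debugging output around
without paying for it in production is to put it behind a flag;
a minimal sketch continuing the earlier example
(the flag name and output path are made up):

    DEBUG = False   # e.g. driven by a job parameter

    summary.write.mode("overwrite").parquet("/data/daily_summary")

    if DEBUG:
        # Only trigger this extra action while developing
        print(summary.count())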
So now the next are two different horror stories,
but I've put them together
because the resulting anti-pattern is the same.
So in the first horror story,
someone was wondering why their job seemed to be stuck
in a very simple operation.
And when I took a look,
they were converting their data frame to pandas
in order to iterate over the data frame.
This is always a red flag,
because you rarely ever need to iterate over a data frame.
For the second story,
someone thought their job was taking a really long time,
even though it wasn't supposed to be operating
on that much data.
Okay, so why are these bad?
In both of these cases,
all records in the data frame are sent to the driver,
which defeats the purpose of having a distributed system
since you're no longer executing your code
in parallel on multiple cores and machines.
So when you're writing your code,
make sure you ask yourself
whether you need to use either of these approaches
or if you can find a Spark idiomatic way
of doing what you're trying to do.
For example, one option for the first story
could be to rewrite your pandas code into Spark
to take full advantage of parallelization
and remove the iteration.
But I understand that you don't always have the option
of moving away from pandas.
I live in this world too.
So if that's the case,
I recommend you use the distributed version of pandas
available on Spark 3.2 and above.
This will allow you to keep your code as is,
but take advantage of more parallelization.
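The switch is mostly an import change; roughly something like this,
assuming Spark 3.2 or above (the file path is just an example):

    # Plain pandas: everything runs on the driver
    import pandas as pd
    pdf = pd.read_parquet("/data/events.parquet")
    print(len(pdf))

    # Pandas API on Spark: the same style of code,
    # but the work is distributed across the cluster
    import pyspark.pandas as ps
    psdf = ps.read_parquet("/data/events.parquet")
    print(len(psdf))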
I wanted to show you a small example
of the impact of all three options.
So I've run a simple count on this dataset
that has over 7 million rows.
So when you're running on pandas,
it runs in a little bit over 40 seconds.
When you use distributed pandas,
it runs in 0.59 seconds.
And that's pretty good since we barely had any code change.
And then if you do the refactoring
and you write it in Spark,
it returns in 0.12 seconds,
which is much better again,
but it would require some refactoring.
So I'll let those numbers stay on the screen
for a second here,
and I'll let you draw your own conclusions.
Okay, so for the next one,
I wanted to talk about collect a little bit more.
Let's spot the differences between these two code snippets.
So on the left, we first collect the data,
and then we aggregate.
On the second, we aggregate first, then we collect.
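Roughly, the two snippets looked like this
(simplified, assuming a DataFrame df with invented column names):

    from pyspark.sql import functions as F

    # Left: collect first, then aggregate on the driver,
    # so all 7 million-plus rows travel to a single machine
    rows = df.collect()
    totals = {}
    for row in rows:
        totals[row.key] = totals.get(row.key, 0) + row.amount

    # Right: aggregate on the executors first,
    # then collect only the small result
    result = df.groupBy("key").agg(F.sum("amount").alias("total")).collect()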
So how big is the runtime difference, do you think?
Keeping in mind that we are still processing
the same data frame that contains over 7 million rows.
Well, the first one took 3.72 minutes.
The second one took 15.87 seconds.
And it makes sense.
In the first one,
we brought all 7 million rows into the driver,
and then we did all the computation in a single machine.
Whereas in the second one,
we took advantage of all those cores
available in the cluster.
Then we brought the very small aggregated dataset
into the driver.
So, sorry, I wrapped that one up really quickly.
So I just wanted to let that sit for a second too,
and then move on to the next story.
So for this one, it's a story about UDFs.
And what makes it extra sad, but horrific for me,
is that I was the one who wrote it a long time ago
when I was learning Spark and thinking I was a Scala ninja.
It made for a very good, bad example
of what I was trying to illustrate today.
So I decided to let you roast me at a code conference,
because where else, if not at a NormConf slide,
would I be able to do this?
So here we go.
Let's not look at the code yet.
Don't get hung up on that.
First, let's talk about the requirements.
The requirement I received was that, given a date
and an arbitrary day of the week
that denotes the start of the week,
not necessarily Sunday or Monday,
in this case, it was mostly Wednesdays,
we needed to determine what week starting date
corresponded to the given date.
And why is my code so bad?
Well, for starters, the absolutely unnecessary complexity.
It makes it harder to maintain,
and it makes it harder for anyone onboarding to the code,
given that we can write this in a much simpler way.
But no, the worst part of it is that we're using a UDF,
when Spark native functions would do the work.
So how can we rewrite all of this?
Well, like that.
Let's look at that again.
So all of that into this.
Okay, so what just happened?
We used two Spark functions, date_sub and next_day.
And by doing so, we simplified the code and optimized it,
because we removed the UDF.
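The native version boils down to a single expression;
a sketch of the idea, assuming the week starts on Wednesday
and the column is called event_date:

    from pyspark.sql import functions as F

    # next_day() returns the first Wednesday strictly after the date,
    # so stepping back seven days lands on the Wednesday
    # that starts the week containing event_date
    with_week = df.withColumn(
        "week_start",
        F.date_sub(F.next_day(F.col("event_date"), "Wed"), 7),
    )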
So why should you avoid UDFs when possible?
So Spark SQL and DataFrame instructions are compact
and optimized for distributing those instructions
from the driver to each executor.
When we use custom code in a UDF, we obfuscate all of that,
and that code has to be serialized,
sent to the executors,
and then deserialized before it can be executed.
So it's a black box to Spark optimizers.
A friend of mine always says
that you should strive to master your tools.
And if you work with Spark often,
it really pays off to dive into this native functionality.
It can make your code much more performant and maintainable,
and ensures you don't get roasted in a future conference,
at least not for your Spark code.
Last, I wanna tell you about a monitoring story.
Well, this one happened to me again,
that once I deployed a job to production,
and used the default cluster configuration,
my team had settled upon for production jobs.
The job didn't have any issues,
but a year after having this job run flawlessly,
our infrastructure team was performing an audit,
and asked us to review our pipelines
to make sure we were using resources effectively.
Well, let me tell you,
this particular job was not, and I felt really bad
about all of that year-long waste.
So to be honest,
this is something that happens quite often.
We deploy an application to production,
and we monitor for instability,
we get emails if it fails, we have health checks,
that check that the output data looks all right, et cetera.
But if the SLAs are met, health checks pass,
and no crashes happen,
usually we don't review production applications.
So today I want to show you a quick monitoring option
that is available on Spark.
Ever since that time, it's been my go-to
whenever I deploy a new job
and want to make sure that it's configured correctly.
So adjust your eyes,
because the name of the tool is Ganglia.
Ganglia looks like it's straight out of the 80s,
but it does a great job
at giving you a first glance utilization overview
of your cluster.
If you are using Databricks to deploy your Spark jobs,
the recommended best practice
is that you put each job into its own job cluster.
This allows for the cluster to only be up
for the time needed for the job to complete,
then shut off afterwards.
If you're using a shared cluster,
either on Databricks or using open source Spark,
you will still be able to use Ganglia,
only that you won't be able to infer
the resource utilization for that particular job.
But you would still be able to use Ganglia
and determine whether the overall cluster
has been provisioned correctly,
and then talk to your infra team
using some of the tips I'm about to give you.
So there are three main parts on this beautiful screen
that I wanna focus on.
The first one is at the top right,
and it's the memory utilization.
So let's zoom in.
So here, the red line that you see
is the total available memory for the cluster
at any given time.
As you can see, this cluster auto-scaled up and down
a few times according to the usage of the cluster.
That's why the red line goes up and down.
The purple area is the memory actually being used.
The green is the data that you have cached,
and yellow is available memory.
Just looking at this metric alone,
if this was my cluster,
this cluster seems to be doing all right.
Next, we want to look at the CPU utilization
at the bottom left.
So let's zoom in.
In this graph, we see what percentage
of the total available CPU
was in use at any given time.
As you can see, the most this cluster was used
was around Wednesday at 1 a.m.
That was probably me running the code to present today.
And the CPU utilization was around 70%.
So now I wouldn't change the configuration right away.
I would probably give it a few other days
or a few other runs to collect more data
and then decide how much I can downsize this cluster by,
at least CPU-wise, because as you can see,
there are short bursts of utilization,
but overall the cluster's CPUs are fairly underutilized.
So last but not least,
we want to look at the cluster network performance.
This is a pretty simple one.
It just tells us how much data we're bringing to the cluster
and how much data we're writing out
and how long those operations are taking.
So the green line is how much data we've read into the cluster,
the blue line is how much data we've written out,
and the horizontal stretch, I'm going to call it, shows duration.
For example, if you see a green or blue line
stretch horizontally at a particular point,
that's how long it took to move the data in or out.
So this is pretty much it with this one.
The main thing to watch out for are those sustained peaks,
which will indicate you need to provision a cluster
with more network bandwidth,
which you or your infrastructure team can do
by choosing a different VM type.
So that is all I have for today.
Thank you all for checking out my talk
and enjoy the rest of the conference.