Spark horror stories from the field - Guenia Izquierdo
Transcript generated with OpenAI Whisper large-v2.
My name is Guenia Izquierdo.
I am a field engineering manager at Databricks,
managing the SSA team.
I was trained as a software engineer,
but I turned to data engineering and I've really loved it.
My framework of choice has been Spark
for the past seven years.
So here we are today.
So today I want to talk about things I've seen people do
using the Spark framework to develop
their data applications, which have horrified me.
And I hope to scare you enough to prevent the world
from committing these Spark crimes again.
So I will show you how I would have done this differently,
and what, from my very opinionated view,
is the right way.
So I've grouped these stories into testing,
performance and monitoring groups.
So let's dig in.
Okay, testing stories first.
A while ago, I opened a very important repo
to make some changes needed to the utils functions
of a very important project I had just been assigned to.
And to my horror, there was no arrow next
to the Scala folder under the test folder,
which means no tests.
So I decided to open the utils file
and I see many functions in there.
And all of them, of course, were untested.
Needless to say, I flipped the table.
But then, on a different occasion,
I found myself in a similar project.
But this time when I opened the repo, there were tests.
So I happily opened the file to see what the test coverage
looks like and I find this.
What is this?
This is only testing that Spark can read data in.
Come on, we can do better.
There really isn't a need to test the Spark internals.
You can be assured that that code has been tested thoroughly.
So without tests, it is very hard to change production code
and know the implications of the changes
to the already existing code.
So how do we make our code less spooky,
our future lives easier and our products better?
Well, step one, the first step heading
in the right direction, is to modularize your code,
with particular emphasis on reusable code,
non-trivial transformations, UDFs, and utility functions.
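For illustration, a modular, reusable transformation might look
something like this minimal sketch (the function name and columns
are invented for the example, not from the original repo):

    from pyspark.sql import DataFrame
    from pyspark.sql import functions as F

    def add_order_totals(orders: DataFrame) -> DataFrame:
        # Hypothetical reusable transformation:
        # compute a line total for every order row.
        return orders.withColumn(
            "line_total", F.col("quantity") * F.col("unit_price")
        )

A small, pure function like this is easy to import from a utils
module and, more importantly, easy to test in isolation.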
Step two, we write the actual tests.
At a minimum, you should write
a happy path test per function.
Then if you want to go further,
focus on what's important for each function
you want to test.
You can start with covering common corner cases,
and then check whether there is complex arithmetic
that could be prone to error,
whether there is any possibility of getting
into a null pointer error,
whether you want to ensure the function returns gracefully
if one of the conditions is not met,
or whether you require the output to be of a specific format
or schema, et cetera.
So there are many things to consider in testing.
So when you decide to dive in,
and I encourage you to read more about it
and actually do dive in, there's a lot of docs online.
There's a lot of information online that you can check out,
but at the very least,
please write those happy path tests.
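For reference, a happy path test for a function like the one
sketched earlier could look roughly like this, using pytest
and a local SparkSession (the module path and names are illustrative):

    import pytest
    from pyspark.sql import SparkSession

    from my_project.utils import add_order_totals  # hypothetical path to the function under test

    @pytest.fixture(scope="session")
    def spark():
        # A small local session is enough for unit tests
        return (SparkSession.builder
                .master("local[2]")
                .appName("tests")
                .getOrCreate())

    def test_add_order_totals_happy_path(spark):
        input_df = spark.createDataFrame(
            [(1, 2, 10.0), (2, 3, 5.0)],
            ["order_id", "quantity", "unit_price"],
        )
        result = add_order_totals(input_df).collect()
        # The new column should hold quantity * unit_price for every row
        assert [row.line_total for row in result] == [20.0, 15.0]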
Okay.
So next up for my second group of stories,
I want to tell you some performance impacting horror stories.
For these, I decided to tell you about
things I see every day that are just low hanging fruit,
very normie, that can have an immediate impact
on the performance of your jobs.
So keep in mind that these are all simplifications
of examples I've seen in production,
and also that my recommendations can be applied
to exploratory work and analysis,
but I'm more interested in helping you put this into action
on your production code.
Okay, with that said,
in order for you to see the horror in the next few examples,
let's do a quick refresh of what Spark is and how it works.
So under the hood of a Spark cluster, we have a driver.
The driver is the machine in which the application runs.
It is responsible for three main things:
maintaining information about the Spark application;
responding to the user's program;
and analyzing, distributing,
and scheduling work across the executors.
And then we have the worker nodes.
The worker node hosts the executor process.
It has a fixed number of executors
allocated at any point in time.
Each executor holds a chunk of data to be processed.
This chunk is called a Spark partition.
It is a collection of rows
that sits on one physical machine in the cluster.
Executors are responsible for carrying out the work
assigned by the driver.
And each executor is responsible for two things.
One, execute the code that has been assigned by the driver.
And second, report the state of the computation
back to the driver.
In each executor, we have a number of cores,
which you can also think of as slots or threads.
And Spark parallelizes at two levels.
One is splitting the work among the executors.
And the other one is the slots in each of them.
So each executor has a number of slots
and each slot can be assigned a task.
In this diagram, for example,
some slots have been filled by a task
and some slots are open.
Now let's look at how Spark executes your program.
So using one of these clusters that I just mentioned,
Spark processes your data by breaking up a large task
into smaller ones and distributing the work
among several machines, which are called workers.
At the core of every Spark application
is the Spark driver program.
The secret to Spark's performance is parallelism.
Each parallelized action is referred to as a job.
The driver converts your Spark application
into one or more Spark jobs.
And each job is broken down into stages.
Stages are created based on what operations
can be performed serially or in parallel.
And not all Spark operations can happen in a single stage.
So they may be divided into multiple stages.
So each stage then is broken down into Spark tasks,
which are then federated across each Spark executor.
And each task maps to a single core
and works on a single partition of data.
For example, an executor with 16 cores can have 16 tasks
running in parallel at any given point.
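As a rough sketch of how those knobs show up in code,
here is one way to set executor counts and cores
and check how many partitions, and therefore tasks,
a DataFrame will produce (the numbers and path are just examples,
not a recommendation):

    from pyspark.sql import SparkSession

    # Example only: 4 executors with 16 cores each
    # means up to 64 tasks running in parallel
    spark = (SparkSession.builder
             .appName("parallelism-example")
             .config("spark.executor.instances", "4")
             .config("spark.executor.cores", "16")
             .getOrCreate())

    df = spark.read.parquet("/path/to/input")
    # Each partition of this DataFrame becomes one task per stage
    print(df.rdd.getNumPartitions())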
So with that background in mind,
that one was for you, Jesse,
let's go back to the stories.
So one day someone brings to my attention
that the job they just deployed to production
is really slow.
So I go in, look at the code, within three seconds,
I tell them how to fix it, they do it,
their job runs smoothly and they happily move along.
So what did I see?
So I saw what I've seen so many times now
that I just tell people to look for it
without even looking at their code.
So there's a few things going on in this particular version
that I created here for you.
So in rows one and two, we load the data.
In row five, we filter the data
so that we only look at today's date, good.
In row six, we perform a join,
in row seven and eight, we group the data
and perform an aggregation, fine.
In row 10, we print a count of the resulting data frame
and whoa, let's go back.
We print a count of the resulting data frame
and we print that in a production job.
So that's really bad, we shouldn't do that.
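The slide itself isn't captured in this transcript,
so here's a rough reconstruction of that version of the code
(table and column names are invented):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    orders = spark.read.parquet("/data/orders")        # rows 1-2: load the data
    customers = spark.read.parquet("/data/customers")

    daily = orders.filter(F.col("date") == F.current_date())  # row 5: today only
    joined = daily.join(customers, "customer_id")              # row 6: join
    summary = (joined                                          # rows 7-8: group + agg
               .groupBy("region")
               .agg(F.sum("amount").alias("total")))

    print(summary.count())   # row 10: an action left in a production job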
And now why is that bad?
Let's unpack Spark a little bit more.
In Spark, there are two types of operations.
There's transformations and there's actions.
Transformations are lazily evaluated,
which means that the evaluation of your code is delayed
until that result is needed.
There are two types of transformations.
There's narrow transformations and wide.
Narrow transformations are those that you can compute
using a single input partition.
For example, think of a filter.
Wide transformations usually require data to be shuffled
or moved between your worker nodes,
because the operation involves data from multiple partitions.
For example, think of a group by.
Your key might be in any of the input partitions.
So then the second type of operations are actions.
And these are the ones that trigger the physical evaluation
of the code written before,
which is usually a set of transformations.
Here at the bottom, we have some examples
of what narrow and wide transformations are
as well as actions.
And there's a lot more
and you can see those in the Spark documentation.
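A tiny sketch of that laziness, assuming an existing
SparkSession called spark:

    from pyspark.sql import functions as F

    df = spark.range(1_000_000)

    # Narrow transformation: each output partition
    # depends on a single input partition
    filtered = df.filter(F.col("id") % 2 == 0)

    # Wide transformation: a shuffle moves rows between partitions
    grouped = filtered.groupBy((F.col("id") % 10).alias("bucket")).count()

    # Nothing has actually run yet; only an action like this triggers the work
    grouped.show()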
So with that, let's take a look at our code again.
So we see that we have a single narrow transformation,
which is the filter.
And then we have two wide transformations,
the join and the group by.
And then we have a single action, which is the count.
When we decide to print a count on line 10,
Spark will actually create the execution plan
and your code will be run on your input data,
which means that data that needs to shuffle for the join
and the group by will do so now.
This might seem insignificant with a small amount of data
or during a one-off analysis.
But when you deploy this code to production,
you are adding unnecessary overhead to your job.
Some scenarios where this can be particularly bad
are if you have really large data sets, of course,
or if you are streaming
and doing some sort of unnecessary action
on every micro-batch,
then that adds latency to your stream.
So remember, when you go to prod,
remove all unnecessary displays
and prints that trigger an action.
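One way to keep that kind of debugging output around
without paying for it in production is to put it behind a flag;
a minimal sketch continuing the earlier example
(the flag name and output path are made up):

    DEBUG = False   # e.g. driven by a job parameter

    summary.write.mode("overwrite").parquet("/data/daily_summary")

    if DEBUG:
        # Only trigger this extra action while developing
        print(summary.count())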
So now the next are two different horror stories,
but I've put them together
because the resulting anti-pattern is the same.
So in the first horror story,
someone was wondering why their job seemed to be stuck
in a very simple operation.
And when I took a look,
they were converting their data frame to pandas
in order to iterate over the data frame.
This is always a red flag,
because you rarely ever need to iterate over a data frame.
For the second story,
someone thought their job was taking a really long time,
even though it wasn't supposed to be operating
on that much data.
Okay, so why are these bad?
In both of these cases,
all records in the data frame are sent to the driver,
which defeats the purpose of having a distributed system
since you're no longer executing your code
in parallel on multiple cores and machines.
So when you're writing your code,
make sure you ask yourself
whether you need to use either of these approaches
or if you can find a Spark idiomatic way
of doing what you're trying to do.
For example, one option for the first story
could be to rewrite your pandas code into Spark
to take full advantage of parallelization
and remove the iteration.
But I understand that you don't always have the option
of moving away from pandas.
I live in this world too.
So if that's the case,
I recommend you use the distributed version of pandas
available on Spark 3.2 and above.
This will allow you to keep your code as is,
but take advantage of more parallelization.
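The switch is mostly an import change; roughly something like this,
assuming Spark 3.2 or above (the file path is just an example):

    # Plain pandas: everything runs on the driver
    import pandas as pd
    pdf = pd.read_parquet("/data/events.parquet")
    print(len(pdf))

    # Pandas API on Spark: the same style of code,
    # but the work is distributed across the cluster
    import pyspark.pandas as ps
    psdf = ps.read_parquet("/data/events.parquet")
    print(len(psdf))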
I wanted to show you a small example
of the impact of all three options.
So I've run a simple count on this dataset
that has over 7 million rows.
So when you're running on pandas,
it runs in a little bit over 40 seconds.
When you use distributed pandas,
it runs in 0.59 seconds.
And that's pretty good since we barely had any code change.
And then if you do the refactoring
and you write it in Spark,
it returns in 0.12 seconds,
which is much better again,
but it would require some refactoring.
So I'll let those numbers stay on the screen
for a second here,
and I'll let you draw your own conclusions.
Okay, so for the next one,
I wanted to talk about collect a little bit more.
Let's spot the differences between these two code snippets.
So on the left, we first collect the data,
and then we aggregate.
On the second, we aggregate first, then we collect.
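Roughly, the two snippets looked like this
(simplified, assuming a DataFrame df with invented column names):

    from pyspark.sql import functions as F

    # Left: collect first, then aggregate on the driver,
    # so all 7 million-plus rows travel to a single machine
    rows = df.collect()
    totals = {}
    for row in rows:
        totals[row.key] = totals.get(row.key, 0) + row.amount

    # Right: aggregate on the executors first,
    # then collect only the small result
    result = df.groupBy("key").agg(F.sum("amount").alias("total")).collect()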
So how big is the runtime difference, do you think?
Keeping in mind that we are still processing
the same data frame that contains over 7 million rows.
Well, the first one took 3.72 minutes.
The second one took 15.87 seconds.
And it makes sense.
In the first one,
we brought all 7 million rows into the driver,
and then we did all the computation in a single machine.
Whereas in the second one,
we took advantage of all those cores
available in the cluster.
Then we brought the very small aggregated dataset
into the driver.
So, sorry, I wrapped that one up really quickly.
So I just wanted to let that sit for a second too,
and then move on to the next story.
So for this one, it's a story about UDFs.
And what makes it extra sad, but horrific for me,
is that I was the one who wrote it a long time ago
when I was learning Spark and thinking I was a Scala ninja.
It made for a very good, bad example
of what I was trying to illustrate today.
So I decided to let you roast me at a code conference,
because where else, if not at a NormConf slide,
would I be able to do this?
So here we go.
Let's not look at the code yet.
Don't get hung up on that.
First, let's talk about the requirements.
The requirement I received was that, given a date
and an arbitrary day of the week
that denotes the start of the week,
not necessarily Sunday or Monday,
in this case, it was mostly Wednesdays,
we needed to determine what week starting date
corresponded to the given date.
And why is my code so bad?
Well, for starters, the absolutely unnecessary complexity.
It makes it harder to maintain,
and it makes it harder for anyone onboarding to the code,
given that we can write this in a much simpler way.
But no, the worst part of it is that we're using a UDF,
when Spark native functions would do the work.
So how can we rewrite all of this?
Well, like that.
Let's look at that again.
So all of that into this.
Okay, so what just happened?
We used two Spark functions, date_sub and next_day.
And by doing so, we simplified the code and optimized it,
because we removed the UDF.
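The native version boils down to a single expression;
a sketch of the idea, assuming the week starts on Wednesday
and the column is called event_date:

    from pyspark.sql import functions as F

    # next_day() returns the first Wednesday strictly after the date,
    # so stepping back seven days lands on the Wednesday
    # that starts the week containing event_date
    with_week = df.withColumn(
        "week_start",
        F.date_sub(F.next_day(F.col("event_date"), "Wed"), 7),
    )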
So why should you avoid UDFs when possible?
So Spark SQL and DataFrame instructions are compact
and optimized for distributing those instructions
from the driver to each executor.
When we use custom code in a UDF, we obfuscate all of that,
and that code has to be serialized,
sent to the executors,
and then deserialized before it can be executed.
So it's a black box to Spark optimizers.
A friend of mine always says
that you should strive to master your tools.
And if you work with Spark often,
it really pays off to dive into this native functionality.
It can make your code much more performant and maintainable,
and ensures you don't get roasted in a future conference,
at least not for your Spark code.
Last, I wanna tell you about a monitoring story.
Well, this one happened to me again,
that once I deployed a job to production,
and used the default cluster configuration,
my team had settled upon for production jobs.
The job didn't have any issues,
but a year after having this job run flawlessly,
our infrastructure team was performing an audit,
and asked us to review our pipelines
to make sure we were using resources effectively.
Well, let me tell you,
this particular job was not, and I felt really bad
about all of that year-long waste.
So to be honest,
this is something that happens quite often.
We deploy an application to production,
and we monitor for instability,
we get emails if it fails, we have health checks,
that check that the output data looks all right, et cetera.
But if the SLAs are met, health checks pass,
and no crashes happen,
usually we don't review production applications.
So today I want to show you a quick monitoring option
that is available on Spark.
Ever since that time, it's been my go-to
whenever I deploy a new job
and want to make sure that it's configured correctly.
So adjust your eyes,
because the name of the tool is Ganglia.
Ganglia looks like it's straight out of the 80s,
but it does a great job
at giving you a first glance utilization overview
of your cluster.
If you are using Databricks to deploy your Spark jobs,
the recommended best practice
is that you put each job into its own job cluster.
This allows for the cluster to only be up
for the time needed for the job to complete,
then shut off afterwards.
If you're using a shared cluster,
either on Databricks or using open source Spark,
you will still be able to use Ganglia,
only that you won't be able to infer
the resource utilization for that particular job.
But you would still be able to use Ganglia
and determine whether the overall cluster
has been provisioned correctly,
and then talk to your infra team
using some of the tips I'm about to give you.
So there are three main parts on this beautiful screen
that I wanna focus on.
The first one is at the top right,
and it's the memory utilization.
So let's zoom in.
So here, the red line that you see
is the total available memory for the cluster
at any given time.
As you can see, this cluster auto-scaled up and down
a few times according to the usage of the cluster.
That's why the red line goes up and down.
The purple area is the memory actually being used.
The green is the data that you have cached,
and yellow is available memory.
Just looking at this metric alone,
if this was my cluster,
this cluster seems to be doing all right.
Next, we want to look at the CPU utilization
at the bottom left.
So let's zoom in.
In this graph, we see what percentage
of the total available CPU
was in use at any given time.
As you can see, the most this cluster was used
was around Wednesday at 1 a.m.
That was probably me running the code to present today.
And the CPU utilization was around 70%.
So now I wouldn't change the configuration right away.
I would probably give it a few other days
or a few other runs to collect more data
and then decide how much I can downsize this cluster by,
at least CPU-wise, because as you can see,
there are short bursts of utilization,
but overall the cluster's CPUs are fairly underutilized.
So last but not least,
we want to look at the cluster network performance.
This is a pretty simple one.
It just tells us how much data we're bringing to the cluster
and how much data we're writing out
and how long those operations are taking.
So the green line is how much data we've read into the cluster,
the blue line is how much data we've written out,
and the horizontal stretch, I'm going to call it, shows duration.
For example, if you see a green or blue line
stretch horizontally at a particular point,
that's how long it took to move the data in or out.
So this is pretty much it with this one.
The main thing to watch out for are those sustained peaks,
which will indicate you need to provision a cluster
with more network bandwidth,
which you or your infrastructure team can do
by choosing a different VM type.
So that is all I have for today.
Thank you all for checking out my talk
and enjoy the rest of the conference.