{"id":838531,"date":"2022-04-22T11:49:40","date_gmt":"2022-04-22T18:49:40","guid":{"rendered":"https:\/\/www.microsoft.com\/en-us\/research\/?post_type=msr-blog-post&p=838531"},"modified":"2022-04-25T09:12:44","modified_gmt":"2022-04-25T16:12:44","slug":"getting-deterministic-results-from-sparks-randomsplit-function","status":"publish","type":"msr-blog-post","link":"https:\/\/www.microsoft.com\/en-us\/research\/articles\/getting-deterministic-results-from-sparks-randomsplit-function\/","title":{"rendered":"Getting Deterministic Results from Spark’s randomSplit Function: A Deep Dive"},"content":{"rendered":"\n
Tommy Guy<\/a> and Kidus Asfaw<\/p>\n\n\n\n We noticed an odd case of nondeterminism in Spark\u2019s randomSplit function, which is often used to generate test\/train splits for machine learning training scripts. There are other posts, notably this one<\/a>, that diagnose the problem, but there are a few details worth spelling out. We also want to suggest an alternative to randomSplit that guarantees determinism.<\/p>\n\n\n\nThe Problem<\/h4>\n\n\n\n If you want to split a data set 80\/20 in Spark, you call df.randomSplit([0.80, 0.20], seed), where seed is an integer used to reseed the random number generator. Reseeding a generator is a common way to force determinism. But in this case, it doesn\u2019t work! In some cases (we\u2019ll identify exactly which cases below), randomSplit will duplicate rows across the two output DataFrames, drop rows entirely so the splits no longer add up to the input, or simply return different splits from one run to the next.<\/p>\n\n\n\n This feels like a bit of a bait and switch: any function that accepts a seed is advertising that it should be deterministic. Otherwise, why bother with the seed at all?<\/p>\n\n\n\n Luckily, there is a way to force randomSplit to be deterministic, and it\u2019s listed in several<\/a> places<\/a> online<\/a>: cache the DataFrame before invoking randomSplit. This seems straightforward, but knowing when you need to be careful relies on a solid understanding of Spark internals. Ultimately, Spark tries hard to force determinism (and more recent Spark versions are even better at this), but it can\u2019t provide 100% assurance that randomSplit will behave deterministically. 
Below, I\u2019m going to suggest a different way to randomly partition<\/strong> that will be deterministic no matter what.<\/p>\n\n\n\nPseudorandomization: A Reminder<\/h4>\n\n\n\n Just as a quick reminder, the way computers produce “random” numbers is actually pseudorandom: they start from a seed value, then iterate in a complicated but deterministic way to produce a stream of numbers that are uncorrelated with each other. In the example below, we assign random numbers to some names, and we show that we can do this repeatably.<\/p>\n\n\n\n So, the way to make a deterministic algorithm with a random number generator is to seed the generator with a fixed value and then feed it exactly the same data, in exactly the same order, every time.<\/p>\n\n\n\nAnother Reminder: Spark DataFrame definition vs execution<\/h4>\n\n\n\n Spark makes a distinction between defining<\/em> what to do and executing<\/em> the defined compute. Some expressions on DataFrames are transformations that convert one DataFrame to a new DataFrame, while others are actions that execute a sequence of transformations. There are many sources discussing this distinction online, but the original paper<\/a> on Spark is still a really great intro. (Aside: the paper talks about Resilient Distributed Datasets, which are the foundational element that DataFrames are built on.)<\/p>\n\n\n\n If you\u2019ve worked in Spark for any length of time, you\u2019ve seen this phenomenon. I can execute the following commands in a REPL and they succeed almost immediately, no matter how big the data really is:<\/p>\n\n\n\n df = spark.read.parquet('\/some\/parquet\/file\/pattern*.parquet')<\/p>\n\n\n\n df = df.filter(df['amount'] > 4000).filter(df['month'] != 'jan')<\/p>\n\n\n\n df2 = spark.read.parquet('\/someother\/parquet\/file\/pattern*.parquet')<\/p>\n\n\n\n df3 = df.join(df2)<\/p>\n\n\n\n That\u2019s because all I\u2019ve done so far is define a set of computations. You can see the plan by trying<\/p>\n\n\n\n df3.explain()<\/p>\n\n\n\n But when we execute something like df3.count(), we issue an action. 
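Returning briefly to the pseudorandomness reminder: the names example mentioned above might look like the following minimal pure-Python sketch (my reconstruction, not the post's actual code). It shows both that a fixed seed makes the assignment repeatable, and that the assignment depends on the order in which the data is fed to the generator.

```python
import random

def assign_random_numbers(names, seed):
    # Reseeding with the same value reproduces the exact same stream of numbers.
    rng = random.Random(seed)
    return {name: rng.random() for name in names}

names = ["alice", "bob", "carol", "dave"]
first = assign_random_numbers(names, seed=17)
second = assign_random_numbers(names, seed=17)
assert first == second  # same seed, same order -> identical assignment

# But the assignment depends on iteration order, not on the names themselves:
reordered = assign_random_numbers(list(reversed(names)), seed=17)
assert reordered != first
```

The second assertion is the crux of everything that follows: reseeding alone is not enough; the generator must also see the same rows in the same order.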
The set of transformations that create df3 execute on Spark workers, and the statement can take much longer to run because it blocks until the actual Spark action finishes.<\/p>\n\n\n\n In a normal Python script, if you trace the program on a whiteboard, you can basically track the system state line by line. But in a PySpark script, it\u2019s much harder to trace when the “real” work (the actions) takes place, or even when and how often it takes place.<\/p>\n\n\n\nrandomSplit([0.8, 0.2], seed) creates two DataFrames, and each results in an action<\/h4>\n\n\n\n Ok, so now it\u2019s time to look at the randomSplit function. The actual code<\/a> does three things: it sorts each partition so that rows are processed in a deterministic order, it converts the weights into cumulative boundaries (0.8 becomes the interval [0, 0.8) and 0.2 becomes [0.8, 1.0)), and it returns one DataFrame per interval by applying a Sample transformation with the same seed to each.<\/p>\n\n\n\n Sample is a transformation: it adds to the DAG of transformations but doesn\u2019t result in an action. In our example of an 80\/20 split, the first call to Sample will use a random generator to assign a value between 0 and 1 to every row, and it will keep rows where the random value is below 0.8. The second call will independently assign random values to every row, using the same seed, and keep rows where the random value is at least 0.8. This works if and only if the random assignment is exactly the same in both calls to Sample.<\/p>\n\n\n\n Each of the two DataFrames (one with 80% of the data and one with 20%) corresponds to a set of transformations. They share the steps up to the Sample transformation, but those shared steps will execute independently for each random split. This could extend all the way back to data reading, so data would literally be read from disk independently for the 80% sample and the 20% sample. Any other work that happens in the DataFrame before Sample will also run twice.<\/p>\n\n\n\n This all works just fine assuming every step in the upstream DAG deterministically maps data to partitions<\/em>! 
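To make the failure mode concrete, here is a small pure-Python simulation of the two independent Sample passes (a hypothetical sketch of the idea, not Spark's implementation). When both passes see the rows in the same order, the result is a clean partition; when the second pass sees a different order, rows are duplicated or lost.

```python
import random

def sample_pass(rows, lower, upper, seed):
    # One independent pass: draw a uniform number per row, in row order,
    # and keep rows whose draw falls in [lower, upper).
    rng = random.Random(seed)
    return [row for row in rows if lower <= rng.random() < upper]

rows = [f"row-{i}" for i in range(1000)]

# Same seed AND same row order in both passes: a clean 80/20 partition.
train = sample_pass(rows, 0.0, 0.8, seed=42)
test = sample_pass(rows, 0.8, 1.0, seed=42)
assert not set(train) & set(test)            # no row appears in both splits
assert set(train) | set(test) == set(rows)   # no row is lost

# If the second pass sees the rows in a different order (as happens when an
# upstream step maps rows to partitions non-deterministically), some rows
# land in both splits and others in neither.
shuffled = rows[:]
random.Random(7).shuffle(shuffled)
test_bad = sample_pass(shuffled, 0.8, 1.0, seed=42)
assert set(train) & set(test_bad)             # duplicated rows
assert set(rows) - set(train) - set(test_bad) # lost rows
```

The same seed is used everywhere; only the row order changes between the good and bad cases, which is exactly the situation described above.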
If everything is deterministic upstream, then all data maps to the same partition every time the script runs, that data is sorted the same way in randomSplit every time, and the random numbers are generated with the same seed and applied to the same data row every time. But if something upstream changes the mapping of data to partitions, then some rows will end up on different partitions in the execution for the 80% sample than in the execution for the 20% sample. To summarize: randomSplit is deterministic if and only if every upstream step maps the same rows to the same partitions, in the same order, on every execution.<\/p>\n\n\n\n What could cause the DataFrame input to randomSplit to be non-deterministic? Here are a few examples: the underlying data could change between the two actions (for instance, new files arriving that match the input glob pattern), or an upstream transformation could assign rows to partitions in a way that depends on runtime conditions rather than on the data itself.<\/p>\n\n\n\n There used to be a much more nefarious problem in Shuffle<\/a> when used in df.repartition(int). Spark did a round-robin partitioning, which meant rows were distributed across partitions in a way that depended on the order of data in the original partition. By now, you should see the problem with that approach! In fact, someone filed a bug<\/a> pointing out the same sort of nondeterministic behavior we saw in randomSplit, and it was fixed. The source<\/a> for round-robin shuffling now explicitly sorts to ensure rows are handled in a deterministic order.<\/p>\n\n\n\n
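The order-dependence of round-robin partitioning, and the sorting fix, can be sketched in plain Python (an illustration of the idea, not Spark's actual shuffle code):

```python
def round_robin(rows, num_partitions):
    # Deal rows across partitions in arrival order, like dealing cards.
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions

rows = ["a", "b", "c", "d", "e", "f"]
reordered = ["b", "a", "c", "f", "e", "d"]  # same rows, different arrival order

# Round-robin alone: the row-to-partition mapping depends on arrival order.
assert round_robin(rows, 2) != round_robin(reordered, 2)

# Sorting first restores a deterministic mapping regardless of arrival order.
assert round_robin(sorted(rows), 2) == round_robin(sorted(reordered), 2)
```

Sorting before dealing is precisely the fix described above: it makes the row-to-partition mapping a function of the data alone, not of whatever order the rows happened to arrive in.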
A Few Workarounds<\/h4>\n\n\n\n
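One workaround worth sketching up front: instead of drawing a fresh random number per row on every execution, derive each row's split assignment from a hash of a stable key. This is a general pattern, shown here as my own pure-Python illustration rather than the post's exact recipe; the key function, salt, and threshold are all hypothetical choices.

```python
import hashlib

def split_bucket(key, salt="split-v1"):
    # Hash a stable row key (plus a salt) into a reproducible number in [0, 1).
    digest = hashlib.md5(f"{salt}:{key}".encode("utf-8")).hexdigest()
    return int(digest[:8], 16) / float(0x100000000)

def deterministic_split(rows, key_fn, train_fraction=0.8):
    # The assignment depends only on each row's key, never on partitioning,
    # row order, or how many times the computation is executed.
    train = [r for r in rows if split_bucket(key_fn(r)) < train_fraction]
    test = [r for r in rows if split_bucket(key_fn(r)) >= train_fraction]
    return train, test

rows = [{"id": i, "amount": i * 10} for i in range(10000)]
train, test = deterministic_split(rows, key_fn=lambda r: r["id"])
assert len(train) + len(test) == len(rows)        # an exact partition, every time
assert abs(len(train) / len(rows) - 0.8) < 0.03   # close to the 80/20 target
```

Because every pass over the data recomputes the same hash for the same key, the two splits can never overlap or lose rows, no matter what the upstream DAG does. The same idea can be expressed in Spark with a hash over a stable key column.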