Closed Bug 1399625 Opened 8 years ago Closed 8 years ago

File Sizes should be decreased

Categories

(Data Platform and Tools :: General, enhancement, P1)

Points:
3

Tracking

(Not tracked)

RESOLVED FIXED

People

(Reporter: frank, Assigned: relud)

References

Details

Attachments

(2 files)

main_summary file sizes are surpassing 1GB; these should be split into 3-4 files each.
Points: --- → 1
Priority: -- → P2
See Also: → 1428445
Assignee: nobody → dthorn
This is merged; hopefully we will see nicely split files starting with the main_summary build for 2018-01-11 tonight.
Status: NEW → ASSIGNED
Priority: P2 → P1
No such luck - it appears that repartitioning to a small number of partitions (like 4) causes 4 unlucky Spark workers to fail (likely due to having to spill the ~50GB partitions to disk).

I think we're going to have to tackle this a different way.

We could introduce a temporary column for "partition_id" that's effectively "crc32(client_id) % (100 * numFilesPerPartition)", repartition by that, then drop the temp column. That would introduce more small partitions (instead of a few large ones), each of which would still fall completely within a single sample_id partition.

Here's a rough test based on Daniel's comments on #363:

https://gist.github.com/mreid-moz/7374496e9d415ca85b42a95b011ac852

Thoughts?
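For illustration, a minimal Scala sketch of the temporary-column approach described above, assuming a DataFrame that already carries client_id and sample_id columns; the function and parameter names (writeWithTempPartitionId, numFilesPerPartition, outputPath) are hypothetical and not the actual telemetry-batch-view code:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, crc32}

    // Sketch only: add a helper column, repartition on it so each shuffle
    // partition falls inside a single sample_id, then drop it before writing.
    def writeWithTempPartitionId(df: DataFrame, outputPath: String, numFilesPerPartition: Int): Unit = {
      df.withColumn("partition_id", crc32(col("client_id")) % (100 * numFilesPerPartition))
        .repartition(col("partition_id"))
        .drop("partition_id")
        .write
        .partitionBy("sample_id")
        .parquet(outputPath)
    }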
That looks like a great solution. In the specific case of MainSummary, doesn't sample_id already exist? If that's true, I think we should base it off of sample_id like this:

https://gist.github.com/relud/7b5d9a48823e0d8e140ab94a96b0aa9a

I get the same 396 files, but I use:

> part_id = sample_id * numFilesPerPartition + Random.nextInt(numFilesPerPartition)
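A rough Scala sketch of this sample_id-based variant, using Spark's rand() for the per-row random component rather than scala.util.Random; the helper name and the numFilesPerPartition parameter are illustrative assumptions, not the code in the gist:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, floor, rand}

    // Sketch only: part_id = sample_id * numFilesPerPartition + random index
    // in [0, numFilesPerPartition), so every part_id maps to one sample_id.
    def writeBySampleId(df: DataFrame, outputPath: String, numFilesPerPartition: Int): Unit = {
      df.withColumn(
          "part_id",
          col("sample_id").cast("long") * numFilesPerPartition +
            floor(rand() * numFilesPerPartition)
        )
        .repartition(col("part_id"))
        .drop("part_id")
        .write
        .partitionBy("sample_id")
        .parquet(outputPath)
    }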
(In reply to Mark Reid [:mreid] from comment #3)
> No such luck - it appears that repartitioning to a small number of
> partitions (like 4) causes 4 unlucky Spark workers to fail (likely due to
> having to spill the ~50GB partitions to disk).
>
> I think we're going to have to tackle this a different way.
>
> We could introduce a temporary column for "partition_id" that's effectively
> "crc32(client_id) % (100 * numFilesPerPartition)", repartition by that, then
> drop the temp column. That would introduce more small partitions (instead
> of a few large ones), each of which would still fall completely within a
> single sample_id partition.
>
> Here's a rough test based on Daniel's comments on #363:
>
> https://gist.github.com/mreid-moz/7374496e9d415ca85b42a95b011ac852
>
> Thoughts?

This seems like a Spark bug to me. There is no reason for the data to actually be repartitioned into 4 partitions; the query optimizer should realize that each output partition needs 4 files instead.
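For reference, a sketch (hypothetical function and parameter names) of the failing shape being discussed here, where repartition(4) forces the whole day's data into 4 tasks before partitionBy:

    import org.apache.spark.sql.DataFrame

    // Sketch only: each of the 4 shuffle partitions holds ~1/4 of the day,
    // which is what forces the ~50GB spills mentioned in comment 3.
    def writeWithFourShufflePartitions(df: DataFrame, outputPath: String): Unit =
      df.repartition(4)
        .write
        .partitionBy("sample_id")
        .parquet(outputPath)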
Apologies if this has been suggested already, but given there doesn't seem to be a significant performance hit from having many partitions (and in many cases it improves performance), it might make sense to just let Spark create all those partitions after a partitionBy(sample_id), and then coalesce before writing.
As per :sunahsuh on IRC, coalesce is a function on DataFrame, whereas partitionBy is a function on DataFrameWriter, so we can't partitionBy and then coalesce.
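To make the API constraint concrete, a minimal sketch (hypothetical names): coalesce is only available before .write, so the only order the API allows is coalesce first, then partitionBy on the writer:

    import org.apache.spark.sql.DataFrame

    def writeCoalesced(df: DataFrame, outputPath: String, numFiles: Int): Unit = {
      // df.write.partitionBy("sample_id").coalesce(numFiles)  // does not compile:
      //   DataFrameWriter has no coalesce method
      df.coalesce(numFiles)        // Dataset.coalesce: reduce partition count
        .write                     // returns a DataFrameWriter
        .partitionBy("sample_id")  // DataFrameWriter.partitionBy
        .parquet(outputPath)
    }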
(In reply to Frank Bertsch [:frank] from comment #6)
> This seems like a Spark bug to me. There is no reason for the data to
> actually be repartitioned into 4 partitions; the query optimizer should
> realize that each output partition needs 4 files instead.

You mean in the failing case (df.repartition(4).write.partitionBy(sampleId)), right? It might be a missed optimization, but it also seems like a bug to rely on the query optimizer to specifically *not* do what you've asked :) It seems worth reporting this as a Spark bug to see if they have a fix or a good suggestion for how to accomplish what we want to do.

(In reply to Daniel Thorn [:relud] from comment #4)
> In the specific case of MainSummary, doesn't sample_id already exist? If
> that's true, I think we should base it off of sample_id like this:
> https://gist.github.com/relud/7b5d9a48823e0d8e140ab94a96b0aa9a
>
> I get the same 396 files, but I use:
>
> > part_id = sample_id * numFilesPerPartition + Random.nextInt(numFilesPerPartition)

Yep, sample_id already exists in main_summary, so using it in this way should work fine (and seems clearer/simpler). Can you confirm that reading the <400 files back contains the correct data? I'm still not sure exactly what to make of the fewer-than-expected outputs.
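A rough sketch of the kind of read-back check being asked for here, comparing row counts and distinct client_id counts between the original output and the re-partitioned one; paths and the helper name are illustrative (the verification actually used later in this bug was DatasetComparator):

    import org.apache.spark.sql.SparkSession

    def sanityCheck(spark: SparkSession, originalPath: String, rewrittenPath: String): Unit = {
      val before = spark.read.parquet(originalPath)
      val after  = spark.read.parquet(rewrittenPath)
      assert(before.count() == after.count(), "row counts differ")
      assert(
        before.select("client_id").distinct().count() ==
          after.select("client_id").distinct().count(),
        "distinct client_id counts differ"
      )
    }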
I found out why we get 396 files. From the repartition(column) docs:

https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Dataset@repartition(partitionExprs:org.apache.spark.sql.Column*):org.apache.spark.sql.Dataset[T]

> Returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as number of partitions. The resulting Dataset is hash partitioned.

So it doesn't create a partition for each unique value of the column; it hashes the column into a number of partitions equal to spark.sql.shuffle.partitions, and that hashing is getting 4 collisions. If I instead do this, I only get one collision:

> repartition(sampleIdModulus * filesPerPartition, df.col("part_id"))
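A minimal sketch of that final shape, assuming the DataFrame already carries the part_id column built as in comment 4; sampleIdModulus and filesPerPartition are illustrative parameter names:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    // Sketch only: passing an explicit numPartitions avoids falling back to
    // spark.sql.shuffle.partitions, giving one shuffle partition per part_id slot.
    def repartitionByPartId(df: DataFrame, sampleIdModulus: Int, filesPerPartition: Int): DataFrame =
      df.repartition(sampleIdModulus * filesPerPartition, col("part_id"))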
Points: 1 → 3
Blocks: 1438927
Was finally able to get the DatasetComparator results for a run against an entire day. PR 368 above doesn't change anything we test for, so it's now merged.
Status: ASSIGNED → RESOLVED
Closed: 8 years ago
Resolution: --- → FIXED
Component: Datasets: Main Summary → General