Closed
Bug 1399625
Opened 8 years ago
Closed 8 years ago
File Sizes should be decreased
Categories
(Data Platform and Tools :: General, enhancement, P1)
Tracking
(Not tracked)
RESOLVED
FIXED
People
(Reporter: frank, Assigned: relud)
References
Details
Attachments
(2 files)
main_summary file sizes are surpassing 1GB; these should be split into 3-4 files each.
Updated•8 years ago
Points: --- → 1
Priority: -- → P2
Assignee
Updated•8 years ago
Assignee: nobody → dthorn
Assignee
Comment 1•8 years ago
Assignee
Comment 2•8 years ago
This is merged; hopefully we will see nicely split files starting with the main_summary build for 2018-01-11 tonight.
Status: NEW → ASSIGNED
Priority: P2 → P1
Comment 3•8 years ago
No such luck - it appears that repartitioning to a small number of partitions (like 4) causes 4 unlucky Spark workers to fail (likely due to having to spill the ~50GB partitions to disk).
I think we're going to have to tackle this a different way.
We could introduce a temporary column for "partition_id" that's effectively "crc32(client_id) % (100 * numFilesPerPartition)", repartition by that, then drop the temp column. That would introduce more small partitions (instead of few large ones), each of which would still fall completely within a single sample_id partition.
Here's a rough test based on Daniel's comments on #363:
https://gist.github.com/mreid-moz/7374496e9d415ca85b42a95b011ac852
Thoughts?
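For concreteness, a minimal Scala sketch of the temporary-column idea (the linked gist is the actual test; the helper name, parquet output, and the numFilesPerPartition default below are illustrative):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, crc32, lit, pmod}

    // Hypothetical helper, not the code under review.
    def writeWithSmallerFiles(df: DataFrame, outputPath: String, numFilesPerPartition: Int = 4): Unit = {
      df
        // Temporary key: per the comment above, each partition_id value should fall
        // entirely within a single sample_id partition.
        .withColumn("partition_id",
          pmod(crc32(col("client_id")), lit(100L * numFilesPerPartition)))
        // Shuffle into many small partitions instead of a few ~50GB ones.
        .repartition(col("partition_id"))
        .drop("partition_id")
        .write
        .partitionBy("sample_id")
        .parquet(outputPath)
    }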
Assignee
Comment 4•8 years ago
That looks like a great solution. In the specific case of MainSummary, doesn't sample_id already exist? If that's true, I think we should base it off of sample_id like this: https://gist.github.com/relud/7b5d9a48823e0d8e140ab94a96b0aa9a
I get the same 396 files, but I use:
> part_id = sample_id * numFilesPerPartition + Random.nextInt(numFilesPerPartition)
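A sketch of that variant with Spark column expressions (rand() standing in for Random.nextInt; the column name part_id and the cast are assumptions, and the gist above is the actual code):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, floor, rand}

    // Hypothetical helper mirroring the part_id expression above.
    def writeBySampleId(df: DataFrame, outputPath: String, numFilesPerPartition: Int = 4): Unit = {
      df
        // Each sample_id bucket maps to numFilesPerPartition distinct part_id values.
        .withColumn("part_id",
          col("sample_id").cast("long") * numFilesPerPartition +
            floor(rand() * numFilesPerPartition))
        .repartition(col("part_id"))
        .drop("part_id")
        .write
        .partitionBy("sample_id")
        .parquet(outputPath)
    }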
Comment 5•8 years ago
Reporter
Comment 6•8 years ago
(In reply to Mark Reid [:mreid] from comment #3)
> No such luck - it appears that repartitioning to a small number of
> partitions (like 4) causes 4 unlucky Spark workers to fail (likely due to
> having to spill the ~50GB partitions to disk).
>
> I think we're going to have to tackle this a different way.
>
> We could introduce a temporary column for "partition_id" that's effectively
> "crc32(client_id) % (100 * numFilesPerPartition)", repartition by that, then
> drop the temp column. That would introduce more small partitions (instead
> of few large ones), each of which would still fall completely within a
> single sample_id partition.
>
> Here's a rough test based on Daniel's comments on #363:
>
> https://gist.github.com/mreid-moz/7374496e9d415ca85b42a95b011ac852
>
> Thoughts?
This seems like a spark bug to me. There is no reason for the data to actually be repartitioned into 4 partitions; the query optimizer should realize that each output partition needs 4 files instead.
Apologies if this has been suggested already, but given that there doesn't seem to be a significant performance hit from having many partitions (and in many cases it improves performance), it might make sense to just let Spark create all those partitions after a partitionBy(sample_id) and then coalesce before writing.
Assignee
Comment 8•8 years ago
As per :sunahsuh on IRC, coalesce is a function on DataFrame, whereas partitionBy is a function on DataFrameWriter, so we can't partitionBy and then coalesce.
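In other words (illustration only; df and outputPath are placeholders):

    import org.apache.spark.sql.DataFrame

    def illustrateApiMismatch(df: DataFrame, outputPath: String): Unit = {
      df.coalesce(4)                // Dataset/DataFrame method: runs before the write,
        .write                      // so it again yields a handful of very large partitions
        .partitionBy("sample_id")   // DataFrameWriter method
        .parquet(outputPath)
      // There is no per-output-partition coalesce after partitionBy:
      // df.write.partitionBy("sample_id").coalesce(4)  // does not compile
    }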
Comment 9•8 years ago
(In reply to Frank Bertsch [:frank] from comment #6)
> This seems like a spark bug to me. There is no reason for the data to
> actually be repartitioned into 4 partitions; the query optimizer should
> realize that each output partition needs 4 files instead.
You mean in the failing case (df.repartition(4).write.partitionBy(sampleId)), right? It might be a missed optimization, but it also seems like a bug to rely on the query optimizer to specifically *not* do what you've asked :) It seems worth reporting this as a Spark bug to see if they have a fix or a good suggestion for how to accomplish what we want to do.
(In reply to Daniel Thorn [:relud] from comment #4)
> In the specific case of MainSummary
> doesn't sample_id already exist? If that's an true I think we should base it
> off of sample_id like this:
> https://gist.github.com/relud/7b5d9a48823e0d8e140ab94a96b0aa9a
>
> I get the same 396 files, but I use:
>
> > part_id = sample_id * numFilesPerPartition + Random.nextInt(numFilesPerPartition)
Yep, sample_id already exists in main summary, so using it in this way should work fine (and seems clearer/simpler). Can you confirm that reading back the <400 files yields the correct data? I'm still not sure exactly what to make of the fewer-than-expected outputs.
Assignee
Comment 10•8 years ago
I found out why we get 396 files. From the repartition(column) docs: https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Dataset@repartition(partitionExprs:org.apache.spark.sql.Column*):org.apache.spark.sql.Dataset[T]
> Returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as number of partitions. The resulting Dataset is hash partitioned.
Which means that it doesn't create a partition for each unique value of (column): it hashes (column) into a number of partitions equal to spark.sql.shuffle.partitions, and that hashing is getting 4 collisions.
If I instead do this, I only get one collision:
> repartition(sampleIdModulus*filesPerPartition, df.col("part_id"))
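In context, that looks roughly like this (assuming part_id was added as in comment 4; sampleIdModulus and filesPerPartition follow the names in the snippet above):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    // Hash part_id into exactly sampleIdModulus * filesPerPartition partitions
    // rather than the default spark.sql.shuffle.partitions, which avoids most collisions.
    def repartitionByPartId(df: DataFrame, sampleIdModulus: Int, filesPerPartition: Int): DataFrame =
      df.repartition(sampleIdModulus * filesPerPartition, col("part_id"))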
Assignee
Updated•8 years ago
Points: 1 → 3
Comment 11•8 years ago
Assignee
Comment 12•8 years ago
I was finally able to get the DatasetComparator results for a run against an entire day. PR 368 above doesn't change anything we test for, so it's now merged.
Status: ASSIGNED → RESOLVED
Closed: 8 years ago
Resolution: --- → FIXED
Updated•3 years ago
Component: Datasets: Main Summary → General