Details
- Type: Improvement
- Status: Resolved
- Priority: P2
- Resolution: Fixed
Description
The Combine transformation is translated into Spark's RDD API by the `combinePerKey` and `combineGlobally` methods of GroupCombineFunctions. Both methods use byte arrays as the intermediate aggregation state so that it can be transferred over the network. As a consequence, the intermediate aggregation value is serialized and deserialized every time a new element is added to the aggregation. This overhead is unnecessary and should be avoided.
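To make the cost concrete, here is a minimal, self-contained sketch of that pattern (illustrative only, not the actual GroupCombineFunctions code): a running sum whose accumulator lives as a `byte[]`, so every element forces a decode and a re-encode.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.VarLongCoder;

public class ByteArrayAccumDemo {
  // Sketch of the current pattern: the running sum is kept as a byte[]
  // and is decoded and re-encoded for every single input element.
  public static void main(String[] args) throws IOException {
    VarLongCoder coder = VarLongCoder.of();

    byte[] accum = encode(coder, 0L); // "createCombiner"
    for (long input : new long[] {1, 2, 3, 4, 5}) {
      long value = coder.decode(new ByteArrayInputStream(accum)); // deserialize per element
      accum = encode(coder, value + input);                       // re-serialize per element
    }
    System.out.println(coder.decode(new ByteArrayInputStream(accum))); // prints 15
  }

  private static byte[] encode(VarLongCoder coder, long value) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    coder.encode(value, bos);
    return bos.toByteArray();
  }
}
```

For N input elements this performs N full encode/decode round trips on top of the actual aggregation work.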
(Copied from BEAM-6214, because it explains the approach:)
We can do much better by letting accumulators work on user-defined Java types and only serializing them when we need to send them over the network.
In order to do this, we need the following:
- Accumulator wrapper -> holds a transient `T` value plus a byte payload that is filled in during serialization
- Java serialization: the accumulator wrapper needs to implement Serializable and override the writeObject and readObject methods to use a Beam coder
- Kryo serialization: we need a custom Kryo serializer for the accumulator wrapper (see the sketches after this list)
This should be enough to hook into all possible Spark serialization interfaces.
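A minimal sketch of the wrapper and its Java serialization hooks, assuming a hypothetical `AccumulatorWrapper` name. Beam's `Coder` is itself `Serializable`, so it can travel alongside the payload:

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;

// Hypothetical wrapper: holds the live accumulator as a plain Java object
// and only invokes the Beam coder when Spark actually serializes it.
public class AccumulatorWrapper<T> implements Serializable {

  private transient T value;    // live value, skipped by default serialization
  private final Coder<T> coder; // Beam coders are Serializable themselves

  public AccumulatorWrapper(T value, Coder<T> coder) {
    this.value = value;
    this.coder = coder;
  }

  public T get() {
    return value;
  }

  public Coder<T> getCoder() {
    return coder;
  }

  // Java serialization hook: the byte payload is produced only here, i.e.
  // only when the wrapper is actually sent over the network or spilled.
  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject(); // writes the coder field
    coder.encode(value, out); // fills in the byte payload
  }

  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();   // restores the coder field
    value = coder.decode(in); // rebuilds the live value
  }
}
```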
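And a matching sketch of the custom Kryo serializer for the same hypothetical wrapper. Kryo's `Output` and `Input` extend `OutputStream` and `InputStream`, so the Beam coder can read and write them directly; registering the serializer with Spark (e.g. via a Kryo registrator) is assumed:

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.IOException;
import org.apache.beam.sdk.coders.Coder;

// Hypothetical Kryo serializer mirroring the Java serialization hooks:
// the coder goes through Kryo, the payload through the Beam coder.
public class AccumulatorWrapperSerializer extends Serializer<AccumulatorWrapper<Object>> {

  @Override
  public void write(Kryo kryo, Output output, AccumulatorWrapper<Object> wrapper) {
    kryo.writeClassAndObject(output, wrapper.getCoder());
    try {
      // Kryo's Output is an OutputStream, so the coder can encode directly.
      wrapper.getCoder().encode(wrapper.get(), output);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  @SuppressWarnings("unchecked")
  public AccumulatorWrapper<Object> read(
      Kryo kryo, Input input, Class<AccumulatorWrapper<Object>> type) {
    Coder<Object> coder = (Coder<Object>) kryo.readClassAndObject(input);
    try {
      return new AccumulatorWrapper<>(coder.decode(input), coder);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}
```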
Issue Links
- duplicates: BEAM-6214 "Spark: CombineByKey performance" (Resolved)
- links to