When writing to a Kafka topic, `KafkaWriter` does not support selecting the output Kafka partition through a DataFrame column.
While it is possible to configure a custom Kafka Partitioner with
`.option("kafka.partitioner.class", "my.custom.Partitioner")`, this is not enough for certain use cases.
After the introduction of GDPR, it is a common pattern to emit records with unique Kafka keys, thus allowing individual records to be tombstoned.
This strategy means that the full key cannot be used to compute the topic partition, so users need to resort to custom partitioners that partition on a subset of the key.
However, as stated at https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations, "Keys/Values are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame operations to explicitly serialize keys/values into either strings or byte arrays."
Therefore, a custom partitioner would need to:
- deserialize the key (or value)
- calculate the output partition using a subset of the key (or value) fields
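The two steps above can be sketched as follows. This is an illustrative simulation, not the actual Kafka `Partitioner` API; the field names (`record_id`, `user_id`) and the hash function are hypothetical stand-ins:

```python
import json

def hash_bytes(data: bytes) -> int:
    # Stand-in for Kafka's murmur2 hash; any deterministic hash
    # illustrates the point.
    h = 0
    for b in data:
        h = (h * 31 + b) & 0x7FFFFFFF
    return h

def partition_for(serialized_key: bytes, num_partitions: int) -> int:
    # Step 1: deserialize the key bytes back into a structure. This step
    # is pure overhead, since Spark already had the structured data
    # before serializing it with ByteArraySerializer.
    key = json.loads(serialized_key)
    # Step 2: partition on a subset of the key (user_id only), so every
    # record for the same user lands on the same partition even though
    # each record_id (and therefore each full key) is unique.
    return hash_bytes(key["user_id"].encode("utf-8")) % num_partitions

# Two GDPR-style records: unique keys, same user.
k1 = json.dumps({"record_id": "r-1", "user_id": "alice"}).encode("utf-8")
k2 = json.dumps({"record_id": "r-2", "user_id": "alice"}).encode("utf-8")
```

Both keys map to the same partition despite being distinct, which is exactly the behavior a unique-key tombstoning strategy requires.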
This is inefficient because it requires an unnecessary deserialization step. It also makes it impossible to use the Spark batch writer to send Kafka tombstones when the partition is calculated from a subset of the Kafka value, since a tombstone carries a null value and leaves the partitioner nothing to deserialize.
It would be a nice addition to let the user choose a partition by setting a value in the "partition" column of the DataFrame, as already done for `topic`, `key`, `value`, and `headers` in `KafkaWriter`, also mirroring the `ProducerRecord` API.
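With such a column, choosing the partition would reduce to one extra projection before writing. The sketch below is pseudocode for the *proposed* behavior, not an existing API: the `partition` sink column does not exist yet, and the `user_id` column, partition expression, and connection options are illustrative.

```scala
import org.apache.spark.sql.functions._

// Send a tombstone (null value) whose partition is derived from a
// subset of the key, without deserializing anything in a partitioner.
val numPartitions = 6
df.select(
    to_json(struct($"record_id", $"user_id")).cast("binary").as("key"),
    lit(null).cast("binary").as("value"),                      // Kafka tombstone
    pmod(hash($"user_id"), lit(numPartitions)).as("partition") // proposed column
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("topic", "users")
  .save()
```

This keeps key/value serialization in DataFrame operations, as the documentation already recommends, while giving the user the same control over the target partition that `ProducerRecord(topic, partition, key, value)` offers.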