Skip to content

Commit

Permalink
dekaf: fix Fetch encoding issue for some versions of librdkafka
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Oct 11, 2024
1 parent 6a70e9e commit c020a60
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
Authenticated,
};
use anyhow::Context;
use bytes::{BufMut, BytesMut};
use bytes::{BufMut, Bytes, BytesMut};
use kafka_protocol::{
error::{ParseResponseErrorCode, ResponseError},
indexmap::IndexMap,
Expand Down Expand Up @@ -470,7 +470,10 @@ impl Session {
partition_responses.push(
PartitionData::default()
.with_partition_index(partition_request.partition)
.with_records(batch.to_owned())
// `kafka-protocol` encodes None here using a length of -1, but librdkafka client library
// complains with: `Protocol parse failure for Fetch v11 ... invalid MessageSetSize -1`
// An empty Bytes will get encoded with a length of 0, which works fine.
.with_records(batch.or(Some(Bytes::new())).to_owned())
.with_high_watermark(pending.last_write_head) // Map to kafka cursor.
.with_last_stable_offset(pending.last_write_head),
);
Expand Down

0 comments on commit c020a60

Please sign in to comment.