public class KafkaTopicOffsetManager extends AbstractOffsetManager implements org.springframework.beans.factory.InitializingBean
An OffsetManager
that uses a Kafka topic as the underlying support.
For its proper functioning, the Kafka server(s) must set log.cleaner.enable=true.
It relies on the property
cleanup.policy=compact
to be set on the target topic, and if the topic is not found,
it will create a topic with the appropriate settings.
Modifier and Type | Class and Description |
---|---|
static class |
KafkaTopicOffsetManager.Key
Wraps the partition and consumer information and will be used as a key on the Kafka topic
|
static class |
KafkaTopicOffsetManager.KeySerializerDecoder |
Modifier and Type | Field and Description |
---|---|
static java.lang.String |
CLEANUP_POLICY |
static java.lang.String |
CLEANUP_POLICY_COMPACT |
static java.lang.String |
DELETE_RETENTION |
static java.lang.String |
SEGMENT_BYTES |
connectionFactory, consumerId, highestUpdatedOffsets, initialOffsets, log, referenceTimestamp
Constructor and Description |
---|
KafkaTopicOffsetManager(ZookeeperConnect zookeeperConnect,
java.lang.String topic) |
KafkaTopicOffsetManager(ZookeeperConnect zookeeperConnect,
java.lang.String topic,
java.util.Map<Partition,java.lang.Long> initialOffsets) |
Modifier and Type | Method and Description |
---|---|
void |
afterPropertiesSet() |
void |
close() |
protected java.lang.Long |
doGetOffset(Partition partition) |
protected void |
doRemoveOffset(Partition partition) |
protected void |
doUpdateOffset(Partition partition,
long offset) |
void |
flush() |
void |
setBatchBytes(int batchBytes)
The maximum batch size in bytes for offset writes
|
void |
setCompressionCodec(ProducerMetadata.CompressionType compressionType)
The compression type for writing to the offset topic
|
void |
setMaxQueueBufferingTime(int maxQueueBufferingTime)
How long producers will buffer data before writing to the topic
|
void |
setMaxSize(int maxSize)
Sets the maximum size of a fetch request, allowing the initialization process to be tuned.
|
void |
setReplicationFactor(int replicationFactor)
The replication factor of the offset topic
|
void |
setRequiredAcks(int requiredAcks)
The number of required acks on write operations
|
void |
setRetentionTime(int retentionTime)
How long dead records are retained in the offset topic
|
void |
setSegmentSize(int segmentSize)
The size of a segment in the offset topic
|
deleteOffset, destroy, getConsumerId, getOffset, resetOffsets, setConsumerId, setReferenceTimestamp, updateOffset
public static final java.lang.String CLEANUP_POLICY
public static final java.lang.String CLEANUP_POLICY_COMPACT
public static final java.lang.String DELETE_RETENTION
public static final java.lang.String SEGMENT_BYTES
public KafkaTopicOffsetManager(ZookeeperConnect zookeeperConnect, java.lang.String topic)
public KafkaTopicOffsetManager(ZookeeperConnect zookeeperConnect, java.lang.String topic, java.util.Map<Partition,java.lang.Long> initialOffsets)
public void setMaxSize(int maxSize)
maxSize
- the maximum amount of data to be brought on a fetch
public void setCompressionCodec(ProducerMetadata.CompressionType compressionType)
compressionType
- the compression type
public void setMaxQueueBufferingTime(int maxQueueBufferingTime)
maxQueueBufferingTime
- the maximum buffering window (in milliseconds)
public void setSegmentSize(int segmentSize)
segmentSize
- the segment size of an offset topic
public void setRetentionTime(int retentionTime)
retentionTime
- the retention time for dead records (in seconds)
public void setReplicationFactor(int replicationFactor)
replicationFactor
- the replication factor
public void setBatchBytes(int batchBytes)
batchBytes
- the maximum batch size in bytes for offset writes
public void setRequiredAcks(int requiredAcks)
requiredAcks
- the number of required acks
public void afterPropertiesSet() throws java.lang.Exception
afterPropertiesSet
in interface org.springframework.beans.factory.InitializingBean
java.lang.Exception
protected void doUpdateOffset(Partition partition, long offset)
doUpdateOffset
in class AbstractOffsetManager
protected void doRemoveOffset(Partition partition)
doRemoveOffset
in class AbstractOffsetManager
protected java.lang.Long doGetOffset(Partition partition)
doGetOffset
in class AbstractOffsetManager
public void flush() throws java.io.IOException
flush
in interface java.io.Flushable
java.io.IOException
public void close() throws java.io.IOException
close
in interface java.io.Closeable
close
in interface java.lang.AutoCloseable
java.io.IOException