org.springframework.integration.kafka.listener
Class KafkaTopicOffsetManager

java.lang.Object
  extended by org.springframework.integration.kafka.listener.AbstractOffsetManager
      extended by org.springframework.integration.kafka.listener.KafkaTopicOffsetManager
All Implemented Interfaces:
java.io.Closeable, java.io.Flushable, org.springframework.beans.factory.DisposableBean, org.springframework.beans.factory.InitializingBean, OffsetManager

public class KafkaTopicOffsetManager
extends AbstractOffsetManager
implements org.springframework.beans.factory.InitializingBean

An OffsetManager implementation that stores offsets in a Kafka topic. To function properly, the Kafka server(s) must be configured with log.cleaner.enable=true. It relies on cleanup.policy=compact being set on the target topic; if the topic is not found, it creates one with the appropriate settings.
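The reliance on cleanup.policy=compact can be illustrated with a small model. Kafka log compaction keeps the most recent record for each key, and a record with a null value (a tombstone) marks the key for removal. The sketch below mimics that behaviour with an in-memory map; it is not this class's implementation. The class CompactedOffsetLogSketch, its methods, and the fields of its Key are hypothetical stand-ins chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical in-memory model of a compacted offset topic; not the library's code. */
final class CompactedOffsetLogSketch {

    /** Stand-in for a key that wraps consumer and partition information (fields assumed). */
    static final class Key {
        final String consumerId;
        final String topic;
        final int partition;

        Key(String consumerId, String topic, int partition) {
            this.consumerId = consumerId;
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return partition == other.partition
                    && consumerId.equals(other.consumerId)
                    && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(consumerId, topic, partition);
        }
    }

    // The state a reader would rebuild by replaying the compacted topic from the start.
    private final Map<Key, Long> latest = new LinkedHashMap<>();

    /** A new record for a key supersedes any earlier record for the same key. */
    void update(Key key, long offset) {
        latest.put(key, offset);
    }

    /** Models a tombstone: the key no longer has a stored offset. */
    void remove(Key key) {
        latest.remove(key);
    }

    /** Returns the last offset written for the key, or null if there is none. */
    Long get(Key key) {
        return latest.get(key);
    }

    public static void main(String[] args) {
        CompactedOffsetLogSketch log = new CompactedOffsetLogSketch();
        Key key = new Key("consumer-1", "orders", 0);
        log.update(key, 10L);
        log.update(key, 42L);             // supersedes offset 10
        System.out.println(log.get(key)); // prints 42
        log.remove(key);                  // tombstone
        System.out.println(log.get(key)); // prints null
    }
}
```

Because only the latest value per key matters, the offset topic stays small no matter how often offsets are updated, which is presumably why compaction is required here.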


Nested Class Summary
static class KafkaTopicOffsetManager.Key
          Wraps the partition and consumer information; used as a key on the Kafka topic
static class KafkaTopicOffsetManager.KeyEncoderDecoder
           
 
Field Summary
static java.lang.String CLEANUP_POLICY
           
static java.lang.String CLEANUP_POLICY_COMPACT
           
static java.lang.String DELETE_RETENTION
           
static java.lang.String SEGMENT_BYTES
           
 
Fields inherited from class org.springframework.integration.kafka.listener.AbstractOffsetManager
connectionFactory, consumerId, initialOffsets, log, referenceTimestamp
 
Constructor Summary
KafkaTopicOffsetManager(ZookeeperConnect zookeeperConnect, java.lang.String topic)
           
KafkaTopicOffsetManager(ZookeeperConnect zookeeperConnect, java.lang.String topic, java.util.Map<Partition,java.lang.Long> initialOffsets)
           
 
Method Summary
 void afterPropertiesSet()
           
 void close()
           
protected  java.lang.Long doGetOffset(Partition partition)
           
protected  void doRemoveOffset(Partition partition)
           
protected  void doUpdateOffset(Partition partition, long offset)
           
 void flush()
           
 void setBatchWrites(boolean batchWrites)
          Whether offset writes should be batched or not
 void setCompressionCodec(java.lang.String compressionCodec)
          The compression codec for writing to the offset topic
 void setMaxBatchSize(int maxBatchSize)
          The maximum batch size for offset writes
 void setMaxQueueBufferingTime(int maxQueueBufferingTime)
          How long producers buffer data before writing to the topic
 void setMaxSize(int maxSize)
          Sets the maximum size of a fetch request, which allows the initialization process to be tuned.
 void setReplicationFactor(int replicationFactor)
          The replication factor of the offset topic
 void setRequiredAcks(int requiredAcks)
          The number of required acks on write operations
 void setRetentionTime(int retentionTime)
          How long dead records are retained in the offset topic
 void setSegmentSize(int segmentSize)
          The size of a segment in the offset topic
 
Methods inherited from class org.springframework.integration.kafka.listener.AbstractOffsetManager
deleteOffset, destroy, getConsumerId, getOffset, resetOffsets, setConsumerId, setReferenceTimestamp, updateOffset
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

CLEANUP_POLICY

public static final java.lang.String CLEANUP_POLICY
See Also:
Constant Field Values

CLEANUP_POLICY_COMPACT

public static final java.lang.String CLEANUP_POLICY_COMPACT
See Also:
Constant Field Values

DELETE_RETENTION

public static final java.lang.String DELETE_RETENTION
See Also:
Constant Field Values

SEGMENT_BYTES

public static final java.lang.String SEGMENT_BYTES
See Also:
Constant Field Values
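
The values of these constants are not shown on this page. Their names suggest Kafka's topic-level settings cleanup.policy, delete.retention.ms and segment.bytes, so the sketch below assumes those key strings. It shows how a compacted topic's configuration could be assembled as java.util.Properties. The helper OffsetTopicConfigSketch and the seconds-to-milliseconds conversion are assumptions for illustration (setRetentionTime documents its argument in seconds, while delete.retention.ms is expressed in milliseconds); this is not the class's actual code.

```java
import java.util.Properties;

/** Hypothetical helper; the key strings are Kafka topic-level configuration names. */
final class OffsetTopicConfigSketch {

    static Properties compactedTopicConfig(int segmentSizeBytes, int retentionTimeSeconds) {
        Properties config = new Properties();
        // Keep only the latest record per key instead of deleting by age.
        config.setProperty("cleanup.policy", "compact");
        config.setProperty("segment.bytes", Integer.toString(segmentSizeBytes));
        // Assumed conversion: seconds in, milliseconds out (long arithmetic avoids overflow).
        config.setProperty("delete.retention.ms", Long.toString(retentionTimeSeconds * 1000L));
        return config;
    }

    public static void main(String[] args) {
        Properties config = compactedTopicConfig(1024 * 1024, 60);
        System.out.println(config.getProperty("cleanup.policy"));      // prints compact
        System.out.println(config.getProperty("delete.retention.ms")); // prints 60000
    }
}
```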
Constructor Detail

KafkaTopicOffsetManager

public KafkaTopicOffsetManager(ZookeeperConnect zookeeperConnect,
                               java.lang.String topic)

KafkaTopicOffsetManager

public KafkaTopicOffsetManager(ZookeeperConnect zookeeperConnect,
                               java.lang.String topic,
                               java.util.Map<Partition,java.lang.Long> initialOffsets)
Method Detail

setMaxSize

public void setMaxSize(int maxSize)
Sets the maximum size of a fetch request, which allows the initialization process to be tuned.

Parameters:
maxSize - the maximum amount of data to be brought on a fetch

setCompressionCodec

public void setCompressionCodec(java.lang.String compressionCodec)
The compression codec for writing to the offset topic

Parameters:
compressionCodec - the compression codec

setMaxQueueBufferingTime

public void setMaxQueueBufferingTime(int maxQueueBufferingTime)
How long producers buffer data before writing to the topic

Parameters:
maxQueueBufferingTime - the maximum buffering window (in milliseconds)

setSegmentSize

public void setSegmentSize(int segmentSize)
The size of a segment in the offset topic

Parameters:
segmentSize - the segment size of the offset topic

setRetentionTime

public void setRetentionTime(int retentionTime)
How long dead records are retained in the offset topic

Parameters:
retentionTime - the retention time for dead records (in seconds)

setReplicationFactor

public void setReplicationFactor(int replicationFactor)
The replication factor of the offset topic

Parameters:
replicationFactor - the replication factor

setMaxBatchSize

public void setMaxBatchSize(int maxBatchSize)
The maximum batch size for offset writes

Parameters:
maxBatchSize - the maximum batch size

setBatchWrites

public void setBatchWrites(boolean batchWrites)
Whether offset writes should be batched or not

Parameters:
batchWrites - true if writes are batched
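
The three batching settings (setBatchWrites, setMaxBatchSize, setMaxQueueBufferingTime) plausibly interact the way size-and-time bounded buffers usually do: a batch is sent when it is full or when its oldest entry has waited long enough. The sketch below illustrates that general idea only. BatchingWriterSketch and its members are hypothetical, and this page does not confirm how this class or the underlying producer actually schedules writes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical size-and-time bounded write buffer; not the library's implementation. */
final class BatchingWriterSketch {
    private final boolean batchWrites;
    private final int maxBatchSize;
    private final long maxBufferingMillis;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> sent = new ArrayList<>();
    private long oldestPendingAt;

    BatchingWriterSketch(boolean batchWrites, int maxBatchSize, long maxBufferingMillis) {
        this.batchWrites = batchWrites;
        this.maxBatchSize = maxBatchSize;
        this.maxBufferingMillis = maxBufferingMillis;
    }

    /** Buffers one write; the clock value is passed in to keep the example deterministic. */
    void write(String record, long nowMillis) {
        if (pending.isEmpty()) {
            oldestPendingAt = nowMillis;
        }
        pending.add(record);
        boolean full = pending.size() >= maxBatchSize;
        // Checked only on write here; a real producer would typically also use a timer.
        boolean expired = nowMillis - oldestPendingAt >= maxBufferingMillis;
        if (!batchWrites || full || expired) {
            flush();
        }
    }

    /** Sends whatever is pending as a single batch. */
    void flush() {
        if (!pending.isEmpty()) {
            sent.add(new ArrayList<>(pending));
            pending.clear();
        }
    }

    int batchesSent() {
        return sent.size();
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        BatchingWriterSketch writer = new BatchingWriterSketch(true, 3, 1000);
        writer.write("a", 0);
        writer.write("b", 10);
        System.out.println(writer.batchesSent()); // prints 0
        writer.write("c", 20);                    // size limit reached
        System.out.println(writer.batchesSent()); // prints 1
        writer.write("d", 30);
        writer.write("e", 1500);                  // buffering window exceeded
        System.out.println(writer.batchesSent()); // prints 2
    }
}
```

With batching disabled, every write in this model is sent immediately, trading throughput for lower latency on each offset update.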

setRequiredAcks

public void setRequiredAcks(int requiredAcks)
The number of required acks on write operations

Parameters:
requiredAcks - the number of required acks

afterPropertiesSet

public void afterPropertiesSet()
                        throws java.lang.Exception
Specified by:
afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean
Throws:
java.lang.Exception

doUpdateOffset

protected void doUpdateOffset(Partition partition,
                              long offset)
Specified by:
doUpdateOffset in class AbstractOffsetManager

doRemoveOffset

protected void doRemoveOffset(Partition partition)
Specified by:
doRemoveOffset in class AbstractOffsetManager

doGetOffset

protected java.lang.Long doGetOffset(Partition partition)
Specified by:
doGetOffset in class AbstractOffsetManager

flush

public void flush()
           throws java.io.IOException
Specified by:
flush in interface java.io.Flushable
Throws:
java.io.IOException

close

public void close()
           throws java.io.IOException
Specified by:
close in interface java.io.Closeable
Throws:
java.io.IOException