org.springframework.integration.kafka.listener
Class AbstractOffsetManager

java.lang.Object
  extended by org.springframework.integration.kafka.listener.AbstractOffsetManager
All Implemented Interfaces:
java.io.Closeable, java.io.Flushable, org.springframework.beans.factory.DisposableBean, OffsetManager
Direct Known Subclasses:
KafkaTopicOffsetManager, MetadataStoreOffsetManager

public abstract class AbstractOffsetManager
extends java.lang.Object
implements OffsetManager, org.springframework.beans.factory.DisposableBean

Base implementation for OffsetManager. Subclasses may customize functionality as necessary.


Field Summary
protected  ConnectionFactory connectionFactory
           
protected  java.lang.String consumerId
           
protected  java.util.Map<Partition,java.lang.Long> initialOffsets
           
protected  org.apache.commons.logging.Log log
           
protected  long referenceTimestamp
           
 
Constructor Summary
AbstractOffsetManager(ConnectionFactory connectionFactory)
           
AbstractOffsetManager(ConnectionFactory connectionFactory, java.util.Map<Partition,java.lang.Long> initialOffsets)
           
 
Method Summary
 void deleteOffset(Partition partition)
          Removes the offset for a given Partition.
 void destroy()
           
protected abstract  java.lang.Long doGetOffset(Partition partition)
           
protected abstract  void doRemoveOffset(Partition partition)
           
protected abstract  void doUpdateOffset(Partition partition, long offset)
           
 java.lang.String getConsumerId()
           
 long getOffset(Partition partition)
          Retrieves the offset for a given Partition
 void resetOffsets(java.util.Collection<Partition> partitionsToReset)
          Resets offsets for the given Partitions.
 void setConsumerId(java.lang.String consumerId)
          The identifier of a consumer of Kafka messages.
 void setReferenceTimestamp(long referenceTimestamp)
          A timestamp to be used for resetting initial offsets
 void updateOffset(Partition partition, long offset)
          Updates the offset for a given Partition
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 
Methods inherited from interface java.io.Closeable
close
 
Methods inherited from interface java.io.Flushable
flush
 

Field Detail

log

protected final org.apache.commons.logging.Log log

consumerId

protected java.lang.String consumerId

referenceTimestamp

protected long referenceTimestamp

connectionFactory

protected ConnectionFactory connectionFactory

initialOffsets

protected java.util.Map<Partition,java.lang.Long> initialOffsets
Constructor Detail

AbstractOffsetManager

public AbstractOffsetManager(ConnectionFactory connectionFactory)

AbstractOffsetManager

public AbstractOffsetManager(ConnectionFactory connectionFactory,
                             java.util.Map<Partition,java.lang.Long> initialOffsets)
Method Detail

getConsumerId

public java.lang.String getConsumerId()

setConsumerId

public void setConsumerId(java.lang.String consumerId)
The identifier of a consumer of Kafka messages. Allows offsets to be managed separately for each consumer.

Parameters:
consumerId - the consumer ID

setReferenceTimestamp

public void setReferenceTimestamp(long referenceTimestamp)
A timestamp to be used for resetting initial offsets

Parameters:
referenceTimestamp - the reset timestamp for initial offsets

destroy

public void destroy()
             throws java.lang.Exception
Specified by:
destroy in interface org.springframework.beans.factory.DisposableBean
Throws:
java.lang.Exception

updateOffset

public final void updateOffset(Partition partition,
                               long offset)
Description copied from interface: OffsetManager
Updates the offset for a given Partition

Specified by:
updateOffset in interface OffsetManager
Parameters:
partition - the partition whose offset is to be updated
offset - the new offset value
See Also:
OffsetManager.updateOffset(Partition, long)

getOffset

public final long getOffset(Partition partition)
Description copied from interface: OffsetManager
Retrieves the offset for a given Partition

Specified by:
getOffset in interface OffsetManager
Parameters:
partition - the partition whose offset is to be retrieved
Returns:
the offset value
See Also:
OffsetManager.getOffset(Partition)

resetOffsets

public void resetOffsets(java.util.Collection<Partition> partitionsToReset)
Description copied from interface: OffsetManager
Resets offsets for the given Partitions. To be invoked when the values stored are invalid, so a client cannot resume from that position. Implementations must decide on the best strategy to follow.

Specified by:
resetOffsets in interface OffsetManager
Parameters:
partitionsToReset - the partitions whose offsets are to be reset
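
Because the reset strategy is left to implementations, the following is only a minimal sketch of one possible approach: discard the stored values so that later reads fall back to a configured position. The class ResetOffsetsSketch, its string partition keys, and the fallbackOffset field are hypothetical and are not part of this library.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class ResetOffsetsSketch {

    // Hypothetical store: partitions are keyed by a plain "topic-partition" string.
    private final Map<String, Long> stored = new HashMap<>();

    // Hypothetical position to resume from when no valid offset is stored.
    private final long fallbackOffset;

    ResetOffsetsSketch(long fallbackOffset) {
        this.fallbackOffset = fallbackOffset;
    }

    void updateOffset(String partition, long offset) {
        stored.put(partition, offset);
    }

    long getOffset(String partition) {
        return stored.getOrDefault(partition, fallbackOffset);
    }

    // One possible strategy (an assumption, not the library's behavior):
    // drop the invalid values so that reads return the fallback position.
    void resetOffsets(Collection<String> partitionsToReset) {
        for (String partition : partitionsToReset) {
            stored.remove(partition);
        }
    }

    public static void main(String[] args) {
        ResetOffsetsSketch offsets = new ResetOffsetsSketch(0L);
        offsets.updateOffset("orders-0", 900L);
        offsets.updateOffset("orders-1", 17L);
        offsets.resetOffsets(Arrays.asList("orders-0"));
        System.out.println(offsets.getOffset("orders-0"));
        System.out.println(offsets.getOffset("orders-1"));
    }
}
```

Only the partitions passed to resetOffsets are affected; offsets stored for other partitions are left as they were.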

deleteOffset

public void deleteOffset(Partition partition)
Description copied from interface: OffsetManager
Removes the offset for a given Partition. Useful for components that need to clean up after themselves.

Specified by:
deleteOffset in interface OffsetManager
Parameters:
partition - the partition for which to delete the offset

doUpdateOffset

protected abstract void doUpdateOffset(Partition partition,
                                       long offset)

doRemoveOffset

protected abstract void doRemoveOffset(Partition partition)

doGetOffset

protected abstract java.lang.Long doGetOffset(Partition partition)
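
The public final methods and the protected abstract do* methods suggest a template-method arrangement, in which a subclass supplies only the storage operations. The sketch below illustrates that shape with simplified stand-ins so that it compiles on its own. Partition, SketchOffsetManager, and InMemoryOffsetManager are hypothetical types defined here, not the library classes. The fallback order in getOffset (stored value, then initial offset, then 0) is an assumption, not the documented behavior of AbstractOffsetManager.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class OffsetManagerSketch {

    // Hypothetical stand-in for the library's Partition type.
    static final class Partition {
        final String topic;
        final int id;

        Partition(String topic, int id) {
            this.topic = topic;
            this.id = id;
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof Partition)) {
                return false;
            }
            Partition that = (Partition) other;
            return id == that.id && topic.equals(that.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, id);
        }
    }

    // Simplified stand-in that mirrors the method signatures listed above.
    abstract static class SketchOffsetManager {
        private final Map<Partition, Long> initialOffsets;

        SketchOffsetManager(Map<Partition, Long> initialOffsets) {
            this.initialOffsets = new HashMap<>(initialOffsets);
        }

        public final void updateOffset(Partition partition, long offset) {
            doUpdateOffset(partition, offset);
        }

        // Assumed fallback order: stored value, then initial offset, then 0.
        public final long getOffset(Partition partition) {
            Long stored = doGetOffset(partition);
            if (stored != null) {
                return stored;
            }
            return initialOffsets.getOrDefault(partition, 0L);
        }

        public void deleteOffset(Partition partition) {
            doRemoveOffset(partition);
        }

        protected abstract void doUpdateOffset(Partition partition, long offset);

        protected abstract void doRemoveOffset(Partition partition);

        protected abstract Long doGetOffset(Partition partition);
    }

    // Subclass that supplies only the storage operations, backed by a HashMap.
    static class InMemoryOffsetManager extends SketchOffsetManager {
        private final Map<Partition, Long> store = new HashMap<>();

        InMemoryOffsetManager(Map<Partition, Long> initialOffsets) {
            super(initialOffsets);
        }

        @Override
        protected void doUpdateOffset(Partition partition, long offset) {
            store.put(partition, offset);
        }

        @Override
        protected void doRemoveOffset(Partition partition) {
            store.remove(partition);
        }

        @Override
        protected Long doGetOffset(Partition partition) {
            return store.get(partition); // null when nothing is stored
        }
    }

    public static void main(String[] args) {
        Partition partition = new Partition("orders", 0);
        Map<Partition, Long> initial = new HashMap<>();
        initial.put(partition, 5L);
        InMemoryOffsetManager manager = new InMemoryOffsetManager(initial);
        manager.updateOffset(partition, 42L);
        System.out.println(manager.getOffset(partition));
    }
}
```

Returning null from doGetOffset when nothing is stored would match the java.lang.Long return type declared above and lets the base class decide how to fall back. A real subclass would extend AbstractOffsetManager itself and pass a ConnectionFactory to its constructor.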