K - the key type.
V - the value type.
public class KafkaMessageSource<K,V>
extends org.springframework.integration.endpoint.AbstractMessageSource<java.lang.Object>
implements org.springframework.integration.core.Pausable
NOTE: If the application acknowledges messages out of order, the acks will be deferred until all messages prior to the offset are ack'd. If multiple records are retrieved and an earlier offset is requeued, records from the subsequent offsets will be redelivered - even if they were processed successfully. Applications should therefore implement idempotency.
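The deferral rule in the note above can be illustrated with a small stand-alone model. This is a sketch only, not the library's implementation: the class `DeferredAckModel` and its methods are hypothetical names, and the model assumes that the committed position is the last contiguously acknowledged offset plus one, as is usual for Kafka offsets.

```java
import java.util.TreeMap;

/** Simplified, hypothetical model of in-order offset commits; not the library's code. */
final class DeferredAckModel {

    // Offsets handed to the application, mapped to whether they have been acknowledged.
    private final TreeMap<Long, Boolean> inFlight = new TreeMap<>();

    /** Records that the message at this offset was delivered to the application. */
    public void delivered(long offset) {
        inFlight.put(offset, Boolean.FALSE);
    }

    /**
     * Acknowledges an offset. Returns the position that can now be committed
     * (last contiguous acknowledged offset + 1), or -1 if the commit is deferred
     * because an earlier offset is still outstanding.
     */
    public long ack(long offset) {
        if (inFlight.replace(offset, Boolean.TRUE) == null) {
            throw new IllegalArgumentException("Unknown offset " + offset);
        }
        long commit = -1;
        // Drain the acknowledged prefix; stop at the first outstanding offset.
        while (!inFlight.isEmpty() && inFlight.firstEntry().getValue()) {
            commit = inFlight.pollFirstEntry().getKey() + 1;
        }
        return commit;
    }

    public static void main(String[] args) {
        DeferredAckModel model = new DeferredAckModel();
        model.delivered(10);
        model.delivered(11);
        model.delivered(12);
        System.out.println(model.ack(11)); // -1: offset 10 is still outstanding
        System.out.println(model.ack(10)); // 12: offsets 10 and 11 are both acknowledged
        System.out.println(model.ack(12)); // 13
    }
}
```

In this model, acknowledging offset 11 first commits nothing. If offset 10 were requeued instead of acknowledged, offsets 11 and 12 would never be committed and would be delivered again, which is why idempotent processing is recommended.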
Starting with version 3.1.2, this source implements Pausable, which allows you to pause and resume the Consumer. While the consumer is paused, you must continue to call AbstractMessageSource.receive() within max.poll.interval.ms to prevent a rebalance.
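The timing requirement can be checked with a stand-alone sketch. The class `PollGapCheck` and the method `staysInGroup` are hypothetical names and do not use the Kafka client. The model assumes that a rebalance is triggered once the gap between two consecutive receive() calls exceeds max.poll.interval.ms, whether or not the consumer is paused.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that checks a receive() call schedule against max.poll.interval.ms. */
final class PollGapCheck {

    /**
     * Returns true if no gap between consecutive receive() calls exceeds
     * maxPollIntervalMs. Timestamps are in milliseconds, in ascending order.
     */
    public static boolean staysInGroup(List<Long> receiveTimesMs, long maxPollIntervalMs) {
        for (int i = 1; i < receiveTimesMs.size(); i++) {
            long gap = receiveTimesMs.get(i) - receiveTimesMs.get(i - 1);
            if (gap > maxPollIntervalMs) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000L; // Kafka's documented default of 5 minutes
        // Paused, but still calling receive() every 100 seconds: no rebalance expected.
        System.out.println(staysInGroup(Arrays.asList(0L, 100_000L, 200_000L), maxPollIntervalMs));
        // Paused and silent for 400 seconds: the interval is exceeded.
        System.out.println(staysInGroup(Arrays.asList(0L, 400_000L), maxPollIntervalMs));
    }
}
```

A scheduled poller that keeps invoking receive() at a period well below the configured interval satisfies this check even while the source is paused.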
Modifier and Type | Class and Description |
---|---|
static class | KafkaMessageSource.KafkaAckCallback<K,V>: AcknowledgmentCallback for Kafka. |
static class | KafkaMessageSource.KafkaAckCallbackFactory<K,V>: AcknowledgmentCallbackFactory for KafkaAckInfo. |
static interface | KafkaMessageSource.KafkaAckInfo<K,V>: Information for building a KafkaAckCallback. |
class | KafkaMessageSource.KafkaAckInfoImpl: Information for building a KafkaAckCallback. |
Modifier and Type | Field and Description |
---|---|
static java.lang.String | REMAINING_RECORDS: The number of records remaining from the previous poll. |
Constructor and Description |
---|
KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, boolean allowMultiFetch, java.lang.String... topics): Construct an instance with the supplied parameters. |
KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory, boolean allowMultiFetch, java.lang.String... topics): Construct an instance with the supplied parameters. |
KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory, java.lang.String... topics): Construct an instance with the supplied parameters. |
KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, java.lang.String... topics): Construct an instance with the supplied parameters. |
Modifier and Type | Method and Description |
---|---|
protected void | createConsumer() |
void | destroy() |
protected java.lang.Object | doReceive() |
protected java.lang.String | getClientId() |
protected java.time.Duration | getCommitTimeout() |
java.lang.String | getComponentType() |
protected java.lang.String | getGroupId() |
protected org.springframework.kafka.support.converter.RecordMessageConverter | getMessageConverter() |
protected java.lang.Class<?> | getPayloadType() |
protected long | getPollTimeout() |
protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener | getRebalanceListener() |
protected boolean | isRawMessageHeader() |
boolean | isRunning() |
void | pause() |
void | resume() |
void | setClientId(java.lang.String clientId): Set the client.id property for the consumer. |
void | setCommitTimeout(java.time.Duration commitTimeout): Set the timeout for commits. |
void | setGroupId(java.lang.String groupId): Set the group.id property for the consumer. |
void | setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter): Set the message converter to replace the default MessagingMessageConverter. |
void | setPayloadType(java.lang.Class<?> payloadType): Set the payload type. |
void | setPollTimeout(long pollTimeout): Set the pollTimeout for the poll() operations; default 50ms. |
void | setRawMessageHeader(boolean rawMessageHeader): Set to true to include the raw ConsumerRecord as headers with keys KafkaHeaders.RAW_DATA and IntegrationMessageHeaderAccessor.SOURCE_DATA. |
void | setRebalanceListener(org.apache.kafka.clients.consumer.ConsumerRebalanceListener rebalanceListener): Set a rebalance listener. |
void | start() |
void | stop() |
buildMessage, getBeanName, getComponentName, getManagedName, getManagedType, getMessageCount, getMessageCountLong, getOverrides, isCountsEnabled, isLoggingEnabled, receive, registerMetricsCaptor, reset, setBeanName, setCountsEnabled, setHeaderExpressions, setLoggingEnabled, setManagedName, setManagedType
afterPropertiesSet, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, getBeanFactory, getEvaluationContext, getEvaluationContext, getMessageBuilderFactory, onInit, setBeanFactory, setConversionService
public static final java.lang.String REMAINING_RECORDS
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, java.lang.String... topics)
consumerFactory - the consumer factory.
topics - the topics.
See also: KafkaMessageSource(ConsumerFactory, KafkaAckCallbackFactory, boolean, String...)
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, boolean allowMultiFetch, java.lang.String... topics)
Construct an instance with the supplied parameters. Set allowMultiFetch to true to allow up to max.poll.records to be fetched on each poll. When false (default), max.poll.records is coerced to 1 if the consumer factory is a DefaultKafkaConsumerFactory, or otherwise rejected with an IllegalArgumentException. IMPORTANT: When true, you must call AbstractMessageSource.receive() at a sufficient rate to consume the number of records received within max.poll.interval.ms. When false, you must call AbstractMessageSource.receive() within max.poll.interval.ms. pause() will not take effect until the records from the previous poll are consumed.
consumerFactory - the consumer factory.
allowMultiFetch - true to allow max.poll.records > 1.
topics - the topics.
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory, java.lang.String... topics)
consumerFactory - the consumer factory.
ackCallbackFactory - the ack callback factory.
topics - the topics.
See also: KafkaMessageSource(ConsumerFactory, KafkaAckCallbackFactory, boolean, String...)
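The max.poll.records handling described for the constructors that take allowMultiFetch can be modelled as a small decision rule. This is a sketch under stated assumptions and not the library's code: `MaxPollRecordsRule` and `effectiveMaxPollRecords` are hypothetical names, the boolean `defaultFactory` stands in for "the consumer factory is a DefaultKafkaConsumerFactory", and the model assumes that a configured value of 1 is accepted unchanged for any factory type.

```java
/** Hypothetical model of how allowMultiFetch affects max.poll.records. */
final class MaxPollRecordsRule {

    /**
     * Returns the max.poll.records value the source would effectively use.
     * Throws IllegalArgumentException when multi-fetch is not allowed, the
     * configured value is not 1, and the value cannot be coerced.
     */
    public static int effectiveMaxPollRecords(boolean allowMultiFetch, int configured, boolean defaultFactory) {
        if (allowMultiFetch || configured == 1) {
            return configured; // used as configured (assumption for the configured == 1 case)
        }
        if (defaultFactory) {
            return 1; // coerced to a single record per poll
        }
        throw new IllegalArgumentException(
                "max.poll.records must be 1 when allowMultiFetch is false");
    }

    public static void main(String[] args) {
        System.out.println(effectiveMaxPollRecords(true, 500, false));  // multi-fetch allowed
        System.out.println(effectiveMaxPollRecords(false, 500, true));  // coerced
    }
}
```

With multi-fetch enabled, the application takes on the obligation to drain all fetched records within max.poll.interval.ms; with it disabled, each receive() call corresponds to at most one record per poll.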
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory, boolean allowMultiFetch, java.lang.String... topics)
Construct an instance with the supplied parameters. Set allowMultiFetch to true to allow up to max.poll.records to be fetched on each poll. When false (default), max.poll.records is coerced to 1 if the consumer factory is a DefaultKafkaConsumerFactory, or otherwise rejected with an IllegalArgumentException. IMPORTANT: When true, you must call AbstractMessageSource.receive() at a sufficient rate to consume the number of records received within max.poll.interval.ms. When false, you must call AbstractMessageSource.receive() within max.poll.interval.ms. pause() will not take effect until the records from the previous poll are consumed.
consumerFactory - the consumer factory.
ackCallbackFactory - the ack callback factory.
allowMultiFetch - true to allow max.poll.records > 1.
topics - the topics.
protected java.lang.String getGroupId()
public void setGroupId(java.lang.String groupId)
groupId - the group id.
protected java.lang.String getClientId()
public void setClientId(java.lang.String clientId)
clientId - the client id.
protected long getPollTimeout()
public void setPollTimeout(long pollTimeout)
pollTimeout - the poll timeout.
protected org.springframework.kafka.support.converter.RecordMessageConverter getMessageConverter()
public void setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
Set the message converter to replace the default MessagingMessageConverter.
messageConverter - the converter.
protected java.lang.Class<?> getPayloadType()
public void setPayloadType(java.lang.Class<?> payloadType)
payloadType - the type to convert to.
protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener getRebalanceListener()
public void setRebalanceListener(org.apache.kafka.clients.consumer.ConsumerRebalanceListener rebalanceListener)
rebalanceListener - the rebalance listener.
public java.lang.String getComponentType()
getComponentType in interface org.springframework.integration.support.context.NamedComponent
protected boolean isRawMessageHeader()
public void setRawMessageHeader(boolean rawMessageHeader)
Set to true to include the raw ConsumerRecord as headers with keys KafkaHeaders.RAW_DATA and IntegrationMessageHeaderAccessor.SOURCE_DATA, enabling callers to have access to the record to process errors.
rawMessageHeader - true to include the header.
protected java.time.Duration getCommitTimeout()
public void setCommitTimeout(java.time.Duration commitTimeout)
commitTimeout - the timeout.
public boolean isRunning()
isRunning in interface org.springframework.context.Lifecycle
public void start()
start in interface org.springframework.context.Lifecycle
public void stop()
stop in interface org.springframework.context.Lifecycle
public void pause()
pause in interface org.springframework.integration.core.Pausable
public void resume()
resume in interface org.springframework.integration.core.Pausable
protected java.lang.Object doReceive()
doReceive in class org.springframework.integration.endpoint.AbstractMessageSource<java.lang.Object>
protected void createConsumer()
public void destroy()
destroy in interface org.springframework.beans.factory.DisposableBean
destroy in interface org.springframework.integration.support.management.IntegrationManagement
destroy in class org.springframework.integration.endpoint.AbstractMessageSource<java.lang.Object>