K
- the key type.V
- the value type.public class KafkaProducerMessageHandler<K,V>
extends org.springframework.integration.handler.AbstractReplyProducingMessageHandler
implements org.springframework.context.Lifecycle
ReplyingKafkaTemplate
it is used as the handler in an outbound gateway. When supplied with a simple
KafkaTemplate
it is used as the handler in an outbound channel adapter.messagingTemplate
EXPRESSION_PARSER, logger
Constructor and Description |
---|
KafkaProducerMessageHandler(org.springframework.kafka.core.KafkaTemplate<K,V> kafkaTemplate) |
Modifier and Type | Method and Description |
---|---|
protected void |
doInit() |
java.lang.String |
getComponentType() |
org.springframework.kafka.core.KafkaTemplate<?,?> |
getKafkaTemplate() |
protected org.springframework.messaging.MessageChannel |
getSendFailureChannel() |
protected org.springframework.messaging.MessageChannel |
getSendSuccessChannel() |
protected java.lang.Object |
handleRequestMessage(org.springframework.messaging.Message<?> message) |
boolean |
isRunning() |
void |
processSendResult(org.springframework.messaging.Message<?> message,
org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord,
org.springframework.util.concurrent.ListenableFuture<org.springframework.kafka.support.SendResult<K,V>> future,
org.springframework.messaging.MessageChannel metadataChannel) |
void |
setErrorMessageStrategy(org.springframework.integration.support.ErrorMessageStrategy errorMessageStrategy)
Set the error message strategy implementation to use when sending error messages after
send failures.
|
void |
setHeaderMapper(org.springframework.kafka.support.KafkaHeaderMapper headerMapper)
Set the header mapper to use.
|
void |
setMessageKeyExpression(org.springframework.expression.Expression messageKeyExpression) |
void |
setPartitionIdExpression(org.springframework.expression.Expression partitionIdExpression) |
void |
setReplyMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
Set a message converter for gateway replies.
|
void |
setReplyPayloadType(java.lang.reflect.Type payloadType)
When using a type-aware message converter (such as
StringJsonMessageConverter),
set the payload type the converter should create. |
void |
setSendFailureChannel(org.springframework.messaging.MessageChannel sendFailureChannel)
Set the failure channel.
|
void |
setSendFailureChannelName(java.lang.String sendFailureChannelName)
Set the failure channel name.
|
void |
setSendSuccessChannel(org.springframework.messaging.MessageChannel sendSuccessChannel)
Set the success channel.
|
void |
setSendSuccessChannelName(java.lang.String sendSuccessChannelName)
Set the success channel name.
|
void |
setSendTimeout(long sendTimeout)
Specify a timeout in milliseconds for how long this
KafkaProducerMessageHandler should wait for send operation
results. |
void |
setSendTimeoutExpression(org.springframework.expression.Expression sendTimeoutExpression)
Specify a SpEL expression to evaluate a timeout in milliseconds for how long this
KafkaProducerMessageHandler should wait for send operation
results. |
void |
setSync(boolean sync)
A
boolean indicating if the KafkaProducerMessageHandler
should wait for the send operation results or not. |
void |
setTimestampExpression(org.springframework.expression.Expression timestampExpression)
Specify a SpEL expression to evaluate a timestamp that will be added in the Kafka record.
|
void |
setTopicExpression(org.springframework.expression.Expression topicExpression) |
void |
start() |
void |
stop() |
doInvokeAdvisedRequestHandler, getBeanClassLoader, getRequiresReply, handleMessageInternal, hasAdviceChain, onInit, setAdviceChain, setBeanClassLoader, setRequiresReply
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, shouldCopyRequestHeaders, shouldSplitOutput, updateNotPropagatedHeaders
configureMetrics, destroy, getActiveCount, getActiveCountLong, getDuration, getErrorCount, getErrorCountLong, getHandleCount, getHandleCountLong, getManagedName, getManagedType, getMaxDuration, getMeanDuration, getMetricsCaptor, getMinDuration, getOrder, getOverrides, getStandardDeviationDuration, handleMessage, isCountsEnabled, isLoggingEnabled, isStatsEnabled, onComplete, onError, onNext, onSubscribe, registerMetricsCaptor, reset, setCountsEnabled, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, setStatsEnabled
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
errorCount, handleCount
public void setTopicExpression(org.springframework.expression.Expression topicExpression)
public void setMessageKeyExpression(org.springframework.expression.Expression messageKeyExpression)
public void setPartitionIdExpression(org.springframework.expression.Expression partitionIdExpression)
public void setTimestampExpression(org.springframework.expression.Expression timestampExpression)
Long
type representing epoch time in milliseconds.timestampExpression
- the Expression
for the timestamp that will be added
to the Kafka record.public void setHeaderMapper(org.springframework.kafka.support.KafkaHeaderMapper headerMapper)
headerMapper
- the mapper; can be null to disable header mapping.public org.springframework.kafka.core.KafkaTemplate<?,?> getKafkaTemplate()
public void setSync(boolean sync)
boolean
indicating if the KafkaProducerMessageHandler
should wait for the send operation results or not. Defaults to false
.
In sync
mode a downstream send operation exception will be re-thrown.sync
- the send mode; async by default.public void setSendTimeout(long sendTimeout)
KafkaProducerMessageHandler
should wait for send operation
results. Defaults to 10 seconds. The timeout is applied only in sync
mode.
Also applies when sending to the success or failure channels.setSendTimeout
in class org.springframework.integration.handler.AbstractMessageProducingHandler
sendTimeout
- the timeout to wait for result of send operation.public void setSendTimeoutExpression(org.springframework.expression.Expression sendTimeoutExpression)
KafkaProducerMessageHandler
should wait for send operation
results. Defaults to 10 seconds. The timeout is applied only in sync
mode.sendTimeoutExpression
- the Expression
for timeout to wait for result
of send operation.public void setSendFailureChannel(org.springframework.messaging.MessageChannel sendFailureChannel)
ErrorMessage
will be sent
to this channel with a payload of a KafkaSendFailureException
with the
failed message and cause.sendFailureChannel
- the failure channel.public void setSendFailureChannelName(java.lang.String sendFailureChannelName)
ErrorMessage
will be
sent to this channel name with a payload of a KafkaSendFailureException
with the failed message and cause.sendFailureChannelName
- the failure channel name.public void setSendSuccessChannel(org.springframework.messaging.MessageChannel sendSuccessChannel)
sendSuccessChannel
- the Success channel.public void setSendSuccessChannelName(java.lang.String sendSuccessChannelName)
sendSuccessChannelName
- the Success channel name.public void setErrorMessageStrategy(org.springframework.integration.support.ErrorMessageStrategy errorMessageStrategy)
errorMessageStrategy
- the implementation.public void setReplyMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
messageConverter
- the converter.setReplyPayloadType(Type)
public void setReplyPayloadType(java.lang.reflect.Type payloadType)
StringJsonMessageConverter
,
set the payload type the converter should create. Defaults to Object
.payloadType
- the type.setReplyMessageConverter(RecordMessageConverter)
public java.lang.String getComponentType()
getComponentType
in interface org.springframework.integration.support.context.NamedComponent
getComponentType
in class org.springframework.integration.handler.AbstractMessageHandler
protected org.springframework.messaging.MessageChannel getSendFailureChannel()
protected org.springframework.messaging.MessageChannel getSendSuccessChannel()
protected void doInit()
doInit
in class org.springframework.integration.handler.AbstractReplyProducingMessageHandler
public void start()
start
in interface org.springframework.context.Lifecycle
public void stop()
stop
in interface org.springframework.context.Lifecycle
public boolean isRunning()
isRunning
in interface org.springframework.context.Lifecycle
protected java.lang.Object handleRequestMessage(org.springframework.messaging.Message<?> message)
handleRequestMessage
in class org.springframework.integration.handler.AbstractReplyProducingMessageHandler
public void processSendResult(org.springframework.messaging.Message<?> message, org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord, org.springframework.util.concurrent.ListenableFuture<org.springframework.kafka.support.SendResult<K,V>> future, org.springframework.messaging.MessageChannel metadataChannel) throws java.lang.InterruptedException, java.util.concurrent.ExecutionException
java.lang.InterruptedException
java.util.concurrent.ExecutionException