Spring Boot: обмен сообщениями, Apache Kafka
Apache Kafka поддерживается посредством автоматической настройки проекта spring-kafka.
Конфигурация Kafka контролируется внешними параметрами конфигурации в spring.kafka.*. Например, вы можете объявить следующий раздел в application.properties:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
Чтобы создать topic при запуске, добавьте компонент типа NewTopic. Если topic уже существует, bean игнорируется.
Отправка сообщения
Spring KafkaTemplate автоматически настраивается, и вы можете автоматически связывать его непосредственно с вашими компонентами, как показано в следующем примере:
@Component
public class MyBean {
private final KafkaTemplate kafkaTemplate;
@Autowired
public MyBean(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// ...
}
Если свойство spring.kafka.producer.transaction-id-prefix определено, KafkaTransactionManager настраивается автоматически. Кроме того, если bean-компонент RecordMessageConverter определен, он автоматически связывается с автоматически настроенным KafkaTemplate.
Получение сообщения
При наличии инфраструктуры Apache Kafka любой компонент может быть аннотирован @KafkaListener для создания конечной точки прослушивателя. Если KafkaListenerContainerFactory не был определен, по умолчанию автоматически настраиваются ключи, определенные в spring.kafka.listener.*.
Следующий компонент создает конечную точку прослушивателя по topic someTopic:
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
Если bean-компонент KafkaTransactionManager определен, он автоматически связывается с фабрикой контейнеров. Аналогично, если определен bean-компонент ErrorHandler, AfterRollbackProcessor или ConsumerAwareRebalanceListener, он автоматически связывается с фабрикой по умолчанию.
В зависимости от типа слушателя bean-компонент RecordMessageConverter или BatchMessageConverter связан с фабрикой по умолчанию. Если для прослушивателя пакета присутствует только компонент RecordMessageConverter, он помещается в BatchMessageConverter.
Пользовательский ChainedKafkaTransactionManager должен быть помечен @Primary, поскольку он обычно ссылается на автоматически настроенный компонент KafkaTransactionManager.
Читайте также:
- Spring Boot: обмен сообщениями, AMQP, поддержка RabbitMQ
- Spring Boot: обмен сообщениями, JMS, ActiveMQ
- Spring Boot: обмен сообщениями, JMS, отправка и получение сообщения
Комментарии
Отправить комментарий