Spring Boot: обмен сообщениями, Apache Kafka

Apache Kafka поддерживается посредством автоматической настройки проекта spring-kafka.

Конфигурация Kafka контролируется внешними параметрами конфигурации в spring.kafka.*. Например, вы можете объявить следующий раздел в application.properties:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

Чтобы создать topic при запуске, добавьте компонент типа NewTopic. Если topic уже существует, bean игнорируется.

Отправка сообщения

Spring KafkaTemplate автоматически настраивается, и вы можете автоматически связывать его непосредственно с вашими компонентами, как показано в следующем примере:

@Component
public class MyBean {

    private final KafkaTemplate kafkaTemplate;

    @Autowired
    public MyBean(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

}

Если свойство spring.kafka.producer.transaction-id-prefix определено, KafkaTransactionManager настраивается автоматически. Кроме того, если bean-компонент RecordMessageConverter определен, он автоматически связывается с автоматически настроенным KafkaTemplate.

Получение сообщения

При наличии инфраструктуры Apache Kafka любой компонент может быть аннотирован @KafkaListener для создания конечной точки прослушивателя. Если KafkaListenerContainerFactory не был определен, по умолчанию автоматически настраиваются ключи, определенные в spring.kafka.listener.*.

Следующий компонент создает конечную точку прослушивателя по topic someTopic:

@Component
public class MyBean {

    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}

Если bean-компонент KafkaTransactionManager определен, он автоматически связывается с фабрикой контейнеров. Аналогично, если определен bean-компонент ErrorHandler, AfterRollbackProcessor или ConsumerAwareRebalanceListener, он автоматически связывается с фабрикой по умолчанию.

В зависимости от типа слушателя bean-компонент RecordMessageConverter или BatchMessageConverter связан с фабрикой по умолчанию. Если для прослушивателя пакета присутствует только компонент RecordMessageConverter, он помещается в BatchMessageConverter.

Пользовательский ChainedKafkaTransactionManager должен быть помечен @Primary, поскольку он обычно ссылается на автоматически настроенный компонент KafkaTransactionManager.


Читайте также:


Комментарии

Популярные сообщения из этого блога

Методы класса Object в Java

Как получить текущий timestamp в Java

Основные опции JVM для повышения производительности и отладки