Spring Boot: обмен сообщениями, Kafka Streams

Spring для Apache Kafka предоставляет фабричный компонент для создания объекта StreamsBuilder и управления жизненным циклом его потоков. Spring Boot автоматически конфигурирует требуемый компонент KafkaStreamsConfiguration, если kafka-streams находится в пути к классам, а Kafka Streams включены с помощью аннотации @EnableKafkaStreams.

Включение Kafka Streams означает, что должны быть установлены серверы идентификатора приложения и загрузчика. Первый может быть настроен с использованием spring.kafka.streams.application-id, по умолчанию это spring.application.name, если не задано. Последний может быть установлен глобально или специально переопределен только для потоков.

Несколько дополнительных свойств доступны с использованием выделенных свойств; другие произвольные свойства Kafka могут быть установлены с помощью пространства имен spring.kafka.streams.properties.

Чтобы использовать фабричный компонент, просто подключите StreamsBuilder к своему @Bean, как показано в следующем примере:

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public static class KafkaStreamsExampleConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
                Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

}

По умолчанию потоки, управляемые объектом StreamBuilder, которые он создает, запускаются автоматически. Вы можете настроить это поведение, используя свойство spring.kafka.streams.auto-startup.


Читайте также:


Комментарии

Популярные сообщения из этого блога

Методы класса Object в Java

Как получить текущий timestamp в Java

Основные опции JVM для повышения производительности и отладки