
Production et consommation de messages Kafka (`KafkaTemplate`, `@KafkaListener`)
Guide pratique pour utiliser KafkaTemplate et @KafkaListener dans Spring Boot afin de produire et consommer des messages sur des topics Apache Kafka de manière efficace.
Introduction à Apache Kafka et Spring Kafka
Apache Kafka est une plateforme de streaming d'événements distribuée, open-source, conçue pour gérer de gros volumes de données en temps réel. Elle est utilisée pour construire des pipelines de données, des applications de streaming et des systèmes basés sur les événements. Les concepts clés de Kafka incluent :
- Broker : Un serveur Kafka qui stocke les données. Un cluster Kafka est composé de plusieurs brokers.
- Topic : Une catégorie ou un nom de flux auquel les messages (enregistrements) sont publiés.
- Partition : Un topic est divisé en plusieurs partitions. Chaque partition est un journal (log) ordonné et immuable de messages. La parallélisation de la consommation se fait au niveau des partitions.
- Producer (Producteur) : Application qui publie des messages dans un ou plusieurs topics Kafka.
- Consumer (Consommateur) : Application qui s'abonne à un ou plusieurs topics et traite les messages publiés. Les consommateurs s'organisent en groupes (Consumer Groups) pour partager la charge de travail.
Spring for Apache Kafka (Spring Kafka) est un projet de l'écosystème Spring qui simplifie l'interaction avec Kafka. Il fournit des abstractions de haut niveau, suivant les principes de Spring, pour produire et consommer des messages, en masquant une grande partie de la complexité de l'API client Kafka native.
Le starter `spring-kafka` (souvent inclus via `spring-boot-starter` si sélectionné dans Initializr, sinon à ajouter manuellement) intègre Spring Kafka dans Spring Boot, offrant une auto-configuration pour les producteurs (`KafkaTemplate`) et les consommateurs (`@KafkaListener`).
Dépendances et Configuration
Assurez-vous d'avoir la dépendance `spring-kafka` dans votre projet :
Maven :
org.springframework.kafka
spring-kafka
Gradle :
implementation 'org.springframework.kafka:spring-kafka'Configurez ensuite la connexion à votre cluster Kafka dans `application.properties` ou `application.yml`. Les propriétés essentielles sont :
# application.properties
# Adresse(s) des brokers Kafka
spring.kafka.bootstrap-servers=localhost:9092
# Configuration pour le producteur (serializers)
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# Par défaut: StringSerializer, utiliser JsonSerializer pour envoyer des objets JSON
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# Configuration pour le consommateur (deserializers + group id)
spring.kafka.consumer.group-id=my-group # Identifiant du groupe de consommateurs
spring.kafka.consumer.auto-offset-reset=earliest # Que faire si aucun offset initial n'existe (earliest/latest)
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Par défaut: StringDeserializer, utiliser JsonDeserializer pour recevoir des objets JSON
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# IMPORTANT pour JsonDeserializer : spécifier les packages des classes à désérialiser
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.myapp.model, com.another.package
# Ou '*' pour faire confiance à tous (moins sécurisé)
# spring.kafka.consumer.properties.spring.json.trusted.packages=*
# Optionnel: Configuration pour l'utilisation de type headers avec JSON (recommandé)
spring.kafka.producer.properties.spring.json.add.type.headers=true
spring.kafka.consumer.properties.spring.json.use.type.headers=true
Production de messages avec `KafkaTemplate`
Le `KafkaTemplate` est l'outil principal fourni par Spring Kafka pour envoyer des messages aux topics Kafka. Il encapsule un `KafkaProducer` et offre des méthodes pratiques pour l'envoi.
Injectez le `KafkaTemplate` dans votre service producteur. Le type générique correspond aux types de la clé et de la valeur du message :
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import com.example.myapp.model.Order; // Votre classe POJO
@Service
public class OrderProducerService {
private static final String TOPIC_NAME = "orders-topic";
// Le template est configuré par Spring Boot basé sur application.properties
// Types génériques : Clé=String, Valeur=Order (sera sérialisé en JSON)
private final KafkaTemplate kafkaTemplate;
@Autowired
public OrderProducerService(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendOrder(Order order) {
System.out.println("Sending order: " + order.getOrderId());
// Envoi simple (asynchrone par défaut)
// La clé (order.getOrderId()) est utilisée pour le partitionnement (si non null)
kafkaTemplate.send(TOPIC_NAME, order.getOrderId(), order);
}
// Exemple d'envoi avec callback pour gérer le résultat
public void sendOrderWithCallback(Order order) {
System.out.println("Sending order with callback: " + order.getOrderId());
kafkaTemplate.send(TOPIC_NAME, order.getOrderId(), order)
.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("Message sent successfully to topic [" + result.getRecordMetadata().topic() +
"], partition [" + result.getRecordMetadata().partition() +
"], offset [" + result.getRecordMetadata().offset() + "]");
} else {
System.err.println("Failed to send message: " + ex.getMessage());
}
});
}
}
Le `KafkaTemplate` utilise les serializers configurés dans `application.properties` (ici `StringSerializer` pour la clé et `JsonSerializer` pour la valeur `Order`) pour convertir les objets avant de les envoyer au broker Kafka. L'envoi est asynchrone ; la méthode `send` retourne un `CompletableFuture` si vous avez besoin de gérer le résultat.
Consommation de messages avec `@KafkaListener`
L'annotation `@KafkaListener` est le moyen le plus simple et le plus déclaratif de consommer des messages depuis des topics Kafka. Annotez une méthode dans un bean Spring, et Spring Kafka configurera automatiquement un conteneur de listeners pour invoquer cette méthode lorsqu'un message arrive.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.example.myapp.model.Order; // Votre classe POJO
@Component
public class OrderConsumerService {
// Ecoute le topic "orders-topic" avec le groupe "my-group"
@KafkaListener(topics = "orders-topic", groupId = "my-group")
public void consumeOrder(@Payload Order order, // Le message désérialisé (JSON -> Order)
@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
System.out.println("-------------------------------------------");
System.out.println("Received Order: " + order.getOrderId() + " - " + order.getItem());
System.out.println("Key: " + key);
System.out.println("Partition: " + partition);
System.out.println("Offset: " + offset);
System.out.println("-------------------------------------------");
// Traiter la commande ici...
// Si une exception est levée ici, le message ne sera pas considéré comme acquitté
// (selon la configuration de l'error handler)
}
// Autre listener pour un autre topic ou groupe si nécessaire
/*
@KafkaListener(topics = "another-topic", groupId = "another-group")
public void consumeAnotherMessage(String message) {
System.out.println("Received raw message: " + message);
}
*/
}
Dans cet exemple, la méthode `consumeOrder` sera appelée pour chaque message reçu sur `orders-topic` par un consommateur du groupe `my-group`. Le `JsonDeserializer` (configuré dans `application.properties`) est utilisé pour convertir le corps du message JSON en un objet `Order`. Vous pouvez également injecter des informations de métadonnées (clé, partition, offset, timestamp, headers) via les annotations `@Header` en utilisant les constantes de `KafkaHeaders`.
Spring Kafka gère le polling, le commit des offsets et le cycle de vie du consommateur. Par défaut, si la méthode du listener se termine sans exception, l'offset est commité. En cas d'exception, le comportement dépend de la configuration de l'`ErrorHandler` (par défaut, logue l'erreur et le message n'est pas re-tenté indéfiniment).
Gestion des Topics avec `NewTopic` Bean
Bien que les topics puissent être créés manuellement ou par auto-création sur le broker, il est recommandé de les définir dans votre application pour la clarté et l'automatisation. Vous pouvez déclarer des beans de type `NewTopic` dans une classe `@Configuration`. L'objet `KafkaAdmin` auto-configuré par Spring Boot tentera de créer ces topics au démarrage si ils n'existent pas déjà (nécessite les permissions appropriées sur le broker).
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class KafkaTopicConfig {
public static final String ORDERS_TOPIC = "orders-topic";
@Bean
public NewTopic ordersTopic() {
// Crée un topic nommé "orders-topic" avec 3 partitions et un facteur de réplication de 1
// Ajustez les partitions et la réplication selon votre configuration Kafka
return TopicBuilder.name(ORDERS_TOPIC)
.partitions(3)
.replicas(1)
.build();
}
// Déclarer d'autres topics ici si nécessaire
/*
@Bean
public NewTopic anotherTopic() {
return TopicBuilder.name("another-topic")
.partitions(1)
.replicas(1)
.build();
}
*/
}
Conclusion
Spring Kafka simplifie considérablement l'intégration d'Apache Kafka dans les applications Spring Boot. `KafkaTemplate` offre une interface fluide pour la production de messages, tandis que `@KafkaListener` permet une consommation déclarative et robuste.
En configurant correctement les serializers/deserializers (notamment pour JSON) et en définissant vos topics, vous pouvez rapidement mettre en place des systèmes de messagerie et des architectures événementielles puissantes et scalables, en tirant parti de la fiabilité et des performances de Kafka.