Contactez-nous

Spring for Apache Kafka (`spring-kafka`)

Apprenez à utiliser Spring for Apache Kafka (spring-kafka) pour produire et consommer des messages Kafka de manière transparente dans vos applications Spring Boot.

Introduction : Spring et le streaming avec Apache Kafka

Apache Kafka s'est imposé comme la plateforme de streaming d'événements distribuée de référence. Contrairement aux brokers de messages plus traditionnels comme JMS ou AMQP, Kafka est conçu autour d'un journal de commit distribué, partitionné et répliqué, offrant une durabilité élevée, un débit massif et une faible latence. Il est largement utilisé pour des cas d'usage variés : pipelines de données en temps réel, suivi d'activité, agrégation de logs, communication inter-services, event sourcing, etc.

L'intégration d'Apache Kafka dans une application Java peut impliquer l'utilisation directe des API Producer et Consumer natives de Kafka, qui bien que puissantes, peuvent être verbeuses. Le projet Spring for Apache Kafka (`spring-kafka`) vise à appliquer les principes fondamentaux de Spring (simplification, abstraction, injection de dépendances, modèle de programmation cohérent) à l'écosystème Kafka.

Il fournit des abstractions de haut niveau, notamment `KafkaTemplate` pour la production de messages et un système de listener basé sur l'annotation `@KafkaListener` pour la consommation, simplifiant considérablement le développement d'applications Spring Boot basées sur Kafka.

Configuration et Mise en Place

Pour commencer, ajoutez la dépendance `spring-kafka` à votre projet Spring Boot :


    org.springframework.kafka
    spring-kafka

Spring Boot auto-configure une grande partie de l'infrastructure nécessaire dès que `spring-kafka` est détecté sur le classpath. Il configure notamment :

  • Un `KafkaAdmin` pour gérer les topics (ex: création automatique si configurée).
  • Un `ProducerFactory` et un `ConsumerFactory` pour créer les instances de producteurs et consommateurs Kafka.
  • Un `KafkaTemplate` prêt à être injecté pour envoyer des messages.
  • Une infrastructure pour détecter et gérer les méthodes annotées avec `@KafkaListener`.

La configuration essentielle se fait via le fichier `application.properties` ou `application.yml`. La propriété la plus importante est l'adresse des serveurs Kafka (brokers) :

# Adresse(s) des brokers Kafka
spring.kafka.bootstrap-servers=localhost:9092

# Configuration du producteur (exemple)
# Sérialiseurs pour la clé et la valeur du message
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

# Configuration du consommateur (exemple)
# ID de groupe unique pour ce consommateur
spring.kafka.consumer.group-id=mon-groupe-consommateurs
# Désérialiseurs correspondants
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# Indique à Jackson quels packages sont sûrs pour la désérialisation JSON
spring.kafka.consumer.properties.spring.json.trusted.packages=com.exemple.myapp.model
# Comportement si aucun offset initial n'est trouvé pour le groupe
spring.kafka.consumer.auto-offset-reset=earliest

Le choix des sérialiseurs et désérialiseurs (`key-serializer`, `value-serializer`, `key-deserializer`, `value-deserializer`) est crucial. Ils doivent correspondre entre le producteur et le consommateur. Spring Kafka fournit `JsonSerializer` et `JsonDeserializer` (basés sur Jackson) pour faciliter l'envoi/réception d'objets Java en JSON. `StringSerializer`/`StringDeserializer` sont utilisés pour les messages texte simples.

Produire des messages avec `KafkaTemplate`

Le `KafkaTemplate` est l'outil principal fourni par `spring-kafka` pour envoyer des messages aux topics Kafka. Il encapsule un `KafkaProducer` natif et gère son cycle de vie, tout en offrant des méthodes d'envoi simplifiées.

Comme il est auto-configuré par Spring Boot, vous pouvez l'injecter directement dans vos beans :

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.kafka.support.SendResult;
import java.util.concurrent.CompletableFuture;

@Service
public class KafkaProducerService {

    // KafkaTemplate - Types définis par les sérialiseurs configurés
    private final KafkaTemplate kafkaTemplate;

    @Autowired
    public KafkaProducerService(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendSimpleMessage(String topic, String message) {
        System.out.println("Envoi du message simple vers '" + topic + "': " + message);
        kafkaTemplate.send(topic, message); // Envoi simple (fire-and-forget par défaut)
    }

    public void sendOrderEvent(String topic, String orderId, Order order) {
        System.out.println("Envoi de l'événement Order vers '" + topic + "' pour ID: " + orderId);
        // La clé (orderId) est utilisée par Kafka pour déterminer la partition
        // L'objet Order sera sérialisé (ex: en JSON par JsonSerializer)
        CompletableFuture> future = kafkaTemplate.send(topic, orderId, order);
        
        // Gestion optionnelle du résultat de l'envoi (asynchrone)
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("Message envoyé avec succès vers: " + 
                                   result.getRecordMetadata().topic() + "/" + 
                                   result.getRecordMetadata().partition() + "/" + 
                                   result.getRecordMetadata().offset());
            } else {
                System.err.println("Echec de l'envoi du message: " + ex.getMessage());
            }});
}
    
 // Exemple d'une classe modèle
    public static class Order {
        private String id;
        private String product;
        private int quantity;
        // getters, setters, constructeurs...
         public String getId() { return id; }
         public void setId(String id) { this.id = id; }
         public String getProduct() { return product; }
         public void setProduct(String product) { this.product = product; }
         public int getQuantity() { return quantity; }
         public void setQuantity(int quantity) { this.quantity = quantity; }
         public Order(String id, String product, int quantity) { this.id=id; this.product=product; this.quantity=quantity; }
         public Order() {}
    }
}

La méthode `send(String topic, K key, V value)` est la plus couramment utilisée. Le `topic` est la destination, la `key` (optionnelle, peut être `null`) est utilisée par Kafka pour choisir la partition (les messages avec la même clé vont généralement dans la même partition, garantissant l'ordre pour cette clé), et `value` est le contenu du message (le payload). L'objet `value` sera sérialisé en utilisant le `value-serializer` configuré.

L'envoi est asynchrone par défaut. La méthode `send` retourne un `CompletableFuture>` que vous pouvez utiliser pour être notifié lorsque le message a été acquitté par le broker Kafka ou si une erreur s'est produite.

Consommer des messages avec `@KafkaListener`

Pour consommer des messages, `spring-kafka` propose l'annotation `@KafkaListener`, qui simplifie énormément la mise en place d'un consommateur Kafka. Il suffit d'annoter une méthode dans un bean Spring.

Les attributs clés de `@KafkaListener` sont :

  • `topics` : Un tableau de noms de topics à écouter.
  • `topicPattern` : Alternativement, un pattern regex pour écouter tous les topics correspondants.
  • `groupId` : L'identifiant du groupe de consommateurs. C'est essentiel. Tous les consommateurs avec le même `groupId` font partie du même groupe. Kafka répartit les partitions d'un topic entre les consommateurs d'un même groupe. Ainsi, chaque message d'une partition n'est traité que par un seul consommateur du groupe. Si vous lancez plusieurs instances de votre application avec le même `groupId`, elles se partageront la charge.
  • `containerFactory` : Permet de spécifier une factory de conteneurs de listeners personnalisée si vous avez besoin de configurations spécifiques (filtrage, gestion d'erreurs avancée, etc.). Par défaut, Spring Boot en configure une.

Exemple d'un composant consommateur :

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.kafka.support.KafkaHeaders;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.support.Acknowledgment;

@Component
public class KafkaConsumerService {

    // Ecoute simple sur le topic "simple-topic"
    @KafkaListener(topics = "simple-topic", groupId = "group-1")
    public void listenSimpleTopic(String message) {
        System.out.println("Message reçu de 'simple-topic': " + message);
        // Traitement...
    }

    // Ecoute sur le topic "order-events-topic" et désérialise l'objet Order
    // (Nécessite le JsonDeserializer configuré et spring.json.trusted.packages)
    @KafkaListener(topics = "order-events-topic", groupId = "order-processors", 
                   containerFactory = "kafkaListenerContainerFactory") // Utilise la factory par défaut si non spécifié
    public void listenOrderEvents(@Payload KafkaProducerService.Order order,
                                  @Header(KafkaHeaders.RECEIVED_KEY) String key,
                                  @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
                                  @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                  @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
        
        System.out.println("Evénement Order reçu (clé: " + key + ") sur " + topic + "-" + partition + ":");
        System.out.println("  ID: " + order.getId() + ", Produit: " + order.getProduct() + ", Qté: " + order.getQuantity());
        // Traitement...
    }

    // Accès à l'enregistrement complet et gestion manuelle de l'acquittement (si configuré)
    @KafkaListener(topics = "manual-ack-topic", groupId = "manual-ack-group", 
                   containerFactory = "manualAckKafkaListenerContainerFactory") // Nécessite une factory spécifique
    public void listenWithManualAck(ConsumerRecord record, Acknowledgment acknowledgment) {
        System.out.println("Reçu (manuel ack): Partition=" + record.partition() + 
                           ", Offset=" + record.offset() + ", Valeur=" + record.value());
        try {
            // Traitement métier...
            // Si succès:
            acknowledgment.acknowledge(); // Acquitter manuellement le message
            System.out.println("Message acquitté manuellement.");
        } catch (Exception e) {
            System.err.println("Erreur lors du traitement, message non acquitté: " + e.getMessage());
            // Ne pas appeler acknowledge() pour que le message soit potentiellement retraité
            // ou géré par une politique d'erreur (ex: Dead Letter Topic)
        }
    }
}

Spring Kafka utilise le `value-deserializer` configuré pour convertir le tableau d'octets reçu de Kafka en type de paramètre de votre méthode (`String`, `Order`, etc.). Vous pouvez accéder aux métadonnées du message (clé, partition, topic, timestamp) via les annotations `@Header` en utilisant les constantes de `KafkaHeaders`. Il est aussi possible d'injecter le `ConsumerRecord` complet ou un `Acknowledgment` pour gérer manuellement les offsets (moins courant avec Spring Boot, qui gère généralement l'acquittement automatiquement).

Sérialisation, Désérialisation et Gestion d'Erreurs

La configuration correcte et cohérente des sérialiseurs/désérialiseurs entre producteurs et consommateurs est primordiale. L'utilisation de `JsonSerializer` et `JsonDeserializer` est très courante pour échanger des objets complexes. Assurez-vous que la propriété `spring.kafka.consumer.properties.spring.json.trusted.packages` est définie pour indiquer à Jackson quels packages il peut désérialiser, pour des raisons de sécurité.

La gestion des erreurs dans les consommateurs `@KafkaListener` est importante. Si votre méthode lève une exception non gérée, par défaut, `spring-kafka` va tenter de retraiter le message (via un `DefaultErrorHandler`). Si les tentatives échouent, le message peut être ignoré ou, mieux, envoyé vers un 'Dead Letter Topic' (DLT) pour analyse ultérieure. Vous pouvez configurer ce comportement via les propriétés Kafka ou en fournissant des beans `ErrorHandler` personnalisés dans votre configuration. Une gestion robuste des erreurs est essentielle pour ne pas bloquer le traitement des partitions et pour ne pas perdre de messages.

Spring for Apache Kafka offre une intégration transparente et puissante, permettant aux développeurs Spring Boot de tirer parti de la puissance de Kafka sans se perdre dans la complexité de ses API natives, tout en bénéficiant des facilités de configuration et du modèle de programmation familier de Spring.