Contactez-nous

Fan-out, fan-in pattern

Découvrez le Fan-out, Fan-in pattern en Go : distribution de travail, agrégation de résultats, parallélisation et optimisation de la performance pour des applications Go concurrentes.

Introduction au Fan-out, Fan-in Pattern : Distribution et agrégation concurrentes

Le Fan-out, Fan-in pattern est un design pattern de concurrence puissant et polyvalent en Go, permettant de traiter des tâches de manière parallèle en distribuant le travail à un ensemble de goroutines (fan-out) et en aggrégeant les résultats de ces goroutines dans un channel unique (fan-in). Ce pattern est particulièrement utile pour paralléliser des opérations indépendantes, améliorer la performance et optimiser l'utilisation des ressources dans les applications concurrentes.

Imaginez une tâche complexe qui peut être divisée en plusieurs sous-tâches indépendantes. Avec le Fan-out, Fan-in pattern, vous pouvez lancer un groupe de goroutines (les workers) pour exécuter ces sous-tâches en parallèle (fan-out). Chaque worker travaille sur une partie du problème, et une fois qu'ils ont terminé, leurs résultats sont collectés et combinés (fan-in) pour produire le résultat final. Ce pattern permet de tirer pleinement parti des processeurs multi-coeurs et d'accélérer significativement le temps d'exécution global.

Ce chapitre explore en profondeur le Fan-out, Fan-in pattern en Go. Nous allons détailler les mécanismes du fan-out (distribution du travail) et du fan-in (agrégation des résultats), comment les implémenter en Go en utilisant des channels et des goroutines, les avantages qu'ils offrent en termes de parallélisation et de performance, les cas d'utilisation typiques, et les bonnes pratiques pour concevoir et utiliser efficacement le Fan-out, Fan-in pattern dans vos applications Go. Que vous soyez novice ou expérimenté, ce guide complet vous fournira les clés pour maîtriser ce pattern essentiel de la programmation concurrente parallèle en Go.

Fan-out : Distribution du travail à plusieurs goroutines workers

La partie Fan-out du pattern consiste à distribuer une charge de travail à plusieurs goroutines worker qui vont exécuter les tâches en parallèle. Le fan-out permet de diviser le travail initial en sous-tâches plus petites et indépendantes, et de les confier à un groupe de workers pour un traitement parallèle.

Principe du Fan-out :

  • Channel d'entrée (Input Channel) : Une goroutine émettrice (souvent la goroutine principale ou une goroutine dispatcher) produit ou reçoit un flux de tâches à exécuter et les envoie sur un channel d'entrée (input channel). Le channel d'entrée sert de source de travail pour les workers.
  • Groupe de goroutines workers : Un groupe de goroutines worker est lancé pour consommer les tâches du channel d'entrée. Chaque worker reçoit les tâches du channel d'entrée et les exécute en parallèle avec les autres workers. Le nombre de workers dans le groupe détermine le niveau de parallélisme.
  • Distribution des tâches : Les workers reçoivent les tâches du channel d'entrée de manière concurrente. Go assure une distribution équitable des tâches entre les workers disponibles : le premier worker disponible qui est prêt à recevoir une tâche sera servi.

Exemple de Fan-out : Distribution de tâches de calcul à des workers :

package main

import (
    "fmt"
    "sync"
)

func workerFanOut(id int, entrees <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for n := range entrees {
        // Simuler un calcul intensif sur le nombre 'n'
        resultat := n * n * n
        fmt.Printf("Worker %d : Calcul de %d^3 = %d\n", id, n, resultat)
    }
}

func main() {
    nombreWorkers := 3
    jobsChan := make(chan int) // Channel pour distribuer les tâches (nombres à calculer)
    var wg sync.WaitGroup

    // Lancement du Fan-out : création du pool de workers
    for w := 1; w <= nombreWorkers; w++ {
        wg.Add(1)
        go workerFanOut(w, jobsChan, &wg)
    }

    // Soumission des tâches au Fan-out : envoi des nombres à calculer sur le channel 'jobsChan'
    for i := 1; i <= 10; i++ {
        jobsChan <- i
    }
    close(jobsChan) // Fermeture du channel 'jobsChan' : signal de fin de tâches

    // Attente de la fin de tous les workers (synchronisation)
    wg.Wait()
    fmt.Println("Fan-out terminé : Tous les workers ont fini de traiter les tâches.")
}

Dans cet exemple :

  • Un channel jobsChan de type chan int est créé pour servir de channel d'entrée des tâches (nombres à calculer).
  • Un pool de 3 goroutines worker (workerFanOut) est lancé. Chaque worker reçoit les nombres à calculer depuis le channel jobsChan.
  • La fonction main soumet 10 tâches (les nombres de 1 à 10) au Fan-out en envoyant ces nombres sur le channel jobsChan. Le channel jobsChan sert de file d'attente de tâches.
  • Les workers, en parallèle, consomment les nombres du channel jobsChan et effectuent le calcul (mise au cube) de chaque nombre. Les tâches sont distribuées entre les 3 workers disponibles.
  • Le fan-out est réalisé par la distribution des tâches à plusieurs workers via le channel jobsChan, permettant de paralléliser le traitement des calculs.

Le fan-out est un pattern fondamental pour la distribution et la parallélisation du travail en Go, permettant de tirer parti de la concurrence pour accélérer le traitement des tâches.

Fan-in : Agrégation des résultats de multiples goroutines en un seul channel

La partie Fan-in du pattern consiste à aggréger les résultats produits par plusieurs goroutines worker dans un channel unique de sortie. Le fan-in permet de collecter et de combiner les résultats du traitement parallèle effectué par les workers, pour les transmettre à une étape ultérieure du traitement ou au code client.

Principe du Fan-in :

  • Plusieurs channels d'entrée (Input Channels) : Chaque goroutine worker (ou groupe de goroutines worker) produit des résultats et les envoie sur son propre channel d'entrée (input channel). Il y a donc plusieurs channels d'entrée, chacun provenant d'une source de résultats différente (d'un worker ou d'un groupe de workers).
  • Channel de sortie unique (Output Channel) : Une goroutine fan-in (ou multiplexeur) est créée pour recevoir les résultats depuis tous les channels d'entrée et les aggréger (les retransmettre) sur un channel de sortie unique (output channel). Le channel de sortie unique sert de point de convergence pour tous les résultats.
  • Multiplexage et agrégation : La goroutine fan-in utilise généralement une instruction select pour multiplexer la réception depuis tous les channels d'entrée. Lorsqu'un résultat est disponible sur l'un des channels d'entrée, la goroutine fan-in le reçoit et l'envoie sur le channel de sortie unique. Elle continue à multiplexer et à agréger les résultats jusqu'à ce que toutes les sources de résultats aient terminé d'envoyer leurs données.

Exemple de Fan-in : Agrégation des résultats de workers dans un channel unique :

package main

import (
    "fmt"
    "sync"
)

func workerFanIn(id int, entrees <-chan int, sorties chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for n := range entrees {
        // Simuler un calcul et envoyer le résultat sur le channel 'sorties'
        resultat := n * 2
        sorties <- resultat // Envoi du résultat sur le channel 'sorties' (fan-in)
        fmt.Printf("Worker %d : Envoi du résultat %d sur le channel de sortie\n", id, resultat)
    }
}

func fanIn(canaux ...<-chan int) <-chan int {
    canalSortie := make(chan int) // Channel de sortie unique (fan-in)
    var wg sync.WaitGroup

    // Lancement d'une goroutine pour chaque channel d'entrée (multiplexage et fan-in)
    for _, canalEntree := range canaux {
        wg.Add(1)
        go func(canal <-chan int) {
            defer wg.Done()
            for val := range canal {
                canalSortie <- val // Fan-in : réception depuis le channel d'entrée et envoi vers le channel de sortie unique
            }
        }(canalEntree)
    }

    // Goroutine qui ferme le channel de sortie unique lorsque tous les channels d'entrée sont fermés et que tous les workers ont terminé
    go func() {
        wg.Wait()         // Attendre la fin de toutes les goroutines de fan-in
        close(canalSortie) // Fermer le channel de sortie unique après l'agrégation de tous les résultats
    }()
    return canalSortie
}

func main() {
    canalEntree1 := make(chan int)
    canalEntree2 := make(chan int)
    canalEntree3 := make(chan int)

    // Fan-in : Agrégation des résultats de canalEntree1, canalEntree2 et canalEntree3 dans un channel unique 'canalSortie'
    canalSortie := fanIn(canalEntree1, canalEntree2, canalEntree3)

    var wgWorkers sync.WaitGroup
    wgWorkers.Add(3)
    go workerFanIn(1, canalEntree1, canalSortie, &wgWorkers)
    go workerFanIn(2, canalEntree2, canalSortie, &wgWorkers)
    go workerFanIn(3, canalEntree3, canalSortie, &wgWorkers)

    // Alimentation des workers (envoi de données sur les channels d'entrée)
    for i := 1; i <= 5; i++ {
        canalEntree1 <- i
        canalEntree2 <- i * 10
        canalEntree3 <- i * 100
    }
    close(canalEntree1)
    close(canalEntree2)
    close(canalEntree3)

    // Consommation des résultats agrégés depuis le channel 'canalSortie' (fan-in)
    fmt.Println("Résultats agrégés (Fan-in) :")
    for resultat := range canalSortie {
        fmt.Println("Résultat :", resultat)
    }

    wgWorkers.Wait()
    fmt.Println("Fan-in terminé : Tous les workers ont fini d'envoyer les résultats.")
}

Dans cet exemple :

  • Trois channels d'entrée canalEntree1, canalEntree2, canalEntree3 sont créés.
  • Trois goroutines worker (workerFanIn) sont lancées. Chaque worker reçoit des nombres sur son channel d'entrée respectif, effectue un calcul (multiplication par 2), et envoie le résultat sur un channel de sortie commun (canalSortie).
  • La fonction fanIn implémente le pattern fan-in. Elle prend un nombre variable de channels d'entrée (canaux ...<-chan int) et retourne un channel de sortie unique canalSortie.
  • A l'intérieur de fanIn, une goroutine est lancée pour chaque channel d'entrée. Chaque goroutine de fan-in itère sur son channel d'entrée et retransmet chaque valeur reçue sur le channel de sortie unique canalSortie. Le sync.WaitGroup assure que le channel canalSortie n'est fermé qu'après que toutes les goroutines de fan-in ont terminé de retransmettre les résultats.
  • La fonction main alimente les workers en envoyant des données sur les channels d'entrée canalEntree1, canalEntree2 et canalEntree3, puis consomme les résultats agrégés depuis le channel de sortie unique canalSortie (fan-in).

Le fan-in est un pattern essentiel pour l'agrégation de résultats concurrents en Go, permettant de combiner les sorties de multiples goroutines en un flux de données unique pour un traitement ultérieur.

Combiner Fan-out et Fan-in : Parallélisation et agrégation complètes

Le Fan-out et le Fan-in pattern sont souvent utilisés ensemble pour réaliser des traitements de données concurrents plus complexes. Vous pouvez combiner le fan-out pour distribuer une charge de travail à plusieurs workers en parallèle, et le fan-in pour aggréger les résultats de ces workers dans un channel unique.

Scénario typique : Fan-out, Fan-in pour le traitement parallèle de données :

  1. Fan-out (distribution) : Une goroutine dispatcher distribue un ensemble de tâches (par exemple, le traitement d'une liste de fichiers, de requêtes, etc.) à un pool de workers via un channel de tâches (job queue).
  2. Workers parallèles (traitement parallèle) : Un groupe de goroutines worker (le pool) consomment les tâches du channel de tâches et les exécutent en parallèle. Chaque worker effectue le traitement spécifique de la tâche et produit un résultat.
  3. Fan-in (agrégation) : Chaque worker, une fois qu'il a terminé de traiter une tâche, envoie le résultat sur un channel de résultats (result channel). Une goroutine fan-in (multiplexeur) reçoit les résultats de tous les workers depuis leurs channels de résultats respectifs et les aggrège dans un channel de sortie unique (output channel).
  4. Consommation des résultats agrégés : Le code client (en dehors du worker pool) consomme les résultats agrégés depuis le channel de sortie unique, récupérant ainsi les résultats du traitement parallèle de toutes les tâches.

Exemple combinant Fan-out et Fan-in : Traitement parallèle d'images avec agrégation des résultats :

package main

import (
    "fmt"
    "sync"
    "time"
)

// ... (Fonctions workerFanOut et fanIn, et type Job comme dans les exemples précédents)

// Fonction worker pour le traitement d'image (simulé)
func workerTraitementImage(id int, entrees <-chan string, sorties chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()
    for nomFichier := range entrees {
        // Simuler un traitement d'image (long)
        time.Sleep(time.Duration(id) * 500 * time.Millisecond)
        resultat := fmt.Sprintf("Image %s traitée par worker %d", nomFichier, id)
        sorties <- resultat // Fan-in : envoi du résultat sur le channel de sortie
        fmt.Println("Worker", id, ":", resultat)
    }
}

func main() {
    nomsFichiers := []string{"image1.png", "image2.png", "image3.png", "image4.png", "image5.png"}
    nombreWorkers := 4
    jobsChan := make(chan string, len(nomsFichiers)) // Channel buffered pour les noms de fichiers (tâches)
    canalResultats := make(chan string)             // Channel pour agréger les résultats (fan-in)
    var wgWorkers sync.WaitGroup

    // Fan-out : Lancement du pool de workers pour le traitement d'images
    for w := 1; w <= nombreWorkers; w++ {
        wgWorkers.Add(1)
        go workerTraitementImage(w, jobsChan, canalResultats, &wgWorkers)
    }

    // Fan-in : Agrégation des résultats des workers dans 'canalResultats'
    canalResultatsAggrege := fanIn(canalResultats, canalResultats, canalResultats, canalResultats) // On pourrait fan-in tous les résultats dans un seul canal

    // Soumission des tâches (noms de fichiers) au Fan-out
    for _, nomFichier := range nomsFichiers {
        jobsChan <- nomFichier
    }
    close(jobsChan) // Fermeture du channel 'jobsChan' : plus de tâches à venir

    // Consommation et affichage des résultats agrégés (Fan-in)
    fmt.Println("\nRésultats agrégés (Fan-in) :")
    for resultat := range canalResultatsAggrege {
        fmt.Println("-", resultat)
    }

    wgWorkers.Wait() // Attendre la fin de tous les workers
    fmt.Println("Fan-out/Fan-in terminé : Traitement parallèle des images terminé.")
}
```

Cet exemple combine le fan-out (distribution des noms de fichiers images aux workers) et le fan-in (agrégation des résultats du traitement d'image dans un channel unique). Ce pattern combiné est très puissant pour paralléliser et agréger des traitements de données complexes en Go.

Bonnes pratiques pour le Fan-out, Fan-in Pattern

Pour implémenter et utiliser efficacement le Fan-out, Fan-in pattern en Go, et écrire du code concurrent robuste et performant, voici quelques bonnes pratiques à suivre :

  • Définir clairement les étapes du traitement et le flux de données : Avant d'implémenter un pipeline Fan-out, Fan-in, analysez et décomposez clairement le processus de traitement de données en étapes logiques et indépendantes. Définissez précisément le type de données qui transite entre chaque étape et le flux global des données à travers le pipeline.
  • Choisir le bon nombre de workers pour le Fan-out : Déterminez un nombre approprié de workers pour le fan-out, en fonction de la nature des tâches, des ressources disponibles (nombre de coeurs CPU, capacité mémoire), et du niveau de parallélisation souhaité. Un nombre de workers trop faible peut limiter le parallélisme et la performance. Un nombre de workers trop élevé peut surcharger le système et dégrader les performances. Testez et benchmarkez différentes tailles de pool pour trouver le meilleur compromis.
  • Utiliser des channels buffered pour les files d'attente de tâches et de résultats : Utilisez des channels buffered pour le channel de tâches (job queue) du fan-out et pour les channels de résultats du fan-in. Les channels buffered permettent d'amortir les variations de vitesse entre les étapes du pipeline, d'éviter les blocages et d'améliorer le débit global du pipeline. Choisissez des capacités de buffer appropriées en fonction du volume de données et de la vitesse de traitement des étapes.
  • Gérer explicitement la terminaison du pipeline et la fermeture des channels : Mettez en place un mécanisme de terminaison propre du pipeline, en fermant explicitement les channels d'entrée et de sortie à chaque étape lorsque le traitement est terminé ou lorsqu'une erreur se produit. Utilisez close(channel) pour signaler la fin du flux de données et sync.WaitGroup pour attendre la terminaison de toutes les goroutines du pipeline.
  • Gérer les erreurs à chaque étape et propager les erreurs à travers le pipeline : Intégrez une gestion des erreurs robuste à chaque étape du pipeline. Propagez les erreurs à travers le pipeline via des channels d'erreurs ou en utilisant l'error wrapping, pour permettre une gestion centralisée des erreurs et un débogage facilité.
  • Documenter clairement la structure et le fonctionnement du pipeline Fan-out, Fan-in : Documentez clairement la structure de votre pipeline Fan-out, Fan-in, en décrivant les étapes, les channels, le flux de données, la gestion des erreurs, et les mécanismes de synchronisation et de terminaison. Une bonne documentation facilite la compréhension, la maintenance et l'évolution du code basé sur le pattern Fan-out, Fan-in.

En appliquant ces bonnes pratiques, vous concevrez et utiliserez le Fan-out, Fan-in pattern de manière efficace et pertinente en Go, en construisant des applications concurrentes parallèles performantes, modulaires et robustes pour le traitement de données.