
Lecture et écriture de données avec les streams
Apprenez comment lire les donnees des streams Readable (modes flowing et paused) et ecrire dans les streams Writable en Node.js, en gerant la contre-pression (backpressure).
Interagir avec les flux : lire les donnees entrantes
Maintenant que nous connaissons les différents types de streams, il est temps de voir comment interagir concrètement avec eux. Commençons par la lecture des données depuis un stream Readable. Node.js propose deux modes principaux pour consommer ces données : le mode "flowing" (fluide) et le mode "paused" (en pause).
Le choix du mode dépend de la manière dont vous souhaitez traiter les données. Le mode flowing est plus simple pour une consommation directe, tandis que le mode paused offre un contrôle plus fin sur le flux de données, ce qui est essentiel dans certains scénarios, notamment lors de l'implémentation de streams Transform ou Duplex, ou lors de l'utilisation conjointe avec `async/await` et les itérateurs asynchrones.
Mode Flowing : la reception automatique des donnees
Le mode flowing est activé dès qu'un écouteur est attaché à l'événement `data`. Dans ce mode, le stream Readable pousse activement les morceaux de données (chunks) vers votre application dès qu'ils sont disponibles. Votre code réagit à ces événements pour traiter les données reçues.
Les événements clés en mode flowing sont :
- `data` : Emis chaque fois qu'un chunk de données est disponible. Le chunk est passé en argument du callback.
- `end` : Emis une seule fois, lorsque la source n'a plus de données à fournir.
- `error` : Emis si une erreur survient pendant la lecture.
- `close` : Emis lorsque le flux et sa ressource sous-jacente (ex: descripteur de fichier) sont fermés.
Voici un exemple simple de lecture depuis l'entrée standard (`process.stdin`), qui est un stream Readable en mode flowing par défaut :
process.stdin.on('data', (chunk) => {
console.log(`Données reçues : ${chunk.toString()}`);
});
process.stdin.on('end', () => {
console.log('Fin du flux.');
});
console.log('Entrez du texte (Ctrl+D pour terminer) :');L'inconvénient potentiel du mode flowing est que si le consommateur (votre code traitant l'événement `data`) est plus lent que la source, les données peuvent s'accumuler en mémoire, menant à une consommation excessive. C'est pourquoi `pipe()` ou le mode paused sont souvent préférables pour les flux rapides ou volumineux.
Mode Paused : le controle explicite de la lecture
Le mode paused est le mode par défaut pour la plupart des streams Readable (sauf `process.stdin`). Dans ce mode, les données ne sont pas poussées automatiquement. Vous devez explicitement demander des données en appelant la méthode `read([size])`.
L'événement clé ici est `readable`. Il est émis lorsque de nouvelles données sont disponibles dans le buffer interne du stream et peuvent être lues via `read()`. La lecture se fait typiquement dans une boucle `while` à l'intérieur du gestionnaire `readable`, tant que `read()` retourne des données (et non `null`).
Exemple de lecture d'un fichier en mode paused :
const fs = require('fs');
const readableStream = fs.createReadStream('mon_fichier.txt', { encoding: 'utf8' });
readableStream.on('readable', () => {
let chunk;
console.log('--> Stream lisible');
// Lire tant qu'il y a des données disponibles
while (null !== (chunk = readableStream.read())) {
console.log(`Chunk lu (${chunk.length} octets): ${chunk}`);
}
});
readableStream.on('end', () => {
console.log('--> Fin de la lecture du fichier.');
});
readableStream.on('error', (err) => {
console.error('Erreur de lecture:', err);
});Le mode paused donne un contrôle total sur le moment et la quantité de données lues, évitant ainsi la saturation du consommateur. C'est ce mode qui est utilisé en interne par la méthode `pipe()` pour gérer la contre-pression.
Ecrire des donnees dans les streams Writable
Pour envoyer des données vers une destination, on utilise un stream Writable. Les deux méthodes principales sont `write()` et `end()`.
write(chunk, [encoding], [callback]): Envoie un morceau de données (chunk) au stream. Le chunk peut être une chaîne de caractères, un Buffer ou tout autre type de données supporté. Un callback optionnel peut être fourni, qui sera appelé une fois ce chunk spécifique traité par le système sous-jacent. Cette méthode retourne `true` si le buffer interne du stream a encore de la place, et `false` si le buffer est plein.end([chunk], [encoding], [callback]): Signale qu'il n'y a plus de données à écrire. Peut éventuellement envoyer un dernier chunk. Une fois toutes les données écrites et flushées vers la destination, le stream émet l'événement `finish`.
Un aspect crucial de l'écriture est la gestion de la contre-pression (backpressure). Si vous appelez `write()` de manière répétée sans vérifier sa valeur de retour lorsque la destination est lente (par exemple, une connexion réseau lente ou un disque dur occupé), vous risquez de remplir le buffer interne du stream Writable et de consommer beaucoup de mémoire. Lorsque `write()` retourne `false`, cela signifie que vous devriez arrêter d'écrire temporairement et attendre que le stream émette l'événement `drain`. Cet événement signale que le buffer s'est vidé et que vous pouvez reprendre l'écriture.
Exemple d'écriture dans un fichier avec gestion simple de la contre-pression :
const fs = require('fs');
const writableStream = fs.createWriteStream('output.txt');
let i = 0;
function writeLotsOfData() {
let ok = true;
do {
i++;
const data = `Ligne numéro ${i}\n`;
if (i === 100000) {
// Dernière écriture
writableStream.end(data, 'utf8', () => {
console.log('Ecriture terminée !');
});
} else {
// Continuer l'écriture
// Ecrire et vérifier si on doit attendre (backpressure)
ok = writableStream.write(data, 'utf8');
if (!ok) {
console.log('Contre-pression détectée, attente de \'drain\'...');
}
}
} while (i < 100000 && ok);
if (i < 100000) {
// Si la boucle s'est arrêtée à cause de ok === false,
// attendre 'drain' pour reprendre.
writableStream.once('drain', () => {
console.log('Evénement \'drain\' reçu, reprise de l\'écriture.');
writeLotsOfData();
});
}
}
writableStream.on('finish', () => {
console.log('Toutes les données ont été écrites (événement finish).');
});
writableStream.on('error', (err) => {
console.error('Erreur d\'écriture:', err);
});
writeLotsOfData(); // Démarrer l'écritureBien que ce mécanisme puisse sembler complexe, il est fondamental pour créer des applications robustes. Heureusement, la méthode `pipe()` gère automatiquement la contre-pression pour nous dans la plupart des cas.
Simplifier avec pipe() : la connexion ideale
La méthode la plus courante et la plus simple pour lire depuis un Readable stream et écrire dans un Writable stream est d'utiliser `readableStream.pipe(writableStream)`. Cette unique ligne de code accomplit plusieurs tâches importantes :
- Elle lit les données du `readableStream`.
- Elle les écrit dans le `writableStream`.
- Elle gère automatiquement la contre-pression : si le `writableStream` devient lent, `pipe()` met en pause le `readableStream` et attend l'événement `drain` avant de reprendre.
- Elle propage les erreurs : par défaut, si une erreur se produit sur le `readableStream`, le `writableStream` est détruit.
- Elle gère la fin du flux : lorsque le `readableStream` émet `end`, `pipe()` appelle automatiquement `writableStream.end()`.
Exemple de copie de fichier ultra-simplifiée avec `pipe()` :
const fs = require('fs');
const readable = fs.createReadStream('gros_fichier_source.mov');
const writable = fs.createWriteStream('copie_fichier_destination.mov');
readable.pipe(writable);
readable.on('error', (err) => console.error('Erreur lecture:', err));
writable.on('error', (err) => console.error('Erreur écriture:', err));
writable.on('finish', () => console.log('Copie terminée !'));Grâce à `pipe()`, la lecture et l'écriture entre streams deviennent triviales et efficaces, encapsulant la complexité de la gestion des modes de lecture et de la contre-pression. C'est l'outil de choix pour connecter des flux en Node.js.