
Exemples pratiques : streaming de fichiers, traitement de données en temps réel
Illustrations concretes de l'utilisation des streams Node.js : copie et service de gros fichiers, traitement et transformation de donnees en temps reel (CSV vers JSON).
Mettre la theorie en pratique : la puissance des streams en action
Comprendre les concepts des streams Readable, Writable, Duplex, Transform, ainsi que la gestion des événements et la connexion via `pipe()` est essentiel. Cependant, la véritable valeur de ces concepts se révèle lorsqu'on les applique à des problèmes concrets. Les streams sont particulièrement adaptés à deux grands types de scénarios : la manipulation efficace de fichiers volumineux et le traitement de données arrivant en continu (temps réel).
Dans ce chapitre, nous allons explorer quelques exemples pratiques et courants qui illustrent comment les streams Node.js résolvent élégamment ces défis, en mettant en avant les gains en performance et en gestion des ressources.
Cas 1 : Streaming efficace de fichiers volumineux
Problème : Copier un fichier vidéo de plusieurs gigaoctets d'un emplacement à un autre sans saturer la mémoire vive du serveur.
Solution avec Streams : Utiliser `fs.createReadStream` pour lire le fichier source et `fs.createWriteStream` pour écrire dans la destination, connectés par `pipe()`. Node.js gérera le transfert par petits morceaux et la contre-pression.
const fs = require('fs');
const { pipeline } = require('stream/promises'); // Utilisation de la version Promesse pour une meilleure gestion
async function copyLargeFile(sourcePath, destinationPath) {
console.log(`Démarrage de la copie de ${sourcePath} vers ${destinationPath}`);
try {
const readStream = fs.createReadStream(sourcePath);
const writeStream = fs.createWriteStream(destinationPath);
// Utilisation de pipeline pour une gestion robuste des erreurs et du nettoyage
await pipeline(readStream, writeStream);
console.log('Copie terminée avec succès !');
} catch (err) {
console.error('Erreur lors de la copie du fichier :', err);
// En cas d'erreur, pipeline s'assure que les flux sont fermés
// On pourrait vouloir supprimer le fichier destination potentiellement incomplet
try {
await fs.promises.unlink(destinationPath);
console.log('Fichier destination incomplet supprimé.');
} catch (unlinkErr) {
// Ignorer si la suppression échoue (le fichier n'existait peut-être pas)
}
}
}
// Exemple d'appel
copyLargeFile('video_source.mp4', 'video_destination.mp4');Avantages : Consommation mémoire minimale, indépendante de la taille du fichier. La copie s'effectue de manière fluide même pour des fichiers très volumineux.
Problème : Servir un fichier vidéo volumineux via une requête HTTP sans charger tout le fichier en mémoire et en permettant au client de commencer la lecture rapidement (pseudo-streaming).
Solution avec Streams : Créer un serveur HTTP, utiliser `fs.createReadStream` pour lire le fichier demandé, et `pipe()` ce flux directement dans l'objet `response` (qui est un stream Writable).
const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream/promises');
const server = http.createServer(async (req, res) => {
if (req.url === '/video') {
const videoPath = 'ma_grosse_video.mp4';
try {
const { size } = await fs.promises.stat(videoPath);
res.writeHead(200, {
'Content-Type': 'video/mp4',
'Content-Length': size
});
const videoStream = fs.createReadStream(videoPath);
await pipeline(videoStream, res); // Pipe le fichier vers la réponse
} catch (err) {
console.error('Erreur lors du streaming vidéo :', err);
if (!res.headersSent) {
res.writeHead(err.code === 'ENOENT' ? 404 : 500);
}
res.end(err.code === 'ENOENT' ? 'Vidéo non trouvée' : 'Erreur serveur');
}
} else {
res.writeHead(404);
res.end('Not Found');
}
});
server.listen(3000, () => {
console.log('Serveur écoutant sur http://localhost:3000/video');
});Avantages : Le serveur n'utilise que très peu de mémoire. La réponse commence immédiatement, permettant au navigateur de démarrer la lecture de la vidéo bien avant la fin du téléchargement. Gestion robuste grâce à `pipeline`.
Cas 2 : Traitement de donnees en temps reel ou en flux
Problème : Recevoir un flux de données JSON (par exemple, depuis une API externe ou un WebSocket), où chaque objet JSON arrive potentiellement en plusieurs chunks TCP. Il faut les assembler, les parser, puis les traiter (par exemple, les logger).
Solution avec Streams : Utiliser un stream Transform personnalisé pour bufferiser les chunks entrants, détecter les objets JSON complets, les parser, puis pousser les objets JavaScript résultants.
const { Transform } = require('stream');
class JsonParser extends Transform {
constructor(options = {}) {
options.readableObjectMode = true; // Ce stream émet des objets JS
super(options);
this._buffer = '';
this._separator = '\n'; // Supposons que les objets JSON sont séparés par des sauts de ligne
}
_transform(chunk, encoding, callback) {
this._buffer += chunk.toString();
let boundary = this._buffer.indexOf(this._separator);
while (boundary !== -1) {
const jsonString = this._buffer.substring(0, boundary);
this._buffer = this._buffer.substring(boundary + this._separator.length);
if (jsonString) {
try {
const jsonObject = JSON.parse(jsonString);
this.push(jsonObject); // Pousse l'objet JS parsé
} catch (err) {
return callback(new Error(`Erreur de parsing JSON: ${err.message}`));
}
}
boundary = this._buffer.indexOf(this._separator);
}
callback(); // Prêt pour le prochain chunk
}
// Gère les données restantes dans le buffer à la fin
_flush(callback) {
if (this._buffer) {
try {
const jsonObject = JSON.parse(this._buffer);
this.push(jsonObject);
} catch (err) {
return callback(new Error(`Erreur de parsing JSON (flush): ${err.message}`));
}
}
callback();
}
}
// --- Utilisation ---
const { Readable } = require('stream');
// Simuler un flux de données JSON entrant
const sourceStream = Readable.from([
'{"id":1, "value":"abc"}\n',
'{"id":2, "value":"de',
'f"}\n{"id":3, "value":"ghi"}\n'
]);
const jsonParser = new JsonParser();
sourceStream
.pipe(jsonParser)
.on('data', (jsonObject) => {
console.log('Objet JSON reçu:', jsonObject);
// Ici, on pourrait insérer l'objet en BDD, l'envoyer ailleurs, etc.
})
.on('error', (err) => console.error('Erreur:', err))
.on('finish', () => console.log('Traitement terminé.'));
Avantages : Traitement au fil de l'eau, résilient aux fragmentations des données. Le stream Transform encapsule la logique de parsing de manière réutilisable.
Problème : Lire un gros fichier CSV contenant des données utilisateur, filtrer les utilisateurs actifs, et écrire le résultat dans un nouveau fichier au format JSON Lines (un objet JSON par ligne).
Solution avec Streams : Enchaîner (`pipe` ou `pipeline`) un `fs.createReadStream`, un stream Transform pour parser les lignes CSV, un autre Transform pour filtrer les utilisateurs actifs et formater en JSON, et enfin un `fs.createWriteStream`.
const fs = require('fs');
const { Transform, pipeline } = require('stream');
const { parse } = require('csv-parse'); // Module externe pour parser le CSV
// Transform pour filtrer et formater
class FilterAndFormat extends Transform {
constructor(options = {}) {
options.objectMode = true; // Accepte des objets (du parseur CSV)
super(options);
}
_transform(userRecord, encoding, callback) {
// Supposons que userRecord est un objet { name: '...', email: '...', active: 'true'/'false' }
if (userRecord.active === 'true') {
const outputUser = { name: userRecord.name, email: userRecord.email };
this.push(JSON.stringify(outputUser) + '\n'); // Pousse une ligne JSON
}
callback();
}
}
// Configuration du parseur CSV
const csvParser = parse({ columns: true, skip_empty_lines: true });
const filterFormat = new FilterAndFormat();
// Utilisation de pipeline pour l'ensemble
pipeline(
fs.createReadStream('users.csv'),
csvParser, // Transform 1: CSV string -> Object
filterFormat, // Transform 2: Object -> JSON string (filtré)
fs.createWriteStream('active_users.jsonl'),
(err) => {
if (err) {
console.error('Pipeline CSV->JSONL échoué:', err);
} else {
console.log('Pipeline CSV->JSONL terminé avec succès.');
}
}
);
Avantages : Pipeline de traitement complet réalisé avec une consommation mémoire minimale. Chaque étape est clairement définie par un stream, rendant le code modulaire et lisible. Utilisation d'un module externe (`csv-parse`) qui est lui-même un stream Transform.
Conclusion : La polyvalence des streams
Ces exemples ne sont qu'un aperçu des possibilités offertes par les streams en Node.js. Que ce soit pour optimiser les opérations sur fichiers, gérer les communications réseau, ou construire des pipelines de transformation de données complexes, les streams fournissent une abstraction puissante et efficace.
Maîtriser leur utilisation, comprendre leurs événements et savoir comment les connecter (idéalement avec `stream.pipeline` pour la robustesse) est une compétence clé pour développer des applications Node.js performantes, scalables et économes en ressources, capables de gérer sereinement des volumes importants de données ou des flux en temps réel.