
Communication entre processus (Queues, Pipes)
Découvrez comment faire communiquer des processus Python entre eux en utilisant les Queues (files d'attente) et les Pipes (tuyaux) du module 'multiprocessing'. Echangez des données de manière sûre et efficace entre processus.
Pourquoi communiquer entre processus ? Partage de données et synchronisation
Lorsque vous utilisez le module `multiprocessing` pour créer plusieurs processus, ces processus s'exécutent dans des espaces mémoire *séparés*. Ils ne partagent pas de variables globales (contrairement aux threads).
Si vous avez besoin de faire communiquer les processus entre eux (pour échanger des données, synchroniser leur exécution, etc.), vous devez utiliser des mécanismes de communication inter-processus (IPC).
Les IPC permettent aux processus :
- D'échanger des données (résultats de calculs, messages, signaux, etc.).
- De se synchroniser (par exemple, attendre qu'un processus ait terminé une tâche avant d'en commencer une autre).
- De coordonner leurs actions.
Le module `multiprocessing` fournit plusieurs mécanismes d'IPC, dont les plus courants sont les queues (`Queue`) et les tuyaux (`Pipe`).
Queues (multiprocessing.Queue) : files d'attente multi-producteurs, multi-consommateurs
Une `Queue` (file d'attente) est un mécanisme de communication inter-processus qui permet à plusieurs processus de partager des données de manière sûre (thread-safe et process-safe).
Une `Queue` fonctionne sur le principe du FIFO (First In, First Out) : le premier élément ajouté à la file d'attente est le premier élément retiré.
Les méthodes principales de `Queue` sont :
- `put(obj)` : Ajoute un objet à la file d'attente.
- `get()` : Retire et retourne un objet de la file d'attente. Si la file d'attente est vide, `get()` bloque jusqu'à ce qu'un élément soit disponible (sauf si vous spécifiez `block=False` ou un timeout).
- `empty()` : Retourne `True` si la file d'attente est vide, `False` sinon.
- `full()` : Retourne `True` si la file d'attente est pleine, `False` sinon (uniquement pertinent si la file d'attente a une taille maximale).
- `qsize()`: Retourne la taille approximative de la file.
Syntaxe :
import multiprocessing
queue = multiprocessing.Queue() # Crée une file d'attente (vide, de taille illimitée par défaut)
# Mettre des éléments dans la file d'attente
queue.put("élément 1")
queue.put(42)
queue.put([1, 2, 3])
# Récupérer des éléments de la file d'attente
element = queue.get()
print(element) # Affiche "élément 1"
element = queue.get()
print(element) # Affiche 42Vous pouvez spécifier une taille maximale pour la file d'attente lors de sa création : `queue = multiprocessing.Queue(maxsize=10)`. Si la file d'attente est pleine, `put()` bloquera jusqu'à ce qu'une place se libère (sauf si vous spécifiez `block=False` ou un timeout).
Les `Queue` sont un moyen simple et efficace de faire communiquer plusieurs processus. Elles sont particulièrement adaptées aux situations où vous avez un ou plusieurs processus "producteurs" qui génèrent des données, et un ou plusieurs processus "consommateurs" qui traitent ces données.
Exemple : producteur-consommateur avec Queue
Voici un exemple classique de communication entre processus utilisant une `Queue` : le modèle producteur-consommateur.
import multiprocessing
import time
import random
def producteur(queue, nb_elements):
"""Produit des éléments et les met dans la file d'attente."""
for i in range(nb_elements):
time.sleep(random.random()) # Simule une opération qui prend du temps
element = f"Elément {i}"
queue.put(element)
print(f"Producteur : a mis '{element}' dans la file")
queue.put(None) # Signal de fin
def consommateur(queue):
"""Consomme des éléments de la file d'attente."""
while True:
element = queue.get()
if element is None:
break # Fin du traitement
print(f"Consommateur : a reçu '{element}'")
time.sleep(random.random()) # Simule un traitement
if __name__ == '__main__':
queue = multiprocessing.Queue()
p1 = multiprocessing.Process(target=producteur, args=(queue, 5))
p2 = multiprocessing.Process(target=consommateur, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()
print("Fin du programme")Dans cet exemple :
- Un processus `producteur` génère des éléments (ici, des chaînes de caractères) et les met dans la file d'attente avec `queue.put()`.
- Un processus `consommateur` récupère les éléments de la file d'attente avec `queue.get()` et les traite.
- Le producteur envoie un signal de fin (`None`) au consommateur lorsqu'il a terminé de produire les éléments.
La `Queue` gère la synchronisation entre les processus. Si la file d'attente est vide, `get()` bloque jusqu'à ce qu'un élément soit disponible. Si la file d'attente est pleine (si elle a une taille maximale), `put()` bloque jusqu'à ce qu'une place se libère.
Pipes (multiprocessing.Pipe) : communication bidirectionnelle
Un `Pipe` (tuyau) est un mécanisme de communication inter-processus qui permet une communication *bidirectionnelle* entre *deux* processus.
Un `Pipe` a deux extrémités : une extrémité pour l'envoi et la réception, et une autre extrémité pour l'envoi et la réception. Chaque extrémité est représentée par un objet `Connection`.
La fonction `multiprocessing.Pipe()` retourne une paire d'objets `Connection` connectés par un tuyau.
Syntaxe :
import multiprocessing
connexion1, connexion2 = multiprocessing.Pipe()- `connexion1` et `connexion2` sont les deux extrémités du tuyau.
- Les objets `Connection` ont deux méthodes principales :
- `send(obj)` : Envoie un objet à l'autre extrémité du tuyau.
- `recv()` : Reçoit un objet de l'autre extrémité du tuyau. Si aucun objet n'est disponible, `recv()` bloque jusqu'à ce qu'un objet soit reçu (sauf si vous spécifiez un timeout).
Exemple :
import multiprocessing
def processus_fils(connexion):
"""Envoie un message au processus parent et reçoit une réponse."""
connexion.send("Bonjour du processus fils !")
reponse = connexion.recv()
print(f"Processus fils a reçu : {reponse}")
connexion.close()
if __name__ == '__main__':
# Crée un tuyau
connexion_parent, connexion_fils = multiprocessing.Pipe()
# Crée un processus fils
processus = multiprocessing.Process(target=processus_fils, args=(connexion_fils,))
processus.start()
# Reçoit le message du processus fils
message = connexion_parent.recv()
print(f"Processus parent a reçu : {message}")
# Envoie une réponse au processus fils
connexion_parent.send("Bonjour du processus parent !")
processus.join()
connexion_parent.close()Dans cet exemple :
- Un `Pipe` est créé avec `multiprocessing.Pipe()`, retournant deux objets `Connection`.
- Un processus fils est créé, et on lui passe l'une des extrémités du tuyau (`connexion_fils`).
- Le processus fils envoie un message au processus parent avec `connexion_fils.send()`, puis attend une réponse avec `connexion_fils.recv()`.
- Le processus parent reçoit le message avec `connexion_parent.recv()`, puis envoie une réponse avec `connexion_parent.send()`.
Les `Pipe`s sont plus adaptés à la communication entre deux processus, tandis que les `Queue`s sont plus adaptées à la communication entre plusieurs producteurs et plusieurs consommateurs.
Choisir entre Queue et Pipe
Le choix entre `Queue` et `Pipe` dépend de vos besoins :
- `Queue` :
- Utilisez une `Queue` lorsque vous avez un ou plusieurs producteurs et un ou plusieurs consommateurs.
- Utilisez une `Queue` lorsque l'ordre des messages est important (FIFO).
- Utilisez une `Queue` lorsque vous voulez une communication simple et unidirectionnelle (du producteur vers le consommateur).
- `Pipe` :
- Utilisez un `Pipe` lorsque vous avez une communication bidirectionnelle entre *deux* processus.
- Utilisez un `Pipe` lorsque l'ordre des messages n'est pas crucial.
Dans la plupart des cas, `Queue` est plus simple à utiliser et plus adapté aux situations courantes. `Pipe` est plus spécifique et est utilisé pour des communications plus complexes entre deux processus.
Attention à la sérialisation
Il est important de noter que les objets que vous envoyez à travers une `Queue` ou un `Pipe` doivent être *sérialisables* avec `pickle`. Cela signifie que tous les objets Python ne peuvent pas être transmis. En général, les types de base (entiers, flottants, chaînes, listes, tuples, dictionnaires contenant des types sérialisables) sont sérialisables, mais les objets plus complexes (comme les objets de classes personnalisées) peuvent nécessiter une configuration supplémentaire (définir des méthodes spéciales pour la sérialisation, ou utiliser une bibliothèque de sérialisation alternative).