SORU
11 Aralık 2011, Pazar


Python çoklu - Boru vs Sıra

Python's multiprocessing package kuyrukları ve borular arasındaki temel farklar nelerdir?

Ne senaryoları bir diğer üzerinde seçmeliyim? Pipe() kullanmak avantajlı mı? Queue() kullanmak avantajlı mı?

CEVAP
11 Aralık 2011, Pazar


  • Pipe() sadece iki bitiş noktası olabilir.

  • Queue() birden fazla üretici ve tüketiciler olabilir.

Onları kullanma zamanı

Eğer ikiden fazla nokta iletişim kurmak istersen, Queue() kullanın.

Eğer mutlak performans ihtiyacınız varsa, bir Pipe() Queue() Pipe() üzerine kurulu olduğundan çok daha hızlı.

Performans Kıyaslama

Hadi iki işlem spawn ve aralarında mesajlar mümkün olduğunca çabuk göndermek istediğiniz varsayalım. Bu benzer testleri Pipe() Queue()... Bu 11.10 ve Python 2.7.2 bir ThinkpadT61 çalışan Ubuntu arasında bir drag yarışı zamanlama sonuçları.

BİLGİNİZE, attım sonuçları için JoinableQueue() bir bonus olarak; JoinableQueue() hesapları için görevler queue.task_done() denir (o bile bilmiyor ile ilgili belirli bir görev, sadece sayıları bitmemiş görevleri sırası) queue.join() bilir işi bitirdi.

Bu cevap her alt kod...

mpenning@mpenning-T61:~$ python multi_pipe.py 
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py 
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py 
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$

Özetle Pipe() Queue() bir daha yaklaşık üç kat daha hızlı. Gerçekten faydaları var olmalı JoinableQueue() düşünme bile.

BONUS MALZEME 2

Çoklu işlem bazı kısayollar bilmiyorsanız zor hata ayıklama yapmak bilgi akışı içinde ince değişiklikler tanıtır. Örneğin, bir sözlükte birçok koşullar altında dizin oluşturma ile iyi çalışan bir senaryo olabilir, ama nadiren de olsa bazı giriş başarısız.

Normalde tüm python işlemi sonlandırılır başarısızlık için ipuçları alıyoruz; ancak, istenmeyen kaza tracebacks eğer Çoklu işlem fonksiyonu çöküyor konsola basılı alamadım. Bilinmeyen Çoklu işlem çöküyor izleme süreci çöktü ne bir ipucu olmadan çok zor.

Çoklu çarpışma bilginize izini buldum en kolay yolu* / except *24 ve kullanımı traceback.print_exc() tüm çoklu işlem fonksiyonu sarmak için:

import traceback
def reader(args):
    try:
        # Insert stuff to be multiprocessed here
        return args[0]['that']
    except:
        print "FATAL: reader({0}) exited while multiprocessing".format(args) 
        traceback.print_exc()

Bir kaza bulunca şimdi, şöyle bir şey görürsünüz:

FATAL: reader([{'crash', 'this'}]) exited while multiprocessing
Traceback (most recent call last):
  File "foo.py", line 19, in __init__
    self.run(task_q, result_q)
  File "foo.py", line 46, in run
    raise ValueError
ValueError

Kaynak Kodu:


"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time

def reader(pipe):
    output_p, input_p = pipe
    input_p.close()    # We are only reading
    while True:
        try:
            msg = output_p.recv()    # Read from the output pipe and do nothing
        except EOFError:
            break

def writer(count, input_p):
    for ii in xrange(0, count):
        input_p.send(ii)             # Write 'count' numbers into the input pipe

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        output_p, input_p = Pipe()
        reader_p = Process(target=reader, args=((output_p, input_p),))
        reader_p.start()     # Launch the reader process

        output_p.close()       # We no longer need this part of the Pipe()
        _start = time.time()
        writer(count, input_p) # Send a lot of stuff to reader()
        input_p.close()        # Ask the reader to stop when it reads EOF
        reader_p.join()
        print "Sending %s numbers to Pipe() took %s seconds" % (count, 
            (time.time() - _start))

"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time

def reader(queue):
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    for ii in xrange(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        queue = Queue()   # reader() reads from queue
                          # writer() writes to queue
        reader_p = Process(target=reader, args=((queue),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process

        _start = time.time()
        writer(count, queue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print "Sending %s numbers to Queue() took %s seconds" % (count, 
            (time.time() - _start))

"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time

def reader(queue):
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        queue.task_done()

def writer(count, queue):
    for ii in xrange(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        queue = JoinableQueue()   # reader() reads from queue
                                  # writer() writes to queue
        reader_p = Process(target=reader, args=((queue),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process

        _start = time.time()
        writer(count, queue) # Send a lot of stuff to reader()
        queue.join()         # Wait for the reader to finish
        print "Sending %s numbers to JoinableQueue() took %s seconds" % (count, 
            (time.time() - _start))

Bunu Paylaş:
  • Google+
  • E-Posta
Etiketler:

YORUMLAR

SPONSOR VİDEO

Rastgele Yazarlar

  • Manuel Vizcaino

    Manuel Vizca

    27 Mayıs 2008
  • Rickymon Tero

    Rickymon Ter

    1 Ocak 2007
  • Crossover

    Crossover

    18 HAZİRAN 2007