Verwenden Sie ein numpy Array im Shared Memory für Multiprocessing

Ich möchte ein numpy Array in Shared Memory für die Verwendung mit dem Multiprocessing-Modul verwenden. Die Schwierigkeit besteht darin, sie wie ein Nummernfeld zu verwenden und nicht nur als Ctypes-Array.

from multiprocessing import Process, Array import scipy def f(a): a[0] = -a[0] if __name__ == '__main__': # Create the array N = int(10) unshared_arr = scipy.rand(N) arr = Array('d', unshared_arr) print "Originally, the first two elements of arr = %s"%(arr[:2]) # Create, start, and finish the child processes p = Process(target=f, args=(arr,)) p.start() p.join() # Printing out the changed values print "Now, the first two elements of arr = %s"%arr[:2] 

Dies erzeugt eine Ausgabe wie zum Beispiel:

 Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976] Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976] 

Auf das Array kann in Ctypes zugegriffen werden, zB arr[i] macht Sinn. Es handelt sich jedoch nicht um ein numpiges Array, und ich kann keine Operationen wie -1*arr oder arr.sum() . Ich nehme an, eine Lösung wäre, das ctypes-Array in ein numpy-Array zu konvertieren. Allerdings glaube ich (abgesehen davon, dass ich nicht in der Lage bin, das zu schaffen), dass es nicht mehr geteilt wird.

Es scheint, dass es eine Standardlösung für ein allgemeines Problem geben würde.

Hinzufügen zu @unutbu’s (nicht mehr verfügbar) und @Henry Gomersalls Antworten. Sie können shared_arr.get_lock() , um den Zugriff bei Bedarf zu synchronisieren:

 shared_arr = mp.Array(ctypes.c_double, N) # ... def f(i): # could be anything numpy accepts as an index such another numpy array with shared_arr.get_lock(): # synchronize access arr = np.frombuffer(shared_arr.get_obj()) # no data copying arr[i] = -arr[i] 

Beispiel

 import ctypes import logging import multiprocessing as mp from contextlib import closing import numpy as np info = mp.get_logger().info def main(): logger = mp.log_to_stderr() logger.setLevel(logging.INFO) # create shared array N, M = 100, 11 shared_arr = mp.Array(ctypes.c_double, N) arr = tonumpyarray(shared_arr) # fill with random values arr[:] = np.random.uniform(size=N) arr_orig = arr.copy() # write to arr from different processes with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p: # many processes access the same slice stop_f = N // 10 p.map_async(f, [slice(stop_f)]*M) # many processes access different slices of the same array assert M % 2 # odd step = N // 10 p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)]) p.join() assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig) def init(shared_arr_): global shared_arr shared_arr = shared_arr_ # must be inherited, not passed as an argument def tonumpyarray(mp_arr): return np.frombuffer(mp_arr.get_obj()) def f(i): """synchronized.""" with shared_arr.get_lock(): # synchronize access g(i) def g(i): """no synchronization.""" info("start %s" % (i,)) arr = tonumpyarray(shared_arr) arr[i] = -1 * arr[i] info("end %s" % (i,)) if __name__ == '__main__': mp.freeze_support() main() 

Wenn Sie keinen synchronisierten Zugriff benötigen oder eigene Sperren mp.Array() ist mp.Array() nicht erforderlich. Sie könnten mp.sharedctypes.RawArray in diesem Fall verwenden.

Dem Array Objekt ist eine get_obj() -Methode zugeordnet, die das Ctypes-Array zurückgibt, das eine Pufferschnittstelle darstellt. Ich denke das folgende sollte funktionieren …

 from multiprocessing import Process, Array import scipy import numpy def f(a): a[0] = -a[0] if __name__ == '__main__': # Create the array N = int(10) unshared_arr = scipy.rand(N) a = Array('d', unshared_arr) print "Originally, the first two elements of arr = %s"%(a[:2]) # Create, start, and finish the child process p = Process(target=f, args=(a,)) p.start() p.join() # Print out the changed values print "Now, the first two elements of arr = %s"%a[:2] b = numpy.frombuffer(a.get_obj()) b[0] = 10.0 print a[0] 

Wenn diese Option ausgeführt wird, wird das erste Element a jetzt 10.0 ausgegeben, wobei a und b nur zwei Ansichten in demselben Speicher sind.

Um sicherzustellen, dass es immer noch Multiprozessor sicher ist, glaube ich, dass Sie die Methoden acquire und release , die auf dem Array Objekt a und seiner integrierten Sperre vorhanden sind, verwenden müssen, um sicherzustellen, dass alle sicher zugegriffen werden (obwohl ich nicht bin) ein Experte für das Multiprozessor-Modul).

Während die Antworten bereits gegeben sind, gibt es eine viel einfachere Lösung für dieses Problem, wenn zwei Bedingungen erfüllt sind:

  1. Sie befinden sich auf einem POSIX-kompatiblen Betriebssystem (z. B. Linux, Mac OSX); und
  2. Ihre untergeordneten processe benötigen schreibgeschützten Zugriff auf das freigegebene Array.

In diesem Fall müssen Sie sich nicht mit der expliziten Erstellung von Variablen beschäftigen, da die untergeordneten processe mit einem Fork erstellt werden. Ein gegabeltes Kind teilt automatisch den Speicherplatz des Elternteils. Im Kontext von Python-Multiprocessing bedeutet dies, dass es alle Variablen auf Modulebene teilt. Beachten Sie, dass dies nicht für Argumente gilt, die Sie explizit an Ihre untergeordneten processe oder an die functionen übergeben, die Sie in einem multiprocessing.Pool aufrufen.

Ein einfaches Beispiel:

 import multiprocessing import numpy as np # will hold the (implicitly mem-shared) data data_array = None # child worker function def job_handler(num): # built-in id() returns unique memory ID of a variable return id(data_array), np.sum(data_array) def launch_jobs(data, num_jobs=5, num_worker=4): global data_array data_array = data pool = multiprocessing.Pool(num_worker) return pool.map(job_handler, range(num_jobs)) # create some random data and execute the child jobs mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10))) # this will print 'True' on POSIX OS, since the data was shared print(np.all(np.asarray(mem_ids) == id(data_array))) 

Ich habe ein kleines Python-Modul geschrieben, das den gemeinsamen POSIX-Speicher verwendet, um numpige Arrays zwischen Python-Interpretern freizugeben. Vielleicht finden Sie es praktisch.

https://pypi.python.org/pypi/SharedArray

So funktioniert das:

 import numpy as np import SharedArray as sa # Create an array in shared memory a = sa.create("test1", 10) # Attach it as a different array. This can be done from another # python interpreter as long as it runs on the same computer. b = sa.attach("test1") # See how they are actually sharing the same memory block a[0] = 42 print(b[0]) # Destroying a does not affect b. del a print(b[0]) # See how "test1" is still present in shared memory even though we # destroyed the array a. sa.list() # Now destroy the array "test1" from memory. sa.delete("test1") # The array b is not affected, but once you destroy it then the # data are lost. print(b[0]) 

Sie können das sharedmem Modul verwenden: https://bitbucket.org/cleemesser/numpy-sharedmem

Hier ist Ihr ursprünglicher Code, diesmal mit Shared Memory, der sich wie ein NumPy-Array verhält (beachten Sie die zusätzliche letzte statement, die eine NumPy sum() -function aufruft):

 from multiprocessing import Process import sharedmem import scipy def f(a): a[0] = -a[0] if __name__ == '__main__': # Create the array N = int(10) unshared_arr = scipy.rand(N) arr = sharedmem.empty(N) arr[:] = unshared_arr.copy() print "Originally, the first two elements of arr = %s"%(arr[:2]) # Create, start, and finish the child process p = Process(target=f, args=(arr,)) p.start() p.join() # Print out the changed values print "Now, the first two elements of arr = %s"%arr[:2] # Perform some NumPy operation print arr.sum()