NumPy v MPI4Py

NumPy in MPI4Py se uporabljata skupaj in sta učinkovita predvsem zaradi tega ker je ndarray kontunuirani blok podatkov, ki se lahko posreduje med procesi učinkovito v enem kosu in ne tako kot običajno, ko se v MPI4Py pošiljajo objekti, ki se morajo v procesih pakirati/razpakirati ob prenosu. Pomembna razlika pri pošiljanji/sprejemanju je v prvi črki ukaza, ki je z veliko začetnico (Send/Recv) in ne tako kot je običaj (send/recv). Druga pomemna značilnost NumPy prenosa je v sprejemu, saj mora biti polje v katerega sprejemamo že predalocirano.

Broadcast – Emitiranje ali razpošiljanje vsem se izvaja iz procesa 0.

# -*- coding: utf-8 -*-
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
if comm.rank == 0:
  print "-"*78
  print " Tečemo na %d jedrih" % comm.size
  print "-"*78
comm.Barrier()

# Pripravimo vektor velikosti N=5 elements za razpošiljanje
N = 5
if comm.rank == 0:
  A = np.arange(N, dtype=np.float64); # rank 0 ima vse podatke
else:
  A = np.empty(N, dtype=np.float64); # vsi ostali pa le prazna polja

# Razpošljimo A iz procesa 0 vsem
comm.Bcast( [A, MPI.DOUBLE] )

# Vsi bi morali sedaj imeti enako polje...
print "[%02d] %s" % (comm.rank, A)

Scatter – razpršimo problem in ga razpečamo po obdelavi z Allgather

Razdelimo večji problem na manjši tako, da pošljemo podatke razdeljeno po koščkih, potem pa vsak pridobi rezultate še od ostalih in nadaljuje z delom s tem, da ima ponovno vse rezultate.

# -*- coding: utf-8 -*-
import numpy as np
from mpi4py import MPI
 
comm = MPI.COMM_WORLD
 
if comm.rank == 0:
  print "-"*78
  print " Tečemo na %d jedrih" % comm.size
  print "-"*78
comm.Barrier()
 
my_N = 4
N = my_N * comm.size
 
if comm.rank == 0:
  A = np.arange(N, dtype=np.float64)
else:
  A = np.empty(N, dtype=np.float64)
 
my_A = np.empty(my_N, dtype=np.float64)
 
# Razdeli  in pošlji (razpošlji) podatke data v my_A polja
comm.Scatter( [A, MPI.DOUBLE], [my_A, MPI.DOUBLE] )
 
if comm.rank == 0:
    print("Po razpošiljanju:")
for r in xrange(comm.size):
  if comm.rank == r:
    print "[%d] %s" % (comm.rank, my_A)
 
comm.Barrier()
 
# Vsi sedaj množijo z 2
my_A *= 2
 
# Vsi poberejo od vseh (preostalo) nazaj v A
comm.Allgather( [my_A, MPI.DOUBLE], [A, MPI.DOUBLE] )
if comm.rank == 0:
    print("Po sestavljanju:")
for r in xrange(comm.size):
  if comm.rank == r:
    print "[%d] %s" % (comm.rank, A)
 
comm.Barrier()

Razprševanje objektov

Razprševanje in sestavljanje je uporabno zaradi manjše komunikacije, saj velikokrat ni potrebno, da se vsi podatki razpošljejo vsem ampak le nujno potrebni.

# -*- coding: utf-8 -*-
# half mpirun -np 4  python ex4.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
   data = [[1, 2], [3, 4], [5, 6], [7, 8]]
else:
   data = None

print "Prvotni podatki za proces", rank, "so",  data
razprseno = comm.scatter(data)
print "Proces", rank, "je sprejel", razprseno
pomnozeno = [i*2 for i in razprseno]
print "Proces", rank, "je pomnožil z 2 na", pomnozeno
pobrano = comm.gather(razprseno)
print "Pobrano oz oddano za proces", rank, "je", pobrano

Tag je oznaka sporočila

Pošiljanje in sprejemanje podatkov med dvema procesoma. tag je uporabniško določljiva oznaka sporočila.

# -*- coding: utf-8 -*-
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
    data = [1, 2, 3]
    comm.send(data, dest=1, tag=12)
    print "Poslani podatki so: ", data
elif rank == 1:
    data = comm.recv(source=0, tag=12)
    print "Sprejeti podatki so: ", data

Redukcija

Z redukcijo izračunamo Pi

# -*- coding: utf-8 -*-
from mpi4py import MPI
import math

def compute_pi(n, start, step):
    h = 1.0 / n
    s = 0.0
    for i in range(start, n, step):
        x = h * (i + 0.5)
        s += 4.0 / (1.0 + x**2)
    return s * h

comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
myrank = comm.Get_rank()

if myrank == 0:
    n = 1000000
else:
    n = None

n = comm.bcast(n, root=0)
mypi = compute_pi(n, myrank, nprocs)
pi = comm.reduce(mypi, op=MPI.SUM, root=0)

if myrank == 0:
    error = abs(pi - math.pi)
    print ("pi je približno %.16f, "
           "napaka je %.16f" % (pi, error))

Mandelbrot

Mandelbrotov fraktal se ob zaključku shrani v datoteko, ki si jo ogledamo z ukazom

gimp mandelbrot.png
# -*- coding: utf-8 -*-
def mandelbrot(x, y, maxit):
    c = x + y*1j
    z = 0 + 0j
    it = 0
    while abs(z) < 2 and it < maxit:
        z = z**2 + c
        it += 1
    return it

x1, x2 = -2.0, 1.0
y1, y2 = -1.0, 1.0
w, h = 6000, 4000
maxit = 127

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# število vrstic, ki jih računamo v tem procesu
N = h // size + (h % size > rank)

# prva vrstica se začne na
start = comm.scan(N)-N

# polje za shranitev lokalnih rezultatov
Cl = numpy.zeros([N, w], dtype='i')

# preračunaj dodeljene vrstice
dx = (x2 - x1) / w
dy = (y2 - y1) / h
for i in range(N):
    y = y1 + (i + start) * dy
    for j in range(w):
        x = x1 + j * dx
        Cl[i, j] = mandelbrot(x, y, maxit)

# gather results at root (process 0)
counts = comm.gather(N, root=0)
C = None
if rank == 0:
    C = numpy.zeros([h, w], dtype='i')

rowtype = MPI.INT.Create_contiguous(w)
rowtype.Commit()

comm.Gatherv(sendbuf=[Cl, MPI.INT],
             recvbuf=[C, (counts, None), rowtype],
             root=0)

rowtype.Free()

if comm.rank == 0:
    from matplotlib import pyplot
    pyplot.imshow(C, aspect='equal')
    pyplot.spectral()
    pyplot.savefig("mandelbrot.png")