NumPy in MPI4Py se uporabljata skupaj in sta učinkovita predvsem zaradi tega ker je ndarray kontunuirani blok podatkov, ki se lahko posreduje med procesi učinkovito v enem kosu in ne tako kot običajno, ko se v MPI4Py pošiljajo objekti, ki se morajo v procesih pakirati/razpakirati ob prenosu. Pomembna razlika pri pošiljanji/sprejemanju je v prvi črki ukaza, ki je z veliko začetnico (Send/Recv) in ne tako kot je običaj (send/recv). Druga pomemna značilnost NumPy prenosa je v sprejemu, saj mora biti polje v katerega sprejemamo že predalocirano.
# -*- coding: utf-8 -*-
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
if comm.rank == 0:
print "-"*78
print " Tečemo na %d jedrih" % comm.size
print "-"*78
comm.Barrier()
# Pripravimo vektor velikosti N=5 elements za razpošiljanje
N = 5
if comm.rank == 0:
A = np.arange(N, dtype=np.float64); # rank 0 ima vse podatke
else:
A = np.empty(N, dtype=np.float64); # vsi ostali pa le prazna polja
# Razpošljimo A iz procesa 0 vsem
comm.Bcast( [A, MPI.DOUBLE] )
# Vsi bi morali sedaj imeti enako polje...
print "[%02d] %s" % (comm.rank, A)
Razdelimo večji problem na manjši tako, da pošljemo podatke razdeljeno po koščkih, potem pa vsak pridobi rezultate še od ostalih in nadaljuje z delom s tem, da ima ponovno vse rezultate.
# -*- coding: utf-8 -*-
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
if comm.rank == 0:
print "-"*78
print " Tečemo na %d jedrih" % comm.size
print "-"*78
comm.Barrier()
my_N = 4
N = my_N * comm.size
if comm.rank == 0:
A = np.arange(N, dtype=np.float64)
else:
A = np.empty(N, dtype=np.float64)
my_A = np.empty(my_N, dtype=np.float64)
# Razdeli in pošlji (razpošlji) podatke data v my_A polja
comm.Scatter( [A, MPI.DOUBLE], [my_A, MPI.DOUBLE] )
if comm.rank == 0:
print("Po razpošiljanju:")
for r in xrange(comm.size):
if comm.rank == r:
print "[%d] %s" % (comm.rank, my_A)
comm.Barrier()
# Vsi sedaj množijo z 2
my_A *= 2
# Vsi poberejo od vseh (preostalo) nazaj v A
comm.Allgather( [my_A, MPI.DOUBLE], [A, MPI.DOUBLE] )
if comm.rank == 0:
print("Po sestavljanju:")
for r in xrange(comm.size):
if comm.rank == r:
print "[%d] %s" % (comm.rank, A)
comm.Barrier()
Razprševanje in sestavljanje je uporabno zaradi manjše komunikacije, saj velikokrat ni potrebno, da se vsi podatki razpošljejo vsem ampak le nujno potrebni.
# -*- coding: utf-8 -*-
# half mpirun -np 4 python ex4.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
if rank == 0:
data = [[1, 2], [3, 4], [5, 6], [7, 8]]
else:
data = None
print "Prvotni podatki za proces", rank, "so", data
razprseno = comm.scatter(data)
print "Proces", rank, "je sprejel", razprseno
pomnozeno = [i*2 for i in razprseno]
print "Proces", rank, "je pomnožil z 2 na", pomnozeno
pobrano = comm.gather(razprseno)
print "Pobrano oz oddano za proces", rank, "je", pobrano
Pošiljanje in sprejemanje podatkov med dvema procesoma. tag je uporabniško določljiva oznaka sporočila.
# -*- coding: utf-8 -*-
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
data = [1, 2, 3]
comm.send(data, dest=1, tag=12)
print "Poslani podatki so: ", data
elif rank == 1:
data = comm.recv(source=0, tag=12)
print "Sprejeti podatki so: ", data
Z redukcijo izračunamo Pi
# -*- coding: utf-8 -*-
from mpi4py import MPI
import math
def compute_pi(n, start, step):
h = 1.0 / n
s = 0.0
for i in range(start, n, step):
x = h * (i + 0.5)
s += 4.0 / (1.0 + x**2)
return s * h
comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
myrank = comm.Get_rank()
if myrank == 0:
n = 1000000
else:
n = None
n = comm.bcast(n, root=0)
mypi = compute_pi(n, myrank, nprocs)
pi = comm.reduce(mypi, op=MPI.SUM, root=0)
if myrank == 0:
error = abs(pi - math.pi)
print ("pi je približno %.16f, "
"napaka je %.16f" % (pi, error))
Mandelbrotov fraktal se ob zaključku shrani v datoteko, ki si jo ogledamo z ukazom
gimp mandelbrot.png
# -*- coding: utf-8 -*-
def mandelbrot(x, y, maxit):
c = x + y*1j
z = 0 + 0j
it = 0
while abs(z) < 2 and it < maxit:
z = z**2 + c
it += 1
return it
x1, x2 = -2.0, 1.0
y1, y2 = -1.0, 1.0
w, h = 6000, 4000
maxit = 127
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
# število vrstic, ki jih računamo v tem procesu
N = h // size + (h % size > rank)
# prva vrstica se začne na
start = comm.scan(N)-N
# polje za shranitev lokalnih rezultatov
Cl = numpy.zeros([N, w], dtype='i')
# preračunaj dodeljene vrstice
dx = (x2 - x1) / w
dy = (y2 - y1) / h
for i in range(N):
y = y1 + (i + start) * dy
for j in range(w):
x = x1 + j * dx
Cl[i, j] = mandelbrot(x, y, maxit)
# gather results at root (process 0)
counts = comm.gather(N, root=0)
C = None
if rank == 0:
C = numpy.zeros([h, w], dtype='i')
rowtype = MPI.INT.Create_contiguous(w)
rowtype.Commit()
comm.Gatherv(sendbuf=[Cl, MPI.INT],
recvbuf=[C, (counts, None), rowtype],
root=0)
rowtype.Free()
if comm.rank == 0:
from matplotlib import pyplot
pyplot.imshow(C, aspect='equal')
pyplot.spectral()
pyplot.savefig("mandelbrot.png")