Paralelizacija v PythonMPI z uporabo knjižnice mpi4py
Osnove MPI
Katerakoli tehnika paralelnega programiranja mora za vsak proces omogočiti vsaj naslednje mehanizme:
- odkriti skupno števil procesov
- identifikacija procesorja
- poslati sporočila posameznemu procesu v grupi procesov
- sprejemanje sporočil od nekega procesa
Osnovni MPI vklučuje preko 120 funkcij, ki so podobne za različne programske jezike.
Primer osnovnega programa MPI v programskem programski jezik C ima naslednji potek in komunikatorje:
#include
int main
(int argc
, char *argv
[])
{
int rank
, size
;
MPI_Init
(&argc
, &argv
);
MPI_Comm_rank
(MPI_COMM_WORLD
, &rank
);
MPI_Comm_size
(MPI_COMM_WORLD
, &size
);
printf( "Jaz sem proces %d. od %d procesov\n", rank
, size
);
MPI_Finalize
();
}
PythonMPI primeri z uporabo knjižnice Mpi4Py
Za poganjanje MPI programov je potrebno namestiti paralelno okolje OpenMPI in Python z ukazom
module load openmpi python
Po namestitvi poganjamo programe z ukazom
bsub -n 4 -o error.log mpirun python hello.py
Rezultat si preberemo v datoteki error.log, ki jo beremo na nadzornem vozlišču prelog. Za interaktivno delo na enem samem vozlišču ali pa na polovici vozlišča pa lahko uporabimo tudi ukaza
node mpirun python hello.py
half mpirun python hello.py
1. Hello world
# -*- coding: utf-8 -*-
from mpi4py import MPI
comm = MPI.COMM_WORLD
print "Zdravo! Jaz sem proces %d od skupaj %d procesov" % (comm.rank, comm.size)
comm.Barrier() # Počakajmo vse, da pridejo do to, potem pa nadaljujemo
2. Emitiranje ali razpošiljanje vsem se izvaja iz procesa 0.
# -*- coding: utf-8 -*-
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
if comm.rank == 0:
print "-"*78
print " Tečemo na %d jedrih" % comm.size
print "-"*78
comm.Barrier()
# Pripravimo vektor velikosti N=5 elements za razpošiljanje
N = 5
if comm.rank == 0:
A = np.arange(N, dtype=np.float64); # rank 0 ima vse podatke
else:
A = np.empty(N, dtype=np.float64); # vsi ostali pa le prazna polja
# Razpošljimo A iz procesa 0 vsem
comm.Bcast( [A, MPI.DOUBLE] )
# Vsi bi morali sedaj imeti enako polje...
print "[%02d] %s" % (comm.rank, A)
3.Primer
Razdelimo večji problem na manjši tako, da pošljemo podatke razdeljeno po koščkih, potem pa vsak pridobi rezultate še od ostalih in nadaljuje z delom s tem, da ima ponovno vse rezultate.
# -*- coding: utf-8 -*-
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
if comm.rank == 0:
print "-"*78
print " Tečemo na %d jedrih" % comm.size
print "-"*78
comm.Barrier()
my_N = 4
N = my_N * comm.size
if comm.rank == 0:
A = np.arange(N, dtype=np.float64)
else:
A = np.empty(N, dtype=np.float64)
my_A = np.empty(my_N, dtype=np.float64)
# Razdeli in pošlji (razpošlji) podatke data v my_A polja
comm.Scatter( [A, MPI.DOUBLE], [my_A, MPI.DOUBLE] )
if comm.rank == 0:
print("Po razpošiljanju:")
for r in xrange(comm.size):
if comm.rank == r:
print "[%d] %s" % (comm.rank, my_A)
comm.Barrier()
# Vsi sedaj množijo z 2
my_A *= 2
# Vsi poberejo od vseh (preostalo) nazaj v A
comm.Allgather( [my_A, MPI.DOUBLE], [A, MPI.DOUBLE] )
if comm.rank == 0:
print("Po sestavljanju:")
for r in xrange(comm.size):
if comm.rank == r:
print "[%d] %s" % (comm.rank, A)
comm.Barrier()
4. Primer
Razprševanje in sestavljanje je uporabno zaradi manjše komunikacije, saj velikokrat ni potrebno, da se vsi podatki razpošljejo vsem ampak le nujno potrebni.
# -*- coding: utf-8 -*-
from mpi4py import MPI
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
if rank == 0:
data = [[1, 2], [3, 4], [5, 6], [7, 8]]
else:
data = None
print "Prvotni podatki za proces", rank, "so", data
razprseno = comm.scatter(data)
print "Proces", rank, "je sprejel", razprseno
pomnozeno = [i*2 for i in razprseno]
print "Proces", rank, "je pomnožil z 2 na", pomnozeno
pobrano = comm.gather(razprseno)
print "Pobrano oz oddano za proces", rank, "je", pobrano
5. Primer
Pošiljanje in sprejemanje podatkov med dvema procesoma. tag je uporabniško določljiva oznaka sporočila.
# -*- coding: utf-8 -*-
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
data = [1, 2, 3]
comm.send(data, dest=1, tag=12)
print "Poslani podatki so: ", data
elif rank == 1:
data = comm.recv(source=0, tag=12)
print "Sprejeti podatki so: ", data
6. Primer
Z redukcijo izračunamo Pi
# -*- coding: utf-8 -*-
from mpi4py import MPI
import math
def compute_pi(n, start, step):
h = 1.0 / n
s = 0.0
for i in range(start, n, step):
x = h * (i + 0.5)
s += 4.0 / (1.0 + x**2)
return s * h
comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
myrank = comm.Get_rank()
if myrank == 0:
n = 1000000
else:
n = None
n = comm.bcast(n, root=0)
mypi = compute_pi(n, myrank, nprocs)
pi = comm.reduce(mypi, op=MPI.SUM, root=0)
if myrank == 0:
error = abs(pi - math.pi)
print ("pi je približno %.16f, "
"napaka je %.16f" % (pi, error))
7. Primer
Mandelbrotov fraktal se ob zaključku shrani v datoteko, ki si jo ogledamo z ukazom
gimp mandelbrot.png
# -*- coding: utf-8 -*-
def mandelbrot(x, y, maxit):
c = x + y*1j
z = 0 + 0j
it = 0
while abs(z) < 2 and it < maxit:
z = z**2 + c
it += 1
return it
x1, x2 = -2.0, 1.0
y1, y2 = -1.0, 1.0
w, h = 6000, 4000
maxit = 127
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
# število vrstic, ki jih računamo v tem procesu
N = h // size + (h % size > rank)
# prva vrstica se začne na
start = comm.scan(N)-N
# polje za shranitev lokalnih rezultatov
Cl = numpy.zeros([N, w], dtype='i')
# preračunaj dodeljene vrstice
dx = (x2 - x1) / w
dy = (y2 - y1) / h
for i in range(N):
y = y1 + (i + start) * dy
for j in range(w):
x = x1 + j * dx
Cl[i, j] = mandelbrot(x, y, maxit)
# gather results at root (process 0)
counts = comm.gather(N, root=0)
C = None
if rank == 0:
C = numpy.zeros([h, w], dtype='i')
rowtype = MPI.INT.Create_contiguous(w)
rowtype.Commit()
comm.Gatherv(sendbuf=[Cl, MPI.INT],
recvbuf=[C, (counts, None), rowtype],
root=0)
rowtype.Free()
if comm.rank == 0:
from matplotlib import pyplot
pyplot.imshow(C, aspect='equal')
pyplot.spectral()
pyplot.savefig("mandelbrot.png")