Nahajate se tukaj

Python OpenMPI

Paralelizacija v PythonMPI z uporabo knjižnice mpi4py

Osnove MPI

Katerakoli tehnika paralelnega programiranja mora za vsak proces omogočiti vsaj naslednje mehanizme:
  • odkriti skupno števil procesov
  • identifikacija procesorja
  • poslati sporočila posameznemu procesu v grupi procesov
  • sprejemanje sporočil od nekega procesa

Osnovni MPI vklučuje preko 120 funkcij, ki so podobne za različne programske jezike.

Primer osnovnega programa MPI v programskem programski jezik C ima naslednji potek in komunikatorje:

#include
int main(int argc, char *argv[])
{
    int rank, size;
    MPI_Init (&argc, &argv);
    MPI_Comm_rank (MPI_COMM_WORLD, &rank);
    MPI_Comm_size (MPI_COMM_WORLD, &size);
    printf( "Jaz sem proces %d. od %d procesov\n", rank, size );
    MPI_Finalize();
}

PythonMPI primeri z uporabo knjižnice Mpi4Py

Za poganjanje MPI programov je potrebno namestiti paralelno okolje OpenMPI in Python z ukazom

module load openmpi python

Po namestitvi poganjamo programe z ukazom

bsub -n 4 -o error.log mpirun python hello.py

Rezultat si preberemo v datoteki error.log, ki jo beremo na nadzornem vozlišču prelog. Za interaktivno delo na enem samem vozlišču ali pa na polovici vozlišča pa lahko uporabimo tudi ukaza

node mpirun python hello.py
half mpirun python hello.py

1. Hello world

# -*- coding: utf-8 -*-
from mpi4py import MPI
comm = MPI.COMM_WORLD
print "Zdravo! Jaz sem proces %d od skupaj %d procesov" % (comm.rank, comm.size)
comm.Barrier() # Počakajmo vse, da pridejo do to, potem pa nadaljujemo

2. Emitiranje ali razpošiljanje vsem se izvaja iz procesa 0.

# -*- coding: utf-8 -*-
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
if comm.rank == 0:
  print "-"*78
  print " Tečemo na %d jedrih" % comm.size
  print "-"*78
comm.Barrier()

# Pripravimo vektor velikosti N=5 elements za razpošiljanje
N = 5
if comm.rank == 0:
  A = np.arange(N, dtype=np.float64); # rank 0 ima vse podatke
else:
  A = np.empty(N, dtype=np.float64); # vsi ostali pa le prazna polja

# Razpošljimo A iz procesa 0 vsem
comm.Bcast( [A, MPI.DOUBLE] )

# Vsi bi morali sedaj imeti enako polje...
print "[%02d] %s" % (comm.rank, A)

3.Primer

Razdelimo večji problem na manjši tako, da pošljemo podatke razdeljeno po koščkih, potem pa vsak pridobi rezultate še od ostalih in nadaljuje z delom s tem, da ima ponovno vse rezultate.

# -*- coding: utf-8 -*-
import numpy as np
from mpi4py import MPI
 
comm = MPI.COMM_WORLD
 
if comm.rank == 0:
  print "-"*78
  print " Tečemo na %d jedrih" % comm.size
  print "-"*78
comm.Barrier()
 
my_N = 4
N = my_N * comm.size
 
if comm.rank == 0:
  A = np.arange(N, dtype=np.float64)
else:
  A = np.empty(N, dtype=np.float64)
 
my_A = np.empty(my_N, dtype=np.float64)
 
# Razdeli  in pošlji (razpošlji) podatke data v my_A polja
comm.Scatter( [A, MPI.DOUBLE], [my_A, MPI.DOUBLE] )
 
if comm.rank == 0:
    print("Po razpošiljanju:")
for r in xrange(comm.size):
  if comm.rank == r:
    print "[%d] %s" % (comm.rank, my_A)
 
comm.Barrier()
 
# Vsi sedaj množijo z 2
my_A *= 2
 
# Vsi poberejo od vseh (preostalo) nazaj v A
comm.Allgather( [my_A, MPI.DOUBLE], [A, MPI.DOUBLE] )
if comm.rank == 0:
    print("Po sestavljanju:")
for r in xrange(comm.size):
  if comm.rank == r:
    print "[%d] %s" % (comm.rank, A)
 
comm.Barrier()

4. Primer

Razprševanje in sestavljanje je uporabno zaradi manjše komunikacije, saj velikokrat ni potrebno, da se vsi podatki razpošljejo vsem ampak le nujno potrebni.

# -*- coding: utf-8 -*-
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()


if rank == 0:
   data = [[1, 2], [3, 4], [5, 6], [7, 8]]
else:
   data = None

print "Prvotni podatki za proces", rank, "so",  data
razprseno = comm.scatter(data)
print "Proces", rank, "je sprejel", razprseno
pomnozeno = [i*2 for i in razprseno]
print "Proces", rank, "je pomnožil z 2 na", pomnozeno
pobrano = comm.gather(razprseno)
print "Pobrano oz oddano za proces", rank, "je", pobrano

5. Primer

Pošiljanje in sprejemanje podatkov med dvema procesoma. tag je uporabniško določljiva oznaka sporočila.

# -*- coding: utf-8 -*-
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
    data = [1, 2, 3]
    comm.send(data, dest=1, tag=12)
    print "Poslani podatki so: ", data
elif rank == 1:
    data = comm.recv(source=0, tag=12)
    print "Sprejeti podatki so: ", data

6. Primer

Z redukcijo izračunamo Pi

# -*- coding: utf-8 -*-
from mpi4py import MPI
import math

def compute_pi(n, start, step):
    h = 1.0 / n
    s = 0.0
    for i in range(start, n, step):
        x = h * (i + 0.5)
        s += 4.0 / (1.0 + x**2)
    return s * h

comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
myrank = comm.Get_rank()

if myrank == 0:
    n = 1000000
else:
    n = None

n = comm.bcast(n, root=0)
mypi = compute_pi(n, myrank, nprocs)
pi = comm.reduce(mypi, op=MPI.SUM, root=0)

if myrank == 0:
    error = abs(pi - math.pi)
    print ("pi je približno %.16f, "
           "napaka je %.16f" % (pi, error))

7. Primer

Mandelbrotov fraktal se ob zaključku shrani v datoteko, ki si jo ogledamo z ukazom

gimp mandelbrot.png
# -*- coding: utf-8 -*-
def mandelbrot(x, y, maxit):
    c = x + y*1j
    z = 0 + 0j
    it = 0
    while abs(z) < 2 and it < maxit:
        z = z**2 + c
        it += 1
    return it

x1, x2 = -2.0, 1.0
y1, y2 = -1.0, 1.0
w, h = 6000, 4000
maxit = 127

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# število vrstic, ki jih računamo v tem procesu
N = h // size + (h % size > rank)

# prva vrstica se začne na
start = comm.scan(N)-N

# polje za shranitev lokalnih rezultatov
Cl = numpy.zeros([N, w], dtype='i')

# preračunaj dodeljene vrstice
dx = (x2 - x1) / w
dy = (y2 - y1) / h
for i in range(N):
    y = y1 + (i + start) * dy
    for j in range(w):
        x = x1 + j * dx
        Cl[i, j] = mandelbrot(x, y, maxit)

# gather results at root (process 0)
counts = comm.gather(N, root=0)
C = None
if rank == 0:
    C = numpy.zeros([h, w], dtype='i')

rowtype = MPI.INT.Create_contiguous(w)
rowtype.Commit()

comm.Gatherv(sendbuf=[Cl, MPI.INT],
             recvbuf=[C, (counts, None), rowtype],
             root=0)

rowtype.Free()

if comm.rank == 0:
    from matplotlib import pyplot
    pyplot.imshow(C, aspect='equal')
    pyplot.spectral()
    pyplot.savefig("mandelbrot.png")