Python multiprocessing: lygiagretūs procesai

Kodėl vienas procesas kartais neužtenka

Dirbant su Python, anksčiau ar vėliau susiduri su situacija, kai programa tiesiog lėtai veikia. Ne todėl, kad blogai parašytas kodas (na, bent jau ne visada), o todėl, kad vienas procesoriaus branduolys dirba iš peties, o kiti trys ar septyni ramiai snaudžia. Čia ir prasideda įdomi kelionė į daugiaprocesinio programavimo pasaulį.

Python turi vieną nemalonią ypatybę – GIL (Global Interpreter Lock). Tai mechanizmas, kuris leidžia vienam gijų (thread) vienu metu vykdyti Python bytecode. Skamba bauginančiai? Iš esmės tai reiškia, kad net jei naudojate threading modulį, jūsų kodas vis tiek nepasinaudos keliais procesorių branduoliais efektyviai. Bent jau kai kalbame apie skaičiavimus, o ne I/O operacijas.

Štai čia ir ateina į pagalbą multiprocessing modulis. Jis leidžia sukurti tikrus atskirus procesus, kiekvienas su savo Python interpretatorium ir atminties erdve. Taip, tai reiškia didesnę atminties sąnaudą, bet kartu ir galimybę išnaudoti visą procesoriaus galią.

Kaip tai veikia praktiškai

Pradėkime nuo paprasčiausio pavyzdžio. Tarkime, turite funkciją, kuri atlieka kažkokius sunkius skaičiavimus:

„`python
import time
from multiprocessing import Process

def sunkus_darbas(skaicius):
print(f”Pradedamas darbas su {skaicius}”)
time.sleep(2) # Simuliuojame sunkius skaičiavimus
rezultatas = skaicius * skaicius
print(f”Baigtas darbas su {skaicius}, rezultatas: {rezultatas}”)

if __name__ == ‘__main__’:
procesai = []

for i in range(4):
p = Process(target=sunkus_darbas, args=(i,))
procesai.append(p)
p.start()

for p in procesai:
p.join()
„`

Svarbu atkreipti dėmesį į tą if __name__ == ‘__main__’ dalį. Tai ne tik gera praktika – Windows sistemoje tai būtina, nes kitaip programa bandys rekursyviai kurti naujus procesus ir viskas baigsis labai blogai.

Šis kodas sukurs keturis atskirus procesus, kurie visi dirbs lygiagrečiai. Vietoj 8 sekundžių (4 * 2), programa užtruks tik apie 2 sekundes. Jei turite keturių branduolių procesorių, visi jie dabar dirbs.

Process Pool – kai reikia valdyti minią

Kurti po vieną procesą rankiniu būdu yra gerai, kai jų nedaug. Bet kai reikia apdoroti šimtus ar tūkstančius užduočių, geriau naudoti Pool. Tai proceso baseinas, kuris automatiškai tvarko procesų kūrimą ir užduočių paskirstymą.

„`python
from multiprocessing import Pool
import time

def apdoroti_duomenis(x):
time.sleep(0.5)
return x * x

if __name__ == ‘__main__’:
duomenys = range(20)

# Sukuriame baseiną su 4 procesais
with Pool(processes=4) as pool:
rezultatai = pool.map(apdoroti_duomenis, duomenys)

print(rezultatai)
„`

Pool automatiškai padalins 20 užduočių keturiems procesams. Kai vienas procesas baigia darbą, jis gauna naują užduotį iš eilės. Tai daug efektyviau nei kurti 20 atskirų procesų.

Yra keletas būdų naudoti Pool:
map() – panašus į įprastą Python map(), grąžina rezultatus ta pačia tvarka
imap() – grąžina iteratorių, rezultatai ateina iš karto kai tik paskaičiuoti
apply() – vykdo vieną funkciją ir laukia rezultato (retai naudojamas)
apply_async() – vykdo asinchroniškai, grąžina rezultatą per callback

Duomenų dalijimasis tarp procesų

Čia prasideda smagumas. Kadangi kiekvienas procesas turi savo atminties erdvę, tiesiog perduoti kintamuosius neveiks taip, kaip įpratote. Yra keli būdai spręsti šią problemą.

Queue – tai eilė, kuri veikia tarp procesų. Vienas procesas gali dėti duomenis į eilę, kitas – juos išimti:

„`python
from multiprocessing import Process, Queue

def gamintojas(eilė):
for i in range(5):
eilė.put(f”Duomenys {i}”)
print(f”Įdėta: Duomenys {i}”)

def vartotojas(eilė):
while True:
duomenys = eilė.get()
if duomenys == „STOP”:
break
print(f”Gauta: {duomenys}”)

if __name__ == ‘__main__’:
q = Queue()

p1 = Process(target=gamintojas, args=(q,))
p2 = Process(target=vartotojas, args=(q,))

p1.start()
p2.start()

p1.join()
q.put(„STOP”)
p2.join()
„`

Pipe – tai dvikryptis komunikacijos kanalas tarp dviejų procesų. Greitesnis nei Queue, bet veikia tik su dviem procesais:

„`python
from multiprocessing import Process, Pipe

def siuntejas(conn):
conn.send(„Labas iš kito proceso!”)
conn.close()

if __name__ == ‘__main__’:
parent_conn, child_conn = Pipe()
p = Process(target=siuntejas, args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()
„`

Bendros atminties objektai

Kartais reikia, kad visi procesai matytų tą patį kintamąjį. Tam yra Value ir Array:

„`python
from multiprocessing import Process, Value, Array
import time

def didintuvas(skaicius, masyvas):
for i in range(10):
skaicius.value += 1
time.sleep(0.1)

for i in range(len(masyvas)):
masyvas[i] *= 2

if __name__ == ‘__main__’:
bendras_skaicius = Value(‘i’, 0) # ‘i’ reiškia integer
bendras_masyvas = Array(‘i’, [1, 2, 3, 4, 5])

procesai = [Process(target=didintuvas, args=(bendras_skaicius, bendras_masyvas))
for _ in range(3)]

for p in procesai:
p.start()

for p in procesai:
p.join()

print(f”Galutinė reikšmė: {bendras_skaicius.value}”)
print(f”Galutinis masyvas: {bendras_masyvas[:]}”)
„`

Dėmesio! Kai keli procesai vienu metu keičia tą patį kintamąjį, gali kilti problemų. Tam reikia naudoti Lock:

„`python
from multiprocessing import Process, Value, Lock

def saugus_didinimas(skaicius, lock):
for _ in range(1000):
with lock:
skaicius.value += 1

if __name__ == ‘__main__’:
skaicius = Value(‘i’, 0)
lock = Lock()

procesai = [Process(target=saugus_didinimas, args=(skaicius, lock))
for _ in range(10)]

for p in procesai:
p.start()
for p in procesai:
p.join()

print(f”Rezultatas: {skaicius.value}”) # Turėtų būti 10000
„`

Kada naudoti ir kada vengti

Multiprocessing nėra universalus sprendimas visoms problemoms. Yra situacijos, kai jis puikiai tinka, ir situacijos, kai tik apsunkins gyvenimą.

Kada naudoti:
– CPU-intensyvios užduotys (vaizdo apdorojimas, mašininis mokymasis, didelių duomenų analizė)
– Ilgai trunkantys skaičiavimai, kuriuos galima padalinti į nepriklausomas dalis
– Kai turite daug branduolių ir norite juos išnaudoti
– Duomenų apdorojimas, kur kiekvienas elementas apdorojamas nepriklausomai

Kada vengti:
– Lengvos užduotys, kur proceso kūrimo overhead viršija naudą
– Kai procesai turi daug komunikuoti tarpusavyje (overhead dėl IPC)
– I/O intensyvios užduotys (geriau naudoti asyncio ar threading)
– Kai atminties sąnaudos kritinės (kiekvienas procesas dubliuoja duomenis)

Praktinis pavyzdys – vaizdo failų apdorojimas. Tarkime, turite 100 nuotraukų ir norite jas sumažinti:

„`python
from multiprocessing import Pool
from PIL import Image
import os

def sumažinti_nuotrauka(failas):
try:
img = Image.open(failas)
img.thumbnail((800, 600))
naujas_pavadinimas = f”small_{os.path.basename(failas)}”
img.save(naujas_pavadinimas)
return f”Apdorota: {failas}”
except Exception as e:
return f”Klaida su {failas}: {e}”

if __name__ == ‘__main__’:
failai = [f”nuotrauka_{i}.jpg” for i in range(100)]

with Pool() as pool:
rezultatai = pool.map(sumažinti_nuotrauka, failai)

for r in rezultatai:
print(r)
„`

Debugging ir įprastos klaidos

Daugiaprocesinis programavimas gali būti sudėtingas debuginti. Štai keletas dažniausių problemų ir kaip jų išvengti:

Problema nr. 1: Procesas nesileidžia Windows sistemoje

Jei matote klaidą apie rekursyvų procesų kūrimą, greičiausiai pamiršote if __name__ == '__main__'. Windows negali fork() procesų kaip Linux, todėl importuoja visą failą iš naujo. Be šio patikrinimo, kiekvienas naujas procesas bandys sukurti dar daugiau procesų.

Problema nr. 2: Rezultatai nepasiekiami

Kai naudojate Process be Queue ar Pipe, rezultatai tiesiog dingsta. Procesas negali tiesiogiai grąžinti reikšmės. Naudokite Pool.map() arba komunikacijos mechanizmus.

Problema nr. 3: Procesas užstringa

Jei procesas užstringa, patikrinkite:
– Ar tikrai iškviečiate join()?
– Ar Queue neužsipildė (turi limitą)?
– Ar nėra deadlock su Lock?

Naudingas patarimas – naudokite timeout:

„`python
p = Process(target=funkcija)
p.start()
p.join(timeout=10) # Laukti ne ilgiau kaip 10 sekundžių

if p.is_alive():
print(„Procesas užstrigo, terminuojame”)
p.terminate()
p.join()
„`

Pažangesnės technikos ir optimizavimas

Kai jau suprantate pagrindus, galite eiti giliau. Manager objektas leidžia dalintis sudėtingesnėmis duomenų struktūromis:

„`python
from multiprocessing import Process, Manager

def pridėti_į_žodyną(žodynas, raktas, reikšmė):
žodynas[raktas] = reikšmė

if __name__ == ‘__main__’:
with Manager() as manager:
bendras_žodynas = manager.dict()

procesai = []
for i in range(5):
p = Process(target=pridėti_į_žodyną,
args=(bendras_žodynas, f”raktas_{i}”, i*10))
procesai.append(p)
p.start()

for p in procesai:
p.join()

print(dict(bendras_žodynas))
„`

Manager gali kurti bendrus list, dict, Namespace ir kitus objektus. Bet atminkite – jis lėtesnis nei Value ar Array, nes naudoja proxy objektus.

ProcessPoolExecutor iš concurrent.futures modulio – modernesnė alternatyva Pool:

„`python
from concurrent.futures import ProcessPoolExecutor
import time

def užduotis(n):
time.sleep(1)
return n * n

if __name__ == ‘__main__’:
with ProcessPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(užduotis, i) for i in range(10)]

for future in futures:
print(future.result())
„`

ProcessPoolExecutor turi geresnę klaidų tvarkymą ir veikia kaip context manager, todėl nereikia rūpintis procesų užvėrimu.

Kai viskas sudėta į vietą

Multiprocessing Python yra galingas įrankis, kai reikia išspausti maksimumą iš savo aparatūros. Taip, jis sudėtingesnis nei paprastas nuoseklus kodas. Taip, reikia galvoti apie procesų komunikaciją ir sinchronizaciją. Bet kai turite tikrai sunkią užduotį ir kelis procesorių branduolius, skirtumas gali būti dramatiškas.

Pradėkite nuo paprastų dalykų – Pool.map() dažniausiai užtenka daugumai atvejų. Kai suprasite, kaip tai veikia, galite eksperimentuoti su Queue, Pipe ir kitais mechanizmais. Svarbiausia – testuokite su realiomis užduotimis ir matuokite našumą. Kartais paprastas nuoseklus kodas veikia greičiau nei sudėtingas daugiaprocesinis, ypač jei užduotys lengvos arba duomenų perdavimas užima daugiau laiko nei pats skaičiavimas.

Ir nepamirškite – debugging daugiaprocesiniame kode gali būti iššūkis, todėl pradėkite su paprastu variantu, įsitikinkite, kad veikia, ir tik tada komplikuokite. Jūsų ateities aš padėkos už šį atsargumą, kai nebus reikalinga ieškoti deadlock priežasties 3 valandą nakties.

Daugiau

Python abstract base classes: interface apibrėžimas