Apache Spark big data apdorojimas

Kas tas Spark ir kodėl jis tapo tokiu populiarus

Kai pradedi dirbti su dideliais duomenų kiekiais, greitai supranti, kad įprastiniai įrankiai tiesiog nebeišveža. Vienas serveris, viena duomenų bazė – tai gali veikti, kol tavo duomenys telpa į atmintį ir procesorius spėja viską apdoroti. Bet kai kalba eina apie terabaitus ar net petabaitus informacijos, reikia kažko galingesnio.

Apache Spark atsirado kaip atsakas į šią problemą. Sukurtas 2009 metais Kalifornijos universitete Berkeley, jis greitai tapo vienu populiariausių big data apdorojimo įrankių. Ir ne be priežasties – Spark gali būti net 100 kartų greitesnis už savo pirmtaką Hadoop MapReduce, kai duomenys apdorojami atmintyje.

Pagrindinė Spark filosofija paprasta: paskirstyk duomenis ir skaičiavimus per daugelį kompiuterių, apdorok viską lygiagrečiai, o rezultatus sujunk į vieną visumą. Skamba paprasta, bet įgyvendinimas – tai jau kitas reikalas. Laimei, Spark didelę dalį sudėtingumo paslepia po patogia API, todėl programuotojams nebereikia galvoti apie visus žemo lygio paskirstyto skaičiavimo niuansus.

Architektūra ir pagrindiniai komponentai

Spark architektūra sukurta pagal master-worker principą. Turime vieną Driver programą, kuri koordinuoja darbą, ir daug Executor procesų, kurie atlieka faktinį darbą. Kai paleidžiate Spark aplikaciją, Driver programa sukuria vadinamąjį SparkContext – tai tarsi valdymo centras, per kurį vyksta visa komunikacija.

Vienas iš genialių Spark sprendimų – Resilient Distributed Dataset (RDD). Tai pagrindinė duomenų abstrakcija, kuri leidžia dirbti su paskirstytais duomenimis tarsi jie būtų paprastas masyvas. RDD yra nemutabilus (immutable) – negalite keisti jo turinio, bet galite kurti naujus RDD transformacijomis. Tai gali atrodyti kaip apribojimas, bet iš tikrųjų tai suteikia milžinišką pranašumą – jei kažkas sugenda, Spark gali lengvai atkurti prarastus duomenis.

Spark ekosistema susideda iš kelių komponentų. Spark Core – tai branduolys, ant kurio statoma visa kita. Spark SQL leidžia dirbti su struktūrizuotais duomenimis naudojant SQL užklausas. Spark Streaming – realaus laiko duomenų srautų apdorojimui. MLlib – mašininio mokymosi biblioteka. GraphX – grafų analizei. Kiekvienas iš šių komponentų sprendžia specifines problemas, bet visi jie dalijasi ta pačia infrastruktūra.

Kaip realiai pradėti dirbti su Spark

Pirmas žingsnis – įsidiegti Spark savo kompiuteryje. Nors Spark skirtas klasteriams, galite jį paleisti ir vietinėje mašinoje testavimui. Jums reikės Java (bent 8 versijos) ir paties Spark. Atsisiųskite iš oficialios svetainės, išpakuokite ir nustatykite aplinkos kintamuosius. Tai nėra sudėtinga, bet gali užtrukti, jei niekada nedarėte nieko panašaus.

Programuoti su Spark galite Python, Scala, Java arba R kalbomis. Python yra populiariausias pasirinkimas pradedantiesiems – PySpark API yra intuityvus ir gerai dokumentuotas. Štai paprastas pavyzdys, kaip nuskaityti tekstinį failą ir suskaičiuoti žodžius:


from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WordCount").getOrCreate()
text_file = spark.read.text("input.txt")
counts = text_file.rdd.flatMap(lambda line: line.value.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("output")

Šis kodas atrodo gana paprastai, bet po gaubtu vyksta daug dalykų. Spark automatiškai paskirstys failą per visus darbuotojus, kiekvienas apdoros savo dalį, o rezultatai bus sujungti. Jūs apie tai net negalvojate – tiesiog rašote kodą tarsi dirbtumėte su vienu failu vienoje mašinoje.

Transformacijos ir akcijos – Spark širdis

Spark operacijos skirstomos į dvi kategorijas: transformacijas ir akcijas. Tai labai svarbi koncepcija, kurią privalote suprasti. Transformacijos (map, filter, groupBy) yra tingios (lazy) – jos iš tikrųjų nevykdomos iš karto. Spark tiesiog įsimena, ką norite padaryti, ir kuria vykdymo planą. Tik kai iškviečiate akciją (count, collect, save), Spark pradeda faktinį darbą.

Kodėl taip? Nes tai leidžia Spark optimizuoti vykdymą. Jei turite grandinę transformacijų, Spark gali jas sujungti, pašalinti nereikalingus žingsnius ir sumažinti duomenų perkėlimų tarp mazgų kiekį. Tai vadinama „execution plan optimization” ir tai viena iš priežasčių, kodėl Spark toks greitas.

Pavyzdžiui, jei filtruojate duomenis ir iš karto po to juos projektuojate (pasirenkate tik kai kuriuos stulpelius), Spark gali pritaikyti filtrą prieš skaitydamas visus duomenis. Tai gali sutaupyti daug laiko ir resursų, ypač kai dirbate su dideliais failais.

Dar vienas svarbus dalykas – partition strategija. Spark dalija duomenis į partijas (partitions), ir kiekviena partija apdorojama atskirame darbuotojo thread’e. Jei turite per mažai partijų, neišnaudojate viso klasterio galios. Jei per daug – sugaišite laiko koordinacijai. Paprastai gera taisyklė – turėti 2-3 partijas kiekvienam CPU core.

Spark SQL ir DataFrames – struktūrizuotų duomenų galia

Nors RDD yra galingas, dažniausiai dirbsite su Spark DataFrames. Tai aukštesnio lygio abstrakcija, panaši į pandas DataFrame arba SQL lentelę. DataFrames turi schemas – žinote, kokie stulpeliai egzistuoja ir kokio jie tipo. Tai leidžia Spark dar labiau optimizuoti vykdymą.

Su DataFrames galite naudoti SQL užklausas. Taip, įprastas SQL! Tai labai patogu, nes daugelis žmonių jau moka SQL ir nereikia mokytis naujos sintaksės. Galite registruoti DataFrame kaip laikiną lentelę ir vykdyti užklausas:


df = spark.read.json("users.json")
df.createOrReplaceTempView("users")
result = spark.sql("SELECT age, COUNT(*) FROM users GROUP BY age")
result.show()

Spark SQL turi Catalyst optimizer – tai užklausų optimizavimo variklis, kuris analizuoja jūsų užklausą ir generuoja efektyviausią vykdymo planą. Jis gali perrašyti užklausas, pakeisti operacijų tvarką, nuspręsti, kada naudoti broadcast joins vietoj shuffle joins, ir daug kitų gudrybių.

Vienas iš dažniausių klausimų – kada naudoti RDD, o kada DataFrame? Paprastai atsakymas – naudokite DataFrames, nebent turite labai specifinį atvejį, kai reikia žemo lygio kontrolės. DataFrames yra greitesni, lengviau optimizuojami ir turi geresnę API. RDD vis dar naudingi, kai dirbate su nestruktūrizuotais duomenimis arba reikia labai specifinės logikos.

Realaus laiko duomenų apdorojimas su Structured Streaming

Vienas iš įspūdingiausių Spark gebėjimų – realaus laiko duomenų srautų apdorojimas. Structured Streaming leidžia apdoroti duomenis, kurie ateina nuolat, naudojant tą pačią API kaip ir batch processing. Tai reiškia, kad galite parašyti vieną kodą, kuris veiks ir su istoriniais duomenimais, ir su realaus laiko srautu.

Pagrindinė idėja paprasta – įsivaizduokite, kad jūsų duomenų srautas yra begalinė lentelė, kuri nuolat auga. Kiekvieną kartą, kai ateina nauji duomenys, jie pridedami prie šios lentelės galo. Jūsų užklausa vykdoma periodiškai, apdorodama naujus duomenis. Spark pasirūpina visais sudėtingais dalykais – būsenų valdymu, fault tolerance, exactly-once semantika.

Pavyzdžiui, galite skaityti duomenis iš Kafka topic ir apdoroti juos realiu laiku:


stream_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "events") \
.load()

query = stream_df.selectExpr("CAST(value AS STRING)") \
.writeStream \
.format("console") \
.start()

Structured Streaming palaiko įvairius šaltinius – Kafka, socket connections, failų sistemas. Galite rašyti rezultatus į įvairias vietas – duomenų bazes, failų sistemas, Kafka. Tai labai lankstu ir galinga, bet reikia suprasti kai kuriuos niuansus, pavyzdžiui, watermarking, kai dirbate su vėluojančiais duomenimis.

Optimizavimo triukai ir best practices

Spark gali būti labai greitas, bet gali būti ir labai lėtas, jei nežinote, ką darote. Štai keletas dalykų, į kuriuos verta atkreipti dėmesį.

Pirma, atmintis. Spark mėgsta atmintį – kuo daugiau, tuo geriau. Bet reikia suprasti, kaip ji naudojama. Spark dalijasi atmintį tarp execution memory (skaičiavimams) ir storage memory (cache’inimui). Galite keisti šį santykį per konfigūraciją, bet default nustatymai paprastai veikia gerai.

Cache’inimas – tai viena iš galingiausių Spark funkcijų. Jei turite DataFrame, kurį naudojate kelis kartus, pažymėkite jį cache() arba persist(). Tai išsaugos duomenis atmintyje, ir kiti skaičiavimai bus daug greitesni. Bet neužmirškite unpersist(), kai duomenys nebereikalingi – kitaip užkimšite atmintį.

Shuffle operacijos – tai Spark priešas numeris vienas. Shuffle vyksta, kai duomenys turi būti perkelti tarp skirtingų darbuotojų – pavyzdžiui, per groupBy arba join operacijas. Tai lėta, nes reikia rašyti į diską ir siųsti per tinklą. Stenkitės minimizuoti shuffle: naudokite broadcast joins mažiems duomenų rinkiniams, filtruokite duomenis kuo anksčiau, optimizuokite particionavimą.

Dar vienas svarbus dalykas – data skew. Tai kai vienos partijos turi daug daugiau duomenų nei kitos. Vienas darbuotojas dirbs ilgai, kiti lauks. Sprendimas – repartition su geresniu key pasirinkimu arba salting techniką (pridėti atsitiktinį prefix prie key).

Ką daryti, kai kažkas negerai

Spark UI – tai jūsų geriausias draugas debuginant. Kai paleidžiate Spark aplikaciją, automatiškai startuoja web interface (paprastai port 4040). Ten matote visus jobs, stages, tasks, galite peržiūrėti execution plans, pamatyti, kur leidžiamas laikas, kiek duomenų shuffle’inama.

Dažniausia problema – OutOfMemoryError. Tai gali nutikti dėl kelių priežasčių. Galbūt bandote collect() per daug duomenų į driver programą. Galbūt turite per mažai executor memory. Galbūt yra data skew. Pažiūrėkite į Spark UI, pamatykite, kuriame stage’e klaida įvyksta, ir analizuokite.

Dar viena dažna problema – lėtas vykdymas. Pirmiausia patikrinkite, ar naudojate DataFrames vietoj RDD. Pažiūrėkite į execution plan – ar yra nereikalingų shuffle operacijų? Ar duomenys gerai particionuoti? Ar naudojate cache’inimą, kur reikia?

Logging taip pat svarbus. Spark naudoja log4j, ir galite konfigūruoti, ką ir kaip loginti. Bet būkite atsargūs – per daug loginimo gali sulėtinti aplikaciją. DEBUG level’is production aplinkoje paprastai yra bloga idėja.

Kada Spark yra tinkamas pasirinkimas ir kada ne

Spark puikiai tinka, kai turite daug duomenų ir reikia juos apdoroti lygiagrečiai. Jei analizuojate terabaitus logų, apdorojate milijonus įvykių per sekundę, treniruojate mašininio mokymosi modelius su dideliais duomenų rinkiniais – Spark yra puikus pasirinkimas.

Bet Spark nėra sidabrinė kulka. Jei turite nedidelį duomenų kiekį (kelios dešimtys gigabaitų), paprastas Python scriptas su pandas gali būti greitesnis ir paprastesnis. Spark turi overhead – reikia startuoti JVM, koordinuoti darbuotojus, shuffle’inti duomenis. Tai atsipirka tik su dideliais duomenų kiekiais.

Taip pat Spark nėra geriausias pasirinkimas transakciniams workload’ams. Jei reikia greitų point queries arba daug mažų write operacijų, geriau naudoti tradicinę duomenų bazę. Spark skirtas batch processing ir streaming analytics, ne OLTP.

Infrastruktūros požiūriu, Spark gali būti paleidžiamas įvairiai. Galite naudoti standalone režimą (pats Spark valdo klasterį), YARN (Hadoop ekosistemoje), Mesos, arba Kubernetes. Debesyse galite naudoti managed services – AWS EMR, Azure Databricks, Google Dataproc. Tai sutaupo daug galvos skausmo su setup ir maintenance.

Kalbant apie ateities perspektyvas, Spark toliau vystosi. Naujausios versijos fokusuojasi į dar geresnį performance, adaptyvų užklausų vykdymą, geresnį Kubernetes palaikymą. Bendruomenė aktyvi, dokumentacija gera, darbo rinkoje paklausa didelė. Jei dirbate su big data, Spark mokėti verta.

Taigi, Spark nėra paprastas įrankis, bet kai jį išmokstate, jis atvers duris į big data pasaulį. Pradėkite nuo paprastų pavyzdžių, eksperimentuokite su mažais duomenų rinkiniais, skaitykite dokumentaciją, žiūrėkite į Spark UI, mokykitės iš klaidų. Su laiku viskas susidės į vieną paveikslą, ir suprasite, kodėl Spark tapo tokiu populiarus big data apdorojimo įrankiu.

Daugiau

Yandex Cloud: Rusijos debesų platforma