Apache Storm realaus laiko skaičiavimai

Kas tas Apache Storm ir kodėl jis vis dar aktualus

Kai kalbame apie realaus laiko duomenų apdorojimą, Apache Storm yra vienas iš tų įrankių, kurie atsirado gana anksti ir išliko aktualūs net ir šiandien. Nors dabar turime daugybę alternatyvų – nuo Apache Flink iki Kafka Streams – Storm vis dar turi savo vietą technologijų ekosistemoje.

Storm atsirado maždaug 2011 metais, kai Nathan Marz jį sukūrė dirbdamas BackType kompanijoje. Vėliau Twitter įsigijo BackType ir padarė Storm atvirojo kodo projektu. Tai buvo lūžio taškas – staiga visi, kas norėjo apdoroti duomenis realiu laiku, turėjo įrankį, kuris tai galėjo padaryti efektyviai ir patikimai.

Pagrindinė Storm idėja paprasta: jūs apibrėžiate skaičiavimų topologiją, kuri veikia nenutrūkstamai ir apdoroja duomenų srautus. Skirtingai nei batch processing sistemose (pavyzdžiui, Hadoop MapReduce), kur paleidžiate darbą, laukiate rezultatų ir baigiate, Storm topologijos veikia be perstojo, apdorodamos duomenis iš karto, kai tik jie atsiranda.

Kaip Storm veikia po gaubtu

Storm architektūra gali atrodyti sudėtinga iš pradžių, bet kai suprantate pagrindines koncepcijas, viskas susidėlioja į vietą. Sistema susideda iš kelių pagrindinių komponentų, kurie dirba kartu.

Nimbus yra pagrindinis koordinatorius – galima sakyti, Storm „smegenys”. Jis priima topologijas, paskirsto užduotis darbuotojams (workers) ir stebi, kad viskas veiktų sklandžiai. Jei kažkas negerai, Nimbus pasirūpina, kad darbas būtų perskirstytas.

Supervisor demonai veikia kiekviename darbo mazge. Jie klausosi Nimbus nurodymų ir paleidžia ar sustabdo darbuotojų procesus pagal poreikį. Supervisor yra kaip vietinis vadybininkas, kuris užtikrina, kad jo mazge viskas veiktų taip, kaip reikia.

ZooKeeper čia atlieka koordinavimo vaidmenį. Jis saugo būseną ir padeda Nimbus bei Supervisor komponentams komunikuoti tarpusavyje. Be ZooKeeper, Storm sistema tiesiog negalėtų veikti paskirstytoje aplinkoje.

Pati topologija susideda iš dviejų pagrindinių elementų: spouts ir bolts. Spout yra duomenų šaltinis – tai gali būti Kafka topic, RabbitMQ eilė, Twitter API ar bet kas kita, kas generuoja duomenis. Bolt yra apdorojimo logikos vienetas – jis gauna duomenis, atlieka tam tikrus veiksmus ir gali perduoti rezultatus kitiems bolts.

Praktinis topologijos kūrimas

Geriausia mokytis praktiškai, todėl pažiūrėkime, kaip sukurti paprastą Storm topologiją. Tarkime, norime analizuoti Twitter žinutes realiu laiku ir skaičiuoti, kokie hashtagai populiariausi.

Pirmiausia reikia sukurti spout, kuris skaitys Twitter srautą:


public class TwitterSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private TwitterStream twitterStream;

@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
// Twitter stream inicializacija
twitterStream = new TwitterStreamFactory().getInstance();
twitterStream.addListener(new StatusListener() {
public void onStatus(Status status) {
collector.emit(new Values(status.getText()));
}
// kiti metodai...
});
twitterStream.sample();
}

@Override
public void nextTuple() {
Utils.sleep(100);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tweet"));
}
}

Dabar reikia bolt, kuris išskirs hashtag’us iš žinučių:


public class HashtagExtractorBolt extends BaseRichBolt {
private OutputCollector collector;

@Override
public void prepare(Map conf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple tuple) {
String tweet = tuple.getStringByField("tweet");
Pattern pattern = Pattern.compile("#\\w+");
Matcher matcher = pattern.matcher(tweet);

while (matcher.find()) {
String hashtag = matcher.group();
collector.emit(new Values(hashtag, 1));
}
collector.ack(tuple);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("hashtag", "count"));
}
}

Ir galiausiai bolt, kuris skaičiuos hashtag’ų dažnumą:


public class HashtagCounterBolt extends BaseRichBolt {
private OutputCollector collector;
private Map counts = new HashMap<>();

@Override
public void execute(Tuple tuple) {
String hashtag = tuple.getStringByField("hashtag");
Integer count = counts.get(hashtag);
if (count == null) count = 0;
count++;
counts.put(hashtag, count);

collector.emit(new Values(hashtag, count));
collector.ack(tuple);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("hashtag", "count"));
}
}

Garantijos ir patikimumas

Vienas iš Storm privalumų – galimybė pasirinkti, kokio lygio patikimumą norite. Storm siūlo tris garantijų lygius, ir kiekvienas turi savo kompromisus.

At-most-once reiškia, kad kiekvienas pranešimas bus apdorotas daugiausiai vieną kartą. Tai greičiausias variantas, bet jei kažkas nepavyksta, duomenys gali būti prarasti. Tinka situacijoms, kur greitis svarbesnis už tikslumą – pavyzdžiui, realaus laiko monitoringui, kur vienas praleistas duomenų taškas nėra kritiškas.

At-least-once garantuoja, kad kiekvienas pranešimas bus apdorotas bent vieną kartą. Jei kažkas nepavyksta, Storm automatiškai pakartoja apdorojimą. Problema ta, kad tas pats pranešimas gali būti apdorotas kelis kartus. Tai dažniausiai naudojamas variantas, nes jis suteikia gerą balansą tarp greičio ir patikimumo.

Exactly-once semantika Storm pasiekiama naudojant Trident API – aukštesnio lygio abstrakcija virš Storm. Tai lėčiausias variantas, bet garantuoja, kad kiekvienas pranešimas bus apdorotas tiksliai vieną kartą. Būtina finansinėms transakcijoms ar kitoms kritinėms operacijoms.

Kad Storm galėtų užtikrinti patikimumą, jūsų kodas turi teisingai naudoti ack() ir fail() metodus. Kai bolt sėkmingai apdoroja tuple, jis turi iškviesti collector.ack(tuple). Jei kažkas nepavyksta, reikia kviesti collector.fail(tuple), ir Storm automatiškai pakartoja apdorojimą.

Našumo optimizavimas ir skalabilumas

Kai jūsų Storm topologija veikia gamyboje, greičiausiai susidursite su našumo klausimais. Yra keletas pagrindinių dalykų, į kuriuos verta atkreipti dėmesį.

Paralelizmas yra pirmasis dalykas, kurį reikia suprasti. Storm leidžia kontroliuoti, kiek executor’ių ir task’ų veikia kiekvienam komponentui. Executor yra gija (thread), o task yra faktinė spout ar bolt instancija. Galite turėti kelis task’us vienam executor’iui.

Kai kuriate topologiją, galite nurodyti paralelizmo lygį:


TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("twitter-spout", new TwitterSpout(), 2);
builder.setBolt("hashtag-extractor", new HashtagExtractorBolt(), 4)
.shuffleGrouping("twitter-spout");
builder.setBolt("hashtag-counter", new HashtagCounterBolt(), 2)
.fieldsGrouping("hashtag-extractor", new Fields("hashtag"));

Čia turime 2 spout instancijas, 4 hashtag extractor bolt instancijas ir 2 counter bolt instancijas. Skaičiai parinkti ne atsitiktinai – paprastai norite, kad apdorojimo bolt’ų būtų daugiau nei spout’ų, nes jie atlieka sunkesnį darbą.

Grouping strategijos taip pat labai svarbios našumui. Shuffle grouping paskirsto tuple’us atsitiktinai – gerai, kai visi bolt’ai atlieka vienodą darbą. Fields grouping užtikrina, kad tuple’ai su tuo pačiu lauko reikšme visada pateks į tą patį bolt – būtina, kai reikia agregatuoti duomenis.

All grouping siunčia tuple į visus bolt’us – naudinga broadcast tipo operacijoms. Global grouping siunčia visus tuple’us į vieną bolt instanciją – naudinga, kai reikia centralizuoto apdorojimo, bet gali tapti buteliu kakleliu.

Buferio dydžiai taip pat turi įtakos našumui. Storm naudoja LMAX Disruptor biblioteką vidiniam pranešimų perdavimui, ir galite reguliuoti buferio dydžius konfigūracijoje. Didesni buferiai gali padėti, kai turite burst’us, bet naudoja daugiau atminties.

Integracijos su kitomis sistemomis

Storm retai veikia izoliuotai – paprastai jis yra dalies didesnės duomenų pipeline. Gera žinia ta, kad Storm turi puikų palaikymą įvairioms integracijoms.

Kafka yra turbūt populiariausia integracija. Storm-Kafka connector leidžia lengvai skaityti iš Kafka topic’ų su visomis patikimumo garantijomis. Galite naudoti KafkaSpout, kuris automatiškai tvarko offset’us ir palaiko at-least-once semantiką:


KafkaSpoutConfig kafkaConfig =
KafkaSpoutConfig.builder("localhost:9092", "my-topic")
.setGroupId("storm-consumer")
.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
.build();

KafkaSpout kafkaSpout = new KafkaSpout<>(kafkaConfig);

Redis dažnai naudojamas kaip greita cache ar būsenos saugykla. Storm-Redis connector suteikia bolt’us, kurie gali rašyti į Redis įvairiais būdais – nuo paprastų string operacijų iki sorted set’ų ir hash’ų.

HDFS ir HBase integracija leidžia saugoti apdorotus duomenis ilgalaikiam saugojimui. Tai ypač naudinga, kai norite turėti ir realaus laiko, ir istorinę analizę.

Elasticsearch puikiai tinka realaus laiko paieškos ir analizės scenarijams. Galite siųsti apdorotus duomenis tiesiai į Elasticsearch ir turėti beveik momentinę prieigą per Kibana dashboards.

Svarbu paminėti, kad integracijos turėtų būti asinchroninės ir turėti retry logikos. Jei išorinė sistema laikinai nepasiekiama, jūsų topologija neturėtų griūti – turėtų bandyti iš naujo su eksponentiniu backoff.

Monitoringas ir debugging

Kai jūsų Storm topologija veikia gamyboje, jums reikia žinoti, kas vyksta. Storm turi įmontuotą UI, kuris rodo daug naudingos informacijos, bet tai tik pradžia.

Storm UI rodo topologijos struktūrą, kiek tuple’ų apdorojama, kokia latencija, kiek klaidų įvyko. Tai puikus starting point, bet realiam production monitoringui reikia daugiau.

Metrics yra būtini. Storm palaiko custom metrics, kurias galite siųsti į jūsų monitoring sistemą – Graphite, InfluxDB, Prometheus ar ką nors kita. Galite sekti bet ką – nuo business metrikų (pvz., kiek transakcijų per sekundę) iki techninių (pvz., garbage collection pauzės).

Štai kaip galite pridėti custom metric:


public class MyBolt extends BaseRichBolt {
private transient CountMetric countMetric;

@Override
public void prepare(Map conf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
this.countMetric = new CountMetric();
context.registerMetric("my-counter", countMetric, 60);
}

@Override
public void execute(Tuple tuple) {
// apdorojimo logika
countMetric.incr();
collector.ack(tuple);
}
}

Logging taip pat kritiškai svarbus. Storm naudoja Log4j2, ir jūs turėtumėte tinkamai sukonfigūruoti log level’ius. Development aplinkoje galite naudoti DEBUG, bet production’e turėtumėte laikytis INFO ar WARNING, kad nesugeneruotumėte per daug log’ų.

Viena iš problemų su paskirstytomis sistemomis – log’ai yra išsibarstę po daugelį serverių. Čia praverčia centralizuotas logging – ELK stack (Elasticsearch, Logstash, Kibana) ar panašūs sprendimai. Galite agregaoti visus log’us vienoje vietoje ir lengvai ieškoti problemų.

Tracing padeda suprasti, kaip tuple keliauja per topologiją. Galite pridėti unique ID kiekvienam tuple ir sekti jo kelionę. Tai ypač naudinga, kai bandote suprasti, kodėl kai kurie tuple’ai apdorojami lėčiau nei kiti.

Kada rinktis Storm ir kada ne

Storm nėra sidabrinis kulka, kuris tinka visiems atvejams. Yra situacijų, kur jis puikiai tinka, ir situacijų, kur geriau rinktis ką nors kita.

Storm puikiai tinka, kai jums reikia:
– Apdoroti duomenis su labai maža latencija (milisekundės)
– Paprastos event-by-event apdorojimo logikos
– At-least-once garantijų, kurios pakanka jūsų use case’ui
– Integracijos su daugeliu skirtingų sistemų
– Sistemos, kuri veikia 24/7 be perstojo

Pavyzdžiui, fraud detection sistemose, kur kiekviena transakcija turi būti patikrinta realiu laiku, Storm yra puikus pasirinkimas. Arba realaus laiko alerting sistemose, kur reikia greitai reaguoti į anomalijas.

Storm gali būti ne geriausias pasirinkimas, kai:
– Jums reikia sudėtingos event time semantikos ir windowing
– Būtina exactly-once semantika be Trident overhead
– Reikia SQL-like query kalbos duomenų srautams
– Jūsų komanda jau turi expertise su kita technologija

Šiais atvejais galite pažiūrėti į Apache Flink, kuris turi pažangesnes windowing galimybes ir geresnį exactly-once palaikymą. Arba Kafka Streams, jei jūsų duomenys jau yra Kafka ir norite paprastesnio deployment modelio.

Ką reikia žinoti prieš einant į gamybą

Jei nusprendėte naudoti Storm production’e, yra keletas dalykų, kuriuos tikrai turėtumėte apsvarstyti iš anksto.

Capacity planning yra kritiškai svarbus. Jums reikia žinoti, kokį throughput tikitės, kokia latencija priimtina, ir pagal tai apskaičiuoti, kiek resursų reikės. Geriau turėti šiek tiek per daug resursų nei per mažai – kai sistema perkrauta, ji gali pradėti atsilikti ir niekada nepavyti.

Paprastas būdas apskaičiuoti: jei tikitės 10,000 events per sekundę ir kiekvienas event apdorojimas užtrunka 10ms, jums reikės bent 100 paralelių executor’ių (10,000 * 0.01 / 1 = 100). Praktikoje pridėkite dar 50-100% atsargos.

Backpressure mechanizmas padeda, kai downstream komponentai nespėja apdoroti duomenų taip greitai, kaip upstream juos generuoja. Storm automatiškai sulėtina spout’us, kai aptinka backpressure. Bet jūs turėtumėte sukonfigūruoti tai teisingai – per agresyvus backpressure gali sumažinti throughput, per švelnusis gali leisti sistemai užsikimšti.

State management yra sudėtinga paskirstytose sistemose. Jei jūsų bolt’ai laiko būseną (kaip mūsų hashtag counter pavyzdyje), turite apsvarstyti, kas nutiks, kai bolt’as nukrista. Trident API siūlo state management abstrakcijas, bet jos turi performance overhead. Alternatyva – naudoti išorinę būsenos saugyklą kaip Redis, bet tada turite tvarkytis su tinklo latencija.

Deployment strategija taip pat svarbi. Kaip atnaujinsite topologiją be downtime? Storm leidžia deactivate topologiją (ji nustoja apdoroti naujus tuple’us, bet baigia apdoroti esamus), pakeisti ją nauja versija ir vėl activate. Bet tai vis tiek reiškia trumpą pertrūkį. Kai kurios komandos naudoja blue-green deployment – palaiko dvi topologijas lygiagrečiai ir perjungia traffic.

Disaster recovery planas turi būti apgalvotas iš anksto. Kas nutiks, jei visas cluster’is nukrista? Kaip greitai galėsite atkurti? Ar turite backup’us? Ar galite atkurti būseną? Šie klausimai gali atrodyti paranojiškai, bet kai įvyksta tikra problema 3 val. ryto, būsite dėkingi, kad apie tai pagalvojote iš anksto.

Ateitis ir alternatyvos šiuolaikinėje ekosistemoje

Storm vis dar aktyviai palaikomas Apache Software Foundation, bet reikia pripažinti, kad naujų projektų atveju dažniau renkamasi naujesnes technologijas. Tai nereiškia, kad Storm yra „miręs” – tiesiog ekosistema evoliucionavo.

Apache Flink tapo populiarus dėl savo pažangių galimybių – tikrasis event time processing, sophisticated windowing, exactly-once semantika be didelio performance overhead. Flink taip pat turi geresnį SQL palaikymą, kas leidžia analitikams dirbti su streaming duomenimis nenaudojant Java ar Scala.

Kafka Streams patrauklus savo paprastumu – jei jūsų duomenys jau Kafka, kodėl reikėtų papildomos infrastruktūros? Tiesiog parašote Java aplikaciją, kuri skaito iš vieno topic ir rašo į kitą. Deployment paprastesnis, nes tai tik paprasta aplikacija, ne atskiras cluster’is.

Apache Pulsar Functions ir Apache Beam taip pat vertos dėmesio. Beam suteikia unified programming modelį, kuris gali veikti ant skirtingų execution engine’ų – Storm, Flink, Spark, Google Cloud Dataflow.

Bet štai ką svarbu suprasti: jei jau turite veikiančią Storm sistemą, nebūtinai reikia skubėti ją keisti. „If it ain’t broke, don’t fix it” principas vis dar galioja. Migracija į naują technologiją turi būti pagrįsta realiomis problemomis ar poreikiais, ne tik noru naudoti naujausią hype.

Jei tik pradedame naują projektą, rekomenduočiau pažiūrėti į Flink ar Kafka Streams pirmiausia. Bet jei jūsų komanda jau turi Storm expertise, arba jūsų use case’as paprastas ir Storm jo sprendimui pakanka, nėra jokios priežasties ieškoti sudėtingesnių sprendimų.

Realaus laiko duomenų apdorojimas nėra paprastas, nesvarbu, kokią technologiją pasirenkate. Svarbu suprasti fundamentalius principus – kaip veikia paskirstytos sistemos, kaip užtikrinti patikimumą, kaip optimizuoti našumą. Šios žinios bus naudingos nepriklausomai nuo to, ar naudojate Storm, Flink ar ką nors kita. Technologijos keičiasi, bet principai lieka tie patys.

Daugiau

Vite build tool: greitas frontend development