Kas yra Logstash ir kodėl jis svarbus
Jei kada nors bandėte susidoroti su tūkstančiais ar net milijonais logų įrašų, žinote, kad tai gali tapti tikru košmaru. Logstash – tai vienas iš trijų pagrindinių ELK (Elasticsearch, Logstash, Kibana) steką sudarančių komponentų, kuris atlieka vieną iš svarbiausių darbų – logų surinkimą, transformavimą ir persiuntimą.
Paprasčiau tariant, Logstash veikia kaip duomenų konvejeris. Jis paima duomenis iš vieno ar kelių šaltinių, juos apdoroja pagal jūsų nurodytas taisykles ir išsiunčia į paskirties vietą – dažniausiai Elasticsearch. Tai gali būti serverių logai, aplikacijų klaidos, metrika ar bet kokie kiti struktūrizuoti ar nestruktūrizuoti duomenys.
Logstash ypatumas tas, kad jis dirba su pipeline (konvejerio) principu. Kiekvienas pipeline susideda iš trijų pagrindinių dalių: input (įvestis), filter (filtras) ir output (išvestis). Šis modulinis dizainas leidžia lankščiai konfigūruoti duomenų srautus ir pritaikyti juos konkretiems poreikiams.
Pipeline struktūra ir pagrindiniai komponentai
Logstash pipeline konfigūracija paprastai rašoma .conf faile ir naudoja savo sintaksę, kuri primena JSON, bet yra šiek tiek lankstesnė. Štai kaip atrodo paprasčiausias pipeline pavyzdys:
input {
file {
path => "/var/log/apache/access.log"
start_position => "beginning"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "apache-logs-%{+YYYY.MM.dd}"
}
}
Input sekcija apibrėžia, iš kur Logstash gauna duomenis. Tai gali būti failai, TCP/UDP jungtys, Kafka, Redis, HTTP endpoint’ai ar net AWS S3 bucket’ai. Viename pipeline galite turėti kelis input šaltinius – Logstash juos visus apdoros lygiagrečiai.
Filter sekcija – čia vyksta tikroji magija. Filtrai leidžia transformuoti, praturtinti ir struktūrizuoti duomenis. Galite išgauti konkrečius laukus iš nestruktūrizuotų tekstų, pridėti geografinę informaciją pagal IP adresus, konvertuoti datos formatus ar net atmesti nereikalingus įrašus.
Output sekcija nustato, kur keliauja apdoroti duomenys. Nors dažniausiai tai būna Elasticsearch, galite siųsti duomenis į kelias vietas vienu metu – pavyzdžiui, ir į Elasticsearch, ir į failą atsarginei kopijai.
Grok šablonai: nestruktūrizuotų logų valdymas
Vienas iš galingiausių Logstash įrankių yra grok filtras. Jei turite nestruktūrizuotus tekstinius logus, grok padės juos paversti struktūrizuotais duomenimis, kuriuos galėsite analizuoti.
Grok naudoja reguliariąsias išraiškas, bet su iš anksto apibrėžtais šablonais, kurie turi suprantamus pavadinimus. Pavyzdžiui, vietoj sudėtingos regex, galite naudoti %{IP:client_ip}, kad ištrauktumėte IP adresą ir jį įrašytumėte į lauką pavadinimu „client_ip”.
Štai realus pavyzdys, kaip apdoroti Nginx logą:
filter {
grok {
match => {
"message" => '%{IPORHOST:client_ip} - %{USER:ident} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:response_code} %{NUMBER:bytes}'
}
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
target => "@timestamp"
}
}
Šis filtras paims tokį logą:
192.168.1.1 - - [10/Jan/2024:13:55:36 +0000] "GET /api/users HTTP/1.1" 200 1234
Ir pavers jį struktūrizuotu objektu su atskirais laukais: client_ip, timestamp, method, request, response_code ir t.t.
Patarimas: naudokite Grok Debugger įrankį (jis integruotas į Kibana Dev Tools) testavimui. Tai sutaupo daug laiko, nes galite iš karto matyti, ar jūsų šablonas veikia teisingai.
Duomenų praturtinimas ir transformacijos
Kartais surinkti logai yra tik pusė istorijos. Logstash leidžia praturtinti duomenis papildoma informacija, kuri padeda geriau suprasti kontekstą.
GeoIP filtras yra puikus pavyzdys. Jis paima IP adresą ir prideda geografinę informaciją – šalį, miestą, koordinates:
filter {
geoip {
source => "client_ip"
target => "geoip"
fields => ["city_name", "country_name", "location"]
}
}
Po šio filtro pritaikymo galėsite Kibana sukurti žemėlapius, rodančius, iš kur ateina jūsų vartotojai ar iš kur vyksta atakos.
Mutate filtras leidžia atlikti įvairias transformacijas: konvertuoti duomenų tipus, pervadinti laukus, pašalinti nereikalingus laukus ar sujungti kelis laukus į vieną:
filter {
mutate {
convert => {
"response_code" => "integer"
"bytes" => "integer"
}
remove_field => ["message", "host"]
add_field => {
"environment" => "production"
}
}
}
Conditional processing leidžia taikyti filtrus tik tam tikriems įrašams. Tai labai naudinga, kai turite skirtingų tipų logus viename pipeline:
filter {
if [type] == "apache" {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
} else if [type] == "nginx" {
grok {
match => { "message" => "%{NGINXACCESS}" }
}
}
}
Našumo optimizavimas ir geriausia praktika
Kai pradėsite apdoroti didelius duomenų kiekius, greitai pastebėsite, kad Logstash gali tapti butelio kakleliu. Štai keletas būdų, kaip optimizuoti našumą.
Pipeline workers – tai gijų skaičius, kurios apdoroja duomenis. Pagal nutylėjimą Logstash naudoja tiek worker’ių, kiek yra CPU branduolių. Galite tai keisti logstash.yml faile:
pipeline.workers: 8 pipeline.batch.size: 250 pipeline.batch.delay: 50
Batch size nustato, kiek įvykių apdorojama vienu metu. Didesnis skaičius reiškia geresnį našumą, bet ir didesnį atminties naudojimą. Reikia rasti balansą.
Persistent queues – tai funkcija, kuri išsaugo duomenis diske prieš juos apdorojant. Jei Logstash užstringa ar nutrūksta, duomenys nebus prarasti:
queue.type: persisted queue.max_bytes: 4gb
Multiple pipelines – vietoj vieno didelio pipeline, galite sukurti kelis mažesnius, kiekvienas su savo konfigūracija ir resursais. Tai apibrėžiama pipelines.yml faile:
- pipeline.id: apache-logs path.config: "/etc/logstash/conf.d/apache.conf" pipeline.workers: 4 - pipeline.id: nginx-logs path.config: "/etc/logstash/conf.d/nginx.conf" pipeline.workers: 2
Dar vienas svarbus dalykas – monitoring. Logstash turi integruotą monitoring API, kurį galite pasiekti per HTTP:
curl -XGET 'localhost:9600/_node/stats/pipelines?pretty'
Tai parodys, kiek įvykių apdorota, kiek laukia eilėje, kokia filtro veikimo trukmė ir kita naudinga informacija.
Klaidų valdymas ir debugging
Kai kuriate sudėtingus pipeline, neišvengsite klaidų. Logstash turi keletą mechanizmų, kurie padeda jas aptikti ir išspręsti.
Dead letter queue (DLQ) – tai vieta, kur patenka įvykiai, kurių nepavyko apdoroti. Vietoj to, kad jie būtų tiesiog prarasti, galite juos peržiūrėti ir suprasti, kas nutiko:
dead_letter_queue.enable: true dead_letter_queue.max_bytes: 1024mb
Įvykius iš DLQ galite perskaityti ir pakartotinai apdoroti naudodami specialų input plugin:
input {
dead_letter_queue {
path => "/var/lib/logstash/dead_letter_queue"
commit_offsets => true
}
}
Stdout output yra paprasčiausias debugging įrankis. Pridėkite jį prie savo pipeline, kad matytumėte, kokie duomenys išeina po filtrų:
output {
stdout {
codec => rubydebug
}
}
Log level keitimas padeda gauti daugiau informacijos apie tai, kas vyksta Logstash viduje:
bin/logstash --log.level debug -f config/pipeline.conf
Praktinis patarimas: testuodami naują konfigūraciją, pradėkite nuo mažo duomenų rinkinio. Naudokite file input su konkrečiu failu, o ne realų duomenų srautą. Tai leis greitai iteruoti ir pataisyti klaidas.
Integracijos su išoriniais šaltiniais
Logstash tikroji galia atsiskleidžia, kai integruojate jį su įvairiomis sistemomis. Štai keletas populiarių scenarijų.
Kafka integracija yra labai paplitusi didelėse sistemose. Kafka veikia kaip buferis tarp duomenų šaltinių ir Logstash:
input {
kafka {
bootstrap_servers => "kafka1:9092,kafka2:9092"
topics => ["application-logs"]
group_id => "logstash-consumer"
codec => json
}
}
Redis taip pat dažnai naudojamas kaip tarpinė eilė. Tai ypač naudinga, kai turite kelis Logstash instance’us ir norite paskirstyti apkrovą:
input {
redis {
host => "redis.example.com"
port => 6379
data_type => "list"
key => "logstash"
}
}
HTTP input leidžia priimti duomenis tiesiogiai per REST API. Tai puiku aplikacijoms, kurios gali siųsti logus tiesiogiai:
input {
http {
port => 8080
codec => json
ssl => true
ssl_certificate => "/path/to/cert.pem"
ssl_key => "/path/to/key.pem"
}
}
JDBC input leidžia traukti duomenis iš reliacinių duomenų bazių. Tai naudinga, kai norite indeksuoti duomenų bazės turinį Elasticsearch:
input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://localhost:5432/mydb"
jdbc_user => "postgres"
jdbc_password => "password"
schedule => "*/5 * * * *"
statement => "SELECT * FROM users WHERE updated_at > :sql_last_value"
use_column_value => true
tracking_column => "updated_at"
}
}
Kaip visa tai sujungti į veikiančią sistemą
Teorija yra gera, bet praktika dar geresnė. Sukurkime pilną pipeline pavyzdį, kuris apdoroja web aplikacijos logus, juos praturtina ir siunčia į Elasticsearch.
Tarkime, turite Node.js aplikaciją, kuri logus rašo JSON formatu. Štai kaip galėtų atrodyti visapusiškas pipeline:
input {
file {
path => "/var/log/myapp/*.log"
start_position => "beginning"
sincedb_path => "/dev/null"
codec => json
}
}
filter {
# Pridedame timestamp
date {
match => [ "timestamp", "ISO8601" ]
target => "@timestamp"
}
# Ištraukiame user agent informaciją
if [user_agent] {
useragent {
source => "user_agent"
target => "ua"
}
}
# Pridedame geografinę informaciją
if [client_ip] {
geoip {
source => "client_ip"
target => "geo"
}
}
# Kategorizuojame pagal response kodą
if [response_code] {
if [response_code] >= 500 {
mutate {
add_field => { "severity" => "error" }
}
} else if [response_code] >= 400 {
mutate {
add_field => { "severity" => "warning" }
}
} else {
mutate {
add_field => { "severity" => "info" }
}
}
}
# Šaliname nereikalingus laukus
mutate {
remove_field => ["host", "path"]
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "myapp-logs-%{+YYYY.MM.dd}"
template_name => "myapp-logs"
template_overwrite => true
}
# Klaidų atveju siunčiame į atskirą indeksą
if [severity] == "error" {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "myapp-errors-%{+YYYY.MM.dd}"
}
}
# Development režime išvedame į console
if [environment] == "development" {
stdout {
codec => rubydebug
}
}
}
Šis pipeline atlieka daug darbų: skaito JSON logus, normalizuoja laiko žymas, praturtina duomenis user agent ir geografine informacija, kategorizuoja pagal rimtumą ir išsaugo Elasticsearch su skirtingais indeksais skirtingiems duomenims.
Kad visa tai veiktų production aplinkoje, dar reikia sukonfigūruoti Logstash kaip systemd servisą. Sukurkite failą /etc/systemd/system/logstash.service:
[Unit] Description=Logstash After=network.target [Service] Type=simple User=logstash Group=logstash ExecStart=/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/ Restart=always [Install] WantedBy=multi-user.target
Tada paleiskite:
systemctl enable logstash && systemctl start logstash
Svarbu reguliariai stebėti Logstash veikimą. Sukurkite alert’us Kibana, kurie praneštų, jei pipeline pradeda atsilikti arba kyla klaidų. Taip pat verta nustatyti log rotation, kad logai neužpildytų viso disko.
Ir nepamirškite – Logstash konfigūracija turėtų būti version control sistemoje. Tai leis sekti pakeitimus, grįžti prie ankstesnių versijų ir lengviau bendradarbiauti su komanda. Praktiškai, turėtumėte turėti atskirą git repository su visomis Logstash konfigūracijomis ir deployment scriptais.
Logstash pipeline kūrimas – tai iteratyvus procesas. Pradėkite paprastai, testuokite su realiais duomenimis, stebėkite našumą ir laipsniškai pridėkite daugiau funkcionalumo. Su laiku sukursite efektyvią logų apdorojimo sistemą, kuri ne tik rinks duomenis, bet ir pavers juos vertinga informacija jūsų verslui.
