Elasticsearch Logstash pipeline: logų apdorojimas

Kas yra Logstash ir kodėl jis svarbus

Jei kada nors bandėte susidoroti su tūkstančiais ar net milijonais logų įrašų, žinote, kad tai gali tapti tikru košmaru. Logstash – tai vienas iš trijų pagrindinių ELK (Elasticsearch, Logstash, Kibana) steką sudarančių komponentų, kuris atlieka vieną iš svarbiausių darbų – logų surinkimą, transformavimą ir persiuntimą.

Paprasčiau tariant, Logstash veikia kaip duomenų konvejeris. Jis paima duomenis iš vieno ar kelių šaltinių, juos apdoroja pagal jūsų nurodytas taisykles ir išsiunčia į paskirties vietą – dažniausiai Elasticsearch. Tai gali būti serverių logai, aplikacijų klaidos, metrika ar bet kokie kiti struktūrizuoti ar nestruktūrizuoti duomenys.

Logstash ypatumas tas, kad jis dirba su pipeline (konvejerio) principu. Kiekvienas pipeline susideda iš trijų pagrindinių dalių: input (įvestis), filter (filtras) ir output (išvestis). Šis modulinis dizainas leidžia lankščiai konfigūruoti duomenų srautus ir pritaikyti juos konkretiems poreikiams.

Pipeline struktūra ir pagrindiniai komponentai

Logstash pipeline konfigūracija paprastai rašoma .conf faile ir naudoja savo sintaksę, kuri primena JSON, bet yra šiek tiek lankstesnė. Štai kaip atrodo paprasčiausias pipeline pavyzdys:

input {
  file {
    path => "/var/log/apache/access.log"
    start_position => "beginning"
  }
}

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "apache-logs-%{+YYYY.MM.dd}"
  }
}

Input sekcija apibrėžia, iš kur Logstash gauna duomenis. Tai gali būti failai, TCP/UDP jungtys, Kafka, Redis, HTTP endpoint’ai ar net AWS S3 bucket’ai. Viename pipeline galite turėti kelis input šaltinius – Logstash juos visus apdoros lygiagrečiai.

Filter sekcija – čia vyksta tikroji magija. Filtrai leidžia transformuoti, praturtinti ir struktūrizuoti duomenis. Galite išgauti konkrečius laukus iš nestruktūrizuotų tekstų, pridėti geografinę informaciją pagal IP adresus, konvertuoti datos formatus ar net atmesti nereikalingus įrašus.

Output sekcija nustato, kur keliauja apdoroti duomenys. Nors dažniausiai tai būna Elasticsearch, galite siųsti duomenis į kelias vietas vienu metu – pavyzdžiui, ir į Elasticsearch, ir į failą atsarginei kopijai.

Grok šablonai: nestruktūrizuotų logų valdymas

Vienas iš galingiausių Logstash įrankių yra grok filtras. Jei turite nestruktūrizuotus tekstinius logus, grok padės juos paversti struktūrizuotais duomenimis, kuriuos galėsite analizuoti.

Grok naudoja reguliariąsias išraiškas, bet su iš anksto apibrėžtais šablonais, kurie turi suprantamus pavadinimus. Pavyzdžiui, vietoj sudėtingos regex, galite naudoti %{IP:client_ip}, kad ištrauktumėte IP adresą ir jį įrašytumėte į lauką pavadinimu „client_ip”.

Štai realus pavyzdys, kaip apdoroti Nginx logą:

filter {
  grok {
    match => {
      "message" => '%{IPORHOST:client_ip} - %{USER:ident} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:response_code} %{NUMBER:bytes}'
    }
  }
  
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "@timestamp"
  }
}

Šis filtras paims tokį logą:
192.168.1.1 - - [10/Jan/2024:13:55:36 +0000] "GET /api/users HTTP/1.1" 200 1234

Ir pavers jį struktūrizuotu objektu su atskirais laukais: client_ip, timestamp, method, request, response_code ir t.t.

Patarimas: naudokite Grok Debugger įrankį (jis integruotas į Kibana Dev Tools) testavimui. Tai sutaupo daug laiko, nes galite iš karto matyti, ar jūsų šablonas veikia teisingai.

Duomenų praturtinimas ir transformacijos

Kartais surinkti logai yra tik pusė istorijos. Logstash leidžia praturtinti duomenis papildoma informacija, kuri padeda geriau suprasti kontekstą.

GeoIP filtras yra puikus pavyzdys. Jis paima IP adresą ir prideda geografinę informaciją – šalį, miestą, koordinates:

filter {
  geoip {
    source => "client_ip"
    target => "geoip"
    fields => ["city_name", "country_name", "location"]
  }
}

Po šio filtro pritaikymo galėsite Kibana sukurti žemėlapius, rodančius, iš kur ateina jūsų vartotojai ar iš kur vyksta atakos.

Mutate filtras leidžia atlikti įvairias transformacijas: konvertuoti duomenų tipus, pervadinti laukus, pašalinti nereikalingus laukus ar sujungti kelis laukus į vieną:

filter {
  mutate {
    convert => {
      "response_code" => "integer"
      "bytes" => "integer"
    }
    remove_field => ["message", "host"]
    add_field => {
      "environment" => "production"
    }
  }
}

Conditional processing leidžia taikyti filtrus tik tam tikriems įrašams. Tai labai naudinga, kai turite skirtingų tipų logus viename pipeline:

filter {
  if [type] == "apache" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  } else if [type] == "nginx" {
    grok {
      match => { "message" => "%{NGINXACCESS}" }
    }
  }
}

Našumo optimizavimas ir geriausia praktika

Kai pradėsite apdoroti didelius duomenų kiekius, greitai pastebėsite, kad Logstash gali tapti butelio kakleliu. Štai keletas būdų, kaip optimizuoti našumą.

Pipeline workers – tai gijų skaičius, kurios apdoroja duomenis. Pagal nutylėjimą Logstash naudoja tiek worker’ių, kiek yra CPU branduolių. Galite tai keisti logstash.yml faile:

pipeline.workers: 8
pipeline.batch.size: 250
pipeline.batch.delay: 50

Batch size nustato, kiek įvykių apdorojama vienu metu. Didesnis skaičius reiškia geresnį našumą, bet ir didesnį atminties naudojimą. Reikia rasti balansą.

Persistent queues – tai funkcija, kuri išsaugo duomenis diske prieš juos apdorojant. Jei Logstash užstringa ar nutrūksta, duomenys nebus prarasti:

queue.type: persisted
queue.max_bytes: 4gb

Multiple pipelines – vietoj vieno didelio pipeline, galite sukurti kelis mažesnius, kiekvienas su savo konfigūracija ir resursais. Tai apibrėžiama pipelines.yml faile:

- pipeline.id: apache-logs
  path.config: "/etc/logstash/conf.d/apache.conf"
  pipeline.workers: 4

- pipeline.id: nginx-logs
  path.config: "/etc/logstash/conf.d/nginx.conf"
  pipeline.workers: 2

Dar vienas svarbus dalykas – monitoring. Logstash turi integruotą monitoring API, kurį galite pasiekti per HTTP:

curl -XGET 'localhost:9600/_node/stats/pipelines?pretty'

Tai parodys, kiek įvykių apdorota, kiek laukia eilėje, kokia filtro veikimo trukmė ir kita naudinga informacija.

Klaidų valdymas ir debugging

Kai kuriate sudėtingus pipeline, neišvengsite klaidų. Logstash turi keletą mechanizmų, kurie padeda jas aptikti ir išspręsti.

Dead letter queue (DLQ) – tai vieta, kur patenka įvykiai, kurių nepavyko apdoroti. Vietoj to, kad jie būtų tiesiog prarasti, galite juos peržiūrėti ir suprasti, kas nutiko:

dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1024mb

Įvykius iš DLQ galite perskaityti ir pakartotinai apdoroti naudodami specialų input plugin:

input {
  dead_letter_queue {
    path => "/var/lib/logstash/dead_letter_queue"
    commit_offsets => true
  }
}

Stdout output yra paprasčiausias debugging įrankis. Pridėkite jį prie savo pipeline, kad matytumėte, kokie duomenys išeina po filtrų:

output {
  stdout {
    codec => rubydebug
  }
}

Log level keitimas padeda gauti daugiau informacijos apie tai, kas vyksta Logstash viduje:

bin/logstash --log.level debug -f config/pipeline.conf

Praktinis patarimas: testuodami naują konfigūraciją, pradėkite nuo mažo duomenų rinkinio. Naudokite file input su konkrečiu failu, o ne realų duomenų srautą. Tai leis greitai iteruoti ir pataisyti klaidas.

Integracijos su išoriniais šaltiniais

Logstash tikroji galia atsiskleidžia, kai integruojate jį su įvairiomis sistemomis. Štai keletas populiarių scenarijų.

Kafka integracija yra labai paplitusi didelėse sistemose. Kafka veikia kaip buferis tarp duomenų šaltinių ir Logstash:

input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topics => ["application-logs"]
    group_id => "logstash-consumer"
    codec => json
  }
}

Redis taip pat dažnai naudojamas kaip tarpinė eilė. Tai ypač naudinga, kai turite kelis Logstash instance’us ir norite paskirstyti apkrovą:

input {
  redis {
    host => "redis.example.com"
    port => 6379
    data_type => "list"
    key => "logstash"
  }
}

HTTP input leidžia priimti duomenis tiesiogiai per REST API. Tai puiku aplikacijoms, kurios gali siųsti logus tiesiogiai:

input {
  http {
    port => 8080
    codec => json
    ssl => true
    ssl_certificate => "/path/to/cert.pem"
    ssl_key => "/path/to/key.pem"
  }
}

JDBC input leidžia traukti duomenis iš reliacinių duomenų bazių. Tai naudinga, kai norite indeksuoti duomenų bazės turinį Elasticsearch:

input {
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/mydb"
    jdbc_user => "postgres"
    jdbc_password => "password"
    schedule => "*/5 * * * *"
    statement => "SELECT * FROM users WHERE updated_at > :sql_last_value"
    use_column_value => true
    tracking_column => "updated_at"
  }
}

Kaip visa tai sujungti į veikiančią sistemą

Teorija yra gera, bet praktika dar geresnė. Sukurkime pilną pipeline pavyzdį, kuris apdoroja web aplikacijos logus, juos praturtina ir siunčia į Elasticsearch.

Tarkime, turite Node.js aplikaciją, kuri logus rašo JSON formatu. Štai kaip galėtų atrodyti visapusiškas pipeline:

input {
  file {
    path => "/var/log/myapp/*.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => json
  }
}

filter {
  # Pridedame timestamp
  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }
  
  # Ištraukiame user agent informaciją
  if [user_agent] {
    useragent {
      source => "user_agent"
      target => "ua"
    }
  }
  
  # Pridedame geografinę informaciją
  if [client_ip] {
    geoip {
      source => "client_ip"
      target => "geo"
    }
  }
  
  # Kategorizuojame pagal response kodą
  if [response_code] {
    if [response_code] >= 500 {
      mutate {
        add_field => { "severity" => "error" }
      }
    } else if [response_code] >= 400 {
      mutate {
        add_field => { "severity" => "warning" }
      }
    } else {
      mutate {
        add_field => { "severity" => "info" }
      }
    }
  }
  
  # Šaliname nereikalingus laukus
  mutate {
    remove_field => ["host", "path"]
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "myapp-logs-%{+YYYY.MM.dd}"
    template_name => "myapp-logs"
    template_overwrite => true
  }
  
  # Klaidų atveju siunčiame į atskirą indeksą
  if [severity] == "error" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "myapp-errors-%{+YYYY.MM.dd}"
    }
  }
  
  # Development režime išvedame į console
  if [environment] == "development" {
    stdout {
      codec => rubydebug
    }
  }
}

Šis pipeline atlieka daug darbų: skaito JSON logus, normalizuoja laiko žymas, praturtina duomenis user agent ir geografine informacija, kategorizuoja pagal rimtumą ir išsaugo Elasticsearch su skirtingais indeksais skirtingiems duomenims.

Kad visa tai veiktų production aplinkoje, dar reikia sukonfigūruoti Logstash kaip systemd servisą. Sukurkite failą /etc/systemd/system/logstash.service:

[Unit]
Description=Logstash
After=network.target

[Service]
Type=simple
User=logstash
Group=logstash
ExecStart=/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/
Restart=always

[Install]
WantedBy=multi-user.target

Tada paleiskite:
systemctl enable logstash && systemctl start logstash

Svarbu reguliariai stebėti Logstash veikimą. Sukurkite alert’us Kibana, kurie praneštų, jei pipeline pradeda atsilikti arba kyla klaidų. Taip pat verta nustatyti log rotation, kad logai neužpildytų viso disko.

Ir nepamirškite – Logstash konfigūracija turėtų būti version control sistemoje. Tai leis sekti pakeitimus, grįžti prie ankstesnių versijų ir lengviau bendradarbiauti su komanda. Praktiškai, turėtumėte turėti atskirą git repository su visomis Logstash konfigūracijomis ir deployment scriptais.

Logstash pipeline kūrimas – tai iteratyvus procesas. Pradėkite paprastai, testuokite su realiais duomenimis, stebėkite našumą ir laipsniškai pridėkite daugiau funkcionalumo. Su laiku sukursite efektyvią logų apdorojimo sistemą, kuri ne tik rinks duomenis, bet ir pavers juos vertinga informacija jūsų verslui.

Daugiau

Apache Storm realaus laiko skaičiavimai