logstash#

やんごとなき理由により某組織のネットワーク管理を予算ほぼゼロで保守する必要性があり, 数百のネットワーク機器のlogの収集・検索・可視化等を1箇所で行うため,Elastic社のElastic Stackのうちlogstashの運用の備忘録を残す. ChatGPTとかがない時代に試行錯誤して構築したので,最適化されていないと思うがデータフローの概念図はこんな感じ.

最小の設定とデバッグ#

次のようなlog(test.log)があったとしよう.(最終行は改行ふ含めないとlogstashの読み込みが終わらないので注意)

Nov 30 23:59:59 hostname type=TRAFFIC start_time="2025/11/30 23:59:28" action=allow rule=00000000s0099 from_zone=Trust to_zone=Server src=192.168.11.2 dst=192.168.11.1 src_port=55708 dst_port=53 app=dns-base proto=udp srcuser="" dstuser="" nat_src="0.0.0.0" nat_sport=0 nat_dst="0.0.0.0" nat_dport=0 elapsed=0 inbound_if=ae1 outbound_if=ethernet1/7 bytes=620 bytes_sent=69 bytes_received=551 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys="vsys1" session_end_reason="aged-out"
Nov 30 23:59:59 hostname type=TRAFFIC start_time="2025/11/30 23:59:28" action=allow rule=00000000s0121 from_zone=Server to_zone=Untrust src=192.168.11.2 dst=192.168.11.1 src_port=11797 dst_port=53 app=dns-base proto=udp srcuser="" dstuser="" nat_src="0.0.0.0" nat_sport=0 nat_dst="0.0.0.0" nat_dport=0 elapsed=0 inbound_if=ethernet1/7 outbound_if=ethernet1/1 bytes=258 bytes_sent=80 bytes_received=178 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys="vsys1" session_end_reason="aged-out"
Nov 30 23:59:59 hostname type=TRAFFIC start_time="2025/11/30 23:55:43" action=allow rule=00000000s0050 from_zone=Trust to_zone=WAN-Musen src=192.168.11.3 dst=3.1.2.4 src_port=61284 dst_port=443 app=ssl proto=tcp srcuser="" dstuser="" nat_src="0.0.0.0" nat_sport=0 nat_dst="0.0.0.0" nat_dport=0 elapsed=240 inbound_if=ae1 outbound_if=ethernet1/2 bytes=17834 bytes_sent=10454 bytes_received=7380 packets=40 pkts_sent=21 pkts_received=19 category=web-advertisements flags=0x2041c subtype=end vsys="vsys1" session_end_reason="tcp-fin"
Nov 30 23:59:59 hostname type=TRAFFIC start_time="2025/11/30 23:59:13" action=allow rule=00000000s0050 from_zone=Trust to_zone=WAN-Musen src=192.168.11.10 dst=192.168.11.5 src_port=55101 dst_port=443 app=ssl proto=tcp srcuser="" dstuser="" nat_src="0.0.0.0" nat_sport=0 nat_dst="0.0.0.0" nat_dport=0 elapsed=30 inbound_if=ae1 outbound_if=ethernet1/2 bytes=6703 bytes_sent=2051 bytes_received=4652 packets=24 pkts_sent=12 pkts_received=12 category=business-and-economy flags=0x2041c subtype=end vsys="vsys1" session_end_reason="tcp-rst-from-server"
Dec  1 00:00:00 hostname type=TRAFFIC start_time="2025/11/30 23:58:28" action=allow rule=00000000s0050 from_zone=Trust to_zone=WAN-Musen src=192.168.11.35 dst=192.168.11.13 src_port=52045 dst_port=443 app=icloud-base proto=tcp srcuser="" dstuser="" nat_src="0.0.0.0" nat_sport=0 nat_dst="0.0.0.0" nat_dport=0 elapsed=90 inbound_if=ae1 outbound_if=ethernet1/2 bytes=735 bytes_sent=661 bytes_received=74 packets=3 pkts_sent=2 pkts_received=1 category=proxy-avoidance-and-anonymizers flags=0x2040d subtype=end vsys="vsys1" session_end_reason="threat"

とりあえずはlogを標準入力で受け取り,何もせずにlogstashに食わせて標準出力に出力するtest.confを適当な(ここではホーム)ディレクトリに作成する

input {
  stdin{}
}

filter {
  # 今回は整形しないため、filterブロックは空(または記述なし)にします
}

output {
  # 標準出力に出力
  stdout {
    codec => rubydebug
  }
}

テストで動かす場合はルート権限では動かないようなので,ユーザー権限で以下のようにしてテストする. ここでtest.confのpathは絶対パスで指定する必要がある.

$ cat test.log | sudo -u logstash /usr/share/logstash/bin/logstash -f /home/hanada/test.conf --path.settings /etc/logstash --path.logs /tmp

すると

{
        "event" => {
        "original" => "Nov 30 23:59:59 hostname type=TRAFFIC start_time=\"2025/11/30 23:59:28\" action=allow rule=00000000s0099 from_zone=Trust to_zone=Server src=192.168.11.2 dst=192.168.11.1 src_port=55708 dst_port=53 app=dns-base proto=udp srcuser=\"\" dstuser=\"\" nat_src=\"0.0.0.0\" nat_sport=0 nat_dst=\"0.0.0.0\" nat_dport=0 elapsed=0 inbound_if=ae1 outbound_if=ethernet1/7 bytes=620 bytes_sent=69 bytes_received=551 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys=\"vsys1\" session_end_reason=\"aged-out\""
    },
    "@timestamp" => 2026-01-04T21:02:38.933147039Z,
          "host" => {
        "hostname" => "zzzzz"
    },
       "message" => "Nov 30 23:59:59 hostname type=TRAFFIC start_time=\"2025/11/30 23:59:28\" action=allow rule=00000000s0099 from_zone=Trust to_zone=Server src=192.168.11.2 dst=192.168.11.1 src_port=55708 dst_port=53 app=dns-base proto=udp srcuser=\"\" dstuser=\"\" nat_src=\"0.0.0.0\" nat_sport=0 nat_dst=\"0.0.0.0\" nat_dport=0 elapsed=0 inbound_if=ae1 outbound_if=ethernet1/7 bytes=620 bytes_sent=69 bytes_received=551 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys=\"vsys1\" session_end_reason=\"aged-out\"",
      "@version" => "1"
}
...(省略)

のように@timestamp, hostのタグが自動的に付与され,流し込んだlogはmessageのtagに代入される. ログの収集だけであればこれでも良いのだが,test.logはfieldがスペースで区切られ,Keyに=でValueが代入されており構造化されているのでkv filterで

filter {
  # 1. grokで「日付・ホスト名」と「それ以降(kv_payload)」に分割
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:log_source} %{GREEDYDATA:kv_payload}" }
  }

  # 2. 切り出した kv_payload に対してKVフィルタを適用
  kv {
    source => "kv_payload"
    value_split => "="
    field_split => " "
    lowercase_key => true   # フィールド名(Key)をすべて小文字にする    
    # 文字列内のダブルクォーテーションを自動で削除する設定
    remove_char_key => "\""
    remove_char_value => "\""
  }

# 3. 不要なフィールドを削除
mutate {
    remove_field => [ "kv_payload" ]
  }

}

とtest.confを編集して

$ cat test.log | sudo -u logstash /usr/share/logstash/bin/logstash -f /home/hanada/test.conf --path.settings /etc/logstash --path.logs /tmp

を実行すれば以下のように構造化される.

            "start_time" => "2025/11/30 23:59:28",
              "src_port" => "55708",
                   "src" => "192.168.11.2",
                 "event" => {
        "original" => "Nov 30 23:59:59 hostname type=TRAFFIC start_time=\"2025/11/30 23:59:28\" action=allow rule=00000000s0099 from_zone=Trust to_zone=Server src=192.168.11.2 dst=192.168.11.1 src_port=55708 dst_port=53 app=dns-base proto=udp srcuser=\"\" dstuser=\"\" nat_src=\"0.0.0.0\" nat_sport=0 nat_dst=\"0.0.0.0\" nat_dport=0 elapsed=0 inbound_if=ae1 outbound_if=ethernet1/7 bytes=620 bytes_sent=69 bytes_received=551 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys=\"vsys1\" session_end_reason=\"aged-out\""
    },
            "inbound_if" => "ae1",
               "message" => "Nov 30 23:59:59 hostname type=TRAFFIC start_time=\"2025/11/30 23:59:28\" action=allow rule=00000000s0099 from_zone=Trust to_zone=Server src=192.168.11.2 dst=192.168.11.1 src_port=55708 dst_port=53 app=dns-base proto=udp srcuser=\"\" dstuser=\"\" nat_src=\"0.0.0.0\" nat_sport=0 nat_dst=\"0.0.0.0\" nat_dport=0 elapsed=0 inbound_if=ae1 outbound_if=ethernet1/7 bytes=620 bytes_sent=69 bytes_received=551 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys=\"vsys1\" session_end_reason=\"aged-out\"",
            "bytes_sent" => "69",
              "category" => "any",
               "nat_src" => "0.0.0.0",
             "from_zone" => "Trust",
              "dst_port" => "53",
                 "proto" => "udp",
         "pkts_received" => "1",
               "subtype" => "end",
        "bytes_received" => "551",
             "nat_dport" => "0",
              "@version" => "1",
             "timestamp" => "Nov 30 23:59:59",
            "log_source" => "hostname",
                  "vsys" => "vsys1",
    "session_end_reason" => "aged-out",
               "to_zone" => "Server",
                   "dst" => "192.168.11.1",
             "nat_sport" => "0",
            "@timestamp" => 2026-01-04T21:26:59.482727568Z,
                 "bytes" => "620",
               "nat_dst" => "0.0.0.0",
           "outbound_if" => "ethernet1/7",
                   "app" => "dns-base",
                "action" => "allow",
                  "host" => {
        "hostname" => "zzz"
    },
               "elapsed" => "0",
               "packets" => "2",
             "pkts_sent" => "1",
                  "type" => "TRAFFIC",
                  "rule" => "00000000s0099",
                 "flags" => "0x19"
}

を実行すればフィルターが期待通りに動いているか確認できる.タグの牽引に失敗した場合は[0] "_grokparsefailure" がタグフィールドに代入されるのでそれを確認すれば良いだろう.

フィルターのデバッグが終わったら設定ファイルは/etc/logstash/conf.d/に配置する. 私の環境ではlogの入力はrsyslogから転送しているのでrsyslogの転送ポートをinputに設定し,outputをelasticsearchに設定する.

input {
   udp {
   port => 6644
   tags => ["testlog"]
   }
}
filter {省略}

output {
   elasticsearch {
     hosts => ["http://localhost:9200"]
     data_stream => true
     data_stream_type => "logs" # data_streamは必須
     data_stream_dataset => "testlog"
     user => "????"
     password => "????"
   }
}

data_streamの説明はelasticsearch DataStreamを参照

受信したlogがelasticsearchにindexが登録されているかどうかの確認は

 curl -u <elastic-adm-name> GET "localhost:9200/_cat/indices?v&s=index"

で確認可能.

複数のconfファイルはpipelineに#

logstashの設定ファイルを/etc/logstash/conf.d/に置いたのちに/etc/logstash/pipelines.ymlに設定ファイルのパスを記載すれば適切に動く.

root@mittag-leffler:/etc/logstash/conf.d# cat /etc/logstash/pipelines.yml
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
#   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html

#- pipeline.id: main
#  path.config: "/etc/logstash/conf.d/*.conf"

- pipeline.id: Alog
  path.config: "/etc/logstash/conf.d/a-log.conf"
- pipeline.id: Blog
  path.config: "/etc/logstash/conf.d/b-log.conf"

条件分岐#

ファイヤーウォールのlogのように単位のアプリケーションのlogはシステマティックに生成されているのでkvフィルターで構造化しやすい.しかしながsyslogのように複数あアプリーケーションのlogは基準化されていないためcase by caseで処理するしかない(もしかするとbeatsを使うともっと簡単なのしれない). ここで泥臭くif文で処理すると次のようになる.


filter {
  mutate { remove_field => ["host"] }
  mutate { gsub => [ "message", "\s+", " " ] }

  if "nginx:" in [message] {
    dissect {
      mapping => { "message" => "%{ts} %{+ts} %{+ts} %{+ts} %{src} %{prog}: %{msg}" }
    }
  } else if "KERNEL" in [message] {
    dissect {
      mapping => { "message" => "%{ts} %{+ts} %{+ts} %{+ts} %{ip} %{prog}(%{src}@%{ip}): %{msg}" }
    }
  } 
  
  if "<ERRS>" in [message] or "[error]" in [message] {
    mutate { add_field => {"alert" => "error"} }
  }
  else if "<WARN>" in [message] or "[warn]" in [message] {
    mutate { remove_field => ["alert"] }
    mutate { add_field => {"alert" => "warn"} }
  }
  else {
    mutate { add_field => {"alert" => "info" } }
  }
}

Field valueの型の変換#

上のlogstashの設定だとfield valueの型がkeywardになっているので,ipやportなどはip,integerなどに変換する ここでは例えば/etc/logstash/conf.d/index_template.jsonに次のようにjsonを書く.

{
  "index_patterns": ["somelog*"],
  "template": {
    "settings": {
      "number_of_shards": 1
    },
    "mappings": {
      "properties": {
          "ip": { "type": "ip" },
          "user": { "type": "keyword"},
          "MAC": { "type": "keyword" },
          "VLAN": { "type": "integer" }
      }
    }
  }
}

temprateの投稿は

curl -X PUT "http://localhost:9200/_index_template/index_template" -H "Content-Type: application/json" -u <elastic-adm> -d @./index_temprate.json

既存のindexがある場合は変換されないので注意.

integer型の場合桁数の長い数字は 1111111->11,111,11 のようになるのでuser id などはkeyword型にした方が良いだろう.