# [logstash](https://www.elastic.co/jp/logstash) やんごとなき理由により某組織のネットワーク管理を予算ほぼゼロで保守する必要性があり, 数百のネットワーク機器のlogの収集・検索・可視化等を1箇所で行うため,Elastic社の[Elastic Stack](https://www.elastic.co/jp/elastic-stack)のうち[logstash](https://www.elastic.co/jp/logstash)の運用の備忘録を残す. ChatGPTとかがない時代に試行錯誤して構築したので,最適化されていないと思うがデータフローの概念図はこんな感じ. ![](elasticstack.jpg) ## 最小の設定とデバッグ 次のようなlog(test.log)があったとしよう.(最終行は改行ふ含めないとlogstashの読み込みが終わらないので注意) ``` Nov 30 23:59:59 hostname type=TRAFFIC start_time="2025/11/30 23:59:28" action=allow rule=00000000s0099 from_zone=Trust to_zone=Server src=192.168.11.2 dst=192.168.11.1 src_port=55708 dst_port=53 app=dns-base proto=udp srcuser="" dstuser="" nat_src="0.0.0.0" nat_sport=0 nat_dst="0.0.0.0" nat_dport=0 elapsed=0 inbound_if=ae1 outbound_if=ethernet1/7 bytes=620 bytes_sent=69 bytes_received=551 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys="vsys1" session_end_reason="aged-out" Nov 30 23:59:59 hostname type=TRAFFIC start_time="2025/11/30 23:59:28" action=allow rule=00000000s0121 from_zone=Server to_zone=Untrust src=192.168.11.2 dst=192.168.11.1 src_port=11797 dst_port=53 app=dns-base proto=udp srcuser="" dstuser="" nat_src="0.0.0.0" nat_sport=0 nat_dst="0.0.0.0" nat_dport=0 elapsed=0 inbound_if=ethernet1/7 outbound_if=ethernet1/1 bytes=258 bytes_sent=80 bytes_received=178 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys="vsys1" session_end_reason="aged-out" Nov 30 23:59:59 hostname type=TRAFFIC start_time="2025/11/30 23:55:43" action=allow rule=00000000s0050 from_zone=Trust to_zone=WAN-Musen src=192.168.11.3 dst=3.1.2.4 src_port=61284 dst_port=443 app=ssl proto=tcp srcuser="" dstuser="" nat_src="0.0.0.0" nat_sport=0 nat_dst="0.0.0.0" nat_dport=0 elapsed=240 inbound_if=ae1 outbound_if=ethernet1/2 bytes=17834 bytes_sent=10454 bytes_received=7380 packets=40 pkts_sent=21 pkts_received=19 category=web-advertisements flags=0x2041c subtype=end vsys="vsys1" session_end_reason="tcp-fin" Nov 30 23:59:59 hostname type=TRAFFIC start_time="2025/11/30 23:59:13" action=allow rule=00000000s0050 from_zone=Trust to_zone=WAN-Musen src=192.168.11.10 dst=192.168.11.5 src_port=55101 dst_port=443 app=ssl proto=tcp srcuser="" dstuser="" nat_src="0.0.0.0" nat_sport=0 nat_dst="0.0.0.0" nat_dport=0 elapsed=30 inbound_if=ae1 outbound_if=ethernet1/2 bytes=6703 bytes_sent=2051 bytes_received=4652 packets=24 pkts_sent=12 pkts_received=12 category=business-and-economy flags=0x2041c subtype=end vsys="vsys1" session_end_reason="tcp-rst-from-server" Dec 1 00:00:00 hostname type=TRAFFIC start_time="2025/11/30 23:58:28" action=allow rule=00000000s0050 from_zone=Trust to_zone=WAN-Musen src=192.168.11.35 dst=192.168.11.13 src_port=52045 dst_port=443 app=icloud-base proto=tcp srcuser="" dstuser="" nat_src="0.0.0.0" nat_sport=0 nat_dst="0.0.0.0" nat_dport=0 elapsed=90 inbound_if=ae1 outbound_if=ethernet1/2 bytes=735 bytes_sent=661 bytes_received=74 packets=3 pkts_sent=2 pkts_received=1 category=proxy-avoidance-and-anonymizers flags=0x2040d subtype=end vsys="vsys1" session_end_reason="threat" ``` とりあえずはlogを標準入力で受け取り,何もせずにlogstashに食わせて標準出力に出力するtest.confを適当な(ここではホーム)ディレクトリに作成する ``` input { stdin{} } filter { # 今回は整形しないため、filterブロックは空(または記述なし)にします } output { # 標準出力に出力 stdout { codec => rubydebug } } ``` テストで動かす場合はルート権限では動かないようなので,ユーザー権限で以下のようにしてテストする. ここで`test.conf`のpathは絶対パスで指定する必要がある. ``` $ cat test.log | sudo -u logstash /usr/share/logstash/bin/logstash -f /home/hanada/test.conf --path.settings /etc/logstash --path.logs /tmp ``` すると ``` { "event" => { "original" => "Nov 30 23:59:59 hostname type=TRAFFIC start_time=\"2025/11/30 23:59:28\" action=allow rule=00000000s0099 from_zone=Trust to_zone=Server src=192.168.11.2 dst=192.168.11.1 src_port=55708 dst_port=53 app=dns-base proto=udp srcuser=\"\" dstuser=\"\" nat_src=\"0.0.0.0\" nat_sport=0 nat_dst=\"0.0.0.0\" nat_dport=0 elapsed=0 inbound_if=ae1 outbound_if=ethernet1/7 bytes=620 bytes_sent=69 bytes_received=551 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys=\"vsys1\" session_end_reason=\"aged-out\"" }, "@timestamp" => 2026-01-04T21:02:38.933147039Z, "host" => { "hostname" => "zzzzz" }, "message" => "Nov 30 23:59:59 hostname type=TRAFFIC start_time=\"2025/11/30 23:59:28\" action=allow rule=00000000s0099 from_zone=Trust to_zone=Server src=192.168.11.2 dst=192.168.11.1 src_port=55708 dst_port=53 app=dns-base proto=udp srcuser=\"\" dstuser=\"\" nat_src=\"0.0.0.0\" nat_sport=0 nat_dst=\"0.0.0.0\" nat_dport=0 elapsed=0 inbound_if=ae1 outbound_if=ethernet1/7 bytes=620 bytes_sent=69 bytes_received=551 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys=\"vsys1\" session_end_reason=\"aged-out\"", "@version" => "1" } ...(省略) ``` のように`@timestamp`, `host`のタグが自動的に付与され,流し込んだlogは`message`のtagに代入される. ログの収集だけであればこれでも良いのだが,`test.log`はfieldがスペースで区切られ,Keyに`=`でValueが代入されており構造化されているのでkv filterで ``` filter { # 1. grokで「日付・ホスト名」と「それ以降(kv_payload)」に分割 grok { match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:log_source} %{GREEDYDATA:kv_payload}" } } # 2. 切り出した kv_payload に対してKVフィルタを適用 kv { source => "kv_payload" value_split => "=" field_split => " " lowercase_key => true # フィールド名(Key)をすべて小文字にする # 文字列内のダブルクォーテーションを自動で削除する設定 remove_char_key => "\"" remove_char_value => "\"" } # 3. 不要なフィールドを削除 mutate { remove_field => [ "kv_payload" ] } } ``` とtest.confを編集して ``` $ cat test.log | sudo -u logstash /usr/share/logstash/bin/logstash -f /home/hanada/test.conf --path.settings /etc/logstash --path.logs /tmp ``` を実行すれば以下のように構造化される. ``` "start_time" => "2025/11/30 23:59:28", "src_port" => "55708", "src" => "192.168.11.2", "event" => { "original" => "Nov 30 23:59:59 hostname type=TRAFFIC start_time=\"2025/11/30 23:59:28\" action=allow rule=00000000s0099 from_zone=Trust to_zone=Server src=192.168.11.2 dst=192.168.11.1 src_port=55708 dst_port=53 app=dns-base proto=udp srcuser=\"\" dstuser=\"\" nat_src=\"0.0.0.0\" nat_sport=0 nat_dst=\"0.0.0.0\" nat_dport=0 elapsed=0 inbound_if=ae1 outbound_if=ethernet1/7 bytes=620 bytes_sent=69 bytes_received=551 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys=\"vsys1\" session_end_reason=\"aged-out\"" }, "inbound_if" => "ae1", "message" => "Nov 30 23:59:59 hostname type=TRAFFIC start_time=\"2025/11/30 23:59:28\" action=allow rule=00000000s0099 from_zone=Trust to_zone=Server src=192.168.11.2 dst=192.168.11.1 src_port=55708 dst_port=53 app=dns-base proto=udp srcuser=\"\" dstuser=\"\" nat_src=\"0.0.0.0\" nat_sport=0 nat_dst=\"0.0.0.0\" nat_dport=0 elapsed=0 inbound_if=ae1 outbound_if=ethernet1/7 bytes=620 bytes_sent=69 bytes_received=551 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys=\"vsys1\" session_end_reason=\"aged-out\"", "bytes_sent" => "69", "category" => "any", "nat_src" => "0.0.0.0", "from_zone" => "Trust", "dst_port" => "53", "proto" => "udp", "pkts_received" => "1", "subtype" => "end", "bytes_received" => "551", "nat_dport" => "0", "@version" => "1", "timestamp" => "Nov 30 23:59:59", "log_source" => "hostname", "vsys" => "vsys1", "session_end_reason" => "aged-out", "to_zone" => "Server", "dst" => "192.168.11.1", "nat_sport" => "0", "@timestamp" => 2026-01-04T21:26:59.482727568Z, "bytes" => "620", "nat_dst" => "0.0.0.0", "outbound_if" => "ethernet1/7", "app" => "dns-base", "action" => "allow", "host" => { "hostname" => "zzz" }, "elapsed" => "0", "packets" => "2", "pkts_sent" => "1", "type" => "TRAFFIC", "rule" => "00000000s0099", "flags" => "0x19" } ``` を実行すればフィルターが期待通りに動いているか確認できる.タグの牽引に失敗した場合は`[0] "_grokparsefailure"` がタグフィールドに代入されるのでそれを確認すれば良いだろう. フィルターのデバッグが終わったら設定ファイルは`/etc/logstash/conf.d/`に配置する. 私の環境ではlogの入力は`rsyslog`から転送しているのでrsyslogの転送ポートをinputに設定し,outputをelasticsearchに設定する. ``` input { udp { port => 6644 tags => ["testlog"] } } filter {省略} output { elasticsearch { hosts => ["http://localhost:9200"] data_stream => true data_stream_type => "logs" # data_streamは必須 data_stream_dataset => "testlog" user => "????" password => "????" } } ``` data_streamの説明は[elasticsearch DataStream](ES-Datastream)を参照 受信したlogがelasticsearchにindexが登録されているかどうかの確認は ``` curl -u GET "localhost:9200/_cat/indices?v&s=index" ``` で確認可能. ## 複数のconfファイルはpipelineに logstashの設定ファイルを`/etc/logstash/conf.d/`に置いたのちに`/etc/logstash/pipelines.yml`に設定ファイルのパスを記載すれば適切に動く. ``` root@mittag-leffler:/etc/logstash/conf.d# cat /etc/logstash/pipelines.yml # This file is where you define your pipelines. You can define multiple. # For more information on multiple pipelines, see the documentation: # https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html #- pipeline.id: main # path.config: "/etc/logstash/conf.d/*.conf" - pipeline.id: Alog path.config: "/etc/logstash/conf.d/a-log.conf" - pipeline.id: Blog path.config: "/etc/logstash/conf.d/b-log.conf" ``` ## 条件分岐 ファイヤーウォールのlogのように単位のアプリケーションのlogはシステマティックに生成されているのでkvフィルターで構造化しやすい.しかしながsyslogのように複数あアプリーケーションのlogは基準化されていないためcase by caseで処理するしかない(もしかするとbeatsを使うともっと簡単なのしれない). ここで泥臭くif文で処理すると次のようになる. ``` filter { mutate { remove_field => ["host"] } mutate { gsub => [ "message", "\s+", " " ] } if "nginx:" in [message] { dissect { mapping => { "message" => "%{ts} %{+ts} %{+ts} %{+ts} %{src} %{prog}: %{msg}" } } } else if "KERNEL" in [message] { dissect { mapping => { "message" => "%{ts} %{+ts} %{+ts} %{+ts} %{ip} %{prog}(%{src}@%{ip}): %{msg}" } } } if "" in [message] or "[error]" in [message] { mutate { add_field => {"alert" => "error"} } } else if "" in [message] or "[warn]" in [message] { mutate { remove_field => ["alert"] } mutate { add_field => {"alert" => "warn"} } } else { mutate { add_field => {"alert" => "info" } } } } ``` ## Field valueの型の変換 上のlogstashの設定だとfield valueの型がkeywardになっているので,ipやportなどはip,integerなどに変換する ここでは例えば`/etc/logstash/conf.d/index_template.json`に次のようにjsonを書く. ``` { "index_patterns": ["somelog*"], "template": { "settings": { "number_of_shards": 1 }, "mappings": { "properties": { "ip": { "type": "ip" }, "user": { "type": "keyword"}, "MAC": { "type": "keyword" }, "VLAN": { "type": "integer" } } } } } ``` temprateの投稿は ``` curl -X PUT "http://localhost:9200/_index_template/index_template" -H "Content-Type: application/json" -u -d @./index_temprate.json ``` 既存のindexがある場合は変換されないので注意. integer型の場合桁数の長い数字は 1111111->11,111,11 のようになるのでuser id などはkeyword型にした方が良いだろう. ```{toctree} :maxdepth: 1 another_page