logstash#
やんごとなき理由により某組織のネットワーク管理を予算ほぼゼロで保守する必要性があり, 数百のネットワーク機器のlogの収集・検索・可視化等を1箇所で行うため,Elastic社のElastic Stackのうちlogstashの運用の備忘録を残す. ChatGPTとかがない時代に試行錯誤して構築したので,最適化されていないと思うがデータフローの概念図はこんな感じ.

最小の設定とデバッグ#
次のようなlog(test.log)があったとしよう.(最終行は改行ふ含めないとlogstashの読み込みが終わらないので注意)
Nov 30 23:59:59 hostname type=TRAFFIC start_time="2025/11/30 23:59:28" action=allow rule=00000000s0099 from_zone=Trust to_zone=Server src=192.168.11.2 dst=192.168.11.1 src_port=55708 dst_port=53 app=dns-base proto=udp srcuser="" dstuser="" nat_src="0.0.0.0" nat_sport=0 nat_dst="0.0.0.0" nat_dport=0 elapsed=0 inbound_if=ae1 outbound_if=ethernet1/7 bytes=620 bytes_sent=69 bytes_received=551 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys="vsys1" session_end_reason="aged-out"
Nov 30 23:59:59 hostname type=TRAFFIC start_time="2025/11/30 23:59:28" action=allow rule=00000000s0121 from_zone=Server to_zone=Untrust src=192.168.11.2 dst=192.168.11.1 src_port=11797 dst_port=53 app=dns-base proto=udp srcuser="" dstuser="" nat_src="0.0.0.0" nat_sport=0 nat_dst="0.0.0.0" nat_dport=0 elapsed=0 inbound_if=ethernet1/7 outbound_if=ethernet1/1 bytes=258 bytes_sent=80 bytes_received=178 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys="vsys1" session_end_reason="aged-out"
Nov 30 23:59:59 hostname type=TRAFFIC start_time="2025/11/30 23:55:43" action=allow rule=00000000s0050 from_zone=Trust to_zone=WAN-Musen src=192.168.11.3 dst=3.1.2.4 src_port=61284 dst_port=443 app=ssl proto=tcp srcuser="" dstuser="" nat_src="0.0.0.0" nat_sport=0 nat_dst="0.0.0.0" nat_dport=0 elapsed=240 inbound_if=ae1 outbound_if=ethernet1/2 bytes=17834 bytes_sent=10454 bytes_received=7380 packets=40 pkts_sent=21 pkts_received=19 category=web-advertisements flags=0x2041c subtype=end vsys="vsys1" session_end_reason="tcp-fin"
Nov 30 23:59:59 hostname type=TRAFFIC start_time="2025/11/30 23:59:13" action=allow rule=00000000s0050 from_zone=Trust to_zone=WAN-Musen src=192.168.11.10 dst=192.168.11.5 src_port=55101 dst_port=443 app=ssl proto=tcp srcuser="" dstuser="" nat_src="0.0.0.0" nat_sport=0 nat_dst="0.0.0.0" nat_dport=0 elapsed=30 inbound_if=ae1 outbound_if=ethernet1/2 bytes=6703 bytes_sent=2051 bytes_received=4652 packets=24 pkts_sent=12 pkts_received=12 category=business-and-economy flags=0x2041c subtype=end vsys="vsys1" session_end_reason="tcp-rst-from-server"
Dec 1 00:00:00 hostname type=TRAFFIC start_time="2025/11/30 23:58:28" action=allow rule=00000000s0050 from_zone=Trust to_zone=WAN-Musen src=192.168.11.35 dst=192.168.11.13 src_port=52045 dst_port=443 app=icloud-base proto=tcp srcuser="" dstuser="" nat_src="0.0.0.0" nat_sport=0 nat_dst="0.0.0.0" nat_dport=0 elapsed=90 inbound_if=ae1 outbound_if=ethernet1/2 bytes=735 bytes_sent=661 bytes_received=74 packets=3 pkts_sent=2 pkts_received=1 category=proxy-avoidance-and-anonymizers flags=0x2040d subtype=end vsys="vsys1" session_end_reason="threat"
とりあえずはlogを標準入力で受け取り,何もせずにlogstashに食わせて標準出力に出力するtest.confを適当な(ここではホーム)ディレクトリに作成する
input {
stdin{}
}
filter {
# 今回は整形しないため、filterブロックは空(または記述なし)にします
}
output {
# 標準出力に出力
stdout {
codec => rubydebug
}
}
テストで動かす場合はルート権限では動かないようなので,ユーザー権限で以下のようにしてテストする.
ここでtest.confのpathは絶対パスで指定する必要がある.
$ cat test.log | sudo -u logstash /usr/share/logstash/bin/logstash -f /home/hanada/test.conf --path.settings /etc/logstash --path.logs /tmp
すると
{
"event" => {
"original" => "Nov 30 23:59:59 hostname type=TRAFFIC start_time=\"2025/11/30 23:59:28\" action=allow rule=00000000s0099 from_zone=Trust to_zone=Server src=192.168.11.2 dst=192.168.11.1 src_port=55708 dst_port=53 app=dns-base proto=udp srcuser=\"\" dstuser=\"\" nat_src=\"0.0.0.0\" nat_sport=0 nat_dst=\"0.0.0.0\" nat_dport=0 elapsed=0 inbound_if=ae1 outbound_if=ethernet1/7 bytes=620 bytes_sent=69 bytes_received=551 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys=\"vsys1\" session_end_reason=\"aged-out\""
},
"@timestamp" => 2026-01-04T21:02:38.933147039Z,
"host" => {
"hostname" => "zzzzz"
},
"message" => "Nov 30 23:59:59 hostname type=TRAFFIC start_time=\"2025/11/30 23:59:28\" action=allow rule=00000000s0099 from_zone=Trust to_zone=Server src=192.168.11.2 dst=192.168.11.1 src_port=55708 dst_port=53 app=dns-base proto=udp srcuser=\"\" dstuser=\"\" nat_src=\"0.0.0.0\" nat_sport=0 nat_dst=\"0.0.0.0\" nat_dport=0 elapsed=0 inbound_if=ae1 outbound_if=ethernet1/7 bytes=620 bytes_sent=69 bytes_received=551 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys=\"vsys1\" session_end_reason=\"aged-out\"",
"@version" => "1"
}
...(省略)
のように@timestamp, hostのタグが自動的に付与され,流し込んだlogはmessageのtagに代入される.
ログの収集だけであればこれでも良いのだが,test.logはfieldがスペースで区切られ,Keyに=でValueが代入されており構造化されているのでkv filterで
filter {
# 1. grokで「日付・ホスト名」と「それ以降(kv_payload)」に分割
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:log_source} %{GREEDYDATA:kv_payload}" }
}
# 2. 切り出した kv_payload に対してKVフィルタを適用
kv {
source => "kv_payload"
value_split => "="
field_split => " "
lowercase_key => true # フィールド名(Key)をすべて小文字にする
# 文字列内のダブルクォーテーションを自動で削除する設定
remove_char_key => "\""
remove_char_value => "\""
}
# 3. 不要なフィールドを削除
mutate {
remove_field => [ "kv_payload" ]
}
}
とtest.confを編集して
$ cat test.log | sudo -u logstash /usr/share/logstash/bin/logstash -f /home/hanada/test.conf --path.settings /etc/logstash --path.logs /tmp
を実行すれば以下のように構造化される.
"start_time" => "2025/11/30 23:59:28",
"src_port" => "55708",
"src" => "192.168.11.2",
"event" => {
"original" => "Nov 30 23:59:59 hostname type=TRAFFIC start_time=\"2025/11/30 23:59:28\" action=allow rule=00000000s0099 from_zone=Trust to_zone=Server src=192.168.11.2 dst=192.168.11.1 src_port=55708 dst_port=53 app=dns-base proto=udp srcuser=\"\" dstuser=\"\" nat_src=\"0.0.0.0\" nat_sport=0 nat_dst=\"0.0.0.0\" nat_dport=0 elapsed=0 inbound_if=ae1 outbound_if=ethernet1/7 bytes=620 bytes_sent=69 bytes_received=551 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys=\"vsys1\" session_end_reason=\"aged-out\""
},
"inbound_if" => "ae1",
"message" => "Nov 30 23:59:59 hostname type=TRAFFIC start_time=\"2025/11/30 23:59:28\" action=allow rule=00000000s0099 from_zone=Trust to_zone=Server src=192.168.11.2 dst=192.168.11.1 src_port=55708 dst_port=53 app=dns-base proto=udp srcuser=\"\" dstuser=\"\" nat_src=\"0.0.0.0\" nat_sport=0 nat_dst=\"0.0.0.0\" nat_dport=0 elapsed=0 inbound_if=ae1 outbound_if=ethernet1/7 bytes=620 bytes_sent=69 bytes_received=551 packets=2 pkts_sent=1 pkts_received=1 category=any flags=0x19 subtype=end vsys=\"vsys1\" session_end_reason=\"aged-out\"",
"bytes_sent" => "69",
"category" => "any",
"nat_src" => "0.0.0.0",
"from_zone" => "Trust",
"dst_port" => "53",
"proto" => "udp",
"pkts_received" => "1",
"subtype" => "end",
"bytes_received" => "551",
"nat_dport" => "0",
"@version" => "1",
"timestamp" => "Nov 30 23:59:59",
"log_source" => "hostname",
"vsys" => "vsys1",
"session_end_reason" => "aged-out",
"to_zone" => "Server",
"dst" => "192.168.11.1",
"nat_sport" => "0",
"@timestamp" => 2026-01-04T21:26:59.482727568Z,
"bytes" => "620",
"nat_dst" => "0.0.0.0",
"outbound_if" => "ethernet1/7",
"app" => "dns-base",
"action" => "allow",
"host" => {
"hostname" => "zzz"
},
"elapsed" => "0",
"packets" => "2",
"pkts_sent" => "1",
"type" => "TRAFFIC",
"rule" => "00000000s0099",
"flags" => "0x19"
}
を実行すればフィルターが期待通りに動いているか確認できる.タグの牽引に失敗した場合は[0] "_grokparsefailure" がタグフィールドに代入されるのでそれを確認すれば良いだろう.
フィルターのデバッグが終わったら設定ファイルは/etc/logstash/conf.d/に配置する.
私の環境ではlogの入力はrsyslogから転送しているのでrsyslogの転送ポートをinputに設定し,outputをelasticsearchに設定する.
input {
udp {
port => 6644
tags => ["testlog"]
}
}
filter {省略}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
data_stream => true
data_stream_type => "logs" # data_streamは必須
data_stream_dataset => "testlog"
user => "????"
password => "????"
}
}
data_streamの説明はelasticsearch DataStreamを参照
受信したlogがelasticsearchにindexが登録されているかどうかの確認は
curl -u <elastic-adm-name> GET "localhost:9200/_cat/indices?v&s=index"
で確認可能.
複数のconfファイルはpipelineに#
logstashの設定ファイルを/etc/logstash/conf.d/に置いたのちに/etc/logstash/pipelines.ymlに設定ファイルのパスを記載すれば適切に動く.
root@mittag-leffler:/etc/logstash/conf.d# cat /etc/logstash/pipelines.yml
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
# https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
#- pipeline.id: main
# path.config: "/etc/logstash/conf.d/*.conf"
- pipeline.id: Alog
path.config: "/etc/logstash/conf.d/a-log.conf"
- pipeline.id: Blog
path.config: "/etc/logstash/conf.d/b-log.conf"
条件分岐#
ファイヤーウォールのlogのように単位のアプリケーションのlogはシステマティックに生成されているのでkvフィルターで構造化しやすい.しかしながsyslogのように複数あアプリーケーションのlogは基準化されていないためcase by caseで処理するしかない(もしかするとbeatsを使うともっと簡単なのしれない). ここで泥臭くif文で処理すると次のようになる.
filter {
mutate { remove_field => ["host"] }
mutate { gsub => [ "message", "\s+", " " ] }
if "nginx:" in [message] {
dissect {
mapping => { "message" => "%{ts} %{+ts} %{+ts} %{+ts} %{src} %{prog}: %{msg}" }
}
} else if "KERNEL" in [message] {
dissect {
mapping => { "message" => "%{ts} %{+ts} %{+ts} %{+ts} %{ip} %{prog}(%{src}@%{ip}): %{msg}" }
}
}
if "<ERRS>" in [message] or "[error]" in [message] {
mutate { add_field => {"alert" => "error"} }
}
else if "<WARN>" in [message] or "[warn]" in [message] {
mutate { remove_field => ["alert"] }
mutate { add_field => {"alert" => "warn"} }
}
else {
mutate { add_field => {"alert" => "info" } }
}
}
Field valueの型の変換#
上のlogstashの設定だとfield valueの型がkeywardになっているので,ipやportなどはip,integerなどに変換する
ここでは例えば/etc/logstash/conf.d/index_template.jsonに次のようにjsonを書く.
{
"index_patterns": ["somelog*"],
"template": {
"settings": {
"number_of_shards": 1
},
"mappings": {
"properties": {
"ip": { "type": "ip" },
"user": { "type": "keyword"},
"MAC": { "type": "keyword" },
"VLAN": { "type": "integer" }
}
}
}
}
temprateの投稿は
curl -X PUT "http://localhost:9200/_index_template/index_template" -H "Content-Type: application/json" -u <elastic-adm> -d @./index_temprate.json
既存のindexがある場合は変換されないので注意.
integer型の場合桁数の長い数字は 1111111->11,111,11 のようになるのでuser id などはkeyword型にした方が良いだろう.