1 ELFK+kafka采集服务日志
- 1.使用filebeat采集nginx和tomcat日志写入kafka集群
- 2.使用Logstash消费kafka数据写入到ES集群;
- 3.kibana出图展示数据;
1.1 采集nginx到kafka
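# Filebeat: ship nginx logs through the bundled nginx module to Kafka.
# The heredoc delimiter is quoted so `${path.config}` reaches the file
# literally — that is a Filebeat variable, and under an unquoted heredoc bash
# tries to expand it and aborts with "bad substitution".
# The nginx module must already be enabled under modules.d (e.g.
# `filebeat modules enable nginx`); that step is not shown on this page.
cat > /etc/filebeat/config/21-nginx-to-kafka.yaml <<'EOF'
filebeat.config.modules:
  path: ${path.config}/modules.d/nginx.yml
  reload.enabled: true

output.kafka:
  hosts:
    - 10.168.10.91:9092
    - 10.168.10.92:9092
    - 10.168.10.93:9092
  topic: jiaoshi05-nginx-to-kafka
EOF

# -e logs to stderr instead of the log file; -c selects this config.
filebeat -e -c /etc/filebeat/config/21-nginx-to-kafka.yaml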
1.2 采集tomcat日志到kafka
2.采集tomcat到kafka
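# Filebeat: tail Tomcat's JSON access log and ship it to Kafka.
# Delimiter quoted (consistent with the other configs) so nothing in the YAML
# is ever expanded by the shell; the ndjson parser with an empty target merges
# the JSON keys into the event root.
cat > /etc/filebeat/config/22-tomcat-to-kafka.yaml <<'EOF'
filebeat.inputs:
  - type: filestream
    paths:
      - /app/tools/apache-tomcat-11.0.6/logs/tomcat.cmy.com_access_log*.json
    parsers:
      - ndjson:
          target: ""
          message_key: message

output.kafka:
  hosts:
    - 10.168.10.91:9092
    - 10.168.10.92:9092
    - 10.168.10.93:9092
  topic: jiaoshi05-tomcat-to-kafka
EOF

# A second Filebeat instance on the same host needs its own data/registry
# directory; otherwise it collides with the lock held by the nginx instance
# under the default path.data.
filebeat -e -c /etc/filebeat/config/22-tomcat-to-kafka.yaml --path.data /opt/tomcat-filebeat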
1.3 创建apikey
4.添加api-key
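# Create an Elasticsearch API key for the Logstash -> ES output.
# The password is deliberately left off the command line: with `-u elastic`
# curl prompts for it, so it does not end up in shell history or `ps` output
# (the original used `-u elastic:123456`).
# `-k` skips TLS verification and only belongs in a lab with self-signed
# certs; trust the cluster CA with `--cacert <path-to-ca.crt>` instead.
# The role is scoped down from cluster "all" / index "all" to what the
# Logstash elasticsearch output needs (index creation and writes, template and
# ILM management, cluster monitor). Re-check against the Logstash version in
# use if the pipeline fails on an authorization error.
curl -X POST "https://10.168.10.91:9200/_security/api_key" \
  -H "Content-Type: application/json" \
  -u elastic \
  -k \
  -d '{
    "name": "cmy",
    "role_descriptors": {
      "filebeat_monitoring": {
        "cluster": ["monitor", "manage_index_templates", "manage_ilm"],
        "index": [
          {
            "names": ["index-kafka-to-es*"],
            "privileges": ["write", "create_index", "view_index_metadata", "manage"]
          }
        ]
      }
    }
  }'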
返回数据
{"id":"6-fms5YBwRrMyjf4FPMB","name":"cmy","api_key":"BoplIEYaQBCQ-BR_md80JQ","encoded":"Ni1mbXM1WUJ3UnJNeWpmNEZQTUI6Qm9wbElFWWFRQkNRLUJSX21kODBKUQ=="}
解码数据(`encoded` 是 `id:api_key` 的 base64 编码,Logstash 的 api_key 选项需要解码后的 `id:api_key` 形式):
# The `encoded` value from the API-key response is base64("id:api_key");
# decoding it gives the "id:api_key" form the Logstash api_key option takes.
# Trailing `echo` just terminates the output line.
base64 -d <<<'Ni1mbXM1WUJ3UnJNeWpmNEZQTUI6Qm9wbElFWWFRQkNRLUJSX21kODBKUQ=='
echo
6-fms5YBwRrMyjf4FPMB:BoplIEYaQBCQ-BR_md80JQ
[root@elk93 ~]#
1.4 logstash采集kafka数据到ES集群
5.logstash采集kafka数据到ES集群
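# Logstash: consume the nginx and tomcat topics from Kafka and index each into
# its own ES index. Delimiter quoted so the shell leaves `${...}` alone — that
# is the syntax Logstash itself uses for keystore/env references in a config.
cat > /etc/logstash/conf.d/17-kafka_nginx_tomcat-to-es.conf <<'EOF'
input {
  kafka {
    bootstrap_servers => "10.168.10.91:9092,10.168.10.92:9092,10.168.10.93:9092"
    topics => ["jiaoshi05-nginx-to-kafka"]
    group_id => "jiaoshi05-kafka-003"
    auto_offset_reset => "earliest"
    type => nginx
  }
  kafka {
    bootstrap_servers => "10.168.10.91:9092,10.168.10.92:9092,10.168.10.93:9092"
    topics => ["jiaoshi05-tomcat-to-kafka"]
    # NOTE(review): the apps pipeline later on this page reuses
    # "jiaoshi05-kafka-003"; running both at once puts two processes with
    # different subscriptions into one consumer group. A distinct group_id
    # per pipeline avoids that.
    group_id => "jiaoshi05-kafka-003"
    auto_offset_reset => "earliest"
    type => tomcat
  }
}

filter {
  # Filebeat ships each event as a JSON document: unpack it, then drop the
  # Beat/ECS metadata fields that are not wanted in the index.
  json {
    source => "message"
  }
  mutate {
    remove_field => [ "agent","@version","ecs","input","log","event","host","fileset","service" ]
  }

  if [type] == "nginx" {
    grok {
      match => {
        "message" => "%{HTTPD_COMMONLOG}"
      }
    }
    # NOTE(review): HTTPD_COMMONLOG yields no user-agent field, so this feeds
    # the whole log line to the UA parser. If nginx writes the combined
    # format, match %{HTTPD_COMBINEDLOG} and use source => "agent" instead —
    # confirm against the nginx log_format.
    useragent {
      source => "message"
      target => "cmy-linux97"
    }
    mutate {
      convert => {
        "bytes" => "integer"
      }
    }
    date {
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
  } else {
    # Tomcat events are already JSON (ndjson parser in Filebeat); the field
    # names below come from that log's own layout.
    mutate {
      convert => {
        "SendBytes" => "integer"
      }
    }
    date {
      match => [ "AccessTime", "[dd/MMM/yyyy:HH:mm:ss Z]" ]
    }
    useragent {
      source => "http_user_agent"
      target => "cmy-linux97"
    }
  }

  # "clientip" is produced by HTTPD_COMMONLOG for nginx; for tomcat it must
  # already be present in the JSON — verify with a sample event.
  geoip {
    database => "/cmy/data/geoip/GeoLite2-City_20250311/GeoLite2-City.mmdb"
    default_database_type => "City"
    source => "clientip"
  }
}

output {
  # stdout {
  #   codec => rubydebug
  # }
  # The api_key sits in cleartext in this file: keep it readable by the
  # logstash user only, or store it in the Logstash keystore and reference it
  # as "${ES_API_KEY}". ssl_certificate_verification => false accepts any
  # server certificate — lab only; point ssl_certificate_authorities at the
  # cluster CA instead.
  if [type] == "nginx" {
    elasticsearch {
      hosts => ["10.168.10.91:9200","10.168.10.92:9200","10.168.10.93:9200"]
      index => "index-kafka-to-es-nginx"
      api_key => "6-fms5YBwRrMyjf4FPMB:BoplIEYaQBCQ-BR_md80JQ"
      ssl => true
      ssl_certificate_verification => false
    }
  } else {
    elasticsearch {
      hosts => ["10.168.10.91:9200","10.168.10.92:9200","10.168.10.93:9200"]
      index => "index-kafka-to-es-tomcat"
      api_key => "6-fms5YBwRrMyjf4FPMB:BoplIEYaQBCQ-BR_md80JQ"
      ssl => true
      ssl_certificate_verification => false
    }
  }
}
EOF

# -r reloads the pipeline when the file changes; -f selects the config.
logstash -rf /etc/logstash/conf.d/17-kafka_nginx_tomcat-to-es.conf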
1.5 kibana出图展示
kibana出图展示
搜索关键字---> type : "tomcat"
还有要注意地图的问题:需要先创建索引模板,把 `geoip.location` 字段的映射类型设为 geo_point(Kibana 中文界面显示为“地理位置指标点”),否则地图无法按该字段出图。
2 ELFK+kafka采集docker日志实战
2.1 filebeat采集docker日志
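# Filebeat: collect stdout/stderr of every container from the Docker json-file
# logs, enrich each event with container metadata, and send it to Kafka.
# Delimiter quoted so the shell never expands anything inside the YAML.
cat > /etc/filebeat/config/24-docker-to-kafka.yaml <<'EOF'
filebeat.inputs:
  - type: container
    stream: all
    paths:
      - '/var/lib/docker/containers/*/*.log'

# add_docker_metadata queries the Docker daemon, so the Filebeat process needs
# read access to /var/run/docker.sock.
processors:
  - add_docker_metadata:
      host: "unix:///var/run/docker.sock"

output.kafka:
  hosts:
    - 10.168.10.91:9092
    - 10.168.10.92:9092
    - 10.168.10.93:9092
  topic: jiaoshi05-docker-to-kafka
EOF

# Uses the default path.data; if another Filebeat instance is still running
# on this host, add --path.data as in the tomcat step.
filebeat -e -c /etc/filebeat/config/24-docker-to-kafka.yaml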
2.2 Logstash采集数据写入ES集群
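# Logstash: consume the docker topic from Kafka and index it into ES.
# Delimiter quoted so Logstash's own `${VAR}` references survive the shell.
cat > /etc/logstash/conf.d/19-kafka_docker-to-es.conf <<'EOF'
input {
  kafka {
    bootstrap_servers => "10.168.10.91:9092,10.168.10.92:9092,10.168.10.93:9092"
    topics => ["jiaoshi05-docker-to-kafka"]
    group_id => "jiaoshi05-kafka-002"
    auto_offset_reset => "earliest"
  }
}

filter {
  # Unpack the Filebeat JSON envelope, then drop Beat/ECS metadata fields.
  # The container.* fields added by add_docker_metadata are kept.
  json {
    source => "message"
  }
  mutate {
    remove_field => [ "agent","@version","ecs","input","log","event","host","fileset","service" ]
  }
}

output {
  # stdout {
  #   codec => rubydebug
  # }
  # Same caveats as the nginx/tomcat pipeline: cleartext api_key in the file
  # (restrict permissions or use the Logstash keystore) and certificate
  # verification disabled (lab only; set ssl_certificate_authorities).
  elasticsearch {
    hosts => ["10.168.10.91:9200","10.168.10.92:9200","10.168.10.93:9200"]
    index => "index-kafka-to-es-docker"
    api_key => "6-fms5YBwRrMyjf4FPMB:BoplIEYaQBCQ-BR_md80JQ"
    ssl => true
    ssl_certificate_verification => false
  }
}
EOF

# Uses the default path.data; if the 17- pipeline is still running, Logstash
# refuses to start a second instance on the same data directory — pass
# --path.data as the apps step does.
logstash -rf /etc/logstash/conf.d/19-kafka_docker-to-es.conf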
3 ELFK+kafka采集apps日志实战
ElasticStack项目案例之apps日志分析案例
1.检查自研日志文件
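# Inspect the home-grown application log before shipping it.
# `ll` is an interactive alias (typically `ls -l`), unavailable in scripts.
ls -l /tmp/apps.log   # observed: -rw-r--r-- 1 root root 48636 May 9 15:52 /tmp/apps.log
wc -l /tmp/apps.log   # observed: 570 /tmp/apps.log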
2.将自研日志写入到kafka集群
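# Filebeat: tail the home-grown app log and ship it to Kafka.
# (The page lists this file with `cat`; it is written here so the step is
# reproducible.) NOTE(review): the broker addresses are 10.0.0.x while every
# other section uses 10.168.10.x — confirm which network this cluster is on.
cat > /etc/filebeat/config/23-apps-to-kafka.yaml <<'EOF'
filebeat.inputs:
  - type: filestream
    paths:
      - /tmp/apps.log

output.kafka:
  hosts:
    - 10.0.0.91:9092
    - 10.0.0.92:9092
    - 10.0.0.93:9092
  topic: jiaoshi05-apps-to-kafka
EOF

# Own data directory: another Filebeat instance is using the default one.
filebeat -e -c /etc/filebeat/config/23-apps-to-kafka.yaml --path.data /opt/apps-filebeat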
3.Logstash处理数据
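# Logstash: parse the "|"-separated app log from Kafka and index it into ES.
# (The page lists this file with `cat`; it is written here so the step is
# reproducible.) Delimiter quoted so Logstash `${VAR}` references survive.
cat > /etc/logstash/conf.d/18-kafka_apps-to-es.conf <<'EOF'
input {
  kafka {
    bootstrap_servers => "10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092"
    topics => ["jiaoshi05-apps-to-kafka"]
    # NOTE(review): same group_id as the nginx/tomcat pipeline; use a distinct
    # one per pipeline if both run concurrently.
    group_id => "jiaoshi05-kafka-003"
    auto_offset_reset => "earliest"
    type => apps
  }
}

filter {
  # Unpack the Filebeat JSON envelope, then drop Beat/ECS metadata fields.
  json {
    source => "message"
  }
  mutate {
    remove_field => [ "agent","@version","ecs","input","log","event","host","fileset","service" ]
  }
  # The raw line is "|"-separated; the indices below imply the layout
  #   <prefix> | uid | action | svip | price
  # (inferred from the field positions — confirm against a sample line).
  # add_field is a common option applied after the plugin's own operations,
  # so [message] is already an array when the new fields are built.
  mutate {
    split => { "message" => "|" }
    add_field => {
      "uid" => "%{[message][1]}"
      "action" => "%{[message][2]}"
      "svip" => "%{[message][3]}"
      "price" => "%{[message][4]}"
      "other" => "%{[message][0]}"
    }
  }
  # <prefix> is space-separated; words 1 and 2 form the timestamp "dt".
  # Intermediate fields are dropped once "dt" exists.
  mutate {
    convert => {
      "price" => "float"
    }
    split => { "other" => " " }
    add_field => {
      "dt" => "%{[other][1]} %{[other][2]}"
    }
    remove_field => [ "path","@version","host","message", "other"]
  }
  date {
    match => [ "dt", "yyyy-MM-dd HH:mm:ss" ]
  }
}

output {
  #stdout {
  #  codec => rubydebug
  #}
  # This key is not the one created in 1.3; presumably issued the same way
  # on the 10.0.0.x cluster. Same caveats: cleartext secret in the file
  # (restrict permissions or use the Logstash keystore) and certificate
  # verification disabled (lab only; set ssl_certificate_authorities).
  elasticsearch {
    hosts => ["10.0.0.91:9200","10.0.0.92:9200","10.0.0.93:9200"]
    index => "index-kafka-to-es-apps"
    api_key => "SF3Vs5YBrRRCa1Lz8tbz:TfynLgqlS_-4zbloDPrkZA"
    ssl => true
    ssl_certificate_verification => false
  }
}
EOF

# Second Logstash instance on this host: it needs its own path.data.
logstash -rf /etc/logstash/conf.d/18-kafka_apps-to-es.conf --path.data=/opt/logstash-apps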
4.kibana出图展示
略,见视频。
4 总结
日志格式处理流程
由于 Kafka 传过来的数据是 Filebeat 封装的 JSON 格式,
所以先调用 json 插件把字段取出来,
后面再利用 mutate 进行字段处理。
#elk