Comprehensive Case Studies

1 ELFK + Kafka: collecting service logs

- 1. Use Filebeat to collect nginx and Tomcat logs and write them to the Kafka cluster
- 2. Use Logstash to consume the data from Kafka and write it to the ES cluster
- 3. Build visualizations of the data in Kibana

1.1 Collecting nginx logs into Kafka

[root@elk93 ~]# cat > /etc/filebeat/config/21-nginx-to-kafka.yaml <<'EOF'
filebeat.config.modules:
  path: ${path.config}/modules.d/nginx.yml
  reload.enabled: true

output.kafka:
  hosts: 
  - 10.168.10.91:9092
  - 10.168.10.92:9092
  - 10.168.10.93:9092

  topic: jiaoshi05-nginx-to-kafka
EOF
[root@elk93 ~]# 
[root@elk93 ~]# filebeat -e -c /etc/filebeat/config/21-nginx-to-kafka.yaml 
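
Optionally, verify that events are arriving in the topic before moving on. A minimal check, assuming the Kafka CLI scripts are on the PATH of one of the brokers (adjust the script path to your installation):

kafka-console-consumer.sh \
  --bootstrap-server 10.168.10.91:9092 \
  --topic jiaoshi05-nginx-to-kafka \
  --from-beginning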

1.2 Collecting Tomcat logs into Kafka


[root@elk93 ~]# cat  > /etc/filebeat/config/22-tomcat-to-kafka.yaml <<EOF
filebeat.inputs:
- type: filestream
  paths:
    - /app/tools/apache-tomcat-11.0.6/logs/tomcat.cmy.com_access_log*.json
  parsers:
    - ndjson:
        target: ""
        message_key: message

output.kafka:
  hosts: 
  - 10.168.10.91:9092
  - 10.168.10.92:9092
  - 10.168.10.93:9092

  topic: jiaoshi05-tomcat-to-kafka
EOF
[root@elk93 ~]# 
[root@elk93 ~]# 
[root@elk93 ~]# filebeat -e -c /etc/filebeat/config/22-tomcat-to-kafka.yaml --path.data /opt/tomcat-filebeat
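
Likewise, listing the topics on the cluster should now show both the nginx and the Tomcat topic (again assuming the Kafka CLI scripts are available):

kafka-topics.sh \
  --bootstrap-server 10.168.10.91:9092 \
  --list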

1.3 Creating an API key

curl -X POST "https://10.168.10.91:9200/_security/api_key" \
  -H "Content-Type: application/json" \
  -u elastic:123456 \
  -k \
  -d '{
    "name": "cmy",
    "role_descriptors": {
      "filebeat_monitoring": {
        "cluster": ["all"],
        "index": [
          {
            "names": ["index-kafka-to-es*"],
            "privileges": ["all"]
          }
        ]
      }
    }
  }'


Response:
{"id":"6-fms5YBwRrMyjf4FPMB","name":"cmy","api_key":"BoplIEYaQBCQ-BR_md80JQ","encoded":"Ni1mbXM1WUJ3UnJNeWpmNEZQTUI6Qm9wbElFWWFRQkNRLUJSX21kODBKUQ=="}[


Decoding the encoded value:
[root@elk93 ~]# echo  Ni1mbXM1WUJ3UnJNeWpmNEZQTUI6Qm9wbElFWWFRQkNRLUJSX21kODBKUQ== | base64 -d ;echo
6-fms5YBwRrMyjf4FPMB:BoplIEYaQBCQ-BR_md80JQ

[root@elk93 ~]# 
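
A quick way to confirm the key is usable is to call the cluster with an Authorization: ApiKey header carrying the encoded value returned above; -k skips certificate verification, matching the self-signed setup used here:

curl -k \
  -H "Authorization: ApiKey Ni1mbXM1WUJ3UnJNeWpmNEZQTUI6Qm9wbElFWWFRQkNRLUJSX21kODBKUQ==" \
  https://10.168.10.91:9200/_cluster/health?pretty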

1.4 Logstash: consuming Kafka data into the ES cluster

[root@elk93 ~]# cat  > /etc/logstash/conf.d/17-kafka_nginx_tomcat-to-es.conf <<EOF
input { 
  kafka {
    bootstrap_servers => "10.168.10.91:9092,10.168.10.92:9092,10.168.10.93:9092"
    topics => ["jiaoshi05-nginx-to-kafka"]
    group_id => "jiaoshi05-kafka-003"
    auto_offset_reset => "earliest"
    type => nginx
  }

  kafka {
    bootstrap_servers => "10.168.10.91:9092,10.168.10.92:9092,10.168.10.93:9092"
    topics => ["jiaoshi05-tomcat-to-kafka"]
    group_id => "jiaoshi05-kafka-003"
    auto_offset_reset => "earliest"
    type => tomcat
  }
  
}

filter {
  json {
    source => "message"
  }

  mutate {
     remove_field => [ "agent","@version","ecs","input","log","event","host","fileset","service" ]
  }

  if [type] == "nginx" {
      grok {
        match => {
          "message" => "%{HTTPD_COMMONLOG}"
        }
      }

      useragent {
        source => "message"
        target => "cmy-linux97"
      }

      mutate {
        convert => {
            "bytes" => "integer"
        }
      }

      date {
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      }

  } else {

      mutate {
        convert => {
            "SendBytes" => "integer"
        }
      }


      date {
        match => [ "AccessTime", "[dd/MMM/yyyy:HH:mm:ss Z]" ]
      }

      useragent {
        source => "http_user_agent"
        target => "cmy-linux97"
      }

  }

  geoip {
    database => "/cmy/data/geoip/GeoLite2-City_20250311/GeoLite2-City.mmdb"
    default_database_type => "City"
    source => "clientip"
  }

}  

output { 

 # stdout { 
 #   codec => rubydebug 
 # } 

  if [type] == "nginx" {
      elasticsearch {
        hosts => ["10.168.10.91:9200","10.168.10.92:9200","10.168.10.93:9200"]
        index => "index-kafka-to-es-nginx"
        api_key => "6-fms5YBwRrMyjf4FPMB:BoplIEYaQBCQ-BR_md80JQ"
        ssl => true
        ssl_certificate_verification => false
      }
  } else {
      elasticsearch {
        hosts => ["10.168.10.91:9200","10.168.10.92:9200","10.168.10.93:9200"]
        index => "index-kafka-to-es-tomcat"
        api_key => "6-fms5YBwRrMyjf4FPMB:BoplIEYaQBCQ-BR_md80JQ"
        ssl => true
        ssl_certificate_verification => false
      }
  }
}
EOF
[root@elk93 ~]# 
[root@elk93 ~]# 
[root@elk93 ~]# logstash -rf /etc/logstash/conf.d/17-kafka_nginx_tomcat-to-es.conf
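
Once Logstash is running, a quick sanity check is to list the target indices and confirm that documents are accumulating:

curl -k -u elastic:123456 \
  "https://10.168.10.91:9200/_cat/indices/index-kafka-to-es-*?v"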

1.5 Visualizing in Kibana

Build the visualizations in Kibana from the new indices. To filter the Tomcat data, search for the keyword:  type : "tomcat"



Also note the map visualization: it requires creating an index template that maps geoip.location as a geo_point ('地理位置指标点') field; a sketch of such a template follows below.
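
A minimal sketch of such an index template, assuming the indices match index-kafka-to-es-* and reusing the elastic user from above (the template name jiaoshi05-geoip is made up here):

curl -X PUT "https://10.168.10.91:9200/_index_template/jiaoshi05-geoip" \
  -H "Content-Type: application/json" \
  -u elastic:123456 \
  -k \
  -d '{
    "index_patterns": ["index-kafka-to-es-*"],
    "template": {
      "mappings": {
        "properties": {
          "geoip": {
            "properties": {
              "location": { "type": "geo_point" }
            }
          }
        }
      }
    }
  }'

The template only applies to indices created after it exists, so delete and recreate the target indices (or reindex) for the geo_point mapping to take effect.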

2 ELFK + Kafka: collecting Docker logs in practice

2.1 Filebeat collecting Docker logs

 cat  > /etc/filebeat/config/24-docker-to-kafka.yaml  <<EOF
filebeat.inputs:
- type: container
  stream: all
  paths: 
    - '/var/lib/docker/containers/*/*.log'
  processors:
    - add_docker_metadata:
        host: "unix:///var/run/docker.sock"

output.kafka:
  hosts: 
  - 10.168.10.91:9092
  - 10.168.10.92:9092
  - 10.168.10.93:9092

  topic: jiaoshi05-docker-to-kafka
EOF


 filebeat -e -c /etc/filebeat/config/24-docker-to-kafka.yaml
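
To generate some test log traffic, you can start a throw-away container and hit it a few times; the container name web-test and the nginx image are just examples, and this assumes Docker's default json-file logging driver so that container stdout ends up under /var/lib/docker/containers/:

docker run -d --name web-test -p 8080:80 nginx
curl -s http://127.0.0.1:8080/ >/dev/null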

2.2 Logstash writing the data to the ES cluster

 cat > /etc/logstash/conf.d/19-kafka_docker-to-es.conf <<EOF 
input { 
  kafka {
    bootstrap_servers => "10.168.10.91:9092,10.168.10.92:9092,10.168.10.93:9092"
    topics => ["jiaoshi05-docker-to-kafka"]
    group_id => "jiaoshi05-kafka-002"
    auto_offset_reset => "earliest"
  }
  
}

filter {
  json {
    source => "message"
  }

  mutate {
     remove_field => [ "agent","@version","ecs","input","log","event","host","fileset","service" ]
  }

}  

output { 

 # stdout { 
 #   codec => rubydebug 
 # } 

  elasticsearch {
    hosts => ["10.168.10.91:9200","10.168.10.92:9200","10.168.10.93:9200"]
    index => "index-kafka-to-es-docker"
    api_key => "6-fms5YBwRrMyjf4FPMB:BoplIEYaQBCQ-BR_md80JQ"
    ssl => true
    ssl_certificate_verification => false
  }
}
EOF
[root@elk93 ~]# 
[root@elk93 ~]# 
[root@elk93 ~]# logstash -rf /etc/logstash/conf.d/19-kafka_docker-to-es.conf 
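
As before, a document count on the new index confirms data is flowing through:

curl -k -u elastic:123456 \
  "https://10.168.10.91:9200/index-kafka-to-es-docker/_count?pretty"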


3 ELFK + Kafka: collecting apps logs in practice

	1. Inspect the in-house application log file
[root@elk93 ~]# ll /tmp/apps.log 
-rw-r--r-- 1 root root 48636 May  9 15:52 /tmp/apps.log
[root@elk93 ~]# 
[root@elk93 ~]# wc -l /tmp/apps.log 
570 /tmp/apps.log
[root@elk93 ~]# 
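
The Logstash pipeline below splits each line on | and maps fields 1-4 to uid, action, svip and price, while the date is taken from the leading field; a hypothetical line consistent with that layout (not copied from the real file) would look like:

INFO 2025-05-09 15:52:00|10001|login|1|29.99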

	2. Write the application log to the Kafka cluster
[root@elk93 ~]# cat /etc/filebeat/config/23-apps-to-kafka.yaml
filebeat.inputs:
- type: filestream
  paths:
    - /tmp/apps.log

output.kafka:
  hosts: 
  - 10.0.0.91:9092
  - 10.0.0.92:9092
  - 10.0.0.93:9092

  topic: jiaoshi05-apps-to-kafka
[root@elk93 ~]# 
[root@elk93 ~]# filebeat -e -c /etc/filebeat/config/23-apps-to-kafka.yaml --path.data /opt/apps-filebeat


	3. Process the data with Logstash
[root@elk93 ~]# cat /etc/logstash/conf.d/18-kafka_apps-to-es.conf 
input { 
  kafka {
    bootstrap_servers => "10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092"
    topics => ["jiaoshi05-apps-to-kafka"]
    group_id => "jiaoshi05-kafka-003"
    auto_offset_reset => "earliest"
    type => apps
  }
  
}

filter {
  json {
    source => "message"
  }

  mutate {
     remove_field => [ "agent","@version","ecs","input","log","event","host","fileset","service" ]
  }

  mutate {
     split => { "message" => "|" }

     add_field => { 
      "uid" => "%{[message][1]}" 
      "action" => "%{[message][2]}" 
      "svip" => "%{[message][3]}" 
      "price" => "%{[message][4]}" 
      "other" => "%{[message][0]}"
     }

  }

  mutate {
     convert => {
       "price" => "float"
     }

     
     split => { "other" => " " }

     add_field => { 
       "dt" => "%{[other][1]} %{[other][2]}" 
     }
  
     remove_field => [ "path","@version","host","message", "other"]
  }



  date {
    match => [ "dt", "yyyy-MM-dd HH:mm:ss" ]
  }


}  

output { 

  #stdout { 
  #  codec => rubydebug 
  #} 

  elasticsearch {
    hosts => ["10.0.0.91:9200","10.0.0.92:9200","10.0.0.93:9200"]
    index => "index-kafka-to-es-apps"
    api_key => "SF3Vs5YBrRRCa1Lz8tbz:TfynLgqlS_-4zbloDPrkZA"
    ssl => true
    ssl_certificate_verification => false
  }
}
[root@elk93 ~]# 
[root@elk93 ~]# logstash -rf /etc/logstash/conf.d/18-kafka_apps-to-es.conf  --path.data=/opt/logstash-apps
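
To spot-check the price conversion, you can ask ES for the field mapping, which should report a numeric type such as float (assuming dynamic mapping, since no explicit template was created for this index):

curl -k -u elastic:123456 \
  "https://10.0.0.91:9200/index-kafka-to-es-apps/_mapping/field/price?pretty"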


	4. Visualize in Kibana
Omitted; see the video.

4 Summary

Log format processing flow: because the data arriving from Kafka is JSON, the pipelines first call the json filter plugin to extract the fields, and then use mutate filters to further process the individual fields.
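
As a minimal skeleton, the recurring filter pattern in the pipelines above looks like this:

filter {
  json {
    source => "message"                      # Kafka delivers the Filebeat event as a JSON string
  }
  mutate {
    remove_field => [ "agent", "@version" ]  # drop metadata fields that are not needed
  }
  # per-log-type handling (grok / date / useragent / geoip) follows here
}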
