Deploying an ELK Log Analysis System on CentOS 7.6
Published: 2019-06-12


Download Elasticsearch

Create the elk user and grant it ownership of the directories
useradd elk
chown -R elk:elk /home/elk/elasticsearch
chown -R elk:elk /home/elk/elasticsearch1
chown -R elk:elk /home/elk/elasticsearch2
mkdir -p /home/eladata
mkdir -p /var/log/elk
chown -R elk:elk /home/eladata
chown -R elk:elk /var/log/elk

Master node

Unpack Elasticsearch and edit the configuration file
[root@localhost config]# pwd
/home/elk/elasticsearch/config
[root@localhost config]# grep -v "^#" elasticsearch.yml
cluster.name: my-application
node.name: node0
node.master: true
node.attr.rack: r1
node.max_local_storage_nodes: 3
path.data: /home/eladata
path.logs: /var/log/elk
http.cors.enabled: true
http.cors.allow-origin: "*"
network.host: 192.168.1.70
http.port: 9200
transport.tcp.port: 9301
discovery.zen.minimum_master_nodes: 1
cluster.initial_master_nodes: ["node0"]
Manual start command
su elk -l -c '/home/elk/elasticsearch/bin/elasticsearch -d'
systemd unit file elasticsearch.service
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat elasticsearch.service
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target

[Service]
RuntimeDirectory=elasticsearch
PrivateTmp=true
Environment=ES_HOME=/home/elk/elasticsearch
Environment=ES_PATH_CONF=/home/elk/elasticsearch/config
Environment=PID_DIR=/var/run/elasticsearch
EnvironmentFile=-/etc/sysconfig/elasticsearch
WorkingDirectory=/home/elk/elasticsearch
User=elk
Group=elk
ExecStart=/home/elk/elasticsearch/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quiet
StandardOutput=journal
StandardError=inherit
LimitNOFILE=65536
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity
TimeoutStopSec=0
KillSignal=SIGTERM
KillMode=process
SendSIGKILL=no
SuccessExitStatus=143

[Install]
WantedBy=multi-user.target
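Once the master node is running, a quick health check confirms it is reachable (the _cluster/health API is standard Elasticsearch; adjust the address if yours differs):

curl 'http://192.168.1.70:9200/_cluster/health?pretty'

A status of green or yellow means the node is up; yellow is expected until the other nodes join and replicas can be allocated.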

Node1

[root@localhost config]# pwd
/home/elk/elasticsearch1/config
[root@localhost config]# grep -v "^#" elasticsearch.yml
cluster.name: my-application
node.name: node1
node.master: false
node.attr.rack: r1
node.max_local_storage_nodes: 3
path.data: /home/eladata
path.logs: /var/log/elk
http.cors.enabled: true
http.cors.allow-origin: "*"
network.host: 192.168.1.70
transport.tcp.port: 9303
http.port: 9302
discovery.zen.ping.unicast.hosts: ["192.168.1.70:9301"]
Manual start command
su elk -l -c '/home/elk/elasticsearch1/bin/elasticsearch -d'
systemd unit file elasticsearch1.service
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat elasticsearch1.service
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target

[Service]
RuntimeDirectory=elasticsearch1
PrivateTmp=true
Environment=ES_HOME=/home/elk/elasticsearch1
Environment=ES_PATH_CONF=/home/elk/elasticsearch1/config
Environment=PID_DIR=/var/run/elasticsearch
EnvironmentFile=-/etc/sysconfig/elasticsearch
WorkingDirectory=/home/elk/elasticsearch1
User=elk
Group=elk
ExecStart=/home/elk/elasticsearch1/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quiet
StandardOutput=journal
StandardError=inherit
LimitNOFILE=65536
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity
TimeoutStopSec=0
KillSignal=SIGTERM
KillMode=process
SendSIGKILL=no
SuccessExitStatus=143

[Install]
WantedBy=multi-user.target

Node2

[root@localhost config]# pwd
/home/elk/elasticsearch2/config
[root@localhost config]# grep -v "^#" elasticsearch.yml
cluster.name: my-application
node.name: node2
node.attr.rack: r1
node.master: false
node.max_local_storage_nodes: 3
path.data: /home/eladata
path.logs: /var/log/elk
http.cors.enabled: true
http.cors.allow-origin: "*"
network.host: 192.168.1.70
http.port: 9203
transport.tcp.port: 9304
discovery.zen.ping.unicast.hosts: ["192.168.1.70:9301"]
discovery.zen.minimum_master_nodes: 1
Manual start command
su elk -l -c '/home/elk/elasticsearch2/bin/elasticsearch -d'
systemd unit file elasticsearch2.service
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat elasticsearch2.service
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target

[Service]
RuntimeDirectory=elasticsearch2
PrivateTmp=true
Environment=ES_HOME=/home/elk/elasticsearch2
Environment=ES_PATH_CONF=/home/elk/elasticsearch2/config
Environment=PID_DIR=/var/run/elasticsearch
EnvironmentFile=-/etc/sysconfig/elasticsearch
WorkingDirectory=/home/elk/elasticsearch2
User=elk
Group=elk
ExecStart=/home/elk/elasticsearch2/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quiet
StandardOutput=journal
StandardError=inherit
LimitNOFILE=65536
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity
TimeoutStopSec=0
KillSignal=SIGTERM
KillMode=process
SendSIGKILL=no
SuccessExitStatus=143

[Install]
WantedBy=multi-user.target
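With all three nodes started, verify that each one has joined the cluster (the _cat/nodes API is standard Elasticsearch; the row marked with * under master should be node0):

curl 'http://192.168.1.70:9200/_cat/nodes?v'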

Download Logstash

The directory is as follows; the default configuration is fine.
[root@localhost logstash]# pwd
/home/elk/logstash
Manual start commands (foreground, then background)
./logstash -f ../dev.conf
nohup ./logstash -f ../dev.conf &
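Before wiring Logstash to Kafka, a trivial pipeline is a useful smoke test. The sketch below assumes a hypothetical file test.conf placed next to dev.conf:

input { stdin { } }
output { stdout { codec => rubydebug } }

Start it with ./logstash -f ../test.conf, type a line, and Logstash should echo it back as a structured event.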

Download Kibana

The configuration file is as follows
[root@localhost config]# pwd
/home/elk/kibana/config
[root@localhost config]# grep -v "^#" kibana.yml
server.host: "192.168.1.70"
elasticsearch.hosts: ["http://192.168.1.70:9200"]
kibana.index: ".kibana"
i18n.locale: "zh-CN"
Manual start commands (foreground, then background)
./kibana
nohup ./kibana &
Kibana systemd unit file
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat kibana.service
[Unit]
Description=Kibana Server Manager

[Service]
ExecStart=/home/elk/kibana/bin/kibana

[Install]
WantedBy=multi-user.target
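With the unit file saved under /lib/systemd/system, the usual systemd workflow applies (shown for Kibana; the same pattern works for the elasticsearch*.service units above):

systemctl daemon-reload
systemctl enable kibana
systemctl start kibana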

Kibana listens on port 5601; access it at 192.168.1.70:5601.

Install elasticsearch-head

yum install git npm
git clone https://github.com/mobz/elasticsearch-head.git
[root@localhost elasticsearch-head]# pwd
/home/elk/elasticsearch-head
Start
npm install
npm run start
nohup npm run start &

Create a test index named book. Note that index creation goes to the Elasticsearch HTTP port (9200), not to head itself, which only serves the UI:

curl -XPUT '192.168.2.67:9200/book'

Then open 192.168.2.67:9100 in a browser to reach the head UI; the http.cors.* settings added to elasticsearch.yml earlier are what allow head to talk to the cluster.

Download Kafka

Edit the configuration file as follows
[root@localhost config]# pwd
/home/elk/kafka/config
[root@localhost config]# grep -v "^#" server.properties
broker.id=0
listeners=PLAINTEXT://192.168.1.70:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/log/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true

Start the ZooKeeper bundled with Kafka

Manual start
[root@localhost bin]# pwd
/home/elk/kafka/bin
[root@localhost bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties
Start ZooKeeper via systemctl
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat zookeeper.service
[Service]
Type=forking
SyslogIdentifier=zookeeper
Restart=always
RestartSec=0s
ExecStart=/home/elk/kafka/bin/zookeeper-server-start.sh -daemon /home/elk/kafka/config/zookeeper.properties
ExecStop=/home/elk/kafka/bin/zookeeper-server-stop.sh

Start the Kafka service

Manual start
./kafka-server-start.sh ../config/server.properties
Start Kafka via systemctl
[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat kafka.service
[Unit]
Description=Apache kafka
After=network.target

[Service]
Type=simple
Restart=always
RestartSec=0s
ExecStart=/home/elk/kafka/bin/kafka-server-start.sh /home/elk/kafka/config/server.properties
ExecStop=/home/elk/kafka/bin/kafka-server-stop.sh
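As with the other units, reload systemd and start ZooKeeper before Kafka, since the broker depends on it:

systemctl daemon-reload
systemctl start zookeeper
systemctl start kafka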

Test Kafka

Create a topic named test
./kafka-topics.sh --create --zookeeper 192.168.1.70:2181 --replication-factor 1 --partitions 1 --topic test
List the topics in Kafka
./kafka-topics.sh --list  --zookeeper 192.168.1.70:2181
Produce messages to the test topic
./kafka-console-producer.sh --broker-list 192.168.1.70:9092 --topic test
Consume messages from the test topic
./kafka-console-consumer.sh --bootstrap-server 192.168.1.70:9092 --topic test --from-beginning

If the consumer receives the messages the producer sends, Kafka is working.
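If messages do not arrive, describing the topic can help narrow things down (same ZooKeeper-based tooling as the commands above):

./kafka-topics.sh --describe --zookeeper 192.168.1.70:2181 --topic test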

Install Filebeat on the target machine

Install version 6.5.

[root@localhost filebeat]# pwd
/usr/local/filebeat
[root@localhost filebeat]# cat filebeat.yml
filebeat.prospectors:
- type: log
  paths:
    - /opt/logs/workphone-tcp/catalina.out
  fields:
    tag: 54_tcp_catalina_out
- type: log
  paths:
    - /opt/logs/workphone-webservice/catalina.out
  fields:
    tag: 54_web_catalina_out

name: 192.168.1.54

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 3

output.kafka:
  hosts: ["192.168.1.70:9092"]
  topic: "filebeat-log"
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
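To run Filebeat with this configuration (the -e flag logs to stderr, which is convenient for a first test; the nohup form keeps it running in the background):

./filebeat -e -c filebeat.yml
nohup ./filebeat -c filebeat.yml &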

After Filebeat is installed, go back to Logstash and edit its configuration file.

Logstash configuration

[root@localhost logstash]# pwd
/home/elk/logstash
[root@localhost logstash]# cat dev.conf
input {
  kafka {
    bootstrap_servers => "192.168.1.70:9092"
    topics => ["filebeat-log"]
    codec => "json"
  }
}

filter {
  if [fields][tag] == "jpwebmap" {
    json {
      source => "message"
      remove_field => "message"
    }
    geoip {
      source => "client"
      target => "geoip"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
  }

  if [fields][tag] == "54_tcp_catalina_out" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }

  if [fields][tag] == "54_web_catalina_out" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }

  if [fields][tag] == "55_tcp_catalina_out" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }

  if [fields][tag] == "55_web_catalina_out" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }

  if [fields][tag] == "51_nginx80_access_log" {
    mutate {
      add_field => { "spstr" => "%{[log][file][path]}" }
    }
    mutate {
      split => ["spstr", "/"]
      # save the last element of the array as the api_method.
      add_field => ["src", "%{[spstr][-1]}"]
    }
    mutate {
      remove_field => [ "friends", "ecs", "agent", "spstr" ]
    }
    grok {
      match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"%{DATA:x_forwarded_for}\" \"%{NUMBER:request_time}\" \"%{DATA:upstream_addr}\" \"%{DATA:upstream_status}\"" }
      remove_field => "message"
    }
    date {
      match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    geoip {
      source => "x_forwarded_for"
      target => "geoip"
      database => "/home/elk/logstash/GeoLite2-City.mmdb"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
  }
}

output {
  if [fields][tag] == "wori" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "zabbix"
    }
  }
  if [fields][tag] == "54_tcp_catalina_out" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "54_tcp_catalina_out"
    }
  }
  if [fields][tag] == "54_web_catalina_out" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "54_web_catalina_out"
    }
  }
  if [fields][tag] == "55_tcp_catalina_out" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "55_tcp_catalina_out"
    }
  }
  if [fields][tag] == "55_web_catalina_out" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "55_web_catalina_out"
    }
  }
  if [fields][tag] == "51_nginx80_access_log" {
    stdout {}
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "51_nginx80_access_log"
    }
  }
}
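A config file this long is easy to break with a stray brace; Logstash can validate it without starting the pipeline:

./logstash -f ../dev.conf --config.test_and_exit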

Other configuration files

index.conf
filter {
  mutate {
    add_field => { "spstr" => "%{[log][file][path]}" }
  }
  mutate {
    split => ["spstr", "/"]
    # save the last element of the array as the api_method.
    add_field => ["src", "%{[spstr][-1]}"]
  }
  mutate {
    remove_field => [ "friends", "ecs", "agent", "spstr" ]
  }
}
java.conf
filter {
  if [fields][tag] == "java" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  } # end if
}
kafkainput.conf
input {
  kafka {
    bootstrap_servers => "172.16.11.68:9092"
    #topics => ["ql-prod-tomcat" ]
    topics => ["ql-prod-dubbo","ql-prod-nginx","ql-prod-tomcat" ]
    codec => "json"
    consumer_threads => 5
    decorate_events => true
    #auto_offset_reset => "latest"
    group_id => "logstash"
    #client_id => ""
    ############################# HELK Optimizing Latency #############################
    fetch_min_bytes => "1"
    request_timeout_ms => "305000"
    ############################# HELK Optimizing Availability #############################
    session_timeout_ms => "10000"
    max_poll_records => "550"
    max_poll_interval_ms => "300000"
  }
}

#input {
#  kafka{
#    bootstrap_servers => "172.16.11.68:9092"
#    topics => ["ql-prod-java-dubbo","ql-prod","ql-prod-java" ]
#    codec => "json"
#    consumer_threads => 15
#    decorate_events => true
#    auto_offset_reset => "latest"
#    group_id => "logstash-1"
#    ############################# HELK Optimizing Latency #############################
#    fetch_min_bytes => "1"
#    request_timeout_ms => "305000"
#    ############################# HELK Optimizing Availability #############################
#    session_timeout_ms => "10000"
#    max_poll_records => "550"
#    max_poll_interval_ms => "300000"
#  }
#}
nginx.conf
filter {
  if [fields][tag] == "nginx-access" {
    mutate {
      add_field => { "spstr" => "%{[log][file][path]}" }
    }
    mutate {
      split => ["spstr", "/"]
      # save the last element of the array as the api_method.
      add_field => ["src", "%{[spstr][-1]}"]
    }
    mutate {
      remove_field => [ "friends", "ecs", "agent", "spstr" ]
    }
    grok {
      match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"%{DATA:x_forwarded_for}\" \"%{NUMBER:request_time}\" \"%{DATA:upstream_addr}\" \"%{DATA:upstream_status}\"" }
      remove_field => "message"
    }
    date {
      match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    geoip {
      source => "x_forwarded_for"
      target => "geoip"
      database => "/opt/logstash-6.2.4/GeoLite2-City.mmdb"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
  } # end if
}
output.conf
output {
  if [fields][tag] == "nginx-access" {
    stdout {}
    elasticsearch {
      user => elastic
      password => WR141bp2sveJuGFaD4oR
      hosts => ["172.16.11.67:9200"]
      index => "logstash-%{[fields][proname]}-%{+YYYY.MM.dd}"
    }
  }
  #stdout{}
  if [fields][tag] == "java" {
    elasticsearch {
      user => elastic
      password => WR141bp2sveJuGFaD4oR
      hosts => ["172.16.11.66:9200","172.16.11.68:9200"]
      index => "%{[host][name]}-%{[src]}"
    }
  }
}
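These per-concern files are meant to be loaded together: pointing -f at a directory makes Logstash concatenate every file in it into a single pipeline (the conf.d path below is just an example layout):

./logstash -f /home/elk/logstash/conf.d/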

Reprinted from: https://www.cnblogs.com/daydev/p/11206314.html
