首先安装logstash,这个非常简单,不赘述。建议把所有插件都安装上,省心。

然后要配置一下logstash for nginx。logstash基本原理:input => filter => output。在我们这里input就是nginx的access日志,output就是ElasticSearch。filter则是用来解析和过滤日志用。一般我们要把message结构化再存储,方便后面的搜索和统计。因此需要对message进行解析。logstash是使用grok过滤器,使用match正则表达式解析。要根据自己的log_format来定制。

比如这里我们的log_format配置是:

log_format  main      '$http_host '
                      '$remote_addr - $remote_user [$time_local] '
                      '"$request" $status $body_bytes_sent "$request_body" '
                      '"$http_referer" "$http_user_agent" "$http_x_forwarded_for" '
                      '$request_time '  
                      '$upstream_response_time';

那么相应的grok就是:

%{IPORHOST:http_host} %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:time_local}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{INT:status} %{INT:body_bytes_sent} %{QS:request_body} %{QS:http_referer} %{QS:http_user_agent} %{QS:http_x_forwarded_for} %{NUMBER:request_time:float} %{NUMBER:upstream_response_time:float}

可以用logstash.study.arganzheng.me 95.159.86.100 - - [18/Apr/2016:11:38:51 +0800] "GET /ad.php?p=com.adslib.mobovee&hp=com.pay.m.KOF95&l=d784ee59431b73536284463336b26c03&c=mobojoy HTTP/1.1" 200 10 "-" "-" "Dalvik/2.1.0 (Linux; U; Android 5.0.2; SM-T810 Build/LRX22G)" "-" 0.002 0.002在Grok Debugger测试一下,或者直接用input { stdin { } }测试。

可以把这个Grok抽取表达式作为一个pattern固定下来:

$ cd logstash
$ mkdir patterns # 可以在grok中通过 patterns_dir 指定,默认是这个位置
$ vim /home/work/logstash/patterns/nginx

添加如下一行:

NGINXACCESS %{IPORHOST:http_host} %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:time_local}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{INT:status} %{INT:body_bytes_sent} %{QS:request_body} %{QS:http_referer} %{QS:http_user_agent} %{QS:http_x_forwarded_for} %{NUMBER:request_time:float} %{NUMBER:upstream_response_time:float}

TIPS

1、logstash默认定义了一些 grok pattern,一般来说对于字符串,有双引号包含的用QS,没有的用DATA类型,如%{DATA:request_body}。

2、grok匹配很容易出错,可以使用Grok Debugger进行在线调试。

3、如果想要让URL参数也解析并且成为索引字段,比如一些通用参数,如uid, country, language, etc. 那么可以使用KV插件

filter {

	grok {
		...
	}
	
	...

	# 再单独将取得的URL、request字段取出来进行key-value值匹配
	# 需要kv插件。提供字段分隔符"&?",值键分隔符"=",则会自动将字段和值采集出来。
  	kv {
      	source => "request" # 默认是message,我们这里只需要解析上面grok抽取出来的request字段
      	field_split => "&?"
      	value_split => "="
      	include_keys => [ "network", "country", "language", "deviceId" ]
  	}
 
  	# 把所有字段进行urldecode(显示中文)
  	urldecode {
     	all_fields => true
  	}

}

4、过滤掉安全扫描。logstash的drop filter可以简单对消息进行过滤,例如:

if [message] !~ ".*ERROR.*|.*error.*|.*FATAL.*|.*fatal.*|.*WARN.*|.*warn.*" {
	drop { }
}

对于安全扫描,只需要过滤http_user_agent中含有inf-ssl-duty-scan的请求就可以了:

if [http_user_agent] =~ "inf-ssl-duty-scan" { # 过滤安全扫描
	drop { }
}

综上,整个logstash-nginx.conf配置文件如下:

## input { stdin { } }
input {
	file {
		path => ["/home/work/nginx/logs/access*.log"]
    	# exclude => ["*.gz"]
    	# start_position => "beginning"
	}
}

## nginx log format config
# log_format  main     '$http_host '
#                      '$remote_addr - $remote_user [$time_local] '
#                      '"$request" $status $body_bytes_sent "$request_body" '
#                      '"$http_referer" "$http_user_agent" "$http_x_forwarded_for" '
#                      '$request_time $upstream_response_time';

filter {
  	grok {
  		# patterns_dir => [ "/home/work/logstash/patterns" ]
       	match => { "message" => "%{NGINXACCESS}" }
    }

    if [http_user_agent] =~ "inf-ssl-duty-scan" { 
    	drop { }
    }

  	date {
    	match => [ "time_local" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  	}

  	geoip {
        source => "remote_addr"
    }

  	kv {
      	source => "request"
      	field_split => "&?"
      	value_split => "="
      	include_keys => [ "network", "country", "language", "deviceId" ]
  	}

  	urldecode {
     	all_fields => true
  	}
}

output {
  	elasticsearch {
        hosts => ["10.242.122.23:8200","10.242.99.47:8200","10.242.103.33:8200"]
        index => "logstash-nginx-%{+YYYY.MM.dd}"
  }
  # stdout { codec => rubydebug }
}

然后我们就可以启动logstash查看一下效果了:

$ bin/logstash -f conf/logstash-nginx.conf

会看到logstash已经在扫描nginx日志发送到ES集群了。可以配置一下Kibana,对日志进行分析和可视化。具体参见参考文章。

TIPS

注意,这里收集了所有的nginx访问日志,索引文件会很大(3G左右的日志量可以产生10G左右的索引文件),虽然logstash默认是按天分割了,但是还是不能存储太久的日志,需要定时清理,可以使用 elasticsearch-curator 定时清理。

参考文章

  1. Centralized Logging with Logstash and Kibana On CentOS 7 一个序列教程,比较新。
  2. Adding Logstash Filters To Improve Centralized Logging
  3. GETTING THE BEST OUT OF LOGSTASH FOR NGINX
  4. How To Map User Location with GeoIP and ELK (Elasticsearch, Logstash, and Kibana)
  5. Setting up Logstash 1.4.2 to forward Nginx logs to Elasticsearch
  6. grok
  7. kv
  8. What Logstash Pitfalls You Need to Avoid
  9. How to Deploy the ELK Stack in Production