ELK logstash处理流程(23rd)
logstash处理事件有三个阶段:input ---> filter ---> output。input产生事件,filter 对事件进行修改,output输出到其它地方。input和output支持解码,可以对进入的或者退出管道的数据进行编码或解码而无需单独经过过滤器处理。
input
常用的input有:
- file:从文件系统中读取文件,类似于Linux下的tail -0F。
- syslog:监听在514端口的系统日志信息,并解析成RFC3164格式。
- redis:从redis服务器读取,同时使用redis channel和redis list。
- beats: 通过Filebeat发送事件。
其它的一些input有:
logstash-input-beats
logstash-input-couchdb_changes
logstash-input-elasticsearch
logstash-input-eventlog
logstash-input-exec
logstash-input-file
logstash-input-ganglia
logstash-input-gelf
logstash-input-generator
logstash-input-graphite
logstash-input-heartbeat
logstash-input-http
logstash-input-imap
logstash-input-irc
logstash-input-jdbc
logstash-input-kafka
logstash-input-log4j
logstash-input-lumberjack
logstash-input-pipe
logstash-input-rabbitmq
logstash-input-redis
logstash-input-s3
logstash-input-snmptrap
logstash-input-sqs
logstash-input-stdin
logstash-input-syslog
logstash-input-tcp
logstash-input-twitter
logstash-input-udp
logstash-input-unix
logstash-input-xmpp
logstash-input-zeromq
filter
filter是logstash管道中间处理的设备。可以结合条件语句对符合标准的事件进行处理。
一些有用的过滤器如下:
- grok: 解析和结构化任何文本。Grok 目前是logstash最好的方式对非结构化日志数据解析成结构化和可查询化。logstash内置了120个匹配模式,满足大部分需求。
- mutate: 在事件字段执行一般的转换。可以重命名、删除、替换和修改事件字段。
- drop: 完全丢弃事件,如debug事件。
- clone: 复制事件,可能添加或者删除字段。
- geoip: 添加有关IP地址地理位置信息。
其它filter有:
logstash-filter-anonymize
logstash-filter-checksum
logstash-filter-clone
logstash-filter-csv
logstash-filter-date
logstash-filter-dns
logstash-filter-drop
logstash-filter-fingerprint
logstash-filter-geoip
logstash-filter-grok
logstash-filter-json
logstash-filter-kv
logstash-filter-metrics
logstash-filter-multiline
logstash-filter-mutate
logstash-filter-ruby
logstash-filter-sleep
logstash-filter-split
logstash-filter-syslog_pri
logstash-filter-throttle
logstash-filter-urldecode
logstash-filter-useragent
logstash-filter-uuid
logstash-filter-xml
output
output是logstash管道的最后一个阶段。一个事件可以经过多个output。但是一旦所有输出处理完,该事件已经执行完。
常用的output有:
- elasticsearch: 发送事件数据到 Elasticsearch。如果要将数据保存在一个高效、便捷、易于查询的格式,elasticsearch将是不二人选。
- file: 将事件数据写入到磁盘文件上。
- graphite: 发送事件数据到graphite。http://graphite.wikidot.com/
- statsd: 发送事件数据到 statsd。
其它output有:
logstash-output-cloudwatch
logstash-output-csv
logstash-output-elasticsearch
logstash-output-email
logstash-output-exec
logstash-output-file
logstash-output-ganglia
logstash-output-gelf
logstash-output-graphite
logstash-output-hipchat
logstash-output-http
logstash-output-irc
logstash-output-juggernaut
logstash-output-kafka
logstash-output-lumberjack
logstash-output-nagios
logstash-output-nagios_nsca
logstash-output-null
logstash-output-opentsdb
logstash-output-pagerduty
logstash-output-pipe
logstash-output-rabbitmq
logstash-output-redis
logstash-output-s3
logstash-output-sns
logstash-output-sqs
logstash-output-statsd
logstash-output-stdout
logstash-output-tcp
logstash-output-udp
logstash-output-xmpp
logstash-output-zeromq
Coders
大众化的编码器有json、msgpack、plain(text)。
- json: 以json格式编码或者解码数据。
- multiline: 合并多行文本事件,如java异常和堆栈跟踪信息到一个单一事件。
其它coders有:
logstash-codec-collectd
logstash-codec-dots
logstash-codec-edn
logstash-codec-edn_lines
logstash-codec-es_bulk
logstash-codec-fluent
logstash-codec-graphite
logstash-codec-json
logstash-codec-json_lines
logstash-codec-line
logstash-codec-msgpack
logstash-codec-multiline
logstash-codec-netflow
logstash-codec-oldlogstashjson
logstash-codec-plain
logstash-codec-rubydebug
故障容错
事件从一个管道到另一个管道使用内部的Ruby SizedQueue队列实现的。一个SizedQueue有最大的项目数。当队列达到最大值,所有的写入队列将会被阻塞。
logstash设置每个队列大小为20。这意味着最多20个事件可以挂起进入下一个阶段,这可以防止数据丢失和保持logstash作为一个数据存储系统。这些内部队列不用于长期存放信息。
小队列意味着当logstash任务繁重或者管道临时有问题时,更容易堵塞。当出现问题时,要么队列不限制要么丢弃信息。队列不限制时,会无限的增长一直超出内存大小,导致崩溃,从而队列中的所有信息丢失。在多数情况下,丢弃消息也是不希望接受的。
大多数output会不断尝试受故障影响的事件。output失败或者下游服务的问题如磁盘满、权限问题、网络故障、服务中止。
如果output失败,output线程等待直到output能成功发送消息。output停止从output队列读取,这意味着事件填满了队列。
当output队列满了,过滤器是被阻塞的,因此它们不能写入新的事件到输出队列。虽然写入到output队列被阻塞了,过滤器停止从filter队列读取。最终,可能会导致filter队列(input--->filter)满。
一个满的filter队列,阻塞input写入到filter。这将导致所有input停止处理数据无论是新的事件。
在理想的情况下,这种行为类似于当tcp窗口关闭为0。没有新的数据发送,因为接收器还没有处理完当前队列的数据,直到下游(output)问题解决,消息重新流动起来。
线程模型
当前logstash线程模型是:
input threads | filter worker threads | output worker
过滤器是可选的,没有filter是这样的:
input threads | output worker
每个input运行在自身的一个线程。这可以防止繁忙的input阻塞慢的。
filter线程模型是工人模型,每个工人接收到一个事件,并应用于所有过滤器,再将其发送到output队列。在CPU可扩展性,因为许多过滤器是CPU密集型的。filter workers默认是1,在启动logstash时,可以通过-w指定。
output当前工作模式是一个线程。output接收事件的顺序是以配置文件定义的输出顺序。
output在发布它们之前可能会决定临时缓存事件。这方面elasticsearch output是一个例子,缓存事件和刷新它们使用一个单独的线程。该机制(缓存很多事件和写使用单独的线程)可以改善性能,因为可以防止logstash管道一直等待elasticsearch响应。
资源使用
logstash通常至少有3个线程,如果没有filter只有2个。一个input线程,一个filter worker线程,和一个output线程。如果看到logstash使用多个CPU,这就是原因。如果想要了解每个线程在做神马,应该看看Debugging Java Performance这篇文章。java线程有名称,可以使用jstack和top找出谁在使用这些资源。
这篇全理论,枯燥无味。
本文由主机测评网发布,不代表主机测评网立场,转载联系作者并注明出处:https://zhuji.jb51.net/yunwei/8353.html