Logstash 2.2.0 的最佳实践

Logstash 2.2.0 的最佳实践

目录

本文中实例是基于基于MAC 10.10.3操作

基础知识

  什么是 Logstash?为什么要用 Logstash?怎么用 Logstash?

  本章正是来回答这个问题,或许不完整,但是足够讲述一些基础概念。跟着我们安装章节一步步来,你就可以成功的运行起来自己的第一个 logstash 了。

  我可能不会立刻来展示 logstash 配置细节或者运用场景。我认为基础原理和语法的介绍应该更加重要,这些知识未来对你的帮助绝对更大!

  所以,认真阅读他们吧!

介绍

历史沿革
  Logstash 项目诞生于 2009 年 8 月 2 日。其作者是世界著名的运维工程师乔丹西塞(JordanSissel),乔丹西塞当时是著名虚拟主机托管商 DreamHost 的员工,还发布过非常棒的软件打包工具 fpm,并主办着一年一度的 sysadmin advent calendar(advent calendar 文化源自基督教氛围浓厚的 Perl 社区,在每年圣诞来临的 12 月举办,从 12 月 1 日起至 12 月 24 日止,每天发布一篇小短文介绍主题相关技术)。

小贴士:Logstash 动手很早,对比一下,scribed 诞生于 2008 年,flume 诞生于 2010 年,Graylog2 诞生于 2010 年,Fluentd 诞生于 2011 年。

  scribed 在 2011 年进入半死不活的状态,大大激发了其他各种开源日志收集处理框架的蓬勃发展,Logstash 也从 2011 年开始进入 commit 密集期并延续至今。

  作为一个系出名门的产品,Logstash 的身影多次出现在 Sysadmin Weekly 上,它和它的小伙伴们 Elasticsearch、Kibana 直接成为了和商业产品 Splunk 做比较的开源项目(乔丹西塞曾经在博客上承认设计想法来自 AWS 平台上最大的第三方日志服务商 Loggy,而 Loggy 两位创始人都曾是 Splunk 员工)。

  2013 年,Logstash 被 Elasticsearch 公司收购,ELK stack 正式成为官方用语(虽然还没正式命名)。Elasticsearch 本身 也是近两年最受关注的大数据项目之一,三次融资已经超过一亿美元。在 Elasticsearch 开发人员的共同努力下,Logstash 的发布机制,插件架构也愈发科学和合理。

小贴士
- elasticsearch 项目开始于 2010 年,其实比 logstash 还晚;
- 目前我们看到的 angularjs 版本 kibana 其实原名叫 elasticsearch-dashboard,kibana 原先是 RoR 框架的另一个项目,但作者是同一个人,换句话说,kibana 比 logstash 还早就进了 elasticsearch 名下。

社区文化
  日志收集处理框架这么多,像 scribe 是 facebook 出品,flume 是 apache 基金会项目,都算声名赫赫。但 logstash 因乔丹西塞的个人性格,形成了一套独特的社区文化。每一个在 google groups 的 logstash-users 组里问答的人都会看到这么一句话:

  Remember: if a new user has a bad time, it's a bug in logstash.

  所以,logstash 是一个开放的,极其互助和友好的大家庭。有任何问题,尽管在 github issue,Google groups,Freenode#logstash channel 上发问就好!

安装

下载
  Logstash 的各版本以及下载地址在这里:https://www.elastic.co/downloads/logstash

  • 源代码方式

    wget https://download.elastic.co/logstash/logstash/logstash-2.2.2.tar.gz

  • Debian 平台

    wget https://download.elastic.co/logstash/logstash/packages/debian/logstash_2.2.2-1_all.deb

  • Redhat 平台

    wget https://download.elastic.co/logstash/logstash/packages/centos/logstash-2.2.2-1.noarch.rpm

安装
  上面这些包,你可能更偏向使用 rpm,dpkg 等软件包管理工具来安装 Logstash,开发者在软件包里预定义了一些依赖。比如,logstash-1.4.2-1_2c0f5a.narch 就依赖于 jre 包。

  另外,软件包里还包含有一些很有用的脚本程序,比如 /etc/init.d/logstash。

  如果你必须得在一些很老的操作系统上运行 Logstash,那你只能用源代码包部署了,记住要自己提前安装好 Java:

yum install openjdk-jre
export JAVA_HOME=/usr/java
tar zxvf logstash-1.4.2.tar.gz

最佳实践
  但是真正的建议是:如果可以,请用 Elasticsearch 官方仓库来直接安装 Logstash!

  Debian 平台

wget -O - http://packages.elasticsearch.org/GPG-KEY-elasticsearch | apt-key add -
cat >> /etc/apt/sources.list <<EOF
deb http://packages.elasticsearch.org/logstash/1.4/debian stable main
EOF
apt-get update
apt-get install logstash

  Redhat 平台

rpm --import http://packages.elasticsearch.org/GPG-KEY-elasticsearch
cat > /etc/yum.repos.d/logstash.repo <EOF
[logstash-1.4]
name=logstash repository for 1.4.x packages
baseurl=http://packages.elasticsearch.org/logstash/1.4/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1
EOF
yum clean all
yum install logstash

Hello World

  和绝大多数 IT 技术介绍一样,我们以一个输出 "hello world" 的形式开始我们的 logstash 学习。

运行
  在终端中,像下面这样运行命令来启动 Logstash 进程,然后你会发现终端在等待你的输入。没问题,敲入 Hello World,回车,然后看看会返回什么结果!

➜ /Users/lion/tar/logstash-2.2.2 >bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
Settings: Default pipeline workers: 8
Logstash startup completed
Hello World
{
       "message" => "Hello World",
      "@version" => "1",
    "@timestamp" => "2016-03-28T06:38:13.269Z",
          "host" => "liondeMacBook-Pro.local"
}

解释
  每位系统管理员都肯定写过很多类似这样的命令:cat randdata | awk '{print $2}' | sort | uniq -c | tee sortdata。这个管道符 | 可以算是 Linux 世界最伟大的发明之一(另一个是“一切皆文件”)。

  Logstash 就像管道符一样!

  你输入(就像命令行的 cat )数据,然后处理过滤(就像 awk 或者 uniq 之类)数据,最后输出(就像 tee )到其他地方。

  当然实际上,Logstash 是用不同的线程来实现这些的。

  数据在线程之间以 事件 的形式流传。不要叫行,因为 logstash 可以处理多行事件。

  Logstash 会给事件添加一些额外信息。最重要的就是 @timestamp,用来标记事件的发生时间。因为这个字段涉及到 Logstash 的内部流转,所以必须是一个 joda 对象,如果你尝试自己给一个字符串字段重命名为 @timestamp 的话,Logstash 会直接报错。所以,请使用 filters/date 插件 来管理这个特殊字段

  此外,大多数时候,还可以见到另外几个:

  • host 标记事件发生在哪里。
  • type 标记事件的唯一类型。
  • tags 标记事件的某方面属性。这是一个数组,一个事件可以有多个标签。

  你可以随意给事件添加字段或者从事件里删除字段。事实上事件就是一个 Ruby 对象,或者更简单的理解为就是一个哈希也行。

小贴士
  每个 logstash 过滤插件,都会有四个方法叫 add_tag, remove_tag, add_field 和 remove_field。它们在插件过滤匹配成功时生效。

长期运行

  完成上一节的初次运行后,你肯定会发现一点:一旦你按下 Ctrl+C,停下标准输入输出,logstash 进程也就随之停止了。作为一个肯定要长期运行的程序,应该怎么处理呢?

  本章节问题对于一个运维来说应该属于基础知识,鉴于 ELK 用户很多其实不是运维,添加这段内容。

  办法有很多种,下面介绍四种最常用的办法:

标准的 service 方式
  采用 RPM、DEB 发行包安装的读者,推荐采用这种方式。发行包内,都自带有 sysV 或者 systemd 风格的启动程序/配置,你只需要直接使用即可。

  以 RPM 为例,/etc/init.d/logstash 脚本中,会加载 /etc/init.d/functions 库文件,利用其中的 daemon 函数,将 logstash 进程作为后台程序运行。

  所以,你只需把自己写好的配置文件,统一放在 /etc/logstash/ 目录下(注意目录下所有配置文件都应该是 .conf 结尾,且不能有其他文本文件存在。因为 logstash agent 启动的时候是读取全文件夹的),然后运行 service logstash start 命令即可。

最基础的 nohup 方式
  这是最简单的方式,也是 linux 新手们很容易搞混淆的一个经典问题:

command
command > /dev/null
command > /dev/null 2>&1
command &
command > /dev/null &
command > /dev/null 2>&1 &
command &> /dev/null
nohup command &> /dev/null

  请回答以上命令的异同……

  具体不一一解释了。直接说答案,想要维持一个长期后台运行的 logstash,你需要同时在命令前面加 nohup,后面加 &。

最推荐的 daemontools 方式
  不管是 nohup 还是 screen,都不是可以很方便管理的方式,在运维管理一个 ELK 集群的时候,必须寻找一种尽可能简洁的办法。所以,对于需要长期后台运行的大量程序(注意大量,如果就一个进程,还是学习一下怎么写 init 脚本吧),推荐大家使用一款 daemontools 工具。

  daemontools 是一个软件名称,不过配置略复杂。所以这里我其实是用其名称来指代整个同类产品,包括但不限于 python 实现的 supervisord,perl 实现的 ubic,ruby 实现的 god 等。

  以 supervisord 为例,因为这个出来的比较早,可以直接通过 EPEL 仓库安装。

yum -y install supervisord --enablerepo=epel

  在 /etc/supervisord.conf 配置文件里添加内容,定义你要启动的程序:

environment=LS_HEAP_SIZE=5000m
directory=/opt/logstash
command=/opt/logstash/bin/logstash -f /etc/logstash/pro1.conf --pluginpath /opt/logstash/plugins/ -w 10 -l /var/log/logstash/pro1.log
[program:elkpro_2]
environment=LS_HEAP_SIZE=5000m
directory=/opt/logstash
command=/opt/logstash/bin/logstash -f /etc/logstash/pro2.conf --pluginpath /opt/logstash/plugins/ -w 10 -l /var/log/logstash/pro2.log

  然后启动 service supervisord start 即可。

  logstash 会以 supervisord 子进程的身份运行,你还可以使用 supervisorctl 命令,单独控制一系列 logstash 子进程中某一个进程的启停操作:

supervisorctl stop elkpro_2

配置语法

  Logstash 社区通常习惯用 shipper,broker 和 indexer 来描述数据流中不同进程各自的角色。

  不过我见过很多运用场景里都没有用 logstash 作为 shipper,或者说没有用 elasticsearch 作为数据存储也就是说也没有 indexer。所以,我们其实不需要这些概念。只需要学好怎么使用和配置 logstash 进程,然后把它运用到你的日志管理架构中最合适它的位置就够了。

语法
  Logstash 设计了自己的 DSL —— 有点像 Puppet 的 DSL,或许因为都是用 Ruby 语言写的吧 —— 包括有区域,注释,数据类型(布尔值,字符串,数值,数组,哈希),条件判断,字段引用等。

区段(section)
  Logstash 用 {} 来定义区域。区域内可以包括插件区域定义,你可以在一个区域内定义多个插件。插件区域内则可以定义键值对设置。示例如下:

input {
    stdin {}
    syslog {}
}

数据类型
  Logstash 支持少量的数据值类型:

  • bool

    debug => true

  • string

    host => "hostname"

  • number

    port => 514

  • array

    match => ["datetime", "UNIX", "ISO8601"]

  • hash

    options => {
    key1 => "value1",
    key2 => "value2"
    }

注意:如果你用的版本低于 1.2.0,哈希的语法跟数组是一样的,像下面这样写:

match => [ "field1", "pattern1", "field2", "pattern2" ]

字段引用(field reference)
  字段是 Logstash::Event 对象的属性。我们之前提过事件就像一个哈希一样,所以你可以想象字段就像一个键值对。

小贴士:我们叫它字段,因为 Elasticsearch 里是这么叫的。

  如果你想在 Logstash 配置中使用字段的值,只需要把字段的名字写在中括号 [] 里就行了,这就叫字段引用

  对于 嵌套字段(也就是多维哈希表,或者叫哈希的哈希),每层的字段名都写在 [] 里就可以了。比如,你可以从 geoip 里这样获取 longitude 值(是的,这是个笨办法,实际上有单独的字段专门存这个数据的):

[geoip][location][0]

小贴士:logstash 的数组也支持倒序下标,即[geoip][location][-1] 可以获取数组最后一个元素的值。

  Logstash 还支持变量内插,在字符串里使用字段引用的方法是这样:

"the longitude is %{[geoip][location][0]}"

条件判断(condition)
  Logstash从 1.3.0 版开始支持条件判断和表达式。

  表达式支持下面这些操作符:

  • equality, etc: ==, !=, <, >, <=, >=
  • regexp: =~, !~
  • inclusion: in, not in
  • boolean: and, or, nand, xor
  • unary: !()

  通常来说,你都会在表达式里用到字段引用。比如:

if "_grokparsefailure" not in [tags] {
} else if [status] !~ /^2\d\d/ and [url] == "/noc.gif" {
} else {
}

命令行参数
  Logstash 提供了一个 bash 脚本叫 logstash 方便快速运行。它支持一下参数:

  • -e

  意即执行。我们在 "Hello World" 的时候已经用过这个参数了。事实上你可以不写任何具体配置,直接运行 bin/logstash -e '' 达到相同效果。这个参数的默认值是下面这样:

input {
    stdin { }
}
output {
    stdout { }
}
  • --config 或 -f

  意即文件。真实运用中,我们会写很长的配置,甚至可能超过 bash 所能支持的 1024 个字符长度。所以我们必把配置固化到文件里,然后通过 bin/logstash -f agent.conf 这样的形式来运行。

  此外,logstash 还提供一个方便我们规划和书写配置的小功能。你可以直接用 bin/logstash -f /etc/logstash.d/ 来运行。logstash 会自动读取 /etc/logstash.d/ 目录下所有的文本文件,然后在自己内存里拼接成一个完整的大配置文件,再去执行。

  • --configtest 或 -t

  意即测试。用来测试 Logstash 读取到的配置文件语法是否能正常解析。Logstash 配置语法是用 grammar.treetop 定义的。尤其是使用了上一条提到的读取目录方式的读者,尤其要提前测试。

  • --log 或 -l

  意即日志。Logstash 默认输出日志到标准错误。生产环境下你可以通过 bin/logstash -l logs/logstash.log 命令来统一存储日志。

  • --filterworkers 或 -w

  意即工作线程。Logstash 会运行多个线程。你可以用 bin/logstash -w 5 这样的方式强制 Logstash 为过滤插件运行 5 个线程。

注意:Logstash目前还不支持输入插件的多线程。而输出插件的多线程需要在配置内部设置,这个命令行参数只是用来设置过滤插件的!

  提示:Logstash 目前不支持对过滤器线程的监测管理。如果 filterworker 挂掉,Logstash 会处于一个无 filter 的僵死状态。这种情况在使用 filter/ruby 自己写代码时非常需要注意,很容易碰上 NoMethodError: undefined method '' for nil:NilClass 错误。需要妥善处理,提前判断。*

  • --pluginpath 或 -P

  可以写自己的插件,然后用 bin/logstash --pluginpath /path/to/own/plugins 加载它们。

  • --verbose

  输出一定的调试日志。

  小贴士:如果你使用的 Logstash 版本低于 1.3.0,你只能用 bin/logstash -v 来代替。

  • --debug

  输出更多的调试日志。

  小贴士:如果你使用的 Logstash 版本低于 1.3.0,你只能用 bin/logstash -vv 来代替。

输入插件(Input)

  在 "Hello World" 示例中,我们已经见到并介绍了 logstash 的运行流程和配置的基础语法。从这章开始,我们就要逐一介绍 logstash 流程中比较常用的一些插件,并在介绍中针对其主要适用的场景,推荐的配置,作一些说明。

  限于篇幅,接下来内容中,配置示例不一定能贴完整。请记住一个原则:Logstash 配置一定要有一个 input 和一个 output。在演示过程中,如果没有写明 input,默认就会使用 "hello world" 里我们已经演示过的 input/stdin ,同理,没有写明的 output 就是 output/stdout。

  以上请读者自明。

标准输入(Stdin)

  我们已经见过好几个示例使用 stdin 了。这也应该是 logstash 里最简单和基础的插件了。

  所以,在这段中,我们可以学到一些未来每个插件都会有的一些方法。

配置示例

input {
    stdin {
        add_field => {"key" => "value"}
        codec => "plain"
        tags => ["add"]
        type => "std"
    }
}

运行结果
  用上面的新 stdin 设置重新运行一次最开始的 hello world 示例。我建议大家把整段配置都写入一个文本文件,然后运行命令:bin/logstash -f stdin.conf。输入 "hello world" 并回车后,你会在终端看到如下输出:

➜ /Users/lion/tar/logstash-2.2.2 >bin/logstash -e 'input {
    stdin {
        add_field => {"key" => "value"}
        codec => "plain"
        tags => ["add"]
        type => "std"
    }
}'
Settings: Default pipeline workers: 8
Logstash startup completed
hello world
{
       "message" => "hello world",
      "@version" => "1",
    "@timestamp" => "2016-03-28T11:39:30.667Z",
          "type" => "std",
           "key" => "value",
          "tags" => [
        [0] "add"
    ],
          "host" => "liondeMacBook-Pro.local"
}

解释
  type 和 tags 是 logstash 事件中两个特殊的字段。通常来说我们会在输入区段中通过 type 来标记事件类型 —— 我们肯定是提前能知道这个事件属于什么类型的。而 tags 则是在数据处理过程中,由具体的插件来添加或者删除的。

  最常见的用法是像下面这样:

input {
    stdin {
        type => "web"
    }
}
filter {
    if [type] == "web" {
        grok {
            match => ["message", %{COMBINEDAPACHELOG}]
        }
    }
}
output {
    if "_grokparsefailure" in [tags] {
        nagios_nsca {
            nagios_status => "1"
        }
    } else {
        elasticsearch {
        }
    }
}

  看起来蛮复杂的,对吧?

  继续学习,你也可以写出来的。

读取文件(File)

  分析网站访问日志应该是一个运维工程师最常见的工作了。所以我们先学习一下怎么用 logstash 来处理日志文件。

  Logstash 使用一个名叫 FileWatch 的 Ruby Gem 库来监听文件变化。这个库支持 glob 展开文件路径,而且会记录一个叫 .sincedb 的数据库文件来跟踪被监听的日志文件的当前读取位置。所以,不要担心 logstash 会漏过你的数据。

  sincedb 文件中记录了每个被监听的文件的 inode, major number, minor number 和 pos。

配置文化示例

input
    file {
        path => ["/var/log/*.log", "/var/log/message"]
        type => "system"
        start_position => "beginning"
    }
}

解释
  有一些比较有用的配置项,可以用来指定 FileWatch 库的行为:

  • discover_interval

  logstash 每隔多久去检查一次被监听的 path 下是否有新文件。默认值是 15 秒。

  • exclude

  不想被监听的文件可以排除出去,这里跟 path 一样支持 glob 展开。

  • sincedb_path

  如果你不想用默认的 $HOME/.sincedb(Windows 平台上在 C:\Windows\System32\config\systemprofile.sincedb),可以通过这个配置定义 sincedb 文件到其他位置。

  • sincedb_write_interval

  logstash 每隔多久写一次 sincedb 文件,默认是 15 秒。

  • stat_interval

  logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。

  • start_position

  logstash 从什么位置开始读取文件数据,默认是结束位置,也就是说 logstash 进程会以类似 tail -F 的形式运行。如果你是要导入原有数据,把这个设定改成 "beginning",logstash 进程就从头开始读取,有点类似 cat,但是读到最后一行不会终止,而是继续变成 tail -F。
  Logstash启动后,会在系统中记录一个隐藏文件,记录处理过的行号,
  当进行挂掉,重新启动后,根据该行号记录续读。
  所以start_position只会生效一次。

注意
  1. 通常你要导入原有数据进 Elasticsearch 的话,你还需要 filter/date 插件来修改默认的"@timestamp" 字段值。稍后会学习这方面的知识。
  2. FileWatch 只支持文件的绝对路径,而且会不自动递归目录。所以有需要的话,请用数组方式都写明具体哪些文件。
  3. LogStash::Inputs::File 只是在进程运行的注册阶段初始化一个 FileWatch 对象。所以它不能支持类似 fluentd 那样的 path => "/path/to/%{+yyyy/MM/dd/hh}.log" 写法。达到相同目的,你只能写成 path => "/path/to////.log"。
  4. start_position 仅在该文件从未被监听过的时候起作用。如果 sincedb 文件中已经有这个文件的 inode 记录了,那么 logstash 依然会从记录过的 pos 开始读取数据。所以重复测试的时候每回需要删除 sincedb 文件。
  5. 因为 windows 平台上没有 inode 的概念,Logstash 某些版本在 windows 平台上监听文件不是很靠谱。windows 平台上,推荐考虑使用 nxlog 作为收集端,参阅本书稍后章节。

读取网络数据(TCP)

  未来你可能会用 Redis 服务器或者其他的消息队列系统来作为 logstash broker 的角色。不过 Logstash 其实也有自己的 TCP/UDP 插件,在临时任务的时候,也算能用,尤其是测试环境。

小贴士:虽然 LogStash::Inputs::TCP 用 Ruby 的 Socket 和 OpenSSL 库实现了高级的 SSL 功能,但 Logstash 本身只能在 SizedQueue 中缓存 20 个事件。这就是我们建议在生产环境中换用其他消息队列的原因。

配置示例

input {
    tcp {
        port => 8888
        mode => "server"
        ssl_enable => false
    }
}

常见场景
  目前来看,LogStash::Inputs::TCP 最常见的用法就是配合 nc 命令导入旧数据。在启动 logstash 进程后,在另一个终端运行如下命令即可导入数据:

# nc 127.0.0.1 8888 < olddata

  这种做法比用 LogStash::Inputs::File 好,因为当 nc 命令结束,我们就知道数据导入完毕了。而用 input/file 方式,logstash 进程还会一直等待新数据输入被监听的文件,不能直接看出是否任务完成了。

生成测试数据(Generator)

  实际运行的时候这个插件是派不上用途的,但这个插件依然是非常重要的插件之一。因为每一个使用 ELK stack 的运维人员都应该清楚一个道理:数据是支持操作的唯一真理(否则你也用不着 ELK)。所以在上线之前,你一定会需要在自己的实际环境中,测试 Logstash 和 Elasticsearch 的性能状况。这时候,这个用来生成测试数据的插件就有用了!

配置示例

input {
    generator {
        count => 10
        message => '{"key1":"value1","key2":[1,2],"key3":{"subkey1":"subvalue1"}}'
        codec => json
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}

  插件的默认生成数据,message 内容是 "hello world"。你可以根据自己的实际需要这里来写其他内容。

使用方式
  做测试有两种主要方式:

  • 配合 LogStash::Outputs::Null

  inputs/generator 是无中生有,output/null 则是锯嘴葫芦。事件流转到这里直接就略过,什么操作都不做。相当于只测试 Logstash 的 pipe 和 filter 效率。测试过程非常简单:

➜ /Users/lion/tar/logstash-2.2.2 >time ./bin/logstash -f generator_null.conf
Settings: Default pipeline workers: 8
Logstash startup completed
..........Logstash shutdown completed
./bin/logstash -f generator_null.conf  20.42s user 1.18s system 285% cpu 7.561 total
  • 使用 pv 命令配合 LogStash::Outputs::Stdout 和 LogStash::Codecs::Dots

  上面的这种方式虽然想法挺好,不过有个小漏洞:logstash 是在 JVM 上运行的,有一个明显的启动时间,运行也有一段事件的预热后才算稳定运行。所以,要想更真实的反应 logstash 在长期运行时候的效率,还有另一种方法

output {
    stdout {
        codec => dots
    }
}

  LogStash::Codecs::Dots 也是一个另类的 codec 插件,他的作用是:把每个 event 都变成一个点(.)。这样,在输出的时候,就变成了一个一个的 . 在屏幕上。显然这也是一个为了测试而存在的插件。

  下面就要介绍 pv

pv命令在Mac、Debian、RedHat中默认是不存在的,需要安装命令了。

  这个命令的作用,就是作实时的标准输入、标准输出监控。我们这里就用它来监控标准输出:

➜ /Users/lion/tar/logstash-2.2.2 >./bin/logstash -f generator_dots.conf | pv -abt > /dev/null
 287 B 0:00:05 [52.7 B/s]

  可以很明显的看到在前几秒中,速度是 0 B/s,因为 JVM 还没启动起来呢。开始运行的时候,速度依然不快。慢慢增长到比较稳定的状态,这时候的才是你需要的数据。

  这里单位是 B/s,但是因为一个 event 就输出一个 .,也就是 1B。所以 12.5kiB/s 就相当于是 12.5k event/s。

  注:如果你在 CentOS 上通过 yum 安装的 pv 命令,版本较低,可能还不支持 -a 参数。单纯靠 -bt 参数看起来还是有点累的。

额外的话
  既然单独花这么一节来说测试,这里想额外谈谈一个很常见的话题: ELK 的性能怎么样?
  其实这压根就是一个不正确的提问。ELK 并不是一个软件而是一个并不耦合的套件。所以,我们需要分拆开讨论这三个软件的性能如何?怎么优化?

  LogStash 的性能,是最让新人迷惑的地方。因为 LogStash 本身并不维护队列,所以整个日志流转中任意环节的问题,都可能看起来像是 LogStash 的问题。这里,需要熟练使用本节说的测试方法,针对自己的每一段配置,都确定其性能。另一方面,就是本书之前提到过的,LogStash 给自己的线程都设置了单独的线程名称,你可以在 top -H 结果中查看具体线程的负载情况。

  Elasticsearch 的性能。这里最需要强调的是:Elasticsearch 是一个分布式系统。从来没有分布式系统要跟人比较单机处理能力的说法。所以,更需要关注的是:在确定的单机处理能力的前提下,性能是否能做到线性扩展。当然,这不意味着说提高处理能力只能靠加机器了——有效利用 mapping API 是非常重要的。不过暂时就在这里讲述了。

  Kibana 的性能。通常来说,Kibana 只是一个单页 Web 应用,只需要 nginx 发布静态文件即可,没什么性能问题。页面加载缓慢,基本上是因为 Elasticsearch 的请求响应时间本身不够快导致的。不过一定要细究的话,也能找出点 Kibana 本身性能相关的话题:因为 Kibana3 默认是连接固定的一个 ES 节点的 IP 端口的,所以这里会涉及一个浏览器的同一 IP 并发连接数的限制。其次,就是 Kibana 用的 AngularJS 使用了 Promise.then 的方式来处理 HTTP 请求响应。这是异步的。

读取 Redis 数据

  Redis 服务器是 logstash 官方推荐的 broker 选择。Broker 角色也就意味着会同时存在输入和输出俩个插件。这里我们先学习输入插件。

  LogStash::Inputs::Redis 支持三种 data_type(实际上是redis_type),不同的数据类型会导致实际采用不同的 Redis 命令操作:

  • list => BLPOP
  • channel => SUBSCRIBE
  • pattern_channel => PSUBSCRIBE

  注意到了么?这里面没有 GET 命令!

  Redis 服务器通常都是用作 NoSQL 数据库,不过 logstash 只是用来做消息队列。所以不要担心 logstash 里的 Redis 会撑爆你的内存和磁盘。

配置示例redis.conf

input {
    redis {
        data_type => "pattern_channel" #logstash redis插件工作方式
        key => "logstash-*" #监听的键值
        host => "127.0.0.1"
        port => 6379
        threads => 1 #启用线程数量
        codec => "plain"
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}

基本使用方法
  先打开终端运行启动LogStash Redis的监听命令

➜ /Users/lion/tar/logstash-2.2.2 >bin/logstash -f redis.conf
Settings: Default pipeline workers: 8
Logstash startup completed

  然后确认你设置的 host 服务器上已经运行了 redis-server 服务,打开另一个终端,输入 redis-cli 命令(先安装好 redis 软件包),在交互式提示符后面输入PUBLISH logstash-demochan "hello world":

➜ /Users/lion >redis-cli
127.0.0.1:6379> PUBLISH logstash-demochan "hello world"
(integer) 1
127.0.0.1:6379>

  你会在第一个终端里看到 logstash 进程输出类似下面这样的内容:

{
       "message" => "hello world",
      "@version" => "1",
    "@timestamp" => "2016-03-29T03:37:25.763Z"
}

  注意:这个事件里没有 host 字段!(或许这算是 bug……)

输入JSON数据
  如果你想通过 redis 的频道给 logstash 事件添加更多字段,直接向频道发布 JSON 字符串就可以了。 LogStash::Inputs::Redis 会直接把 JSON 转换成事件。

  继续在第二个终端的交互式提示符下输入如下内容:

➜ /Users/lion >redis-cli
127.0.0.1:6379> PUBLISH logstash-demochan "hello world"
(integer) 1
127.0.0.1:6379> PUBLISH logstash-chan '{"message":"hello world","@version":"1","@timestamp":"2014-08-08T16:34:21.865Z","host":"raochenlindeMacBook-Air.local","key1":"value1"}'
(integer) 1
127.0.0.1:6379>

  你会看到第一个终端里的 logstash 进程随即也返回新的内容,如下所示:

{
       "message" => "{\"message\":\"hello world\",\"@version\":\"1\",\"@timestamp\":\"2014-08-08T16:34:21.865Z\",\"host\":\"raochenlindeMacBook-Air.local\",\"key1\":\"value1\"}",
      "@version" => "1",
    "@timestamp" => "2016-03-29T03:40:21.749Z"
}

  看,新的字段出现了!现在,你可以要求开发工程师直接向你的 redis 频道发送信息好了,一切自动搞定。

小贴士:这里我们建议的是使用 pattern_channel 作为输入插件的 data_type 设置值。因为实际使用中,你的 redis 频道可能有很多不同的 keys,一般命名成 logstash-chan-%{type} 这样的形式。这时候 pattern_channel 类型就可以帮助你一次订阅全部 logstash 相关频道!

扩展方式
  如上段"小贴士"提到的,之前两个使用场景采用了同样的配置,即数据类型为频道发布订阅方式。这种方式在需要扩展 logstash 成多节点集群的时候,会出现一个问题:通过频道发布的一条信息,会被所有订阅了该频道的 logstash 进程同时接收到,然后输出重复内容!

  你可以尝试再做一次上面的实验,这次在两个终端同时启动 logstash -f redis.conf 进程,结果会是两个终端都输出消息。

  这种时候,就需要用 list 类型。在这种类型下,数据输入到 redis 服务器上暂存,logstash 则连上 redis 服务器取走 (BLPOP 命令,所以只要 logstash 不堵塞,redis 服务器上也不会有数据堆积占用空间)数据。

配置示例(redis-list.conf)

input {
    redis {
        batch_count => 1
        data_type => "list" #logstash redis插件工作方式
        key => "logstash-list" #监听的键值
        host => "127.0.0.1"
        port => 6379
        threads => 5 #启用线程数量
        codec => "plain"
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}

使用方法
  这次我们同时在两个终端运行 logstash redis 进程。

➜ /Users/lion/tar/logstash-2.2.2 >bin/logstash -f redis-list.conf

  然后在第三个终端里启动 redis-cli 命令交互:

➜ /Users/lion >redis-cli
127.0.0.1:6379> RPUSH logstash-list "hello world"
(integer) 1
127.0.0.1:6379>

  这时候你可以看到,只有一个终端输出了结果。

  连续 RPUSH 几次,可以看到两个终端近乎各自输出一半条目

小贴士:RPUSH 支持 batch 方式,修改 logstash 配置中的 batch_count 值,作为示例这里只改到 2,实际运用中可以更大(事实上 LogStash::Outputs::Redis 对应这点的 batch_event 配置默认值就是 50)。

  重启 logstash 进程后,redis-cli 命令中改成如下发送:

➜ /Users/lion >redis-cli
127.0.0.1:6379> RPUSH logstash-list "hello world" "hello world" "hello world" "hello world" "hello world" "hello world"
(integer) 6
127.0.0.1:6379>

  可以看到,两个终端也各自输出一部分结果。而你只用了一次 RPUSH 命令。

编码插件(Codec)

  Codec 是 logstash 从 1.3.0 版开始新引入的概念(Codec 来自 Coder/decoder 两个单词的首字母缩写)。

  在此之前,logstash 只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以在输入 期处理不同类型的数据,这全是因为有了 codec 设置。

  所以,这里需要纠正之前的一个概念。Logstash 不只是一个input | filter | output 的数据流,而是一个 input | decode | filter | encode | output 的数据流!codec 就是用来 decode、encode 事件的。

  codec 的引入,使得 logstash 可以更好更方便的与其他有自定义数据格式的运维产品共存,比如 graphite、fluent、netflow、collectd,以及使用 msgpack、json、edn 等通用数据格式的其他产品等。

  事实上,我们在第一个 "hello world" 用例中就已经用过 codec 了 —— rubydebug 就是一种 codec!虽然它一般只会用在 stdout 插件中,作为配置测试或者调试的工具。

采用 JSON 编码

  在早期的版本中,有一种降低 logstash 过滤器的 CPU 负载消耗的做法盛行于社区(在当时的 cookbook 上有专门的一节介绍):直接输入预定义好的 JSON 数据,这样就可以省略掉 filter/grok 配置!

  这个建议依然有效,不过在当前版本中需要稍微做一点配置变动 —— 因为现在有专门的 codec 设置。

配置示例
  这里的示例,我会向某一个Log文件中打印出json的格式进去,json格式如下:

{"type":"track","event":"index","time":1459231333892,"distinct_id":"1","properties":{"$lib":"php","$lib_version":"1.2.0","$ip":"127.0.0.1","isGuest":false,"source":"PC","Browser":"Chrome","Browser_OS":"MAC","URL_Referer":"\u6ca1\u6709\u6765\u6e90"}}

  Logstash配置文件格式(log.conf):

input {
    file {
        path => "/Users/lion/logs/bigdata.log.20160329"
        codec => "json"
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}

运行结果
  先在一个终端中打开Logstash进行监听:

➜ /Users/lion/tar/logstash-2.2.2 >bin/logstash -f log.conf
Settings: Default pipeline workers: 8
Logstash startup completed

  这时在另一个终端中向日志文件中加入一段 json 串,然后你会看到 logstash 进程输出类似下面这样的内容:

➜ /Users/lion/tar/logstash-2.2.2 >bin/logstash -f nginx.conf
Settings: Default pipeline workers: 8
Logstash startup completed
{
           "type" => "track",
          "event" => "index",
           "time" => 1459231622002,
    "distinct_id" => "1",
     "properties" => {
                "$lib" => "php",
        "$lib_version" => "1.2.0",
                 "$ip" => "127.0.0.1",
             "isGuest" => false,
              "source" => "PC",
             "Browser" => "Chrome",
          "Browser_OS" => "MAC",
         "URL_Referer" => "tsy.com"
    },
       "@version" => "1",
     "@timestamp" => "2016-03-29T06:07:02.235Z",
           "path" => "/Users/lion/logs/bigdata.log.20160329",
           "host" => "liondeMacBook-Pro.local"
}

合并多行数据(Multiline)

  有些时候,应用程序调试日志会包含非常丰富的内容,为一个事件打印出很多行内容。这种日志通常都很难通过命令行解析的方式做分析。

  而 logstash 正为此准备好了 codec/multiline 插件!

小贴士:multiline 插件也可以用于其他类似的堆栈式信息,比如 linux 的内核日志。

配置示例(multiline.conf)

input {
    stdin {
        codec => multiline {
            pattern => "^2015"
            negate => true
            what => "previous"
        }
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}

运行结果
  运行 logstash 进程,然后在等待输入的终端中输入如下几行数据,然后会看到输出的结果 :

➜ /Users/lion/tar/logstash-2.2.2 >./bin/logstash -f multiline.conf
Settings: Default pipeline workers: 8
Logstash startup completed
aaa
bbb
2015
{
    "@timestamp" => "2016-03-29T09:24:00.185Z",
       "message" => "aaa\nbbb",
      "@version" => "1",
          "tags" => [
        [0] "multiline"
    ],
          "host" => "liondeMacBook-Pro.local"
}

  在 "message" 字段里存储了两行数据!

小贴士:你可能注意到输出的事件中都没有最后的"2015"字符串。这是因为logstash 还得等下一行数据直到匹配成功后才会输出这个事件。

Log4J 的另一种方案
  说到应用程序日志,log4j 肯定是第一个被大家想到的。使用 codec/multiline 也确实是一个办法。

  不过,如果你本事就是开发人员,或者可以推动程序修改变更的话,logstash 还提供了另一种处理 log4j 的方式:input/log4j。与 codec/multiline 不同,这个插件是直接调用了 org.apache.log4j.spi.LoggingEvent 处理 TCP 端口接收的数据。

过滤器插件(Filter)

  丰富的过滤器插件的存在是 logstash 威力如此强大的重要因素。名为过滤器,其实提供的不单单是过滤的功能。在本章我们就会重点介绍几个插件,它们扩展了进入过滤器的原始数据,进行复杂的逻辑处理,甚至可以无中生有的添加新的 logstash 事件到后续的流程中去!

Grok 正则捕获

  Grok 是 Logstash 最重要的插件。你可以在 grok 里预定义好命名正则表达式,在稍后(grok参数或者其他正则表达式里)引用它。

正则表达式语法(grok.conf)
  现在给我们的配置文件添加第一个过滤器区段配置。配置要添加在输入和输出区段之间(logstash 执行区段的时候并不依赖于次序,不过为了自己看得方便,还是按次序书写吧):

input {stdin{}}
filter {
    grok {
        match => {
            "message" => "\s+(?<request_time>\d+(?:\.\d+)?)\s+"
        }
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}

  运行 logstash 进程然后输入 "begin 123.456 end",你会看到类似下面这样的输出:

➜ /Users/lion/tar/logstash-2.2.2 >./bin/logstash -f grok.conf
Settings: Default pipeline workers: 8
Logstash startup completed
begin 123.456 end
{
         "message" => "begin 123.456 end",
        "@version" => "1",
      "@timestamp" => "2016-03-29T06:25:13.213Z",
            "host" => "liondeMacBook-Pro.local",
    "request_time" => "123.456"
}

  漂亮!不过数据类型好像不太满意……request_time 应该是数值而不是字符串。

  我们已经提过稍后会学习用 LogStash::Filters::Mutate 来转换字段值类型,不过在 grok 里,其实有自己的魔法来实现这个功能!

Grok 表达式语法
  Grok 支持把预定义的 grok 表达式 写入到文件中,官方提供的预定义 grok 表达式见:https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns。

  下面是从官方文件中摘抄的最简单但是足够说明用法的示例:

USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}

  第一行,用普通的正则表达式来定义一个 grok 表达式;第二行,通过打印赋值格式,用前面定义好的 grok 表达式来定义另一个 grok 表达式。

  grok 表达式的打印复制格式的完整语法是下面这样的:

%{PATTERN_NAME:capture_name:data_type}

小贴士:data_type 目前只支持两个值:int 和 float。

  所以我们可以改进我们的配置成下面这样:

input {stdin{}}
filter {
    grok {
        match => {
            "message" => "%{WORD:s1} %{NUMBER:request_time:float} %{WORD}"
        }
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}

  重新运行进程然后可以得到如下结果:

➜ /Users/lion/tar/logstash-2.2.2 >./bin/logstash -f grok.conf
Settings: Default pipeline workers: 8
Logstash startup completed
begin 123.456 end
{
         "message" => "begin 123.456 end",
        "@version" => "1",
      "@timestamp" => "2016-03-29T06:28:52.589Z",
            "host" => "liondeMacBook-Pro.local",
              "s1" => "begin",
    "request_time" => 123.456
}

  这次 request_time 变成数值类型了。

最佳实践
  实际运用中,我们需要处理各种各样的日志文件,如果你都是在配置文件里各自写一行自己的表达式,就完全不可管理了。所以,我们建议是把所有的 grok 表达式统一写入到一个地方。然后用 filter/grok 的 patterns_dir 选项来指明。

  如果你把 "message" 里所有的信息都 grok 到不同的字段了,数据实质上就相当于是重复存储了两份。所以你可以用 remove_field 参数来删除掉 message 字段,或者用 overwrite 参数来重写默认的 message 字段,只保留最重要的部分。

  重写参数的示例如下:

filter {
    grok {
        patterns_dir => "/path/to/your/own/patterns"
        match => {
            "message" => "%{SYSLOGBASE} %{DATA:message}"
        }
        overwrite => ["message"]
    }
}

多项选择
  有时候我们会碰上一个日志有多种可能格式的情况。这时候要写成单一正则就比较困难,或者全用 | 隔开又比较丑陋。这时候,logstash 的语法提供给我们一个有趣的解决方式。

  文档中,都说明 logstash/filters/grok 插件的 match 参数应该接受的是一个 Hash 值。但是因为早期的 logstash 语法中 Hash 值也是用 [] 这种方式书写的,所以其实现在传递 Array 值给 match 参数也完全没问题。所以,我们这里其实可以传递多个正则来匹配同一个字段:

match => [
    "message", "(?<request_time>\d+(?:\.\d+)?)",
    "message", "%{SYSLOGBASE} %{DATA:message}",
    "message", "(?m)%{WORD}"
]

  logstash 会按照这个定义次序依次尝试匹配,到匹配成功为止。虽说效果跟用 | 分割写个大大的正则是一样的,但是可阅读性好了很多。

  最后也是最关键的,我强烈建议每个人都要使用 [Grok Debugger][Grok Debugger] 来调试自己的 grok 表达式
  [Grok Debugger]: http://grokdebug.herokuapp.com/?# "Grok Debugger"

时间处理(Date)

  之前章节已经提过,filters/date 插件可以用来转换你的日志记录中的时间字符串,变成 LogStash::Timestamp 对象,然后转存到 @timestamp 字段里。

  注意:因为在稍后的 outputs/elasticsearch 中常用的 %{+YYYY.MM.dd} 这种写法必须读取 @timestamp 数据,所以一定不要直接删掉这个字段保留自己的字段,而是应该用 filters/date 转换后删除自己的字段!

  这在导入旧数据的时候固然非常有用,而在实时数据处理的时候同样有效,因为一般情况下数据流程中我们都会有缓冲区,导致最终的实际处理时间跟事件产生时间略有偏差。

配置示例
  filters/date 插件支持五种时间格式:

ISO8601

  类似 "2011-04-19T03:44:01.103Z" 这样的格式。具体Z后面可以有 "08:00"也可以没有,".103"这个也可以没有。常用场景里来说,Nginx 的 log_format 配置里就可以使用 $time_iso8601 变量来记录请求时间成这种格式。

UNIX

  UNIX 时间戳格式,记录的是从 1970 年起始至今的总秒数。Squid 的默认日志格式中就使用了这种格式。

UNIX_MS

  这个时间戳则是从 1970 年起始至今的总毫秒数。据我所知,JavaScript 里经常使用这个时间格式。

TAI64N

  TAI64N 格式比较少见,是这个样子的:@4000000052f88ea32489532c。我目前只知道常见应用中, qmail 会用这个格式。

Joda-Time 库

  Logstash 内部使用了 Java 的 Joda 时间库来作时间处理。所以我们可以使用 Joda 库所支持的时间格式来作具体定义。Joda 时间格式定义见下表:

时间格式

Symbol Meaning Presentation Examples
G era text AD
C century of era (>=0) number 20
Y year of era (>=0) year 1996
x weekyear year 1996
w week of weekyear number 27
e day of week number 2
E day of week text Tuesday; Tue
y year year 1996
D day of year number 189
M month of year month July; Jul; 07
d day of month number 10
a halfday of day text PM
K hour of halfday (0~11) number 0
h clockhour of halfday (1~12) number 12
H hour of day (0~23) number 0
k clockhour of day (1~24) number 24
m minute of hour number 30
s second of minute number 55
S fraction of second number 978
z time zone text Pacific Standard Time; PST
Z time zone offset/id zone -0800; -08:00; America/Los_Angeles
' escape for text delimiter
'' single quote literal '

  下面我们写一个 Joda 时间格式的配置作为示例(date.conf):

input {stdin{}}
filter {
    grok {
        match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
        match => ["logdate", "YYYY-MM-dd HH:mm:ss"]
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}

  在控制台输入启动命令后,输入『2016-03-29 17:03:00』,然后可以看到结果 :

➜ /Users/lion/tar/logstash-2.2.2 >./bin/logstash -f date.conf
Settings: Default pipeline workers: 8
Logstash startup completed
2016-03-29 17:03:00
{
       "message" => "2016-03-29 17:03:00",
      "@version" => "1",
    "@timestamp" => "2016-03-29T09:03:00.000Z",
          "host" => "liondeMacBook-Pro.local",
       "logdate" => "2016-03-29 17:03:00"
}

数据修改(Mutate)

  filters/mutate 插件是 Logstash 另一个重要插件。它提供了丰富的基础类型数据处理能力。包括类型转换,字符串处理和字段处理等。

类型转换
  类型转换是 filters/mutate 插件最初诞生时的唯一功能。其应用场景在之前 Codec/JSON 小节已经提到。

  可以设置的转换类型包括:"integer","float" 和 "string"。示例(mutate1.conf)如下:

input {stdin{}}
filter {
    grok {
        match => {
            "message" => "\s+(?<request_time>\d+(?:\.\d+)?)\s+"
        }
    }
    mutate {
        convert => ["request_time", "float"]
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}

运行结果
  在控制台,启动Logstash以后,输入『begin 123.456 end』,可以看到以下结果 :

➜ /Users/lion/tar/logstash-2.2.2 >./bin/logstash -f mutate1.conf
Settings: Default pipeline workers: 8
Logstash startup completed
begin 123.456 end
{
         "message" => "begin 123.456 end",
        "@version" => "1",
      "@timestamp" => "2016-03-29T09:58:22.538Z",
            "host" => "liondeMacBook-Pro.local",
    "request_time" => 123.456
}

  注意:mutate 除了转换简单的字符值,还支持对数组类型的字段进行转换,即将 ["1","2"] 转换成 [1,2]。但不支持对哈希类型的字段做类似处理。有这方面需求的可以采用稍后讲述的 filters/ruby 插件完成。

字符串处理

  • gsub

  仅对字符串类型字段有效,将问号替换成下划线(gsub.conf)

input {stdin{}}
filter {
    mutate {
        gsub => ["message", "[\\?]", "_"]
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}

  打开控制台运行程序,可以看到以下结果 :

➜ /Users/lion/tar/logstash-2.2.2 >./bin/logstash -f gsub.conf
a.com?#12Settings: Default pipeline workers: 8
Logstash startup completed
a.com?123
{
       "message" => "a.com_123",
      "@version" => "1",
    "@timestamp" => "2016-03-29T10:58:25.270Z",
          "host" => "liondeMacBook-Pro.local"
}
  • split

  配置文件为split.conf

input {stdin{}}
filter {
    mutate {
        split => ["message", "|"]
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}

  打开控制台,随意输入一串以|分割的字符,比如 "a|b|c=1|b=*2",可以看到如下输出:

➜ /Users/lion/tar/logstash-2.2.2 >./bin/logstash -f split.conf
Settings: Default pipeline workers: 8
Logstash startup completed
a|b|c=1|b=*2
{
       "message" => [
        [0] "a",
        [1] "b",
        [2] "c=1",
        [3] "b=*2"
    ],
      "@version" => "1",
    "@timestamp" => "2016-03-29T11:01:49.100Z",
          "host" => "liondeMacBook-Pro.local"
}
  • join
      配置文件为split.conf
input {stdin{}}
filter {
    mutate {
        split => ["message", "|"]
    }
    mutate {
        join => ["message", ","]
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}

  打开控制台,继续输入刚才的字符 "a|b|c=1|b=*2",可以看到之前已经用 split 割切的基础再 join 回去的结果:

➜ /Users/lion/tar/logstash-2.2.2 >./bin/logstash -f join.conf
Settings: Default pipeline workers: 8
Logstash startup completed
a|b|c=1|b=*2
{
       "message" => "a,b,c=1,b=*2",
      "@version" => "1",
    "@timestamp" => "2016-03-29T11:13:17.594Z",
          "host" => "liondeMacBook-Pro.local"
}
  • merge

  合并两个数组或者哈希字段。依然在之前 split 的基础上继续,配置文件为merge.conf

input {stdin{}}
filter {
    mutate {
        split => ["message", "|"]
    }
    mutate {
        merge => ["message", "message"]
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}

  打开控制台,继续输入刚才的字符 "a|b|c=1|b=*2",可以看到结果:

➜ /Users/lion/tar/logstash-2.2.2 >./bin/logstash -f merge.conf
Settings: Default pipeline workers: 8
Logstash startup completed
a|b|c=1|b=*2
{
       "message" => [
        [0] "a",
        [1] "b",
        [2] "c=1",
        [3] "b=*2",
        [4] "a",
        [5] "b",
        [6] "c=1",
        [7] "b=*2"
    ],
      "@version" => "1",
    "@timestamp" => "2016-03-29T11:18:11.018Z",
          "host" => "liondeMacBook-Pro.local"
}

字段处理

  • rename

  重命名某个字段,如果目的字段已经存在,会被覆盖掉:

filter {
    mutate {
        rename => ["syslog_host", "host"]
    }
}
  • update

  更新某个字段的内容。如果字段不存在,不会新建。

  • replace

  作用和 update 类似,但是当字段不存在的时候,它会起到 add_field 参数一样的效果,自动添加新的字段。

执行次序

  需要注意的是,filter/mutate 内部是有执行次序的。其次序如下:

    rename(event) if @rename
    update(event) if @update
    replace(event) if @replace
    convert(event) if @convert
    gsub(event) if @gsub
    uppercase(event) if @uppercase
    lowercase(event) if @lowercase
    strip(event) if @strip
    remove(event) if @remove
    split(event) if @split
    join(event) if @join
    merge(event) if @merge

    filter_matched(event)

  而 filter_matched 这个 filters/base.rb 里继承的方法也是有次序的。

  @add_field.each do |field, value|
  end
  @remove_field.each do |field|
  end
  @add_tag.each do |tag|
  end
  @remove_tag.each do |tag|
  end

GeoIP 地址查询归类

  GeoIP 是最常见的免费 IP 地址归类查询库,同时也有收费版可以采购。GeoIP 库可以根据 IP 地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。

配置示例(geoip.conf)

input {stdin{}}
filter {
    geoip {
        source => "message"
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}

配置示例(geoip.conf)
  在控制台打开Logstash实例,然后输入『125.33.124.97』,可以看到以下结果 :

➜ /Users/lion/tar/logstash-2.2.2 >./bin/logstash -f geoip.conf
Settings: Default pipeline workers: 8
Logstash startup completed
125.33.124.97
{
       "message" => "125.33.124.97",
      "@version" => "1",
    "@timestamp" => "2016-03-30T01:21:49.851Z",
          "host" => "liondeMacBook-Pro.local",
         "geoip" => {
                      "ip" => "125.33.124.97",
           "country_code2" => "CN",
           "country_code3" => "CHN",
            "country_name" => "China",
          "continent_code" => "AS",
             "region_name" => "22",
               "city_name" => "Beijing",
                "latitude" => 39.9289,
               "longitude" => 116.38830000000002,
                "timezone" => "Asia/Harbin",
        "real_region_name" => "Beijing",
                "location" => [
            [0] 116.38830000000002,
            [1] 39.9289
        ]
    }
}

  GeoIP 库数据较多,如果你不需要这么多内容,可以通过 fields 选项指定自己所需要的。下例为全部可选内容:

filter {
    geoip {
        fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]
    }
}

  需要注意的是:geoip.location 是 logstash 通过 latitude 和 longitude 额外生成的数据。所以,如果你是想要经纬度又不想重复数据的话,应该像下面这样做:

input {stdin{}}
filter {
    geoip {
        source => "message"
        fields=>[
            "city_name",
            "country_code2",
            "country_name",
            "latitude",
            "longitude",
            "region_name"
        ]
    }
    mutate{
        remove_field=>[
            "[geoip][latitude]",
            "[geoip][longitude]"
        ]
    }


}
output{
    stdout{
        codec=>rubydebug
    }
}

  在控制台,重新启动程序 ,可以看到以下结果 :

➜ /Users/lion/tar/logstash-2.2.2 >./bin/logstash -f geoip.conf
Settings: Default pipeline workers: 8
Logstash startup completed
125.33.124.97
{
       "message" => "125.33.124.97",
      "@version" => "1",
    "@timestamp" => "2016-03-30T01:33:49.282Z",
          "host" => "liondeMacBook-Pro.local",
         "geoip" => {
        "country_code2" => "CN",
         "country_name" => "China",
          "region_name" => "22",
            "city_name" => "Beijing"
    }
}

小贴士
geoip 插件的 "source" 字段可以是任一处理后的字段,比如 "client_ip",但是字段内容却需要小心!geoip 库内只存有公共网络上的 IP 信息,查询不到结果的,会直接返回 null,而 logstash 的 geoip 插件对 null 结果的处理是:不生成对应的 geoip.字段。

  所以读者在测试时,如果使用了诸如 127.0.0.1, 172.16.0.1, 182.168.0.1, 10.0.0.1 等内网地址,会发现没有对应输出!

JSON 编解码

  已经讲过在 codec 中使用 JSON 编码。但是,有些日志可能是一种复合的数据结构,其中只是一部分记录是 JSON 格式的。这时候,我们依然需要在 filter 阶段,单独启用 JSON 解码插件。

配置示例(json1.conf)

input {stdin{}}
filter {
    json {
        source => "message"
        target => "jsoncontent"
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}

运行结果

➜ /Users/lion/tar/logstash-2.2.2 >./bin/logstash -f json1.conf
Settings: Default pipeline workers: 8
Logstash startup completed
{"uid":1,"email":"xxx@mshk.org"}
{
        "message" => "{\"uid\":1,\"email\":\"xxx@mshk.org\"}",
       "@version" => "1",
     "@timestamp" => "2016-03-30T01:47:07.798Z",
           "host" => "liondeMacBook-Pro.local",
    "jsoncontent" => {
          "uid" => 1,
        "email" => "xxx@mshk.org"
    }
}

  如果不打算使用多层结构的话,删掉 target 配置即可。新的结果如下:

➜ /Users/lion/tar/logstash-2.2.2 >./bin/logstash -f json1.conf
Settings: Default pipeline workers: 8
Logstash startup completed
{"uid":1,"email":"xxx@mshk.org"}
{
       "message" => "{\"uid\":1,\"email\":\"xxx@mshk.org\"}",
      "@version" => "1",
    "@timestamp" => "2016-03-30T01:49:39.775Z",
          "host" => "liondeMacBook-Pro.local",
           "uid" => 1,
         "email" => "xxx@mshk.org"
}

split 拆分事件

  我们通过 multiline 插件将多行数据合并进一个事件里,那么反过来,也可以把一行数据,拆分成多个事件。这就是 split 插件。

配置示例

input {stdin{}}
filter {
    split {
        field => "message"
        terminator => "#"
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}

运行结果
  这个测试中,我们在 intputs/stdin 的终端中输入一行数据:"test1#test2",结果看到输出两个事件:

➜ /Users/lion/tar/logstash-2.2.2 >./bin/logstash -f split_event.conf
Settings: Default pipeline workers: 8
Logstash startup completed
test1#test2
{
       "message" => "test1",
      "@version" => "1",
    "@timestamp" => "2016-03-30T02:00:52.812Z",
          "host" => "liondeMacBook-Pro.local"
}
{
       "message" => "test2",
      "@version" => "1",
    "@timestamp" => "2016-03-30T02:00:52.812Z",
          "host" => "liondeMacBook-Pro.local"
}

输出插件(Output)

保存到Elasticsearch

  Logstash 可以试用不同的协议实现完成将数据写入 Elasticsearch 的工作。在不同时期,也有不同的插件实现方式。本节以最新版为准,即主要介绍 HTTP 方式。同时也附带一些原有的 node 和 transport 方式的介绍。

  Nginx的日志格式化示例

    log_format  main  '$remote_addr - $remote_user [$time_iso8601] "$request" '
                                        '$status $body_bytes_sent "$http_referer" '
                                        '"$http_user_agent" "$http_x_forwarded_for" '
                                        '"$gzip_ratio" $request_time $bytes_sent $request_length '
                                        '"$upstream_addr" $upstream_status $upstream_response_time';

  Nginx日志输出的结果

127.0.0.1 - - [2016-04-08T12:03:35+08:00] "GET /static/common/static/json/game2goodlist.json?_=1460088215502 HTTP/1.1" 200 1593560 "http://tsy.com/trades/list-trade/continue-to-recharge-list" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36" "-" "-" 0.019 1593881 982 "-" - -
127.0.0.1 - - [2016-04-08T12:03:35+08:00] "GET /static/common/static/json/gamelist.json?_=1460088215503 HTTP/1.1" 200 192295 "http://tsy.com/trades/list-trade/continue-to-recharge-list" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36" "-" "-" 0.000 192614 977 "-" - -

配置示例(elasticsearche.conf)

input {
    file {
        path => ["/Users/lion/logs/tsy.access.log"]
        sincedb_path => "/Users/lion/tar/logstash-2.2.2/sincedb_path/a"
        start_position => "beginning"
    }
}
filter {
    grok {
        match => {
            "message" => "%{IPORHOST:remote_addr} - (%{USER:remote_user}|-) \[%{TIMESTAMP_ISO8601:time_local}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version:float})?|-)\" %{NUMBER:request_status:int} %{NUMBER:bytes:int} \"(%{NOTSPACE:referrer}|-)\" \"(%{DATA:agent}|-)\" \"(%{DATA:http_x_forwarded_for}|-)\" \"(%{NOTSPACE:gzip_ratio}|-)\" %{NUMBER:request_time:float} %{NUMBER:bytes_sent:int} %{NUMBER:request_length:int} \"(%{DATA:upstream_addr}|-)\" (%{NUMBER:upstream_status:int}|-) (%{NUMBER:upstream_status:float}|-)"
        }
    }
    geoip {
        source => "remote_addr"
    }
    #geoip 插件的 "source" 字段可以是任一处理后的字段,比如 "client_ip",但是字段内容却需要小心!geoip 库内只存有公共网络上的 IP 信息,查询不到结果的,会直接返回 null,而 logstash 的 geoip 插件对 null 结果的处理是:不生成对应的 geoip.字段。
#    kv {
#        source => "request"
#        field_split => "&?"
#        value_split => "="
#    }
    urldecode {
        all_fields => true
        remove_field => ["message"]
    }
}
output{
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "logstash-nginx-%{+YYYY.MM.dd}"
        document_type => "log"
        workers => 1
        flush_size => 20000
        #idle_flush_time => 10
        template_overwrite => true
    }
    stdout{
        codec=>rubydebug
    }
}

  在浏览器中,刷新页面,让nginx的日志更新,这时在Elasticsearch中后可以看到结果

标准输出(Stdout)

  和之前 inputs/stdin 插件一样,outputs/stdout 插件也是最基础和简单的输出插件。同样在这里简单介绍一下,作为输出插件的一个共性了解。

配置示例

output {
    stdout {
        codec => rubydebug
        workers => 2
    }
}

解释
  输出插件统一具有一个参数是 workers。Logstash 为输出做了多线程的准备。

  其次是 codec 设置。codec 的作用在之前已经讲过。可能除了 codecs/multiline ,其他 codec 插件本身并没有太多的设置项。所以一般省略掉后面的配置区段。换句话说。上面配置示例的完全写法应该是:

output {
    stdout {
        codec => rubydebug {
        }
        workers => 2
    }
}

  单就 outputs/stdout 插件来说,其最重要和常见的用途就是调试。所以在不太有效的时候,加上命令行参数 -vv 运行,查看更多详细调试信息。

保存成文件(File)

  通过日志收集系统将分散在数百台服务器上的数据集中存储在某中心服务器上,这是运维最原始的需求。早年的 scribed ,甚至直接就把输出的语法命名为 。Logstash 当然也能做到这点。

  和 LogStash::Inputs::File 不同, LogStash::Outputs::File 里可以使用 sprintf format 格式来自动定义输出到带日期命名的路径。

配置示例

input {stdin{}}
output{
    file {
        path => "/Users/lion/tar/logstash-2.2.2/%{+yyyy/MM/dd/HH}/output.log.gz"
        message_format => "%{message}"
        gzip => true
    }
}

解释
  使用 output/file 插件首先需要注意的就是 message_format 参数。插件默认是输出整个 event 的 JSON 形式数据的。这可能跟大多数情况下使用者的期望不符。大家可能只是希望按照日志的原始格式保存就好了。所以需要定义为 %{message},当然,前提是在之前的 filter 插件中,你没有使用 remove_field 或者 update 等参数删除或修改 %{message} 字段的内容。

  另一个非常有用的参数是 gzip。gzip 格式是一个非常奇特而友好的格式。其格式包括有:

  • 10字节的头,包含幻数、版本号以及时间戳
  • 可选的扩展头,如原文件名
  • 文件体,包括DEFLATE压缩的数据
  • 8字节的尾注,包括CRC-32校验和以及未压缩的原始数据长度

  这样 gzip 就可以一段一段的识别出来数据 —— 反过来说,也就是可以一段一段压缩了添加在后面!

  这对于我们流式添加数据简直太棒了!

输出到 Redis

配置示例

input {stdin{}}
output{
    redis {
        data_type => "channel"
        key => "logstash-mshk.org-2016.03.29"
    }
}

Usage
  我们还是继续先用 redis-cli 命令行来演示 outputs/redis 插件的实质。

basical use case

  运行 logstash 进程,然后另一个终端启动 redis-cli 命令。输入订阅指定频道的 Redis 命令 ("SUBSCRIBE logstash-mshk.org-2016.03.29") 后,首先会看到一个订阅成功的返回信息。如下所示:

➜ /Users/lion >redis-cli
127.0.0.1:6379> SUBSCRIBE logstash-mshk.org-2016.03.29
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "logstash-mshk.org-2016.03.29"
3) (integer) 1

  好,在运行 logstash 的终端里输入 "Hello mshk.org" 字符串。

➜ /Users/lion/tar/logstash-2.2.2 >./bin/logstash -f output_redis.conf
Settings: Default pipeline workers: 8
Logstash startup completed
Hello mshk.org

  切换回 redis-cli 的终端,你发现已经自动输出了一条信息:

➜ /Users/lion >redis-cli
127.0.0.1:6379> SUBSCRIBE logstash-mshk.org-2016.03.29
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "logstash-mshk.org-2016.03.29"
3) (integer) 1
1) "message"
2) "logstash-mshk.org-2016.03.29"
3) "{\"message\":\"Hello mshk.org\",\"@version\":\"1\",\"@timestamp\":\"2016-03-30T04:09:00.232Z\",\"host\":\"liondeMacBook-Pro.local\"}"

  看起来是不是非常眼熟?这一串字符其实就是我们在 inputs/redis 一节中使用的那段数据。

  看,这样就把 outputs/redis 和 inputs/redis 串联起来了吧!

  事实上,这就是我们使用 redis 服务器作为 logstassh 架构中 broker 角色的原理。

  让我们把这两节中不同配置的 logstash 进程分别在两个终端运行起来,这次不再要运行 redis-cli 命令了。在配有 outputs/redis 这端输入 "hello world",配有 "inputs/redis" 的终端上,就自动输出数据了!

notification use case
  我们还可以用其他程序来订阅 redis 频道,程序里就可以随意写其他逻辑了。你可以看看 output/juggernaut 插件的原理。这个 Juggernaut 就是基于 redis 服务器和 socket.io 框架构建的。利用它,logstash 可以直接向 webkit 等支持 socket.io 的浏览器推送告警信息。

扩展方式
  和 LogStash::Inputs::Redis 一样,这里也有设置成 list 的方式。使用 RPUSH 命令发送给 redis 服务器,效果和之前展示的完全一致。包括可以调整的参数 batch_event,也在之前章节中讲过。这里不再重复举例。

发送网络数据(TCP)

  虽然之前我们已经提到过不建议直接使用 LogStash::Inputs::TCP 和 LogStash::Outputs::TCP 做转发工作,不过在实际交流中,发现确实有不少朋友觉得这种简单配置足够使用,因而不愿意多加一层消息队列的。所以,还是把 Logstash 如何直接发送 TCP 数据也稍微提点一下。

配置示例

output {
    tcp {
        host  => "127.0.0.1"
        port  => 8888
        codec => json_lines
    }
}

配置说明
  在收集端采用 tcp 方式发送给远端的 tcp 端口。这里需要注意的是,默认的 codec 选项是 json。而远端的 LogStash::Inputs::TCP 的默认 codec 选项却是 plain !所以不指定各自的 codec ,对接肯定是失败的。

  另外,由于IO BUFFER 的原因,即使是两端共同约定为 json 依然无法正常运行,接收端会认为一行数据没结束,一直等待直至自己 OutOfMemory !

  所以,正确的做法是,发送端指定 codec 为 json_lines ,这样每条数据后面会加上一个回车,接收端指定 codec 为 json_lines 或者 json 均可,这样才能正常处理。包括在收集端已经切割好的字段,也可以直接带入收集端使用了。

调用命令执行(Exec)

  outputs/exec 插件的运用也非常简单,如下所示,将 logstash 切割成的内容作为参数传递给命令。这样,在每个事件到达该插件的时候,都会触发这个命令的执行。

output {
    exec {
        command => "sendsms.pl \"%{message}\" -t %{user}"
    }
}

  需要注意的是。这种方式是每次都重新开始执行一次命令并退出。本身是比较慢速的处理方式(程序加载,网络建联等都有一定的时间消耗)。最好只用于少量的信息处理场景,比如不适用 nagios 的其他报警方式。示例就是通过短信发送消息。

  https://github.com/chenryn/logstash-best-practice-cn

推荐阅读

《the life of an event》官网文档
《life of a logstash event》Elastic{ON} 上的演讲
inux系统下pv命令的一些使用技巧小结
Redis 命令参考


博文作者:迦壹
博客地址:Logstash 2.2.0 的最佳实践
写作时间:2017-01-04 11:44
转载声明:可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明,谢谢合作!


说点什么

您将是第一位评论人!

提醒
avatar
wpDiscuz