How to write a logstash plugin

2016.7.8

Created byEcho Hon/@echohn

Why Logstash?

  • Easy
  • Pipeline
  • Plugins
  • JRuby
  • ElasticSearch
  • Kibana

Pipeline

inputs → filters → outputs

Event:

create → motify → ship

When receive a SIGTERM

  • Halt inputs
  • Waitingfor pending events to finish processing
  • waitsindefinitelywhen cannot be flushed due to a stuck output or filter

New Execution Model in version 2.3

input threads | pipeline worker threads
  • Substantially improves thread liveness
  • Decreases resource usage
  • Increases throughput

Options

  • --pipeline-workers
  • --pipeline-batch-size
  • --pipeline-batch-delay

SYNTAX

  • section
  • input {
        stdin {}
        syslog {}
    }
  • bool
  • debug => true
  • string
  • host => "hostname"
  • number
  • port => 514
  • array
  • match => ["datetime", "UNIX", "ISO8601"]
  • hash
  • options => {
        key1 => "value1",
        key2 => "value2"
    }

Field Reference

[geoip][location][0]
[geoip][location][-1]
"the location is %{[geoip][location][0]}"

Condition

  • ==, !=, <, >, <=, >=
  • =~, !~
  • in, not in
  • and, or, nand, xor
  • (), !()
if "_grokparsefailure" not in [tags] {
} else if [status] !~ /^2\d\d/ or ( [url] == "/noc.gif" nand [geoip][city] != "beijing" ) {
} else {
}

Codec

input → decode → filter → encode → output

input {
    file {
        path => "/var/log/nginx/access.log_json""
        codec => "json"
    }
}

Deveopment Environment

Install Jruby


UseVagranton Windows

Create Your Gem

bundle gem logstash-filter-example

Add dependencies to your gemspec

# Gem dependencies
s.add_runtime_dependency 'logstash-core', '>= 1.4.0', '< 2.0.0'
s.add_development_dependency 'logstash-devutils'

Add require Statements to lib/logstash/filters/example.rb

# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"

Or Clone theExample

Make sure all dependencies in your Gemfile are available to your application.

bundle install

File Structure

$ tree logstash-output-mypluginname
├── Gemfile
├── LICENSE
├── README.md
├── Rakefile
├── lib
│   └── logstash
│       └── outputs
│           └── mypluginname.rb
├── logstash-output-mypluginname.gemspec
└── spec
    └── outputs
        └── mypluginname_spec.rb

Plugin Development

In logstash plugin api 1.x (logstash 2.x):

# event get api
request_url = event[@source]

# event set api
event['city'] = area.city.name

In logstash plugin api 2.x (logstash 5.x):

# event get api
request_url = event.get(@source)

#event set api
event.set('city',area.city.name)

Input plugin example

class LogStash::Inputs::Example < LogStash::Inputs::Base
  config_name "example"
  config :message, :validate => :string, :default => "Hello World!"
  config :interval, :validate => :number, :default => 1

  public
  def register
    @host = Socket.gethostname
  end

  def run(queue)
    while !stop?
      event = LogStash::Event.new("message" => @message, "host" => @host)
      decorate(event)
      queue << event
      Stud.stoppable_sleep(@interval) { stop? }
    end
  end

  def stop
    # examples of common "stop" tasks:
    #  * close sockets (unblocking blocking reads/accepts)
    #  * cleanup temporary files
    #  * terminate spawned threads
  end
end

Codec plugin example

class LogStash::Codecs::Example < LogStash::Codecs::Base
  config_name "example"
  config :append, :validate => :string, :default => ', Hello World!'

  public
  def register
    @lines = LogStash::Codecs::Line.new
    @lines.charset = "UTF-8"
  end

  public
  def decode(data)
    @lines.decode(data) do |line|
      replace = { "message" => line["message"].to_s + @append }
      yield LogStash::Event.new(replace)
    end
  end

  public
  def encode(event)
    @on_event.call(event, event["message"].to_s + @append + NL)
  end
end

Filter plugin example

class LogStash::Filters::Example < LogStash::Filters::Base
  config_name "example"

  # Replace the message with this value.
  config :message, :validate => :string, :default => "Hello World!"

  public
  def register
    # Add instance variables
  end

  public
  def filter(event)

    if @message
      # Replace the event message with our message as configured in the
      # config file.
      event["message"] = @message
    end

    # filter_matched should go in the last line of our successful code
    filter_matched(event)
  end

end

Output plugin example

class LogStash::Outputs::Example < LogStash::Outputs::Base
  config_name "example"

  # If declared logstash will only allow a single instance of this plugin
  # to exist, regardless of how many CPU cores logstash detects. This is best
  # used in cases like the File output, where separate threads writing to a single
  # File would only cause problems.
  #
  # respond_to? check needed for backwards compatibility with < 2.2 Logstashes
  declare_workers_not_supported! if self.respond_to?(:declare_workers_not_supported!)

  # If declared threadsafe logstash will only ever create one
  # instance of this plugin per pipeline.
  # That instance will be shared across all workers
  # It is up to the plugin author to correctly write concurrent code!
  #
  # respond_to? check needed for backwards compatibility with < 2.2 Logstashes
  declare_threadsafe! if self.respond_to?(:declare_threadsafe!)

  public
  def register
    # Does the same thing as declare_workers_not_supported!
    # But works in < 2.2 logstashes
    # workers_not_supported
  end # def register

  public
  # Takes an array of events
  def multi_receive(events)
  end # def multi_receive

  public
  # Needed for logstash < 2.2 compatibility
  # Takes events one at a time
  def receive(event)
  end # def receive

end

SPEC YOUR PLUGIN

describe LogStash::Filters::Example do
  describe "Set the province and city" do
    let(:config) do <<-CONFIG
      filter {
        redpage {
          source => 'message'
        }
      }
    CONFIG
    end

    sample("message" => "http://site.xxx.gov.cn/www/areacodexxx/org_codexxx/index.html") do
      expect(subject["province"]).to eq('河南省')
      expect(subject["city"]).to eq('周口市')
      expect(subject["org_name"]).to eq('xxx人力资源和社会保障局')

    end
  end
end

Troubleshooting

export JRUBY_OPTS="--server \
  -J-Xms1500m -J-Xmx1500m   \
  -J-XX:+UseConcMarkSweepGC \
  -J-XX:-UseGCOverheadLimit \
  -J-XX:+CMSClassUnloadingEnabled""

Build your gem

gem build logstash-filter-example.gemspec

Publish your plugin to RubyGems.org

Licensing

Apache License, version 2 (ALv2)

curl -u username:password https://rubygems.org/api/v1/api_key.yaml > ~/.gem/credentials
chmod 0600 ~/.gem/credentials
bundle exec rake publish_gem

plugin installation

Usage:
    bin/plugin [OPTIONS] SUBCOMMAND [ARG] ...

Parameters:
    SUBCOMMAND  subcommand
    [ARG] ...   subcommand arguments

Subcommands:
    install     Install a plugin
    uninstall   Uninstall a plugin
    update      Install a plugin
    list        List all installed plugins

Options:
    -h, --help  print help

THE END

-Logstash Documentation
-ELK user guide in Chinese