inputs → filters → outputs
create → motify → ship
input threads | pipeline worker threads
--pipeline-workers
--pipeline-batch-size
--pipeline-batch-delay
input {
stdin {}
syslog {}
}
debug => true
host => "hostname"
port => 514
match => ["datetime", "UNIX", "ISO8601"]
options => {
key1 => "value1",
key2 => "value2"
}
[geoip][location][0]
[geoip][location][-1]
"the location is %{[geoip][location][0]}"
if "_grokparsefailure" not in [tags] {
} else if [status] !~ /^2\d\d/ or ( [url] == "/noc.gif" nand [geoip][city] != "beijing" ) {
} else {
}
input → decode → filter → encode → output
input {
file {
path => "/var/log/nginx/access.log_json""
codec => "json"
}
}
bundle gem logstash-filter-example
Add dependencies to your gemspec
# Gem dependencies
s.add_runtime_dependency 'logstash-core', '>= 1.4.0', '< 2.0.0'
s.add_development_dependency 'logstash-devutils'
Add require Statements to lib/logstash/filters/example.rb
# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
Make sure all dependencies in your Gemfile are available to your application.
bundle install
$ tree logstash-output-mypluginname
├── Gemfile
├── LICENSE
├── README.md
├── Rakefile
├── lib
│ └── logstash
│ └── outputs
│ └── mypluginname.rb
├── logstash-output-mypluginname.gemspec
└── spec
└── outputs
└── mypluginname_spec.rb
In logstash plugin api 1.x (logstash 2.x):
# event get api
request_url = event[@source]
# event set api
event['city'] = area.city.name
In logstash plugin api 2.x (logstash 5.x):
# event get api
request_url = event.get(@source)
#event set api
event.set('city',area.city.name)
class LogStash::Inputs::Example < LogStash::Inputs::Base
config_name "example"
config :message, :validate => :string, :default => "Hello World!"
config :interval, :validate => :number, :default => 1
public
def register
@host = Socket.gethostname
end
def run(queue)
while !stop?
event = LogStash::Event.new("message" => @message, "host" => @host)
decorate(event)
queue << event
Stud.stoppable_sleep(@interval) { stop? }
end
end
def stop
# examples of common "stop" tasks:
# * close sockets (unblocking blocking reads/accepts)
# * cleanup temporary files
# * terminate spawned threads
end
end
class LogStash::Codecs::Example < LogStash::Codecs::Base
config_name "example"
config :append, :validate => :string, :default => ', Hello World!'
public
def register
@lines = LogStash::Codecs::Line.new
@lines.charset = "UTF-8"
end
public
def decode(data)
@lines.decode(data) do |line|
replace = { "message" => line["message"].to_s + @append }
yield LogStash::Event.new(replace)
end
end
public
def encode(event)
@on_event.call(event, event["message"].to_s + @append + NL)
end
end
class LogStash::Filters::Example < LogStash::Filters::Base
config_name "example"
# Replace the message with this value.
config :message, :validate => :string, :default => "Hello World!"
public
def register
# Add instance variables
end
public
def filter(event)
if @message
# Replace the event message with our message as configured in the
# config file.
event["message"] = @message
end
# filter_matched should go in the last line of our successful code
filter_matched(event)
end
end
class LogStash::Outputs::Example < LogStash::Outputs::Base
config_name "example"
# If declared logstash will only allow a single instance of this plugin
# to exist, regardless of how many CPU cores logstash detects. This is best
# used in cases like the File output, where separate threads writing to a single
# File would only cause problems.
#
# respond_to? check needed for backwards compatibility with < 2.2 Logstashes
declare_workers_not_supported! if self.respond_to?(:declare_workers_not_supported!)
# If declared threadsafe logstash will only ever create one
# instance of this plugin per pipeline.
# That instance will be shared across all workers
# It is up to the plugin author to correctly write concurrent code!
#
# respond_to? check needed for backwards compatibility with < 2.2 Logstashes
declare_threadsafe! if self.respond_to?(:declare_threadsafe!)
public
def register
# Does the same thing as declare_workers_not_supported!
# But works in < 2.2 logstashes
# workers_not_supported
end # def register
public
# Takes an array of events
def multi_receive(events)
end # def multi_receive
public
# Needed for logstash < 2.2 compatibility
# Takes events one at a time
def receive(event)
end # def receive
end
describe LogStash::Filters::Example do
describe "Set the province and city" do
let(:config) do <<-CONFIG
filter {
redpage {
source => 'message'
}
}
CONFIG
end
sample("message" => "http://site.xxx.gov.cn/www/areacodexxx/org_codexxx/index.html") do
expect(subject["province"]).to eq('河南省')
expect(subject["city"]).to eq('周口市')
expect(subject["org_name"]).to eq('xxx人力资源和社会保障局')
end
end
end
export JRUBY_OPTS="--server \
-J-Xms1500m -J-Xmx1500m \
-J-XX:+UseConcMarkSweepGC \
-J-XX:-UseGCOverheadLimit \
-J-XX:+CMSClassUnloadingEnabled""
gem build logstash-filter-example.gemspec
Apache License, version 2 (ALv2)
curl -u username:password https://rubygems.org/api/v1/api_key.yaml > ~/.gem/credentials
chmod 0600 ~/.gem/credentials
bundle exec rake publish_gem
Usage:
bin/plugin [OPTIONS] SUBCOMMAND [ARG] ...
Parameters:
SUBCOMMAND subcommand
[ARG] ... subcommand arguments
Subcommands:
install Install a plugin
uninstall Uninstall a plugin
update Install a plugin
list List all installed plugins
Options:
-h, --help print help