tcpinput newline split
Description
Activity
Aaron Mildenstein February 6, 2015 at 10:19 PM
In light of several revisions to core and plugin code since this was last updated, I'm marking it closed.
If you have issues with the plugins discussed here, please add issues at their respective repositories:
https://github.com/logstash-plugins/logstash-input-tcp
https://github.com/logstash-plugins/logstash-codec-line
https://github.com/logstash-plugins/logstash-codec-multiline
https://github.com/logstash-plugins/logstash-filter-multiline
Ente September 18, 2013 at 11:49 AMEdited
The following patch changes codecs/multiline so that it splits its input the way the line codec does before doing its multiline magic. This makes sense because the multiline codec should always get line-separated input, which means my first patch is not needed for this solution.
You should definitely add to the documentation that a combination like
input { tcp } filter { multiline }
can cause problems when several peers are connecting to the same port.
diff --git a/lib/logstash/codecs/multiline.rb b/lib/logstash/codecs/multiline.rb
index bc5818d..47f57e8 100644
--- a/lib/logstash/codecs/multiline.rb
+++ b/lib/logstash/codecs/multiline.rb
@@ -134,23 +134,35 @@ class LogStash::Codecs::Multiline < LogStash::Codecs::Base
@buffer = []
@handler = method("do_#{@what}".to_sym)
+
+ #initialize buffertokenizer
+ require "logstash/util/buftok"
+ @buftok = FileWatch::BufferedTokenizer.new()
+
end # def register
public
def decode(text, &block)
- text.force_encoding(@charset)
- if @charset != "UTF-8"
- # Convert to UTF-8 if not in that character set.
- text = text.encode("UTF-8", :invalid => :replace, :undef => :replace)
- end
-
- match = @grok.match(text)
- @logger.debug("Multiline", :pattern => @pattern, :text => text,
+ @buftok.extract(text).each do |line|
+ line.force_encoding(@charset)
+ if @charset != "UTF-8"
+ # The user has declared the character encoding of this data is
+ # something other than UTF-8. Let's convert it (as cleanly as possible)
+ # into UTF-8 so we can use it with JSON, etc.
+
+ # To convert, we first tell ruby the string is *really* encoded as
+ # something else (@charset), then we convert it to UTF-8.
+ line = line.encode("UTF-8", :invalid => :replace, :undef => :replace)
+ end
+ # do the matching
+ match = @grok.match(line)
+ @logger.debug("Multiline", :pattern => @pattern, :line => line,
:match => !match.nil?, :negate => @negate)
-
- # Add negate option
- match = (match and !@negate) || (!match and @negate)
- @handler.call(text, match, &block)
+ # Add negate option
+ match = (match and !@negate) || (!match and @negate)
+ # call the handler
+ @handler.call(line, match, &block)
+ end
end # def decode
def buffer(text)
@@ -160,7 +172,8 @@ class LogStash::Codecs::Multiline < LogStash::Codecs::Base
def flush(&block)
if @buffer.any?
- event = LogStash::Event.new("@timestamp" => @time, "message" => @buffer.join("\n"))
+ # join everything still in the buffer into one newline-separated string and remove the last \n
+ event = LogStash::Event.new("@timestamp" => @time, "message" => @buffer.join("\n").chomp())
# Tag multiline events
event.tag @multiline_tag if @multiline_tag && @buffer.size > 1
IMPORTANT: this patch assumes that there are newlines in the string passed to decode(). Otherwise things will go terribly wrong. Since the file input plugin (more precisely, filewatch) uses buftok to separate lines, and buftok doesn't preserve newlines, the file input and probably other plugins need to be revised.
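To make the "who splits the lines?" question concrete, here is a minimal sketch of a buffering line splitter. `SketchLineTokenizer` is a hypothetical stand-in written for this comment, not the FileWatch::BufferedTokenizer class itself; it only assumes the behaviour described above (complete lines are returned without their newline, a trailing partial line is held back).

```ruby
# Hypothetical stand-in for a buffered tokenizer, for illustration only.
class SketchLineTokenizer
  def initialize(delimiter = "\n")
    @delimiter = delimiter
    @pending = "".dup # data received after the last delimiter
  end

  # Returns every complete line seen so far, without the delimiter,
  # and keeps any trailing partial line for the next call.
  def extract(chunk)
    @pending << chunk
    parts = @pending.split(@delimiter, -1)
    @pending = parts.pop || "".dup # "" when the chunk ended on a delimiter
    parts
  end

  # Returns whatever is left over, e.g. when the connection closes.
  def flush
    rest = @pending
    @pending = "".dup
    rest
  end
end

tok = SketchLineTokenizer.new
first  = tok.extract("foo\nbar\nfo") # chunk boundary falls inside a line
second = tok.extract("o\nbar")
rest   = tok.flush
```

With a splitter like this the newline is gone by the time decode() sees the line, which is why the interface needs to say whether callers must re-append it.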
To clarify everything you should define the decode() interface properly:
Who splits the lines?
Are there newlines?
Which encoding is used?
[...]
This is a dirty patch for the file plugin:
--- a/lib/logstash/inputs/file.rb
+++ b/lib/logstash/inputs/file.rb
@@ -126,7 +126,9 @@ class LogStash::Inputs::File < LogStash::Inputs::Base
hostname = Socket.gethostname
@tail.subscribe do |path, line|
- @logger.debug? && @logger.debug("Received line", :path => path, :line => line)
+ @logger.debug? && @logger.debug("Received new line", :path => path, :line => line)
+ # filewatch strips newlines, so we have to append one again
+ line << "\n"
@codec.decode(line) do |event|
decorate(event)
event["host"] = hostname
Ente September 18, 2013 at 10:06 AMEdited
ewwww. Just realized the combination tcp input -> multiline filter is a bad idea anyway, because the tcp input can accept data from several sockets in parallel but always adds the events to the same output_queue. So if two peers are sending stuff at the very same time, multiline will get very confused.
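A rough sketch of the interleaving problem, using plain Ruby rather than Logstash code. `group_lines` is a hypothetical simplification of the multiline rule used in this thread (a line starting with "foo" opens a new event, anything else joins the previous one), and the arrival order is made-up illustrative data.

```ruby
# Hypothetical grouping rule: a line starting with "foo" opens a new event,
# any other line is appended to the previous event.
def group_lines(lines)
  lines.each_with_object([]) do |line, events|
    if line.start_with?("foo") || events.empty?
      events << line.dup
    else
      events.last << "\n" << line
    end
  end
end

# Lines as they might arrive on one shared queue from two peers (illustrative).
arrivals = [
  [:peer_a, "foo a1"],
  [:peer_b, "foo b1"],
  [:peer_a, "bar a2"],
  [:peer_b, "bar b2"],
]

# One shared state: peer_a's continuation line is glued onto peer_b's event.
shared = group_lines(arrivals.map { |_, line| line })

# One state per peer: each stream is grouped on its own.
per_peer = arrivals.group_by(&:first).transform_values do |pairs|
  group_lines(pairs.map(&:last))
end
```

Here `shared` ends up with "bar a2" attached to "foo b1", while `per_peer` keeps each peer's lines together. That is the argument for keeping multiline state per connection (as a codec can) rather than in one filter behind a shared queue.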
so back to the splitting problem..
Ente September 18, 2013 at 9:42 AM
aaaaaaand another bug in tcpinput/multiline.
Tell me if you want this in a new issue.
Starting logstash like this:
logstash agent -e 'input { tcp { port => 1234 } } filter { multiline { pattern => "^foo" negate => true what => "previous" } }'
input:
echo -e 'foo\nfoo\nbar\n' | nc localhost 1234
output:
{
"message" => "foo",
"@timestamp" => "2013-09-18T09:08:22.049Z",
"@version" => "1",
"host" => "127.0.0.1:60128"
}
the second:
foo
bar
gets lost.
reason:
only the flush() method of the codec plugin is called. The empty event generated at flush() is appended to the previous event by the multiline filter.
workaround:
Using the multiline plugin as a codec instead of a filter only partially solves this, because of the other issue mentioned in the thread.
patch:
we can either:
mark the event generated at flush() as a "teardown" event so that all the filters know that this is going to be the last event.
or
split tcp input on '\n'
or
some other solution that I don't see right now because I don't know the codebase that well
Ente September 18, 2013 at 8:49 AM
the behaviour of the tcpinput plugin is very strange in this case..
Example:
$ logstash agent -e 'input { tcp { port => "1234" codec => multiline { pattern => "^foo" negate => true what => "previous" } } }'
Input:
$ echo -e 'foo\nbar\nfoo\nbar' | nc localhost 1234
Output:
{
"@timestamp" => "2013-09-18T08:37:10.264Z",
"message" => "foo\nbar\nfoo\nbar\n",
"@version" => "1",
"host" => "127.0.0.1:59076"
}
which is not correct. The decode method of multiline gets all 4 lines concatenated as a single input:
Multiline {:pattern=>"^foo", :text=>"foo\nbar\nfoo\nbar\n", :match=>true, :negate=>true, :level=>:debug, :file=>"logstash-1.2.1-flatjar.jar!/logstash/codecs/multiline.rb", :line=>"148", :method=>"decode"}
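The debug line above can be reproduced in spirit with a plain Ruby Regexp. This is only a sketch: the codec uses grok rather than a bare Regexp, so treating `/^foo/` as equivalent is an assumption. It shows that matching the whole chunk yields a single match decision, whereas matching line by line yields one decision per line.

```ruby
pattern = /^foo/
chunk = "foo\nbar\nfoo\nbar\n"

# What the codec effectively did: one match against the whole chunk.
whole = [chunk].map { |text| !pattern.match(text).nil? }

# What multiline needs: one match decision per line.
per_line = chunk.each_line.map { |line| !pattern.match(line).nil? }
```

`whole` holds a single `true`, so everything lands in one event; `per_line` alternates true/false, which is the information the multiline logic needs to start a new event at each "foo".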
On the other hand, if I type foo and bar manually in nc:
$ nc localhost 1234
foo
bar
foo
bar
foo
I get the expected result:
{
"@timestamp" => "2013-09-18T08:39:48.409Z",
"message" => "foo\nbar\n",
"@version" => "1",
"host" => "127.0.0.1:59084"
}
{
"@timestamp" => "2013-09-18T08:39:49.211Z",
"message" => "foo\n\nbar\n",
"@version" => "1",
"tags" => [
[0] "multiline"
],
"host" => "127.0.0.1:59084"
}
Might the timeout in the tcp plugin explain this?
the tcp plugin does not split events by newline.
suggested patch:
index 345b900..0f1988f 100644
--- a/lib/logstash/inputs/tcp.rb
+++ b/lib/logstash/inputs/tcp.rb
@@ -141,7 +141,10 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
private
def read(socket)
- return socket.sysread(16384)
+ data = socket.gets(16384)
+ @logger.debug("Reading from socket:",:data => data)
+ return data
end # def readline
public
reason:
IO.sysread() does not give a shit about newlines.
result:
multiline input is always one event
what the documentation says:
Like stdin and file inputs, each event is assumed to be one line of text.
So you should update either the documentation or the plugin.
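For anyone who wants to see the sysread/gets difference without a running Logstash, here is a small sketch. It uses IO.pipe instead of a TCP socket (an assumption made to keep it self-contained); both calls are standard Ruby IO methods and behave the same way on sockets as far as newline handling goes.

```ruby
payload = "foo\nbar\nfoo\nbar\n"

# sysread returns whatever bytes are available, up to the limit,
# regardless of where the newlines are.
reader, writer = IO.pipe
writer.write(payload)
writer.close
chunk = reader.sysread(16384)
reader.close

# gets stops after each newline (or at the byte limit, whichever comes first).
reader, writer = IO.pipe
writer.write(payload)
writer.close
lines = []
while (line = reader.gets(16384))
  lines << line
end
reader.close
```

`chunk` contains all four lines in one string, which matches the single event seen above, while `lines` contains four separate entries, each ending in "\n".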