tcpinput newline split

Description

the tcp plugin does not split events by newline.

suggested patch:

index 345b900..0f1988f 100644
--- a/lib/logstash/inputs/tcp.rb
+++ b/lib/logstash/inputs/tcp.rb
@@ -141,7 +141,10 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
   private
   def read(socket)
-    return socket.sysread(16384)
+    data = socket.gets(16384)
+    @logger.debug("Reading from socket:",:data => data)
+    return data
   end # def readline
   public

reason:
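IO#sysread does not care about newlines: it returns whatever bytes are currently available (up to the requested 16384), regardless of line boundaries.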

result:
multiline input is always one event

what the documentation says:

  1. Like stdin and file inputs, each event is assumed to be one line of text.

so you should update either the documentation or the plugin.
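To illustrate the difference the patch relies on, here is a minimal stand-alone sketch in plain Ruby. It is not Logstash code: the helper name `read_raw_and_by_line` is hypothetical, and an `IO.pipe` stands in for the TCP socket. It only shows that `sysread` returns one chunk while `gets` with a byte limit returns one line per call.

```ruby
# Hypothetical demo helper (not part of Logstash). A pipe stands in for the
# TCP socket so the example runs without any network setup.
def read_raw_and_by_line(payload)
  # Raw read: sysread returns whatever bytes are available, newlines included.
  reader, writer = IO.pipe
  writer.write(payload)
  writer.close
  raw = reader.sysread(16384)
  reader.close

  # Line-oriented read: gets(limit) stops at each newline (or after limit bytes).
  reader, writer = IO.pipe
  writer.write(payload)
  writer.close
  lines = []
  while (line = reader.gets(16384))
    lines << line
  end
  reader.close

  [raw, lines]
end

raw, lines = read_raw_and_by_line("foo\nbar\nfoo\nbar\n")
puts raw.inspect    # one chunk containing all four lines
puts lines.inspect  # four separate strings, each ending in "\n"
```

Note that `gets` keeps the trailing newline on each line, so whoever consumes the result still has to decide whether to strip it.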

Activity

Aaron Mildenstein February 6, 2015 at 10:19 PM

In light of several revisions to core and plugin code since this was last updated, I'm marking it closed.

If you have issues with the plugins discussed here, please add issues at their respective repositories:

https://github.com/logstash-plugins/logstash-input-tcp
https://github.com/logstash-plugins/logstash-codec-line
https://github.com/logstash-plugins/logstash-codec-multiline
https://github.com/logstash-plugins/logstash-filter-multiline

Ente September 18, 2013 at 11:49 AM
Edited

The following patch changes codecs/multiline so that it splits its input the way the line codec does before doing its multiline magic. This makes sense because the multiline codec should always get line-separated input. It also means my first patch is not needed for this solution.

You should definitely add to the documentation that a combination like

input { tcp } filter { multiline }

can cause problems when several peers are connecting to the same port.

diff --git a/lib/logstash/codecs/multiline.rb b/lib/logstash/codecs/multiline.rb
index bc5818d..47f57e8 100644
--- a/lib/logstash/codecs/multiline.rb
+++ b/lib/logstash/codecs/multiline.rb
@@ -134,23 +134,35 @@ class LogStash::Codecs::Multiline < LogStash::Codecs::Base
     @buffer = []
     @handler = method("do_#{@what}".to_sym)
+
+    #initialize buffertokenizer
+    require "logstash/util/buftok"
+    @buftok = FileWatch::BufferedTokenizer.new()
+
   end # def register
   public
   def decode(text, &block)
-    text.force_encoding(@charset)
-    if @charset != "UTF-8"
-      # Convert to UTF-8 if not in that character set.
-      text = text.encode("UTF-8", :invalid => :replace, :undef => :replace)
-    end
-
-    match = @grok.match(text)
-    @logger.debug("Multiline", :pattern => @pattern, :text => text,
+    @buftok.extract(text).each do |line|
+      line.force_encoding(@charset)
+      if @charset != "UTF-8"
+        # The user has declared the character encoding of this data is
+        # something other than UTF-8. Let's convert it (as cleanly as possible)
+        # into UTF-8 so we can use it with JSON, etc.
+
+        # To convert, we first tell ruby the string is *really* encoded as
+        # something else (@charset), then we convert it to UTF-8.
+        line = line.encode("UTF-8", :invalid => :replace, :undef => :replace)
+      end
+      # do the matching
+      match = @grok.match(line)
+      @logger.debug("Multiline", :pattern => @pattern, :line => line,
                   :match => !match.nil?, :negate => @negate)
-
-    # Add negate option
-    match = (match and !@negate) || (!match and @negate)
-    @handler.call(text, match, &block)
+      # Add negate option
+      match = (match and !@negate) || (!match and @negate)
+      # call the handler
+      @handler.call(line, match, &block)
+    end
   end # def decode
   def buffer(text)
@@ -160,7 +172,8 @@ class LogStash::Codecs::Multiline < LogStash::Codecs::Base
   def flush(&block)
     if @buffer.any?
-      event = LogStash::Event.new("@timestamp" => @time, "message" => @buffer.join("\n"))
+      # assemble everything still in the buffer to a string separated by newlines and remove the last \n
+      event = LogStash::Event.new("@timestamp" => @time, "message" => @buffer.join("\n").chomp())
       # Tag multiline events
       event.tag @multiline_tag if @multiline_tag && @buffer.size > 1

IMPORTANT: this patch assumes that the string passed to decode() contains newlines. Otherwise things will go terribly wrong. The file input plugin, or more precisely filewatch, uses buftok to separate lines, and buftok doesn't preserve newlines, so the file input and probably other plugins need to be revised.
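For readers unfamiliar with buffered tokenizing, here is a minimal sketch of the idea in plain Ruby. `TinyTokenizer` is a hypothetical stand-in written for this illustration; it is not the real FileWatch::BufferedTokenizer and makes no claim about that class's internals. It shows why complete lines come out without their newline and why a partial line is held back until the next chunk arrives.

```ruby
# Hypothetical illustration only; not the buftok implementation.
class TinyTokenizer
  def initialize(delimiter = "\n")
    @delimiter = delimiter
    @pending = String.new # holds an incomplete trailing line between calls
  end

  # Append a chunk and return every complete line, without the delimiter.
  def extract(chunk)
    @pending << chunk
    # limit -1 keeps a trailing empty string, so a chunk ending in the
    # delimiter leaves an empty remainder instead of losing track of it
    parts = @pending.split(@delimiter, -1)
    @pending = parts.pop || String.new
    parts
  end

  # Return whatever is still buffered (a line that never saw its delimiter).
  def flush
    rest = @pending
    @pending = String.new
    rest
  end
end

tok = TinyTokenizer.new
puts tok.extract("foo\nba").inspect    # only "foo" is complete so far
puts tok.extract("r\nfoo\n").inspect   # "bar" is completed by the second chunk
```

Because the delimiter is dropped, any consumer that expects newline-terminated input (as the patched decode() does) has to add it back, which is what the file input patch below does.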

To clarify everything you should define the decode() interface properly:

  • Who splits the lines?

  • Are there newlines?

  • Which encoding is used?
    [...]

this is a dirty patch for the fileplugin:

--- a/lib/logstash/inputs/file.rb
+++ b/lib/logstash/inputs/file.rb
@@ -126,7 +126,9 @@ class LogStash::Inputs::File < LogStash::Inputs::Base
     hostname = Socket.gethostname
     @tail.subscribe do |path, line|
-      @logger.debug? && @logger.debug("Received line", :path => path, :line => line)
+      @logger.debug? && @logger.debug("Received new line", :path => path, :line => line)
+      # because filewatch removes newlines we've to append one again
+      line << "\n"
       @codec.decode(line) do |event|
         decorate(event)
         event["host"] = hostname

Ente September 18, 2013 at 10:06 AM
Edited

Ewwww. Just realized the combination tcp input -> multiline filter is a bad idea anyway, because the tcp input can accept data from several sockets in parallel but always adds the events to the same output_queue. So if two peers send data at the very same time, multiline will get very confused :)
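A small simulation of that interleaving problem, in plain Ruby. This is a hypothetical sketch: `group_multiline` only mimics the rule `pattern => "^foo" negate => true what => "previous"` and is not the Logstash filter; the peer names and payloads are made up for the example.

```ruby
# Hypothetical stand-in for the multiline rule: a line starting with "foo"
# begins a new event, any other line is appended to the previous event.
def group_multiline(lines)
  events = []
  lines.each do |line|
    if line.start_with?("foo") || events.empty?
      events << [line]
    else
      events.last << line
    end
  end
  events.map { |parts| parts.join("\n") }
end

# Lines from two peers (:a and :b) arriving interleaved on one shared queue.
interleaved = [[:a, "foo"], [:b, "foo"], [:a, "bar-a"], [:b, "bar-b"]]

# Shared state: peer a's continuation line ends up glued to peer b's event.
shared = group_multiline(interleaved.map(&:last))

# Per-connection state: each peer's lines are grouped on their own.
per_peer = interleaved.group_by(&:first)
                      .transform_values { |pairs| group_multiline(pairs.map(&:last)) }

puts shared.inspect
puts per_peer.inspect
```

Keeping the multiline state per connection, as a codec attached to each socket would, avoids mixing lines from different peers.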

so back to the splitting problem..

Ente September 18, 2013 at 9:42 AM

aaaaaaand another bug in tcpinput/multiline.
Tell me if you want this in a new issue.

Starting logstash like this:

logstash agent -e 'input { tcp { port => 1234 } } filter { multiline { pattern => "^foo" negate => true what => "previous" } }'

input:

echo -e 'foo\nfoo\nbar\n' | nc localhost 1234

output:

{ "message" => "foo", "@timestamp" => "2013-09-18T09:08:22.049Z", "@version" => "1", "host" => "127.0.0.1:60128" }

the second event:

foo
bar

gets lost.

reason:
only the flush() method of the codec plugin is called. The empty event generated at flush() is appended to the previous event by the multiline filter.

workaround:
Using the multiline plugin as a codec instead of a filter only partially solves this, because of the other issue mentioned in this thread.

patch:
we can either:

  • mark the event generated at flush() as a "teardown" event so that all the filters know that this is going to be the last event.

or

  • split tcp input on '\n'

or

  • some other solution which I don't see right now because I don't know the codebase that well

Ente September 18, 2013 at 8:49 AM

the behaviour of the tcpinput plugin is very strange in this case..

Example:

$ logstash agent -e 'input { tcp { port => "1234" codec => multiline { pattern => "^foo" negate => true what => "previous" } } }'

Input:

$ echo -e 'foo\nbar\nfoo\nbar' | nc localhost 1234

Output:

{ "@timestamp" => "2013-09-18T08:37:10.264Z", "message" => "foo\nbar\nfoo\nbar\n", "@version" => "1", "host" => "127.0.0.1:59076" }

which is not correct. The decode method of multiline gets all 4 lines concatenated as input.

Multiline {:pattern=>"^foo", :text=>"foo\nbar\nfoo\nbar\n", :match=>true, :negate=>true, :level=>:debug, :file=>"logstash-1.2.1-flatjar.jar!/logstash/codecs/multiline.rb", :line=>"148", :method=>"decode"}

On the other hand, if I type foo and bar manually in nc

$ nc localhost 1234
foo
bar
foo
bar
foo

I get the expected result:

{ "@timestamp" => "2013-09-18T08:39:48.409Z", "message" => "foo\nbar\n", "@version" => "1", "host" => "127.0.0.1:59084" }
{ "@timestamp" => "2013-09-18T08:39:49.211Z", "message" => "foo\n\nbar\n", "@version" => "1", "tags" => [ [0] "multiline" ], "host" => "127.0.0.1:59084" }

Might the timeout in the tcpplugin explain this?

Cannot Reproduce

Details

Created September 17, 2013 at 8:28 PM
Updated February 7, 2015 at 1:38 PM
Resolved February 6, 2015 at 10:19 PM