file output doesn't re-start on broken pipe

Description

When writing to a named pipe, logstash's file output breaks and stays broken if the pipe breaks, e.g. when Nagios restarts. (I'd use the nagios output, but it can only cope with service notices; I also need host notices.)

:timestamp=>"2013-03-19T17:28:06.300000-0400",
:message=>"Output thread exception",
:plugin=><LogStash::Outputs::File message_format=>"%{@message}", path=>"/var/lib/nagios3/rw/nagios.cmd", type=>"nagios">,
:exception=>java.io.IOException: Broken pipe,
:backtrace=>[
"java.io.FileOutputStream.writeBytes(Native Method)",
"java.io.FileOutputStream.write(FileOutputStream.java:318)",
"sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)",
"sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)",
"sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)",
"sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135)",
"java.io.OutputStreamWriter.write(OutputStreamWriter.java:220)",
"java.io.Writer.write(Writer.java:157)",
"sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)",
"sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)",
"java.lang.reflect.Method.invoke(Method.java:601)",
"org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:455)",
"org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:316)",
"org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:61)",
"org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)",
"org.jruby.runtime.callsite.CachingCallSite.callVarargs(CachingCallSite.java:103)",
"rubyjit.IOWriter$$write_5E85C1E7860F16854CAD01298624C5602D378586860840791._file_(file:/opt/logstash/logstash-1.1.10.dev-monolithic.jar!/logstash/outputs/file.rb:154)",
"rubyjit.IOWriter$$write_5E85C1E7860F16854CAD01298624C5602D378586860840791._file_(file:/opt/logstash/logstash-1.1.10.dev-monolithic.jar!/logstash/outputs/file.rb)",
"org.jruby.ast.executable.AbstractScript._file_(AbstractScript.java:42)",
"org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:181)",
"org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)",
"rubyjit.LogStash::Outputs::File$$receive_1279DB8CB17A197B985E98B58EC056BEE5E28667860840791._file_(file:/opt/logstash/logstash-1.1.10.dev-monolithic.jar!/logstash/outputs/file.rb:66)",
"rubyjit.LogStash::Outputs::File$$receive_1279DB8CB17A197B985E98B58EC056BEE5E28667860840791._file_(file:/opt/logstash/logstash-1.1.10.dev-monolithic.jar!/logstash/outputs/file.rb)",
"org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:181)",
"org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)",
"rubyjit.LogStash::Outputs::Base$$handle_7B645A9ED8B31829850C50DECFB7DDAB84C881C0860840791._file_(file:/opt/logstash/logstash-1.1.10.dev-monolithic.jar!/logstash/outputs/base.rb:55)",
"rubyjit.LogStash::Outputs::Base$$handle_7B645A9ED8B31829850C50DECFB7DDAB84C881C0860840791._file_(file:/opt/logstash/logstash-1.1.10.dev-monolithic.jar!/logstash/outputs/base.rb)",
"org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:181)",
"org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)",
"org.jruby.ast.CallOneArgNode.interpret(CallOneArgNode.java:57)",
"org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)",
"org.jruby.ast.BlockNode.interpret(BlockNode.java:71)",
"org.jruby.ast.WhileNode.interpret(WhileNode.java:131)",
"org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)",
"org.jruby.ast.RescueNode.executeBody(RescueNode.java:224)",
"org.jruby.ast.RescueNode.interpret(RescueNode.java:119)",
"org.jruby.ast.BeginNode.interpret(BeginNode.java:83)",
"org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)",
"org.jruby.ast.BlockNode.interpret(BlockNode.java:71)",
"org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:75)",
"org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:225)",
"org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:204)",
"org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:346)",
"org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:204)",
"org.jruby.ast.FCallSpecialArgNode.interpret(FCallSpecialArgNode.java:41)",
"org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)",
"org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)",
"org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:209)",
"org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:197)",
"org.jruby.runtime.Interpreted19Block.call(Interpreted19Block.java:128)",
"org.jruby.runtime.Block.call(Block.java:89)",
"org.jruby.RubyProc.call(RubyProc.java:261)",
"org.jruby.RubyProc.call(RubyProc.java:213)",
"org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:98)",
"java.lang.Thread.run(Thread.java:722)"],
:level=>:warn

Discovered while testing.
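A minimal sketch of the behavior this report asks for: on a broken pipe, drop the dead handle and reopen instead of leaving the output thread permanently broken. This is illustrative, not the actual plugin code; the `ReopeningWriter` name and the block-based opener are assumptions.

```ruby
# Minimal sketch, not the actual file-output code: when the reader end of
# the pipe goes away (Errno::EPIPE), reopen the target and retry once
# instead of staying broken. Names here are illustrative.
class ReopeningWriter
  def initialize(&opener)
    @opener = opener       # block that (re)opens the target, e.g. the named pipe
    @io = @opener.call
  end

  def write(line)
    attempts = 0
    begin
      @io.write(line)
      @io.flush
    rescue Errno::EPIPE, IOError
      raise if (attempts += 1) > 1
      @io.close rescue nil # discard the dead handle
      @io = @opener.call   # reopen (e.g. once Nagios is back)
      retry
    end
  end
end
```

With a writer like this, a Nagios restart would cost at most the events written while the pipe had no reader, rather than killing the output for good.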

Activity

January 15, 2014 at 5:08 PM

Ooo, that's really cool! Very nice!

Matthew Buckett January 15, 2014 at 4:57 PM

I've got a pull request out for https://logstash.jira.com/browse/LOGSTASH-1799#icft=LOGSTASH-1799 which allows control of restarts for the pipe input.

I was going to use cron because I need to run with some Kerberos/AFS tickets, and that way I don't have to play with k5start to keep the Kerberos ticket alive and renew the AFS tokens. Cron also makes it easy to get output about problems with the run over email, and it allows a very similar setup in testing and in actual production.
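For reference, the restart control described above could look roughly like this. This is a hedged sketch, not the pull request's actual code; `run_pipe` and the `restart` option name are assumed:

```ruby
# Rough sketch of pipe-input restart control, assuming a `restart`
# option: re-run the command in a loop by default, or run it once and
# let logstash exit (the cron-friendly behavior described above).
# `run_pipe` and the option name are illustrative, not the PR's code.
def run_pipe(command, restart: true)
  loop do
    IO.popen(command) do |io|
      io.each_line { |line| yield line }  # hand each line to the pipeline
    end
    break unless restart  # restart => false: one pass, then exit
  end
end
```

With `restart => false`, the command's EOF ends the run and logstash exits, which matches launching it from cron.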

January 15, 2014 at 4:48 PM

Ahh, that's an interesting use case. Any particular reason for launching logstash through a cron job instead of just keeping it running? Long-running logstash is the more common approach, particularly in cases like this where the exec {} input can easily handle the periodic nature of your logs. Adding a feature to the pipe {} and/or exec {} input to prevent looping/retries is certainly another option you might want to consider.

Matthew Buckett January 15, 2014 at 3:30 PM

The reason I originally started with the pipe input is that it makes it easy to test the configuration: I would have a pipe that took the first 10000 lines from a file for testing.

input { pipe { command => "gzip -dc /var/log/apache/access.1.gz | head -10000" } }

This way I can run logstash and check what the output is. Once it's finished processing, logstash exits, so I know it's complete; then I can review the output, change the config, and re-run logstash.

Matthew Buckett January 15, 2014 at 3:20 PM
Edited

The exec input doesn't really work for me, as I don't want to specify an interval at which to run the command; I want to start logstash from a cron job.

Fixed

Details

Created March 19, 2013 at 9:37 PM
Updated May 16, 2014 at 11:48 AM
Resolved September 3, 2013 at 11:32 PM