java.nio.channels.ClosedChannelException when writing compressed output in a mapper-only streaming job

Issue #1 new
Jacob Nelson
created an issue

Thanks for writing this very useful bit of code!

I'm using it to process some zipped XML files with a mapper-only Hadoop Streaming job, and I ran into an odd error when I tried to have Hadoop compress the result. (The actual goal is writing out compressed Avro records, but just switching on compression is simpler and yields the same error.)

This command:

hadoop jar hadoop-streaming-2.6.0.jar -libjars zipstream-1.1-SNAPSHOT.jar \
  -D mapreduce.output.fileoutputformat.compress=true \
  -D mapred.reduce.tasks=0 \
  -mapper /bin/cat \
  -inputformat com.mikitebeka.mapred.ZipInputFormat \
  -input small.xml.zip -output xml-output-96

generates this result:

15/07/29 19:57:24 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
packageJobJar: [/tmp/hadoop-unjar735248561067953064/] [] /tmp/streamjob2030896801304615681.jar tmpDir=null
15/07/29 19:57:25 INFO client.RMProxy: Connecting to ResourceManager at hadoop.sampa/10.1.2.5:8032
15/07/29 19:57:25 INFO client.RMProxy: Connecting to ResourceManager at hadoop.sampa/10.1.2.5:8032
15/07/29 19:57:26 INFO mapred.FileInputFormat: Total input paths to process : 1
15/07/29 19:57:26 INFO mapreduce.JobSubmitter: number of splits:1
15/07/29 19:57:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1435969832915_0232
15/07/29 19:57:26 INFO impl.YarnClientImpl: Submitted application application_1435969832915_0232
15/07/29 19:57:26 INFO mapreduce.Job: The url to track the job: http://hadoop.sampa:8088/proxy/application_1435969832915_0232/
15/07/29 19:57:26 INFO mapreduce.Job: Running job: job_1435969832915_0232
15/07/29 19:57:32 INFO mapreduce.Job: Job job_1435969832915_0232 running in uber mode : false
15/07/29 19:57:32 INFO mapreduce.Job:  map 0% reduce 0%
15/07/29 19:57:37 INFO mapreduce.Job: Task Id : attempt_1435969832915_0232_m_000000_0, Status : FAILED
Error: java.nio.channels.ClosedChannelException
        at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1622)
        at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:104)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:83)
        at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
        at org.apache.hadoop.io.compress.CompressorStream.close(CompressorStream.java:106)
        at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
        at org.apache.hadoop.mapred.TextOutputFormat$LineRecordWriter.close(TextOutputFormat.java:108)
        at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.close(MapTask.java:841)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:462)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

15/07/29 19:57:41 INFO mapreduce.Job: Task Id : attempt_1435969832915_0232_m_000000_1, Status : FAILED
Error: java.nio.channels.ClosedChannelException
        (identical stack trace to the one above)

15/07/29 19:57:46 INFO mapreduce.Job: Task Id : attempt_1435969832915_0232_m_000000_2, Status : FAILED
Error: java.nio.channels.ClosedChannelException
        (identical stack trace to the one above)

15/07/29 19:57:53 INFO mapreduce.Job:  map 100% reduce 0%
15/07/29 19:57:53 INFO mapreduce.Job: Job job_1435969832915_0232 failed with state FAILED due to: Task failed task_1435969832915_0232_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0

15/07/29 19:57:54 INFO mapreduce.Job: Counters: 9
        Job Counters
                Failed map tasks=4
                Launched map tasks=4
                Other local map tasks=3
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=54100
                Total time spent by all reduces in occupied slots (ms)=0
                Total time spent by all map tasks (ms)=13525
                Total vcore-seconds taken by all map tasks=13525
                Total megabyte-seconds taken by all map tasks=55398400
15/07/29 19:57:54 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!

The job works fine if I omit either the -inputformat option or the compression flag; only the combination fails.

Eventually I found a workaround that seems to produce correct results. Following the example of this Accumulo ticket, when I set fs.hdfs.impl.disable.cache=true, the job runs fine and the output looks correct. The full command now looks like this:

hadoop jar hadoop-streaming-2.6.0.jar -libjars zipstream-1.1-SNAPSHOT.jar \
  -D mapreduce.output.fileoutputformat.compress=true \
  -D fs.hdfs.impl.disable.cache=true \
  -D mapred.reduce.tasks=0 \
  -mapper /bin/cat \
  -inputformat com.mikitebeka.mapred.ZipInputFormat \
  -input small.xml.zip -output xml-output-94

But I don't understand what disabling this cache does, or whether it's actually safe and reasonably fast to do. Do you know what's going on here?
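
For what it's worth, here is my rough mental model of what the flag changes, written up as a small standalone sketch rather than anything verified against the ZipInputFormat source (the namenode URI and paths are placeholders): FileSystem.get() normally hands out one cached instance per scheme/authority/user, so if anything on the input side closes its handle, the DFS client backing the output stream is closed along with it, and the trailer bytes that CompressorStream.finish() writes at task shutdown land on a closed channel. Disabling the cache gives each caller an independent instance.

import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsCacheSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        URI hdfs = URI.create("hdfs://hadoop.sampa:8020");    // placeholder namenode URI

        // Default behaviour: FileSystem.get() returns one cached instance per
        // (scheme, authority, user), so both "sides" share a single DFS client.
        FileSystem writerFs = FileSystem.get(hdfs, conf);     // output-format side
        FileSystem readerFs = FileSystem.get(hdfs, conf);     // input-format side
        System.out.println("same instance: " + (writerFs == readerFs));  // true

        FSDataOutputStream out = writerFs.create(new Path("/tmp/fs-cache-sketch"));
        out.write("some data\n".getBytes(StandardCharsets.UTF_8));

        // If the input side closes its handle, the shared client goes down too...
        readerFs.close();

        try {
            // ...so writing or closing through the other handle should now fail
            // (ClosedChannelException or "Filesystem closed"), which is the same
            // situation CompressorStream.finish() hits while the task shuts down.
            out.write("more data\n".getBytes(StandardCharsets.UTF_8));
            out.close();
        } catch (Exception e) {
            System.out.println("write/close failed: " + e);
        }

        // With the cache disabled, every get() returns an independent instance,
        // so closing one no longer affects the other.
        conf.setBoolean("fs.hdfs.impl.disable.cache", true);
        FileSystem a = FileSystem.get(hdfs, conf);
        FileSystem b = FileSystem.get(hdfs, conf);
        System.out.println("same instance: " + (a == b));    // false
        a.close();
        FSDataOutputStream out2 = b.create(new Path("/tmp/fs-cache-sketch-2"));
        out2.write("still writable\n".getBytes(StandardCharsets.UTF_8));
        out2.close();
        b.close();
    }
}

If that model is right, the flag is just a workaround that trades a little per-task connection overhead for correctness, and the cleaner fix would be for the reader side not to close the shared FileSystem (or to obtain a non-cached instance of its own).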

Thanks!

Comments (1)

  1. Miki Tebeka repo owner

    Thanks for reporting!

    I'll try to have a look, but it's been a while since I worked with Hadoop MR, so it might take some time. I'll also ask around to see if someone else is interested in fixing this.
