Commits

Miki Tebeka  committed 6450fc5

Initial import

  • Participants

Comments (0)

Files changed (7)

+syntax: glob
+
+target
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.mikitebeka.mapred</groupId>
+  <artifactId>zipmapred</artifactId>
+  <packaging>jar</packaging>
+  <version>1.0-SNAPSHOT</version>
+  <name>zipmapred</name>
+  <url>https://bitbucket.org/tebeka/zipstream</url>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <cdh.version>cdh4.1.2</cdh.version>
+    <surefire.plugin.version>2.11</surefire.plugin.version>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>2.0.0-mr1-${cdh.version}</version>
+      <exclusions>
+        <exclusion> 
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+      <version>6.3.1</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${surefire.plugin.version}</version>
+      </plugin>
+    </plugins>
+  </build>
+</project>

File src/main/java/com/mikitebeka/mapred/ZipFileRecordReader.java

+package com.mikitebeka.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+public class ZipFileRecordReader implements RecordReader<Text, Text> {
+    private FileSystem fs = null;
+    private FSDataInputStream fsin = null;
+    private ZipLineIterator iter = null;
+
+    public ZipFileRecordReader(InputSplit inputSplit, JobConf job,
+            Reporter reporter) throws IOException {
+        FileSplit split = (FileSplit) inputSplit;
+        Path path = split.getPath();
+        fs = path.getFileSystem(job);
+
+        // Open the stream
+        fsin = fs.open(path);
+        iter = new ZipLineIterator(fsin);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.hadoop.mapred.RecordReader#next(java.lang.Object,
+     * java.lang.Object)
+     */
+    @Override
+    public boolean next(Text key, Text value) throws IOException {
+        if (iter == null) {
+            throw new IOException();
+        }
+
+        if (!iter.hasNext()) {
+            return false;
+        }
+
+        key.set(iter.getName());
+        value.set(iter.next());
+        return true;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.hadoop.mapred.RecordReader#createKey()
+     */
+    @Override
+    public Text createKey() {
+        return new Text();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.hadoop.mapred.RecordReader#createValue()
+     */
+    @Override
+    public Text createValue() {
+        return new Text();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.hadoop.mapred.RecordReader#getPos()
+     */
+    @Override
+    public long getPos() throws IOException {
+        if (iter == null) {
+            throw new IOException();
+        }
+
+        return iter.getPos();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.hadoop.mapred.RecordReader#close()
+     */
+    @Override
+    public void close() throws IOException {
+        if (iter == null) {
+            return;
+        }
+
+        iter.close();
+        iter = null;
+        fsin.close();
+        fsin = null;
+        fs.close();
+        fs = null;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.hadoop.mapred.RecordReader#getProgress()
+     */
+    @Override
+    public float getProgress() throws IOException {
+        return iter.getProgress();
+    }
+}

File src/main/java/com/mikitebeka/mapred/ZipInputFormat.java

+package com.mikitebeka.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+class ZipInputFormat extends FileInputFormat<Text, Text> {
+    @Override
+    protected boolean isSplitable(FileSystem fs, Path file) {
+        return false;
+    }
+
+    @Override
+    public RecordReader<Text, Text> getRecordReader(InputSplit genericSplit,
+            JobConf job, Reporter reporter) throws IOException {
+        return new ZipFileRecordReader(genericSplit, job, reporter);
+    }
+
+}

File src/main/java/com/mikitebeka/mapred/ZipLineIterator.java

+package com.mikitebeka.mapred;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+public class ZipLineIterator implements Iterator<String> {
+
+    private ZipInputStream zin = null;
+    private String name;
+    private int pos = 0;
+    private long size;
+    private String nextLine;
+    private BufferedReader reader;
+
+    public ZipLineIterator(InputStream in) throws IOException {
+        zin = new ZipInputStream(in);
+        ZipEntry entry = zin.getNextEntry();
+        if (entry == null) {
+            return;
+        }
+
+        size = entry.getSize();
+        name = entry.getName();
+        reader = new BufferedReader(new InputStreamReader(zin));
+
+        nextLine = reader.readLine();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.Iterator#hasNext()
+     */
+    @Override
+    public boolean hasNext() {
+        return nextLine != null;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.Iterator#next()
+     */
+    @Override
+    public String next() {
+        String line = nextLine;
+        try {
+            nextLine = reader.readLine();
+        } catch (IOException e) {
+            nextLine = null;
+        }
+
+        return line;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.Iterator#remove()
+     */
+    @Override
+    public void remove() {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * @return the name
+     */
+    public String getName() {
+        return name;
+    }
+
+    public void close() throws IOException {
+        if (zin == null) {
+            return;
+        }
+        zin.close();
+    }
+
+    public float getProgress() {
+        if (size == 0) {
+            return 1.0f;
+        }
+
+        return (float) (((double) pos) / size);
+    }
+
+    /**
+     * @return the pos
+     */
+    public int getPos() {
+        return pos;
+    }
+
+}

File src/test/java/com/mikitebeka/mapred/ZipLineReaderTest.java

+package com.mikitebeka.mapred;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Test
+public class ZipLineReaderTest {
+
+    private ZipLineIterator getZip(String file) throws IOException {
+        InputStream in = getClass().getResourceAsStream("/" + file);
+        return new ZipLineIterator(in);
+    }
+
+    public void TestSmallFile() throws IOException {
+        ZipLineIterator it = getZip("3.zip");
+        ArrayList<String> lines = new ArrayList<String>();
+        while (it.hasNext()) {
+            lines.add(it.next());
+        }
+
+        @SuppressWarnings("serial")
+        ArrayList<String> expected = new ArrayList<String>() {
+            {
+                add("1");
+                add("2");
+                add("3");
+            }
+        };
+
+        Assert.assertTrue(lines.equals(expected));
+    }
+
+    public void TestBigFile() throws IOException {
+        return;
+        // FIXME: Generate a big one
+        
+        /*
+        ZipLineIterator it = getZip("events.zip");
+
+        int i = 0;
+        String line = null;
+
+        for (; it.hasNext(); i++) {
+            line = it.next();
+        }
+        Assert.assertEquals(i, 8330);
+        String prefix = "...";
+        Assert.assertTrue(line.startsWith(prefix));
+        */
+    }
+}

File src/test/resources/3.zip

Binary file added.