From 3d0f82de3b327c8077747623ce366ad6e72a5d9c Mon Sep 17 00:00:00 2001 From: Bruce Mitchener Date: Mon, 7 Feb 2011 20:51:43 +0700 Subject: [PATCH] FLUME-250. Add support for compression to the Avro data file output format. --- .../handlers/avro/AvroDataFileOutputFormat.java | 43 +++++++++++- .../flume/handlers/avro/TestAvroDataFile.java | 69 ++++++++++++++++++++ 2 files changed, 109 insertions(+), 3 deletions(-) diff --git a/src/java/com/cloudera/flume/handlers/avro/AvroDataFileOutputFormat.java b/src/java/com/cloudera/flume/handlers/avro/AvroDataFileOutputFormat.java index ff0cb43..164c621 100644 --- a/src/java/com/cloudera/flume/handlers/avro/AvroDataFileOutputFormat.java +++ b/src/java/com/cloudera/flume/handlers/avro/AvroDataFileOutputFormat.java @@ -20,7 +20,9 @@ package com.cloudera.flume.handlers.avro; import java.io.IOException; import java.io.OutputStream; +import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.io.DatumWriter; import org.apache.avro.reflect.ReflectData; @@ -52,8 +54,14 @@ public class AvroDataFileOutputFormat extends AbstractOutputFormat { DataFileWriter sink = null; OutputStream cachedOut = null; + CodecFactory codecFactory = null; public AvroDataFileOutputFormat() { + this(null); + } + + public AvroDataFileOutputFormat(CodecFactory codecFactory) { + this.codecFactory = codecFactory; } @Override @@ -62,6 +70,15 @@ public class AvroDataFileOutputFormat extends AbstractOutputFormat { // first time, no current OutputStream or sink. cachedOut = o; sink = new DataFileWriter(writer); + if (codecFactory != null) { + try { + sink.setCodec(codecFactory); + } catch (AvroRuntimeException ex) { + sink = null; + cachedOut = null; + throw new IOException("Error setting Avro datafile codec", ex); + } + } sink.create(schema, o); // this opens } @@ -87,9 +104,29 @@ public class AvroDataFileOutputFormat extends AbstractOutputFormat { @Override public OutputFormat build(String... args) { - Preconditions.checkArgument(args.length == 0, "usage: avrodata"); - - OutputFormat format = new AvroDataFileOutputFormat(); + Preconditions.checkArgument(args.length <= 2, "usage: avrodata([codec, [compressionLevel]])"); + + String codecName = null; + int compressionLevel = -1; + if (args.length > 0) { + codecName = args[0]; + if (args.length > 1) { + compressionLevel = Integer.parseInt(args[1]); + } + } + + CodecFactory codecFactory = null; + try { + if ((codecName != null) && codecName.equals("deflate") && (compressionLevel != -1)) { + codecFactory = CodecFactory.deflateCodec(compressionLevel); + } else if (codecName != null) { + codecFactory = CodecFactory.fromString(codecName); + } + } catch (AvroRuntimeException ex) { + throw new IllegalArgumentException("Invalid Avro data file codec", ex); + } + + OutputFormat format = new AvroDataFileOutputFormat(codecFactory); format.setBuilder(this); return format; diff --git a/src/javatest/com/cloudera/flume/handlers/avro/TestAvroDataFile.java b/src/javatest/com/cloudera/flume/handlers/avro/TestAvroDataFile.java index da92e11..1d9c993 100644 --- a/src/javatest/com/cloudera/flume/handlers/avro/TestAvroDataFile.java +++ b/src/javatest/com/cloudera/flume/handlers/avro/TestAvroDataFile.java @@ -17,7 +17,9 @@ */ package com.cloudera.flume.handlers.avro; +import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.util.Arrays; @@ -38,6 +40,8 @@ import com.cloudera.flume.core.EventImpl; import com.cloudera.flume.core.EventSink; import com.cloudera.flume.core.EventUtil; import com.cloudera.flume.handlers.debug.MemorySinkSource; +import com.cloudera.flume.handlers.text.FormatFactory; +import com.cloudera.flume.handlers.text.output.OutputFormat; /** * This test takes a small canned batch of events, writes them to a avro data @@ -78,6 +82,71 @@ public class TestAvroDataFile { Event expected = mem.next(); Assert.assertTrue(Arrays.equals(eout.getBody(), expected.getBody())); } + } + + /** + * This is a helper method that is a lot like the above, except that it + * directly creates the output format so that we can configure it, since + * this isn't possible via the configuration language currently. + */ + public void avroDataFileWriteReadHelper(String... args) throws IOException, + FlumeSpecException, InterruptedException { + + MemorySinkSource mem = MemorySinkSource.cannedData("test ", 5); + + // setup sink. + File f = File.createTempFile("avrodata", ".avro"); + f.deleteOnExit(); + FileOutputStream fos = new FileOutputStream(f); + LOG.info("filename before escaping: " + f.getAbsolutePath()); + OutputFormat out = FormatFactory.get().getOutputFormat("avrodata", args); + mem.open(); + Event e = mem.next(); + while (e != null) { + out.format(fos, e); + e = mem.next(); + } + + mem.open(); + DatumReader dtm = new ReflectDatumReader( + EventImpl.class); + DataFileReader dr = new DataFileReader(f, dtm); + + EventImpl eout = null; + for (Object o : dr) { + eout = (EventImpl) o; // TODO (jon) fix AVRO -- this is gross + Event expected = mem.next(); + Assert.assertTrue(Arrays.equals(eout.getBody(), expected.getBody())); + } + } + @Test + public void testAvroDataFileFormatDefault() throws IOException, + FlumeSpecException, InterruptedException { + avroDataFileWriteReadHelper(); + } + + @Test + public void testAvroDataFileFormatNullCodec() throws IOException, + FlumeSpecException, InterruptedException { + avroDataFileWriteReadHelper("null"); + } + + @Test + public void testAvroDataFileFormatDeflateCodec() throws IOException, + FlumeSpecException, InterruptedException { + avroDataFileWriteReadHelper("deflate"); + } + + @Test + public void testAvroDataFileFormatDeflateConfiguredCodec() throws IOException, + FlumeSpecException, InterruptedException { + avroDataFileWriteReadHelper("deflate", "9"); + } + + @Test(expected=IllegalArgumentException.class) + public void testAvroDataFileFormatInvalidCodec() throws IOException, + FlumeSpecException, InterruptedException { + avroDataFileWriteReadHelper("invalid"); } } -- 1.7.3.2