From 63a0f9aad8307a4b8258334bebbe3d5975efa495 Mon Sep 17 00:00:00 2001 From: Bruce Mitchener Date: Wed, 16 Mar 2011 20:24:12 -0400 Subject: [PATCH] FLUME-427: Make codec compression setting be case-independent. --- .../flume/handlers/hdfs/CustomDfsSink.java | 4 +- .../handlers/hdfs/TestEscapedCustomOutputDfs.java | 52 +++++++++++++++++++- 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/src/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java b/src/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java index 27bf915..aa43d41 100644 --- a/src/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java +++ b/src/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java @@ -134,7 +134,7 @@ public class CustomDfsSink extends EventSink.Base { for (Class cls : codecs) { codecStrs.add(cls.getSimpleName()); - if (cls.getSimpleName().equals(codecName)) { + if (cls.getSimpleName().equalsIgnoreCase(codecName)) { try { codec = cls.newInstance(); } catch (InstantiationException e) { @@ -146,7 +146,7 @@ public class CustomDfsSink extends EventSink.Base { } if (codec == null) { - if (!codecName.equals("None")) { + if (!codecName.equalsIgnoreCase("None")) { LOG.warn("Unsupported compression codec " + codecName + ". Please choose from: " + codecStrs); } diff --git a/src/javatest/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java b/src/javatest/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java index a19385a..7a10556 100644 --- a/src/javatest/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java +++ b/src/javatest/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java @@ -203,7 +203,7 @@ public class TestEscapedCustomOutputDfs { } /** - * Test to write few log lines, compress using gzip, write to disk, read back + * Test to write few log lines, compress using bzip2, write to disk, read back * the compressed file and verify the written lines. 
* * @throws IOException @@ -251,6 +251,56 @@ public class TestEscapedCustomOutputDfs { } /** + * Test to write few log lines, compress using bzip2, write to disk, read back + * the compressed file and verify the written lines. + * + * This test uses the wrong case for the codec name. + * + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testBZip2CodecWrongCase() throws IOException, InterruptedException { + // set the output format. + FlumeConfiguration conf = FlumeConfiguration.get(); + conf.set(FlumeConfiguration.COLLECTOR_OUTPUT_FORMAT, "syslog"); + conf.set(FlumeConfiguration.COLLECTOR_DFS_COMPRESS_CODEC, "bzip2Codec"); + + // build a sink that outputs to that format. + File f = FileUtil.mktempdir(); + SinkBuilder builder = EscapedCustomDfsSink.builder(); + EventSink snk = builder.build(new Context(), "file:///" + f.getPath() + + "/sub-%{service}"); + Event e = new EventImpl("this is a test message".getBytes()); + Attributes.setString(e, "service", "foo"); + snk.open(); + snk.append(e); + snk.close(); + + ByteArrayOutputStream exWriter = new ByteArrayOutputStream(); + SyslogEntryFormat fmt = new SyslogEntryFormat(); + fmt.format(exWriter, e); + exWriter.close(); + String expected = new String(exWriter.toByteArray()); + + // check the output to make sure it is what we expected. + // read the bz2 file and verify the contents + BZip2Codec bz2Codec = new BZip2Codec(); + InputStream bz2in = bz2Codec.createInputStream(new FileInputStream(f .getPath() + "/sub-foo.bz2")); + byte[] buf = new byte[1]; + StringBuilder output = new StringBuilder(); + + while ((bz2in.read(buf)) > 0) { + output.append(new String(buf)); + } + bz2in.close(); // Must close for windows to delete + assertEquals(expected, output.toString()); + + assertTrue("temp folder successfully deleted", FileUtil.rmr(f)); + } + + /** * Test to write few log lines, compress using gzip, write to disk, read back * the compressed file and verify the written lines. * -- 1.7.3.2