From 13470858224b529488928abb25783674514535d3 Mon Sep 17 00:00:00 2001 From: Bruce Mitchener Date: Thu, 17 Mar 2011 07:15:23 -0400 Subject: [PATCH] FLUME-464: Remove deprecated GZIPCompression setting. --- RELEASENOTES | 4 ++ .../cloudera/flume/conf/FlumeConfiguration.java | 6 -- .../flume/handlers/hdfs/CustomDfsSink.java | 22 -------- .../handlers/hdfs/TestEscapedCustomOutputDfs.java | 55 +------------------ 4 files changed, 7 insertions(+), 80 deletions(-) diff --git a/RELEASENOTES b/RELEASENOTES index 5c2beb9..1e201c9 100644 --- a/RELEASENOTES +++ b/RELEASENOTES @@ -6,6 +6,10 @@ These are 'tacksink' and 'trawsink'. Additionally, the old aliases 'tsink' and 'tsource' are no longer present. Use the full names 'thriftSink' and 'thriftSource' instead. +The setting 'flume.collector.dfs.compress.gzip' was deprecated in +a previous release. It has now been removed. You should use the +'flume.collector.dfs.compress.codec' setting instead. + Flume 0.9.3 =========== diff --git a/src/java/com/cloudera/flume/conf/FlumeConfiguration.java b/src/java/com/cloudera/flume/conf/FlumeConfiguration.java index b12dc58..4e6aaa9 100644 --- a/src/java/com/cloudera/flume/conf/FlumeConfiguration.java +++ b/src/java/com/cloudera/flume/conf/FlumeConfiguration.java @@ -181,7 +181,6 @@ public class FlumeConfiguration extends Configuration { public static final String COLLECTOR_DFS_DIR = "flume.collector.dfs.dir"; public static final String COLLECTOR_ROLL_MILLIS = "flume.collector.roll.millis"; public static final String COLLECTOR_OUTPUT_FORMAT = "flume.collector.output.format"; - public static final String COLLECTOR_DFS_COMPRESS_GZIP = "flume.collector.dfs.compress.gzip"; public static final String COLLECTOR_DFS_COMPRESS_CODEC = "flume.collector.dfs.compress.codec"; // TODO(henry) move these to flume.master - they now tell the master which @@ -620,11 +619,6 @@ public class FlumeConfiguration extends Configuration { return get(COLLECTOR_DFS_DIR, "file://tmp/flume-${user.name}/collected"); } - @Deprecated - public boolean getCollectorDfsCompressGzipStatus() { - return getBoolean(COLLECTOR_DFS_COMPRESS_GZIP, false); - } - public String getCollectorDfsCompressCodec() { return get(COLLECTOR_DFS_COMPRESS_CODEC, "None"); } diff --git a/src/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java b/src/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java index aa43d41..2d1f7e8 100644 --- a/src/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java +++ b/src/java/com/cloudera/flume/handlers/hdfs/CustomDfsSink.java @@ -102,28 +102,6 @@ public class CustomDfsSink extends EventSink.Base { FlumeConfiguration conf = FlumeConfiguration.get(); FileSystem hdfs; - // use v0.9.1 compression settings - if (conf.getCollectorDfsCompressGzipStatus()) { - LOG.warn("Config property " - + FlumeConfiguration.COLLECTOR_DFS_COMPRESS_GZIP - + " is deprecated, please use " - + FlumeConfiguration.COLLECTOR_DFS_COMPRESS_CODEC - + " set to GzipCodec instead"); - CompressionCodec gzipC = new GzipCodec(); - - //See Below for comments on this - if(gzipC instanceof Configurable){ - ((Configurable)gzipC).setConf(conf); - } - Compressor gzCmp = gzipC.createCompressor(); - dstPath = new Path(path + gzipC.getDefaultExtension()); - hdfs = dstPath.getFileSystem(conf); - writer = hdfs.create(dstPath); - writer = gzipC.createOutputStream(writer, gzCmp); - LOG.info("Creating HDFS gzip compressed file: " + dstPath.toString()); - return; - } - String codecName = conf.getCollectorDfsCompressCodec(); List> codecs = CompressionCodecFactory .getCodecClasses(FlumeConfiguration.get()); diff --git a/src/javatest/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java b/src/javatest/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java index 4f8606a..471bdfd 100644 --- a/src/javatest/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java +++ b/src/javatest/com/cloudera/flume/handlers/hdfs/TestEscapedCustomOutputDfs.java @@ -234,7 +234,7 @@ public class TestEscapedCustomOutputDfs { String expected = new String(exWriter.toByteArray()); // check the output to make sure it is what we expected. - // read the gzip file and verify the contents + // read the bzip2 file and verify the contents BZip2Codec bz2Codec = new BZip2Codec(); InputStream bz2in = bz2Codec.createInputStream(new FileInputStream(f .getPath() + "/sub-foo.bz2")); @@ -284,7 +284,7 @@ public class TestEscapedCustomOutputDfs { String expected = new String(exWriter.toByteArray()); // check the output to make sure it is what we expected. - // read the gzip file and verify the contents + // read the bzip2 file and verify the contents BZip2Codec bz2Codec = new BZip2Codec(); InputStream bz2in = bz2Codec.createInputStream(new FileInputStream(f .getPath() + "/sub-foo.bz2")); @@ -301,54 +301,6 @@ public class TestEscapedCustomOutputDfs { } /** - * Test to write few log lines, compress using gzip, write to disk, read back - * the compressed file and verify the written lines. - * - * @throws IOException - * @throws InterruptedException - */ - @Test - public void testGzipOutputFormat() throws IOException, InterruptedException { - // set the output format. - FlumeConfiguration conf = FlumeConfiguration.get(); - conf.set(FlumeConfiguration.COLLECTOR_OUTPUT_FORMAT, "syslog"); - conf.set(FlumeConfiguration.COLLECTOR_DFS_COMPRESS_GZIP, "true"); - - // build a sink that outputs to that format. - File f = FileUtil.mktempdir(); - SinkBuilder builder = EscapedCustomDfsSink.builder(); - EventSink snk = builder.build(new Context(), "file:///" + f.getPath() - + "/sub-%{service}"); - Event e = new EventImpl("this is a test message".getBytes()); - Attributes.setString(e, "service", "foo"); - snk.open(); - snk.append(e); - snk.close(); - - ByteArrayOutputStream exWriter = new ByteArrayOutputStream(); - SyslogEntryFormat fmt = new SyslogEntryFormat(); - fmt.format(exWriter, e); - exWriter.close(); - String expected = new String(exWriter.toByteArray()); - - // check the output to make sure it is what we expected. - // read the gzip file and verify the contents - - GZIPInputStream gzin = new GZIPInputStream(new FileInputStream(f.getPath() - + "/sub-foo.gz")); - byte[] buf = new byte[1]; - StringBuilder output = new StringBuilder(); - - while ((gzin.read(buf)) > 0) { - output.append(new String(buf)); - } - gzin.close();// Must close for windows to delete - assertEquals(expected, output.toString()); - - assertTrue("temp folder successfully deleted", FileUtil.rmr(f)); - } - - /** * Test to write few log lines, compress using default, write to disk, read * back the compressed file and verify the written lines. * @@ -361,7 +313,6 @@ public class TestEscapedCustomOutputDfs { FlumeConfiguration conf = FlumeConfiguration.get(); conf.set(FlumeConfiguration.COLLECTOR_OUTPUT_FORMAT, "syslog"); conf.set(FlumeConfiguration.COLLECTOR_DFS_COMPRESS_CODEC, "DefaultCodec"); - conf.set(FlumeConfiguration.COLLECTOR_DFS_COMPRESS_GZIP, "false"); // build a sink that outputs to that format. File f = FileUtil.mktempdir(); @@ -381,7 +332,7 @@ public class TestEscapedCustomOutputDfs { String expected = new String(exWriter.toByteArray()); // check the output to make sure it is what we expected. - // read the gzip file and verify the contents + // read the file and verify the contents DefaultCodec defaultCodec = new DefaultCodec(); defaultCodec.setConf(conf); InputStream defaultIn = defaultCodec.createInputStream(new FileInputStream( -- 1.7.3.2