From 7b95fe21da85509623f337a4c60e0caa17dfa524 Mon Sep 17 00:00:00 2001 From: Cameron Gandevia Date: Sun, 13 Feb 2011 11:19:20 -0800 Subject: [PATCH] FLUME-519 -Moved FlumeNode.loadOutputFormatPlugins to FormatFactory --- src/java/com/cloudera/flume/agent/FlumeNode.java | 44 +----------- .../flume/handlers/text/FormatFactory.java | 42 +++++++++++ .../com/cloudera/flume/master/FlumeMaster.java | 3 +- .../com/cloudera/flume/agent/TestFlumeNode.java | 74 -------------------- 4 files changed, 46 insertions(+), 117 deletions(-) diff --git a/src/java/com/cloudera/flume/agent/FlumeNode.java b/src/java/com/cloudera/flume/agent/FlumeNode.java index 24c9658..b9ebe5e 100644 --- a/src/java/com/cloudera/flume/agent/FlumeNode.java +++ b/src/java/com/cloudera/flume/agent/FlumeNode.java @@ -23,8 +23,8 @@ import java.io.IOException; import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; -import java.util.Properties; import java.util.Map.Entry; +import java.util.Properties; import javax.ws.rs.core.Application; @@ -53,7 +53,6 @@ import com.cloudera.flume.handlers.debug.ChokeManager; import com.cloudera.flume.handlers.endtoend.AckListener; import com.cloudera.flume.handlers.endtoend.CollectorAckListener; import com.cloudera.flume.handlers.text.FormatFactory; -import com.cloudera.flume.handlers.text.FormatFactory.OutputFormatBuilder; import com.cloudera.flume.reporter.MasterReportPusher; import com.cloudera.flume.reporter.NodeReportResource; import com.cloudera.flume.reporter.ReportEvent; @@ -327,45 +326,6 @@ public class FlumeNode implements Reportable { } /** - * Load output format plugins specified by - * {@link FlumeConfiguration#OUTPUT_FORMAT_PLUGIN_CLASSES}. Invalid plugins - * are discarded from the list with errors logged. - */ - public static void loadOutputFormatPlugins() { - String outputFormatPluginClasses = FlumeConfiguration.get().get( - FlumeConfiguration.OUTPUT_FORMAT_PLUGIN_CLASSES, ""); - String[] classes = outputFormatPluginClasses.split(",\\s*"); - - for (String className : classes) { - try { - Class cls = Class.forName(className); - if (OutputFormatBuilder.class.isAssignableFrom(cls)) { - OutputFormatBuilder builder = (OutputFormatBuilder) cls.newInstance(); - - FormatFactory.get().registerFormat(builder.getName(), builder); - - LOG.info("Registered output format plugin " + className); - } else { - LOG.warn("Ignoring output format plugin class " + className - + " - Does not subclass OutputFormatBuilder"); - } - } catch (ClassNotFoundException e) { - LOG.warn("Unable to load output format plugin class " + className - + " - Class not found"); - } catch (FlumeSpecException e) { - LOG.warn("Unable to load output format plugin class " + className - + " - Flume spec exception follows.", e); - } catch (InstantiationException e) { - LOG.warn("Unable to load output format plugin class " + className - + " - Unable to instantiate class.", e); - } catch (IllegalAccessException e) { - LOG.warn("Unable to load output format plugin class " + className - + " - Access violation.", e); - } - } - } - - /** * This also implements the Apache Commons Daemon interface's init */ public void init(String[] args) { @@ -541,7 +501,7 @@ public class FlumeNode implements Reportable { oneshot = true; } - loadOutputFormatPlugins(); + FormatFactory.loadOutputFormatPlugins(); // Instantiate the flume node. FlumeConfiguration conf = FlumeConfiguration.get(); diff --git a/src/java/com/cloudera/flume/handlers/text/FormatFactory.java b/src/java/com/cloudera/flume/handlers/text/FormatFactory.java index e1355e9..ad0eb00 100644 --- a/src/java/com/cloudera/flume/handlers/text/FormatFactory.java +++ b/src/java/com/cloudera/flume/handlers/text/FormatFactory.java @@ -27,6 +27,7 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.cloudera.flume.conf.FlumeConfiguration; import com.cloudera.flume.conf.FlumeSpecException; import com.cloudera.flume.handlers.avro.AvroDataFileOutputFormat; import com.cloudera.flume.handlers.avro.AvroJsonOutputFormat; @@ -47,6 +48,9 @@ import com.cloudera.flume.handlers.text.output.RawOutputFormat; @SuppressWarnings("serial") public class FormatFactory { + private static final Logger LOG = LoggerFactory + .getLogger(FormatFactory.class); + public abstract static class OutputFormatBuilder { public abstract OutputFormat build(String... args); @@ -212,4 +216,42 @@ public class FormatFactory { return false; } + /** + * Load output format plugins specified by + * {@link FlumeConfiguration#OUTPUT_FORMAT_PLUGIN_CLASSES}. Invalid plugins + * are discarded from the list with errors logged. + */ + public static void loadOutputFormatPlugins() { + String outputFormatPluginClasses = FlumeConfiguration.get().get( + FlumeConfiguration.OUTPUT_FORMAT_PLUGIN_CLASSES, ""); + String[] classes = outputFormatPluginClasses.split(",\\s*"); + + for (String className : classes) { + try { + Class cls = Class.forName(className); + if (OutputFormatBuilder.class.isAssignableFrom(cls)) { + OutputFormatBuilder builder = (OutputFormatBuilder) cls.newInstance(); + + FormatFactory.get().registerFormat(builder.getName(), builder); + + LOG.info("Registered output format plugin " + className); + } else { + LOG.warn("Ignoring output format plugin class " + className + + " - Does not subclass OutputFormatBuilder"); + } + } catch (ClassNotFoundException e) { + LOG.warn("Unable to load output format plugin class " + className + + " - Class not found"); + } catch (FlumeSpecException e) { + LOG.warn("Unable to load output format plugin class " + className + + " - Flume spec exception follows.", e); + } catch (InstantiationException e) { + LOG.warn("Unable to load output format plugin class " + className + + " - Unable to instantiate class.", e); + } catch (IllegalAccessException e) { + LOG.warn("Unable to load output format plugin class " + className + + " - Access violation.", e); + } + } + } } diff --git a/src/java/com/cloudera/flume/master/FlumeMaster.java b/src/java/com/cloudera/flume/master/FlumeMaster.java index c286a43..70d8f4d 100644 --- a/src/java/com/cloudera/flume/master/FlumeMaster.java +++ b/src/java/com/cloudera/flume/master/FlumeMaster.java @@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory; import com.cloudera.flume.agent.FlumeNode; import com.cloudera.flume.conf.FlumeConfiguration; +import com.cloudera.flume.handlers.text.FormatFactory; import com.cloudera.flume.master.flows.FlowConfigManager; import com.cloudera.flume.master.logical.LogicalConfigurationManager; import com.cloudera.flume.reporter.ReportEvent; @@ -506,7 +507,7 @@ public class FlumeMaster implements Reportable { System.exit(1); } - FlumeNode.loadOutputFormatPlugins(); + FormatFactory.loadOutputFormatPlugins(); String nodeconfig = FlumeConfiguration.get().getMasterSavefile(); diff --git a/src/javatest/com/cloudera/flume/agent/TestFlumeNode.java b/src/javatest/com/cloudera/flume/agent/TestFlumeNode.java index 8e7defd..fbb17bf 100644 --- a/src/javatest/com/cloudera/flume/agent/TestFlumeNode.java +++ b/src/javatest/com/cloudera/flume/agent/TestFlumeNode.java @@ -31,7 +31,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.apache.thrift.transport.TTransportException; -import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,16 +41,12 @@ import com.cloudera.flume.conf.Context; import com.cloudera.flume.conf.FlumeBuilder; import com.cloudera.flume.conf.FlumeConfigData; import com.cloudera.flume.conf.FlumeConfiguration; -import com.cloudera.flume.conf.FlumeSpecException; import com.cloudera.flume.conf.SourceFactory; import com.cloudera.flume.core.Event; import com.cloudera.flume.core.EventImpl; import com.cloudera.flume.core.EventSource; import com.cloudera.flume.handlers.debug.NullSink; import com.cloudera.flume.handlers.syslog.SyslogTcpSourceThreads; -import com.cloudera.flume.handlers.text.FormatFactory; -import com.cloudera.flume.handlers.text.FormatFactory.OutputFormatBuilder; -import com.cloudera.flume.handlers.text.output.OutputFormat; import com.cloudera.flume.master.CommandManager; import com.cloudera.flume.master.ConfigManager; import com.cloudera.flume.master.ConfigStore; @@ -342,73 +337,4 @@ public class TestFlumeNode { assertTrue(dfoDir.isDirectory()); FileUtil.rmr(tmpdir); } - - /** - * Test the output format plugin loading mechanism. We additionally test that - * the builtin output formats continue to work. - */ - @Test - public void testOutputFormatPluginLoader() { - FlumeConfiguration.get().set( - FlumeConfiguration.OUTPUT_FORMAT_PLUGIN_CLASSES, "java.lang.String"); - FlumeNode.loadOutputFormatPlugins(); - - try { - Assert.assertNotNull(FlumeBuilder.buildSink(new Context(), - "console(\"raw\")")); - Assert.assertNotNull(FlumeBuilder.buildSink(new Context(), - "console(\"avrojson\")")); - Assert.assertNotNull(FlumeBuilder.buildSink(new Context(), - "console(\"avrodata\")")); - Assert.assertNotNull(FlumeBuilder.buildSink(new Context(), - "console(\"syslog\")")); - Assert.assertNotNull(FlumeBuilder.buildSink(new Context(), - "console(\"log4j\")")); - Assert.assertNotNull(FlumeBuilder.buildSink(new Context(), "console()")); - } catch (FlumeSpecException e) { - LOG - .error( - "Unable to create a console sink with one of the built in output formats. Exception follows.", - e); - Assert - .fail("Unable to create a console sink with one of the built in output formats - " - + e.getMessage()); - } - } - - @Test - public void testCustomOutputPluginLoader() { - FlumeConfiguration.get().set( - FlumeConfiguration.OUTPUT_FORMAT_PLUGIN_CLASSES, - "com.cloudera.flume.agent.TestFlumeNode$TestOutputFormatPlugin"); - FlumeNode.loadOutputFormatPlugins(); - - try { - FlumeBuilder.buildSink(new Context(), "console(\"testformat\")"); - } catch (FlumeSpecException e) { - LOG - .error( - "Caught exception building console sink with testformat output format. Exception follows.", - e); - Assert - .fail("Unable to build a console sink with testformat output format"); - } - - /* Manually reset the registered plugins as best we can. */ - Assert.assertTrue(FormatFactory.get().unregisterFormat("testformat")); - } - - public static class TestOutputFormatPlugin extends OutputFormatBuilder { - - @Override - public OutputFormat build(String... args) { - // Do nothing. - return null; - } - - public String getName() { - return "testformat"; - } - - } } -- 1.7.3.2