From f75a16aed2caf5c11da97a430bab45ebefb3f488 Mon Sep 17 00:00:00 2001
From: Marc Sturlese
Date: Fri, 22 Apr 2011 21:38:31 +0200
Subject: [PATCH] I have written code to write to hdfs in this style:
 roll(1000) [ escapedCustomDfs("hdfs://namenode/flume/file-%{rolltag}") ]
 but writing each event body as the value in a SequenceFile (and NullWritable
 as the key). There is a seqfileCollectorSink which works like the
 collectorSink but invokes escapedThriftSeqfileDfsSink.

# I grant license to Cloudera for inclusion in Cloudera works (as per the Apache License §5)
---
 .../flume/collector/SeqfileCollectorSink.java      |  324 ++++++++++++++++++++
 .../handlers/hdfs/EscapedThriftSeqfileDfsSink.java |  154 ++++++++++
 .../flume/handlers/hdfs/ThriftSeqfileDfsSink.java  |  137 +++++++++
 3 files changed, 615 insertions(+), 0 deletions(-)
 create mode 100644 flume-core/src/main/java/com/cloudera/flume/collector/SeqfileCollectorSink.java
 create mode 100644 flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedThriftSeqfileDfsSink.java
 create mode 100644 flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/ThriftSeqfileDfsSink.java

diff --git a/flume-core/src/main/java/com/cloudera/flume/collector/SeqfileCollectorSink.java b/flume-core/src/main/java/com/cloudera/flume/collector/SeqfileCollectorSink.java
new file mode 100644
index 0000000..06152c8
--- /dev/null
+++ b/flume-core/src/main/java/com/cloudera/flume/collector/SeqfileCollectorSink.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudera.flume.collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.cloudera.flume.agent.FlumeNode;
+import com.cloudera.flume.conf.Context;
+import com.cloudera.flume.conf.FlumeConfiguration;
+import com.cloudera.flume.conf.FlumeSpecException;
+import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
+import com.cloudera.flume.core.CompositeSink;
+import com.cloudera.flume.core.Event;
+import com.cloudera.flume.core.EventSink;
+import com.cloudera.flume.core.EventSinkDecorator;
+import com.cloudera.flume.core.MaskDecorator;
+import com.cloudera.flume.handlers.debug.InsistentAppendDecorator;
+import com.cloudera.flume.handlers.debug.InsistentOpenDecorator;
+import com.cloudera.flume.handlers.debug.StubbornAppendSink;
+import com.cloudera.flume.handlers.endtoend.AckChecksumChecker;
+import com.cloudera.flume.handlers.endtoend.AckListener;
+import com.cloudera.flume.handlers.rolling.ProcessTagger;
+import com.cloudera.flume.handlers.rolling.RollSink;
+import com.cloudera.flume.handlers.rolling.Tagger;
+import com.cloudera.flume.handlers.rolling.TimeTrigger;
+import com.cloudera.flume.reporter.ReportEvent;
+import com.cloudera.flume.reporter.Reportable;
+import com.cloudera.util.BackoffPolicy;
+import com.cloudera.util.CumulativeCappedExponentialBackoff;
+import com.cloudera.util.Pair;
+import com.google.common.base.Preconditions;
+import java.util.List;
+
+/**
+ * This collector sink is the high-level specification a user would use. The
+ * subsink spec allows for specifying batching, gunzip, multiple sinks, or
+ * whatever else a user would want to specify. The key feature of any subsink
+ * specified here is that it is part of a roller, and that any accrued acks
+ * are sent when the roller closes the subsink.
+ */
+public class SeqfileCollectorSink extends EventSink.Base {
+
+  static final Logger LOG = LoggerFactory.getLogger(SeqfileCollectorSink.class);
+  final EventSink snk;
+  AckAccumulator accum = new AckAccumulator();
+  final AckListener ackDest;
+  final String snkSpec;
+  // This is a container for acks that should be ready for delivery when the
+  // hdfs sink is closed/flushed
+  Set<String> rollAckSet = new HashSet<String>();
+  // References package exposed for testing
+  final RollSink roller;
+
+  SeqfileCollectorSink(Context ctx, String snkSpec, long millis, AckListener ackDest)
+      throws FlumeSpecException {
+    this(ctx, snkSpec, millis, new ProcessTagger(), 250, ackDest);
+  }
+
+  SeqfileCollectorSink(Context ctx, final String snkSpec, final long millis,
+      final Tagger tagger, long checkmillis, AckListener ackDest) {
+    this.ackDest = ackDest;
+    this.snkSpec = snkSpec;
+    roller = new RollSink(ctx, snkSpec, new TimeTrigger(tagger, millis),
+        checkmillis) {
+      // This wraps the normal roll sink with an extra roll detection
+      // decorator that triggers ack delivery on close.
+
+      @Override
+      public EventSink newSink(Context ctx) throws IOException {
+        String tag = tagger.newTag();
+        EventSink drain;
+        try {
+          drain = new CompositeSink(ctx, snkSpec);
+        } catch (FlumeSpecException e) {
+          throw new IOException("Unable to instantiate '" + snkSpec + "'", e);
+        }
+        return new ThriftSeqfileRollDetectDeco(drain, tag);
+      }
+    };
+
+    long initMs = FlumeConfiguration.get().getInsistentOpenInitBackoff();
+    long cumulativeMaxMs = FlumeConfiguration.get().getFailoverMaxCumulativeBackoff();
+    long maxMs = FlumeConfiguration.get().getFailoverMaxSingleBackoff();
+    BackoffPolicy backoff1 = new CumulativeCappedExponentialBackoff(initMs,
+        maxMs, cumulativeMaxMs);
+    BackoffPolicy backoff2 = new CumulativeCappedExponentialBackoff(initMs,
+        maxMs, cumulativeMaxMs);
+
+    // the collector snk has ack checking logic, retry and reopen logic, and
+    // needs an extra mask before rolling, writing to disk and forwarding acks
+    // (roll detect).
+
+    // { ackChecksumChecker => insistentAppend => stubbornAppend =>
+    //   insistentOpen => mask("rolltag") => roll(xx) { rollDetect =>
+    //   subsink } }
+    EventSink tmp = new MaskDecorator<EventSink>(roller, "rolltag");
+    tmp = new InsistentOpenDecorator<EventSink>(tmp, backoff1);
+    tmp = new StubbornAppendSink<EventSink>(tmp);
+    tmp = new InsistentAppendDecorator<EventSink>(tmp, backoff2);
+    snk = new AckChecksumChecker<EventSink>(tmp, accum);
+  }
+
+  /**
+   * This is a compatibility mode for older versions of the tests.
+   */
+  SeqfileCollectorSink(Context ctx, String path, String filename, long millis,
+      final Tagger tagger, long checkmillis, AckListener ackDest)
+      throws FlumeSpecException {
+    this(ctx, "escapedThriftSeqfileDfs(\"" + StringEscapeUtils.escapeJava(path)
+        + "\",\"" + StringEscapeUtils.escapeJava(filename) + "%{rolltag}"
+        + "\" )", millis, tagger, checkmillis, ackDest);
+  }
+
+  String curRollTag;
+
+  /**
+   * This is a helper class that wraps the body of the collector sink so that
+   * it gives notifications when a roll has happened. Because only close has
+   * sane flushing semantics in hdfs <= v0.20.x, we need to collect acks; data
+   * is safe only after a close on the hdfs file happens.
+   */
+  class ThriftSeqfileRollDetectDeco extends EventSinkDecorator<EventSink> {
+
+    String tag;
+
+    public ThriftSeqfileRollDetectDeco(EventSink s, String tag) {
+      super(s);
+      this.tag = tag;
+    }
+
+    @Override
+    public void open() throws IOException, InterruptedException {
+      // set the collector's current tag to curRollTag.
+      LOG.debug("opening roll detect deco {}", tag);
+      curRollTag = tag;
+      super.open();
+      LOG.debug("opened roll detect deco {}", tag);
+    }
+
+    @Override
+    public void close() throws IOException, InterruptedException {
+      LOG.debug("closing roll detect deco {}", tag);
+      super.close();
+      flushRollAcks();
+      LOG.debug("closed roll detect deco {}", tag);
+    }
+
+    void flushRollAcks() throws IOException {
+      AckListener master = ackDest;
+      Collection<String> acktags;
+      synchronized (rollAckSet) {
+        acktags = new ArrayList<String>(rollAckSet);
+        rollAckSet.clear();
+        LOG.debug("Roll closed, pushing acks for " + tag + " :: " + acktags);
+      }
+
+      for (String at : acktags) {
+        master.end(at);
+      }
+    }
+  };
+
+  /**
+   * This accumulates ack tags in rollAckSet so that they can be pushed to the
+   * master when the hdfs file associated with the rolltag is closed.
+   */
+  class AckAccumulator implements AckListener {
+
+    @Override
+    public void end(String group) throws IOException {
+      synchronized (rollAckSet) {
+        LOG.debug("Adding acktag {} to rolltag {}", group, curRollTag);
+        rollAckSet.add(group);
+        LOG.debug("Current rolltag acktag mapping: {}", rollAckSet);
+      }
+    }
+
+    @Override
+    public void err(String group) throws IOException {
+    }
+
+    @Override
+    public void expired(String key) throws IOException {
+    }
+
+    @Override
+    public void start(String group) throws IOException {
+    }
+  };
+
+  @Override
+  public void append(Event e) throws IOException, InterruptedException {
+    snk.append(e);
+    super.append(e);
+  }
+
+  @Override
+  public void close() throws IOException, InterruptedException {
+    snk.close();
+  }
+
+  @Override
+  public void open() throws IOException, InterruptedException {
+    snk.open();
+  }
+
+  @Override
+  public String getName() {
+    return "SeqfileCollector";
+  }
+
+  @Override
+  public ReportEvent getMetrics() {
+    ReportEvent rpt = new ReportEvent(getName());
+    return rpt;
+  }
+
+  @Override
+  public Map<String, Reportable> getSubMetrics() {
+    Map<String, Reportable> map = new HashMap<String, Reportable>();
+    map.put(snk.getName(), snk);
+    return map;
+  }
+
+  @Deprecated
+  @Override
+  public void getReports(String namePrefix, Map<String, ReportEvent> reports) {
+    super.getReports(namePrefix, reports);
+    snk.getReports(namePrefix + getName() + ".", reports);
+  }
+
+  public EventSink getSink() {
+    return snk;
+  }
+
+  public static SinkBuilder builder() {
+    return new SinkBuilder() {
+
+      @Override
+      public EventSink build(Context context, String... argv) {
+        Preconditions.checkArgument(argv.length >= 1 && argv.length <= 2,
+            "usage: seqfileCollector[(rollmillis)] { subsink }");
+        String snkSpec = argv[0];
+
+        long millis = FlumeConfiguration.get().getCollectorRollMillis();
+        if (argv.length >= 2) {
+          millis = Long.parseLong(argv[1]);
+        }
+        try {
+          EventSink deco = new SeqfileCollectorSink(context, snkSpec, millis,
+              FlumeNode.getInstance().getCollectorAckListener());
+          return deco;
+        } catch (FlumeSpecException e) {
+          LOG.error("SeqfileCollectorDecorator spec error " + e, e);
+          throw new IllegalArgumentException(
+              "usage: seqfileCollector[(rollmillis)] { subsink }" + e);
+        }
+      }
+    };
+  }
+
+  public static SinkBuilder hdfsBuilder() {
+    return new SinkBuilder() {
+
+      @Override
+      public EventSink build(Context context, String... argv) {
+        Preconditions.checkArgument(argv.length <= 3,
+            "usage: seqfileCollectorSink[(dfsdir,path[,rollmillis])]");
+        String logdir = FlumeConfiguration.get().getCollectorDfsDir(); // default
+        long millis = FlumeConfiguration.get().getCollectorRollMillis();
+        String prefix = "";
+        if (argv.length >= 1) {
+          logdir = argv[0]; // override
+        }
+        if (argv.length >= 2) {
+          prefix = argv[1];
+        }
+        if (argv.length >= 3) {
+          millis = Long.parseLong(argv[2]);
+        }
+        try {
+          EventSink snk = new SeqfileCollectorSink(context, logdir, prefix,
+              millis, new ProcessTagger(), 250,
+              FlumeNode.getInstance().getCollectorAckListener());
+          return snk;
+        } catch (FlumeSpecException e) {
+          LOG.error("SeqfileCollectorSink spec error " + e, e);
+          throw new IllegalArgumentException(
+              "usage: seqfileCollectorSink[(dfsdir,path[,rollmillis])]" + e);
+        }
+      }
+    };
+  }
+
+  public static List<Pair<String, SinkBuilder>> getSinkBuilders() {
+    List<Pair<String, SinkBuilder>> builders =
+        new ArrayList<Pair<String, SinkBuilder>>();
+    builders.add(new Pair<String, SinkBuilder>("seqfileCollectorSink", hdfsBuilder()));
+    builders.add(new Pair<String, SinkBuilder>("seqfileCollector", builder()));
+    return builders;
+  }
+}
diff --git a/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedThriftSeqfileDfsSink.java b/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedThriftSeqfileDfsSink.java
new file mode 100644
index 0000000..e5f1d52
--- /dev/null
+++ b/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/EscapedThriftSeqfileDfsSink.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudera.flume.handlers.hdfs;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import com.cloudera.flume.conf.Context;
+import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
+import com.cloudera.flume.core.Event;
+import com.cloudera.flume.core.EventSink;
+import com.cloudera.util.Pair;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Writes events to a SequenceFile given a hadoop uri path. If no uri is
+ * specified, it defaults to the filesystem configured by the fs.default.name
+ * config variable. The path and file name may contain escape sequences that
+ * are substituted with tags drawn from each event.
+ *
+ * TODO (jon) eventually, the CustomDfsSink should be replaced with just a
+ * EventSink.
+ *
+ * TODO (jon) this is gross, please deprecate me.
+ */
+public class EscapedThriftSeqfileDfsSink extends EventSink.Base {
+  final static Logger LOG = Logger.getLogger(EscapedThriftSeqfileDfsSink.class
+      .getName());
+  final String path;
+
+  ThriftSeqfileDfsSink writer;
+
+  // We keep a - potentially unbounded - set of writers around to deal with
+  // different tags on events. Therefore this feature should be used with some
+  // care (where the set of possible paths is small) until we do something
+  // more sensible with resource management.
+  final Map<String, ThriftSeqfileDfsSink> sfWriters =
+      new HashMap<String, ThriftSeqfileDfsSink>();
+
+  // Used to short-circuit around doing regex matches when we know there are
+  // no templates to be replaced.
+  boolean shouldSub = true;
+  private String filename = "";
+  protected String absolutePath = "";
+
+  public EscapedThriftSeqfileDfsSink(String path, String filename) {
+    this.path = path;
+    this.filename = filename;
+    shouldSub = Event.containsTag(path) || Event.containsTag(filename);
+    absolutePath = path;
+    if (filename != null && filename.length() > 0) {
+      if (!absolutePath.endsWith(Path.SEPARATOR)) {
+        absolutePath += Path.SEPARATOR;
+      }
+      absolutePath += this.filename;
+    }
+  }
+
+  protected ThriftSeqfileDfsSink openWriter(String p) throws IOException {
+    LOG.info("Opening " + p);
+    ThriftSeqfileDfsSink w = new ThriftSeqfileDfsSink(p);
+    w.open();
+    return w;
+  }
+
+  /**
+   * Writes the message to an HDFS file whose path is substituted with tags
+   * drawn from the supplied event.
+   */
+  @Override
+  public void append(Event e) throws IOException, InterruptedException {
+    ThriftSeqfileDfsSink w = writer;
+    if (shouldSub) {
+      String realPath = e.escapeString(absolutePath);
+      w = sfWriters.get(realPath);
+      if (w == null) {
+        w = openWriter(realPath);
+        sfWriters.put(realPath, w);
+      }
+    }
+    w.append(e);
+    super.append(e);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (shouldSub) {
+      for (Entry<String, ThriftSeqfileDfsSink> e : sfWriters.entrySet()) {
+        LOG.info("Closing " + e.getKey());
+        e.getValue().close();
+      }
+    } else {
+      LOG.info("Closing " + absolutePath);
+      if (writer == null) {
+        LOG.warn("EscapedThriftSeqfileDfsSink's Writer for '" + absolutePath
+            + "' was already closed!");
+        return;
+      }
+      writer.close();
+      writer = null;
+    }
+  }
+
+  @Override
+  public void open() throws IOException {
+    if (!shouldSub) {
+      writer = openWriter(absolutePath);
+    }
+  }
+
+  public static SinkBuilder builder() {
+    return new SinkBuilder() {
+      @Override
+      public EventSink build(Context context, String... args) {
+        Preconditions.checkArgument(args.length >= 1 && args.length <= 2,
+            "usage: escapedThriftSeqfileDfs(\"[(hdfs|file|s3n|...)://namenode[:port]]/path\""
+                + "[, filename])");
+        String filename = "";
+        if (args.length >= 2) {
+          filename = args[1];
+        }
+        return new EscapedThriftSeqfileDfsSink(args[0], filename);
+      }
+    };
+  }
+
+  public static List<Pair<String, SinkBuilder>> getSinkBuilders() {
+    List<Pair<String, SinkBuilder>> builders =
+        new ArrayList<Pair<String, SinkBuilder>>();
+    builders.add(new Pair<String, SinkBuilder>("escapedThriftSeqfileDfs", builder()));
+    return builders;
+  }
+}
diff --git a/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/ThriftSeqfileDfsSink.java b/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/ThriftSeqfileDfsSink.java
new file mode 100644
index 0000000..c37e76d
--- /dev/null
+++ b/flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/ThriftSeqfileDfsSink.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudera.flume.handlers.hdfs;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import com.cloudera.flume.conf.Context;
+import com.cloudera.flume.conf.FlumeConfiguration;
+import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
+import com.cloudera.flume.core.Event;
+import com.cloudera.flume.core.EventSink;
+import com.cloudera.flume.reporter.ReportEvent;
+import com.cloudera.util.Pair;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+
+/**
+ * This creates a raw hadoop dfs SequenceFile and writes each event body to it
+ * as a BytesWritable value with a NullWritable key.
+ */
+public class ThriftSeqfileDfsSink extends EventSink.Base {
+
+  final static Logger LOG = Logger.getLogger(ThriftSeqfileDfsSink.class.getName());
+  Writer writer;
+  AtomicLong count = new AtomicLong();
+  String path;
+  Path dstPath;
+  BytesWritable bytesObject;
+  boolean firstTime;
+  FileSystem hdfs;
+  FlumeConfiguration conf;
+
+  public ThriftSeqfileDfsSink(String path) {
+    Preconditions.checkArgument(path != null);
+    this.path = path;
+    this.writer = null;
+    this.bytesObject = new BytesWritable();
+  }
+
+  @Override
+  public void append(Event e) throws IOException, InterruptedException {
+    if (writer == null) {
+      throw new IOException("Append failed, did you open the writer?");
+    }
+    bytesObject.set(e.getBody(), 0, e.getBody().length);
+    if (e.getBody().length != 0) {
+      writer.append(NullWritable.get(), bytesObject);
+    }
+    count.getAndIncrement();
+    super.append(e);
+  }
+
+  @Override
+  public void close() throws IOException {
+    LOG.info("Closing HDFS file: " + dstPath);
+    LOG.info("done writing sequence file to hdfs");
+    if (writer != null) {
+      writer.close();
+    }
+    writer = null;
+  }
+
+  @Override
+  public void open() throws IOException {
+    conf = FlumeConfiguration.get();
+    dstPath = new Path(path);
+    hdfs = dstPath.getFileSystem(conf);
+    writer = SequenceFile.createWriter(hdfs, conf, dstPath, NullWritable.class,
+        BytesWritable.class);
+    LOG.info("Opening HDFS file: " + dstPath.toString());
+  }
+
+  public static SinkBuilder builder() {
+    return new SinkBuilder() {
+
+      @Override
+      public EventSink build(Context context, String... args) {
+        if (args.length != 1) {
+          throw new IllegalArgumentException(
+              "usage: thriftSeqfileDfsSink(\"[(hdfs|file|s3n|...)://namenode[:port]]/path\")");
+        }
+        return new ThriftSeqfileDfsSink(args[0]);
+      }
+    };
+  }
+
+  @Override
+  public String getName() {
+    return "thriftSeqfileDfsSink";
+  }
+
+  @Override
+  public ReportEvent getMetrics() {
+    ReportEvent rpt = super.getMetrics();
+    rpt.setLongMetric(ReportEvent.A_COUNT, count.get());
+    return rpt;
+  }
+
+  @Override
+  public ReportEvent getReport() {
+    ReportEvent rpt = super.getReport();
+    rpt.setLongMetric(ReportEvent.A_COUNT, count.get());
+    return rpt;
+  }
+
+  public static List<Pair<String, SinkBuilder>> getSinkBuilders() {
+    List<Pair<String, SinkBuilder>> builders =
+        new ArrayList<Pair<String, SinkBuilder>>();
+    builders.add(new Pair<String, SinkBuilder>("thriftSeqfileDfsSink", builder()));
+    return builders;
+  }
+}
-- 
1.7.1
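
Usage sketch (not part of the patch; the namenode URI, target directory, file prefix, and
30000 ms roll interval are illustrative assumptions, and the spec syntax simply follows the
usage strings registered above). With these builders loaded, a collector node could be
configured either by wrapping the new dfs sink in the new collector sink:

  seqfileCollector(30000) { escapedThriftSeqfileDfs("hdfs://namenode/flume/", "file-%{rolltag}") }

or with the shorthand form, which appends the %{rolltag} suffix to the file prefix itself:

  seqfileCollectorSink("hdfs://namenode/flume/", "file-", 30000)

In both cases each event body is written as a BytesWritable value (with a NullWritable key)
into a SequenceFile that is rolled every 30 seconds.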