From d7f031b69ac94bbbf70355a92b86a0cad8c63d60 Mon Sep 17 00:00:00 2001 From: Bruce Mitchener Date: Tue, 15 Feb 2011 15:57:33 +0700 Subject: [PATCH] FLUME-521: Remove deprecated Thrift functionality and names. --- src/docs/UserGuide/Advanced | 4 +- src/docs/UserGuide/Plugins | 8 +- .../cloudera/flume/agent/AgentFailChainSink.java | 4 +- .../com/cloudera/flume/conf/SinkFactoryImpl.java | 7 +- .../com/cloudera/flume/conf/SourceFactoryImpl.java | 4 +- .../handlers/thrift/ThriftAckedEventSink.java | 141 -------------------- .../thrift/ThriftFlumeEventServerImpl.java | 27 ---- .../flume/handlers/thrift/ThriftRawEventSink.java | 88 ------------ .../com/cloudera/flume/conf/TestFlumeSpecGen.java | 4 +- .../flume/handlers/thrift/TestThriftSinks.java | 55 -------- src/thrift/flume.thrift | 9 +- 11 files changed, 12 insertions(+), 339 deletions(-) delete mode 100644 src/java/com/cloudera/flume/handlers/thrift/ThriftAckedEventSink.java delete mode 100644 src/java/com/cloudera/flume/handlers/thrift/ThriftRawEventSink.java diff --git a/src/docs/UserGuide/Advanced b/src/docs/UserGuide/Advanced index ed9d680..06ac257 100644 --- a/src/docs/UserGuide/Advanced +++ b/src/docs/UserGuide/Advanced @@ -357,8 +357,8 @@ machine lives in multiple tiers. ==== Arbitrary Data Flows and Custom Architectures -With tsinks and tsource, data can be sent through multiple nodes. If ack -injection and ack checking decorators are properly inserted, you can achieve +With thriftSink and thriftSource, data can be sent through multiple nodes. If +ack injection and ack checking decorators are properly inserted, you can achieve reliability. diff --git a/src/docs/UserGuide/Plugins b/src/docs/UserGuide/Plugins index 5b6ffa9..87a5867 100644 --- a/src/docs/UserGuide/Plugins +++ b/src/docs/UserGuide/Plugins @@ -593,19 +593,15 @@ mechanisms (similar to the output format mechanism). prioritized by priority (higher priority first) and then age (older messages first) -+tsource(_port_)+ :: Same as RPC but specifically thrift RPC ++thriftSource(_port_)+ :: Same as RPC but specifically thrift RPC .Flume's Deprecated Sinks Many of these sinks are left over from earlier iterations. [horizontal] -+tsink+ :: -+tacksink(...)+ :: thrift acked sink - -+trawsink(...)+ :: thrift using writable serialization instead of thrift -serialization +*None listed here for now* END OF HIDDEN FOR NOW HALF BAKED ///////////////// diff --git a/src/java/com/cloudera/flume/agent/AgentFailChainSink.java b/src/java/com/cloudera/flume/agent/AgentFailChainSink.java index 8606574..79a0576 100644 --- a/src/java/com/cloudera/flume/agent/AgentFailChainSink.java +++ b/src/java/com/cloudera/flume/agent/AgentFailChainSink.java @@ -172,7 +172,7 @@ public class AgentFailChainSink extends EventSink.Base { ArrayList thriftified = new ArrayList(); if (list == null || list.size() == 0) { - String sink = String.format("tsink(\"%s\",%d)", FlumeConfiguration.get() + String sink = String.format("thriftSink(\"%s\",%d)", FlumeConfiguration.get() .getCollectorHost(), FlumeConfiguration.get().getCollectorPort()); thriftified.add(sink); return thriftified; @@ -184,7 +184,7 @@ public class AgentFailChainSink extends EventSink.Base { String collector = sock.getLeft(); int port = sock.getRight(); // This needs to be a physical address/node, not a logical node. - String sink = String.format("tsink(\"%s\",%d)", collector, port); + String sink = String.format("thriftSink(\"%s\",%d)", collector, port); thriftified.add(sink); } return thriftified; diff --git a/src/java/com/cloudera/flume/conf/SinkFactoryImpl.java b/src/java/com/cloudera/flume/conf/SinkFactoryImpl.java index 61d26b0..6485f57 100644 --- a/src/java/com/cloudera/flume/conf/SinkFactoryImpl.java +++ b/src/java/com/cloudera/flume/conf/SinkFactoryImpl.java @@ -73,9 +73,7 @@ import com.cloudera.flume.handlers.hdfs.SeqfileEventSink; import com.cloudera.flume.handlers.irc.IrcSink; import com.cloudera.flume.handlers.rpc.RpcSink; import com.cloudera.flume.handlers.syslog.SyslogTcpSink; -import com.cloudera.flume.handlers.thrift.ThriftAckedEventSink; import com.cloudera.flume.handlers.thrift.ThriftEventSink; -import com.cloudera.flume.handlers.thrift.ThriftRawEventSink; import com.cloudera.flume.master.availability.FailoverChainSink; import com.cloudera.flume.reporter.aggregator.AccumulatorSink; import com.cloudera.flume.reporter.aggregator.CounterSink; @@ -149,10 +147,7 @@ public class SinkFactoryImpl extends SinkFactory { { "regexhisto", RegexGroupHistogramSink.builderSimple() }, { "regexhistospec", RegexGroupHistogramSink.builder() }, - // deprecated - { "tsink", ThriftEventSink.builder() }, - { "tacksink", ThriftAckedEventSink.builder() }, - { "trawsink", ThriftRawEventSink.builder() }, }; + }; // The actual types are static Object[][] decoList = { diff --git a/src/java/com/cloudera/flume/conf/SourceFactoryImpl.java b/src/java/com/cloudera/flume/conf/SourceFactoryImpl.java index 6d4a6aa..ed7b111 100644 --- a/src/java/com/cloudera/flume/conf/SourceFactoryImpl.java +++ b/src/java/com/cloudera/flume/conf/SourceFactoryImpl.java @@ -80,8 +80,8 @@ public class SourceFactoryImpl extends SourceFactory { { "rpcSource", RpcSource.builder() }, { "thriftSource", ThriftEventSource.builder() }, { "avroSource", AvroEventSource.builder() }, - { "tSource", ThriftEventSource.builder() }, - { "text", TextFileSource.builder() }, { "tail", TailSource.builder() }, + { "text", TextFileSource.builder() }, + { "tail", TailSource.builder() }, { "multitail", TailSource.multiTailBuilder() }, { "tailDir", TailDirSource.builder() }, { "seqfile", SeqfileEventSource.builder() }, diff --git a/src/java/com/cloudera/flume/handlers/thrift/ThriftAckedEventSink.java b/src/java/com/cloudera/flume/handlers/thrift/ThriftAckedEventSink.java deleted file mode 100644 index 4365cab..0000000 --- a/src/java/com/cloudera/flume/handlers/thrift/ThriftAckedEventSink.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to Cloudera, Inc. under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Cloudera, Inc. licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.cloudera.flume.handlers.thrift; - -import java.io.IOException; - -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.cloudera.flume.conf.Context; -import com.cloudera.flume.conf.FlumeConfiguration; -import com.cloudera.flume.conf.SinkFactory.SinkBuilder; -import com.cloudera.flume.core.Event; -import com.cloudera.flume.core.EventImpl; -import com.cloudera.flume.core.EventSink; -import com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer.Client; - -/** - * This is a sink that sends events to a remote host/port. The event append rpc - * has a return value so this is stop-and-wait (now sliding window). This - * however is the simplest way to get network backpressure that does not - * overwhelm the receiver. - */ -public class ThriftAckedEventSink extends EventSink.Base { - - static final Logger LOG = LoggerFactory.getLogger(ThriftAckedEventSink.class); - - String host; - int port; - Client client; - TTransport transport; - boolean nonblocking; - - public ThriftAckedEventSink(String host, int port, boolean nonblocking) { - this.host = host; - this.port = port; - this.nonblocking = nonblocking; - } - - public ThriftAckedEventSink(String host, int port) { - this(host, port, false); - } - - @Override - public void append(Event e) throws IOException, InterruptedException { - ThriftFlumeEvent tfe = ThriftEventAdaptor.convert(e); - - try { - EventStatus res = client.ackedAppend(tfe); - super.append(e); - if (res == EventStatus.ACK || res == EventStatus.COMMITED) { - return; - } - throw new IOException( - "Append return withs ERR status (received by dropped)"); - - } catch (TException e1) { - e1.printStackTrace(); - throw new IOException("Append failed " + e); - } - } - - @Override - public void close() throws IOException { - if (transport != null) { - transport.close(); - transport = null; - LOG.info("ThriftEventSink on port " + port + " closed"); - } - } - - @Override - public void open() throws IOException { - - try { - if (nonblocking) { - // non blocking must use "Framed transport" - transport = new TSocket(host, port); - transport = new TFramedTransport(transport); - } else { - transport = new TSocket(host, port); - } - - TProtocol protocol = new TBinaryProtocol(transport); - transport.open(); - client = new Client(protocol); - LOG.info("ThriftEventSink open on port " + port + " opened"); - - } catch (TTransportException e) { - e.printStackTrace(); - throw new IOException("Failed to open thrift event sink at " + host + ":" - + port + " : " + e); - } - } - - public static SinkBuilder builder() { - return new SinkBuilder() { - @Override - public EventSink build(Context context, String... args) { - if (args.length > 2) { - throw new IllegalArgumentException( - "usage: thrift([hostname, [portno]]) "); - } - String host = FlumeConfiguration.get().getCollectorHost(); - int port = FlumeConfiguration.get().getCollectorPort(); - if (args.length >= 1) { - host = args[0]; - } - - if (args.length >= 2) { - port = Integer.parseInt(args[1]); - } - - return new ThriftAckedEventSink(host, port); - } - }; - } - -} diff --git a/src/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEventServerImpl.java b/src/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEventServerImpl.java index d9f27cb..09759d7 100644 --- a/src/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEventServerImpl.java +++ b/src/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEventServerImpl.java @@ -60,31 +60,4 @@ class ThriftFlumeEventServerImpl implements Iface { throw new TException("Caught exception " + e, e); } } - - @Override - public void rawAppend(RawEvent evt) throws TException { - try { - WriteableEvent e = WriteableEvent.create(evt.getRaw()); - sink.append(e); - } catch (Exception e) { - // TODO figure out how to deal with different exns - throw new TException("Caught exception " + e, e); - } - - } - - @Override - public EventStatus ackedAppend(ThriftFlumeEvent evt) throws TException { - Preconditions.checkState(sink != null); - Preconditions.checkNotNull(evt); - try { - sink.append(new ThriftEventAdaptor(evt)); - return EventStatus.ACK; - } catch (Exception e) { - // TODO figure out how to deal with different exns - LOG.error(e.getMessage(), e); - return EventStatus.ERR; - } - } - } diff --git a/src/java/com/cloudera/flume/handlers/thrift/ThriftRawEventSink.java b/src/java/com/cloudera/flume/handlers/thrift/ThriftRawEventSink.java deleted file mode 100644 index 5c96bdd..0000000 --- a/src/java/com/cloudera/flume/handlers/thrift/ThriftRawEventSink.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to Cloudera, Inc. under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Cloudera, Inc. licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.cloudera.flume.handlers.thrift; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.thrift.TException; - -import com.cloudera.flume.conf.Context; -import com.cloudera.flume.conf.FlumeConfiguration; -import com.cloudera.flume.conf.SinkFactory.SinkBuilder; -import com.cloudera.flume.core.Event; -import com.cloudera.flume.core.EventImpl; -import com.cloudera.flume.core.EventSink; -import com.cloudera.flume.handlers.hdfs.WriteableEvent; - -/** - * This is a sink that will send events to a remote host/port. Instead of using - * a ThriftEventAdaptor to convert to a ThriftFlumeEvent, we serialize the event - * using its Writeable version and then just send the raw bytes. - * - * The original version would do two layers of serialization. This seems to be - * wasted effort when we can simplify it into one, especially with the more - * complicated serialization/deserialization.) - * - * Before: (POJO->ThriftFlumeEvent->thrift-> ThriftFlumeEvent -> POJO) - * - * Now: (POJO -> bytes -> thrift -> bytes -> POJO). - */ -public class ThriftRawEventSink extends ThriftEventSink { - - public ThriftRawEventSink(String host, int port) { - super(host, port); - } - - @Override - public void append(Event e) throws IOException { - WriteableEvent we = new WriteableEvent(e); - RawEvent re = new RawEvent(ByteBuffer.wrap(we.toBytes())); - - try { - client.rawAppend(re); - updateAppendStats(e); - } catch (TException e1) { - e1.printStackTrace(); - throw new IOException("Append failed " + e); - } - } - - public static SinkBuilder builder() { - return new SinkBuilder() { - @Override - public EventSink build(Context context, String... args) { - if (args.length > 2) { - throw new IllegalArgumentException( - "usage: trawsink([hostname, [portno]]) "); - } - String host = FlumeConfiguration.get().getCollectorHost(); - int port = FlumeConfiguration.get().getCollectorPort(); - if (args.length >= 1) { - host = args[0]; - } - - if (args.length >= 2) { - port = Integer.parseInt(args[1]); - } - - return new ThriftRawEventSink(host, port); - } - }; - } -} diff --git a/src/javatest/com/cloudera/flume/conf/TestFlumeSpecGen.java b/src/javatest/com/cloudera/flume/conf/TestFlumeSpecGen.java index 46ad3d6..321eb41 100644 --- a/src/javatest/com/cloudera/flume/conf/TestFlumeSpecGen.java +++ b/src/javatest/com/cloudera/flume/conf/TestFlumeSpecGen.java @@ -79,7 +79,7 @@ public class TestFlumeSpecGen { String out = FlumeSpecGen.genEventSink(tree); assertEquals(orig, out); - orig = "tsink( 1234 )"; + orig = "thriftSink( 1234 )"; tree = FlumeBuilder.parseSink(orig); out = FlumeSpecGen.genEventSink(tree); assertEquals(orig, out); @@ -101,7 +101,7 @@ public class TestFlumeSpecGen { String out = FlumeSpecGen.genEventSink(tree); assertEquals(orig, out); - orig = "{ intervalSampler( 20 ) => tsink( 1234 ) }"; + orig = "{ intervalSampler( 20 ) => thriftSink( 1234 ) }"; tree = FlumeBuilder.parseSink(orig); out = FlumeSpecGen.genEventSink(tree); assertEquals(orig, out); diff --git a/src/javatest/com/cloudera/flume/handlers/thrift/TestThriftSinks.java b/src/javatest/com/cloudera/flume/handlers/thrift/TestThriftSinks.java index 6a8d9dc..9253363 100644 --- a/src/javatest/com/cloudera/flume/handlers/thrift/TestThriftSinks.java +++ b/src/javatest/com/cloudera/flume/handlers/thrift/TestThriftSinks.java @@ -136,61 +136,6 @@ public class TestThriftSinks implements ExampleData { } - /** - * This version uses the ThriftRawEventSink instead of the ThriftEventSink * - * The pipeline is: - * - * text file -> mem - * - * mem -> thriftRawEventSink -> thriftEventSource -> counter - * - * @throws InterruptedException - * - */ - @Test - public void testThriftRawSend() throws IOException, InterruptedException { - EventSource txt = new NoNlASCIISynthSource(25, 100); - txt.open(); - MemorySinkSource mem = new MemorySinkSource(); - mem.open(); - EventUtil.dumpAll(txt, mem); - txt.close(); - - FlumeConfiguration conf = FlumeConfiguration.get(); - final ThriftEventSource tes = new ThriftEventSource(conf.getCollectorPort()); - tes.open(); - - final CounterSink cnt = new CounterSink("count"); - cnt.open(); - Thread t = new Thread("drain") { - public void run() { - try { - EventUtil.dumpAll(tes, cnt); - } catch (Exception e) { - } - } - }; - t.start(); // drain the sink. - - // mem -> thriftRawEventSink - ThriftRawEventSink snk = new ThriftRawEventSink("0.0.0.0", conf - .getCollectorPort()); - snk.open(); - EventUtil.dumpAll(mem, snk); - mem.close(); - snk.close(); - - // a little delay to drain events at ThriftEventSource queue - try { - Thread.sleep(5000); - t.interrupt(); - } catch (InterruptedException e) { - } - tes.close(); - assertEquals(25, cnt.getCount()); - - } - @Test public void testOpenClose() throws IOException, InterruptedException { int port = FlumeConfiguration.get().getCollectorPort(); diff --git a/src/thrift/flume.thrift b/src/thrift/flume.thrift index c20c124..c521f52 100644 --- a/src/thrift/flume.thrift +++ b/src/thrift/flume.thrift @@ -57,16 +57,9 @@ struct ThriftFlumeEvent { 6: map fields } -# Instead of using thrift's serialization, we just assume the contents are serialized already. -struct RawEvent { - 1: binary raw -} - service ThriftFlumeEventServer { oneway void append( 1:ThriftFlumeEvent evt ), - oneway void rawAppend( 1:RawEvent evt), - EventStatus ackedAppend( 1: ThriftFlumeEvent evt ), - + void close(), } -- 1.7.3.2