Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3B0DCD435 for ; Fri, 9 Nov 2012 19:27:29 +0000 (UTC) Received: (qmail 67069 invoked by uid 500); 9 Nov 2012 19:27:29 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 67042 invoked by uid 500); 9 Nov 2012 19:27:29 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 67035 invoked by uid 99); 9 Nov 2012 19:27:29 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Nov 2012 19:27:29 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id DCB0D53434; Fri, 9 Nov 2012 19:27:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: brock@apache.org To: commits@flume.apache.org X-Mailer: ASF-Git Admin Mailer Subject: git commit: FLUME-1690: Elastic Search Sink doesn't run it's unit tests Message-Id: <20121109192728.DCB0D53434@tyr.zones.apache.org> Date: Fri, 9 Nov 2012 19:27:28 +0000 (UTC) Updated Branches: refs/heads/flume-1.4 0d7ab0a7d -> 84e472141 FLUME-1690: Elastic Search Sink doesn't run its unit tests (Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/84e47214 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/84e47214 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/84e47214 Branch: refs/heads/flume-1.4 Commit: 84e472141b6f9375afe712f0d7259d20983dd93a Parents: 0d7ab0a Author: Brock Noland Authored: Fri Nov 9 13:24:17 2012 -0600 Committer: Brock 
Noland Committed: Fri Nov 9 13:26:43 2012 -0600 ---------------------------------------------------------------------- .../ElasticSearchDynamicSerializerTest.java | 64 ---- .../ElasticSearchLogStashEventSerializerTest.java | 122 -------- .../sink/elasticsearch/ElasticSearchSinkTest.java | 223 --------------- .../TestElasticSearchDynamicSerializer.java | 64 ++++ .../TestElasticSearchLogStashEventSerializer.java | 122 ++++++++ .../sink/elasticsearch/TestElasticSearchSink.java | 223 +++++++++++++++ 6 files changed, 409 insertions(+), 409 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/84e47214/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializerTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializerTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializerTest.java deleted file mode 100644 index 3317734..0000000 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializerTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.flume.sink.elasticsearch; - -import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.junit.Assert.assertEquals; - -import java.util.Map; - -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.event.EventBuilder; -import org.elasticsearch.common.collect.Maps; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.junit.Test; - -public class ElasticSearchDynamicSerializerTest { - - @Test - public void testRoundTrip() throws Exception { - ElasticSearchDynamicSerializer fixture = new ElasticSearchDynamicSerializer(); - Context context = new Context(); - fixture.configure(context); - - String message = "test body"; - Map headers = Maps.newHashMap(); - headers.put("headerNameOne", "headerValueOne"); - headers.put("headerNameTwo", "headerValueTwo"); - headers.put("headerNameThree", "headerValueThree"); - Event event = EventBuilder.withBody(message.getBytes(charset)); - event.setHeaders(headers); - - XContentBuilder expected = jsonBuilder().startObject(); - expected.field("body", new String(message.getBytes(), charset)); - for (String headerName : headers.keySet()) { - expected.field(headerName, new String(headers.get(headerName).getBytes(), - charset)); - } - expected.endObject(); - - XContentBuilder actual = fixture.getContentBuilder(event); - - assertEquals(new String(expected.bytes().array()), new String(actual - .bytes().array())); - - } -} 
http://git-wip-us.apache.org/repos/asf/flume/blob/84e47214/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializerTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializerTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializerTest.java deleted file mode 100644 index a974e8b..0000000 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializerTest.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.flume.sink.elasticsearch; - -import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.junit.Assert.assertEquals; - -import java.util.Date; -import java.util.Map; - -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.event.EventBuilder; -import org.elasticsearch.common.collect.Maps; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.junit.Test; - -public class ElasticSearchLogStashEventSerializerTest { - - @Test - public void testRoundTrip() throws Exception { - ElasticSearchLogStashEventSerializer fixture = new ElasticSearchLogStashEventSerializer(); - Context context = new Context(); - fixture.configure(context); - - String message = "test body"; - Map headers = Maps.newHashMap(); - long timestamp = System.currentTimeMillis(); - headers.put("timestamp", String.valueOf(timestamp)); - headers.put("source", "flume_tail_src"); - headers.put("host", "test@localhost"); - headers.put("src_path", "/tmp/test"); - headers.put("headerNameOne", "headerValueOne"); - headers.put("headerNameTwo", "headerValueTwo"); - headers.put("type", "sometype"); - Event event = EventBuilder.withBody(message.getBytes(charset)); - event.setHeaders(headers); - - XContentBuilder expected = jsonBuilder().startObject(); - expected.field("@message", new String(message.getBytes(), charset)); - expected.field("@timestamp", new Date(timestamp)); - expected.field("@source", "flume_tail_src"); - expected.field("@type", "sometype"); - expected.field("@source_host", "test@localhost"); - expected.field("@source_path", "/tmp/test"); - expected.startObject("@fields"); - expected.field("timestamp", String.valueOf(timestamp)); - expected.field("src_path", "/tmp/test"); - expected.field("host", "test@localhost"); - expected.field("headerNameTwo", "headerValueTwo"); - expected.field("source", 
"flume_tail_src"); - expected.field("headerNameOne", "headerValueOne"); - expected.field("type", "sometype"); - expected.endObject(); - - expected.endObject(); - - XContentBuilder actual = fixture.getContentBuilder(event); - assertEquals(new String(expected.bytes().array()), new String(actual - .bytes().array())); - } - - @Test - public void shouldHandleInvalidJSONDuringComplexParsing() throws Exception { - ElasticSearchLogStashEventSerializer fixture = new ElasticSearchLogStashEventSerializer(); - Context context = new Context(); - fixture.configure(context); - - String message = "{flume: somethingnotvalid}"; - Map headers = Maps.newHashMap(); - long timestamp = System.currentTimeMillis(); - headers.put("timestamp", String.valueOf(timestamp)); - headers.put("source", "flume_tail_src"); - headers.put("host", "test@localhost"); - headers.put("src_path", "/tmp/test"); - headers.put("headerNameOne", "headerValueOne"); - headers.put("headerNameTwo", "headerValueTwo"); - headers.put("type", "sometype"); - Event event = EventBuilder.withBody(message.getBytes(charset)); - event.setHeaders(headers); - - XContentBuilder expected = jsonBuilder().startObject(); - expected.field("@message", new String(message.getBytes(), charset)); - expected.field("@timestamp", new Date(timestamp)); - expected.field("@source", "flume_tail_src"); - expected.field("@type", "sometype"); - expected.field("@source_host", "test@localhost"); - expected.field("@source_path", "/tmp/test"); - expected.startObject("@fields"); - expected.field("timestamp", String.valueOf(timestamp)); - expected.field("src_path", "/tmp/test"); - expected.field("host", "test@localhost"); - expected.field("headerNameTwo", "headerValueTwo"); - expected.field("source", "flume_tail_src"); - expected.field("headerNameOne", "headerValueOne"); - expected.field("type", "sometype"); - expected.endObject(); - - expected.endObject(); - - XContentBuilder actual = fixture.getContentBuilder(event); - assertEquals(new 
String(expected.bytes().array()), new String(actual - .bytes().array())); - } -} http://git-wip-us.apache.org/repos/asf/flume/blob/84e47214/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkTest.java deleted file mode 100644 index bb2f9f4..0000000 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkTest.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.flume.sink.elasticsearch; - -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_PORT; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -import java.util.Date; -import java.util.concurrent.TimeUnit; - -import org.apache.flume.Channel; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.Sink.Status; -import org.apache.flume.Transaction; -import org.apache.flume.conf.Configurables; -import org.apache.flume.event.EventBuilder; -import org.elasticsearch.client.Requests; -import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class ElasticSearchSinkTest extends AbstractElasticSearchSinkTest { - - private ElasticSearchSink fixture; - - @Before - public void init() throws Exception { - initDefaults(); - createNodes(); - fixture = new ElasticSearchSink(true); - } - - @After - public void tearDown() throws Exception { - shutdownNodes(); - } - - @Test - public void shouldIndexOneEvent() throws Exception { - Configurables.configure(fixture, new Context(parameters)); - Channel channel = bindAndStartChannel(fixture); - - Transaction tx = channel.getTransaction(); - tx.begin(); - Event event = EventBuilder.withBody("event #1 or 1".getBytes()); - channel.put(event); - tx.commit(); - tx.close(); - - 
fixture.process(); - fixture.stop(); - client.admin().indices() - .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet(); - - assertMatchAllQuery(1, event); - assertBodyQuery(1, event); - } - - @Test - public void shouldIndexFiveEvents() throws Exception { - // Make it so we only need to call process once - parameters.put(BATCH_SIZE, "5"); - Configurables.configure(fixture, new Context(parameters)); - Channel channel = bindAndStartChannel(fixture); - - int numberOfEvents = 5; - Event[] events = new Event[numberOfEvents]; - - Transaction tx = channel.getTransaction(); - tx.begin(); - for (int i = 0; i < numberOfEvents; i++) { - String body = "event #" + i + " of " + numberOfEvents; - Event event = EventBuilder.withBody(body.getBytes()); - events[i] = event; - channel.put(event); - } - tx.commit(); - tx.close(); - - fixture.process(); - fixture.stop(); - client.admin().indices() - .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet(); - - assertMatchAllQuery(numberOfEvents, events); - assertBodyQuery(5, events); - } - - @Test - public void shouldIndexFiveEventsOverThreeBatches() throws Exception { - parameters.put(BATCH_SIZE, "2"); - Configurables.configure(fixture, new Context(parameters)); - Channel channel = bindAndStartChannel(fixture); - - int numberOfEvents = 5; - Event[] events = new Event[numberOfEvents]; - - Transaction tx = channel.getTransaction(); - tx.begin(); - for (int i = 0; i < numberOfEvents; i++) { - String body = "event #" + i + " of " + numberOfEvents; - Event event = EventBuilder.withBody(body.getBytes()); - events[i] = event; - channel.put(event); - } - tx.commit(); - tx.close(); - - int count = 0; - Status status = Status.READY; - while (status != Status.BACKOFF) { - count++; - status = fixture.process(); - } - fixture.stop(); - - assertEquals(3, count); - - client.admin().indices() - .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet(); - assertMatchAllQuery(numberOfEvents, events); - 
assertBodyQuery(5, events); - } - - @Test - public void shouldParseConfiguration() { - parameters.put(HOSTNAMES, "10.5.5.27"); - parameters.put(CLUSTER_NAME, "testing-cluster-name"); - parameters.put(INDEX_NAME, "testing-index-name"); - parameters.put(INDEX_TYPE, "testing-index-type"); - parameters.put(TTL, "10"); - - fixture = new ElasticSearchSink(); - fixture.configure(new Context(parameters)); - - InetSocketTransportAddress[] expected = { new InetSocketTransportAddress( - "10.5.5.27", DEFAULT_PORT) }; - - assertEquals("testing-cluster-name", fixture.getClusterName()); - assertEquals( - "testing-index-name-" + ElasticSearchSink.df.format(new Date()), - fixture.getIndexName()); - assertEquals("testing-index-type", fixture.getIndexType()); - assertEquals(TimeUnit.DAYS.toMillis(10), fixture.getTTLMs()); - assertArrayEquals(expected, fixture.getServerAddresses()); - } - - @Test - public void shouldParseConfigurationUsingDefaults() { - parameters.put(HOSTNAMES, "10.5.5.27"); - parameters.remove(INDEX_NAME); - parameters.remove(INDEX_TYPE); - parameters.remove(CLUSTER_NAME); - - fixture = new ElasticSearchSink(); - fixture.configure(new Context(parameters)); - - InetSocketTransportAddress[] expected = { new InetSocketTransportAddress( - "10.5.5.27", DEFAULT_PORT) }; - - assertEquals( - DEFAULT_INDEX_NAME + "-" + ElasticSearchSink.df.format(new Date()), - fixture.getIndexName()); - assertEquals(DEFAULT_INDEX_TYPE, fixture.getIndexType()); - assertEquals(DEFAULT_CLUSTER_NAME, fixture.getClusterName()); - assertArrayEquals(expected, fixture.getServerAddresses()); - } - - @Test - public void shouldParseMultipleHostUsingDefaultPorts() { - parameters.put(HOSTNAMES, "10.5.5.27,10.5.5.28,10.5.5.29"); - - fixture = new ElasticSearchSink(); - fixture.configure(new Context(parameters)); - - InetSocketTransportAddress[] expected = { - new InetSocketTransportAddress("10.5.5.27", DEFAULT_PORT), - new InetSocketTransportAddress("10.5.5.28", DEFAULT_PORT), - new 
InetSocketTransportAddress("10.5.5.29", DEFAULT_PORT) }; - - assertArrayEquals(expected, fixture.getServerAddresses()); - } - - @Test - public void shouldParseMultipleHostAndPorts() { - parameters.put(HOSTNAMES, "10.5.5.27:9300,10.5.5.28:9301,10.5.5.29:9302"); - - fixture = new ElasticSearchSink(); - fixture.configure(new Context(parameters)); - - InetSocketTransportAddress[] expected = { - new InetSocketTransportAddress("10.5.5.27", 9300), - new InetSocketTransportAddress("10.5.5.28", 9301), - new InetSocketTransportAddress("10.5.5.29", 9302) }; - - assertArrayEquals(expected, fixture.getServerAddresses()); - } -} http://git-wip-us.apache.org/repos/asf/flume/blob/84e47214/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java new file mode 100644 index 0000000..43a4b12 --- /dev/null +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.junit.Assert.assertEquals; + +import java.util.Map; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.event.EventBuilder; +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.junit.Test; + +public class TestElasticSearchDynamicSerializer { + + @Test + public void testRoundTrip() throws Exception { + ElasticSearchDynamicSerializer fixture = new ElasticSearchDynamicSerializer(); + Context context = new Context(); + fixture.configure(context); + + String message = "test body"; + Map headers = Maps.newHashMap(); + headers.put("headerNameOne", "headerValueOne"); + headers.put("headerNameTwo", "headerValueTwo"); + headers.put("headerNameThree", "headerValueThree"); + Event event = EventBuilder.withBody(message.getBytes(charset)); + event.setHeaders(headers); + + XContentBuilder expected = jsonBuilder().startObject(); + expected.field("body", new String(message.getBytes(), charset)); + for (String headerName : headers.keySet()) { + expected.field(headerName, new String(headers.get(headerName).getBytes(), + charset)); + } + expected.endObject(); + + XContentBuilder actual = fixture.getContentBuilder(event); + + assertEquals(new String(expected.bytes().array()), new String(actual + .bytes().array())); + + } +} 
http://git-wip-us.apache.org/repos/asf/flume/blob/84e47214/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java new file mode 100644 index 0000000..9dff4b0 --- /dev/null +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.flume.sink.elasticsearch; + +import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.junit.Assert.assertEquals; + +import java.util.Date; +import java.util.Map; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.event.EventBuilder; +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.junit.Test; + +public class TestElasticSearchLogStashEventSerializer { + + @Test + public void testRoundTrip() throws Exception { + ElasticSearchLogStashEventSerializer fixture = new ElasticSearchLogStashEventSerializer(); + Context context = new Context(); + fixture.configure(context); + + String message = "test body"; + Map headers = Maps.newHashMap(); + long timestamp = System.currentTimeMillis(); + headers.put("timestamp", String.valueOf(timestamp)); + headers.put("source", "flume_tail_src"); + headers.put("host", "test@localhost"); + headers.put("src_path", "/tmp/test"); + headers.put("headerNameOne", "headerValueOne"); + headers.put("headerNameTwo", "headerValueTwo"); + headers.put("type", "sometype"); + Event event = EventBuilder.withBody(message.getBytes(charset)); + event.setHeaders(headers); + + XContentBuilder expected = jsonBuilder().startObject(); + expected.field("@message", new String(message.getBytes(), charset)); + expected.field("@timestamp", new Date(timestamp)); + expected.field("@source", "flume_tail_src"); + expected.field("@type", "sometype"); + expected.field("@source_host", "test@localhost"); + expected.field("@source_path", "/tmp/test"); + expected.startObject("@fields"); + expected.field("timestamp", String.valueOf(timestamp)); + expected.field("src_path", "/tmp/test"); + expected.field("host", "test@localhost"); + expected.field("headerNameTwo", "headerValueTwo"); + expected.field("source", 
"flume_tail_src"); + expected.field("headerNameOne", "headerValueOne"); + expected.field("type", "sometype"); + expected.endObject(); + + expected.endObject(); + + XContentBuilder actual = fixture.getContentBuilder(event); + assertEquals(new String(expected.bytes().array()), new String(actual + .bytes().array())); + } + + @Test + public void shouldHandleInvalidJSONDuringComplexParsing() throws Exception { + ElasticSearchLogStashEventSerializer fixture = new ElasticSearchLogStashEventSerializer(); + Context context = new Context(); + fixture.configure(context); + + String message = "{flume: somethingnotvalid}"; + Map headers = Maps.newHashMap(); + long timestamp = System.currentTimeMillis(); + headers.put("timestamp", String.valueOf(timestamp)); + headers.put("source", "flume_tail_src"); + headers.put("host", "test@localhost"); + headers.put("src_path", "/tmp/test"); + headers.put("headerNameOne", "headerValueOne"); + headers.put("headerNameTwo", "headerValueTwo"); + headers.put("type", "sometype"); + Event event = EventBuilder.withBody(message.getBytes(charset)); + event.setHeaders(headers); + + XContentBuilder expected = jsonBuilder().startObject(); + expected.field("@message", new String(message.getBytes(), charset)); + expected.field("@timestamp", new Date(timestamp)); + expected.field("@source", "flume_tail_src"); + expected.field("@type", "sometype"); + expected.field("@source_host", "test@localhost"); + expected.field("@source_path", "/tmp/test"); + expected.startObject("@fields"); + expected.field("timestamp", String.valueOf(timestamp)); + expected.field("src_path", "/tmp/test"); + expected.field("host", "test@localhost"); + expected.field("headerNameTwo", "headerValueTwo"); + expected.field("source", "flume_tail_src"); + expected.field("headerNameOne", "headerValueOne"); + expected.field("type", "sometype"); + expected.endObject(); + + expected.endObject(); + + XContentBuilder actual = fixture.getContentBuilder(event); + assertEquals(new 
String(expected.bytes().array()), new String(actual + .bytes().array())); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/84e47214/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java new file mode 100644 index 0000000..4faa5be --- /dev/null +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.flume.sink.elasticsearch; + +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_PORT; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.util.Date; +import java.util.concurrent.TimeUnit; + +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.Sink.Status; +import org.apache.flume.Transaction; +import org.apache.flume.conf.Configurables; +import org.apache.flume.event.EventBuilder; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestElasticSearchSink extends AbstractElasticSearchSinkTest { + + private ElasticSearchSink fixture; + + @Before + public void init() throws Exception { + initDefaults(); + createNodes(); + fixture = new ElasticSearchSink(true); + } + + @After + public void tearDown() throws Exception { + shutdownNodes(); + } + + @Test + public void shouldIndexOneEvent() throws Exception { + Configurables.configure(fixture, new Context(parameters)); + Channel channel = bindAndStartChannel(fixture); + + Transaction tx = channel.getTransaction(); + tx.begin(); + Event event = EventBuilder.withBody("event #1 or 1".getBytes()); + channel.put(event); + tx.commit(); + tx.close(); + + 
fixture.process(); + fixture.stop(); + client.admin().indices() + .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet(); + + assertMatchAllQuery(1, event); + assertBodyQuery(1, event); + } + + @Test + public void shouldIndexFiveEvents() throws Exception { + // Make it so we only need to call process once + parameters.put(BATCH_SIZE, "5"); + Configurables.configure(fixture, new Context(parameters)); + Channel channel = bindAndStartChannel(fixture); + + int numberOfEvents = 5; + Event[] events = new Event[numberOfEvents]; + + Transaction tx = channel.getTransaction(); + tx.begin(); + for (int i = 0; i < numberOfEvents; i++) { + String body = "event #" + i + " of " + numberOfEvents; + Event event = EventBuilder.withBody(body.getBytes()); + events[i] = event; + channel.put(event); + } + tx.commit(); + tx.close(); + + fixture.process(); + fixture.stop(); + client.admin().indices() + .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet(); + + assertMatchAllQuery(numberOfEvents, events); + assertBodyQuery(5, events); + } + + @Test + public void shouldIndexFiveEventsOverThreeBatches() throws Exception { + parameters.put(BATCH_SIZE, "2"); + Configurables.configure(fixture, new Context(parameters)); + Channel channel = bindAndStartChannel(fixture); + + int numberOfEvents = 5; + Event[] events = new Event[numberOfEvents]; + + Transaction tx = channel.getTransaction(); + tx.begin(); + for (int i = 0; i < numberOfEvents; i++) { + String body = "event #" + i + " of " + numberOfEvents; + Event event = EventBuilder.withBody(body.getBytes()); + events[i] = event; + channel.put(event); + } + tx.commit(); + tx.close(); + + int count = 0; + Status status = Status.READY; + while (status != Status.BACKOFF) { + count++; + status = fixture.process(); + } + fixture.stop(); + + assertEquals(3, count); + + client.admin().indices() + .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet(); + assertMatchAllQuery(numberOfEvents, events); + 
assertBodyQuery(5, events); + } + + @Test + public void shouldParseConfiguration() { + parameters.put(HOSTNAMES, "10.5.5.27"); + parameters.put(CLUSTER_NAME, "testing-cluster-name"); + parameters.put(INDEX_NAME, "testing-index-name"); + parameters.put(INDEX_TYPE, "testing-index-type"); + parameters.put(TTL, "10"); + + fixture = new ElasticSearchSink(); + fixture.configure(new Context(parameters)); + + InetSocketTransportAddress[] expected = { new InetSocketTransportAddress( + "10.5.5.27", DEFAULT_PORT) }; + + assertEquals("testing-cluster-name", fixture.getClusterName()); + assertEquals( + "testing-index-name-" + ElasticSearchSink.df.format(new Date()), + fixture.getIndexName()); + assertEquals("testing-index-type", fixture.getIndexType()); + assertEquals(TimeUnit.DAYS.toMillis(10), fixture.getTTLMs()); + assertArrayEquals(expected, fixture.getServerAddresses()); + } + + @Test + public void shouldParseConfigurationUsingDefaults() { + parameters.put(HOSTNAMES, "10.5.5.27"); + parameters.remove(INDEX_NAME); + parameters.remove(INDEX_TYPE); + parameters.remove(CLUSTER_NAME); + + fixture = new ElasticSearchSink(); + fixture.configure(new Context(parameters)); + + InetSocketTransportAddress[] expected = { new InetSocketTransportAddress( + "10.5.5.27", DEFAULT_PORT) }; + + assertEquals( + DEFAULT_INDEX_NAME + "-" + ElasticSearchSink.df.format(new Date()), + fixture.getIndexName()); + assertEquals(DEFAULT_INDEX_TYPE, fixture.getIndexType()); + assertEquals(DEFAULT_CLUSTER_NAME, fixture.getClusterName()); + assertArrayEquals(expected, fixture.getServerAddresses()); + } + + @Test + public void shouldParseMultipleHostUsingDefaultPorts() { + parameters.put(HOSTNAMES, "10.5.5.27,10.5.5.28,10.5.5.29"); + + fixture = new ElasticSearchSink(); + fixture.configure(new Context(parameters)); + + InetSocketTransportAddress[] expected = { + new InetSocketTransportAddress("10.5.5.27", DEFAULT_PORT), + new InetSocketTransportAddress("10.5.5.28", DEFAULT_PORT), + new 
InetSocketTransportAddress("10.5.5.29", DEFAULT_PORT) }; + + assertArrayEquals(expected, fixture.getServerAddresses()); + } + + @Test + public void shouldParseMultipleHostAndPorts() { + parameters.put(HOSTNAMES, "10.5.5.27:9300,10.5.5.28:9301,10.5.5.29:9302"); + + fixture = new ElasticSearchSink(); + fixture.configure(new Context(parameters)); + + InetSocketTransportAddress[] expected = { + new InetSocketTransportAddress("10.5.5.27", 9300), + new InetSocketTransportAddress("10.5.5.28", 9301), + new InetSocketTransportAddress("10.5.5.29", 9302) }; + + assertArrayEquals(expected, fixture.getServerAddresses()); + } +}