Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 63A60200CB7 for ; Tue, 23 May 2017 03:24:04 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6240E160BD4; Tue, 23 May 2017 01:24:04 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A9756160BD9 for ; Tue, 23 May 2017 03:24:01 +0200 (CEST) Received: (qmail 13165 invoked by uid 500); 23 May 2017 01:24:00 -0000 Mailing-List: contact commits-help@apex.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.apache.org Delivered-To: mailing list commits@apex.apache.org Received: (qmail 12886 invoked by uid 99); 23 May 2017 01:24:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 May 2017 01:24:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 094BEDFC2E; Tue, 23 May 2017 01:23:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: thw@apache.org To: commits@apex.apache.org Date: Tue, 23 May 2017 01:24:01 -0000 Message-Id: In-Reply-To: <7f49941128e442f696cd7dfc47766d13@git.apache.org> References: <7f49941128e442f696cd7dfc47766d13@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/13] apex-malhar git commit: Changed package path for files to be included under malhar. Modifications to build files for project to build under malhar. archived-at: Tue, 23 May 2017 01:24:04 -0000 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformanceTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformanceTest.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformanceTest.java deleted file mode 100644 index 97e9aa8..0000000 --- a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStoragePerformanceTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.flume.storage; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.primitives.Ints; - -import com.datatorrent.netlet.util.Slice; - -/** - *

HDFSStoragePerformanceTest class.

- * - * @since 1.0.1 - */ -public class HDFSStoragePerformanceTest -{ - - public static void main(String[] args) - { - HDFSStorage storage = new HDFSStorage(); - storage.setBaseDir(args[0]); - storage.setId(args[1]); - storage.setRestore(true); - storage.setup(null); - int count = 100000000; - - logger.debug(" start time {}", System.currentTimeMillis()); - int index = 10000; - byte[] b = Ints.toByteArray(index); - for (int i = 0; i < count; i++) { - storage.store(new Slice(b, 0, b.length)); - index++; - b = Ints.toByteArray(index); - } - storage.flush(); - for (int i = 0; i < count; i++) { - storage.store(new Slice(b, 0, b.length)); - index++; - b = Ints.toByteArray(index); - } - storage.flush(); - for (int i = 0; i < count; i++) { - storage.store(new Slice(b, 0, b.length)); - index++; - b = Ints.toByteArray(index); - } - storage.flush(); - for (int i = 0; i < count; i++) { - storage.store(new Slice(b, 0, b.length)); - index++; - b = Ints.toByteArray(index); - } - storage.flush(); - for (int i = 0; i < count; i++) { - storage.store(new Slice(b, 0, b.length)); - index++; - b = Ints.toByteArray(index); - } - storage.flush(); - logger.debug(" end time {}", System.currentTimeMillis()); - logger.debug(" start time for retrieve {}", System.currentTimeMillis()); - b = storage.retrieve(new byte[8]); - int org_index = index; - index = 10000; - match(b, index); - while (true) { - index++; - b = storage.retrieveNext(); - if (b == null) { - logger.debug(" end time for retrieve {}/{}/{}", System.currentTimeMillis(), index, org_index); - return; - } else { - if (!match(b, index)) { - throw new RuntimeException("failed : " + index); - } - } - } - - } - - public static boolean match(byte[] data, int match) - { - byte[] tempData = new byte[data.length - 8]; - System.arraycopy(data, 8, tempData, 0, tempData.length); - int dataR = Ints.fromByteArray(tempData); - //logger.debug("input: {}, output: {}",match,dataR); - if (match == dataR) { - return true; - } - return false; - } - - private static final Logger logger = LoggerFactory.getLogger(HDFSStoragePerformanceTest.class); -} - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageTest.java b/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageTest.java deleted file mode 100644 index 0cb9935..0000000 --- a/flume/src/test/java/com/datatorrent/flume/storage/HDFSStorageTest.java +++ /dev/null @@ -1,695 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.flume.storage; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.RandomAccessFile; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.io.FileUtils; -import org.apache.flume.Context; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import com.datatorrent.netlet.util.Slice; - -/** - * - */ -public class HDFSStorageTest -{ - public static class TestMeta extends TestWatcher - { - public String baseDir; - public String testFile; - private String testData = "No and yes. There is also IdleTimeHandler that allows the operator to emit tuples. " + - "There is overlap, why not have a single interface. \n" + - "Also consider the possibility of an operator that does other processing and not consume nor emit tuples,"; - - @Override - protected void starting(org.junit.runner.Description description) - { - String className = description.getClassName(); - baseDir = "target/" + className; - try { - baseDir = (new File(baseDir)).getAbsolutePath(); - FileUtils.forceMkdir(new File(baseDir)); - testFile = baseDir + "/testInput.txt"; - FileOutputStream outputStream = FileUtils.openOutputStream(new File(testFile)); - outputStream.write(testData.getBytes()); - outputStream.close(); - - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - - @Override - protected void finished(Description description) - { - try { - FileUtils.deleteDirectory(new File(baseDir)); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - } - - @Rule - public TestMeta testMeta = new TestMeta(); - - private String STORAGE_DIRECTORY; - - private HDFSStorage getStorage(String id, boolean restore) - { - Context ctx = new Context(); - STORAGE_DIRECTORY = testMeta.baseDir; - ctx.put(HDFSStorage.BASE_DIR_KEY, testMeta.baseDir); - ctx.put(HDFSStorage.RESTORE_KEY, Boolean.toString(restore)); - ctx.put(HDFSStorage.ID, id); - ctx.put(HDFSStorage.BLOCKSIZE, "256"); - HDFSStorage lstorage = new HDFSStorage(); - lstorage.configure(ctx); - lstorage.setup(null); - return lstorage; - } - - private HDFSStorage storage; - - @Before - public void setup() - { - storage = getStorage("1", false); - } - - @After - public void teardown() - { - storage.teardown(); - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - storage.cleanHelperFiles(); - } - - /** - * This test covers following use case 1. Some data is stored 2. File is flush but the file is not close 3. Some more - * data is stored but the file doesn't roll-overs 4. Retrieve is called for the last returned address and it return - * nulls 5. Some more data is stored again but the address is returned null because of previous retrieve call - * - * @throws Exception - */ - @Test - public void testPartialFlush() throws Exception - { - Assert.assertNull(storage.retrieve(new byte[8])); - byte[] b = "ab".getBytes(); - byte[] address = storage.store(new Slice(b, 0, b.length)); - Assert.assertNotNull(address); - storage.flush(); - b = "cb".getBytes(); - byte[] addr = storage.store(new Slice(b, 0, b.length)); - match(storage.retrieve(new byte[8]), "ab"); - Assert.assertNull(storage.retrieve(addr)); - Assert.assertNull(storage.store(new Slice(b, 0, b.length))); - storage.flush(); - match(storage.retrieve(address), "cb"); - Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); - } - - /** - * This test covers following use case 1. Some data is stored to make sure that there is no roll over 2. File is - * flushed but the file is not closed 3. Some more data is stored. The data stored is enough to make the file roll - * over 4. Retrieve is called for the last returned address and it return nulls as the data is not flushed 5. Some - * more data is stored again but the address is returned null because of previous retrieve call 6. The data is flushed - * to make sure that the data is committed. 7. Now the data is retrieved from the starting and data returned matches - * the data stored - * - * @throws Exception - */ - @Test - public void testPartialFlushRollOver() throws Exception - { - Assert.assertNull(storage.retrieve(new byte[8])); - byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52, - 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49, - 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54, - 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53, - 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1, - 48, 46, 48, 1, 48, 46, 48}; - byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, - 52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, - 49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, - 54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, - 53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, - 1, 48, 46, 48, 1, 48, 46, 48}; - byte[] address = storage.store(new Slice(b, 0, b.length)); - Assert.assertNotNull(address); - storage.flush(); - byte[] addr = null; - for (int i = 0; i < 5; i++) { - b[0] = (byte)(b[0] + 1); - addr = storage.store(new Slice(b, 0, b.length)); - } - Assert.assertNull(storage.retrieve(addr)); - for (int i = 0; i < 5; i++) { - b[0] = (byte)(b[0] + 1); - Assert.assertNull(storage.store(new Slice(b, 0, b.length))); - } - storage.flush(); - match(storage.retrieve(new byte[8]), new String(b_org)); - b_org[0] = (byte)(b_org[0] + 1); - match(storage.retrieve(address), new String(b_org)); - b_org[0] = (byte)(b_org[0] + 1); - match(storage.retrieveNext(), new String(b_org)); - b_org[0] = (byte)(b_org[0] + 1); - match(storage.retrieveNext(), new String(b_org)); - - } - - /** - * This test covers following use case 1. Some data is stored to make sure that there is no roll over 2. File is - * flushed but the file is not closed 3. Some more data is stored. The data stored is enough to make the file roll - * over 4. The storage crashes and new storage is instiated. 5. Retrieve is called for the last returned address and - * it return nulls as the data is not flushed 6. Some more data is stored again but the address is returned null - * because of previous retrieve call 7. The data is flushed to make sure that the data is committed. 8. Now the data - * is retrieved from the starting and data returned matches the data stored - * - * @throws Exception - */ - @Test - public void testPartialFlushRollOverWithFailure() throws Exception - { - Assert.assertNull(storage.retrieve(new byte[8])); - byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52, - 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49, - 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54, - 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53, - 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1, - 48, 46, 48, 1, 48, 46, 48}; - byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, - 52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, - 49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, - 54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, - 53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, - 1, 48, 46, 48, 1, 48, 46, 48}; - byte[] address = storage.store(new Slice(b, 0, b.length)); - Assert.assertNotNull(address); - storage.flush(); - byte[] addr = null; - for (int i = 0; i < 5; i++) { - b[0] = (byte)(b[0] + 1); - addr = storage.store(new Slice(b, 0, b.length)); - } - storage = getStorage("1", true); - Assert.assertNull(storage.retrieve(addr)); - for (int i = 0; i < 5; i++) { - b[0] = (byte)(b[0] + 1); - Assert.assertNull(storage.store(new Slice(b, 0, b.length))); - } - storage.flush(); - match(storage.retrieve(new byte[8]), new String(b_org)); - b_org[0] = (byte)(b_org[0] + 1); - match(storage.retrieve(address), new String(b_org)); - b_org[0] = (byte)(b_org[0] + 1); - match(storage.retrieveNext(), new String(b_org)); - b_org[0] = (byte)(b_org[0] + 1); - match(storage.retrieveNext(), new String(b_org)); - - } - - /** - * This tests clean when the file doesn't roll over - * - * @throws Exception - */ - @Test - public void testPartialFlushWithClean() throws Exception - { - Assert.assertNull(storage.retrieve(new byte[8])); - byte[] b = "ab".getBytes(); - byte[] address = storage.store(new Slice(b, 0, b.length)); - Assert.assertNotNull(address); - storage.flush(); - storage.clean(address); - b = "cb".getBytes(); - byte[] addr = storage.store(new Slice(b, 0, b.length)); - Assert.assertNull(storage.retrieve(addr)); - Assert.assertNull(storage.store(new Slice(b, 0, b.length))); - storage.flush(); - match(storage.retrieve(new byte[8]), "cb"); - match(storage.retrieve(address), "cb"); - Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); - } - - /** - * This tests clean when the file doesn't roll over - * - * @throws Exception - */ - @Test - public void testPartialFlushWithCleanAndFailure() throws Exception - { - Assert.assertNull(storage.retrieve(new byte[8])); - byte[] b = "ab".getBytes(); - byte[] address = storage.store(new Slice(b, 0, b.length)); - Assert.assertNotNull(address); - storage.flush(); - storage.clean(address); - b = "cb".getBytes(); - byte[] addr = storage.store(new Slice(b, 0, b.length)); - storage = getStorage("1", true); - Assert.assertNull(storage.retrieve(addr)); - Assert.assertNull(storage.store(new Slice(b, 0, b.length))); - storage.flush(); - match(storage.retrieve(new byte[8]), "cb"); - match(storage.retrieve(address), "cb"); - Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); - } - - /** - * This test covers following use case 1. Some data is stored to make sure that there is no roll over 2. File is - * flushed but the file is not closed 3. The data is cleaned till the last returned address 4. Some more data is - * stored. The data stored is enough to make the file roll over 5. Retrieve is called for the last returned address - * and it return nulls as the data is not flushed 6. Some more data is stored again but the address is returned null - * because of previous retrieve call 7. The data is flushed to make sure that the data is committed. 8. Now the data - * is retrieved from the starting and data returned matches the data stored - * - * @throws Exception - */ - @Test - public void testPartialFlushWithCleanAndRollOver() throws Exception - { - Assert.assertNull(storage.retrieve(new byte[8])); - byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52, - 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49, - 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54, - 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53, - 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1, - 48, 46, 48, 1, 48, 46, 48}; - byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, - 52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, - 49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, - 54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, - 53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, - 1, 48, 46, 48, 1, 48, 46, 48}; - byte[] address = storage.store(new Slice(b, 0, b.length)); - Assert.assertNotNull(address); - storage.flush(); - storage.clean(address); - - byte[] addr = null; - for (int i = 0; i < 5; i++) { - b[0] = (byte)(b[0] + 1); - addr = storage.store(new Slice(b, 0, b.length)); - } - Assert.assertNull(storage.retrieve(addr)); - for (int i = 0; i < 5; i++) { - b[0] = (byte)(b[0] + 1); - Assert.assertNull(storage.store(new Slice(b, 0, b.length))); - } - storage.flush(); - b_org[0] = (byte)(b_org[0] + 1); - match(storage.retrieve(new byte[8]), new String(b_org)); - match(storage.retrieve(address), new String(b_org)); - b_org[0] = (byte)(b_org[0] + 1); - match(storage.retrieveNext(), new String(b_org)); - b_org[0] = (byte)(b_org[0] + 1); - match(storage.retrieveNext(), new String(b_org)); - - } - - /** - * This tests the clean when the files are roll-over and the storage fails - * - * @throws Exception - */ - @Test - public void testPartialFlushWithCleanAndRollOverAndFailure() throws Exception - { - Assert.assertNull(storage.retrieve(new byte[8])); - byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52, - 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49, - 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54, - 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53, - 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1, - 48, 46, 48, 1, 48, 46, 48}; - byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, - 52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, - 49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, - 54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, - 53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, - 1, 48, 46, 48, 1, 48, 46, 48}; - byte[] address = storage.store(new Slice(b, 0, b.length)); - Assert.assertNotNull(address); - storage.flush(); - storage.clean(address); - byte[] addr = null; - for (int i = 0; i < 5; i++) { - b[0] = (byte)(b[0] + 1); - addr = storage.store(new Slice(b, 0, b.length)); - } - storage = getStorage("1", true); - Assert.assertNull(storage.retrieve(addr)); - for (int i = 0; i < 5; i++) { - b[0] = (byte)(b[0] + 1); - Assert.assertNull(storage.store(new Slice(b, 0, b.length))); - } - storage.flush(); - b_org[0] = (byte)(b_org[0] + 1); - match(storage.retrieve(address), new String(b_org)); - b_org[0] = (byte)(b_org[0] + 1); - match(storage.retrieveNext(), new String(b_org)); - b_org[0] = (byte)(b_org[0] + 1); - match(storage.retrieveNext(), new String(b_org)); - - } - - /** - * This test covers following use case The file is flushed and then more data is written to the same file, but the new - * data is not flushed and file is not roll over and storage fails The new storage comes up and client asks for data - * at the last returned address from earlier storage instance. The new storage returns null. Client stores the data - * again but the address returned this time is null and the retrieval of the earlier address now returns data - * - * @throws Exception - */ - @Test - public void testPartialFlushWithFailure() throws Exception - { - Assert.assertNull(storage.retrieve(new byte[8])); - byte[] b = "ab".getBytes(); - byte[] address = storage.store(new Slice(b, 0, b.length)); - Assert.assertNotNull(address); - storage.flush(); - b = "cb".getBytes(); - byte[] addr = storage.store(new Slice(b, 0, b.length)); - storage = getStorage("1", true); - Assert.assertNull(storage.retrieve(addr)); - Assert.assertNull(storage.store(new Slice(b, 0, b.length))); - storage.flush(); - match(storage.retrieve(address), "cb"); - } - - private void match(byte[] data, String match) - { - byte[] tempData = new byte[data.length - 8]; - System.arraycopy(data, 8, tempData, 0, tempData.length); - Assert.assertEquals("matched the stored value with retrieved value", match, new String(tempData)); - } - - @Test - public void testStorage() throws IOException - { - Assert.assertNull(storage.retrieve(new byte[8])); - byte[] b = new byte[200]; - byte[] identifier; - Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); - Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); - Assert.assertNull(storage.retrieve(new byte[8])); - Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); - Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); - storage.flush(); - byte[] data = storage.retrieve(new byte[8]); - Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); - identifier = storage.store(new Slice(b, 0, b.length)); - byte[] tempData = new byte[data.length - 8]; - System.arraycopy(data, 8, tempData, 0, tempData.length); - Assert.assertEquals("matched the stored value with retrieved value", new String(b), new String(tempData)); - Assert.assertNull(storage.retrieve(identifier)); - } - - @Test - public void testStorageWithRestore() throws IOException - { - Assert.assertNull(storage.retrieve(new byte[8])); - byte[] b = new byte[200]; - Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); - storage.flush(); - storage.teardown(); - - storage = getStorage("1", true); - storage.store(new Slice(b, 0, b.length)); - storage.flush(); - Configuration conf = new Configuration(); - FileSystem fs = FileSystem.get(conf); - boolean exists = fs.exists(new Path(STORAGE_DIRECTORY + "/1/" + "1")); - Assert.assertEquals("file should exist", true, exists); - } - - @Test - public void testCleanup() throws IOException - { - RandomAccessFile r = new RandomAccessFile(testMeta.testFile, "r"); - r.seek(0); - byte[] b = r.readLine().getBytes(); - storage.store(new Slice(b, 0, b.length)); - byte[] val = storage.store(new Slice(b, 0, b.length)); - storage.flush(); - storage.clean(val); - Configuration conf = new Configuration(); - FileSystem fs = FileSystem.get(conf); - boolean exists = fs.exists(new Path(STORAGE_DIRECTORY + "/" + "0")); - Assert.assertEquals("file should not exist", false, exists); - r.close(); - } - - @Test - public void testNext() throws IOException - { - RandomAccessFile r = new RandomAccessFile(testMeta.testFile, "r"); - r.seek(0); - Assert.assertNull(storage.retrieve(new byte[8])); - byte[] b = r.readLine().getBytes(); - storage.store(new Slice(b, 0, b.length)); - byte[] b1 = r.readLine().getBytes(); - storage.store(new Slice(b1, 0, b1.length)); - storage.store(new Slice(b, 0, b.length)); - storage.flush(); - storage.store(new Slice(b1, 0, b1.length)); - storage.store(new Slice(b, 0, b.length)); - storage.flush(); - byte[] data = storage.retrieve(new byte[8]); - byte[] tempData = new byte[data.length - 8]; - System.arraycopy(data, 8, tempData, 0, tempData.length); - Assert.assertEquals("matched the stored value with retrieved value", new String(b), new String(tempData)); - data = storage.retrieveNext(); - tempData = new byte[data.length - 8]; - System.arraycopy(data, 8, tempData, 0, tempData.length); - Assert.assertEquals("matched the stored value with retrieved value", new String(b1), new String(tempData)); - data = storage.retrieveNext(); - tempData = new byte[data.length - 8]; - System.arraycopy(data, 8, tempData, 0, tempData.length); - Assert.assertEquals("matched the stored value with retrieved value", new String(b), new String(tempData)); - r.close(); - } - - @Test - public void testFailure() throws IOException - { - byte[] address; - byte[] b = new byte[200]; - storage.retrieve(new byte[8]); - for (int i = 0; i < 5; i++) { - storage.store(new Slice(b, 0, b.length)); - address = storage.store(new Slice(b, 0, b.length)); - storage.flush(); - storage.clean(address); - } - storage.teardown(); - - byte[] identifier = new byte[8]; - storage = getStorage("1", true); - - storage.retrieve(identifier); - - storage.store(new Slice(b, 0, b.length)); - storage.store(new Slice(b, 0, b.length)); - storage.store(new Slice(b, 0, b.length)); - storage.flush(); - byte[] data = storage.retrieve(identifier); - byte[] tempData = new byte[data.length - 8]; - System.arraycopy(data, 8, tempData, 0, tempData.length); - Assert.assertEquals("matched the stored value with retrieved value", new String(b), new String(tempData)); - } - - /** - * This test case tests the clean call before any flush is called. - * - * @throws IOException - */ - @Test - public void testCleanUnflushedData() throws IOException - { - for (int i = 0; i < 5; i++) { - final byte[] bytes = (i + "").getBytes(); - storage.store(new Slice(bytes, 0, bytes.length)); - } - storage.clean(new byte[8]); - storage.flush(); - match(storage.retrieve(new byte[8]), "0"); - match(storage.retrieveNext(), "1"); - } - - @Test - public void testCleanForUnflushedData() throws IOException - { - byte[] address = null; - byte[] b = new byte[200]; - storage.retrieve(new byte[8]); - for (int i = 0; i < 5; i++) { - storage.store(new Slice(b, 0, b.length)); - address = storage.store(new Slice(b, 0, b.length)); - storage.flush(); - // storage.clean(address); - } - byte[] lastWrittenAddress = null; - for (int i = 0; i < 5; i++) { - storage.store(new Slice(b, 0, b.length)); - lastWrittenAddress = storage.store(new Slice(b, 0, b.length)); - } - storage.clean(lastWrittenAddress); - byte[] cleanedOffset = storage.readData(new Path(STORAGE_DIRECTORY + "/1/cleanoffsetFile")); - Assert.assertArrayEquals(address, cleanedOffset); - - } - - @Test - public void testCleanForFlushedData() throws IOException - { - byte[] b = new byte[200]; - storage.retrieve(new byte[8]); - for (int i = 0; i < 5; i++) { - storage.store(new Slice(b, 0, b.length)); - storage.store(new Slice(b, 0, b.length)); - storage.flush(); - // storage.clean(address); - } - byte[] lastWrittenAddress = null; - for (int i = 0; i < 5; i++) { - storage.store(new Slice(b, 0, b.length)); - lastWrittenAddress = storage.store(new Slice(b, 0, b.length)); - } - storage.flush(); - storage.clean(lastWrittenAddress); - byte[] cleanedOffset = storage.readData(new Path(STORAGE_DIRECTORY + "/1/cleanoffsetFile")); - Assert.assertArrayEquals(lastWrittenAddress, cleanedOffset); - - } - - @Test - public void testCleanForPartialFlushedData() throws IOException - { - byte[] b = new byte[8]; - storage.retrieve(new byte[8]); - - storage.store(new Slice(b, 0, b.length)); - byte[] bytes = "1a".getBytes(); - byte[] address = storage.store(new Slice(bytes, 0, bytes.length)); - storage.flush(); - storage.clean(address); - - byte[] lastWrittenAddress = null; - for (int i = 0; i < 5; i++) { - final byte[] bytes1 = (i + "").getBytes(); - storage.store(new Slice(bytes1, 0, bytes1.length)); - lastWrittenAddress = storage.store(new Slice(b, 0, b.length)); - } - Assert.assertNull(storage.retrieve(new byte[8])); - Assert.assertNull(storage.retrieve(lastWrittenAddress)); - storage.store(new Slice(b, 0, b.length)); - storage.flush(); - Assert.assertNull(storage.retrieve(lastWrittenAddress)); - } - - @Test - public void testRandomSequence() throws IOException - { - storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); - byte[] bytes = new byte[]{48, 48, 48, 51, 101, 100, 55, 56, 55, 49, 53, 99, 52, 101, 55, 50, 97, 52, 48, 49, 51, - 99, 97, 54, 102, 57, 55, 53, 57, 100, 49, 99, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, - 45, 49, 49, 45, 48, 55, 32, 48, 48, 58, 48, 48, 58, 52, 54, 1, 52, 50, 49, 50, 51, 1, 50, 1, 49, 53, 49, 49, - 52, 50, 54, 53, 1, 49, 53, 49, 49, 57, 51, 53, 49, 1, 49, 53, 49, 50, 57, 56, 50, 52, 1, 49, 53, 49, 50, 49, - 55, 48, 55, 1, 49, 48, 48, 55, 55, 51, 57, 51, 1, 49, 57, 49, 52, 55, 50, 53, 52, 54, 49, 1, 49, 1, 48, 1, 48, - 46, 48, 1, 48, 46, 48, 1, 48, 46, 48}; - storage.store(new Slice(bytes, 0, bytes.length)); - storage.flush(); - storage.clean(new byte[]{-109, 0, 0, 0, 0, 0, 0, 0}); - storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); - for (int i = 0; i < 2555; i++) { - byte[] bytes1 = new byte[]{48, 48, 48, 55, 56, 51, 98, 101, 50, 54, 50, 98, 52, 102, 50, 54, 56, 97, 55, 56, 102, - 48, 54, 54, 50, 49, 49, 54, 99, 98, 101, 99, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, - 45, 49, 49, 45, 48, 55, 32, 48, 48, 58, 48, 48, 58, 53, 49, 1, 49, 49, 49, 49, 54, 51, 57, 1, 50, 1, 49, 53, - 49, 48, 57, 57, 56, 51, 1, 49, 53, 49, 49, 49, 55, 48, 52, 1, 49, 53, 49, 50, 49, 51, 55, 49, 1, 49, 53, 49, - 49, 52, 56, 51, 49, 1, 49, 48, 48, 55, 49, 57, 56, 49, 1, 49, 50, 48, 50, 55, 54, 49, 54, 56, 53, 1, 49, 1, - 48, 1, 48, 46, 48, 1, 48, 46, 48, 1, 48, 46, 48}; - storage.store(new Slice(bytes1, 0, bytes1.length)); - storage.flush(); - } - storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); - for (int i = 0; i < 1297; i++) { - storage.retrieveNext(); - } - storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); - for (int i = 0; i < 1302; i++) { - storage.retrieveNext(); - } - storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); - for (int i = 0; i < 1317; i++) { - storage.retrieveNext(); - } - storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); - for (int i = 0; i < 2007; i++) { - storage.retrieveNext(); - } - storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); - for (int i = 0; i < 2556; i++) { - storage.retrieveNext(); - } - byte[] bytes1 = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, - 52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, - 49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, - 54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, - 53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, - 1, 48, 46, 48, 1, 48, 46, 48}; - storage.store(new Slice(bytes1, 0, bytes1.length)); - storage.flush(); - storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); - for (int i = 0; i < 2062; i++) { - storage.retrieveNext(); - - } - } - - @SuppressWarnings("unused") - private static final Logger logger = LoggerFactory.getLogger(HDFSStorageTest.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java new file mode 100644 index 0000000..6503357 --- /dev/null +++ b/flume/src/test/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscoveryTest.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.discovery; + +import org.codehaus.jackson.type.TypeReference; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.details.InstanceSerializer; + +import com.datatorrent.flume.discovery.Discovery.Service; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotNull; + +/** + * + */ +@Ignore +public class ZKAssistedDiscoveryTest +{ + public ZKAssistedDiscoveryTest() + { + } + + @Test + public void testSerialization() throws Exception + { + ZKAssistedDiscovery discovery = new ZKAssistedDiscovery(); + discovery.setServiceName("DTFlumeTest"); + discovery.setConnectionString("localhost:2181"); + discovery.setBasePath("/HelloDT"); + discovery.setup(null); + ServiceInstance instance = discovery.getInstance(new Service() + { + @Override + public String getHost() + { + return "localhost"; + } + + @Override + public int getPort() + { + return 8080; + } + + @Override + public byte[] getPayload() + { + return null; + } + + @Override + public String getId() + { + return "localhost8080"; + } + + }); + InstanceSerializer instanceSerializer = + discovery.getInstanceSerializerFactory().getInstanceSerializer(new TypeReference>() + { + }); + byte[] serialize = instanceSerializer.serialize(instance); + logger.debug("serialized json = {}", new String(serialize)); + ServiceInstance deserialize = instanceSerializer.deserialize(serialize); + assertArrayEquals("Metadata", instance.getPayload(), deserialize.getPayload()); + } + + @Test + public void testDiscover() + { + ZKAssistedDiscovery discovery = new ZKAssistedDiscovery(); + discovery.setServiceName("DTFlumeTest"); + discovery.setConnectionString("localhost:2181"); + discovery.setBasePath("/HelloDT"); + discovery.setup(null); + assertNotNull("Discovered Sinks", discovery.discover()); + discovery.teardown(); + } + + @Test + public void testAdvertize() + { + ZKAssistedDiscovery discovery = new ZKAssistedDiscovery(); + discovery.setServiceName("DTFlumeTest"); + discovery.setConnectionString("localhost:2181"); + discovery.setBasePath("/HelloDT"); + discovery.setup(null); + + Service service = new Service() + { + @Override + public String getHost() + { + return "chetan"; + } + + @Override + public int getPort() + { + return 5033; + } + + @Override + public byte[] getPayload() + { + return new byte[] {3, 2, 1}; + } + + @Override + public String getId() + { + return "uniqueId"; + } + + }; + discovery.advertise(service); + discovery.teardown(); + } + + private static final Logger logger = LoggerFactory.getLogger(ZKAssistedDiscoveryTest.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java new file mode 100644 index 0000000..10153bc --- /dev/null +++ b/flume/src/test/java/org/apache/apex/malhar/flume/integration/ApplicationTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.integration; + +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Event; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.flume.operator.AbstractFlumeInputOperator; +import com.datatorrent.flume.storage.EventCodec; + +/** + * + */ +@Ignore +public class ApplicationTest implements StreamingApplication +{ + public static class FlumeInputOperator extends AbstractFlumeInputOperator + { + @Override + public Event convert(Event event) + { + return event; + } + } + + public static class Counter implements Operator + { + private int count; + private transient Event event; + public final transient DefaultInputPort input = new DefaultInputPort() + { + @Override + public void process(Event tuple) + { + count++; + event = tuple; + } + + }; + + @Override + public void beginWindow(long windowId) + { + } + + @Override + public void endWindow() + { + logger.debug("total count = {}, tuple = {}", count, event); + } + + @Override + public void setup(OperatorContext context) + { + } + + @Override + public void teardown() + { + } + + private static final Logger logger = LoggerFactory.getLogger(Counter.class); + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + dag.setAttribute(com.datatorrent.api.Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 1000); + FlumeInputOperator flume = dag.addOperator("FlumeOperator", new FlumeInputOperator()); + flume.setConnectAddresses(new String[]{"test:127.0.0.1:8080"}); + flume.setCodec(new EventCodec()); + Counter counter = dag.addOperator("Counter", new Counter()); + + dag.addStream("Slices", flume.output, counter.input).setLocality(Locality.CONTAINER_LOCAL); + } + + @Test + public void test() + { + try { + LocalMode.runApp(this, Integer.MAX_VALUE); + } catch (Exception ex) { + logger.warn("The dag seems to be not testable yet, if it's - remove this exception handling", ex); + } + + } + + private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java new file mode 100644 index 0000000..47bcacf --- /dev/null +++ b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.interceptor; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.HashMap; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.flume.Context; +import org.apache.flume.interceptor.Interceptor; + +import static org.junit.Assert.assertArrayEquals; + +/** + * Tests for {@link ColumnFilteringFormattingInterceptor} + */ +public class ColumnFilteringFormattingInterceptorTest +{ + private static InterceptorTestHelper helper; + + @BeforeClass + public static void startUp() + { + HashMap contextMap = new HashMap(); + contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2)); + contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "{1}\001{2}\001{3}\001"); + + helper = new InterceptorTestHelper(new ColumnFilteringFormattingInterceptor.Builder(), contextMap); + } + + @Test + public void testInterceptEvent() + { + helper.testIntercept_Event(); + } + + @Test + public void testFiles() throws IOException, URISyntaxException + { + helper.testFiles(); + } + + @Test + public void testInterceptEventWithPrefix() + { + HashMap contextMap = new HashMap(); + contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2)); + contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "\001{1}\001{2}\001{3}\001"); + + ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder(); + builder.configure(new Context(contextMap)); + Interceptor interceptor = builder.build(); + + assertArrayEquals("Six Fields", + "\001\001Second\001\001".getBytes(), + interceptor.intercept( + new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody()); + } + + @Test + public void testInterceptEventWithLongSeparator() + { + HashMap contextMap = new HashMap(); + contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2)); + contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "a{1}bc{2}def{3}ghi"); + + ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder(); + builder.configure(new Context(contextMap)); + Interceptor interceptor = builder.build(); + byte[] body = interceptor.intercept( + new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody(); + + assertArrayEquals("Six Fields, " + new String(body), "abcSeconddefghi".getBytes(), body); + } + + @Test + public void testInterceptEventWithTerminatingSeparator() + { + HashMap contextMap = new HashMap(); + contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2)); + contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "a{1}bc{2}def{3}"); + + ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder(); + builder.configure(new Context(contextMap)); + Interceptor interceptor = builder.build(); + byte[] body = interceptor.intercept( + new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody(); + + assertArrayEquals("Six Fields, " + new String(body), "abcSeconddef".getBytes(), body); + } + + @Test + public void testInterceptEventWithColumnZero() + { + HashMap contextMap = new HashMap(); + contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2)); + contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "{0}\001"); + + ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder(); + builder.configure(new Context(contextMap)); + Interceptor interceptor = builder.build(); + + assertArrayEquals("Empty Bytes", + "\001".getBytes(), + interceptor.intercept(new InterceptorTestHelper.MyEvent("".getBytes())).getBody()); + + assertArrayEquals("One Field", + "First\001".getBytes(), + interceptor.intercept(new InterceptorTestHelper.MyEvent("First".getBytes())).getBody()); + + assertArrayEquals("Two Fields", + "\001".getBytes(), + interceptor.intercept(new InterceptorTestHelper.MyEvent("\002First".getBytes())).getBody()); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringInterceptorTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringInterceptorTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringInterceptorTest.java new file mode 100644 index 0000000..b001b21 --- /dev/null +++ b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringInterceptorTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.interceptor; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.HashMap; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.flume.Context; +import org.apache.flume.interceptor.Interceptor; + +import static org.junit.Assert.assertArrayEquals; + +/** + * + */ +public class ColumnFilteringInterceptorTest +{ + private static InterceptorTestHelper helper; + + @BeforeClass + public static void startUp() + { + HashMap contextMap = new HashMap(); + contextMap.put(ColumnFilteringInterceptor.Constants.DST_SEPARATOR, Byte.toString((byte)1)); + contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2)); + contextMap.put(ColumnFilteringInterceptor.Constants.COLUMNS, "1 2 3"); + + helper = new InterceptorTestHelper(new ColumnFilteringInterceptor.Builder(), contextMap); + } + + @Test + public void testInterceptEvent() + { + helper.testIntercept_Event(); + } + + @Test + public void testFiles() throws IOException, URISyntaxException + { + helper.testFiles(); + } + + @Test + public void testInterceptEventWithColumnZero() + { + HashMap contextMap = new HashMap(); + contextMap.put(ColumnFilteringInterceptor.Constants.DST_SEPARATOR, Byte.toString((byte)1)); + contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2)); + contextMap.put(ColumnFilteringInterceptor.Constants.COLUMNS, "0"); + + ColumnFilteringInterceptor.Builder builder = new ColumnFilteringInterceptor.Builder(); + builder.configure(new Context(contextMap)); + Interceptor interceptor = builder.build(); + + assertArrayEquals("Empty Bytes", + "\001".getBytes(), + interceptor.intercept(new InterceptorTestHelper.MyEvent("".getBytes())).getBody()); + + assertArrayEquals("One Field", + "First\001".getBytes(), + interceptor.intercept(new InterceptorTestHelper.MyEvent("First".getBytes())).getBody()); + + assertArrayEquals("Two Fields", + "\001".getBytes(), + interceptor.intercept(new InterceptorTestHelper.MyEvent("\002First".getBytes())).getBody()); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/InterceptorTestHelper.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/InterceptorTestHelper.java b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/InterceptorTestHelper.java new file mode 100644 index 0000000..b8dfbe0 --- /dev/null +++ b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/InterceptorTestHelper.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.interceptor; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; + +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.interceptor.Interceptor; + +import com.datatorrent.netlet.util.Slice; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotNull; + +/** + * + */ +public class InterceptorTestHelper +{ + private static final byte FIELD_SEPARATOR = 1; + + static class MyEvent implements Event + { + byte[] body; + + MyEvent(byte[] bytes) + { + body = bytes; + } + + @Override + public Map getHeaders() + { + return null; + } + + @Override + public void setHeaders(Map map) + { + } + + @Override + @SuppressWarnings("ReturnOfCollectionOrArrayField") + public byte[] getBody() + { + return body; + } + + @Override + @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter") + public void setBody(byte[] bytes) + { + body = bytes; + } + } + + private final Interceptor.Builder builder; + private final Map context; + + InterceptorTestHelper(Interceptor.Builder builder, Map context) + { + this.builder = builder; + this.context = context; + } + + public void testIntercept_Event() + { + builder.configure(new Context(context)); + Interceptor interceptor = builder.build(); + + assertArrayEquals("Empty Bytes", + "\001\001\001".getBytes(), + interceptor.intercept(new MyEvent("".getBytes())).getBody()); + + assertArrayEquals("One Separator", + "\001\001\001".getBytes(), + interceptor.intercept(new MyEvent("\002".getBytes())).getBody()); + + assertArrayEquals("Two Separators", + "\001\001\001".getBytes(), + interceptor.intercept(new MyEvent("\002\002".getBytes())).getBody()); + + assertArrayEquals("One Field", + "\001\001\001".getBytes(), + interceptor.intercept(new MyEvent("First".getBytes())).getBody()); + + assertArrayEquals("Two Fields", + "First\001\001\001".getBytes(), + interceptor.intercept(new MyEvent("\002First".getBytes())).getBody()); + + assertArrayEquals("Two Fields", + "\001\001\001".getBytes(), + interceptor.intercept(new MyEvent("First\001".getBytes())).getBody()); + + assertArrayEquals("Two Fields", + "Second\001\001\001".getBytes(), + interceptor.intercept(new MyEvent("First\002Second".getBytes())).getBody()); + + assertArrayEquals("Three Fields", + "Second\001\001\001".getBytes(), + interceptor.intercept(new MyEvent("First\002Second\002".getBytes())).getBody()); + + assertArrayEquals("Three Fields", + "\001Second\001\001".getBytes(), + interceptor.intercept(new MyEvent("First\002\002Second".getBytes())).getBody()); + + assertArrayEquals("Four Fields", + "\001Second\001\001".getBytes(), + interceptor.intercept(new MyEvent("First\002\002Second\002".getBytes())).getBody()); + + assertArrayEquals("Five Fields", + "\001Second\001\001".getBytes(), + interceptor.intercept(new MyEvent("First\002\002Second\002\002".getBytes())).getBody()); + + assertArrayEquals("Six Fields", + "\001Second\001\001".getBytes(), + interceptor.intercept(new MyEvent("First\002\002Second\002\002\002".getBytes())).getBody()); + } + + public void testFiles() throws IOException, URISyntaxException + { + Properties properties = new Properties(); + properties.load(getClass().getResourceAsStream("/flume/conf/flume-conf.properties")); + + String interceptor = null; + for (Entry entry : properties.entrySet()) { + logger.debug("{} => {}", entry.getKey(), entry.getValue()); + + if (builder.getClass().getName().equals(entry.getValue().toString())) { + String key = entry.getKey().toString(); + if (key.endsWith(".type")) { + interceptor = key.substring(0, key.length() - "type".length()); + break; + } + } + } + + assertNotNull(builder.getClass().getName(), interceptor); + @SuppressWarnings({"null", "ConstantConditions"}) + final int interceptorLength = interceptor.length(); + + HashMap map = new HashMap(); + for (Entry entry : properties.entrySet()) { + String key = entry.getKey().toString(); + if (key.startsWith(interceptor)) { + map.put(key.substring(interceptorLength), entry.getValue().toString()); + } + } + + builder.configure(new Context(map)); + Interceptor interceptorInstance = builder.build(); + + URL url = getClass().getResource("/test_data/gentxns/"); + assertNotNull("Generated Transactions", url); + + int records = 0; + File dir = new File(url.toURI()); + for (File file : dir.listFiles()) { + records += processFile(file, interceptorInstance); + } + + Assert.assertEquals("Total Records", 2200, records); + } + + private int processFile(File file, Interceptor interceptor) throws IOException + { + InputStream stream = getClass().getResourceAsStream("/test_data/gentxns/" + file.getName()); + BufferedReader br = new BufferedReader(new InputStreamReader(stream)); + + String line; + int i = 0; + while ((line = br.readLine()) != null) { + byte[] body = interceptor.intercept(new MyEvent(line.getBytes())).getBody(); + RawEvent event = RawEvent.from(body, FIELD_SEPARATOR); + Assert.assertEquals("GUID", new Slice(line.getBytes(), 0, 32), event.guid); + logger.debug("guid = {}, time = {}", event.guid, event.time); + i++; + } + + br.close(); + return i; + } + + private static final Logger logger = LoggerFactory.getLogger(InterceptorTestHelper.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/RawEvent.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/RawEvent.java b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/RawEvent.java new file mode 100644 index 0000000..cf6a823 --- /dev/null +++ b/flume/src/test/java/org/apache/apex/malhar/flume/interceptor/RawEvent.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.interceptor; + +import java.io.Serializable; + +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.netlet.util.Slice; + +/** + * + */ +public class RawEvent implements Serializable +{ + public Slice guid; + public long time; + public int dimensionsOffset; + + public Slice getGUID() + { + return guid; + } + + public long getTime() + { + return time; + } + + RawEvent() + { + /* needed for Kryo serialization */ + } + + public static RawEvent from(byte[] row, byte separator) + { + final int rowsize = row.length; + + /* + * Lets get the guid out of the current record + */ + int sliceLengh = -1; + while (++sliceLengh < rowsize) { + if (row[sliceLengh] == separator) { + break; + } + } + + int i = sliceLengh + 1; + + /* lets parse the date */ + int dateStart = i; + while (i < rowsize) { + if (row[i++] == separator) { + long time = DATE_PARSER.parseMillis(new String(row, dateStart, i - dateStart - 1)); + RawEvent event = new RawEvent(); + event.guid = new Slice(row, 0, sliceLengh); + event.time = time; + event.dimensionsOffset = i; + return event; + } + } + + return null; + } + + @Override + public int hashCode() + { + int hash = 5; + hash = 61 * hash + (this.guid != null ? this.guid.hashCode() : 0); + hash = 61 * hash + (int)(this.time ^ (this.time >>> 32)); + return hash; + } + + @Override + public String toString() + { + return "RawEvent{" + "guid=" + guid + ", time=" + time + '}'; + } + + @Override + public boolean equals(Object obj) + { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final RawEvent other = (RawEvent)obj; + if (this.guid != other.guid && (this.guid == null || !this.guid.equals(other.guid))) { + return false; + } + return this.time == other.time; + } + + private static final DateTimeFormatter DATE_PARSER = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"); + private static final Logger logger = LoggerFactory.getLogger(RawEvent.class); + private static final long serialVersionUID = 201312191312L; +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperatorTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperatorTest.java new file mode 100644 index 0000000..d7c4c30 --- /dev/null +++ b/flume/src/test/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperatorTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.operator; + +import java.util.HashSet; +import java.util.Set; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +/** + * + */ +public class AbstractFlumeInputOperatorTest +{ + public AbstractFlumeInputOperatorTest() + { + } + + @Test + public void testThreadLocal() + { + ThreadLocal> tl = new ThreadLocal>() + { + @Override + protected Set initialValue() + { + return new HashSet(); + } + + }; + Set get1 = tl.get(); + get1.add(1); + assertTrue("Just Added Value", get1.contains(1)); + + Set get2 = tl.get(); + assertTrue("Previously added value", get2.contains(1)); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java new file mode 100644 index 0000000..9bc69e8 --- /dev/null +++ b/flume/src/test/java/org/apache/apex/malhar/flume/sink/DTFlumeSinkTest.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.sink; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.channel.MemoryChannel; + +import com.datatorrent.flume.discovery.Discovery; +import com.datatorrent.netlet.AbstractLengthPrependerClient; +import com.datatorrent.netlet.DefaultEventLoop; +import com.datatorrent.netlet.util.Slice; + +/** + * + */ +public class DTFlumeSinkTest +{ + static final String hostname = "localhost"; + int port = 0; + + @Test + @SuppressWarnings("SleepWhileInLoop") + public void testServer() throws InterruptedException, IOException + { + Discovery discovery = new Discovery() + { + @Override + public synchronized void unadvertise(Service service) + { + notify(); + } + + @Override + public synchronized void advertise(Service service) + { + port = service.getPort(); + logger.debug("listening at {}", service); + notify(); + } + + @Override + @SuppressWarnings("unchecked") + public synchronized Collection> discover() + { + try { + wait(); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } + return Collections.EMPTY_LIST; + } + + }; + DTFlumeSink sink = new DTFlumeSink(); + sink.setName("TeskSink"); + sink.setHostname(hostname); + sink.setPort(0); + sink.setAcceptedTolerance(2000); + sink.setChannel(new MemoryChannel()); + sink.setDiscovery(discovery); + sink.start(); + AbstractLengthPrependerClient client = new AbstractLengthPrependerClient() + { + private byte[] array; + private int offset = 2; + + @Override + public void onMessage(byte[] buffer, int offset, int size) + { + Slice received = new Slice(buffer, offset, size); + logger.debug("Client Received = {}", received); + Assert.assertEquals(received, + new Slice(Arrays.copyOfRange(array, this.offset, array.length), 0, Server.Request.FIXED_SIZE)); + synchronized (DTFlumeSinkTest.this) { + DTFlumeSinkTest.this.notify(); + } + } + + @Override + public void connected() + { + super.connected(); + array = new byte[Server.Request.FIXED_SIZE + offset]; + array[offset] = Server.Command.ECHO.getOrdinal(); + array[offset + 1] = 1; + array[offset + 2] = 2; + array[offset + 3] = 3; + array[offset + 4] = 4; + array[offset + 5] = 5; + array[offset + 6] = 6; + array[offset + 7] = 7; + array[offset + 8] = 8; + Server.writeLong(array, offset + Server.Request.TIME_OFFSET, System.currentTimeMillis()); + write(array, offset, Server.Request.FIXED_SIZE); + } + + }; + + DefaultEventLoop eventloop = new DefaultEventLoop("Eventloop-TestClient"); + eventloop.start(); + discovery.discover(); + try { + eventloop.connect(new InetSocketAddress(hostname, port), client); + try { + synchronized (this) { + this.wait(); + } + } finally { + eventloop.disconnect(client); + } + } finally { + eventloop.stop(); + } + + sink.stop(); + } + + private static final Logger logger = LoggerFactory.getLogger(DTFlumeSinkTest.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/sink/ServerTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/sink/ServerTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/sink/ServerTest.java new file mode 100644 index 0000000..a893ebd --- /dev/null +++ b/flume/src/test/java/org/apache/apex/malhar/flume/sink/ServerTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.sink; + +import java.util.Random; + +import org.junit.Assert; +import org.junit.Test; + +/** + * + */ +public class ServerTest +{ + byte[] array; + + public ServerTest() + { + array = new byte[1024]; + } + + @Test + public void testInt() + { + Server.writeInt(array, 0, Integer.MAX_VALUE); + Assert.assertEquals("Max Integer", Integer.MAX_VALUE, Server.readInt(array, 0)); + + Server.writeInt(array, 0, Integer.MIN_VALUE); + Assert.assertEquals("Min Integer", Integer.MIN_VALUE, Server.readInt(array, 0)); + + Server.writeInt(array, 0, 0); + Assert.assertEquals("Zero Integer", 0, Server.readInt(array, 0)); + + Random rand = new Random(); + for (int i = 0; i < 128; i++) { + int n = rand.nextInt(); + if (rand.nextBoolean()) { + n = -n; + } + Server.writeInt(array, 0, n); + Assert.assertEquals("Random Integer", n, Server.readInt(array, 0)); + } + } + + @Test + public void testLong() + { + Server.writeLong(array, 0, Integer.MAX_VALUE); + Assert.assertEquals("Max Integer", Integer.MAX_VALUE, Server.readLong(array, 0)); + + Server.writeLong(array, 0, Integer.MIN_VALUE); + Assert.assertEquals("Min Integer", Integer.MIN_VALUE, Server.readLong(array, 0)); + + Server.writeLong(array, 0, 0); + Assert.assertEquals("Zero Integer", 0L, Server.readLong(array, 0)); + + Server.writeLong(array, 0, Long.MAX_VALUE); + Assert.assertEquals("Max Long", Long.MAX_VALUE, Server.readLong(array, 0)); + + Server.writeLong(array, 0, Long.MIN_VALUE); + Assert.assertEquals("Min Long", Long.MIN_VALUE, Server.readLong(array, 0)); + + Server.writeLong(array, 0, 0L); + Assert.assertEquals("Zero Long", 0L, Server.readLong(array, 0)); + + Random rand = new Random(); + for (int i = 0; i < 128; i++) { + long n = rand.nextLong(); + if (rand.nextBoolean()) { + n = -n; + } + Server.writeLong(array, 0, n); + Assert.assertEquals("Random Long", n, Server.readLong(array, 0)); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStorageMatching.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStorageMatching.java b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStorageMatching.java new file mode 100644 index 0000000..4a714fe --- /dev/null +++ b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStorageMatching.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.primitives.Ints; + +import com.datatorrent.netlet.util.Slice; + +/** + * + */ +public class HDFSStorageMatching +{ + + public static void main(String[] args) + { + HDFSStorage storage = new HDFSStorage(); + storage.setBaseDir(args[0]); + storage.setId(args[1]); + storage.setRestore(true); + storage.setup(null); + int count = 100000000; + + logger.debug(" start time {}", System.currentTimeMillis()); + int index = 10000; + byte[] b = Ints.toByteArray(index); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + logger.debug(" end time {}", System.currentTimeMillis()); + logger.debug(" start time for retrieve {}", System.currentTimeMillis()); + b = storage.retrieve(new byte[8]); + int org_index = index; + index = 10000; + match(b, index); + while (true) { + index++; + b = storage.retrieveNext(); + if (b == null) { + logger.debug(" end time for retrieve {}/{}/{}", System.currentTimeMillis(), index, org_index); + return; + } else { + if (!match(b, index)) { + throw new RuntimeException("failed : " + index); + } + } + } + + } + + public static boolean match(byte[] data, int match) + { + byte[] tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); + int dataR = Ints.fromByteArray(tempData); + //logger.debug("input: {}, output: {}",match,dataR); + if (match == dataR) { + return true; + } + return false; + } + + private static final Logger logger = LoggerFactory.getLogger(HDFSStorageMatching.class); +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStoragePerformance.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStoragePerformance.java b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStoragePerformance.java new file mode 100644 index 0000000..6ed3892 --- /dev/null +++ b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStoragePerformance.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.netlet.util.Slice; + +/** + * + */ +public class HDFSStoragePerformance +{ + + public static void main(String[] args) + { + HDFSStorage storage = new HDFSStorage(); + storage.setBaseDir("."); + storage.setId("gaurav_flume_1"); + storage.setRestore(true); + storage.setup(null); + int count = 1000000; + + logger.debug(" start time {}", System.currentTimeMillis()); + int index = 10000; + byte[] b = new byte[1024]; + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + } + storage.flush(); + logger.debug(" end time {}", System.currentTimeMillis()); + logger.debug(" start time for retrieve {}", System.currentTimeMillis()); + storage.retrieve(new byte[8]); + String inputData = new String(b); + index = 1; + while (true) { + b = storage.retrieveNext(); + if (b == null) { + logger.debug(" end time for retrieve {}", System.currentTimeMillis()); + return; + } else { + if (!match(b, inputData)) { + throw new RuntimeException("failed : " + index); + } + } + + index++; + } + + } + + public static boolean match(byte[] data, String match) + { + byte[] tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); +// logger.debug("input: {}, output: {}",match,new String(tempData)); + return (match.equals(new String(tempData))); + } + + private static final Logger logger = LoggerFactory.getLogger(HDFSStoragePerformance.class); +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStoragePerformanceTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStoragePerformanceTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStoragePerformanceTest.java new file mode 100644 index 0000000..72f03fc --- /dev/null +++ b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStoragePerformanceTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.primitives.Ints; + +import com.datatorrent.netlet.util.Slice; + +/** + *

HDFSStoragePerformanceTest class.

+ * + * @since 1.0.1 + */ +public class HDFSStoragePerformanceTest +{ + + public static void main(String[] args) + { + HDFSStorage storage = new HDFSStorage(); + storage.setBaseDir(args[0]); + storage.setId(args[1]); + storage.setRestore(true); + storage.setup(null); + int count = 100000000; + + logger.debug(" start time {}", System.currentTimeMillis()); + int index = 10000; + byte[] b = Ints.toByteArray(index); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + for (int i = 0; i < count; i++) { + storage.store(new Slice(b, 0, b.length)); + index++; + b = Ints.toByteArray(index); + } + storage.flush(); + logger.debug(" end time {}", System.currentTimeMillis()); + logger.debug(" start time for retrieve {}", System.currentTimeMillis()); + b = storage.retrieve(new byte[8]); + int org_index = index; + index = 10000; + match(b, index); + while (true) { + index++; + b = storage.retrieveNext(); + if (b == null) { + logger.debug(" end time for retrieve {}/{}/{}", System.currentTimeMillis(), index, org_index); + return; + } else { + if (!match(b, index)) { + throw new RuntimeException("failed : " + index); + } + } + } + + } + + public static boolean match(byte[] data, int match) + { + byte[] tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); + int dataR = Ints.fromByteArray(tempData); + //logger.debug("input: {}, output: {}",match,dataR); + if (match == dataR) { + return true; + } + return false; + } + + private static final Logger logger = LoggerFactory.getLogger(HDFSStoragePerformanceTest.class); +} +