camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ioca...@apache.org
Subject svn commit: r1343296 [2/2] - in /camel/trunk: components/ components/camel-hbase/ components/camel-hbase/src/ components/camel-hbase/src/main/ components/camel-hbase/src/main/java/ components/camel-hbase/src/main/java/org/ components/camel-hbase/src/ma...
Date Mon, 28 May 2012 15:45:49 GMT
Added: camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java?rev=1343296&view=auto
==============================================================================
--- camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java (added)
+++ camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java Mon May 28 15:45:47 2012
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hbase.processor.idempotent;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import org.apache.camel.component.hbase.HBaseHelper;
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.IOHelper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HBaseIdempotentRepository extends ServiceSupport implements IdempotentRepository {
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseIdempotentRepository.class);
+
+    private final Configuration configuration;
+    private final String tableName;
+    private final String family;
+    private final String qualifer;
+    private final HTable table;
+
+    /**
+     * Constructor
+     *
+     * @param configuration
+     * @param tableName
+     */
+    public HBaseIdempotentRepository(Configuration configuration, String tableName, String family, String qualifier) throws IOException {
+        this.configuration = configuration;
+        this.tableName = tableName;
+        this.family = family;
+        this.qualifer = qualifier;
+        //In the case of idempotent repository we do not want to catch exceptions related to HTable.
+        this.table = new HTable(configuration, tableName);
+    }
+
+
+    @Override
+    public boolean add(Object o) {
+        try {
+            synchronized (tableName.intern()) {
+                if (contains(o)) {
+                    return false;
+                }
+                byte[] b = toBytes(o);
+                Put put = new Put(b);
+                put.add(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(qualifer), b);
+                table.put(put);
+                table.flushCommits();
+                return true;
+            }
+        } catch (Exception e) {
+            LOG.warn("Error adding object {} to HBase repository.", o);
+            return false;
+        }
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        try {
+            byte[] b = toBytes(o);
+            Get get = new Get(b);
+            get.addColumn(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(qualifer));
+            return table.exists(get);
+        } catch (Exception e) {
+            LOG.warn("Error reading object {} from HBase repository.", o);
+            return false;
+        }
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        try {
+            byte[] b = toBytes(o);
+            if (table.exists(new Get(b))) {
+                Delete delete = new Delete(b);
+                table.delete(delete);
+                return true;
+            } else {
+                return false;
+            }
+        } catch (Exception e) {
+            LOG.warn("Error removing object {} from HBase repository.", o);
+            return false;
+        }
+    }
+
+    @Override
+    public boolean confirm(Object o) {
+        return true;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+
+    }
+
+    private byte[] toBytes(Object obj) {
+        if (obj instanceof byte[]) {
+            return (byte[]) obj;
+        } else if (obj instanceof Byte) {
+            return Bytes.toBytes((Byte) obj);
+        } else if (obj instanceof Short) {
+            return Bytes.toBytes((Short) obj);
+        } else if (obj instanceof Integer) {
+            return Bytes.toBytes((Integer) obj);
+        }  else if (obj instanceof Long) {
+            return Bytes.toBytes((Long) obj);
+        }  else if (obj instanceof Double) {
+            return Bytes.toBytes((Double) obj);
+        }  else if (obj instanceof String) {
+            return Bytes.toBytes((String) obj);
+        } else {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = null;
+            try {
+                oos = new ObjectOutputStream(baos);
+                oos.writeObject(obj);
+                return  baos.toByteArray();
+            } catch (IOException e) {
+                LOG.warn("Error while serializing object. Null will be used.", e);
+                return null;
+            } finally {
+                IOHelper.close(oos);
+                IOHelper.close(baos);
+            }
+        }
+    }
+}

Added: camel/trunk/components/camel-hbase/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/main/resources/META-INF/services/org/apache/camel/TypeConverter?rev=1343296&view=auto
==============================================================================
--- camel/trunk/components/camel-hbase/src/main/resources/META-INF/services/org/apache/camel/TypeConverter (added)
+++ camel/trunk/components/camel-hbase/src/main/resources/META-INF/services/org/apache/camel/TypeConverter Mon May 28 15:45:47 2012
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.camel.component.hbase.converter.HBaseModelConverter
\ No newline at end of file

Added: camel/trunk/components/camel-hbase/src/main/resources/META-INF/services/org/apache/camel/component/hbase
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/main/resources/META-INF/services/org/apache/camel/component/hbase?rev=1343296&view=auto
==============================================================================
--- camel/trunk/components/camel-hbase/src/main/resources/META-INF/services/org/apache/camel/component/hbase (added)
+++ camel/trunk/components/camel-hbase/src/main/resources/META-INF/services/org/apache/camel/component/hbase Mon May 28 15:45:47 2012
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.hbase.HBaseComponent

Added: camel/trunk/components/camel-hbase/src/main/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/main/resources/log4j.properties?rev=1343296&view=auto
==============================================================================
--- camel/trunk/components/camel-hbase/src/main/resources/log4j.properties (added)
+++ camel/trunk/components/camel-hbase/src/main/resources/log4j.properties Mon May 28 15:45:47 2012
@@ -0,0 +1,16 @@
+
+#
+# The logging properties used
+#
+log4j.rootLogger=INFO, out
+
+# uncomment the following line to turn on Camel debugging
+#log4j.logger.org.apache.camel=DEBUG
+
+# CONSOLE appender not used by default
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+log4j.throwableRenderer=org.apache.log4j.EnhancedThrowableRenderer
\ No newline at end of file

Added: camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java?rev=1343296&view=auto
==============================================================================
--- camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java (added)
+++ camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java Mon May 28 15:45:47 2012
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.hbase;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTablePool;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public abstract class CamelHBaseTestSupport extends CamelTestSupport {
+
+    protected static HBaseTestingUtility hbaseUtil = new HBaseTestingUtility();
+    protected static int numServers = 1;
+    protected static final String REQUIREDUMASK = "0022";
+    protected static final String DEFAULTTABLE = "DEFAULTTABLE";
+    protected static final String DEFAULTFAMILY = "DEFAULTFAMILY";
+
+    protected static String actualMask = "0022";
+    protected static Boolean systemReady = true;
+
+
+    @BeforeClass
+    public static void setUpClass() throws Exception {
+        systemReady = checkUmask();
+        if (systemReady) {
+            hbaseUtil.startMiniCluster(numServers);
+        }
+    }
+
+    @AfterClass
+    public static void tearDownClass() throws Exception {
+        if (systemReady) {
+            hbaseUtil.shutdownMiniCluster();
+        }
+    }
+
+
+    protected static boolean checkUmask() throws IOException {
+        String operatingSystem = System.getProperty("os.name");
+        if (!operatingSystem.startsWith("Win")) {
+            Process p = Runtime.getRuntime().exec("umask");
+            InputStream is = p.getInputStream();
+            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+            StringBuilder sb = new StringBuilder();
+            String line;
+            while ((line = reader.readLine()) != null) {
+                sb.append(line);
+            }
+            actualMask = sb.toString().trim();
+            return REQUIREDUMASK.equals(actualMask);
+        }
+        return false;
+    }
+
+    @Override
+    public CamelContext createCamelContext() throws Exception {
+        CamelContext context = new DefaultCamelContext(createRegistry());
+        HBaseComponent component = new HBaseComponent();
+        component.setConfiguration(hbaseUtil.getConfiguration());
+        context.addComponent("hbase", component);
+        return context;
+    }
+}

Added: camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java?rev=1343296&view=auto
==============================================================================
--- camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java (added)
+++ camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java Mon May 28 15:45:47 2012
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.hbase;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HBaseConsumerTest extends CamelHBaseTestSupport {
+
+    protected Object[] key = {"1", "2", "3"};
+    protected final String[] body = {"Hello Hbase", "Hi HBase", "Yo HBase"};
+    protected final String[] family = {"family1", "family2", "family3"};
+    protected final String[] column = {"mycolumn1", "mycolumn2", "mycolumn3"};
+    protected final byte[][] families = {DEFAULTFAMILY.getBytes(),
+            family[0].getBytes(),
+            family[1].getBytes(),
+            family[2].getBytes()};
+
+    @Before
+    public void setUp() throws Exception {
+        if (systemReady) {
+            try {
+                hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(DEFAULTTABLE), families);
+            } catch (TableExistsException ex) {
+                //Ignore if table exists
+            }
+
+            super.setUp();
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (systemReady) {
+            super.tearDown();
+        }
+    }
+
+
+    @Test
+    public void testPutMultiRowsAndConsume() throws Exception {
+        if (systemReady) {
+            ProducerTemplate template = context.createProducerTemplate();
+            Map<String, Object> headers = new HashMap<String, Object>();
+            headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+            headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+            headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0]);
+            headers.put(HbaseAttribute.HBASE_VALUE.asHeader(), body[0]);
+
+            headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]);
+            headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(2), family[0]);
+            headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0]);
+            headers.put(HbaseAttribute.HBASE_VALUE.asHeader(2), body[1]);
+
+            headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(3), key[2]);
+            headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(3), family[0]);
+            headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(3), column[0]);
+            headers.put(HbaseAttribute.HBASE_VALUE.asHeader(3), body[2]);
+
+            headers.put(HBaseContats.OPERATION, HBaseContats.PUT);
+
+            template.sendBodyAndHeaders("direct:start", null, headers);
+
+            MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
+            mockEndpoint.expectedMessageCount(3);
+            mockEndpoint.assertIsSatisfied(10000);
+            Thread.sleep(10000);
+        }
+    }
+
+    /**
+     * Factory method which derived classes can use to create a {@link org.apache.camel.builder.RouteBuilder}
+     * to define the routes for testing
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start")
+                        .to("hbase://" + DEFAULTTABLE);
+
+                from("hbase://" + DEFAULTTABLE)
+                        .to("mock:result");
+            }
+        };
+    }
+}

Added: camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java?rev=1343296&view=auto
==============================================================================
--- camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java (added)
+++ camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java Mon May 28 15:45:47 2012
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.hbase;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HBaseConvertionsTest extends CamelHBaseTestSupport {
+
+    protected Object[] key = {1, "2", "3"};
+    protected final Object[] body = {1L, false, "3"};
+    protected final String[] column = {"DEFAULTCOLUMN"};
+    protected final byte[][] families = {DEFAULTFAMILY.getBytes()};
+
+    @Before
+    public void setUp() throws Exception {
+        if (systemReady) {
+            try {
+                hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(DEFAULTTABLE), families);
+            } catch (TableExistsException ex) {
+                //Ignore if table exists
+            }
+
+            super.setUp();
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (systemReady) {
+            hbaseUtil.deleteTable(DEFAULTTABLE.getBytes());
+            super.tearDown();
+        }
+    }
+
+    @Test
+    public void testPutMultiRows() throws Exception {
+        if (systemReady) {
+            ProducerTemplate template = context.createProducerTemplate();
+            Map<String, Object> headers = new HashMap<String, Object>();
+            headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+            headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(), DEFAULTFAMILY);
+            headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0]);
+            headers.put(HbaseAttribute.HBASE_VALUE.asHeader(), body[0]);
+
+            headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]);
+            headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(2), DEFAULTFAMILY);
+            headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0]);
+            headers.put(HbaseAttribute.HBASE_VALUE.asHeader(2), body[1]);
+
+            headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(3), key[2]);
+            headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(3), DEFAULTFAMILY);
+            headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(3), column[0]);
+            headers.put(HbaseAttribute.HBASE_VALUE.asHeader(3), body[2]);
+
+            headers.put(HBaseContats.OPERATION, HBaseContats.PUT);
+
+            template.sendBodyAndHeaders("direct:start", null, headers);
+
+            Configuration configuration = hbaseUtil.getHBaseAdmin().getConfiguration();
+            HTable bar = new HTable(configuration, DEFAULTTABLE.getBytes());
+            Get get = new Get(Bytes.toBytes((Integer) key[0]));
+
+            //Check row 1
+            get.addColumn(DEFAULTFAMILY.getBytes(), column[0].getBytes());
+            Result result = bar.get(get);
+            byte[] resultValue = result.value();
+            assertArrayEquals(Bytes.toBytes((Long) body[0]), resultValue);
+
+            //Check row 2
+            get = new Get(Bytes.toBytes((String) key[1]));
+            get.addColumn(DEFAULTFAMILY.getBytes(), column[0].getBytes());
+            result = bar.get(get);
+            resultValue = result.value();
+            assertArrayEquals(Bytes.toBytes((Boolean) body[1]), resultValue);
+
+            //Check row 3
+            get = new Get(Bytes.toBytes((String) key[2]));
+            get.addColumn(DEFAULTFAMILY.getBytes(), column[0].getBytes());
+            result = bar.get(get);
+            resultValue = result.value();
+            assertArrayEquals(Bytes.toBytes((String) body[2]), resultValue);
+        }
+    }
+
+
+    /**
+     * Factory method which derived classes can use to create a {@link org.apache.camel.builder.RouteBuilder}
+     * to define the routes for testing
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start")
+                        .to("hbase://" + DEFAULTTABLE);
+
+                from("direct:scan")
+                        .to("hbase://" + DEFAULTTABLE + "?operation=" + HBaseContats.SCAN + "&maxResults=2&family=family1&qualifier=column1");
+            }
+        };
+    }
+}

Added: camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java?rev=1343296&view=auto
==============================================================================
--- camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java (added)
+++ camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java Mon May 28 15:45:47 2012
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.hbase;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.hbase.mapping.CellMappingStrategyFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HBaseProducerTest extends CamelHBaseTestSupport {
+
+    protected String[] key = {"1", "2", "3"};
+    protected final String[] body = {"Hello Hbase", "Hi HBase", "Yo HBase"};
+    protected final String[] family = {"family1", "family2", "family3"};
+    protected final String[] column = {"mycolumn1", "mycolumn2", "mycolumn3"};
+    protected final byte[][] families = {DEFAULTFAMILY.getBytes(),
+            family[0].getBytes(),
+            family[1].getBytes(),
+            family[2].getBytes()};
+
+
+    @Before
+    public void setUp() throws Exception {
+        if (systemReady) {
+            try {
+                hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(DEFAULTTABLE), families);
+            } catch (TableExistsException ex) {
+                //Ignore if table exists
+            }
+
+            super.setUp();
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (systemReady) {
+            hbaseUtil.deleteTable(DEFAULTTABLE.getBytes());
+            super.tearDown();
+        }
+    }
+
+    @Test
+    public void testPut() throws Exception {
+        if (systemReady) {
+            ProducerTemplate template = context.createProducerTemplate();
+            Map<String, Object> headers = new HashMap<String, Object>();
+            headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+            headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+            headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0]);
+            headers.put(HbaseAttribute.HBASE_VALUE.asHeader(), body[0]);
+            headers.put(HBaseContats.OPERATION, HBaseContats.PUT);
+            template.sendBodyAndHeaders("direct:start", null, headers);
+
+            Configuration configuration = hbaseUtil.getHBaseAdmin().getConfiguration();
+            HTable table = new HTable(configuration, DEFAULTTABLE.getBytes());
+            Get get = new Get(key[0].getBytes());
+
+            get.addColumn(family[0].getBytes(), column[0].getBytes());
+            Result result = table.get(get);
+            byte[] resultValue = result.value();
+            assertArrayEquals(body[0].getBytes(), resultValue);
+        }
+    }
+
+
+    @Test
+    public void testPutAndGet() throws Exception {
+        testPut();
+        if (systemReady) {
+            ProducerTemplate template = context.createProducerTemplate();
+            Endpoint endpoint = context.getEndpoint("direct:start");
+            Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0]);
+            exchange.getIn().setHeader(HBaseContats.OPERATION, HBaseContats.GET);
+            Exchange resp = template.send(endpoint, exchange);
+            assertEquals(body[0], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader()));
+        }
+    }
+
+
+    @Test
+    public void testPutMultiRows() throws Exception {
+        if (systemReady) {
+            ProducerTemplate template = context.createProducerTemplate();
+            Map<String, Object> headers = new HashMap<String, Object>();
+            headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+            headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+            headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0]);
+            headers.put(HbaseAttribute.HBASE_VALUE.asHeader(), body[0]);
+
+            headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]);
+            headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(2), family[0]);
+            headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0]);
+            headers.put(HbaseAttribute.HBASE_VALUE.asHeader(2), body[1]);
+
+            headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(3), key[2]);
+            headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(3), family[0]);
+            headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(3), column[0]);
+            headers.put(HbaseAttribute.HBASE_VALUE.asHeader(3), body[2]);
+
+            headers.put(HBaseContats.OPERATION, HBaseContats.PUT);
+
+            template.sendBodyAndHeaders("direct:start", null, headers);
+
+            Configuration configuration = hbaseUtil.getHBaseAdmin().getConfiguration();
+            HTable bar = new HTable(configuration, DEFAULTTABLE.getBytes());
+            Get get = new Get(key[0].getBytes());
+
+            //Check row 1
+            get.addColumn(family[0].getBytes(), column[0].getBytes());
+            Result result = bar.get(get);
+            byte[] resultValue = result.value();
+            assertArrayEquals(body[0].getBytes(), resultValue);
+
+            //Check row 2
+            get = new Get(key[1].getBytes());
+            get.addColumn(family[0].getBytes(), column[0].getBytes());
+            result = bar.get(get);
+            resultValue = result.value();
+            assertArrayEquals(body[1].getBytes(), resultValue);
+
+            //Check row 3
+            get = new Get(key[2].getBytes());
+            get.addColumn(family[0].getBytes(), column[0].getBytes());
+            result = bar.get(get);
+            resultValue = result.value();
+            assertArrayEquals(body[2].getBytes(), resultValue);
+        }
+    }
+
+    @Test
+    public void testPutAndGetMultiRows() throws Exception {
+        testPutMultiRows();
+        if (systemReady) {
+            ProducerTemplate template = context.createProducerTemplate();
+            Endpoint endpoint = context.getEndpoint("direct:start");
+            Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0]);
+
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]);
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(2), family[0]);
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0]);
+
+            exchange.getIn().setHeader(HBaseContats.OPERATION, HBaseContats.GET);
+            Exchange resp = template.send(endpoint, exchange);
+            assertEquals(body[0], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader()));
+            assertEquals(body[1], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(2)));
+        }
+    }
+
+
+    @Test
+    public void testPutMultiColumns() throws Exception {
+        if (systemReady) {
+            ProducerTemplate template = context.createProducerTemplate();
+            Map<String, Object> headers = new HashMap<String, Object>();
+            headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+            headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+            headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0]);
+            headers.put(HbaseAttribute.HBASE_VALUE.asHeader(), body[0]);
+
+            headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(2), key[0]);
+            headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(2), family[1]);
+            headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(2), column[1]);
+            headers.put(HbaseAttribute.HBASE_VALUE.asHeader(2), body[1]);
+
+            headers.put(HBaseContats.OPERATION, HBaseContats.PUT);
+
+            template.sendBodyAndHeaders("direct:start", null, headers);
+
+            Configuration configuration = hbaseUtil.getHBaseAdmin().getConfiguration();
+            HTable bar = new HTable(configuration, DEFAULTTABLE.getBytes());
+            Get get = new Get(key[0].getBytes());
+
+            //Check column 1
+            get.addColumn(family[0].getBytes(), column[0].getBytes());
+            Result result = bar.get(get);
+            byte[] resultValue = result.value();
+            assertArrayEquals(body[0].getBytes(), resultValue);
+
+            //Check column 2
+            get = new Get(key[0].getBytes());
+            get.addColumn(family[1].getBytes(), column[1].getBytes());
+            result = bar.get(get);
+            resultValue = result.value();
+            assertArrayEquals(body[1].getBytes(), resultValue);
+        }
+    }
+
+
+    @Test
+    public void testPutAndGetMultiColumns() throws Exception {
+        testPutMultiColumns();
+        if (systemReady) {
+            ProducerTemplate template = context.createProducerTemplate();
+            Endpoint endpoint = context.getEndpoint("direct:start");
+            Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0]);
+
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(2), key[0]);
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(2), family[1]);
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(2), column[1]);
+
+            exchange.getIn().setHeader(HBaseContats.OPERATION, HBaseContats.GET);
+            Exchange resp = template.send(endpoint, exchange);
+            assertEquals(body[0], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader()));
+            assertEquals(body[1], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(2)));
+        }
+    }
+
+
+    @Test
+    public void testPutAndGetAndDeleteMultiRows() throws Exception {
+        testPutMultiRows();
+        if (systemReady) {
+            ProducerTemplate template = context.createProducerTemplate();
+            Endpoint endpoint = context.getEndpoint("direct:start");
+
+            Exchange exchange1 = endpoint.createExchange(ExchangePattern.InOnly);
+            exchange1.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+            exchange1.getIn().setHeader(HBaseContats.OPERATION, HBaseContats.DELETE);
+            template.send(endpoint, exchange1);
+
+            Exchange exchange2 = endpoint.createExchange(ExchangePattern.InOut);
+            exchange2.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
+            exchange2.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+            exchange2.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0]);
+
+            exchange2.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]);
+            exchange2.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(2), family[0]);
+            exchange2.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0]);
+            exchange2.getIn().setHeader(HBaseContats.OPERATION, HBaseContats.GET);
+            Exchange resp = template.send(endpoint, exchange2);
+            assertEquals(null, resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader()));
+            assertEquals(body[1], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(2)));
+        }
+    }
+
+    @Test
+    public void testPutMultiRowsAndScan() throws Exception {
+        testPutMultiRows();
+        if (systemReady) {
+            ProducerTemplate template = context.createProducerTemplate();
+            Endpoint endpoint = context.getEndpoint("direct:scan");
+
+            Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0]);
+            Exchange resp = template.send(endpoint, exchange);
+            Object result1 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(1));
+            Object result2 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(2));
+            Object result3 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(3));
+
+            List bodies = Arrays.asList(body);
+            assertTrue(bodies.contains(result1) && bodies.contains(result2) && bodies.contains(result3));
+
+        }
+    }
+
+
+    /**
+     * Factory method which derived classes can use to create a {@link org.apache.camel.builder.RouteBuilder}
+     * to define the routes for testing
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start")
+                        .to("hbase://" + DEFAULTTABLE);
+
+                from("direct:scan")
+                        .to("hbase://" + DEFAULTTABLE + "?operation=" + HBaseContats.SCAN + "&maxResults=2&family=family1&qualifier=column1");
+            }
+        };
+    }
+}

Added: camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseSpringConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseSpringConsumerTest.java?rev=1343296&view=auto
==============================================================================
--- camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseSpringConsumerTest.java (added)
+++ camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseSpringConsumerTest.java Mon May 28 15:45:47 2012
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.hbase;
+
+import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.spring.SpringCamelContext;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.junit.After;
+import org.junit.Before;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class HBaseSpringConsumerTest extends HBaseConsumerTest {
+
+    private AbstractApplicationContext applicationContext;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        if (systemReady) {
+            try {
+                hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(DEFAULTTABLE), families);
+            } catch (TableExistsException ex) {
+                //Ignore if table exists
+            }
+            applicationContext = createApplicationContext();
+            context = (ModelCamelContext) SpringCamelContext.springCamelContext(applicationContext);
+        }
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        if (systemReady) {
+            super.tearDown();
+            if (applicationContext != null) {
+                applicationContext.destroy();
+            }
+        }
+    }
+
+    public AbstractApplicationContext createApplicationContext() throws Exception {
+        return new ClassPathXmlApplicationContext("/consumer.xml");
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+}
\ No newline at end of file

Added: camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseSpringProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseSpringProducerTest.java?rev=1343296&view=auto
==============================================================================
--- camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseSpringProducerTest.java (added)
+++ camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseSpringProducerTest.java Mon May 28 15:45:47 2012
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.hbase;
+
+import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.spring.SpringCamelContext;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.junit.After;
+import org.junit.Before;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class HBaseSpringProducerTest extends HBaseProducerTest {
+
+    private AbstractApplicationContext applicationContext;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        if (systemReady) {
+            try {
+                hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(DEFAULTTABLE), families);
+            } catch (TableExistsException ex) {
+                //Ignore if table exists
+            }
+            applicationContext = createApplicationContext();
+            context = (ModelCamelContext) SpringCamelContext.springCamelContext(applicationContext);
+        }
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        if (systemReady) {
+            super.tearDown();
+            if (applicationContext != null) {
+                applicationContext.destroy();
+            }
+        }
+    }
+
+    public AbstractApplicationContext createApplicationContext() throws Exception {
+        return new ClassPathXmlApplicationContext("/producer.xml");
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+}

Added: camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java?rev=1343296&view=auto
==============================================================================
--- camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java (added)
+++ camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java Mon May 28 15:45:47 2012
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.hbase.processor.idempotent;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.hbase.CamelHBaseTestSupport;
+import org.apache.camel.component.hbase.HBaseHelper;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HBaseIdempotentRepositoryTest extends CamelHBaseTestSupport {
+
+    IdempotentRepository repository;
+    HTable table;
+
+    private String key01 = "123";
+    private String key02 = "456";
+
+    @Before
+    public void setUp() throws Exception {
+        if (systemReady) {
+            try {
+                hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(DEFAULTTABLE), HBaseHelper.getHBaseFieldAsBytes(DEFAULTFAMILY));
+            } catch (TableExistsException ex) {
+                //Ignore if table exists
+            }
+            this.repository = new HBaseIdempotentRepository(hbaseUtil.getConfiguration(), DEFAULTTABLE, DEFAULTFAMILY, "mycolumn");
+            table = new HTable(hbaseUtil.getConfiguration(), DEFAULTTABLE);
+            super.setUp();
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (systemReady) {
+            hbaseUtil.deleteTable(HBaseHelper.getHBaseFieldAsBytes(DEFAULTTABLE));
+            super.setUp();
+        }
+    }
+
+    @Test
+    public void testAdd() throws Exception {
+        if (systemReady) {
+            // add first key
+            assertTrue(repository.add(key01));
+            assertTrue(repository.contains(key01));
+
+            // try to add an other one
+            assertTrue(repository.add(key02));
+            assertTrue(repository.contains(key02));
+        }
+    }
+
+    @Test
+    public void testContains() throws Exception {
+        if (systemReady) {
+            assertFalse(repository.contains(key01));
+
+            // add key and check again
+            assertTrue(repository.add(key01));
+            assertTrue(repository.contains(key01));
+        }
+    }
+
+    @Test
+    public void testRemove() throws Exception {
+        if (systemReady) {
+            // add key to remove
+            assertTrue(repository.add(key01));
+            assertTrue(repository.contains(key01));
+            // assertEquals(1, dataSet.size());
+
+            // remove key
+            assertTrue(repository.remove(key01));
+            //assertEquals(0, dataSet.size());
+
+            // try to remove a key that isn't there
+            assertFalse(repository.remove(key02));
+        }
+    }
+
+
+    @Test
+    public void testRepositoryInRoute() throws Exception {
+        if (systemReady) {
+            MockEndpoint mock = (MockEndpoint) context.getEndpoint("mock:out");
+            mock.expectedBodiesReceived("a", "b");
+            // c is a duplicate
+
+            // send 3 message with one duplicated key (key01)
+            template.sendBodyAndHeader("direct:in", "a", "messageId", key01);
+            template.sendBodyAndHeader("direct:in", "b", "messageId", key02);
+            template.sendBodyAndHeader("direct:in", "c", "messageId", key01);
+
+            assertMockEndpointsSatisfied();
+        }
+    }
+
+    @Override
+    public CamelContext createCamelContext() throws Exception {
+        return new DefaultCamelContext(createRegistry());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:in")
+                        .idempotentConsumer(header("messageId"), repository)
+                        .to("mock:out");
+            }
+        };
+    }
+
+}

Added: camel/trunk/components/camel-hbase/src/test/resources/consumer.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/test/resources/consumer.xml?rev=1343296&view=auto
==============================================================================
--- camel/trunk/components/camel-hbase/src/test/resources/consumer.xml (added)
+++ camel/trunk/components/camel-hbase/src/test/resources/consumer.xml Mon May 28 15:45:47 2012
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:camel="http://camel.apache.org/schema/spring"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+    <camelContext xmlns="http://camel.apache.org/schema/spring">
+        <route>
+            <from uri="direct:start"/>
+            <to uri="hbase://DEFAULTTABLE"/>
+        </route>
+        <route>
+            <from uri="hbase://DEFAULTTABLE"/>
+            <to uri="mock:result"/>
+        </route>
+    </camelContext>
+
+    <bean id="hbase" class="org.apache.camel.component.hbase.HBaseComponent">
+        <property name="configuration" ref="config"/>
+    </bean>
+
+    <bean id="config" factory-method="getConfiguration" factory-bean="utility"/>
+
+    <bean id="utility" class="org.apache.hadoop.hbase.HBaseTestingUtility"/>
+</beans>
\ No newline at end of file

Added: camel/trunk/components/camel-hbase/src/test/resources/hbase-site.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/test/resources/hbase-site.xml?rev=1343296&view=auto
==============================================================================
--- camel/trunk/components/camel-hbase/src/test/resources/hbase-site.xml (added)
+++ camel/trunk/components/camel-hbase/src/test/resources/hbase-site.xml Mon May 28 15:45:47 2012
@@ -0,0 +1,137 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+  <property>
+    <name>hbase.regionserver.msginterval</name>
+    <value>1000</value>
+    <description>Interval between messages from the RegionServer to HMaster
+    in milliseconds.  Default is 15. Set this value low if you want unit
+    tests to be responsive.
+    </description>
+  </property>
+  <property>
+    <name>hbase.client.pause</name>
+    <value>5000</value>
+    <description>General client pause value.  Used mostly as value to wait
+    before running a retry of a failed get, region lookup, etc.</description>
+  </property>
+  <property>
+    <name>hbase.master.meta.thread.rescanfrequency</name>
+    <value>10000</value>
+    <description>How long the HMaster sleeps (in milliseconds) between scans of
+    the root and meta tables.
+    </description>
+  </property>
+  <property>
+    <name>hbase.server.thread.wakefrequency</name>
+    <value>1000</value>
+    <description>Time to sleep in between searches for work (in milliseconds).
+    Used as sleep interval by service threads such as META scanner and log roller.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.handler.count</name>
+    <value>5</value>
+    <description>Count of RPC Server instances spun up on RegionServers
+    Same property is used by the HMaster for count of master handlers.
+    Default is 10.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.lease.period</name>
+    <value>6000</value>
+    <description>Length of time the master will wait before timing out a region
+    server lease. Since region servers report in every second (see above), this
+    value has been reduced so that the master will notice a dead region server
+    sooner. The default is 30 seconds.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.info.port</name>
+    <value>-1</value>
+    <description>The port for the hbase master web UI
+    Set to -1 if you do not want the info server to run.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.info.port</name>
+    <value>-1</value>
+    <description>The port for the hbase regionserver web UI
+    Set to -1 if you do not want the info server to run.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.info.port.auto</name>
+    <value>true</value>
+    <description>Info server auto port bind. Enables automatic port
+    search if hbase.regionserver.info.port is already in use.
+    Enabled for testing to run multiple tests on one machine.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.lease.thread.wakefrequency</name>
+    <value>3000</value>
+    <description>The interval between checks for expired region server leases.
+    This value has been reduced due to the other reduced values above so that
+    the master will notice a dead region server sooner. The default is 15 seconds.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.optionalcacheflushinterval</name>
+    <value>10000</value>
+    <description>
+    Amount of time to wait since the last time a region was flushed before
+    invoking an optional cache flush. Default 60,000.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.safemode</name>
+    <value>false</value>
+    <description>
+    Turn on/off safe mode in region server. Always on for production, always off
+    for tests.
+    </description>
+  </property>
+  <property>
+    <name>hbase.hregion.max.filesize</name>
+    <value>67108864</value>
+    <description>
+    Maximum desired file size for an HRegion.  If filesize exceeds
+    value + (value / 2), the HRegion is split in two.  Default: 256M.
+
+    Keep the maximum filesize small so we split more often in tests.
+    </description>
+  </property>
+  <property>
+    <name>hadoop.log.dir</name>
+    <value>${user.dir}/../logs</value>
+  </property>
+  <property>
+    <name>hbase.zookeeper.property.clientPort</name>
+    <value>21818</value>
+    <description>Property from ZooKeeper's config zoo.cfg.
+    The port at which the clients will connect.
+    </description>
+  </property>
+</configuration>

Added: camel/trunk/components/camel-hbase/src/test/resources/producer.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/test/resources/producer.xml?rev=1343296&view=auto
==============================================================================
--- camel/trunk/components/camel-hbase/src/test/resources/producer.xml (added)
+++ camel/trunk/components/camel-hbase/src/test/resources/producer.xml Mon May 28 15:45:47 2012
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+ <beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:camel="http://camel.apache.org/schema/spring"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+    <camelContext xmlns="http://camel.apache.org/schema/spring">
+        <route>
+            <from uri="direct:start"/>
+            <to uri="hbase://DEFAULTTABLE"/>
+        </route>
+        <route>
+            <from uri="direct:scan"/>
+            <to uri="hbase://DEFAULTTABLE?operation=CamelHBaseScan&amp;maxResults=2&amp;family=family1&amp;qualifier=column1"/>
+        </route>
+    </camelContext>
+
+    <bean id="hbase" class="org.apache.camel.component.hbase.HBaseComponent">
+        <property name="configuration" ref="config"/>
+    </bean>
+
+    <bean id="config" factory-method="getConfiguration" factory-bean="utility"/>
+
+    <bean id="utility" class="org.apache.hadoop.hbase.HBaseTestingUtility"/>
+</beans>
\ No newline at end of file

Modified: camel/trunk/components/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/pom.xml?rev=1343296&r1=1343295&r2=1343296&view=diff
==============================================================================
--- camel/trunk/components/pom.xml (original)
+++ camel/trunk/components/pom.xml Mon May 28 15:45:47 2012
@@ -84,6 +84,7 @@
     <module>camel-guice</module>
     <module>camel-hawtdb</module>
     <module>camel-hazelcast</module>
+    <module>camel-hbase</module>
     <module>camel-hdfs</module>
     <module>camel-hl7</module>
     <module>camel-ibatis</module>

Modified: camel/trunk/parent/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/parent/pom.xml?rev=1343296&r1=1343295&r2=1343296&view=diff
==============================================================================
--- camel/trunk/parent/pom.xml (original)
+++ camel/trunk/parent/pom.xml Mon May 28 15:45:47 2012
@@ -93,6 +93,7 @@
     <hawtbuf-version>1.9</hawtbuf-version>
     <hawtdb-version>1.6</hawtdb-version>
     <hazelcast-version>2.0.2</hazelcast-version>
+    <hbase-version>0.92.1</hbase-version>
     <hibernate-version>3.2.6.ga</hibernate-version>
     <hibernate-entitymanager-version>3.2.1.ga</hibernate-entitymanager-version>
     <hsqldb-version>2.2.8</hsqldb-version>
@@ -1414,6 +1415,19 @@
         <version>${ehcache-bundle-version}</version>
       </dependency>
 
+      <!-- Hadoop, Hbase -->
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+          <version>${hadoop-version}</version>
+      </dependency>
+
+      <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase</artifactId>
+          <version>${hbase-version}</version>
+      </dependency>
+
       <!-- hibernate and other db systems -->
       <dependency>
         <groupId>org.hibernate</groupId>

Modified: camel/trunk/platforms/karaf/features/src/main/resources/features.xml
URL: http://svn.apache.org/viewvc/camel/trunk/platforms/karaf/features/src/main/resources/features.xml?rev=1343296&r1=1343295&r2=1343296&view=diff
==============================================================================
--- camel/trunk/platforms/karaf/features/src/main/resources/features.xml (original)
+++ camel/trunk/platforms/karaf/features/src/main/resources/features.xml Mon May 28 15:45:47 2012
@@ -340,6 +340,34 @@
     <bundle dependency='true'>mvn:org.fusesource.hawtbuf/hawtbuf/${hawtbuf-version}</bundle>
     <bundle>mvn:org.apache.camel/camel-hawtdb/${project.version}</bundle>
   </feature>
+  <feature name='camel-hbase' version='${project.version}' resolver='(obr)' start-level='50'>
+        <feature version='${project.version}'>camel-core</feature>
+        <feature>war</feature>
+        <bundle dependency='true'>mvn:org.apache.servicemix.specs/org.apache.servicemix.specs.jaxws-api-2.2/${servicemix-specs-version}</bundle>
+        <bundle dependency='true'>mvn:org.apache.servicemix.specs/org.apache.servicemix.specs.saaj-api-1.3/${servicemix-specs-version}</bundle>
+        <bundle dependency='true'>mvn:org.apache.geronimo.specs/geronimo-jta_1.1_spec/${geronimo-jta-spec-version}</bundle>
+        <bundle dependency='true'>mvn:javax.mail/mail/${javax-mail-version}</bundle>
+        <bundle dependency='true'>mvn:commons-codec/commons-codec/${commons-codec-version}</bundle>
+        <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.commons-beanutils/${commons-beanutils-bundle-version}</bundle>
+        <bundle dependency='true'>mvn:commons-collections/commons-collections/${commons-collections-version}</bundle>
+        <bundle dependency='true'>mvn:commons-digester/commons-digester/${commons-digester-version}</bundle>
+        <bundle dependency='true'>mvn:commons-jxpath/commons-jxpath/${commons-jxpath-version}</bundle>
+        <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.jdom/${jdom-bundle-version}</bundle>
+        <bundle dependency='true'>mvn:commons-lang/commons-lang/${commons-lang-version}</bundle>
+        <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.ant/${ant-bundle-version}</bundle>
+        <bundle dependency='true'>mvn:commons-configuration/commons-configuration/${commons-configuration-version}</bundle>
+        <bundle dependency='true'>mvn:commons-daemon/commons-daemon/${commons-daemon-version}</bundle>
+        <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.commons-httpclient/${commons-httpclient-bundle-version}</bundle>
+        <bundle dependency='true'>mvn:org.apache.commons/commons-math/${commons-math-version}</bundle>
+        <bundle dependency='true'>mvn:commons-net/commons-net/${commons-net-version}</bundle>
+        <bundle dependency='true'>mvn:org.codehaus.jackson/jackson-core-asl/${jackson-version}</bundle>
+        <bundle dependency='true'>mvn:org.codehaus.jackson/jackson-mapper-asl/${jackson-version}</bundle>
+        <bundle>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.jetty/${jetty6-bundle-version}</bundle>
+        <bundle dependency='true'>mvn:org.apache.zookeeper/zookeeper/${zookeeper-version}</bundle>
+        <bundle>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.hadoop-core/${hadoop-bundle-version}</bundle>
+        <bundle>wrap:mvn:org.apache.hbase/hbase/${hbase-version}</bundle>
+        <bundle>mvn:org.apache.camel/camel-hbase/${project.version}</bundle>
+  </feature>
   <feature name='camel-hdfs' version='${project.version}' resolver='(obr)' start-level='50'>
     <feature version='${project.version}'>camel-core</feature>
     <feature>war</feature>



Mime
View raw message