Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3FF732009F8 for ; Fri, 20 May 2016 03:05:04 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3E636160A1D; Fri, 20 May 2016 01:05:04 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 107F2160A0D for ; Fri, 20 May 2016 03:05:02 +0200 (CEST) Received: (qmail 51550 invoked by uid 500); 20 May 2016 01:05:02 -0000 Mailing-List: contact notifications-help@asterixdb.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.incubator.apache.org Delivered-To: mailing list notifications@asterixdb.incubator.apache.org Received: (qmail 51541 invoked by uid 99); 20 May 2016 01:05:02 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 May 2016 01:05:02 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id CB093C3A0E for ; Fri, 20 May 2016 01:05:01 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 2.126 X-Spam-Level: ** X-Spam-Status: No, score=2.126 tagged_above=-999 required=6.31 tests=[MISSING_HEADERS=1.207, SPF_FAIL=0.919] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id dD6cLmAQMu3H for ; Fri, 20 May 2016 01:05:00 +0000 (UTC) Received: from unhygienix.ics.uci.edu (unhygienix.ics.uci.edu [128.195.14.130]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTP id D712A5F56C for ; Fri, 20 May 2016 01:04:59 +0000 (UTC) Received: from localhost (localhost [127.0.0.1]) by unhygienix.ics.uci.edu (Postfix) with ESMTP id 96664241EB9; Thu, 19 May 2016 18:05:12 -0700 (PDT) Date: Thu, 19 May 2016 18:05:12 -0700 From: "abdullah alamoudi (Code Review)" CC: Jenkins , Michael Blow Reply-To: bamousaa@gmail.com X-Gerrit-MessageType: merged Subject: Change in asterixdb[master]: Remove Key-Value Adapter X-Gerrit-Change-Id: Iaa7d8d70b0869242d1a872f55f0c6928fda94dcb X-Gerrit-ChangeURL: X-Gerrit-Commit: 3cb1016ffd82a6e0fb16d6bfd73cc35ed905bf58 In-Reply-To: References: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Content-Disposition: inline User-Agent: Gerrit/2.8.4 Message-Id: <20160520010512.96664241EB9@unhygienix.ics.uci.edu> archived-at: Fri, 20 May 2016 01:05:04 -0000 abdullah alamoudi has submitted this change and it was merged. Change subject: Remove Key-Value Adapter ...................................................................... Remove Key-Value Adapter Change-Id: Iaa7d8d70b0869242d1a872f55f0c6928fda94dcb Reviewed-on: https://asterix-gerrit.ics.uci.edu/868 Tested-by: Jenkins Reviewed-by: Jenkins Reviewed-by: Michael Blow --- D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java 4 files changed, 0 insertions(+), 348 deletions(-) Approvals: Michael Blow: Looks good to me, approved Jenkins: Looks good to me, but someone else must approve; Verified diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java deleted file mode 100644 index 41bcc46..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.input.record.reader.kv; - -import java.io.IOException; -import java.util.concurrent.ArrayBlockingQueue; - -import org.apache.asterix.external.api.IRawRecord; -import org.apache.asterix.external.api.IRecordReader; -import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; -import org.apache.asterix.external.input.record.GenericRecord; -import org.apache.asterix.external.util.FeedLogManager; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.log4j.Logger; - -import com.couchbase.client.core.CouchbaseCore; -import com.couchbase.client.core.endpoint.dcp.DCPConnection; -import com.couchbase.client.core.message.cluster.CloseBucketRequest; -import com.couchbase.client.core.message.cluster.OpenBucketRequest; -import com.couchbase.client.core.message.cluster.SeedNodesRequest; -import com.couchbase.client.core.message.dcp.DCPRequest; -import com.couchbase.client.core.message.dcp.MutationMessage; -import com.couchbase.client.core.message.dcp.OpenConnectionRequest; -import com.couchbase.client.core.message.dcp.OpenConnectionResponse; -import com.couchbase.client.core.message.dcp.RemoveMessage; -import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage; - -import rx.Subscriber; - -public class KVReader implements IRecordReader { - - private static final Logger LOGGER = Logger.getLogger(KVReader.class); - private static final MutationMessage POISON_PILL = new MutationMessage(0, (short) 0, null, null, 0, 0L, 0L, 0, 0, - 0L, null); - private final String feedName; - private final short[] vbuckets; - private final String bucket; - private final String password; - private final String[] sourceNodes; - private final CouchbaseCore core; - private final GenericRecord record; - private final ArrayBlockingQueue messages; - private AbstractFeedDataFlowController controller; - private Thread pushThread; - private boolean done = false; - - public KVReader(String feedName, String bucket, String password, String[] sourceNodes, short[] vbuckets, - int queueSize, CouchbaseCore core) throws HyracksDataException { - this.feedName = feedName; - this.bucket = bucket; - this.password = password; - this.sourceNodes = sourceNodes; - this.vbuckets = vbuckets; - this.messages = new ArrayBlockingQueue(queueSize); - this.core = core; - this.record = new GenericRecord<>(); - this.pushThread = new Thread(new Runnable() { - @Override - public void run() { - KVReader.this.run(); - } - }, feedName); - pushThread.start(); - } - - @Override - public void close() { - if (!done) { - done = true; - } - } - - private void run() { - core.send(new SeedNodesRequest(sourceNodes)).timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT) - .toBlocking().single(); - core.send(new OpenBucketRequest(bucket, password)).timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT) - .toBlocking().single(); - DCPConnection connection = core. send(new OpenConnectionRequest(feedName, bucket)) - .toBlocking().single().connection(); - for (int i = 0; i < vbuckets.length; i++) { - connection.addStream(vbuckets[i]).toBlocking().single(); - } - try { - connection.subject().toBlocking().subscribe(new Subscriber() { - @Override - public void onCompleted() { - } - - @Override - public void onError(Throwable e) { - e.printStackTrace(); - } - - @Override - public void onNext(DCPRequest dcpRequest) { - try { - if (dcpRequest instanceof SnapshotMarkerMessage) { - SnapshotMarkerMessage message = (SnapshotMarkerMessage) dcpRequest; - LOGGER.info("snapshot DCP message received: " + message); - } else if ((dcpRequest instanceof MutationMessage) || (dcpRequest instanceof RemoveMessage)) { - messages.put(dcpRequest); - } else { - LOGGER.warn("Unknown type of DCP messages: " + dcpRequest); - } - } catch (Throwable th) { - LOGGER.error(th); - } - } - }); - } catch (Throwable th) { - th.printStackTrace(); - throw th; - } - } - - @Override - public boolean hasNext() throws Exception { - return !done; - } - - @Override - public IRawRecord next() throws IOException, InterruptedException { - if (messages.isEmpty()) { - controller.flush(); - } - DCPRequest dcpRequest = messages.take(); - if (dcpRequest == POISON_PILL) { - return null; - } - record.set(dcpRequest); - return record; - } - - @Override - public boolean stop() { - done = true; - core.send(new CloseBucketRequest(bucket)).toBlocking(); - pushThread.interrupt(); - try { - messages.put(KVReader.POISON_PILL); - } catch (InterruptedException e) { - LOGGER.warn(e); - return false; - } - return true; - } - - @Override - public void setController(AbstractFeedDataFlowController controller) { - this.controller = controller; - } - - @Override - public void setFeedLogManager(FeedLogManager feedLogManager) { - } - - @Override - public boolean handleException(Throwable th) { - return false; - } -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java deleted file mode 100644 index 70d53f6..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.input.record.reader.kv; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.external.api.IRecordReader; -import org.apache.asterix.external.api.IRecordReaderFactory; -import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.external.util.ExternalDataUtils; -import org.apache.asterix.om.util.AsterixClusterProperties; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -import com.couchbase.client.core.CouchbaseCore; -import com.couchbase.client.core.config.CouchbaseBucketConfig; -import com.couchbase.client.core.env.DefaultCoreEnvironment; -import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder; -import com.couchbase.client.core.message.cluster.CloseBucketRequest; -import com.couchbase.client.core.message.cluster.GetClusterConfigRequest; -import com.couchbase.client.core.message.cluster.GetClusterConfigResponse; -import com.couchbase.client.core.message.cluster.OpenBucketRequest; -import com.couchbase.client.core.message.cluster.SeedNodesRequest; -import com.couchbase.client.core.message.dcp.DCPRequest; - -import rx.functions.Func1; - -public class KVReaderFactory implements IRecordReaderFactory { - - private static final long serialVersionUID = 1L; - // Constant fields - public static final boolean DCP_ENABLED = true; - public static final long AUTO_RELEASE_AFTER_MILLISECONDS = 5000L; - public static final int TIMEOUT = 5; - public static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; - // Dynamic fields - private Map configuration; - private String bucket; - private String password = ""; - private String[] couchbaseNodes; - private int numOfVBuckets; - private int[] schedule; - private String feedName; - // Transient fields - private static transient CouchbaseCore core; - private transient Builder builder; - private static transient DefaultCoreEnvironment env; - private transient AlgebricksAbsolutePartitionConstraint locationConstraints; - - @Override - public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { - if (locationConstraints == null) { - String[] allPartitions = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations(); - Set ncs = new HashSet(Arrays.asList(allPartitions)); - locationConstraints = new AlgebricksAbsolutePartitionConstraint(ncs.toArray(new String[ncs.size()])); - } - return locationConstraints; - } - - @Override - public void configure(Map configuration) throws AsterixException { - // validate first - if (!configuration.containsKey(ExternalDataConstants.KEY_BUCKET)) { - throw new AsterixException("Unspecified bucket"); - } - if (!configuration.containsKey(ExternalDataConstants.KEY_NODES)) { - throw new AsterixException("Unspecified Couchbase nodes"); - } - if (configuration.containsKey(ExternalDataConstants.KEY_PASSWORD)) { - password = configuration.get(ExternalDataConstants.KEY_PASSWORD); - } - this.configuration = configuration; - ExternalDataUtils.setNumberOfKeys(configuration, 1); - ExternalDataUtils.setChangeFeed(configuration, ExternalDataConstants.TRUE); - ExternalDataUtils.setRecordWithMeta(configuration, ExternalDataConstants.TRUE); - bucket = configuration.get(ExternalDataConstants.KEY_BUCKET); - couchbaseNodes = configuration.get(ExternalDataConstants.KEY_NODES).split(","); - feedName = configuration.get(ExternalDataConstants.KEY_FEED_NAME); - createEnvironment("CC"); - getNumberOfVbuckets(); - schedule(); - } - - private void createEnvironment(String connectionName) { - synchronized (TIME_UNIT) { - if (core == null) { - builder = DefaultCoreEnvironment.builder().dcpEnabled(DCP_ENABLED).dcpConnectionName(connectionName) - .autoreleaseAfter(AUTO_RELEASE_AFTER_MILLISECONDS); - env = builder.build(); - core = new CouchbaseCore(env); - } - } - } - - /* - * We distribute the work of streaming vbuckets between all the partitions in a round robin - * fashion. - */ - private void schedule() { - schedule = new int[numOfVBuckets]; - String[] locations = getPartitionConstraint().getLocations(); - for (int i = 0; i < numOfVBuckets; i++) { - schedule[i] = i % locations.length; - } - } - - private void getNumberOfVbuckets() { - core.send(new SeedNodesRequest(couchbaseNodes)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single(); - core.send(new OpenBucketRequest(bucket, password)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single(); - numOfVBuckets = core. send(new GetClusterConfigRequest()) - .map(new Func1() { - @Override - public Integer call(GetClusterConfigResponse response) { - CouchbaseBucketConfig config = (CouchbaseBucketConfig) response.config().bucketConfig(bucket); - return config.numberOfPartitions(); - } - }).timeout(TIMEOUT, TIME_UNIT).toBlocking().single(); - core.send(new CloseBucketRequest(bucket)).toBlocking(); - } - - @Override - public IRecordReader createRecordReader(IHyracksTaskContext ctx, int partition) - throws HyracksDataException { - String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId(); - createEnvironment(nodeName); - ArrayList listOfAssignedVBuckets = new ArrayList(); - for (int i = 0; i < schedule.length; i++) { - if (schedule[i] == partition) { - listOfAssignedVBuckets.add((short) i); - } - } - short[] vbuckets = new short[listOfAssignedVBuckets.size()]; - for (int i = 0; i < vbuckets.length; i++) { - vbuckets[i] = listOfAssignedVBuckets.get(i); - } - return new KVReader(feedName + ":" + nodeName + ":" + partition, bucket, password, couchbaseNodes, vbuckets, - ExternalDataUtils.getQueueSize(configuration), core); - } - - @Override - public Class getRecordClass() { - return DCPRequest.class; - } -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java index 1dd2fe8..f71e9a0 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java @@ -27,7 +27,6 @@ import org.apache.asterix.external.api.IRecordReaderFactory; import org.apache.asterix.external.input.HDFSDataSourceFactory; import org.apache.asterix.external.input.record.reader.RecordWithPKTestReaderFactory; -import org.apache.asterix.external.input.record.reader.kv.KVReaderFactory; import org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory; import org.apache.asterix.external.input.record.reader.rss.RSSRecordReaderFactory; import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory; @@ -91,8 +90,6 @@ return ExternalDataUtils.createExternalRecordReaderFactory(configuration); } switch (reader) { - case ExternalDataConstants.READER_KV: - return new KVReaderFactory(); case ExternalDataConstants.READER_KV_TEST: return new KVTestReaderFactory(); case ExternalDataConstants.READER_HDFS: diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index cf2b7f3..81f8377 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -120,7 +120,6 @@ * Builtin record readers */ public static final String READER_HDFS = "hdfs"; - public static final String READER_KV = "key-value"; public static final String READER_TWITTER_PUSH = "twitter_push"; public static final String READER_PUSH_TWITTER = "push_twitter"; public static final String READER_TWITTER_PULL = "twitter_pull"; -- To view, visit https://asterix-gerrit.ics.uci.edu/868 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Iaa7d8d70b0869242d1a872f55f0c6928fda94dcb Gerrit-PatchSet: 2 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi Gerrit-Reviewer: Jenkins Gerrit-Reviewer: Michael Blow Gerrit-Reviewer: abdullah alamoudi