Return-Path: X-Original-To: apmail-kylin-commits-archive@minotaur.apache.org Delivered-To: apmail-kylin-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0808818F80 for ; Thu, 28 May 2015 08:56:55 +0000 (UTC) Received: (qmail 53730 invoked by uid 500); 28 May 2015 08:56:54 -0000 Delivered-To: apmail-kylin-commits-archive@kylin.apache.org Received: (qmail 53701 invoked by uid 500); 28 May 2015 08:56:54 -0000 Mailing-List: contact commits-help@kylin.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kylin.incubator.apache.org Delivered-To: mailing list commits@kylin.incubator.apache.org Received: (qmail 53692 invoked by uid 99); 28 May 2015 08:56:54 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 May 2015 08:56:54 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 7337D182202 for ; Thu, 28 May 2015 08:56:54 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.77 X-Spam-Level: * X-Spam-Status: No, score=1.77 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id hGGxDWuiqxxB for ; Thu, 28 May 2015 08:56:53 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id EB7F1201E9 for ; Thu, 28 May 2015 08:56:52 +0000 (UTC) Received: (qmail 52193 invoked by uid 99); 28 May 2015 08:54:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 May 2015 08:54:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 83C2EDFFAA; Thu, 28 May 2015 08:54:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mahongbin@apache.org To: commits@kylin.incubator.apache.org Message-Id: <1583f12a8674409e91de708a2c5fcc5c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-kylin git commit: streaming cubing: half way Date: Thu, 28 May 2015 08:54:21 +0000 (UTC) Repository: incubator-kylin Updated Branches: refs/heads/streaming-cubing 399176bf2 -> 6eb0cfc54 streaming cubing: half way Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6eb0cfc5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6eb0cfc5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6eb0cfc5 Branch: refs/heads/streaming-cubing Commit: 6eb0cfc54e93442e37b20461ccb1189db9a1f826 Parents: 399176b Author: honma Authored: Thu May 28 16:54:10 2015 +0800 Committer: honma Committed: Thu May 28 16:54:10 2015 +0800 ---------------------------------------------------------------------- .../kylin/job/streaming/StreamingBootstrap.java | 27 +++++++++++++++----- .../org/apache/kylin/streaming/KafkaConfig.java | 12 ++++++++- 2 files changed, 32 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6eb0cfc5/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java index 534a261..95be7fd 100644 --- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java +++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java @@ -64,14 +64,12 @@ public class StreamingBootstrap { private KylinConfig kylinConfig; private StreamingManager streamingManager; - private IIManager iiManager; private Map kafkaConsumers = Maps.newConcurrentMap(); private StreamingBootstrap(KylinConfig kylinConfig) { this.kylinConfig = kylinConfig; this.streamingManager = StreamingManager.getInstance(kylinConfig); - this.iiManager = IIManager.getInstance(kylinConfig); } public static StreamingBootstrap getInstance(KylinConfig kylinConfig) { @@ -114,8 +112,24 @@ public class StreamingBootstrap { public void start(String streaming, int partitionId) throws Exception { final KafkaConfig kafkaConfig = streamingManager.getKafkaConfig(streaming); Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streaming); - final IIInstance ii = iiManager.getII(kafkaConfig.getIiName()); + + if (!StringUtils.isEmpty(kafkaConfig.getIiName())) { + startIIStreaming(kafkaConfig, partitionId); + } else if (!StringUtils.isEmpty(kafkaConfig.getCubeName())) { + startIIStreaming(kafkaConfig, partitionId); + } else { + throw new IllegalArgumentException("no cube or ii in kafka config"); + } + } + + private void startCubeStreaming(KafkaConfig kafkaConfig) { + + } + + private void startIIStreaming(KafkaConfig kafkaConfig, int partitionId) throws Exception { + final IIInstance ii = IIManager.getInstance(this.kylinConfig).getII(kafkaConfig.getIiName()); Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName()); + final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaConfig).getPartitionIds().size(); Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId); Preconditions.checkArgument(ii.getSegments().size() > 0); @@ -128,7 +142,8 @@ public class StreamingBootstrap { final int parallelism = shard / partitionCount; final int startShard = partitionId * parallelism; final int endShard = startShard + parallelism; - long streamingOffset = getEarliestStreamingOffset(streaming, startShard, endShard); + + long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), startShard, endShard); streamingOffset = streamingOffset - (streamingOffset % parallelism); logger.info("offset from ii desc is " + streamingOffset); final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig); @@ -142,7 +157,7 @@ public class StreamingBootstrap { } KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, parallelism); - kafkaConsumers.put(getKey(streaming, partitionId), consumer); + kafkaConsumers.put(getKey(kafkaConfig.getName(), partitionId), consumer); StreamParser parser; if (!StringUtils.isEmpty(kafkaConfig.getParserName())) { @@ -155,7 +170,7 @@ public class StreamingBootstrap { Executors.newSingleThreadExecutor().submit(consumer); final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism); for (int i = startShard; i < endShard; ++i) { - final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), streaming, iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i); + final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), kafkaConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i); task.setStreamParser(parser); if (i == endShard - 1) { streamingBuilderPool.submit(task).get(); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6eb0cfc5/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java index 9949c96..b6f5025 100644 --- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java +++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java @@ -75,6 +75,9 @@ public class KafkaConfig extends RootPersistentEntity { @JsonProperty("iiName") private String iiName; + @JsonProperty("cubeName") + private String cubeName; + @JsonProperty("parserName") private String parserName; @@ -86,7 +89,6 @@ public class KafkaConfig extends RootPersistentEntity { this.parserName = parserName; } - public int getTimeout() { return timeout; } @@ -133,6 +135,14 @@ public class KafkaConfig extends RootPersistentEntity { }); } + public String getCubeName() { + return cubeName; + } + + public void setCubeName(String cubeName) { + this.cubeName = cubeName; + } + public String getIiName() { return iiName; }