kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shaofeng...@apache.org
Subject kylin git commit: KYLIN-2122 Move the partition offset calculation before submitting job
Date Wed, 02 Nov 2016 02:38:53 GMT
Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2122 [created] 3cd68478b


KYLIN-2122 Move the partition offset calculation before submitting job


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3cd68478
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3cd68478
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3cd68478

Branch: refs/heads/KYLIN-2122
Commit: 3cd68478ba24b2e479a65a3c7b13cdc14b9a91c3
Parents: 8172d0b
Author: shaofengshi <shaofengshi@apache.org>
Authored: Tue Oct 25 11:54:22 2016 +0800
Committer: shaofengshi <shaofengshi@apache.org>
Committed: Wed Nov 2 10:38:24 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/cube/CubeInstance.java     |   9 ++
 .../java/org/apache/kylin/cube/CubeManager.java | 103 +++++------------
 .../org/apache/kylin/cube/CubeManagerTest.java  |   2 +-
 .../org/apache/kylin/cube/CubeSegmentsTest.java |   2 +-
 .../kylin/job/constant/ExecutableConstants.java |   1 +
 .../java/org/apache/kylin/source/ISource.java   |  13 ++-
 .../org/apache/kylin/source/SourceFactory.java  |   2 +-
 .../apache/kylin/source/SourcePartition.java    | 103 +++++++++++++++++
 .../kylin/engine/mr/JobBuilderSupport.java      |   2 +
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java  |  67 +++++++++++
 .../kylin/provision/BuildCubeWithEngine.java    |   8 +-
 .../kylin/provision/BuildCubeWithStream.java    |  17 ++-
 .../apache/kylin/rest/service/JobService.java   |  14 ++-
 .../apache/kylin/source/hive/HiveSource.java    |  32 +++++-
 .../apache/kylin/source/kafka/KafkaMRInput.java |  90 ++++++++++-----
 .../apache/kylin/source/kafka/KafkaSource.java  | 105 ++++++++++++++++-
 .../kylin/source/kafka/config/KafkaConfig.java  |   3 +
 .../source/kafka/hadoop/KafkaInputFormat.java   |  41 +++----
 .../kylin/source/kafka/job/SeekOffsetStep.java  |  17 ++-
 .../source/kafka/job/UpdateTimeRangeStep.java   | 112 +++++++++----------
 .../kylin/source/kafka/util/KafkaClient.java    |  22 +++-
 21 files changed, 536 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 720690d..7222457 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -439,6 +439,15 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         return this.getDescriptor().getAutoMergeTimeRanges() != null && this.getDescriptor().getAutoMergeTimeRanges().length > 0;
     }
 
+    public CubeSegment getLastSegment() {
+        List<CubeSegment> existing = getSegments();
+        if (existing.isEmpty()) {
+            return null;
+        } else {
+            return existing.get(existing.size() - 1);
+        }
+    }
+
     @Override
     public int getSourceType() {
         return getFactTableDesc().getSourceType();

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index a53849e..16b468f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -34,8 +34,6 @@ import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
@@ -68,9 +66,11 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.source.ReadableTable;
 import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourcePartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
@@ -434,52 +434,20 @@ public class CubeManager implements IRealizationProvider {
 
     // append a full build segment
     public CubeSegment appendSegment(CubeInstance cube) throws IOException {
-        return appendSegment(cube, 0, 0, 0, 0, null, null);
+        return appendSegment(cube, 0, Long.MAX_VALUE, 0, 0, null, null);
     }
 
     public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate) throws IOException {
         return appendSegment(cube, startDate, endDate, 0, 0, null, null);
     }
 
+    public CubeSegment appendSegment(CubeInstance cube, SourcePartition sourcePartition) throws IOException {
+        return appendSegment(cube, sourcePartition.getStartDate(), sourcePartition.getEndDate(), sourcePartition.getStartOffset(), sourcePartition.getEndOffset(), sourcePartition.getSourcePartitionOffsetStart(), sourcePartition.getSourcePartitionOffsetEnd());
+    }
+
     public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) throws IOException {
         checkBuildingSegment(cube);
 
-        if (sourcePartitionOffsetStart == null) {
-            sourcePartitionOffsetStart = Maps.newHashMap();
-        }
-        if (sourcePartitionOffsetEnd == null) {
-            sourcePartitionOffsetEnd = Maps.newHashMap();
-        }
-
-        boolean isOffsetsOn = endOffset != 0;
-        if (isOffsetsOn == true) {
-            checkSourceOffsets(startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
-        }
-
-        if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned()) {
-            // try figure out a reasonable start if missing
-            if (startDate == 0 && startOffset == 0) {
-                final CubeSegment last = getLatestSegment(cube);
-                if (last != null) {
-                    if (isOffsetsOn) {
-                        if (last.getSourceOffsetEnd() == Long.MAX_VALUE) {
-                            throw new IllegalStateException("There is already one pending for building segment, please submit request later.");
-                        }
-                        startOffset = last.getSourceOffsetEnd();
-                        sourcePartitionOffsetStart = last.getSourcePartitionOffsetEnd();
-                    } else {
-                        startDate = last.getDateRangeEnd();
-                    }
-                }
-            }
-
-        } else {
-            startDate = 0;
-            endDate = Long.MAX_VALUE;
-            startOffset = 0;
-            endOffset = 0;
-        }
-
         CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset);
         newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart);
         newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd);
@@ -638,7 +606,7 @@ public class CubeManager implements IRealizationProvider {
         return max;
     }
 
-    private CubeSegment getLatestSegment(CubeInstance cube) {
+    public CubeSegment getLatestSegment(CubeInstance cube) {
         List<CubeSegment> existing = cube.getSegments();
         if (existing.isEmpty()) {
             return null;
@@ -647,49 +615,28 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
-    private void checkBuildingSegment(CubeInstance cube) {
-        int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments();
-        if (cube.getBuildingSegments().size() >= maxBuldingSeg) {
-            throw new IllegalStateException("There is already " + cube.getBuildingSegments().size() + " building segment; ");
+    private long calculateStartOffsetForAppendSegment(CubeInstance cube) {
+        List<CubeSegment> existing = cube.getSegments();
+        if (existing.isEmpty()) {
+            return 0;
+        } else {
+            return existing.get(existing.size() - 1).getSourceOffsetEnd();
         }
     }
 
-    private void checkSourceOffsets(long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) {
-        if (endOffset <= 0)
-            return;
-
-        if (startOffset >= endOffset) {
-            throw new IllegalArgumentException("'startOffset' need be smaller than 'endOffset'");
-        }
-
-        if (startOffset > 0) {
-            if (sourcePartitionOffsetStart == null || sourcePartitionOffsetStart.size() == 0) {
-                throw new IllegalArgumentException("When 'startOffset' is > 0, need provide each partition's start offset");
-            }
-
-            long totalOffset = 0;
-            for (Long v : sourcePartitionOffsetStart.values()) {
-                totalOffset += v;
-            }
-
-            if (totalOffset != startOffset) {
-                throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
-            }
+    private long calculateStartDateForAppendSegment(CubeInstance cube) {
+        List<CubeSegment> existing = cube.getSegments();
+        if (existing.isEmpty()) {
+            return cube.getDescriptor().getPartitionDateStart();
+        } else {
+            return existing.get(existing.size() - 1).getDateRangeEnd();
         }
+    }
 
-        if (endOffset > 0 && endOffset != Long.MAX_VALUE) {
-            if (sourcePartitionOffsetEnd == null || sourcePartitionOffsetEnd.size() == 0) {
-                throw new IllegalArgumentException("When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset");
-            }
-
-            long totalOffset = 0;
-            for (Long v : sourcePartitionOffsetEnd.values()) {
-                totalOffset += v;
-            }
-
-            if (totalOffset != endOffset) {
-                throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
-            }
+    private void checkBuildingSegment(CubeInstance cube) {
+        int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments();
+        if (cube.getBuildingSegments().size() >= maxBuldingSeg) {
+            throw new IllegalStateException("There is already " + cube.getBuildingSegments().size() + " building segment; ");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
index bb90d29..2904eb2 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
@@ -111,7 +111,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
         CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000, 0, 0, null, null);
         seg1.setStatus(SegmentStatusEnum.READY);
 
-        CubeSegment seg2 = mgr.appendSegment(cube, 0, 2000, 0, 0, null, null);
+        CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000, 0, 0, null, null);
         seg2.setStatus(SegmentStatusEnum.READY);
 
         CubeUpdate cubeBuilder = new CubeUpdate(cube);

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
index 828a3a9..a5bd821 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
@@ -110,7 +110,7 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase {
         seg1.setStatus(SegmentStatusEnum.READY);
 
         // append second
-        CubeSegment seg2 = mgr.appendSegment(cube, 0, 2000);
+        CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000);
 
         assertEquals(2, cube.getSegments().size());
         assertEquals(1000, seg2.getDateRangeStart());

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index ad0b1b1..93defa1 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -52,6 +52,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
     public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
     public static final String STEP_NAME_HIVE_CLEANUP = "Hive Cleanup";
+    public static final String STEP_NAME_KAFKA_CLEANUP = "Kafka intermediate file cleanup";
     public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
     public static final String STEP_NAME_GARBAGE_COLLECTION_HDFS = "Garbage Collection on HDFS";
     public static final String STEP_NAME_BUILD_II = "Build Inverted Index";

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
index e9216f9..5bff8a7 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
@@ -18,15 +18,18 @@
 
 package org.apache.kylin.source;
 
-import org.apache.kylin.metadata.model.TableDesc;
-
 import java.util.List;
 
+import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.TableDesc;
+
 public interface ISource {
 
-    public <I> I adaptToBuildEngine(Class<I> engineInterface);
+    <I> I adaptToBuildEngine(Class<I> engineInterface);
+
+    ReadableTable createReadableTable(TableDesc tableDesc);
 
-    public ReadableTable createReadableTable(TableDesc tableDesc);
+    List<String> getMRDependentResources(TableDesc table);
 
-    public List<String> getMRDependentResources(TableDesc table);
+    SourcePartition parsePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition);
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
index e82c6ed..5ce9014 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
@@ -31,7 +31,7 @@ public class SourceFactory {
     private static ImplementationSwitch<ISource> sources;
     static {
         Map<Integer, String> impls = KylinConfig.getInstanceFromEnv().getSourceEngines();
-        sources = new ImplementationSwitch<ISource>(impls, ISource.class);
+        sources = new ImplementationSwitch<>(impls, ISource.class);
     }
 
     public static ISource tableSource(ISourceAware aware) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
new file mode 100644
index 0000000..8ba749d
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import java.util.Map;
+
+/**
+ * Range of source data to build into a cube segment: a date range, a total offset range and
+ * per-source-partition offset maps (the maps may be {@code null}).
+ */
+public class SourcePartition {
+    long startDate;
+    long endDate;
+    long startOffset;
+    long endOffset;
+    Map<Integer, Long> sourcePartitionOffsetStart;
+    Map<Integer, Long> sourcePartitionOffsetEnd;
+
+    public SourcePartition() {
+    }
+
+    public SourcePartition(long startDate, long endDate, long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) {
+        this.startDate = startDate;
+        this.endDate = endDate;
+        this.startOffset = startOffset;
+        this.endOffset = endOffset;
+        this.sourcePartitionOffsetStart = sourcePartitionOffsetStart;
+        this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd;
+    }
+
+    public long getStartDate() {
+        return startDate;
+    }
+
+    public void setStartDate(long startDate) {
+        this.startDate = startDate;
+    }
+
+    public long getEndDate() {
+        return endDate;
+    }
+
+    public void setEndDate(long endDate) {
+        this.endDate = endDate;
+    }
+
+    public long getStartOffset() {
+        return startOffset;
+    }
+
+    public void setStartOffset(long startOffset) {
+        this.startOffset = startOffset;
+    }
+
+    public long getEndOffset() {
+        return endOffset;
+    }
+
+    public void setEndOffset(long endOffset) {
+        this.endOffset = endOffset;
+    }
+
+    public Map<Integer, Long> getSourcePartitionOffsetStart() {
+        return sourcePartitionOffsetStart;
+    }
+
+    public void setSourcePartitionOffsetStart(Map<Integer, Long> sourcePartitionOffsetStart) {
+        this.sourcePartitionOffsetStart = sourcePartitionOffsetStart;
+    }
+
+    public Map<Integer, Long> getSourcePartitionOffsetEnd() {
+        return sourcePartitionOffsetEnd;
+    }
+
+    public void setSourcePartitionOffsetEnd(Map<Integer, Long> sourcePartitionOffsetEnd) {
+        this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd;
+    }
+
+    public static SourcePartition getCopyOf(SourcePartition origin) {
+        // Copy the offset maps too, so changing the copy's maps never leaks into the original.
+        Map<Integer, Long> offsetStart = origin.getSourcePartitionOffsetStart();
+        Map<Integer, Long> offsetEnd = origin.getSourcePartitionOffsetEnd();
+        return new SourcePartition(origin.getStartDate(), origin.getEndDate(), origin.getStartOffset(), origin.getEndOffset(),
+                offsetStart == null ? null : new java.util.HashMap<>(offsetStart),
+                offsetEnd == null ? null : new java.util.HashMap<>(offsetEnd));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 159e5cb..47eb9c3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -95,12 +95,14 @@ public class JobBuilderSupport {
     public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
         final UpdateCubeInfoAfterBuildStep result = new UpdateCubeInfoAfterBuildStep();
         result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+        result.getParams().put(BatchConstants.CFG_OUTPUT_PATH, getFactDistinctColumnsPath(jobId));
 
         CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
         CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
         CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
         CubingExecutableUtil.setIndexPath(this.getSecondaryIndexPath(jobId), result.getParams());
 
+
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index d6435b7..4e1be57 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -18,16 +18,29 @@
 
 package org.apache.kylin.engine.mr.steps;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +72,10 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
         segment.setInputRecordsSize(sourceSizeBytes);
 
         try {
+            if (segment.isSourceOffsetsOn()) {
+                updateTimeRange(segment);
+            }
+
             cubeManager.promoteNewlyBuiltSegments(cube, segment);
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } catch (IOException e) {
@@ -67,4 +84,54 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
         }
     }
 
+    private void updateTimeRange(CubeSegment segment) throws IOException {
+        final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
+        final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+        final Path outputFile = new Path(outputPath, partitionCol.getName());
+        // Min/max are taken from the file's first/last lines; assumes its values are in ascending order (TODO confirm).
+        String minValue = null, maxValue = null, currentValue = null;
+        FSDataInputStream inputStream = null;
+        BufferedReader bufferedReader = null;
+        try {
+            FileSystem fs = HadoopUtil.getFileSystem(outputPath);
+            inputStream = fs.open(outputFile);
+            bufferedReader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
+            minValue = currentValue = bufferedReader.readLine();
+            while (currentValue != null) {
+                maxValue = currentValue;
+                currentValue = bufferedReader.readLine();
+            }
+        } finally {
+            IOUtils.closeQuietly(bufferedReader);
+            IOUtils.closeQuietly(inputStream);
+        }
+        if (minValue == null) {
+            throw new IllegalStateException("No value of partition column found in " + outputFile + ", cannot decide the segment's time range");
+        }
+        final DataType partitionColType = partitionCol.getType();
+        FastDateFormat dateFormat;
+        if (partitionColType.isDate()) {
+            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
+        } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
+            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+        } else if (partitionColType.isStringFamily()) {
+            String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
+            if (StringUtils.isEmpty(partitionDateFormat)) {
+                partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
+            }
+            dateFormat = DateFormat.getDateFormat(partitionDateFormat);
+        } else {
+            throw new IllegalStateException("Type " + partitionColType + " is not valid partition column type");
+        }
+
+        try {
+            long startTime = dateFormat.parse(minValue).getTime();
+            long endTime = dateFormat.parse(maxValue).getTime();
+            segment.setDateRangeStart(startTime);
+            segment.setDateRangeEnd(endTime);
+        } catch (java.text.ParseException e) {
+            throw new IllegalStateException("Failed to parse '" + minValue + "' or '" + maxValue + "' with pattern " + dateFormat.getPattern(), e);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index f6c8801..180d8d9 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -55,6 +55,9 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.source.ISource;
+import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
 import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
 import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
@@ -404,7 +407,10 @@ public class BuildCubeWithEngine {
     }
 
     private String buildSegment(String cubeName, long startDate, long endDate) throws Exception {
-        CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, endDate);
+        CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+        ISource source = SourceFactory.tableSource(cubeInstance);
+        SourcePartition partition = source.parsePartitionBeforeBuild(cubeInstance, new SourcePartition(0, endDate, 0, 0, null, null));
+        CubeSegment segment = cubeManager.appendSegment(cubeInstance, partition.getStartDate(), partition.getEndDate());
         DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);
         waitForJob(job.getId());

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 2faa8d0..000ac16 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.provision;
 
+import static java.lang.Thread.sleep;
+
 import java.io.File;
 import java.io.IOException;
 import java.text.ParseException;
@@ -32,7 +34,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.Lists;
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.common.requests.MetadataResponse;
@@ -44,8 +45,6 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.metadata.streaming.StreamingConfig;
-import org.apache.kylin.metadata.streaming.StreamingManager;
 import org.apache.kylin.job.DeployUtil;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -55,6 +54,11 @@ import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.job.streaming.Kafka10DataLoader;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingManager;
+import org.apache.kylin.source.ISource;
+import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.BrokerConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -64,7 +68,7 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.lang.Thread.sleep;
+import com.google.common.collect.Lists;
 
 /**
  *  for streaming cubing case "test_streaming_table"
@@ -253,7 +257,10 @@ public class BuildCubeWithStream {
     }
 
     protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception {
-        CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, null, null);
+        CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+        ISource source = SourceFactory.tableSource(cubeInstance);
+        SourcePartition partition = source.parsePartitionBeforeBuild(cubeInstance, new SourcePartition(0, 0, startOffset, endOffset, null, null));
+        CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), partition);
         DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);
         waitForJob(job.getId());

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index bc4d89c..4cbce1f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -51,6 +51,9 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.BadRequestException;
+import org.apache.kylin.source.ISource;
+import org.apache.kylin.source.SourceFactory;
+import org.apache.kylin.source.SourcePartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -209,8 +212,11 @@ public class JobService extends BasicService {
 
         DefaultChainedExecutable job;
 
+        ISource source = SourceFactory.tableSource(cube);
         if (buildType == CubeBuildTypeEnum.BUILD) {
-            CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
+            SourcePartition sourcePartition = new SourcePartition(startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
+            sourcePartition = source.parsePartitionBeforeBuild(cube, sourcePartition);
+            CubeSegment newSeg = getCubeManager().appendSegment(cube, sourcePartition);
             job = EngineFactory.createBatchCubingJob(newSeg, submitter);
         } else if (buildType == CubeBuildTypeEnum.MERGE) {
             CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force);
@@ -364,15 +370,11 @@ public class JobService extends BasicService {
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
     public JobInstance cancelJob(JobInstance job) throws IOException, JobException {
-        //        CubeInstance cube = this.getCubeManager().getCube(job.getRelatedCube());
-        //        for (BuildCubeJob cubeJob: listAllCubingJobs(cube.getName(), null, EnumSet.of(ExecutableState.READY, ExecutableState.RUNNING))) {
-        //            getExecutableManager().stopJob(cubeJob.getId());
-        //        }
         CubeInstance cubeInstance = getCubeManager().getCube(job.getRelatedCube());
         final String segmentIds = job.getRelatedSegment();
         for (String segmentId : StringUtils.split(segmentIds)) {
             final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
-            if (segment != null && segment.getStatus() == SegmentStatusEnum.NEW) {
+            if (segment != null && (segment.getStatus() == SegmentStatusEnum.NEW || segment.getDateRangeEnd() == 0)) {
                 // Remove this segments
                 CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
                 cubeBuilder.setToRemoveSegs(segment);

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index e9cebea..af0a519 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -18,13 +18,18 @@
 
 package org.apache.kylin.source.hive;
 
-import com.google.common.collect.Lists;
+import java.util.List;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ReadableTable;
 
-import java.util.List;
+import com.google.common.collect.Lists;
+import org.apache.kylin.source.SourcePartition;
 
 //used by reflection
 public class HiveSource implements ISource {
@@ -49,4 +54,39 @@ public class HiveSource implements ISource {
         return Lists.newArrayList();
     }
 
+    /**
+     * Resolves the date range of a new segment before its build job is submitted.
+     * <p>
+     * For a partitioned model, a start date of 0 means "continue from the end of the last segment" (it stays 0
+     * when there is no last segment). A non-partitioned model is always fully built, i.e. from 0 to
+     * Long.MAX_VALUE. Both offsets of the result are always reset to 0.
+     *
+     * @param buildable    the cube to build; cast to {@link CubeInstance}
+     * @param srcPartition the requested range; copied with SourcePartition.getCopyOf before any change
+     * @return the resolved partition
+     */
+    @Override
+    public SourcePartition parsePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) {
+        final SourcePartition result = SourcePartition.getCopyOf(srcPartition);
+        final CubeInstance cube = (CubeInstance) buildable;
+        if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned()) {
+            // normal partitioned cube: an unset start date continues from the last segment
+            if (result.getStartDate() == 0) {
+                final CubeSegment last = cube.getLastSegment();
+                if (last != null) {
+                    result.setStartDate(last.getDateRangeEnd());
+                }
+            }
+        } else {
+            // non-partitioned model: full build
+            result.setStartDate(0);
+            result.setEndDate(Long.MAX_VALUE);
+        }
+
+        // this source resolves segments by date range only, so offsets are cleared
+        result.setStartOffset(0);
+        result.setEndOffset(0);
+        return result;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index fb2a949..cdd7272 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -19,8 +19,12 @@ package org.apache.kylin.source.kafka;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.List;
 
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -30,26 +34,33 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
 import org.apache.kylin.source.kafka.job.MergeOffsetStep;
-import org.apache.kylin.source.kafka.job.SeekOffsetStep;
-import org.apache.kylin.source.kafka.job.UpdateTimeRangeStep;
 
+import com.google.common.base.Function;
 import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class KafkaMRInput implements IMRInput {
 
@@ -57,7 +68,7 @@ public class KafkaMRInput implements IMRInput {
 
     @Override
     public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        this.cubeSegment = (CubeSegment) flatDesc.getSegment();
+        this.cubeSegment = (CubeSegment)flatDesc.getSegment();
         return new BatchCubingInputSide(cubeSegment);
     }
 
@@ -65,8 +76,14 @@ public class KafkaMRInput implements IMRInput {
     public IMRTableInputFormat getTableInputFormat(TableDesc table) {
         KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
         KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(table.getIdentity());
-        TableRef tableRef = cubeSegment.getCubeInstance().getDataModelDesc().findTable(table.getIdentity());
-        List<TblColRef> columns = Lists.newArrayList(tableRef.getColumns());
+        // copy the lazy Lists.transform() view so getRef() runs once per column, like the former ArrayList
+        List<TblColRef> columns = Lists.newArrayList(Lists.transform(Arrays.asList(table.getColumns()), new Function<ColumnDesc, TblColRef>() {
+            @Nullable
+            @Override
+            public TblColRef apply(ColumnDesc input) {
+                return input.getRef();
+            }
+        }));
         return new KafkaTableInputFormat(cubeSegment, columns, kafkaConfig, null);
     }
 
@@ -77,15 +94,11 @@ public class KafkaMRInput implements IMRInput {
 
     public static class KafkaTableInputFormat implements IMRTableInputFormat {
         private final CubeSegment cubeSegment;
-        private List<TblColRef> columns;
         private StreamingParser streamingParser;
-        private KafkaConfig kafkaConfig;
         private final JobEngineConfig conf;
 
         public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) {
             this.cubeSegment = cubeSegment;
-            this.columns = columns;
-            this.kafkaConfig = kafkaConfig;
             this.conf = conf;
             try {
                 streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
@@ -131,21 +144,9 @@ public class KafkaMRInput implements IMRInput {
 
         @Override
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
-            jobFlow.addTask(createUpdateSegmentOffsetStep(jobFlow.getId()));
             jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId()));
         }
 
-        public SeekOffsetStep createUpdateSegmentOffsetStep(String jobId) {
-            final SeekOffsetStep result = new SeekOffsetStep();
-            result.setName("Seek and update offset step");
-
-            CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
-            CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
-            CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
-
-            return result;
-        }
-
         private MapReduceExecutable createSaveKafkaDataStep(String jobId) {
             MapReduceExecutable result = new MapReduceExecutable();
 
@@ -167,14 +168,10 @@ public class KafkaMRInput implements IMRInput {
 
         @Override
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            final UpdateTimeRangeStep result = new UpdateTimeRangeStep();
-            result.setName("Update Segment Time Range");
-            CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
-            CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
-            CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams());
-            JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "SYSTEM");
-            result.getParams().put(BatchConstants.CFG_OUTPUT_PATH, jobBuilderSupport.getFactDistinctColumnsPath(jobFlow.getId()));
-            jobFlow.addTask(result);
+            GarbageCollectionStep step = new GarbageCollectionStep();
+            step.setName(ExecutableConstants.STEP_NAME_KAFKA_CLEANUP);
+            step.setDataPath(outputPath);
+            jobFlow.addTask(step);
 
         }
 
@@ -211,4 +208,45 @@ public class KafkaMRInput implements IMRInput {
         }
     }
 
+    /**
+     * Cleanup step of a Kafka cubing job: recursively deletes the directory given by {@link #setDataPath}.
+     */
+    static class GarbageCollectionStep extends AbstractExecutable {
+        private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
+
+        @Override
+        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+            final String dataPath = getDataPath();
+            if (dataPath == null || dataPath.trim().isEmpty()) {
+                // Hadoop's Path rejects null/empty strings with an unchecked exception; there is nothing to delete
+                logger.warn("job:" + getId() + " has no data path to clean up");
+                return new ExecuteResult(ExecuteResult.State.SUCCEED, "");
+            }
+            try {
+                rmdirOnHDFS(dataPath);
+            } catch (IOException e) {
+                logger.error("job:" + getId() + " execute finished with exception", e);
+                return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
+            }
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "");
+        }
+
+        /** Recursively deletes {@code path} if it exists; a missing path is not an error. */
+        private void rmdirOnHDFS(String path) throws IOException {
+            final Path externalDataPath = new Path(path);
+            final FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
+            if (fs.exists(externalDataPath)) {
+                fs.delete(externalDataPath, true);
+            }
+        }
+
+        /** Sets the directory that this step deletes. */
+        public void setDataPath(String externalDataPath) {
+            setParam("dataPath", externalDataPath);
+        }
+
+        private String getDataPath() {
+            return getParam("dataPath");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 208c0ce..bb676e6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -18,15 +18,24 @@
 
 package org.apache.kylin.source.kafka;
 
-import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.SourcePartition;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.util.KafkaClient;
 
-import java.util.List;
+import com.google.common.collect.Lists;
 
 //used by reflection
 public class KafkaSource implements ISource {
@@ -54,4 +63,114 @@ public class KafkaSource implements ISource {
         return dependentResources;
     }
 
+    /**
+     * Resolves the Kafka offsets of a new segment before its build job is submitted.
+     * <p>
+     * A start offset of 0 means "continue": from the last segment's end offsets if there is one, else from the
+     * start offsets configured on the cube descriptor, else from the earliest offsets of the topic. Partitions of
+     * the topic that have no start offset yet start from their earliest offset. An end offset of Long.MAX_VALUE
+     * means "up to the current offsets". The total start/end offsets of the result are the sums of the
+     * per-partition offsets.
+     *
+     * @param buildable    the cube to build; cast to {@link CubeInstance}
+     * @param srcPartition the requested range; validated, then copied with SourcePartition.getCopyOf
+     * @return the resolved partition
+     * @throws IllegalArgumentException if the requested offsets are inconsistent
+     */
+    @Override
+    public SourcePartition parsePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) {
+        checkSourceOffsets(srcPartition);
+        final SourcePartition result = SourcePartition.getCopyOf(srcPartition);
+        final CubeInstance cube = (CubeInstance) buildable;
+        if (result.getStartOffset() == 0) {
+            final CubeSegment last = cube.getLastSegment();
+            if (last != null) {
+                // from last seg's end position
+                result.setSourcePartitionOffsetStart(last.getSourcePartitionOffsetEnd());
+            } else if (cube.getDescriptor().getPartitionOffsetStart() != null && cube.getDescriptor().getPartitionOffsetStart().size() > 0) {
+                result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart());
+            } else {
+                // from the topic's very beginning
+                result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube));
+            }
+        }
+
+        // Work on a private copy: the map above may be owned by the last segment or by the cube descriptor,
+        // and offsets of newly found partitions are put into it below.
+        final Map<Integer, Long> startOffsets = new java.util.HashMap<Integer, Long>(result.getSourcePartitionOffsetStart());
+        result.setSourcePartitionOffsetStart(startOffsets);
+
+        final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(cube.getConfig()).getKafkaConfig(cube.getFactTable());
+        final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+        final String topic = kafkaConfig.getTopic();
+        try (final KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
+            // look partitions up by id: the Kafka API does not promise any order for partitionsFor()
+            for (PartitionInfo partitionInfo : consumer.partitionsFor(topic)) {
+                final int partition = partitionInfo.partition();
+                if (!startOffsets.containsKey(partition)) {
+                    // partition not covered yet (e.g. added to the topic later): read it from its earliest offset
+                    startOffsets.put(partition, KafkaClient.getEarliestOffset(consumer, topic, partition));
+                }
+            }
+        }
+
+        if (result.getEndOffset() == Long.MAX_VALUE) {
+            result.setSourcePartitionOffsetEnd(KafkaClient.getCurrentOffsets(cube));
+        }
+
+        result.setStartOffset(sumOffsets(startOffsets));
+        result.setEndOffset(sumOffsets(result.getSourcePartitionOffsetEnd()));
+        return result;
+    }
+
+    /**
+     * Validates the offsets requested by the caller.
+     * <p>
+     * The start offset must not be negative and must be smaller than the end offset, which must be positive. A
+     * positive start offset needs per-partition start offsets summing to it; an end offset other than
+     * Long.MAX_VALUE needs per-partition end offsets summing to it.
+     *
+     * @throws IllegalArgumentException if any of these rules is violated
+     */
+    private void checkSourceOffsets(SourcePartition srcPartition) {
+        final long startOffset = srcPartition.getStartOffset();
+        final long endOffset = srcPartition.getEndOffset();
+        final Map<Integer, Long> sourcePartitionOffsetStart = srcPartition.getSourcePartitionOffsetStart();
+        final Map<Integer, Long> sourcePartitionOffsetEnd = srcPartition.getSourcePartitionOffsetEnd();
+        if (startOffset < 0) {
+            // a negative start would skip both the per-partition check and the start-offset lookup
+            throw new IllegalArgumentException("'startOffset' can not be negative");
+        }
+        if (endOffset <= 0 || startOffset >= endOffset) {
+            throw new IllegalArgumentException("'startOffset' need be smaller than 'endOffset'");
+        }
+
+        if (startOffset > 0) {
+            if (sourcePartitionOffsetStart == null || sourcePartitionOffsetStart.isEmpty()) {
+                throw new IllegalArgumentException("When 'startOffset' is > 0, need provide each partition's start offset");
+            }
+            if (sumOffsets(sourcePartitionOffsetStart) != startOffset) {
+                throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
+            }
+        }
+
+        if (endOffset > 0 && endOffset != Long.MAX_VALUE) {
+            if (sourcePartitionOffsetEnd == null || sourcePartitionOffsetEnd.isEmpty()) {
+                throw new IllegalArgumentException("When 'endOffset' is not Long.MAX_VALUE, need provide each partition's end offset");
+            }
+            if (sumOffsets(sourcePartitionOffsetEnd) != endOffset) {
+                throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
+            }
+        }
+    }
+
+    /** Returns the sum of all per-partition offsets in {@code offsets}. */
+    private static long sumOffsets(Map<Integer, Long> offsets) {
+        long total = 0;
+        for (Long offset : offsets.values()) {
+            total += offset;
+        }
+        return total;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index c538acb..157d83c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -61,6 +61,7 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("parserName")
     private String parserName;
 
+    @Deprecated
     @JsonProperty("margin")
     private long margin;
 
@@ -120,10 +121,12 @@ public class KafkaConfig extends RootPersistentEntity {
         this.name = name;
     }
 
+    @Deprecated
     public long getMargin() {
         return margin;
     }
 
+    @Deprecated
     public void setMargin(long margin) {
         this.margin = margin;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
index 81f6bac..fe0e2cc 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java
@@ -23,9 +23,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -36,6 +33,10 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kylin.source.kafka.util.KafkaClient;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 
 /**
  * Convert Kafka topic to Hadoop InputFormat
@@ -45,16 +46,16 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
 
     @Override
     public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
-        Configuration conf = context.getConfiguration();
+        final Configuration conf = context.getConfiguration();
 
-        String brokers = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BROKERS);
-        String inputTopic = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TOPIC);
-        String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
-        Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN));
-        Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX));
+        final String brokers = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BROKERS);
+        final String inputTopic = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TOPIC);
+        final String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
+        final Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN));
+        final Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX));
 
-        Map<Integer, Long> startOffsetMap = Maps.newHashMap();
-        Map<Integer, Long> endOffsetMap = Maps.newHashMap();
+        final Map<Integer, Long> startOffsetMap = Maps.newHashMap();
+        final Map<Integer, Long> endOffsetMap = Maps.newHashMap();
         for (int i = partitionMin; i <= partitionMax; i++) {
             String start = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_START + i);
             String end = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_END + i);
@@ -64,23 +65,19 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
             }
         }
 
-        List<InputSplit> splits = new ArrayList<InputSplit>();
+        final List<InputSplit> splits = new ArrayList<InputSplit>();
         try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, null)) {
-            List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
+            final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
             Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), "partition number mismatch with server side");
             for (int i = 0; i < partitionInfos.size(); i++) {
-                PartitionInfo partition = partitionInfos.get(i);
+                final PartitionInfo partition = partitionInfos.get(i);
                 int partitionId = partition.partition();
                 if (startOffsetMap.containsKey(partitionId) == false) {
                     throw new IllegalStateException("Partition '" + partitionId + "' not exists.");
                 }
 
-                if (endOffsetMap.get(partitionId) >  startOffsetMap.get(partitionId)) {
-                    InputSplit split = new KafkaInputSplit(
-                            brokers, inputTopic,
-                            partitionId,
-                            startOffsetMap.get(partitionId), endOffsetMap.get(partitionId)
-                    );
+                if (endOffsetMap.get(partitionId) > startOffsetMap.get(partitionId)) {
+                    InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, startOffsetMap.get(partitionId), endOffsetMap.get(partitionId));
                     splits.add(split);
                 }
             }
@@ -89,9 +86,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
     }
 
     @Override
-    public RecordReader<LongWritable, BytesWritable> createRecordReader(
-            InputSplit arg0, TaskAttemptContext arg1) throws IOException,
-            InterruptedException {
+    public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
         return new KafkaInputRecordReader();
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
index 98d6e4d..eea0a4e 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
@@ -17,8 +17,10 @@
  */
 package org.apache.kylin.source.kafka.job;
 
-import org.apache.kylin.source.kafka.KafkaConfigManager;
-import org.apache.kylin.source.kafka.util.KafkaClient;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kylin.cube.CubeInstance;
@@ -30,15 +32,14 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
 /**
+ * Deprecated, not in use.
  */
 public class SeekOffsetStep extends AbstractExecutable {
 
@@ -62,7 +63,7 @@ public class SeekOffsetStep extends AbstractExecutable {
         }
 
         final Map<Integer, Long> cubeDescStart = cube.getDescriptor().getPartitionOffsetStart();
-        if (cube.getSegments().size() == 1 &&  cubeDescStart != null && cubeDescStart.size() > 0) {
+        if (cube.getSegments().size() == 1 && cubeDescStart != null && cubeDescStart.size() > 0) {
             logger.info("This is the first segment, and has initiated the start offsets, will use it");
             startOffsets = cubeDescStart;
         }
@@ -139,8 +140,6 @@ public class SeekOffsetStep extends AbstractExecutable {
 
             return new ExecuteResult(ExecuteResult.State.DISCARDED, "No new message comes");
         }
-
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
index d19aa63..feebd52 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
@@ -17,29 +17,16 @@
  */
 package org.apache.kylin.source.kafka.job;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,55 +50,56 @@ public class UpdateTimeRangeStep extends AbstractExecutable {
         final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
         final Path outputFile = new Path(outputPath, partitionCol.getName());
 
-        String minValue = null, maxValue = null, currentValue = null;
-        FSDataInputStream inputStream = null;
-        BufferedReader bufferedReader = null;
-        try {
-            FileSystem fs = HadoopUtil.getFileSystem(outputPath);
-            inputStream = fs.open(outputFile);
-            bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
-            minValue = currentValue = bufferedReader.readLine();
-            while (currentValue != null) {
-                maxValue = currentValue;
-                currentValue = bufferedReader.readLine();
-            }
-        } catch (IOException e) {
-            logger.error("fail to read file " + outputFile, e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        } finally {
-            IOUtils.closeQuietly(bufferedReader);
-            IOUtils.closeQuietly(inputStream);
-        }
-
-        final DataType partitionColType = partitionCol.getType();
-        FastDateFormat dateFormat;
-        if (partitionColType.isDate()) {
-            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
-        } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
-            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
-        } else if (partitionColType.isStringFamily()) {
-            String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
-            if (StringUtils.isEmpty(partitionDateFormat)) {
-                partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
-            }
-            dateFormat = DateFormat.getDateFormat(partitionDateFormat);
-        } else {
-            return new ExecuteResult(ExecuteResult.State.ERROR, "Type " + partitionColType + " is not valid partition column type");
-        }
-
-        try {
-            long startTime = dateFormat.parse(minValue).getTime();
-            long endTime = dateFormat.parse(maxValue).getTime();
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            segment.setDateRangeStart(startTime);
-            segment.setDateRangeEnd(endTime);
-            cubeBuilder.setToUpdateSegs(segment);
-            cubeManager.updateCube(cubeBuilder);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (Exception e) {
-            logger.error("fail to update cube segment offset", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
+        return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+//        String minValue = null, maxValue = null, currentValue = null;
+//        FSDataInputStream inputStream = null;
+//        BufferedReader bufferedReader = null;
+//        try {
+//            FileSystem fs = HadoopUtil.getFileSystem(outputPath);
+//            inputStream = fs.open(outputFile);
+//            bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
+//            minValue = currentValue = bufferedReader.readLine();
+//            while (currentValue != null) {
+//                maxValue = currentValue;
+//                currentValue = bufferedReader.readLine();
+//            }
+//        } catch (IOException e) {
+//            logger.error("fail to read file " + outputFile, e);
+//            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+//        } finally {
+//            IOUtils.closeQuietly(bufferedReader);
+//            IOUtils.closeQuietly(inputStream);
+//        }
+//
+//        final DataType partitionColType = partitionCol.getType();
+//        FastDateFormat dateFormat;
+//        if (partitionColType.isDate()) {
+//            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
+//        } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
+//            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+//        } else if (partitionColType.isStringFamily()) {
+//            String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
+//            if (StringUtils.isEmpty(partitionDateFormat)) {
+//                partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
+//            }
+//            dateFormat = DateFormat.getDateFormat(partitionDateFormat);
+//        } else {
+//            return new ExecuteResult(ExecuteResult.State.ERROR, "Type " + partitionColType + " is not valid partition column type");
+//        }
+//
+//        try {
+//            long startTime = dateFormat.parse(minValue).getTime();
+//            long endTime = dateFormat.parse(maxValue).getTime();
+//            CubeUpdate cubeBuilder = new CubeUpdate(cube);
+//            segment.setDateRangeStart(startTime);
+//            segment.setDateRangeEnd(endTime);
+//            cubeBuilder.setToUpdateSegs(segment);
+//            cubeManager.updateCube(cubeBuilder);
+//            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+//        } catch (Exception e) {
+//            logger.error("fail to update cube segment offset", e);
+//            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+//        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3cd68478/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index 685af6a..b56f432 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -59,7 +59,7 @@ public class KafkaClient {
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 50);
-        props.put("timeout.ms", "30000");
+        props.put("request.timeout.ms", "30000");
         if (properties != null) {
             for (Map.Entry entry : properties.entrySet()) {
                 props.put(entry.getKey(), entry.getValue());
@@ -126,7 +126,25 @@ public class KafkaClient {
         try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
             for (PartitionInfo partitionInfo : partitionInfos) {
-                long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition());
+                long latest = getLatestOffset(consumer, topic, partitionInfo.partition());
+                startOffsets.put(partitionInfo.partition(), latest);
+            }
+        }
+        return startOffsets;
+    }
+
+
+    public static Map<Integer, Long> getEarliestOffsets(final CubeInstance cubeInstance) {
+        final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(cubeInstance.getConfig()).getKafkaConfig(cubeInstance.getFactTable());
+
+        final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
+        final String topic = kafakaConfig.getTopic();
+
+        Map<Integer, Long> startOffsets = Maps.newHashMap();
+        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
+            final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+            for (PartitionInfo partitionInfo : partitionInfos) {
+                long latest = getEarliestOffset(consumer, topic, partitionInfo.partition());
                 startOffsets.put(partitionInfo.partition(), latest);
             }
         }


Mime
View raw message