kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liy...@apache.org
Subject [36/50] incubator-kylin git commit: KYLIN-671 check lastdatatime
Date Sun, 26 Apr 2015 04:27:18 GMT
KYLIN-671 check lastdatatime


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c9f06c1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c9f06c1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c9f06c1c

Branch: refs/heads/streaming-localdict
Commit: c9f06c1c6cf74cbed306918463999a98cf2a0bf8
Parents: fba32d8
Author: honma <honma@ebay.com>
Authored: Wed Apr 22 16:20:17 2015 +0800
Committer: honma <honma@ebay.com>
Committed: Wed Apr 22 16:46:02 2015 +0800

----------------------------------------------------------------------
 .../kylin/metadata/tuple/IEvaluatableTuple.java |   2 +-
 .../apache/kylin/query/test/KylinQueryTest.java |   2 +-
 .../cache/CacheFledgedStorageEngine.java        |   6 +-
 .../endpoint/EndpointTupleIterator.java         |  49 +++---
 .../hbase/coprocessor/endpoint/IIEndpoint.java  |   9 +-
 .../endpoint/generated/IIProtos.java            | 148 ++++++++++++++++---
 .../coprocessor/endpoint/protobuf/II.proto      |   1 +
 7 files changed, 167 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c9f06c1c/metadata/src/main/java/org/apache/kylin/metadata/tuple/IEvaluatableTuple.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tuple/IEvaluatableTuple.java
b/metadata/src/main/java/org/apache/kylin/metadata/tuple/IEvaluatableTuple.java
index ce957b7..d5a5061 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/tuple/IEvaluatableTuple.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/tuple/IEvaluatableTuple.java
@@ -4,6 +4,6 @@ import org.apache.kylin.metadata.model.TblColRef;
 
 public interface IEvaluatableTuple {
 
-    public Object getValue(TblColRef col);
+    Object getValue(TblColRef col);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c9f06c1c/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
index 6fa5f90..687871e 100644
--- a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
@@ -105,7 +105,7 @@ public class KylinQueryTest extends KylinTestBase {
     @Test
     public void testSingleExecuteQuery() throws Exception {
 
-        String queryFileName = "src/test/resources/query/sql/query39.sql";
+        String queryFileName = "src/test/resources/query/sql/query53.sql";
 
         File sqlFile = new File(queryFileName);
         String sql = getTextFromFile(sqlFile);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c9f06c1c/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java
b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java
index 4f8ad2f..805d414 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStorageEngine.java
@@ -80,7 +80,9 @@ public class CacheFledgedStorageEngine implements IStorageEngine {
     @Override
     public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest)
{
 
-        boolean needUpdateCache = true;
+        //enable storage layer cache iff ts column is contained in filter
+        boolean needUpdateCache = sqlDigest.groupbyColumns.contains(partitionColRef);
+
         final StreamSQLDigest streamSQLDigest = new StreamSQLDigest(sqlDigest, partitionColRef);
         StreamSQLResult cachedResult = null;
         Cache cache = cacheManager.getCache(STORAGE_LAYER_TUPLE_CACHE);
@@ -152,7 +154,7 @@ public class CacheFledgedStorageEngine implements IStorageEngine {
                         List<Range<Long>> cachablePeriods = RangeUtil.remove(tsRange,
cacheExclude);
                         if (cachablePeriods.size() == 1) {
                             if (!tsRange.equals(cachablePeriods.get(0))) {
-                                logger.info("With respect to each shard's build status, the
cachable tsRange shrinks from " + RangeUtil.formatTsRange(tsRange) + " to " + RangeUtil.formatTsRange(cachablePeriods.get(0)));
+                                logger.info("With respect to each shard's build status, the
cacheable tsRange shrinks from " + RangeUtil.formatTsRange(tsRange) + " to " + RangeUtil.formatTsRange(cachablePeriods.get(0)));
                             }
                             tsRange = cachablePeriods.get(0);
                         } else {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c9f06c1c/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
index 6eb539c..da4cc56 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
@@ -21,8 +21,8 @@ package org.apache.kylin.storage.hbase.coprocessor.endpoint;
 import java.io.IOException;
 import java.util.*;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.*;
+import javax.annotation.Nullable;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang3.SerializationUtils;
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.RangeUtil;
 import org.apache.kylin.cube.kv.RowKeyColumnIO;
 import org.apache.kylin.invertedindex.IISegment;
@@ -55,6 +54,8 @@ import org.apache.kylin.storage.tuple.TupleInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
+import com.google.common.collect.*;
 import com.google.protobuf.ByteString;
 
 /**
@@ -85,9 +86,8 @@ public class EndpointTupleIterator implements ITupleIterator {
     private HTableInterface table = null;
 
     private TblColRef partitionCol;
-    private int shardCount;
-    private long currentShardLastDataTime = Long.MIN_VALUE;
     private List<Long> shardLastDataTimes = Lists.newArrayList();
+    private long lastDataTime;
     private int rowsInAllMetric = 0;
 
     public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef>
groupBy, List<FunctionDesc> measures, StorageContext context, HConnection conn) throws
Throwable {
@@ -144,12 +144,25 @@ public class EndpointTupleIterator implements ITupleIterator {
 
         IIProtos.IIRequest endpointRequest = prepareRequest();
 
-        Collection<List<IIProtos.IIResponse.IIRow>> shardResults = getResults(endpointRequest,
table);
-        this.shardCount = shardResults.size();
-        this.regionResponsesIterator = shardResults.iterator();
+        Collection<IIProtos.IIResponse> shardResults = getResults(endpointRequest,
table);
+
+        this.lastDataTime = Collections.min(Collections2.transform(shardResults, new Function<IIProtos.IIResponse,
Long>() {
+            @Nullable
+            @Override
+            public Long apply(IIProtos.IIResponse input) {
+                return input.getLatestDataTime();
+            }
+        }));
+
+        this.regionResponsesIterator = Collections2.transform(shardResults, new Function<IIProtos.IIResponse,
List<IIProtos.IIResponse.IIRow>>() {
+            @Nullable
+            @Override
+            public List<IIProtos.IIResponse.IIRow> apply(@Nullable IIProtos.IIResponse
input) {
+                return input.getRowsList();
+            }
+        }).iterator();
 
         if (this.regionResponsesIterator.hasNext()) {
-            this.currentShardLastDataTime = Long.MIN_VALUE;
             this.tupleIterator = new SingleRegionTupleIterator(this.regionResponsesIterator.next());
         } else {
             this.tupleIterator = ITupleIterator.EMPTY_TUPLE_ITERATOR;
@@ -195,10 +208,7 @@ public class EndpointTupleIterator implements ITupleIterator {
     @Override
     public boolean hasNext() {
         while (!this.tupleIterator.hasNext()) {
-            this.shardLastDataTimes.add(this.currentShardLastDataTime);
-
             if (this.regionResponsesIterator.hasNext()) {
-                this.currentShardLastDataTime = Long.MIN_VALUE;
                 this.tupleIterator = new SingleRegionTupleIterator(this.regionResponsesIterator.next());
             } else {
                 return false;
@@ -216,9 +226,6 @@ public class EndpointTupleIterator implements ITupleIterator {
         }
 
         ITuple tuple = this.tupleIterator.next();
-
-        //update shardLastDataTimes
-        this.currentShardLastDataTime = Tuple.getTs(tuple,this.partitionCol);
         return tuple;
     }
 
@@ -236,9 +243,7 @@ public class EndpointTupleIterator implements ITupleIterator {
 
     @Override
     public Range<Long> getCacheExcludedPeriod() {
-        Preconditions.checkArgument(shardCount == this.shardLastDataTimes.size());
-        long min = Collections.min(this.shardLastDataTimes);
-        return Ranges.atLeast(min);//inclusive
+        return Ranges.atLeast(lastDataTime + 1);//notice +1
     }
 
     private IIProtos.IIRequest prepareRequest() throws IOException {
@@ -260,9 +265,9 @@ public class EndpointTupleIterator implements ITupleIterator {
     }
 
     //TODO : async callback
-    private Collection<List<IIProtos.IIResponse.IIRow>> getResults(final IIProtos.IIRequest
request, HTableInterface table) throws Throwable {
-        Map<byte[], List<IIProtos.IIResponse.IIRow>> results = table.coprocessorService(IIProtos.RowsService.class,
null, null, new Batch.Call<IIProtos.RowsService, List<IIProtos.IIResponse.IIRow>>()
{
-            public List<IIProtos.IIResponse.IIRow> call(IIProtos.RowsService rowsService)
throws IOException {
+    private Collection<IIProtos.IIResponse> getResults(final IIProtos.IIRequest request,
HTableInterface table) throws Throwable {
+        Map<byte[], IIProtos.IIResponse> results = table.coprocessorService(IIProtos.RowsService.class,
null, null, new Batch.Call<IIProtos.RowsService, IIProtos.IIResponse>() {
+            public IIProtos.IIResponse call(IIProtos.RowsService rowsService) throws IOException
{
                 ServerRpcController controller = new ServerRpcController();
                 BlockingRpcCallback<IIProtos.IIResponse> rpcCallback = new BlockingRpcCallback<>();
                 rowsService.getRows(controller, request, rpcCallback);
@@ -271,7 +276,7 @@ public class EndpointTupleIterator implements ITupleIterator {
                     throw controller.getFailedOn();
                 }
 
-                return response.getRowsList();
+                return response;
             }
         });
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c9f06c1c/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
index 7ac9f2b..2dc93e7 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
@@ -93,6 +93,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor,
Cop
             logger.info("Start key of the region is: " + BytesUtil.toReadableText(regionStartKey)
+ ", making shard to be :" + shard);
 
             if (tsRange.hasLowerBound()) {
+                //differentiate GT and GTE seems not very beneficial
                 Preconditions.checkArgument(shard != -1, "Shard is -1!");
                 long tsStart = tsRange.lowerEndpoint();
                 logger.info("ts start is " + tsStart);
@@ -112,6 +113,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor,
Cop
             }
 
             if (tsRange.hasUpperBound()) {
+                //differentiate LT and LTE seems not very beneficial
                 Preconditions.checkArgument(shard != -1, "Shard is -1");
                 long tsEnd = tsRange.upperEndpoint();
                 logger.info("ts end is " + tsEnd);
@@ -194,10 +196,11 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor,
Cop
         final byte[] buffer = new byte[CoprocessorConstants.METRIC_SERIALIZE_BUFFER_SIZE];
 
         int iteratedSliceCount = 0;
+        long latestSliceTs = Long.MIN_VALUE;
         for (Slice slice : slices) {
+            latestSliceTs = slice.getTimestamp();
             iteratedSliceCount++;
 
-            //TODO localdict
             //dictionaries for fact table columns can not be determined while streaming.
             //a piece of dict coincide with each Slice, we call it "local dict"
             final Dictionary<?>[] localDictionaries = slice.getLocalDictionaries();
@@ -240,6 +243,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor,
Cop
             rowBuilder.setMeasures(ByteString.copyFrom(buffer, 0, length));
             responseBuilder.addRows(rowBuilder.build());
         }
+        responseBuilder.setLatestDataTime(latestSliceTs);
 
         return responseBuilder.build();
     }
@@ -253,7 +257,9 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor,
Cop
         int totalSize = 0;
         byte[] recordBuffer = new byte[byteFormLen];
         int iteratedSliceCount = 0;
+        long latestSliceTs = Long.MIN_VALUE;
         for (Slice slice : slices) {
+            latestSliceTs = slice.getTimestamp();
             iteratedSliceCount++;
             CoprocessorFilter newFilter = CoprocessorFilter.fromFilter(new LocalDictionary(slice.getLocalDictionaries(),
type, slice.getInfo()), filter.getFilter(), FilterDecorator.FilterConstantsTreatment.REPLACE_WITH_LOCAL_DICT);
             ConciseSet result = null;
@@ -273,6 +279,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor,
Cop
                 totalSize += byteFormLen;
             }
         }
+        responseBuilder.setLatestDataTime(latestSliceTs);
 
         logger.info("Iterated Slices count: " + iteratedSliceCount);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c9f06c1c/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/generated/IIProtos.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/generated/IIProtos.java
b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/generated/IIProtos.java
index 9c3d4b5..d3c2397 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/generated/IIProtos.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/generated/IIProtos.java
@@ -51,13 +51,13 @@ public final class IIProtos {
      */
     com.google.protobuf.ByteString getAggregator();
 
-    // optional bytes tsRange = 5;
+    // optional bytes tsRange = 6;
     /**
-     * <code>optional bytes tsRange = 5;</code>
+     * <code>optional bytes tsRange = 6;</code>
      */
     boolean hasTsRange();
     /**
-     * <code>optional bytes tsRange = 5;</code>
+     * <code>optional bytes tsRange = 6;</code>
      */
     com.google.protobuf.ByteString getTsRange();
   }
@@ -132,7 +132,7 @@ public final class IIProtos {
               aggregator_ = input.readBytes();
               break;
             }
-            case 42: {
+            case 50: {
               bitField0_ |= 0x00000010;
               tsRange_ = input.readBytes();
               break;
@@ -241,17 +241,17 @@ public final class IIProtos {
       return aggregator_;
     }
 
-    // optional bytes tsRange = 5;
-    public static final int TSRANGE_FIELD_NUMBER = 5;
+    // optional bytes tsRange = 6;
+    public static final int TSRANGE_FIELD_NUMBER = 6;
     private com.google.protobuf.ByteString tsRange_;
     /**
-     * <code>optional bytes tsRange = 5;</code>
+     * <code>optional bytes tsRange = 6;</code>
      */
     public boolean hasTsRange() {
       return ((bitField0_ & 0x00000010) == 0x00000010);
     }
     /**
-     * <code>optional bytes tsRange = 5;</code>
+     * <code>optional bytes tsRange = 6;</code>
      */
     public com.google.protobuf.ByteString getTsRange() {
       return tsRange_;
@@ -305,7 +305,7 @@ public final class IIProtos {
         output.writeBytes(4, aggregator_);
       }
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
-        output.writeBytes(5, tsRange_);
+        output.writeBytes(6, tsRange_);
       }
       getUnknownFields().writeTo(output);
     }
@@ -334,7 +334,7 @@ public final class IIProtos {
       }
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         size += com.google.protobuf.CodedOutputStream
-          .computeBytesSize(5, tsRange_);
+          .computeBytesSize(6, tsRange_);
       }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
@@ -802,22 +802,22 @@ public final class IIProtos {
         return this;
       }
 
-      // optional bytes tsRange = 5;
+      // optional bytes tsRange = 6;
       private com.google.protobuf.ByteString tsRange_ = com.google.protobuf.ByteString.EMPTY;
       /**
-       * <code>optional bytes tsRange = 5;</code>
+       * <code>optional bytes tsRange = 6;</code>
        */
       public boolean hasTsRange() {
         return ((bitField0_ & 0x00000010) == 0x00000010);
       }
       /**
-       * <code>optional bytes tsRange = 5;</code>
+       * <code>optional bytes tsRange = 6;</code>
        */
       public com.google.protobuf.ByteString getTsRange() {
         return tsRange_;
       }
       /**
-       * <code>optional bytes tsRange = 5;</code>
+       * <code>optional bytes tsRange = 6;</code>
        */
       public Builder setTsRange(com.google.protobuf.ByteString value) {
         if (value == null) {
@@ -829,7 +829,7 @@ public final class IIProtos {
         return this;
       }
       /**
-       * <code>optional bytes tsRange = 5;</code>
+       * <code>optional bytes tsRange = 6;</code>
        */
       public Builder clearTsRange() {
         bitField0_ = (bitField0_ & ~0x00000010);
@@ -876,6 +876,16 @@ public final class IIProtos {
      */
     org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos.IIResponse.IIRowOrBuilder
getRowsOrBuilder(
         int index);
+
+    // required int64 latestDataTime = 2;
+    /**
+     * <code>required int64 latestDataTime = 2;</code>
+     */
+    boolean hasLatestDataTime();
+    /**
+     * <code>required int64 latestDataTime = 2;</code>
+     */
+    long getLatestDataTime();
   }
   /**
    * Protobuf type {@code IIResponse}
@@ -936,6 +946,11 @@ public final class IIProtos {
               rows_.add(input.readMessage(org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos.IIResponse.IIRow.PARSER,
extensionRegistry));
               break;
             }
+            case 16: {
+              bitField0_ |= 0x00000001;
+              latestDataTime_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1516,6 +1531,7 @@ public final class IIProtos {
       // @@protoc_insertion_point(class_scope:IIResponse.IIRow)
     }
 
+    private int bitField0_;
     // repeated .IIResponse.IIRow rows = 1;
     public static final int ROWS_FIELD_NUMBER = 1;
     private java.util.List<org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos.IIResponse.IIRow>
rows_;
@@ -1552,14 +1568,35 @@ public final class IIProtos {
       return rows_.get(index);
     }
 
+    // required int64 latestDataTime = 2;
+    public static final int LATESTDATATIME_FIELD_NUMBER = 2;
+    private long latestDataTime_;
+    /**
+     * <code>required int64 latestDataTime = 2;</code>
+     */
+    public boolean hasLatestDataTime() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>required int64 latestDataTime = 2;</code>
+     */
+    public long getLatestDataTime() {
+      return latestDataTime_;
+    }
+
     private void initFields() {
       rows_ = java.util.Collections.emptyList();
+      latestDataTime_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
       byte isInitialized = memoizedIsInitialized;
       if (isInitialized != -1) return isInitialized == 1;
 
+      if (!hasLatestDataTime()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
       for (int i = 0; i < getRowsCount(); i++) {
         if (!getRows(i).isInitialized()) {
           memoizedIsInitialized = 0;
@@ -1576,6 +1613,9 @@ public final class IIProtos {
       for (int i = 0; i < rows_.size(); i++) {
         output.writeMessage(1, rows_.get(i));
       }
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt64(2, latestDataTime_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1589,6 +1629,10 @@ public final class IIProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(1, rows_.get(i));
       }
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(2, latestDataTime_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1614,6 +1658,11 @@ public final class IIProtos {
       boolean result = true;
       result = result && getRowsList()
           .equals(other.getRowsList());
+      result = result && (hasLatestDataTime() == other.hasLatestDataTime());
+      if (hasLatestDataTime()) {
+        result = result && (getLatestDataTime()
+            == other.getLatestDataTime());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1631,6 +1680,10 @@ public final class IIProtos {
         hash = (37 * hash) + ROWS_FIELD_NUMBER;
         hash = (53 * hash) + getRowsList().hashCode();
       }
+      if (hasLatestDataTime()) {
+        hash = (37 * hash) + LATESTDATATIME_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getLatestDataTime());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1747,6 +1800,8 @@ public final class IIProtos {
         } else {
           rowsBuilder_.clear();
         }
+        latestDataTime_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000002);
         return this;
       }
 
@@ -1774,6 +1829,7 @@ public final class IIProtos {
       public org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos.IIResponse
buildPartial() {
         org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos.IIResponse
result = new org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos.IIResponse(this);
         int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
         if (rowsBuilder_ == null) {
           if (((bitField0_ & 0x00000001) == 0x00000001)) {
             rows_ = java.util.Collections.unmodifiableList(rows_);
@@ -1783,6 +1839,11 @@ public final class IIProtos {
         } else {
           result.rows_ = rowsBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.latestDataTime_ = latestDataTime_;
+        result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
       }
@@ -1824,11 +1885,18 @@ public final class IIProtos {
             }
           }
         }
+        if (other.hasLatestDataTime()) {
+          setLatestDataTime(other.getLatestDataTime());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
 
       public final boolean isInitialized() {
+        if (!hasLatestDataTime()) {
+          
+          return false;
+        }
         for (int i = 0; i < getRowsCount(); i++) {
           if (!getRows(i).isInitialized()) {
             
@@ -2097,6 +2165,39 @@ public final class IIProtos {
         return rowsBuilder_;
       }
 
+      // required int64 latestDataTime = 2;
+      private long latestDataTime_ ;
+      /**
+       * <code>required int64 latestDataTime = 2;</code>
+       */
+      public boolean hasLatestDataTime() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>required int64 latestDataTime = 2;</code>
+       */
+      public long getLatestDataTime() {
+        return latestDataTime_;
+      }
+      /**
+       * <code>required int64 latestDataTime = 2;</code>
+       */
+      public Builder setLatestDataTime(long value) {
+        bitField0_ |= 0x00000002;
+        latestDataTime_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required int64 latestDataTime = 2;</code>
+       */
+      public Builder clearLatestDataTime() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        latestDataTime_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:IIResponse)
     }
 
@@ -2368,13 +2469,14 @@ public final class IIProtos {
       "/storage/hbase/coprocessor/endpoint/prot" +
       "obuf/II.proto\"a\n\tIIRequest\022\014\n\004type\030\001 \002(\014" +
       "\022\016\n\006filter\030\002 \002(\014\022\021\n\tprojector\030\003 \002(\014\022\022\n\na"
+
-      "ggregator\030\004 \002(\014\022\017\n\007tsRange\030\005 \001(\014\"Y\n\nIIRe" +
-      "sponse\022\037\n\004rows\030\001 \003(\0132\021.IIResponse.IIRow\032" +
-      "*\n\005IIRow\022\017\n\007columns\030\001 \002(\014\022\020\n\010measures\030\002
" +
-      "\001(\01421\n\013RowsService\022\"\n\007getRows\022\n.IIReques" +
-      "t\032\013.IIResponseBQ\n=org.apache.kylin.stora" +
-      "ge.hbase.coprocessor.endpoint.generatedB",
-      "\010IIProtosH\001\210\001\001\240\001\001"
+      "ggregator\030\004 \002(\014\022\017\n\007tsRange\030\006 \001(\014\"q\n\nIIRe" +
+      "sponse\022\037\n\004rows\030\001 \003(\0132\021.IIResponse.IIRow\022" +
+      "\026\n\016latestDataTime\030\002 \002(\003\032*\n\005IIRow\022\017\n\007colu" +
+      "mns\030\001 \002(\014\022\020\n\010measures\030\002 \001(\01421\n\013RowsServi" +
+      "ce\022\"\n\007getRows\022\n.IIRequest\032\013.IIResponseBQ" +
+      "\n=org.apache.kylin.storage.hbase.coproce",
+      "ssor.endpoint.generatedB\010IIProtosH\001\210\001\001\240\001" +
+      "\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -2392,7 +2494,7 @@ public final class IIProtos {
           internal_static_IIResponse_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_IIResponse_descriptor,
-              new java.lang.String[] { "Rows", });
+              new java.lang.String[] { "Rows", "LatestDataTime", });
           internal_static_IIResponse_IIRow_descriptor =
             internal_static_IIResponse_descriptor.getNestedTypes().get(0);
           internal_static_IIResponse_IIRow_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c9f06c1c/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/protobuf/II.proto
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/protobuf/II.proto
b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/protobuf/II.proto
index 568dffc..265c470 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/protobuf/II.proto
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/protobuf/II.proto
@@ -22,6 +22,7 @@ message IIResponse {
     optional bytes measures = 2;
   }
   repeated IIRow rows = 1;
+  required int64 latestDataTime = 2;
 }
 
 service RowsService {


Mime
View raw message