omid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (OMID-102) Implement visibility filter as pure HBase Filter
Date Wed, 01 Aug 2018 15:56:00 GMT

    [ https://issues.apache.org/jira/browse/OMID-102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565532#comment-16565532
] 

ASF GitHub Bot commented on OMID-102:
-------------------------------------

Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/incubator-omid/pull/41#discussion_r206935434
  
    --- Diff: hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java
---
    @@ -0,0 +1,248 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.omid.transaction;
    +
    +import com.google.common.base.Optional;
    +import com.sun.istack.Nullable;
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.CellUtil;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.filter.Filter;
    +import org.apache.hadoop.hbase.filter.FilterBase;
    +import org.apache.hadoop.hbase.util.Bytes;
    +
    +import java.io.IOException;
    +import java.util.*;
    +
    +public class TransactionVisibilityFilter extends FilterBase {
    +
    +    // optional sub-filter to apply to visible cells
    +    private final Filter userFilter;
    +    private final SnapshotFilterImpl snapshotFilter;
    +    private final Map<Long ,Long> shadowCellCache;
    +    private final HBaseTransaction hbaseTransaction;
    +    private final Map<String, Long> familyDeletionCache;
    +
    +    public SnapshotFilter getSnapshotFilter() {
    +        return snapshotFilter;
    +    }
    +
    +    public TransactionVisibilityFilter(@Nullable Filter cellFilter,
    +                                       SnapshotFilterImpl snapshotFilter,
    +                                       HBaseTransaction hbaseTransaction) {
    +        this.userFilter = cellFilter;
    +        this.snapshotFilter = snapshotFilter;
    +        shadowCellCache = new HashMap<>();
    +        this.hbaseTransaction = hbaseTransaction;
    +        familyDeletionCache = new HashMap<String, Long>();
    +    }
    +
    +    @Override
    +    public ReturnCode filterKeyValue(Cell v) throws IOException {
    +        if (CellUtils.isShadowCell(v)) {
    +            Long commitTs =  Bytes.toLong(CellUtil.cloneValue(v));
    +            shadowCellCache.put(v.getTimestamp(), commitTs);
    +            // Continue getting shadow cells until one of them fits this transaction
    +            if (hbaseTransaction.getStartTimestamp() >= commitTs) {
    +                return ReturnCode.NEXT_COL;
    +            } else {
    +                return ReturnCode.SKIP;
    +            }
    +        } else if (CellUtils.isFamilyDeleteCell(v)) {
    +            //Delete is part of this transaction
    +            if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent())
{
    +                familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), v.getTimestamp());
    +                return ReturnCode.NEXT_COL;
    +            }
    +
    +            if (shadowCellCache.containsKey(v.getTimestamp()) &&
    +                    hbaseTransaction.getStartTimestamp() >= shadowCellCache.get(v.getTimestamp()))
{
    +                familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), shadowCellCache.get(v.getTimestamp()));
    +                return ReturnCode.NEXT_COL;
    +            }
    +
    +            // Try to get shadow cell from region
    +            final Get get = new Get(CellUtil.cloneRow(v));
    +            get.setTimeStamp(v.getTimestamp()).setMaxVersions(1);
    +            get.addColumn(CellUtil.cloneFamily(v), CellUtils.addShadowCellSuffix(CellUtils.FAMILY_DELETE_QUALIFIER));
    +            Result deleteFamilySC = snapshotFilter.getTableAccessWrapper().get(get);
    +
    +            if (!deleteFamilySC.isEmpty() &&
    +                    Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0] ))
< hbaseTransaction.getStartTimestamp()){
    +                familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0])));
    +                return ReturnCode.NEXT_COL;
    +            }
    +
    +            //At last go to commit table
    +            Optional<Long> commitTimestamp = snapshotFilter.tryToLocateCellCommitTimestamp(hbaseTransaction.getEpoch(),
    +                    v, shadowCellCache);
    +            if (commitTimestamp.isPresent() && hbaseTransaction.getStartTimestamp()
>= commitTimestamp.get()) {
    +                familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), commitTimestamp.get());
    +                return ReturnCode.NEXT_COL;
    +            }
    +
    +            // Continue getting the next version of the delete family,
    +            // until we get one in the snapshot or move to next cell
    +            return ReturnCode.SKIP;
    +        }
    +
    +        if (familyDeletionCache.containsKey(Bytes.toString(CellUtil.cloneFamily(v)))
    +                && familyDeletionCache.get(Bytes.toString(CellUtil.cloneFamily(v)))
>= v.getTimestamp()) {
    +            return ReturnCode.NEXT_COL;
    +        }
    +
    +        if (isCellInSnapshot(v)) {
    +
    +            if (CellUtils.isTombstone(v)) {
    +                return ReturnCode.NEXT_COL;
    +            }
    +
    +            if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT)
{
    +                return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
    +            } else if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL)
{
    +                if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent())
{
    +                    return runUserFilter(v, ReturnCode.INCLUDE);
    +                } else {
    +                    return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
    +                }
    +            }
    +        }
    +        return ReturnCode.SKIP;
    +    }
    +
    +
    +    private ReturnCode runUserFilter(Cell v, ReturnCode snapshotReturn)
    +            throws IOException {
    +
    +        if (userFilter == null) {
    +            return snapshotReturn;
    +        }
    +
    +        ReturnCode userRes = userFilter.filterKeyValue(v);
    +        switch (userRes) {
    +            case INCLUDE:
    +                return snapshotReturn;
    +            case SKIP:
    +                return (snapshotReturn == ReturnCode.INCLUDE) ? ReturnCode.SKIP: ReturnCode.NEXT_COL;
    +            default:
    +                return userRes;
    +        }
    +
    +    }
    +
    +
    +    private boolean isCellInSnapshot(Cell v) throws IOException {
    +        if (shadowCellCache.containsKey(v.getTimestamp()) &&
    +                hbaseTransaction.getStartTimestamp() >= shadowCellCache.get(v.getTimestamp()))
{
    +            return true;
    +        }
    +        if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
    +            return true;
    +        }
    +        Optional<Long> commitTS = snapshotFilter.getTSIfInSnapshot(v, hbaseTransaction,
shadowCellCache);
    +        if (commitTS.isPresent()) {
    +            shadowCellCache.put(v.getTimestamp(), commitTS.get());
    +            return true;
    +        }
    +        return false;
    +    }
    +
    +
    +    @Override
    +    public void reset() throws IOException {
    +        shadowCellCache.clear();
    +        familyDeletionCache.clear();
    +        if (userFilter != null) {
    +            userFilter.reset();
    +        }
    +    }
    +
    +    @Override
    +    public boolean filterRow() throws IOException {
    +        if (userFilter != null) {
    +            return userFilter.filterRow();
    +        }
    +        return super.filterRow();
    +    }
    +
    +
    +    @Override
    +    public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException
{
    +        if (userFilter != null) {
    +            return userFilter.filterRowKey(buffer, offset, length);
    --- End diff --
    
    I think this ok (this is what Tephra does).


> Implement visibility filter as pure HBase Filter
> ------------------------------------------------
>
>                 Key: OMID-102
>                 URL: https://issues.apache.org/jira/browse/OMID-102
>             Project: Apache Omid
>          Issue Type: Sub-task
>            Reporter: James Taylor
>            Assignee: Yonatan Gottesman
>            Priority: Major
>
> The way Omid currently filters through it's own RegionScanner won't work the way it's
implemented (i.e. the way the filtering is done *after* the next call). The reason is that
the state of HBase filters get messed up since these filters will start to see cells that
it shouldn't (i.e. cells that would be filtered based on snapshot isolation). It cannot be worked
around by manually running filters afterwards because filters may issue seek calls which
are handled during the running of scans by HBase.
>  
> Instead, the filtering needs to be implemented as a pure HBase filter and that filter
needs to delegate to the other, delegate filter once it's determined that the cell is visible.
See Tephra's TransactionVisibilityFilter and they way it calls the delegate filter (cellFilters)
only after it's determined that the cell is visible. You may run into TEPHRA-169 without
including the CellSkipFilter too. 
> Because it'll be easier if you see shadow cells *before* their corresponding real cells you
can prefix instead of suffix the column qualifiers to guarantee that you'd see the shadow
cells prior to the actual cells. Or you could buffer cells in your filter prior to omitting
them. Another issue would be if the shadow cells aren't found and you need to consult the
commit table - I suppose if the shadow cells are first, this logic would be easier to know
when it needs to be called.
>  
> To reproduce, see the Phoenix unit tests FlappingTransactionIT.testInflightUpdateNotSeen()
and testInflightDeleteNotSeen().



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message