Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2612A18635 for ; Sun, 29 Nov 2015 21:17:18 +0000 (UTC) Received: (qmail 11546 invoked by uid 500); 29 Nov 2015 21:17:15 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 11444 invoked by uid 500); 29 Nov 2015 21:17:15 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 10389 invoked by uid 99); 29 Nov 2015 21:17:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 29 Nov 2015 21:17:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CF13DE0AC6; Sun, 29 Nov 2015 21:17:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Sun, 29 Nov 2015 21:17:32 -0000 Message-Id: In-Reply-To: <9d71035b34ee40e9808f2706bbd8e144@git.apache.org> References: <9d71035b34ee40e9808f2706bbd8e144@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [19/51] [partial] hbase git commit: Published site at 3bac31b2a49bca153df3b47a198667828b61f36e. http://git-wip-us.apache.org/repos/asf/hbase/blob/ef7355e1/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RegionScannerImpl.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RegionScannerImpl.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RegionScannerImpl.html index fe65cea..b625de4 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RegionScannerImpl.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RegionScannerImpl.html @@ -1409,10 +1409,10 @@ 1401 } 1402 } 1403 -1404 this.closing.set(true); -1405 status.setStatus("Disabling writes for close"); -1406 // block waiting for the lock for closing -1407 lock.writeLock().lock(); +1404 // block waiting for the lock for closing +1405 lock.writeLock().lock(); +1406 this.closing.set(true); +1407 status.setStatus("Disabling writes for close"); 1408 try { 1409 if (this.isClosed()) { 1410 status.abort("Already got closed by another process"); @@ -5242,7 +5242,7 @@ 5234 * Determines whether multiple column families are present 5235 * Precondition: familyPaths is not null 5236 * -5237 * @param familyPaths List of Pair<byte[] column family, String hfilePath> +5237 * @param familyPaths List of (column family, hfilePath) 5238 */ 5239 private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>> familyPaths) { 5240 boolean multipleFamilies = false; @@ -5955,10 +5955,10 @@ 5947 5948 /** 5949 * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines -5950 * both filterRow & filterRow(List<KeyValue> kvs) functions. While 0.94 code or older, it may -5951 * not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only returns -5952 * true when filterRow(List<KeyValue> kvs) is overridden not the filterRow(). Therefore, the -5953 * filterRow() will be skipped. +5950 * both filterRow & filterRow({@code List<KeyValue> kvs}) functions. While 0.94 code or older, +5951 * it may not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only +5952 * returns true when filterRow({@code List<KeyValue> kvs}) is overridden not the filterRow(). +5953 * Therefore, the filterRow() will be skipped. 5954 */ 5955 private boolean filterRow() throws IOException { 5956 // when hasFilterRow returns true, filter.filterRow() will be called automatically inside @@ -6958,1155 +6958,1148 @@ 6950 } 6951 6952 /** -6953 * @param cell -6954 * @param tags -6955 * @return The passed-in List<Tag> but with the tags from <code>cell</code> added. -6956 */ -6957 private static List<Tag> carryForwardTags(final Cell cell, final List<Tag> tags) { -6958 if (cell.getTagsLength() <= 0) return tags; -6959 List<Tag> newTags = tags == null? new ArrayList<Tag>(): /*Append Tags*/tags; -6960 Iterator<Tag> i = -6961 CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); -6962 while (i.hasNext()) newTags.add(i.next()); -6963 return newTags; -6964 } -6965 -6966 /** -6967 * Run a Get against passed in <code>store</code> on passed <code>row</code>, etc. -6968 * @param store -6969 * @param row -6970 * @param family -6971 * @param tr -6972 * @return Get result. -6973 * @throws IOException -6974 */ -6975 private List<Cell> doGet(final Store store, final byte [] row, -6976 final Map.Entry<byte[], List<Cell>> family, final TimeRange tr) -6977 throws IOException { -6978 // Sort the cells so that they match the order that they -6979 // appear in the Get results. Otherwise, we won't be able to -6980 // find the existing values if the cells are not specified -6981 // in order by the client since cells are in an array list. -6982 Collections.sort(family.getValue(), store.getComparator()); -6983 // Get previous values for all columns in this family -6984 Get get = new Get(row); -6985 for (Cell cell : family.getValue()) { -6986 get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell)); -6987 } -6988 if (tr != null) get.setTimeRange(tr.getMin(), tr.getMax()); -6989 return get(get, false); -6990 } -6991 -6992 public Result append(Append append) throws IOException { -6993 return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE); -6994 } -6995 -6996 // TODO: There's a lot of boiler plate code identical to increment. -6997 // We should refactor append and increment as local get-mutate-put -6998 // transactions, so all stores only go through one code path for puts. -6999 -7000 @Override -7001 public Result append(Append mutate, long nonceGroup, long nonce) throws IOException { -7002 Operation op = Operation.APPEND; -7003 byte[] row = mutate.getRow(); -7004 checkRow(row, op.toString()); -7005 checkFamilies(mutate.getFamilyCellMap().keySet()); -7006 boolean flush = false; -7007 Durability durability = getEffectiveDurability(mutate.getDurability()); -7008 boolean writeToWAL = durability != Durability.SKIP_WAL; -7009 WALEdit walEdits = null; -7010 List<Cell> allKVs = new ArrayList<Cell>(mutate.size()); -7011 Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>(); -7012 long size = 0; -7013 long txid = 0; -7014 checkReadOnly(); -7015 checkResources(); -7016 // Lock row -7017 startRegionOperation(op); -7018 this.writeRequestsCount.increment(); -7019 RowLock rowLock = null; -7020 WALKey walKey = null; -7021 MultiVersionConcurrencyControl.WriteEntry writeEntry = null; -7022 boolean doRollBackMemstore = false; -7023 try { -7024 rowLock = getRowLock(row); -7025 assert rowLock != null; -7026 try { -7027 lock(this.updatesLock.readLock()); -7028 try { -7029 // Wait for all prior MVCC transactions to finish - while we hold the row lock -7030 // (so that we are guaranteed to see the latest state when we do our Get) -7031 mvcc.await(); -7032 if (this.coprocessorHost != null) { -7033 Result r = this.coprocessorHost.preAppendAfterRowLock(mutate); -7034 if (r!= null) { -7035 return r; -7036 } -7037 } -7038 long now = EnvironmentEdgeManager.currentTime(); -7039 // Process each family -7040 for (Map.Entry<byte[], List<Cell>> family : mutate.getFamilyCellMap().entrySet()) { -7041 Store store = stores.get(family.getKey()); -7042 List<Cell> kvs = new ArrayList<Cell>(family.getValue().size()); -7043 -7044 List<Cell> results = doGet(store, row, family, null); -7045 -7046 // Iterate the input columns and update existing values if they were -7047 // found, otherwise add new column initialized to the append value -7048 -7049 // Avoid as much copying as possible. We may need to rewrite and -7050 // consolidate tags. Bytes are only copied once. -7051 // Would be nice if KeyValue had scatter/gather logic -7052 int idx = 0; -7053 for (Cell cell : family.getValue()) { -7054 Cell newCell; -7055 Cell oldCell = null; -7056 if (idx < results.size() -7057 && CellUtil.matchingQualifier(results.get(idx), cell)) { -7058 oldCell = results.get(idx); -7059 long ts = Math.max(now, oldCell.getTimestamp()); +6953 * @return The passed-in {@code tags} but with the tags from {@code cell} added. +6954 */ +6955 private static List<Tag> carryForwardTags(final Cell cell, final List<Tag> tags) { +6956 if (cell.getTagsLength() <= 0) return tags; +6957 List<Tag> newTags = tags == null? new ArrayList<Tag>(): /*Append Tags*/tags; +6958 Iterator<Tag> i = +6959 CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); +6960 while (i.hasNext()) newTags.add(i.next()); +6961 return newTags; +6962 } +6963 +6964 /** +6965 * Run a Get against passed in <code>store</code> on passed <code>row</code>, etc. +6966 * @return Get result. +6967 */ +6968 private List<Cell> doGet(final Store store, final byte [] row, +6969 final Map.Entry<byte[], List<Cell>> family, final TimeRange tr) +6970 throws IOException { +6971 // Sort the cells so that they match the order that they +6972 // appear in the Get results. Otherwise, we won't be able to +6973 // find the existing values if the cells are not specified +6974 // in order by the client since cells are in an array list. +6975 Collections.sort(family.getValue(), store.getComparator()); +6976 // Get previous values for all columns in this family +6977 Get get = new Get(row); +6978 for (Cell cell : family.getValue()) { +6979 get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell)); +6980 } +6981 if (tr != null) get.setTimeRange(tr.getMin(), tr.getMax()); +6982 return get(get, false); +6983 } +6984 +6985 public Result append(Append append) throws IOException { +6986 return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE); +6987 } +6988 +6989 // TODO: There's a lot of boiler plate code identical to increment. +6990 // We should refactor append and increment as local get-mutate-put +6991 // transactions, so all stores only go through one code path for puts. +6992 +6993 @Override +6994 public Result append(Append mutate, long nonceGroup, long nonce) throws IOException { +6995 Operation op = Operation.APPEND; +6996 byte[] row = mutate.getRow(); +6997 checkRow(row, op.toString()); +6998 checkFamilies(mutate.getFamilyCellMap().keySet()); +6999 boolean flush = false; +7000 Durability durability = getEffectiveDurability(mutate.getDurability()); +7001 boolean writeToWAL = durability != Durability.SKIP_WAL; +7002 WALEdit walEdits = null; +7003 List<Cell> allKVs = new ArrayList<Cell>(mutate.size()); +7004 Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>(); +7005 long size = 0; +7006 long txid = 0; +7007 checkReadOnly(); +7008 checkResources(); +7009 // Lock row +7010 startRegionOperation(op); +7011 this.writeRequestsCount.increment(); +7012 RowLock rowLock = null; +7013 WALKey walKey = null; +7014 MultiVersionConcurrencyControl.WriteEntry writeEntry = null; +7015 boolean doRollBackMemstore = false; +7016 try { +7017 rowLock = getRowLock(row); +7018 assert rowLock != null; +7019 try { +7020 lock(this.updatesLock.readLock()); +7021 try { +7022 // Wait for all prior MVCC transactions to finish - while we hold the row lock +7023 // (so that we are guaranteed to see the latest state when we do our Get) +7024 mvcc.await(); +7025 if (this.coprocessorHost != null) { +7026 Result r = this.coprocessorHost.preAppendAfterRowLock(mutate); +7027 if (r!= null) { +7028 return r; +7029 } +7030 } +7031 long now = EnvironmentEdgeManager.currentTime(); +7032 // Process each family +7033 for (Map.Entry<byte[], List<Cell>> family : mutate.getFamilyCellMap().entrySet()) { +7034 Store store = stores.get(family.getKey()); +7035 List<Cell> kvs = new ArrayList<Cell>(family.getValue().size()); +7036 +7037 List<Cell> results = doGet(store, row, family, null); +7038 +7039 // Iterate the input columns and update existing values if they were +7040 // found, otherwise add new column initialized to the append value +7041 +7042 // Avoid as much copying as possible. We may need to rewrite and +7043 // consolidate tags. Bytes are only copied once. +7044 // Would be nice if KeyValue had scatter/gather logic +7045 int idx = 0; +7046 for (Cell cell : family.getValue()) { +7047 Cell newCell; +7048 Cell oldCell = null; +7049 if (idx < results.size() +7050 && CellUtil.matchingQualifier(results.get(idx), cell)) { +7051 oldCell = results.get(idx); +7052 long ts = Math.max(now, oldCell.getTimestamp()); +7053 +7054 // Process cell tags +7055 // Make a union of the set of tags in the old and new KVs +7056 List<Tag> newTags = carryForwardTags(oldCell, new ArrayList<Tag>()); +7057 newTags = carryForwardTags(cell, newTags); +7058 +7059 // Cell TTL handling 7060 -7061 // Process cell tags -7062 // Make a union of the set of tags in the old and new KVs -7063 List<Tag> newTags = carryForwardTags(oldCell, new ArrayList<Tag>()); -7064 newTags = carryForwardTags(cell, newTags); +7061 if (mutate.getTTL() != Long.MAX_VALUE) { +7062 // Add the new TTL tag +7063 newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL()))); +7064 } 7065 -7066 // Cell TTL handling -7067 -7068 if (mutate.getTTL() != Long.MAX_VALUE) { -7069 // Add the new TTL tag -7070 newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL()))); -7071 } -7072 -7073 // Rebuild tags -7074 byte[] tagBytes = Tag.fromList(newTags); -7075 -7076 // allocate an empty cell once -7077 newCell = new KeyValue(row.length, cell.getFamilyLength(), -7078 cell.getQualifierLength(), ts, KeyValue.Type.Put, -7079 oldCell.getValueLength() + cell.getValueLength(), -7080 tagBytes.length); -7081 // copy in row, family, and qualifier -7082 System.arraycopy(cell.getRowArray(), cell.getRowOffset(), -7083 newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength()); -7084 System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(), -7085 newCell.getFamilyArray(), newCell.getFamilyOffset(), -7086 cell.getFamilyLength()); -7087 System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(), -7088 newCell.getQualifierArray(), newCell.getQualifierOffset(), -7089 cell.getQualifierLength()); -7090 // copy in the value -7091 CellUtil.copyValueTo(oldCell, newCell.getValueArray(), newCell.getValueOffset()); -7092 System.arraycopy(cell.getValueArray(), cell.getValueOffset(), -7093 newCell.getValueArray(), -7094 newCell.getValueOffset() + oldCell.getValueLength(), -7095 cell.getValueLength()); -7096 // Copy in tag data -7097 System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(), -7098 tagBytes.length); -7099 idx++; -7100 } else { -7101 // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP -7102 CellUtil.updateLatestStamp(cell, now); -7103 -7104 // Cell TTL handling -7105 -7106 if (mutate.getTTL() != Long.MAX_VALUE) { -7107 List<Tag> newTags = new ArrayList<Tag>(1); -7108 newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL()))); -7109 // Add the new TTL tag -7110 newCell = new TagRewriteCell(cell, Tag.fromList(newTags)); -7111 } else { -7112 newCell = cell; -7113 } -7114 } +7066 // Rebuild tags +7067 byte[] tagBytes = Tag.fromList(newTags); +7068 +7069 // allocate an empty cell once +7070 newCell = new KeyValue(row.length, cell.getFamilyLength(), +7071 cell.getQualifierLength(), ts, KeyValue.Type.Put, +7072 oldCell.getValueLength() + cell.getValueLength(), +7073 tagBytes.length); +7074 // copy in row, family, and qualifier +7075 System.arraycopy(cell.getRowArray(), cell.getRowOffset(), +7076 newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength()); +7077 System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(), +7078 newCell.getFamilyArray(), newCell.getFamilyOffset(), +7079 cell.getFamilyLength()); +7080 System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(), +7081 newCell.getQualifierArray(), newCell.getQualifierOffset(), +7082 cell.getQualifierLength()); +7083 // copy in the value +7084 CellUtil.copyValueTo(oldCell, newCell.getValueArray(), newCell.getValueOffset()); +7085 System.arraycopy(cell.getValueArray(), cell.getValueOffset(), +7086 newCell.getValueArray(), +7087 newCell.getValueOffset() + oldCell.getValueLength(), +7088 cell.getValueLength()); +7089 // Copy in tag data +7090 System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(), +7091 tagBytes.length); +7092 idx++; +7093 } else { +7094 // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP +7095 CellUtil.updateLatestStamp(cell, now); +7096 +7097 // Cell TTL handling +7098 +7099 if (mutate.getTTL() != Long.MAX_VALUE) { +7100 List<Tag> newTags = new ArrayList<Tag>(1); +7101 newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL()))); +7102 // Add the new TTL tag +7103 newCell = new TagRewriteCell(cell, Tag.fromList(newTags)); +7104 } else { +7105 newCell = cell; +7106 } +7107 } +7108 +7109 // Give coprocessors a chance to update the new cell +7110 if (coprocessorHost != null) { +7111 newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND, +7112 mutate, oldCell, newCell); +7113 } +7114 kvs.add(newCell); 7115 -7116 // Give coprocessors a chance to update the new cell -7117 if (coprocessorHost != null) { -7118 newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND, -7119 mutate, oldCell, newCell); -7120 } -7121 kvs.add(newCell); -7122 -7123 // Append update to WAL -7124 if (writeToWAL) { -7125 if (walEdits == null) { -7126 walEdits = new WALEdit(); -7127 } -7128 walEdits.add(newCell); -7129 } -7130 } -7131 -7132 //store the kvs to the temporary memstore before writing WAL -7133 tempMemstore.put(store, kvs); -7134 } -7135 -7136 // Actually write to WAL now -7137 if (walEdits != null && !walEdits.isEmpty()) { -7138 if (writeToWAL) { -7139 // Using default cluster id, as this can only happen in the originating -7140 // cluster. A slave cluster receives the final value (not the delta) -7141 // as a Put. -7142 // we use HLogKey here instead of WALKey directly to support legacy coprocessors. -7143 walKey = new HLogKey( -7144 getRegionInfo().getEncodedNameAsBytes(), -7145 this.htableDescriptor.getTableName(), -7146 WALKey.NO_SEQUENCE_ID, -7147 nonceGroup, -7148 nonce, -7149 mvcc); -7150 txid = -7151 this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true); -7152 } else { -7153 recordMutationWithoutWal(mutate.getFamilyCellMap()); -7154 } -7155 } -7156 if (walKey == null) { -7157 // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned -7158 walKey = this.appendEmptyEdit(this.wal); -7159 } -7160 -7161 // now start my own transaction -7162 writeEntry = walKey.getWriteEntry(); -7163 -7164 -7165 // Actually write to Memstore now -7166 if (!tempMemstore.isEmpty()) { -7167 for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) { -7168 Store store = entry.getKey(); -7169 if (store.getFamily().getMaxVersions() == 1) { -7170 // upsert if VERSIONS for this CF == 1 -7171 // Is this right? It immediately becomes visible? St.Ack 20150907 -7172 size += store.upsert(entry.getValue(), getSmallestReadPoint()); -7173 } else { -7174 // otherwise keep older versions around -7175 for (Cell cell: entry.getValue()) { -7176 CellUtil.setSequenceId(cell, writeEntry.getWriteNumber()); -7177 size += store.add(cell); -7178 doRollBackMemstore = true; -7179 } -7180 } -7181 // We add to all KVs here whereas when doing increment, we do it -7182 // earlier... why? -7183 allKVs.addAll(entry.getValue()); -7184 } +7116 // Append update to WAL +7117 if (writeToWAL) { +7118 if (walEdits == null) { +7119 walEdits = new WALEdit(); +7120 } +7121 walEdits.add(newCell); +7122 } +7123 } +7124 +7125 //store the kvs to the temporary memstore before writing WAL +7126 tempMemstore.put(store, kvs); +7127 } +7128 +7129 // Actually write to WAL now +7130 if (walEdits != null && !walEdits.isEmpty()) { +7131 if (writeToWAL) { +7132 // Using default cluster id, as this can only happen in the originating +7133 // cluster. A slave cluster receives the final value (not the delta) +7134 // as a Put. +7135 // we use HLogKey here instead of WALKey directly to support legacy coprocessors. +7136 walKey = new HLogKey( +7137 getRegionInfo().getEncodedNameAsBytes(), +7138 this.htableDescriptor.getTableName(), +7139 WALKey.NO_SEQUENCE_ID, +7140 nonceGroup, +7141 nonce, +7142 mvcc); +7143 txid = +7144 this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true); +7145 } else { +7146 recordMutationWithoutWal(mutate.getFamilyCellMap()); +7147 } +7148 } +7149 if (walKey == null) { +7150 // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned +7151 walKey = this.appendEmptyEdit(this.wal); +7152 } +7153 +7154 // now start my own transaction +7155 writeEntry = walKey.getWriteEntry(); +7156 +7157 +7158 // Actually write to Memstore now +7159 if (!tempMemstore.isEmpty()) { +7160 for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) { +7161 Store store = entry.getKey(); +7162 if (store.getFamily().getMaxVersions() == 1) { +7163 // upsert if VERSIONS for this CF == 1 +7164 // Is this right? It immediately becomes visible? St.Ack 20150907 +7165 size += store.upsert(entry.getValue(), getSmallestReadPoint()); +7166 } else { +7167 // otherwise keep older versions around +7168 for (Cell cell: entry.getValue()) { +7169 CellUtil.setSequenceId(cell, writeEntry.getWriteNumber()); +7170 size += store.add(cell); +7171 doRollBackMemstore = true; +7172 } +7173 } +7174 // We add to all KVs here whereas when doing increment, we do it +7175 // earlier... why? +7176 allKVs.addAll(entry.getValue()); +7177 } +7178 +7179 size = this.addAndGetGlobalMemstoreSize(size); +7180 flush = isFlushSize(size); +7181 } +7182 } finally { +7183 this.updatesLock.readLock().unlock(); +7184 } 7185 -7186 size = this.addAndGetGlobalMemstoreSize(size); -7187 flush = isFlushSize(size); -7188 } -7189 } finally { -7190 this.updatesLock.readLock().unlock(); -7191 } -7192 -7193 } finally { -7194 rowLock.release(); -7195 rowLock = null; -7196 } -7197 // sync the transaction log outside the rowlock -7198 if(txid != 0){ -7199 syncOrDefer(txid, durability); -7200 } -7201 doRollBackMemstore = false; -7202 } finally { -7203 if (rowLock != null) { -7204 rowLock.release(); +7186 } finally { +7187 rowLock.release(); +7188 rowLock = null; +7189 } +7190 // sync the transaction log outside the rowlock +7191 if(txid != 0){ +7192 syncOrDefer(txid, durability); +7193 } +7194 doRollBackMemstore = false; +7195 } finally { +7196 if (rowLock != null) { +7197 rowLock.release(); +7198 } +7199 // if the wal sync was unsuccessful, remove keys from memstore +7200 if (doRollBackMemstore) { +7201 rollbackMemstore(allKVs); +7202 if (writeEntry != null) mvcc.complete(writeEntry); +7203 } else if (writeEntry != null) { +7204 mvcc.completeAndWait(writeEntry); 7205 } -7206 // if the wal sync was unsuccessful, remove keys from memstore -7207 if (doRollBackMemstore) { -7208 rollbackMemstore(allKVs); -7209 if (writeEntry != null) mvcc.complete(writeEntry); -7210 } else if (writeEntry != null) { -7211 mvcc.completeAndWait(writeEntry); -7212 } +7206 +7207 closeRegionOperation(op); +7208 } +7209 +7210 if (this.metricsRegion != null) { +7211 this.metricsRegion.updateAppend(); +7212 } 7213 -7214 closeRegionOperation(op); -7215 } -7216 -7217 if (this.metricsRegion != null) { -7218 this.metricsRegion.updateAppend(); -7219 } -7220 -7221 if (flush) { -7222 // Request a cache flush. Do it outside update lock. -7223 requestFlush(); -7224 } +7214 if (flush) { +7215 // Request a cache flush. Do it outside update lock. +7216 requestFlush(); +7217 } +7218 +7219 return mutate.isReturnResults() ? Result.create(allKVs) : null; +7220 } +7221 +7222 public Result increment(Increment increment) throws IOException { +7223 return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE); +7224 } 7225 -7226 return mutate.isReturnResults() ? Result.create(allKVs) : null; -7227 } -7228 -7229 public Result increment(Increment increment) throws IOException { -7230 return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE); -7231 } -7232 -7233 // TODO: There's a lot of boiler plate code identical to append. -7234 // We should refactor append and increment as local get-mutate-put -7235 // transactions, so all stores only go through one code path for puts. -7236 -7237 // They are subtley different in quiet a few ways. This came out only -7238 // after study. I am not sure that many of the differences are intentional. -7239 // TODO: St.Ack 20150907 -7240 -7241 @Override -7242 public Result increment(Increment mutation, long nonceGroup, long nonce) -7243 throws IOException { -7244 Operation op = Operation.INCREMENT; -7245 byte [] row = mutation.getRow(); -7246 checkRow(row, op.toString()); -7247 checkFamilies(mutation.getFamilyCellMap().keySet()); -7248 boolean flush = false; -7249 Durability durability = getEffectiveDurability(mutation.getDurability()); -7250 boolean writeToWAL = durability != Durability.SKIP_WAL; -7251 WALEdit walEdits = null; -7252 List<Cell> allKVs = new ArrayList<Cell>(mutation.size()); -7253 -7254 Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>(); -7255 long size = 0; -7256 long txid = 0; -7257 checkReadOnly(); -7258 checkResources(); -7259 // Lock row -7260 startRegionOperation(op); -7261 this.writeRequestsCount.increment(); -7262 RowLock rowLock = null; -7263 WALKey walKey = null; -7264 MultiVersionConcurrencyControl.WriteEntry writeEntry = null; -7265 boolean doRollBackMemstore = false; -7266 TimeRange tr = mutation.getTimeRange(); -7267 try { -7268 rowLock = getRowLock(row); -7269 assert rowLock != null; -7270 try { -7271 lock(this.updatesLock.readLock()); -7272 try { -7273 // wait for all prior MVCC transactions to finish - while we hold the row lock -7274 // (so that we are guaranteed to see the latest state) -7275 mvcc.await(); -7276 if (this.coprocessorHost != null) { -7277 Result r = this.coprocessorHost.preIncrementAfterRowLock(mutation); -7278 if (r != null) { -7279 return r; -7280 } -7281 } -7282 long now = EnvironmentEdgeManager.currentTime(); -7283 // Process each family -7284 for (Map.Entry<byte [], List<Cell>> family: mutation.getFamilyCellMap().entrySet()) { -7285 Store store = stores.get(family.getKey()); -7286 List<Cell> kvs = new ArrayList<Cell>(family.getValue().size()); -7287 -7288 List<Cell> results = doGet(store, row, family, tr); -7289 -7290 // Iterate the input columns and update existing values if they were -7291 // found, otherwise add new column initialized to the increment amount -7292 -7293 // Avoid as much copying as possible. We may need to rewrite and -7294 // consolidate tags. Bytes are only copied once. -7295 // Would be nice if KeyValue had scatter/gather logic -7296 int idx = 0; -7297 // HERE WE DIVERGE FROM APPEND -7298 List<Cell> edits = family.getValue(); -7299 for (int i = 0; i < edits.size(); i++) { -7300 Cell cell = edits.get(i); -7301 long amount = Bytes.toLong(CellUtil.cloneValue(cell)); -7302 boolean noWriteBack = (amount == 0); -7303 -7304 List<Tag> newTags = carryForwardTags(cell, new ArrayList<Tag>()); -7305 -7306 Cell c = null; -7307 long ts = now; -7308 if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) { -7309 c = results.get(idx); -7310 ts = Math.max(now, c.getTimestamp()); -7311 if(c.getValueLength() == Bytes.SIZEOF_LONG) { -7312 amount += CellUtil.getValueAsLong(c); -7313 } else { -7314 // throw DoNotRetryIOException instead of IllegalArgumentException -7315 throw new org.apache.hadoop.hbase.DoNotRetryIOException( -7316 "Attempted to increment field that isn't 64 bits wide"); -7317 } -7318 // Carry tags forward from previous version -7319 newTags = carryForwardTags(c, newTags); -7320 if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) { -7321 idx++; -7322 } -7323 } -7324 -7325 // Append new incremented KeyValue to list -7326 byte[] q = CellUtil.cloneQualifier(cell); -7327 byte[] val = Bytes.toBytes(amount); -7328 -7329 // Add the TTL tag if the mutation carried one -7330 if (mutation.getTTL() != Long.MAX_VALUE) { -7331 newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutation.getTTL()))); -7332 } -7333 -7334 Cell newKV = new KeyValue(row, 0, row.length, -7335 family.getKey(), 0, family.getKey().length, -7336 q, 0, q.length, -7337 ts, -7338 KeyValue.Type.Put, -7339 val, 0, val.length, -7340 newTags); +7226 // TODO: There's a lot of boiler plate code identical to append. +7227 // We should refactor append and increment as local get-mutate-put +7228 // transactions, so all stores only go through one code path for puts. +7229 +7230 // They are subtley different in quiet a few ways. This came out only +7231 // after study. I am not sure that many of the differences are intentional. +7232 // TODO: St.Ack 20150907 +7233 +7234 @Override +7235 public Result increment(Increment mutation, long nonceGroup, long nonce) +7236 throws IOException { +7237 Operation op = Operation.INCREMENT; +7238 byte [] row = mutation.getRow(); +7239 checkRow(row, op.toString()); +7240 checkFamilies(mutation.getFamilyCellMap().keySet()); +7241 boolean flush = false; +7242 Durability durability = getEffectiveDurability(mutation.getDurability()); +7243 boolean writeToWAL = durability != Durability.SKIP_WAL; +7244 WALEdit walEdits = null; +7245 List<Cell> allKVs = new ArrayList<Cell>(mutation.size()); +7246 +7247 Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>(); +7248 long size = 0; +7249 long txid = 0; +7250 checkReadOnly(); +7251 checkResources(); +7252 // Lock row +7253 startRegionOperation(op); +7254 this.writeRequestsCount.increment(); +7255 RowLock rowLock = null; +7256 WALKey walKey = null; +7257 MultiVersionConcurrencyControl.WriteEntry writeEntry = null; +7258 boolean doRollBackMemstore = false; +7259 TimeRange tr = mutation.getTimeRange(); +7260 try { +7261 rowLock = getRowLock(row); +7262 assert rowLock != null; +7263 try { +7264 lock(this.updatesLock.readLock()); +7265 try { +7266 // wait for all prior MVCC transactions to finish - while we hold the row lock +7267 // (so that we are guaranteed to see the latest state) +7268 mvcc.await(); +7269 if (this.coprocessorHost != null) { +7270 Result r = this.coprocessorHost.preIncrementAfterRowLock(mutation); +7271 if (r != null) { +7272 return r; +7273 } +7274 } +7275 long now = EnvironmentEdgeManager.currentTime(); +7276 // Process each family +7277 for (Map.Entry<byte [], List<Cell>> family: mutation.getFamilyCellMap().entrySet()) { +7278 Store store = stores.get(family.getKey()); +7279 List<Cell> kvs = new ArrayList<Cell>(family.getValue().size()); +7280 +7281 List<Cell> results = doGet(store, row, family, tr); +7282 +7283 // Iterate the input columns and update existing values if they were +7284 // found, otherwise add new column initialized to the increment amount +7285 +7286 // Avoid as much copying as possible. We may need to rewrite and +7287 // consolidate tags. Bytes are only copied once. +7288 // Would be nice if KeyValue had scatter/gather logic +7289 int idx = 0; +7290 // HERE WE DIVERGE FROM APPEND +7291 List<Cell> edits = family.getValue(); +7292 for (int i = 0; i < edits.size(); i++) { +7293 Cell cell = edits.get(i); +7294 long amount = Bytes.toLong(CellUtil.cloneValue(cell)); +7295 boolean noWriteBack = (amount == 0); +7296 +7297 List<Tag> newTags = carryForwardTags(cell, new ArrayList<Tag>()); +7298 +7299 Cell c = null; +7300 long ts = now; +7301 if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) { +7302 c = results.get(idx); +7303 ts = Math.max(now, c.getTimestamp()); +7304 if(c.getValueLength() == Bytes.SIZEOF_LONG) { +7305 amount += CellUtil.getValueAsLong(c); +7306 } else { +7307 // throw DoNotRetryIOException instead of IllegalArgumentException +7308 throw new org.apache.hadoop.hbase.DoNotRetryIOException( +7309 "Attempted to increment field that isn't 64 bits wide"); +7310 } +7311 // Carry tags forward from previous version +7312 newTags = carryForwardTags(c, newTags); +7313 if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) { +7314 idx++; +7315 } +7316 } +7317 +7318 // Append new incremented KeyValue to list +7319 byte[] q = CellUtil.cloneQualifier(cell); +7320 byte[] val = Bytes.toBytes(amount); +7321 +7322 // Add the TTL tag if the mutation carried one +7323 if (mutation.getTTL() != Long.MAX_VALUE) { +7324 newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutation.getTTL()))); +7325 } +7326 +7327 Cell newKV = new KeyValue(row, 0, row.length, +7328 family.getKey(), 0, family.getKey().length, +7329 q, 0, q.length, +7330 ts, +7331 KeyValue.Type.Put, +7332 val, 0, val.length, +7333 newTags); +7334 +7335 // Give coprocessors a chance to update the new cell +7336 if (coprocessorHost != null) { +7337 newKV = coprocessorHost.postMutationBeforeWAL( +7338 RegionObserver.MutationType.INCREMENT, mutation, c, newKV); +7339 } +7340 allKVs.add(newKV); 7341 -7342 // Give coprocessors a chance to update the new cell -7343 if (coprocessorHost != null) { -7344 newKV = coprocessorHost.postMutationBeforeWAL( -7345 RegionObserver.MutationType.INCREMENT, mutation, c, newKV); -7346 } -7347 allKVs.add(newKV); -7348 -7349 if (!noWriteBack) { -7350 kvs.add(newKV); -7351 -7352 // Prepare WAL updates -7353 if (writeToWAL) { -7354 if (walEdits == null) { -7355 walEdits = new WALEdit(); -7356 } -7357 walEdits.add(newKV); -7358 } -7359 } -7360 } -7361 -7362 //store the kvs to the temporary memstore before writing WAL -7363 if (!kvs.isEmpty()) { -7364 tempMemstore.put(store, kvs); -7365 } -7366 } -7367 -7368 // Actually write to WAL now -7369 if (walEdits != null && !walEdits.isEmpty()) { -7370 if (writeToWAL) { -7371 // Using default cluster id, as this can only happen in the originating -7372 // cluster. A slave cluster receives the final value (not the delta) -7373 // as a Put. -7374 // we use HLogKey here instead of WALKey directly to support legacy coprocessors. -7375 walKey = new HLogKey(this.getRegionInfo().get