Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 08EB2200D3C for ; Tue, 31 Oct 2017 01:06:12 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 0793D160BE4; Tue, 31 Oct 2017 00:06:12 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2C8E2160BF8 for ; Tue, 31 Oct 2017 01:06:10 +0100 (CET) Received: (qmail 4963 invoked by uid 500); 31 Oct 2017 00:06:09 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 4923 invoked by uid 99); 31 Oct 2017 00:06:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 31 Oct 2017 00:06:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EBFCDDFA3C; Tue, 31 Oct 2017 00:06:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jkff@apache.org To: commits@beam.apache.org Date: Tue, 31 Oct 2017 00:06:08 -0000 Message-Id: <02304265eae346cbb1ea3461c06aba68@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: [BEAM-1542] SpannerIO: mutation encoding and size estimation improvements archived-at: Tue, 31 Oct 2017 00:06:12 -0000 Repository: beam Updated Branches: refs/heads/master aa26f4bf7 -> 09f68159d [BEAM-1542] SpannerIO: mutation encoding and size estimation improvements Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3ba96003 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3ba96003 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3ba96003 Branch: refs/heads/master Commit: 3ba96003d31ce98a54c0c51c1c0a9cf7c06e2fa2 Parents: aa26f4b Author: Mairbek Khadikov Authored: Wed Oct 18 15:26:11 2017 -0700 Committer: Eugene Kirpichov Committed: Mon Oct 30 17:02:36 2017 -0700 ---------------------------------------------------------------------- .../io/gcp/spanner/MutationGroupEncoder.java | 660 +++++++++++++++++++ .../io/gcp/spanner/MutationSizeEstimator.java | 48 ++ .../gcp/spanner/MutationGroupEncoderTest.java | 636 ++++++++++++++++++ 3 files changed, 1344 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3ba96003/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java new file mode 100644 index 0000000..ba0b4eb --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java @@ -0,0 +1,660 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.cloud.ByteArray; +import com.google.cloud.Date; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Key; +import com.google.cloud.spanner.KeySet; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Type; +import com.google.cloud.spanner.Value; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.util.VarInt; +import org.joda.time.DateTime; +import org.joda.time.Days; +import org.joda.time.MutableDateTime; + +/** + * Given the Spanner Schema, efficiently encodes the mutation group. + */ +class MutationGroupEncoder { + private static final DateTime MIN_DATE = new DateTime(1, 1, 1, 0, 0); + + private final SpannerSchema schema; + private final List tables; + private final Map tablesIndexes = new HashMap<>(); + + public MutationGroupEncoder(SpannerSchema schema) { + this.schema = schema; + tables = schema.getTables(); + + for (int i = 0; i < tables.size(); i++) { + tablesIndexes.put(tables.get(i), i); + } + } + + public byte[] encode(MutationGroup g) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + try { + VarInt.encode(g.attached().size(), bos); + for (Mutation m : g) { + encodeMutation(bos, m); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return bos.toByteArray(); + } + + private static void setBit(byte[] bytes, int i) { + int word = i / 8; + int bit = 7 - i % 8; + bytes[word] |= 1 << bit; + } + + private static boolean getBit(byte[] bytes, int i) { + int word = i / 8; + int bit = 7 - i % 8; + return (bytes[word] & 1 << (bit)) != 0; + } + + private void encodeMutation(ByteArrayOutputStream bos, Mutation m) throws IOException { + Mutation.Op op = m.getOperation(); + bos.write(op.ordinal()); + if (op == Mutation.Op.DELETE) { + encodeDelete(bos, m); + } else { + encodeModification(bos, m); + } + } + + private void encodeDelete(ByteArrayOutputStream bos, Mutation m) throws IOException { + String table = m.getTable().toLowerCase(); + int tableIndex = getTableIndex(table); + VarInt.encode(tableIndex, bos); + ObjectOutput out = new ObjectOutputStream(bos); + out.writeObject(m.getKeySet()); + } + + private Integer getTableIndex(String table) { + Integer result = tablesIndexes.get(table); + checkArgument(result != null, "Unknown table '%s'", table); + return result; + } + + private Mutation decodeDelete(ByteArrayInputStream bis) + throws IOException { + int tableIndex = VarInt.decodeInt(bis); + String tableName = tables.get(tableIndex); + + ObjectInputStream in = new ObjectInputStream(bis); + KeySet keySet; + try { + keySet = (KeySet) in.readObject(); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + return Mutation.delete(tableName, keySet); + } + + // Encodes a mutation that is not a delete one, using the following format + // [bitset of modified columns][value of column1][value of column2][value of column3]... + private void encodeModification(ByteArrayOutputStream bos, Mutation m) throws IOException { + String tableName = m.getTable().toLowerCase(); + int tableIndex = getTableIndex(tableName); + VarInt.encode(tableIndex, bos); + List columns = schema.getColumns(tableName); + checkArgument(columns != null, "Schema for table " + tableName + " not " + "found"); + Map map = mutationAsMap(m); + // java.util.BitSet#toByteArray returns array of unpredictable length. Using byte arrays + // instead. + int bitsetSize = (columns.size() + 7) / 8; + byte[] exists = new byte[bitsetSize]; + byte[] nulls = new byte[bitsetSize]; + for (int i = 0; i < columns.size(); i++) { + String columnName = columns.get(i).getName(); + boolean columnExists = map.containsKey(columnName); + boolean columnNull = columnExists && map.get(columnName).isNull(); + if (columnExists) { + setBit(exists, i); + } + if (columnNull) { + setBit(nulls, i); + map.remove(columnName); + } + } + bos.write(exists); + bos.write(nulls); + for (int i = 0; i < columns.size(); i++) { + if (!getBit(exists, i) || getBit(nulls, i)) { + continue; + } + SpannerSchema.Column column = columns.get(i); + Value value = map.remove(column.getName()); + encodeValue(bos, value); + } + checkArgument(map.isEmpty(), "Columns %s were not defined in table %s", map.keySet(), + m.getTable()); + } + + private void encodeValue(ByteArrayOutputStream bos, Value value) throws IOException { + switch (value.getType().getCode()) { + case ARRAY: + encodeArray(bos, value); + break; + default: + encodePrimitive(bos, value); + } + } + + private void encodeArray(ByteArrayOutputStream bos, Value value) throws IOException { + // TODO: avoid using Java serialization here. + ObjectOutputStream out = new ObjectOutputStream(bos); + switch (value.getType().getArrayElementType().getCode()) { + case BOOL: { + out.writeObject(new ArrayList<>(value.getBoolArray())); + break; + } + case INT64: { + out.writeObject(new ArrayList<>(value.getInt64Array())); + break; + } + case FLOAT64: { + out.writeObject(new ArrayList<>(value.getFloat64Array())); + break; + } + case STRING: { + out.writeObject(new ArrayList<>(value.getStringArray())); + break; + } + case BYTES: { + out.writeObject(new ArrayList<>(value.getBytesArray())); + break; + } + case TIMESTAMP: { + out.writeObject(new ArrayList<>(value.getTimestampArray())); + break; + } + case DATE: { + out.writeObject(new ArrayList<>(value.getDateArray())); + break; + } + default: + throw new IllegalArgumentException("Unknown type " + value.getType()); + } + } + + private void encodePrimitive(ByteArrayOutputStream bos, Value value) throws IOException { + switch (value.getType().getCode()) { + case BOOL: + bos.write(value.getBool() ? 1 : 0); + break; + case INT64: + VarInt.encode(value.getInt64(), bos); + break; + case FLOAT64: + new DataOutputStream(bos).writeDouble(value.getFloat64()); + break; + case STRING: { + String str = value.getString(); + VarInt.encode(str.length(), bos); + bos.write(str.getBytes(StandardCharsets.UTF_8)); + break; + } + case BYTES: { + ByteArray bytes = value.getBytes(); + VarInt.encode(bytes.length(), bos); + bos.write(bytes.toByteArray()); + break; + } + case TIMESTAMP: { + Timestamp timestamp = value.getTimestamp(); + VarInt.encode(timestamp.getSeconds(), bos); + VarInt.encode(timestamp.getNanos(), bos); + break; + } + case DATE: { + Date date = value.getDate(); + VarInt.encode(encodeDate(date), bos); + break; + } + default: + throw new IllegalArgumentException("Unknown type " + value.getType()); + } + } + + public MutationGroup decode(byte[] bytes) { + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + + try { + int numMutations = VarInt.decodeInt(bis); + Mutation primary = decodeMutation(bis); + List attached = new ArrayList<>(numMutations); + for (int i = 0; i < numMutations; i++) { + attached.add(decodeMutation(bis)); + } + return MutationGroup.create(primary, attached); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private Mutation decodeMutation(ByteArrayInputStream bis) throws IOException { + Mutation.Op op = Mutation.Op.values()[bis.read()]; + if (op == Mutation.Op.DELETE) { + return decodeDelete(bis); + } + return decodeModification(bis, op); + } + + private Mutation decodeModification(ByteArrayInputStream bis, Mutation.Op op) throws IOException { + int tableIndex = VarInt.decodeInt(bis); + String tableName = tables.get(tableIndex); + + Mutation.WriteBuilder m; + switch (op) { + case INSERT: + m = Mutation.newInsertBuilder(tableName); + break; + case INSERT_OR_UPDATE: + m = Mutation.newInsertOrUpdateBuilder(tableName); + break; + case REPLACE: + m = Mutation.newReplaceBuilder(tableName); + break; + case UPDATE: + m = Mutation.newUpdateBuilder(tableName); + break; + default: + throw new IllegalArgumentException("Unknown operation " + op); + } + List columns = schema.getColumns(tableName); + int bitsetSize = (columns.size() + 7) / 8; + byte[] exists = readBytes(bis, bitsetSize); + byte[] nulls = readBytes(bis, bitsetSize); + + for (int i = 0; i < columns.size(); i++) { + if (!getBit(exists, i)) { + continue; + } + SpannerSchema.Column column = columns.get(i); + boolean isNull = getBit(nulls, i); + Type type = column.getType(); + String fieldName = column.getName(); + switch (type.getCode()) { + case ARRAY: + try { + decodeArray(bis, fieldName, type, isNull, m); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + break; + default: + decodePrimitive(bis, fieldName, type, isNull, m); + } + + } + return m.build(); + } + + private void decodeArray(ByteArrayInputStream bis, String fieldName, Type type, boolean isNull, + Mutation.WriteBuilder m) throws IOException, ClassNotFoundException { + // TODO: avoid using Java serialization here. + switch (type.getArrayElementType().getCode()) { + case BOOL: { + if (isNull) { + m.set(fieldName).toBoolArray((Iterable) null); + } else { + ObjectInputStream out = new ObjectInputStream(bis); + m.set(fieldName).toBoolArray((List) out.readObject()); + } + break; + } + case INT64: { + if (isNull) { + m.set(fieldName).toInt64Array((Iterable) null); + } else { + ObjectInputStream out = new ObjectInputStream(bis); + m.set(fieldName).toInt64Array((List) out.readObject()); + } + break; + } + case FLOAT64: { + if (isNull) { + m.set(fieldName).toFloat64Array((Iterable) null); + } else { + ObjectInputStream out = new ObjectInputStream(bis); + m.set(fieldName).toFloat64Array((List) out.readObject()); + } + break; + } + case STRING: { + if (isNull) { + m.set(fieldName).toStringArray(null); + } else { + ObjectInputStream out = new ObjectInputStream(bis); + m.set(fieldName).toStringArray((List) out.readObject()); + } + break; + } + case BYTES: { + if (isNull) { + m.set(fieldName).toBytesArray(null); + } else { + ObjectInputStream out = new ObjectInputStream(bis); + m.set(fieldName).toBytesArray((List) out.readObject()); + } + break; + } + case TIMESTAMP: { + if (isNull) { + m.set(fieldName).toTimestampArray(null); + } else { + ObjectInputStream out = new ObjectInputStream(bis); + m.set(fieldName).toTimestampArray((List) out.readObject()); + } + break; + } + case DATE: { + if (isNull) { + m.set(fieldName).toDateArray(null); + } else { + ObjectInputStream out = new ObjectInputStream(bis); + m.set(fieldName).toDateArray((List) out.readObject()); + } + break; + } + default: + throw new IllegalArgumentException("Unknown type " + type); + } + } + + private void decodePrimitive(ByteArrayInputStream bis, String fieldName, Type type, + boolean isNull, Mutation.WriteBuilder m) throws IOException { + switch (type.getCode()) { + case BOOL: + if (isNull) { + m.set(fieldName).to((Boolean) null); + } else { + m.set(fieldName).to(bis.read() != 0); + } + break; + case INT64: + if (isNull) { + m.set(fieldName).to((Long) null); + } else { + m.set(fieldName).to(VarInt.decodeLong(bis)); + } + break; + case FLOAT64: + if (isNull) { + m.set(fieldName).to((Double) null); + } else { + m.set(fieldName).to(new DataInputStream(bis).readDouble()); + } + break; + case STRING: { + if (isNull) { + m.set(fieldName).to((String) null); + } else { + int len = VarInt.decodeInt(bis); + byte[] bytes = readBytes(bis, len); + m.set(fieldName).to(new String(bytes, StandardCharsets.UTF_8)); + } + break; + } + case BYTES: { + if (isNull) { + m.set(fieldName).to((ByteArray) null); + } else { + int len = VarInt.decodeInt(bis); + byte[] bytes = readBytes(bis, len); + m.set(fieldName).to(ByteArray.copyFrom(bytes)); + } + break; + } + case TIMESTAMP: { + if (isNull) { + m.set(fieldName).to((Timestamp) null); + } else { + int seconds = VarInt.decodeInt(bis); + int nanoseconds = VarInt.decodeInt(bis); + m.set(fieldName).to(Timestamp.ofTimeSecondsAndNanos(seconds, nanoseconds)); + } + break; + } + case DATE: { + if (isNull) { + m.set(fieldName).to((Date) null); + } else { + int days = VarInt.decodeInt(bis); + m.set(fieldName).to(decodeDate(days)); + } + break; + } + default: + throw new IllegalArgumentException("Unknown type " + type); + } + } + + private byte[] readBytes(ByteArrayInputStream bis, int len) throws IOException { + byte[] tmp = new byte[len]; + new DataInputStream(bis).readFully(tmp); + return tmp; + } + + /** + * Builds a lexicographically sortable binary key based on a primary key descriptor. + * @param m a spanner mutation. + * @return a binary string that preserves the ordering of the primary key. + */ + public byte[] encodeKey(Mutation m) { + Map mutationMap = mutationAsMap(m); + OrderedCode orderedCode = new OrderedCode(); + for (SpannerSchema.KeyPart part : schema.getKeyParts(m.getTable())) { + Value val = mutationMap.get(part.getField()); + if (val.isNull()) { + if (part.isDesc()) { + orderedCode.writeInfinityDecreasing(); + } else { + orderedCode.writeInfinity(); + } + } else { + Type.Code code = val.getType().getCode(); + switch (code) { + case BOOL: + long v = val.getBool() ? 0 : 1; + if (part.isDesc()) { + orderedCode.writeSignedNumDecreasing(v); + } else { + orderedCode.writeSignedNumIncreasing(v); + } + break; + case INT64: + if (part.isDesc()) { + orderedCode.writeSignedNumDecreasing(val.getInt64()); + } else { + orderedCode.writeSignedNumIncreasing(val.getInt64()); + } + break; + case FLOAT64: + if (part.isDesc()) { + orderedCode.writeSignedNumDecreasing(Double.doubleToLongBits(val.getFloat64())); + } else { + orderedCode.writeSignedNumIncreasing(Double.doubleToLongBits(val.getFloat64())); + } + break; + case STRING: + if (part.isDesc()) { + orderedCode.writeBytesDecreasing(val.getString().getBytes()); + } else { + orderedCode.writeBytes(val.getString().getBytes()); + } + break; + case BYTES: + if (part.isDesc()) { + orderedCode.writeBytesDecreasing(val.getBytes().toByteArray()); + } else { + orderedCode.writeBytes(val.getBytes().toByteArray()); + } + break; + case TIMESTAMP: { + Timestamp value = val.getTimestamp(); + if (part.isDesc()) { + orderedCode.writeNumDecreasing(value.getSeconds()); + orderedCode.writeNumDecreasing(value.getNanos()); + } else { + orderedCode.writeNumIncreasing(value.getSeconds()); + orderedCode.writeNumIncreasing(value.getNanos()); + } + break; + } + case DATE: + Date value = val.getDate(); + if (part.isDesc()) { + orderedCode.writeSignedNumDecreasing(encodeDate(value)); + } else { + orderedCode.writeSignedNumIncreasing(encodeDate(value)); + } + break; + default: + throw new IllegalArgumentException("Unknown type " + val.getType()); + } + } + } + return orderedCode.getEncodedBytes(); + } + + public byte[] encodeKey(String table, Key key) { + OrderedCode orderedCode = new OrderedCode(); + List parts = schema.getKeyParts(table); + Iterator it = key.getParts().iterator(); + for (SpannerSchema.KeyPart part : parts) { + Object value = it.next(); + if (value == null) { + if (part.isDesc()) { + orderedCode.writeInfinityDecreasing(); + } else { + orderedCode.writeInfinity(); + } + } else { + if (value instanceof Boolean) { + long v = (Boolean) value ? 0 : 1; + if (part.isDesc()) { + orderedCode.writeSignedNumDecreasing(v); + } else { + orderedCode.writeSignedNumIncreasing(v); + } + } else if (value instanceof Long) { + long v = (long) value; + if (part.isDesc()) { + orderedCode.writeSignedNumDecreasing(v); + } else { + orderedCode.writeSignedNumIncreasing(v); + } + } else if (value instanceof Double) { + long v = Double.doubleToLongBits((double) value); + if (part.isDesc()) { + orderedCode.writeSignedNumDecreasing(v); + } else { + orderedCode.writeSignedNumIncreasing(v); + } + } else if (value instanceof String) { + String v = (String) value; + if (part.isDesc()) { + orderedCode.writeBytesDecreasing(v.getBytes()); + } else { + orderedCode.writeBytes(v.getBytes()); + } + } else if (value instanceof ByteArray) { + ByteArray v = (ByteArray) value; + if (part.isDesc()) { + orderedCode.writeBytesDecreasing(v.toByteArray()); + } else { + orderedCode.writeBytes(v.toByteArray()); + } + } else if (value instanceof Timestamp) { + Timestamp v = (Timestamp) value; + if (part.isDesc()) { + orderedCode.writeNumDecreasing(v.getSeconds()); + orderedCode.writeNumDecreasing(v.getNanos()); + } else { + orderedCode.writeNumIncreasing(v.getSeconds()); + orderedCode.writeNumIncreasing(v.getNanos()); + } + } else if (value instanceof Date) { + Date v = (Date) value; + if (part.isDesc()) { + orderedCode.writeSignedNumDecreasing(encodeDate(v)); + } else { + orderedCode.writeSignedNumIncreasing(encodeDate(v)); + } + } else { + throw new IllegalArgumentException("Unknown key part " + value); + } + } + } + return orderedCode.getEncodedBytes(); + } + + private static Map mutationAsMap(Mutation m) { + Map result = new HashMap<>(); + Iterator coli = m.getColumns().iterator(); + Iterator vali = m.getValues().iterator(); + while (coli.hasNext()) { + String column = coli.next(); + Value val = vali.next(); + result.put(column.toLowerCase(), val); + } + return result; + } + + private static int encodeDate(Date date) { + + MutableDateTime jodaDate = new MutableDateTime(); + jodaDate.setDate(date.getYear(), date.getMonth(), date.getDayOfMonth()); + + return Days.daysBetween(MIN_DATE, jodaDate).getDays(); + } + + private static Date decodeDate(int daysSinceEpoch) { + + DateTime jodaDate = MIN_DATE.plusDays(daysSinceEpoch); + + return Date + .fromYearMonthDay(jodaDate.getYear(), jodaDate.getMonthOfYear(), jodaDate.getDayOfMonth()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/3ba96003/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java index 2418816..c483af9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java @@ -18,6 +18,11 @@ package org.apache.beam.sdk.io.gcp.spanner; import com.google.cloud.ByteArray; +import com.google.cloud.Date; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Key; +import com.google.cloud.spanner.KeyRange; +import com.google.cloud.spanner.KeySet; import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.Value; @@ -29,6 +34,9 @@ class MutationSizeEstimator { /** Estimates a size of mutation in bytes. */ static long sizeOf(Mutation m) { + if (m.getOperation() == Mutation.Op.DELETE) { + return sizeOf(m.getKeySet()); + } long result = 0; for (Value v : m.getValues()) { switch (v.getType().getCode()) { @@ -44,6 +52,46 @@ class MutationSizeEstimator { return result; } + private static long sizeOf(KeySet keySet) { + long result = 0; + for (Key k : keySet.getKeys()) { + result += sizeOf(k); + } + for (KeyRange kr : keySet.getRanges()) { + result += sizeOf(kr); + } + return result; + } + + private static long sizeOf(KeyRange kr) { + return sizeOf(kr.getStart()) + sizeOf(kr.getEnd()); + } + + private static long sizeOf(Key k) { + long result = 0; + for (Object part : k.getParts()) { + if (part == null) { + continue; + } + if (part instanceof Boolean) { + result += 1; + } else if (part instanceof Long) { + result += 8; + } else if (part instanceof Double) { + result += 8; + } else if (part instanceof String) { + result += ((String) part).length(); + } else if (part instanceof ByteArray) { + result += ((ByteArray) part).length(); + } else if (part instanceof Timestamp) { + result += 12; + } else if (part instanceof Date) { + result += 12; + } + } + return result; + } + /** Estimates a size of the mutation group in bytes. */ public static long sizeOf(MutationGroup group) { long result = 0; http://git-wip-us.apache.org/repos/asf/beam/blob/3ba96003/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoderTest.java new file mode 100644 index 0000000..d40e356 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoderTest.java @@ -0,0 +1,636 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import com.google.cloud.ByteArray; +import com.google.cloud.Date; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Key; +import com.google.cloud.spanner.KeyRange; +import com.google.cloud.spanner.KeySet; +import com.google.cloud.spanner.Mutation; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.primitives.UnsignedBytes; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * Tests for {@link MutationGroupEncoder}. + */ +public class MutationGroupEncoderTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private SpannerSchema allTypesSchema; + + @Before + public void setUp() throws Exception { + SpannerSchema.Builder builder = SpannerSchema.builder(); + + builder.addColumn("test", "intkey", "INT64"); + builder.addKeyPart("test", "intkey", false); + + builder.addColumn("test", "bool", "BOOL"); + builder.addColumn("test", "int64", "INT64"); + builder.addColumn("test", "float64", "FLOAT64"); + builder.addColumn("test", "string", "STRING"); + builder.addColumn("test", "bytes", "BYTES"); + builder.addColumn("test", "timestamp", "TIMESTAMP"); + builder.addColumn("test", "date", "DATE"); + + builder.addColumn("test", "nullbool", "BOOL"); + builder.addColumn("test", "nullint64", "INT64"); + builder.addColumn("test", "nullfloat64", "FLOAT64"); + builder.addColumn("test", "nullstring", "STRING"); + builder.addColumn("test", "nullbytes", "BYTES"); + builder.addColumn("test", "nulltimestamp", "TIMESTAMP"); + builder.addColumn("test", "nulldate", "DATE"); + + builder.addColumn("test", "arrbool", "ARRAY"); + builder.addColumn("test", "arrint64", "ARRAY"); + builder.addColumn("test", "arrfloat64", "ARRAY"); + builder.addColumn("test", "arrstring", "ARRAY"); + builder.addColumn("test", "arrbytes", "ARRAY"); + builder.addColumn("test", "arrtimestamp", "ARRAY"); + builder.addColumn("test", "arrdate", "ARRAY"); + + builder.addColumn("test", "nullarrbool", "ARRAY"); + builder.addColumn("test", "nullarrint64", "ARRAY"); + builder.addColumn("test", "nullarrfloat64", "ARRAY"); + builder.addColumn("test", "nullarrstring", "ARRAY"); + builder.addColumn("test", "nullarrbytes", "ARRAY"); + builder.addColumn("test", "nullarrtimestamp", "ARRAY"); + builder.addColumn("test", "nullarrdate", "ARRAY"); + + allTypesSchema = builder.build(); + } + + @Test + public void testAllTypesSingleMutation() throws Exception { + encodeAndVerify(g(appendAllTypes(Mutation.newInsertOrUpdateBuilder("test")).build())); + encodeAndVerify(g(appendAllTypes(Mutation.newInsertBuilder("test")).build())); + encodeAndVerify(g(appendAllTypes(Mutation.newUpdateBuilder("test")).build())); + encodeAndVerify(g(appendAllTypes(Mutation.newReplaceBuilder("test")).build())); + } + + @Test + public void testAllTypesMultipleMutations() throws Exception { + encodeAndVerify(g( + appendAllTypes(Mutation.newInsertOrUpdateBuilder("test")).build(), + appendAllTypes(Mutation.newInsertBuilder("test")).build(), + appendAllTypes(Mutation.newUpdateBuilder("test")).build(), + appendAllTypes(Mutation.newReplaceBuilder("test")).build(), + Mutation + .delete("test", KeySet.range(KeyRange.closedClosed(Key.of(1L), Key.of(2L)))))); + } + + @Test + public void testUnknownColumn() throws Exception { + SpannerSchema.Builder builder = SpannerSchema.builder(); + builder.addKeyPart("test", "bool_field", false); + builder.addColumn("test", "bool_field", "BOOL"); + SpannerSchema schema = builder.build(); + + Mutation mutation = Mutation.newInsertBuilder("test").set("unknown") + .to(true).build(); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Columns [unknown] were not defined in table test"); + encodeAndVerify(g(mutation), schema); + } + + @Test + public void testUnknownTable() throws Exception { + SpannerSchema.Builder builder = SpannerSchema.builder(); + builder.addKeyPart("test", "bool_field", false); + builder.addColumn("test", "bool_field", "BOOL"); + SpannerSchema schema = builder.build(); + + Mutation mutation = Mutation.newInsertBuilder("unknown").set("bool_field") + .to(true).build(); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Unknown table 'unknown'"); + encodeAndVerify(g(mutation), schema); + } + + @Test + public void testMutationCaseInsensitive() throws Exception { + SpannerSchema.Builder builder = SpannerSchema.builder(); + builder.addKeyPart("test", "bool_field", false); + builder.addColumn("test", "bool_field", "BOOL"); + SpannerSchema schema = builder.build(); + + Mutation mutation = Mutation.newInsertBuilder("TEsT").set("BoOL_FiELd").to(true).build(); + encodeAndVerify(g(mutation), schema); + } + + @Test + public void testDeleteCaseInsensitive() throws Exception { + SpannerSchema.Builder builder = SpannerSchema.builder(); + builder.addKeyPart("test", "bool_field", false); + builder.addColumn("test", "int_field", "INT64"); + SpannerSchema schema = builder.build(); + + Mutation mutation = Mutation.delete("TeSt", Key.of(1L)); + encodeAndVerify(g(mutation), schema); + } + + @Test + public void testDeletes() throws Exception { + encodeAndVerify(g(Mutation.delete("test", Key.of(1L)))); + encodeAndVerify(g(Mutation.delete("test", Key.of((Long) null)))); + + KeySet allTypes = KeySet.newBuilder() + .addKey(Key.of(1L)) + .addKey(Key.of((Long) null)) + .addKey(Key.of(1.2)) + .addKey(Key.of((Double) null)) + .addKey(Key.of("one")) + .addKey(Key.of((String) null)) + .addKey(Key.of(ByteArray.fromBase64("abcd"))) + .addKey(Key.of((ByteArray) null)) + .addKey(Key.of(Timestamp.now())) + .addKey(Key.of((Timestamp) null)) + .addKey(Key.of(Date.fromYearMonthDay(2012, 1, 1))) + .addKey(Key.of((Date) null)) + .build(); + + encodeAndVerify(g(Mutation.delete("test", allTypes))); + + encodeAndVerify( + g(Mutation + .delete("test", KeySet.range(KeyRange.closedClosed(Key.of(1L), Key.of(2L)))))); + } + + private Mutation.WriteBuilder appendAllTypes(Mutation.WriteBuilder builder) { + Timestamp ts = Timestamp.now(); + Date date = Date.fromYearMonthDay(2017, 1, 1); + return builder + .set("bool").to(true) + .set("int64").to(1L) + .set("float64").to(1.0) + .set("string").to("my string") + .set("bytes").to(ByteArray.fromBase64("abcdedf")) + .set("timestamp").to(ts) + .set("date").to(date) + + .set("arrbool").toBoolArray(Arrays.asList(true, false, null, true, null, false)) + .set("arrint64").toInt64Array(Arrays.asList(10L, -12L, null, null, 100000L)) + .set("arrfloat64").toFloat64Array(Arrays.asList(10., -12.23, null, null, 100000.33231)) + .set("arrstring").toStringArray(Arrays.asList("one", "two", null, null, "three")) + .set("arrbytes").toBytesArray(Arrays.asList(ByteArray.fromBase64("abcs"), null)) + .set("arrtimestamp").toTimestampArray(Arrays.asList(Timestamp.MIN_VALUE, null, ts)) + .set("arrdate").toDateArray(Arrays.asList(null, date)) + + .set("nullbool").to((Boolean) null) + .set("nullint64").to((Long) null) + .set("nullfloat64").to((Double) null) + .set("nullstring").to((String) null) + .set("nullbytes").to((ByteArray) null) + .set("nulltimestamp").to((Timestamp) null) + .set("nulldate").to((Date) null) + + .set("nullarrbool").toBoolArray((Iterable) null) + .set("nullarrint64").toInt64Array((Iterable) null) + .set("nullarrfloat64").toFloat64Array((Iterable) null) + .set("nullarrstring").toStringArray(null) + .set("nullarrbytes").toBytesArray(null) + .set("nullarrtimestamp").toTimestampArray(null) + .set("nullarrdate").toDateArray(null); + } + + @Test + public void int64Keys() throws Exception { + SpannerSchema.Builder builder = SpannerSchema.builder(); + + builder.addColumn("test", "key", "INT64"); + builder.addKeyPart("test", "key", false); + + builder.addColumn("test", "keydesc", "INT64"); + builder.addKeyPart("test", "keydesc", true); + + SpannerSchema schema = builder.build(); + + List mutations = Arrays.asList( + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(1L) + .set("keydesc").to(0L) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(2L) + .set("keydesc").to((Long) null) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(2L) + .set("keydesc").to(10L) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(2L) + .set("keydesc").to(9L) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to((Long) null) + .set("keydesc").to(0L) + .build()); + + List keys = Arrays.asList( + Key.of(1L, 0L), + Key.of(2L, null), + Key.of(2L, 10L), + Key.of(2L, 9L), + Key.of(2L, 0L) + ); + + verifyEncodedOrdering(schema, mutations); + verifyEncodedOrdering(schema, "test", keys); + } + + @Test + public void float64Keys() throws Exception { + SpannerSchema.Builder builder = SpannerSchema.builder(); + + builder.addColumn("test", "key", "FLOAT64"); + builder.addKeyPart("test", "key", false); + + builder.addColumn("test", "keydesc", "FLOAT64"); + builder.addKeyPart("test", "keydesc", true); + + SpannerSchema schema = builder.build(); + + List mutations = Arrays.asList( + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(1.0) + .set("keydesc").to(0.) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(2.) + .set("keydesc").to((Long) null) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(2.) + .set("keydesc").to(10.) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(2.) + .set("keydesc").to(9.) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(2.) + .set("keydesc").to(0.) + .build()); + List keys = Arrays.asList( + Key.of(1., 0.), + Key.of(2., null), + Key.of(2., 10.), + Key.of(2., 9.), + Key.of(2., 0.) + ); + + verifyEncodedOrdering(schema, mutations); + verifyEncodedOrdering(schema, "test", keys); + } + + @Test + public void stringKeys() throws Exception { + SpannerSchema.Builder builder = SpannerSchema.builder(); + + builder.addColumn("test", "key", "STRING"); + builder.addKeyPart("test", "key", false); + + builder.addColumn("test", "keydesc", "STRING"); + builder.addKeyPart("test", "keydesc", true); + + SpannerSchema schema = builder.build(); + + List mutations = Arrays.asList( + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to("a") + .set("keydesc").to("bc") + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to("b") + .set("keydesc").to((String) null) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to("b") + .set("keydesc").to("z") + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to("b") + .set("keydesc").to("y") + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to("b") + .set("keydesc").to("a") + .build()); + + List keys = Arrays.asList( + Key.of("a", "bc"), + Key.of("b", null), + Key.of("b", "z"), + Key.of("b", "y"), + Key.of("b", "a") + ); + + verifyEncodedOrdering(schema, mutations); + verifyEncodedOrdering(schema, "test", keys); + } + + @Test + public void bytesKeys() throws Exception { + SpannerSchema.Builder builder = SpannerSchema.builder(); + + builder.addColumn("test", "key", "BYTES"); + builder.addKeyPart("test", "key", false); + + builder.addColumn("test", "keydesc", "BYTES"); + builder.addKeyPart("test", "keydesc", true); + + SpannerSchema schema = builder.build(); + + List mutations = Arrays.asList( + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(ByteArray.fromBase64("abc")) + .set("keydesc").to(ByteArray.fromBase64("zzz")) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(ByteArray.fromBase64("xxx")) + .set("keydesc").to((ByteArray) null) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(ByteArray.fromBase64("xxx")) + .set("keydesc").to(ByteArray.fromBase64("zzzz")) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(ByteArray.fromBase64("xxx")) + .set("keydesc").to(ByteArray.fromBase64("ssss")) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(ByteArray.fromBase64("xxx")) + .set("keydesc").to(ByteArray.fromBase64("aaa")) + .build()); + + List keys = Arrays.asList( + Key.of(ByteArray.fromBase64("abc"), ByteArray.fromBase64("zzz")), + Key.of(ByteArray.fromBase64("xxx"), null), + Key.of(ByteArray.fromBase64("xxx"), ByteArray.fromBase64("zzz")), + Key.of(ByteArray.fromBase64("xxx"), ByteArray.fromBase64("sss")), + Key.of(ByteArray.fromBase64("xxx"), ByteArray.fromBase64("aaa")) + ); + + verifyEncodedOrdering(schema, mutations); + verifyEncodedOrdering(schema, "test", keys); + } + + @Test + public void dateKeys() throws Exception { + SpannerSchema.Builder builder = SpannerSchema.builder(); + + builder.addColumn("test", "key", "DATE"); + builder.addKeyPart("test", "key", false); + + builder.addColumn("test", "keydesc", "DATE"); + builder.addKeyPart("test", "keydesc", true); + + SpannerSchema schema = builder.build(); + + List mutations = Arrays.asList( + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(Date.fromYearMonthDay(2012, 10, 10)) + .set("keydesc").to(Date.fromYearMonthDay(2000, 10, 10)) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(Date.fromYearMonthDay(2020, 10, 10)) + .set("keydesc").to((Date) null) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(Date.fromYearMonthDay(2020, 10, 10)) + .set("keydesc").to(Date.fromYearMonthDay(2050, 10, 10)) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(Date.fromYearMonthDay(2020, 10, 10)) + .set("keydesc").to(Date.fromYearMonthDay(2000, 10, 10)) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(Date.fromYearMonthDay(2020, 10, 10)) + .set("keydesc").to(Date.fromYearMonthDay(1900, 10, 10)) + .build()); + + List keys = Arrays.asList( + Key.of(Date.fromYearMonthDay(2012, 10, 10), ByteArray.fromBase64("zzz")), + Key.of(Date.fromYearMonthDay(2015, 10, 10), null), + Key.of(Date.fromYearMonthDay(2015, 10, 10), Date.fromYearMonthDay(2050, 10, 10)), + Key.of(Date.fromYearMonthDay(2015, 10, 10), Date.fromYearMonthDay(2000, 10, 10)), + Key.of(Date.fromYearMonthDay(2015, 10, 10), Date.fromYearMonthDay(1900, 10, 10)) + ); + + verifyEncodedOrdering(schema, mutations); + verifyEncodedOrdering(schema, "test", keys); + } + + @Test + public void timestampKeys() throws Exception { + SpannerSchema.Builder builder = SpannerSchema.builder(); + + builder.addColumn("test", "key", "TIMESTAMP"); + builder.addKeyPart("test", "key", false); + + builder.addColumn("test", "keydesc", "TIMESTAMP"); + builder.addKeyPart("test", "keydesc", true); + + SpannerSchema schema = builder.build(); + + List mutations = Arrays.asList( + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(Timestamp.ofTimeMicroseconds(10000)) + .set("keydesc").to(Timestamp.ofTimeMicroseconds(50000)) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(Timestamp.ofTimeMicroseconds(20000)) + .set("keydesc").to((Timestamp) null) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(Timestamp.ofTimeMicroseconds(20000)) + .set("keydesc").to(Timestamp.ofTimeMicroseconds(90000)) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(Timestamp.ofTimeMicroseconds(20000)) + .set("keydesc").to(Timestamp.ofTimeMicroseconds(50000)) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("key").to(Timestamp.ofTimeMicroseconds(20000)) + .set("keydesc").to(Timestamp.ofTimeMicroseconds(10000)) + .build()); + + + List keys = Arrays.asList( + Key.of(Timestamp.ofTimeMicroseconds(10000), ByteArray.fromBase64("zzz")), + Key.of(Timestamp.ofTimeMicroseconds(20000), null), + Key.of(Timestamp.ofTimeMicroseconds(20000), Timestamp.ofTimeMicroseconds(90000)), + Key.of(Timestamp.ofTimeMicroseconds(20000), Timestamp.ofTimeMicroseconds(50000)), + Key.of(Timestamp.ofTimeMicroseconds(20000), Timestamp.ofTimeMicroseconds(10000)) + ); + + verifyEncodedOrdering(schema, mutations); + verifyEncodedOrdering(schema, "test", keys); + } + + @Test + public void boolKeys() throws Exception { + SpannerSchema.Builder builder = SpannerSchema.builder(); + + builder.addColumn("test", "boolkey", "BOOL"); + builder.addKeyPart("test", "boolkey", false); + + builder.addColumn("test", "boolkeydesc", "BOOL"); + builder.addKeyPart("test", "boolkeydesc", true); + + SpannerSchema schema = builder.build(); + + List mutations = Arrays.asList( + Mutation.newInsertOrUpdateBuilder("test") + .set("boolkey").to(true) + .set("boolkeydesc").to(false) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("boolkey").to(false) + .set("boolkeydesc").to(false) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("boolkey").to(false) + .set("boolkeydesc").to(true) + .build(), + Mutation.newInsertOrUpdateBuilder("test") + .set("boolkey").to((Boolean) null) + .set("boolkeydesc").to(false) + .build() + ); + + List keys = Arrays.asList( + Key.of(true, ByteArray.fromBase64("zzz")), + Key.of(false, null), + Key.of(false, false), + Key.of(false, true), + Key.of(null, false) + ); + + verifyEncodedOrdering(schema, mutations); + verifyEncodedOrdering(schema, "test", keys); + } + + private void verifyEncodedOrdering(SpannerSchema schema, List mutations) { + MutationGroupEncoder encoder = new MutationGroupEncoder(schema); + List mutationEncodings = new ArrayList<>(mutations.size()); + for (Mutation m : mutations) { + mutationEncodings.add(encoder.encodeKey(m)); + } + List copy = new ArrayList<>(mutationEncodings); + Collections.sort(copy, UnsignedBytes.lexicographicalComparator()); + + Assert.assertEquals(mutationEncodings, copy); + } + + private void verifyEncodedOrdering(SpannerSchema schema, String table, List keys) { + MutationGroupEncoder encoder = new MutationGroupEncoder(schema); + List keyEncodings = new ArrayList<>(keys.size()); + for (Key k : keys) { + keyEncodings.add(encoder.encodeKey(table, k)); + } + List copy = new ArrayList<>(keyEncodings); + Collections.sort(copy, UnsignedBytes.lexicographicalComparator()); + + Assert.assertEquals(keyEncodings, copy); + } + + private MutationGroup g(Mutation mutation, Mutation... other) { + return MutationGroup.create(mutation, other); + } + + private void encodeAndVerify(MutationGroup expected) { + SpannerSchema schema = this.allTypesSchema; + encodeAndVerify(expected, schema); + } + + private static void encodeAndVerify(MutationGroup expected, SpannerSchema schema) { + MutationGroupEncoder coder = new MutationGroupEncoder(schema); + byte[] encode = coder.encode(expected); + MutationGroup actual = coder.decode(encode); + + Assert.assertTrue(mutationGroupsEqual(expected, actual)); + } + + private static boolean mutationGroupsEqual(MutationGroup a, MutationGroup b) { + ImmutableList alist = ImmutableList.copyOf(a); + ImmutableList blist = ImmutableList.copyOf(b); + + if (alist.size() != blist.size()) { + return false; + } + + for (int i = 0; i < alist.size(); i++) { + if (!mutationsEqual(alist.get(i), blist.get(i))) { + return false; + } + } + return true; + } + + // Is different from Mutation#equals. Case insensitive for table/column names, the order of + // the columns doesn't matter. + private static boolean mutationsEqual(Mutation a, Mutation b) { + if (a == b) { + return true; + } + if (a == null || b == null) { + return false; + } + if (a.getOperation() != b.getOperation()) { + return false; + } + if (!a.getTable().equalsIgnoreCase(b.getTable())) { + return false; + } + if (a.getOperation() == Mutation.Op.DELETE) { + return a.getKeySet().equals(b.getKeySet()); + } + + // Compare pairs instead? This seems to be good enough... + return ImmutableSet.copyOf(getNormalizedColumns(a)) + .equals(ImmutableSet.copyOf(getNormalizedColumns(b))) && ImmutableSet.copyOf(a.getValues()) + .equals(ImmutableSet.copyOf(b.getValues())); + } + + // Pray for Java 8 support. + private static Iterable getNormalizedColumns(Mutation a) { + return Iterables.transform(a.getColumns(), new Function() { + + @Override + public String apply(String input) { + return input.toLowerCase(); + } + }); + } +}