tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject tez git commit: TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization
Date Wed, 11 May 2016 21:42:06 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 4cb020c53 -> 60b125686


TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/60b12568
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/60b12568
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/60b12568

Branch: refs/heads/branch-0.7
Commit: 60b125686953e9a74e1fe2130e868fb6e86035ff
Parents: 4cb020c
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Wed May 11 16:41:21 2016 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Wed May 11 16:41:21 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/runtime/api/impl/EventMetaData.java     |  48 ++++++
 .../apache/tez/runtime/api/impl/TezEvent.java   | 109 ++++++++-----
 .../tez/runtime/api/impl/TestTezEvent.java      | 152 +++++++++++++++++++
 4 files changed, 272 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/60b12568/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d7f67cf..55c79b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.2 Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization
   TEZ-3243. Output vertices are hidden for UI graph view
 
 Release 0.7.1: Unreleased

http://git-wip-us.apache.org/repos/asf/tez/blob/60b12568/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
index 88cad47..0ee96af 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
@@ -19,11 +19,13 @@
 package org.apache.tez.runtime.api.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
 import javax.annotation.Nullable;
+
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -137,4 +139,50 @@ public class EventMetaData implements Writable {
         + ", taskAttemptId=" + (taskAttemptID == null? "null" : taskAttemptID)
         + " }";
   }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result
+        + ((edgeVertexName == null) ? 0 : edgeVertexName.hashCode());
+    result = prime
+        * result
+        + ((producerConsumerType == null) ? 0 : producerConsumerType.hashCode());
+    result = prime * result
+        + ((taskAttemptID == null) ? 0 : taskAttemptID.hashCode());
+    result = prime * result
+        + ((taskVertexName == null) ? 0 : taskVertexName.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    EventMetaData other = (EventMetaData) obj;
+    if (edgeVertexName == null) {
+      if (other.edgeVertexName != null)
+        return false;
+    } else if (!edgeVertexName.equals(other.edgeVertexName))
+      return false;
+    if (producerConsumerType != other.producerConsumerType)
+      return false;
+    if (taskAttemptID == null) {
+      if (other.taskAttemptID != null)
+        return false;
+    } else if (!taskAttemptID.equals(other.taskAttemptID))
+      return false;
+    if (taskVertexName == null) {
+      if (other.taskVertexName != null)
+        return false;
+    } else if (!taskVertexName.equals(other.taskVertexName))
+      return false;
+    return true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/60b12568/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index b44b7d4..0516bb3 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -21,7 +21,9 @@ package org.apache.tez.runtime.api.impl;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.OutputStream;
 
+import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.Writable;
 import org.apache.tez.common.ProtoConverters;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -35,10 +37,10 @@ import org.apache.tez.runtime.api.events.EventProtos.InputFailedEventProto;
 import org.apache.tez.runtime.api.events.EventProtos.InputReadErrorEventProto;
 import org.apache.tez.runtime.api.events.EventProtos.RootInputDataInformationEventProto;
 import org.apache.tez.runtime.api.events.EventProtos.VertexManagerEventProto;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
-import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -46,6 +48,10 @@ import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptCompletedEventProto;
 import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
 
+import com.google.protobuf.AbstractMessage;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+
 public class TezEvent implements Writable {
 
   private EventType eventType;
@@ -55,7 +61,7 @@ public class TezEvent implements Writable {
   private EventMetaData sourceInfo;
 
   private EventMetaData destinationInfo;
-  
+
   private long eventReceivedTime;
 
   public TezEvent() {
@@ -64,7 +70,7 @@ public class TezEvent implements Writable {
   public TezEvent(Event event, EventMetaData sourceInfo) {
     this(event, sourceInfo, System.currentTimeMillis());
   }
-  
+
   public TezEvent(Event event, EventMetaData sourceInfo, long time) {
     this.event = event;
     this.eventReceivedTime = time;
@@ -98,11 +104,11 @@ public class TezEvent implements Writable {
   public Event getEvent() {
     return event;
   }
-  
+
   public void setEventReceivedTime(long eventReceivedTime) { // TODO save
     this.eventReceivedTime = eventReceivedTime;
   }
-  
+
   public long getEventReceivedTime() {
     return eventReceivedTime;
   }
@@ -140,61 +146,72 @@ public class TezEvent implements Writable {
       TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event;
       sEvt.write(out);
     } else {
-      byte[] eventBytes = null;
+      AbstractMessage message;
       switch (eventType) {
       case DATA_MOVEMENT_EVENT:
-        eventBytes =
+        message =
             ProtoConverters.convertDataMovementEventToProto(
-                (DataMovementEvent) event).toByteArray();
+                (DataMovementEvent) event);
         break;
       case COMPOSITE_DATA_MOVEMENT_EVENT:
-        eventBytes =
+        message =
             ProtoConverters.convertCompositeDataMovementEventToProto(
-                (CompositeDataMovementEvent) event).toByteArray();
+                (CompositeDataMovementEvent) event);
         break;
       case VERTEX_MANAGER_EVENT:
-        eventBytes = ProtoConverters.convertVertexManagerEventToProto((VertexManagerEvent) event)
-            .toByteArray();
+        message = ProtoConverters.convertVertexManagerEventToProto((VertexManagerEvent) event);
         break;
       case INPUT_READ_ERROR_EVENT:
         InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
-        eventBytes = InputReadErrorEventProto.newBuilder()
+        message = InputReadErrorEventProto.newBuilder()
             .setIndex(ideEvt.getIndex())
             .setDiagnostics(ideEvt.getDiagnostics())
             .setVersion(ideEvt.getVersion())
-            .build().toByteArray();
+            .build();
         break;
       case TASK_ATTEMPT_FAILED_EVENT:
         TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event;
-        eventBytes = TaskAttemptFailedEventProto.newBuilder()
+        message = TaskAttemptFailedEventProto.newBuilder()
             .setDiagnostics(tfEvt.getDiagnostics())
-            .build().toByteArray();
+            .build();
         break;
       case TASK_ATTEMPT_COMPLETED_EVENT:
-        eventBytes = TaskAttemptCompletedEventProto.newBuilder()
-            .build().toByteArray();
+        message = TaskAttemptCompletedEventProto.newBuilder()
+            .build();
         break;
       case INPUT_FAILED_EVENT:
         InputFailedEvent ifEvt = (InputFailedEvent) event;
-        eventBytes = InputFailedEventProto.newBuilder()
+        message = InputFailedEventProto.newBuilder()
             .setTargetIndex(ifEvt.getTargetIndex())
-            .setVersion(ifEvt.getVersion()).build().toByteArray();
+            .setVersion(ifEvt.getVersion()).build();
         break;
       case ROOT_INPUT_DATA_INFORMATION_EVENT:
-        eventBytes = ProtoConverters.convertRootInputDataInformationEventToProto(
-            (InputDataInformationEvent) event).toByteArray();
+        message = ProtoConverters.convertRootInputDataInformationEventToProto(
+            (InputDataInformationEvent) event);
         break;
       case ROOT_INPUT_INITIALIZER_EVENT:
-        eventBytes = ProtoConverters
-            .convertRootInputInitializerEventToProto((InputInitializerEvent) event)
-            .toByteArray();
+        message = ProtoConverters
+            .convertRootInputInitializerEventToProto((InputInitializerEvent) event);
         break;
       default:
         throw new TezUncheckedException("Unknown TezEvent"
            + ", type=" + eventType);
       }
-      out.writeInt(eventBytes.length);
-      out.write(eventBytes);
+      if (out instanceof OutputStream) { //DataOutputBuffer extends DataOutputStream
+        int serializedSize = message.getSerializedSize();
+        out.writeInt(serializedSize);
+        int buffersize = serializedSize < CodedOutputStream.DEFAULT_BUFFER_SIZE ? serializedSize
+            : CodedOutputStream.DEFAULT_BUFFER_SIZE;
+        CodedOutputStream codedOut = CodedOutputStream.newInstance(
+            (OutputStream) out, buffersize);
+        message.writeTo(codedOut);
+        codedOut.flush();
+      } else {
+        byte[] eventBytes = message.toByteArray();
+        out.writeInt(eventBytes.length);
+        out.write(eventBytes);
+      }
+
     }
   }
 
@@ -211,31 +228,40 @@ public class TezEvent implements Writable {
       ((TaskStatusUpdateEvent)event).readFields(in);
     } else {
       int eventBytesLen = in.readInt();
-      byte[] eventBytes = new byte[eventBytesLen];
-      in.readFully(eventBytes);
+      byte[] eventBytes;
+      CodedInputStream input;
+      int startOffset = 0;
+      if (in instanceof DataInputBuffer) {
+        eventBytes = ((DataInputBuffer)in).getData();
+        startOffset = ((DataInputBuffer) in).getPosition();
+      } else {
+        eventBytes = new byte[eventBytesLen];
+        in.readFully(eventBytes);
+      }
+      input = CodedInputStream.newInstance(eventBytes, startOffset, eventBytesLen);
       switch (eventType) {
       case DATA_MOVEMENT_EVENT:
         DataMovementEventProto dmProto =
-            DataMovementEventProto.parseFrom(eventBytes);
+            DataMovementEventProto.parseFrom(input);
         event = ProtoConverters.convertDataMovementEventFromProto(dmProto);
         break;
       case COMPOSITE_DATA_MOVEMENT_EVENT:
-        CompositeEventProto cProto = CompositeEventProto.parseFrom(eventBytes);
+        CompositeEventProto cProto = CompositeEventProto.parseFrom(input);
         event = ProtoConverters.convertCompositeDataMovementEventFromProto(cProto);
         break;
       case VERTEX_MANAGER_EVENT:
-        VertexManagerEventProto vmProto = VertexManagerEventProto.parseFrom(eventBytes);
+        VertexManagerEventProto vmProto = VertexManagerEventProto.parseFrom(input);
         event = ProtoConverters.convertVertexManagerEventFromProto(vmProto);
         break;
       case INPUT_READ_ERROR_EVENT:
         InputReadErrorEventProto ideProto =
-            InputReadErrorEventProto.parseFrom(eventBytes);
+            InputReadErrorEventProto.parseFrom(input);
         event = InputReadErrorEvent.create(ideProto.getDiagnostics(),
             ideProto.getIndex(), ideProto.getVersion());
         break;
       case TASK_ATTEMPT_FAILED_EVENT:
         TaskAttemptFailedEventProto tfProto =
-            TaskAttemptFailedEventProto.parseFrom(eventBytes);
+            TaskAttemptFailedEventProto.parseFrom(input);
         event = new TaskAttemptFailedEvent(tfProto.getDiagnostics());
         break;
       case TASK_ATTEMPT_COMPLETED_EVENT:
@@ -243,16 +269,16 @@ public class TezEvent implements Writable {
         break;
       case INPUT_FAILED_EVENT:
         InputFailedEventProto ifProto =
-            InputFailedEventProto.parseFrom(eventBytes);
+            InputFailedEventProto.parseFrom(input);
         event = InputFailedEvent.create(ifProto.getTargetIndex(), ifProto.getVersion());
         break;
       case ROOT_INPUT_DATA_INFORMATION_EVENT:
         RootInputDataInformationEventProto difProto = RootInputDataInformationEventProto
-            .parseFrom(eventBytes);
+            .parseFrom(input);
         event = ProtoConverters.convertRootInputDataInformationEventFromProto(difProto);
         break;
       case ROOT_INPUT_INITIALIZER_EVENT:
-        EventProtos.RootInputInitializerEventProto riiProto = EventProtos.RootInputInitializerEventProto.parseFrom(eventBytes);
+        EventProtos.RootInputInitializerEventProto riiProto = EventProtos.RootInputInitializerEventProto.parseFrom(input);
         event = ProtoConverters.convertRootInputInitializerEventFromProto(riiProto);
         break;
       default:
@@ -260,6 +286,13 @@ public class TezEvent implements Writable {
         throw new TezUncheckedException("Unexpected TezEvent"
            + ", type=" + eventType);
       }
+      if (in instanceof DataInputBuffer) {
+        // Skip so that position is updated
+        int skipped = in.skipBytes(eventBytesLen);
+        if (skipped != eventBytesLen) {
+          throw new TezUncheckedException("Expected to skip " + eventBytesLen + " bytes. Actually skipped = " + skipped);
+        }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/60b12568/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTezEvent.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTezEvent.java
new file mode 100644
index 0000000..b39c4ed
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTezEvent.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.api.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+
+public class TestTezEvent {
+
+  @Test
+  public void testSerialization() throws IOException {
+
+    ArrayList<TezEvent> events = new ArrayList<TezEvent>();
+
+    Configuration conf = new Configuration(true);
+    String confVal = RandomStringUtils.random(10000, true, true);
+    conf.set("testKey", confVal);
+    UserPayload payload = TezUtils.createUserPayloadFromConf(conf);
+    TezTaskAttemptID srcTAID = TezTaskAttemptID.getInstance(
+        TezTaskID.fromString("task_1454468251169_866787_1_02_000000"), 1000);
+    TezTaskAttemptID destTAID = TezTaskAttemptID.getInstance(
+        TezTaskID.fromString("task_1454468251169_866787_1_02_000000"), 2000);
+    EventMetaData srcInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
+        "v1", "v2", srcTAID);
+    EventMetaData destInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
+        "v3", "v4", destTAID);
+
+    // Case of size less than 4K and parsing skipped during deserialization
+    events.add(new TezEvent(new TaskAttemptCompletedEvent(), new EventMetaData(
+        EventProducerConsumerType.PROCESSOR, "v1", "v2", srcTAID)));
+    TezEvent dmeEvent = new TezEvent(DataMovementEvent.create(1000, 3, 1,
+        payload.getPayload()), srcInfo, System.currentTimeMillis());
+    dmeEvent.setDestinationInfo(destInfo);
+    events.add(dmeEvent);
+    // Different code path
+    events.add(new TezEvent(new TaskStatusUpdateEvent(null, 0.1f, null, false),
+        new EventMetaData(EventProducerConsumerType.PROCESSOR, "v5", "v6",
+            srcTAID)));
+
+    // Serialize to different types of DataOutput
+    // One that implements OutputStream and one that does not
+    DataOutputBuffer dataout = new DataOutputBuffer();
+    ByteArrayDataOutput bout = ByteStreams.newDataOutput();
+    serializeEvents(events, dataout);
+    serializeEvents(events, bout);
+
+    // Deserialize from different types of DataInput
+    // One with DataInputBuffer and another different implementation
+    DataInputBuffer datain = new DataInputBuffer();
+    datain.reset(dataout.getData(), dataout.getLength());
+    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataout.getData(), 0, dataout.getLength()));
+    ArrayList<TezEvent> actual1 = deserializeEvents(datain);
+    ArrayList<TezEvent> actual2 = deserializeEvents(dis);
+    assertEventEquals(events, actual1);
+    assertEventEquals(events, actual2);
+
+    byte[] serializedBytes = bout.toByteArray();
+    datain.reset(serializedBytes, serializedBytes.length);
+    dis = new DataInputStream(new ByteArrayInputStream(serializedBytes));
+    actual1 = deserializeEvents(datain);
+    actual2 = deserializeEvents(dis);
+    assertEventEquals(events, actual1);
+    assertEventEquals(events, actual2);
+
+  }
+
+  private void serializeEvents(ArrayList<TezEvent> events, DataOutput out) throws IOException {
+    out.writeInt(events.size());
+    for (TezEvent e : events) {
+      e.write(out);
+    }
+  }
+
+  private ArrayList<TezEvent> deserializeEvents(DataInput in) throws IOException {
+    int eventsCount = in.readInt();
+    ArrayList<TezEvent> events = new ArrayList<TezEvent>(eventsCount);
+    for (int i = 0; i < eventsCount; ++i) {
+      TezEvent e = new TezEvent();
+      e.readFields(in);
+      events.add(e);
+    }
+    return events;
+  }
+
+  private void assertEventEquals(ArrayList<TezEvent> expectedList, ArrayList<TezEvent> actualList) {
+    Assert.assertEquals(expectedList.size(), actualList.size());
+    for (int i = 0; i < expectedList.size(); i++) {
+      TezEvent expected = expectedList.get(i);
+      TezEvent actual = actualList.get(i);
+      Assert.assertEquals(expected.getEventReceivedTime(), actual.getEventReceivedTime());
+      Assert.assertEquals(expected.getSourceInfo(), actual.getSourceInfo());
+      Assert.assertEquals(expected.getDestinationInfo(), actual.getDestinationInfo());
+      Assert.assertEquals(expected.getEventType(), actual.getEventType());
+      // Doing this instead of implementing equals methods for events
+      if (i == 0) {
+        Assert.assertTrue(actual.getEvent() instanceof TaskAttemptCompletedEvent);
+      } else if (i == 1) {
+        DataMovementEvent dmeExpected = (DataMovementEvent) expected.getEvent();
+        DataMovementEvent dmeActual = (DataMovementEvent) actual.getEvent();
+        Assert.assertEquals(dmeExpected.getSourceIndex(), dmeActual.getSourceIndex());
+        Assert.assertEquals(dmeExpected.getTargetIndex(), dmeActual.getTargetIndex());
+        Assert.assertEquals(dmeExpected.getVersion(), dmeActual.getVersion());
+        Assert.assertEquals(dmeExpected.getUserPayload(), dmeActual.getUserPayload());
+      } else {
+        TaskStatusUpdateEvent tsuExpected = (TaskStatusUpdateEvent) expected.getEvent();
+        TaskStatusUpdateEvent tsuActual = (TaskStatusUpdateEvent) actual.getEvent();
+        Assert.assertEquals(tsuExpected.getCounters(), tsuActual.getCounters());
+        Assert.assertEquals(tsuExpected.getProgress(), tsuActual.getProgress(), 0);
+        Assert.assertEquals(tsuExpected.getProgressNotified(), tsuActual.getProgressNotified());
+        Assert.assertEquals(tsuExpected.getStatistics(), tsuActual.getStatistics());
+      }
+    }
+  }
+
+}


Mime
View raw message