metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmerri...@apache.org
Subject [1/3] incubator-metron git commit: METRON-694 Index Errors from Topologies (merrimanr) closes apache/incubator-metron#453
Date Mon, 06 Mar 2017 21:25:48 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master 27ee49096 -> 134a23311


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetters.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetters.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetters.java
deleted file mode 100644
index da4a549..0000000
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/MessageGetters.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.writer.message;
-
-
-import org.apache.storm.tuple.Tuple;
-import org.json.simple.JSONObject;
-
-public enum MessageGetters implements MessageGetter{
-   RAW(RawMessageGetter.DEFAULT)
-  ,NAMED(NamedMessageGetter.DEFAULT)
-  ;
-  MessageGetter getter;
-  MessageGetters(MessageGetter getter) {
-    this.getter = getter;
-  }
-  @Override
-  public JSONObject getMessage(Tuple t) {
-    return getter.getMessage(t);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/NamedMessageGetter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/NamedMessageGetter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/NamedMessageGetter.java
deleted file mode 100644
index fdd5fb8..0000000
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/NamedMessageGetter.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.writer.message;
-
-import org.apache.storm.tuple.Tuple;
-import org.json.simple.JSONObject;
-
-public class NamedMessageGetter implements MessageGetter {
-  public static NamedMessageGetter DEFAULT = new NamedMessageGetter("message");
-  private String messageName;
-  public NamedMessageGetter(String name) {
-    this.messageName = name;
-  }
-  @Override
-  public JSONObject getMessage(Tuple tuple) {
-    return (JSONObject)tuple.getValueByField(messageName);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/RawMessageGetter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/RawMessageGetter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/RawMessageGetter.java
deleted file mode 100644
index 99a8378..0000000
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/message/RawMessageGetter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.writer.message;
-
-import org.apache.storm.tuple.Tuple;
-import org.apache.metron.common.utils.JSONUtils;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-
-import java.io.UnsupportedEncodingException;
-
-public class RawMessageGetter implements MessageGetter {
-  public static RawMessageGetter DEFAULT = new RawMessageGetter(0);
-  private ThreadLocal<JSONParser> parser = new ThreadLocal<JSONParser>() {
-    @Override
-    protected JSONParser initialValue() {
-      return new JSONParser();
-    }
-  };
-  int position = 0;
-  public RawMessageGetter(int position) {
-    this.position = position;
-  }
-  @Override
-  public JSONObject getMessage(Tuple t) {
-    byte[] data = t.getBinary(position);
-    try {
-      return (JSONObject) parser.get().parse(new String(data, "UTF8"));
-    } catch (Exception e) {
-      throw new IllegalStateException(e.getMessage(), e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
new file mode 100644
index 0000000..c560b30
--- /dev/null
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.verifyStatic;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({BulkWriterComponent.class, ErrorUtils.class})
+public class BulkWriterComponentTest {
+
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  @Mock
+  private OutputCollector collector;
+
+  @Mock
+  private BulkMessageWriter<JSONObject> bulkMessageWriter;
+
+  @Mock
+  private WriterConfiguration configurations;
+
+  @Mock
+  private Tuple tuple1;
+
+  @Mock
+  private Tuple tuple2;
+
+  @Mock
+  private MessageGetStrategy messageGetStrategy;
+
+  private String sensorType = "testSensor";
+  private List<Tuple> tupleList;
+  private JSONObject message1 = new JSONObject();
+  private JSONObject message2 = new JSONObject();
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    mockStatic(ErrorUtils.class);
+    message1.put("value", "message1");
+    message2.put("value", "message2");
+    when(tuple1.getValueByField("message")).thenReturn(message1);
+    when(tuple2.getValueByField("message")).thenReturn(message2);
+    tupleList = Arrays.asList(tuple1, tuple2);
+    when(configurations.isEnabled(any())).thenReturn(true);
+    when(configurations.getBatchSize(any())).thenReturn(2);
+    when(messageGetStrategy.get(tuple1)).thenReturn(message1);
+    when(messageGetStrategy.get(tuple2)).thenReturn(message2);
+  }
+
+  @Test
+  public void writeShouldProperlyAckTuplesInBatch() throws Exception {
+    BulkWriterResponse response = new BulkWriterResponse();
+    response.addAllSuccesses(tupleList);
+
+    when(bulkMessageWriter.write(sensorType, configurations, Arrays.asList(tuple1, tuple2), Arrays.asList(message1, message2))).thenReturn(response);
+
+    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector);
+    bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
+
+    verify(bulkMessageWriter, times(0)).write(sensorType, configurations, Collections.singletonList(tuple1), Collections.singletonList(message1));
+    verify(collector, times(0)).ack(tuple1);
+    verify(collector, times(0)).ack(tuple2);
+
+    bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
+
+    verify(collector, times(1)).ack(tuple1);
+    verify(collector, times(1)).ack(tuple2);
+    verifyStatic(times(0));
+    ErrorUtils.handleError(eq(collector), any(MetronError.class));
+  }
+
+  @Test
+  public void writeShouldProperlyHandleWriterErrors() throws Exception {
+    Throwable e = new Exception("test exception");
+    MetronError error = new MetronError()
+            .withSensorType(sensorType)
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2));
+    BulkWriterResponse response = new BulkWriterResponse();
+    response.addAllErrors(e, tupleList);
+
+    when(bulkMessageWriter.write(sensorType, configurations, Arrays.asList(tuple1, tuple2), Arrays.asList(message1, message2))).thenReturn(response);
+
+    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector);
+    bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
+    bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
+
+    verifyStatic(times(1));
+    ErrorUtils.handleError(collector, error);
+  }
+
+  @Test
+  public void writeShouldThrowExceptionWhenHandleErrorIsFalse() throws Exception {
+    exception.expect(IllegalStateException.class);
+
+    Throwable e = new Exception("test exception");
+    BulkWriterResponse response = new BulkWriterResponse();
+    response.addAllErrors(e, tupleList);
+
+    when(bulkMessageWriter.write(sensorType, configurations, Arrays.asList(tuple1, tuple2), Arrays.asList(message1, message2))).thenReturn(response);
+
+    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector, true, false);
+    bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
+    bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
+  }
+
+  @Test
+  public void writeShouldProperlyHandleWriterException() throws Exception {
+    Throwable e = new Exception("test exception");
+    MetronError error = new MetronError()
+            .withSensorType(sensorType)
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2));
+    BulkWriterResponse response = new BulkWriterResponse();
+    response.addAllErrors(e, tupleList);
+
+    when(bulkMessageWriter.write(sensorType, configurations, Arrays.asList(tuple1, tuple2), Arrays.asList(message1, message2))).thenThrow(e);
+
+    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector);
+    bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
+    bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
+
+    verifyStatic(times(1));
+    ErrorUtils.handleError(collector, error);
+  }
+
+  @Test
+  public void errorAllShouldClearMapsAndHandleErrors() throws Exception {
+    Throwable e = new Exception("test exception");
+    MetronError error1 = new MetronError()
+            .withSensorType("sensor1")
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message1));
+    MetronError error2 = new MetronError()
+            .withSensorType("sensor2")
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message2));
+
+    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector);
+    bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
+    bulkWriterComponent.write("sensor2", tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
+    bulkWriterComponent.errorAll(e, messageGetStrategy);
+
+    verifyStatic(times(1));
+    ErrorUtils.handleError(collector, error1);
+    ErrorUtils.handleError(collector, error2);
+
+    bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
+    verify(bulkMessageWriter, times(0)).write(sensorType, configurations, Collections.singletonList(tuple1), Collections.singletonList(message1));
+  }
+
+
+}


Mime
View raw message