hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject hadoop git commit: HDFS-9890: libhdfs++: Add test suite to simulate network issues. Contributed by Xiaowei Zhu.
Date Wed, 13 Jul 2016 15:41:55 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 d643d8c4f -> d18e39685


HDFS-9890: libhdfs++: Add test suite to simulate network issues.  Contributed by Xiaowei Zhu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d18e3968
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d18e3968
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d18e3968

Branch: refs/heads/HDFS-8707
Commit: d18e39685c184adc7cce7d01fd4906863c04be4b
Parents: d643d8c
Author: James <jhc@apache.org>
Authored: Wed Jul 13 15:36:14 2016 +0000
Committer: James <jhc@apache.org>
Committed: Wed Jul 13 15:36:14 2016 +0000

----------------------------------------------------------------------
 .../main/native/libhdfs-tests/native_mini_dfs.c |  22 +-
 .../main/native/libhdfs-tests/native_mini_dfs.h |  11 +-
 .../libhdfs-tests/test_libhdfs_mini_stress.c    | 345 +++++++++++++++++++
 .../native/libhdfspp/include/hdfspp/events.h    |   6 +-
 .../native/libhdfspp/include/hdfspp/hdfs_ext.h  |   4 +-
 .../native/libhdfspp/lib/bindings/c/hdfs.cc     |   7 +-
 .../main/native/libhdfspp/lib/fs/filehandle.cc  |  15 +-
 .../main/native/libhdfspp/lib/fs/filehandle.h   |   3 +-
 .../native/libhdfspp/lib/reader/block_reader.cc |  62 +++-
 .../native/libhdfspp/lib/reader/block_reader.h  |   5 +-
 .../native/libhdfspp/lib/rpc/rpc_connection.cc  |  13 +-
 .../native/libhdfspp/lib/rpc/rpc_connection.h   |  29 +-
 .../main/native/libhdfspp/lib/rpc/rpc_engine.h  |   1 -
 .../main/native/libhdfspp/tests/CMakeLists.txt  |   7 +
 .../native/libhdfspp/tests/bad_datanode_test.cc |   5 +-
 .../native/libhdfspp/tests/rpc_engine_test.cc   |   2 +-
 16 files changed, 483 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d18e3968/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c
index b36ef76..6938109 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.c
@@ -182,6 +182,16 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
         }
         (*env)->DeleteLocalRef(env, val.l);
     }
+    if (conf->numDataNodes) {
+        jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
+                "numDataNodes", "(I)L" MINIDFS_CLUSTER_BUILDER ";", conf->numDataNodes);
+        if (jthr) {
+            printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: "
+                                  "Builder::numDataNodes");
+            goto error;
+        }
+    }
+    (*env)->DeleteLocalRef(env, val.l);
     jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
             "build", "()L" MINIDFS_CLUSTER ";");
     if (jthr) {
@@ -291,7 +301,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
     jthrowable jthr;
     int ret = 0;
     const char *host;
-    
+
     if (!env) {
         fprintf(stderr, "nmdHdfsConnect: getJNIEnv failed\n");
         return -EIO;
@@ -306,7 +316,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
         return -EIO;
     }
     jNameNode = jVal.l;
-    
+
     // Then get the http address (InetSocketAddress) of the NameNode
     jthr = invokeMethod(env, &jVal, INSTANCE, jNameNode, HADOOP_NAMENODE,
                         "getHttpAddress", "()L" JAVA_INETSOCKETADDRESS ";");
@@ -317,7 +327,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
         goto error_dlr_nn;
     }
     jAddress = jVal.l;
-    
+
     jthr = invokeMethod(env, &jVal, INSTANCE, jAddress,
                         JAVA_INETSOCKETADDRESS, "getPort", "()I");
     if (jthr) {
@@ -327,7 +337,7 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
         goto error_dlr_addr;
     }
     *port = jVal.i;
-    
+
     jthr = invokeMethod(env, &jVal, INSTANCE, jAddress, JAVA_INETSOCKETADDRESS,
                         "getHostName", "()Ljava/lang/String;");
     if (jthr) {
@@ -339,12 +349,12 @@ int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
     host = (*env)->GetStringUTFChars(env, jVal.l, NULL);
     *hostName = strdup(host);
     (*env)->ReleaseStringUTFChars(env, jVal.l, host);
-    
+
 error_dlr_addr:
     (*env)->DeleteLocalRef(env, jAddress);
 error_dlr_nn:
     (*env)->DeleteLocalRef(env, jNameNode);
-    
+
     return ret;
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d18e3968/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h
index ce8b1cf..628180f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/native_mini_dfs.h
@@ -26,7 +26,7 @@ extern  "C" {
 #endif
 
 struct hdfsBuilder;
-struct NativeMiniDfsCluster; 
+struct NativeMiniDfsCluster;
 
 /**
  * Represents a configuration to use for creating a Native MiniDFSCluster
@@ -51,6 +51,11 @@ struct NativeMiniDfsConf {
      * Nonzero if we should configure short circuit.
      */
     jboolean configureShortCircuit;
+
+    /**
+     * The number of datanodes in MiniDfsCluster
+     */
+    jint numDataNodes;
 };
 
 /**
@@ -96,13 +101,13 @@ void nmdFree(struct NativeMiniDfsCluster* cl);
  *
  * @return          the port, or a negative error code
  */
-int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); 
+int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
 
 /**
  * Get the http address that's in use by the given (non-HA) nativeMiniDfs
  *
  * @param cl        The initialized NativeMiniDfsCluster
- * @param port      Used to capture the http port of the NameNode 
+ * @param port      Used to capture the http port of the NameNode
  *                  of the NativeMiniDfsCluster
  * @param hostName  Used to capture the http hostname of the NameNode
  *                  of the NativeMiniDfsCluster

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d18e3968/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c
new file mode 100644
index 0000000..0d01e44
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_mini_stress.c
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "expect.h"
+#include "hdfs/hdfs.h"
+#include "hdfspp/hdfs_ext.h"
+#include "native_mini_dfs.h"
+#include "os/thread.h"
+
+#include <errno.h>
+#include <inttypes.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define TO_STR_HELPER(X) #X
+#define TO_STR(X) TO_STR_HELPER(X)
+
+#define TLH_MAX_THREADS 10000
+
+#define TLH_MAX_DNS 16
+
+#define TLH_DEFAULT_BLOCK_SIZE 1048576
+
+#define TLH_DEFAULT_DFS_REPLICATION 3
+
+#define TLH_DEFAULT_IPC_CLIENT_CONNECT_MAX_RETRIES 100
+
+#define TLH_DEFAULT_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS 5
+
+#ifndef RANDOM_ERROR_RATIO
+#define RANDOM_ERROR_RATIO 1000000000
+#endif
+
+struct tlhThreadInfo {
+  /** Thread index */
+  int threadIdx;
+  /** 0 = thread was successful; error code otherwise */
+  int success;
+  /** thread identifier */
+  thread theThread;
+  /** fs, shared with other threads **/
+  hdfsFS hdfs;
+  /** Filename */
+  const char *fileNm;
+
+};
+
+static int hdfsNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs,
+                               const char *username)
+{
+  int ret;
+  tPort port;
+  hdfsFS hdfs;
+  struct hdfsBuilder *bld;
+
+  port = (tPort)nmdGetNameNodePort(cl);
+  if (port < 0) {
+    fprintf(stderr, "hdfsNameNodeConnect: nmdGetNameNodePort "
+            "returned error %d\n", port);
+    return port;
+  }
+  bld = hdfsNewBuilder();
+  if (!bld)
+    return -ENOMEM;
+  hdfsBuilderSetForceNewInstance(bld);
+  hdfsBuilderSetNameNode(bld, "localhost");
+  hdfsBuilderSetNameNodePort(bld, port);
+  hdfsBuilderConfSetStr(bld, "dfs.block.size",
+                        TO_STR(TLH_DEFAULT_BLOCK_SIZE));
+  hdfsBuilderConfSetStr(bld, "dfs.blocksize",
+                        TO_STR(TLH_DEFAULT_BLOCK_SIZE));
+  hdfsBuilderConfSetStr(bld, "dfs.replication",
+                        TO_STR(TLH_DEFAULT_DFS_REPLICATION));
+  hdfsBuilderConfSetStr(bld, "ipc.client.connect.max.retries",
+                        TO_STR(TLH_DEFAULT_IPC_CLIENT_CONNECT_MAX_RETRIES));
+  hdfsBuilderConfSetStr(bld, "ipc.client.connect.retry.interval",
+                        TO_STR(TLH_DEFAULT_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS));
+  if (username) {
+    hdfsBuilderSetUserName(bld, username);
+  }
+  hdfs = hdfsBuilderConnect(bld);
+  if (!hdfs) {
+    ret = -errno;
+    return ret;
+  }
+  *fs = hdfs;
+  return 0;
+}
+
+static int hdfsWriteData(hdfsFS hdfs, const char *dirNm,
+                         const char *fileNm, tSize fileSz)
+{
+  hdfsFile file;
+  int ret, expected;
+  const char *content;
+
+  content = fileNm;
+
+  if (hdfsExists(hdfs, dirNm) == 0) {
+    EXPECT_ZERO(hdfsDelete(hdfs, dirNm, 1));
+  }
+  EXPECT_ZERO(hdfsCreateDirectory(hdfs, dirNm));
+
+  file = hdfsOpenFile(hdfs, fileNm, O_WRONLY, 0, 0, 0);
+  EXPECT_NONNULL(file);
+
+  expected = (int)strlen(content);
+  tSize sz = 0;
+  while (sz < fileSz) {
+    ret = hdfsWrite(hdfs, file, content, expected);
+    if (ret < 0) {
+      ret = errno;
+      fprintf(stderr, "hdfsWrite failed and set errno %d\n", ret);
+      return ret;
+    }
+    if (ret != expected) {
+      fprintf(stderr, "hdfsWrite was supposed to write %d bytes, but "
+              "it wrote %d\n", ret, expected);
+      return EIO;
+    }
+    sz += ret;
+  }
+  EXPECT_ZERO(hdfsFlush(hdfs, file));
+  EXPECT_ZERO(hdfsHSync(hdfs, file));
+  EXPECT_ZERO(hdfsCloseFile(hdfs, file));
+  return 0;
+}
+
+static int fileEventCallback1(const char * event, const char * cluster, const char * file, int64_t value, int64_t cookie)
+{
+  char * randomErrRatioStr = getenv("RANDOM_ERROR_RATIO");
+  int64_t randomErrRatio = RANDOM_ERROR_RATIO;
+  if (randomErrRatioStr) randomErrRatio = (int64_t)atoi(randomErrRatioStr);
+  if (randomErrRatio == 0) return DEBUG_SIMULATE_ERROR;
+  else if (randomErrRatio < 0) return LIBHDFSPP_EVENT_OK;
+  return random() % randomErrRatio == 0 ? DEBUG_SIMULATE_ERROR : LIBHDFSPP_EVENT_OK;
+}
+
+static int fileEventCallback2(const char * event, const char * cluster, const char * file, int64_t value, int64_t cookie)
+{
+  /* no op */
+  return LIBHDFSPP_EVENT_OK;
+}
+
+static int doTestHdfsMiniStress(struct tlhThreadInfo *ti, int randomErr)
+{
+  char tmp[4096];
+  hdfsFile file;
+  int ret, expected;
+  hdfsFileInfo *fileInfo;
+  uint64_t readOps, nErrs=0;
+  tOffset seekPos;
+  const char *content;
+
+  content = ti->fileNm;
+  expected = (int)strlen(content);
+
+  fileInfo = hdfsGetPathInfo(ti->hdfs, ti->fileNm);
+  EXPECT_NONNULL(fileInfo);
+
+  file = hdfsOpenFile(ti->hdfs, ti->fileNm, O_RDONLY, 0, 0, 0);
+  EXPECT_NONNULL(file);
+
+  libhdfspp_file_event_callback callback = (randomErr != 0) ? &fileEventCallback1 : &fileEventCallback2;
+
+  hdfsPreAttachFileMonitor(callback, 0);
+
+  fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): starting read loop\n",
+          ti->threadIdx);
+  for (readOps=0; readOps < 1000; ++readOps) {
+    EXPECT_ZERO(hdfsCloseFile(ti->hdfs, file));
+    file = hdfsOpenFile(ti->hdfs, ti->fileNm, O_RDONLY, 0, 0, 0);
+    EXPECT_NONNULL(file);
+    seekPos = (((double)random()) / RAND_MAX) * (fileInfo->mSize - expected);
+    seekPos = (seekPos / expected) * expected;
+    ret = hdfsSeek(ti->hdfs, file, seekPos);
+    if (ret < 0) {
+      ret = errno;
+      fprintf(stderr, "hdfsSeek to %"PRIu64" failed and set"
+              " errno %d\n", seekPos, ret);
+      ++nErrs;
+      continue;
+    }
+    ret = hdfsRead(ti->hdfs, file, tmp, expected);
+    if (ret < 0) {
+      ret = errno;
+      fprintf(stderr, "hdfsRead failed and set errno %d\n", ret);
+      ++nErrs;
+      continue;
+    }
+    if (ret != expected) {
+      fprintf(stderr, "hdfsRead was supposed to read %d bytes, but "
+              "it read %d\n", ret, expected);
+      ++nErrs;
+      continue;
+    }
+    ret = memcmp(content, tmp, expected);
+    if (ret) {
+      fprintf(stderr, "hdfsRead result (%.*s) does not match expected (%.*s)",
+              expected, tmp, expected, content);
+      ++nErrs;
+      continue;
+    }
+  }
+  EXPECT_ZERO(hdfsCloseFile(ti->hdfs, file));
+  fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): finished read loop\n",
+          ti->threadIdx);
+  EXPECT_ZERO(nErrs);
+  return 0;
+}
+
+static int testHdfsMiniStressImpl(struct tlhThreadInfo *ti)
+{
+  fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): starting\n",
+          ti->threadIdx);
+  EXPECT_NONNULL(ti->hdfs);
+  EXPECT_ZERO(doTestHdfsMiniStress(ti, 1));
+  EXPECT_ZERO(doTestHdfsMiniStress(ti, 0));
+  return 0;
+}
+
+static void testHdfsMiniStress(void *v)
+{
+  struct tlhThreadInfo *ti = (struct tlhThreadInfo*)v;
+  int ret = testHdfsMiniStressImpl(ti);
+  ti->success = ret;
+}
+
+static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads)
+{
+  int i, threadsFailed = 0;
+  const char *sep = "";
+
+  for (i = 0; i < tlhNumThreads; i++) {
+    if (ti[i].success != 0) {
+      threadsFailed = 1;
+    }
+  }
+  if (!threadsFailed) {
+    fprintf(stderr, "testLibHdfsMiniStress: all threads succeeded.  SUCCESS.\n");
+    return EXIT_SUCCESS;
+  }
+  fprintf(stderr, "testLibHdfsMiniStress: some threads failed: [");
+  for (i = 0; i < tlhNumThreads; i++) {
+    if (ti[i].success != 0) {
+      fprintf(stderr, "%s%d", sep, i);
+      sep = ", ";
+    }
+  }
+  fprintf(stderr, "].  FAILURE.\n");
+  return EXIT_FAILURE;
+}
+
+/**
+ * Test intended to stress libhdfs client with concurrent requests. Currently focused
+ * on concurrent reads.
+ */
+int main(void)
+{
+  int i, tlhNumThreads;
+  char *dirNm, *fileNm;
+  tSize fileSz;
+  const char *tlhNumThreadsStr, *tlhNumDNsStr;
+  hdfsFS hdfs = NULL;
+  struct NativeMiniDfsCluster* tlhCluster;
+  struct tlhThreadInfo ti[TLH_MAX_THREADS];
+  struct NativeMiniDfsConf conf = {
+      1, /* doFormat */
+  };
+
+  dirNm = "/tlhMiniStressData";
+  fileNm = "/tlhMiniStressData/file";
+  fileSz = 2*1024*1024;
+
+  tlhNumDNsStr = getenv("TLH_NUM_DNS");
+  if (!tlhNumDNsStr) {
+    tlhNumDNsStr = "1";
+  }
+  conf.numDataNodes = atoi(tlhNumDNsStr);
+  if ((conf.numDataNodes <= 0) || (conf.numDataNodes > TLH_MAX_DNS)) {
+    fprintf(stderr, "testLibHdfsMiniStress: must have a number of datanodes "
+            "between 1 and %d inclusive, not %d\n",
+            TLH_MAX_DNS, conf.numDataNodes);
+    return EXIT_FAILURE;
+  }
+
+  tlhNumThreadsStr = getenv("TLH_NUM_THREADS");
+  if (!tlhNumThreadsStr) {
+    tlhNumThreadsStr = "8";
+  }
+  tlhNumThreads = atoi(tlhNumThreadsStr);
+  if ((tlhNumThreads <= 0) || (tlhNumThreads > TLH_MAX_THREADS)) {
+    fprintf(stderr, "testLibHdfsMiniStress: must have a number of threads "
+            "between 1 and %d inclusive, not %d\n",
+            TLH_MAX_THREADS, tlhNumThreads);
+    return EXIT_FAILURE;
+  }
+  memset(&ti[0], 0, sizeof(ti));
+  for (i = 0; i < tlhNumThreads; i++) {
+    ti[i].threadIdx = i;
+  }
+
+  tlhCluster = nmdCreate(&conf);
+  EXPECT_NONNULL(tlhCluster);
+  EXPECT_ZERO(nmdWaitClusterUp(tlhCluster));
+
+  EXPECT_ZERO(hdfsNameNodeConnect(tlhCluster, &hdfs, NULL));
+
+  // Single threaded writes for now.
+  EXPECT_ZERO(hdfsWriteData(hdfs, dirNm, fileNm, fileSz));
+
+  // Multi-threaded reads.
+  for (i = 0; i < tlhNumThreads; i++) {
+    ti[i].theThread.start = testHdfsMiniStress;
+    ti[i].theThread.arg = &ti[i];
+    ti[i].hdfs = hdfs;
+    ti[i].fileNm = fileNm;
+    EXPECT_ZERO(threadCreate(&ti[i].theThread));
+  }
+  for (i = 0; i < tlhNumThreads; i++) {
+    EXPECT_ZERO(threadJoin(&ti[i].theThread));
+  }
+
+  EXPECT_ZERO(hdfsDisconnect(hdfs));
+  EXPECT_ZERO(nmdShutdown(tlhCluster));
+  nmdFree(tlhCluster);
+  return checkFailures(ti, tlhNumThreads);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d18e3968/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h
index 82109fd..43187a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/events.h
@@ -48,10 +48,8 @@ public:
 enum event_response_type {
   kOk = 0,
 
-#ifndef NDEBUG
   // Responses to be used in testing only
   kTest_Error = 100
-#endif
 };
 
 
@@ -70,10 +68,9 @@ private:
 //
 //   Testing support
 //
-// If running a debug build, the consumer can stimulate errors
+// The consumer can stimulate errors
 // within libhdfdspp by returning a Status from the callback.
 ///////////////////////////////////////////////
-#ifndef NDEBUG
 public:
   static event_response test_err(const Status &status) {
     return event_response(status);
@@ -86,7 +83,6 @@ private:
     response_(event_response_type::kTest_Error), error_status_(status) {}
 
   Status error_status_; // To be used with kTest_Error
-#endif
 };
 
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d18e3968/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
index af7393f..6ec3a4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
@@ -235,9 +235,7 @@ extern const char * FILE_DN_WRITE_EVENT;
 
 
 #define LIBHDFSPP_EVENT_OK (0)
-#ifndef NDEBUG
-  #define DEBUG_SIMULATE_ERROR (-1)
-#endif
+#define DEBUG_SIMULATE_ERROR (-1)
 
 typedef int (*libhdfspp_fs_event_callback)(const char * event, const char * cluster,
                                            int64_t value, int64_t cookie);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d18e3968/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
index 04065b2..a42feae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
@@ -331,6 +331,9 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
       Error(stat);
       return nullptr;
     }
+    if (f && fileEventCallback) {
+      f->SetFileEventCallback(fileEventCallback.value());
+    }
     return new hdfsFile_internal(f);
   } catch (const std::exception & e) {
     ReportException(e);
@@ -959,7 +962,7 @@ event_response fs_callback_glue(libhdfspp_fs_event_callback handler,
   if (result == LIBHDFSPP_EVENT_OK) {
     return event_response::ok();
   }
-#ifndef NDEBUG
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
   if (result == DEBUG_SIMULATE_ERROR) {
     return event_response::test_err(Status::Error("Simulated error"));
   }
@@ -978,7 +981,7 @@ event_response file_callback_glue(libhdfspp_file_event_callback handler,
   if (result == LIBHDFSPP_EVENT_OK) {
     return event_response::ok();
   }
-#ifndef NDEBUG
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
   if (result == DEBUG_SIMULATE_ERROR) {
     return event_response::test_err(Status::Error("Simulated error"));
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d18e3968/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
index 38d50f6..df147d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
@@ -239,7 +239,7 @@ void FileHandleImpl::AsyncPreadSome(
   // Wrap the DN in a block reader to handle the state and logic of the
   //    block request protocol
   std::shared_ptr<BlockReader> reader;
-  reader = CreateBlockReader(BlockReaderOptions(), dn);
+  reader = CreateBlockReader(BlockReaderOptions(), dn, event_handlers_);
 
   // Lambdas cannot capture copies of member variables so we'll make explicit
   //    copies for it
@@ -248,8 +248,8 @@ void FileHandleImpl::AsyncPreadSome(
   auto cluster_name = cluster_name_;
 
   auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) {
-    auto event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred);
-#ifndef NDEBUG
+  event_response event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
     if (event_resp.response() == event_response::kTest_Error) {
       handler(event_resp.status(), dn_id, transferred);
       return;
@@ -262,8 +262,8 @@ void FileHandleImpl::AsyncPreadSome(
   auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name]
           (Status status, std::shared_ptr<DataNodeConnection> dn) {
     (void)dn;
-    auto event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0);
-#ifndef NDEBUG
+    event_response event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
     if (event_resp.response() == event_response::kTest_Error) {
       status = event_resp.status();
     }
@@ -284,9 +284,10 @@ void FileHandleImpl::AsyncPreadSome(
 }
 
 std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options,
-                                               std::shared_ptr<DataNodeConnection> dn)
+                                                               std::shared_ptr<DataNodeConnection> dn,
+                                                               std::shared_ptr<LibhdfsEvents> event_handlers)
 {
-  std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state_);
+  std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state_, event_handlers);
 
   LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateBlockReader(" << FMT_THIS_ADDR
                          << ", ..., dnconn=" << dn.get()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d18e3968/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
index a99550a..57cf4b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
@@ -119,7 +119,8 @@ public:
 
 protected:
   virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
-                                                 std::shared_ptr<DataNodeConnection> dn);
+                                                         std::shared_ptr<DataNodeConnection> dn,
+                                                         std::shared_ptr<hdfs::LibhdfsEvents> event_handlers);
   virtual std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
       ::asio::io_service *io_service,
       const ::hadoop::hdfs::DatanodeInfoProto & dn,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d18e3968/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
index 5052951..6098b9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
@@ -24,7 +24,6 @@
 
 #include <future>
 
-
 namespace hdfs {
 
 #define FMT_CONT_AND_PARENT_ADDR "this=" << (void*)this << ", parent=" << (void*)parent_
@@ -105,7 +104,17 @@ void BlockReaderImpl::AsyncRequestBlock(
   m->Run([this, handler, offset](const Status &status, const State &s) {    Status stat = status;
     if (stat.ok()) {
       const auto &resp = s.response;
-      if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
+
+    if(this->event_handlers_) {
+      event_response event_resp = this->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+      if (stat.ok() && event_resp.response() == event_response::kTest_Error) {
+        stat = Status::Error("Test error");
+      }
+#endif
+    }
+
+      if (stat.ok() && resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
         if (resp.has_readopchecksuminfo()) {
           const auto &checksum_info = resp.readopchecksuminfo();
           chunk_padding_bytes_ = offset - checksum_info.chunkoffset();
@@ -162,6 +171,14 @@ struct BlockReaderImpl::ReadPacketHeader
         assert(v && "Failed to parse the header");
         parent_->state_ = kReadChecksum;
       }
+      if(parent_->event_handlers_) {
+        event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+        if (status.ok() && event_resp.response() == event_response::kTest_Error) {
+          status = Status::Error("Test error");
+        }
+#endif
+      }
       next(status);
     };
 
@@ -214,7 +231,7 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation {
       return;
     }
 
-    auto handler = [parent, next](const asio::error_code &ec, size_t) {
+    auto handler = [parent, next, this](const asio::error_code &ec, size_t) {
       Status status;
       if (ec) {
         status = Status(ec.value(), ec.message().c_str());
@@ -222,6 +239,14 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation {
         parent->state_ =
             parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
       }
+      if(parent->event_handlers_) {
+        event_response event_resp = parent->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+        if (status.ok() && event_resp.response() == event_response::kTest_Error) {
+          status = Status::Error("Test error");
+        }
+#endif
+      }
       next(status);
     };
     parent->checksum_.resize(parent->packet_len_ - sizeof(int) -
@@ -248,7 +273,6 @@ struct BlockReaderImpl::ReadData : continuation::Continuation {
   virtual void Run(const Next &next) override {
     LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadData::Run("
                             << FMT_CONT_AND_PARENT_ADDR << ") called");
-
     auto handler =
         [next, this](const asio::error_code &ec, size_t transferred) {
           Status status;
@@ -261,6 +285,14 @@ struct BlockReaderImpl::ReadData : continuation::Continuation {
           if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
             parent_->state_ = kReadPacketHeader;
           }
+          if(parent_->event_handlers_) {
+            event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+            if (status.ok() && event_resp.response() == event_response::kTest_Error) {
+                status = Status::Error("Test error");
+            }
+#endif
+          }
           next(status);
         };
 
@@ -292,13 +324,22 @@ struct BlockReaderImpl::ReadPadding : continuation::Continuation {
       return;
     }
 
-    auto h = [next, this](const Status &status) {
+    auto h = [next, this](const Status &stat) {
+      Status status = stat;
       if (status.ok()) {
         assert(reinterpret_cast<const int &>(*bytes_transferred_) ==
                parent_->chunk_padding_bytes_);
         parent_->chunk_padding_bytes_ = 0;
         parent_->state_ = kReadData;
       }
+      if(parent_->event_handlers_) {
+        event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+        if (status.ok() && event_resp.response() == event_response::kTest_Error) {
+          status = Status::Error("Test error");
+        }
+#endif
+      }
       next(status);
     };
     read_data_->Run(h);
@@ -334,11 +375,20 @@ struct BlockReaderImpl::AckRead : continuation::Continuation {
     m->Push(
         continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state()));
 
-    m->Run([this, next](const Status &status,
+    m->Run([this, next](const Status &stat,
                         const hadoop::hdfs::ClientReadStatusProto &) {
+      Status status = stat;
       if (status.ok()) {
         parent_->state_ = BlockReaderImpl::kFinished;
       }
+      if(parent_->event_handlers_) {
+        event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+        if (status.ok() && event_resp.response() == event_response::kTest_Error) {
+          status = Status::Error("Test error");
+        }
+#endif
+      }
       next(status);
     });
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d18e3968/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
index f9794b1..b5cbdf5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
@@ -93,9 +93,9 @@ class BlockReaderImpl
     : public BlockReader, public std::enable_shared_from_this<BlockReaderImpl> {
 public:
   explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn,
-                           CancelHandle cancel_state)
+                           CancelHandle cancel_state, std::shared_ptr<LibhdfsEvents> event_handlers=nullptr)
       : dn_(dn), state_(kOpen), options_(options),
-        chunk_padding_bytes_(0), cancel_state_(cancel_state) {}
+        chunk_padding_bytes_(0), cancel_state_(cancel_state), event_handlers_(event_handlers.get()) {}
 
   virtual void AsyncReadPacket(
     const MutableBuffers &buffers,
@@ -152,6 +152,7 @@ private:
   long long bytes_to_read_;
   std::vector<char> checksum_;
   CancelHandle cancel_state_;
+  LibhdfsEvents* event_handlers_;
 };
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d18e3968/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
index 8567932..be6d7bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc
@@ -274,9 +274,18 @@ void RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) {
   }
 
   Status status;
-  if (h.has_exceptionclassname()) {
+  if(event_handlers_) {
+    event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
+    if (event_resp.response() == event_response::kTest_Error) {
+      status = event_resp.status();
+    }
+#endif
+  }
+
+  if (status.ok() && h.has_exceptionclassname()) {
     status =
-        Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str());
+      Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str());
   }
 
   io_service().post([req, response, status]() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d18e3968/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
index 2b47ce1..330f9b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
@@ -30,6 +30,8 @@
 #include <asio/read.hpp>
 #include <asio/write.hpp>
 
+#include <system_error>
+
 namespace hdfs {
 
 template <class NextLayer>
@@ -72,8 +74,9 @@ RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
     : RpcConnection(engine),
       options_(engine->options()),
       next_layer_(engine->io_service()),
-      connect_timer_(engine->io_service()) {
-    LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called");
+      connect_timer_(engine->io_service())
+{
+      LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this);
 }
 
 template <class NextLayer>
@@ -87,7 +90,6 @@ RpcConnectionImpl<NextLayer>::~RpcConnectionImpl() {
     LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue");
 }
 
-
 template <class NextLayer>
 void RpcConnectionImpl<NextLayer>::Connect(
     const std::vector<::asio::ip::tcp::endpoint> &server,
@@ -171,8 +173,8 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec,
 
   Status status = ToStatus(ec);
   if(event_handlers_) {
-    auto event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
-#ifndef NDEBUG
+    event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
     if (event_resp.response() == event_response::kTest_Error) {
       status = event_resp.status();
     }
@@ -349,27 +351,28 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
 
 
 template <class NextLayer>
-void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &asio_ec,
+void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &original_ec,
                                                    size_t) {
   using std::placeholders::_1;
   using std::placeholders::_2;
   std::lock_guard<std::mutex> state_lock(connection_state_lock_);
 
+  ::asio::error_code my_ec(original_ec);
+
   LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called");
 
   std::shared_ptr<RpcConnection> shared_this = shared_from_this();
 
-  ::asio::error_code ec = asio_ec;
   if(event_handlers_) {
-    auto event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
-#ifndef NDEBUG
+    event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
+#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
     if (event_resp.response() == event_response::kTest_Error) {
-        ec = std::make_error_code(std::errc::network_down);
+      my_ec = std::make_error_code(std::errc::network_down);
     }
 #endif
   }
 
-  switch (ec.value()) {
+  switch (my_ec.value()) {
     case 0:
       // No errors
       break;
@@ -377,8 +380,8 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &asi
       // The event loop has been shut down. Ignore the error.
       return;
     default:
-      LOG_WARN(kRPC, << "Network error during RPC read: " << ec.message());
-      CommsError(ToStatus(ec));
+      LOG_WARN(kRPC, << "Network error during RPC read: " << my_ec.message());
+      CommsError(ToStatus(my_ec));
       return;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d18e3968/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
index d0365c3..5de7d53 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
@@ -230,7 +230,6 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
   std::shared_ptr<LibhdfsEvents> event_handlers_;
   std::string cluster_name_;
 
-
   // Lock for mutable parts of this class that need to be thread safe
   std::mutex connection_state_lock_;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d18e3968/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
index b30afb9..032dfc8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
@@ -22,6 +22,9 @@ set (LIBHDFS_TESTS_DIR ../../libhdfs-tests)
 set (LIBHDFSPP_SRC_DIR ..)
 set (LIBHDFSPP_LIB_DIR ${LIBHDFSPP_SRC_DIR}/lib)
 set (LIBHDFSPP_BINDING_C ${LIBHDFSPP_LIB_DIR}/bindings/c)
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-missing-field-initializers")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-missing-field-initializers")
+
 include_directories(
     ${GENERATED_JAVAH}
     ${CMAKE_CURRENT_LIST_DIR}
@@ -138,6 +141,10 @@ build_libhdfs_test(hdfspp_mini_dfs_smoke hdfspp_test_shim_static ${CMAKE_CURRENT
 link_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static fs reader rpc proto common connection gmock_main ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
 add_libhdfs_test  (hdfspp_mini_dfs_smoke hdfspp_test_shim_static)
 
+build_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static expect.c test_libhdfs_mini_stress.c ${OS_DIR}/thread.c)
+link_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
+add_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static)
+
 build_libhdfs_test(hdfs_ext hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hdfs_ext_test.cc)
 link_libhdfs_test (hdfs_ext hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY}  ${SASL_LIBRARIES})
 add_libhdfs_test  (hdfs_ext hdfspp_test_shim_static)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d18e3968/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
index 01d723f..9e3aeb7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
@@ -93,9 +93,10 @@ public:
   std::shared_ptr<MockReader> mock_reader_ = std::make_shared<MockReader>();
 protected:
   std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
-                                                 std::shared_ptr<DataNodeConnection> dn) override
+                                                 std::shared_ptr<DataNodeConnection> dn,
+                                                 std::shared_ptr<hdfs::LibhdfsEvents> event_handlers) override
   {
-    (void) options; (void) dn;
+      (void) options; (void) dn; (void) event_handlers;
     assert(mock_reader_);
     return mock_reader_;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d18e3968/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
index b5f4d9a..defe95d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
@@ -386,7 +386,7 @@ TEST(RpcEngineTest, TestEventCallbacks)
   });
   io_service.run();
   ASSERT_TRUE(complete);
-  ASSERT_EQ(7, callbacks.size());
+  ASSERT_EQ(8, callbacks.size());
   ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error
   ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[1]); // reconnect
   ASSERT_EQ(FS_NN_READ_EVENT, callbacks[2]); // makes an error


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message