hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shiv...@apache.org
Subject incubator-hawq git commit: HAWQ-1152. PXF endIteration function added to Bridge
Date Tue, 15 Nov 2016 20:11:12 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master 5fae6af7e -> 1584bb264


HAWQ-1152. PXF endIteration function added to Bridge


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/1584bb26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/1584bb26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/1584bb26

Branch: refs/heads/master
Commit: 1584bb2644fe573ae5a3f9f726ac78c3ae4f7933
Parents: 5fae6af
Author: Shivram Mani <shivram.mani@gmail.com>
Authored: Tue Nov 15 12:11:07 2016 -0800
Committer: Shivram Mani <shivram.mani@gmail.com>
Committed: Tue Nov 15 12:11:07 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/hawq/pxf/service/Bridge.java     |  2 ++
 .../java/org/apache/hawq/pxf/service/ReadBridge.java | 15 ++++++++++++---
 .../apache/hawq/pxf/service/ReadSamplingBridge.java  |  5 +++++
 .../org/apache/hawq/pxf/service/WriteBridge.java     |  8 ++++----
 .../apache/hawq/pxf/service/rest/BridgeResource.java |  5 +++++
 .../hawq/pxf/service/rest/WritableResource.java      |  6 ++++++
 6 files changed, 34 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1584bb26/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
index bfd862a..7160c7c 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
@@ -37,4 +37,6 @@ public interface Bridge {
     boolean setNext(DataInputStream inputStream) throws Exception;
 
     boolean isThreadSafe();
+
+    void endIteration() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1584bb26/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
index 01a95ab..edd0a99 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
@@ -90,7 +90,6 @@ public class ReadBridge implements Bridge {
             while (outputQueue.isEmpty()) {
                 onerow = fileAccessor.readNextObject();
                 if (onerow == null) {
-                    fileAccessor.closeForRead();
                     output = outputBuilder.getPartialLine();
                     if (output != null) {
                         LOG.warn("A partial record in the end of the fragment");
@@ -110,7 +109,6 @@ public class ReadBridge implements Bridge {
             }
         } catch (IOException ex) {
             if (!isDataException(ex)) {
-                fileAccessor.closeForRead();
                 throw ex;
             }
             output = outputBuilder.getErrorOutput(ex);
@@ -127,13 +125,24 @@ public class ReadBridge implements Bridge {
             }
             output = outputBuilder.getErrorOutput(ex);
         } catch (Exception ex) {
-            fileAccessor.closeForRead();
             throw ex;
         }
 
         return output;
     }
 
+    /**
+     * Close the underlying resource
+     */
+    public void endIteration() throws Exception {
+        try {
+            fileAccessor.closeForRead();
+        } catch (Exception e) {
+            LOG.error("Failed to close bridge resources: " + e.getMessage());
+            throw e;
+        }
+    }
+
     public static ReadAccessor getFileAccessor(InputData inputData)
             throws Exception {
         return (ReadAccessor) Utilities.createAnyInstance(InputData.class,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1584bb26/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java
index d5ae66a..77c658d 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java
@@ -125,6 +125,11 @@ public class ReadSamplingBridge implements Bridge {
     }
 
     @Override
+    public void endIteration() throws Exception {
+        bridge.endIteration();
+    }
+
+    @Override
     public boolean isThreadSafe() {
         return bridge.isThreadSafe();
     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1584bb26/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
index c3ee731..fe3d274 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
@@ -72,23 +72,23 @@ public class WriteBridge implements Bridge {
 
         List<OneField> record = inputBuilder.makeInput(inputStream);
         if (record == null) {
-            close();
             return false;
         }
 
         OneRow onerow = fieldsResolver.setFields(record);
         if (onerow == null) {
-            close();
             return false;
         }
         if (!fileAccessor.writeNextObject(onerow)) {
-            close();
             throw new BadRecordException();
         }
         return true;
     }
 
-    private void close() throws Exception {
+    /*
+     * Close the underlying resource
+     */
+    public void endIteration() throws Exception {
         try {
             fileAccessor.closeForWrite();
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1584bb26/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
index 3a062c3..104f353 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
@@ -155,6 +155,11 @@ public class BridgeResource extends RestResource {
                     LOG.debug("Stopped streaming fragment " + fragment
                             + " of resource " + dataDir + ", " + recordCount
                             + " records.");
+                    try {
+                        bridge.endIteration();
+                    } catch (Exception e) {
+                        // ignore ... any significant errors should already have been handled
+                    }
                     if (!threadSafe) {
                         unlock(dataDir);
                     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1584bb26/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/WritableResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/WritableResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/WritableResource.java
index a6c8d6b..bc45f1a 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/WritableResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/WritableResource.java
@@ -163,6 +163,12 @@ public class WritableResource extends RestResource{
         } catch (Exception ex) {
             LOG.debug("totalWritten so far " + totalWritten + " to " + path);
             throw ex;
+        } finally {
+            try {
+                bridge.endIteration();
+            } catch (Exception e) {
+                // ignore ... any significant errors should already have been handled
+            }
         }
 
         String censuredPath = Utilities.maskNonPrintables(path);


Mime
View raw message