manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1544555 - in /manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq: ./ connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/ connector/src/main/resources/org/apache/manifoldcf/agents/output/rabbitmq/
Date Fri, 22 Nov 2013 14:45:54 GMT
Author: kwright
Date: Fri Nov 22 14:45:53 2013
New Revision: 1544555

URL: http://svn.apache.org/r1544555
Log:
Update from Christian Rieck

Modified:
    manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/   (props changed)
    manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/OutboundDocument.java
    manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/RabbitmqOutputConnector.java
    manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/connector/src/main/resources/org/apache/manifoldcf/agents/output/rabbitmq/viewConfiguration.html
    manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/pom.xml

Propchange: manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Fri Nov 22 14:45:53 2013
@@ -0,0 +1,2 @@
+.classpath
+.project

Modified: manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/OutboundDocument.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/OutboundDocument.java?rev=1544555&r1=1544554&r2=1544555&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/OutboundDocument.java (original)
+++ manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/OutboundDocument.java Fri Nov 22 14:45:53 2013
@@ -17,19 +17,14 @@
 
 package org.apache.manifoldcf.agents.output.rabbitmq;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
 import java.io.Writer;
 import java.util.Iterator;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.InflaterOutputStream;
 import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
 import org.apache.manifoldcf.core.common.Base64;
 import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
-import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
@@ -49,6 +44,10 @@ public class OutboundDocument {
 		this.inputStream = document.getBinaryStream();
 	}
 
+        public OutboundDocument(String documentUri) {
+            this.documentURI = documentUri;
+        }
+        
 	public OutboundDocument() {
 	}
 
@@ -64,131 +63,46 @@ public class OutboundDocument {
             this.operation = operation;
         }
         
+        
+        // TODO: write to Logstash format, or support 
+        // a range of inputs.
 	public String writeTo(Writer out) throws JSONException, IOException,
 			ManifoldCFException {
 		JSONObject json = new JSONObject();
 
 		json.put("documentUri", this.documentURI);
-		json.put("acl", this.document.getACL());
-		json.put("acl_deny", this.document.getDenyACL());
-		json.put("acl_share", this.document.getShareACL());
-		json.put("acl_share_deny", this.document.getShareDenyACL());
+                json.put("operation", this.operation);
                 
+                if (operation != Operation.DELETE) {
+                    json.put("acl", this.document.getACL());
+                    json.put("acl_deny", this.document.getDenyACL());
+                    json.put("acl_share", this.document.getShareACL());
+                    json.put("acl_share_deny", this.document.getShareDenyACL());
                 
-
-		JSONObject fields = new JSONObject();
-		json.put("fields", fields);
-		Iterator i = this.document.getFields();
-		while (i.hasNext()) {
-			String fieldName = (String) i.next();
-			String[] fieldValues = this.document.getFieldAsStrings(fieldName);
-			fields.put(fieldName, fieldValues);
-		}
-
-		Base64 base64 = new Base64();
-		StringWriter outputWriter = new StringWriter();
-		base64.encodeStream(this.inputStream, outputWriter);
-
-		JSONObject file = new JSONObject();
-		file.put("content", outputWriter.toString());
-		file.put("name", this.document.getFileName());
+                    JSONObject fields = new JSONObject();
+                    json.put("fields", fields);
+                    Iterator i = this.document.getFields();
+                    while (i.hasNext()) {
+                            String fieldName = (String) i.next();
+                            String[] fieldValues = this.document.getFieldAsStrings(fieldName);
+                            fields.put(fieldName, fieldValues);
+                    }
+
+                    Base64 base64 = new Base64();
+                    StringWriter outputWriter = new StringWriter();
+                    // TODO: We can not, in general, assume we can 
+                    // fit the entire stream in memory. 
+                    base64.encodeStream(this.inputStream, outputWriter);
+
+                    
+                    JSONObject file = new JSONObject();
+                    file.put("content", outputWriter.toString());
+                    file.put("name", this.document.getFileName());
+                    outputWriter.close();
+                    json.put("file", file);
+                }
                 
-		outputWriter.close();
-		json.put("file", file);
                 json.write(out);
-                
                 return json.toString();
-                
-	}
-
-	public void loadString(String content) throws JSONException,
-			ManifoldCFException, IOException {
-		JSONObject json = new JSONObject(content);
-
-		this.document = new RepositoryDocument();
-		if(json.has("file")) {
-			JSONObject file = json.getJSONObject("file");
-			this.document.setFileName(file.getString("name"));
-			Base64 base64 = new Base64();
-			String fileContentB64 = file.getString("content");
-			byte[] bytes = base64.decodeString(fileContentB64);
-			InputStream is = decompress(new ByteArrayInputStream(bytes));
-			this.inputStream = is;
-			int fileBytes = bytes.length;
-			this.document.setBinary(is, fileBytes);
-		}
-        else {
-            this.document.setBinary(new ByteArrayInputStream(new byte[0]), 0);
-        }
-
-		this.documentURI = json.getString("documentUri");
-
-        if(json.has("fields")) {
-            JSONObject fields = json.getJSONObject("fields");
-            Iterator i = fields.keys();
-            while (i.hasNext()) {
-                String fieldName = (String) i.next();
-                JSONArray fieldValues = fields.getJSONArray(fieldName);
-                String[] values = toArray(fieldValues);
-                this.document.addField(fieldName, values);
-            }
-        }
-
-		if(json.has("acl"))
-			this.document.setACL(toArray(json.getJSONArray("acl")));
-		if(json.has("acl_deny"))
-				this.document.setDenyACL(toArray(json.getJSONArray("acl_deny")));
-		if(json.has("acl_share"))
-				this.document.setShareACL(toArray(json.getJSONArray("acl_share")));
-		if(json.has("acl_share_deny"))
-				this.document.setShareDenyACL(toArray(json
-				.getJSONArray("acl_share_deny")));
-	}
-
-	private String[] toArray(JSONArray array) throws JSONException {
-		if (array == null) {
-			return null;
-		}
-		String[] values = new String[array.length()];
-		for (int j = 0; j < array.length(); j++) {
-			values[j] = array.getString(j);
-		}
-		return values;
-	}
-
-	private static InputStream compress(InputStream inputStream)
-			throws IOException {
-		ByteArrayOutputStream out = new ByteArrayOutputStream();
-		DeflaterOutputStream compressor = new DeflaterOutputStream(out);
-		long byteCount = 0L;
-		byte[] buf = new byte[1024];
-		int read;
-		while ((read = inputStream.read(buf)) != -1) {
-			compressor.write(buf, 0, read);
-			byteCount += read;
-		}
-		inputStream.close();
-		out.close();
-		compressor.close();
-		byte[] outBytes = out.toByteArray();
-		return new ByteArrayInputStream(outBytes);
-	}
-
-	private static InputStream decompress(InputStream inputStream)
-			throws IOException {
-		ByteArrayOutputStream out = new ByteArrayOutputStream();
-		InflaterOutputStream compressor = new InflaterOutputStream(out);
-		long byteCount = 0L;
-		byte[] buf = new byte[1024];
-		int read;
-		while ((read = inputStream.read(buf)) != -1) {
-			compressor.write(buf, 0, read);
-			byteCount += read;
-		}
-		inputStream.close();
-		out.close();
-		compressor.close();
-		byte[] outBytes = out.toByteArray();
-		return new ByteArrayInputStream(outBytes);
-	}
+         }
 }

Modified: manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/RabbitmqOutputConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/RabbitmqOutputConnector.java?rev=1544555&r1=1544554&r2=1544555&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/RabbitmqOutputConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/RabbitmqOutputConnector.java Fri Nov 22 14:45:53 2013
@@ -102,7 +102,7 @@ public class RabbitmqOutputConnector ext
     @Override
     public boolean checkDocumentIndexable(String outputDescription,
             File localFile) throws ManifoldCFException, ServiceInterruption {
-        System.out.println(outputDescription);
+       // System.out.println(outputDescription);
                 
         return true;
     }
@@ -143,10 +143,7 @@ public class RabbitmqOutputConnector ext
             IPostParameters variableContext, ConfigParams parameters)
             throws ManifoldCFException {
     	// called each time the tab changes when creating a new output connector, at least
-        System.out.println("ProcessConfigurationPost called.");
-        System.out.println("ConfigParams.size() = " + parameters.getChildCount());
         RabbitmqConfig.contextToConfig(variableContext, parameters);
-        System.out.println("ConfigParams.size() = " + parameters.getChildCount());
         return null;
     }
 
@@ -171,7 +168,25 @@ public class RabbitmqOutputConnector ext
     @Override
     public void removeDocument(String documentURI, String outputDescription, IOutputRemoveActivity activities)
             throws ManifoldCFException, ServiceInterruption {
-        // Does nothing in the base class
+        if (Logging.connectors.isDebugEnabled()) {
+            Logging.connectors.debug("Deleting document: " + documentURI);
+                }
+            connectToRabbitInstance();
+             
+            OutboundDocument rawDocument = new OutboundDocument(documentURI);
+            rawDocument.operation = OutboundDocument.Operation.DELETE;
+            try {
+            String bindingName = "";
+            String json = jsonSerialize(rawDocument);
+            byte[] bytes = json.getBytes();
+            channel.basicPublish(bindingName, rabbitconfig.getQueueName(), MessageProperties.PERSISTENT_BASIC, bytes);
+            activities.recordActivity(null, "deletion message sent", 0l, documentURI, "OK", "Deletion message sendt");
+        } catch (Exception e) {
+            Logging.connectors.error(
+                    "Failed to push to rabbitmq (" + rabbitconfig.getHost() + "): ", e);
+            activities.recordActivity(null, "Failed to push to rabbitmq", 0l, documentURI, "ERROR", e.getMessage());
+        }
+        
     }
 
     @Override
@@ -182,9 +197,17 @@ public class RabbitmqOutputConnector ext
         if (Logging.connectors.isDebugEnabled()) {
             Logging.connectors.debug("New document: " + documentURI);
         }
+        connectToRabbitInstance();
+        if (sendDocument(document, activities, documentURI)) {
+            return 1;
+        }
 
-        factory.setHost(rabbitconfig.getHost());
+        return 0;
+    }
 
+    private void connectToRabbitInstance() throws ManifoldCFException {
+        factory.setHost(rabbitconfig.getHost());
+        
         try {
             if (connection == null || !connection.isOpen()) {
                 connection = factory.newConnection();
@@ -192,33 +215,34 @@ public class RabbitmqOutputConnector ext
             if (channel == null || !connection.isOpen()) {
                 channel = connection.createChannel();
             }
-            
+            // TODO: problems with shutting down client correctly.
+            //  com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
             Map<java.lang.String,java.lang.Object> arguments = null;
-            channel.queueDeclare(rabbitconfig.getQueueName(), 
-                    rabbitconfig.isDurable(), 
-                    rabbitconfig.isExclusive(), 
+            channel.queueDeclare(rabbitconfig.getQueueName(),
+                    rabbitconfig.isDurable(),
+                    rabbitconfig.isExclusive(),
                     rabbitconfig.isAutoDelete(), 
                     arguments);
 
-        } catch (Exception e1) {
-            Logging.connectors.error("Failed to initialize connections to rabbitmq, "+ e1.getMessage());
-            throw new ManifoldCFException("Failed to initialize connections to rabbitmq", e1);
+        } catch (IOException e1) {
+            Logging.connectors.error("Failed to initialize connection to rabbitmq, "+ e1.getMessage());
+            // TODO Log to activities? 
+            throw new ManifoldCFException("Failed to initialize connection to rabbitmq", e1);
         }
+    }
 
+    private boolean sendDocument(RepositoryDocument document, IOutputAddActivity activities, String documentURI) {
         try {
             String bindingName = "";
-            byte[] bytes = writeFile(document);
+            byte[] bytes = convertToRabbitDocument(document);
             channel.basicPublish(bindingName, rabbitconfig.getQueueName(), MessageProperties.PERSISTENT_BASIC, bytes);
-
-            activities.recordActivity(null, "document ingest", new Long(
-                    document.getBinaryLength()), documentURI, "OK", null);
+            activities.recordActivity(null, "document ingest", new Long(document.getBinaryLength()), documentURI, "OK", null);
         } catch (Exception e) {
             Logging.connectors.error(
                     "Failed to push to rabbitmq (" + rabbitconfig.getHost() + "): ", e);
-            return 1;
+            return true;
         }
-
-        return 0;
+        return false;
     }
 
     @Override
@@ -255,26 +279,30 @@ public class RabbitmqOutputConnector ext
   public boolean checkLengthIndexable(String outputDescription, long length)
     throws ManifoldCFException, ServiceInterruption
   {
-      Logging.connectors.error("Document length:, "+ length);
+      // TODO: inspect outputDescription to parse of length, then check. 
+      // Logging.connectors.error("Document length:, "+ length + " vs " + outputDescription);
     return true;
   }
     
-    private byte[] writeFile(RepositoryDocument document) throws IOException, JSONException,
+    private byte[] convertToRabbitDocument(RepositoryDocument document) throws IOException, JSONException,
             ManifoldCFException {
         if (Logging.connectors.isDebugEnabled()) {
             Logging.connectors.debug("Atempting to serialize " + document.getFileName());
         }
         OutboundDocument rawDocument = new OutboundDocument(document);
-        byte[] bytes;
+        String json = jsonSerialize(rawDocument);
+        return json.getBytes();
+    }
+    
+    private String jsonSerialize(OutboundDocument outbound)  throws IOException, JSONException,
+            ManifoldCFException{
 
         StringWriter sw = new StringWriter();
         BufferedWriter out = new BufferedWriter(sw);
-        String jsons = rawDocument.writeTo(out);
-        
-        bytes = jsons.getBytes();
+        String jsons = outbound.writeTo(out);
         
         out.close();
         sw.close();
-        return bytes;
+        return jsons;
     }
 }

Modified: manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/connector/src/main/resources/org/apache/manifoldcf/agents/output/rabbitmq/viewConfiguration.html
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/connector/src/main/resources/org/apache/manifoldcf/agents/output/rabbitmq/viewConfiguration.html?rev=1544555&r1=1544554&r2=1544555&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/connector/src/main/resources/org/apache/manifoldcf/agents/output/rabbitmq/viewConfiguration.html (original)
+++ manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/connector/src/main/resources/org/apache/manifoldcf/agents/output/rabbitmq/viewConfiguration.html Fri Nov 22 14:45:53 2013
@@ -15,18 +15,28 @@
  limitations under the License.
 -->
 
+RabbitmqConnector.HostTabName=RabbitMQ
+RabbitmqConnector.HostLabel=Host:
+RabbitmqConnector.QueueLabel=Queue:
+RabbitmqConnector.DurableLabel=Durable:
+RabbitmqConnector.AutodeleteLabel=Auto Delete:
+RabbitmqConnector.ExclusiveLabel=Exclusive:
+RabbitmqConnector.HostCannotBeNull=Host cannot be null
+RabbitmqConnector.QueueNameCannotBeNull=Queue name cannot be null
+
+
 <table class="displaytable">
   <tr>
-    <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('ElasticSearchConnector.ServerLocation'))</nobr>
+    <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('RabbitmqConnector.HostLabel'))</nobr>
     $Encoder.bodyEscape($ResourceBundle.getString('ElasticSearchConnector.URLColon'))</td>
-    <td class="value">$Encoder.bodyEscape($SERVERLOCATION)</td>
+    <td class="value">$Encoder.bodyEscape($HOST)</td>
   </tr>
     <tr>
-    <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('ElasticSearchConnector.IndexNameColon'))</nobr></td>
-    <td class="value">$Encoder.bodyEscape($INDEXNAME)</td>
+    <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('RabbitmqConnector.QueueLabel'))</nobr></td>
+    <td class="value">$Encoder.bodyEscape($QUEUENAME)</td>
   </tr>
   <tr>
-    <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('ElasticSearchConnector.IndexTypeColon'))</nobr></td>
-    <td class="value">$Encoder.bodyEscape($INDEXTYPE)</td>
+    <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('RabbitmqConnector.DurableLabel'))</nobr></td>
+    <td class="value">$Encoder.bodyEscape($DURABLE)</td>
   </tr>
 </table>
\ No newline at end of file

Modified: manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/pom.xml
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/pom.xml?rev=1544555&r1=1544554&r2=1544555&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/pom.xml (original)
+++ manifoldcf/branches/CONNECTORS-818/connectors/rabbitmq/pom.xml Fri Nov 22 14:45:53 2013
@@ -43,6 +43,9 @@
       <resource>
         <directory>${basedir}/connector/src/main/resources</directory>
       </resource>
+            <resource>
+        <directory>${basedir}/connector/src/main/native2ascii</directory>
+      </resource>
     </resources>
     <plugins>
       <plugin>



Mime
View raw message