usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mru...@apache.org
Subject [03/10] usergrid git commit: Ensure that status is updated properly.
Date Fri, 30 Oct 2015 19:34:17 GMT
Ensure that status is updated properly.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f8c703c0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f8c703c0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f8c703c0

Branch: refs/heads/2.1-release
Commit: f8c703c02c1182ad63ad86587749eb1ae09c202a
Parents: 471dc35
Author: Dave Johnson <snoopdave@apache.org>
Authored: Thu Oct 29 17:57:32 2015 -0400
Committer: Dave Johnson <snoopdave@apache.org>
Committed: Thu Oct 29 17:57:32 2015 -0400

----------------------------------------------------------------------
 .../rest/system/ConnectionResource.java         | 48 +++++++++++++-------
 1 file changed, 32 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c703c0/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java
b/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java
index 6e683ed..14b79f3 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/ConnectionResource.java
@@ -55,6 +55,7 @@ import com.google.common.base.Preconditions;
 import com.sun.jersey.api.json.JSONWithPadding;
 
 import rx.Observable;
+import rx.functions.Action1;
 import rx.schedulers.Schedulers;
 
 
@@ -146,36 +147,51 @@ public class ConnectionResource extends AbstractContextResource {
 
         //start de duping and run in the background
         connectionService.deDupeConnections( applicationScopeObservable ).buffer( 10, TimeUnit.SECONDS,
1000 )
-                         .doOnNext( buffer -> {
+                         .doOnNext(buffer -> {
 
 
-                             final long runningTotal = count.addAndGet( buffer.size() );
+                             final long runningTotal = count.addAndGet(buffer.size());
 
                              final Map<String, Object> status = new HashMap<String,
Object>() {{
-                                 put( "countProcessed", runningTotal );
-                                 put( "updatedTimestamp", System.currentTimeMillis() );
+                                 put("countProcessed", runningTotal);
+                                 put("updatedTimestamp", System.currentTimeMillis());
                              }};
 
-                             statusService.setStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID,
jobId,
-                                 StatusService.Status.INPROGRESS, status ).toBlocking().lastOrDefault(
null );
-                         } ).doOnSubscribe( () -> {
-            statusService.setStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId, StatusService.Status.STARTED,
-                new HashMap<>() ).toBlocking().lastOrDefault( null );
-        } ).doOnCompleted( () -> {
+                             statusService.setStatus(CpNamingUtils.MANAGEMENT_APPLICATION_ID,
jobId,
+                                 StatusService.Status.INPROGRESS, status).toBlocking().lastOrDefault(null);
+                         }).doOnSubscribe(() -> {
+
+            statusService.setStatus(CpNamingUtils.MANAGEMENT_APPLICATION_ID,
+                jobId, StatusService.Status.STARTED, new HashMap<>()).toBlocking().lastOrDefault(null);
+
+        }).doOnCompleted(() -> {
 
             final long runningTotal = count.get();
 
             final Map<String, Object> status = new HashMap<String, Object>()
{{
-                put( "countProcessed", runningTotal );
-                put( "updatedTimestamp", System.currentTimeMillis() );
+                put("countProcessed", runningTotal);
+                put("updatedTimestamp", System.currentTimeMillis());
             }};
 
-            statusService
-                .setStatus( CpNamingUtils.MANAGEMENT_APPLICATION_ID, jobId, StatusService.Status.COMPLETE,
status );
-        } ).subscribeOn( Schedulers.newThread() ).subscribe();
+            statusService.setStatus(CpNamingUtils.MANAGEMENT_APPLICATION_ID,
+                jobId, StatusService.Status.COMPLETE, status).toBlocking().lastOrDefault(null);
+
+        }).doOnError( (throwable) -> {
+            logger.error("Error deduping connections", throwable);
+
+            final Map<String, Object> status = new HashMap<String, Object>()
{{
+                put("error", throwable.getMessage() );
+            }};
+
+            statusService.setStatus(CpNamingUtils.MANAGEMENT_APPLICATION_ID,
+                jobId, StatusService.Status.FAILED, status).toBlocking().lastOrDefault(null);;
+
+        } ).subscribeOn(Schedulers.newThread()).subscribe();
+
 
+        final StatusService.JobStatus status =
+            new StatusService.JobStatus( jobId, StatusService.Status.STARTED, new HashMap<>(
 ) );
 
-        final StatusService.JobStatus status = new StatusService.JobStatus( jobId, StatusService.Status.STARTED,
new HashMap<>(  ) );
         return createResult( status, callback );
     }
 


Mime
View raw message