brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject [1/2] incubator-brooklyn git commit: Refactoring openIptables execution
Date Tue, 27 Oct 2015 12:26:12 GMT
Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master 8457515f6 -> c6273b891


Refactoring openIptables execution


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/3cc2e5b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/3cc2e5b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/3cc2e5b4

Branch: refs/heads/master
Commit: 3cc2e5b4e885c6c998b19747d764f767f9065307
Parents: 3965305
Author: Ivana Yovcheva <ivana.yovcheva@gmail.com>
Authored: Wed Oct 21 13:38:39 2015 +0300
Committer: Ivana Yovcheva <ivana.yovcheva@gmail.com>
Committed: Tue Oct 27 14:10:26 2015 +0200

----------------------------------------------------------------------
 .../entity/machine/MachineInitTasks.java        | 164 ++++++++++++-------
 1 file changed, 106 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3cc2e5b4/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineInitTasks.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineInitTasks.java b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineInitTasks.java
index be791b3..ff540f2 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineInitTasks.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineInitTasks.java
@@ -18,15 +18,12 @@
  */
 package org.apache.brooklyn.entity.machine;
 
-import java.io.ByteArrayOutputStream;
 import java.util.List;
-import java.util.concurrent.Callable;
 
-import com.google.common.collect.ImmutableMap;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
-import org.apache.brooklyn.util.stream.Streams;
+import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,29 +58,35 @@ public class MachineInitTasks {
     }
 
     /**
-     * Returns a queued {@link Task} which opens the given ports in iptables on the given machine.
-     */
-    public Task<Void> openIptablesAsync(final Iterable<Integer> inboundPorts, final SshMachineLocation machine) {
-        return DynamicTasks.queue("open iptables "+toTruncatedString(inboundPorts, 6), new Callable<Void>() {
-            public Void call() {
-                openIptablesImpl(inboundPorts, machine);
-                return null;
-            }
-        });
-    }
-
-    /**
      * Returns a queued {@link Task} which stops iptables on the given machine.
      */
     public Task<Void> stopIptablesAsync(final SshMachineLocation machine) {
-        return DynamicTasks.queue("stop iptables", new Callable<Void>() {
-            public Void call() {
+        return DynamicTasks.queue("stop iptables", new Runnable() {
+            public void run() {
                 stopIptablesImpl(machine);
-                return null;
             }
         });
     }
 
+    protected void stopIptablesImpl(final SshMachineLocation machine) {
+
+        log.info("Stopping iptables for {} at {}", entity(), machine);
+
+        List<String> cmds = ImmutableList.<String>of();
+
+        Task<Integer> checkFirewall = checkLocationFirewall(machine);
+
+        if (checkFirewall.getUnchecked() == 0) {
+            cmds = ImmutableList.of(IptablesCommands.firewalldServiceStop(), IptablesCommands.firewalldServiceStatus());
+        } else {
+            cmds = ImmutableList.of(IptablesCommands.iptablesServiceStop(), IptablesCommands.iptablesServiceStatus());
+        }
+
+
+        subTaskHelperAllowingNonZeroExitCode("execute stop iptables", machine, cmds.toArray(new String[cmds.size()]));
+    }
+
+
     /**
      * See docs in {@link BashCommands#dontRequireTtyForSudo()}
      */
@@ -91,6 +94,17 @@ public class MachineInitTasks {
         return DynamicTasks.queue(SshTasks.dontRequireTtyForSudo(machine, true).newTask().asTask());
     }
 
+    /**
+     * Returns a queued {@link Task} which opens the given ports in iptables on the given machine.
+     */
+    public Task<Void> openIptablesAsync(final Iterable<Integer> inboundPorts, final SshMachineLocation machine) {
+        return DynamicTasks.queue("open iptables "+toTruncatedString(inboundPorts, 6), new Runnable() {
+            public void run() {
+                openIptablesImpl(inboundPorts, machine);
+            }
+        });
+    }
+
     protected void openIptablesImpl(Iterable<Integer> inboundPorts, SshMachineLocation machine) {
         if (inboundPorts == null || Iterables.isEmpty(inboundPorts)) {
             log.info("No ports to open in iptables (no inbound ports) for {} at {}", machine, this);
@@ -98,47 +112,91 @@ public class MachineInitTasks {
             log.info("Opening ports in iptables for {} at {}", entity(), machine);
 
             List<String> iptablesRules = Lists.newArrayList();
+            String iptablesInstallCommands = null;
 
-            if (isLocationFirewalldEnabled(machine)) {
+            Task<Integer> checkFirewall = checkLocationFirewall(machine);
+
+            if (checkFirewall.getUnchecked() == 0) {
                 for (Integer port : inboundPorts) {
                     iptablesRules.add(IptablesCommands.addFirewalldRule(Chain.INPUT, Protocol.TCP, port, Policy.ACCEPT));
                  }
             } else {
                 iptablesRules = createIptablesRulesForNetworkInterface(inboundPorts);
-                iptablesRules.add(IptablesCommands.saveIptablesRules());
-            }
-            List<String> batch = Lists.newArrayList();
-
-            ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-            ByteArrayOutputStream errStream = new ByteArrayOutputStream();
-            Tasks.addTagDynamically(BrooklynTaskTags.tagForStreamSoft(BrooklynTaskTags.STREAM_STDOUT, outStream));
-            Tasks.addTagDynamically(BrooklynTaskTags.tagForStreamSoft(BrooklynTaskTags.STREAM_STDERR, errStream));
-            // Some entities, such as Riak (erlang based) have a huge range of ports, which leads to a script that
-            // is too large to run (fails with a broken pipe). Batch the rules into batches of 50
-            for (String rule : iptablesRules) {
-                batch.add(rule);
-                if (batch.size() == 50) {
-                    machine.execCommands(ImmutableMap.of("out", outStream, "err", errStream), "Inserting iptables rules, 50 command batch", batch);
-                    batch.clear();
-                }
+                iptablesInstallCommands = IptablesCommands.saveIptablesRules();
             }
-            if (batch.size() > 0) {
-                machine.execCommands(ImmutableMap.of("out", outStream, "err", errStream), "Inserting iptables rules", batch);
-            }
-            machine.execCommands(ImmutableMap.of("out", outStream, "err", errStream), "List iptables rules", ImmutableList.of(IptablesCommands.listIptablesRule()));
+
+            insertIptablesRules(iptablesRules, iptablesInstallCommands, machine);
+            listIptablesRules(machine);
         }
     }
 
-    protected void stopIptablesImpl(SshMachineLocation machine) {
-        log.info("Stopping iptables for {} at {}", entity(), machine);
+    /**
+     * Returns a queued {@link Task} which checks if location firewall is enabled.
+     */
+    public Task<Integer> checkLocationFirewall(final SshMachineLocation machine) {
+        return subTaskHelperAllowingNonZeroExitCode("check if firewall is active", machine, IptablesCommands.firewalldServiceIsActive());
+    }
 
-        List<String> cmds = ImmutableList.<String>of();
-        if (isLocationFirewalldEnabled(machine)) {
-            cmds = ImmutableList.of(IptablesCommands.firewalldServiceStop(), IptablesCommands.firewalldServiceStatus());
-        } else {
-            cmds = ImmutableList.of(IptablesCommands.iptablesServiceStop(), IptablesCommands.iptablesServiceStatus());
+    /**
+     * Returns a queued {@link Task} which inserts iptables rules.
+     */
+    private Task<Void> insertIptablesRules(final List<String> iptablesRules, final String installCommands, final SshMachineLocation machine) {
+        return DynamicTasks.queue("insert rules", new Runnable() {
+            public void run() {
+                insertIptablesRulesImpl(iptablesRules, installCommands, machine);
+            }
+        });
+    }
+
+    private void insertIptablesRulesImpl(List<String> iptablesRules, String installCommands, SshMachineLocation machine) {
+
+        // Some entities, such as Riak (erlang based) have a huge range of ports, which leads to a script that
+        // is too large to run (fails with a broken pipe). Batch the rules into batches of 100
+        List<List<String> > batches = Lists.partition(iptablesRules, 100);
+
+        int batchNumber = 0;
+        for (List<String> batch : batches) {
+            batchNumber++;
+            insertIptablesRulesOnCommandBatches(batch, machine, batchNumber);
         }
-        machine.execCommands("Stopping iptables", cmds);
+        if (installCommands != null) {
+            serviceIptablesSave(installCommands, machine);
+        }
+    }
+
+    /**
+     * Returns a queued {@link Task} which inserts iptables rules on command batches.
+     */
+    private Task<Integer> insertIptablesRulesOnCommandBatches(final List<String> commandsBatch, final SshMachineLocation machine, int batchNumber) {
+        return subTaskHelperRequiringZeroExitCode("commands batch " + batchNumber, machine, commandsBatch.toArray(new String[commandsBatch.size()]));
+    }
+
+    /**
+     * Returns a queued {@link Task} which runs iptables save commands.
+     */
+    private Task<Integer> serviceIptablesSave(final String installCommands, final SshMachineLocation machine) {
+        return subTaskHelperRequiringZeroExitCode("save", machine, installCommands);
+    }
+
+    /**
+     * Returns a queued {@link Task} which lists the iptables rules.
+     */
+    private Task<Integer> listIptablesRules(final SshMachineLocation machine) {
+        return subTaskHelperRequiringZeroExitCode("list rules", machine, IptablesCommands.listIptablesRule());
+    }
+
+    private Task<Integer> subTaskHelperRequiringZeroExitCode(String taskName, SshMachineLocation machine, String... comands) {
+        ProcessTaskFactory<Integer> taskFactory = SshTasks.newSshExecTaskFactory(machine, comands)
+                .summary(taskName)
+                .requiringExitCodeZero();
+        return DynamicTasks.queue(taskFactory).asTask();
+    }
+
+    private Task<Integer> subTaskHelperAllowingNonZeroExitCode(String taskName, SshMachineLocation machine, String... comands) {
+        ProcessTaskFactory<Integer> taskFactory = SshTasks.newSshExecTaskFactory(machine, comands)
+                .summary(taskName)
+                .allowingNonZeroExitCode();
+        return DynamicTasks.queue(taskFactory).asTask();
     }
     
     private List<String> createIptablesRulesForNetworkInterface(Iterable<Integer> ports) {
@@ -148,16 +206,6 @@ public class MachineInitTasks {
         }
         return iptablesRules;
      }
-
-    public boolean isLocationFirewalldEnabled(SshMachineLocation location) {
-        int result = location.execCommands("checking if firewalld is active", 
-                ImmutableList.of(IptablesCommands.firewalldServiceIsActive()));
-        if (result == 0) {
-            return true;
-        }
-        
-        return false;
-    }
     
     protected String toTruncatedString(Iterable<?> vals, int maxShown) {
         StringBuilder result = new StringBuilder("[");


Mime
View raw message