tomcat-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Vince Stewart <stewart.vi...@gmail.com>
Subject Re: Share info across different sessions & servers
Date Fri, 05 Jul 2013 23:49:01 GMT
hi Jose,
here is a working demo for exchanging messages between remote hosts using
tribes


import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.catalina.tribes.ByteMessage;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.membership.StaticMember;


/**
 *
 * @author vince
 * demonstration of tribes messaging between remote hosts
 * three jar files are required
   (org.apache)
   catalina.jar
   catalina-tribes.jar
   tomcat-embed-logging-juli.jar

   compile and run with following arguments
   java -jar jarfileName.jar ss
   java -jar jarfileName.jar xx.xx.xx.xx NNNN

  where jarfileName is whatever you call your jar file
  ss is the literal argument "ss" for the machine designated as
"superserver" (sort of arbitrary)

  for machines other than superserver:
  xx.xx.xx.xx is the IPv4 address for the superserver machine
  NNNN is relevant port number

  Note that tribes normally uses port 4000. However if you run the
application at superserver first
  you will then know which port number tribes is using (could be 4001, 4002
...)

  Note also that if the superserver is part of a LAN, there will be a
router involved;
  the remote applications will then need to point to the router IP address
and the router port that
  is redirected to the machine/port running tribes (more of this below)

  If non-superserver machines use a router, these also have to have
appropriate redirections set up.

 Some log output refers to clustering functionality operating.
 This does not seem to interfere with the messaging operations used.
 I have not looked into any possible mechanisms to suppress clustering
threads.
 A very important limitation to remote messaging (and a possible solution)
is discussed here.
http://tomcat.10.x6.nabble
.com/overcoming-a-message-size-limitation-in-tribes-parallel-messaging-with-NioSender-tt4995446.html

 */
public class TribesRemote {
static TribesRemote tribesRemote;
static Charset UTF8=Charset.forName("UTF-8");
static int membersOnLine=0;
ChannelListener msgListener;// = new MyMessageListener(this);
MembershipListener mbrListener;// = new MyMemberListener();
StaticMember superServerStaticMember;
boolean amSuperServer=false;
private long lastMessageReceived;
String remoteHostIPv4Address;
int remoteHostPort;
Channel myChannel;

public static void log(String s){
System.out.println("INFO:  "+s);
}

public static void log(Exception ex){
System.out.println("ERROR  "+ex.getMessage());
//Logger.getLogger(TribesRemote.class.getName()).log(Level.SEVERE, null,
ex);//uncomment here for detailed error msg
}

public static void main(String[]args) throws IOException, ChannelException{
TribesRemote.tribesRemote=new TribesRemote(args);
TribesRemote.tribesRemote.engage();
}

TribesRemote(String[] args) throws IOException, ChannelException{
log(this.addressesForThisMachine());
remoteHostIPv4Address="xx.xx.xx.xx"; //actual superserver address may be
placed here
remoteHostPort=4000; //actual superserver port may be placed here
if(args.length==0){
log("TribesRemote has been lauched with zero arguments");
}
else{
String argz="";
for(int i=0;i<args.length;i++)argz=argz+args[i]+"  ";
log("TribesRemote has been lauched with the following arguments "+argz);
 if(args.length>1){   // otherwise subservers can be started using two
arguments
 remoteHostIPv4Address=args[0]; // IPv4 adddress in xx.xx.xx.xx format
where x is numeric
 remoteHostPort=Integer.parseInt(args[1]); //the port number which tribes
starts on
 }
 if(args[0].equalsIgnoreCase("ss")){ ///NB superserver started with
singular argument "ss"
 amSuperServer=true;
 log("this is deginated superserver and will wait for members to introduce
themselves");
 }
 else{
 superServerStaticMember=new
StaticMember(remoteHostIPv4Address,remoteHostPort,0);
 }
}

                         //tribes always uses 4000 if available (otherwise
it uses 4001, 4002 etc)
                         //this application will output the actual tribes
address => std output
                         //but setting remoteHostPort to 4000 may not be
appropriate if you have a router
                         //if you have a router then the remote host needs
to know the IPv4 for the router and
                         //the redirection port number pointing to a tribes
socket for a LAN machine
                         //for example:
                         //the remote router has fixed IP address say
203.12.32.235
                         //the remote machine running tribes (on p4000) has
fixed LAN address say 192.168.1.63
                         //(in Linux you set fixed LAN IP address in the
'/etc/hosts' file)
                         //lets say you create a port-forwarding in the
router so port 45555 redirects to 192.168.1.63 / port 4000
                         //in such a case the value for
remoteHostIPv4Address will be 203.12.32.235
                         //and remoteHostPort will be 45555
                         //ie {203.12.32.235 , 45555} => redirects to =>
{192.168.1.63 , 4000}
                         //if there is no router (say you have 2 VPS), then
no redirect is required
                         //whereupon remoteHostPort will probably be 4000
(as long as that address is free when tribes starts)


msgListener = new MyMessageListener();
mbrListener = new MyMemberListener();
myChannel = new GroupChannel();
myChannel.addMembershipListener(mbrListener);
myChannel.addChannelListener(msgListener);
myChannel.start(Channel.DEFAULT);
log("TRIBES HAS STARTED ON PORT: "+Integer.toString(getLocalPort()));
}

void sendMessage(String message,Member member) throws ChannelException{
Member[] group = new Member[]{member};
this.myChannel.send(group,new
ByteMessage(message.getBytes(UTF8)),Channel.SEND_OPTIONS_DEFAULT);
}

String addressesForThisMachine() throws SocketException{
String ss="network addresses for this machine: ";
Enumeration<InetAddress>ee;
Enumeration<NetworkInterface>ei;
ei=NetworkInterface.getNetworkInterfaces();
 while(ei.hasMoreElements()){
 ee=((NetworkInterface)ei.nextElement()).getInetAddresses();
  while(ee.hasMoreElements()){
  ss=ss+ee.nextElement().getHostAddress()+";    ";
  }
 }
return ss;
}


/*
 * subserver will run this
 */
private void engageInDialogue(){
 do{
  try{
  this.sendMessage("* hello superserver *",superServerStaticMember);
  }
  catch (ChannelException ex) {
  TribesRemote.log(ex);
  }
  try {
  Thread.currentThread().sleep(10000);
  }
  catch (InterruptedException ex) {
  System.exit(0);
  }
  if(System.currentTimeMillis()-lastMessageTime()>12000){
  log("no messages in last 12 seconds");
  }
 }while(true);
}
/*
 * superserver will run this
 */
private void waitForMessages(){
 do{
  try{
  Thread.currentThread().sleep(10000);
  }
  catch (InterruptedException ex) {
  System.exit(0);
  }
 }while(true);
}

 int getLocalPort() {
 return this.myChannel.getLocalMember(true).getPort();
 }

 private void engage(){
  if(this.amSuperServer){
  this.waitForMessages();
  }
  else{
  this.engageInDialogue();
  }
 }

 private long lastMessageTime() {
 return this.lastMessageReceived;
 }

 void messageReceived(String message, Member sender) throws
ChannelException {
 this.lastMessageReceived=System.currentTimeMillis();
  if(this.amSuperServer){
  this.sendMessage(message + " ((hello back ))",sender);
  }
 }

 private static class MyMessageListener implements ChannelListener{

  @Override
  public void messageReceived(Serializable s,Member sender){
  byte[] b=((ByteMessage)s).getMessage();
  String message=new String(b,TribesRemote.UTF8);
  TribesRemote.log("message received:  "+message);
   try{
   TribesRemote.tribesRemote.messageReceived(message,sender);
   }
   catch (ChannelException ex) {
   TribesRemote.log(ex);
   }
  }

  @Override
  public boolean accept(Serializable msg, Member sender) {
  return true;
  }
 }

 //this is part of clustering that has not been removed from tribes
 private static class MyMemberListener implements MembershipListener{

 @Override
 public void memberAdded(Member member){
 //TribesRemote.tribesRemote.memberDetected(member);
 }

 @Override
 public void memberDisappeared(Member member){
 //TribesRemote.tribesRemote.memberGone(member);
 }

 }







}


On Thu, Jul 4, 2013 at 6:57 PM, Jose MarĂ­a Zaragoza <demablogia@gmail.com>wrote:

> Thanks Vince.
> I'll take a look , but , it doesn't look trivial , not at all
>
> Regards
>
>
> 2013/7/4 Vince Stewart <stewart.vince@gmail.com>
>
> > Hi Jose,
> >
> > a couple of things,
> > 1) I use embedded Tomcat to build my application and this has allowed me
> to
> > maintain 2 single-line patches in tribes classes by adding tribes source
> > code to my compilations. However those patches are only necessary with
> > large messages that take more than 3 seconds to be transmitted from the
> > transmitting machine to the Internet Service Provider machine (approx 0.5
> > meg for my system). There is a config setting (Sender/Transport/timeout)
> > that's supposed to alter this 3 second timeout limit but I'm not sure it
> > works.
> > 2) The implementation is not at all trivial. You have to register
> > StaticMember objects because usual member discovery does not work over
> > wide-area network (WAN). I allocate one machine as "SuperServer" and all
> > other machines have to enroll with SuperServer at startup. All machines
> > need to have a unique combination of Ipv4 address and port number (which
> > might represent a redirection port for use by the router whereupon
> > networked machines also need LAN addresses set). Once registration is
> > complete, all sub-Server machines can send/receive SuperServer and vice
> > versa.
> >
> > There is a tutorial on-line which is adequate but not for WAN. I think
> you
> > have at least two weeks of work in front of you using tribes but I am
> very
> > happy I used this method.
> > None of my code would add much (except confusion) to that in the
> tutorial.
> > Make sure you start without multicast enabled as it currently is suitable
> > only for LAN.
> >
> >  ///Class Constructor
> >  public ServerMessaging() throws SocketException{
> >  this.myChannel=new GroupChannel();
> >  ChannelListener msgListener = new ServerMessaging.MyMessageListener();
> >  MembershipListener mbrListener = new ServerMessaging.MyMemberListener();
> >  myChannel.addMembershipListener(mbrListener);
> >  myChannel.addChannelListener(msgListener);
> >   try{
> >
> >
> >
> myChannel.start(Channel.MBR_TX_SEQ|Channel.MBR_RX_SEQ|Channel.SND_TX_SEQ|Channel.SND_RX_SEQ);//no
> > multicast
> >   }
> >   catch(ChannelException e){
> >   U.log(e);
> >   }
> >  }
> >
> > public void detectOrderNumber_EnrollWithSuperServer() throws
> > ChannelException{
> > setMyServerOrderStatus(); // machine reads its mac address or some file;
> > then from a table will set serverOrderNumber to 0 for superserver ;
> others
> > 1,2,3...
> >  if(this.getServerOrderNumber()==0){  ////meaning this is the superserver
> >  someObject.doSomeThingMaybe();
> >  }
> >  else{
> >  this.sendAckRequiredMessage(0,"Enrollment"); /// first argument
> specifies
> > SuperServer, member 0 (a table will need to be provided to hold IPv4
> > address and port for each member)
> >  }
> > }
> >
>



-- 
Vince Stewart

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message