001/* 002 * HA-JDBC: High-Availability JDBC 003 * Copyright (c) 2004-2007 Paul Ferraro 004 * 005 * This library is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Lesser General Public License as published by the 007 * Free Software Foundation; either version 2.1 of the License, or (at your 008 * option) any later version. 009 * 010 * This library is distributed in the hope that it will be useful, but WITHOUT 011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public License 016 * along with this library; if not, write to the Free Software Foundation, 017 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 018 * 019 * Contact: ferraro@users.sourceforge.net 020 */ 021package net.sf.hajdbc.distributable; 022 023import java.text.MessageFormat; 024import java.util.Collection; 025import java.util.Set; 026 027import net.sf.hajdbc.DatabaseCluster; 028import net.sf.hajdbc.DatabaseEvent; 029import net.sf.hajdbc.Messages; 030import net.sf.hajdbc.StateManager; 031 032import org.jgroups.Address; 033import org.jgroups.Channel; 034import org.jgroups.Message; 035import org.jgroups.MessageListener; 036import org.jgroups.blocks.GroupRequest; 037import org.jgroups.blocks.MessageDispatcher; 038import org.jgroups.blocks.RequestHandler; 039import org.jgroups.util.Rsp; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * StateManager implementation that broadcasts database activations and deactivations to other group members 045 * and retrieves initial state from another group member. 046 * 047 * @author Paul Ferraro 048 */ 049public class DistributableStateManager extends AbstractMembershipListener implements StateManager, MessageListener, RequestHandler 050{ 051 private static final String CHANNEL = "{0}-state"; //$NON-NLS-1$ 052 053 private static Logger logger = LoggerFactory.getLogger(DistributableStateManager.class); 054 055 private int timeout; 056 private MessageDispatcher dispatcher; 057 private DatabaseCluster<?> databaseCluster; 058 private StateManager stateManager; 059 060 /** 061 * @param databaseCluster 062 * @param decorator 063 * @throws Exception 064 */ 065 public DistributableStateManager(DatabaseCluster<?> databaseCluster, DistributableDatabaseClusterDecorator decorator) throws Exception 066 { 067 super(decorator.createChannel(MessageFormat.format(CHANNEL, databaseCluster.getId()))); 068 069 this.databaseCluster = databaseCluster; 070 071 this.dispatcher = new MessageDispatcher(this.channel, this, this, this); 072 073 this.timeout = decorator.getTimeout(); 074 this.stateManager = databaseCluster.getStateManager(); 075 } 076 077 /** 078 * @see org.jgroups.blocks.RequestHandler#handle(org.jgroups.Message) 079 */ 080 @SuppressWarnings("unchecked") 081 @Override 082 public Object handle(Message message) 083 { 084 try 085 { 086 Command<Object> command = (Command) message.getObject(); 087 088 logger.info(Messages.getMessage(Messages.COMMAND_RECEIVED, command)); 089 090 return command.marshalResult(command.execute(this.databaseCluster, this.stateManager)); 091 } 092 catch (Throwable e) 093 { 094 logger.error(e.getMessage(), e); 095 096 return e; 097 } 098 } 099 100 /** 101 * @see net.sf.hajdbc.StateManager#getInitialState() 102 */ 103 @Override 104 public Set<String> getInitialState() 105 { 106 Command<Set<String>> command = new QueryInitialStateCommand(); 107 108 Collection<Rsp> responses = this.send(command, GroupRequest.GET_FIRST, this.timeout); 109 110 for (Rsp response: responses) 111 { 112 Object result = response.getValue(); 113 114 if (result != null) 115 { 116 Set<String> state = command.unmarshalResult(result); 117 118 logger.info(Messages.getMessage(Messages.INITIAL_CLUSTER_STATE_REMOTE, state, response.getSender())); 119 120 return state; 121 } 122 } 123 124 return this.stateManager.getInitialState(); 125 } 126 127 /** 128 * @see net.sf.hajdbc.DatabaseActivationListener#activated(net.sf.hajdbc.DatabaseEvent) 129 */ 130 @Override 131 public void activated(DatabaseEvent event) 132 { 133 if (this.databaseCluster.isActive()) 134 { 135 // Send synchronous notification 136 this.send(new ActivateCommand(event.getId()), GroupRequest.GET_ALL, 0); 137 } 138 139 this.stateManager.activated(event); 140 } 141 142 /** 143 * @see net.sf.hajdbc.DatabaseDeactivationListener#deactivated(net.sf.hajdbc.DatabaseEvent) 144 */ 145 @Override 146 public void deactivated(DatabaseEvent event) 147 { 148 // Send asynchronous notification 149 this.send(new DeactivateCommand(event.getId()), GroupRequest.GET_NONE, this.timeout); 150 151 this.stateManager.deactivated(event); 152 } 153 154 private Collection<Rsp> send(Command<?> command, int mode, long timeout) 155 { 156 return this.dispatcher.castMessage(null, this.createMessage(command), mode, timeout).values(); 157 } 158 159 private Message createMessage(Command<?> command) 160 { 161 return new Message(null, this.dispatcher.getChannel().getLocalAddress(), command); 162 } 163 164 /** 165 * @see net.sf.hajdbc.Lifecycle#start() 166 */ 167 @Override 168 public void start() throws Exception 169 { 170 Channel channel = this.dispatcher.getChannel(); 171 172 channel.connect(channel.getClusterName()); 173 174 this.dispatcher.start(); 175 176 this.stateManager.start(); 177 } 178 179 /** 180 * @see net.sf.hajdbc.Lifecycle#stop() 181 */ 182 @Override 183 public void stop() 184 { 185 this.dispatcher.stop(); 186 187 this.dispatcher.getChannel().close(); 188 189 this.stateManager.stop(); 190 } 191 192 /** 193 * @see net.sf.hajdbc.distributable.AbstractMembershipListener#memberJoined(org.jgroups.Address) 194 */ 195 @Override 196 protected void memberJoined(Address address) 197 { 198 logger.info(Messages.getMessage(Messages.GROUP_MEMBER_JOINED, address, this.databaseCluster)); 199 } 200 201 /** 202 * @see net.sf.hajdbc.distributable.AbstractMembershipListener#memberLeft(org.jgroups.Address) 203 */ 204 @Override 205 protected void memberLeft(Address address) 206 { 207 logger.info(Messages.getMessage(Messages.GROUP_MEMBER_LEFT, address, this.databaseCluster)); 208 } 209 210 /** 211 * @see org.jgroups.MessageListener#getState() 212 */ 213 @Override 214 public byte[] getState() 215 { 216 return null; 217 } 218 219 /** 220 * @see org.jgroups.MessageListener#setState(byte[]) 221 */ 222 @Override 223 public void setState(byte[] state) 224 { 225 // Do nothing 226 } 227 228 /** 229 * @see org.jgroups.MessageListener#receive(org.jgroups.Message) 230 */ 231 @Override 232 public void receive(Message message) 233 { 234 // Do nothing 235 } 236}