001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (c) 2004-2007 Paul Ferraro
004 * 
005 * This library is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Lesser General Public License as published by the 
007 * Free Software Foundation; either version 2.1 of the License, or (at your 
008 * option) any later version.
009 * 
010 * This library is distributed in the hope that it will be useful, but WITHOUT
011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 
012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this library; if not, write to the Free Software Foundation, 
017 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018 * 
019 * Contact: ferraro@users.sourceforge.net
020 */
021package net.sf.hajdbc.distributable;
022
023import java.text.MessageFormat;
024import java.util.Collection;
025import java.util.Set;
026
027import net.sf.hajdbc.DatabaseCluster;
028import net.sf.hajdbc.DatabaseEvent;
029import net.sf.hajdbc.Messages;
030import net.sf.hajdbc.StateManager;
031
032import org.jgroups.Address;
033import org.jgroups.Channel;
034import org.jgroups.Message;
035import org.jgroups.MessageListener;
036import org.jgroups.blocks.GroupRequest;
037import org.jgroups.blocks.MessageDispatcher;
038import org.jgroups.blocks.RequestHandler;
039import org.jgroups.util.Rsp;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * StateManager implementation that broadcasts database activations and deactivations to other group members
045 * and retrieves initial state from another group member.
046 * 
047 * @author Paul Ferraro
048 */
049public class DistributableStateManager extends AbstractMembershipListener implements StateManager, MessageListener, RequestHandler
050{
051        private static final String CHANNEL = "{0}-state"; //$NON-NLS-1$
052        
053        private static Logger logger = LoggerFactory.getLogger(DistributableStateManager.class);
054        
055        private int timeout;
056        private MessageDispatcher dispatcher;
057        private DatabaseCluster<?> databaseCluster;
058        private StateManager stateManager;
059        
060        /**
061         * @param databaseCluster
062         * @param decorator
063         * @throws Exception
064         */
065        public DistributableStateManager(DatabaseCluster<?> databaseCluster, DistributableDatabaseClusterDecorator decorator) throws Exception
066        {
067                super(decorator.createChannel(MessageFormat.format(CHANNEL, databaseCluster.getId())));
068                
069                this.databaseCluster = databaseCluster;
070                
071                this.dispatcher = new MessageDispatcher(this.channel, this, this, this);
072
073                this.timeout = decorator.getTimeout();
074                this.stateManager = databaseCluster.getStateManager();
075        }
076
077        /**
078         * @see org.jgroups.blocks.RequestHandler#handle(org.jgroups.Message)
079         */
080        @SuppressWarnings("unchecked")
081        @Override
082        public Object handle(Message message)
083        {
084                try
085                {
086                        Command<Object> command = (Command) message.getObject();
087        
088                        logger.info(Messages.getMessage(Messages.COMMAND_RECEIVED, command));
089                        
090                        return command.marshalResult(command.execute(this.databaseCluster, this.stateManager));
091                }
092                catch (Throwable e)
093                {
094                        logger.error(e.getMessage(), e);
095                        
096                        return e;
097                }
098        }
099
100        /**
101         * @see net.sf.hajdbc.StateManager#getInitialState()
102         */
103        @Override
104        public Set<String> getInitialState()
105        {
106                Command<Set<String>> command = new QueryInitialStateCommand();
107
108                Collection<Rsp> responses = this.send(command, GroupRequest.GET_FIRST, this.timeout);
109                
110                for (Rsp response: responses)
111                {
112                        Object result = response.getValue();
113                        
114                        if (result != null)
115                        {
116                                Set<String> state = command.unmarshalResult(result);
117                                
118                                logger.info(Messages.getMessage(Messages.INITIAL_CLUSTER_STATE_REMOTE, state, response.getSender()));
119                                
120                                return state;
121                        }
122                }
123
124                return this.stateManager.getInitialState();
125        }
126
127        /**
128         * @see net.sf.hajdbc.DatabaseActivationListener#activated(net.sf.hajdbc.DatabaseEvent)
129         */
130        @Override
131        public void activated(DatabaseEvent event)
132        {
133                if (this.databaseCluster.isActive())
134                {
135                        // Send synchronous notification
136                        this.send(new ActivateCommand(event.getId()), GroupRequest.GET_ALL, 0);
137                }
138                
139                this.stateManager.activated(event);
140        }
141
142        /**
143         * @see net.sf.hajdbc.DatabaseDeactivationListener#deactivated(net.sf.hajdbc.DatabaseEvent)
144         */
145        @Override
146        public void deactivated(DatabaseEvent event)
147        {
148                // Send asynchronous notification
149                this.send(new DeactivateCommand(event.getId()), GroupRequest.GET_NONE, this.timeout);
150                
151                this.stateManager.deactivated(event);
152        }
153
154        private Collection<Rsp> send(Command<?> command, int mode, long timeout)
155        {
156                return this.dispatcher.castMessage(null, this.createMessage(command), mode, timeout).values();
157        }
158        
159        private Message createMessage(Command<?> command)
160        {
161                return new Message(null, this.dispatcher.getChannel().getLocalAddress(), command);
162        }
163        
164        /**
165         * @see net.sf.hajdbc.Lifecycle#start()
166         */
167        @Override
168        public void start() throws Exception
169        {
170                Channel channel = this.dispatcher.getChannel();
171                
172                channel.connect(channel.getClusterName());
173
174                this.dispatcher.start();
175                
176                this.stateManager.start();
177        }
178
179        /**
180         * @see net.sf.hajdbc.Lifecycle#stop()
181         */
182        @Override
183        public void stop()
184        {
185                this.dispatcher.stop();
186                
187                this.dispatcher.getChannel().close();
188                
189                this.stateManager.stop();
190        }
191
192        /**
193         * @see net.sf.hajdbc.distributable.AbstractMembershipListener#memberJoined(org.jgroups.Address)
194         */
195        @Override
196        protected void memberJoined(Address address)
197        {
198                logger.info(Messages.getMessage(Messages.GROUP_MEMBER_JOINED, address, this.databaseCluster));
199        }
200
201        /**
202         * @see net.sf.hajdbc.distributable.AbstractMembershipListener#memberLeft(org.jgroups.Address)
203         */
204        @Override
205        protected void memberLeft(Address address)
206        {
207                logger.info(Messages.getMessage(Messages.GROUP_MEMBER_LEFT, address, this.databaseCluster));
208        }
209
210        /**
211         * @see org.jgroups.MessageListener#getState()
212         */
213        @Override
214        public byte[] getState()
215        {
216                return null;
217        }
218
219        /**
220         * @see org.jgroups.MessageListener#setState(byte[])
221         */
222        @Override
223        public void setState(byte[] state)
224        {
225                // Do nothing
226        }
227
228        /**
229         * @see org.jgroups.MessageListener#receive(org.jgroups.Message)
230         */
231        @Override
232        public void receive(Message message)
233        {
234                // Do nothing
235        }
236}