001/* 002 * HA-JDBC: High-Availability JDBC 003 * Copyright (c) 2004-2007 Paul Ferraro 004 * 005 * This library is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Lesser General Public License as published by the 007 * Free Software Foundation; either version 2.1 of the License, or (at your 008 * option) any later version. 009 * 010 * This library is distributed in the hope that it will be useful, but WITHOUT 011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public License 016 * along with this library; if not, write to the Free Software Foundation, 017 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 018 * 019 * Contact: ferraro@users.sourceforge.net 020 */ 021package net.sf.hajdbc.sync; 022 023import java.sql.Connection; 024import java.sql.PreparedStatement; 025import java.sql.ResultSet; 026import java.sql.SQLException; 027import java.sql.Statement; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collection; 031import java.util.Collections; 032import java.util.List; 033import java.util.concurrent.Callable; 034import java.util.concurrent.ExecutionException; 035import java.util.concurrent.ExecutorService; 036import java.util.concurrent.Future; 037 038import net.sf.hajdbc.Dialect; 039import net.sf.hajdbc.Messages; 040import net.sf.hajdbc.SynchronizationContext; 041import net.sf.hajdbc.SynchronizationStrategy; 042import net.sf.hajdbc.TableProperties; 043import net.sf.hajdbc.UniqueConstraint; 044import net.sf.hajdbc.util.SQLExceptionFactory; 045import net.sf.hajdbc.util.Strings; 046 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050/** 051 * Database-independent synchronization strategy that only updates differences between two databases. 052 * This strategy is best used when there are <em>few</em> differences between the active database and the inactive database (i.e. barely out of sync). 053 * The following algorithm is used: 054 * <ol> 055 * <li>Drop the foreign keys on the inactive database (to avoid integrity constraint violations)</li> 056 * <li>For each database table: 057 * <ol> 058 * <li>Drop the unique constraints on the table (to avoid integrity constraint violations)</li> 059 * <li>Find the primary key(s) of the table</li> 060 * <li>Query all rows in the inactive database table, sorting by the primary key(s)</li> 061 * <li>Query all rows on the active database table</li> 062 * <li>For each row in table: 063 * <ol> 064 * <li>If primary key of the rows are the same, determine whether or not row needs to be updated</li> 065 * <li>Otherwise, determine whether row should be deleted, or a new row is to be inserted</li> 066 * </ol> 067 * </li> 068 * <li>Re-create the unique constraints on the table (to avoid integrity constraint violations)</li> 069 * </ol> 070 * </li> 071 * <li>Re-create the foreign keys on the inactive database</li> 072 * <li>Synchronize sequences</li> 073 * </ol> 074 * @author Paul Ferraro 075 */ 076public class DifferentialSynchronizationStrategy implements SynchronizationStrategy 077{ 078 private static Logger logger = LoggerFactory.getLogger(DifferentialSynchronizationStrategy.class); 079 080 private int fetchSize = 0; 081 private int maxBatchSize = 100; 082 083 /** 084 * @see net.sf.hajdbc.SynchronizationStrategy#synchronize(net.sf.hajdbc.SynchronizationContext) 085 */ 086 @Override 087 public <D> void synchronize(SynchronizationContext<D> context) throws SQLException 088 { 089 Connection sourceConnection = context.getConnection(context.getSourceDatabase()); 090 Connection targetConnection = context.getConnection(context.getTargetDatabase()); 091 092 Dialect dialect = context.getDialect(); 093 ExecutorService executor = context.getExecutor(); 094 095 boolean autoCommit = targetConnection.getAutoCommit(); 096 097 targetConnection.setAutoCommit(true); 098 099 SynchronizationSupport.dropForeignKeys(context); 100 SynchronizationSupport.dropUniqueConstraints(context); 101 102 targetConnection.setAutoCommit(false); 103 104 try 105 { 106 for (TableProperties table: context.getSourceDatabaseProperties().getTables()) 107 { 108 String tableName = table.getName(); 109 110 UniqueConstraint primaryKey = table.getPrimaryKey(); 111 112 if (primaryKey == null) 113 { 114 throw new SQLException(Messages.getMessage(Messages.PRIMARY_KEY_REQUIRED, this.getClass().getName(), tableName)); 115 } 116 117 List<String> primaryKeyColumnList = primaryKey.getColumnList(); 118 119 Collection<String> columns = table.getColumns(); 120 121 // List of colums for select statement - starting with primary key 122 List<String> columnList = new ArrayList<String>(columns.size()); 123 124 columnList.addAll(primaryKeyColumnList); 125 126 for (String column: columns) 127 { 128 if (!primaryKeyColumnList.contains(column)) 129 { 130 columnList.add(column); 131 } 132 } 133 134 List<String> nonPrimaryKeyColumnList = columnList.subList(primaryKeyColumnList.size(), columnList.size()); 135 136 String commaDelimitedColumns = Strings.join(columnList, Strings.PADDED_COMMA); 137 138 // Retrieve table rows in primary key order 139 final String selectSQL = "SELECT " + commaDelimitedColumns + " FROM " + tableName + " ORDER BY " + Strings.join(primaryKeyColumnList, Strings.PADDED_COMMA); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ 140 141 final Statement targetStatement = targetConnection.createStatement(); 142 143 targetStatement.setFetchSize(this.fetchSize); 144 145 logger.debug(selectSQL); 146 147 Callable<ResultSet> callable = new Callable<ResultSet>() 148 { 149 public ResultSet call() throws SQLException 150 { 151 return targetStatement.executeQuery(selectSQL); 152 } 153 }; 154 155 Future<ResultSet> future = executor.submit(callable); 156 157 Statement sourceStatement = sourceConnection.createStatement(); 158 sourceStatement.setFetchSize(this.fetchSize); 159 160 ResultSet sourceResultSet = sourceStatement.executeQuery(selectSQL); 161 162 ResultSet inactiveResultSet = future.get(); 163 164 String primaryKeyWhereClause = " WHERE " + Strings.join(primaryKeyColumnList, " = ? AND ") + " = ?"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ 165 166 // Construct DELETE SQL 167 String deleteSQL = "DELETE FROM " + tableName + primaryKeyWhereClause; //$NON-NLS-1$ 168 169 logger.debug(deleteSQL); 170 171 PreparedStatement deleteStatement = targetConnection.prepareStatement(deleteSQL); 172 173 // Construct INSERT SQL 174 String insertSQL = "INSERT INTO " + tableName + " (" + commaDelimitedColumns + ") VALUES (" + Strings.join(Collections.nCopies(columnList.size(), Strings.QUESTION), Strings.PADDED_COMMA) + ")"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ 175 176 logger.debug(insertSQL); 177 178 PreparedStatement insertStatement = targetConnection.prepareStatement(insertSQL); 179 180 // Construct UPDATE SQL 181 PreparedStatement updateStatement = null; 182 183 if (!nonPrimaryKeyColumnList.isEmpty()) 184 { 185 String updateSQL = "UPDATE " + tableName + " SET " + Strings.join(nonPrimaryKeyColumnList, " = ?, ") + " = ?" + primaryKeyWhereClause; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ 186 187 logger.debug(updateSQL); 188 189 updateStatement = targetConnection.prepareStatement(updateSQL); 190 } 191 192 boolean hasMoreActiveResults = sourceResultSet.next(); 193 boolean hasMoreInactiveResults = inactiveResultSet.next(); 194 195 int insertCount = 0; 196 int updateCount = 0; 197 int deleteCount = 0; 198 199 while (hasMoreActiveResults || hasMoreInactiveResults) 200 { 201 int compare = 0; 202 203 if (!hasMoreActiveResults) 204 { 205 compare = 1; 206 } 207 else if (!hasMoreInactiveResults) 208 { 209 compare = -1; 210 } 211 else 212 { 213 for (int i = 1; i <= primaryKeyColumnList.size(); ++i) 214 { 215 Object activeObject = sourceResultSet.getObject(i); 216 Object inactiveObject = inactiveResultSet.getObject(i); 217 218 // We assume that the primary keys column types are Comparable 219 compare = this.compare(activeObject, inactiveObject); 220 221 if (compare != 0) 222 { 223 break; 224 } 225 } 226 } 227 228 if (compare > 0) 229 { 230 deleteStatement.clearParameters(); 231 232 for (int i = 1; i <= primaryKeyColumnList.size(); ++i) 233 { 234 int type = dialect.getColumnType(table.getColumnProperties(columnList.get(i - 1))); 235 236 deleteStatement.setObject(i, inactiveResultSet.getObject(i), type); 237 } 238 239 deleteStatement.addBatch(); 240 241 deleteCount += 1; 242 243 if ((deleteCount % this.maxBatchSize) == 0) 244 { 245 deleteStatement.executeBatch(); 246 deleteStatement.clearBatch(); 247 } 248 } 249 else if (compare < 0) 250 { 251 insertStatement.clearParameters(); 252 253 for (int i = 1; i <= columnList.size(); ++i) 254 { 255 int type = dialect.getColumnType(table.getColumnProperties(columnList.get(i - 1))); 256 257 Object object = SynchronizationSupport.getObject(sourceResultSet, i, type); 258 259 if (sourceResultSet.wasNull()) 260 { 261 insertStatement.setNull(i, type); 262 } 263 else 264 { 265 insertStatement.setObject(i, object, type); 266 } 267 } 268 269 insertStatement.addBatch(); 270 271 insertCount += 1; 272 273 if ((insertCount % this.maxBatchSize) == 0) 274 { 275 insertStatement.executeBatch(); 276 insertStatement.clearBatch(); 277 } 278 } 279 else if (updateStatement != null) // if (compare == 0) 280 { 281 updateStatement.clearParameters(); 282 283 boolean updated = false; 284 285 for (int i = primaryKeyColumnList.size() + 1; i <= columnList.size(); ++i) 286 { 287 int type = dialect.getColumnType(table.getColumnProperties(columnList.get(i - 1))); 288 289 Object activeObject = SynchronizationSupport.getObject(sourceResultSet, i, type); 290 Object inactiveObject = SynchronizationSupport.getObject(inactiveResultSet, i, type); 291 292 int index = i - primaryKeyColumnList.size(); 293 294 if (sourceResultSet.wasNull()) 295 { 296 updateStatement.setNull(index, type); 297 298 updated |= !inactiveResultSet.wasNull(); 299 } 300 else 301 { 302 updateStatement.setObject(index, activeObject, type); 303 304 updated |= inactiveResultSet.wasNull(); 305 updated |= !equals(activeObject, inactiveObject); 306 } 307 } 308 309 if (updated) 310 { 311 for (int i = 1; i <= primaryKeyColumnList.size(); ++i) 312 { 313 int type = dialect.getColumnType(table.getColumnProperties(columnList.get(i - 1))); 314 315 updateStatement.setObject(i + nonPrimaryKeyColumnList.size(), inactiveResultSet.getObject(i), type); 316 } 317 318 updateStatement.addBatch(); 319 320 updateCount += 1; 321 322 if ((updateCount % this.maxBatchSize) == 0) 323 { 324 updateStatement.executeBatch(); 325 updateStatement.clearBatch(); 326 } 327 } 328 } 329 330 if (hasMoreActiveResults && (compare <= 0)) 331 { 332 hasMoreActiveResults = sourceResultSet.next(); 333 } 334 335 if (hasMoreInactiveResults && (compare >= 0)) 336 { 337 hasMoreInactiveResults = inactiveResultSet.next(); 338 } 339 } 340 341 if ((deleteCount % this.maxBatchSize) > 0) 342 { 343 deleteStatement.executeBatch(); 344 } 345 346 deleteStatement.close(); 347 348 if ((insertCount % this.maxBatchSize) > 0) 349 { 350 insertStatement.executeBatch(); 351 } 352 353 insertStatement.close(); 354 355 if (updateStatement != null) 356 { 357 if ((updateCount % this.maxBatchSize) > 0) 358 { 359 updateStatement.executeBatch(); 360 } 361 362 updateStatement.close(); 363 } 364 365 targetStatement.close(); 366 sourceStatement.close(); 367 368 targetConnection.commit(); 369 370 logger.info(Messages.getMessage(Messages.INSERT_COUNT, insertCount, tableName)); 371 logger.info(Messages.getMessage(Messages.UPDATE_COUNT, updateCount, tableName)); 372 logger.info(Messages.getMessage(Messages.DELETE_COUNT, deleteCount, tableName)); 373 } 374 } 375 catch (ExecutionException e) 376 { 377 SynchronizationSupport.rollback(targetConnection); 378 379 throw SQLExceptionFactory.createSQLException(e.getCause()); 380 } 381 catch (InterruptedException e) 382 { 383 SynchronizationSupport.rollback(targetConnection); 384 385 throw SQLExceptionFactory.createSQLException(e.getCause()); 386 } 387 catch (SQLException e) 388 { 389 SynchronizationSupport.rollback(targetConnection); 390 391 throw e; 392 } 393 394 targetConnection.setAutoCommit(true); 395 396 SynchronizationSupport.restoreUniqueConstraints(context); 397 SynchronizationSupport.restoreForeignKeys(context); 398 399 SynchronizationSupport.synchronizeIdentityColumns(context); 400 SynchronizationSupport.synchronizeSequences(context); 401 402 targetConnection.setAutoCommit(autoCommit); 403 } 404 405 private boolean equals(Object object1, Object object2) 406 { 407 if ((object1 instanceof byte[]) && (object2 instanceof byte[])) 408 { 409 byte[] bytes1 = (byte[]) object1; 410 byte[] bytes2 = (byte[]) object2; 411 412 if (bytes1.length != bytes2.length) 413 { 414 return false; 415 } 416 417 return Arrays.equals(bytes1, bytes2); 418 } 419 420 return object1.equals(object2); 421 } 422 423 @SuppressWarnings("unchecked") 424 private int compare(Object object1, Object object2) 425 { 426 return ((Comparable) object1).compareTo(object2); 427 } 428 429 /** 430 * @return the fetchSize. 431 */ 432 public int getFetchSize() 433 { 434 return this.fetchSize; 435 } 436 437 /** 438 * @param fetchSize the fetchSize to set. 439 */ 440 public void setFetchSize(int fetchSize) 441 { 442 this.fetchSize = fetchSize; 443 } 444 445 /** 446 * @return Returns the maxBatchSize. 447 */ 448 public int getMaxBatchSize() 449 { 450 return this.maxBatchSize; 451 } 452 453 /** 454 * @param maxBatchSize The maxBatchSize to set. 455 */ 456 public void setMaxBatchSize(int maxBatchSize) 457 { 458 this.maxBatchSize = maxBatchSize; 459 } 460}