@Test @SneakyThrows publicvoidtestTransaction(){ try (Connection conn = DriverManager.getConnection(connectString)) { conn.setAutoCommit(false); try (PreparedStatement psts = conn.prepareStatement("update words set word=CONCAT(word, '++') where id=?")) { // 第一个更新语句 psts.setInt(1, 2); psts.executeUpdate(); // 第二个更新语句 // 抛出异常 int i = 1/0; psts.setInt(1, 3); psts.executeUpdate(); // 提交事务 conn.commit(); } catch (Throwable t) { conn.rollback(); }
} }
结果:
1 2 3 4 5 6 7 8
2021-03-23T03:07:15.228849Z 93 Connect root@localhost on test using TCP/IP
2021-03-23T03:07:15.235215Z 93 Query /* mysql-connector-java-8.0.20 (Revision: afc0a13cd3c5a0bf57eaa809ee0ee6df1fd5ac9b) */SELECT @@session.auto_increment_increment AS auto_increment_increment, @@character_set_client AS character_set_client, @@character_set_connection AS character_set_connection, @@character_set_results AS character_set_results, @@character_set_server AS character_set_server, @@collation_server AS collation_server, @@collation_connection AS collation_connection, @@init_connect AS init_connect, @@interactive_timeout AS interactive_timeout, @@license AS license, @@lower_case_table_names AS lower_case_table_names, @@max_allowed_packet AS max_allowed_packet, @@net_write_timeout AS net_write_timeout, @@performance_schema AS performance_schema, @@sql_mode AS sql_mode, @@system_time_zone AS system_time_zone, @@time_zone AS time_zone, @@transaction_isolation AS transaction_isolation, @@wait_timeout AS wait_timeout
2021-03-23T03:07:15.257727Z 93 Query SET character_set_results = NULL
2021-03-23T03:07:15.261596Z 93 Query SET autocommit=0
2021-03-23T03:07:15.296076Z 93 Query update words set word=CONCAT(word, '++') where id=2
2021-03-23T03:07:15.305666Z 93 Query rollback
2021-03-23T03:07:15.330138Z 93 Query rollback
2021-03-23T03:07:15.347768Z 93 Quit
This is the central interface in Spring’s transaction infrastructure. Applications can use this directly, but it is not primarily meant as API: Typically, applications will work with either TransactionTemplate or declarative transaction demarcation through AOP.
/**
 * Return a currently active transaction or create a new one, according to
 * the specified propagation behavior.
 * <p>Note that parameters like isolation level or timeout will only be applied
 * to new transactions, and thus be ignored when participating in active ones.
 * <p>Furthermore, not all transaction definition settings will be supported
 * by every transaction manager: A proper transaction manager implementation
 * should throw an exception when unsupported settings are encountered.
 * <p>An exception to the above rule is the read-only flag, which should be
 * ignored if no explicit read-only mode is supported. Essentially, the
 * read-only flag is just a hint for potential optimization.
 * @param definition TransactionDefinition instance (can be {@code null} for defaults),
 * describing propagation behavior, isolation level, timeout etc.
 * @return transaction status object representing the new or current transaction
 * @throws TransactionException in case of lookup, creation, or system errors
 * @throws IllegalTransactionStateException if the given transaction definition
 * cannot be executed (for example, if a currently active transaction is in
 * conflict with the specified propagation behavior)
 * @see TransactionDefinition#getPropagationBehavior
 * @see TransactionDefinition#getIsolationLevel
 * @see TransactionDefinition#getTimeout
 * @see TransactionDefinition#isReadOnly
 */
TransactionStatus getTransaction(TransactionDefinition definition)throws TransactionException;
/** * Commit the given transaction, with regard to its status. If the transaction * has been marked rollback-only programmatically, perform a rollback. * <p>If the transaction wasn't a new one, omit the commit for proper * participation in the surrounding transaction. If a previous transaction * has been suspended to be able to create a new one, resume the previous * transaction after committing the new one. * <p>Note that when the commit call completes, no matter if normally or * throwing an exception, the transaction must be fully completed and * cleaned up. No rollback call should be expected in such a case. * <p>If this method throws an exception other than a TransactionException, * then some before-commit error caused the commit attempt to fail. For * example, an O/R Mapping tool might have tried to flush changes to the * database right before commit, with the resulting DataAccessException * causing the transaction to fail. The original exception will be * propagated to the caller of this commit method in such a case. * @param status object returned by the {@code getTransaction} method * @throws UnexpectedRollbackException in case of an unexpected rollback * that the transaction coordinator initiated * @throws HeuristicCompletionException in case of a transaction failure * caused by a heuristic decision on the side of the transaction coordinator * @throws TransactionSystemException in case of commit or system errors * (typically caused by fundamental resource failures) * @throws IllegalTransactionStateException if the given transaction * is already completed (that is, committed or rolled back) * @see TransactionStatus#setRollbackOnly */ voidcommit(TransactionStatus status)throws TransactionException;
/** * Perform a rollback of the given transaction. * <p>If the transaction wasn't a new one, just set it rollback-only for proper * participation in the surrounding transaction. If a previous transaction * has been suspended to be able to create a new one, resume the previous * transaction after rolling back the new one. * <p><b>Do not call rollback on a transaction if commit threw an exception.</b> * The transaction will already have been completed and cleaned up when commit * returns, even in case of a commit exception. Consequently, a rollback call * after commit failure will lead to an IllegalTransactionStateException. * @param status object returned by the {@code getTransaction} method * @throws TransactionSystemException in case of rollback or system errors * (typically caused by fundamental resource failures) * @throws IllegalTransactionStateException if the given transaction * is already completed (that is, committed or rolled back) */ voidrollback(TransactionStatus status)throws TransactionException;
// org.springframework.transaction.support.AbstractPlatformTransactionManager#commit /** * This implementation of commit handles participating in existing * transactions and programmatic rollback requests. * Delegates to {@code isRollbackOnly}, {@code doCommit} * and {@code rollback}. * @see org.springframework.transaction.TransactionStatus#isRollbackOnly() * @see #doCommit * @see #rollback */ @Override publicfinalvoidcommit(TransactionStatus status)throws TransactionException { if (status.isCompleted()) { thrownew IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); }
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus); return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus); // Throw UnexpectedRollbackException only at outermost transaction boundary // or if explicitly asked to. if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { thrownew UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } return; }
processCommit(defStatus); }
// org.springframework.transaction.support.AbstractPlatformTransactionManager#processCommit /** * Process an actual commit. * Rollback-only flags have already been checked and applied. * @param status object representing the transaction * @throws TransactionException in case of commit failure */ privatevoidprocessCommit(DefaultTransactionStatus status)throws TransactionException { try { boolean beforeCompletionInvoked = false; try { prepareForCommit(status); // 下面的几个方法会通知之前注册的TransactionSynchronization,告知事务的状态 triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; boolean globalRollbackOnly = false; if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { globalRollbackOnly = status.isGlobalRollbackOnly(); } if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } status.releaseHeldSavepoint(); } elseif (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } // template method // 子类负责实现 doCommit(status); } // Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. 
if (globalRollbackOnly) { thrownew UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit // 通知回调函数 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { // 通知回调函数 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException ex) { if (!beforeCompletionInvoked) { // 通知回调函数 triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; } catch (Error err) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, err); throw err; }
// Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); }
// org.springframework.transaction.support.AbstractPlatformTransactionManager#rollback /** * This implementation of rollback handles participating in existing * transactions. Delegates to {@code doRollback} and * {@code doSetRollbackOnly}. * @see #doRollback * @see #doSetRollbackOnly */ @Override publicfinalvoidrollback(TransactionStatus status)throws TransactionException { if (status.isCompleted()) { thrownew IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); }
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers, // so we don't want to do it unnecessarily (for example if we've explicitly // configured the connection pool to set it already). // 连接置为手动commit if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } con.setAutoCommit(false); } // 标记事务开始 txObject.getConnectionHolder().setTransactionActive(true);
// 事务超时时间 int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); }
// Bind the session holder to the thread. // 新创建的连接,需要交给spring管理(ThreadLocal) // DataSource --> ConnectionHolder if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); } }
catch (Throwable ex) { // 异常时释放连接 if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, this.dataSource); txObject.setConnectionHolder(null, false); } thrownew CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } }
提交事务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit @Override protectedvoiddoCommit(DefaultTransactionStatus status){ DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Committing JDBC transaction on Connection [" + con + "]"); } try { con.commit(); } catch (SQLException ex) { thrownew TransactionSystemException("Could not commit JDBC transaction", ex); } }
回滚事务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
@Override protectedvoiddoRollback(DefaultTransactionStatus status){ DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Rolling back JDBC transaction on Connection [" + con + "]"); } try { con.rollback(); } catch (SQLException ex) { thrownew TransactionSystemException("Could not roll back JDBC transaction", ex); } }
// Remove the connection holder from the thread, if exposed. // 清理ThreadLocal里绑定的连接 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.unbindResource(this.dataSource); }
// Reset connection. // 把conn 恢复原样 Connection con = txObject.getConnectionHolder().getConnection(); try { if (txObject.isMustRestoreAutoCommit()) { con.setAutoCommit(true); } DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel()); } catch (Throwable ex) { logger.debug("Could not reset JDBC Connection after transaction", ex); }
if (txObject.isNewConnectionHolder()) { if (logger.isDebugEnabled()) { logger.debug("Releasing JDBC Connection [" + con + "] after transaction"); } // 减少Conn计数 DataSourceUtils.releaseConnection(con, this.dataSource); } // 清空holder txObject.getConnectionHolder().clear(); }
// org.springframework.transaction.support.TransactionSynchronizationManager#bindResource /** * Bind the given resource for the given key to the current thread. * @param key the key to bind the value to (usually the resource factory) * @param value the value to bind (usually the active resource object) * @throws IllegalStateException if there is already a value bound to the thread * @see ResourceTransactionManager#getResourceFactory() */ publicstaticvoidbindResource(Object key, Object value)throws IllegalStateException { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Assert.notNull(value, "Value must not be null"); Map<Object, Object> map = resources.get(); // set ThreadLocal Map if none found if (map == null) { map = new HashMap<Object, Object>(); resources.set(map); } Object oldValue = map.put(actualKey, value); // Transparently suppress a ResourceHolder that was marked as void... if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) { oldValue = null; } // 事务挂起的时候要清理绑定的资源,不然开启新事务时,同一个dataSource会抛异常 if (oldValue != null) { thrownew IllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } if (logger.isTraceEnabled()) { logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" + Thread.currentThread().getName() + "]"); } }
/** * Unbind a resource for the given key from the current thread. * @param key the key to unbind (usually the resource factory) * @return the previously bound value (usually the active resource object) * @throws IllegalStateException if there is no value bound to the thread * @see ResourceTransactionManager#getResourceFactory() */ publicstatic Object unbindResource(Object key)throws IllegalStateException { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doUnbindResource(actualKey); if (value == null) { thrownew IllegalStateException( "No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } return value; }
/** * Increase the reference count by one because the holder has been requested * (i.e. someone requested the resource held by it). */ publicvoidrequested(){ this.referenceCount++; }
/** * Decrease the reference count by one because the holder has been released * (i.e. someone released the resource held by it). */ publicvoidreleased(){ this.referenceCount--; }
/** * Return whether there are still open references to this holder. */ publicbooleanisOpen(){ return (this.referenceCount > 0); }
// org.springframework.jdbc.datasource.DataSourceUtils#doGetConnection if (TransactionSynchronizationManager.isSynchronizationActive()) { logger.debug("Registering transaction synchronization for JDBC Connection"); // Use same Connection for further JDBC actions within the transaction. // Thread-bound object will get removed by synchronization at transaction completion. ConnectionHolder holderToUse = conHolder; if (holderToUse == null) { holderToUse = new ConnectionHolder(con); } else { holderToUse.setConnection(con); } // 计数加一 holderToUse.requested(); TransactionSynchronizationManager.registerSynchronization( new ConnectionSynchronization(holderToUse, dataSource)); holderToUse.setSynchronizedWithTransaction(true); if (holderToUse != conHolder) { TransactionSynchronizationManager.bindResource(dataSource, holderToUse); } }