Spring-事务的源码分析(七)
都知道事务是通过spring AOP来实现的;前面文章已经分析过AOP的实现原理了,而事务只不过是AOP中的一个增强器而已;所以接下来将分析一下事务增强器的原理。
spring的核心就是IC依赖注入,那么就要先解析依赖配置,然后再注入。所以spring的功能都会出现两块,一块是解析mxl,一块是构建BeanDefinition。
事务增强器也是这样,先要解析事务的标签,然后才是执行事务。
解析tx事务标签
来看一下我们在xml中定义的事务标签是在那里被解析的?
public class TxNamespaceHandler extends NamespaceHandlerSupport { static final String TRANSACTION_MANAGER_ATTRIBUTE = "transaction-manager"; static final String DEFAULT_TRANSACTION_MANAGER_BEAN_NAME = "transactionManager"; static String getTransactionManagerName(Element element) { return (element.hasAttribute(TRANSACTION_MANAGER_ATTRIBUTE) ? element.getAttribute(TRANSACTION_MANAGER_ATTRIBUTE) : DEFAULT_TRANSACTION_MANAGER_BEAN_NAME); } @Override public void init() { registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser()); registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser()); registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser()); } }
从代码中可以看出,事务的开始一切都从:AnnotationDrivenBeanDefinitionParser 类的parse开始了。
开始解析xml标签了
/** * 解析annotation-driven开头的标签--> {@code <tx:annotation-driven/>} tag. Will * {@link AopNamespaceUtils#registerAutoProxyCreatorIfNecessary register an AutoProxyCreator} * with the container as necessary. */ @Override public BeanDefinition parse(Element element, ParserContext parserContext) { registerTransactionalEventListenerFactory(parserContext); String mode = element.getAttribute("mode"); if ("aspectj".equals(mode)) { // mode="aspectj" registerTransactionAspect(element, parserContext); } else { // mode="proxy" AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext); } return null; }
解析tx事务的标签的核心全部在InfrastructureAdvisorAutoProxyCreator这个里面,
InfrastructureAdvisorAutoProxyCreator -> AbstractAdvisorAutoProxyCreator -> AbstractAutoProxyCreator
所有的解析工作大部分都在AbstractAutoProxyCreator类里面解析完成的。前面已经分析了aop时已经分析完毕了,所以在这里就不继续分析了。
执行事务增强器
当判断某个bean适用于事务增强时,也就是用于增强器BeanFactoryTransactionAttributeSourceAdvisor这个类,所以在自定义标签解析时,注入的类成为了整个事务功能的基础。
BeanFactoryTransactionAttributeSourceAdvisor作为Advisor的实现类,需要遵从Advisor的处理方式,当代理被调用时会调用这个类的增强方法,也就是此bean的Advise,又因为在事务标签解析时我们把TransactionInterceptor类型的bean注入到了BeanFactoryTransactionAttributeSourceAdvisor中,所以在调用事务增强器增强的代理类时会先执行TransactionInterceptor进行增强,也就是执行TransactionInterceptor中的invoke方法完成整个事务的逻辑。
时序图:
从时序图中可以看出:
- 解析注解事务中的属性;
- 根据属性设置创建事务;
- 在当前线程中执行用户真正的代码;
- 如果执行过程中有异常则执行回滚逻辑;
- 如果没有异常,则提交事务。 在前面讲解事务时说到过,spring事务是基于数据库本身的事务的,从时序图中也可以看出,创建事务,回滚事务,提交事务,操作都需要去操作数据库连接类DataSourceTransactionManager。
创建事务
看着时序图已经可以往下走了,下面将展示源码中个人觉得重要的点,其他不重要的由于篇幅问题就不帖出来了。
第一步:整个事务执行框架
TransactionInterceptor中的invoke执行了TransactionAspectSupport的invokeWithinTransaction方法。
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable { //获取事务属性,如果没有属性,则该方法没有事务 ** getTransactionAttribute重点** final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); final PlatformTransactionManager tm = determineTransactionManager(txAttr); //获取beanFactory中的事务管理器 final String joinpointIdentification = methodIdentification(method, targetClass); //声明式事务处理 if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { //创建事务 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // This is an around advice: Invoke the next interceptor in the chain. // 执行被增强的方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // 异常回滚 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { //清除信息 cleanupTransactionInfo(txInfo); } //提交事务 commitTransactionAfterReturning(txInfo); return retVal; } else { // 编程式事务处理It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. ....... ....... } }
可以看到整个事务在上面的方法中已经全部包含了,创建事务,执行代码,回滚,提交。spring把具体操作又拆分到各个业务类中了。
第二步:获取事务注解属性
就是解析方法上面的:@Transactional(propagation = Propagation.REQUIRES_NEW)这个里面的属性设置。
跟着上面的时序图直接来到parseTransactionAnnotation这个方法中看看。
//开始解析各种事务的标签 protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) { RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute(); Propagation propagation = attributes.getEnum("propagation"); rbta.setPropagationBehavior(propagation.value()); Isolation isolation = attributes.getEnum("isolation"); rbta.setIsolationLevel(isolation.value()); rbta.setTimeout(attributes.getNumber("timeout").intValue()); rbta.setReadOnly(attributes.getBoolean("readOnly")); rbta.setQualifier(attributes.getString("value")); ArrayList<RollbackRuleAttribute> rollBackRules = new ArrayList<RollbackRuleAttribute>(); Class<?>[] rbf = attributes.getClassArray("rollbackFor"); for (Class<?> rbRule : rbf) { RollbackRuleAttribute rule = new RollbackRuleAttribute(rbRule); rollBackRules.add(rule); } String[] rbfc = attributes.getStringArray("rollbackForClassName"); for (String rbRule : rbfc) { RollbackRuleAttribute rule = new RollbackRuleAttribute(rbRule); rollBackRules.add(rule); } Class<?>[] nrbf = attributes.getClassArray("noRollbackFor"); for (Class<?> rbRule : nrbf) { NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(rbRule); rollBackRules.add(rule); } String[] nrbfc = attributes.getStringArray("noRollbackForClassName"); for (String rbRule : nrbfc) { NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(rbRule); rollBackRules.add(rule); } rbta.getRollbackRules().addAll(rollBackRules); return rbta; }
第三步:根据上面获取的属性,创建事务
TransactionAspectSupport.createTransactionIfNecessary
protected TransactionInfo createTransactionIfNecessary( PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) { // If no name specified, apply method identification as transaction name. if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { //获取事务状态信息,**getTransaction重要**包活设置一些事务的传播性 status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } //准备事务信息**prepareTransactionInfo** return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
设置事务的状态
AbstractPlatformTransactionManager.getTransaction
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { Object transaction = doGetTransaction(); //获取事务**doGetTransaction** // Cache debug flag to avoid repeated checks. boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); } //判断当时是否存在事务,判断依据为当前线程记录的连接不为空且连接中的事务属性不为空 if (isExistingTransaction(transaction)) { // 当前线程已经存在事务 **handleExistingTransaction** Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(definition, transaction, debugEnabled); } // 事务超时 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // 当前不存在事务,但是事务类型为:PROPAGATION_MANDATORY的时候会抛一个异常 -> check propagation behavior to find out how to proceed. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name[" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); //**doBegin**构造事务,包括设置connectionHolder,隔离级别,timeout,如果是新连接,绑定到当前线程 prepareSynchronization(status, definition); //新同步事务的设置,针对当前线程的设置,这里用到了ThreadLocal哦。 return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }
我们先抛开事务中已经存在事务的判断,先来继续创建事务。
走到了:doBegin。
进入到了DataSourceTransactionManager这个的doBegin
protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { if (txObject.getConnectionHolder() == null || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { Connection newCon = this.dataSource.getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); //设置事务的隔离级别**prepareConnectionForTransaction** Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); // Switch to manual commit if necessary. This is very expensive in some JDBC drivers, // so we don't want to do it unnecessarily (for example if we've explicitly // configured the connection pool to set it already). if (con.getAutoCommit()) { //数据库的事务为自定提交,则进入 txObject.setMustRestoreAutoCommit(true);//改为spring控制提交 if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } con.setAutoCommit(false); //取消数据库的自动提交 } //设置当前线程有事务,后续进来的线程判断的时候就可以按照这个来做判断了 txObject.getConnectionHolder().setTransactionActive(true); int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } // Bind the session holder to the thread. if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, this.dataSource); txObject.setConnectionHolder(null, false); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } }
看到了怎么创建事务了嘛啊?就是连接数据库,然后更改当前数据库的连接的事务提交为false,当前执行了sql之后就不会自动提交了。
接下来进入到:
AbstractPlatformTransactionManager.prepareSynchronization
/** * Initialize transaction synchronization as appropriate.,将当前事务记录到ThreadLocal中 */ protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) { if (status.isNewSynchronization()) { TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel( definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null); TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); TransactionSynchronizationManager.setCurrentTransactionName(definition.getName()); TransactionSynchronizationManager.initSynchronization(); } }
这个方法里面主要是对:新同步事务的设置,针对当前线程的设置,这里用到了ThreadLocal哦。把刚建立好的事务信息存储到ThreadLocal里面。(这也就是下面在一个事务中开启多线程之后,多线程里面的数据库操作不在一个事务里面的原因了)
到这里,总算创建完毕一个事务了,整个过程就那么多了。
事务中已经存在事务(事务的传播属性)
如果当前事务中已经存在事务了,这就是事务的传播属性了。
private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { //可以看出PROPAGATION_NEVER这个事务类型,如果当前有事务,直接抛异常,比较简单粗暴。 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } //忽略当前事务,以非事务方式执行操作,如果当前存在事务就把当前事务挂起,执行完后恢复事务(忽略当前事务) if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } //新事务处理 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } SuspendedResourcesHolder suspendedResources = suspend(transaction); //将原事务挂起 try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); //连接数据库,设置当前线程连接数据库的信息 prepareSynchronization(status, definition); //将一些事务信息绑定到threadLocal线程中 //这两句代码和创建事务时的代码一样,说明开启一个新事务,相当于开启一个新的数据库连接 return status; } catch (RuntimeException beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } catch (Error beginErr) { resumeAfterBeginException(transaction, suspendedResources, beginErr); throw beginErr; } } //嵌套事务处理 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } if (useSavepointForNestedTransaction()) { // 创建一个保存点 // through the SavepointManager API implemented by TransactionStatus. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization. DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); status.createAndHoldSavepoint(); //创建一个保存点 return status; } else { // 如果不能建立保存点,则创建一个新事务吧,和上面的创建一个新事务一样的处理。 // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. if (debugEnabled) { logger.debug("Participating in existing transaction"); } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
回滚事务
在看事务回滚之前先来看一段代码:
try { // This is an around advice: Invoke the next interceptor in the chain. // 1.执行被增强的方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // 2.异常回滚 completeTransactionAfterThrowing(txInfo, ex); throw ex; }
try里面是执行我们的业务代码,就是说,只要业务代码里面有异常,都会出发回滚机制,这里只是说回滚机制哦!
TransactionAspectSupport. completeTransactionAfterThrowing
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) { if (txInfo != null && txInfo.hasTransaction()) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex); } //判断异常是否回滚。DefaultTransactionAttribute的rollbackOn里面这个里面只有RuntimeException与error才会发生回滚 if (txInfo.transactionAttribute.rollbackOn(ex)) { try { //开始回滚了 txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by rollback exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException ex2) { logger.error("Application exception overridden by rollback exception", ex); throw ex2; } catch (Error err) { logger.error("Application exception overridden by rollback error", ex); throw err; } } else { // We don't roll back on this exception. // Will still roll back if TransactionStatus.isRollbackOnly() is true. try { //如果没有经过上面的回滚,那么还是会被执行提交的哦 txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by commit exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException ex2) { logger.error("Application exception overridden by commit exception", ex); throw ex2; } catch (Error err) { logger.error("Application exception overridden by commit error", ex); throw err; } } } }
rollbackOn()里面是这样写的:
return (ex instanceof RuntimeException || ex instanceof Error);
所以默认情况下面只会回滚RuntimeException 和Error的异常,而Exception异常则不会被回滚。
回滚:AbstractPlatformTransactionManager.rollback -> processRollback
private void processRollback(DefaultTransactionStatus status) { try { try { triggerBeforeCompletion(status); if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Rolling back transaction to savepoint"); } //如果有保存点,当前事务为单独的线程则回退到保存点 **rollbackToHeldSavepoint** status.rollbackToHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction rollback"); } //如果当前事务为独立的新事务,则直接回退 doRollback(status); //直接执行数据库的callback回滚 } else if (status.hasTransaction()) { if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { if (status.isDebug()) { logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); } //如果当前事务不是独立的事务,那么只能标记状态,等待事务链执行完毕后一起回滚 doSetRollbackOnly(status); } else { if (status.isDebug()) { logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); } } } else { logger.debug("Should roll back transaction but cannot - no transaction available"); } } catch (RuntimeException ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } catch (Error err) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw err; } triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); } finally { //清空前面记录的资源,并将挂起的线程恢复,前面有挂起,这里有恢复哦! cleanupAfterCompletion(status); } }
1.如果有保存点,则直接回滚到保存点
public void rollbackToHeldSavepoint() throws TransactionException { if (!hasSavepoint()) { throw new TransactionUsageException( "Cannot roll back to savepoint - no savepoint associated with current transaction"); } getSavepointManager().rollbackToSavepoint(getSavepoint()); //**rollbackToSavepoint**数据库保存点回滚 getSavepointManager().releaseSavepoint(getSavepoint()); setSavepoint(null); }
JdbcTransactionObjectSupport.rollbackToSavepoint
@Override public void rollbackToSavepoint(Object savepoint) throws TransactionException { ConnectionHolder conHolder = getConnectionHolderForSavepoint(); try { conHolder.getConnection().rollback((Savepoint) savepoint); } catch (Throwable ex) { throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex); } }
2.回滚数据,还是的靠操作数据库,所以执行的是DataSourceTransactionManager.doRollback;
@Override protected void doRollback(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Rolling back JDBC transaction on Connection [" + con + "]"); } try { con.rollback(); } catch (SQLException ex) { throw new TransactionSystemException("Could not roll back JDBC transaction", ex); } }
提交事务
AbstractPlatformTransactionManager.processCommit
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; boolean globalRollbackOnly = false; if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { globalRollbackOnly = status.isGlobalRollbackOnly(); } if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } //如果存在保存点,则清除保存点信息 status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } //独立事务则直接提交 doCommit(status); } // Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. if (globalRollbackOnly) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } //提交过程中出现异常回滚 doRollbackOnCommitException(status, ex); throw ex; } catch (Error err) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, err); throw err; } // Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); } }
同样的提交事务也得靠数据库来操作
DataSourceTransactionManager.doCommit
protected void doCommit(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Committing JDBC transaction on Connection [" + con + "]"); } try { con.commit(); } catch (SQLException ex) { throw new TransactionSystemException("Could not commit JDBC transaction", ex); } }
事务开启多线程后无效原因
上面源码分析了那么多,来聊聊工作中遇到的在事务中开启多线程后,事务无效的问题。
示例:
public static void main(String args[]){ @Transactional public void serviceHI() { userInfoExtendService.serviceI(); try { new Thread(new Runnable() { public void run() { userInfoExtendService.serviceH(); } }).start(); } catch (Exception e) { e.printStackTrace(); } userInfoExtendService.serviceJ(); } }
执行serviceH的时候被放入多线程里面了额,那么serviceH里面是否抛异常都和外面的serviceI,serviceJ()无关了。
相当于serviceH和serviceI,serviceJ()不在一个事务里面了。加入serviceJ抛异常了,serviceI和serviceJ会被一起回滚,而serviceH则不会。
为什么呢?
看源码
AbstractPlatformTransactionManager.prepareSynchronization /** * Initialize transaction synchronization as appropriate.,将当前事务记录到ThreadLocal中 */ protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) { if (status.isNewSynchronization()) { TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel( definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null); TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); TransactionSynchronizationManager.setCurrentTransactionName(definition.getName()); TransactionSynchronizationManager.initSynchronization(); } }
来看看这个TransactionSynchronizationManager类里面定义的成员变量
public abstract class TransactionSynchronizationManager { private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class); private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<Map<Object, Object>>("Transactional resources"); private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations"); private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<String>("Current transaction name"); private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<Boolean>("Current transaction read-only status"); private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<Integer>("Current transaction isolation level"); private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<Boolean>("Actual transaction active"); ..... }
spring与数据库建立连接,每个线程都会有一个自己的连接,而这些连接的状态和连接属性,全部被存放到了ThreadLocal里面了,都知道ThreadLoacl的key是使用当前线程的值来作为key值的。
上面的代码中:main方法启动默认就是一个线程,serviceI,serviceJ都在这个主线程中,而现在searchH单独开启了一个线程,serviceH就在自线程里面了。
所以在开启事务,回滚事务时,serviceI,serviceJ在同一个连接里面,而serviceH在另外一个连接里面。
事务说白了就是“绑架”一个数据库连接,更改数据库的自动提交功能,使用手动提交机制。spring把这一系列操作属性及数据库连接放到了ThreadLocal中,而Thread Local的key为“当前线程的值”,可以说一个线程一个连接一个事务,在一个事务中开启多线程后,多线程就不在当前事务中了,所以serviceH在serviceHI这个的事务中是无效的。
事务注意事项
- @Transactional 只能被应用到public方法上, 对于其它非public的方法,如果标记了@Transactional也不会报错,但方法没有事务功能.
- 用 spring 事务管理器,由spring来负责数据库的打开,提交,回滚.默认遇到运行期例外(throw new RuntimeException("注释");)会回滚,即遇到不受检查(unchecked)的例外时回滚;而遇到需要捕获的例外(throw new Exception("注释");)不会回滚,即遇到受检查的例外(就是非运行时抛出的异常,编译器会检查到的异常叫受检查例外或说受检查异常)时,需我们指定方式来让事务回滚 要想所有异常都回滚,要加上
@Transactional( rollbackFor={Exception.class,其它异常}) .如果让unchecked例外不回滚: @Transactional(notRollbackFor=RunTimeException.class) 如下: @Transactional(rollbackFor=Exception.class) //指定回滚,遇到异常Exception时回滚 public void methodName() { throw new Exception("注释"); } @Transactional(noRollbackFor=Exception.class)//指定不回滚,遇到运行期例外(throw new RuntimeException("注释");)会回滚 public ItimDaoImpl getItemDaoImpl() { throw new RuntimeException("注释"); }
- Spring团队的建议是你在具体的类(或类的方法)上使用 @Transactional 注解,而不要使用在类所要实现的任何接口上。你当然可以在接口上使用 @Transactional 注解,但是这将只能当你设置了基于接口的代理时它才生效。因为注解是 不能继承 的,这就意味着如果你正在使用基于类的代理时,那么事务的设置将不能被基于类的代理所识别,而且对象也将不会被事务代理所包装(将被确认为严重的)。因 此,请接受Spring团队的建议并且在具体的类上使用 @Transactional 注解。
- @Transactional 注解可以被应用于接口定义和接口方法、类定义和类的 public 方法上。然而,请注意仅仅 @Transactional 注解的出现不足于开启事务行为,它仅仅 是一种元数据,能够被可以识别 @Transactional 注解和上述的配置适当的具有事务行为的beans所使用。上面的例子中,其实正是 tx:annotation-driven/元素的出现 开启 了事务行为。
参考
《Spring源码深度解析》书
您好,请问第二张图(函数调用的关系图)是用什么软件画的呀?还是手画的?