基于tcc分布式事务解决方案,tcc事务模型

  基于tcc分布式事务解决方案,tcc事务模型

  00-1010前言片段以电商订单为例。订单服务:库存服务:支付服务:hmily交易框架是如何工作的?hmiltransactioninterceptor接口的实现dubbo的aspect抽象实现dubbo的hmiltransactioninterceptor实现启动事务处理器处理逻辑如下:你要注意三个局部参数;文章结尾的交易处理器结论

  00-1010楼主之前推荐2pc分布式交易框架LCN。今天就来详细说说TCC交易协议。

  我们先来了解一下什么是tcc,如下图。

  Tcc分布式事务协议将整个业务事务分为三个阶段。

  Try:执行业务逻辑。

  确认:确认业务逻辑执行正确后,确认业务逻辑执行完成。

  取消:如果在尝试阶段出现问题,执行取消阶段逻辑来取消尝试阶段的数据。

  这就要求我们在设计业务时,更多地考虑业务处理在尝试阶段的折中状态,例如,处理、支付、媒介、确认阶段完成的处理,或者取消阶段处理失败。

  00-1010假设我们有一个电子商务订购业务,它由三个服务组成:订单服务处理订购逻辑、库存服务处理减去库存逻辑、支付服务处理减去账户余额逻辑。在订货服务中,先后调用减少库存和减少余额的方法。如果使用tcc分布式事务来协调事务,我们的服务应该设计如下:

  

目录

尝试:付款状态设置为付款确认中:设置为付款完成取消:设置为付款失败。

 

  00-1010增加锁定存货的字段记录,记录业务处理的状态。

  Try:总库存-1,锁定库存1确认:锁定库存-1取消:总库存1,锁定库存-1

  00-1010增加冻结金额字段记录,记录业务处理状态。

  Try: balance -1,冻结金额1confirm:冻结金额-1cancel: balance 1,冻结金额-1tcc分布式事务在这里扮演了事务协调者的角色。真正的业务只需要调用try阶段的方法。tcc框架调用确认和取消阶段的金额方法来帮助我们完成最终的业务逻辑。让我们假设以下三种场景中的业务情况,看看tcc如何协调业务的最终一致性。

  所有服务正常:所有服务的try方法执行后没有问题,库存充足,余额充足。Tcc交易协调器将触发订单服务的确认方法,将订单更新为支付完成,触发库存服务的确认方法以锁定库存-1,触发支付服务的确认方法以冻结金额-1。库存服务出现故障,无法调用:此时订单已经生成,状态为待支付。当调用库存超时异常时,tcc事务协调器将触发订单服务的cancel方法,将订单状态更新为付款失败。支付失败,无法调用:此时订单已经生成,状态为待支付,总库存-1,锁定库存-1。当调用支付服务超时并抛出异常时,tcc事务协调器将触发订单服务的cancel方法将订单状态更新为支付失败,并触发库存服务的cancel方法锁定库存1和库存-1。

  00-1010通过上面的tcc事务协议,说明大家应该知道tcc的处理和协调机制。我们来看看hmily是怎么做到的。我们以access支持dubbo服务为例。

  总结:首先,最基本的两个应用点是aop和dubbo的过滤机制。其次,对于一组事务,定义起始事务处理器,参与事务处理器用于协调和处理不同的事务单元。添加中断或预定服务事务日志,以补偿失败的事务。

  Hmily框架以@Hmily标注为起点,围绕编织定义了一个切面。该注释需要两个参数confirmMethod和cancelMethod,它们是tcc协调的两阶段方法。将此注释添加到需要tcc事务的方法也将托管tcc的三阶段处理流程。下面是aspect方面的抽象类。不同的RPC框架

  

前言碎语

@Aspectpublic抽象类abstracthmiltransactionaspect { private hmiltransactioninterceptor hmiltransactioninterceptor;受保护的void sethmiltransactioninterceptor(fin

 

  al HmilyTransactionInterceptor hmilyTransactionInterceptor) { this.hmilyTransactionInterceptor = hmilyTransactionInterceptor; } /** * this is point cut with {@linkplain Hmily }. */ @Pointcut("@annotation(org.dromara.hmily.annotation.Hmily)") public void hmilyInterceptor() { } /** * this is around in {@linkplain Hmily }. * @param proceedingJoinPoint proceedingJoinPoint * @return Object * @throws Throwable Throwable */ @Around("hmilyInterceptor()") public Object interceptTccMethod(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable { return hmilyTransactionInterceptor.interceptor(proceedingJoinPoint); } /** * spring Order. * * @return int */ public abstract int getOrder();}

 

  

dubbo的aspect抽象实现

@Aspect@Componentpublic class DubboHmilyTransactionAspect extends AbstractHmilyTransactionAspect implements Ordered { @Autowired public DubboHmilyTransactionAspect(final DubboHmilyTransactionInterceptor dubboHmilyTransactionInterceptor) { super.setHmilyTransactionInterceptor(dubboHmilyTransactionInterceptor); } @Override public int getOrder() { return Ordered.HIGHEST_PRECEDENCE; }}

 

  

dubbo的HmilyTransactionInterceptor实现

@Componentpublic class DubboHmilyTransactionInterceptor implements HmilyTransactionInterceptor { private final HmilyTransactionAspectService hmilyTransactionAspectService; @Autowired public DubboHmilyTransactionInterceptor(final HmilyTransactionAspectService hmilyTransactionAspectService) { this.hmilyTransactionAspectService = hmilyTransactionAspectService; } @Overridepublic Object interceptor(final ProceedingJoinPoint pjp) throws Throwable { final String context = RpcContext.getContext().getAttachment(CommonConstant.HMILY_TRANSACTION_CONTEXT); HmilyTransactionContext hmilyTransactionContext; //判断dubbo上下文中是否携带了tcc事务,如果有就取出反序列化为事务上下文对象 if (StringUtils.isNoneBlank(context)) { hmilyTransactionContext = GsonUtils.getInstance().fromJson(context, HmilyTransactionContext.class); RpcContext.getContext().getAttachments().remove(CommonConstant.HMILY_TRANSACTION_CONTEXT); } else { //如果dubbo上下文中没有,就从当前上下文中获取。如果是事务发起者,这里其实也获取不到事务 hmilyTransactionContext = HmilyTransactionContextLocal.getInstance().get(); } return hmilyTransactionAspectService.invoke(hmilyTransactionContext, pjp);}}

这里主要判断了dubbo上下文中是否携带了tcc事务。如果没有就从当前线程上下文中获取,如果是事务的发起者,这里其实获取不到事务上下文对象的。在invoke里有个获取事务处理器的逻辑,如果事务上下文入参 为null,那么获取到的就是启动事务处理器。

 

  

 

  

启动事务处理器处理逻辑如下

public Object handler(final ProceedingJoinPoint point, final HmilyTransactionContext context) throws Throwable { System.err.println("StarterHmilyTransactionHandler"); Object returnValue; try { HmilyTransaction hmilyTransaction = hmilyTransactionExecutor.begin(point); try { //execute try returnValue = point.proceed(); hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode()); hmilyTransactionExecutor.updateStatus(hmilyTransaction); } catch (Throwable throwable) { //if exception ,execute cancel final HmilyTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction(); executor.execute(() -> hmilyTransactionExecutor .cancel(currentTransaction)); throw throwable; } //execute confirm final HmilyTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction(); executor.execute(() -> hmilyTransactionExecutor.confirm(currentTransaction)); } finally { HmilyTransactionContextLocal.getInstance().remove(); hmilyTransactionExecutor.remove(); } return returnValue;}

真正业务处理方法,point.proceed();被try,catch包起来了,如果try里面的方法出现异常,就会走hmilyTransactionExecutor.cancel(currentTransaction)的逻辑,如果成功,就走hmilyTransactionExecutor.confirm(currentTransaction)逻辑。其中cancel和confirm里都有协调参与者事务的处理逻辑,以confirm逻辑为例。

 

  

public void confirm(final HmilyTransaction currentTransaction) throws HmilyRuntimeException { LogUtil.debug(LOGGER, () -> "tcc confirm .......!start"); if (Objects.isNull(currentTransaction) CollectionUtils.isEmpty(currentTransaction.getHmilyParticipants())) { return; } currentTransaction.setStatus(HmilyActionEnum.CONFIRMING.getCode()); updateStatus(currentTransaction); final ListhmilyParticipants = currentTransaction.getHmilyParticipants(); ListfailList = Lists.newArrayListWithCapacity(hmilyParticipants.size()); boolean success = true; if (CollectionUtils.isNotEmpty(hmilyParticipants)) { for (HmilyParticipant hmilyParticipant : hmilyParticipants) { try { HmilyTransactionContext context = new HmilyTransactionContext(); context.setAction(HmilyActionEnum.CONFIRMING.getCode()); context.setRole(HmilyRoleEnum.START.getCode()); context.setTransId(hmilyParticipant.getTransId()); HmilyTransactionContextLocal.getInstance().set(context); executeParticipantMethod(hmilyParticipant.getConfirmHmilyInvocation()); } catch (Exception e) { LogUtil.error(LOGGER, "execute confirm :{}", () -> e); success = false; failList.add(hmilyParticipant); } finally { HmilyTransactionContextLocal.getInstance().remove(); } } executeHandler(success, currentTransaction, failList); }}

可以看到executeParticipantMethod(hmilyParticipant.getConfirmHmilyInvocation()),这里执行了事务参与者的confirm方法。同理cancel里面也有类似代码,执行事务参与者的cancel方法。那么事务参与者的信息是怎么获取到的呢?我们需要回到一开始提到的dubbo的filter机制。

 

  

@Activate(group = {Constants.SERVER_KEY, Constants.CONSUMER})public class DubboHmilyTransactionFilter implements Filter { private HmilyTransactionExecutor hmilyTransactionExecutor; /** * this is init by dubbo spi * set hmilyTransactionExecutor. * * @param hmilyTransactionExecutor {@linkplain HmilyTransactionExecutor } */ public void setHmilyTransactionExecutor(final HmilyTransactionExecutor hmilyTransactionExecutor) { this.hmilyTransactionExecutor = hmilyTransactionExecutor; } @Override @SuppressWarnings("unchecked") public Result invoke(final Invoker invoker, final Invocation invocation) throws RpcException { String methodName = invocation.getMethodName(); Class clazz = invoker.getInterface(); Class[] args = invocation.getParameterTypes(); final Object[] arguments = invocation.getArguments(); converterParamsClass(args, arguments); Method method = null; Hmily hmily = null; try { method = clazz.getMethod(methodName, args); hmily = method.getAnnotation(Hmily.class); } catch (NoSuchMethodException e) { e.printStackTrace(); } if (Objects.nonNull(hmily)) { try { final HmilyTransactionContext hmilyTransactionContext = HmilyTransactionContextLocal.getInstance().get(); if (Objects.nonNull(hmilyTransactionContext)) { if (hmilyTransactionContext.getRole() == HmilyRoleEnum.LOCAL.getCode()) { hmilyTransactionContext.setRole(HmilyRoleEnum.INLINE.getCode()); } RpcContext.getContext().setAttachment(CommonConstant.HMILY_TRANSACTION_CONTEXT, GsonUtils.getInstance().toJson(hmilyTransactionContext)); } final Result result = invoker.invoke(invocation); //if result has not exception if (!result.hasException()) { final HmilyParticipant hmilyParticipant = buildParticipant(hmilyTransactionContext, hmily, method, clazz, arguments, args); if (hmilyTransactionContext.getRole() == HmilyRoleEnum.INLINE.getCode()) { hmilyTransactionExecutor.registerByNested(hmilyTransactionContext.getTransId(), hmilyParticipant); } else { hmilyTransactionExecutor.enlistParticipant(hmilyParticipant); } } else { throw new HmilyRuntimeException("rpc invoke exception{}", result.getException()); } return result; } catch (RpcException e) { e.printStackTrace(); throw e; } } else { return invoker.invoke(invocation); } } @SuppressWarnings("unchecked") private HmilyParticipant buildParticipant(final HmilyTransactionContext hmilyTransactionContext, final Hmily hmily, final Method method, final Class clazz, final Object[] arguments, final Class... args) throws HmilyRuntimeException { if (Objects.isNull(hmilyTransactionContext) (HmilyActionEnum.TRYING.getCode() != hmilyTransactionContext.getAction())) { return null; } //获取协调方法 String confirmMethodName = hmily.confirmMethod(); if (StringUtils.isBlank(confirmMethodName)) { confirmMethodName = method.getName(); } String cancelMethodName = hmily.cancelMethod(); if (StringUtils.isBlank(cancelMethodName)) { cancelMethodName = method.getName(); } HmilyInvocation confirmInvocation = new HmilyInvocation(clazz, confirmMethodName, args, arguments); HmilyInvocation cancelInvocation = new HmilyInvocation(clazz, cancelMethodName, args, arguments); //封装调用点 return new HmilyParticipant(hmilyTransactionContext.getTransId(), confirmInvocation, cancelInvocation); } private void converterParamsClass(final Class[] args, final Object[] arguments) { if (arguments == null arguments.length < 1) { return; } for (int i = 0; i < arguments.length; i++) { args[i] = arguments[i].getClass(); } }}

 

  

需要注意三个地方

一个是filter的group定义@Activate(group = {Constants.SERVER_KEY, Constants.CONSUMER}),这里这样定义后,就只有服务的消费者会生效,也就是事务的发起者,服务的调用方会进filter的invoke逻辑。只有加@Hmily注解的方法或进事务处理逻辑,其他的方法直接跳过处理最关键的是buildParticipant(hmilyTransactionContext, hmily, method, clazz, arguments, args)方法。dubbo的filter唯一的作用就是收集事务参与者信息并更新当前事务上线文信息。那么在事务协调时就能够从当前事务上线文里面获取到需要协调的事务参与者信息了。

 

  

参数者事务处理器

public Object handler(final ProceedingJoinPoint point, final HmilyTransactionContext context) throws Throwable { HmilyTransaction hmilyTransaction = null; HmilyTransaction currentTransaction; switch (HmilyActionEnum.acquireByCode(context.getAction())) { case TRYING: try { hmilyTransaction = hmilyTransactionExecutor.beginParticipant(context, point); final Object proceed = point.proceed(); hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode()); //update log status to try hmilyTransactionExecutor.updateStatus(hmilyTransaction); return proceed; } catch (Throwable throwable) { //if exception ,delete log. hmilyTransactionExecutor.deleteTransaction(hmilyTransaction); throw throwable; } finally { HmilyTransactionContextLocal.getInstance().remove(); } case CONFIRMING: currentTransaction = HmilyTransactionCacheManager.getInstance().getTccTransaction(context.getTransId()); hmilyTransactionExecutor.confirm(currentTransaction); break; case CANCELING: currentTransaction = HmilyTransactionCacheManager.getInstance().getTccTransaction(context.getTransId()); hmilyTransactionExecutor.cancel(currentTransaction); break; default: break; } Method method = ((MethodSignature) (point.getSignature())).getMethod(); logger.error(HmilyActionEnum.acquireByCode(context.getAction()).getDesc()); return DefaultValueUtils.getDefaultValue(method.getReturnType());}

参与者事务处理器的逻辑比启动事务处理器要简单很多,try阶段记录事务日志用于事务补偿的时候使用。其他的confirm和cancel都是由启动事务管理器来触发调用执行的。这个地方之前纠结了楼主几个小时,怎么一个环绕织入的切面会被触发执行两次,其实是启动事务处理器里的confirm或cancel触发的。

 

  disruptor+ScheduledService处理事务日志,补偿处理失败的事务

  这个不细聊了,简述下。disruptor是一个高性能的队列。对事务日志落地的所有操作都是通过disruptor来异步完成的。ScheduledService默认128秒执行一次,来检查是否有处理失败的事务日志,用于补偿事务协调失败的事务

  

 

  

文末结语

相比较2pc的LCN而言,tcc分布式事务对业务侵入性更高。也因2pc的长时间占用事务资源,tcc的性能肯定比2pc要好。两者之间本身不存在谁优谁劣的问题。所以在做分布式事务选型时,选一个对的适合自身业务的分布式事务框架就比较重要了。

 

  以上就是tcc分布式事务框架体系解析的详细内容,更多关于tcc分布式事务框架的资料请关注盛行IT其它相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: