微服务

什么是微服务

微服务是一种去中心化的应用架构方案。相对于单体式应用来说,微服务应用具有耦合性低、扩展性高、更灵活、能更高效交付的特点。

从名称上看。微服务的“微”涵盖了以下几层含义:

  • 服务按功能进行一定粒度的拆分,每一块都有独立的职责。
  • 由于做了拆分,每一个微服务的开发都是独立进行的,因此这种架构的交付节奏可以更加灵活
  • 微服务应用的部署及应用都是隔离的,这保证了整个应用架构可以按需进行扩展

微服务基础设施

微服务架构本质上是一种面向服务的分布式系统,为了解决分布式所带来的一系列管理问题,微服务通常需要依赖一些基础设施来保证架构的完整性。

服务注册

在一个微服务集群中,由于服务的种类,实例数量由很多,仅通过人工配置的方式会加大工作量。而且这些服务实例的信息可能随时会发生变化,比如我们可能需要对某个服务做在线的扩容,或者因为故障处理而隔离某些节点。因此,需要由一个自动化的服务注册组件来完成这件事。服务注册通常需要记录当前可用的服务实例信息,并提供服务注册表API。服务的调用方可以通过API获得所需服务的实例信息,并实时订阅服务实例的变化。

通常服务注册的实现方式是心跳,即注册表于服务实例之间保持一个稳定的心跳检测,根据心跳的状态来判断服务实例是否存货。

服务发现

既然大量的微服务实例都记录到了服务注册表中,那么服务的调用方则应该通过服务发现组件来动态地可调用地服务实例信息。在微服务架构中,服务地发现有两种实现方式。

  1. 客户端发现

    客户端发现是指由调用方来完成目标服务实例信息地发现。

    image-20220528195417255

  2. 服务端发现

    服务端发现是一种代理式地架构,即服务器间调用统一使用负载均衡器来实现。这与客户端发现的差别在于:服务实例的发现由负载均衡器来完成,并且所有的微服务接口调用都由该组件来代理

    image-20220528195428061

    这种方式的好处是可以屏蔽被调用服务的一些内部细节,并增加一些公共的能力,比如接口鉴权、流量控制、日志记录等。但是弊端也很明显,由于所有接口调用都需要经过该负载均衡器,所以该组件很容易形成瓶颈,一旦负载均衡器故障将会产生全局的影响

    服务发现的实现方法无论是客户端发现还是服务器端发现,都离不开以下两点:

    • 依赖服务注册表组件来发现可用的服务
    • 提供包括,目标实例的路由,如何在多个实例中挑选合适的节点取决于路由的算法,常见的包括随机路由、轮询路由、动态压力路由。

API网关

API网关是外部系统接入微服务集群的唯一入口。我们可以将微服务架构看作一个整体,其内部的微服务职责划分、服务间的交互调用对于外部来说是不可见的。那么为了对外提供体验一致的访问接口,微服务需要一个统一的API网关,所有外部系统对微服务的调用都经过API网关组件。

API网关组件通常具备的功能包括但不限于:

  • 接入鉴权
  • 传输加密
  • 请求路由
  • 流量控制
  • 灰度发布

服务容错

在系统的节点实例变多后,实例故障的概率会增加。而且一旦故障发生,服务间的调用关系会导致故障大面积”传染“,通过人工进行实例故障隔离的方式效率较低,这就需要微服务能自动检测问题并自动做出应对。这种检测及应对能力通常由服务容错组件提供,一些手段如下:

  • 请求重试:在某些关键业务出现问题时,尝试进行请求的重试。
  • 流量控制:这需要先对系统的容错做出明确的规划,然后对读物实例上的流量进行实时监控,一旦发现超过阈值则拒绝请求,这样就可以避免整个系统全面瘫痪。
  • 服务熔断:根据一定的规则判断目标服务是否已经失效,规则的设计可以基于某个时间窗口的调用失败率进行计算,如果超过阈值则执行熔断(快速返回错误信息)

服务监控

对微服务实例保持足够的监控是非常重要的,而通常架构上需要对服务监控组件进行单独考虑。监控的目的是及时发现问题并采取一定的合理规避措施,以保证服务的SLA质量。通产在微服务监控服务中提供如下功能:

  • 业务日志采集:比如系统中用户注册、上下线等信息
  • 运行指标采集:比如CPU、内存占用、JVM堆内存大小,或某些接口流量等
  • 监控警告:对业务日志、运行指标信息进行分析、根据结果做出一定的判断和处理
  • 调用链路跟踪:用于业务流程在分布式调用中出现问题时提供定位手段,调用链路需要借助一些特定的技术实现,比如服务埋点、跟踪树等。

配置中心

传统的服务实例配置是通过本地文件(XML/YAML/PROPERTIES)实现的,比如数据库连接池的大小、接口请求流量的阈值等。对于配置的一些改动往往需要重新发布并重启服务,在存在实例时情况变得很不乐观。想象一下对于某个配置项的调整,你可能需要做几十次的发布动作。

通过将这些配置信息注册到统一的配置中心服务,微服务通过配置中心获取其所需要的配置,这样便免去了各种繁冗的发布工作。此外,如果服务实现了配置的动态感知及自动更新,则还可以实现各种平滑的动作。比如在数据库连接池的大小设置发生了变化时,实例可以自动感知而不需要重启。

接口调用

微服务架构推崇采用轻量化的接口调用方式,比如使用HTTP/REST。在项目实战中,我们还应该做出更统一的规范化定义,并形成公共的接口调用组件。这部分需要考虑的内容包括:

  • 数据的传输,比如HTTP还是TCP
  • 数据的编码,比如JSON还是XML,或是二进制
  • 数据的内容,比如是否采用固有的消息头定义
  • 数据的安全,如是否使用TLS/SSL实现加密,如何对接口权限进行校验等

容器化

以Docker为代表的容器技术是微服务的最佳组合。通过使用容器作为基础设施,微服务能够实现快速部署、快速迭代的目。Kubernetes是当今容器标准化平台代表,其提供了强大的容器生命周期管理功能,可用于部署、扩展和管理所有的微服务容器。对微服务的自治、敏捷管理来说,容器的无状态、弹性伸缩能力无疑是最契合的。

image-20220528205051510

CAP与BASE理论

CAP理论

CAP理论又被称为CAP理论,指的是一个分布式系统中,Consistency(一致性)、Availability(可用性)、Partition Tolerance(分区容错性),三者不可兼得,而最多只能同时拥有两者

image-20220528210623871

一致性(C):分布式系统中节点的数据,在同一时刻拥有同样的值。对于每一次读操作都能够读到最新写入的数据

可用性(A):在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求,即保持高可用

分区容忍性(P):在出现网络分区(中断)后,系统是否还能继续保持运作。分区相当于对于通信条件的要求,如果出现了分区的情况,则势必会影响数据的一致性,即同步出现时延。此时系统就必须在一致性和可用性上做出选择。

实际上,CAP理论中忽略了网络时延对于系统的影响,在现实中网络时延一定是真实存在的,也就是P一定是存在的。因此分布式系统如果选择了高可用(AP),那么就会造成访问节点之间的数据不一致(牺牲一致性)。如果选择了一致性(CP),那么必须淘汰数据的备用点,而只访问主节点(牺牲高可用性)。CA的场景是无法存在的,因为网络通信失败的情况一定会存在。

BASE理论

BASE理论可被看作是CAP理论的一个补充,主要来源对大规模互联网系统分布式实践的总结。该理论由以下几个短语组成(BASE)。

  • Basically Available(基本可用)
  • Soft State(软状态)
  • Eventually Consistent(最终一致性)

实质上,BASE是对于一致性和可用性进行权衡的结果,其主要思想是在系统无法实现强一致性的情况下,根据应用的业务特点来做出一些权衡及补充,并使系统达到最终一致性。在达到最终一致性之前,系统会处于一个中间状态,具备以下特征:

  1. 基本可用:即损失部分可用性,比如响应时间变长,或者部分服务被降级
  2. 软状态:数据会存在中间状态(不一致),但该状态不会影响系统的基本使用。

在经过一段时间之后,系统应该能达到最终真正一致的状态,比如数据复制经过一段时间后真正完成同步。

相比CAP理论来说,BASE理论将一致性分成了强一致性和弱一致性,并在充分考虑网络时延、系统吞吐量的情况下选择了一种基本可用(弱一致性)的处理思路,这无疑更加适用于现有分布式系统。

RPC概念

RPC(Remote Procedure Call) 即远程过程调用,通过名字我们就能看出 RPC 关注的是远程调用而非本地调用。

为什么需要RPC?

两个不同的服务器上的服务提供的方法不在一个内存空间,所以,需要通过网络编程才能传递方法调用所需要的参数。

通过 RPC 可以帮助我们调用远程计算机上某个服务的方法,这个过程就像调用本地方法一样简单。我们不需要了解底层网络编程的具体细节。

简单讲:RPC 的出现就是为了让你调用远程方法像调用本地方法一样简单

image-20220527213236293

RPC相关概念:

  1. 客户端(服务消费端) :调用远程方法的一端。
  2. 服务端(服务提供端) :提供远程方法的一端。
  3. 客户端 Stub(桩):代理类,把调用方法、类、方法参数传递到服务端
  4. 服务端 Stub(桩):接收到客户端执行方法的请求,返回给客户端的类
  5. 网络传输:提供两端的数据传输服务
    1. 实现方式:Socket、Netty

原理如下:

  1. 服务消费端(client)以本地调用的方式调用远程服务;
  2. 客户端 Stub(client stub) 接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体(序列化):RpcRequest;
  3. 客户端 Stub(client stub) 找到远程服务的地址,并将消息发送到服务提供端;
  4. 服务端 Stub(桩)收到消息将消息反序列化为Java对象: RpcRequest;
  5. 服务端 Stub(桩)根据RpcRequest中的类、方法、方法参数等信息调用本地的方法;
  6. 服务端 Stub(桩)得到方法执行结果并将组装成能够进行网络传输的消息体:RpcResponse(序列化)发送至消费方;
  7. 客户端 Stub(client stub)接收到消息并将消息反序列化为Java对象:RpcResponse ,这样也就得到了最终结果。

RPC框架

  • RMI(JDK自带)

    JDK自带的RPC,有很多局限性,不推荐使用。

  • Dubbo

    Dubbo是 阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和 Spring框架无缝集成。目前 Dubbo 已经成为 Spring Cloud Alibaba 中的官方组件。

  • gRPC

    gRPC是可以在任何环境中运行的现代开源高性能RPC框架。它可以通过可插拔的支持来有效地连接数据中心内和跨数据中心的服务,以实现负载平衡,跟踪,运行状况检查和身份验证。它也适用于分布式计算的最后一英里,以将设备,移动应用程序和浏览器连接到后端服务。

  • Hessian

    Hessian是一个轻量级的remoting on http工具,使用简单的方法提供了RMI的功能。 相比WebService,Hessian更简单、快捷。采用的是二进制RPC协议,因为采用的是二进制协议,所以它很适合于发送二进制数据

  • Thrift

    Apache Thrift是Facebook开源的跨语言的RPC通信框架,目前已经捐献给Apache基金会管理,由于其跨语言特性和出色的性能,在很多互联网公司得到应用,有能力的公司甚至会基于thrift研发一套分布式服务框架,增加诸如服务注册、服务发现等功能。

Spring Cloud

Eureka注册中心

服务调用关系

  • 服务提供者:暴露接口给其他微服务调用
  • 服务消费者:调用其他微服务的接口
  • 提供者和消费者是相对

eureka的作用

  • 消费者该如何获取服务提供者信息?
    • 服务提供者启动时向eureka注册自己的信息
    • eureka保存这些信息
    • 消费者根据服务名称向eureka拉取提供者信息
  • 如果有多个服务提供者,消费者如何选择?
    • 服务消费者利用负载均衡算法,从列表中挑选一个
  • 消费者如何感知服务提供者健康状态?
    • 服务提供者会每隔30秒向EurekaServer发送心跳请求,报告健康状态
    • eureka会更新记录服务列表信息,心跳不正常会被踢除

Ribbon负载均衡

image-20220527215852205

Nacos

Nacos服务分级存储模型

  • 一级是服务,例如userService
  • 二级是集群,例如杭州或上海
  • 三级是实例:

NacosRule负载均衡策略

  • 优先选择同集群服务实例列表
  • 本地集群找不到提供者,才去其他集群寻找,并且会报警告
  • 确定了可用实例列表后,再采用随机负载均衡挑选实例

实例的权重控制

  • Nacos控制台可以设置实例的权重值,0~1之间
  • 同集群内的多个实例,权重越高被访问的频率越高
  • 权重设置为0则完全不会被访问

Nacos环境隔离(namespace)

  • namespace用来做环境隔离
  • 每个namespace都有唯一id
  • 不同namespace下的服务不可见

Nacos与Eureka对比

共同点:

  • 都支持服务注册和服务拉取
  • 都支持服务提供者心跳方式做健康检测

区别:

  • Nacos支持服务端主动检测提供者状态:临时实例采用心跳模式,非临时实例采用主动检测模式
  • 临时实例心跳不正常会被剔除,非临时实例则不会被剔除
  • Nacos支持服务列表变更的消息推送模式,服务列表更新更及时
  • Nacos集群默认采用AP方式,当集群中存在非临时实例时,采用CP模式;Eureka采用AP方式

Nacos配置管理

DataID命名规则:服务名称+环境.yaml(例如:userservice-dev.yaml)

image-20220528083411388

将配置交给Nacos管理的步骤

  • 在Nacos中添加配置文件
  • 在微服务中引入nacos的config依赖
  • 在微服务中添加bootstrap.yml,配置nacos地址、当前环境、服务名称、文件后缀名。这些决定了程序启动时nacos读取哪个文件

配置自动刷新

  • 在需要注入配置文件中内容的类上添加注解@RefreshScope
  • 也可以使用@ConfigurationProperties注解

注:在nacos配置中,服务名.yaml文件是所有环境共享的环境

多种配置优先级如下:

服务名-profile.yaml > 服务名称.yaml > 本地配置

Feign远程调用

使用步骤:

  • 引入依赖
  • 添加@EnableFeignClients注解
  • 编写FeignClient接口
  • 使用FeignClient中定义的方法代替RestTemplate

Feign底层的客户端实现:

  • URLConnection:默认实现,不支持连接池
  • Apache HttpClient:支持连接池
  • OKHttp:支持连接池

Feign性能优化主要为:

  • 使用连接池代替默认的URLConnection
  • 日志级别最好采用basic或none

GateWay

网关功能

  • 身份认证和权限校验
  • 服务路由,将用户请求路由到微服务,并实现负载均衡
  • 对用户请求做限流

网关搭建步骤:

  • 创建项目,引入服务发现和gateway依赖
  • 配置application.yml,包括服务基本信息、nacos地址、路由
  • 路由配置包括:
    • 路由id:路由的唯一标识
    • 路由目标(uri):路由的目标地址,http代表固定地址,lb代表根据服务名负载均衡
    • 路由断言(predicates):判断路由的规则
    • 路由过滤器(filters):对请求或响应做处理

路由过滤器GateWayFilter

GatewayFilter是网关中提供的一种过滤器,可以对进入网关的请求和微服务返回的响应做处理。

image-20220528182922800

过滤器的作用:

  • 对路由的请求或响应做加工处理,比如添加请求头
  • 配置在路由下的过滤器只对当前路由的请求生效
  • defaultFiler对所有的路由都生效

GlobalFIlter全局过滤器

全局过滤器的作用也是处理一切进入网关的请求和微服务响应,与GatewayFilter的作用一样,区别在于GatewayFilter通过配置定义,处理逻辑是固定的,而GlobalFilter的逻辑需要自己写代码实现。通过实现GlobalFilter接口实现。

实现全局过滤器步骤:

  • 实现GlobalFilter接口
  • 添加@Order注解或实现Ordered接口(指定过滤器的执行顺序)
  • 编写处理逻辑

过滤器执行顺序:

请求进入网关会碰到三类过滤器:当前路由过滤器、DefaultFilter、GlobalFilter

请求路由后,会将当前过滤器和DefaultFilter、GlobalFilter,合并到一个过滤器链(集合)中,排序后依次执行每个过滤器

image-20220528190022922

  • 每一个过滤器都必须指定一个int类型的order值,order值越小,优先级越高,执行顺序越靠前
  • GlobalFilter通过实现Ordered接口,或者添加@Order注解来指定order值,由我们自己指定
  • 路由过滤器和defaultFilter的order由Spring指定,默认是按照声明顺序从1递增
  • 当过滤器的order值一样时,会按照defaultFilter > 路由过滤器 > GlobalFilter的顺序执行

Sentinel

雪崩问题(微服务之间相互调用,因为调用链中的一个服务故障,引起整个链路都无法访问的情况)

  • 超时处理:设定超时时间,请求超过一定时间没有响应就返回错误信息,不会无休止等待
  • 船壁模式:限定每个业务使用的线程数,避免耗尽整个tomcat的资源,因此也叫线程隔离
  • 熔断降级:由断路器统计业务执行的异常比例,如果超出阈值则会熔断该业务,拦截访问该业务的一切请求
  • 流量控制:限制业务访问的QPS,避免服务因流量的突增而故障

流量控制

在添加限流规则时,点击高级选项,可以选择三种流控模式:

  • 直接:统计当前资源的请求,触发阈值时对当前资源直接限流,也就是默认的模式

  • 关联:统计与当前资源相关的另一个资源,触发阈值时,对当前资源限流

    满足下面条件可以使用关联模式

    • 两个有竞争关系的资源
    • 一个优先级高,一个优先级低
  • 链路:统计从指定链路访问到本资源的请求,触发阈值,对指定链路限流

流控效果

  • 快速失败:达到阈值后,新的请求会被立即拒绝并抛出FlowException异常。默认的处理方式
  • warm up:预热模式,对超出阈值的请求同样是拒绝并抛出异常。但这种模式阈值会动态变化,从一个较小值逐渐增加到最大阈值
  • 排队等待:让所有的请求按照先后次序排队执行,两个请求的间隔不能小于指定时长

**热点参数限流:**分别统计参数值相同的请求,之后判断是否超过QPS阈值。

隔离和降级

虽然限流可以尽量避免因高并发引起的服务故障,但服务还会因为其他原因而故障。而要将这些故障控制在一定范围,避免雪崩,就要靠线程隔离(舱壁模式)和熔断降级手段了

Feign整合Sentinel的步骤:

  • 在application.yml中配置:feign.sentinel.enable=true
  • 在FeignClient编写FallbackFactory并注册为Bean(继承FallbackFactory并指定泛型为要降级的类)
  • 将FallbackFactory配置到FeignClient中(在FeignClient中添加注解)

线程隔离

  • 线程池隔离

    优点:

    • 支持主动超时
    • 支持异步调用

    缺点:

    • 线程的额外开销较大

    场景:

    • 低扇出
  • 信号量隔离(Sentinel默认采用)

    优点:

    • 轻量级、无额外开销

    缺点:

    • 不支持主动超时
    • 不支持异步调用

    场景:

    • 高频调用
    • 高扇出

熔断策略

断路器熔断策略有三种:满调用、异常比例、异常数

  • 慢调用:业务的响应时长(RT)大于指定时长的请求认定为慢调用请求。在指定时间内,如果请求数量超过设定的最小数量,慢调用比例大于设定的阈值,则触发熔断。
  • 异常比例或异常数:统计指定时间内的调用,如果调用次数超过指定请求数,并且出现异常的比例达到设定的比例阈值(或超过指定异常数),则触发熔断。
  • 异常数:统计只当时间内的调用,如果调用次数超过指定请求数,并且出现异常的次数达到阈值则触发熔断。

授权规则及规则持久化

授权规则可以对调用方的来源做控制,有白名单和黑名单两种方式。

  • 白名单:来源(origin)在白名单内的调用者允许访问。
  • 黑名单:来源(origin)在黑名单内的调用者不允许访问

自定义异常结果

默认情况下,发生限流、降级、授权拦截时,都会抛出异常到调用方。如果要自定义异常时的返回结果,需要实现BlockExceptionHandler接口

Sentinel是通过RequestOriginParser这个接口的parseOrigin来获取请求的来源

Sentinel有三种配置管理模式

  • 原始模式:保存在内存
  • pull模式:保存在本地文件或数据库,定时去读取
  • push模式:保存在nacos,监听变更实时更新

分布式事务

解决分布式事务的思想和模型

  • 全局事务:整个分布式事务
  • 分支事务:分布式事务中包含的每个子系统的事务
  • 最终一致思想:各分支事务分别执行并提交,如果有不一致的情况,再想办法恢复数据
  • 强一致思想:各分支事务执行完业务不要提交,等待彼此结果。而后统一提交或回滚

Seata

Seata事务管理中的三个重要角色:

  • TC(Transaction Coordinator)-事务协调者:维护全局和分支事务的状态,协调全局事务或回滚
  • TM(Transaction Manager)-事务管理器:定义全局事务的范围、开启全局事务、提交或回滚事务
  • RM(Resource Manager)-资源管理器:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

image-20220531092538924

Seata提供了四种不同的分布式事务解决方案:

  • XA模式:强一致性分阶段事务模式,牺牲了一定的可用性,无业务侵入
  • TCC模式:最终一致性的分阶段事务模式,有业务侵入
  • AT模式:最终一致的分阶段事务模式,无业务侵入,也是Seata的默认模式
  • SAGA模式:长事务模式,有业务侵入

XA模式

XA模式原理:XA规范是X/Open 组织定义的分布式事务处理(DTP,Distribute Transaction Processing)标准,XA规范描述了全局的TM与与局部的RM之间的接口,几乎所有主流的数据库都对XA规范提供了支持。

image-20220531101529812

Seata的XA模式

Seata的XA模式做了一些调整,但大体相似:

image-20220531102528868

RM一阶段的工作:

  • 注册分支事务到TC
  • 执行分支业务sql但不提交
  • 报告执行状态到TC

TC二阶段的工作:

  • TC检测各分支事务执行状态
    • 如果都成功,通知所有RM提交事务
    • 如果有失败,通知所有RM回滚事务

RM二阶段的工作:

  • 接收TC指令,提交或回滚事务

XA模式的优点:

  • 事务的强一致性,满足ACID原则
  • 常用的数据库都支持,实现简单,并且没有代码侵入

XA模式的缺点:

  • 因为一阶段需要锁定数据库资源,等二阶段结束才释放,性能差
  • 依赖关系型数据库实现事务

AT模式

AT模式同样是分阶段提交的事务模型,不够弥补了XA模式中资源锁定周期过长的缺陷

image-20220531122532913

阶段一RM的工作:

  • 注册分支事务
  • 记录undo-log(数据快照)
  • 执行业务sql并提交
  • 报告事务状态

阶段二提交时RM的工作:

  • 删除undo-log即可

阶段二回滚时RM的工作:

  • 根据undo-log恢复到数更新前

image-20220531123104030

AT模式和XA模式的区别:

  • XA模式一阶段不提交事务,锁定资源;AT模式一阶段直接提交,不锁定资源
  • XA模式依赖数据库机制实现回滚;AT模式利用数据快照实现数据回滚
  • XA模式强一致性;AT模式最终一致性

TCC模式

TCC模式与AT模式非常相似,每阶段都是独立事务,不同的是TCC通过人工编码来实现数据恢复。需要实现三个方法:

  • Try:资源的检测和预留
  • Confirm:完成资源操作业务,要求Try成功Confirm一定能成功
  • Cancel:预留资源释放,可以理解为try的反向操作

image-20220601162444229

TCC模式的各个阶段任务:

  • Try:资源检查和预留
  • Confirm:业务执行和提交
  • Cancel:预留资源的释放

TCC的优点:

  • 一阶段完成直接提交事务,释放数据库资源,性能好
  • 相比AT模式,无需生成快照,无需使用全局锁,性能强
  • 不依赖数据库事务,而是依赖补偿操作,可以用于非事务型数据库

TCC缺点:

  • 有代码侵入,需要人为编写try、Confirm和Cancel接口,太麻烦
  • 软状态,事务是最终一致
  • 需要考虑Confirm和Cancel的失败情况,做好幂等处理

TCC的空回滚和业务悬挂

  1. 当某分支事务的try阶段阻塞,可能导致全局事务超时而触发二阶段的cancel操作。再未执行try操作时先执行了cancel操作,这时cancel不能做回滚,就是空回滚
  2. 对于已经空回滚的业务,如果以后继续执行try,就永远不可能confirm或cancel,这就是业务悬挂。应当阻止空回滚后的try操作,避免悬挂

Saga模式

Saga模式时Seata提供的长事务解决方案。也分为两个阶段:

  • 一阶段:直接提交本地事务
  • 而阶段:成功则什么都不做,失败则通过补偿业务来回滚

Saga模式优点:

  • 事务参与者可以甚至时间驱动实现异步调用,吞吐高
  • 一阶段直接提交事务,无锁,性能好
  • 不能编写TCC中的三个阶段,实现简单

Saga缺点:

  • 软状态持续时间不确定,时效性差
  • 没有锁,没有事务隔离,会有脏写

RabbitMQ

基本概念:

生产者(Publisher):发布消息到 RabbitMQ 中的交换机(Exchange)上。

交换机(Exchange):和生产者建立连接并接收生产者的消息。

消费者(Consumer):监听 RabbitMQ 中的 Queue 中的消息。

队列(Queue):Exchange 将消息分发到指定的 Queue,Queue 和消费者进行交互。

路由(Routes):交换机转发消息到队列的规则。

同步调用的优点:

  • 时效性较强,可以立即得到结果

同步调用的问题:

  • 耦合度高
  • 性能和吞吐能力下降
  • 有额外的资源消耗
  • 有级联失败问题

异步通信的优点:

  • 耦合度低
  • 吞吐量提升
  • 故障隔离
  • 流量削峰

异步调用的缺点:

  • 依赖于Broker的可靠性、安全性、吞吐能力
  • 架构复杂了,业务没有明显的流程线,不好追踪管理

RabbiMQ结构

image-20220529084151441

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息
  • virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

常见消息类型:

  • 基本消息队列

    image-20220529084644566

  • 工作消息队列

    image-20220529084907773

  • 发布订阅,又根据交换机类型不同分为三种:

    • Fanout Exchange:广播

      image-20220529084855058

    • Direct Exchange:路由

      image-20220529084840572

    • Topic Exchange:主题

      image-20220529084938191

基本消息队列

基本消息队列的消息发送流程:

  • 建立connection
  • 创建channel
  • 利用channel声明队列
  • 利用channel向队列发送消息

基本消息队列的消息接收流程:

  • 建立connection
  • 创建channel
  • 利用channel声明队列
  • 定义consumer的消息行为handleDelivery()
  • 利用channel将消费者与队列绑定

如下代码实现:

publisher如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.zhang.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.zhang.demo.utils.ConnectionRabbitMq;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Send {

@Test
public void testSendMessage() throws IOException, TimeoutException {
//创建连接Mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("154.253.41.2");
//设置连接哪个端口
connectionFactory.setPort(5672);
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();

assert connection != null;
Channel channel = connection.createChannel();
//通道绑定对应的消息队列
//参数1:队列名称,如果队列不存在自动创建
//参数2:用来定义队列是否要持久化
//擦书3:exclusive是否独占队列
//参数4:autoDelete 是否在消费完成后自动删除队列
//参数5:额外附加参数
channel.queueDeclare("hello",true,false,false,null);
//发布消息
//参数1:交换机名称,参数2队列名称,参数3:传递消息额外设置,参数4:消息的具体内容
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes(StandardCharsets.UTF_8));
channel.close();
connection.close();
}
}

consumer如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.zhang.demo;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

// @Test
public void test() throws IOException, TimeoutException {
//创建连接工厂
//创建连接Mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("154.253.41.2");
//设置连接哪个端口
connectionFactory.setPort(5672);
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
assert connection != null;
Channel channel = connection.createChannel();
channel.queueDeclare("hello",true,false,false,null);
//参数1:消费哪个队列的消息,参数2:开启消息的自动确认机制,参数3:消费时的回调接口
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("new StringBody="+new String(body));
}
});
channel.close();
connection.close();
}

public static void main(String[] args) throws IOException, TimeoutException {
Consumer consumer=new Consumer();
consumer.test();
}
}

基本消息对列SpringAMQP使用

  1. 添加依赖

    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  2. 在配置文件中添加mq连接信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    spring:
    application:
    name: rabbitmq-springboot
    rabbitmq:
    host: 154.253.41.2
    port: 5672
    username: ems
    password: 123456
    virtual-host: /ems
  3. 在publisher服务中添加一个测试类,编写发送消息的消息提供

    1
    2
    3
    4
    5
    6
    7
    8
    9
      @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test1(){
    String queueName="hello";
    String message="hello world";
    rabbitTemplate.convertAndSend(queueName,message);//发送到哪个队列以及发送什么消息
    }
  4. 消费者接收消息

    1
    2
    3
    4
    5
    6
    7
    8
    @Component
    @RabbitListener(queuesToDeclare = @Queue("hello"))//监听哪个队列
    public class HelloCumstomer {
    @RabbitHandler//处理监听行为
    public void receviel(String message){//参数是Publisher发送的消息类型
    System.out.println(message);
    }
    }

工作消息队列

Work queue工作队列,可以提高消息处理速度,避免队列消息堆积

util连接工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ConnectionRabbitMq {

//提供创建连接对量的方法
public static Connection getConnection(){
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("154.253.41.2");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123456");
return connectionFactory.newConnection();
}catch (Exception e){
e.printStackTrace();
}
return null;
}

public static void closeConnectionAndChannel( Channel channel,Connection connection){
try {
if(channel!=null)
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
try {
if(connection!=null)
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

publisher如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Provider {

public static void main(String[] args) throws IOException {
//获取连接对象
Connection connection= ConnectionRabbitMq.getConnection();
assert connection != null;
Channel channel=connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
for(int i=0;i<21;i++)
channel.basicPublish("","work",null,(i+"hello work quene").getBytes(StandardCharsets.UTF_8));
ConnectionRabbitMq.closeConnectionAndChannel(channel,connection);
}
}

consumer1如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Customer1 {
public static void main(String[] args) throws IOException {
Connection connection = ConnectionRabbitMq.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
channel.basicQos(1);//每次只能消费一个消息
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("消费者-1:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);//参数1:确认队列中哪个具体消息,参数2:是否开启多个消息同时确认
}
});
}
}

consumer2如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.zhang.demo.workquene;

import com.rabbitmq.client.*;
import com.zhang.demo.utils.ConnectionRabbitMq;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class Customer2 {
public static void main(String[] args) throws IOException {
Connection connection = ConnectionRabbitMq.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
channel.basicQos(1);
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者-2:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}

工作消息队列SpringAMQP使用

前三步同上面基础消息队列,也可以指定每次取多少条消息,处理完成才继续取消息

1
2
3
listener:
direct:
prefetch: 1 #每次只能获取一条消息,处理完成后才能获取下一个消息
  1. 发送多条消息

    1
    2
    3
    4
    5
    @Test
    public void testWork(){
    for(int i=0;i<10;i++)
    rabbitTemplate.convertAndSend("work","work模型");
    }
  2. 消费者端添加多个消费者共同处理消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Component
    public class WorkCumstomer {

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void recevi1(String message){
    System.out.println("message-1"+message);
    }

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void recevi2(String message){
    System.out.println("message-2"+message);
    }
    }

Fanout Exchange(广播)

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue

基本使用步骤:

连接工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ConnectionRabbitMq {

//提供创建连接对量的方法
public static Connection getConnection(){
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("154.253.41.2");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123456");
return connectionFactory.newConnection();
}catch (Exception e){
e.printStackTrace();
}
return null;
}

public static void closeConnectionAndChannel( Channel channel,Connection connection){
try {
if(channel!=null)
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
try {
if(connection!=null)
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

consumer1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Customer1 {
public static void main(String[] args) throws IOException {
//连接对象
Connection connection = ConnectionRabbitMq.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs","fanout");
//临时队列
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queue,"logs","");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("消费者-1:"+new String(body));
}
});
}
}

consumer2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Customer2 {
public static void main(String[] args) throws IOException {
//连接对象
Connection connection = ConnectionRabbitMq.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs","fanout");
//临时队列
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queue,"logs","");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("消费者-1:"+new String(body));
}
});
}
}

provider:

1
2
3
4
5
6
7
8
9
10
11
public class Provider {

public static void main(String[] args) throws IOException, TimeoutException {
Connection connectionFactory = ConnectionRabbitMq.getConnection();
assert connectionFactory != null;
Channel channel=connectionFactory.createChannel();
channel.exchangeDeclare("logs","fanout");//参数1:交换机名称,参数2:交换机类型 fanout 广播类型
channel.basicPublish("logs","",null,"fanout type message".getBytes(StandardCharsets.UTF_8));
ConnectionRabbitMq.closeConnectionAndChannel(channel,connectionFactory);
}
}

Fanout Exchange SpringAMQP使用:

前两步同基础消息队列

方式一注解:

  1. 定义pulisher发送消息

    1
    2
    3
    4
    5
    6
    7
        @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testFanout(){
    rabbitTemplate.convertAndSend("logs","","FanOut的模型发送的消息");//参数一是exchange,参数二是routingKey,参数三是消息
    }
  2. consumer消费端:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Component
    public class FanoutCumstomer {
    @RabbitListener(bindings = {
    @QueueBinding(value = @Queue, exchange = @Exchange(value = "logs",type = "fanout"))
    })
    public void recive1(String message){
    System.out.println("message-1 = " + message);
    }


    @RabbitListener(bindings = {
    @QueueBinding(value = @Queue, exchange = @Exchange(value = "logs",type = "fanout"))
    })
    public void recive2(String message){
    System.out.println("message-2 = " + message);
    }
    }

方式二配置类:

也可以使用配置的方式声明消费者绑定的交换机和队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class FanoutConfig {

@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout");//声明交换机
}

@Bean
public Queue fanoutQueue(){
return new Queue("fanout.queue");//声明队列
}

@Bean
public Binding fanoutBinfing(Queue fanoutQueue,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);//绑定队列和交换机
}

}

消费者使用直接指定队列即可:

1
2
3
4
@RabbitListener(queues = "fanout.queue")
public void listenFanoutQueue(String msg){
System.out.println(msg);
}

Direct Exchange(路由)

DIrect Exchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。

  • 每一个Queue都与Exchange设置一个BindingKey(BindingKey可以指定多个);
  • 发布者发送消息时,指定消息的RoutingKey;
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列;

image-20220529102816194

util连接工具类:

public class ConnectionRabbitMq {

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//提供创建连接对量的方法
public static Connection getConnection(){
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("154.253.41.2");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123456");
return connectionFactory.newConnection();
}catch (Exception e){
e.printStackTrace();
}
return null;
}

public static void closeConnectionAndChannel( Channel channel,Connection connection){
try {
if(channel!=null)
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
try {
if(connection!=null)
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

consumer 1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Customer1 {
public static void main(String[] args) throws IOException {
Connection connection= ConnectionRabbitMq.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
channel.exchangeDeclare("log_direct","direct");
String queue = channel.queueDeclare().getQueue();
//基于rout key 绑定队列和交换机
channel.queueBind(queue,"log_direct","error");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("消费这-1"+new String(body));
}
});
}
}

consumer 2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Customer2 {
public static void main(String[] args) throws IOException {
Connection connection= ConnectionRabbitMq.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
channel.exchangeDeclare("log_direct","direct");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"log_direct","info");
channel.queueBind(queue,"log_direct","error");
channel.queueBind(queue,"log_direct","warning");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("消费者-2"+new String(body));
}
});
}
}

provider:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Provider {

public static void main(String[] args) throws IOException {
//获取连接对象
Connection connection= ConnectionRabbitMq.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
channel.exchangeDeclare("log_direct","direct");
//发送消息
String routingKey="warning";
channel.basicPublish("log_direct",routingKey,null,("这是direct模型发布的基于rout key:["+routingKey+"]发送的消息").getBytes(StandardCharsets.UTF_8));
ConnectionRabbitMq.closeConnectionAndChannel(channel,connection);
}

}

Direct Exchange SpringAMQP使用:

前两部同基础消息队列

  1. consumer

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Component
    public class RouteCumstomer {

    @RabbitListener(bindings = {
    @QueueBinding(value=@Queue,exchange = @Exchange(value = "directs",type = "direct"),key = {"info","error","warning"})
    })
    public void recive1(String message){//消费者1
    System.out.println("message-1 = " + message);
    }

    @RabbitListener(bindings = {
    @QueueBinding(value=@Queue,exchange = @Exchange(value = "directs",type = ExchangeTypes.DIRECT),key = {"info","error","warning"})
    })
    public void recive2(String message){//消费者2
    System.out.println("message-2 = " + message);
    }
    }
  2. provider

    1
    2
    3
    4
    5
    6
    7
     @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testRoute(){
    rabbitTemplate.convertAndSend("directs","info","发送--info--key的信息");
    }

Topic Exchange(主题)

Topic Exchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割

Queue与Exchange指定BindingKey时可以使用通配符:

  • #:代表0个或多个单词
  • *:代表一个单词

image-20220529104430167

基础使用步骤:

util连接工具:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//提供创建连接对量的方法
public static Connection getConnection(){
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("154.253.41.2");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123456");
return connectionFactory.newConnection();
}catch (Exception e){
e.printStackTrace();
}
return null;
}

public static void closeConnectionAndChannel( Channel channel,Connection connection){
try {
if(channel!=null)
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
try {
if(connection!=null)
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

consumer 1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Customer1 {
public static void main(String[] args) throws IOException {
Connection connection = ConnectionRabbitMq.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics","topic");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"topics","user.*");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("消费者-1"+new String(body));
}
});
}
}

consumer 2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Customer2 {
public static void main(String[] args) throws IOException {
Connection connection = ConnectionRabbitMq.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics","topic");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"topics","user.#");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("消费者-2"+new String(body));
}
});
}
}

provider:

1
2
3
4
5
6
7
8
9
10
11
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = ConnectionRabbitMq.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics","topic");
String routekey="user.save.all";
channel.basicPublish("topics",routekey,null,("这里是topic动态类型模型["+routekey+"]").getBytes(StandardCharsets.UTF_8));
ConnectionRabbitMq.closeConnectionAndChannel(channel,connection);
}
}

Topic Exchange SpringAMQP使用

前面两部同基本消息队列

  1. consumer

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Component
    public class TopicCumstomer {
    @RabbitListener(bindings = {
    @QueueBinding(value = @Queue,exchange = @Exchange(type = "topic",name = "topics"),key = {"user.save","user.*"})
    })
    public void recive1(String massage){
    System.out.println("massage 1= " + massage);
    }


    @RabbitListener(bindings = {
    @QueueBinding(value = @Queue,exchange = @Exchange(type = "topic",name = "topics"),key = {"order.#","user.*"})
    })
    public void recive2(String massage){
    System.out.println("massage 1= " + massage);
    }
    }
  2. provider

    1
    2
    3
    4
    5
    6
    7
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testTopic(){
    rabbitTemplate.convertAndSend("topics","order","user.save 路由消息");
    }

SpringAMQP中消息的序列化和反序列化是怎么实现的?

  • 利用MessageConverter实现的,默认是JDK的序列化
  • 注意发送方与接收方必须使用相同的MessageConverter

RabbitMQ相关文章:

江南一点雨

elasticsearch

elasticsearch是一款非常强大的开源搜索引擎,可以帮我们从海量数据中快速找到需要的内容。

elasticsearch结合Kibana、logstash、Beats,也就是elastic stack(ELK)。被广泛应用在日志数据分析、实时监控等领域

 ![image-20220529142245490](https://picgo-liziyuan.oss-cn-hangzhou.aliyuncs.com/img202205291422577.png)

文档和词条:

  • 每一条数据就是一个文档
  • 对文档中的内容分词,得到的词语就是词条

正向索引:

  • 基于文档id创建索引。查询词条时必须先找到文档,而后判断是否包含词条

倒排索引:

  • 对文档内容分词,对词条创建索引,并记录词条所在文档的信息。查询时先根据词条查询到文档id,而后获取文档。

索引

  • 索引(index):相同类型的文档的集合
  • 映射(mapping):索引中文档的字段约束信息,类似表的结构约束

Ik分词器安装使用

下载与Elasticsearch版本相同的ik分词器

https://github.com/medcl/elasticsearch-analysis-ik/releases

将下载的包解压后放到Elasticsearch中的plugins目录下,(踩坑日记–需要重新创建一个ik文件夹将解压后的ik分词器的内容复制到ik目录下才可以正常使用)

Ik分词器的扩展与停用词

在Ik分词器config目录下打开IKAnalyzer.cfg.xml进行配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<comment>IK Analyzer 扩展配置</comment>
<!--用户可以在这里配置自己的扩展字典 -->
<entry key="ext_dict"></entry>
<!--用户可以在这里配置自己的扩展停止词字典-->
<entry key="ext_stopwords"></entry>
<!--用户可以在这里配置远程扩展字典 -->
<!-- <entry key="remote_ext_dict">words_location</entry> -->
<!--用户可以在这里配置远程扩展停止词字典-->
<!-- <entry key="remote_ext_stopwords">words_location</entry> -->
</properties>

image-20220529154352092

在字典中添加相关词语即可

IK分词器有两种模式:

  • ik_smart:智能切分,粗粒度
  • ik_max_word:最细切分,细粒度

拼音分词器

安装同IK分词器,下载解压复制到plugins目录下,重启即可

自定义分词器步骤:

  • 创建索引库,在settings中配置,可以包含三部分
  • character filter
  • tokenizer
  • filter

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
PUT /test
{
"settings":{
"analysis":{
"analyzer":{
"my_analyzer":{
"tokenizer":"ik_max_word",
"filter":"py"
}
},
"filter":{
"py":{
"type":"pinyin",
"keep_full_pinyin":false,
"keep_joined_full_pinyin":true,
"keep_original":true,
"limit_first_letter_length":16
"remove_duplicated_term":true,
"none_chinese_pinyin_tokenizer":false
}
}
}
},
"mappings":{
"properties":{
"name":{
"type":"text",
"analyzer":"my_analyzer",
"search_analyzer":"standard"
}
}
}
}

mapping属性

mapping是对索引库中文档的约束,常见的mapping属性包括:

  • type:字段数据类型,常见的简单数据类型有:
    • 字符串:text(可分词的文本)、keyword(精确值,例如:品牌、国家、ip地址)
    • 数值:long、integer、short、byte、double、float
    • 布尔:boolean
    • 日期:date
    • 对象:object
  • index:是否创建索引,默认为true
  • analyzer:使用哪种分词器
  • properties:该字段的子字段

索引库基本操作

  • 创建索引:PUT/索引名称
  • 查询索引:GET/索引名称
  • 删除索引:DELETE/索引名称
  • 添加字段:PUT/索引库名/_mapping

创建索引库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
PUT /heima
{
"mappings": {
"properties": {
"info":{
"type": "text",
"analyzer": "ik_smart"
},
"email":{
"type": "keyword",
"index": false
},
"name":{
"type": "object",
"properties": {
"firstName":{
"type":"keyword"
},
"lastName":{
"type":"keyword"
}
}
}
}
}
}

修改索引库

索引库和mapping一旦创建无法修改,但是可以添加新的字段,语法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
PUT /索引名/_mapping
{
"properties":{
"新字段名":{
"type":"integer"
}
}
}


如下示例:
PUT /heima/_mapping
{
"properties":{
"age":{
"type":"integer"
}
}
}

查询索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
GET /heima

//结果如下
{
"heima" : {
"aliases" : { },
"mappings" : {
"properties" : {
"age" : {
"type" : "integer"
},
"email" : {
"type" : "keyword",
"index" : false
},
"info" : {
"type" : "text",
"analyzer" : "ik_smart"
}
}
},
"settings" : {
"index" : {
"routing" : {
"allocation" : {
"include" : {
"_tier_preference" : "data_content"
}
}
},
"number_of_shards" : "1",
"provided_name" : "heima",
"creation_date" : "1653818826936",
"number_of_replicas" : "1",
"uuid" : "5dw1G0dkR1CbIV1CN6-MXA",
"version" : {
"created" : "8010299"
}
}
}
}
}

删除索引

1
2
3
4
5
6
DELETE /heima

//结果如下
{
"acknowledged" : true
}

文档基本操作

  • 创建文档:POST /索引库名/_doc/文档id {json文档}
  • 查询文档:GET /索引库名/_doc/文档id
  • 删除文档:DELETE /索引库名/文档id
  • 修改文档:
    • 全量修改:PUT /索引库名/_doc/文档id {json文档}
    • 增量修改:POST /索引库名/update/文档id {“doc”{字段}}

新建文档的DSL语法如下:

1
2
3
4
5
6
7
8
9
10
POST /索引库名/_doc/文档id
{
"字段1":"值1",
"字段2":"值2",
"字段3":{
"子属性1":"值3",
"子属性2":"值4"
},
......
}

示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
POST /heima/_doc/1
{
"info":"世界知名的长江,对外开放的大学",
"email":"214324@qq.com",
"name":{
"firstName":"li",
"lastName":"ziyuan"
}
}
//结果如下
{
"_index" : "heima",
"_id" : "1",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1
}

查询文档语法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
GET /heima/_doc/1

//结果如下
{
"_index" : "heima",
"_id" : "1",
"_version" : 1,
"_seq_no" : 0,
"_primary_term" : 1,
"found" : true,
"_source" : {
"info" : "世界知名的长江,对外开放的大学",
"email" : "214324@qq.com",
"name" : {
"firstName" : "li",
"lastName" : "ziyuan"
}
}
}

删除文档语法如下:

1
DELETE /索引库名/_doc/删除的文档id

修改文档

  • 全量修改,会删除旧文档,添加新文档

    1
    2
    3
    4
    5
    6
    PUT /索引库名/_doc/文档id
    {
    "字段1":"值1",
    "字段2":"值2"
    ........
    }
  • 增量修改,修改指定字段值

    1
    2
    3
    4
    5
    6
    POST /索引库名/_update/文档id
    {
    "doc":{
    "字段名":"新的值"
    }
    }

DSL查询分类

Elasticsearch提供了基于JSON的DSL来定义查询,常见的查询类型包括如下:

  • 查询所有:查询出所有数据,一般测试用。例如:match_all
  • 全文检索(full text)查询:利用分词器对用户输入内容分词,然后去倒排索引库中匹配。例如:
    • match_query
    • malti_match_query
  • 精确查询:根据精确词条查找数据,一般是查询keyword、数值、日期、boolean等类型字段。例如:
    • ids
    • range
    • term
  • 地理(geo)查询:根据经纬度查询。例如:
    • geo_distance
    • geo_bounding_box
  • 复合(compound)查询:复合查询可以将上述各种查询条件组合起来,合并查询条件,例如:
    • bool
    • boosting
    • constant_score
    • dis_max
    • function_score

聚合

聚合可(aggregations)可以实现对文档数据的统计、分析、运算。常见的有三类:

  • 桶(Bucket)聚合:用来对文档做分组
    • TermAggregation:按照文档字段值分组
    • Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
  • 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
    • Avg:求平均值
    • Max:求最大值
    • Min:求最小是
    • Stats:同时求max、min、avg、sum等
  • 管道(Pipeline)聚合:其他聚合的结果为基础的聚合

分词器组成

elasticsearch中分词(analyzer)的组成包括三部分:

  • character filter:在tokenizer之前对文本进行处理。例如删除字符、替换字符
  • tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;
  • tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换,同义词处理、拼音处理

自动补全查询

elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配用户输入内容开头的词条并返回。为了提高补全查询效率,对于文档中字段的类型有一些约束:

  • 参数补全查询的字段必须是completion类型
  • 字段的内容一般是用来补全的多个词条形成的数组

示例:

1
2
3
4
5
6
7
8
9
10
PUT test
{
"mappings":{
"properties":{
"tittle":{
"type":"completion"
}
}
}
}
1
2
3
4
POST test/_doc
{
"tittle":["Sony","WH-1000XM3"]
}

completion suggestion查询

1
2
3
4
5
6
7
8
9
10
11
12
13
GET /test/_search
{
"suggest":{
"tittle_suggest":{
"text":"s"//关键字,
"completion":{
"field":"tittle",//补全查询的字段
"skip_duplicates":true,//跳过重复的
"size":10//获取10条结果
}
}
}
}

微服务
http://example.com/2022/09/04/微服务/
作者
liziyuan
发布于
2022年9月4日
许可协议