Proxyless的多活流量和微服务治理
共 34437字,需浏览 69分钟
·
2024-08-28 18:04
1. 引言
1.1 项目的背景及意义
服务间通信的复杂性:不同服务之间需要进行可靠的通信,处理失败重试、负载均衡等问题。
故障的容错处理:系统的复杂性给与运维及故障处理带来更大的挑战,如何快速处理故障解决线上问题,这是考验一个企业基础设施建设的重要关卡。
最初,开发者使用SDK来解决这些问题,通过在代码中集成各种库和工具来实现服务治理。然而,随着微服务架构的规模不断扩大,这种方法逐渐显现出局限性:
代码侵入性:需要在每个服务的代码中集成和配置各种库,增加了代码的复杂性和维护成本。
一致性问题:不同服务可能使用不同版本的库,导致治理逻辑不一致,SDK的升级难度凸显。
1.2 项目概述
项目地址:https://github.com/jd-opensource/joylive-agent
2. 微服务架构演进及优缺点
2.1 单体架构阶段
优点: 简单、易于开发和测试,适合小团队和小规模应用。
缺点: 这种架构随着应用规模增大,可能会面临维护困难、扩展性差等问题。
2.2 垂直拆分阶段
优点:独立部署和扩展,每个服务可以由独立的团队开发和维护,提高了敏捷性。
缺点:增加了分布式系统的复杂性,需要处理服务间通信、数据一致性、服务发现、负载均衡等问题,也因为中间引入LB而降低了性能。
2.3 微服务成熟阶段
优点: 高度可扩展、弹性和灵活性,支持高频率的发布和更新。
缺点: 系统复杂性和运维成本较高,需要成熟的技术栈和团队能力。微服务治理能力依赖SDK,升级更新成本高,需要绑定业务应用更新。
2.4 服务网格架构
优点: 主要优点是解耦业务逻辑与服务治理的能力,通过集中控制平面(control plane)简化了运维管理。
缺点: 增加了资源消耗,更高的运维挑战。
3. 项目架构设计
3.1 Proxyless模式
性能优化:
减少网络延迟:传统的边车代理模式会引入额外的网络跳数,因为每个请求都需要通过边车代理进行处理。通过Java Agent直接将服务网格功能注入到应用程序中,可以减少这些额外的网络开销,从而降低延迟。
降低资源消耗:不再需要运行额外的边车代理,从而减少了CPU、内存和网络资源的占用。这对需要高效利用资源的应用非常重要。
简化运维:
统一管理:通过Java Agent实现Proxyless模式,所有服务网格相关的配置和管理可以集中在控制平面进行,而无需在每个服务实例中单独配置边车代理。这简化了运维工作,特别是在大型分布式系统中。
减少环境复杂性:通过消除边车代理的配置和部署,环境的复杂性降低,减少了可能出现的配置错误或版本不兼容问题。
数据局面升级:Java Agent作为服务治理数据面,天然与应用程序解耦,这点是相对于SDK的最大优点。当数据面面临版本升级迭代时,可以统一管控而不依赖于用户应用的重新打包构建。
灵活性:
无需修改源代码与现有生态系统兼容:Java Agent可以在运行时对应用程序进行字节码操作,直接在字节码层面插入服务网格相关的逻辑,而无需开发者修改应用程序的源代码。这使得现有应用能够轻松集成Proxyless模式。
动态加载和卸载:Java Agent可以在应用程序启动时或运行时动态加载和卸载。这意味着服务网格功能可以灵活地添加或移除,适应不同的运行时需求。
适用性广:
支持遗留系统:对于无法修改源代码的遗留系统,Java Agent是一种理想的方式,能够将现代化的服务网格功能集成到老旧系统中,提升其功能和性能。
通过Java Agent实现Proxyless模式,能够在保持现有系统稳定性的同时,享受服务网格带来的强大功能,是一种高效且灵活的解决方案。
3.2 微内核架构概述
核心组件:框架的核心组件更多的定义核心功能接口的抽象设计,模型的定义以及agent加载与类隔离等核心功能,为达到最小化依赖,很多核心功能都是基于自研代码实现。具体可参见joylive-core
代码模块。
插件化设计:使用了模块化和插件化的设计,分别抽象了像保护插件,注册插件,路由插件,透传插件等丰富的插件生态,极大的丰富了框架的可扩展性,为适配多样化的开源生态奠定了基础。具体可参见joylive-plugin
代码模块。
3.3 插件扩展体系
项目基于Java的SPI机制实现了插件化的扩展方式,这也是Java生态的主流方式。
3.3.1 定义扩展
定义扩展接口,并使用@Extensible
注解来进行扩展的声明。下面是个负载均衡扩展示例:
@Extensible("LoadBalancer")
public interface LoadBalancer {
int ORDER_RANDOM_WEIGHT = 0;
int ORDER_ROUND_ROBIN = ORDER_RANDOM_WEIGHT + 1;
default <T extends Endpoint> T choose(List<T> endpoints, Invocation<?> invocation) {
Candidate<T> candidate = elect(endpoints, invocation);
return candidate == null ? null : candidate.getTarget();
}
<T extends Endpoint> Candidate<T> elect(List<T> endpoints, Invocation<?> invocation);
}
3.3.2 实现扩展
实现扩展接口,并使用@Extension
注解来进行扩展实现的声明。如下是实现了LoadBalancer
接口的实现类:
@Extension(value = RoundRobinLoadBalancer.LOAD_BALANCER_NAME, order = LoadBalancer.ORDER_ROUND_ROBIN)
@ConditionalOnProperties(value = {
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LIVE_ENABLED, matchIfMissing = true),
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LANE_ENABLED, matchIfMissing = true),
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_FLOW_CONTROL_ENABLED, matchIfMissing = true)
}, relation = ConditionalRelation.OR)
public class RoundRobinLoadBalancer extends AbstractLoadBalancer {
public static final String LOAD_BALANCER_NAME = "ROUND_ROBIN";
private static final Function<Long, AtomicLong> COUNTER_FUNC = s -> new AtomicLong(0L);
private final Map<Long, AtomicLong> counters = new ConcurrentHashMap<>();
private final AtomicLong global = new AtomicLong(0);
@Override
public <T extends Endpoint> Candidate<T> doElect(List<T> endpoints, Invocation<?> invocation) {
AtomicLong counter = global;
ServicePolicy servicePolicy = invocation.getServiceMetadata().getServicePolicy();
LoadBalancePolicy loadBalancePolicy = servicePolicy == null ? null : servicePolicy.getLoadBalancePolicy();
if (loadBalancePolicy != null) {
counter = counters.computeIfAbsent(loadBalancePolicy.getId(), COUNTER_FUNC);
}
long count = counter.getAndIncrement();
if (count < 0) {
counter.set(0);
count = counter.getAndIncrement();
}
// Ensure the index is within the bounds of the endpoints list.
int index = (int) (count % endpoints.size());
return new Candidate<>(endpoints.get(index), index);
}
}
该类上的注解如下:
@Extension
注解声明扩展实现,并提供了名称@ConditionalOnProperty
注解声明启用的条件,可以组合多个条件
3.3.3 启用扩展
在 SPI 文件 META-INF/servicescom.jd.live.agent.governance.invoke.loadbalance.LoadBalancer
中配置扩展全路径名com.jd.live.agent.governance.invoke.loadbalance.roundrobin.RoundRobinLoadBalancer
即达到启用效果。
更多详情可查阅:https://github.com/jd-opensource/joylive-agent/blob/main/docs/cn/extension.md
3.4 依赖注入设计
3.4.1 @Injectable
boolean enable() default true;} (ElementType.TYPE) (RetentionPolicy.RUNTIME) Injectable {
@Injectable
注解的实例进行依赖对象注入。
3.4.2 @Inject
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Inject {
String value() default "";
boolean nullable() default false;
ResourcerType loader() default ResourcerType.CORE_IMPL;
}
3.4.3 @Configurable
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Configurable {
String prefix() default "";
boolean auto() default false;
}
@Injectable
,用于指定哪些类启用自动注入配置文件的支持。prefix
指定用于配置键的前缀。这通常意味着前缀将来自类名或基于某种约定。auto
指示配置值是否应自动注入到注解类的所有合规字段中。默认值为 false,这意味着默认情况下未启用自动注入。
3.4.4 @Config
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Config {
String value() default "";
boolean nullable() default true;
}
nullable
指示字段的配置是否是可选的。如果为真,则系统将允许配置缺失而不会导致错误。
下面是具体的使用示例:
@Injectable
@Extension(value = "CircuitBreakerFilter", order = OutboundFilter.ORDER_CIRCUIT_BREAKER)
public class CircuitBreakerFilter implements OutboundFilter, ExtensionInitializer {
@Inject
private Map<String, CircuitBreakerFactory> factories;
@Inject(nullable = true)
private CircuitBreakerFactory defaultFactory;
@Inject(GovernanceConfig.COMPONENT_GOVERNANCE_CONFIG)
private GovernanceConfig governanceConfig;
...
}
@Configurable(prefix = "app")
public class Application {
@Setter
@Config("name")
private String name;
@Setter
@Config("service")
private AppService service;
...
}
更多细节,因为篇幅原因不再展开。详情可以了解:https://github.com/jd-opensource/joylive-agent/blob/main/docs/cn/extension.md
3.5 字节码增强机制
Java字节码增强的主要方法有:
运行时增强:使用Java Agent在类加载时修改字节码。
加载时增强:在类加载到JVM之前修改字节码。
编译时增强:在编译阶段修改或生成字节码。
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.SuperMethodCall;
import net.bytebuddy.matcher.ElementMatchers;
import java.lang.reflect.Method;
// 原始类
class SimpleClass {
public void sayHello() {
System.out.println("Hello, World!");
}
}
// 拦截器
class SimpleInterceptor {
public static void beforeMethod() {
System.out.println("Before saying hello");
}
public static void afterMethod() {
System.out.println("After saying hello");
}
}
public class ByteBuddyExample {
public static void main(String[] args) throws Exception {
// 使用ByteBuddy创建增强类
Class<?> dynamicType = new ByteBuddy()
.subclass(SimpleClass.class)
.method(ElementMatchers.named("sayHello"))
.intercept(MethodDelegation.to(SimpleInterceptor.class)
.andThen(SuperMethodCall.INSTANCE))
.make()
.load(ByteBuddyExample.class.getClassLoader())
.getLoaded();
// 创建增强类的实例
Object enhancedInstance = dynamicType.getDeclaredConstructor().newInstance();
// 调用增强后的方法
Method sayHelloMethod = enhancedInstance.getClass().getMethod("sayHello");
sayHelloMethod.invoke(enhancedInstance);
}
}
这个例子展示了如何使用ByteBuddy来增强SimpleClass
的sayHello
方法。让我解释一下这个过程:
我们定义了一个简单的
SimpleClass
,它有一个sayHello
方法。我们创建了一个
SimpleInterceptor
类,包含了我们想要在原方法执行前后添加的逻辑。在
ByteBuddyExample
类的main
方法中,我们使用ByteBuddy来创建一个增强的类:我们创建了
SimpleClass
的一个子类。我们拦截了名为"sayHello"的方法。
我们使用
MethodDelegation.to(SimpleInterceptor.class)
来添加前置和后置逻辑。我们使用
SuperMethodCall.INSTANCE
来确保原始方法被调用。我们创建了增强类的实例,并通过反射调用了
sayHello
方法。
当你运行这个程序时,输出将会是:
Before saying hello
Hello, World!
After saying hello
当然,工程级别的实现是远比上面Demo的组织形式复杂的。插件是基于扩展实现的,有多个扩展组成,对某个框架进行特定增强,实现了多活流量治理等等业务逻辑。一个插件打包成一个目录,如下图所示:
.
└── plugin
├── dubbo
│ ├── joylive-registry-dubbo2.6-1.0.0.jar
│ ├── joylive-registry-dubbo2.7-1.0.0.jar
│ ├── joylive-registry-dubbo3-1.0.0.jar
│ ├── joylive-router-dubbo2.6-1.0.0.jar
│ ├── joylive-router-dubbo2.7-1.0.0.jar
│ ├── joylive-router-dubbo3-1.0.0.jar
│ ├── joylive-transmission-dubbo2.6-1.0.0.jar
│ ├── joylive-transmission-dubbo2.7-1.0.0.jar
│ └── joylive-transmission-dubbo3-1.0.0.jar
该dubbo插件,支持了3个版本,增强了注册中心,路由和链路透传的能力。下面介绍一下在joylive-agent
中如何实现一个字节码增强插件。
3.5.1 增强插件定义接口
PluginDefinition
。
@Extensible("PluginDefinition")
public interface PluginDefinition {
ElementMatcher<TypeDesc> getMatcher();
InterceptorDefinition[] getInterceptors();
}
接口共定义了两个方法:getMatcher
用于获取匹配要增强类的匹配器,getInterceptors
是返回要增强目标类的拦截器定义对象。
3.5.2 拦截器定义接口
new
的方式构造兰机器定义实现。拦截器定义接口如下:
public interface InterceptorDefinition {
ElementMatcher<MethodDesc> getMatcher();
Interceptor getInterceptor();
}
该接口同样抽象了两个方法:getMatcher
用于获取匹配要增强方法的匹配器,getInterceptor
是用于返回具体的增强逻辑实现,我们称之为拦截器(Interceptor)。
3.5.3 拦截器接口
public interface Interceptor {
void onEnter(ExecutableContext ctx);
void onSuccess(ExecutableContext ctx);
void onError(ExecutableContext ctx);
void onExit(ExecutableContext ctx);}
ExecutableContext
也是非常重要的组成部分,它承载了运行时的上下文信息,并且针对不同的增强目标我们做了不同的实现。如下图所示:
更多详情可查看:https://github.com/jd-opensource/joylive-agent/blob/main/docs/cn/plugin.md
3.6 类加载与类隔离
3.7 面向请求的抽象
请求接口
请求抽象实现
具体框架的请求适配,如Dubbo
DubboOutboundRequest
与DubboInboundRequest
,体现了一个OutboundRequest与InboundRequest的实现与继承关系。整体看起来确实比较复杂。这是因为在请求抽象时,不仅要考虑请求是Inbound还是Outbound,还要适配不同的协议框架。例如,像Dubbo和JSF这样的私有通讯协议框架需要统一为RpcRequest接口的实现,而SpringCloud这样的HTTP通讯协议则统一为HttpRequest。再加上Inbound和Outbound的分类维度,整体的抽象在追求高扩展性的同时也增加了复杂性。
4. 核心功能
下面提供的流量治理功能,以API网关作为东西向流量第一入口进行流量识别染色。在很大程度上,API网关作为东西向流量识别的第一入口发挥了重要作用。API网关在接收到南北向流量后,后续将全部基于东西向流量治理。
4.1 多活模型及流量调度
4.1.1 多活空间
.
└── 多活空间
├── 单元路由变量(*)
├── 单元(*)
│ ├── 分区(*)
├── 单元规则(*)
├── 多活域名(*)
│ ├── 单元子域名(*)
│ ├── 路径(*)
│ │ ├── 业务参数(*)
4.1.2 单元
单元
是逻辑上的概念,一般对应一个地域。常用于异地多活场景,通过用户维度拆分业务和数据,每个单元独立运作,降低单元故障对整体业务的影响。
单元的属性包括单元代码、名称、类型(中心单元或普通单元)、读写权限、标签(如地域和可用区)、以及单元下的分区。
单元分区
是单元的组成部分,单元内的逻辑分区,对应云上的可用区或物理数据中心,属性类似单元。
4.1.3 路由变量
路由变量
是决定流量路由到哪个单元的依据,通常是用户账号。每个变量可以通过不同的取值方式(如Cookie、请求头等)获取,且可以定义转换函数来获取实际用户标识。
变量取值方式
则描述如何从请求参数、请求头或Cookie中获取路由变量。
4.1.4 单元规则
单元规则
定义了单元和分区之间的流量调度规则。根据路由变量计算出的值,通过取模判断流量应路由到哪个单元。它的属性包括多活类型、变量、变量取值方式、计算函数、变量缺失时的操作、以及具体的单元路由规则。
单元路由规则
定义了单元内的流量路由规则,包括允许的路由变量白名单、前缀、值区间等。
分区路由规则
定义了单元内各个分区的流量路由规则,包括允许的变量白名单、前缀及权重。
4.1.5 多活域名
多活域名
描述启用多活的域名,用于在网关层进行流量拦截和路由。支持跨地域和同城多活的流量管理,配置路径规则以匹配请求路径并执行相应的路由规则。
单元子域名
描述各个单元的子域名,通常用于在HTTP请求或回调时闭环在单元内进行路由。
路径规则
定义了根据请求路径匹配的路由规则,取最长匹配路径来选择适用的路由规则。
业务参数规则
基于请求参数值进一步精细化路由,选择特定的单元规则。
4.1.6 模型骨架
以下是多活治理模型的基本配置样例,包括API版本、空间名称、单元、域名、规则、变量等。
[
{
"apiVersion": "apaas.cos.com/v2alpha1",
"kind": "MultiLiveSpace",
"metadata": {
"name": "mls-abcdefg1",
"namespace": "apaas-livespace"
},
"spec": {
"id": "v4bEh4kd6Jvu5QBX09qYq-qlbcs",
"code": "7Jei1Q5nlDbx0dRB4ZKd",
"name": "TestLiveSpace",
"version": "2023120609580935201",
"tenantId": "tenant1",
"units": [
],
"domains": [
],
"unitRules": [
],
"variables": [
]
}
}
]
以上概念会比较晦涩难懂,更多详情可以访问:https://github.com/jd-opensource/joylive-agent/blob/main/docs/cn/livespace.md
4.2 全链路灰度(泳道)
泳道是一种隔离和划分系统中不同服务或组件的方式,类似于游泳池中的泳道划分,确保每个服务或组件在自己的“泳道”中独立运作。泳道概念主要用于以下几种场景:
多租户架构中的隔离
在多租户系统中,泳道通常用于隔离不同租户的资源和服务。每个租户有自己的独立“泳道”,以确保数据和流量的隔离,防止不同租户之间的相互影响。
流量隔离与管理
泳道可以用于根据特定规则(例如用户属性、地理位置、业务特性等)将流量分配到不同的微服务实例或集群中。这种方式允许团队在某些条件下测试新版本、进行蓝绿部署、金丝雀发布等,而不会影响到其他泳道中的流量。
业务逻辑划分
在某些场景下,泳道也可以代表业务逻辑的划分。例如,一个电商平台可能会针对不同的用户群体(如普通用户、VIP用户)提供不同的服务路径和处理逻辑,形成不同的泳道。
版本管理
泳道可以用来管理微服务的不同版本,使得新版本和旧版本可以在不同的泳道中并行运行,从而降低升级时的风险。
开发和测试
在开发和测试过程中,不同的泳道可以用于隔离开发中的新功能、测试环境、甚至是不同开发团队的工作,从而减少互相干扰。
泳道的核心目的是通过隔离服务或资源,提供独立性和灵活性,确保系统的稳定性和可扩展性。这种隔离机制帮助组织更好地管理复杂系统中的多样性,尤其是在处理高并发、多租户、或者需要快速迭代的场景下。功能流程如下图所示:
模型定义及更多详情可以访问:https://github.com/jd-opensource/joylive-agent/blob/main/docs/cn/lane.md
4.3 微服务治理策略
由于篇幅原因,具体的治理策略与模型就不详尽展开介绍了,下图概括了服务治理的全貌。
值得一提有两点:
策略的实现屏蔽了底层框架的差异性,这得益于上面提到的面向请求的抽象。
统一治理层级的划分,多层级的策略挂载框架允许治理策略可以灵活的控制策略生效的影响半径。
统一HTTP和传统RPC的治理策略配置层级具体细节如下:
.
└── 服务
├── 分组*
│ ├── 路径*
│ │ ├── 方法*
服务治理策略放在分组、路径和方法上,可以逐级设置,下级默认继承上级的配置。服务的默认策略设置到默认分组default
上。
模型定义及更多详情可以访问:https://github.com/jd-opensource/joylive-agent/blob/main/docs/cn/governance.md
5. 功能实现示例
5.1 服务注册
5.1.1 服务注册
@Injectable
@Extension(value = "ServiceConfigDefinition_v3", order = PluginDefinition.ORDER_REGISTRY)
@ConditionalOnProperties(value = {
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LIVE_ENABLED, matchIfMissing = true),
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LANE_ENABLED, matchIfMissing = true),
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_FLOW_CONTROL_ENABLED, matchIfMissing = true)
}, relation = ConditionalRelation.OR) @ConditionalOnClass(ServiceConfigDefinition.TYPE_CONSUMER_CONTEXT_FILTER)
@ConditionalOnClass(ServiceConfigDefinition.TYPE_SERVICE_CONFIG)
public class ServiceConfigDefinition extends PluginDefinitionAdapter {
protected static final String TYPE_SERVICE_CONFIG = "org.apache.dubbo.config.ServiceConfig";
private static final String METHOD_BUILD_ATTRIBUTES = "buildAttributes";
private static final String[] ARGUMENT_BUILD_ATTRIBUTES = new String[]{
"org.apache.dubbo.config.ProtocolConfig"
};
// ......
public ServiceConfigDefinition() {
this.matcher = () -> MatcherBuilder.named(TYPE_SERVICE_CONFIG);
this.interceptors = new InterceptorDefinition[]{
new InterceptorDefinitionAdapter(
MatcherBuilder.named(METHOD_BUILD_ATTRIBUTES).
and(MatcherBuilder.arguments(ARGUMENT_BUILD_ATTRIBUTES)),
() -> new ServiceConfigInterceptor(application, policySupplier))
};
}
}
public class ServiceConfigInterceptor extends InterceptorAdaptor {
// ......
@Override
public void onSuccess(ExecutableContext ctx) {
MethodContext methodContext = (MethodContext) ctx;
Map<String, String> map = (Map<String, String>) methodContext.getResult();
application.label(map::putIfAbsent);
// ......
}
}
上面例子所呈现的效果是,当dubbo应用启动时,增强插件拦截dubbo框架org.apache.dubbo.config.ServiceConfig
中的buildAttributes
方法进行增强处理。从ServiceConfigInterceptor
的实现中可以看出,当buildAttributes
方法执行成功后,对该方法的返回的Map
对象继续增加了框架额外的元数据标签。
5.1.2 服务策略订阅
ServiceConfigInterceptor
的增强会发现,在给注册示例打标之后,还有一部分逻辑,如下:
public class ServiceConfigInterceptor extends InterceptorAdaptor {
@Override
public void onSuccess(ExecutableContext ctx) {
MethodContext methodContext = (MethodContext) ctx;
// ......
AbstractInterfaceConfig config = (AbstractInterfaceConfig) ctx.getTarget();
ApplicationConfig application = config.getApplication();
String registerMode = application.getRegisterMode();
if (DEFAULT_REGISTER_MODE_INSTANCE.equals(registerMode)) {
policySupplier.subscribe(application.getName());
} else if (DEFAULT_REGISTER_MODE_INTERFACE.equals(registerMode)) {
policySupplier.subscribe(config.getInterface());
} else {
policySupplier.subscribe(application.getName());
policySupplier.subscribe(config.getInterface());
}
}
}
policySupplier.subscribe
所执行的是策略订阅逻辑。因为策略是支持热更新并实时生效的,策略订阅逻辑便是开启了订阅当前服务在控制台所配置策略的逻辑。
5.2 流量控制
5.2.1 入流量拦截点
入流量拦截也就是拦截Inbound请求,拦截相关框架的入流量处理链的入口或靠前的处理器的相关逻辑并予以增强。下面以Dubbo3的拦截点为例。
@Injectable
@Extension(value = "ClassLoaderFilterDefinition_v3")
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LIVE_ENABLED, matchIfMissing = true)
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_LIVE_DUBBO_ENABLED, matchIfMissing = true)
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_REGISTRY_ENABLED, matchIfMissing = true)
@ConditionalOnProperty(value = GovernanceConfig.CONFIG_TRANSMISSION_ENABLED, matchIfMissing = true)
@ConditionalOnClass(ClassLoaderFilterDefinition.TYPE_CLASSLOADER_FILTER)
public class ClassLoaderFilterDefinition extends PluginDefinitionAdapter {
protected static final String TYPE_CLASSLOADER_FILTER = "org.apache.dubbo.rpc.filter.ClassLoaderFilter";
private static final String METHOD_INVOKE = "invoke";
protected static final String[] ARGUMENT_INVOKE = new String[]{
"org.apache.dubbo.rpc.Invoker",
"org.apache.dubbo.rpc.Invocation"
};
// ......
public ClassLoaderFilterDefinition() {
this.matcher = () -> MatcherBuilder.named(TYPE_CLASSLOADER_FILTER);
this.interceptors = new InterceptorDefinition[]{
new InterceptorDefinitionAdapter(
MatcherBuilder.named(METHOD_INVOKE).
and(MatcherBuilder.arguments(ARGUMENT_INVOKE)),
() -> new ClassLoaderFilterInterceptor(context)
)
};
}
}
public class ClassLoaderFilterInterceptor extends InterceptorAdaptor {
private final InvocationContext context;
public ClassLoaderFilterInterceptor(InvocationContext context) {
this.context = context;
}
@Override
public void onEnter(ExecutableContext ctx) {
MethodContext mc = (MethodContext) ctx;
Object[] arguments = mc.getArguments();
Invocation invocation = (Invocation) arguments[1];
try {
context.inbound(new DubboInboundInvocation(new DubboInboundRequest(invocation), context));
} catch (RejectException e) {
Result result = new AppResponse(new RpcException(RpcException.FORBIDDEN_EXCEPTION, e.getMessage()));
mc.setResult(result);
mc.setSkip(true);
}
}
}
public interface InvocationContext {
// ......
default <R extends InboundRequest> void inbound(InboundInvocation<R> invocation) {
InboundFilterChain.Chain chain = new InboundFilterChain.Chain(getInboundFilters());
chain.filter(invocation);
}
}
org.apache.dubbo.rpc.filter.ClassLoaderFilter
的invoke
方法作为拦截点,织入我们统一的InboundFilterChain
对象作为入流量处理链。我们可以根据需求实现不同的InboundFilter即可,我们内置了一部分实现。如下所示:
5.2.2 出流量拦截点
出流量拦截也就是拦截Outbound请求,拦截相关框架的出流量处理链的入口并予以增强。下面以Dubbo2的拦截点为例。
如果只开启了多活或泳道治理,则只需对后端实例进行过滤,可以拦截负载均衡或服务实例提供者相关方法
-
@Injectable@Extension(value = "LoadBalanceDefinition_v2.7")@ConditionalOnProperties(value = { @ConditionalOnProperty(name = { GovernanceConfig.CONFIG_LIVE_ENABLED, GovernanceConfig.CONFIG_LANE_ENABLED }, matchIfMissing = true, relation = ConditionalRelation.OR), @ConditionalOnProperty(name = GovernanceConfig.CONFIG_FLOW_CONTROL_ENABLED, value = "false"), @ConditionalOnProperty(name = GovernanceConfig.CONFIG_LIVE_DUBBO_ENABLED, matchIfMissing = true)}, relation = ConditionalRelation.AND)@ConditionalOnClass(LoadBalanceDefinition.TYPE_ABSTRACT_CLUSTER)@ConditionalOnClass(ClassLoaderFilterDefinition.TYPE_CONSUMER_CLASSLOADER_FILTER)public class LoadBalanceDefinition extends PluginDefinitionAdapter { protected static final String TYPE_ABSTRACT_CLUSTER = "com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker"; private static final String METHOD_SELECT = "select"; private static final String[] ARGUMENT_SELECT = new String[]{ "org.apache.dubbo.rpc.cluster.LoadBalance", "org.apache.dubbo.rpc.Invocation", "java.util.List", "java.util.List" }; // ...... public LoadBalanceDefinition() { this.matcher = () -> MatcherBuilder.isSubTypeOf(TYPE_ABSTRACT_CLUSTER) .and(MatcherBuilder.not(MatcherBuilder.isAbstract())); this.interceptors = new InterceptorDefinition[]{ new InterceptorDefinitionAdapter( MatcherBuilder.named(METHOD_SELECT) .and(MatcherBuilder.arguments(ARGUMENT_SELECT)), () -> new LoadBalanceInterceptor(context) ) }; }}
拦截器里面调用上下文的路由方法
public class LoadBalanceInterceptor extends InterceptorAdaptor {
// ......
@Override
public void onEnter(ExecutableContext ctx) {
MethodContext mc = (MethodContext) ctx;
Object[] arguments = ctx.getArguments();
List<Invoker<?>> invokers = (List<Invoker<?>>) arguments[2];
List<Invoker<?>> invoked = (List<Invoker<?>>) arguments[3];
DubboOutboundRequest request = new DubboOutboundRequest((Invocation) arguments[1]);
DubboOutboundInvocation invocation = new DubboOutboundInvocation(request, context);
DubboCluster3 cluster = clusters.computeIfAbsent((AbstractClusterInvoker<?>) ctx.getTarget(), DubboCluster3::new);
try {
List<DubboEndpoint<?>> instances = invokers.stream().map(DubboEndpoint::of).collect(Collectors.toList());
if (invoked != null) {
invoked.forEach(p -> request.addAttempt(new DubboEndpoint<>(p).getId()));
}
List<? extends Endpoint> endpoints = context.route(invocation, instances);
if (endpoints != null && !endpoints.isEmpty()) {
mc.setResult(((DubboEndpoint<?>) endpoints.get(0)).getInvoker());
} else {
mc.setThrowable(cluster.createNoProviderException(request));
}
} catch (RejectException e) {
mc.setThrowable(cluster.createRejectException(e, request));
}
mc.setSkip(true);
}
}
如果开启了微服务治理,则设计到重试,需要对集群调用进行拦截
@Injectable
@Extension(value = "ClusterDefinition_v2.7")
@ConditionalOnProperty(name = GovernanceConfig.CONFIG_FLOW_CONTROL_ENABLED, matchIfMissing = true)
@ConditionalOnProperty(name = GovernanceConfig.CONFIG_LIVE_DUBBO_ENABLED, matchIfMissing = true)
@ConditionalOnClass(ClusterDefinition.TYPE_ABSTRACT_CLUSTER)
@ConditionalOnClass(ClassLoaderFilterDefinition.TYPE_CONSUMER_CLASSLOADER_FILTER)
public class ClusterDefinition extends PluginDefinitionAdapter {
protected static final String TYPE_ABSTRACT_CLUSTER = "org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker";
private static final String METHOD_DO_INVOKE = "doInvoke";
private static final String[] ARGUMENT_DO_INVOKE = new String[]{
"org.apache.dubbo.rpc.Invocation",
"java.util.List",
"org.apache.dubbo.rpc.cluster.LoadBalance"
};
// ......
public ClusterDefinition() {
this.matcher = () -> MatcherBuilder.isSubTypeOf(TYPE_ABSTRACT_CLUSTER)
.and(MatcherBuilder.not(MatcherBuilder.isAbstract()));
this.interceptors = new InterceptorDefinition[]{
new InterceptorDefinitionAdapter(
MatcherBuilder.named(METHOD_DO_INVOKE)
.and(MatcherBuilder.arguments(ARGUMENT_DO_INVOKE)),
() -> new ClusterInterceptor(context)
)
};
}
}
拦截器里面构造集群对象进行同步或异步调用
public class ClusterInterceptor extends InterceptorAdaptor {
// ......
@Override
public void onEnter(ExecutableContext ctx) {
MethodContext mc = (MethodContext) ctx;
Object[] arguments = ctx.getArguments();
DubboCluster3 cluster = clusters.computeIfAbsent((AbstractClusterInvoker<?>) ctx.getTarget(), DubboCluster3::new);
List<Invoker<?>> invokers = (List<Invoker<?>>) arguments[1];
List<DubboEndpoint<?>> instances = invokers.stream().map(DubboEndpoint::of).collect(Collectors.toList());
DubboOutboundRequest request = new DubboOutboundRequest((Invocation) arguments[0]);
DubboOutboundInvocation invocation = new DubboOutboundInvocation(request, context);
DubboOutboundResponse response = cluster.request(context, invocation, instances);
if (response.getThrowable() != null) {
mc.setThrowable(response.getThrowable());
} else {
mc.setResult(response.getResponse());
}
mc.setSkip(true);
}
}
同样,出流量拦截也是采用了责任链模式设计了OutboundFilterChain
,用户可以根据自己的需求扩展实现OutboundFilter
,目前针对已支持功能内置了部分实现,如下所示:
6. 部署实践
6.1 基于Kubernates实践场景
joylive-injector
是针对K8s场景打造的自动注入组件。joylive-injector
是基于kubernetes
的动态准入控制webhook,它可以用于修改kubernete
资源。它会监视工作负载(如deployments
)的CREATE、UPDATE、DELETE事件和pods
的CREATE事件,并为POD
添加initContainer
、默认增加环境变量JAVA_TOOL_OPTIONS
、挂载configmap
、修改主容器的卷装载等操作。目前已支持的特性如下:
支持自动将
joylive-agent
注入应用的Pod。支持多版本
joylive-agent
与对应配置管理。支持注入指定版本
joylive-agent
及对应配置。
joylive-agent
变得非常简单,在安装joylive-injector
组件后,只需要在对应的deployment
文件中加入标签x-live-enabled: "true"
即可,如下所示:
apiVersion: apps/v1kind: Deploymentmetadata:
labels:
app: joylive-demo-springcloud2021-provider x-live-enabled: "true"
name: joylive-demo-springcloud2021-providerspec:
replicas: 1
selector:
matchLabels:
app: joylive-demo-springcloud2021-provider template:
metadata:
labels:
app: joylive-demo-springcloud2021-provider x-live-enabled: "true"
spec:
containers:
- env:
- name: CONFIG_LIVE_SPACE_API_TYPE value: multilive - name: CONFIG_LIVE_SPACE_API_URL value: http://api.live.local/v1 - name: CONFIG_LIVE_SPACE_API_HEADERS value: pin=demo - name: CONFIG_SERVICE_API_TYPE value: jmsf - name: CONFIG_SERVICE_API_URL value: http://api.jmsf.local/v1 - name: LIVE_LOG_LEVEL value: info - name: CONFIG_LANE_ENABLED value: "false"
- name: NACOS_ADDR value: nacos-server.nacos.svc:8848
- name: NACOS_USERNAME value: nacos - name: NACOS_PASSWORD value: nacos - name: APPLICATION_NAME value: springcloud2021-provider - name: APPLICATION_SERVICE_NAME value: service-provider - name: APPLICATION_SERVICE_NAMESPACE value: default - name: SERVER_PORT value: "18081"
- name: APPLICATION_LOCATION_REGION value: region1 - name: APPLICATION_LOCATION_ZONE value: zone1 - name: APPLICATION_LOCATION_LIVESPACE_ID value: v4bEh4kd6Jvu5QBX09qYq-qlbcs - name: APPLICATION_LOCATION_UNIT value: unit1 - name: APPLICATION_LOCATION_CELL value: cell1 - name: APPLICATION_LOCATION_LANESPACE_ID value: "1"
- name: APPLICATION_LOCATION_LANE value: production image: hub-vpc.jdcloud.com/jmsf/joylive-demo-springcloud2021-provider:1.1.0-5aab82b3-AMD64 imagePullPolicy: Always name: joylive-demo-springcloud2021-provider ports:
- containerPort: 18081
name: http protocol: TCP resources:
requests:
cpu: "4"
memory: "8Gi"
limits:
cpu: "4"
memory: "8Gi"
terminationMessagePath: /dev/termination-log terminationMessagePolicy: File dnsPolicy: ClusterFirst restartPolicy: Always schedulerName: default-scheduler securityContext: { }
terminationGracePeriodSeconds: 30
启动后Pod如下图所示即代表注入成功,随后观察应用日志及功能测试即可。
扫一扫,加入技术交流群