record of kmesh project
这个文件主要是对kmesh的源码进行分析和解读。为了避免kmesh.md文件过于冗长和复杂,特将源码分析单独成篇。
文件的核心在于cgroup_connect4和cgroup_connect6两个eBPF程序的入口函数,分别用于处理IPv4和IPv6的连接请求。下面详细解读这两个函数的实现。
SEC("cgroup/connect4") // eBPF 程序挂载点,表示该程序用于处理 IPv4 连接请求
int cgroup_connect4_prog(struct bpf_sock_addr *ctx)
{
struct kmesh_context kmesh_ctx = {0}; // 声明并清零一个 Kmesh 自定义的上下文对象,用来在各个步骤中传递和保存信息。
kmesh_ctx.ctx = ctx;
// 原始目的 IPv4 地址和重定向后的地址,这里先用用户请求的地址进行初始化,后续可能改写
kmesh_ctx.orig_dst_addr.ip4 = ctx->user_ip4;
kmesh_ctx.dnat_ip.ip4 = ctx->user_ip4;
kmesh_ctx.dnat_port = ctx->user_port;
// 如果不是被kmesh管理的进程,或者kmesh未启用,则直接返回,不做任何处理
if (handle_kmesh_manage_process(&kmesh_ctx) || !is_kmesh_enabled(ctx)) {
return CGROUP_SOCK_OK;
}
// TODO: should we always try to hijack 53 dns traffic?
if (ctx->user_port == bpf_htons(53)) { // 如果是 DNS 请求,进行特殊处理
backend_key backend_k = {0};
backend_value *backend_v = map_lookup_backend(&backend_k); // 声明一个后端查询 key,清零。通常这是一个“全局”键,用来取 DNS 代理配置。
// dns proxy is not enabled
if (!backend_v) { // 如果没有配置 DNS 代理,就不做劫持,直接放行。
return CGROUP_SOCK_OK;
}
kmesh_ctx.dnat_ip.ip4 = backend_v->addr.ip4; // 将 DNAT 目标 IP 设置为 DNS 代理 IP
if (set_original_dst_info(&kmesh_ctx)) { // 把“原始目的 (IP, 端口)”写入 per-socket storage(只写一次)
BPF_LOG(ERR, KMESH, "[IPv4]failed to set original destination info");
return CGROUP_SOCK_OK;
}
ctx->user_ip4 = backend_v->addr.ip4;
return CGROUP_SOCK_OK;
}
if (ctx->protocol != IPPROTO_TCP)
return CGROUP_SOCK_OK;
observe_on_pre_connect(ctx->sk); // 这里看起来是记录了一下请求连接的时间戳
int ret = sock_traffic_control(&kmesh_ctx);
if (ret) {
return CGROUP_SOCK_OK;
}
ret = set_original_dst_info(&kmesh_ctx); // 核心函数
if (ret) {
BPF_LOG(ERR, KMESH, "[IPv4]failed to set original destination info, ret is %d\n", ret);
return CGROUP_SOCK_OK;
}
SET_CTX_ADDRESS4(ctx, &kmesh_ctx.dnat_ip, kmesh_ctx.dnat_port);
if (kmesh_ctx.via_waypoint) {
kmesh_workload_tail_call(ctx, TAIL_CALL_CONNECT4_INDEX);
// if tail call failed will run this code
BPF_LOG(ERR, KMESH, "workload tail call failed, err is %d\n", ret);
}
return CGROUP_SOCK_OK;
}
static inline int sock_traffic_control(struct kmesh_context *kmesh_ctx)
{
observe_on_operation_start(SOCK_TRAFFIC_CONTROL, kmesh_ctx); // 观测埋点:标记“流量控制”这个操作的开始,用于性能监控和追踪。
int ret;
frontend_value *frontend_v = NULL;
frontend_key frontend_k = {0};
struct bpf_sock_addr *ctx = kmesh_ctx->ctx;
struct ip_addr orig_dst_addr = {0};
if (ctx->family == AF_INET) {
orig_dst_addr.ip4 = kmesh_ctx->orig_dst_addr.ip4;
frontend_k.addr.ip4 = orig_dst_addr.ip4; // 将 Kmesh 上下文中保存的原始目的 IPv4 地址,拷贝
} else if (ctx->family == AF_INET6) {
IP6_COPY(orig_dst_addr.ip6, kmesh_ctx->orig_dst_addr.ip6);
// Java applications use IPv6 for communication. In the IPv4 network environment, the control plane delivers the
// IPv4 address to the bpf map but obtains the IPv4 mapped address from the bpf prog context. Therefore, address
// translation is required before and after traffic manager.
if (is_ipv4_mapped_addr(orig_dst_addr.ip6))
V4_MAPPED_REVERSE(orig_dst_addr.ip6);
bpf_memcpy(frontend_k.addr.ip6, orig_dst_addr.ip6, IPV6_ADDR_LEN);
}
BPF_LOG(
DEBUG,
KMESH,
"origin dst addr=[%u:%s:%u]\n",
ctx->family,
ip2str((__u32 *)&kmesh_ctx->orig_dst_addr, (ctx->family == AF_INET)),
bpf_ntohs(ctx->user_port));
frontend_v = map_lookup_frontend(&frontend_k); // 根据原始的目的地址查找service id 或者 backend id(pod)
if (!frontend_v) {
return -ENOENT;
}
ret = frontend_manager(kmesh_ctx, frontend_v); // 核心函数,接下去会详细解读
if (ret != 0) {
if (ret != -ENOENT)
BPF_LOG(ERR, KMESH, "frontend_manager failed, ret:%d\n", ret);
return ret;
}
observe_on_operation_end(SOCK_TRAFFIC_CONTROL, kmesh_ctx);
return 0;
}
static inline int frontend_manager(struct kmesh_context *kmesh_ctx, frontend_value *frontend_v)
{
int ret = 0;
service_key service_k = {0};
service_value *service_v = NULL;
backend_key backend_k = {0};
backend_value *backend_v = NULL;
bool direct_backend = false;
service_k.service_id = frontend_v->upstream_id;
service_v = map_lookup_service(&service_k); // 假设 upstream_id 是一个 service_id,尝试在 service map 中查找对应的服务
if (!service_v) { // upstream_id 可能不是一个 service_id
backend_k.backend_uid = frontend_v->upstream_id;
backend_v = map_lookup_backend(&backend_k); // 尝试在 backend map 中查找对应的后端
if (!backend_v) {
BPF_LOG(WARN, FRONTEND, "find backend by backend_uid %d failed\n", backend_k.backend_uid);
return -ENOENT;
}
direct_backend = true; // pod 直连,没有 service
}
if (direct_backend) { // in this case, we donot check the healthy status of the backend, just let it go
// For pod direct access, if a pod has waypoint captured, we will redirect to waypoint, otherwise we do nothing.
// 这个分支的核心逻辑是判断“直连 Pod”的流量是否需要被其关联的 Waypoint 代理捕获
if (backend_v->waypoint_port != 0) {
BPF_LOG(
DEBUG,
FRONTEND,
"find waypoint addr=[%s:%u]\n",
ip2str((__u32 *)&backend_v->wp_addr, kmesh_ctx->ctx->family == AF_INET),
bpf_ntohs(backend_v->waypoint_port));
ret = waypoint_manager(kmesh_ctx, &backend_v->wp_addr, backend_v->waypoint_port); // 核心函数,后续会详细解读
if (ret != 0) {
BPF_LOG(ERR, BACKEND, "waypoint_manager failed, ret:%d\n", ret);
}
return ret;
}
} else {
// 这个分支处理的是 upstream_id 解析为 Service 的情况
ret = service_manager(kmesh_ctx, frontend_v->upstream_id, service_v); // 核心函数,后续会详细解读
if (ret != 0) {
if (ret != -ENOENT)
BPF_LOG(ERR, FRONTEND, "service_manager failed, ret:%d\n", ret);
return ret;
}
}
return 0;
}
static inline int service_manager(struct kmesh_context *kmesh_ctx, __u32 service_id, service_value *service_v)
{
int ret = 0;
if (service_v->wp_addr.ip4 != 0 && service_v->waypoint_port != 0) { // 判断 service_v 中是否配置了有效的 Waypoint 地址和端口
BPF_LOG(
DEBUG,
SERVICE,
"find waypoint addr=[%s:%u]\n",
ip2str((__u32 *)&service_v->wp_addr, kmesh_ctx->ctx->family == AF_INET),
bpf_ntohs(service_v->waypoint_port));
ret = waypoint_manager(kmesh_ctx, &service_v->wp_addr, service_v->waypoint_port);
if (ret != 0) {
BPF_LOG(ERR, BACKEND, "waypoint_manager failed, ret:%d\n", ret);
}
return ret;
}
BPF_LOG(DEBUG, SERVICE, "service [%u] lb policy [%u]", service_id, service_v->lb_policy);
// 执行负载均衡
switch (service_v->lb_policy) {
case LB_POLICY_RANDOM: // 随机策略
ret = lb_random_handle(kmesh_ctx, service_id, service_v); // 在负载均衡函数中,会根据 service_id 查找 endpoint map,得到backend uid,然后再查找 backend map,得到 backend value
break;
case LB_POLICY_STRICT: // 严格本地性策略
ret = lb_locality_strict_handle(kmesh_ctx, service_id, service_v);
break;
case LB_POLICY_FAILOVER: // 本地性故障转移策略
ret = lb_locality_failover_handle(kmesh_ctx, service_id, service_v);
break;
default:
BPF_LOG(ERR, SERVICE, "unsupported load balance type:%u\n", service_v->lb_policy);
ret = -EINVAL;
break;
}
return ret;
}
三种负载均衡策略
case LB_POLICY_RANDOM: 随机策略。 ret = lb_random_handle(kmesh_ctx, service_id, service_v); 调用 lb_random_handle。该函数会从服务的最高优先级(prio 0)的后端端点列表中,随机选择一个。 case LB_POLICY_STRICT:
严格本地性策略 (Locality Strict)。 ret = lb_locality_strict_handle(kmesh_ctx, service_id, service_v); 调用 lb_locality_strict_handle。该函数会尝试从最高优先级的端点中随机选择一个。关键在于“严格”:如果最高优先级的端点都不可用,它不会尝试其他优先级的端点,而是直接失败。 case LB_POLICY_FAILOVER:
本地性故障转移策略 (Locality Failover)。 ret = lb_locality_failover_handle(kmesh_ctx, service_id, service_v); 调用 lb_locality_failover_handle。该函数会按优先级从高到低遍历端点组。它会在第一个找到的、拥有可用端点的优先级组里,随机选择一个端点。这实现了当高优先级区域(如本地)的端点不可用时,自动将流量切换到次高优先级区域(如其他可用区)的端点。
接下去是waypoint_manager函数的解读:
static inline int waypoint_manager(struct kmesh_context *kmesh_ctx, struct ip_addr *wp_addr, __u32 port)
{
ctx_buff_t *ctx = (ctx_buff_t *)kmesh_ctx->ctx;
if (ctx->user_family == AF_INET)
kmesh_ctx->dnat_ip.ip4 = wp_addr->ip4; // 如果是 IPv4,就将 kmesh_ctx 中用于记录 DNAT 目标的 dnat_ip 字段设置为 Waypoint 的 IPv4 地址
else
bpf_memcpy(kmesh_ctx->dnat_ip.ip6, wp_addr->ip6, IPV6_ADDR_LEN); // 使用 bpf_memcpy 将 Waypoint 的 IPv6 地址完整地拷贝到 kmesh_ctx 的 dnat_ip 字段中
kmesh_ctx->dnat_port = port; // 将 kmesh_ctx 中用于记录 DNAT 目标端口的 dnat_port 字段设置为 Waypoint 的端口
kmesh_ctx->via_waypoint = true;
return 0;
}
graph TD
subgraph "入口与前端决策"
A[frontend_manager]
end
subgraph "服务与负载均衡"
B[service_manager]
C[lb_*_handle]
end
subgraph "端点与后端处理"
D[endpoint_manager]
E[backend_manager]
end
subgraph "最终目标设置"
F[waypoint_manager]
G[svc_dnat]
end
A --"upstream_id 是 Service"--> B
A --"upstream_id 是直连 Pod (且有 Waypoint)"--> F
B --"Service 有 Waypoint"--> F
B --"Service 无 Waypoint\n(执行负载均衡)"--> C
C --"根据 LB 策略选择 Endpoint"--> D
D --"处理选定的 Endpoint"--> E
E --"后端 Pod 有 Waypoint"--> F
E --"后端 Pod 无 Waypoint (执行端口映射)"--> G
classDef entry fill:#e6f2ff,stroke:#b3d9ff,stroke-width:2px;
classDef service fill:#e6ffe6,stroke:#b3ffb3,stroke-width:2px;
classDef endpoint fill:#fff2e6,stroke:#ffdab3,stroke-width:2px;
classDef final fill:#f2e6ff,stroke:#d9b3ff,stroke-width:2px;
class A entry;
class B,C service;
class D,E endpoint;
class F,G final;
该文件挂载的eBPF程序是 sendmsg_prog
这个 eBPF 程序挂载在 sk_msg 钩子上,它会在套接字(socket)发送消息(如 send(), write() 等系统调用)时被触发。它的核心作用是:当流量被重定向到 Waypoint 代理时,在发送的原始数据包前面,额外插入一段元数据(Metadata),告诉 Waypoint 这个连接的“真正”目的地是哪里。
发给waypoint的数据由于目的地址和端口被修改为waypoint的地址和端口,所以需要通过这种方式告诉waypoint真正的目的地址和端口,否则waypoint无法将数据转发到正确的后端。
TLV 编码格式 文件注释详细说明了元数据的编码格式——TLV (Type-Length-Value)。
文件中定义了两种 TLV 类型:
例如,对于一个 IPv4 连接,插入的元数据会是: | 0x01 | 0x00000006 | 1.2.3.4 | 8080 | 0xfe | 0x00000000 |
SEC("sk_msg")
int sendmsg_prog(struct sk_msg_md *msg)
{
__u32 off = 0;
if (msg->family != AF_INET && msg->family != AF_INET6)
return SK_PASS;
// encode org dst addr
encode_metadata_org_dst_addr(msg, &off, (msg->family == AF_INET));
return SK_PASS;
}
核心调用:调用 encode_metadata_org_dst_addr 函数来执行元数据的编码和插入。 msg: 传入 sk_msg 上下文。 &off: 传入偏移量指针,函数内部会更新它。 (msg->family == AF_INET): 传入一个布尔值,告诉函数当前是 IPv4 (true) 还是 IPv6 (false),以便它能正确处理地址长度。
static inline void encode_metadata_org_dst_addr(struct sk_msg_md *msg, __u32 *off, bool v4)
{
struct ip_addr dst_ip = {0};
__u16 dst_port;
__u8 type = TLV_ORG_DST_ADDR;
__u32 tlv_size = (v4 ? TLV_ORG_DST_ADDR4_SIZE : TLV_ORG_DST_ADDR6_SIZE);
__u32 addr_size = (v4 ? TLV_ORG_DST_ADDR4_LENGTH : TLV_ORG_DST_ADDR6_LENGTH);
if (get_origin_dst(msg, &dst_ip, &dst_port))
return;
if (alloc_dst_length(msg, tlv_size + TLV_END_SIZE))
return;
BPF_LOG(DEBUG, SENDMSG, "get valid dst, do encoding...\n");
// write T
SK_MSG_WRITE_BUF(msg, off, &type, TLV_TYPE_SIZE);
// write L
addr_size = bpf_htonl(addr_size);
SK_MSG_WRITE_BUF(msg, off, &addr_size, TLV_LENGTH_SIZE);
// write V
if (v4)
SK_MSG_WRITE_BUF(msg, off, (__u8 *)&dst_ip.ip4, TLV_IP4_LENGTH);
else
SK_MSG_WRITE_BUF(msg, off, (__u8 *)dst_ip.ip6, TLV_IP6_LENGTH);
SK_MSG_WRITE_BUF(msg, off, &dst_port, TLV_PORT_LENGTH);
// write END
encode_metadata_end(msg, off);
return;
}
get_origin_dst(msg, &dst_ip, &dst_port)
这是决策点。它会去查询当前 socket 的 sock_storage。
如果 !storage (没有存储), !storage->via_waypoint (不是去往 Waypoint 的连接), 或者 storage->has_encoded (已经编码过了),则直接返回 -ENOENT。
只有当连接是明确发往 Waypoint 且尚未编码过元数据时,它才会成功返回 0,并填充 dst_ip 和 dst_port。
成功后,它会设置 storage->has_encoded = true;,防止对同一个 socket 的后续消息重复编码。
如果此函数失败,encode_metadata_org_dst_addr 会直接 return,不执行任何操作。
alloc_dst_length(msg, ...)
如果 get_origin_dst 成功,就调用此函数。
它使用 bpf_msg_push_data(msg, 0, length, 0) 这个 BPF aPI,在消息数据的最前面(偏移量为 0)“推入”一段空间,用于存放我们的 TLV 元数据。
如果分配空间失败,函数会返回,编码中止。
构造 TLV 使用 SK_MSG_WRITE_BUF 宏,按顺序将 Type、Length、Value (IP 和 Port) 写入刚刚分配的空间。 bpf_htonl(addr_size): 注意 Length 字段在写入时被转换成了网络字节序。
encode_metadata_end(msg, &off)
最后,调用此函数写入元数据结束标记(Type=0xfe, Length=0)。
static inline int get_origin_dst(struct sk_msg_md *msg, struct ip_addr *dst_ip, __u16 *dst_port)
{
struct bpf_sock *sk = (struct bpf_sock *)msg->sk;
struct sock_storage_data *storage = NULL;
storage = bpf_sk_storage_get(&map_of_sock_storage, sk, 0, 0);
if (!storage || !storage->via_waypoint || storage->has_encoded) {
return -ENOENT;
}
if (msg->family == AF_INET) {
dst_ip->ip4 = storage->sk_tuple.ipv4.daddr;
*dst_port = storage->sk_tuple.ipv4.dport;
} else {
bpf_memcpy(dst_ip->ip6, storage->sk_tuple.ipv6.daddr, IPV6_ADDR_LEN);
*dst_port = storage->sk_tuple.ipv6.dport;
}
storage->has_encoded = true;
return 0;
}
static inline int alloc_dst_length(struct sk_msg_md *msg, __u32 length)
{
int ret;
ret = bpf_msg_push_data(msg, 0, length, 0);
if (ret) {
BPF_LOG(ERR, SENDMSG, "failed to alloc memory for msg, length is %d\n", length);
return 1;
}
return 0;
}
该文件挂载的eBPF程序是 sockops_prog, 该程序挂载在 sock_ops 上。
SEC("sockops")
int sockops_prog(struct bpf_sock_ops *skops)
{
if (skops->family != AF_INET && skops->family != AF_INET6)
return 0;
switch (skops->op) {
case BPF_SOCK_OPS_TCP_CONNECT_CB: // 这部分应该是在发送SYN之前,在 cgroup_connect4/6 之后触发
skops_handle_kmesh_managed_process(skops);
break;
case BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB: // 在发送数据的时候触发
if (!is_managed_by_kmesh(skops))
break;
observe_on_connect_established(skops->sk, OUTBOUND);
if (bpf_sock_ops_cb_flags_set(skops, BPF_SOCK_OPS_STATE_CB_FLAG) != 0)
BPF_LOG(ERR, SOCKOPS, "set sockops cb failed!\n");
struct bpf_sock *sk = (struct bpf_sock *)skops->sk;
if (!sk) {
break;
}
struct sock_storage_data *storage = NULL;
storage = bpf_sk_storage_get(&map_of_sock_storage, sk, 0, 0);
if (!storage) {
break;
}
if (storage->via_waypoint) {
enable_encoding_metadata(skops);
}
break;
case BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB:
if (!is_managed_by_kmesh(skops) || skip_specific_probe(skops))
break;
observe_on_connect_established(skops->sk, INBOUND);
if (bpf_sock_ops_cb_flags_set(skops, BPF_SOCK_OPS_STATE_CB_FLAG) != 0)
BPF_LOG(ERR, SOCKOPS, "set sockops cb failed!\n");
auth_ip_tuple(skops);
break;
case BPF_SOCK_OPS_STATE_CB:
if (skops->args[1] == BPF_TCP_CLOSE) {
observe_on_close(skops->sk);
clean_auth_map(skops);
}
break;
default:
break;
}
return 0;
}
static inline void skops_handle_kmesh_managed_process(struct bpf_sock_ops *skops)
{
// 通过判断是否访问的是特定的端口和地址,来决定是否将该连接标记为由 kmesh 管理和删除
// 这里的连接发起应该是由cni插件发起的,在创建和删除pod的过程中触发,这样kmesh就能记录和管理pod的ip地址,从而知道哪些ip是kmesh管理的
if (skops_conn_from_cni_sim_add(skops))
record_kmesh_managed_ip(skops->family, skops->local_ip4, skops->local_ip6);
if (skops_conn_from_cni_sim_delete(skops))
remove_kmesh_managed_ip(skops->family, skops->local_ip4, skops->local_ip6);
}
BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB 连接建立完成时触发,用于启用waypoint的元数据编码,即在 sendmsg.c 中的 eBPF 程序 sendmsg_prog 会插入元数据。这里具体怎么启用的,目前还没有搞清楚。
BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB 服务端这边新连接三次握手刚完成、子 socket 建好”的那个瞬间触发,用来做入向连接的观测、订阅后续状态回调、以及把连接元组送去做认证。
BPF_SOCK_OPS_STATE_CB 连接状态变化时触发,这里只关心连接关闭事件(BPF_TCP_CLOSE),用于做连接关闭的观测和清理认证信息。
该文件挂载的eBPF程序是 cgroup_skb_ingress_prog 和 cgroup_skb_egress_prog, 主要是用于观测。
cgroup/skb 钩子简介
SEC("cgroup_skb/ingress")
int cgroup_skb_ingress_prog(struct __sk_buff *skb)
{
if (!is_monitoring_enable() || !is__periodic_report_enable()) { // 只有当“监控功能”和“周期性报告”同时启用时,才会继续执行
return SK_PASS;
}
if (skb->family != AF_INET && skb->family != AF_INET6)
return SK_PASS;
struct bpf_sock *sk = skb->sk;
if (!sk)
return SK_PASS;
if (sock_conn_from_sim(skb)) { // 用于判断是否是指向特定ip和端口的流量,之前的内容中又提到这个特殊的IP和端口是cni插件用来创建和删除pod的时候发起的
return SK_PASS;
}
if (!is_managed_by_kmesh_skb(skb)) // 判断 skb 中的 IP 地址是否属于 kmesh 管理的范围,查找 map_of_manager
return SK_PASS;
observe_on_data(sk);
return SK_PASS;
}
SEC("cgroup_skb/egress")
int cgroup_skb_egress_prog(struct __sk_buff *skb)
{
if (!is_monitoring_enable() || !is__periodic_report_enable()) {
return SK_PASS;
}
if (skb->family != AF_INET && skb->family != AF_INET6)
return SK_PASS;
struct bpf_sock *sk = skb->sk;
if (!sk)
return SK_PASS;
if (sock_conn_from_sim(skb)) {
return SK_PASS;
}
if (!is_managed_by_kmesh_skb(skb))
return SK_PASS;
observe_on_data(sk); // 核心功能
return SK_PASS;
}
static inline void observe_on_data(struct bpf_sock *sk)
{
struct bpf_tcp_sock *tcp_sock = NULL;
struct sock_storage_data *storage = NULL;
if (!sk)
return;
tcp_sock = bpf_tcp_sock(sk);
if (!tcp_sock)
return;
storage = bpf_sk_storage_get(&map_of_sock_storage, sk, 0, 0);
if (!storage) {
// maybe the connection is established before kmesh start
return;
}
__u64 now = bpf_ktime_get_ns();
if ((storage->last_report_ns != 0) && (now - storage->last_report_ns > LONG_CONN_THRESHOLD_TIME)) {
tcp_report(sk, tcp_sock, storage, BPF_TCP_ESTABLISHED);
}
}
bpf_tcp_sock *tcp_sock 中读取 TCP 协议栈的各种底层指标,例如:
bytes_sent: 已发送的总字节数。bytes_received: 已接收的总字节数。segs_in: 入向的 TCP 段(包)数量。segs_out: 出向的 TCP 段(包)数量。srtt_us: 平滑往返时间(Smoothed Round-Trip Time)。rtt_min: 最小往返时间。struct sock_storage_data *storage 中读取 Kmesh 自己记录的上下文信息,如 direction、connect_ns 等。tcp_report 函数会将上述读取到的指标填充到一个 struct tcp_conn_info 结构体中。
然后,它会调用 bpf_ringbuf_output,将这个 tcp_conn_info 结构体作为一个事件,发送到用户态。
最关键的是,在发送完报告后,tcp_report 会更新 storage->last_report_ns = bpf_ktime_get_ns();。它将 storage 中的“上次报告时间”重置为当前时间。该文件夹下的文件主要用于同节点下的通信加速。mda的意思是mesh data accelerate。oncn的意思可能是是on cluster node或者是Open Cloud Native。
该文件挂载的eBPF程序是 SOCK_REDIRECT_NAME, 该程序挂载在 sk_msg 上。用于对本机内的socket间通过新进行加速,跳过内核网络协议栈。直接发送至socket接收队列。
SEC("sk_msg")
int SOCK_REDIRECT_NAME(struct sk_msg_md *const msg)
{
struct sock_key key = {0};
struct sock_key *redir_key = NULL;
long ret = 0;
key.sip4 = msg->local_ip4;
key.dip4 = msg->remote_ip4;
key.sport = (bpf_htonl(msg->local_port) >> 16);
key.dport = (force_read(msg->remote_port) >> 16);
#if MDA_LOOPBACK_ADDR
set_netns_cookie((void *)msg, &key);
#endif
redir_key = bpf_map_lookup_elem(&SOCK_OPS_PROXY_MAP_NAME, &key);
if (redir_key != NULL) {
ret = bpf_msg_redirect_hash(msg, &SOCK_OPS_MAP_NAME, redir_key, BPF_F_INGRESS);
if (ret != SK_DROP) {
bpf_log(DEBUG, "redirect the sk success\n");
} else {
// If you connect to the peer machine, you do end up in this branch
bpf_log(INFO, "no such socket, may be peer socket on another machine\n");
}
}
return SK_PASS;
}