Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement apiToken failover mechanism #1256

Open
wants to merge 32 commits into
base: main
Choose a base branch
from

Conversation

cr7258
Copy link
Collaborator

@cr7258 cr7258 commented Aug 27, 2024

Ⅰ. Describe what this PR did

配置示例:

provider:
  type: qwen
  apiTokens:
    - "api-token-1"
    - "api-token-2"
    - "api-token-3"
  modelMapping:
    'gpt-3': "qwen-turbo"
    'gpt-4-turbo': "qwen-max"
    '*': "qwen-turbo"
  failover:
    enabled: true
    failureThreshold: 3
    successThreshold: 1
    healthCheckInterval: 5000
    healthCheckTimeout: 5000
    healthCheckModel: gpt-3

目前仅根据 HTTP 请求的响应状态码是否是 200 来判断 apiToken 是否可用,应该暂时用不到其他复杂的判断条件。

Ⅱ. Does this pull request fix one issue?

fixes #1227

Ⅲ. Why don't you add test cases (unit test/integration test)?

Ⅳ. Describe how to verify it

Ⅴ. Special notes for reviews

Question

目前还有两个问题:

    1. 由于 Envoy 会启动多个 Wasm VM,当前的故障切换和健康检测是每个 Wasm VM 分别去做的(也就是说 VM1 可能已经把某个 apiToken 移除了,但是 VM2 可能还会继续用这个 apiToken 进行请求),是否需要通过 proxywasm.SetSharedData 在多个 Wasm VM 间进行同步?如果同步的话会带来另一个问题,如果 apiToken 不可用时,多个 Wasm VM 会同时发起多个健康检测请求。
    1. 我需要发送请求到 envoy 本地监听的服务和端口来对 apiToken 做健康检测,目前我的做法是手动创建一个 cluster,指向 envoy 本地 Listen 的地址和端口,这样好像不太灵活,而且需要用户额外设置 cluster。有没有更好的方式?
healthCheckClient = wrapper.NewClusterClient(wrapper.StaticIpCluster{
		ServiceName: "local_cluster",
		Port:        10000,
	})
    - name: outbound|10000||local_cluster.static
      connect_timeout: 0.25s
      type: STATIC
      load_assignment:
        cluster_name: outbound|10000||local_cluster.static
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address:
                      address: 127.0.0.1
                      port_value: 10000

@codecov-commenter
Copy link

codecov-commenter commented Aug 27, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 43.52%. Comparing base (ef31e09) to head (0938f98).
Report is 182 commits behind head on main.

Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1256      +/-   ##
==========================================
+ Coverage   35.91%   43.52%   +7.61%     
==========================================
  Files          69       76       +7     
  Lines       11576    12320     +744     
==========================================
+ Hits         4157     5362    +1205     
+ Misses       7104     6622     -482     
- Partials      315      336      +21     

see 69 files with indirect coverage changes

@johnlanni
Copy link
Collaborator

@cr7258 可以用SetSharedData同步一下,要注意用cas机制避免冲突,同时也可以基于SetSharedData机制进行选主,让一个worker做健康检查恢复,不过要注意SharedData中的数据是VM级别的,即使插件配置更新也不会清理。

healthCheckClient = wrapper.NewClusterClient(wrapper.StaticIpCluster{
ServiceName: "local_cluster",
Port: 10000,
})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个应该要配置吗吧?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的,现在是需要配置一个 cluster, 指向 127.0.0.1。不知道有没有更好的方式处理?可以让用户不需要额外配置这个 cluster。

- name: outbound|10000||local_cluster.static
  connect_timeout: 0.25s
  type: STATIC
  load_assignment:
    cluster_name: outbound|10000||local_cluster.static
    endpoints:
      - lb_endpoints:
          - endpoint:
              address:
                socket_address:
                  address: 127.0.0.1
                  port_value: 10000

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

直接用 RouteCluster 就好了,就是当前路由到的服务,然后去请求这个服务来校验健康,直接用 Authorization 头就行了,不用设置这个 ApiToken-Health-Check 头

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

健康检测任务是在 parseConfig 阶段设置的,我试了下应该是拿不到当前请求的 cluster 的?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是不是在有一次失败之后再触发会好一些?失败的时候把RouteCluster和失败的apikey组合在一起加入到检查队列里。

Copy link
Collaborator Author

@cr7258 cr7258 Oct 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果是多目标服务的情况,路由进来可能50%到openai,50%到qwen,会导致用qwen的api key 去请求openai。

不会出现 apiKey 混用的情况,我在这里对不同 provider 的 share data 做了区分 (以 provider 名字作为前缀):

func (c *ProviderConfig) initVariable() {
// Set provider name as prefix to differentiate shared data
provider := c.GetType()
c.failover.ctxApiTokenInUse = provider + "-apiTokenInUse"
c.failover.ctxHealthCheckHeader = provider + "-apiToken-health-check"
c.failover.ctxApiTokenRequestFailureCount = provider + "-apiTokenRequestFailureCount"
c.failover.ctxApiTokenRequestSuccessCount = provider + "-apiTokenRequestSuccessCount"
c.failover.ctxApiTokens = provider + "-apiTokens"
c.failover.ctxUnavailableApiTokens = provider + "-unavailableApiTokens"
c.failover.ctxRequestHostAndPath = provider + "-requestHostAndPath"
c.failover.ctxVmLease = provider + "-vmLease"
}

我用了这个配置进行了测试:#1256 (comment)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

目前我们在做的新的AI网关的控制台功能,AI Proxy插件是服务级配置,一个路由可以负载均衡到不同服务。这个时候健康检查如果通过外部路由进来,去到哪个Provider是不确定的。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我在构建 healthCheckClient 会从 shared data 拿到对应的 provider 的 host 和 path,所以健康检查是知道发给哪个 provider 的。每个 provider 的配置会分别起一组 wasm vm 处理请求,并且每个 provider 的 shared data 是隔离的,加上了 provider 作为前缀用于区分。

healthCheckClient = wrapper.NewClusterClient(wrapper.RouteCluster{
	Host:    hostPath.Host,
	Cluster: higressGatewayLocalCluster,
})
err = healthCheckClient.Post(hostPath.Path, headers, body, func(statusCode int, responseHeaders http.Header, responseBody []byte) {
	if statusCode == 200 {
		c.handleAvailableApiToken(apiToken, log)
	}
}

c.failover.ctxRequestHostAndPath = provider + "-requestHostAndPath"

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

问题在于,多目标服务基于权重路由的情况下,相同的Host和Path会去到不同的LLM后端,这里即使按provider前缀区分开,健康检查的请求通过路由进来,依然去到了不同的 provider 里

Copy link
Collaborator Author

@cr7258 cr7258 Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johnlanni 代码已根据要求重新做了修改

抽象出了 TransformRequestHeadersTransformRequestBody 方法,用于支持 ai-proxy 和直接发起 http call 的情况下对 headers 和 body 进行修改。目前我对 claude 和 groq 实现了这两个方法,其他 provider 暂时还没实现,如果没有问题的话,其他 provider 也可以照此修改。

type Provider interface {
GetProviderType() string
TransformRequestHeaders(headers http.Header, ctx wrapper.HttpContext, log wrapper.Log)
TransformRequestBody(body []byte, ctx wrapper.HttpContext, log wrapper.Log) ([]byte, error)
}

health check 的请求可以调用 TransformRequestHeadersTransformRequestBody 修改完 header 和 body 后直接发给 LLM 后端,无须再经过 ai-proxy(因此也无须新建一个 Local cluster)。

使用以下配置文件进行测试,可以处理多目标服务基于权重路由的场景。

apiVersion: extensions.higress.io/v1alpha1
kind: WasmPlugin
metadata:
  name: ai-proxy-groq
  namespace: higress-system
spec:
  matchRules:
  - config:
      provider:
        type: groq
        apiTokens: 
          - "sk-good-groq"
          - "sk-bad-groq"
        modelMapping:
          "*": llama3-8b-8192
        failover:
          enabled: true
          failureThreshold: 3
          successThreshold: 5
          healthCheckModel: gpt-3
    service:
    - groq.dns
  - config:
      provider:
        type: claude
        apiTokens: 
          - "sk-good-claude"
          - "sk-bad-claude"
        modelMapping:
          gpt-3: claude-3-opus-20240229
          "*": claude-3-sonnet-20240229
        failover:
          enabled: true
          failureThreshold: 2
          successThreshold: 9
          healthCheckModel: gpt-3
    service:
    - claude.dns
  url: oci://cr7258/ai-proxy:failover-v60
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  annotations:
     higress.io/destination: |
      50% claude.dns
      50% groq.dns
  labels:
    higress.io/resource-definer: higress
  name: test-ai
  namespace: higress-system
spec:
  ingressClassName: higress
  rules:
  - host: test-ai.com
    http:
      paths:
      - backend:
          resource:
            apiGroup: networking.higress.io
            kind: McpBridge
            name: default
        path: /
        pathType: Prefix
---
apiVersion: networking.higress.io/v1
kind: McpBridge
metadata:
  name: default
  namespace: higress-system
spec:
  registries:
  - domain: api.groq.com
    name: groq
    port: 443
    type: dns
    protocol: https
    sni: api.groq.com
  - domain: api.anthropic.com
    name: claude
    port: 443
    type: dns
    protocol: https
    sni: api.anthropic.com

@cr7258
Copy link
Collaborator Author

cr7258 commented Aug 31, 2024

@johnlanni 我修改了代码,使用 SetSharedData 在多个 VM 之间同步 apiToken 的信息,并且也使用 SetSharedData 进行选主了。

不过要注意SharedData中的数据是VM级别的,即使插件配置更新也不会清理。

这个地方提到的注意点,我需要做那些处理?

healthCheckClient = wrapper.NewClusterClient(wrapper.StaticIpCluster{
ServiceName: "local_cluster",
Port: 10000,
})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

直接用 RouteCluster 就好了,就是当前路由到的服务,然后去请求这个服务来校验健康,直接用 Authorization 头就行了,不用设置这个 ApiToken-Health-Check 头

}

func generateVMID() string {
return fmt.Sprintf("%016x", time.Now().Nanosecond())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vm id 可以通过 getProperty 直接拿到,key 是 "plugin_vm_id"

参考:https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/advanced/attributes.html#wasm-attributes

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我试了下不知道为啥拿不到 plugin_vm_id。。。 其他变量是可以拿到的。。。🫠

vmIDByte, err := proxywasm.GetProperty([]string{"plugin_vm_id"})
fmt.Println("vmID: ", string(vmIDByte))
vmPlugin, _ := proxywasm.GetProperty([]string{"plugin_name"})
fmt.Println("plugin name: ", string(vmPlugin))
image

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

哦 是我搞错了 这个vm_id是配置里的,不是标识一个唯一的vm,现在配置的是空字符串

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

那是不是生成一个 uuid 好一些?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

if c.failover != nil && c.failover.enabled {
wrapper.RegisteTickFunc(c.failover.healthCheckTimeout, func() {
// Only the Wasm VM that successfully acquires the lease will perform health check
if tryAcquireOrRenewLease(vmID, log) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

需要加个 else 逻辑,没有选到主的,需要定时(健康检查的间隔)从 shared data 中获取全局token,来更新当前自己本地 thread local 的全局token。
这样避免每次请求来都去请求 shared data,因为envoy底层实现这个get/set shared data操作都要加锁,有额外开销

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没有选到主的 Wasm VM 是不会去做健康检测的,只有选到主的 VM 才会去获取全局的 unavailableTokens 进行健康检测。所以这里好像不需要加上 else 的逻辑?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

但是不止有健康检查的时候要去请求shared data,当次请求失败,需要增加fail count的时候也要去访问

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

现在总有 4 个 shared data:

  • ctxApiTokenRequestFailureCount:请求失败的 token 计数,map 结构,key 是 apiToken,value 是失败的次数
  • ctxApiTokenRequestSuccessCount:达到失败次数阈值禁用的 token,需要进行健康检查,map 结构,key 是 apiToken,value 是健康检测成功的次数
  • ctxApiTokens:可以使用的 token 列表
  • ctxUnavailableApiTokens:禁用的 token 列表

没有选到主的,需要定时(健康检查的间隔)从 shared data 中获取全局token,来更新当前自己本地 thread local 的全局 token:

这里有两个问题:
1.如果某个 apiToken 达到失败阈值被移除了,由于定时从 shared data 同步到本地会有延迟,会出现仍然有 wasm vm 尝试使用已禁用的 token 进行访问
2.选到主的 wasm vm 也是会接收请求的,如果需要定期从 shared data 中获取全局 token 来更新到本地,那么这个逻辑应该不只是在 else 中做,不管是选到主的 还是没选到主的,都应该进行这个操作

但是不止有健康检查的时候要去请求shared data,当次请求失败,需要增加fail count的时候也要去访问

增加 fail count 的时候要访问 ctxApiTokenRequestFailureCount,这个计数应该要是精确的,所以不用定时从全局去同步到本地

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯,现在这样处理问题不大,就是高并发下可能有性能问题。性能和同步延迟之间是要做trade-off的,ai proxy场景下一般并发不会特别高,这样处理可以接受。

})

vmID := generateVMID()
err := c.initApiTokens()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

因为 shared data 中的内容是跟随插件 vm 的生命周期的,只有插件关闭/版本升级等情况内容才会被清理。所以这里初始化的时候,要重置所有 shared data 中相关的数据,availiabe和unavailiable的都要重置。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done: 82b2284

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以,已调整

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个我没问题了

log.Errorf("Failed to get unavailable apiToken: %v", err)
return
}
c.addApiToken(ctxUnavailableApiTokens, apiToken, unavailableTokens, unavailableCas, log)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handle unavailable api token 会在每个worker线程里都掉用,所以这里 add 的时候不能简单覆盖,要考虑冲突的情况,应该先 get 出来,再在基础上加上对应的 token,再去 set,这个过程中用 cas 来识别冲突,如果冲突进行重试。可以最多重试例如10次。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里我是有先 get 出来,再 append 进行添加的。

unavailableTokens, unavailableCas, err := getApiTokens(ctxUnavailableApiTokens)
if err != nil {
log.Errorf("Failed to get unavailable apiToken: %v", err)
return
}
c.addApiToken(ctxUnavailableApiTokens, apiToken, unavailableTokens, unavailableCas, log)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我补充一下 cas 重试的逻辑。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已添加 cas 重试逻辑。

@johnlanni
Copy link
Collaborator

@johnlanni 我修改了代码,使用 SetSharedData 在多个 VM 之间同步 apiToken 的信息,并且也使用 SetSharedData 进行选主了。

不过要注意SharedData中的数据是VM级别的,即使插件配置更新也不会清理。

这个地方提到的注意点,我需要做那些处理?

大的问题没有,上面提到一些跟机制相关的细节处理,辛苦再调整下

@CH3CHO
Copy link
Collaborator

CH3CHO commented Sep 4, 2024

README.md 应该也要更新一下

plugins/wasm-go/extensions/ai-proxy/main.go Outdated Show resolved Hide resolved
plugins/wasm-go/extensions/ai-proxy/main.go Outdated Show resolved Hide resolved
plugins/wasm-go/extensions/ai-proxy/main.go Outdated Show resolved Hide resolved
@cr7258
Copy link
Collaborator Author

cr7258 commented Oct 5, 2024

@johnlanni @CH3CHO
已根据建议重新对代码进行了修改,请重新 review,谢谢。

我在测试集群中使用以下配置对功能进行了验证,并且 failover 功能对于不同的 provider 的 apiToken 列表和计数是隔离,不会相互影响。

apiVersion: extensions.higress.io/v1alpha1
kind: WasmPlugin
metadata:
  name: ai-proxy-groq
  namespace: higress-system
spec:
  matchRules:
  - config:
      provider:
        type: groq
        apiTokens: 
          - <your-good-groq-token>
          - "sk-bad-groq"
        failover:
          enabled: true
          failureThreshold: 3
          successThreshold: 5
          healthCheckModel: llama3-8b-8192          
    ingress:
    - groq
  - config:
      provider:
        type: qwen
        apiTokens: 
          - <your-good-qwen-token>
          - "sk-bad-qwen"
        modelMapping:
          gpt-3: qwen-turbo
          gpt-35-turbo: qwen-plus
          gpt-4-turbo: qwen-max
          "*": qwen-turbo
        failover:
          enabled: true
          failureThreshold: 2
          successThreshold: 9
          healthCheckModel: gpt-3
    ingress:
    - qwen 
  url: oci://cr7258/ai-proxy:failover-v18
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  annotations:
    higress.io/backend-protocol: HTTPS
    higress.io/destination: groq.dns
    higress.io/proxy-ssl-name: api.groq.com
    higress.io/proxy-ssl-server-name: "on"
  labels:
    higress.io/resource-definer: higress
  name: groq
  namespace: higress-system
spec:
  ingressClassName: higress
  rules:
  - host: test-ai-groq.com
    http:
      paths:
      - backend:
          resource:
            apiGroup: networking.higress.io
            kind: McpBridge
            name: default
        path: /
        pathType: Prefix
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  annotations:
    higress.io/backend-protocol: HTTPS
    higress.io/destination: qwen.dns
    higress.io/proxy-ssl-name: dashscope.aliyuncs.com
    higress.io/proxy-ssl-server-name: "on"
  labels:
    higress.io/resource-definer: higress
  name: qwen
  namespace: higress-system
spec:
  ingressClassName: higress
  rules:
  - host: test-ai-qwen.com
    http:
      paths:
      - backend:
          resource:
            apiGroup: networking.higress.io
            kind: McpBridge
            name: default
        path: /
        pathType: Prefix
---
apiVersion: networking.higress.io/v1
kind: McpBridge
metadata:
  name: default
  namespace: higress-system
spec:
  registries:
  - domain: api.groq.com
    name: groq
    port: 443
    type: dns
  - domain: dashscope.aliyuncs.com
    name: qwen
    port: 443
    type: dns

Copy link
Collaborator

@CH3CHO CH3CHO left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我没什么其他问题了

@@ -27,10 +27,14 @@ type Cluster interface {
}

type RouteCluster struct {
Host string
Host string
Cluster string
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个改动有点奇怪。RouteCluster 指的就是当前请求的目标集群,为什么还要支持强制设置呢?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/cr7258/higress/blob/main/plugins/wasm-go/pkg/wrapper/cluster_wrapper.go#L34
因为 ClusterName() 方法通过获取 envoy 的 cluster_name 属性来设置 cluster。

routeName, err := proxywasm.GetProperty([]string{"cluster_name"})

然而健康检测的 HTTP 请求(是定时触发的)和正常的用户请求不在一个上下文中,因此无法获取到用户请求的 cluster,所以这里新增支持强制设置的方式。

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

那我建议是换个 cluster 的实现,而不是复用 RouteCluster。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

用了一个新的 Cluster 实现。

type TargetCluster struct {
Host string
Cluster string
}
func (c TargetCluster) ClusterName() string {
return c.Cluster
}
func (c TargetCluster) HostName() string {
return c.Host
}

@@ -106,6 +107,8 @@ var (

type Provider interface {
GetProviderType() string
TransformRequestHeaders(headers http.Header, ctx wrapper.HttpContext, log wrapper.Log)
TransformRequestBody(body []byte, ctx wrapper.HttpContext, log wrapper.Log) ([]byte, error)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这两个方法每个provider都必须要实现吗?会不会有不需要实现的情况?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里感觉甚至都可以把调用逻辑直接抽到main里面,只要调用对应的Transform方法然后覆盖原有的headers或者body就行了,provider完全不需要操心覆盖的逻辑。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果把调用的逻辑放到 main 里面,那好像每个 provider 的 OnRequestHeaders 和 OnRequestBody 也不需要了?
另外接口要不要扩展一下分别支持 chatCompletion 和 embeddings 请求?

type Provider interface {
  TransformChatCompletionRequestHeaders
  TransformChatCompletionRequestBody
  TransformEmbeddingsRequestHeaders
  TransformEmbeddingsRequestBody
}

}
err := m.contextCache.GetContent(func(content string, err error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

context的逻辑没啦?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我之前看错了给删了。。 我加回来。。

return types.ActionContinue, replaceJsonRequestBody(claudeRequest, log)
}

err := c.contextCache.GetContent(func(content string, err error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

context逻辑

@cr7258
Copy link
Collaborator Author

cr7258 commented Nov 3, 2024

@CH3CHO 我把调用的逻辑包装到 handleRequestHeaders 和 handleRequestBody 函数中了,每个 provider 在 OnRequestHeaders 和 OnRequestBody 中分别调用这两个函数即可。之所以没有抽到 main 函数中,是考虑到在处理 headers 或者 body 的前后不同的 provider 的逻辑有可能有些不一样。example qwen, example claude

在 handleRequestBody 中还对从文件中获取 context 这种统一的行为作为处理,每个 provider 不需要重复写 m.contextCache.GetContent(func(content string, err error) 这部分代码了。insertContext 允许用户实现 provider 自己的 insertHttpContextMessage 方法,比如 qwenclaude 插入 system message 的方式不一样,如果没有实现,则使用默认的 defaultInsertHttpContextMessage 方法。

TransformRequestHeaders 和 TransformRequestBody 目前改为可选实现,如果没有实现 TransformRequestHeaders,不做任何修改,如何没有实现 TransformRequestBody,则只调用 defaultTransformRequestBody 方法做 model 映射。

上述修改已使用下面配置文件进行测试:

apiVersion: extensions.higress.io/v1alpha1
kind: WasmPlugin
metadata:
  name: ai-proxy-groq
  namespace: higress-system
spec:
  matchRules:
  - config:
      provider:
        type: groq
        apiTokens: 
          - "<grop-token>"
          - "sk-bad-groq"
        modelMapping:
          "*": llama3-8b-8192
        context:
          fileUrl: https://raw.githubusercontent.com/cr7258/test-context/refs/heads/main/README.md
          serviceName: github.dns
          servicePort: 443
        failover:
          enabled: true
          failureThreshold: 3
          successThreshold: 5
          healthCheckModel: gpt-3
    service:
    - groq.dns
  - config:
      provider:
        type: claude
        apiTokens: 
          - "<claude-token>"
          - "sk-bad-claude"
        modelMapping:
          gpt-3: claude-3-opus-20240229
          "*": claude-3-sonnet-20240229
        context:
          fileUrl: https://raw.githubusercontent.com/cr7258/test-context/refs/heads/main/README.md
          serviceName: github.dns
          servicePort: 443
        failover:
          enabled: true
          failureThreshold: 2
          successThreshold: 9
          healthCheckModel: gpt-3
    service:
    - claude.dns
  - config:
      provider:
        type: qwen
        apiTokens: 
          - "<qwen-token>"
          - "sk-bad-qwen"
        modelMapping:
          gpt-3: qwen-turbo
          "*": qwen-turbo
        context:
          fileUrl: https://raw.githubusercontent.com/cr7258/test-context/refs/heads/main/README.md
          serviceName: github.dns
          servicePort: 443
        failover:
          enabled: true
          failureThreshold: 4
          successThreshold: 7
          healthCheckModel: gpt-3
    service:
    - qwen.dns
  url: oci://cr7258/ai-proxy:failover-v86
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  annotations:
     higress.io/destination: |
      30% claude.dns
      30% groq.dns
      40% qwen.dns
  labels:
    higress.io/resource-definer: higress
  name: test-ai
  namespace: higress-system
spec:
  ingressClassName: higress
  rules:
  - host: test-ai.com
    http:
      paths:
      - backend:
          resource:
            apiGroup: networking.higress.io
            kind: McpBridge
            name: default
        path: /
        pathType: Prefix
---
apiVersion: networking.higress.io/v1
kind: McpBridge
metadata:
  name: default
  namespace: higress-system
spec:
  registries:
  - domain: api.groq.com
    name: groq
    port: 443
    type: dns
    protocol: https
    sni: api.groq.com
  - domain: api.anthropic.com
    name: claude
    port: 443
    type: dns
    protocol: https
    sni: api.anthropic.com
  - domain: dashscope.aliyuncs.com
    name: qwen
    port: 443
    type: dns
    protocol: https
    sni: dashscope.aliyuncs.com
  - domain: raw.githubusercontent.com
    name: github
    port: 443
    type: dns
    protocol: https
    sni: raw.githubusercontent.com

现在只对 qwen, grop, claude 这 3 个 provider 的代码做了对应的适配,如果没有其他问题的话,后面我把其他的 provider 也对应修改一下。

@cr7258 cr7258 requested a review from CH3CHO November 4, 2024 07:45
plugins/wasm-go/extensions/ai-proxy/util/http.go Outdated Show resolved Hide resolved
plugins/wasm-go/extensions/ai-proxy/util/http.go Outdated Show resolved Hide resolved
@@ -31,6 +37,15 @@ func replaceJsonRequestBody(request interface{}, log wrapper.Log) error {
return err
}

func replaceHttpJsonRequestBody(body []byte, log wrapper.Log) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个函数与上面 replaceJsonRequestBody 相比,为啥会多出来 HTTP 这个词,它和 JSON 又有什么关系?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议上面的 replaceJsonRequestBody 直接调用这个,减少冗余

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个只是为了和原先的 replaceJsonRequestBody 区分一下,所以临时取了一个新的名字。
等其他的 provider 也按照新的逻辑修改以后,将只保留新的 replaceJsonRequestBody 方法。

@@ -353,6 +380,54 @@ func CreateProvider(pc ProviderConfig) (Provider, error) {
return initializer.CreateProvider(pc)
}

func (c *ProviderConfig) parseRequestAndMapModel(ctx wrapper.HttpContext, request interface{}, body []byte, log wrapper.Log) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我不是很认同把这么多逻辑放到 ProviderConfig 里的做法。这里有几个问题:

  1. 如果 protocol 是 original,大概率 main 那边是算不出 ApiName 的,因为不同的 provider 对应的 path 可能是不一样的,还是需要让 provider 各自的实现去做(这块现在的实现是有问题的,只是还没有精力去改)。
  2. 像下面 handleRequestHeaders 函数的逻辑,是不是放到 main 里做更合适呢?
  3. 如果是支持继承的语言,这些逻辑应该是放到父类里,并且父类可能是个抽象类。虽然语言特性不同,但逻辑是一样的。放到 config 感觉不是一个合适的做法。

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不过如果功能上都 OK,我也可以保留意见,先合并这部分逻辑,后续再安排重构。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

像下面 handleRequestHeaders 函数的逻辑,是不是放到 main 里做更合适呢?

没有放到 main 中调用是考虑到 handleRequestHeaders 和 handleRequestBody 前后不同的 provider 上逻辑有点不一样。具体如下:

func (m *qwenProvider) OnRequestHeaders(ctx wrapper.HttpContext, apiName ApiName, log wrapper.Log) (types.Action, error) {
       // 1.有的 provider 可能只实现了 ChatCompletion,有的可能是实现了 ChatCompletion 和 Embeddings,目前需要进入不同 provider 的 OnRequestHeaders 进行判断
	if apiName != ApiNameChatCompletion && apiName != ApiNameEmbeddings {
		return types.ActionContinue, errUnsupportedApiName
	}

	m.config.handleRequestHeaders(m, ctx, apiName, log)

	if m.config.protocol == protocolOriginal {
		ctx.DontReadRequestBody()
		return types.ActionContinue, nil
	}

	// 2.有的 provider 是返回 types.ActionContinue
	return types.HeaderStopIteration, nil
}

handleRequestBody 也是类似,比如 qwen 前面有一段自己的逻辑。

func (m *qwenProvider) OnRequestBody(ctx wrapper.HttpContext, apiName ApiName, body []byte, log wrapper.Log) (types.Action, error) {
	if m.config.qwenEnableCompatible {
		if gjson.GetBytes(body, "model").Exists() {
			rawModel := gjson.GetBytes(body, "model").String()
			mappedModel := getMappedModel(rawModel, m.config.modelMapping, log)
			newBody, err := sjson.SetBytes(body, "model", mappedModel)
			if err != nil {
				log.Errorf("Replace model error: %v", err)
				return types.ActionContinue, err
			}

			// TODO: Temporary fix to clamp top_p value to the range [qwenTopPMin, qwenTopPMax].
			if topPValue := gjson.GetBytes(body, "top_p"); topPValue.Exists() {
				rawTopP := topPValue.Float()
				scaledTopP := math.Max(qwenTopPMin, math.Min(rawTopP, qwenTopPMax))
				newBody, err = sjson.SetBytes(newBody, "top_p", scaledTopP)
				if err != nil {
					log.Errorf("Failed to replace top_p: %v", err)
					return types.ActionContinue, err
				}
			}

			err = proxywasm.ReplaceHttpRequestBody(newBody)
			if err != nil {
				log.Errorf("Replace request body error: %v", err)
				return types.ActionContinue, err
			}
		}
		return types.ActionContinue, nil
	}

	if apiName != ApiNameChatCompletion && apiName != ApiNameEmbeddings {
		return types.ActionContinue, errUnsupportedApiName
	}
	return m.config.handleRequestBody(m, m.contextCache, ctx, apiName, body, log)
}

) (types.Action, error) {
// use original protocol
if c.protocol == protocolOriginal {
if apiName == ApiNameChatCompletion {
Copy link
Collaborator

@johnlanni johnlanni Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里这个判断是不准确的,因为main那边是根据openai的协议来判断apiName的,original协议的话判断不准;
这段逻辑可以去掉,original下不支持 context from file,没有什么问题

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已移除这段逻辑:f164854

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

AI apitoken failover 机制设计
4 participants