前言
目前chatGPT无疑仍然是目前市面上众多生成式AI模型中的领头羊,不仅在生成速度与质量上都十分出色,而且也有着很完整的开发文档,开发者可以很方便的使用sdk直接调用api进行更多不同场景业务的实现。
但是目前对于国内的用户却不太友好,不仅在网络环境上由于GFW先被卡了一关,OpenAI对帐号的严格风控也使得顺利使用困难重重,好在网上各路神仙大显神通,可以以极低的价格购买一批API keys进行早期的程序测试,在程序完成后再采用正式的key进行上线。
免费key的问题
但是对于免费key来说,有两个问题比较致命,一是最多只有5美金的试用额度,如果不分配均匀很容易会侧重消耗某个key的余额,另外便是每分钟的请求速率只有每分钟3条的限制。
部署后端的问题
因客观原因,后端服务只能部署在国内云服务器上,因此墙内访问openAI的 api 接口也是不小的挑战。
需要解决的问题
- 多 key 的动态分配调用,在外部调用层只需修改 base_url 而无需进行更多的配置
- 后端部署在国内服务器上,需要解决访问 api 的问题。
国内服务器访问 api
在查找资料后,选择了阿里云的云函数进行 api 的转发。
在搭建完成后,只需在代码中指定 api 的地址为阿里云的云函数地址即可。
// Completion
//
// @dc: 代理chat/completion
// @params:
// @response:
// @author: Hamster @date:2023/7/5 15:05:00
func (u *uProxyChat) Completion(authToken, model string, chatMessage []openai.ChatCompletionMessage) (resp *openai.ChatCompletionResponse, err error) {
// 获取配置base_url
baseConfig, err := service.GptProxyConfig().GetConfig(context.Background(), &m_gpt_proxy_config.InSelectByWhere{ConfigKey: "OPEN_AI_BASE_URL"})
if err != nil {
fmt.Printf("GetConfig error: %v\n", err)
return resp, err
}
config := openai.DefaultConfig(authToken)
config.BaseURL = baseConfig.ConfigValue
client := openai.NewClientWithConfig(config)
clientResp, err := client.CreateChatCompletion(
context.Background(),
openai.ChatCompletionRequest{
Model: model,
Messages: chatMessage,
},
)
if err != nil {
return resp, err
}
resp = &clientResp
return resp, nil
}
实测云函数地区选择新加坡🇸🇬,延迟在 500ms 之内,与直接海外服务器访问延迟相差不大。
负载均衡算法
核心思想是分为 步:
- 从数据库/缓存中获取全部可用的 key
- 从 key 列表中获取使用次数最少的一个 key
- 尝试使用 key 发起 api 请求
(请求成功)记录key 当前分钟剩余可用请求次数与可用字符
(请求失败)进行重试逻辑判断
重试算法
采用通道阻塞与计时器的方式进行重试的尝试,重试后如果仍不成功则慢慢加大每次重试的间隔时间,最大程度减少 panic 与错误传递给调用方的可能。
// PerformWithRetry
//
// @dc: 重试等待机制
// @auth: Hamster @date:2023/7/8 15:10:46
func (u *uLoadBalance) PerformWithRetry(ctx context.Context, operation func(params GPTParams) (GPTResp *openai.ChatCompletionResponse, err error), config GPTRetryConfig, params GPTParams) (GPTResp *openai.ChatCompletionResponse, err error) {
retries := 0
for {
GPTResp, err := operation(params)
if err == nil {
return GPTResp, nil
}
select {
case <-ctx.Done():
return nil, errors.New("操作被取消")
default:
gmlock.Unlock(u.key)
retries++
if retries >= config.MaxRetries {
return nil, &GPTRetryError{
Err: err,
Retries: retries,
MaxRetries: config.MaxRetries,
}
}
backoff := config.BackoffStrategy(retries)
glog.Warningf(ctx, "操作失败,重试中... (重试次数:%d/%d, 重试间隔:%v)", retries, config.MaxRetries, backoff)
select {
case <-ctx.Done():
return nil, errors.New("操作被取消")
case <-time.After(backoff):
}
}
}
}
定义对应结构体数据结构
type GPTRetryConfig struct {
MaxRetries int
RetryInterval time.Duration
BackoffStrategy func(retry int) time.Duration
}
type GPTRetryError struct {
Err error
Retries int
MaxRetries int
}
type GPTParams struct {
AuthToken string
Model string
ChatMessage []openai.ChatCompletionMessage
}
// KeyCacheInfo 缓存中的key信息
type KeyCacheInfo struct {
KeyValue string // key值
RequestLeft int // 1分钟内key剩余请求次数
TokenLeft int // 1分钟内key剩余token数
}
最终运行效果
在日志中可以清晰的看到每一个 key 的具体使用情况,同时可以看到重置限制时间的倒计时。
完整代码
// GPTCoreOperation
//
// @dc: 代理chat/completion
// @author: Hamster @date:2023/7/5 15:05:00
func (u *uLoadBalance) GPTCoreOperation(params GPTParams) (GPTResp *openai.ChatCompletionResponse, err error) {
var (
ctx = context.TODO()
cacheKeysStruct []*entity.GptProxyKey
cacheKeyInfo = &KeyCacheInfo{}
)
// 1.从数据库中获取全部key
cacheKeysStruct, _ = service.GptProxyKey().RetrieveKey(ctx, &m_gpt_proxy_key.InSelectByWhere{KeyStatus: 1})
gmlock.Lock(u.key) // 加锁
// 2.获取使用次数最小的key
minUsedKey := &entity.GptProxyKey{}
for _, cacheKey := range cacheKeysStruct {
// 获取是否可用
singleCacheKeyVar := gcache.MustGet(ctx, cacheKey.KeyValue)
if !singleCacheKeyVar.IsNil() {
singleCacheKeyInfo := &KeyCacheInfo{}
err := singleCacheKeyVar.Struct(&singleCacheKeyInfo)
if err != nil {
continue
}
glog.Debug(ctx, "当前key: ", cacheKey.KeyValue, " 剩余请求次数: ", singleCacheKeyInfo.RequestLeft,
" 剩余token数: ", singleCacheKeyInfo.TokenLeft, " 重置时间: ", gcache.MustGetExpire(ctx, cacheKey.KeyValue))
if singleCacheKeyInfo.RequestLeft <= 0 || singleCacheKeyInfo.TokenLeft <= 0 {
continue
}
}
if minUsedKey.KeyValue == "" {
minUsedKey = cacheKey
} else {
if cacheKey.UsedCount < minUsedKey.UsedCount {
minUsedKey = cacheKey
}
}
}
if minUsedKey.KeyValue == "" {
return nil, errors.New("没有可用的key")
}
glog.Info(ctx, "当前使用次数最小的key: ", minUsedKey.KeyValue)
// 3.检测Key是否在缓存中且可用
cacheKeyVar := gcache.MustGetOrSetFuncLock(ctx, minUsedKey.KeyValue, func(ctx context.Context) (interface{}, error) {
initCacheInfo := &KeyCacheInfo{
KeyValue: minUsedKey.KeyValue,
RequestLeft: 3,
TokenLeft: 40000,
}
return initCacheInfo, nil
}, 1*time.Minute)
err = cacheKeyVar.Struct(&cacheKeyInfo)
if err != nil {
return nil, err
}
// 4.如果可用,使用该key进行操作(减少request_left与token_left)
cacheKeyInfo.RequestLeft--
//TODO (laixin) 2023/7/7: 修改token_left
err = gcache.Set(ctx, cacheKeyInfo.KeyValue, cacheKeyInfo, gcache.MustGetExpire(ctx, cacheKeyInfo.KeyValue))
if err != nil {
return nil, err
}
gmlock.Unlock(u.key) // 解锁
// 5.修改key的使用次数
_ = grpool.AddWithRecover(ctx, func(ctx context.Context) {
err = service.GptProxyKey().UpdateKey(ctx, &m_gpt_proxy_key.InUpdate{
Id: minUsedKey.Id,
UsedCount: minUsedKey.UsedCount + 1,
UpdateTime: gtime.Now(),
})
if err != nil {
glog.Warning(ctx, "修改key使用次数失败: ", err.Error())
}
}, nil)
// 6.调用GPT接口
gptStartTime := gtime.Now()
GPTResp, err = gpt_utils.ProxyChat.Completion(minUsedKey.KeyValue, params.Model, params.ChatMessage)
gptEndTime := gtime.Now()
// 7.如果失败,修改key失败次数
if err != nil {
glog.Warning(ctx, "调用GPT接口失败: ", err.Error())
_ = grpool.AddWithRecover(ctx, func(ctx context.Context) {
// 记录日志
err = service.GptProxyLog().AddGPTLog(ctx, &m_gpt_proxy_log.InAdd{
ApiPath: "/chat/completions",
RequestParams: gconv.String(params),
ResponseResult: "{}",
ResponseTime: gptEndTime.Sub(gptStartTime).Seconds(),
ErrorMessage: err.Error(),
KeyId: gconv.Int(minUsedKey.Id),
CreateTime: gtime.Now(),
})
if err != nil {
glog.Error(ctx, "记录日志失败: ", err.Error())
}
// 修改key失败次数
err = service.GptProxyKey().UpdateKey(ctx, &m_gpt_proxy_key.InUpdate{
Id: minUsedKey.Id,
FailedCount: minUsedKey.FailedCount + 1,
UpdateTime: gtime.Now(),
})
if err != nil {
glog.Error(ctx, "修改key失败次数失败: ", err.Error())
}
}, nil)
return nil, err
}
// 8.如果成功,修改key成功次数,返回结果
_ = grpool.AddWithRecover(ctx, func(ctx context.Context) {
// 记录日志
err = service.GptProxyLog().AddGPTLog(ctx, &m_gpt_proxy_log.InAdd{
ApiPath: "/chat/completions",
RequestParams: gconv.String(params),
ResponseResult: gconv.String(GPTResp),
ResponseTime: gptEndTime.Sub(gptStartTime).Seconds(),
ErrorMessage: "",
KeyId: gconv.Int(minUsedKey.Id),
CreateTime: gtime.Now(),
})
if err != nil {
glog.Warning(ctx, "记录日志失败: ", err.Error())
}
// 修改key成功次数
err = service.GptProxyKey().UpdateKey(ctx, &m_gpt_proxy_key.InUpdate{
Id: minUsedKey.Id,
SuccessCount: minUsedKey.SuccessCount + 1,
UpdateTime: gtime.Now(),
})
if err != nil {
glog.Warning(ctx, "修改key成功次数失败: ", err.Error())
}
}, nil)
return GPTResp, nil
}
后话
在这套系统上线后,在 GPT 模型的前期验证中剩下了一大笔开销,也减少了调用方的编写各种重试逻辑的难度。