Next.js logomark实现 ChatGPT 负载均衡

2023年8月4日

Openai logomark
Hamster1963

前言

目前chatGPT无疑仍然是目前市面上众多生成式AI模型中的领头羊,不仅在生成速度与质量上都十分出色,而且也有着很完整的开发文档,开发者可以很方便的使用sdk直接调用api进行更多不同场景业务的实现。

但是目前对于国内的用户却不太友好,不仅在网络环境上由于GFW先被卡了一关,OpenAI对帐号的严格风控也使得顺利使用困难重重,好在网上各路神仙大显神通,可以以极低的价格购买一批API keys进行早期的程序测试,在程序完成后再采用正式的key进行上线。

Untitled

免费key的问题

但是对于免费key来说,有两个问题比较致命,一是最多只有5美金的试用额度,如果不分配均匀很容易会侧重消耗某个key的余额,另外便是每分钟的请求速率只有每分钟3条的限制。

部署后端的问题

因客观原因,后端服务只能部署在国内云服务器上,因此墙内访问openAI的 api 接口也是不小的挑战。

需要解决的问题

  1. 多 key 的动态分配调用,在外部调用层只需修改 base_url 而无需进行更多的配置
  2. 后端部署在国内服务器上,需要解决访问 api 的问题。

国内服务器访问 api

在查找资料后,选择了阿里云的云函数进行 api 的转发。

在搭建完成后,只需在代码中指定 api 的地址为阿里云的云函数地址即可。

Untitled

// Completion
//
//	@dc: 代理chat/completion
//	@params:
//	@response:
//	@author: Hamster   @date:2023/7/5 15:05:00
func (u *uProxyChat) Completion(authToken, model string, chatMessage []openai.ChatCompletionMessage) (resp *openai.ChatCompletionResponse, err error) {
	// 获取配置base_url
	baseConfig, err := service.GptProxyConfig().GetConfig(context.Background(), &m_gpt_proxy_config.InSelectByWhere{ConfigKey: "OPEN_AI_BASE_URL"})
	if err != nil {
		fmt.Printf("GetConfig error: %v\n", err)
		return resp, err
	}
	config := openai.DefaultConfig(authToken)
	config.BaseURL = baseConfig.ConfigValue
	client := openai.NewClientWithConfig(config)
	clientResp, err := client.CreateChatCompletion(
		context.Background(),
		openai.ChatCompletionRequest{
			Model:    model,
			Messages: chatMessage,
		},
	)
	if err != nil {
		return resp, err
	}
	resp = &clientResp
	return resp, nil
}

实测云函数地区选择新加坡🇸🇬,延迟在 500ms 之内,与直接海外服务器访问延迟相差不大。

负载均衡算法

核心思想是分为 步:

  1. 从数据库/缓存中获取全部可用的 key
  2. 从 key 列表中获取使用次数最少的一个 key
  3. 尝试使用 key 发起 api 请求

(请求成功)记录key 当前分钟剩余可用请求次数与可用字符

(请求失败)进行重试逻辑判断

重试算法

采用通道阻塞与计时器的方式进行重试的尝试,重试后如果仍不成功则慢慢加大每次重试的间隔时间,最大程度减少 panic 与错误传递给调用方的可能。

// PerformWithRetry
//
//	@dc: 重试等待机制
//	@auth: Hamster   @date:2023/7/8 15:10:46
func (u *uLoadBalance) PerformWithRetry(ctx context.Context, operation func(params GPTParams) (GPTResp *openai.ChatCompletionResponse, err error), config GPTRetryConfig, params GPTParams) (GPTResp *openai.ChatCompletionResponse, err error) {
	retries := 0
	for {
		GPTResp, err := operation(params)
		if err == nil {
			return GPTResp, nil
		}
		select {
		case <-ctx.Done():
			return nil, errors.New("操作被取消")
		default:
			gmlock.Unlock(u.key)
			retries++
			if retries >= config.MaxRetries {
				return nil, &GPTRetryError{
					Err:        err,
					Retries:    retries,
					MaxRetries: config.MaxRetries,
				}
			}

			backoff := config.BackoffStrategy(retries)
			glog.Warningf(ctx, "操作失败,重试中... (重试次数:%d/%d, 重试间隔:%v)", retries, config.MaxRetries, backoff)
			select {
			case <-ctx.Done():
				return nil, errors.New("操作被取消")
			case <-time.After(backoff):
			}
		}
	}
}

定义对应结构体数据结构

type GPTRetryConfig struct {
	MaxRetries      int
	RetryInterval   time.Duration
	BackoffStrategy func(retry int) time.Duration
}

type GPTRetryError struct {
	Err        error
	Retries    int
	MaxRetries int
}

type GPTParams struct {
	AuthToken   string
	Model       string
	ChatMessage []openai.ChatCompletionMessage
}

// KeyCacheInfo 缓存中的key信息
type KeyCacheInfo struct {
	KeyValue    string // key值
	RequestLeft int    // 1分钟内key剩余请求次数
	TokenLeft   int    // 1分钟内key剩余token数
}

最终运行效果

WechatIMG36.jpg

在日志中可以清晰的看到每一个 key 的具体使用情况,同时可以看到重置限制时间的倒计时。

完整代码

// GPTCoreOperation
//
//	@dc: 代理chat/completion
//	@author: Hamster   @date:2023/7/5 15:05:00
func (u *uLoadBalance) GPTCoreOperation(params GPTParams) (GPTResp *openai.ChatCompletionResponse, err error) {
	var (
		ctx             = context.TODO()
		cacheKeysStruct []*entity.GptProxyKey
		cacheKeyInfo    = &KeyCacheInfo{}
	)

	// 1.从数据库中获取全部key
	cacheKeysStruct, _ = service.GptProxyKey().RetrieveKey(ctx, &m_gpt_proxy_key.InSelectByWhere{KeyStatus: 1})
	gmlock.Lock(u.key) // 加锁
	// 2.获取使用次数最小的key
	minUsedKey := &entity.GptProxyKey{}
	for _, cacheKey := range cacheKeysStruct {
		// 获取是否可用
		singleCacheKeyVar := gcache.MustGet(ctx, cacheKey.KeyValue)
		if !singleCacheKeyVar.IsNil() {
			singleCacheKeyInfo := &KeyCacheInfo{}
			err := singleCacheKeyVar.Struct(&singleCacheKeyInfo)
			if err != nil {
				continue
			}
			glog.Debug(ctx, "当前key: ", cacheKey.KeyValue, " 剩余请求次数: ", singleCacheKeyInfo.RequestLeft,
				" 剩余token数: ", singleCacheKeyInfo.TokenLeft, " 重置时间: ", gcache.MustGetExpire(ctx, cacheKey.KeyValue))
			if singleCacheKeyInfo.RequestLeft <= 0 || singleCacheKeyInfo.TokenLeft <= 0 {
				continue
			}
		}
		if minUsedKey.KeyValue == "" {
			minUsedKey = cacheKey
		} else {
			if cacheKey.UsedCount < minUsedKey.UsedCount {
				minUsedKey = cacheKey
			}
		}
	}
	if minUsedKey.KeyValue == "" {
		return nil, errors.New("没有可用的key")
	}
	glog.Info(ctx, "当前使用次数最小的key: ", minUsedKey.KeyValue)
	// 3.检测Key是否在缓存中且可用
	cacheKeyVar := gcache.MustGetOrSetFuncLock(ctx, minUsedKey.KeyValue, func(ctx context.Context) (interface{}, error) {
		initCacheInfo := &KeyCacheInfo{
			KeyValue:    minUsedKey.KeyValue,
			RequestLeft: 3,
			TokenLeft:   40000,
		}
		return initCacheInfo, nil
	}, 1*time.Minute)

	err = cacheKeyVar.Struct(&cacheKeyInfo)
	if err != nil {
		return nil, err
	}

	// 4.如果可用,使用该key进行操作(减少request_left与token_left)
	cacheKeyInfo.RequestLeft--

	//TODO (laixin) 2023/7/7: 修改token_left
	err = gcache.Set(ctx, cacheKeyInfo.KeyValue, cacheKeyInfo, gcache.MustGetExpire(ctx, cacheKeyInfo.KeyValue))
	if err != nil {
		return nil, err
	}
	gmlock.Unlock(u.key) // 解锁

	// 5.修改key的使用次数

	_ = grpool.AddWithRecover(ctx, func(ctx context.Context) {
		err = service.GptProxyKey().UpdateKey(ctx, &m_gpt_proxy_key.InUpdate{
			Id:         minUsedKey.Id,
			UsedCount:  minUsedKey.UsedCount + 1,
			UpdateTime: gtime.Now(),
		})
		if err != nil {
			glog.Warning(ctx, "修改key使用次数失败: ", err.Error())
		}
	}, nil)

	// 6.调用GPT接口
	gptStartTime := gtime.Now()
	GPTResp, err = gpt_utils.ProxyChat.Completion(minUsedKey.KeyValue, params.Model, params.ChatMessage)
	gptEndTime := gtime.Now()

	// 7.如果失败,修改key失败次数
	if err != nil {
		glog.Warning(ctx, "调用GPT接口失败: ", err.Error())
		_ = grpool.AddWithRecover(ctx, func(ctx context.Context) {
			// 记录日志
			err = service.GptProxyLog().AddGPTLog(ctx, &m_gpt_proxy_log.InAdd{
				ApiPath:        "/chat/completions",
				RequestParams:  gconv.String(params),
				ResponseResult: "{}",
				ResponseTime:   gptEndTime.Sub(gptStartTime).Seconds(),
				ErrorMessage:   err.Error(),
				KeyId:          gconv.Int(minUsedKey.Id),
				CreateTime:     gtime.Now(),
			})
			if err != nil {
				glog.Error(ctx, "记录日志失败: ", err.Error())
			}
			// 修改key失败次数
			err = service.GptProxyKey().UpdateKey(ctx, &m_gpt_proxy_key.InUpdate{
				Id:          minUsedKey.Id,
				FailedCount: minUsedKey.FailedCount + 1,
				UpdateTime:  gtime.Now(),
			})
			if err != nil {
				glog.Error(ctx, "修改key失败次数失败: ", err.Error())
			}
		}, nil)
		return nil, err
	}

	// 8.如果成功,修改key成功次数,返回结果
	_ = grpool.AddWithRecover(ctx, func(ctx context.Context) {
		// 记录日志
		err = service.GptProxyLog().AddGPTLog(ctx, &m_gpt_proxy_log.InAdd{
			ApiPath:        "/chat/completions",
			RequestParams:  gconv.String(params),
			ResponseResult: gconv.String(GPTResp),
			ResponseTime:   gptEndTime.Sub(gptStartTime).Seconds(),
			ErrorMessage:   "",
			KeyId:          gconv.Int(minUsedKey.Id),
			CreateTime:     gtime.Now(),
		})
		if err != nil {
			glog.Warning(ctx, "记录日志失败: ", err.Error())
		}
		// 修改key成功次数
		err = service.GptProxyKey().UpdateKey(ctx, &m_gpt_proxy_key.InUpdate{
			Id:           minUsedKey.Id,
			SuccessCount: minUsedKey.SuccessCount + 1,
			UpdateTime:   gtime.Now(),
		})
		if err != nil {
			glog.Warning(ctx, "修改key成功次数失败: ", err.Error())
		}

	}, nil)
	return GPTResp, nil
}

后话

在这套系统上线后,在 GPT 模型的前期验证中剩下了一大笔开销,也减少了调用方的编写各种重试逻辑的难度。