EasyAudioEncode/internal/core/transcode/core.go
2025-12-31 11:29:58 +08:00

168 lines
4.8 KiB
Go

package transcode
import (
"context"
"easyaudioencode/internal/core/audioencode"
"easyaudioencode/internal/core/host"
"easyaudioencode/pkg/ffmpeg"
"fmt"
"log/slog"
"strconv"
"time"
)
type Core struct {
HostCore *host.Core
AudioEncodeCore *audioencode.Core
WorkflowCore *Workflow
}
func NewCore(HostCore *host.Core, AudioEncodeCore *audioencode.Core) *Core {
core := &Core{
HostCore: HostCore,
AudioEncodeCore: AudioEncodeCore,
WorkflowCore: OpenAudioEncode(AudioEncodeCore),
}
// 启用任务管理器
return core
}
func OpenAudioEncode(AudioEncodeCore *audioencode.Core) *Workflow {
wf := NewWorkflow(WorkflowConfig{
MaxConcurrency: 100, // 并发
CleanupInterval: 30 * time.Second, // 每30秒清理一次
MaxTaskHistory: 500, // 最多保留500个任务历史
RetentionTime: 60 * time.Second, // 任务保留1分钟
})
// 设置回调函数
wf.SetCallbacks(
// 完成回调
func(task *Task) {
id, _ := strconv.Atoi(task.ID)
err := AudioEncodeCore.AudioTaskStatus(int(TaskCompleted), id)
if err != nil {
slog.Error("AudioTaskStatus", "status", TaskCompleted, "err", err.Error())
}
slog.Info(fmt.Sprintf("任务完成: ID=%s, Type=%s, 耗时=%v",
task.ID, task.Type, task.endTime.Sub(task.startTime)))
},
// 取消回调
func(task *Task) {
id, _ := strconv.Atoi(task.ID)
err := AudioEncodeCore.AudioTaskStatus(int(TaskCancelled), id)
if err != nil {
slog.Error("AudioTaskStatus", "status", TaskCancelled, "err", err.Error())
}
task.isEnd = true
slog.Info(fmt.Sprintf("任务取消: ID=%s, Type=%s", task.ID, task.Type))
},
// 错误回调
func(task *Task, errs error) {
id, _ := strconv.Atoi(task.ID)
err := AudioEncodeCore.AudioTaskStatusError(id, int(TaskFailed), errs.Error())
if err != nil {
slog.Error("AudioTaskStatus", "status", TaskFailed, "err", err.Error())
}
task.isEnd = true
slog.Info(fmt.Sprintf("任务失败: ID=%s, Type=%s, 错误=%v",
task.ID, task.Type, err))
},
// 清理回调
func(task *Task, reason string) {
slog.Info(fmt.Sprintf("任务清理: ID=%s, 原因=%s", task.ID, reason))
},
)
return wf
}
func (c Core) WorkflowExecute(ctx context.Context, t *Task) error {
slog.Info(fmt.Sprintf("开始执行任务: %s (类型: %s)", t.ID, t.Type))
if c.WorkflowCore.GetIsTasks(t.Type) {
time.Sleep(11 * time.Second)
}
//return fmt.Errorf("请求失败")
in := &host.FindTalkInput{
ChannelID: t.Type,
}
info, err := c.HostCore.FindTalkUrl(ctx, in)
if err != nil {
return fmt.Errorf("FindTalkUrl: %v", err.Error())
}
slog.Info(fmt.Sprintf("开始执行任务: URL:[%s] 类型:[%s]", info.TalkUrl, info.Transport))
go func(in *host.FinTalkOutput) {
t.quit <- t.StartTalkCode(in.TalkUrl, in.Transport)
}(info)
select {
case errs := <-t.quit:
slog.Info("处理异常退出", "id", t.ID)
return errs
case <-t.success:
slog.Info("处理完成", "id", t.ID)
return nil
// 用于测试
//case <-time.After(30 * time.Second):
// slog.Info("任务完成", "id", t.ID)
// t.isEnd = true
// return nil
case <-ctx.Done():
slog.Error("任务中断", "id", t.ID)
t.isEnd = true
return ctx.Err()
}
}
func (c Core) AddTask(id int, bid string, EncodeUrl string, duration int64) error {
newTask := &Task{
ID: strconv.Itoa(id),
Type: bid,
EncodeUrl: EncodeUrl,
Duration: duration,
quit: make(chan error, 3),
success: make(chan struct{}),
isEnd: false,
execute: c.WorkflowExecute,
}
err := c.WorkflowCore.AddTask(newTask)
if err != nil {
slog.Error("添加任务失败", "id", id, "err", err.Error())
return fmt.Errorf("添加任务失败 【%d】%v", id, err.Error())
}
slog.Info("添加任务成功", "id", id)
return nil
}
func (c Core) CancelTask(id int) error {
if c.WorkflowCore.CancelTask(strconv.Itoa(id)) {
slog.Info("已取消任务", "id", id)
return nil
}
slog.Warn("取消的任务不存在或者已被取消", "id", id)
return fmt.Errorf("取消的任务不存在或者已被取消 %d", id)
}
func (c Core) GetTaskInfo(id int) (*TaskInfo, error) {
task, t := c.WorkflowCore.GetTaskInfo(strconv.Itoa(id))
if !t {
slog.Debug("查询任务详情失败", "id", id)
return &TaskInfo{}, fmt.Errorf("查询任务详情失败 %d", id)
}
return task, nil
}
// StartAudioEncode 转码
func (c Core) StartAudioEncode(inputFile, outputFile string, id int) {
go func() {
status := audioencode.EncodeStatusSuccess
err := ffmpeg.TranscodeToG711AFile(inputFile, outputFile)
if err != nil {
slog.Error("StartAudioEncode TranscodeToG711AFile", "err", err.Error())
status = audioencode.EncodeStatusFailed
}
err = c.AudioEncodeCore.AudioEncodeStatus(status, id)
if err != nil {
slog.Error("StartAudioEncode AudioEncodeStatus", "err", err.Error())
}
}()
return
}