package transcode

import (
	"context"
	"fmt"
	"log/slog"
	"strconv"
	"time"

	"easyaudioencode/internal/core/audioencode"
	"easyaudioencode/internal/core/host"
	"easyaudioencode/pkg/ffmpeg"
)

// Core bundles the collaborators used by the transcode use cases: the host
// core (talk URL lookup), the audio-encode core (status persistence) and the
// task workflow that runs talk tasks.
type Core struct {
	HostCore        *host.Core
	AudioEncodeCore *audioencode.Core
	WorkflowCore    *Workflow
}

// NewCore builds a Core from the given host and audio-encode cores and
// creates its task workflow via OpenAudioEncode.
func NewCore(HostCore *host.Core, AudioEncodeCore *audioencode.Core) *Core {
	core := &Core{
		HostCore:        HostCore,
		AudioEncodeCore: AudioEncodeCore,
		WorkflowCore:    OpenAudioEncode(AudioEncodeCore),
	}
	// The task manager (workflow) is enabled as part of construction.
	return core
}

// OpenAudioEncode creates the task workflow and registers the lifecycle
// callbacks that report task state changes to AudioEncodeCore.
//
// Task IDs are expected to be decimal integers (AddTask produces them with
// strconv.Itoa). When an ID cannot be parsed the status update is skipped and
// an error is logged, rather than silently updating record 0.
func OpenAudioEncode(AudioEncodeCore *audioencode.Core) *Workflow {
	wf := NewWorkflow(WorkflowConfig{
		MaxConcurrency:  100,              // concurrency limit
		CleanupInterval: 30 * time.Second, // clean up every 30 seconds
		MaxTaskHistory:  500,              // keep at most 500 tasks in the history
		RetentionTime:   60 * time.Second, // keep tasks for one minute
	})

	// numericID converts the string task ID back into the integer form that
	// the AudioEncodeCore status calls take.
	numericID := func(task *Task) (int, bool) {
		id, err := strconv.Atoi(task.ID)
		if err != nil {
			slog.Error("invalid task id", "id", task.ID, "err", err.Error())
			return 0, false
		}
		return id, true
	}

	// Register the callbacks.
	wf.SetCallbacks(
		// Completion callback.
		func(task *Task) {
			if id, ok := numericID(task); ok {
				if err := AudioEncodeCore.AudioTaskStatus(int(TaskCompleted), id); err != nil {
					slog.Error("AudioTaskStatus", "status", TaskCompleted, "err", err.Error())
				}
			}
			slog.Info(fmt.Sprintf("任务完成: ID=%s, Type=%s, 耗时=%v", task.ID, task.Type, task.endTime.Sub(task.startTime)))
		},
		// Cancellation callback.
		func(task *Task) {
			if id, ok := numericID(task); ok {
				if err := AudioEncodeCore.AudioTaskStatus(int(TaskCancelled), id); err != nil {
					slog.Error("AudioTaskStatus", "status", TaskCancelled, "err", err.Error())
				}
			}
			task.isEnd = true
			slog.Info(fmt.Sprintf("任务取消: ID=%s, Type=%s", task.ID, task.Type))
		},
		// Failure callback.
		func(task *Task, errs error) {
			if id, ok := numericID(task); ok {
				if err := AudioEncodeCore.AudioTaskStatusError(id, int(TaskFailed), errs.Error()); err != nil {
					slog.Error("AudioTaskStatus", "status", TaskFailed, "err", err.Error())
				}
			}
			task.isEnd = true
			// Report the task's own failure (errs), not the result of the
			// status update above, which is normally nil.
			slog.Info(fmt.Sprintf("任务失败: ID=%s, Type=%s, 错误=%v", task.ID, task.Type, errs))
		},
		// Cleanup callback.
		func(task *Task, reason string) {
			slog.Info(fmt.Sprintf("任务清理: ID=%s, 原因=%s", task.ID, reason))
		},
	)
	return wf
}

// WorkflowExecute is the execute function attached to tasks created by
// AddTask. It resolves the talk URL for the task's channel (t.Type), starts
// the talk session in a goroutine and then blocks until one of the following
// happens:
//
//   - StartTalkCode returns: its result (possibly nil) is returned;
//   - t.success is signalled: nil is returned;
//   - ctx is cancelled: the task is marked ended and ctx.Err() is returned.
//
// A failed URL lookup is returned wrapped, so callers can inspect the cause
// with errors.Is / errors.As.
func (c Core) WorkflowExecute(ctx context.Context, t *Task) error {
	slog.Info(fmt.Sprintf("开始执行任务: %s (类型: %s)", t.ID, t.Type))

	if c.WorkflowCore.GetIsTasks(t.Type) {
		// NOTE(review): GetIsTasks presumably reports that another task is
		// already active for this type/channel; the 11s pause then lets it
		// wind down first. Confirm against Workflow.
		const busyDelay = 11 * time.Second
		timer := time.NewTimer(busyDelay)
		select {
		case <-timer.C:
		case <-ctx.Done():
			// Honour cancellation during the wait instead of sleeping through
			// it and starting a talk session for an already-cancelled task.
			timer.Stop()
			slog.Error("任务中断", "id", t.ID)
			t.isEnd = true
			return ctx.Err()
		}
	}

	// Disabled test hook:
	//return fmt.Errorf("请求失败")
	in := &host.FindTalkInput{
		ChannelID: t.Type,
	}
	info, err := c.HostCore.FindTalkUrl(ctx, in)
	if err != nil {
		return fmt.Errorf("FindTalkUrl: %w", err)
	}
	slog.Info(fmt.Sprintf("开始执行任务: URL:[%s] 类型:[%s]", info.TalkUrl, info.Transport))

	// Run the talk session in the background. AddTask creates t.quit with a
	// buffer, so this send does not block even after the select below has
	// already returned on another case.
	go func() {
		t.quit <- t.StartTalkCode(info.TalkUrl, info.Transport)
	}()

	select {
	case errs := <-t.quit:
		slog.Info("处理异常退出", "id", t.ID)
		return errs
	case <-t.success:
		slog.Info("处理完成", "id", t.ID)
		return nil
	// Disabled test hook:
	//case <-time.After(30 * time.Second):
	//	slog.Info("任务完成", "id", t.ID)
	//	t.isEnd = true
	//	return nil
	case <-ctx.Done():
		slog.Error("任务中断", "id", t.ID)
		t.isEnd = true
		return ctx.Err()
	}
}

// AddTask creates a task for record id on channel bid and submits it to the
// workflow. EncodeUrl and duration are stored on the task unchanged. A
// rejected submission is logged and returned wrapped.
func (c Core) AddTask(id int, bid string, EncodeUrl string, duration int64) error {
	newTask := &Task{
		ID:        strconv.Itoa(id),
		Type:      bid,
		EncodeUrl: EncodeUrl,
		Duration:  duration,
		quit:      make(chan error, 3), // buffered so late senders never block
		success:   make(chan struct{}),
		isEnd:     false,
		execute:   c.WorkflowExecute,
	}
	if err := c.WorkflowCore.AddTask(newTask); err != nil {
		slog.Error("添加任务失败", "id", id, "err", err.Error())
		return fmt.Errorf("添加任务失败 【%d】%w", id, err)
	}
	slog.Info("添加任务成功", "id", id)
	return nil
}

// CancelTask asks the workflow to cancel the task with the given id. It
// returns an error when the workflow reports that nothing was cancelled.
func (c Core) CancelTask(id int) error {
	if !c.WorkflowCore.CancelTask(strconv.Itoa(id)) {
		slog.Warn("取消的任务不存在或者已被取消", "id", id)
		return fmt.Errorf("取消的任务不存在或者已被取消 %d", id)
	}
	slog.Info("已取消任务", "id", id)
	return nil
}

// GetTaskInfo returns the workflow's info for the task with the given id.
// When the workflow has no info for it, an error is returned together with a
// non-nil, empty TaskInfo (kept for compatibility with existing callers).
func (c Core) GetTaskInfo(id int) (*TaskInfo, error) {
	task, ok := c.WorkflowCore.GetTaskInfo(strconv.Itoa(id))
	if !ok {
		slog.Debug("查询任务详情失败", "id", id)
		return &TaskInfo{}, fmt.Errorf("查询任务详情失败 %d", id)
	}
	return task, nil
}

// StartAudioEncode transcodes inputFile to outputFile with
// ffmpeg.TranscodeToG711AFile in a background goroutine and then records the
// outcome for record id through AudioEncodeCore.AudioEncodeStatus. It returns
// immediately; failures are only logged.
func (c Core) StartAudioEncode(inputFile, outputFile string, id int) {
	go func() {
		status := audioencode.EncodeStatusSuccess
		if err := ffmpeg.TranscodeToG711AFile(inputFile, outputFile); err != nil {
			slog.Error("StartAudioEncode TranscodeToG711AFile", "err", err.Error())
			status = audioencode.EncodeStatusFailed
		}
		if err := c.AudioEncodeCore.AudioEncodeStatus(status, id); err != nil {
			slog.Error("StartAudioEncode AudioEncodeStatus", "err", err.Error())
		}
	}()
}