package transcode import ( "context" "errors" "fmt" "log" "runtime/debug" "sync" "time" ) // Task 任务定义 type Task struct { ID string // 任务唯一ID Type string // 任务类型 EncodeUrl string // 任务音频地址 execute func(ctx context.Context, task *Task) error // 执行函数 ctx context.Context // 任务上下文 cancelFunc context.CancelFunc // 取消函数 status TaskStatus // 任务状态 startTime time.Time // 开始时间 endTime time.Time // 结束时间 createTime time.Time // 创建时间 quit chan error success chan struct{} isEnd bool Duration int64 } type TaskInfo struct { Status TaskStatus // 任务状态 StartTime time.Time // 开始时间 EndTime time.Time // 结束时间 CreateTime time.Time // 创建时间 } // TaskStatus 任务状态 type TaskStatus int const ( TaskPending TaskStatus = iota // 等待中 TaskRunning // 执行中 TaskCompleted // 已完成 TaskCancelled // 已取消 TaskFailed // 已失败 ) // Workflow 工作流 type Workflow struct { maxConcurrency int // 最大并发数 semaphore chan struct{} // 并发信号量 tasks map[string]*Task // 所有任务 taskQueue map[string][]string // 按类型分组的任务队列 runningTasks map[string]string // 正在运行的任务类型 -> 任务ID mu sync.RWMutex // 读写锁 wg sync.WaitGroup // 等待组 cleanupTicker *time.Ticker // 清理定时器 cleanupStop chan struct{} // 清理停止信号 // 清理配置 cleanupInterval time.Duration // 清理间隔 maxTaskHistory int // 最大历史任务数 retentionTime time.Duration // 任务保留时间 // 回调函数 onComplete func(task *Task) // 完成回调 onCancel func(task *Task) // 取消回调 onError func(task *Task, err error) // 错误回调 onCleanup func(task *Task, reason string) // 清理回调 } // WorkflowConfig 工作流配置 type WorkflowConfig struct { MaxConcurrency int // 最大并发数,默认2 CleanupInterval time.Duration // 清理间隔,默认30秒 MaxTaskHistory int // 最大历史任务数,默认1000 RetentionTime time.Duration // 任务保留时间,默认5分钟 } // NewWorkflow 创建工作流 func NewWorkflow(config WorkflowConfig) *Workflow { if config.MaxConcurrency <= 0 { config.MaxConcurrency = 2 } if config.CleanupInterval <= 0 { config.CleanupInterval = 30 * time.Second } if config.MaxTaskHistory <= 0 { config.MaxTaskHistory = 1000 } if config.RetentionTime <= 0 { config.RetentionTime = 5 * time.Minute } wf := &Workflow{ maxConcurrency: config.MaxConcurrency, semaphore: make(chan struct{}, config.MaxConcurrency), tasks: make(map[string]*Task), taskQueue: make(map[string][]string), runningTasks: make(map[string]string), cleanupInterval: config.CleanupInterval, maxTaskHistory: config.MaxTaskHistory, retentionTime: config.RetentionTime, cleanupStop: make(chan struct{}), } // 启动清理协程 wf.startCleanupRoutine() return wf } // SetCallbacks 设置回调函数 func (wf *Workflow) SetCallbacks( onComplete func(task *Task), onCancel func(task *Task), onError func(task *Task, err error), onCleanup func(task *Task, reason string), ) { wf.mu.Lock() defer wf.mu.Unlock() wf.onComplete = onComplete wf.onCancel = onCancel wf.onError = onError wf.onCleanup = onCleanup } // AddTask 添加新任务 func (wf *Workflow) AddTask(task *Task) error { if task == nil { return errors.New("task cannot be nil") } if task.ID == "" { return errors.New("task ID cannot be empty") } if task.execute == nil { return errors.New("task execute function cannot be nil") } wf.mu.Lock() defer wf.mu.Unlock() // 检查任务是否已存在 if _, exists := wf.tasks[task.ID]; exists { return fmt.Errorf("task with ID %s already exists", task.ID) } // 设置任务初始状态 task.status = TaskPending task.ctx, task.cancelFunc = context.WithCancel(context.Background()) task.createTime = time.Now() // 保存任务到map wf.tasks[task.ID] = task // 添加到类型队列 if _, ok := wf.taskQueue[task.Type]; !ok { wf.taskQueue[task.Type] = make([]string, 0) } wf.taskQueue[task.Type] = append(wf.taskQueue[task.Type], task.ID) // 尝试执行任务(在锁外执行,避免死锁) go func() { wf.tryExecuteTask(task.Type) }() return nil } // tryExecuteTask 尝试执行任务 func (wf *Workflow) tryExecuteTask(taskType string) { wf.mu.Lock() // 检查是否有该类型的任务正在运行 if _, isRunning := wf.runningTasks[taskType]; isRunning { wf.mu.Unlock() return } // 获取该类型的任务队列 queue, exists := wf.taskQueue[taskType] if !exists || len(queue) == 0 { wf.mu.Unlock() return } // 获取队列中的第一个任务 taskID := queue[0] task, taskExists := wf.tasks[taskID] if !taskExists || task.status != TaskPending { wf.mu.Unlock() return } wf.mu.Unlock() // 尝试获取并发许可(非阻塞) select { case wf.semaphore <- struct{}{}: // 获取成功,可以执行 wf.executeTask(task) default: // 并发已达上限,等待下次尝试 } } // executeTask 执行任务 func (wf *Workflow) executeTask(task *Task) { wf.mu.Lock() // 再次检查任务状态 if task.status != TaskPending { wf.mu.Unlock() <-wf.semaphore // 释放信号量 return } // 更新任务状态 task.status = TaskRunning task.startTime = time.Now() // 记录该类型任务正在运行 wf.runningTasks[task.Type] = task.ID // 从队列中移除(第一个元素) if queue, exists := wf.taskQueue[task.Type]; exists && len(queue) > 0 { wf.taskQueue[task.Type] = queue[1:] } wf.mu.Unlock() wf.wg.Add(1) // 启动任务执行 go func() { defer func() { // 恢复panic if r := recover(); r != nil { err := fmt.Errorf("task panic: %v\n%s", r, debug.Stack()) wf.handleTaskError(task, err) } wf.taskCompleted(task.Type) wf.wg.Done() }() // 设置超时 timeNum := 12 * 60 * time.Minute if task.Duration > 0 { timeNum = time.Duration(task.Duration) * time.Second } ctx, cancel := context.WithTimeout(task.ctx, timeNum) defer cancel() // 执行任务 err := task.execute(ctx, task) // 处理任务结果 wf.mu.Lock() defer wf.mu.Unlock() if err != nil { if errors.Is(err, context.Canceled) { task.status = TaskCancelled task.endTime = time.Now() wf.triggerOnCancel(task) } else { task.status = TaskFailed task.endTime = time.Now() wf.triggerOnError(task, err) } } else { task.status = TaskCompleted task.endTime = time.Now() wf.triggerOnComplete(task) } // 释放该类型的运行标记 delete(wf.runningTasks, task.Type) }() } // taskCompleted 任务完成处理 func (wf *Workflow) taskCompleted(taskType string) { // 释放并发许可 <-wf.semaphore // 尝试执行下一个同类型任务 go wf.tryExecuteTask(taskType) // 检查是否有其他类型的任务可以执行 wf.mu.RLock() allTypes := make([]string, 0, len(wf.taskQueue)) for t := range wf.taskQueue { allTypes = append(allTypes, t) } wf.mu.RUnlock() for _, t := range allTypes { if t != taskType { go wf.tryExecuteTask(t) } } } // CancelTask 取消任务 func (wf *Workflow) CancelTask(taskID string) bool { wf.mu.Lock() defer wf.mu.Unlock() task, exists := wf.tasks[taskID] if !exists { return false } switch task.status { case TaskPending: // 从队列中移除 if queue, exists := wf.taskQueue[task.Type]; exists { newQueue := make([]string, 0, len(queue)) for _, id := range queue { if id != taskID { newQueue = append(newQueue, id) } } wf.taskQueue[task.Type] = newQueue } task.status = TaskCancelled task.endTime = time.Now() wf.triggerOnCancel(task) return true case TaskRunning: // 取消正在执行的任务 if task.cancelFunc != nil { task.cancelFunc() } return true default: // 已完成或已取消的任务 return false } } // CleanupCompletedTasks 清理已完成的任务 func (wf *Workflow) CleanupCompletedTasks() int { wf.mu.Lock() defer wf.mu.Unlock() return wf.cleanupTasksInternal("manual", false) } // CleanupOldTasks 清理旧任务(基于保留时间) func (wf *Workflow) CleanupOldTasks() int { wf.mu.Lock() defer wf.mu.Unlock() return wf.cleanupTasksInternal("retention", false) } // CleanupAllTasks 清理所有任务(谨慎使用) func (wf *Workflow) CleanupAllTasks(force bool) int { wf.mu.Lock() defer wf.mu.Unlock() return wf.cleanupTasksInternal("all", force) } // cleanupTasksInternal 内部清理方法 func (wf *Workflow) cleanupTasksInternal(reason string, force bool) int { cleanedCount := 0 now := time.Now() for id, task := range wf.tasks { // 跳过运行中和等待中的任务(除非强制清理) if !force && (task.status == TaskRunning || task.status == TaskPending) { continue } // 检查是否应该清理 shouldClean := false switch reason { case "manual": // 手动清理:只清理已结束的任务 shouldClean = task.status == TaskCompleted || task.status == TaskCancelled || task.status == TaskFailed case "retention": // 基于保留时间清理 if task.endTime.IsZero() { // 如果没有结束时间,使用创建时间 shouldClean = now.Sub(task.createTime) > wf.retentionTime } else { shouldClean = now.Sub(task.endTime) > wf.retentionTime } case "all": // 清理所有任务(包括运行中和等待中的) shouldClean = true default: // 默认清理已结束的任务 shouldClean = task.status == TaskCompleted || task.status == TaskCancelled || task.status == TaskFailed } if shouldClean { // 触发清理回调 if wf.onCleanup != nil { wf.onCleanup(task, reason) } // 从tasks map中删除 delete(wf.tasks, id) cleanedCount++ } } // 清理空的任务队列 for taskType, queue := range wf.taskQueue { newQueue := make([]string, 0, len(queue)) for _, taskID := range queue { if _, exists := wf.tasks[taskID]; exists { newQueue = append(newQueue, taskID) } } wf.taskQueue[taskType] = newQueue // 如果队列为空,删除该类型 if len(newQueue) == 0 { delete(wf.taskQueue, taskType) } } // 清理运行任务记录 for taskType, taskID := range wf.runningTasks { if _, exists := wf.tasks[taskID]; !exists { delete(wf.runningTasks, taskType) } } return cleanedCount } // startCleanupRoutine 启动清理协程 func (wf *Workflow) startCleanupRoutine() { wf.cleanupTicker = time.NewTicker(wf.cleanupInterval) go func() { for { select { case <-wf.cleanupTicker.C: // 定期清理旧任务 wf.mu.Lock() cleaned := wf.cleanupTasksInternal("retention", false) wf.mu.Unlock() if cleaned > 0 { log.Printf("自动清理了 %d 个旧任务", cleaned) } // 检查任务数量,如果超过限制,清理最旧的任务 wf.mu.Lock() if len(wf.tasks) > wf.maxTaskHistory { wf.cleanupExcessTasks() } wf.mu.Unlock() case <-wf.cleanupStop: wf.cleanupTicker.Stop() return } } }() } // cleanupExcessTasks 清理超出限制的任务 func (wf *Workflow) cleanupExcessTasks() { if len(wf.tasks) <= wf.maxTaskHistory { return } // 收集所有任务并按创建时间排序 tasks := make([]*Task, 0, len(wf.tasks)) for _, task := range wf.tasks { tasks = append(tasks, task) } // 按创建时间排序(最旧的在前面) for i := 0; i < len(tasks); i++ { for j := i + 1; j < len(tasks); j++ { if tasks[j].createTime.Before(tasks[i].createTime) { tasks[i], tasks[j] = tasks[j], tasks[i] } } } // 计算需要清理的数量 excess := len(tasks) - wf.maxTaskHistory // 清理最旧的任务(跳过运行中和等待中的任务) cleaned := 0 for i := 0; i < len(tasks) && cleaned < excess; i++ { task := tasks[i] // 只清理已结束的任务 if task.status == TaskCompleted || task.status == TaskCancelled || task.status == TaskFailed { if wf.onCleanup != nil { wf.onCleanup(task, "max_history") } delete(wf.tasks, task.ID) cleaned++ } } if cleaned > 0 { log.Printf("清理了 %d 个任务以保持历史记录不超过 %d", cleaned, wf.maxTaskHistory) } } // GetTaskStatus 获取任务状态 func (wf *Workflow) GetTaskStatus(taskID string) (TaskStatus, bool) { wf.mu.RLock() defer wf.mu.RUnlock() task, exists := wf.tasks[taskID] if !exists { return TaskPending, false } return task.status, true } // GetTaskInfo 获取任务详情 func (wf *Workflow) GetTaskInfo(taskID string) (*TaskInfo, bool) { wf.mu.RLock() defer wf.mu.RUnlock() task, exists := wf.tasks[taskID] if !exists { return &TaskInfo{}, false } return &TaskInfo{ Status: task.status, EndTime: task.endTime, CreateTime: task.createTime, StartTime: task.startTime, }, true } // GetRunningTasks 获取正在运行的任务 func (wf *Workflow) GetRunningTasks() []*Task { wf.mu.RLock() defer wf.mu.RUnlock() result := make([]*Task, 0) for _, task := range wf.tasks { if task.status == TaskRunning { result = append(result, task) } } return result } // GetPendingTasks 获取等待中的任务 func (wf *Workflow) GetPendingTasks() []*Task { wf.mu.RLock() defer wf.mu.RUnlock() result := make([]*Task, 0) for _, task := range wf.tasks { if task.status == TaskPending { result = append(result, task) } } return result } // GetPendingTasks 获取等待中的任务 func (wf *Workflow) GetIsTasks(bid string) bool { wf.mu.RLock() defer wf.mu.RUnlock() result := false for _, task := range wf.tasks { if task.Type == bid && task.status != TaskPending && task.status != TaskRunning { result = true break } } return result } // GetTaskCount 获取各种状态的任务数量 func (wf *Workflow) GetTaskCount() (total, running, pending, completed, cancelled, failed int) { wf.mu.RLock() defer wf.mu.RUnlock() total = len(wf.tasks) for _, task := range wf.tasks { switch task.status { case TaskRunning: running++ case TaskPending: pending++ case TaskCompleted: completed++ case TaskCancelled: cancelled++ case TaskFailed: failed++ } } return } // Wait 等待所有任务完成 func (wf *Workflow) Wait() { wf.wg.Wait() } // Stop 停止工作流 func (wf *Workflow) Stop() { wf.mu.Lock() defer wf.mu.Unlock() // 发送清理停止信号 close(wf.cleanupStop) // 取消所有任务 for _, task := range wf.tasks { if task.status == TaskPending || task.status == TaskRunning { if task.cancelFunc != nil { task.cancelFunc() } } } } // 触发回调函数 func (wf *Workflow) triggerOnComplete(task *Task) { if wf.onComplete != nil { wf.onComplete(task) } } func (wf *Workflow) triggerOnCancel(task *Task) { if wf.onCancel != nil { wf.onCancel(task) } } func (wf *Workflow) triggerOnError(task *Task, err error) { if wf.onError != nil { wf.onError(task, err) } } func (wf *Workflow) handleTaskError(task *Task, err error) { wf.mu.Lock() defer wf.mu.Unlock() task.status = TaskFailed task.endTime = time.Now() wf.triggerOnError(task, err) delete(wf.runningTasks, task.Type) }