EasyAudioEncode/internal/core/transcode/task.go
2025-12-31 11:29:58 +08:00

681 lines
16 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package transcode
import (
"context"
"errors"
"fmt"
"log"
"runtime/debug"
"sync"
"time"
)
// Task describes a single unit of work managed by a Workflow.
type Task struct {
	ID         string                                      // unique task ID
	Type       string                                      // task type; the Workflow runs at most one task per type at a time
	EncodeUrl  string                                      // audio URL of the task
	execute    func(ctx context.Context, task *Task) error // function that performs the work
	ctx        context.Context                             // task context, created in AddTask
	cancelFunc context.CancelFunc                          // cancels ctx
	status     TaskStatus                                  // current task status
	startTime  time.Time                                   // time the task started running
	endTime    time.Time                                   // time the task finished (zero until then)
	createTime time.Time                                   // time the task was added to the workflow
	quit       chan error                                  // NOTE(review): not referenced in the code shown here — confirm usage
	success    chan struct{}                               // NOTE(review): not referenced in the code shown here — confirm usage
	isEnd      bool                                        // NOTE(review): not referenced in the code shown here — confirm usage
	Duration   int64                                       // execution timeout in seconds; values <= 0 mean the 12-hour default applies
}
// TaskInfo is a snapshot of a task's status and timestamps, as returned by
// Workflow.GetTaskInfo.
type TaskInfo struct {
	Status     TaskStatus // task status
	StartTime  time.Time  // time the task started running
	EndTime    time.Time  // time the task finished
	CreateTime time.Time  // time the task was created
}
// TaskStatus is the lifecycle state of a Task.
type TaskStatus int

// Task lifecycle states. TaskPending is the zero value.
const (
	TaskPending   TaskStatus = iota // waiting in the queue
	TaskRunning                     // currently executing
	TaskCompleted                   // finished successfully
	TaskCancelled                   // cancelled
	TaskFailed                      // finished with an error
)
// Workflow schedules tasks with a global concurrency limit and at most one
// running task per task type, and periodically prunes finished tasks.
type Workflow struct {
	maxConcurrency int                 // maximum number of concurrently running tasks
	semaphore      chan struct{}       // concurrency semaphore; one slot per running task
	tasks          map[string]*Task    // all known tasks, keyed by task ID
	taskQueue      map[string][]string // pending task IDs grouped by task type
	runningTasks   map[string]string   // task type -> ID of the task of that type currently running
	mu             sync.RWMutex        // guards the maps, the task state and the callbacks
	wg             sync.WaitGroup      // counts task goroutines started by executeTask
	cleanupTicker  *time.Ticker        // drives the periodic cleanup
	cleanupStop    chan struct{}       // closed to stop the cleanup goroutine
	// Cleanup configuration.
	cleanupInterval time.Duration // interval between cleanup passes
	maxTaskHistory  int           // maximum number of tasks kept in the registry
	retentionTime   time.Duration // how long a finished task is retained
	// Callbacks.
	onComplete func(task *Task)                // called when a task completes successfully
	onCancel   func(task *Task)                // called when a task is cancelled
	onError    func(task *Task, err error)     // called when a task fails
	onCleanup  func(task *Task, reason string) // called when a task is removed by cleanup
}
// WorkflowConfig holds the tunable settings of a Workflow. NewWorkflow
// replaces any non-positive value with the default noted on the field.
type WorkflowConfig struct {
	MaxConcurrency  int           // maximum concurrency, default 2
	CleanupInterval time.Duration // cleanup interval, default 30 seconds
	MaxTaskHistory  int           // maximum number of retained tasks, default 1000
	RetentionTime   time.Duration // retention time of finished tasks, default 5 minutes
}
// NewWorkflow builds a Workflow from config and starts its background cleanup
// goroutine. Non-positive settings fall back to their defaults: concurrency 2,
// cleanup interval 30s, history limit 1000, retention time 5 minutes.
func NewWorkflow(config WorkflowConfig) *Workflow {
	concurrency := config.MaxConcurrency
	if concurrency <= 0 {
		concurrency = 2
	}
	interval := config.CleanupInterval
	if interval <= 0 {
		interval = 30 * time.Second
	}
	history := config.MaxTaskHistory
	if history <= 0 {
		history = 1000
	}
	retention := config.RetentionTime
	if retention <= 0 {
		retention = 5 * time.Minute
	}

	wf := &Workflow{
		maxConcurrency:  concurrency,
		semaphore:       make(chan struct{}, concurrency),
		tasks:           make(map[string]*Task),
		taskQueue:       make(map[string][]string),
		runningTasks:    make(map[string]string),
		cleanupStop:     make(chan struct{}),
		cleanupInterval: interval,
		maxTaskHistory:  history,
		retentionTime:   retention,
	}
	// The cleanup goroutine runs until Stop closes cleanupStop.
	wf.startCleanupRoutine()
	return wf
}
// SetCallbacks installs the lifecycle callbacks; any of them may be nil.
//
// The workflow invokes these callbacks while it holds its internal lock, so a
// callback must not call back into the Workflow or it will deadlock.
func (wf *Workflow) SetCallbacks(
	onComplete func(task *Task),
	onCancel func(task *Task),
	onError func(task *Task, err error),
	onCleanup func(task *Task, reason string),
) {
	wf.mu.Lock()
	defer wf.mu.Unlock()

	wf.onComplete, wf.onCancel = onComplete, onCancel
	wf.onError, wf.onCleanup = onError, onCleanup
}
// AddTask registers task as pending, appends it to the queue of its type and
// asynchronously tries to start the next task of that type.
//
// It returns an error when task is nil, has an empty ID, has no execute
// function, or when a task with the same ID is already registered.
func (wf *Workflow) AddTask(task *Task) error {
	switch {
	case task == nil:
		return errors.New("task cannot be nil")
	case task.ID == "":
		return errors.New("task ID cannot be empty")
	case task.execute == nil:
		return errors.New("task execute function cannot be nil")
	}

	wf.mu.Lock()
	defer wf.mu.Unlock()

	if _, duplicate := wf.tasks[task.ID]; duplicate {
		return fmt.Errorf("task with ID %s already exists", task.ID)
	}

	// Initialise the bookkeeping fields owned by the workflow.
	task.status = TaskPending
	task.createTime = time.Now()
	task.ctx, task.cancelFunc = context.WithCancel(context.Background())

	wf.tasks[task.ID] = task
	// append creates the per-type queue on first use.
	wf.taskQueue[task.Type] = append(wf.taskQueue[task.Type], task.ID)

	// Scheduling takes wf.mu itself, so it has to run in its own goroutine.
	go wf.tryExecuteTask(task.Type)
	return nil
}
// tryExecuteTask starts the task at the head of taskType's queue when no task
// of that type is running, the head task is still pending and a concurrency
// slot is free. Otherwise it does nothing; taskCompleted retries later.
func (wf *Workflow) tryExecuteTask(taskType string) {
	// Pick the candidate under the lock; nil means there is nothing to start.
	candidate := func() *Task {
		wf.mu.Lock()
		defer wf.mu.Unlock()

		if _, busy := wf.runningTasks[taskType]; busy {
			return nil
		}
		pending := wf.taskQueue[taskType]
		if len(pending) == 0 {
			return nil
		}
		head, known := wf.tasks[pending[0]]
		if !known || head.status != TaskPending {
			return nil
		}
		return head
	}()
	if candidate == nil {
		return
	}

	// Non-blocking acquire: when every slot is taken, give up for now.
	select {
	case wf.semaphore <- struct{}{}:
		wf.executeTask(candidate)
	default:
	}
}
// executeTask marks task as running and executes it in a new goroutine.
//
// The caller must already hold one semaphore slot. The slot is released here
// if the task is no longer pending, otherwise by taskCompleted once the task
// goroutine ends.
//
// The task runs with a timeout of task.Duration seconds, or 12 hours when
// Duration is not positive. A nil error marks the task completed, an error
// matching context.Canceled marks it cancelled, any other error (including a
// timeout) marks it failed. A panic inside the task is recovered and reported
// through handleTaskError.
func (wf *Workflow) executeTask(task *Task) {
	wf.mu.Lock()
	// Re-check under the lock: the task may have been cancelled or started
	// by another goroutine since tryExecuteTask picked it.
	if task.status != TaskPending {
		wf.mu.Unlock()
		<-wf.semaphore // give the slot back
		return
	}

	task.status = TaskRunning
	task.startTime = time.Now()
	// Record that a task of this type is running.
	wf.runningTasks[task.Type] = task.ID
	// The task is the head of its queue; drop it from there.
	if queue, exists := wf.taskQueue[task.Type]; exists && len(queue) > 0 {
		wf.taskQueue[task.Type] = queue[1:]
	}
	wf.mu.Unlock()

	wf.wg.Add(1)
	go func() {
		defer func() {
			// Runs last, after the lock below has been released.
			if r := recover(); r != nil {
				err := fmt.Errorf("task panic: %v\n%s", r, debug.Stack())
				wf.handleTaskError(task, err)
			}
			wf.taskCompleted(task.Type)
			wf.wg.Done()
		}()

		timeout := 12 * 60 * time.Minute
		if task.Duration > 0 {
			timeout = time.Duration(task.Duration) * time.Second
		}
		ctx, cancel := context.WithTimeout(task.ctx, timeout)
		defer cancel()

		err := task.execute(ctx, task)

		wf.mu.Lock()
		defer wf.mu.Unlock()

		task.endTime = time.Now()
		switch {
		case err == nil:
			task.status = TaskCompleted
			wf.triggerOnComplete(task)
		case errors.Is(err, context.Canceled):
			task.status = TaskCancelled
			wf.triggerOnCancel(task)
		default:
			task.status = TaskFailed
			wf.triggerOnError(task, err)
		}

		// Release the per-type marker only if it is still ours. A forced
		// cleanup may have dropped it while this task was running, and a
		// successor of the same type may own it now.
		if wf.runningTasks[task.Type] == task.ID {
			delete(wf.runningTasks, task.Type)
		}
	}()
}
// taskCompleted releases the concurrency slot held by a finished task and
// then asynchronously tries to start the next task of every queued type,
// beginning with taskType itself.
func (wf *Workflow) taskCompleted(taskType string) {
	// Free the slot first so the retries below can acquire it.
	<-wf.semaphore

	go wf.tryExecuteTask(taskType)

	// Snapshot the other queued types under the read lock, then schedule
	// them outside it.
	wf.mu.RLock()
	others := make([]string, 0, len(wf.taskQueue))
	for queuedType := range wf.taskQueue {
		if queuedType != taskType {
			others = append(others, queuedType)
		}
	}
	wf.mu.RUnlock()

	for _, queuedType := range others {
		go wf.tryExecuteTask(queuedType)
	}
}
// CancelTask cancels the task with the given ID and reports whether a
// cancellation was issued.
//
// A pending task is removed from its queue, marked cancelled and reported
// through the cancel callback. For a running task only its context is
// cancelled; the status changes once the task function returns. Unknown or
// already finished tasks yield false.
func (wf *Workflow) CancelTask(taskID string) bool {
	wf.mu.Lock()
	defer wf.mu.Unlock()

	task, found := wf.tasks[taskID]
	if !found {
		return false
	}

	if task.status == TaskRunning {
		if task.cancelFunc != nil {
			task.cancelFunc()
		}
		return true
	}
	if task.status != TaskPending {
		// Completed, cancelled or failed: nothing left to cancel.
		return false
	}

	// Pending: rebuild the type's queue without this task.
	if queue, queued := wf.taskQueue[task.Type]; queued {
		remaining := make([]string, 0, len(queue))
		for _, queuedID := range queue {
			if queuedID != taskID {
				remaining = append(remaining, queuedID)
			}
		}
		wf.taskQueue[task.Type] = remaining
	}
	task.status = TaskCancelled
	task.endTime = time.Now()
	wf.triggerOnCancel(task)
	return true
}
// CleanupCompletedTasks removes every finished task (completed, cancelled or
// failed) from the workflow and returns how many were removed.
func (wf *Workflow) CleanupCompletedTasks() int {
	const reason = "manual"

	wf.mu.Lock()
	defer wf.mu.Unlock()
	return wf.cleanupTasksInternal(reason, false)
}
// CleanupOldTasks removes finished tasks that have outlived the configured
// retention time and returns how many were removed.
func (wf *Workflow) CleanupOldTasks() int {
	const reason = "retention"

	wf.mu.Lock()
	defer wf.mu.Unlock()
	return wf.cleanupTasksInternal(reason, false)
}
// CleanupAllTasks removes all finished tasks and, when force is true, pending
// and running tasks as well. It returns how many were removed. Use with care.
func (wf *Workflow) CleanupAllTasks(force bool) int {
	const reason = "all"

	wf.mu.Lock()
	defer wf.mu.Unlock()
	return wf.cleanupTasksInternal(reason, force)
}
// cleanupTasksInternal removes tasks from the registry and returns how many
// were removed. The caller must hold wf.mu for writing.
//
// reason selects which tasks qualify and is passed to the cleanup callback:
//   - "retention": tasks whose end time (or creation time, if they never
//     ended) is older than the retention time
//   - "all": every task
//   - "manual" or anything else: finished tasks only
//
// Pending and running tasks are skipped unless force is true. When such a
// task is removed its context is cancelled, because once it has left the
// registry neither CancelTask nor Stop can reach it any more.
func (wf *Workflow) cleanupTasksInternal(reason string, force bool) int {
	cleanedCount := 0
	now := time.Now()

	for id, task := range wf.tasks {
		active := task.status == TaskRunning || task.status == TaskPending
		if active && !force {
			continue
		}

		var shouldClean bool
		switch reason {
		case "retention":
			// Fall back to the creation time for tasks that never ended.
			reference := task.endTime
			if reference.IsZero() {
				reference = task.createTime
			}
			shouldClean = now.Sub(reference) > wf.retentionTime
		case "all":
			shouldClean = true
		default:
			// "manual" and unknown reasons: finished tasks only.
			shouldClean = task.status == TaskCompleted ||
				task.status == TaskCancelled ||
				task.status == TaskFailed
		}
		if !shouldClean {
			continue
		}

		// Do not orphan a forcibly removed pending or running task.
		if active && task.cancelFunc != nil {
			task.cancelFunc()
		}
		if wf.onCleanup != nil {
			wf.onCleanup(task, reason)
		}
		delete(wf.tasks, id)
		cleanedCount++
	}

	// Drop queued IDs whose task no longer exists, and queues left empty.
	for taskType, queue := range wf.taskQueue {
		kept := make([]string, 0, len(queue))
		for _, taskID := range queue {
			if _, exists := wf.tasks[taskID]; exists {
				kept = append(kept, taskID)
			}
		}
		if len(kept) == 0 {
			delete(wf.taskQueue, taskType)
		} else {
			wf.taskQueue[taskType] = kept
		}
	}

	// Drop running markers that point at removed tasks.
	for taskType, taskID := range wf.runningTasks {
		if _, exists := wf.tasks[taskID]; !exists {
			delete(wf.runningTasks, taskType)
		}
	}
	return cleanedCount
}
// startCleanupRoutine creates the cleanup ticker and starts a goroutine that,
// on every tick, removes tasks past their retention time and then trims the
// registry if it exceeds maxTaskHistory. The goroutine stops the ticker and
// exits once cleanupStop is closed.
func (wf *Workflow) startCleanupRoutine() {
	wf.cleanupTicker = time.NewTicker(wf.cleanupInterval)

	// sweep performs one cleanup pass; the two phases lock separately.
	sweep := func() {
		wf.mu.Lock()
		removed := wf.cleanupTasksInternal("retention", false)
		wf.mu.Unlock()
		if removed > 0 {
			log.Printf("自动清理了 %d 个旧任务", removed)
		}

		wf.mu.Lock()
		if len(wf.tasks) > wf.maxTaskHistory {
			wf.cleanupExcessTasks()
		}
		wf.mu.Unlock()
	}

	go func() {
		for {
			select {
			case <-wf.cleanupStop:
				wf.cleanupTicker.Stop()
				return
			case <-wf.cleanupTicker.C:
				sweep()
			}
		}
	}()
}
// cleanupExcessTasks trims the registry towards maxTaskHistory by removing
// the oldest finished tasks (by creation time). Pending and running tasks are
// never removed, so the registry may stay above the limit. The caller must
// hold wf.mu for writing.
func (wf *Workflow) cleanupExcessTasks() {
	excess := len(wf.tasks) - wf.maxTaskHistory
	if excess <= 0 {
		return
	}

	ordered := make([]*Task, 0, len(wf.tasks))
	for _, t := range wf.tasks {
		ordered = append(ordered, t)
	}
	// Exchange sort by creation time, oldest first.
	for lo := range ordered {
		for hi := lo + 1; hi < len(ordered); hi++ {
			if ordered[hi].createTime.Before(ordered[lo].createTime) {
				ordered[lo], ordered[hi] = ordered[hi], ordered[lo]
			}
		}
	}

	removed := 0
	for _, candidate := range ordered {
		if removed >= excess {
			break
		}
		switch candidate.status {
		case TaskCompleted, TaskCancelled, TaskFailed:
			// Finished: eligible for removal.
		default:
			continue
		}
		if wf.onCleanup != nil {
			wf.onCleanup(candidate, "max_history")
		}
		delete(wf.tasks, candidate.ID)
		removed++
	}

	if removed > 0 {
		log.Printf("清理了 %d 个任务以保持历史记录不超过 %d", removed, wf.maxTaskHistory)
	}
}
// GetTaskStatus returns the status of the task with the given ID. The boolean
// is false (and the status TaskPending) when no such task is registered.
func (wf *Workflow) GetTaskStatus(taskID string) (TaskStatus, bool) {
	wf.mu.RLock()
	defer wf.mu.RUnlock()

	if task, found := wf.tasks[taskID]; found {
		return task.status, true
	}
	return TaskPending, false
}
// GetTaskInfo returns a snapshot of the status and timestamps of the task
// with the given ID. For an unknown ID it returns an empty TaskInfo and false.
func (wf *Workflow) GetTaskInfo(taskID string) (*TaskInfo, bool) {
	wf.mu.RLock()
	defer wf.mu.RUnlock()

	task, found := wf.tasks[taskID]
	if !found {
		return &TaskInfo{}, false
	}

	info := &TaskInfo{
		Status:     task.status,
		StartTime:  task.startTime,
		EndTime:    task.endTime,
		CreateTime: task.createTime,
	}
	return info, true
}
// GetRunningTasks returns the tasks that are currently running. The result is
// never nil and is in no particular order.
func (wf *Workflow) GetRunningTasks() []*Task {
	wf.mu.RLock()
	defer wf.mu.RUnlock()

	running := []*Task{}
	for _, candidate := range wf.tasks {
		if candidate.status != TaskRunning {
			continue
		}
		running = append(running, candidate)
	}
	return running
}
// GetPendingTasks returns the tasks that are still waiting to run. The result
// is never nil and is in no particular order.
func (wf *Workflow) GetPendingTasks() []*Task {
	wf.mu.RLock()
	defer wf.mu.RUnlock()

	waiting := []*Task{}
	for _, candidate := range wf.tasks {
		if candidate.status != TaskPending {
			continue
		}
		waiting = append(waiting, candidate)
	}
	return waiting
}
// GetIsTasks reports whether at least one registered task whose Type equals
// bid has finished, i.e. is neither pending nor running.
func (wf *Workflow) GetIsTasks(bid string) bool {
	wf.mu.RLock()
	defer wf.mu.RUnlock()

	for _, candidate := range wf.tasks {
		if candidate.Type != bid {
			continue
		}
		if candidate.status == TaskPending || candidate.status == TaskRunning {
			continue
		}
		return true
	}
	return false
}
// GetTaskCount returns the number of registered tasks in total and broken
// down by status.
func (wf *Workflow) GetTaskCount() (total, running, pending, completed, cancelled, failed int) {
	wf.mu.RLock()
	defer wf.mu.RUnlock()

	for _, task := range wf.tasks {
		total++
		switch task.status {
		case TaskPending:
			pending++
		case TaskRunning:
			running++
		case TaskCompleted:
			completed++
		case TaskCancelled:
			cancelled++
		case TaskFailed:
			failed++
		}
	}
	return total, running, pending, completed, cancelled, failed
}
// Wait blocks until every task goroutine started by executeTask has finished.
// Tasks that are still pending and have not been started are not waited for.
func (wf *Workflow) Wait() {
	wf.wg.Wait()
}
// Stop stops the cleanup goroutine and cancels the context of every pending
// or running task. It is safe to call more than once.
//
// Stop does not wait for running tasks to return (use Wait for that) and does
// not change task statuses or queues itself.
func (wf *Workflow) Stop() {
	wf.mu.Lock()
	defer wf.mu.Unlock()

	// Closing an already closed channel panics, so only close on the first
	// call. The check is race-free because it runs under wf.mu.
	select {
	case <-wf.cleanupStop:
		// Already stopped.
	default:
		close(wf.cleanupStop)
	}

	for _, task := range wf.tasks {
		if task.status != TaskPending && task.status != TaskRunning {
			continue
		}
		if task.cancelFunc != nil {
			task.cancelFunc()
		}
	}
}
// triggerOnComplete invokes the completion callback for task, if one is set.
func (wf *Workflow) triggerOnComplete(task *Task) {
	if callback := wf.onComplete; callback != nil {
		callback(task)
	}
}
// triggerOnCancel invokes the cancellation callback for task, if one is set.
func (wf *Workflow) triggerOnCancel(task *Task) {
	if callback := wf.onCancel; callback != nil {
		callback(task)
	}
}
// triggerOnError invokes the error callback with task and err, if one is set.
func (wf *Workflow) triggerOnError(task *Task, err error) {
	if callback := wf.onError; callback != nil {
		callback(task, err)
	}
}
// handleTaskError marks task as failed, stamps its end time, reports err
// through the error callback and releases the running marker of the task's
// type. It acquires wf.mu itself, so the caller must not hold it.
func (wf *Workflow) handleTaskError(task *Task, err error) {
	wf.mu.Lock()
	defer wf.mu.Unlock()

	task.status = TaskFailed
	task.endTime = time.Now()
	wf.triggerOnError(task, err)

	// Release the per-type marker only if it still belongs to this task; a
	// forced cleanup may have handed it to a successor of the same type.
	if wf.runningTasks[task.Type] == task.ID {
		delete(wf.runningTasks, task.Type)
	}
}