EasyAudioEncode/internal/core/transcode/code.go
2025-12-31 11:29:58 +08:00

139 lines
3.6 KiB
Go

package transcode
import (
	"fmt"
	"io"
	"log/slog"
	"math/rand"
	"net/url"
	"os"
	"path/filepath"
	"time"

	"github.com/gorilla/websocket"
	"github.com/pion/rtp"
)
// StartTalkCode streams the G.711 A-law file named by tk.EncodeUrl (joined
// onto the current working directory) to wsURL as RTP packets over a
// WebSocket. Packets are paced in real time: 160 samples (20 ms at 8 kHz)
// per packet.
//
// transport selects the payload of each WebSocket binary message:
//   - "tcp": a 4-byte big-endian length prefix followed by the RTP packet;
//   - anything else: the bare RTP packet.
//
// Streaming stops at end of file or once tk.isEnd is observed to be true.
// On success tk.success is closed and nil is returned. Any setup, read or
// send error is returned instead, and tk.success is left open.
func (tk *Task) StartTalkCode(wsURL string, transport string) error {
	const (
		sampleRate = 8000   // samples per second
		frameSize  = 160    // samples per RTP packet (one byte per sample for G.711)
		ssrc       = 123456 // fixed SSRC; GetSSRCUint32 is not used here
	)

	// If Getwd fails, dir is "" and Join yields the cleaned relative path,
	// which the OS resolves against the working directory anyway.
	dir, _ := os.Getwd()
	inputFile := filepath.Join(dir, tk.EncodeUrl)
	if _, err := os.Stat(inputFile); os.IsNotExist(err) {
		return fmt.Errorf("input file does not exist: %w", err)
	}

	u, err := url.Parse(wsURL)
	if err != nil {
		return fmt.Errorf("invalid WebSocket URL: %w", err)
	}

	// Open the input before dialing so an unreadable file does not cost a
	// WebSocket connection.
	file, err := os.Open(inputFile)
	if err != nil {
		return fmt.Errorf("failed to open input file: %w", err)
	}
	defer file.Close()

	c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
	if err != nil {
		return fmt.Errorf("failed to connect to WebSocket: %w", err)
	}
	defer c.Close()

	// RTP state. The timestamp is tracked independently of the 16-bit
	// sequence number. Deriving it as seqNumber*frameSize would make it jump
	// backwards whenever the sequence number wraps. Both counters rely on
	// unsigned wrap-around.
	var seqNumber uint16
	// Timestamp base taken from wall-clock milliseconds (RFC 3550 recommends
	// a random base; this keeps the original choice).
	timestamp := uint32(time.Now().UnixNano() / 1e6)

	// 160 samples / 8000 Hz = 20 ms, computed in integer nanoseconds.
	frameDuration := time.Duration(frameSize) * time.Second / sampleRate
	nextSendTime := time.Now()
	frame := make([]byte, frameSize)

	// NOTE(review): tk.isEnd is presumably set from another goroutine; if so,
	// this unsynchronized read is a data race — consider an atomic.Bool.
	for !tk.isEnd {
		n, readErr := io.ReadFull(file, frame)
		if readErr == io.EOF {
			break // no bytes left: clean end of file
		}
		// io.ErrUnexpectedEOF means a short final frame; it is still sent,
		// and the next ReadFull returns io.EOF.
		if readErr != nil && readErr != io.ErrUnexpectedEOF {
			return fmt.Errorf("error reading input file: %w", readErr)
		}

		pkt := &rtp.Packet{
			Header: rtp.Header{
				Version:        2,
				PayloadType:    8, // G711 A-law
				SequenceNumber: seqNumber,
				Timestamp:      timestamp,
				SSRC:           ssrc,
			},
			Payload: frame[:n],
		}
		data, err := pkt.Marshal()
		if err != nil {
			// The frame is dropped without advancing sequence/timestamp.
			slog.Warn("Error marshaling RTP packet", "err", err.Error())
			continue
		}

		msg := data
		if transport == "tcp" {
			// Length prefix and packet travel in one message; previously only
			// the prefix was written and the packet itself was never sent.
			// NOTE(review): the 4-byte prefix is kept from the original code;
			// RFC 4571 RTP-over-TCP framing uses 2 bytes — confirm what the
			// receiver expects.
			msg = make([]byte, 4+len(data))
			msg[0] = byte(len(data) >> 24)
			msg[1] = byte(len(data) >> 16)
			msg[2] = byte(len(data) >> 8)
			msg[3] = byte(len(data))
			copy(msg[4:], data)
		}
		if err := c.WriteMessage(websocket.BinaryMessage, msg); err != nil {
			return fmt.Errorf("error sending RTP packet via WebSocket: %w", err)
		}

		seqNumber++
		timestamp += frameSize

		// Pace packets to real time.
		nextSendTime = nextSendTime.Add(frameDuration)
		if wait := time.Until(nextSendTime); wait > 0 {
			time.Sleep(wait)
		} else {
			// Behind schedule: re-anchor rather than bursting to catch up.
			nextSendTime = time.Now().Add(frameDuration)
		}
	}

	slog.Info("G711A file sent successfully")
	close(tk.success)
	return nil
}
// GetSSRCUint32 returns a pseudo-random 32-bit value, intended for use as an
// RTP SSRC identifier. The result spans the full uint32 range (0 to 2^32-1).
//
// A private generator seeded from the current time is used rather than
// reseeding the package-global math/rand source. rand.Seed is deprecated
// since Go 1.20, and reseeding on every call also resets the sequence seen
// by every other user of math/rand in the process. The value is not
// cryptographically random.
func (tk *Task) GetSSRCUint32() uint32 {
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	return rng.Uint32()
}