package transcode import ( "fmt" "github.com/gorilla/websocket" "log/slog" "math/rand" "net/url" "os" "path/filepath" "time" "github.com/pion/rtp" ) func (tk *Task) StartTalkCode(wsURL string, transport string) error { sampleRate := 8000 frameSize := 160 dir, _ := os.Getwd() inputFile := filepath.Join(dir, tk.EncodeUrl) //SSRC := tk.GetSSRCUint32() if _, err := os.Stat(inputFile); os.IsNotExist(err) { return fmt.Errorf("input file does not exist: %v", err.Error()) } u, err := url.Parse(wsURL) if err != nil { return fmt.Errorf("invalid WebSocket URL: %v", err.Error()) } c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) if err != nil { return fmt.Errorf("failed to connect to WebSocket: %v", err.Error()) } defer func() { c.Close() }() // Open G711A file file, err := os.Open(inputFile) if err != nil { return fmt.Errorf("failed to open input file: %v", err.Error()) } defer file.Close() // Initialize RTP parameters seqNumber := uint16(0) // Initialize with a random timestamp base as per RTP standards initialTimestamp := uint32(time.Now().UnixNano() / 1e6) // Use current time as base // Calculate frame duration in milliseconds frameDuration := time.Duration(float64(frameSize)/float64(sampleRate)*1000) * time.Millisecond nextSendTime := time.Now() frame := make([]byte, frameSize) var errors error for { if tk.isEnd { break // End of file } n, err := file.Read(frame) if err != nil { if n == 0 { break // End of file } slog.Error("Error reading file", "err", err.Error()) break } // Calculate current timestamp based on initial + frames sent * samples per frame currentTimestamp := initialTimestamp + uint32(seqNumber)*uint32(frameSize) // Create RTP packet for G711A pkt := &rtp.Packet{ Header: rtp.Header{ Version: 2, PayloadType: 8, // G711 A-law SequenceNumber: seqNumber, Timestamp: currentTimestamp, SSRC: 123456, }, Payload: frame[:n], } // Send RTP packet via WebSocket data, err := pkt.Marshal() if err != nil { slog.Debug("Error marshaling RTP packet", "err", err.Error()) continue } if transport == "tcp" { tcpBuf := make([]byte, 4) tcpBuf[0] = byte(len(data) >> 24) tcpBuf[1] = byte(len(data) >> 16) tcpBuf[2] = byte(len(data) >> 8) tcpBuf[3] = byte(len(data)) err = c.WriteMessage(websocket.BinaryMessage, tcpBuf) if err != nil { errors = fmt.Errorf("error sending RTP packet via WebSocket: %v", err.Error()) slog.Debug("Error sending RTP packet via WebSocket", "err", err.Error()) break } } else { err = c.WriteMessage(websocket.BinaryMessage, data) if err != nil { errors = fmt.Errorf("error sending RTP packet via WebSocket: %v", err.Error()) slog.Debug("Error sending RTP packet via WebSocket", "err", err.Error()) break } } // Increment sequence number seqNumber++ // Control the sending rate to match real-time // Wait until it's time to send the next frame nextSendTime = nextSendTime.Add(frameDuration) sleepDuration := time.Until(nextSendTime) if sleepDuration > 0 { time.Sleep(sleepDuration) } else { // If we're behind schedule, update nextSendTime to current time nextSendTime = time.Now().Add(frameDuration) } } if errors != nil { return errors } slog.Info("G711A file sent successfully") close(tk.success) return nil } func (tk *Task) GetSSRCUint32() uint32 { rand.Seed(time.Now().UnixNano()) // rand.Uint32() 会返回一个类型为 uint32 的值 // 其范围是 0 到 2^32 - 1 (即 0 到 4294967295) return rand.Uint32() }