[GoLang] gRPC를 통한 동영상 스트리밍 및 모니터링
gRPC가 얼마나 빠른지는 이 포스트에서 알아봤었고 스트리밍 방식의 종류는 이 포스트로 알아봤습니다.이제는 동영상 전송에서의 예시를 작성해 봅시다. 1. Idea 동영상 파일을 단순히 전송하는
seung.tistory.com
이전 글을 했다는 가정하에 해당 글을 진행합니다.
1. Idea
이전에는 단순히 영상을 청크로 나눠서 gRPC로 받으면서 청크마다 성공여부 및 진행도를 알 수 있게끔 해봤는데요
이번에는 다음 조건들을 추가로 적용해보고자 합니다.
- 실시간 스트리밍: 클라이언트에서 32KB 단위로 전달하는 동영상 청크를 서버에서 수신하면서, 전체 파일을 다 수신하지 않아도 부분 재생이 가능하도록 합니다.
- 버퍼링 도입: 일정량의 데이터를 버퍼(preBuffer)에 저장한 뒤, 버퍼링 임계치(예: 2MB)가 도달하면 재생을 시작합니다.
- 외부 플레이어 연동: 수신한 데이터를 실시간 재생하기 위해 외부 플레이어인 ffplay(또는 유사 도구)와 연동하여 데이터를 전달합니다.
주요 개념
- 비디오 포맷 선택:
- MP4와 같이 메타데이터가 파일의 끝부분에 있는 포맷은 전체 파일이 수신되어야 재생이 가능합니다.
- 실시간 스트리밍에는 MPEG-TS, fragmented MP4, WebM 등 스트리밍에 적합한 포맷을 선택하는 것이 좋습니다.
- 데이터 파이프라인 구축:
- io.Pipe()를 활용하여 서버에서 수신한 데이터를 외부 플레이어로 바로 전달하는 방식으로 실시간 재생 파이프라인을 구축합니다.
- 동시성 처리:
- 고루틴을 활용해 데이터 수신 및 재생, 버퍼 관리 로직을 병렬로 운영하여 효율적인 스트리밍 처리를 구현합니다.
- 에러 및 동기화 관리:
- 네트워크 지연, 데이터 손실 등의 이슈에 대비해 에러 핸들링과 동기화 처리를 꼼꼼히 구현합니다.
- 동영상 헤더 처리
- mp4 파일은 문제가 있는데, moov atom이 파일 끝에 위치하므로 실시간 스트리밍에서는 제약이 발생합니다.
- faststart가 되어 있는 mp4만 사용 가능합니다. => ffmpeg를 통해 패지키징이 필요
2. ffmpeg
- ffmpeg란?
- ffmpeg는 비디오/오디오 변환, 인코딩, 디코딩, 스트리밍 등의 기능을 제공하는 오픈소스 멀티미디어 프레임워크입니다.
- ffmpeg 패키지에는 ffmpeg 실행 파일과 함께 ffplay (실시간 재생 도구), ffprobe (미디어 파일 분석 도구) 등이 포함되어 있습니다.
- ffplay의 역할
- ffplay는 ffmpeg의 기능 중 하나로, 표준 입력(stdin)을 통해 전달된 미디어 데이터를 디코딩하여 실시간 재생할 수 있도록 지원합니다.
- 이는 스트리밍 데이터를 실시간으로 화면에 출력하는 데 매우 유용합니다.
Windows에서 설치
제 로컬은 윈도우이므로 윈도우에서의 설치 조건을 알아봅시다.
FFmpeg
FFmpeg은 디지털 음성 스트림과 영상 스트림에 대해서 다양한 종류의 형태로 기록하고 변환하는 컴퓨터 프로그램이다. FFmpeg은 명령어를 직접 입력하는 방식으로 동작하며 여러 가지 자유 소프트
www.google.com
이 경로에서 os에 받게 설치를 하고 압축을 푼 다음 해당 폴더의 bin 폴더 경로로 path를 설정합니다
예시로 들자면 아래로 path로 하고
C:\Program Files\ffmpeg\bin
ffmpeg -version으로 확인할 수 있습니다
mp4 파일을 가지고 진행할 것이므로 파일 해더 정보의 위치를 재패키징 합시다. (그냥 하면 mp4는 헤더 정보가 뒤에 있어서 정상적으로 실행이 안됩니다.)
ffmpeg -i video.mp4 -movflags faststart video_faststart.mp4
이러면 준비는 끝입니다.
3. Code
변경점
- 기존 구조:
클라이언트에서 서버로 동영상 파일의 청크를 전송받아 received_video.mp4 파일에 저장한 후, 처리 결과만 클라이언트로 전송하는 구조 - 변경 후 구조:
전체 파일 수신 전, 일정량의 데이터를 버퍼링(preBuffer) 한 후, io.Pipe()를 사용해 데이터를 외부 플레이어(ffplay)로 실시간 전달하여 재생하도록 수정
clinet/main.go에서 읽어는 비디오 파일명만 아까 비디오 헤더의 위치를 바꾼 영상으로 교체합니다.
if err := processVideo(client, "../video_faststart.mp4"); err != nil {
fmt.Printf("프로세싱 오류: %v\n", err)
}
그리고 server/main.go 수정합니다.
여기서는 서버가 이전과 달리 받은 데이터로 다시 영상을 만드는 게 아닌 startConversionChain으로 writer를 할당받고
gRPC로 받은 영상 청크의 바이트 값을 writer에 쓰면 변환한 출력이 ffmpegOutPipe를 통해 ffplay의 표준 입력으로 전달되고 ffplay는 "-" 인자를 통해 입력을 자동으로 읽어 재생해 줍니다.
package main
import (
"bytes"
"fmt"
"io"
"log"
"net"
"os"
"os/exec"
"time"
pb "github.com/your_username/grpc_video_project/proto/videopb"
"google.golang.org/grpc"
)
type server struct {
pb.UnimplementedVideoServiceServer
}
// ProcessVideo는 클라이언트로부터 MP4 청크를 수신하면서,
// 일정량 버퍼링 후 ffmpeg를 통해 MPEG‑TS로 변환하여 ffplay로 재생합니다.
func (s *server) ProcessVideo(stream pb.VideoService_ProcessVideoServer) error {
// 버퍼링 임계치 설정 (예: 2MB)
const bufferThreshold = 2 * 1024 * 1024
preBuffer := bytes.NewBuffer(nil)
playbackStarted := false
// ffmpeg 변환 체인으로 데이터를 전달할 파이프 라이터
var ffmpegInWriter *io.PipeWriter
// ffmpeg 변환 체인을 시작하는 함수
startConversionChain := func() {
ffmpegInReader, writer := io.Pipe()
ffmpegInWriter = writer
// ffmpeg 명령어에 추가 옵션으로 -probesize, -analyzeduration, -fflags +genpts 추가
ffmpegCmd := exec.Command("ffmpeg",
"-probesize", "5000000",
"-analyzeduration", "5000000",
"-fflags", "+genpts",
"-i", "pipe:0",
"-movflags", "faststart",
"-f", "mpegts", "pipe:1")
ffmpegCmd.Stdin = ffmpegInReader
ffmpegOutPipe, err := ffmpegCmd.StdoutPipe()
if err != nil {
log.Fatalf("ffmpeg stdout 파이프 생성 오류: %v", err)
}
if err := ffmpegCmd.Start(); err != nil {
log.Fatalf("ffmpeg 실행 오류: %v", err)
}
go func() {
cmd := exec.Command("ffplay", "-autoexit", "-")
cmd.Stdin = ffmpegOutPipe
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
log.Printf("ffplay 실행 오류: %v", err)
} else {
log.Println("플레이어가 실행 중입니다...")
}
cmd.Wait()
log.Println("플레이어 종료")
}()
log.Println("버퍼 임계치 도달 - 변환 체인 시작 (MP4 → MPEG‑TS)")
}
for {
chunk, err := stream.Recv()
if err == io.EOF {
if playbackStarted && ffmpegInWriter != nil {
ffmpegInWriter.Close()
}
log.Println("모든 청크 수신 완료")
break
}
if err != nil {
return fmt.Errorf("청크 수신 오류: %v", err)
}
if !playbackStarted {
_, err := preBuffer.Write(chunk.Data)
if err != nil {
return fmt.Errorf("버퍼 기록 오류: %v", err)
}
if preBuffer.Len() >= bufferThreshold {
playbackStarted = true
startConversionChain()
_, err := ffmpegInWriter.Write(preBuffer.Bytes())
if err != nil {
return fmt.Errorf("ffmpeg로 preBuffer 데이터 기록 오류: %v", err)
}
preBuffer.Reset()
}
} else {
_, err := ffmpegInWriter.Write(chunk.Data)
if err != nil {
return fmt.Errorf("ffmpeg로 데이터 기록 오류: %v", err)
}
}
start := time.Now()
time.Sleep(20 * time.Millisecond)
processingTime := float32(time.Since(start).Seconds())
status := &pb.ProcessingStatus{
Sequence: chunk.Sequence,
Status: "processed",
ProcessingTime: processingTime,
}
if err := stream.Send(status); err != nil {
return fmt.Errorf("상태 전송 오류: %v", err)
}
log.Printf("청크 %d 처리 및 전송됨", chunk.Sequence)
}
return nil
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("포트 리스닝 오류: %v", err)
}
grpcServer := grpc.NewServer()
pb.RegisterVideoServiceServer(grpcServer, &server{})
log.Println("gRPC 서버 실행 중: :50051")
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("서버 실행 오류: %v", err)
}
}
코드 설명
- 버퍼링 및 재생 트리거
- 클라이언트 청크 데이터를 preBuffer에 누적하다가, 일정 임계치(2MB)에 도달하면 변환 체인을 시작하도록 합니다.
- playbackStarted 플래그를 이용해 재생이 시작되었는지 확인합니다.
- ffmpeg 변환 체인
- startConversionChain() 함수에서는 ffmpeg의 입력용 파이프를 생성하고,
exec.Command("ffmpeg", "-i", "pipe:0", "-movflags", "faststart", "-f", "mpegts", "pipe:1") 명령어로 MP4 스트림을 MPEG‑TS 포맷으로 변환합니다. - ffmpeg의 출력은 ffplay가 재생하도록 별도의 고루틴에서 ffplay 명령어를 실행합니다.
- startConversionChain() 함수에서는 ffmpeg의 입력용 파이프를 생성하고,
- 실시간 데이터 전달
- 재생 시작 이후 클라이언트에서 들어오는 모든 청크는 바로 ffmpeg의 입력 파이프(ffmpegInWriter)로 전달됩니다.
- 클라이언트 전송 종료 시 파이프를 닫아 변환과 재생 과정을 종료합니다.
4. Result
5. Addition
위 내용은 동영상 플레이어인 FFplay의 기능을 통해 전체 동영상을 다운로드하지 않고
일부분 10%만 받은 상태도로 플레이해봤습니다.
하는 과정에서 유튜브, 트위치 같은 동영상 플랫폼은 어떻게 하는 지도 알고 싶어 졌죠.
유튜브의 경우
동영상 업로드 후 다양한 해상도와 비트레이트로 미리 인코딩 및 세그먼트화된 콘텐츠를 MPEG‑DASH와 HLS를 통해 제공하며, CDN을 활용해 전 세계 사용자에게 안정적이고 적응형 스트리밍을 제공합니다.
- MPEG‑DASH
대규모 사용자에게 적응형으로 동영상을 제공하는 데 최적화된 기술로, 유튜브와 같은 서비스에서 주로 사용됩니다, 스트리밍 방식 중 하나로 DASH (Dynamic Adaptive Streaming over HTTP)는 HTTP를 통한 동적 스트리밍의 약자입니다. (https://www.cloudflare.com/ko-kr/learning/video/what-is-mpeg-dash/) - HLS
MPEG-DASH와 같이 HTTP를 기반으로 실행되고 TCP을 전송 프로토콜로 사용하지만 MPEG-DASH가 표준인 반면 이 친구는 Apple에서 개발되었고, 애플에서만 지원합니다. - CDN
Content Delivery Network은 콘텐츠 전송 네트워크 또는 콘텐츠 배포 네트워크를 뜻합니다. 사용자에게 웹사이트나 애플리케이션의 콘텐츠를 빠르게 제공하기 위해, 전 세계에 분산된 서버 네트워크를 활용하는 기술입니다.
트위치의 경우
트위치와 같은 경우는 WebRTC를 사용할 줄 알았는데
이런 같은 대형 생방송 플랫폼은 실시간 상호작용보다는 안정적인 대규모 콘텐츠 전송과 재생에 중점을 두기 때문에, WebRTC 대신 HLS나 MPEG‑DASH와 같은 HTTP 기반 적응형 스트리밍 기술을 채택한다네요.
이유는
- 확장성과 안정성
- HTTP 기반 스트리밍은 CDN을 활용할 수 있으므로 수백만 명의 사용자에게 동시에 안정적으로 서비스를 제공할 수 있습니다.
- 트위치와 같은 플랫폼은 미리 인코딩 된 여러 품질의 세그먼트(예: HLS 세그먼트)를 CDN에 배포해, 사용자의 네트워크 상황에 맞춰 적절한 스트림을 제공할 수 있습니다.
- 재생 호환성
- HLS나 MPEG-DASH 같은 프로토콜은 HTML5 video 태그 및 다양한 플레이어 라이브러리와 호환됩니다.
- 이러한 방식은 브라우저나 모바일 앱에서 별도의 플러그인 없이 쉽게 재생될 수 있습니다.
- 지연(Latency) 문제
- WebRTC는 극저지연을 제공하는 장점이 있지만, 일반적으로 전송 시 연결 관리나 NAT traversal 등의 복잡한 문제로 인해 대규모 일방향 스트리밍에는 추가적인 인프라와 비용이 발생할 수 있습니다.
- 대부분의 생방송 서비스는 몇 초의 지연(예: 15~30초)이 허용되는 범위 내에서 안정적인 스트리밍을 제공하는 것을 우선시합니다.
WebRTC는 그러면 언제
- 주로 1:1 영상 통화나 화상 회의, 또는 매우 낮은 지연이 필요한 인터랙티브 스트리밍(예: 게임 스트리밍에서 실시간 피드백 제공) 등에 사용됩니다.
- 낮은 지연을 원할 때 유리하지만, 다수의 사용자에게 동시 전달하는 데는 대역폭 관리 및 확장성 측면에서 한계가 있을 수 있습니다.
디스코드의 경우, 디스코드가 WebRTC를 쓰네요
- WebRTC 중심
Discord는 실시간 통신의 특성상 WebRTC 기술을 기반으로 하며, 이를 통해 낮은 지연과 실시간 상호작용을 지원 - HTTP 스트리밍과의 차이점
유튜브나 트위치와 같은 플랫폼은 콘텐츠를 여러 세그먼트로 분할한 후 CDN을 통해 전 세계로 배포하는 방식(HLS, MPEG‑DASH)을 사용하는 반면, Discord는 직접적인 실시간 통화와 상호작용에 중점을 두기 때문에 WebRTC 및 SFU와 같은 기술을 채택
'Go Lang > Study' 카테고리의 다른 글
[GoLang] gRPC를 통한 동영상 스트리밍 및 모니터링 (0) | 2025.04.04 |
---|---|
[GoLang] gRPC 통신 패턴 (0) | 2025.03.21 |
[GoLang] REST API, gRPC 비교 (0) | 2025.03.15 |
[GoLang] 시간 초과 로직 Context, 채널과 비교 (0) | 2025.02.25 |
[GoLang] net/http에서 http 요청을 동시적으로 처리하는 이유 (0) | 2025.02.17 |