ABOUT ME

백엔드 개발자의 가끔 쓰는 블로그

Today
Yesterday
Total
  • FastAPI와 FFmpeg를 이용해서 동영상 자르는 프로그램을 만들어보자
    Programming 2023. 9. 30. 11:43

    Why?

    당초 계획은 FastAPI와 FFmpeg를 이용해서 만들계획은 아니었다.

    원래는 Django를 이용한 서버에서 라이브 영상 관리와 라이브 영상을 자르는 것까지 한번에 진행하려고 했었는데

    예상치도 못하게 서버의 부하가 매우 심하고 비용이 상승하는 측면이 있었고 회사에서 자체적인 물리 서버가 있었기에

    해당하는 서버에 따로 라이브 영상을 자르는 서버를 만들게 되었다.

    그리고 그것을 공유하기 위해 이렇게 글을 작성한다.

    설계(라이브 영상을 녹화하는 것에 초점)

    설계

    각각의 항목에 일단 간략하게 설명을 하자면

    - FFmpeg Server: IP Cam 을 이용해 실질적으로 라이브(HLS) 파일을 만들어내고 보관한다(Nginx로 정적 파일 서빙)

    - 메인 서버: FFmpeg Server가 HTTP 통신을 통해 받은 데이터를 DB에 저장을 하고 클라이언트 요청시 라이브 중인 영상의 데이터 제공.

    - DB: 라이브 파일에 대한 Camera ID, 시작시간, 종료 시간, 파일 주소 를 저장한다.

    - 영상 자르는 서버: Camera ID 와 시작 시간, 종료 시간을 받아 해당하는 영상을 잘라서 응답한다.

    - 클라이언트: 현재는 스마트폰 어플을 기준으로 만들어졌다.

     

    일단 해당하는 설계 설명은 기준이 라이브 영상을 녹화하는 것에 초점이 맞춰지다보니 많은 부분이 생략되어져 있다.

     

    순서로 설명하면

    1. FFmpeg 서버가 라이블 파일을 생성 후 메인 서버에 데이터를 전달

    2. 메인서버는 해당하는 파일을 검증을 거친 후 DB에 저장하고 클라이언트가 요청할 때마다 최신의 라이브 파일 데이터를 제공

    3. 클라이언트가 해당 녹화 버튼을 클릭하면 현재의 UTC 시간을 시작시간으로 설정하고 녹화 종료를 누르면 UTC 시간을 종료시간으로 설정하여 영상 자르는 서버에 요청

    4. 영상 자르는 서버는 시작시간과 종료시간에 포함되는 라이브 영상들을 가져와 자르고 붙이는 과정을 통해서 녹화 영상과 썸네일을 생성하고 클라이언트에 전달

     

    Fast API 코드

    from fastapi import FastAPI, HTTPException
    from dateutil import parser
    from pydantic import BaseModel
    from datetime import datetime
    
    from util import make_record
    from crud import filter_live
    
    app = FastAPI()
    
    
    class Trim(BaseModel):
        cctv_id: int
        started: str
        ended: str
    
    
    @app.post("/trim")
    def trim_video(request_data: Trim):
        cctv_id = request_data.cctv_id
        started = request_data.started
        ended = request_data.ended
        print(f"[{datetime.now()}] - ({cctv_id}) started: {started}, ended: {ended}")
        try:
            started = parser.parse(started)
            ended = parser.parse(ended)
        except parser.ParserError:
            raise HTTPException(status_code=400, detail={"message": "형식이 올바르지 않습니다."})
    
        if started > ended:
            raise HTTPException(
                status_code=400, detail={"message": "시작시간이 종료시간보다 앞설 수 없습니다."}
            )
    
        if (ended - started).total_seconds() < 3:
            raise HTTPException(
                status_code=400, detail={"message": "최소 영상의 길이가 3초를 넘겨야합니다."}
            )
    
        data = filter_live(cctv_id, started, ended)
        if not data:
            raise HTTPException(
                status_code=400, detail={"message": "해당하는 시간에 라이브가 존재하지 않습니다."}
            )
        res = make_record(started=started, ended=ended, lives=data)
    
        if not res:
            raise HTTPException(
                status_code=400, detail={"message": "해당하는 시간대의 영상을 자르는데에 실패하였습니다."}
            )
    
        return res

     

    해당 코드의 경우 검증, 함수 호출, 응답에 초점을 두고 있다.

     

    검증의 경우 총 4가지의 경우를 잡는다.

    1. 시작시간과 종료시간의 DateTime의 형식이 올바르지 않은 경우

    2. 시작시간이 종료시간보다 미래인 경우

    3. 녹화하려고 하는 범위가 너무 짧은 경우

    4. 존재하는 라이브 영상이 없는 경우

     

    함수 호출은 크게 두개를 한다.

    1. DB에 시작시간과 종료시간에 포함되는 라이브 영상 데이터 불러오기

    2. 시작시간, 종료시간, 라이브 영상 파일을 통해서 녹화 파일을 만들기

     

    응답의 경우 녹화 영상 파일의 주소와 썸네일 주소를 반환한다.

     

    DB 코드

    from db import engine
    from sqlalchemy.sql.expression import text
    
    
    def filter_live(id, started, ended):
        # stmt = select(Live).where()
        data = []
        with engine.connect() as conn:
            for row in conn.execute(
                text(
                    f"select id, started, ended, url from equipment_live \
                        where cctv_id=:id and \
                        (\
                            (started <= :started and ended >= :started) or \
                            (started <= :ended and ended >= :ended) or \
                            (ended is null) \
                        ) \
                        order by id asc"
                ).bindparams(id=id, started=started, ended=ended)
            ):
                data.append(row)
    
        return data

    DB 코드의 경우 간단하다.

    Raw Query 를 통해서 시작시간과 종료시간에 포함되는 라이브 영상을 가져온다.

     

    유의점

    여기에서 한가지 생각할 점은 종료시간이 Null 인 경우에도 불러온다는 것이다.

    이렇게 하는 이유는 종료시간이 Null인 경우는 한가지인데 현재 라이브 중인 파일이기에
    아직 종료시간이 설정이 안되었기에 무조건 가져와야 한다.

     

    Trim 코드

    def make_record(started, ended, lives, retries=0):
        if retries > 5:
            return {}
        # 처음 live started 에서 현재의 녹화 지점을 - 한다 => live.started 04:00, started 04:20 => 04:20 - 04:00 = 20(start_point)
        # end_point 의 경우 ended 에서 started 뺀다 started 04:20, ended 04:30 => 04:30 - 04:20 = 10(remain)
        # video remain 계산 후 남은 경우 다음으로 넘어가게 함 live.ended 04:05  => 04:05 - 04:00 = 5, 10 - 5 (다음으로 넘김)
        files = []
        started = started.replace(tzinfo=datetime.timezone.utc)
        ended = ended.replace(tzinfo=datetime.timezone.utc)
    
        start_point = int((started - lives[0].started).total_seconds())
        remain = int((ended - started).total_seconds())
    
        # 녹화 파일 생성 및 리스트
        for idx, live in enumerate(lives, start=1):
            file = str(uuid.uuid4())[:16]
            live_ended = live.ended
    
            if live_ended is None or not live_ended:
                live_ended = datetime.datetime.utcnow().replace(
                    tzinfo=datetime.timezone.utc
                )
    
            video_remain = int((live_ended - live.started).total_seconds()) - start_point
    
            if remain > video_remain:
                make_video(live.url, start_point, video_remain, file)
                remain = remain - video_remain
                start_point = 0
            else:
                make_video(live.url, start_point, remain, file)
                remain = 0
            files.append(f"{file}.mp4")
    
            if remain <= 0:
                break
    
        # 리스트의 길이가 1보다 큰경우 영상이 다수이기에 합치기
        if len(files) > 1:
            file = str(uuid.uuid4())[:16]
            make_files_text(files)
            concat_video("./files.txt", file)
    
        make_thumbnail(file)
    
        # 업로드
        s3 = boto3.resource(
            "s3",
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
        )
    
        data = {
            "thumbnail_url": f"{settings.AWS_BUCKET_URL}{file}.png",
            "record_url": f"{settings.AWS_BUCKET_URL}{file}.mp4",
        }
    
        try:
            s3.meta.client.upload_file(f"./{file}.png", "odn-video-record", f"{file}.png")
            s3.meta.client.upload_file(f"./{file}.mp4", "odn-video-record", f"{file}.mp4")
        except FileNotFoundError:
            print(f"please check request's argument, retires={retries}")
            time.sleep(1)
            data = make_record(started, ended, lives, retries + 1)
    
        os.system(f"rm -rf {file}.mp4")
        os.system(f"rm -rf {file}.png")
    
        # 합쳐진 영상 놔두고 삭제 및 파일 리스트 삭제
        for file in files:
            os.system(f"rm -rf ./{file}")
        os.system(f"rm -rf ./files.txt")
        return data

    코드가 지저분한 거는 아쉽다...

     

    순서

    1. 시작시간과 종료시간을 기준으로 총 녹화되어져야하는 영상의 길이를 얻는다.

    2. 처음 영상의 시작시간을 시작시간과 영상의 시작시간을 통해서 얻는다.

    3-1. (녹화가 되어져야하는 영상의 길이보다 라이브 영상의 길이가 긴 경우)
    시작시간으로부터 녹화가 되어져야 하는 길이만큼 영상을 잘라낸다.

    3-2. (녹화가 되어져야하는 영상의 길이보다 라이브 영상의 길이가 짧은 경우)
    시작시간으로부터 끝까지 영상을 잘라내고 남은 해당 길이만큼 총 녹화 영상의 길이를 빼주고 다음 녹화 라이브 영상을 가져와 시작포인트를 처음으로 설정하고 영상을 남은 길이만큼 자른다. 영상이 최소 2개 이상이 나오기에 해당 영상을 합쳐준다.

    4. 녹화 영상에 대한 썸네일을 만들고 S3에 업로드 후 해당 영상과 썸네일의 주소를 데이터에 담아 전달한다.

    5. 로컬 파일은 일괄 삭제를 진행한다.

     

    생각해볼만한 점

    1. retries 는 왜 존재하는가?
    -> 라이브 영상을 자를 때 파일은 생성이 되어졌지만 제대로 된 영상이 아닌 경우가 생긴다(재생이 안되어지는 파일).
    이때 썸네일 파일 생성을 못하는데 그럴 경우 재귀함수를 통해서 다시 반복을 한다.(대부분 1번 더 돌아가면 제대로 생성이 되어진다.)

    2. S3에 올리는 이유?

    -> 일단 가장 큰점은 로컬을 통해서 파일을 관리하기가 어렵다는 것이다. 서버의 용량이 무한정은 아니기에 주기적으로 삭제를 거쳐야 하는 경우가 생기는데 이런 경우 코드를 작성해야 한다. 하지만, S3의 경우 버전관리를 통해서 일정시간이 지난 경우 자동적으로 삭제해주는 기능이 있기에 이러한 부분이 휴먼 리소스를 줄일 수 있다고 판단하여 S3에 올리는 이유다.

    3. 영상의 순서는?

    -> 영상 자르는 순서의 기준은 과거영상 -> 최신영상 현재 5분마다 라이브 영상이 바꿔치기 되어지기때문에 5분 간격으로 라이브 영상이 존재하고 그로인해서 클라이언트가 요청할 때 여러개의 라이브영상이 함께 불러와질수도 있다. 그리고 현재 시간의 양으로 자르기에 과거영상부터 시작해 최신영상으로 오는 순서를 가지고 있다.

     

    FFmpeg 코드

    #!/bin/bash
    ffmpeg -ss $1 -i $3 -t $2 -acodec copy -vcodec copy $4.mp4 > /dev/null 2>&1
    
    ffmpeg -i $1.mp4 -ss 0 -vframes 1 $1.png > /dev/null 2>&1

     

    영상 자르는 코드와 썸네일 코드가 원래는 분리되어있지만 올리면서 보기 편하게 합쳐서 만들었다.

     

    1. 영상을 자르는 스크립트인데 인자가 들어간 옵션 설명은 밑을 참고

    -ss : 시작지점

    -i : 소스 파일

    -t : 자를 시간의 양

     

    여기에서 가장 중요한 옵션이 있는데 -ss 이다. 저 옵션은 가장 앞에 있는 것이 무조건 옳다.

    이유의 경우 정확하게 기억은 못하지만

    -ss 가 앞에 있는 경우 시작 포인트를 미리 지정을 하기에 영상을 읽을 때 해당 지점이 나오는 경우부터 시작을 해서 읽는다면

    -ss 가 앞에 없는 경우 영상을 미리 전체를 읽고 다시 시작 포인트를 읽는 형태이기에 느리다

     

    라이브 중인 파일의 경우 접근하는 속도가 매우 느린데 이유는 m3u8이 FFmpeg에 의해 읽고 쓰여지기를 반복하다보니
    다른 쪽에서 읽을려고 하면 FFmpeg가 읽고 쓰기가 끝나는 경우에 접근이 가능해진다(공유 자원관련으로 인해 생기는 상황).

    여기에 -ss 옵션이 앞에 없는 경우 전체를 읽는 번거로운 과정을 거치다보니 더욱 더 느려지는데 일조를 하게 되어진다.

    그렇기에 -ss 옵션은 제일 앞에 있어야 한다.

     

    또 추가적으로 -acodec copy -vcodec copy 도 하는 게 좋다(여러가지 이유가 있지만 그냥 한가지 이유 속도가 매우 빨라진다.)

     

    2. 썸네일을 만드는 스크립트인데 뭐 별거 없다.

    영상소스를 받아 제일 처음 부분의 프레임을 png 파일로 생성하는 것이다.

     

     

    그외 설명할 것

    1. 일단 전체 코드를 올리지는 않았다.
    중요하다고 생각되어지는 부분을 올렸고 이 부분을 통해서 충분히 정보 전달이 되어질 것이라고 생각한다.

    2. FFmpeg 는 상당히 고사양이다.
    그래서 AWS 를 사용하지않고 자체서버에서 진행을 했다. 동시에 여러개가 돌아가는 경우 비용이 크기도 하고 감당을 못하는 경우도 존재했다. 그래서 이미 물리적인 서버가 나름 고사양이었기에 해당 서버에 진행했다.(물론 다른 이점을 포기하고 결정)

    3. 왜 FastAPI?

    일단 회사의 언어가 Python이다보니 Python 기준으로 프레임워크를 찾았고 기능에 대한 사이즈가 FastAPI가 가장 적합하다고 생각하여 FastAPI로 결정했다(Django의 경우 사이즈가 크다고 생각했고, Flask의 경우 깔끔하게 작성하는데 공을 들여야했다). 

Designed by Tistory.