gradio/test/components/test_video.py
meg de997e67c9
Adds ability to watermark videos via a watermark parameter in Video component (#8900)
* First pass at adding watermark to Video component

* add changeset

* Update video.py

Space.

* Adding demo files

* linting

* Automatically generated notebook generator

* lint

* Changing parentheses in check on whether conversion is needed

* Unimportant comment/spacing changes

* video

* Update gradio/components/video.py

Co-authored-by: Abubakar Abid <abubakar@huggingface.co>

* Update gradio/components/video.py

Co-authored-by: Abubakar Abid <abubakar@huggingface.co>

* Update video.py

Adding more details on watermark_file.

* Update gradio/components/video.py

Change in and/or statement.

Co-authored-by: Abubakar Abid <abubakar@huggingface.co>

* Updating handling of WASM.

* Add paren.

* Minor lint change: if instead of elif

* lint change: spacing

* tweaks

* fixes

* demo

* demos

* notebooks

* notebooks

* remove comments

* video

---------

Co-authored-by: gradio-pr-bot <gradio-pr-bot@users.noreply.github.com>
Co-authored-by: Abubakar Abid <abubakar@huggingface.co>
2024-07-29 21:09:38 +00:00

245 lines
9.2 KiB
Python

import os
import shutil
import tempfile
from copy import deepcopy
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from gradio_client import media_data
import gradio as gr
from gradio import processing_utils
from gradio.components.video import VideoData
from gradio.data_classes import FileData
class TestVideo:
@pytest.mark.asyncio
async def test_component_functions(self):
"""
Preprocess, serialize, deserialize, get_config
"""
x_video = VideoData(
video=FileData(path=deepcopy(media_data.BASE64_VIDEO)["path"])
)
video_input = gr.Video()
x_video = await processing_utils.async_move_files_to_cache(
[x_video], video_input
)
x_video = x_video[0]
output1 = video_input.preprocess(x_video)
assert isinstance(output1, str)
output2 = video_input.preprocess(x_video)
assert output1 == output2
video_input = gr.Video(include_audio=False)
output1 = video_input.preprocess(x_video)
output2 = video_input.preprocess(x_video)
assert output1 == output2
video_input = gr.Video(label="Upload Your Video")
assert video_input.get_config() == {
"autoplay": False,
"sources": ["upload", "webcam"],
"name": "video",
"show_share_button": False,
"show_label": True,
"label": "Upload Your Video",
"container": True,
"min_width": 160,
"scale": None,
"show_download_button": None,
"height": None,
"width": None,
"elem_id": None,
"elem_classes": [],
"visible": True,
"value": None,
"interactive": None,
"proxy_url": None,
"mirror_webcam": True,
"include_audio": True,
"format": None,
"min_length": None,
"max_length": None,
"_selectable": False,
"key": None,
"loop": False,
"watermark": None,
}
assert video_input.preprocess(None) is None
video_input = gr.Video(format="avi")
output_video = video_input.preprocess(x_video)
assert output_video
assert output_video[-3:] == "avi"
assert "flip" not in output_video
# Output functionalities
y_vid_path = "test/test_files/video_sample.mp4"
subtitles_path = "test/test_files/s1.srt"
video_output = gr.Video()
output1 = video_output.postprocess(y_vid_path)
assert output1
output1 = output1.model_dump()["video"]["path"]
assert output1.endswith("mp4")
output2 = video_output.postprocess(y_vid_path)
assert output2
output2 = output2.model_dump()["video"]["path"]
assert output1 == output2
output3 = video_output.postprocess(y_vid_path)
assert output3
assert output3.model_dump()["video"]["orig_name"] == "video_sample.mp4"
output_with_subtitles = video_output.postprocess((y_vid_path, subtitles_path))
assert output_with_subtitles
output_with_subtitles = output_with_subtitles.model_dump()
assert output_with_subtitles["subtitles"]["path"].endswith(".vtt")
p_video = gr.Video()
video_with_subtitle = gr.Video()
postprocessed_video = p_video.postprocess(Path(y_vid_path))
assert postprocessed_video
postprocessed_video = postprocessed_video.model_dump()
postprocessed_video_with_subtitle = video_with_subtitle.postprocess(
(Path(y_vid_path), Path(subtitles_path))
)
assert postprocessed_video_with_subtitle
postprocessed_video_with_subtitle = (
postprocessed_video_with_subtitle.model_dump()
)
processed_video = {
"video": {
"path": "video_sample.mp4",
"orig_name": "video_sample.mp4",
"mime_type": None,
"size": None,
"url": None,
"is_stream": False,
"meta": {"_type": "gradio.FileData"},
},
"subtitles": None,
}
processed_video_with_subtitle = {
"video": {
"path": "video_sample.mp4",
"orig_name": "video_sample.mp4",
"mime_type": None,
"size": None,
"url": None,
"is_stream": False,
"meta": {"_type": "gradio.FileData"},
},
"subtitles": {
"path": "s1.srt",
"mime_type": None,
"orig_name": None,
"size": None,
"url": None,
"is_stream": False,
"meta": {"_type": "gradio.FileData"},
},
}
postprocessed_video["video"]["path"] = os.path.basename(
postprocessed_video["video"]["path"]
)
assert processed_video == postprocessed_video
postprocessed_video_with_subtitle["video"]["path"] = os.path.basename(
postprocessed_video_with_subtitle["video"]["path"]
)
if postprocessed_video_with_subtitle["subtitles"]["path"]:
postprocessed_video_with_subtitle["subtitles"]["path"] = "s1.srt"
assert processed_video_with_subtitle == postprocessed_video_with_subtitle
def test_in_interface(self):
"""
Interface, process
"""
x_video = media_data.BASE64_VIDEO["path"]
iface = gr.Interface(lambda x: x, "video", "playable_video")
assert iface({"video": x_video})["video"].endswith(".mp4")
def test_with_waveform(self):
"""
Interface, process
"""
x_audio = media_data.BASE64_AUDIO["path"]
iface = gr.Interface(lambda x: gr.make_waveform(x), "audio", "video")
assert iface(x_audio)["video"].endswith(".mp4")
def test_video_postprocess_converts_to_playable_format(self):
test_file_dir = Path(__file__).parent.parent / "test_files"
# This file has a playable container but not playable codec
with tempfile.NamedTemporaryFile(
suffix="bad_video.mp4", delete=False
) as tmp_not_playable_vid:
bad_vid = str(test_file_dir / "bad_video_sample.mp4")
assert not processing_utils.video_is_playable(bad_vid)
shutil.copy(bad_vid, tmp_not_playable_vid.name)
output = gr.Video().postprocess(tmp_not_playable_vid.name)
assert output
output = output.model_dump()
assert processing_utils.video_is_playable(output["video"]["path"])
# This file has a playable codec but not a playable container
with tempfile.NamedTemporaryFile(
suffix="playable_but_bad_container.mkv", delete=False
) as tmp_not_playable_vid:
bad_vid = str(test_file_dir / "playable_but_bad_container.mkv")
assert not processing_utils.video_is_playable(bad_vid)
shutil.copy(bad_vid, tmp_not_playable_vid.name)
output = gr.Video().postprocess(tmp_not_playable_vid.name)
assert output
output = output.model_dump()
assert processing_utils.video_is_playable(output["video"]["path"])
@patch("pathlib.Path.exists", MagicMock(return_value=False))
@patch("gradio.components.video.FFmpeg")
def test_video_preprocessing_flips_video_for_webcam(self, mock_ffmpeg):
# Ensures that the cached temp video file is not used so that ffmpeg is called for each test
x_video = VideoData(video=FileData(path=media_data.BASE64_VIDEO["path"]))
video_input = gr.Video(sources=["webcam"])
_ = video_input.preprocess(x_video)
# Dict mapping filename to FFmpeg options
output_params = mock_ffmpeg.call_args_list[0][1]["outputs"]
assert "hflip" in list(output_params.values())[0]
assert "flip" in list(output_params.keys())[0]
mock_ffmpeg.reset_mock()
_ = gr.Video(
sources=["webcam"], mirror_webcam=False, include_audio=True
).preprocess(x_video)
mock_ffmpeg.assert_not_called()
mock_ffmpeg.reset_mock()
_ = gr.Video(sources=["upload"], format="mp4", include_audio=True).preprocess(
x_video
)
mock_ffmpeg.assert_not_called()
mock_ffmpeg.reset_mock()
output_file = gr.Video(
sources=["webcam"], mirror_webcam=True, format="avi"
).preprocess(x_video)
assert output_file
output_params = mock_ffmpeg.call_args_list[0][1]["outputs"]
assert "hflip" in list(output_params.values())[0]
assert "flip" in list(output_params.keys())[0]
assert ".avi" in list(output_params.keys())[0]
assert ".avi" in output_file
mock_ffmpeg.reset_mock()
output_file = gr.Video(
sources=["webcam"], mirror_webcam=False, format="avi", include_audio=False
).preprocess(x_video)
assert output_file
output_params = mock_ffmpeg.call_args_list[0][1]["outputs"]
assert list(output_params.values())[0] == ["-an"]
assert "flip" not in Path(list(output_params.keys())[0]).name
assert ".avi" in list(output_params.keys())[0]
assert ".avi" in output_file