import itertools
import os
import shutil
import tempfile
from copy import deepcopy
from pathlib import Path
from unittest.mock import patch

import ffmpy
import numpy as np
import pytest
from gradio_client import media_data
from PIL import Image, ImageCms

from gradio import processing_utils, utils


class TestTempFileManagement:
    """Tests for file hashing and the ``save_*_to_cache`` helpers.

    The cache tests all follow the same pattern: save something into
    ``gradio_temp_dir``, then check how many regular files the directory
    tree contains afterwards.
    """

    @staticmethod
    def _count_files(directory):
        """Return the number of regular files anywhere below *directory*."""
        return sum(1 for entry in directory.glob("**/*") if entry.is_file())

    @staticmethod
    def _remove_if_present(path):
        """Best-effort delete of *path*; any ``OSError`` is ignored."""
        try:
            os.remove(path)
        except OSError:
            pass

    def test_hash_file(self):
        """The copy must hash like the original; a different image must not."""
        h1 = processing_utils.hash_file("gradio/test_data/cheetah1.jpg")
        h2 = processing_utils.hash_file("gradio/test_data/cheetah1-copy.jpg")
        h3 = processing_utils.hash_file("gradio/test_data/cheetah2.jpg")
        assert h1 == h2
        assert h1 != h3

    def test_make_temp_copy_if_needed(self, gradio_temp_dir):
        """Saving the same file twice must not add a second cache entry."""
        original = "gradio/test_data/cheetah1.jpg"
        duplicate = "gradio/test_data/cheetah1-copy.jpg"

        cached = processing_utils.save_file_to_cache(
            original, cache_dir=gradio_temp_dir
        )
        # Delete if already exists from before this test
        self._remove_if_present(cached)

        cached = processing_utils.save_file_to_cache(
            original, cache_dir=gradio_temp_dir
        )
        assert self._count_files(gradio_temp_dir) == 1
        assert Path(cached).name == "cheetah1.jpg"

        processing_utils.save_file_to_cache(original, cache_dir=gradio_temp_dir)
        assert self._count_files(gradio_temp_dir) == 1

        cached = processing_utils.save_file_to_cache(
            duplicate, cache_dir=gradio_temp_dir
        )
        assert self._count_files(gradio_temp_dir) == 2
        assert Path(cached).name == "cheetah1-copy.jpg"

    def test_save_b64_to_cache(self, gradio_temp_dir):
        """Re-saving the same base64 payload must not add a second file."""
        base64_file_1 = media_data.BASE64_IMAGE
        base64_file_2 = media_data.BASE64_AUDIO["data"]

        cached = processing_utils.save_base64_to_cache(
            base64_file_1, cache_dir=gradio_temp_dir
        )
        # Delete if already exists from before this test
        self._remove_if_present(cached)

        processing_utils.save_base64_to_cache(
            base64_file_1, cache_dir=gradio_temp_dir
        )
        assert self._count_files(gradio_temp_dir) == 1

        processing_utils.save_base64_to_cache(
            base64_file_1, cache_dir=gradio_temp_dir
        )
        assert self._count_files(gradio_temp_dir) == 1

        processing_utils.save_base64_to_cache(
            base64_file_2, cache_dir=gradio_temp_dir
        )
        assert self._count_files(gradio_temp_dir) == 2

    @pytest.mark.flaky
    def test_save_url_to_cache(self, gradio_temp_dir):
        """Re-saving the same URL must not add a second file (needs network)."""
        url1 = "https://raw.githubusercontent.com/gradio-app/gradio/main/gradio/test_data/test_image.png"
        url2 = "https://raw.githubusercontent.com/gradio-app/gradio/main/gradio/test_data/cheetah1.jpg"

        cached = processing_utils.save_url_to_cache(url1, cache_dir=gradio_temp_dir)
        # Delete if already exists from before this test
        self._remove_if_present(cached)

        processing_utils.save_url_to_cache(url1, cache_dir=gradio_temp_dir)
        assert self._count_files(gradio_temp_dir) == 1

        processing_utils.save_url_to_cache(url1, cache_dir=gradio_temp_dir)
        assert self._count_files(gradio_temp_dir) == 1

        processing_utils.save_url_to_cache(url2, cache_dir=gradio_temp_dir)
        assert self._count_files(gradio_temp_dir) == 2

    def test_save_url_to_cache_with_spaces(self, gradio_temp_dir):
        """A URL whose path contains spaces and commas must still be cached."""
        # The interior spaces are part of the test input; only the stray
        # padding at both ends of the literal was removed.
        url = "https://huggingface.co/datasets/freddyaboulton/gradio-reviews/resolve/main00015-20230906102032-7778-Wonderwoman VintageMagStyle _lora_SDXL-VintageMagStyle-Lora_1_, Very detailed, clean, high quality, sharp image.jpg"
        processing_utils.save_url_to_cache(url, cache_dir=gradio_temp_dir)
        assert self._count_files(gradio_temp_dir) == 1

    def test_save_url_to_cache_with_redirect(self, gradio_temp_dir):
        """Exactly one file must be cached for this URL.

        NOTE(review): per the test name the URL is expected to redirect;
        that is not visible from this file.
        """
        url = "https://huggingface.co/datasets/Xenova/transformers.js-docs/resolve/main/bread_small.png"
        processing_utils.save_url_to_cache(url, cache_dir=gradio_temp_dir)
        assert self._count_files(gradio_temp_dir) == 1


class TestImagePreprocessing:
    """Tests for the image encoding, caching and resizing helpers."""

    def test_encode_plot_to_base64(self):
        """Encode a simple matplotlib plot and check the output prefix."""
        with utils.MatplotlibBackendMananger():
            import matplotlib.pyplot as plt

            plt.plot([1, 2, 3, 4])
            output_base64 = processing_utils.encode_plot_to_base64(plt)
        # NOTE(review): the expected-prefix literal below is blank in this copy
        # of the file; presumably the original prefix was lost. It is kept
        # unchanged here and must be restored from version control.
        assert output_base64.startswith(
            "  "
        )

    def test_encode_array_to_base64(self):
        """A uint8 RGB array must encode to the reference base64 string."""
        img = Image.open("gradio/test_data/test_image.png")
        img = img.convert("RGB")
        numpy_data = np.asarray(img, dtype=np.uint8)
        output_base64 = processing_utils.encode_array_to_base64(numpy_data)
        assert output_base64 == deepcopy(media_data.ARRAY_TO_BASE64_IMAGE)

    def test_encode_pil_to_base64(self):
        """A PIL image without metadata must encode to the same reference."""
        img = Image.open("gradio/test_data/test_image.png")
        img = img.convert("RGB")
        img.info = {}  # Strip metadata
        output_base64 = processing_utils.encode_pil_to_base64(img)
        assert output_base64 == deepcopy(media_data.ARRAY_TO_BASE64_IMAGE)

    def test_save_pil_to_file_keeps_pnginfo(self, gradio_temp_dir):
        """The ``info`` mapping must survive a round trip through the cache."""
        input_img = Image.open("gradio/test_data/test_image.png")
        input_img = input_img.convert("RGB")
        input_img.info = {"key1": "value1", "key2": "value2"}
        input_img.save(gradio_temp_dir / "test_test_image.png")

        file_obj = processing_utils.save_pil_to_cache(
            input_img, cache_dir=gradio_temp_dir
        )
        output_img = Image.open(file_obj)
        assert output_img.info == input_img.info

    def test_np_pil_encode_to_the_same(self, gradio_temp_dir):
        """The same pixels must get the same cache path as PIL image or array."""
        arr = np.random.randint(0, 255, size=(100, 100, 3), dtype=np.uint8)
        pil = Image.fromarray(arr)
        pil_path = processing_utils.save_pil_to_cache(pil, cache_dir=gradio_temp_dir)
        arr_path = processing_utils.save_img_array_to_cache(
            arr, cache_dir=gradio_temp_dir
        )
        assert pil_path == arr_path

    def test_encode_pil_to_temp_file_metadata_color_profile(self, gradio_temp_dir):
        """Images differing in metadata or ICC profile get distinct cache paths."""

        def reopen_with_profile(color_space, filename):
            # Save `img` with a freshly created ICC profile and read it back.
            icc = ImageCms.ImageCmsProfile(ImageCms.createProfile(color_space))
            target = gradio_temp_dir / filename
            img.save(target, icc_profile=icc.tobytes())
            return Image.open(str(target))

        # Read image
        img = Image.open("gradio/test_data/test_image.png")
        img_metadata = Image.open("gradio/test_data/test_image.png")
        img_metadata.info = {"key1": "value1", "key2": "value2"}

        # Creating sRGB profile
        img_cp1 = reopen_with_profile("sRGB", "img_color_profile.png")
        # Creating XYZ profile
        img_cp2 = reopen_with_profile("XYZ", "img_color_profile_2.png")

        cached_paths = [
            processing_utils.save_pil_to_cache(candidate, cache_dir=gradio_temp_dir)
            for candidate in (img, img_metadata, img_cp1, img_cp2)
        ]
        assert len(set(cached_paths)) == 4

    def test_resize_and_crop(self):
        """Resizing yields the requested size; an unknown crop type raises."""
        img = Image.open("gradio/test_data/test_image.png")
        new_img = processing_utils.resize_and_crop(img, (20, 20))
        assert new_img.size == (20, 20)
        with pytest.raises(ValueError):
            # NOTE(review): the arguments are passed through ** unpacking as in
            # the original, presumably to sidestep static checks on
            # `crop_type` — confirm.
            processing_utils.resize_and_crop(
                **{"img": img, "size": (20, 20), "crop_type": "test"}
            )


class TestAudioPreprocessing:
    """Tests for reading, writing and converting audio."""

    def test_audio_from_file(self):
        """The sample file loads as (22050, ndarray) in its first two items."""
        audio = processing_utils.audio_from_file("gradio/test_data/test_audio.wav")
        assert audio[0] == 22050
        assert isinstance(audio[1], np.ndarray)

    def test_audio_to_file(self):
        """Writing audio back out must create the target file."""
        out_path = "test_audio_to_file"
        audio = processing_utils.audio_from_file("gradio/test_data/test_audio.wav")
        try:
            processing_utils.audio_to_file(audio[0], audio[1], out_path)
            assert os.path.exists(out_path)
        finally:
            # Clean up even when the assertion fails, so a failed run does not
            # leave the file behind in the working directory.
            if os.path.exists(out_path):
                os.remove(out_path)

    def test_convert_to_16_bit_wav(self):
        """Float64, float32 and int16 input must all come back as equal int16."""
        # Generate a random audio sample and set the amplitude
        audio = np.random.randint(-100, 100, size=(100), dtype="int16")
        audio[0] = -32767
        audio[1] = 32766

        for float_dtype in ("float64", "float32"):
            converted = processing_utils.convert_to_16_bit_wav(
                audio.astype(float_dtype)
            )
            assert np.allclose(audio, converted)
            assert converted.dtype == "int16"

        # int16 input goes through the same checks.
        converted = processing_utils.convert_to_16_bit_wav(audio)
        assert np.allclose(audio, converted)
        assert converted.dtype == "int16"


class TestOutputPreprocessing:
    """Tests for the dtype handling of ``processing_utils._convert``."""

    # Float dtypes given as Python types, NumPy scalar types and dtype names.
    float_dtype_list = [
        float,
        float,
        np.double,
        np.single,
        np.float32,
        np.float64,
        "float32",
        "float64",
    ]

    def test_float_conversion_dtype(self):
        """Test any conversion from a float dtype to another."""
        x = np.array([-1, 1])
        # Test all combinations of dtype conversions. `product` varies the
        # output dtype fastest, the same order the earlier meshgrid produced.
        for dtype_in, dtype_out in itertools.product(self.float_dtype_list, repeat=2):
            x = x.astype(dtype_in)
            y = processing_utils._convert(x, dtype_out)
            assert y.dtype == np.dtype(dtype_out)

    def test_subclass_conversion(self):
        """Check subclass conversion behavior."""
        x = np.array([-1, 1])
        for dtype in self.float_dtype_list:
            x = x.astype(dtype)
            # Asking for the abstract `np.floating` must keep the input dtype.
            y = processing_utils._convert(x, np.floating)
            assert y.dtype == x.dtype


class TestVideoProcessing:
    """Tests for playability detection and conversion of videos.

    NOTE(review): the tests below create temporary files with
    ``delete=False`` and never remove them. That is unchanged here;
    consider a cleanup step.
    """

    def test_video_has_playable_codecs(self, test_file_dir):
        """The three sample videos are playable; the bad sample is not."""
        for playable_name in (
            "video_sample.mp4",
            "video_sample.ogg",
            "video_sample.webm",
        ):
            assert processing_utils.video_is_playable(
                str(test_file_dir / playable_name)
            )
        assert not processing_utils.video_is_playable(
            str(test_file_dir / "bad_video_sample.mp4")
        )

    def raise_ffmpy_runtime_exception(*args, **kwargs):
        """Mock side effect: raise ``ffmpy.FFRuntimeError`` whatever the arguments.

        Takes no ``self``: it is only used from the class body as a
        ``side_effect`` callable.
        """
        raise ffmpy.FFRuntimeError(" ", " ", " ", " ")

    @pytest.mark.parametrize(
        "exception_to_raise", [raise_ffmpy_runtime_exception, KeyError(), IndexError()]
    )
    def test_video_has_playable_codecs_catches_exceptions(
        self, exception_to_raise, test_file_dir
    ):
        """If probing raises, ``video_is_playable`` must still return truthy."""
        with patch("ffmpy.FFprobe.run", side_effect=exception_to_raise):
            with tempfile.NamedTemporaryFile(
                suffix="out.avi", delete=False
            ) as tmp_not_playable_vid:
                shutil.copy(
                    str(test_file_dir / "bad_video_sample.mp4"),
                    tmp_not_playable_vid.name,
                )
                assert processing_utils.video_is_playable(tmp_not_playable_vid.name)

    def test_convert_video_to_playable_mp4(self, test_file_dir):
        """Conversion must yield a playable video and delete its temp file."""
        with tempfile.NamedTemporaryFile(
            suffix="out.avi", delete=False
        ) as tmp_not_playable_vid:
            shutil.copy(
                str(test_file_dir / "bad_video_sample.mp4"), tmp_not_playable_vid.name
            )
            # `wraps` keeps the real deletion and records the path passed to it.
            with patch("os.remove", wraps=os.remove) as mock_remove:
                playable_vid = processing_utils.convert_video_to_playable_mp4(
                    tmp_not_playable_vid.name
                )
            # check tempfile got deleted
            assert not Path(mock_remove.call_args[0][0]).exists()
            assert processing_utils.video_is_playable(playable_vid)

    @patch("ffmpy.FFmpeg.run", side_effect=raise_ffmpy_runtime_exception)
    def test_video_conversion_returns_original_video_if_fails(
        self, mock_run, test_file_dir
    ):
        """If ffmpeg fails, the returned path must keep the ``.avi`` suffix."""
        with tempfile.NamedTemporaryFile(
            suffix="out.avi", delete=False
        ) as tmp_not_playable_vid:
            shutil.copy(
                str(test_file_dir / "bad_video_sample.mp4"), tmp_not_playable_vid.name
            )
            playable_vid = processing_utils.convert_video_to_playable_mp4(
                tmp_not_playable_vid.name
            )
            # If the conversion succeeded it'd be .mp4
            assert Path(playable_vid).suffix == ".avi"


def test_add_root_url():
    """Check ``add_root_url`` on relative and absolute ``url`` entries.

    First call (no previous root): the relative ``/file=path`` URL gains the
    root prefix, while the absolute ``https://`` URL stays untouched.
    Second call (previous root supplied): the old root is swapped for the
    new one, and the absolute URL again stays untouched.
    """
    absolute_entry = {
        "path": "path2",
        "url": "https://www.gradio.app",
    }
    data = {
        "file": {
            "path": "path",
            "url": "/file=path",
        },
        "file2": dict(absolute_entry),
    }

    root_url = "http://localhost:7860"
    expected = {
        "file": {
            "path": "path",
            "url": f"{root_url}/file=path",
        },
        "file2": dict(absolute_entry),
    }
    assert processing_utils.add_root_url(data, root_url, None) == expected

    new_root_url = "https://1234.gradio.live"
    new_expected = {
        "file": {
            "path": "path",
            "url": f"{new_root_url}/file=path",
        },
        "file2": dict(absolute_entry),
    }
    assert (
        processing_utils.add_root_url(expected, new_root_url, root_url) == new_expected
    )