# Copyright (c) OpenMMLab. All rights reserved. import os import os.path as osp import sys import tempfile from contextlib import contextmanager from copy import deepcopy from pathlib import Path from shutil import SameFileError from unittest import TestCase from unittest.mock import MagicMock, patch from mmengine.fileio.backends import PetrelBackend from mmengine.utils import has_method @contextmanager def build_temporary_directory(): """Build a temporary directory containing many files to test ``FileClient.list_dir_or_file``. . \n | -- dir1 \n | -- | -- text3.txt \n | -- dir2 \n | -- | -- dir3 \n | -- | -- | -- text4.txt \n | -- | -- img.jpg \n | -- text1.txt \n | -- text2.txt \n """ with tempfile.TemporaryDirectory() as tmp_dir: text1 = Path(tmp_dir) / 'text1.txt' text1.open('w').write('text1') text2 = Path(tmp_dir) / 'text2.txt' text2.open('w').write('text2') dir1 = Path(tmp_dir) / 'dir1' dir1.mkdir() text3 = dir1 / 'text3.txt' text3.open('w').write('text3') dir2 = Path(tmp_dir) / 'dir2' dir2.mkdir() jpg1 = dir2 / 'img.jpg' jpg1.open('wb').write(b'img') dir3 = dir2 / 'dir3' dir3.mkdir() text4 = dir3 / 'text4.txt' text4.open('w').write('text4') yield tmp_dir try: # Other unit tests may mock these modules so we need to pop them first. sys.modules.pop('petrel_client', None) sys.modules.pop('petrel_client.client', None) # If petrel_client is imported successfully, we can test PetrelBackend # without mock. import petrel_client # noqa: F401 except ImportError: sys.modules['petrel_client'] = MagicMock() sys.modules['petrel_client.client'] = MagicMock() class MockPetrelClient: def __init__(self, enable_mc=True, enable_multi_cluster=False, conf_path=None): self.enable_mc = enable_mc self.enable_multi_cluster = enable_multi_cluster self.conf_path = conf_path def Get(self, filepath): with open(filepath, 'rb') as f: content = f.read() return content def put(self): pass def delete(self): pass def contains(self): pass def isdir(self): pass def list(self, dir_path): for entry in os.scandir(dir_path): if not entry.name.startswith('.') and entry.is_file(): yield entry.name elif osp.isdir(entry.path): yield entry.name + '/' @contextmanager def delete_and_reset_method(obj, method): method_obj = deepcopy(getattr(type(obj), method)) try: delattr(type(obj), method) yield finally: setattr(type(obj), method, method_obj) @patch('petrel_client.client.Client', MockPetrelClient) class TestPetrelBackend(TestCase): @classmethod def setUpClass(cls): cls.test_data_dir = Path(__file__).parent.parent.parent / 'data' cls.img_path = cls.test_data_dir / 'color.jpg' cls.img_shape = (300, 400, 3) cls.text_path = cls.test_data_dir / 'filelist.txt' cls.petrel_dir = 'petrel://user/data' cls.petrel_path = f'{cls.petrel_dir}/test.jpg' cls.expected_dir = 's3://user/data' cls.expected_path = f'{cls.expected_dir}/test.jpg' def test_name(self): backend = PetrelBackend() self.assertEqual(backend.name, 'PetrelBackend') def test_map_path(self): backend = PetrelBackend(path_mapping=None) self.assertEqual( backend._map_path(self.petrel_path), self.petrel_path) backend = PetrelBackend( path_mapping={'data/': 'petrel://user/data/'}) self.assertEqual( backend._map_path('data/test.jpg'), self.petrel_path) def test_format_path(self): backend = PetrelBackend() formatted_filepath = backend._format_path( 'petrel://user\\data\\test.jpg') self.assertEqual(formatted_filepath, self.petrel_path) def test_replace_prefix(self): backend = PetrelBackend() self.assertEqual( backend._replace_prefix(self.petrel_path), self.expected_path) def test_join_path(self): backend = PetrelBackend() self.assertEqual( backend.join_path(self.petrel_dir, 'file'), f'{self.petrel_dir}/file') self.assertEqual( backend.join_path(f'{self.petrel_dir}/', 'file'), f'{self.petrel_dir}/file') self.assertEqual( backend.join_path(f'{self.petrel_dir}/', '/file'), f'{self.petrel_dir}/file') self.assertEqual( backend.join_path(self.petrel_dir, 'dir', 'file'), f'{self.petrel_dir}/dir/file') def test_get(self): backend = PetrelBackend() with patch.object( backend._client, 'Get', return_value=b'petrel') as patched_get: self.assertEqual(backend.get(self.petrel_path), b'petrel') patched_get.assert_called_once_with(self.expected_path) def test_get_text(self): backend = PetrelBackend() with patch.object( backend._client, 'Get', return_value=b'petrel') as patched_get: self.assertEqual(backend.get_text(self.petrel_path), 'petrel') patched_get.assert_called_once_with(self.expected_path) def test_put(self): backend = PetrelBackend() with patch.object(backend._client, 'put') as patched_put: backend.put(b'petrel', self.petrel_path) patched_put.assert_called_once_with(self.expected_path, b'petrel') def test_put_text(self): backend = PetrelBackend() with patch.object(backend._client, 'put') as patched_put: backend.put_text('petrel', self.petrel_path) patched_put.assert_called_once_with(self.expected_path, b'petrel') def test_exists(self): backend = PetrelBackend() self.assertTrue(has_method(backend._client, 'contains')) self.assertTrue(has_method(backend._client, 'isdir')) # raise Exception if `_client.contains` and '_client.isdir' are not # implemented with delete_and_reset_method(backend._client, 'contains'), \ delete_and_reset_method(backend._client, 'isdir'): self.assertFalse(has_method(backend._client, 'contains')) self.assertFalse(has_method(backend._client, 'isdir')) with self.assertRaises(NotImplementedError): backend.exists(self.petrel_path) with patch.object( backend._client, 'contains', return_value=True) as patched_contains: self.assertTrue(backend.exists(self.petrel_path)) patched_contains.assert_called_once_with(self.expected_path) def test_isdir(self): backend = PetrelBackend() self.assertTrue(has_method(backend._client, 'isdir')) # raise Exception if `_client.isdir` is not implemented with delete_and_reset_method(backend._client, 'isdir'): self.assertFalse(has_method(backend._client, 'isdir')) with self.assertRaises(NotImplementedError): backend.isdir(self.petrel_path) with patch.object( backend._client, 'isdir', return_value=True) as patched_contains: self.assertTrue(backend.isdir(self.petrel_path)) patched_contains.assert_called_once_with(self.expected_path) def test_isfile(self): backend = PetrelBackend() self.assertTrue(has_method(backend._client, 'contains')) # raise Exception if `_client.contains` is not implemented with delete_and_reset_method(backend._client, 'contains'): self.assertFalse(has_method(backend._client, 'contains')) with self.assertRaises(NotImplementedError): backend.isfile(self.petrel_path) with patch.object( backend._client, 'contains', return_value=True) as patched_contains: self.assertTrue(backend.isfile(self.petrel_path)) patched_contains.assert_called_once_with(self.expected_path) def test_get_local_path(self): backend = PetrelBackend() with patch.object(backend._client, 'Get', return_value=b'petrel') as patched_get, \ patch.object(backend._client, 'contains', return_value=True) as patch_contains: with backend.get_local_path(self.petrel_path) as path: self.assertTrue(osp.isfile(path)) self.assertEqual(Path(path).open('rb').read(), b'petrel') # exist the with block and path will be released self.assertFalse(osp.isfile(path)) patched_get.assert_called_once_with(self.expected_path) patch_contains.assert_called_once_with(self.expected_path) def test_copyfile(self): backend = PetrelBackend() with patch.object(backend._client, 'Get', return_value=b'petrel') as patched_get, \ patch.object(backend._client, 'put') as patched_put, \ patch.object(backend._client, 'isdir', return_value=False) as \ patched_isdir: src = self.petrel_path dst = f'{self.petrel_dir}/test.bak.jpg' expected_dst = f'{self.expected_dir}/test.bak.jpg' self.assertEqual(backend.copyfile(src, dst), dst) patched_get.assert_called_once_with(self.expected_path) patched_put.assert_called_once_with(expected_dst, b'petrel') patched_isdir.assert_called_once_with(expected_dst) with patch.object(backend._client, 'Get', return_value=b'petrel') as patched_get, \ patch.object(backend._client, 'put') as patched_put, \ patch.object(backend._client, 'isdir', return_value=True) as \ patched_isdir: # dst is a directory dst = f'{self.petrel_dir}/dir' expected_dst = f'{self.expected_dir}/dir/test.jpg' self.assertEqual(backend.copyfile(src, dst), f'{dst}/test.jpg') patched_get.assert_called_once_with(self.expected_path) patched_put.assert_called_once_with(expected_dst, b'petrel') patched_isdir.assert_called_once_with( f'{self.expected_dir}/dir') with patch.object(backend._client, 'Get', return_value=b'petrel') as patched_get, \ patch.object(backend._client, 'isdir', return_value=False) as \ patched_isdir: # src and src should not be same file with self.assertRaises(SameFileError): backend.copyfile(src, src) def test_copytree(self): backend = PetrelBackend() put_inputs = [] get_inputs = [] def put(obj, filepath): put_inputs.append((obj, filepath)) def get(filepath): get_inputs.append(filepath) with build_temporary_directory() as tmp_dir, \ patch.object(backend, 'put', side_effect=put),\ patch.object(backend, 'get', side_effect=get),\ patch.object(backend, 'exists', return_value=False): tmp_dir = tmp_dir.replace('\\', '/') dst = f'{tmp_dir}/dir' self.assertEqual(backend.copytree(tmp_dir, dst), dst) self.assertEqual(len(put_inputs), 5) self.assertEqual(len(get_inputs), 5) # dst should not exist with patch.object(backend, 'exists', return_value=True): with self.assertRaises(FileExistsError): backend.copytree(dst, tmp_dir) def test_copyfile_from_local(self): backend = PetrelBackend() with patch.object(backend._client, 'put') as patched_put, \ patch.object(backend._client, 'isdir', return_value=False) \ as patched_isdir: src = self.img_path dst = f'{self.petrel_dir}/color.bak.jpg' expected_dst = f'{self.expected_dir}/color.bak.jpg' self.assertEqual(backend.copyfile_from_local(src, dst), dst) patched_put.assert_called_once_with(expected_dst, src.open('rb').read()) patched_isdir.assert_called_once_with(expected_dst) with patch.object(backend._client, 'put') as patched_put, \ patch.object(backend._client, 'isdir', return_value=True) as \ patched_isdir: # dst is a directory src = self.img_path dst = f'{self.petrel_dir}/dir' expected_dst = f'{self.expected_dir}/dir/color.jpg' self.assertEqual( backend.copyfile_from_local(src, dst), f'{dst}/color.jpg') patched_put.assert_called_once_with(expected_dst, src.open('rb').read()) patched_isdir.assert_called_once_with( f'{self.expected_dir}/dir') def test_copytree_from_local(self): backend = PetrelBackend() inputs = [] def copyfile_from_local(src, dst): inputs.append((src, dst)) with build_temporary_directory() as tmp_dir, \ patch.object(backend, 'copyfile_from_local', side_effect=copyfile_from_local),\ patch.object(backend, 'exists', return_value=False): backend.copytree_from_local(tmp_dir, self.petrel_dir) self.assertEqual(len(inputs), 5) # dst should not exist with patch.object(backend, 'exists', return_value=True): with self.assertRaises(FileExistsError): backend.copytree_from_local(tmp_dir, self.petrel_dir) def test_copyfile_to_local(self): backend = PetrelBackend() with patch.object(backend._client, 'Get', return_value=b'petrel') as patched_get, \ tempfile.TemporaryDirectory() as tmp_dir: src = self.petrel_path dst = Path(tmp_dir) / 'test.bak.jpg' self.assertEqual(backend.copyfile_to_local(src, dst), dst) patched_get.assert_called_once_with(self.expected_path) self.assertEqual(dst.open('rb').read(), b'petrel') with patch.object(backend._client, 'Get', return_value=b'petrel') as patched_get, \ tempfile.TemporaryDirectory() as tmp_dir: # dst is a directory src = self.petrel_path dst = Path(tmp_dir) / 'dir' dst.mkdir() self.assertEqual( backend.copyfile_to_local(src, dst), dst / 'test.jpg') patched_get.assert_called_once_with(self.expected_path) self.assertEqual((dst / 'test.jpg').open('rb').read(), b'petrel') def test_copytree_to_local(self): backend = PetrelBackend() inputs = [] def get(filepath): inputs.append(filepath) return b'petrel' with build_temporary_directory() as tmp_dir, \ patch.object(backend, 'get', side_effect=get): dst = f'{tmp_dir}/dir' backend.copytree_to_local(tmp_dir, dst) self.assertEqual(len(inputs), 5) def test_remove(self): backend = PetrelBackend() self.assertTrue(has_method(backend._client, 'delete')) # raise Exception if `delete` is not implemented with delete_and_reset_method(backend._client, 'delete'): self.assertFalse(has_method(backend._client, 'delete')) with self.assertRaises(NotImplementedError): backend.remove(self.petrel_path) with patch.object(backend._client, 'delete') as patched_delete, \ patch.object(backend._client, 'isdir', return_value=False) \ as patched_isdir, \ patch.object(backend._client, 'contains', return_value=True) \ as patched_contains: backend.remove(self.petrel_path) patched_delete.assert_called_once_with(self.expected_path) patched_isdir.assert_called_once_with(self.expected_path) patched_contains.assert_called_once_with(self.expected_path) def test_rmtree(self): backend = PetrelBackend() inputs = [] def remove(filepath): inputs.append(filepath) with build_temporary_directory() as tmp_dir,\ patch.object(backend, 'remove', side_effect=remove): backend.rmtree(tmp_dir) self.assertEqual(len(inputs), 5) def test_copy_if_symlink_fails(self): backend = PetrelBackend() copyfile_inputs = [] copytree_inputs = [] def copyfile(src, dst): copyfile_inputs.append((src, dst)) def copytree(src, dst): copytree_inputs.append((src, dst)) with patch.object(backend, 'copyfile', side_effect=copyfile), \ patch.object(backend, 'isfile', return_value=True): backend.copy_if_symlink_fails(self.petrel_path, 'path') self.assertEqual(len(copyfile_inputs), 1) with patch.object(backend, 'copytree', side_effect=copytree), \ patch.object(backend, 'isfile', return_value=False): backend.copy_if_symlink_fails(self.petrel_dir, 'path') self.assertEqual(len(copytree_inputs), 1) def test_list_dir_or_file(self): backend = PetrelBackend() # raise Exception if `_client.list` is not implemented self.assertTrue(has_method(backend._client, 'list')) with delete_and_reset_method(backend._client, 'list'): self.assertFalse(has_method(backend._client, 'list')) with self.assertRaises(NotImplementedError): list(backend.list_dir_or_file(self.petrel_dir)) with build_temporary_directory() as tmp_dir: # list directories and files self.assertEqual( set(backend.list_dir_or_file(tmp_dir)), {'dir1', 'dir2', 'text1.txt', 'text2.txt'}) # list directories and files recursively self.assertEqual( set(backend.list_dir_or_file(tmp_dir, recursive=True)), { 'dir1', '/'.join(('dir1', 'text3.txt')), 'dir2', '/'.join(('dir2', 'dir3')), '/'.join( ('dir2', 'dir3', 'text4.txt')), '/'.join( ('dir2', 'img.jpg')), 'text1.txt', 'text2.txt' }) # only list directories self.assertEqual( set(backend.list_dir_or_file(tmp_dir, list_file=False)), {'dir1', 'dir2'}) with self.assertRaisesRegex( TypeError, '`list_dir` should be False when `suffix` is not None' ): backend.list_dir_or_file( tmp_dir, list_file=False, suffix='.txt') # only list directories recursively self.assertEqual( set( backend.list_dir_or_file( tmp_dir, list_file=False, recursive=True)), {'dir1', 'dir2', '/'.join(('dir2', 'dir3'))}) # only list files self.assertEqual( set(backend.list_dir_or_file(tmp_dir, list_dir=False)), {'text1.txt', 'text2.txt'}) # only list files recursively self.assertEqual( set( backend.list_dir_or_file( tmp_dir, list_dir=False, recursive=True)), { '/'.join(('dir1', 'text3.txt')), '/'.join( ('dir2', 'dir3', 'text4.txt')), '/'.join( ('dir2', 'img.jpg')), 'text1.txt', 'text2.txt' }) # only list files ending with suffix self.assertEqual( set( backend.list_dir_or_file( tmp_dir, list_dir=False, suffix='.txt')), {'text1.txt', 'text2.txt'}) self.assertEqual( set( backend.list_dir_or_file( tmp_dir, list_dir=False, suffix=('.txt', '.jpg'))), {'text1.txt', 'text2.txt'}) with self.assertRaisesRegex( TypeError, '`suffix` must be a string or tuple of strings'): backend.list_dir_or_file( tmp_dir, list_dir=False, suffix=['.txt', '.jpg']) # only list files ending with suffix recursively self.assertEqual( set( backend.list_dir_or_file( tmp_dir, list_dir=False, suffix='.txt', recursive=True)), { '/'.join(('dir1', 'text3.txt')), '/'.join( ('dir2', 'dir3', 'text4.txt')), 'text1.txt', 'text2.txt' }) # only list files ending with suffix self.assertEqual( set( backend.list_dir_or_file( tmp_dir, list_dir=False, suffix=('.txt', '.jpg'), recursive=True)), { '/'.join(('dir1', 'text3.txt')), '/'.join( ('dir2', 'dir3', 'text4.txt')), '/'.join( ('dir2', 'img.jpg')), 'text1.txt', 'text2.txt' }) def test_generate_presigned_url(self): pass else: class TestPetrelBackend(TestCase): # type: ignore @classmethod def setUpClass(cls): cls.test_data_dir = Path(__file__).parent.parent.parent / 'data' cls.local_img_path = cls.test_data_dir / 'color.jpg' cls.local_img_shape = (300, 400, 3) cls.petrel_dir = 'petrel://mmengine-test/data' def setUp(self): backend = PetrelBackend() backend.rmtree(self.petrel_dir) with build_temporary_directory() as tmp_dir: backend.copytree_from_local(tmp_dir, self.petrel_dir) text1_path = f'{self.petrel_dir}/text1.txt' text2_path = f'{self.petrel_dir}/text2.txt' text3_path = f'{self.petrel_dir}/dir1/text3.txt' text4_path = f'{self.petrel_dir}/dir2/dir3/text4.txt' img_path = f'{self.petrel_dir}/dir2/img.jpg' self.assertTrue(backend.isfile(text1_path)) self.assertTrue(backend.isfile(text2_path)) self.assertTrue(backend.isfile(text3_path)) self.assertTrue(backend.isfile(text4_path)) self.assertTrue(backend.isfile(img_path)) def test_get(self): backend = PetrelBackend() img_path = f'{self.petrel_dir}/dir2/img.jpg' self.assertEqual(backend.get(img_path), b'img') def test_get_text(self): backend = PetrelBackend() text_path = f'{self.petrel_dir}/text1.txt' self.assertEqual(backend.get_text(text_path), 'text1') def test_put(self): backend = PetrelBackend() img_path = f'{self.petrel_dir}/img.jpg' backend.put(b'img', img_path) def test_put_text(self): backend = PetrelBackend() text_path = f'{self.petrel_dir}/text5.txt' backend.put_text('text5', text_path) def test_exists(self): backend = PetrelBackend() # file and directory exist dir_path = f'{self.petrel_dir}/dir2' self.assertTrue(backend.exists(dir_path)) img_path = f'{self.petrel_dir}/dir2/img.jpg' self.assertTrue(backend.exists(img_path)) # file and directory does not exist not_existed_dir = f'{self.petrel_dir}/not_existed_dir' self.assertFalse(backend.exists(not_existed_dir)) not_existed_path = f'{self.petrel_dir}/img.jpg' self.assertFalse(backend.exists(not_existed_path)) def test_isdir(self): backend = PetrelBackend() dir_path = f'{self.petrel_dir}/dir2' self.assertTrue(backend.isdir(dir_path)) img_path = f'{self.petrel_dir}/dir2/img.jpg' self.assertFalse(backend.isdir(img_path)) def test_isfile(self): backend = PetrelBackend() dir_path = f'{self.petrel_dir}/dir2' self.assertFalse(backend.isfile(dir_path)) img_path = f'{self.petrel_dir}/dir2/img.jpg' self.assertTrue(backend.isfile(img_path)) def test_get_local_path(self): backend = PetrelBackend() img_path = f'{self.petrel_dir}/dir2/img.jpg' with backend.get_local_path(img_path) as path: self.assertTrue(osp.isfile(path)) self.assertEqual(Path(path).open('rb').read(), b'img') # exist the with block and path will be released self.assertFalse(osp.isfile(path)) def test_copyfile(self): backend = PetrelBackend() # dst is a file src = f'{self.petrel_dir}/dir2/img.jpg' dst = f'{self.petrel_dir}/img.jpg' self.assertEqual(backend.copyfile(src, dst), dst) self.assertTrue(backend.isfile(dst)) # dst is a directory dst = f'{self.petrel_dir}/dir1' expected_dst = f'{self.petrel_dir}/dir1/img.jpg' self.assertEqual(backend.copyfile(src, dst), expected_dst) self.assertTrue(backend.isfile(expected_dst)) # src and src should not be same file with self.assertRaises(SameFileError): backend.copyfile(src, src) def test_copytree(self): backend = PetrelBackend() src = f'{self.petrel_dir}/dir2' dst = f'{self.petrel_dir}/dir3' self.assertFalse(backend.exists(dst)) self.assertEqual(backend.copytree(src, dst), dst) self.assertEqual( list(backend.list_dir_or_file(src)), list(backend.list_dir_or_file(dst))) # dst should not exist with self.assertRaises(FileExistsError): backend.copytree(src, dst) def test_copyfile_from_local(self): backend = PetrelBackend() # dst is a file src = self.local_img_path dst = f'{self.petrel_dir}/color.jpg' self.assertFalse(backend.exists(dst)) self.assertEqual(backend.copyfile_from_local(src, dst), dst) self.assertTrue(backend.isfile(dst)) # dst is a directory src = self.local_img_path dst = f'{self.petrel_dir}/dir1' expected_dst = f'{self.petrel_dir}/dir1/color.jpg' self.assertFalse(backend.exists(expected_dst)) self.assertEqual( backend.copyfile_from_local(src, dst), expected_dst) self.assertTrue(backend.isfile(expected_dst)) def test_copytree_from_local(self): backend = PetrelBackend() backend.rmtree(self.petrel_dir) with build_temporary_directory() as tmp_dir: backend.copytree_from_local(tmp_dir, self.petrel_dir) files = backend.list_dir_or_file( self.petrel_dir, recursive=True) self.assertEqual(len(list(files)), 8) def test_copyfile_to_local(self): backend = PetrelBackend() with tempfile.TemporaryDirectory() as tmp_dir: # dst is a file src = f'{self.petrel_dir}/dir2/img.jpg' dst = Path(tmp_dir) / 'img.jpg' self.assertEqual(backend.copyfile_to_local(src, dst), dst) self.assertEqual(dst.open('rb').read(), b'img') # dst is a directory dst = Path(tmp_dir) / 'dir' dst.mkdir() self.assertEqual( backend.copyfile_to_local(src, dst), dst / 'img.jpg') self.assertEqual((dst / 'img.jpg').open('rb').read(), b'img') def test_copytree_to_local(self): backend = PetrelBackend() with tempfile.TemporaryDirectory() as tmp_dir: backend.copytree_to_local(self.petrel_dir, tmp_dir) self.assertTrue(osp.exists(Path(tmp_dir) / 'text1.txt')) self.assertTrue(osp.exists(Path(tmp_dir) / 'dir2' / 'img.jpg')) def test_remove(self): backend = PetrelBackend() img_path = f'{self.petrel_dir}/dir2/img.jpg' self.assertTrue(backend.isfile(img_path)) backend.remove(img_path) self.assertFalse(backend.exists(img_path)) def test_rmtree(self): backend = PetrelBackend() dir_path = f'{self.petrel_dir}/dir2' self.assertTrue(backend.isdir(dir_path)) backend.rmtree(dir_path) self.assertFalse(backend.exists(dir_path)) def test_copy_if_symlink_fails(self): backend = PetrelBackend() # dst is a file src = f'{self.petrel_dir}/dir2/img.jpg' dst = f'{self.petrel_dir}/img.jpg' self.assertFalse(backend.exists(dst)) self.assertFalse(backend.copy_if_symlink_fails(src, dst)) self.assertTrue(backend.isfile(dst)) # dst is a directory src = f'{self.petrel_dir}/dir2' dst = f'{self.petrel_dir}/dir' self.assertFalse(backend.exists(dst)) self.assertFalse(backend.copy_if_symlink_fails(src, dst)) self.assertTrue(backend.isdir(dst)) def test_list_dir_or_file(self): backend = PetrelBackend() # list directories and files self.assertEqual( set(backend.list_dir_or_file(self.petrel_dir)), {'dir1', 'dir2', 'text1.txt', 'text2.txt'}) # list directories and files recursively self.assertEqual( set(backend.list_dir_or_file(self.petrel_dir, recursive=True)), { 'dir1', '/'.join(('dir1', 'text3.txt')), 'dir2', '/'.join( ('dir2', 'dir3')), '/'.join( ('dir2', 'dir3', 'text4.txt')), '/'.join( ('dir2', 'img.jpg')), 'text1.txt', 'text2.txt' }) # only list directories self.assertEqual( set( backend.list_dir_or_file(self.petrel_dir, list_file=False)), {'dir1', 'dir2'}) with self.assertRaisesRegex( TypeError, '`list_dir` should be False when `suffix` is not None'): backend.list_dir_or_file( self.petrel_dir, list_file=False, suffix='.txt') # only list directories recursively self.assertEqual( set( backend.list_dir_or_file( self.petrel_dir, list_file=False, recursive=True)), {'dir1', 'dir2', '/'.join(('dir2', 'dir3'))}) # only list files self.assertEqual( set(backend.list_dir_or_file(self.petrel_dir, list_dir=False)), {'text1.txt', 'text2.txt'}) # only list files recursively self.assertEqual( set( backend.list_dir_or_file( self.petrel_dir, list_dir=False, recursive=True)), { '/'.join(('dir1', 'text3.txt')), '/'.join( ('dir2', 'dir3', 'text4.txt')), '/'.join( ('dir2', 'img.jpg')), 'text1.txt', 'text2.txt' }) # only list files ending with suffix self.assertEqual( set( backend.list_dir_or_file( self.petrel_dir, list_dir=False, suffix='.txt')), {'text1.txt', 'text2.txt'}) self.assertEqual( set( backend.list_dir_or_file( self.petrel_dir, list_dir=False, suffix=('.txt', '.jpg'))), {'text1.txt', 'text2.txt'}) with self.assertRaisesRegex( TypeError, '`suffix` must be a string or tuple of strings'): backend.list_dir_or_file( self.petrel_dir, list_dir=False, suffix=['.txt', '.jpg']) # only list files ending with suffix recursively self.assertEqual( set( backend.list_dir_or_file( self.petrel_dir, list_dir=False, suffix='.txt', recursive=True)), { '/'.join(('dir1', 'text3.txt')), '/'.join( ('dir2', 'dir3', 'text4.txt')), 'text1.txt', 'text2.txt' }) # only list files ending with suffix self.assertEqual( set( backend.list_dir_or_file( self.petrel_dir, list_dir=False, suffix=('.txt', '.jpg'), recursive=True)), { '/'.join(('dir1', 'text3.txt')), '/'.join( ('dir2', 'dir3', 'text4.txt')), '/'.join( ('dir2', 'img.jpg')), 'text1.txt', 'text2.txt' })