Module psdi_data_conversion.file_io
@file psdi_data_conversion/file_io.py
Created 2025-02-11 by Bryan Gillis.
Functions and classes related to general filesystem input/output
Functions
def is_archive(filename: str) ‑> bool
-
Expand source code
def is_archive(filename: str) -> bool:
    """Report whether a filename ends with any known archive extension

    Only the end of the name is inspected, against each entry of `const.L_ALL_ARCHIVE_EXTENSIONS`; the file itself
    is never opened
    """
    for known_extension in const.L_ALL_ARCHIVE_EXTENSIONS:
        if filename.endswith(known_extension):
            return True
    return False
Uses a file's extension to check if it's an archive or not
def is_supported_archive(filename: str) ‑> bool
-
Expand source code
def is_supported_archive(filename: str) -> bool:
    """Report whether a filename ends with the extension of a supported archive type

    Only the end of the name is inspected, against each key of `const.D_SUPPORTED_ARCHIVE_FORMATS`; the file itself
    is never opened
    """
    # `str.endswith` accepts a tuple of candidate suffixes, so one call covers every supported extension
    supported_extensions = tuple(const.D_SUPPORTED_ARCHIVE_FORMATS)
    return filename.endswith(supported_extensions)
Uses a file's extension to check if it's an archive of a supported type or not
def pack_zip_or_tar(archive_filename: str,
l_filenames: list[str],
archive_format: str | None = None,
source_dir: str = '.',
cleanup=False) ‑> str-
Expand source code
def pack_zip_or_tar(archive_filename: str,
                    l_filenames: list[str],
                    archive_format: str | None = None,
                    source_dir: str = ".",
                    cleanup=False) -> str:
    """Pack a list of files into a single zip or tar archive

    All files are stored in the root directory of the archive under their base names, regardless of the directory
    they were found in.

    Parameters
    ----------
    archive_filename : str
        The desired name of the output archive to create, either fully-qualified or relative to the current directory
    l_filenames : list[str]
        List of files to be archived. Each name is first tried as given, and only if no such file exists is it
        looked up relative to `source_dir`
    archive_format : str | None, optional
        The format of archive to create, given either as a key of `const.D_SUPPORTED_ARCHIVE_FORMATS` (an
        extension) or as one of its values (a format name). If not provided, the format is determined from the
        extension of `archive_filename`. If provided, the matching extension is appended to `archive_filename`
        unless it already ends with it
    source_dir : str, optional
        Path to directory containing the files to be archived (default current directory). Only consulted for
        filenames which can't be found as given
    cleanup : bool, optional
        If True, source files will be deleted after the archive is successfully created. Deletion is best-effort:
        a file which can't be removed is silently left in place

    Returns
    -------
    str
        The name of the created archive file

    Raises
    ------
    ValueError
        If `archive_format` is not provided and `archive_filename` does not end with a supported archive extension,
        or if the provided `archive_format` is not recognised
    FileNotFoundError
        If one of the listed files does not exist
    """

    if not archive_format and not is_supported_archive(archive_filename):
        raise ValueError(f"Desired archive filename '{archive_filename}' is not of a supported type. Supported types "
                         f"are: {const.D_SUPPORTED_ARCHIVE_FORMATS.keys()}")

    # It's supported, so determine the specific format, and provide it and the base of the filename in the forms that
    # `make_archive` wants
    if archive_format is None:
        for _ext, _format in const.D_SUPPORTED_ARCHIVE_FORMATS.items():
            if archive_filename.endswith(_ext):
                archive_format = _format
                archive_root_filename = split_archive_ext(archive_filename)[0]
                break
        else:
            raise AssertionError("Invalid execution path entered - filename wasn't found with a valid archive "
                                 "extension, but it did pass the `is_supported_archive` check")
    else:
        archive_root_filename = archive_filename

    # Check that the provided archive format is valid, and add the appropriate extension to the filename
    archive_extension: str | None = None
    for _ext, _format in const.D_SUPPORTED_ARCHIVE_FORMATS.items():
        if archive_format == _ext:
            # Extension was provided instead of the format; we can work with that
            archive_extension = archive_format
            archive_format = _format
            break
        elif archive_format == _format:
            archive_extension = _ext
            break

    if archive_extension is None:
        raise ValueError(f"Invalid archive format '{archive_format}'. Valid formats are: "
                         f"{const.D_SUPPORTED_ARCHIVE_FORMATS.keys()}")

    # Check if the root filename already contained the extension so we don't add it again, and strip it from the
    # root
    if archive_root_filename.endswith(archive_extension):
        archive_filename = archive_root_filename
        archive_root_filename = archive_root_filename[:-len(archive_extension)]
    else:
        archive_filename = archive_root_filename+archive_extension

    # Source files are only recorded for cleanup once they've been successfully staged
    l_files_to_cleanup: list[str] = []

    with TemporaryDirectory() as root_dir:

        # Copy all files from the source dir to the root dir, which is what will be packed
        for filename in l_filenames:

            # Check if the filename is fully-qualified, and copy it from wherever it's found
            if os.path.isfile(filename):
                copyfile(filename, os.path.join(root_dir, os.path.basename(filename)))
                l_files_to_cleanup.append(filename)
                continue

            qualified_filename = os.path.join(source_dir, filename)
            if os.path.isfile(qualified_filename):
                copyfile(qualified_filename, os.path.join(root_dir, os.path.basename(filename)))
                l_files_to_cleanup.append(qualified_filename)
            else:
                raise FileNotFoundError(f"File '{filename}' could not be found, either fully-qualified or relative to "
                                        f"{source_dir}")

        make_archive(archive_root_filename,
                     format=archive_format,
                     root_dir=root_dir)

    if cleanup:
        for filename in l_files_to_cleanup:
            try:
                os.remove(filename)
            except OSError:
                # Cleanup is best-effort: the archive already exists, so a file which can't be removed (already
                # gone, permissions, etc.) shouldn't cause a failure. Only filesystem errors are ignored, so that
                # genuine programming errors aren't hidden
                pass

    # Return the name of the created file
    return archive_filename
Pack a list of files into a single zip or tar archive
Parameters
archive_filename
:str
- The desired name of the output archive to create, either fully-qualified or relative to the current directory
l_filenames
:list[str]
- List of files to be archived, either fully-qualified or relative to
source_dir
. If provided fully-qualified, they will be placed in the root directory of the archive
source_dir
:str
, optional- Path to directory containing the files to be archived (default current directory). If filenames are provided fully-qualified, this is ignored
cleanup
:bool
, optional- If True, source files will be deleted after the archive is successfully created
Returns
str
- The name of the created archive file
Raises
ValueError
- If
archive_filename
is not of a valid format
FileNotFoundError
- If one of the listed files does not exist
def split_archive_ext(filename: str) ‑> tuple[str, str]
-
Expand source code
def split_archive_ext(filename: str) -> tuple[str, str]:
    """Separate a filename into its base and extension, keeping compound .tar.* extensions in one piece

    The final extension is split off first; if what remains ends with `const.TAR_EXTENSION`, one more extension
    is split off and joined in front of the final one
    """
    stem, last_ext = os.path.splitext(filename)

    # Simple case: nothing tar-like is sitting in front of the final extension
    if not stem.endswith(const.TAR_EXTENSION):
        return stem, last_ext

    # Compound case: peel off one more extension and glue the two together
    inner_stem, tar_ext = os.path.splitext(stem)
    return inner_stem, tar_ext + last_ext
Splits a file into a base and an extension, with handling for compound .tar.* extensions
def unpack_zip_or_tar(archive_filename: str, extract_dir: str = '.') ‑> list[str]
-
Expand source code
def unpack_zip_or_tar(archive_filename: str,
                      extract_dir: str = ".") -> list[str]:
    """Unpack a zip or tar archive into `extract_dir` and return a list of the extracted files

    Parameters
    ----------
    archive_filename : str
        Filename of the archive to unpack, either relative or fully-qualified
    extract_dir : str
        The directory to extract the contents of the archive to. By default, the current working directory will be
        used

    Returns
    -------
    list[str]
        Paths to the extracted files, each prefixed with `extract_dir` as it was provided, sorted
        case-insensitively. For a newly-created directory, its direct (non-hidden) contents are listed rather than
        the directory itself. This is determined by checking the directory contents before and after extraction,
        so is NOT thread-safe, unless it is otherwise ensured e.g. by using a unique temporary directory for each
        thread

    Raises
    ------
    ValueError
        If the archive's extension is of a known-but-unsupported type, or isn't recognised as an archive at all
    """

    qual_archive_filename = os.path.realpath(archive_filename)

    # Determine if the file is of a known (un)supported archive type, and if it is, whether it's a zip or tar, and
    # set up arguments appropriately to ensure security
    unpack_kwargs: dict[str, str] = {}
    if any(qual_archive_filename.endswith(x) for x in const.L_UNSUPPORTED_ARCHIVE_EXTENSIONS):
        raise ValueError(f"The archive file '{qual_archive_filename}' is of an unsupported archive type")
    elif any(qual_archive_filename.endswith(x) for x in const.D_ZIP_FORMATS):
        # Zip types don't support the "filter" kwarg, but use similar security measures by default. This may prompt
        # a warning, which can be ignored
        pass
    elif any(qual_archive_filename.endswith(x) for x in const.D_TAR_FORMATS):
        # Tar types need to set up the "filter" argument to ensure no files are unpacked outside the base directory
        unpack_kwargs["filter"] = "data"
    else:
        raise ValueError(f"The archive file '{qual_archive_filename}' is not recognised as a valid archive type")

    # To determine the names of extracted files, we call `os.listdir` before and after unpacking and look for the new
    # elements
    s_dir_before = set(os.listdir(extract_dir))
    unpack_archive(qual_archive_filename, extract_dir=extract_dir, **unpack_kwargs)
    s_dir_after = set(os.listdir(extract_dir))

    l_new_files: list[str] = []
    for new_entry in s_dir_after.difference(s_dir_before):
        qual_new_entry = os.path.join(extract_dir, new_entry)
        if os.path.isfile(qual_new_entry):
            # A file is listed literally rather than being passed through glob, so that a name containing glob
            # metacharacters (e.g. "data[1].xyz") isn't misread as a pattern and dropped from the results
            l_new_files.append(qual_new_entry)
        else:
            # For a directory, list its contents. The directory path is escaped so that any metacharacters in it
            # (or in `extract_dir`) are matched literally and only the trailing "**" acts as a wildcard
            l_new_files.extend(glob.glob(os.path.join(glob.escape(qual_new_entry), "**")))

    # Sort the file list for consistency in output
    l_new_files.sort(key=lambda s: s.lower())

    return l_new_files
Unpack a zip or tar archive into the extraction directory and return a list of the extracted files
Parameters
archive_filename
:str
- Filename of the archive to unpack, either relative or fully-qualified
extract_dir
:str
- The directory to extract the contents of the archive to. By default, the current working directory will be used
Returns
list[str]
- List of the fully-qualified paths to the extracted files. This is determined by checking the directory contents before and after extraction, so is NOT thread-safe, unless it is otherwise ensured e.g. by using a unique temporary directory for each thread