import json
import os
import shutil
import tarfile
import tempfile
from typing import List, Union
from celery.utils.log import get_task_logger
from PIL import Image
from processes.convert import get_field
from weaver.formats import ContentType, get_content_type
from weaver.transform.const import CONVERSION_DICT
[docs]
# Module-level logger created through Celery's task-logger helper, named after this module.
LOGGER = get_task_logger(__name__)
[docs]
def is_image(image: str) -> bool:
    """
    Check if the file is an image based on its MIME content type.

    Only the file extension is inspected; the file itself is never opened.

    :param image: The file name or path.
    :return: True if the extension resolves to an ``image/*`` content type, False otherwise
        (including when no content type can be resolved for the extension).
    """
    ext = os.path.splitext(image)[1]
    content_type = get_content_type(ext)
    # The lookup may yield no value (e.g. missing or unrecognized extension).
    # Treat that as "not an image" instead of failing on ``None.startswith``.
    return bool(content_type) and content_type.startswith("image/")
[docs]
def is_svg(image: str) -> bool:
    """
    Tell whether a file name designates an SVG image.

    The decision relies on the content type resolved from the file extension.

    :param image: The file name or path.
    :return: True if the file is SVG, False otherwise.
    """
    _, extension = os.path.splitext(image)
    resolved_type = get_content_type(extension)
    return resolved_type == ContentType.IMAGE_SVG_XML
[docs]
def is_png(image: str) -> bool:
    """
    Tell whether a file name designates a PNG image.

    The decision relies on the content type resolved from the file extension.

    :param image: The file name or path.
    :return: True if the file is PNG, False otherwise.
    """
    _, extension = os.path.splitext(image)
    resolved_type = get_content_type(extension)
    return resolved_type == ContentType.IMAGE_PNG
[docs]
def is_tiff(image: str) -> bool:
    """
    Tell whether a file name designates a TIFF image.

    The content type resolved from the file extension is matched against the
    TIFF, GeoTIFF, OGC GeoTIFF and COG content types.

    :param image: The file name or path.
    :return: True if the file is TIFF, False otherwise.
    """
    _, extension = os.path.splitext(image)
    resolved_type = get_content_type(extension)
    tiff_types = {
        ContentType.IMAGE_TIFF,
        ContentType.IMAGE_GEOTIFF,
        ContentType.IMAGE_OGC_GEOTIFF,
        ContentType.IMAGE_COG,
    }
    return resolved_type in tiff_types
[docs]
def is_gif(image: str) -> bool:
    """
    Tell whether a file name designates a GIF image.

    The decision relies on the content type resolved from the file extension.

    :param image: The file name or path.
    :return: True if the file is GIF, False otherwise.
    """
    _, extension = os.path.splitext(image)
    resolved_type = get_content_type(extension)
    return resolved_type == ContentType.IMAGE_GIF
[docs]
def get_content(file_path: str, mode: str = "r") -> Union[str, bytes]:
    """
    Retrieve the content of a file.

    Text modes are decoded as UTF-8. Binary modes (any mode containing ``"b"``)
    return the raw bytes.

    :param file_path: The path to the file.
    :param mode: The mode in which to open the file. Defaults to "r".
    :return: The content of the file, as a string for text modes or as bytes for binary modes.
    """
    # ``open`` raises ``ValueError`` when an encoding is supplied together with a binary mode,
    # so the encoding is only provided for text modes.
    encoding = None if "b" in mode else "utf-8"
    with open(file_path, mode, encoding=encoding) as f:
        return f.read()
[docs]
def write_content(file_path: str, content: Union[str, dict]) -> None:
    """
    Store the given content in a file, replacing any previous content.

    A dictionary is serialized to JSON before being written; a string is written as is.
    The file is written with UTF-8 encoding.

    :param file_path: The path to the file.
    :param content: The content to write, can be a string or dictionary.
    """
    text = json.dumps(content) if isinstance(content, dict) else content
    with open(file_path, "w", encoding="utf-8") as handle:
        handle.write(text)
[docs]
def write_images(images: List[Image.Image], output_file: str, ext: str = "png") -> None:
    """
    Save a list of images to an archive or single file.

    Every image is first saved in a temporary directory under a zero-padded index name
    (``0000.<ext>``, ``0001.<ext>``, ...). A single image is then copied to ``output_file``.
    Several images are bundled into a gzip-compressed tar archive instead, in which case
    ``.tar.gz`` is appended to ``output_file`` when it does not already end with it.

    :param images: A list of images to save.
    :param output_file: The output file name or path.
    :param ext: The image format (extension). Defaults to "png".
    :raises ValueError: If no image is provided.
    """
    with tempfile.TemporaryDirectory() as tmp_path:
        img_paths = []
        for i, img in enumerate(images):
            img_path = os.path.join(tmp_path, f"{i:04d}.{ext}")
            img.save(img_path)
            img_paths.append(img_path)

        # Without any image there is nothing to copy or archive; fail with an explicit
        # message rather than an opaque IndexError on ``img_paths[0]``.
        if not img_paths:
            raise ValueError("At least one image is required to write the output file.")

        if len(img_paths) > 1:
            if not output_file.endswith(".tar.gz"):
                output_file += ".tar.gz"
            with tarfile.open(output_file, "w:gz") as tar:
                for img_path in img_paths:
                    # Store entries by base name so the archive does not expose the temporary directory.
                    tar.add(img_path, arcname=os.path.basename(img_path))
        else:
            # The temporary directory is removed on exit, so the single image must be copied out.
            shutil.copy(img_paths[0], output_file)