import base64
import csv
import functools
import os.path
import shutil
import tarfile
import tempfile
from typing import List

import xmltodict
import yaml
from bs4 import BeautifulSoup
from cairosvg import svg2png
from celery.utils.log import get_task_logger
from fpdf import FPDF
from json2xml.utils import readfromjson
from markupsafe import escape
from PIL import Image
from pyramid.httpexceptions import HTTPUnprocessableEntity
from pyramid.response import FileResponse

from weaver.formats import OutputFormat, get_extension
from weaver.transform.png2svg import rgba_image_to_svg_contiguous
from weaver.transform.tiff import Tiff
from weaver.transform.utils import get_content, is_gif, is_image, is_png, is_svg, is_tiff, write_content
[docs]
# Module-level logger obtained from Celery's task-logger factory; used to trace conversion operations.
LOGGER = get_task_logger(__name__)
[docs]
# Minimal HTML page template; the literal ``%CONTENT%`` placeholder is replaced by ``any_to_html``.
HTML_CONTENT = """<html>
<head></head>
<body><p>%CONTENT%</p></body>
</html>"""
[docs]
def exception_handler(func):
    """
    Decorator that traces conversion calls and forwards the outcome of the wrapped function.

    When the wrapped function's name contains ``_to_`` and at least two positional arguments
    are given, the base names of the first two arguments are logged at debug level before the
    call. The return value of ``func`` is passed back to the caller, and any exception it raises
    propagates unchanged.

    :param func: Function to wrap.
    :return: The wrapped function, carrying the name and docstring of ``func``.
    """
    @functools.wraps(func)
    def inner_function(*args, **kwargs):
        # Trace only when both paths were passed positionally, so that a keyword-style call
        # cannot fail inside the logging code before the conversion even starts.
        if "_to_" in func.__name__ and len(args) >= 2:
            LOGGER.debug("%s operation: [%s] -> [%s]", func.__name__,
                         os.path.basename(args[0]), os.path.basename(args[1]))
        return func(*args, **kwargs)
    return inner_function
@exception_handler
def image_to_any(image: str, out: str) -> None:
    """
    Converts an image file to the format of the output path.

    When input and output paths share the same file extension, the input is copied to ``out``
    (unless ``out`` already exists) and no conversion takes place. TIFF inputs are processed
    using the images returned by ``Tiff.get_images``, GIF inputs are converted to RGB, and SVG
    inputs are first rasterized to a PNG written next to the input as ``<image>.png``.

    :param image: Input image file path.
    :param out: Output file path.
    """
    # Nothing to transform when both paths carry the same extension.
    if os.path.splitext(image)[1] == os.path.splitext(out)[1]:
        if not os.path.exists(out):
            shutil.copy(image, out)
        return

    if is_tiff(image):
        images_to_any(Tiff(image).get_images(), out)
        return

    if is_gif(image):
        # ``convert`` produces a separate image object, so the source file can be closed right away.
        with Image.open(image) as gif:
            rgb_frame = gif.convert("RGB")
        images_to_any([rgb_frame], out)
        return

    if is_svg(image):
        png = f"{image}.png"
        with open(image, "rb") as svg_file:
            svg_data = svg_file.read()
        with open(png, "wb") as png_file:
            svg2png(svg_data, write_to=png_file)
        image = png

    # Keep the file handle open only for the duration of the conversion.
    with Image.open(image) as source:
        images_to_any([source], out)
[docs]
def images_to_any(images: List[Image.Image], out: str) -> None:
    """
    Converts a list of images to the output format and stores the result at the output path.

    Each image is first written to its own numbered file inside a temporary directory. When
    exactly one file was produced, it is copied to ``out``. Otherwise, the files are packed into
    a gzip-compressed tar archive, and ``.tar.gz`` is appended to ``out`` if it does not already
    end with it.

    :param images: List of Image objects to process.
    :param out: Output file path.
    :raises RuntimeError: If the output extension is neither SVG nor a supported image format.
    """
    with tempfile.TemporaryDirectory() as tmp_path:
        extension = get_extension(out)
        written = []
        for index, img in enumerate(images):
            # One file per image: a shared name would make every image overwrite the previous one.
            _o = os.path.join(tmp_path, str(index).zfill(4) + extension)

            # A non-tuple pixel value means a single-band image; normalize it to RGB.
            clrs = img.getpixel((0, 0))
            if not isinstance(clrs, tuple):
                img = img.convert("RGB")
                clrs = img.getpixel((0, 0))

            if is_svg(_o):
                width, height = img.size
                basewidth = 300
                # Shrink large images to a width of 300 pixels before vectorizing.
                if max(width, height) > basewidth:
                    wpercent = basewidth / float(width)
                    hsize = int(float(height) * wpercent)
                    img = img.resize((basewidth, hsize), Image.Resampling.LANCZOS)
                if len(clrs) == 3:
                    # NOTE(review): an alpha value of 0 is fully transparent in Pillow - confirm this is intended.
                    img.putalpha(0)
                write_content(_o, rgba_image_to_svg_contiguous(img))
            elif is_image(_o):
                if is_png(_o):
                    if len(clrs) == 3:
                        # NOTE(review): same alpha value of 0 as for SVG above - confirm this is intended.
                        img.putalpha(0)
                    img.save(_o)
                elif len(clrs) == 4:
                    # Formats other than PNG: flatten the 4th band onto a white background.
                    img.load()
                    rbg = Image.new("RGB", img.size, (255, 255, 255))
                    rbg.paste(img, mask=img.split()[3])
                    rbg.save(_o)
                else:
                    img.save(_o)
            else:
                raise RuntimeError(f"Unsupported format: {_o}")
            written.append(_o)

        if len(written) == 1:
            shutil.copy(written[0], out)
        else:
            if not out.endswith(".tar.gz"):
                out += ".tar.gz"
            with tarfile.open(out, "w:gz") as tar:
                for path in written:
                    # Store members under their file name rather than the temporary directory path.
                    tar.add(path, arcname=os.path.basename(path))
@exception_handler
def any_to_html(i: str, out: str) -> None:
    """
    Converts any content type (text or image) to HTML format.

    Non-image content is escaped and inserted into the ``HTML_CONTENT`` template. Image content
    is converted to JPEG (written next to the input as ``<i>.jpg``) and embedded in the template
    as a base64 ``data:`` URI.

    :param i: Input file path.
    :param out: Output file path.
    :raises RuntimeError: If any step of the conversion fails; the original error is chained as its cause.
    """
    try:
        if is_image(i):
            jpg = f"{i}.jpg"
            image_to_any(i, jpg)
            with open(jpg, "rb") as img_file:
                img_data = base64.b64encode(img_file.read()).decode("utf-8")
            write_content(out, HTML_CONTENT.replace(
                "%CONTENT%", f"<img src=\"data:image/jpeg;base64,{img_data}\" alt=\"Result\" />"))
        else:
            # Escape the text so that markup in the input is displayed rather than interpreted.
            html_content = HTML_CONTENT.replace("%CONTENT%", escape(get_content(i)))
            write_content(out, html_content)
    except Exception as err:
        # Report through the module logger instead of standard output.
        LOGGER.error("An error occurred: %s", err)
        raise RuntimeError(f"Error processing file {i}: {str(err)}") from err
@exception_handler
def any_to_pdf(i: str, out: str) -> None:
    """
    Converts a file to PDF format.

    Images other than SVG are embedded one per page, scaled down to fit inside the page margins
    while keeping their aspect ratio, and centered on the page. Any other input is treated as
    text and written to a single flow of A4 pages.

    :param i: Input file path.
    :param out: Output PDF file path.
    """
    new_pdf = FPDF(orientation="P", unit="pt", format="A4")

    if not (is_image(i) and not is_svg(i)):
        # If input is not a raster image, treat it as text.
        new_pdf.add_page()
        new_pdf.set_font("Arial", size=12)
        new_pdf.multi_cell(0, 10, txt=get_content(i), align="L")
        new_pdf.output(out, "F")
        return

    if is_tiff(i):
        images = Tiff(i).get_images()  # For TIFF files with multiple pages
    else:
        # ``convert`` returns a separate image, so the source handle can be closed immediately.
        with Image.open(i) as image:
            images = [image.convert("RGB")]

    margin = 10
    new_pdf.set_margins(margin, margin)
    pdf_width = new_pdf.w - 2 * margin
    pdf_height = new_pdf.h - 2 * margin

    # A private directory with one file per frame avoids clashes between concurrent conversions
    # and prevents a reused file path from being mistaken for an already-embedded image.
    with tempfile.TemporaryDirectory() as tmp_dir:
        for index, img in enumerate(images):
            image_w, image_h = img.size
            if image_w > image_h:
                new_pdf.add_page(orientation="L")
                _w, _h = pdf_height, pdf_width
            else:
                new_pdf.add_page(orientation="P")
                _w, _h = pdf_width, pdf_height

            # Scale image down to fit within the usable area while keeping aspect ratio.
            aspect_ratio = image_w / image_h
            if image_w > _w:
                image_w = _w
                image_h = image_w / aspect_ratio
            if image_h > _h:
                image_h = _h
                image_w = image_h * aspect_ratio

            # Center the image on the page; the usable area starts after the margin.
            x_offset = margin + (_w - image_w) / 2
            y_offset = margin + (_h - image_h) / 2

            # JPEG cannot store modes such as RGBA or P, so normalize before saving.
            if img.mode != "RGB":
                img = img.convert("RGB")
            im_path = os.path.join(tmp_dir, f"frame_{index:04d}.jpg")
            img.save(im_path)
            new_pdf.image(im_path, x=x_offset, y=y_offset, w=image_w, h=image_h)

        # Write the document while the frame files are still available.
        new_pdf.output(out, "F")
@exception_handler
def csv_to_json(i: str, out: str) -> None:
    """
    Converts a CSV file to a JSON file with a 'datas' key containing the rows.

    Each row is stored as ``{"data": <row>}``, where the row maps column names to values.
    Columns with an empty header are renamed ``unknown_<index>``. An empty CSV file produces
    an empty ``datas`` list.

    :param i: Path to the input CSV file.
    :param out: Path to the output JSON file.
    """
    # ``newline=""`` lets the csv module handle line endings inside quoted fields itself.
    with open(i, encoding="utf-8", newline="") as csvf:
        csv_reader = csv.DictReader(csvf)
        # ``fieldnames`` is None when the file has no header row (empty file).
        fieldnames = csv_reader.fieldnames
        if fieldnames:
            for idx, fieldname in enumerate(fieldnames):
                if fieldname == "":
                    fieldnames[idx] = f"unknown_{idx}"
        ret = [{"data": row} for row in csv_reader]
    write_content(out, {"datas": ret})
@exception_handler
def csv_to_xml(i: str, out: str) -> None:
    """
    Produces an XML file from a CSV file, going through an intermediate JSON file.

    The intermediate JSON file is written next to the input as ``<i>.json``.

    :param i: Path to the input CSV file.
    :param out: Path to the output XML file.
    """
    json_path = f"{i}.json"
    csv_to_json(i, json_path)
    write_content(out, OutputFormat.convert(readfromjson(json_path), OutputFormat.XML_RAW))
@exception_handler
def json_to_xml(i: str, out: str) -> None:
    """
    Reads a JSON file and writes its content as XML.

    :param i: Path to the input JSON file.
    :param out: Path to the output XML file.
    """
    payload = readfromjson(i)
    write_content(out, OutputFormat.convert(payload, OutputFormat.XML_RAW))
@exception_handler
def json_to_txt(i: str, out: str) -> None:
    """
    Reads a JSON file and writes its content as a JSON string to a text file.

    :param i: Path to the input JSON file.
    :param out: Path to the output text file.
    """
    payload = readfromjson(i)
    write_content(out, OutputFormat.convert(payload, OutputFormat.JSON_STR))
@exception_handler
def json_to_yaml(i: str, out: str) -> None:
    """
    Reads a JSON file and writes its content as YAML.

    :param i: Path to the input JSON file.
    :param out: Path to the output YAML file.
    """
    payload = readfromjson(i)
    write_content(out, OutputFormat.convert(payload, OutputFormat.YAML))
@exception_handler
def yaml_to_json(i: str, out: str) -> None:
    """
    Loads a YAML file with the safe loader and writes the resulting data as JSON.

    :param i: Path to the input YAML file.
    :param out: Path to the output JSON file.
    """
    with open(i, encoding="utf-8") as stream:
        document = yaml.safe_load(stream)
    write_content(out, document)
@exception_handler
def json_to_csv(i: str, out: str) -> None:
    """
    Reads a JSON file and writes its content as CSV.

    :param i: Path to the input JSON file.
    :param out: Path to the output CSV file.
    """
    payload = readfromjson(i)
    write_content(out, OutputFormat.convert(payload, OutputFormat.CSV))
@exception_handler
def xml_to_json(i: str, out: str) -> None:
    """
    Parses an XML file into a mapping and writes it as JSON.

    :param i: Path to the input XML file.
    :param out: Path to the output JSON file.
    """
    xml_text = get_content(i)
    parsed = xmltodict.parse(xml_text)
    write_content(out, parsed)
@exception_handler
def html_to_txt(i: str, out: str) -> None:
    """
    Extracts the text of an HTML file and writes it to a text file.

    The stripped text fragments of the document are joined with single spaces.

    :param i: Path to the input HTML file.
    :param out: Path to the output text file.
    """
    soup = BeautifulSoup(get_content(i), "html.parser")
    text = " ".join(soup.stripped_strings)
    write_content(out, text)
@exception_handler
def yaml_to_csv(i: str, out: str) -> None:
    """
    Produces a CSV file from a YAML file, going through an intermediate JSON file.

    The intermediate JSON file is written next to the input as ``<i>.json``.

    :param i: Path to the input YAML file.
    :param out: Path to the output CSV file.
    """
    json_path = f"{i}.json"
    yaml_to_json(i, json_path)
    json_to_csv(json_path, out)
@exception_handler
def yaml_to_xml(i: str, out: str) -> None:
    """
    Loads a YAML file with the safe loader and writes the resulting data as XML.

    :param i: Path to the input YAML file.
    :param out: Path to the output XML file.
    """
    with open(i, encoding="utf-8") as stream:
        document = yaml.safe_load(stream)
    write_content(out, OutputFormat.convert(document, OutputFormat.XML_RAW))
@exception_handler
def xml_to_yaml(i: str, out: str) -> None:
    """
    Parses an XML file into a mapping and writes it as YAML.

    :param i: Path to the input XML file.
    :param out: Path to the output YAML file.
    """
    parsed = xmltodict.parse(get_content(i))
    write_content(out, OutputFormat.convert(parsed, OutputFormat.YAML))
@exception_handler
def csv_to_yaml(i: str, out: str) -> None:
    """
    Produces a YAML file from a CSV file, going through an intermediate JSON file.

    The intermediate JSON file is written next to the input as ``<i>.json``.

    :param i: Path to the input CSV file.
    :param out: Path to the output YAML file.
    """
    json_path = f"{i}.json"
    csv_to_json(i, json_path)
    json_to_yaml(json_path, out)