Source code for weaver.notify

import base64
import logging
import os
import secrets
import smtplib
from typing import TYPE_CHECKING

from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from mako.template import Template
from pyramid.settings import asbool

from weaver.datatype import Job
from weaver.utils import bytes2str, get_settings, str2bytes

if TYPE_CHECKING:
    from weaver.typedefs import AnySettingsContainer, SettingsType

[docs]LOGGER = logging.getLogger(__name__)
__DEFAULT_TEMPLATE__ = """ <%doc> This is an example notification message to be sent by email when a job is done. It is formatted using the Mako template library (https://www.makotemplates.org/). The content must also include the message header. The provided variables are: to: Recipient's address job: weaver.datatype.Job object settings: application settings And every variable returned by the `weaver.datatype.Job.json` method. Below is a non-exhaustive list of example parameters from this method. Refer to the method for complete listing. status: succeeded, failed logs: url to the logs jobID: example "617f23d3-f474-47f9-a8ec-55da9dd6ac71" result: url to the outputs duration: example "0:01:02" message: example "Job succeeded." percentCompleted: example 100 </%doc> From: Weaver To: ${to} Subject: Job ${job.process} ${job.status.title()} Content-Type: text/plain; charset=UTF-8 Dear user, Your job submitted on ${job.created.strftime("%Y/%m/%d %H:%M %Z")} to ${settings.get("weaver.url")} ${job.status}. % if job.status == "succeeded": You can retrieve the output(s) at the following link: ${job.results_url(settings)} % elif job.status == "failed": You can retrieve potential error details from the following link: ${job.exceptions_url(settings)} % endif The job logs are available at the following link: ${job.logs_url(settings)} Regards, Weaver """ __SALT_LENGTH__ = 16 __TOKEN_LENGTH__ = 32 __ROUNDS_LENGTH__ = 4 __DEFAULT_ROUNDS__ = 100_000
[docs]def notify_job_complete(job, to_email_recipient, container): # type: (Job, str, AnySettingsContainer) -> None """ Send email notification of a job completion. """ settings = get_settings(container) smtp_host = settings.get("weaver.wps_email_notify_smtp_host") from_addr = settings.get("weaver.wps_email_notify_from_addr") password = settings.get("weaver.wps_email_notify_password") timeout = int(settings.get("weaver.wps_email_notify_timeout") or 10) port = settings.get("weaver.wps_email_notify_port") ssl = asbool(settings.get("weaver.wps_email_notify_ssl", True)) # an example template is located in # weaver/wps_restapi/templates/notification_email_example.mako template_dir = settings.get("weaver.wps_email_notify_template_dir") or "" if not smtp_host or not port: raise ValueError("The email server configuration is missing.") port = int(port) # find appropriate template according to settings if not os.path.isdir(template_dir): LOGGER.warning("No default email template directory configured. Using default format.") template = Template(text=__DEFAULT_TEMPLATE__) # nosec: B702 else: default_name = settings.get("weaver.wps_email_notify_template_default", "default.mako") process_name = f"{job.process!s}.mako" default_template = os.path.join(template_dir, default_name) process_template = os.path.join(template_dir, process_name) if os.path.isfile(process_template): template = Template(filename=process_template) # nosec: B702 elif os.path.isfile(default_template): template = Template(filename=default_template) # nosec: B702 else: raise IOError(f"Template file doesn't exist: OneOf[{process_name!s}, {default_name!s}]") job_json = job.json(settings) contents = template.render(to=to_email_recipient, job=job, settings=settings, **job_json) message = f"{contents}".strip("\n") if ssl: server = smtplib.SMTP_SSL(smtp_host, port, timeout=timeout) else: server = smtplib.SMTP(smtp_host, port, timeout=timeout) server.ehlo() try: server.starttls() server.ehlo() except smtplib.SMTPException: pass try: if password: server.login(from_addr, password) result = server.sendmail(from_addr, to_email_recipient, message.encode("utf8")) finally: server.close() if result: code, error_message = result[to_email_recipient] raise IOError(f"Code: {code}, Message: {error_message}")
# https://stackoverflow.com/a/55147077
[docs]def get_crypto_key(settings, salt, rounds): # type: (SettingsType, bytes, int) -> bytes """ Get the cryptographic key used for encoding and decoding the email. """ backend = default_backend() pwd = str2bytes(settings.get("weaver.wps_email_encrypt_salt")) # use old param for backward-compat even if not salt kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=__TOKEN_LENGTH__, salt=salt, iterations=rounds, backend=backend) return base64.urlsafe_b64encode(kdf.derive(pwd))
[docs]def encrypt_email(email, settings): # type: (str, SettingsType) -> str if not email or not isinstance(email, str): raise TypeError(f"Invalid email: {email!s}") LOGGER.debug("Job email encrypt.") try: salt = secrets.token_bytes(__SALT_LENGTH__) rounds = int(settings.get("weaver.wps_email_encrypt_rounds", __DEFAULT_ROUNDS__)) iters = rounds.to_bytes(__ROUNDS_LENGTH__, "big") key = get_crypto_key(settings, salt, rounds) msg = base64.urlsafe_b64decode(Fernet(key).encrypt(str2bytes(email))) token = salt + iters + msg return bytes2str(base64.urlsafe_b64encode(token)) except Exception as ex: LOGGER.debug("Job email encrypt failed [%r].", ex) raise ValueError("Cannot register job, server not properly configured for notification email.")
[docs]def decrypt_email(email, settings): # type: (str, SettingsType) -> str if not email or not isinstance(email, str): raise TypeError(f"Invalid email: {email!s}") LOGGER.debug("Job email decrypt.") try: token = base64.urlsafe_b64decode(str2bytes(email)) salt = token[:__SALT_LENGTH__] iters = int.from_bytes(token[__SALT_LENGTH__:__SALT_LENGTH__ + __ROUNDS_LENGTH__], "big") token = base64.urlsafe_b64encode(token[__SALT_LENGTH__ + __ROUNDS_LENGTH__:]) key = get_crypto_key(settings, salt, iters) return bytes2str(Fernet(key).decrypt(token)) except Exception as ex: LOGGER.debug("Job email decrypt failed [%r].", ex) raise ValueError("Cannot complete job, server not properly configured for notification email.")