Source code for weaver.notify

import base64
import logging
import os
import secrets
import smtplib
from typing import TYPE_CHECKING

from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from mako.template import Template
from pyramid.settings import asbool

from weaver import WEAVER_MODULE_DIR
from weaver.datatype import Job
from weaver.processes.constants import JobInputsOutputsSchema
from weaver.status import Status
from weaver.utils import bytes2str, fully_qualified_name, get_settings, request_extra, str2bytes
from weaver.wps_restapi.jobs.utils import get_results

if TYPE_CHECKING:
    from typing import Optional

    from weaver.typedefs import AnySettingsContainer, ExecutionSubscribers, JSON, SettingsType

[docs] LOGGER = logging.getLogger(__name__)
__SALT_LENGTH__ = 16 __TOKEN_LENGTH__ = 32 __ROUNDS_LENGTH__ = 4 __DEFAULT_ROUNDS__ = 100_000
[docs] def resolve_email_template(job, settings): # type: (Job, SettingsType) -> Template """ Finds the most appropriate Mako Template email notification file based on configuration and :term:`Job` context. The example template is used by default *ONLY* if the template directory was not overridden. If overridden, failing to match any of the template file locations will raise to report the issue instead of silently using the default. .. seealso:: https://github.com/crim-ca/weaver/blob/master/weaver/wps_restapi/templates/notification_email_example.mako :raises IOError: If the template directory was configured explicitly, but cannot be resolved, or if any of the possible combinations of template file names cannot be resolved under that directory. :returns: Matched template instance based on resolution order as described in the documentation. """ template_dir = settings.get("weaver.wps_email_notify_template_dir") or "" # find appropriate template according to settings if not template_dir and not os.path.isdir(template_dir): LOGGER.warning("No default email template directory configured. Using default template.") template_file = os.path.join(WEAVER_MODULE_DIR, "wps_restapi/templates/notification_email_example.mako") template = Template(filename=template_file) # nosec: B702 else: default_setting = "weaver.wps_email_notify_template_default" default_default = "default.mako" default_name = settings.get(default_setting) or default_default process_name = f"{job.process!s}.mako" process_status_name = f"{job.process!s}/{job.status!s}.mako" default_template = os.path.join(template_dir, default_name) process_template = os.path.join(template_dir, process_name) process_status_template = os.path.join(template_dir, process_status_name) if os.path.isfile(process_status_template): template = Template(filename=process_status_template) # nosec: B702 elif os.path.isfile(process_template): template = Template(filename=process_template) # nosec: B702 elif os.path.isfile(default_template): template = Template(filename=default_template) # nosec: B702 else: raise IOError( f"No Mako Template file could be resolved under the template directory: [{template_dir}]. Expected " f"OneOf[{process_status_name!s}, {process_name!s}, {{{default_setting!s}}}, {default_default!s}]" ) return template
[docs] def notify_job_email(job, to_email_recipient, container): # type: (Job, str, AnySettingsContainer) -> None """ Send email notification of a :term:`Job` status. """ settings = get_settings(container) smtp_host = settings.get("weaver.wps_email_notify_smtp_host") from_addr = settings.get("weaver.wps_email_notify_from_addr") password = settings.get("weaver.wps_email_notify_password") timeout = int(settings.get("weaver.wps_email_notify_timeout") or 10) port = settings.get("weaver.wps_email_notify_port") ssl = asbool(settings.get("weaver.wps_email_notify_ssl", True)) if not smtp_host or not port: # pragma: no cover # only raise to warn service manager # note: don't expose the values to avoid leaking them in logs raise ValueError( "The email server configuration is missing or incomplete. " "Validate that SMTP host and port are properly configured." ) port = int(port) template = resolve_email_template(job, settings) job_json = job.json(settings) contents = template.render(to=to_email_recipient, job=job, settings=settings, **job_json) message = f"{contents}".strip("\n") if ssl: server = smtplib.SMTP_SSL(smtp_host, port, timeout=timeout) else: server = smtplib.SMTP(smtp_host, port, timeout=timeout) server.ehlo() try: server.starttls() server.ehlo() except smtplib.SMTPException: pass try: if password: server.login(from_addr, password) result = server.sendmail(from_addr, to_email_recipient, message.encode("utf8")) finally: server.close() if result: code, error_message = result[to_email_recipient] raise IOError(f"Code: {code}, Message: {error_message}")
# https://stackoverflow.com/a/55147077
[docs] def get_crypto_key(settings, salt, rounds): # type: (SettingsType, bytes, int) -> bytes """ Get the cryptographic key used for encoding and decoding the email. """ backend = default_backend() pwd = str2bytes(settings.get("weaver.wps_email_encrypt_salt")) # use old param for backward-compat even if not salt kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=__TOKEN_LENGTH__, salt=salt, iterations=rounds, backend=backend) return base64.urlsafe_b64encode(kdf.derive(pwd))
[docs] def encrypt_email(email, settings): # type: (str, SettingsType) -> str if not email or not isinstance(email, str): raise TypeError(f"Invalid email: {email!s}") LOGGER.debug("Job email encrypt.") try: salt = secrets.token_bytes(__SALT_LENGTH__) rounds = int(settings.get("weaver.wps_email_encrypt_rounds", __DEFAULT_ROUNDS__)) iters = rounds.to_bytes(__ROUNDS_LENGTH__, "big") key = get_crypto_key(settings, salt, rounds) msg = base64.urlsafe_b64decode(Fernet(key).encrypt(str2bytes(email))) token = salt + iters + msg return bytes2str(base64.urlsafe_b64encode(token)) except Exception as ex: LOGGER.debug("Job email encrypt failed [%r].", ex) raise ValueError("Cannot register job, server not properly configured for notification email.")
[docs] def decrypt_email(email, settings): # type: (str, SettingsType) -> str if not email or not isinstance(email, str): raise TypeError(f"Invalid email: {email!s}") LOGGER.debug("Job email decrypt.") try: token = base64.urlsafe_b64decode(str2bytes(email)) salt = token[:__SALT_LENGTH__] iters = int.from_bytes(token[__SALT_LENGTH__:__SALT_LENGTH__ + __ROUNDS_LENGTH__], "big") token = base64.urlsafe_b64encode(token[__SALT_LENGTH__ + __ROUNDS_LENGTH__:]) key = get_crypto_key(settings, salt, iters) return bytes2str(Fernet(key).decrypt(token)) except Exception as ex: LOGGER.debug("Job email decrypt failed [%r].", ex) raise ValueError("Cannot complete job, server not properly configured for notification email.")
[docs] def map_job_subscribers(job_body, settings): # type: (JSON, SettingsType) -> Optional[ExecutionSubscribers] """ Converts the :term:`Job` subscribers definition submitted at execution into a mapping for later reference. The returned contents must be sorted in the relevant :term:`Job` object. For backward compatibility, ``notification_email`` directly provided at the root will be used if corresponding definitions were not provided for the corresponding subscriber email fields. """ notification_email = job_body.get("notification_email") submit_subscribers = job_body.get("subscribers") or {} mapped_subscribers = {} for status, name, sub_type, alt in [ (Status.STARTED, "inProgressEmail", "emails", None), (Status.FAILED, "failedEmail", "emails", notification_email), (Status.SUCCEEDED, "successEmail", "emails", notification_email), (Status.STARTED, "inProgressUri", "callbacks", None), (Status.FAILED, "failedUri", "callbacks", None), (Status.SUCCEEDED, "successUri", "callbacks", None), ]: value = submit_subscribers.get(name) or alt if not value: continue if sub_type == "emails": value = encrypt_email(value, settings) mapped_subscribers.setdefault(sub_type, {}) mapped_subscribers[sub_type][status] = value return mapped_subscribers or None
[docs] def send_job_notification_email(job, task_logger, settings): # type: (Job, logging.Logger, SettingsType) -> None """ Sends a notification email about the execution status for the subscriber if requested during :term:`Job` submission. """ job_subs = job.subscribers or {} notification_email = job_subs.get("emails", {}).get(job.status) if notification_email: try: email = decrypt_email(notification_email, settings) notify_job_email(job, email, settings) message = "Notification email sent successfully." job.save_log(logger=task_logger, message=message) except Exception as exc: # pragma: no cover exception = f"{fully_qualified_name(exc)}: {exc!s}" message = f"Couldn't send notification email: [{exception}]" job.save_log(errors=message, logger=task_logger, message=message)
[docs] def send_job_callback_request(job, task_logger, settings): # type: (Job, logging.Logger, SettingsType) -> None """ Send a callback request about the execution status for the subscriber if requested at :term:`Job` execution. """ job_subs = job.subscribers or {} request_uri = job_subs.get("callbacks", {}).get(job.status) if request_uri: try: if job.status != Status.SUCCEEDED: body = job.json(settings) else: # OGC-compliant request body needed to respect 'subscribers' callback definition # (https://github.com/opengeospatial/ogcapi-processes/blob/master/core/examples/yaml/callbacks.yaml) body, _ = get_results( job, settings, value_key="value", schema=JobInputsOutputsSchema.OGC, link_references=False, ) request_extra( "POST", request_uri, json=body, allowed_codes=[200, 201, 202], cache_enabled=False, settings=settings, ) message = "Notification callback request sent successfully." job.save_log(logger=task_logger, message=message) except Exception as exc: # pragma: no cover exception = f"{fully_qualified_name(exc)}: {exc!s}" message = f"Couldn't send notification callback request: [{exception}]" job.save_log(errors=message, logger=task_logger, message=message)
[docs] def notify_job_subscribers(job, task_logger, settings): # type: (Job, logging.Logger, SettingsType) -> None """ Send notifications to all requested :term:`Job` subscribers according to its current status. All notification operations must be implemented as non-raising. In case of error, the :term:`Job` logs will be updated with relevant error details and resume execution. """ try: send_job_notification_email(job, task_logger, settings) send_job_callback_request(job, task_logger, settings) except Exception as exc: # pragma: no cover exception = f"{fully_qualified_name(exc)}: {exc!s}" message = ( f"Unhandled error occurred when processing a job notification subscriber: [{exception}]. " "Error ignored to resume execution." ) job.save_log(errors=message, logger=task_logger, message=message)