"""
Definitions related to :term:`Provenance` features and the :term:`W3C` ``PROV`` specification.
"""
import hashlib
from typing import TYPE_CHECKING, cast
from urllib.parse import urlparse
from cwltool.cwlprov import provenance_constants as cwl_prov_const
from cwltool.cwlprov.ro import ResearchObject
from prov import constants as prov_const
from weaver.__meta__ import __version__ as weaver_version
from weaver.base import Constants
from weaver.formats import ContentType, OutputFormat
from weaver.utils import get_weaver_url
if TYPE_CHECKING:
from typing import Any, List, Optional, Tuple, Union
from uuid import UUID
from cwltool.cwlprov.provenance_profile import ProvenanceProfile
from cwltool.stdfsaccess import StdFsAccess
from prov.model import ProvDocument
from weaver.base import EnumType
from weaver.datatype import Job
from weaver.formats import AnyContentType
from weaver.typedefs import AnyKey, AnySettingsContainer
class ProvenancePathType(Constants):
    """
    Path fragments used to reference the different :term:`Provenance` representations.

    Every value is rooted under ``/prov``. The last path segment is the short *type* name of the representation.
    """

    PROV_INFO = "/prov/info"
    PROV_OUTPUTS = "/prov/outputs"
    PROV_RUNS = "/prov/runs"

    @classmethod
    def types(cls):
        # type: () -> List[str]
        """
        Obtain the short type name (last path segment) of every value defined by this class.
        """
        return list(map(cls.as_type, cls.values()))

    @classmethod
    def as_type(cls, prov):
        # type: (Any) -> Optional[str]
        """
        Convert a provenance reference into its short type name.

        :param prov: Reference to resolve using :meth:`get`.
        :return: Last path segment of the resolved path, or ``None`` if the resolution is not a string.
        """
        resolved = cls.get(prov)
        if not isinstance(resolved, str):
            return None
        # everything following the last slash (or the whole string if there is no slash)
        return resolved.rpartition("/")[-1]

    @classmethod
    def get(  # pylint: disable=W0221,W0237  # arguments differ/renamed for clarity
        cls,
        prov,           # type: Union[AnyKey, EnumType, "ProvenancePathType"]
        default=None,   # type: Optional[Any]
        run_id=None,    # type: Optional[str]
    ):                  # type: (...) -> Optional["ProvenancePathType"]
        """
        Resolve a provenance reference into its path, optionally extended by a run identifier.

        :param prov: Member, value or string (with or without the leading ``/`` and ``/prov`` prefix) to resolve.
        :param default: Value returned whenever the reference cannot be resolved.
        :param run_id:
            Identifier appended as an additional path segment.
            Only paths ending with ``run``, ``inputs`` or ``outputs`` accept it; any other path yields ``default``.
        :return: Resolved path, or ``default`` if no match applies.
        """
        known = super().get(prov)
        if run_id is None and known is not None:
            return known

        # from here on, only string references can be normalized into a path
        if not isinstance(prov, str):
            return default
        if not known and prov.strip("/") not in ProvenancePathType.types():
            return default

        path = prov if prov.startswith("/") else f"/{prov}"
        if not path.startswith("/prov"):
            path = f"/prov{path}"

        if run_id:
            # NOTE(review): no constant ending with 'run' or 'inputs' is visible in this class - confirm they exist
            if path.rsplit("/", 1)[-1] not in ["run", "inputs", "outputs"]:
                return default
            path = f"{path}/{run_id}"
        return cast("ProvenancePathType", path)
class WeaverResearchObject(ResearchObject):
    """
    Defines extended :term:`Provenance` details with `Weaver` operations and referencing the active server instance.
    """

    def __init__(self, job, settings, fs_access, temp_prefix_ro="tmp", orcid="", full_name=""):
        # type: (Job, AnySettingsContainer, StdFsAccess, str, str, str) -> None
        """
        Initialize the research object and associate it with the :term:`Job` and application settings.

        :param job: Job for which provenance is collected. Its UUID replaces the research object UUID.
        :param settings: Application settings used to resolve the `Weaver` URL and server metadata.
        :param fs_access: Forwarded as is to the parent class.
        :param temp_prefix_ro: Forwarded as is to the parent class.
        :param orcid: Forwarded as is to the parent class.
        :param full_name: Forwarded as is to the parent class.
        """
        super(WeaverResearchObject, self).__init__(fs_access, temp_prefix_ro, orcid, full_name)

        # keep the job reference, since 'initialize_provenance' reads its details through 'self.job'
        self.job = job
        # rewrite auto-initialized random UUIDs with Weaver-specific references
        self.ro_uuid = job.uuid
        self.base_uri = f"arcp://uuid,{self.ro_uuid}/"
        self.settings = settings

    @staticmethod
    def sha1_uuid(document, identifier):
        # type: (ProvDocument, str) -> str
        """
        Generate a prefixed SHA1 hash from the identifier value.

        :param document: Document whose namespace registered for the ``cwlprov`` data key provides the prefix.
        :param identifier: Value to hash.
        :return: Identifier formatted as ``<prefix>:<sha1-hex-digest>``.
        """
        sha1_ns = document._namespaces[cwl_prov_const.DATA]
        # hash only serves as a stable identifier, hence explicitly flagged as not used for security
        sha1_id = f"{sha1_ns.prefix}:{hashlib.sha1(identifier.encode(), usedforsecurity=False).hexdigest()}"
        return sha1_id

    def initialize_provenance(self, full_name, host_provenance, user_provenance, orcid, fsaccess, run_uuid=None):
        # type: (str, bool, bool, str, StdFsAccess, Optional[UUID]) -> ProvenanceProfile
        """
        Hook `Weaver` metadata onto user provenance step.

        All arguments are forwarded unchanged to the parent implementation. The document of the resulting profile
        is then extended with records describing the `Weaver` source code, the running `Weaver` instance,
        the `CRIM` organization, the server provider (if configured), the :term:`Job` and its :term:`Process`,
        along with the relations between them.

        :return: Provenance profile created by the parent class, with the extended document.
        """
        prov_profile = super(WeaverResearchObject, self).initialize_provenance(
            full_name=full_name,
            host_provenance=host_provenance,
            user_provenance=user_provenance,
            orcid=orcid,
            fsaccess=fsaccess,
            run_uuid=run_uuid,
        )
        document = prov_profile.document

        doi_ns = document.add_namespace("doi", "https://doi.org/")
        weaver_full_name = f"crim-ca/weaver:{weaver_version}"

        # source code repository of the software
        weaver_code_url = "https://github.com/crim-ca/weaver"
        weaver_code_sha1 = self.sha1_uuid(document, weaver_code_url)
        weaver_code_entity = document.entity(
            weaver_code_sha1,
            {
                prov_const.PROV_TYPE: prov_const.PROV["PrimarySource"],
                prov_const.PROV_LABEL: "Source code repository",
                prov_const.PROV_LOCATION: weaver_code_url,
            },
        )

        # running server instance, identified by the hash of its URL
        weaver_url = get_weaver_url(self.settings)
        weaver_desc = self.settings.get(
            "weaver.wps_metadata_identification_abstract",
            "Weaver OGC API Processes Server"
        )
        weaver_instance_sha1 = self.sha1_uuid(document, weaver_url)
        # list of tuples rather than a mapping, since the label attribute is provided twice
        weaver_instance_meta = [
            (prov_const.PROV_TYPE, prov_const.PROV["SoftwareAgent"]),
            (prov_const.PROV_LOCATION, weaver_url),
            (prov_const.PROV_LABEL, weaver_desc),
            (prov_const.PROV_LABEL, weaver_full_name),
            (prov_const.PROV_ATTR_GENERAL_ENTITY, weaver_code_sha1),
            (prov_const.PROV_ATTR_SPECIFIC_ENTITY, f"{doi_ns.prefix}:10.5281/zenodo.14210717"),  # see CITATION.cff
        ]
        weaver_instance_agent = document.agent(weaver_instance_sha1, weaver_instance_meta)

        # organization to which the source code is attributed
        crim_name = "Computer Research Institute of Montréal"
        crim_sha1 = self.sha1_uuid(document, crim_name)
        crim_entity = document.entity(
            crim_sha1,
            {
                prov_const.PROV_TYPE: prov_const.PROV["Organization"],
                cwl_prov_const.FOAF["name"]: crim_name,
                cwl_prov_const.SCHEMA["name"]: crim_name,
            }
        )

        # optional server provider, created only if a name and/or URL is configured in the settings
        server_provider_name = self.settings.get("weaver.wps_metadata_provider_name")
        server_provider_url = self.settings.get("weaver.wps_metadata_provider_url")
        server_provider_meta = []
        server_provider_entity = None
        if server_provider_name:
            server_provider_meta.extend([
                (cwl_prov_const.FOAF["name"], server_provider_name),
                (cwl_prov_const.SCHEMA["name"], server_provider_name),
            ])
        if server_provider_url:
            server_provider_meta.extend([
                (prov_const.PROV_LOCATION, server_provider_url),
            ])
        if server_provider_meta:
            # URL is preferred over the name to generate the identifier
            server_provider_id = server_provider_url or server_provider_name
            server_provider_sha1 = self.sha1_uuid(document, server_provider_id)
            server_provider_meta.extend([
                (prov_const.PROV_TYPE, prov_const.PROV["Organization"]),
                (prov_const.PROV_LABEL, "Server Provider"),
            ])
            server_provider_entity = document.entity(
                server_provider_sha1,
                server_provider_meta,
            )

        # job execution, identified by the URN of its UUID
        job_entity = document.entity(
            self.job.uuid.urn,
            {
                prov_const.PROV_TYPE: cwl_prov_const.WFDESC["ProcessRun"],
                prov_const.PROV_LOCATION: self.job.job_url(self.settings),
                prov_const.PROV_LABEL: "Job Information",
            }
        )

        # process executed by the job, with its identifier scoped under the instance identifier
        proc_url = self.job.process_url(self.settings)
        proc_id = f"{self.job.service}:{self.job.process}" if self.job.service else self.job.process
        proc_uuid = f"{weaver_instance_sha1}:{proc_id}"
        proc_entity = document.entity(
            proc_uuid,
            {
                prov_const.PROV_TYPE: cwl_prov_const.WFDESC["Process"],
                prov_const.PROV_LOCATION: proc_url,
                prov_const.PROV_LABEL: "Process Description",
            }
        )

        # following agents are expected to exist (created by inherited class)
        cwltool_agent = document.get_record(cwl_prov_const.ACCOUNT_UUID)[0]
        user_agent = document.get_record(cwl_prov_const.USER_UUID)[0]
        wf_agent = document.get_record(self.engine_uuid)[0]  # current job run aligned with cwl workflow

        # define relationships cross-references: https://wf4ever.github.io/ro/wfprov.owl
        document.primary_source(weaver_instance_agent, weaver_code_entity)
        document.actedOnBehalfOf(weaver_instance_agent, user_agent)
        document.specializationOf(weaver_instance_agent, cwltool_agent)
        document.attribution(crim_entity, weaver_code_entity)
        document.wasDerivedFrom(cwltool_agent, weaver_instance_agent)
        document.wasStartedBy(job_entity, weaver_instance_agent)
        document.wasStartedBy(wf_agent, job_entity, time=self.job.created)
        document.specializationOf(wf_agent, job_entity)
        document.alternateOf(wf_agent, job_entity)
        document.wasGeneratedBy(job_entity, proc_entity)
        if server_provider_entity:
            document.derivation(server_provider_entity, weaver_instance_agent)
            document.attribution(server_provider_entity, weaver_instance_agent)
        return prov_profile

    def resolve_user(self):
        # type: () -> Tuple[str, str]
        """
        Override :mod:`cwltool` default machine user.

        :return: The `Weaver` name and version string, repeated for both items of the tuple.
        """
        weaver_full_name = f"crim-ca/weaver:{weaver_version}"
        return weaver_full_name, weaver_full_name

    def resolve_host(self):
        # type: () -> Tuple[str, str]
        """
        Override :mod:`cwltool` default machine host.

        :return: Hostname extracted from the `Weaver` URL, followed by the `Weaver` URL itself.
        """
        weaver_url = get_weaver_url(self.settings)
        # NOTE(review): 'hostname' is None if the URL has no network location - confirm the URL is always absolute
        weaver_fqdn = urlparse(weaver_url).hostname
        return weaver_fqdn, weaver_url