Source code for tamr_client.schema_mapping.attribute_mapping

"""
See https://docs.tamr.com/new/reference/retrieve-projects-mappings
"""
from copy import deepcopy
from typing import Dict, List, Optional, Tuple

from tamr_client import attribute, dataset, response
from tamr_client import project as tc_project
from tamr_client._types import (
    Attribute,
    AttributeMapping,
    Instance,
    JsonDict,
    Project,
    Session,
    URL,
)
from tamr_client.exception import TamrClientException


[docs]class NotFound(TamrClientException): """Raised when referencing an attribute mapping that does not exist on the server.""" pass
[docs]class AlreadyExists(TamrClientException): """Raised when an attribute mapping with these specifications already exists.""" pass
[docs]class Ambiguous(TamrClientException): """Raised when an attribute mapping specification is incomplete, ambiguous, or contradictory.""" pass
[docs]def create( session: Session, project: Project, input_attribute: Attribute, unified_attribute: Attribute, ) -> AttributeMapping: """Create a mapping in Tamr between input attributes and unified attributes of the given project. Args: project: Tamr project input_attribute: The attribute of a source dataset to map unified_attribute: The attribute of the unified dataset to map onto Returns: Attribute mapping created in Tamr Raises: attribute_mapping.AlreadyExists: If an attribute mapping with these specifications already exists. requests.HTTPError: If any other HTTP error is encountered. """ data = { "relativeInputAttributeId": input_attribute.url.path, "relativeUnifiedAttributeId": unified_attribute.url.path, } r = session.post(url=str(project.url) + "/attributeMappings", json=data) if r.status_code == 204: raise AlreadyExists( "An attribute mapping with these specifications already exists." ) if r.status_code == 400: raise Ambiguous(r.json()["message"]) if r.status_code == 404: if "dataset" in r.json()["message"].lower(): raise dataset.NotFound(r.json()["message"]) elif "project" in r.json()["message"].lower(): raise tc_project.NotFound(r.json()["message"]) data = response.successful(r).json() instance = project.url.instance url = URL(instance=instance, path=data["relativeId"]) return AttributeMapping( url, input_attribute=attribute._attribute._by_url( session, URL(instance=instance, path=data["relativeInputAttributeId"]), ), unified_attribute=attribute._attribute._by_url( session, URL(instance=instance, path=data["relativeUnifiedAttributeId"]), ), )
[docs]def get_all(session: Session, tamr_project: Project) -> List[AttributeMapping]: """Get all attribute mappings of a Tamr project Args: tamr_project: Tamr project Returns: The attribute mappings of the project Raises: requests.HTTPError: If an HTTP error is encountered. """ url = str(tamr_project.url) + "/attributeMappings" r = session.get(url=url) data = response.successful(r).json() mapping_list = [] attribute_memo: Dict[URL, Attribute] = {} for mapping_data in data: mapping, attribute_memo = _get( session, tamr_project.url.instance, mapping_data, attribute_memo=attribute_memo, ) mapping_list.append(mapping) return mapping_list
def _get( session: Session, instance: Instance, data: JsonDict, *, attribute_memo: Optional[Dict[URL, Attribute]] = None ) -> Tuple[AttributeMapping, Dict[URL, Attribute]]: """Construct an attribute mapping from server response This is a utility function used by `attribute_mapping.get_all`. When an attribute is fetched, all other attributes in its dataset are also retrieved and stored in a dictionary to limit the number of requests made to the server Args: instance: A Tamr instance data: The JSON body of the server response retrieve a project's attribute mappings attribute_memo: Dictionary of fetched attributes used to avoid redundant requests Returns: The attribute mapping and updated memo """ if attribute_memo is None: attribute_memo = {} cp = deepcopy(data) input_attribute_url = URL(instance=instance, path=cp["relativeInputAttributeId"]) unified_attribute_url = URL( instance=instance, path=cp["relativeUnifiedAttributeId"] ) if input_attribute_url not in attribute_memo: attr_dataset = dataset.by_resource_id( session, instance, cp["relativeInputAttributeId"].split("/")[1] ) attribute_memo = { **attribute_memo, **{attr.url: attr for attr in dataset.attributes(session, attr_dataset)}, } if unified_attribute_url not in attribute_memo: attr_dataset = dataset.by_resource_id( session, instance, cp["relativeUnifiedAttributeId"].split("/")[1] ) attribute_memo = { **attribute_memo, **{attr.url: attr for attr in dataset.attributes(session, attr_dataset)}, } return ( AttributeMapping( url=URL(instance=instance, path=cp["relativeId"]), input_attribute=attribute_memo[input_attribute_url], unified_attribute=attribute_memo[unified_attribute_url], ), attribute_memo, )
[docs]def delete(session: Session, attribute_mapping: AttributeMapping): """Delete an existing attribute mapping Args: attribute_mapping: Existing attribute mapping to delete Raises: attribute_mapping.NotFound: If no attribute mapping could be found at the specified URL. Corresponds to a 404 HTTP error. requests.HTTPError: If any other HTTP error is encountered. """ r = session.delete(str(attribute_mapping.url)) if r.status_code == 404: raise NotFound(str(attribute_mapping.url)) response.successful(r)