"""
See https://docs.tamr.com/reference/record
"The recommended approach for interacting with records is to use the :func:`~tamr_client.record.upsert` and
:func:`~tamr_client.record.delete` functions for all use cases they can handle. For more advanced use cases, the
underlying _update function can be used directly."
"""
import json
from typing import cast, Dict, IO, Iterable, Optional
import tamr_client as tc
from tamr_client.types import JsonDict
[docs]class PrimaryKeyNotFound(Exception):
"""Raised when referencing a primary key by name that does not exist."""
pass
def _update(
session: tc.Session, dataset: tc.Dataset, updates: Iterable[Dict]
) -> JsonDict:
"""Send a batch of record creations/updates/deletions to this dataset.
You probably want to use :func:`~tamr_client.record.upsert`
or :func:`~tamr_client.record.delete` instead.
Args:
dataset: Dataset containing records to be updated
updates: Each update should be formatted as specified in the `Public Docs for Dataset updates <https://docs.tamr.com/reference#modify-a-datasets-records>`_.
Returns:
JSON response body from server
Raises:
requests.HTTPError: If an HTTP error is encountered
"""
stringified_updates = (json.dumps(update) for update in updates)
# `requests` accepts a generator for `data` param, but stubs for `requests` in https://github.com/python/typeshed expects this to be a file-like object
io_updates = cast(IO, stringified_updates)
r = session.post(
str(dataset.url) + ":updateRecords",
headers={"Content-Encoding": "utf-8"},
data=io_updates,
)
return tc.response.successful(r).json()
def upsert(
session: tc.Session,
dataset: tc.Dataset,
records: Iterable[Dict],
*,
primary_key_name: Optional[str] = None,
) -> JsonDict:
"""Create or update the specified records.
Args:
dataset: Dataset to receive record updates
records: The records to update, as dictionaries
primary_key_name: The primary key for these records, which must be a key in each record dictionary.
By default the key_attribute_name of dataset
Returns:
JSON response body from server
Raises:
requests.HTTPError: If an HTTP error is encountered
PrimaryKeyNotFound: If primary_key_name does not match dataset primary key
PrimaryKeyNotFound: If primary_key_name not in a record dictionary
"""
if primary_key_name is None:
primary_key_name = dataset.key_attribute_names[0]
if primary_key_name not in dataset.key_attribute_names:
raise PrimaryKeyNotFound(
f"Primary key: {primary_key_name} is not in dataset key attribute names: {dataset.key_attribute_names}"
)
updates = (
_create_command(record, primary_key_name=primary_key_name) for record in records
)
return _update(session, dataset, updates)
def delete(
session: tc.Session,
dataset: tc.Dataset,
records: Iterable[Dict],
*,
primary_key_name: Optional[str] = None,
) -> JsonDict:
"""Deletes the specified records, based on primary key values. Does not check that other attribute values match.
Args:
dataset: Dataset from which to delete records
records: The records to update, as dictionaries
primary_key_name: The primary key for these records, which must be a key in each record dictionary.
By default the key_attribute_name of dataset
Returns:
JSON response body from server
Raises:
requests.HTTPError: If an HTTP error is encountered
PrimaryKeyNotFound: If primary_key_name does not match dataset primary key
PrimaryKeyNotFound: If primary_key_name not in a record dictionary
"""
if primary_key_name is None:
primary_key_name = dataset.key_attribute_names[0]
if primary_key_name not in dataset.key_attribute_names:
raise PrimaryKeyNotFound(
f"Primary key: {primary_key_name} is not in dataset key attribute names: {dataset.key_attribute_names}"
)
updates = (
_delete_command(record, primary_key_name=primary_key_name) for record in records
)
return _update(session, dataset, updates)
def _create_command(record: Dict, *, primary_key_name: str) -> Dict:
"""Generates the CREATE command formatted as specified in the `Public Docs for Dataset updates
<https://docs.tamr.com/reference#modify-a-datasets-records>`_.
Args:
record: The record to create, as a dictionary
primary_key_name: The primary key for this record, which must be a key in the dictionary
Returns:
The CREATE command in the proper format
"""
return {"action": "CREATE", "recordId": record[primary_key_name], "record": record}
def _delete_command(record: Dict, *, primary_key_name: str) -> Dict:
"""Generates the DELETE command formatted as specified in the `Public Docs for Dataset updates
<https://docs.tamr.com/reference#modify-a-datasets-records>`_.
Args:
record: The record to delete, as a dictionary
primary_key_name: The primary key for this record, which must be a key in the dictionary
Returns:
The DELETE command in the proper format
"""
return {"action": "DELETE", "recordId": record[primary_key_name]}