Source code for tamr_client.dataset.record

"The recommended approach for modifying records is to use the :func:`~tamr_client.record.upsert` and
:func:`~tamr_client.record.delete` functions for all use cases they can handle. For more advanced use cases, the
underlying :func:`~tamr_client.record._update` function can be used directly."
import json
from typing import cast, Dict, IO, Iterable, Iterator, Optional

from tamr_client import primary_key
from tamr_client import response
from tamr_client._types import AnyDataset, Dataset, JsonDict, Session

def _update(session: Session, dataset: Dataset, updates: Iterable[Dict]) -> JsonDict:
    """Send a batch of record creations/updates/deletions to this dataset.
    You probably want to use :func:`~tamr_client.record.upsert`
    or :func:`~tamr_client.record.delete` instead.

        dataset: Dataset containing records to be updated
        updates: Each update should be formatted as specified in the `Public Docs for Dataset updates <>`_.

        JSON response body from server

        requests.HTTPError: If an HTTP error is encountered
    stringified_updates = (json.dumps(update) for update in updates)
    # `requests` accepts a generator for `data` param, but stubs for `requests` in expects this to be a file-like object
    io_updates = cast(IO, stringified_updates)
    r =
        str(dataset.url) + ":updateRecords",
        headers={"Content-Encoding": "utf-8"},
    return response.successful(r).json()

def upsert(
    session: Session,
    dataset: Dataset,
    records: Iterable[Dict],
    primary_key_name: Optional[str] = None,
) -> JsonDict:
    """Create or update the specified records.

        dataset: Dataset to receive record updates
        records: The records to update, as dictionaries
        primary_key_name: The primary key for these records, which must be a key in each record dictionary.
            By default the key_attribute_name of dataset

        JSON response body from server

        requests.HTTPError: If an HTTP error is encountered
        primary_key.NotFound: If primary_key_name does not match dataset primary key
        primary_key.NotFound: If primary_key_name not in a record dictionary
    if primary_key_name is None:
        primary_key_name = dataset.key_attribute_names[0]

    if primary_key_name not in dataset.key_attribute_names:
        raise primary_key.NotFound(
            f"Primary key: {primary_key_name} is not in dataset key attribute names: {dataset.key_attribute_names}"
    updates = (
        _create_command(record, primary_key_name=primary_key_name) for record in records
    return _update(session, dataset, updates)

def delete(
    session: Session,
    dataset: Dataset,
    records: Iterable[Dict],
    primary_key_name: Optional[str] = None,
) -> JsonDict:
    """Deletes the specified records, based on primary key values.  Does not check that other attribute values match.

        dataset: Dataset from which to delete records
        records: The records to update, as dictionaries
        primary_key_name: The primary key for these records, which must be a key in each record dictionary.
            By default the key_attribute_name of dataset

        JSON response body from server

        requests.HTTPError: If an HTTP error is encountered
        primary_key.NotFound: If primary_key_name does not match dataset primary key
        primary_key.NotFound: If primary_key_name not in a record dictionary
    if primary_key_name is None:
        primary_key_name = dataset.key_attribute_names[0]

    if primary_key_name not in dataset.key_attribute_names:
        raise primary_key.NotFound(
            f"Primary key: {primary_key_name} is not in dataset key attribute names: {dataset.key_attribute_names}"
    updates = (
        _delete_command(record, primary_key_name=primary_key_name) for record in records
    return _update(session, dataset, updates)

def _create_command(record: Dict, *, primary_key_name: str) -> Dict:
    """Generates the CREATE command formatted as specified in the `Public Docs for Dataset updates

        record: The record to create, as a dictionary
        primary_key_name: The primary key for this record, which must be a key in the dictionary

        The CREATE command in the proper format
    return {"action": "CREATE", "recordId": record[primary_key_name], "record": record}

def _delete_command(record: Dict, *, primary_key_name: str) -> Dict:
    """Generates the DELETE command formatted as specified in the `Public Docs for Dataset updates

        record: The record to delete, as a dictionary
        primary_key_name: The primary key for this record, which must be a key in the dictionary

        The DELETE command in the proper format
    return {"action": "DELETE", "recordId": record[primary_key_name]}

def stream(session: Session, dataset: AnyDataset) -> Iterator[JsonDict]:
    """Stream the records in this dataset as Python dictionaries.

        dataset: Dataset from which to stream records

        Python generator yielding records
    with session.get(str(dataset.url) + "/records", stream=True) as r:
        return response.ndjson(r)