-
Notifications
You must be signed in to change notification settings - Fork 63
feat: Reworked MutateRows to use the data client #1290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: v3_staging
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,11 +17,10 @@ | |
| from typing import Set | ||
| import warnings | ||
|
|
||
| from google.api_core import timeout | ||
| from google.api_core.exceptions import GoogleAPICallError | ||
| from google.api_core.exceptions import Aborted | ||
| from google.api_core.exceptions import DeadlineExceeded | ||
| from google.api_core.exceptions import NotFound | ||
| from google.api_core.exceptions import RetryError | ||
| from google.api_core.exceptions import ServiceUnavailable | ||
| from google.api_core.exceptions import InternalServerError | ||
| from google.api_core.gapic_v1.method import DEFAULT | ||
|
|
@@ -31,17 +30,20 @@ | |
| from google.cloud.bigtable.backup import Backup | ||
| from google.cloud.bigtable.column_family import _gc_rule_from_pb | ||
| from google.cloud.bigtable.column_family import ColumnFamily | ||
| from google.cloud.bigtable.data._helpers import TABLE_DEFAULT | ||
| from google.cloud.bigtable.data.exceptions import ( | ||
| RetryExceptionGroup, | ||
| MutationsExceptionGroup, | ||
| ) | ||
| from google.cloud.bigtable.data.mutations import RowMutationEntry | ||
| from google.cloud.bigtable.batcher import MutationsBatcher | ||
| from google.cloud.bigtable.batcher import FLUSH_COUNT, MAX_MUTATION_SIZE | ||
| from google.cloud.bigtable.encryption_info import EncryptionInfo | ||
| from google.cloud.bigtable.policy import Policy | ||
| from google.cloud.bigtable.row import AppendRow | ||
| from google.cloud.bigtable.row import ConditionalRow | ||
| from google.cloud.bigtable.row import DirectRow | ||
| from google.cloud.bigtable.row_data import ( | ||
| PartialRowsData, | ||
| _retriable_internal_server_error, | ||
| ) | ||
| from google.cloud.bigtable.row_data import PartialRowsData | ||
| from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS | ||
| from google.cloud.bigtable.row_set import RowSet | ||
| from google.cloud.bigtable.row_set import RowRange | ||
|
|
@@ -52,6 +54,7 @@ | |
| from google.cloud.bigtable.admin.types import ( | ||
| bigtable_table_admin as table_admin_messages_v2_pb2, | ||
| ) | ||
| from google.rpc import code_pb2, status_pb2 | ||
|
|
||
| # Maximum number of mutations in bulk (MutateRowsRequest message): | ||
| # (https://cloud.google.com/bigtable/docs/reference/data/rpc/ | ||
|
|
@@ -714,6 +717,9 @@ def mutate_rows(self, rows, retry=DEFAULT_RETRY, timeout=DEFAULT): | |
| specify a ``retry`` strategy of "do-nothing", a deadline of ``0.0`` | ||
| can be specified. | ||
|
|
||
| If a deadline of ``None`` is specified, the deadline defaults to | ||
| a table-default of 600 seconds (10 minutes). | ||
|
|
||
| :type rows: list | ||
| :param rows: List or other iterable of :class:`.DirectRow` instances. | ||
|
|
||
|
|
@@ -731,18 +737,78 @@ def mutate_rows(self, rows, retry=DEFAULT_RETRY, timeout=DEFAULT): | |
| :returns: A list of response statuses (`google.rpc.status_pb2.Status`) | ||
| corresponding to success or failure of each row mutation | ||
| sent. These will be in the same order as the `rows`. | ||
|
|
||
| :raise: ValueError: If a row entry has no mutations, or too many mutations | ||
| """ | ||
| if timeout is DEFAULT: | ||
| timeout = self.mutation_timeout | ||
|
|
||
| retryable_mutate_rows = _RetryableMutateRowsWorker( | ||
| self._instance._client, | ||
| self.name, | ||
| rows, | ||
| app_profile_id=self._app_profile_id, | ||
| timeout=timeout, | ||
| retryable_errors = RETRYABLE_MUTATION_ERRORS | ||
|
|
||
| # The data client cannot take in zero or null values for deadline, so we set it to | ||
| # the default if that is the case. | ||
| if retry.deadline is None: | ||
| operation_timeout = TABLE_DEFAULT.MUTATE_ROWS | ||
|
|
||
| # To adhere to the retry strategy of do-nothing being achievable with a deadline | ||
| # of 0.0, we modify the retryable errors to be empty if such a deadline is passed. | ||
| elif retry.deadline == 0: | ||
| operation_timeout = TABLE_DEFAULT.MUTATE_ROWS | ||
| retryable_errors = [] | ||
| else: | ||
| operation_timeout = retry.deadline | ||
|
|
||
| attempt_timeout = timeout | ||
| mutation_entries = [ | ||
| RowMutationEntry(row.row_key, row._get_mutations()) for row in rows | ||
| ] | ||
| return_statuses = [status_pb2.Status(code=code_pb2.Code.OK)] * len( | ||
| mutation_entries | ||
| ) # By default, return status OKs for everything | ||
|
|
||
| try: | ||
| self._table_impl.bulk_mutate_rows( | ||
| mutation_entries, | ||
| operation_timeout=operation_timeout, | ||
| attempt_timeout=attempt_timeout, | ||
| retryable_errors=retryable_errors, | ||
| ) | ||
| except MutationsExceptionGroup as mut_exc_group: | ||
| # We exception handle as follows: | ||
| # | ||
| # 1. Each exception in the error group is a FailedMutationEntryError, and its | ||
| # cause is either a singular exception or a RetryExceptionGroup consisting of | ||
| # multiple exceptions. | ||
| # | ||
| # 2. In the case of a singular exception, if the error does not have a gRPC status | ||
| # code, we return a status code of UNKNOWN. | ||
| # | ||
| # 3. In the case of a RetryExceptionGroup, we use terminal exception in the exception | ||
| # group and process that. | ||
| for error in mut_exc_group.exceptions: | ||
| cause = error.__cause__ | ||
| if isinstance(cause, RetryExceptionGroup): | ||
| return_statuses[error.index] = self._get_status( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This Maybe if you do this, you should change Alternatively, you could change this code to handle other error types. But I kind of prefer the first option |
||
| cause.exceptions[-1] | ||
| ) | ||
| else: | ||
| return_statuses[error.index] = self._get_status(cause) | ||
|
|
||
| return return_statuses | ||
|
|
||
| @staticmethod | ||
| def _get_status(error): | ||
| if isinstance(error, GoogleAPICallError) and error.grpc_status_code is not None: | ||
| return status_pb2.Status( | ||
| code=error.grpc_status_code.value[0], | ||
| message=error.message, | ||
| details=error.details, | ||
| ) | ||
|
|
||
| return status_pb2.Status( | ||
| code=code_pb2.Code.UNKNOWN, | ||
| message=str(error), | ||
| ) | ||
| return retryable_mutate_rows(retry=retry) | ||
|
|
||
| def sample_row_keys(self): | ||
| """Read a sample of row keys in the table. | ||
|
|
@@ -1070,133 +1136,6 @@ def restore(self, new_table_id, cluster_id=None, backup_id=None, backup_name=Non | |
| ) | ||
|
|
||
|
|
||
| class _RetryableMutateRowsWorker(object): | ||
| """A callable worker that can retry to mutate rows with transient errors. | ||
|
|
||
| This class is a callable that can retry mutating rows that result in | ||
| transient errors. After all rows are successful or none of the rows | ||
| are retryable, any subsequent call on this callable will be a no-op. | ||
| """ | ||
|
|
||
| def __init__(self, client, table_name, rows, app_profile_id=None, timeout=None): | ||
| self.client = client | ||
| self.table_name = table_name | ||
| self.rows = rows | ||
| self.app_profile_id = app_profile_id | ||
| self.responses_statuses = [None] * len(self.rows) | ||
| self.timeout = timeout | ||
|
|
||
| def __call__(self, retry=DEFAULT_RETRY): | ||
| """Attempt to mutate all rows and retry rows with transient errors. | ||
|
|
||
| Will retry the rows with transient errors until all rows succeed or | ||
| ``deadline`` specified in the `retry` is reached. | ||
|
|
||
| :rtype: list | ||
| :returns: A list of response statuses (`google.rpc.status_pb2.Status`) | ||
| corresponding to success or failure of each row mutation | ||
| sent. These will be in the same order as the ``rows``. | ||
| """ | ||
| mutate_rows = self._do_mutate_retryable_rows | ||
| if retry: | ||
| mutate_rows = retry(self._do_mutate_retryable_rows) | ||
|
|
||
| try: | ||
| mutate_rows() | ||
| except (_BigtableRetryableError, RetryError): | ||
| # - _BigtableRetryableError raised when no retry strategy is used | ||
| # and a retryable error on a mutation occurred. | ||
| # - RetryError raised when retry deadline is reached. | ||
| # In both cases, just return current `responses_statuses`. | ||
| pass | ||
|
|
||
| return self.responses_statuses | ||
|
|
||
| @staticmethod | ||
| def _is_retryable(status): | ||
| return status is None or status.code in RETRYABLE_CODES | ||
|
|
||
| def _do_mutate_retryable_rows(self): | ||
| """Mutate all the rows that are eligible for retry. | ||
|
|
||
| A row is eligible for retry if it has not been tried or if it resulted | ||
| in a transient error in a previous call. | ||
|
|
||
| :rtype: list | ||
| :return: The responses statuses, which is a list of | ||
| :class:`~google.rpc.status_pb2.Status`. | ||
| :raises: One of the following: | ||
|
|
||
| * :exc:`~.table._BigtableRetryableError` if any | ||
| row returned a transient error. | ||
| * :exc:`RuntimeError` if the number of responses doesn't | ||
| match the number of rows that were retried | ||
| """ | ||
| retryable_rows = [] | ||
| index_into_all_rows = [] | ||
| for index, status in enumerate(self.responses_statuses): | ||
| if self._is_retryable(status): | ||
| retryable_rows.append(self.rows[index]) | ||
| index_into_all_rows.append(index) | ||
|
|
||
| if not retryable_rows: | ||
| # All mutations are either successful or non-retryable now. | ||
| return self.responses_statuses | ||
|
|
||
| entries = _compile_mutation_entries(self.table_name, retryable_rows) | ||
| data_client = self.client.table_data_client | ||
|
|
||
| kwargs = {} | ||
| if self.timeout is not None: | ||
| kwargs["timeout"] = timeout.ExponentialTimeout(deadline=self.timeout) | ||
|
|
||
| try: | ||
| responses = data_client.mutate_rows( | ||
| table_name=self.table_name, | ||
| entries=entries, | ||
| app_profile_id=self.app_profile_id, | ||
| retry=None, | ||
| **kwargs | ||
| ) | ||
| except RETRYABLE_MUTATION_ERRORS as exc: | ||
| # If an exception, considered retryable by `RETRYABLE_MUTATION_ERRORS`, is | ||
| # returned from the initial call, consider | ||
| # it to be retryable. Wrap as a Bigtable Retryable Error. | ||
| # For InternalServerError, it is only retriable if the message is related to RST Stream messages | ||
| if _retriable_internal_server_error(exc) or not isinstance( | ||
| exc, InternalServerError | ||
| ): | ||
| raise _BigtableRetryableError | ||
| else: | ||
| # re-raise the original exception | ||
| raise | ||
|
|
||
| num_responses = 0 | ||
| num_retryable_responses = 0 | ||
| for response in responses: | ||
| for entry in response.entries: | ||
| num_responses += 1 | ||
| index = index_into_all_rows[entry.index] | ||
| self.responses_statuses[index] = entry.status | ||
| if self._is_retryable(entry.status): | ||
| num_retryable_responses += 1 | ||
| if entry.status.code == 0: | ||
| self.rows[index].clear() | ||
|
|
||
| if len(retryable_rows) != num_responses: | ||
| raise RuntimeError( | ||
| "Unexpected number of responses", | ||
| num_responses, | ||
| "Expected", | ||
| len(retryable_rows), | ||
| ) | ||
|
|
||
| if num_retryable_responses: | ||
| raise _BigtableRetryableError | ||
|
|
||
| return self.responses_statuses | ||
|
|
||
|
|
||
| class ClusterState(object): | ||
| """Representation of a Cluster State. | ||
|
|
||
|
|
@@ -1343,73 +1282,3 @@ def _create_row_request( | |
| row_set._update_message_request(message) | ||
|
|
||
| return message | ||
|
|
||
|
|
||
| def _compile_mutation_entries(table_name, rows): | ||
| """Create list of mutation entries | ||
|
|
||
| :type table_name: str | ||
| :param table_name: The name of the table to write to. | ||
|
|
||
| :type rows: list | ||
| :param rows: List or other iterable of :class:`.DirectRow` instances. | ||
|
|
||
| :rtype: List[:class:`data_messages_v2_pb2.MutateRowsRequest.Entry`] | ||
| :returns: entries corresponding to the inputs. | ||
| :raises: :exc:`~.table.TooManyMutationsError` if the number of mutations is | ||
| greater than the max ({}) | ||
| """.format( | ||
| _MAX_BULK_MUTATIONS | ||
| ) | ||
| entries = [] | ||
| mutations_count = 0 | ||
| entry_klass = data_messages_v2_pb2.MutateRowsRequest.Entry | ||
|
|
||
| for row in rows: | ||
| _check_row_table_name(table_name, row) | ||
| _check_row_type(row) | ||
| mutations = row._get_mutation_pbs() | ||
| entries.append(entry_klass(row_key=row.row_key, mutations=mutations)) | ||
| mutations_count += len(mutations) | ||
|
|
||
| if mutations_count > _MAX_BULK_MUTATIONS: | ||
| raise TooManyMutationsError( | ||
| "Maximum number of mutations is %s" % (_MAX_BULK_MUTATIONS,) | ||
| ) | ||
| return entries | ||
|
|
||
|
|
||
| def _check_row_table_name(table_name, row): | ||
| """Checks that a row belongs to a table. | ||
|
|
||
| :type table_name: str | ||
| :param table_name: The name of the table. | ||
|
|
||
| :type row: :class:`~google.cloud.bigtable.row.Row` | ||
| :param row: An instance of :class:`~google.cloud.bigtable.row.Row` | ||
| subclasses. | ||
|
|
||
| :raises: :exc:`~.table.TableMismatchError` if the row does not belong to | ||
| the table. | ||
| """ | ||
| if row.table is not None and row.table.name != table_name: | ||
| raise TableMismatchError( | ||
| "Row %s is a part of %s table. Current table: %s" | ||
| % (row.row_key, row.table.name, table_name) | ||
| ) | ||
|
|
||
|
|
||
| def _check_row_type(row): | ||
| """Checks that a row is an instance of :class:`.DirectRow`. | ||
|
|
||
| :type row: :class:`~google.cloud.bigtable.row.Row` | ||
| :param row: An instance of :class:`~google.cloud.bigtable.row.Row` | ||
| subclasses. | ||
|
|
||
| :raises: :class:`TypeError <exceptions.TypeError>` if the row is not an | ||
| instance of DirectRow. | ||
| """ | ||
| if not isinstance(row, DirectRow): | ||
| raise TypeError( | ||
| "Bulk processing can not be applied for " "conditional or append mutations." | ||
| ) | ||
Uh oh!
There was an error while loading. Please reload this page.