diff --git a/tfx_bsl/beam/bsl_constants.py b/tfx_bsl/beam/bsl_constants.py
new file mode 100644
index 00000000..4b7473b2
--- /dev/null
+++ b/tfx_bsl/beam/bsl_constants.py
@@ -0,0 +1,5 @@
+_RECORDBATCH_COLUMN = '__RAW_RECORD__'
+
+class DataType(object):
+  EXAMPLE = 'EXAMPLE'
+  SEQUENCEEXAMPLE = 'SEQUENCEEXAMPLE'
diff --git a/tfx_bsl/beam/bsl_util.py b/tfx_bsl/beam/bsl_util.py
new file mode 100644
index 00000000..4c86c745
--- /dev/null
+++ b/tfx_bsl/beam/bsl_util.py
@@ -0,0 +1,127 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""TFX-BSL utilities."""
+
+from __future__ import absolute_import
+from __future__ import division
+# Standard __future__ imports
+from __future__ import print_function
+
+import numpy as np
+import pyarrow as pa
+import pandas as pd
+import base64
+import json
+import typing
+from typing import Dict, List, Text, Any, Set, Mapping, Optional
+from tfx_bsl.beam.bsl_constants import _RECORDBATCH_COLUMN
+
+_KERAS_INPUT_SUFFIX = '_input'
+
+def ExtractSerializedExamplesFromRecordBatch(elements: pa.RecordBatch) -> List[Text]:
+  """Extracts serialized examples from the raw record column of a RecordBatch."""
+  serialized_examples = None
+  for column_name, column_array in zip(elements.schema.names, elements.columns):
+    if column_name == _RECORDBATCH_COLUMN:
+      column_type = column_array.flatten().type
+      if not (pa.types.is_binary(column_type) or pa.types.is_string(column_type)):
+        raise ValueError(
+            'Expected a list of serialized examples in bytes or as a string, got %s' %
+            column_type)
+      serialized_examples = column_array.flatten().to_pylist()
+      break
+
+  if not serialized_examples:
+    raise ValueError('Raw examples not found.')
+
+  return serialized_examples
+
+
+def RecordToJSON(
+    record_batch: pa.RecordBatch,
+    prepare_instances_serialized: bool) -> List[Mapping[Text, Any]]:
+  """Returns a list of JSON dictionaries translated from `record_batch`.
+
+  Takes a RecordBatch whose columns hold the features of tf.train.Examples and
+  returns a list of dicts, where each dict is the JSON representation of one
+  example.
+
+  Args:
+    record_batch: input RecordBatch.
+    prepare_instances_serialized: if True, each instance is the base64-encoded
+      serialized example taken from the raw record column, in the form
+      {'b64': ...}; otherwise each instance is a per-feature dictionary.
+
+  Returns:
+    List of JSON dictionaries
+    - format: [{ feature1: value1, feature2: [value2_1, value2_2]... }, ...]
+  """
+
+  # TODO (b/155912552): Handle this for sequence example.
+  df = record_batch.to_pandas()
+  if prepare_instances_serialized:
+    return [{'b64': base64.b64encode(value).decode()} for value in df[_RECORDBATCH_COLUMN]]
+  else:
+    as_binary = df.columns.str.endswith("_bytes")
+    df.loc[:, as_binary] = df.loc[:, as_binary].applymap(
+        lambda feature: [{'b64': base64.b64encode(value).decode()} for value in feature])
+
+    if _RECORDBATCH_COLUMN in df.columns:
+      df = df.drop(labels=_RECORDBATCH_COLUMN, axis=1)
+    df = df.applymap(lambda values: values[0] if len(values) == 1 else values)
+    return json.loads(df.to_json(orient='records'))
+
+
+# TODO: Reuse these functions in TFMA.
+def _find_input_name_in_features(features: Set[Text],
+                                 input_name: Text) -> Optional[Text]:
+  """Maps input name to an entry in features.
Returns None if not found.""" + if input_name in features: + return input_name + # Some keras models prepend '_input' to the names of the inputs + # so try under '_input' as well. + elif (input_name.endswith(_KERAS_INPUT_SUFFIX) and + input_name[:-len(_KERAS_INPUT_SUFFIX)] in features): + return input_name[:-len(_KERAS_INPUT_SUFFIX)] + return None + + +def filter_tensors_by_input_names( + tensors: Dict[Text, Any], + input_names: List[Text]) -> Optional[Dict[Text, Any]]: + """Filter tensors by input names. + In case we don't find the specified input name in the tensors and there + exists only one input name, we assume we are feeding serialized examples to + the model and return None. + Args: + tensors: Dict of tensors. + input_names: List of input names. + Returns: + Filtered tensors. + Raises: + RuntimeError: When the specified input tensor cannot be found. + """ + + if not input_names: + return None + result = {} + tensor_keys = set(tensors.keys()) + + # The case where the model takes serialized examples as input. + if len(input_names) == 1 and _find_input_name_in_features(tensor_keys, input_names[0]): + return None + + for name in input_names: + tensor_name = _find_input_name_in_features(tensor_keys, name) + if tensor_name is None: + raise RuntimeError( + 'Input tensor not found: {}. Existing keys: {}.'.format( + name, ','.join(tensors.keys()))) + result[name] = tensors[tensor_name] + return result diff --git a/tfx_bsl/beam/bsl_util_test.py b/tfx_bsl/beam/bsl_util_test.py new file mode 100644 index 00000000..c1a63b0d --- /dev/null +++ b/tfx_bsl/beam/bsl_util_test.py @@ -0,0 +1,91 @@ +# Copyright 2019 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Tests for tfx_bsl.bsl_util.""" + +from __future__ import absolute_import +from __future__ import division +# Standard __future__ imports +from __future__ import print_function + +import base64 +import json +import os +try: + import unittest.mock as mock +except ImportError: + import mock + +import apache_beam as beam +import pyarrow as pa +import tensorflow as tf +from google.protobuf import text_format +from tfx_bsl.beam import bsl_util +from tfx_bsl.beam.bsl_constants import _RECORDBATCH_COLUMN + + +class TestBslUtil(tf.test.TestCase): + def test_request_body_with_binary_data(self): + record_batch_remote = pa.RecordBatch.from_arrays( + [ + pa.array([["ASa8asdf", "ASa8asdf"]], type=pa.list_(pa.binary())), + pa.array([["JLK7ljk3"]], type=pa.list_(pa.utf8())), + pa.array([[1, 2]], type=pa.list_(pa.int32())), + pa.array([[4.5, 5, 5.5]], type=pa.list_(pa.float32())) + ], + ['x_bytes', 'x', 'y', 'z'] + ) + + result = list(bsl_util.RecordToJSON(record_batch_remote, False)) + self.assertEqual([ + { + 'x_bytes': [ + {'b64': 'QVNhOGFzZGY='}, + {'b64': 'QVNhOGFzZGY='} + ], + 'x': 'JLK7ljk3', + 'y': [1, 2], + 'z': [4.5, 5, 5.5] + }, + ], result) + + def test_request_serialized_example(self): + example = text_format.Parse( + """ + features { + feature { key: "x_bytes" value { bytes_list { value: ["ASa8asdf"] }}} + feature { key: "x" value { bytes_list { value: "JLK7ljk3" }}} + feature { key: "y" value { int64_list { value: [1, 2] }}} + } + """, tf.train.Example()) + + serialized_example_remote = [example.SerializeToString()] + record_batch_remote = pa.RecordBatch.from_arrays( + [ + pa.array([["ASa8asdf"]], type=pa.list_(pa.binary())), + pa.array([["JLK7ljk3"]], type=pa.list_(pa.utf8())), + pa.array([[1, 2]], type=pa.list_(pa.int32())), + pa.array([[4.5, 5, 5.5]], type=pa.list_(pa.float32())), + serialized_example_remote + ], + ['x_bytes', 'x', 'y', 'z', _RECORDBATCH_COLUMN] + ) + + result = list(bsl_util.RecordToJSON(record_batch_remote, True)) + self.assertEqual(result, [{ + 'b64': base64.b64encode(example.SerializeToString()).decode() + }]) + + +if __name__ == '__main__': + tf.test.main() diff --git a/tfx_bsl/beam/run_inference.py b/tfx_bsl/beam/run_inference.py index 37f8e91f..87bc2a66 100644 --- a/tfx_bsl/beam/run_inference.py +++ b/tfx_bsl/beam/run_inference.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-"""Run batch inference on saved model.""" +"""Run batch inference on saved model with private APIs of inference.""" from __future__ import absolute_import from __future__ import division @@ -19,7 +19,6 @@ from __future__ import print_function import abc -import base64 import collections import os import platform @@ -32,6 +31,7 @@ from absl import logging import apache_beam as beam +import pyarrow as pa from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.utils import retry @@ -39,13 +39,22 @@ from googleapiclient import discovery from googleapiclient import http import numpy as np +import json import six import tensorflow as tf from tfx_bsl.beam import shared +from tfx_bsl.beam import bsl_util from tfx_bsl.public.proto import model_spec_pb2 from tfx_bsl.telemetry import util -from typing import Any, Generator, Iterable, List, Mapping, Sequence, Text, \ - Tuple, Union +from tfx_bsl.tfxio import test_util +from tfx_bsl.tfxio import tensor_adapter +from tfx_bsl.tfxio import tf_example_record +from tfx_bsl.tfxio import tf_sequence_example_record +from typing import Any, Generator, Iterable, List, Mapping, Optional, \ + Sequence, Text, TypeVar, Tuple, Union + +from tfx_bsl.beam.bsl_constants import _RECORDBATCH_COLUMN +from tfx_bsl.beam.bsl_constants import DataType # TODO(b/140306674): stop using the internal TF API. from tensorflow.python.saved_model import loader_impl @@ -53,7 +62,7 @@ from tensorflow_serving.apis import inference_pb2 from tensorflow_serving.apis import prediction_log_pb2 from tensorflow_serving.apis import regression_pb2 - +from tensorflow_metadata.proto.v0 import schema_pb2 # TODO(b/131873699): Remove once 1.x support is dropped. # pylint: disable=g-import-not-at-top @@ -64,6 +73,7 @@ except ImportError: pass + _DEFAULT_INPUT_KEY = 'examples' _METRICS_DESCRIPTOR_INFERENCE = 'BulkInferrer' _METRICS_DESCRIPTOR_IN_PROCESS = 'InProcess' @@ -73,19 +83,12 @@ _SECOND_TO_MICROSECOND = 1000000 _REMOTE_INFERENCE_NUM_RETRIES = 5 -# We define the following aliases of Any because the actual types are not -# public. +# We define the following aliases of Any because the actual types are not public. _SignatureDef = Any _MetaGraphDef = Any _SavedModel = Any -_BulkInferResult = Union[prediction_log_pb2.PredictLog, - Tuple[tf.train.Example, regression_pb2.Regression], - Tuple[tf.train.Example, - inference_pb2.MultiInferenceResponse], - Tuple[tf.train.Example, - classification_pb2.Classifications]] - +MixedExample = TypeVar('MixedExample', tf.train.Example, tf.train.SequenceExample) # TODO(b/151468119): Converts this into enum once we stop supporting Python 2.7 class OperationType(object): @@ -96,18 +99,121 @@ class OperationType(object): @beam.ptransform_fn -@beam.typehints.with_input_types(Union[tf.train.Example, - tf.train.SequenceExample]) +@beam.typehints.with_input_types(MixedExample) @beam.typehints.with_output_types(prediction_log_pb2.PredictionLog) -def RunInferenceImpl( # pylint: disable=invalid-name +def RunInferenceOnExamples( # pylint: disable=invalid-name examples: beam.pvalue.PCollection, - inference_spec_type: model_spec_pb2.InferenceSpecType + inference_spec_type: model_spec_pb2.InferenceSpecType, + schema: Optional[schema_pb2.Schema] = None ) -> beam.pvalue.PCollection: - """Implementation of RunInference API. + """Run inference with a model. + + There are two types of inference you can perform using this PTransform: + 1. In-process inference from a SavedModel instance. 
Used when
+      `saved_model_spec` field is set in `inference_spec_type`.
+    2. Remote inference by using a service endpoint. Used when
+      `ai_platform_prediction_model_spec` field is set in
+      `inference_spec_type`.
+
+  TODO(b/131873699): Add support for the following features:
+    1. Bytes as Input.
+    2. PTable Input.
+    3. Models as SideInput.
 
   Args:
     examples: A PCollection containing examples.
     inference_spec_type: Model inference endpoint.
+    schema: (Optional) A schema of the input features; required for models
+      that require multi-tensor inputs.
+
+  Returns:
+    A PCollection containing prediction logs.
+  """
+  tensor_adapter_config = None
+  operation_type = _get_operation_type(inference_spec_type)
+  proximity_descriptor = (
+      _METRICS_DESCRIPTOR_IN_PROCESS
+      if _using_in_process_inference(inference_spec_type)
+      else _METRICS_DESCRIPTOR_CLOUD_AI_PREDICTION)
+
+  if (operation_type == OperationType.CLASSIFICATION or
+      operation_type == OperationType.REGRESSION or
+      operation_type == OperationType.MULTIHEAD):
+    typed_examples = examples | AssertType(tf.train.Example, operation_type)
+    converter = tf_example_record.TFExampleBeamRecord(
+        physical_format="inmem",
+        telemetry_descriptors=[
+            _METRICS_DESCRIPTOR_INFERENCE,
+            operation_type, proximity_descriptor],
+        schema=schema,
+        raw_record_column_name=_RECORDBATCH_COLUMN)
+
+    return (typed_examples
+            | 'ParseExamples' >> beam.Map(lambda element: element.SerializeToString())
+            | 'ConvertToRecordBatch' >> converter.BeamSource()
+            | 'RunInferenceImpl' >> _RunInferenceOnRecordBatch(
+                inference_spec_type, DataType.EXAMPLE,
+                tensor_adapter_config=tensor_adapter_config))
+  else:
+    # TODO: check if there are two types of input data in PREDICT Operation
+    ExampleConverter = tf_example_record.TFExampleBeamRecord(
+        physical_format="inmem",
+        telemetry_descriptors=[
+            _METRICS_DESCRIPTOR_INFERENCE,
+            operation_type, proximity_descriptor],
+        schema=schema,
+        raw_record_column_name=_RECORDBATCH_COLUMN)
+    SequenceConverter = tf_sequence_example_record.TFSequenceExampleBeamRecord(
+        physical_format="inmem",
+        telemetry_descriptors=[
+            _METRICS_DESCRIPTOR_INFERENCE,
+            operation_type, proximity_descriptor],
+        schema=schema,
+        raw_record_column_name=_RECORDBATCH_COLUMN)
+
+    tagged = (examples | "SortInput" >> beam.Map(
+        lambda example: beam.pvalue.TaggedOutput(
+            'example' if isinstance(example, tf.train.Example)
+            else 'sequence', example)).with_outputs('example', 'sequence'))
+
+    if schema:
+      tensor_adapter_config = tensor_adapter.TensorAdapterConfig(
+          arrow_schema=ExampleConverter.ArrowSchema(),
+          tensor_representations=ExampleConverter.TensorRepresentations())
+
+    return ([
+        (tagged.example
+         | 'ParseExamples' >> beam.Map(lambda example: example.SerializeToString())
+         | 'ConvertExampleToRecordBatch' >> ExampleConverter.BeamSource()
+         | 'RunInferenceImplExample' >> _RunInferenceOnRecordBatch(
+             inference_spec_type, DataType.EXAMPLE,
+             tensor_adapter_config=tensor_adapter_config)),
+        (tagged.sequence
+         | 'ParseSequenceExamples' >> beam.Map(lambda example: example.SerializeToString())
+         | 'ConvertSequenceToRecordBatch' >> SequenceConverter.BeamSource()
+         | 'RunInferenceImplSequence' >> _RunInferenceOnRecordBatch(
+             inference_spec_type, DataType.SEQUENCEEXAMPLE,
+             tensor_adapter_config=tensor_adapter_config))
+    ] | 'FlattenResult' >> beam.Flatten())
+
+
+@beam.ptransform_fn
+@beam.typehints.with_input_types(pa.RecordBatch)
+@beam.typehints.with_output_types(prediction_log_pb2.PredictionLog)
+def _RunInferenceOnRecordBatch(  # pylint: disable=invalid-name
+    examples: beam.pvalue.PCollection,
inference_spec_type: model_spec_pb2.InferenceSpecType, data_type: Text, + tensor_adapter_config: Optional[tensor_adapter.TensorAdapterConfig] = None +) -> beam.pvalue.PCollection: + """Implementation of RunInference API. + + Args: + examples: A PCollection containing RecordBatch of serialized examples and features. + inference_spec_type: Model inference endpoint. + tensor_adapter_config [Optional]: Tensor adapter config which specifies how to + obtain tensors from the Arrow RecordBatch. + - Not required when running inference with remote model or + serialized example as the single input tensor Returns: A PCollection containing prediction logs. @@ -117,39 +223,41 @@ def RunInferenceImpl( # pylint: disable=invalid-name """ logging.info('RunInference on model: %s', inference_spec_type) - batched_examples = examples | 'BatchExamples' >> beam.BatchElements() operation_type = _get_operation_type(inference_spec_type) if operation_type == OperationType.CLASSIFICATION: - return batched_examples | 'Classify' >> _Classify(inference_spec_type) + return examples | 'Classify' >> _Classify( + inference_spec_type, data_type, tensor_adapter_config) elif operation_type == OperationType.REGRESSION: - return batched_examples | 'Regress' >> _Regress(inference_spec_type) + return examples | 'Regress' >> _Regress( + inference_spec_type, data_type, tensor_adapter_config) elif operation_type == OperationType.PREDICTION: - return batched_examples | 'Predict' >> _Predict(inference_spec_type) + return examples | 'Predict' >> _Predict( + inference_spec_type, data_type, tensor_adapter_config) elif operation_type == OperationType.MULTIHEAD: - return (batched_examples - | 'MultiInference' >> _MultiInference(inference_spec_type)) + return (examples | 'MultiInference' >> _MultiInference( + inference_spec_type, data_type, tensor_adapter_config)) else: raise ValueError('Unsupported operation_type %s' % operation_type) _IOTensorSpec = collections.namedtuple( '_IOTensorSpec', - ['input_tensor_alias', 'input_tensor_name', 'output_alias_tensor_names']) + ['input_tensor_alias', 'input_tensor_names', 'input_tensor_types', 'output_alias_tensor_names']) _Signature = collections.namedtuple('_Signature', ['name', 'signature_def']) @beam.ptransform_fn -@beam.typehints.with_input_types(Union[tf.train.Example, - tf.train.SequenceExample]) +@beam.typehints.with_input_types(pa.RecordBatch) @beam.typehints.with_output_types(prediction_log_pb2.PredictionLog) def _Classify(pcoll: beam.pvalue.PCollection, # pylint: disable=invalid-name - inference_spec_type: model_spec_pb2.InferenceSpecType): + inference_spec_type: model_spec_pb2.InferenceSpecType, data_type, + tensor_adapter_config: Optional[tensor_adapter.TensorAdapterConfig] = None): """Performs classify PTransform.""" if _using_in_process_inference(inference_spec_type): return (pcoll - | 'Classify' >> beam.ParDo( - _BatchClassifyDoFn(inference_spec_type, shared.Shared())) + | 'Classify' >> beam.ParDo(_BatchClassifyDoFn( + inference_spec_type, shared.Shared(), data_type, tensor_adapter_config)) | 'BuildPredictionLogForClassifications' >> beam.ParDo( _BuildPredictionLogForClassificationsDoFn())) else: @@ -157,16 +265,16 @@ def _Classify(pcoll: beam.pvalue.PCollection, # pylint: disable=invalid-name @beam.ptransform_fn -@beam.typehints.with_input_types(Union[tf.train.Example, - tf.train.SequenceExample]) +@beam.typehints.with_input_types(pa.RecordBatch) @beam.typehints.with_output_types(prediction_log_pb2.PredictionLog) def _Regress(pcoll: beam.pvalue.PCollection, # pylint: 
disable=invalid-name - inference_spec_type: model_spec_pb2.InferenceSpecType): + inference_spec_type: model_spec_pb2.InferenceSpecType, data_type, + tensor_adapter_config: Optional[tensor_adapter.TensorAdapterConfig] = None): """Performs regress PTransform.""" if _using_in_process_inference(inference_spec_type): return (pcoll - | 'Regress' >> beam.ParDo( - _BatchRegressDoFn(inference_spec_type, shared.Shared())) + | 'Regress' >> beam.ParDo(_BatchRegressDoFn( + inference_spec_type, shared.Shared(), data_type, tensor_adapter_config)) | 'BuildPredictionLogForRegressions' >> beam.ParDo( _BuildPredictionLogForRegressionsDoFn())) else: @@ -174,39 +282,39 @@ def _Regress(pcoll: beam.pvalue.PCollection, # pylint: disable=invalid-name @beam.ptransform_fn -@beam.typehints.with_input_types(Union[tf.train.Example, - tf.train.SequenceExample]) +@beam.typehints.with_input_types(pa.RecordBatch) @beam.typehints.with_output_types(prediction_log_pb2.PredictionLog) def _Predict(pcoll: beam.pvalue.PCollection, # pylint: disable=invalid-name - inference_spec_type: model_spec_pb2.InferenceSpecType): + inference_spec_type: model_spec_pb2.InferenceSpecType, data_type, + tensor_adapter_config: Optional[tensor_adapter.TensorAdapterConfig] = None): """Performs predict PTransform.""" if _using_in_process_inference(inference_spec_type): predictions = ( pcoll - | 'Predict' >> beam.ParDo( - _BatchPredictDoFn(inference_spec_type, shared.Shared()))) + | 'Predict' >> beam.ParDo(_BatchPredictDoFn( + inference_spec_type, shared.Shared(), data_type, tensor_adapter_config))) else: predictions = ( pcoll - | 'RemotePredict' >> beam.ParDo( - _RemotePredictDoFn(inference_spec_type, pcoll.pipeline.options))) + | 'RemotePredict' >> beam.ParDo(_RemotePredictDoFn( + inference_spec_type, pcoll.pipeline.options, data_type))) return (predictions | 'BuildPredictionLogForPredictions' >> beam.ParDo( _BuildPredictionLogForPredictionsDoFn())) @beam.ptransform_fn -@beam.typehints.with_input_types(Union[tf.train.Example, - tf.train.SequenceExample]) +@beam.typehints.with_input_types(pa.RecordBatch) @beam.typehints.with_output_types(prediction_log_pb2.PredictionLog) def _MultiInference(pcoll: beam.pvalue.PCollection, # pylint: disable=invalid-name - inference_spec_type: model_spec_pb2.InferenceSpecType): + inference_spec_type: model_spec_pb2.InferenceSpecType, data_type, + tensor_adapter_config: Optional[tensor_adapter.TensorAdapterConfig] = None): """Performs multi inference PTransform.""" if _using_in_process_inference(inference_spec_type): return ( pcoll - | 'MultiInference' >> beam.ParDo( - _BatchMultiInferenceDoFn(inference_spec_type, shared.Shared())) + | 'MultiInference' >> beam.ParDo(_BatchMultiInferenceDoFn( + inference_spec_type, shared.Shared(), data_type, tensor_adapter_config)) | 'BuildMultiInferenceLog' >> beam.ParDo(_BuildMultiInferenceLogDoFn())) else: raise NotImplementedError @@ -261,32 +369,51 @@ def update_metrics_with_cache(self): self._model_byte_size.update(self.model_byte_size_cache) self.model_byte_size_cache = None - def update(self, elements: List[Union[tf.train.Example, - tf.train.SequenceExample]], - latency_micro_secs: int) -> None: + # For feature inputs, using serialized example for batch size + def update( + self, elements: List[Union[str, bytes]], latency_micro_secs: int) -> None: self._inference_batch_latency_micro_secs.update(latency_micro_secs) self._num_instances.inc(len(elements)) self._inference_counter.inc(len(elements)) self._inference_request_batch_size.update(len(elements)) 
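+      # `elements` are serialized examples at this point, so the request batch
+      # byte size below is the sum of their encoded lengths rather than a
+      # proto ByteSize().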
     self._inference_request_batch_byte_size.update(
-        sum(element.ByteSize() for element in elements))
+        sum(len(element) for element in elements))
+
-  def __init__(self, inference_spec_type: model_spec_pb2.InferenceSpecType):
+  def __init__(
+      self, inference_spec_type: model_spec_pb2.InferenceSpecType):
     super(_BaseDoFn, self).__init__()
     self._clock = None
+    self.inference_spec_type = inference_spec_type
     self._metrics_collector = self._MetricsCollector(inference_spec_type)
 
   def setup(self):
     self._clock = _ClockFactory.make_clock()
 
-  def process(
-      self, elements: List[Union[tf.train.Example, tf.train.SequenceExample]]
-  ) -> Iterable[Any]:
+  def _extract_serialized_from_recordBatch(
+      self, elements: pa.RecordBatch) -> List[Union[str, bytes]]:
+    """Extracts serialized examples from the raw record column of the RecordBatch."""
+    serialized_examples = bsl_util.ExtractSerializedExamplesFromRecordBatch(elements)
+    return serialized_examples
+
+  @abc.abstractmethod
+  def _extract_inference_input_from_recordBatch(
+      self, elements: pa.RecordBatch) -> Union[Mapping[Any, Any], List[Mapping[Any, Any]]]:
+    """Extracts model input compatible with the model signature.
+
+    Returns:
+      Model input used for inference and post processing.
+    """
+    raise NotImplementedError
+
+  def process(self, elements: pa.RecordBatch) -> Iterable[Any]:
     batch_start_time = self._clock.get_current_time_in_microseconds()
-    outputs = self.run_inference(elements)
-    result = self._post_process(elements, outputs)
+    serialized_examples = self._extract_serialized_from_recordBatch(elements)
+    model_input = self._extract_inference_input_from_recordBatch(elements)
+    outputs = self.run_inference(model_input)
+    result = self._post_process(model_input, outputs)
     self._metrics_collector.update(
-        elements,
+        serialized_examples,
         self._clock.get_current_time_in_microseconds() - batch_start_time)
     return result
@@ -295,14 +422,21 @@ def finish_bundle(self):
 
   @abc.abstractmethod
   def run_inference(
-      self, elements: List[Union[tf.train.Example, tf.train.SequenceExample]]
+      self, tensors: Mapping[Text, Any]
  ) -> Union[Mapping[Text, np.ndarray], Sequence[Mapping[Text, Any]]]:
+    """Runs inference with the extracted model input.
+
+    Args:
+      tensors: a dictionary mapping tensor names to tensor values, e.g.
+        np.ndarray or SparseTensorValue.
+        - ex: { 'x': SparseTensorValue }
+              { 'y': [[1, 2, 3], [3, 4, 5] ...] }
+    """
     raise NotImplementedError
 
   @abc.abstractmethod
-  def _post_process(self, elements: List[Union[tf.train.Example,
-                                               tf.train.SequenceExample]],
-                    outputs: Any) -> Iterable[Any]:
+  def _post_process(
+      self, elements: Mapping[Any, Any], outputs: Any) -> Iterable[Any]:
     raise NotImplementedError
@@ -322,8 +456,7 @@ def _retry_on_unavailable_and_resource_error_filter(exception: Exception):
           exception.resp.status in (503, 429))
 
-@beam.typehints.with_input_types(List[Union[tf.train.Example,
-                                             tf.train.SequenceExample]])
+@beam.typehints.with_input_types(pa.RecordBatch)
 # Using output typehints triggers NotImplementedError('BEAM-2717)' on
 # streaming mode on Dataflow runner.
# TODO(b/151468119): Consider to re-batch with online serving request size @@ -350,11 +483,10 @@ class _RemotePredictDoFn(_BaseDoFn): """ def __init__(self, inference_spec_type: model_spec_pb2.InferenceSpecType, - pipeline_options: PipelineOptions): + pipeline_options: PipelineOptions, data_type: Text): super(_RemotePredictDoFn, self).__init__(inference_spec_type) - self._ai_platform_prediction_model_spec = ( - inference_spec_type.ai_platform_prediction_model_spec) self._api_client = None + self._data_type = data_type project_id = ( inference_spec_type.ai_platform_prediction_model_spec.project_id or @@ -383,14 +515,20 @@ def setup(self): # user agent once custom header is supported in googleapiclient. self._api_client = discovery.build('ml', 'v1') + def _extract_inference_input_from_recordBatch( + self, elements: pa.RecordBatch) -> List[Mapping[Any, Any]]: + prepare_instances_serialized = ( + self.inference_spec_type.ai_platform_prediction_model_spec.use_serialization_config) + model_input = bsl_util.RecordToJSON(elements, prepare_instances_serialized) + return model_input + # Retry _REMOTE_INFERENCE_NUM_RETRIES times with exponential backoff. @retry.with_exponential_backoff( initial_delay_secs=1.0, num_retries=_REMOTE_INFERENCE_NUM_RETRIES, retry_filter=_retry_on_unavailable_and_resource_error_filter) def _execute_request( - self, - request: http.HttpRequest) -> Mapping[Text, Sequence[Mapping[Text, Any]]]: + self, request: http.HttpRequest) -> Mapping[Text, Sequence[Mapping[Text, Any]]]: result = request.execute() if 'error' in result: raise ValueError(result['error']) @@ -400,80 +538,28 @@ def _make_request(self, body: Mapping[Text, List[Any]]) -> http.HttpRequest: return self._api_client.projects().predict( name=self._full_model_name, body=body) - def _prepare_instances_dict( - self, elements: List[tf.train.Example] - ) -> Generator[Mapping[Text, Any], None, None]: - """Prepare instances by converting features to dictionary.""" - for example in elements: - # TODO(b/151468119): support tf.train.SequenceExample - if not isinstance(example, tf.train.Example): - raise ValueError('Remote prediction only supports tf.train.Example') - - instance = {} - for input_name, feature in example.features.feature.items(): - attr_name = feature.WhichOneof('kind') - if attr_name is None: - continue - attr = getattr(feature, attr_name) - values = self._parse_feature_content( - attr.value, attr_name, self._sending_as_binary(input_name)) - # Flatten a sequence if its length is 1 - values = (values[0] if len(values) == 1 else values) - instance[input_name] = values - yield instance - - def _prepare_instances_serialized( - self, elements: List[tf.train.Example] - ) -> Generator[Mapping[Text, Text], None, None]: - """Prepare instances by base64 encoding serialized examples.""" - for example in elements: - yield {'b64': base64.b64encode(example.SerializeToString()).decode()} - + @classmethod def _prepare_instances( - self, elements: List[tf.train.Example] + cls, elements: List[Mapping[Any, Any]] ) -> Generator[Mapping[Text, Any], None, None]: - if self._ai_platform_prediction_model_spec.use_serialization_config: - return self._prepare_instances_serialized(elements) - else: - return self._prepare_instances_dict(elements) - - @staticmethod - def _sending_as_binary(input_name: Text) -> bool: - """Whether data should be sent as binary.""" - return input_name.endswith('_bytes') - - @staticmethod - def _parse_feature_content(values: Sequence[Any], attr_name: Text, - as_binary: bool) -> List[Any]: - """Parse the 
content of tf.train.Feature object. - - If bytes_list, parse a list of bytes-like objects to a list of strings so - that it would be JSON serializable. + for instance in elements: + yield instance - If float_list or int64_list, do nothing. - - If data should be sent as binary, mark it as binary by replacing it with - a single attribute named 'b64'. - """ - if as_binary: - return [{'b64': base64.b64encode(x).decode()} for x in values] - elif attr_name == 'bytes_list': - return [x.decode() for x in values] - else: - # Converts proto RepeatedScalarContainer to list so it is - # JSON-serializable - return list(values) + def _check_elements(self) -> None: + # TODO(b/151468119): support tf.train.SequenceExample + if self._data_type != DataType.EXAMPLE: + raise ValueError('Remote prediction only supports tf.train.Example') def run_inference( - self, elements: List[Union[tf.train.Example, tf.train.SequenceExample]] - ) -> Sequence[Mapping[Text, Any]]: + self, elements: List[Union[str, bytes]]) -> Sequence[Mapping[Text, Any]]: + self._check_elements() body = {'instances': list(self._prepare_instances(elements))} request = self._make_request(body) response = self._execute_request(request) return response['predictions'] def _post_process( - self, elements: List[Union[tf.train.Example, tf.train.SequenceExample]], + self, elements: List[Union[str, bytes]], outputs: Sequence[Mapping[Text, Any]] ) -> Iterable[prediction_log_pb2.PredictLog]: result = [] @@ -495,6 +581,7 @@ def _post_process( # is fixed. # TODO(b/143484017): Add batch_size back off in the case there are functional # reasons large batch sizes cannot be handled. + class _BaseBatchSavedModelDoFn(_BaseDoFn): """A DoFn that runs in-process batch inference with a model. @@ -506,21 +593,21 @@ class _BaseBatchSavedModelDoFn(_BaseDoFn): """ def __init__( - self, - inference_spec_type: model_spec_pb2.InferenceSpecType, - shared_model_handle: shared.Shared, - ): + self, inference_spec_type: model_spec_pb2.InferenceSpecType, + shared_model_handle: shared.Shared, data_type, + tensor_adapter_config: Optional[tensor_adapter.TensorAdapterConfig] = None): super(_BaseBatchSavedModelDoFn, self).__init__(inference_spec_type) self._inference_spec_type = inference_spec_type self._shared_model_handle = shared_model_handle self._model_path = inference_spec_type.saved_model_spec.model_path self._tags = None self._signatures = _get_signatures( - inference_spec_type.saved_model_spec.model_path, - inference_spec_type.saved_model_spec.signature_name, - _get_tags(inference_spec_type)) + inference_spec_type.saved_model_spec.model_path, + inference_spec_type.saved_model_spec.signature_name, + _get_tags(inference_spec_type)) self._session = None - self._io_tensor_spec = None + self._data_type = data_type + self._tensor_adapter_config = tensor_adapter_config def setup(self): """Load the model. @@ -568,69 +655,86 @@ def _pre_process(self) -> _IOTensorSpec: # Pre process functions will validate for each signature. 
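+    # Only signatures with a single (string) input are required to take
+    # serialized examples; multi-input signatures are fed feature tensors via
+    # the TensorAdapter in _extract_inference_input_from_recordBatch.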
io_tensor_specs = [] for signature in self._signatures: - if len(signature.signature_def.inputs) != 1: - raise ValueError('Signature should have 1 and only 1 inputs') - if (list(signature.signature_def.inputs.values())[0].dtype != + if (len(signature.signature_def.inputs) == 1 and + list(signature.signature_def.inputs.values())[0].dtype != tf.string.as_datatype_enum): raise ValueError( - 'Input dtype is expected to be %s, got %s' % + 'With 1 input, dtype is expected to be %s for serialized examples, got %s' % tf.string.as_datatype_enum, list(signature.signature_def.inputs.values())[0].dtype) io_tensor_specs.append(_signature_pre_process(signature.signature_def)) - input_tensor_name = '' - input_tensor_alias = '' + input_tensor_names = [] + input_tensor_alias = [] + input_tensor_types = {} output_alias_tensor_names = {} for io_tensor_spec in io_tensor_specs: - if not input_tensor_name: - input_tensor_name = io_tensor_spec.input_tensor_name + if not input_tensor_names: + input_tensor_names = io_tensor_spec.input_tensor_names input_tensor_alias = io_tensor_spec.input_tensor_alias - elif input_tensor_name != io_tensor_spec.input_tensor_name: + elif input_tensor_names != io_tensor_spec.input_tensor_names: raise ValueError('Input tensor must be the same for all Signatures.') - for alias, tensor_name in io_tensor_spec.output_alias_tensor_names.items( - ): + for alias, tensor_type in io_tensor_spec.input_tensor_types.items(): + input_tensor_types[alias] = tensor_type + for alias, tensor_name in io_tensor_spec.output_alias_tensor_names.items(): output_alias_tensor_names[alias] = tensor_name - if (not output_alias_tensor_names or not input_tensor_name or + if (not output_alias_tensor_names or not input_tensor_names or not input_tensor_alias): raise ValueError('No valid fetch tensors or feed tensors.') - return _IOTensorSpec(input_tensor_alias, input_tensor_name, - output_alias_tensor_names) + return _IOTensorSpec(input_tensor_alias, input_tensor_names, + input_tensor_types, output_alias_tensor_names) def _has_tpu_tag(self) -> bool: return (len(self._tags) == 2 and tf.saved_model.SERVING in self._tags and tf.saved_model.TPU in self._tags) + def _extract_inference_input_from_recordBatch( + self, elements: pa.RecordBatch) -> Mapping[Any, Any]: + model_input = None + if (len(self._io_tensor_spec.input_tensor_names) == 1): + serialized_examples = bsl_util.ExtractSerializedExamplesFromRecordBatch(elements) + model_input = {self._io_tensor_spec.input_tensor_names[0]: serialized_examples} + else: + if not self._tensor_adapter_config: + raise ValueError('Tensor adaptor config is required with a multi-input model') + + input_tensor_names = self._io_tensor_spec.input_tensor_names + input_tensor_alias = self._io_tensor_spec.input_tensor_alias + _tensor_adapter = tensor_adapter.TensorAdapter(self._tensor_adapter_config) + # dict_of_tensors is a map from input_tensor_alias to tensor + dict_of_tensors = _tensor_adapter.ToBatchTensors( + elements, produce_eager_tensors = False) + filtered_tensors = bsl_util.filter_tensors_by_input_names( + dict_of_tensors, input_tensor_alias) + + model_input = {} + for tensor_alias, tensor_name in zip(input_tensor_alias, input_tensor_names): + model_input[tensor_name] = filtered_tensors[tensor_alias] + return model_input + def run_inference( - self, elements: List[Union[tf.train.Example, tf.train.SequenceExample]] - ) -> Mapping[Text, np.ndarray]: - self._check_elements(elements) - outputs = self._run_tf_operations(elements) + self, tensors: Mapping[Text, Any]) -> Mapping[Text, 
np.ndarray]: + # tensors: a dictionary consists of tensor alias and tensors + self._check_elements() + outputs = self._run_tf_operations(tensors) return outputs def _run_tf_operations( - self, elements: List[Union[tf.train.Example, tf.train.SequenceExample]] - ) -> Mapping[Text, np.ndarray]: - input_values = [] - for element in elements: - input_values.append(element.SerializeToString()) + self, tensors: Mapping[Text, Any]) -> Mapping[Text, np.ndarray]: result = self._session.run( - self._io_tensor_spec.output_alias_tensor_names, - feed_dict={self._io_tensor_spec.input_tensor_name: input_values}) + self._io_tensor_spec.output_alias_tensor_names, feed_dict=tensors) if len(result) != len(self._io_tensor_spec.output_alias_tensor_names): raise RuntimeError('Output length does not match fetches') return result - def _check_elements( - self, elements: List[Union[tf.train.Example, - tf.train.SequenceExample]]) -> None: + def _check_elements(self) -> None: """Unimplemented.""" raise NotImplementedError -@beam.typehints.with_input_types(List[Union[tf.train.Example, - tf.train.SequenceExample]]) -@beam.typehints.with_output_types(Tuple[tf.train.Example, - classification_pb2.Classifications]) +@beam.typehints.with_input_types(pa.RecordBatch) +@beam.typehints.with_output_types(Tuple[Union[str, bytes], + classification_pb2.Classifications]) class _BatchClassifyDoFn(_BaseBatchSavedModelDoFn): """A DoFn that run inference on classification model.""" @@ -643,47 +747,44 @@ def setup(self): signature_def.method_name) super(_BatchClassifyDoFn, self).setup() - def _check_elements( - self, elements: List[Union[tf.train.Example, - tf.train.SequenceExample]]) -> None: - if not all(isinstance(element, tf.train.Example) for element in elements): + def _check_elements(self) -> None: + if self._data_type != DataType.EXAMPLE: raise ValueError('Classify only supports tf.train.Example') def _post_process( - self, elements: Sequence[tf.train.Example], outputs: Mapping[Text, - np.ndarray] - ) -> Iterable[Tuple[tf.train.Example, classification_pb2.Classifications]]: + self, elements: Mapping[Any, Any], + outputs: Mapping[Text, np.ndarray] + ) -> Iterable[Tuple[Union[str, bytes], classification_pb2.Classifications]]: + serialized_examples, = elements.values() classifications = _post_process_classify( - self._io_tensor_spec.output_alias_tensor_names, elements, outputs) - return zip(elements, classifications) + self._io_tensor_spec.output_alias_tensor_names, + serialized_examples, outputs) + return zip(serialized_examples, classifications) -@beam.typehints.with_input_types(List[Union[tf.train.Example, - tf.train.SequenceExample]]) -@beam.typehints.with_output_types(Tuple[tf.train.Example, - regression_pb2.Regression]) +@beam.typehints.with_input_types(pa.RecordBatch) +@beam.typehints.with_output_types(Tuple[Union[str, bytes], + regression_pb2.Regression]) class _BatchRegressDoFn(_BaseBatchSavedModelDoFn): """A DoFn that run inference on regression model.""" def setup(self): super(_BatchRegressDoFn, self).setup() - def _check_elements( - self, elements: List[Union[tf.train.Example, - tf.train.SequenceExample]]) -> None: - if not all(isinstance(element, tf.train.Example) for element in elements): + def _check_elements(self) -> None: + if self._data_type != DataType.EXAMPLE: raise ValueError('Regress only supports tf.train.Example') def _post_process( - self, elements: Sequence[tf.train.Example], outputs: Mapping[Text, - np.ndarray] - ) -> Iterable[Tuple[tf.train.Example, regression_pb2.Regression]]: - regressions = 
_post_process_regress(elements, outputs) - return zip(elements, regressions) + self, elements: Mapping[Any, Any], + outputs: Mapping[Text, np.ndarray] + ) -> Iterable[Tuple[Union[str, bytes], regression_pb2.Regression]]: + serialized_examples, = elements.values() + regressions = _post_process_regress(serialized_examples, outputs) + return zip(serialized_examples, regressions) -@beam.typehints.with_input_types(List[Union[tf.train.Example, - tf.train.SequenceExample]]) +@beam.typehints.with_input_types(pa.RecordBatch) @beam.typehints.with_output_types(prediction_log_pb2.PredictLog) class _BatchPredictDoFn(_BaseBatchSavedModelDoFn): """A DoFn that runs inference on predict model.""" @@ -697,81 +798,137 @@ def setup(self): signature_def.method_name) super(_BatchPredictDoFn, self).setup() - def _check_elements( - self, elements: List[Union[tf.train.Example, - tf.train.SequenceExample]]) -> None: + def _check_elements(self) -> None: pass def _post_process( - self, elements: Union[Sequence[tf.train.Example], - Sequence[tf.train.SequenceExample]], + self, elements: Mapping[Any, Any], outputs: Mapping[Text, np.ndarray] ) -> Iterable[prediction_log_pb2.PredictLog]: + if not self._io_tensor_spec.input_tensor_types: + raise ValueError('No valid tensor types.') + input_tensor_names = self._io_tensor_spec.input_tensor_names input_tensor_alias = self._io_tensor_spec.input_tensor_alias + input_tensor_types = self._io_tensor_spec.input_tensor_types signature_name = self._signatures[0].name - batch_size = len(elements) - for output_alias, output in outputs.items(): - if len(output.shape) < 1 or output.shape[0] != batch_size: - raise ValueError( - 'Expected output tensor %s to have at least one ' - 'dimension, with the first having a size equal to the input batch ' - 'size %s. 
Instead found %s' % - (output_alias, batch_size, output.shape)) - predict_log_tmpl = prediction_log_pb2.PredictLog() - predict_log_tmpl.request.model_spec.signature_name = signature_name - predict_log_tmpl.response.model_spec.signature_name = signature_name - input_tensor_proto = predict_log_tmpl.request.inputs[input_tensor_alias] - input_tensor_proto.dtype = tf.string.as_datatype_enum - input_tensor_proto.tensor_shape.dim.add().size = 1 + + if len(input_tensor_alias) != len(input_tensor_names): + raise ValueError('Expected to have one name and one alias per tensor') result = [] - for i in range(batch_size): - predict_log = prediction_log_pb2.PredictLog() - predict_log.CopyFrom(predict_log_tmpl) - predict_log.request.inputs[input_tensor_alias].string_val.append( - elements[i].SerializeToString()) + # Single tensor input + if len(input_tensor_names) == 1: + serialized_examples, = elements.values() + batch_size = len(serialized_examples) + + predict_log_tmpl = prediction_log_pb2.PredictLog() + predict_log_tmpl.request.model_spec.signature_name = signature_name + predict_log_tmpl.response.model_spec.signature_name = signature_name + input_tensor_proto = predict_log_tmpl.request.inputs[input_tensor_alias[0]] + input_tensor_proto.dtype = tf.string.as_datatype_enum + input_tensor_proto.tensor_shape.dim.add().size = 1 + for output_alias, output in outputs.items(): - # Mimic tensor::Split - tensor_proto = tf.make_tensor_proto( - values=output[i], - dtype=tf.as_dtype(output[i].dtype).as_datatype_enum, - shape=np.expand_dims(output[i], axis=0).shape) - predict_log.response.outputs[output_alias].CopyFrom(tensor_proto) - result.append(predict_log) + if len(output.shape) < 1 or output.shape[0] != batch_size: + raise ValueError( + 'Expected output tensor %s to have at least one ' + 'dimension, with the first having a size equal to the input batch ' + 'size %s. 
Instead found %s' % + (output_alias, batch_size, output.shape)) + + for i in range(batch_size): + predict_log = prediction_log_pb2.PredictLog() + predict_log.CopyFrom(predict_log_tmpl) + predict_log.request.inputs[input_tensor_alias[0]].string_val.append( + serialized_examples[i]) + for output_alias, output in outputs.items(): + # Mimic tensor::Split + tensor_proto = tf.make_tensor_proto( + values=output[i], + dtype=tf.as_dtype(output[i].dtype).as_datatype_enum, + shape=np.expand_dims(output[i], axis=0).shape) + predict_log.response.outputs[output_alias].CopyFrom(tensor_proto) + result.append(predict_log) + else: + predict_log_tmpl = prediction_log_pb2.PredictLog() + predict_log_tmpl.request.model_spec.signature_name = signature_name + predict_log_tmpl.response.model_spec.signature_name = signature_name + + # we will only include tensor_proto in requests when all input tensors are dense + include_request = True + for tensor_name, tensor in elements.items(): + if not isinstance(tensor, np.ndarray): + include_request = False + break + + if include_request: + for alias, tensor_name in zip(input_tensor_alias, input_tensor_names): + input_tensor_proto = predict_log_tmpl.request.inputs[alias] + input_tensor_proto.dtype = tf.as_dtype(input_tensor_types[alias]).as_datatype_enum + input_tensor_proto.tensor_shape.dim.add().size = len(elements[tensor_name][0]) + + batch_size = len(elements[input_tensor_names[0]]) + for i in range(batch_size): + predict_log = prediction_log_pb2.PredictLog() + predict_log.CopyFrom(predict_log_tmpl) + for alias, tensor_name in zip(input_tensor_alias, input_tensor_names): + predict_log.request.inputs[alias].float_val.append( + elements[tensor_name][i]) + else: + batch_size = elements[input_tensor_names[0]].shape[0] + predict_log = prediction_log_pb2.PredictLog() + predict_log.CopyFrom(predict_log_tmpl) + + for output_alias, output in outputs.items(): + if len(output.shape) < 1 or output.shape[0] != batch_size: + raise ValueError( + 'Expected output tensor %s to have at least one ' + 'dimension, with the first having a size equal to the input batch ' + 'size %s. 
Instead found %s' % + (output_alias, batch_size, output.shape)) + + for i in range(batch_size): + for output_alias, output in outputs.items(): + # Mimic tensor::Split + tensor_proto = tf.make_tensor_proto( + values=output[i], + dtype=tf.as_dtype(output[i].dtype).as_datatype_enum, + shape=np.expand_dims(output[i], axis=0).shape) + predict_log.response.outputs[output_alias].CopyFrom(tensor_proto) + result.append(predict_log) return result -@beam.typehints.with_input_types(List[Union[tf.train.Example, - tf.train.SequenceExample]]) -@beam.typehints.with_output_types(Tuple[tf.train.Example, - inference_pb2.MultiInferenceResponse]) +@beam.typehints.with_input_types(pa.RecordBatch) +@beam.typehints.with_output_types(Tuple[Union[str, bytes], + inference_pb2.MultiInferenceResponse]) class _BatchMultiInferenceDoFn(_BaseBatchSavedModelDoFn): """A DoFn that runs inference on multi-head model.""" - def _check_elements( - self, elements: List[Union[tf.train.Example, - tf.train.SequenceExample]]) -> None: - if not all(isinstance(element, tf.train.Example) for element in elements): - raise ValueError('Multi inference only supports tf.train.Example') + def _check_elements(self) -> None: + if self._data_type != DataType.EXAMPLE: + raise ValueError('Multi-inference only supports tf.train.Example') def _post_process( - self, elements: Sequence[tf.train.Example], outputs: Mapping[Text, - np.ndarray] - ) -> Iterable[Tuple[tf.train.Example, inference_pb2.MultiInferenceResponse]]: + self, elements: Mapping[Any, Any], + outputs: Mapping[Text, np.ndarray] + ) -> Iterable[Tuple[Union[str, bytes], inference_pb2.MultiInferenceResponse]]: classifications = None regressions = None + serialized_examples, = elements.values() for signature in self._signatures: signature_def = signature.signature_def if signature_def.method_name == tf.saved_model.CLASSIFY_METHOD_NAME: classifications = _post_process_classify( - self._io_tensor_spec.output_alias_tensor_names, elements, outputs) + self._io_tensor_spec.output_alias_tensor_names, + serialized_examples, outputs) elif signature_def.method_name == tf.saved_model.REGRESS_METHOD_NAME: - regressions = _post_process_regress(elements, outputs) + regressions = _post_process_regress(serialized_examples, outputs) else: raise ValueError('Signature method %s is not supported for ' 'multi inference' % signature_def.method_name) result = [] - for i in range(len(elements)): + for i in range(len(serialized_examples)): response = inference_pb2.MultiInferenceResponse() for signature in self._signatures: signature_def = signature.signature_def @@ -792,41 +949,42 @@ def _post_process( if len(response.results) != len(self._signatures): raise RuntimeError('Multi inference response result length does not ' 'match the number of signatures') - result.append((elements[i], response)) + result.append((serialized_examples[i], response)) return result -@beam.typehints.with_input_types(Tuple[tf.train.Example, - classification_pb2.Classifications]) + +@beam.typehints.with_input_types(Tuple[Union[str, bytes], + classification_pb2.Classifications]) @beam.typehints.with_output_types(prediction_log_pb2.PredictionLog) class _BuildPredictionLogForClassificationsDoFn(beam.DoFn): """A DoFn that builds prediction log from classifications.""" def process( - self, element: Tuple[tf.train.Example, classification_pb2.Classifications] + self, element: Tuple[Union[str, bytes], classification_pb2.Classifications] ) -> Iterable[prediction_log_pb2.PredictionLog]: (train_example, classifications) = element result = 
prediction_log_pb2.PredictionLog() result.classify_log.request.input.example_list.examples.add().CopyFrom( - train_example) + tf.train.Example.FromString(train_example)) result.classify_log.response.result.classifications.add().CopyFrom( classifications) yield result -@beam.typehints.with_input_types(Tuple[tf.train.Example, - regression_pb2.Regression]) +@beam.typehints.with_input_types(Tuple[Union[str, bytes], + regression_pb2.Regression]) @beam.typehints.with_output_types(prediction_log_pb2.PredictionLog) class _BuildPredictionLogForRegressionsDoFn(beam.DoFn): """A DoFn that builds prediction log from regressions.""" def process( - self, element: Tuple[tf.train.Example, regression_pb2.Regression] + self, element: Tuple[Union[str, bytes], regression_pb2.Regression] ) -> Iterable[prediction_log_pb2.PredictionLog]: (train_example, regression) = element result = prediction_log_pb2.PredictionLog() result.regress_log.request.input.example_list.examples.add().CopyFrom( - train_example) + tf.train.Example.FromString(train_example)) result.regress_log.response.result.regressions.add().CopyFrom(regression) yield result @@ -844,28 +1002,28 @@ def process( yield result -@beam.typehints.with_input_types(Tuple[tf.train.Example, - inference_pb2.MultiInferenceResponse]) +@beam.typehints.with_input_types(Tuple[Union[str, bytes], + inference_pb2.MultiInferenceResponse]) @beam.typehints.with_output_types(prediction_log_pb2.PredictionLog) class _BuildMultiInferenceLogDoFn(beam.DoFn): """A DoFn that builds prediction log from multi-head inference result.""" def process( - self, element: Tuple[tf.train.Example, - inference_pb2.MultiInferenceResponse] + self, element: Tuple[Union[str, bytes], + inference_pb2.MultiInferenceResponse] ) -> Iterable[prediction_log_pb2.PredictionLog]: (train_example, multi_inference_response) = element result = prediction_log_pb2.PredictionLog() (result.multi_inference_log.request.input.example_list.examples.add() - .CopyFrom(train_example)) + .CopyFrom(tf.train.Example.FromString(train_example))) result.multi_inference_log.response.CopyFrom(multi_inference_response) yield result def _post_process_classify( output_alias_tensor_names: Mapping[Text, Text], - elements: Sequence[tf.train.Example], outputs: Mapping[Text, np.ndarray] -) -> Sequence[classification_pb2.Classifications]: + elements: Sequence[Union[str, bytes]], outputs: Mapping[Text, np.ndarray] + ) -> Sequence[classification_pb2.Classifications]: """Returns classifications from inference output.""" # This is to avoid error "The truth value of an array with @@ -925,7 +1083,7 @@ def _post_process_classify( def _post_process_regress( - elements: Sequence[tf.train.Example], + elements: Sequence[Union[str, bytes]], outputs: Mapping[Text, np.ndarray]) -> Sequence[regression_pb2.Regression]: """Returns regressions from inference output.""" @@ -962,28 +1120,27 @@ def _post_process_regress( def _signature_pre_process(signature: _SignatureDef) -> _IOTensorSpec: """Returns IOTensorSpec from signature.""" - - if len(signature.inputs) != 1: - raise ValueError('Signature should have 1 and only 1 inputs') - input_tensor_alias = list(signature.inputs.keys())[0] - if list(signature.inputs.values())[0].dtype != tf.string.as_datatype_enum: + if (len(signature.inputs) == 1 and + list(signature.inputs.values())[0].dtype != tf.string.as_datatype_enum): raise ValueError( - 'Input dtype is expected to be %s, got %s' % tf.string.as_datatype_enum, - list(signature.inputs.values())[0].dtype) + 'With 1 input, dtype is expected to be %s, got %s' % + 
tf.string.as_datatype_enum, + list(signature.inputs.values())[0].dtype) + input_tensor_alias = [alias for alias in signature.inputs.keys()] if signature.method_name == tf.saved_model.CLASSIFY_METHOD_NAME: - input_tensor_name, output_alias_tensor_names = ( - _signature_pre_process_classify(signature)) + input_tensor_names, input_tensor_types, output_alias_tensor_names = ( + _signature_pre_process_classify(signature)) elif signature.method_name == tf.saved_model.PREDICT_METHOD_NAME: - input_tensor_name, output_alias_tensor_names = ( - _signature_pre_process_predict(signature)) + input_tensor_names, input_tensor_types, output_alias_tensor_names = ( + _signature_pre_process_predict(signature)) elif signature.method_name == tf.saved_model.REGRESS_METHOD_NAME: - input_tensor_name, output_alias_tensor_names = ( - _signature_pre_process_regress(signature)) + input_tensor_names, input_tensor_types, output_alias_tensor_names = ( + _signature_pre_process_regress(signature)) else: raise ValueError('Signature method %s is not supported' % - signature.method_name) - return _IOTensorSpec(input_tensor_alias, input_tensor_name, - output_alias_tensor_names) + signature.method_name) + return _IOTensorSpec(input_tensor_alias, input_tensor_names, + input_tensor_types, output_alias_tensor_names) def _signature_pre_process_classify( @@ -996,13 +1153,14 @@ def _signature_pre_process_classify( Returns: A tuple of input tensor name and output alias tensor names. """ - + if len(signature.inputs) != 1: + raise ValueError('Classify signature should have 1 and only 1 inputs') if len(signature.outputs) != 1 and len(signature.outputs) != 2: raise ValueError('Classify signature should have 1 or 2 outputs') if tf.saved_model.CLASSIFY_INPUTS not in signature.inputs: raise ValueError('No classification inputs found in SignatureDef: %s' % signature.inputs) - input_tensor_name = signature.inputs[tf.saved_model.CLASSIFY_INPUTS].name + input_tensor_names = [signature.inputs[tf.saved_model.CLASSIFY_INPUTS].name] output_alias_tensor_names = {} if (tf.saved_model.CLASSIFY_OUTPUT_CLASSES not in signature.outputs and tf.saved_model.CLASSIFY_OUTPUT_SCORES not in signature.outputs): @@ -1017,7 +1175,7 @@ def _signature_pre_process_classify( if tf.saved_model.CLASSIFY_OUTPUT_SCORES in signature.outputs: output_alias_tensor_names[tf.saved_model.CLASSIFY_OUTPUT_SCORES] = ( signature.outputs[tf.saved_model.CLASSIFY_OUTPUT_SCORES].name) - return input_tensor_name, output_alias_tensor_names + return input_tensor_names, {}, output_alias_tensor_names def _signature_pre_process_predict( @@ -1030,12 +1188,14 @@ def _signature_pre_process_predict( Returns: A tuple of input tensor name and output alias tensor names. """ - - input_tensor_name = list(signature.inputs.values())[0].name + input_tensor_names = [value.name for value in signature.inputs.values()] + input_tensor_types = dict([ + (key, value.dtype) for key, value in signature.inputs.items() + ]) output_alias_tensor_names = dict([ - (key, output.name) for key, output in signature.outputs.items() + (key, output.name) for key, output in signature.outputs.items() ]) - return input_tensor_name, output_alias_tensor_names + return input_tensor_names, input_tensor_types, output_alias_tensor_names def _signature_pre_process_regress( @@ -1048,13 +1208,14 @@ def _signature_pre_process_regress( Returns: A tuple of input tensor name and output alias tensor names. 
""" - + if len(signature.inputs) != 1: + raise ValueError('Regress signature should have 1 and only 1 inputs') if len(signature.outputs) != 1: raise ValueError('Regress signature should have 1 output') if tf.saved_model.REGRESS_INPUTS not in signature.inputs: raise ValueError('No regression inputs found in SignatureDef: %s' % signature.inputs) - input_tensor_name = signature.inputs[tf.saved_model.REGRESS_INPUTS].name + input_tensor_names = [signature.inputs[tf.saved_model.REGRESS_INPUTS].name] if tf.saved_model.REGRESS_OUTPUTS not in signature.outputs: raise ValueError('No regression outputs found in SignatureDef: %s' % signature.outputs) @@ -1062,7 +1223,7 @@ def _signature_pre_process_regress( tf.saved_model.REGRESS_OUTPUTS: signature.outputs[tf.saved_model.REGRESS_OUTPUTS].name } - return input_tensor_name, output_alias_tensor_names + return input_tensor_names, {}, output_alias_tensor_names def _using_in_process_inference( @@ -1173,6 +1334,30 @@ def _is_cygwin() -> bool: return platform.system().startswith('CYGWIN_NT') +class AssertType(beam.PTransform): + """Check and cast a PCollection's elements to a given type.""" + def __init__(self, data_type: Any, operation_type: Text, label=None): + super().__init__(label) + self.data_type = data_type + self.operation_type = operation_type + self.first_data = False + + def expand(self, pcoll: beam.pvalue.PCollection): + @beam.typehints.with_output_types(Iterable[self.data_type]) + def _assert_fn(element: Any): + if not isinstance(element, self.data_type): + raise ValueError( + 'Operation type %s expected element of type %s, got: %s' % + (self.operation_type, self.data_type, type(element))) + yield element + + # Skip run-time type checking if the type already matches. + if pcoll.element_type == self.data_type: + return pcoll + else: + return pcoll | beam.ParDo(_assert_fn) + + class _Clock(object): def get_current_time_in_microseconds(self) -> int: diff --git a/tfx_bsl/beam/run_inference_test.py b/tfx_bsl/beam/run_inference_test.py index 5fb9adad..6d8b21ae 100644 --- a/tfx_bsl/beam/run_inference_test.py +++ b/tfx_bsl/beam/run_inference_test.py @@ -27,6 +27,7 @@ import mock import apache_beam as beam +import pyarrow as pa from apache_beam.metrics.metric import MetricsFilter from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to @@ -34,12 +35,18 @@ from googleapiclient import http from six.moves import http_client import tensorflow as tf +from tfx_bsl.beam import bsl_util from tfx_bsl.beam import run_inference +from tfx_bsl.beam.bsl_constants import DataType +from tfx_bsl.beam.bsl_constants import _RECORDBATCH_COLUMN from tfx_bsl.public.proto import model_spec_pb2 +from tfx_bsl.tfxio import test_util +from tfx_bsl.tfxio import tensor_adapter +from tfx_bsl.tfxio import tf_example_record from google.protobuf import text_format - from tensorflow_serving.apis import prediction_log_pb2 +from tensorflow_metadata.proto.v0 import schema_pb2 class RunInferenceFixture(tf.test.TestCase): @@ -49,10 +56,10 @@ def setUp(self): self._predict_examples = [ text_format.Parse( """ - features { - feature { key: "input1" value { float_list { value: 0 }}} - } - """, tf.train.Example()), + features { + feature { key: "input1" value { float_list { value: 0 }}} + } + """, tf.train.Example()), ] def _get_output_data_dir(self, sub_dir=None): @@ -72,10 +79,572 @@ def _prepare_predict_examples(self, example_path): output_file.write(example.SerializeToString()) -class RunOfflineInferenceTest(RunInferenceFixture): +class 
RunOfflineInferenceExamplesTest(RunInferenceFixture): + + def setUp(self): + super(RunOfflineInferenceExamplesTest, self).setUp() + self._predict_examples = [ + text_format.Parse( + """ + features { + feature { key: "input1" value { float_list { value: 0 }}} + } + """, tf.train.Example()), + text_format.Parse( + """ + features { + feature { key: "input1" value { float_list { value: 1 }}} + } + """, tf.train.Example()), + ] + self._multihead_examples = [ + text_format.Parse( + """ + features { + feature {key: "x" value { float_list { value: 0.8 }}} + feature {key: "y" value { float_list { value: 0.2 }}} + } + """, tf.train.Example()), + text_format.Parse( + """ + features { + feature {key: "x" value { float_list { value: 0.6 }}} + feature {key: "y" value { float_list { value: 0.1 }}} + } + """, tf.train.Example()), + ] + + self.schema = text_format.Parse( + """ + tensor_representation_group { + key: "" + value { + tensor_representation { + key: "x" + value { + dense_tensor { + column_name: "x" + shape { dim { size: 1 } } + } + } + } + tensor_representation { + key: "y" + value { + dense_tensor { + column_name: "y" + shape { dim { size: 1 } } + } + } + } + } + } + feature { + name: "x" + type: FLOAT + } + feature { + name: "y" + type: FLOAT + } + """, schema_pb2.Schema()) + + def _prepare_multihead_examples(self, example_path): + with tf.io.TFRecordWriter(example_path) as output_file: + for example in self._multihead_examples: + output_file.write(example.SerializeToString()) + + def _run_inference_with_beam(self, example_path, inference_spec_type, + prediction_log_path, include_schema = False): + schema = None + if include_schema: + schema = self.schema + + with beam.Pipeline() as pipeline: + _ = ( + pipeline + | 'ReadExamples' >> beam.io.ReadFromTFRecord(example_path) + | 'ParseExamples' >> beam.Map(tf.train.Example.FromString) + | 'RunInference' >> run_inference.RunInferenceOnExamples( + inference_spec_type, schema=schema) + | 'WritePredictions' >> beam.io.WriteToTFRecord( + prediction_log_path, + coder=beam.coders.ProtoCoder(prediction_log_pb2.PredictionLog))) + + def _get_results(self, prediction_log_path): + results = [] + for f in tf.io.gfile.glob(prediction_log_path + '-?????-of-?????'): + record_iterator = tf.compat.v1.io.tf_record_iterator(path=f) + for record_string in record_iterator: + prediction_log = prediction_log_pb2.PredictionLog() + prediction_log.MergeFromString(record_string) + results.append(prediction_log) + return results + + + def testKerasModelPredict(self): + inputs = tf.keras.Input(shape=(1,), name='input1') + output1 = tf.keras.layers.Dense( + 1, activation=tf.nn.sigmoid, name='output1')( + inputs) + output2 = tf.keras.layers.Dense( + 1, activation=tf.nn.sigmoid, name='output2')( + inputs) + inference_model = tf.keras.models.Model(inputs, [output1, output2]) + + class TestKerasModel(tf.keras.Model): + + def __init__(self, inference_model): + super(TestKerasModel, self).__init__(name='test_keras_model') + self.inference_model = inference_model + + @tf.function(input_signature=[ + tf.TensorSpec(shape=[None], dtype=tf.string, name='inputs') + ]) + def call(self, serialized_example): + features = { + 'input1': + tf.compat.v1.io.FixedLenFeature([1], + dtype=tf.float32, + default_value=0) + } + input_tensor_dict = tf.io.parse_example(serialized_example, features) + return inference_model(input_tensor_dict['input1']) + + model = TestKerasModel(inference_model) + model.compile( + optimizer=tf.keras.optimizers.Adam(lr=.001), + loss=tf.keras.losses.binary_crossentropy, + 
metrics=['accuracy']) + + model_path = self._get_output_data_dir('model') + tf.compat.v1.keras.experimental.export_saved_model( + model, model_path, serving_only=True) + + example_path = self._get_output_data_dir('examples') + self._prepare_predict_examples(example_path) + prediction_log_path = self._get_output_data_dir('predictions') + self._run_inference_with_beam( + example_path, + model_spec_pb2.InferenceSpecType( + saved_model_spec=model_spec_pb2.SavedModelSpec( + model_path=model_path)), prediction_log_path) + + results = self._get_results(prediction_log_path) + self.assertLen(results, 2) + + def testKerasModelPredictMultiTensor(self): + input1 = tf.keras.layers.Input((1,), name='x') + input2 = tf.keras.layers.Input((1,), name='y') + + x1 = tf.keras.layers.Dense(10)(input1) + x2 = tf.keras.layers.Dense(10)(input2) + output = tf.keras.layers.Dense(5, name='output')(x2) + + model = tf.keras.models.Model([input1, input2], output) + model_path = self._get_output_data_dir('model') + tf.compat.v1.keras.experimental.export_saved_model( + model, model_path, serving_only=True) + + example_path = self._get_output_data_dir('examples') + self._prepare_multihead_examples(example_path) + prediction_log_path = self._get_output_data_dir('predictions') + self._run_inference_with_beam( + example_path, + model_spec_pb2.InferenceSpecType( + saved_model_spec=model_spec_pb2.SavedModelSpec( + model_path=model_path)), + prediction_log_path, include_schema = True) + + results = self._get_results(prediction_log_path) + self.assertLen(results, 2) + for result in results: + self.assertLen(result.predict_log.request.inputs, 2) + self.assertAllInSet(list(result.predict_log.request.inputs), list(['x','y'])) + + +class RunRemoteInferenceExamplesTest(RunInferenceFixture): + + def setUp(self): + super(RunRemoteInferenceExamplesTest, self).setUp() + self._example_path = self._get_output_data_dir('example') + self._prepare_predict_examples(self._example_path) + # This is from https://ml.googleapis.com/$discovery/rest?version=v1. + self._discovery_testdata_dir = os.path.join( + os.path.join(os.path.dirname(__file__), 'testdata'), + 'ml_discovery.json') + + @staticmethod + def _make_response_body(content, successful): + if successful: + response_dict = {'predictions': content} + else: + response_dict = {'error': content} + return json.dumps(response_dict) + + def _set_up_pipeline(self, inference_spec_type): + self.pipeline = beam.Pipeline() + self.pcoll = ( + self.pipeline + | 'ReadExamples' >> beam.io.ReadFromTFRecord(self._example_path) + | 'ParseExamples' >> beam.Map(tf.train.Example.FromString) + | 'RunInference' >> run_inference.RunInferenceOnExamples(inference_spec_type)) + + def _run_inference_with_beam(self): + self.pipeline_result = self.pipeline.run() + self.pipeline_result.wait_until_finish() + + def test_model_predict(self): + predictions = [{'output_1': [0.901], 'output_2': [0.997]}] + builder = http.RequestMockBuilder({ + 'ml.projects.predict': + (None, self._make_response_body(predictions, successful=True)) + }) + resource = discovery.build( + 'ml', + 'v1', + http=http.HttpMock(self._discovery_testdata_dir, + {'status': http_client.OK}), + requestBuilder=builder) + with mock.patch('googleapiclient.discovery.' 
'build') as response_mock: + response_mock.side_effect = lambda service, version: resource + inference_spec_type = model_spec_pb2.InferenceSpecType( + ai_platform_prediction_model_spec=model_spec_pb2 + .AIPlatformPredictionModelSpec( + project_id='test-project', + model_name='test-model', + )) + + prediction_log = prediction_log_pb2.PredictionLog() + prediction_log.predict_log.response.outputs['output_1'].CopyFrom( + tf.make_tensor_proto(values=[0.901], dtype=tf.double, shape=(1, 1))) + prediction_log.predict_log.response.outputs['output_2'].CopyFrom( + tf.make_tensor_proto(values=[0.997], dtype=tf.double, shape=(1, 1))) + + self._set_up_pipeline(inference_spec_type) + assert_that(self.pcoll, equal_to([prediction_log])) + self._run_inference_with_beam() + + +class RunOfflineInferenceSequenceExamplesTest(RunInferenceFixture): + + def setUp(self): + super(RunOfflineInferenceSequenceExamplesTest, self).setUp() + self._predict_examples = [ + text_format.Parse( + """ + context { + feature { key: "input1" value { float_list { value: 0 }}} + } + """, tf.train.SequenceExample()), + text_format.Parse( + """ + context { + feature { key: "input1" value { float_list { value: 1 }}} + } + """, tf.train.SequenceExample()), + ] + self._multihead_examples = [ + text_format.Parse( + """ + context { + feature {key: "x" value { float_list { value: 0.8 }}} + feature {key: "y" value { float_list { value: 0.2 }}} + } + """, tf.train.SequenceExample()), + text_format.Parse( + """ + context { + feature {key: "x" value { float_list { value: 0.6 }}} + feature {key: "y" value { float_list { value: 0.1 }}} + } + """, tf.train.SequenceExample()), + ] + + self.schema = text_format.Parse( + """ + tensor_representation_group { + key: "" + value { + tensor_representation { + key: "x" + value { + dense_tensor { + column_name: "x" + shape { dim { size: 1 } } + } + } + } + tensor_representation { + key: "y" + value { + dense_tensor { + column_name: "y" + shape { dim { size: 1 } } + } + } + } + } + } + feature { + name: "x" + type: FLOAT + } + feature { + name: "y" + type: FLOAT + } + """, schema_pb2.Schema()) + + def _prepare_multihead_examples(self, example_path): + with tf.io.TFRecordWriter(example_path) as output_file: + for example in self._multihead_examples: + output_file.write(example.SerializeToString()) + + def _run_inference_with_beam(self, example_path, inference_spec_type, + prediction_log_path, include_schema = False): + schema = None + if include_schema: + schema = self.schema + + with beam.Pipeline() as pipeline: + _ = ( + pipeline + | 'ReadExamples' >> beam.io.ReadFromTFRecord(example_path) + | 'ParseExamples' >> beam.Map(tf.train.SequenceExample.FromString) + | 'RunInference' >> run_inference.RunInferenceOnExamples( + inference_spec_type, schema=schema) + | 'WritePredictions' >> beam.io.WriteToTFRecord( + prediction_log_path, + coder=beam.coders.ProtoCoder(prediction_log_pb2.PredictionLog))) + + def _get_results(self, prediction_log_path): + results = [] + for f in tf.io.gfile.glob(prediction_log_path + '-?????-of-?????'): + record_iterator = tf.compat.v1.io.tf_record_iterator(path=f) + for record_string in record_iterator: + prediction_log = prediction_log_pb2.PredictionLog() + prediction_log.MergeFromString(record_string) + results.append(prediction_log) + return results + + + def _build_regression_signature(self, input_tensor, output_tensor): + """Helper function for building a regression SignatureDef.""" + input_tensor_info = tf.compat.v1.saved_model.utils.build_tensor_info( + input_tensor) + 
signature_inputs = { + tf.compat.v1.saved_model.signature_constants.REGRESS_INPUTS: + input_tensor_info + } + output_tensor_info = tf.compat.v1.saved_model.utils.build_tensor_info( + output_tensor) + signature_outputs = { + tf.compat.v1.saved_model.signature_constants.REGRESS_OUTPUTS: + output_tensor_info + } + return tf.compat.v1.saved_model.signature_def_utils.build_signature_def( + signature_inputs, signature_outputs, + tf.compat.v1.saved_model.signature_constants.REGRESS_METHOD_NAME) + + def _build_classification_signature(self, input_tensor, scores_tensor): + """Helper function for building a classification SignatureDef.""" + input_tensor_info = tf.compat.v1.saved_model.utils.build_tensor_info( + input_tensor) + signature_inputs = { + tf.compat.v1.saved_model.signature_constants.CLASSIFY_INPUTS: + input_tensor_info + } + output_tensor_info = tf.compat.v1.saved_model.utils.build_tensor_info( + scores_tensor) + signature_outputs = { + tf.compat.v1.saved_model.signature_constants.CLASSIFY_OUTPUT_SCORES: + output_tensor_info + } + return tf.compat.v1.saved_model.signature_def_utils.build_signature_def( + signature_inputs, signature_outputs, + tf.compat.v1.saved_model.signature_constants.CLASSIFY_METHOD_NAME) + + def _build_multihead_model(self, model_path): + with tf.compat.v1.Graph().as_default(): + input_example = tf.compat.v1.placeholder( + tf.string, name='input_examples_tensor') + config = { + 'x': tf.compat.v1.io.FixedLenFeature( + [1], dtype=tf.float32, default_value=0), + 'y': tf.compat.v1.io.FixedLenFeature( + [1], dtype=tf.float32, default_value=0), + } + features = tf.compat.v1.parse_example(input_example, config) + x = features['x'] + y = features['y'] + sum_pred = x + y + diff_pred = tf.abs(x - y) + sess = tf.compat.v1.Session() + sess.run(tf.compat.v1.initializers.global_variables()) + signature_def_map = { + 'regress_diff': + self._build_regression_signature(input_example, diff_pred), + 'classify_sum': + self._build_classification_signature(input_example, sum_pred), + tf.compat.v1.saved_model.signature_constants + .DEFAULT_SERVING_SIGNATURE_DEF_KEY: + self._build_regression_signature(input_example, sum_pred) + } + builder = tf.compat.v1.saved_model.builder.SavedModelBuilder(model_path) + builder.add_meta_graph_and_variables( + sess, [tf.compat.v1.saved_model.tag_constants.SERVING], + signature_def_map=signature_def_map) + builder.save() + + + def testClassifyModelError(self): + example_path = self._get_output_data_dir('examples') + self._prepare_multihead_examples(example_path) + model_path = self._get_output_data_dir('model') + self._build_multihead_model(model_path) + prediction_log_path = self._get_output_data_dir('predictions') + error_msg = 'Operation type' + try: + self._run_inference_with_beam( + example_path, + model_spec_pb2.InferenceSpecType( + saved_model_spec=model_spec_pb2.SavedModelSpec( + model_path=model_path, signature_name=['classify_sum'])), + prediction_log_path) + except ValueError as exc: + actual_error_msg = str(exc) + self.assertTrue(actual_error_msg.startswith(error_msg)) + else: + self.fail('Test was expected to throw ValueError exception') + + def testRegressModelError(self): + example_path = self._get_output_data_dir('examples') + self._prepare_multihead_examples(example_path) + model_path = self._get_output_data_dir('model') + self._build_multihead_model(model_path) + prediction_log_path = self._get_output_data_dir('predictions') + error_msg = 'Operation type' + try: + self._run_inference_with_beam( + example_path, + 
model_spec_pb2.InferenceSpecType( + saved_model_spec=model_spec_pb2.SavedModelSpec( + model_path=model_path, signature_name=['regress_diff'])), + prediction_log_path) + except ValueError as exc: + actual_error_msg = str(exc) + self.assertTrue(actual_error_msg.startswith(error_msg)) + else: + self.fail('Test was expected to throw ValueError exception') + + def testMultiInferenceModelError(self): + example_path = self._get_output_data_dir('examples') + self._prepare_multihead_examples(example_path) + model_path = self._get_output_data_dir('model') + self._build_multihead_model(model_path) + prediction_log_path = self._get_output_data_dir('predictions') + error_msg = 'Operation type' + try: + self._run_inference_with_beam( + example_path, + model_spec_pb2.InferenceSpecType( + saved_model_spec=model_spec_pb2.SavedModelSpec( + model_path=model_path, + signature_name=['regress_diff', 'classify_sum'])), + prediction_log_path) + except ValueError as exc: + actual_error_msg = str(exc) + self.assertTrue(actual_error_msg.startswith(error_msg)) + else: + self.fail('Test was expected to throw ValueError exception') + + + def testKerasModelPredict(self): + inputs = tf.keras.Input(shape=(1,), name='input1') + output1 = tf.keras.layers.Dense( + 1, activation=tf.nn.sigmoid, name='output1')( + inputs) + output2 = tf.keras.layers.Dense( + 1, activation=tf.nn.sigmoid, name='output2')( + inputs) + inference_model = tf.keras.models.Model(inputs, [output1, output2]) + + class TestKerasModel(tf.keras.Model): + + def __init__(self, inference_model): + super(TestKerasModel, self).__init__(name='test_keras_model') + self.inference_model = inference_model + + @tf.function(input_signature=[ + tf.TensorSpec(shape=[None], dtype=tf.string, name='inputs') + ]) + def call(self, serialized_example): + features = { + 'input1': + tf.compat.v1.io.FixedLenFeature([1], + dtype=tf.float32, + default_value=0) + } + input_tensor_dict = tf.io.parse_example(serialized_example, features) + return inference_model(input_tensor_dict['input1']) + + model = TestKerasModel(inference_model) + model.compile( + optimizer=tf.keras.optimizers.Adam(lr=.001), + loss=tf.keras.losses.binary_crossentropy, + metrics=['accuracy']) + + model_path = self._get_output_data_dir('model') + tf.compat.v1.keras.experimental.export_saved_model( + model, model_path, serving_only=True) + + example_path = self._get_output_data_dir('examples') + self._prepare_predict_examples(example_path) + prediction_log_path = self._get_output_data_dir('predictions') + self._run_inference_with_beam( + example_path, + model_spec_pb2.InferenceSpecType( + saved_model_spec=model_spec_pb2.SavedModelSpec( + model_path=model_path)), prediction_log_path) + + results = self._get_results(prediction_log_path) + self.assertLen(results, 2) + + def testKerasModelPredictMultiTensor(self): + input1 = tf.keras.layers.Input((1,), name='x') + input2 = tf.keras.layers.Input((1,), name='y') + + x1 = tf.keras.layers.Dense(10)(input1) + x2 = tf.keras.layers.Dense(10)(input2) + output = tf.keras.layers.Dense(5, name='output')(x2) + + model = tf.keras.models.Model([input1, input2], output) + model_path = self._get_output_data_dir('model') + tf.compat.v1.keras.experimental.export_saved_model( + model, model_path, serving_only=True) + + example_path = self._get_output_data_dir('examples') + self._prepare_multihead_examples(example_path) + prediction_log_path = self._get_output_data_dir('predictions') + self._run_inference_with_beam( + example_path, + model_spec_pb2.InferenceSpecType( + 
saved_model_spec=model_spec_pb2.SavedModelSpec( + model_path=model_path)), + prediction_log_path, include_schema = True) + + results = self._get_results(prediction_log_path) + self.assertLen(results, 2) + for result in results: + self.assertLen(result.predict_log.request.inputs, 2) + self.assertAllInSet(list(result.predict_log.request.inputs), list(['x','y'])) + + +class RunOfflineInferenceArrowTest(RunInferenceFixture): def setUp(self): - super(RunOfflineInferenceTest, self).setUp() + super(RunOfflineInferenceArrowTest, self).setUp() self._predict_examples = [ text_format.Parse( """ @@ -90,6 +659,7 @@ def setUp(self): } """, tf.train.Example()), ] + self._multihead_examples = [ text_format.Parse( """ @@ -107,6 +677,40 @@ def setUp(self): """, tf.train.Example()), ] + self.schema = text_format.Parse( + """ + tensor_representation_group { + key: "" + value { + tensor_representation { + key: "x" + value { + dense_tensor { + column_name: "x" + shape { dim { size: 1 } } + } + } + } + tensor_representation { + key: "y" + value { + dense_tensor { + column_name: "y" + shape { dim { size: 1 } } + } + } + } + } + } + feature { + name: "x" + type: FLOAT + } + feature { + name: "y" + type: FLOAT + } + """, schema_pb2.Schema()) def _prepare_multihead_examples(self, example_path): with tf.io.TFRecordWriter(example_path) as output_file: @@ -208,14 +812,35 @@ def _build_multihead_model(self, model_path): builder.save() def _run_inference_with_beam(self, example_path, inference_spec_type, - prediction_log_path): - with beam.Pipeline() as pipeline: - _ = ( + prediction_log_path, include_config = False): + # test _RunInferenceOnRecordBatch + converter = tf_example_record.TFExampleBeamRecord( + physical_format="inmem", telemetry_descriptors=[], + schema=self.schema, raw_record_column_name=_RECORDBATCH_COLUMN) + + if include_config: + tensor_adapter_config = tensor_adapter.TensorAdapterConfig( + arrow_schema=converter.ArrowSchema(), + tensor_representations=converter.TensorRepresentations()) + + with beam.Pipeline() as pipeline: + _ = ( pipeline | 'ReadExamples' >> beam.io.ReadFromTFRecord(example_path) - | 'ParseExamples' >> beam.Map(tf.train.Example.FromString) - | - 'RunInference' >> run_inference.RunInferenceImpl(inference_spec_type) + | 'ConvertToRecordBatch' >> converter.BeamSource() + | 'RunInference' >> run_inference._RunInferenceOnRecordBatch( + inference_spec_type, DataType.EXAMPLE, tensor_adapter_config) + | 'WritePredictions' >> beam.io.WriteToTFRecord( + prediction_log_path, + coder=beam.coders.ProtoCoder(prediction_log_pb2.PredictionLog))) + else: + with beam.Pipeline() as pipeline: + _ = ( + pipeline + | 'ReadExamples' >> beam.io.ReadFromTFRecord(example_path) + | 'ConvertToRecordBatch' >> converter.BeamSource() + | 'RunInference' >> run_inference._RunInferenceOnRecordBatch( + inference_spec_type, DataType.EXAMPLE) | 'WritePredictions' >> beam.io.WriteToTFRecord( prediction_log_path, coder=beam.coders.ProtoCoder(prediction_log_pb2.PredictionLog))) @@ -364,7 +989,6 @@ def testKerasModelPredict(self): inference_model = tf.keras.models.Model(inputs, [output1, output2]) class TestKerasModel(tf.keras.Model): - def __init__(self, inference_model): super(TestKerasModel, self).__init__(name='test_keras_model') self.inference_model = inference_model @@ -374,10 +998,9 @@ def __init__(self, inference_model): ]) def call(self, serialized_example): features = { - 'input1': - tf.compat.v1.io.FixedLenFeature([1], - dtype=tf.float32, - default_value=0) + 'input1': tf.compat.v1.io.FixedLenFeature( + [1], 
dtype=tf.float32, + default_value=0) } input_tensor_dict = tf.io.parse_example(serialized_example, features) return inference_model(input_tensor_dict['input1']) @@ -388,12 +1011,12 @@ def call(self, serialized_example): loss=tf.keras.losses.binary_crossentropy, metrics=['accuracy']) + example_path = self._get_output_data_dir('examples') + self._prepare_predict_examples(example_path) model_path = self._get_output_data_dir('model') tf.compat.v1.keras.experimental.export_saved_model( model, model_path, serving_only=True) - example_path = self._get_output_data_dir('examples') - self._prepare_predict_examples(example_path) prediction_log_path = self._get_output_data_dir('predictions') self._run_inference_with_beam( example_path, @@ -404,6 +1027,66 @@ def call(self, serialized_example): results = self._get_results(prediction_log_path) self.assertLen(results, 2) + def testKerasModelPredictMultiTensor(self): + input1 = tf.keras.layers.Input((1,), name='x') + input2 = tf.keras.layers.Input((1,), name='y') + + x1 = tf.keras.layers.Dense(10)(input1) + x2 = tf.keras.layers.Dense(10)(input2) + output = tf.keras.layers.Dense(5, name='output')(x2) + + model = tf.keras.models.Model([input1, input2], output) + model_path = self._get_output_data_dir('model') + tf.compat.v1.keras.experimental.export_saved_model( + model, model_path, serving_only=True) + + example_path = self._get_output_data_dir('examples') + self._prepare_multihead_examples(example_path) + prediction_log_path = self._get_output_data_dir('predictions') + self._run_inference_with_beam( + example_path, + model_spec_pb2.InferenceSpecType( + saved_model_spec=model_spec_pb2.SavedModelSpec( + model_path=model_path)), + prediction_log_path, include_config = True) + + results = self._get_results(prediction_log_path) + self.assertLen(results, 2) + for result in results: + self.assertLen(result.predict_log.request.inputs, 2) + self.assertAllInSet(list(result.predict_log.request.inputs), list(['x','y'])) + + def testMultiTensorError(self): + input1 = tf.keras.layers.Input((1,), name='x') + input2 = tf.keras.layers.Input((1,), name='y') + + x1 = tf.keras.layers.Dense(10)(input1) + x2 = tf.keras.layers.Dense(10)(input2) + output = tf.keras.layers.Dense(5, name='output')(x2) + + model = tf.keras.models.Model([input1, input2], output) + model_path = self._get_output_data_dir('model') + tf.compat.v1.keras.experimental.export_saved_model( + model, model_path, serving_only=True) + + example_path = self._get_output_data_dir('examples') + self._prepare_multihead_examples(example_path) + prediction_log_path = self._get_output_data_dir('predictions') + + error_msg = 'Tensor adaptor config is required with a multi-input model' + try: + self._run_inference_with_beam( + example_path, + model_spec_pb2.InferenceSpecType( + saved_model_spec=model_spec_pb2.SavedModelSpec( + model_path=model_path)), + prediction_log_path, include_config = False) + except ValueError as exc: + actual_error_msg = str(exc) + self.assertTrue(actual_error_msg.startswith(error_msg)) + else: + self.fail('Test was expected to throw ValueError exception') + def testTelemetry(self): example_path = self._get_output_data_dir('examples') self._prepare_multihead_examples(example_path) @@ -412,11 +1095,18 @@ def testTelemetry(self): inference_spec_type = model_spec_pb2.InferenceSpecType( saved_model_spec=model_spec_pb2.SavedModelSpec( model_path=model_path, signature_name=['classify_sum'])) + pipeline = beam.Pipeline() + converter = tf_example_record.TFExampleBeamRecord( + physical_format="inmem", + 
telemetry_descriptors=[], + raw_record_column_name=_RECORDBATCH_COLUMN) _ = ( - pipeline | 'ReadExamples' >> beam.io.ReadFromTFRecord(example_path) - | 'ParseExamples' >> beam.Map(tf.train.Example.FromString) - | 'RunInference' >> run_inference.RunInferenceImpl(inference_spec_type)) + pipeline + | 'ReadExamples' >> beam.io.ReadFromTFRecord(example_path) + | 'ConvertToRecordBatch' >> converter.BeamSource() + | 'RunInference' >> run_inference._RunInferenceOnRecordBatch( + inference_spec_type, DataType.EXAMPLE)) run_result = pipeline.run() run_result.wait_until_finish() @@ -451,12 +1141,12 @@ def testTelemetry(self): load_model_latency_milli_secs['distributions'][0].result.sum, 0) -class RunRemoteInferenceTest(RunInferenceFixture): +class RunRemoteInferenceArrowTest(RunInferenceFixture): def setUp(self): - super(RunRemoteInferenceTest, self).setUp() - self.example_path = self._get_output_data_dir('example') - self._prepare_predict_examples(self.example_path) + super(RunRemoteInferenceArrowTest, self).setUp() + self._example_path = self._get_output_data_dir('example') + self._prepare_predict_examples(self._example_path) # This is from https://ml.googleapis.com/$discovery/rest?version=v1. self._discovery_testdata_dir = os.path.join( os.path.join(os.path.dirname(__file__), 'testdata'), @@ -472,11 +1162,16 @@ def _make_response_body(content, successful): def _set_up_pipeline(self, inference_spec_type): self.pipeline = beam.Pipeline() + converter = tf_example_record.TFExampleBeamRecord( + physical_format="inmem", + telemetry_descriptors=[], + raw_record_column_name=_RECORDBATCH_COLUMN) self.pcoll = ( self.pipeline - | 'ReadExamples' >> beam.io.ReadFromTFRecord(self.example_path) - | 'ParseExamples' >> beam.Map(tf.train.Example.FromString) - | 'RunInference' >> run_inference.RunInferenceImpl(inference_spec_type)) + | 'ReadExamples' >> beam.io.ReadFromTFRecord(self._example_path) + | 'ConvertToRecordBatch' >> converter.BeamSource() + | 'RunInference' >> run_inference._RunInferenceOnRecordBatch( + inference_spec_type, DataType.EXAMPLE)) def _run_inference_with_beam(self): self.pipeline_result = self.pipeline.run() @@ -568,12 +1263,10 @@ def test_can_format_requests(self): with mock.patch('googleapiclient.discovery.' 
'build') as response_mock: response_mock.side_effect = lambda service, version: resource inference_spec_type = model_spec_pb2.InferenceSpecType( - ai_platform_prediction_model_spec=model_spec_pb2 - .AIPlatformPredictionModelSpec( + ai_platform_prediction_model_spec=model_spec_pb2.AIPlatformPredictionModelSpec( project_id='test-project', model_name='test-model', )) - example = text_format.Parse( """ features { @@ -584,66 +1277,22 @@ def test_can_format_requests(self): } """, tf.train.Example()) + converter = tf_example_record.TFExampleBeamRecord( + physical_format="inmem", + telemetry_descriptors=[], + raw_record_column_name=_RECORDBATCH_COLUMN) + self.pipeline = beam.Pipeline() self.pcoll = ( self.pipeline - | 'ReadExamples' >> beam.Create([example]) - | - 'RunInference' >> run_inference.RunInferenceImpl(inference_spec_type)) + | 'CreateExamples' >> beam.Create([example]) + | 'ParseExamples' >> beam.Map(lambda x: x.SerializeToString()) + | 'ConvertToRecordBatch' >> converter.BeamSource() + | 'RunInference' >> run_inference._RunInferenceOnRecordBatch( + inference_spec_type, DataType.EXAMPLE)) self._run_inference_with_beam() - def test_request_body_with_binary_data(self): - example = text_format.Parse( - """ - features { - feature { key: "x_bytes" value { bytes_list { value: ["ASa8asdf"] }}} - feature { key: "x" value { bytes_list { value: "JLK7ljk3" }}} - feature { key: "y" value { int64_list { value: [1, 2] }}} - feature { key: "z" value { float_list { value: [4.5, 5, 5.5] }}} - } - """, tf.train.Example()) - inference_spec_type = model_spec_pb2.InferenceSpecType( - ai_platform_prediction_model_spec=model_spec_pb2 - .AIPlatformPredictionModelSpec( - project_id='test_project', - model_name='test_model', - version_name='test_version')) - remote_predict = run_inference._RemotePredictDoFn(inference_spec_type, None) - result = list(remote_predict._prepare_instances([example])) - self.assertEqual(result, [ - { - 'x_bytes': { - 'b64': 'QVNhOGFzZGY=' - }, - 'x': 'JLK7ljk3', - 'y': [1, 2], - 'z': [4.5, 5, 5.5] - }, - ]) - - def test_request_serialized_example(self): - example = text_format.Parse( - """ - features { - feature { key: "x_bytes" value { bytes_list { value: ["ASa8asdf"] }}} - feature { key: "x" value { bytes_list { value: "JLK7ljk3" }}} - feature { key: "y" value { int64_list { value: [1, 2] }}} - } - """, tf.train.Example()) - inference_spec_type = model_spec_pb2.InferenceSpecType( - ai_platform_prediction_model_spec=model_spec_pb2 - .AIPlatformPredictionModelSpec( - project_id='test_project', - model_name='test_model', - version_name='test_version', - use_serialization_config=True)) - remote_predict = run_inference._RemotePredictDoFn(inference_spec_type, None) - result = list(remote_predict._prepare_instances([example])) - self.assertEqual(result, [{ - 'b64': base64.b64encode(example.SerializeToString()).decode() - }]) - if __name__ == '__main__': tf.test.main() diff --git a/tfx_bsl/public/beam/run_inference.py b/tfx_bsl/public/beam/run_inference.py index d27ab453..9a8eb738 100644 --- a/tfx_bsl/public/beam/run_inference.py +++ b/tfx_bsl/public/beam/run_inference.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
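# Illustrative sketch only: the two JSON instance shapes that appear in the
# remote (Cloud AI Platform) request-format tests in this file. The feature
# values below are made up for illustration.
import base64
import tensorflow as tf
from google.protobuf import text_format

example = text_format.Parse("""
    features {
      feature { key: "x_bytes" value { bytes_list { value: ["ASa8asdf"] }}}
      feature { key: "y" value { int64_list { value: [1, 2] }}}
    }
    """, tf.train.Example())

# With use_serialization_config, the whole serialized example is base64-wrapped:
serialized_instance = {'b64': base64.b64encode(example.SerializeToString()).decode()}

# Otherwise features become plain JSON values, with *_bytes features base64-wrapped:
feature_instance = {
    'x_bytes': {'b64': base64.b64encode(b'ASa8asdf').decode()},  # 'QVNhOGFzZGY='
    'y': [1, 2],
}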
# Lint as: python3 -"""Publich API of batch inference.""" +"""Public API of batch inference.""" from __future__ import absolute_import from __future__ import division @@ -21,19 +21,22 @@ import apache_beam as beam import tensorflow as tf +import pyarrow as pa +from typing import Text, Optional, TypeVar from tfx_bsl.beam import run_inference from tfx_bsl.public.proto import model_spec_pb2 -from typing import Union from tensorflow_serving.apis import prediction_log_pb2 +from tensorflow_metadata.proto.v0 import schema_pb2 +MixedExample = TypeVar('MixedExample', tf.train.Example, tf.train.SequenceExample) @beam.ptransform_fn -@beam.typehints.with_input_types(Union[tf.train.Example, - tf.train.SequenceExample]) +@beam.typehints.with_input_types(MixedExample) @beam.typehints.with_output_types(prediction_log_pb2.PredictionLog) def RunInference( # pylint: disable=invalid-name examples: beam.pvalue.PCollection, - inference_spec_type: model_spec_pb2.InferenceSpecType + inference_spec_type: model_spec_pb2.InferenceSpecType, + schema: Optional[schema_pb2.Schema] = None ) -> beam.pvalue.PCollection: """Run inference with a model. @@ -44,19 +47,21 @@ def RunInference( # pylint: disable=invalid-name `ai_platform_prediction_model_spec` field is set in `inference_spec_type`. - TODO(b/131873699): Add support for the following features: - 1. Bytes as Input. - 2. PTable Input. - 3. Models as SideInput. + TODO(b/131873699): Add support for the following features: + 1. Bytes as Input. + 2. PTable Input. + 3. Models as SideInput. Args: examples: A PCollection containing examples. inference_spec_type: Model inference endpoint. + schema: (Optional) A schema; required for Predict models that take + multi-tensor inputs. Returns: A PCollection containing prediction logs. """ - return ( - examples | - 'RunInferenceImpl' >> run_inference.RunInferenceImpl(inference_spec_type)) + return (examples + | 'RunInferenceOnExamples' >> run_inference.RunInferenceOnExamples( + inference_spec_type, schema=schema))
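For reference, a hypothetical caller-side sketch of the updated public API: the pipeline shape mirrors the tests above, and the optional schema is what enables Predict models with multiple input tensors. The paths and feature names below are made up, and a real multi-input model may also need an explicit tensor_representation_group in the schema, as in the tests.

import apache_beam as beam
import tensorflow as tf
from google.protobuf import text_format
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_serving.apis import prediction_log_pb2
from tfx_bsl.public.beam import run_inference
from tfx_bsl.public.proto import model_spec_pb2

# Hypothetical schema with one feature per input tensor of the model.
schema = text_format.Parse("""
    feature { name: "x" type: FLOAT }
    feature { name: "y" type: FLOAT }
    """, schema_pb2.Schema())

# Hypothetical model path pointing at an exported multi-input Keras SavedModel.
spec = model_spec_pb2.InferenceSpecType(
    saved_model_spec=model_spec_pb2.SavedModelSpec(model_path='/tmp/multihead_model'))

with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | 'ReadExamples' >> beam.io.ReadFromTFRecord('/tmp/examples.tfrecord')
      | 'ParseExamples' >> beam.Map(tf.train.Example.FromString)
      | 'RunInference' >> run_inference.RunInference(spec, schema=schema)
      | 'WritePredictions' >> beam.io.WriteToTFRecord(
          '/tmp/predictions',
          coder=beam.coders.ProtoCoder(prediction_log_pb2.PredictionLog)))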