# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__all__ = [
'func_to_container_op',
'func_to_component_text',
]
from ._yaml_utils import dump_yaml
from ._components import _create_task_factory_from_component_spec
from ._structures import *
from pathlib import Path
from typing import TypeVar, Generic
# Type variable used to parameterize the file marker types below.
T = TypeVar('T')


# Intended usage example: OutputFile[GcsPath[Gzipped[Text]]]
class InputFile(Generic[T], str):
    '''String subtype, generic over T, used to mark a value as an input file.'''


class OutputFile(Generic[T], str):
    '''String subtype, generic over T, used to mark a value as an output file.'''
# TODO: Replace this image name with another one once people decide what to replace it with.
# Base image used for a python component when the caller does not specify one.
_default_base_image = 'tensorflow/tensorflow:1.11.0-py3'
def _python_function_name_to_component_name(name):
    '''Turns a python identifier into a human-readable component name.

    Underscores become spaces, runs of spaces are collapsed into one,
    surrounding spaces are removed and the result is capitalized
    (first character upper-cased, the rest lower-cased).

    Args:
        name: The python function name, e.g. 'add_two_numbers'.

    Returns:
        The humanized name, e.g. 'Add two numbers'.
    '''
    import re

    spaced_name = name.replace('_', ' ')
    single_spaced_name = re.sub(' +', ' ', spaced_name)
    trimmed_name = single_spaced_name.strip(' ')
    return trimmed_name.capitalize()
def _func_to_component_spec(func, extra_code='', base_image=_default_base_image) -> ComponentSpec:
    '''Takes a self-contained python function and converts it to a component specification.

    The function signature is inspected to build the component inputs (one per
    parameter) and outputs (one per NamedTuple field of the return annotation,
    or a single 'Output' for any other return annotation). The function source
    is embedded into a small generated program that reads its arguments from
    sys.argv, calls the function and writes every output to the file path
    passed on the command line. The component runs that program with
    "python3 -c".

    Args:
        func: Required. The function to be converted. Its source code must be
            retrievable with inspect.getsourcelines.
        extra_code: Optional. Python source code that gets placed before the function code. Can be used as workaround to define types used in function signature.
        base_image: Optional. Docker image to be used as a base image for the python component. Must have python 3.5+ installed. Default is tensorflow/tensorflow:1.11.0-py3
            Note: The image can also be specified by decorating the function with the @python_component decorator. If different base images are explicitly specified in both places, an error is raised.

    Returns:
        A ComponentSpec describing the component.

    Raises:
        ValueError: If base_image conflicts with the decorator-specified image,
            or if base_image is None and the decorator did not specify one.
    '''
    decorator_base_image = getattr(func, '_component_base_image', None)
    if decorator_base_image is not None:
        # Identity (not equality) comparison with the default object: any other
        # object is treated as an explicitly specified image.
        if base_image is not _default_base_image and decorator_base_image != base_image:
            raise ValueError('base_image ({}) conflicts with the decorator-specified base image metadata ({})'.format(base_image, decorator_base_image))
        else:
            base_image = decorator_base_image
    else:
        if base_image is None:
            raise ValueError('base_image cannot be None')

    import inspect
    import re
    from collections import OrderedDict

    single_output_name_const = 'Output'
    single_output_pythonic_name_const = 'output'

    signature = inspect.signature(func)
    parameters = list(signature.parameters.values())
    parameter_to_type_name = OrderedDict()
    inputs = []
    outputs = []
    extra_output_names = []
    arguments = []

    def annotation_to_type_struct(annotation):
        '''Converts an annotation to a type name string; returns None when there is no annotation.'''
        if not annotation or annotation == inspect.Parameter.empty:
            return None
        if isinstance(annotation, type):
            return str(annotation.__name__)
        else:
            return str(annotation)

    for parameter in parameters:
        type_struct = annotation_to_type_struct(parameter.annotation)
        # Missing annotations are stored as the string 'None' and parsed as str below.
        parameter_to_type_name[parameter.name] = str(type_struct)
        #TODO: Humanize the input/output names
        arguments.append(InputValuePlaceholder(parameter.name))
        input_spec = InputSpec(
            name=parameter.name,
            type=type_struct,
            default=str(parameter.default) if parameter.default is not inspect.Parameter.empty else None,
        )
        inputs.append(input_spec)

    #Analyzing the return type annotations.
    return_ann = signature.return_annotation
    if hasattr(return_ann, '_fields'): #NamedTuple
        # typing.NamedTuple exposed the field types as _field_types up to Python 3.8;
        # the attribute was removed in 3.9 in favor of __annotations__.
        # Plain collections.namedtuple classes may have neither.
        field_annotations = (
            getattr(return_ann, '_field_types', None)
            or getattr(return_ann, '__annotations__', None)
            or {}
        )
        for field_name in return_ann._fields:
            type_struct = annotation_to_type_struct(field_annotations.get(field_name, None))
            output_spec = OutputSpec(
                name=field_name,
                type=type_struct,
            )
            outputs.append(output_spec)
            extra_output_names.append(field_name)
            arguments.append(OutputPathPlaceholder(field_name))
    elif signature.return_annotation is not None and signature.return_annotation != inspect.Parameter.empty:
        type_struct = annotation_to_type_struct(signature.return_annotation)
        output_spec = OutputSpec(
            name=single_output_name_const,
            type=type_struct,
        )
        outputs.append(output_spec)
        extra_output_names.append(single_output_pythonic_name_const)
        arguments.append(OutputPathPlaceholder(single_output_name_const))

    func_name = func.__name__

    #TODO: Add support for copying the NamedTuple subclass declaration code
    #Adding NamedTuple import if needed
    func_type_declarations_code = ""
    if hasattr(return_ann, '_fields'): #NamedTuple
        func_type_declarations_code = func_type_declarations_code + '\n' + 'from typing import NamedTuple'

    #Source code can include decorator lines like @python_op. Remove them
    (func_code_lines, _) = inspect.getsourcelines(func)
    while func_code_lines[0].lstrip().startswith('@'): #decorator
        del func_code_lines[0]

    #Function might be defined in some indented scope (e.g. in another function).
    #We need to handle this and properly dedent the function source code
    first_line = func_code_lines[0]
    indent = len(first_line) - len(first_line.lstrip())
    func_code_lines = [line[indent:] for line in func_code_lines]
    func_code = ''.join(func_code_lines) #Lines retain their \n endings

    extra_output_external_names = [name + '_file' for name in extra_output_names]

    # One dict entry per function parameter; command-line argument N (1-based) is parameter N.
    # Only int, float and bool annotations get a converter; everything else is kept as str.
    # NOTE(review): bool(<non-empty string>) is always True, so the generated
    # program parses the argument 'False' as True. Not changed here because it
    # would alter the generated program; consider an explicit string-to-bool parser.
    input_args_parsing_code_lines = (
        " '{arg_name}': {arg_type}(sys.argv[{arg_idx}]),".format(
            arg_name=name_type[0],
            arg_type=name_type[1] if name_type[1] in ['int', 'float', 'bool'] else 'str',
            arg_idx=idx + 1
        )
        for idx, name_type in enumerate(parameter_to_type_name.items())
    )

    # Output file paths follow the input values on the command line.
    output_files_parsing_code_lines = (
        ' sys.argv[{}],'.format(idx + len(parameter_to_type_name) + 1)
        for idx in range(len(extra_output_external_names))
    )

    # The template is the text of the program executed in the container, so the
    # bodies of its "if" and "for" statements must be indented.
    full_source = \
'''\
{extra_code}
{func_type_declarations_code}
{func_code}
import sys
_args = {{
{input_args_parsing_code}
}}
_output_files = [
{output_files_parsing_code}
]
_outputs = {func_name}(**_args)
if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str):
    _outputs = [_outputs]
from pathlib import Path
for idx, filename in enumerate(_output_files):
    _output_path = Path(filename)
    _output_path.parent.mkdir(parents=True, exist_ok=True)
    _output_path.write_text(str(_outputs[idx]))
'''.format(
        func_name=func_name,
        func_code=func_code,
        func_type_declarations_code=func_type_declarations_code,
        extra_code=extra_code,
        input_args_parsing_code='\n'.join(input_args_parsing_code_lines),
        output_files_parsing_code='\n'.join(output_files_parsing_code_lines),
    )

    #Removing consecutive blank lines
    full_source = re.sub('\n\n\n+', '\n\n', full_source).strip('\n') + '\n'

    #Component name and description are derived from the function's name and docstring, but can be overridden by @python_component function decorator
    #The decorator can set the _component_human_name and _component_description attributes. getattr is needed to prevent error when these attributes do not exist.
    component_name = getattr(func, '_component_human_name', None) or _python_function_name_to_component_name(func.__name__)
    description = getattr(func, '_component_description', None) or func.__doc__
    if description:
        description = description.strip() + '\n' #Interesting: unlike ruamel.yaml, PyYaml cannot handle trailing spaces in the last line (' \n') and switches the style to double-quoted.

    component_spec = ComponentSpec(
        name=component_name,
        description=description,
        inputs=inputs,
        outputs=outputs,
        implementation=ContainerImplementation(
            container=ContainerSpec(
                image=base_image,
                command=['python3', '-c', full_source],
                args=arguments,
            )
        )
    )

    return component_spec
def _func_to_component_dict(func, extra_code='', base_image=_default_base_image):
    '''Converts a python function to a component definition in its structure form.

    Builds the component specification with _func_to_component_spec and
    returns the result of calling to_struct() on it.

    Args:
        func: The python function to convert.
        extra_code: Optional. Python source code placed before the function code.
        base_image: Optional. Docker image to use as the component base image.
    '''
    spec = _func_to_component_spec(func, extra_code=extra_code, base_image=base_image)
    return spec.to_struct()
def func_to_component_text(func, extra_code='', base_image=_default_base_image):
    '''
    Converts a Python function to a component definition and returns its textual representation

    Function docstring is used as component description.
    Argument and return annotations are used as component input/output types.
    To declare a function with multiple return values, use the NamedTuple return annotation syntax:

        from typing import NamedTuple
        def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('sum', float), ('product', float)]):
            """Returns sum and product of two arguments"""
            return (a + b, a * b)

    Args:
        func: The python function to convert
        base_image: Optional. Specify a custom Docker container image to use in the component. For lightweight components, the image needs to have python 3.5+. Default is tensorflow/tensorflow:1.11.0-py3
            Note: The image can also be specified by decorating the function with the @python_component decorator. If different base images are explicitly specified in both places, an error is raised.
        extra_code: Optional. Extra code to add before the function code. Can be used as workaround to define types used in function signature.

    Returns:
        Textual representation of a component definition
    '''
    # The component structure is serialized with the project's YAML dumper.
    component_dict = _func_to_component_dict(func, extra_code, base_image)
    return dump_yaml(component_dict)
def func_to_component_file(func, output_component_file, base_image=_default_base_image, extra_code='') -> None:
    '''
    Converts a Python function to a component definition and writes it to a file

    Function docstring is used as component description.
    Argument and return annotations are used as component input/output types.
    To declare a function with multiple return values, use the NamedTuple return annotation syntax:

        from typing import NamedTuple
        def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('sum', float), ('product', float)]):
            """Returns sum and product of two arguments"""
            return (a + b, a * b)

    Args:
        func: The python function to convert
        output_component_file: Write a component definition to a local file. Can be used for sharing.
        base_image: Optional. Specify a custom Docker container image to use in the component. For lightweight components, the image needs to have python 3.5+. Default is tensorflow/tensorflow:1.11.0-py3
            Note: The image can also be specified by decorating the function with the @python_component decorator. If different base images are explicitly specified in both places, an error is raised.
        extra_code: Optional. Extra code to add before the function code. Can be used as workaround to define types used in function signature.
    '''
    # Render the definition text first, then write it to the requested path.
    definition_text = func_to_component_text(func, extra_code=extra_code, base_image=base_image)
    target_path = Path(output_component_file)
    target_path.write_text(definition_text)
def func_to_container_op(func, output_component_file=None, base_image=_default_base_image, extra_code=''):
    '''
    Converts a Python function to a component and returns a task (ContainerOp) factory

    Function docstring is used as component description.
    Argument and return annotations are used as component input/output types.
    To declare a function with multiple return values, use the NamedTuple return annotation syntax:

        from typing import NamedTuple
        def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('sum', float), ('product', float)]):
            """Returns sum and product of two arguments"""
            return (a + b, a * b)

    Args:
        func: The python function to convert
        base_image: Optional. Specify a custom Docker container image to use in the component. For lightweight components, the image needs to have python 3.5+. Default is tensorflow/tensorflow:1.11.0-py3
            Note: The image can also be specified by decorating the function with the @python_component decorator. If different base images are explicitly specified in both places, an error is raised.
        output_component_file: Optional. Write a component definition to a local file. Can be used for sharing.
        extra_code: Optional. Extra code to add before the function code. Can be used as workaround to define types used in function signature.

    Returns:
        A factory function with a strongly-typed signature taken from the python function.
        Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp) that can run the original function in a container.
    '''
    component_spec = _func_to_component_spec(func, extra_code, base_image)

    # An explicitly passed file path wins; otherwise use the path stored on the
    # function in the _component_target_component_file attribute, if any.
    output_component_file = output_component_file or getattr(func, '_component_target_component_file', None)
    if output_component_file:
        component_dict = component_spec.to_struct()
        component_yaml = dump_yaml(component_dict)
        Path(output_component_file).write_text(component_yaml)
        #TODO: assert ComponentSpec.from_struct(load_yaml(output_component_file)) == component_spec

    return _create_task_factory_from_component_spec(component_spec)