Source code for kfp.dsl._resource_op

# Copyright 2019 Google LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict

from ._container_op import BaseOp
from . import _pipeline_param

class Resource(object):
    A wrapper over Argo ResourceTemplate definition object
    which is used to represent the `resource` property in argo's workflow
    template (io.argoproj.workflow.v1alpha1.Template).
      swagger_types (dict): The key is attribute name
                            and the value is attribute type.
      attribute_map (dict): The key is attribute name
                            and the value is json key in definition.
    swagger_types = {
        "action": "str",
        "merge_strategy": "str",
        "success_condition": "str",
        "failure_condition": "str",
        "manifest": "str"
    attribute_map = {
        "action": "action",
        "merge_strategy": "mergeStrategy",
        "success_condition": "successCondition",
        "failure_condition": "failureCondition",
        "manifest": "manifest"

    def __init__(self,
                 action: str = None,
                 merge_strategy: str = None,
                 success_condition: str = None,
                 failure_condition: str = None,
                 manifest: str = None):
        """Create a new instance of Resource"""
        self.action = action
        self.merge_strategy = merge_strategy
        self.success_condition = success_condition
        self.failure_condition = failure_condition
        self.manifest = manifest

[docs]class ResourceOp(BaseOp): """Represents an op which will be translated into a resource template""" def __init__(self, k8s_resource=None, action: str = "create", merge_strategy: str = None, success_condition: str = None, failure_condition: str = None, attribute_outputs: Dict[str, str] = None, **kwargs): """Create a new instance of ResourceOp. Args: k8s_resource: A k8s resource which will be submitted to the cluster action: One of "create"/"delete"/"apply"/"patch" (default is "create") merge_strategy: The merge strategy for the "apply" action success_condition: The successCondition of the template failure_condition: The failureCondition of the template For more info see: attribute_outputs: Maps output labels to resource's json paths, similarly to file_outputs of ContainerOp kwargs: name, sidecars & is_exit_handler. See BaseOp definition Raises: ValueError: if not inside a pipeline if the name is an invalid string if no k8s_resource is provided if merge_strategy is set without "apply" action """ super().__init__(**kwargs) self.attrs_with_pipelineparams = list(self.attrs_with_pipelineparams) self.attrs_with_pipelineparams.extend([ "_resource", "k8s_resource", "attribute_outputs" ]) if k8s_resource is None: ValueError("You need to provide a k8s_resource.") if merge_strategy and action != "apply": ValueError("You can't set merge_strategy when action != 'apply'") init_resource = { "action": action, "merge_strategy": merge_strategy, "success_condition": success_condition, "failure_condition": failure_condition } # `resource` prop in `io.argoproj.workflow.v1alpha1.Template` self._resource = Resource(**init_resource) self.k8s_resource = k8s_resource # Set attribute_outputs extra_attribute_outputs = \ attribute_outputs if attribute_outputs else {} self.attribute_outputs = \ self.attribute_outputs if hasattr(self, "attribute_outputs") \ else {} self.attribute_outputs.update(extra_attribute_outputs) # Add name and manifest if not specified by the user if "name" not in self.attribute_outputs: self.attribute_outputs["name"] = "{}" if "manifest" not in self.attribute_outputs: self.attribute_outputs["manifest"] = "{}" # Set outputs self.outputs = { name: _pipeline_param.PipelineParam(name, for name in self.attribute_outputs.keys() } # If user set a single attribute_output, set self.output as that # parameter, else set it as the resource name self.output = self.outputs["name"] if len(extra_attribute_outputs) == 1: self.output = self.outputs[list(extra_attribute_outputs)[0]] @property def resource(self): """`Resource` object that represents the `resource` property in `io.argoproj.workflow.v1alpha1.Template`. """ return self._resource