Generic workflows
In order to define Workflow interfaces with type parameters, it's required to follow some guidelines.
Let's start with some basic imports that will be required for the whole demonstration:
import zio._
import zio.temporal._
import zio.temporal.worker._
import zio.temporal.workflow._
import scala.reflect.ClassTag
Ensure correct serialization setupβ
ZIO-Temporal is build on top of Java SDK and runs on JVM, which brings some limitations, such as type erasure.
Imagine a workflow interface with unconstrained type parameters:
@workflowInterface
trait BadGenericWorkflow[Input] {
@workflowMethod
def process(input: Input): Unit
}
def executeBadWorkflow[Input](
stub: ZWorkflowStub.Of[BadGenericWorkflow[Input]]
)(input: Input): TemporalIO[Unit] =
ZWorkflowStub.execute(
stub.process(input)
)
Invoking such a workflow will likely fail in runtime. Therefore, ZIO-Temporal provides a meaningful warning when executing such a workflow:
[warn] BadGenericWorkflow.scala:20:26: method process parameter `input` type `Input` will be erased to java.lang.Object in runtime!
[warn] It would likely cause an error because Temporal client requires more specific type information for deserialization
[warn] Hint: if `Input` is a type parameter of the workflow interface, provide an upper-bound for the type parameter
[warn] ZWorkflowStub.execute(
To avoid possible issues:
- Define a common parent type for possible workflow inputs
- Specify the upper-bound type in the workflow interface
This is how proper jackson-serialized types look like
import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}
// NOTE: jackson (de)serialization won't work without additional annotations
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(
Array(
new JsonSubTypes.Type(value = classOf[Drink.Soda], name = "Soda"),
new JsonSubTypes.Type(value = classOf[Drink.Juice], name = "Juice")
)
)
sealed trait Drink
object Drink {
case class Soda(kind: String) extends Drink
case class Juice(kind: String, volume: Int) extends Drink
}
Define workflow interfaces properlyβ
This is how the proper workflow looks like:
// Intentionally missing the @workflowInterface annotation
trait DrinksWorkflows[DInput <: Drink] {
@workflowMethod
def process(input: DInput): Unit
}
Let's define 2 concrete workflow interfaces:
@workflowInterface
trait SodaWorkflow extends DrinksWorkflows[Drink.Soda]
@workflowInterface
trait JuiceWorkflow extends DrinksWorkflows[Drink.Juice]
And simple implementations:
class SodaWorkflowImpl extends SodaWorkflow {
override def process(input: Drink.Soda): Unit =
println(s"Simple soda: $input")
}
class JuiceWorkflowImpl extends JuiceWorkflow {
override def process(input: Drink.Juice): Unit =
println(s"Simple juice: $input")
}
Now they can be used easily:
def executeWorkflow[DInput <: Drink](
stub: ZWorkflowStub.Of[DrinksWorkflows[DInput]]
)(input: DInput): TemporalIO[Unit] =
ZWorkflowStub.execute(
stub.process(input)
)
ZIO.serviceWithZIO[ZWorkflowClient] { client =>
for {
// Create stubs
sodaWorkflow <- client.newWorkflowStub[SodaWorkflow](
ZWorkflowOptions
.withWorkflowId("soda-workflow-id")
.withTaskQueue("generic")
)
juiceWorkflow <- client.newWorkflowStub[JuiceWorkflow](
ZWorkflowOptions
.withWorkflowId("juice-workflow-id")
.withTaskQueue("generic")
)
_ <- executeWorkflow(sodaWorkflow)(Drink.Soda("Coke"))
_ <- executeWorkflow(juiceWorkflow)(Drink.Juice("Orange", volume = 2))
} yield ()
}
Working with workflows as genericsβ
Here goes a tricky example. Let's start with a simple parent workflow which will then run workflows above:
// Intentionally missing the @workflowInterface annotation
trait DrinkFactory {
@workflowMethod
def processDrinks(count: Int): Unit
}
It's supposed to be 2 implementations of DrinkFactory
: for Soda
and Juice
. They will both need to invoke corresponding DrinkWorkflow
.
In case you those workflows need to share some common logic, it could be extracted into an intermediate abstract class:
abstract class ParallelDrinkFactory[
DInput <: Drink,
DWorkflow <: DrinksWorkflows[DInput]: IsWorkflow: ClassTag
] extends DrinkFactory {
// Abstract method to be implemented by inheritors
protected def produceDrinks(count: Int): List[DInput]
override def processDrinks(count: Int): Unit = {
ZAsync.foreachPar(produceDrinks(count)) { input =>
// Polymorphically starts child workflows
val child = ZWorkflow.newChildWorkflowStub[DWorkflow](
ZChildWorkflowOptions.withWorkflowId("<meaningful-workflow-id>")
)
ZChildWorkflowStub.executeAsync(
child.process(input)
)
}.run.getOrThrow
}
}
Having this abstract class, we can define concrete workflow interfaces:
@workflowInterface
trait SodaFactory extends DrinkFactory
@workflowInterface
trait JuiceFactory extends DrinkFactory
And their implementations:
class SodaFactoryImpl
extends ParallelDrinkFactory[Drink.Soda, SodaWorkflow]
with SodaFactory {
override protected def produceDrinks(count: Int): List[Drink.Soda] =
List.tabulate(count)(n => Drink.Soda(s"Coke #$n"))
}
class JuiceFactoryImpl
extends ParallelDrinkFactory[Drink.Juice, JuiceWorkflow]
with JuiceFactory {
override protected def produceDrinks(count: Int): List[Drink.Juice] =
List.tabulate(count)(n => Drink.Juice(s"Juice #$n", volume = n))
}
Notes:
- In
ParallelDrinkFactory
, it's required to specify the upper-bound type forDWorkflow
- It's also required to have implicit
IsWorkflow
&ClassTag
.- That's a witness that
DWorkflow
is actually a workflow interface. - Without those instance, ZIO-Temporal won't compile the code because it is potentially unsafe
- That's a witness that
Example error:
abstract class BadParentWorkflow[
DInput <: Drink,
// Ask for ClassTag, but not for IsWorkflow
DWorkflow <: DrinksWorkflows[DInput]: ClassTag
] {
// should not compile
private val child = ZWorkflow.newChildWorkflowStub[DWorkflow](
ZChildWorkflowOptions.withWorkflowId("<meaningful-workflow-id>")
)
}
// error: DWorkflow is not a workflow.
// Workflow interface must have @workflowInterface annotation.
// Hint: if the workflow type is used as a type parameter,
// add implicit IsWorkflow[DWorkflow] do the class or method definition
// private val child = ZWorkflow.newChildWorkflowStub[DWorkflow](
// ^