Workers
Building workers
A worker application often runs more than one workflow and more than one activity.
Let's take a look at the following example:
import zio._
import zio.temporal._

// Activity 1
@activityInterface
trait SampleActivity {}
class SampleActivityImpl extends SampleActivity { /*...*/ }

// Workflow 1
@workflowInterface
trait WorkflowA {
  @workflowMethod
  def methodA(): Unit
}
class WorkflowAImpl extends WorkflowA {
  override def methodA() = ??? /*...*/
}

// Activity 2
@activityInterface
trait AnotherActivity {}
class AnotherActivityImpl extends AnotherActivity { /*...*/ }

// Workflow 2
@workflowInterface
trait WorkflowB {
  @workflowMethod
  def methodB(): Unit
}
class WorkflowBImpl extends WorkflowB {
  override def methodB() = ??? /*...*/
}

// Workflow 3
@workflowInterface
trait WorkflowC {
  @workflowMethod
  def methodC(): Unit
}
class WorkflowCImpl extends WorkflowC {
  override def methodC() = ??? /*...*/
}
Typical code constructing a Worker running all of those Temporal units looks like this:
import zio.temporal.worker._
import zio.temporal.workflow._
ZWorkerFactory.newWorker("<task-queue>") @@
  ZWorker.addWorkflow[WorkflowAImpl].fromClass @@
  ZWorker.addWorkflow[WorkflowBImpl].fromClass @@
  ZWorker.addWorkflow[WorkflowCImpl].fromClass @@
  ZWorker.addActivityImplementation(new SampleActivityImpl) @@
  ZWorker.addActivityImplementation(new AnotherActivityImpl)
This may cover most needs. However, as the number of workflows and activities in your application grows, you may want an easier way to register many of them at once. This is where the ZWorkflowImplementationClass and ZActivityImplementationObject helpers come to the rescue:
val workflowClasses: List[ZWorkflowImplementationClass[_]] = List(
  ZWorkflowImplementationClass[WorkflowAImpl],
  ZWorkflowImplementationClass[WorkflowBImpl],
  ZWorkflowImplementationClass[WorkflowCImpl],
  /* ... other workflows */
)
// workflowClasses: List[ZWorkflowImplementationClass[_]] = List(
//   ZWorkflowImplementationClass(
//     runtimeClass = class repl.MdocSession$MdocApp$WorkflowAImpl
//   ),
//   ZWorkflowImplementationClass(
//     runtimeClass = class repl.MdocSession$MdocApp$WorkflowBImpl
//   ),
//   ZWorkflowImplementationClass(
//     runtimeClass = class repl.MdocSession$MdocApp$WorkflowCImpl
//   )
// )
import zio.temporal.activity._
val activityImplementations: List[ZActivityImplementationObject[_]] = List(
  ZActivityImplementationObject(new SampleActivityImpl),
  ZActivityImplementationObject(new AnotherActivityImpl),
  /* ... other activities */
)
// activityImplementations: List[ZActivityImplementationObject[_]] = List(
//   ZActivityImplementation(value=repl.MdocSession$MdocApp$SampleActivityImpl@5de17e84),
//   ZActivityImplementation(value=repl.MdocSession$MdocApp$AnotherActivityImpl@68493d1c)
// )
Then, the worker can be constructed as follows:
ZWorkerFactory.newWorker("<task-queue>") @@
  ZWorker.addWorkflowImplementations(workflowClasses) @@
  ZWorker.addActivityImplementations(activityImplementations)
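Because both helpers produce ordinary List values, one possible way to keep registration manageable as the application grows is to let each feature module expose its own lists and concatenate them in a single place. The sketch below only illustrates that idea: the module objects and the order/billing workflows and activities are hypothetical names that do not appear in the example above.

```scala
import zio.temporal._
import zio.temporal.activity._
import zio.temporal.workflow._

object ModuleRegistryExample {
  // Hypothetical "orders" feature (names invented for this sketch)
  @activityInterface
  trait OrderActivity {}
  class OrderActivityImpl extends OrderActivity {}

  @workflowInterface
  trait OrderWorkflow {
    @workflowMethod
    def processOrder(): Unit
  }
  class OrderWorkflowImpl extends OrderWorkflow {
    override def processOrder(): Unit = ()
  }

  // Hypothetical "billing" feature (names invented for this sketch)
  @activityInterface
  trait BillingActivity {}
  class BillingActivityImpl extends BillingActivity {}

  @workflowInterface
  trait BillingWorkflow {
    @workflowMethod
    def chargeCustomer(): Unit
  }
  class BillingWorkflowImpl extends BillingWorkflow {
    override def chargeCustomer(): Unit = ()
  }

  // Each module lists only the Temporal units it owns
  object OrdersModule {
    val workflows: List[ZWorkflowImplementationClass[_]] =
      List(ZWorkflowImplementationClass[OrderWorkflowImpl])
    val activities: List[ZActivityImplementationObject[_]] =
      List(ZActivityImplementationObject(new OrderActivityImpl))
  }

  object BillingModule {
    val workflows: List[ZWorkflowImplementationClass[_]] =
      List(ZWorkflowImplementationClass[BillingWorkflowImpl])
    val activities: List[ZActivityImplementationObject[_]] =
      List(ZActivityImplementationObject(new BillingActivityImpl))
  }

  // Plain list concatenation yields the values to pass to the worker
  val allWorkflows: List[ZWorkflowImplementationClass[_]] =
    OrdersModule.workflows ++ BillingModule.workflows
  val allActivities: List[ZActivityImplementationObject[_]] =
    OrdersModule.activities ++ BillingModule.activities
}
```

The combined lists can then be passed to ZWorker.addWorkflowImplementations and ZWorker.addActivityImplementations in the same way as workflowClasses and activityImplementations.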
Important notes:
ZWorkflowImplementationClass checks at compile time that the wrapped type is a valid workflow implementation. Classes that are not annotated properly or that lack a public nullary constructor won't compile.
ZActivityImplementationObject checks at compile time that the wrapped type is a valid activity implementation. Classes that are not annotated properly won't compile.
Here are examples of the resulting compilation errors:
Case 1: missing @workflowInterface
class Foo {}
ZWorkflowImplementationClass[Foo]
// error: repl.MdocSession.MdocApp.Foo is not a workflow.
// Workflow interface must have @workflowInterface annotation.
// Hint: if the workflow type is used as a type parameter,
// add implicit IsWorkflow[repl.MdocSession.MdocApp.Foo] do the class or method definition
// ZWorkflowImplementationClass[Foo]
// ^
Case 2: missing public constructor
@workflowInterface
class Bar private() {}
ZWorkflowImplementationClass[Bar]
// error: repl.MdocSession.MdocApp.Bar should have a public zero-argument constructor
// ZWorkflowImplementationClass[Bar]
// ^
Case 3: not an activity interface
class Borscht {}
ZActivityImplementationObject(new Borscht)
// error: repl.MdocSession.MdocApp.Borscht is not an activity.
// Activity interface must have @activityInterface annotation.
// Hint: if the workflow type is used as a type parameter,
// add implicit IsActivity[repl.MdocSession.MdocApp.Borscht] do the class or method definition
ZLayer Interop
Activities usually have dependencies, so a ZActivityImplementationObject can also be constructed from a ZLayer:
val sampleActivityLayer: ULayer[SampleActivityImpl] =
  ZLayer.succeed(new SampleActivityImpl)
// sampleActivityLayer: ULayer[SampleActivityImpl] = Suspend(
//   self = zio.ZLayer$$$Lambda$8995/0x00007fea09a3d0c0@6063688f
// )
val anotherActivityLayer: ULayer[AnotherActivityImpl] =
  ZLayer.succeed(new AnotherActivityImpl)
// anotherActivityLayer: ULayer[AnotherActivityImpl] = Suspend(
//   self = zio.ZLayer$$$Lambda$8995/0x00007fea09a3d0c0@1cad4b58
// )
val activitiesLayer: ULayer[List[ZActivityImplementationObject[_]]] =
  ZLayer.collectAll(
    List(
      ZActivityImplementationObject.layer(sampleActivityLayer),
      ZActivityImplementationObject.layer(anotherActivityLayer)
    )
  )
// activitiesLayer: ULayer[List[ZActivityImplementationObject[_]]] = Suspend(
//   self = zio.ZLayer$$$Lambda$8997/0x00007fea09a3d748@46049072
// )
The same worker can then be constructed as follows:
ZWorkerFactory.newWorker("<task-queue>") @@
  ZWorker.addWorkflowImplementations(workflowClasses) @@
  ZWorker.addActivityImplementationsLayer(activitiesLayer)
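The layers above are built with ZLayer.succeed and therefore have no dependencies of their own. As a minimal sketch of the dependency case, the example below wires a hypothetical Greeter service into a hypothetical GreetingActivityImpl; neither name is part of the original example. It assumes that ZActivityImplementationObject.layer preserves the environment and error types of the layer it wraps, so the dependency can be supplied with the standard >>> operator.

```scala
import zio._
import zio.temporal._
import zio.temporal.activity._

object ActivityWithDependencyExample {
  // Hypothetical dependency, introduced only for this illustration
  trait Greeter {
    def greet(name: String): String
  }

  // Hypothetical activity interface and implementation
  @activityInterface
  trait GreetingActivity {
    def composeGreeting(name: String): String
  }

  // The implementation receives its dependency through the constructor
  class GreetingActivityImpl(greeter: Greeter) extends GreetingActivity {
    override def composeGreeting(name: String): String = greeter.greet(name)
  }

  // Provides the dependency
  val greeterLayer: ULayer[Greeter] =
    ZLayer.succeed(new Greeter {
      override def greet(name: String): String = s"Hello, $name"
    })

  // Builds the activity from the Greeter found in the environment
  val greetingActivityLayer: URLayer[Greeter, GreetingActivityImpl] =
    ZLayer.fromZIO(
      ZIO.serviceWith[Greeter](greeter => new GreetingActivityImpl(greeter))
    )

  // Assumption: `layer` keeps the Greeter requirement, which is then
  // satisfied by feeding greeterLayer into the wrapped layer
  val greetingActivityObjectLayer: ULayer[ZActivityImplementationObject[GreetingActivityImpl]] =
    greeterLayer >>> ZActivityImplementationObject.layer(greetingActivityLayer)
}
```

A layer like greetingActivityObjectLayer could then be added to the list given to ZLayer.collectAll alongside the other activity layers.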