Skip to main content

Workers

Building workers​

Often, the worker application runs more than just one workflow and one activity.
Let's take a look at the following example:

import zio._
import zio.temporal._

// Activity 1
@activityInterface
trait SampleActivity {}
class SampleActivityImpl extends SampleActivity {/*...*/}

// Workflow 1
@workflowInterface
trait WorkflowA {
@workflowMethod
def methodA(): Unit
}
class WorkflowAImpl extends WorkflowA {
override def methodA() = ??? /*...*/
}

// Activity 2
@activityInterface
trait AnotherActivity {}
class AnotherActivityImpl extends AnotherActivity {/*...*/}

// Workflow 2
@workflowInterface
trait WorkflowB {
@workflowMethod
def methodB(): Unit
}
class WorkflowBImpl extends WorkflowB {
override def methodB() = ??? /*...*/
}

// Workflow 3
@workflowInterface
trait WorkflowC {
@workflowMethod
def methodC(): Unit
}
class WorkflowCImpl extends WorkflowC {
override def methodC() = ??? /*...*/
}

Typical code constructing a Worker running all of those Temporal units looks like this:

import zio.temporal.worker._
import zio.temporal.workflow._

ZWorkerFactory.newWorker("<task-queue>") @@
ZWorker.addWorkflow[WorkflowAImpl].fromClass @@
ZWorker.addWorkflow[WorkflowBImpl].fromClass @@
ZWorker.addWorkflow[WorkflowCImpl].fromClass @@
ZWorker.addActivityImplementation(new SampleActivityImpl) @@
ZWorker.addActivityImplementation(new AnotherActivityImpl)

It might satisfy most of your needs. However, in case the number of workflows and activities grows in your application,
you might need a way to provide a bigger number of workflows easily. This is when ZWorkflowImplementationClass and ZActivityImplementationObject helpers come to the rescue:

val workflowClasses: List[ZWorkflowImplementationClass[_]] = List(
ZWorkflowImplementationClass[WorkflowAImpl],
ZWorkflowImplementationClass[WorkflowBImpl],
ZWorkflowImplementationClass[WorkflowCImpl],
/* ... other workflows */
)
// workflowClasses: List[ZWorkflowImplementationClass[_]] = List(
// ZWorkflowImplementationClass(
// runtimeClass = class repl.MdocSession$MdocApp$WorkflowAImpl
// ),
// ZWorkflowImplementationClass(
// runtimeClass = class repl.MdocSession$MdocApp$WorkflowBImpl
// ),
// ZWorkflowImplementationClass(
// runtimeClass = class repl.MdocSession$MdocApp$WorkflowCImpl
// )
// )

import zio.temporal.activity._

val activityImplementations: List[ZActivityImplementationObject[_]] = List(
ZActivityImplementationObject(new SampleActivityImpl),
ZActivityImplementationObject(new AnotherActivityImpl),
/* ... other activities */
)
// activityImplementations: List[ZActivityImplementationObject[_]] = List(
// ZActivityImplementation(value=repl.MdocSession$MdocApp$SampleActivityImpl@7d9a9267),
// ZActivityImplementation(value=repl.MdocSession$MdocApp$AnotherActivityImpl@36e23ac8)
// )

Then, the worker can be constructed as follows:

ZWorkerFactory.newWorker("<task-queue>") @@
ZWorker.addWorkflowImplementations(workflowClasses) @@
ZWorker.addActivityImplementations(activityImplementations)

Important notes:

  • ZWorkflowImplementationClass helper checks if the wrapped type is a correct workflow implementation at compile time. Classes not annotated properly or with a missing public nullary constructors won't compile.
  • ZActivityImplementationObject helper checks if the wrapped type is a correct activity implementation at compile time. Classes not annotated properly won't compile.

Here is an example of compilation errors:

Case 1: missing @workflowInterface

class Foo {}
ZWorkflowImplementationClass[Foo]
// error: repl.MdocSession.MdocApp.Foo is not a workflow.
// Workflow interface must have @workflowInterface annotation.
// Hint: if the workflow type is used as a type parameter,
// add implicit IsWorkflow[repl.MdocSession.MdocApp.Foo] do the class or method definition
// ZWorkflowImplementationClass[Foo]
// ^

Case 2: missing public constructor

@workflowInterface
class Bar private() {}

ZWorkflowImplementationClass[Bar]
// error: repl.MdocSession.MdocApp.Bar should have a public zero-argument constructor
// ZWorkflowImplementationClass[Bar]
// ^

Case 3: not an activity interface

class Borscht {}

ZActivityImplementationObject(new Borscht)
// error: repl.MdocSession.MdocApp.Borscht is not an activity.
// Activity interface must have @activityInterface annotation.
// Hint: if the workflow type is used as a type parameter,
// add implicit IsActivity[repl.MdocSession.MdocApp.Borscht] do the class or method definition
// ZActivityImplementationObject(new Borscht)
// ^

ZLayer Interop​

It is usual for activities to have dependencies. The ZActivityImplementationObject can also be constructed from a ZLayer:

val sampleActivityLayer: ULayer[SampleActivityImpl] =
ZLayer.succeed(new SampleActivityImpl)
// sampleActivityLayer: ULayer[SampleActivityImpl] = Suspend(
// self = zio.ZLayer$$$Lambda$8581/0x00007f228d9b2c40@5965bfb6
// )

val anotherActivityLayer: ULayer[AnotherActivityImpl] =
ZLayer.succeed(new AnotherActivityImpl)
// anotherActivityLayer: ULayer[AnotherActivityImpl] = Suspend(
// self = zio.ZLayer$$$Lambda$8581/0x00007f228d9b2c40@178b7ef5
// )

val activitiesLayer: ULayer[List[ZActivityImplementationObject[_]]] =
ZLayer.collectAll(
List(
ZActivityImplementationObject.layer(sampleActivityLayer),
ZActivityImplementationObject.layer(anotherActivityLayer)
)
)
// activitiesLayer: ULayer[List[ZActivityImplementationObject[_]]] = Suspend(
// self = zio.ZLayer$$$Lambda$8674/0x00007f228d9ea058@11dbafea
// )

The same worker can then be constructed as follows:

ZWorkerFactory.newWorker("<task-queue>") @@
ZWorker.addWorkflowImplementations(workflowClasses) @@
ZWorker.addActivityImplementationsLayer(activitiesLayer)