Activities
Temporal provides ZTestActivityEnvironment,
which allows running activities in a local test environment.
General business logic, as well as Temporal functionality (such as Activity heartbeats),
can be tested locally with the testkit.
Let's start with some basic imports that will be required for the whole demonstration:
import zio._
import zio.test._
import zio.temporal._
import zio.temporal.activity._
import zio.temporal.testkit._
Simple tests
Imagine the following Activity interface:
/** Activity contract used by the simple-test example. */
@activityInterface
trait EchoActivities {

  /** Produces a reply for the given message.
    *
    * @param what the message to echo
    * @return the reply built by the implementation
    */
  def echo(what: String): String
}
/** [[EchoActivities]] implementation whose logic is a ZIO effect executed through `ZActivity.run`.
  *
  * @param options run options that `ZActivity.run` needs, supplied implicitly
  */
class EchoActivitiesImpl(implicit options: ZActivityRunOptions[Any]) extends EchoActivities {

  /** Logs the incoming message and replies with the message prefixed by "Echoed ".
    *
    * @param what the message to echo
    * @return `"Echoed "` followed by `what`
    */
  override def echo(what: String): String = {
    // Describe the effect first, then hand it over to the activity runner
    val logThenReply: UIO[String] =
      ZIO.log(s"Echo message=$what").as(s"Echoed $what")

    ZActivity.run(logThenReply)
  }
}
Here is an example of testing this activity:
/** zio-test suite that exercises [[EchoActivities]] through `ZTestActivityEnvironment`. */
object EchoActivitySpec extends ZIOSpecDefault {
  override val spec = suite("EchoActivities")(
    test("echoes message") {
      ZTestActivityEnvironment.activityRunOptionsWithZIO[Any] { implicit options =>
        // Options used when creating the activity stub
        val stubOptions = ZActivityOptions.withStartToCloseTimeout(5.seconds)

        for {
          // Register the activity implementation in the test environment
          _ <- ZTestActivityEnvironment.addActivityImplementation(new EchoActivitiesImpl)
          // Get the activity stub
          echoStub <- ZTestActivityEnvironment.newActivityStub[EchoActivities](stubOptions)
          // Invoke the activity directly on the stub
          reply = echoStub.echo("hello")
          // Check the outcome
        } yield assertTrue(reply == "Echoed hello")
      }
    }
  ).provide(
    ZTestEnvironmentOptions.default,
    ZTestActivityEnvironment.make[Any]
  ) @@ TestAspect.withLiveClock
}
Notes
- The ZTestActivityEnvironment companion object has plenty of methods to configure the test environment
- ZTestActivityEnvironment.activityRunOptions[R] provides the ZActivityRunOptions needed to run ZIO inside activities
- ZTestActivityEnvironment.activityRunOptionsWithZIO[R] allows building a ZIO that accesses ZActivityRunOptions (like ZIO.serviceWithZIO for the ZIO environment)
- ZTestActivityEnvironment.addActivityImplementation populates the test environment with Activity implementations
- ZTestActivityEnvironment.newActivityStub returns a stub for testing
- Unlike real Activity stubs, those returned by ZTestActivityEnvironment run locally. Therefore, their methods can be called directly (without wrapping into ZActivityStub.execute)
- It's recommended to use TestAspect.withLiveClock in your tests to avoid hanging tests
- Tests may hang because both zio-test & temporal-testkit use their own virtual clock
Testing heartbeats
Activities using heartbeats can be tested as well. ZTestActivityEnvironment
allows setting heartbeat details before running the activity.
With this feature, it's possible to check whether the Activity successfully recovers from the previous "checkpoint".
Imagine the following activity:
import io.temporal.client.ActivityCompletionException
import java.util.concurrent.atomic.AtomicInteger
/** Activity contract used by the heartbeat example. */
@activityInterface
trait FibonacciHeartbeatActivity {

  /** Computes a sum of fibonacci numbers.
    *
    * For the purposes of the example, the calculation is assumed to take some time.
    *
    * @param n bound passed to the implementation's iteration loop
    * @return the accumulated sum
    */
  def fibonacciSum(n: Int): BigDecimal
}
object FibonacciHeartbeatActivityImpl {

  /** Progress snapshot sent with every heartbeat and read back when the activity starts.
    *
    * @param sum  sum accumulated so far
    * @param curr current fibonacci number
    * @param prev previous fibonacci number
    * @param step the iteration
    */
  case class HeartbeatDetails(
    sum: BigDecimal,
    curr: Int,
    prev: Int,
    step: Int
  )
}
/** [[FibonacciHeartbeatActivity]] implementation that heartbeats its progress on every iteration
  * and starts from previously recorded heartbeat details when they are available.
  *
  * @param iterationsCounter incremented once per performed iteration; needed for demo purposes only
  * @param options           run options required by `ZActivity`, supplied implicitly
  */
class FibonacciHeartbeatActivityImpl(iterationsCounter: AtomicInteger)(
  implicit options: ZActivityRunOptions[Any]
) extends FibonacciHeartbeatActivity {
  import FibonacciHeartbeatActivityImpl.HeartbeatDetails

  /** Iterates until the step counter reaches `n` and returns the accumulated sum.
    *
    * @param n the step value at which iteration stops
    * @return the sum accumulated over the performed iterations (plus any recovered progress)
    */
  override def fibonacciSum(n: Int): BigDecimal =
    ZActivity.run {
      val activityCtx = ZActivity.executionContext

      // One loop step: the whole progress is carried as a single HeartbeatDetails value
      def loop(state: HeartbeatDetails): IO[ActivityCompletionException, BigDecimal] =
        if (state.step >= n) ZIO.succeed(state.sum)
        else
          for {
            // Report the current progress before doing the work
            _ <- activityCtx.heartbeat(state)
            _ <- ZIO.logInfo("Sleep...")
            // Artificial delay for demo purposes
            _ <- ZIO.sleep((50 * state.step).millis)
            _ = iterationsCounter.incrementAndGet()
            advanced = HeartbeatDetails(
                         sum = state.sum + state.curr,
                         curr = state.curr + state.prev,
                         prev = state.curr,
                         step = state.step + 1
                       )
            total <- loop(advanced)
          } yield total

      // Start from the recorded heartbeat details, or from scratch when there are none
      activityCtx.getHeartbeatDetails[HeartbeatDetails].flatMap { recovered =>
        loop(
          recovered.getOrElse(
            HeartbeatDetails(sum = 0, curr = 1, prev = 0, step = 0)
          )
        )
      }
    }
}
You can now simply test that the activity uses heartbeats:
/** zio-test suite checking that [[FibonacciHeartbeatActivity]] continues from preset heartbeat details. */
object FibonacciHeartbeatActivityActivitySpec extends ZIOSpecDefault {
  override val spec = suite("FibonacciHeartbeatActivity")(
    test("performs heartbeats message") {
      ZTestActivityEnvironment.activityRunOptionsWithZIO[Any] { implicit options =>
        // Fresh counter shared with the activity implementation
        val iterations = new AtomicInteger()
        // Heartbeat details as if 3 iterations were already performed
        val checkpoint = FibonacciHeartbeatActivityImpl.HeartbeatDetails(
          sum = 4,
          curr = 3,
          prev = 2,
          step = 3
        )
        val stubOptions = ZActivityOptions.withStartToCloseTimeout(1.minute)

        for {
          _       <- ZTestActivityEnvironment.addActivityImplementation(new FibonacciHeartbeatActivityImpl(iterations))
          fibStub <- ZTestActivityEnvironment.newActivityStub[FibonacciHeartbeatActivity](stubOptions)
          // Must happen before the activity is invoked
          _ <- ZTestActivityEnvironment.setHeartbeatDetails(checkpoint)
          // Calculating takes 5 iterations in total
          total = fibStub.fibonacciSum(5)
        } yield {
          assertTrue(total == 12) &&
          // Only the 2 remaining iterations are performed
          assertTrue(iterations.get() == 2)
        }
      }
    }
  ).provide(
    ZTestEnvironmentOptions.default,
    ZTestActivityEnvironment.make[Any]
  ) @@ TestAspect.withLiveClock
}
Notes
- ZTestActivityEnvironment.setHeartbeatDetails sets the heartbeat details before the activity is invoked
- The Activity implementation will pick up the specified heartbeat details once run