Queries
A Query is a synchronous operation that is used to get the state of a Workflow Execution.
Defining query methods
Let's start with some basic imports that will be required for the whole demonstration:
import zio._
import zio.temporal._
import zio.temporal.worker._
import zio.temporal.state._
import zio.temporal.workflow._
import zio.temporal.activity._
import java.util.UUID
Consider the following activity from the Workflow state section:
@activityInterface
trait PaymentActivity {
  def debit(amount: BigDecimal, from: String): Unit
  def credit(amount: BigDecimal, to: String): Unit
}
We'll represent the payment state with the following enumeration:
sealed trait PaymentState
object PaymentState {
  case object Initial  extends PaymentState
  case object Debited  extends PaymentState
  case object Credited extends PaymentState
}
We can check the current state of our PaymentWorkflow using query methods:
@workflowInterface
trait PaymentWorkflow {
  @workflowMethod
  def proceed(amount: BigDecimal, from: String, to: String): Unit

  @queryMethod
  def getPaymentState(): PaymentState
}
A method that retrieves the Workflow Execution state must have the @queryMethod annotation.
Then we could implement a stateful workflow as follows:
class PaymentWorkflowImpl extends PaymentWorkflow {
  private val paymentActivity: ZActivityStub.Of[PaymentActivity] =
    ZWorkflow.newActivityStub[PaymentActivity](
      ZActivityOptions.withStartToCloseTimeout(10.seconds)
    )

  private val paymentState = ZWorkflowState.make[PaymentState](PaymentState.Initial)

  override def getPaymentState(): PaymentState = paymentState.snapshot

  override def proceed(amount: BigDecimal, from: String, to: String): Unit = {
    ZActivityStub.execute(
      paymentActivity.debit(amount, from)
    )
    paymentState := PaymentState.Debited
    // Let's add a pause here to emulate a long-running workflow
    ZWorkflow.sleep(5.seconds)
    ZActivityStub.execute(
      paymentActivity.credit(amount, to)
    )
    paymentState := PaymentState.Credited
  }
}
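To make it easier to see what a query would return at each point of proceed, here is a small dependency-free model of the same state transitions. It is only a sketch: StateCell is a hypothetical stand-in for ZWorkflowState that mimics its snapshot and := operations, QueryModel is a made-up name, and the activities and the pause are left out entirely.

```scala
// Dependency-free sketch; nothing here talks to Temporal.
sealed trait PaymentState
object PaymentState {
  case object Initial  extends PaymentState
  case object Debited  extends PaymentState
  case object Credited extends PaymentState
}

// Hypothetical stand-in for ZWorkflowState: a mutable cell with snapshot and :=
final class StateCell[A](private var value: A) {
  def snapshot: A = value
  def :=(next: A): Unit = { value = next }
}

object QueryModel {
  // Walks through the same state updates as proceed and records
  // what getPaymentState() would return after each step.
  def observedStates(): List[PaymentState] = {
    val paymentState = new StateCell[PaymentState](PaymentState.Initial)

    val beforeDebit = paymentState.snapshot // queried before the debit finishes
    paymentState := PaymentState.Debited
    val duringPause = paymentState.snapshot // queried while the workflow sleeps
    paymentState := PaymentState.Credited
    val afterCredit = paymentState.snapshot // queried after the credit finishes

    List(beforeDebit, duringPause, afterCredit)
  }
}
```

A query issued during the 5-second pause in the real workflow corresponds to the middle element, Debited.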
Querying the state
Querying the workflow state is as simple as executing workflows! First, you will need to create a Workflow stub and start the workflow:
val transactionId = UUID.randomUUID().toString

val startWorkflow = ZIO.serviceWithZIO[ZWorkflowClient] { workflowClient =>
  for {
    paymentWorkflow <- workflowClient.newWorkflowStub[PaymentWorkflow](
                         ZWorkflowOptions
                           .withWorkflowId(transactionId)
                           .withTaskQueue("payment-queue")
                           .withWorkflowRunTimeout(10.seconds)
                       )
    _ <- ZWorkflowStub.execute(
           paymentWorkflow.proceed(amount = 42, from = "me", to = "you")
         )
  } yield ()
}
Although a workflow is started in one process, its state can be queried from another.
That's why we need a simple way to retrieve the workflow state by its Workflow ID.
In Temporal, an untyped workflow stub provides this functionality.
In ZIO Temporal, there is a type-safe wrapper for it called the Workflow stub proxy.
To create a stub proxy, you'll need the same Workflow client:
val paymentWorkflowZIO = ZIO.serviceWithZIO[ZWorkflowClient] { workflowClient =>
  workflowClient.newWorkflowStub[PaymentWorkflow](workflowId = transactionId)
}
Using the stub proxy, you can finally query the workflow state:
val currentWorkflowStateZIO = for {
  _               <- startWorkflow
  paymentWorkflow <- paymentWorkflowZIO
  state           <- ZWorkflowStub.query(paymentWorkflow.getPaymentState())
} yield state
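A single query only gives you the state at one moment. If you want to wait until the workflow reaches a particular state, one option is to repeat the query with a pause in between. The helper below is a minimal sketch built on plain ZIO only; QueryPolling and pollUntil are hypothetical names and not part of ZIO Temporal.

```scala
import zio._

object QueryPolling {
  // Runs `query`; if the result does not satisfy `done`, sleeps for
  // `interval` and tries again. Returns the first result that satisfies `done`.
  def pollUntil[R, E, A](query: ZIO[R, E, A], interval: Duration)(
    done: A => Boolean
  ): ZIO[R, E, A] =
    query.flatMap { result =>
      if (done(result)) ZIO.succeed(result)
      else ZIO.sleep(interval) *> pollUntil(query, interval)(done)
    }
}
```

Assuming paymentWorkflow is a ZWorkflowStub.Of[PaymentWorkflow], usage might look like QueryPolling.pollUntil(ZWorkflowStub.query(paymentWorkflow.getPaymentState()), 1.second)(_ == PaymentState.Credited). In practice you would probably also want to bound the total waiting time.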
Important notes:
- Reminder: you must always wrap the query method invocation in the ZWorkflowStub.query method.
  - The paymentWorkflow.getPaymentState() invocation is rewritten into an untyped Temporal query invocation
  - A direct method invocation will throw an exception
- Reminder: querying workflow state = calling a remote server
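Since a query is a remote call, it may be worth putting an upper bound on how long you are willing to wait for the answer. The sketch below uses only the standard ZIO timeout operator; RemoteQuery and withDeadline are hypothetical names introduced for illustration.

```scala
import zio._

object RemoteQuery {
  // Returns Some(result) if `query` completes within `limit`,
  // or None if the time limit is reached first.
  def withDeadline[R, E, A](query: ZIO[R, E, A], limit: Duration): ZIO[R, E, Option[A]] =
    query.timeout(limit)
}
```

The caller can then decide what a None means for them, for example showing the last known state.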
NOTE: Do not annotate workflow stubs with the workflow interface type. The type must be ZWorkflowStub.Of[PaymentWorkflow].
Otherwise, you'll get a compile-time error:
def doSomething(paymentWorkflow: PaymentWorkflow): TemporalIO[PaymentState] =
  ZWorkflowStub.query(paymentWorkflow.getPaymentState())
// error: zio.temporal.workflow.ZWorkflowStub.query must be used only with typed zio.temporal.workflow.ZWorkflowStub.Of[A],
// but repl.MdocSession.MdocApp.PaymentWorkflow found. Perhaps you added an explicit type annotation?
// The actual type must be zio.temporal.workflow.ZWorkflowStub.Of[repl.MdocSession.MdocApp.PaymentWorkflow]
// ZWorkflowStub.query(paymentWorkflow.getPaymentState())
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
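For contrast, here is a sketch of the same helper with the parameter typed as the stub, which is the form the error message asks for. The definitions are repeated so the snippet stands alone; the wrapping object StubTyping is a hypothetical name, and the snippet assumes the zio-temporal dependency is on the classpath.

```scala
import zio.temporal._
import zio.temporal.workflow._

sealed trait PaymentState
object PaymentState {
  case object Initial  extends PaymentState
  case object Debited  extends PaymentState
  case object Credited extends PaymentState
}

@workflowInterface
trait PaymentWorkflow {
  @workflowMethod
  def proceed(amount: BigDecimal, from: String, to: String): Unit

  @queryMethod
  def getPaymentState(): PaymentState
}

object StubTyping {
  // The parameter is typed as ZWorkflowStub.Of[PaymentWorkflow],
  // not as the plain PaymentWorkflow interface.
  def doSomething(paymentWorkflow: ZWorkflowStub.Of[PaymentWorkflow]): TemporalIO[PaymentState] =
    ZWorkflowStub.query(paymentWorkflow.getPaymentState())
}
```

If you let the compiler infer the type of a stub created by the Workflow client, you get the ZWorkflowStub.Of[...] type without writing it out.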