Protobuf
ZIO-Temporal provides with the zio-temporal-protobuf module allowing to use Protobuf transport. To use Protobuf, you must properly install Scalapb
Defining protobuf filesβ
When using Protobuf, it's first required to write protobuf message definition:
syntax = "proto2";
package com.example.workflow;
message WorkflowParams {
required string message = 1;
// Unix time
required int64 processingTime = 2;
}
// ZIO Temporal ships some non-standard data types, such as UUID
import "zio-temporal.proto";
message WorkflowResult {
required zio.temporal.protobuf.UUID processId = 1;
}
Scalapb compiles such a message into a case class similar to this one:
case class WorkflowParams(message: String, processingTime: Long)
case class WorkflowResult(processId: zio.temporal.protobuf.UUID)
It can be then used in workflow interface definitions:
import zio._
import zio.temporal._
import zio.temporal.workflow._
@workflowInterface
trait SampleWorkflow {
@workflowMethod
def process(params: WorkflowParams): WorkflowResult
}
When implementing the workflow, you might need to convert from protobuf representation to your own.
For instance, it's much easier operating with java.time types rather than Unix time represented as Long.
Here zio.temporal.protobuf.syntax comes to the rescue!
import zio.temporal.protobuf.syntax._
import java.time.LocalDateTime
class SampleWorkflowImpl extends SampleWorkflow {
private val logger = ZWorkflow.makeLogger
override def process(params: WorkflowParams): WorkflowResult = {
// Converts from Protobuf representation
val processingTime = params.processingTime.fromProto[LocalDateTime]
// Always use this method to get the current time
// Otherwise, the workflow won't be deterministic
val now = ZWorkflow.currentTimeMillis.toLocalDateTime()
val lag = java.time.Duration.between(processingTime, now)
logger.info(s"The lag is $lag")
// Do stuff
// ...
val processId = ZWorkflow.randomUUID
// Converts into Protobuf representation
WorkflowResult(processId.toProto)
}
}
Notes
import zio.temporal.protobuf.syntax._is requiredfromProtoextension method converts the data type from the Protobuf representationtoProtoextension method converts the data type into the Protobuf representation
In order to use Protobuf serializer, it's required to override the default DataConverter:
import zio.temporal.workflow.ZWorkflowClientOptions
import zio.temporal.protobuf.ProtobufDataConverter
val optionsLayer =
ZWorkflowClientOptions.make @@
ZWorkflowClientOptions.withDataConverter(ProtobufDataConverter.make())
// optionsLayer: ZLayer[Any, Config.Error, ZWorkflowClientOptions] = Suspend(
// self = zio.ZLayer$$Lambda$8955/0x00007fea09a20580@40c24e51
// )