Observability
The observability section of the Temporal Developer's guide covers the ways to view the current state of your Temporal Application: which Workflow Executions are tracked by the Temporal Platform, and the state of any given Workflow Execution, either currently or at earlier points in its execution.
This section covers features related to viewing the state of the application, including:
- Metrics
- Tracing
- Logging
- Visibility
How to emit metrics
Each Temporal SDK can emit an optional set of metrics from either the Client or the Worker process. For a complete list of the metrics that can be emitted, see the SDK metrics reference.
Metrics can be scraped and stored in time series databases, such as:
- Prometheus
- M3db
- statsd
For more information about dashboards, see the Temporal Java SDK guide.
To emit metrics, use the MicrometerClientStatsReporter class to integrate with a Micrometer MeterRegistry configured for your metrics backend. Micrometer is a popular Java framework that provides integration with Prometheus and other backends.
The following example shows how to use MicrometerClientStatsReporter to define the metrics scope and set it with the ZWorkflowServiceStubsOptions.
(1) Add necessary dependencies
libraryDependencies ++= Seq(
  // Temporal integration with opentracing
  "io.temporal" % "temporal-opentracing" % "<temporal-version>",
  // Micrometer-otlp integration
  "io.micrometer" % "micrometer-registry-otlp" % "<micrometer-version>",
  // Opentelemetry libs
  "io.opentelemetry" % "opentelemetry-api" % "<otel-version>",
  "io.opentelemetry" % "opentelemetry-exporter-otlp" % "<otel-version>",
  "io.opentelemetry" % "opentelemetry-extension-trace-propagators" % "<otel-version>",
  "io.opentelemetry" % "opentelemetry-opentracing-shim" % "<otel-version>"
)
(2) Configure the OpenTelemetry-based metrics registry and provide it to the ZWorkflowServiceStubsOptions:
import zio._
import zio.temporal._
import zio.temporal.workflow._
// required for metrics
import com.uber.m3.tally.RootScopeBuilder
import io.micrometer.registry.otlp.{OtlpConfig, OtlpMeterRegistry}
import io.temporal.common.reporter.MicrometerClientStatsReporter
// OtlpConfig is a SAM, so Map#get is easily convertible into OtlpConfig
val otlpConfig: OtlpConfig =
  Map(
    "url" -> "http://otlp-server-endpoint:4317",
    "resourceAttributes" -> "service.name=<service-name>"
  ).get(_).orNull
// otlpConfig: OtlpConfig = repl.MdocSession$MdocApp$$anonfun$1@19ea0eb3
val metricsScope = new RootScopeBuilder()
  .reporter(
    new MicrometerClientStatsReporter(
      new OtlpMeterRegistry(
        otlpConfig,
        io.micrometer.core.instrument.Clock.SYSTEM
      )
    )
  )
  // it's usually better to use bigger intervals in production
  .reportEvery(5.seconds)
// metricsScope: com.uber.m3.tally.Scope = com.uber.m3.tally.ScopeImpl@5e21304f
val workflowServiceStubsOptionsLayer =
  ZWorkflowServiceStubsOptions.make @@
    ZWorkflowServiceStubsOptions.withMetricsScope(metricsScope)
// workflowServiceStubsOptionsLayer: ZLayer[Any, Config.Error, ZWorkflowServiceStubsOptions] = Suspend(
// self = zio.ZLayer$$Lambda$8955/0x00007fea09a20580@188589a2
// )
For more details, see the Monitoring samples. For details on configuring an OTLP scrape endpoint with Micrometer, see the Micrometer OTLP documentation.
How to set up Tracing
Tracing allows you to view the call graph of a Workflow along with its Activities and any Child Workflows.
Temporal Web's tracing capabilities mainly track Activity Execution within a Temporal context. If you need custom tracing specific to your use case, use context propagation to add the tracing logic you need.
Both client-side and worker-side tracing require OpenTracingOptions, which can be built as follows:
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.propagation.{ContextPropagators, TextMapPropagator}
import io.temporal.opentracing.OpenTracingOptions
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.opentracingshim.OpenTracingShim
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.`export`.SimpleSpanProcessor
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
import io.opentelemetry.extension.trace.propagation.OtTracePropagator
import io.opentelemetry.semconv.ServiceAttributes
val tracingOptions: OpenTracingOptions = {
  val selfResource = Resource.getDefault.merge(
    Resource.create(Attributes.of(ServiceAttributes.SERVICE_NAME, "<resource-name>"))
  )
  val spanProcessor = SimpleSpanProcessor.create(
    OtlpGrpcSpanExporter
      .builder()
      .setEndpoint("http://otlp-server-endpoint:4317")
      .setTimeout(5.seconds)
      .build()
  )
  val tracerProvider = SdkTracerProvider
    .builder()
    .addSpanProcessor(spanProcessor)
    .setResource(selfResource)
    .build()
  val propagators = ContextPropagators.create(
    TextMapPropagator.composite(
      W3CTraceContextPropagator.getInstance(),
      OtTracePropagator.getInstance()
    )
  )
  OpenTracingOptions
    .newBuilder()
    .setTracer(
      OpenTracingShim.createTracerShim(
        OpenTelemetrySdk
          .builder()
          .setPropagators(propagators)
          .setTracerProvider(tracerProvider)
          .build()
      )
    )
    .build()
}
// tracingOptions: OpenTracingOptions = io.temporal.opentracing.OpenTracingOptions@55f1f123
To configure tracing, register the OpenTracingClientInterceptor on the Temporal Client side and the OpenTracingWorkerInterceptor on the Worker side.
The following code example demonstrates the OpenTracingClientInterceptor on the Temporal Client:
import io.temporal.opentracing.OpenTracingClientInterceptor
val otlpClientInterceptor = new OpenTracingClientInterceptor(tracingOptions)
// otlpClientInterceptor: OpenTracingClientInterceptor = io.temporal.opentracing.OpenTracingClientInterceptor@2a15bdef
val workflowClientOptionsLayer = ZWorkflowClientOptions.make @@
  ZWorkflowClientOptions.withInterceptors(otlpClientInterceptor)
// workflowClientOptionsLayer: ZLayer[Any, Config.Error, ZWorkflowClientOptions] = Suspend(
// self = zio.ZLayer$$Lambda$8955/0x00007fea09a20580@724b29c8
// )
The following code example demonstrates the OpenTracingWorkerInterceptor on the Worker:
import io.temporal.opentracing.OpenTracingWorkerInterceptor
import zio.temporal.worker._
val otlpWorkerInterceptor = new OpenTracingWorkerInterceptor(tracingOptions)
// otlpWorkerInterceptor: OpenTracingWorkerInterceptor = io.temporal.opentracing.OpenTracingWorkerInterceptor@52962aa3
val workerFactoryOptionsLayer = ZWorkerFactoryOptions.make @@
  ZWorkerFactoryOptions.withWorkerInterceptors(otlpWorkerInterceptor)
// workerFactoryOptionsLayer: ZLayer[Any, Config.Error, ZWorkerFactoryOptions] = Suspend(
// self = zio.ZLayer$$Lambda$8955/0x00007fea09a20580@3589361c
// )
For more information, see the Temporal OpenTracing module.
How to log from a Workflow
Send logs and errors to a logging service, so that when things go wrong, you can see what happened.
The SDK core uses WARN for its default logging level.
To get a standard slf4j logger in your Workflow code, use the ZWorkflow.getLogger method:
import zio.temporal._
import zio.temporal.workflow._
@workflowInterface
trait MyWorkflow {
  // ...workflow methods
}
class MyWorkflowImpl extends MyWorkflow {
  private val logger = ZWorkflow.getLogger(getClass)
  // ...workflow methods
}
To avoid passing the current class, you can use the ZWorkflow.makeLogger method:
class MyWorkflowImpl2 extends MyWorkflow {
  private val logger = ZWorkflow.makeLogger
  // ...workflow methods
}
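As a minimal sketch, the logger obtained this way can be used like any other slf4j logger inside a workflow method. The GreetingWorkflow interface and its greet method are hypothetical names used only for illustration; the annotations and ZWorkflow.getLogger are the ones shown elsewhere in this guide.

```scala
import zio.temporal._
import zio.temporal.workflow._

// Hypothetical workflow interface, used only for this illustration
@workflowInterface
trait GreetingWorkflow {
  @workflowMethod
  def greet(name: String): String
}

class GreetingWorkflowImpl extends GreetingWorkflow {
  // Standard slf4j logger obtained through ZIO Temporal
  private val logger = ZWorkflow.getLogger(getClass)

  override def greet(name: String): String = {
    // Regular slf4j call; the message is built with Scala string interpolation
    logger.info(s"Greeting requested for $name")
    s"Hello, $name"
  }
}
```

Where these messages end up depends on the slf4j backend configured for the Worker process.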
How to use Visibility APIs
The term Visibility, within the Temporal Platform, refers to the subsystems and APIs that enable an operator to view Workflow Executions that currently exist within a Cluster.
How to use Search Attributes
The typical method of retrieving a Workflow Execution is by its Workflow Id.
However, sometimes you'll want to retrieve one or more Workflow Executions based on another property. For example, imagine you want to get all Workflow Executions of a certain type that have failed within a time range, so that you can start new ones with the same arguments.
You can do this with Search Attributes.
- Default Search Attributes like WorkflowType, StartTime and ExecutionStatus are automatically added to Workflow Executions.
- Custom Search Attributes can contain their own domain-specific data (like customerId or numItems).
  - A few generic Custom Search Attributes like CustomKeywordField and CustomIntField are created by default in Temporal's Docker Compose.
The steps for using custom Search Attributes are:
- Create a new Search Attribute in your Cluster using tctl search-attribute create or the Cloud UI.
- Set the value of the Search Attribute for a Workflow Execution:
  - On the Client by including it as an option when starting the Execution.
  - In the Workflow by calling upsertSearchAttributes.
- Read the value of the Search Attribute:
  - On the Client by calling describeWorkflow.
  - In the Workflow by looking at workflowInfo.
- Query Workflow Executions by the Search Attribute using a List Filter:
  - In tctl.
  - In code by calling listWorkflowExecutions.
Here is how to query Workflow Executions:
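As a minimal sketch of the scenario described earlier (failed Workflow Executions of one type within a time range), a List Filter can be assembled as a plain string. The object ListFilterSketch and the helper failedBetween are hypothetical names and not part of ZIO Temporal; the sketch also assumes that the Workflow Type name contains no single quotes.

```scala
import java.time.Instant

// Hypothetical helper, not part of ZIO Temporal or the Temporal SDK
object ListFilterSketch {

  // Builds a List Filter that matches failed executions of one Workflow Type
  // started within the given time range.
  // Assumption: workflowType contains no single quotes, so no escaping is done.
  def failedBetween(workflowType: String, from: Instant, to: Instant): String =
    s"WorkflowType = '$workflowType' AND ExecutionStatus = 'Failed' " +
      s"AND StartTime BETWEEN '$from' AND '$to'"
}

val filter = ListFilterSketch.failedBetween(
  "MyWorkflow",
  Instant.parse("2024-08-01T00:00:00Z"),
  Instant.parse("2024-08-02T00:00:00Z")
)
```

The resulting string can be passed wherever a List Filter is accepted, for example to tctl workflow list --query.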
Search attribute types
ZIO Temporal can encode many Scala types on top of those supported by the Temporal platform, including:
- Primitive types (Int, Long, Double, Boolean)
- String
- UUID (encoded as keyword)
- BigInt, BigDecimal
- Option
- Scala collections (Set, List, Array, etc.)
- Some java.time classes (Instant, LocalDateTime, OffsetDateTime)
- Enumeratum enums, Scala 3 enums (as keyword)
Note that String can be encoded as either text or keyword. By default, it is encoded as text. If you need it to be encoded as keyword, you must wrap it with the ZSearchAttribute.keyword method.
Other types encoded as keyword (such as UUID) should be wrapped as well.
ZIO Temporal methods that set search attributes usually accept Map[String, ZSearchAttribute].
For simple types, wrap the value with a ZSearchAttribute() method call; keyword-based types should be wrapped with the ZSearchAttribute.keyword method:
import java.util.UUID
import java.time.LocalDateTime
val searchAttributes: Map[String, ZSearchAttribute] =
  Map(
    "TextAttr" -> ZSearchAttribute("foo"),
    "KeywordAttr" -> ZSearchAttribute.keyword("bar"),
    "KeywordAttr2" -> ZSearchAttribute.keyword(UUID.randomUUID()),
    "DateAttr" -> ZSearchAttribute(LocalDateTime.now())
  )
// searchAttributes: Map[String, ZSearchAttribute] = Map(
// "TextAttr" -> ZSearchAttribute(value=foo, meta=zio.temporal.ZSearchAttributeMeta$StringMeta$@2cfb7d91),
// "KeywordAttr" -> ZSearchAttribute(value=bar, meta=zio.temporal.ZSearchAttributeMeta$KeywordMeta@6d73d7a4),
// "KeywordAttr2" -> ZSearchAttribute(value=669f85f3-587a-4896-87fa-c005b217355a, meta=zio.temporal.ZSearchAttributeMeta$$anon$1@24f17f9d),
// "DateAttr" -> ZSearchAttribute(value=2024-08-23T20:09:58.231118401, meta=zio.temporal.ZSearchAttributeMeta$$anon$1@2413883c)
// )
Enumeratum enum example:
import enumeratum.{Enum, EnumEntry}
import zio.temporal._
import zio.temporal.enumeratum._
sealed trait Color extends EnumEntry
object Color extends Enum[Color] {
  case object Red extends Color
  case object Green extends Color
  case object Blue extends Color
  override val values = findValues
}
val otherSearchAttributes: Map[String, ZSearchAttribute] = Map(
  "EnumAttr" -> ZSearchAttribute.keyword[Color](Color.Green),
  "OptionEnum" -> ZSearchAttribute.keyword(Option[Color](Color.Red)),
  "OptionEnum2" -> ZSearchAttribute.keyword(Option.empty[Color])
)
// otherSearchAttributes: Map[String, ZSearchAttribute] = Map(
// "EnumAttr" -> ZSearchAttribute(value=Green, meta=zio.temporal.ZSearchAttributeMeta$KeywordMeta@36e205ea),
// "OptionEnum" -> ZSearchAttribute(value=Some(Red), meta=zio.temporal.ZSearchAttributeMeta$OptionMeta@5ebc432d),
// "OptionEnum2" -> ZSearchAttribute(value=None, meta=zio.temporal.ZSearchAttributeMeta$OptionMeta@f487fc3)
// )
The same example with Scala 3 enums:
import zio.temporal._
enum Color {
  case Red, Green, Blue
}
val otherSearchAttributes: Map[String, ZSearchAttribute] = Map(
  "EnumAttr" -> ZSearchAttribute.keyword[Color](Color.Green),
  "OptionEnum" -> ZSearchAttribute.keyword(Option[Color](Color.Red)),
  "OptionEnum2" -> ZSearchAttribute.keyword(Option.empty[Color])
)
How to set custom Search Attributes
After you've created custom Search Attributes in your Cluster (using tctl search-attribute create or the Cloud UI), you can set the values of the custom Search Attributes when starting a Workflow.
To set a custom Search Attribute, call the withSearchAttributes method.
(1) Define the workflow
import zio.temporal._
@workflowInterface
trait MyWorkflow {
  @workflowMethod
  def someMethod(): Unit
}
(2) On the client side, create workflow options with search attributes:
import zio._
import zio.temporal._
import zio.temporal.workflow._
val workflowOptions = ZWorkflowOptions
  .withWorkflowId("<workflow-id>")
  .withTaskQueue("<task-queue>")
  .withSearchAttributes(
    Map(
      "CustomIntField" -> ZSearchAttribute(1),
      "CustomBoolField" -> ZSearchAttribute(true),
      "CustomKeywordField" -> ZSearchAttribute.keyword("borsch"),
      "CustomKeywordListField" -> ZSearchAttribute.keyword(List("a", "bc", "def"))
    )
  )
// workflowOptions: ZWorkflowOptions = ZWorkflowOptions(
// workflowId = "<workflow-id>",
// taskQueue = "<task-queue>",
// workflowIdReusePolicy = None,
// workflowRunTimeout = None,
// workflowExecutionTimeout = None,
// workflowTaskTimeout = None,
// retryOptions = None,
// memo = Map(),
// searchAttributes = Some(
// value = ZSearchAttributes(SearchAttributeKey(name=CustomKeywordListField, valueType=INDEXED_VALUE_TYPE_KEYWORD_LIST, valueClass=interface java.util.List, valueReflectType=java.util.List<java.lang.String>) -> List(a, bc, def)}, SearchAttributeKey(name=CustomIntField, valueType=INDEXED_VALUE_TYPE_INT, valueClass=class java.lang.Long, valueReflectType=class java.lang.Long) -> 1}, SearchAttributeKey(name=CustomKeywordField, valueType=INDEXED_VALUE_TYPE_KEYWORD, valueClass=class java.lang.String, valueReflectType=class java.lang.String) -> borsch}, SearchAttributeKey(name=CustomBoolField, valueType=INDEXED_VALUE_TYPE_BOOL, valueClass=class java.lang.Boolean, valueReflectType=class java.lang.Boolean) -> true})
// ),
// contextPropagators = List(),
// disableEagerExecution = None,
// javaOptionsCustomization = zio.temporal.workflow.ZWorkflowOptions$SetTaskQueue$$$Lambda$8998/0x00007fea09a3e028@5b1b69a1
// )
(3) Create the workflow stub with those workflow options:
val createWorkflow = ZIO.serviceWithZIO[ZWorkflowClient] { workflowClient =>
  workflowClient.newWorkflowStub[MyWorkflow](workflowOptions)
}
(4) You can also set search attributes in the workflow implementation using the upsertSearchAttributes method:
import zio.temporal.workflow._
class MyWorkflowImpl extends MyWorkflow {
  override def someMethod(): Unit = {
    ZWorkflow.upsertSearchAttributes(
      Map(
        "OtherCustomAttribute" -> ZSearchAttribute(BigDecimal(10).pow(42)),
        "ExecutionDate" -> ZSearchAttribute(ZWorkflow.currentTimeMillis)
      )
    )
  }
}
(5) To encode your own custom type as a search attribute, define an implicit instance of ZSearchAttributeMeta based on an existing type:
case class MyCustomType(value: String)
object MyCustomType {
  implicit val searchAttributeMeta: ZSearchAttributeMeta.Of[MyCustomType, ZSearchAttribute.Plain, String] =
    ZSearchAttributeMeta.string.convert(MyCustomType(_))(_.value)
}
How to remove a Search Attribute from a Workflow
To remove a Search Attribute that was previously set, set it to an empty array ([]).
To remove a Search Attribute, call the upsertSearchAttributes method and set it to an empty map.