Skip to main content

Observability

The observability section of the Temporal Developer's guide covers the many ways to view the current state of your Temporal Application that is, ways to view which Workflow Executions are tracked by the Temporal Platform and the state of any specified Workflow Execution, either currently or at points of an execution.

This section covers features related to viewing the state of the application, including:

  • Metrics
  • Tracing
  • Logging
  • Visibility

How to emit metrics​

Each Temporal SDK is capable of emitting an optional set of metrics from either the Client or the Worker process. For a complete list of metrics capable of being emitted, see the SDK metrics referenceLink preview icon.

Metrics can be scraped and stored in time series databases, such as:

  • Prometheus
  • M3db
  • statsd

For more information about dasbharods, see Temporal Java SDK guide

To emit metrics, use the MicrometerClientStatsReporter class to integrate with Micrometer MeterRegistry configured for your metrics backend. Micrometer is a popular Java framework that provides integration with Prometheus and other backends.

The following example shows how to use MicrometerClientStatsReporter to define the metrics scope and set it with the ZWorkflowServiceStubsOptions.

(1) Add necessary dependencies

libraryDependencies ++= Seq(
// Temporal integration with opentracing
"io.temporal" % "temporal-opentracing" % "<temporal-version>",
// Micrometer-otlp integration
"io.micrometer" % "micrometer-registry-otlp" % "<micrometer-version>",
// Opentelemetry libs
"io.opentelemetry" % "opentelemetry-api" % "<otel-version>",
"io.opentelemetry" % "opentelemetry-exporter-otlp" % "<otel-version>",
"io.opentelemetry" % "opentelemetry-extension-trace-propagators" % "<otel-version>",
"io.opentelemetry" % "opentelemetry-opentracing-shim" % "<otel-version>"
)

(2) Configure the Opentelemetry-based metrics registry & provide it to the ZWorkflowServiceStubsOptions:

import zio._
import zio.temporal._
import zio.temporal.workflow._
// required for metrics
import com.uber.m3.tally.RootScopeBuilder
import io.micrometer.registry.otlp.{OtlpConfig, OtlpMeterRegistry}
import io.temporal.common.reporter.MicrometerClientStatsReporter

// OtlpConfig is a SAM, so Map#get is easily convertable into OtlpConfig
val otlpConfig: OtlpConfig =
Map(
"url" -> "http://otlp-server-endpoint:4317",
"resourceAttributes" -> "service.name=<service-name>"
).get(_).orNull
// otlpConfig: OtlpConfig = repl.MdocSession$MdocApp$$anonfun$1@4023fd9

val metricsScope = new RootScopeBuilder()
.reporter(
new MicrometerClientStatsReporter(
new OtlpMeterRegistry(
otlpConfig,
io.micrometer.core.instrument.Clock.SYSTEM
)
)
)
// it's usually better to use bigger intervals in production
.reportEvery(5.seconds)
// metricsScope: com.uber.m3.tally.Scope = com.uber.m3.tally.ScopeImpl@1c829864

val workflowServiceStubsOptionsLayer =
ZWorkflowServiceStubsOptions.make @@
ZWorkflowServiceStubsOptions.withMetricsScope(metricsScope)
// workflowServiceStubsOptionsLayer: ZLayer[Any, Config.Error, ZWorkflowServiceStubsOptions] = Suspend(
// self = zio.ZLayer$$Lambda$8607/0x00007f228d9c46d0@208c00b8
// )

For more details, find Monitoring samples. For details on configuring a OTLP scrape endpoint with Micrometer, see Micrometer OTLP doc

How to setup Tracing​

Tracing allows you to view the call graph of a Workflow along with its Activities and any Child Workflows.

Temporal Web's tracing capabilities mainly track Activity Execution within a Temporal context. If you need custom tracing specific for your use case, you should make use of context propagation to add tracing logic accordingly.

Both client-side & worker-side tracing requires OpenTracingOptions. It can be build in the following way:

import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.propagation.{ContextPropagators, TextMapPropagator}
import io.temporal.opentracing.OpenTracingOptions
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.opentracingshim.OpenTracingShim
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.`export`.SimpleSpanProcessor
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
import io.opentelemetry.extension.trace.propagation.OtTracePropagator
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes

val tracingOptions: OpenTracingOptions = {
val selfResource = Resource.getDefault.merge(
Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, "<resource-name>"))
)

val spanProcessor = SimpleSpanProcessor.create(
OtlpGrpcSpanExporter
.builder()
.setEndpoint("http://otlp-server-endpoint:4317")
.setTimeout(5.seconds)
.build()
)

val tracerProvider = SdkTracerProvider
.builder()
.addSpanProcessor(spanProcessor)
.setResource(selfResource)
.build()

val propagators = ContextPropagators.create(
TextMapPropagator.composite(
W3CTraceContextPropagator.getInstance(),
OtTracePropagator.getInstance()
)
)

OpenTracingOptions
.newBuilder()
.setTracer(
OpenTracingShim.createTracerShim(
OpenTelemetrySdk
.builder()
.setPropagators(propagators)
.setTracerProvider(tracerProvider)
.build()
)
)
.build()
}
// tracingOptions: OpenTracingOptions = io.temporal.opentracing.OpenTracingOptions@567dd04

To configure tracing, register the OpenTracingClientInterceptor interceptor. You can register the interceptors on both the Temporal Client side and the Worker side.

The following code examples demonstrate the OpenTracingClientInterceptor on the Temporal Client.

import io.temporal.opentracing.OpenTracingClientInterceptor

val otlpClientInterceptor = new OpenTracingClientInterceptor(tracingOptions)
// otlpClientInterceptor: OpenTracingClientInterceptor = io.temporal.opentracing.OpenTracingClientInterceptor@108ed686

val workflowClientOptionsLayer = ZWorkflowClientOptions.make @@
ZWorkflowClientOptions.withInterceptors(otlpClientInterceptor)
// workflowClientOptionsLayer: ZLayer[Any, Config.Error, ZWorkflowClientOptions] = Suspend(
// self = zio.ZLayer$$Lambda$8607/0x00007f228d9c46d0@18472d4b
// )

The following code examples demonstrate the OpenTracingWorkerInterceptor on the Worker:

import io.temporal.opentracing.OpenTracingWorkerInterceptor
import zio.temporal.worker._

val otlpWorkerInterceptor = new OpenTracingWorkerInterceptor(tracingOptions)
// otlpWorkerInterceptor: OpenTracingWorkerInterceptor = io.temporal.opentracing.OpenTracingWorkerInterceptor@580e773b

val workerFactoryOptionsLayer = ZWorkerFactoryOptions.make @@
ZWorkerFactoryOptions.withWorkerInterceptors(otlpWorkerInterceptor)
// workerFactoryOptionsLayer: ZLayer[Any, Config.Error, ZWorkerFactoryOptions] = Suspend(
// self = zio.ZLayer$$Lambda$8607/0x00007f228d9c46d0@63ba0ec3
// )

For more information, see the Temporal OpenTracing module

How to log from a Workflow​

Send logs and errors to a logging service, so that when things go wrong, you can see what happened.

The SDK core uses WARN for its default logging level.

To get a standard slf4j logger in your Workflow code, use the ZWorkflow.getLogger method:

import zio.temporal._
import zio.temporal.workflow._

@workflowInterface
trait MyWorkflow {
// ...workflow methods
}

class MyWorkflowImpl extends MyWorkflow {
private val logger = ZWorkflow.getLogger(getClass)
// ...workflow methods
}

To avoid passing the current class, you can use ZWorkflow.makeLogger method:

class MyWorkflowImpl2 extends MyWorkflow {
private val logger = ZWorkflow.makeLogger
// ...workflow methods
}

How to use Visibility APIs​

The term Visibility, within the Temporal Platform, refers to the subsystems and APIs that enable an operator to view Workflow Executions that currently exist within a Cluster.

How to use Search Attributes​

The typical method of retrieving a Workflow Execution is by its Workflow Id.

However, sometimes you'll want to retrieve one or more Workflow Executions based on another property. For example, imagine you want to get all Workflow Executions of a certain type that have failed within a time range, so that you can start new ones with the same arguments.

You can do this with Search Attributes.

  • Default Search Attributes like WorkflowType, StartTime and ExecutionStatus are automatically added to Workflow Executions.
  • Custom Search Attributes can contain their own domain-specific data (like customerId or numItems).
    • A few generic Custom Search Attributes like CustomKeywordField and CustomIntField are created by default in Temporal's Docker Compose.

The steps to using custom Search Attributes are:

  • Create a new Search Attribute in your Cluster using tctl search-attribute create or the Cloud UI.
  • Set the value of the Search Attribute for a Workflow Execution:
    • On the Client by including it as an option when starting the Execution.
    • In the Workflow by calling upsertSearchAttributes.
  • Read the value of the Search Attribute:
    • On the Client by calling describeWorkflow.
    • In the Workflow by looking at workflowInfo.
  • Query Workflow Executions by the Search Attribute using a List Filter:
  • In tctl.
  • In code by calling listWorkflowExecutions.

Here is how to query Workflow Executions:

Search attribute types​

ZIO Temporal encodes a lot of custom types on top of those supported by Temporal platform, including:

  • Primitive types (Int, Long, Double, Boolean)
  • String
  • UUID (encoded as keyword)
  • BigInt, BigDecimal
  • Option
  • Scala collections (Set, List, Array, etc.)
  • Some java.time classes (Instant, LocalDateTime, OffsetDateTime)
  • Enumeratum enums, Scala 3 enums (as keyword)

Note that String can be encoded both as text and keyword. By default, it's text. If you need it to be encoded as keyword, you must wrap it into ZSearchAttribute.keyword method.
Other types encoded as keyword (such as UUID) should be wrapped as well.
ZIO Temporal methods to set search attributes usually accept Map[String, ZSearchAttribute].
For simple types, just wrap them with ZSearchAttribute() method call, while keyword-based types should be wrapped into ZSearchAttribute.keyword method:

import java.util.UUID
import java.time.LocalDateTime

val searchAttributes: Map[String, ZSearchAttribute] =
Map(
"TextAttr" -> ZSearchAttribute("foo"),
"KeywordAttr" -> ZSearchAttribute.keyword("bar"),
"KeywordAttr2" -> ZSearchAttribute.keyword(UUID.randomUUID()),
"DateAttr" -> ZSearchAttribute(LocalDateTime.now())
)
// searchAttributes: Map[String, ZSearchAttribute] = Map(
// "TextAttr" -> ZSearchAttribute(value=foo, meta=zio.temporal.ZSearchAttributeMeta$StringMeta$@faa285f),
// "KeywordAttr" -> ZSearchAttribute(value=bar, meta=zio.temporal.ZSearchAttributeMeta$KeywordMeta@7f0247ca),
// "KeywordAttr2" -> ZSearchAttribute(value=281b415b-ca28-40b3-bd2d-5b0d7493abd6, meta=zio.temporal.ZSearchAttributeMeta$$anon$1@449d0e97),
// "DateAttr" -> ZSearchAttribute(value=2023-11-24T12:54:23.749231080, meta=zio.temporal.ZSearchAttributeMeta$$anon$1@3269f501)
// )

Enumeratum enum example:

import enumeratum.{Enum, EnumEntry}
import zio.temporal._
import zio.temporal.enumeratum._

sealed trait Color extends EnumEntry
object Color extends Enum[Color] {
case object Red extends Color
case object Green extends Color
case object Blue extends Color

override val values = findValues
}

val otherSearchAttributes: Map[String, ZSearchAttribute] = Map(
"EnumAttr" -> ZSearchAttribute.keyword[Color](Color.Green),
"OptionEnum" -> ZSearchAttribute.keyword(Option[Color](Color.Red)),
"OptionEnum2" -> ZSearchAttribute.keyword(Option.empty[Color])
)
// otherSearchAttributes: Map[String, ZSearchAttribute] = Map(
// "EnumAttr" -> ZSearchAttribute(value=Green, meta=zio.temporal.ZSearchAttributeMeta$KeywordMeta@42e453de),
// "OptionEnum" -> ZSearchAttribute(value=Some(Red), meta=zio.temporal.ZSearchAttributeMeta$OptionMeta@34f0ecc2),
// "OptionEnum2" -> ZSearchAttribute(value=None, meta=zio.temporal.ZSearchAttributeMeta$OptionMeta@44df3f43)
// )

Same example with Scala 3 enums

import zio.temporal._

enum Color {
case Red, Green, Blue
}

val otherSearchAttributes: Map[String, ZSearchAttribute] = Map(
"EnumAttr" -> ZSearchAttribute.keyword[Color](Color.Green),
"OptionEnum" -> ZSearchAttribute.keyword(Option[Color](Color.Red)),
"OptionEnum2" -> ZSearchAttribute.keyword(Option.empty[Color])
)

How to set custom Search Attributes​

After you've created custom Search Attributes in your Cluster (using tctl search-attribute create or the Cloud UI), you can set the values of the custom Search Attributes when starting a Workflow.

To set a custom Search Attribute, call the withSearchAttributes method.

(1) Define the workflow

import zio.temporal._

@workflowInterface
trait MyWorkflow {
@workflowMethod
def someMethod(): Unit
}

(2) On the client side, create workflow options with search attributes:

import zio._
import zio.temporal._
import zio.temporal.workflow._

val workflowOptions = ZWorkflowOptions
.withWorkflowId("<workflow-id>")
.withTaskQueue("<task-queue>")
.withSearchAttributes(
Map(
"CustomIntField" -> ZSearchAttribute(1),
"CustomBoolField" -> ZSearchAttribute(true),
"CustomKeywordField" -> ZSearchAttribute.keyword("borsch"),
"CustomKeywordListField" -> ZSearchAttribute.keyword(List("a", "bc", "def"))
)
)
// workflowOptions: ZWorkflowOptions = ZWorkflowOptions(
// workflowId = "<workflow-id>",
// taskQueue = "<task-queue>",
// workflowIdReusePolicy = None,
// workflowRunTimeout = None,
// workflowExecutionTimeout = None,
// workflowTaskTimeout = None,
// retryOptions = None,
// memo = Map(),
// searchAttributes = Some(
// value = ZSearchAttributes(SearchAttributeKey(name=CustomKeywordListField, valueType=INDEXED_VALUE_TYPE_KEYWORD_LIST, valueClass=interface java.util.List, valueReflectType=java.util.List<java.lang.String>) -> List(a, bc, def)}, SearchAttributeKey(name=CustomKeywordField, valueType=INDEXED_VALUE_TYPE_KEYWORD, valueClass=class java.lang.String, valueReflectType=class java.lang.String) -> borsch}, SearchAttributeKey(name=CustomIntField, valueType=INDEXED_VALUE_TYPE_INT, valueClass=class java.lang.Long, valueReflectType=class java.lang.Long) -> 1}, SearchAttributeKey(name=CustomBoolField, valueType=INDEXED_VALUE_TYPE_BOOL, valueClass=class java.lang.Boolean, valueReflectType=class java.lang.Boolean) -> true})
// ),
// contextPropagators = List(),
// disableEagerExecution = None,
// javaOptionsCustomization = zio.temporal.workflow.ZWorkflowOptions$SetTaskQueue$$$Lambda$8681/0x00007f228d9f6418@179899bb
// )

(3) Create the workflow stub with those workflow options:

val createWorkflow = ZIO.serviceWithZIO[ZWorkflowClient] { workflowClient =>
workflowClient.newWorkflowStub[MyWorkflow](workflowOptions)
}

(4) You might set search attributes in the workflow implementation as well using upsertSearchAttributes method:

import zio.temporal.workflow._

class MyWorkflowImpl extends MyWorkflow {
override def someMethod(): Unit = {
ZWorkflow.upsertSearchAttributes(
Map(
"OtherCustomAttribute" -> ZSearchAttribute(BigDecimal(10).pow(42)),
"ExecutionDate" -> ZSearchAttribute(ZWorkflow.currentTimeMillis)
)
)
}
}

(4) In case you want to encode your custom type as a search attribute, it's required to define as implicit instance of ZSearchAttributeMeta based on an existing type:

case class MyCustomType(value: String)

object MyCustomType {
implicit val searchAttributeMeta: ZSearchAttributeMeta.Of[MyCustomType, ZSearchAttribute.Plain, String] =
ZSearchAttributeMeta.string.convert(MyCustomType(_))(_.value)
}

How to remove a Search Attribute from a Workflow​

To remove a Search Attribute that was previously set, set it to an array [].

To remove a Search Attribute, call the upsertSearchAttributes method and set it to an empty map.