diff --git a/internal/devserver/server.go b/internal/devserver/server.go index 52756e0ae..d55824c54 100644 --- a/internal/devserver/server.go +++ b/internal/devserver/server.go @@ -47,13 +47,13 @@ import ( "go.temporal.io/server/common/metrics" sqliteplugin "go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite" "go.temporal.io/server/common/primitives" - "go.temporal.io/server/components/callbacks" "go.temporal.io/server/components/nexusoperations" "go.temporal.io/server/schema/sqlite" sqliteschema "go.temporal.io/server/schema/sqlite" "go.temporal.io/server/temporal" "google.golang.org/grpc" "gopkg.in/yaml.v3" + "go.temporal.io/server/components/callbacks" ) const ( @@ -241,19 +241,10 @@ func (s *StartOptions) buildServerOptions() ([]temporal.ServerOption, *slog.Leve dynConf[dynamicconfig.HistoryCacheHostLevelMaxSize.Key()] = 8096 // Up default visibility RPS dynConf[dynamicconfig.FrontendMaxNamespaceVisibilityRPSPerInstance.Key()] = 100 - // NOTE that the URL scheme is fixed to HTTP since the dev server doesn't support TLS at the time of writing. - dynConf[nexusoperations.CallbackURLTemplate.Key()] = fmt.Sprintf( - "http://%s:%d/namespaces/{{.NamespaceName}}/nexus/callback", MaybeEscapeIPv6(s.FrontendIP), s.FrontendHTTPPort) - dynConf[callbacks.AllowedAddresses.Key()] = []struct { - Pattern string - AllowInsecure bool - }{ - { - Pattern: fmt.Sprintf("%s:%d", MaybeEscapeIPv6(s.FrontendIP), s.FrontendHTTPPort), - AllowInsecure: true, - }, - } + // Enable the system callback URL for worker targets. + dynConf[nexusoperations.UseSystemCallbackURL.Key()] = true + dynConf[callbacks.AllowedAddresses.Key()] = true // Dynamic config if set for k, v := range s.DynamicConfigValues { dynConf[dynamicconfig.MakeKey(k)] = v diff --git a/internal/temporalcli/commands.workflow_view_test.go b/internal/temporalcli/commands.workflow_view_test.go index 980d4b632..5abb30068 100644 --- a/internal/temporalcli/commands.workflow_view_test.go +++ b/internal/temporalcli/commands.workflow_view_test.go @@ -757,7 +757,7 @@ func (s *SharedServerSuite) TestWorkflow_Describe_NexusOperationAndCallback() { out = res.Stdout.String() s.ContainsOnSameLine(out, "WorkflowId", handlerWorkflowID) s.Contains(out, "Callbacks: 1") - s.ContainsOnSameLine(out, "URL", "http://"+s.DevServer.Options.FrontendIP) + s.ContainsOnSameLine(out, "URL", "temporal://system") s.ContainsOnSameLine(out, "Trigger", "WorkflowClosed") s.ContainsOnSameLine(out, "State", "Succeeded") @@ -777,11 +777,11 @@ func (s *SharedServerSuite) TestWorkflow_Describe_NexusOperationAndCallback() { func (s *SharedServerSuite) TestWorkflow_Describe_NexusOperationBlocked() { endpointName := validEndpointName(s.T()) - // Call an unreachable operation from this workflow. + // Call an operation that has no handler registered. callerWorkflow := func(ctx workflow.Context) error { client := workflow.NewNexusClient(endpointName, "test-service") fut := client.ExecuteOperation(ctx, "test-op", nil, workflow.NexusOperationOptions{}) - // Destination is unreachable, the future will never complete. + // No handler is registered, the operation will fail and trigger the circuit breaker. return fut.GetNexusOperationExecution().Get(ctx, nil) } @@ -790,16 +790,19 @@ func (s *SharedServerSuite) TestWorkflow_Describe_NexusOperationBlocked() { }) defer w.Stop() - // Create an endpoint for this test. + // Create an endpoint for this test pointing to a worker target. + // The worker has no handler for "test-service"/"test-op", so requests will fail + // and eventually trigger the circuit breaker. _, err := s.Client.OperatorService().CreateNexusEndpoint( s.Context, &operatorservice.CreateNexusEndpointRequest{ Spec: &nexuspb.EndpointSpec{ Name: endpointName, Target: &nexuspb.EndpointTarget{ - Variant: &nexuspb.EndpointTarget_External_{ - External: &nexuspb.EndpointTarget_External{ - Url: "http://localhost:12345", // unreachable destination + Variant: &nexuspb.EndpointTarget_Worker_{ + Worker: &nexuspb.EndpointTarget_Worker{ + Namespace: s.Namespace(), + TaskQueue: w.Options.TaskQueue, }, }, }, @@ -808,7 +811,6 @@ func (s *SharedServerSuite) TestWorkflow_Describe_NexusOperationBlocked() { ) s.NoError(err) - // Start the workflow and wait until the operation is started. run, err := s.Client.ExecuteWorkflow( s.Context, client.StartWorkflowOptions{TaskQueue: w.Options.TaskQueue}, @@ -847,7 +849,6 @@ func (s *SharedServerSuite) TestWorkflow_Describe_NexusOperationBlocked() { s.ContainsOnSameLine(out, "Operation", "test-op") s.ContainsOnSameLine(out, "BlockedReason", "The circuit breaker is open.") - // Operations - JSON res = s.Execute( "workflow", "describe", "--address", s.Address(),