diff --git a/core/config.go b/core/config.go index f6418e94..a0038309 100644 --- a/core/config.go +++ b/core/config.go @@ -102,6 +102,7 @@ func setDefaults() error { viper.SetDefault("ccdbEndpoint", "http://ccdb-test.cern.ch:8080") viper.SetDefault("dcsServiceEndpoint", "//127.0.0.1:50051") viper.SetDefault("dcsServiceUseSystemProxy", false) + viper.SetDefault("ddSchedulergRPCTimeout", "5") viper.SetDefault("ddSchedulerEndpoint", "//127.0.0.1:50052") viper.SetDefault("ddSchedulerUseSystemProxy", false) viper.SetDefault("trgServiceEndpoint", "//127.0.0.1:50060") @@ -170,6 +171,7 @@ func setFlags() error { pflag.String("dcsServiceEndpoint", viper.GetString("dcsServiceEndpoint"), "Endpoint of the DCS gRPC service (`host:port`)") pflag.Bool("dcsServiceUseSystemProxy", viper.GetBool("dcsServiceUseSystemProxy"), "When true the https_proxy, http_proxy and no_proxy environment variables are obeyed") pflag.String("ddSchedulerEndpoint", viper.GetString("ddSchedulerEndpoint"), "Endpoint of the DD scheduler gRPC service (`host:port`)") + pflag.String("ddSchedulergRPCTimeout", viper.GetString("ddSchedulergRPCTimeout"), "Timeout for gRPC calls in ddshed plugin in seconds") pflag.Bool("ddSchedulerUseSystemProxy", viper.GetBool("ddSchedulerUseSystemProxy"), "When true the https_proxy, http_proxy and no_proxy environment variables are obeyed") pflag.String("trgServiceEndpoint", viper.GetString("trgServiceEndpoint"), "Endpoint of the TRG gRPC service (`host:port`)") pflag.String("trgPollingInterval", viper.GetString("trgPollingInterval"), "How often to query the TRG gRPC service for run status (default: 3s)") diff --git a/core/integration/ddsched/client.go b/core/integration/ddsched/client.go index fde72375..f6ec7c08 100644 --- a/core/integration/ddsched/client.go +++ b/core/integration/ddsched/client.go @@ -29,6 +29,7 @@ import ( "time" "github.com/AliceO2Group/Control/common/logger" + "github.com/AliceO2Group/Control/core/integration" ddpb "github.com/AliceO2Group/Control/core/integration/ddsched/protos" "github.com/sirupsen/logrus" "github.com/spf13/viper" @@ -38,12 +39,11 @@ import ( "google.golang.org/grpc/keepalive" ) -var log = logger.New(logrus.StandardLogger(),"ddschedclient") - +var log = logger.New(logrus.StandardLogger(), "ddschedclient") type RpcClient struct { ddpb.DataDistributionControlClient - conn *grpc.ClientConn + conn *grpc.ClientConn cancel context.CancelFunc } @@ -52,10 +52,10 @@ func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string) "endpoint": endpoint, }).Debug("dialing DD scheduler endpoint") - dialOptions := []grpc.DialOption { + dialOptions := []grpc.DialOption{ grpc.WithInsecure(), grpc.WithConnectParams(grpc.ConnectParams{ - Backoff: backoff.Config{ + Backoff: backoff.Config{ BaseDelay: backoff.DefaultConfig.BaseDelay, Multiplier: backoff.DefaultConfig.Multiplier, Jitter: backoff.DefaultConfig.Jitter, @@ -68,14 +68,15 @@ func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string) Timeout: time.Second, PermitWithoutStream: true, }), + grpc.WithUnaryInterceptor(integration.UnaryTimeoutInterceptor(time.Second*time.Duration(viper.GetInt("ddSchedulergRPCTimeout")), "ddsched gRPC call failed")), } if !viper.GetBool("ddSchedulerUseSystemProxy") { dialOptions = append(dialOptions, grpc.WithNoProxy()) } conn, err := grpc.DialContext(cxt, - endpoint, - dialOptions..., - ) + endpoint, + dialOptions..., + ) if err != nil { log.WithField("error", err.Error()). WithField("endpoint", endpoint). @@ -95,27 +96,27 @@ func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string) for { select { - case ok := <- stateChangedNotify: + case ok := <-stateChangedNotify: if !ok { return } connState = conn.GetState() log.Debugf("DD scheduler client %s", connState.String()) go notifyFunc(connState) - case <- time.After(2 * time.Minute): + case <-time.After(2 * time.Minute): if conn.GetState() != connectivity.Ready { conn.ResetConnectBackoff() } - case <- cxt.Done(): + case <-cxt.Done(): return } } }() - client := &RpcClient { + client := &RpcClient{ DataDistributionControlClient: ddpb.NewDataDistributionControlClient(conn), - conn: conn, - cancel: cancel, + conn: conn, + cancel: cancel, } return client @@ -131,4 +132,5 @@ func (m *RpcClient) GetConnState() connectivity.State { func (m *RpcClient) Close() error { m.cancel() return m.conn.Close() -} \ No newline at end of file +} + diff --git a/core/integration/rpctimeoutinterceptor.go b/core/integration/rpctimeoutinterceptor.go new file mode 100644 index 00000000..636faaf6 --- /dev/null +++ b/core/integration/rpctimeoutinterceptor.go @@ -0,0 +1,58 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2021 CERN and copyright holders of ALICE O². + * Author: Teo Mrnjavac + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package integration + +import ( + "context" + "errors" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/status" +) + +func UnaryTimeoutInterceptor(timeout time.Duration, cause string) grpc.UnaryClientInterceptor { + return func( + ctx context.Context, + method string, + req, reply interface{}, + connection *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, + ) error { + // Create a context with a timeout + ctx, cancel := context.WithTimeoutCause(ctx, timeout, errors.New(cause)) + defer cancel() // Ensure the context is canceled after the call + + // Invoke the RPC call with the new context + err := invoker(ctx, method, req, reply, connection, opts...) + if err != nil { + st, _ := status.FromError(err) + // Handle error, maybe logging or processing the error status + return st.Err() + } + return nil + } +}