Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddsched plugin gRPC calls have timeout by default #624

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func setDefaults() error {
viper.SetDefault("ccdbEndpoint", "http://ccdb-test.cern.ch:8080")
viper.SetDefault("dcsServiceEndpoint", "//127.0.0.1:50051")
viper.SetDefault("dcsServiceUseSystemProxy", false)
viper.SetDefault("ddSchedulergRPCTimeout", "5")
viper.SetDefault("ddSchedulerEndpoint", "//127.0.0.1:50052")
viper.SetDefault("ddSchedulerUseSystemProxy", false)
viper.SetDefault("trgServiceEndpoint", "//127.0.0.1:50060")
Expand Down Expand Up @@ -170,6 +171,7 @@ func setFlags() error {
pflag.String("dcsServiceEndpoint", viper.GetString("dcsServiceEndpoint"), "Endpoint of the DCS gRPC service (`host:port`)")
pflag.Bool("dcsServiceUseSystemProxy", viper.GetBool("dcsServiceUseSystemProxy"), "When true the https_proxy, http_proxy and no_proxy environment variables are obeyed")
pflag.String("ddSchedulerEndpoint", viper.GetString("ddSchedulerEndpoint"), "Endpoint of the DD scheduler gRPC service (`host:port`)")
pflag.String("ddSchedulergRPCTimeout", viper.GetString("ddSchedulergRPCTimeout"), "Timeout for gRPC calls in ddshed plugin in seconds")
pflag.Bool("ddSchedulerUseSystemProxy", viper.GetBool("ddSchedulerUseSystemProxy"), "When true the https_proxy, http_proxy and no_proxy environment variables are obeyed")
pflag.String("trgServiceEndpoint", viper.GetString("trgServiceEndpoint"), "Endpoint of the TRG gRPC service (`host:port`)")
pflag.String("trgPollingInterval", viper.GetString("trgPollingInterval"), "How often to query the TRG gRPC service for run status (default: 3s)")
Expand Down
32 changes: 17 additions & 15 deletions core/integration/ddsched/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/AliceO2Group/Control/common/logger"
"github.com/AliceO2Group/Control/core/integration"
ddpb "github.com/AliceO2Group/Control/core/integration/ddsched/protos"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
Expand All @@ -38,12 +39,11 @@ import (
"google.golang.org/grpc/keepalive"
)

var log = logger.New(logrus.StandardLogger(),"ddschedclient")

var log = logger.New(logrus.StandardLogger(), "ddschedclient")

type RpcClient struct {
ddpb.DataDistributionControlClient
conn *grpc.ClientConn
conn *grpc.ClientConn
cancel context.CancelFunc
}

Expand All @@ -52,10 +52,10 @@ func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string)
"endpoint": endpoint,
}).Debug("dialing DD scheduler endpoint")

dialOptions := []grpc.DialOption {
dialOptions := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
Backoff: backoff.Config{
BaseDelay: backoff.DefaultConfig.BaseDelay,
Multiplier: backoff.DefaultConfig.Multiplier,
Jitter: backoff.DefaultConfig.Jitter,
Expand All @@ -68,14 +68,15 @@ func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string)
Timeout: time.Second,
PermitWithoutStream: true,
}),
grpc.WithUnaryInterceptor(integration.UnaryTimeoutInterceptor(time.Second*time.Duration(viper.GetInt("ddSchedulergRPCTimeout")), "ddsched gRPC call failed")),
}
if !viper.GetBool("ddSchedulerUseSystemProxy") {
dialOptions = append(dialOptions, grpc.WithNoProxy())
}
conn, err := grpc.DialContext(cxt,
endpoint,
dialOptions...,
)
endpoint,
dialOptions...,
)
if err != nil {
log.WithField("error", err.Error()).
WithField("endpoint", endpoint).
Expand All @@ -95,27 +96,27 @@ func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string)

for {
select {
case ok := <- stateChangedNotify:
case ok := <-stateChangedNotify:
if !ok {
return
}
connState = conn.GetState()
log.Debugf("DD scheduler client %s", connState.String())
go notifyFunc(connState)
case <- time.After(2 * time.Minute):
case <-time.After(2 * time.Minute):
if conn.GetState() != connectivity.Ready {
conn.ResetConnectBackoff()
}
case <- cxt.Done():
case <-cxt.Done():
return
}
}
}()

client := &RpcClient {
client := &RpcClient{
DataDistributionControlClient: ddpb.NewDataDistributionControlClient(conn),
conn: conn,
cancel: cancel,
conn: conn,
cancel: cancel,
}

return client
Expand All @@ -131,4 +132,5 @@ func (m *RpcClient) GetConnState() connectivity.State {
func (m *RpcClient) Close() error {
m.cancel()
return m.conn.Close()
}
}

58 changes: 58 additions & 0 deletions core/integration/rpctimeoutinterceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2021 CERN and copyright holders of ALICE O².
* Author: Teo Mrnjavac <[email protected]>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

package integration

import (
"context"
"errors"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/status"
)

func UnaryTimeoutInterceptor(timeout time.Duration, cause string) grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
connection *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
// Create a context with a timeout
ctx, cancel := context.WithTimeoutCause(ctx, timeout, errors.New(cause))
defer cancel() // Ensure the context is canceled after the call

// Invoke the RPC call with the new context
err := invoker(ctx, method, req, reply, connection, opts...)
if err != nil {
st, _ := status.FromError(err)
// Handle error, maybe logging or processing the error status
return st.Err()
}
return nil
}
}
Loading