From dd54828559d25ea32171b0b19171da1ecf9d4fd2 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 2 Jan 2024 00:20:00 +0800 Subject: [PATCH] fix check Signed-off-by: Ping Yu --- cdc/cdc/sink/sink.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/cdc/cdc/sink/sink.go b/cdc/cdc/sink/sink.go index 26379f22..224ce840 100644 --- a/cdc/cdc/sink/sink.go +++ b/cdc/cdc/sink/sink.go @@ -69,8 +69,10 @@ type Sink interface { Barrier(ctx context.Context, keyspanID model.KeySpanID) error } -var sinkIniterMap = make(map[string]sinkInitFunc) -var sinkUriCheckerMap = make(map[string]sinkInitFunc) +var ( + sinkIniterMap = make(map[string]sinkInitFunc) + sinkURICheckerMap = make(map[string]sinkInitFunc) +) type sinkInitFunc func(context.Context, model.ChangeFeedID, *url.URL, *config.ReplicaConfig, map[string]string, chan error) (Sink, error) @@ -88,7 +90,7 @@ func init() { ) (Sink, error) { return newTiKVSink(ctx, sinkURI, config, opts, errCh) } - sinkUriCheckerMap["tikv"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, + sinkURICheckerMap["tikv"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, config *config.ReplicaConfig, opts map[string]string, errCh chan error, ) (Sink, error) { _, _, err := parseTiKVUri(sinkURI, opts) @@ -101,14 +103,14 @@ func init() { ) (Sink, error) { return newKafkaSaramaSink(ctx, sinkURI, config, opts, errCh) } - sinkUriCheckerMap["kafka"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, + sinkURICheckerMap["kafka"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, config *config.ReplicaConfig, opts map[string]string, errCh chan error, ) (Sink, error) { _, _, err := parseKafkaSinkConfig(sinkURI, config, opts) return nil, err } sinkIniterMap["kafka+ssl"] = sinkIniterMap["kafka"] - sinkUriCheckerMap["kafka+ssl"] = sinkUriCheckerMap["kafka"] + sinkURICheckerMap["kafka+ssl"] = sinkURICheckerMap["kafka"] } // New creates a new sink with the sink-uri @@ -135,7 +137,7 @@ func Validate(ctx context.Context, sinkURIStr string, cfg *config.ReplicaConfig, return cerror.WrapError(cerror.ErrSinkURIInvalid, err) } scheme := strings.ToLower(sinkURI.Scheme) - newSink, ok := sinkUriCheckerMap[scheme] + newSink, ok := sinkURICheckerMap[scheme] if !ok { newSink, ok = sinkIniterMap[scheme] if !ok {