Skip to content

Commit

Permalink
fix check
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu committed Jan 1, 2024
1 parent 0e04470 commit dd54828
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions cdc/cdc/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ type Sink interface {
Barrier(ctx context.Context, keyspanID model.KeySpanID) error
}

var sinkIniterMap = make(map[string]sinkInitFunc)
var sinkUriCheckerMap = make(map[string]sinkInitFunc)
var (
sinkIniterMap = make(map[string]sinkInitFunc)
sinkURICheckerMap = make(map[string]sinkInitFunc)
)

type sinkInitFunc func(context.Context, model.ChangeFeedID, *url.URL, *config.ReplicaConfig, map[string]string, chan error) (Sink, error)

Expand All @@ -88,7 +90,7 @@ func init() {
) (Sink, error) {
return newTiKVSink(ctx, sinkURI, config, opts, errCh)
}
sinkUriCheckerMap["tikv"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
sinkURICheckerMap["tikv"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
_, _, err := parseTiKVUri(sinkURI, opts)
Expand All @@ -101,14 +103,14 @@ func init() {
) (Sink, error) {
return newKafkaSaramaSink(ctx, sinkURI, config, opts, errCh)
}
sinkUriCheckerMap["kafka"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
sinkURICheckerMap["kafka"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
_, _, err := parseKafkaSinkConfig(sinkURI, config, opts)
return nil, err
}
sinkIniterMap["kafka+ssl"] = sinkIniterMap["kafka"]
sinkUriCheckerMap["kafka+ssl"] = sinkUriCheckerMap["kafka"]
sinkURICheckerMap["kafka+ssl"] = sinkURICheckerMap["kafka"]
}

// New creates a new sink with the sink-uri
Expand All @@ -135,7 +137,7 @@ func Validate(ctx context.Context, sinkURIStr string, cfg *config.ReplicaConfig,
return cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := strings.ToLower(sinkURI.Scheme)
newSink, ok := sinkUriCheckerMap[scheme]
newSink, ok := sinkURICheckerMap[scheme]
if !ok {
newSink, ok = sinkIniterMap[scheme]
if !ok {
Expand Down

0 comments on commit dd54828

Please sign in to comment.