Skip to content

Commit

Permalink
sort lookup results; retry on timeout (#382)
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinWeindel authored Jul 30, 2024
1 parent 3771baf commit 41a27a3
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 6 deletions.
40 changes: 35 additions & 5 deletions pkg/dns/provider/lookupprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"fmt"
"net"
"sort"
"sync"
"time"

Expand All @@ -19,10 +20,24 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
)

const maxConcurrentLookupsPerJob = 4
type lookupHostConfig struct {
lookupHost func(string) ([]net.IP, error)
maxConcurrentLookupsPerJob int
maxLookupRetries int
waitLookupRetry time.Duration
}

func defaultLookupHostConfig() lookupHostConfig {
return lookupHostConfig{
lookupHost: net.LookupIP,
maxConcurrentLookupsPerJob: 4,
maxLookupRetries: 5,
waitLookupRetry: 500 * time.Millisecond,
}
}

// lookupHost allows to override the default lookup function for testing purposes
var lookupHost func(string) ([]net.IP, error) = net.LookupIP
var lookupHost lookupHostConfig = defaultLookupHostConfig()

type lookupJob struct {
objectName resources.ObjectName
Expand Down Expand Up @@ -292,9 +307,9 @@ type lookupAllResults struct {

func lookupAllHostnamesIPs(ctx context.Context, hostnames ...string) lookupAllResults {
start := time.Now()
results := make(chan lookupIPsResult, maxConcurrentLookupsPerJob)
results := make(chan lookupIPsResult, lookupHost.maxConcurrentLookupsPerJob)
go func() {
sem := make(chan struct{}, maxConcurrentLookupsPerJob)
sem := make(chan struct{}, lookupHost.maxConcurrentLookupsPerJob)
for _, hostname := range hostnames {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -334,6 +349,8 @@ func lookupAllHostnamesIPs(ctx context.Context, hostnames ...string) lookupAllRe
}
}
all.duration = time.Since(start)
sort.Strings(all.ipv4Addrs)
sort.Strings(all.ipv6Addrs)
return all
}

Expand All @@ -344,7 +361,20 @@ type lookupIPsResult struct {
}

func lookupIPs(hostname string) lookupIPsResult {
ips, err := lookupHost(hostname)
var (
ips []net.IP
err error
)
for i := 1; i <= lookupHost.maxLookupRetries; i++ {
ips, err = lookupHost.lookupHost(hostname)
if err == nil || i == lookupHost.maxLookupRetries {
break
}
if netErr, ok := err.(net.Error); !ok || !netErr.Timeout() {
break
}
time.Sleep(lookupHost.waitLookupRetry)
}
if err != nil {
return lookupIPsResult{err: fmt.Errorf("cannot lookup '%s': %s", hostname, err)}
}
Expand Down
39 changes: 38 additions & 1 deletion pkg/dns/provider/lookupprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ func (e *testEnqueuer) EnqueueKey(key resources.ClusterObjectKey) error {
return nil
}

type timeoutError struct{}

func (timeoutError) Error() string { return "timeout" }
func (timeoutError) Timeout() bool { return true }
func (timeoutError) Temporary() bool { return true }

type mockLookupHostResult struct {
ips []net.IP
err error
Expand All @@ -42,15 +48,25 @@ type mockLookupHost struct {
lock sync.Mutex
lookupCount map[string]int
stopped atomic.Bool
retryMap map[string]int
}

func (lh *mockLookupHost) LookupHost(hostname string) ([]net.IP, error) {
time.Sleep(lh.delay)
retry := false
lh.lock.Lock()
if !lh.stopped.Load() {
lh.lookupCount[hostname] += 1
}
if lh.retryMap != nil && lh.retryMap[hostname] > 0 {
retry = true
lh.retryMap[hostname]--
}
lh.lock.Unlock()
if retry {
time.Sleep(lh.delay)
return nil, timeoutError{}
}
result, ok := lh.lookupMap[hostname]
if !ok {
return nil, fmt.Errorf("host not found")
Expand Down Expand Up @@ -140,11 +156,24 @@ var _ = ginkgov2.Describe("Lookup processor", func() {
},
lookupCount: map[string]int{},
}
lookupHost = mlh.LookupHost
lookupHost.lookupHost = mlh.LookupHost
lookupHost.waitLookupRetry = 5 * time.Millisecond
ctx, ctxCancel = context.WithCancel(context.Background())
})

ginkgov2.It("lookupAllHostnamesIPs should return expected results", func() {
results1 := lookupAllHostnamesIPs(ctx, "host3a", "host3b", "host3c")
Expect(results1.ipv4Addrs).To(HaveLen(4))
Expect(results1.ipv6Addrs).To(HaveLen(1))
Expect(results1.allIPAddrs).To(HaveLen(5))
results2 := lookupAllHostnamesIPs(ctx, "host3a", "host3b", "host3c", "host3c-alias")
Expect(results2.ipv4Addrs).To(Equal(results1.ipv4Addrs))
Expect(results2.ipv6Addrs).To(Equal(results1.ipv6Addrs))
Expect(results2.allIPAddrs).To(Equal(results1.allIPAddrs))
})

ginkgov2.It("lookupAllHostnamesIPs should return expected results with retries", func() {
mlh.retryMap = map[string]int{"host3b": 3}
results1 := lookupAllHostnamesIPs(ctx, "host3a", "host3b", "host3c")
Expect(results1.ipv4Addrs).To(HaveLen(4))
Expect(results1.ipv6Addrs).To(HaveLen(1))
Expand All @@ -153,6 +182,14 @@ var _ = ginkgov2.Describe("Lookup processor", func() {
Expect(results2.allIPAddrs).To(Equal(results1.allIPAddrs))
})

ginkgov2.It("lookupAllHostnamesIPs should return reduced results after too many retries", func() {
mlh.retryMap = map[string]int{"host3b": lookupHost.maxLookupRetries + 1}
results1 := lookupAllHostnamesIPs(ctx, "host3a", "host3b", "host3c")
Expect(results1.ipv4Addrs).To(HaveLen(3))
Expect(results1.ipv6Addrs).To(HaveLen(1))
Expect(results1.allIPAddrs).To(HaveLen(4))
})

ginkgov2.It("performs multiple lookup jobs regularly", func() {
go processor.Run(ctx)
processor.Upsert(nameE1, lookupAllHostnamesIPs(ctx, "host1"), 1*time.Millisecond)
Expand Down

0 comments on commit 41a27a3

Please sign in to comment.