Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Fix inconsistency in response status when calling a non-owner peer.
Browse files Browse the repository at this point in the history
  • Loading branch information
Baliedge committed Mar 8, 2024
1 parent fd28875 commit 432c847
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 75 deletions.
215 changes: 153 additions & 62 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
guber "github.com/mailgun/gubernator/v2"
"github.com/mailgun/gubernator/v2/cluster"
"github.com/mailgun/holster/v4/clock"
"github.com/mailgun/holster/v4/syncutil"
"github.com/mailgun/holster/v4/testutil"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -968,69 +969,70 @@ func TestGlobalRateLimitsWithLoadBalancing(t *testing.T) {
key := randomKey()

// Determine owner and non-owner peers.
ownerPeerInfo, err := cluster.FindOwningPeer(name, key)
owner, err := cluster.FindOwningDaemon(name, key)
require.NoError(t, err)
ownerDaemon, err := cluster.FindOwningDaemon(name, key)
// ownerAddr := owner.ownerPeerInfo.GRPCAddress
peers, err := cluster.ListNonOwningDaemons(name, key)
require.NoError(t, err)
owner := ownerPeerInfo.GRPCAddress
nonOwner := cluster.PeerAt(0).GRPCAddress
if nonOwner == owner {
nonOwner = cluster.PeerAt(1).GRPCAddress
}
require.NotEqual(t, owner, nonOwner)
nonOwner := peers[0]

// Connect to owner and non-owner peers in round robin.
dialOpts := []grpc.DialOption{
grpc.WithResolvers(guber.NewStaticBuilder()),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}
address := fmt.Sprintf("static:///%s,%s", owner, nonOwner)
address := fmt.Sprintf("static:///%s,%s", owner.PeerInfo.GRPCAddress, nonOwner.PeerInfo.GRPCAddress)
conn, err := grpc.DialContext(ctx, address, dialOpts...)
require.NoError(t, err)
client := guber.NewV1Client(conn)

sendHit := func(status guber.Status, i int) {
ctx, cancel := context.WithTimeout(ctx, 10*clock.Second)
sendHit := func(client guber.V1Client, status guber.Status, i int) {
ctx, cancel := context.WithTimeout(context.Background(), 10*clock.Second)
defer cancel()
resp, err := client.GetRateLimits(ctx, &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: name,
UniqueKey: key,
Algorithm: guber.Algorithm_LEAKY_BUCKET,
Algorithm: guber.Algorithm_TOKEN_BUCKET,
Behavior: guber.Behavior_GLOBAL,
Duration: guber.Minute * 5,
Duration: 5 * guber.Minute,
Hits: 1,
Limit: 2,
},
},
})
require.NoError(t, err, i)
item := resp.Responses[0]
assert.Equal(t, "", item.GetError(), fmt.Sprintf("mismatch error, iteration %d", i))
assert.Equal(t, status, item.GetStatus(), fmt.Sprintf("mismatch status, iteration %d", i))
assert.Equal(t, "", item.Error, fmt.Sprintf("unexpected error, iteration %d", i))
assert.Equal(t, status, item.Status, fmt.Sprintf("mismatch status, iteration %d", i))
}

require.NoError(t, waitForBroadcast(1*clock.Minute, owner, 0))
require.NoError(t, waitForBroadcast(1*clock.Minute, nonOwner, 0))

// Send two hits that should be processed by the owner and non-owner and
// deplete the limit consistently.
sendHit(guber.Status_UNDER_LIMIT, 1)
sendHit(guber.Status_UNDER_LIMIT, 2)
require.NoError(t, waitForBroadcast(clock.Second*3, ownerDaemon, 1))
sendHit(client, guber.Status_UNDER_LIMIT, 1)
sendHit(client, guber.Status_UNDER_LIMIT, 2)
require.NoError(t, waitForBroadcast(3*clock.Second, owner, 1))

// All successive hits should return OVER_LIMIT.
for i := 2; i <= 10; i++ {
sendHit(guber.Status_OVER_LIMIT, i)
sendHit(client, guber.Status_OVER_LIMIT, i)
}
}

func TestGlobalRateLimitsPeerOverLimit(t *testing.T) {
name := t.Name()
key := randomKey()
owner, err := cluster.FindOwningDaemon(name, key)
require.NoError(t, err)
peers, err := cluster.ListNonOwningDaemons(name, key)
require.NoError(t, err)

sendHit := func(expectedStatus guber.Status, hits int64) {
sendHit := func(expectedStatus guber.Status, hits, expectedRemaining int64) {
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
defer cancel()
resp, err := peers[0].MustClient().GetRateLimits(ctx, &guber.GetRateLimitsReq{
Expand All @@ -1047,31 +1049,40 @@ func TestGlobalRateLimitsPeerOverLimit(t *testing.T) {
},
})
assert.NoError(t, err)
assert.Equal(t, "", resp.Responses[0].GetError())
assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus())
item := resp.Responses[0]
assert.Equal(t, "", item.Error, "unexpected error")
assert.Equal(t, expectedStatus, item.Status, "mismatch status")
assert.Equal(t, expectedRemaining, item.Remaining, "mismatch remaining")
}
owner, err := cluster.FindOwningDaemon(name, key)
require.NoError(t, err)

require.NoError(t, waitForIdle(1*clock.Minute, cluster.GetDaemons()...))

// Send two hits that should be processed by the owner and the broadcast to peer, depleting the remaining
sendHit(guber.Status_UNDER_LIMIT, 1)
sendHit(guber.Status_UNDER_LIMIT, 1)
sendHit(guber.Status_UNDER_LIMIT, 1, 1)
sendHit(guber.Status_UNDER_LIMIT, 1, 0)

// Wait for the broadcast from the owner to the peer
require.NoError(t, waitForBroadcast(clock.Second*3, owner, 1))
require.NoError(t, waitForBroadcast(3*clock.Second, owner, 1))

// Since the remainder is 0, the peer should set OVER_LIMIT instead of waiting for the owner
// to respond with OVER_LIMIT.
sendHit(guber.Status_OVER_LIMIT, 1)
sendHit(guber.Status_OVER_LIMIT, 1, 0)

// Wait for the broadcast from the owner to the peer
require.NoError(t, waitForBroadcast(clock.Second*3, owner, 2))
require.NoError(t, waitForBroadcast(3*clock.Second, owner, 2))

// The status should still be OVER_LIMIT
sendHit(guber.Status_OVER_LIMIT, 0)
sendHit(guber.Status_UNDER_LIMIT, 0, 0)
sendHit(guber.Status_OVER_LIMIT, 1, 0)
}

func TestGlobalRateLimitsPeerOverLimitLeaky(t *testing.T) {
name := t.Name()
key := randomKey()
peers, err := cluster.ListNonOwningDaemons(name, key)
require.NoError(t, err)
owner, err := cluster.FindOwningDaemon(name, key)
require.NoError(t, err)

sendHit := func(client guber.V1Client, expectedStatus guber.Status, hits int64) {
ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
Expand All @@ -1093,27 +1104,20 @@ func TestGlobalRateLimitsPeerOverLimitLeaky(t *testing.T) {
assert.Equal(t, "", resp.Responses[0].GetError())
assert.Equal(t, expectedStatus, resp.Responses[0].GetStatus())
}
owner, err := cluster.FindOwningDaemon(name, key)
require.NoError(t, err)

require.NoError(t, waitForIdle(1*clock.Minute, cluster.GetDaemons()...))

// Send two hits that should be processed by the owner and the broadcast to peer, depleting the remaining
sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 1)
sendHit(peers[0].MustClient(), guber.Status_UNDER_LIMIT, 1)

// Wait for the broadcast from the owner to the peers
require.NoError(t, waitForBroadcast(clock.Second*3, owner, 1))

// Ask a different peer if the status is over the limit
sendHit(peers[1].MustClient(), guber.Status_OVER_LIMIT, 1)
}

// getMetricRequest fetches the Prometheus metrics exposition from url and
// returns the sample whose name matches name, delegating parsing to getMetric.
// Returns an error if the HTTP request fails; a missing metric is reported by
// getMetric's result, not by an HTTP-level error.
func getMetricRequest(url string, name string) (*model.Sample, error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return getMetric(resp.Body, name)
}

func TestChangeLimit(t *testing.T) {
client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
require.Nil(t, errs)
Expand Down Expand Up @@ -1530,6 +1534,8 @@ func TestGlobalBehavior(t *testing.T) {
require.NoError(t, err)
t.Logf("Owner peer: %s", owner.InstanceID)

require.NoError(t, waitForIdle(1*time.Minute, cluster.GetDaemons()...))

broadcastCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_broadcast_duration_count")
updateCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_global_send_duration_count")
upgCounters := getPeerCounters(t, cluster.GetDaemons(), "gubernator_grpc_request_duration_count{method=\"/pb.gubernator.PeersV1/UpdatePeerGlobals\"}")
Expand Down Expand Up @@ -1896,6 +1902,56 @@ func TestGlobalBehavior(t *testing.T) {
})
}

// getMetrics scrapes the Prometheus metrics endpoint of the daemon listening
// on HTTPAddr and returns the decoded samples keyed by metric identity.
// When names are given, only samples whose rendered metric identity exactly
// matches one of them are kept; with no names, every sample is returned.
// NOTE(review): keys come from Metric.String(), which includes labels for
// labelled series — callers filtering by bare name only match unlabelled
// metrics; confirm that is intended.
func getMetrics(HTTPAddr string, names ...string) (map[string]*model.Sample, error) {
	resp, err := http.Get(fmt.Sprintf("http://%s/metrics", HTTPAddr))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Build the optional filter set; empty means "keep everything".
	wanted := make(map[string]struct{}, len(names))
	for _, n := range names {
		wanted[n] = struct{}{}
	}

	decoder := expfmt.SampleDecoder{
		Dec: expfmt.NewDecoder(resp.Body, expfmt.FmtText),
		Opts: &expfmt.DecodeOptions{
			Timestamp: model.Now(),
		},
	}

	out := make(map[string]*model.Sample)
	for {
		var batch model.Vector
		if err := decoder.Decode(&batch); err != nil {
			if err == io.EOF {
				return out, nil
			}
			return nil, err
		}
		for _, sample := range batch {
			key := sample.Metric.String()
			if len(wanted) == 0 {
				out[key] = sample
				continue
			}
			if _, ok := wanted[key]; ok {
				out[key] = sample
			}
		}
	}
}

// getMetricRequest performs an HTTP GET against url and scans the returned
// metrics exposition for the sample named name via getMetric.
func getMetricRequest(url string, name string) (*model.Sample, error) {
	res, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()
	return getMetric(res.Body, name)
}

func getMetric(in io.Reader, name string) (*model.Sample, error) {
dec := expfmt.SampleDecoder{
Dec: expfmt.NewDecoder(in, expfmt.FmtText),
Expand Down Expand Up @@ -1926,67 +1982,102 @@ func getMetric(in io.Reader, name string) (*model.Sample, error) {
}

// waitForBroadcast waits until the broadcast count for the daemon changes to
// the expected value. Returns an error if the expected value is not found
// before the context is cancelled.
// at least the expected value and the broadcast queue is empty.
// Returns an error if timeout waiting for conditions to be met.
func waitForBroadcast(timeout clock.Duration, d *guber.Daemon, expect int) error {
// fmt.Printf("waitForBroadcast() peer: %s\n", d.InstanceID)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

for {
m, err := getMetricRequest(fmt.Sprintf("http://%s/metrics", d.Config().HTTPListenAddress),
"gubernator_broadcast_duration_count")
metrics, err := getMetrics(d.Config().HTTPListenAddress,
"gubernator_broadcast_duration_count", "gubernator_global_queue_length")
if err != nil {
return err
}
// fmt.Printf("gubernator_broadcast_duration_count: %f\n", m.Value)
gbdc := metrics["gubernator_broadcast_duration_count"]
ggql := metrics["gubernator_global_queue_length"]

// It's possible a broadcast occurred twice if waiting for multiple peer to
// forward updates to the owner.
if int(m.Value) >= expect {
// Give the nodes some time to process the broadcasts
// clock.Sleep(clock.Millisecond * 500)
// It's possible a broadcast occurred twice if waiting for multiple
// peers to forward updates to non-owners.
if int(gbdc.Value) >= expect && ggql.Value == 0 {
return nil
}

select {
case <-clock.After(time.Millisecond * 100):
case <-clock.After(100 * clock.Millisecond):
case <-ctx.Done():
return ctx.Err()
}
}
}

// waitForUpdate waits until the global hits update count for the daemon
// changes to the expected value. Returns an error if the expected value is not
// found before the context is cancelled.
// changes to at least the expected value and the global update queue is empty.
// Returns an error if timeout waiting for conditions to be met.
func waitForUpdate(timeout clock.Duration, d *guber.Daemon, expect int) error {
// fmt.Printf("waitForUpdate() peer: %s\n", d.InstanceID)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

for {
m, err := getMetricRequest(fmt.Sprintf("http://%s/metrics", d.Config().HTTPListenAddress),
"gubernator_global_send_duration_count")
metrics, err := getMetrics(d.Config().HTTPListenAddress,
"gubernator_global_send_duration_count", "gubernator_global_send_queue_length")
if err != nil {
return err
}
// fmt.Printf("gubernator_global_send_duration_count: %f\n", m.Value)
gsdc := metrics["gubernator_global_send_duration_count"]
gsql := metrics["gubernator_global_send_queue_length"]

// It's possible a broadcast occurred twice if waiting for multiple peer to
// It's possible a hit occurred twice if waiting for multiple peers to
// forward updates to the owner.
if int(m.Value) >= expect {
if int(gsdc.Value) >= expect && gsql.Value == 0 {
return nil
}

select {
case <-clock.After(time.Millisecond * 100):
case <-clock.After(100 * clock.Millisecond):
case <-ctx.Done():
return ctx.Err()
}
}
}

// waitForIdle waits until both the global broadcast queue and the global send
// queue of every given daemon are empty. Daemons are polled concurrently.
// Returns the first error encountered (metrics fetch failure or timeout);
// any additional worker errors are dropped.
func waitForIdle(timeout clock.Duration, daemons ...*guber.Daemon) error {
	var wg syncutil.WaitGroup
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	for _, d := range daemons {
		wg.Run(func(raw any) error {
			d := raw.(*guber.Daemon)
			for {
				metrics, err := getMetrics(d.Config().HTTPListenAddress,
					"gubernator_global_queue_length", "gubernator_global_send_queue_length")
				if err != nil {
					return err
				}
				ggql := metrics["gubernator_global_queue_length"]
				gsql := metrics["gubernator_global_send_queue_length"]

				// Guard against a metric missing from the scrape (nil
				// sample): keep polling rather than dereferencing nil.
				if ggql != nil && gsql != nil && ggql.Value == 0 && gsql.Value == 0 {
					return nil
				}

				select {
				case <-clock.After(100 * clock.Millisecond):
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		}, d)
	}
	// Surface only the first failure; remaining errors are discarded.
	if errs := wg.Wait(); len(errs) > 0 {
		return errs[0]
	}
	return nil
}

func getMetricValue(t *testing.T, d *guber.Daemon, name string) float64 {
m, err := getMetricRequest(fmt.Sprintf("http://%s/metrics", d.Config().HTTPListenAddress),
name)
Expand Down
Loading

0 comments on commit 432c847

Please sign in to comment.