Skip to content

Commit

Permalink
Update tests to use embedded NATS
Browse files Browse the repository at this point in the history
  • Loading branch information
tylertreat committed Dec 29, 2020
1 parent 1147822 commit f5403db
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 331 deletions.
25 changes: 0 additions & 25 deletions server/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,13 @@ import (

lift "github.com/liftbridge-io/go-liftbridge/v2"
liftApi "github.com/liftbridge-io/liftbridge-api/go"
natsdTest "github.com/nats-io/nats-server/v2/test"
"github.com/stretchr/testify/require"
)

// Ensure activity stream creation event occurs.
func TestActivityStreamCreateStream(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1Config.ActivityStream.Enabled = true
Expand Down Expand Up @@ -64,10 +59,6 @@ func TestActivityStreamCreateStream(t *testing.T) {
func TestActivityStreamDeleteStream(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1Config.ActivityStream.Enabled = true
Expand Down Expand Up @@ -117,10 +108,6 @@ func TestActivityStreamDeleteStream(t *testing.T) {
func TestActivityStreamPauseStream(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1Config.ActivityStream.Enabled = true
Expand Down Expand Up @@ -172,10 +159,6 @@ func TestActivityStreamPauseStream(t *testing.T) {
func TestActivityStreamResumeStream(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1Config.ActivityStream.Enabled = true
Expand Down Expand Up @@ -230,10 +213,6 @@ func TestActivityStreamResumeStream(t *testing.T) {
func TestActivityStreamSetStreamReadonly(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1Config.ActivityStream.Enabled = true
Expand Down Expand Up @@ -285,10 +264,6 @@ func TestActivityStreamSetStreamReadonly(t *testing.T) {
func TestActivityStreamSetStreamReadwrite(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1Config.ActivityStream.Enabled = true
Expand Down
83 changes: 4 additions & 79 deletions server/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ import (
"time"

lift "github.com/liftbridge-io/go-liftbridge/v2"
proto "github.com/liftbridge-io/liftbridge-api/go"
natsdTest "github.com/nats-io/nats-server/v2/test"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

proto "github.com/liftbridge-io/liftbridge-api/go"

"github.com/liftbridge-io/liftbridge/server/protocol"
)

Expand All @@ -39,10 +38,6 @@ func assertMsg(t *testing.T, expected *message, msg *lift.Message) {
func TestCreateStream(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1 := runServerWithConfig(t, s1Config)
Expand All @@ -67,10 +62,6 @@ func TestCreateStream(t *testing.T) {
func TestCreateStreamPropagate(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server.
s1Config := getTestConfig("a", true, 0)
s1 := runServerWithConfig(t, s1Config)
Expand Down Expand Up @@ -100,12 +91,13 @@ func TestCreateStreamPropagate(t *testing.T) {
func TestCreateStreamNoMetadataLeader(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
// Use an external NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server.
s1Config := getTestConfig("a", true, 0)
s1Config.EmbeddedNATS = false
s1 := runServerWithConfig(t, s1Config)
defer s1.Stop()

Expand Down Expand Up @@ -137,10 +129,6 @@ func TestCreateStreamNoMetadataLeader(t *testing.T) {
func TestCreateStreamInsufficientReplicas(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1 := runServerWithConfig(t, s1Config)
Expand All @@ -162,10 +150,6 @@ func TestCreateStreamInsufficientReplicas(t *testing.T) {
func TestCreateStreamPartitioned(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1 := runServerWithConfig(t, s1Config)
Expand All @@ -189,10 +173,6 @@ func TestCreateStreamPartitioned(t *testing.T) {
func TestSubscribeStreamNoSuchStream(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1 := runServerWithConfig(t, s1Config)
Expand All @@ -217,10 +197,6 @@ func TestSubscribeStreamNoSuchStream(t *testing.T) {
func TestDeleteStream(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1 := runServerWithConfig(t, s1Config)
Expand Down Expand Up @@ -262,10 +238,6 @@ func TestDeleteStream(t *testing.T) {
func TestDeleteStreamPropagate(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server.
s1Config := getTestConfig("a", true, 0)
s1 := runServerWithConfig(t, s1Config)
Expand Down Expand Up @@ -295,10 +267,6 @@ func TestDeleteStreamPropagate(t *testing.T) {
func TestSubscribeStreamNotLeader(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server.
s1Config := getTestConfig("a", true, 5050)
s1 := runServerWithConfig(t, s1Config)
Expand Down Expand Up @@ -354,10 +322,6 @@ func TestSubscribeStreamNotLeader(t *testing.T) {
func TestSubscribeStreamNotLeaderDefaultBehavior(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server.
s1Config := getTestConfig("a", true, 5050)
s1 := runServerWithConfig(t, s1Config)
Expand Down Expand Up @@ -414,14 +378,11 @@ func TestSubscribeStreamNotLeaderDefaultBehavior(t *testing.T) {
func TestStreamReceiveMsgFromReplica(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1 := runServerWithConfig(t, s1Config)
defer s1.Stop()

// Configure second server.
s2Config := getTestConfig("b", false, 5051)
s2 := runServerWithConfig(t, s2Config)
Expand Down Expand Up @@ -543,10 +504,6 @@ func TestStreamReceiveMsgFromReplica(t *testing.T) {
func TestStreamPublishSubscribe(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1Config.Clustering.ReplicationMaxBytes = 1024
Expand Down Expand Up @@ -661,10 +618,6 @@ func TestStreamPublishSubscribe(t *testing.T) {
func TestLegacyPublish(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1Config.Clustering.ReplicationMaxBytes = 1024
Expand Down Expand Up @@ -734,10 +687,6 @@ func TestLegacyPublish(t *testing.T) {
func TestPublishToSubject(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1 := runServerWithConfig(t, s1Config)
Expand Down Expand Up @@ -873,10 +822,6 @@ func TestSubscribePartitionClosed(t *testing.T) {
func TestSubscribeStopPosition(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1 := runServerWithConfig(t, s1Config)
Expand Down Expand Up @@ -1005,10 +950,6 @@ func TestGetStreamConfig(t *testing.T) {
func TestSetFetchCursor(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1Config.CursorsStream.Partitions = 5
Expand Down Expand Up @@ -1063,10 +1004,6 @@ func TestSetFetchCursor(t *testing.T) {
func TestSetFetchCursorNoCache(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1Config.CursorsStream.Partitions = 5
Expand Down Expand Up @@ -1144,10 +1081,6 @@ func publishAndReceive(t *testing.T, client lift.Client, stream string) {
func TestFetchPartitionMetadataMessagesReceivedTimestamps(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1 := runServerWithConfig(t, s1Config)
Expand Down Expand Up @@ -1200,10 +1133,6 @@ func TestFetchPartitionMetadataMessagesReceivedTimestamps(t *testing.T) {
func TestFetchPartitionMetadataPauseTimestamps(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1 := runServerWithConfig(t, s1Config)
Expand Down Expand Up @@ -1256,10 +1185,6 @@ func TestFetchPartitionMetadataPauseTimestamps(t *testing.T) {
func TestFetchPartitionMetadataReadonlyTimestamps(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure server.
s1Config := getTestConfig("a", true, 5050)
s1 := runServerWithConfig(t, s1Config)
Expand Down
1 change: 1 addition & 0 deletions server/configs/tls.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ tls:
cert: ./configs/certs/server.crt
logging.level: error
clustering.raft.bootstrap.seed: true
nats.embedded: true
5 changes: 0 additions & 5 deletions server/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,13 @@ import (
"time"

lift "github.com/liftbridge-io/go-liftbridge/v2"
natsdTest "github.com/nats-io/nats-server/v2/test"
"github.com/stretchr/testify/require"
)

// Ensure Raft FSM properly snapshots and restores state.
func TestFSMSnapshotRestore(t *testing.T) {
defer cleanupStorage(t)

// Use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure the server as a seed.
s1Config := getTestConfig("a", true, 5050)
s1 := runServerWithConfig(t, s1Config)
Expand Down
Loading

0 comments on commit f5403db

Please sign in to comment.