From 11478227e3bdf93d90c2c627a1239edbfe273d77 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Tue, 29 Dec 2020 16:21:27 -0700 Subject: [PATCH 1/2] Update dev-standalone Docker image Use embedded NATS server in standalone image. --- docker/dev-standalone/Dockerfile | 6 +----- docker/dev-standalone/README.md | 29 +++++++++++++++----------- docker/dev-standalone/liftbridge.yaml | 3 +++ docker/dev-standalone/script_runner.sh | 14 ------------- documentation/quick_start.md | 6 +++--- 5 files changed, 24 insertions(+), 34 deletions(-) delete mode 100644 docker/dev-standalone/script_runner.sh diff --git a/docker/dev-standalone/Dockerfile b/docker/dev-standalone/Dockerfile index e03bbcee..0dbca553 100644 --- a/docker/dev-standalone/Dockerfile +++ b/docker/dev-standalone/Dockerfile @@ -2,11 +2,9 @@ FROM golang:1.14-alpine as build-base ENV GO111MODULE on RUN go get github.com/liftbridge-io/liftbridge@master -RUN go get github.com/nats-io/nats-server/v2 FROM alpine:latest COPY --from=build-base /go/bin/liftbridge /usr/local/bin/liftbridge -COPY --from=build-base /go/bin/nats-server /usr/local/bin/nats-server EXPOSE 9292 4222 8222 6222 @@ -14,7 +12,5 @@ VOLUME "/tmp/liftbridge/liftbridge-default" COPY docker/dev-standalone/nats-server.conf nats-server.conf COPY docker/dev-standalone/liftbridge.yaml liftbridge.yaml -COPY docker/dev-standalone/script_runner.sh script_runner.sh -RUN chmod +x script_runner.sh -CMD ./script_runner.sh +ENTRYPOINT ["liftbridge", "-c", "liftbridge.yaml"] diff --git a/docker/dev-standalone/README.md b/docker/dev-standalone/README.md index 4d784e50..6e0d646b 100644 --- a/docker/dev-standalone/README.md +++ b/docker/dev-standalone/README.md @@ -25,21 +25,26 @@ We can check the logs to see if the container booted properly: ``` $ docker logs liftbridge-main -[6] 2019/08/21 11:26:09.366043 [INF] Starting nats-server version 2.0.4 -[6] 2019/08/21 11:26:09.366094 [INF] Git commit [not set] -[6] 2019/08/21 11:26:09.366274 [INF] Starting http monitor on 0.0.0.0:8222 -[6] 2019/08/21 11:26:09.366333 [INF] Listening for client connections on 0.0.0.0:4222 -[6] 2019/08/21 11:26:09.366354 [INF] Server id is NAI5VLHK3IWC5ENOS3HARMNGZTWNRLXQCYPHTKNXXCGW5EJWR4JXCNVZ -[6] 2019/08/21 11:26:09.366358 [INF] Server is ready -[6] 2019/08/21 11:26:09.366761 [INF] Listening for route connections on 0.0.0.0:6222 -time="2019-08-21 11:26:14" level=info msg="Server ID: MSQaSobS9afF1aN4E8oTIJ" -time="2019-08-21 11:26:14" level=info msg="Namespace: liftbridge-default" -time="2019-08-21 11:26:14" level=info msg="Retention Policy: [Age: 1 week, Compact: false]" -time="2019-08-21 11:26:14" level=info msg="Starting server on :9292..." -time="2019-08-21 11:26:15" level=info msg="Server became metadata leader, performing leader promotion actions" +time="2020-12-29 21:30:09" level=info msg="Liftbridge Version: v1.5.0" +time="2020-12-29 21:30:09" level=info msg="Server ID: 3qPpmKQXnP0J6xDOsIhsVb" +time="2020-12-29 21:30:09" level=info msg="Namespace: liftbridge-default" +time="2020-12-29 21:30:09" level=info msg="NATS Servers: [nats://127.0.0.1:4222]" +time="2020-12-29 21:30:09" level=info msg="Default Retention Policy: [Age: 1 week, Compact: false]" +time="2020-12-29 21:30:09" level=info msg="Default Partition Pausing: disabled" +time="2020-12-29 21:30:09" level=info msg="Starting embedded NATS server on 0.0.0.0:4222" +time="2020-12-29 21:30:09" level=info msg="nats: Starting nats-server version 2.1.9" +time="2020-12-29 21:30:09" level=info msg="nats: Git commit [not set]" +time="2020-12-29 21:30:09" level=info msg="nats: Starting http monitor on 0.0.0.0:8222" +time="2020-12-29 21:30:09" level=info msg="nats: Listening for client connections on 0.0.0.0:4222" +time="2020-12-29 21:30:09" level=info msg="nats: Server id is NDFWAP5HYPXXI52CKFACDHLEV2V3U4SBFDUKDPOHQ2LNIXYE2SUASBH6" +time="2020-12-29 21:30:09" level=info msg="nats: Server is ready" +time="2020-12-29 21:30:09" level=info msg="nats: Listening for route connections on 0.0.0.0:6222" +time="2020-12-29 21:30:09" level=info msg="Starting Liftbridge server on 0.0.0.0:9292..." +time="2020-12-29 21:30:10" level=info msg="Server became metadata leader, performing leader promotion actions" ``` If you want to advertise a docker host that is not localhost: + ``` docker run -d --add-host registry:0.0.0.0 --name=liftbridge-main -p 4222:4222 -p 9292:9292 -p 8222:8222 -p 6222:6222 -eLIFTBRIDGE_HOST=registry liftbridge/standalone-dev ``` diff --git a/docker/dev-standalone/liftbridge.yaml b/docker/dev-standalone/liftbridge.yaml index ea43a13f..3133addd 100644 --- a/docker/dev-standalone/liftbridge.yaml +++ b/docker/dev-standalone/liftbridge.yaml @@ -1,2 +1,5 @@ --- host: 0.0.0.0 +clustering.raft.bootstrap.seed: true +nats.embedded.config: nats-server.conf +logging.nats: true diff --git a/docker/dev-standalone/script_runner.sh b/docker/dev-standalone/script_runner.sh deleted file mode 100644 index 909b81c9..00000000 --- a/docker/dev-standalone/script_runner.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/ash - -# turn on ash's job control -set -m - -# Start the nats-server and put it in the background -nats-server -c nats-server.conf & - -while ! nc -z localhost 8222; do sleep 1 ; echo 'waiting nats server' ; done - -# Start liftbridge -liftbridge --raft-bootstrap-seed -c liftbridge.yaml - -fg %1 diff --git a/documentation/quick_start.md b/documentation/quick_start.md index 9b949b96..90eaa063 100644 --- a/documentation/quick_start.md +++ b/documentation/quick_start.md @@ -83,9 +83,9 @@ $ liftbridge --raft-bootstrap-peers server-2,server-3 Instead of running a binary, you can run Liftbridge using a container. There is a [container image](https://hub.docker.com/r/liftbridge/standalone-dev) -available which runs an instance of Liftbridge and NATS inside a single Docker -container. This is meant for development and testing purposes. Use the -following Docker commands to run this container: +available which runs an instance of Liftbridge and an embedded NATS server +inside a single Docker container. This is meant for development and testing +purposes. Use the following Docker commands to run this container: ```shell $ docker pull liftbridge/standalone-dev From f5403dbd67adbeffeb3b1aaa019d2b5a8f5697c6 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Tue, 29 Dec 2020 16:22:15 -0700 Subject: [PATCH 2/2] Update tests to use embedded NATS --- server/activity_test.go | 25 ------- server/api_test.go | 83 ++--------------------- server/configs/tls.yaml | 1 + server/fsm_test.go | 5 -- server/partition_test.go | 114 ++++++++++++-------------------- server/replicator_test.go | 46 +++++++------ server/server.go | 10 ++- server/server_test.go | 134 +++----------------------------------- 8 files changed, 87 insertions(+), 331 deletions(-) diff --git a/server/activity_test.go b/server/activity_test.go index c8742e5f..ef39ef31 100644 --- a/server/activity_test.go +++ b/server/activity_test.go @@ -7,7 +7,6 @@ import ( lift "github.com/liftbridge-io/go-liftbridge/v2" liftApi "github.com/liftbridge-io/liftbridge-api/go" - natsdTest "github.com/nats-io/nats-server/v2/test" "github.com/stretchr/testify/require" ) @@ -15,10 +14,6 @@ import ( func TestActivityStreamCreateStream(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1Config.ActivityStream.Enabled = true @@ -64,10 +59,6 @@ func TestActivityStreamCreateStream(t *testing.T) { func TestActivityStreamDeleteStream(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1Config.ActivityStream.Enabled = true @@ -117,10 +108,6 @@ func TestActivityStreamDeleteStream(t *testing.T) { func TestActivityStreamPauseStream(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1Config.ActivityStream.Enabled = true @@ -172,10 +159,6 @@ func TestActivityStreamPauseStream(t *testing.T) { func TestActivityStreamResumeStream(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1Config.ActivityStream.Enabled = true @@ -230,10 +213,6 @@ func TestActivityStreamResumeStream(t *testing.T) { func TestActivityStreamSetStreamReadonly(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1Config.ActivityStream.Enabled = true @@ -285,10 +264,6 @@ func TestActivityStreamSetStreamReadonly(t *testing.T) { func TestActivityStreamSetStreamReadwrite(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1Config.ActivityStream.Enabled = true diff --git a/server/api_test.go b/server/api_test.go index a0f39537..ff6dbcd6 100644 --- a/server/api_test.go +++ b/server/api_test.go @@ -10,6 +10,7 @@ import ( "time" lift "github.com/liftbridge-io/go-liftbridge/v2" + proto "github.com/liftbridge-io/liftbridge-api/go" natsdTest "github.com/nats-io/nats-server/v2/test" "github.com/nats-io/nats.go" "github.com/stretchr/testify/require" @@ -17,8 +18,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - proto "github.com/liftbridge-io/liftbridge-api/go" - "github.com/liftbridge-io/liftbridge/server/protocol" ) @@ -39,10 +38,6 @@ func assertMsg(t *testing.T, expected *message, msg *lift.Message) { func TestCreateStream(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -67,10 +62,6 @@ func TestCreateStream(t *testing.T) { func TestCreateStreamPropagate(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure first server. s1Config := getTestConfig("a", true, 0) s1 := runServerWithConfig(t, s1Config) @@ -100,12 +91,13 @@ func TestCreateStreamPropagate(t *testing.T) { func TestCreateStreamNoMetadataLeader(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. + // Use an external NATS server. ns := natsdTest.RunDefaultServer() defer ns.Shutdown() // Configure first server. s1Config := getTestConfig("a", true, 0) + s1Config.EmbeddedNATS = false s1 := runServerWithConfig(t, s1Config) defer s1.Stop() @@ -137,10 +129,6 @@ func TestCreateStreamNoMetadataLeader(t *testing.T) { func TestCreateStreamInsufficientReplicas(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -162,10 +150,6 @@ func TestCreateStreamInsufficientReplicas(t *testing.T) { func TestCreateStreamPartitioned(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -189,10 +173,6 @@ func TestCreateStreamPartitioned(t *testing.T) { func TestSubscribeStreamNoSuchStream(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -217,10 +197,6 @@ func TestSubscribeStreamNoSuchStream(t *testing.T) { func TestDeleteStream(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -262,10 +238,6 @@ func TestDeleteStream(t *testing.T) { func TestDeleteStreamPropagate(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure first server. s1Config := getTestConfig("a", true, 0) s1 := runServerWithConfig(t, s1Config) @@ -295,10 +267,6 @@ func TestDeleteStreamPropagate(t *testing.T) { func TestSubscribeStreamNotLeader(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure first server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -354,10 +322,6 @@ func TestSubscribeStreamNotLeader(t *testing.T) { func TestSubscribeStreamNotLeaderDefaultBehavior(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure first server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -414,14 +378,11 @@ func TestSubscribeStreamNotLeaderDefaultBehavior(t *testing.T) { func TestStreamReceiveMsgFromReplica(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) defer s1.Stop() + // Configure second server. s2Config := getTestConfig("b", false, 5051) s2 := runServerWithConfig(t, s2Config) @@ -543,10 +504,6 @@ func TestStreamReceiveMsgFromReplica(t *testing.T) { func TestStreamPublishSubscribe(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1Config.Clustering.ReplicationMaxBytes = 1024 @@ -661,10 +618,6 @@ func TestStreamPublishSubscribe(t *testing.T) { func TestLegacyPublish(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1Config.Clustering.ReplicationMaxBytes = 1024 @@ -734,10 +687,6 @@ func TestLegacyPublish(t *testing.T) { func TestPublishToSubject(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -873,10 +822,6 @@ func TestSubscribePartitionClosed(t *testing.T) { func TestSubscribeStopPosition(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -1005,10 +950,6 @@ func TestGetStreamConfig(t *testing.T) { func TestSetFetchCursor(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1Config.CursorsStream.Partitions = 5 @@ -1063,10 +1004,6 @@ func TestSetFetchCursor(t *testing.T) { func TestSetFetchCursorNoCache(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1Config.CursorsStream.Partitions = 5 @@ -1144,10 +1081,6 @@ func publishAndReceive(t *testing.T, client lift.Client, stream string) { func TestFetchPartitionMetadataMessagesReceivedTimestamps(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -1200,10 +1133,6 @@ func TestFetchPartitionMetadataMessagesReceivedTimestamps(t *testing.T) { func TestFetchPartitionMetadataPauseTimestamps(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -1256,10 +1185,6 @@ func TestFetchPartitionMetadataPauseTimestamps(t *testing.T) { func TestFetchPartitionMetadataReadonlyTimestamps(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) diff --git a/server/configs/tls.yaml b/server/configs/tls.yaml index 39fd867e..f7fd54fc 100644 --- a/server/configs/tls.yaml +++ b/server/configs/tls.yaml @@ -4,3 +4,4 @@ tls: cert: ./configs/certs/server.crt logging.level: error clustering.raft.bootstrap.seed: true +nats.embedded: true diff --git a/server/fsm_test.go b/server/fsm_test.go index eb78a28a..fa6cbe36 100644 --- a/server/fsm_test.go +++ b/server/fsm_test.go @@ -6,7 +6,6 @@ import ( "time" lift "github.com/liftbridge-io/go-liftbridge/v2" - natsdTest "github.com/nats-io/nats-server/v2/test" "github.com/stretchr/testify/require" ) @@ -14,10 +13,6 @@ import ( func TestFSMSnapshotRestore(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure the server as a seed. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) diff --git a/server/partition_test.go b/server/partition_test.go index 7037e49d..bc25c11a 100644 --- a/server/partition_test.go +++ b/server/partition_test.go @@ -7,7 +7,6 @@ import ( "github.com/Workiva/go-datastructures/queue" lift "github.com/liftbridge-io/go-liftbridge/v2" - natsdTest "github.com/nats-io/nats-server/v2/test" "github.com/nats-io/nats.go" "github.com/stretchr/testify/require" @@ -15,9 +14,8 @@ import ( proto "github.com/liftbridge-io/liftbridge/server/protocol" ) -func createServer(leader bool) *Server { - config := getTestConfig("a", leader, 0) - config.Clustering.RaftBootstrapSeed = true +func createServer() *Server { + config := getTestConfig("a", true, 0) return New(config) } @@ -50,20 +48,16 @@ func waitForPause(t *testing.T, timeout time.Duration, partition *partition) { func TestPartitionCommitLoopCommitNoAck(t *testing.T) { defer cleanupStorage(t) - // Start NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() + // Start Liftbridge server. + server := createServer() + require.NoError(t, server.Start()) + defer server.Stop() // Create NATS connection. nc, err := nats.GetDefaultOptions().Connect() require.NoError(t, err) defer nc.Close() - // Start Liftbridge server. - server := createServer(false) - require.NoError(t, server.Start()) - defer server.Stop() - p, err := server.newPartition(&proto.Partition{ Subject: "foo", Stream: "foo", @@ -118,20 +112,16 @@ func TestPartitionCommitLoopCommitNoAck(t *testing.T) { func TestPartitionCommitLoopCommitAck(t *testing.T) { defer cleanupStorage(t) - // Start NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() + // Start Liftbridge server. + server := createServer() + require.NoError(t, server.Start()) + defer server.Stop() // Create NATS connection. nc, err := nats.GetDefaultOptions().Connect() require.NoError(t, err) defer nc.Close() - // Start Liftbridge server. - server := createServer(false) - require.NoError(t, server.Start()) - defer server.Stop() - p, err := server.newPartition(&proto.Partition{ Subject: "foo", Stream: "foo", @@ -185,20 +175,16 @@ func TestPartitionCommitLoopCommitAck(t *testing.T) { func TestPartitionCommitLoopEmptyQueue(t *testing.T) { defer cleanupStorage(t) - // Start NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() + // Start Liftbridge server. + server := createServer() + require.NoError(t, server.Start()) + defer server.Stop() // Create NATS connection. nc, err := nats.GetDefaultOptions().Connect() require.NoError(t, err) defer nc.Close() - // Start Liftbridge server. - server := createServer(false) - require.NoError(t, server.Start()) - defer server.Stop() - p, err := server.newPartition(&proto.Partition{ Subject: "foo", Stream: "foo", @@ -248,20 +234,16 @@ func TestPartitionCommitLoopEmptyQueue(t *testing.T) { func TestPartitionCommitLoopDisposedQueue(t *testing.T) { defer cleanupStorage(t) - // Start NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() + // Start Liftbridge server. + server := createServer() + require.NoError(t, server.Start()) + defer server.Stop() // Create NATS connection. nc, err := nats.GetDefaultOptions().Connect() require.NoError(t, err) defer nc.Close() - // Start Liftbridge server. - server := createServer(false) - require.NoError(t, server.Start()) - defer server.Stop() - p, err := server.newPartition(&proto.Partition{ Subject: "foo", Stream: "foo", @@ -321,7 +303,7 @@ func TestPartitionCommitLoopDisposedQueue(t *testing.T) { func TestPartitionCommitLoopNoCommitBelowMinISR(t *testing.T) { defer cleanupStorage(t) - server := createServer(false) + server := createServer() server.config.Clustering.MinISR = 2 p, err := server.newPartition(&proto.Partition{ Subject: "foo", @@ -373,7 +355,7 @@ func TestPartitionCommitLoopNoCommitBelowMinISR(t *testing.T) { // replica. func TestPartitionRemoveFromISRNotReplica(t *testing.T) { defer cleanupStorage(t) - server := createServer(false) + server := createServer() p, err := server.newPartition(&proto.Partition{ Subject: "foo", Stream: "foo", @@ -387,7 +369,7 @@ func TestPartitionRemoveFromISRNotReplica(t *testing.T) { // commit check on the follower. func TestPartitionRemoveFromISRFollower(t *testing.T) { defer cleanupStorage(t) - server := createServer(false) + server := createServer() p, err := server.newPartition(&proto.Partition{ Subject: "foo", Stream: "foo", @@ -414,7 +396,7 @@ func TestPartitionRemoveFromISRFollower(t *testing.T) { // check on the leader. func TestPartitionRemoveFromISRLeader(t *testing.T) { defer cleanupStorage(t) - server := createServer(false) + server := createServer() p, err := server.newPartition(&proto.Partition{ Subject: "foo", Stream: "foo", @@ -444,7 +426,7 @@ func TestPartitionRemoveFromISRLeader(t *testing.T) { // as below the minimum ISR when the ISR shrinks below the minimum. func TestPartitionRemoveFromISRBelowMin(t *testing.T) { defer cleanupStorage(t) - server := createServer(false) + server := createServer() server.config.Clustering.MinISR = 3 p, err := server.newPartition(&proto.Partition{ Subject: "foo", @@ -464,7 +446,7 @@ func TestPartitionRemoveFromISRBelowMin(t *testing.T) { // Ensure AddToISR returns an error if the replica is not a stream replica. func TestPartitionAddToISRNotReplica(t *testing.T) { defer cleanupStorage(t) - server := createServer(false) + server := createServer() p, err := server.newPartition(&proto.Partition{ Subject: "foo", Stream: "foo", @@ -477,7 +459,7 @@ func TestPartitionAddToISRNotReplica(t *testing.T) { // Ensure AddToISR adds the replica to the ISR. func TestPartitionAddToISR(t *testing.T) { defer cleanupStorage(t) - server := createServer(false) + server := createServer() p, err := server.newPartition(&proto.Partition{ Subject: "foo", Stream: "foo", @@ -499,7 +481,7 @@ func TestPartitionAddToISR(t *testing.T) { // minimum ISR and has recovered, marks the stream ISR as recovered. func TestPartitionAddToISRRecoverMin(t *testing.T) { defer cleanupStorage(t) - server := createServer(false) + server := createServer() server.config.Clustering.MinISR = 3 p, err := server.newPartition(&proto.Partition{ Subject: "foo", @@ -525,15 +507,6 @@ func TestPartitionAddToISRRecoverMin(t *testing.T) { func TestPartitionReplicationRequestLoopPreempt(t *testing.T) { defer cleanupStorage(t) - // Start NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - - // Create NATS connection. - nc, err := nats.GetDefaultOptions().Connect() - require.NoError(t, err) - defer nc.Close() - // Start Liftbridge server. config := getTestConfig("a", true, 5050) // Set idle wait long enough that it shouldn't be reached. @@ -541,6 +514,11 @@ func TestPartitionReplicationRequestLoopPreempt(t *testing.T) { server := runServerWithConfig(t, config) defer server.Stop() + // Create NATS connection. + nc, err := nats.GetDefaultOptions().Connect() + require.NoError(t, err) + defer nc.Close() + p, err := server.newPartition(&proto.Partition{ Subject: "foo", Stream: "foo", @@ -588,7 +566,7 @@ func TestPartitionReplicationRequestLoopPreempt(t *testing.T) { // Ensure that a new partition can be created with custom StreamConfig. func TestPartitionWithCustomConfigNoError(t *testing.T) { defer cleanupStorage(t) - server := createServer(false) + server := createServer() customStreamConfig := &proto.StreamConfig{ RetentionMaxMessages: &proto.NullableInt64{Value: 1000}, } @@ -608,15 +586,6 @@ func TestPartitionWithCustomConfigNoError(t *testing.T) { func TestPartitionAutoPause(t *testing.T) { defer cleanupStorage(t) - // Start NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - - // Create NATS connection. - nc, err := nats.GetDefaultOptions().Connect() - require.NoError(t, err) - defer nc.Close() - // Configure server. s1Config := getTestConfig("a", true, 5050) autoPauseTime := 100 * time.Millisecond @@ -624,6 +593,11 @@ func TestPartitionAutoPause(t *testing.T) { s1 := runServerWithConfig(t, s1Config) defer s1.Stop() + // Create NATS connection. + nc, err := nats.GetDefaultOptions().Connect() + require.NoError(t, err) + defer nc.Close() + // Wait for server to elect itself leader. getMetadataLeader(t, 10*time.Second, s1) @@ -664,15 +638,6 @@ func TestPartitionAutoPause(t *testing.T) { func TestPartitionAutoPauseDisableIfSubscribers(t *testing.T) { defer cleanupStorage(t) - // Start NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - - // Create NATS connection. - nc, err := nats.GetDefaultOptions().Connect() - require.NoError(t, err) - defer nc.Close() - // Configure server. s1Config := getTestConfig("a", true, 5050) autoPauseTime := 100 * time.Millisecond @@ -681,6 +646,11 @@ func TestPartitionAutoPauseDisableIfSubscribers(t *testing.T) { s1 := runServerWithConfig(t, s1Config) defer s1.Stop() + // Create NATS connection. + nc, err := nats.GetDefaultOptions().Connect() + require.NoError(t, err) + defer nc.Close() + // Wait for server to elect itself leader. getMetadataLeader(t, 10*time.Second, s1) diff --git a/server/replicator_test.go b/server/replicator_test.go index 11a80339..5bf58da6 100644 --- a/server/replicator_test.go +++ b/server/replicator_test.go @@ -86,12 +86,13 @@ func stopFollowing(t *testing.T, p *partition) { func TestStreamLeaderFailover(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. + // Use an external NATS server. ns := natsdTest.RunDefaultServer() defer ns.Shutdown() // Configure first server. s1Config := getTestConfig("a", true, 5050) + s1Config.EmbeddedNATS = false s1Config.Clustering.ReplicaMaxLeaderTimeout = time.Second s1Config.Clustering.ReplicaMaxIdleWait = 500 * time.Millisecond s1Config.Clustering.ReplicaFetchTimeout = 500 * time.Millisecond @@ -215,12 +216,13 @@ func TestStreamLeaderFailover(t *testing.T) { func TestCommitOnISRShrink(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. + // Use an external NATS server. ns := natsdTest.RunDefaultServer() defer ns.Shutdown() // Configure first server. s1Config := getTestConfig("a", true, 5050) + s1Config.EmbeddedNATS = false s1Config.Clustering.ReplicaMaxLagTime = time.Second s1Config.Clustering.ReplicaFetchTimeout = 100 * time.Millisecond s1 := runServerWithConfig(t, s1Config) @@ -296,12 +298,13 @@ func TestCommitOnISRShrink(t *testing.T) { func TestAckPolicyLeader(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. + // Use an external NATS server. ns := natsdTest.RunDefaultServer() defer ns.Shutdown() // Configure first server. s1Config := getTestConfig("a", true, 5050) + s1Config.EmbeddedNATS = false s1 := runServerWithConfig(t, s1Config) defer s1.Stop() @@ -358,12 +361,13 @@ func TestAckPolicyLeader(t *testing.T) { func TestCommitOnRestart(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. + // Use an external NATS server. ns := natsdTest.RunDefaultServer() defer ns.Shutdown() // Configure first server. s1Config := getTestConfig("a", true, 5050) + s1Config.EmbeddedNATS = false s1Config.Clustering.MinISR = 2 s1 := runServerWithConfig(t, s1Config) defer s1.Stop() @@ -480,10 +484,6 @@ func TestCommitOnRestart(t *testing.T) { func TestTruncateFastLeaderElection(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure first server. s1Config := getTestConfig("a", true, 5050) s1Config.Clustering.MinISR = 1 @@ -615,12 +615,13 @@ func TestTruncateFastLeaderElection(t *testing.T) { func TestTruncatePreventReplicaDivergence(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. + // Use an external NATS server. ns := natsdTest.RunDefaultServer() defer ns.Shutdown() // Configure first server. s1Config := getTestConfig("a", true, 5050) + s1Config.EmbeddedNATS = false s1Config.Clustering.MinISR = 1 s1Config.Clustering.ReplicaMaxLeaderTimeout = time.Second s1Config.Clustering.ReplicaMaxIdleWait = 500 * time.Millisecond @@ -807,15 +808,6 @@ func TestTruncatePreventReplicaDivergence(t *testing.T) { func TestReplicatorNotifyNewData(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - - // Create NATS connection. - nc, err := nats.GetDefaultOptions().Connect() - require.NoError(t, err) - defer nc.Close() - // Configure first server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -826,6 +818,11 @@ func TestReplicatorNotifyNewData(t *testing.T) { s2 := runServerWithConfig(t, s2Config) defer s2.Stop() + // Create NATS connection. + nc, err := nats.GetDefaultOptions().Connect() + require.NoError(t, err) + defer nc.Close() + client, err := lift.Connect([]string{"localhost:5050", "localhost:5051"}) require.NoError(t, err) defer client.Close() @@ -883,17 +880,13 @@ func TestReplicatorNotifyNewData(t *testing.T) { func TestShrinkExpandISR(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. + // Use an external NATS server. ns := natsdTest.RunDefaultServer() defer ns.Shutdown() - // Create NATS connection. - nc, err := nats.GetDefaultOptions().Connect() - require.NoError(t, err) - defer nc.Close() - // Configure first server. s1Config := getTestConfig("a", true, 5050) + s1Config.EmbeddedNATS = false s1Config.Clustering.ReplicaMaxLagTime = time.Second s1Config.Clustering.ReplicaMaxIdleWait = 2 * time.Millisecond s1 := runServerWithConfig(t, s1Config) @@ -913,6 +906,11 @@ func TestShrinkExpandISR(t *testing.T) { s3 := runServerWithConfig(t, s3Config) defer s3.Stop() + // Create NATS connection. + nc, err := nats.GetDefaultOptions().Connect() + require.NoError(t, err) + defer nc.Close() + getMetadataLeader(t, 10*time.Second, s1, s2, s3) client, err := lift.Connect([]string{"localhost:5050"}) diff --git a/server/server.go b/server/server.go index 643a7cb0..6bfd9360 100644 --- a/server/server.go +++ b/server/server.go @@ -498,7 +498,15 @@ func (s *Server) createNATSConn(name string) (*nats.Conn, error) { return nil, err } - return opts.Connect() + var conn *nats.Conn + for i := 0; i < 5; i++ { + conn, err = opts.Connect() + if err == nil { + return conn, nil + } + time.Sleep(5 * time.Millisecond) + } + return nil, err } // startRaftLeadershipLoop start a goroutine for automatically responding to diff --git a/server/server_test.go b/server/server_test.go index 9e26b4db..b0ddd7ce 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -47,7 +47,7 @@ func getTestConfig(id string, bootstrap bool, port int) *Config { config.LogRaft = true config.Clustering.ServerID = id config.LogLevel = uint32(log.DebugLevel) - config.NATS.Servers = []string{"nats://localhost:4222"} + config.EmbeddedNATS = bootstrap config.LogSilent = true config.Port = port return config @@ -204,10 +204,6 @@ func forceLogClean(t *testing.T, subject, name string, s *Server) { func TestNoSeed(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure first server. Starting this should fail because there is no // seed node. raftJoinAttempts = 1 @@ -215,6 +211,7 @@ func TestNoSeed(t *testing.T) { raftJoinAttempts = defaultRaftJoinAttempts }() s1Config := getTestConfig("a", false, 0) + s1Config.EmbeddedNATS = true server := New(s1Config) err := server.Start() require.Error(t, err) @@ -225,10 +222,6 @@ func TestNoSeed(t *testing.T) { func TestAssignedDurableServerID(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 0) s1 := runServerWithConfig(t, s1Config) @@ -270,10 +263,6 @@ func TestAssignedDurableServerID(t *testing.T) { func TestDurableServerID(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 0) s1Config.Clustering.ServerID = "a" @@ -316,10 +305,6 @@ func TestDurableServerID(t *testing.T) { func TestHealthServerStartedCorrectly(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure the first server as a seed. s1Config := getTestConfig("a", true, 20000) s1 := runServerWithConfig(t, s1Config) @@ -345,10 +330,6 @@ func TestHealthServerStartedCorrectly(t *testing.T) { func TestBootstrapAutoConfig(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure the first server as a seed. s1Config := getTestConfig("a", true, 0) s1 := runServerWithConfig(t, s1Config) @@ -380,12 +361,9 @@ func TestBootstrapAutoConfig(t *testing.T) { func TestBootstrapManualConfig(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure first server. s1Config := getTestConfig("a", false, 0) + s1Config.EmbeddedNATS = true s1Config.Clustering.ServerID = "a" s1Config.Clustering.RaftBootstrapPeers = []string{"b"} s1 := runServerWithConfig(t, s1Config) @@ -438,10 +416,6 @@ func TestBootstrapMisconfiguration(t *testing.T) { bootstrapMisconfigInterval = defaultBootstrapMisconfigInterval }() - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - s1Config := getTestConfig("a", true, 0) s1 := New(s1Config) s1FatalLogger := &captureFatalLogger{} @@ -455,6 +429,7 @@ func TestBootstrapMisconfiguration(t *testing.T) { // Configure second server on same cluster as a seed too. Servers should // stop. s2Config := getTestConfig("b", true, 0) + s2Config.EmbeddedNATS = false s2 := New(s2Config) s2FatalLogger := &captureFatalLogger{} s2.logger = s2FatalLogger @@ -496,10 +471,6 @@ func TestBootstrapMisconfiguration(t *testing.T) { func TestBootstrapConcurrentWithActivityStream(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Start three servers concurrently with activity stream enabled. ids := []string{"s1", "s2", "s3"} type result struct { @@ -512,6 +483,9 @@ func TestBootstrapConcurrentWithActivityStream(t *testing.T) { for _, id := range ids { go func(id string) { config := getTestConfig(id, false, 0) + if id == "s1" { + config.EmbeddedNATS = true + } config.Clustering.ServerID = id config.Clustering.RaftBootstrapPeers = ids config.ActivityStream.Enabled = true @@ -540,12 +514,13 @@ func TestBootstrapConcurrentWithActivityStream(t *testing.T) { func TestMetadataLeaderFailover(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. + // Use an external NATS server. ns := natsdTest.RunDefaultServer() defer ns.Shutdown() // Configure first server. s1Config := getTestConfig("a", true, 0) + s1Config.EmbeddedNATS = false s1 := runServerWithConfig(t, s1Config) defer s1.Stop() @@ -582,10 +557,6 @@ func TestMetadataLeaderFailover(t *testing.T) { func TestSubscribeOffsetOverflow(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -642,10 +613,6 @@ func TestSubscribeOffsetOverflow(t *testing.T) { func TestSubscribeOffsetOverflowEmptyStream(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -693,10 +660,6 @@ func TestSubscribeOffsetOverflowEmptyStream(t *testing.T) { func TestSubscribeOffsetUnderflow(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) // Set these to force deletion so we can get an underflow. @@ -755,10 +718,6 @@ func TestSubscribeOffsetUnderflow(t *testing.T) { func TestStreamRetentionBytes(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1Config.Streams.SegmentMaxBytes = 1 @@ -816,10 +775,6 @@ func TestStreamRetentionBytes(t *testing.T) { func TestStreamRetentionMessages(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1Config.Streams.SegmentMaxBytes = 1 @@ -877,10 +832,6 @@ func TestStreamRetentionMessages(t *testing.T) { func TestStreamRetentionAge(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1Config.Streams.SegmentMaxBytes = 1 @@ -939,10 +890,6 @@ func TestStreamRetentionAge(t *testing.T) { func TestSubscribeEarliest(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) // Set these to force deletion. @@ -1000,10 +947,6 @@ func TestSubscribeEarliest(t *testing.T) { func TestSubscribeLatest(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -1054,10 +997,6 @@ func TestSubscribeLatest(t *testing.T) { func TestSubscribeNewOnly(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -1130,10 +1069,6 @@ func TestSubscribeStartTime(t *testing.T) { timestamp = timestampBefore }() - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -1190,19 +1125,12 @@ func TestSubscribeStartTime(t *testing.T) { func TestTLS(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server with TLS. s1Config, err := NewConfig("./configs/tls.yaml") require.NoError(t, err) s1 := runServerWithConfig(t, s1Config) defer s1.Stop() - // Wait for server to elect itself leader. - getMetadataLeader(t, 10*time.Second, s1) - // Connect with TLS. client, err := lift.Connect([]string{"localhost:5050"}, lift.TLSCert("./configs/certs/server.crt")) require.NoError(t, err) @@ -1315,10 +1243,6 @@ func TestDefaultListenHost(t *testing.T) { func TestMetadataLeadershipLifecycle(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure first server. s1Config := getTestConfig("a", true, 0) s1 := runServerWithConfig(t, s1Config) @@ -1355,10 +1279,6 @@ func TestMetadataLeadershipLifecycle(t *testing.T) { func TestPropagatedShrinkExpandISR(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure first server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -1431,10 +1351,6 @@ func TestPropagatedShrinkExpandISR(t *testing.T) { func TestPauseStreamAllPartitions(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -1503,10 +1419,6 @@ func TestPauseStreamAllPartitions(t *testing.T) { func TestPauseStreamSomePartitions(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -1572,10 +1484,6 @@ func TestPauseStreamSomePartitions(t *testing.T) { func TestPauseStreamPropagate(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure first server. s1Config := getTestConfig("a", true, 0) s1 := runServerWithConfig(t, s1Config) @@ -1620,10 +1528,6 @@ func TestPauseStreamPropagate(t *testing.T) { func TestSetStreamReadonlyAllPartitions(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -1669,10 +1573,6 @@ func TestSetStreamReadonlyAllPartitions(t *testing.T) { func TestSetStreamReadonlySomePartitions(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -1751,10 +1651,6 @@ func TestSetStreamReadonlySomePartitions(t *testing.T) { func TestSetStreamReadonlySubscription(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -1812,10 +1708,6 @@ func TestSetStreamReadonlySubscription(t *testing.T) { func TestSetStreamReadonlyPropagate(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure first server. s1Config := getTestConfig("a", true, 0) s1 := runServerWithConfig(t, s1Config) @@ -1856,10 +1748,6 @@ func TestSetStreamReadonlyPropagate(t *testing.T) { func TestPublishNoSuchStream(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config) @@ -1882,10 +1770,6 @@ func TestPublishNoSuchStream(t *testing.T) { func TestPublishNoSuchPartition(t *testing.T) { defer cleanupStorage(t) - // Use a central NATS server. - ns := natsdTest.RunDefaultServer() - defer ns.Shutdown() - // Configure server. s1Config := getTestConfig("a", true, 5050) s1 := runServerWithConfig(t, s1Config)