diff --git a/server.go b/server.go index da051e06..05bfc713 100644 --- a/server.go +++ b/server.go @@ -34,21 +34,8 @@ const ( ) var ( - // DefaultServerCapabilities defines the default features and capabilities provided by the server. - DefaultServerCapabilities = &Capabilities{ - MaximumSessionExpiryInterval: math.MaxUint32, // maximum number of seconds to keep disconnected sessions - MaximumMessageExpiryInterval: 60 * 60 * 24, // maximum message expiry if message expiry is 0 or over - ReceiveMaximum: 1024, // maximum number of concurrent qos messages per client - MaximumQos: 2, // maximum qos value available to clients - RetainAvailable: 1, // retain messages is available - MaximumPacketSize: 0, // no maximum packet size - TopicAliasMaximum: math.MaxUint16, // maximum topic alias value - WildcardSubAvailable: 1, // wildcard subscriptions are available - SubIDAvailable: 1, // subscription identifiers are available - SharedSubAvailable: 1, // shared subscriptions are available - MinimumProtocolVersion: 3, // minimum supported mqtt version (3.0.0) - MaximumClientWritesPending: 1024 * 8, // maximum number of pending message writes for a client - } + // Deprecated: Use NewDefaultServerCapabilities to avoid data race issue. + DefaultServerCapabilities = NewDefaultServerCapabilities() ErrListenerIDExists = errors.New("listener id already exists") // a listener with the same id already exists ErrConnectionClosed = errors.New("connection not open") // connection is closed @@ -74,6 +61,25 @@ type Capabilities struct { SubIDAvailable byte `yaml:"sub_id_available" json:"sub_id_available"` } +// NewDefaultServerCapabilities defines the default features and capabilities provided by the server. +func NewDefaultServerCapabilities() *Capabilities { + return &Capabilities{ + MaximumMessageExpiryInterval: 60 * 60 * 24, // maximum message expiry if message expiry is 0 or over + MaximumClientWritesPending: 1024 * 8, // maximum number of pending message writes for a client + MaximumSessionExpiryInterval: math.MaxUint32, // maximum number of seconds to keep disconnected sessions + MaximumPacketSize: 0, // no maximum packet size + maximumPacketID: math.MaxUint16, + ReceiveMaximum: 1024, // maximum number of concurrent qos messages per client + TopicAliasMaximum: math.MaxUint16, // maximum topic alias value + SharedSubAvailable: 1, // shared subscriptions are available + MinimumProtocolVersion: 3, // minimum supported mqtt version (3.0.0) + MaximumQos: 2, // maximum qos value available to clients + RetainAvailable: 1, // retain messages is available + WildcardSubAvailable: 1, // wildcard subscriptions are available + SubIDAvailable: 1, // subscription identifiers are available + } +} + // Compatibilities provides flags for using compatibility modes. type Compatibilities struct { ObscureNotAuthorized bool `yaml:"obscure_not_authorized" json:"obscure_not_authorized"` // return unspecified errors instead of not authorized @@ -198,7 +204,7 @@ func New(opts *Options) *Server { // ensureDefaults ensures that the server starts with sane default values, if none are provided. func (o *Options) ensureDefaults() { if o.Capabilities == nil { - o.Capabilities = DefaultServerCapabilities + o.Capabilities = NewDefaultServerCapabilities() } o.Capabilities.maximumPacketID = math.MaxUint16 // spec maximum is 65535 diff --git a/server_test.go b/server_test.go index c211f065..6761b19f 100644 --- a/server_test.go +++ b/server_test.go @@ -96,24 +96,24 @@ func (h *DelayHook) OnDisconnect(cl *Client, err error, expire bool) { } func newServer() *Server { - cc := *DefaultServerCapabilities + cc := NewDefaultServerCapabilities() cc.MaximumMessageExpiryInterval = 0 cc.ReceiveMaximum = 0 s := New(&Options{ Logger: logger, - Capabilities: &cc, + Capabilities: cc, }) _ = s.AddHook(new(AllowHook), nil) return s } func newServerWithInlineClient() *Server { - cc := *DefaultServerCapabilities + cc := NewDefaultServerCapabilities() cc.MaximumMessageExpiryInterval = 0 cc.ReceiveMaximum = 0 s := New(&Options{ Logger: logger, - Capabilities: &cc, + Capabilities: cc, InlineClient: true, }) _ = s.AddHook(new(AllowHook), nil) @@ -125,7 +125,7 @@ func TestOptionsSetDefaults(t *testing.T) { opts.ensureDefaults() require.Equal(t, defaultSysTopicInterval, opts.SysTopicResendInterval) - require.Equal(t, DefaultServerCapabilities, opts.Capabilities) + require.Equal(t, NewDefaultServerCapabilities(), opts.Capabilities) opts = new(Options) opts.ensureDefaults() @@ -1662,10 +1662,10 @@ func TestServerProcessPublishACLCheckDeny(t *testing.T) { for _, tx := range tt { t.Run(tx.name, func(t *testing.T) { - cc := *DefaultServerCapabilities + cc := NewDefaultServerCapabilities() s := New(&Options{ Logger: logger, - Capabilities: &cc, + Capabilities: cc, }) _ = s.AddHook(new(DenyHook), nil) _ = s.Serve()