Skip to content

Commit

Permalink
Replace DefaultServerCapabilities with NewDefaultServerCapabilities()…
Browse files Browse the repository at this point in the history
… to avoid data race (#360)

Co-authored-by: JB <[email protected]>
  • Loading branch information
2 people authored and mochi-co committed Mar 18, 2024
1 parent 32c5770 commit 8466fd0
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 23 deletions.
38 changes: 22 additions & 16 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,8 @@ const (
)

var (
// DefaultServerCapabilities defines the default features and capabilities provided by the server.
DefaultServerCapabilities = &Capabilities{
MaximumSessionExpiryInterval: math.MaxUint32, // maximum number of seconds to keep disconnected sessions
MaximumMessageExpiryInterval: 60 * 60 * 24, // maximum message expiry if message expiry is 0 or over
ReceiveMaximum: 1024, // maximum number of concurrent qos messages per client
MaximumQos: 2, // maximum qos value available to clients
RetainAvailable: 1, // retain messages is available
MaximumPacketSize: 0, // no maximum packet size
TopicAliasMaximum: math.MaxUint16, // maximum topic alias value
WildcardSubAvailable: 1, // wildcard subscriptions are available
SubIDAvailable: 1, // subscription identifiers are available
SharedSubAvailable: 1, // shared subscriptions are available
MinimumProtocolVersion: 3, // minimum supported mqtt version (3.0.0)
MaximumClientWritesPending: 1024 * 8, // maximum number of pending message writes for a client
}
// Deprecated: Use NewDefaultServerCapabilities to avoid data race issue.
DefaultServerCapabilities = NewDefaultServerCapabilities()

ErrListenerIDExists = errors.New("listener id already exists") // a listener with the same id already exists
ErrConnectionClosed = errors.New("connection not open") // connection is closed
Expand All @@ -74,6 +61,25 @@ type Capabilities struct {
SubIDAvailable byte `yaml:"sub_id_available" json:"sub_id_available"`
}

// NewDefaultServerCapabilities defines the default features and capabilities provided by the server.
func NewDefaultServerCapabilities() *Capabilities {
return &Capabilities{
MaximumMessageExpiryInterval: 60 * 60 * 24, // maximum message expiry if message expiry is 0 or over
MaximumClientWritesPending: 1024 * 8, // maximum number of pending message writes for a client
MaximumSessionExpiryInterval: math.MaxUint32, // maximum number of seconds to keep disconnected sessions
MaximumPacketSize: 0, // no maximum packet size
maximumPacketID: math.MaxUint16,
ReceiveMaximum: 1024, // maximum number of concurrent qos messages per client
TopicAliasMaximum: math.MaxUint16, // maximum topic alias value
SharedSubAvailable: 1, // shared subscriptions are available
MinimumProtocolVersion: 3, // minimum supported mqtt version (3.0.0)
MaximumQos: 2, // maximum qos value available to clients
RetainAvailable: 1, // retain messages is available
WildcardSubAvailable: 1, // wildcard subscriptions are available
SubIDAvailable: 1, // subscription identifiers are available
}
}

// Compatibilities provides flags for using compatibility modes.
type Compatibilities struct {
ObscureNotAuthorized bool `yaml:"obscure_not_authorized" json:"obscure_not_authorized"` // return unspecified errors instead of not authorized
Expand Down Expand Up @@ -198,7 +204,7 @@ func New(opts *Options) *Server {
// ensureDefaults ensures that the server starts with sane default values, if none are provided.
func (o *Options) ensureDefaults() {
if o.Capabilities == nil {
o.Capabilities = DefaultServerCapabilities
o.Capabilities = NewDefaultServerCapabilities()
}

o.Capabilities.maximumPacketID = math.MaxUint16 // spec maximum is 65535
Expand Down
14 changes: 7 additions & 7 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,24 +96,24 @@ func (h *DelayHook) OnDisconnect(cl *Client, err error, expire bool) {
}

func newServer() *Server {
cc := *DefaultServerCapabilities
cc := NewDefaultServerCapabilities()
cc.MaximumMessageExpiryInterval = 0
cc.ReceiveMaximum = 0
s := New(&Options{
Logger: logger,
Capabilities: &cc,
Capabilities: cc,
})
_ = s.AddHook(new(AllowHook), nil)
return s
}

func newServerWithInlineClient() *Server {
cc := *DefaultServerCapabilities
cc := NewDefaultServerCapabilities()
cc.MaximumMessageExpiryInterval = 0
cc.ReceiveMaximum = 0
s := New(&Options{
Logger: logger,
Capabilities: &cc,
Capabilities: cc,
InlineClient: true,
})
_ = s.AddHook(new(AllowHook), nil)
Expand All @@ -125,7 +125,7 @@ func TestOptionsSetDefaults(t *testing.T) {
opts.ensureDefaults()

require.Equal(t, defaultSysTopicInterval, opts.SysTopicResendInterval)
require.Equal(t, DefaultServerCapabilities, opts.Capabilities)
require.Equal(t, NewDefaultServerCapabilities(), opts.Capabilities)

opts = new(Options)
opts.ensureDefaults()
Expand Down Expand Up @@ -1662,10 +1662,10 @@ func TestServerProcessPublishACLCheckDeny(t *testing.T) {

for _, tx := range tt {
t.Run(tx.name, func(t *testing.T) {
cc := *DefaultServerCapabilities
cc := NewDefaultServerCapabilities()
s := New(&Options{
Logger: logger,
Capabilities: &cc,
Capabilities: cc,
})
_ = s.AddHook(new(DenyHook), nil)
_ = s.Serve()
Expand Down

0 comments on commit 8466fd0

Please sign in to comment.