Concurrency issues when reading and writing retain messages #200

Open
yh742 opened this issue Apr 17, 2023 · 4 comments
Labels: discussion (Something to be discussed), help wanted (Extra attention is needed), needs investigation (Identify cause / reproduce issue)

Comments

@yh742

yh742 commented Apr 17, 2023

Hi there,

I'm trying to embed the broker in an application. While testing the library, I noticed that the race detector fires when retained messages are published and subscribed to at the same time. I believe the issue is that there are no locks here:

https://github.com/mochi-co/mqtt/blob/7bd7bd5087c40f96015a61d01ce7ae4e6951c807/topics.go#L426-L428

And there's a lock here:

https://github.com/mochi-co/mqtt/blob/7bd7bd5087c40f96015a61d01ce7ae4e6951c807/topics.go#L354-L377

yh742 changed the title from "Concurrency Issues" to "Concurrency issues when reading and writing retain messages" Apr 17, 2023
mochi-co added the "needs investigation" label Apr 21, 2023
@mochi-co
Collaborator

Hi @yh742 - could you paste the race condition logs so I can have a closer look? Thanks!

@yh742
Author

yh742 commented Apr 22, 2023

Sure, I have attached the log. For a bit of context, I'm trying to inject a retained message into a topic when a device connects and remove that retained message when the device disconnects.

WARNING: DATA RACE
Write at 0x00c0000251e8 by goroutine 100:
  github.com/mochi-co/mqtt/v2.(*TopicsIndex).RetainMessage()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/topics.go:372 +0x37d
  github.com/mochi-co/mqtt/v2.(*Server).retainMessage()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/server.go:758 +0xe4
  github.com/mochi-co/mqtt/v2.(*Server).processPublish()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/server.go:715 +0x769
  github.com/mochi-co/mqtt/v2.(*Server).processPacket()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/server.go:569 +0x2a7
  github.com/mochi-co/mqtt/v2.(*Server).InjectPacket()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/server.go:658 +0xa4
  github.com/mochi-co/mqtt/v2.(*Server).Publish()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/server.go:641 +0x184
  github.com/mochi-co/mqtt/v2.(*Server).Publish-fm()
      <autogenerated>:1 +0x99
  github.com/yh742/halbroker/hook/auth/userpass.(*Hook).OnDisconnect()
      /home/sean/Documents/projects/i4/halbroker/hook/auth/userpass/hook.go:234 +0x673
  github.com/mochi-co/mqtt/v2.(*Hooks).OnDisconnect()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/hooks.go:237 +0x13b
  github.com/mochi-co/mqtt/v2.(*Server).attachClient()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/server.go:376 +0xc8c
  github.com/mochi-co/mqtt/v2.(*Server).EstablishConnection()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/server.go:312 +0x88
  github.com/mochi-co/mqtt/v2.(*Server).EstablishConnection-fm()
      <autogenerated>:1 +0x6d
  github.com/mochi-co/mqtt/v2/listeners.(*TCP).Serve.func1()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/listeners/tcp.go:84 +0x8c

Previous read at 0x00c0000251e8 by goroutine 92:
  github.com/mochi-co/mqtt/v2.(*TopicsIndex).scanMessages()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/topics.go:455 +0x8da
  github.com/mochi-co/mqtt/v2.(*TopicsIndex).scanMessages()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/topics.go:463 +0xb72
  github.com/mochi-co/mqtt/v2.(*TopicsIndex).scanMessages()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/topics.go:463 +0xb72
  github.com/mochi-co/mqtt/v2.(*TopicsIndex).scanMessages()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/topics.go:463 +0xb72
  github.com/mochi-co/mqtt/v2.(*TopicsIndex).scanMessages()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/topics.go:463 +0xb72
  github.com/mochi-co/mqtt/v2.(*TopicsIndex).scanMessages()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/topics.go:471 +0x470
  github.com/mochi-co/mqtt/v2.(*TopicsIndex).Messages()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/topics.go:427 +0xdb
  github.com/mochi-co/mqtt/v2.(*Server).publishRetainedToClient()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/server.go:876 +0x90
  github.com/mochi-co/mqtt/v2.(*Server).processSubscribe()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/server.go:1055 +0x59c
  github.com/mochi-co/mqtt/v2.(*Server).processPacket()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/server.go:583 +0xf07
  github.com/mochi-co/mqtt/v2.(*Server).receivePacket()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/server.go:411 +0x87
  github.com/mochi-co/mqtt/v2.(*Server).receivePacket-fm()
      <autogenerated>:1 +0x89
  github.com/mochi-co/mqtt/v2.(*Client).Read()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/clients.go:349 +0x19c
  github.com/mochi-co/mqtt/v2.(*Server).attachClient()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/server.go:366 +0x9c4
  github.com/mochi-co/mqtt/v2.(*Server).EstablishConnection()
      /home/sean/go/pkg/mod/github.com/mochi-co/mqtt/[email protected]/server.go:312 +0x88
  github.com/mochi-co/mqtt/v2.(*Server).EstablishConnection-fm()
      <autogenerated>:1 +0x6d
  github.com/mochi-co/mqtt/v2/listeners.(*TCP).Serve.func1()
      /home/sean/go/pkg/m

mochi-co self-assigned this May 4, 2023
@thedevop
Collaborator

thedevop commented Jun 12, 2023

@mochi-co, I haven't dived deep into the topics code, but after looking at it to add #236, I do see race conditions here.

The issue is that although the particles and the particle methods are protected by the lock, subsequent operations on the results they return are not. Here are some examples:

  1. In TopicsIndex.scanMessages:
    https://github.com/mochi-co/mqtt/blob/af79b55b9f56ea49a09fea0450d3062ddbd0c13b/topics.go#L449-L464
    n.particles.getAll() is protected by the lock, but the for loop over its result is not. Some of the particles returned in the map may be deleted while the loop is iterating over it.
  2. A similar issue exists in TopicsIndex.scanSubscribers.

Since both of these are only called by their respective exported methods, adding a lock in Messages and Subscribers, as suggested by @yh742, can address this. There may be a performance impact, though.
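
For illustration, here is a minimal, self-contained sketch of the coarse-grained approach described above. It is not the library's code: the `index` and `node` types and the `Retain`, `Messages`, and `scan` names are hypothetical stand-ins for `TopicsIndex` and its particles, and wildcard matching is left out. The point it tries to show is that the exported methods take the lock once and hold it for the whole recursive scan, so a concurrent clear cannot remove a branch while a reader is walking it.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// node is one level of a topic trie (hypothetical stand-in for a particle).
type node struct {
	children map[string]*node
	retained string // empty means nothing is retained at this level
}

// index guards the whole trie with a single RWMutex that only the
// exported methods acquire.
type index struct {
	mu   sync.RWMutex
	root *node
}

func newIndex() *index {
	return &index{root: &node{children: map[string]*node{}}}
}

// Retain stores payload on topic. An empty payload clears the retained
// value and prunes any branches left empty.
func (x *index) Retain(topic, payload string) {
	x.mu.Lock()
	defer x.mu.Unlock()

	keys := strings.Split(topic, "/")
	path := []*node{x.root}
	n := x.root
	for _, k := range keys {
		child, ok := n.children[k]
		if !ok {
			if payload == "" {
				return // nothing retained on this topic, nothing to clear
			}
			child = &node{children: map[string]*node{}}
			n.children[k] = child
		}
		path = append(path, child)
		n = child
	}
	n.retained = payload

	// Prune empty branches, leaf first. Stops at the first node still in use.
	for i := len(keys); i > 0; i-- {
		if c := path[i]; c.retained != "" || len(c.children) > 0 {
			break
		}
		delete(path[i-1].children, keys[i-1])
	}
}

// Messages returns every retained payload keyed by topic. The read lock is
// held for the entire scan, not just for each individual map access.
func (x *index) Messages() map[string]string {
	x.mu.RLock()
	defer x.mu.RUnlock()
	out := map[string]string{}
	x.root.scan(nil, out)
	return out
}

// scan walks the trie recursively. It takes no lock itself and must only
// be called while the index lock is held.
func (n *node) scan(parts []string, out map[string]string) {
	if n.retained != "" {
		out[strings.Join(parts, "/")] = n.retained
	}
	for k, child := range n.children {
		child.scan(append(parts, k), out)
	}
}

func main() {
	x := newIndex()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(2)
		go func(id int) { // writer: set then clear, like connect/disconnect
			defer wg.Done()
			topic := fmt.Sprintf("devices/%d/status", id)
			for j := 0; j < 1000; j++ {
				x.Retain(topic, "online")
				x.Retain(topic, "")
			}
		}(i)
		go func() { // reader: like a new subscriber collecting retained messages
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				_ = x.Messages()
			}
		}()
	}
	wg.Wait()
	fmt.Println("retained after run:", len(x.Messages()))
}
```

Running this with `go run -race` should stay quiet, because every read and write of the trie happens under the one lock. The trade-off is the performance concern mentioned above: a retained-message write blocks all scans for its duration, and a long scan delays writers.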

mochi-co added the "discussion" label Jun 19, 2023
@mochi-co
Collaborator

@thedevop I did suspect this may be the case when I wrote it but I was hoping that it would be rare enough not to cause an issue - apparently not.

@yh742 I tried to replicate this but was unable to trigger the race condition error. It's possible I'm performing the operations in the wrong order or at insufficient scale. Do you (or @thedevop) have any example code (a test or a main.go) I can run to reproduce this? That would make it easier to determine the correct solution.

Thanks!
