Skip to content

Commit

Permalink
Merge branch 'main' into file-based-config
Browse files Browse the repository at this point in the history
  • Loading branch information
mochi-co authored Jan 11, 2024
2 parents fedfb92 + 83db7ff commit 32c5770
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 22 deletions.
15 changes: 9 additions & 6 deletions clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ func (cl *Client) WritePacket(pk packets.Packet) error {
return packets.ErrPacketTooLarge // [MQTT-3.1.2-24] [MQTT-3.1.2-25]
}

n, err := func() (n int64, err error) {
n, err := func() (int64, error) {
cl.Lock()
defer cl.Unlock()
if len(cl.State.outbound) == 0 {
Expand All @@ -591,23 +591,26 @@ func (cl *Client) WritePacket(pk packets.Packet) error {
}

// first write to buffer, then flush buffer
n, _ = buf.WriteTo(cl.Net.outbuf) // will always be successful
n, _ := cl.Net.outbuf.Write(buf.Bytes()) // will always be successful
err = cl.flushOutbuf()
return
return int64(n), err
}

// there are more writes in the queue
if cl.Net.outbuf == nil {
if buf.Len() >= cl.ops.options.ClientNetWriteBufferSize {
return buf.WriteTo(cl.Net.Conn)
}
cl.Net.outbuf = new(bytes.Buffer)
}

n, _ = buf.WriteTo(cl.Net.outbuf) // will always be successful
n, _ := cl.Net.outbuf.Write(buf.Bytes()) // will always be successful
if cl.Net.outbuf.Len() < cl.ops.options.ClientNetWriteBufferSize {
return
return int64(n), nil
}

err = cl.flushOutbuf()
return
return int64(n), err
}()
if err != nil {
return err
Expand Down
24 changes: 12 additions & 12 deletions packets/packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (pk *Packet) ConnectEncode(buf *bytes.Buffer) error {

pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())

return nil
}
Expand Down Expand Up @@ -512,7 +512,8 @@ func (pk *Packet) ConnackEncode(buf *bytes.Buffer) error {

pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())

return nil
}

Expand Down Expand Up @@ -557,7 +558,7 @@ func (pk *Packet) DisconnectEncode(buf *bytes.Buffer) error {

pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())

return nil
}
Expand Down Expand Up @@ -628,7 +629,7 @@ func (pk *Packet) PublishEncode(buf *bytes.Buffer) error {

pk.FixedHeader.Remaining = nb.Len() + len(pk.Payload)
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())
buf.Write(pk.Payload)

return nil
Expand Down Expand Up @@ -719,7 +720,7 @@ func (pk *Packet) encodePubAckRelRecComp(buf *bytes.Buffer) error {

pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())
return nil
}

Expand Down Expand Up @@ -858,7 +859,7 @@ func (pk *Packet) SubackEncode(buf *bytes.Buffer) error {

pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())

return nil
}
Expand Down Expand Up @@ -918,7 +919,7 @@ func (pk *Packet) SubscribeEncode(buf *bytes.Buffer) error {

pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())

return nil
}
Expand Down Expand Up @@ -1009,13 +1010,12 @@ func (pk *Packet) UnsubackEncode(buf *bytes.Buffer) error {
defer mempool.PutBuffer(pb)
pk.Properties.Encode(pk.FixedHeader.Type, pk.Mods, pb, nb.Len())
nb.Write(pb.Bytes())
nb.Write(pk.ReasonCodes)
}

nb.Write(pk.ReasonCodes)

pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())

return nil
}
Expand Down Expand Up @@ -1071,7 +1071,7 @@ func (pk *Packet) UnsubscribeEncode(buf *bytes.Buffer) error {

pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())

return nil
}
Expand Down Expand Up @@ -1133,7 +1133,7 @@ func (pk *Packet) AuthEncode(buf *bytes.Buffer) error {

pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())
return nil
}

Expand Down
12 changes: 8 additions & 4 deletions packets/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"bytes"
"fmt"
"strings"

"github.com/mochi-mqtt/server/v2/mempool"
)

const (
Expand Down Expand Up @@ -199,7 +201,8 @@ func (p *Properties) Encode(pkt byte, mods Mods, b *bytes.Buffer, n int) {
return
}

var buf bytes.Buffer
buf := mempool.GetBuffer()
defer mempool.PutBuffer(buf)
if p.canEncode(pkt, PropPayloadFormat) && p.PayloadFormatFlag {
buf.WriteByte(PropPayloadFormat)
buf.WriteByte(p.PayloadFormat)
Expand Down Expand Up @@ -230,7 +233,7 @@ func (p *Properties) Encode(pkt byte, mods Mods, b *bytes.Buffer, n int) {
for _, v := range p.SubscriptionIdentifier {
if v > 0 {
buf.WriteByte(PropSubscriptionIdentifier)
encodeLength(&buf, int64(v))
encodeLength(buf, int64(v))
}
}
}
Expand Down Expand Up @@ -321,7 +324,8 @@ func (p *Properties) Encode(pkt byte, mods Mods, b *bytes.Buffer, n int) {
}

if !mods.DisallowProblemInfo && p.canEncode(pkt, PropUser) {
pb := bytes.NewBuffer([]byte{})
pb := mempool.GetBuffer()
defer mempool.PutBuffer(pb)
for _, v := range p.User {
pb.WriteByte(PropUser)
pb.Write(encodeString(v.Key))
Expand Down Expand Up @@ -355,7 +359,7 @@ func (p *Properties) Encode(pkt byte, mods Mods, b *bytes.Buffer, n int) {
}

encodeLength(b, int64(buf.Len()))
_, _ = buf.WriteTo(b) // [MQTT-3.1.3-10]
b.Write(buf.Bytes()) // [MQTT-3.1.3-10]
}

// Decode decodes property bytes into a properties struct.
Expand Down

0 comments on commit 32c5770

Please sign in to comment.