Skip to content

Commit

Permalink
fix: rename minio adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
nicufk committed Jan 25, 2024
1 parent f8a884c commit e55b3da
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
26 changes: 13 additions & 13 deletions pkg/logs/adapter/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
defaultWriteSize = 1024 * 80 // 80KB
)

var _ Adapter = &MinioConsumer{}
var _ Adapter = &MinioAdapter{}

type ErrMinioConsumerDisconnected struct {
}
Expand Down Expand Up @@ -52,10 +52,10 @@ type BufferInfo struct {
}

// MinioConsumer creates new MinioSubscriber which will send data to local MinIO bucket
func NewMinioAdapter(endpoint, accessKeyID, secretAccessKey, region, token, bucket string, ssl, skipVerify bool, certFile, keyFile, caFile string) (*MinioConsumer, error) {
func NewMinioAdapter(endpoint, accessKeyID, secretAccessKey, region, token, bucket string, ssl, skipVerify bool, certFile, keyFile, caFile string) (*MinioAdapter, error) {
ctx := context.TODO()
opts := minioconnecter.GetTLSOptions(ssl, skipVerify, certFile, keyFile, caFile)
c := &MinioConsumer{
c := &MinioAdapter{
minioConnecter: minioconnecter.NewConnecter(endpoint, accessKeyID, secretAccessKey, region, token, bucket, log.DefaultLogger, opts...),
Log: log.DefaultLogger,
bucket: bucket,
Expand Down Expand Up @@ -87,7 +87,7 @@ func NewMinioAdapter(endpoint, accessKeyID, secretAccessKey, region, token, buck
return c, nil
}

type MinioConsumer struct {
type MinioAdapter struct {
minioConnecter *minioconnecter.Connecter
minioClient *minio.Client
bucket string
Expand All @@ -98,7 +98,7 @@ type MinioConsumer struct {
mapLock sync.RWMutex
}

func (s *MinioConsumer) Notify(id string, e events.Log) error {
func (s *MinioAdapter) Notify(id string, e events.Log) error {
s.Log.Debugw("minio consumer notify", "id", id, "event", e)
if s.disconnected {
s.Log.Debugw("minio consumer disconnected", "id", id)
Expand Down Expand Up @@ -140,7 +140,7 @@ func (s *MinioConsumer) Notify(id string, e events.Log) error {
return nil
}

func (s *MinioConsumer) putData(ctx context.Context, name string, buffer *bytes.Buffer) {
func (s *MinioAdapter) putData(ctx context.Context, name string, buffer *bytes.Buffer) {
if buffer != nil && buffer.Len() != 0 {
_, err := s.minioClient.PutObject(ctx, s.bucket, name, buffer, int64(buffer.Len()), minio.PutObjectOptions{ContentType: "application/octet-stream"})
if err != nil {
Expand All @@ -153,7 +153,7 @@ func (s *MinioConsumer) putData(ctx context.Context, name string, buffer *bytes.

}

func (s *MinioConsumer) combineData(ctxt context.Context, minioClient *minio.Client, id string, parts int, deleteIntermediaryData bool) error {
func (s *MinioAdapter) combineData(ctxt context.Context, minioClient *minio.Client, id string, parts int, deleteIntermediaryData bool) error {
var returnedError []error
returnedError = nil
buffer := bytes.NewBuffer(make([]byte, 0, parts*defaultBufferSize))
Expand Down Expand Up @@ -198,12 +198,12 @@ func (s *MinioConsumer) combineData(ctxt context.Context, minioClient *minio.Cli
return fmt.Errorf("executed with errors: %v", returnedError)
}

func (s *MinioConsumer) objectExists(objectName string) bool {
func (s *MinioAdapter) objectExists(objectName string) bool {
_, err := s.minioClient.StatObject(context.Background(), s.bucket, objectName, minio.StatObjectOptions{})
return err == nil
}

func (s *MinioConsumer) Stop(id string) error {
func (s *MinioAdapter) Stop(id string) error {
s.Log.Debugw("minio consumer stop", "id", id)
ctx := context.TODO()
buffInfo, ok := s.GetBuffInfo(id)
Expand All @@ -217,24 +217,24 @@ func (s *MinioConsumer) Stop(id string) error {
return s.combineData(ctx, s.minioClient, id, parts, true)
}

func (s *MinioConsumer) Name() string {
func (s *MinioAdapter) Name() string {
return "minio"
}

func (s *MinioConsumer) GetBuffInfo(id string) (BufferInfo, bool) {
func (s *MinioAdapter) GetBuffInfo(id string) (BufferInfo, bool) {
s.mapLock.RLock()
defer s.mapLock.RUnlock()
buffInfo, ok := s.buffInfos[id]
return buffInfo, ok
}

func (s *MinioConsumer) UpdateBuffInfo(id string, buffInfo BufferInfo) {
func (s *MinioAdapter) UpdateBuffInfo(id string, buffInfo BufferInfo) {
s.mapLock.Lock()
defer s.mapLock.Unlock()
s.buffInfos[id] = buffInfo
}

func (s *MinioConsumer) DeleteBuffInfo(id string) {
func (s *MinioAdapter) DeleteBuffInfo(id string) {
s.mapLock.Lock()
defer s.mapLock.Unlock()
delete(s.buffInfos, id)
Expand Down
4 changes: 2 additions & 2 deletions pkg/logs/adapter/minio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func BenchmarkLogs2(b *testing.B) {
wg.Wait()
}

func testOneConsumer(consumer *MinioConsumer, id string) {
func testOneConsumer(consumer *MinioAdapter, id string) {
fmt.Println("#####starting", id)
totalSize := 0
numberOFLogs := rand.Intn(100000)
Expand Down Expand Up @@ -164,7 +164,7 @@ func DoRunBenchmark() {
verifyConsumer(idChan, bucket, consumer.minioClient)
}

func DoRunBenchmark2(idChan chan string, numberOfConsumers int, consumer *MinioConsumer) {
func DoRunBenchmark2(idChan chan string, numberOfConsumers int, consumer *MinioAdapter) {
var counter atomic.Int32
var wg sync.WaitGroup
for i := 0; i < numberOfConsumers; i++ {
Expand Down

0 comments on commit e55b3da

Please sign in to comment.