From 2e6ef5ef0dcd06a89a7a768038e79a7c9796a169 Mon Sep 17 00:00:00 2001 From: triump2020 Date: Tue, 29 Oct 2024 19:47:48 +0800 Subject: [PATCH 1/2] update --- pkg/vm/engine/disttae/logtail_consumer.go | 14 ++++++---- .../engine/disttae/logtailreplay/partition.go | 28 ++++++------------- 2 files changed, 16 insertions(+), 26 deletions(-) diff --git a/pkg/vm/engine/disttae/logtail_consumer.go b/pkg/vm/engine/disttae/logtail_consumer.go index 31bc4e3c7c3f..29a15135817f 100644 --- a/pkg/vm/engine/disttae/logtail_consumer.go +++ b/pkg/vm/engine/disttae/logtail_consumer.go @@ -1891,8 +1891,6 @@ func updatePartitionOfPush( if len(tl.CkpLocation) > 0 { t0 = time.Now() ckpStart, ckpEnd = parseCkpDuration(tl) - state.CacheCkpDuration(ckpStart, partition) - state.AppendCheckpoint(tl.CkpLocation, partition) v2.LogtailUpdatePartitonHandleCheckpointDurationHistogram.Observe(time.Since(t0).Seconds()) } @@ -1925,20 +1923,24 @@ func updatePartitionOfPush( } //After consume checkpoints finished ,then update the start and end of - //the mo system table's partition and catalog. - if !lazyLoad { + //the table's partition and catalog cache. + if isSub { if len(tl.CkpLocation) != 0 { if !ckpStart.IsEmpty() || !ckpEnd.IsEmpty() { t0 = time.Now() state.UpdateDuration(ckpStart, types.MaxTs()) //Notice that the checkpoint duration is same among all mo system tables, //such as mo_databases, mo_tables, mo_columns. - e.GetLatestCatalogCache().UpdateDuration(ckpStart, types.MaxTs()) + if !lazyLoad { + e.GetLatestCatalogCache().UpdateDuration(ckpStart, types.MaxTs()) + } v2.LogtailUpdatePartitonUpdateTimestampsDurationHistogram.Observe(time.Since(t0).Seconds()) } } else { state.UpdateDuration(types.TS{}, types.MaxTs()) - e.GetLatestCatalogCache().UpdateDuration(types.TS{}, types.MaxTs()) + if !lazyLoad { + e.GetLatestCatalogCache().UpdateDuration(types.TS{}, types.MaxTs()) + } } } diff --git a/pkg/vm/engine/disttae/logtailreplay/partition.go b/pkg/vm/engine/disttae/logtailreplay/partition.go index 52e86a9c8711..815d7e25bad1 100644 --- a/pkg/vm/engine/disttae/logtailreplay/partition.go +++ b/pkg/vm/engine/disttae/logtailreplay/partition.go @@ -158,11 +158,10 @@ func (p *Partition) ConsumeCheckpoints( return nil } - //curState := p.state.Load() - //if len(curState.checkpoints) == 0 { - // p.UpdateDuration(types.TS{}, types.MaxTs()) - // return nil - //} + curState := p.state.Load() + if len(curState.checkpoints) == 0 { + return nil + } lockErr := p.Lock(ctx) if lockErr != nil { @@ -170,29 +169,18 @@ func (p *Partition) ConsumeCheckpoints( } defer p.Unlock() - curState := p.state.Load() - //if len(curState.checkpoints) == 0 { - // p.UpdateDuration(types.TS{}, types.MaxTs()) - // return nil - //} - - state := curState.Copy() - - if len(state.checkpoints) == 0 { - state.UpdateDuration(types.TS{}, types.MaxTs()) - if !p.state.CompareAndSwap(curState, state) { - panic("concurrent mutation") - } + curState = p.state.Load() + if len(curState.checkpoints) == 0 { return nil } + state := curState.Copy() + //consume checkpoints. if err := state.consumeCheckpoints(fn); err != nil { return err } - state.UpdateDuration(state.start, types.MaxTs()) - if !p.state.CompareAndSwap(curState, state) { panic("concurrent mutation") } From 9a96d86322d86562186cf88af072809c573a6915 Mon Sep 17 00:00:00 2001 From: triump2020 Date: Tue, 29 Oct 2024 21:07:35 +0800 Subject: [PATCH 2/2] updadate --- pkg/vm/engine/disttae/logtail_consumer.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/vm/engine/disttae/logtail_consumer.go b/pkg/vm/engine/disttae/logtail_consumer.go index 29a15135817f..478a9ecadb1f 100644 --- a/pkg/vm/engine/disttae/logtail_consumer.go +++ b/pkg/vm/engine/disttae/logtail_consumer.go @@ -1929,9 +1929,11 @@ func updatePartitionOfPush( if !ckpStart.IsEmpty() || !ckpEnd.IsEmpty() { t0 = time.Now() state.UpdateDuration(ckpStart, types.MaxTs()) - //Notice that the checkpoint duration is same among all mo system tables, - //such as mo_databases, mo_tables, mo_columns. - if !lazyLoad { + if lazyLoad { + state.AppendCheckpoint(tl.CkpLocation, partition) + } else { + //Notice that the checkpoint duration is same among all mo system tables, + //such as mo_databases, mo_tables, mo_columns. e.GetLatestCatalogCache().UpdateDuration(ckpStart, types.MaxTs()) } v2.LogtailUpdatePartitonUpdateTimestampsDurationHistogram.Observe(time.Since(t0).Seconds())