```go
for {
	needFlush := false
	needSavePos := false

	select {
	case v := <-r.syncCh:
		switch v := v.(type) {
		case posSaver:
			now := time.Now()
			if v.force || now.Sub(lastSavedTime) > 3*time.Second {
				lastSavedTime = now
				needFlush = true
				needSavePos = true
				pos = v.pos
			}
		case []*elastic.BulkRequest:
			reqs = append(reqs, v...)
			needFlush = len(reqs) >= bulkSize
		}
	case <-ticker.C:
		needFlush = true
	case <-r.ctx.Done():
		return
	}

	if needFlush {
		// TODO: retry some times?
		if err := r.doBulk(reqs); err != nil {
			log.Errorf("do ES bulk err %v, close sync", err)
			r.cancel()
			return
		}
		reqs = reqs[0:0]
	}

	if needSavePos {
		if err := r.master.Save(pos); err != nil {
			log.Errorf("save sync position %s err %v, close sync", pos, err)
			r.cancel()
			return
		}
	}
}
```
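When `r.doBulk` or `r.master.Save` returns an error, the sync loop logs it, calls `r.cancel()`, and exits directly. After that nothing reads from `r.syncCh`, so anything sending on it will block. The only signal is the log line; no alarm is raised. Please consider adding an alarm for this case.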