Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BP-67: Support skipping compaction at busy times #4385

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.apache.bookkeeper.common.collections;

import java.util.ArrayList;
import java.util.List;

/**
* Represents a range of hours.
* <p> The range is defined as a comma-separated list of ranges. Each range is defined as a start hour and an end hour
* separated by a dash. For example, "0-5, 10-15" represents the range of hours from 0 to 5 and 10 to 15.
* <p> The range is inclusive, i.e. the start and end hours are included in the range.
* <p> The range is defined in 24-hour format, i.e. the hours are in the range of 0 to 23.
* <p> we require that the start hour is less than or equal to the end hour.
*/
public class HourRange {
private static final String RANGE_SEPARATOR = ",";
private static final String RANGE_PART_SEPARATOR = "-";
private List<Range> ranges;

public HourRange(String range) {
ranges = new ArrayList<>();
if (range == null || range.isEmpty()) {
return;
}
String[] parts = range.split(RANGE_SEPARATOR);
for (String part : parts) {
String[] rangeParts = part.split(RANGE_PART_SEPARATOR);
if (rangeParts.length != 2) {
throw new IllegalArgumentException("Invalid range: " + part);
}
int startHour = Integer.parseInt(rangeParts[0]);
int endHour = Integer.parseInt(rangeParts[1]);
ranges.add(new Range(startHour, endHour));
}
}

public boolean contains(int hour) {
for (Range range : ranges) {
if (range.contains(hour)) {
return true;
}
}
return false;
}

public static HourRange parse(String range) {
return new HourRange(range);
}
}

class Range {
private int startHour;
private int endHour;

public Range(int startHour, int endHour) {
if (startHour > endHour | startHour < 0 || startHour > 23 || endHour < 0 || endHour > 23) {
throw new IllegalArgumentException("Invalid hour range: " + startHour + " - " + endHour);
}
this.startHour = startHour;
this.endHour = endHour;
}

public boolean contains(int hour) {
return hour >= startHour && hour <= endHour;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.Executors;
Expand All @@ -43,6 +44,7 @@
import org.apache.bookkeeper.bookie.stats.GarbageCollectorStats;
import org.apache.bookkeeper.bookie.storage.EntryLogger;
import org.apache.bookkeeper.bookie.storage.ldb.PersistentEntryLogMetadataMap;
import org.apache.bookkeeper.common.collections.HourRange;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
Expand Down Expand Up @@ -125,6 +127,7 @@ public class GarbageCollectorThread implements Runnable {

private static final AtomicLong threadNum = new AtomicLong(0);
final AbstractLogCompactor.Throttler throttler;
final HourRange skipCompactionHourRange;

/**
* Create a garbage collector thread.
Expand Down Expand Up @@ -223,6 +226,7 @@ public void removeEntryLog(long logToRemove) {
}

this.throttler = new AbstractLogCompactor.Throttler(conf);
this.skipCompactionHourRange = conf.getSkipCompactionHourRange();
if (minorCompactionInterval > 0 && minorCompactionThreshold > 0) {
if (minorCompactionThreshold > 1.0d) {
throw new IOException("Invalid minor compaction threshold "
Expand Down Expand Up @@ -430,7 +434,7 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin
long curTime = System.currentTimeMillis();
if (((isForceMajorCompactionAllow && force) || (enableMajorCompaction
&& (force || curTime - lastMajorCompactionTime > majorCompactionInterval)))
&& (!suspendMajor)) {
&& (!suspendMajor) && (!skipCompactionHourRange.contains(LocalTime.now().getHour()))) {
// enter major compaction
LOG.info("Enter major compaction, suspendMajor {}", suspendMajor);
majorCompacting.set(true);
Expand All @@ -445,7 +449,7 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin
}
} else if (((isForceMinorCompactionAllow && force) || (enableMinorCompaction
&& (force || curTime - lastMinorCompactionTime > minorCompactionInterval)))
&& (!suspendMinor)) {
&& (!suspendMinor) && (!skipCompactionHourRange.contains(LocalTime.now().getHour()))) {
// enter minor compaction
LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor);
minorCompacting.set(true);
Expand Down Expand Up @@ -577,7 +581,7 @@ void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMet
}
if ((usage >= threshold
|| (maxTimeMillis > 0 && timeDiff.getValue() >= maxTimeMillis)
|| !running)) {
|| !running || skipCompactionHourRange.contains(LocalTime.now().getHour()))) {
// We allow the usage limit calculation to continue so that we get an accurate
// report of where the usage was prior to running compaction.
return;
Expand Down Expand Up @@ -607,7 +611,8 @@ void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMet
timeDiff.setValue(end.getValue() - start);
}

if ((maxTimeMillis > 0 && timeDiff.getValue() >= maxTimeMillis) || !running) {
if ((maxTimeMillis > 0 && timeDiff.getValue() >= maxTimeMillis) || !running
|| skipCompactionHourRange.contains(LocalTime.now().getHour())) {
// We allow the usage limit calculation to continue so that we get an accurate
// report of where the usage was prior to running compaction.
break stopCompaction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.SortedLedgerStorage;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.common.collections.HourRange;
import org.apache.bookkeeper.common.conf.ConfigDef;
import org.apache.bookkeeper.common.conf.ConfigException;
import org.apache.bookkeeper.common.conf.ConfigKey;
Expand Down Expand Up @@ -105,6 +106,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
protected static final String COMPACTION_RATE = "compactionRate";
protected static final String COMPACTION_RATE_BY_ENTRIES = "compactionRateByEntries";
protected static final String COMPACTION_RATE_BY_BYTES = "compactionRateByBytes";
protected static final String SKIP_COMPACTION_HOUR_RANGE = "skipCompactionHourRange";

// Gc Parameters
protected static final String GC_WAIT_TIME = "gcWaitTime";
Expand Down Expand Up @@ -2973,6 +2975,25 @@ public ServerConfiguration setCompactionRateByBytes(int rate) {
return this;
}

/**
* Get the hour range to skip compaction.
* @return hour range to skip compaction
*/
public HourRange getSkipCompactionHourRange() {
String range = getString(SKIP_COMPACTION_HOUR_RANGE, "");
return HourRange.parse(range);
}

/**
* Set the hour range to skip compaction.
* @param range
* @return ServerConfiguration
*/
public ServerConfiguration setSkipCompactionHourRange(String range) {
setProperty(SKIP_COMPACTION_HOUR_RANGE, range);
return this;
}

/**
* Should we remove pages from page cache after force write.
*
Expand Down