chore: support pg pitr (#361)
(cherry picked from commit d808afb)
wangyelei committed Mar 11, 2024
1 parent 42bed33 commit c4749e2
Showing 6 changed files with 463 additions and 0 deletions.
119 changes: 119 additions & 0 deletions addons/postgresql/dataprotection/common-scripts.sh
@@ -0,0 +1,119 @@
# log an info message
function DP_log() {
msg=$1
local curr_date=$(date -u '+%Y-%m-%d %H:%M:%S')
echo "${curr_date} INFO: $msg"
}

# log an error message
function DP_error_log() {
msg=$1
local curr_date=$(date -u '+%Y-%m-%d %H:%M:%S')
echo "${curr_date} ERROR: $msg"
}

# Get file names without extensions based on the incoming file path
function DP_get_file_name_without_ext() {
local fileName=$1
local file_without_ext=${fileName%.*}
echo $(basename ${file_without_ext})
}
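
# Example (hypothetical path):
#   DP_get_file_name_without_ext "/20240311/000000010000000000000003.zst"
#   # prints: 000000010000000000000003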

# Save backup status info file for syncing progress.
# timeFormat: %Y-%m-%dT%H:%M:%SZ
function DP_save_backup_status_info() {
local totalSize=$1
local startTime=$2
local stopTime=$3
local timeZone=$4
local extras=$5
local timeZoneStr=""
if [ ! -z "${timeZone}" ]; then
timeZoneStr=",\"timeZone\":\"${timeZone}\""
fi
if [ -z "${stopTime}" ];then
echo "{\"totalSize\":\"${totalSize}\"}" > ${DP_BACKUP_INFO_FILE}
elif [ -z "${startTime}" ];then
echo "{\"totalSize\":\"${totalSize}\",\"extras\":[${extras}],\"timeRange\":{\"end\":\"${stopTime}\"${timeZoneStr}}}" > ${DP_BACKUP_INFO_FILE}
else
echo "{\"totalSize\":\"${totalSize}\",\"extras\":[${extras}],\"timeRange\":{\"start\":\"${startTime}\",\"end\":\"${stopTime}\"${timeZoneStr}}}" > ${DP_BACKUP_INFO_FILE}
fi
}
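
# Example (illustrative values; DP_BACKUP_INFO_FILE is expected to be set by the caller):
#   DP_save_backup_status_info "4096" "2024-03-11T01:00:00Z" "2024-03-11T02:00:00Z" "+08:00"
#   # writes: {"totalSize":"4096","extras":[],"timeRange":{"start":"2024-03-11T01:00:00Z","end":"2024-03-11T02:00:00Z","timeZone":"+08:00"}}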


# Clean up expired logfiles.
# Default interval is 60s
# Default rootPath is /
function DP_purge_expired_files() {
local currentUnix="${1:?missing current unix}"
local last_purge_time="${2:?missing last_purge_time}"
local root_path=${3:-"/"}
local interval_seconds=${4:-60}
local diff_time=$((${currentUnix}-${last_purge_time}))
if [[ -z ${DP_TTL_SECONDS} || ${diff_time} -lt ${interval_seconds} ]]; then
return
fi
expiredUnix=$((${currentUnix}-${DP_TTL_SECONDS}))
files=$(datasafed list -f --recursive --older-than ${expiredUnix} ${root_path} )
for file in ${files[@]}
do
datasafed rm ${file}
echo ${file}
done
}
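
# Example (illustrative; assumes DP_TTL_SECONDS is exported, here a 7-day retention):
#   DP_TTL_SECONDS=604800
#   DP_purge_expired_files "$(date +%s)" "${last_purge_time}" "/" 600
#   # removes backend files older than now - DP_TTL_SECONDS and echoes each removed path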

# Analyze the start time of the earliest file from the datasafed backend,
# and record the file name in dp_oldest_file.info.
# If the oldest file is unchanged, reuse the local copy instead of pulling it again.
# This saves traffic.
function DP_analyze_start_time_from_datasafed() {
local oldest_file="${1:?missing oldest file}"
local get_start_time_from_file="${2:?missing get_start_time_from_file function}"
local datasafed_pull="${3:?missing datasafed_pull function}"
local info_file="${KB_BACKUP_WORKDIR}/dp_oldest_file.info"
mkdir -p ${KB_BACKUP_WORKDIR} && cd ${KB_BACKUP_WORKDIR}
if [ -f ${info_file} ]; then
last_oldest_file=$(cat ${info_file})
last_oldest_file_name=$(DP_get_file_name_without_ext ${last_oldest_file})
if [ "$last_oldest_file" == "${oldest_file}" ]; then
# oldest file unchanged; reuse the local copy
${get_start_time_from_file} $last_oldest_file_name
return
fi
# remove last oldest file
if [ -f ${last_oldest_file_name} ];then
rm -rf ${last_oldest_file_name}
fi
fi
# pull file
${datasafed_pull} ${oldest_file}
# record last oldest file
echo ${oldest_file} > ${info_file}
oldest_file_name=$(DP_get_file_name_without_ext ${oldest_file})
${get_start_time_from_file} ${oldest_file_name}
}
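
# Example wiring (as used by the PITR backup script): callers pass two callback
# names, one that prints the start time given a local file name and one that
# pulls the file from the backend:
#   DP_analyze_start_time_from_datasafed "/20240311/000000010000000000000003.zst" \
#     get_wal_log_start_time pull_wal_log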

# get the UTC offset for a time zone location, such as Asia/Shanghai
function getTimeZoneOffset() {
local timeZone=${1:?missing time zone}
if [[ $timeZone == "+"* ]] || [[ $timeZone == "-"* ]] ; then
echo ${timeZone}
return
fi
local currTime=$(TZ=UTC date)
local utcHour=$(TZ=UTC date -d "${currTime}" +"%H")
local zoneHour=$(TZ=${timeZone} date -d "${currTime}" +"%H")
# use base-10 arithmetic (hour strings such as "08" would otherwise be parsed as octal)
# and normalize across the UTC day boundary; assumes whole-hour offsets in [-11, +12]
local offset=$(( (10#${zoneHour} - 10#${utcHour} + 35) % 24 - 11 ))
if [ $offset -eq 0 ]; then
return
fi
symbol="+"
if [ $offset -lt 0 ]; then
symbol="-" && offset=${offset:1}
fi
if [ $offset -lt 10 ];then
offset="0${offset}"
fi
echo "${symbol}${offset}:00"
}
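
# Examples:
#   getTimeZoneOffset "Asia/Shanghai"   # prints: +08:00
#   getTimeZoneOffset "+08:00"          # prints: +08:00 (already an offset, echoed as-is)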

54 changes: 54 additions & 0 deletions addons/postgresql/dataprotection/postgresql-fetch-wal-log.sh
@@ -0,0 +1,54 @@

function get_wal_name() {
local fileName=$1
local file_without_ext=${fileName%.*}
echo $(basename $file_without_ext)
}

function fetch-wal-log(){
wal_destination_dir=$1
start_wal_name=$2
restore_time=$(date -d "$3" +%s)
pitr=$4
DP_log "PITR: $pitr"

exit_fetch_wal=0 && mkdir -p $wal_destination_dir
for dir_name in $(datasafed list /) ; do
if [[ $exit_fetch_wal -eq 1 ]]; then
exit 0
fi

# skip this directory if its latest WAL file predates the start WAL file
latest_wal=$(datasafed list ${dir_name} | tail -n 1)
latest_wal_name=$(get_wal_name ${latest_wal})
if [[ ${latest_wal_name} < $start_wal_name ]]; then
continue
fi

DP_log "start to fetch wal logs from ${dir_name}"
for file in $(datasafed list ${dir_name} | grep ".zst"); do
wal_name=$(get_wal_name ${file})
if [[ $wal_name < $start_wal_name ]]; then
continue
fi
if [[ $pitr != "true" && $file =~ ".history" ]]; then
# if not restored for pitr, only fetch the current timeline log
DP_log "exit for new timeline."
exit_fetch_wal=1
break
fi
DP_log "copying $wal_name"
# pull and decompress
datasafed pull -d zstd $file ${wal_destination_dir}/$wal_name

# check whether this WAL file contains a commit later than restore_time; if so, stop fetching
latest_commit_time=$(pg_waldump ${wal_destination_dir}/$wal_name --rmgr=Transaction 2>/dev/null |tail -n 1|awk -F ' COMMIT ' '{print $2}'|awk -F ';' '{print $1}')
timestamp=$(date -d "$latest_commit_time" +%s)
# compare numerically: inside [[ ]], ">" would be a string comparison
if [[ $latest_commit_time != "" && $timestamp -gt $restore_time ]]; then
DP_log "exit when reaching the target time log."
exit_fetch_wal=1
break
fi
done
done
}
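
# Example (illustrative values): fetch WALs into a scratch dir, starting from the
# newest segment in pg_wal, stopping at the first commit past the target time:
#   fetch-wal-log "/home/postgres/pitr" "000000010000000000000003" "2024-03-11 10:00:00" "true"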
154 changes: 154 additions & 0 deletions addons/postgresql/dataprotection/postgresql-pitr-backup.sh
@@ -0,0 +1,154 @@
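# Environment expected by this script (set by the DataProtection framework and
# the addon templates; listed for reference): DP_DB_PASSWORD, DP_DB_HOST,
# DP_DB_USER, DP_DATASAFED_BIN_PATH, DP_BACKUP_BASE_PATH, DP_BACKUP_INFO_FILE,
# DP_TTL_SECONDS, DP_TARGET_POD_NAME, VOLUME_DATA_DIR, LOG_DIR,
# LOG_ARCHIVE_SECONDS, SWITCH_WAL_INTERVAL_SECONDS, TARGET_POD_ROLE.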
export PGPASSWORD=${DP_DB_PASSWORD}
# use datasafed and default config
export WALG_DATASAFED_CONFIG=""
export PATH="$PATH:$DP_DATASAFED_BIN_PATH"
export DATASAFED_BACKEND_BASE_PATH="$DP_BACKUP_BASE_PATH"
export KB_BACKUP_WORKDIR=${VOLUME_DATA_DIR}/kb-backup

PSQL="psql -h ${DP_DB_HOST} -U ${DP_DB_USER} -d postgres"
global_last_switch_wal_time=$(date +%s)
global_last_purge_time=$(date +%s)
global_switch_wal_interval=300
global_stop_time=
global_old_size=0

if [[ ${SWITCH_WAL_INTERVAL_SECONDS} =~ ^[0-9]+$ ]];then
global_switch_wal_interval=${SWITCH_WAL_INTERVAL_SECONDS}
fi

global_backup_in_secondary=
if [ "${TARGET_POD_ROLE}" == "primary" ]; then
global_backup_in_secondary=f
elif [ "${TARGET_POD_ROLE}" == "secondary" ]; then
global_backup_in_secondary=t
fi

# clean up expired WAL files; runs at most once every 600s
function purge_expired_files() {
local currentUnix=$(date +%s)
info=$(DP_purge_expired_files ${currentUnix} ${global_last_purge_time} / 600)
if [ ! -z "${info}" ]; then
global_last_purge_time=${currentUnix}
DP_log "cleanup expired wal-log files: ${info}"
local TOTAL_SIZE=$(datasafed stat / | grep TotalSize | awk '{print $2}')
DP_save_backup_status_info "${TOTAL_SIZE}"
fi
}

# periodically force a WAL segment switch so the current segment becomes archivable
function switch_wal_log() {
local curr_time=$(date +%s)
local diff_time=$((${curr_time}-${global_last_switch_wal_time}))
if [[ ${diff_time} -lt ${global_switch_wal_interval} ]]; then
return
fi
LAST_TRANS=$(pg_waldump $(${PSQL} -Atc "select pg_walfile_name(pg_current_wal_lsn())") --rmgr=Transaction 2>/dev/null |tail -n 1)
if [ "${LAST_TRANS}" != "" ] && [ "$(find ${LOG_DIR}/archive_status/ -name '*.ready')" = "" ]; then
DP_log "start to switch wal file"
${PSQL} -c "select pg_switch_wal()"
for i in $(seq 1 60); do
if [ "$(find ${LOG_DIR}/archive_status/ -name '*.ready')" != "" ]; then
DP_log "switch wal file successfully"
break;
fi
sleep 1
done
fi
global_last_switch_wal_time=${curr_time}
}

# upload ready WAL segments to the backend
function upload_wal_log() {
local TODAY_INCR_LOG=$(date +%Y%m%d);
cd ${LOG_DIR}
for i in $(ls -tr ./archive_status/ | grep '\.ready$'); do
wal_name=${i%.*}
LOG_STOP_TIME=$(pg_waldump ${wal_name} --rmgr=Transaction 2>/dev/null | grep 'desc: COMMIT' |tail -n 1|awk -F ' COMMIT ' '{print $2}'|awk -F ';' '{print $1}')
if [[ ! -z $LOG_STOP_TIME ]];then
global_stop_time=$(date -d "${LOG_STOP_TIME}" -u '+%Y-%m-%dT%H:%M:%SZ')
fi
if [ -f ${wal_name} ]; then
DP_log "upload ${wal_name}"
datasafed push -z zstd ${wal_name} "/${TODAY_INCR_LOG}/${wal_name}.zst"
mv -f ./archive_status/${i} ./archive_status/${wal_name}.done;
fi
done
}
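
# Resulting backend layout (illustrative): segments are grouped by upload day
# and zstd-compressed, e.g.
#   /20240311/000000010000000000000003.zst
#   /20240311/000000010000000000000004.zst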

# get start time of the wal log
function get_wal_log_start_time() {
local file="${1:?missing wal log name to analyze}"
local START_TIME=$(pg_waldump $file --rmgr=Transaction 2>/dev/null | grep 'desc: COMMIT' |head -n 1|awk -F ' COMMIT ' '{print $2}'|awk -F ';' '{print $1}')
if [[ ! -z ${START_TIME} ]];then
START_TIME=$(date -d "${START_TIME}" -u '+%Y-%m-%dT%H:%M:%SZ')
echo $START_TIME
fi
}
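
# Sample pg_waldump line this parses (exact format may vary across PG versions):
#   rmgr: Transaction ... desc: COMMIT 2024-03-11 01:23:45.678901 UTC; ...
# The text between " COMMIT " and the first ";" is taken as the timestamp.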

# pull wal log and decompress to KB_BACKUP_WORKDIR dir
function pull_wal_log() {
file="${1:?missing file name to pull}"
# pull and decompress
fileName=$(basename ${file})
datasafed pull -d zstd ${file} "$(DP_get_file_name_without_ext ${fileName})"
}

# get the start time for backup.status.timeRange
function get_start_time_for_range() {
local OLDEST_FILE=$(datasafed list -f --recursive / -o json | jq -s -r '.[] | sort_by(.mtime) | .[] | .path' |head -n 1)
if [ ! -z ${OLDEST_FILE} ]; then
START_TIME=$(DP_analyze_start_time_from_datasafed ${OLDEST_FILE} get_wal_log_start_time pull_wal_log)
echo ${START_TIME}
fi
}

# save backup status info to sync file
function save_backup_status() {
local TOTAL_SIZE=$(datasafed stat / | grep TotalSize | awk '{print $2}')
# if no size changes, return
if [[ -z ${TOTAL_SIZE} || ${TOTAL_SIZE} -eq 0 || ${TOTAL_SIZE} == ${global_old_size} ]];then
return
fi
global_old_size=${TOTAL_SIZE}
local START_TIME=$(get_start_time_for_range)
DP_save_backup_status_info "${TOTAL_SIZE}" "${START_TIME}" "${global_stop_time}"
}

function check_pg_process() {
local is_ok=false
for ((i=1;i<4;i++));do
is_secondary=$(${PSQL} -Atc "select pg_is_in_recovery()")
if [[ $? -eq 0 && (-z ${global_backup_in_secondary} || "${global_backup_in_secondary}" == "${is_secondary}") ]]; then
is_ok=true
break
fi
DP_error_log "target backup pod/${DP_TARGET_POD_NAME} is not OK, target role: ${TARGET_POD_ROLE}, pg_is_in_recovery: ${is_secondary}, retrying..."
sleep 1
done
if [[ ${is_ok} == "false" ]];then
DP_error_log "target backup pod/${DP_TARGET_POD_NAME} is not OK, target role: ${TARGET_POD_ROLE}, pg_is_in_recovery: ${is_secondary}!"
exit 1
fi
}
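
# Role contract (for reference): `select pg_is_in_recovery()` returns "f" on a
# primary and "t" on a secondary, which must match global_backup_in_secondary
# when TARGET_POD_ROLE is set.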

# trap term signal
trap "echo 'Terminating...' && sync && exit 0" TERM
DP_log "start to archive wal logs"
while true; do

# check if pg process is ok
check_pg_process

# switch wal log
switch_wal_log

# upload wal log
upload_wal_log

# save backup status which will be updated to `backup` CR by the sidecar
save_backup_status

# purge the expired wal logs
purge_expired_files
sleep ${LOG_ARCHIVE_SECONDS}
done
35 changes: 35 additions & 0 deletions addons/postgresql/dataprotection/postgresql-pitr-restore.sh
@@ -0,0 +1,35 @@
# use datasafed and default config
export WALG_DATASAFED_CONFIG=""
export PATH="$PATH:$DP_DATASAFED_BIN_PATH"
export DATASAFED_BACKEND_BASE_PATH="$DP_BACKUP_BASE_PATH"

if [[ -d ${DATA_DIR}.old ]] && [[ ! -d ${DATA_DIR} ]]; then
# if dataDir.old exists but dataDir does not, a previous restore was interrupted; roll back and retry
mv ${DATA_DIR}.old ${DATA_DIR}
exit 0;
fi

mkdir -p ${PITR_DIR};

latest_wal=$(ls ${DATA_DIR}/pg_wal -lI "*.history" | grep ^- | awk '{print $9}' | sort | tail -n 1)
start_wal_log=$(basename $latest_wal)

DP_log "fetch-wal-log ${PITR_DIR} ${start_wal_log} \"${DP_RESTORE_TIME}\" true"
fetch-wal-log ${PITR_DIR} ${start_wal_log} "${DP_RESTORE_TIME}" true

chmod 777 -R ${PITR_DIR};
touch ${DATA_DIR}/recovery.signal;
mkdir -p ${CONF_DIR};
chmod 777 -R ${CONF_DIR};
mkdir -p ${RESTORE_SCRIPT_DIR};
echo "#!/bin/bash" > ${RESTORE_SCRIPT_DIR}/kb_restore.sh;
echo "[[ -d '${DATA_DIR}.old' ]] && mv -f ${DATA_DIR}.old/* ${DATA_DIR}/;" >> ${RESTORE_SCRIPT_DIR}/kb_restore.sh;
echo "sync;" >> ${RESTORE_SCRIPT_DIR}/kb_restore.sh;
chmod +x ${RESTORE_SCRIPT_DIR}/kb_restore.sh;
echo "restore_command='case "%f" in *history) cp ${PITR_DIR}/%f %p ;; *) mv ${PITR_DIR}/%f %p ;; esac'" > ${CONF_DIR}/recovery.conf;
echo "recovery_target_time='${DP_RESTORE_TIME}'" >> ${CONF_DIR}/recovery.conf;
echo "recovery_target_action='promote'" >> ${CONF_DIR}/recovery.conf;
echo "recovery_target_timeline='latest'" >> ${CONF_DIR}/recovery.conf;
mv ${DATA_DIR} ${DATA_DIR}.old;
DP_log "done.";
sync;