Skip to content

Commit

Permalink
draft: implement the prepare fiber canceling
Browse files Browse the repository at this point in the history
  • Loading branch information
palage4a committed Jul 28, 2023
1 parent d66c390 commit a76d521
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 7 deletions.
38 changes: 32 additions & 6 deletions cartridge/twophase.lua
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ vars:new('locks', {})
vars:new('prepared_config', nil)
vars:new('prepared_config_release_notification', fiber.cond())
vars:new('on_patch_triggers', {})
vars:new('prepare_fiber', nil)

vars:new('options', {
netbox_call_timeout = 1,
Expand All @@ -49,6 +50,7 @@ vars:new('options', {
local function release_config_lock()
local prepared_config = vars.prepared_config
vars.prepared_config = nil
vars.prepare_fiber = nil
vars.prepared_config_release_notification:broadcast()
return prepared_config
end
Expand Down Expand Up @@ -134,6 +136,7 @@ end
-- @treturn[2] nil
-- @treturn[2] table Error description
local function prepare_2pc(upload_id)
vars.prepare_fiber = fiber.self()
local data
if type(upload_id) == 'table' then
-- Preserve compatibility with older versions.
Expand Down Expand Up @@ -171,15 +174,16 @@ local function prepare_2pc(upload_id)

local workdir = confapplier.get_workdir()
local path_prepare = fio.pathjoin(workdir, 'config.prepare')

if vars.prepared_config ~= nil then
local err = Prepare2pcError:new('Two-phase commit is locked')
log.warn('%s', err)
return nil, err
end

local ok, err = ClusterwideConfig.save(clusterwide_config, path_prepare)
if not ok and fio.path.exists(path_prepare) then
local prepare_path_exists = fio.path.exists(path_prepare)
fiber.testcancel()
if not ok and prepare_path_exists then
err = Prepare2pcError:new('Two-phase commit is locked')
end

Expand Down Expand Up @@ -267,13 +271,37 @@ local function commit_2pc()
end
end

local function cancel_prepare_fiber()
local f = vars.prepare_fiber
if not f then
log.warn("Prepare fiber was not created")
return
end
if f:status() == 'dead' then
return
end
f:set_joinable(true)
local ok, err
ok, err = pcall(f.cancel, f)
if not ok then
log.warn("Cancel prepare fiber error: " .. err)
return
end
ok, err = pcall(f.join, f)
if not ok then
log.warn("Join prepare fiber error: " .. err)
return
end
end

--- Two-phase commit - abort stage.
--
-- Release the lock for further commit attempts.
-- @function abort_2pc
-- @local
-- @treturn boolean true
local function abort_2pc()
cancel_prepare_fiber()
local workdir = confapplier.get_workdir()
local path_prepare = fio.pathjoin(workdir, 'config.prepare')
ClusterwideConfig.remove(path_prepare)
Expand Down Expand Up @@ -440,7 +468,6 @@ local function twophase_commit(opts)
end

local _2pc_error
local abortion_list = {}
local activity_name = opts.activity_name or 'twophase_commit'

goto prepare
Expand Down Expand Up @@ -472,7 +499,6 @@ local function twophase_commit(opts)
for _, uri in ipairs(opts.uri_list) do
if retmap[uri] then
log.warn('Prepared for %s at %s', activity_name, uri)
table.insert(abortion_list, uri)
end
end
for _, uri in ipairs(opts.uri_list) do
Expand Down Expand Up @@ -523,11 +549,11 @@ local function twophase_commit(opts)
log.warn('(2PC) %s abort phase...', activity_name)

local retmap, errmap = pool.map_call(opts.fn_abort, nil,{
uri_list = abortion_list,
uri_list = opts.uri_list,
timeout = vars.options.netbox_call_timeout,
})

for _, uri in ipairs(abortion_list) do
for _, uri in ipairs(opts.uri_list) do
if retmap[uri] then
log.warn('Aborted %s at %s', activity_name, uri)
else
Expand Down
11 changes: 10 additions & 1 deletion cartridge/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,14 @@ local function mktree(path)
for _, dir in ipairs(dirs) do
current_dir = fio.pathjoin(current_dir, dir)
local stat = fio.stat(current_dir)
fiber.testcancel()
if stat == nil then
local _, err = fio.mkdir(current_dir)
fiber.testcancel()
local _errno = errno()
if err ~= nil and not fio.path.is_dir(current_dir) then
local is_dir = fio.path.is_dir(current_dir)
fiber.testcancel()
if err ~= nil and not is_dir then
return nil, errors.new('MktreeError',
'Error creating directory %q: %s',
current_dir, errno.strerror(_errno)
Expand Down Expand Up @@ -208,21 +212,26 @@ local function file_write(path, data, opts, perm)
opts = opts or {'O_CREAT', 'O_WRONLY', 'O_TRUNC'}
perm = perm or tonumber(644, 8)
local file = fio.open(path, opts, perm)
fiber.testcancel()
if file == nil then
return nil, OpenFileError:new('%s: %s', path, errno.strerror())
end

local res = file:write(data)
fiber.testcancel()
if not res then
local err = WriteFileError:new('%s: %s', path, errno.strerror())
fio.unlink(path)
fiber.testcancel()
return nil, err
end

local res = file:close()
fiber.testcancel()
if not res then
local err = WriteFileError:new('%s: %s', path, errno.strerror())
fio.unlink(path)
fiber.testcancel()
return nil, err
end

Expand Down
15 changes: 15 additions & 0 deletions test/integration/twophase_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,18 @@ function g.test_timeouts()
t.assert_equals(twophase.get_apply_config_timeout(), 111)
end)
end

-- Check we don't lock a clusterwide config updates
-- after exceeding of `validate_timeout_config` timeout
-- see https://github.com/tarantool/cartridge/issues/2119
function g.test_2pc_is_locked_after_prepare_timeout()
g.s1:exec(function()
local t = require('luatest')
local twophase = require('cartridge.twophase')
twophase.set_validate_config_timeout(0.001)
twophase.patch_clusterwide({})
twophase.set_validate_config_timeout(10) -- default
local _, err = twophase.patch_clusterwide({})
t.assert_not(err)
end)
end

0 comments on commit a76d521

Please sign in to comment.