Skip to content

Commit

Permalink
Merge pull request #1 from dr7ana/liblokinet-cherrypick
Browse files Browse the repository at this point in the history
Merging cherry-picks back to testnet branch:
-  oxen-io#2164
-  oxen-io#2134
  • Loading branch information
dr7ana authored Apr 28, 2023
2 parents 74e0fc2 + aea631b commit a288917
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 13 deletions.
9 changes: 4 additions & 5 deletions llarp/iwp/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ namespace llarp
void
Session::EncryptAndSend(ILinkSession::Packet_t data)
{
if (m_State == State::Closed)
return;
m_EncryptNext.emplace_back(std::move(data));
TriggerPump();
if (!IsEstablished())
Expand Down Expand Up @@ -179,12 +181,9 @@ namespace llarp
return;
auto close_msg = CreatePacket(Command::eCLOS, 0, 16, 16);
m_Parent->UnmapAddr(m_RemoteAddr);
m_State = State::Closed;
if (m_SentClosed.test_and_set())
return;
EncryptAndSend(std::move(close_msg));

LogInfo(m_Parent->PrintableName(), " closing connection to ", m_RemoteAddr);
m_State = State::Closed;
}

bool
Expand Down Expand Up @@ -355,7 +354,7 @@ namespace llarp
bool
Session::TimedOut(llarp_time_t now) const
{
if (m_State == State::Ready)
if (m_State == State::Ready || m_State == State::LinkIntro)
{
return now > m_LastRX
&& now - m_LastRX
Expand Down
1 change: 0 additions & 1 deletion llarp/iwp/session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ namespace llarp

std::atomic_flag m_PlaintextEmpty;
llarp::thread::Queue<CryptoQueue_t> m_PlaintextRecv;
std::atomic_flag m_SentClosed;

void
EncryptWorker(CryptoQueue_t msgs);
Expand Down
3 changes: 1 addition & 2 deletions llarp/lokinet_shared.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -715,8 +715,7 @@ extern "C"
return;
}

auto on_open = [ctx, localAddr, remote, open_cb](
bool success, void* user_data) {
auto on_open = [ctx, localAddr, remote, open_cb](bool success, void* user_data) {
llarp::log::info(
logcat,
"Quic tunnel {}<->{}.",
Expand Down
8 changes: 7 additions & 1 deletion llarp/quic/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,13 @@ namespace llarp::quic
ngtcp2_pkt_info pi;

auto written = ngtcp2_conn_write_connection_close(
conn, &conn.path.path, &pi, u8data(conn.conn_buffer), conn.conn_buffer.size(), &err, get_timestamp());
conn,
&conn.path.path,
&pi,
u8data(conn.conn_buffer),
conn.conn_buffer.size(),
&err,
get_timestamp());
if (written <= 0)
{
log::warning(
Expand Down
2 changes: 1 addition & 1 deletion llarp/quic/tunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ namespace llarp::quic
log::info(logcat, "EOF on connection to {}:{}", c.peer().ip, c.peer().port);
if (auto stream = c.data<Stream>())
{
stream->set_eof(); // CloseEvent will send graceful shutdown to other end
stream->set_eof(); // CloseEvent will send graceful shutdown to other end
}
c.close();
});
Expand Down
19 changes: 16 additions & 3 deletions llarp/router/router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,23 @@ namespace llarp
_lastTick = llarp::time_now_ms();
m_NextExploreAt = Clock_t::now();
m_Pump = _loop->make_waker([this]() { PumpLL(); });
m_Work = _loop->make_waker([this]() { submit_work(); });
}

Router::~Router()
{
llarp_dht_context_free(_dht);
}

void
Router::submit_work()
{
m_lmq->job([work = std::move(m_WorkJobs)]() {
for (const auto& job : work)
job();
});
}

void
Router::PumpLL()
{
Expand Down Expand Up @@ -482,8 +492,8 @@ namespace llarp
LogError("RC is invalid, not saving");
return false;
}
if (m_isServiceNode)
_nodedb->Put(_rc);
if (IsServiceNode())
_nodedb->Put(rc());
QueueDiskIO([&]() { HandleSaveRC(); });
return true;
}
Expand Down Expand Up @@ -1631,7 +1641,10 @@ namespace llarp
void
Router::QueueWork(std::function<void(void)> func)
{
m_lmq->job(std::move(func));
_loop->call([this, func = std::move(func)]() mutable {
m_WorkJobs.push_back(std::move(func));
m_Work->Trigger();
});
}

void
Expand Down
8 changes: 8 additions & 0 deletions llarp/router/router.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ namespace llarp
path::BuildLimiter m_PathBuildLimiter;

std::shared_ptr<EventLoopWakeup> m_Pump;
std::shared_ptr<EventLoopWakeup> m_Work;
std::vector<std::function<void()>> m_WorkJobs;

/// submits cpu heavy work from last event loop tick cycle to worker threads.
void
submit_work();

path::BuildLimiter&
pathBuildLimiter() override
Expand Down Expand Up @@ -196,9 +202,11 @@ namespace llarp
return _vpnPlatform.get();
}

/// queue functionally pure cpu heavy work to be done in another thread.
void
QueueWork(std::function<void(void)> func) override;

/// queue disk io bound work to be done in the disk io thread.
void
QueueDiskIO(std::function<void(void)> func) override;

Expand Down

0 comments on commit a288917

Please sign in to comment.