Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for :ssl connections #74

Open
wants to merge 7 commits into
base: v1.0.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,26 +58,39 @@ jobs:
run: mix test --cover

ES-21:
name: ES 21.6.0
name: ES 21.6.0 with SSL
runs-on: ubuntu-latest
strategy:
matrix:
elixir: ['1.12.3']
erlang: ['24.1']

services:
es:
image: eventstore/eventstore:21.6.0-buster-slim
ports: ['1113:1113']
env:
EVENTSTORE_RUN_PROJECTIONS: "All"
EVENTSTORE_START_STANDARD_PROJECTIONS: "true"
EVENTSTORE_CLUSTER_SIZE: 1
EVENTSTORE_EXT_TCP_PORT: 1113
EVENTSTORE_INSECURE: "true"
EVENTSTORE_ENABLE_EXTERNAL_TCP: "true"
env:
EXTREME_CACERTFILE: /certs/ca/ca.crt

steps:
- name: Generate EventStoreDB SSL certificates
run: |
sudo mkdir -p /certs
sudo chmod a+rw /certs
docker run \
--entrypoint /bin/bash \
--mount type=bind,source=/certs,target=/certs \
eventstore/es-gencert-cli:1.0.2 \
-c "mkdir -p ./certs && cd /certs && es-gencert-cli create-ca && es-gencert-cli create-node -out ./node -ip-addresses 127.0.0.1 -dns-names localhost"
- name: Start the EventStoreDB container
run: |
docker run \
--name eventstore \
--detach \
--mount type=bind,source=/certs,target=/etc/eventstore/certs \
--publish 1113:1113 \
eventstore/eventstore:21.6.0-buster-slim \
--run-projections=All \
--enable-external-tcp=true \
--trusted-root-certificates-path=/etc/eventstore/certs/ca \
--certificate-file=/etc/eventstore/certs/node/node.crt \
--certificate-private-key-file=/etc/eventstore/certs/node/node.key \
--advertise-host-to-client-as=127.0.0.1
- uses: actions/checkout@v2
- name: Set up Elixir
uses: erlef/setup-beam@v1
Expand All @@ -103,4 +116,4 @@ jobs:
- name: Check formatting
run: mix format --check-formatted
- name: Run tests
run: mix test --cover --exclude=authentication
run: mix test --cover --exclude=gossip
30 changes: 23 additions & 7 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,26 @@ config :ex_unit,
assert_receive_timeout: 10_000,
capture_log: true

config :extreme, TestConn,
db_type: "node",
host: "localhost",
port: "1113",
username: "admin",
password: "changeit",
connection_name: "extreme_test"
transport_opts =
if cacertfile = System.get_env("EXTREME_CACERTFILE") do
[
transport: :ssl,
transport_opts: [
verify: :verify_peer,
cacertfile: cacertfile
]
]
else
[]
end

config :extreme,
TestConn,
[
db_type: "node",
host: "localhost",
port: "1113",
username: "admin",
password: "changeit",
connection_name: "extreme_test"
] ++ transport_opts
20 changes: 14 additions & 6 deletions lib/extreme/cluster_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,31 @@ defmodule Extreme.ClusterConnection do

require Logger

def gossip_with(nodes, gossip_timeout, mode)
def gossip_with(nodes, opts)

def gossip_with([], _, _), do: {:error, :no_more_gossip_seeds}
def gossip_with([], _opts), do: {:error, :no_more_gossip_seeds}

def gossip_with([node | rest_nodes], opts) do
mode = Keyword.fetch!(opts, :mode)
scheme = if Keyword.fetch!(opts, :transport) == :ssl, do: 'https', else: 'http'
url = '#{scheme}://#{node.host}:#{node.port}/gossip?format=json'

request_opts = [
timeout: Keyword.fetch!(opts, :timeout),
ssl: Keyword.fetch!(opts, :transport_opts)
]

def gossip_with([node | rest_nodes], gossip_timeout, mode) do
url = 'http://#{node.host}:#{node.port}/gossip?format=json'
Logger.info("Gossip with #{url}")

case :httpc.request(:get, {url, []}, [timeout: gossip_timeout], []) do
case :httpc.request(:get, {url, []}, request_opts, []) do
{:ok, {{_version, 200, _status}, _headers, body}} ->
body
|> Jason.decode!()
|> _choose_node(mode)

error ->
Logger.error("Error getting gossip: #{inspect(error)}")
gossip_with(rest_nodes, gossip_timeout, mode)
gossip_with(rest_nodes, opts)
end
end

Expand Down
23 changes: 11 additions & 12 deletions lib/extreme/configuration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,9 @@ defmodule Extreme.Configuration do
do: {:ok, _get_host(configuration), _get_port(configuration)}

defp _get_node(:cluster, configuration) do
gossip_timeout = Keyword.get(configuration, :gossip_timeout, 1_000)
mode = Keyword.get(configuration, :mode, :write)

configuration
|> Keyword.fetch!(:nodes)
|> ClusterConnection.gossip_with(gossip_timeout, mode)
|> ClusterConnection.gossip_with(gossip_opts(configuration))
end

defp _get_node(:cluster_dns, configuration) do
Expand All @@ -66,18 +63,11 @@ defmodule Extreme.Configuration do
|> Keyword.get(:port, 2113)
|> Tools.cast_to_integer()

gossip_timeout =
configuration
|> Keyword.get(:gossip_timeout, 1_000)
|> Tools.cast_to_integer()

mode = Keyword.get(configuration, :mode, :write)

ips
|> Enum.map(fn ip ->
%{host: to_string(:inet.ntoa(ip)), port: gossip_port}
end)
|> ClusterConnection.gossip_with(gossip_timeout, mode)
|> ClusterConnection.gossip_with(gossip_opts(configuration))
end

# Returns `:host` value from `configuration` as charlist.
Expand All @@ -95,4 +85,13 @@ defmodule Extreme.Configuration do
|> Keyword.fetch!(:port)
|> Tools.cast_to_integer()
end

defp gossip_opts(configuration) do
[
mode: Keyword.get(configuration, :mode, :write),
timeout: Keyword.get(configuration, :gossip_timeout, 1_000) |> Tools.cast_to_integer(),
transport: Keyword.get(configuration, :transport, :tcp),
transport_opts: Keyword.get(configuration, :transport_opts, [])
]
end
end
67 changes: 55 additions & 12 deletions lib/extreme/connection.ex
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
defmodule Extreme.Connection do
use GenServer
alias Extreme.{Configuration, Tcp, RequestManager}
alias Extreme.{Configuration, RequestManager}
alias Extreme.ConnectionImpl, as: Impl
require Logger

defmodule State do
defstruct ~w(base_name socket received_data)a
defstruct ~w(base_name socket received_data transport)a
end

def start_link(base_name, configuration),
Expand All @@ -18,23 +18,50 @@ defmodule Extreme.Connection do
|> GenServer.cast({:execute, message})
end

@doc """
Opens a connection with EventStore. Returns `{:ok, socket}` on success or
`{:error, :max_attempt_exceeded}` if connection wasn't made in `:max_attempts`
provided in `configuration`. If not specified, `max_attempts` defaults to :infinity
"""
def connect(host, port, configuration, attempt \\ 1) do
configuration
|> Keyword.get(:max_attempts, :infinity)
|> case do
:infinity -> true
max when attempt <= max -> true
_any -> false
end
|> if do
if attempt > 1 do
configuration
|> Keyword.get(:reconnect_delay, 1_000)
|> :timer.sleep()
end

_connect(host, port, configuration, attempt)
else
{:error, :max_attempt_exceeded}
end
end

@impl true
def init({base_name, configuration}) do
GenServer.cast(self(), {:connect, configuration, 1})

state = %State{
base_name: base_name,
received_data: ""
received_data: "",
transport: Keyword.get(configuration, :transport, :tcp)
}

{:ok, state}
end

@impl true
def handle_cast({:connect, configuration, attempt}, state) do
configuration
|> _connect(attempt)
|> case do
{:ok, host, port} = Configuration.get_node(configuration)

case connect(host, port, configuration, attempt) do
{:ok, socket} ->
Logger.info(fn -> "Successfully connected to EventStore" end)

Expand All @@ -58,7 +85,7 @@ defmodule Extreme.Connection do
end

@impl true
def handle_info({:tcp, socket, pkg}, %State{socket: socket} = state) do
def handle_info({tag, socket, pkg}, %State{socket: socket} = state) when tag in [:tcp, :ssl] do
{:ok, state} = Impl.receive_package(pkg, state)
{:noreply, state}
end
Expand All @@ -72,10 +99,26 @@ defmodule Extreme.Connection do
RequestManager.kill_all_subscriptions(state.base_name)
end

defp _connect(configuration, attempt) do
{:ok, host, port} = Configuration.get_node(configuration)
Tcp.connect(host, port, configuration, attempt)
end

def _name(base_name), do: Module.concat(base_name, Connection)

defp _connect(host, port, configuration, attempt) do
Logger.info(fn -> "Connecting Extreme to #{host}:#{port}" end)

transport_module =
case Keyword.get(configuration, :transport, :tcp) do
:tcp -> :gen_tcp
:ssl -> :ssl
end

opts = Keyword.get(configuration, :transport_opts, []) ++ [:binary, active: :once]

case transport_module.connect(host, port, opts) do
{:ok, socket} ->
{:ok, socket}

reason ->
Logger.warn(fn -> "Error connecting to EventStore: #{inspect(reason)}" end)
connect(host, port, configuration, attempt + 1)
end
end
end
19 changes: 15 additions & 4 deletions lib/extreme/connection_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,26 @@ defmodule Extreme.ConnectionImpl do

require Logger

def execute(message, %State{socket: socket}),
do: :gen_tcp.send(socket, message)
def execute(message, %State{transport: :tcp, socket: socket}) do
:gen_tcp.send(socket, message)
end

def execute(message, %State{transport: :ssl, socket: socket}) do
:ssl.send(socket, message)
end

def receive_package(pkg, %State{socket: socket, received_data: received_data} = state) do
:inet.setopts(socket, active: :once)
def receive_package(pkg, %State{received_data: received_data} = state) do
set_active_once(state)
state = _process_package(state, received_data <> pkg)
{:ok, state}
end

defp set_active_once(%State{transport: :tcp, socket: socket}),
do: :inet.setopts(socket, active: :once)

defp set_active_once(%State{transport: :ssl, socket: socket}),
do: :ssl.setopts(socket, active: :once)

defp _process_package(
state,
<<message_length::32-unsigned-little-integer, content::binary-size(message_length),
Expand Down
49 changes: 0 additions & 49 deletions lib/extreme/tcp.ex

This file was deleted.

2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ defmodule Extreme.Mixfile do

def application do
[
extra_applications: [:logger, :inets]
extra_applications: [:logger, :inets, :ssl]
]
end

Expand Down
6 changes: 4 additions & 2 deletions test/extreme/cluster_connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ defmodule Extreme.ClusterConnectionTest do
assert {:ok, 'localhost', 1113} =
Extreme.ClusterConnection.gossip_with(
[%{host: "0.0.0.0", port: "2113"}],
20_000,
:write
timeout: 20_000,
mode: :write,
transport: :tcp,
transport_opts: []
)
end
end
Expand Down
Loading