Skip to content

Commit

Permalink
Merge pull request #33 from magenbluten/develop
Browse files Browse the repository at this point in the history
some minor refactoring and increased memcaps
  • Loading branch information
satta authored May 3, 2019
2 parents 11cfb2b + 1f3c668 commit 867af8c
Show file tree
Hide file tree
Showing 11 changed files with 435 additions and 132 deletions.
13 changes: 7 additions & 6 deletions backend/balboa-backend-console/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,24 @@ CC=$(CROSS_PREFIX)$(CCOMPILER)
hdr-lib=bs.h trace.h protocol.h engine.h mpack-config.h
hdr-lib-y=$(addprefix ../lib/,$(hdr-lib))

src-console-read-y=../lib/mpack.c ../lib/trace.c ../lib/protocol.c main.c
src-console=mpack.c trace.c protocol.c engine.c
src-console-y=$(addprefix ../lib/,$(src-console)) main.c

target-console-read-y=$(OUT)$(CROSS_PREFIX)balboa-backend-console
target-console-y=$(OUT)$(CROSS_PREFIX)balboa-backend-console

dirs-y=.

all: $(target-console-read-y)
all: $(target-console-y)

$(OUT)build:
@echo " mkdir"
$(Q)mkdir -p $(addprefix $(OUT),$(dirs-y))
$(Q)touch $@

$(target-console-read-y): Makefile $(OUT)build $(src-console-read-y) $(hdr-lib-y) Makefile
$(CC) $(CFLAGS) $(src-console-read-y) -o $(target-console-read-y) $(LDFLAGS)
$(target-console-y): Makefile $(OUT)build $(src-console-y) $(hdr-lib-y) Makefile
$(CC) $(CFLAGS) $(src-console-y) -o $(target-console-y) $(LDFLAGS)

clean:
rm -f $(target-console-read-y)
rm -f $(target-console-y)
rm -f $(OUT)build
rmdir $(OUT)
141 changes: 134 additions & 7 deletions backend/balboa-backend-console/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ static ssize_t dump_process(state_t* state, FILE* is) {
case -1: return (entries);
default:
L(log_error("blb_dump_stream_decode() failed with `%d`", rc));
blb_protocol_dump_stream_teardown(stream);
return (-entries);
}
}
blb_protocol_dump_stream_teardown(stream);
return (entries);
}

Expand Down Expand Up @@ -91,9 +93,9 @@ static int dump(state_t* state, const char* dump_file) {
}
}

static int dump_entry_json_cb(state_t* state, protocol_entry_t* entry) {
assert(state->os != NULL);
bytestring_sink_t __sink = bs_sink(state->scrtch0, state->scrtch0_sz);
static void dump_entry_as_json(
FILE* os, uint8_t* p, size_t p_sz, const protocol_entry_t* entry) {
bytestring_sink_t __sink = bs_sink(p, p_sz);
bytestring_sink_t* sink = &__sink;
int ok = 0;
char buf[64] = {0};
Expand All @@ -120,12 +122,15 @@ static int dump_entry_json_cb(state_t* state, protocol_entry_t* entry) {
ok += bs_append1(sink, '}');
ok += bs_append1(sink, '\n');
if(ok == 0) {
fwrite(sink->p, sink->index, 1, state->os);
return (0);
fwrite(sink->p, sink->index, 1, os);
} else {
fputs("{\"error\":\"buffer-out-of-space\"}", state->os);
return (-1);
fputs("{\"error\":\"buffer-out-of-space\"}", os);
}
}

static int dump_entry_json_cb(state_t* state, protocol_entry_t* entry) {
ASSERT(state->os != NULL);
dump_entry_as_json(state->os, state->scrtch0, state->scrtch0_sz, entry);
return (0);
}

Expand Down Expand Up @@ -156,6 +161,124 @@ static int dump_entry_replay_cb(state_t* state, protocol_entry_t* entry) {
return (0);
}

static int main_query(int argc, char** argv) {
engine_config_t engine_config = blb_engine_client_config_init();
trace_config_t trace_config = {.stream = stderr,
.host = "pdns",
.app = "balboa-backend-console",
// leaking process number ...
.procid = getpid()};
protocol_query_request_t __query = {0}, *query = &__query;
query->limit = 100;
ketopt_t opt = KETOPT_INIT;
int c;
while((c = ketopt(&opt, argc, argv, 1, "h:p:r:d:s:l:vSR", NULL)) >= 0) {
switch(c) {
case 'v': trace_config.verbosity += 1; break;
case 'h': engine_config.host = opt.arg; break;
case 'p': engine_config.port = atoi(opt.arg); break;
case 'd':
if(opt.arg == NULL) {
L(log_emergency("string for option `-d` required"));
}
query->qrdata = opt.arg;
query->qrdata_len = strlen(opt.arg);
break;
case 'r':
if(opt.arg == NULL) {
L(log_emergency("string for option `-r` required"));
}
query->qrrname = opt.arg;
query->qrrname_len = strlen(opt.arg);
break;
case 's':
if(opt.arg == NULL) {
L(log_emergency("string for option `-s` required"));
}
query->qsensorid = opt.arg;
query->qsensorid_len = strlen(opt.arg);
break;
default: break;
}
}

theTrace_stream_use(&trace_config);

conn_t* conn = blb_engine_client_new(&engine_config);
engine_t* engine = conn->engine;

V(blb_protocol_log_query(query));

ssize_t used = blb_protocol_encode_query_request(
query, conn->scrtch, ENGINE_CONN_SCRTCH_SZ);
if(used <= 0) {
L(log_error("unable to encode query"));
blb_engine_teardown(engine);
blb_engine_conn_teardown(conn);
return (-1);
}

int rc = blb_conn_write_all(conn, conn->scrtch, used);
if(rc != 0) {
L(log_debug("blb_conn_write_all() failed"));
blb_engine_teardown(engine);
blb_engine_conn_teardown(conn);
return (-1);
}

protocol_stream_t* stream = blb_engine_stream_new(conn);
if(stream == NULL) {
L(log_error("blb_engine_stream_new() failed"));
blb_engine_teardown(engine);
blb_engine_conn_teardown(conn);
return (-1);
}

enum state_t { START, STREAM, END };

enum state_t st = START;
while(1) {
protocol_message_t msg;
int rc = blb_protocol_stream_decode(stream, &msg);
if(rc < -1) {
L(log_error("blb_protocol_stream_decode() failed"));
blb_protocol_stream_teardown(stream);
blb_engine_teardown(engine);
blb_engine_conn_teardown(conn);
return (-1);
} else if(rc == -1) {
break;
}
switch(st) {
case START: {
switch(msg.ty) {
case PROTOCOL_QUERY_STREAM_START_RESPONSE: st = STREAM; break;
default: L(log_emergency("(start) received invalid message"));
}
break;
}
case STREAM: {
switch(msg.ty) {
case PROTOCOL_QUERY_STREAM_END_RESPONSE: st = END; goto done;
case PROTOCOL_QUERY_STREAM_DATA_RESPONSE: {
uint8_t json[1024 * 10];
dump_entry_as_json(stdout, json, sizeof(json), &msg.u.entry);
break;
}
default: L(log_emergency("(stream) received invalid message"));
}
break;
}
default: L(log_emergency("invalid state: impossible"));
}
}
done:
blb_protocol_stream_teardown(stream);
blb_engine_teardown(engine);
blb_engine_conn_teardown(conn);
return (0);
}

static int main_jsonize(int argc, char** argv) {
const char* dump_file = "-";
int verbosity = 0;
Expand Down Expand Up @@ -392,6 +515,10 @@ int main(int argc, char** argv) {
argc--;
argv++;
res = main_dump(argc, argv);
} else if(strcmp(argv[1], "query") == 0) {
argc--;
argv++;
res = main_query(argc, argv);
} else if(strcmp(argv[1], "--version") == 0) {
version();
} else {
Expand Down
31 changes: 14 additions & 17 deletions backend/balboa-mock/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,40 @@
#include <unistd.h>

int main(int argc, char** argv) {
int verbosity = 0;
int daemonize = 0;
char* host = "127.0.0.1";
int port = 4242;
int conn_throttle_limit = 64;
engine_config_t engine_config = blb_engine_server_config_init();
trace_config_t trace_config = {.stream = stderr,
.host = "pdns",
.app = argv[0],
// leaking process number ...
.procid = getpid()};

.procid = getpid(),
.verbosity = 0};
ketopt_t opt = KETOPT_INIT;
int c;
while((c = ketopt(&opt, argc, argv, 1, "j:l:p:vD", NULL)) >= 0) {
while((c = ketopt(&opt, argc, argv, 1, "j:l:p:vDSR", NULL)) >= 0) {
switch(c) {
case 'D': daemonize = 1; break;
case 'l': host = opt.arg; break;
case 'p': port = atoi(opt.arg); break;
case 'v': verbosity += 1; break;
case 'j': conn_throttle_limit = atoi(opt.arg); break;
case 'l': engine_config.host = opt.arg; break;
case 'p': engine_config.port = atoi(opt.arg); break;
case 'v': trace_config.verbosity += 1; break;
case 'j': engine_config.conn_throttle_limit = atoi(opt.arg); break;
case 'S': engine_config.enable_signal_consumer = false; break;
case 'R': engine_config.enable_stats_reporter = false; break;
default: break;
}
}

theTrace_stream_use(&trace_config);
if(daemonize) {
if(daemonize
&& (trace_config.stream == stderr || trace_config.stream == stdout)) {
theTrace_set_verbosity(0);
} else {
theTrace_set_verbosity(verbosity);
}

blb_engine_signals_init();

db_t* db = blb_mock_open();
if(db == NULL) { return (1); }

engine_t* e = blb_engine_new(db, host, port, conn_throttle_limit);
engine_config.db = db;
engine_t* e = blb_engine_server_new(&engine_config);
if(e == NULL) {
L(log_error("unable to create engine"));
blb_dbi_teardown(db);
Expand Down
56 changes: 28 additions & 28 deletions backend/balboa-rocksdb/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ Usage: balboa-rocksdb [options]\n\
-l listen address (default: 127.0.0.1)\n\
-p listen port (default: 4242)\n\
-v increase verbosity; can be passed multiple times\n\
-S disable signal handling\n\
-R disable engine stats reporter\n\
-j connection throttle limit, maximum concurrent connections (default: 64)\n\
--membudget <memory-in-bytes> rocksdb membudget option (value: %" PRIu64
")\n\
Expand All @@ -47,17 +49,15 @@ Usage: balboa-rocksdb [options]\n\
}

int main(int argc, char** argv) {
int verbosity = 0;
int daemonize = 0;
char* host = "127.0.0.1";
int port = 4242;
int conn_throttle_limit = 64;
blb_rocksdb_config_t config = blb_rocksdb_config_init();
blb_rocksdb_config_t rocksdb_config = blb_rocksdb_config_init();
engine_config_t engine_config = blb_engine_server_config_init();
trace_config_t trace_config = {.stream = stderr,
.host = "pdns",
.app = "balboa-rocksdb",
// leaking process number ...
.procid = getpid()};
.procid = getpid(),
.verbosity = 0};
ketopt_t opt = KETOPT_INIT;
static ko_longopt_t opts[] = {
{"membudget", ko_required_argument, 301},
Expand All @@ -69,42 +69,42 @@ int main(int argc, char** argv) {
{"version", ko_no_argument, 307},
{NULL, 0, 0}};
int c;
while((c = ketopt(&opt, argc, argv, 1, "j:d:l:p:vDh", opts)) >= 0) {
while((c = ketopt(&opt, argc, argv, 1, "j:d:l:p:vDSRh", opts)) >= 0) {
switch(c) {
case 'D': daemonize = 1; break;
case 'd': config.path = opt.arg; break;
case 'l': host = opt.arg; break;
case 'p': port = atoi(opt.arg); break;
case 'v': verbosity += 1; break;
case 'j': conn_throttle_limit = atoi(opt.arg); break;
case 'h': usage(&config);
case 301: config.membudget = atoll(opt.arg); break;
case 302: config.parallelism = atoi(opt.arg); break;
case 303: config.max_log_file_size = atoi(opt.arg); break;
case 304: config.max_open_files = atoi(opt.arg); break;
case 305: config.keep_log_file_num = atoi(opt.arg); break;
case 306: config.path = opt.arg; break;
case 'd': rocksdb_config.path = opt.arg; break;
case 'l': engine_config.host = opt.arg; break;
case 'p': engine_config.port = atoi(opt.arg); break;
case 'v': trace_config.verbosity += 1; break;
case 'j': engine_config.conn_throttle_limit = atoi(opt.arg); break;
case 'S': engine_config.enable_signal_consumer = false; break;
case 'R': engine_config.enable_stats_reporter = false; break;
case 'h': usage(&rocksdb_config);
case 301: rocksdb_config.membudget = atoll(opt.arg); break;
case 302: rocksdb_config.parallelism = atoi(opt.arg); break;
case 303: rocksdb_config.max_log_file_size = atoi(opt.arg); break;
case 304: rocksdb_config.max_open_files = atoi(opt.arg); break;
case 305: rocksdb_config.keep_log_file_num = atoi(opt.arg); break;
case 306: rocksdb_config.path = opt.arg; break;
case 307: version();
default: usage(&config);
default: usage(&rocksdb_config);
}
}

theTrace_stream_use(&trace_config);
if(daemonize) {
if(daemonize
&& (trace_config.stream == stderr || trace_config.stream == stdout)) {
theTrace_set_verbosity(0);
} else {
theTrace_set_verbosity(verbosity);
}

blb_engine_signals_init();

db_t* db = blb_rocksdb_open(&config);
db_t* db = blb_rocksdb_open(&rocksdb_config);
if(db == NULL) {
L(log_error("unable to open rocksdb at path `%s`", config.path));
L(log_error("unable to open rocksdb at path `%s`", rocksdb_config.path));
return (1);
}

engine_t* e = blb_engine_new(db, host, port, conn_throttle_limit);
engine_config.db = db;
engine_t* e = blb_engine_server_new(&engine_config);
if(e == NULL) {
L(log_error("unable to create engine"));
blb_dbi_teardown(db);
Expand Down
7 changes: 4 additions & 3 deletions backend/balboa-sqlite/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,21 +85,22 @@ int main(int argc, char** argv) {
theTrace_set_verbosity(verbosity);
}

blb_engine_signals_init();

db_t* db = blb_sqlite_open(&config);
if(db == NULL) {
L(log_error("unable to open sqlite database at `%s`", config.path));
return (1);
}

engine_t* e = blb_engine_new(db, host, port, conn_throttle_limit);
engine_t* e = blb_engine_server_new(db, host, port, conn_throttle_limit);
if(e == NULL) {
L(log_error("unable to create io engine"));
blb_dbi_teardown(db);
return (1);
}

blb_engine_spawn_signal_consumer(e);
blb_engine_spawn_stats_reporter(e);

blb_engine_run(e);

blb_engine_teardown(e);
Expand Down
Loading

0 comments on commit 867af8c

Please sign in to comment.