Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue=#1281: add observer implement, including executor, multi-thread scan and trigger calculation etc. #1288

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ MARK_SRC := src/benchmark/mark.cc src/benchmark/mark_main.cc
TEST_SRC := src/utils/test/prop_tree_test.cc src/utils/test/tprinter_test.cc \
src/io/test/tablet_io_test.cc src/io/test/tablet_scanner_test.cc \
src/master/test/master_impl_test.cc src/io/test/load_test.cc \
src/common/test/thread_pool_test.cc
src/common/test/thread_pool_test.cc \
src/observer/test/observer_impl_test.cc
OBSERVER_SRC := $(wildcard src/observer/executor/*.cc)
OBSERVER_DEMO_SRC := $(wildcard src/observer/observer_demo.cc)

TEST_OUTPUT := test_output
UNITTEST_OUTPUT := $(TEST_OUTPUT)/unittest
Expand All @@ -69,29 +72,33 @@ MONITOR_OBJ := $(MONITOR_SRC:.cc=.o)
MARK_OBJ := $(MARK_SRC:.cc=.o)
HTTP_OBJ := $(HTTP_SRC:.cc=.o)
TEST_OBJ := $(TEST_SRC:.cc=.o)
OBSERVER_OBJ := $(OBSERVER_SRC:.cc=.o)
OBSERVER_DEMO_OBJ := $(OBSERVER_DEMO_SRC:.cc=.o)
ALL_OBJ := $(MASTER_OBJ) $(TABLETNODE_OBJ) $(IO_OBJ) $(SDK_OBJ) $(PROTO_OBJ) \
$(JNI_TERA_OBJ) $(OTHER_OBJ) $(COMMON_OBJ) $(SERVER_OBJ) $(CLIENT_OBJ) \
$(TEST_CLIENT_OBJ) $(TERA_C_OBJ) $(MONITOR_OBJ) $(MARK_OBJ) $(TEST_OBJ) \
$(SERVER_WRAPPER_OBJ)
$(SERVER_WRAPPER_OBJ) \
$(OBSERVER_OBJ) $(OBSERVER_DEMO_OBJ)
LEVELDB_LIB := src/leveldb/libleveldb.a
LEVELDB_UTIL := src/leveldb/util/histogram.o src/leveldb/port/port_posix.o

PROGRAM = tera_main tera_master tabletserver teracli teramo tera_test
PROGRAM = tera_main tera_master tabletserver teracli teramo tera_test observer_demo
LIBRARY = libtera.a
SOLIBRARY = libtera.so
TERA_C_SO = libtera_c.so
JNILIBRARY = libjni_tera.so
BENCHMARK = tera_bench tera_mark
TESTS = prop_tree_test tprinter_test string_util_test tablet_io_test \
tablet_scanner_test fragment_test progress_bar_test master_impl_test load_test \
thread_pool_test
thread_pool_test observer_impl_test
OBSERVER_LIBRARY = libobserver.a

.PHONY: all clean cleanall test

all: $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(BENCHMARK)
all: $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(BENCHMARK) $(OBSERVER_LIBRARY)
mkdir -p build/include build/lib build/bin build/log build/benchmark
cp $(PROGRAM) build/bin
cp $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) build/lib
cp $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(OBSERVER_LIBRARY) build/lib
cp src/leveldb/tera_bench .
cp -r benchmark/*.sh benchmark/ycsb4tera/ $(BENCHMARK) build/benchmark
cp -r include build/
Expand All @@ -113,7 +120,7 @@ check: test
clean:
rm -rf $(ALL_OBJ) $(PROTO_OUT_CC) $(PROTO_OUT_H) $(TEST_OUTPUT)
$(MAKE) clean -C src/leveldb
rm -rf $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(BENCHMARK) $(TESTS) terahttp
rm -rf $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(BENCHMARK) $(TESTS) terahttp $(OBSERVER_LIBRARY)

cleanall:
$(MAKE) clean
Expand Down Expand Up @@ -162,7 +169,16 @@ src/leveldb/libleveldb.a: FORCE

tera_bench:

# Static archive built from the observer executor objects
# (OBSERVER_OBJ comes from src/observer/executor/*.cc).
libobserver.a: $(OBSERVER_OBJ)
$(AR) -rs $@ $^

# Demo binary: links the demo object against libobserver.a and libtera.a.
observer_demo : $(OBSERVER_DEMO_OBJ) $(OBSERVER_LIBRARY) $(LIBRARY)
$(CXX) -o $@ $^ $(LDFLAGS)

# unit test
# Observer unit test: links the test object against libobserver.a and libtera.a.
observer_impl_test: src/observer/test/observer_impl_test.o $(OBSERVER_LIBRARY) $(LIBRARY)
$(CXX) -o $@ $^ $(LDFLAGS)

thread_pool_test: src/common/test/thread_pool_test.o $(LIBRARY)
$(CXX) -o $@ $^ $(LDFLAGS)

Expand Down Expand Up @@ -216,8 +232,8 @@ proto: $(PROTO_OUT_CC) $(PROTO_OUT_H)

# install output into system directories
.PHONY: install
install: $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY)
install: $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(OBSERVER_LIBRARY)
mkdir -p $(INSTALL_PREFIX)/bin $(INSTALL_PREFIX)/include $(INSTALL_PREFIX)/lib
cp -rf $(PROGRAM) $(INSTALL_PREFIX)/bin
cp -rf include/* $(INSTALL_PREFIX)/include
cp -rf $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(INSTALL_PREFIX)/lib
cp -rf $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(OBSERVER_LIBRARY) $(INSTALL_PREFIX)/lib
38 changes: 38 additions & 0 deletions include/observer/executor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef OBSERVER_EXECUTOR_H_
#define OBSERVER_EXECUTOR_H_

#include "observer.h"

#pragma GCC visibility push(default)
namespace observer {

/// Executor: abstract interface that runs registered observers.
/// NOTE: the original comments in this class were mis-encoded; the English
/// text below is a best-effort translation checked against the visible code.
class Executor {
public:
// Factory for the concrete executor. The definition in executor_impl.cc
// returns a heap-allocated ExecutorImpl.
// NOTE(review): ownership presumably passes to the caller -- confirm.
static Executor* NewExecutor();

// Registers an observer that should be run by this executor.
// The visible implementation returns false when the tera client cannot be
// created or one of the observer's tables cannot be opened.
virtual bool RegisterObserver(Observer* observer) = 0;

// Start interface. The visible implementation blocks until a quit has been
// requested; it returns false when no observer is registered or the scanner
// fails to initialize.
virtual bool Run() = 0;

// Exit interface.
// NOTE(review): presumably makes Run() return; the implementation is not
// visible here -- confirm.
virtual void Quit() = 0;

Executor() {}
virtual ~Executor() {}

private:
// Copy constructor and assignment are declared private so that executors
// cannot be copied from outside the class.
Executor(const Executor&);
void operator=(const Executor&);
};

} // namespace observer
#pragma GCC visibility pop

#endif // OBSERVER_EXECUTOR_H_
72 changes: 72 additions & 0 deletions include/observer/observer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef OBSERVER_OBSERVER_H_
#define OBSERVER_OBSERVER_H_

#include <string>
#include <vector>
#include <map>
#include "tera.h"

#pragma GCC visibility push(default)
namespace observer {

// Identifies one observed column: table name + column family + qualifier.
// Used as the key of ordered containers, so operator< must be a strict weak
// ordering under which two columns are equivalent only if all three fields
// are equal.
struct Column {
    std::string table_name;
    std::string family;
    std::string qualifier;

    // Lexicographic comparison on (table_name, family, qualifier).
    //
    // The fields are compared one by one rather than by concatenating them:
    // concatenation erases the field boundaries, so e.g. ("ab", "c", "") and
    // ("a", "bc", "") would both become "abc" and be treated as the same key.
    // Comparing field-wise also avoids building two temporary strings on
    // every comparison.
    bool operator<(const Column& other) const {
        if (table_name != other.table_name) {
            return table_name < other.table_name;
        }
        if (family != other.family) {
            return family < other.family;
        }
        return qualifier < other.qualifier;
    }
};

// Ordered list of columns, as passed to the Observer constructor, Ack and Notify.
typedef std::vector<Column> ColumnList;
// <TableName, ColumnList>
// Columns grouped by table; the key is the table name that the executor
// passes to OpenTable when an observer is registered.
typedef std::map<std::string, ColumnList> ColumnMap;

/// Base class for user-defined observers.
/// NOTE: the original comments in this class were mis-encoded. Best-effort
/// reading of the original summary: an incremental, real-time trigger
/// computation framework for large-scale data, built on Tera cross-row
/// transactions.
class Observer {
public:
// Takes the observer's unique name and the columns it observes.
// NOTE(review): presumably groups observed_columns by table name into the
// ColumnMap returned by GetColumnMap() -- implementation not visible here.
Observer(const std::string& observer_name, ColumnList& observed_columns);
virtual ~Observer();

// Users implement this to receive changed data on an observed column and
// perform their computation. The visible caller (ExecutorImpl::DoNotify)
// invokes it from a thread-pool task with the fields of a scanned tuple.
virtual bool OnNotify(tera::Transaction* t,
tera::Table* table,
const std::string& row,
const Column& column,
const std::string& value,
int64_t timestamp);

// Users implement this for initialization. ExecutorImpl::Run calls it on
// every registered observer before the scanner is created.
virtual bool Init();

// Users implement this for cleanup. ExecutorImpl::Run calls it on every
// registered observer after the scanner has been closed.
virtual bool Close();

// Clears the notification for the given columns of `row`.
// NOTE(review): implementation not visible; presumably forwards to
// ExecutorImpl::SetOrClearNotification -- confirm.
bool Ack(ColumnList& columns, const std::string& row, int64_t timestamp);

// Sets the notification for the given columns of `row`, which triggers
// downstream observers.
// NOTE(review): implementation not visible -- confirm as for Ack().
bool Notify(ColumnList& columns, const std::string& row, int64_t timestamp);

// Accessors for the observer name and the per-table observed columns.
std::string GetName() const;
ColumnMap& GetColumnMap();
private:
// Copy constructor and assignment are declared private so that observers
// cannot be copied from outside the class.
Observer(const Observer&);
void operator=(const Observer&);

private:
std::string observer_name_;
ColumnMap column_map_;
};

} // namespace observer
#pragma GCC visibility pop

#endif // OBSERVER_OBSERVER_H_
192 changes: 192 additions & 0 deletions src/observer/executor/executor_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <functional>
#include <glog/logging.h>
#include <gflags/gflags.h>
#include "executor_impl.h"

// Flag-file path handed to tera::Client::NewClient when the shared client is created.
DECLARE_string(observer_tera_flag_file);
// Passed to the common::ThreadPool constructor for the notification pool
// (presumably the worker thread count -- confirm against ThreadPool).
DECLARE_int32(observer_proc_thread_num);
// Threshold compared against the pool's PendingNum() in ProcTaskPendingFull().
DECLARE_int64(observer_proc_pending_num_max);
// First argument of RowMutation::Put / DeleteColumns when notification marks
// are written or removed (presumably the column-family name -- confirm).
DECLARE_string(observer_notify_column_name);

namespace observer {

// File-wide tera client, created lazily by the first RegisterObserver call.
static tera::Client* g_tera_client = NULL;
// static Mutex g_table_mutex;
// Cache of opened tables keyed by table name; filled by RegisterObserver and
// SetOrClearNotification.
// NOTE(review): the mutex above is commented out, so this map is modified
// without any locking visible in this file -- confirm that all writers run on
// a single thread.
static TableMap g_table_map;

// Factory behind the Executor interface: hands back a freshly allocated
// ExecutorImpl through the base-class pointer.
Executor* Executor::NewExecutor() {
    Executor* executor = new ExecutorImpl();
    return executor;
}

// Builds an idle executor: quit flag cleared, a notification thread pool
// constructed from --observer_proc_thread_num, and no scanner yet (Run()
// creates it).
ExecutorImpl::ExecutorImpl()
    : quit_(false),
      proc_thread_pool_(new common::ThreadPool(FLAGS_observer_proc_thread_num)),
      scanner_(NULL) {
    // Start with empty bookkeeping containers.
    reduce_column_map_.clear();
    observer_map_.clear();
    observer_set_.clear();
}

// Destructor; the body is empty.
// NOTE(review): the constructor allocates proc_thread_pool_ with `new` and
// nothing is released here. If that member is a raw pointer (its declaration
// is not visible in this file) the pool is leaked -- confirm in
// executor_impl.h.
ExecutorImpl::~ExecutorImpl() {
}

// Registers `observer` so that Run() initializes it and Process() dispatches
// matching tuples to it.
//
// Steps:
//   1. Lazily create the shared tera client from --observer_tera_flag_file.
//   2. Open (and cache in g_table_map) every table the observer watches.
//   3. Only after all tables are available, record the observer in
//      observer_map_, reduce_column_map_ and observer_set_.
//
// Returns false, leaving this executor's registration state untouched, when
// `observer` is NULL, the client cannot be created, or a table cannot be
// opened. Tables that were opened before the failure stay in the cache.
// Registering the same observer twice is a no-op that returns true.
//
// NOTE(review): g_tera_client and g_table_map are file-level globals that are
// modified here without locking (the mutex is commented out in this file).
bool ExecutorImpl::RegisterObserver(Observer* observer) {
    if (NULL == observer) {
        LOG(ERROR) << "register observer failed: observer is NULL";
        return false;
    }
    // A second registration would append the observer to observer_map_ again
    // and make every change notify it twice.
    if (observer_set_.count(observer) > 0) {
        LOG(WARNING) << "observer [" << observer->GetName() << "] already registered";
        return true;
    }

    // init tera client
    tera::ErrorCode err;
    if (NULL == g_tera_client) {
        // Publish the client only once it is known to be usable, so that a
        // failed attempt is retried by the next registration instead of
        // leaving a broken client in the global.
        tera::Client* client = tera::Client::NewClient(FLAGS_observer_tera_flag_file, &err);
        if (NULL == client || tera::ErrorCode::kOK != err.GetType()) {
            LOG(ERROR) << "init tera client [" << FLAGS_observer_tera_flag_file << "] failed"
                << " err=" << err.GetReason();
            return false;
        }
        g_tera_client = client;
    }

    ColumnMap& column_map = observer->GetColumnMap();

    // Pass 1: make sure every observed table is open before touching any of
    // this executor's maps.
    for (ColumnMap::iterator it = column_map.begin(); it != column_map.end(); ++it) {
        // MutexLock locker(&g_table_mutex);
        if (g_table_map.end() != g_table_map.find(it->first)) {
            continue;
        }
        // init table
        tera::Table* new_table = g_tera_client->OpenTable(it->first, &err);
        if (NULL == new_table || tera::ErrorCode::kOK != err.GetType()) {
            LOG(ERROR) << "open tera table [" << it->first << "] failed"
                << " err=" << err.GetReason();
            return false;
        }
        LOG(INFO) << "open tera table [" << it->first << "] succ";

        // build map: <table_name, table>
        g_table_map[it->first] = new_table;
    }

    // Pass 2: nothing can fail any more, so commit the registration.
    for (ColumnMap::iterator it = column_map.begin(); it != column_map.end(); ++it) {
        tera::Table* table = g_table_map[it->first];
        const ColumnList& columns = it->second;
        for (size_t idx = 0; idx < columns.size(); ++idx) {
            // build map: <column, observers>
            observer_map_[columns[idx]].push_back(observer);

            // build map: <table, columnset>
            reduce_column_map_[table].insert(columns[idx]);
        }
    }
    observer_set_.insert(observer);

    return true;
}

bool ExecutorImpl::Run() {
if (0 == observer_map_.size()) {
LOG(ERROR) << "no observer, please register observers first";
return false;
}

// init observers (user definition)
for (ObserverSet::iterator it = observer_set_.begin(); it != observer_set_.end(); ++it) {
(*it)->Init();
}

// init scanner
scanner_ = new Scanner(this);
if (!scanner_->Init()) {
LOG(ERROR) << "init Scanner failed";
Quit();
return false;
}

while (!quit_) {
ThisThread::Sleep(1);
}

// close scanner
if (scanner_ != NULL) {
scanner_->Close();
delete scanner_;
}

// close observers (user definition)
for (ObserverSet::iterator it = observer_set_.begin(); it != observer_set_.end(); ++it) {
(*it)->Close();
}

return true;
}

// Dispatches one scanned tuple: every observer registered for the tuple's
// observed column gets a DoNotify task queued on the processing thread pool.
// Returns false (and logs) when no observer is registered for that column.
bool ExecutorImpl::Process(TuplePtr tuple) {
    ObserverMap::iterator match = observer_map_.find(tuple->observed_column);

    if (match != observer_map_.end()) {
        // One task per interested observer.
        size_t pos = 0;
        while (pos < match->second.size()) {
            proc_thread_pool_->AddTask(
                std::bind(&ExecutorImpl::DoNotify, this, tuple, match->second[pos]));
            ++pos;
        }
        return true;
    }

    LOG(ERROR) << "no match observers, table=" << tuple->observed_column.table_name <<
        " cf=" << tuple->observed_column.family << " qu=" << tuple->observed_column.qualifier;
    return false;
}

// Thread-pool task body: unpacks the tuple and forwards it to the observer's
// OnNotify callback, returning whatever the callback returns.
bool ExecutorImpl::DoNotify(TuplePtr tuple, Observer* observer) {
    const bool handled = observer->OnNotify(tuple->t, tuple->table, tuple->row,
                                            tuple->observed_column, tuple->value,
                                            tuple->timestamp);
    return handled;
}

// True when the processing pool has more pending tasks than
// --observer_proc_pending_num_max allows.
bool ExecutorImpl::ProcTaskPendingFull() {
    if (proc_thread_pool_->PendingNum() > FLAGS_observer_proc_pending_num_max) {
        return true;
    }
    return false;
}

// Writes (kSetNotification) or deletes (kClearNotification) the notification
// marks of `columns` on `row`.
//
// Columns are first grouped by table so that each table receives a single row
// mutation. Inside --observer_notify_column_name the mark for a column uses
// the qualifier "<family>+<qualifier>".
//
// Returns false when `type` is neither set nor clear, when a table that is not
// yet cached cannot be opened, or when any mutation reports an error. A failed
// mutation does not stop the remaining tables from being processed.
//
// NOTE(review): g_table_map is modified here without locking (the mutex is
// commented out in this file) -- confirm callers are serialized.
bool ExecutorImpl::SetOrClearNotification(ColumnList& columns,
                                          const std::string& row,
                                          int64_t timestamp,
                                          NotificationType type) {
    // Reject an unknown type before touching any table; otherwise an empty
    // mutation would be applied and the call could still report success.
    if (kSetNotification != type && kClearNotification != type) {
        LOG(ERROR) << "error notification type=" << type;
        return false;
    }
    const bool set_notification = (kSetNotification == type);

    bool ret = true;
    tera::ErrorCode err;
    // reduce columns
    ColumnReduceMap reduce_map;
    for (size_t idx = 0; idx < columns.size(); ++idx) {
        const std::string& table_name = columns[idx].table_name;
        tera::Table* table = NULL;
        // MutexLock locker(&g_table_mutex);
        TableMap::iterator it = g_table_map.find(table_name);
        if (g_table_map.end() != it) {
            table = it->second;
        } else {
            // Cache miss: open the table and use the new handle directly.
            // `it` is still end() on this path and must not be dereferenced.
            if (NULL == g_tera_client) {
                LOG(ERROR) << "open table failed, name=" << table_name <<
                    " err=tera client not initialized";
                return false;
            }
            table = g_tera_client->OpenTable(table_name, &err);
            if (NULL == table || tera::ErrorCode::kOK != err.GetType()) {
                LOG(ERROR) << "open table failed, name=" << table_name <<
                    " err=" << err.GetReason();
                return false;
            }
            g_table_map[table_name] = table;
        }
        reduce_map[table].insert(columns[idx]);
    }

    // set or clear notification columns
    ColumnReduceMap::iterator table_it = reduce_map.begin();
    for (; table_it != reduce_map.end(); ++table_it) {
        tera::RowMutation* mutation = table_it->first->NewRowMutation(row);
        ColumnSet::iterator column_it = table_it->second.begin();
        for (; column_it != table_it->second.end(); ++column_it) {
            std::string notify_qualifier = column_it->family + "+" + column_it->qualifier;
            if (set_notification) {
                mutation->Put(FLAGS_observer_notify_column_name, notify_qualifier, "1", timestamp);
            } else {
                // Best-effort translation of the original (mis-encoded)
                // comment: delete only the marks written before the
                // transaction's start timestamp, so that a mark set again in
                // the meantime is kept and no data is lost.
                mutation->DeleteColumns(FLAGS_observer_notify_column_name, notify_qualifier, timestamp);
            }
        }
        table_it->first->ApplyMutation(mutation);
        if (mutation->GetError().GetType() != tera::ErrorCode::kOK) {
            LOG(ERROR) << "set or clear notification failed, err=" << mutation->GetError().GetReason();
            ret = false;
        }
        delete mutation;
    }
    return ret;
}

} // namespace observer
Loading