diff --git a/Makefile b/Makefile index 2644902ef..570bf08d9 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,10 @@ MARK_SRC := src/benchmark/mark.cc src/benchmark/mark_main.cc TEST_SRC := src/utils/test/prop_tree_test.cc src/utils/test/tprinter_test.cc \ src/io/test/tablet_io_test.cc src/io/test/tablet_scanner_test.cc \ src/master/test/master_impl_test.cc src/io/test/load_test.cc \ - src/common/test/thread_pool_test.cc + src/common/test/thread_pool_test.cc \ + src/observer/test/observer_impl_test.cc +OBSERVER_SRC := $(wildcard src/observer/executor/*.cc) +OBSERVER_DEMO_SRC := $(wildcard src/observer/observer_demo.cc) TEST_OUTPUT := test_output UNITTEST_OUTPUT := $(TEST_OUTPUT)/unittest @@ -69,14 +72,17 @@ MONITOR_OBJ := $(MONITOR_SRC:.cc=.o) MARK_OBJ := $(MARK_SRC:.cc=.o) HTTP_OBJ := $(HTTP_SRC:.cc=.o) TEST_OBJ := $(TEST_SRC:.cc=.o) +OBSERVER_OBJ := $(OBSERVER_SRC:.cc=.o) +OBSERVER_DEMO_OBJ := $(OBSERVER_DEMO_SRC:.cc=.o) ALL_OBJ := $(MASTER_OBJ) $(TABLETNODE_OBJ) $(IO_OBJ) $(SDK_OBJ) $(PROTO_OBJ) \ $(JNI_TERA_OBJ) $(OTHER_OBJ) $(COMMON_OBJ) $(SERVER_OBJ) $(CLIENT_OBJ) \ $(TEST_CLIENT_OBJ) $(TERA_C_OBJ) $(MONITOR_OBJ) $(MARK_OBJ) $(TEST_OBJ) \ - $(SERVER_WRAPPER_OBJ) + $(SERVER_WRAPPER_OBJ) \ + $(OBSERVER_OBJ) $(OBSERVER_DEMO_OBJ) LEVELDB_LIB := src/leveldb/libleveldb.a LEVELDB_UTIL := src/leveldb/util/histogram.o src/leveldb/port/port_posix.o -PROGRAM = tera_main tera_master tabletserver teracli teramo tera_test +PROGRAM = tera_main tera_master tabletserver teracli teramo tera_test observer_demo LIBRARY = libtera.a SOLIBRARY = libtera.so TERA_C_SO = libtera_c.so @@ -84,14 +90,15 @@ JNILIBRARY = libjni_tera.so BENCHMARK = tera_bench tera_mark TESTS = prop_tree_test tprinter_test string_util_test tablet_io_test \ tablet_scanner_test fragment_test progress_bar_test master_impl_test load_test \ - thread_pool_test + thread_pool_test observer_impl_test +OBSERVER_LIBRARY = libobserver.a .PHONY: all clean cleanall test -all: $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(BENCHMARK) +all: $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(BENCHMARK) $(OBSERVER_LIBRARY) mkdir -p build/include build/lib build/bin build/log build/benchmark cp $(PROGRAM) build/bin - cp $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) build/lib + cp $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(OBSERVER_LIBRARY) build/lib cp src/leveldb/tera_bench . cp -r benchmark/*.sh benchmark/ycsb4tera/ $(BENCHMARK) build/benchmark cp -r include build/ @@ -113,7 +120,7 @@ check: test clean: rm -rf $(ALL_OBJ) $(PROTO_OUT_CC) $(PROTO_OUT_H) $(TEST_OUTPUT) $(MAKE) clean -C src/leveldb - rm -rf $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(BENCHMARK) $(TESTS) terahttp + rm -rf $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(BENCHMARK) $(TESTS) terahttp $(OBSERVER_LIBRARY) cleanall: $(MAKE) clean @@ -162,7 +169,16 @@ src/leveldb/libleveldb.a: FORCE tera_bench: +libobserver.a: $(OBSERVER_OBJ) + $(AR) -rs $@ $^ + +observer_demo : $(OBSERVER_DEMO_OBJ) $(OBSERVER_LIBRARY) $(LIBRARY) + $(CXX) -o $@ $^ $(LDFLAGS) + # unit test +observer_impl_test: src/observer/test/observer_impl_test.o $(OBSERVER_LIBRARY) $(LIBRARY) + $(CXX) -o $@ $^ $(LDFLAGS) + thread_pool_test: src/common/test/thread_pool_test.o $(LIBRARY) $(CXX) -o $@ $^ $(LDFLAGS) @@ -216,8 +232,8 @@ proto: $(PROTO_OUT_CC) $(PROTO_OUT_H) # install output into system directories .PHONY: install -install: $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) +install: $(PROGRAM) $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(OBSERVER_LIBRARY) mkdir -p $(INSTALL_PREFIX)/bin $(INSTALL_PREFIX)/include $(INSTALL_PREFIX)/lib cp -rf $(PROGRAM) $(INSTALL_PREFIX)/bin cp -rf include/* $(INSTALL_PREFIX)/include - cp -rf $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(INSTALL_PREFIX)/lib + cp -rf $(LIBRARY) $(SOLIBRARY) $(TERA_C_SO) $(JNILIBRARY) $(OBSERVER_LIBRARY) $(INSTALL_PREFIX)/lib diff --git a/include/observer/executor.h b/include/observer/executor.h new file mode 100644 index 000000000..7b8acbd4e --- /dev/null +++ b/include/observer/executor.h @@ -0,0 +1,38 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef OBSERVER_EXECUTOR_H_ +#define OBSERVER_EXECUTOR_H_ + +#include "observer.h" + +#pragma GCC visibility push(default) +namespace observer { + +/// 执行器 +class Executor { +public: + static Executor* NewExecutor(); + + // 注册需要运行的Observer + virtual bool RegisterObserver(Observer* observer) = 0; + + // 启动接口 + virtual bool Run() = 0; + + // 退出接口 + virtual void Quit() = 0; + + Executor() {} + virtual ~Executor() {} + +private: + Executor(const Executor&); + void operator=(const Executor&); +}; + +} // namespace observer +#pragma GCC visibility pop + +#endif // OBSERVER_EXECUTOR_H_ diff --git a/include/observer/observer.h b/include/observer/observer.h new file mode 100644 index 000000000..fe5db73de --- /dev/null +++ b/include/observer/observer.h @@ -0,0 +1,72 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef OBSERVER_OBSERVER_H_ +#define OBSERVER_OBSERVER_H_ + +#include +#include +#include +#include "tera.h" + +#pragma GCC visibility push(default) +namespace observer { + +struct Column { + std::string table_name; + std::string family; + std::string qualifier; + bool operator<(const Column& other) const { + std::string str1 = table_name + family + qualifier; + std::string str2 = other.table_name + other.family + other.qualifier; + return str1 < str2; + } +}; + +typedef std::vector ColumnList; +// +typedef std::map ColumnMap; + +/// 基于Tera跨行事务, 实现大规模表格上增量实时触发计算框架 +class Observer { +public: + // 传入观察者唯一标示名以及被观察列 + Observer(const std::string& observer_name, ColumnList& observed_columns); + virtual ~Observer(); + + // 用户实现此接口拿到观察列上变化数据, 完成计算 + virtual bool OnNotify(tera::Transaction* t, + tera::Table* table, + const std::string& row, + const Column& column, + const std::string& value, + int64_t timestamp); + + // 用户实现此接口做初始化操作 + virtual bool Init(); + + // 用户实现此接口做结束操作 + virtual bool Close(); + + // 清除通知 + bool Ack(ColumnList& columns, const std::string& row, int64_t timestamp); + + // 设置通知, 触发下游observer + bool Notify(ColumnList& columns, const std::string& row, int64_t timestamp); + + std::string GetName() const; + ColumnMap& GetColumnMap(); +private: + Observer(const Observer&); + void operator=(const Observer&); + +private: + std::string observer_name_; + ColumnMap column_map_; +}; + +} // namespace observer +#pragma GCC visibility pop + +#endif // ONSERVER_OBSERVER_H_ diff --git a/src/observer/executor/executor_impl.cc b/src/observer/executor/executor_impl.cc new file mode 100644 index 000000000..033993029 --- /dev/null +++ b/src/observer/executor/executor_impl.cc @@ -0,0 +1,192 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include +#include +#include +#include "executor_impl.h" + +DECLARE_string(observer_tera_flag_file); +DECLARE_int32(observer_proc_thread_num); +DECLARE_int64(observer_proc_pending_num_max); +DECLARE_string(observer_notify_column_name); + +namespace observer { + +static tera::Client* g_tera_client = NULL; +// static Mutex g_table_mutex; +static TableMap g_table_map; + +Executor* Executor::NewExecutor() { + return new ExecutorImpl(); +} + +ExecutorImpl::ExecutorImpl() : quit_(false), + proc_thread_pool_(new common::ThreadPool(FLAGS_observer_proc_thread_num)), + scanner_(NULL) { + observer_set_.clear(); + observer_map_.clear(); + reduce_column_map_.clear(); +} + +ExecutorImpl::~ExecutorImpl() { +} + +bool ExecutorImpl::RegisterObserver(Observer* observer) { + observer_set_.insert(observer); + + // init tera client + tera::ErrorCode err; + if (NULL == g_tera_client) { + g_tera_client = tera::Client::NewClient(FLAGS_observer_tera_flag_file, &err); + if (tera::ErrorCode::kOK != err.GetType()) { + LOG(ERROR) << "init tera client [" << FLAGS_observer_tera_flag_file << "] failed"; + return false; + } + } + + ColumnMap& column_map = observer->GetColumnMap(); + ColumnMap::iterator it = column_map.begin(); + for (; it != column_map.end(); ++it) { + // MutexLock locker(&g_table_mutex); + if (g_table_map.end() == g_table_map.find(it->first)) { + // init table + tera::Table* new_table = g_tera_client->OpenTable(it->first, &err); + if (tera::ErrorCode::kOK != err.GetType()) { + LOG(ERROR) << "open tera table [" << it->first << "] failed"; + return false; + } + LOG(INFO) << "open tera table [" << it->first << "] succ"; + + // build map: + g_table_map[it->first] = new_table; + } + for (size_t idx = 0; idx < it->second.size(); ++idx) { + // build map: + observer_map_[it->second[idx]].push_back(observer); + + // build map: + reduce_column_map_[g_table_map[it->first]].insert(it->second[idx]); + } + } + + return true; +} + +bool ExecutorImpl::Run() { + if (0 == observer_map_.size()) { + LOG(ERROR) << "no observer, please register observers first"; + return false; + } + + // init observers (user definition) + for (ObserverSet::iterator it = observer_set_.begin(); it != observer_set_.end(); ++it) { + (*it)->Init(); + } + + // init scanner + scanner_ = new Scanner(this); + if (!scanner_->Init()) { + LOG(ERROR) << "init Scanner failed"; + Quit(); + return false; + } + + while (!quit_) { + ThisThread::Sleep(1); + } + + // close scanner + if (scanner_ != NULL) { + scanner_->Close(); + delete scanner_; + } + + // close observers (user definition) + for (ObserverSet::iterator it = observer_set_.begin(); it != observer_set_.end(); ++it) { + (*it)->Close(); + } + + return true; +} + +bool ExecutorImpl::Process(TuplePtr tuple) { + // find observers + ObserverMap::iterator it = observer_map_.find(tuple->observed_column); + if (observer_map_.end() == it) { + LOG(ERROR) << "no match observers, table=" << tuple->observed_column.table_name << + " cf=" << tuple->observed_column.family << " qu=" << tuple->observed_column.qualifier; + return false; + } + + // notify observers + for (size_t idx = 0; idx < it->second.size(); ++idx) { + proc_thread_pool_->AddTask(std::bind(&ExecutorImpl::DoNotify, this, tuple, it->second[idx])); + } + + return true; +} + +bool ExecutorImpl::DoNotify(TuplePtr tuple, Observer* observer) { + return observer->OnNotify(tuple->t, + tuple->table, + tuple->row, + tuple->observed_column, + tuple->value, + tuple->timestamp); +} + +bool ExecutorImpl::ProcTaskPendingFull() { + return (proc_thread_pool_->PendingNum() > FLAGS_observer_proc_pending_num_max); +} + +bool ExecutorImpl::SetOrClearNotification(ColumnList& columns, + const std::string& row, + int64_t timestamp, + NotificationType type) { + bool ret = true; + tera::ErrorCode err; + // reduce columns + ColumnReduceMap reduce_map; + for (size_t idx = 0; idx < columns.size(); ++idx) { + // MutexLock locker(&g_table_mutex); + TableMap::iterator it = g_table_map.find(columns[idx].table_name); + if (g_table_map.end() == it) { + tera::Table* new_table = g_tera_client->OpenTable(columns[idx].table_name, &err); + if (tera::ErrorCode::kOK != err.GetType()) { + LOG(ERROR) << "open table failed, name=" << columns[idx].table_name << + " err=" << err.GetReason(); + return false; + } + g_table_map[columns[idx].table_name] = new_table; + } + reduce_map[it->second].insert(columns[idx]); + } + // set or clear notification columns + ColumnReduceMap::iterator table_it = reduce_map.begin(); + for (; table_it != reduce_map.end(); ++table_it) { + tera::RowMutation* mutation = table_it->first->NewRowMutation(row); + ColumnSet::iterator column_it = table_it->second.begin(); + for (; column_it != table_it->second.end(); ++column_it) { + std::string notify_qualifier = column_it->family + "+" + column_it->qualifier; + if (kSetNotification == type) { + mutation->Put(FLAGS_observer_notify_column_name, notify_qualifier, "1", timestamp); + } else if (kClearNotification == type) { + // 删除t->StartTimestamp之前的通知标记, 避免通知标记发生变更引起数据丢失 + mutation->DeleteColumns(FLAGS_observer_notify_column_name, notify_qualifier, timestamp); + } else { + LOG(ERROR) << "error notification type=" << type; + } + } + table_it->first->ApplyMutation(mutation); + if (mutation->GetError().GetType() != tera::ErrorCode::kOK) { + LOG(ERROR) << "set or clear notification failed, err=" << mutation->GetError().GetReason(); + ret = false; + } + delete mutation; + } + return ret; +} + +} // namespace observer diff --git a/src/observer/executor/executor_impl.h b/src/observer/executor/executor_impl.h new file mode 100644 index 000000000..32a2732ba --- /dev/null +++ b/src/observer/executor/executor_impl.h @@ -0,0 +1,88 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef OBSERVER_EXECUTOR_IMPL_H_ +#define OBSERVER_EXECUTOR_IMPL_H_ + +#include +#include "common/base/scoped_ptr.h" +#include "common/thread_pool.h" +#include "common/this_thread.h" +#include "tuple.h" +#include "scanner.h" +#include "observer/executor.h" + +namespace observer { + +enum NotificationType { + kSetNotification = 1, + kClearNotification = 2, +}; + +typedef std::set ObserverSet; +typedef std::vector ObserverList; + +// +typedef std::map ObserverMap; +// +typedef std::map TableMap; +// +typedef std::map ColumnReduceMap; + +class Scanner; + +class ExecutorImpl : public Executor { +public: + ExecutorImpl(); + virtual ~ExecutorImpl(); + + // 注册需要运行的Observer + virtual bool RegisterObserver(Observer* observer); + + // 启动接口 + virtual bool Run(); + + // 退出接口 + virtual void Quit() { + quit_ = true; + } + + bool Process(TuplePtr tuple); + + bool ProcTaskPendingFull(); + + bool GetQuit() const { + return quit_; + } + + ColumnReduceMap& GetColumnReduceMap() { + return reduce_column_map_; + } + + static bool SetOrClearNotification(ColumnList& columns, + const std::string& row, + int64_t timestamp, + NotificationType type); + +private: + ExecutorImpl(const ExecutorImpl&); + void operator=(const ExecutorImpl&); + + bool DoNotify(TuplePtr tuple, Observer* observer); + +private: + volatile bool quit_; + scoped_ptr proc_thread_pool_; + Scanner* scanner_; + + ObserverSet observer_set_; + // 每个列对应多个Observer + ObserverMap observer_map_; + // 每个table对应多个被观察列 + ColumnReduceMap reduce_column_map_; +}; + +} // namespace observer + +#endif // OBSERVER_EXECUTOR_H_ diff --git a/src/observer/executor/observer_impl.cc b/src/observer/executor/observer_impl.cc new file mode 100644 index 000000000..30eab75d6 --- /dev/null +++ b/src/observer/executor/observer_impl.cc @@ -0,0 +1,53 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "observer/observer.h" +#include "executor_impl.h" + +namespace observer { + +Observer::Observer(const std::string& observer_name, + ColumnList& observed_columns): observer_name_(observer_name) { + for (size_t idx = 0; idx < observed_columns.size(); ++idx) { + column_map_[observed_columns[idx].table_name].push_back(observed_columns[idx]); + } +} + +Observer::~Observer() { +} + +bool Observer::OnNotify(tera::Transaction* t, + tera::Table* table, + const std::string& row, + const Column& column, + const std::string& value, + int64_t timestamp) { + return true; +} + +bool Observer::Init() { + return true; +} + +bool Observer::Close() { + return true; +} + +std::string Observer::GetName() const { + return observer_name_; +} + +ColumnMap& Observer::GetColumnMap() { + return column_map_; +} + +bool Observer::Ack(ColumnList& columns, const std::string& row, int64_t timestamp) { + return ExecutorImpl::SetOrClearNotification(columns, row, timestamp, kClearNotification); +} + +bool Observer::Notify(ColumnList& columns, const std::string& row, int64_t timestamp) { + return ExecutorImpl::SetOrClearNotification(columns, row, timestamp, kSetNotification); +} + +} // namespace observer diff --git a/src/observer/executor/scanner.cc b/src/observer/executor/scanner.cc new file mode 100644 index 000000000..29a02b9b6 --- /dev/null +++ b/src/observer/executor/scanner.cc @@ -0,0 +1,195 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include +#include +#include +#include "scanner.h" + +DECLARE_int32(observer_scan_thread_num); +DECLARE_bool(observer_scan_async_switch); +DECLARE_int32(observer_read_thread_num); +DECLARE_string(observer_notify_column_name); + +namespace observer { + +Scanner::Scanner(ExecutorImpl* executor_impl) : executor_impl_(executor_impl), + read_thread_pool_(new common::ThreadPool(FLAGS_observer_read_thread_num)) { +} + +Scanner::~Scanner() { +} + +bool Scanner::Init() { + ColumnReduceMap& column_map = executor_impl_->GetColumnReduceMap(); + if (FLAGS_observer_scan_thread_num < (int)column_map.size()) { + LOG(ERROR) << "some tables can't be scaned, scanner_num=" << + FLAGS_observer_scan_thread_num << " table_num=" << column_map.size(); + } + + // 启动scan线程 + ColumnReduceMap::iterator it = column_map.begin(); + scan_thread_list_.resize(FLAGS_observer_scan_thread_num); + for (size_t idx = 0; idx < scan_thread_list_.size(); ++idx) { + scan_thread_list_[idx].Start(std::bind(&Scanner::DoScan, this, it->first, it->second)); + if (column_map.end() == ++it) { + it = column_map.begin(); + } + } + + return true; +} + +bool Scanner::Close() { + for (size_t idx = 0; idx < scan_thread_list_.size(); ++idx) { + scan_thread_list_[idx].Join(); + } + return true; +} + +bool Scanner::ParseNotifyQualifier(const std::string& notify_qualifier, + std::string* data_family, + std::string* data_qualifier) { + // = + + std::string delim = "+"; + std::vector frags; + std::size_t pos = std::string::npos; + std::size_t start_pos = 0; + std::string frag = ""; + while (std::string::npos != (pos = notify_qualifier.find(delim, start_pos))) { + frag = notify_qualifier.substr(start_pos, pos - start_pos); + frags.push_back(frag); + start_pos = pos + 1; + } + std::size_t str_len = notify_qualifier.length(); + if (start_pos <= str_len - 1) { + frag = notify_qualifier.substr(start_pos, str_len - start_pos); + frags.push_back(frag); + } + + if (2 != frags.size()) { + return false; + } + *data_family = frags[0]; + *data_qualifier = frags[1]; + return true; +} + +void Scanner::DoScan(tera::Table* table, ColumnSet& column_set) { + std::string start_key; + RandomStartKey(table, &start_key); + while (true) { + if (executor_impl_->GetQuit()) { + return; + } + if (!ScanTable(table, column_set, start_key, "")) { + // scan失败重新随机选startkey + RandomStartKey(table, &start_key); + } else { + // scan正常结束从表头继续 + start_key = ""; + } + ThisThread::Sleep(1); + } +} + +bool Scanner::ScanTable(tera::Table* table, + ColumnSet& column_set, + const std::string& start_key, + const std::string& end_key) { + bool ret = true; + tera::ScanDescriptor desc(start_key); + desc.SetAsync(FLAGS_observer_scan_async_switch); + // Notify列存储在单独lg + desc.AddColumnFamily(FLAGS_observer_notify_column_name); + tera::ErrorCode err; + tera::ResultStream* result_stream = table->Scan(desc, &err); + if (NULL == result_stream || tera::ErrorCode::kOK != err.GetType()) { + LOG(ERROR) << "table scan init failed"; + return false; + } + while (!result_stream->Done(&err)) { + if (executor_impl_->GetQuit()) { + ret = true; + break; + } + if (tera::ErrorCode::kOK != err.GetType()) { + LOG(ERROR) << "table scanning failed"; + ret = false; + break; + } + + // 控制scanner给observer发送数据的速度 + while (executor_impl_->ProcTaskPendingFull()) { + if (executor_impl_->GetQuit()) { + return true; + } + ThisThread::Sleep(1); + } + + // todo: try lock row + + std::string ob_family; + std::string ob_qualifier; + std::string rowkey = result_stream->RowName(); + // 遍历cell + while (result_stream->RowName() == rowkey) { + if (ParseNotifyQualifier(result_stream->Qualifier(), &ob_family, &ob_qualifier)) { + Column ob_column = {table->GetName(), ob_family, ob_qualifier}; + if (column_set.end() != column_set.find(ob_column)) { + TuplePtr tuple(new Tuple()); + // 创建跨行事务 + tuple->t = tera::NewTransaction(); + tuple->table = table; + tuple->row = rowkey; + tuple->observed_column = ob_column; + read_thread_pool_->AddTask(std::bind(&Scanner::DoReadValue, this, tuple)); + } else { + LOG(ERROR) << "miss observed column, table_name" << table->GetName() << + " cf=" << ob_family << " qu=" << ob_qualifier; + } + } else { + LOG(ERROR) << "parse notify qualifier failed: " << result_stream->Qualifier(); + } + + result_stream->Next(); + if (result_stream->Done(&err)) { + break; + } + } + } + delete result_stream; + return ret; +} + +bool Scanner::DoReadValue(TuplePtr tuple) { + // todo: if observers conflict on Ack column, return + + bool ret = true; + tera::RowReader* row_reader = tuple->table->NewRowReader(tuple->row); + row_reader->AddColumn(tuple->observed_column.family, tuple->observed_column.qualifier); + // 事务读 + tuple->t->Get(row_reader); + if (tera::ErrorCode::kOK == row_reader->GetError().GetType()) { + tuple->value = row_reader->Value(); + tuple->timestamp = row_reader->Timestamp(); + // 触发observer计算 + executor_impl_->Process(tuple); + } else { + LOG(ERROR) << "[read failed] cf=" << tuple->observed_column.family << + " qu=" << tuple->observed_column.qualifier << " row=" << tuple->row << + " err=" << row_reader->GetError().GetReason(); + ret = false; + } + delete row_reader; + return ret; +} + +bool Scanner::RandomStartKey(tera::Table* table, std::string* Key) { + // todo + *Key = ""; + return true; +} + +} // namespace observer diff --git a/src/observer/executor/scanner.h b/src/observer/executor/scanner.h new file mode 100644 index 000000000..462e84317 --- /dev/null +++ b/src/observer/executor/scanner.h @@ -0,0 +1,54 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef OBSERVER_SCANNER_H_ +#define OBSERVER_SCANNER_H_ + +#include "common/thread.h" +#include "executor_impl.h" + +namespace observer { + +class ExecutorImpl; + +class Scanner { +public: + Scanner(ExecutorImpl* executor_impl); + ~Scanner(); + + bool Init(); + + bool Close(); + + // 执行scan操作 + void DoScan(tera::Table* table, ColumnSet& column_set); + +private: + Scanner(const Scanner&); + void operator=(const Scanner&); + + // 数据列family+qualifier构成通知列的qualifier + bool ParseNotifyQualifier(const std::string& notify_qualifier, + std::string* data_family, + std::string* data_qualfier); + + // table的一次scan + bool ScanTable(tera::Table* table, + ColumnSet& column_set, + const std::string& start_key, + const std::string& end_key); + + bool DoReadValue(TuplePtr tuple); + + bool RandomStartKey(tera::Table* table, std::string* Key); + +private: + ExecutorImpl* executor_impl_; + std::vector scan_thread_list_; + scoped_ptr read_thread_pool_; +}; + +} // namespace observer + +#endif // OBSERVER_SCANNER_H_ diff --git a/src/observer/executor/tuple.h b/src/observer/executor/tuple.h new file mode 100644 index 000000000..3cd7a1616 --- /dev/null +++ b/src/observer/executor/tuple.h @@ -0,0 +1,45 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef OBSERVER_TUPLE_H_ +#define OBSERVER_TUPLE_H_ + +#include +#include +#include +#include "tera.h" +#include "observer/observer.h" + +namespace observer { + +typedef std::set ColumnSet; + +struct Tuple { + Tuple() : t(NULL), table(NULL) {} + ~Tuple() { + // release Transaction + if (t) { + delete t; + } + } + // 跨行事务 + tera::Transaction* t; + // Tera表 + tera::Table* table; + // 行Key + std::string row; + // 被观察列 + Column observed_column; + // 列值 + std::string value; + // 时间戳 + int64_t timestamp; +}; + +typedef boost::shared_ptr TuplePtr; +typedef std::vector Tuples; + +} // namespace observer + +#endif // OBSERVER_TUPLE_H_ diff --git a/src/observer/observer_demo.cc b/src/observer/observer_demo.cc new file mode 100644 index 000000000..2561ab4c3 --- /dev/null +++ b/src/observer/observer_demo.cc @@ -0,0 +1,140 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include +#include +#include "observer/executor.h" +#include "observer/observer.h" + +namespace observer { +namespace demo { + +/// Parser Worker /// +class Parser : public observer::Observer { +public: + Parser(const std::string& observer_name, + observer::ColumnList& observed_columns): + observer::Observer(observer_name, observed_columns) {} + virtual ~Parser() {} + virtual bool OnNotify(tera::Transaction* t, + tera::Table* table, + const std::string& row, + const observer::Column& column, + const std::string& value, + int64_t timestamp); +}; + +bool Parser::OnNotify(tera::Transaction* t, + tera::Table* table, + const std::string& row, + const observer::Column& column, + const std::string& value, + int64_t timestamp) { + LOG(INFO) << "[Notify Parser] table:family:qualifer=" << + column.table_name << ":" << column.family << ":" << + column.qualifier << " row=" << row << + " value=" << value << " timestamp=" << timestamp; + + // todo: read other columns ... + // write ForwordIndex column + tera::RowMutation* mutation = table->NewRowMutation(row); + mutation->Put("Data", "ForwordIndex", "FIValue_" + row); + t->ApplyMutation(mutation); + // t->Notify() + // t->Ack() + t->Commit(); + delete mutation; + + // notify downstream observers, equal to t->Notify() + observer::ColumnList notify_columns; + observer::Column c1 = {"observer_test_table", "Data", "ForwordIndex"}; + notify_columns.push_back(c1); + // GetStartTimestamp接口暂不支持 + // Notify(notify_columns, row, t->GetStartTimestamp()); + Notify(notify_columns, row, -1); + + // clear notification, equal to t->Ack() + observer::ColumnList ack_columns; + observer::Column c2 = {"observer_test_table", "Data", "Page"}; + observer::Column c3 = {"observer_test_table", "Data", "Link"}; + // ... + ack_columns.push_back(c2); + ack_columns.push_back(c3); + // Ack(ack_columns, row, t->GetStartTimestamp()); + Ack(ack_columns, row, -1); + + return true; +} + +/// Builder Worker /// +class Builder : public observer::Observer { +public: + Builder(const std::string& observer_name, + observer::ColumnList& observed_columns): + observer::Observer(observer_name, observed_columns) {} + virtual ~Builder() {} + virtual bool OnNotify(tera::Transaction* t, + tera::Table* table, + const std::string& row, + const observer::Column& column, + const std::string& value, + int64_t timestamp); +}; + +bool Builder::OnNotify(tera::Transaction* t, + tera::Table* table, + const std::string& row, + const observer::Column& column, + const std::string& value, + int64_t timestamp) { + LOG(INFO) << "[Notify Builder] table:family:qualifer=" << column.table_name << ":" << + column.family << ":" << column.qualifier << " row=" << row << + " value=" << value << " timestamp=" << timestamp; + + // todo: read other columns ... + // todo: write InvertIndex columns ... + // t->Notify() + // t->Ack() + // t->Commit(); + + // clear notification, equal to t->Ack() + observer::ColumnList ack_columns; + observer::Column c1 = {"observer_test_table", "Data", "ForwordIndex"}; + ack_columns.push_back(c1); + // Ack(ack_columns, row, t->GetStartTimestamp()); + Ack(ack_columns, row, -1); + + return true; +} + +} // namespace demo +} // namespace observer + +int main(int argc, char** argv) { + google::ParseCommandLineFlags(&argc, &argv, true); + + // 创建observer + // 传入唯一标示名和需要观察的列(table_name+family+qualifier) + observer::ColumnList columns; + observer::Column c1 = {"observer_test_table", "Data", "Page"}; + observer::Column c2 = {"observer_test_table", "Data", "Link"}; + columns.push_back(c1); + columns.push_back(c2); + observer::demo::Parser parser("Parser", columns); + + columns.clear(); + observer::Column c3 = {"observer_test_table", "Data", "ForwordIndex"}; + columns.push_back(c3); + observer::demo::Builder builder("Builder", columns); + + // 注册并启动observers + observer::Executor* executor = observer::Executor::NewExecutor(); + executor->RegisterObserver(&parser); + executor->RegisterObserver(&builder); + executor->Run(); + + return 0; +} + +/* vim: set ts=4 sw=4 sts=4 tw=100 */ diff --git a/src/observer/test/observer_impl_test.cc b/src/observer/test/observer_impl_test.cc new file mode 100644 index 000000000..7eabfd450 --- /dev/null +++ b/src/observer/test/observer_impl_test.cc @@ -0,0 +1,113 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include +#include +#include +#include +#include "observer/executor.h" +#include "observer/observer.h" +#include "utils/utils_cmd.h" +#include "common/thread.h" + +namespace observer { + +class TestWorker : public Observer { +public: + TestWorker(const std::string& observer_name, ColumnList& observed_columns): + Observer(observer_name, observed_columns), counter_(0), notified_(false) {} + virtual ~TestWorker() {} + virtual bool OnNotify(tera::Transaction* t, + tera::Table* table, + const std::string& row, + const Column& column, + const std::string& value, + int64_t timestamp) { + row_ = row; + column_ = column; + value_ = value; + timestamp_ = timestamp; + notified_ = true; + + ++counter_; + + observer::ColumnList ack_columns; + observer::Column c = {"observer_test_table", "Data", "Page"}; + ack_columns.push_back(c); + Ack(ack_columns, row, -1); + + return true; + } + std::string row_; + Column column_; + std::string value_; + int64_t timestamp_; + std::atomic counter_; + std::atomic notified_; +}; + +class ObserverImplTest : public ::testing::Test { +public: + void OnNotifyTest() { + tera::ErrorCode err; + tera::Client* client = tera::Client::NewClient("./tera.flag", &err); + if (tera::ErrorCode::kOK != err.GetType()) { + LOG(ERROR) << "new client failed"; + return; + } + tera::Table* table = client->OpenTable("observer_test_table", &err); + if (tera::ErrorCode::kOK != err.GetType()) { + LOG(ERROR) << "open table failed"; + return; + } + tera::Transaction* t = tera::NewTransaction(); + assert(t != NULL); + tera::RowMutation* mu0 = table->NewRowMutation("www.baidu.com"); + mu0->Put("Data", "Page", "hello world"); + t->ApplyMutation(mu0); + t->Commit(); + tera::RowMutation* mu1 = table->NewRowMutation("www.baidu.com"); + // mu1->Put("Notify", "Data+Page", "1", t->GetStartTimestamp()); + mu1->Put("Notify", "Data+Page", "1", -1); + table->ApplyMutation(mu1); + delete t; + delete mu0; + delete mu1; + + ColumnList columns; + Column c = {"observer_test_table", "Data", "Page"}; + columns.push_back(c); + TestWorker worker("TestWorker", columns); + + Executor* executor = Executor::NewExecutor(); + executor->RegisterObserver(&worker); + run_thread_.Start(std::bind(&ObserverImplTest::DoRun, this, executor)); + while (!worker.notified_) { + sleep(1); + } + executor->Quit(); + EXPECT_TRUE(("www.baidu.com" == worker.row_) && + ("observer_test_table" == worker.column_.table_name) && + ("Data" == worker.column_.family) && + ("Page" == worker.column_.qualifier) && + ("hello world" == worker.value_)); + } +private: + void DoRun(Executor* executor) { + executor->Run(); + } + common::Thread run_thread_; +}; + +TEST_F(ObserverImplTest, OnNotifyTest) { + OnNotifyTest(); +} + +} // namespace observer + +int main(int argc, char** argv) { + ::google::ParseCommandLineFlags(&argc, &argv, true); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/tera_flags.cc b/src/tera_flags.cc index efe0201e0..628bc5653 100644 --- a/src/tera_flags.cc +++ b/src/tera_flags.cc @@ -281,3 +281,12 @@ DEFINE_int64(tera_sdk_status_timeout, 600, "(s) check tablet/tabletnode status t DEFINE_string(tera_http_port, "8657", "the http proxy port of tera"); DEFINE_int32(tera_http_request_thread_num, 30, "the http proxy thread num for handle client request"); DEFINE_int32(tera_http_ctrl_thread_num, 10, "the http proxy thread num for it self"); + +//////// observer /////// +DEFINE_string(observer_tera_flag_file, "./tera.flag", "tera flag file"); +DEFINE_int32(observer_proc_thread_num, 100, "the max number of user process thread"); +DEFINE_int64(observer_proc_pending_num_max, 10000, "the max number of pending process task"); +DEFINE_int32(observer_scan_thread_num, 1, "the max number of table scan thread"); +DEFINE_bool(observer_scan_async_switch, false, "enable to async scan table"); +DEFINE_int32(observer_read_thread_num, 200, "the max number of table read thread"); +DEFINE_string(observer_notify_column_name, "Notify", "the column family name of notification");