Skip to content

Commit

Permalink
rename ConsumeThreadCount to ConsumeThreadNum
Browse files Browse the repository at this point in the history
  • Loading branch information
ifplusor committed Oct 5, 2019
1 parent c66fab1 commit b3a95cf
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 12 deletions.
2 changes: 1 addition & 1 deletion example/OrderlyPushConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ int main(int argc, char* argv[]) {
consumer.setGroupName(info.groupname);
consumer.setTcpTransportTryLockTimeout(1000);
consumer.setTcpTransportConnectTimeout(400);
consumer.setConsumeThreadCount(info.thread_count);
consumer.setConsumeThreadNum(info.thread_count);
consumer.setConsumeMessageBatchMaxSize(31);
consumer.setConsumeFromWhere(CONSUME_FROM_LAST_OFFSET);
consumer.subscribe(info.topic, "*");
Expand Down
2 changes: 1 addition & 1 deletion example/PushConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ int main(int argc, char* argv[]) {
consumer.setGroupName(info.groupname);
consumer.setTcpTransportTryLockTimeout(1000);
consumer.setTcpTransportConnectTimeout(400);
consumer.setConsumeThreadCount(info.thread_count);
consumer.setConsumeThreadNum(info.thread_count);
consumer.setConsumeFromWhere(CONSUME_FROM_LAST_OFFSET);

if (info.broadcasting) {
Expand Down
10 changes: 5 additions & 5 deletions include/DefaultMQPushConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumerConfig : public DefaultMQConsumerC
/**
* consuming thread count, default value is cpu cores
*/
int getConsumeThreadCount() const { return m_consumeThreadCount; }
void setConsumeThreadCount(int threadCount) {
if (threadCount > 0) {
m_consumeThreadCount = threadCount;
int getConsumeThreadNum() const { return m_consumeThreadNum; }
void setConsumeThreadNum(int threadNum) {
if (threadNum > 0) {
m_consumeThreadNum = threadNum;
}
}

Expand Down Expand Up @@ -82,7 +82,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumerConfig : public DefaultMQConsumerC
protected:
ConsumeFromWhere m_consumeFromWhere;

int m_consumeThreadCount;
int m_consumeThreadNum;
int m_consumeMessageBatchMaxSize;
int m_maxMsgCacheSize;

Expand Down
8 changes: 4 additions & 4 deletions src/consumer/DefaultMQPushConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class AsyncPullCallback : public AutoDeletePullCallback {

DefaultMQPushConsumerConfig::DefaultMQPushConsumerConfig()
: m_consumeFromWhere(CONSUME_FROM_LAST_OFFSET),
m_consumeThreadCount(std::min(8, (int)std::thread::hardware_concurrency())),
m_consumeThreadNum(std::min(8, (int)std::thread::hardware_concurrency())),
m_consumeMessageBatchMaxSize(1),
m_maxMsgCacheSize(1000),
m_asyncPullTimeout(30 * 1000),
Expand Down Expand Up @@ -247,13 +247,13 @@ void DefaultMQPushConsumer::start() {
if (m_messageListener->getMessageListenerType() == messageListenerOrderly) {
LOG_INFO_NEW("start orderly consume service: {}", getGroupName());
m_consumeOrderly = true;
m_consumerService.reset(new ConsumeMessageOrderlyService(this, m_consumeThreadCount, m_messageListener));
m_consumerService.reset(new ConsumeMessageOrderlyService(this, m_consumeThreadNum, m_messageListener));
} else {
// for backward compatible, defaultly and concurrently listeners are allocating
// ConsumeMessageConcurrentlyService
LOG_INFO_NEW("start concurrently consume service: {}", getGroupName());
m_consumeOrderly = false;
m_consumerService.reset(new ConsumeMessageConcurrentlyService(this, m_consumeThreadCount, m_messageListener));
m_consumerService.reset(new ConsumeMessageConcurrentlyService(this, m_consumeThreadNum, m_messageListener));
}
m_consumerService->start();

Expand Down Expand Up @@ -552,7 +552,7 @@ ConsumerRunningInfo* DefaultMQPushConsumer::consumerRunningInfo() {
auto* info = new ConsumerRunningInfo();

info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, UtilAll::to_string(m_consumeOrderly));
info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE, UtilAll::to_string(m_consumeThreadCount));
info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE, UtilAll::to_string(m_consumeThreadNum));
info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP, UtilAll::to_string(m_startTime));

auto subSet = subscriptions();
Expand Down
2 changes: 1 addition & 1 deletion src/extern/CPushConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ int SetPushConsumerThreadCount(CPushConsumer* consumer, int threadCount) {
if (consumer == NULL || threadCount == 0) {
return NULL_POINTER;
}
((DefaultMQPushConsumer*)consumer)->setConsumeThreadCount(threadCount);
((DefaultMQPushConsumer*)consumer)->setConsumeThreadNum(threadCount);
return OK;
}

Expand Down

0 comments on commit b3a95cf

Please sign in to comment.