Skip to content

Commit

Permalink
Add examples for different subscription modes (#9)
Browse files Browse the repository at this point in the history
*Motivation*

Add examples to demonstrate different subscription modes
  • Loading branch information
sijie authored May 29, 2020
1 parent 70c77bd commit a5c225b
Show file tree
Hide file tree
Showing 17 changed files with 534 additions and 23 deletions.
46 changes: 46 additions & 0 deletions .github/workflows/clients.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name: Clients
on:
pull_request:
branches:
- master
paths:
- 'clients/*'

jobs:

clients:
runs-on: ubuntu-latest
timeout-minutes: 120

steps:
- name: Checkout
uses: actions/checkout@v2

- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8

- name: Set up Maven
uses: apache/pulsar-test-infra/setup-maven@master
with:
maven-version: 3.6.1

- name: run maven test
run: |
cd clients
mvn license:check checkstyle:check install
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.beust.jcommander.Parameter;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;

/**
* Common flags for a consumer example.
Expand Down Expand Up @@ -48,4 +49,20 @@ public class ConsumerFlags extends TopicFlags {
)
public SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Earliest;

@Parameter(
names = {
"-an", "--ack-every-n-messages"
},
description = "Ack every N messages"
)
public int ackEveryNMessages = 1;

@Parameter(
names = {
"-at", "--ack-type"
},
description = "Ack type"
)
public AckType ackType = null;

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,19 @@
*/
package io.streamnative.examples.common;

import com.beust.jcommander.Parameter;

/**
* Common flags for running a producer example.
*/
public class ProducerFlags extends TopicFlags {

@Parameter(
names = {
"-r", "--messages-rate"
},
description = "Messages rate to produce"
)
public double rate = -1;

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.apache.bookkeeper.tools.framework.CliFlags;

/**
* Default example flags
* Default example flags.
*/
public class PulsarClientFlags extends CliFlags {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,12 @@ public class TopicFlags extends PulsarClientFlags {
)
public int numMessages = 10;

@Parameter(
names = {
"-nk", "--num-keys"
},
description = "Number of keys"
)
public int numKeys = 128;

}
12 changes: 6 additions & 6 deletions clients/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,27 +60,27 @@
<testRetryCount>2</testRetryCount>

<!-- dependencies -->
<avro.version>1.8.2</avro.version>
<bookkeeper.version>4.9.2</bookkeeper.version>
<avro.version>1.9.1</avro.version>
<bookkeeper.version>4.10.0</bookkeeper.version>
<google.code.version>3.0.2</google.code.version>
<google.errorprone.version>2.1.2</google.errorprone.version>
<grpc.version>1.18.0</grpc.version>
<guava.version>21.0</guava.version>
<guava.version>25.1-jre</guava.version>
<hamcrest.version>1.3</hamcrest.version>
<joda.version>2.7</joda.version>
<jmh.version>1.19</jmh.version>
<jmock.version>2.8.2</jmock.version>
<junit.version>4.12</junit.version>
<lombok.version>1.16.22</lombok.version>
<mockito.version>2.22.0</mockito.version>
<netty.version>4.1.32.Final</netty.version>
<netty-boringssl.version>2.0.20.Final</netty-boringssl.version>
<netty.version>4.1.48.Final</netty.version>
<netty-boringssl.version>2.0.30.Final</netty-boringssl.version>
<powermock.version>2.0.0-beta.5</powermock.version>
<protobuf3.version>3.5.1</protobuf3.version>
<protoc3.version>3.5.1-1</protoc3.version>
<protoc-gen-grpc-java.version>1.12.0</protoc-gen-grpc-java.version>
<!-- latest version from apache pulsar master -->
<pulsar.version>2.4.0</pulsar.version>
<pulsar.version>2.5.2</pulsar.version>
<slf4j.version>1.7.25</slf4j.version>
<spotbugs-annotations.version>3.1.8</spotbugs-annotations.version>
<javax-annotations-api.version>1.3.2</javax-annotations-api.version>
Expand Down
22 changes: 22 additions & 0 deletions clients/pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,28 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<!-- get all project dependencies -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<finalName>pulsar-pubsub-examples</finalName>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<!-- bind to the packaging phase -->
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,9 +28,14 @@

import io.streamnative.examples.common.ConsumerFlags;
import io.streamnative.examples.common.ExampleRunner;
import org.apache.pulsar.client.api.*;

import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

/**
* Example that demonstrates how to use dead letter topic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@

import io.streamnative.examples.common.ExampleRunner;
import io.streamnative.examples.common.ProducerFlags;
import org.apache.commons.lang.time.DateUtils;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
* Example that demonstrates how to use delayed message delivery feature.
**/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@

import io.streamnative.examples.common.ExampleRunner;
import io.streamnative.examples.common.ProducerFlags;
import java.util.Date;
import org.apache.commons.lang.time.DateUtils;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

import java.util.Date;

/**
* Example that demonstrates how to use delayed message delivery feature.
**/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,10 +28,11 @@

import io.streamnative.examples.common.ConsumerFlags;
import io.streamnative.examples.common.ExampleRunner;
import org.apache.pulsar.client.api.*;

import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

/**
* Example that demonstrates how to use delayed message delivery feature.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@

import io.streamnative.examples.common.ConsumerFlags;
import io.streamnative.examples.common.ExampleRunner;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;

/**
* Example that demonstrates an consumer that consume messages using
Expand Down Expand Up @@ -47,6 +49,7 @@ protected void run(ConsumerFlags flags) throws Exception {

try (PulsarClient client = PulsarClient.builder()
.serviceUrl(flags.binaryServiceUrl)
.operationTimeout(10, TimeUnit.SECONDS)
.build()) {

int numReceived = 0;
Expand All @@ -60,13 +63,24 @@ protected void run(ConsumerFlags flags) throws Exception {

while ((flags.numMessages > 0 && numReceived < flags.numMessages) || flags.numMessages <= 0) {
Message<String> msg = consumer.receive();
System.out.println("Received message : value = '" + msg.getValue()
System.out.println("Received message : key = "
+ (msg.hasKey() ? msg.getKey() : "null")
+ ", value = '" + msg.getValue()
+ "', sequence = " + msg.getSequenceId());

if (msg.getSequenceId() % flags.ackEveryNMessages == 0) {
if (AckType.Individual == flags.ackType) {
consumer.acknowledge(msg);
} else if (AckType.Cumulative == flags.ackType) {
consumer.acknowledgeCumulative(msg);
}
}

++numReceived;
}
System.out.println("Successfully received " + numReceived + " messages");
} catch (PulsarClientException ie) {
ie.printStackTrace();
if (ie.getCause() instanceof InterruptedException) {
System.out.println("Successfully received " + numReceived + " messages");
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
*/
package io.streamnative.examples.pubsub;

import com.google.common.util.concurrent.RateLimiter;
import io.streamnative.examples.common.ExampleRunner;
import io.streamnative.examples.common.ProducerFlags;
import java.util.Random;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
Expand All @@ -24,6 +26,9 @@
* synchronous {@link Producer#send(Object)} method.
**/
public class SyncStringProducerExample extends ExampleRunner<ProducerFlags> {

static final Random RANDOM = new Random(System.currentTimeMillis());

@Override
protected String name() {
return SyncStringProducerExample.class.getSimpleName();
Expand All @@ -41,22 +46,42 @@ protected ProducerFlags flags() {

@Override
protected void run(ProducerFlags flags) throws Exception {
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(flags.binaryServiceUrl)
.build()) {
RateLimiter limiter = null;
if (flags.rate > 0) {
limiter = RateLimiter.create(flags.rate);
}

try (PulsarClient client = PulsarClient.builder()
.serviceUrl(flags.binaryServiceUrl)
.build()) {
try (Producer<String> producer = client.newProducer(Schema.STRING)
.enableBatching(false)
.topic(flags.topic)
.create()) {

final int numMessages = Math.max(flags.numMessages, 1);
int num = flags.numMessages;
if (num < 0) {
num = Integer.MAX_VALUE;
}

final int numMessages = Math.max(num, 1);

// publish messages
for (int i = 0; i < numMessages; i++) {
if (limiter != null) {
limiter.acquire();
}

String key = "key-" + RANDOM.nextInt(flags.numKeys);

producer.newMessage()
.key(key)
.value("value-" + i)
.sendAsync();

if ((i + 1) % 100 == 0) {
System.out.println("Sent " + (i + 1) + " messages ...");
}
}
producer.flush();
}
Expand Down
Loading

0 comments on commit a5c225b

Please sign in to comment.