Download
Contributor
封仲淹(@longdafeng)
陈昱(@cycyyy) 刘键(@bastiliu)
方孝健(@hustfxj)
李鑫(@tumen)
母延年(@muyannian)
周鑫(@@zhouxinxust)
罗实(@luoshi0801)
Getting help
Google Groups: jstorm-user
QQ群:228374502
JStorm is a distributed realtime computation system. It is very similar to Storm, but provide a lot of advanced features, at the same time, it is more stable than Storm.
Chinese Documentation and tutorials can be found on the 中文文档.
English Document and tutorials can be found on the English JStorm Documentation
封仲淹(@longdafeng)
陈昱(@cycyyy) 刘键(@bastiliu)
方孝健(@hustfxj)
李鑫(@tumen)
母延年(@muyannian)
周鑫(@@zhouxinxust)
罗实(@luoshi0801)
Google Groups: jstorm-user
QQ群:228374502
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,89 +1,89 @@ | ||
package com.alipay.dw.jstorm.example; | ||
|
||
import java.io.Serializable; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
import org.apache.log4j.Logger; | ||
|
||
public class TpsCounter implements Serializable{ | ||
|
||
private static final long serialVersionUID = 2177944366059817622L; | ||
private AtomicLong total = new AtomicLong(0); | ||
private AtomicLong times = new AtomicLong(0); | ||
private AtomicLong values = new AtomicLong(0); | ||
|
||
private IntervalCheck intervalCheck; | ||
|
||
private final String id; | ||
private final Logger LOG; | ||
|
||
public TpsCounter() { | ||
this("", TpsCounter.class); | ||
} | ||
|
||
public TpsCounter(String id) { | ||
this(id, TpsCounter.class); | ||
} | ||
|
||
public TpsCounter(Class tclass) { | ||
this("", tclass); | ||
|
||
} | ||
|
||
public TpsCounter(String id, Class tclass) { | ||
this.id = id; | ||
this.LOG = Logger.getLogger(tclass); | ||
|
||
intervalCheck = new IntervalCheck(); | ||
intervalCheck.setInterval(60); | ||
} | ||
|
||
public Double count(long value) { | ||
long totalValue = total.incrementAndGet(); | ||
long timesValue = times.incrementAndGet(); | ||
long v = values.addAndGet(value); | ||
|
||
Double pass = intervalCheck.checkAndGet(); | ||
if (pass != null) { | ||
times.set(0); | ||
values.set(0); | ||
|
||
Double tps = timesValue / pass; | ||
|
||
StringBuilder sb = new StringBuilder(); | ||
sb.append(id); | ||
sb.append(", tps:" + tps); | ||
sb.append(", avg:" + ((double) v) / timesValue); | ||
sb.append(", total:" + totalValue); | ||
LOG.info(sb.toString()); | ||
|
||
return tps; | ||
} | ||
|
||
return null; | ||
} | ||
|
||
public Double count() { | ||
return count(Long.valueOf(1)); | ||
} | ||
|
||
public void cleanup() { | ||
|
||
LOG.info(id + ", total:" + total); | ||
} | ||
|
||
|
||
|
||
public IntervalCheck getIntervalCheck() { | ||
return intervalCheck; | ||
} | ||
|
||
/** | ||
* @param args | ||
*/ | ||
public static void main(String[] args) { | ||
// TODO Auto-generated method stub | ||
|
||
} | ||
|
||
} | ||
package com.alipay.dw.jstorm.example; | ||
|
||
import java.io.Serializable; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
import org.apache.log4j.Logger; | ||
|
||
public class TpsCounter implements Serializable{ | ||
|
||
private static final long serialVersionUID = 2177944366059817622L; | ||
private AtomicLong total = new AtomicLong(0); | ||
private AtomicLong times = new AtomicLong(0); | ||
private AtomicLong values = new AtomicLong(0); | ||
|
||
private IntervalCheck intervalCheck; | ||
|
||
private final String id; | ||
private final Logger LOG; | ||
|
||
public TpsCounter() { | ||
this("", TpsCounter.class); | ||
} | ||
|
||
public TpsCounter(String id) { | ||
this(id, TpsCounter.class); | ||
} | ||
|
||
public TpsCounter(Class tclass) { | ||
this("", tclass); | ||
|
||
} | ||
|
||
public TpsCounter(String id, Class tclass) { | ||
this.id = id; | ||
this.LOG = Logger.getLogger(tclass); | ||
|
||
intervalCheck = new IntervalCheck(); | ||
intervalCheck.setInterval(60); | ||
} | ||
|
||
public Double count(long value) { | ||
long totalValue = total.incrementAndGet(); | ||
long timesValue = times.incrementAndGet(); | ||
long v = values.addAndGet(value); | ||
|
||
Double pass = intervalCheck.checkAndGet(); | ||
if (pass != null) { | ||
times.set(0); | ||
values.set(0); | ||
|
||
Double tps = timesValue / pass; | ||
|
||
StringBuilder sb = new StringBuilder(); | ||
sb.append(id); | ||
sb.append(", tps:" + tps); | ||
sb.append(", avg:" + ((double) v) / timesValue); | ||
sb.append(", total:" + totalValue); | ||
LOG.info(sb.toString()); | ||
|
||
return tps; | ||
} | ||
|
||
return null; | ||
} | ||
|
||
public Double count() { | ||
return count(Long.valueOf(1)); | ||
} | ||
|
||
public void cleanup() { | ||
|
||
LOG.info(id + ", total:" + total); | ||
} | ||
|
||
|
||
|
||
public IntervalCheck getIntervalCheck() { | ||
return intervalCheck; | ||
} | ||
|
||
/** | ||
* @param args | ||
*/ | ||
public static void main(String[] args) { | ||
// TODO Auto-generated method stub | ||
|
||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,110 +1,110 @@ | ||
package com.alipay.dw.jstorm.example.batch; | ||
|
||
import java.io.FileInputStream; | ||
import java.io.FileNotFoundException; | ||
import java.io.InputStream; | ||
import java.util.Map; | ||
|
||
import org.yaml.snakeyaml.Yaml; | ||
|
||
import backtype.storm.Config; | ||
import backtype.storm.StormSubmitter; | ||
import backtype.storm.generated.AlreadyAliveException; | ||
import backtype.storm.generated.InvalidTopologyException; | ||
import backtype.storm.generated.TopologyAssignException; | ||
import backtype.storm.topology.BoltDeclarer; | ||
import backtype.storm.topology.TopologyBuilder; | ||
|
||
import com.alibaba.jstorm.batch.BatchTopologyBuilder; | ||
import com.alibaba.jstorm.cluster.StormConfig; | ||
import com.alibaba.jstorm.local.LocalCluster; | ||
import com.alibaba.jstorm.utils.JStormUtils; | ||
|
||
public class SimpleBatchTopology { | ||
|
||
private static String topologyName; | ||
|
||
private static Map conf; | ||
|
||
private static void LoadYaml(String confPath) { | ||
|
||
Yaml yaml = new Yaml(); | ||
|
||
try { | ||
InputStream stream = new FileInputStream(confPath); | ||
|
||
conf = (Map) yaml.load(stream); | ||
if (conf == null || conf.isEmpty() == true) { | ||
throw new RuntimeException("Failed to read config file"); | ||
} | ||
|
||
} catch (FileNotFoundException e) { | ||
System.out.println("No such file " + confPath); | ||
throw new RuntimeException("No config file"); | ||
} catch (Exception e1) { | ||
e1.printStackTrace(); | ||
throw new RuntimeException("Failed to read config file"); | ||
} | ||
|
||
topologyName = (String) conf.get(Config.TOPOLOGY_NAME); | ||
return; | ||
} | ||
|
||
public static TopologyBuilder SetBuilder() { | ||
BatchTopologyBuilder topologyBuilder = new BatchTopologyBuilder( | ||
topologyName); | ||
|
||
int spoutParallel = JStormUtils.parseInt(conf.get("topology.spout.parallel"), 1); | ||
|
||
BoltDeclarer boltDeclarer = topologyBuilder.setSpout("Spout", | ||
new SimpleSpout(), spoutParallel); | ||
|
||
int boltParallel = JStormUtils.parseInt(conf.get("topology.bolt.parallel"), 2); | ||
topologyBuilder.setBolt("Bolt", new SimpleBolt(), boltParallel).shuffleGrouping( | ||
"Spout"); | ||
|
||
return topologyBuilder.getTopologyBuilder(); | ||
} | ||
|
||
public static void SetLocalTopology() throws Exception { | ||
TopologyBuilder builder = SetBuilder(); | ||
|
||
LocalCluster cluster = new LocalCluster(); | ||
cluster.submitTopology(topologyName, conf, builder.createTopology()); | ||
|
||
Thread.sleep(600000); | ||
|
||
cluster.shutdown(); | ||
} | ||
|
||
public static void SetRemoteTopology() throws AlreadyAliveException, | ||
InvalidTopologyException, TopologyAssignException { | ||
|
||
TopologyBuilder builder = SetBuilder(); | ||
|
||
StormSubmitter.submitTopology(topologyName, conf, | ||
builder.createTopology()); | ||
|
||
} | ||
|
||
public static void main(String[] args) throws Exception { | ||
|
||
if (args.length < 1) { | ||
System.err.println("Please input parameters topology.yaml"); | ||
System.exit(-1); | ||
} | ||
|
||
LoadYaml(args[0]); | ||
|
||
boolean isLocal = StormConfig.local_mode(conf); | ||
|
||
if (isLocal) { | ||
SetLocalTopology(); | ||
return; | ||
} else { | ||
SetRemoteTopology(); | ||
} | ||
|
||
} | ||
|
||
} | ||
package com.alipay.dw.jstorm.example.batch; | ||
|
||
import java.io.FileInputStream; | ||
import java.io.FileNotFoundException; | ||
import java.io.InputStream; | ||
import java.util.Map; | ||
|
||
import org.yaml.snakeyaml.Yaml; | ||
|
||
import backtype.storm.Config; | ||
import backtype.storm.StormSubmitter; | ||
import backtype.storm.generated.AlreadyAliveException; | ||
import backtype.storm.generated.InvalidTopologyException; | ||
import backtype.storm.generated.TopologyAssignException; | ||
import backtype.storm.topology.BoltDeclarer; | ||
import backtype.storm.topology.TopologyBuilder; | ||
|
||
import com.alibaba.jstorm.batch.BatchTopologyBuilder; | ||
import com.alibaba.jstorm.cluster.StormConfig; | ||
import com.alibaba.jstorm.local.LocalCluster; | ||
import com.alibaba.jstorm.utils.JStormUtils; | ||
|
||
public class SimpleBatchTopology { | ||
|
||
private static String topologyName; | ||
|
||
private static Map conf; | ||
|
||
private static void LoadYaml(String confPath) { | ||
|
||
Yaml yaml = new Yaml(); | ||
|
||
try { | ||
InputStream stream = new FileInputStream(confPath); | ||
|
||
conf = (Map) yaml.load(stream); | ||
if (conf == null || conf.isEmpty() == true) { | ||
throw new RuntimeException("Failed to read config file"); | ||
} | ||
|
||
} catch (FileNotFoundException e) { | ||
System.out.println("No such file " + confPath); | ||
throw new RuntimeException("No config file"); | ||
} catch (Exception e1) { | ||
e1.printStackTrace(); | ||
throw new RuntimeException("Failed to read config file"); | ||
} | ||
|
||
topologyName = (String) conf.get(Config.TOPOLOGY_NAME); | ||
return; | ||
} | ||
|
||
public static TopologyBuilder SetBuilder() { | ||
BatchTopologyBuilder topologyBuilder = new BatchTopologyBuilder( | ||
topologyName); | ||
|
||
int spoutParallel = JStormUtils.parseInt(conf.get("topology.spout.parallel"), 1); | ||
|
||
BoltDeclarer boltDeclarer = topologyBuilder.setSpout("Spout", | ||
new SimpleSpout(), spoutParallel); | ||
|
||
int boltParallel = JStormUtils.parseInt(conf.get("topology.bolt.parallel"), 2); | ||
topologyBuilder.setBolt("Bolt", new SimpleBolt(), boltParallel).shuffleGrouping( | ||
"Spout"); | ||
|
||
return topologyBuilder.getTopologyBuilder(); | ||
} | ||
|
||
public static void SetLocalTopology() throws Exception { | ||
TopologyBuilder builder = SetBuilder(); | ||
|
||
LocalCluster cluster = new LocalCluster(); | ||
cluster.submitTopology(topologyName, conf, builder.createTopology()); | ||
|
||
Thread.sleep(600000); | ||
|
||
cluster.shutdown(); | ||
} | ||
|
||
public static void SetRemoteTopology() throws AlreadyAliveException, | ||
InvalidTopologyException, TopologyAssignException { | ||
|
||
TopologyBuilder builder = SetBuilder(); | ||
|
||
StormSubmitter.submitTopology(topologyName, conf, | ||
builder.createTopology()); | ||
|
||
} | ||
|
||
public static void main(String[] args) throws Exception { | ||
|
||
if (args.length < 1) { | ||
System.err.println("Please input parameters topology.yaml"); | ||
System.exit(-1); | ||
} | ||
|
||
LoadYaml(args[0]); | ||
|
||
boolean isLocal = StormConfig.local_mode(conf); | ||
|
||
if (isLocal) { | ||
SetLocalTopology(); | ||
return; | ||
} else { | ||
SetRemoteTopology(); | ||
} | ||
|
||
} | ||
|
||
} |