JStorm
首先，JStorm 的概念请参考官网：点击打开链接；官网示例：点击打开链接。
运行 JStorm 可分为本地调试和分布式环境两种方式。
1. 先说分布式环境，首先搭建 ZooKeeper 集群：点击打开链接
2. 搭建 Kafka 集群：点击打开链接
3. 搭建 JStorm 集群：点击打开链接
4. 开始贴代码：
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Build for the JStorm + Kafka demo (MyTopology wires a KafkaSpout to MyBolt). -->
<modelVersion>4.0.0</modelVersion>
<groupId>com.li</groupId>
<artifactId>demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- JStorm runtime providing the backtype.storm.* API used by MyTopology and MyBolt.
     Scope "provided": available at compile time but not packaged into the shaded jar,
     because the JStorm cluster supplies it. NOTE(review): for local runs outside the
     cluster this scope presumably has to be commented out so the classes are on the
     runtime classpath; confirm against your IDE/run setup. -->
<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-core</artifactId>
<version>2.1.1</version>
<scope>provided</scope>
<!-- NOTE(review): the SLF4J artifacts below are excluded, presumably to avoid
     duplicate or conflicting SLF4J bindings on the classpath; confirm which logging
     binding the cluster provides. -->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Kafka spout integration: storm.kafka.KafkaSpout, SpoutConfig, ZkHosts, StringScheme. -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.6</version>
</dependency>
<!-- Kafka library (Scala 2.11 build). NOTE(review): zookeeper, log4j and slf4j-log4j12
     are excluded, presumably so they do not clash with the versions / logging backend
     already present at runtime; confirm. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Bundles the project and its non-provided dependencies into one jar during the
     "package" phase; that jar is what gets passed to "jstorm jar". No resource
     transformers are configured. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.7.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
自定义Bolt入口
import backtype.storm.task.OutputCollector;
import backtype.storm.task.Topologycontext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputfieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import com.esotericsoftware.minlog.Log;
import java.io.serializable;
import java.util.Map;
/**
* title:
* <p>
* Description:TODO
* <p>
* Copyright:Copyright(c)2005
* <p>
* Company:
* <p>
* Author:lsj
* <p>
* Date:2018/4/12 10:10
*/
public class MyBolt implements IRichBolt,Serializable {
private static final long serialversionuid = 1L;
OutputCollector collector;
public void execute(Tuple input) {
try {
String string = input.getString(0);
System.out.println(string+"................................");
collector.ack(input);
} catch (Exception e) {
collector.fail(input);
Log.ERROR("解析数据异常", e);
e.printstacktrace();
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields());
}
@SuppressWarnings("rawtypes")
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void cleanup() {
}
public Map<String, Object> getcomponentConfiguration() {
return null;
}
}
自定义Topology入口
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import java.util.ArrayList;
import java.util.List;

/**
 * Entry point that wires a Kafka spout ("data") to {@link MyBolt} ("analyze") and submits the
 * topology.
 * <p>
 * Positional command-line arguments, all optional:
 * <ol>
 *   <li>{@code args[0]}: topology name. When present the topology is submitted to the cluster
 *       via {@code StormSubmitter}; when absent it runs in an in-process {@code LocalCluster}.</li>
 *   <li>{@code args[1]}: Kafka topic to consume (default {@code testTopic}).</li>
 *   <li>{@code args[2]}: spout parallelism, also used as the worker count (default 3).</li>
 *   <li>{@code args[3]}: bolt parallelism (default 3).</li>
 *   <li>{@code args[4]}: max spout pending (default 2000; values &lt;= 0 fall back to 1).</li>
 * </ol>
 * Author: lsj
 * <p>
 * Date: 2018/4/12 10:00
 */
public class MyTopology {

    /** ZooKeeper port for every host in the ensemble below; SpoutConfig accepts a single port. */
    private static final int ZK_PORT = 2181;

    public static void main(String[] args) throws InterruptedException {
        // ZooKeeper ensemble the Kafka brokers register under (hosts are masked in this sample).
        String brokerZkStr = "119.23.20.*:2181,120.77.200.*:2181,39.108.5.*:2181";
        // Kafka topic to consume.
        String topic = "testTopic";
        int workerNumSpout = 3;
        int workerNumBolt = 3;
        int maxSpoutPending = 2000;

        if (args.length > 1) {
            topic = args[1];
        }
        // Each index is guarded separately: reading args[3] under "args.length > 2" threw
        // ArrayIndexOutOfBoundsException when exactly three arguments were supplied.
        if (args.length > 2) {
            workerNumSpout = Integer.parseInt(args[2]);
        }
        if (args.length > 3) {
            workerNumBolt = Integer.parseInt(args[3]);
        }
        if (args.length > 4) {
            maxSpoutPending = Integer.parseInt(args[4]);
        }

        TopologyBuilder builder = new TopologyBuilder();
        // Spout parallelism comes from workerNumSpout (default 3). NOTE(review): the topic was
        // described as having 5 partitions; spout tasks beyond the partition count presumably
        // sit idle, so keep this <= the partition count. Confirm.
        builder.setSpout("data", new KafkaSpout(buildSpoutConfig(brokerZkStr, topic)), workerNumSpout);
        builder.setBolt("analyze", new MyBolt(), workerNumBolt).shuffleGrouping("data");

        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(workerNumSpout);
        // NOTE(review): with zero ackers tuples are not tracked, so MyBolt's ack/fail calls and
        // the max-spout-pending limit presumably have no effect. Confirm this is intended.
        config.setNumAckers(0);
        config.setMaxSpoutPending(maxSpoutPending > 0 ? maxSpoutPending : 1);

        System.out.println(" topic = " + topic + " workerNumSpout = " + workerNumSpout
                + " workerNumBolt = " + workerNumBolt + " maxSpoutPending = " + maxSpoutPending);

        if (args.length > 0) {
            // With arguments: submit to the distributed cluster under the name in args[0].
            try {
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            } catch (Exception e) {
                // Fail the launcher instead of printing and exiting normally.
                throw new IllegalStateException("Failed to submit topology " + args[0], e);
            }
        } else {
            // Without arguments: run in-process. The local cluster is never shut down here,
            // so the process keeps running until it is killed.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("ImsiTopology", config, builder.createTopology());
        }
    }

    /**
     * Builds the Kafka spout configuration for {@code topic}.
     * <p>
     * Brokers are discovered from ZooKeeper under {@code /brokers}. Consumer offsets are kept in
     * the same ZooKeeper ensemble.
     *
     * @param brokerZkStr comma-separated {@code host:port} ZooKeeper connect string
     * @param topic       Kafka topic to consume
     * @return the configured spout settings
     */
    private static SpoutConfig buildSpoutConfig(String brokerZkStr, String topic) {
        ZkHosts zk = new ZkHosts(brokerZkStr, "/brokers");
        // SpoutConfig's third argument is the zkRoot under which consumer offsets are stored.
        // NOTE(review): an empty root presumably places them directly under "/" + id. Confirm.
        String zkRoot = "";
        // Consumer id; any name works.
        String id = "testTopic";
        SpoutConfig spoutConf = new SpoutConfig(zk, topic, zkRoot, id);

        // SpoutConfig wants bare host names plus a single port, so strip ":port" from each entry.
        List<String> zkServers = new ArrayList<String>();
        for (String hostAndPort : brokerZkStr.split(",")) {
            zkServers.add(hostAndPort.split(":")[0]);
        }
        spoutConf.zkServers = zkServers;
        spoutConf.zkPort = ZK_PORT;
        // Do not force reading from the beginning of the topic.
        spoutConf.forceFromStart = false;
        spoutConf.socketTimeoutMs = 60 * 1000;
        // Each Kafka message becomes a one-field String tuple (read by MyBolt via getString(0)).
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        return spoutConf;
    }
}
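5. 打成 jar，上传到分布式环境运行：
# jstorm jar XXXX.jar com.li.test.MyTopology myTopology myTopic
XXXX.jar 为打包后的 jar；
com.li.test.MyTopology 为入口类，即提交任务的类（需替换为实际的包名 + 类名）；
入口类之后的都是提交参数，依次为：
- 拓扑名称（有此参数时才提交到集群，否则走本地模式）；
- 要消费的 topic；
- Spout 并行度与 Bolt 并行度（需同时提供）；
- maxSpoutPending。
注意：第一个参数是拓扑名称而不是 topic。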
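6. 本地环境调试
注释掉上文 pom.xml 中 jstorm-core 的 `<scope>provided</scope>` 配置。provided 范围的依赖不在运行时类路径中，本地运行会找不到 JStorm 的类。
然后不带任何参数运行 MyTopology 的 main 方法即可：args 为空时，程序会通过 LocalCluster 在本地提交拓扑。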
最后奉上源码地址：点击打开链接
文章最后发布于: 2018-04-12 10:22:07
相关阅读
文章目录一、Producer介绍同步和异步的方式调用send()二、Producer工作原理介绍关于Sender线程三、Producer配置参数四、源码解析
Stream Processing: Apache Kafka的Exactly-once的定
2018年,Apache Kafka以一种特殊的设计和方法实现了强语义的exactly-once和事务性。热泪盈眶啊!这篇文章将讲解kafka中exactly-once
kafka是一款基于发布与订阅的消息系统。它一般被称为“分布式提交日志”或者“分布式流平台”。文件系统或者数据库提交日志用来