博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Storm框架:Storm整合springboot
阅读量:6176 次
发布时间:2019-06-21

本文共 4659 字,大约阅读时间需要 15 分钟。

我们知道Storm本身是一个独立运行的分布式流式数据处理框架,Springboot也是一个独立运行的web框架。那么如何在Strom框架中集成Springboot使得我们能够在Storm开发中运用Spring的Ioc容器及其他如Spring Jpa等功能呢?我们先来了解以下概念: Storm主要的三个Component:Topology、Spout、Bolt。Topology作为主进程控制着spout、bolt线程的运行,他们相当于独立运行的容器分布于storm集群中的各个机器节点。 SpringApplication:是配置Spring应用上下文的起点。通过调用SpringApplication.run()方法它将创建ApplicationContext实例,这是我们能够使用Ioc容器的主要BeanFactory。之后Spring将会加载所有单例模式的beans,并启动后台运行的CommandLineRunner beans等。 ApplicationContextAware:这是我们能够在普通Java类中调用Spring容器里的beans的关键接口。

实现原理

Storm框架中的每个Spout和Bolt都相当于独立的应用,Strom在启动spout和bolt时提供了一个open方法(spout)和prepare方法(bolt)。我们可以把初始化Spring应用的操作放在这里,这样可以保证每个spout/bolt应用在后续执行过程中都能获取到Spring的ApplicationContext,有了ApplicationContext实例对象,Spring的所有功能就都能用上了。

Spout.open方法实现 @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { //启动Springboot应用 SpringStormApplication.run();

this.map = map;this.topologyContext = topologyContext;this.spoutOutputCollector = spoutOutputCollector;复制代码

} Bolt.prepare方法实现 @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { //启动Springboot应用 SpringStormApplication.run();

this.map = map;this.topologyContext = topologyContext;this.outputCollector = outputCollector;复制代码

} SpringStormApplication启动类 @SpringBootApplication @ComponentScan(value = "com.xxx.storm") public class SpringStormApplication { /** * 非工程启动入口,所以不用main方法 * 加上synchronized的作用是由于storm在启动多个bolt线程实例时,如果Springboot用到Apollo分布式配置,会报ConcurrentModificationException错误 * 详见: * @param args */ public synchronized static void run(String ...args) { SpringApplication app = new SpringApplication(SpringStormApplication.class); //我们并不需要web servlet功能,所以设置为WebApplicationType.NONE app.setWebApplicationType(WebApplicationType.NONE); //忽略掉banner输出 app.setBannerMode(Banner.Mode.OFF); //忽略Spring启动信息日志 app.setLogStartupInfo(false); app.run(args); } } 与我们传统的Springboot应用启动入口稍微有点区别,主要禁用了web功能,看下正常的启动方式:

@SpringBootApplication @ComponentScan(value = "") public class PlatformApplication { public static void main(String[] args) { SpringApplication.run(PlatformApplication.class, args); } } 在spout/bolt中调用了SpringStormApplication.run方法后,我们还需要能够拿到ApplicationContext容器对象,这时候我们还需要实现ApplicationContextAware接口,写个工具类BeanUtils: @Component public class BeanUtils implements ApplicationContextAware { private static ApplicationContext applicationContext = null;

@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {    if (BeanUtils.applicationContext == null) {        BeanUtils.applicationContext = applicationContext;    }}public static ApplicationContext getApplicationContext() {    return applicationContext;}public static Object getBean(String name) {    return getApplicationContext().getBean(name);}public static 
T getBean(Class
clazz) { return getApplicationContext().getBean(clazz);}public static
T getBean(String name, Class
clazz) { return getApplicationContext().getBean(name, clazz);}复制代码

} 通过@Component注解使得Spring在启动时能够扫描到该bean,因为BeanUtils实现了ApplicationContextAware接口,Spring会在启动成功时自动调用BeanUtils.setApplicationContext方法,将ApplicationContext对象保存到工具类的静态变量中,之后我们就可以使用BeanUtils.getBean()去获取Spring容器中的bean了。

写个简单例子

在FilterBolt的execute方法中获取Spring bean @Override public void execute(Tuple tuple) { FilterService filterService = (FilterService) BeanUtils.getBean("filterService"); filterService.deleteAll(); } 定义FilterService类,这时候我们就可以使用Spring的相关注解,自动注入,Spring Jpa等功能了。 @Service("filterService") public class FilterService { @Autowired UserRepository userRepository;

public void deleteAll() {    userRepository.deleteAll();}复制代码

} 将storm应用作为Springboot工程的一个子模块

工程主目录的pom文件还是springboot相关的依赖,在storm子模块中引入storm依赖,这时候启动Strom的topology应用会有一个日志包依赖冲突。

SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.11.1/log4j-slf4j-impl-2.11.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] 我们需要在storm子模块的pom文件中重写org.springframework.boot:spring-boot-starter包依赖,将Springboot的相关日志包排除掉,如下:

org.springframework.boot spring-boot-starter org.apache.logging.log4j log4j-to-slf4j2 ch.qos.logback logback-classic2 OK,完美整合!

转载于:https://juejin.im/post/5bfa9a54e51d454b8e22d9db

你可能感兴趣的文章
Tomcat集群Cluster实现原理
查看>>
人人都应当控制的一些电脑操作技能
查看>>
百度echarts自定义主题使用
查看>>
ASP.NET MVC3中给DropDownList添加默认选项
查看>>
洛谷 1373 小a和uim之大逃离
查看>>
一不小心把win10的秘钥卸载了解决方法
查看>>
Linux实现删除撤回的方法。
查看>>
SilverLight之向后台请求数据-WebClient
查看>>
HDU Problem 1260 Tickets 【dp】
查看>>
STL map容器常用API
查看>>
队列的顺序存储---顺序队列
查看>>
Delphi 读取 c# webservice XML的base64编码图片字符串转化图片并显示
查看>>
第三天
查看>>
connector for python
查看>>
等价类划分的应用
查看>>
Web Service(下)
查看>>
trigger()
查看>>
nvm 怎么安装 ?
查看>>
Java VM里的magic
查看>>
[Node.js]Domain模块
查看>>