基于Storm的WordCount「建议收藏」

基于Storm的WordCount「建议收藏」Storm WordCount 工作过程 Storm 版本: 1、Spout 从外部数据源中读取数据,随机发送一个元组对象出去; 2、SplitBolt 接收 Spout 中输出的元组对象,将元组中的

Storm WordCount 工作过程

Storm 版本:
1、Spout 从外部数据源中读取数据,随机发送一个元组对象出去;
2、SplitBolt 接收 Spout 中输出的元组对象,将元组中的数据切分成单词,并将切分后的单词发射出去;
3、WordCountBolt 接收 SplitBolt 中输出的单词数组,对里面单词的频率进行累加,将累加后的结果输出。

Java 版本:
1、读取文件中的数据,一行一行的读取;
2、将读到的数据进行切割;
3、对切割后的数组中的单词进行计算。

Hadoop 版本:
1、按行读取文件中的数据;
2、在 Mapper()函数中对每一行的数据进行切割,并输出切割后的数据数组;
3、接收 Mapper()中输出的数据数组,在 Reducer()函数中对数组中的单词进行计算,将计算后的统计结果输出。

源代码

storm的配置、eclipse里maven的配置以及创建项目部分省略。

Mainclass

package com.test.stormwordcount;
import backtype.storm.Config; 
import backtype.storm.LocalCluster; 
import backtype.storm.StormSubmitter; 
import backtype.storm.generated.AlreadyAliveException; 
import backtype.storm.generated.InvalidTopologyException; 
import backtype.storm.topology.TopologyBuilder; 
import backtype.storm.tuple.Fields; 

public class MainClass { 

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {         
        //创建一个 TopologyBuilder         
        TopologyBuilder tb = new TopologyBuilder();         
        tb.setSpout("SpoutBolt", new SpoutBolt(), 2);         tb.setBolt("SplitBolt", new SplitBolt(), 2).shuffleGrouping("SpoutBolt");         
        tb.setBolt("CountBolt", new CountBolt(), 4).fieldsGrouping("SplitBolt", new Fields("word"));         
        //创建配置         
        Config conf = new Config();         
        //设置 worker 数量         
        conf.setNumWorkers(2);         
        //提交任务         
        //集群提交         
        //StormSubmitter.submitTopology("myWordcount", conf, tb.createTopology());         
        //本地提交         
        LocalCluster localCluster = new LocalCluster();         
        localCluster.submitTopology("myWordcount", conf, tb.createTopology()); 
    }  
} 

代码100分

SplitBolt 部分

代码100分package com.test.stormwordcount;
import java.util.Map; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichBolt; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Tuple; 
import backtype.storm.tuple.Values; 

public class SplitBolt extends BaseRichBolt{      
    OutputCollector collector; 

    /**      * 初始化      */     
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {         
        this.collector = collector;     
        } 

    /**      * 执行方法      */     
    public void execute(Tuple input) {         
        String line = input.getString(0);         
        String[] split = line.split(" ");         
        for (String word : split) {             
            collector.emit(new Values(word));         
            }     
        } 

    /**      * 输出      */     
    public void declareOutputFields(OutputFieldsDeclarer declarer) {         
        declarer.declare(new Fields("word"));     
        } 
} 

CountBolt 部分

package com.test.stormwordcount;
import java.util.HashMap; 
import java.util.Map; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichBolt; 
import backtype.storm.tuple.Tuple; 

public class CountBolt extends BaseRichBolt{ 

    OutputCollector collector;
    Map<String, Integer> map = new HashMap<String, Integer>(); 

    /**      * 初始化      */     
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {         
        this.collector = collector;     
        } 


    /**      * 执行方法      */     
public void execute(Tuple input) {         
    String word = input.getString(0);         
    if(map.containsKey(word)){             
    Integer c = map.get(word);             
        map.put(word, c+1);         
        }else{             
        map.put(word, 1);         
        }         
    //测试输出         
    System.out.println("结果:"+map);     
    } 

    /**      * 输出      */     
public void declareOutputFields(OutputFieldsDeclarer declarer) {     
    
} 
} 

SpoutBolt 部分

代码100分package com.test.stormwordcount;
import java.util.Map; 
import backtype.storm.spout.SpoutOutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichSpout; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Values; 

public class SpoutBolt extends BaseRichSpout{ 

    SpoutOutputCollector collector;
    /**      * 初始化方法      */     
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {         
        this.collector = collector;     
        } 

    /**      * 重复调用方法      */     
    public void nextTuple() {         
        collector.emit(new Values("hello world this is a test"));     
        } 

    /**      * 输出      */     
    public void declareOutputFields(OutputFieldsDeclarer declarer) {         
        declarer.declare(new Fields("test"));     
        } 
} 

POM.XML 文件内容

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.test</groupId>
<artifactId>stormwordcount</artifactId>
<version>0.9.6</version>
<packaging>jar</packaging>

<name>stormwordcount</name>
<url>http://maven.apache.org</url>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.6</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.test.stormwordcount.MainClass</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
            </configuration>
        </plugin>
    </plugins>
</build>

遇到的问题

基于Storm的WordCount需要eclipse安装了maven插件,之前的大数据实践安装的eclipse版本为Eclipse IDE for Eclipse Committers4.5.2,这个版本不自带maven插件,后续安装失败了几次(网上很多的教程都已经失效),这里分享一下我成功安装的方法:
使用链接下载,Help->Install New SoftWare
基于Storm的WordCount「建议收藏」
点击Add,name输入随意,在location输入下载eclipse的maven插件,下载地址可以这样获取
点击连接:http://www.eclipse.org/m2e/index.html 进入网站后点击download,拉到最下面可以看到很多eclipse maven插件的版本和发布时间,选在适合eclipse的版本复制链接即可。建议取消选中Contack all update sites during install to find required software(耗时太久)。

但是安装成功后还是无法配置(这里原因不太清楚,没找到解决办法),就直接上官网换成自己maven插件的JavaEE IDE了…

后续的maven的配置这些都比较顺利,第一次创建maven-archetype-quickstat项目报错,试了网上很多办法都还没成功,然后打开 Windows->Preferencs->Maven->Installation发现之前配置了的maven的安装路径没了…重新配置了下就可以创建项目了。

最后运行成功的结果:
基于Storm的WordCount「建议收藏」

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/10423.html

(0)
上一篇 2022-12-27 22:00
下一篇 2022-12-28

相关推荐

  • NoSQL比较火的三个数据库Memcached、Redis、MongoDB

    NoSQL比较火的三个数据库Memcached、Redis、MongoDBNoSQL,泛指非关系型的数据库。随着互联网不断的发展,传统的关系数据库在应付新互联网模式的网站,特别是超大规模和高并发的SNS类型的纯动态网站已经显得力不从心,暴露了很多难以克服的问题,而非关系型的

    2023-02-19
    151
  • 阿里云分析型数据库MySQL版(AnalyticDB)测试初体验[通俗易懂]

    阿里云分析型数据库MySQL版(AnalyticDB)测试初体验[通俗易懂]其实是测试半遂体验。 这阵子对OLAP数据库产生了兴趣,先是简单测试了ClickHouse,性能的确不错,不过它在稳定&可靠性,整体生态&周边配套方面还有待加强,我会持续保持关注。 3月27日,腾讯…

    2023-02-23
    149
  • mysql 8.0 忘记root密码后重置[通俗易懂]

    mysql 8.0 忘记root密码后重置[通俗易懂]最近状态很不好,一直晕晕晕晕晕晕乎乎的,一个测试实例,下班前修改了一下root的密码,接着就下班走人,第二天来发现root密码忘了 刚好自动化安装脚本整理好了,本来想着算了直接重装实例得了,简单省事也

    2022-12-22
    144
  • redis复制过程_redis主从复制原理详解

    redis复制过程_redis主从复制原理详解Redis的复制机制(主从复制)。

    2023-02-14
    150
  • Anaconda是Python工程师必备工具之一

    Anaconda是Python工程师必备工具之一Python语言火热发展,吸引了大量的程序员加入。然而,Python开发中需要用到的各种工具及其安装配置实际上还是一个很麻烦的事情。比如,虚拟环境、第三方库、各种IDE等,各种工具和开发环境需要一个一个地去装和配置,费时费力。这时候,Anaconda应运而生,成为了Python编程中的一大利器,并成为Python工程师们必备的开发工具之一。

    2024-06-07
    53
  • Python中dropna的用法

    Python中dropna的用法在数据分析和处理过程中,经常会遇到数据缺失的情况,这时候就需要使用dropna方法来删除缺失值。

    2024-04-27
    71
  • 数据库的优点_sql可视化

    数据库的优点_sql可视化文章的开头我们先来看下什么是图数据库,根据维基百科的定义:图数据库是使用图结构进行语义查询的数据库,它使用节点、边和属性来表示和存储数据。 虽然和关系型数据库存储的结构不同(关系型数据库为表结构,图…

    2023-02-03
    132
  • 使用setup.py来管理Python项目的依赖

    使用setup.py来管理Python项目的依赖Python是一门广受欢迎和广泛应用的编程语言,拥有优秀的生态系统和强大的第三方包支持。当我们在开发Python项目的时候,通常需要引用许多第三方库和框架。这时候,就需要一个好的依赖管理工具来帮助我们管理这些依赖关系,以确保项目的可靠性和稳定性。在Python中,使用setup.py来管理项目依赖是一种非常常见的做法。

    2024-03-25
    83

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注