Big Data Hadoop: Hands-On Data Collection and Storage to HDFS (Python Version)


To run this example you must first have a working Hadoop and Hive environment. For deployment, see my earlier articles:
Big Data Hadoop: Introduction, Installation, and Hands-On Practice (HDFS + YARN + MapReduce)
Big Data Hadoop: The Hive Data Warehouse

The overall flow is as follows:

[Flowchart: Selenium scrapes the comic list pages → records are written to a local data.txt → the file is uploaded to HDFS → the HDFS file is loaded into a Hive table]
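
Before running the full example, it can help to confirm that HiveServer2 and the NameNode WebHDFS endpoint are reachable from Python. A minimal sketch, assuming the host and ports used later in this article (replace them with your own):

from pyhive import hive
from hdfs import InsecureClient

# Quick connectivity check: list the Hive databases and the HDFS root directory
conn = hive.connect(host="192.168.0.113", port=11000, username="root", database="default")
cursor = conn.cursor()
cursor.execute("show databases")
print(cursor.fetchall())

fs = InsecureClient(url="http://192.168.0.113:9870/", user="root", root="/")
print(fs.list("/"))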

The full example code is as follows:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author   : liugp
# @File     : Data2HDFS.py

"""
# pip install sasl may fail to build
pip install sasl
# An offline wheel can be installed instead:
https://www.lfd.uci.edu/~gohlke/pythonlibs/#sasl
pip install sasl-0.3.1-cp37-cp37m-win_amd64.whl

pip install thrift
pip install thrift-sasl
pip install pyhive
pip install hdfs
"""
import os

from selenium import webdriver
from pyhive import hive
from hdfs import InsecureClient

class Data2HDFS:
    def __init__(self):
        # Step 1: connect to Hive through HiveServer2
        conn = hive.connect(host="192.168.0.113", port=11000, username="root", database="default")
        # Step 2: create a cursor for executing HiveQL statements
        self.cursor = conn.cursor()

        # HDFS client that talks to the NameNode's WebHDFS endpoint
        self.fs = InsecureClient(url="http://192.168.0.113:9870/", user="root", root="/")

    """
    采集数据
    """
    def collectData(self):
        try:
            driver = webdriver.Edge("../drivers/msedgedriver.exe")
            # Scrape page 1 here (range(1, 2)); extend the range to scrape more pages
            id = 1
            local_path = "./data.txt"
            with open(local_path, "w", encoding="utf-8") as f:
                for page in range(1, 2):
                    url = "https://ac.qq.com/Comic/index/page/" + str(page)
                    driver.get(url)
                    # Scroll down step by step so lazy-loaded content gets rendered
                    js = "return document.body.scrollHeight"
                    new_height = driver.execute_script(js)
                    for y in range(0, new_height, 10):
                        driver.execute_script("window.scrollTo(0, %s)" % (y))
                    items = driver.find_element_by_class_name("ret-search-list").find_elements_by_tag_name("li")
                    data = []
                    for item in items:
                        imgsrc = item.find_element_by_tag_name("img").get_attribute("src")
                        author = item.find_element_by_class_name("ret-works-author").text
                        leixing_spanlist = item.find_element_by_class_name("ret-works-tags").find_elements_by_tag_name(
                            "span")
                        # Join the type tags with "-" so they do not break the comma-delimited line
                        leixing = leixing_spanlist[0].text + "-" + leixing_spanlist[1].text
                        neirong = item.find_element_by_class_name("ret-works-decs").text
                        gengxin = item.find_element_by_class_name("mod-cover-list-mask").text

                        itemdata = {"id": str(id), "imgsrc": imgsrc, "author": author, "leixing": leixing, "neirong": neirong,
                                    "gengxin": gengxin}
                        print(itemdata)
                        # Comma-separated fields, matching the Hive table's FIELDS TERMINATED BY ","
                        line = itemdata["id"] + "," + itemdata["imgsrc"] + "," + itemdata["author"] + "," + itemdata["leixing"] + "," + itemdata["neirong"] + "," + itemdata["gengxin"] + "\n"
                        f.write(line)
                        id += 1
                        data.append(itemdata)
            # Upload the local file to HDFS
            self.uploadLocalFile2HDFS(local_path)

        except Exception as e:
            print(e)

    """创建hive表"""
    def createTable(self):
        # Fix garbled Chinese in Hive table/column comments by switching the metastore columns to utf8
        """
        mysql -uroot -p
        use hive;  -- switch to the Hive metastore database

        alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
        alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
        alter table PARTITION_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
        alter table PARTITION_KEYS modify column PKEY_COMMENT varchar(4000) character set utf8;
        alter table INDEX_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
        commit;
        :return:
        """
        self.cursor.execute("CREATE TABLE  IF NOT EXISTS default.datatable (
        id INT COMMENT "ID",
        imgsrc STRING COMMENT "img src",
        author STRING COMMENT "author",
        leixing STRING COMMENT "类型",
        neirong STRING COMMENT "内容",
        gengxin STRING COMMENT "更新"
        )
        ROW FORMAT DELIMITED
        FIELDS TERMINATED BY ","
        COLLECTION ITEMS TERMINATED BY "-"
        MAP KEYS TERMINATED BY ":"
        LINES TERMINATED BY "
"")

    """
    将本地文件推送到HDFS上
    """
    def uploadLocalFile2HDFS(self, local_path):
        hdfs_path = "/tmp/test0508/"
        self.fs.makedirs(hdfs_path)
        # If the file already exists on HDFS it must be deleted first, otherwise the upload fails
        self.fs.delete(hdfs_path + os.path.basename(local_path))
        print(hdfs_path, local_path)
        self.fs.upload(hdfs_path, local_path)

    """
    将HDFS上的文件load到hive表
    """
    def data2Hive(self):
        # Truncate the table first
        self.cursor.execute("truncate table datatable")
        # Load the data; the path here is the file's path on HDFS
        self.cursor.execute("load data inpath '/tmp/test0508/data.txt' into table datatable")
        self.cursor.execute("select * from default.datatable")
        print(self.cursor.fetchall())

if __name__ == "__main__":
    d2f = Data2HDFS()
    # Collect the data (this also uploads data.txt to HDFS)
    d2f.collectData()
    # Create the Hive table (only needed on the first run)
    # d2f.createTable()
    # Load the HDFS file into the Hive table
    d2f.data2Hive()
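
After a run, you can use the same hdfs client to check that the file actually landed on HDFS. Note that the load data inpath statement moves the file into the Hive warehouse directory, so run this check before data2Hive(). A minimal sketch, assuming the host, path, and file name used above:

from hdfs import InsecureClient

fs = InsecureClient(url="http://192.168.0.113:9870/", user="root")
print(fs.list("/tmp/test0508"))  # should contain data.txt
with fs.read("/tmp/test0508/data.txt", encoding="utf-8") as reader:
    print(reader.read()[:200])   # first characters of the uploaded file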

[Note] The default HiveServer2 port is 10000. I used port 11000 above because I changed it in my configuration; if you are using the default, change it back to 10000, and also replace the host with your own address. This is just one way to implement the pipeline; there are other approaches.
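
For reference, a minimal sketch of the connection when HiveServer2 is left on its default port (the hive.server2.thrift.port property, default 10000); the host below is the one used in this article and should be replaced with your own:

from pyhive import hive

# hive.server2.thrift.port left at its default of 10000
conn = hive.connect(host="192.168.0.113", port=10000, username="root", database="default")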

If you have any questions, feel free to leave me a comment. More articles on big data are coming, so please stay tuned~

Original article: https://www.cnblogs.com/liugp/archive/2022/05/24/16307510.html
