java读取大文件oom_java中如何读取文件

java读取大文件oom_java中如何读取文件现在需要快速分析一个2g的csv文件; 基于掌握的知识,使用java按行读取文件,批量导入数据到Elasticsearch, 然后利用es强大的聚合能力分析数据,1个小时搞定! package com

java读取大文件内容到Elasticsearch分析(手把手教你java处理超大csv文件)


现在需要快速分析一个2g的csv文件;
基于掌握的知识,使用java按行读取文件,批量导入数据到Elasticsearch,
然后利用es强大的聚合能力分析数据,1个小时搞定!

package com.example.demo;

import com.alibaba.fastjson.JSON;
import com.example.demo.entity.Entity;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;

/**
 * 读取大文件
 * csv格式
 *
 * @author lhb
 * @date 2021/11/11
 * @since 1.0.0
 */
@SpringBootTest
public class ImportTest {

    @Autowired
    @Qualifier("client")
    private RestHighLevelClient restHighLevelClient;

    @Test
    void insert() {
     //csv文件2G,63W条数据,十多个字段
        String filePath = "D:\file\20211111.csv";

        LineIterator it = null;
        try {
            it = FileUtils.lineIterator(new File(filePath), "UTF-8");

        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            while (it.hasNext()) {
                String line = it.nextLine();
                //System.out.println("line = " + line);
                //文件是CSV文件,CSV文件中的每一列是用","隔开的,这样就可以得到每一列的元素
                String[] strArray = line.split(",");
                //有很长的空格,trim一下
                String name = strArray[6].trim();
                String code = strArray[8].trim();
                String num = strArray[11].trim();
                System.out.println(code + "==" + num);

                Entity entity = new Entity();
                entity.setCode(code);
                if (Objects.equals("xxx", code)) {
                    //跳过表头
                    continue;
                }
                entity.setNum(Long.parseLong(num));
                entity.setName(name);
                entity.setCreateTime(new Date());
                String index = "index20211111";
                singleInsert2(index, entity);
            }
        } finally {
            LineIterator.closeQuietly(it);
        }
    }

    @Test
    void batchInsert() {

        String filePath = "D:\express\20211111.csv";

        LineIterator it = null;
        try {
            it = FileUtils.lineIterator(new File(filePath), "UTF-8");

        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            int i = 0;
            List<Entity> entities = new ArrayList<>();

            while (it.hasNext()) {
                String line = it.nextLine();
                //System.out.println("line = " + line);
                String[] strArray = line.split(",");
                String code = strArray[6].trim();
                String name = strArray[8].trim();
                String num = strArray[11].trim();
                System.out.println(code + "==" + num);

                if (Objects.equals("xxx", code)) {
                    //跳过表头
                    continue;
                }
                Entity entity = new Entity();
                entity.setCode(code);
                entity.setName(name);
                try {
                    entity.setNum(Long.parseLong(num));
                } catch (NumberFormatException e) {
                    e.printStackTrace();
                    System.out.println("出错的数据" + code + "==" + num);
                }
                entity.setCreateTime(new Date());
                String index = "index20211111";

                //批量插入
                entities.add(entity);
                i++;
          //如果最后一次批量插入不足10000条数据,需要再此根据实际条数特殊处理
if (i % 10000 == 0) { System.out.println("i = " + i); try { batchInsert2(index, entities); } catch (IOException e) { e.printStackTrace(); } //清空已经处理过的list entities.clear(); i = 0; } } } finally { LineIterator.closeQuietly(it); } } /** * 批量速度杠杠的 * * @param index * @param entities * @throws IOException */ public void batchInsert2(String index, List<Entity> entities) throws IOException { BulkRequest bulkRequest = new BulkRequest(index); System.out.println("entities.sz = " + entities.size()); for (Entity org : entities) { IndexRequest request = new IndexRequest(); request.source(JSON.toJSONString(org), XContentType.JSON); bulkRequest.add(request); } restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT); } /** * 数据量大,超级慢 * * @param index * @param entity */ public void singleInsert2(String index, Entity entity) { IndexRequest request = new IndexRequest(index); request.source(JSON.toJSONString(entity), XContentType.JSON); try { IndexResponse index1 = restHighLevelClient.index(request, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } } }


实体类,需要什么字段自定义
package com.example.demo.entity;

import lombok.Data;

import java.util.Date;

/**
 * @author lhb
 * @date 2021/11/11
 * @since 1.0.0
 */
@Data
public class Entity {

    /**
     * 编码
     */
    private String code;
    /**
     * 名字
     */
    private String name;
    /**
     * 数量
     */
    private Long num;
    private Date createTime;

}


创建索引映射,然后插入数据:
PUT express_to_village20211104
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "code": {
        "type": "keyword"
      },
      "name": {
        "type": "keyword"
      },
      "num": {
        "type": "long"
      },
      "createTime": {
        "type": "date"
      }
    }
  }
}

开始分析数据:

GET index20211111/_count
{}

 

#返回63w数据

{
“count” : 630000,
“_shards” : {
“total” : 1,
“successful” : 1,
“skipped” : 0,
“failed” : 0
}
}

GET index20211111/_search
{
  "query": {
    "constant_score": {
      "filter": {
        "terms": {
          "code": [
            2222,
            1111,
            3333
          ]
        }
      }
    }
  },
  "size": 1,
  "track_total_hits": true,
  "aggs": {
    "per_code": {
      "terms": {
        "field": "code",
        "size": 200
      },
      "aggs": {
        "num": {
          "sum": {
            "field": "num"
          }
        }
      }
    },
    "sum_num": {
      "sum": {
        "field": "num"
      }
    }
  }
}

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/5722.html

(0)
上一篇 2023-04-27
下一篇 2023-04-27

相关推荐

  • 使用Python if语句多个条件判断

    使用Python if语句多个条件判断Python作为广泛应用的编程语言之一,其if语句是编程中非常重要的一部分,在判断特定条件下的程序流程上有着非常重要的作用。当需要在多个条件中进行判断时,Python if语句的多个条件判断就成为了解决问题的关键。

    2024-04-15
    82
  • Python起始参数:完整指南

    Python起始参数:完整指南Python脚本通常可以在运行时动态地获取参数。命令行参数是由程序在运行时传递的字段,以决定程序的运行方式。Python的sys模块提供了一个名为sys.argv的列表,其中包含了传递到Python脚本的所有参数。其中,sys.argv[0]表示脚本名称,sys.argv[1]表示第一个参数,以此类推。下面是一个示例:

    2024-03-27
    67
  • 事务的隔离级别是什么_mysql事务隔离级别 默认

    事务的隔离级别是什么_mysql事务隔离级别 默认在讲事务的隔离级别,我们先得回忆一下事务的隔离性 事务的隔离性是在当多个用户并发访问数据库时,比如说操作同一张表时,数据库为每一个用户开启事务,不能被其他事务的操作所干扰,多个并发事务之间需要相互隔离

    2023-04-18
    170
  • 【华为云技术分享】数据库开发:MySQL Seconds_Behind_Master简要分析「建议收藏」

    【华为云技术分享】数据库开发:MySQL Seconds_Behind_Master简要分析「建议收藏」【摘要】对于mysql主备实例,seconds_behind_master是衡量master与slave之间延时的一个重要参数。通过在slave上执行”show slave status;”可以获取…

    2023-03-10
    158
  • JavaScript获取对象的key

    JavaScript获取对象的key在 JavaScript 中,获取对象的 key(属性名)是非常常见的操作。不仅如此,有时候我们需要对对象的 key 做进一步的操作,如查找某个特定 key,在循环中迭代对象等等。这篇文章将详细介绍如何使用 JavaScript 获取对象的 key(属性名),并给出多个案例来演示不同的用法。

    2024-06-14
    45
  • redis交叉编译 (windows( x86 x86_64) & linux(arm aarch64 armv8l)) 成品「终于解决」

    redis交叉编译 (windows( x86 x86_64) & linux(arm aarch64 armv8l)) 成品「终于解决」redis交叉编译 平台: Windows: x86 x86_64 Linux: arm aarch64 armv8l 仓库地址: https://github.com/huskar-t/redis

    2023-03-10
    159
  • Python:面向对象还是面向过程编程?

    Python:面向对象还是面向过程编程?Python 是一种高级编程语言,它支持多种编程范式,包括面向对象和面向过程编程。在 Python 中,你可以使用面向对象或面向过程编写代码,这取决于你个人的喜好和项目需求。本文将从多个方面对 Python 的面向对象和面向过程编程进行详细阐述。

    2024-08-20
    29
  • 流媒体与实时计算,Netflix公司Druid应用实践[通俗易懂]

    流媒体与实时计算,Netflix公司Druid应用实践[通俗易懂]Netflix(Nasdaq NFLX),也就是网飞公司,成立于1997年,是一家在线影片[租赁]提供商,主要提供Netflix超大数量的[DVD]并免费递送,总部位于美国加利福尼亚州洛斯盖图。199

    2023-02-26
    141

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注