POI报表比对任务-不怕内存溢出

需求概述

公司需要把支付宝的月度支付流水账单excel文件导入系统与本地的支付流水明细做差异比对。最后输出一个Excel文件,内容包含:原始上传的支付宝流水、本地数据库查询出支付流水以及双方的差异 3个sheet子文件。
差异结果: 支付宝流水差异,本地支付流水差异,双方金额差异。

需求实现:

  • 需要把从支付宝下载的支付明细报表导入我们自己的后台系统,文件格式是CSV 转XLSX格式之后上传OSS。这个简单就是正常的上传。
  • 因需要接入下载中心入口(统一管理所有下载文件的菜单栏),所以上传完之后调用下载中心接口http请求,携带上传完的【ossUrl,下载类型,目标地址方法】等给下载中心。下载中心接到请求之后,会生成一条job任务记录,再转发调度业务方去做实际下载的任务。最后业务方下载完成之后,将ossUrl结果再回调给下载中心展示。(感觉号麻烦还搞下载中心干啥? 统一前端请求接口,对使用者友好,都放一起了,异步下载,原始的基本都是同步下载 输出流模式到本地)
  • 业务方处理
    3.1 需要异步方法去处理任务,因为下载中心转发过来的是一个同步的http请求,不可以长时间等待业务方处理完再返回消息。(为啥不用异步http?答:实际没用过,存在的坑不明确。又因只是内部项目。)
    3.2 获取ossUrl地址,通过http get请求读取到字节流,将流转为XSSFWorkbook 再转SXSSFWorkbook 。
  •             byte[] excelStreamData = RestTemplateUtil.getForObject(ossUrl, byte[].class);  
                / / 大数据量用2007版本  XSSFWorkbook 
                XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new ByteArrayInputStream(excelStreamData));       
                // 转SXSSFWorkbook 是因为需要取写入其他两个sheet XSSFWorkbook 有内存溢出风险。想详细了解这几个Workbook的区别可以自行查询哈。
                SXSSFWorkbook sxssfWorkbook = new SXSSFWorkbook(xssfWorkbook,100);
    

    3.3 通过xssfWorkbook 获取到原始支付流水Sheet(SXSSFWorkbook 读取不到的),把数据按照相应的比较类型(比如:预付费 ,支付,退款等)加载到内存中,我是用了3个map去存不同类型的数据。(为啥用map?用map把支付流水号ID作为key,value是一个自定义实体,map的优势 数据哈希映射,比对的时候,通过相同的key去gmap中get查询,时间O(1) )

    XSSFSheet sheet = xssfWorkbook.getSheet(0);
    

    3.4 使用线程池创建了3个子线程处理这3个map子任务比对数据。CpCompareParamDTO.getInstance();这个是设置子线程对比所需的参数 去本地数据库中查询的条件。
    注意:这儿如果是同一个对象共享的话会有数据安全问题。我这儿是采用了new不同的对象。也可使用ThreadLcoal,记得用完都要释放掉。
    子线程共享的rowNum 用原子类AtomicInteger, 共享的sheet volatile修饰。

        TaskExecutor taskExecutor = taskExecutePoolUtil.orderQueryTaskExecutor();
                CountDownLatch countDownLatch = new CountDownLatch(3);
                AtomicInteger failCount = new AtomicInteger();
               taskExecutor.execute(() -> {
                    CpCompareParamDTO compareParamDTO = CpCompareParamDTO.getInstance(requestParams, rowNum, paySheet, diffRowNum, diffSheet,combinePayMap,null,PayBusinessTypeEnum.COMBINE_ORDER_PAY.getValue());
                    forkThreadExecute(requestId,compareParamDTO,failCount,countDownLatch);
                taskExecutor.execute(() -> {
                    CpCompareParamDTO compareParamDTO = CpCompareParamDTO.getInstance(requestParams, rowNum, paySheet, diffRowNum, diffSheet,null,refundMap,PayBusinessTypeEnum.REFUND.getValue());
                    forkThreadExecute(requestId,compareParamDTO,failCount,countDownLatch);
                taskExecutor.execute(() -> {
                    CpCompareParamDTO compareParamDTO = CpCompareParamDTO.getInstance(requestParams, rowNum, paySheet, diffRowNum, diffSheet,prepaidMap,null,PayBusinessTypeEnum.PREPAID_ORDER_PAY.getValue());
                    forkThreadExecute(requestId,compareParamDTO,failCount,countDownLatch);
               //等待子线程全部结束
                countDownLatch.await();
                if (failCount.get() > 0){
                    logger.error("支付流水差异比对#############有分支比对失败。requestParams:{}",requestParams);
                    //TODO 回调下载中心
                    return;
         * 创建子线程
         * @param failCount
        private void forkThreadExecute(String requestId,CpCompareParamDTO compareParamDTO,AtomicInteger failCount,CountDownLatch countDownLatch){
            MDC.put("requestId",requestId);
                compareDataAndWaiterSheet(compareParamDTO);
                logger.info("支付流水差异比对###########" + PayBusinessTypeEnum.getText(compareParamDTO.getPayBusinessType()) +  "子任务比对处理完成");
            }catch (Exception ex){
                logger.error("支付流水差异比对###########" + PayBusinessTypeEnum.getText(compareParamDTO.getPayBusinessType()) +  "比对分支错误:{}",ex.getMessage(),ex);
                failCount.getAndIncrement();
            countDownLatch.countDown();
            compareParamDTO= null;
    
     public static CpCompareParamDTO getInstance(Map<String,String> paramMap,AtomicInteger rowNum,SXSSFSheet paySheet,AtomicInteger diffRowNum, SXSSFSheet diffSheet,
                                                    Map<String,PayFlowDiffDTO> dataMap,Map<String, List<PayFlowDiffDTO>> refundMap,String payBusinessType){
           //也可以只创建一个单例类,使用ThreadLocal隔离
            CpCompareParamDTO cpCompareParamDTO = new CpCompareParamDTO();
           //........设置参数
            return  cpCompareParamDTO;
    

    3.5 剩余的就是去分批次的查询数据库,比如取出1000条之后在map中查询是否存在,比较金额,金额一致从map中删除掉此条记录等业务逻辑处理。
    3.6 写入对应的sheet文件。等子线程全部完成之后,将sxssfWorkbook 转为流上传到oss ,最后回调给下载中心。
    注意:有坑!!!!!!! 需要用synchronized锁一下资源。

    功能做完了,我们需要更进一步了解功能的性能:因为是纯内存操作比较多,对比耗时时间,占用内存比例。这是这个功能的两大问题。耗时时间我们已经使用了多线程去处理,map.get()方式对比。进过测试10w行的excel 数据库10w条 执行完整方法在30秒左右(当然对应的数据库查询sql优化也需要,关键字段索引 查询是否走索引这些 分批次limit方式在百万级别下的优化)网上有很多可以参考。这儿就不多赘述了。

    内存占用问题:

    本地测试:
    堆初始化: -Xms1024m -Xmx1024m
    测试数据量:excel10w行左右,数据库10w行左右
    启用了JProfiler 查看实时内存分析
    方法进入前初始标记:

    代码中也打印所占用内存:
                SXSSFWorkbook sxssfWorkbook = new SXSSFWorkbook(xssfWorkbook,100);
                logger.info("计算内存大小:sxssfWorkbook:{}", RamUsageEstimator.humanSizeOf(sxssfWorkbook));
    

    最终的sxssfWorkbook 文件大小与实时运行占用内存大小基本一致。

    3个map所占用内存 总和为30M左右 。
    由此可见,map占用内存的量并不大,真正占用内存的是POI 这个。一次10w的数据就占用了小500M。虽然说生产环境的内存都比较大,默认用的物理机器的4/1,用完也会被GC回收,但要是出现了同时好几个人在操作报表对比,那可想而知~~~

    为啥10w行 Apache POI占用这么大内存呢,分析发现 XLSX格式基本上是一堆压缩的XML文件 包含了一堆样式,解压到内存之后体积成倍的放大了。

  • 试着先把Apache POI的版本升级到了最新版本,尝试了下,发现转XSSFWorkbook这一步,占用了一百多M,比之前少了小300M,但是在转为SXSSFWorkbook的时候 发现还是4百多M。
  • 因为XLSX格式大部分被xml样式这些占用了, 发现CSV 格式的文件是纯文本数据。尝试把第一步: 原本的CSV格式转 XLSX注释掉,直接上传CSV格式到OSS服务。
    再去3.2步 将这个CSV格式的字节数组直接转为XSSFWorkbook,同时,将3.3步合并到此方法中。因为,需要原始数据,而XSSFWorkbook读不到所有数据,只能保留设置留在内存的条数。
  • * csv格式的流转成xlsx格式的workbook public SXSSFWorkbook getWorkbookByCsv(InputStream inputStream,String fileName) throws IOException { // 文件的编码,这里设为GB2312 CsvReader reader = new CsvReader(inputStream, ',', Charset.forName("GB2312")); //2007版本以上 xlsx SXSSFWorkbook result = new SXSSFWorkbook(100); SXSSFSheet sheet = result.createSheet(fileName); SXSSFRow row; SXSSFCell cell; PayFlowDiffDTO payFlowDiffDTO ; int index= 0; while (reader.readRecord()) { // excel中没一行的数据 String[] values = reader.getValues(); row = sheet.createRow(index); for (int columnNum = 0; columnNum < values.length; columnNum++) { cell = row.createCell(columnNum); cell.setCellValue(values[columnNum]); cell= null; row= null; boolean notData = index++ < 5 || values[0].contains("#"); if(notData){ continue; payFlowDiffDTO = new PayFlowDiffDTO (); / / 读取数据到实体中。 convertRowToData(values,payFlowDiffDTO ); // 添加到不同的map中 payFlowDiffDTO =null; reader.close(); reader = null; return result;