在数据分析过程中,经常会处理文本文件中的结构化数据(txt,csv等),有时这些文件还会很大,计算机内存不足以一次性读入。这时,只能将数据分批读入内存,对每批数据计算出临时中间结果,分批处理完以后,再按照计算要求对分批处理结果进行恰当的汇总处理,与一次性装入内存的小文件数据计算有很大的不同。
实现大文件计算需要了解一个重要的概念—游标。我们以前比较熟知的是数据库游标,在数据库中使用游标,可以每次返回部分数据,而不将所有数据同时读入内存。游标类似于一个指针,在读取时会通过移动指针的位置来从结果集中每次提取部分记录。与数据库游标类似,在读取大文件数据时,也需要实现文件游标,它具有以下特点:
1、只用于获取数据,并不用来修改结果集。
2、在读取数据时从前向后只遍历一次。
有了游标对象,就可以把大文件计算步骤依次附加在游标对象上,在进行最后计算时,再逐条取出记录,按附加的步骤进行计算。
本文将以结构化文本文件为例,给出大文件过滤、聚合计算、添加计算列、排序、分组聚合、topN 以及并行计算等目标任务的实现方法,并提供用 esProc SPL 编写的代码示例。esProc 是专业的数据计算引擎,其采用的 SPL 中有完善的游标对象及运算,处理这些运算非常方便。
1. 过滤
过滤就是设置一个条件表达式,然后用每条记录的数据来计算表达式的值,如果计算结果值为真则本条记录有效,需要添加到最后取数的结果集里,否则就丢弃这条记录不用取出。对大文件过滤是一种延迟计算,就是先把过滤表达式记在游标对象上,等到取某条记录时,再计算过滤表达式来决定是否将本记录加入结果集。
示例:在大数据学生成绩文本文件students_scores.txt中,查找10班学生的成绩。列数据间用TAB分隔,部分数据如下图:
esProc SPL脚本如下:
|
A | 注释 |
1 | =file("E:/txt/students_scores.txt").cursor@t() | @t选项,把第一行读作标题 |
2 | =A1.select(CLASS==10) | 筛选出10班的学生成绩,延迟计算 |
3 | =file("E:/txt/students_scores_10.txt").export@t(A2) | 将过滤后的数据存入新的文件 |
2. 聚合计算
聚合计算是对大文件中的所有记录,执行某种统计计算,比如统计总和、平均值、最大值、最小值、计数等。循环遍历游标中所有记录,用每条记录数据计算出当前的聚合统计值,只把统计值存在内存中,而不用保存数据记录,就不会占据太多内存。遍历完毕后就得到最终的统计值。
示例:在大数据学生成绩文本文件students_scores.csv中,列数据间用逗号分隔,部分数据如下图:
计算语文成绩总分,esProc SPL脚本如下:
|
A | 注释 |
1 | =file("E:/txt/students_scores.csv").cursor@tc() | @c指示分隔符是”,” |
2 | =A1.total(sum(Chinese)) | 计算语文成绩总分 |
计算10班语文成绩总分,esProc SPL脚本如下:
|
A | 注释 |
1 | =file("E:/txt/students_scores.csv").cursor@tc() | @c指示分隔符是”,” |
2 | =A1.select(CLASS==10).total(sum(Chinese)) | 先过滤出10班的成绩,再计算语文成绩总分 |
3. 添加计算列
添加计算列是指用文件中的一列或几列经过某种指定计算,将计算结果记为一个新列的列值。这也是一种延迟计算,当读取某条记录时,再计算表达式的值,将它赋给本记录的新列。
示例:在大数据学生成绩文本文件students_scores_.txt中,列数据间用|分隔,部分数据如下图:
计算每位学生的总成绩,esProc SPL脚本如下:
|
A | 注释 |
1 | =file("E:/txt/students_scores_.txt").cursor@t(;,"|") | 指定分隔符”|” |
2 | =A1.derive(English+Chinese+Math:total_score) | 附加derive计算总分,新增列名为total_score |
3 | =file("E:/txt/students_scores_total.txt").export@t(A2;"|") | 将附加了总成绩的数据存到新文件里 |
除了用derive增加新列,也可以用new函数创建新的数据结构,同时也可以增加新的列,例如:
|
A | 注释 |
1 | =file("E:/txt/students_scores_.txt").cursor@t(;,"|") | 指定分隔符”|” |
2 | =A1.new(CLASS,NAME,English+Chinese+Math:total_score) | 从A1的列中选择需要的列或设置表达式运算产生新的列,三门成绩相加得到总成绩 |
3 | =file("E:/txt/students_scores_total.txt").export@t(A2;"|") | 将新结构数据存到新文件里 |
当然,也可以先对数据进行过滤,再对需要的记录产生新列或生成新的结构。例如只取出10班的学生成绩并新增总成绩列,esProc SPL脚本如下:
|
A | 注释 |
1 | =file("E:/txt/students_scores_.txt").cursor@t(;,"|") | 指定分隔符”|” |
2 | =A1.select(CLASS==10) | 筛选出10班的学生成绩,延迟计算 |
3 | =A2.derive(English+Chinese+Math:total_score) | 附加derive计算总分,新增列名为total_score |
4 | =file("E:/txt/students_scores_total.txt").export@t(A3;"|") | 将附加了总成绩的数据存到新文件里 |
在获取到最终计算结果之前,各种基本计算,如过滤、新增列、产生新结构、改变字段值、排序等,都可以按需求先后附加到游标上。后面小节的示例中就不再一一列举这些了,只列出小节所讲的主题计算。
4. 排序
大文件排序因内存不足,不能读入所有数据来排序,实现的原理是这样的:先读入一批数据记录,读多少行合适要根据内存而定,将这批数据排序后存到一个临时文件,再读入下一批数据排序后存到另一个临时文件……直到所有数据处理完,最后对这些临时文件进行有序归并——读出每个临时文件的第一行,通过对排序字段值的比较,找出应该排在最前面的那一行,写入到结果文件。然后从刚才排第一的那个临时文件中再读出下一行,继续比较找出最前面的一行写入结果文件。按此方法不断进行,直到所有数据行都写入结果文件。
示例:在大数据学生成绩文本文件students_scores.txt中,按语文成绩升序排列。
esProc SPL脚本如下:
|
A | 注释 |
1 | =file("E:/txt/students_scores.txt").cursor@t() | 创建游标 |
2 | =A1.sortx(Chinese) | 按Chinese升序排序,返回游标 |
3 | =file("E:/txt/students_scores_sort.txt").export@t(A2) | 将排序后的数据存入新文件 |
也可以同时按多个字段排序或按表达式计算值排序,如将A2单元格改为:
=A1.sortx(Chinese,Math) //按语文、数学成绩先后排序
=A1.sortx(Math+English+Chinese) //按总成绩排序
5. 分组聚合
分组聚合是先对数据记录进行分组,对同一组的记录进行某种统计计算,最后得到每一组的统计值。大文件的分组聚合分两种情况:一是分组的结果不大(组数少),所有分组结果都能在内存中放下,称之为小分组聚合;二是分组的结果很大(组数非常多),内存中存不下所有的组,称之为大分组聚合。
小分组聚合的实现原理是:把分组键值和组统计值保存在内存中,在读取每条记录时,按分组表达式计算出分组键值,在保存的组里查找此键值,找到了则将本记录的数据与组统计值汇总,没找到则新加入一个组。最后直到所有行都处理完,就得到了所有的分组和本组的统计值。
示例:在大数据用户登录记录文件user_info_reg.csv中,统计各省用户的登录总次数及总时长。列数据间用逗号分隔,部分数据如下图:
esProc SPL脚本如下:
|
A | 注释 |
1 | =file("E:/txt/user_info_reg.csv").cursor@tc() | 创建游标,@c指示分隔符是”,” |
2 | =A1.groups(id_province;count(~):cnt,sum(reg_time):total_reg) | 分组后统计各省登录总次数及总时长 |
大分组聚合因内存不足,不能把所有分组聚合的结果放在内存里,所以需要分批处理,并使用临时文件保存分批结果,最后再归并汇总,实现原理是这样的:逐行读入数据,按照小分组聚合的流程进行分组聚合,当保存的分组结果集大到一定程序时(结果集大小视内存决定),将此结果集按分组键值排序后存为临时文件,从内存中清除。继续读入数据作同样处理,当所有数据处理完以后,就得到了多个按分组键值排好序的临时文件。然后对这些临时文件的数据作有序归并(与大文件排序的归并流程相同),得到一个按分组键值排序、键值很可能有重复的大文件,最后把那些重复键值的组合并成一组,得到所有分组结果大文件,再返回用此文件创建的游标供调用者提取分组结果数据。
示例:在大数据用户登录记录文件user_info_reg.csv中,统计每个用户的登录总次数及总时长。
esProc SPL脚本如下:
|
A | 注释 |
1 | =file("E:/txt/user_info_reg.csv").cursor@tc() | 创建游标,@c指示分隔符是”,” |
2 | =A1.groupx(user_id; count(~):cnt,sum(reg_time):total_reg) | 分组后求各用户登录总次数及总时长,返回游标 |
3 | =file("E:/txt/user_info_tj.csv").export@tc(A2) | 将各用户统计数据存入文件 |
6. TopN
TopN是对数据排序以后,查出前N条记录。有时需要对所有数据求前N条记录,有时还需要先对数据分组,再求每一组中的前N条记录。
但计算TopN时其实不需要对所有数据排序,那样会在排序上花费很多时间,对大文件计算尤其如此。TopN的实现原理是这样的:先读出N条记录,形成一个N条记录的小数据集并排好序,再读新的记录时,与小数据集的最后一条比较,若排在它之后,则直接丢弃这条记录,若排在它之前,则将这条新记录插入到小数据集的合适位置,丢弃小数据集的最后一条记录。当所有数据都读出并处理完时,就得到了需要的前N条记录集。TopN的实现方式和聚合运算很像。
示例:在大数据学生成绩文本文件students_scores.txt中,查找数学成绩排在前10的学生记录。
esProc SPL脚本如下:
|
A | 注释 |
1 | =file("E:/txt/students_scores.txt").cursor@t() | 创建游标 |
2 | =A1.groups(;top(10;-Math)) | 因为要逆序排列,所以用-Math排序 |
top函数除了返回前N条记录,也可以返回前N个值。例如查找排在前10的数学成绩,esProc SPL脚本如下:
|
A | 注释 |
1 | =file("E:/txt/students_scores.txt").cursor@t() | 创建游标 |
2 | =A1.groups(;top(-10,Math)) | top函数中间的分隔符是逗号时,直接返回前10个Math |
TopN还可以使用到分组中,即每个组中取TopN,其计算原理也类似,只是需要为每个分组保持一个N条记录的小数据集。
示例:在大数据学生成绩文本文件students_scores.txt中,查找各班数学成绩排在前10的学生记录。
esProc SPL脚本如下:
|
A | 注释 |
1 | =file("E:/txt/students_scores.txt").cursor@t() | 创建游标 |
2 | =A1.groups(CLASS;top(10;-Math)) | 因为要逆序排列,所以用-Math排序 |
7. 并行提速
并行计算就是用多个线程同时分担一个计算任务,能充分利用多核CPU提高计算性能,这对于大文件特别有用。大文件计算常常需要将数据分批计算,最后再将分批计算结果进行合并汇总。并行计算也是如此,先将大文件分段,每个线程各自用大文件计算的方式处理一段数据,最后将各线程处理的结果进行汇总。示例:在大数据用户登录记录文件user_info_reg.csv中,统计各省用户的登录总次数,用4路并行计算提高速度。esProc SPL脚本如下:
|
A | 注释 |
1 | =file("E:/txt/user_info_reg.csv").cursor@tcm(;4) | 创建游标,@m表示并行计算,参数4表示4路并行 |
2 | =A1.groups(id_province;count(~):cnt) |
|
示例:在大数据学生成绩文本文件students_scores.csv中,查询各班语文成绩在90分以上且总成绩排在前5名的学生,并用8路并行提高速度,esProc SPL脚本如下:
|
A | 注释 |
1 | =file("E:/txt/students_scores.csv").cursor@tcm(;8) | 创建游标,@m表示并行计算,参数8表示8路并行 |
2 | =A1.select(Chinese>=90) | 筛选出语文在90分以上的学生 |
3 | =A2.derive(English+Chinese+Math:total_score) | 附加derive计算总分,新增列名为total_score |
4 | =A3.groups(CLASS;top(-5;total_score)) | 按班分组,查询各班总成绩前5的学生 |
5 | =file("E:/txt/students_scores_total.txt").export@tc(A4) | 将查询数据存到新文件里 |
通过以上示例可以看出,在SPL中使用并行提速非常容易,与单线程代码相比,仅仅多一个游标选项与参数,让用户使用并行非常方便。
《》中有更多敏捷计算示例。