分类: 数据库开发技术
2020-12-09 14:44:39
企业在使用大数据分析平台时,首先需要把海量数据从多个数据源迁移到大数据平台中。
在导入数据前,我们需要理解 DolphinDB database 的基本概念和特点。
DolphinDB数据表按存储介质分为3种类型:
DolphinDB数据表按是否分区分为2种类型:
在传统的数据库中,分区是针对数据表的,即同一个数据库中的每个数据表可以有不同的分区方案;而DolphinDB的分区是针对数据库的,即一个数据库只能使用一种分区方案。如果两个表的分区方案不同,它们不能放在同一个数据库中。
DolphinDB提供了3种灵活的数据导入方法:
通过CSV文件进行数据中转是比较通用的数据迁移方式。DolphinDB提供了loadText、ploadText和loadTextEx三个函数来导入CSV文件。下面我们通过一个示例CSV文件candle_201801.csv来说明这3个函数的用法。
1.1 loadText
语法:loadText(filename, [delimiter=','], [schema])
参数:
filename是文件名。
delimiter和schema都是可选参数。
delimiter用于指定不同字段的分隔符,默认是“,”。
schema用于数据导入后每个字段的数据类型,它是一个table类型。DolphinDB提供了字段类型自动识别功能,但是某些情况下系统自动识别的数据类型不符合需求,比如我们在导入示例CSVcandle_201801.csv时,volume字段会被识别成INT类型,实际上我们需要LONG类型,这时就需要使用schema参数。
创建schema table的脚本:
点击(此处)折叠或打开
当表的字段非常多时,创建schema table的脚本会十分冗长。为了避免这个问题,DolphinDB提供了extractTextSchema函数,它可以从文本文件中提取表的结构,我们只需修改需要指定的字段类型即可。
点击(此处)折叠或打开
1.2 ploadText
ploadText把数据文件作为分区表并行加载到内存中,语法和loadText完全相同,但是ploadText的速度更快。ploadText主要用于快速载入大文件,它在设计上充分利用了多个core来并行载入文件,并行程度取决于服务器本身core数量和节点的localExecutors配置。
下面我们对比loadText和ploadText的性能。
首先,通过脚本生成一个4G左右的CSV文件:
点击(此处)折叠或打开
分别使用loadText和ploadText来导入文件,该节点是4核8线程的CPU。
点击(此处)折叠或打开
结果显示,ploadText的性能差不多是loadText的4倍。
1.3 loadTextEx
语法:loadTextEx(dbHandle, tableName, [partitionColumns], fileName, [delimiter=','], [schema])
参数:
dbHandle是数据库句柄。
tableName是保存数据的分布式表的表名。
partitionColumns、delimiter和schema是可选参数。
当分区方案不是顺序分区时,需要指定partitionColumns,表示分区列。
fileName表示导入文件的名称。
delimiter用于指定不同字段的分隔符,默认是“,”。
schema用于数据导入后每个字段的数据类型,它是一个table类型。
loadText函数总是把数据导入到内存,当数据文件非常庞大时,工作机的内存很容易成为瓶颈。loadTextEx可以很好地解决这个问题,它通过边导入边保存的方式,把静态的CSV文件以较为平缓的数据流的方式“另存为”DolphinDB的分布式表,而不是采用全部导入内存再存为分区表的方式,大大降低了内存的使用需求。
首先创建用于保存数据的分布式表:
点击(此处)折叠或打开
然后将文件导入分布式表:
点击(此处)折叠或打开
当需要使用数据做分析的时候,通过loadTable函数将分区元数据先载入内存,在实际执行查询的时候,DolphinDB会按需加载数据到内存。
点击(此处)折叠或打开
HDF5是一种比CSV更高效的二进制数据文件格式,在数据分析领域广泛使用。DolphinDB也支持通过HDF5格式文件导入数据。
DolphinDB通过HDF5插件来访问HDF5文件,插件提供了以下方法:
调用插件方法时需要在方法前面提供namespace,比如调用loadHdf5时hdf5::loadHdf5,如果不想每次调用都使用namespace,可以使用use关键字:
点击(此处)折叠或打开
要使用DolphinDB的插件,首先需要下载HDF5插件,再将插件部署到节点的plugins目录下,在使用插件之前需要先加载,使用下面的脚本:
点击(此处)折叠或打开
HDF5文件的导入与CSV文件大同小异,比如我们要将示例HDF5文件candle_201801.h5导入,它包含一个Dataset:candle_201801,那么最简单的导入方式如下:
点击(此处)折叠或打开
如果需要指定数据类型导入可以使用hdf5::extractHdf5Schema,脚本如下:
点击(此处)折叠或打开
如果HDF5文件非常庞大,工作机内存无法支持全量载入,可以使用hdf5::loadHdf5Ex方式来载入数据。
首先创建用于保存数据的分布式表:
点击(此处)折叠或打开
然后将HDF5文件通过hdf5::loadHdf5Ex函数导入:
点击(此处)折叠或打开
DolphinDB支持ODBC接口连接第三方数据库,从数据库中直接将表读取成DolphinDB的内存数据表。使用DolphinDB提供的ODBC插件可以方便地从ODBC支持的数据库中迁移数据至DolphinDB中。
ODBC插件提供了以下四个方法用于操作第三方数据源数据:
在使用ODBC插件前,需要先安装ODBC驱动,请参考ODBC插件使用教程。
下面以连接 SQL Server 作为实例,现有数据库的具体配置为:
server:172.18.0.15
默认端口:1433
连接用户名:sa
密码:123456
数据库名称: SZ_TAQ
数据库表选2016年1月1日的数据,表名candle_201801,字段与CSV文件相同。
要使用ODBC插件连接SQL Server数据库,首先第一步是下载插件解压并拷贝plugins\odbc目录下所有文件到DolphinDB Server的plugins/odbc目录下,通过下面的脚本完成插件初始化:
点击(此处)折叠或打开
在导入数据之前,先创建分布式磁盘数据库用于保存数据:
点击(此处)折叠或打开
从SQL Server中导入数据并保存成DolphinDB分区表:
点击(此处)折叠或打开
通过ODBC导入数据避免了文件导出导入的过程,而且通过DolphinDB的定时作业机制,它还可以作为时序数据定时同步的数据通道。
下面以证券市场日K线图数据文件导入作为示例,数据以CSV文件格式保存在磁盘上,共有10年的数据,按年度分目录保存,一共大约100G的数据,路径示例如下:
2008
---- 000001.csv
---- 000002.csv
---- 000003.csv
---- 000004.csv
---- ...
2009
...
2018
每个文件的结构都是一致的,如图所示:
4.1 分区规划
要导入数据之前,首先要做好数据的分区规划,这涉及到两个方面的考量:
首先根据日常的查询语句执行频率,我们采用trading和symbol两个字段进行组合范围(RANGE)分区,通过对常用检索字段分区,可以极大的提升数据检索和分析的效率。
接下来要做的是分别定义两个分区的粒度。
现有数据的时间跨度是从2008-2018年,所以这里按照年度对数据进行时间上的划分,在规划时间分区时要考虑为后续进入的数据留出足够的空间,所以这里把时间范围设置为2008-2030年。
点击(此处)折叠或打开
这里股票代码有几千个,如果对股票代码按值(VALUE)分区,那么每个分区只是几兆大小,而分区数量则很多。分布式系统在执行查询时,会将查询语句分成多个子任务分发到不同的分区执行,所以按值分区方式会导致任务数量非常多,而任务执行时间极短,导致系统在管理任务上花费的时间反而大于任务本身的执行时间,这样的分区方式明显是不合理的。这里我们按照范围将所有股票代码均分成100个区间,每个区间作为一个分区,最终分区的大小约100M左右。 考虑到后期有新的股票数据进来,所以增加了一个虚拟的代码999999,跟最后一个股票代码组成一个分区,用来保存后续新增股票的数据。
通过下面的脚本得到 symbol 字段的分区范围:
点击(此处)折叠或打开
需要注意的是,分区是DolphinDB存储数据的最小单位,DolphinDB对分区的写入操作是独占式的,当任务并行进行的时候,需要避免多任务同时向一个分区写入数据。本案例中每年的数据交给一个单独任务去做,各任务操作的数据边界没有重合,所以不可能发生多任务写入同一分区的情况。
4.2 导入数据
数据导入脚本的主要思路很简单,就是通过循环目录树,将所有的CSV文件逐个读取并写入到分布式数据库表dfs://SAMPLE_TRDDB中,但是具体导入过程中还是会有很多细节问题。
首先碰到的问题是,CSV文件中保存的数据格式与DolphinDB内部的数据格式存在差异,比如time字段,文件里是以“9390100000”表示精确到毫秒的时间,如果直接读入会被识别成数值类型,而不是time类型,所以这里需要用到数据转换函数datetimeParse结合格式化函数format在数据导入时进行转换。 关键脚本如下:
点击(此处)折叠或打开
虽然通过循环导入实现起来非常简单,但是实际上100G的数据是由极多的5M左右的细碎文件组成,如果单线程操作会等待很久,为了充分利用集群的资源,所以我们按照年度把数据导入拆分成多个子任务,轮流发送到各节点的任务队列并行执行,提高导入的效率。这个过程分下面两步实现:
(1)定义一个自定义函数,函数的主要功能是导入指定年度目录下的所有文件:
点击(此处)折叠或打开
(2)通过 rpc 函数结合 submitJob 函数把上面定义的函数提交到各节点去执行:
点击(此处)折叠或打开
数据导入过程中,可以通过pnodeRun(getRecentJobs)来观察后台任务的完成情况。
案例完整脚本