ETL是DW的数据准备的前期过程,将花费DW生命周期的大约70%的时间,下面我再结合ORACLE BI谈一谈ETL的过程。
一:Extraction
1)直接提取
2)增量提取:
a)基于timestamp:看看下面的例子就明白了:得到订单生成当天的所有订单
SELECT * FROM orders
WHERE TRUNC(CAST(order_date AS date),'dd') =
TO_DATE(SYSDATE,'dd-mon-yyyy');
b)基于PARTITION:比如说:如果源表基于时间的WEEK做了PARTITION,那么很容易查询到某 一周的所有数据。比如:SELECT * FROM orders PARTITION (orders_jan1998);
c)基于TRIGER:
使用触发器,处理DML操作以后数据可以同时操作到目标数据库中。
d)使用ORACLE CHANGE DATA CAPTURE.
我比较推荐下面的方式提取
CREATE TABLE country_city AS SELECT distinct t1.country_name, t2.cust_city
FROM t1,
customers@source_db t2
WHERE t1.country_id = t2.country_id
AND t1.country_name='United States of America';
通过SQL语句和DBLINK的方式直接提取,同时也建立的表格。减少了繁琐的传输过程。
二:Transporation (如果通过分布式查询可以跳过此步)
有3种传输的类型:
A source system to a staging database or a data warehouse database
A staging database to a data warehouse
A data warehouse to a data mart
以下传输方式:
1)直接使用FLAT FILE 然后FTP。
2)通过分布式查询,可以提取和传输一步到位,但是只适合ORACLE数据库之间,而且要成功配置DBLINK。
3)Transportable tablespaces
此方式是最快的在两个ORACLE之间传输大量数据的方式。但是有很多限制。
三:Transformations (转化过程可以使用SQL或PL/SQL实现,本文只是谈谈使用SQL实现的方式,使用PL/SQL实现将在后面的 文章中提到)
一般有下列两种方式:
Mutistage Transformation:
这种方式每一步转化都生成一个临时表,转化过程用SQL或PL/SQL实现,但是比较消耗空间和时间。
适合Transformation的SQL技巧有很多,最常用的有如下:
1)CREATE TABLE ... AS SELECT
2)INSERT /*+APPEND*/ ... AS SELECT
INSERT /*+ APPEND NOLOGGING PARALLEL */ INTO sales
SELECT product_id, customer_id, TRUNC(sales_date), 3,
promotion_id, quantity, amount
FROM sales_activity_direct;
3) MERGE
MERGE INTO products t USING products_delta s
ON (t.prod_id=s.prod_id)
WHEN MATCHED THEN UPDATE SET
t.prod_list_price=s.prod_list_price, t.prod_min_price=s.prod_min_price
WHEN NOT MATCHED THEN INSERT (prod_id, prod_name, prod_desc, prod_subcategory,
prod_subcategory_desc, prod_category, prod_category_desc, prod_status,
prod_list_price, prod_min_price)
VALUES (s.prod_id, s.prod_name, s.prod_desc, s.prod_subcategory,
s.prod_subcategory_desc, s.prod_category, s.prod_category_desc,
s.prod_status, s.prod_list_price, s.prod_min_price);
4) Unconditional Insert(注意insert到sales和costs的所有的值都要在后面的SELECT中查询出来,而且别 名要和insert的字段名称一致)
insert all :对于每一行数据,对每一个when条件都进行检查,如果满足条件就执行插入操作。
INSERT ALL
INTO sales VALUES (product_id, customer_id, today, 3, promotion_id,
quantity_per_day, amount_per_day)
INTO costs VALUES (product_id, today, promotion_id, 3,
product_cost, product_price)
SELECT TRUNC(s.sales_date) AS today, s.product_id, s.customer_id,
s.promotion_id, SUM(s.amount) AS amount_per_day, SUM(s.quantity)
quantity_per_day, p.prod_min_price*0.8 AS product_cost, p.prod_list_price
AS product_price
FROM sales_activity_direct s, products p
WHERE s.product_id = p.prod_id AND TRUNC(sales_date) = TRUNC(SYSDATE)
GROUP BY TRUNC(sales_date), s.product_id, s.customer_id, s.promotion_id,
p.prod_min_price*0.8, p.prod_list_price;
5) Conditional ALL Insert(利用insert all使得INSERT语句可以同时插入多张表,还可以根据判断条件来决定 每条记录插入到哪张或哪几张表中。)
insert all :对于每一行数据,对每一个when条件都进行检查,如果满足条件就执行插入操作。
INSERT ALL
WHEN promotion_id IN (SELECT promo_id FROM promotions) THEN
INTO sales VALUES (product_id, customer_id, today, 3, promotion_id,
quantity_per_day, amount_per_day)
INTO costs VALUES (product_id, today, promotion_id, 3,
product_cost, product_price)
WHEN num_of_orders > 1 THEN
INTO cum_sales_activity VALUES (today, product_id, customer_id,
promotion_id, quantity_per_day, amount_per_day, num_of_orders)
SELECT TRUNC(s.sales_date) AS today, s.product_id, s.customer_id,
s.promotion_id, SUM(s.amount) AS amount_per_day, SUM(s.quantity)
quantity_per_day, COUNT(*) num_of_orders, p.prod_min_price*0.8
AS product_cost, p.prod_list_price AS product_price
FROM sales_activity_direct s, products p
WHERE s.product_id = p.prod_id
AND TRUNC(sales_date) = TRUNC(SYSDATE)
GROUP BY TRUNC(sales_date), s.product_id, s.customer_id,
s.promotion_id, p.prod_min_price*0.8, p.prod_list_price;
6)Conditional FIRST Insert(insert first:对于每一行数据,只插入到第一个when条件成立的表,不继续检查 其他条件。)
INSERT FIRST WHEN (sum_quantity_sold > 10 AND prod_weight_class < 5) AND
sum_quantity_sold >=1) OR (sum_quantity_sold > 5 AND prod_weight_class > 5) THEN
INTO large_freight_shipping VALUES
(time_id, cust_id, prod_id, prod_weight_class, sum_quantity_sold)
WHEN sum_amount_sold > 1000 AND sum_quantity_sold >=1 THEN
INTO express_shipping VALUES
(time_id, cust_id, prod_id, prod_weight_class,
sum_amount_sold, sum_quantity_sold)
WHEN (sum_quantity_sold >=1) THEN INTO default_shipping VALUES
(time_id, cust_id, prod_id, sum_quantity_sold)
ELSE INTO incorrect_sales_order VALUES (time_id, cust_id, prod_id)
SELECT s.time_id, s.cust_id, s.prod_id, p.prod_weight_class,
SUM(amount_sold) AS sum_amount_sold,
SUM(quantity_sold) AS sum_quantity_sold
FROM sales s, products p
WHERE s.prod_id = p.prod_id AND s.time_id = TRUNC(SYSDATE)
GROUP BY s.time_id, s.cust_id, s.prod_id, p.prod_weight_class;
但是在使用Insert All语句时要注意有如下限制:
多表插入语句的限制条件
只能对表执行多表插入语句,不能对视图或物化视图执行;
不能对远端表执行多表插入语句;
不能使用表集合表达式;
不能超过999个目标列;
在RAC环境中或目标表是索引组织表或目标表上建有BITMAP索引时,多表插入语句不能并行执行;
多表插入语句不支持执行计划稳定性;
多表插入语句中的子查询不能使用序列。
四:LOAD
LOAD的过程是把stageing area中的flat文件导入到ROLAP模型的DW中的过程。
1)Oracle Sql Loader是一个选择。例如:
先写控制文件:sh_sales.ctl
LOAD DATA INFILE sh_sales.dat APPEND INTO TABLE sales
FIELDS TERMINATED BY "|"
(PROD_ID, CUST_ID, TIME_ID, CHANNEL_ID, PROMO_ID, QUANTITY_SOLD, AMOUNT_SOLD)
运用下列命令执行:
$ sqlldr sh/sh control=sh_sales.ctl direct=true
2)External Table
先建立外部表,例如:
CREATE TABLE sales_transactions_ext
(PROD_ID NUMBER, CUST_ID NUMBER,
TIME_ID DATE, CHANNEL_ID NUMBER,
PROMO_ID NUMBER, QUANTITY_SOLD NUMBER,
AMOUNT_SOLD NUMBER(10,2), UNIT_COST NUMBER(10,2),
UNIT_PRICE NUMBER(10,2))
ORGANIZATION external (TYPE oracle_loader
DEFAULT DIRECTORY data_file_dir ACCESS PARAMETERS
(RECORDS DELIMITED BY NEWLINE CHARACTERSET US7ASCII
BADFILE log_file_dir:'sh_sales.bad_xt'
LOGFILE log_file_dir:'sh_sales.log_xt'
FIELDS TERMINATED BY "|" LDRTRIM
( PROD_ID, CUST_ID,
TIME_ID DATE(10) "YYYY-MM-DD",
CHANNEL_ID, PROMO_ID, QUANTITY_SOLD, AMOUNT_SOLD,
UNIT_COST, UNIT_PRICE))
location ('sh_sales.dat')
)REJECT LIMIT UNLIMITED;
然后直接把数据LOAD到事实表中:
INSERT /*+ APPEND */ INTO COSTS
(TIME_ID, PROD_ID, UNIT_COST, UNIT_PRICE)
SELECT TIME_ID, PROD_ID, AVG(UNIT_COST), AVG(amount_sold/quantity_sold)
FROM sales_transactions_ext GROUP BY time_id, prod_id;