Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1981497
  • 博文数量: 221
  • 博客积分: 10045
  • 博客等级: 上将
  • 技术积分: 2252
  • 用 户 组: 普通用户
  • 注册时间: 2005-01-25 20:28
文章分类

全部博文(221)

文章存档

2012年(1)

2008年(4)

2007年(11)

2006年(26)

2005年(179)

我的朋友

分类: Oracle

2005-05-19 20:08:31

Frank Pantaleo,

The body of work that follows was born out of a desire to replicate data out of production. Of course we needed to do this while minimizing the effect on production. The best way to do this was to leverage APIs that Oracle made available in 8i. Some or all of the logic provided becomes unnecessary in 9i with the introduction of Oracle Streams. However, I would contend that the procedures below provide a better level of control. I made a pitch to a technical community that I am involved in. The pitch was based on an article by George Jucan called "Using Oracle LogMiner as a Data Replication Utility". This article explains how a program or set of programs could make use of this API. Our business had already leveraged information available in LogMiner to track activity in an application.

There are some C applications involved in this as well. I am only supplying the PL/SQL and C Oracle external functions as I feel this is enough to get started. Some of these file and directory procs were created to reproduce functions that are now available in 9i.

  • Dir_proc – C Oracle external application to get a list of all files in a unix directory
     
  • File_proc – C Oracle external application to determine the existence of a unix file
     
  • File_del_proc – C oracle external application to delete a file in /tmp
     
  • Get_oratab – Function that drives the diy$oratab view
     
  • Logminer_stats – Procedure that returns counts of local logmnr_contents store
     
  • Load_logminer – workhorse of the application. Identifies each archive log, extracts its contents into a local store, and then deletes the log
So how does it plug together? I have another program that creates a known state. A known state is required because of a deficiency in 8i logminer. 8i Logminer does not capture key information for deletes and updates. So what does it capture? Oracle 8i logminer captures the rowid for update and deletes. This is addressed in 9i where logminer captures key information on delete and update if the table being captured has a primary/unique key. I did not have this luxury in 8i. So the initial table state is captured from the source instance including the rowid of each row from the source table. I capture this in a column called drowid. The drowid is used to apply the update or delete to the destination table data. This was a hack, but a necessary hack based on what was available. Once a state is captured we can then move on to the logic supplied here and extract changes from a set of archive logs. So the process is:
  1. Enable archive logging in the source instance.
     
  2. Create a known state – this is only necessary in 8i and then only if you need to worry about update/deletes.
     
  3. Copy all archive logs from source to the machine where the destination instance lives – this is ongoing afterward.
     
  4. Change the init.ora of the destination instance to have log_archive_dest point to the archive logs that have been captured from the source instance.
     
  5. At regular intervals:
    1. Run the load_logminer proc provided.
    2. Apply the dml in logmnr_contents to the destination instance.
    3. Update checkpoint_change# in lmstate_checkpoint as each dml is applied.
    4. Commit after the scn changes in logmnr_contents table.
PL/SQL Source
-- Remove any previous installation so the script can be re-run.
-- Dependents are dropped first: the view selects from get_oratab, which
-- returns oratab_type, which is a table of oratab_row_type.
DROP VIEW diy$oratab;
DROP FUNCTION get_oratab;
DROP TYPE oratab_type;
DROP TYPE oratab_row_type;

-- Library object pointing at the shared object that holds the C external
-- functions (dir_func, file_func, file_del_func) used by the call specs below.
-- NOTE(review): the path is hard-coded; presumably it must match where
-- osfunc.so is installed on the database host -- confirm.
DROP LIBRARY os_lib;
CREATE LIBRARY os_lib IS '/u01/apps/oracle/product/8.1.7/lib/osfunc.so';
/

-- dir_proc: call spec for the C function dir_func in os_lib.
--   filename - full path of the text file the C function writes its listing to
--   PATH     - directory to list; each entry is written as PATH/entry, one per line
-- Both arguments are passed to C as NUL-terminated strings (STRING).
CREATE OR REPLACE PROCEDURE dir_proc (filename IN CHAR, PATH IN CHAR)
AS
EXTERNAL
   LIBRARY os_lib
   NAME "dir_func"
   LANGUAGE c
   PARAMETERS (
      filename STRING,
      PATH STRING
   );
/

--

-- file_proc: call spec for the C function file_func in os_lib.
--   filename    - path of the file to inspect
--   bexists     - receives the return code of stat() on filename, so
--                 0 means the file exists and non-zero means it does not
--   file_size   - receives st_size from stat(), or 0 when stat() fails
--   block_count - receives st_blocks from stat(), or 0 when stat() fails
-- The OCI context is passed as the first C argument (WITH CONTEXT).
CREATE OR REPLACE PROCEDURE file_proc (
   filename      IN       CHAR,
   bexists       OUT      BINARY_INTEGER,
   file_size     OUT      BINARY_INTEGER,
   block_count   OUT      BINARY_INTEGER
)
AS
EXTERNAL
   LIBRARY os_lib
   NAME "file_func"
   LANGUAGE c
   WITH CONTEXT
   PARAMETERS (
      CONTEXT,
      filename STRING,
      bexists INT,
      file_size INT,
      block_count INT
   );
/

-- file_del_proc: call spec for the C function file_del_func in os_lib.
--   filename - name of the file to delete; the C function prepends '/tmp/'
--              to it, so pass a name relative to /tmp, not a full path
-- Nothing is reported back: a missing file is silently ignored.
CREATE OR REPLACE PROCEDURE file_del_proc (filename IN CHAR)
AS
EXTERNAL
   LIBRARY os_lib
   NAME "file_del_func"
   LANGUAGE c
   PARAMETERS (
      filename STRING
   );
/

-- lmstate_checkpoint: SCN up to which changes have been applied.
-- LOAD_LOGMINER reads it with SELECT ... INTO, so it must hold exactly one row.
DROP TABLE lmstate_checkpoint;
CREATE TABLE lmstate_checkpoint (checkpoint_change#  NUMBER);
--
-- lmsubscribe: owner/table pairs of interest. LOGMINER_STATS joins
-- logmnr_contents to it so that only subscribed tables are counted.
DROP TABLE lmsubscribe;
CREATE TABLE lmsubscribe (
  owner       VARCHAR2 (30),
  table_name  VARCHAR2 (30) );
--
-- lmtables: owner/table pairs with a load order (default 10).
-- Not referenced by the PL/SQL in this script.
DROP TABLE lmtables;
CREATE TABLE lmtables (
  owner       VARCHAR2 (30),
  table_name  VARCHAR2 (30),
  load_order  NUMERIC DEFAULT 10 NOT NULL);
--
-- lm_log: progress messages written by LOAD_LOGMINER
-- ("start loading ..." / "end loading ..."); emptied at the start of each run.
DROP TABLE lm_log;
CREATE TABLE lm_log (lm_state VARCHAR2(2000));
--
-- logmnr_contents: local store of mined redo.
-- LOAD_LOGMINER fills it with INSERT ... SELECT a.* FROM SYS.v_$logmnr_contents,
-- so the column list and order here must stay in step with that view.
-- LOAD_LOGMINER later overwrites ph1_redo with the cleaned-up statement text
-- that the apply program runs, and deletes rows whose ph1_redo is still NULL.
DROP TABLE logmnr_contents;
CREATE TABLE logmnr_contents (
  SCN           NUMBER,
  TIMESTAMP     DATE,
  thread#       NUMBER,
  log_id        NUMBER,
  xidusn        NUMBER,
  xidslt        NUMBER,
  xidsqn        NUMBER,
  rbasqn        NUMBER,
  rbablk        NUMBER,
  rbabyte       NUMBER,
  ubafil        NUMBER,
  ubablk        NUMBER,
  ubarec        NUMBER,
  ubasqn        NUMBER,
  abs_file#     NUMBER,
  rel_file#     NUMBER,
  data_blk#     NUMBER,
  data_obj#     NUMBER,
  data_objd#    NUMBER,
  seg_owner     VARCHAR2(32),
  seg_name      VARCHAR2(32),
  seg_type      NUMBER,
  seg_type_name VARCHAR2(32),
  table_space   VARCHAR2(32),
  row_id        VARCHAR2(19),
  session#      NUMBER,
  serial#       NUMBER,
  username      VARCHAR2(32),
  session_info  VARCHAR2(4000),
  ROLLBACK      NUMBER,
  operation     VARCHAR2(32),
  sql_redo      VARCHAR2(4000),
  sql_undo      VARCHAR2(4000),
  rs_id         VARCHAR2(32),
  ssn           NUMBER,
  csf           NUMBER,
  info          VARCHAR2(32),
  status        NUMBER,
  ph1_name      VARCHAR2(32),
  ph1_redo      VARCHAR2(2000),
  ph1_undo      VARCHAR2(2000),
  ph2_name      VARCHAR2(32),
  ph2_redo      VARCHAR2(2000),
  ph2_undo      VARCHAR2(2000),
  ph3_name      VARCHAR2(32),
  ph3_redo      VARCHAR2(2000),
  ph3_undo      VARCHAR2(2000),
  ph4_name      VARCHAR2(32),
  ph4_redo      VARCHAR2(2000),
  ph4_undo      VARCHAR2(2000),
  ph5_name      VARCHAR2(32),
  ph5_redo      VARCHAR2(2000),
  ph5_undo      VARCHAR2(2000)
);
--
-- get_oratab_setting: the directory that get_oratab should list.
-- LOAD_LOGMINER inserts the log_archive_dest value here before it queries
-- diy$oratab; get_oratab reads it back with SELECT ... INTO.
-- Rows are kept across commits (ON COMMIT PRESERVE ROWS).
DROP TABLE get_oratab_setting;
CREATE GLOBAL TEMPORARY TABLE get_oratab_setting
(thedir VARCHAR2(200))
ON COMMIT PRESERVE ROWS;
--

-- oratab_row_type: one directory entry as returned by get_oratab.
-- file_name holds the path written by dir_func (directory/entry).
CREATE TYPE oratab_row_type AS OBJECT (
   file_name   VARCHAR2 (100)
);
/

-- oratab_type: collection of oratab_row_type; the return type of get_oratab
-- and the type the diy$oratab view casts the function result to.
CREATE TYPE oratab_type IS TABLE OF oratab_row_type;
/

--

--
-- get_oratab: return the entries of the directory named in
-- get_oratab_setting.thedir as an oratab_type collection.
--
-- How it works: dir_proc (C external procedure) writes the listing to a
-- scratch file under /tmp, the file is read back line by line with UTL_FILE,
-- and the scratch file is removed with file_del_proc.
--
-- Returns: one oratab_row_type per directory entry. When no directory is
-- configured, or the directory is empty, the result still contains a single
-- row whose file_name is NULL, so callers must skip NULL names.
--
-- Raises: UTL_FILE exceptions and VALUE_ERROR (a listing line longer than
-- 100 characters) propagate to the caller; the scratch file is closed and
-- deleted first.
-- NOTE(review): UTL_FILE is opened on '/tmp'; presumably that directory has to
-- be permitted by the instance's utl_file_dir setting -- confirm.
--
CREATE OR REPLACE FUNCTION get_oratab
   RETURN oratab_type
IS
   -- Starts with one NULL row; the first line read reuses that slot.
   ora_tab       oratab_type  := oratab_type (oratab_row_type (NULL));
   f_handle      UTL_FILE.file_type;
   v_file_name   VARCHAR2 (100);
   b_read        BOOLEAN            := TRUE;
   b_first       BOOLEAN            := TRUE;
   tmp_file      VARCHAR2 (50);
   mydir         VARCHAR2 (200);
BEGIN
   -- Scratch file name with a full 24-hour timestamp (hours, minutes and
   -- seconds) so that calls made in different minutes or in the AM/PM halves
   -- of the day do not collide on the same name.
   tmp_file := 'oracle_' || TO_CHAR (SYSDATE, 'yyyymmddhh24miss');

   BEGIN
      SELECT thedir
        INTO mydir
        FROM get_oratab_setting;
   EXCEPTION
      WHEN NO_DATA_FOUND
      THEN
         mydir := NULL;
   END;

   IF mydir IS NOT NULL
   THEN
      BEGIN
         dir_proc ('/tmp/' || tmp_file, mydir);
         f_handle := UTL_FILE.fopen ('/tmp', tmp_file, 'r');

         WHILE b_read
         LOOP
            BEGIN
               UTL_FILE.get_line (f_handle, v_file_name);

               IF b_first
               THEN
                  b_first := FALSE;
               ELSE
                  ora_tab.EXTEND;
               END IF;

               ora_tab (ora_tab.LAST) :=
                                 oratab_row_type (RTRIM (v_file_name));
            EXCEPTION
               -- get_line signals end of file with NO_DATA_FOUND
               WHEN NO_DATA_FOUND
               THEN
                  b_read := FALSE;
            END;
         END LOOP;

         UTL_FILE.fclose (f_handle);
      EXCEPTION
         WHEN OTHERS
         THEN
            -- Do not leak the file handle or the scratch file on failure;
            -- the original error is re-raised unchanged.
            IF UTL_FILE.is_open (f_handle)
            THEN
               UTL_FILE.fclose (f_handle);
            END IF;

            file_del_proc (tmp_file);
            RAISE;
      END;
   END IF;

   -- file_del_proc prepends '/tmp/' itself, so only the bare name is passed.
   file_del_proc (tmp_file);
   RETURN ora_tab;
END;
/

-- diy$oratab: exposes the collection returned by get_oratab as rows with a
-- single file_name column, so a directory listing can be queried with SQL.
-- Each query of the view calls get_oratab.
CREATE OR REPLACE VIEW diy$oratab
AS
   SELECT *
     FROM TABLE (CAST (get_oratab () AS oratab_type));
/

--
-- LOGMINER_STATS: count the INSERT / DELETE / UPDATE rows held in the local
-- logmnr_contents table for the tables listed in lmsubscribe.
--
--   insert_count - number of INSERT rows (0 when none)
--   delete_count - number of DELETE rows (0 when none)
--   update_count - number of UPDATE rows (0 when none)
--   total_count  - sum of the three counts above
--
-- Raises: -20000 for any unexpected error; the original Oracle error text is
-- appended to the message so the cause is not lost.
--
CREATE OR REPLACE PROCEDURE "LOGMINER_STATS" (
   insert_count   OUT   INTEGER,
   delete_count   OUT   INTEGER,
   update_count   OUT   INTEGER,
   total_count    OUT   INTEGER
)
IS
   empty_logmnr_contents   EXCEPTION;
   PRAGMA EXCEPTION_INIT (empty_logmnr_contents, -1306);

   -- One row per operation type present in the subscribed data.
   CURSOR the_csr
   IS
      SELECT   COUNT (*) the_count, a.operation
          FROM logmnr_contents a, lmsubscribe b
         WHERE a.seg_owner = b.owner
           AND a.seg_name = b.table_name
           AND a.operation IN ('INSERT', 'DELETE', 'UPDATE')
      GROUP BY a.operation;
BEGIN
   -- Operation types with no rows produce no cursor row, so start from zero.
   insert_count := 0;
   update_count := 0;
   delete_count := 0;
   total_count := 0;

   FOR the_rec IN the_csr
   LOOP
      IF the_rec.operation = 'INSERT'
      THEN
         insert_count := the_rec.the_count;
      ELSIF the_rec.operation = 'DELETE'
      THEN
         delete_count := the_rec.the_count;
      ELSIF the_rec.operation = 'UPDATE'
      THEN
         update_count := the_rec.the_count;
      END IF;
   END LOOP;

   total_count := insert_count + delete_count + update_count;
EXCEPTION
   -- These two handlers are kept from the original as a best-effort guard:
   -- "no data" conditions leave the counts as they stand instead of failing.
   WHEN NO_DATA_FOUND
   THEN
      NULL;
   WHEN empty_logmnr_contents
   THEN
      NULL;
   WHEN OTHERS
   THEN
      raise_application_error (-20000,
                                  'Error in LOGMNR_CONTENTS View'
                               || ': '
                               || SQLERRM
                              );
END logminer_stats;
/

--
-- LOAD_LOGMINER: load every archive log found in log_archive_dest into the
-- local logmnr_contents table and prepare the statements for the apply step.
--
-- Steps:
--   1. Empty logmnr_contents, lm_log and get_oratab_setting.
--   2. Store the log_archive_dest directory in get_oratab_setting so that the
--      diy$oratab view lists the archive logs in it.
--   3. For each listed file: register it with LogMiner, start LogMiner with
--      the dictionary file, copy v_$logmnr_contents into logmnr_contents,
--      end LogMiner and delete the file.
--   4. Write the cleaned-up statement for each row into ph1_redo and delete
--      the rows that were not prepared.
--
-- Raises: NO_DATA_FOUND when lmstate_checkpoint is empty or the parameter is
-- missing; -20010 (wrapping -20000 / -20002) for failures while loading a log.
-- The procedure commits several times; it is not a single transaction.
--
CREATE OR REPLACE PROCEDURE "LOAD_LOGMINER"
AS
   empty_logmnr_contents   EXCEPTION;
   PRAGMA EXCEPTION_INIT (empty_logmnr_contents, -1306);
   arch_dir                VARCHAR2 (100);
   work_file               VARCHAR2 (200);
   start_scn_local         NUMBER;
   cleanup_msg             VARCHAR2 (2000);

   CURSOR the_csr
   IS
      SELECT file_name
        FROM diy$oratab;
BEGIN
/* clean the slate of all prior activity */
   DELETE FROM logmnr_contents;

   DELETE FROM lm_log;

   DELETE FROM get_oratab_setting;

   COMMIT;

/* determine location of archive logs so we can get */
/* a directory of log_archive_dest */
   SELECT VALUE
     INTO arch_dir
     FROM SYS.v_$parameter
    WHERE NAME = 'log_archive_dest';

/* prime get_oratab_setting with directory name of archive logs */
   INSERT INTO get_oratab_setting
               (thedir
               )
        VALUES (arch_dir
               );

   COMMIT;

/* determine checkpoint of what has been applied to date */
/* NOTE(review): the value is read but not used further in this procedure; */
/* the SELECT still fails with NO_DATA_FOUND when no checkpoint row exists. */
   SELECT checkpoint_change#
     INTO start_scn_local
     FROM lmstate_checkpoint;

/* go through each archive log and add to the local */
/* logmnr_contents table where applicable */
   BEGIN
      FOR the_rec IN the_csr
      LOOP
         work_file := the_rec.file_name;

         /* get_oratab returns a single NULL name when there is nothing to list */
         IF work_file IS NOT NULL
         THEN
            INSERT INTO lm_log
                        (lm_state
                        )
                 VALUES ('start loading archive log ' || work_file
                        );

            COMMIT;
            /* NEW starts a fresh file list, so one log is mined at a time */
            SYS.DBMS_LOGMNR.add_logfile
                                       (logfilename => work_file,
                                        options     => SYS.DBMS_LOGMNR.NEW
                                       );

            BEGIN
/* use logfile generated from local or foreign database */
               SYS.DBMS_LOGMNR.start_logmnr
                  (dictfilename => '/u01/apps/oracle/product/8.1.7/dbs/SEED_dict.ora'
                  );
               COMMIT;
            EXCEPTION
               WHEN OTHERS
               THEN
                  raise_application_error
                           (-20000,
                               'Error in LOAD_LOGMINER start_logmnr '
                            || SQLERRM
                            || SQLCODE
                           );
            END;

            BEGIN
               /* strip what we want out of v_$logmnr_contents into our local copy */
               INSERT      /*+ APPEND */INTO logmnr_contents
                  SELECT a.*
                    FROM SYS.v_$logmnr_contents a;

               /*Add any conditional logic here e.g. ...*/
               /*where seg_owner = 'SOME_OWNER' and seg_name = 'SOME_TABLE' */
               COMMIT;
            EXCEPTION
               WHEN NO_DATA_FOUND
               THEN
                  NULL;
               WHEN empty_logmnr_contents
               THEN
                  NULL;
               WHEN OTHERS
               THEN
                  raise_application_error
                                  (-20002,
                                      'Error in LOGMNR_CONTENTS View'
                                   || SQLERRM
                                   || SQLCODE
                                  );
            END;

            BEGIN
               /* end for this log and delete it */
               SYS.DBMS_LOGMNR.end_logmnr;

               INSERT INTO lm_log
                           (lm_state
                           )
                    VALUES ('end loading archive log ' || work_file
                           );

               COMMIT;
               SYS.DBMS_BACKUP_RESTORE.deletefile (work_file);
            EXCEPTION
               WHEN OTHERS
               THEN
                  /* Cleanup stays best effort, but leave a trace in lm_log: */
                  /* a log that was not deleted is loaded again on the next run. */
                  cleanup_msg :=
                     SUBSTR (   'failed to end/delete archive log '
                             || work_file
                             || ' '
                             || SQLERRM,
                             1,
                             2000
                            );

                  INSERT INTO lm_log
                              (lm_state
                              )
                       VALUES (cleanup_msg
                              );

                  COMMIT;
            END;
         END IF;
      END LOOP;
   EXCEPTION
      WHEN OTHERS
      THEN
         raise_application_error (-20010,
                                     'Error in LOAD_LOGMINER add '
                                  || work_file
                                  || SQLERRM
                                  || SQLCODE
                                 );
   END;

/* ok now we have our local store of activity clean it up and */
/* prep it to be used in the apply program. Put the prepped sql */
/* in ph1_redo column which is unused */
   BEGIN
      /* UPDATE / DELETE: address rows through the DROWID column instead of */
      /* ROWID, and strip double quotes and the trailing semicolon */
      UPDATE logmnr_contents
         SET ph1_redo =
                REPLACE (REPLACE (REPLACE (sql_redo, 'ROWID',
                                           'DROWID'),
                                  '"',
                                  ''
                                 ),
                         ';',
                         ''
                        )
       WHERE ph1_redo IS NULL AND operation IN ('UPDATE', 'DELETE');

--
      /* INSERT for SOME_SCHEMA: add a drowid column carrying the source rowid, */
      /* then strip double quotes and semicolons. */
      /* NOTE(review): the first search string contains a literal line break */
      /* and tabs; confirm it matches the sql_redo text LogMiner produces. */
      UPDATE logmnr_contents
         SET ph1_redo =
                REPLACE
                      (REPLACE (REPLACE (REPLACE (sql_redo,
                                                  ') 
		values',
                                                  ',drowid) values'
                                                 ),
                                         ');',
                                            ','
                                         || CHR (39)
                                         || ROWIDTOCHAR (row_id)
                                         || CHR (39)
                                         || ');'
                                        ),
                                '"',
                                ''
                               ),
                       ';',
                       ''
                      )
       WHERE ph1_redo IS NULL
         AND seg_owner = 'SOME_SCHEMA'
         AND operation = 'INSERT';

--
      /* INSERT for SOME_OTHER_OWNER: no drowid, only strip quotes and semicolons */
      UPDATE logmnr_contents
         SET ph1_redo = REPLACE (REPLACE (sql_redo, '"', ''), ';', '')
       WHERE ph1_redo IS NULL
         AND seg_owner = 'SOME_OTHER_OWNER'
         AND operation = 'INSERT';

--
      /* anything not prepared above is of no interest to the apply program */
      DELETE FROM logmnr_contents
            WHERE ph1_redo IS NULL;

--
      COMMIT;
   END;
END;
/
C Source for Oracle External Functions
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <dirent.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#ifndef OCI_ORACLE
# include <oci.h>
#endif

/*
 * Entry points called by Oracle through the dir_proc, file_proc and
 * file_del_proc call specs (library os_lib).
 *
 * dir_func      - write the entries of directory Path to the file FileName
 * file_func     - stat() a file and report the result, size and block count
 * file_del_func - delete the named file under /tmp
 */
void dir_func(char *FileName,char *Path); 
void file_func(OCIExtProcContext *,char *,int *,int *,int *); 
void file_del_func(char *FileName); 

/*
 * dir_func - write a listing of directory Path into the text file FileName.
 *
 * FileName: path of the output file; it is created or truncated.
 * Path:     directory to list.
 *
 * Every entry except "." and ".." is written as "Path/entry", one per line,
 * in the order readdir() returns them.
 *
 * Nothing is reported to the caller. If FileName cannot be opened nothing is
 * written; if Path cannot be opened the output file is left empty, so a
 * reader still finds a (zero-line) file.
 */
void dir_func(char *FileName,char *Path) {
        FILE *listing;
        DIR *dir;
        struct dirent *entry;

        if (FileName == NULL || Path == NULL) {
                return;
        }

        listing = fopen(FileName, "w");
        if (listing == NULL) {
                return;
        }

        dir = opendir(Path);
        if (dir != NULL) {
                while ((entry = readdir(dir)) != NULL) {
                        /* skip the self and parent links */
                        if (strcmp(entry->d_name, ".") == 0 ||
                            strcmp(entry->d_name, "..") == 0) {
                                continue;
                        }
                        fprintf(listing, "%s/%s\n", Path, entry->d_name);
                }
                closedir(dir);
        }

        fclose(listing);
}

/*
 * file_func - report whether a file exists and how big it is.
 *
 * with_context: OCI context supplied by the WITH CONTEXT call spec; unused.
 * FileName:     path of the file to inspect.
 * exists:       receives the return code of stat(): 0 when the file exists,
 *               -1 when it does not (or cannot be inspected).
 * filesize:     receives st_size, or 0 when stat() fails.
 * block_count:  receives st_blocks, or 0 when stat() fails.
 *
 * Sizes that do not fit in an int are reported as INT_MAX instead of being
 * narrowed to an arbitrary (possibly negative) value.
 */
void file_func(OCIExtProcContext *with_context,char *FileName,
                int *exists,int *filesize,int *block_count) {
        struct stat info;
        int rtn;

        (void)with_context;

        rtn = (FileName != NULL) ? stat(FileName, &info) : -1;
        if (rtn == 0) {
                *filesize = (info.st_size > INT_MAX) ? INT_MAX
                                                     : (int)info.st_size;
                *block_count = (info.st_blocks > INT_MAX) ? INT_MAX
                                                          : (int)info.st_blocks;
        } else {
                *filesize = 0;
                *block_count = 0;
        }

        /* callers get stat()'s own return code, not a boolean */
        *exists = rtn;
}

/*
 * file_del_func - delete a file that lives under /tmp.
 *
 * FileName: name of the file relative to /tmp ("/tmp/" is prepended here).
 *
 * The call is a silent no-op when FileName is NULL or empty, when the full
 * path would not fit in the local buffer, when the name contains "../"
 * (which would let the path leave /tmp), or when the file does not exist.
 * Nothing is reported to the caller.
 */
void file_del_func(char *FileName) {
        char path[200];
        struct stat info;
        int len;

        if (FileName == NULL || FileName[0] == '\0') {
                return;
        }

        /* this routine only ever removes files below /tmp */
        if (strstr(FileName, "../") != NULL) {
                return;
        }

        /* bounded formatting: an over-long name is rejected, not truncated */
        len = snprintf(path, sizeof path, "/tmp/%s", FileName);
        if (len < 0 || (size_t)len >= sizeof path) {
                return;
        }

        if (stat(path, &info) == 0) {
                unlink(path);
        }
}
阅读(1613) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~