Disparadores CDC – Change Data Capture

Los disparadores CDC – Change Data Capture o identificación, captura y procesamiento de datos, son los únicos disparadores que se aceptan en entornos data warehouse, su misión consiste en reflejar los cambios realizados en las tablas origen, no esto se consigue tener en tablas de logs de todos los cambios que se van produciendo.

La implementación de este tipo de disparadores consta de las siguientes partes:

Definición de la tabla donde almacenar logs

Definir la estructura que albergará los cambios producidos en la tabla tb_category

-- Crear esquema de logs
CREATE SCHEMA log;


CREATE TABLE log.tb_cdc_category
(
  category_id char(5) NOT NULL,
  category_name char(60) NOT NULL,
  created_date date,
  operation char(1),
  user_id char(50),
  operation_timestamp date
);

Definición del disparador con la función que lo implementa.

Crear un disparador con su función que refleje los cambios producidos en la tabla tb_category.

-----------------------------------------------------------------------------
--
-- Función que carga los cambios producidos en la tabla tb_category
--
-----------------------------------------------------------------------------

CREATE OR REPLACE FUNCTION log.cdc_category() 
RETURNS trigger AS
$$
/* 
 *  Procedimiento: cdc_category
 *  Autor: Diego Calvo Barreno
 *  Fecha creación: 2017-05-29
 *  Versión: 1.0
 *  Parámetros:  sin parámetros
 *  Descripción: Procedimiento que cargará en la tabla tb_cdc_category los cambios
 *               producidos a nivel de fila tras cualquier operación en tb_category.
 *
 */
DECLARE
  -- Define la estructura temporal que servirá para realizar las operaciones
  temp_row log.tb_cdc_category%rowtype;
  
BEGIN
  
  -- Capturar el usuario y el momento de la operación
  temp_row.user_id := user;
  temp_row.operation_timestamp := now();
  
  -- Dependiendo de la operación capturada entra por una u otra condición del IF
  
  -- If DELETE, inserta el valor antiguo que captura con OLD 
  IF (TG_OP = 'DELETE') THEN
      temp_row.category_id := OLD.category_id;
      temp_row.category_name := OLD.category_name;
      temp_row.created_date := OLD.created_date;
      temp_row.operation := 'D';
      
      BEGIN
      -- Inserta los valores en la tabla CDC
      INSERT INTO log.tb_cdc_category SELECT temp_row.*;
      
      -- Si la fila existe entonces actualiza su valor
      EXCEPTION
        WHEN unique_violation THEN 
          UPDATE log.tb_cdc_category
             SET (category_name, created_date, operation, user_id, operation_timestamp) = 
                   (temp_row.category_name, temp_row.created_date, 
                    temp_row.operation, temp_row.user_id, temp_row.operation_timestamp)
           WHERE tb_cdc_category.category_id = temp_row.category_id;
      END;
      
      -- Devuelve el la estructura grabada OLD
      RETURN OLD;
  
  -- If UPDATE, actualiza el valor nuevo que captura con NEW 
  ELSIF (TG_OP = 'UPDATE') THEN
      temp_row.category_id := NEW.category_id;
      temp_row.category_name := NEW.category_name;
      temp_row.created_date := NEW.created_date;
      temp_row.operation := 'U';
      
      BEGIN
      -- Inserta los valores en la tabla CDC
      INSERT INTO log.tb_cdc_category SELECT temp_row.*;
      
      -- Si la fila existe entonces actualiza su valor
      EXCEPTION
        WHEN unique_violation THEN 
          UPDATE log.tb_cdc_category
             SET (category_name, created_date, operation, user_id, operation_timestamp) = 
                   (temp_row.category_name, temp_row.created_date, 
                    temp_row.operation, temp_row.user_id, temp_row.operation_timestamp)
           WHERE tb_cdc_category.category_id = temp_row.category_id;
      END;
      
      -- Devuelve el la estructura grabada NEW
      RETURN NEW;

  -- If INSERT, inserta el valor nuevo que captura con NEW 
  ELSIF (TG_OP = 'INSERT') THEN
      temp_row.category_id := NEW.category_id;
      temp_row.category_name := NEW.category_name;
      temp_row.created_date := NEW.created_date;
      temp_row.operation := 'U';
      
      BEGIN
      -- Inserta los valores en la tabla CDC
      INSERT INTO log.tb_cdc_category SELECT temp_row.*;
      
      -- Si la fila existe entonces actualiza su valor
      EXCEPTION
        WHEN unique_violation THEN 
          UPDATE log.tb_cdc_category
             SET (category_name, created_date, operation, user_id, operation_timestamp) = 
                   (temp_row.category_name, temp_row.created_date, 
                    temp_row.operation, temp_row.user_id, temp_row.operation_timestamp)
           WHERE tb_cdc_category.category_id = temp_row.category_id;
      END;
      
      -- Devuelve el la estructura grabada NEW
      RETURN NEW;
      
  END IF;
  
  -- Result is ignored since this is an AFTER trigger
  RETURN NULL; 

END; -- Fin del procedimiento  
$$
-- Importante definir el lenguaje utilizado
LANGUAGE plpgsql;


----------------------------------------------------------------------------
--
-- Definición del disparador que llama a la función anteriormente descrita
--
-----------------------------------------------------------------------------

CREATE TRIGGER tg_log_category 
  -- Este disparador se activa con los eventos de inserción, actualización y borrado
  AFTER INSERT OR UPDATE OR DELETE ON ventas.tb_category
  FOR EACH ROW
  EXECUTE PROCEDURE log.cdc_category()
;

Nota: Para más información acerca de trigger en PostgreSQL consultar la documentación

Autor: Diego Calvo