數(shù)據(jù)庫無論對于生產(chǎn)管理還是很多的實際應(yīng)用都非常重要。小編這次聊一下數(shù)據(jù)庫事件觸發(fā)的應(yīng)用。示例使用了postgresql和Python。在本文中,事件觸發(fā)和處理大概地分為兩類:
數(shù)據(jù)庫的事件觸發(fā)和服務(wù)器內(nèi)部處理(1~4)
數(shù)據(jù)庫事件觸發(fā)后,客戶端的程序檢測到該事件的觸發(fā)對應(yīng)的處理(5~6)
在數(shù)據(jù)庫系統(tǒng)中,事件觸發(fā)(通常指數(shù)據(jù)庫觸發(fā)器)以及讀取事件觸發(fā)的信息用于多種場景和需求。請看兩組示例(1~4)和(5~6)。
1. 數(shù)據(jù)一致性和完整性維護
當(dāng)對數(shù)據(jù)庫表中的數(shù)據(jù)進行插入、更新或刪除操作時,需要自動驗證或調(diào)整相關(guān)數(shù)據(jù),以確保它們符合業(yè)務(wù)規(guī)則或約束。例如,在一個訂單管理系統(tǒng)中,如果庫存數(shù)量減少到一定閾值以下,可以觸發(fā)一個警告或補貨請求。
Step 1-1: 創(chuàng)建數(shù)據(jù)庫表
假設(shè)我們有一個inventory表,它保存產(chǎn)品庫存的信息:
CREATE TABLE inventory ( product_id SERIAL PRIMARY KEY, product_name TEXT NOT NULL, quantity INT NOT NULL );
Step 1-2: 創(chuàng)建觸發(fā)函數(shù)
創(chuàng)建一個 PL/pgSQL 函數(shù),用于檢查庫存數(shù)量并記錄警告信息:
CREATE OR REPLACE FUNCTION check_inventory_threshold() RETURNS TRIGGER AS $$ BEGIN IF NEW.quantity < 10 THEN -- 假設(shè) 10 是閾值 -- 在此處記錄警告或使用某種方式發(fā)送通知 RAISE WARNING 'Product % is below threshold with quantity %', NEW.product_name, NEW.quantity; -- 可在此插入補貨請求或通知操作 END IF; RETURN NEW; END; $$ LANGUAGE plpgsql;
Step 1-3: 創(chuàng)建觸發(fā)器
設(shè)置一個觸發(fā)器,更新inventory表時調(diào)用觸發(fā)函數(shù):
CREATE TRIGGER inventory_check_trigger AFTER INSERT OR UPDATE ON inventory FOREACHROWEXECUTEPROCEDUREcheck_inventory_threshold();Step 1-4: 使用 Python 進行外部操作
一個Python腳本可以用于監(jiān)控警告并執(zhí)行更復(fù)雜的操作,比如發(fā)送電子郵件或自動創(chuàng)建補貨單。以下是一個簡單的Python示例,假設(shè)你將警告日志記錄到一個專門的日志表中:
import psycopg2 from smtplib import SMTP def send_notification(product_name, quantity): # 發(fā)送郵件通知邏輯(確保你已設(shè)置SMTP服務(wù)器配置) with SMTP('smtp.example.com') as smtp: smtp.sendmail('from@example.com', 'to@example.com', f'Subject: Inventory Alert
Product {product_name} is below threshold with quantity {quantity}.') def check_and_notify(): try: # Connect to PostgreSQL database connection = psycopg2.connect( host="localhost", database="your_database", user="your_user", password="your_password" ) cursor = connection.cursor() # Query to check logs for low inventory query = """ SELECT product_name, quantity FROM inventory WHERE quantity < 10; """ cursor.execute(query) low_stock_items = cursor.fetchall() for product_name, quantity in low_stock_items: send_notification(product_name, quantity) except (Exception, psycopg2.DatabaseError) as error: print(f"Error: {error}") finally: if connection: cursor.close() connection.close() # Run the check and notify function check_and_notify()
2.自動化任務(wù)
自動執(zhí)行某些日常任務(wù),如記錄變化、生成日志或進行審計。當(dāng)某個表中的數(shù)據(jù)被修改時,觸發(fā)器可以自動記錄修改前后的數(shù)據(jù)以供審計,當(dāng)對特定表進行插入、更新或刪除操作時,觸發(fā)器能夠捕捉這些事件,并執(zhí)行相關(guān)的處理邏輯。 下面是一個如何使用 PostgreSQL 觸發(fā)器來記錄數(shù)據(jù)變化的示例。假設(shè)我們有一個名為employee_data的表,我們希望記錄每次數(shù)據(jù)更新時的操作者信息。
2-1. 創(chuàng)建一個用于日志記錄的表
首先,需要新建一個用于存儲變更日志的表。假設(shè)我們有一個名為employee_data的表,我們希望記錄每次數(shù)據(jù)更新時的操作者信息。
CREATE TABLE change_log ( id SERIAL PRIMARY KEY, table_name TEXT, operation VARCHAR(10), changed_by TEXT, change_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, old_data JSONB, new_data JSONB );
2-2. 創(chuàng)建一個觸發(fā)函數(shù)
接下來,創(chuàng)建一個觸發(fā)函數(shù)。當(dāng)employee_data表發(fā)生變化時,調(diào)用該函數(shù)來記錄變更,檢測并獲取old_data和new_data,然后通過row_to_json函數(shù)將其轉(zhuǎn)換為 JSONB 格式存入日志表中。處理中請留意不同的操作對應(yīng)的日志記錄內(nèi)容的差異。
CREATE OR REPLACE FUNCTION log_employee_data_changes() RETURNS TRIGGER AS $$ BEGIN IF TG_OP = 'DELETE' THEN INSERT INTO change_log (table_name, operation, changed_by, old_data) VALUES ( TG_TABLE_NAME, TG_OP, SESSION_USER, row_to_json(OLD) ); ELSE INSERT INTO change_log (table_name, operation, changed_by, old_data, new_data) VALUES ( TG_TABLE_NAME, TG_OP, SESSION_USER, row_to_json(OLD), row_to_json(NEW) ); END IF; RETURN NEW; END; $$ LANGUAGE plpgsql;TG_OP是 PostgreSQL 觸發(fā)器函數(shù)中的一個特殊變量。在觸發(fā)器函數(shù)中,TG_OP用于表示觸發(fā)事件的操作類型。它會被設(shè)置為以下字符串值之一,以標識觸發(fā)器是由哪種數(shù)據(jù)庫操作激活的:
'INSERT': 觸發(fā)器是由插入操作激活的。
'UPDATE': 觸發(fā)器是由更新操作激活的。
'DELETE': 觸發(fā)器是由刪除操作激活的.
'TRUNCATE': 觸發(fā)器是由截斷操作激活的。
在觸發(fā)器函數(shù)中使用TG_OP,可以根據(jù)不同的操作類型執(zhí)行不同的邏輯。 2-3. 創(chuàng)建觸發(fā)器
最后,為employee_data表創(chuàng)建一個觸發(fā)器,當(dāng)發(fā)生INSERT、UPDATE或DELETE操作時調(diào)用觸發(fā)函數(shù):
CREATE TRIGGER employee_data_changes AFTER INSERT OR UPDATE OR DELETE ON employee_data FOR EACH ROW EXECUTE PROCEDURE log_employee_data_changes();
2-4.如果沒有對應(yīng)的表employee_data,就建一個來測試
CREATE TABLE employee_data ( employee_id SERIAL PRIMARY KEY, -- 員工唯一標識 first_name VARCHAR(50) NOT NULL, -- 員工的名字 last_name VARCHAR(50) NOT NULL, -- 員工的姓氏 email VARCHAR(100) UNIQUE NOT NULL, -- 員工的電子郵件地址 phone_number VARCHAR(15), -- 員工的聯(lián)系電話 hire_date DATE NOT NULL, -- 入職日期 job_title VARCHAR(50), -- 職位名稱 department VARCHAR(50), -- 所屬部門 salary NUMERIC(10, 2), -- 薪水 manager_id INT, -- 上級主管ID,指向另一個員工 CONSTRAINT fk_manager FOREIGN KEY(manager_id) REFERENCES employee_data(employee_id) ON DELETE SET NULL );
2-5. 如果表中沒有數(shù)據(jù)就添加一條來測試
INSERT INTO employee_data ( first_name, last_name, email, phone_number, hire_date, job_title, department, salary, manager_id ) VALUES ( 'ZZZ', -- First name 'AAA', -- Last name 'ZZZ.AAA@example.com', -- Email address '123-456-7890', -- Phone number '2023-11-01', -- Hire date 'Engineer', -- Job title 'Engineering', -- Department 75000, -- Salary NULL -- Manager ID (assuming no manager or manager not yet assigned) );
3. 跨表更新或同步:
當(dāng)一個表發(fā)生變化時時,觸發(fā)器可以用于自動更新或同步其他表的數(shù)據(jù)。例如,在一個多表關(guān)聯(lián)的系統(tǒng)中,有一個訂單表order和一個庫存表inventory,如果訂單表中數(shù)據(jù)有變化,就觸發(fā)更新庫存表中的對應(yīng)產(chǎn)品的數(shù)據(jù)。 3.1建表示例
CREATE TABLE inventory ( product_id SERIAL PRIMARY KEY, product_name VARCHAR(100), stock_quantity INT NOT NULL ); CREATE TABLE orders ( order_id SERIAL PRIMARY KEY, product_id INT REFERENCES inventory(product_id), quantity INT NOT NULL );
3.2 創(chuàng)建觸發(fā)事件
當(dāng)order表中已經(jīng)發(fā)生insert,updat或者delete事件時,就觸發(fā)下面的函數(shù)運行。實際數(shù)據(jù)的加減操作,請根據(jù)實際關(guān)系進行調(diào)整。這里的簡單邏輯是:
有新訂單添加時,就在庫存表中減少產(chǎn)品庫存數(shù)
訂單數(shù)據(jù)有更新時,就把庫存表中減去更新后訂單表中對應(yīng)產(chǎn)品的訂單數(shù)據(jù),然后加上更新前訂單表中對應(yīng)產(chǎn)品的數(shù)據(jù)
當(dāng)訂單取消(刪除)時,就會在庫存數(shù)據(jù)上增加之訂單表中刪除的舊數(shù)據(jù)
CREATE OR REPLACE FUNCTION update_inventory() RETURNS TRIGGER AS $$ BEGIN IF TG_OP = 'INSERT' THEN UPDATE inventory SET stock_quantity = stock_quantity - NEW.quantity WHERE product_id = NEW.product_id; ELSIF TG_OP = 'UPDATE' THEN UPDATE inventory SET stock_quantity = stock_quantity - NEW.quantity + OLD.quantity WHERE product_id = NEW.product_id; ELSIF TG_OP = 'DELETE' THEN UPDATE inventory SET stock_quantity = stock_quantity + OLD.quantity WHERE product_id = OLD.product_id; END IF; RETURN NEW; END; $$ LANGUAGE plpgsql;
3.3 創(chuàng)建事件觸發(fā)器
CREATE TRIGGER trigger_orders_update AFTER INSERT OR UPDATE OR DELETE ON orders FOR EACH ROW EXECUTE PROCEDURE update_inventory();
(防止出現(xiàn)視覺疲勞)
4. 安全性檢查和防護
執(zhí)行安全性檢查,如防止未授權(quán)的數(shù)據(jù)更改或異常數(shù)據(jù)輸入。如果有可疑活動或不當(dāng)數(shù)據(jù)修改,觸發(fā)器可以自動拒絕操作或生成警告。假設(shè)你有一個敏感數(shù)據(jù)的表,如sensitive_data,需要確保只有授權(quán)用戶才能更新數(shù)據(jù)。 4.1建表sensitive_data示例
CREATE TABLE sensitive_data ( id SERIAL PRIMARY KEY, data TEXT NOT NULL, last_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP );
4.2 創(chuàng)建觸發(fā)函數(shù)進行安全檢查
創(chuàng)建一個觸發(fā)函數(shù)來檢查是否是授權(quán)用戶在做修改。
CREATE OR REPLACE FUNCTION check_user_authorization() RETURNS TRIGGER AS $$ BEGIN -- 簡單檢查:用戶是否在允許的列表中(實際應(yīng)該更加復(fù)雜) IF SESSION_USER <> 'authorized_user' THEN RAISE EXCEPTION 'Unauthorized user. Access denied.'; END IF; -- 更新 last_modified 時間戳 NEW.last_modified := CURRENT_TIMESTAMP; RETURN NEW; END; $$ LANGUAGE plpgsql;
4.3為表創(chuàng)建觸發(fā)器
CREATE TRIGGER secure_update_trigger BEFORE UPDATE ON sensitive_data FOR EACH ROW EXECUTE PROCEDURE check_user_authorization();
該事件觸發(fā)器的功能說明
功能:這個示例功能是,當(dāng)有人試圖更新sensitive_data表中的數(shù)據(jù)時,觸發(fā)器函數(shù)check_user_authorization()會自動檢查發(fā)起更新的數(shù)據(jù)庫用戶是否有權(quán)限。如果沒有權(quán)限,拋出異常并阻止操作。
擴展:在實際的生產(chǎn)環(huán)境中,這種安全性檢查會更復(fù)雜,可能包括日志記錄、詳細的用戶權(quán)限檢查、使用角色來管理權(quán)限等。
安全性:使用觸發(fā)器確保只有合適和經(jīng)過驗證的用戶可以進行關(guān)鍵數(shù)據(jù)修改,這是保護數(shù)據(jù)完整性的一部分。
審計:這種自動檢查可集成到更大的審計框架中,以全面監(jiān)控和存儲所有數(shù)據(jù)修改嘗試記錄。
5.事件通知(客戶端程序配合事件觸發(fā)的同步處理方式)
使用事件觸發(fā)器和事件通知來實現(xiàn)對特定數(shù)據(jù)庫事件的響應(yīng)和處理。使用LISTEN和NOTIFY機制,數(shù)據(jù)庫客戶端可以監(jiān)聽特定的通道,并在觸發(fā)器函數(shù)中發(fā)送通知。這在需要實時監(jiān)控數(shù)據(jù)庫事件時非常有用。下面是一個使用 PostgreSQL 實現(xiàn)事件通知的示例。
假設(shè)我們希望在orders表中插入新訂單時發(fā)送通知,以便外部系統(tǒng)或服務(wù)進行相應(yīng)處理。
5.1建一個orders表方便示例
CREATE TABLE orders ( order_id SERIAL PRIMARY KEY, product_id INT NOT NULL, quantity INT NOT NULL, order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); 觸發(fā)器可以用于事件通知,例如在數(shù)據(jù)變化時發(fā)送電子郵件通知相關(guān)人員。這在實時監(jiān)控和響應(yīng)系統(tǒng)中非常有用。5.2 建立觸發(fā)函數(shù)
CREATE OR REPLACE FUNCTION notify_new_order() RETURNS TRIGGER AS $$ BEGIN -- 使用 NOTIFY 發(fā)送通知,通道名為 'new_order' PERFORM pg_notify('new_order', 'New order placed: ' || NEW.order_id); RETURN NEW; END; $$ LANGUAGE plpgsql;5.3創(chuàng)建觸發(fā)器 為orders表創(chuàng)建觸發(fā)器,以在插入新記錄時調(diào)用觸發(fā)函數(shù)。
CREATE TRIGGER notify_order_insert AFTER INSERT ON orders FOR EACH ROW EXECUTE PROCEDURE notify_new_order();5.4使用 Python 監(jiān)聽通知 我們可以使用 Python 腳本來監(jiān)聽并處理通知。以下是一個簡單的示例,使用psycopg2庫監(jiān)聽new_order通道。
import psycopg2 import select def listen_for_new_orders(): try: # Connect to your PostgreSQL database connection = psycopg2.connect( dbname="your_db", user="your_user", password="your_password", host="localhost" ) connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) cursor = connection.cursor() # Listen for notifications on the 'new_order' channel cursor.execute("LISTEN new_order;") print("Waiting for notifications on channel 'new_order'...") while True: # Use select() to wait for notification if select.select([connection], [], [], 5) == ([], [], []): print("No new notifications.") else: connection.poll() while connection.notifies: notify = connection.notifies.pop(0) print(f"Got NOTIFY: {notify.channel} - {notify.payload}") except (Exception, psycopg2.DatabaseError) as error: print(f"Error: {error}") finally: if connection: cursor.close() connection.close() # Call the function to start listening for notifications if__name__=='__main__': listen_for_new_orders()
6. 事件通知(客戶端程序異步多線程的方式進行檢測和操作)
示例的數(shù)據(jù)庫表和事件觸發(fā)的設(shè)置或創(chuàng)建,和示例5中相同,不過這里我們要增加一些復(fù)雜度,畢竟,程序處理要盡可能避免堵塞的方式進行等待讀取。這里設(shè)想另外一種使用場景:
一方面客戶端要檢測數(shù)據(jù)庫的orders表中的數(shù)據(jù)變化;另一方面,客戶端還在繼續(xù)讀取(或者其他操作)這個數(shù)據(jù)庫中的數(shù)據(jù)。
import threading import psycopg2 import select import time # Global flag to indicate whether the threads should continue running running = True def listen_for_new_orders(): try: # Connect to your PostgreSQL database connection = psycopg2.connect( dbname="your_db", user="your_user", password="your_password", host="localhost" ) connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) cursor = connection.cursor() # Listen for notifications on the 'new_order' channel cursor.execute("LISTEN new_order;") print("Waiting for notifications on channel 'new_order'...") while running: # Use select() to wait for notification if select.select([connection], [], [], 5) == ([], [], []): continue else: connection.poll() while connection.notifies: notify = connection.notifies.pop(0) print(f"Got NOTIFY: {notify.channel} - {notify.payload}") except (Exception, psycopg2.DatabaseError) as error: print(f"Error: {error}") finally: if connection: cursor.close() connection.close() def read_database_records(): while running: try: # Example of reading from PostgreSQL connection = psycopg2.connect( dbname="your_db", user="your_user", password="your_password", host="localhost" ) cursor = connection.cursor() # Example query to periodically read data (replace with actual query) cursor.execute("SELECT * FROM orders;") records = cursor.fetchall() for record in records: print(f"Order Record: {record}") time.sleep(10) # Wait before reading again to simulate periodic check except (Exception, psycopg2.DatabaseError) as error: print(f"Error: {error}") finally: if connection: cursor.close() connection.close() def main(): try: # Create threads for listening and reading listener_thread = threading.Thread(target=listen_for_new_orders) reader_thread = threading.Thread(target=read_database_records) # Start the threads listener_thread.start() reader_thread.start() # Wait for both threads to complete (or terminate on Ctrl+C) listener_thread.join() reader_thread.join() except KeyboardInterrupt: # Set the running flag to False to stop the threads global running running = False print("Exiting...") if __name__ == "__main__": main()請留意上面的示例python代碼中,數(shù)據(jù)庫的連接使用了ISOLATION_LEVEL_AUTOCOMMIT,這就意味著每次涉及到數(shù)據(jù)更改或者增加的操作,數(shù)據(jù)庫將自動提交了。如果要手動方式提交,那就需要配置一個ISOLATION_LEVEL_READ_COMMITTED。 另外需要留意,前面的事件觸發(fā)示例中,用了:
... FOR EACH ROW EXECUTE PROCEDURE your_trigger_func(); ...這個代碼的執(zhí)行是針對每條記錄的發(fā)生來觸發(fā)了。請根據(jù)實際應(yīng)用的操作需要進行調(diào)整。
-
數(shù)據(jù)庫
+關(guān)注
關(guān)注
7文章
3794瀏覽量
64360 -
觸發(fā)器
+關(guān)注
關(guān)注
14文章
2000瀏覽量
61132 -
函數(shù)
+關(guān)注
關(guān)注
3文章
4327瀏覽量
62569 -
python
+關(guān)注
關(guān)注
56文章
4792瀏覽量
84627
原文標題:數(shù)據(jù)庫事件觸發(fā)的設(shè)置和應(yīng)用,及客戶端程序?qū)κ录耐健惒阶x取操作
文章出處:【微信號:安費諾傳感器學(xué)堂,微信公眾號:安費諾傳感器學(xué)堂】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論