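You set up the source database information and the target database information, and when several tables are involved you have to pull in a source and a target for each one, map them one by one, and only then run the job to synchronize the data.
That is how this article came about: a quick Python implementation (partly an excuse to get familiar with Python syntax) of data synchronization between SQL Server databases. It was later extended a little, so that it can synchronize table structures, table data, stored procedures, functions, and user-defined table types between databases on different servers.
It currently works between two SQL Server data sources: you can synchronize one or more tables or stored procedures per run, or all tables or stored procedures of an entire database (together with the other database objects that those tables or stored procedures depend on).
Supporting this forced a large amount of refactoring. The first version synchronized everything in a straight line and could not express this logic, which was a very concrete lesson in the "single responsibility" principle.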
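1. Source server information (server address, instance name, database name, user name, password). When no user name and password are given, Windows authentication is used.
2. Target server information (server address, instance name, database name, user name, password). When no user name and password are given, Windows authentication is used.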
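The fallback to Windows authentication can be sketched as follows. This is a minimal illustration, not the script's own code: `build_connect_kwargs` is a hypothetical helper, and the keyword names simply mirror the ones the script passes to `pymssql.connect`.

```python
def build_connect_kwargs(host, instance, port, database, user=None, password=None):
    """Return keyword arguments for a connect() call.

    Hypothetical helper: the login is included only when both user and
    password are given; otherwise it is left out so that the driver can
    fall back to Windows (integrated) authentication.
    """
    kwargs = {"host": host, "server": instance, "port": port, "database": database}
    if user and password:
        kwargs["user"] = user
        kwargs["password"] = password
    return kwargs


# Example values only; none of these credentials come from the article.
sql_login = build_connect_kwargs("127.0.0.1", "MSSQL", 1433, "DB01", "sa", "secret")
windows_auth = build_connect_kwargs("127.0.0.1", "MSSQL", 1433, "DB02")
print(sorted(sql_login))
print(sorted(windows_auth))
```

Building the arguments in one place keeps the source and target connections symmetric, since both sides follow the same rule.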
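Table synchronization works by creating the target table, iterating over the rows of the source table, generating SQL of the form insert into ... values(***),(***),(***), and executing it against the target database. The rough steps are:
2. With forced overwrite, the target table is dropped if it exists, so that its structure cannot differ from the source table. Without forced overwrite, an exception is raised if the columns do not match.
5. Escaping. Values must be escaped while the SQL is being assembled, otherwise the statement is invalid. The script currently handles the ' character inside strings, binary columns, and datetime columns (the places where problems are most likely).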
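The escaping and batching steps can be sketched in isolation like this. It is a simplified illustration under stated assumptions: `to_sql_literal` and `build_insert_batches` are hypothetical names, only a handful of Python types are covered, and nothing is sent to a database.

```python
import datetime
from decimal import Decimal


def to_sql_literal(value):
    """Render one Python value as a T-SQL literal (illustrative subset of types)."""
    if value is None:
        return "null"
    if isinstance(value, datetime.datetime):
        # keep millisecond precision only
        return "'{0}'".format(str(value)[0:23])
    if isinstance(value, str):
        # double single quotes and drop embedded NUL characters
        return "'{0}'".format(value.replace("'", "''").replace("\x00", ""))
    if isinstance(value, Decimal):
        # fixed-point notation, never scientific notation
        return "'{0:f}'".format(value)
    if isinstance(value, bytes):
        # binary literal; bytes.hex() keeps leading zero bytes
        return "0x" + value.hex()
    return "'{0}'".format(value)


def build_insert_batches(table, columns, rows, batch_size=1000):
    """Yield INSERT statements carrying at most batch_size rows each."""
    prefix = "insert into {0} ({1}) values ".format(table, ",".join(columns))
    for start in range(0, len(rows), batch_size):
        chunk = rows[start:start + batch_size]
        yield prefix + ",".join(
            "(" + ",".join(to_sql_literal(v) for v in row) + ")" for row in chunk
        )


rows = [(1, "O'Brien", None), (2, "x", b"\x00\xff")]
statements = list(build_insert_batches("[dbo].[t1]", ["id", "name", "payload"], rows, batch_size=1))
for statement in statements:
    print(statement)
```

A batch size of 1000 matches the script, and it is also the most rows SQL Server accepts in a single VALUES list, so larger batches would need a different statement shape.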
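The following parameters synchronize three tables from the source database to the target database. The test runs against a named instance on the local machine, so the instance name and port number are passed explicitly.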
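1. When a table has a filtered index, the generated index creation statement does not include the WHERE condition. The table-structure export statement is painful to look at, and there has been no time to rework it yet.
2. Other rarely used column types, such as geospatial columns, are not supported for now.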
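One possible way to close the filtered-index gap is to read the filter expression separately and append it when the index statement is assembled. The sketch below assumes the expression has already been fetched from the `filter_definition` column of `sys.indexes`; `build_index_ddl` is a hypothetical helper and is not part of the script.

```python
def build_index_ddl(table, index_name, key_columns, is_unique=False,
                    include_columns=(), filter_definition=None):
    """Assemble a CREATE INDEX statement, adding WHERE only for filtered indexes."""
    parts = ["CREATE"]
    if is_unique:
        parts.append("UNIQUE")
    parts.append("NONCLUSTERED INDEX [{0}] ON {1} ({2})".format(
        index_name, table, ", ".join(key_columns)))
    if include_columns:
        parts.append("INCLUDE ({0})".format(", ".join(include_columns)))
    if filter_definition:
        # filter_definition is assumed to be the expression text stored by SQL Server
        parts.append("WHERE " + filter_definition)
    return " ".join(parts)


# Example metadata only; the index and column names are made up.
ddl = build_index_ddl("[dbo].[t1]", "ix_t1_status", ["[status] ASC"],
                      filter_definition="([status]=(1))")
print(ddl)
```

Assembling the statement in Python rather than inside the large T-SQL export query keeps the optional clauses readable, at the cost of one extra metadata query per table.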
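The following parameters synchronize two stored procedures from the source database to the target database. The test runs against a named instance on the local machine, so the instance name and port number are passed explicitly.
The [dbo].[sp_test01] procedure used in this test therefore depends on other objects. If those objects do not exist on the target, synchronizing only the stored procedure itself is pointless.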
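If an object in the default dbo schema is referenced inside a stored procedure or function without its schema (see the modified procedure below, where the dbo schema of the table is omitted: dbo.test01 ==> test01),
the system function sys.dm_sql_referenced_entities cannot be used to find the dependency. Oddly, it reports the type of the referenced object but does not return the object itself.
This cost a long debugging session, stepping deeper and deeper through the code, because it never occurred to me that the function would behave this way: it finds the type of the dependent object but not the object itself.
The other case is dynamic SQL, where sys.dm_sql_referenced_entities cannot find the referenced objects either.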
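A possible workaround for the unqualified-name case is to fill in a default schema on the Python side. The sketch assumes that the dependency query returns an empty schema name for references written without a schema, and that dbo is the right default, which may not hold for every caller. `qualify_reference` is a hypothetical helper. References built in dynamic SQL remain invisible either way.

```python
def qualify_reference(schema_name, entity_name, default_schema="dbo"):
    """Return [schema].[entity], falling back to default_schema when the schema is missing."""
    if not entity_name:
        # nothing usable was returned for this reference
        return None
    return "[{0}].[{1}]".format(schema_name or default_schema, entity_name)


# (schema, entity) pairs as a dependency query might return them; sample data only
sample_rows = [("dbo", "test01"), (None, "test02"), (None, None)]
resolved = [qualify_reference(schema, entity) for schema, entity in sample_rows]
print(resolved)
```

Selecting the schema and entity names as separate columns and combining them in Python avoids the situation where a NULL schema turns the whole concatenated name into NULL.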
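Other database objects, such as functions and table types, can be synchronized as well, since functions and table types are synchronized along with the stored procedures that use them. This works much like tables and stored procedures, so it is not described further.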
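1. Cleaner, better organized code structure (the first version was the quickest, most brute-force implementation; a lot has been refactored since, but plenty of awkward traces remain).
2. Synchronization efficiency. Importing and exporting multiple tables runs on a single thread, so exporting several large tables serially can become a bottleneck. The open question is how to distribute the tables as evenly as possible across multiple threads according to their data volume.
3. Friendlier, clearer error messages and logging, including an export log.
4. Heterogeneous data synchronization: MySQL <==> SQL Server <==> Oracle <==> PGSQL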
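For the second item, one simple way to spread tables over threads is a greedy split: take the tables from largest to smallest and always give the next one to the worker with the least data so far. The sketch below only computes the assignment; `assign_tables` is a hypothetical helper, and the row counts are made-up sample values.

```python
import heapq


def assign_tables(row_counts, worker_count):
    """Greedy split: biggest tables first, each to the currently least-loaded worker."""
    heap = [(0, index) for index in range(worker_count)]  # (rows assigned so far, worker index)
    heapq.heapify(heap)
    buckets = [[] for _ in range(worker_count)]
    for table, rows in sorted(row_counts.items(), key=lambda item: item[1], reverse=True):
        load, index = heapq.heappop(heap)
        buckets[index].append(table)
        heapq.heappush(heap, (load + rows, index))
    return buckets


# Sample row counts only; in practice they would come from the source database.
row_counts = {"dbo.t1": 900, "dbo.t2": 500, "dbo.t3": 400, "dbo.t4": 100}
buckets = assign_tables(row_counts, 2)
print(buckets)
```

Each bucket could then be handed to its own thread with its own pair of connections. The split is not guaranteed to be optimal, but it is cheap and usually close to even.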
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = 'MSSQL123'
__date__ = '2019-06-07 09:36'
import os
import sys
import time
import datetime
import pymssql
from decimal import Decimal
usage = '''
-----parameter explain-----
source database parameter
-s_h : source database host ----- required
-s_i : source database instance name ----- default instance name MSSQL
-s_d : source database name ----- required
-s_u : source database login ----- default Windows authentication
-s_p : source database login password ----- required when -s_u is given
-s_P : source database instance port ----- default port 1433
target database parameter
-t_h : target database host ----- required
-t_i : target database instance name ----- default instance name MSSQL
-t_d : target database name ----- required
-t_u : target database login ----- default Windows authentication
-t_p : target database login password ----- required when -t_u is given
-t_P : target database instance port ----- default port 1433
sync object parameter
-obj_type : table or sp or function or other database object ----- tab or sp or fn or tp
-obj : table|sp|function|type name ----- which table or sp to sync
overwrite parameter
-f : force overwrite target database object ----- Y or N
--help: help document
Example:
python DataTransfer.py -s_h=127.0.0.1 -s_P=1433 -s_i="MSSQL" -s_d="DB01" -obj_type="tab" -obj="dbo.t1,dbo.t2" -t_h=127.0.0.1 -t_P=1433 -t_i="MSSQL" -t_d="DB02" -f="Y"
python DataTransfer.py -s_h=127.0.0.1 -s_P=1433 -s_i="MSSQL" -s_d="DB01" -obj_type="sp" -obj="dbo.sp1,dbo.sp2" -t_h=127.0.0.1 -t_P=1433 -t_i="MSSQL" -t_d="DB02" -f="Y"
'''
class SyncDatabaseObject(object):
    # source database
    s_h = None
    s_i = None
    s_P = None
    s_u = None
    s_p = None
    s_d = None
    # obj type
    s_obj_type = None
    # sync objects
    s_obj = None
    # target database
    t_h = None
    t_i = None
    t_P = None
    t_u = None
    t_p = None
    t_d = None
    f = None
    file_path = None

    def __init__(self, *args, **kwargs):
        # every keyword argument becomes an attribute of the same name
        for k, v in kwargs.items():
            setattr(self, k, v)
    # connect to sqlserver
    def get_connect(self, _h, _i, _P, _u, _p, _d):
        # fall back to Windows authentication when no login and password are supplied
        if _u and _p:
            return pymssql.connect(host=_h, server=_i, port=_P,
                                   user=_u, password=_p, database=_d)
        return pymssql.connect(host=_h, server=_i, port=_P, database=_d)
    # check connection
    def validated_connect(self, _h, _i, _P, _u, _p, _d):
        try:
            # open and immediately close a connection just to prove it works
            self.get_connect(_h, _i, _P, _u, _p, _d).close()
        except pymssql.Error:
            print("connect to " + str(_h) + " failed, please check your parameters")
            sys.exit(1)
    '''
    Accepts an object name such as xxx_name, dbo.xxx_name, [schema].xxx_name or schema.[xxx_name]
    and normalizes it to [schema].[xxx_name], using dbo as the default schema when none is given.
    Any other format is considered invalid and fails later with an error.
    format object name
    1,xxx_name ======> [dbo].[xxx_name]
    2,dbo.xxx_name ======> [dbo].[xxx_name]
    3,[schema].xxx_name ======> [schema].[xxx_name]
    4,schema.xxx_name ======> [schema].[xxx_name]
    5,[schema].[xxx_name] ======> [schema].[xxx_name]
    6,[schema].[xxx_name ======> invalid format, raises an error
    '''
    @staticmethod
    def format_object_name(name):
        format_name = ""
        if ("." in name):
            schema_name = name[0:name.find(".")]
            object_name = name[name.find(".") + 1:]
            if not ("[" in schema_name):
                schema_name = "[" + schema_name + "]"
            if not ("[" in object_name):
                object_name = "[" + object_name + "]"
            format_name = schema_name + "." + object_name
        else:
            if ("[" in name):
                format_name = "[dbo]." + name
            else:
                format_name = "[dbo]." + "[" + name + "]"
        return format_name
'''
check user input object is a valid object
'''
    def exits_object(self, conn, name):
        cursor = conn.cursor()
        # look the name up among objects and types; the caller owns and closes conn
        sql_script = r'''select top 1 1 from
                        (
                        select concat(QUOTENAME(schema_name(schema_id)),'.',QUOTENAME(name)) as obj_name from sys.objects
                        union all
                        select concat(QUOTENAME(schema_name(schema_id)),'.',QUOTENAME(name)) as obj_name from sys.types
                        )t where obj_name = '{0}'
                    '''.format(self.format_object_name(name))
        cursor.execute(sql_script)
        result = cursor.fetchall()
        cursor.close()
        return 1 if result else 0
# table variable sync
def sync_table_variable(self, tab_name, is_reference):
conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)
conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)
cursor_source = conn_source.cursor()
cursor_target = conn_target.cursor()
if (self.exits_object(conn_source, self.format_object_name(tab_name))) > 0:
pass
else:
print("----------------------- warning message -----------------------")
print("--------warning: object " + tab_name + " not existing in source database ------------")
print("----------------------- warning message -----------------------")
print()
return
exists_in_target = 0
sql_script = r'''select top 1 1
from sys.table_types tp
where is_user_defined = 1
and concat(QUOTENAME(schema_name(tp.schema_id)),'.',QUOTENAME(tp.name)) = '{0}' ''' \
.format((self.format_object_name(tab_name)))
# if the table schema exists in target server,skip
cursor_target.execute(sql_script)
exists_in_target = cursor_target.fetchone()
# weather exists in target server database
if (self.f == "Y"):
if (is_reference != "Y"):
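# skipped for referenced types: a table type cannot be dropped while a stored procedure uses it
# TYPE_ID is used because OBJECT_ID does not resolve user-defined type names
sql_script = r'''
if TYPE_ID('{0}') is not null
drop type {0}
'''.format(self.format_object_name(tab_name))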
cursor_target.execute(sql_script)
conn_target.commit()
else:
if exists_in_target:
print("----------------------- warning message -----------------------")
print("the target table type " + tab_name + " exists ,skiped sync table type from source")
print("----------------------- warning message -----------------------")
print()
return
sql_script = r'''
DECLARE @SQL NVARCHAR(MAX) = ''
SELECT @SQL =
'CREATE TYPE ' + '{0}' + 'AS TABLE' + CHAR(13) + '(' + CHAR(13) +
STUFF((
SELECT CHAR(13) + ' , [' + c.name + '] ' +
CASE WHEN c.is_computed = 1
THEN 'AS ' + OBJECT_DEFINITION(c.[object_id], c.column_id)
ELSE
CASE WHEN c.system_type_id != c.user_type_id
THEN '[' + SCHEMA_NAME(tp.[schema_id]) + '].[' + tp.name + ']'
ELSE '[' + UPPER(y.name) + ']'
END +
CASE
WHEN y.name IN ('varchar', 'char', 'varbinary', 'binary')
THEN '(' + CASE WHEN c.max_length = -1
THEN 'MAX'
ELSE CAST(c.max_length AS VARCHAR(5))
END + ')'
WHEN y.name IN ('nvarchar', 'nchar')
THEN '(' + CASE WHEN c.max_length = -1
THEN 'MAX'
ELSE CAST(c.max_length / 2 AS VARCHAR(5))
END + ')'
WHEN y.name IN ('datetime2', 'time', 'datetimeoffset')
THEN '(' + CAST(c.scale AS VARCHAR(5)) + ')'
WHEN y.name = 'decimal'
THEN '(' + CAST(c.[precision] AS VARCHAR(5)) + ',' + CAST(c.scale AS VARCHAR(5)) + ')'
ELSE ''
END +
CASE WHEN c.collation_name IS NOT NULL AND c.system_type_id = c.user_type_id
THEN ' COLLATE ' + c.collation_name
ELSE ''
END +
CASE WHEN c.is_nullable = 1
THEN ' NULL'
ELSE ' NOT NULL'
END +
CASE WHEN c.default_object_id != 0
THEN ' CONSTRAINT [' + OBJECT_NAME(c.default_object_id) + ']' +
' DEFAULT ' + OBJECT_DEFINITION(c.default_object_id)
ELSE ''
END
END
From sys.table_types tp
Inner join sys.columns c on c.object_id = tp.type_table_object_id
Inner join sys.types y ON y.system_type_id = c.system_type_id
WHERE tp.is_user_defined = 1 and y.name<>'sysname'
and concat(QUOTENAME(schema_name(tp.schema_id)),'.',QUOTENAME(tp.name)) = '{0}'
ORDER BY c.column_id
FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 7, ' ')
+ ');'
select @SQL as script
'''.format(self.format_object_name(self.format_object_name((tab_name))))
cursor_target = conn_target.cursor()
cursor_source.execute(sql_script)
row = cursor_source.fetchone()
try:
if not exists_in_target:
# execute the script on target server
cursor_target.execute(str(row[0])) # drop current stored_procudre if exists
conn_target.commit()
print("*************table type " + self.format_object_name(tab_name) + " synced *********************")
print() # give a blank row when finish
except:
print("----------------------- error message -----------------------")
print("-----------table type " + self.format_object_name(tab_name) + " synced error ---------------")
print("----------------------- error message -----------------------")
print()
# raise
cursor_source.close()
conn_source.close()
cursor_target.close()
conn_target.close()
# schema sync
def sync_schema(self):
conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)
conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)
cursor_source = conn_source.cursor()
cursor_target = conn_target.cursor()
arr_schema = []
# get all table in database when not define table name
schema_result = cursor_source.execute(r'''
select name from sys.schemas where schema_id>4 and schema_id<16384
''')
for row in cursor_source.fetchall():
cursor_target.execute(r''' if not exists(select * from sys.schemas where name = '{0}')
begin
exec('create schema [{0}]')
end
'''.format(str(row[0])))
conn_target.commit()
cursor_source.close()
conn_source.close()
cursor_target.close()
conn_target.close()
def sync_table_schema_byname(self, tab_name, is_reference):
conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)
conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)
cursor_source = conn_source.cursor()
cursor_target = conn_target.cursor()
if (self.exits_object(conn_source, self.format_object_name(tab_name)) == 0):
print("----------------------- warning message -----------------------")
print("---------------warning: object " + tab_name + " not existing in source database ----------------")
print("----------------------- warning message -----------------------")
print()
return
# if exists a reference table for sp,not sync the table agagin
if (self.exits_object(conn_target, self.format_object_name(tab_name)) > 0):
if (self.f != "Y"):
print("----------------------- warning message -----------------------")
print("---------------warning: object " + tab_name + " existing in target database ----------------")
print("----------------------- warning message -----------------------")
print()
return
sql_script = r''' select top 1 1 from sys.tables
where type_desc = 'USER_TABLE'
and concat(QUOTENAME(schema_name(schema_id)),'.',QUOTENAME(name)) = '{0}'
'''.format((self.format_object_name(tab_name)))
# if the table schema exists in target server,skip
cursor_target.execute(sql_script)
exists_in_target = cursor_target.fetchone()
if exists_in_target:
if (self.f == "Y"):
if (is_reference != "Y"):
cursor_target.execute("drop table {0}".format(tab_name))
else:
print("----------------------- warning message -----------------------")
print("the target table " + tab_name + " exists ,skiped sync table schema from source")
print("----------------------- warning message -----------------------")
print()
return
sql_script = r''' DECLARE
@object_name SYSNAME
, @object_id INT
SELECT
@object_name = '[' + s.name + '].[' + o.name + ']'
, @object_id = o.[object_id]
FROM sys.objects o WITH (NOWAIT)
JOIN sys.schemas s WITH (NOWAIT) ON o.[schema_id] = s.[schema_id]
WHERE QUOTENAME(s.name) + '.' + QUOTENAME(o.name) = '{0}'
AND o.[type] = 'U'
AND o.is_ms_shipped = 0
DECLARE @SQL NVARCHAR(MAX) = ''
;WITH index_column AS
(
SELECT
ic.[object_id]
, ic.index_id
, ic.is_descending_key
, ic.is_included_column
, c.name
FROM sys.index_columns ic WITH (NOWAIT)
JOIN sys.columns c WITH (NOWAIT) ON ic.[object_id] = c.[object_id] AND ic.column_id = c.column_id
WHERE ic.[object_id] = @object_id
),
fk_columns AS
(
SELECT
k.constraint_object_id
, cname = c.name
, rcname = rc.name
FROM sys.foreign_key_columns k WITH (NOWAIT)
JOIN sys.columns rc WITH (NOWAIT) ON rc.[object_id] = k.referenced_object_id AND rc.column_id = k.referenced_column_id
JOIN sys.columns c WITH (NOWAIT) ON c.[object_id] = k.parent_object_id AND c.column_id = k.parent_column_id
WHERE k.parent_object_id = @object_id
)
SELECT @SQL = 'CREATE TABLE ' + @object_name + '' + '(' + '' + STUFF((
SELECT '' + ', [' + c.name + '] ' +
CASE WHEN c.is_computed = 1
THEN 'AS ' + cc.[definition]
ELSE UPPER(tp.name) +
CASE WHEN tp.name IN ('varchar', 'char', 'varbinary', 'binary', 'text')
THEN '(' + CASE WHEN c.max_length = -1 THEN 'MAX' ELSE CAST(c.max_length AS VARCHAR(5)) END + ')'
WHEN tp.name IN ('nvarchar', 'nchar')
THEN '(' + CASE WHEN c.max_length = -1 THEN 'MAX' ELSE CAST(c.max_length / 2 AS VARCHAR(5)) END + ')'
WHEN tp.name IN ('datetime2', 'time', 'datetimeoffset')
THEN '(' + CAST(c.scale AS VARCHAR(5)) + ')'
WHEN tp.name = 'decimal'
THEN '(' + CAST(c.[precision] AS VARCHAR(5)) + ',' + CAST(c.scale AS VARCHAR(5)) + ')'
ELSE ''
END +
CASE WHEN c.collation_name IS NOT NULL THEN ' COLLATE ' + c.collation_name ELSE '' END +
CASE WHEN c.is_nullable = 1 THEN ' NULL' ELSE ' NOT NULL' END +
CASE WHEN dc.[definition] IS NOT NULL THEN ' DEFAULT' + dc.[definition] ELSE '' END +
CASE WHEN ic.is_identity = 1 THEN ' IDENTITY(' + CAST(ISNULL( /*ic.seed_value*/ 1, '0') AS CHAR(1)) + ',' + CAST(ISNULL(ic.increment_value, '1') AS CHAR(1)) + ')' ELSE '' END
END + ''
FROM sys.columns c WITH (NOWAIT)
JOIN sys.types tp WITH (NOWAIT) ON c.user_type_id = tp.user_type_id
LEFT JOIN sys.computed_columns cc WITH (NOWAIT) ON c.[object_id] = cc.[object_id] AND c.column_id = cc.column_id
LEFT JOIN sys.default_constraints dc WITH (NOWAIT) ON c.default_object_id != 0 AND c.[object_id] = dc.parent_object_id AND c.column_id = dc.parent_column_id
LEFT JOIN sys.identity_columns ic WITH (NOWAIT) ON c.is_identity = 1 AND c.[object_id] = ic.[object_id] AND c.column_id = ic.column_id
WHERE c.[object_id] = @object_id
ORDER BY c.column_id
FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, '' + ' ')
+ ISNULL((SELECT '' + ', CONSTRAINT [' + k.name + '] PRIMARY KEY (' +
(SELECT STUFF((
SELECT ', [' + c.name + '] ' + CASE WHEN ic.is_descending_key = 1 THEN 'DESC' ELSE 'ASC' END
FROM sys.index_columns ic WITH (NOWAIT)
JOIN sys.columns c WITH (NOWAIT) ON c.[object_id] = ic.[object_id] AND c.column_id = ic.column_id
WHERE ic.is_included_column = 0
AND ic.[object_id] = k.parent_object_id
AND ic.index_id = k.unique_index_id
FOR XML PATH(N''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, ''))
+ ')' + ''
FROM sys.key_constraints k WITH (NOWAIT)
WHERE k.parent_object_id = @object_id
AND k.[type] = 'PK'), '') + ')' + ''
+ ISNULL((SELECT (
SELECT '' +
'ALTER TABLE ' + @object_name + ' WITH'
+ CASE WHEN fk.is_not_trusted = 1
THEN ' NOCHECK'
ELSE ' CHECK'
END +
' ADD CONSTRAINT [' + fk.name + '] FOREIGN KEY('
+ STUFF((
SELECT ', [' + k.cname + ']'
FROM fk_columns k
WHERE k.constraint_object_id = fk.[object_id] and 1=2
FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, '')
+ ')' +
' REFERENCES [' + SCHEMA_NAME(ro.[schema_id]) + '].[' + ro.name + '] ('
+ STUFF((
SELECT ', [' + k.rcname + ']'
FROM fk_columns k
WHERE k.constraint_object_id = fk.[object_id]
FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, '')
+ ')'
+ CASE
WHEN fk.delete_referential_action = 1 THEN ' ON DELETE CASCADE'
WHEN fk.delete_referential_action = 2 THEN ' ON DELETE SET NULL'
WHEN fk.delete_referential_action = 3 THEN ' ON DELETE SET DEFAULT'
ELSE ''
END
+ CASE
WHEN fk.update_referential_action = 1 THEN ' ON UPDATE CASCADE'
WHEN fk.update_referential_action = 2 THEN ' ON UPDATE SET NULL'
WHEN fk.update_referential_action = 3 THEN ' ON UPDATE SET DEFAULT'
ELSE ''
END
+ '' + 'ALTER TABLE ' + @object_name + ' CHECK CONSTRAINT [' + fk.name + ']' + ''
FROM sys.foreign_keys fk WITH (NOWAIT)
JOIN sys.objects ro WITH (NOWAIT) ON ro.[object_id] = fk.referenced_object_id
WHERE fk.parent_object_id = @object_id
FOR XML PATH(N''), TYPE).value('.', 'NVARCHAR(MAX)')), '')
+ ISNULL(((SELECT
'' + 'CREATE' + CASE WHEN i.is_unique = 1 THEN ' UNIQUE' ELSE '' END
+ ' NONCLUSTERED INDEX [' + i.name + '] ON ' + @object_name + ' (' +
STUFF((
SELECT ', [' + c.name + ']' + CASE WHEN c.is_descending_key = 1 THEN ' DESC' ELSE ' ASC' END
FROM index_column c
WHERE c.is_included_column = 0
AND c.index_id = i.index_id
FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, '') + ')'
+ ISNULL('' + 'INCLUDE (' +
STUFF((
SELECT ', [' + c.name + ']'
FROM index_column c
WHERE c.is_included_column = 1
AND c.index_id = i.index_id
FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, '') + ')', '') + ''
FROM sys.indexes i WITH (NOWAIT)
WHERE i.[object_id] = @object_id
AND i.is_primary_key = 0
AND i.[type] = 2
FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)')
), '')
select @SQL as script '''.format(self.format_object_name(tab_name))
cursor_target = conn_target.cursor()
cursor_source.execute(sql_script)
row = cursor_source.fetchone()
if not row[0]:
return
try:
cursor_target.execute(row[0]) # drop current table schema if exists
conn_target.commit()
print("*************schema " + self.format_object_name(tab_name) + " synced *************")
print() # give a blank row when finish
except:
print("----------------------- warning message -----------------------")
print("-----------schema " + self.format_object_name(tab_name) + " synced failed---------------")
print("----------------------- warning message -----------------------")
print()
cursor_source.close()
conn_source.close()
cursor_target.close()
conn_target.close()
    def get_table_column(self, conn, tab_name):
        cursor_source = conn.cursor()
        # quoted, non-computed columns in table order; the caller owns and closes conn
        sql_script = r'''select QUOTENAME(name) from sys.columns
                        where object_id = object_id('{0}') and is_computed=0 order by column_id
                    '''.format(self.format_object_name(tab_name))
        cursor_source.execute(sql_script)
        column_names = ",".join(row[0] for row in cursor_source.fetchall())
        cursor_source.close()
        return column_names
def sync_table_schema(self):
#default not sync by referenced other object
is_reference = "N"
conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)
conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)
cursor_source = conn_source.cursor()
cursor_target = conn_target.cursor()
arr_table = []
if (self.s_obj):
for tab_name in self.s_obj.split(","):
if (tab_name) and (self.exits_object(conn_source, tab_name)>0):
self.sync_table_schema_byname(tab_name, is_reference)
else:
print("----------------------- warning message -----------------------")
print("-----------schema " + self.format_object_name(tab_name) + " not existing in source database---------------")
print("----------------------- warning message -----------------------")
print()
else:
# sync all tables
# get all table in database when not define table name
sql_script = ''' SELECT QUOTENAME(s.name)+'.'+ QUOTENAME(o.name)
FROM sys.objects o WITH (NOWAIT)
JOIN sys.schemas s WITH (NOWAIT) ON o.[schema_id] = s.[schema_id]
WHERE o.[type] = 'U' AND o.is_ms_shipped = 0
'''
cursor_source.execute(sql_script)
for row in cursor_source.fetchall():
self.sync_table_schema_byname(str(row[0]), is_reference)
    # sync data from source table to target table
    def sync_table_data(self):
        conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)
        conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)
        cursor_source = conn_source.cursor()
        cursor_target = conn_target.cursor()
        arr_table = []
        if (self.s_obj):
            # build a new list rather than removing items from the list being iterated
            for tab_name in self.s_obj.split(','):
                if (self.exits_object(conn_target, self.format_object_name(tab_name)) == 0):
                    print("----------------- warning message -----------------------")
                    print("----------------- warning: table " + tab_name + " not existing in target database ---------------------")
                    print("----------------- warning message -----------------------")
                else:
                    arr_table.append(tab_name)
        else:
            # get all tables in the database when no table name is given
            cursor_source.execute(r''' SELECT QUOTENAME(s.name)+'.'+ QUOTENAME(o.name)
                                        FROM sys.objects o WITH (NOWAIT)
                                        JOIN sys.schemas s WITH (NOWAIT) ON o.[schema_id] = s.[schema_id]
                                        WHERE o.[type] = 'U'
                                        AND o.is_ms_shipped = 0
                                    ''')
            for row in cursor_source.fetchall():
                arr_table.append(str(row[0]))
        for tab_name in arr_table:
            if (self.f != "Y"):
                # skip the table when the target already holds data
                cursor_target.execute("select top 1 1 from {0} ".format(tab_name))
                if cursor_target.fetchone():
                    print("----------------------- warning message -----------------------")
                    print("the target table " + tab_name + " already contains data, skipped syncing data from source")
                    print("----------------------- warning message -----------------------")
                    print()
                    continue
            else:
                cursor_target.execute("truncate table {0} ".format(tab_name))
                conn_target.commit()
            insert_columns = self.get_table_column(conn_source, tab_name)
            insert_prefix = ""
            # whether the table has an identity column
            cursor_source.execute(r'''select 1 from sys.columns
                                    where object_id = OBJECT_ID('{0}') and is_identity =1
                                    '''.format(tab_name))
            exists_identity = cursor_source.fetchone()
            if (exists_identity):
                insert_prefix = "set identity_insert {0} on; ".format(tab_name)
            # data source
            values_sql = ""
            current_row = ""
            counter = 0
            cursor_source.execute(r''' select {0} from {1} '''.format(insert_columns, tab_name))
            insert_prefix = insert_prefix + "insert into {0} ({1}) values ".format(tab_name, insert_columns)
            for row in cursor_source.fetchall():
                counter = counter + 1
                for key in row:
                    if key is None:
                        current_row = current_row + r''' null, '''
                    elif (type(key) is datetime.datetime):
                        current_row = current_row + r''' '{0}', '''.format(str(key)[0:23])
                    elif (type(key) is str):
                        # pitfall: NUL characters embedded in strings break the statement,
                        # see https://blog.csdn.net/dadaowuque/article/details/81016127
                        current_row = current_row + r''' '{0}', '''.format(
                            key.replace("'", "''").replace('\x00', ''))
                    elif (type(key) is Decimal):
                        current_row = current_row + r''' '{0}', '''.format('{0:f}'.format(key))
                    elif (type(key) is bytes):
                        # binary literal; bytes.hex() keeps leading zero bytes
                        current_row = current_row + r''' 0x{0}, '''.format(key.hex())
                    else:
                        current_row = current_row + r''' '{0}', '''.format(key)
                current_row = current_row[0:len(current_row) - 2]  # remove the trailing ", "
                values_sql = values_sql + "(" + current_row + "),"
                current_row = ""
                # execute one batch every 1000 rows
                if (counter == 1000):
                    insert_sql = insert_prefix + values_sql[0:len(values_sql) - 1]  # remove the trailing ","
                    if (exists_identity):
                        insert_sql = insert_sql + " ;set identity_insert {0} off;".format(tab_name)
                    try:
                        cursor_target.execute(insert_sql)
                    except:
                        print("----------------------error " + tab_name + " data synced failed-------------------------")
                        raise
                    conn_target.commit()
                    values_sql = ""
                    counter = 0
                    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                          + "*************** " + self.format_object_name(tab_name)
                          + " " + str(1000) + " rows synced *************")
            # execute the last, partially filled batch
            if (values_sql):
                insert_sql = insert_prefix + values_sql[0:len(values_sql) - 1]  # remove the trailing ","
                if (exists_identity):
                    insert_sql = insert_sql + " ; set identity_insert {0} off;".format(tab_name)
                try:
                    cursor_target.execute(insert_sql)
                except:
                    print("------------------error " + tab_name + " data synced failed------------------------")
                    raise
                conn_target.commit()
                print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                      + "*************** " + self.format_object_name(tab_name)
                      + " " + str(counter) + " rows synced *************")
            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                  + "----------------synced " + self.format_object_name(tab_name)
                  + " data finished---------------")
            print()
        cursor_source.close()
        conn_source.close()
        cursor_target.close()
        conn_target.close()
def sync_dependent_object(self, obj_name):
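# Forced overwrite does not apply to dependent objects. When an object is synced only because something depends on it, check first whether it exists in the target and skip it if so; this flag marks that case.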
is_refernece = "Y"
conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)
conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)
cursor_source = conn_source.cursor()
cursor_target = conn_target.cursor()
'''
find dependent objects
if exists dependent objects,sync Dependent objects objects in advance
'''
sql_check_dependent = r'''
SELECT * FROM
(
SELECT
distinct rtrim(lower(s.type)) COLLATE Chinese_PRC_CI_AS as obj_type,
QUOTENAME(d.referenced_schema_name)+'.'+QUOTENAME(d.referenced_entity_name) COLLATE Chinese_PRC_CI_AS as obj
FROM sys.dm_sql_referenced_entities('{0}','OBJECT') as d
inner join sys.sysobjects s on s.id = d.referenced_id
union all
SELECT
distinct rtrim(lower(d.referenced_class_desc)) COLLATE Chinese_PRC_CI_AS as obj_type,
QUOTENAME(d.referenced_schema_name)+'.'+QUOTENAME(d.referenced_entity_name) COLLATE Chinese_PRC_CI_AS as obj
FROM sys.dm_sql_referenced_entities('{0}','OBJECT') as d
inner join sys.types s on s.user_type_id = d.referenced_id
)t
'''.format(self.format_object_name(obj_name))
cursor_source.execute(sql_check_dependent)
result = cursor_source.fetchall()
for row in result:
if row[1]:
if (row[0] == "u"):
if (row[1]):
self.sync_table_schema_byname(row[1], is_refernece)
elif (row[0] == "fn" or row[0] == "if"):
if (row[1]):
self.sync_procudre_by_name("f", row[1], is_refernece)
elif (row[0] == "type"):
if (row[1]):
self.sync_table_variable(row[1], is_refernece)
def sync_procudre_by_name(self, type, obj_name, is_reference):
conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)
conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)
cursor_source = conn_source.cursor()
cursor_target = conn_target.cursor()
if (self.exits_object(conn_source, self.format_object_name(obj_name)) == 0):
print("---------------warning message----------------")
print("---------------warning: object " + obj_name + " not existing in source database ----------------")
print("---------------warning message----------------")
print()
return
if (self.exits_object(conn_target, self.format_object_name(obj_name)) > 0):
if (self.f != "Y"):
print("---------------warning message----------------")
print("---------------warning: object " + obj_name + " existing in target database ----------------")
print("---------------warning message----------------")
print()
return
'''
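The original plan was to generate the drop statement directly:
there is a nasty escaping problem here that I could not get right before lunch,
so after thinking it over in the afternoon I switched to a different approach instead of fighting the escaping.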
sql_script =
select
'if object_id('+''''+QUOTENAME(schema_name(uid))+ '' + QUOTENAME(name)+''''+') is not null '
+' drop proc '+QUOTENAME(schema_name(uid))+ '.' + QUOTENAME(name) ,
OBJECT_DEFINITION(id)
from sys.sysobjects where xtype = 'P' and uid not in (16,19)
'''
sql_script = r'''
select
QUOTENAME(schema_name(uid))+'.'+QUOTENAME(name),
OBJECT_DEFINITION(id)
from sys.sysobjects where xtype in ('P','IF','FN') and uid not in (16,19)
'''
if (obj_name):
sql_script = sql_script + " and QUOTENAME(schema_name(uid))+ '.' + QUOTENAME(name) ='{0}' ".format(
self.format_object_name(obj_name))
cursor_source.execute(sql_script)
row = cursor_source.fetchone()
try:
if type in ("f", "fn"):  # "fn" is what the command-line entry point passes for functions
sql_script = r'''
if object_id('{0}') is not null
drop function {0}
'''.format(self.format_object_name(row[0]))
elif type == "p":
sql_script = r'''
if object_id('{0}') is not null
drop proc {0}
'''.format(self.format_object_name(row[0]))
cursor_target.execute(sql_script) # drop current stored_procudre if exists
conn_target.commit()
# sync dependent object
if (is_reference != "N"):
self.sync_dependent_object(self.format_object_name(row[0]))
# sync object it self
cursor_target.execute(str(row[1])) # execute create stored_procudre script
conn_target.commit()
print("*************sync sp: " + self.format_object_name(row[0]) + " finished *****************")
print()
except:
print("---------------error message----------------")
print("------------------ sync " + row[0] + "sp error --------------------------")
print("---------------error message----------------")
print()
cursor_source.close()
conn_source.close()
cursor_target.close()
conn_target.close()
def sync_procudre(self, type):
is_reference = "N"
conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d)
conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d)
cursor_source = conn_source.cursor()
cursor_target = conn_target.cursor()
if (self.s_obj):
for proc_name in self.s_obj.split(","):
self.sync_dependent_object(proc_name)
self.sync_procudre_by_name(type, proc_name, is_reference)
# sync all sp and function
else:
sql_script = r'''
select
QUOTENAME(schema_name(uid))+'.'+QUOTENAME(name),
OBJECT_DEFINITION(id)
from sys.sysobjects where xtype = upper('{0}') and uid not in (16,19)
'''.format(type)
cursor_source.execute(sql_script)
for row in cursor_source.fetchall():
self.sync_dependent_object(row[0])
self.sync_procudre_by_name(type, row[0], is_reference)
if __name__ == "__main__":
'''
sync = SyncDatabaseObject(s_h="127.0.0.1",
s_i = "sql2017",
s_P = 49744,
s_d="DB01",
t_h="127.0.0.1",
t_i="sql2017",
t_P=49744,
t_d="DB02",
s_obj_type = "sp",
s_obj = "dbo.sp_test01",
f="Y")
sync.sync_procudre("p")
'''
p_s_h = ""
p_s_i = "MSSQL"
p_s_P = 1433
p_s_d = ""
p_s_u = None
p_s_p = None
p_s_obj = ""
p_type = ""
p_t_h = ""
p_t_i = "MSSQL"
p_t_P = "1433"
p_t_d = ""
p_t_u = None
p_t_p = None
# force conver target database object,default not force cover target database object
p_f = "N"
# sync obj type table|sp
p_obj_type = None
# sync whick database object
p_obj = None
if len(sys.argv) == 1:
print(usage)
sys.exit(1)
elif sys.argv[1] == '--help':
print(usage)
sys.exit()
elif len(sys.argv) >= 2:
for i in sys.argv[1:]:
_argv = i.partition('=')[::2]  # (name, value); value is "" when no "=" is given, and may itself contain "="
# source server name
if _argv[0] == '-s_h':
p_s_h = _argv[1]
# source server instance name
if _argv[0] == '-s_i':
if (_argv[1]):
p_s_i = _argv[1]
# source server instance PORT
if _argv[0] == '-s_P':
if (_argv[1]):
p_s_P = _argv[1]
# source database name
if _argv[0] == '-s_d':
p_s_d = _argv[1]
if _argv[0] == '-s_u':
p_s_u = _argv[1]
if _argv[0] == '-s_p':
p_s_p = _argv[1]
if _argv[0] == '-t_h':
p_t_h = _argv[1]
if _argv[0] == '-t_i':
if (_argv[1]):
p_t_i = _argv[1]
if _argv[0] == '-t_P':
if (_argv[1]):
p_t_P = _argv[1]
if _argv[0] == '-t_d':
p_t_d = _argv[1]
if _argv[0] == '-t_u':
p_t_u = _argv[1]
if _argv[0] == '-t_p':
p_t_p = _argv[1]
if _argv[0] == '-f':
if (_argv[1]):
p_f = _argv[1]
# object type
if _argv[0] == '-obj_type':
if not (_argv[1]):
print("-obj_type can not be null (-obj_type=tab|-obj_type=sp|-obj_type=fn|-obj_type=tp)")
sys.exit(1)
else:
p_obj_type = _argv[1]
# object name
if _argv[0] == '-obj':
if (_argv[1]):
p_obj = _argv[1]
# require para
if p_s_h.strip() == "":
print("source server host cannot be null")
exit(0)
if p_s_d.strip() == "":
print("source server host database name cannot be null")
exit(0)
if p_t_h.strip() == "":
print("target server host cannot be null")
exit(0)
if p_t_d.strip() == "":
print("target server host database name cannot be null")
exit(0)
sync = SyncDatabaseObject(s_h=p_s_h,
s_i=p_s_i,
s_P=p_s_P,
s_d=p_s_d,
s_u=p_s_u,
s_p=p_s_p,
s_obj=p_obj,
t_h=p_t_h,
t_i=p_t_i,
t_P=p_t_P,
t_d=p_t_d,
t_u=p_t_u,
t_p=p_t_p,
f=p_f)
# argument order must match validated_connect(_h, _i, _P, _u, _p, _d)
sync.validated_connect(p_s_h, p_s_i, p_s_P, p_s_u, p_s_p, p_s_d)
sync.validated_connect(p_t_h, p_t_i, p_t_P, p_t_u, p_t_p, p_t_d)
if (p_f.upper() == "Y"):
confirm = input("confirm you want to overwrite the target object? ")
if confirm.upper() != "Y":
exit(0)
print("-------------------------- sync begin ----------------------------------")
print()
if (p_obj_type == "tab"):
# sync schema
sync.sync_schema()
# sync table schema
sync.sync_table_schema()
# sync data
sync.sync_table_data()
elif (p_obj_type == "sp"):
# sync schema
sync.sync_schema()
# sync sp
sync.sync_procudre("p")
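    elif (p_obj_type == "fn"):
        # sync schema
        sync.sync_schema()
        # sync functions
        sync.sync_procudre("fn")
    elif (p_obj_type == "tp"):
        # sync schema
        sync.sync_schema()
        # sync each table type named in -obj (sync_table_variable needs a name)
        for tp_name in (p_obj or "").split(","):
            if tp_name:
                sync.sync_table_variable(tp_name, "N")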
else:
print("-obj_type is not validated")
print()
print("-------------------------- sync finish ----------------------------------")