Have you ever wanted something which will reliably keep you informed as to the number or Inserts, Updates or Deletes against table in your system? Maybe to keep an eye on the volume of modifications happening in a replicated environment?
If you put some effort in then this is something that can be done via the Transaction Log.
One thing I would note though, is that reading the transaction log can be heavy on a system and slow down things such as log backups so I wouldn’t recommend this as a permanent tracker by any means.
However, if you want to check on your system over a specific period, or you have performance to spare and simply want to profile for a few days to get a baseline or spot patterns… then this could help you out.
Effectively this builds on my last post in which I showed how to see Inserts, Updates, and Deletes using the transaction log of your database. However, what I’ve done now is to extend that a little in order to track these over time whilst doing my best to avoid any real impact to the server.
This will allow you to profile replication volumes, spot loading or update patterns, or simply track down rogue data changes you didn’t even know were happening.
The easiest way I can think to present this is simply by offering the code with a good deal of annotations rather than trying to explain what’s going on and then providing the code separately…
Note that this provides the framework… you would have to place this into a scheduled task in order to profile data changes over time…
/*
I'm using a test database called testDB... use your own database as required.
But note that I use "testDB.dbo." further in the script so you'll need a find
and replace.
*/
use testDB
go
/*
Create a LogTrack table.
The purpose of this is to log the max LSN we encounter when reading the log.
This means that subsequent reads of the log can be massively restricted so that
we don't duplicate data and also so that we read a small portion of the log,
therefore reducing the impact of fn_dbLog
*/
if object_id('dbo.transactionLogTrack') is null
begin
create table dbo.transactionLogTrack
(
databaseName varchar(100),
lsn varchar(250),
endDate datetime
)
end
/*
This is the main reporting table which collates the transaction activity
in the database and allows for reporting.
*/
if object_id('dbo.transactionActivity') is null
begin
create table dbo.transactionActivity
(
databaseName varchar(50),
tableName varchar(100),
hourFrom smalldatetime,
hourTo smalldatetime,
-- heap or clustered index changes logged separately
tableInserted int,
tableUpdated int,
tableDeleted int,
-- changes to nonclustered indexes
indexInserted int,
indexUpdated int,
indexDeleted int,
-- total changes by type
rowsInserted int,
rowsUpdated int,
rowsDeleted int,
-- total changes to the table
totalChanges int,
-- is the table in replication or not
isReplicated bit
)
end
-- This is the database I want to track
use AdventureWorks2012
go
declare @lsnStart varchar(250)
/*
Look for the last LSN read for this database
This limits the impact on the transaction log
*/
select @lsnStart = lsn
from testDB.dbo.transactionLogTrack
where databaseName = db_name()
/*
First time this is run we need to insert dummy values
so that the whole transaction log is read this time
*/
if @lsnStart is null
begin
insert into testDB.dbo.transactionLogTrack
select db_name(), '0', '2000-01-01'
end
if @lsnStart = '0'
select @lsnStart = null
/*
Collect transaction log data for the specific items we're looking for,
only going as far back as the last read LSN to reduce load
*/
declare @partialLog table
(
lsn varchar(50),
tranID varchar(50),
endTime smalldatetime,
allocUnitID bigint,
operation varchar(250),
context varchar(250)
)
insert into @partialLog
select [Current LSN], [transaction ID] tranID, [end time] endTime, AllocUnitId, operation, Context
from ::fn_dbLog(@lsnStart, null) -- here we speficy how far back in the log we want to look
where (operation in ('LOP_INSERT_ROWS', 'LOP_MODIFY_ROW', 'LOP_DELETE_ROWS')
and context not in ('LCX_PFS', 'LCX_IAM'))
or operation = 'LOP_COMMIT_XACT'
/*
Collate the records into a temp table
*/
declare @logRecords table
(
databaseName varchar(100),
tableName varchar(250),
hourFrom smalldatetime,
hourTo as dateadd(hh, 1, hourFrom),
tableInserted int,
tableUpdated int,
tableDeleted int,
indexInserted int,
indexUpdated int,
indexDeleted int,
-- no point doing totals separately so we're using calculated columns
totalInserted as tableInserted + indexInserted,
totalUpdated as tableUpdated + indexUpdated,
totalDeleted as tableDeleted + indexDeleted,
totalChanges as tableInserted + indexInserted + tableUpdated + indexUpdated + tableDeleted + indexDeleted,
isReplicated bit default(0)
)
;
with splitResult as
(
select db_name() databaseName, dateadd(hh, datepart(hh, x.endTime), convert(varchar, left(x.endTime, 11))) endTime, o.name objectName,
-- if index_id is 0 or 1 then it's the table itself (Heap or clustered)
case when i.index_id in (0, 1) and operation = 'LOP_INSERT_ROWS' then 1 else 0 end tableInsert,
case when i.index_id in (0, 1) and operation = 'LOP_MODIFY_ROW' then 1 else 0 end tableUpdate,
case when ii.index_id in (0, 1) and operation = 'LOP_DELETE_ROWS' then 1 else 0 end tableDelete,
-- an index_id > 1 is a nonclustered index
case when i.index_id > 1 and operation = 'LOP_INSERT_ROWS' then 1 else 0 end indexInsert,
case when i.index_id > 1 and operation = 'LOP_MODIFY_ROW' then 1 else 0 end indexUpdate,
case when i.index_id > 1 and operation = 'LOP_DELETE_ROWS' then 1 else 0 end indexDelete
from @partialLog t
join sys.system_internals_allocation_units siau
on t.AllocUnitId = siau.allocation_unit_id
join sys.partitions p
on siau.container_id = p.partition_id
join sys.indexes i
on p.index_id = i.index_id
and p.object_id = i.object_id
join sys.objects o
on p.object_id = o.object_id
-- cross apply to get the transaction end date applied to all records
cross apply
(
select endTime
from @partialLog
where tranID = t.tranID
and endTime is not null
) x
where o.is_ms_shipped = 0
and t.endTime is null
), addTotals as
(
-- collate into totals
select databaseName, endTime hourFrom, objectName,
sum(tableInsert) tableInserts, sum(tableUpdate) tableUpdates, sum(tableDelete) tableDeletes,
sum(indexInsert) indexInserts, sum(indexUpdate) indexUpdates, sum(indexDelete) indexDeletes
from splitResult
group by databaseName, endTime, objectName
)
insert into @logRecords(databaseName, tableName, hourFrom, tableInserted, tableDeleted, tableUpdated, indexInserted, indexUpdated, indexDeleted)
select databaseName, objectName, hourFrom, tableInserts, tableDeletes, tableUpdates, indexInserts, indexUpdates, indexDeletes
from addTotals
declare @endTime smalldatetime, @lsn varchar(50)
select @lsn = max(lsn), @endTime = max(endTime)
from @partialLog
-- format the LSN ready to insert into the LogTrack table
select @lsn = cast(cast(convert(varbinary, left(@lsn, 8), 2) as int) as varchar) +
right('0000000000' + cast(cast(convert(varbinary, substring(@lsn, 10, 8), 2) as int) as varchar), 10) +
right('00000' + cast(cast(convert(varbinary, right(@lsn, 4), 2) as int) as varchar), 5)
-- insert our start LSN for next run into the log track table
update testDB.dbo.transactionLogTrack
set lsn = isnull(@lsn, 0), endDate = isnull(@endTime, '2001-01-01')
where databaseName = db_name()
-- check to see if this database is replicated by checking for sysarticles
if exists
(
select *
from sys.objects
where name = 'sysarticles'
)
begin
-- if replicated then flag the relevant tables
update n
set isReplicated = 1
from @logRecords n
join sysarticles a
on n.tableName = a.name
and n.hourFrom >=
(
select min(hourFrom)
from @logRecords
)
where isReplicated = 0
end
;
-- merge the overall results into our tracking table
merge testDB.dbo.transactionActivity t
using @logRecords l
on
(
t.databaseName = l.databaseName
and t.tableName = l.tableName
and t.hourFrom = l.hourFrom
)
when matched then
update
set t.tableInserted = t.tableInserted + l.tableInserted,
t.tableUpdated = t.tableUpdated + l.tableUpdated,
t.tableDeleted = t.tableDeleted + l.tableDeleted,
t.indexInserted = t.indexInserted + l.indexInserted,
t.indexUpdated = t.indexUpdated + l.indexUpdated,
t.indexDeleted = t.indexDeleted + l.indexDeleted,
t.isReplicated = l.isReplicated
when not matched then
insert (databaseName, tableName, hourFrom, hourTo, tableInserted, tableUpdated, tableDeleted, indexInserted, indexUpdated, indexDeleted, rowsInserted, rowsUpdated, rowsDeleted, totalChanges, isReplicated)
values (databaseName, tableName, hourFrom, hourTo, tableInserted, tableUpdated, tableDeleted, indexInserted, indexUpdated, indexDeleted, totalInserted, totalUpdated, totalDeleted, totalChanges, isReplicated)
;
If, for any reason, you want to place this in a job and therefore need it to run across all databases, then that can be found below…
use testDB
go
if object_id('dbo.transactionLogTrack') is null
begin
create table dbo.transactionLogTrack
(
databaseName varchar(100),
lsn varchar(250),
endDate datetime
)
end
if object_id('dbo.transactionActivity') is null
begin
create table dbo.transactionActivity
(
databaseName varchar(50),
tableName varchar(100),
hourFrom smalldatetime,
hourTo smalldatetime,
tableInserted int,
tableUpdated int,
tableDeleted int,
indexInserted int,
indexUpdated int,
indexDeleted int,
rowsInserted int,
rowsUpdated int,
rowsDeleted int,
totalChanges int,
isReplicated bit
)
end
declare @databaseList table
(
id tinyint identity(1, 1) not null,
databaseName varchar(50) not null
)
insert into @databaseList
select name
from sys.databases
where name not in ('master', 'msdb', 'model', 'tempDB', 'distribution', 'ReportServer', 'ReportServerTempDB')
and is_read_only = 0
-- note this requires databases to be in the server collation.
and collation_name = serverproperty('Collation')
-- add other criteria as appropriate
declare @counter tinyint = 1, @sql nvarchar(max)
set nocount on
while @counter <= (select max(id) from @databaseList)
begin
select @sql = 'use ' + databaseName + '
declare @lsnStart varchar(250)
select @lsnStart = lsn
from testDB.dbo.transactionLogTrack
where databaseName = db_name()
if @lsnStart is null
begin
insert into testDB.dbo.transactionLogTrack
select db_name(), ''0'', ''2000-01-01''
end
if @lsnStart = ''0''
select @lsnStart = null
declare @partialLog table
(
lsn varchar(50),
tranID varchar(50),
endTime smalldatetime,
allocUnitID bigint,
operation varchar(250),
context varchar(250)
)
insert into @partialLog
select [Current LSN], [transaction ID] tranID, [end time] endTime, AllocUnitId, operation, Context
from ::fn_dbLog(null, null)
where (operation in (''LOP_INSERT_ROWS'', ''LOP_MODIFY_ROW'', ''LOP_DELETE_ROWS'')
and context not in (''LCX_PFS'', ''LCX_IAM''))
or operation = ''LOP_COMMIT_XACT''
declare @logRecords table
(
databaseName varchar(100),
tableName varchar(250),
hourFrom smalldatetime,
hourTo as dateadd(hh, 1, hourFrom),
tableInserted int,
tableUpdated int,
tableDeleted int,
indexInserted int,
indexUpdated int,
indexDeleted int,
totalInserted as tableInserted + indexInserted,
totalUpdated as tableUpdated + indexUpdated,
totalDeleted as tableDeleted + indexDeleted,
totalChanges as tableInserted + indexInserted + tableUpdated + indexUpdated + tableDeleted + indexDeleted,
isReplicated bit default(0)
)
;
with splitResult as
(
select db_name() databaseName, dateadd(hh, datepart(hh, x.endTime), convert(varchar, left(x.endTime, 11))) endTime, o.name objectName,
case when i.index_id in (0, 1) and operation = ''LOP_INSERT_ROWS'' then 1 else 0 end tableInsert,
case when i.index_id in (0, 1) and operation = ''LOP_MODIFY_ROW'' then 1 else 0 end tableUpdate,
case when i.index_id in (0, 1) and operation = ''LOP_DELETE_ROWS'' then 1 else 0 end tableDelete,
case when i.index_id > 1 and operation = ''LOP_INSERT_ROWS'' then 1 else 0 end indexInsert,
case when i.index_id > 1 and operation = ''LOP_MODIFY_ROW'' then 1 else 0 end indexUpdate,
case when i.index_id > 1 and operation = ''LOP_DELETE_ROWS'' then 1 else 0 end indexDelete
from @partialLog t
join sys.system_internals_allocation_units siau
on t.AllocUnitId = siau.allocation_unit_id
join sys.partitions p
on siau.container_id = p.partition_id
join sys.indexes i
on p.index_id = i.index_id
and p.object_id = i.object_id
join sys.objects o
on p.object_id = o.object_id
cross apply
(
select endTime
from @partialLog
where tranID = t.tranID
and endTime is not null
) x
where o.is_ms_shipped = 0
and t.endTime is null
), addTotals as
(
select databaseName, end'dTime hourFrom, objectName,
sum(tableInsert) tableInserts, sum(tableUpdate) tableUpdates, sum(tableDelete) tableDeletes,
sum(indexInsert) indexInserts, sum(indexUpdate) indexUpdates, sum(indexDelete) indexDeletes
from splitResult
group by databaseName, endTime, objectName
)
insert into @logRecords(databaseName, tableName, hourFrom, tableInserted, tableDeleted, tableUpdated, indexInserted, indexUpdated, indexDeleted)
select databaseName, objectName, hourFrom, tableInserts, tableDeletes, tableUpdates, indexInserts, indexUpdates, indexDeletes
from addTotals
declare @endTime smalldatetime, @lsn varchar(50)
select @lsn = max(lsn), @endTime = max(endTime)
from @partialLog
-- format the LSN ready to insert into the LogTrack table
select @lsn = cast(cast(convert(varbinary, left(@lsn, 8), 2) as int) as varchar) +
right(''0000000000'' + cast(cast(convert(varbinary, substring(@lsn, 10, 8), 2) as int) as varchar), 10) +
right(''00000'' + cast(cast(convert(varbinary, right(@lsn, 4), 2) as int) as varchar), 5)
update testDB.dbo.transactionLogTrack
set lsn = isnull(@lsn, 0), endDate = isnull(@endTime, ''2001-01-01'')
where databaseName = db_name()
if exists
(
select *
from sys.objects
where name = ''sysarticles''
)
begin
update n
set isReplicated = 1
from @logRecords n
join sysarticles a
on n.tableName = a.name
and n.hourFrom >=
(
select min(hourFrom)
from @logRecords
)
where isReplicated = 0
end
;
merge testDB.dbo.transactionActivity t
using @logRecords l
on
(
t.databaseName = l.databaseName
and t.tableName = l.tableName
and t.hourFrom = l.hourFrom
)
when matched then
update
set t.tableInserted = t.tableInserted + l.tableInserted,
t.tableUpdated = t.tableUpdated + l.tableUpdated,
t.tableDeleted = t.tableDeleted + l.tableDeleted,
t.indexInserted = t.indexInserted + l.indexInserted,
t.indexUpdated = t.indexUpdated + l.indexUpdated,
t.indexDeleted = t.indexDeleted + l.indexDeleted,
t.isReplicated = l.isReplicated
when not matched then
insert (databaseName, tableName, hourFrom, hourTo, tableInserted, tableUpdated, tableDeleted, indexInserted, indexUpdated, indexDeleted, rowsInserted, rowsUpdated, rowsDeleted, totalChanges, isReplicated)
values (databaseName, tableName, hourFrom, hourTo, tableInserted, tableUpdated, tableDeleted, indexInserted, indexUpdated, indexDeleted, totalInserted, totalUpdated, totalDeleted, totalChanges, isReplicated)
;'
from @databaseList
where id = @counter
exec sp_executeSQL @sql
select @counter += 1
end'