Friday, October 29, 2010

5 Steps to Database Synchronization Part 1

This article is the first in a three-part series that demonstrates, step by step, how to synchronize a client database (SQL Server 2008 Express) with a server database (SQL Server 2008) using Microsoft Synchronization Services through an N-tier architecture supported by WCF.
Let’s suppose we have an application used by insurance agents to sell insurance policies in the field. The application can work offline and needs to synchronize its data with the main server database. For the sake of simplicity, we will synchronize only one table (Policies).
Insurance agents should not be able to access every policy, only the ones they sold themselves, so we need to filter the data by insurance agent ID.

Step 1: Enable change tracking on our server and client databases.
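Change tracking is one of the new features in SQL Server 2008. Change information is retained for at least the period specified by CHANGE_RETENTION (2 days in the script below), and AUTO_CLEANUP = ON removes older change information automatically. The script also turns on snapshot isolation, which lets changes be read in a consistent state while other transactions keep modifying the data.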
ALTER DATABASE Insurance SET ALLOW_SNAPSHOT_ISOLATION ON;
ALTER DATABASE Insurance SET CHANGE_TRACKING = ON
    (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
Each table to be synchronized must have a primary key (GUIDs are recommended). Primary key values must be unique across all nodes and must not be reused. Run the following script for each table that should take part in synchronization:
ALTER TABLE Insurance.dbo.Policies
ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = OFF)
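To confirm that both statements took effect, the catalog views sys.change_tracking_databases and sys.change_tracking_tables can be queried. The following is a minimal sketch, assuming it is run in the context of the Insurance database; the column aliases are only illustrative.

```sql
USE Insurance;

-- Databases that have change tracking enabled, with their retention settings.
SELECT DB_NAME(database_id) AS database_name,
       is_auto_cleanup_on,
       retention_period,
       retention_period_units_desc
FROM sys.change_tracking_databases;

-- Tables in the current database that have change tracking enabled.
SELECT OBJECT_SCHEMA_NAME(object_id) AS schema_name,
       OBJECT_NAME(object_id) AS table_name,
       is_track_columns_updated_on,
       min_valid_version
FROM sys.change_tracking_tables;
```

If the Policies table does not appear in the second result set, the ALTER TABLE statement has not been applied to it yet.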

Step 2: Create the Server Synchronization Provider
The Server Synchronization Provider stores information about the tables that are enabled for synchronization and enables the application to retrieve changes that occurred in the server database since the last synchronization. It is also responsible for applying changes to the server database and detecting any conflicts.
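To make "changes since the last synchronization" more concrete, the change tracking functions can be queried directly. This is only an illustrative sketch, not part of the provider itself: @last_sync_version is a hypothetical variable standing in for the anchor that Synchronization Services stores on the client.

```sql
-- Hypothetical anchor: the change tracking version seen at the previous synchronization.
DECLARE @last_sync_version bigint = 0;

-- One row per changed primary key since that version.
-- SYS_CHANGE_OPERATION is 'I' (insert), 'U' (update) or 'D' (delete).
SELECT CT.PolicyId,
       CT.SYS_CHANGE_OPERATION,
       CT.SYS_CHANGE_VERSION
FROM CHANGETABLE(CHANGES dbo.Policies, @last_sync_version) AS CT
ORDER BY CT.SYS_CHANGE_VERSION;

-- The version that would become the upper bound (new anchor) for this round.
SELECT CHANGE_TRACKING_CURRENT_VERSION() AS current_version;
```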
The sample code below demonstrates how to implement a Server Synchronization Provider. Note that the sample code uses two stored procedures, usp_GetNewBatchAnchor and usp_SelectIncrementalInserts. The code for those stored procedures is also provided.
Since we are implementing the synchronization direction as bidirectional, we need to set the following commands for each table we want to synchronize:
  • SelectIncrementalDeletesCommand: Gets the deletes made to the table in the server database since the last synchronization. These are the deletes that will be sent to the client database.
  • SelectIncrementalInsertsCommand: Gets the inserts made to the table in the server database since the last synchronization. These are the inserts that will be sent to the client database.
  • SelectIncrementalUpdatesCommand: Gets the updates made to the table in the server database since the last synchronization. These are the updates that will be sent to the client database.
  • InsertCommand: Used to insert a record that has been inserted in the client database.
  • UpdateCommand: Used to update a record that has been updated in the client database.
  • DeleteCommand: Used to delete a record that has been deleted in the client database.
  • SelectConflictUpdatedRowsCommand: Selects conflicting rows from the server database if the rows still exist in the base table.
  • SelectConflictDeletedRowsCommand: Selects conflicting rows from the server database if the rows have been deleted from the base table.
The sample only shows how to set the SelectIncrementalInsertsCommand command. For a complete reference on how to define the commands, consult the Microsoft Synchronization Services documentation: http://msdn.microsoft.com/en-us/library/bb726002.aspx
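As a rough idea of what one of the remaining commands could look like, here is a sketch of a stored procedure that might back SelectIncrementalDeletesCommand. The procedure name usp_SelectIncrementalDeletes is hypothetical, and the body is modeled on the pattern used for inserts rather than taken from this application. Note that a deleted row no longer exists in dbo.Policies, so this sketch does not apply the InsuranceAgentId filter; how to handle that is left open here.

```sql
-- Hypothetical procedure: returns the keys of rows deleted on the server
-- between the last anchor and the new anchor.
CREATE PROCEDURE [dbo].[usp_SelectIncrementalDeletes]
(
    @sync_initialized int,
    @sync_last_received_anchor bigint,
    @sync_client_id_binary varbinary(128),
    @sync_new_received_anchor bigint
)
AS
BEGIN
    -- Nothing to delete on a client that has not been initialized yet.
    IF @sync_initialized > 0
    BEGIN
        SELECT CT.[PolicyId]
        FROM CHANGETABLE(CHANGES dbo.Policies, @sync_last_received_anchor) AS CT
        WHERE CT.SYS_CHANGE_OPERATION = 'D'
          AND CT.SYS_CHANGE_VERSION <= @sync_new_received_anchor
          -- Skip deletes that originated from the requesting client.
          AND (CT.SYS_CHANGE_CONTEXT IS NULL
               OR CT.SYS_CHANGE_CONTEXT <> @sync_client_id_binary);
    END
END
```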
In our next article, 5 Steps to Database Synchronization Part 2, we will continue with the remaining steps. Stay tuned!
Sample Code: Creating the Server Synchronization Provider
using System.Data;
using System.Data.SqlClient;
using Microsoft.Synchronization.Data;
using Microsoft.Synchronization.Data.Server;

// Create a class that derives from Microsoft.Synchronization.Data.Server.DbServerSyncProvider.
public class MyServerSynchronizationProvider : DbServerSyncProvider
{
    public MyServerSynchronizationProvider()
    {
        // Create a connection to the server database.
        var serverConn =
            new SqlConnection("Integrated Security=SSPI;database=Insurance;server=mySQLServer;Connect Timeout=30");
        this.Connection = serverConn;

        // Create a command to retrieve a new anchor value from the server. This value is retrieved and
        // stored in the client database. During each synchronization, the new anchor value and the last
        // anchor value from the previous synchronization are used. The set of changes between these
        // upper and lower bounds is synchronized.
        var selectNewAnchorCommand = new SqlCommand
        {
            CommandText = "usp_GetNewBatchAnchor",
            CommandType = CommandType.StoredProcedure,
            Connection = serverConn
        };
        selectNewAnchorCommand.Parameters.Add("@" + SyncSession.SyncLastReceivedAnchor, SqlDbType.BigInt, 8);
        selectNewAnchorCommand.Parameters.Add("@" + SyncSession.SyncMaxReceivedAnchor, SqlDbType.BigInt, 8).Direction =
            ParameterDirection.Output;
        selectNewAnchorCommand.Parameters.Add("@" + SyncSession.SyncNewReceivedAnchor, SqlDbType.BigInt, 8).Direction =
            ParameterDirection.Output;
        selectNewAnchorCommand.Parameters.Add("@" + SyncSession.SyncBatchSize, SqlDbType.Int, 4);
        selectNewAnchorCommand.Parameters.Add("@" + SyncSession.SyncBatchCount, SqlDbType.Int, 4).Direction =
            ParameterDirection.InputOutput;
        this.SelectNewAnchorCommand = selectNewAnchorCommand;

        // Enable downloading changes in batches.
        this.BatchSize = 1000;

        // Create a SyncAdapter for each table to synchronize by using the SqlSyncAdapterBuilder.
        var builder = new SqlSyncAdapterBuilder(serverConn)
        {
            TableName = "Policies",
            ChangeTrackingType = ChangeTrackingType.SqlServerChangeTracking,
            SyncDirection = SyncDirection.Bidirectional
        };

        // Create the filter parameter, then specify a filter clause, which is an SQL WHERE clause
        // without the WHERE keyword. The value for the parameter is supplied through the
        // SyncAgent Configuration object.
        var filterParameter = new SqlParameter("@InsuranceAgentId", SqlDbType.Int);
        builder.FilterClause = " InsuranceAgentId=@InsuranceAgentId";
        builder.FilterParameters.Add(filterParameter);

        var syncAdapter = builder.ToSyncAdapter();
        syncAdapter.TableName = "Policies";

        // Specify a command that selects incremental inserts from the server to apply to the client.
        var incrInserts = new SqlCommand
        {
            CommandText = "usp_SelectIncrementalInserts",
            CommandType = CommandType.StoredProcedure,
            Connection = serverConn
        };
        incrInserts.Parameters.Add("@" + SyncSession.SyncInitialized, SqlDbType.Int);
        incrInserts.Parameters.Add("@" + SyncSession.SyncLastReceivedAnchor, SqlDbType.BigInt);
        incrInserts.Parameters.Add("@" + SyncSession.SyncClientIdBinary, SqlDbType.Binary);
        incrInserts.Parameters.Add("@" + SyncSession.SyncNewReceivedAnchor, SqlDbType.BigInt);
        incrInserts.Parameters.Add("@" + SyncSession.SyncTableName, SqlDbType.NVarChar);
        // The stored procedure also expects the filter parameter, so it must be declared on the command.
        incrInserts.Parameters.Add("@InsuranceAgentId", SqlDbType.Int);
        syncAdapter.SelectIncrementalInsertsCommand = incrInserts;

        // Set the rest of the commands here.

        this.SyncAdapters.Add(syncAdapter);
        this.ApplyChangeFailed += ServerSyncProvider_ApplyChangeFailed;
    }

    void ServerSyncProvider_ApplyChangeFailed(object sender, ApplyChangeFailedEventArgs e)
    {
        // Respond to conflicts.
    }
}

Sample Code: usp_GetNewBatchAnchor stored procedure
CREATE PROCEDURE [dbo].[usp_GetNewBatchAnchor] (
    @sync_last_received_anchor bigint,
    @sync_batch_size int,
    @sync_max_received_anchor bigint out,
    @sync_new_received_anchor bigint out,
    @sync_batch_count int output)
AS
    -- Set a default batch size if a valid one is not passed in.
    IF @sync_batch_size IS NULL OR @sync_batch_size <= 0
        SET @sync_batch_size = 1000

    -- Before selecting the first batch of changes,
    -- set the maximum anchor value for this synchronization
    -- session. After the first time that this procedure is
    -- called, Synchronization Services passes a value for
    -- @sync_max_received_anchor to the procedure. Batches of
    -- changes are synchronized until this value is reached.
    IF @sync_max_received_anchor IS NULL
        SELECT @sync_max_received_anchor = change_tracking_current_version()

    -- If this is the first synchronization session for a database,
    -- start from version 0. By default, Synchronization Services
    -- uses a value of 0 for sync_last_received_anchor on the first
    -- synchronization.
    IF @sync_last_received_anchor IS NULL OR @sync_last_received_anchor = 0
    BEGIN
        SELECT @sync_last_received_anchor = 0

        -- Changes are only retained in the change table for a limited
        -- period of time set by the CHANGE_RETENTION parameter
        -- (on ALTER DATABASE).
        -- Check that we haven't had changes cleaned up on this table
        -- (i.e. CHANGE_TRACKING_MIN_VALID_VERSION returns > 0).
        IF CHANGE_TRACKING_MIN_VALID_VERSION(object_id(N'dbo.Policies')) > @sync_last_received_anchor
            RAISERROR (N'SQL Server Change Tracking has cleaned up tracking information for table ''%s''. To recover from this error, the client must reinitialize its local database and try again',16,3,N'dbo.Policies')

        SET @sync_new_received_anchor = @sync_last_received_anchor + @sync_batch_size

        -- Determine how many batches are required during the
        -- initial synchronization.
        IF @sync_batch_count <= 0
            SET @sync_batch_count = (
                (@sync_max_received_anchor / @sync_batch_size) -
                (@sync_last_received_anchor / @sync_batch_size)
            )
    END
    ELSE
    BEGIN
        SET @sync_new_received_anchor = @sync_last_received_anchor + @sync_batch_size

        -- Determine how many batches are required during subsequent
        -- synchronizations.
        IF @sync_batch_count <= 0
            SET @sync_batch_count = (
                (@sync_max_received_anchor / @sync_batch_size) -
                (@sync_new_received_anchor / @sync_batch_size)) + 1
    END

    -- Check whether this is the last batch.
    IF @sync_new_received_anchor >= @sync_max_received_anchor
    BEGIN
        SET @sync_new_received_anchor = @sync_max_received_anchor
        IF @sync_batch_count <= 0
            SET @sync_batch_count = 1
    END
GO
Sample Code: usp_SelectIncrementalInserts stored procedure
CREATE PROCEDURE [dbo].[usp_SelectIncrementalInserts]
(
    @sync_initialized int,
    @sync_last_received_anchor bigint,
    -- Explicit lengths are needed here: a bare "binary" or "nvarchar"
    -- parameter has a length of 1 and would truncate the value passed in.
    @sync_client_id_binary varbinary(128),
    @sync_new_received_anchor bigint,
    @sync_table_name sysname,
    @InsuranceAgentId int
)
AS
BEGIN
    -- [etc] stands for the remaining columns of the Policies table.
    IF @sync_initialized = 0
        -- First synchronization: send every row that belongs to the agent.
        SELECT dbo.Policies.[PolicyId], [PurchaseDate], [etc]
        FROM dbo.Policies
        LEFT OUTER JOIN CHANGETABLE(CHANGES dbo.Policies, @sync_last_received_anchor) CT
            ON CT.[PolicyId] = dbo.Policies.[PolicyId]
        WHERE InsuranceAgentId = @InsuranceAgentId
          AND (CT.SYS_CHANGE_CONTEXT IS NULL OR CT.SYS_CHANGE_CONTEXT <> @sync_client_id_binary)
    ELSE
    BEGIN
        -- Subsequent synchronizations: send only rows inserted since the last anchor.
        SELECT dbo.Policies.[PolicyId], [PurchaseDate], [etc]
        FROM dbo.Policies
        JOIN CHANGETABLE(CHANGES dbo.Policies, @sync_last_received_anchor) CT
            ON CT.[PolicyId] = dbo.Policies.[PolicyId]
        WHERE InsuranceAgentId = @InsuranceAgentId
          AND (CT.SYS_CHANGE_OPERATION = 'I'
               AND CT.SYS_CHANGE_CREATION_VERSION <= @sync_new_received_anchor
               AND (CT.SYS_CHANGE_CONTEXT IS NULL OR CT.SYS_CHANGE_CONTEXT <> @sync_client_id_binary))
    END
END
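Before wiring the procedure into the provider, it can be exercised by hand. The call below is only a sketch: it assumes the [etc] placeholder has been replaced with real column names, and the client ID and the agent ID 42 are made-up values.

```sql
-- Hypothetical client ID; Synchronization Services normally supplies the real one.
DECLARE @client_id varbinary(128) = CAST(NEWID() AS binary(16));
-- Use the current change tracking version as the upper bound.
DECLARE @current_version bigint = CHANGE_TRACKING_CURRENT_VERSION();

-- @sync_initialized = 0 simulates a client that has never synchronized,
-- so all policies of the given agent should come back.
EXEC dbo.usp_SelectIncrementalInserts
    @sync_initialized = 0,
    @sync_last_received_anchor = 0,
    @sync_client_id_binary = @client_id,
    @sync_new_received_anchor = @current_version,
    @sync_table_name = N'Policies',
    @InsuranceAgentId = 42;
```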

