Component: Data migration - Custom batch processing
As described in the configuration guide, the component always processes copy-data jobs in batches. This enables parallel processing and makes it easier to monitor and troubleshoot. This article guides you through configuring batch processing for custom jobs.
Step 1: Create the custom job
Suppose you are migrating data from the well-known WideWorldImporters database. After migrating all [Sales].[Invoices], you need to run a custom job to update their statuses to Archived.
Follow the configuration guide to create a custom job.

Ensure you add a synchronous event trigger named CwDm Sample SalesInvoice Generate Batches.

Step 2: Configure the workflow for job processing
The workflow definition for CwDm Sample SalesInvoice Generate Batches should look like the following:

Scripted activity "Delete batches": A job can be rerun. To avoid duplicating batches, the workflow should delete existing batches, if any.
// "Delete batches": removes the CwDmBatch records that already exist for the job,
// so that rerunning the job does not leave duplicate batches behind.
var dataApi = ctx.Use<IDataApi>();
try
{
    // Resolve the job from the event payload and show the clean-up phase in its progress text
    var jobId = ctx.Input.Data["JobId"].ToString();
    var job = dataApi.Load(jobId);
    var cleanupProgress = new
    {
        Progress = "Cleaning up existing batches...",
    };
    dataApi.Update(job.Id, cleanupProgress);
    ctx.CommitChanges();

    // Enumerate the batches of this job; only the Id field is projected because
    // nothing else is needed to delete a record
    var jobBatchFilter = FilterBuilder.Create().Eq("JobId", job.Id).Build();
    var existingBatchQuery = DataEnumerationQuery
        .For("CwDmBatch")
        .FilterBy(jobBatchFilter)
        .ProjectOn("Id")
        .SetBatchSize(1000);
    foreach (var existingBatch in dataApi.Enumerate(existingBatchQuery))
    {
        var existingBatchId = existingBatch["Id"].ToString();
        dataApi.Delete(existingBatchId);
    }

    // Persist all deletions in one commit
    ctx.CommitChanges();
}
catch (Exception failure)
{
    // The failure is logged and published through the "Error" context value
    // instead of being rethrown
    Log.Error(failure, "Error in deleting existing batches");
    ctx.Set("Error", failure.ToString());
}
Scripted activity "Generate batches": Split the data collection into batches.
// "Generate batches": walks every CwDmSampleSalesInvoice record ordered by CwDmLegacyId
// and groups the record ids into CwDmBatch records of at most BATCH_SIZE items each.
var dataApi = ctx.Use<IDataApi>();
var job = dataApi.Load(ctx.Input.Data["JobId"].ToString());
// Update progress
dataApi.Update(job.Id, new
{
    Progress = "Generating batches...",
});
ctx.CommitChanges();
var BATCH_SIZE = 1000;
var COMMIT_EVERY_BATCHES = 100;
var CUSTOM_WORKFLOW_EVENT = "CwDm Sample SalesInvoice Archive";
var batchOrder = 1;   // Order assigned to the next batch; advanced by CreateBatch
var index = 0;        // Number of items collected for the batch being built
var itemIds = new List<string>();
object fromValue = null;   // CwDmLegacyId of the first item in the batch being built
object toValue = null;     // CwDmLegacyId of the latest item in the batch being built
// Loop through all records using DataApi.Enumerate
var filter = FilterBuilder.Create().Build();
var query = DataEnumerationQuery.For("CwDmSampleSalesInvoice")
    .FilterBy(filter)
    .ProjectOn("Id", "CwDmLegacyId")
    .OrderBy("CwDmLegacyId");
foreach (var item in dataApi.Enumerate(query))
{
    var legacyId = item.Get<int>("CwDmLegacyId");
    if (fromValue == null)
    {
        fromValue = legacyId;
    }
    toValue = legacyId;
    itemIds.Add(item.Get<string>("Id"));
    index++;
    if (index < BATCH_SIZE)
    {
        continue;
    }

    CreateBatch(fromValue, toValue, itemIds);
    // Reset for new batch
    index = 0;
    fromValue = null;
    toValue = null;
    itemIds = new List<string>();

    // Write to DB every COMMIT_EVERY_BATCHES batches created.
    // CreateBatch post-increments batchOrder, so (batchOrder - 1) batches exist at this point.
    // The check lives here, right after a batch is created, so that it runs once per batch;
    // evaluating it for every record would commit on each record for as long as
    // batchOrder sits on a multiple of COMMIT_EVERY_BATCHES.
    if ((batchOrder - 1) % COMMIT_EVERY_BATCHES == 0)
    {
        ctx.CommitChanges();
    }
}
// Create the final, partially filled batch
if (itemIds.Any())
{
    CreateBatch(fromValue, toValue, itemIds);
}
// Flag the job once all batches have been generated
dataApi.Update(job.Id, new
{
    IsBatchReady = true
});
// Final write. It is unconditional so that batches created since the last periodic commit
// and the IsBatchReady flag are persisted even when the record count is an exact
// multiple of BATCH_SIZE (no partial batch left over).
ctx.CommitChanges();
// Adds one CwDmBatch record for the current job and returns the value produced by dataApi.Add.
//   fromValue / toValue: boundary values of the batch; stored JSON-serialized in FromValue / ToValue.
//   items: identifiers stored in CustomItems; their count becomes TotalItems.
// Side effect: uses the current batchOrder as the batch's Order and then advances it by one.
string CreateBatch(object fromValue, object toValue, List<string> items)
{
    var owningJobId = job.Get<string>("Id");
    var owningProcessId = job.Get<string>("ProcessId");
    var serializedFrom = JsonConvert.SerializeObject(fromValue);
    var serializedTo = JsonConvert.SerializeObject(toValue);

    var batchRecord = new
    {
        // Standard
        JobId = owningJobId,
        ProcessId = owningProcessId,
        FromValue = serializedFrom,
        ToValue = serializedTo,
        TotalItems = items.Count,
        MigratedItems = 0,
        Status = "Ready",
        Progress = "",
        Order = batchOrder++,
        // Custom
        CustomWorkflowEvent = CUSTOM_WORKFLOW_EVENT,
        CustomItems = items
    };

    return dataApi.Add("CwDmBatch", batchRecord);
}
The following is required when creating custom batches:
CustomWorkflowEvent: The workflow event to process the batch. It is CwDm Sample SalesInvoice Archive in this example.
CustomItems: The identifiers of the data items to migrate.
Setting FromValue and ToValue is recommended so users can quickly identify the batch using the visual monitoring tool.
Step 3: Configure the workflow for batch processing
For this example to work, ensure you add a synchronous event trigger named CwDm Sample SalesInvoice Archive.
Below is an example of what a batch-processing workflow looks like. It simply loops through the items stored in the batches and updates data accordingly. The example also shows how you can run a test rule to verify data after migration.
// Batch-processing workflow: loads the batch named in the event payload, migrates every
// item listed in its CustomItems field, records the progress on the batch and finally
// runs the configured test rule against the last item.
var pluginApi = ctx.Use<IPluginApi>();
var dataApi = ctx.Use<IDataApi>();
var ruleApi = ctx.Use<IRuleApi>();
var batchId = ctx.Input["Data"]["BatchId"].ToString();
var PROGRESS_THRESHOLD = 100;   // Progress is persisted every PROGRESS_THRESHOLD items
var TEST_RULE = "CwDm SO Test SalesInvoice Archive";
var progress = 0;   // Items migrated so far
var total = 0;      // Items in the batch
try
{
    // Step 0: STANDARD - Extract event data
    var batch = dataApi.Load(batchId);
    var job = dataApi.Load(batch.Get<string>("JobId"));
    // Step 1: STANDARD - Query data
    IList<string> items = batch.GetList<string>("CustomItems");
    total = items.Count;
    // Step 2: STANDARD - Transform data and insert to DB
    foreach (string item in items)
    {
        try
        {
            // STANDARD - Migrate data
            Migrate(batch, job, item);
            // STANDARD - Update progress
            UpdateProgress(batch, job, item);
        }
        catch (Exception ex)
        {
            // Log the failing item, then let the outer handler deal with the error
            Log.Error(ex, "Error in processing {@item}", item);
            throw;
        }
    }
    // Step 3: STANDARD - Commit the last progress
    // The percentage is rounded after scaling to 0-100 so that it is always a whole number;
    // rounding the fraction first and multiplying afterwards can yield values such as
    // 56.99999999999999. An empty batch has nothing left to migrate, so it is reported as
    // complete instead of dividing zero by zero (NaN).
    var migratedPercent = total == 0 ? 100 : Math.Round(100.0 * progress / total);
    dataApi.Update(batchId, new
    {
        IsTestPassed = (bool?)null,
        MigratedItems = progress,
        Progress = $"Migrated {migratedPercent}%"
    });
    ctx.CommitChanges();
    // Step 4: STANDARD - Execute test rule on the last item if test rule is configured
    if (items.Any() && !string.IsNullOrEmpty(TEST_RULE))
    {
        var testItem = items.Last();
        RunTest(batch, job, testItem);
    }
}
catch (Exception ex)
{
    // STANDARD - Throw the error
    Log.Error(ex, "Error in copying data");
    throw;
}
///// STANDARD FUNCTIONS
// Counts one more migrated item. Every PROGRESS_THRESHOLD items it persists the progress
// on the batch and evaluates the "CwDm Should Batch Stop" rule.
//   batch / job: used for log messages only.
//   item: not used by this function.
// Throws Exception when the rule reports that the batch should stop.
void UpdateProgress(dynamic batch, dynamic job, dynamic item)
{
    progress += 1;
    if (progress % PROGRESS_THRESHOLD != 0)
    {
        return;
    }

    Log.Verbose("Periodically update status: job {Job} batch {Batch}", job["Name"], batch["Order"]);
    // Round after scaling to 0-100 so the text is always a whole percentage; rounding the
    // fraction first and multiplying afterwards can yield values such as 56.99999999999999.
    var migratedPercent = Math.Round(100.0 * progress / total);
    dataApi.Update(batchId, new
    {
        MigratedItems = progress,
        Progress = $"Migrated {migratedPercent}%"
    });
    ctx.CommitChanges(); // Commit partially

    // Try to break the loop if the batch was stopped by user/system
    var stopRuleParameters = new Dictionary<string, object>
    {
        ["BatchId"] = batchId
    };
    var shouldStop = ruleApi.Eval<bool>("CwDm Should Batch Stop", stopRuleParameters);
    if (shouldStop)
    {
        Log.Info("Stopping job {Job} batch {Batch}...", job["Name"], batch["Order"]);
        throw new Exception($"The batch {batchId} was stopped. The system will try to cancel all change...");
    }
}
// Evaluates the TEST_RULE rule with testItem as its "SourceItem" parameter and stores the
// outcome in the batch's IsTestPassed field together with the current progress.
//   batch / job: used for log and error messages; job is also read for "StopIfTestFailed".
//   testItem: the item handed to the test rule.
// Throws Exception when the test fails and the job has StopIfTestFailed set to true.
void RunTest(dynamic batch, dynamic job, dynamic testItem)
{
    var stopIfTestFailed = job.Has("StopIfTestFailed") && job.Get<bool>("StopIfTestFailed");
    var testRuleParameters = new Dictionary<string, object>
    {
        ["SourceItem"] = testItem
    };
    var testResult = ruleApi.Eval<bool>(TEST_RULE, testRuleParameters);

    // Round after scaling to 0-100 so the text is always a whole percentage; rounding the
    // fraction first and multiplying afterwards can yield values such as 56.99999999999999.
    var migratedPercent = Math.Round(100.0 * progress / total);
    dataApi.Update(batchId, new
    {
        MigratedItems = progress,
        Progress = $"Migrated {migratedPercent}%",
        IsTestPassed = testResult
    });
    ctx.CommitChanges(); // Commit partially

    var jobName = job.Get<string>("Name");
    var batchOrder = batch.Get<int>("Order");
    Log.Debug("Test result for batch {Job} {Batch}: {TestResult}", jobName, batchOrder, testResult);
    if (!testResult && stopIfTestFailed)
    {
        Log.Error("Test failed for batch {Job} {Batch}", jobName, batchOrder);
        throw new Exception($"Test failed for batch {jobName} {batchOrder}");
    }
}
// Migrates a single item of the batch. In this example it sets IsArchived to true on the
// record identified by item; batch and job are not used.
void Migrate(dynamic batch, dynamic job, string item)
{
    // Your custom migration logic
    var archivedFlag = new { IsArchived = true };
    dataApi.Update(item, archivedFlag);
}
Step 4: Run and verify
Open the job Archive invoices and click Start. If everything is configured correctly, you will see batches being generated and run sequentially.
