Component: Data migration - Custom batch processing
As described in the configuration guide, the component always processes copy-data jobs in batches. Batching enables parallel processing and makes jobs easier to monitor and troubleshoot. This article guides you through configuring batch processing for custom jobs.
Step 1: Create the custom job
Assume you are migrating data from the well-known WideWorldImporters database and, after migrating all [Sales].[Invoices] records, you need to run a custom job to update their statuses to Archived.
Follow the configuration guide to create a custom job.
Ensure you add a synchronous event trigger named CwDm Sample SalesInvoice Generate Batches.
Step 2: Configure the workflow for job processing
The workflow definition for CwDm Sample SalesInvoice Generate Batches should look like the following:
Scripted activity "Delete batches": A job can be rerun. To avoid duplicating batches, the workflow should delete existing batches, if any.
var dataApi = ctx.Use<IDataApi>();
try
{
    // Update progress
    var job = dataApi.Load(ctx.Input.Data["JobId"].ToString());
    dataApi.Update(job.Id, new
    {
        Progress = "Cleaning up existing batches...",
    });
    ctx.CommitChanges();

    // Loop through existing batches using DataApi.Enumerate()
    var filter = FilterBuilder.Create().Eq("JobId", job.Id).Build();
    var query = DataEnumerationQuery.For("CwDmBatch").FilterBy(filter).ProjectOn("Id").SetBatchSize(1000);
    foreach (var batch in dataApi.Enumerate(query))
    {
        dataApi.Delete(batch["Id"].ToString());
    }
    ctx.CommitChanges();
}
catch (Exception ex)
{
    Log.Error(ex, "Error in deleting existing batches");
    ctx.Set("Error", ex.ToString());
}
Scripted activity "Generate batches": Split the data collection into batches.
var dataApi = ctx.Use<IDataApi>();
var job = dataApi.Load(ctx.Input.Data["JobId"].ToString());

// Update progress
dataApi.Update(job.Id, new
{
    Progress = "Generating batches...",
});
ctx.CommitChanges();

var BATCH_SIZE = 1000;
var CUSTOM_WORKFLOW_EVENT = "CwDm Sample SalesInvoice Archive";
var batchOrder = 1;
var index = 0;
var itemIds = new List<string>();
object fromValue = null;
object toValue = null;

// Loop through all records using DataApi.Enumerate
var filter = FilterBuilder.Create().Build();
var query = DataEnumerationQuery.For("CwDmSampleSalesInvoice")
    .FilterBy(filter)
    .ProjectOn("Id", "CwDmLegacyId")
    .OrderBy("CwDmLegacyId");
foreach (var item in dataApi.Enumerate(query))
{
    if (fromValue == null) fromValue = item.Get<int>("CwDmLegacyId");
    toValue = item.Get<int>("CwDmLegacyId");
    itemIds.Add(item.Get<string>("Id"));
    index++;
    if (index == BATCH_SIZE)
    {
        CreateBatch(fromValue, toValue, itemIds);
        // Reset for new batch
        index = 0;
        fromValue = null;
        toValue = null;
        itemIds = new List<string>();
    }
    if (batchOrder % 100 == 0)
    {
        // Write to DB every 100 batches created
        ctx.CommitChanges();
    }
}

// Create final batch
if (itemIds.Any())
{
    CreateBatch(fromValue, toValue, itemIds);
    ctx.CommitChanges(); // Final write
}

// Mark the job that all batches are generated
dataApi.Update(job.Id, new
{
    IsBatchReady = true
});
ctx.CommitChanges(); // Persist the batch-ready flag

string CreateBatch(object fromValue, object toValue, List<string> items)
{
    return dataApi.Add("CwDmBatch", new
    {
        // Standard
        JobId = job.Get<string>("Id"),
        ProcessId = job.Get<string>("ProcessId"),
        FromValue = JsonConvert.SerializeObject(fromValue),
        ToValue = JsonConvert.SerializeObject(toValue),
        TotalItems = items.Count,
        MigratedItems = 0,
        Status = "Ready",
        Progress = "",
        Order = batchOrder++,
        // Custom
        CustomWorkflowEvent = CUSTOM_WORKFLOW_EVENT,
        CustomItems = items
    });
}
The following are required when creating custom batches:
CustomWorkflowEvent: The workflow event that processes the batch. It is CwDm Sample SalesInvoice Archive in this example.
CustomItems: The identifiers of the data items that need to be migrated.
Setting FromValue and ToValue is also recommended so users can quickly identify a batch in the visual monitoring tool.
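For reference, the sketch below distills the CreateBatch function above into the minimum shape of a custom batch. The literal values (item IDs, range, order) are placeholders for illustration only:
// Minimal sketch of a custom batch, based on the CreateBatch function above.
// The literal IDs and values are placeholders; real values come from the enumeration loop.
dataApi.Add("CwDmBatch", new
{
    JobId = job.Get<string>("Id"),
    ProcessId = job.Get<string>("ProcessId"),
    CustomWorkflowEvent = "CwDm Sample SalesInvoice Archive", // the event that will process this batch
    CustomItems = new List<string> { "invoice-id-1", "invoice-id-2" }, // identifiers of the items to migrate
    FromValue = JsonConvert.SerializeObject(1), // recommended: makes the batch easy to spot in monitoring
    ToValue = JsonConvert.SerializeObject(2),
    TotalItems = 2,
    MigratedItems = 0,
    Status = "Ready",
    Order = 1
});
ctx.CommitChanges();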
Step 3: Configure the workflow for batch processing
For this example to work, ensure you add a synchronous event trigger named CwDm Sample SalesInvoice Archive.
Below is an example of what a batch-processing workflow looks like. It simply loops through the items stored in the batch and updates data accordingly. The example also shows how you can run a test rule to verify data after migration.
var pluginApi = ctx.Use<IPluginApi>();
var dataApi = ctx.Use<IDataApi>();
var ruleApi = ctx.Use<IRuleApi>();
var batchId = ctx.Input["Data"]["BatchId"].ToString();
var PROGRESS_THRESHOLD = 100;
var TEST_RULE = "CwDm SO Test SalesInvoice Archive";
var progress = 0;
var total = 0;
try
{
    // Step 0: STANDARD - Extract event data
    var batch = dataApi.Load(batchId);
    var job = dataApi.Load(batch.Get<string>("JobId"));

    // Step 1: STANDARD - Query data
    IList<string> items = batch.GetList<string>("CustomItems");
    total = items.Count;

    // Step 2: STANDARD - Transform data and insert to DB
    foreach (dynamic item in items)
    {
        try
        {
            // STANDARD - Migrate data
            Migrate(batch, job, item);
            // STANDARD - Update progress
            UpdateProgress(batch, job, item);
        }
        catch (Exception ex)
        {
            Log.Error(ex, "Error in processing {@item}", item);
            throw;
        }
    }

    // Step 3: STANDARD - Commit the last progress
    dataApi.Update(batchId, new
    {
        IsTestPassed = (bool?)null,
        MigratedItems = progress,
        Progress = $"Migrated {Math.Round(1.0 * progress / total, 2) * 100}%"
    });
    ctx.CommitChanges();

    // Step 4: STANDARD - Execute test rule on the last item if test rule is configured
    if (items.Any() && !string.IsNullOrEmpty(TEST_RULE))
    {
        var testItem = items.Last();
        RunTest(batch, job, testItem);
    }
}
catch (Exception ex)
{
    // STANDARD - Throw the error
    Log.Error(ex, "Error in copying data");
    throw;
}

///// STANDARD FUNCTIONS
void UpdateProgress(dynamic batch, dynamic job, dynamic item)
{
    var partitionFieldName = job.Get<string>("CopyDataPartitionFieldName");
    progress += 1;
    if (progress % PROGRESS_THRESHOLD == 0)
    {
        Log.Verbose("Periodically update status: job {Job} batch {Batch}", job["Name"], batch["Order"]);
        dataApi.Update(batchId, new
        {
            MigratedItems = progress,
            Progress = $"Migrated {Math.Round(1.0 * progress / total, 2) * 100}%"
        });
        ctx.CommitChanges(); // Commit partially

        // Try to break the loop if the batch was stopped by user/system
        var stopRuleParameters = new Dictionary<string, object>();
        stopRuleParameters["BatchId"] = batchId;
        var shouldStop = ruleApi.Eval<bool>("CwDm Should Batch Stop", stopRuleParameters);
        if (shouldStop)
        {
            Log.Info("Stopping job {Job} batch {Batch}...", job["Name"], batch["Order"]);
            throw new Exception($"The batch {batchId} was stopped. The system will try to cancel all change...");
        }
    }
}

void RunTest(dynamic batch, dynamic job, dynamic testItem)
{
    var testRule = TEST_RULE;
    var stopIfTestFailed = job.Has("StopIfTestFailed") && job.Get<bool>("StopIfTestFailed");
    var testRuleParameters = new Dictionary<string, object>();
    testRuleParameters["SourceItem"] = testItem;
    var testResult = ruleApi.Eval<bool>(testRule, testRuleParameters);
    dataApi.Update(batchId, new
    {
        MigratedItems = progress,
        Progress = $"Migrated {Math.Round(1.0 * progress / total, 2) * 100}%",
        IsTestPassed = testResult
    });
    ctx.CommitChanges(); // Commit partially
    Log.Debug("Test result for batch {Job} {Batch}: {TestResult}", job.Get<string>("Name"), batch.Get<int>("Order"), testResult);
    if (!testResult && stopIfTestFailed)
    {
        Log.Error("Test failed for batch {Job} {Batch}", job.Get<string>("Name"), batch.Get<int>("Order"));
        throw new Exception($"Test failed for batch {job.Get<string>("Name")} {batch.Get<int>("Order")}");
    }
}

void Migrate(dynamic batch, dynamic job, string item)
{
    // Your custom migration logic
    dataApi.Update(item, new { IsArchived = true });
}
Step 4: Run and verify
Open the job Archive invoices and click Start. If everything is configured correctly, you will see the batches being generated and then run sequentially.
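To double-check the result, you can also query the migrated data directly. The following is a minimal sketch for a scripted activity, assuming the same IDataApi, FilterBuilder, and DataEnumerationQuery helpers used above and the IsArchived field set by the Migrate function; the count is only logged for inspection:
// Sketch: count the invoices that have been archived, using the same enumeration API as above.
var dataApi = ctx.Use<IDataApi>();
var filter = FilterBuilder.Create().Eq("IsArchived", true).Build();
var query = DataEnumerationQuery.For("CwDmSampleSalesInvoice")
    .FilterBy(filter)
    .ProjectOn("Id")
    .SetBatchSize(1000);
var archivedCount = 0;
foreach (var item in dataApi.Enumerate(query))
{
    archivedCount++;
}
Log.Debug("Archived invoices: {Count}", archivedCount);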