Search Results for

    Show / Hide Table of Contents

    Component: Data migration - Custom batch processing

    As described in the configuration guide, the component always processes copy-data jobs in batches. Batching enables parallel processing and makes jobs easier to monitor and troubleshoot. This article shows how to configure batch processing for custom jobs.

    Step 1: Create the custom job

    Suppose you are migrating data from the well-known WideWorldImporters sample database. After all [Sales].[Invoices] records have been migrated, you need to run a custom job that marks them as archived (sets IsArchived to true).

    Follow the configuration guide to create a custom job.

    image-20231213220248360

    Ensure you add a synchronous event trigger named CwDm Sample SalesInvoice Generate Batches.

    image-20231213220603679

    Step 2: Configure the workflow for job processing

    The workflow definition for CwDm Sample SalesInvoice Generate Batches should look like the following:

    image-20231213220654730

    Scripted activity "Delete batches": Because a job can be rerun, the workflow first deletes any existing batches for the job so that batches are not duplicated.

    // Scripted activity "Delete batches": removes any CwDmBatch records that belong to
    // this job (e.g. left over from a previous run) so that rerunning the job does not
    // create duplicate batches.
    // Errors are not rethrown: they are logged and recorded via ctx.Set("Error", ...).
    var dataApi = ctx.Use<IDataApi>();
    
    try
    {
        // Update progress so users can see what the job is doing
        var job = dataApi.Load(ctx.Input.Data["JobId"].ToString());
        dataApi.Update(job.Id, new 
        { 
            Progress = "Cleaning up existing batches...",
        });
        ctx.CommitChanges();
    
        // Find the job's existing batches using DataApi.Enumerate()
        var filter = FilterBuilder.Create().Eq("JobId", job.Id).Build();
        var query = DataEnumerationQuery.For("CwDmBatch").FilterBy(filter).ProjectOn("Id").SetBatchSize(1000);
    
        // Collect the ids first and delete afterwards: deleting rows while the paged
        // enumeration is still reading them could make later pages skip records if the
        // deletes become visible to the query.
        var batchIds = new List<string>();
        foreach (var batch in dataApi.Enumerate(query))
        {
            batchIds.Add(batch["Id"].ToString());
        }
    
        foreach (var batchId in batchIds)
        {
            dataApi.Delete(batchId);
        }
    
        ctx.CommitChanges();
    }
    catch(Exception ex)
    {
        Log.Error(ex, "Error in deleting existing batches");
        ctx.Set("Error", ex.ToString());
    }
    

    Scripted activity "Generate batches": Split the records into batches (1,000 items per batch in this example), then mark the job as ready once all batches have been created.

    // Scripted activity "Generate batches": splits all CwDmSampleSalesInvoice records,
    // ordered by CwDmLegacyId, into CwDmBatch records of up to BATCH_SIZE items each.
    // Each batch stores its item ids in CustomItems and the workflow event that will
    // process it in CustomWorkflowEvent. Finally the job is flagged IsBatchReady.
    var dataApi = ctx.Use<IDataApi>();
    
    var job = dataApi.Load(ctx.Input.Data["JobId"].ToString());
    
    // Update progress
    dataApi.Update(job.Id, new 
    { 
        Progress = "Generating batches...",
    });
    ctx.CommitChanges();
    
    var BATCH_SIZE = 1000;
    var COMMIT_EVERY_BATCHES = 100;
    var CUSTOM_WORKFLOW_EVENT = "CwDm Sample SalesInvoice Archive";
    
    var batchOrder = 1;            // Order of the next batch; incremented by CreateBatch
    var uncommittedBatches = 0;    // Batches added since the last ctx.CommitChanges()
    var itemIds = new List<string>();
    object fromValue = null;       // CwDmLegacyId of the first item in the current batch
    object toValue = null;         // CwDmLegacyId of the last item in the current batch
    
    // Loop through all records using DataApi.Enumerate, ordered by CwDmLegacyId so
    // FromValue/ToValue describe each batch's range in that order
    var filter = FilterBuilder.Create().Build();
    var query = DataEnumerationQuery.For("CwDmSampleSalesInvoice")
        .FilterBy(filter)
        .ProjectOn("Id", "CwDmLegacyId")
        .OrderBy("CwDmLegacyId");
    
    foreach (var item in dataApi.Enumerate(query))
    {
        var legacyId = item.Get<int>("CwDmLegacyId");
        if (fromValue == null) fromValue = legacyId;
        toValue = legacyId;
        itemIds.Add(item.Get<string>("Id"));
    
        if (itemIds.Count == BATCH_SIZE)
        {
            CreateBatch(fromValue, toValue, itemIds);
            uncommittedBatches++;
    
            // Reset for new batch
            fromValue = null;
            toValue = null;
            itemIds = new List<string>();
    
            // Write to DB once every COMMIT_EVERY_BATCHES batches. The check lives in the
            // batch-complete branch so it runs once per batch, not once per item.
            if (uncommittedBatches >= COMMIT_EVERY_BATCHES)
            {
                ctx.CommitChanges();
                uncommittedBatches = 0;
            }
        }
    }
    
    // Create final (partial) batch
    if (itemIds.Any())
    {
        CreateBatch(fromValue, toValue, itemIds);
    }
    
    // Flush all batches not yet written. This also covers an item count that is an exact
    // multiple of BATCH_SIZE, where no final partial batch is created.
    ctx.CommitChanges();
    
    // Mark the job that all batches are generated, and persist the flag explicitly
    dataApi.Update(job.Id, new 
    { 
        IsBatchReady = true
    });
    ctx.CommitChanges();
    
    // Adds one CwDmBatch record covering `ids` and returns the result of dataApi.Add.
    // Uses (and increments) batchOrder. Parameter names differ from the enclosing locals
    // to avoid shadowing, which is an error (CS0136) before C# 8.
    string CreateBatch(object from, object to, List<string> ids)
    {
        return dataApi.Add("CwDmBatch", new 
        {
            // Standard
            JobId = job.Get<string>("Id"),
            ProcessId = job.Get<string>("ProcessId"),
            FromValue = JsonConvert.SerializeObject(from),
            ToValue = JsonConvert.SerializeObject(to),
            TotalItems = ids.Count,
            MigratedItems = 0,
            Status = "Ready",
            Progress = "",
            Order = batchOrder++,
    
            // Custom
            CustomWorkflowEvent = CUSTOM_WORKFLOW_EVENT,
            CustomItems = ids
        });
    }
    

    The following fields are required when creating custom batches:

    • CustomWorkflowEvent: The workflow event to process the batch. It is CwDm Sample SalesInvoice Archive in this example.
    • CustomItems: The identifiers of the data items that the batch processes.

    Setting FromValue and ToValue is recommended so users can quickly identify the batch using the visual monitoring tool.

    Step 3: Configure the workflow for batch processing

    For this example to work, ensure you add a synchronous event trigger named CwDm Sample SalesInvoice Archive.

    Below is an example of a batch-processing workflow. It loops through the item identifiers stored in the batch's CustomItems field and updates each record accordingly, periodically checking whether the batch has been stopped. The example also shows how you can run a test rule to verify data after migration.

    // Batch-processing workflow for the "CwDm Sample SalesInvoice Archive" event.
    // Loads the batch named by the event's BatchId and calls Migrate for every id in its
    // CustomItems list. Progress is written to the batch every PROGRESS_THRESHOLD items,
    // and the loop aborts if the "CwDm Should Batch Stop" rule returns true. At the end,
    // TEST_RULE is evaluated on the last item and the result is stored in IsTestPassed.
    var dataApi = ctx.Use<IDataApi>();
    var ruleApi = ctx.Use<IRuleApi>();
    
    var batchId = ctx.Input["Data"]["BatchId"].ToString();
    var PROGRESS_THRESHOLD = 100;
    var TEST_RULE = "CwDm SO Test SalesInvoice Archive";
    var progress = 0;
    var total = 0;
    
    try
    {
        // Step 0: STANDARD - Extract event data
        var batch = dataApi.Load(batchId);
        var job = dataApi.Load(batch.Get<string>("JobId"));
    
        // Step 1: STANDARD - Query data (a missing CustomItems list is treated as empty)
        IList<string> items = batch.GetList<string>("CustomItems") ?? new List<string>();
        total = items.Count;
    
        // Step 2: STANDARD - Transform data and insert to DB
        foreach (string item in items)
        {
            try
            {
                // STANDARD - Migrate data
                Migrate(batch, job, item);
    
                // STANDARD - Update progress
                UpdateProgress(batch, job, item);
            }
            catch(Exception ex)
            {
                Log.Error(ex, "Error in processing {@item}", item);
                throw;
            }
        }
    
        // Step 3: STANDARD - Commit the last progress and reset any previous test result
        dataApi.Update(batchId, new
        {
            IsTestPassed = (bool?)null,
            MigratedItems = progress,
            Progress = FormatProgress()
        });
        ctx.CommitChanges(); 
    
        // Step 4: STANDARD - Execute test rule on the last item if test rule is configured
        if (items.Any() && !string.IsNullOrEmpty(TEST_RULE))
        {
            var testItem = items.Last();
            RunTest(batch, job, testItem);
        }
    }
    catch(Exception ex)
    {
        // STANDARD - Throw the error
        Log.Error(ex, "Error in copying data");
        throw;
    }
    
    ///// STANDARD FUNCTIONS
    
    // Builds the "Migrated N%" text as a whole-number percentage. Rounding
    // 100 * progress / total directly avoids floating-point artifacts that
    // Math.Round(x, 2) * 100 can produce (e.g. 0.57 * 100 == 56.99999999999999).
    // An empty batch (total == 0) reports 100% instead of "NaN%".
    string FormatProgress()
    {
        var percent = total == 0 ? 100.0 : Math.Round(100.0 * progress / total);
        return $"Migrated {percent}%";
    }
    
    // Counts one processed item. Every PROGRESS_THRESHOLD items it persists progress on
    // the batch and asks the "CwDm Should Batch Stop" rule whether to abort; if so it
    // throws, which ends the loop.
    void UpdateProgress(dynamic batch, dynamic job, dynamic item)
    {
        progress += 1;
        if (progress % PROGRESS_THRESHOLD == 0)
        {
            Log.Verbose("Periodically update status: job {Job} batch {Batch}", job["Name"], batch["Order"]);
            dataApi.Update(batchId, new
            {
                MigratedItems = progress,
                Progress = FormatProgress()
            });
            ctx.CommitChanges(); // Commit partially
    
            // Try to break the loop if the batch was stopped by user/system
            var stopRuleParameters = new Dictionary<string, object>();
            stopRuleParameters["BatchId"] = batchId;
            var shouldStop = ruleApi.Eval<bool>("CwDm Should Batch Stop", stopRuleParameters);
            if (shouldStop)
            {
                Log.Info("Stopping job {Job} batch {Batch}...", job["Name"], batch["Order"]);
                throw new InvalidOperationException($"The batch {batchId} was stopped. The system will try to cancel all change...");
            }
        }
    }
    
    // Evaluates TEST_RULE with the given item as "SourceItem" and stores the result in
    // IsTestPassed. Throws when the test fails and the job has StopIfTestFailed set.
    void RunTest(dynamic batch, dynamic job, dynamic testItem)
    {
        var testRule = TEST_RULE;
        var stopIfTestFailed = job.Has("StopIfTestFailed") && job.Get<bool>("StopIfTestFailed");
        
        var testRuleParameters = new Dictionary<string, object>();
        testRuleParameters["SourceItem"] = testItem;
        var testResult = ruleApi.Eval<bool>(testRule, testRuleParameters);
        dataApi.Update(batchId, new
        {
            MigratedItems = progress,
            Progress = FormatProgress(),
            IsTestPassed = testResult
        });
        ctx.CommitChanges(); // Commit partially
    
        Log.Debug("Test result for batch {Job} {Batch}: {TestResult}", job.Get<string>("Name"), batch.Get<int>("Order"), testResult);
        if (!testResult && stopIfTestFailed)
        {
            Log.Error("Test failed for batch {Job} {Batch}", job.Get<string>("Name"), batch.Get<int>("Order"));
            throw new InvalidOperationException($"Test failed for batch {job.Get<string>("Name")} {batch.Get<int>("Order")}");
        }
    }
    
    // Applies the per-item change for this example: marks the record as archived.
    void Migrate(dynamic batch, dynamic job, string item)
    {
        // Your custom migration logic
        dataApi.Update(item, new { IsArchived = true });
    }
    

    Step 4: Run and verify

    Open the job "Archive invoices" and click Start. If everything is configured correctly, you will see the batches being generated and then run sequentially.

    image-20231213224005901

    In This Article
    Back to top Generated by DocFX