    Component: Data migration - Custom batch processing

    As described in the configuration guide, the component always processes copy-data jobs in batches. Batching enables parallel processing and makes jobs easier to monitor and troubleshoot. This article guides you through configuring batch processing for custom jobs.

    Step 1: Create the custom job

    Suppose you are migrating data from the well-known sample database WideWorldImporters and, after migrating all [Sales].[Invoices], you need to run a custom job to update their statuses to Archived.

    Follow the configuration guide to create a custom job.

    [Screenshot: creating the custom job]

    Ensure you add a synchronous event trigger named CwDm Sample SalesInvoice Generate Batches.

    [Screenshot: the synchronous event trigger CwDm Sample SalesInvoice Generate Batches]

    Step 2: Configure the workflow for job processing

    The workflow definition for CwDm Sample SalesInvoice Generate Batches should look like the following:

    [Screenshot: the workflow definition for CwDm Sample SalesInvoice Generate Batches]

    Scripted activity "Delete batches": A job can be rerun. To avoid duplicating batches, the workflow should delete existing batches, if any.

    var dataApi = ctx.Use<IDataApi>();
    
    try
    {
        // Update progress
        var job = dataApi.Load(ctx.Input.Data["JobId"].ToString());
        dataApi.Update(job.Id, new 
        { 
            Progress = "Cleaning up existing batches...",
        });
        ctx.CommitChanges();
    
        // Loop through existing batches using DataApi.Enumerate()
        var filter = FilterBuilder.Create().Eq("JobId", job.Id).Build();
        var query = DataEnumerationQuery.For("CwDmBatch").FilterBy(filter).ProjectOn("Id").SetBatchSize(1000);
    
        foreach (var batch in dataApi.Enumerate(query))
        {
            dataApi.Delete(batch["Id"].ToString());
        }
    
        ctx.CommitChanges();
    }
    catch(Exception ex)
    {
        Log.Error(ex, "Error in deleting existing batches");
        ctx.Set("Error", ex.ToString());
    }
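
    The deletes are buffered and written in a single ctx.CommitChanges() at the end. If a job can accumulate a very large number of batches, consider committing periodically instead, as the generate-batches script below does.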
    

    Scripted activity "Generate batches": Split the data collection into batches.

    var dataApi = ctx.Use<IDataApi>();
    
    var job = dataApi.Load(ctx.Input.Data["JobId"].ToString());
    
    // Update progress
    dataApi.Update(job.Id, new 
    { 
        Progress = "Generating batches...",
    });
    ctx.CommitChanges();
    
    var BATCH_SIZE = 1000;
    var CUSTOM_WORKFLOW_EVENT = "CwDm Sample SalesInvoice Archive";
    
    var batchOrder = 1;
    var index = 0;
    var itemIds = new List<string>();
    object fromValue = null;
    object toValue = null;
    
    // Loop through all records using DataApi.Enumerate
    var filter = FilterBuilder.Create().Build();
    var query = DataEnumerationQuery.For("CwDmSampleSalesInvoice")
        .FilterBy(filter)
        .ProjectOn("Id", "CwDmLegacyId")
        .OrderBy("CwDmLegacyId");
    
    foreach (var item in dataApi.Enumerate(query))
    {
        if (fromValue == null) fromValue = item.Get<int>("CwDmLegacyId");
        toValue = item.Get<int>("CwDmLegacyId");
        itemIds.Add(item.Get<string>("Id"));
    
        index++;
        if (index == BATCH_SIZE)
        {
            CreateBatch(fromValue, toValue, itemIds);
    
            // Reset for the next batch
            index = 0;
            fromValue = null;
            toValue = null;
            itemIds = new List<string>();
    
            // Write to DB every 100 batches created. CreateBatch has already
            // incremented batchOrder, so batchOrder - 1 batches exist so far.
            if ((batchOrder - 1) % 100 == 0)
            {
                ctx.CommitChanges();
            }
        }
    }
    
    // Create final batch
    if (itemIds.Any())
    {
        CreateBatch(fromValue, toValue, itemIds);
        ctx.CommitChanges(); // Final write
    }
    
    // Mark the job that all batches are generated
    dataApi.Update(job.Id, new 
    { 
        IsBatchReady = true
    });
    ctx.CommitChanges(); // Persist the flag and any batches not yet committed
    
    string CreateBatch(object fromValue, object toValue, List<string> items)
    {
        return dataApi.Add("CwDmBatch", new 
        {
            // Standard
            JobId = job.Get<string>("Id"),
            ProcessId = job.Get<string>("ProcessId"),
            FromValue = JsonConvert.SerializeObject(fromValue),
            ToValue = JsonConvert.SerializeObject(toValue),
            TotalItems = items.Count,
            MigratedItems = 0,
            Status = "Ready",
            Progress = "",
            Order = batchOrder++,
    
            // Custom
            CustomWorkflowEvent = CUSTOM_WORKFLOW_EVENT,
            CustomItems = items
        });
    }
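
    BATCH_SIZE is a trade-off: smaller batches give finer-grained progress reporting and smaller retry units, while larger batches reduce per-batch overhead. Committing every 100 created batches, rather than once at the end, keeps the pending change set small while thousands of batches are generated.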
    

    The following fields are required when creating custom batches:

    • CustomWorkflowEvent: The workflow event that processes the batch; CwDm Sample SalesInvoice Archive in this example.
    • CustomItems: The identifiers of the data items to migrate.

    Setting FromValue and ToValue is recommended so users can quickly identify the batch using the visual monitoring tool.
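
    Putting these together, a minimal custom batch could look like the following condensed sketch of the CreateBatch helper above (the variable values are placeholders for whatever you accumulate while enumerating the data):

    // Condensed sketch of CreateBatch above; fromValue, toValue and itemIds
    // are placeholders for the values accumulated while enumerating the data.
    dataApi.Add("CwDmBatch", new
    {
        JobId = job.Get<string>("Id"),
        ProcessId = job.Get<string>("ProcessId"),
    
        // Required for custom batches
        CustomWorkflowEvent = "CwDm Sample SalesInvoice Archive",
        CustomItems = itemIds,
    
        // Recommended so users can identify the batch in the monitoring tool
        FromValue = JsonConvert.SerializeObject(fromValue),
        ToValue = JsonConvert.SerializeObject(toValue),
    
        TotalItems = itemIds.Count,
        MigratedItems = 0,
        Status = "Ready",
        Order = 1
    });
    ctx.CommitChanges();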

    Step 3: Configure the workflow for batch processing

    For this example to work, ensure you add a synchronous event trigger named CwDm Sample SalesInvoice Archive.

    Below is an example of what a batch-processing workflow looks like. It loops through the items stored in the batch and updates the data accordingly. The example also shows how to run a test rule to verify the data after migration.

    var pluginApi = ctx.Use<IPluginApi>(); 
    var dataApi = ctx.Use<IDataApi>();
    var ruleApi = ctx.Use<IRuleApi>();
    
    var batchId = ctx.Input["Data"]["BatchId"].ToString();
    var PROGRESS_THRESHOLD = 100;
    var TEST_RULE = "CwDm SO Test SalesInvoice Archive";
    var progress = 0;
    var total = 0;
    
    try
    {
        // Step 0: STANDARD - Extract event data
        var batch = dataApi.Load(batchId);
        var job = dataApi.Load(batch.Get<string>("JobId"));
    
        // Step 1: STANDARD - Query data
        IList<string> items = batch.GetList<string>("CustomItems");
        total = items.Count;
    
        // Step 2: STANDARD - Transform data and insert to DB
        foreach (var item in items)
        {
            try
            {
                // STANDARD - Migrate data
                Migrate(batch, job, item);
    
                // STANDARD - Update progress
                UpdateProgress(batch, job, item);
            }
            catch(Exception ex)
            {
                Log.Error(ex, "Error in processing {@item}", item);
                throw;
            }
        }
    
        // Step 3: STANDARD - Commit the last progress
        dataApi.Update(batchId, new
        {
            IsTestPassed = (bool?)null,
            MigratedItems = progress,
            Progress = $"Migrated {Math.Round(1.0 * progress / total, 2) * 100}%"
        });
        ctx.CommitChanges(); 
    
        // Step 4: STANDARD - Execute test rule on the last item if test rule is configured
        if (items.Any() && !string.IsNullOrEmpty(TEST_RULE))
        {
            var testItem = items.Last();
            RunTest(batch, job, testItem);
        }
    }
    catch(Exception ex)
    {
        // STANDARD - Throw the error
        Log.Error(ex, "Error in copying data");
        throw;
    }
    
    ///// STANDARD FUNCTIONS
    void UpdateProgress(dynamic batch, dynamic job, dynamic item)
    {
        progress += 1;
        if (progress % PROGRESS_THRESHOLD == 0)
        {
            Log.Verbose("Periodically update status: job {Job} batch {Batch}", job["Name"], batch["Order"]);
            dataApi.Update(batchId, new
            {
                MigratedItems = progress,
                Progress = $"Migrated {Math.Round(1.0 * progress / total, 2) * 100}%"
            });
            ctx.CommitChanges(); // Commit partially
    
            // Try to break the loop if the batch was stopped by user/system
            var stopRuleParameters = new Dictionary<string, object>();
            stopRuleParameters["BatchId"] = batchId;
            var shouldStop = ruleApi.Eval<bool>("CwDm Should Batch Stop", stopRuleParameters);
            if (shouldStop)
            {
                Log.Info("Stopping job {Job} batch {Batch}...", job["Name"], batch["Order"]);
                throw new Exception($"The batch {batchId} was stopped. The system will try to cancel all change...");
            }
        }
    }
    
    void RunTest(dynamic batch, dynamic job, dynamic testItem)
    {
        var testRule = TEST_RULE;
        var stopIfTestFailed = job.Has("StopIfTestFailed") && job.Get<bool>("StopIfTestFailed");
        
        var testRuleParameters = new Dictionary<string, object>();
        testRuleParameters["SourceItem"] = testItem;
        var testResult = ruleApi.Eval<bool>(testRule, testRuleParameters);
        dataApi.Update(batchId, new
        {
            MigratedItems = progress,
            Progress = $"Migrated {Math.Round(1.0 * progress / total, 2) * 100}%",
            IsTestPassed = testResult
        });
        ctx.CommitChanges(); // Commit partially
    
        Log.Debug("Test result for batch {Job} {Batch}: {TestResult}", job.Get<string>("Name"), batch.Get<int>("Order"), testResult);
        if (!testResult && stopIfTestFailed)
        {
            Log.Error("Test failed for batch {Job} {Batch}", job.Get<string>("Name"), batch.Get<int>("Order"));
            throw new Exception($"Test failed for batch {job.Get<string>("Name")} {batch.Get<int>("Order")}");
        }
    }
    
    void Migrate(dynamic batch, dynamic job, string item)
    {
        // Your custom migration logic
        dataApi.Update(item, new { IsArchived = true });
    }
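
    The test rule itself is outside the scope of this article. Conceptually, CwDm SO Test SalesInvoice Archive only needs to load the item passed as SourceItem and check that it was archived. The following is a rough sketch; how a rule reads its parameters depends on your rule engine, so the ctx.Parameters accessor below is hypothetical:

    // Hypothetical body for the rule "CwDm SO Test SalesInvoice Archive".
    // Assumption: rules are C# scripts that can resolve IDataApi and read the
    // parameters passed to ruleApi.Eval; "ctx.Parameters" is a placeholder --
    // use whatever parameter accessor your rule engine provides.
    var dataApi = ctx.Use<IDataApi>();
    var sourceItemId = ctx.Parameters["SourceItem"].ToString(); // hypothetical accessor
    var item = dataApi.Load(sourceItemId);
    return item.Get<bool>("IsArchived");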
    

    Step 4: Run and verify

    Open the job Archive invoices and click Start. If everything is configured correctly, you will see the batches being generated and then run sequentially.

    [Screenshot: batches generated and running in the monitoring view]
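
    Beyond the monitoring view, you can verify the result with the same DataApi used in the workflows above. This sketch assumes the IsArchived field set by the Migrate function can be used in a filter; the count should be zero once the job completes successfully:

    var dataApi = ctx.Use<IDataApi>();
    
    // Count invoices that were not archived by the job
    var filter = FilterBuilder.Create().Eq("IsArchived", false).Build();
    var query = DataEnumerationQuery.For("CwDmSampleSalesInvoice")
        .FilterBy(filter)
        .ProjectOn("Id")
        .SetBatchSize(1000);
    
    var notArchived = 0;
    foreach (var item in dataApi.Enumerate(query))
    {
        notArchived++;
    }
    
    Log.Debug("Invoices not archived: {Count}", notArchived);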
