Synchronize catalogs from business process

Starting catalog synchronization with CatalogVersionSyncJob in a bussiness process action leads to “Error in worker SyncWorker : Entity not found ( pk = 8794627343487 name = ‘de.hybris.platform.persistence.processing_CronJob’ type code = ‘501’ db table = ‘cronjobs’)”. I didn’t find a way to fix such behaviour of syncing inside business process action, but I resolved this issue by triggering event from process action and running SyncJob in event listener. For me sync performs well only via SetupSyncJobService, performing job via CatalogSynchronizationService causes exception in SyncWorker.

Firstly we will define event:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
package com.blog.core.event;

import de.hybris.platform.servicelayer.event.events.AbstractEvent;
public class StartCatalogSyncEvent extends AbstractEvent {

    private String processCode;

    public String getProcessCode() {
        return processCode;
    }

    public void setProcessCode(String processCode) {
        this.processCode = processCode;
    }
}

Than we will create action, which publishes event:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.blog.core.process.actions.productImport;

import com.blog.core.event.StartCatalogSyncEvent;
import com.blog.core.model.process.ProductImportProcessModel;
import de.hybris.platform.processengine.action.AbstractSimpleDecisionAction;
import de.hybris.platform.servicelayer.event.EventService;
import de.hybris.platform.task.RetryLaterException;
import org.apache.log4j.Logger;

import javax.annotation.Resource;

public class SyncCatalogAction extends AbstractSimpleDecisionAction<ProductImportProcessModel> {

    private static final Logger LOG = Logger.getLogger(SyncCatalogAction.class);

    @Resource
    private EventService eventService;

    @Override
    public Transition executeAction(ProductImportProcessModel process) throws RetryLaterException, Exception {

        StartCatalogSyncEvent catalogSyncEvent = new StartCatalogSyncEvent();
        catalogSyncEvent.setProcessCode(process.getCode());
        eventService.publishEvent(catalogSyncEvent);

        LOG.info(process.getCode() + ": Fired event for catalog sync.");
        return Transition.OK;
    }
}

And finally we will start catalog synchronizations in event listener:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
package com.blog.core.event.listeners;

import com.blog.core.event.StartCatalogSyncEvent;
import com.blog.core.model.process.ProductImportProcessModel;
import de.hybris.platform.commerceservices.setup.SetupSyncJobService;
import de.hybris.platform.core.Registry;
import de.hybris.platform.cronjob.enums.CronJobResult;
import de.hybris.platform.jalo.JaloSession;
import de.hybris.platform.processengine.BusinessProcessService;
import de.hybris.platform.processengine.model.BusinessProcessModel;
import de.hybris.platform.servicelayer.cronjob.PerformResult;
import de.hybris.platform.servicelayer.event.impl.AbstractEventListener;
import de.hybris.platform.servicelayer.model.ModelService;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.jetbrains.annotations.NotNull;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static com.blog.core.constants.BlogCoreConstants.ENGLISH;
import static com.blog.core.constants.BlogCoreConstants.GERMAN;

public class CatalogSyncEventListener extends AbstractEventListener<StartCatalogSyncEvent> {

    private static final Logger LOG = Logger.getLogger(CatalogSyncEventListener.class);

    @Resource
    private SetupSyncJobService setupSyncJobService;

    @Resource
    private BusinessProcessService businessProcessService;

    @Resource
    private ModelService modelService;

    @Override
    protected void onEvent(StartCatalogSyncEvent startCatalogSyncEvent) {

        ExecutorService taskExecutor = Executors.newFixedThreadPool(2);
        List<Callable<PerformResult>> tasks = new ArrayList<>();

        tasks.add(createSyncTask(ENGLISH));
        tasks.add(createSyncTask(GERMAN));

        try {
            List<Future<PerformResult>> futures = taskExecutor.invokeAll(tasks);
            if (StringUtils.isNotBlank(startCatalogSyncEvent.getProcessCode())) {
                BusinessProcessModel processModel = businessProcessService.getProcess(startCatalogSyncEvent.getProcessCode());
                if (processModel instanceof ProductImportProcessModel) {
                    PerformResult result;
                    boolean isSuccess = true;
                    for (Future<PerformResult> future : futures) {
                        result = future.get();
                        if (!CronJobResult.SUCCESS.equals(result.getResult())) {
                            isSuccess = false;
                        }
                    }
                    if (isSuccess) {
                        ((ProductImportProcessModel) processModel).setSyncSuccess(Boolean.TRUE);
                    } else {
                        ((ProductImportProcessModel) processModel).setFailMessage("Sync error. English catalog was not sync successful.");
                    }
                    modelService.save(processModel);
                }
                businessProcessService.triggerEvent(startCatalogSyncEvent.getProcessCode() + "_SynchronizationEnd");
            }
        } catch (InterruptedException | ExecutionException e) {
            if (StringUtils.isNotBlank(startCatalogSyncEvent.getProcessCode())) {
                BusinessProcessModel processModel = businessProcessService.getProcess(startCatalogSyncEvent.getProcessCode());
                if (processModel instanceof ProductImportProcessModel) {
                    ((ProductImportProcessModel) processModel).setSyncSuccess(Boolean.FALSE);
                    ((ProductImportProcessModel) processModel).setFailMessage("Sync exception in thread executor.");
                    modelService.save(processModel);
                }
                businessProcessService.triggerEvent(startCatalogSyncEvent.getProcessCode() + "_SynchronizationEnd");
            }
        }
    }

    @NotNull
    private Callable<PerformResult> createSyncTask(String catalogName) {
        return () -> {
            try {
                Registry.activateMasterTenant();
                JaloSession.getCurrentSession().activate();

                return setupSyncJobService.executeCatalogSyncJob(String.format("%sProductCatalog", catalogName));
            } finally {
                JaloSession.deactivate();
                Registry.unsetCurrentTenant();
            }
        };
    }
}
comments powered by Disqus