ThreadPool for async non-blocking execution

A common requirement is to call third-party systems whenever a Hybris model changes. Hybris interceptors are a natural fit for this, but running long operations inside an interceptor can badly degrade performance and even cause data loss. To avoid that, such operations should be executed asynchronously in a separate thread.

However, code that accesses the database from a newly created thread fails with exceptions like this:

[2021/09/14 12:18:18.003] DEBUG [Thread-37] [ThreadExecutorUtils] Exception:
 java.lang.IllegalStateException: no tenant active. if you do not want to use tenants, call Registry.activateMasterTenant() to assure the Master tenant is active.
	at de.hybris.platform.core.Registry.getCurrentTenant(Registry.java:811) ~[coreserver.jar:?]
	at de.hybris.platform.jalo.JaloSession.getCurrentSession(JaloSession.java:837) ~[coreserver.jar:?]
	at de.hybris.platform.servicelayer.internal.model.impl.JaloPersistenceObject.readRawValue(JaloPersistenceObject.java:93) ~[coreserver.jar:?]
	at de.hybris.platform.servicelayer.internal.converter.impl.ItemModelConverter.readSingleAttribute(ItemModelConverter.java:1432) ~[coreserver.jar:?]
	at de.hybris.platform.servicelayer.internal.converter.impl.ItemAttributeProvider.getAttribute(ItemAttributeProvider.java:110) ~[coreserver.jar:?]
	at de.hybris.platform.servicelayer.model.ItemModelContextImpl.loadUnlocalizedAttribute(ItemModelContextImpl.java:285) ~[coreserver.jar:?]
	at de.hybris.platform.servicelayer.model.ItemModelContextImpl.getValue(ItemModelContextImpl.java:245) ~[coreserver.jar:?]
	at de.hybris.platform.servicelayer.model.ItemModelContextImpl.getPropertyValue(ItemModelContextImpl.java:261) ~[coreserver.jar:?]
	at de.hybris.platform.core.model.user.UserModel.getGigyaUid(UserModel.java:513) ~[models.jar:?]
	at com.gigya.login.utils.GigyaUserSearchService.findUserInGigya(GigyaUserSearchService.java:18) ~[classes/:?]
	at com.gigya.login.service.impl.DefaultGigyaProfileSyncService.syncIntoHybris(DefaultGigyaProfileSyncService.java:45) ~[classes/:?]
	at com.gigya.login.interceptors.SyncGigyaProfileIntoHybrisPrepareInterceptor.lambda$0(SyncGigyaProfileIntoHybrisPrepareInterceptor.java:21) ~[classes/:?]
	at com.blog.core.util.ThreadExecutorUtils.lambda$0(ThreadExecutorUtils.java:28) ~[classes/:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]

The root cause is that the newly created thread is not registered in the Hybris thread registry and has no Hybris context. The new thread therefore has to be registered and bound to the Hybris context. A utility class such as the following can do that and run any Runnable asynchronously in Hybris:

package com.blog.core.util;

import de.hybris.platform.core.Registry;
import de.hybris.platform.core.Tenant;
import de.hybris.platform.jalo.JaloSession;
import org.apache.log4j.Logger;

import static org.apache.log4j.Logger.getLogger;

public class ThreadExecutorUtils {

    private static final Logger LOG = getLogger(ThreadExecutorUtils.class);

    private ThreadExecutorUtils() {
        // ! Utils class must not be initialized!
    }

    public static void executeAsyncInHybrisContext(Runnable runnable) {
        final Tenant tenant = Registry.getCurrentTenantNoFallback();
        new Thread(() -> {
            try {
                // ! Used to activate tenant instead of Registry.activateMasterTenant()
                // ! We are passing tenant instead of activating master tenant to not mess up integration tests, which works in junit tenant.
                // ! Activating of master tenant could lead to randomly failed integration tests.
                Registry.setCurrentTenant(tenant);
                JaloSession.getCurrentSession().activate();

                runnable.run();

            } catch (Exception e) {
                LOG.warn(e.getMessage());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Exception:", e);
                }
            } finally {
                JaloSession.deactivate();
                Registry.unsetCurrentTenant();
            }
        }).start();
    }

}
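
Why the tenant is read in the calling thread and re-applied inside the worker can be illustrated without any Hybris classes. The sketch below assumes that the active tenant and session are thread-bound state, which the "no tenant active" error in a fresh thread suggests. The `CONTEXT` thread-local and the class name are hypothetical stand-ins, not Hybris API:

```java
import java.util.concurrent.atomic.AtomicReference;

// Plain-JDK illustration; CONTEXT is a hypothetical stand-in for the thread-bound tenant/session.
class ContextPropagationDemo {

    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Runs a worker WITHOUT propagating the context and returns what the worker saw.
    static String readInPlainThread() {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> seen.set(CONTEXT.get()));
        return runAndJoin(worker, seen);
    }

    // Captures the context in the caller, re-applies it in the worker and cleans up afterwards.
    static String readInContextAwareThread() {
        final String captured = CONTEXT.get(); // read in the caller, before the thread starts
        AtomicReference<String> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            CONTEXT.set(captured);
            try {
                seen.set(CONTEXT.get());
            } finally {
                CONTEXT.remove(); // always unbind, mirroring the finally block above
            }
        });
        return runAndJoin(worker, seen);
    }

    private static String runAndJoin(Thread worker, AtomicReference<String> seen) {
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        CONTEXT.set("junit");
        System.out.println(readInPlainThread());        // prints "null"
        System.out.println(readInContextAwareThread()); // prints "junit"
    }
}
```

The same reasoning explains why the utility class passes the caller's tenant instead of activating the master tenant: the worker ends up in whatever tenant the caller was in, including the junit tenant during integration tests.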

Creating new threads and binding them to and unbinding them from Hybris is expensive, so it makes sense to use a thread pool instead of creating a new thread each time. Out of the box (OOTB), Hybris provides a general-purpose thread pool that can be used to run tasks in the background:

package com.blog.core.util;

import de.hybris.platform.core.Registry;
import de.hybris.platform.util.threadpool.PoolableThread;
import de.hybris.platform.util.threadpool.ThreadPool;

public class ThreadExecutorUtils {

    private ThreadExecutorUtils() {
        // ! Utils class must not be initialized!
    }

    public static void executeAsyncInHybrisContext(Runnable runnable) {
        ThreadPool threadPool = Registry.getCurrentTenant().getThreadPool();
        // Or worker thread pool could be used -  Registry.getCurrentTenant().getWorkersThreadPool()
        PoolableThread poolableThread = threadPool.borrowThread();

        poolableThread.execute(runnable);
    }
}

The only issue with this approach is threadPool.borrowThread(), which can wait a long time for a thread to become available, or even fail with a pool-exhausted exception. Such behaviour is not acceptable inside interceptors. A better option is to put tasks in a queue instead of borrowing free threads, and ThreadPoolTaskExecutor from the Spring Framework is a good fit for that.
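
The difference between borrowing a thread and queueing a task can be sketched with the plain JDK `ThreadPoolExecutor`, which Spring's ThreadPoolTaskExecutor wraps. The class and method names below are hypothetical. The example deliberately blocks the only worker to show that further submissions return immediately and simply wait in the queue:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueingDemo {

    // Submits `tasks` no-op tasks while the single worker is busy and
    // returns how many of them are waiting in the queue afterwards.
    static int queuedWhileWorkerIsBusy(int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch workerStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy the only worker thread until it is released.
            executor.execute(() -> {
                workerStarted.countDown();
                awaitQuietly(release);
            });
            awaitQuietly(workerStarted);

            // None of these calls block: each task is placed in the queue.
            for (int i = 0; i < tasks; i++) {
                executor.execute(() -> { });
            }
            return executor.getQueue().size();
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(queuedWhileWorkerIsBusy(5)); // prints "5"
    }
}
```

The caller, here standing in for an interceptor, is never held up by a busy pool; the cost is that queued tasks may start late when the pool is undersized.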

To create a ThreadPoolTaskExecutor, a ThreadFactory that creates threads with the Hybris context must be provided. The OOTB de.hybris.platform.core.TenantAwareThreadFactory does that, but the names of the threads it generates cannot be overridden through ThreadPoolTaskExecutor. To assign custom names to the pool's threads, a custom thread factory is required, for example:

package com.blog.core.concurrent;

import de.hybris.platform.core.Tenant;
import de.hybris.platform.core.TenantAwareThreadFactory;

import java.util.concurrent.atomic.AtomicInteger;

public class TenantAwareNamedThreadFactory extends TenantAwareThreadFactory {

    public static final String MIDDLE_PART_OF_THREAD_NAME = "-async-";
    private final String threadNamePrefix;

    private final AtomicInteger threadCount = new AtomicInteger(0);

    public TenantAwareNamedThreadFactory(String threadNamePrefix, Tenant tenant) {
        super(tenant);
        this.threadNamePrefix = threadNamePrefix;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = super.newThread(runnable);
        thread.setName(this.nextThreadName());
        return thread;
    }

    protected String nextThreadName() {
        int newThreadCount = this.threadCount.incrementAndGet();
        if (newThreadCount < 0) {
            this.threadCount.set(0);
            return this.threadNamePrefix + MIDDLE_PART_OF_THREAD_NAME + this.threadCount.incrementAndGet();
        }
        return this.threadNamePrefix + MIDDLE_PART_OF_THREAD_NAME + newThreadCount;
    }

}

Now the thread pool can be instantiated in Spring:

<bean id="gigyaTenantAwareThreadFactory" class="com.blog.core.concurrent.TenantAwareNamedThreadFactory">
    <constructor-arg name="threadNamePrefix" value="gigya"/>
    <constructor-arg name="tenant" ref="tenantFactory"/>
</bean>

<bean id="gigyaAsyncThreadPoolExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="threadFactory" ref="gigyaTenantAwareThreadFactory"/>
    <property name="corePoolSize" value="25"/>
    <property name="maxPoolSize" value="60"/>
    <property name="waitForTasksToCompleteOnShutdown" value="true"/>
</bean>
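
One detail of this configuration is worth checking: the underlying `java.util.concurrent.ThreadPoolExecutor` only creates threads beyond corePoolSize when its queue is full, and ThreadPoolTaskExecutor uses an effectively unbounded queue unless `queueCapacity` is set. With the settings above the pool would therefore be expected to stay at 25 threads. The plain-JDK sketch below (hypothetical class and method names) shows the effect with smaller numbers:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {

    // Submits `tasks` blocking tasks and returns the number of threads the pool created for them.
    static int poolSizeUnderLoad(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                core, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                // Every task blocks, so running workers never drain the queue during the measurement.
                executor.execute(() -> awaitQuietly(release));
            }
            return executor.getPoolSize();
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Unbounded queue: the pool never grows past the core size.
        System.out.println(poolSizeUnderLoad(2, 4, Integer.MAX_VALUE, 10)); // prints "2"
        // Bounded queue of 2: once it is full, extra threads are created up to max.
        System.out.println(poolSizeUnderLoad(2, 4, 2, 6));                  // prints "4"
    }
}
```

If bursts really should scale up to maxPoolSize, a finite `queueCapacity` property on the bean is one way to get there; the right value depends on the load and is not something the original configuration specifies.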

Example of thread pool usage:

package com.blog.core.concurrent;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.annotation.Resource;

public class ThreadExecutorHelper {

    @Resource(name = "gigyaAsyncThreadPoolExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    public void executeAsync(Runnable runnable) {
        threadPoolTaskExecutor.submit(runnable);
    }
}
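
One thing to keep in mind with `submit(runnable)`: it returns a Future, and an exception thrown by the task is stored in that Future rather than reported anywhere. Since the helper above discards the Future, failures would pass silently. A minimal sketch of one way to handle this is to wrap the task so it reports its own failure. The `LOGGED` list is a hypothetical stand-in for a real logger, and the class and method names are made up for illustration:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class LoggingRunnableDemo {

    // Stand-in for a logger so the example stays self-contained.
    static final List<String> LOGGED = new CopyOnWriteArrayList<>();

    // Wraps a task so that any runtime failure is recorded before it disappears into the Future.
    static Runnable logFailures(Runnable delegate) {
        return () -> {
            try {
                delegate.run();
            } catch (RuntimeException e) {
                LOGGED.add("Async task failed: " + e.getMessage());
                throw e;
            }
        };
    }

    // Submits the wrapped task and reports whether the Future completed exceptionally.
    static boolean submitAndCheckFailure(Runnable task) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executor.submit(logFailures(task));
            future.get();
            return false;
        } catch (ExecutionException e) {
            return true; // the exception surfaces only when somebody calls get()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean failed = submitAndCheckFailure(() -> { throw new IllegalStateException("boom"); });
        System.out.println(failed); // prints "true"
        System.out.println(LOGGED); // prints "[Async task failed: boom]"
    }
}
```

In the helper, the same wrapper could be applied inside executeAsync before calling submit, so that callers do not have to remember it.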

It is also quite common to have more than one thread pool for different types of tasks. ThreadExecutorHelper can be extended to work with multiple thread pools. First, let's introduce an enum to differentiate the thread pools:

package com.blog.core.concurrent;

public enum ThreadPoolType {DEFAULT, GIGYA}

The DEFAULT thread pool type provides a general-purpose async thread pool that serves as a fallback. A map from each type to its thread pool is also created:

<bean id="blogTenantAwareThreadFactory" class="com.blog.core.concurrent.TenantAwareNamedThreadFactory">
    <constructor-arg name="threadNamePrefix" value="blog"/>
    <constructor-arg name="tenant" ref="tenantFactory"/>
</bean>

<bean id="singleThreadWithUnlimitedQueueExecutor"
      class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="1"/>
    <property name="maxPoolSize" value="1"/>
    <property name="threadFactory" ref="blogTenantAwareThreadFactory"/>
</bean>

<util:map id="asyncThreadPoolMap" key-type="com.blog.core.concurrent.ThreadPoolType">
    <entry key="DEFAULT" value-ref="singleThreadWithUnlimitedQueueExecutor"/>
    <entry key="GIGYA" value-ref="gigyaAsyncThreadPoolExecutor"/>
</util:map>

After that, ThreadExecutorHelper can be extended to support multiple thread pools:

package com.blog.core.concurrent;

import org.apache.log4j.Logger;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.annotation.Resource;
import java.util.Map;

import static org.apache.log4j.Logger.getLogger;

public class ThreadExecutorHelper {

    private static final Logger LOG = getLogger(ThreadExecutorHelper.class);

    @Resource
    private Map<ThreadPoolType, ThreadPoolTaskExecutor> asyncThreadPoolMap;

    public void executeAsync(Runnable runnable) {
        executeAsync(ThreadPoolType.DEFAULT, runnable);
    }

    public void executeAsync(ThreadPoolType threadPoolType, Runnable runnable) {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = resolveThreadPool(threadPoolType);

        if (threadPoolTaskExecutor != null) {
            threadPoolTaskExecutor.submit(runnable);
        } else {
            String errorMessage = threadPoolType + " thread pool type is not assigned to real thread pool. Task can't be executed.";
            LOG.error("Exception:", new Throwable(errorMessage));
        }
    }

    private ThreadPoolTaskExecutor resolveThreadPool(ThreadPoolType threadPoolType) {
        if (threadPoolType == null) {
            return asyncThreadPoolMap.get(ThreadPoolType.DEFAULT);
        }

        return asyncThreadPoolMap.get(threadPoolType);
    }

}