编程语言
首页 > 编程语言> > java-TomEE在太多@Asynchronous操作上窒息

java-TomEE在太多@Asynchronous操作上窒息

作者:互联网

我使用的是预定义的HSQLDB,几乎是开箱即用的Apache TomEE 1.5.2 JAX-RS.

以下是简化的代码.我有一个用于接收信号的REST风格的接口:

@Stateless
@Path("signal")
public class SignalEndpoint {
    @Inject
    private SignalStore store;

    @POST
    public void post() {
        store.createSignal();
    }
}

接收信号会触发很多东西.商店将创建一个实体,然后触发一个异步事件.

public class SignalStore {
    @PersistenceContext
    private EntityManager em;

    @EJB
    private EventDispatcher dispatcher;

    @Inject
    private Event<SignalEntity> created;

    public void createSignal() {
        SignalEntity entity = new SignalEntity();
        em.persist(entity);
        dispatcher.fire(created, entity);
    }
}

调度程序非常简单,仅用于使事件处理异步.

@Stateless
public class EventDispatcher {
    @Asynchronous
    public <T> void fire(Event<T> event, T parameter) {
        event.fire(parameter);
    }
}

接收事件是另外一回事,它从信号中获取数据,进行存储,然后触发另一个异步事件:

@Stateless
public class DerivedDataCreator {
    @PersistenceContext
    private EntityManager em;

    @EJB
    private EventDispatcher dispatcher;

    @Inject
    private Event<DerivedDataEntity> created;

    @Asynchronous
    public void onSignalEntityCreated(@Observes SignalEntity signalEntity) {
        DerivedDataEntity entity = new DerivedDataEntity(signalEntity);
        em.persist(entity);
        dispatcher.fire(created, entity);
    }
}

对此的回应甚至是实体创建的第三层.

总而言之,我有一个REST调用,该调用同步创建一个SignalEntity,该异步触发一个DerivedDataEntity的创建,该DerivedDataEntity异步触发一个第三种类型的实体的创建.一切都完美运行,并且存储过程完美解耦.

除了我以编程方式在for循环中触发大量(例如1000)信号时.根据我的AsynchronousPool大小,在处理信号(非常快)约为该大小的一半之后,该应用程序完全冻结长达几分钟.然后恢复,以相当快的速度处理大约相同数量的信号,然后再次冻结.

在过去的半小时内,我一直在使用AsynchronousPool设置.例如,将其设置为2000,可以轻松地一次处理所有信号,而不会冻结.但是之后,系统也不是理智的.触发另外1000个信号,导致可以正常创建它们,但是从未创建完整的派生数据.

现在我完全不知所措.我当然可以摆脱所有这些异步事件并自己实现某种队列,但是我一直认为EE容器的目的是让我摆脱这种乏味.异步EJB事件应该已经拥有了自己的队列机制.当队列太满时不应冻结的一种.

有任何想法吗?

更新:

我现在已经用1.6.0-SNAPSHOT尝试过.它的行为略有不同:它仍然无法正常工作,但我确实遇到了一个例外:

Aug 01, 2013 3:12:31 PM org.apache.openejb.core.transaction.EjbTransactionUtil handleSystemException
SEVERE: EjbTransactionUtil.handleSystemException: fail to allocate internal resource to execute the target task
javax.ejb.EJBException: fail to allocate internal resource to execute the target task
    at org.apache.openejb.async.AsynchronousPool.invoke(AsynchronousPool.java:81)
    at org.apache.openejb.core.ivm.EjbObjectProxyHandler.businessMethod(EjbObjectProxyHandler.java:240)
    at org.apache.openejb.core.ivm.EjbObjectProxyHandler._invoke(EjbObjectProxyHandler.java:86)
    at org.apache.openejb.core.ivm.BaseEjbProxyHandler.invoke(BaseEjbProxyHandler.java:303)
    at <<... my code ...>>
    ...
Caused by: java.util.concurrent.RejectedExecutionException: Timeout waiting for executor slot: waited 30 seconds
    at org.apache.openejb.util.executor.OfferRejectedExecutionHandler.rejectedExecution(OfferRejectedExecutionHandler.java:55)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:132)
    at org.apache.openejb.async.AsynchronousPool.invoke(AsynchronousPool.java:75)
    ... 38 more

好像TomEE不会对任何操作进行排队.如果在调用时没有线程可以自由处理,那么祝您好运.当然,这不是故意的.

更新2:

好的,我似乎偶然发现了一个半解决方案:将AsynchronousPool.QueueSize属性设置为maxint可以解决冻结问题.但是问题仍然存在:为什么首先要限制QueueSize,更令人担忧的是:为什么这会阻塞整个应用程序?如果队列已满,它将阻塞,但是一旦执行了任务,就应该弹出另一个任务,对吗?队列似乎被阻塞,直到再次完全清空为止.

更新3:

对于任何想去的人:http://github.com/JanDoerrenhaus/tomeefreezetestcase

更新4:

事实证明,增加队列大小并不能解决问题,只会延迟它.问题仍然存在:一次异步操作太多,TomEE太糟糕了,甚至在终止时也无法取消部署该应用程序.

到目前为止,我的诊断是任务清理无法正常进行.我的任务非常小而快速(请参见test case on github).我已经担心OpenJPA或HSQLDB在进行太多并发调用时会变慢,但是我注释掉了所有em.persist调用,问题仍然存在.因此,如果我的任务很小而且很快速,但是仍然设法将TomEE拦截得很糟糕,以至于在30秒后它无法再执行任何其他任务(javax.ejb.EJBException:无法分配内部资源来执行目标任务),我可以想象完成的任务会持续存在,堵塞管道.

我该如何解决这个问题?

解决方法:

基本上,BlockingQueues使用锁来确保数据的一致性并避免数据丢失,因此在并发度很高的环境中,它将拒绝很多任务(在您的情况下).

您可以使用RejectedExecutionHandler实现在中继上播放,以重试提供该任务.一种实现可以是:

new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
            for (int i = 0; i < 10; i++) {
                if (executor.getQueue().offer(r)) {
                    return;
                }

                try {
                    Thread.sleep(50);
                } catch (final InterruptedException e) {
                    // no-op
                }
            }
            throw new RejectedExecutionException();
        }
    }

它甚至可以在随机睡眠(最小和最大)之间更好地工作.

这个想法基本上是:如果队列已满,请等待一小段时间以减少并发.

标签:asynchronous,ejb,tomee,java
来源: https://codeday.me/bug/20191123/2063884.html