其他分享
首页 > 其他分享> > WebFlux block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http

WebFlux block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http

作者:互联网

block()报错

WebFlux中,如果是Mono/Flux.map()或者其他方法体是属于NonBlocking线程,如果在NonBlocking线程中再调用阻塞方法(block()等)会报错

static final class NonBlockingThread extends Thread implements NonBlocking {

        public NonBlockingThread(Runnable target, String name) {
            super(target, name);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread th = new NonBlockingThread(new Runnable() {
            @Override
            public void run() {
                if (Thread.currentThread() instanceof  NonBlocking){
                    System.out.println("====");
                }

                System.out.println(Mono.create(
                        monoSink -> {
                            monoSink.success("sasa");
                        }
                ).block());
            }
        }, "sasa");
        th.start();

        Thread.sleep(5000L);
    }

修复方式

用JUC,如果是webflux 编程可以使用Mono.delay + publishOn(Schedulers.boundedElastic())

public static void main(String[] args) throws InterruptedException {
        Thread th = new NonBlockingThread(new Runnable() {
            @Override
            public void run() {
                if (Thread.currentThread() instanceof  NonBlocking){
                    System.out.println("====");
                }
                CountDownLatch countDownLatch = new CountDownLatch(1);
                AtomicReference reference = new AtomicReference<>();
                Mono.create(
                        monoSink -> {
                            monoSink.success("sasa");
                        }
                ).publishOn(Schedulers.boundedElastic()).subscribe(
                       s->{
                           reference.set(s);
                           countDownLatch.countDown();
                       }
                );
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(reference.get());
            }
        }, "sasa");
        th.start();

        Thread.sleep(5000L);
    }

 

标签:http,reactor,thread,Thread,System,NonBlocking,monoSink,new,public
来源: https://blog.csdn.net/u013517141/article/details/114844918