编程语言
首页 > 编程语言> > 将显式和隐式并行与Java-8流混合

将显式和隐式并行与Java-8流混合

作者:互联网

过去,我使用两个线程编写了一些Java程序.
第一个线程(生产者)正在从API(C库)中读取数据,创建一个Java对象,然后将该对象发送给另一个线程.
C API正在传递事件流(无限).
线程使用LinkedBlockingQueue作为交换对象(放置,轮询)的管道.
第二个线程(消费者)正在处理该对象.
(我还发现代码在线程内更易读.第一个线程处理C API的东西并产生
正确的Java对象,第二个线程不受C API处理,并且正在处理数据).

现在,我很感兴趣,如何通过Java 8中提供的新流API来实现上述方案.
但是假设我想保留两个线程(生产者/消费者)!
第一个线程正在写入流.第二个线程正在从流中读取.
我也希望,我可以用这种技术处理更好的显式并行性(生产者/消费者)
并且在流中我可以使用一些隐式并行性(例如stream.parallel()).

我对新的Stream API没有太多经验.
因此,我在下面尝试了以下代码,以解决上述问题.

>我使用’generate’访问C API并将其提供给java流.
>我在使用者线程.parallel()中使用了测试和处理隐式并行性.看起来不错但请参见下文.

问题:

>在这种情况下,“生成”对于生产者是最好的方法吗?
>我有一个理解问题,如何在生产者中终止/关闭流,
如果API有一些错误,并且我想关闭整个管道.
我是否使用stream.close或引发异常?

> 2.1我使用了stream.close().但是“生成”在关闭后仍在运行,
我发现仅引发异常以终止生成部分.
此异常进入流,并且消费者正在接收该异常
(这对我很好,消费者可以识别并终止).
但是在这种情况下,生产者生产了更多的产品,而消费者处理了更多的产品,而异常情况正在到来.
> 2.2,如果使用者使用隐式并行性stream.parallel().生产者正在处理更多物品.
因此,我看不到该问题的任何解决方案. (访问C API,检查错误,做出决定).
> 2.3在生产者中引发异常会到达消费者流,但并非所有插入的对象都会得到处理.

再说一遍:想法是与线程具有显式并行性.
但是在内部,我可以处理新功能并在可能的情况下使用并行处理

也感谢您繁殖这个问题.

package sandbox.test;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;

public class MyStream {
    private volatile LongStream stream = null;
    private AtomicInteger producerCount = new AtomicInteger(0);
    private AtomicInteger consumerCount = new AtomicInteger(0);
    private AtomicInteger apiError = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
    MyStream appl = new MyStream();
    appl.create();
    }

    private static void sleep(long sleep) {
    try {
        Thread.sleep(sleep);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    }

    private static void apiError(final String pos, final int iteration) {
    RuntimeException apiException = new RuntimeException("API error pos=" + pos + " iteration=" + iteration);
    System.out.println(apiException.getMessage());
    throw apiException;
    }

    final private int simulateErrorAfter = 10;

    private Thread produce() {
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
        System.out.println("Producer started");
        stream = LongStream.generate(() -> {
            int localCount;
            // Detect error, while using stream.parallel() processing
            int error = apiError.get();
            if ( error > 0 )
                apiError("1", error);
            // ----- Accessing the C API here -----
            localCount = producerCount.incrementAndGet(); // C API access; delegate for accessing the C API
            // ----- Accessing the C API here -----

            // Checking error code from C API
            if ( localCount > simulateErrorAfter ) { // Simulate an API error
                producerCount.decrementAndGet();
                stream.close();
                apiError("2", apiError.incrementAndGet());
            }
            System.out.println("P: " + localCount);
            sleep(200L);
            return localCount;
            });
        System.out.println("Producer terminated");
        }
    });
    thread.start();
    return thread;
    }

    private Thread consume() {
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
        try {
            stream.onClose(new Runnable() {
            @Override
            public void run() {
                System.out.println("Close detected");
            }
            }).parallel().forEach(l -> {
            sleep(1000);
            System.out.println("C: " + l);
            consumerCount.incrementAndGet();
            });
        } catch (Exception e) {
            // Capturing the stream end
            System.out.println(e);
        }
        System.out.println("Consumer terminated");
        }
    });
    thread.start();
    return thread;
    }

    private void create() throws InterruptedException {
    Thread producer = produce();
    while ( stream == null )
        sleep(10);
    Thread consumer = consume();
    producer.join();
    consumer.join();
    System.out.println("Produced: " + producerCount);
    System.out.println("Consumed: " + consumerCount);

    }
}

解决方法:

您需要了解有关Stream API的一些基本要点:

>应用于流的所有操作都是延迟操作,在应用终端操作之前不会执行任何操作.使用“生产者”线程创建流没有任何意义,因为该线程不会执行任何操作.所有操作都在您的“消费者”线程内执行,而后台线程由Stream实现本身启动.创建Stream实例的线程是完全不相关的
>关闭流与流操作本身无关,即不关闭线程.它旨在释放其他资源,例如关闭与Files.lines(…)返回的流关联的文件.您可以使用onClose安排此类清理操作,当您调用close时,Stream会调用它们,仅此而已.对于Stream类本身,它没有任何意义.
>流不会像“一个线程正在写入而另一个线程正在读取”这样的场景建模.他们的模型是“一个线程调用您的供应商,然后调用您的消费者,另一个线程执行相同的操作,另外x个线程也执行此操作……”

如果要使用不同的生产者线程和使用者线程来实现生产者/消费者方案,最好使用Threads或ExecutorService和线程安全队列.

但是您仍然可以使用Java 8功能.例如.无需使用内部类实现Runnable;您可以为它们使用lambda表达式.

标签:java-8,java-stream,multithreading,parallel-processing,java
来源: https://codeday.me/bug/20191120/2046211.html