Angepasster Thread-Pool in parallelem Java 8-Stream

Ist es möglich, einen benutzerdefinierten Thread-Pool für den parallelen Java-8- Stream anzugeben? Ich kann es nirgendwo finden.

Stellen Sie sich vor, ich hätte eine Serveranwendung und möchte parallele Streams verwenden. Aber die Anwendung ist groß und multi-threaded, also möchte ich sie teilen. Ich möchte keine langsam laufende Aufgabe in einem Modul der Anwendungsblockaufgaben von einem anderen Modul.

Wenn ich verschiedene Thread-Pools für verschiedene Module nicht verwenden kann, bedeutet dies, dass ich in den meisten Situationen in der realen Welt keine parallelen Streams verwenden kann.

Versuchen Sie das folgende Beispiel. Es gibt einige CPU-intensive Aufgaben, die in separaten Threads ausgeführt werden. Die Aufgaben nutzen parallele Ströme. Die erste Aufgabe ist unterbrochen, so dass jeder Schritt 1 Sekunde dauert (simuliert durch Thread-Schlaf). Das Problem besteht darin, dass andere Threads blockiert bleiben und warten, bis die errorshafte Aufgabe abgeschlossen ist. Dies ist ein künstliches Beispiel, aber stellen Sie sich eine Servlet-App vor und jemanden, der eine lange laufende Aufgabe an den gemeinsamen Fork-Join-Pool abgibt.

public class ParallelTest { public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newCachedThreadPool(); es.execute(() -> runTask(1000)); //incorrect task es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.shutdown(); es.awaitTermination(60, TimeUnit.SECONDS); } private static void runTask(int delay) { range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max() .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max)); } public static boolean isPrime(long n) { return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0); } } 

    Es gibt tatsächlich einen Trick, wie man eine parallele Operation in einem bestimmten Fork-Join-Pool ausführt. Wenn Sie es als Aufgabe in einem Fork-Join-Pool ausführen, bleibt es dort und verwendet nicht das gemeinsame.

     ForkJoinPool forkJoinPool = new ForkJoinPool(2); forkJoinPool.submit(() -> //parallel task here, for example IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()) ).get(); 

    Der Trick basiert auf ForkJoinTask.fork, das spezifiziert: “Sorgt dafür, diese Aufgabe in dem Pool, in dem die aktuelle Aufgabe läuft, asynchron auszuführen, falls zutreffend, oder unter Verwendung von ForkJoinPool.commonPool (), wenn nicht inForkJoinPool ()”

    Die parallelen Streams verwenden den standardmäßigen ForkJoinPool.commonPool der standardmäßig einen Thread weniger als processoren hat , wie von Runtime.getRuntime().availableProcessors() (Dies bedeutet, dass parallele Streams alle Ihre processoren verwenden, da sie auch den Hauptthread verwenden) ):

    Für Anwendungen, die separate oder benutzerdefinierte Pools erfordern, kann ein ForkJoinPool mit einer bestimmten Zielparallelitätsebene erstellt werden. Standardmäßig entspricht die Anzahl der verfügbaren processoren.

    Dies bedeutet auch, dass wenn Sie parallele Streams verschachtelt haben oder mehrere parallele Streams gleichzeitig gestartet wurden, sie sich alle den gleichen Pool teilen . Vorteil: Sie werden nie mehr als den Standardwert (Anzahl der verfügbaren processoren) verwenden. Nachteil: Sie erhalten möglicherweise nicht alle processoren, die jedem von Ihnen initiierten parallelen Stream zugeordnet sind (falls Sie mehrere haben). (Anscheinend können Sie einen ManagedBlocker verwenden , um das zu umgehen.)

    Um die Art zu ändern, in der parallele Streams ausgeführt werden, können Sie entweder

    • Übermitteln yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get(); die parallele Stream-Ausführung an Ihren eigenen ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get(); oder
    • Sie können die Größe des allgemeinen Pools mithilfe der Systemeigenschaften ändern: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") für eine System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") von 20 Threads.

    Beispiel des Letzteren auf meiner Maschine, die 8 processoren hat. Wenn ich folgendes Programm starte:

     long start = System.currentTimeMillis(); IntStream s = IntStream.range(0, 20); //System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); s.parallel().forEach(i -> { try { Thread.sleep(100); } catch (Exception ignore) {} System.out.print((System.currentTimeMillis() - start) + " "); }); 

    Die Ausgabe ist:

    215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

    Sie können also sehen, dass der parallele Stream 8 Elemente gleichzeitig verarbeitet, dh 8 Threads verwendet. Wenn ich die kommentierte Zeile jedoch auskommentiere, lautet die Ausgabe:

    215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

    Dieses Mal hat der parallele Stream 20 Threads verwendet und alle 20 Elemente im Stream wurden gleichzeitig verarbeitet.

    Alternativ zum Trick, die parallele Berechnung in Ihrem eigenen forkJoinPool auszulösen, können Sie diesen Pool auch an die CompletableFuture.supplyAsync-Methode übergeben:

     ForkJoinPool forkJoinPool = new ForkJoinPool(2); CompletableFuture> primes = CompletableFuture.supplyAsync(() -> //parallel task here, for example range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), forkJoinPool ); 

    Wenn Sie ForkJoinPool verwenden und für einen parallelen Stream senden, werden nicht alle Threads zuverlässig verwendet. Wenn Sie sich das ansehen ( paralleler Strom von einem HashSet läuft nicht parallel ) und dies ( Warum verwendet der parallele Strom nicht alle Threads von ForkJoinPool? ), Sehen Sie die Argumentation.

    Kurze Version: Wenn ForkJoinPool / submit für Sie nicht funktioniert, verwenden Sie

     System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10"); 

    Bis jetzt habe ich die in den Antworten dieser Frage beschriebenen Lösungen verwendet. Jetzt habe ich eine kleine Bibliothek mit dem Namen Parallel Stream Support entwickelt :

     ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS); ParallelIntStreamSupport.range(1, 1_000_000, pool) .filter(PrimesPrint::isPrime) .collect(toList()) 

    Aber wie @PabloMatiasGomez in den Kommentaren darauf hingewiesen hat, gibt es Nachteile bezüglich des Splitting-Mechanismus paralleler Streams, die stark von der Größe des gemeinsamen Pools abhängen. Siehe Parallel-Stream von einem HashSet wird nicht parallel ausgeführt .

    Ich verwende diese Lösung nur, um separate Pools für verschiedene Arten von Arbeit zu haben, aber ich kann die Größe des gemeinsamen Pools nicht auf 1 festlegen, auch wenn ich sie nicht verwende.

    Um die tatsächliche Anzahl der verwendeten Threads zu messen, können Sie Thread.activeCount() überprüfen:

      Runnable r = () -> IntStream .range(-42, +42) .parallel() .map(i -> Thread.activeCount()) .max() .ifPresent(System.out::println); ForkJoinPool.commonPool().submit(r).join(); new ForkJoinPool(42).submit(r).join(); 

    Dies kann auf einer 4-core-CPU eine Ausgabe erzeugen wie:

     5 // common pool 23 // custom pool 

    Ohne .parallel() gibt es:

     3 // common pool 4 // custom pool 

    Geh zu AbacusUtil . Die Thread-Nummer kann für den parallelen Stream angegeben werden. Hier ist der Beispielcode:

     LongStream.range(4, 1_000_000).parallel(threadNum)... 

    Disclosure: Ich bin der Entwickler von AbacusUtil.

    Wenn es Ihnen nichts ausmacht, eine Bibliothek eines Drittanbieters zu verwenden, können Sie mit cyclops- reactive sequentielle und parallele Streams innerhalb derselben Pipeline mischen und benutzerdefinierte ForkJoinPools bereitstellen. Beispielsweise

      ReactiveSeq.range(1, 1_000_000) .foldParallel(new ForkJoinPool(10), s->s.filter(i->true) .peek(i->System.out.println("Thread " + Thread.currentThread().getId())) .max(Comparator.naturalOrder())); 

    Oder wenn wir die Verarbeitung in einem sequentiellen Stream fortsetzen möchten

      ReactiveSeq.range(1, 1_000_000) .parallel(new ForkJoinPool(10), s->s.filter(i->true) .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))) .map(this::processSequentially) .forEach(System.out::println); 

    [Disclosure Ich bin der führende Entwickler von Cyclops-reagieren]

    Ich habe den benutzerdefinierten ForkJoinPool wie folgt versucht, um die Poolgröße anzupassen:

     private static Set ThreadNameSet = new HashSet<>(); private static Callable getSum() { List aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList()); return () -> aList.parallelStream() .peek((i) -> { String threadName = Thread.currentThread().getName(); ThreadNameSet.add(threadName); }) .reduce(0L, Long::sum); } private static void testForkJoinPool() { final int parallelism = 10; ForkJoinPool forkJoinPool = null; Long result = 0L; try { forkJoinPool = new ForkJoinPool(parallelism); result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { if (forkJoinPool != null) { forkJoinPool.shutdown(); //always remember to shutdown the pool } } out.println(result); out.println(ThreadNameSet); } 

    Hier ist die Ausgabe, die besagt, dass der Pool mehr Threads als die Standard 4 verwendet .

     50000005000000 [ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2] 

    Aber eigentlich gibt es einen Spinner , als ich mit ThreadPoolExecutor das gleiche Ergebnis wie folgt erreichen wollte:

     BlockingDeque blockingDeque = new LinkedBlockingDeque(1000); ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread")); 

    aber ich habe versagt.

    Es wird nur den ParallelStream in einem neuen Thread starten und dann ist alles andere gleich, was wiederum beweist, dass der parallelStream den ForkJoinPool verwenden wird , um seine untergeordneten Threads zu starten.

    Hinweis: Es scheint ein Fix in JDK 10 implementiert zu sein, der sicherstellt, dass der benutzerdefinierte Thread-Pool die erwartete Anzahl von Threads verwendet.

    Die Ausführung paralleler Streams in einem benutzerdefinierten ForkJoinPool sollte der Parallelität https://bugs.openjdk.java.net/browse/JDK-8190974 folgen