Commit 9f7c33c6 by Patryk Czarnik

parallel stream

parent f57e749f
package parallel_stream.a;
import java.util.Arrays;
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;
public class SprawdzanieIleWatkow1 {
public static void main(String[] args) {
IntUnaryOperator operacja = x -> {
System.out.println(Thread.currentThread().getId());
return 2*x;
};
int[] tab = new int[100];
Arrays.fill(tab, 33);
System.out.println("Sekwencyjnie:");
int suma1 = IntStream.of(tab).map(operacja).sum();
System.out.println("suma1 = " + suma1);
System.out.println();
System.out.println("Parallel:");
int suma2 = IntStream.of(tab).parallel().map(operacja).sum();
System.out.println("suma2 = " + suma2);
}
}
package parallel_stream.a;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;
public class SprawdzanieIleWatkow2 {
public static void main(String[] args) {
AtomicIntegerArray array = new AtomicIntegerArray(50);
IntUnaryOperator operacja = x -> {
int id = (int)Thread.currentThread().getId();
array.incrementAndGet(id);
return 2*x;
};
int[] tab = new int[1_000_000];
Arrays.fill(tab, 33);
System.out.println("Tablica wątków na początku:");
System.out.println(array);
int suma2 = IntStream.of(tab).parallel().map(operacja).sum();
System.out.println("suma2 = " + suma2);
System.out.println();
System.out.println("Tablica wątków na końcu:");
System.out.println(array);
int ileWatkow = 0;
for (int i = 0; i < array.length(); i++) {
if(array.get(i) > 0) {
ileWatkow++;
System.out.printf("wątek nr %3d - %8d razy\n", i, array.get(i));
}
}
System.out.println();
System.out.println("W sumie pracowało " + ileWatkow + " wątków.");
System.out.println("Ilość procesorów: " +
Runtime.getRuntime().availableProcessors());
}
}
package parallel_stream.a;
import java.util.Random;
import java.util.function.LongSupplier;
import java.util.stream.LongStream;
public class SumArray {
static final int N = 120_000_000;
static final int MAX = 1000;
static final int POWTORZENIA = 40;
static long[] tab;
public static void main(String[] args) {
System.out.println("Generuje dane");
tab = new long[N];
wylosuj();
System.out.println("pętla");
testuj(SumArray::petla);
System.out.println("stream");
testuj(SumArray::sekw);
System.out.println("parallel stream");
testuj(SumArray::parallel);
System.out.println();
System.out.println("pętla");
testuj(POWTORZENIA, SumArray::petla);
System.out.println("stream");
testuj(POWTORZENIA, SumArray::sekw);
System.out.println("parallel stream");
testuj(POWTORZENIA, SumArray::parallel);
}
private static void testuj(LongSupplier metoda) {
long start = System.currentTimeMillis();
long wynik = metoda.getAsLong();
long end = System.currentTimeMillis();
System.out.printf("czas: %6d, wynik: %15d\n", end - start, wynik);
}
private static void testuj(int n, LongSupplier metoda) {
long start = System.currentTimeMillis();
long wynik = 0L;
for(int i = 1; i <= n; i++) {
wynik += metoda.getAsLong();
}
long end = System.currentTimeMillis();
System.out.printf("czas: %6d, wynik: %15d\n", end - start, wynik);
}
private static void wylosuj() {
Random r = new Random();
for (int i = 0; i < tab.length; i++) {
tab[i] = r.nextInt(MAX);
}
}
private static long petla() {
long suma = 0L;
for (int i = 0; i < tab.length; i++) {
suma += tab[i];
}
return suma;
}
private static long sekw() {
return LongStream.of(tab).sum();
}
private static long parallel() {
return LongStream.of(tab).parallel().sum();
}
}
package parallel_stream.b_spliterator;
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
public class ArrayListSplit {
public static void main(String[] args) {
List<String> lista = new ArrayList<>();
lista.add("aaa");
lista.add("bbb");
lista.add("ccc");
lista.add("ddd");
lista.add("eee");
lista.add("fff");
lista.add("ggg");
lista.add("hhh");
lista.add("iii");
lista.add("jjj");
System.out.println(lista);
Consumer<String> akcja = s -> System.out.println("* " + s);
System.out.println("Pętla tryAdvance:");
Spliterator<String> spliterator1 = lista.spliterator();
while(spliterator1.tryAdvance(akcja));
System.out.println();
System.out.println("forEachRemaining:");
Spliterator<String> spliterator2 = lista.spliterator();
spliterator2.forEachRemaining(akcja);
System.out.println();
System.out.println("trySplit 2 poziomy:");
Spliterator<String> spliterator3 = lista.spliterator();
System.out.println("estimate size: " + spliterator3.estimateSize());
Spliterator<String> spliterator3a = spliterator3.trySplit();
System.out.println("Oryginalny:");
spliterator3.forEachRemaining(akcja);
System.out.println("wynik pierwszego split:");
spliterator3a.forEachRemaining(akcja);
System.out.println();
System.out.println("trySplit 3 poziomy:");
Spliterator<String> spliterator4 = lista.spliterator();
System.out.println("estimate size: " + spliterator4.estimateSize());
Spliterator<String> spliterator4a = spliterator4.trySplit();
Spliterator<String> spliterator4b = spliterator4.trySplit();
System.out.println("Oryginalny:");
spliterator4.forEachRemaining(akcja);
System.out.println("wynik pierwszego split:");
spliterator4a.forEachRemaining(akcja);
System.out.println("wynik drugiego split:");
spliterator4b.forEachRemaining(akcja);
System.out.println();
}
}
package parallel_stream.b_spliterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
// Morał: LinkedList nie dzieli się na fragmenty, więc operacje nie będą zrównoleglane
public class LinkedListSplit {
public static void main(String[] args) {
List<String> lista = new LinkedList<>();
lista.add("aaa");
lista.add("bbb");
lista.add("ccc");
lista.add("ddd");
lista.add("eee");
lista.add("fff");
lista.add("ggg");
lista.add("hhh");
lista.add("iii");
lista.add("jjj");
System.out.println(lista);
Consumer<String> akcja = s -> System.out.println("* " + s);
System.out.println("Pętla tryAdvance:");
Spliterator<String> spliterator1 = lista.spliterator();
while(spliterator1.tryAdvance(akcja));
System.out.println();
System.out.println("forEachRemaining:");
Spliterator<String> spliterator2 = lista.spliterator();
spliterator2.forEachRemaining(akcja);
System.out.println();
System.out.println("trySplit 2 poziomy:");
Spliterator<String> spliterator3 = lista.spliterator();
System.out.println("estimate size: " + spliterator3.estimateSize());
Spliterator<String> spliterator3a = spliterator3.trySplit();
System.out.println("Oryginalny:");
spliterator3.forEachRemaining(akcja);
System.out.println("wynik pierwszego split:");
spliterator3a.forEachRemaining(akcja);
System.out.println();
System.out.println("trySplit 3 poziomy:");
Spliterator<String> spliterator4 = lista.spliterator();
System.out.println("estimate size: " + spliterator4.estimateSize());
Spliterator<String> spliterator4a = spliterator4.trySplit();
Spliterator<String> spliterator4b = spliterator4.trySplit();
System.out.println("4a " + spliterator4a);
System.out.println("4b " + spliterator4b); // null
System.out.println("Oryginalny:");
spliterator4.forEachRemaining(akcja);
System.out.println("wynik pierwszego split:");
spliterator4a.forEachRemaining(akcja);
// NPE
// System.out.println("wynik drugiego split:");
// spliterator4b.forEachRemaining(akcja);
System.out.println();
}
}
package parallel_stream.b_spliterator;
import java.util.function.LongSupplier;
public class MierzenieCzasu {
public static void uruchom(LongSupplier metoda) {
long start = System.nanoTime();
long wynik = metoda.getAsLong();
long end = System.nanoTime();
System.out.printf("czas: %12d , wynik = %d\n", (end - start) / 1000, wynik);
}
public static void uruchom(int n, LongSupplier metoda) {
long start = System.currentTimeMillis();
long wynik = 0L;
for(int i = 1; i <= n; i++) {
wynik += metoda.getAsLong();
}
long end = System.currentTimeMillis();
System.out.printf("czas: %6d, wynik: %15d\n", end - start, wynik);
}
}
package parallel_stream.b_spliterator;
import java.util.Spliterator;
import java.util.function.Consumer;
public class SpliteratorLiczbyNieparzyste implements Spliterator<Integer> {
private int max;
private int min;
// domyślnie 100 liczb
public SpliteratorLiczbyNieparzyste() {
this(100);
}
public SpliteratorLiczbyNieparzyste(int ile) {
this(0, ile);
}
private SpliteratorLiczbyNieparzyste(int min, int max) {
this.min = min;
this.max = max;
}
private int next() {
return 1 + 2 * min++;
}
private boolean hasNext() {
return min < max;
}
@Override
public boolean tryAdvance(Consumer<? super Integer> action) {
if(hasNext()) {
action.accept(this.next());
return true;
} else {
return false;
}
}
@Override
public SpliteratorLiczbyNieparzyste trySplit() {
int middle = (max + min) / 2;
SpliteratorLiczbyNieparzyste nowy = new SpliteratorLiczbyNieparzyste(min, middle);
min = middle;
return nowy;
}
@Override
public long estimateSize() {
return max - min;
}
@Override
public int characteristics() {
return ORDERED | DISTINCT | SIZED | SUBSIZED | NONNULL;
}
}
package parallel_stream.b_spliterator;
import java.util.Spliterator;
import java.util.function.Consumer;
public class SpliteratorPowolny implements Spliterator<Integer> {
private int max;
private int min;
private int niepotrzebne;
private final int SPOWOLNIENIE = 10000;
// domyślnie 100 liczb
public SpliteratorPowolny() {
this(100);
}
public SpliteratorPowolny(int ile) {
this(0, ile);
}
private SpliteratorPowolny(int min, int max) {
this.min = min;
this.max = max;
}
private int next() {
for(int i=0; i<SPOWOLNIENIE; i++) {
niepotrzebne++;
}
return 1 + 2 * min++;
}
private boolean hasNext() {
return min < max;
}
@Override
public boolean tryAdvance(Consumer<? super Integer> action) {
action.accept(this.next());
return hasNext();
}
@Override
public SpliteratorPowolny trySplit() {
int middle = (max + min) / 2;
SpliteratorPowolny nowy = new SpliteratorPowolny(min, middle);
min = middle;
return nowy;
}
@Override
public long estimateSize() {
return max - min;
}
@Override
public int characteristics() {
return ORDERED | DISTINCT | SIZED | SUBSIZED | NONNULL;
}
}
package parallel_stream.b_spliterator;
import java.util.Spliterator;
import java.util.function.IntConsumer;
public class SpliteratorPrimitive implements Spliterator.OfInt {
private int max;
private int min;
// domyślnie 100 liczb
public SpliteratorPrimitive() {
this(100);
}
public SpliteratorPrimitive(int ile) {
this(0, ile);
}
private SpliteratorPrimitive(int min, int max) {
this.min = min;
this.max = max;
}
private int next() {
return 1 + 2 * min++;
}
private boolean hasNext() {
return min < max;
}
@Override
public boolean tryAdvance(IntConsumer action) {
action.accept(this.next());
return hasNext();
}
@Override
public SpliteratorPrimitive trySplit() {
int middle = (max + min) / 2;
SpliteratorPrimitive nowy = new SpliteratorPrimitive(min, middle);
min = middle;
return nowy;
}
@Override
public long estimateSize() {
return max - min;
}
@Override
public int characteristics() {
return ORDERED | DISTINCT | SIZED | SUBSIZED | NONNULL;
}
}
package parallel_stream.b_spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class Test1 {
// Stream<Integer> bez spowalniania. Niepotrzebny narzut na boxing
public static void main(String[] args) {
SpliteratorLiczbyNieparzyste spl1 = new SpliteratorLiczbyNieparzyste(10000000);
Stream<Integer> str1 = StreamSupport.stream(spl1, false);
MierzenieCzasu.uruchom(() -> str1.mapToInt(x->x).sum());
SpliteratorLiczbyNieparzyste spl2 = new SpliteratorLiczbyNieparzyste(10000000);
Stream<Integer> str2 = StreamSupport.stream(spl2, true);
MierzenieCzasu.uruchom(() -> str2.mapToInt(x->x).sum());
System.out.println();
}
}
package parallel_stream.b_spliterator;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class Test2 {
// Spliterator sztucznie spowolniony, Stream<Integer>, widać zysk z parallel
public static void main(String[] args) {
Spliterator<Integer> spl1 = new SpliteratorPowolny(10000000);
Stream<Integer> str1 = StreamSupport.stream(spl1, false);
MierzenieCzasu.uruchom(() -> str1.mapToLong(x->x).sum());
Spliterator<Integer> spl2 = new SpliteratorPowolny(10000000);
Stream<Integer> str2 = StreamSupport.stream(spl2, true);
MierzenieCzasu.uruchom(() -> str2.mapToLong(x->x).sum());
}
}
package parallel_stream.b_spliterator;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
public class Test3 {
public static void main(String[] args) {
// IntStream bez żadnego boxingnu - działa najszybciej
SpliteratorPrimitive spl1 = new SpliteratorPrimitive(10000000);
IntStream str1 = StreamSupport.intStream(spl1, false);
MierzenieCzasu.uruchom(() -> str1.sum());
SpliteratorPrimitive spl2 = new SpliteratorPrimitive(10000000);
IntStream str2 = StreamSupport.intStream(spl2, true);
MierzenieCzasu.uruchom(() -> str2.sum());
}
}
package parallel_stream.b_spliterator;
import java.util.Spliterator;
import java.util.function.Consumer;
public class TestNieparzyste {
public static void main(String[] args) {
Consumer<Integer> akcja = i -> System.out.print(i + ", ");
SpliteratorLiczbyNieparzyste spl1 = new SpliteratorLiczbyNieparzyste(100);
spl1.forEachRemaining(akcja);
System.out.println();
System.out.println();
Spliterator<Integer> spl2 = new SpliteratorLiczbyNieparzyste(100);
Spliterator<Integer> a = spl2.trySplit();
Spliterator<Integer> b = a.trySplit();
System.out.println("Fragmenty:");
b.forEachRemaining(akcja);
System.out.println();
a.forEachRemaining(akcja);
System.out.println();
spl2.forEachRemaining(akcja);
System.out.println();
System.out.println();
}
}
package parallel_stream.b_spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class TestStrumienNieparzystych {
public static void main(String[] args) {
Consumer<Integer> akcja = i -> System.out.print(i + ", ");
SpliteratorLiczbyNieparzyste spl1 = new SpliteratorLiczbyNieparzyste(100);
Stream<Integer> str1 = StreamSupport.stream(spl1, false);
str1.forEach(akcja);
System.out.println();
System.out.println();
SpliteratorLiczbyNieparzyste spl2 = new SpliteratorLiczbyNieparzyste(100);
Stream<Integer> str2 = StreamSupport.stream(spl2, true);
str2.forEach(akcja);
System.out.println();
System.out.println();
SpliteratorLiczbyNieparzyste spl3 = new SpliteratorLiczbyNieparzyste(50);
Stream<Integer> str3 = StreamSupport.stream(spl3, false);
int suma3 = str3.mapToInt(x -> x).sum();
System.out.println(suma3);
SpliteratorLiczbyNieparzyste spl4 = new SpliteratorLiczbyNieparzyste(50);
Stream<Integer> str4 = StreamSupport.stream(spl4, true);
int suma4 = str4.mapToInt(x -> x).sum();
System.out.println(suma4);
}
}
package parallel_stream.b_spliterator;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
public class TestStrumienPrimite {
public static void main(String[] args) {
IntConsumer akcja = i -> System.out.print(i + ", ");
SpliteratorPrimitive spl1 = new SpliteratorPrimitive(100);
IntStream str1 = StreamSupport.intStream(spl1, false);
str1.forEach(akcja);
System.out.println();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment