Commit bf83c600 by Patryk Czarnik

pozostałe przykłady dot. wątków

parent ae65b54a
package p28_watki.c_rwlock;
import java.util.Random;
public class CzytelnicyPisarze_BezSynchronizacji {
private int[] t = new int[1000];
private volatile boolean jeszcze = true;
public static void main(String[] args) {
new CzytelnicyPisarze_BezSynchronizacji().dzialaj();
}
private void dzialaj() {
System.out.println("Losuję liczby do tablicy");
Random random = new Random();
for(int i=0; i<t.length; i++) {
t[i] = random.nextInt(1000);
}
int suma = 0;
for(int x : t) {
suma += x;
}
System.out.println("Suma na początku = " + suma);
System.out.println("Uruchamiam wątki");
Thread c1 = new Thread(new Czytelnik());
Thread c2 = new Thread(new Czytelnik());
Thread z1 = new Thread(new Zamieniacz());
Thread z2 = new Thread(new Zamieniacz());
c1.start();
c2.start();
z1.start();
z2.start();
System.out.println("Jadą");
try {
Thread.sleep(7000);
} catch (InterruptedException e) {
}
jeszcze = false;
System.out.println("Koniec");
try {
c1.join();
c2.join();
z1.join();
z2.join();
} catch (InterruptedException e) {
}
suma = 0;
for(int x : t) {
suma += x;
}
System.out.println("Suma na końcu = " + suma);
}
private class Zamieniacz implements Runnable {
public void run() {
Random random = new Random();
while(jeszcze) {
int i = random.nextInt(t.length);
int j = random.nextInt(t.length);
// swap
int x = t[i];
t[i] = t[j];
t[j] = x;
}
}
}
private class Czytelnik implements Runnable {
public void run() {
while(jeszcze) {
int suma = 0;
for(int x : t) {
suma += x;
}
System.out.println("Suma = " + suma);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
}
}
}
package p28_watki.c_rwlock;
import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class CzytelnicyPisarze_Lock {
private Lock lock = new ReentrantLock();
private int[] t = new int[1000];
private volatile boolean jeszcze = true;
public static void main(String[] args) {
new CzytelnicyPisarze_Lock().dzialaj();
}
private void dzialaj() {
System.out.println("Losuję liczby do tablicy");
Random random = new Random();
for(int i=0; i<t.length; i++) {
t[i] = random.nextInt(1000);
}
int suma = 0;
for(int x : t) {
suma += x;
}
System.out.println("Suma na początku = " + suma);
System.out.println("Uruchamiam wątki");
Thread c1 = new Thread(new Czytelnik());
Thread c2 = new Thread(new Czytelnik());
Thread z1 = new Thread(new Zamieniacz());
Thread z2 = new Thread(new Zamieniacz());
c1.start();
c2.start();
z1.start();
z2.start();
System.out.println("Jadą");
try {
Thread.sleep(7000);
} catch (InterruptedException e) {
}
jeszcze = false;
System.out.println("Koniec");
try {
c1.join();
c2.join();
z1.join();
z2.join();
} catch (InterruptedException e) {
}
suma = 0;
for(int x : t) {
suma += x;
}
System.out.println("Suma na końcu = " + suma);
}
private class Zamieniacz implements Runnable {
public void run() {
Random random = new Random();
while(jeszcze) {
int i = random.nextInt(t.length);
int j = random.nextInt(t.length);
// swap
lock.lock();
try {
int x = t[i];
t[i] = t[j];
t[j] = x;
} finally {
lock.unlock();
}
}
}
}
private class Czytelnik implements Runnable {
public void run() {
while(jeszcze) {
int suma = 0;
lock.lock();
for(int x : t) {
suma += x;
// try {
// Thread.sleep(1);
// } catch (InterruptedException e) {
// }
}
lock.unlock();
System.out.println("Suma = " + suma);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
}
}
}
package p28_watki.c_rwlock;
import java.util.Random;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class CzytelnicyPisarze_RWLock {
private ReadWriteLock rw = new ReentrantReadWriteLock(true);
private int[] t = new int[1000];
private volatile boolean jeszcze = true;
public static void main(String[] args) {
new CzytelnicyPisarze_RWLock().dzialaj();
}
private void dzialaj() {
System.out.println("Losuję liczby do tablicy");
Random random = new Random();
for(int i=0; i<t.length; i++) {
t[i] = random.nextInt(1000);
}
int suma = 0;
for(int x : t) {
suma += x;
}
System.out.println("Suma na początku = " + suma);
System.out.println("Uruchamiam wątki");
Thread c1 = new Thread(new Czytelnik());
Thread c2 = new Thread(new Czytelnik());
Thread z1 = new Thread(new Zamieniacz());
Thread z2 = new Thread(new Zamieniacz());
c1.start();
c2.start();
z1.start();
z2.start();
System.out.println("Jadą");
try {
Thread.sleep(7000);
} catch (InterruptedException e) {
}
jeszcze = false;
System.out.println("Koniec");
try {
c1.join();
c2.join();
z1.join();
z2.join();
} catch (InterruptedException e) {
}
suma = 0;
for(int x : t) {
suma += x;
}
System.out.println("Suma na końcu = " + suma);
}
private class Zamieniacz implements Runnable {
public void run() {
Random random = new Random();
while(jeszcze) {
int i = random.nextInt(t.length);
int j = random.nextInt(t.length);
// swap
rw.writeLock().lock();
int x = t[i];
t[i] = t[j];
t[j] = x;
rw.writeLock().unlock();
}
}
}
private class Czytelnik implements Runnable {
public void run() {
while(jeszcze) {
int suma = 0;
rw.readLock().lock();
for(int x : t) {
suma += x;
// try {
// Thread.sleep(1);
// } catch (InterruptedException e) {
// }
}
rw.readLock().unlock();
System.out.println("Suma = " + suma);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
}
}
}
package p28_watki.d_util;
import java.util.concurrent.atomic.AtomicInteger;
public class AtomowyLicznik {
private static AtomicInteger licznik;
private static final int N = 10_000_000;
private static class Watek implements Runnable {
public void run() {
for(int i=0; i<N; i++) {
licznik.addAndGet(3); // +=
licznik.addAndGet(-3); // -=
// Number n = licznik; // OK
// licznik.incrementAndGet();// ++x
// licznik.decrementAndGet();// --x
// licznik.getAndIncrement(); // x++
// licznik.addAndGet(3); // +=
// licznik.getAndAdd(3); // try{return x} finally {x+=3}
// licznik.get();
// licznik.set(5);
// licznik.compareAndSet(50, 1000); // tylko jeśli aktualna wartość to 50, to ustaw ową wartość 1000
// Aby użycie takiego obiektu było poprawne, należy wykonywać operacje "jednym wywołaniem"
// np. poprawne jest
// licznik.addAndGet(5);
// a niepoprawne byłoby
// int x = licznik.get();
// licznik.set(x + 5);
// poprawne:
// licznik.compareAndSet(100, 0);
// niepoprawne
// if(licznik.get() == 100) {
// licznik.set(0);
// }
}
}
}
public static void main(String[] args) {
licznik = new AtomicInteger(10000);
System.out.println(licznik);
Thread th1 = new Thread(new Watek());
Thread th2 = new Thread(new Watek());
th1.start();
th2.start();
System.out.println("Uruchomiłem");
try {
th1.join();
th2.join();
} catch(InterruptedException e) {
}
System.out.println("Wątki zakończone");
System.out.println(licznik.get());
}
}
package p28_watki.d_util;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Bariery {
static final int N = 6;
static volatile boolean koniec = false;
public static void main(String[] args) {
final CyclicBarrier bariera = new CyclicBarrier(4);
class MojRunnable implements Runnable {
public void run() {
while(!koniec) {
try {
System.out.println("Czeka "+Thread.currentThread().getId());
bariera.await();
System.out.println("Doczekal "+Thread.currentThread().getId());
Thread.sleep(400+Thread.currentThread().getId()*100);
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
MojRunnable runable = new MojRunnable();
for(int i = 0; i < N; i++) {
new Thread(runable).start();
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
}
System.out.println("main: reset");
koniec = true;
bariera.reset();
System.out.println("Koniec");
}
}
package p28_watki.d_util;
import java.util.concurrent.Phaser;
public class Fazery1 {
static boolean koniec = false;
static final int N = 3;
public static void main(String[] args) {
final Phaser ph = new Phaser(3);
class Watek implements Runnable {
public void run() {
while(!koniec) {
try {
System.out.println("Czeka "+Thread.currentThread().getId());
ph.arriveAndAwaitAdvance();
System.out.println("Doczekal "+Thread.currentThread().getId() + " faza "+ph.getPhase());
Thread.sleep(400+Thread.currentThread().getId()*100);
} catch (InterruptedException e) {
}
}
}
}
Watek runable = new Watek();
for(int i = 0; i < N; i++) {
new Thread(runable).start();
try {
Thread.sleep(700);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
}
koniec = true;
ph.forceTermination();
System.out.println("Koniec");
}
}
package p28_watki.d_util;
import java.util.concurrent.Phaser;
public class Fazery2 {
static boolean koniec = false;
static final int N = 3;
public static void main(String[] args) {
final Phaser ph = new Phaser(3);
class Watek implements Runnable {
private int nastFaza;
public Watek(boolean parzyste) {
nastFaza = parzyste ? 2 : 1;
}
public void run() {
while(!koniec) {
try {
Thread.sleep(500+Thread.currentThread().getId()*100);
int faza = ph.arrive();
System.out.println("Przybylem "+Thread.currentThread().getId() + " czekam na "+nastFaza);
ph.awaitAdvance(faza);
System.out.println("Doczekalem "+Thread.currentThread().getId() + " faza "+ph.getPhase());
nastFaza += 2;
} catch (InterruptedException e) {
}
}
}
}
for(int i = 0; i < N; i++) {
new Thread(new Watek(true)).start();
new Thread(new Watek(false)).start();
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
}
koniec = true;
ph.forceTermination();
System.out.println("Koniec");
}
}
package p28_watki.d_util;
import java.util.concurrent.CountDownLatch;
public class Odliczanie {
public static void main(String[] args) {
final CountDownLatch guzik = new CountDownLatch(10);
Thread rakieta = new Thread(new Runnable() {
public void run() {
try {
System.out.println("Rakieta w przygotowaniu");
Thread.sleep(750);
System.out.println("Rakieta przygotowana...");
guzik.await();
System.out.println("Start!!!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread kontrola = new Thread(new Runnable() {
public void run() {
System.out.println("Kontrola zaczyna odliczanie");
while(guzik.getCount() > 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
guzik.countDown();
System.out.println("> " + guzik.getCount());
} } });
System.out.println("Ruszamy");
rakieta.start();
kontrola.start();
System.out.println("Koniec main");
}
}
package p28_watki.d_util;
import java.util.concurrent.Exchanger;
public class Wymiana {
public static void main(String[] args) {
final Exchanger<String> schowek = new Exchanger<>();
Thread th1 = new Thread(new Runnable() {
public void run() {
String imie = "Ala";
System.out.printf("Jestem %s w wątku %d%n", imie, Thread.currentThread().getId());
try {
Thread.sleep(100);
String twojeImie = schowek.exchange(imie);
System.out.printf("Wątek %d, twoje imię to %s %n", Thread.currentThread().getId(), twojeImie);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread th2 = new Thread(new Runnable() {
public void run() {
String imie = "Tomek";
System.out.printf("Jestem %s w wątku %d%n", imie, Thread.currentThread().getId());
try {
Thread.sleep(3000);
String twojeImie = schowek.exchange(imie);
System.out.printf("Wątek %d, twoje imię to %s %n", Thread.currentThread().getId(), twojeImie);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
th1.start();
th2.start();
}
}
package p28_watki.e_kolekcje;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/* W tym przykładzie porównuję szybkość działania ConcurrentHashMap i zwykłwej HashMap opakowanej w synchronizedMap.
* Program tworzy N wątków, które operują na słowniku String→Integer w taki sposób, że
* - losują liczbę od 1 do K - po konwersji na tekst staje się ona kluczem w słowniku,
* - wykonują operację modyfikacji zawartości słownika pod tym kluczem; aby koszt losowania itp. nie zaszumił kosztu samej mapy, operacja jest powtarzana kilkukrotnie z tym samym kluczem.
*/
public class ConcMap {
// liczba wątków
private static final int N = 16;
// wielkość słownika
private static final int K = 100;
// ilość powtórzeń jednej operacji
private static final int P = 50;
// ilość powtórzeń całości
private static final int R = 10_000;
// odkomentuj jedną z wersji i sprawdź
//private final Map<String, Integer> map = Collections.synchronizedMap(new HashMap<>());
private final Map<String, Integer> map = new ConcurrentHashMap<>();
//private final ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();
private void dzialaj() {
final ThreadLocalRandom random = ThreadLocalRandom.current();
for(int r=0; r < R; r++) {
final int k = random.nextInt(K);
final int d = random.nextInt(1000) - 500;
final String key = String.valueOf(k);
for(int p = 0; p < P; p++) {
Integer v = map.merge(key, d, (x, y) -> x+y);
//System.out.println(r + " " + p);
}
}
}
public static void main(String[] args) {
ConcMap instance = new ConcMap();
ExecutorService pool = Executors.newFixedThreadPool(N);
System.out.println("Start");
long start = System.currentTimeMillis();
for(int i = 0; i < N; i++) {
pool.submit(instance::dzialaj);
}
try {
pool.shutdown();
pool.awaitTermination(1, TimeUnit.HOURS);
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("Czas: " + (end - start));
}
}
package p28_watki.e_kolekcje;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class CopyArray {
public static void main(String[] args) {
// Kolekcje "CopyOnWrite" kopiują całą swoją _wewnętrzną_ tablicę gdy tylko ktokolwiek cokolwiek modyfikuje.
// Jeśli wcześniej został utworzony iterator ("ktoś czyta kolekcję"),
// to NIE jest on unieważniany, tylko daje obraz kolekcji sprzed zmiany ("snapshot").
// Iteratory NIE wyrzucają ConcurrrentModificationException.
// Iteratory dają dostęp tylko do odczytu.
List<String> lista = new CopyOnWriteArrayList<>();
lista.add("Ala");
lista.add("Basia");
lista.add("Celina");
Iterator<String> it1 = lista.iterator();
lista.add("Dorota");
lista.add("Eliza");
Iterator<String> it2 = lista.iterator();
lista.add("Felicja");
lista.add("Grażyna");
System.out.print("it1: ");
while(it1.hasNext()) {
System.out.print(it1.next() + " ");
}
System.out.println();
System.out.print("it2: ");
while(it2.hasNext()) {
System.out.print(it2.next() + " ");
// it2.remove(); // EXN
}
System.out.println();
System.out.print("lista: ");
for(String x : lista) {
System.out.print(x + " ");
}
System.out.println();
}
}
package p28_watki.e_kolekcje;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Hashtable;
import java.util.List;
import java.util.Vector;
public class OpakowywanieSync {
public static void main(String[] args) {
// Vector, Hashtable, StringBuffer - stare klasy Javy, które są "thread-safe"
StringBuffer s;
Vector v;
Hashtable h;
List<String> zwykla = new ArrayList<>();
zwykla.add("Ala");
zwykla.add("Ola");
zwykla.add("Ela");
List<String> synchronizowana = Collections.synchronizedList(zwykla);
System.out.println(synchronizowana.getClass());
System.out.println("Zawartość zwykłej: " + zwykla);
System.out.println("Zawartość synchr: " + synchronizowana);
System.out.println();
zwykla.add("Ula");
synchronizowana.add("Ewa");
System.out.println("Zawartość zwykłej: " + zwykla);
System.out.println("Zawartość synchr: " + synchronizowana);
// Natomiast złą praktyką byłoby bezpośrednie korzystanie ze zmiennej zwykla.
// Dlatego najlepiej od razu tworzyć zmienną listową w taki sposób:
List<String> synchronizowana2 = Collections.synchronizedList(new ArrayList<>());
// Jeśli wątek wykonuje kilka operacji pod rząd, to są one synchronizowane KAŻDA OSOBNO
// Przykład błedu:
// Jeśli wiele wątków będzie wykonywać taki kod, to dwa wątki mogą usuwać element z jednoelementowej listy -> błąd
if(synchronizowana.size() > 0) {
// tutaj może coś zrobić inny wątek
synchronizowana.remove(0);
}
// Zalecanym podejściem jest wtedy wzięcie całej serii operacji w blok synchronizowany na obiekcie listy:
synchronized(synchronizowana) {
// skomplikowane operacje na liście...
if(synchronizowana.size() > 0) {
// teraz te dwie operacje będą wykonane atomowo
synchronizowana.remove(0);
}
for (String element : synchronizowana) {
// ...
// mamy pewność, że w czasie przeglądania inne wątki nie będą ruszać tej listy
}
}
}
}
package p28_watki.e_kolekcje;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
// Kolejki blokujące mają dodatkowo metody take i put, które działają w sposób blokujący.
public class ProdKons1 {
private static final int ILE_RAZY = 30;
private static final int N = 5;
private BlockingQueue<Integer> kolejka = new ArrayBlockingQueue<>(N);
public static void main(String[] args) {
ProdKons1 program = new ProdKons1();
program.dzialaj();
}
private void dzialaj() {
Thread producent = new Thread(new Producent());
Thread konsument = new Thread(new Konsument());
System.out.println("Startujemy");
producent.start();
konsument.start();
try {
producent.join();
} catch (InterruptedException e) {
}
try {
konsument.join();
} catch (InterruptedException e) {
}
System.out.println("Koniec. size="+kolejka.size());
}
private class Producent implements Runnable {
public void run() {
for(int i=1; i<=ILE_RAZY; i++) {
int x = (int) (Math.random() * 1000);
System.out.println("P: wstawiam " + x);
try {
kolejka.put(x);
System.out.println("P: Wstawiłem, size=" + kolejka.size() );
Thread.sleep(300 + x);
} catch (InterruptedException e) {
}
}
}
}
private class Konsument implements Runnable {
public void run() {
for(int i=1; i<=ILE_RAZY; i++) {
try {
Thread.sleep(100);
System.out.println(" K: Biorę...");
int x = kolejka.take();
//int x = kolejka.poll(100, TimeUnit.DAYS);
System.out.println(" K: ... Pobrałem " + x);
Thread.sleep(2*x);
} catch (InterruptedException e) {
}
}
}
}
}
package p28_watki.e_kolekcje;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ProdKons2 {
private static final int ILE_RAZY = 30;
private BlockingQueue<Integer> kolejka = new LinkedBlockingQueue<>();
public static void main(String[] args) {
ProdKons2 program = new ProdKons2();
program.dzialaj();
}
private void dzialaj() {
Thread producent = new Thread(new Producent());
Thread konsument = new Thread(new Konsument());
System.out.println("Startujemy");
producent.start();
konsument.start();
try {
producent.join();
} catch (InterruptedException e) {
}
try {
konsument.join();
} catch (InterruptedException e) {
}
System.out.println("Koniec. size="+kolejka.size());
}
private class Producent implements Runnable {
public void run() {
for(int i=1; i<=ILE_RAZY; i++) {
int x = (int) (Math.random() * 1000);
System.out.println("P: wstawiam " + x);
try {
kolejka.put(x);
System.out.println("P: Wstawiłem, size=" + kolejka.size() );
Thread.sleep(300 + x);
} catch (InterruptedException e) {
}
}
}
}
private class Konsument implements Runnable {
public void run() {
for(int i=1; i<=ILE_RAZY; i++) {
try {
Thread.sleep(100);
System.out.println(" K: Biorę...");
int x = kolejka.take();
System.out.println(" K: ... Pobrałem " + x);
Thread.sleep(2*x);
} catch (InterruptedException e) {
}
}
}
}
}
package p28_watki.f_pule;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ThreadLocalRandom;
public class ForkJoin1_SumArray {
static class SumArray extends ForkJoinTask<Long> {
private Long result;
private byte[] array;
private int from, to;
public SumArray(byte[] array, int from, int to) {
this.array = array;
this.from = from;
this.to = to;
}
@Override
public Long getRawResult() {
return result;
}
@Override
protected void setRawResult(Long value) {
this.result = value;
}
@Override
protected boolean exec() {
if(to - from == 1) {
this.setRawResult(Long.valueOf(array[from]));
} else {
int middle = (from + to) / 2;
SumArray left = new SumArray(array, from, middle);
SumArray right = new SumArray(array, middle, to);
left.fork();
right.fork();
try {
Long part1 = left.get();
Long part2 = right.get();
setRawResult(part1 + part2);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
setRawResult(0L);
}
}
return true;
}
}
public static void main(String[] args) {
byte[] tab = new byte[160_000_000];
final ThreadLocalRandom random = ThreadLocalRandom.current();
System.out.println("Losowanie...");
for(int i = 0; i < tab.length; i++)
tab[i] = (byte)random.nextInt(256);
Long result;
System.out.println("\nLiczenie sekwencyjne:");
long start = System.currentTimeMillis();
result = sumaSekwencyjnie(tab);
long stop = System.currentTimeMillis();
System.out.println("Result = " + result + " , czas = " + (stop - start));
System.out.println("\nLiczenie fork/join:");
final ForkJoinPool pool = new ForkJoinPool(8);
SumArray task = new SumArray(tab, 0, tab.length);
System.out.println("Start");
start = System.currentTimeMillis();
result = pool.invoke(task);
stop = System.currentTimeMillis();
System.out.println("Result = " + result + " , czas = " + (stop - start));
}
private static long sumaSekwencyjnie(byte[] tab, int from, int to) {
long suma = 0;
while(from < to)
suma += tab[from++];
return suma;
}
private static long sumaSekwencyjnie(byte[] tab) {
return sumaSekwencyjnie(tab, 0, tab.length);
}
}
package p28_watki.f_pule;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ThreadLocalRandom;
public class ForkJoin2_SumArray_Limit_Get {
static class SumArray extends ForkJoinTask<Long> {
private Long result;
private byte[] array;
private int from, to;
public SumArray(byte[] array, int from, int to) {
this.array = array;
this.from = from;
this.to = to;
}
@Override
public Long getRawResult() {
return result;
}
@Override
protected void setRawResult(Long value) {
this.result = value;
}
@Override
protected boolean exec() {
if(to - from <= 10_000_000) {
Long x = sumaSekwencyjnie(array, from, to);
this.setRawResult(x);
} else {
int middle = (from + to) / 2;
SumArray left = new SumArray(array, from, middle);
SumArray right = new SumArray(array, middle, to);
left.fork();
right.fork();
try {
Long part1 = left.get();
Long part2 = right.get();
setRawResult(part1 + part2);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
setRawResult(0L);
return false;
}
}
return true;
}
}
public static void main(String[] args) {
byte[] tab = new byte[160_000_000];
final ThreadLocalRandom random = ThreadLocalRandom.current();
System.out.println("Losowanie...");
for(int i = 0; i < tab.length; i++)
tab[i] = (byte)random.nextInt(256);
Long result;
final ForkJoinPool pool = new ForkJoinPool(8);
System.out.println("\nLiczenie sekwencyjne:");
long start = System.currentTimeMillis();
result = sumaSekwencyjnie(tab);
long stop = System.currentTimeMillis();
System.out.println("Result = " + result + " , czas = " + (stop - start));
System.out.println("\nLiczenie fork/join:");
SumArray task = new SumArray(tab, 0, tab.length);
System.out.println("Start");
start = System.currentTimeMillis();
result = pool.invoke(task);
stop = System.currentTimeMillis();
System.out.println("Result = " + result + " , czas = " + (stop - start));
}
private static Long sumaSekwencyjnie(byte[] tab, int from, int to) {
long suma = 0;
while(from < to)
suma += tab[from++];
return suma;
}
private static Long sumaSekwencyjnie(byte[] tab) {
return sumaSekwencyjnie(tab, 0, tab.length);
}
}
package p28_watki.f_pule;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ThreadLocalRandom;
public class ForkJoin3_SumArray_Limit_Join {
static class SumArray extends ForkJoinTask<Long> {
private Long result;
private byte[] array;
private int from, to;
public SumArray(byte[] array, int from, int to) {
this.array = array;
this.from = from;
this.to = to;
}
@Override
public Long getRawResult() {
return result;
}
@Override
protected void setRawResult(Long value) {
this.result = value;
}
@Override
protected boolean exec() {
if(to - from <= 10_000_000) {
Long x = sumaSekwencyjnie(array, from, to);
this.setRawResult(x);
} else {
int middle = (from + to) / 2;
SumArray left = new SumArray(array, from, middle);
SumArray right = new SumArray(array, middle, to);
left.fork();
right.fork();
Long part1 = left.join();
Long part2 = right.join();
setRawResult(part1 + part2);
// join nie deklaruje żadnych wyjątków, a get deklaruje Interrupted i ExecutionException
}
return true;
}
}
public static void main(String[] args) {
byte[] tab = new byte[160_000_000];
final ThreadLocalRandom random = ThreadLocalRandom.current();
System.out.println("Losowanie...");
for(int i = 0; i < tab.length; i++)
tab[i] = (byte)random.nextInt(256);
Long result;
final ForkJoinPool pool = new ForkJoinPool(8);
System.out.println("\nLiczenie sekwencyjne:");
long start = System.currentTimeMillis();
result = sumaSekwencyjnie(tab);
long stop = System.currentTimeMillis();
System.out.println("Result = " + result + " , czas = " + (stop - start));
System.out.println("\nLiczenie fork/join:");
SumArray task = new SumArray(tab, 0, tab.length);
System.out.println("Start");
start = System.currentTimeMillis();
result = pool.invoke(task);
stop = System.currentTimeMillis();
System.out.println("Result = " + result + " , czas = " + (stop - start));
}
private static Long sumaSekwencyjnie(byte[] tab, int from, int to) {
long suma = 0;
while(from < to)
suma += tab[from++];
return suma;
}
private static Long sumaSekwencyjnie(byte[] tab) {
return sumaSekwencyjnie(tab, 0, tab.length);
}
}
package p28_watki.f_pule;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ThreadLocalRandom;
public class ForkJoin4_SumArray_RecursiveTask {
// Wersja, gdzie korzystamy z klasy RecursiveTask (podklasa ForkJoinTask),
// która realizuje dokładnie taki schemat, o jaki nam chodzi: zadania zwracające wynik, które da się dzielić na mniejsze.
static class SumArray extends RecursiveTask<Long> {
private byte[] array;
private int from, to;
public SumArray(byte[] array, int from, int to) {
this.array = array;
this.from = from;
this.to = to;
}
@Override
protected Long compute() {
if(to - from <= 10_000_000) {
Long x = sumaSekwencyjnie(array, from, to);
return x;
} else {
int middle = (from + to) / 2;
SumArray left = new SumArray(array, from, middle);
SumArray right = new SumArray(array, middle, to);
left.fork();
right.fork();
Long part1 = left.join();
Long part2 = right.join();
return part1 + part2;
}
}
}
public static void main(String[] args) {
byte[] tab = new byte[160_000_000];
final ThreadLocalRandom random = ThreadLocalRandom.current();
System.out.println("Losowanie...");
for(int i = 0; i < tab.length; i++)
tab[i] = (byte)random.nextInt(256);
Long result;
final ForkJoinPool pool = new ForkJoinPool(8);
System.out.println("\nLiczenie sekwencyjne:");
long start = System.currentTimeMillis();
result = sumaSekwencyjnie(tab);
long stop = System.currentTimeMillis();
System.out.println("Result = " + result + " , czas = " + (stop - start));
System.out.println("\nLiczenie fork/join:");
SumArray task = new SumArray(tab, 0, tab.length);
System.out.println("Start");
start = System.currentTimeMillis();
result = pool.invoke(task);
stop = System.currentTimeMillis();
System.out.println("Result = " + result + " , czas = " + (stop - start));
}
private static Long sumaSekwencyjnie(byte[] tab, int from, int to) {
long suma = 0;
while(from < to)
suma += tab[from++];
return suma;
}
private static Long sumaSekwencyjnie(byte[] tab) {
return sumaSekwencyjnie(tab, 0, tab.length);
}
}
package p28_watki.f_pule;
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ThreadLocalRandom;
/* Przykład z dokumentacji klasy RecursiveAction - sortowanie tablicy */
public class ForkJoin5_SortArray_RecursiveAction {
static class SortTask extends RecursiveAction {
final long[] array;
final int lo, hi;
SortTask(long[] array, int lo, int hi) {
this.array = array;
this.lo = lo;
this.hi = hi;
}
SortTask(long[] array) {
this(array, 0, array.length);
}
protected void compute() {
if (hi - lo < THRESHOLD)
sortSequentially(lo, hi);
else {
int mid = (lo + hi) >>> 1;
invokeAll(new SortTask(array, lo, mid), new SortTask(array, mid, hi));
merge(lo, mid, hi);
}
}
// implementation details follow:
static final int THRESHOLD = 1000;
void sortSequentially(int lo, int hi) {
Arrays.sort(array, lo, hi);
}
void merge(int lo, int mid, int hi) {
long[] buf = Arrays.copyOfRange(array, lo, mid);
for (int i = 0, j = lo, k = mid; i < buf.length; j++)
array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++];
}
}
public static void main(String[] args) {
long[] tab = new long[80_000_000];
final ThreadLocalRandom random = ThreadLocalRandom.current();
System.out.println("Losowanie...");
for(int i = 0; i < tab.length; i++)
tab[i] = random.nextInt(256);
final ForkJoinPool pool = new ForkJoinPool(8);
SortTask task = new SortTask(tab);
System.out.println("Start");
long start = System.currentTimeMillis();
pool.invoke(task);
long stop = System.currentTimeMillis();
System.out.println("OK, czas = " + (stop - start));
}
}
package p28_watki.f_pule;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class PuleWatkow {
public static void main(String[] args) throws InterruptedException, ExecutionException {
final int N = 100; // liczba procedur do wykonania
// ExecutorService pool = Executors.newSingleThreadExecutor();
ExecutorService pool = Executors.newFixedThreadPool(20);
// ExecutorService pool = Executors.newCachedThreadPool(); // tworzy wątek, gdy tylko brakuje robotników
// ExecutorService pool = Executors.newWorkStealingPool(); // od Javy 8 - stara się wykorzystać wszystkie procesory
// ExecutorService pool = Executors.newWorkStealingPool(2); // wersja z ograniczeniem współbieżności do pewnego poziomu
// ScheduledExecutorService pool = Executors.newScheduledThreadPool(20); // pozwala planować zadania na przyszłość
Procedura zadanie = new Procedura();
System.out.println("Zaczynam zlecać...");
for(int i = 0; i < N; i++) {
pool.submit(zadanie);
// pool.execute(zadanie);
// pool.schedule(zadanie, 2, TimeUnit.SECONDS);
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
}
System.out.println("Zlecilem wykonanie");
pool.shutdown();
//pool.shutdownNow();
System.out.println("Po shutdown");
//pool.submit(() -> {});
try {
pool.awaitTermination(1, TimeUnit.DAYS);
System.out.println("Zakończyły się");
} catch (InterruptedException e) {
}
System.out.println("Koniec main");
}
private static class Procedura implements Runnable {
public void run() {
long id = Thread.currentThread().getId();
System.out.printf("Hej, tu watek %d%n", id);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
System.out.println("interrupt w " + id);
}
}
}
}
package p28_watki.f_pule;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class PuleWatkowCallable {
public static void main(String[] args) throws InterruptedException, ExecutionException {
final int N = 10; // liczba procedur do wykonania
final int M = 2; // liczba watkow w puli
ExecutorService pool = Executors.newFixedThreadPool(M);
Future<String> gfuture = null;
Procedura proc = new Procedura();
for(int i = 0; i < N; i++) {
Future<String> future = pool.submit(proc);
// System.out.println(future.isDone());
gfuture = future;
}
System.out.println("Zlecilem wykonanie");
pool.shutdown();
System.out.println("Po shutdown");
System.out.println(gfuture.isDone());
String wynik = gfuture.get(); // to powoduje oczekiwanie na zakończenie zadania
System.out.println(gfuture.isDone() + wynik);
//pool.shutdownNow();
pool.awaitTermination(1, TimeUnit.DAYS);
System.out.println("Zakończyły się");
System.out.println(gfuture.isDone());
}
private static class Procedura implements Callable<String> {
@Override
public String call() throws Exception {
System.out.printf("Hej, tu watek %d%n", Thread.currentThread().getId());
try {
Thread.sleep(20);
} catch (InterruptedException e) {
}
return " Wynik, wypisał wątek nr " + Thread.currentThread().getId();
}
}
}
package p28_watki.z_ilustracje_nie_do_uruchamiania;
public class Afrykarium1 {
// wolne miejsca w środku
private int wolne = 2000;
public synchronized void wpuśćWycieczkę(int ilu) {
try {
while(wolne < ilu) {
this.wait();
}
wolne -= ilu;
// otwórz bramki
} catch(InterruptedException e) {
}
}
public synchronized void zwiedzającyWychodzi() {
wolne++;
notify();
}
}
package p28_watki.z_ilustracje_nie_do_uruchamiania;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Afrykarium2 {
// wolne miejsca w środku
private int miejsca = 2000;
// synchronizacja
private Lock ochrona = new ReentrantLock();
private Condition czekanieNaMiejsca = ochrona.newCondition();
public void wpuśćWycieczkę(int ilu) {
try {
ochrona.lock();
while(miejsca < ilu) {
czekanieNaMiejsca.await();
}
miejsca -= ilu;
// otwórz bramki
} catch(InterruptedException e) {
} finally {
ochrona.unlock();
}
}
public void zwiedzającyWychodzi() {
try {
ochrona.lock();
miejsca++;
czekanieNaMiejsca.signal();
} finally {
ochrona.unlock();
}
}
}
package p28_watki.z_ilustracje_nie_do_uruchamiania;
import java.util.concurrent.Semaphore;
public class Afrykarium3 {
// wolne miejsca w środku
private Semaphore miejsca = new Semaphore(2000, true);
public void wpuśćWycieczkę(int ilu) {
try {
// zmniejsza wartość semafora o ilu,
// ale jeśli wartość semafora < ilu, to czeka, aż semafor uzyska odp. wartość.
miejsca.acquire(ilu); // -= akademicko : P
} catch(InterruptedException e) {
}
// otwórz bramki
}
public void zwiedzającyWychodzi() {
miejsca.release(); // ++ akademicko: V
}
}
package p28_watki.z_ilustracje_nie_do_uruchamiania;
public class InstancyjnaIStatyczna {
synchronized void instancyjna1() {
}
synchronized void instancyjna2() {
// instancyjne synchronizują się na poszczególnych obiektach
}
static synchronized void statyczna1() {
}
static synchronized void statyczna2() {
// statyczne wzajemnie się synchronizują
// na obiekcie InstancyjnaIStatyczna.class
// statyczne i instancyjne wzajemnie się nie synchronizują
}
void metoda() {
// gdybym chciał się zsynchronizować z metodami statycznymi, to mogę tak:
synchronized(InstancyjnaIStatyczna.class) {
}
// tak jak
synchronized(this) {
// jest synchronizacją na bieżącym obiekcie
// z tym że tutaj i tak nic nie robi, bo jesteśmy w metodzie synchronized
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment