Laborator 3
Tematica laboratorului
1. Paralelism local in Storm
Factorul implicit de paralelism in Storm este 1 pentru rularea in paralel a topologiei de operatori (instante de spouts, bolts si fire de executie asociate). Executia unei topologii poate fi insa paralelizata la nivelul urmatoarelor unitati:
- Noduri: numarul de masini fizic care participa la rularea unei topologii
- Workers: procese JVM diferite care ruleaza o topologie Storm pe un nod
- Executors: threaduri de executie a metodelor nextTuple() si execute() din operatorii dintr-o topologie care ruleaza intr-un worker
- Tasks: numarul de instante pentru un operator care sunt rulate, fiind implicit una pe executor
In acest laborator ne vom axa pe partea de paralelism la nivel de nod (workers, executors, tasks).
Numarul de workers este setat pentru o topologie prin metoda setNumWorkers() apelata pentru parametrul de configurare a topologiei respective. De retinut este ca intr-o rulare in mod local numarul efectiv de procese JVM va fi 1, aceasta setare fiind doar simulata prin executia unui thread aditional in JVM-ul respectiv. Apelul de mai jos seteaza 2 workers intr-o configurare.
config.setNumWorkers(2);
Numarul de executors este configurat printr-un hint pasat ca ultim parametru functiei de constructie a topologiei. Automat, numarul de taskuri rulate va fi egal cu numarul de threaduri de executie. De exemplu pentru a paraleliza operatorul Spout din topologie, putem executa functia de mai jos:
builder.setSpout(SPOUT_ID, spout, 2);
Numarul de taskuri poate fi setat explicit prin functia setNumTasks() apelata in constructia topologiei similar contextului urmator:
builder.setBolt(SPLIT_BOLT_ID, splitbolt, 2).setNumTasks(4).shuffleGrouping(SPOUT_ID);
In acest exemplu operatorul bolt de impartire in cuvinte va fi rulat cu 2 threaduri de executie, fiecare thread ruland 2 instante ale operatorului (mai precis metoda execute() din acestea), deci in total 4 taskuri.
2. Gruparea fluxului de tuple
Sa consideram urmatorul exemplu complet de constructie a unei topologii:
builder.setSpout(SPOUT_ID, spout, 2);
builder.setBolt(SPLIT_BOLT_ID, splitbolt, 2).setNumTasks(4).shuffleGrouping(SPOUT_ID);
builder.setBolt(COUNT_BOLT_ID, countbolt, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
builder.setBolt(TERMINAL_BOLT_ID, terminalbolt).globalGrouping(COUNT_BOLT_ID);
Putem observa ca in completarea fiecarui nou operator adaugat la topologie se stabileste un asa zis mod de "grupare". Acesta reprezinta modalitatea de rutare a fluxului de date de la operatorul anterior la cel curent, din punct de vedere al impartirii intre taskuri. Exista urmatoarele posibilitati de grupare:
- shuffle: tuplele sunt distribuite random si balansat, astfel incat fiecare task sa proceseze un numar egal
- fields: tuplele sunt grupate catre taskuri pe baza valorii specificate in campuri; doua tuple cu aceeasi valoare vor fi procesate totdeauna de acelasi task
- global: toate tuplele sunt distribuite catre un singur task, de regula cel care are cel mai mic ID intern
- all: fiecare tuplu este replicat catre toate taskurile, astfel incat fiecare task primeste cate o copie
- localorshuffle: distributia se va orienta preferential catre taskurile care sunt rulate local in mod random si balansat intre acestea, inainte de a folosi taskuri rulate pe alt worker; daca nu exista mai multe taskuri rulate local, distributia e echivalenta shuffle
- direct: distributia se va face directionat catre un anume task pe baza ID-ului acestuia; pentru a folosi aceasta grupare fluxul (streamul) trebuie sa fie declarat ca direct in metodele declareOutputFields() si tuplele sa fie emise cu functia emitDirect(); ID-urile de task se pot obtine din cadrul contextului topologiei (in metoda prepare())
- custom: distributia se va face pe baza definirii unui mod specific de grupare (detalii mai jos)
Programatorul poate defini o modalitate proprie de grupare prin implementarea interfetei CustomStreamGrouping, pasand o instanta a clasei definite la metoda de grupare customGrouping() cand topologia este creata.
public interface CustomStreamGrouping extends Serializable {
void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);
List<Integer> chooseTasks(int taskId, List<Object> values);
}
Metoda prepare() va include codul de initializare pentru realizarea gruparii, fiind apelata automat la rularea topologiei in clasa respectiva.
Aici se pot obtine informatii utile pentru stabilirea unui mod de rutare din contextul topologiei primit ca parametru.
De asemeni parametrul stream ofera informatii despre fluxul de date, iar targetTasks include id-urile taskurilor ce pot fi considerate in stabilirea gruparii.
Metoda chooseTasks() are rolul de a comunica platformei lista returnata de id-uri de taskuri pentru care se va face gruparea. Parametrii primiti reprezinta id-ul taskului curent, si lista de valori a tuplului.
Surse actualizate topologie numarare cuvinte:
- App.java
- MyGrouping.java
- SourceTextSpout.java
- SplitTextBolt.java
- WordCountBolt.java
- TerminalBolt.java
Exercitiu
Modificati gruparea custom definita anterior, si eventual si fluxul de date generat in spout, pentru a implementa o functionalitate de tip "load balancer" in ce priveste impartirea fluxului de date pe task-uri. De exemplu, dimensiunea tuplelor poate fi considerata ca o metrica de "load" (incarcare) a activitatii iar pentru pastrarea unei medii balansate a acesteia pe o anumita fereastra, un tuplu poate fi directionat spre un task sau altul pentru a pastra o incarcare echilibrata (similar se poate simula integrand in tuplu o valoare numerica care sa indice dificultatea de procesare a tuplului respectiv).