Laborator 6

Tematica laboratorului

Storm in mod cluster

Storm rulat in mod distribuit la nivel de cluster include un nod master nimbus pentru administrarea topologiei, si mai multe noduri supervizor care au rol de workers (instante JVM) ce vor rula topologia efectiva. Pentru coordonare Storm foloseste Apache ZooKeeper ce poate in mod normal sa fie rulat pe alte noduri. Exista posibilitatea pentru testare de a rula modul distribuit (cluster) si pe aceeasi masina, diferit de modul local obisnuit de test (nimbus, supervizor si ZooKeeper rulati pe acelasi nod).

Nodul nimbus - descriere generala

Pentru a lansa o topologie aceasta este transmisa sub forma unui JAR care include clasele care o compun nodului nimbus, impreuna cu informatii de configurare. Nodul nimbus va distribui respectivul JAR la un numar de noduri supervizor, catre care asigneaza task-uri spout si bolt spre a fi rulate de workeri (instantele JVM de pe masinile respective). Nodul nimbus monitorizeaza starea clusterului, in cazul unei caderi de supervizor re-asignand task-urile de pe supervizorul respectiv. Daca nodul nimbus cade, topologia isi continua executia in principiu fara probleme. Singurul caz de esuare a topologiei din cauza caderilor e momentul in care nodul nimbus cade, fiind urmat de o cadere de supervizor din cauza ca nu mai exista responsabil cu re-asignarea taskurilor.

Nodurile supervizor - descriere generala

Nodurile supervizor ruleaza un demon supervizor, care lanseaza instante de proces JVM pentru a rula task-uri trimise de nodul nimbus. Daca un worker JVM cade, demonul supervizor va incerca sa il relanseze. Daca un nod supervizor cade (sau chiar doar worker-ul) tuplele aflate in tranzit spre procesare nu se pierd neaparat daca se foloseste mecanismul de anchoring prezentat in laboratoarele anterioare. Tuplul respectiv nu mai e confirmat cu ack() de-a lungul fluxului si se decide practic retrimiterea lui.

Nodurile ZooKeeper - descriere generala

ZooKeper ofera un serviciu complet transparent pentru utilizatorul platformei Storm. Este folosit de catre platforma cu scop de coordonare. Pe scurt un server ZooKeeper mentine un spatiu logic similar cu o structura arborescenta de directoare in care procesele coordonate pot scrie cu anumite garantii de sincronizare. De asemeni ofera si facilitatea unui proces de a seta "watch-uri" pe structura respectiva pentru a fi notificat cand o intrare s-a modificat sau a fost creata. Nodurile nimbus si supervizor se sincronizeaza prin folosirea ZooKeeper ca intermediar, folosindu-l inclusiv pentru comunicare in acest scop (scriere/citire in structura mentinuta de ZooKeeper). ZooKeeper nu este insa folosit ca intermediar si pentru transferuri de dimensiuni mari cum ar fi topologia continuta in fisierul JAR. Pentru aceasta comunicare intre procese de pe noduri diferite Storm foloseste serializarea pusa la dispozitie de Thrift.

Instalarea Storm in mod cluster

Instalati ZooKeeper:

Cea mai simpla modalitate de instalare Zookeeper pe o distributie Ubuntu sau Lubuntu este prin intermediul managerului de pachete:

sudo apt-get install zookeeper zookeeperd

Aceasta modalitate poate rezulta insa in incompatibilitati la rulare in functie de versiunea Storm sau alte pachete necesare de pe distributia unde se face instalarea, daca versiunea de Zookeeper nu este ultima stabila. Pentru o instalare a ultimei versiuni stabile de Zookeeper, se poate descarca pachetul corespunzator de la aceasta adresa, si dezarhiva spre exemplu in directorul /opt/zookeeper.
Este recomandata crearea unui director de date pentru rularile Zookeeper, cu drepturi de scriere pentru utilizatorul ce va rula serverul Zookeeper:

sudo mkdir -p /data/zookeper
sudo chown -R [user:user] /data/zookeeper
sudo chown -R [user:user] /opt/zookeeper

In directorul in care Zookeper a fost dezarhivat trebuie editata o configurare de baza pentru rularea Zookeper. Un exemplu e urmatorul (detalii pentru optiuni se pot regasi in documentatia Zookeeper):

sudo nano /opt/zookeeper/conf/zoo.cfg
tickTime = 2000
dataDir = /data/zookeeper
clientPort = 2181 	   //default - 2181 - indica portul pentru clienti
initLimit = 5
syncLimit = 2
admin.serverPort = 8089    //default - 8080 - indica portul unui serviciu de administrare

Pornirea, respectiv oprirea serverului Zookeeper se poate face prin comenzile:

sudo /opt/zookeeper/bin/zkServer.sh start
sudo /opt/zookeeper/bin/zkServer.sh stop

Conectarea la serverul Zookeeper pornit pentru a verifica functionalitatea acestuia se poate realiza folosind un client de test (comenzile disponibile sunt vizibile la apelarea "help"):

sudo /opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181

Obtineti si dezarhivati arhiva binarelor Storm din pagina platformei.
Creati doua symlinkuri pentru acces mai usor:

sudo ln -s (cale/storm) /usr/share/storm
sudo ln -s /usr/share/storm/bin/storm /usr/bin/storm

Pentru a tolera eventualele caderi e bine ca demonii Storm sa fie rulati in mod supervizat, adica monitorizati pentru cadere cu optiune de repornire imediata in acest caz. Acest mod de rulare necesita instalarea unui demon de supervizare, si configurarea acestuia:

sudo apt-get install supervisor

Se creeaza un director pentru rulare:

mkdir storm

Se creeaza urmatoarele fisiere de configurare in /etc/supervisor/conf.d (pentru nodurile supervizor e necesara doar configurarea pentru supervizor):

  • storm-nimbus.conf:
    [program:storm-nimbus]		//identificator unic al procesului
    command=storm nimbus		//comanda rulata
    directory=[path to]/storm 	//directorul in care va fi rulata comanda
    autorestart=true		//specificarea repornirii in caz de cadere
    user=username			//userul care e proprietarul procesului
    
  • storm-supervisor.conf:
    [program:storm-supervisor]	
    command=storm supervisor	
    directory=[path to]/storm 	
    autorestart=true		
    user=username
    
  • storm-ui.conf
    [program:storm-ui]	
    command=storm ui	
    directory=[path to]/storm 	
    autorestart=true		
    user=username
    

Se porneste demonul de supervizare, care va porni si demonii Storm:

sudo /etc/init.d/supervisor stop
sudo /etc/init.d/supervisor start

Dupa aceasta pornire se poate consulta in browser statusul clusterului Storm la adresa http://localhost:8080 . Logurile mentinute de Storm se pot regasi in directorul logs al instalarii. Configurarea pentru rularea Storm este localizata in directorul conf/storm.yaml al instalarii Storm. Orice configurare din acest fisier suprascrie practic o evantuala varianta default de configurare aflata in conf/defaults.yaml. Optiunile principale de configurare sunt:

  • storm.zookeeper.servers: serverele pe care ruleaza ZooKeeper (default: "localhost")
  • nimbus.seeds: statia pe care ruleaza demonul nimbus (default: "localhost")
  • supervisor.slots.ports: porturile pe care vor rula procesele worker la nivelul masinii supervisor, lista definind si numarul maxim de workers (default: 6700-6703, se specifica ca lista cu intrari pe fiecare linie introduse de caracterul "- ")
  • storm.local.dir: directorul in care procesul nimbus si cele supervisor vor stoca diverse date necesare (JAR-ul cu topologia, configurari, etc)

Dupa versiunea Java 9, in fisierul de configurare este recomandata si adaugarea unei optiuni ce priveste logurile worker-ilor (instantele JVM), altfel pornirea masinii virtuale fiind posibil sa esueze daca foloseste varianta default de configurare. O varianta corecta de configurare e precizata la acest link.

Pregatirea topologiei si rularea in mod cluster

Pentru a rula topologia in mod cluster e suficient ca in functia main() a topologiei definita initial sa fie adaugata o ramura alternativa celei locale (care incepe cu declararea variabilei de tip LocalCluster) in care topologia va fi trimisa spre rulare din cadrul clasei StormSubmitter (numele topologiei fiind primit ca prim argument al programului):

if(args.length == 0){
	...
} else{
	StormSubmitter.submitTopology(args[0], config, builder.createTopology());
}

Topologia poate fi exportata din Eclipse ca un fisier Jar selectand pachetele cu clasele de inclus, click dreapta, Export, Java, JAR file. Se includ toate resursele necesare pentru export, si in ultimul pas se poate selecta clasa principala din topologie (ce include main() ).
Rularea in mod cluster se face executand comanda:

storm jar (path/to/jar) (cale.pachet.clasa.principala) (nume_topologie)

Dupa rulare, statusul topologiei se poate monitoriza in Storm UI (http://localhost:8080). Tot din storm UI se poate termina topologia, sau alternativ folosind comanda:

storm kill (nume_topologie) [-w timp_asteptare]

© 2025 Emanuel Onica. Parts of design by W3Layouts