aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex AUVOLAT <alex.auvolat@ens.fr>2014-05-25 23:26:38 +0200
committerAlex AUVOLAT <alex.auvolat@ens.fr>2014-05-25 23:26:38 +0200
commit4d3f12f167729ecb5de5b9bf8e18f9eca52beced (patch)
treec03ca045d64ea87e7886032dd5dc215df4ab747d
parentaefaa158e36cd65afa98d6b7c3f0a3d0717e13a4 (diff)
downloadSystemeReseaux-Projet-4d3f12f167729ecb5de5b9bf8e18f9eca52beced.tar.gz
SystemeReseaux-Projet-4d3f12f167729ecb5de5b9bf8e18f9eca52beced.zip
Add README, clean up a little.
-rw-r--r--README109
-rw-r--r--src/kahn_seq.ml10
-rw-r--r--src/manager.ml28
-rw-r--r--src/poolserver.ml2
4 files changed, 133 insertions, 16 deletions
diff --git a/README b/README
new file mode 100644
index 0000000..9928b3e
--- /dev/null
+++ b/README
@@ -0,0 +1,109 @@
+Projet de Système et Réseaux 2014
+Alex AUVOLAT, Jean FABRE-MONPLAISIR
+
+------------------------------------
+
+Description du projet
+=====================
+
+Implémentation des réseaux de Kahn en OCaml pour permettre la programmation
+parallèle. Trois implémentations à réaliser :
+
+- Implémentation séquentielle (parallélisme coopératif mono-threadé)
+- Implémentation basée sur les primitives Unix (fork, pipe)
+- Implémentation permettant la communication en réseau
+
+
+Commentaires techniques
+=======================
+
+Dans les versions séquentielle et Unix, nous avons implémenté une nouvelle
+primitive : select, qui permet de faire un get sur plusieurs caneaux en même
+temps et d'exécuter une fonction différente en fonction du premier canal sur
+lequel un message arrive. Nous n'avons pas pris le temps d'implémenter cette
+fonction dans la version fonctionnant par le réseau.
+
+Version séquentielle
+--------------------
+
+Un processus est décrit par le type suivant :
+
+ type 'a process = ('a -> unit) -> unit
+
+C'est-à-dire qu'un processus renvoyant une valeur de type 'a est une fonction qui
+prend comme argument sa continuation et s'exécute à ce moment-là.
+
+Les fonctions qui exploitent le parallélisme font appel à une file de processus
+en attente d'exécution : doco lance des processus en “parallèle” en mettant
+les-dits processus dans la file ; get gère l'attente d'un message sur un canal
+en mettant en fin de file un processus qui re-tente le get lorsque celui-ci a
+échoué car le canal ne contenait aucune donnée - l'éspoir étant qu'un autre
+processus se sera exécuté d'ici-là et aura envoyé un message dans le canal.
+
+Version Unix
+------------
+
+Toutes les primitives sont fournies d'office par Unix, il n'y a donc presque
+rien à faire. Les put/get sont automatiquement gérés par le noyau en ce qui
+concerne la bufferisation et la mise en attente d'un processus tant qu'il n'y a
+rien à lire ou qu'il n'y a plus de place pour écrire. Le lancement de processus
+en parallèle (doco) exploite simplement l'appel système fork, puis waitpid pour
+la syncronisation finale.
+
+Version réseau
+--------------
+
+Publicité pour la version réseau : nous avons réussi, en mobilisant 5 machines
+des salles INFO3 et INFO4 de l'ENS, à calcuer de manière naïve (c'est-à-dire
+avec un algorithme exponentiel) le 53e nombre de la suite de Fibonacci, en un
+temps record de 16,8 secondes. Ce nombre pouvait être calculé sur une seule
+machine, ce qui prenait un peu plus d'une minute dans le cas d'une machine dont
+les quatre cœurs étaient exploités (implémentation Unix). En mobilisant plus de
+machines, nous pourrions sûrement améliorer encore ce temps.
+
+L'implémentation réseau est basée sur une version simplifiée de l'implémentation
+séquentielle, où un processus participant de l'exécution du réseau se contente
+de lire des tâches sur stdin, de les exécuter et d'envoyer des informations sur
+stdout (messages envoyés, tâches lancées par doco).
+
+À cela se rajoute un “manager”, ou gestionnaire, qui s'occupe de multiplexer les
+entrées/sorties pour dispatcher les processus disponibles aux différents
+processus qui lui sont affectés.
+
+Les appels système pour la communication étant les même pour le réseau et les
+pipes (read/write), le manager peut aussi bien communiquer avec des processus
+locaux qu'avec des processus distants via le réseau.
+
+À cela nous avons rajouté un système de “pool” (pool server/pool client) qui
+permet à un certain nombre de machines de se déclarer “disponnibles” pour des
+calculs. Le manager peut ensuite demander à la pool de lui donner un certain
+nombre de processus pour effectuer des calculs. Le pool client s'occupe
+d'initaliser la connection réseau et de rediriger stdin et stdout vers le
+réseau, avant de lancer le processus qui effectuera les calculs.
+
+Les tâches (processus au sens de Kahn) sont des fermetures que l'on serialise
+via la bibliothèque Marshall d'OCaml pour être transmis par le réseau. La partie
+manager est indépendante de l'application que l'on fait tourner, par contre le
+binaire qui effectue les calculs doit être identique sur toutes les machines
+participant au calcul pour que des fermetures puissent être transmises via
+Marshall sans problème.
+
+Utilisation de la version réseau :
+
+ tulipier$ ./poolserver.native &
+ tonka$ ./poolclient.native tulipier
+ tamier$ ./poolclient.native tulipier
+ turnep$ ./poolclient.native tulipier
+ tetragone$ ./poolclient.native tulipier
+ tulipier$ time ./manager.native -pool-addr tulipier \
+ -my-addr tulipier -pool-proc 16 -local-proc 4 ./example.native
+
+(en supposant que . correspond au même dossier, monté par NFS par exemple, sur
+toutes les machines)
+
+Les temps d'exécution peuvent varier car ils sont fonction de la répartition
+entre les machines des tâches qui calculent peu et communiquent beaucoup :
+celles-ci ralentissent le système lorsqu'elles sont lancées sur une machine qui
+n'est pas celle où tourne le manager. Nous ne pouvons avoir que peu d'influence
+là-dessus puisque la répartition des processus est un processus aléatoire.
+
diff --git a/src/kahn_seq.ml b/src/kahn_seq.ml
index 8aff905..c699bbd 100644
--- a/src/kahn_seq.ml
+++ b/src/kahn_seq.ml
@@ -53,8 +53,14 @@ module Seq: S = struct
let doco l =
fun cont ->
- List.iter (fun proc -> Queue.push (fun () -> proc (fun () -> ())) tasks) l;
- cont ()
+ let remain = ref (List.length l) in
+ List.iter (fun proc -> Queue.push (fun () -> proc (fun () -> remain := !remain - 1)) tasks) l;
+ let rec wait_x () =
+ if !remain = 0 then
+ cont ()
+ else
+ Queue.push wait_x tasks
+ in wait_x ()
let return v =
fun cont -> cont v
diff --git a/src/manager.ml b/src/manager.ml
index 9498d8c..4f49dd9 100644
--- a/src/manager.ml
+++ b/src/manager.ml
@@ -15,7 +15,7 @@ let my_addr = ref ""
let my_port = ref 9011
let pool_addr = ref ""
let pool_port = ref 9082
-let pool_count = ref 0
+let pool_proc = ref 0
(* Server data structures *)
@@ -45,6 +45,12 @@ type server = {
sock: file_descr;
}
+let shutdown_server server =
+ dbg "Shutting down server...";
+ Hashtbl.iter (fun _ c -> c.disconnect()) server.clients;
+ if !my_addr <> "" then shutdown server.sock SHUTDOWN_ALL;
+ close server.sock
+
let new_server () =
let server =
{ tasks = Queue.create ();
@@ -58,11 +64,10 @@ let new_server () =
if !my_addr <> "" then begin
dbg @@ Format.sprintf "Listening on port %d" !my_port;
bind server.sock (make_addr "0.0.0.0" !my_port);
- listen server.sock (min 1 !pool_count);
+ listen server.sock (min 1 !pool_proc);
let stop_srv _ =
- dbg "Shutting down server...";
- shutdown server.sock SHUTDOWN_ALL;
+ shutdown_server server;
exit 0
in
Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv);
@@ -84,9 +89,6 @@ let push_task server task =
| MsgTask(a, b) -> GiveMsgTask(a, b)
| Task(a) -> GiveTask(a))
-let get_task server =
- Queue.pop server.tasks
-
let handle_put server chan msg =
if Hashtbl.mem server.tsk_chan chan then
let task = Hashtbl.find server.tsk_chan chan in
@@ -220,12 +222,9 @@ let rec server_run server =
end
end
-let server_shutdown server =
- if !my_addr <> "" then shutdown server.sock SHUTDOWN_ALL
(* Main function *)
-
let parse_args () =
let usage = "Usage: ./manager [options] program" in
let options = [
@@ -235,7 +234,7 @@ let parse_args () =
"-my-port", Arg.Set_int my_port, "Port for me to listen";
"-pool-addr", Arg.Set_string pool_addr, "Pool server to use";
"-pool-port", Arg.Set_int pool_port, "Port on which to connect to pool";
- "-pool-count", Arg.Set_int pool_count, "Number of processes to ask to pool";
+ "-pool-proc", Arg.Set_int pool_proc, "Number of processes to ask to pool";
] in
Arg.parse options (fun n -> program := n) usage
@@ -260,6 +259,7 @@ let () =
let p2m_m, p2m_p = pipe () in
match fork() with
| 0 ->
+ close server.sock;
close m2p_m;
close p2m_m;
dup2 m2p_p stdin;
@@ -285,7 +285,7 @@ let () =
pids := pid :: (!pids)
done;
- if !pool_addr <> "" && !pool_count > 0 then begin
+ if !pool_addr <> "" && !pool_proc > 0 then begin
let sock = socket PF_INET SOCK_STREAM 0 in
connect sock (make_addr !pool_addr !pool_port);
let outc = out_channel_of_descr sock in
@@ -295,13 +295,13 @@ let () =
if read_one_msg sock <> PoolHello then
raise (ProtocolError "Expected PoolHello reply.");
- send (PoolRequest(!program, (!my_addr, !my_port), !pool_count));
+ send (PoolRequest(!program, (!my_addr, !my_port), !pool_proc));
shutdown sock SHUTDOWN_ALL;
close sock
end;
server_run server;
- server_shutdown server;
+ shutdown_server server;
List.iter (fun pid -> ignore (waitpid [] pid)) !pids
diff --git a/src/poolserver.ml b/src/poolserver.ml
index 15a700c..850db3c 100644
--- a/src/poolserver.ml
+++ b/src/poolserver.ml
@@ -29,7 +29,9 @@ let new_server () =
let stop_srv _ =
Format.eprintf "Shutting down server...@.";
+ Hashtbl.iter (fun _ c -> c.disconnect()) server.clients;
shutdown server.sock SHUTDOWN_ALL;
+ close server.sock;
exit 0
in
Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv);