aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex AUVOLAT <alex.auvolat@ens.fr>2014-03-20 16:39:16 +0100
committerAlex AUVOLAT <alex.auvolat@ens.fr>2014-03-20 16:39:16 +0100
commit9b9bc4e787d6ecebcf15182a562fc47d27d9880d (patch)
treeb629d6a0c3c6f0d80dbde8e0bc6252d99f1d834e
parent084745ffe51234e366ed6627d9f697c47d87bb4a (diff)
downloadSystemeReseaux-Projet-9b9bc4e787d6ecebcf15182a562fc47d27d9880d.tar.gz
SystemeReseaux-Projet-9b9bc4e787d6ecebcf15182a562fc47d27d9880d.zip
Added implementation with pipes.
-rw-r--r--.gitignore1
-rw-r--r--src/kahn.ml47
-rw-r--r--src/kahnsock.ml45
-rw-r--r--src/primes.ml5
4 files changed, 96 insertions, 2 deletions
diff --git a/.gitignore b/.gitignore
index a674ee4..81bbf6f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
*.byte
*.native
*/_build/*
+*.swp
diff --git a/src/kahn.ml b/src/kahn.ml
index 91b251f..5229f7e 100644
--- a/src/kahn.ml
+++ b/src/kahn.ml
@@ -150,3 +150,50 @@ module Seq: S = struct
end
+
+module Pipe: S = struct
+ type 'a process = unit -> 'a
+
+ type 'a in_port = in_channel
+ type 'a out_port = out_channel
+
+ let children = ref []
+
+ let new_channel =
+ fun () ->
+ let i, o = Unix.pipe () in
+ Unix.in_channel_of_descr i, Unix.out_channel_of_descr o
+
+ let get c =
+ fun () -> Marshal.from_channel c
+
+ let put x c =
+ fun () -> Marshal.to_channel c x []
+
+
+ let return v =
+ fun () -> v
+
+ let bind e f =
+ fun () -> f (e ()) ()
+
+ let run p =
+ let v = p() in
+ List.iter
+ (fun x -> try ignore(Unix.waitpid [] x) with _ -> ())
+ !children;
+ v
+
+ let doco l =
+ fun () ->
+ List.iter (fun p ->
+ let i = Unix.fork () in
+ if i = 0 then begin
+ children := [];
+ run p;
+ exit 0
+ end else begin
+ children := i::!children
+ end)
+ l
+end
diff --git a/src/kahnsock.ml b/src/kahnsock.ml
new file mode 100644
index 0000000..6cdfadd
--- /dev/null
+++ b/src/kahnsock.ml
@@ -0,0 +1,45 @@
+Random.self_init ()
+
+type ident = (int * int * int * int)
+let gen_ident () =
+ Random.int 1000000000, Random.int 1000000000,
+ Random.int 1000000000, Random.int 1000000000
+
+module Sock : Kahn.S = struct
+
+ (* L'idée :
+ - L'ensemble des noeuds qui font du calcul est un arbre.
+ Le premier noeud lancé est la racine de l'arbre ; tous les
+ noeuds qui se connectent par la suite se connectent à un
+ noeud déjà présent et sont donc son fils.
+ - Les processus sont des fermetures de type unit -> unit,
+ transmises par des canaux
+ - Un noeud de calcul est un processus ocaml avec un seul
+ thread. Le parallélisme est coopératif (penser à faire
+ des binds assez souvent).
+ - Les noeuds publient régulièrement leur load, ie le nombre
+ de processus en attente et qui ne sont pas en train
+ d'attendre des données depuis un canal. Si un noeud a un
+ voisin dont le load est plus faible que le sien d'une
+ quantité plus grande que 2, il délègue une tâche.
+ - Le noeud racine délègue toutes ses tâches et sert uniquement
+ pour les entrées-sorties
+ - Comportement indéterminé lorsqu'un noeud se déconnecte
+ (des processus peuvent disparaître, le réseau est cassé...)
+ - Les canaux sont identifiés par le type ident décrit
+ ci-dessus. Lorsque quelqu'un écrit sur un canal, tout le
+ monde le sait. Lorsque quelqu'un lit sur un canal, tout le
+ monde le sait. (on n'est pas capable de déterminer
+ quel est le noeud propriétaire du processus devant lire
+ le message) Les communications sont donc coûteuses.
+ *)
+
+ type 'a process = (unit -> 'a)
+
+ type 'a in_port = ident
+ type 'a out_port = ident
+
+ let cin = (0, 0, 0, 0)
+ let cout = (0, 0, 0, 1)
+
+end
diff --git a/src/primes.ml b/src/primes.ml
index 0911f31..e0eeed7 100644
--- a/src/primes.ml
+++ b/src/primes.ml
@@ -32,11 +32,12 @@ module Primes (K : Kahn.S) = struct
let main : unit process =
(delay new_channel ()) >>=
- (fun (q_in, q_out) -> doco [ integers 5000 q_out ; primes q_in ])
+ (fun (q_in, q_out) -> doco [ integers 2000 q_out ; primes q_in ])
end
-module P = Primes(Kahn.Seq)
+module Eng = Kahn.Pipe
+module P = Primes(Eng)
let () = P.K.run P.main