aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/kahn_seq.ml10
-rw-r--r--src/manager.ml28
-rw-r--r--src/poolserver.ml2
3 files changed, 24 insertions, 16 deletions
diff --git a/src/kahn_seq.ml b/src/kahn_seq.ml
index 8aff905..c699bbd 100644
--- a/src/kahn_seq.ml
+++ b/src/kahn_seq.ml
@@ -53,8 +53,14 @@ module Seq: S = struct
let doco l =
fun cont ->
- List.iter (fun proc -> Queue.push (fun () -> proc (fun () -> ())) tasks) l;
- cont ()
+ let remain = ref (List.length l) in
+ List.iter (fun proc -> Queue.push (fun () -> proc (fun () -> remain := !remain - 1)) tasks) l;
+ let rec wait_x () =
+ if !remain = 0 then
+ cont ()
+ else
+ Queue.push wait_x tasks
+ in wait_x ()
let return v =
fun cont -> cont v
diff --git a/src/manager.ml b/src/manager.ml
index 9498d8c..4f49dd9 100644
--- a/src/manager.ml
+++ b/src/manager.ml
@@ -15,7 +15,7 @@ let my_addr = ref ""
let my_port = ref 9011
let pool_addr = ref ""
let pool_port = ref 9082
-let pool_count = ref 0
+let pool_proc = ref 0
(* Server data structures *)
@@ -45,6 +45,12 @@ type server = {
sock: file_descr;
}
+let shutdown_server server =
+ dbg "Shutting down server...";
+ Hashtbl.iter (fun _ c -> c.disconnect()) server.clients;
+ if !my_addr <> "" then shutdown server.sock SHUTDOWN_ALL;
+ close server.sock
+
let new_server () =
let server =
{ tasks = Queue.create ();
@@ -58,11 +64,10 @@ let new_server () =
if !my_addr <> "" then begin
dbg @@ Format.sprintf "Listening on port %d" !my_port;
bind server.sock (make_addr "0.0.0.0" !my_port);
- listen server.sock (min 1 !pool_count);
+ listen server.sock (min 1 !pool_proc);
let stop_srv _ =
- dbg "Shutting down server...";
- shutdown server.sock SHUTDOWN_ALL;
+ shutdown_server server;
exit 0
in
Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv);
@@ -84,9 +89,6 @@ let push_task server task =
| MsgTask(a, b) -> GiveMsgTask(a, b)
| Task(a) -> GiveTask(a))
-let get_task server =
- Queue.pop server.tasks
-
let handle_put server chan msg =
if Hashtbl.mem server.tsk_chan chan then
let task = Hashtbl.find server.tsk_chan chan in
@@ -220,12 +222,9 @@ let rec server_run server =
end
end
-let server_shutdown server =
- if !my_addr <> "" then shutdown server.sock SHUTDOWN_ALL
(* Main function *)
-
let parse_args () =
let usage = "Usage: ./manager [options] program" in
let options = [
@@ -235,7 +234,7 @@ let parse_args () =
"-my-port", Arg.Set_int my_port, "Port for me to listen";
"-pool-addr", Arg.Set_string pool_addr, "Pool server to use";
"-pool-port", Arg.Set_int pool_port, "Port on which to connect to pool";
- "-pool-count", Arg.Set_int pool_count, "Number of processes to ask to pool";
+ "-pool-proc", Arg.Set_int pool_proc, "Number of processes to ask to pool";
] in
Arg.parse options (fun n -> program := n) usage
@@ -260,6 +259,7 @@ let () =
let p2m_m, p2m_p = pipe () in
match fork() with
| 0 ->
+ close server.sock;
close m2p_m;
close p2m_m;
dup2 m2p_p stdin;
@@ -285,7 +285,7 @@ let () =
pids := pid :: (!pids)
done;
- if !pool_addr <> "" && !pool_count > 0 then begin
+ if !pool_addr <> "" && !pool_proc > 0 then begin
let sock = socket PF_INET SOCK_STREAM 0 in
connect sock (make_addr !pool_addr !pool_port);
let outc = out_channel_of_descr sock in
@@ -295,13 +295,13 @@ let () =
if read_one_msg sock <> PoolHello then
raise (ProtocolError "Expected PoolHello reply.");
- send (PoolRequest(!program, (!my_addr, !my_port), !pool_count));
+ send (PoolRequest(!program, (!my_addr, !my_port), !pool_proc));
shutdown sock SHUTDOWN_ALL;
close sock
end;
server_run server;
- server_shutdown server;
+ shutdown_server server;
List.iter (fun pid -> ignore (waitpid [] pid)) !pids
diff --git a/src/poolserver.ml b/src/poolserver.ml
index 15a700c..850db3c 100644
--- a/src/poolserver.ml
+++ b/src/poolserver.ml
@@ -29,7 +29,9 @@ let new_server () =
let stop_srv _ =
Format.eprintf "Shutting down server...@.";
+ Hashtbl.iter (fun _ c -> c.disconnect()) server.clients;
shutdown server.sock SHUTDOWN_ALL;
+ close server.sock;
exit 0
in
Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv);