summaryrefslogtreecommitdiff
path: root/apps/fabric/src/fabric_util.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-08-12 02:27:21 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-08-12 02:27:21 -0400
commit4d7e3604c9ed5788747334c08359af1822368d15 (patch)
treea523dd8383ac7c7e0bc853469b3b7c539b4c30f2 /apps/fabric/src/fabric_util.erl
parent72d7a60124b4ee7e31912fe7ed3a50bbc5cb9f64 (diff)
parent5e2f90537f5b54adc94c58b58512a05b058fa804 (diff)
Add 'apps/fabric/' from commit '5e2f90537f5b54adc94c58b58512a05b058fa804'
git-subtree-dir: apps/fabric git-subtree-mainline: 72d7a60124b4ee7e31912fe7ed3a50bbc5cb9f64 git-subtree-split: 5e2f90537f5b54adc94c58b58512a05b058fa804
Diffstat (limited to 'apps/fabric/src/fabric_util.erl')
-rw-r--r--apps/fabric/src/fabric_util.erl89
1 files changed, 89 insertions, 0 deletions
diff --git a/apps/fabric/src/fabric_util.erl b/apps/fabric/src/fabric_util.erl
new file mode 100644
index 00000000..639a32e7
--- /dev/null
+++ b/apps/fabric/src/fabric_util.erl
@@ -0,0 +1,89 @@
+-module(fabric_util).
+
+-export([submit_jobs/3, cleanup/1, recv/4, receive_loop/4, receive_loop/6,
+ get_db/1]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+submit_jobs(Shards, EndPoint, ExtraArgs) ->
+ lists:map(fun(#shard{node=Node, name=ShardName} = Shard) ->
+ Ref = rexi:cast(Node, {fabric_rpc, EndPoint, [ShardName | ExtraArgs]}),
+ Shard#shard{ref = Ref}
+ end, Shards).
+
+cleanup(Workers) ->
+ [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
+
+recv(Workers, Keypos, Fun, Acc0) ->
+ receive_loop(Workers, Keypos, Fun, Acc0).
+
+receive_loop(Workers, Keypos, Fun, Acc0) ->
+ case couch_config:get("fabric", "request_timeout", "60000") of
+ "infinity" ->
+ Timeout = infinity;
+ N ->
+ Timeout = list_to_integer(N)
+ end,
+ receive_loop(Workers, Keypos, Fun, Acc0, Timeout, infinity).
+
+%% @doc set up the receive loop with an overall timeout
+-spec receive_loop([any()], integer(), function(), any(), timeout(), timeout()) ->
+ {ok, any()} | timeout | {error, any()}.
+receive_loop(RefPartMap, Keypos, Fun, Acc0, infinity, PerMsgTO) ->
+ process_mailbox(RefPartMap, Keypos, Fun, Acc0, nil, PerMsgTO);
+receive_loop(RefPartMap, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) ->
+ TimeoutRef = erlang:make_ref(),
+ {ok, TRef} = timer:send_after(GlobalTimeout, {timeout, TimeoutRef}),
+ try
+ process_mailbox(RefPartMap, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO)
+ after
+ timer:cancel(TRef)
+ end.
+
+process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
+ case process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) of
+ {ok, Acc} ->
+ process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO);
+ {stop, Acc} ->
+ {ok, Acc};
+ Error ->
+ Error
+ end.
+
+process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
+ receive
+ {timeout, TimeoutRef} ->
+ timeout;
+ {Ref, Msg} ->
+ case lists:keyfind(Ref, Keypos, RefList) of
+ false ->
+ % this was some non-matching message which we will ignore
+ {ok, Acc0};
+ Worker ->
+ Fun(Msg, Worker, Acc0)
+ end;
+ {Ref, From, Msg} ->
+ case lists:keyfind(Ref, Keypos, RefList) of
+ false ->
+ {ok, Acc0};
+ Worker ->
+ Fun(Msg, {Worker, From}, Acc0)
+ end;
+ {rexi_DOWN, _RexiMonPid, ServerPid, Reason} = Msg ->
+ showroom_log:message(alert, "rexi_DOWN ~p ~p", [ServerPid, Reason]),
+ Fun(Msg, nil, Acc0)
+ after PerMsgTO ->
+ timeout
+ end.
+
+get_db(DbName) ->
+ Shards = mem3:shards(DbName),
+ case lists:partition(fun(#shard{node = N}) -> N =:= node() end, Shards) of
+ {[#shard{name = ShardName}|_], _} ->
+ % prefer node-local DBs
+ couch_db:open(ShardName, []);
+ {[], [#shard{node = Node, name = ShardName}|_]} ->
+ % but don't require them
+ rpc:call(Node, couch_db, open, [ShardName, []])
+ end.