path: root/apps
diff options
authorBrad Anderson <>2010-08-17 22:04:15 -0400
committerAdam Kocoloski <>2010-08-27 16:52:26 -0400
commit3b64928720ce15c92374b031ff24b10022671261 (patch)
tree0535ad8bb4f6c3cfd148c62c8c0a0f24c1a7f589 /apps
parent794d78eb02a20e6c545d24fa9649839ebe763636 (diff)
split some rexi utilities out from fabric
Diffstat (limited to 'apps')
7 files changed, 70 insertions, 69 deletions
diff --git a/apps/fabric/src/fabric_util.erl b/apps/fabric/src/fabric_util.erl
index b9e30093..a3265010 100644
--- a/apps/fabric/src/fabric_util.erl
+++ b/apps/fabric/src/fabric_util.erl
@@ -14,12 +14,10 @@
--export([submit_jobs/3, cleanup/1, recv/4, receive_loop/4, receive_loop/6,
- get_db/1]).
+-export([submit_jobs/3, cleanup/1, recv/4, get_db/1]).
submit_jobs(Shards, EndPoint, ExtraArgs) ->
lists:map(fun(#shard{node=Node, name=ShardName} = Shard) ->
@@ -31,66 +29,12 @@ cleanup(Workers) ->
[rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
recv(Workers, Keypos, Fun, Acc0) ->
- receive_loop(Workers, Keypos, Fun, Acc0).
-receive_loop(Workers, Keypos, Fun, Acc0) ->
- case couch_config:get("fabric", "request_timeout", "60000") of
- "infinity" ->
- Timeout = infinity;
- N ->
- Timeout = list_to_integer(N)
+ Timeout = case couch_config:get("fabric", "request_timeout", "60000") of
+ "infinity" -> infinity;
+ N -> list_to_integer(N)
- receive_loop(Workers, Keypos, Fun, Acc0, Timeout, infinity).
-%% @doc set up the receive loop with an overall timeout
--spec receive_loop([any()], integer(), function(), any(), timeout(), timeout()) ->
- {ok, any()} | timeout | {error, any()}.
-receive_loop(RefPartMap, Keypos, Fun, Acc0, infinity, PerMsgTO) ->
- process_mailbox(RefPartMap, Keypos, Fun, Acc0, nil, PerMsgTO);
-receive_loop(RefPartMap, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) ->
- TimeoutRef = erlang:make_ref(),
- {ok, TRef} = timer:send_after(GlobalTimeout, {timeout, TimeoutRef}),
- try
- process_mailbox(RefPartMap, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO)
- after
- timer:cancel(TRef)
- end.
+ rexi_utils:recv(Workers, Keypos, Fun, Acc0, Timeout, infinity).
-process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
- case process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) of
- {ok, Acc} ->
- process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO);
- {stop, Acc} ->
- {ok, Acc};
- Error ->
- Error
- end.
-process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
- receive
- {timeout, TimeoutRef} ->
- timeout;
- {Ref, Msg} ->
- case lists:keyfind(Ref, Keypos, RefList) of
- false ->
- % this was some non-matching message which we will ignore
- {ok, Acc0};
- Worker ->
- Fun(Msg, Worker, Acc0)
- end;
- {Ref, From, Msg} ->
- case lists:keyfind(Ref, Keypos, RefList) of
- false ->
- {ok, Acc0};
- Worker ->
- Fun(Msg, {Worker, From}, Acc0)
- end;
- {rexi_DOWN, _RexiMonPid, ServerPid, Reason} = Msg ->
- ?LOG_ERROR("rexi_DOWN ~p ~p", [ServerPid, Reason]),
- Fun(Msg, nil, Acc0)
- after PerMsgTO ->
- timeout
- end.
get_db(DbName) ->
Shards = mem3:shards(DbName),
diff --git a/apps/fabric/src/fabric_view_all_docs.erl b/apps/fabric/src/fabric_view_all_docs.erl
index 45ae3864..b3436171 100644
--- a/apps/fabric/src/fabric_view_all_docs.erl
+++ b/apps/fabric/src/fabric_view_all_docs.erl
@@ -37,7 +37,7 @@ go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) ->
limit = Limit,
user_acc = Acc0
- try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
+ try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
State, infinity, 5000) of
{ok, NewState} ->
{ok, NewState#collector.user_acc};
diff --git a/apps/fabric/src/fabric_view_changes.erl b/apps/fabric/src/fabric_view_changes.erl
index e4f44485..c7b6b7c4 100644
--- a/apps/fabric/src/fabric_view_changes.erl
+++ b/apps/fabric/src/fabric_view_changes.erl
@@ -103,7 +103,7 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn) ->
limit = ChangesArgs#changes_args.limit,
rows = Seqs % store sequence positions instead
- try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
+ try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
State, infinity, 5000)
@@ -221,7 +221,7 @@ unpack_seqs(0, DbName) ->
unpack_seqs("0", DbName) ->
fabric_dict:init(mem3:shards(DbName), 0);
unpack_seqs(Packed, DbName) ->
{match, [Opaque]} = re:run(Packed, "^([0-9]+-)?(?<opaque>.*)", [{capture,
[opaque], binary}]),
diff --git a/apps/fabric/src/fabric_view_map.erl b/apps/fabric/src/fabric_view_map.erl
index 91ed3179..e1210a84 100644
--- a/apps/fabric/src/fabric_view_map.erl
+++ b/apps/fabric/src/fabric_view_map.erl
@@ -42,7 +42,7 @@ go(DbName, DDoc, View, Args, Callback, Acc0) ->
sorted = Args#view_query_args.sorted,
user_acc = Acc0
- try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
+ try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
State, infinity, 1000 * 60 * 60) of
{ok, NewState} ->
{ok, NewState#collector.user_acc};
@@ -117,7 +117,7 @@ handle_message(#view_row{} = Row, {_,From}, #collector{sorted=false} = St) ->
{Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn),
gen_server:reply(From, ok),
{Go, St#collector{user_acc=Acc, limit=Limit-1}};
handle_message(#view_row{} = Row, {Worker, From}, State) ->
query_args = #view_query_args{direction=Dir},
@@ -149,4 +149,3 @@ merge_row(_, KeyDict, Row, Rows) ->
dict:fetch(A, KeyDict) < dict:fetch(B, KeyDict)
end, [Row], Rows).
diff --git a/apps/fabric/src/fabric_view_reduce.erl b/apps/fabric/src/fabric_view_reduce.erl
index f401fc58..6ae564cc 100644
--- a/apps/fabric/src/fabric_view_reduce.erl
+++ b/apps/fabric/src/fabric_view_reduce.erl
@@ -49,7 +49,7 @@ go(DbName, DDoc, VName, Args, Callback, Acc0) ->
rows = dict:new(),
user_acc = Acc0
- try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
+ try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
State, infinity, 1000 * 60 * 60) of
{ok, NewState} ->
{ok, NewState#collector.user_acc};
diff --git a/apps/rexi/ebin/ b/apps/rexi/ebin/
index 620c8863..5c944b72 100644
--- a/apps/rexi/ebin/
+++ b/apps/rexi/ebin/
@@ -1,7 +1,12 @@
{application, rexi, [
{description, "Lightweight RPC server"},
{vsn, "1.2"},
- {modules, [rexi, rexi_app, rexi_sup, rexi_monitor, rexi_server]},
+ {modules, [rexi,
+ rexi_app,
+ rexi_sup,
+ rexi_monitor,
+ rexi_server,
+ rexi_utils]},
{registered, [rexi_sup, rexi_server]},
{applications, [kernel, stdlib]},
{mod, {rexi_app,[]}}
diff --git a/apps/rexi/src/rexi_utils.erl b/apps/rexi/src/rexi_utils.erl
new file mode 100644
index 00000000..79183951
--- /dev/null
+++ b/apps/rexi/src/rexi_utils.erl
@@ -0,0 +1,53 @@
+%% @doc set up the receive loop with an overall timeout
+-spec recv([any()], integer(), function(), any(), timeout(), timeout()) ->
+ {ok, any()} | timeout | {error, any()}.
+recv(Refs, Keypos, Fun, Acc0, infinity, PerMsgTO) ->
+ process_mailbox(Refs, Keypos, Fun, Acc0, nil, PerMsgTO);
+recv(Refs, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) ->
+ TimeoutRef = erlang:make_ref(),
+ {ok, TRef} = timer:send_after(GlobalTimeout, {timeout, TimeoutRef}),
+ try
+ process_mailbox(Refs, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO)
+ after
+ timer:cancel(TRef)
+ end.
+process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
+ case process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) of
+ {ok, Acc} ->
+ process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO);
+ {stop, Acc} ->
+ {ok, Acc};
+ Error ->
+ Error
+ end.
+process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
+ receive
+ {timeout, TimeoutRef} ->
+ timeout;
+ {Ref, Msg} ->
+ case lists:keyfind(Ref, Keypos, RefList) of
+ false ->
+ % this was some non-matching message which we will ignore
+ {ok, Acc0};
+ Worker ->
+ Fun(Msg, Worker, Acc0)
+ end;
+ {Ref, From, Msg} ->
+ case lists:keyfind(Ref, Keypos, RefList) of
+ false ->
+ {ok, Acc0};
+ Worker ->
+ Fun(Msg, {Worker, From}, Acc0)
+ end;
+ {rexi_DOWN, _RexiMonPid, ServerPid, Reason} = Msg ->
+ io:format("rexi_DOWN ~p ~p", [ServerPid, Reason]),
+ Fun(Msg, nil, Acc0)
+ after PerMsgTO ->
+ timeout
+ end.