diff options
author | Micah Anderson <micah@leap.se> | 2014-01-15 18:13:16 +0000 |
---|---|---|
committer | drebs <drebs@leap.se> | 2014-01-17 08:48:11 -0200 |
commit | 510c6d763fba74f95ae8f894408c3658bcef4f83 (patch) | |
tree | d4dd0930b902cb1e5d46bea621ec83f801ea8ed6 /deps/rexi/src | |
parent | 8bd863936ead4243f58fb99e11d1221e1af0a71e (diff) |
embed dependencies that were previously pulled in by git during rebar build
Diffstat (limited to 'deps/rexi/src')
-rw-r--r-- | deps/rexi/src/rexi.app.src | 7 | ||||
-rw-r--r-- | deps/rexi/src/rexi.erl | 125 | ||||
-rw-r--r-- | deps/rexi/src/rexi_app.erl | 25 | ||||
-rw-r--r-- | deps/rexi/src/rexi_monitor.erl | 66 | ||||
-rw-r--r-- | deps/rexi/src/rexi_server.erl | 190 | ||||
-rw-r--r-- | deps/rexi/src/rexi_sup.erl | 29 | ||||
-rw-r--r-- | deps/rexi/src/rexi_utils.erl | 52 |
7 files changed, 494 insertions, 0 deletions
diff --git a/deps/rexi/src/rexi.app.src b/deps/rexi/src/rexi.app.src new file mode 100644 index 00000000..75baa77f --- /dev/null +++ b/deps/rexi/src/rexi.app.src @@ -0,0 +1,7 @@ +{application, rexi, [ + {description, "Lightweight RPC server"}, + {vsn, git}, + {registered, [rexi_sup, rexi_server]}, + {applications, [kernel, stdlib]}, + {mod, {rexi_app,[]}} +]}. diff --git a/deps/rexi/src/rexi.erl b/deps/rexi/src/rexi.erl new file mode 100644 index 00000000..a98b5610 --- /dev/null +++ b/deps/rexi/src/rexi.erl @@ -0,0 +1,125 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(rexi). +-export([start/0, stop/0, restart/0]). +-export([cast/2, cast/3, kill/2]). +-export([reply/1, sync_reply/1, sync_reply/2]). +-export([async_server_call/2, async_server_call/3]). +-export([get_errors/0, get_last_error/0, set_error_limit/1]). + +-include("rexi.hrl"). + +-define(SERVER, rexi_server). + +start() -> + application:start(rexi). + +stop() -> + application:stop(rexi). + +restart() -> + stop(), start(). + +-spec get_errors() -> {ok, [#error{}]}. +get_errors() -> + gen_server:call(?SERVER, get_errors). + +-spec get_last_error() -> {ok, #error{}} | {error, empty}. +get_last_error() -> + gen_server:call(?SERVER, get_last_error). + +-spec set_error_limit(pos_integer()) -> ok. +set_error_limit(N) when is_integer(N), N > 0 -> + gen_server:call(?SERVER, {set_error_limit, N}). + +%% @equiv cast(Node, self(), MFA) +-spec cast(node(), {atom(), atom(), list()}) -> reference(). +cast(Node, MFA) -> + cast(Node, self(), MFA). + +%% @doc Executes apply(M, F, A) on Node. +%% You might want to use this instead of rpc:cast/4 for two reasons. First, +%% the Caller pid and the returned reference are inserted into the remote +%% process' dictionary as `rexi_from', so it has a way to communicate with you. +%% Second, the remote process is monitored. If it exits with a Reason other +%% than normal, Caller will receive a message of the form +%% `{Ref, {rexi_EXIT, Reason}}' where Ref is the returned reference. +-spec cast(node(), pid(), {atom(), atom(), list()}) -> reference(). +cast(Node, Caller, MFA) -> + Ref = make_ref(), + do_send({?SERVER, Node}, cast_msg({doit, {Caller, Ref}, get(nonce), MFA})), + Ref. + +%% @doc Sends an async kill signal to the remote process associated with Ref. +%% No rexi_EXIT message will be sent. +-spec kill(node(), reference()) -> ok. +kill(Node, Ref) -> + do_send({?SERVER, Node}, cast_msg({kill, Ref})), + ok. + +%% @equiv async_server_call(Server, self(), Request) +-spec async_server_call(pid() | {atom(),node()}, any()) -> reference(). +async_server_call(Server, Request) -> + async_server_call(Server, self(), Request). + +%% @doc Sends a properly formatted gen_server:call Request to the Server and +%% returns the reference which the Server will include in its reply. The +%% function acts more like cast() than call() in that the server process +%% is not monitored. Clients who want to know if the server is alive should +%% monitor it themselves before calling this function. +-spec async_server_call(pid() | {atom(),node()}, pid(), any()) -> reference(). +async_server_call(Server, Caller, Request) -> + Ref = make_ref(), + do_send(Server, {'$gen_call', {Caller,Ref}, Request}), + Ref. + +%% @doc convenience function to reply to the original rexi Caller. +-spec reply(any()) -> any(). +reply(Reply) -> + {Caller, Ref} = get(rexi_from), + erlang:send(Caller, {Ref,Reply}). + +%% @equiv sync_reply(Reply, 300000) +sync_reply(Reply) -> + sync_reply(Reply, 300000). + +%% @doc convenience function to reply to caller and wait for response. Message +%% is of the form {OriginalRef, {self(),reference()}, Reply}, which enables the +%% original caller to respond back. +-spec sync_reply(any(), pos_integer() | infinity) -> any(). +sync_reply(Reply, Timeout) -> + {Caller, Ref} = get(rexi_from), + Tag = make_ref(), + erlang:send(Caller, {Ref, {self(),Tag}, Reply}), + receive {Tag, Response} -> + Response + after Timeout -> + timeout + end. + +%% internal functions %% + +cast_msg(Msg) -> {'$gen_cast', Msg}. + +% send a message as quickly as possible +do_send(Dest, Msg) -> + case erlang:send(Dest, Msg, [noconnect, nosuspend]) of + noconnect -> + spawn(erlang, send, [Dest, Msg]); + nosuspend -> + spawn(erlang, send, [Dest, Msg]); + ok -> + ok + end. diff --git a/deps/rexi/src/rexi_app.erl b/deps/rexi/src/rexi_app.erl new file mode 100644 index 00000000..2dd99c23 --- /dev/null +++ b/deps/rexi/src/rexi_app.erl @@ -0,0 +1,25 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(rexi_app). +-behaviour(application). +-export([start/2, stop/1]). + +-include_lib("eunit/include/eunit.hrl"). + +start(_Type, StartArgs) -> + rexi_sup:start_link(StartArgs). + +stop(_State) -> + ok. diff --git a/deps/rexi/src/rexi_monitor.erl b/deps/rexi/src/rexi_monitor.erl new file mode 100644 index 00000000..ab33fb87 --- /dev/null +++ b/deps/rexi/src/rexi_monitor.erl @@ -0,0 +1,66 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(rexi_monitor). +-export([start/1, stop/1]). + +-include_lib("eunit/include/eunit.hrl"). + +%% @doc spawn_links a process which monitors the supplied list of items and +%% returns the process ID. If a monitored process exits, the caller will +%% receive a {rexi_DOWN, MonitoringPid, DeadPid, Reason} message. +-spec start([pid() | atom() | {atom(),node()}]) -> pid(). +start(Procs) -> + Parent = self(), + Nodes = [node() | nodes()], + {Mon, Skip} = lists:partition(fun(P) -> should_monitor(P, Nodes) end, + Procs), + spawn_link(fun() -> + [notify_parent(Parent, P, noconnect) || P <- Skip], + [erlang:monitor(process, P) || P <- Mon], + wait_monitors(Parent) + end). + +%% @doc Cleanly shut down the monitoring process and flush all rexi_DOWN +%% messages from our mailbox. +-spec stop(pid()) -> ok. +stop(MonitoringPid) -> + MonitoringPid ! {self(), shutdown}, + flush_down_messages(). + +%% internal functions %% + +notify_parent(Parent, Pid, Reason) -> + erlang:send(Parent, {rexi_DOWN, self(), Pid, Reason}). + +should_monitor(Pid, Nodes) when is_pid(Pid) -> + lists:member(node(Pid), Nodes); +should_monitor({_, Node}, Nodes) -> + lists:member(Node, Nodes). + +wait_monitors(Parent) -> + receive + {'DOWN', _, process, Pid, Reason} -> + notify_parent(Parent, Pid, Reason), + wait_monitors(Parent); + {Parent, shutdown} -> + ok + end. + +flush_down_messages() -> + receive {rexi_DOWN, _, _, _} -> + flush_down_messages() + after 0 -> + ok + end. diff --git a/deps/rexi/src/rexi_server.erl b/deps/rexi/src/rexi_server.erl new file mode 100644 index 00000000..aa417aa4 --- /dev/null +++ b/deps/rexi/src/rexi_server.erl @@ -0,0 +1,190 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(rexi_server). +-behaviour(gen_server). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([start_link/0, init_p/2, init_p/3]). + +-include("rexi.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-record(job, { + client::reference(), + worker::reference(), + client_pid::pid(), + worker_pid::pid() +}). + +-record(st, { + workers = ets:new(workers, [private, {keypos, #job.worker}]), + clients = ets:new(clients, [private, {keypos, #job.client}]), + errors = queue:new(), + error_limit = 20, + error_count = 0 +}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + {ok, #st{}}. + +handle_call(get_errors, _From, #st{errors = Errors} = St) -> + {reply, {ok, lists:reverse(queue:to_list(Errors))}, St}; + +handle_call(get_last_error, _From, #st{errors = Errors} = St) -> + try + {reply, {ok, queue:get_r(Errors)}, St} + catch error:empty -> + {reply, {error, empty}, St} + end; + +handle_call({set_error_limit, N}, _From, #st{error_count=Len, errors=Q} = St) -> + if N < Len -> + {NewQ, _} = queue:split(N, Q); + true -> + NewQ = Q + end, + NewLen = queue:len(NewQ), + {reply, ok, St#st{error_limit=N, error_count=NewLen, errors=NewQ}}; + +handle_call(_Request, _From, St) -> + {reply, ignored, St}. + + +handle_cast({doit, From, MFA}, St) -> + handle_cast({doit, From, undefined, MFA}, St); + +handle_cast({doit, {ClientPid, ClientRef} = From, Nonce, MFA}, State) -> + {LocalPid, Ref} = spawn_monitor(?MODULE, init_p, [From, MFA, Nonce]), + Job = #job{ + client = ClientRef, + worker = Ref, + client_pid = ClientPid, + worker_pid = LocalPid + }, + {noreply, add_job(Job, State)}; + + +handle_cast({kill, FromRef}, #st{clients = Clients} = St) -> + case find_worker(FromRef, Clients) of + #job{worker = KeyRef, worker_pid = Pid} = Job -> + erlang:demonitor(KeyRef), + exit(Pid, kill), + {noreply, remove_job(Job, St)}; + false -> + {noreply, St} + end; + +handle_cast(_, St) -> + twig:log(notice, "rexi_server ignored_cast"), + {noreply, St}. + +handle_info({'DOWN', Ref, process, _, normal}, #st{workers=Workers} = St) -> + case find_worker(Ref, Workers) of + #job{} = Job -> + {noreply, remove_job(Job, St)}; + false -> + {noreply, St} + end; + +handle_info({'DOWN', Ref, process, Pid, Error}, #st{workers=Workers} = St) -> + case find_worker(Ref, Workers) of + #job{worker_pid=Pid, worker=Ref, client_pid=CPid, client=CRef} =Job -> + case Error of #error{reason = {_Class, Reason}, stack = Stack} -> + notify_caller({CPid, CRef}, {Reason, Stack}), + St1 = save_error(Error, St), + {noreply, remove_job(Job, St1)}; + _ -> + notify_caller({CPid, CRef}, Error), + {noreply, remove_job(Job, St)} + end; + false -> + {noreply, St} + end; + +handle_info(_Info, St) -> + {noreply, St}. + +terminate(_Reason, St) -> + ets:foldl(fun(#job{worker_pid=Pid},_) -> exit(Pid,kill) end, nil, + St#st.workers), + ok. + +code_change(_OldVsn, {st, Workers}, _Extra) -> + {ok, #st{workers = Workers}}; + +code_change(_OldVsn, {st, Workers0, Errors, Limit, Count}, _Extra) -> + Jobs = [#job{worker_pid=A, worker=B, client_pid=C, client=D} + || {A, B, {C, D}} <- ets:tab2list(Workers0)], + ets:delete(Workers0), + State = #st{errors = Errors, error_limit = Limit, error_count = Count}, + ets:insert(State#st.workers, Jobs), + ets:insert(State#st.clients, Jobs), + {ok, State}; + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + +init_p(From, MFA) -> + init_p(From, MFA, undefined). + +%% @doc initializes a process started by rexi_server. +-spec init_p({pid(), reference()}, {atom(), atom(), list()}, + string() | undefined) -> any(). +init_p(From, {M,F,A}, Nonce) -> + put(rexi_from, From), + put(initial_call, {M,F,length(A)}), + put(nonce, Nonce), + try apply(M, F, A) catch exit:normal -> ok; Class:Reason -> + Stack = clean_stack(), + twig:log(error, "rexi_server ~p:~p ~100p", [Class, Reason, Stack]), + exit(#error{ + timestamp = now(), + reason = {Class, Reason}, + mfa = {M,F,A}, + nonce = Nonce, + stack = Stack + }) + end. + +%% internal + +save_error(E, #st{errors=Q, error_limit=L, error_count=C} = St) when C >= L -> + St#st{errors = queue:in(E, queue:drop(Q))}; +save_error(E, #st{errors=Q, error_count=C} = St) -> + St#st{errors = queue:in(E, Q), error_count = C+1}. + +clean_stack() -> + lists:map(fun({M,F,A}) when is_list(A) -> {M,F,length(A)}; (X) -> X end, + erlang:get_stacktrace()). + +add_job(Job, #st{workers = Workers, clients = Clients} = State) -> + ets:insert(Workers, Job), + ets:insert(Clients, Job), + State. + +remove_job(Job, #st{workers = Workers, clients = Clients} = State) -> + ets:delete_object(Workers, Job), + ets:delete_object(Clients, Job), + State. + +find_worker(Ref, Tab) -> + case ets:lookup(Tab, Ref) of [] -> false; [Worker] -> Worker end. + +notify_caller({Caller, Ref}, Reason) -> + Caller ! {Ref, {rexi_EXIT, Reason}}. diff --git a/deps/rexi/src/rexi_sup.erl b/deps/rexi/src/rexi_sup.erl new file mode 100644 index 00000000..828ee54d --- /dev/null +++ b/deps/rexi/src/rexi_sup.erl @@ -0,0 +1,29 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(rexi_sup). +-behaviour(supervisor). +-export([init/1]). + +-export([start_link/1]). + +-include_lib("eunit/include/eunit.hrl"). + +start_link(Args) -> + supervisor:start_link({local,?MODULE}, ?MODULE, Args). + +init([]) -> + Mod = rexi_server, + Spec = {Mod, {Mod,start_link,[]}, permanent, 100, worker, [Mod]}, + {ok, {{one_for_one, 3, 10}, [Spec]}}. diff --git a/deps/rexi/src/rexi_utils.erl b/deps/rexi/src/rexi_utils.erl new file mode 100644 index 00000000..7791866d --- /dev/null +++ b/deps/rexi/src/rexi_utils.erl @@ -0,0 +1,52 @@ +-module(rexi_utils). + +-export([recv/6]). + +%% @doc set up the receive loop with an overall timeout +-spec recv([any()], integer(), function(), any(), timeout(), timeout()) -> + {ok, any()} | {timeout, any()} | {error, atom()} | {error, atom(), any()}. +recv(Refs, Keypos, Fun, Acc0, infinity, PerMsgTO) -> + process_mailbox(Refs, Keypos, Fun, Acc0, nil, PerMsgTO); +recv(Refs, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) -> + TimeoutRef = erlang:make_ref(), + TRef = erlang:send_after(GlobalTimeout, self(), {timeout, TimeoutRef}), + try + process_mailbox(Refs, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) + after + erlang:cancel_timer(TRef) + end. + +process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> + case process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) of + {ok, Acc} -> + process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO); + {stop, Acc} -> + {ok, Acc}; + Error -> + Error + end. + +process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> + receive + {timeout, TimeoutRef} -> + {timeout, Acc0}; + {Ref, Msg} -> + case lists:keyfind(Ref, Keypos, RefList) of + false -> + % this was some non-matching message which we will ignore + {ok, Acc0}; + Worker -> + Fun(Msg, Worker, Acc0) + end; + {Ref, From, Msg} -> + case lists:keyfind(Ref, Keypos, RefList) of + false -> + {ok, Acc0}; + Worker -> + Fun(Msg, {Worker, From}, Acc0) + end; + {rexi_DOWN, _, _, _} = Msg -> + Fun(Msg, nil, Acc0) + after PerMsgTO -> + {timeout, Acc0} + end. |