summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_ref_counter.erl
blob: 96d9233327459c6efcfd478c33aa2e7803ac2836 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_ref_counter).
-behaviour(gen_server).

-export([start/1, init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
-export([drop/1,drop/2,add/1,add/2,count/1]).

% Starts an unlinked ref counter server. The calling process is handed to
% init/1 as the initial referrer, together with ChildProcs.
% Returns the result of gen_server:start/3.
start(ChildProcs) ->
    Owner = self(),
    gen_server:start(?MODULE, {Owner, ChildProcs}, []).


% Releases one reference held by the calling process.
drop(RefCounterPid) ->
    Caller = self(),
    drop(RefCounterPid, Caller).

% Asynchronously asks the ref counter to release one reference held by Pid.
% This is a cast, so it returns ok without waiting for the server.
drop(RefCounterPid, Pid) ->
    Request = {drop, Pid},
    gen_server:cast(RefCounterPid, Request).


% Takes one reference on behalf of the calling process.
add(RefCounterPid) ->
    Caller = self(),
    add(RefCounterPid, Caller).

% Synchronously registers one more reference held by Pid and returns the
% server's reply.
add(RefCounterPid, Pid) ->
    Request = {add, Pid},
    gen_server:call(RefCounterPid, Request).

% Synchronously asks the ref counter for its current count and returns the
% server's reply.
count(RefCounterPid) ->
    Request = count,
    gen_server:call(RefCounterPid, Request).

% server functions

% State of a ref counter server.
-record(srv,
    {
    % Keyed by referrer pid; each value is {MonitorRef, RefCount}.
    referrers=dict:new(), % a dict of each ref counting proc.
    % Pids linked in init/1 and handed to couch_util:shutdown_sync/1 in
    % terminate/2.
    child_procs=[]
    }).

% gen_server init callback.
% Links the server to every child process, then monitors the initial
% referrer Pid and records it with a reference count of 1.
init({Pid, ChildProcs}) ->
    lists:foreach(fun erlang:link/1, ChildProcs),
    MonRef = erlang:monitor(process, Pid),
    InitialReferrers = dict:store(Pid, {MonRef, 1}, dict:new()),
    {ok, #srv{referrers = InitialReferrers, child_procs = ChildProcs}}.


% gen_server terminate callback.
% Calls couch_util:shutdown_sync/1 on each child process, in list order.
terminate(_Reason, #srv{child_procs = Children}) ->
    lists:foreach(fun couch_util:shutdown_sync/1, Children),
    ok.


% gen_server call callback.
%   {add, Pid} - registers one more reference for Pid and replies ok.
%   count      - replies with the number of monitors currently held by this
%                server process.
handle_call({add, Pid}, _From, #srv{referrers = Current} = State) ->
    {reply, ok, State#srv{referrers = bump_referrer(Pid, Current)}};
handle_call(count, _From, State) ->
    {monitors, Active} = process_info(self(), monitors),
    {reply, length(Active), State}.

% Increments the reference count stored for Pid. A pid that is not yet
% tracked gets a new monitor and a count of 1.
bump_referrer(Pid, Referrers) ->
    case dict:find(Pid, Referrers) of
        {ok, {MonRef, Count}} ->
            dict:store(Pid, {MonRef, Count + 1}, Referrers);
        error ->
            dict:store(Pid, {erlang:monitor(process, Pid), 1}, Referrers)
    end.


% gen_server cast callback.
% {drop, Pid} releases one reference held by Pid, then stops the server if
% nothing is monitored any more (see maybe_close_async/1).
handle_cast({drop, Pid}, #srv{referrers = Current} = State) ->
    Remaining = release_referrer(Pid, dict:find(Pid, Current), Current),
    maybe_close_async(State#srv{referrers = Remaining}).

% Applies a single drop to the referrers dict, given the lookup result for Pid.
release_referrer(_Pid, error, Referrers) ->
    % Unknown pid: nothing to release.
    Referrers;
release_referrer(Pid, {ok, {MonRef, 1}}, Referrers) ->
    % Last reference: stop monitoring and flush any queued 'DOWN' message
    % so it is not delivered after the entry is gone.
    erlang:demonitor(MonRef, [flush]),
    dict:erase(Pid, Referrers);
release_referrer(Pid, {ok, {MonRef, Count}}, Referrers) ->
    dict:store(Pid, {MonRef, Count - 1}, Referrers).


% gen_server code upgrade callback. No state migration is performed; the
% state is handed back unchanged.
code_change(_PreviousVersion, Srv, _Extra) ->
    {ok, Srv}.

% gen_server info callback.
%
% 'DOWN' from a monitored referrer: every reference held by that process is
% forgotten at once, and the server stops if nothing is monitored any more.
% The match on dict:find/2 is a deliberate assertion: a 'DOWN' message must
% belong to a tracked referrer and carry the monitor ref stored for it.
%
% Any other message is logged and ignored. Without this clause a stray
% message would crash the server with function_clause, and terminate/2 would
% then shut down the child processes.
handle_info({'DOWN', MonRef, _, Pid, _}, #srv{referrers=Referrers}=Srv) ->
    {ok, {MonRef, _RefCount}} = dict:find(Pid, Referrers),
    maybe_close_async(Srv#srv{referrers=dict:erase(Pid, Referrers)});
handle_info(Unexpected, Srv) ->
    error_logger:warning_msg(
        "couch_ref_counter ~p ignoring unexpected message: ~p~n",
        [self(), Unexpected]),
    {noreply, Srv}.


% Returns true when the calling process holds no monitors at all, false
% otherwise.
should_close() ->
    process_info(self(), monitors) =:= {monitors, []}.

% Builds the gen_server return tuple for callbacks that do not reply:
% a normal stop when should_close/0 says so, otherwise carry on with Srv.
maybe_close_async(Srv) ->
    close_or_continue(should_close(), Srv).

% Maps the should_close/0 verdict onto the matching gen_server return tuple.
close_or_continue(true, Srv) ->
    {stop, normal, Srv};
close_or_continue(false, Srv) ->
    {noreply, Srv}.