summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/couchdb/Makefile.am2
-rw-r--r--src/couchdb/couch_db.erl37
-rw-r--r--src/couchdb/couch_db.hrl1
-rw-r--r--src/couchdb/couch_db_updater.erl17
-rw-r--r--src/couchdb/couch_file.erl81
-rw-r--r--src/couchdb/couch_httpd_show.erl2
-rw-r--r--src/couchdb/couch_ref_counter.erl104
-rw-r--r--src/couchdb/couch_server.erl6
8 files changed, 142 insertions, 108 deletions
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index 1ad5d14a..8dd21256 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -62,6 +62,7 @@ source_files = \
couch_log.erl \
couch_os_process.erl \
couch_query_servers.erl \
+ couch_ref_counter.erl \
couch_rep.erl \
couch_server.erl \
couch_server_sup.erl \
@@ -99,6 +100,7 @@ compiled_files = \
couch_log.beam \
couch_os_process.beam \
couch_query_servers.beam \
+ couch_ref_counter.beam \
couch_rep.beam \
couch_server.beam \
couch_server_sup.beam \
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index ebc11bed..d9caa36f 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -14,7 +14,7 @@
-behaviour(gen_server).
-export([open/2,close/1,create/2,start_compact/1,get_db_info/1]).
--export([open_ref_counted/2,num_refs/1,monitor/1,count_changes_since/2]).
+-export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]).
-export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
-export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]).
@@ -67,15 +67,15 @@ ensure_full_commit(#db{update_pid=UpdatePid,instance_start_time=StartTime}) ->
ok = gen_server:call(UpdatePid, full_commit, infinity),
{ok, StartTime}.
-close(#db{fd=Fd}) ->
- couch_file:drop_ref(Fd).
+close(#db{fd_ref_counter=RefCntr}) ->
+ couch_ref_counter:drop(RefCntr).
open_ref_counted(MainPid, UserCtx) ->
- {ok, Db} = gen_server:call(MainPid, {open_ref_counted_instance, self()}),
+ {ok, Db} = gen_server:call(MainPid, {open_ref_count, self()}),
{ok, Db#db{user_ctx=UserCtx}}.
-num_refs(MainPid) ->
- gen_server:call(MainPid, num_refs).
+is_idle(MainPid) ->
+ gen_server:call(MainPid, is_idle).
monitor(#db{main_pid=MainPid}) ->
erlang:monitor(process, MainPid).
@@ -530,23 +530,28 @@ enum_docs(Db, StartId, InFun, Ctx) ->
init({DbName, Filepath, Fd, Options}) ->
{ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []),
- ok = couch_file:add_ref(Fd),
- gen_server:call(UpdaterPid, get_db).
+ {ok, #db{fd_ref_counter=RefCntr}=Db} = gen_server:call(UpdaterPid, get_db),
+ couch_ref_counter:add(RefCntr),
+ {ok, Db}.
terminate(_Reason, _Db) ->
ok.
-handle_call({open_ref_counted_instance, OpenerPid}, _From, #db{fd=Fd}=Db) ->
- ok = couch_file:add_ref(Fd, OpenerPid),
+handle_call({open_ref_count, OpenerPid}, _, #db{fd_ref_counter=RefCntr}=Db) ->
+ ok = couch_ref_counter:add(RefCntr, OpenerPid),
{reply, {ok, Db}, Db};
-handle_call(num_refs, _From, #db{fd=Fd}=Db) ->
- {reply, couch_file:num_refs(Fd) - 1, Db};
-handle_call({db_updated, #db{fd=NewFd}=NewDb}, _From, #db{fd=OldFd}) ->
- case NewFd == OldFd of
+handle_call(is_idle, _From,
+ #db{fd_ref_counter=RefCntr, compactor_pid=Compact}=Db) ->
+ % Idle means no referrers. Unless in the middle of a compaction file switch,
+ % there are always at least 2 referrers, couch_db_updater and us.
+ {reply, (Compact == nil) and (couch_ref_counter:count(RefCntr) == 2), Db};
+handle_call({db_updated, #db{fd_ref_counter=NewRefCntr}=NewDb}, _From,
+ #db{fd_ref_counter=OldRefCntr}) ->
+ case NewRefCntr == OldRefCntr of
true -> ok;
false ->
- couch_file:add_ref(NewFd),
- couch_file:drop_ref(OldFd)
+ couch_ref_counter:add(NewRefCntr),
+ couch_ref_counter:drop(OldRefCntr)
end,
{reply, ok, NewDb}.
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index bfa16a3d..443ac73f 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -122,6 +122,7 @@
compactor_pid=nil,
instance_start_time, % number of microsecs since jan 1 1970 as a binary string
fd,
+ fd_ref_counter,
header = #db_header{},
summary_stream,
fulldocinfo_by_id_btree,
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index 800730d8..7790d7a4 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -22,7 +22,6 @@
-define(HEADER_SIG, <<$g, $m, $k, 0>>).
init({MainPid, DbName, Filepath, Fd, Options}) ->
- link(Fd),
case lists:member(create, Options) of
true ->
% create a new header and writes it to the file
@@ -38,8 +37,8 @@ init({MainPid, DbName, Filepath, Fd, Options}) ->
Db2 = refresh_validate_doc_funs(Db),
{ok, Db2#db{main_pid=MainPid}}.
-terminate(_Reason, Db) ->
- close_db(Db).
+terminate(_Reason, _Db) ->
+ ok.
handle_call(get_db, _From, Db) ->
{reply, {ok, Db}, Db};
@@ -175,7 +174,7 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
ok = file:rename(CompactFilepath, Filepath),
couch_stream:close(Db#db.summary_stream),
- couch_file:close_maybe(Db#db.fd),
+ couch_ref_counter:drop(Db#db.fd_ref_counter),
ok = gen_server:call(Db#db.main_pid, {db_updated, NewDb2}),
?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]),
@@ -284,10 +283,11 @@ init_db(DbName, Filepath, Fd, Header0) ->
{MegaSecs, Secs, MicroSecs} = now(),
StartTime = ?l2b(io_lib:format("~p",
[(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
-
+ {ok, RefCntr} = couch_ref_counter:start([Fd]),
#db{
update_pid=self(),
fd=Fd,
+ fd_ref_counter = RefCntr,
header=Header,
summary_stream = SummaryStream,
fulldocinfo_by_id_btree = IdBtree,
@@ -302,10 +302,6 @@ init_db(DbName, Filepath, Fd, Header0) ->
}.
-close_db(#db{fd=Fd,summary_stream=Ss}) ->
- couch_file:close(Fd),
- couch_stream:close(Ss).
-
refresh_validate_doc_funs(Db) ->
{ok, DesignDocs} = get_design_docs(Db),
ProcessDocFuns = lists:flatmap(
@@ -641,8 +637,7 @@ start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
ok = couch_file:write_header(Fd, ?HEADER_SIG, Header=#db_header{})
end,
NewDb = init_db(Name, CompactFile, Fd, Header),
- NewDb2 = copy_compact(Db, NewDb, Retry),
- close_db(NewDb2),
+ _NewDb2 = copy_compact(Db, NewDb, Retry),
gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}).
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index b29f45d2..d1103030 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -20,7 +20,6 @@
-export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]).
-export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
--export([close_maybe/1,drop_ref/1,drop_ref/2,add_ref/1,add_ref/2,num_refs/1]).
%%----------------------------------------------------------------------
%% Args: Valid Options are [create] and [create,overwrite].
@@ -168,26 +167,6 @@ close(Fd) ->
Result = gen_server:cast(Fd, close),
catch unlink(Fd),
Result.
-
-close_maybe(Fd) ->
- catch unlink(Fd),
- catch gen_server:cast(Fd, close_maybe).
-
-drop_ref(Fd) ->
- drop_ref(Fd, self()).
-
-drop_ref(Fd, Pid) ->
- gen_server:cast(Fd, {drop_ref, Pid}).
-
-
-add_ref(Fd) ->
- add_ref(Fd, self()).
-
-add_ref(Fd, Pid) ->
- gen_server:call(Fd, {add_ref, Pid}).
-
-num_refs(Fd) ->
- gen_server:call(Fd, num_refs).
write_header(Fd, Prefix, Data) ->
@@ -291,7 +270,6 @@ init_status_error(ReturnPid, Ref, Error) ->
% server functions
init({Filepath, Options, ReturnPid, Ref}) ->
- process_flag(trap_exit, true),
case lists:member(create, Options) of
true ->
filelib:ensure_dir(Filepath),
@@ -357,66 +335,15 @@ handle_call({append_bin, Bin}, _From, Fd) ->
handle_call({pread_bin, Pos}, _From, Fd) ->
{ok, <<TermLen:32>>} = file:pread(Fd, Pos, 4),
{ok, Bin} = file:pread(Fd, Pos + 4, TermLen),
- {reply, {ok, Bin}, Fd};
-handle_call({add_ref, Pid},_From, Fd) ->
- case get(Pid) of
- undefined ->
- put(Pid, {erlang:monitor(process, Pid), 1});
- {MonRef, RefCnt} ->
- put(Pid, {MonRef, RefCnt + 1})
- end,
- {reply, ok, Fd};
-handle_call(num_refs, _From, Fd) ->
- {monitors, Monitors} = process_info(self(), monitors),
- {reply, length(Monitors), Fd}.
+ {reply, {ok, Bin}, Fd}.
handle_cast(close, Fd) ->
- {stop,normal,Fd};
-handle_cast(close_maybe, Fd) ->
- maybe_close_async(Fd);
-handle_cast({drop_ref, Pid}, Fd) ->
- case get(Pid) of
- {MonRef, 1} ->
- erase(Pid),
- % don't check return of demonitor. The process could haved crashed causing
- % the {'DOWN', ...} message to be sent and the process unmonitored.
- erlang:demonitor(MonRef, [flush]);
- {MonRef, Num} ->
- put(Pid, {MonRef, Num-1})
- end,
- maybe_close_async(Fd).
+ {stop,normal,Fd}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-handle_info({'EXIT', _Pid, Reason}, Fd) ->
- {stop, Reason, Fd};
-handle_info({'DOWN', MonitorRef, _Type, Pid, _Info}, Fd) ->
- {MonitorRef, _RefCount} = erase(Pid),
- maybe_close_async(Fd).
-
-
-
-should_close(_Fd) ->
- case process_info(self(), links) of
- {links, [_]} ->
- % no linkers left (except our fd port). What about monitors?
- case process_info(self(), monitors) of
- {monitors, []} ->
- true;
- _ ->
- false
- end;
- {links, [_|_]} ->
- false
- end.
-
-maybe_close_async(Fd) ->
- case should_close(Fd) of
- true ->
- {stop,normal,Fd};
- false ->
- {noreply,Fd}
- end.
+handle_info(foo, Fd) ->
+ {stop, foo, Fd}.
diff --git a/src/couchdb/couch_httpd_show.erl b/src/couchdb/couch_httpd_show.erl
index 39c6cc1e..cc7d4d08 100644
--- a/src/couchdb/couch_httpd_show.erl
+++ b/src/couchdb/couch_httpd_show.erl
@@ -113,7 +113,7 @@ output_map_list(#httpd{mochi_req=MReq}=Req, Lang, ListSrc, View, Group, Db, Quer
% pass it into the view fold with closures
{ok, QueryServer} = couch_query_servers:start_view_list(Lang, ListSrc),
- StartListRespFun = fun(Req2, Etag, TotalViewCount, Offset) ->
+ StartListRespFun = fun(Req2, _Etag, TotalViewCount, Offset) ->
ExternalResp = couch_query_servers:render_list_head(QueryServer,
Req2, Db, TotalViewCount, Offset),
JsonResp = apply_etag(ExternalResp, CurrentEtag),
diff --git a/src/couchdb/couch_ref_counter.erl b/src/couchdb/couch_ref_counter.erl
new file mode 100644
index 00000000..95093f72
--- /dev/null
+++ b/src/couchdb/couch_ref_counter.erl
@@ -0,0 +1,104 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_ref_counter).
+-behaviour(gen_server).
+
+-export([start/1, init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
+-export([drop/1,drop/2,add/1,add/2,count/1]).
+
+start(ChildProcs) ->
+ gen_server:start(couch_ref_counter, {self(), ChildProcs}, []).
+
+
+drop(RefCounterPid) ->
+ drop(RefCounterPid, self()).
+
+drop(RefCounterPid, Pid) ->
+ gen_server:cast(RefCounterPid, {drop, Pid}).
+
+
+add(RefCounterPid) ->
+ add(RefCounterPid, self()).
+
+add(RefCounterPid, Pid) ->
+ gen_server:call(RefCounterPid, {add, Pid}).
+
+count(RefCounterPid) ->
+ gen_server:call(RefCounterPid, count).
+
+% server functions
+
+-record(srv,
+ {
+ referrers=dict:new() % a dict of each ref counting proc.
+ }).
+
+init({Pid, ChildProcs}) ->
+ [link(ChildProc) || ChildProc <- ChildProcs],
+ Referrers = dict:from_list([{Pid, {erlang:monitor(process, Pid), 1}}]),
+ {ok, #srv{referrers=Referrers}}.
+
+
+terminate(_Reason, _Srv) ->
+ ok.
+
+
+handle_call({add, Pid},_From, #srv{referrers=Referrers}=Srv) ->
+ Referrers2 =
+ case dict:find(Pid, Referrers) of
+ error ->
+ dict:store(Pid, {erlang:monitor(process, Pid), 1}, Referrers);
+ {ok, {MonRef, RefCnt}} ->
+ dict:store(Pid, {MonRef, RefCnt + 1}, Referrers)
+ end,
+ {reply, ok, Srv#srv{referrers=Referrers2}};
+handle_call(count, _From, Srv) ->
+ {monitors, Monitors} = process_info(self(), monitors),
+ {reply, length(Monitors), Srv}.
+
+
+handle_cast({drop, Pid}, #srv{referrers=Referrers}=Srv) ->
+ Referrers2 =
+ case dict:find(Pid, Referrers) of
+ {ok, {MonRef, 1}} ->
+ erlang:demonitor(MonRef, [flush]),
+ dict:erase(Pid, Referrers);
+ {ok, {MonRef, Num}} ->
+ dict:store(Pid, {MonRef, Num-1}, Referrers)
+ end,
+ maybe_close_async(Srv#srv{referrers=Referrers2}).
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_info({'DOWN', MonRef, _, Pid, _}, #srv{referrers=Referrers}=Srv) ->
+ {ok, {MonRef, _RefCount}} = dict:find(Pid, Referrers),
+ maybe_close_async(Srv#srv{referrers=dict:erase(Pid, Referrers)}).
+
+
+should_close() ->
+ case process_info(self(), monitors) of
+ {monitors, []} ->
+ true;
+ _ ->
+ false
+ end.
+
+maybe_close_async(Srv) ->
+ case should_close() of
+ true ->
+ {stop,normal,Srv};
+ false ->
+ {noreply,Srv}
+ end.
diff --git a/src/couchdb/couch_server.erl b/src/couchdb/couch_server.erl
index 69cfa36c..39b33c63 100644
--- a/src/couchdb/couch_server.erl
+++ b/src/couchdb/couch_server.erl
@@ -195,15 +195,15 @@ try_close_lru(StartTime) ->
true ->
[{_, DbName}] = ets:lookup(couch_dbs_by_lru, LruTime),
[{_, {MainPid, LruTime}}] = ets:lookup(couch_dbs_by_name, DbName),
- case couch_db:num_refs(MainPid) of
- 0 ->
+ case couch_db:is_idle(MainPid) of
+ true ->
exit(MainPid, kill),
receive {'EXIT', MainPid, _Reason} -> ok end,
true = ets:delete(couch_dbs_by_lru, LruTime),
true = ets:delete(couch_dbs_by_name, DbName),
true = ets:delete(couch_dbs_by_pid, MainPid),
ok;
- _NumRefs ->
+ false ->
% this still has referrers. Go ahead and give it a current lru time
% and try the next one in the table.
NewLruTime = now(),