diff options
-rw-r--r-- | src/couchdb/couch_view_group.erl | 56 | ||||
-rw-r--r-- | test/etap/200-view-group-no-db-leaks.t | 239 | ||||
-rw-r--r-- | test/etap/Makefile.am | 3 |
3 files changed, 260 insertions, 38 deletions
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index 3cf829b6..b0cfe6e7 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -32,8 +32,7 @@ compactor_pid=nil, waiting_commit=false, waiting_list=[], - ref_counter=nil, - db_update_notifier=nil + ref_counter=nil }). % api methods @@ -85,20 +84,12 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> ReturnPid ! {Ref, self(), {error, invalid_view_seq}}, ignore; _ -> - couch_db:monitor(Db), + couch_db:close(Db), {ok, RefCounter} = couch_ref_counter:start([Fd]), - Server = self(), - {ok, Notifier} = couch_db_update_notifier:start_link( - fun({compacted, DbName1}) when DbName1 =:= DbName -> - ok = gen_server:cast(Server, reopen_db); - (_) -> - ok - end), {ok, #group_state{ - db_update_notifier=Notifier, - db_name=couch_db:name(Db), + db_name=DbName, init_args=InitArgs, - group=Group, + group=Group#group{db=nil}, ref_counter=RefCounter}} end; Error -> @@ -128,11 +119,11 @@ init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> handle_call({request_group, RequestSeq}, From, #group_state{ db_name=DbName, - group=#group{current_seq=Seq, db=OldDb}=Group, + group=#group{current_seq=Seq}=Group, updater_pid=nil, waiting_list=WaitList }=State) when RequestSeq > Seq -> - {ok, Db} = reopen_db(DbName, OldDb), + {ok, Db} = couch_db:open_int(DbName, []), Group2 = Group#group{db=Db}, Owner = self(), Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end), @@ -167,11 +158,11 @@ handle_call(request_group_info, _From, State) -> handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil} = State) -> #group_state{ - group = #group{name = GroupId, sig = GroupSig, db = OldDb} = Group, + group = #group{name = GroupId, sig = GroupSig} = Group, init_args = {RootDir, DbName, _} } = State, ?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]), - {ok, Db} = reopen_db(DbName, OldDb), + {ok, Db} = couch_db:open_int(DbName, []), {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig), NewGroup = reset_file(Db, Fd, DbName, Group), Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end), @@ -219,9 +210,14 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup}, Else -> couch_db:close(Else) end, + case NewGroup#group.db of + nil -> ok; + _ -> couch_db:close(NewGroup#group.db) + end, + self() ! delayed_commit, {noreply, State#group_state{ - group=NewGroup, + group=NewGroup#group{db = nil}, ref_counter=NewRefCounter, compactor_pid=nil, updater_pid=NewUpdaterPid @@ -244,7 +240,7 @@ handle_cast({compact_done, NewGroup}, State) -> couch_db:close(Db), #group{name=GroupId} = NewGroup2, Pid2 = couch_view:get_group_server(DbName, GroupId), - gen_server:cast(Pid2, {compact_done, NewGroup2}) + gen_server:cast(Pid2, {compact_done, NewGroup2#group{db = nil}}) end end), {noreply, State#group_state{compactor_pid = Pid}}; @@ -265,11 +261,7 @@ handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid} {noreply, State#group_state{group=NewGroup, waiting_commit=true}}; handle_cast({partial_update, _, _}, State) -> %% message from an old (probably pre-compaction) updater; ignore - {noreply, State}; - -handle_cast(reopen_db, #group_state{group = Group, db_name = DbName} = State) -> - {ok, Db} = reopen_db(DbName, Group#group.db), - {noreply, State#group_state{group = Group#group{db = Db}}}. + {noreply, State}. handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> {ok, Db} = couch_db:open_int(DbName, []), @@ -347,15 +339,10 @@ handle_info({'EXIT', FromPid, {{nocatch, Reason}, _Trace}}, State) -> handle_info({'EXIT', FromPid, Reason}, State) -> ?LOG_DEBUG("Exit from linked pid: ~p", [{FromPid, Reason}]), - {stop, Reason, State}; - -handle_info({'DOWN',_,_,_,_}, State) -> - ?LOG_INFO("Shutting down view group server, monitored db is closing.", []), - {stop, normal, reply_all(State, shutdown)}. + {stop, Reason, State}. terminate(Reason, #group_state{updater_pid=Update, compactor_pid=Compact}=S) -> - couch_db_update_notifier:stop(S#group_state.db_update_notifier), reply_all(S, Reason), couch_util:shutdown_sync(Update), couch_util:shutdown_sync(Compact), @@ -387,8 +374,8 @@ reply_all(#group_state{waiting_list=WaitList}=State, Reply) -> [catch gen_server:reply(Pid, Reply) || {Pid, _} <- WaitList], State#group_state{waiting_list=[]}. -prepare_group({RootDir, DbName, #group{sig=Sig, db=OldDb}=Group}, ForceReset)-> - case reopen_db(DbName, OldDb) of +prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)-> + case couch_db:open_int(DbName, []) of {ok, Db} -> case open_index_file(RootDir, DbName, Sig) of {ok, Fd} -> @@ -648,8 +635,3 @@ init_group(Db, Fd, #group{def_lang=Lang,views=Views}= ViewStates2, Views), Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, id_btree=IdBtree, views=Views2}. - -reopen_db(DbName, nil) -> - couch_db:open_int(DbName, []); -reopen_db(_DbName, Db) -> - couch_db:reopen(Db). diff --git a/test/etap/200-view-group-no-db-leaks.t b/test/etap/200-view-group-no-db-leaks.t new file mode 100644 index 00000000..87181d84 --- /dev/null +++ b/test/etap/200-view-group-no-db-leaks.t @@ -0,0 +1,239 @@ +#!/usr/bin/env escript +%% -*- erlang -*- + +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-record(user_ctx, { + name = null, + roles = [], + handler +}). + +-define(LATEST_DISK_VERSION, 5). + +-record(db_header, + {disk_version = ?LATEST_DISK_VERSION, + update_seq = 0, + unused = 0, + fulldocinfo_by_id_btree_state = nil, + docinfo_by_seq_btree_state = nil, + local_docs_btree_state = nil, + purge_seq = 0, + purged_docs = nil, + security_ptr = nil, + revs_limit = 1000 +}). + +-record(db, { + main_pid = nil, + update_pid = nil, + compactor_pid = nil, + instance_start_time, % number of microsecs since jan 1 1970 as a binary string + fd, + fd_ref_counter, + header = #db_header{}, + committed_update_seq, + fulldocinfo_by_id_btree, + docinfo_by_seq_btree, + local_docs_btree, + update_seq, + name, + filepath, + validate_doc_funs = [], + security = [], + security_ptr = nil, + user_ctx = #user_ctx{}, + waiting_delayed_commit = nil, + revs_limit = 1000, + fsync_options = [], + is_sys_db = false +}). + +test_db_name() -> <<"couch_test_view_group_db_leaks">>. +ddoc_name() -> <<"foo">>. + +main(_) -> + test_util:init_code_path(), + + etap:plan(13), + case (catch test()) of + ok -> + etap:end_tests(); + Other -> + etap:diag(io_lib:format("Test died abnormally: ~p", [Other])), + etap:bail(Other) + end, + ok. + +test() -> + couch_server_sup:start_link(test_util:config_files()), + timer:sleep(1000), + put(addr, couch_config:get("httpd", "bind_address", "127.0.0.1")), + put(port, integer_to_list(mochiweb_socket_server:get(couch_httpd, port))), + application:start(inets), + + delete_db(), + create_db(), + + create_docs(), + create_design_doc(), + query_view(), + check_db_ref_count(), + + create_new_doc(<<"doc1000">>), + query_view(), + check_db_ref_count(), + + Ref1 = get_db_ref_counter(), + compact_db(), + check_db_ref_count(), + Ref2 = get_db_ref_counter(), + etap:isnt(Ref1, Ref2, "DB ref counter changed"), + etap:is(false, is_process_alive(Ref1), "old DB ref counter is not alive"), + + compact_view_group(), + check_db_ref_count(), + Ref3 = get_db_ref_counter(), + etap:is(Ref3, Ref2, "DB ref counter didn't change"), + + create_new_doc(<<"doc1001">>), + query_view(), + check_db_ref_count(), + + ok = timer:sleep(1000), + delete_db(), + couch_server_sup:stop(), + ok. + +admin_user_ctx() -> + {user_ctx, #user_ctx{roles=[<<"_admin">>]}}. + +create_db() -> + {ok, Db} = couch_db:create(test_db_name(), [admin_user_ctx()]), + ok = couch_db:close(Db). + +delete_db() -> + couch_server:delete(test_db_name(), [admin_user_ctx()]). + +compact_db() -> + {ok, Db} = couch_db:open_int(test_db_name(), []), + ok = couch_db:start_compact(Db), + ok = couch_db:close(Db), + wait_db_compact_done(10). + +wait_db_compact_done(0) -> + etap:is(true, false, "DB compaction didn't finish"); +wait_db_compact_done(N) -> + {ok, Db} = couch_db:open_int(test_db_name(), []), + ok = couch_db:close(Db), + case is_pid(Db#db.compactor_pid) of + false -> + ok; + true -> + ok = timer:sleep(500), + wait_db_compact_done(N - 1) + end. + +compact_view_group() -> + ok = couch_view_compactor:start_compact(test_db_name(), ddoc_name()), + wait_view_compact_done(10). + +wait_view_compact_done(0) -> + etap:is(true, false, "view group compaction didn't finish"); +wait_view_compact_done(N) -> + {ok, {{_, Code, _}, _Headers, Body}} = http:request( + get, + {db_url() ++ "/_design/" ++ binary_to_list(ddoc_name()) ++ "/_info", []}, + [], + [{sync, true}]), + etap:is(Code, 200, "got view group info"), + {Info} = couch_util:json_decode(Body), + {IndexInfo} = couch_util:get_value(<<"view_index">>, Info), + CompactRunning = couch_util:get_value(<<"compact_running">>, IndexInfo), + case CompactRunning of + false -> + ok; + true -> + ok = timer:sleep(500), + wait_view_compact_done(N - 1) + end. + +get_db_ref_counter() -> + {ok, #db{fd_ref_counter = Ref} = Db} = couch_db:open_int(test_db_name(), []), + ok = couch_db:close(Db), + Ref. + +check_db_ref_count() -> + {ok, #db{fd_ref_counter = Ref} = Db} = couch_db:open_int(test_db_name(), []), + ok = couch_db:close(Db), + etap:is(couch_ref_counter:count(Ref), 2, + "DB ref counter is only held by couch_db and couch_db_updater"), + ok. + +create_docs() -> + {ok, Db} = couch_db:open(test_db_name(), [admin_user_ctx()]), + Doc1 = couch_doc:from_json_obj({[ + {<<"_id">>, <<"doc1">>}, + {<<"value">>, 1} + ]}), + Doc2 = couch_doc:from_json_obj({[ + {<<"_id">>, <<"doc2">>}, + {<<"value">>, 2} + + ]}), + Doc3 = couch_doc:from_json_obj({[ + {<<"_id">>, <<"doc3">>}, + {<<"value">>, 3} + ]}), + {ok, _} = couch_db:update_docs(Db, [Doc1, Doc2, Doc3]), + couch_db:ensure_full_commit(Db), + couch_db:close(Db). + +create_design_doc() -> + {ok, Db} = couch_db:open(test_db_name(), [admin_user_ctx()]), + DDoc = couch_doc:from_json_obj({[ + {<<"_id">>, <<"_design/", (ddoc_name())/binary>>}, + {<<"language">>, <<"javascript">>}, + {<<"views">>, {[ + {<<"bar">>, {[ + {<<"map">>, <<"function(doc) { emit(doc._id, null); }">>} + ]}} + ]}} + ]}), + {ok, _} = couch_db:update_docs(Db, [DDoc]), + couch_db:ensure_full_commit(Db), + couch_db:close(Db). + +create_new_doc(Id) -> + {ok, Db} = couch_db:open(test_db_name(), [admin_user_ctx()]), + Doc666 = couch_doc:from_json_obj({[ + {<<"_id">>, Id}, + {<<"value">>, 999} + ]}), + {ok, _} = couch_db:update_docs(Db, [Doc666]), + couch_db:ensure_full_commit(Db), + couch_db:close(Db). + +db_url() -> + "http://" ++ get(addr) ++ ":" ++ get(port) ++ "/" ++ + binary_to_list(test_db_name()). + +query_view() -> + {ok, {{_, Code, _}, _Headers, _Body}} = http:request( + get, + {db_url() ++ "/_design/" ++ binary_to_list(ddoc_name()) ++ + "/_view/bar", []}, + [], + [{sync, true}]), + etap:is(Code, 200, "got view response"), + ok. diff --git a/test/etap/Makefile.am b/test/etap/Makefile.am index 756b8758..9ba3fcfa 100644 --- a/test/etap/Makefile.am +++ b/test/etap/Makefile.am @@ -84,4 +84,5 @@ EXTRA_DIST = \ 172-os-daemon-errors.t \ 173-os-daemon-cfg-register.t \ 180-http-proxy.ini \ - 180-http-proxy.t + 180-http-proxy.t \ + 200-view-group-no-db-leaks.t |