summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/couchdb/couch_httpd_db.erl2
-rw-r--r--src/couchdb/couch_view_compactor.erl19
-rw-r--r--src/couchdb/couch_view_group.erl29
-rw-r--r--src/couchdb/couch_view_updater.erl13
-rwxr-xr-xtest/etap/200-view-group-no-db-leaks.t2
-rwxr-xr-xtest/etap/201-view-group-shutdown.t300
-rw-r--r--test/etap/Makefile.am1
7 files changed, 337 insertions, 29 deletions
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index db430bbd..0bf97e26 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -128,7 +128,7 @@ handle_changes_req1(Req, Db) ->
handle_compact_req(#httpd{method='POST',path_parts=[DbName,_,Id|_]}=Req, Db) ->
ok = couch_db:check_is_admin(Db),
couch_httpd:validate_ctype(Req, "application/json"),
- ok = couch_view_compactor:start_compact(DbName, Id),
+ {ok, _} = couch_view_compactor:start_compact(DbName, Id),
send_json(Req, 202, {[{ok, true}]});
handle_compact_req(#httpd{method='POST'}=Req, Db) ->
diff --git a/src/couchdb/couch_view_compactor.erl b/src/couchdb/couch_view_compactor.erl
index 734605f0..43fdbc98 100644
--- a/src/couchdb/couch_view_compactor.erl
+++ b/src/couchdb/couch_view_compactor.erl
@@ -20,7 +20,7 @@
%% @doc Compacts the views. GroupId must not include the _design/ prefix
start_compact(DbName, GroupId) ->
Pid = couch_view:get_group_server(DbName, <<"_design/",GroupId/binary>>),
- gen_server:cast(Pid, {start_compact, fun compact_group/3}).
+ gen_server:call(Pid, {start_compact, fun compact_group/3}).
%%=============================================================================
%% internal functions
@@ -42,7 +42,6 @@ compact_group(Group, EmptyGroup, DbName) ->
{ok, Db} = couch_db:open_int(DbName, []),
{ok, {Count, _}} = couch_btree:full_reduce(Db#db.fulldocinfo_by_id_btree),
- couch_db:close(Db),
<<"_design", ShortName/binary>> = GroupId,
TaskName = <<DbName/binary, ShortName/binary>>,
@@ -77,9 +76,23 @@ compact_group(Group, EmptyGroup, DbName) ->
views=NewViews,
current_seq=Seq
},
+ maybe_retry_compact(Db, GroupId, NewGroup).
+maybe_retry_compact(#db{name = DbName} = Db, GroupId, NewGroup) ->
Pid = couch_view:get_group_server(DbName, GroupId),
- gen_server:cast(Pid, {compact_done, NewGroup}).
+ case gen_server:call(Pid, {compact_done, NewGroup}) of
+ ok ->
+ couch_db:close(Db);
+ update ->
+ {ok, Db2} = couch_db:reopen(Db),
+ {_, Ref} = erlang:spawn_monitor(fun() ->
+ couch_view_updater:update(nil, NewGroup, Db2)
+ end),
+ receive
+ {'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
+ maybe_retry_compact(Db2, GroupId, NewGroup2)
+ end
+ end.
%% @spec compact_view(View, EmptyView, Retry) -> CompactView
compact_view(View, EmptyView) ->
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl
index 448a7dcf..ef9b02ad 100644
--- a/src/couchdb/couch_view_group.erl
+++ b/src/couchdb/couch_view_group.erl
@@ -151,9 +151,9 @@ handle_call({request_group, RequestSeq}, From,
handle_call(request_group_info, _From, State) ->
GroupInfo = get_group_info(State),
- {reply, {ok, GroupInfo}, State}.
+ {reply, {ok, GroupInfo}, State};
-handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil}
+handle_call({start_compact, CompactFun}, _From, #group_state{compactor_pid=nil}
= State) ->
#group_state{
group = #group{name = GroupId, sig = GroupSig} = Group,
@@ -165,12 +165,12 @@ handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil}
NewGroup = reset_file(Db, Fd, DbName, Group),
couch_db:close(Db),
Pid = spawn_link(fun() -> CompactFun(Group, NewGroup, DbName) end),
- {noreply, State#group_state{compactor_pid = Pid}};
-handle_cast({start_compact, _}, State) ->
+ {reply, {ok, Pid}, State#group_state{compactor_pid = Pid}};
+handle_call({start_compact, _}, _From, #group_state{compactor_pid=Pid} = State) ->
%% compact already running, this is a no-op
- {noreply, State};
+ {reply, {ok, Pid}, State};
-handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
+handle_call({compact_done, #group{current_seq=NewSeq} = NewGroup}, _From,
#group_state{group = #group{current_seq=OldSeq}} = State)
when NewSeq >= OldSeq ->
#group_state{
@@ -206,31 +206,20 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
{ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]),
self() ! delayed_commit,
- {noreply, State#group_state{
+ {reply, ok, State#group_state{
group=NewGroup,
ref_counter=NewRefCounter,
compactor_pid=nil,
updater_pid=NewUpdaterPid
}};
-handle_cast({compact_done, NewGroup}, State) ->
+handle_call({compact_done, NewGroup}, _From, State) ->
#group_state{
group = #group{name = GroupId, current_seq = CurrentSeq},
init_args={_RootDir, DbName, _}
} = State,
?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++
"compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]),
- Pid = spawn_link(fun() ->
- {_,Ref} = erlang:spawn_monitor(fun() ->
- couch_view_updater:update(nil, NewGroup, DbName)
- end),
- receive
- {'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
- #group{name=GroupId} = NewGroup2,
- Pid2 = couch_view:get_group_server(DbName, GroupId),
- gen_server:cast(Pid2, {compact_done, NewGroup2})
- end
- end),
- {noreply, State#group_state{compactor_pid = Pid}};
+ {reply, update, State}.
handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid}
= State) ->
diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl
index 2cc390df..9ecd95c8 100644
--- a/src/couchdb/couch_view_updater.erl
+++ b/src/couchdb/couch_view_updater.erl
@@ -18,7 +18,15 @@
-spec update(_, #group{}, Dbname::binary()) -> no_return().
-update(Owner, Group, DbName) ->
+update(Owner, Group, DbName) when is_binary(DbName) ->
+ {ok, Db} = couch_db:open_int(DbName, []),
+ try
+ update(Owner, Group, Db)
+ after
+ couch_db:close(Db)
+ end;
+
+update(Owner, Group, #db{name = DbName} = Db) ->
#group{
name = GroupName,
current_seq = Seq,
@@ -26,7 +34,6 @@ update(Owner, Group, DbName) ->
} = Group,
couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," ",GroupName/binary>>, <<"Starting index update">>),
- {ok, Db} = couch_db:open_int(DbName, []),
DbPurgeSeq = couch_db:get_purge_seq(Db),
Group2 =
if DbPurgeSeq == PurgeSeq ->
@@ -36,7 +43,6 @@ update(Owner, Group, DbName) ->
purge_index(Group, Db);
true ->
couch_task_status:update(<<"Resetting view index due to lost purge entries.">>),
- couch_db:close(Db),
exit(reset)
end,
{ok, MapQueue} = couch_work_queue:new(
@@ -74,7 +80,6 @@ update(Owner, Group, DbName) ->
couch_task_status:set_update_frequency(0),
couch_task_status:update("Finishing."),
couch_work_queue:close(MapQueue),
- couch_db:close(Db),
receive {new_group, NewGroup} ->
exit({new_group,
NewGroup#group{current_seq=couch_db:get_update_seq(Db)}})
diff --git a/test/etap/200-view-group-no-db-leaks.t b/test/etap/200-view-group-no-db-leaks.t
index 9c77f1a8..f506b7dc 100755
--- a/test/etap/200-view-group-no-db-leaks.t
+++ b/test/etap/200-view-group-no-db-leaks.t
@@ -165,7 +165,7 @@ wait_db_compact_done(N) ->
end.
compact_view_group() ->
- ok = couch_view_compactor:start_compact(test_db_name(), ddoc_name()),
+ {ok, _} = couch_view_compactor:start_compact(test_db_name(), ddoc_name()),
wait_view_compact_done(10).
wait_view_compact_done(0) ->
diff --git a/test/etap/201-view-group-shutdown.t b/test/etap/201-view-group-shutdown.t
new file mode 100755
index 00000000..03feac2b
--- /dev/null
+++ b/test/etap/201-view-group-shutdown.t
@@ -0,0 +1,300 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-record(user_ctx, {
+ name = null,
+ roles = [],
+ handler
+}).
+
+-record(db, {
+ main_pid = nil,
+ update_pid = nil,
+ compactor_pid = nil,
+ instance_start_time, % number of microsecs since jan 1 1970 as a binary string
+ fd,
+ fd_ref_counter,
+ header = nil,
+ committed_update_seq,
+ fulldocinfo_by_id_btree,
+ docinfo_by_seq_btree,
+ local_docs_btree,
+ update_seq,
+ name,
+ filepath,
+ validate_doc_funs = [],
+ security = [],
+ security_ptr = nil,
+ user_ctx = #user_ctx{},
+ waiting_delayed_commit = nil,
+ revs_limit = 1000,
+ fsync_options = [],
+ is_sys_db = false
+}).
+
+%% Name of the database created by create_main_db/0.
+main_db_name() ->
+    <<"couch_test_view_group_shutdown">>.
+
+
+%% escript entry point. Plans 17 etap assertions, runs test/0 and, when it
+%% does not return ok, prints the result and bails out through etap.
+main(_) ->
+    test_util:init_code_path(),
+
+    etap:plan(17),
+    Outcome = (catch test()),
+    if
+        Outcome =:= ok ->
+            etap:end_tests();
+        true ->
+            etap:diag(io_lib:format("Test died abnormally: ~p", [Outcome])),
+            etap:bail(Outcome)
+    end,
+    ok.
+
+
+%% Starts couch_server_sup, sets max_dbs_open to "3" and delayed_commits to
+%% "false", runs the view group compaction scenario and stops the server.
+test() ->
+    couch_server_sup:start_link(test_util:config_files()),
+    lists:foreach(
+        fun({Key, Value}) ->
+            ok = couch_config:set("couchdb", Key, Value, false)
+        end,
+        [{"max_dbs_open", "3"}, {"delayed_commits", "false"}]),
+    crypto:start(),
+
+    % While a view group is being compacted, the database LRU system must
+    % not be able to close the group's database.
+    test_view_group_compaction(),
+
+    couch_server_sup:stop(),
+    ok.
+
+
+%% Scenario: two writers hold their databases open while the main database's
+%% view group is compacted. A third writer is then expected to be refused
+%% with {error, all_dbs_active}, and to succeed once the compactor process
+%% has exited. All databases are deleted at the end.
+test_view_group_compaction() ->
+    {ok, DbWriter3} = create_db(<<"couch_test_view_group_shutdown_w3">>),
+    ok = couch_db:close(DbWriter3),
+
+    {ok, MainDb} = create_main_db(),
+    ok = couch_db:close(MainDb),
+
+    {ok, DbWriter1} = create_db(<<"couch_test_view_group_shutdown_w1">>),
+    ok = couch_db:close(DbWriter1),
+
+    {ok, DbWriter2} = create_db(<<"couch_test_view_group_shutdown_w2">>),
+    ok = couch_db:close(DbWriter2),
+
+    Writer1 = spawn_writer(DbWriter1#db.name),
+    Writer2 = spawn_writer(DbWriter2#db.name),
+    etap:is(is_process_alive(Writer1), true, "Spawned writer 1"),
+    etap:is(is_process_alive(Writer2), true, "Spawned writer 2"),
+
+    etap:is(get_writer_status(Writer1), ok, "Writer 1 opened his database"),
+    etap:is(get_writer_status(Writer2), ok, "Writer 2 opened his database"),
+
+    MainDbName = MainDb#db.name,
+    {ok, CompactPid} = couch_view_compactor:start_compact(
+        MainDbName, <<"foo">>),
+    MonRef = erlang:monitor(process, CompactPid),
+
+    % Write a few more documents and trigger a view update while the
+    % compactor is running.
+    {ok, MainDb2} = couch_db:open_int(MainDbName, []),
+    ok = populate_main_db(MainDb2, 3, 3),
+    update_view(MainDb2#db.name, <<"_design/foo">>, <<"foo">>),
+    ok = couch_db:close(MainDb2),
+
+    % Assuming the view compaction takes more than 50ms to complete
+    ok = timer:sleep(50),
+    Writer3 = spawn_writer(DbWriter3#db.name),
+    etap:is(is_process_alive(Writer3), true, "Spawned writer 3"),
+
+    etap:is(get_writer_status(Writer3), {error, all_dbs_active},
+        "Writer 3 got {error, all_dbs_active} when opening his database"),
+
+    StillAlive = [
+        {Writer1, "Writer 1 still alive"},
+        {Writer2, "Writer 2 still alive"},
+        {Writer3, "Writer 3 still alive"}
+    ],
+    assert_writers_alive(StillAlive),
+
+    receive
+        {'DOWN', MonRef, process, CompactPid, normal} ->
+            etap:diag("View group compaction successful"),
+            ok;
+        {'DOWN', MonRef, process, CompactPid, _Reason} ->
+            etap:bail("Failure compacting view group")
+    end,
+
+    ok = timer:sleep(2000),
+
+    etap:is(writer_try_again(Writer3), ok,
+        "Told writer 3 to try open his database again"),
+    etap:is(get_writer_status(Writer3), ok,
+        "Writer 3 was able to open his database"),
+
+    assert_writers_alive(StillAlive),
+
+    lists:foreach(
+        fun({Writer, Msg}) ->
+            etap:is(stop_writer(Writer), ok, Msg)
+        end,
+        [
+            {Writer1, "Stopped writer 1"},
+            {Writer2, "Stopped writer 2"},
+            {Writer3, "Stopped writer 3"}
+        ]),
+
+    lists:foreach(
+        fun(Db) -> delete_db(Db) end,
+        [MainDb, DbWriter1, DbWriter2]),
+    delete_db(DbWriter3).
+
+
+%% Emits one etap assertion per {WriterPid, Message} pair, in list order,
+%% checking that the writer process is alive.
+assert_writers_alive(Writers) ->
+    lists:foreach(
+        fun({Writer, Msg}) ->
+            etap:is(is_process_alive(Writer), true, Msg)
+        end,
+        Writers).
+
+
+%% Creates the main database, stores design document _design/foo with five
+%% identical map views (foo, foo2 .. foo5), fills the database through
+%% populate_main_db/3 and queries view foo once via update_view/3.
+%% Returns {ok, Db}.
+create_main_db() ->
+    {ok, Db} = create_db(main_db_name()),
+    MapSource = <<"function(doc) { emit(doc._id, doc); }">>,
+    ViewNames = [<<"foo">>, <<"foo2">>, <<"foo3">>, <<"foo4">>, <<"foo5">>],
+    Views = [{Name, {[{<<"map">>, MapSource}]}} || Name <- ViewNames],
+    DDoc = couch_doc:from_json_obj({[
+        {<<"_id">>, <<"_design/foo">>},
+        {<<"language">>, <<"javascript">>},
+        {<<"views">>, {Views}}
+    ]}),
+    {ok, _} = couch_db:update_doc(Db, DDoc, []),
+    ok = populate_main_db(Db, 1000, 20000),
+    update_view(Db#db.name, <<"_design/foo">>, <<"foo">>),
+    {ok, Db}.
+
+
+%% Writes documents to Db in batches of BatchSize, subtracting each batch's
+%% length from N, until N is no longer greater than 0. Every document gets a
+%% fresh UUID as _id and 1000 random bytes, base64 encoded, as value.
+%% Returns ok.
+populate_main_db(_Db, _BatchSize, N) when N =< 0 ->
+    ok;
+populate_main_db(Db, BatchSize, N) ->
+    NewDoc = fun() ->
+        couch_doc:from_json_obj({[
+            {<<"_id">>, couch_uuids:new()},
+            {<<"value">>, base64:encode(crypto:rand_bytes(1000))}
+        ]})
+    end,
+    Docs = [NewDoc() || _ <- lists:seq(1, BatchSize)],
+    {ok, _} = couch_db:update_docs(Db, Docs, []),
+    populate_main_db(Db, BatchSize, N - length(Docs)).
+
+
+%% Queries view ViewName of design document DDocName in database DbName from
+%% a dedicated process and blocks until that process exits. Returns ok when
+%% it exits normally; calls etap:bail/1 for any other exit reason.
+update_view(DbName, DDocName, ViewName) ->
+    % Use a dedicated process - we can't explicitly drop the #group ref counter
+    %
+    % The process is created with spawn_monitor/1 so that process creation
+    % and monitor installation are atomic. With spawn/1 followed by
+    % monitor/2, a worker that exits before the monitor is installed
+    % produces a 'DOWN' message with reason noproc, which would be
+    % misreported as a view update failure.
+    {Pid, MonRef} = erlang:spawn_monitor(fun() ->
+        {ok, Db} = couch_db:open_int(DbName, []),
+        couch_view:get_map_view(Db, DDocName, ViewName, false),
+        ok = couch_db:close(Db)
+    end),
+    receive
+        {'DOWN', MonRef, process, Pid, normal} ->
+            etap:diag("View group updated"),
+            ok;
+        {'DOWN', MonRef, process, Pid, _Reason} ->
+            etap:bail("Failure updating view group")
+    end.
+
+
+%% Creates database DbName with an _admin user context and the overwrite
+%% option. Returns {ok, Db}; any other result crashes with badmatch.
+create_db(DbName) ->
+    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
+    {ok, _Db} = Created =
+        couch_db:create(DbName, [{user_ctx, AdminCtx}, overwrite]),
+    Created.
+
+
+%% Deletes the database through couch_server with an _admin user context,
+%% then waits up to 30 seconds for the database's main process to go down.
+%% Bails out of the test run on timeout.
+delete_db(#db{} = Db) ->
+    #db{name = DbName, main_pid = MainPid} = Db,
+    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
+    ok = couch_server:delete(DbName, [{user_ctx, AdminCtx}]),
+    Ref = erlang:monitor(process, MainPid),
+    receive
+        {'DOWN', Ref, process, MainPid, _} ->
+            ok
+    after 30000 ->
+        etap:bail("Timeout deleting database")
+    end.
+
+
+%% Spawns a high-priority process running writer_loop/2 for DbName, with the
+%% calling process as the recipient of its replies. Returns the new pid.
+spawn_writer(DbName) ->
+    Owner = self(),
+    Body = fun() ->
+        process_flag(priority, high),
+        writer_loop(DbName, Owner)
+    end,
+    spawn(Body).
+
+
+%% Asks a writer process for its status. Returns ok when the writer reports
+%% its database as open, the error term it reports otherwise, or the atom
+%% timeout when no reply arrives within 5 seconds.
+get_writer_status(Writer) ->
+    Tag = make_ref(),
+    erlang:send(Writer, {get_status, Tag}),
+    receive
+        {db_open, Tag} ->
+            ok;
+        {db_open_error, Reason, Tag} ->
+            Reason
+    after 5000 ->
+        timeout
+    end.
+
+
+%% Tells a writer process to retry opening its database. Returns ok once the
+%% writer acknowledges the request, or timeout after 5 seconds of silence.
+writer_try_again(Writer) ->
+    Tag = make_ref(),
+    erlang:send(Writer, {try_again, Tag}),
+    receive
+        {ok, Tag} ->
+            ok
+    after 5000 ->
+        timeout
+    end.
+
+
+%% Asks a writer process to stop. Returns ok once the writer acknowledges;
+%% bails out of the test run when no acknowledgement arrives in 5 seconds.
+stop_writer(Writer) ->
+    Tag = make_ref(),
+    erlang:send(Writer, {stop, Tag}),
+    receive
+        {ok, Tag} ->
+            ok
+    after 5000 ->
+        etap:bail("Timeout stopping writer process")
+    end.
+
+
+% A writer only holds its database open; it never does any work on it.
+% Tries to open DbName and continues in writer_loop_1/2 on success or in
+% writer_loop_2/3 with the returned error term otherwise.
+writer_loop(DbName, Parent) ->
+    writer_opened(couch_db:open_int(DbName, []), DbName, Parent).
+
+% Picks the follow-up loop from the result of couch_db:open_int/2.
+writer_opened({ok, Db}, _DbName, Parent) ->
+    writer_loop_1(Db, Parent);
+writer_opened(Error, DbName, Parent) ->
+    writer_loop_2(DbName, Parent, Error).
+
+% Loop of a writer that holds Db open. Answers get_status requests with
+% {db_open, Ref}; on stop it closes Db, acknowledges with {ok, Ref} and
+% returns, which ends the loop.
+writer_loop_1(Db, Parent) ->
+    receive
+        {stop, Tag} ->
+            ok = couch_db:close(Db),
+            Parent ! {ok, Tag};
+        {get_status, Tag} ->
+            Parent ! {db_open, Tag},
+            writer_loop_1(Db, Parent)
+    end.
+
+% Loop of a writer whose database open failed with Error. Answers
+% get_status requests with {db_open_error, Error, Ref}; on try_again it
+% acknowledges with {ok, Ref} and restarts writer_loop/2.
+writer_loop_2(DbName, Parent, Error) ->
+    receive
+        {try_again, Tag} ->
+            Parent ! {ok, Tag},
+            writer_loop(DbName, Parent);
+        {get_status, Tag} ->
+            Parent ! {db_open_error, Error, Tag},
+            writer_loop_2(DbName, Parent, Error)
+    end.
diff --git a/test/etap/Makefile.am b/test/etap/Makefile.am
index c3a4ddab..313945d7 100644
--- a/test/etap/Makefile.am
+++ b/test/etap/Makefile.am
@@ -88,5 +88,6 @@ EXTRA_DIST = \
180-http-proxy.t \
190-oauth.t \
200-view-group-no-db-leaks.t \
+ 201-view-group-shutdown.t \
210-os-proc-pool.t