diff options
-rw-r--r-- | etc/couchdb/default.ini.tpl.in | 1 | ||||
-rw-r--r-- | src/couchdb/Makefile.am | 2 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_db.erl | 21 | ||||
-rw-r--r-- | src/couchdb/couch_view.erl | 2 | ||||
-rw-r--r-- | src/couchdb/couch_view_compactor.erl | 98 | ||||
-rw-r--r-- | src/couchdb/couch_view_group.erl | 96 |
6 files changed, 194 insertions, 26 deletions
diff --git a/etc/couchdb/default.ini.tpl.in b/etc/couchdb/default.ini.tpl.in index dd63db0b..fa715b18 100644 --- a/etc/couchdb/default.ini.tpl.in +++ b/etc/couchdb/default.ini.tpl.in @@ -52,6 +52,7 @@ _restart = {couch_httpd_misc_handlers, handle_restart_req} _stats = {couch_httpd_stats_handlers, handle_stats_req} [httpd_db_handlers] +_compact = {couch_httpd_db, handle_compact_req} _design = {couch_httpd_db, handle_design_req} _temp_view = {couch_httpd_view, handle_temp_view_req} diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am index 8bea5290..8b782221 100644 --- a/src/couchdb/Makefile.am +++ b/src/couchdb/Makefile.am @@ -73,6 +73,7 @@ source_files = \ couch_task_status.erl \ couch_util.erl \ couch_view.erl \ + couch_view_compactor.erl \ couch_view_updater.erl \ couch_view_group.erl \ couch_db_updater.erl @@ -114,6 +115,7 @@ compiled_files = \ couch_task_status.beam \ couch_util.beam \ couch_view.beam \ + couch_view_compactor.beam \ couch_view_updater.beam \ couch_view_group.beam \ couch_db_updater.beam diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 07815bda..209dc75e 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -13,7 +13,8 @@ -module(couch_httpd_db). -include("couch_db.hrl"). --export([handle_request/1, handle_design_req/2, db_req/2, couch_doc_open/4]). +-export([handle_request/1, handle_compact_req/2, handle_design_req/2, + db_req/2, couch_doc_open/4]). -import(couch_httpd, [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2, @@ -41,6 +42,17 @@ handle_request(#httpd{path_parts=[DbName|RestParts],method=Method, do_db_req(Req, Handler) end. +handle_compact_req(#httpd{method='POST',path_parts=[DbName,_,Id|_]}=Req, _Db) -> + ok = couch_view_compactor:start_compact(DbName, Id), + send_json(Req, 202, {[{ok, true}]}); + +handle_compact_req(#httpd{method='POST'}=Req, Db) -> + ok = couch_db:start_compact(Db), + send_json(Req, 202, {[{ok, true}]}); + +handle_compact_req(Req, _Db) -> + send_method_not_allowed(Req, "POST"). + handle_design_req(#httpd{ path_parts=[_DbName,_Design,_DesName, <<"_",_/binary>> = Action | _Rest], design_url_handlers = DesignUrlHandlers @@ -189,13 +201,6 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>]}=Req, Db) -> db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db) -> send_method_not_allowed(Req, "POST"); -db_req(#httpd{method='POST',path_parts=[_,<<"_compact">>]}=Req, Db) -> - ok = couch_db:start_compact(Db), - send_json(Req, 202, {[{ok, true}]}); - -db_req(#httpd{path_parts=[_,<<"_compact">>]}=Req, _Db) -> - send_method_not_allowed(Req, "POST"); - db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) -> {IdsRevs} = couch_httpd:json_body(Req), IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs], diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl index 5fb2ec5a..bac9f635 100644 --- a/src/couchdb/couch_view.erl +++ b/src/couchdb/couch_view.erl @@ -17,7 +17,7 @@ detuple_kvs/2,init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2, code_change/3,get_reduce_view/4,get_temp_reduce_view/5,get_temp_map_view/4, get_map_view/4,get_row_count/1,reduce_to_count/1,fold_reduce/7, - extract_map_view/1]). + extract_map_view/1,get_group_server/2]). -include("couch_db.hrl"). diff --git a/src/couchdb/couch_view_compactor.erl b/src/couchdb/couch_view_compactor.erl new file mode 100644 index 00000000..63c0ff75 --- /dev/null +++ b/src/couchdb/couch_view_compactor.erl @@ -0,0 +1,98 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_view_compactor). + +-include ("couch_db.hrl"). + +-export([start_compact/2]). + +%% @spec start_compact(DbName::binary(), GroupId:binary()) -> ok +%% @doc Compacts the views. GroupId must not include the _design/ prefix +start_compact(DbName, GroupId) -> + Pid = couch_view:get_group_server(DbName, <<"_design/",GroupId/binary>>), + gen_server:cast(Pid, {start_compact, fun compact_group/2}). + +%%============================================================================= +%% internal functions +%%============================================================================= + +%% @spec compact_group(Group, NewGroup) -> ok +compact_group(Group, EmptyGroup) -> + #group{ + current_seq = Seq, + id_btree = IdBtree, + name = GroupId, + views = Views + } = Group, + + #group{ + db = Db, + id_btree = EmptyIdBtree, + views = EmptyViews + } = EmptyGroup, + + {ok, {Count, _}} = couch_btree:full_reduce(Db#db.fulldocinfo_by_id_btree), + + <<"_design", ShortName/binary>> = GroupId, + DbName = couch_db:name(Db), + TaskName = <<DbName/binary, ShortName/binary>>, + couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>), + + Fun = fun(KV, {Bt, Acc, TotalCopied}) -> + if TotalCopied rem 10000 == 0 -> + couch_task_status:update("Copied ~p of ~p Ids (~p%)", + [TotalCopied, Count, (TotalCopied*100) div Count]), + {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])), + {ok, {Bt2, [], TotalCopied+1}}; + true -> + {ok, {Bt, [KV|Acc], TotalCopied+1}} + end + end, + {ok, {Bt3, Uncopied, _Total}} = couch_btree:foldl(IdBtree, Fun, + {EmptyIdBtree, [], 0}), + {ok, NewIdBtree} = couch_btree:add(Bt3, lists:reverse(Uncopied)), + + NewViews = lists:map(fun({View, EmptyView}) -> + compact_view(View, EmptyView) + end, lists:zip(Views, EmptyViews)), + + NewGroup = EmptyGroup#group{ + id_btree=NewIdBtree, + views=NewViews, + current_seq=Seq + }, + + Pid = couch_view:get_group_server(DbName, GroupId), + gen_server:cast(Pid, {compact_done, NewGroup}). + +%% @spec compact_view(View, EmptyView, Retry) -> CompactView +compact_view(View, EmptyView) -> + {ok, Count} = couch_view:get_row_count(View), + + %% Key is {Key,DocId} + Fun = fun(KV, {Bt, Acc, TotalCopied}) -> + if TotalCopied rem 10000 == 0 -> + couch_task_status:update("View #~p: copied ~p of ~p KVs (~p%)", + [View#view.id_num, TotalCopied, Count, (TotalCopied*100) div Count]), + {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])), + {ok, {Bt2, [], TotalCopied + 1}}; + true -> + {ok, {Bt, [KV|Acc], TotalCopied + 1}} + end + end, + + {ok, {Bt3, Uncopied, _Total}} = couch_btree:foldl(View#view.btree, Fun, + {EmptyView#view.btree, [], 0}), + {ok, NewBt} = couch_btree:add(Bt3, lists:reverse(Uncopied)), + EmptyView#view{btree = NewBt}. + diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index 4e7d7767..e44d637c 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -29,15 +29,18 @@ init_args, group, updater_pid=nil, + compactor_pid=nil, waiting_commit=false, - waiting_list=[] + waiting_list=[], + ref_counter=nil }). % api methods request_group(Pid, Seq) -> ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]), case gen_server:call(Pid, {request_group, Seq}, infinity) of - {ok, Group} -> + {ok, Group, RefCounter} -> + couch_ref_counter:add(RefCounter), {ok, Group}; Else -> ?LOG_DEBUG("get_updated_group replied with _Else ~p", [Else]), @@ -70,14 +73,16 @@ start_link(InitArgs) -> init({InitArgs, ReturnPid, Ref}) -> process_flag(trap_exit, true), case prepare_group(InitArgs, false) of - {ok, #group{db=Db}=Group} -> + {ok, #group{db=Db, fd=Fd}=Group} -> couch_db:monitor(Db), Pid = spawn_link(fun()-> couch_view_updater:update(Group) end), + {ok, RefCounter} = couch_ref_counter:start([Fd]), {ok, #group_state{ db_name=couch_db:name(Db), init_args=InitArgs, updater_pid = Pid, - group=Group}}; + group=Group, + ref_counter=RefCounter}}; Error -> ReturnPid ! {Ref, self(), Error}, ignore @@ -120,10 +125,11 @@ handle_call({request_group, RequestSeq}, From, % If the request seqence is less than or equal to the seq_id of a known Group, % we respond with that Group. -handle_call({request_group, RequestSeq}, _From, - #group_state{group=#group{current_seq=GroupSeq}=Group}=State) - when RequestSeq =< GroupSeq -> - {reply, {ok, Group}, State}; +handle_call({request_group, RequestSeq}, _From, #group_state{ + group = #group{current_seq=GroupSeq} = Group, + ref_counter = RefCounter + } = State) when RequestSeq =< GroupSeq -> + {reply, {ok, Group, RefCounter}, State}; % Otherwise: TargetSeq => RequestSeq > GroupSeq % We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq @@ -134,9 +140,63 @@ handle_call({request_group, RequestSeq}, From, }, infinity}. -handle_cast(foo, State) -> - {ok, State}. +handle_cast({start_compact, CompactFun}, #group_state{ compactor_pid=nil, + group=Group, init_args={view, RootDir, DbName, GroupId} } = State) -> + ?LOG_INFO("Starting view group compaction", []), + {ok, Db} = couch_db:open(DbName, []), + {ok, Fd} = open_index_file(RootDir, DbName, <<GroupId/binary,".compact">>), + NewGroup = reset_file(Db, Fd, DbName, Group), + Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end), + {noreply, State#group_state{compactor_pid = Pid}}; +handle_cast({start_compact, _}, State) -> + %% compact already running, this is a no-op + {noreply, State}; +handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, + #group_state{ + group = #group{current_seq=OldSeq} = Group, + init_args = {view, RootDir, DbName, GroupId}, + updater_pid = nil, + ref_counter = RefCounter + } = State) when NewSeq >= OldSeq -> + ?LOG_INFO("View Group compaction complete", []), + BaseName = RootDir ++ "/." ++ ?b2l(DbName) ++ ?b2l(GroupId), + FileName = BaseName ++ ".view", + CompactName = BaseName ++".compact.view", + file:delete(FileName), + ok = file:rename(CompactName, FileName), + + %% cleanup old group + couch_ref_counter:drop(RefCounter), + {ok, NewRefCounter} = couch_ref_counter:start([NewFd]), + case Group#group.db of + nil -> ok; + Else -> couch_db:close(Else) + end, + + erlang:send_after(1000, self(), delayed_commit), + {noreply, State#group_state{ + group=NewGroup, + ref_counter=NewRefCounter, + compactor_pid=nil + }}; +handle_cast({compact_done, NewGroup}, #group_state{ + init_args={view, _RootDir, DbName, GroupId} } = State) -> + ?LOG_INFO("View index compaction still behind main file", []), + couch_db:close(NewGroup#group.db), + {ok, Db} = couch_db:open(DbName, []), + Pid = spawn_link(fun() -> + {_,Ref} = erlang:spawn_monitor(fun() -> + couch_view_updater:update(NewGroup#group{db = Db}) + end), + receive + {'DOWN', Ref, _, _, {new_group, NewGroup2}} -> + #group{name=GroupId} = NewGroup2, + Pid2 = couch_view:get_group_server(DbName, GroupId), + gen_server:cast(Pid2, {compact_done, NewGroup2}) + end + end), + {noreply, State#group_state{compactor_pid = Pid}}. handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> {ok, Db} = couch_db:open(DbName, []), @@ -160,6 +220,7 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, #group_state{db_name=DbName, updater_pid=UpPid, + ref_counter=RefCounter, waiting_list=WaitList, waiting_commit=WaitingCommit}=State) when UpPid == FromPid -> ok = couch_db:close(Db), @@ -168,7 +229,7 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, erlang:send_after(1000, self(), delayed_commit); true -> ok end, - case reply_with_group(Group, WaitList, []) of + case reply_with_group(Group, WaitList, [], RefCounter) of [] -> {noreply, State#group_state{waiting_commit=true, waiting_list=[], group=Group#group{db=nil}, updater_pid=nil}}; @@ -221,17 +282,18 @@ code_change(_OldVsn, State, _Extra) -> % reply_with_group/3 % for each item in the WaitingList {Pid, Seq} % if the Seq is =< GroupSeq, reply -reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList], StillWaiting) when Seq =< GroupSeq -> - gen_server:reply(Pid, {ok, Group}), - reply_with_group(Group, WaitList, StillWaiting); +reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList], + StillWaiting, RefCounter) when Seq =< GroupSeq -> + gen_server:reply(Pid, {ok, Group, RefCounter}), + reply_with_group(Group, WaitList, StillWaiting, RefCounter); % else % put it in the continuing waiting list -reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting) -> - reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting]); +reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting, RefCounter) -> + reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting], RefCounter); % return the still waiting list -reply_with_group(_Group, [], StillWaiting) -> +reply_with_group(_Group, [], StillWaiting, _RefCounter) -> StillWaiting. reply_all(#group_state{waiting_list=WaitList}=State, Reply) -> |