summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--etc/couchdb/default.ini.tpl.in1
-rw-r--r--src/couchdb/Makefile.am2
-rw-r--r--src/couchdb/couch_httpd_db.erl21
-rw-r--r--src/couchdb/couch_view.erl2
-rw-r--r--src/couchdb/couch_view_compactor.erl98
-rw-r--r--src/couchdb/couch_view_group.erl96
6 files changed, 194 insertions, 26 deletions
diff --git a/etc/couchdb/default.ini.tpl.in b/etc/couchdb/default.ini.tpl.in
index dd63db0b..fa715b18 100644
--- a/etc/couchdb/default.ini.tpl.in
+++ b/etc/couchdb/default.ini.tpl.in
@@ -52,6 +52,7 @@ _restart = {couch_httpd_misc_handlers, handle_restart_req}
_stats = {couch_httpd_stats_handlers, handle_stats_req}
[httpd_db_handlers]
+_compact = {couch_httpd_db, handle_compact_req}
_design = {couch_httpd_db, handle_design_req}
_temp_view = {couch_httpd_view, handle_temp_view_req}
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index 8bea5290..8b782221 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -73,6 +73,7 @@ source_files = \
couch_task_status.erl \
couch_util.erl \
couch_view.erl \
+ couch_view_compactor.erl \
couch_view_updater.erl \
couch_view_group.erl \
couch_db_updater.erl
@@ -114,6 +115,7 @@ compiled_files = \
couch_task_status.beam \
couch_util.beam \
couch_view.beam \
+ couch_view_compactor.beam \
couch_view_updater.beam \
couch_view_group.beam \
couch_db_updater.beam
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 07815bda..209dc75e 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -13,7 +13,8 @@
-module(couch_httpd_db).
-include("couch_db.hrl").
--export([handle_request/1, handle_design_req/2, db_req/2, couch_doc_open/4]).
+-export([handle_request/1, handle_compact_req/2, handle_design_req/2,
+ db_req/2, couch_doc_open/4]).
-import(couch_httpd,
[send_json/2,send_json/3,send_json/4,send_method_not_allowed/2,
@@ -41,6 +42,17 @@ handle_request(#httpd{path_parts=[DbName|RestParts],method=Method,
do_db_req(Req, Handler)
end.
+handle_compact_req(#httpd{method='POST',path_parts=[DbName,_,Id|_]}=Req, _Db) ->
+ ok = couch_view_compactor:start_compact(DbName, Id),
+ send_json(Req, 202, {[{ok, true}]});
+
+handle_compact_req(#httpd{method='POST'}=Req, Db) ->
+ ok = couch_db:start_compact(Db),
+ send_json(Req, 202, {[{ok, true}]});
+
+handle_compact_req(Req, _Db) ->
+ send_method_not_allowed(Req, "POST").
+
handle_design_req(#httpd{
path_parts=[_DbName,_Design,_DesName, <<"_",_/binary>> = Action | _Rest],
design_url_handlers = DesignUrlHandlers
@@ -189,13 +201,6 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>]}=Req, Db) ->
db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db) ->
send_method_not_allowed(Req, "POST");
-db_req(#httpd{method='POST',path_parts=[_,<<"_compact">>]}=Req, Db) ->
- ok = couch_db:start_compact(Db),
- send_json(Req, 202, {[{ok, true}]});
-
-db_req(#httpd{path_parts=[_,<<"_compact">>]}=Req, _Db) ->
- send_method_not_allowed(Req, "POST");
-
db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) ->
{IdsRevs} = couch_httpd:json_body(Req),
IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs],
diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl
index 5fb2ec5a..bac9f635 100644
--- a/src/couchdb/couch_view.erl
+++ b/src/couchdb/couch_view.erl
@@ -17,7 +17,7 @@
detuple_kvs/2,init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,
code_change/3,get_reduce_view/4,get_temp_reduce_view/5,get_temp_map_view/4,
get_map_view/4,get_row_count/1,reduce_to_count/1,fold_reduce/7,
- extract_map_view/1]).
+ extract_map_view/1,get_group_server/2]).
-include("couch_db.hrl").
diff --git a/src/couchdb/couch_view_compactor.erl b/src/couchdb/couch_view_compactor.erl
new file mode 100644
index 00000000..63c0ff75
--- /dev/null
+++ b/src/couchdb/couch_view_compactor.erl
@@ -0,0 +1,98 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_view_compactor).
+
+-include ("couch_db.hrl").
+
+-export([start_compact/2]).
+
+%% @spec start_compact(DbName::binary(), GroupId:binary()) -> ok
+%% @doc Compacts the views. GroupId must not include the _design/ prefix
+start_compact(DbName, GroupId) ->
+ Pid = couch_view:get_group_server(DbName, <<"_design/",GroupId/binary>>),
+ gen_server:cast(Pid, {start_compact, fun compact_group/2}).
+
+%%=============================================================================
+%% internal functions
+%%=============================================================================
+
+%% @spec compact_group(Group, NewGroup) -> ok
+compact_group(Group, EmptyGroup) ->
+ #group{
+ current_seq = Seq,
+ id_btree = IdBtree,
+ name = GroupId,
+ views = Views
+ } = Group,
+
+ #group{
+ db = Db,
+ id_btree = EmptyIdBtree,
+ views = EmptyViews
+ } = EmptyGroup,
+
+ {ok, {Count, _}} = couch_btree:full_reduce(Db#db.fulldocinfo_by_id_btree),
+
+ <<"_design", ShortName/binary>> = GroupId,
+ DbName = couch_db:name(Db),
+ TaskName = <<DbName/binary, ShortName/binary>>,
+ couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>),
+
+ Fun = fun(KV, {Bt, Acc, TotalCopied}) ->
+ if TotalCopied rem 10000 == 0 ->
+ couch_task_status:update("Copied ~p of ~p Ids (~p%)",
+ [TotalCopied, Count, (TotalCopied*100) div Count]),
+ {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])),
+ {ok, {Bt2, [], TotalCopied+1}};
+ true ->
+ {ok, {Bt, [KV|Acc], TotalCopied+1}}
+ end
+ end,
+ {ok, {Bt3, Uncopied, _Total}} = couch_btree:foldl(IdBtree, Fun,
+ {EmptyIdBtree, [], 0}),
+ {ok, NewIdBtree} = couch_btree:add(Bt3, lists:reverse(Uncopied)),
+
+ NewViews = lists:map(fun({View, EmptyView}) ->
+ compact_view(View, EmptyView)
+ end, lists:zip(Views, EmptyViews)),
+
+ NewGroup = EmptyGroup#group{
+ id_btree=NewIdBtree,
+ views=NewViews,
+ current_seq=Seq
+ },
+
+ Pid = couch_view:get_group_server(DbName, GroupId),
+ gen_server:cast(Pid, {compact_done, NewGroup}).
+
+%% @spec compact_view(View, EmptyView, Retry) -> CompactView
+compact_view(View, EmptyView) ->
+ {ok, Count} = couch_view:get_row_count(View),
+
+ %% Key is {Key,DocId}
+ Fun = fun(KV, {Bt, Acc, TotalCopied}) ->
+ if TotalCopied rem 10000 == 0 ->
+ couch_task_status:update("View #~p: copied ~p of ~p KVs (~p%)",
+ [View#view.id_num, TotalCopied, Count, (TotalCopied*100) div Count]),
+ {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])),
+ {ok, {Bt2, [], TotalCopied + 1}};
+ true ->
+ {ok, {Bt, [KV|Acc], TotalCopied + 1}}
+ end
+ end,
+
+ {ok, {Bt3, Uncopied, _Total}} = couch_btree:foldl(View#view.btree, Fun,
+ {EmptyView#view.btree, [], 0}),
+ {ok, NewBt} = couch_btree:add(Bt3, lists:reverse(Uncopied)),
+ EmptyView#view{btree = NewBt}.
+
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl
index 4e7d7767..e44d637c 100644
--- a/src/couchdb/couch_view_group.erl
+++ b/src/couchdb/couch_view_group.erl
@@ -29,15 +29,18 @@
init_args,
group,
updater_pid=nil,
+ compactor_pid=nil,
waiting_commit=false,
- waiting_list=[]
+ waiting_list=[],
+ ref_counter=nil
}).
% api methods
request_group(Pid, Seq) ->
?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]),
case gen_server:call(Pid, {request_group, Seq}, infinity) of
- {ok, Group} ->
+ {ok, Group, RefCounter} ->
+ couch_ref_counter:add(RefCounter),
{ok, Group};
Else ->
?LOG_DEBUG("get_updated_group replied with _Else ~p", [Else]),
@@ -70,14 +73,16 @@ start_link(InitArgs) ->
init({InitArgs, ReturnPid, Ref}) ->
process_flag(trap_exit, true),
case prepare_group(InitArgs, false) of
- {ok, #group{db=Db}=Group} ->
+ {ok, #group{db=Db, fd=Fd}=Group} ->
couch_db:monitor(Db),
Pid = spawn_link(fun()-> couch_view_updater:update(Group) end),
+ {ok, RefCounter} = couch_ref_counter:start([Fd]),
{ok, #group_state{
db_name=couch_db:name(Db),
init_args=InitArgs,
updater_pid = Pid,
- group=Group}};
+ group=Group,
+ ref_counter=RefCounter}};
Error ->
ReturnPid ! {Ref, self(), Error},
ignore
@@ -120,10 +125,11 @@ handle_call({request_group, RequestSeq}, From,
% If the request seqence is less than or equal to the seq_id of a known Group,
% we respond with that Group.
-handle_call({request_group, RequestSeq}, _From,
- #group_state{group=#group{current_seq=GroupSeq}=Group}=State)
- when RequestSeq =< GroupSeq ->
- {reply, {ok, Group}, State};
+handle_call({request_group, RequestSeq}, _From, #group_state{
+ group = #group{current_seq=GroupSeq} = Group,
+ ref_counter = RefCounter
+ } = State) when RequestSeq =< GroupSeq ->
+ {reply, {ok, Group, RefCounter}, State};
% Otherwise: TargetSeq => RequestSeq > GroupSeq
% We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq
@@ -134,9 +140,63 @@ handle_call({request_group, RequestSeq}, From,
}, infinity}.
-handle_cast(foo, State) ->
- {ok, State}.
+handle_cast({start_compact, CompactFun}, #group_state{ compactor_pid=nil,
+ group=Group, init_args={view, RootDir, DbName, GroupId} } = State) ->
+ ?LOG_INFO("Starting view group compaction", []),
+ {ok, Db} = couch_db:open(DbName, []),
+ {ok, Fd} = open_index_file(RootDir, DbName, <<GroupId/binary,".compact">>),
+ NewGroup = reset_file(Db, Fd, DbName, Group),
+ Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end),
+ {noreply, State#group_state{compactor_pid = Pid}};
+handle_cast({start_compact, _}, State) ->
+ %% compact already running, this is a no-op
+ {noreply, State};
+handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup},
+ #group_state{
+ group = #group{current_seq=OldSeq} = Group,
+ init_args = {view, RootDir, DbName, GroupId},
+ updater_pid = nil,
+ ref_counter = RefCounter
+ } = State) when NewSeq >= OldSeq ->
+ ?LOG_INFO("View Group compaction complete", []),
+ BaseName = RootDir ++ "/." ++ ?b2l(DbName) ++ ?b2l(GroupId),
+ FileName = BaseName ++ ".view",
+ CompactName = BaseName ++".compact.view",
+ file:delete(FileName),
+ ok = file:rename(CompactName, FileName),
+
+ %% cleanup old group
+ couch_ref_counter:drop(RefCounter),
+ {ok, NewRefCounter} = couch_ref_counter:start([NewFd]),
+ case Group#group.db of
+ nil -> ok;
+ Else -> couch_db:close(Else)
+ end,
+
+ erlang:send_after(1000, self(), delayed_commit),
+ {noreply, State#group_state{
+ group=NewGroup,
+ ref_counter=NewRefCounter,
+ compactor_pid=nil
+ }};
+handle_cast({compact_done, NewGroup}, #group_state{
+ init_args={view, _RootDir, DbName, GroupId} } = State) ->
+ ?LOG_INFO("View index compaction still behind main file", []),
+ couch_db:close(NewGroup#group.db),
+ {ok, Db} = couch_db:open(DbName, []),
+ Pid = spawn_link(fun() ->
+ {_,Ref} = erlang:spawn_monitor(fun() ->
+ couch_view_updater:update(NewGroup#group{db = Db})
+ end),
+ receive
+ {'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
+ #group{name=GroupId} = NewGroup2,
+ Pid2 = couch_view:get_group_server(DbName, GroupId),
+ gen_server:cast(Pid2, {compact_done, NewGroup2})
+ end
+ end),
+ {noreply, State#group_state{compactor_pid = Pid}}.
handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
{ok, Db} = couch_db:open(DbName, []),
@@ -160,6 +220,7 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
#group_state{db_name=DbName,
updater_pid=UpPid,
+ ref_counter=RefCounter,
waiting_list=WaitList,
waiting_commit=WaitingCommit}=State) when UpPid == FromPid ->
ok = couch_db:close(Db),
@@ -168,7 +229,7 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
erlang:send_after(1000, self(), delayed_commit);
true -> ok
end,
- case reply_with_group(Group, WaitList, []) of
+ case reply_with_group(Group, WaitList, [], RefCounter) of
[] ->
{noreply, State#group_state{waiting_commit=true, waiting_list=[],
group=Group#group{db=nil}, updater_pid=nil}};
@@ -221,17 +282,18 @@ code_change(_OldVsn, State, _Extra) ->
% reply_with_group/3
% for each item in the WaitingList {Pid, Seq}
% if the Seq is =< GroupSeq, reply
-reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList], StillWaiting) when Seq =< GroupSeq ->
- gen_server:reply(Pid, {ok, Group}),
- reply_with_group(Group, WaitList, StillWaiting);
+reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList],
+ StillWaiting, RefCounter) when Seq =< GroupSeq ->
+ gen_server:reply(Pid, {ok, Group, RefCounter}),
+ reply_with_group(Group, WaitList, StillWaiting, RefCounter);
% else
% put it in the continuing waiting list
-reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting) ->
- reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting]);
+reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting, RefCounter) ->
+ reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting], RefCounter);
% return the still waiting list
-reply_with_group(_Group, [], StillWaiting) ->
+reply_with_group(_Group, [], StillWaiting, _RefCounter) ->
StillWaiting.
reply_all(#group_state{waiting_list=WaitList}=State, Reply) ->