summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_view_updater.erl
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2008-12-12 05:23:37 +0000
committerDamien F. Katz <damien@apache.org>2008-12-12 05:23:37 +0000
commit56a3ee28e006aa42150482e1c3f91dc1906273f9 (patch)
tree139a7b75db8c0e4b10d7a581e3f9f54936ea5746 /src/couchdb/couch_view_updater.erl
parentae3a9d4a0f06ef9eb6fcb0ce44e719bfc5bebbbd (diff)
modifications to view server to keep the file descriptor open for the life of the view group.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@725909 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_view_updater.erl')
-rw-r--r--src/couchdb/couch_view_updater.erl225
1 files changed, 30 insertions, 195 deletions
diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl
index 35bc5ffe..0532258c 100644
--- a/src/couchdb/couch_view_updater.erl
+++ b/src/couchdb/couch_view_updater.erl
@@ -12,83 +12,49 @@
-module(couch_view_updater).
--export([update/4, temp_update/6]).
+-export([update/1]).
-include("couch_db.hrl").
-
-
-update(RootDir, DbName, GroupId, NotifyPid) ->
- {ok, #group{sig=Sig,fd=Fd}=Group} = prepare_group(RootDir, DbName, GroupId),
- {ok, Db} = couch_db:open(DbName, []),
- Result = update_group(Group#group{db=Db}),
- couch_db:close(Db),
- case Result of
- {same, Group2} ->
- gen_server:cast(NotifyPid, {new_group, Group2});
- {updated, Group2} ->
- HeaderData = {Sig, get_index_header_data(Group2)},
- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData),
- gen_server:cast(NotifyPid, {new_group, Group2})
+update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq,
+ commit_fun=CommitFun}=Group) ->
+ ?LOG_DEBUG("Starting index update.",[]),
+ DbPurgeSeq = couch_db:get_purge_seq(Db),
+ Group2 =
+ if DbPurgeSeq == PurgeSeq ->
+ Group;
+ DbPurgeSeq == PurgeSeq + 1 ->
+ ?LOG_DEBUG("Purging entries from view index.",[]),
+ purge_index(Group);
+ true ->
+ ?LOG_DEBUG("Resetting view index due to lost purge entries.",[]),
+ exit(reset)
end,
- garbage_collect().
-temp_update(DbName, Fd, Lang, MapSrc, RedSrc, NotifyPid) ->
- case couch_db:open(DbName, []) of
- {ok, Db} ->
- View = #view{map_names=["_temp"],
- id_num=0,
- btree=nil,
- def=MapSrc,
- reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end},
- Group = #group{name="_temp",
- db=Db,
- views=[View],
- current_seq=0,
- def_lang=Lang,
- id_btree=nil},
- Group2 = init_group(Db, Fd, Group,nil),
- couch_db:monitor(Db),
- {_Updated, Group3} = update_group(Group2#group{db=Db}),
- couch_db:close(Db),
- gen_server:cast(NotifyPid, {new_group, Group3}),
- garbage_collect();
- Else ->
- exit(Else)
- end.
-
-
-update_group(#group{db=Db,current_seq=CurrentSeq}=Group) ->
- ViewEmptyKVs = [{View, []} || View <- Group#group.views],
+ ViewEmptyKVs = [{View, []} || View <- Group2#group.views],
% compute on all docs modified since we last computed.
{ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys}}
= couch_db:enum_docs_since(
Db,
- CurrentSeq,
+ Seq,
fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end,
- {[], Group, ViewEmptyKVs, []}
+ {[], Group2, ViewEmptyKVs, []}
),
{Group4, Results} = view_compute(Group3, UncomputedDocs),
- {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),
+ {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(
+ UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),
couch_query_servers:stop_doc_map(Group4#group.query_server),
NewSeq = couch_db:get_update_seq(Db),
- if CurrentSeq /= NewSeq ->
- {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2, NewSeq),
- {updated, Group5#group{query_server=nil}};
+ if Seq /= NewSeq ->
+ {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2,
+ NewSeq),
+ ok = CommitFun(Group5),
+ exit({new_group, Group5#group{query_server=nil}});
true ->
- {same, Group4#group{query_server=nil}}
+ exit({new_group, Group4#group{query_server=nil}})
end.
-get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq,
- id_btree=IdBtree,views=Views}) ->
- ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views],
- #index_header{seq=Seq,
- purge_seq=PurgeSeq,
- id_btree_state=couch_btree:get_state(IdBtree),
- view_states=ViewStates}.
-
-
purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
{ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
Ids = [Id || {Id, _Revs} <- PurgedIdsRevs],
@@ -120,7 +86,8 @@ purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
views=Views2,
purge_seq=couch_db:get_purge_seq(Db)}.
-process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys}) ->
+process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs,
+ DocIdViewIdKeys}) ->
% This fun computes once for each document
#doc_info{id=DocId, deleted=Deleted} = DocInfo,
case DocId of
@@ -129,17 +96,15 @@ process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, Doc
% anything in the definition changed.
case couch_db:open_doc(Db, DocInfo) of
{ok, Doc} ->
- case design_doc_to_view_group(Doc) of
+ case couch_view_group:design_doc_to_view_group(Doc) of
#group{sig=Sig} ->
% The same md5 signature, keep on computing
{ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}};
_ ->
- ?LOG_DEBUG("throw(restart) md5 broke ~p", [DocId]),
- throw(restart)
+ exit(reset)
end;
{not_found, deleted} ->
- ?LOG_DEBUG("throw(restart) {not_found, deleted} ~p", [DocId]),
- throw(restart)
+ exit(reset)
end;
<<?DESIGN_DOC_PREFIX, _/binary>> -> % we skip design docs
{ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}};
@@ -250,134 +215,4 @@ write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq) ->
],
Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2},
{ok, Group2}.
-
-prepare_group(RootDir, DbName, GroupId) ->
- {Db, Group} = case (catch couch_db:open(DbName, [])) of
- {ok, Db0} ->
- case (catch couch_db:open_doc(Db0, GroupId)) of
- {ok, Doc} ->
- {Db0, design_doc_to_view_group(Doc)};
- Else ->
- delete_index_file(RootDir, DbName, GroupId),
- ?LOG_DEBUG("prepare_group exit Else ~p self() ~p", [Else, self()]),
- exit(Else)
- end;
- Else ->
- delete_index_file(RootDir, DbName, GroupId),
- exit(Else)
- end,
- FileName = RootDir ++ "/." ++ binary_to_list(DbName) ++
- binary_to_list(GroupId) ++".view",
- Group2 =
- case couch_file:open(FileName) of
- {ok, Fd} ->
- Sig = Group#group.sig,
- case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of
- {ok, {Sig, #index_header{purge_seq=PurgeSeq}=HeaderInfo}} ->
- % sigs match!
- DbPurgeSeq = couch_db:get_purge_seq(Db),
- % We can only use index with the same, or next purge seq as the db.
- if DbPurgeSeq == PurgeSeq ->
- init_group(Db, Fd, Group, HeaderInfo);
- DbPurgeSeq == PurgeSeq + 1 ->
- ?LOG_DEBUG("Purging entries from view index.",[]),
- purge_index(init_group(Db, Fd, Group, HeaderInfo));
- true ->
- ?LOG_DEBUG("Reseting view index due to lost purge entries.",[]),
- reset_file(Db, Fd, DbName, Group)
- end;
- _ ->
- reset_file(Db, Fd, DbName, Group)
- end;
- {error, enoent} ->
- case couch_file:open(FileName, [create]) of
- {ok, Fd} -> reset_file(Db, Fd, DbName, Group);
- Error -> throw(Error)
- end
- end,
-
- couch_db:monitor(Db),
- couch_db:close(Db),
- {ok, Group2}.
-
-% maybe move to another module
-design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->
- Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>),
- {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}),
- % add the views to a dictionary object, with the map source as the key
- DictBySrc =
- lists:foldl(
- fun({Name, {MRFuns}}, DictBySrcAcc) ->
- MapSrc = proplists:get_value(<<"map">>, MRFuns),
- RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null),
- View =
- case dict:find(MapSrc, DictBySrcAcc) of
- {ok, View0} -> View0;
- error -> #view{def=MapSrc} % create new view object
- end,
- View2 =
- if RedSrc == null ->
- View#view{map_names=[Name|View#view.map_names]};
- true ->
- View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]}
- end,
- dict:store(MapSrc, View2, DictBySrcAcc)
- end, dict:new(), RawViews),
- % number the views
- {Views, _N} = lists:mapfoldl(
- fun({_Src, View}, N) ->
- {View#view{id_num=N},N+1}
- end, 0, dict:to_list(DictBySrc)),
-
- Group = #group{name=Id, views=Views, def_lang=Language},
- Group#group{sig=erlang:md5(term_to_binary(Group))}.
-
-reset_group(#group{views=Views}=Group) ->
- Views2 = [View#view{btree=nil} || View <- Views],
- Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
- id_btree=nil,views=Views2}.
-
-reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
- ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
- ok = couch_file:truncate(Fd, 0),
- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),
- init_group(Db, Fd, reset_group(Group), nil).
-
-delete_index_file(RootDir, DbName, GroupId) ->
- file:delete(RootDir ++ "/." ++ binary_to_list(DbName)
- ++ binary_to_list(GroupId) ++ ".view").
-
-init_group(Db, Fd, #group{views=Views}=Group, nil) ->
- init_group(Db, Fd, Group,
- #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
- id_btree_state=nil, view_states=[nil || _ <- Views]});
-init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) ->
- #index_header{seq=Seq, purge_seq=PurgeSeq,
- id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
- {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),
- Views2 = lists:zipwith(
- fun(BtreeState, #view{reduce_funs=RedFuns}=View) ->
- FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],
- ReduceFun =
- fun(reduce, KVs) ->
- KVs2 = couch_view:expand_dups(KVs,[]),
- KVs3 = couch_view:detuple_kvs(KVs2,[]),
- {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs,
- KVs3),
- {length(KVs3), Reduced};
- (rereduce, Reds) ->
- Count = lists:sum([Count0 || {Count0, _} <- Reds]),
- UserReds = [UserRedsList || {_, UserRedsList} <- Reds],
- {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs,
- UserReds),
- {Count, Reduced}
- end,
- {ok, Btree} = couch_btree:open(BtreeState, Fd,
- [{less, fun couch_view:less_json_keys/2},
- {reduce, ReduceFun}]),
- View#view{btree=Btree}
- end,
- ViewStates, Views),
- Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
- id_btree=IdBtree, views=Views2}. \ No newline at end of file