summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/couchdb/couch_file.erl76
-rw-r--r--src/couchdb/couch_view.erl83
2 files changed, 94 insertions, 65 deletions
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index d42d0eb6..26831c8c 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -93,7 +93,19 @@ expand(Fd, Bytes) when Bytes > 0 ->
%%----------------------------------------------------------------------
append_term(Fd, Term) ->
- gen_server:call(Fd, {append_term, Term}).
+ append_binary(Fd, term_to_binary(Term, [compressed])).
+
+
+%%----------------------------------------------------------------------
+%% Purpose: To append an Erlang binary to the end of the file.
+%% Args: Erlang term to serialize and append to the file.
+%% Returns: {ok, Pos} where Pos is the file offset to the beginning the
+%% serialized term. Use pread_term to read the term back.
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+append_binary(Fd, Bin) ->
+ gen_server:call(Fd, {append_bin, Bin}).
%%----------------------------------------------------------------------
@@ -104,7 +116,18 @@ append_term(Fd, Term) ->
%%----------------------------------------------------------------------
pread_term(Fd, Pos) ->
- gen_server:call(Fd, {pread_term, Pos}).
+ {ok, Bin} = pread_binary(Fd, Pos),
+ {ok, binary_to_term(Bin)}.
+
+%%----------------------------------------------------------------------
+%% Purpose: Reads a binrary from a file that was written with append_binary
+%% Args: Pos, the offset into the file where the term is serialized.
+%% Returns: {ok, Term}
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+pread_binary(Fd, Pos) ->
+ gen_server:call(Fd, {pread_bin, Pos}).
%%----------------------------------------------------------------------
@@ -144,30 +167,35 @@ close(Fd) ->
write_header(Fd, Prefix, Data) ->
- ok = sync(Fd),
TermBin = term_to_binary(Data),
% the size of all the bytes written to the header, including the md5 signature (16 bytes)
FilledSize = size(Prefix) + size(TermBin) + 16,
+ {TermBin2, FilledSize2} =
case FilledSize > ?HEADER_SIZE of
true ->
% too big!
- {error, error_header_too_large};
+ {ok, Pos} = append_binary(Fd, TermBin),
+ PtrBin = term_to_binary({pointer_to_header_data, Pos}),
+ {PtrBin, size(Prefix) + size(PtrBin) + 16};
false ->
- % pad out the header with zeros, then take the md5 hash
- PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize))>>,
- Sig = erlang:md5([TermBin, PadZeros]),
- % now we assemble the final header binary and write to disk
- WriteBin = <<Prefix/binary, TermBin/binary, PadZeros/binary, Sig/binary>>,
- ?HEADER_SIZE = size(WriteBin), % sanity check
- DblWriteBin = [WriteBin, WriteBin],
- ok = pwrite(Fd, 0, DblWriteBin),
- ok = sync(Fd)
- end.
+ {TermBin, FilledSize}
+ end,
+ ok = sync(Fd),
+ % pad out the header with zeros, then take the md5 hash
+ PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>,
+ Sig = erlang:md5([TermBin2, PadZeros]),
+ % now we assemble the final header binary and write to disk
+ WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>,
+ ?HEADER_SIZE = size(WriteBin), % sanity check
+ DblWriteBin = [WriteBin, WriteBin],
+ ok = pwrite(Fd, 0, DblWriteBin),
+ ok = sync(Fd).
read_header(Fd, Prefix) ->
{ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)),
<<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin,
+ Result =
% read the first header
case extract_header(Prefix, Bin1) of
{ok, Header1} ->
@@ -200,9 +228,14 @@ read_header(Fd, Prefix) ->
% return the error, no need to log anything as the caller will be responsible for dealing with the error.
{error, Error}
end
+ end,
+ case Result of
+ {ok, {pointer_to_header_data, Ptr}} ->
+ pread_term(Fd, Ptr);
+ _ ->
+ Result
end.
-
-
+
extract_header(Prefix, Bin) ->
SizeOfPrefix = size(Prefix),
SizeOfTermBin = ?HEADER_SIZE -
@@ -300,17 +333,16 @@ handle_call(sync, _From, Fd) ->
handle_call({truncate, Pos}, _From, Fd) ->
{ok, Pos} = file:position(Fd, Pos),
{reply, file:truncate(Fd), Fd};
-handle_call({append_term, Term}, _From, Fd) ->
- Bin = term_to_binary(Term, [compressed]),
- TermLen = size(Bin),
- Bin2 = <<TermLen:32, Bin/binary>>,
+handle_call({append_bin, Bin}, _From, Fd) ->
+ Len = size(Bin),
+ Bin2 = <<Len:32, Bin/binary>>,
{ok, Pos} = file:position(Fd, eof),
{reply, {file:pwrite(Fd, Pos, Bin2), Pos}, Fd};
-handle_call({pread_term, Pos}, _From, Fd) ->
+handle_call({pread_bin, Pos}, _From, Fd) ->
{ok, <<TermLen:32>>}
= file:pread(Fd, Pos, 4),
{ok, Bin} = file:pread(Fd, Pos + 4, TermLen),
- {reply, {ok, binary_to_term(Bin)}, Fd}.
+ {reply, {ok, Bin}, Fd}.
handle_cast(close, Fd) ->
diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl
index ee493c7d..ecb8e06b 100644
--- a/src/couchdb/couch_view.erl
+++ b/src/couchdb/couch_view.erl
@@ -22,12 +22,12 @@
-include("couch_db.hrl").
-record(group,
- {db,
- fd,
+ {sig=nil,
+ db=nil,
+ fd=nil,
name,
def_lang,
views,
- reductions=[], % list of reduction names and id_num of view that contains it.
id_btree=nil,
current_seq=0,
query_server=nil
@@ -189,7 +189,8 @@ design_doc_to_view_group(#doc{id=Id,body={obj, Fields}}) ->
{View#view{id_num=N},N+1}
end, 0, dict:to_list(DictBySrc)),
- reset_group(#group{name=Id, views=Views, def_lang=Language}).
+ Group = #group{name=Id, views=Views, def_lang=Language},
+ Group#group{sig=erlang:md5(term_to_binary(Group))}.
@@ -331,7 +332,7 @@ start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) ->
current_seq=0,
def_lang=Lang,
id_btree=nil},
- Group2 = disk_group_to_mem(Db, Fd, Group),
+ Group2 = init_group(Db, Fd, Group,nil),
temp_update_loop(Group2, NotifyPids);
Else ->
exit(Else)
@@ -355,7 +356,7 @@ start_update_loop(RootDir, DbName, GroupId) ->
start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)).
start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
- {Db, DbGroup} =
+ {Db, Group} =
case (catch couch_server:open(DbName)) of
{ok, Db0} ->
case (catch couch_db:open_doc(Db0, GroupId)) of
@@ -370,38 +371,37 @@ start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
exit(Else)
end,
FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view",
- Group =
+ Group2 =
case couch_file:open(FileName) of
{ok, Fd} ->
+ Sig = Group#group.sig,
case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of
- {ok, ExistingDiskGroup} ->
- % validate all the view definitions in the index are correct.
- case reset_group(ExistingDiskGroup) == reset_group(DbGroup) of
- true -> disk_group_to_mem(Db, Fd, ExistingDiskGroup);
- false -> reset_file(Db, Fd, DbName, DbGroup)
- end;
+ {ok, {Sig, HeaderInfo}} ->
+ % sigs match!
+ init_group(Db, Fd, Group, HeaderInfo);
_ ->
- reset_file(Db, Fd, DbName, DbGroup)
+ reset_file(Db, Fd, DbName, Group)
end;
{error, enoent} ->
case couch_file:open(FileName, [create]) of
- {ok, Fd} -> reset_file(Db, Fd, DbName, DbGroup);
+ {ok, Fd} -> reset_file(Db, Fd, DbName, Group);
Error -> throw(Error)
end
end,
- update_loop(RootDir, DbName, GroupId, Group, NotifyPids).
+ update_loop(RootDir, DbName, GroupId, Group2, NotifyPids).
-reset_file(Db, Fd, DbName, #group{name=Name} = DiskReadyGroup) ->
+reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
ok = couch_file:truncate(Fd, 0),
- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, DiskReadyGroup),
- disk_group_to_mem(Db, Fd, DiskReadyGroup).
+ ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),
+ init_group(Db, Fd, reset_group(Group), nil).
-update_loop(RootDir, DbName, GroupId, #group{fd=Fd}=Group, NotifyPids) ->
+update_loop(RootDir, DbName, GroupId, #group{sig=Sig,fd=Fd}=Group, NotifyPids) ->
try update_group(Group) of
{ok, Group2} ->
- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, mem_group_to_disk(Group2)),
+ HeaderData = {Sig, get_index_header_data(Group2)},
+ ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData),
[Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
garbage_collect(),
update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000))
@@ -475,11 +475,13 @@ nuke_dir(Dir) ->
delete_index_file(RootDir, DbName, GroupId) ->
file:delete(RootDir ++ "/." ++ DbName ++ GroupId ++ ".view").
-% Given a disk ready group structure, return an initialized, in-memory version.
-disk_group_to_mem(Db, Fd, #group{id_btree=IdState,def_lang=Lang,views=Views}=Group) ->
- {ok, IdBtree} = couch_btree:open(IdState, Fd),
- Views2 = lists:map(
- fun(#view{btree=BtreeState,reduce_funs=RedFuns}=View) ->
+init_group(Db, Fd, #group{views=Views}=Group, nil = _IndexHeaderData) ->
+ init_group(Db, Fd, Group, {0, nil, [nil || _ <- Views]});
+init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group,
+ {Seq, IdBtreeState, ViewStates} = _IndexHeaderData) ->
+ {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),
+ Views2 = lists:zipwith(
+ fun(BtreeState, #view{btree=BtreeState,reduce_funs=RedFuns}=View) ->
FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],
ReduceFun =
fun(reduce, KVs) ->
@@ -495,18 +497,13 @@ disk_group_to_mem(Db, Fd, #group{id_btree=IdState,def_lang=Lang,views=Views}=Gro
[{less, fun less_json/2},{reduce, ReduceFun}]),
View#view{btree=Btree}
end,
- Views),
- Group#group{db=Db, fd=Fd, id_btree=IdBtree, views=Views2}.
-
-% Given an initialized, in-memory group structure, return a disk ready version.
-mem_group_to_disk(#group{id_btree=IdBtree,views=Views}=Group) ->
- Views2 = lists:map(
- fun(#view{btree=Btree}=View) ->
- State = couch_btree:get_state(Btree),
- View#view{btree=State}
- end,
- Views),
- Group#group{db=nil, fd=nil, id_btree=couch_btree:get_state(IdBtree), views=Views2}.
+ ViewStates, Views),
+ Group#group{db=Db, fd=Fd, current_seq=Seq, id_btree=IdBtree, views=Views2}.
+
+
+get_index_header_data(#group{current_seq=Seq,id_btree=IdBtree,views=Views}) ->
+ ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views],
+ {Seq, couch_btree:get_state(IdBtree), ViewStates}.
@@ -582,7 +579,7 @@ less_list([A|RestA], [B|RestB]) ->
end
end.
-process_doc(Db, DocInfo, {Docs, #group{name=GroupId}=Group, ViewKVs, DocIdViewIdKeys, _LastSeq}) ->
+process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys, _LastSeq}) ->
% This fun computes once for each document
#doc_info{id=DocId, update_seq=Seq, deleted=Deleted} = DocInfo,
case DocId of
@@ -591,11 +588,11 @@ process_doc(Db, DocInfo, {Docs, #group{name=GroupId}=Group, ViewKVs, DocIdViewId
% anything in the definition changed.
case couch_db:open_doc(Db, DocInfo) of
{ok, Doc} ->
- case design_doc_to_view_group(Doc) == reset_group(Group) of
- true ->
- % nothing changed, keeping on computing
+ case design_doc_to_view_group(Doc) of
+ #group{sig=Sig} ->
+ % The same md5 signature, keep on computing
{ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}};
- false ->
+ _ ->
throw(restart)
end;
{not_found, deleted} ->