diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/couchdb/couch_file.erl | 76 | ||||
-rw-r--r-- | src/couchdb/couch_view.erl | 83 |
2 files changed, 94 insertions, 65 deletions
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl index d42d0eb6..26831c8c 100644 --- a/src/couchdb/couch_file.erl +++ b/src/couchdb/couch_file.erl @@ -93,7 +93,19 @@ expand(Fd, Bytes) when Bytes > 0 -> %%---------------------------------------------------------------------- append_term(Fd, Term) -> - gen_server:call(Fd, {append_term, Term}). + append_binary(Fd, term_to_binary(Term, [compressed])). + + +%%---------------------------------------------------------------------- +%% Purpose: To append an Erlang binary to the end of the file. +%% Args: Erlang term to serialize and append to the file. +%% Returns: {ok, Pos} where Pos is the file offset to the beginning the +%% serialized term. Use pread_term to read the term back. +%% or {error, Reason}. +%%---------------------------------------------------------------------- + +append_binary(Fd, Bin) -> + gen_server:call(Fd, {append_bin, Bin}). %%---------------------------------------------------------------------- @@ -104,7 +116,18 @@ append_term(Fd, Term) -> %%---------------------------------------------------------------------- pread_term(Fd, Pos) -> - gen_server:call(Fd, {pread_term, Pos}). + {ok, Bin} = pread_binary(Fd, Pos), + {ok, binary_to_term(Bin)}. + +%%---------------------------------------------------------------------- +%% Purpose: Reads a binrary from a file that was written with append_binary +%% Args: Pos, the offset into the file where the term is serialized. +%% Returns: {ok, Term} +%% or {error, Reason}. +%%---------------------------------------------------------------------- + +pread_binary(Fd, Pos) -> + gen_server:call(Fd, {pread_bin, Pos}). %%---------------------------------------------------------------------- @@ -144,30 +167,35 @@ close(Fd) -> write_header(Fd, Prefix, Data) -> - ok = sync(Fd), TermBin = term_to_binary(Data), % the size of all the bytes written to the header, including the md5 signature (16 bytes) FilledSize = size(Prefix) + size(TermBin) + 16, + {TermBin2, FilledSize2} = case FilledSize > ?HEADER_SIZE of true -> % too big! - {error, error_header_too_large}; + {ok, Pos} = append_binary(Fd, TermBin), + PtrBin = term_to_binary({pointer_to_header_data, Pos}), + {PtrBin, size(Prefix) + size(PtrBin) + 16}; false -> - % pad out the header with zeros, then take the md5 hash - PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize))>>, - Sig = erlang:md5([TermBin, PadZeros]), - % now we assemble the final header binary and write to disk - WriteBin = <<Prefix/binary, TermBin/binary, PadZeros/binary, Sig/binary>>, - ?HEADER_SIZE = size(WriteBin), % sanity check - DblWriteBin = [WriteBin, WriteBin], - ok = pwrite(Fd, 0, DblWriteBin), - ok = sync(Fd) - end. + {TermBin, FilledSize} + end, + ok = sync(Fd), + % pad out the header with zeros, then take the md5 hash + PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>, + Sig = erlang:md5([TermBin2, PadZeros]), + % now we assemble the final header binary and write to disk + WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>, + ?HEADER_SIZE = size(WriteBin), % sanity check + DblWriteBin = [WriteBin, WriteBin], + ok = pwrite(Fd, 0, DblWriteBin), + ok = sync(Fd). read_header(Fd, Prefix) -> {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)), <<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin, + Result = % read the first header case extract_header(Prefix, Bin1) of {ok, Header1} -> @@ -200,9 +228,14 @@ read_header(Fd, Prefix) -> % return the error, no need to log anything as the caller will be responsible for dealing with the error. {error, Error} end + end, + case Result of + {ok, {pointer_to_header_data, Ptr}} -> + pread_term(Fd, Ptr); + _ -> + Result end. - - + extract_header(Prefix, Bin) -> SizeOfPrefix = size(Prefix), SizeOfTermBin = ?HEADER_SIZE - @@ -300,17 +333,16 @@ handle_call(sync, _From, Fd) -> handle_call({truncate, Pos}, _From, Fd) -> {ok, Pos} = file:position(Fd, Pos), {reply, file:truncate(Fd), Fd}; -handle_call({append_term, Term}, _From, Fd) -> - Bin = term_to_binary(Term, [compressed]), - TermLen = size(Bin), - Bin2 = <<TermLen:32, Bin/binary>>, +handle_call({append_bin, Bin}, _From, Fd) -> + Len = size(Bin), + Bin2 = <<Len:32, Bin/binary>>, {ok, Pos} = file:position(Fd, eof), {reply, {file:pwrite(Fd, Pos, Bin2), Pos}, Fd}; -handle_call({pread_term, Pos}, _From, Fd) -> +handle_call({pread_bin, Pos}, _From, Fd) -> {ok, <<TermLen:32>>} = file:pread(Fd, Pos, 4), {ok, Bin} = file:pread(Fd, Pos + 4, TermLen), - {reply, {ok, binary_to_term(Bin)}, Fd}. + {reply, {ok, Bin}, Fd}. handle_cast(close, Fd) -> diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl index ee493c7d..ecb8e06b 100644 --- a/src/couchdb/couch_view.erl +++ b/src/couchdb/couch_view.erl @@ -22,12 +22,12 @@ -include("couch_db.hrl"). -record(group, - {db, - fd, + {sig=nil, + db=nil, + fd=nil, name, def_lang, views, - reductions=[], % list of reduction names and id_num of view that contains it. id_btree=nil, current_seq=0, query_server=nil @@ -189,7 +189,8 @@ design_doc_to_view_group(#doc{id=Id,body={obj, Fields}}) -> {View#view{id_num=N},N+1} end, 0, dict:to_list(DictBySrc)), - reset_group(#group{name=Id, views=Views, def_lang=Language}). + Group = #group{name=Id, views=Views, def_lang=Language}, + Group#group{sig=erlang:md5(term_to_binary(Group))}. @@ -331,7 +332,7 @@ start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) -> current_seq=0, def_lang=Lang, id_btree=nil}, - Group2 = disk_group_to_mem(Db, Fd, Group), + Group2 = init_group(Db, Fd, Group,nil), temp_update_loop(Group2, NotifyPids); Else -> exit(Else) @@ -355,7 +356,7 @@ start_update_loop(RootDir, DbName, GroupId) -> start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)). start_update_loop(RootDir, DbName, GroupId, NotifyPids) -> - {Db, DbGroup} = + {Db, Group} = case (catch couch_server:open(DbName)) of {ok, Db0} -> case (catch couch_db:open_doc(Db0, GroupId)) of @@ -370,38 +371,37 @@ start_update_loop(RootDir, DbName, GroupId, NotifyPids) -> exit(Else) end, FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view", - Group = + Group2 = case couch_file:open(FileName) of {ok, Fd} -> + Sig = Group#group.sig, case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of - {ok, ExistingDiskGroup} -> - % validate all the view definitions in the index are correct. - case reset_group(ExistingDiskGroup) == reset_group(DbGroup) of - true -> disk_group_to_mem(Db, Fd, ExistingDiskGroup); - false -> reset_file(Db, Fd, DbName, DbGroup) - end; + {ok, {Sig, HeaderInfo}} -> + % sigs match! + init_group(Db, Fd, Group, HeaderInfo); _ -> - reset_file(Db, Fd, DbName, DbGroup) + reset_file(Db, Fd, DbName, Group) end; {error, enoent} -> case couch_file:open(FileName, [create]) of - {ok, Fd} -> reset_file(Db, Fd, DbName, DbGroup); + {ok, Fd} -> reset_file(Db, Fd, DbName, Group); Error -> throw(Error) end end, - update_loop(RootDir, DbName, GroupId, Group, NotifyPids). + update_loop(RootDir, DbName, GroupId, Group2, NotifyPids). -reset_file(Db, Fd, DbName, #group{name=Name} = DiskReadyGroup) -> +reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]), ok = couch_file:truncate(Fd, 0), - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, DiskReadyGroup), - disk_group_to_mem(Db, Fd, DiskReadyGroup). + ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}), + init_group(Db, Fd, reset_group(Group), nil). -update_loop(RootDir, DbName, GroupId, #group{fd=Fd}=Group, NotifyPids) -> +update_loop(RootDir, DbName, GroupId, #group{sig=Sig,fd=Fd}=Group, NotifyPids) -> try update_group(Group) of {ok, Group2} -> - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, mem_group_to_disk(Group2)), + HeaderData = {Sig, get_index_header_data(Group2)}, + ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData), [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], garbage_collect(), update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000)) @@ -475,11 +475,13 @@ nuke_dir(Dir) -> delete_index_file(RootDir, DbName, GroupId) -> file:delete(RootDir ++ "/." ++ DbName ++ GroupId ++ ".view"). -% Given a disk ready group structure, return an initialized, in-memory version. -disk_group_to_mem(Db, Fd, #group{id_btree=IdState,def_lang=Lang,views=Views}=Group) -> - {ok, IdBtree} = couch_btree:open(IdState, Fd), - Views2 = lists:map( - fun(#view{btree=BtreeState,reduce_funs=RedFuns}=View) -> +init_group(Db, Fd, #group{views=Views}=Group, nil = _IndexHeaderData) -> + init_group(Db, Fd, Group, {0, nil, [nil || _ <- Views]}); +init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, + {Seq, IdBtreeState, ViewStates} = _IndexHeaderData) -> + {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd), + Views2 = lists:zipwith( + fun(BtreeState, #view{btree=BtreeState,reduce_funs=RedFuns}=View) -> FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns], ReduceFun = fun(reduce, KVs) -> @@ -495,18 +497,13 @@ disk_group_to_mem(Db, Fd, #group{id_btree=IdState,def_lang=Lang,views=Views}=Gro [{less, fun less_json/2},{reduce, ReduceFun}]), View#view{btree=Btree} end, - Views), - Group#group{db=Db, fd=Fd, id_btree=IdBtree, views=Views2}. - -% Given an initialized, in-memory group structure, return a disk ready version. -mem_group_to_disk(#group{id_btree=IdBtree,views=Views}=Group) -> - Views2 = lists:map( - fun(#view{btree=Btree}=View) -> - State = couch_btree:get_state(Btree), - View#view{btree=State} - end, - Views), - Group#group{db=nil, fd=nil, id_btree=couch_btree:get_state(IdBtree), views=Views2}. + ViewStates, Views), + Group#group{db=Db, fd=Fd, current_seq=Seq, id_btree=IdBtree, views=Views2}. + + +get_index_header_data(#group{current_seq=Seq,id_btree=IdBtree,views=Views}) -> + ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views], + {Seq, couch_btree:get_state(IdBtree), ViewStates}. @@ -582,7 +579,7 @@ less_list([A|RestA], [B|RestB]) -> end end. -process_doc(Db, DocInfo, {Docs, #group{name=GroupId}=Group, ViewKVs, DocIdViewIdKeys, _LastSeq}) -> +process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys, _LastSeq}) -> % This fun computes once for each document #doc_info{id=DocId, update_seq=Seq, deleted=Deleted} = DocInfo, case DocId of @@ -591,11 +588,11 @@ process_doc(Db, DocInfo, {Docs, #group{name=GroupId}=Group, ViewKVs, DocIdViewId % anything in the definition changed. case couch_db:open_doc(Db, DocInfo) of {ok, Doc} -> - case design_doc_to_view_group(Doc) == reset_group(Group) of - true -> - % nothing changed, keeping on computing + case design_doc_to_view_group(Doc) of + #group{sig=Sig} -> + % The same md5 signature, keep on computing {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}}; - false -> + _ -> throw(restart) end; {not_found, deleted} -> |