diff options
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/couch_db.erl | 219 | ||||
-rw-r--r-- | src/couchdb/couch_file.erl | 6 | ||||
-rw-r--r-- | src/couchdb/couch_stream.erl | 16 | ||||
-rw-r--r-- | src/couchdb/couch_view.erl | 3 | ||||
-rw-r--r-- | src/couchdb/mod_couch.erl | 9 |
5 files changed, 151 insertions, 102 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 51d55822..6f8a4ac6 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -17,9 +17,9 @@ -export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]). -export([delete_doc/3,open_doc/2,open_doc/3,enum_docs_since/4,enum_docs_since/5]). -export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]). --export([start_update_loop/1]). +-export([start_update_loop/2]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). --export([start_copy_compact_int/1,continue_copy_compact_int/2]). +-export([start_copy_compact_int/2]). -include("couch_db.hrl"). @@ -47,24 +47,52 @@ update_seq, doc_count, doc_del_count, - name + name, + filepath }). % small value used in revision trees to indicate the revision isn't stored -define(REV_MISSING, []). start_link(DbName, Filepath, Options) -> + catch start_link0(DbName, Filepath, Options). + +start_link0(DbName, Filepath, Options) -> + % first delete the old file previous compaction + Fd = case couch_file:open(Filepath, Options) of - {ok, Fd} -> - Result = gen_server:start_link(couch_db, {DbName, Fd, Options}, []), - unlink(Fd), - Result; + {ok, Fd0} -> + Fd0; {error, enoent} -> - % couldn't find file - {error, not_found}; + % couldn't find file. is there a compact version? This can happen if + % crashed during the file switch. + case couch_file:open(Filepath ++ ".compact") of + {ok, Fd0} -> + couch_log:info("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]), + ok = file:rename(Filepath ++ ".compact", Filepath), + Fd0; + {error, enoent} -> + throw({error, notfound}) + end; Else -> - Else - end. + throw(Else) + end, + + StartResult = gen_server:start_link(couch_db, {DbName, Filepath, Fd, Options}, []), + unlink(Fd), + case StartResult of + {ok, _} -> + % We successfully opened the db, delete old storage files if around + case file:delete(Filepath ++ ".old") of + ok -> + couch_log:info("Deleted old storage file ~s~s", [Filepath, ".old"]); + {error, enoent} -> + ok % normal result + end; + _ -> + ok + end, + StartResult. %%% Interface functions %%% @@ -146,12 +174,13 @@ get_db_info(Db) -> doc_count=Count, doc_del_count=DelCount, update_seq=SeqNum} = Db, + {ok, Size} = couch_file:bytes(Fd), InfoList = [ {doc_count, Count}, {doc_del_count, DelCount}, - {last_update_seq, SeqNum}, - {compacting, Compactor==nil}, - {size, couch_file:bytes(Fd)} + {update_seq, SeqNum}, + {compacting, Compactor/=nil}, + {size, Size} ], {ok, InfoList}. @@ -337,23 +366,12 @@ enum_docs(MainPid, StartId, InFun, Ctx) -> % server functions -init({DbName, Fd, Options}) -> - link(Fd), - case lists:member(create, Options) of - true -> - % create a new header and writes it to the file - Header = #db_header{}, - ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header), - ok = couch_file:sync(Fd); - false -> - {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>) - end, - - Db = init_db(DbName, Fd, Header), - - UpdatePid = spawn_link(couch_db, start_update_loop, [Db]), - - {ok, Db#db{update_pid=UpdatePid}}. +init(InitArgs) -> + spawn_link(couch_db, start_update_loop, [self(), InitArgs]), + receive + {initialized, Db} -> + {ok, Db} + end. btree_by_seq_split(DocInfo) -> #doc_info{ @@ -383,7 +401,7 @@ btree_by_name_join(Id, {Seq, Tree}) -> #full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}. -init_db(DbName, Fd, Header) -> +init_db(DbName, Filepath, Fd, Header) -> {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd), ok = couch_stream:set_min_buffer(SummaryStream, 10000), {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd, @@ -395,7 +413,7 @@ init_db(DbName, Fd, Header) -> {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd), #db{ - main_pid=self(), + update_pid=self(), fd=Fd, header=Header, summary_stream = SummaryStream, @@ -405,12 +423,16 @@ init_db(DbName, Fd, Header) -> update_seq = Header#db_header.update_seq, doc_count = Header#db_header.doc_count, doc_del_count = Header#db_header.doc_del_count, - name = DbName + name = DbName, + filepath=Filepath }. +close_db(#db{fd=Fd,summary_stream=Ss}) -> + couch_file:close(Fd), + couch_stream:close(Ss). + terminate(_Reason, Db) -> - exit(Db#db.update_pid, kill), - couch_file:close(Db#db.fd). + exit(Db#db.update_pid, kill). handle_call({update_docs, DocActions, Options}, From, #db{update_pid=Updater}=Db) -> Updater ! {From, update_docs, DocActions, Options}, @@ -435,17 +457,34 @@ handle_info(Msg, Db) -> %%% Internal function %%% -start_update_loop(Db) -> - update_loop(Db#db{update_pid=self()}). +start_update_loop(MainPid, {DbName, Filepath, Fd, Options}) -> + link(Fd), + + case lists:member(create, Options) of + true -> + % create a new header and writes it to the file + Header = #db_header{}, + ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header), + % delete any old compaction files that might be hanging around + file:delete(Filepath ++ ".compact"), + file:delete(Filepath ++ ".old"); + false -> + {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>) + end, + + Db = init_db(DbName, Filepath, Fd, Header), + Db2 = Db#db{main_pid=MainPid}, + MainPid ! {initialized, Db2}, + update_loop(Db2). -update_loop(Db) -> +update_loop(#db{name=Name,filepath=Filepath, main_pid=MainPid}=Db) -> receive {OrigFrom, update_docs, DocActions, Options} -> case (catch update_docs_int(Db, DocActions, Options)) of {ok, Db2} -> - ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), + ok = gen_server:call(MainPid, {db_updated, Db2}), gen_server:reply(OrigFrom, ok), - couch_db_update_notifier:notify({updated, Db2#db.name}), + couch_db_update_notifier:notify({updated, Name}), update_loop(Db2); conflict -> gen_server:reply(OrigFrom, conflict), @@ -456,22 +495,40 @@ update_loop(Db) -> compact -> case Db#db.compactor_pid of nil -> - Pid = spawn_link(couch_db, start_copy_compact_int, [Db]), + couch_log:info("Starting compaction for db \"~s\"", [Name]), + Pid = spawn_link(couch_db, start_copy_compact_int, [Db, true]), Db2 = Db#db{compactor_pid=Pid}, - ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), + ok = gen_server:call(MainPid, {db_updated, Db2}), update_loop(Db2); _ -> update_loop(Db) % already started end; - {compact_done, #db{update_seq=CompactSeq}=NewDb} -> - case CompactSeq == Db#db.update_seq of + {compact_done, CompactFilepath} -> + {ok, NewFd} = couch_file:open(CompactFilepath), + {ok, NewHeader} = couch_file:read_header(NewFd, <<$g, $m, $k, 0>>), + #db{update_seq=NewSeq}= NewDb = + init_db(Name, CompactFilepath, NewFd, NewHeader), + case Db#db.update_seq == NewSeq of true -> - NewDb2 = swap_files(Db, NewDb), + couch_log:debug("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]), + ok = file:rename(Filepath, Filepath ++ ".old"), + ok = file:rename(CompactFilepath, Filepath), + + NewDb2 = NewDb#db{ + main_pid = Db#db.main_pid, + doc_count = Db#db.doc_count, + doc_del_count = Db#db.doc_del_count, + filepath = Filepath}, + close_db(Db), + ok = gen_server:call(MainPid, {db_updated, NewDb2}), + couch_log:info("Compaction for db ~p completed.", [Name]), update_loop(NewDb2#db{compactor_pid=nil}); false -> - Pid = spawn_link(couch_db, continue_copy_compact_int, [Db, NewDb]), + couch_log:info("Compaction file still behind main file " + "(update seq=~p. compact update seq=~p). Retrying.", + [Db#db.update_seq, NewSeq]), + Pid = spawn_link(couch_db, start_copy_compact_int, [Db, false]), Db2 = Db#db{compactor_pid=Pid}, - ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), update_loop(Db2) end; Else -> @@ -479,14 +536,6 @@ update_loop(Db) -> exit({error, Else}) end. -swap_files(#db{fd=OldFd, name=Name}=_DbOld, DbNew) -> - NormalFilename = couch_server:get_filename(Name), - true = file:rename(NormalFilename, NormalFilename ++ ".old"), - true = file:rename(NormalFilename ++ ".compact", NormalFilename), - couch_file:close(OldFd), - file:delete(NormalFilename ++ ".old"), - DbNew. - get_db(MainPid) -> {ok, Db} = gen_server:call(MainPid, get_db), Db. @@ -773,7 +822,6 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> commit_data(#db{fd=Fd, header=Header} = Db) -> - ok = couch_file:sync(Fd), % commit outstanding data Header2 = Header#db_header{ update_seq = Db#db.update_seq, summary_stream_state = couch_stream:get_state(Db#db.summary_stream), @@ -787,7 +835,6 @@ commit_data(#db{fd=Fd, header=Header} = Db) -> Db; % unchanged. nothing to do true -> ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header2), - ok = couch_file:sync(Fd), % commit header to disk Db#db{header = Header2} end. @@ -795,21 +842,22 @@ copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) -> {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp), % copy the bin values NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) -> - {ok, NewBinSp} = couch_stream:copy_stream(SrcFd, BinSp, Len, DestFd), + {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd), {Name, {Type, NewBinSp, Len}} end, BinInfos), % now write the document summary - {ok, _SummaryPointer} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}). + {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}), + Sp. copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) -> []; -copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTrees]) -> +copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTree]) -> % This is a leaf node, copy it over NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream), - [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTrees)]; -copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTrees} | RestTrees]) -> + [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)]; +copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) -> % inner node, only copy info/data from leaf nodes - [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTrees)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTrees)]. + [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)]. copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq) -> Ids = [Id || #doc_info{id=Id} <- InfoBySeq], @@ -849,34 +897,33 @@ copy_compact_docs(Db, NewDb) -> NewDb2 end. -start_copy_compact_int(#db{name=Name}=Db) -> - couch_log:debug("New compaction process spawned for db \"%s\"", [Name]), - Filename = couch_server:get_compaction_filename(Name), - case couch_file:open(Filename) of +start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) -> + CompactFile = Filepath ++ ".compact", + couch_log:debug("Compaction process spawned for db \"~s\"", [Name]), + case couch_file:open(CompactFile) of {ok, Fd} -> - couch_log:debug("Found existing compaction file for db \"%s\"", [Name]), + couch_log:debug("Found existing compaction file for db \"~s\"", [Name]), {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>); {error, enoent} -> % - {ok, Fd} = couch_file:open(Filename, [create]), + {ok, Fd} = couch_file:open(CompactFile, [create]), Header = #db_header{}, - ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header), - ok = couch_file:sync(Fd) + ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header) end, - NewDb = init_db(Name, Fd, Header), + NewDb = init_db(Name, CompactFile, Fd, Header), NewDb2 = copy_compact_docs(Db, NewDb), - - % suck up all the local docs into memory and write them to the new db - {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree, - fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []), - {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs), - NewDb3 = commit_data(NewDb2#db{local_docs_btree=NewLocalBtree}), - - NewDb3#db.update_pid ! {compact_done, NewDb3}. - -continue_copy_compact_int(#db{name=Name}=Db, NewDb) -> - couch_log:debug("Continued compaction process spawned for db \"%s\"", [Name]), - NewDb2 = copy_compact_docs(Db, NewDb), - NewDb2#db.update_pid ! {compact_done, NewDb2}. + NewDb3 = + case CopyLocal of + true -> + % suck up all the local docs into memory and write them to the new db + {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree, + fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []), + {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs), + commit_data(NewDb2#db{local_docs_btree=NewLocalBtree}); + _ -> + NewDb2 + end, + close_db(NewDb3), + Db#db.update_pid ! {compact_done, CompactFile}.
\ No newline at end of file diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl index 6cbad44a..3a1a7af1 100644 --- a/src/couchdb/couch_file.erl +++ b/src/couchdb/couch_file.erl @@ -142,8 +142,7 @@ close(Fd) -> write_header(Fd, Prefix, Data) -> - % The leading bytes in every db file, the sig and the file version: - %the actual header data + ok = sync(Fd), TermBin = term_to_binary(Data), % the size of all the bytes written to the header, including the md5 signature (16 bytes) FilledSize = size(Prefix) + size(TermBin) + 16, @@ -159,7 +158,8 @@ write_header(Fd, Prefix, Data) -> WriteBin = <<Prefix/binary, TermBin/binary, PadZeros/binary, Sig/binary>>, ?HEADER_SIZE = size(WriteBin), % sanity check DblWriteBin = [WriteBin, WriteBin], - ok = pwrite(Fd, 0, DblWriteBin) + ok = pwrite(Fd, 0, DblWriteBin), + ok = sync(Fd) end. diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl index ae1b4f2c..8d5260f1 100644 --- a/src/couchdb/couch_stream.erl +++ b/src/couchdb/couch_stream.erl @@ -15,7 +15,7 @@ -export([test/1]). -export([open/1, open/2, close/1, read/3, read_term/2, write/2, write_term/2, get_state/1, foldl/5]). --export([copy/4]). +-export([copy/4, copy_to_new_stream/4]). -export([ensure_buffer/2, set_min_buffer/2]). -export([init/1, terminate/2, handle_call/3]). -export([handle_cast/2,code_change/3,handle_info/2]). @@ -78,10 +78,16 @@ read(Fd, Sp, Num) -> Bin = list_to_binary(lists:reverse(RevBin)), {ok, Bin, Sp2}. -copy(#stream{pid = _Pid, fd = Fd}, Sp, Num, DestStream) -> - copy(Fd, Sp, Num, DestStream); -copy(Fd, Sp, Num, DestStream) -> - {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK, +copy_to_new_stream(Src, Sp, Len, DestFd) -> + Dest = open(DestFd), + {ok, NewSp} = copy(Src, Sp, Len, Dest), + close(Dest), + {ok, NewSp}. + +copy(#stream{pid = _Pid, fd = Fd}, Sp, Len, DestStream) -> + copy(Fd, Sp, Len, DestStream); +copy(Fd, Sp, Len, DestStream) -> + {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Len, ?HUGE_CHUNK, fun(Bin, AccPointer) -> {ok, NewPointer} = write(DestStream, Bin), if AccPointer == null -> NewPointer; true -> AccPointer end diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl index 97228530..b9f6507f 100644 --- a/src/couchdb/couch_view.erl +++ b/src/couchdb/couch_view.erl @@ -179,9 +179,12 @@ handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) -> file:delete(Root ++ "/." ++ DbName ++ "_temp"), {noreply, Server}. +handle_info({'EXIT', _FromPid, normal}, Server) -> + {noreply, Server}; handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) -> case ets:lookup(couch_views_by_updater, FromPid) of [] -> % non-updater linked process must have died, we propagate the error + couch_log:error("Exit on non-updater process: ~p", [Reason]), exit(Reason); [{_, {DbName, "_temp_" ++ _ = GroupId}}] -> delete_from_ets(FromPid, DbName, GroupId), diff --git a/src/couchdb/mod_couch.erl b/src/couchdb/mod_couch.erl index 78c0853a..8373dbe9 100644 --- a/src/couchdb/mod_couch.erl +++ b/src/couchdb/mod_couch.erl @@ -500,14 +500,7 @@ handle_replication_request(#mod{entity_body=RawJson}=Mod) -> send_database_info(Mod, #uri_parts{db=DbName}=Parts) -> Db = open_db(Parts), {ok, InfoList} = couch_db:get_db_info(Db), - ok = send_header(Mod, 200, resp_json_header(Mod)), - DocCount = proplists:get_value(doc_count, InfoList), - LastUpdateSequence = proplists:get_value(last_update_seq, InfoList), - ok = send_chunk(Mod, "{\"db_name\": \"" ++ DbName ++ - "\", \"doc_count\":" ++ integer_to_list(DocCount) ++ - ", \"update_seq\":" ++ integer_to_list(LastUpdateSequence)++"}"), - ok = send_final_chunk(Mod), - {ok, 200}. + send_json(Mod, 200, {obj, [{db_name, DbName} | InfoList]}). send_doc(#mod{parsed_header=Headers}=Mod, #uri_parts{doc=DocId,querystr=QueryStr}=Parts) -> |