summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2008-04-04 03:10:34 +0000
committerDamien F. Katz <damien@apache.org>2008-04-04 03:10:34 +0000
commitafaa5d561826ccf7cab4fde2af9ad39d32ea4d0d (patch)
treee94b0bca1ee069f95f34c66fbcd21c2a03061ba5 /src
parent88627dd6d98acd1a6700994037f4da1362dbcb3e (diff)
compaction code, not hooked up to webserver yet
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@644593 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_db.erl219
-rw-r--r--src/couchdb/couch_file.erl6
-rw-r--r--src/couchdb/couch_stream.erl16
-rw-r--r--src/couchdb/couch_view.erl3
-rw-r--r--src/couchdb/mod_couch.erl9
5 files changed, 151 insertions, 102 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 51d55822..6f8a4ac6 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -17,9 +17,9 @@
-export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]).
-export([delete_doc/3,open_doc/2,open_doc/3,enum_docs_since/4,enum_docs_since/5]).
-export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]).
--export([start_update_loop/1]).
+-export([start_update_loop/2]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
--export([start_copy_compact_int/1,continue_copy_compact_int/2]).
+-export([start_copy_compact_int/2]).
-include("couch_db.hrl").
@@ -47,24 +47,52 @@
update_seq,
doc_count,
doc_del_count,
- name
+ name,
+ filepath
}).
% small value used in revision trees to indicate the revision isn't stored
-define(REV_MISSING, []).
start_link(DbName, Filepath, Options) ->
+ catch start_link0(DbName, Filepath, Options).
+
+start_link0(DbName, Filepath, Options) ->
+ % first delete the old file previous compaction
+ Fd =
case couch_file:open(Filepath, Options) of
- {ok, Fd} ->
- Result = gen_server:start_link(couch_db, {DbName, Fd, Options}, []),
- unlink(Fd),
- Result;
+ {ok, Fd0} ->
+ Fd0;
{error, enoent} ->
- % couldn't find file
- {error, not_found};
+ % couldn't find file. is there a compact version? This can happen if
+ % crashed during the file switch.
+ case couch_file:open(Filepath ++ ".compact") of
+ {ok, Fd0} ->
+ couch_log:info("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
+ ok = file:rename(Filepath ++ ".compact", Filepath),
+ Fd0;
+ {error, enoent} ->
+ throw({error, notfound})
+ end;
Else ->
- Else
- end.
+ throw(Else)
+ end,
+
+ StartResult = gen_server:start_link(couch_db, {DbName, Filepath, Fd, Options}, []),
+ unlink(Fd),
+ case StartResult of
+ {ok, _} ->
+ % We successfully opened the db, delete old storage files if around
+ case file:delete(Filepath ++ ".old") of
+ ok ->
+ couch_log:info("Deleted old storage file ~s~s", [Filepath, ".old"]);
+ {error, enoent} ->
+ ok % normal result
+ end;
+ _ ->
+ ok
+ end,
+ StartResult.
%%% Interface functions %%%
@@ -146,12 +174,13 @@ get_db_info(Db) ->
doc_count=Count,
doc_del_count=DelCount,
update_seq=SeqNum} = Db,
+ {ok, Size} = couch_file:bytes(Fd),
InfoList = [
{doc_count, Count},
{doc_del_count, DelCount},
- {last_update_seq, SeqNum},
- {compacting, Compactor==nil},
- {size, couch_file:bytes(Fd)}
+ {update_seq, SeqNum},
+ {compacting, Compactor/=nil},
+ {size, Size}
],
{ok, InfoList}.
@@ -337,23 +366,12 @@ enum_docs(MainPid, StartId, InFun, Ctx) ->
% server functions
-init({DbName, Fd, Options}) ->
- link(Fd),
- case lists:member(create, Options) of
- true ->
- % create a new header and writes it to the file
- Header = #db_header{},
- ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header),
- ok = couch_file:sync(Fd);
- false ->
- {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>)
- end,
-
- Db = init_db(DbName, Fd, Header),
-
- UpdatePid = spawn_link(couch_db, start_update_loop, [Db]),
-
- {ok, Db#db{update_pid=UpdatePid}}.
+init(InitArgs) ->
+ spawn_link(couch_db, start_update_loop, [self(), InitArgs]),
+ receive
+ {initialized, Db} ->
+ {ok, Db}
+ end.
btree_by_seq_split(DocInfo) ->
#doc_info{
@@ -383,7 +401,7 @@ btree_by_name_join(Id, {Seq, Tree}) ->
#full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}.
-init_db(DbName, Fd, Header) ->
+init_db(DbName, Filepath, Fd, Header) ->
{ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
ok = couch_stream:set_min_buffer(SummaryStream, 10000),
{ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
@@ -395,7 +413,7 @@ init_db(DbName, Fd, Header) ->
{ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
#db{
- main_pid=self(),
+ update_pid=self(),
fd=Fd,
header=Header,
summary_stream = SummaryStream,
@@ -405,12 +423,16 @@ init_db(DbName, Fd, Header) ->
update_seq = Header#db_header.update_seq,
doc_count = Header#db_header.doc_count,
doc_del_count = Header#db_header.doc_del_count,
- name = DbName
+ name = DbName,
+ filepath=Filepath
}.
+close_db(#db{fd=Fd,summary_stream=Ss}) ->
+ couch_file:close(Fd),
+ couch_stream:close(Ss).
+
terminate(_Reason, Db) ->
- exit(Db#db.update_pid, kill),
- couch_file:close(Db#db.fd).
+ exit(Db#db.update_pid, kill).
handle_call({update_docs, DocActions, Options}, From, #db{update_pid=Updater}=Db) ->
Updater ! {From, update_docs, DocActions, Options},
@@ -435,17 +457,34 @@ handle_info(Msg, Db) ->
%%% Internal function %%%
-start_update_loop(Db) ->
- update_loop(Db#db{update_pid=self()}).
+start_update_loop(MainPid, {DbName, Filepath, Fd, Options}) ->
+ link(Fd),
+
+ case lists:member(create, Options) of
+ true ->
+ % create a new header and writes it to the file
+ Header = #db_header{},
+ ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header),
+ % delete any old compaction files that might be hanging around
+ file:delete(Filepath ++ ".compact"),
+ file:delete(Filepath ++ ".old");
+ false ->
+ {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>)
+ end,
+
+ Db = init_db(DbName, Filepath, Fd, Header),
+ Db2 = Db#db{main_pid=MainPid},
+ MainPid ! {initialized, Db2},
+ update_loop(Db2).
-update_loop(Db) ->
+update_loop(#db{name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
receive
{OrigFrom, update_docs, DocActions, Options} ->
case (catch update_docs_int(Db, DocActions, Options)) of
{ok, Db2} ->
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ ok = gen_server:call(MainPid, {db_updated, Db2}),
gen_server:reply(OrigFrom, ok),
- couch_db_update_notifier:notify({updated, Db2#db.name}),
+ couch_db_update_notifier:notify({updated, Name}),
update_loop(Db2);
conflict ->
gen_server:reply(OrigFrom, conflict),
@@ -456,22 +495,40 @@ update_loop(Db) ->
compact ->
case Db#db.compactor_pid of
nil ->
- Pid = spawn_link(couch_db, start_copy_compact_int, [Db]),
+ couch_log:info("Starting compaction for db \"~s\"", [Name]),
+ Pid = spawn_link(couch_db, start_copy_compact_int, [Db, true]),
Db2 = Db#db{compactor_pid=Pid},
- ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+ ok = gen_server:call(MainPid, {db_updated, Db2}),
update_loop(Db2);
_ ->
update_loop(Db) % already started
end;
- {compact_done, #db{update_seq=CompactSeq}=NewDb} ->
- case CompactSeq == Db#db.update_seq of
+ {compact_done, CompactFilepath} ->
+ {ok, NewFd} = couch_file:open(CompactFilepath),
+ {ok, NewHeader} = couch_file:read_header(NewFd, <<$g, $m, $k, 0>>),
+ #db{update_seq=NewSeq}= NewDb =
+ init_db(Name, CompactFilepath, NewFd, NewHeader),
+ case Db#db.update_seq == NewSeq of
true ->
- NewDb2 = swap_files(Db, NewDb),
+ couch_log:debug("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]),
+ ok = file:rename(Filepath, Filepath ++ ".old"),
+ ok = file:rename(CompactFilepath, Filepath),
+
+ NewDb2 = NewDb#db{
+ main_pid = Db#db.main_pid,
+ doc_count = Db#db.doc_count,
+ doc_del_count = Db#db.doc_del_count,
+ filepath = Filepath},
+ close_db(Db),
+ ok = gen_server:call(MainPid, {db_updated, NewDb2}),
+ couch_log:info("Compaction for db ~p completed.", [Name]),
update_loop(NewDb2#db{compactor_pid=nil});
false ->
- Pid = spawn_link(couch_db, continue_copy_compact_int, [Db, NewDb]),
+ couch_log:info("Compaction file still behind main file "
+ "(update seq=~p. compact update seq=~p). Retrying.",
+ [Db#db.update_seq, NewSeq]),
+ Pid = spawn_link(couch_db, start_copy_compact_int, [Db, false]),
Db2 = Db#db{compactor_pid=Pid},
- ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
update_loop(Db2)
end;
Else ->
@@ -479,14 +536,6 @@ update_loop(Db) ->
exit({error, Else})
end.
-swap_files(#db{fd=OldFd, name=Name}=_DbOld, DbNew) ->
- NormalFilename = couch_server:get_filename(Name),
- true = file:rename(NormalFilename, NormalFilename ++ ".old"),
- true = file:rename(NormalFilename ++ ".compact", NormalFilename),
- couch_file:close(OldFd),
- file:delete(NormalFilename ++ ".old"),
- DbNew.
-
get_db(MainPid) ->
{ok, Db} = gen_server:call(MainPid, get_db),
Db.
@@ -773,7 +822,6 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
commit_data(#db{fd=Fd, header=Header} = Db) ->
- ok = couch_file:sync(Fd), % commit outstanding data
Header2 = Header#db_header{
update_seq = Db#db.update_seq,
summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
@@ -787,7 +835,6 @@ commit_data(#db{fd=Fd, header=Header} = Db) ->
Db; % unchanged. nothing to do
true ->
ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header2),
- ok = couch_file:sync(Fd), % commit header to disk
Db#db{header = Header2}
end.
@@ -795,21 +842,22 @@ copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
{ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
% copy the bin values
NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
- {ok, NewBinSp} = couch_stream:copy_stream(SrcFd, BinSp, Len, DestFd),
+ {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
{Name, {Type, NewBinSp, Len}}
end, BinInfos),
% now write the document summary
- {ok, _SummaryPointer} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}).
+ {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}),
+ Sp.
copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
[];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTrees]) ->
+copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTree]) ->
% This is a leaf node, copy it over
NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
- [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTrees)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTrees} | RestTrees]) ->
+ [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
+copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
% inner node, only copy info/data from leaf nodes
- [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTrees)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTrees)].
+ [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)].
copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq) ->
Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
@@ -849,34 +897,33 @@ copy_compact_docs(Db, NewDb) ->
NewDb2
end.
-start_copy_compact_int(#db{name=Name}=Db) ->
- couch_log:debug("New compaction process spawned for db \"%s\"", [Name]),
- Filename = couch_server:get_compaction_filename(Name),
- case couch_file:open(Filename) of
+start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) ->
+ CompactFile = Filepath ++ ".compact",
+ couch_log:debug("Compaction process spawned for db \"~s\"", [Name]),
+ case couch_file:open(CompactFile) of
{ok, Fd} ->
- couch_log:debug("Found existing compaction file for db \"%s\"", [Name]),
+ couch_log:debug("Found existing compaction file for db \"~s\"", [Name]),
{ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>);
{error, enoent} -> %
- {ok, Fd} = couch_file:open(Filename, [create]),
+ {ok, Fd} = couch_file:open(CompactFile, [create]),
Header = #db_header{},
- ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header),
- ok = couch_file:sync(Fd)
+ ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header)
end,
- NewDb = init_db(Name, Fd, Header),
+ NewDb = init_db(Name, CompactFile, Fd, Header),
NewDb2 = copy_compact_docs(Db, NewDb),
-
- % suck up all the local docs into memory and write them to the new db
- {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
- fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
- {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs),
- NewDb3 = commit_data(NewDb2#db{local_docs_btree=NewLocalBtree}),
-
- NewDb3#db.update_pid ! {compact_done, NewDb3}.
-
-continue_copy_compact_int(#db{name=Name}=Db, NewDb) ->
- couch_log:debug("Continued compaction process spawned for db \"%s\"", [Name]),
- NewDb2 = copy_compact_docs(Db, NewDb),
- NewDb2#db.update_pid ! {compact_done, NewDb2}.
+ NewDb3 =
+ case CopyLocal of
+ true ->
+ % suck up all the local docs into memory and write them to the new db
+ {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
+ fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
+ {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs),
+ commit_data(NewDb2#db{local_docs_btree=NewLocalBtree});
+ _ ->
+ NewDb2
+ end,
+ close_db(NewDb3),
+ Db#db.update_pid ! {compact_done, CompactFile}.
\ No newline at end of file
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index 6cbad44a..3a1a7af1 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -142,8 +142,7 @@ close(Fd) ->
write_header(Fd, Prefix, Data) ->
- % The leading bytes in every db file, the sig and the file version:
- %the actual header data
+ ok = sync(Fd),
TermBin = term_to_binary(Data),
% the size of all the bytes written to the header, including the md5 signature (16 bytes)
FilledSize = size(Prefix) + size(TermBin) + 16,
@@ -159,7 +158,8 @@ write_header(Fd, Prefix, Data) ->
WriteBin = <<Prefix/binary, TermBin/binary, PadZeros/binary, Sig/binary>>,
?HEADER_SIZE = size(WriteBin), % sanity check
DblWriteBin = [WriteBin, WriteBin],
- ok = pwrite(Fd, 0, DblWriteBin)
+ ok = pwrite(Fd, 0, DblWriteBin),
+ ok = sync(Fd)
end.
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
index ae1b4f2c..8d5260f1 100644
--- a/src/couchdb/couch_stream.erl
+++ b/src/couchdb/couch_stream.erl
@@ -15,7 +15,7 @@
-export([test/1]).
-export([open/1, open/2, close/1, read/3, read_term/2, write/2, write_term/2, get_state/1, foldl/5]).
--export([copy/4]).
+-export([copy/4, copy_to_new_stream/4]).
-export([ensure_buffer/2, set_min_buffer/2]).
-export([init/1, terminate/2, handle_call/3]).
-export([handle_cast/2,code_change/3,handle_info/2]).
@@ -78,10 +78,16 @@ read(Fd, Sp, Num) ->
Bin = list_to_binary(lists:reverse(RevBin)),
{ok, Bin, Sp2}.
-copy(#stream{pid = _Pid, fd = Fd}, Sp, Num, DestStream) ->
- copy(Fd, Sp, Num, DestStream);
-copy(Fd, Sp, Num, DestStream) ->
- {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK,
+copy_to_new_stream(Src, Sp, Len, DestFd) ->
+ Dest = open(DestFd),
+ {ok, NewSp} = copy(Src, Sp, Len, Dest),
+ close(Dest),
+ {ok, NewSp}.
+
+copy(#stream{pid = _Pid, fd = Fd}, Sp, Len, DestStream) ->
+ copy(Fd, Sp, Len, DestStream);
+copy(Fd, Sp, Len, DestStream) ->
+ {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Len, ?HUGE_CHUNK,
fun(Bin, AccPointer) ->
{ok, NewPointer} = write(DestStream, Bin),
if AccPointer == null -> NewPointer; true -> AccPointer end
diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl
index 97228530..b9f6507f 100644
--- a/src/couchdb/couch_view.erl
+++ b/src/couchdb/couch_view.erl
@@ -179,9 +179,12 @@ handle_cast({reset_indexes, DbName}, #server{root_dir=Root}=Server) ->
file:delete(Root ++ "/." ++ DbName ++ "_temp"),
{noreply, Server}.
+handle_info({'EXIT', _FromPid, normal}, Server) ->
+ {noreply, Server};
handle_info({'EXIT', FromPid, Reason}, #server{root_dir=RootDir}=Server) ->
case ets:lookup(couch_views_by_updater, FromPid) of
[] -> % non-updater linked process must have died, we propagate the error
+ couch_log:error("Exit on non-updater process: ~p", [Reason]),
exit(Reason);
[{_, {DbName, "_temp_" ++ _ = GroupId}}] ->
delete_from_ets(FromPid, DbName, GroupId),
diff --git a/src/couchdb/mod_couch.erl b/src/couchdb/mod_couch.erl
index 78c0853a..8373dbe9 100644
--- a/src/couchdb/mod_couch.erl
+++ b/src/couchdb/mod_couch.erl
@@ -500,14 +500,7 @@ handle_replication_request(#mod{entity_body=RawJson}=Mod) ->
send_database_info(Mod, #uri_parts{db=DbName}=Parts) ->
Db = open_db(Parts),
{ok, InfoList} = couch_db:get_db_info(Db),
- ok = send_header(Mod, 200, resp_json_header(Mod)),
- DocCount = proplists:get_value(doc_count, InfoList),
- LastUpdateSequence = proplists:get_value(last_update_seq, InfoList),
- ok = send_chunk(Mod, "{\"db_name\": \"" ++ DbName ++
- "\", \"doc_count\":" ++ integer_to_list(DocCount) ++
- ", \"update_seq\":" ++ integer_to_list(LastUpdateSequence)++"}"),
- ok = send_final_chunk(Mod),
- {ok, 200}.
+ send_json(Mod, 200, {obj, [{db_name, DbName} | InfoList]}).
send_doc(#mod{parsed_header=Headers}=Mod,
#uri_parts{doc=DocId,querystr=QueryStr}=Parts) ->