summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_db.erl
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2008-08-05 01:43:40 +0000
committerDamien F. Katz <damien@apache.org>2008-08-05 01:43:40 +0000
commit88ec14c220592c8c0db7869c9961423e9ee97e7c (patch)
tree67974f234e4a0201302506e3b7c56a73cf909376 /src/couchdb/couch_db.erl
parentb218a0e7d425f7b3660433a17c6558f676524730 (diff)
Added concurrent open db limit and a LRU cache for closing old databases when limit reached (configurable via MaxDbsOpen var in couch.ini). Refactored db update code in couch_db.erl into couch_db_updater.erl.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@682560 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_db.erl')
-rw-r--r--src/couchdb/couch_db.erl681
1 files changed, 74 insertions, 607 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 2f5df3b5..098f3de7 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -13,63 +13,24 @@
-module(couch_db).
-behaviour(gen_server).
--export([open/2,create/2,create/3,get_doc_info/2,start_compact/1]).
--export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]).
--export([delete_doc/3,open_doc/2,open_doc/3,enum_docs_since/4,enum_docs_since/5]).
--export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]).
+-export([open/2,close/1,create/2,start_compact/1,get_db_info/1]).
+-export([open_ref_counted/2,num_refs/1,monitor/1]).
+-export([save_docs/3,update_doc/3,update_docs/2,update_docs/3,delete_doc/3]).
+-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
+-export([get_missing_revs/2]).
+-export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]).
-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
-export([increment_update_seq/1]).
--export([start_update_loop/2]).
+-export([start_link/3]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
--export([start_copy_compact_int/2]).
--export([btree_by_id_split/1,
- btree_by_id_join/2,
- btree_by_id_reduce/2,
- btree_by_seq_split/1,
- btree_by_seq_join/2,
- btree_by_seq_reduce/2]).
-include("couch_db.hrl").
--record(db_header,
- {write_version = 0,
- update_seq = 0,
- summary_stream_state = nil,
- fulldocinfo_by_id_btree_state = nil,
- docinfo_by_seq_btree_state = nil,
- local_docs_btree_state = nil,
- doc_count=0,
- doc_del_count=0
- }).
-
--record(db,
- {main_pid=nil,
- update_pid=nil,
- compactor_pid=nil,
- fd,
- header = #db_header{},
- summary_stream,
- fulldocinfo_by_id_btree,
- docinfo_by_seq_btree,
- local_docs_btree,
- update_seq,
- doc_count,
- doc_del_count,
- name,
- filepath
- }).
-
-% small value used in revision trees to indicate the revision isn't stored
--define(REV_MISSING, []).
-
--define(HEADER_SIG, <<$g, $m, $k, 0>>).
-
start_link(DbName, Filepath, Options) ->
catch start_link0(DbName, Filepath, Options).
start_link0(DbName, Filepath, Options) ->
- % first delete the old file previous compaction
Fd =
case couch_file:open(Filepath, Options) of
{ok, Fd0} ->
@@ -105,33 +66,38 @@ start_link0(DbName, Filepath, Options) ->
end,
StartResult.
-%%% Interface functions %%%
-create(Filepath, Options) ->
- create(Filepath, Filepath, Options).
+create(DbName, Options) ->
+ couch_server:create(DbName, Options).
-create(DbName, Filepath, Options) when is_list(Options) ->
- start_link(DbName, Filepath, [create | Options]).
+open(DbName, Options) ->
+ couch_server:open(DbName, Options).
-open(DbName, Filepath) ->
- start_link(DbName, Filepath, []).
+close(#db{fd=Fd}) ->
+ couch_file:drop_ref(Fd).
+open_ref_counted(MainPid, OpeningPid) ->
+ gen_server:call(MainPid, {open_ref_counted_instance, OpeningPid}).
-% Compaction still needs work. Right now readers and writers can get an error
-% file compaction changeover. This doesn't need to be the case.
-start_compact(MainPid) ->
- gen_server:cast(MainPid, start_compact).
+num_refs(MainPid) ->
+ gen_server:call(MainPid, num_refs).
-delete_doc(MainPid, Id, Revisions) ->
+monitor(#db{main_pid=MainPid}) ->
+ erlang:monitor(process, MainPid).
+
+start_compact(#db{update_pid=Pid}) ->
+ gen_server:cast(Pid, start_compact).
+
+delete_doc(Db, Id, Revisions) ->
DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
- {ok, [Result]} = update_docs(MainPid, DeletedDocs, []),
+ {ok, [Result]} = update_docs(Db, DeletedDocs, []),
{ok, Result}.
-open_doc(MainPid, IdOrDocInfo) ->
- open_doc(MainPid, IdOrDocInfo, []).
+open_doc(Db, IdOrDocInfo) ->
+ open_doc(Db, IdOrDocInfo, []).
-open_doc(MainPid, Id, Options) ->
- case open_doc_int(get_db(MainPid), Id, Options) of
+open_doc(Db, Id, Options) ->
+ case open_doc_int(Db, Id, Options) of
{ok, #doc{deleted=true}=Doc} ->
case lists:member(deleted, Options) of
true ->
@@ -143,13 +109,13 @@ open_doc(MainPid, Id, Options) ->
Else
end.
-open_doc_revs(MainPid, Id, Revs, Options) ->
- [Result] = open_doc_revs_int(get_db(MainPid), [{Id, Revs}], Options),
+open_doc_revs(Db, Id, Revs, Options) ->
+ [Result] = open_doc_revs_int(Db, [{Id, Revs}], Options),
Result.
-get_missing_revs(MainPid, IdRevsList) ->
+get_missing_revs(Db, IdRevsList) ->
Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
- FullDocInfoResults = get_full_doc_infos(MainPid, Ids),
+ FullDocInfoResults = get_full_doc_infos(Db, Ids),
Results = lists:zipwith(
fun({Id, Revs}, FullDocInfoResult) ->
case FullDocInfoResult of
@@ -177,18 +143,12 @@ get_full_doc_info(Db, Id) ->
[Result] = get_full_doc_infos(Db, [Id]),
Result.
-
-get_full_doc_infos(MainPid, Ids) when is_pid(MainPid) ->
- get_full_doc_infos(get_db(MainPid), Ids);
-get_full_doc_infos(#db{}=Db, Ids) ->
+get_full_doc_infos(Db, Ids) ->
couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids).
-increment_update_seq(MainPid) ->
- gen_server:call(MainPid, increment_update_seq).
+increment_update_seq(#db{update_pid=UpdatePid}) ->
+ gen_server:call(UpdatePid, increment_update_seq).
-
-get_db_info(MainPid) when is_pid(MainPid) ->
- get_db_info(get_db(MainPid));
get_db_info(Db) ->
#db{fd=Fd,
compactor_pid=Compactor,
@@ -205,12 +165,12 @@ get_db_info(Db) ->
],
{ok, InfoList}.
-update_doc(MainPid, Doc, Options) ->
- {ok, [NewRev]} = update_docs(MainPid, [Doc], Options),
+update_doc(Db, Doc, Options) ->
+ {ok, [NewRev]} = update_docs(Db, [Doc], Options),
{ok, NewRev}.
-update_docs(MainPid, Docs) ->
- update_docs(MainPid, Docs, []).
+update_docs(Db, Docs) ->
+ update_docs(Db, Docs, []).
% group_alike_docs groups the sorted documents into sublist buckets, by id.
% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
@@ -263,7 +223,7 @@ prepare_doc_for_new_edit(Db, #doc{id=Id,revs=[NewRev|PrevRevs]}=Doc, OldFullDocI
end
end.
-update_docs(MainPid, Docs, Options) ->
+update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
% go ahead and generate the new revision ids for the documents.
Docs2 = lists:map(
fun(#doc{id=Id,revs=Revs}=Doc) ->
@@ -278,7 +238,6 @@ update_docs(MainPid, Docs, Options) ->
NewRevs = [NewRev || #doc{revs=[NewRev|_]} <- Docs2],
DocBuckets = group_alike_docs(Docs2),
Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
- Db = get_db(MainPid),
% lookup the doc by id and get the most recent
@@ -298,13 +257,14 @@ update_docs(MainPid, Docs, Options) ->
% flush unwritten binaries to disk.
DocBuckets3 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets2],
- case gen_server:call(MainPid, {update_docs, DocBuckets3, [new_edits | Options]}, infinity) of
+ case gen_server:call(UpdatePid, {update_docs, DocBuckets3, [new_edits | Options]}, infinity) of
ok -> {ok, NewRevs};
retry ->
- Db2 = get_db(MainPid),
+ Db2 = open_ref_counted(Db#db.main_pid, self()),
DocBuckets4 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets3],
% We only retry once
- case gen_server:call(MainPid, {update_docs, DocBuckets4, [new_edits | Options]}, infinity) of
+ ok = close(Db2),
+ case gen_server:call(UpdatePid, {update_docs, DocBuckets4, [new_edits | Options]}, infinity) of
ok -> {ok, NewRevs};
Else -> throw(Else)
end;
@@ -312,15 +272,11 @@ update_docs(MainPid, Docs, Options) ->
throw(Else)
end.
-save_docs(MainPid, Docs) ->
- save_docs(MainPid, Docs, []).
-
-save_docs(MainPid, Docs, Options) ->
+save_docs(#db{update_pid=UpdatePid, fd=Fd}, Docs, Options) ->
% flush unwritten binaries to disk.
- Db = get_db(MainPid),
DocBuckets = group_alike_docs(Docs),
- DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
- ok = gen_server:call(MainPid, {update_docs, DocBuckets2, Options}, infinity).
+ DocBuckets2 = [[doc_flush_binaries(Doc, Fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+ ok = gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity).
doc_flush_binaries(Doc, Fd) ->
@@ -379,125 +335,51 @@ doc_flush_binaries(Doc, Fd) ->
Doc#doc{attachments = NewBins}.
enum_docs_since_reduce_to_count(Reds) ->
- couch_btree:final_reduce(fun btree_by_seq_reduce/2, Reds).
+ couch_btree:final_reduce(fun couch_db_updater:btree_by_seq_reduce/2, Reds).
enum_docs_reduce_to_count(Reds) ->
- couch_btree:final_reduce(fun btree_by_id_reduce/2, Reds).
+ couch_btree:final_reduce(fun couch_db_updater:btree_by_id_reduce/2, Reds).
-enum_docs_since(MainPid, SinceSeq, Direction, InFun, Ctx) ->
- Db = get_db(MainPid),
+enum_docs_since(Db, SinceSeq, Direction, InFun, Ctx) ->
couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx).
-enum_docs_since(MainPid, SinceSeq, InFun, Acc) ->
- enum_docs_since(MainPid, SinceSeq, fwd, InFun, Acc).
+enum_docs_since(Db, SinceSeq, InFun, Acc) ->
+ enum_docs_since(Db, SinceSeq, fwd, InFun, Acc).
-enum_docs(MainPid, StartId, Direction, InFun, InAcc) ->
- Db = get_db(MainPid),
+enum_docs(Db, StartId, Direction, InFun, InAcc) ->
couch_btree:fold(Db#db.fulldocinfo_by_id_btree, StartId, Direction, InFun, InAcc).
-enum_docs(MainPid, StartId, InFun, Ctx) ->
- enum_docs(MainPid, StartId, fwd, InFun, Ctx).
+enum_docs(Db, StartId, InFun, Ctx) ->
+ enum_docs(Db, StartId, fwd, InFun, Ctx).
% server functions
-init(InitArgs) ->
- spawn_link(couch_db, start_update_loop, [self(), InitArgs]),
- receive
- {initialized, Db} ->
- {ok, Db}
- end.
-
-btree_by_seq_split(DocInfo) ->
- #doc_info{
- id = Id,
- rev = Rev,
- update_seq = Seq,
- summary_pointer = Sp,
- conflict_revs = Conflicts,
- deleted_conflict_revs = DelConflicts,
- deleted = Deleted} = DocInfo,
- {Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}}.
-
-btree_by_seq_join(Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}) ->
- #doc_info{
- id = Id,
- rev = Rev,
- update_seq = Seq,
- summary_pointer = Sp,
- conflict_revs = Conflicts,
- deleted_conflict_revs = DelConflicts,
- deleted = Deleted}.
-
-btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
- deleted=Deleted, rev_tree=Tree}) ->
- {Id, {Seq, case Deleted of true -> 1; false-> 0 end, Tree}}.
-
-btree_by_id_join(Id, {Seq, Deleted, Tree}) ->
- #full_doc_info{id=Id, update_seq=Seq, deleted=Deleted==1, rev_tree=Tree}.
-
-
-
-btree_by_id_reduce(reduce, FullDocInfos) ->
- % count the number of deleted documents
- length([1 || #full_doc_info{deleted=false} <- FullDocInfos]);
-btree_by_id_reduce(rereduce, Reds) ->
- lists:sum(Reds).
-
-btree_by_seq_reduce(reduce, DocInfos) ->
- % count the number of deleted documents
- length(DocInfos);
-btree_by_seq_reduce(rereduce, Reds) ->
- lists:sum(Reds).
-
-init_db(DbName, Filepath, Fd, Header) ->
- {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
- ok = couch_stream:set_min_buffer(SummaryStream, 10000),
- {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
- [{split, fun btree_by_id_split/1},
- {join, fun btree_by_id_join/2},
- {reduce, fun btree_by_id_reduce/2}]),
- {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
- [{split, fun btree_by_seq_split/1},
- {join, fun btree_by_seq_join/2},
- {reduce, fun btree_by_seq_reduce/2}]),
- {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
-
- #db{
- update_pid=self(),
- fd=Fd,
- header=Header,
- summary_stream = SummaryStream,
- fulldocinfo_by_id_btree = IdBtree,
- docinfo_by_seq_btree = SeqBtree,
- local_docs_btree = LocalDocsBtree,
- update_seq = Header#db_header.update_seq,
- doc_count = Header#db_header.doc_count,
- doc_del_count = Header#db_header.doc_del_count,
- name = DbName,
- filepath=Filepath }.
+init({DbName, Filepath, Fd, Options}) ->
+ {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []),
+ ok = couch_file:add_ref(Fd),
+ gen_server:call(UpdaterPid, get_db).
-close_db(#db{fd=Fd,summary_stream=Ss}) ->
- couch_file:close(Fd),
- couch_stream:close(Ss).
-
terminate(_Reason, Db) ->
exit(Db#db.update_pid, kill).
-handle_call({update_docs, DocActions, Options}, From, #db{update_pid=Updater}=Db) ->
- Updater ! {From, update_docs, DocActions, Options},
- {noreply, Db};
-handle_call(increment_update_seq, From, #db{update_pid=Updater}=Db) ->
- Updater ! {From, increment_update_seq},
- {noreply, Db};
-handle_call(get_db, _From, Db) ->
+handle_call({open_ref_counted_instance, OpenerPid}, _From, #db{fd=Fd}=Db) ->
+ ok = couch_file:add_ref(Fd, OpenerPid),
{reply, {ok, Db}, Db};
-handle_call({db_updated, NewDb}, _From, _OldDb) ->
+handle_call(num_refs, _From, #db{fd=Fd}=Db) ->
+ {reply, couch_file:num_refs(Fd) - 1, Db};
+handle_call({db_updated, #db{fd=NewFd}=NewDb}, _From, #db{fd=OldFd}) ->
+ case NewFd == OldFd of
+ true -> ok;
+ false ->
+ couch_file:add_ref(NewFd),
+ couch_file:drop_ref(OldFd)
+ end,
{reply, ok, NewDb}.
-handle_cast(start_compact, #db{update_pid=Updater}=Db) ->
- Updater ! compact,
- {noreply, Db}.
+handle_cast(Msg, Db) ->
+ ?LOG_ERROR("Bad cast message received for db ~s: ~p", [Db#db.name, Msg]),
+ exit({error, Msg}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -508,114 +390,6 @@ handle_info(Msg, Db) ->
%%% Internal function %%%
-
-start_update_loop(MainPid, {DbName, Filepath, Fd, Options}) ->
- link(Fd),
-
- case lists:member(create, Options) of
- true ->
- % create a new header and writes it to the file
- Header = #db_header{},
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header),
- % delete any old compaction files that might be hanging around
- file:delete(Filepath ++ ".compact"),
- file:delete(Filepath ++ ".old");
- false ->
- {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG)
- end,
-
- Db = init_db(DbName, Filepath, Fd, Header),
- Db2 = Db#db{main_pid=MainPid},
- MainPid ! {initialized, Db2},
- update_loop(Db2).
-
-update_loop(#db{fd=Fd,name=Name,
- filepath=Filepath,
- main_pid=MainPid,
- update_seq=UpdateSeq}=Db) ->
- receive
- {OrigFrom, update_docs, DocActions, Options} ->
- case (catch update_docs_int(Db, DocActions, Options)) of
- {ok, Db2} ->
- ok = gen_server:call(MainPid, {db_updated, Db2}),
- gen_server:reply(OrigFrom, ok),
- couch_db_update_notifier:notify({updated, Name}),
- update_loop(Db2);
- retry ->
- gen_server:reply(OrigFrom, retry),
- update_loop(Db);
- conflict ->
- gen_server:reply(OrigFrom, conflict),
- update_loop(Db);
- Error ->
- exit(Error) % we crashed
- end;
- compact ->
- case Db#db.compactor_pid of
- nil ->
- ?LOG_INFO("Starting compaction for db \"~s\"", [Name]),
- Pid = spawn_link(couch_db, start_copy_compact_int, [Db, true]),
- Db2 = Db#db{compactor_pid=Pid},
- ok = gen_server:call(MainPid, {db_updated, Db2}),
- update_loop(Db2);
- _ ->
- update_loop(Db) % already started
- end;
- {compact_done, CompactFilepath} ->
- {ok, NewFd} = couch_file:open(CompactFilepath),
- {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG),
- #db{update_seq=NewSeq}= NewDb =
- init_db(Name, CompactFilepath, NewFd, NewHeader),
- case Db#db.update_seq == NewSeq of
- true ->
- NewDb2 = commit_data(
- NewDb#db{
- main_pid = Db#db.main_pid,
- doc_count = Db#db.doc_count,
- doc_del_count = Db#db.doc_del_count,
- filepath = Filepath}),
-
- ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]),
- ok = file:rename(Filepath, Filepath ++ ".old"),
- ok = file:rename(CompactFilepath, Filepath),
-
- couch_stream:close(Db#db.summary_stream),
- % close file handle async.
- % wait 5 secs before closing, allowing readers to finish
- unlink(Fd),
- spawn_link(fun() ->
- receive after 5000 -> ok end,
- couch_file:close(Fd),
- file:delete(Filepath ++ ".old")
- end),
-
- ok = gen_server:call(MainPid, {db_updated, NewDb2}),
- ?LOG_INFO("Compaction for db ~p completed.", [Name]),
- update_loop(NewDb2#db{compactor_pid=nil});
- false ->
- ?LOG_INFO("Compaction file still behind main file "
- "(update seq=~p. compact update seq=~p). Retrying.",
- [Db#db.update_seq, NewSeq]),
- Pid = spawn_link(couch_db, start_copy_compact_int, [Db, false]),
- Db2 = Db#db{compactor_pid=Pid},
- couch_file:close(NewFd),
- update_loop(Db2)
- end;
- {OrigFrom, increment_update_seq} ->
- Db2 = commit_data(Db#db{update_seq=UpdateSeq+1}),
- ok = gen_server:call(MainPid, {db_updated, Db2}),
- gen_server:reply(OrigFrom, {ok, UpdateSeq+1}),
- couch_db_update_notifier:notify({updated, Name}),
- update_loop(Db2);
- Else ->
- ?LOG_ERROR("Unknown message received in db ~s:~p", [Db#db.name, Else]),
- exit({error, Else})
- end.
-
-get_db(MainPid) ->
- {ok, Db} = gen_server:call(MainPid, get_db),
- Db.
-
open_doc_revs_int(Db, IdRevs, Options) ->
Ids = [Id || {Id, _Revs} <- IdRevs],
LookupResults = get_full_doc_infos(Db, Ids),
@@ -711,16 +485,6 @@ doc_meta_info(DocInfo, RevTree, Options) ->
end
end.
-% rev tree functions
-
-doc_to_tree(Doc) ->
- doc_to_tree(Doc, lists:reverse(Doc#doc.revs)).
-
-doc_to_tree(Doc, [RevId]) ->
- [{RevId, Doc, []}];
-doc_to_tree(Doc, [RevId | Rest]) ->
- [{RevId, ?REV_MISSING, doc_to_tree(Doc, Rest)}].
-
make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) ->
{BodyData, BinValues} =
case SummaryPointer of
@@ -737,303 +501,6 @@ make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) ->
attachments = BinValues,
deleted = Deleted
}.
-
-flush_trees(_Db, [], AccFlushedTrees) ->
- {ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) ->
- #full_doc_info{rev_tree=Unflushed} = InfoUnflushed,
- Flushed = couch_key_tree:map(
- fun(_Rev, Value) ->
- case Value of
- #doc{attachments=Atts,deleted=IsDeleted}=Doc ->
- % this node value is actually an unwritten document summary,
- % write to disk.
- % make sure the Fd in the written bins is the same Fd we are.
- Bins =
- case Atts of
- [] -> [];
- [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd ->
- % convert bins, removing the FD.
- % All bins should have been flushed to disk already.
- [{BinName, {BinType, BinSp, BinLen}}
- || {BinName, {BinType, {_Fd, BinSp, BinLen}}}
- <- Atts];
- _ ->
- % BinFd must not equal our Fd. This can happen when a database
- % is being updated during a compaction
- ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
- throw(retry)
- end,
- {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
- {IsDeleted, NewSummaryPointer};
- _ ->
- Value
- end
- end, Unflushed),
- flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
-
-merge_rev_trees(_NoConflicts, [], [], AccNewInfos, AccSeq) ->
- {ok, lists:reverse(AccNewInfos), AccSeq};
-merge_rev_trees(NoConflicts, [NewDocs|RestDocsList],
- [OldDocInfo|RestOldInfo], AccNewInfos, AccSeq) ->
- #full_doc_info{id=Id,rev_tree=OldTree}=OldDocInfo,
- UpdatesRevTree = lists:foldl(
- fun(NewDoc, AccTree) ->
- couch_key_tree:merge(AccTree, doc_to_tree(NewDoc))
- end,
- [], NewDocs),
- NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree),
- if NewRevTree == OldTree ->
- % nothing changed
- merge_rev_trees(NoConflicts, RestDocsList, RestOldInfo, AccNewInfos, AccSeq);
- true ->
- if NoConflicts andalso OldTree /= [] ->
- OldConflicts = couch_key_tree:count_leafs(OldTree),
- NewConflicts = couch_key_tree:count_leafs(NewRevTree),
- if NewConflicts > OldConflicts ->
- throw(conflict);
- true -> ok
- end;
- true -> ok
- end,
- NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
- merge_rev_trees(NoConflicts, RestDocsList,RestOldInfo,
- [NewInfo|AccNewInfos],AccSeq+1)
- end.
-
-new_index_entries([], DocCount, DelCount, AccById, AccBySeq) ->
- {ok, DocCount, DelCount, AccById, AccBySeq};
-new_index_entries([FullDocInfo|RestInfos], DocCount, DelCount, AccById, AccBySeq) ->
- #doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo),
- {DocCount2, DelCount2} =
- if Deleted -> {DocCount, DelCount + 1};
- true -> {DocCount + 1, DelCount}
- end,
- new_index_entries(RestInfos, DocCount2, DelCount2,
- [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
- [DocInfo|AccBySeq]).
-
-update_docs_int(Db, DocsList, Options) ->
- #db{
- fulldocinfo_by_id_btree = DocInfoByIdBTree,
- docinfo_by_seq_btree = DocInfoBySeqBTree,
- update_seq = LastSeq,
- doc_count = FullDocCount,
- doc_del_count = FullDelCount
- } = Db,
-
- % separate out the NonRep documents from the rest of the documents
- {DocsList2, NonRepDocs} = lists:foldl(
- fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
- case Id of
- ?LOCAL_DOC_PREFIX ++ _ when Rest==[] ->
- % when saving NR (non rep) documents, you can only save a single rev
- {DocsListAcc, [Doc | NonRepDocsAcc]};
- Id->
- {[Docs | DocsListAcc], NonRepDocsAcc}
- end
- end, {[], []}, DocsList),
-
- Ids = [Id || [#doc{id=Id}|_] <- DocsList2],
-
- % lookup up the existing documents, if they exist.
- OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
- OldDocInfos = lists:zipwith(
- fun(_Id, {ok, FullDocInfo}) ->
- FullDocInfo;
- (Id, not_found) ->
- #full_doc_info{id=Id}
- end,
- Ids, OldDocLookups),
-
- {OldCount, OldDelCount} = lists:foldl(
- fun({ok, FullDocInfo}, {OldCountAcc, OldDelCountAcc}) ->
- case couch_doc:to_doc_info(FullDocInfo) of
- #doc_info{deleted=false} ->
- {OldCountAcc + 1, OldDelCountAcc};
- _ ->
- {OldCountAcc, OldDelCountAcc + 1}
- end;
- (not_found, Acc) ->
- Acc
- end, {0, 0}, OldDocLookups),
-
- % Merge the new docs into the revision trees.
- NoConflicts = lists:member(new_edits, Options),
- {ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq),
-
- RemoveSeqs =
- [ OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups],
-
- % All regular documents are now ready to write.
-
- % Try to write the local documents first, a conflict might be generated
- {ok, Db2} = update_local_docs(Db, NonRepDocs),
-
- % Write out the documents summaries (they are stored in the nodes of the rev trees)
- {ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []),
-
- {ok, NewDocsCount, NewDelCount, InfoById, InfoBySeq} =
- new_index_entries(FlushedDocInfos, 0, 0, [], []),
-
- % and the indexes to the documents
- {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs),
- {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []),
-
- Db3 = Db2#db{
- fulldocinfo_by_id_btree = DocInfoByIdBTree2,
- docinfo_by_seq_btree = DocInfoBySeqBTree2,
- update_seq = NewSeq,
- doc_count = FullDocCount + NewDocsCount - OldCount,
- doc_del_count = FullDelCount + NewDelCount - OldDelCount},
-
- case lists:member(delay_commit, Options) of
- true ->
- {ok, Db3};
- false ->
- {ok, commit_data(Db3)}
- end.
-
-update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
- Ids = [Id || #doc{id=Id} <- Docs],
- OldDocLookups = couch_btree:lookup(Btree, Ids),
- BtreeEntries = lists:zipwith(
- fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) ->
- NewRev =
- case Revs of
- [] -> 0;
- [RevStr|_] -> list_to_integer(RevStr)
- end,
- OldRev =
- case OldDocLookup of
- {ok, {_, {OldRev0, _}}} -> OldRev0;
- not_found -> 0
- end,
- case OldRev + 1 == NewRev of
- true ->
- case Delete of
- false -> {update, {Id, {NewRev, Body}}};
- true -> {remove, Id}
- end;
- false ->
- throw(conflict)
- end
-
- end, Docs, OldDocLookups),
-
- BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
- BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries],
-
- {ok, Btree2} =
- couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
-
- {ok, Db#db{local_docs_btree = Btree2}}.
-
-
-
-commit_data(#db{fd=Fd, header=Header} = Db) ->
- Header2 = Header#db_header{
- update_seq = Db#db.update_seq,
- summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
- docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
- fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
- local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
- doc_count = Db#db.doc_count,
- doc_del_count = Db#db.doc_del_count
- },
- if Header == Header2 ->
- Db; % unchanged. nothing to do
- true ->
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2),
- Db#db{header = Header2}
- end.
-
-copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
- {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
- % copy the bin values
- NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
- {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
- {Name, {Type, NewBinSp, Len}}
- end, BinInfos),
- % now write the document summary
- {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}),
- Sp.
-
-copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
- [];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTree]) ->
- % This is a leaf node, copy it over
- NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
- [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
- % inner node, only copy info/data from leaf nodes
- [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)].
-
-copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq) ->
- Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
- LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
- NewFullDocInfos = lists:map(
- fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
- Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
- end, LookupResults),
- NewDocInfos = [couch_doc:to_doc_info(FullDocInfo) || FullDocInfo <- NewFullDocInfos],
- {ok, DocInfoBTree} =
- couch_btree:add_remove(NewDb#db.docinfo_by_seq_btree, NewDocInfos, []),
- {ok, FullDocInfoBTree} =
- couch_btree:add_remove(NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
- NewDb#db{fulldocinfo_by_id_btree=FullDocInfoBTree, docinfo_by_seq_btree=DocInfoBTree}.
-
-
-
-copy_compact_docs(Db, NewDb) ->
- EnumBySeqFun =
- fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) ->
- case couch_util:should_flush() of
- true ->
- NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied])),
- {ok, {commit_data(NewDb2#db{update_seq=Seq}), []}};
- false ->
- {ok, {AccNewDb, [DocInfo | AccUncopied]}}
- end
- end,
- {ok, {NewDb2, Uncopied}} =
- couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, []}),
-
- case Uncopied of
- [#doc_info{update_seq=LastSeq} | _] ->
- commit_data( copy_docs(Db, NewDb2#db{update_seq=LastSeq},
- lists:reverse(Uncopied)));
- [] ->
- NewDb2
- end.
-
-start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) ->
- CompactFile = Filepath ++ ".compact",
- ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
- case couch_file:open(CompactFile) of
- {ok, Fd} ->
- ?LOG_DEBUG("Found existing compaction file for db \"~s\"", [Name]),
- {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG);
- {error, enoent} -> %
- {ok, Fd} = couch_file:open(CompactFile, [create]),
- Header = #db_header{},
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header)
- end,
- NewDb = init_db(Name, CompactFile, Fd, Header),
- NewDb2 = copy_compact_docs(Db, NewDb),
- NewDb3 =
- case CopyLocal of
- true ->
- % suck up all the local docs into memory and write them to the new db
- {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
- fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
- {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs),
- commit_data(NewDb2#db{local_docs_btree=NewLocalBtree});
- _ ->
- NewDb2
- end,
- close_db(NewDb3),
- Db#db.update_pid ! {compact_done, CompactFile}.
\ No newline at end of file