summaryrefslogtreecommitdiff
path: root/apps/couch/src/couch_db.erl
diff options
context:
space:
mode:
Diffstat (limited to 'apps/couch/src/couch_db.erl')
-rw-r--r--apps/couch/src/couch_db.erl1207
1 files changed, 1207 insertions, 0 deletions
diff --git a/apps/couch/src/couch_db.erl b/apps/couch/src/couch_db.erl
new file mode 100644
index 00000000..c01b0a35
--- /dev/null
+++ b/apps/couch/src/couch_db.erl
@@ -0,0 +1,1207 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db).
+
+-export([open/2,open_int/2,close/1,create/2,start_compact/1,get_db_info/1,get_design_docs/1]).
+-export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]).
+-export([update_doc/3,update_doc/4,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
+-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
+-export([set_revs_limit/2,get_revs_limit/1]).
+-export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]).
+-export([enum_docs/4,enum_docs_since/5]).
+-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
+-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
+-export([start_link/3,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]).
+-export([set_security/2,get_security/1]).
+-export([changes_since/4,changes_since/5,read_doc/2,new_revid/1]).
+-export([check_is_admin/1, check_is_reader/1, get_doc_count/1]).
+-export([reopen/1, make_doc/5]).
+
+-include("couch_db.hrl").
+
+start_link(DbName, Filepath, Options) ->
+ case open_db_file(Filepath, Options) of
+ {ok, Fd} ->
+ {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {DbName,
+ Filepath, Fd, Options}, []),
+ unlink(Fd),
+ gen_server:call(UpdaterPid, get_db);
+ Else ->
+ Else
+ end.
+
+open_db_file(Filepath, Options) ->
+ case couch_file:open(Filepath, Options) of
+ {ok, Fd} ->
+ {ok, Fd};
+ {error, enoent} ->
+ % couldn't find file. is there a compact version? This can happen if
+ % crashed during the file switch.
+ case couch_file:open(Filepath ++ ".compact") of
+ {ok, Fd} ->
+ ?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
+ ok = file:rename(Filepath ++ ".compact", Filepath),
+ ok = couch_file:sync(Filepath),
+ {ok, Fd};
+ {error, enoent} ->
+ {not_found, no_db_file}
+ end;
+ Error ->
+ Error
+ end.
+
+
+create(DbName, Options) ->
+ couch_server:create(DbName, Options).
+
+% this is for opening a database for internal purposes like the replicator
+% or the view indexer. it never throws a reader error.
+open_int(DbName, Options) ->
+ couch_server:open(DbName, Options).
+
+% this should be called anytime an http request opens the database.
+% it ensures that the http userCtx is a valid reader
+open(DbName, Options) ->
+ case couch_server:open(DbName, Options) of
+ {ok, Db} ->
+ try
+ check_is_reader(Db),
+ {ok, Db}
+ catch
+ throw:Error ->
+ close(Db),
+ throw(Error)
+ end;
+ Else -> Else
+ end.
+
+reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) ->
+ {ok, #db{fd = NewFd} = NewDb} = gen_server:call(Pid, get_db, infinity),
+ case NewFd =:= Fd of
+ true ->
+ {ok, NewDb#db{user_ctx = UserCtx}};
+ false ->
+ erlang:demonitor(OldRef),
+ NewRef = erlang:monitor(process, NewFd),
+ {ok, NewDb#db{user_ctx = UserCtx, fd_monitor = NewRef}}
+ end.
+
+ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
+ ok = gen_server:call(Pid, full_commit, infinity),
+ {ok, StartTime}.
+
+ensure_full_commit(Db, RequiredSeq) ->
+ #db{main_pid=Pid, instance_start_time=StartTime} = Db,
+ ok = gen_server:call(Pid, {full_commit, RequiredSeq}, infinity),
+ {ok, StartTime}.
+
+close(#db{fd_monitor=RefCntr}) ->
+ erlang:demonitor(RefCntr).
+
+open_ref_counted(MainPid, OpenedPid) ->
+ gen_server:call(MainPid, {open_ref_count, OpenedPid}).
+
+is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) ->
+ case erlang:process_info(Db#db.fd, monitored_by) of
+ undefined ->
+ true;
+ {monitored_by, Pids} ->
+ (Pids -- [Db#db.main_pid, whereis(couch_stats_collector)]) =:= []
+ end;
+is_idle(_Db) ->
+ false.
+
+monitor(#db{main_pid=MainPid}) ->
+ erlang:monitor(process, MainPid).
+
+start_compact(#db{main_pid=Pid}) ->
+ {ok, _} = gen_server:call(Pid, start_compact),
+ ok.
+
+delete_doc(Db, Id, Revisions) ->
+ DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
+ {ok, [Result]} = update_docs(Db, DeletedDocs, []),
+ {ok, Result}.
+
+open_doc(Db, IdOrDocInfo) ->
+ open_doc(Db, IdOrDocInfo, []).
+
+open_doc(Db, Id, Options) ->
+ increment_stat(Db, {couchdb, database_reads}),
+ case open_doc_int(Db, Id, Options) of
+ {ok, #doc{deleted=true}=Doc} ->
+ case lists:member(deleted, Options) of
+ true ->
+ apply_open_options({ok, Doc},Options);
+ false ->
+ {not_found, deleted}
+ end;
+ Else ->
+ apply_open_options(Else,Options)
+ end.
+
+apply_open_options({ok, Doc},Options) ->
+ apply_open_options2(Doc,Options);
+apply_open_options(Else,_Options) ->
+ Else.
+
+apply_open_options2(Doc,[]) ->
+ {ok, Doc};
+apply_open_options2(#doc{atts=Atts,revs=Revs}=Doc,
+ [{atts_since, PossibleAncestors}|Rest]) ->
+ RevPos = find_ancestor_rev_pos(Revs, PossibleAncestors),
+ apply_open_options2(Doc#doc{atts=[A#att{data=
+ if AttPos>RevPos -> Data; true -> stub end}
+ || #att{revpos=AttPos,data=Data}=A <- Atts]}, Rest);
+apply_open_options2(Doc,[_|Rest]) ->
+ apply_open_options2(Doc,Rest).
+
+
+find_ancestor_rev_pos({_, []}, _AttsSinceRevs) ->
+ 0;
+find_ancestor_rev_pos(_DocRevs, []) ->
+ 0;
+find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) ->
+ case lists:member({RevPos, RevId}, AttsSinceRevs) of
+ true ->
+ RevPos;
+ false ->
+ find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs)
+ end.
+
+open_doc_revs(Db, Id, Revs, Options) ->
+ increment_stat(Db, {couchdb, database_reads}),
+ [{ok, Results}] = open_doc_revs_int(Db, [{Id, Revs}], Options),
+ {ok, [apply_open_options(Result, Options) || Result <- Results]}.
+
+% Each returned result is a list of tuples:
+% {Id, MissingRevs, PossibleAncestors}
+% if no revs are missing, it's omitted from the results.
+get_missing_revs(Db, IdRevsList) ->
+ Results = get_full_doc_infos(Db, [Id1 || {Id1, _Revs} <- IdRevsList]),
+ {ok, find_missing(IdRevsList, Results)}.
+
+find_missing([], []) ->
+ [];
+find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) ->
+ case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of
+ [] ->
+ find_missing(RestIdRevs, RestLookupInfo);
+ MissingRevs ->
+ #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo),
+ LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo],
+ % Find the revs that are possible parents of this rev
+ PossibleAncestors =
+ lists:foldl(fun({LeafPos, LeafRevId}, Acc) ->
+ % this leaf is a "possible ancenstor" of the missing
+ % revs if this LeafPos lessthan any of the missing revs
+ case lists:any(fun({MissingPos, _}) ->
+ LeafPos < MissingPos end, MissingRevs) of
+ true ->
+ [{LeafPos, LeafRevId} | Acc];
+ false ->
+ Acc
+ end
+ end, [], LeafRevs),
+ [{Id, MissingRevs, PossibleAncestors} |
+ find_missing(RestIdRevs, RestLookupInfo)]
+ end;
+find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) ->
+ [{Id, Revs, []} | find_missing(RestIdRevs, RestLookupInfo)].
+
+get_doc_info(Db, Id) ->
+ case get_full_doc_info(Db, Id) of
+ {ok, DocInfo} ->
+ {ok, couch_doc:to_doc_info(DocInfo)};
+ Else ->
+ Else
+ end.
+
+% returns {ok, DocInfo} or not_found
+get_full_doc_info(Db, Id) ->
+ [Result] = get_full_doc_infos(Db, [Id]),
+ Result.
+
+get_full_doc_infos(Db, Ids) ->
+ couch_btree:lookup(Db#db.id_tree, Ids).
+
+increment_update_seq(#db{main_pid=Pid}) ->
+ gen_server:call(Pid, increment_update_seq).
+
+purge_docs(#db{main_pid=Pid}, IdsRevs) ->
+ gen_server:call(Pid, {purge_docs, IdsRevs}).
+
+get_committed_update_seq(#db{committed_update_seq=Seq}) ->
+ Seq.
+
+get_update_seq(#db{update_seq=Seq})->
+ Seq.
+
+get_purge_seq(#db{header=#db_header{purge_seq=PurgeSeq}})->
+ PurgeSeq.
+
+get_last_purged(#db{header=#db_header{purged_docs=nil}}) ->
+ {ok, []};
+get_last_purged(#db{fd=Fd, header=#db_header{purged_docs=PurgedPointer}}) ->
+ couch_file:pread_term(Fd, PurgedPointer).
+
+get_doc_count(Db) ->
+ {ok, {Count, _, _}} = couch_btree:full_reduce(Db#db.id_tree),
+ {ok, Count}.
+
+get_db_info(Db) ->
+ #db{fd=Fd,
+ header=#db_header{disk_version=DiskVersion},
+ compactor_pid=Compactor,
+ update_seq=SeqNum,
+ name=Name,
+ id_tree=FullDocBtree,
+ instance_start_time=StartTime,
+ committed_update_seq=CommittedUpdateSeq} = Db,
+ {ok, Size} = couch_file:bytes(Fd),
+ {ok, {Count, DelCount, DataSize}} = couch_btree:full_reduce(FullDocBtree),
+ InfoList = [
+ {db_name, Name},
+ {doc_count, Count},
+ {doc_del_count, DelCount},
+ {update_seq, SeqNum},
+ {purge_seq, couch_db:get_purge_seq(Db)},
+ {compact_running, Compactor/=nil},
+ {disk_size, Size},
+ {other, {[{data_size, DataSize}]}},
+ {instance_start_time, StartTime},
+ {disk_format_version, DiskVersion},
+ {committed_update_seq, CommittedUpdateSeq}
+ ],
+ {ok, InfoList}.
+
+get_design_docs(#db{name = <<"shards/", _/binary>> = ShardName}) ->
+ {_, Ref} = spawn_monitor(fun() ->
+ exit(fabric:design_docs(mem3:dbname(ShardName)))
+ end),
+ receive {'DOWN', Ref, _, _, Response} ->
+ Response
+ end;
+get_design_docs(#db{id_tree=Btree}=Db) ->
+ {ok, _, Docs} = couch_view:fold(
+ #view{btree=Btree},
+ fun(#full_doc_info{deleted = true}, _Reds, AccDocs) ->
+ {ok, AccDocs};
+ (#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, AccDocs) ->
+ {ok, Doc} = couch_db:open_doc_int(Db, FullDocInfo, []),
+ {ok, [Doc | AccDocs]};
+ (_, _Reds, AccDocs) ->
+ {stop, AccDocs}
+ end,
+ [], [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}]),
+ {ok, Docs}.
+
+check_is_admin(#db{user_ctx=#user_ctx{name=Name,roles=Roles}}=Db) ->
+ {Admins} = get_admins(Db),
+ AdminRoles = [<<"_admin">> | couch_util:get_value(<<"roles">>, Admins, [])],
+ AdminNames = couch_util:get_value(<<"names">>, Admins,[]),
+ case AdminRoles -- Roles of
+ AdminRoles -> % same list, not an admin role
+ case AdminNames -- [Name] of
+ AdminNames -> % same names, not an admin
+ throw({unauthorized, <<"You are not a db or server admin.">>});
+ _ ->
+ ok
+ end;
+ _ ->
+ ok
+ end.
+
+check_is_reader(#db{user_ctx=#user_ctx{name=Name,roles=Roles}=UserCtx}=Db) ->
+ case (catch check_is_admin(Db)) of
+ ok -> ok;
+ _ ->
+ {Readers} = get_readers(Db),
+ ReaderRoles = couch_util:get_value(<<"roles">>, Readers,[]),
+ WithAdminRoles = [<<"_admin">> | ReaderRoles],
+ ReaderNames = couch_util:get_value(<<"names">>, Readers,[]),
+ case ReaderRoles ++ ReaderNames of
+ [] -> ok; % no readers == public access
+ _Else ->
+ case WithAdminRoles -- Roles of
+ WithAdminRoles -> % same list, not an reader role
+ case ReaderNames -- [Name] of
+ ReaderNames -> % same names, not a reader
+ ?LOG_DEBUG("Not a reader: UserCtx ~p vs Names ~p Roles ~p",[UserCtx, ReaderNames, WithAdminRoles]),
+ throw({unauthorized, <<"You are not authorized to access this db.">>});
+ _ ->
+ ok
+ end;
+ _ ->
+ ok
+ end
+ end
+ end.
+
+get_admins(#db{security=SecProps}) ->
+ couch_util:get_value(<<"admins">>, SecProps, {[]}).
+
+get_readers(#db{security=SecProps}) ->
+ couch_util:get_value(<<"readers">>, SecProps, {[]}).
+
+get_security(#db{security=SecProps}) ->
+ {SecProps}.
+
+set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) ->
+ check_is_admin(Db),
+ ok = validate_security_object(NewSecProps),
+ ok = gen_server:call(Pid, {set_security, NewSecProps}, infinity),
+ {ok, _} = ensure_full_commit(Db),
+ ok;
+set_security(_, _) ->
+ throw(bad_request).
+
+validate_security_object(SecProps) ->
+ Admins = couch_util:get_value(<<"admins">>, SecProps, {[]}),
+ Readers = couch_util:get_value(<<"readers">>, SecProps, {[]}),
+ ok = validate_names_and_roles(Admins),
+ ok = validate_names_and_roles(Readers),
+ ok.
+
+% validate user input
+validate_names_and_roles({Props}) when is_list(Props) ->
+ case couch_util:get_value(<<"names">>,Props,[]) of
+ Ns when is_list(Ns) ->
+ [throw("names must be a JSON list of strings") ||N <- Ns, not is_binary(N)],
+ Ns;
+ _ -> throw("names must be a JSON list of strings")
+ end,
+ case couch_util:get_value(<<"roles">>,Props,[]) of
+ Rs when is_list(Rs) ->
+ [throw("roles must be a JSON list of strings") ||R <- Rs, not is_binary(R)],
+ Rs;
+ _ -> throw("roles must be a JSON list of strings")
+ end,
+ ok.
+
+get_revs_limit(#db{revs_limit=Limit}) ->
+ Limit.
+
+set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
+ check_is_admin(Db),
+ gen_server:call(Pid, {set_revs_limit, Limit}, infinity);
+set_revs_limit(_Db, _Limit) ->
+ throw(invalid_revs_limit).
+
+name(#db{name=Name}) ->
+ Name.
+
+update_doc(Db, Doc, Options) ->
+ update_doc(Db, Doc, Options, interactive_edit).
+
+update_doc(Db, Doc, Options, UpdateType) ->
+ case update_docs(Db, [Doc], Options, UpdateType) of
+ {ok, [{ok, NewRev}]} ->
+ {ok, NewRev};
+ {ok, [{{_Id, _Rev}, Error}]} ->
+ throw(Error);
+ {ok, [Error]} ->
+ throw(Error);
+ {ok, []} ->
+ % replication success
+ {Pos, [RevId | _]} = Doc#doc.revs,
+ {ok, {Pos, RevId}}
+ end.
+
+update_docs(Db, Docs) ->
+ update_docs(Db, Docs, []).
+
+% group_alike_docs groups the sorted documents into sublist buckets, by id.
+% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
+group_alike_docs(Docs) ->
+ Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs),
+ group_alike_docs(Sorted, []).
+
+group_alike_docs([], Buckets) ->
+ lists:reverse(Buckets);
+group_alike_docs([Doc|Rest], []) ->
+ group_alike_docs(Rest, [[Doc]]);
+group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
+ [#doc{id=BucketId}|_] = Bucket,
+ case Doc#doc.id == BucketId of
+ true ->
+ % add to existing bucket
+ group_alike_docs(Rest, [[Doc|Bucket]|RestBuckets]);
+ false ->
+ % add to new bucket
+ group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
+ end.
+
+validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) ->
+ catch check_is_admin(Db);
+validate_doc_update(#db{validate_doc_funs = undefined} = Db, Doc, Fun) ->
+ ValidationFuns = load_validation_funs(Db),
+ validate_doc_update(Db#db{validate_doc_funs = ValidationFuns}, Doc, Fun);
+validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) ->
+ ok;
+validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->
+ ok;
+validate_doc_update(Db, Doc, GetDiskDocFun) ->
+ DiskDoc = GetDiskDocFun(),
+ JsonCtx = couch_util:json_user_ctx(Db),
+ SecObj = get_security(Db),
+ try [case Fun(Doc, DiskDoc, JsonCtx, SecObj) of
+ ok -> ok;
+ Error -> throw(Error)
+ end || Fun <- Db#db.validate_doc_funs],
+ ok
+ catch
+ throw:Error ->
+ Error
+ end.
+
+% to be safe, spawn a middleman here
+load_validation_funs(#db{main_pid = Pid} = Db) ->
+ {_, Ref} = spawn_monitor(fun() ->
+ {ok, DesignDocs} = get_design_docs(Db),
+ exit({ok, lists:flatmap(fun(DesignDoc) ->
+ case couch_doc:get_validate_doc_fun(DesignDoc) of
+ nil ->
+ [];
+ Fun ->
+ [Fun]
+ end
+ end, DesignDocs)})
+ end),
+ receive
+ {'DOWN', Ref, _, _, {ok, Funs}} ->
+ gen_server:cast(Pid, {load_validation_funs, Funs}),
+ Funs;
+ {'DOWN', Ref, _, _, Reason} ->
+ ?LOG_ERROR("could not load validation funs ~p", [Reason]),
+ throw(internal_server_error)
+ end.
+
+prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
+ OldFullDocInfo, LeafRevsDict, AllowConflict) ->
+ case Revs of
+ [PrevRev|_] ->
+ case dict:find({RevStart, PrevRev}, LeafRevsDict) of
+ {ok, {Deleted, DiskSp, DiskRevs}} ->
+ case couch_doc:has_stubs(Doc) of
+ true ->
+ DiskDoc = make_doc(Db, Id, Deleted, DiskSp, DiskRevs),
+ Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
+ {validate_doc_update(Db, Doc2, fun() -> DiskDoc end), Doc2};
+ false ->
+ LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,DiskSp,DiskRevs) end,
+ {validate_doc_update(Db, Doc, LoadDiskDoc), Doc}
+ end;
+ error when AllowConflict ->
+ couch_doc:merge_stubs(Doc, #doc{}), % will generate error if
+ % there are stubs
+ {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
+ error ->
+ {conflict, Doc}
+ end;
+ [] ->
+ % new doc, and we have existing revs.
+ % reuse existing deleted doc
+ if OldFullDocInfo#full_doc_info.deleted orelse AllowConflict ->
+ {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
+ true ->
+ {conflict, Doc}
+ end
+ end.
+
+
+
+prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
+ AccFatalErrors) ->
+ {AccPrepped, AccFatalErrors};
+prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
+ AllowConflict, AccPrepped, AccErrors) ->
+ [#doc{id=Id}|_]=DocBucket,
+ % no existing revs are known,
+ {PreppedBucket, AccErrors3} = lists:foldl(
+ fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
+ case couch_doc:has_stubs(Doc) of
+ true ->
+ couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
+ false -> ok
+ end,
+ case Revs of
+ {0, []} ->
+ case validate_doc_update(Db, Doc, fun() -> nil end) of
+ ok ->
+ {[Doc | AccBucket], AccErrors2};
+ Error ->
+ {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]}
+ end;
+ _ ->
+ % old revs specified but none exist, a conflict
+ {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]}
+ end
+ end,
+ {[], AccErrors}, DocBucket),
+
+ prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
+ [PreppedBucket | AccPrepped], AccErrors3);
+prep_and_validate_updates(Db, [DocBucket|RestBuckets],
+ [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
+ AllowConflict, AccPrepped, AccErrors) ->
+ Leafs = couch_key_tree:get_all_leafs(OldRevTree),
+ LeafRevsDict = dict:from_list([{{Start, RevId}, {Del, Ptr, Revs}} ||
+ {#leaf{deleted=Del, ptr=Ptr}, {Start, [RevId|_]}=Revs} <- Leafs]),
+ {PreppedBucket, AccErrors3} = lists:foldl(
+ fun(Doc, {Docs2Acc, AccErrors2}) ->
+ case prep_and_validate_update(Db, Doc, OldFullDocInfo,
+ LeafRevsDict, AllowConflict) of
+ {ok, Doc2} ->
+ {[Doc2 | Docs2Acc], AccErrors2};
+ {Error, #doc{id=Id,revs=Revs}} ->
+ % Record the error
+ {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]}
+ end
+ end,
+ {[], AccErrors}, DocBucket),
+ prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
+ [PreppedBucket | AccPrepped], AccErrors3).
+
+
+update_docs(Db, Docs, Options) ->
+ update_docs(Db, Docs, Options, interactive_edit).
+
+
+prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) ->
+ Errors2 = [{{Id, {Pos, Rev}}, Error} ||
+ {#doc{id=Id,revs={Pos,[Rev|_]}}, Error} <- AccErrors],
+ {lists:reverse(AccPrepped), lists:reverse(Errors2)};
+prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldInfo], AccPrepped, AccErrors) ->
+ case OldInfo of
+ not_found ->
+ {ValidatedBucket, AccErrors3} = lists:foldl(
+ fun(Doc, {AccPrepped2, AccErrors2}) ->
+ case couch_doc:has_stubs(Doc) of
+ true ->
+ couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
+ false -> ok
+ end,
+ case validate_doc_update(Db, Doc, fun() -> nil end) of
+ ok ->
+ {[Doc | AccPrepped2], AccErrors2};
+ Error ->
+ {AccPrepped2, [{Doc, Error} | AccErrors2]}
+ end
+ end,
+ {[], AccErrors}, Bucket),
+ prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
+ {ok, #full_doc_info{rev_tree=OldTree}} ->
+ NewRevTree = lists:foldl(
+ fun(NewDoc, AccTree) ->
+ {NewTree, _} = couch_key_tree:merge(AccTree,
+ couch_db:doc_to_tree(NewDoc), Db#db.revs_limit),
+ NewTree
+ end,
+ OldTree, Bucket),
+ Leafs = couch_key_tree:get_all_leafs_full(NewRevTree),
+ LeafRevsFullDict = dict:from_list( [{{Start, RevId}, FullPath} || {Start, [{RevId, _}|_]}=FullPath <- Leafs]),
+ {ValidatedBucket, AccErrors3} =
+ lists:foldl(
+ fun(#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, {AccValidated, AccErrors2}) ->
+ case dict:find({Pos, RevId}, LeafRevsFullDict) of
+ {ok, {Start, Path}} ->
+ % our unflushed doc is a leaf node. Go back on the path
+ % to find the previous rev that's on disk.
+
+ LoadPrevRevFun = fun() ->
+ make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
+ end,
+
+ case couch_doc:has_stubs(Doc) of
+ true ->
+ DiskDoc = LoadPrevRevFun(),
+ Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
+ GetDiskDocFun = fun() -> DiskDoc end;
+ false ->
+ Doc2 = Doc,
+ GetDiskDocFun = LoadPrevRevFun
+ end,
+
+ case validate_doc_update(Db, Doc2, GetDiskDocFun) of
+ ok ->
+ {[Doc2 | AccValidated], AccErrors2};
+ Error ->
+ {AccValidated, [{Doc, Error} | AccErrors2]}
+ end;
+ _ ->
+ % this doc isn't a leaf or already exists in the tree.
+ % ignore but consider it a success.
+ {AccValidated, AccErrors2}
+ end
+ end,
+ {[], AccErrors}, Bucket),
+ prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo,
+ [ValidatedBucket | AccPrepped], AccErrors3)
+ end.
+
+
+
+new_revid(#doc{body=Body,revs={OldStart,OldRevs},
+ atts=Atts,deleted=Deleted}) ->
+ case [{N, T, M} || #att{name=N,type=T,md5=M} <- Atts, M =/= <<>>] of
+ Atts2 when length(Atts) =/= length(Atts2) ->
+ % We must have old style non-md5 attachments
+ ?l2b(integer_to_list(couch_util:rand32()));
+ Atts2 ->
+ OldRev = case OldRevs of [] -> 0; [OldRev0|_] -> OldRev0 end,
+ couch_util:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2]))
+ end.
+
+new_revs([], OutBuckets, IdRevsAcc) ->
+ {lists:reverse(OutBuckets), IdRevsAcc};
+new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
+ {NewBucket, IdRevsAcc3} = lists:mapfoldl(
+ fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)->
+ NewRevId = new_revid(Doc),
+ {Doc#doc{revs={Start+1, [NewRevId | RevIds]}},
+ [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
+ end, IdRevsAcc, Bucket),
+ new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
+
+check_dup_atts(#doc{atts=Atts}=Doc) ->
+ Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts),
+ check_dup_atts2(Atts2),
+ Doc.
+
+check_dup_atts2([#att{name=N}, #att{name=N} | _]) ->
+ throw({bad_request, <<"Duplicate attachments">>});
+check_dup_atts2([_ | Rest]) ->
+ check_dup_atts2(Rest);
+check_dup_atts2(_) ->
+ ok.
+
+
+update_docs(Db, Docs, Options, replicated_changes) ->
+ increment_stat(Db, {couchdb, database_writes}),
+ DocBuckets = group_alike_docs(Docs),
+
+ case (Db#db.validate_doc_funs /= []) orelse
+ lists:any(
+ fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true;
+ (#doc{atts=Atts}) ->
+ Atts /= []
+ end, Docs) of
+ true ->
+ Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+ ExistingDocs = get_full_doc_infos(Db, Ids),
+
+ {DocBuckets2, DocErrors} =
+ prep_and_validate_replicated_updates(Db, DocBuckets, ExistingDocs, [], []),
+ DocBuckets3 = [Bucket || [_|_]=Bucket <- DocBuckets2]; % remove empty buckets
+ false ->
+ DocErrors = [],
+ DocBuckets3 = DocBuckets
+ end,
+ DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
+ || Doc <- Bucket] || Bucket <- DocBuckets3],
+ {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
+ {ok, DocErrors};
+
+update_docs(Db, Docs, Options, interactive_edit) ->
+ increment_stat(Db, {couchdb, database_writes}),
+ AllOrNothing = lists:member(all_or_nothing, Options),
+ % go ahead and generate the new revision ids for the documents.
+ % separate out the NonRep documents from the rest of the documents
+ {Docs2, NonRepDocs} = lists:foldl(
+ fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) ->
+ case Id of
+ <<?LOCAL_DOC_PREFIX, _/binary>> ->
+ {DocsAcc, [Doc | NonRepDocsAcc]};
+ Id->
+ {[Doc | DocsAcc], NonRepDocsAcc}
+ end
+ end, {[], []}, Docs),
+
+ DocBuckets = group_alike_docs(Docs2),
+
+ case (Db#db.validate_doc_funs /= []) orelse
+ lists:any(
+ fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
+ true;
+ (#doc{atts=Atts}) ->
+ Atts /= []
+ end, Docs2) of
+ true ->
+ % lookup the doc by id and get the most recent
+ Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+ ExistingDocInfos = get_full_doc_infos(Db, Ids),
+
+ {DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db,
+ DocBuckets, ExistingDocInfos, AllOrNothing, [], []),
+
+ % strip out any empty buckets
+ DocBuckets2 = [Bucket || [_|_] = Bucket <- DocBucketsPrepped];
+ false ->
+ PreCommitFailures = [],
+ DocBuckets2 = DocBuckets
+ end,
+
+ if (AllOrNothing) and (PreCommitFailures /= []) ->
+ {aborted, lists:map(
+ fun({{Id,{Pos, [RevId|_]}}, Error}) ->
+ {{Id, {Pos, RevId}}, Error};
+ ({{Id,{0, []}}, Error}) ->
+ {{Id, {0, <<>>}}, Error}
+ end, PreCommitFailures)};
+ true ->
+ Options2 = if AllOrNothing -> [merge_conflicts];
+ true -> [] end ++ Options,
+ DocBuckets3 = [[
+ doc_flush_atts(set_new_att_revpos(
+ check_dup_atts(Doc)), Db#db.fd)
+ || Doc <- B] || B <- DocBuckets2],
+ {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
+
+ {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
+
+ ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
+ {ok, lists:map(
+ fun(#doc{id=Id,revs={Pos, RevIds}}) ->
+ {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict),
+ Result
+ end, Docs)}
+ end.
+
+% Returns the first available document on disk. Input list is a full rev path
+% for the doc.
+make_first_doc_on_disk(_Db, _Id, _Pos, []) ->
+ nil;
+make_first_doc_on_disk(Db, Id, Pos, [{_Rev, #doc{}} | RestPath]) ->
+ make_first_doc_on_disk(Db, Id, Pos-1, RestPath);
+make_first_doc_on_disk(Db, Id, Pos, [{_Rev, ?REV_MISSING}|RestPath]) ->
+ make_first_doc_on_disk(Db, Id, Pos - 1, RestPath);
+make_first_doc_on_disk(Db, Id, Pos, [{_, #leaf{deleted=IsDel, ptr=Sp}} |_]=DocPath) ->
+ Revs = [Rev || {Rev, _} <- DocPath],
+ make_doc(Db, Id, IsDel, Sp, {Pos, Revs}).
+
+set_commit_option(Options) ->
+ CommitSettings = {
+ [true || O <- Options, O==full_commit orelse O==delay_commit],
+ couch_config:get("couchdb", "delayed_commits", "false")
+ },
+ case CommitSettings of
+ {[true], _} ->
+ Options; % user requested explicit commit setting, do not change it
+ {_, "true"} ->
+ Options; % delayed commits are enabled, do nothing
+ {_, "false"} ->
+ [full_commit|Options];
+ {_, Else} ->
+ ?LOG_ERROR("[couchdb] delayed_commits setting must be true/false, not ~p",
+ [Else]),
+ [full_commit|Options]
+ end.
+
+collect_results(Pid, MRef, ResultsAcc) ->
+ receive
+ {result, Pid, Result} ->
+ collect_results(Pid, MRef, [Result | ResultsAcc]);
+ {done, Pid} ->
+ {ok, ResultsAcc};
+ {retry, Pid} ->
+ retry;
+ {'DOWN', MRef, _, _, Reason} ->
+ exit(Reason)
+ end.
+
+write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets,
+ NonRepDocs, Options0) ->
+ Options = set_commit_option(Options0),
+ MergeConflicts = lists:member(merge_conflicts, Options),
+ FullCommit = lists:member(full_commit, Options),
+ MRef = erlang:monitor(process, Pid),
+ try
+ Pid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit},
+ case collect_results(Pid, MRef, []) of
+ {ok, Results} -> {ok, Results};
+ retry ->
+ % This can happen if the db file we wrote to was swapped out by
+ % compaction. Retry by reopening the db and writing to the current file
+ {ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]),
+ DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+ % We only retry once
+ close(Db2),
+ Pid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit},
+ case collect_results(Pid, MRef, []) of
+ {ok, Results} -> {ok, Results};
+ retry -> throw({update_error, compaction_retry})
+ end
+ end
+ after
+ erlang:demonitor(MRef, [flush])
+ end.
+
+
+set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) ->
+ Doc#doc{atts= lists:map(fun(#att{data={_Fd,_Sp}}=Att) ->
+ % already commited to disk, do not set new rev
+ Att;
+ (Att) ->
+ Att#att{revpos=RevPos+1}
+ end, Atts)}.
+
+
+doc_flush_atts(Doc, Fd) ->
+ Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}.
+
+check_md5(_NewSig, <<>>) -> ok;
+check_md5(Sig, Sig) -> ok;
+check_md5(_, _) -> throw(md5_mismatch).
+
+flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd ->
+ % already written to our file, nothing to write
+ Att;
+
+flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5,
+ disk_len=InDiskLen} = Att) ->
+ {NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} =
+ couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
+ check_md5(IdentityMd5, InMd5),
+ Att#att{data={Fd, NewStreamData}, md5=Md5, att_len=Len, disk_len=InDiskLen};
+
+flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) ->
+ with_stream(Fd, Att, fun(OutputStream) ->
+ couch_stream:write(OutputStream, Data)
+ end);
+
+flush_att(Fd, #att{data=Fun,att_len=undefined}=Att) when is_function(Fun) ->
+ with_stream(Fd, Att, fun(OutputStream) ->
+ % Fun(MaxChunkSize, WriterFun) must call WriterFun
+ % once for each chunk of the attachment,
+ Fun(4096,
+ % WriterFun({Length, Binary}, State)
+ % WriterFun({0, _Footers}, State)
+ % Called with Length == 0 on the last time.
+ % WriterFun returns NewState.
+ fun({0, Footers}, _) ->
+ F = mochiweb_headers:from_binary(Footers),
+ case mochiweb_headers:get_value("Content-MD5", F) of
+ undefined ->
+ ok;
+ Md5 ->
+ {md5, base64:decode(Md5)}
+ end;
+ ({_Length, Chunk}, _) ->
+ couch_stream:write(OutputStream, Chunk)
+ end, ok)
+ end);
+
+flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun) ->
+ with_stream(Fd, Att, fun(OutputStream) ->
+ write_streamed_attachment(OutputStream, Fun, AttLen)
+ end).
+
+% From RFC 2616 3.6.1 - Chunked Transfer Coding
+%
+% In other words, the origin server is willing to accept
+% the possibility that the trailer fields might be silently
+% discarded along the path to the client.
+%
+% I take this to mean that if "Trailers: Content-MD5\r\n"
+% is present in the request, but there is no Content-MD5
+% trailer, we're free to ignore this inconsistency and
+% pretend that no Content-MD5 exists.
+with_stream(Fd, #att{md5=InMd5,type=Type,encoding=Enc}=Att, Fun) ->
+ {ok, OutputStream} = case (Enc =:= identity) andalso
+ couch_util:compressible_att_type(Type) of
+ true ->
+ CompLevel = list_to_integer(
+ couch_config:get("attachments", "compression_level", "0")
+ ),
+ couch_stream:open(Fd, gzip, [{compression_level, CompLevel}]);
+ _ ->
+ couch_stream:open(Fd)
+ end,
+ ReqMd5 = case Fun(OutputStream) of
+ {md5, FooterMd5} ->
+ case InMd5 of
+ md5_in_footer -> FooterMd5;
+ _ -> InMd5
+ end;
+ _ ->
+ InMd5
+ end,
+ {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
+ couch_stream:close(OutputStream),
+ check_md5(IdentityMd5, ReqMd5),
+ {AttLen, DiskLen, NewEnc} = case Enc of
+ identity ->
+ case {Md5, IdentityMd5} of
+ {Same, Same} ->
+ {Len, IdentityLen, identity};
+ _ ->
+ {Len, IdentityLen, gzip}
+ end;
+ gzip ->
+ case {Att#att.att_len, Att#att.disk_len} of
+ {AL, DL} when AL =:= undefined orelse DL =:= undefined ->
+ % Compressed attachment uploaded through the standalone API.
+ {Len, Len, gzip};
+ {AL, DL} ->
+ % This case is used for efficient push-replication, where a
+ % compressed attachment is located in the body of multipart
+ % content-type request.
+ {AL, DL, gzip}
+ end
+ end,
+ Att#att{
+ data={Fd,StreamInfo},
+ att_len=AttLen,
+ disk_len=DiskLen,
+ md5=Md5,
+ encoding=NewEnc
+ }.
+
+
+write_streamed_attachment(_Stream, _F, 0) ->
+ ok;
+write_streamed_attachment(Stream, F, LenLeft) when LenLeft > 0 ->
+ Bin = read_next_chunk(F, LenLeft),
+ ok = couch_stream:write(Stream, Bin),
+ write_streamed_attachment(Stream, F, LenLeft - size(Bin)).
+
+read_next_chunk(F, _) when is_function(F, 0) ->
+ F();
+read_next_chunk(F, LenLeft) when is_function(F, 1) ->
+ F(lists:min([LenLeft, 16#2000])).
+
+enum_docs_since_reduce_to_count(Reds) ->
+ couch_btree:final_reduce(
+ fun couch_db_updater:btree_by_seq_reduce/2, Reds).
+
+enum_docs_reduce_to_count(Reds) ->
+ {Count, _, _} = couch_btree:final_reduce(
+ fun couch_db_updater:btree_by_id_reduce/2, Reds),
+ Count.
+
+changes_since(Db, StartSeq, Fun, Acc) ->
+ changes_since(Db, StartSeq, Fun, [], Acc).
+
+changes_since(Db, StartSeq, Fun, Options, Acc) ->
+ Wrapper = fun(FullDocInfo, _Offset, Acc2) ->
+ case FullDocInfo of
+ #full_doc_info{} ->
+ DocInfo = couch_doc:to_doc_info(FullDocInfo);
+ #doc_info{} ->
+ DocInfo = FullDocInfo
+ end,
+ Fun(DocInfo, Acc2)
+ end,
+ {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.seq_tree, Wrapper,
+ Acc, [{start_key, couch_util:to_integer(StartSeq) + 1} | Options]),
+ {ok, AccOut}.
+
+count_changes_since(Db, SinceSeq) ->
+ {ok, Changes} =
+ couch_btree:fold_reduce(Db#db.seq_tree,
+ fun(_SeqStart, PartialReds, 0) ->
+ {ok, couch_btree:final_reduce(Db#db.seq_tree, PartialReds)}
+ end,
+ 0, [{start_key, SinceSeq + 1}]),
+ Changes.
+
+enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
+ {ok, LastReduction, AccOut} = couch_btree:fold(Db#db.seq_tree, InFun, Acc, [{start_key, SinceSeq + 1} | Options]),
+ {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.
+
+enum_docs(Db, InFun, InAcc, Options) ->
+ {ok, LastReduce, OutAcc} = couch_view:fold(
+ #view{btree=Db#db.id_tree}, InFun, InAcc, Options),
+ {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
+
+%%% Internal function %%%
+open_doc_revs_int(Db, IdRevs, Options) ->
+ Ids = [Id || {Id, _Revs} <- IdRevs],
+ LookupResults = get_full_doc_infos(Db, Ids),
+ lists:zipwith(
+ fun({Id, Revs}, Lookup) ->
+ case Lookup of
+ {ok, #full_doc_info{rev_tree=RevTree}} ->
+ {FoundRevs, MissingRevs} =
+ case Revs of
+ all ->
+ {couch_key_tree:get_all_leafs(RevTree), []};
+ _ ->
+ case lists:member(latest, Options) of
+ true ->
+ couch_key_tree:get_key_leafs(RevTree, Revs);
+ false ->
+ couch_key_tree:get(RevTree, Revs)
+ end
+ end,
+ FoundResults =
+ lists:map(fun({Value, {Pos, [Rev|_]}=FoundRevPath}) ->
+ case Value of
+ ?REV_MISSING ->
+ % we have the rev in our list but know nothing about it
+ {{not_found, missing}, {Pos, Rev}};
+ #leaf{deleted=IsDeleted, ptr=SummaryPtr} ->
+ {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)}
+ end
+ end, FoundRevs),
+ Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
+ {ok, Results};
+ not_found when Revs == all ->
+ {ok, []};
+ not_found ->
+ {ok, [{{not_found, missing}, Rev} || Rev <- Revs]}
+ end
+ end,
+ IdRevs, LookupResults).
+
+open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, _Options) ->
+ case couch_btree:lookup(Db#db.local_tree, [Id]) of
+ [{ok, {_, {Rev, BodyData}}}] ->
+ {ok, #doc{id=Id, revs={0, [list_to_binary(integer_to_list(Rev))]}, body=BodyData}};
+ [not_found] ->
+ {not_found, missing}
+ end;
+open_doc_int(Db, #doc_info{id=Id,revs=[RevInfo|_]}=DocInfo, Options) ->
+ #rev_info{deleted=IsDeleted,rev={Pos,RevId},body_sp=Bp} = RevInfo,
+ Doc = make_doc(Db, Id, IsDeleted, Bp, {Pos,[RevId]}),
+ {ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}};
+open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
+ #doc_info{revs=[#rev_info{deleted=IsDeleted,rev=Rev,body_sp=Bp}|_]} =
+ DocInfo = couch_doc:to_doc_info(FullDocInfo),
+ {[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]),
+ Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath),
+ {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}};
+open_doc_int(Db, Id, Options) ->
+ case get_full_doc_info(Db, Id) of
+ {ok, FullDocInfo} ->
+ open_doc_int(Db, FullDocInfo, Options);
+ not_found ->
+ {not_found, missing}
+ end.
+
+doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTree, Options) ->
+ case lists:member(revs_info, Options) of
+ false -> [];
+ true ->
+ {[{Pos, RevPath}],[]} =
+ couch_key_tree:get_full_key_paths(RevTree, [Rev]),
+
+ [{revs_info, Pos, lists:map(
+ fun({Rev1, #leaf{deleted=true}}) ->
+ {Rev1, deleted};
+ ({Rev1, #leaf{deleted=false}}) ->
+ {Rev1, available};
+ ({Rev1, ?REV_MISSING}) ->
+ {Rev1, missing}
+ end, RevPath)}]
+ end ++
+ case lists:member(conflicts, Options) of
+ false -> [];
+ true ->
+ case [Rev1 || #rev_info{rev=Rev1,deleted=false} <- RestInfo] of
+ [] -> [];
+ ConflictRevs -> [{conflicts, ConflictRevs}]
+ end
+ end ++
+ case lists:member(deleted_conflicts, Options) of
+ false -> [];
+ true ->
+ case [Rev1 || #rev_info{rev=Rev1,deleted=true} <- RestInfo] of
+ [] -> [];
+ DelConflictRevs -> [{deleted_conflicts, DelConflictRevs}]
+ end
+ end ++
+ case lists:member(local_seq, Options) of
+ false -> [];
+ true -> [{local_seq, Seq}]
+ end.
+
+read_doc(#db{fd=Fd}, OldStreamPointer) when is_tuple(OldStreamPointer) ->
+ % 09 UPGRADE CODE
+ couch_stream:old_read_term(Fd, OldStreamPointer);
+read_doc(#db{fd=Fd}, Pos) ->
+ couch_file:pread_term(Fd, Pos).
+
+
+doc_to_tree(#doc{revs={Start, RevIds}}=Doc) ->
+ [Tree] = doc_to_tree_simple(Doc, lists:reverse(RevIds)),
+ {Start - length(RevIds) + 1, Tree}.
+
+
+doc_to_tree_simple(Doc, [RevId]) ->
+ [{RevId, Doc, []}];
+doc_to_tree_simple(Doc, [RevId | Rest]) ->
+ [{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}].
+
+
+make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
+ {BodyData, Atts} =
+ case Bp of
+ nil ->
+ {[], []};
+ _ ->
+ {ok, {BodyData0, Atts0}} = read_doc(Db, Bp),
+ {BodyData0,
+ lists:map(
+ fun({Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
+ #att{name=Name,
+ type=Type,
+ att_len=AttLen,
+ disk_len=DiskLen,
+ md5=Md5,
+ revpos=RevPos,
+ data={Fd,Sp},
+ encoding=
+ case Enc of
+ true ->
+ % 0110 UPGRADE CODE
+ gzip;
+ false ->
+ % 0110 UPGRADE CODE
+ identity;
+ _ ->
+ Enc
+ end
+ };
+ ({Name,Type,Sp,AttLen,RevPos,Md5}) ->
+ #att{name=Name,
+ type=Type,
+ att_len=AttLen,
+ disk_len=AttLen,
+ md5=Md5,
+ revpos=RevPos,
+ data={Fd,Sp}};
+ ({Name,{Type,Sp,AttLen}}) ->
+ #att{name=Name,
+ type=Type,
+ att_len=AttLen,
+ disk_len=AttLen,
+ md5= <<>>,
+ revpos=0,
+ data={Fd,Sp}}
+ end, Atts0)}
+ end,
+ #doc{
+ id = Id,
+ revs = RevisionPath,
+ body = BodyData,
+ atts = Atts,
+ deleted = Deleted
+ }.
+
+
+increment_stat(#db{is_sys_db = true}, _Stat) ->
+ ok;
+increment_stat(#db{}, Stat) ->
+ couch_stats_collector:increment(Stat).