author     John Christopher Anderson <jchris@apache.org>  2009-03-13 22:15:34 +0000
committer  John Christopher Anderson <jchris@apache.org>  2009-03-13 22:15:34 +0000
commit     9007e2d21dea8b0185c0096b30364a8ee40a3867 (patch)
tree       7d8dacb2c8cd619f18dfab8fdb40d146ac28c85a /src
parent     65608e14e8911b33c30178d717d745edc9f66c17 (diff)
Commit Damien's rep_security branch to trunk.
Changes bulk_docs conflict checking. Breaks file format, see mailing list for data upgrade procedure, or http://wiki.apache.org/couchdb/Breaking_changes
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@753448 13f79535-47bb-0310-9956-ffa450edef68
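
The heart of the format break: in-memory revisions change from a flat list of rev ids to a {Start, RevIds} pair, serialized as "Pos-RevId" strings. A minimal sketch of the two shapes (ids made up; rev_to_str/1 and parse_rev/1 are added to couch_doc.erl in this diff):

    %% Old shape: newest-first list of opaque rev ids.
    %% New shape: {Start, RevIds}, where Start is the generation (depth)
    %% of the newest rev, so this doc is at rev 2-946B7D1C.
    rev_shape_demo() ->
        OldRevs = [<<"946B7D1C">>, <<"2FA1B5A8">>],
        NewRevs = {2, [<<"946B7D1C">>, <<"2FA1B5A8">>]},
        <<"2-946B7D1C">> = couch_doc:rev_to_str({2, <<"946B7D1C">>}),
        {2, <<"946B7D1C">>} = couch_doc:parse_rev(<<"2-946B7D1C">>),
        {OldRevs, NewRevs}.
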
Diffstat (limited to 'src')
-rw-r--r--  src/couchdb/couch_db.erl                    | 395
-rw-r--r--  src/couchdb/couch_db.hrl                    |  24
-rw-r--r--  src/couchdb/couch_db_updater.erl            | 147
-rw-r--r--  src/couchdb/couch_doc.erl                   | 185
-rw-r--r--  src/couchdb/couch_httpd.erl                 | 120
-rw-r--r--  src/couchdb/couch_httpd_db.erl              | 285
-rw-r--r--  src/couchdb/couch_httpd_misc_handlers.erl   |  55
-rw-r--r--  src/couchdb/couch_httpd_show.erl            |  15
-rw-r--r--  src/couchdb/couch_httpd_view.erl            |  12
-rw-r--r--  src/couchdb/couch_key_tree.erl              | 375
-rw-r--r--  src/couchdb/couch_rep.erl                   | 365
-rw-r--r--  src/couchdb/couch_util.erl                  |  15
12 files changed, 1226 insertions, 767 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 92026ff1..46de8745 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -17,6 +17,7 @@
-export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]).
-export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
+-export([set_revs_limit/2,get_revs_limit/1]).
-export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]).
-export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]).
-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
@@ -184,22 +185,42 @@ get_db_info(Db) ->
],
{ok, InfoList}.
+check_is_admin(#db{admins=Admins, user_ctx=#user_ctx{name=Name,roles=Roles}}) ->
+ DbAdmins = [<<"_admin">> | Admins],
+ case DbAdmins -- [Name | Roles] of
+ DbAdmins -> % same list, not an admin
+ throw({unauthorized, <<"You are not a db or server admin.">>});
+ _ ->
+ ok
+ end.
+
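
check_is_admin/1 leans on Erlang list subtraction: X -- Y removes the first occurrence of each element of Y from X, so the result equals DbAdmins only when neither the user's name nor any of the roles appears in it. A shell sketch:

    1> [<<"_admin">>, <<"bob">>] -- [<<"alice">>, <<"dev">>].
    [<<"_admin">>,<<"bob">>]
    2> [<<"_admin">>, <<"bob">>] -- [<<"bob">>, <<"dev">>].
    [<<"_admin">>]

The first result is unchanged, so the caller is rejected; the second shrank, so the caller is a db admin and the function returns ok.
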
get_admins(#db{admins=Admins}) ->
Admins.
-set_admins(#db{update_pid=UpdatePid,user_ctx=Ctx},
- Admins) when is_list(Admins) ->
- case gen_server:call(UpdatePid, {set_admins, Admins, Ctx}, infinity) of
- ok -> ok;
- Error -> throw(Error)
- end.
+set_admins(#db{update_pid=Pid}=Db, Admins) when is_list(Admins) ->
+ check_is_admin(Db),
+ gen_server:call(Pid, {set_admins, Admins}, infinity).
+
+
+get_revs_limit(#db{revs_limit=Limit}) ->
+ Limit.
+
+set_revs_limit(#db{update_pid=Pid}=Db, Limit) when Limit > 0 ->
+ check_is_admin(Db),
+ gen_server:call(Pid, {set_revs_limit, Limit}, infinity);
+set_revs_limit(_Db, _Limit) ->
+ throw(invalid_revs_limit).
name(#db{name=Name}) ->
Name.
update_doc(Db, Doc, Options) ->
- {ok, [NewRev]} = update_docs(Db, [Doc], Options),
- {ok, NewRev}.
+ case update_docs(Db, [Doc], Options) of
+ {ok, [{ok, NewRev}]} ->
+ {ok, NewRev};
+ {ok, [Error]} ->
+ throw(Error)
+ end.
update_docs(Db, Docs) ->
update_docs(Db, Docs, []).
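
update_docs/2,3 now answers with one result per input document, either {ok, NewRev} or an error term such as conflict, instead of a bare list of revs; update_doc/3 above just unwraps the single result and rethrows. A hypothetical caller handling both outcomes (assumes couch_db.hrl for #doc{}):

    save_each(Db, Docs) ->
        {ok, Results} = couch_db:update_docs(Db, Docs, []),
        lists:zipwith(
            fun(#doc{id=Id}, {ok, NewRev}) -> {Id, {saved, NewRev}};
               (#doc{id=Id}, Error)        -> {Id, {failed, Error}}
            end, Docs, Results).
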
@@ -227,146 +248,283 @@ group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
validate_doc_update(#db{user_ctx=UserCtx, admins=Admins},
- #doc{id= <<"_design/",_/binary>>}=Doc, _GetDiskDocFun) ->
+ #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) ->
UserNames = [UserCtx#user_ctx.name | UserCtx#user_ctx.roles],
% if the user is a server admin or db admin, allow the save
case length(UserNames -- [<<"_admin">> | Admins]) == length(UserNames) of
true ->
% not an admin
- throw({unauthorized, <<"You are not a server or database admin.">>});
+ {unauthorized, <<"You are not a server or database admin.">>};
false ->
- Doc
+ ok
end;
-validate_doc_update(#db{validate_doc_funs=[]}, Doc, _GetDiskDocFun) ->
- Doc;
-validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}=Doc, _GetDiskDocFun) ->
- Doc;
+validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) ->
+ ok;
+validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->
+ ok;
validate_doc_update(#db{name=DbName,user_ctx=Ctx}=Db, Doc, GetDiskDocFun) ->
DiskDoc = GetDiskDocFun(),
JsonCtx = {[{<<"db">>, DbName},
{<<"name">>,Ctx#user_ctx.name},
{<<"roles">>,Ctx#user_ctx.roles}]},
- [case Fun(Doc, DiskDoc, JsonCtx) of
- ok -> ok;
- Error -> throw(Error)
- end || Fun <- Db#db.validate_doc_funs],
- Doc.
+ try [case Fun(Doc, DiskDoc, JsonCtx) of
+ ok -> ok;
+ Error -> throw(Error)
+ end || Fun <- Db#db.validate_doc_funs],
+ ok
+ catch
+ throw:Error ->
+ Error
+ end.
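
Note that validation funs themselves still signal rejection by throwing; the try/catch above only converts the throw into a return value so callers can collect per-document errors. A hypothetical validate_doc_fun in this style, with the JsonCtx shape taken from the code above (assuming the anonymous user's name is null):

    fun(_NewDoc, _DiskDoc, {UserCtx}) ->
        %% UserCtx is the proplist [{<<"db">>,_}, {<<"name">>,_}, {<<"roles">>,_}]
        case proplists:get_value(<<"name">>, UserCtx) of
            null -> throw({unauthorized, <<"Please log in.">>});
            _    -> ok
        end
    end
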
-prep_and_validate_new_edit(Db, #doc{id=Id,revs=[NewRev|PrevRevs]}=Doc,
+prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, [_NewRev|PrevRevs]}}=Doc,
OldFullDocInfo, LeafRevsDict) ->
case PrevRevs of
[PrevRev|_] ->
- case dict:find(PrevRev, LeafRevsDict) of
- {ok, {Deleted, Sp, DiskRevs}} ->
- Doc2 = Doc#doc{revs=[NewRev|DiskRevs]},
- case couch_doc:has_stubs(Doc2) of
+ case dict:find({RevStart-1, PrevRev}, LeafRevsDict) of
+ {ok, {Deleted, DiskSp, DiskRevs}} ->
+ case couch_doc:has_stubs(Doc) of
true ->
- DiskDoc = make_doc(Db, Id, Deleted, Sp, DiskRevs),
- Doc3 = couch_doc:merge_stubs(Doc2, DiskDoc),
- validate_doc_update(Db, Doc3, fun() -> DiskDoc end);
+ DiskDoc = make_doc(Db, Id, Deleted, DiskSp, DiskRevs),
+ Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
+ {validate_doc_update(Db, Doc2, fun() -> DiskDoc end), Doc2};
false ->
- LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,Sp,DiskRevs) end,
- validate_doc_update(Db, Doc2, LoadDiskDoc)
+ LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,DiskSp,DiskRevs) end,
+ {validate_doc_update(Db, Doc, LoadDiskDoc), Doc}
end;
error ->
- throw(conflict)
+ {conflict, Doc}
end;
[] ->
% new doc, and we have existing revs.
if OldFullDocInfo#full_doc_info.deleted ->
% existing docs are deletions
- validate_doc_update(Db, Doc, nil);
+ {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
true ->
- throw(conflict)
+ {conflict, Doc}
end
end.
+
+
+prep_and_validate_updates(_Db, [], [], AccPrepped, AccFatalErrors) ->
+ {AccPrepped, AccFatalErrors};
+prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], AccPrepped, AccErrors) ->
+ [#doc{id=Id}|_]=DocBucket,
+ % no existing revs are known,
+ {PreppedBucket, AccErrors3} = lists:foldl(
+ fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
+ case Revs of
+ {Pos, [NewRev,_OldRev|_]} ->
+ % old revs specified but none exist, a conflict
+ {AccBucket, [{{Id, {Pos, NewRev}}, conflict} | AccErrors2]};
+ {Pos, [NewRev]} ->
+ case validate_doc_update(Db, Doc, fun() -> nil end) of
+ ok ->
+ {[Doc | AccBucket], AccErrors2};
+ Error ->
+ {AccBucket, [{{Id, {Pos, NewRev}}, Error} | AccErrors2]}
+ end
+ end
+ end,
+ {[], AccErrors}, DocBucket),
+
+ prep_and_validate_updates(Db, RestBuckets, RestLookups,
+ [PreppedBucket | AccPrepped], AccErrors3);
+prep_and_validate_updates(Db, [DocBucket|RestBuckets],
+ [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
+ AccPrepped, AccErrors) ->
+ Leafs = couch_key_tree:get_all_leafs(OldRevTree),
+ LeafRevsDict = dict:from_list([{{Start, RevId}, {Deleted, Sp, Revs}} ||
+ {{Deleted, Sp}, {Start, [RevId|_]}=Revs} <- Leafs]),
+ {PreppedBucket, AccErrors3} = lists:foldl(
+ fun(Doc, {Docs2Acc, AccErrors2}) ->
+ case prep_and_validate_update(Db, Doc, OldFullDocInfo,
+ LeafRevsDict) of
+ {ok, Doc} ->
+ {[Doc | Docs2Acc], AccErrors2};
+ {Error, #doc{id=Id,revs={Pos, [NewRev|_]}}} ->
+ % Record the error
+ {Docs2Acc, [{{Id, {Pos, NewRev}}, Error} |AccErrors2]}
+ end
+ end,
+ {[], AccErrors}, DocBucket),
+ prep_and_validate_updates(Db, RestBuckets, RestLookups, [PreppedBucket | AccPrepped], AccErrors3).
+
+
update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
- update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options, true).
+ update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options, interactive_edit).
-update_docs(Db, Docs, Options, false) ->
+
+prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) ->
+ Errors2 = [{{Id, {Pos, Rev}}, Error} ||
+ {#doc{id=Id,revs={Pos,[Rev|_]}}, Error} <- AccErrors],
+ {lists:reverse(AccPrepped), lists:reverse(Errors2)};
+prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldInfo], AccPrepped, AccErrors) ->
+ case OldInfo of
+ not_found ->
+ {ValidatedBucket, AccErrors3} = lists:foldl(
+ fun(Doc, {AccPrepped2, AccErrors2}) ->
+ case validate_doc_update(Db, Doc, fun() -> nil end) of
+ ok ->
+ {[Doc | AccPrepped2], AccErrors2};
+ Error ->
+ {AccPrepped2, [{Doc, Error} | AccErrors2]}
+ end
+ end,
+ {[], AccErrors}, Bucket),
+ prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
+ {ok, #full_doc_info{rev_tree=OldTree}} ->
+ NewRevTree = lists:foldl(
+ fun(NewDoc, AccTree) ->
+ {NewTree, _} = couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]),
+ NewTree
+ end,
+ OldTree, Bucket),
+ Leafs = couch_key_tree:get_all_leafs_full(NewRevTree),
+ LeafRevsFullDict = dict:from_list( [{{Start, RevId}, FullPath} || {Start, [{RevId, _}|_]}=FullPath <- Leafs]),
+ {ValidatedBucket, AccErrors3} =
+ lists:foldl(
+ fun(#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, {AccValidated, AccErrors2}) ->
+ case dict:find({Pos, RevId}, LeafRevsFullDict) of
+ {ok, {Start, Path}} ->
+ % our unflushed doc is a leaf node. Go back on the path
+ % to find the previous rev that's on disk.
+ PrevRevResult =
+ case couch_doc:has_stubs(Doc) of
+ true ->
+ [_PrevRevFull | [PrevRevFull | _]=PrevPath] = Path,
+ case PrevRevFull of
+ {_RevId, ?REV_MISSING} ->
+ conflict;
+ {RevId, {IsDel, DiskSp}} ->
+ DiskDoc = make_doc(Db, Id, IsDel, DiskSp, PrevPath),
+ Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
+ {ok, Doc2, fun() -> DiskDoc end}
+ end;
+ false ->
+ {ok, Doc,
+ fun() ->
+ make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
+ end}
+ end,
+ case PrevRevResult of
+ {ok, NewDoc, LoadPrevRevFun} ->
+ case validate_doc_update(Db, NewDoc, LoadPrevRevFun) of
+ ok ->
+ {[NewDoc | AccValidated], AccErrors2};
+ Error ->
+ {AccValidated, [{NewDoc, Error} | AccErrors2]}
+ end;
+ Error ->
+ {AccValidated, [{Doc, Error} | AccErrors2]}
+ end;
+ _ ->
+ % this doc isn't a leaf or already exists in the tree.
+ % ignore but consider it a success.
+ {AccValidated, AccErrors2}
+ end
+ end,
+ {[], AccErrors}, Bucket),
+ prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3)
+ end.
+
+update_docs(Db, Docs, Options, replicated_changes) ->
couch_stats_collector:increment({couchdb, database_writes}),
DocBuckets = group_alike_docs(Docs),
- Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
- ExistingDocs = get_full_doc_infos(Db, Ids),
+ case (Db#db.validate_doc_funs /= []) orelse
+ lists:any(
+ fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true;
+ (_) -> false
+ end, Docs) of
+ true ->
+ Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+ ExistingDocs = get_full_doc_infos(Db, Ids),
- DocBuckets2 = lists:zipwith(
- fun(Bucket, not_found) ->
- [validate_doc_update(Db, Doc, fun()-> nil end) || Doc <- Bucket];
- (Bucket, {ok, #full_doc_info{rev_tree=OldRevTree}}) ->
- NewTree = lists:foldl(
- fun(Doc, RevTreeAcc) ->
- couch_key_tree:merge(RevTreeAcc, doc_to_tree(Doc))
- end,
- OldRevTree, Bucket),
- Leafs = couch_key_tree:get_all_leafs_full(NewTree),
- LeafRevsFullDict = dict:from_list( [{Rev, FullPath} || [{Rev, _}|_]=FullPath <- Leafs]),
- lists:flatmap(
- fun(#doc{revs=[Rev|_]}=Doc) ->
- case dict:find(Rev, LeafRevsFullDict) of
- {ok, [{Rev, #doc{id=Id}}|_]=Path} ->
- % our unflushed doc is a leaf node. Go back on the path
- % to find the previous rev that's on disk.
- LoadPrevRev = fun() ->
- make_first_doc_on_disk(Db, Id, Path)
- end,
- [validate_doc_update(Db, Doc, LoadPrevRev)];
- _ ->
- % this doc isn't a leaf or is already exists in the tree. ignore
- []
- end
- end, Bucket)
- end,
- DocBuckets, ExistingDocs),
- write_and_commit(Db, DocBuckets2, Options);
+ {DocBuckets2, DocErrors} =
+ prep_and_validate_replicated_updates(Db, DocBuckets, ExistingDocs, [], []),
+ DocBuckets3 = [Bucket || [_|_]=Bucket <- DocBuckets2]; % remove empty buckets
+ false ->
+ DocErrors = [],
+ DocBuckets3 = DocBuckets
+ end,
+ {ok, []} = write_and_commit(Db, DocBuckets3, [merge_conflicts | Options]),
+ {ok, DocErrors};
-update_docs(Db, Docs, Options, true) ->
+update_docs(Db, Docs, Options, interactive_edit) ->
couch_stats_collector:increment({couchdb, database_writes}),
-
+ AllOrNothing = lists:member(all_or_nothing, Options),
% go ahead and generate the new revision ids for the documents.
Docs2 = lists:map(
- fun(#doc{id=Id,revs=Revs}=Doc) ->
+ fun(#doc{id=Id,revs={Start, RevIds}}=Doc) ->
case Id of
<<?LOCAL_DOC_PREFIX, _/binary>> ->
- Rev = case Revs of [] -> 0; [Rev0|_] -> list_to_integer(binary_to_list(Rev0)) end,
- Doc#doc{revs=[list_to_binary(integer_to_list(Rev + 1))]};
+ Rev = case RevIds of [] -> 0; [Rev0|_] -> list_to_integer(?b2l(Rev0)) end,
+ Doc#doc{revs={Start, [?l2b(integer_to_list(Rev + 1))]}};
_ ->
- Doc#doc{revs=[list_to_binary(integer_to_list(couch_util:rand32())) | Revs]}
+ Doc#doc{revs={Start+1, [?l2b(integer_to_list(couch_util:rand32())) | RevIds]}}
end
end, Docs),
DocBuckets = group_alike_docs(Docs2),
- Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
-
- % lookup the doc by id and get the most recent
-
- ExistingDocs = get_full_doc_infos(Db, Ids),
- DocBuckets2 = lists:zipwith(
- fun(Bucket, not_found) ->
- % no existing revs on disk, make sure no old revs specified.
- [throw(conflict) || #doc{revs=[_NewRev, _OldRev | _]} <- Bucket],
- [validate_doc_update(Db, Doc, fun()-> nil end) || Doc <- Bucket];
- (Bucket, {ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}) ->
- Leafs = couch_key_tree:get_all_leafs(OldRevTree),
- LeafRevsDict = dict:from_list([{Rev, {Deleted, Sp, Revs}} || {Rev, {Deleted, Sp}, Revs} <- Leafs]),
- [prep_and_validate_new_edit(Db, Doc, OldFullDocInfo, LeafRevsDict) || Doc <- Bucket]
+ case (Db#db.validate_doc_funs /= []) orelse
+ lists:any(
+ fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
+ true;
+ (#doc{attachments=Atts}) ->
+ Atts /= []
+ end, Docs) of
+ true ->
+ % lookup the doc by id and get the most recent
+ Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+ ExistingDocInfos = get_full_doc_infos(Db, Ids),
+
+ {DocBucketsPrepped, Failures} =
+ case AllOrNothing of
+ true ->
+ prep_and_validate_replicated_updates(Db, DocBuckets,
+ ExistingDocInfos, [], []);
+ false ->
+ prep_and_validate_updates(Db, DocBuckets, ExistingDocInfos, [], [])
end,
- DocBuckets, ExistingDocs),
- ok = write_and_commit(Db, DocBuckets2, [new_edits | Options]),
- {ok, [NewRev ||#doc{revs=[NewRev|_]} <- Docs2]}.
+
+ % strip out any empty buckets
+ DocBuckets2 = [Bucket || [_|_] = Bucket <- DocBucketsPrepped];
+ false ->
+ Failures = [],
+ DocBuckets2 = DocBuckets
+ end,
+ if (AllOrNothing) and (Failures /= []) ->
+ {aborted, Failures};
+ true ->
+ Options2 = if AllOrNothing -> [merge_conflicts];
+ true -> [] end ++ Options,
+ {ok, CommitFailures} = write_and_commit(Db, DocBuckets2, Options2),
+ FailDict = dict:from_list(CommitFailures ++ Failures),
+ % the output for each is either {ok, NewRev} or Error
+ {ok, lists:map(
+ fun(#doc{id=Id,revs={Pos, [NewRevId|_]}}) ->
+ case dict:find({Id, {Pos, NewRevId}}, FailDict) of
+ {ok, Error} ->
+ Error;
+ error ->
+ {ok, {Pos, NewRevId}}
+ end
+ end, Docs2)}
+ end.
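
The interactive path now reports per-document outcomes instead of throwing on the first conflict, and all_or_nothing either admits conflicting revisions as siblings or aborts the whole batch. Hypothetical result shapes (Db, DocA, DocB bound elsewhere; DocB's parent rev is stale in the first call and DocB fails a validation fun in the second):

    {ok, [{ok, {1, <<"967A1E2B">>}}, conflict]} =
        couch_db:update_docs(Db, [DocA, DocB], []),
    {aborted, [{{<<"b">>, {1, <<"BAADF00D">>}}, {forbidden, <<"nope">>}}]} =
        couch_db:update_docs(Db, [DocA, DocB], [all_or_nothing]).
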
% Returns the first available document on disk. Input list is a full rev path
% for the doc.
-make_first_doc_on_disk(_Db, _Id, []) ->
+make_first_doc_on_disk(_Db, _Id, _Pos, []) ->
nil;
-make_first_doc_on_disk(Db, Id, [{_Rev, ?REV_MISSING}|RestPath]) ->
- make_first_doc_on_disk(Db, Id, RestPath);
-make_first_doc_on_disk(Db, Id, [{_Rev, {IsDel, Sp}} |_]=DocPath) ->
+make_first_doc_on_disk(Db, Id, Pos, [{_Rev, ?REV_MISSING}|RestPath]) ->
+ make_first_doc_on_disk(Db, Id, Pos - 1, RestPath);
+make_first_doc_on_disk(Db, Id, Pos, [{_Rev, {IsDel, Sp}} |_]=DocPath) ->
Revs = [Rev || {Rev, _} <- DocPath],
- make_doc(Db, Id, IsDel, Sp, Revs).
+ make_doc(Db, Id, IsDel, Sp, {Pos, Revs}).
write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets,
@@ -374,20 +532,18 @@ write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets,
% flush unwritten binaries to disk.
DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
case gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity) of
- ok -> ok;
+ {ok, Conflicts} -> {ok, Conflicts};
retry ->
% This can happen if the db file we wrote to was swapped out by
- % compaction. Retry writing to the current file
+ % compaction. Retry by reopening the db and writing to the current file
{ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx),
DocBuckets3 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
% We only retry once
close(Db2),
case gen_server:call(UpdatePid, {update_docs, DocBuckets3, Options}, infinity) of
- ok -> ok;
- Else -> throw(Else)
- end;
- Else->
- throw(Else)
+ {ok, Conflicts} -> {ok, Conflicts};
+ retry -> throw({update_error, compaction_retry})
+ end
end.
@@ -506,7 +662,7 @@ enum_docs_reduce_to_count(Reds) ->
{Count, _DelCount} = couch_btree:final_reduce(
fun couch_db_updater:btree_by_id_reduce/2, Reds),
Count.
-
+
count_changes_since(Db, SinceSeq) ->
{ok, Changes} =
couch_btree:fold_reduce(Db#db.docinfo_by_seq_btree,
@@ -518,7 +674,7 @@ count_changes_since(Db, SinceSeq) ->
end,
ok),
Changes.
-
+
enum_docs_since(Db, SinceSeq, Direction, InFun, Ctx) ->
couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx).
@@ -594,11 +750,11 @@ open_doc_revs_int(Db, IdRevs, Options) ->
end
end,
FoundResults =
- lists:map(fun({Rev, Value, FoundRevPath}) ->
+ lists:map(fun({Value, {Pos, [Rev|_]}=FoundRevPath}) ->
case Value of
?REV_MISSING ->
% we have the rev in our list but know nothing about it
- {{not_found, missing}, Rev};
+ {{not_found, missing}, {Pos, Rev}};
{IsDeleted, SummaryPtr} ->
{ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)}
end
@@ -616,18 +772,18 @@ open_doc_revs_int(Db, IdRevs, Options) ->
open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, _Options) ->
case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of
[{ok, {_, {Rev, BodyData}}}] ->
- {ok, #doc{id=Id, revs=[list_to_binary(integer_to_list(Rev))], body=BodyData}};
+ {ok, #doc{id=Id, revs={0, [list_to_binary(integer_to_list(Rev))]}, body=BodyData}};
[not_found] ->
{not_found, missing}
end;
-open_doc_int(Db, #doc_info{id=Id,rev=Rev,deleted=IsDeleted,summary_pointer=Sp}=DocInfo, Options) ->
- Doc = make_doc(Db, Id, IsDeleted, Sp, [Rev]),
+open_doc_int(Db, #doc_info{id=Id,rev={Pos,RevId},deleted=IsDeleted,summary_pointer=Sp}=DocInfo, Options) ->
+ Doc = make_doc(Db, Id, IsDeleted, Sp, {Pos,[RevId]}),
{ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}};
open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
#doc_info{deleted=IsDeleted,rev=Rev,summary_pointer=Sp} = DocInfo =
couch_doc:to_doc_info(FullDocInfo),
- {[{_Rev,_Value, Revs}], []} = couch_key_tree:get(RevTree, [Rev]),
- Doc = make_doc(Db, Id, IsDeleted, Sp, Revs),
+ {[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]),
+ Doc = make_doc(Db, Id, IsDeleted, Sp, RevPath),
{ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}};
open_doc_int(Db, Id, Options) ->
case get_full_doc_info(Db, Id) of
@@ -641,9 +797,10 @@ doc_meta_info(DocInfo, RevTree, Options) ->
case lists:member(revs_info, Options) of
false -> [];
true ->
- {[RevPath],[]} =
+ {[{Pos, RevPath}],[]} =
couch_key_tree:get_full_key_paths(RevTree, [DocInfo#doc_info.rev]),
- [{revs_info, lists:map(
+
+ [{revs_info, Pos, lists:map(
fun({Rev, {true, _Sp}}) ->
{Rev, deleted};
({Rev, {false, _Sp}}) ->
@@ -670,13 +827,15 @@ doc_meta_info(DocInfo, RevTree, Options) ->
end.
-doc_to_tree(Doc) ->
- doc_to_tree(Doc, lists:reverse(Doc#doc.revs)).
+doc_to_tree(#doc{revs={Start, RevIds}}=Doc) ->
+ [Tree] = doc_to_tree_simple(Doc, lists:reverse(RevIds)),
+ {Start - length(RevIds) + 1, Tree}.
+
-doc_to_tree(Doc, [RevId]) ->
+doc_to_tree_simple(Doc, [RevId]) ->
[{RevId, Doc, []}];
-doc_to_tree(Doc, [RevId | Rest]) ->
- [{RevId, ?REV_MISSING, doc_to_tree(Doc, Rest)}].
+doc_to_tree_simple(Doc, [RevId | Rest]) ->
+ [{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}].
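
doc_to_tree/1 now returns a {StartPos, Tree} pair whose root sits at the generation of the oldest rev the doc knows about; only the leaf holds the document, interior nodes are ?REV_MISSING. A sketch (assumes couch_db.hrl for #doc{} and ?REV_MISSING):

    Doc = #doc{revs = {3, [<<"c">>, <<"b">>, <<"a">>]}},
    {1, {<<"a">>, ?REV_MISSING,
         [{<<"b">>, ?REV_MISSING,
           [{<<"c">>, Doc, []}]}]}} = couch_db:doc_to_tree(Doc).
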
make_doc(Db, FullDocInfo) ->
{#doc_info{id=Id,deleted=Deleted,summary_pointer=Sp}, RevPath}
@@ -703,4 +862,4 @@ make_doc(#db{fd=Fd}=Db, Id, Deleted, BodySp, RevisionPath) ->
}.
-
\ No newline at end of file
+
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index 026afe14..f460f450 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -69,7 +69,7 @@
-record(doc,
{
id = <<"">>,
- revs = [],
+ revs = {0, []},
% the json body object.
body = {[]},
@@ -104,7 +104,7 @@
% if the disk revision is incremented, then new upgrade logic will need to be
% added to couch_db_updater:init_db.
--define(LATEST_DISK_VERSION, 0).
+-define(LATEST_DISK_VERSION, 1).
-record(db_header,
{disk_version = ?LATEST_DISK_VERSION,
@@ -115,13 +115,14 @@
local_docs_btree_state = nil,
purge_seq = 0,
purged_docs = nil,
- admins_ptr = nil
+ admins_ptr = nil,
+ revs_limit = 1000
}).
-record(db,
- {main_pid=nil,
- update_pid=nil,
- compactor_pid=nil,
+ {main_pid = nil,
+ update_pid = nil,
+ compactor_pid = nil,
instance_start_time, % number of microsecs since jan 1 1970 as a binary string
fd,
fd_ref_counter,
@@ -133,11 +134,12 @@
update_seq,
name,
filepath,
- validate_doc_funs=[],
- admins=[],
- admins_ptr=nil,
- user_ctx=#user_ctx{},
- waiting_delayed_commit=nil
+ validate_doc_funs = [],
+ admins = [],
+ admins_ptr = nil,
+ user_ctx = #user_ctx{},
+ waiting_delayed_commit = nil,
+ revs_limit = 1000
}).
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index 7790d7a4..7752a577 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -44,15 +44,13 @@ handle_call(get_db, _From, Db) ->
{reply, {ok, Db}, Db};
handle_call({update_docs, DocActions, Options}, _From, Db) ->
try update_docs_int(Db, DocActions, Options) of
- {ok, Db2} ->
+ {ok, Conflicts, Db2} ->
ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
couch_db_update_notifier:notify({updated, Db2#db.name}),
- {reply, ok, Db2}
+ {reply, {ok, Conflicts}, Db2}
catch
throw: retry ->
- {reply, retry, Db};
- throw: conflict ->
- {reply, conflict, Db}
+ {reply, retry, Db}
end;
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
{reply, ok, Db}; % no data waiting, return ok immediately
@@ -64,18 +62,18 @@ handle_call(increment_update_seq, _From, Db) ->
couch_db_update_notifier:notify({updated, Db#db.name}),
{reply, {ok, Db2#db.update_seq}, Db2};
-handle_call({set_admins, NewAdmins, #user_ctx{roles=Roles}}, _From, Db) ->
- DbAdmins = [<<"_admin">> | Db#db.admins],
- case length(DbAdmins -- Roles) == length(DbAdmins) of
- true ->
- {reply, {unauthorized, <<"You are not a db or server admin.">>}, Db};
- false ->
- {ok, Ptr} = couch_file:append_term(Db#db.fd, NewAdmins),
- Db2 = commit_data(Db#db{admins=NewAdmins, admins_ptr=Ptr,
- update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
- {reply, ok, Db2}
- end;
+handle_call({set_admins, NewAdmins}, _From, Db) ->
+ {ok, Ptr} = couch_file:append_term(Db#db.fd, NewAdmins),
+ Db2 = commit_data(Db#db{admins=NewAdmins, admins_ptr=Ptr,
+ update_seq=Db#db.update_seq+1}),
+ ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ {reply, ok, Db2};
+
+handle_call({set_revs_limit, Limit}, _From, Db) ->
+ Db2 = commit_data(Db#db{revs_limit=Limit,
+ update_seq=Db#db.update_seq+1}),
+ ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ {reply, ok, Db2};
handle_call({purge_docs, _IdRevs}, _From,
#db{compactor_pid=Pid}=Db) when Pid /= nil ->
@@ -298,7 +296,8 @@ init_db(DbName, Filepath, Fd, Header0) ->
filepath = Filepath,
admins = Admins,
admins_ptr = AdminsPtr,
- instance_start_time = StartTime
+ instance_start_time = StartTime,
+ revs_limit = Header#db_header.revs_limit
}.
@@ -358,40 +357,31 @@ flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) ->
end, Unflushed),
flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
-merge_rev_trees(_NoConflicts, [], [], AccNewInfos, AccSeq) ->
- {ok, lists:reverse(AccNewInfos), AccSeq};
-merge_rev_trees(NoConflicts, [NewDocs|RestDocsList],
- [OldDocInfo|RestOldInfo], AccNewInfos, AccSeq) ->
- #full_doc_info{id=Id,rev_tree=OldTree}=OldDocInfo,
- UpdatesRevTree = lists:foldl(
- fun(NewDoc, AccTree) ->
- couch_key_tree:merge(AccTree, couch_db:doc_to_tree(NewDoc))
+merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccConflicts, AccSeq) ->
+ {ok, lists:reverse(AccNewInfos), AccConflicts, AccSeq};
+merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
+ [OldDocInfo|RestOldInfo], AccNewInfos, AccConflicts, AccSeq) ->
+ #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted}=OldDocInfo,
+ {NewRevTree, NewConflicts} = lists:foldl(
+ fun(#doc{revs={Pos,[Rev|_]}}=NewDoc, {AccTree, AccConflicts2}) ->
+ case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of
+ {_NewTree, conflicts}
+ when (not OldDeleted) and (not MergeConflicts) ->
+ {AccTree, [{{Id, {Pos,Rev}}, conflict} | AccConflicts2]};
+ {NewTree, _} ->
+ {NewTree, AccConflicts2}
+ end
end,
- [], NewDocs),
- NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree),
+ {OldTree, AccConflicts}, NewDocs),
if NewRevTree == OldTree ->
% nothing changed
- merge_rev_trees(NoConflicts, RestDocsList, RestOldInfo, AccNewInfos, AccSeq);
+ merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, AccNewInfos,
+ NewConflicts, AccSeq);
true ->
- if NoConflicts andalso OldTree /= [] ->
- OldConflicts = couch_key_tree:count_leafs(OldTree),
- NewConflicts = couch_key_tree:count_leafs(NewRevTree),
- if NewConflicts > OldConflicts ->
- % if all the old docs are deletions, allow this new conflict
- case [1 || {_Rev,{IsDel,_Sp},_Path} <-
- couch_key_tree:get_all_leafs(OldTree), IsDel==false] of
- [] ->
- ok;
- _ ->
- throw(conflict)
- end;
- true -> ok
- end;
- true -> ok
- end,
+ % we have updated the document, give it a new seq #
NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
- merge_rev_trees(NoConflicts, RestDocsList,RestOldInfo,
- [NewInfo|AccNewInfos],AccSeq+1)
+ merge_rev_trees(MergeConflicts, RestDocsList,RestOldInfo,
+ [NewInfo|AccNewInfos], NewConflicts, AccSeq+1)
end.
new_index_entries([], AccById, AccBySeq) ->
@@ -402,19 +392,23 @@ new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq) ->
[FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
[DocInfo|AccBySeq]).
+
+stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
+ [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
+ #full_doc_info{rev_tree=Tree}=Info <- DocInfos].
+
+
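
Stemming is what enforces the new revs_limit: before an updated tree is written, each rev path is cut back to at most Limit revs. A hypothetical single-path example with revs_limit = 2, assuming couch_key_tree:stem/2 keeps the newest revs and advances the start position accordingly:

    %% x stands in for the {IsDeleted, SummaryPointer} values.
    Tree = [{1, {<<"a">>, x, [{<<"b">>, x, [{<<"c">>, x, []}]}]}}],
    [{2, {<<"b">>, x, [{<<"c">>, x, []}]}}] = couch_key_tree:stem(Tree, 2).
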
update_docs_int(Db, DocsList, Options) ->
#db{
fulldocinfo_by_id_btree = DocInfoByIdBTree,
docinfo_by_seq_btree = DocInfoBySeqBTree,
update_seq = LastSeq
} = Db,
-
% separate out the NonRep documents from the rest of the documents
{DocsList2, NonRepDocs} = lists:foldl(
- fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
+ fun([#doc{id=Id}=Doc | _]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
case Id of
- <<?LOCAL_DOC_PREFIX, _/binary>> when Rest==[] ->
- % when saving NR (non rep) documents, you can only save a single rev
+ <<?LOCAL_DOC_PREFIX, _/binary>> ->
{DocsListAcc, [Doc | NonRepDocsAcc]};
Id->
{[Docs | DocsListAcc], NonRepDocsAcc}
@@ -434,23 +428,26 @@ update_docs_int(Db, DocsList, Options) ->
Ids, OldDocLookups),
% Merge the new docs into the revision trees.
- NoConflicts = lists:member(new_edits, Options),
- {ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq),
+ {ok, NewDocInfos0, Conflicts, NewSeq} = merge_rev_trees(
+ lists:member(merge_conflicts, Options),
+ DocsList2, OldDocInfos, [], [], LastSeq),
+
+ NewDocInfos = stem_full_doc_infos(Db, NewDocInfos0),
RemoveSeqs =
- [ OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups],
+ [OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups],
- % All regular documents are now ready to write.
+ % All documents are now ready to write.
- % Try to write the local documents first, a conflict might be generated
- {ok, Db2} = update_local_docs(Db, NonRepDocs),
+ {ok, LocalConflicts, Db2} = update_local_docs(Db, NonRepDocs),
- % Write out the document summaries (they are stored in the nodes of the rev trees)
+ % Write out the document summaries (the bodies are stored in the nodes of
+ % the trees, the attachments are already written to disk)
{ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []),
{ok, InfoById, InfoBySeq} = new_index_entries(FlushedDocInfos, [], []),
- % and the indexes to the documents
+ % and the indexes
{ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs),
{ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []),
@@ -459,6 +456,8 @@ update_docs_int(Db, DocsList, Options) ->
docinfo_by_seq_btree = DocInfoBySeqBTree2,
update_seq = NewSeq},
+ % Check if we just updated any design documents, and update the validation
+ % funs if we did.
case [1 || <<"_design/",_/binary>> <- Ids] of
[] ->
Db4 = Db3;
@@ -466,18 +465,15 @@ update_docs_int(Db, DocsList, Options) ->
Db4 = refresh_validate_doc_funs(Db3)
end,
- {ok, commit_data(Db4, not lists:member(full_commit, Options))}.
-
+ {ok, LocalConflicts ++ Conflicts,
+ commit_data(Db4, not lists:member(full_commit, Options))}.
+
update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
Ids = [Id || #doc{id=Id} <- Docs],
OldDocLookups = couch_btree:lookup(Btree, Ids),
BtreeEntries = lists:zipwith(
- fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) ->
- NewRev =
- case Revs of
- [] -> 0;
- [RevStr|_] -> list_to_integer(binary_to_list(RevStr))
- end,
+ fun(#doc{id=Id,deleted=Delete,revs={0,[RevStr]},body=Body}, OldDocLookup) ->
+ NewRev = list_to_integer(?b2l(RevStr)),
OldRev =
case OldDocLookup of
{ok, {_, {OldRev0, _}}} -> OldRev0;
@@ -490,18 +486,19 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
true -> {remove, Id}
end;
false ->
- throw(conflict)
+ {conflict, {Id, {0, RevStr}}}
end
end, Docs, OldDocLookups),
BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries],
-
+    Conflicts = [{IdRev, conflict} || {conflict, IdRev} <- BtreeEntries],
+
{ok, Btree2} =
couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
- {ok, Db#db{local_docs_btree = Btree2}}.
+ {ok, Conflicts, Db#db{local_docs_btree = Btree2}}.
commit_data(Db) ->
@@ -515,7 +512,8 @@ commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
- admins_ptr = Db#db.admins_ptr
+ admins_ptr = Db#db.admins_ptr,
+ revs_limit = Db#db.revs_limit
},
if Header == Header2 ->
Db;
@@ -549,6 +547,10 @@ copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
[];
+copy_rev_tree(SrcFd, DestFd, DestStream, [{Start, Tree} | RestTree]) ->
+    % root inner node, only copy info/data from leaf nodes
+ [Tree2] = copy_rev_tree(SrcFd, DestFd, DestStream, [Tree]),
+ [{Start, Tree2} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTree]) ->
% This is a leaf node, copy it over
NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
@@ -560,10 +562,11 @@ copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq, Retry) ->
Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
- NewFullDocInfos = lists:map(
+ NewFullDocInfos0 = lists:map(
fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
end, LookupResults),
+ NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos0),
NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
RemoveSeqs =
case Retry of
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl
index 9860ac0c..fc817d56 100644
--- a/src/couchdb/couch_doc.erl
+++ b/src/couchdb/couch_doc.erl
@@ -12,41 +12,53 @@
-module(couch_doc).
--export([to_doc_info/1,to_doc_info_path/1]).
+-export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,rev_to_strs/1]).
-export([bin_foldl/3,bin_size/1,bin_to_binary/1,get_validate_doc_fun/1]).
-export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]).
-include("couch_db.hrl").
% helpers used by to_json_obj
-to_json_rev([]) ->
+to_json_rev(0, []) ->
[];
-to_json_rev(Revs) ->
- [{<<"_rev">>, lists:nth(1, Revs)}].
+to_json_rev(Start, [FirstRevId|_]) ->
+ [{<<"_rev">>, ?l2b([integer_to_list(Start),"-",FirstRevId])}].
to_json_body(true, _Body) ->
[{<<"_deleted">>, true}];
to_json_body(false, {Body}) ->
Body.
-to_json_revs(Options, Revs) ->
+to_json_revisions(Options, Start, RevIds) ->
case lists:member(revs, Options) of
false -> [];
true ->
- [{<<"_revs">>, Revs}]
+ [{<<"_revisions">>, {[{<<"start">>, Start},
+ {<<"ids">>, RevIds}]}}]
end.
-to_json_revs_info(Meta) ->
+rev_to_str({Pos, RevId}) ->
+ ?l2b([integer_to_list(Pos),"-",RevId]).
+
+rev_to_strs([]) ->
+ [];
+rev_to_strs([{Pos, RevId}| Rest]) ->
+ [rev_to_str({Pos, RevId}) | rev_to_strs(Rest)].
+
+to_json_meta(Meta) ->
lists:map(
- fun({revs_info, RevsInfo}) ->
- JsonRevsInfo =
- [{[{rev, Rev}, {status, list_to_binary(atom_to_list(Status))}]} ||
- {Rev, Status} <- RevsInfo],
+ fun({revs_info, Start, RevsInfo}) ->
+ {JsonRevsInfo, _Pos} = lists:mapfoldl(
+ fun({RevId, Status}, PosAcc) ->
+ JsonObj = {[{<<"rev">>, rev_to_str({PosAcc, RevId})},
+ {<<"status">>, ?l2b(atom_to_list(Status))}]},
+ {JsonObj, PosAcc - 1}
+ end, Start, RevsInfo),
{<<"_revs_info">>, JsonRevsInfo};
({conflicts, Conflicts}) ->
- {<<"_conflicts">>, Conflicts};
- ({deleted_conflicts, Conflicts}) ->
- {<<"_deleted_conflicts">>, Conflicts}
+ {<<"_conflicts">>, rev_to_strs(Conflicts)};
+ ({deleted_conflicts, DConflicts}) ->
+ {<<"_deleted_conflicts">>, rev_to_strs(DConflicts)}
end, Meta).
to_json_attachment_stubs(Attachments) ->
@@ -98,17 +110,62 @@ to_json_attachments(Attachments, Options) ->
to_json_attachment_stubs(Attachments)
end.
-to_json_obj(#doc{id=Id,deleted=Del,body=Body,revs=Revs,meta=Meta}=Doc,Options)->
+to_json_obj(#doc{id=Id,deleted=Del,body=Body,revs={Start, RevIds},
+ meta=Meta}=Doc,Options)->
{[{<<"_id">>, Id}]
- ++ to_json_rev(Revs)
+ ++ to_json_rev(Start, RevIds)
++ to_json_body(Del, Body)
- ++ to_json_revs(Options, Revs)
- ++ to_json_revs_info(Meta)
+ ++ to_json_revisions(Options, Start, RevIds)
+ ++ to_json_meta(Meta)
++ to_json_attachments(Doc#doc.attachments, Options)
}.
from_json_obj({Props}) ->
- {JsonBins} = proplists:get_value(<<"_attachments">>, Props, {[]}),
+ transfer_fields(Props, #doc{body=[]});
+
+from_json_obj(_Other) ->
+ throw({bad_request, "Document must be a JSON object"}).
+
+parse_rev(Rev) when is_binary(Rev) ->
+ parse_rev(?b2l(Rev));
+parse_rev(Rev) ->
+ {Pos, [$- | RevId]} = lists:splitwith(fun($-) -> false; (_) -> true end, Rev),
+ {list_to_integer(Pos), ?l2b(RevId)}.
+
+parse_revs([]) ->
+ [];
+parse_revs([Rev | Rest]) ->
+ [parse_rev(Rev) | parse_revs(Rest)].
+
+
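
parse_rev/1 and parse_revs/1 are the inverse of rev_to_str/1; a quick shell check:

    1> couch_doc:parse_rev(<<"2-946B7D1C">>).
    {2,<<"946B7D1C">>}
    2> couch_doc:parse_revs([<<"1-a">>, <<"2-b">>]).
    [{1,<<"a">>},{2,<<"b">>}]
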
+transfer_fields([], #doc{body=Fields}=Doc) ->
+ % convert fields back to json object
+ Doc#doc{body={lists:reverse(Fields)}};
+
+transfer_fields([{<<"_id">>, Id} | Rest], Doc) when is_binary(Id) ->
+ case Id of
+ <<"_design/", _/binary>> -> ok;
+ <<"_local/", _/binary>> -> ok;
+ <<"_", _/binary>> ->
+ throw({bad_request, <<"Only reserved document ids may start with underscore.">>});
+ _Else -> ok
+ end,
+ transfer_fields(Rest, Doc#doc{id=Id});
+
+transfer_fields([{<<"_id">>, Id} | _Rest], _Doc) ->
+ ?LOG_DEBUG("Document id is not a string: ~p", [Id]),
+ throw({bad_request, <<"Document id must be a string">>});
+
+transfer_fields([{<<"_rev">>, Rev} | Rest], #doc{revs={0, []}}=Doc) ->
+ {Pos, RevId} = parse_rev(Rev),
+ transfer_fields(Rest,
+ Doc#doc{revs={Pos, [RevId]}});
+
+transfer_fields([{<<"_rev">>, _Rev} | Rest], Doc) ->
+ % we already got the rev from the _revisions
+ transfer_fields(Rest,Doc);
+
+transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) ->
Bins = lists:flatmap(fun({Name, {BinProps}}) ->
case proplists:get_value(<<"stub">>, BinProps) of
true ->
@@ -122,51 +179,40 @@ from_json_obj({Props}) ->
[{Name, {Type, couch_util:decodeBase64(Value)}}]
end
end, JsonBins),
- AllowedSpecialMembers = [<<"id">>, <<"revs">>, <<"rev">>, <<"attachments">>, <<"revs_info">>,
- <<"conflicts">>, <<"deleted_conflicts">>, <<"deleted">>],
- % collect all the doc-members that start with "_"
- % if any aren't in the AllowedSpecialMembers list
- % then throw a invalid_doc error
- [case lists:member(Name, AllowedSpecialMembers) of
- true ->
- ok;
- false ->
- throw({invalid_doc, io_lib:format("Bad special document member: _~s", [Name])})
- end
- || {<<$_,Name/binary>>, _Value} <- Props],
- Revs =
- case proplists:get_value(<<"_revs">>, Props, []) of
- [] ->
- case proplists:get_value(<<"_rev">>, Props) of
- undefined -> [];
- Rev -> [Rev]
- end;
- Revs0 ->
- Revs0
- end,
- case proplists:get_value(<<"_id">>, Props, <<>>) of
- <<"_design/", _/binary>> = Id -> ok;
- <<"_local/", _/binary>> = Id -> ok;
- <<"_", _/binary>> = Id ->
- throw({invalid_doc, "Document Ids must not start with underscore."});
- Id when is_binary(Id) -> ok;
- Id ->
- ?LOG_DEBUG("Document id is not a string: ~p", [Id]),
- throw({invalid_doc, "Document id is not a string"})
+ transfer_fields(Rest, Doc#doc{attachments=Bins});
+
+transfer_fields([{<<"_revisions">>, {Props}} | Rest], Doc) ->
+ RevIds = proplists:get_value(<<"ids">>, Props),
+ Start = proplists:get_value(<<"start">>, Props),
+ if not is_integer(Start) ->
+ throw({doc_validation, "_revisions.start isn't an integer."});
+ not is_list(RevIds) ->
+        throw({doc_validation, "_revisions.ids isn't an array."});
+ true ->
+ ok
end,
+ [throw({doc_validation, "RevId isn't a string"}) ||
+ RevId <- RevIds, not is_binary(RevId)],
+ transfer_fields(Rest, Doc#doc{revs={Start, RevIds}});
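
So a replicated document now arrives carrying its whole revision history under _revisions, with start giving the generation of the head of ids. A sketch of the round trip (assumes couch_db.hrl; the doc below is at rev 3-c):

    Json = {[{<<"_id">>, <<"mydoc">>},
             {<<"_revisions">>, {[{<<"start">>, 3},
                                  {<<"ids">>, [<<"c">>, <<"b">>, <<"a">>]}]}},
             {<<"value">>, 1}]},
    #doc{id = <<"mydoc">>, revs = {3, [<<"c">>, <<"b">>, <<"a">>]},
         body = {[{<<"value">>, 1}]}} = couch_doc:from_json_obj(Json).
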
- % strip out the all props beginning with _
- NewBody = {[{K, V} || {<<First,_/binary>>=K, V} <- Props, First /= $_]},
- #doc{
- id = Id,
- revs = Revs,
- deleted = proplists:get_value(<<"_deleted">>, Props, false),
- body = NewBody,
- attachments = Bins
- };
+transfer_fields([{<<"_deleted">>, B} | Rest], Doc) when (B==true) or (B==false) ->
+ transfer_fields(Rest, Doc#doc{deleted=B});
-from_json_obj(_Other) ->
- throw({invalid_doc, "Document must be a JSON object"}).
+% ignored fields
+transfer_fields([{<<"_revs_info">>, _} | Rest], Doc) ->
+ transfer_fields(Rest, Doc);
+transfer_fields([{<<"_conflicts">>, _} | Rest], Doc) ->
+ transfer_fields(Rest, Doc);
+transfer_fields([{<<"_deleted_conflicts">>, _} | Rest], Doc) ->
+ transfer_fields(Rest, Doc);
+
+% unknown special field
+transfer_fields([{<<"_",Name/binary>>, _} | _], _) ->
+ throw({doc_validation,
+ ?l2b(io_lib:format("Bad special document member: _~s", [Name]))});
+
+transfer_fields([Field | Rest], #doc{body=Fields}=Doc) ->
+ transfer_fields(Rest, Doc#doc{body=[Field|Fields]}).
to_doc_info(FullDocInfo) ->
{DocInfo, _Path} = to_doc_info_path(FullDocInfo),
@@ -175,27 +221,26 @@ to_doc_info(FullDocInfo) ->
to_doc_info_path(#full_doc_info{id=Id,update_seq=Seq,rev_tree=Tree}) ->
LeafRevs = couch_key_tree:get_all_leafs(Tree),
SortedLeafRevs =
- lists:sort(fun({RevIdA, {IsDeletedA, _}, PathA}, {RevIdB, {IsDeletedB, _}, PathB}) ->
+ lists:sort(fun({{IsDeletedA, _}, {StartA, [RevIdA|_]}}, {{IsDeletedB, _}, {StartB, [RevIdB|_]}}) ->
% sort descending by {not deleted, then Depth, then RevisionId}
- A = {not IsDeletedA, length(PathA), RevIdA},
- B = {not IsDeletedB, length(PathB), RevIdB},
+ A = {not IsDeletedA, StartA, RevIdA},
+ B = {not IsDeletedB, StartB, RevIdB},
A > B
end,
LeafRevs),
- [{RevId, {IsDeleted, SummaryPointer}, Path} | Rest] = SortedLeafRevs,
-
+ [{{IsDeleted, SummaryPointer}, {Start, [RevId|_]}=Path} | Rest] = SortedLeafRevs,
{ConflictRevTuples, DeletedConflictRevTuples} =
- lists:splitwith(fun({_ConflictRevId, {IsDeleted1, _Sp}, _}) ->
+ lists:splitwith(fun({{IsDeleted1, _Sp}, _}) ->
not IsDeleted1
end, Rest),
- ConflictRevs = [RevId1 || {RevId1, _, _} <- ConflictRevTuples],
- DeletedConflictRevs = [RevId2 || {RevId2, _, _} <- DeletedConflictRevTuples],
+ ConflictRevs = [{Start1, RevId1} || {_, {Start1, [RevId1|_]}} <- ConflictRevTuples],
+ DeletedConflictRevs = [{Start1, RevId1} || {_, {Start1, [RevId1|_]}} <- DeletedConflictRevTuples],
DocInfo = #doc_info{
id=Id,
update_seq=Seq,
- rev = RevId,
+ rev = {Start, RevId},
summary_pointer = SummaryPointer,
conflict_revs = ConflictRevs,
deleted_conflict_revs = DeletedConflictRevs,
diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl
index 48ff403b..2ba684a8 100644
--- a/src/couchdb/couch_httpd.erl
+++ b/src/couchdb/couch_httpd.erl
@@ -16,7 +16,7 @@
-export([start_link/0, stop/0, handle_request/4]).
-export([header_value/2,header_value/3,qs_value/2,qs_value/3,qs/1,path/1,absolute_uri/2]).
--export([verify_is_server_admin/1,unquote/1,quote/1,recv/2,recv_chunked/4]).
+-export([verify_is_server_admin/1,unquote/1,quote/1,recv/2,recv_chunked/4,error_info/1]).
-export([parse_form/1,json_body/1,body/1,doc_etag/1, make_etag/1, etag_respond/3]).
-export([primary_header_value/2,partition/1,serve_file/3]).
-export([start_chunked_response/3,send_chunk/2]).
@@ -166,7 +166,7 @@ handle_request(MochiReq, UrlHandlers, DbUrlHandlers, DesignUrlHandlers) ->
catch
throw:Error ->
send_error(HttpReq, Error);
Tag:Error ->
?LOG_ERROR("Uncaught error in HTTP request: ~p",[{Tag, Error}]),
?LOG_DEBUG("Stacktrace: ~p",[erlang:get_stacktrace()]),
send_error(HttpReq, Error)
@@ -295,8 +295,8 @@ body(#httpd{mochi_req=MochiReq}) ->
json_body(Httpd) ->
?JSON_DECODE(body(Httpd)).
-doc_etag(#doc{revs=[DiskRev|_]}) ->
- "\"" ++ binary_to_list(DiskRev) ++ "\"".
+doc_etag(#doc{revs={Start, [DiskRev|_]}}) ->
+ "\"" ++ ?b2l(couch_doc:rev_to_str({Start, DiskRev})) ++ "\"".
make_etag(Term) ->
<<SigInt:128/integer>> = erlang:md5(term_to_binary(Term)),
@@ -392,75 +392,55 @@ end_json_response(Resp) ->
send_chunk(Resp, []).
-send_error(Req, bad_request) ->
- send_error(Req, 400, <<"bad_request">>, <<>>);
-send_error(Req, {query_parse_error, Reason}) ->
- send_error(Req, 400, <<"query_parse_error">>, Reason);
-send_error(Req, {bad_request, Reason}) ->
- send_error(Req, 400, <<"bad_request">>, Reason);
-send_error(Req, not_found) ->
- send_error(Req, 404, <<"not_found">>, <<"Missing">>);
-send_error(Req, {not_found, Reason}) ->
- send_error(Req, 404, <<"not_found">>, Reason);
-send_error(Req, conflict) ->
- send_error(Req, 409, <<"conflict">>, <<"Document update conflict.">>);
-send_error(Req, {invalid_doc, Reason}) ->
- send_error(Req, 400, <<"invalid_doc">>, Reason);
-send_error(Req, {forbidden, Msg}) ->
- send_json(Req, 403,
- {[{<<"error">>, <<"forbidden">>},
- {<<"reason">>, Msg}]});
-send_error(Req, {unauthorized, Msg}) ->
- case couch_config:get("httpd", "WWW-Authenticate", nil) of
- nil ->
- Headers = [];
- Type ->
- Headers = [{"WWW-Authenticate", Type}]
- end,
- send_json(Req, 401, Headers,
- {[{<<"error">>, <<"unauthorized">>},
- {<<"reason">>, Msg}]});
-send_error(Req, {http_error, Code, Headers, Error, Reason}) ->
- send_json(Req, Code, Headers,
- {[{<<"error">>, Error}, {<<"reason">>, Reason}]});
-send_error(Req, {user_error, {Props}}) ->
- {Headers} = proplists:get_value(<<"headers">>, Props, {[]}),
- send_json(Req,
- proplists:get_value(<<"http_status">>, Props, 500),
- Headers,
- {[{<<"error">>, proplists:get_value(<<"error">>, Props)},
- {<<"reason">>, proplists:get_value(<<"reason">>, Props)}]});
-send_error(Req, file_exists) ->
- send_error(Req, 412, <<"file_exists">>, <<"The database could not be "
- "created, the file already exists.">>);
-send_error(Req, {Error, Reason}) ->
- send_error(Req, 500, Error, Reason);
-send_error(Req, Error) ->
- send_error(Req, 500, <<"error">>, Error).
-
+error_info(bad_request) ->
+ {400, <<"bad_request">>, <<>>};
+error_info({bad_request, Reason}) ->
+ {400, <<"bad_request">>, Reason};
+error_info({query_parse_error, Reason}) ->
+ {400, <<"query_parse_error">>, Reason};
+error_info(not_found) ->
+ {404, <<"not_found">>, <<"Missing">>};
+error_info({not_found, Reason}) ->
+ {404, <<"not_found">>, Reason};
+error_info(conflict) ->
+ {409, <<"conflict">>, <<"Document update conflict.">>};
+error_info({forbidden, Msg}) ->
+ {403, <<"forbidden">>, Msg};
+error_info({unauthorized, Msg}) ->
+ {401, <<"unauthorized">>, Msg};
+error_info(file_exists) ->
+ {412, <<"file_exists">>, <<"The database could not be "
+ "created, the file already exists.">>};
+error_info({Error, Reason}) ->
+ {500, couch_util:to_binary(Error), couch_util:to_binary(Reason)};
+error_info(Error) ->
+ {500, <<"unknown_error">>, couch_util:to_binary(Error)}.
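
error_info/1 centralizes the status-code mapping so the bulk-update handlers below can embed the same error/reason pair per document. For example:

    1> couch_httpd:error_info(conflict).
    {409,<<"conflict">>,<<"Document update conflict.">>}
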
+send_error(Req, Error) ->
+ {Code, ErrorStr, ReasonStr} = error_info(Error),
+ if Code == 401 ->
+ case couch_config:get("httpd", "WWW-Authenticate", nil) of
+ nil ->
+ Headers = [];
+ Type ->
+ Headers = [{"WWW-Authenticate", Type}]
+ end;
+ true ->
+ Headers = []
+ end,
+ send_error(Req, Code, Headers, ErrorStr, ReasonStr).
-send_error(Req, Code, Error, Msg) when is_atom(Error) ->
- send_error(Req, Code, list_to_binary(atom_to_list(Error)), Msg);
-send_error(Req, Code, Error, Msg) when is_list(Msg) ->
- case (catch list_to_binary(Msg)) of
- Bin when is_binary(Bin) ->
- send_error(Req, Code, Error, Bin);
- _ ->
- send_error(Req, Code, Error, io_lib:format("~p", [Msg]))
- end;
-send_error(Req, Code, Error, Msg) when not is_binary(Error) ->
- send_error(Req, Code, list_to_binary(io_lib:format("~p", [Error])), Msg);
-send_error(Req, Code, Error, Msg) when not is_binary(Msg) ->
- send_error(Req, Code, Error, list_to_binary(io_lib:format("~p", [Msg])));
-send_error(Req, Code, Error, <<>>) ->
- send_json(Req, Code, {[{<<"error">>, Error}]});
-send_error(Req, Code, Error, Msg) ->
- send_json(Req, Code, {[{<<"error">>, Error}, {<<"reason">>, Msg}]}).
+send_error(Req, Code, ErrorStr, ReasonStr) ->
+ send_error(Req, Code, [], ErrorStr, ReasonStr).
-send_redirect(Req, Path) ->
- Headers = [{"Location", couch_httpd:absolute_uri(Req, Path)}],
- send_response(Req, 301, Headers, <<>>).
+send_error(Req, Code, Headers, ErrorStr, ReasonStr) ->
+ send_json(Req, Code, Headers,
+ {[{<<"error">>, ErrorStr},
+ {<<"reason">>, ReasonStr}]}).
+
+send_redirect(Req, Path) ->
+ Headers = [{"Location", couch_httpd:absolute_uri(Req, Path)}],
+ send_response(Req, 301, Headers, <<>>).
negotiate_content_type(#httpd{mochi_req=MochiReq}) ->
%% Determine the appropriate Content-Type header for a JSON response
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 75022cd3..3680d73b 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -22,8 +22,8 @@
-record(doc_query_args, {
options = [],
- rev = "",
- open_revs = ""
+ rev = nil,
+ open_revs = []
}).
% Database request handlers
@@ -89,13 +89,13 @@ db_req(#httpd{method='GET',path_parts=[_DbName]}=Req, Db) ->
db_req(#httpd{method='POST',path_parts=[DbName]}=Req, Db) ->
Doc = couch_doc:from_json_obj(couch_httpd:json_body(Req)),
DocId = couch_util:new_uuid(),
- {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=[]}, []),
+ {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId}, []),
DocUrl = absolute_uri(Req,
binary_to_list(<<"/",DbName/binary,"/",DocId/binary>>)),
send_json(Req, 201, [{"Location", DocUrl}], {[
{ok, true},
{id, DocId},
- {rev, NewRev}
+ {rev, couch_doc:rev_to_str(NewRev)}
]});
db_req(#httpd{path_parts=[_DbName]}=Req, _Db) ->
@@ -131,32 +131,59 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>]}=Req, Db) ->
<<>> -> couch_util:new_uuid();
Id0 -> Id0
end,
- Revs = case proplists:get_value(<<"_rev">>, ObjProps) of
- undefined -> [];
- Rev -> [Rev]
+ case proplists:get_value(<<"_rev">>, ObjProps) of
+ undefined ->
+ Revs = {0, []};
+ Rev ->
+ {Pos, RevId} = couch_doc:parse_rev(Rev),
+ Revs = {Pos, [RevId]}
end,
Doc#doc{id=Id,revs=Revs}
end,
DocsArray),
- {ok, ResultRevs} = couch_db:update_docs(Db, Docs, Options),
-
- % output the results
- DocResults = lists:zipwith(
- fun(Doc, NewRev) ->
- {[{<<"id">>, Doc#doc.id}, {<<"rev">>, NewRev}]}
- end,
- Docs, ResultRevs),
- send_json(Req, 201, {[
- {ok, true},
- {new_revs, DocResults}
- ]});
-
+ Options2 =
+ case proplists:get_value(<<"all_or_nothing">>, JsonProps) of
+ true -> [all_or_nothing|Options];
+ _ -> Options
+ end,
+ case couch_db:update_docs(Db, Docs, Options2) of
+ {ok, Results} ->
+ % output the results
+ DocResults = lists:zipwith(
+ fun(Doc, {ok, NewRev}) ->
+ {[{<<"id">>, Doc#doc.id}, {<<"rev">>, couch_doc:rev_to_str(NewRev)}]};
+ (Doc, Error) ->
+ {_Code, Err, Msg} = couch_httpd:error_info(Error),
+ % maybe we should add the http error code to the json?
+                {[{<<"id">>, Doc#doc.id}, {<<"error">>, Err}, {<<"reason">>, Msg}]}
+ end,
+ Docs, Results),
+ send_json(Req, 201, DocResults);
+ {aborted, Errors} ->
+ ErrorsJson =
+ lists:map(
+ fun({{Id, Rev}, Error}) ->
+ {_Code, Err, Msg} = couch_httpd:error_info(Error),
+ {[{<<"id">>, Id},
+ {<<"rev">>, couch_doc:rev_to_str(Rev)},
+ {<<"error">>, Err},
+                    {<<"reason">>, Msg}]}
+ end, Errors),
+ send_json(Req, 417, ErrorsJson)
+ end;
false ->
Docs = [couch_doc:from_json_obj(JsonObj) || JsonObj <- DocsArray],
- ok = couch_db:update_docs(Db, Docs, Options, false),
- send_json(Req, 201, {[
- {ok, true}
- ]})
+ {ok, Errors} = couch_db:update_docs(Db, Docs, Options, replicated_changes),
+ ErrorsJson =
+ lists:map(
+ fun({{Id, Rev}, Error}) ->
+ {_Code, Err, Msg} = couch_httpd:error_info(Error),
+ {[{<<"id">>, Id},
+ {<<"rev">>, couch_doc:rev_to_str(Rev)},
+ {<<"error">>, Err},
+                    {<<"reason">>, Msg}]}
+ end, Errors),
+ send_json(Req, 201, ErrorsJson)
end;
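
A non-all_or_nothing _bulk_docs reply is therefore a 201 with one entry per input document, successes and failures interleaved. A sketch of the EJSON this branch produces (ids and revs made up):

    [{[{<<"id">>, <<"a">>}, {<<"rev">>, <<"1-946B7D1C">>}]},
     {[{<<"id">>, <<"b">>},
       {<<"error">>, <<"conflict">>},
       {<<"reason">>, <<"Document update conflict.">>}]}]
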
db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db) ->
send_method_not_allowed(Req, "POST");
@@ -170,12 +197,12 @@ db_req(#httpd{path_parts=[_,<<"_compact">>]}=Req, _Db) ->
db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) ->
{IdsRevs} = couch_httpd:json_body(Req),
- % validate the json input
- [{_Id, [_|_]=_Revs} = IdRevs || IdRevs <- IdsRevs],
+ IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs],
- case couch_db:purge_docs(Db, IdsRevs) of
+ case couch_db:purge_docs(Db, IdsRevs2) of
{ok, PurgeSeq, PurgedIdsRevs} ->
- send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs}}]});
+ PurgedIdsRevs2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- PurgedIdsRevs],
+ send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs2}}]});
Error ->
throw(Error)
end;
@@ -204,7 +231,7 @@ db_req(#httpd{method='GET',path_parts=[_,<<"_all_docs_by_seq">>]}=Req, Db) ->
{ok, Info} = couch_db:get_db_info(Db),
CurrentEtag = couch_httpd:make_etag(proplists:get_value(update_seq, Info)),
- couch_httpd:etag_respond(Req, CurrentEtag, fun() ->
+ couch_httpd:etag_respond(Req, CurrentEtag, fun() ->
TotalRowCount = proplists:get_value(doc_count, Info),
FoldlFun = couch_httpd_view:make_view_fold_fun(Req, QueryArgs, CurrentEtag, Db,
TotalRowCount, #view_fold_helper_funs{
@@ -227,14 +254,14 @@ db_req(#httpd{method='GET',path_parts=[_,<<"_all_docs_by_seq">>]}=Req, Db) ->
deleted_conflict_revs=DelConflictRevs
} = DocInfo,
Json = {
- [{<<"rev">>, Rev}] ++
+ [{<<"rev">>, couch_doc:rev_to_str(Rev)}] ++
case ConflictRevs of
[] -> [];
- _ -> [{<<"conflicts">>, ConflictRevs}]
+ _ -> [{<<"conflicts">>, couch_doc:rev_to_strs(ConflictRevs)}]
end ++
case DelConflictRevs of
[] -> [];
- _ -> [{<<"deleted_conflicts">>, DelConflictRevs}]
+ _ -> [{<<"deleted_conflicts">>, couch_doc:rev_to_strs(DelConflictRevs)}]
end ++
case Deleted of
true -> [{<<"deleted">>, true}];
@@ -251,9 +278,11 @@ db_req(#httpd{path_parts=[_,<<"_all_docs_by_seq">>]}=Req, _Db) ->
db_req(#httpd{method='POST',path_parts=[_,<<"_missing_revs">>]}=Req, Db) ->
{JsonDocIdRevs} = couch_httpd:json_body(Req),
- {ok, Results} = couch_db:get_missing_revs(Db, JsonDocIdRevs),
+ JsonDocIdRevs2 = [{Id, [couch_doc:parse_rev(RevStr) || RevStr <- RevStrs]} || {Id, RevStrs} <- JsonDocIdRevs],
+ {ok, Results} = couch_db:get_missing_revs(Db, JsonDocIdRevs2),
+ Results2 = [{Id, [couch_doc:rev_to_str(Rev) || Rev <- Revs]} || {Id, Revs} <- Results],
send_json(Req, {[
- {missing_revs, {Results}}
+ {missing_revs, {Results2}}
]});
db_req(#httpd{path_parts=[_,<<"_missing_revs">>]}=Req, _Db) ->
@@ -271,6 +300,18 @@ db_req(#httpd{method='GET',path_parts=[_,<<"_admins">>]}=Req, Db) ->
db_req(#httpd{path_parts=[_,<<"_admins">>]}=Req, _Db) ->
send_method_not_allowed(Req, "PUT,GET");
+db_req(#httpd{method='PUT',path_parts=[_,<<"_revs_limit">>]}=Req,
+ Db) ->
+ Limit = couch_httpd:json_body(Req),
+ ok = couch_db:set_revs_limit(Db, Limit),
+ send_json(Req, {[{<<"ok">>, true}]});
+
+db_req(#httpd{method='GET',path_parts=[_,<<"_revs_limit">>]}=Req, Db) ->
+ send_json(Req, couch_db:get_revs_limit(Db));
+
+db_req(#httpd{path_parts=[_,<<"_revs_limit">>]}=Req, _Db) ->
+ send_method_not_allowed(Req, "PUT,GET");
+
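
The new endpoint just round-trips an integer; internally it is equivalent to the following (admin context required for the set):

    ok = couch_db:set_revs_limit(Db, 1000),
    1000 = couch_db:get_revs_limit(Db).
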
% Special case to enable using an unencoded slash in the URL of design docs,
% as slashes in document IDs must otherwise be URL encoded.
db_req(#httpd{method='GET',mochi_req=MochiReq, path_parts=[DbName,<<"_design/",_/binary>>|_]}=Req, _Db) ->
@@ -334,7 +375,7 @@ all_docs_view(Req, Db, Keys) ->
AdapterFun = fun(#full_doc_info{id=Id}=FullDocInfo, Offset, Acc) ->
case couch_doc:to_doc_info(FullDocInfo) of
#doc_info{deleted=false, rev=Rev} ->
- FoldlFun({{Id, Id}, {[{rev, Rev}]}}, Offset, Acc);
+ FoldlFun({{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}]}}, Offset, Acc);
#doc_info{deleted=true} ->
{ok, Acc}
end
@@ -358,9 +399,9 @@ all_docs_view(Req, Db, Keys) ->
DocInfo = (catch couch_db:get_doc_info(Db, Key)),
Doc = case DocInfo of
{ok, #doc_info{id=Id, rev=Rev, deleted=false}} = DocInfo ->
- {{Id, Id}, {[{rev, Rev}]}};
+ {{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}]}};
{ok, #doc_info{id=Id, rev=Rev, deleted=true}} = DocInfo ->
- {{Id, Id}, {[{rev, Rev}, {deleted, true}]}};
+ {{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}, {deleted, true}]}};
not_found ->
{{Key, error}, not_found};
_ ->
@@ -381,20 +422,12 @@ all_docs_view(Req, Db, Keys) ->
-
-
db_doc_req(#httpd{method='DELETE'}=Req, Db, DocId) ->
- case extract_header_rev(Req, couch_httpd:qs_value(Req, "rev")) of
- missing_rev ->
- couch_httpd:send_error(Req, 409, <<"missing_rev">>,
- <<"Document rev/etag must be specified to delete">>);
- RevToDelete ->
- {ok, NewRev} = couch_db:delete_doc(Db, DocId, [RevToDelete]),
- send_json(Req, 200, {[
- {ok, true},
- {id, DocId},
- {rev, NewRev}
- ]})
+ case couch_httpd:qs_value(Req, "rev") of
+ undefined ->
+ update_doc(Req, Db, DocId, {[{<<"_deleted">>,true}]});
+ Rev ->
+ update_doc(Req, Db, DocId, {[{<<"_rev">>, ?l2b(Rev)},{<<"_deleted">>,true}]})
end;
db_doc_req(#httpd{method='GET'}=Req, Db, DocId) ->
@@ -438,82 +471,31 @@ db_doc_req(#httpd{method='GET'}=Req, Db, DocId) ->
end_json_response(Resp)
end;
-db_doc_req(#httpd{method='POST'}=Req, Db, DocId) ->
- Form = couch_httpd:parse_form(Req),
- Rev = list_to_binary(proplists:get_value("_rev", Form)),
- Doc = case couch_db:open_doc_revs(Db, DocId, [Rev], []) of
- {ok, [{ok, Doc0}]} -> Doc0#doc{revs=[Rev]};
- {ok, [Error]} -> throw(Error)
- end,
-
- NewAttachments = [
- {validate_attachment_name(Name), {list_to_binary(ContentType), Content}} ||
- {Name, {ContentType, _}, Content} <-
- proplists:get_all_values("_attachments", Form)
- ],
- #doc{attachments=Attachments} = Doc,
- NewDoc = Doc#doc{
- attachments = Attachments ++ NewAttachments
- },
- {ok, NewRev} = couch_db:update_doc(Db, NewDoc, []),
-
- send_json(Req, 201, [{"Etag", "\"" ++ NewRev ++ "\""}], {obj, [
- {ok, true},
- {id, DocId},
- {rev, NewRev}
- ]});
-
db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
- Json = couch_httpd:json_body(Req),
- Doc = couch_doc:from_json_obj(Json),
- ExplicitRev =
- case Doc#doc.revs of
- [Rev0|_] -> Rev0;
- [] -> undefined
- end,
- validate_attachment_names(Doc),
- case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of
- "true" ->
- Options = [full_commit];
- _ ->
- Options = []
- end,
- case extract_header_rev(Req, ExplicitRev) of
- missing_rev ->
- Revs = [];
- Rev ->
- Revs = [Rev]
- end,
- {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=Revs}, Options),
- send_json(Req, 201, [{"Etag", <<"\"", NewRev/binary, "\"">>}], {[
- {ok, true},
- {id, DocId},
- {rev, NewRev}
- ]});
+ update_doc(Req, Db, DocId, couch_httpd:json_body(Req));
db_doc_req(#httpd{method='COPY'}=Req, Db, SourceDocId) ->
SourceRev =
case extract_header_rev(Req, couch_httpd:qs_value(Req, "rev")) of
- missing_rev -> [];
+ missing_rev -> nil;
Rev -> Rev
end,
- {TargetDocId, TargetRev} = parse_copy_destination_header(Req),
+ {TargetDocId, TargetRevs} = parse_copy_destination_header(Req),
% open revision Rev or Current
Doc = couch_doc_open(Db, SourceDocId, SourceRev, []),
-
% save new doc
- {ok, NewTargetRev} = couch_db:update_doc(Db, Doc#doc{id=TargetDocId, revs=TargetRev}, []),
-
- send_json(Req, 201, [{"Etag", "\"" ++ binary_to_list(NewTargetRev) ++ "\""}], {[
- {ok, true},
- {id, TargetDocId},
- {rev, NewTargetRev}
- ]});
+ case couch_db:update_doc(Db, Doc#doc{id=TargetDocId, revs=TargetRevs}, []) of
+ {ok, NewTargetRev} ->
+ send_json(Req, 201, [{"Etag", "\"" ++ ?b2l(couch_doc:rev_to_str(NewTargetRev)) ++ "\""}],
+ update_result_to_json({ok, NewTargetRev}));
+ Error ->
+ throw(Error)
+ end;
db_doc_req(#httpd{method='MOVE'}=Req, Db, SourceDocId) ->
- SourceRev =
+ SourceRev = {SourceRevPos, SourceRevId} =
case extract_header_rev(Req, couch_httpd:qs_value(Req, "rev")) of
missing_rev ->
throw({bad_request, "MOVE requires a specified rev parameter"
@@ -521,37 +503,68 @@ db_doc_req(#httpd{method='MOVE'}=Req, Db, SourceDocId) ->
Rev -> Rev
end,
- {TargetDocId, TargetRev} = parse_copy_destination_header(Req),
+ {TargetDocId, TargetRevs} = parse_copy_destination_header(Req),
% open revision Rev or Current
Doc = couch_doc_open(Db, SourceDocId, SourceRev, []),
% save new doc & delete old doc in one operation
Docs = [
- Doc#doc{id=TargetDocId, revs=TargetRev},
- #doc{id=SourceDocId, revs=[SourceRev], deleted=true}
+ #doc{id=SourceDocId, revs={SourceRevPos, [SourceRevId]}, deleted=true},
+ Doc#doc{id=TargetDocId, revs=TargetRevs}
],
- {ok, ResultRevs} = couch_db:update_docs(Db, Docs, []),
+ {ok, [SourceResult, TargetResult]} = couch_db:update_docs(Db, Docs, []),
- DocResults = lists:zipwith(
- fun(FDoc, NewRev) ->
- {[{id, FDoc#doc.id}, {rev, NewRev}]}
- end,
- Docs, ResultRevs),
send_json(Req, 201, {[
- {ok, true},
- {new_revs, DocResults}
+ {SourceDocId, update_result_to_json(SourceResult)},
+ {TargetDocId, update_result_to_json(TargetResult)}
]});
db_doc_req(Req, _Db, _DocId) ->
send_method_not_allowed(Req, "DELETE,GET,HEAD,POST,PUT,COPY,MOVE").
+update_result_to_json({ok, NewRev}) ->
+ {[{rev, couch_doc:rev_to_str(NewRev)}]};
+update_result_to_json(Error) ->
+ {_Code, ErrorStr, Reason} = couch_httpd:error_info(Error),
+ {[{error, ErrorStr}, {reason, Reason}]}.
+
+
+update_doc(Req, Db, DocId, Json) ->
+ #doc{deleted=Deleted} = Doc = couch_doc:from_json_obj(Json),
+ validate_attachment_names(Doc),
+ ExplicitDocRev =
+ case Doc#doc.revs of
+ {Start,[RevId|_]} -> {Start, RevId};
+ _ -> undefined
+ end,
+ case extract_header_rev(Req, ExplicitDocRev) of
+ missing_rev ->
+ Revs = {0, []};
+ {Pos, Rev} ->
+ Revs = {Pos, [Rev]}
+ end,
+
+ case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of
+ "true" ->
+ Options = [full_commit];
+ _ ->
+ Options = []
+ end,
+ {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=Revs}, Options),
+ NewRevStr = couch_doc:rev_to_str(NewRev),
+ send_json(Req, if Deleted -> 200; true -> 201 end,
+ [{"Etag", <<"\"", NewRevStr/binary, "\"">>}], {[
+ {ok, true},
+ {id, DocId},
+ {rev, NewRevStr}]}).
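+% Illustrative exchange for the helper above (rev value made up):
+%   PUT /db/some_doc -> 201, Etag: "1-23202479633c2b380f79507a776743d5",
+%   {"ok":true,"id":"some_doc","rev":"1-23202479633c2b380f79507a776743d5"}
+% Deletions routed through here (the _deleted flag) answer 200 instead.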
+
% Useful for debugging
% couch_doc_open(Db, DocId) ->
% couch_doc_open(Db, DocId, nil, []).
couch_doc_open(Db, DocId, Rev, Options) ->
case Rev of
- "" -> % open most recent rev
+ nil -> % open most recent rev
case couch_db:open_doc(Db, DocId, Options) of
{ok, Doc} ->
Doc;
@@ -572,13 +585,13 @@ couch_doc_open(Db, DocId, Rev, Options) ->
db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) ->
FileName = list_to_binary(mochiweb_util:join(lists:map(fun binary_to_list/1, FileNameParts),"/")),
case couch_db:open_doc(Db, DocId, []) of
- {ok, #doc{attachments=Attachments, revs=[LastRev|_OldRevs]}} ->
+ {ok, #doc{attachments=Attachments}=Doc} ->
case proplists:get_value(FileName, Attachments) of
undefined ->
throw({not_found, "Document is missing attachment"});
{Type, Bin} ->
{ok, Resp} = start_chunked_response(Req, 200, [
- {"ETag", binary_to_list(LastRev)},
+ {"ETag", couch_httpd:doc_etag(Doc)},
{"Cache-Control", "must-revalidate"},
{"Content-Type", binary_to_list(Type)}%,
% My understanding of http://www.faqs.org/rfcs/rfc2616.html
@@ -640,7 +653,7 @@ db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts)
#doc{id=DocId};
Rev ->
case couch_db:open_doc_revs(Db, DocId, [Rev], []) of
- {ok, [{ok, Doc0}]} -> Doc0#doc{revs=[Rev]};
+ {ok, [{ok, Doc0}]} -> Doc0;
{ok, [Error]} -> throw(Error)
end
end,
@@ -653,7 +666,7 @@ db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts)
send_json(Req, case Method of 'DELETE' -> 200; _ -> 201 end, {[
{ok, true},
{id, DocId},
- {rev, UpdatedRev}
+ {rev, couch_doc:rev_to_str(UpdatedRev)}
]});
db_attachment_req(Req, _Db, _DocId, _FileNameParts) ->
@@ -682,25 +695,24 @@ parse_doc_query(Req) ->
Options = [deleted_conflicts | Args#doc_query_args.options],
Args#doc_query_args{options=Options};
{"rev", Rev} ->
- Args#doc_query_args{rev=list_to_binary(Rev)};
+ Args#doc_query_args{rev=couch_doc:parse_rev(Rev)};
{"open_revs", "all"} ->
Args#doc_query_args{open_revs=all};
{"open_revs", RevsJsonStr} ->
JsonArray = ?JSON_DECODE(RevsJsonStr),
- Args#doc_query_args{open_revs=JsonArray};
+ Args#doc_query_args{open_revs=[couch_doc:parse_rev(Rev) || Rev <- JsonArray]};
_Else -> % unknown key value pair, ignore.
Args
end
end, #doc_query_args{}, couch_httpd:qs(Req)).
-
-extract_header_rev(Req, ExplicitRev) when is_list(ExplicitRev)->
- extract_header_rev(Req, list_to_binary(ExplicitRev));
+extract_header_rev(Req, ExplicitRev) when is_binary(ExplicitRev) or is_list(ExplicitRev) ->
+ extract_header_rev(Req, couch_doc:parse_rev(ExplicitRev));
extract_header_rev(Req, ExplicitRev) ->
Etag = case couch_httpd:header_value(Req, "If-Match") of
undefined -> undefined;
- Value -> list_to_binary(string:strip(Value, both, $"))
+ Value -> couch_doc:parse_rev(string:strip(Value, both, $"))
end,
case {ExplicitRev, Etag} of
{undefined, undefined} -> missing_rev;
@@ -716,11 +728,12 @@ parse_copy_destination_header(Req) ->
Destination = couch_httpd:header_value(Req, "Destination"),
case regexp:match(Destination, "\\?") of
nomatch ->
- {list_to_binary(Destination), []};
+ {list_to_binary(Destination), {0, []}};
{match, _, _} ->
{ok, [DocId, RevQueryOptions]} = regexp:split(Destination, "\\?"),
{ok, [_RevQueryKey, Rev]} = regexp:split(RevQueryOptions, "="),
- {list_to_binary(DocId), [list_to_binary(Rev)]}
+ {Pos, RevId} = couch_doc:parse_rev(Rev),
+ {list_to_binary(DocId), {Pos, [RevId]}}
end.
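+% e.g. (illustrative) "Destination: targetdoc?rev=1-2345" parses to
+% {<<"targetdoc">>, {1, [<<"2345">>]}}; with no query string the target
+% revs default to {0, []}, i.e. a brand new edit branch.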
validate_attachment_names(Doc) ->
diff --git a/src/couchdb/couch_httpd_misc_handlers.erl b/src/couchdb/couch_httpd_misc_handlers.erl
index 583a87c2..46fed095 100644
--- a/src/couchdb/couch_httpd_misc_handlers.erl
+++ b/src/couchdb/couch_httpd_misc_handlers.erl
@@ -70,35 +70,36 @@ handle_task_status_req(#httpd{method='GET'}=Req) ->
handle_task_status_req(Req) ->
send_method_not_allowed(Req, "GET,HEAD").
+% add trailing slash if missing
+fix_db_url(UrlBin) ->
+ Url = ?b2l(UrlBin),
+ ?l2b(case lists:last(Url) of
+ $/ -> Url;
+ _ -> Url ++ "/"
+ end).
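+% e.g. fix_db_url(<<"http://host:5984/db">>) -> <<"http://host:5984/db/">>;
+% a URL already ending in "/" passes through unchanged.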
+
-handle_replicate_req(#httpd{user_ctx=UserCtx,method='POST'}=Req) ->
+get_rep_endpoint(_Req, {Props}) ->
+ Url = proplists:get_value(<<"url">>, Props),
+ {BinHeaders} = proplists:get_value(<<"headers">>, Props, {[]}),
+ {remote, fix_db_url(Url), [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
+get_rep_endpoint(_Req, <<"http://",_/binary>>=Url) ->
+ {remote, fix_db_url(Url), []};
+get_rep_endpoint(_Req, <<"https://",_/binary>>=Url) ->
+ {remote, fix_db_url(Url), []};
+get_rep_endpoint(#httpd{user_ctx=UserCtx}, <<DbName/binary>>) ->
+ {local, DbName, UserCtx}.
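+% Sketch of the endpoint terms produced above:
+%   <<"http://host/db">>    -> {remote, <<"http://host/db/">>, []}
+%   {[{<<"url">>,U},...]}   -> {remote, U (slash-terminated), Headers}
+%   <<"localdb">>           -> {local, <<"localdb">>, UserCtx}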
+
+handle_replicate_req(#httpd{method='POST'}=Req) ->
{Props} = couch_httpd:json_body(Req),
- Source = proplists:get_value(<<"source">>, Props),
- Target = proplists:get_value(<<"target">>, Props),
-
- {SrcOpts} = proplists:get_value(<<"source_options">>, Props, {[]}),
- {SrcHeadersBinary} = proplists:get_value(<<"headers">>, SrcOpts, {[]}),
- SrcHeaders = [{?b2l(K),(V)} || {K,V} <- SrcHeadersBinary],
-
- {TgtOpts} = proplists:get_value(<<"target_options">>, Props, {[]}),
- {TgtHeadersBinary} = proplists:get_value(<<"headers">>, TgtOpts, {[]}),
- TgtHeaders = [{?b2l(K),(V)} || {K,V} <- TgtHeadersBinary],
-
- {Options} = proplists:get_value(<<"options">>, Props, {[]}),
- Options2 = [{source_options,
- [{headers, SrcHeaders},
- {user_ctx, UserCtx}]},
- {target_options,
- [{headers, TgtHeaders},
- {user_ctx, UserCtx}]}
- | Options],
- case couch_rep:replicate(Source, Target, Options2) of
- {ok, {JsonResults}} ->
- send_json(Req, {[{ok, true} | JsonResults]});
- {error, {Type, Details}} ->
- send_json(Req, 500, {[{error, Type}, {reason, Details}]});
- {error, Reason} ->
- send_json(Req, 500, {[{error, Reason}]})
+ Source = get_rep_endpoint(Req, proplists:get_value(<<"source">>, Props)),
+ Target = get_rep_endpoint(Req, proplists:get_value(<<"target">>, Props)),
+ case couch_rep:replicate(Source, Target) of
+ {ok, {JsonResults}} ->
+ send_json(Req, {[{ok, true} | JsonResults]});
+ {error, {Type, Details}} ->
+ send_json(Req, 500, {[{error, Type}, {reason, Details}]});
+ {error, Reason} ->
+ send_json(Req, 500, {[{error, Reason}]})
end;
handle_replicate_req(Req) ->
send_method_not_allowed(Req, "POST").
diff --git a/src/couchdb/couch_httpd_show.erl b/src/couchdb/couch_httpd_show.erl
index 7b6f2832..dd337ced 100644
--- a/src/couchdb/couch_httpd_show.erl
+++ b/src/couchdb/couch_httpd_show.erl
@@ -27,10 +27,10 @@ handle_doc_show_req(#httpd{
path_parts=[_DbName, _Design, DesignName, _Show, ShowName, DocId]
}=Req, Db) ->
DesignId = <<"_design/", DesignName/binary>>,
- #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, [], []),
+ #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
ShowSrc = get_nested_json_value({Props}, [<<"shows">>, ShowName]),
- Doc = try couch_httpd_db:couch_doc_open(Db, DocId, [], []) of
+ Doc = try couch_httpd_db:couch_doc_open(Db, DocId, nil, []) of
FoundDoc -> FoundDoc
catch
_ -> nil
@@ -42,7 +42,7 @@ handle_doc_show_req(#httpd{
path_parts=[_DbName, _Design, DesignName, _Show, ShowName]
}=Req, Db) ->
DesignId = <<"_design/", DesignName/binary>>,
- #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, [], []),
+ #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
ShowSrc = get_nested_json_value({Props}, [<<"shows">>, ShowName]),
send_doc_show_response(Lang, ShowSrc, nil, nil, Req, Db);
@@ -56,7 +56,7 @@ handle_doc_show_req(Req, _Db) ->
handle_view_list_req(#httpd{method='GET',
path_parts=[_DbName, _Design, DesignName, _List, ListName, ViewName]}=Req, Db) ->
DesignId = <<"_design/", DesignName/binary>>,
- #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, [], []),
+ #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
ListSrc = get_nested_json_value({Props}, [<<"lists">>, ListName]),
send_view_list_response(Lang, ListSrc, ViewName, DesignId, Req, Db, nil);
@@ -67,7 +67,7 @@ handle_view_list_req(#httpd{method='GET'}=Req, _Db) ->
handle_view_list_req(#httpd{method='POST',
path_parts=[_DbName, _Design, DesignName, _List, ListName, ViewName]}=Req, Db) ->
DesignId = <<"_design/", DesignName/binary>>,
- #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, [], []),
+ #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
ListSrc = get_nested_json_value({Props}, [<<"lists">>, ListName]),
ReqBody = couch_httpd:body(Req),
@@ -370,13 +370,12 @@ send_doc_show_response(Lang, ShowSrc, DocId, nil, #httpd{mochi_req=MReq}=Req, Db
couch_httpd_external:send_external_response(Req, JsonResp)
end);
-send_doc_show_response(Lang, ShowSrc, DocId, #doc{revs=[DocRev|_]}=Doc,
- #httpd{mochi_req=MReq}=Req, Db) ->
+send_doc_show_response(Lang, ShowSrc, DocId, #doc{revs=Revs}=Doc, #httpd{mochi_req=MReq}=Req, Db) ->
% calculate the etag
Headers = MReq:get(headers),
Hlist = mochiweb_headers:to_list(Headers),
Accept = proplists:get_value('Accept', Hlist),
- CurrentEtag = couch_httpd:make_etag({Lang, ShowSrc, DocRev, Accept}),
+ CurrentEtag = couch_httpd:make_etag({Lang, ShowSrc, Revs, Accept}),
% We know our etag now
couch_httpd:etag_respond(Req, CurrentEtag, fun() ->
ExternalResp = couch_query_servers:render_doc_show(Lang, ShowSrc,
diff --git a/src/couchdb/couch_httpd_view.erl b/src/couchdb/couch_httpd_view.erl
index 322cf945..7f338655 100644
--- a/src/couchdb/couch_httpd_view.erl
+++ b/src/couchdb/couch_httpd_view.erl
@@ -565,14 +565,14 @@ view_row_obj(Db, {{Key, DocId}, Value}, IncludeDocs) ->
true ->
Rev = case Value of
{Props} ->
- case is_list(Props) of
- true ->
- proplists:get_value(<<"_rev">>, Props, []);
- _ ->
- []
+ case proplists:get_value(<<"_rev">>, Props) of
+ undefined ->
+ nil;
+ Rev0 ->
+ couch_doc:parse_rev(Rev0)
end;
_ ->
- []
+ nil
end,
?LOG_DEBUG("Include Doc: ~p ~p", [DocId, Rev]),
case (catch couch_httpd_db:couch_doc_open(Db, DocId, Rev, [])) of
diff --git a/src/couchdb/couch_key_tree.erl b/src/couchdb/couch_key_tree.erl
index 82c42265..df24d4e1 100644
--- a/src/couchdb/couch_key_tree.erl
+++ b/src/couchdb/couch_key_tree.erl
@@ -13,7 +13,8 @@
-module(couch_key_tree).
-export([merge/2, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]).
--export([map/2, get_all_leafs/1, get_leaf_keys/1, count_leafs/1, remove_leafs/2,get_all_leafs_full/1]).
+-export([map/2, get_all_leafs/1, count_leafs/1, remove_leafs/2,
+ get_all_leafs_full/1,stem/2,test/0]).
% a key tree looks like this:
% Tree -> [] or [{Key, Value, ChildTree} | SiblingTree]
@@ -22,70 +23,150 @@
% And each Key < SiblingKey
+% The tree is stored as a list of {StartDepth, SubTree} pairs: partial
+% (stemmed) trees are arranged by how far from the root they were cut off.
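+% e.g. (illustrative) a branch rooted at depth 0 is [{0, {"1", V, [...]}}];
+% the same branch stemmed down to its deepest node becomes
+% [{2, {"1aa", V, []}}] -- compare Stemmed1aa in test/0 below.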
-% key tree functions
+merge(A, B) ->
+ {Merged, HasConflicts} =
+ lists:foldl(
+ fun(InsertTree, {AccTrees, AccConflicts}) ->
+ case merge_one(AccTrees, InsertTree, [], false) of
+ {ok, Merged, Conflicts} ->
+ {Merged, Conflicts or AccConflicts};
+ no ->
+ {[InsertTree | AccTrees], true}
+ end
+ end,
+ {A, false}, B),
+ if HasConflicts or
+ ((length(Merged) /= length(A)) and (length(Merged) /= length(B))) ->
+ Conflicts = conflicts;
+ true ->
+ Conflicts = no_conflicts
+ end,
+ {lists:sort(Merged), Conflicts}.
+
+merge_one([], Insert, OutAcc, ConflictsAcc) ->
+ {ok, [Insert | OutAcc], ConflictsAcc};
+merge_one([{Start, Tree}|Rest], {StartInsert, TreeInsert}, OutAcc, ConflictsAcc) ->
+ if Start =< StartInsert ->
+ StartA = Start,
+ StartB = StartInsert,
+ TreeA = Tree,
+ TreeB = TreeInsert;
+ true ->
+ StartB = Start,
+ StartA = StartInsert,
+ TreeB = Tree,
+ TreeA = TreeInsert
+ end,
+ case merge_at([TreeA], StartB - StartA, TreeB) of
+ {ok, [CombinedTrees], Conflicts} ->
+ merge_one(Rest, {StartA, CombinedTrees}, OutAcc, Conflicts or ConflictsAcc);
+ no ->
+ merge_one(Rest, {StartB, TreeB}, [{StartA, TreeA} | OutAcc], ConflictsAcc)
+ end.
+
+merge_at([], _Place, _Insert) ->
+ no;
+merge_at([{Key, Value, SubTree}|Sibs], 0, {InsertKey, InsertValue, InsertSubTree}) ->
+ if Key == InsertKey ->
+ {Merge, Conflicts} = merge_simple(SubTree, InsertSubTree),
+ {ok, [{Key, Value, Merge} | Sibs], Conflicts};
+ true ->
+ case merge_at(Sibs, 0, {InsertKey, InsertValue, InsertSubTree}) of
+ {ok, Merged, Conflicts} ->
+ {ok, [{Key, Value, SubTree} | Merged], Conflicts};
+ no ->
+ no
+ end
+ end;
+merge_at([{Key, Value, SubTree}|Sibs], Place, Insert) ->
+ case merge_at(SubTree, Place - 1, Insert) of
+ {ok, Merged, Conflicts} ->
+ {ok, [{Key, Value, Merged} | Sibs], Conflicts};
+ no ->
+ case merge_at(Sibs, Place, Insert) of
+ {ok, Merged, Conflicts} ->
+ {ok, [{Key, Value, SubTree} | Merged], Conflicts};
+ no ->
+ no
+ end
+ end.
-% When the same key is found in the trees, the value in tree B is discarded.
-merge([], B) ->
- B;
-merge(A, []) ->
- A;
-merge([ATree | ANextTree], [BTree | BNextTree]) ->
+% key tree functions
+merge_simple([], B) ->
+ {B, false};
+merge_simple(A, []) ->
+ {A, false};
+merge_simple([ATree | ANextTree], [BTree | BNextTree]) ->
{AKey, AValue, ASubTree} = ATree,
{BKey, _BValue, BSubTree} = BTree,
if
AKey == BKey ->
%same key
- MergedSubTree = merge(ASubTree, BSubTree),
- MergedNextTree = merge(ANextTree, BNextTree),
- [{AKey, AValue, MergedSubTree} | MergedNextTree];
+ {MergedSubTree, Conflict1} = merge_simple(ASubTree, BSubTree),
+ {MergedNextTree, Conflict2} = merge_simple(ANextTree, BNextTree),
+ {[{AKey, AValue, MergedSubTree} | MergedNextTree], Conflict1 or Conflict2};
AKey < BKey ->
- [ATree | merge(ANextTree, [BTree | BNextTree])];
+ {MTree, _} = merge_simple(ANextTree, [BTree | BNextTree]),
+ {[ATree | MTree], true};
true ->
- [BTree | merge([ATree | ANextTree], BNextTree)]
+ {MTree, _} = merge_simple([ATree | ANextTree], BNextTree),
+ {[BTree | MTree], true}
end.
find_missing(_Tree, []) ->
[];
-find_missing([], Keys) ->
- Keys;
-find_missing([{Key, _, SubTree} | RestTree], Keys) ->
- SrcKeys2 = Keys -- [Key],
- SrcKeys3 = find_missing(SubTree, SrcKeys2),
- find_missing(RestTree, SrcKeys3).
-
-
-get_all_key_paths_rev([], KeyPathAcc) ->
- KeyPathAcc;
-get_all_key_paths_rev([{Key, Value, SubTree} | RestTree], KeyPathAcc) ->
- get_all_key_paths_rev(SubTree, [{Key, Value} | KeyPathAcc]) ++
- get_all_key_paths_rev(RestTree, KeyPathAcc).
-
+find_missing([], SearchKeys) ->
+ SearchKeys;
+find_missing([{Start, {Key, Value, SubTree}} | RestTree], SearchKeys) ->
+ PossibleKeys = [{KeyPos, KeyValue} || {KeyPos, KeyValue} <- SearchKeys, KeyPos >= Start],
+ ImpossibleKeys = [{KeyPos, KeyValue} || {KeyPos, KeyValue} <- SearchKeys, KeyPos < Start],
+ Missing = find_missing_simple(Start, [{Key, Value, SubTree}], PossibleKeys),
+ find_missing(RestTree, ImpossibleKeys ++ Missing).
+
+find_missing_simple(_Pos, _Tree, []) ->
+ [];
+find_missing_simple(_Pos, [], SearchKeys) ->
+ SearchKeys;
+find_missing_simple(Pos, [{Key, _, SubTree} | RestTree], SearchKeys) ->
+ PossibleKeys = [{KeyPos, KeyValue} || {KeyPos, KeyValue} <- SearchKeys, KeyPos >= Pos],
+ ImpossibleKeys = [{KeyPos, KeyValue} || {KeyPos, KeyValue} <- SearchKeys, KeyPos < Pos],
+
+ SrcKeys2 = PossibleKeys -- [{Pos, Key}],
+ SrcKeys3 = find_missing_simple(Pos + 1, SubTree, SrcKeys2),
+ ImpossibleKeys ++ find_missing_simple(Pos, RestTree, SrcKeys3).
+
+
+filter_leafs([], _Keys, FilteredAcc, RemovedKeysAcc) ->
+ {FilteredAcc, RemovedKeysAcc};
+filter_leafs([{Pos, [{LeafKey, _}|_]} = Path |Rest], Keys, FilteredAcc, RemovedKeysAcc) ->
+ FilteredKeys = lists:delete({Pos, LeafKey}, Keys),
+ if FilteredKeys == Keys ->
+ % this leaf is not a key we are looking to remove
+ filter_leafs(Rest, Keys, [Path | FilteredAcc], RemovedKeysAcc);
+ true ->
+ % this did match a key, remove both the node and the input key
+ filter_leafs(Rest, FilteredKeys, FilteredAcc, [{Pos, LeafKey} | RemovedKeysAcc])
+ end.
% Removes any branches from the tree whose leaf node(s) are in the Keys
-remove_leafs(Tree, Keys) ->
+remove_leafs(Trees, Keys) ->
% flatten each branch in a tree into a tree path
- Paths = get_all_key_paths_rev(Tree, []),
+ Paths = get_all_leafs_full(Trees),
% filter out any that are in the keys list.
- {FoundKeys, FilteredPaths} = lists:mapfoldl(
- fun(Key, PathsAcc) ->
- case [Path || [{LeafKey,_}|_]=Path <- PathsAcc, LeafKey /= Key] of
- PathsAcc ->
- {nil, PathsAcc};
- PathsAcc2 ->
- {Key, PathsAcc2}
- end
- end, Paths, Keys),
-
+ {FilteredPaths, RemovedKeys} = filter_leafs(Paths, Keys, [], []),
+
% convert paths back to trees
NewTree = lists:foldl(
- fun(Path,TreeAcc) ->
- SingleTree = lists:foldl(
+ fun({PathPos, Path},TreeAcc) ->
+ [SingleTree] = lists:foldl(
fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path),
- merge(TreeAcc, SingleTree)
+ {NewTrees, _} = merge(TreeAcc, [{PathPos + 1 - length(Path), SingleTree}]),
+ NewTrees
end, [], FilteredPaths),
- {NewTree, FoundKeys}.
+ {NewTree, RemovedKeys}.
% get the leafs in the tree matching the keys. The matching key nodes can be
@@ -94,87 +175,211 @@ remove_leafs(Tree, Keys) ->
get_key_leafs(Tree, Keys) ->
get_key_leafs(Tree, Keys, []).
-get_key_leafs(_Tree, [], _KeyPathAcc) ->
+get_key_leafs(_, [], Acc) ->
+ {Acc, []};
+get_key_leafs([], Keys, Acc) ->
+ {Acc, Keys};
+get_key_leafs([{Pos, Tree}|Rest], Keys, Acc) ->
+ {Gotten, RemainingKeys} = get_key_leafs_simple(Pos, [Tree], Keys, []),
+ get_key_leafs(Rest, RemainingKeys, Gotten ++ Acc).
+
+get_key_leafs_simple(_Pos, _Tree, [], _KeyPathAcc) ->
{[], []};
-get_key_leafs([], KeysToGet, _KeyPathAcc) ->
+get_key_leafs_simple(_Pos, [], KeysToGet, _KeyPathAcc) ->
{[], KeysToGet};
-get_key_leafs([{Key, _Value, SubTree}=Tree | RestTree], KeysToGet, KeyPathAcc) ->
- case KeysToGet -- [Key] of
+get_key_leafs_simple(Pos, [{Key, _Value, SubTree}=Tree | RestTree], KeysToGet, KeyPathAcc) ->
+ case lists:delete({Pos, Key}, KeysToGet) of
KeysToGet -> % same list, key not found
- {LeafsFound, KeysToGet2} = get_key_leafs(SubTree, KeysToGet, [Key | KeyPathAcc]),
- {RestLeafsFound, KeysRemaining} = get_key_leafs(RestTree, KeysToGet2, KeyPathAcc),
+ {LeafsFound, KeysToGet2} = get_key_leafs_simple(Pos + 1, SubTree, KeysToGet, [Key | KeyPathAcc]),
+ {RestLeafsFound, KeysRemaining} = get_key_leafs_simple(Pos, RestTree, KeysToGet2, KeyPathAcc),
{LeafsFound ++ RestLeafsFound, KeysRemaining};
KeysToGet2 ->
- LeafsFound = get_all_leafs([Tree], KeyPathAcc),
+ LeafsFound = get_all_leafs_simple(Pos, [Tree], KeyPathAcc),
- LeafKeysFound = [LeafKeyFound || {LeafKeyFound, _, _} <- LeafsFound],
- KeysToGet2 = KeysToGet2 -- LeafKeysFound,
+ LeafKeysFound = [{LeafPos, LeafRev} || {_, {LeafPos, [LeafRev|_]}} <- LeafsFound],
+ KeysToGet3 = KeysToGet2 -- LeafKeysFound,
- {RestLeafsFound, KeysRemaining} = get_key_leafs(RestTree, KeysToGet2, KeyPathAcc),
+ {RestLeafsFound, KeysRemaining} = get_key_leafs_simple(Pos, RestTree, KeysToGet3, KeyPathAcc),
{LeafsFound ++ RestLeafsFound, KeysRemaining}
end.
get(Tree, KeysToGet) ->
{KeyPaths, KeysNotFound} = get_full_key_paths(Tree, KeysToGet),
- FixedResults = [ {Key, Value, [Key0 || {Key0, _} <- Path]} || [{Key, Value}|_] = Path <- KeyPaths],
+ FixedResults = [ {Value, {Pos, [Key0 || {Key0, _} <- Path]}} || {Pos, [{_Key, Value}|_]=Path} <- KeyPaths],
{FixedResults, KeysNotFound}.
get_full_key_paths(Tree, Keys) ->
get_full_key_paths(Tree, Keys, []).
-get_full_key_paths(_Tree, [], _KeyPathAcc) ->
+get_full_key_paths(_, [], Acc) ->
+ {Acc, []};
+get_full_key_paths([], Keys, Acc) ->
+ {Acc, Keys};
+get_full_key_paths([{Pos, Tree}|Rest], Keys, Acc) ->
+ {Gotten, RemainingKeys} = get_full_key_paths(Pos, [Tree], Keys, []),
+ get_full_key_paths(Rest, RemainingKeys, Gotten ++ Acc).
+
+
+get_full_key_paths(_Pos, _Tree, [], _KeyPathAcc) ->
{[], []};
-get_full_key_paths([], KeysToGet, _KeyPathAcc) ->
+get_full_key_paths(_Pos, [], KeysToGet, _KeyPathAcc) ->
{[], KeysToGet};
-get_full_key_paths([{KeyId, Value, SubTree} | RestTree], KeysToGet, KeyPathAcc) ->
- KeysToGet2 = KeysToGet -- [KeyId],
+get_full_key_paths(Pos, [{KeyId, Value, SubTree} | RestTree], KeysToGet, KeyPathAcc) ->
+ KeysToGet2 = KeysToGet -- [{Pos, KeyId}],
CurrentNodeResult =
case length(KeysToGet2) == length(KeysToGet) of
true -> % not in the key list.
[];
false -> % this node is the key list. return it
- [[{KeyId, Value} | KeyPathAcc]]
+ [{Pos, [{KeyId, Value} | KeyPathAcc]}]
end,
- {KeysGotten, KeysRemaining} = get_full_key_paths(SubTree, KeysToGet2, [{KeyId, Value} | KeyPathAcc]),
- {KeysGotten2, KeysRemaining2} = get_full_key_paths(RestTree, KeysRemaining, KeyPathAcc),
+ {KeysGotten, KeysRemaining} = get_full_key_paths(Pos + 1, SubTree, KeysToGet2, [{KeyId, Value} | KeyPathAcc]),
+ {KeysGotten2, KeysRemaining2} = get_full_key_paths(Pos, RestTree, KeysRemaining, KeyPathAcc),
{CurrentNodeResult ++ KeysGotten ++ KeysGotten2, KeysRemaining2}.
get_all_leafs_full(Tree) ->
get_all_leafs_full(Tree, []).
-get_all_leafs_full([], _KeyPathAcc) ->
+get_all_leafs_full([], Acc) ->
+ Acc;
+get_all_leafs_full([{Pos, Tree} | Rest], Acc) ->
+ get_all_leafs_full(Rest, get_all_leafs_full_simple(Pos, [Tree], []) ++ Acc).
+
+get_all_leafs_full_simple(_Pos, [], _KeyPathAcc) ->
[];
-get_all_leafs_full([{KeyId, Value, []} | RestTree], KeyPathAcc) ->
- [[{KeyId, Value} | KeyPathAcc] | get_all_leafs_full(RestTree, KeyPathAcc)];
-get_all_leafs_full([{KeyId, Value, SubTree} | RestTree], KeyPathAcc) ->
- get_all_leafs_full(SubTree, [{KeyId, Value} | KeyPathAcc]) ++ get_all_leafs_full(RestTree, KeyPathAcc).
+get_all_leafs_full_simple(Pos, [{KeyId, Value, []} | RestTree], KeyPathAcc) ->
+ [{Pos, [{KeyId, Value} | KeyPathAcc]} | get_all_leafs_full_simple(Pos, RestTree, KeyPathAcc)];
+get_all_leafs_full_simple(Pos, [{KeyId, Value, SubTree} | RestTree], KeyPathAcc) ->
+ get_all_leafs_full_simple(Pos + 1, SubTree, [{KeyId, Value} | KeyPathAcc]) ++ get_all_leafs_full_simple(Pos, RestTree, KeyPathAcc).
-get_all_leafs(Tree) ->
- get_all_leafs(Tree, []).
+get_all_leafs(Trees) ->
+ get_all_leafs(Trees, []).
+
+get_all_leafs([], Acc) ->
+ Acc;
+get_all_leafs([{Pos, Tree}|Rest], Acc) ->
+ get_all_leafs(Rest, get_all_leafs_simple(Pos, [Tree], []) ++ Acc).
-get_all_leafs([], _KeyPathAcc) ->
+get_all_leafs_simple(_Pos, [], _KeyPathAcc) ->
[];
-get_all_leafs([{KeyId, Value, []} | RestTree], KeyPathAcc) ->
- [{KeyId, Value, [KeyId | KeyPathAcc]} | get_all_leafs(RestTree, KeyPathAcc)];
-get_all_leafs([{KeyId, _Value, SubTree} | RestTree], KeyPathAcc) ->
- get_all_leafs(SubTree, [KeyId | KeyPathAcc]) ++ get_all_leafs(RestTree, KeyPathAcc).
+get_all_leafs_simple(Pos, [{KeyId, Value, []} | RestTree], KeyPathAcc) ->
+ [{Value, {Pos, [KeyId | KeyPathAcc]}} | get_all_leafs_simple(Pos, RestTree, KeyPathAcc)];
+get_all_leafs_simple(Pos, [{KeyId, _Value, SubTree} | RestTree], KeyPathAcc) ->
+ get_all_leafs_simple(Pos + 1, SubTree, [KeyId | KeyPathAcc]) ++ get_all_leafs_simple(Pos, RestTree, KeyPathAcc).
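+% Leafs come back as {Value, {Pos, ReversedKeyPath}} pairs, e.g.
+% get_all_leafs(Stemmed1a) -> [{"bar", {2, ["1aa","1a"]}}] (see test/0).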
+
-get_leaf_keys([]) ->
- [];
-get_leaf_keys([{Key, _Value, []} | RestTree]) ->
- [Key | get_leaf_keys(RestTree)];
-get_leaf_keys([{_Key, _Value, SubTree} | RestTree]) ->
- get_leaf_keys(SubTree) ++ get_leaf_keys(RestTree).
-
count_leafs([]) ->
0;
-count_leafs([{_Key, _Value, []} | RestTree]) ->
- 1 + count_leafs(RestTree);
-count_leafs([{_Key, _Value, SubTree} | RestTree]) ->
- count_leafs(SubTree) + count_leafs(RestTree).
+count_leafs([{_Pos,Tree}|Rest]) ->
+ count_leafs_simple([Tree]) + count_leafs(Rest).
+count_leafs_simple([]) ->
+ 0;
+count_leafs_simple([{_Key, _Value, []} | RestTree]) ->
+ 1 + count_leafs_simple(RestTree);
+count_leafs_simple([{_Key, _Value, SubTree} | RestTree]) ->
+ count_leafs_simple(SubTree) + count_leafs_simple(RestTree).
+
map(_Fun, []) ->
[];
-map(Fun, [{Key, Value, SubTree} | RestTree]) ->
- Value2 = Fun(Key, Value),
- [{Key, Value2, map(Fun, SubTree)} | map(Fun, RestTree)].
+map(Fun, [{Pos, Tree}|Rest]) ->
+ [NewTree] = map_simple(Fun, Pos, [Tree]),
+ [{Pos, NewTree} | map(Fun, Rest)].
+
+map_simple(_Fun, _Pos, []) ->
+ [];
+map_simple(Fun, Pos, [{Key, Value, SubTree} | RestTree]) ->
+ Value2 = Fun({Pos, Key}, Value),
+ [{Key, Value2, map_simple(Fun, Pos + 1, SubTree)} | map_simple(Fun, Pos, RestTree)].
+
+
+stem(Trees, Limit) ->
+ % flatten each branch in a tree into a tree path
+ Paths = get_all_leafs_full(Trees),
+
+ Paths2 = [{Pos, lists:sublist(Path, Limit)} || {Pos, Path} <- Paths],
+
+ % convert paths back to trees
+ lists:foldl(
+ fun({PathPos, Path},TreeAcc) ->
+ [SingleTree] = lists:foldl(
+ fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path),
+ {NewTrees, _} = merge(TreeAcc, [{PathPos + 1 - length(Path), SingleTree}]),
+ NewTrees
+ end, [], Paths2).
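+% e.g. stemming the depth-3 branch TwoChild from test/0 below:
+% stem(TwoChild, 2) keeps the deepest two nodes of each path and
+% equals Stemmed1a; stem(TwoChild, 1) equals Stemmed1aa.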
+test() ->
+ EmptyTree = [],
+ One = [{0, {"1","foo",[]}}],
+ TwoSibs = [{0, {"1","foo",[]}},
+ {0, {"2","foo",[]}}],
+ OneChild = [{0, {"1","foo",[{"1a", "bar", []}]}}],
+ TwoChild = [{0, {"1","foo", [{"1a", "bar", [{"1aa", "bar", []}]}]}}],
+ TwoChildSibs = [{0, {"1","foo", [{"1a", "bar", []},
+ {"1b", "bar", []}]}}],
+ Stemmed1a = [{1, {"1a", "bar", [{"1aa", "bar", []}]}}],
+ Stemmed1aa = [{2, {"1aa", "bar", []}}],
+
+ {EmptyTree, no_conflicts} = merge(EmptyTree, EmptyTree),
+ {One, no_conflicts} = merge(EmptyTree, One),
+ {One, no_conflicts} = merge(One, EmptyTree),
+ {TwoSibs, no_conflicts} = merge(One, TwoSibs),
+ {One, no_conflicts} = merge(One, One),
+ {TwoChild, no_conflicts} = merge(TwoChild, TwoChild),
+ {TwoChildSibs, no_conflicts} = merge(TwoChildSibs, TwoChildSibs),
+ {TwoChild, no_conflicts} = merge(TwoChild, Stemmed1aa),
+ {TwoChild, no_conflicts} = merge(TwoChild, Stemmed1a),
+ {Stemmed1a, no_conflicts} = merge(Stemmed1a, Stemmed1aa),
+ Expect1 = OneChild ++ Stemmed1aa,
+ {Expect1, conflicts} = merge(OneChild, Stemmed1aa),
+ {TwoChild, no_conflicts} = merge(Expect1, TwoChild),
+
+ []=find_missing(TwoChildSibs, [{0,"1"}, {1,"1a"}]),
+ [{0, "10"}, {100, "x"}]=find_missing(TwoChildSibs, [{0,"1"}, {0, "10"}, {1,"1a"}, {100, "x"}]),
+ [{0, "1"}, {100, "x"}]=find_missing(Stemmed1a, [{0,"1"}, {1,"1a"}, {100, "x"}]),
+ [{0, "1"}, {1,"1a"}, {100, "x"}]=find_missing(Stemmed1aa, [{0,"1"}, {1,"1a"}, {100, "x"}]),
+
+ {TwoChildSibs, []} = remove_leafs(TwoChildSibs, []),
+ {TwoChildSibs, []} = remove_leafs(TwoChildSibs, [{0, "1"}]),
+ {OneChild, [{1, "1b"}]} = remove_leafs(TwoChildSibs, [{1, "1b"}]),
+ {[], [{1, "1b"},{1, "1a"}]} = remove_leafs(TwoChildSibs, [{1, "1a"}, {1, "1b"}]),
+ {Stemmed1a, []} = remove_leafs(Stemmed1a, [{1, "1a"}]),
+ {[], [{2, "1aa"}]} = remove_leafs(Stemmed1a, [{2, "1aa"}]),
+ {TwoChildSibs, []} = remove_leafs(TwoChildSibs, []),
+
+ {[],[{0,"x"}]} = get_key_leafs(TwoChildSibs, [{0, "x"}]),
+
+ {[{"bar", {1, ["1a","1"]}}],[]} = get_key_leafs(TwoChildSibs, [{1, "1a"}]),
+ {[{"bar", {1, ["1a","1"]}},{"bar",{1, ["1b","1"]}}],[]} = get_key_leafs(TwoChildSibs, [{0, "1"}]),
+
+ {[{"foo", {0, ["1"]}}],[]} = get(TwoChildSibs, [{0, "1"}]),
+ {[{"bar", {1, ["1a", "1"]}}],[]} = get(TwoChildSibs, [{1, "1a"}]),
+
+ {[{0,[{"1", "foo"}]}],[]} = get_full_key_paths(TwoChildSibs, [{0, "1"}]),
+ {[{1,[{"1a", "bar"},{"1", "foo"}]}],[]} = get_full_key_paths(TwoChildSibs, [{1, "1a"}]),
+
+ [{2, [{"1aa", "bar"},{"1a", "bar"}]}] = get_all_leafs_full(Stemmed1a),
+ [{1, [{"1a", "bar"},{"1", "foo"}]}, {1, [{"1b", "bar"},{"1", "foo"}]}] = get_all_leafs_full(TwoChildSibs),
+
+ [{"bar", {2, ["1aa","1a"]}}] = get_all_leafs(Stemmed1a),
+ [{"bar", {1, ["1a", "1"]}}, {"bar", {1, ["1b","1"]}}] = get_all_leafs(TwoChildSibs),
+
+ 0 = count_leafs(EmptyTree),
+ 1 = count_leafs(One),
+ 2 = count_leafs(TwoChildSibs),
+ 1 = count_leafs(Stemmed1a),
+
+ TwoChild = stem(TwoChild, 3),
+ Stemmed1a = stem(TwoChild, 2),
+ Stemmed1aa = stem(TwoChild, 1),
+ ok.
+
+
+
+
+
+
+
+
+
+
+ \ No newline at end of file
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 89d40be3..3647f6db 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -15,11 +15,11 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([replicate/3]).
+-export([replicate/2]).
-include_lib("couch_db.hrl").
-%% @spec replicate(Source::binary(), Target::binary(), Options::proplist()) ->
+%% @spec replicate(Source::binary(), Target::binary()) ->
%% {ok, Stats} | {error, Reason}
%% @doc Triggers a replication. Stats is a JSON Object with the following
%% keys: session_id (UUID), source_last_seq (integer), and history (array).
@@ -30,26 +30,29 @@
%% The supervisor will try to restart the replication in case of any error
%% other than shutdown. Just call this function again to listen for the
%% result of the retry.
-replicate(Source, Target, Options) ->
- Id = <<Source/binary, ":", Target/binary>>,
- Args = [?MODULE, [Source,Target,Options], []],
+replicate(Source, Target) ->
- Replicator = {Id,
+ {ok, HostName} = inet:gethostname(),
+ RepId = couch_util:to_hex(
+ erlang:md5(term_to_binary([HostName, Source, Target]))),
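+ % the id is deterministic for a given host/source/target triple, so a
+ % repeated POST to _replicate with the same endpoints finds the running
+ % child (already_present/already_started below) instead of duplicating it.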
+ Args = [?MODULE, [RepId, Source, Target], []],
+
+ Replicator = {RepId,
{gen_server, start_link, Args},
transient,
- 10000,
+ 1,
worker,
[?MODULE]
},
Server = case supervisor:start_child(couch_rep_sup, Replicator) of
{ok, Pid} ->
- ?LOG_INFO("starting new replication ~p at ~p", [Id, Pid]),
+ ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
Pid;
{error, already_present} ->
- case supervisor:restart_child(couch_rep_sup, Id) of
+ case supervisor:restart_child(couch_rep_sup, RepId) of
{ok, Pid} ->
- ?LOG_INFO("starting replication ~p at ~p", [Id, Pid]),
+ ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
Pid;
{error, running} ->
%% this error occurs if multiple replicators are racing
@@ -57,16 +60,16 @@ replicate(Source, Target, Options) ->
%% the Pid by calling start_child again.
{error, {already_started, Pid}} =
supervisor:start_child(couch_rep_sup, Replicator),
- ?LOG_INFO("replication ~p already running at ~p", [Id, Pid]),
+ ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
Pid
end;
{error, {already_started, Pid}} ->
- ?LOG_INFO("replication ~p already running at ~p", [Id, Pid]),
+ ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
Pid
end,
case gen_server:call(Server, get_result, infinity) of
- retry -> replicate(Source, Target, Options);
+ retry -> replicate(Source, Target);
Else -> Else
end.
@@ -79,6 +82,7 @@ replicate(Source, Target, Options) ->
headers
}).
+
-record(state, {
context,
current_seq,
@@ -90,18 +94,14 @@ replicate(Source, Target, Options) ->
listeners = []
}).
-init([Source, Target, Options]) ->
+
+init([RepId, Source, Target]) ->
process_flag(trap_exit, true),
- {ok, DbSrc} =
- open_db(Source, proplists:get_value(source_options, Options, [])),
- {ok, DbTgt} =
- open_db(Target, proplists:get_value(target_options, Options, [])),
+ {ok, DbSrc, SrcName} = open_db(Source),
+ {ok, DbTgt, TgtName} = open_db(Target),
- {ok, Host} = inet:gethostname(),
- HostBin = list_to_binary(Host),
- DocKey = <<?LOCAL_DOC_PREFIX, HostBin/binary, ":", Source/binary, ":",
- Target/binary>>,
+ DocKey = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
{ok, InfoSrc} = get_db_info(DbSrc),
{ok, InfoTgt} = get_db_info(DbTgt),
@@ -110,49 +110,49 @@ init([Source, Target, Options]) ->
SrcInstanceStartTime = proplists:get_value(instance_start_time, InfoSrc),
TgtInstanceStartTime = proplists:get_value(instance_start_time, InfoTgt),
- case proplists:get_value(full, Options, false)
- orelse proplists:get_value("full", Options, false) of
+ RepRecDocSrc =
+ case open_doc(DbSrc, DocKey, []) of
+ {ok, SrcDoc} ->
+ ?LOG_DEBUG("Found existing replication record on source", []),
+ SrcDoc;
+ _ -> #doc{id=DocKey}
+ end,
+
+ RepRecDocTgt =
+ case open_doc(DbTgt, DocKey, []) of
+ {ok, TgtDoc} ->
+ ?LOG_DEBUG("Found existing replication record on target", []),
+ TgtDoc;
+ _ -> #doc{id=DocKey}
+ end,
+
+ #doc{body={RepRecProps}} = RepRecDocSrc,
+ #doc{body={RepRecPropsTgt}} = RepRecDocTgt,
+
+ case proplists:get_value(<<"session_id">>, RepRecProps) ==
+ proplists:get_value(<<"session_id">>, RepRecPropsTgt) of
true ->
- RepRecSrc = RepRecTgt = #doc{id=DocKey};
+ % if the records have the same session id,
+ % then we have a valid replication history
+ OldSeqNum = proplists:get_value(<<"source_last_seq">>, RepRecProps, 0),
+ OldHistory = proplists:get_value(<<"history">>, RepRecProps, []);
false ->
- RepRecSrc = case open_doc(DbSrc, DocKey, []) of
- {ok, SrcDoc} ->
- ?LOG_DEBUG("Found existing replication record on source", []),
- SrcDoc;
- _ -> #doc{id=DocKey}
- end,
-
- RepRecTgt = case open_doc(DbTgt, DocKey, []) of
- {ok, TgtDoc} ->
- ?LOG_DEBUG("Found existing replication record on target", []),
- TgtDoc;
- _ -> #doc{id=DocKey}
- end
- end,
-
- #doc{body={OldRepHistoryProps}} = RepRecSrc,
- #doc{body={OldRepHistoryPropsTrg}} = RepRecTgt,
-
- SeqNum = case OldRepHistoryProps == OldRepHistoryPropsTrg of
- true ->
- % if the records are identical, then we have a valid replication history
- proplists:get_value(<<"source_last_seq">>, OldRepHistoryProps, 0);
- false ->
- ?LOG_INFO("Replication records differ. "
+ ?LOG_INFO("Replication records differ. "
"Performing full replication instead of incremental.", []),
- ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
- [OldRepHistoryProps, OldRepHistoryPropsTrg]),
- 0
+ ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+ [RepRecProps, RepRecPropsTgt]),
+ OldSeqNum = 0,
+ OldHistory = []
end,
Context = [
- {start_seq, SeqNum},
- {history, OldRepHistoryProps},
+ {start_seq, OldSeqNum},
+ {history, OldHistory},
{rep_starttime, ReplicationStartTime},
{src_starttime, SrcInstanceStartTime},
{tgt_starttime, TgtInstanceStartTime},
- {src_record, RepRecSrc},
- {tgt_record, RepRecTgt}
+ {src_record, RepRecDocSrc},
+ {tgt_record, RepRecDocTgt}
],
Stats = ets:new(replication_stats, [set, private]),
@@ -160,16 +160,17 @@ init([Source, Target, Options]) ->
ets:insert(Stats, {missing_revs, 0}),
ets:insert(Stats, {docs_read, 0}),
ets:insert(Stats, {docs_written, 0}),
+ ets:insert(Stats, {doc_write_failures, 0}),
- couch_task_status:add_task("Replication", <<Source/binary, " -> ",
- Target/binary>>, "Starting"),
+ couch_task_status:add_task("Replication", <<SrcName/binary, " -> ",
+ TgtName/binary>>, "Starting"),
Parent = self(),
- Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{SeqNum,0}) end),
+ Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{OldSeqNum,0}) end),
State = #state{
context = Context,
- current_seq = SeqNum,
+ current_seq = OldSeqNum,
enum_pid = Pid,
source = DbSrc,
target = DbTgt,
@@ -178,7 +179,6 @@ init([Source, Target, Options]) ->
{ok, State}.
-
handle_call(get_result, From, #state{listeners=L} = State) ->
{noreply, State#state{listeners=[From|L]}};
@@ -191,7 +191,7 @@ handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State)
target = Target,
stats = Stats
} = State,
-
+
ets:update_counter(Stats, missing_revs, length(Revs)),
%% get document(s)
@@ -203,8 +203,11 @@ handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State)
{NewBuffer, NewContext} = case couch_util:should_flush() of
true ->
Docs2 = lists:flatten([Docs|Buffer]),
- ok = update_docs(Target, Docs2, [], false),
- ets:update_counter(Stats, docs_written, length(Docs2)),
+ {ok, Errors} = update_docs(Target, Docs2, [], replicated_changes),
+ dump_update_errors(Errors),
+ ets:update_counter(Stats, doc_write_failures, length(Errors)),
+ ets:update_counter(Stats, docs_written, length(Docs2) -
+ length(Errors)),
{ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats),
{[], Ctxt};
false ->
@@ -255,8 +258,11 @@ terminate(normal, State) ->
stats = Stats
} = State,
- ok = update_docs(Target, lists:flatten(Buffer), [], false),
- ets:update_counter(Stats, docs_written, lists:flatlength(Buffer)),
+ {ok, Errors} = update_docs(Target, lists:flatten(Buffer), [], replicated_changes),
+ dump_update_errors(Errors),
+ ets:update_counter(Stats, doc_write_failures, length(Errors)),
+ ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) -
+ length(Errors)),
couch_task_status:update("Finishing"),
@@ -264,9 +270,12 @@ terminate(normal, State) ->
ets:delete(Stats),
close_db(Target),
- %% reply to original requester
- [Original|Rest] = Listeners,
- gen_server:reply(Original, {ok, NewRepHistory}),
+ case Listeners of
+ [Original|Rest] ->
+ %% reply to original requester
+ gen_server:reply(Original, {ok, NewRepHistory});
+ Rest -> ok
+ end,
%% maybe trigger another replication. If this replicator uses a local
%% source Db, changes to that Db since we started will not be included in
@@ -304,6 +313,16 @@ code_change(_OldVsn, State, _Extra) ->
%% internal functions
%%=============================================================================
+
+% we should probably write these to a special replication log
+% or have a callback where the caller decides what to do with replication
+% errors.
+dump_update_errors([]) -> ok;
+dump_update_errors([{{Id, Rev}, Error}|Rest]) ->
+ ?LOG_INFO("error replicating document \"~s\" rev \"~s\":~p",
+ [Id, couch_doc:rev_to_str(Rev), Error]),
+ dump_update_errors(Rest).
+
attachment_loop(ReqId) ->
couch_util:should_flush(),
receive
@@ -354,6 +373,16 @@ attachment_stub_converter(DbS, Id, {Name, {stub, Type, Length}}) ->
end,
{Name, {Type, {RcvFun, Length}}}.
+
+open_db({remote, Url, Headers}) ->
+ {ok, #http_db{uri=?b2l(Url), headers=Headers}, Url};
+open_db({local, DbName, UserCtx}) ->
+ case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
+ {ok, Db} -> {ok, Db, DbName};
+ Error -> Error
+ end.
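+% open_db/1 consumes the endpoint terms built by get_rep_endpoint/2 in
+% couch_httpd_misc_handlers, and also returns the printable name used in
+% the "Replication" task status line above.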
+
+
close_db(#http_db{})->
ok;
close_db(Db)->
@@ -362,27 +391,38 @@ close_db(Db)->
do_checkpoint(Source, Target, Context, NewSeqNum, Stats) ->
?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
[
- {start_seq, SeqNum},
- {history, OldRepHistoryProps},
+ {start_seq, StartSeqNum},
+ {history, OldHistory},
{rep_starttime, ReplicationStartTime},
{src_starttime, SrcInstanceStartTime},
{tgt_starttime, TgtInstanceStartTime},
- {src_record, RepRecSrc},
- {tgt_record, RepRecTgt}
+ {src_record, #doc{body={LastRepRecord}}=RepRecDocSrc},
+ {tgt_record, RepRecDocTgt}
] = Context,
- NewHistory = case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
+ case NewSeqNum == StartSeqNum andalso OldHistory /= [] of
true ->
% nothing changed, don't record results
- {OldRepHistoryProps};
+ {ok, {[{<<"no_changes">>, true} | LastRepRecord]}, Context};
false ->
+ % something changed, record results for incremental replication,
+
% commit changes to both src and tgt. The src because if changes
- % we replicated are lost, we'll record the a seq number of ahead
- % of what was committed and therefore lose future changes with the
- % same seq nums.
- {ok, SrcInstanceStartTime2} = ensure_full_commit(Source),
+ % we replicated are lost, we'll record a seq number ahead
+ % of what was committed. If those changes are lost and the seq number
+ % reverts to a previous committed value, we would skip future changes
+ % when new doc updates are assigned already-replicated seq nums.
+
+ % commit the src async
+ ParentPid = self(),
+ SrcCommitPid = spawn_link(fun() ->
+ ParentPid ! {self(), ensure_full_commit(Source)} end),
+
+ % commit tgt sync
{ok, TgtInstanceStartTime2} = ensure_full_commit(Target),
+ receive {SrcCommitPid, {ok, SrcInstanceStartTime2}} -> ok end,
+
RecordSeqNum =
if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
TgtInstanceStartTime2 == TgtInstanceStartTime ->
@@ -391,60 +431,57 @@ do_checkpoint(Source, Target, Context, NewSeqNum, Stats) ->
?LOG_INFO("A server has restarted sinced replication start. "
"Not recording the new sequence number to ensure the "
"replication is redone and documents reexamined.", []),
- SeqNum
+ StartSeqNum
end,
- %% format replication history
- JsonStats = [
+ NewHistoryEntry = {
+ [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
+ {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+ {<<"start_last_seq">>, StartSeqNum},
+ {<<"end_last_seq">>, NewSeqNum},
{<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
{<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
{<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
- {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}
+ {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
+ {<<"doc_write_failures">>, ets:lookup_element(Stats, doc_write_failures, 2)}
+ ]},
+ % limit history to 50 entries
+ HistEntries = lists:sublist([NewHistoryEntry | OldHistory], 50),
+
+ NewRepHistory =
+ {[{<<"session_id">>, couch_util:new_uuid()},
+ {<<"source_last_seq">>, RecordSeqNum},
+ {<<"history">>, HistEntries}]},
+
+ {ok, {SrcRevPos,SrcRevId}} = update_doc(Source,
+ RepRecDocSrc#doc{body=NewRepHistory}, []),
+ {ok, {TgtRevPos,TgtRevId}} = update_doc(Target,
+ RepRecDocTgt#doc{body=NewRepHistory}, []),
+
+ NewContext = [
+ {start_seq, StartSeqNum},
+ {history, OldHistory},
+ {rep_starttime, ReplicationStartTime},
+ {src_starttime, SrcInstanceStartTime},
+ {tgt_starttime, TgtInstanceStartTime},
+ {src_record, RepRecDocSrc#doc{revs={SrcRevPos,[SrcRevId]}}},
+ {tgt_record, RepRecDocTgt#doc{revs={TgtRevPos,[TgtRevId]}}}
],
-
- HistEntries =[
- {
- [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
- {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
- {<<"start_last_seq">>, SeqNum},
- {<<"end_last_seq">>, NewSeqNum} | JsonStats]}
- | proplists:get_value(<<"history">>, OldRepHistoryProps, [])],
- % something changed, record results
- {[
- {<<"session_id">>, couch_util:new_uuid()},
- {<<"source_last_seq">>, RecordSeqNum},
- {<<"history">>, lists:sublist(HistEntries, 50)}
- ]}
- end,
- %% update local documents
- RepRecSrc = proplists:get_value(src_record, Context),
- RepRecTgt = proplists:get_value(tgt_record, Context),
- {ok, TgtRev} = update_local_doc(Target, RepRecTgt#doc{body=NewHistory}, []),
- {ok, SrcRev} = update_local_doc(Source, RepRecSrc#doc{body=NewHistory}, []),
+ {ok, NewRepHistory, NewContext}
- NewContext = [
- {start_seq, SeqNum},
- {history, OldRepHistoryProps},
- {rep_starttime, ReplicationStartTime},
- {src_starttime, SrcInstanceStartTime},
- {tgt_starttime, TgtInstanceStartTime},
- {src_record, RepRecSrc#doc{revs=[SrcRev]}},
- {tgt_record, RepRecTgt#doc{revs=[TgtRev]}}
- ],
-
- {ok, NewHistory, NewContext}.
+ end.
do_http_request(Url, Action, Headers) ->
do_http_request(Url, Action, Headers, []).
do_http_request(Url, Action, Headers, JsonBody) ->
- do_http_request(?b2l(?l2b(Url)), Action, Headers, JsonBody, 10).
+ do_http_request(Url, Action, Headers, JsonBody, 10).
do_http_request(Url, Action, _Headers, _JsonBody, 0) ->
?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s",
[Action, Url]),
- exit({http_request_failed, ?l2b(Url)});
+ exit({http_request_failed, Url});
do_http_request(Url, Action, Headers, JsonBody, Retries) ->
?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]),
Body =
@@ -498,7 +535,6 @@ enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) ->
[] ->
gen_server:call(Pid, {fin, {StartSeq, RevsCount}}, infinity);
DocInfoList ->
- % UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList],
SrcRevsList = lists:map(fun(SrcDocInfo) ->
#doc_info{id=Id,
rev=Rev,
@@ -521,13 +557,8 @@ enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) ->
enum_docs_since(Pid, DbSource, DbTarget, {LastSeq, RevsCount2})
end.
-fix_url(UrlBin) ->
- Url = binary_to_list(UrlBin),
- case lists:last(Url) of
- $/ -> Url;
- _ -> Url ++ "/"
- end.
+
get_db_info(#http_db{uri=DbUrl, headers=Headers}) ->
{DbProps} = do_http_request(DbUrl, get, Headers),
{ok, [{list_to_existing_atom(?b2l(K)), V} || {K,V} <- DbProps]};
@@ -542,12 +573,12 @@ get_doc_info_list(#http_db{uri=DbUrl, headers=Headers}, StartSeq) ->
{RowValueProps} = proplists:get_value(<<"value">>, RowInfoList),
#doc_info{
id=proplists:get_value(<<"id">>, RowInfoList),
- rev=proplists:get_value(<<"rev">>, RowValueProps),
+ rev=couch_doc:parse_rev(proplists:get_value(<<"rev">>, RowValueProps)),
update_seq = proplists:get_value(<<"key">>, RowInfoList),
conflict_revs =
- proplists:get_value(<<"conflicts">>, RowValueProps, []),
+ couch_doc:parse_revs(proplists:get_value(<<"conflicts">>, RowValueProps, [])),
deleted_conflict_revs =
- proplists:get_value(<<"deleted_conflicts">>, RowValueProps, []),
+ couch_doc:parse_revs(proplists:get_value(<<"deleted_conflicts">>, RowValueProps, [])),
deleted = proplists:get_value(<<"deleted">>, RowValueProps, false)
}
end, proplists:get_value(<<"rows">>, Results));
@@ -561,25 +592,18 @@ get_doc_info_list(DbSource, StartSeq) ->
lists:reverse(DocInfoList).
get_missing_revs(#http_db{uri=DbUrl, headers=Headers}, DocIdRevsList) ->
+ DocIdRevsList2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- DocIdRevsList],
{ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers,
- {DocIdRevsList}),
- {MissingRevs} = proplists:get_value(<<"missing_revs">>, ResponseMembers),
- {ok, MissingRevs};
+ {DocIdRevsList2}),
+ {DocMissingRevsList} = proplists:get_value(<<"missing_revs">>, ResponseMembers),
+ DocMissingRevsList2 = [{Id, couch_doc:parse_revs(MissingRevStrs)} || {Id, MissingRevStrs} <- DocMissingRevsList],
+ {ok, DocMissingRevsList2};
get_missing_revs(Db, DocId) ->
couch_db:get_missing_revs(Db, DocId).
-open_http_db(UrlBin, Options) ->
- Headers = proplists:get_value(headers, Options, {[]}),
- {ok, #http_db{uri=fix_url(UrlBin), headers=Headers}}.
-
-open_db(<<"http://", _/binary>>=Url, Options)->
- open_http_db(Url, Options);
-open_db(<<"https://", _/binary>>=Url, Options)->
- open_http_db(Url, Options);
-open_db(DbName, Options)->
- couch_db:open(DbName, Options).
-
-open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, []) ->
+
+open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, Options) ->
+ [] = Options,
case do_http_request(DbUrl ++ url_encode(DocId), get, Headers) of
{[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} ->
{couch_util:to_existing_atom(ErrId), Reason};
@@ -589,7 +613,9 @@ open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, []) ->
open_doc(Db, DocId, Options) ->
couch_db:open_doc(Db, DocId, Options).
-open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs, _Opts) ->
+open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs0,
+ [latest]) ->
+ Revs = couch_doc:rev_to_strs(Revs0),
BaseUrl = DbUrl ++ url_encode(DocId) ++ "?revs=true&latest=true",
%% MochiWeb expects URLs < 8KB long, so maybe split into multiple requests
@@ -612,39 +638,52 @@ open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs, _Opts) ->
lists:flatten(?JSON_ENCODE(lists:reverse(Rest))), get, Headers)
end,
- Results =
- lists:map(fun({[{<<"missing">>, Rev}]}) ->
- {{not_found, missing}, Rev};
- ({[{<<"ok">>, JsonDoc}]}) ->
+ Results =
+ lists:map(
+ fun({[{<<"missing">>, Rev}]}) ->
+ {{not_found, missing}, couch_doc:parse_rev(Rev)};
+ ({[{<<"ok">>, JsonDoc}]}) ->
#doc{id=Id, attachments=Attach} = Doc = couch_doc:from_json_obj(JsonDoc),
Attach2 = [attachment_stub_converter(DbS,Id,A) || A <- Attach],
{ok, Doc#doc{attachments=Attach2}}
- end, JsonResults),
+ end, JsonResults),
{ok, Results};
open_doc_revs(Db, DocId, Revs, Options) ->
couch_db:open_doc_revs(Db, DocId, Revs, Options).
-update_docs(_, [], _, _) ->
- ok;
-update_docs(#http_db{uri=DbUrl, headers=Headers}, Docs, [], NewEdits) ->
- JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
- {Returned} =
- do_http_request(DbUrl ++ "_bulk_docs", post, Headers,
- {[{new_edits, NewEdits}, {docs, JsonDocs}]}),
- true = proplists:get_value(<<"ok">>, Returned),
- ok;
-update_docs(Db, Docs, Options, NewEdits) ->
- couch_db:update_docs(Db, Docs, Options, NewEdits).
-update_local_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, []) ->
+update_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, Options) ->
+ [] = Options,
Url = DbUrl ++ url_encode(DocId),
{ResponseMembers} = do_http_request(Url, put, Headers,
- couch_doc:to_json_obj(Doc, [revs,attachments])),
- RevId = proplists:get_value(<<"rev">>, ResponseMembers),
- {ok, RevId};
-update_local_doc(Db, Doc, Options) ->
+ couch_doc:to_json_obj(Doc, [attachments])),
+ Rev = proplists:get_value(<<"rev">>, ResponseMembers),
+ {ok, couch_doc:parse_rev(Rev)};
+update_doc(Db, Doc, Options) ->
couch_db:update_doc(Db, Doc, Options).
+update_docs(_, [], _, _) ->
+ {ok, []};
+update_docs(#http_db{uri=DbUrl, headers=Headers}, Docs, [], replicated_changes) ->
+ JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
+ ErrorsJson =
+ do_http_request(DbUrl ++ "_bulk_docs", post, Headers,
+ {[{new_edits, false}, {docs, JsonDocs}]}),
+ ErrorsList =
+ lists:map(
+ fun({Props}) ->
+ Id = proplists:get_value(<<"id">>, Props),
+ Rev = couch_doc:parse_rev(proplists:get_value(<<"rev">>, Props)),
+ ErrId = couch_util:to_existing_atom(
+ proplists:get_value(<<"error">>, Props)),
+ Reason = proplists:get_value(<<"reason">>, Props),
+ Error = {ErrId, Reason},
+ {{Id, Rev}, Error}
+ end, ErrorsJson),
+ {ok, ErrorsList};
+update_docs(Db, Docs, Options, UpdateType) ->
+ couch_db:update_docs(Db, Docs, Options, UpdateType).
+
up_to_date(#http_db{}, _Seq) ->
true;
up_to_date(Source, Seq) ->
diff --git a/src/couchdb/couch_util.erl b/src/couchdb/couch_util.erl
index 21d6eb4c..bd377d80 100644
--- a/src/couchdb/couch_util.erl
+++ b/src/couchdb/couch_util.erl
@@ -13,7 +13,7 @@
-module(couch_util).
-export([start_driver/1]).
--export([should_flush/0, should_flush/1, to_existing_atom/1]).
+-export([should_flush/0, should_flush/1, to_existing_atom/1, to_binary/1]).
-export([new_uuid/0, rand32/0, implode/2, collate/2, collate/3]).
-export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1]).
-export([encodeBase64/1, decodeBase64/1, to_hex/1,parse_term/1,dict_find/3]).
@@ -57,6 +57,19 @@ to_digit(N) when N < 10 -> $0 + N;
to_digit(N) -> $a + N-10.
+to_binary(V) when is_binary(V) ->
+ V;
+to_binary(V) when is_list(V) ->
+ try list_to_binary(V)
+ catch
+ _:_ -> list_to_binary(io_lib:format("~p", [V]))
+ end;
+to_binary(V) when is_atom(V) ->
+ list_to_binary(atom_to_list(V));
+to_binary(V) ->
+ list_to_binary(io_lib:format("~p", [V])).
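+% e.g. to_binary(foo) -> <<"foo">>, to_binary("foo") -> <<"foo">>; any
+% other term falls through to ~p formatting: to_binary({1,2}) -> <<"{1,2}">>.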
+
+
parse_term(Bin) when is_binary(Bin)->
parse_term(binary_to_list(Bin));
parse_term(List) ->