diff options
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/couch_db.erl | 395 | ||||
-rw-r--r-- | src/couchdb/couch_db.hrl | 24 | ||||
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 147 | ||||
-rw-r--r-- | src/couchdb/couch_doc.erl | 185 | ||||
-rw-r--r-- | src/couchdb/couch_httpd.erl | 120 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_db.erl | 285 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_misc_handlers.erl | 55 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_show.erl | 15 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_view.erl | 12 | ||||
-rw-r--r-- | src/couchdb/couch_key_tree.erl | 375 | ||||
-rw-r--r-- | src/couchdb/couch_rep.erl | 365 | ||||
-rw-r--r-- | src/couchdb/couch_util.erl | 15 |
12 files changed, 1226 insertions, 767 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 92026ff1..46de8745 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -17,6 +17,7 @@ -export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]). -export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]). -export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]). +-export([set_revs_limit/2,get_revs_limit/1]). -export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]). -export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]). -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). @@ -184,22 +185,42 @@ get_db_info(Db) -> ], {ok, InfoList}. +check_is_admin(#db{admins=Admins, user_ctx=#user_ctx{name=Name,roles=Roles}}) -> + DbAdmins = [<<"_admin">> | Admins], + case DbAdmins -- [Name | Roles] of + DbAdmins -> % same list, not an admin + throw({unauthorized, <<"You are not a db or server admin.">>}); + _ -> + ok + end. + get_admins(#db{admins=Admins}) -> Admins. -set_admins(#db{update_pid=UpdatePid,user_ctx=Ctx}, - Admins) when is_list(Admins) -> - case gen_server:call(UpdatePid, {set_admins, Admins, Ctx}, infinity) of - ok -> ok; - Error -> throw(Error) - end. +set_admins(#db{update_pid=Pid}=Db, Admins) when is_list(Admins) -> + check_is_admin(Db), + gen_server:call(Pid, {set_admins, Admins}, infinity). + + +get_revs_limit(#db{revs_limit=Limit}) -> + Limit. + +set_revs_limit(#db{update_pid=Pid}=Db, Limit) when Limit > 0 -> + check_is_admin(Db), + gen_server:call(Pid, {set_revs_limit, Limit}, infinity); +set_revs_limit(_Db, _Limit) -> + throw(invalid_revs_limit). name(#db{name=Name}) -> Name. update_doc(Db, Doc, Options) -> - {ok, [NewRev]} = update_docs(Db, [Doc], Options), - {ok, NewRev}. + case update_docs(Db, [Doc], Options) of + {ok, [{ok, NewRev}]} -> + {ok, NewRev}; + {ok, [Error]} -> + throw(Error) + end. update_docs(Db, Docs) -> update_docs(Db, Docs, []). @@ -227,146 +248,283 @@ group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) -> validate_doc_update(#db{user_ctx=UserCtx, admins=Admins}, - #doc{id= <<"_design/",_/binary>>}=Doc, _GetDiskDocFun) -> + #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) -> UserNames = [UserCtx#user_ctx.name | UserCtx#user_ctx.roles], % if the user is a server admin or db admin, allow the save case length(UserNames -- [<<"_admin">> | Admins]) == length(UserNames) of true -> % not an admin - throw({unauthorized, <<"You are not a server or database admin.">>}); + {unauthorized, <<"You are not a server or database admin.">>}; false -> - Doc + ok end; -validate_doc_update(#db{validate_doc_funs=[]}, Doc, _GetDiskDocFun) -> - Doc; -validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}=Doc, _GetDiskDocFun) -> - Doc; +validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) -> + ok; +validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) -> + ok; validate_doc_update(#db{name=DbName,user_ctx=Ctx}=Db, Doc, GetDiskDocFun) -> DiskDoc = GetDiskDocFun(), JsonCtx = {[{<<"db">>, DbName}, {<<"name">>,Ctx#user_ctx.name}, {<<"roles">>,Ctx#user_ctx.roles}]}, - [case Fun(Doc, DiskDoc, JsonCtx) of - ok -> ok; - Error -> throw(Error) - end || Fun <- Db#db.validate_doc_funs], - Doc. + try [case Fun(Doc, DiskDoc, JsonCtx) of + ok -> ok; + Error -> throw(Error) + end || Fun <- Db#db.validate_doc_funs], + ok + catch + throw:Error -> + Error + end. -prep_and_validate_new_edit(Db, #doc{id=Id,revs=[NewRev|PrevRevs]}=Doc, +prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, [_NewRev|PrevRevs]}}=Doc, OldFullDocInfo, LeafRevsDict) -> case PrevRevs of [PrevRev|_] -> - case dict:find(PrevRev, LeafRevsDict) of - {ok, {Deleted, Sp, DiskRevs}} -> - Doc2 = Doc#doc{revs=[NewRev|DiskRevs]}, - case couch_doc:has_stubs(Doc2) of + case dict:find({RevStart-1, PrevRev}, LeafRevsDict) of + {ok, {Deleted, DiskSp, DiskRevs}} -> + case couch_doc:has_stubs(Doc) of true -> - DiskDoc = make_doc(Db, Id, Deleted, Sp, DiskRevs), - Doc3 = couch_doc:merge_stubs(Doc2, DiskDoc), - validate_doc_update(Db, Doc3, fun() -> DiskDoc end); + DiskDoc = make_doc(Db, Id, Deleted, DiskSp, DiskRevs), + Doc2 = couch_doc:merge_stubs(Doc, DiskDoc), + {validate_doc_update(Db, Doc2, fun() -> DiskDoc end), Doc2}; false -> - LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,Sp,DiskRevs) end, - validate_doc_update(Db, Doc2, LoadDiskDoc) + LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,DiskSp,DiskRevs) end, + {validate_doc_update(Db, Doc, LoadDiskDoc), Doc} end; error -> - throw(conflict) + {conflict, Doc} end; [] -> % new doc, and we have existing revs. if OldFullDocInfo#full_doc_info.deleted -> % existing docs are deletions - validate_doc_update(Db, Doc, nil); + {validate_doc_update(Db, Doc, fun() -> nil end), Doc}; true -> - throw(conflict) + {conflict, Doc} end end. + + +prep_and_validate_updates(_Db, [], [], AccPrepped, AccFatalErrors) -> + {AccPrepped, AccFatalErrors}; +prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], AccPrepped, AccErrors) -> + [#doc{id=Id}|_]=DocBucket, + % no existing revs are known, + {PreppedBucket, AccErrors3} = lists:foldl( + fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) -> + case Revs of + {Pos, [NewRev,_OldRev|_]} -> + % old revs specified but none exist, a conflict + {AccBucket, [{{Id, {Pos, NewRev}}, conflict} | AccErrors2]}; + {Pos, [NewRev]} -> + case validate_doc_update(Db, Doc, fun() -> nil end) of + ok -> + {[Doc | AccBucket], AccErrors2}; + Error -> + {AccBucket, [{{Id, {Pos, NewRev}}, Error} | AccErrors2]} + end + end + end, + {[], AccErrors}, DocBucket), + + prep_and_validate_updates(Db, RestBuckets, RestLookups, + [PreppedBucket | AccPrepped], AccErrors3); +prep_and_validate_updates(Db, [DocBucket|RestBuckets], + [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups], + AccPrepped, AccErrors) -> + Leafs = couch_key_tree:get_all_leafs(OldRevTree), + LeafRevsDict = dict:from_list([{{Start, RevId}, {Deleted, Sp, Revs}} || + {{Deleted, Sp}, {Start, [RevId|_]}=Revs} <- Leafs]), + {PreppedBucket, AccErrors3} = lists:foldl( + fun(Doc, {Docs2Acc, AccErrors2}) -> + case prep_and_validate_update(Db, Doc, OldFullDocInfo, + LeafRevsDict) of + {ok, Doc} -> + {[Doc | Docs2Acc], AccErrors2}; + {Error, #doc{id=Id,revs={Pos, [NewRev|_]}}} -> + % Record the error + {Docs2Acc, [{{Id, {Pos, NewRev}}, Error} |AccErrors2]} + end + end, + {[], AccErrors}, DocBucket), + prep_and_validate_updates(Db, RestBuckets, RestLookups, [PreppedBucket | AccPrepped], AccErrors3). + + update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) -> - update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options, true). + update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options, interactive_edit). -update_docs(Db, Docs, Options, false) -> + +prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) -> + Errors2 = [{{Id, {Pos, Rev}}, Error} || + {#doc{id=Id,revs={Pos,[Rev|_]}}, Error} <- AccErrors], + {lists:reverse(AccPrepped), lists:reverse(Errors2)}; +prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldInfo], AccPrepped, AccErrors) -> + case OldInfo of + not_found -> + {ValidatedBucket, AccErrors3} = lists:foldl( + fun(Doc, {AccPrepped2, AccErrors2}) -> + case validate_doc_update(Db, Doc, fun() -> nil end) of + ok -> + {[Doc | AccPrepped2], AccErrors2}; + Error -> + {AccPrepped2, [{Doc, Error} | AccErrors2]} + end + end, + {[], AccErrors}, Bucket), + prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3); + {ok, #full_doc_info{rev_tree=OldTree}} -> + NewRevTree = lists:foldl( + fun(NewDoc, AccTree) -> + {NewTree, _} = couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]), + NewTree + end, + OldTree, Bucket), + Leafs = couch_key_tree:get_all_leafs_full(NewRevTree), + LeafRevsFullDict = dict:from_list( [{{Start, RevId}, FullPath} || {Start, [{RevId, _}|_]}=FullPath <- Leafs]), + {ValidatedBucket, AccErrors3} = + lists:foldl( + fun(#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, {AccValidated, AccErrors2}) -> + case dict:find({Pos, RevId}, LeafRevsFullDict) of + {ok, {Start, Path}} -> + % our unflushed doc is a leaf node. Go back on the path + % to find the previous rev that's on disk. + PrevRevResult = + case couch_doc:has_stubs(Doc) of + true -> + [_PrevRevFull | [PrevRevFull | _]=PrevPath] = Path, + case PrevRevFull of + {_RevId, ?REV_MISSING} -> + conflict; + {RevId, {IsDel, DiskSp}} -> + DiskDoc = make_doc(Db, Id, IsDel, DiskSp, PrevPath), + Doc2 = couch_doc:merge_stubs(Doc, DiskDoc), + {ok, Doc2, fun() -> DiskDoc end} + end; + false -> + {ok, Doc, + fun() -> + make_first_doc_on_disk(Db,Id,Start-1, tl(Path)) + end} + end, + case PrevRevResult of + {ok, NewDoc, LoadPrevRevFun} -> + case validate_doc_update(Db, NewDoc, LoadPrevRevFun) of + ok -> + {[NewDoc | AccValidated], AccErrors2}; + Error -> + {AccValidated, [{NewDoc, Error} | AccErrors2]} + end; + Error -> + {AccValidated, [{Doc, Error} | AccErrors2]} + end; + _ -> + % this doc isn't a leaf or already exists in the tree. + % ignore but consider it a success. + {AccValidated, AccErrors2} + end + end, + {[], AccErrors}, Bucket), + prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3) + end. + +update_docs(Db, Docs, Options, replicated_changes) -> couch_stats_collector:increment({couchdb, database_writes}), DocBuckets = group_alike_docs(Docs), - Ids = [Id || [#doc{id=Id}|_] <- DocBuckets], - ExistingDocs = get_full_doc_infos(Db, Ids), + case (Db#db.validate_doc_funs /= []) orelse + lists:any( + fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true; + (_) -> false + end, Docs) of + true -> + Ids = [Id || [#doc{id=Id}|_] <- DocBuckets], + ExistingDocs = get_full_doc_infos(Db, Ids), - DocBuckets2 = lists:zipwith( - fun(Bucket, not_found) -> - [validate_doc_update(Db, Doc, fun()-> nil end) || Doc <- Bucket]; - (Bucket, {ok, #full_doc_info{rev_tree=OldRevTree}}) -> - NewTree = lists:foldl( - fun(Doc, RevTreeAcc) -> - couch_key_tree:merge(RevTreeAcc, doc_to_tree(Doc)) - end, - OldRevTree, Bucket), - Leafs = couch_key_tree:get_all_leafs_full(NewTree), - LeafRevsFullDict = dict:from_list( [{Rev, FullPath} || [{Rev, _}|_]=FullPath <- Leafs]), - lists:flatmap( - fun(#doc{revs=[Rev|_]}=Doc) -> - case dict:find(Rev, LeafRevsFullDict) of - {ok, [{Rev, #doc{id=Id}}|_]=Path} -> - % our unflushed doc is a leaf node. Go back on the path - % to find the previous rev that's on disk. - LoadPrevRev = fun() -> - make_first_doc_on_disk(Db, Id, Path) - end, - [validate_doc_update(Db, Doc, LoadPrevRev)]; - _ -> - % this doc isn't a leaf or is already exists in the tree. ignore - [] - end - end, Bucket) - end, - DocBuckets, ExistingDocs), - write_and_commit(Db, DocBuckets2, Options); + {DocBuckets2, DocErrors} = + prep_and_validate_replicated_updates(Db, DocBuckets, ExistingDocs, [], []), + DocBuckets3 = [Bucket || [_|_]=Bucket <- DocBuckets2]; % remove empty buckets + false -> + DocErrors = [], + DocBuckets3 = DocBuckets + end, + {ok, []} = write_and_commit(Db, DocBuckets3, [merge_conflicts | Options]), + {ok, DocErrors}; -update_docs(Db, Docs, Options, true) -> +update_docs(Db, Docs, Options, interactive_edit) -> couch_stats_collector:increment({couchdb, database_writes}), - + AllOrNothing = lists:member(all_or_nothing, Options), % go ahead and generate the new revision ids for the documents. Docs2 = lists:map( - fun(#doc{id=Id,revs=Revs}=Doc) -> + fun(#doc{id=Id,revs={Start, RevIds}}=Doc) -> case Id of <<?LOCAL_DOC_PREFIX, _/binary>> -> - Rev = case Revs of [] -> 0; [Rev0|_] -> list_to_integer(binary_to_list(Rev0)) end, - Doc#doc{revs=[list_to_binary(integer_to_list(Rev + 1))]}; + Rev = case RevIds of [] -> 0; [Rev0|_] -> list_to_integer(?b2l(Rev0)) end, + Doc#doc{revs={Start, [?l2b(integer_to_list(Rev + 1))]}}; _ -> - Doc#doc{revs=[list_to_binary(integer_to_list(couch_util:rand32())) | Revs]} + Doc#doc{revs={Start+1, [?l2b(integer_to_list(couch_util:rand32())) | RevIds]}} end end, Docs), DocBuckets = group_alike_docs(Docs2), - Ids = [Id || [#doc{id=Id}|_] <- DocBuckets], - - % lookup the doc by id and get the most recent - - ExistingDocs = get_full_doc_infos(Db, Ids), - DocBuckets2 = lists:zipwith( - fun(Bucket, not_found) -> - % no existing revs on disk, make sure no old revs specified. - [throw(conflict) || #doc{revs=[_NewRev, _OldRev | _]} <- Bucket], - [validate_doc_update(Db, Doc, fun()-> nil end) || Doc <- Bucket]; - (Bucket, {ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}) -> - Leafs = couch_key_tree:get_all_leafs(OldRevTree), - LeafRevsDict = dict:from_list([{Rev, {Deleted, Sp, Revs}} || {Rev, {Deleted, Sp}, Revs} <- Leafs]), - [prep_and_validate_new_edit(Db, Doc, OldFullDocInfo, LeafRevsDict) || Doc <- Bucket] + case (Db#db.validate_doc_funs /= []) orelse + lists:any( + fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> + true; + (#doc{attachments=Atts}) -> + Atts /= [] + end, Docs) of + true -> + % lookup the doc by id and get the most recent + Ids = [Id || [#doc{id=Id}|_] <- DocBuckets], + ExistingDocInfos = get_full_doc_infos(Db, Ids), + + {DocBucketsPrepped, Failures} = + case AllOrNothing of + true -> + prep_and_validate_replicated_updates(Db, DocBuckets, + ExistingDocInfos, [], []); + false -> + prep_and_validate_updates(Db, DocBuckets, ExistingDocInfos, [], []) end, - DocBuckets, ExistingDocs), - ok = write_and_commit(Db, DocBuckets2, [new_edits | Options]), - {ok, [NewRev ||#doc{revs=[NewRev|_]} <- Docs2]}. + + % strip out any empty buckets + DocBuckets2 = [Bucket || [_|_] = Bucket <- DocBucketsPrepped]; + false -> + Failures = [], + DocBuckets2 = DocBuckets + end, + if (AllOrNothing) and (Failures /= []) -> + {aborted, Failures}; + true -> + Options2 = if AllOrNothing -> [merge_conflicts]; + true -> [] end ++ Options, + {ok, CommitFailures} = write_and_commit(Db, DocBuckets2, Options2), + FailDict = dict:from_list(CommitFailures ++ Failures), + % the output for each is either {ok, NewRev} or Error + {ok, lists:map( + fun(#doc{id=Id,revs={Pos, [NewRevId|_]}}) -> + case dict:find({Id, {Pos, NewRevId}}, FailDict) of + {ok, Error} -> + Error; + error -> + {ok, {Pos, NewRevId}} + end + end, Docs2)} + end. % Returns the first available document on disk. Input list is a full rev path % for the doc. -make_first_doc_on_disk(_Db, _Id, []) -> +make_first_doc_on_disk(_Db, _Id, _Pos, []) -> nil; -make_first_doc_on_disk(Db, Id, [{_Rev, ?REV_MISSING}|RestPath]) -> - make_first_doc_on_disk(Db, Id, RestPath); -make_first_doc_on_disk(Db, Id, [{_Rev, {IsDel, Sp}} |_]=DocPath) -> +make_first_doc_on_disk(Db, Id, Pos, [{_Rev, ?REV_MISSING}|RestPath]) -> + make_first_doc_on_disk(Db, Id, Pos - 1, RestPath); +make_first_doc_on_disk(Db, Id, Pos, [{_Rev, {IsDel, Sp}} |_]=DocPath) -> Revs = [Rev || {Rev, _} <- DocPath], - make_doc(Db, Id, IsDel, Sp, Revs). + make_doc(Db, Id, IsDel, Sp, {Pos, Revs}). write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets, @@ -374,20 +532,18 @@ write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets, % flush unwritten binaries to disk. DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], case gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity) of - ok -> ok; + {ok, Conflicts} -> {ok, Conflicts}; retry -> % This can happen if the db file we wrote to was swapped out by - % compaction. Retry writing to the current file + % compaction. Retry by reopening the db and writing to the current file {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx), DocBuckets3 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], % We only retry once close(Db2), case gen_server:call(UpdatePid, {update_docs, DocBuckets3, Options}, infinity) of - ok -> ok; - Else -> throw(Else) - end; - Else-> - throw(Else) + {ok, Conflicts} -> {ok, Conflicts}; + retry -> throw({update_error, compaction_retry}) + end end. @@ -506,7 +662,7 @@ enum_docs_reduce_to_count(Reds) -> {Count, _DelCount} = couch_btree:final_reduce( fun couch_db_updater:btree_by_id_reduce/2, Reds), Count. - + count_changes_since(Db, SinceSeq) -> {ok, Changes} = couch_btree:fold_reduce(Db#db.docinfo_by_seq_btree, @@ -518,7 +674,7 @@ count_changes_since(Db, SinceSeq) -> end, ok), Changes. - + enum_docs_since(Db, SinceSeq, Direction, InFun, Ctx) -> couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx). @@ -594,11 +750,11 @@ open_doc_revs_int(Db, IdRevs, Options) -> end end, FoundResults = - lists:map(fun({Rev, Value, FoundRevPath}) -> + lists:map(fun({Value, {Pos, [Rev|_]}=FoundRevPath}) -> case Value of ?REV_MISSING -> % we have the rev in our list but know nothing about it - {{not_found, missing}, Rev}; + {{not_found, missing}, {Pos, Rev}}; {IsDeleted, SummaryPtr} -> {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)} end @@ -616,18 +772,18 @@ open_doc_revs_int(Db, IdRevs, Options) -> open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, _Options) -> case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of [{ok, {_, {Rev, BodyData}}}] -> - {ok, #doc{id=Id, revs=[list_to_binary(integer_to_list(Rev))], body=BodyData}}; + {ok, #doc{id=Id, revs={0, [list_to_binary(integer_to_list(Rev))]}, body=BodyData}}; [not_found] -> {not_found, missing} end; -open_doc_int(Db, #doc_info{id=Id,rev=Rev,deleted=IsDeleted,summary_pointer=Sp}=DocInfo, Options) -> - Doc = make_doc(Db, Id, IsDeleted, Sp, [Rev]), +open_doc_int(Db, #doc_info{id=Id,rev={Pos,RevId},deleted=IsDeleted,summary_pointer=Sp}=DocInfo, Options) -> + Doc = make_doc(Db, Id, IsDeleted, Sp, {Pos,[RevId]}), {ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}}; open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) -> #doc_info{deleted=IsDeleted,rev=Rev,summary_pointer=Sp} = DocInfo = couch_doc:to_doc_info(FullDocInfo), - {[{_Rev,_Value, Revs}], []} = couch_key_tree:get(RevTree, [Rev]), - Doc = make_doc(Db, Id, IsDeleted, Sp, Revs), + {[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]), + Doc = make_doc(Db, Id, IsDeleted, Sp, RevPath), {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}; open_doc_int(Db, Id, Options) -> case get_full_doc_info(Db, Id) of @@ -641,9 +797,10 @@ doc_meta_info(DocInfo, RevTree, Options) -> case lists:member(revs_info, Options) of false -> []; true -> - {[RevPath],[]} = + {[{Pos, RevPath}],[]} = couch_key_tree:get_full_key_paths(RevTree, [DocInfo#doc_info.rev]), - [{revs_info, lists:map( + + [{revs_info, Pos, lists:map( fun({Rev, {true, _Sp}}) -> {Rev, deleted}; ({Rev, {false, _Sp}}) -> @@ -670,13 +827,15 @@ doc_meta_info(DocInfo, RevTree, Options) -> end. -doc_to_tree(Doc) -> - doc_to_tree(Doc, lists:reverse(Doc#doc.revs)). +doc_to_tree(#doc{revs={Start, RevIds}}=Doc) -> + [Tree] = doc_to_tree_simple(Doc, lists:reverse(RevIds)), + {Start - length(RevIds) + 1, Tree}. + -doc_to_tree(Doc, [RevId]) -> +doc_to_tree_simple(Doc, [RevId]) -> [{RevId, Doc, []}]; -doc_to_tree(Doc, [RevId | Rest]) -> - [{RevId, ?REV_MISSING, doc_to_tree(Doc, Rest)}]. +doc_to_tree_simple(Doc, [RevId | Rest]) -> + [{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}]. make_doc(Db, FullDocInfo) -> {#doc_info{id=Id,deleted=Deleted,summary_pointer=Sp}, RevPath} @@ -703,4 +862,4 @@ make_doc(#db{fd=Fd}=Db, Id, Deleted, BodySp, RevisionPath) -> }. -
\ No newline at end of file + diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index 026afe14..f460f450 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -69,7 +69,7 @@ -record(doc, { id = <<"">>, - revs = [], + revs = {0, []}, % the json body object. body = {[]}, @@ -104,7 +104,7 @@ % if the disk revision is incremented, then new upgrade logic will need to be % added to couch_db_updater:init_db. --define(LATEST_DISK_VERSION, 0). +-define(LATEST_DISK_VERSION, 1). -record(db_header, {disk_version = ?LATEST_DISK_VERSION, @@ -115,13 +115,14 @@ local_docs_btree_state = nil, purge_seq = 0, purged_docs = nil, - admins_ptr = nil + admins_ptr = nil, + revs_limit = 1000 }). -record(db, - {main_pid=nil, - update_pid=nil, - compactor_pid=nil, + {main_pid = nil, + update_pid = nil, + compactor_pid = nil, instance_start_time, % number of microsecs since jan 1 1970 as a binary string fd, fd_ref_counter, @@ -133,11 +134,12 @@ update_seq, name, filepath, - validate_doc_funs=[], - admins=[], - admins_ptr=nil, - user_ctx=#user_ctx{}, - waiting_delayed_commit=nil + validate_doc_funs = [], + admins = [], + admins_ptr = nil, + user_ctx = #user_ctx{}, + waiting_delayed_commit = nil, + revs_limit = 1000 }). diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index 7790d7a4..7752a577 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -44,15 +44,13 @@ handle_call(get_db, _From, Db) -> {reply, {ok, Db}, Db}; handle_call({update_docs, DocActions, Options}, _From, Db) -> try update_docs_int(Db, DocActions, Options) of - {ok, Db2} -> + {ok, Conflicts, Db2} -> ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), couch_db_update_notifier:notify({updated, Db2#db.name}), - {reply, ok, Db2} + {reply, {ok, Conflicts}, Db2} catch throw: retry -> - {reply, retry, Db}; - throw: conflict -> - {reply, conflict, Db} + {reply, retry, Db} end; handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) -> {reply, ok, Db}; % no data waiting, return ok immediately @@ -64,18 +62,18 @@ handle_call(increment_update_seq, _From, Db) -> couch_db_update_notifier:notify({updated, Db#db.name}), {reply, {ok, Db2#db.update_seq}, Db2}; -handle_call({set_admins, NewAdmins, #user_ctx{roles=Roles}}, _From, Db) -> - DbAdmins = [<<"_admin">> | Db#db.admins], - case length(DbAdmins -- Roles) == length(DbAdmins) of - true -> - {reply, {unauthorized, <<"You are not a db or server admin.">>}, Db}; - false -> - {ok, Ptr} = couch_file:append_term(Db#db.fd, NewAdmins), - Db2 = commit_data(Db#db{admins=NewAdmins, admins_ptr=Ptr, - update_seq=Db#db.update_seq+1}), - ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), - {reply, ok, Db2} - end; +handle_call({set_admins, NewAdmins}, _From, Db) -> + {ok, Ptr} = couch_file:append_term(Db#db.fd, NewAdmins), + Db2 = commit_data(Db#db{admins=NewAdmins, admins_ptr=Ptr, + update_seq=Db#db.update_seq+1}), + ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), + {reply, ok, Db2}; + +handle_call({set_revs_limit, Limit}, _From, Db) -> + Db2 = commit_data(Db#db{revs_limit=Limit, + update_seq=Db#db.update_seq+1}), + ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), + {reply, ok, Db2}; handle_call({purge_docs, _IdRevs}, _From, #db{compactor_pid=Pid}=Db) when Pid /= nil -> @@ -298,7 +296,8 @@ init_db(DbName, Filepath, Fd, Header0) -> filepath = Filepath, admins = Admins, admins_ptr = AdminsPtr, - instance_start_time = StartTime + instance_start_time = StartTime, + revs_limit = Header#db_header.revs_limit }. @@ -358,40 +357,31 @@ flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) -> end, Unflushed), flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]). -merge_rev_trees(_NoConflicts, [], [], AccNewInfos, AccSeq) -> - {ok, lists:reverse(AccNewInfos), AccSeq}; -merge_rev_trees(NoConflicts, [NewDocs|RestDocsList], - [OldDocInfo|RestOldInfo], AccNewInfos, AccSeq) -> - #full_doc_info{id=Id,rev_tree=OldTree}=OldDocInfo, - UpdatesRevTree = lists:foldl( - fun(NewDoc, AccTree) -> - couch_key_tree:merge(AccTree, couch_db:doc_to_tree(NewDoc)) +merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccConflicts, AccSeq) -> + {ok, lists:reverse(AccNewInfos), AccConflicts, AccSeq}; +merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], + [OldDocInfo|RestOldInfo], AccNewInfos, AccConflicts, AccSeq) -> + #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted}=OldDocInfo, + {NewRevTree, NewConflicts} = lists:foldl( + fun(#doc{revs={Pos,[Rev|_]}}=NewDoc, {AccTree, AccConflicts2}) -> + case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of + {_NewTree, conflicts} + when (not OldDeleted) and (not MergeConflicts) -> + {AccTree, [{{Id, {Pos,Rev}}, conflict} | AccConflicts2]}; + {NewTree, _} -> + {NewTree, AccConflicts2} + end end, - [], NewDocs), - NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree), + {OldTree, AccConflicts}, NewDocs), if NewRevTree == OldTree -> % nothing changed - merge_rev_trees(NoConflicts, RestDocsList, RestOldInfo, AccNewInfos, AccSeq); + merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, AccNewInfos, + NewConflicts, AccSeq); true -> - if NoConflicts andalso OldTree /= [] -> - OldConflicts = couch_key_tree:count_leafs(OldTree), - NewConflicts = couch_key_tree:count_leafs(NewRevTree), - if NewConflicts > OldConflicts -> - % if all the old docs are deletions, allow this new conflict - case [1 || {_Rev,{IsDel,_Sp},_Path} <- - couch_key_tree:get_all_leafs(OldTree), IsDel==false] of - [] -> - ok; - _ -> - throw(conflict) - end; - true -> ok - end; - true -> ok - end, + % we have updated the document, give it a new seq # NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree}, - merge_rev_trees(NoConflicts, RestDocsList,RestOldInfo, - [NewInfo|AccNewInfos],AccSeq+1) + merge_rev_trees(MergeConflicts, RestDocsList,RestOldInfo, + [NewInfo|AccNewInfos], NewConflicts, AccSeq+1) end. new_index_entries([], AccById, AccBySeq) -> @@ -402,19 +392,23 @@ new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq) -> [FullDocInfo#full_doc_info{deleted=Deleted}|AccById], [DocInfo|AccBySeq]). + +stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) -> + [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} || + #full_doc_info{rev_tree=Tree}=Info <- DocInfos]. + + update_docs_int(Db, DocsList, Options) -> #db{ fulldocinfo_by_id_btree = DocInfoByIdBTree, docinfo_by_seq_btree = DocInfoBySeqBTree, update_seq = LastSeq } = Db, - % separate out the NonRep documents from the rest of the documents {DocsList2, NonRepDocs} = lists:foldl( - fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) -> + fun([#doc{id=Id}=Doc | _]=Docs, {DocsListAcc, NonRepDocsAcc}) -> case Id of - <<?LOCAL_DOC_PREFIX, _/binary>> when Rest==[] -> - % when saving NR (non rep) documents, you can only save a single rev + <<?LOCAL_DOC_PREFIX, _/binary>> -> {DocsListAcc, [Doc | NonRepDocsAcc]}; Id-> {[Docs | DocsListAcc], NonRepDocsAcc} @@ -434,23 +428,26 @@ update_docs_int(Db, DocsList, Options) -> Ids, OldDocLookups), % Merge the new docs into the revision trees. - NoConflicts = lists:member(new_edits, Options), - {ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq), + {ok, NewDocInfos0, Conflicts, NewSeq} = merge_rev_trees( + lists:member(merge_conflicts, Options), + DocsList2, OldDocInfos, [], [], LastSeq), + + NewDocInfos = stem_full_doc_infos(Db, NewDocInfos0), RemoveSeqs = - [ OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups], + [OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups], - % All regular documents are now ready to write. + % All documents are now ready to write. - % Try to write the local documents first, a conflict might be generated - {ok, Db2} = update_local_docs(Db, NonRepDocs), + {ok, LocalConflicts, Db2} = update_local_docs(Db, NonRepDocs), - % Write out the document summaries (they are stored in the nodes of the rev trees) + % Write out the document summaries (the bodies are stored in the nodes of + % the trees, the attachments are already written to disk) {ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []), {ok, InfoById, InfoBySeq} = new_index_entries(FlushedDocInfos, [], []), - % and the indexes to the documents + % and the indexes {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs), {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []), @@ -459,6 +456,8 @@ update_docs_int(Db, DocsList, Options) -> docinfo_by_seq_btree = DocInfoBySeqBTree2, update_seq = NewSeq}, + % Check if we just updated any design documents, and update the validation + % funs if we did. case [1 || <<"_design/",_/binary>> <- Ids] of [] -> Db4 = Db3; @@ -466,18 +465,15 @@ update_docs_int(Db, DocsList, Options) -> Db4 = refresh_validate_doc_funs(Db3) end, - {ok, commit_data(Db4, not lists:member(full_commit, Options))}. - + {ok, LocalConflicts ++ Conflicts, + commit_data(Db4, not lists:member(full_commit, Options))}. + update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> Ids = [Id || #doc{id=Id} <- Docs], OldDocLookups = couch_btree:lookup(Btree, Ids), BtreeEntries = lists:zipwith( - fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) -> - NewRev = - case Revs of - [] -> 0; - [RevStr|_] -> list_to_integer(binary_to_list(RevStr)) - end, + fun(#doc{id=Id,deleted=Delete,revs={0,[RevStr]},body=Body}, OldDocLookup) -> + NewRev = list_to_integer(?b2l(RevStr)), OldRev = case OldDocLookup of {ok, {_, {OldRev0, _}}} -> OldRev0; @@ -490,18 +486,19 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> true -> {remove, Id} end; false -> - throw(conflict) + {conflict, {Id, {0, RevStr}}} end end, Docs, OldDocLookups), BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries], BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries], - + Conflicts = [{conflict, IdRev} || {conflict, IdRev} <- BtreeEntries], + {ok, Btree2} = couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove), - {ok, Db#db{local_docs_btree = Btree2}}. + {ok, Conflicts, Db#db{local_docs_btree = Btree2}}. commit_data(Db) -> @@ -515,7 +512,8 @@ commit_data(#db{fd=Fd, header=Header} = Db, Delay) -> docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree), fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree), local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree), - admins_ptr = Db#db.admins_ptr + admins_ptr = Db#db.admins_ptr, + revs_limit = Db#db.revs_limit }, if Header == Header2 -> Db; @@ -549,6 +547,10 @@ copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) -> copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) -> []; +copy_rev_tree(SrcFd, DestFd, DestStream, [{Start, Tree} | RestTree]) -> + % root nner node, only copy info/data from leaf nodes + [Tree2] = copy_rev_tree(SrcFd, DestFd, DestStream, [Tree]), + [{Start, Tree2} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)]; copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTree]) -> % This is a leaf node, copy it over NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream), @@ -560,10 +562,11 @@ copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) -> copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq, Retry) -> Ids = [Id || #doc_info{id=Id} <- InfoBySeq], LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids), - NewFullDocInfos = lists:map( + NewFullDocInfos0 = lists:map( fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) -> Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)} end, LookupResults), + NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos0), NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos], RemoveSeqs = case Retry of diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl index 9860ac0c..fc817d56 100644 --- a/src/couchdb/couch_doc.erl +++ b/src/couchdb/couch_doc.erl @@ -12,41 +12,53 @@ -module(couch_doc). --export([to_doc_info/1,to_doc_info_path/1]). +-export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,rev_to_strs/1]). -export([bin_foldl/3,bin_size/1,bin_to_binary/1,get_validate_doc_fun/1]). -export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]). -include("couch_db.hrl"). % helpers used by to_json_obj -to_json_rev([]) -> +to_json_rev(0, []) -> []; -to_json_rev(Revs) -> - [{<<"_rev">>, lists:nth(1, Revs)}]. +to_json_rev(Start, [FirstRevId|_]) -> + [{<<"_rev">>, ?l2b([integer_to_list(Start),"-",FirstRevId])}]. to_json_body(true, _Body) -> [{<<"_deleted">>, true}]; to_json_body(false, {Body}) -> Body. -to_json_revs(Options, Revs) -> +to_json_revisions(Options, Start, RevIds) -> case lists:member(revs, Options) of false -> []; true -> - [{<<"_revs">>, Revs}] + [{<<"_revisions">>, {[{<<"start">>, Start}, + {<<"ids">>, RevIds}]}}] end. -to_json_revs_info(Meta) -> +rev_to_str({Pos, RevId}) -> + ?l2b([integer_to_list(Pos),"-",RevId]). + +rev_to_strs([]) -> + []; +rev_to_strs([{Pos, RevId}| Rest]) -> + [rev_to_str({Pos, RevId}) | rev_to_strs(Rest)]. + +to_json_meta(Meta) -> lists:map( - fun({revs_info, RevsInfo}) -> - JsonRevsInfo = - [{[{rev, Rev}, {status, list_to_binary(atom_to_list(Status))}]} || - {Rev, Status} <- RevsInfo], + fun({revs_info, Start, RevsInfo}) -> + {JsonRevsInfo, _Pos} = lists:mapfoldl( + fun({RevId, Status}, PosAcc) -> + JsonObj = {[{<<"rev">>, rev_to_str({PosAcc, RevId})}, + {<<"status">>, ?l2b(atom_to_list(Status))}]}, + {JsonObj, PosAcc - 1} + end, Start, RevsInfo), {<<"_revs_info">>, JsonRevsInfo}; ({conflicts, Conflicts}) -> - {<<"_conflicts">>, Conflicts}; - ({deleted_conflicts, Conflicts}) -> - {<<"_deleted_conflicts">>, Conflicts} + {<<"_conflicts">>, rev_to_strs(Conflicts)}; + ({deleted_conflicts, DConflicts}) -> + {<<"_deleted_conflicts">>, rev_to_strs(DConflicts)} end, Meta). to_json_attachment_stubs(Attachments) -> @@ -98,17 +110,62 @@ to_json_attachments(Attachments, Options) -> to_json_attachment_stubs(Attachments) end. -to_json_obj(#doc{id=Id,deleted=Del,body=Body,revs=Revs,meta=Meta}=Doc,Options)-> +to_json_obj(#doc{id=Id,deleted=Del,body=Body,revs={Start, RevIds}, + meta=Meta}=Doc,Options)-> {[{<<"_id">>, Id}] - ++ to_json_rev(Revs) + ++ to_json_rev(Start, RevIds) ++ to_json_body(Del, Body) - ++ to_json_revs(Options, Revs) - ++ to_json_revs_info(Meta) + ++ to_json_revisions(Options, Start, RevIds) + ++ to_json_meta(Meta) ++ to_json_attachments(Doc#doc.attachments, Options) }. from_json_obj({Props}) -> - {JsonBins} = proplists:get_value(<<"_attachments">>, Props, {[]}), + transfer_fields(Props, #doc{body=[]}); + +from_json_obj(_Other) -> + throw({bad_request, "Document must be a JSON object"}). + +parse_rev(Rev) when is_binary(Rev) -> + parse_rev(?b2l(Rev)); +parse_rev(Rev) -> + {Pos, [$- | RevId]} = lists:splitwith(fun($-) -> false; (_) -> true end, Rev), + {list_to_integer(Pos), ?l2b(RevId)}. + +parse_revs([]) -> + []; +parse_revs([Rev | Rest]) -> + [parse_rev(Rev) | parse_revs(Rest)]. + + +transfer_fields([], #doc{body=Fields}=Doc) -> + % convert fields back to json object + Doc#doc{body={lists:reverse(Fields)}}; + +transfer_fields([{<<"_id">>, Id} | Rest], Doc) when is_binary(Id) -> + case Id of + <<"_design/", _/binary>> -> ok; + <<"_local/", _/binary>> -> ok; + <<"_", _/binary>> -> + throw({bad_request, <<"Only reserved document ids may start with underscore.">>}); + _Else -> ok + end, + transfer_fields(Rest, Doc#doc{id=Id}); + +transfer_fields([{<<"_id">>, Id} | _Rest], _Doc) -> + ?LOG_DEBUG("Document id is not a string: ~p", [Id]), + throw({bad_request, <<"Document id must be a string">>}); + +transfer_fields([{<<"_rev">>, Rev} | Rest], #doc{revs={0, []}}=Doc) -> + {Pos, RevId} = parse_rev(Rev), + transfer_fields(Rest, + Doc#doc{revs={Pos, [RevId]}}); + +transfer_fields([{<<"_rev">>, _Rev} | Rest], Doc) -> + % we already got the rev from the _revisions + transfer_fields(Rest,Doc); + +transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) -> Bins = lists:flatmap(fun({Name, {BinProps}}) -> case proplists:get_value(<<"stub">>, BinProps) of true -> @@ -122,51 +179,40 @@ from_json_obj({Props}) -> [{Name, {Type, couch_util:decodeBase64(Value)}}] end end, JsonBins), - AllowedSpecialMembers = [<<"id">>, <<"revs">>, <<"rev">>, <<"attachments">>, <<"revs_info">>, - <<"conflicts">>, <<"deleted_conflicts">>, <<"deleted">>], - % collect all the doc-members that start with "_" - % if any aren't in the AllowedSpecialMembers list - % then throw a invalid_doc error - [case lists:member(Name, AllowedSpecialMembers) of - true -> - ok; - false -> - throw({invalid_doc, io_lib:format("Bad special document member: _~s", [Name])}) - end - || {<<$_,Name/binary>>, _Value} <- Props], - Revs = - case proplists:get_value(<<"_revs">>, Props, []) of - [] -> - case proplists:get_value(<<"_rev">>, Props) of - undefined -> []; - Rev -> [Rev] - end; - Revs0 -> - Revs0 - end, - case proplists:get_value(<<"_id">>, Props, <<>>) of - <<"_design/", _/binary>> = Id -> ok; - <<"_local/", _/binary>> = Id -> ok; - <<"_", _/binary>> = Id -> - throw({invalid_doc, "Document Ids must not start with underscore."}); - Id when is_binary(Id) -> ok; - Id -> - ?LOG_DEBUG("Document id is not a string: ~p", [Id]), - throw({invalid_doc, "Document id is not a string"}) + transfer_fields(Rest, Doc#doc{attachments=Bins}); + +transfer_fields([{<<"_revisions">>, {Props}} | Rest], Doc) -> + RevIds = proplists:get_value(<<"ids">>, Props), + Start = proplists:get_value(<<"start">>, Props), + if not is_integer(Start) -> + throw({doc_validation, "_revisions.start isn't an integer."}); + not is_list(RevIds) -> + throw({doc_validation, "_revisions.ids isn't a array."}); + true -> + ok end, + [throw({doc_validation, "RevId isn't a string"}) || + RevId <- RevIds, not is_binary(RevId)], + transfer_fields(Rest, Doc#doc{revs={Start, RevIds}}); - % strip out the all props beginning with _ - NewBody = {[{K, V} || {<<First,_/binary>>=K, V} <- Props, First /= $_]}, - #doc{ - id = Id, - revs = Revs, - deleted = proplists:get_value(<<"_deleted">>, Props, false), - body = NewBody, - attachments = Bins - }; +transfer_fields([{<<"_deleted">>, B} | Rest], Doc) when (B==true) or (B==false) -> + transfer_fields(Rest, Doc#doc{deleted=B}); -from_json_obj(_Other) -> - throw({invalid_doc, "Document must be a JSON object"}). +% ignored fields +transfer_fields([{<<"_revs_info">>, _} | Rest], Doc) -> + transfer_fields(Rest, Doc); +transfer_fields([{<<"_conflicts">>, _} | Rest], Doc) -> + transfer_fields(Rest, Doc); +transfer_fields([{<<"_deleted_conflicts">>, _} | Rest], Doc) -> + transfer_fields(Rest, Doc); + +% unknown special field +transfer_fields([{<<"_",Name/binary>>, Start} | _], _) when is_integer(Start) -> + throw({doc_validation, + ?l2b(io_lib:format("Bad special document member: _~s", [Name]))}); + +transfer_fields([Field | Rest], #doc{body=Fields}=Doc) -> + transfer_fields(Rest, Doc#doc{body=[Field|Fields]}). to_doc_info(FullDocInfo) -> {DocInfo, _Path} = to_doc_info_path(FullDocInfo), @@ -175,27 +221,26 @@ to_doc_info(FullDocInfo) -> to_doc_info_path(#full_doc_info{id=Id,update_seq=Seq,rev_tree=Tree}) -> LeafRevs = couch_key_tree:get_all_leafs(Tree), SortedLeafRevs = - lists:sort(fun({RevIdA, {IsDeletedA, _}, PathA}, {RevIdB, {IsDeletedB, _}, PathB}) -> + lists:sort(fun({{IsDeletedA, _}, {StartA, [RevIdA|_]}}, {{IsDeletedB, _}, {StartB, [RevIdB|_]}}) -> % sort descending by {not deleted, then Depth, then RevisionId} - A = {not IsDeletedA, length(PathA), RevIdA}, - B = {not IsDeletedB, length(PathB), RevIdB}, + A = {not IsDeletedA, StartA, RevIdA}, + B = {not IsDeletedB, StartB, RevIdB}, A > B end, LeafRevs), - [{RevId, {IsDeleted, SummaryPointer}, Path} | Rest] = SortedLeafRevs, - + [{{IsDeleted, SummaryPointer}, {Start, [RevId|_]}=Path} | Rest] = SortedLeafRevs, {ConflictRevTuples, DeletedConflictRevTuples} = - lists:splitwith(fun({_ConflictRevId, {IsDeleted1, _Sp}, _}) -> + lists:splitwith(fun({{IsDeleted1, _Sp}, _}) -> not IsDeleted1 end, Rest), - ConflictRevs = [RevId1 || {RevId1, _, _} <- ConflictRevTuples], - DeletedConflictRevs = [RevId2 || {RevId2, _, _} <- DeletedConflictRevTuples], + ConflictRevs = [{Start1, RevId1} || {_, {Start1, [RevId1|_]}} <- ConflictRevTuples], + DeletedConflictRevs = [{Start1, RevId1} || {_, {Start1, [RevId1|_]}} <- DeletedConflictRevTuples], DocInfo = #doc_info{ id=Id, update_seq=Seq, - rev = RevId, + rev = {Start, RevId}, summary_pointer = SummaryPointer, conflict_revs = ConflictRevs, deleted_conflict_revs = DeletedConflictRevs, diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl index 48ff403b..2ba684a8 100644 --- a/src/couchdb/couch_httpd.erl +++ b/src/couchdb/couch_httpd.erl @@ -16,7 +16,7 @@ -export([start_link/0, stop/0, handle_request/4]). -export([header_value/2,header_value/3,qs_value/2,qs_value/3,qs/1,path/1,absolute_uri/2]). --export([verify_is_server_admin/1,unquote/1,quote/1,recv/2,recv_chunked/4]). +-export([verify_is_server_admin/1,unquote/1,quote/1,recv/2,recv_chunked/4,error_info/1]). -export([parse_form/1,json_body/1,body/1,doc_etag/1, make_etag/1, etag_respond/3]). -export([primary_header_value/2,partition/1,serve_file/3]). -export([start_chunked_response/3,send_chunk/2]). @@ -166,7 +166,7 @@ handle_request(MochiReq, UrlHandlers, DbUrlHandlers, DesignUrlHandlers) -> catch throw:Error -> send_error(HttpReq, Error); - Tag:Error -> + Tag:Error when Error ==foo -> ?LOG_ERROR("Uncaught error in HTTP request: ~p",[{Tag, Error}]), ?LOG_DEBUG("Stacktrace: ~p",[erlang:get_stacktrace()]), send_error(HttpReq, Error) @@ -295,8 +295,8 @@ body(#httpd{mochi_req=MochiReq}) -> json_body(Httpd) -> ?JSON_DECODE(body(Httpd)). -doc_etag(#doc{revs=[DiskRev|_]}) -> - "\"" ++ binary_to_list(DiskRev) ++ "\"". +doc_etag(#doc{revs={Start, [DiskRev|_]}}) -> + "\"" ++ ?b2l(couch_doc:rev_to_str({Start, DiskRev})) ++ "\"". make_etag(Term) -> <<SigInt:128/integer>> = erlang:md5(term_to_binary(Term)), @@ -392,75 +392,55 @@ end_json_response(Resp) -> send_chunk(Resp, []). -send_error(Req, bad_request) -> - send_error(Req, 400, <<"bad_request">>, <<>>); -send_error(Req, {query_parse_error, Reason}) -> - send_error(Req, 400, <<"query_parse_error">>, Reason); -send_error(Req, {bad_request, Reason}) -> - send_error(Req, 400, <<"bad_request">>, Reason); -send_error(Req, not_found) -> - send_error(Req, 404, <<"not_found">>, <<"Missing">>); -send_error(Req, {not_found, Reason}) -> - send_error(Req, 404, <<"not_found">>, Reason); -send_error(Req, conflict) -> - send_error(Req, 409, <<"conflict">>, <<"Document update conflict.">>); -send_error(Req, {invalid_doc, Reason}) -> - send_error(Req, 400, <<"invalid_doc">>, Reason); -send_error(Req, {forbidden, Msg}) -> - send_json(Req, 403, - {[{<<"error">>, <<"forbidden">>}, - {<<"reason">>, Msg}]}); -send_error(Req, {unauthorized, Msg}) -> - case couch_config:get("httpd", "WWW-Authenticate", nil) of - nil -> - Headers = []; - Type -> - Headers = [{"WWW-Authenticate", Type}] - end, - send_json(Req, 401, Headers, - {[{<<"error">>, <<"unauthorized">>}, - {<<"reason">>, Msg}]}); -send_error(Req, {http_error, Code, Headers, Error, Reason}) -> - send_json(Req, Code, Headers, - {[{<<"error">>, Error}, {<<"reason">>, Reason}]}); -send_error(Req, {user_error, {Props}}) -> - {Headers} = proplists:get_value(<<"headers">>, Props, {[]}), - send_json(Req, - proplists:get_value(<<"http_status">>, Props, 500), - Headers, - {[{<<"error">>, proplists:get_value(<<"error">>, Props)}, - {<<"reason">>, proplists:get_value(<<"reason">>, Props)}]}); -send_error(Req, file_exists) -> - send_error(Req, 412, <<"file_exists">>, <<"The database could not be " - "created, the file already exists.">>); -send_error(Req, {Error, Reason}) -> - send_error(Req, 500, Error, Reason); -send_error(Req, Error) -> - send_error(Req, 500, <<"error">>, Error). - +error_info(bad_request) -> + {400, <<"bad_request">>, <<>>}; +error_info({bad_request, Reason}) -> + {400, <<"bad_request">>, Reason}; +error_info({query_parse_error, Reason}) -> + {400, <<"query_parse_error">>, Reason}; +error_info(not_found) -> + {404, <<"not_found">>, <<"Missing">>}; +error_info({not_found, Reason}) -> + {404, <<"not_found">>, Reason}; +error_info(conflict) -> + {409, <<"conflict">>, <<"Document update conflict.">>}; +error_info({forbidden, Msg}) -> + {403, <<"forbidden">>, Msg}; +error_info({unauthorized, Msg}) -> + {401, <<"unauthorized">>, Msg}; +error_info(file_exists) -> + {412, <<"file_exists">>, <<"The database could not be " + "created, the file already exists.">>}; +error_info({Error, Reason}) -> + {500, couch_util:to_binary(Error), couch_util:to_binary(Reason)}; +error_info(Error) -> + {500, <<"unknown_error">>, couch_util:to_binary(Error)}. +send_error(Req, Error) -> + {Code, ErrorStr, ReasonStr} = error_info(Error), + if Code == 401 -> + case couch_config:get("httpd", "WWW-Authenticate", nil) of + nil -> + Headers = []; + Type -> + Headers = [{"WWW-Authenticate", Type}] + end; + true -> + Headers = [] + end, + send_error(Req, Code, Headers, ErrorStr, ReasonStr). -send_error(Req, Code, Error, Msg) when is_atom(Error) -> - send_error(Req, Code, list_to_binary(atom_to_list(Error)), Msg); -send_error(Req, Code, Error, Msg) when is_list(Msg) -> - case (catch list_to_binary(Msg)) of - Bin when is_binary(Bin) -> - send_error(Req, Code, Error, Bin); - _ -> - send_error(Req, Code, Error, io_lib:format("~p", [Msg])) - end; -send_error(Req, Code, Error, Msg) when not is_binary(Error) -> - send_error(Req, Code, list_to_binary(io_lib:format("~p", [Error])), Msg); -send_error(Req, Code, Error, Msg) when not is_binary(Msg) -> - send_error(Req, Code, Error, list_to_binary(io_lib:format("~p", [Msg]))); -send_error(Req, Code, Error, <<>>) -> - send_json(Req, Code, {[{<<"error">>, Error}]}); -send_error(Req, Code, Error, Msg) -> - send_json(Req, Code, {[{<<"error">>, Error}, {<<"reason">>, Msg}]}). +send_error(Req, Code, ErrorStr, ReasonStr) -> + send_error(Req, Code, [], ErrorStr, ReasonStr). -send_redirect(Req, Path) -> - Headers = [{"Location", couch_httpd:absolute_uri(Req, Path)}], - send_response(Req, 301, Headers, <<>>). +send_error(Req, Code, Headers, ErrorStr, ReasonStr) -> + send_json(Req, Code, Headers, + {[{<<"error">>, ErrorStr}, + {<<"reason">>, ReasonStr}]}). + + send_redirect(Req, Path) -> + Headers = [{"Location", couch_httpd:absolute_uri(Req, Path)}], + send_response(Req, 301, Headers, <<>>). negotiate_content_type(#httpd{mochi_req=MochiReq}) -> %% Determine the appropriate Content-Type header for a JSON response diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 75022cd3..3680d73b 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -22,8 +22,8 @@ -record(doc_query_args, { options = [], - rev = "", - open_revs = "" + rev = nil, + open_revs = [] }). % Database request handlers @@ -89,13 +89,13 @@ db_req(#httpd{method='GET',path_parts=[_DbName]}=Req, Db) -> db_req(#httpd{method='POST',path_parts=[DbName]}=Req, Db) -> Doc = couch_doc:from_json_obj(couch_httpd:json_body(Req)), DocId = couch_util:new_uuid(), - {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=[]}, []), + {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId}, []), DocUrl = absolute_uri(Req, binary_to_list(<<"/",DbName/binary,"/",DocId/binary>>)), send_json(Req, 201, [{"Location", DocUrl}], {[ {ok, true}, {id, DocId}, - {rev, NewRev} + {rev, couch_doc:rev_to_str(NewRev)} ]}); db_req(#httpd{path_parts=[_DbName]}=Req, _Db) -> @@ -131,32 +131,59 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>]}=Req, Db) -> <<>> -> couch_util:new_uuid(); Id0 -> Id0 end, - Revs = case proplists:get_value(<<"_rev">>, ObjProps) of - undefined -> []; - Rev -> [Rev] + case proplists:get_value(<<"_rev">>, ObjProps) of + undefined -> + Revs = {0, []}; + Rev -> + {Pos, RevId} = couch_doc:parse_rev(Rev), + Revs = {Pos, [RevId]} end, Doc#doc{id=Id,revs=Revs} end, DocsArray), - {ok, ResultRevs} = couch_db:update_docs(Db, Docs, Options), - - % output the results - DocResults = lists:zipwith( - fun(Doc, NewRev) -> - {[{<<"id">>, Doc#doc.id}, {<<"rev">>, NewRev}]} - end, - Docs, ResultRevs), - send_json(Req, 201, {[ - {ok, true}, - {new_revs, DocResults} - ]}); - + Options2 = + case proplists:get_value(<<"all_or_nothing">>, JsonProps) of + true -> [all_or_nothing|Options]; + _ -> Options + end, + case couch_db:update_docs(Db, Docs, Options2) of + {ok, Results} -> + % output the results + DocResults = lists:zipwith( + fun(Doc, {ok, NewRev}) -> + {[{<<"id">>, Doc#doc.id}, {<<"rev">>, couch_doc:rev_to_str(NewRev)}]}; + (Doc, Error) -> + {_Code, Err, Msg} = couch_httpd:error_info(Error), + % maybe we should add the http error code to the json? + {[{<<"id">>, Doc#doc.id}, {<<"error">>, Err}, {"reason", Msg}]} + end, + Docs, Results), + send_json(Req, 201, DocResults); + {aborted, Errors} -> + ErrorsJson = + lists:map( + fun({{Id, Rev}, Error}) -> + {_Code, Err, Msg} = couch_httpd:error_info(Error), + {[{<<"id">>, Id}, + {<<"rev">>, couch_doc:rev_to_str(Rev)}, + {<<"error">>, Err}, + {"reason", Msg}]} + end, Errors), + send_json(Req, 417, ErrorsJson) + end; false -> Docs = [couch_doc:from_json_obj(JsonObj) || JsonObj <- DocsArray], - ok = couch_db:update_docs(Db, Docs, Options, false), - send_json(Req, 201, {[ - {ok, true} - ]}) + {ok, Errors} = couch_db:update_docs(Db, Docs, Options, replicated_changes), + ErrorsJson = + lists:map( + fun({{Id, Rev}, Error}) -> + {_Code, Err, Msg} = couch_httpd:error_info(Error), + {[{<<"id">>, Id}, + {<<"rev">>, couch_doc:rev_to_str(Rev)}, + {<<"error">>, Err}, + {"reason", Msg}]} + end, Errors), + send_json(Req, 201, ErrorsJson) end; db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db) -> send_method_not_allowed(Req, "POST"); @@ -170,12 +197,12 @@ db_req(#httpd{path_parts=[_,<<"_compact">>]}=Req, _Db) -> db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) -> {IdsRevs} = couch_httpd:json_body(Req), - % validate the json input - [{_Id, [_|_]=_Revs} = IdRevs || IdRevs <- IdsRevs], + IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs], - case couch_db:purge_docs(Db, IdsRevs) of + case couch_db:purge_docs(Db, IdsRevs2) of {ok, PurgeSeq, PurgedIdsRevs} -> - send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs}}]}); + PurgedIdsRevs2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- PurgedIdsRevs], + send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs2}}]}); Error -> throw(Error) end; @@ -204,7 +231,7 @@ db_req(#httpd{method='GET',path_parts=[_,<<"_all_docs_by_seq">>]}=Req, Db) -> {ok, Info} = couch_db:get_db_info(Db), CurrentEtag = couch_httpd:make_etag(proplists:get_value(update_seq, Info)), - couch_httpd:etag_respond(Req, CurrentEtag, fun() -> + couch_httpd:etag_respond(Req, CurrentEtag, fun() -> TotalRowCount = proplists:get_value(doc_count, Info), FoldlFun = couch_httpd_view:make_view_fold_fun(Req, QueryArgs, CurrentEtag, Db, TotalRowCount, #view_fold_helper_funs{ @@ -227,14 +254,14 @@ db_req(#httpd{method='GET',path_parts=[_,<<"_all_docs_by_seq">>]}=Req, Db) -> deleted_conflict_revs=DelConflictRevs } = DocInfo, Json = { - [{<<"rev">>, Rev}] ++ + [{<<"rev">>, couch_doc:rev_to_str(Rev)}] ++ case ConflictRevs of [] -> []; - _ -> [{<<"conflicts">>, ConflictRevs}] + _ -> [{<<"conflicts">>, couch_doc:rev_to_strs(ConflictRevs)}] end ++ case DelConflictRevs of [] -> []; - _ -> [{<<"deleted_conflicts">>, DelConflictRevs}] + _ -> [{<<"deleted_conflicts">>, couch_doc:rev_to_strs(DelConflictRevs)}] end ++ case Deleted of true -> [{<<"deleted">>, true}]; @@ -251,9 +278,11 @@ db_req(#httpd{path_parts=[_,<<"_all_docs_by_seq">>]}=Req, _Db) -> db_req(#httpd{method='POST',path_parts=[_,<<"_missing_revs">>]}=Req, Db) -> {JsonDocIdRevs} = couch_httpd:json_body(Req), - {ok, Results} = couch_db:get_missing_revs(Db, JsonDocIdRevs), + JsonDocIdRevs2 = [{Id, [couch_doc:parse_rev(RevStr) || RevStr <- RevStrs]} || {Id, RevStrs} <- JsonDocIdRevs], + {ok, Results} = couch_db:get_missing_revs(Db, JsonDocIdRevs2), + Results2 = [{Id, [couch_doc:rev_to_str(Rev) || Rev <- Revs]} || {Id, Revs} <- Results], send_json(Req, {[ - {missing_revs, {Results}} + {missing_revs, {Results2}} ]}); db_req(#httpd{path_parts=[_,<<"_missing_revs">>]}=Req, _Db) -> @@ -271,6 +300,18 @@ db_req(#httpd{method='GET',path_parts=[_,<<"_admins">>]}=Req, Db) -> db_req(#httpd{path_parts=[_,<<"_admins">>]}=Req, _Db) -> send_method_not_allowed(Req, "PUT,GET"); +db_req(#httpd{method='PUT',path_parts=[_,<<"_revs_limit">>]}=Req, + Db) -> + Limit = couch_httpd:json_body(Req), + ok = couch_db:set_revs_limit(Db, Limit), + send_json(Req, {[{<<"ok">>, true}]}); + +db_req(#httpd{method='GET',path_parts=[_,<<"_revs_limit">>]}=Req, Db) -> + send_json(Req, couch_db:get_revs_limit(Db)); + +db_req(#httpd{path_parts=[_,<<"_revs_limit">>]}=Req, _Db) -> + send_method_not_allowed(Req, "PUT,GET"); + % Special case to enable using an unencoded slash in the URL of design docs, % as slashes in document IDs must otherwise be URL encoded. db_req(#httpd{method='GET',mochi_req=MochiReq, path_parts=[DbName,<<"_design/",_/binary>>|_]}=Req, _Db) -> @@ -334,7 +375,7 @@ all_docs_view(Req, Db, Keys) -> AdapterFun = fun(#full_doc_info{id=Id}=FullDocInfo, Offset, Acc) -> case couch_doc:to_doc_info(FullDocInfo) of #doc_info{deleted=false, rev=Rev} -> - FoldlFun({{Id, Id}, {[{rev, Rev}]}}, Offset, Acc); + FoldlFun({{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}]}}, Offset, Acc); #doc_info{deleted=true} -> {ok, Acc} end @@ -358,9 +399,9 @@ all_docs_view(Req, Db, Keys) -> DocInfo = (catch couch_db:get_doc_info(Db, Key)), Doc = case DocInfo of {ok, #doc_info{id=Id, rev=Rev, deleted=false}} = DocInfo -> - {{Id, Id}, {[{rev, Rev}]}}; + {{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}]}}; {ok, #doc_info{id=Id, rev=Rev, deleted=true}} = DocInfo -> - {{Id, Id}, {[{rev, Rev}, {deleted, true}]}}; + {{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}, {deleted, true}]}}; not_found -> {{Key, error}, not_found}; _ -> @@ -381,20 +422,12 @@ all_docs_view(Req, Db, Keys) -> - - db_doc_req(#httpd{method='DELETE'}=Req, Db, DocId) -> - case extract_header_rev(Req, couch_httpd:qs_value(Req, "rev")) of - missing_rev -> - couch_httpd:send_error(Req, 409, <<"missing_rev">>, - <<"Document rev/etag must be specified to delete">>); - RevToDelete -> - {ok, NewRev} = couch_db:delete_doc(Db, DocId, [RevToDelete]), - send_json(Req, 200, {[ - {ok, true}, - {id, DocId}, - {rev, NewRev} - ]}) + case couch_httpd:qs_value(Req, "rev") of + undefined -> + update_doc(Req, Db, DocId, {[{<<"_deleted">>,true}]}); + Rev -> + update_doc(Req, Db, DocId, {[{<<"_rev">>, ?l2b(Rev)},{<<"_deleted">>,true}]}) end; db_doc_req(#httpd{method='GET'}=Req, Db, DocId) -> @@ -438,82 +471,31 @@ db_doc_req(#httpd{method='GET'}=Req, Db, DocId) -> end_json_response(Resp) end; -db_doc_req(#httpd{method='POST'}=Req, Db, DocId) -> - Form = couch_httpd:parse_form(Req), - Rev = list_to_binary(proplists:get_value("_rev", Form)), - Doc = case couch_db:open_doc_revs(Db, DocId, [Rev], []) of - {ok, [{ok, Doc0}]} -> Doc0#doc{revs=[Rev]}; - {ok, [Error]} -> throw(Error) - end, - - NewAttachments = [ - {validate_attachment_name(Name), {list_to_binary(ContentType), Content}} || - {Name, {ContentType, _}, Content} <- - proplists:get_all_values("_attachments", Form) - ], - #doc{attachments=Attachments} = Doc, - NewDoc = Doc#doc{ - attachments = Attachments ++ NewAttachments - }, - {ok, NewRev} = couch_db:update_doc(Db, NewDoc, []), - - send_json(Req, 201, [{"Etag", "\"" ++ NewRev ++ "\""}], {obj, [ - {ok, true}, - {id, DocId}, - {rev, NewRev} - ]}); - db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) -> - Json = couch_httpd:json_body(Req), - Doc = couch_doc:from_json_obj(Json), - ExplicitRev = - case Doc#doc.revs of - [Rev0|_] -> Rev0; - [] -> undefined - end, - validate_attachment_names(Doc), - case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of - "true" -> - Options = [full_commit]; - _ -> - Options = [] - end, - case extract_header_rev(Req, ExplicitRev) of - missing_rev -> - Revs = []; - Rev -> - Revs = [Rev] - end, - {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=Revs}, Options), - send_json(Req, 201, [{"Etag", <<"\"", NewRev/binary, "\"">>}], {[ - {ok, true}, - {id, DocId}, - {rev, NewRev} - ]}); + update_doc(Req, Db, DocId, couch_httpd:json_body(Req)); db_doc_req(#httpd{method='COPY'}=Req, Db, SourceDocId) -> SourceRev = case extract_header_rev(Req, couch_httpd:qs_value(Req, "rev")) of - missing_rev -> []; + missing_rev -> nil; Rev -> Rev end, - {TargetDocId, TargetRev} = parse_copy_destination_header(Req), + {TargetDocId, TargetRevs} = parse_copy_destination_header(Req), % open revision Rev or Current Doc = couch_doc_open(Db, SourceDocId, SourceRev, []), - % save new doc - {ok, NewTargetRev} = couch_db:update_doc(Db, Doc#doc{id=TargetDocId, revs=TargetRev}, []), - - send_json(Req, 201, [{"Etag", "\"" ++ binary_to_list(NewTargetRev) ++ "\""}], {[ - {ok, true}, - {id, TargetDocId}, - {rev, NewTargetRev} - ]}); + case couch_db:update_doc(Db, Doc#doc{id=TargetDocId, revs=TargetRevs}, []) of + {ok, NewTargetRev} -> + send_json(Req, 201, [{"Etag", "\"" ++ ?b2l(couch_doc:rev_to_str(NewTargetRev)) ++ "\""}], + update_result_to_json({ok, NewTargetRev})); + Error -> + throw(Error) + end; db_doc_req(#httpd{method='MOVE'}=Req, Db, SourceDocId) -> - SourceRev = + SourceRev = {SourceRevPos, SourceRevId} = case extract_header_rev(Req, couch_httpd:qs_value(Req, "rev")) of missing_rev -> throw({bad_request, "MOVE requires a specified rev parameter" @@ -521,37 +503,68 @@ db_doc_req(#httpd{method='MOVE'}=Req, Db, SourceDocId) -> Rev -> Rev end, - {TargetDocId, TargetRev} = parse_copy_destination_header(Req), + {TargetDocId, TargetRevs} = parse_copy_destination_header(Req), % open revision Rev or Current Doc = couch_doc_open(Db, SourceDocId, SourceRev, []), % save new doc & delete old doc in one operation Docs = [ - Doc#doc{id=TargetDocId, revs=TargetRev}, - #doc{id=SourceDocId, revs=[SourceRev], deleted=true} + #doc{id=SourceDocId, revs={SourceRevPos, [SourceRevId]}, deleted=true}, + Doc#doc{id=TargetDocId, revs=TargetRevs} ], - {ok, ResultRevs} = couch_db:update_docs(Db, Docs, []), + {ok, [SourceResult, TargetResult]} = couch_db:update_docs(Db, Docs, []), - DocResults = lists:zipwith( - fun(FDoc, NewRev) -> - {[{id, FDoc#doc.id}, {rev, NewRev}]} - end, - Docs, ResultRevs), send_json(Req, 201, {[ - {ok, true}, - {new_revs, DocResults} + {SourceDocId, update_result_to_json(SourceResult)}, + {TargetDocId, update_result_to_json(TargetResult)} ]}); db_doc_req(Req, _Db, _DocId) -> send_method_not_allowed(Req, "DELETE,GET,HEAD,POST,PUT,COPY,MOVE"). +update_result_to_json({ok, NewRev}) -> + {[{rev, couch_doc:rev_to_str(NewRev)}]}; +update_result_to_json(Error) -> + {_Code, ErrorStr, Reason} = couch_httpd:error_info(Error), + {[{error, ErrorStr}, {reason, Reason}]}. + + +update_doc(Req, Db, DocId, Json) -> + #doc{deleted=Deleted} = Doc = couch_doc:from_json_obj(Json), + validate_attachment_names(Doc), + ExplicitDocRev = + case Doc#doc.revs of + {Start,[RevId|_]} -> {Start, RevId}; + _ -> undefined + end, + case extract_header_rev(Req, ExplicitDocRev) of + missing_rev -> + Revs = {0, []}; + {Pos, Rev} -> + Revs = {Pos, [Rev]} + end, + + case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of + "true" -> + Options = [full_commit]; + _ -> + Options = [] + end, + {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=Revs}, Options), + NewRevStr = couch_doc:rev_to_str(NewRev), + send_json(Req, if Deleted -> 200; true -> 201 end, + [{"Etag", <<"\"", NewRevStr/binary, "\"">>}], {[ + {ok, true}, + {id, DocId}, + {rev, NewRevStr}]}). + % Useful for debugging % couch_doc_open(Db, DocId) -> % couch_doc_open(Db, DocId, [], []). couch_doc_open(Db, DocId, Rev, Options) -> case Rev of - "" -> % open most recent rev + nil -> % open most recent rev case couch_db:open_doc(Db, DocId, Options) of {ok, Doc} -> Doc; @@ -572,13 +585,13 @@ couch_doc_open(Db, DocId, Rev, Options) -> db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) -> FileName = list_to_binary(mochiweb_util:join(lists:map(fun binary_to_list/1, FileNameParts),"/")), case couch_db:open_doc(Db, DocId, []) of - {ok, #doc{attachments=Attachments, revs=[LastRev|_OldRevs]}} -> + {ok, #doc{attachments=Attachments}=Doc} -> case proplists:get_value(FileName, Attachments) of undefined -> throw({not_found, "Document is missing attachment"}); {Type, Bin} -> {ok, Resp} = start_chunked_response(Req, 200, [ - {"ETag", binary_to_list(LastRev)}, + {"ETag", couch_httpd:doc_etag(Doc)}, {"Cache-Control", "must-revalidate"}, {"Content-Type", binary_to_list(Type)}%, % My understanding of http://www.faqs.org/rfcs/rfc2616.html @@ -640,7 +653,7 @@ db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts) #doc{id=DocId}; Rev -> case couch_db:open_doc_revs(Db, DocId, [Rev], []) of - {ok, [{ok, Doc0}]} -> Doc0#doc{revs=[Rev]}; + {ok, [{ok, Doc0}]} -> Doc0; {ok, [Error]} -> throw(Error) end end, @@ -653,7 +666,7 @@ db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts) send_json(Req, case Method of 'DELETE' -> 200; _ -> 201 end, {[ {ok, true}, {id, DocId}, - {rev, UpdatedRev} + {rev, couch_doc:rev_to_str(UpdatedRev)} ]}); db_attachment_req(Req, _Db, _DocId, _FileNameParts) -> @@ -682,25 +695,24 @@ parse_doc_query(Req) -> Options = [deleted_conflicts | Args#doc_query_args.options], Args#doc_query_args{options=Options}; {"rev", Rev} -> - Args#doc_query_args{rev=list_to_binary(Rev)}; + Args#doc_query_args{rev=couch_doc:parse_rev(Rev)}; {"open_revs", "all"} -> Args#doc_query_args{open_revs=all}; {"open_revs", RevsJsonStr} -> JsonArray = ?JSON_DECODE(RevsJsonStr), - Args#doc_query_args{open_revs=JsonArray}; + Args#doc_query_args{open_revs=[couch_doc:parse_rev(Rev) || Rev <- JsonArray]}; _Else -> % unknown key value pair, ignore. Args end end, #doc_query_args{}, couch_httpd:qs(Req)). - -extract_header_rev(Req, ExplicitRev) when is_list(ExplicitRev)-> - extract_header_rev(Req, list_to_binary(ExplicitRev)); +extract_header_rev(Req, ExplicitRev) when is_binary(ExplicitRev) or is_list(ExplicitRev)-> + extract_header_rev(Req, couch_doc:parse_rev(ExplicitRev)); extract_header_rev(Req, ExplicitRev) -> Etag = case couch_httpd:header_value(Req, "If-Match") of undefined -> undefined; - Value -> list_to_binary(string:strip(Value, both, $")) + Value -> couch_doc:parse_rev(string:strip(Value, both, $")) end, case {ExplicitRev, Etag} of {undefined, undefined} -> missing_rev; @@ -716,11 +728,12 @@ parse_copy_destination_header(Req) -> Destination = couch_httpd:header_value(Req, "Destination"), case regexp:match(Destination, "\\?") of nomatch -> - {list_to_binary(Destination), []}; + {list_to_binary(Destination), {0, []}}; {match, _, _} -> {ok, [DocId, RevQueryOptions]} = regexp:split(Destination, "\\?"), {ok, [_RevQueryKey, Rev]} = regexp:split(RevQueryOptions, "="), - {list_to_binary(DocId), [list_to_binary(Rev)]} + {Pos, RevId} = couch_doc:parse_rev(Rev), + {list_to_binary(DocId), {Pos, [RevId]}} end. validate_attachment_names(Doc) -> diff --git a/src/couchdb/couch_httpd_misc_handlers.erl b/src/couchdb/couch_httpd_misc_handlers.erl index 583a87c2..46fed095 100644 --- a/src/couchdb/couch_httpd_misc_handlers.erl +++ b/src/couchdb/couch_httpd_misc_handlers.erl @@ -70,35 +70,36 @@ handle_task_status_req(#httpd{method='GET'}=Req) -> handle_task_status_req(Req) -> send_method_not_allowed(Req, "GET,HEAD"). +% add trailing slash if missing +fix_db_url(UrlBin) -> + ?l2b(case lists:last(Url = ?b2l(UrlBin)) of + $/ -> Url; + _ -> Url ++ "/" + end). + -handle_replicate_req(#httpd{user_ctx=UserCtx,method='POST'}=Req) -> +get_rep_endpoint(_Req, {Props}) -> + Url = proplists:get_value(<<"url">>, Props), + {BinHeaders} = proplists:get_value(<<"headers">>, Props, {[]}), + {remote, fix_db_url(Url), [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]}; +get_rep_endpoint(_Req, <<"http://",_/binary>>=Url) -> + {remote, fix_db_url(Url), []}; +get_rep_endpoint(_Req, <<"https://",_/binary>>=Url) -> + {remote, fix_db_url(Url), []}; +get_rep_endpoint(#httpd{user_ctx=UserCtx}, <<DbName/binary>>) -> + {local, DbName, UserCtx}. + +handle_replicate_req(#httpd{method='POST'}=Req) -> {Props} = couch_httpd:json_body(Req), - Source = proplists:get_value(<<"source">>, Props), - Target = proplists:get_value(<<"target">>, Props), - - {SrcOpts} = proplists:get_value(<<"source_options">>, Props, {[]}), - {SrcHeadersBinary} = proplists:get_value(<<"headers">>, SrcOpts, {[]}), - SrcHeaders = [{?b2l(K),(V)} || {K,V} <- SrcHeadersBinary], - - {TgtOpts} = proplists:get_value(<<"target_options">>, Props, {[]}), - {TgtHeadersBinary} = proplists:get_value(<<"headers">>, TgtOpts, {[]}), - TgtHeaders = [{?b2l(K),(V)} || {K,V} <- TgtHeadersBinary], - - {Options} = proplists:get_value(<<"options">>, Props, {[]}), - Options2 = [{source_options, - [{headers, SrcHeaders}, - {user_ctx, UserCtx}]}, - {target_options, - [{headers, TgtHeaders}, - {user_ctx, UserCtx}]} - | Options], - case couch_rep:replicate(Source, Target, Options2) of - {ok, {JsonResults}} -> - send_json(Req, {[{ok, true} | JsonResults]}); - {error, {Type, Details}} -> - send_json(Req, 500, {[{error, Type}, {reason, Details}]}); - {error, Reason} -> - send_json(Req, 500, {[{error, Reason}]}) + Source = get_rep_endpoint(Req, proplists:get_value(<<"source">>, Props)), + Target = get_rep_endpoint(Req, proplists:get_value(<<"target">>, Props)), + case couch_rep:replicate(Source, Target) of + {ok, {JsonResults}} -> + send_json(Req, {[{ok, true} | JsonResults]}); + {error, {Type, Details}} -> + send_json(Req, 500, {[{error, Type}, {reason, Details}]}); + {error, Reason} -> + send_json(Req, 500, {[{error, Reason}]}) end; handle_replicate_req(Req) -> send_method_not_allowed(Req, "POST"). diff --git a/src/couchdb/couch_httpd_show.erl b/src/couchdb/couch_httpd_show.erl index 7b6f2832..dd337ced 100644 --- a/src/couchdb/couch_httpd_show.erl +++ b/src/couchdb/couch_httpd_show.erl @@ -27,10 +27,10 @@ handle_doc_show_req(#httpd{ path_parts=[_DbName, _Design, DesignName, _Show, ShowName, DocId] }=Req, Db) -> DesignId = <<"_design/", DesignName/binary>>, - #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, [], []), + #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []), Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>), ShowSrc = get_nested_json_value({Props}, [<<"shows">>, ShowName]), - Doc = try couch_httpd_db:couch_doc_open(Db, DocId, [], []) of + Doc = try couch_httpd_db:couch_doc_open(Db, DocId, nil, []) of FoundDoc -> FoundDoc catch _ -> nil @@ -42,7 +42,7 @@ handle_doc_show_req(#httpd{ path_parts=[_DbName, _Design, DesignName, _Show, ShowName] }=Req, Db) -> DesignId = <<"_design/", DesignName/binary>>, - #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, [], []), + #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []), Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>), ShowSrc = get_nested_json_value({Props}, [<<"shows">>, ShowName]), send_doc_show_response(Lang, ShowSrc, nil, nil, Req, Db); @@ -56,7 +56,7 @@ handle_doc_show_req(Req, _Db) -> handle_view_list_req(#httpd{method='GET', path_parts=[_DbName, _Design, DesignName, _List, ListName, ViewName]}=Req, Db) -> DesignId = <<"_design/", DesignName/binary>>, - #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, [], []), + #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []), Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>), ListSrc = get_nested_json_value({Props}, [<<"lists">>, ListName]), send_view_list_response(Lang, ListSrc, ViewName, DesignId, Req, Db, nil); @@ -67,7 +67,7 @@ handle_view_list_req(#httpd{method='GET'}=Req, _Db) -> handle_view_list_req(#httpd{method='POST', path_parts=[_DbName, _Design, DesignName, _List, ListName, ViewName]}=Req, Db) -> DesignId = <<"_design/", DesignName/binary>>, - #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, [], []), + #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []), Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>), ListSrc = get_nested_json_value({Props}, [<<"lists">>, ListName]), ReqBody = couch_httpd:body(Req), @@ -370,13 +370,12 @@ send_doc_show_response(Lang, ShowSrc, DocId, nil, #httpd{mochi_req=MReq}=Req, Db couch_httpd_external:send_external_response(Req, JsonResp) end); -send_doc_show_response(Lang, ShowSrc, DocId, #doc{revs=[DocRev|_]}=Doc, - #httpd{mochi_req=MReq}=Req, Db) -> +send_doc_show_response(Lang, ShowSrc, DocId, #doc{revs=Revs}=Doc, #httpd{mochi_req=MReq}=Req, Db) -> % calculate the etag Headers = MReq:get(headers), Hlist = mochiweb_headers:to_list(Headers), Accept = proplists:get_value('Accept', Hlist), - CurrentEtag = couch_httpd:make_etag({Lang, ShowSrc, DocRev, Accept}), + CurrentEtag = couch_httpd:make_etag({Lang, ShowSrc, Revs, Accept}), % We know our etag now couch_httpd:etag_respond(Req, CurrentEtag, fun() -> ExternalResp = couch_query_servers:render_doc_show(Lang, ShowSrc, diff --git a/src/couchdb/couch_httpd_view.erl b/src/couchdb/couch_httpd_view.erl index 322cf945..7f338655 100644 --- a/src/couchdb/couch_httpd_view.erl +++ b/src/couchdb/couch_httpd_view.erl @@ -565,14 +565,14 @@ view_row_obj(Db, {{Key, DocId}, Value}, IncludeDocs) -> true -> Rev = case Value of {Props} -> - case is_list(Props) of - true -> - proplists:get_value(<<"_rev">>, Props, []); - _ -> - [] + case proplists:get_value(<<"_rev">>, Props) of + undefined -> + nil; + Rev0 -> + couch_doc:parse_rev(Rev0) end; _ -> - [] + nil end, ?LOG_DEBUG("Include Doc: ~p ~p", [DocId, Rev]), case (catch couch_httpd_db:couch_doc_open(Db, DocId, Rev, [])) of diff --git a/src/couchdb/couch_key_tree.erl b/src/couchdb/couch_key_tree.erl index 82c42265..df24d4e1 100644 --- a/src/couchdb/couch_key_tree.erl +++ b/src/couchdb/couch_key_tree.erl @@ -13,7 +13,8 @@ -module(couch_key_tree). -export([merge/2, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]). --export([map/2, get_all_leafs/1, get_leaf_keys/1, count_leafs/1, remove_leafs/2,get_all_leafs_full/1]). +-export([map/2, get_all_leafs/1, count_leafs/1, remove_leafs/2, + get_all_leafs_full/1,stem/2,test/0]). % a key tree looks like this: % Tree -> [] or [{Key, Value, ChildTree} | SiblingTree] @@ -22,70 +23,150 @@ % And each Key < SiblingKey +% partial trees arranged by how much they are cut off. -% key tree functions +merge(A, B) -> + {Merged, HasConflicts} = + lists:foldl( + fun(InsertTree, {AccTrees, AccConflicts}) -> + case merge_one(AccTrees, InsertTree, [], false) of + {ok, Merged, Conflicts} -> + {Merged, Conflicts or AccConflicts}; + no -> + {[InsertTree | AccTrees], true} + end + end, + {A, false}, B), + if HasConflicts or + ((length(Merged) /= length(A)) and (length(Merged) /= length(B))) -> + Conflicts = conflicts; + true -> + Conflicts = no_conflicts + end, + {lists:sort(Merged), Conflicts}. + +merge_one([], Insert, OutAcc, ConflictsAcc) -> + {ok, [Insert | OutAcc], ConflictsAcc}; +merge_one([{Start, Tree}|Rest], {StartInsert, TreeInsert}, OutAcc, ConflictsAcc) -> + if Start =< StartInsert -> + StartA = Start, + StartB = StartInsert, + TreeA = Tree, + TreeB = TreeInsert; + true -> + StartB = Start, + StartA = StartInsert, + TreeB = Tree, + TreeA = TreeInsert + end, + case merge_at([TreeA], StartB - StartA, TreeB) of + {ok, [CombinedTrees], Conflicts} -> + merge_one(Rest, {StartA, CombinedTrees}, OutAcc, Conflicts or ConflictsAcc); + no -> + merge_one(Rest, {StartB, TreeB}, [{StartA, TreeA} | OutAcc], ConflictsAcc) + end. + +merge_at([], _Place, _Insert) -> + no; +merge_at([{Key, Value, SubTree}|Sibs], 0, {InsertKey, InsertValue, InsertSubTree}) -> + if Key == InsertKey -> + {Merge, Conflicts} = merge_simple(SubTree, InsertSubTree), + {ok, [{Key, Value, Merge} | Sibs], Conflicts}; + true -> + case merge_at(Sibs, 0, {InsertKey, InsertValue, InsertSubTree}) of + {ok, Merged, Conflicts} -> + {ok, [{Key, Value, SubTree} | Merged], Conflicts}; + no -> + no + end + end; +merge_at([{Key, Value, SubTree}|Sibs], Place, Insert) -> + case merge_at(SubTree, Place - 1,Insert) of + {ok, Merged, Conflicts} -> + {ok, [{Key, Value, Merged} | Sibs], Conflicts}; + no -> + case merge_at(Sibs, Place, Insert) of + {ok, Merged} -> + [{Key, Value, SubTree} | Merged]; + no -> + no + end + end. -% When the same key is found in the trees, the value in tree B is discarded. -merge([], B) -> - B; -merge(A, []) -> - A; -merge([ATree | ANextTree], [BTree | BNextTree]) -> +% key tree functions +merge_simple([], B) -> + {B, false}; +merge_simple(A, []) -> + {A, false}; +merge_simple([ATree | ANextTree], [BTree | BNextTree]) -> {AKey, AValue, ASubTree} = ATree, {BKey, _BValue, BSubTree} = BTree, if AKey == BKey -> %same key - MergedSubTree = merge(ASubTree, BSubTree), - MergedNextTree = merge(ANextTree, BNextTree), - [{AKey, AValue, MergedSubTree} | MergedNextTree]; + {MergedSubTree, Conflict1} = merge_simple(ASubTree, BSubTree), + {MergedNextTree, Conflict2} = merge_simple(ANextTree, BNextTree), + {[{AKey, AValue, MergedSubTree} | MergedNextTree], Conflict1 or Conflict2}; AKey < BKey -> - [ATree | merge(ANextTree, [BTree | BNextTree])]; + {MTree, _} = merge_simple(ANextTree, [BTree | BNextTree]), + {[ATree | MTree], true}; true -> - [BTree | merge([ATree | ANextTree], BNextTree)] + {MTree, _} = merge_simple([ATree | ANextTree], BNextTree), + {[BTree | MTree], true} end. find_missing(_Tree, []) -> []; -find_missing([], Keys) -> - Keys; -find_missing([{Key, _, SubTree} | RestTree], Keys) -> - SrcKeys2 = Keys -- [Key], - SrcKeys3 = find_missing(SubTree, SrcKeys2), - find_missing(RestTree, SrcKeys3). - - -get_all_key_paths_rev([], KeyPathAcc) -> - KeyPathAcc; -get_all_key_paths_rev([{Key, Value, SubTree} | RestTree], KeyPathAcc) -> - get_all_key_paths_rev(SubTree, [{Key, Value} | KeyPathAcc]) ++ - get_all_key_paths_rev(RestTree, KeyPathAcc). - +find_missing([], SeachKeys) -> + SeachKeys; +find_missing([{Start, {Key, Value, SubTree}} | RestTree], SeachKeys) -> + PossibleKeys = [{KeyPos, KeyValue} || {KeyPos, KeyValue} <- SeachKeys, KeyPos >= Start], + ImpossibleKeys = [{KeyPos, KeyValue} || {KeyPos, KeyValue} <- SeachKeys, KeyPos < Start], + Missing = find_missing_simple(Start, [{Key, Value, SubTree}], PossibleKeys), + find_missing(RestTree, ImpossibleKeys ++ Missing). + +find_missing_simple(_Pos, _Tree, []) -> + []; +find_missing_simple(_Pos, [], SeachKeys) -> + SeachKeys; +find_missing_simple(Pos, [{Key, _, SubTree} | RestTree], SeachKeys) -> + PossibleKeys = [{KeyPos, KeyValue} || {KeyPos, KeyValue} <- SeachKeys, KeyPos >= Pos], + ImpossibleKeys = [{KeyPos, KeyValue} || {KeyPos, KeyValue} <- SeachKeys, KeyPos < Pos], + + SrcKeys2 = PossibleKeys -- [{Pos, Key}], + SrcKeys3 = find_missing_simple(Pos + 1, SubTree, SrcKeys2), + ImpossibleKeys ++ find_missing_simple(Pos, RestTree, SrcKeys3). + + +filter_leafs([], _Keys, FilteredAcc, RemovedKeysAcc) -> + {FilteredAcc, RemovedKeysAcc}; +filter_leafs([{Pos, [{LeafKey, _}|_]} = Path |Rest], Keys, FilteredAcc, RemovedKeysAcc) -> + FilteredKeys = lists:delete({Pos, LeafKey}, Keys), + if FilteredKeys == Keys -> + % this leaf is not a key we are looking to remove + filter_leafs(Rest, Keys, [Path | FilteredAcc], RemovedKeysAcc); + true -> + % this did match a key, remove both the node and the input key + filter_leafs(Rest, FilteredKeys, FilteredAcc, [{Pos, LeafKey} | RemovedKeysAcc]) + end. % Removes any branches from the tree whose leaf node(s) are in the Keys -remove_leafs(Tree, Keys) -> +remove_leafs(Trees, Keys) -> % flatten each branch in a tree into a tree path - Paths = get_all_key_paths_rev(Tree, []), + Paths = get_all_leafs_full(Trees), % filter out any that are in the keys list. - {FoundKeys, FilteredPaths} = lists:mapfoldl( - fun(Key, PathsAcc) -> - case [Path || [{LeafKey,_}|_]=Path <- PathsAcc, LeafKey /= Key] of - PathsAcc -> - {nil, PathsAcc}; - PathsAcc2 -> - {Key, PathsAcc2} - end - end, Paths, Keys), - + {FilteredPaths, RemovedKeys} = filter_leafs(Paths, Keys, [], []), + % convert paths back to trees NewTree = lists:foldl( - fun(Path,TreeAcc) -> - SingleTree = lists:foldl( + fun({PathPos, Path},TreeAcc) -> + [SingleTree] = lists:foldl( fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path), - merge(TreeAcc, SingleTree) + {NewTrees, _} = merge(TreeAcc, [{PathPos + 1 - length(Path), SingleTree}]), + NewTrees end, [], FilteredPaths), - {NewTree, FoundKeys}. + {NewTree, RemovedKeys}. % get the leafs in the tree matching the keys. The matching key nodes can be @@ -94,87 +175,211 @@ remove_leafs(Tree, Keys) -> get_key_leafs(Tree, Keys) -> get_key_leafs(Tree, Keys, []). -get_key_leafs(_Tree, [], _KeyPathAcc) -> +get_key_leafs(_, [], Acc) -> + {Acc, []}; +get_key_leafs([], Keys, Acc) -> + {Acc, Keys}; +get_key_leafs([{Pos, Tree}|Rest], Keys, Acc) -> + {Gotten, RemainingKeys} = get_key_leafs_simple(Pos, [Tree], Keys, []), + get_key_leafs(Rest, RemainingKeys, Gotten ++ Acc). + +get_key_leafs_simple(_Pos, _Tree, [], _KeyPathAcc) -> {[], []}; -get_key_leafs([], KeysToGet, _KeyPathAcc) -> +get_key_leafs_simple(_Pos, [], KeysToGet, _KeyPathAcc) -> {[], KeysToGet}; -get_key_leafs([{Key, _Value, SubTree}=Tree | RestTree], KeysToGet, KeyPathAcc) -> - case KeysToGet -- [Key] of +get_key_leafs_simple(Pos, [{Key, _Value, SubTree}=Tree | RestTree], KeysToGet, KeyPathAcc) -> + case lists:delete({Pos, Key}, KeysToGet) of KeysToGet -> % same list, key not found - {LeafsFound, KeysToGet2} = get_key_leafs(SubTree, KeysToGet, [Key | KeyPathAcc]), - {RestLeafsFound, KeysRemaining} = get_key_leafs(RestTree, KeysToGet2, KeyPathAcc), + {LeafsFound, KeysToGet2} = get_key_leafs_simple(Pos + 1, SubTree, KeysToGet, [Key | KeyPathAcc]), + {RestLeafsFound, KeysRemaining} = get_key_leafs_simple(Pos, RestTree, KeysToGet2, KeyPathAcc), {LeafsFound ++ RestLeafsFound, KeysRemaining}; KeysToGet2 -> - LeafsFound = get_all_leafs([Tree], KeyPathAcc), + LeafsFound = get_all_leafs_simple(Pos, [Tree], KeyPathAcc), LeafKeysFound = [LeafKeyFound || {LeafKeyFound, _, _} <- LeafsFound], KeysToGet2 = KeysToGet2 -- LeafKeysFound, - {RestLeafsFound, KeysRemaining} = get_key_leafs(RestTree, KeysToGet2, KeyPathAcc), + {RestLeafsFound, KeysRemaining} = get_key_leafs_simple(Pos, RestTree, KeysToGet2, KeyPathAcc), {LeafsFound ++ RestLeafsFound, KeysRemaining} end. get(Tree, KeysToGet) -> {KeyPaths, KeysNotFound} = get_full_key_paths(Tree, KeysToGet), - FixedResults = [ {Key, Value, [Key0 || {Key0, _} <- Path]} || [{Key, Value}|_] = Path <- KeyPaths], + FixedResults = [ {Value, {Pos, [Key0 || {Key0, _} <- Path]}} || {Pos, [{_Key, Value}|_]=Path} <- KeyPaths], {FixedResults, KeysNotFound}. get_full_key_paths(Tree, Keys) -> get_full_key_paths(Tree, Keys, []). -get_full_key_paths(_Tree, [], _KeyPathAcc) -> +get_full_key_paths(_, [], Acc) -> + {Acc, []}; +get_full_key_paths([], Keys, Acc) -> + {Acc, Keys}; +get_full_key_paths([{Pos, Tree}|Rest], Keys, Acc) -> + {Gotten, RemainingKeys} = get_full_key_paths(Pos, [Tree], Keys, []), + get_full_key_paths(Rest, RemainingKeys, Gotten ++ Acc). + + +get_full_key_paths(_Pos, _Tree, [], _KeyPathAcc) -> {[], []}; -get_full_key_paths([], KeysToGet, _KeyPathAcc) -> +get_full_key_paths(_Pos, [], KeysToGet, _KeyPathAcc) -> {[], KeysToGet}; -get_full_key_paths([{KeyId, Value, SubTree} | RestTree], KeysToGet, KeyPathAcc) -> - KeysToGet2 = KeysToGet -- [KeyId], +get_full_key_paths(Pos, [{KeyId, Value, SubTree} | RestTree], KeysToGet, KeyPathAcc) -> + KeysToGet2 = KeysToGet -- [{Pos, KeyId}], CurrentNodeResult = case length(KeysToGet2) == length(KeysToGet) of true -> % not in the key list. []; false -> % this node is the key list. return it - [[{KeyId, Value} | KeyPathAcc]] + [{Pos, [{KeyId, Value} | KeyPathAcc]}] end, - {KeysGotten, KeysRemaining} = get_full_key_paths(SubTree, KeysToGet2, [{KeyId, Value} | KeyPathAcc]), - {KeysGotten2, KeysRemaining2} = get_full_key_paths(RestTree, KeysRemaining, KeyPathAcc), + {KeysGotten, KeysRemaining} = get_full_key_paths(Pos + 1, SubTree, KeysToGet2, [{KeyId, Value} | KeyPathAcc]), + {KeysGotten2, KeysRemaining2} = get_full_key_paths(Pos, RestTree, KeysRemaining, KeyPathAcc), {CurrentNodeResult ++ KeysGotten ++ KeysGotten2, KeysRemaining2}. get_all_leafs_full(Tree) -> get_all_leafs_full(Tree, []). -get_all_leafs_full([], _KeyPathAcc) -> +get_all_leafs_full([], Acc) -> + Acc; +get_all_leafs_full([{Pos, Tree} | Rest], Acc) -> + get_all_leafs_full(Rest, get_all_leafs_full_simple(Pos, [Tree], []) ++ Acc). + +get_all_leafs_full_simple(_Pos, [], _KeyPathAcc) -> []; -get_all_leafs_full([{KeyId, Value, []} | RestTree], KeyPathAcc) -> - [[{KeyId, Value} | KeyPathAcc] | get_all_leafs_full(RestTree, KeyPathAcc)]; -get_all_leafs_full([{KeyId, Value, SubTree} | RestTree], KeyPathAcc) -> - get_all_leafs_full(SubTree, [{KeyId, Value} | KeyPathAcc]) ++ get_all_leafs_full(RestTree, KeyPathAcc). +get_all_leafs_full_simple(Pos, [{KeyId, Value, []} | RestTree], KeyPathAcc) -> + [{Pos, [{KeyId, Value} | KeyPathAcc]} | get_all_leafs_full_simple(Pos, RestTree, KeyPathAcc)]; +get_all_leafs_full_simple(Pos, [{KeyId, Value, SubTree} | RestTree], KeyPathAcc) -> + get_all_leafs_full_simple(Pos + 1, SubTree, [{KeyId, Value} | KeyPathAcc]) ++ get_all_leafs_full_simple(Pos, RestTree, KeyPathAcc). -get_all_leafs(Tree) -> - get_all_leafs(Tree, []). +get_all_leafs(Trees) -> + get_all_leafs(Trees, []). + +get_all_leafs([], Acc) -> + Acc; +get_all_leafs([{Pos, Tree}|Rest], Acc) -> + get_all_leafs(Rest, get_all_leafs_simple(Pos, [Tree], []) ++ Acc). -get_all_leafs([], _KeyPathAcc) -> +get_all_leafs_simple(_Pos, [], _KeyPathAcc) -> []; -get_all_leafs([{KeyId, Value, []} | RestTree], KeyPathAcc) -> - [{KeyId, Value, [KeyId | KeyPathAcc]} | get_all_leafs(RestTree, KeyPathAcc)]; -get_all_leafs([{KeyId, _Value, SubTree} | RestTree], KeyPathAcc) -> - get_all_leafs(SubTree, [KeyId | KeyPathAcc]) ++ get_all_leafs(RestTree, KeyPathAcc). +get_all_leafs_simple(Pos, [{KeyId, Value, []} | RestTree], KeyPathAcc) -> + [{Value, {Pos, [KeyId | KeyPathAcc]}} | get_all_leafs_simple(Pos, RestTree, KeyPathAcc)]; +get_all_leafs_simple(Pos, [{KeyId, _Value, SubTree} | RestTree], KeyPathAcc) -> + get_all_leafs_simple(Pos + 1, SubTree, [KeyId | KeyPathAcc]) ++ get_all_leafs_simple(Pos, RestTree, KeyPathAcc). + -get_leaf_keys([]) -> - []; -get_leaf_keys([{Key, _Value, []} | RestTree]) -> - [Key | get_leaf_keys(RestTree)]; -get_leaf_keys([{_Key, _Value, SubTree} | RestTree]) -> - get_leaf_keys(SubTree) ++ get_leaf_keys(RestTree). - count_leafs([]) -> 0; -count_leafs([{_Key, _Value, []} | RestTree]) -> - 1 + count_leafs(RestTree); -count_leafs([{_Key, _Value, SubTree} | RestTree]) -> - count_leafs(SubTree) + count_leafs(RestTree). +count_leafs([{_Pos,Tree}|Rest]) -> + count_leafs_simple([Tree]) + count_leafs(Rest). +count_leafs_simple([]) -> + 0; +count_leafs_simple([{_Key, _Value, []} | RestTree]) -> + 1 + count_leafs_simple(RestTree); +count_leafs_simple([{_Key, _Value, SubTree} | RestTree]) -> + count_leafs_simple(SubTree) + count_leafs_simple(RestTree). + map(_Fun, []) -> []; -map(Fun, [{Key, Value, SubTree} | RestTree]) -> - Value2 = Fun(Key, Value), - [{Key, Value2, map(Fun, SubTree)} | map(Fun, RestTree)]. +map(Fun, [{Pos, Tree}|Rest]) -> + [NewTree] = map_simple(Fun, Pos, [Tree]), + [{Pos, NewTree} | map(Fun, Rest)]. + +map_simple(_Fun, _Pos, []) -> + []; +map_simple(Fun, Pos, [{Key, Value, SubTree} | RestTree]) -> + Value2 = Fun({Pos, Key}, Value), + [{Key, Value2, map_simple(Fun, Pos + 1, SubTree)} | map_simple(Fun, Pos, RestTree)]. + + +stem(Trees, Limit) -> + % flatten each branch in a tree into a tree path + Paths = get_all_leafs_full(Trees), + + Paths2 = [{Pos, lists:sublist(Path, Limit)} || {Pos, Path} <- Paths], + + % convert paths back to trees + lists:foldl( + fun({PathPos, Path},TreeAcc) -> + [SingleTree] = lists:foldl( + fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path), + {NewTrees, _} = merge(TreeAcc, [{PathPos + 1 - length(Path), SingleTree}]), + NewTrees + end, [], Paths2). +test() -> + EmptyTree = [], + One = [{0, {"1","foo",[]}}], + TwoSibs = [{0, {"1","foo",[]}}, + {0, {"2","foo",[]}}], + OneChild = [{0, {"1","foo",[{"1a", "bar", []}]}}], + TwoChild = [{0, {"1","foo", [{"1a", "bar", [{"1aa", "bar", []}]}]}}], + TwoChildSibs = [{0, {"1","foo", [{"1a", "bar", []}, + {"1b", "bar", []}]}}], + Stemmed1a = [{1, {"1a", "bar", [{"1aa", "bar", []}]}}], + Stemmed1aa = [{2, {"1aa", "bar", []}}], + + {EmptyTree, no_conflicts} = merge(EmptyTree, EmptyTree), + {One, no_conflicts} = merge(EmptyTree, One), + {One, no_conflicts} = merge(One, EmptyTree), + {TwoSibs, no_conflicts} = merge(One, TwoSibs), + {One, no_conflicts} = merge(One, One), + {TwoChild, no_conflicts} = merge(TwoChild, TwoChild), + {TwoChildSibs, no_conflicts} = merge(TwoChildSibs, TwoChildSibs), + {TwoChild, no_conflicts} = merge(TwoChild, Stemmed1aa), + {TwoChild, no_conflicts} = merge(TwoChild, Stemmed1a), + {Stemmed1a, no_conflicts} = merge(Stemmed1a, Stemmed1aa), + Expect1 = OneChild ++ Stemmed1aa, + {Expect1, conflicts} = merge(OneChild, Stemmed1aa), + {TwoChild, no_conflicts} = merge(Expect1, TwoChild), + + []=find_missing(TwoChildSibs, [{0,"1"}, {1,"1a"}]), + [{0, "10"}, {100, "x"}]=find_missing(TwoChildSibs, [{0,"1"}, {0, "10"}, {1,"1a"}, {100, "x"}]), + [{0, "1"}, {100, "x"}]=find_missing(Stemmed1a, [{0,"1"}, {1,"1a"}, {100, "x"}]), + [{0, "1"}, {1,"1a"}, {100, "x"}]=find_missing(Stemmed1aa, [{0,"1"}, {1,"1a"}, {100, "x"}]), + + {TwoChildSibs, []} = remove_leafs(TwoChildSibs, []), + {TwoChildSibs, []} = remove_leafs(TwoChildSibs, [{0, "1"}]), + {OneChild, [{1, "1b"}]} = remove_leafs(TwoChildSibs, [{1, "1b"}]), + {[], [{1, "1b"},{1, "1a"}]} = remove_leafs(TwoChildSibs, [{1, "1a"}, {1, "1b"}]), + {Stemmed1a, []} = remove_leafs(Stemmed1a, [{1, "1a"}]), + {[], [{2, "1aa"}]} = remove_leafs(Stemmed1a, [{2, "1aa"}]), + {TwoChildSibs, []} = remove_leafs(TwoChildSibs, []), + + {[],[{0,"x"}]} = get_key_leafs(TwoChildSibs, [{0, "x"}]), + + {[{"bar", {1, ["1a","1"]}}],[]} = get_key_leafs(TwoChildSibs, [{1, "1a"}]), + {[{"bar", {1, ["1a","1"]}},{"bar",{1, ["1b","1"]}}],[]} = get_key_leafs(TwoChildSibs, [{0, "1"}]), + + {[{"foo", {0, ["1"]}}],[]} = get(TwoChildSibs, [{0, "1"}]), + {[{"bar", {1, ["1a", "1"]}}],[]} = get(TwoChildSibs, [{1, "1a"}]), + + {[{0,[{"1", "foo"}]}],[]} = get_full_key_paths(TwoChildSibs, [{0, "1"}]), + {[{1,[{"1a", "bar"},{"1", "foo"}]}],[]} = get_full_key_paths(TwoChildSibs, [{1, "1a"}]), + + [{2, [{"1aa", "bar"},{"1a", "bar"}]}] = get_all_leafs_full(Stemmed1a), + [{1, [{"1a", "bar"},{"1", "foo"}]}, {1, [{"1b", "bar"},{"1", "foo"}]}] = get_all_leafs_full(TwoChildSibs), + + [{"bar", {2, ["1aa","1a"]}}] = get_all_leafs(Stemmed1a), + [{"bar", {1, ["1a", "1"]}}, {"bar", {1, ["1b","1"]}}] = get_all_leafs(TwoChildSibs), + + 0 = count_leafs(EmptyTree), + 1 = count_leafs(One), + 2 = count_leafs(TwoChildSibs), + 1 = count_leafs(Stemmed1a), + + TwoChild = stem(TwoChild, 3), + Stemmed1a = stem(TwoChild, 2), + Stemmed1aa = stem(TwoChild, 1), + ok. + + + + + + + + + + +
\ No newline at end of file diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 89d40be3..3647f6db 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -15,11 +15,11 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([replicate/3]). +-export([replicate/2]). -include_lib("couch_db.hrl"). -%% @spec replicate(Source::binary(), Target::binary(), Options::proplist()) -> +%% @spec replicate(Source::binary(), Target::binary()) -> %% {ok, Stats} | {error, Reason} %% @doc Triggers a replication. Stats is a JSON Object with the following %% keys: session_id (UUID), source_last_seq (integer), and history (array). @@ -30,26 +30,29 @@ %% The supervisor will try to restart the replication in case of any error %% other than shutdown. Just call this function again to listen for the %% result of the retry. -replicate(Source, Target, Options) -> - Id = <<Source/binary, ":", Target/binary>>, - Args = [?MODULE, [Source,Target,Options], []], +replicate(Source, Target) -> - Replicator = {Id, + {ok, HostName} = inet:gethostname(), + RepId = couch_util:to_hex( + erlang:md5(term_to_binary([HostName, Source, Target]))), + Args = [?MODULE, [RepId, Source,Target], []], + + Replicator = {RepId, {gen_server, start_link, Args}, transient, - 10000, + 1, worker, [?MODULE] }, Server = case supervisor:start_child(couch_rep_sup, Replicator) of {ok, Pid} -> - ?LOG_INFO("starting new replication ~p at ~p", [Id, Pid]), + ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]), Pid; {error, already_present} -> - case supervisor:restart_child(couch_rep_sup, Id) of + case supervisor:restart_child(couch_rep_sup, RepId) of {ok, Pid} -> - ?LOG_INFO("starting replication ~p at ~p", [Id, Pid]), + ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]), Pid; {error, running} -> %% this error occurs if multiple replicators are racing @@ -57,16 +60,16 @@ replicate(Source, Target, Options) -> %% the Pid by calling start_child again. {error, {already_started, Pid}} = supervisor:start_child(couch_rep_sup, Replicator), - ?LOG_INFO("replication ~p already running at ~p", [Id, Pid]), + ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]), Pid end; {error, {already_started, Pid}} -> - ?LOG_INFO("replication ~p already running at ~p", [Id, Pid]), + ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]), Pid end, case gen_server:call(Server, get_result, infinity) of - retry -> replicate(Source, Target, Options); + retry -> replicate(Source, Target); Else -> Else end. @@ -79,6 +82,7 @@ replicate(Source, Target, Options) -> headers }). + -record(state, { context, current_seq, @@ -90,18 +94,14 @@ replicate(Source, Target, Options) -> listeners = [] }). -init([Source, Target, Options]) -> + +init([RepId, Source, Target]) -> process_flag(trap_exit, true), - {ok, DbSrc} = - open_db(Source, proplists:get_value(source_options, Options, [])), - {ok, DbTgt} = - open_db(Target, proplists:get_value(target_options, Options, [])), + {ok, DbSrc, SrcName} = open_db(Source), + {ok, DbTgt, TgtName} = open_db(Target), - {ok, Host} = inet:gethostname(), - HostBin = list_to_binary(Host), - DocKey = <<?LOCAL_DOC_PREFIX, HostBin/binary, ":", Source/binary, ":", - Target/binary>>, + DocKey = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), {ok, InfoSrc} = get_db_info(DbSrc), {ok, InfoTgt} = get_db_info(DbTgt), @@ -110,49 +110,49 @@ init([Source, Target, Options]) -> SrcInstanceStartTime = proplists:get_value(instance_start_time, InfoSrc), TgtInstanceStartTime = proplists:get_value(instance_start_time, InfoTgt), - case proplists:get_value(full, Options, false) - orelse proplists:get_value("full", Options, false) of + RepRecDocSrc = + case open_doc(DbSrc, DocKey, []) of + {ok, SrcDoc} -> + ?LOG_DEBUG("Found existing replication record on source", []), + SrcDoc; + _ -> #doc{id=DocKey} + end, + + RepRecDocTgt = + case open_doc(DbTgt, DocKey, []) of + {ok, TgtDoc} -> + ?LOG_DEBUG("Found existing replication record on target", []), + TgtDoc; + _ -> #doc{id=DocKey} + end, + + #doc{body={RepRecProps}} = RepRecDocSrc, + #doc{body={RepRecPropsTgt}} = RepRecDocTgt, + + case proplists:get_value(<<"session_id">>, RepRecProps) == + proplists:get_value(<<"session_id">>, RepRecPropsTgt) of true -> - RepRecSrc = RepRecTgt = #doc{id=DocKey}; + % if the records have the same session id, + % then we have a valid replication history + OldSeqNum = proplists:get_value(<<"source_last_seq">>, RepRecProps, 0), + OldHistory = proplists:get_value(<<"history">>, RepRecProps, []); false -> - RepRecSrc = case open_doc(DbSrc, DocKey, []) of - {ok, SrcDoc} -> - ?LOG_DEBUG("Found existing replication record on source", []), - SrcDoc; - _ -> #doc{id=DocKey} - end, - - RepRecTgt = case open_doc(DbTgt, DocKey, []) of - {ok, TgtDoc} -> - ?LOG_DEBUG("Found existing replication record on target", []), - TgtDoc; - _ -> #doc{id=DocKey} - end - end, - - #doc{body={OldRepHistoryProps}} = RepRecSrc, - #doc{body={OldRepHistoryPropsTrg}} = RepRecTgt, - - SeqNum = case OldRepHistoryProps == OldRepHistoryPropsTrg of - true -> - % if the records are identical, then we have a valid replication history - proplists:get_value(<<"source_last_seq">>, OldRepHistoryProps, 0); - false -> - ?LOG_INFO("Replication records differ. " + ?LOG_INFO("Replication records differ. " "Performing full replication instead of incremental.", []), - ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n", - [OldRepHistoryProps, OldRepHistoryPropsTrg]), - 0 + ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n", + [RepRecProps, RepRecPropsTgt]), + OldSeqNum = 0, + OldHistory = [] end, Context = [ - {start_seq, SeqNum}, - {history, OldRepHistoryProps}, + {start_seq, OldSeqNum}, + {history, OldHistory}, {rep_starttime, ReplicationStartTime}, {src_starttime, SrcInstanceStartTime}, {tgt_starttime, TgtInstanceStartTime}, - {src_record, RepRecSrc}, - {tgt_record, RepRecTgt} + {src_record, RepRecDocSrc}, + {tgt_record, RepRecDocTgt} ], Stats = ets:new(replication_stats, [set, private]), @@ -160,16 +160,17 @@ init([Source, Target, Options]) -> ets:insert(Stats, {missing_revs, 0}), ets:insert(Stats, {docs_read, 0}), ets:insert(Stats, {docs_written, 0}), + ets:insert(Stats, {doc_write_failures, 0}), - couch_task_status:add_task("Replication", <<Source/binary, " -> ", - Target/binary>>, "Starting"), + couch_task_status:add_task("Replication", <<SrcName/binary, " -> ", + TgtName/binary>>, "Starting"), Parent = self(), - Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{SeqNum,0}) end), + Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{OldSeqNum,0}) end), State = #state{ context = Context, - current_seq = SeqNum, + current_seq = OldSeqNum, enum_pid = Pid, source = DbSrc, target = DbTgt, @@ -178,7 +179,6 @@ init([Source, Target, Options]) -> {ok, State}. - handle_call(get_result, From, #state{listeners=L} = State) -> {noreply, State#state{listeners=[From|L]}}; @@ -191,7 +191,7 @@ handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) target = Target, stats = Stats } = State, - + ets:update_counter(Stats, missing_revs, length(Revs)), %% get document(s) @@ -203,8 +203,11 @@ handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) {NewBuffer, NewContext} = case couch_util:should_flush() of true -> Docs2 = lists:flatten([Docs|Buffer]), - ok = update_docs(Target, Docs2, [], false), - ets:update_counter(Stats, docs_written, length(Docs2)), + {ok, Errors} = update_docs(Target, Docs2, [], replicated_changes), + dump_update_errors(Errors), + ets:update_counter(Stats, doc_write_failures, length(Errors)), + ets:update_counter(Stats, docs_written, length(Docs2) - + length(Errors)), {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats), {[], Ctxt}; false -> @@ -255,8 +258,11 @@ terminate(normal, State) -> stats = Stats } = State, - ok = update_docs(Target, lists:flatten(Buffer), [], false), - ets:update_counter(Stats, docs_written, lists:flatlength(Buffer)), + {ok, Errors} = update_docs(Target, lists:flatten(Buffer), [], replicated_changes), + dump_update_errors(Errors), + ets:update_counter(Stats, doc_write_failures, length(Errors)), + ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) - + length(Errors)), couch_task_status:update("Finishing"), @@ -264,9 +270,12 @@ terminate(normal, State) -> ets:delete(Stats), close_db(Target), - %% reply to original requester - [Original|Rest] = Listeners, - gen_server:reply(Original, {ok, NewRepHistory}), + case Listeners of + [Original|Rest] -> + %% reply to original requester + gen_server:reply(Original, {ok, NewRepHistory}); + Rest -> ok + end, %% maybe trigger another replication. If this replicator uses a local %% source Db, changes to that Db since we started will not be included in @@ -304,6 +313,16 @@ code_change(_OldVsn, State, _Extra) -> %% internal functions %%============================================================================= + +% we should probably write these to a special replication log +% or have a callback where the caller decides what to do with replication +% errors. +dump_update_errors([]) -> ok; +dump_update_errors([{{Id, Rev}, Error}|Rest]) -> + ?LOG_INFO("error replicating document \"~s\" rev \"~s\":~p", + [Id, couch_doc:rev_to_str(Rev), Error]), + dump_update_errors(Rest). + attachment_loop(ReqId) -> couch_util:should_flush(), receive @@ -354,6 +373,16 @@ attachment_stub_converter(DbS, Id, {Name, {stub, Type, Length}}) -> end, {Name, {Type, {RcvFun, Length}}}. + +open_db({remote, Url, Headers})-> + {ok, #http_db{uri=?b2l(Url), headers=Headers}, Url}; +open_db({local, DbName, UserCtx})-> + case couch_db:open(DbName, [{user_ctx, UserCtx}]) of + {ok, Db} -> {ok, Db, DbName}; + Error -> Error + end. + + close_db(#http_db{})-> ok; close_db(Db)-> @@ -362,27 +391,38 @@ close_db(Db)-> do_checkpoint(Source, Target, Context, NewSeqNum, Stats) -> ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]), [ - {start_seq, SeqNum}, - {history, OldRepHistoryProps}, + {start_seq, StartSeqNum}, + {history, OldHistory}, {rep_starttime, ReplicationStartTime}, {src_starttime, SrcInstanceStartTime}, {tgt_starttime, TgtInstanceStartTime}, - {src_record, RepRecSrc}, - {tgt_record, RepRecTgt} + {src_record, #doc{body={LastRepRecord}}=RepRecDocSrc}, + {tgt_record, RepRecDocTgt} ] = Context, - NewHistory = case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of + case NewSeqNum == StartSeqNum andalso OldHistory /= [] of true -> % nothing changed, don't record results - {OldRepHistoryProps}; + {ok, {[{<<"no_changes">>, true} | LastRepRecord]}, Context}; false -> + % something changed, record results for incremental replication, + % commit changes to both src and tgt. The src because if changes - % we replicated are lost, we'll record the a seq number of ahead - % of what was committed and therefore lose future changes with the - % same seq nums. - {ok, SrcInstanceStartTime2} = ensure_full_commit(Source), + % we replicated are lost, we'll record the a seq number ahead + % of what was committed. If those changes are lost and the seq number + % reverts to a previous committed value, we will skip future changes + % when new doc updates are given our already replicated seq nums. + + % commit the src async + ParentPid = self(), + SrcCommitPid = spawn_link(fun() -> + ParentPid ! {self(), ensure_full_commit(Source)} end), + + % commit tgt sync {ok, TgtInstanceStartTime2} = ensure_full_commit(Target), + receive {SrcCommitPid, {ok, SrcInstanceStartTime2}} -> ok end, + RecordSeqNum = if SrcInstanceStartTime2 == SrcInstanceStartTime andalso TgtInstanceStartTime2 == TgtInstanceStartTime -> @@ -391,60 +431,57 @@ do_checkpoint(Source, Target, Context, NewSeqNum, Stats) -> ?LOG_INFO("A server has restarted sinced replication start. " "Not recording the new sequence number to ensure the " "replication is redone and documents reexamined.", []), - SeqNum + StartSeqNum end, - %% format replication history - JsonStats = [ + NewHistoryEntry = { + [{<<"start_time">>, list_to_binary(ReplicationStartTime)}, + {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, + {<<"start_last_seq">>, StartSeqNum}, + {<<"end_last_seq">>, NewSeqNum}, {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)}, {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)}, {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, - {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)} + {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, + {<<"doc_write_failures">>, ets:lookup_element(Stats, doc_write_failures, 2)} + ]}, + % limit history to 50 entries + HistEntries =lists:sublist([NewHistoryEntry | OldHistory], 50), + + NewRepHistory = + {[{<<"session_id">>, couch_util:new_uuid()}, + {<<"source_last_seq">>, RecordSeqNum}, + {<<"history">>, HistEntries}]}, + + {ok, {SrcRevPos,SrcRevId}} = update_doc(Source, + RepRecDocSrc#doc{body=NewRepHistory}, []), + {ok, {TgtRevPos,TgtRevId}} = update_doc(Target, + RepRecDocTgt#doc{body=NewRepHistory}, []), + + NewContext = [ + {start_seq, StartSeqNum}, + {history, OldHistory}, + {rep_starttime, ReplicationStartTime}, + {src_starttime, SrcInstanceStartTime}, + {tgt_starttime, TgtInstanceStartTime}, + {src_record, RepRecDocSrc#doc{revs={SrcRevPos,[SrcRevId]}}}, + {tgt_record, RepRecDocTgt#doc{revs={TgtRevPos,[TgtRevId]}}} ], - - HistEntries =[ - { - [{<<"start_time">>, list_to_binary(ReplicationStartTime)}, - {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, - {<<"start_last_seq">>, SeqNum}, - {<<"end_last_seq">>, NewSeqNum} | JsonStats]} - | proplists:get_value(<<"history">>, OldRepHistoryProps, [])], - % something changed, record results - {[ - {<<"session_id">>, couch_util:new_uuid()}, - {<<"source_last_seq">>, RecordSeqNum}, - {<<"history">>, lists:sublist(HistEntries, 50)} - ]} - end, - %% update local documents - RepRecSrc = proplists:get_value(src_record, Context), - RepRecTgt = proplists:get_value(tgt_record, Context), - {ok, TgtRev} = update_local_doc(Target, RepRecTgt#doc{body=NewHistory}, []), - {ok, SrcRev} = update_local_doc(Source, RepRecSrc#doc{body=NewHistory}, []), + {ok, NewRepHistory, NewContext} - NewContext = [ - {start_seq, SeqNum}, - {history, OldRepHistoryProps}, - {rep_starttime, ReplicationStartTime}, - {src_starttime, SrcInstanceStartTime}, - {tgt_starttime, TgtInstanceStartTime}, - {src_record, RepRecSrc#doc{revs=[SrcRev]}}, - {tgt_record, RepRecTgt#doc{revs=[TgtRev]}} - ], - - {ok, NewHistory, NewContext}. + end. do_http_request(Url, Action, Headers) -> do_http_request(Url, Action, Headers, []). do_http_request(Url, Action, Headers, JsonBody) -> - do_http_request(?b2l(?l2b(Url)), Action, Headers, JsonBody, 10). + do_http_request(Url, Action, Headers, JsonBody, 10). do_http_request(Url, Action, _Headers, _JsonBody, 0) -> ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s", [Action, Url]), - exit({http_request_failed, ?l2b(Url)}); + exit({http_request_failed, Url}); do_http_request(Url, Action, Headers, JsonBody, Retries) -> ?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]), Body = @@ -498,7 +535,6 @@ enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) -> [] -> gen_server:call(Pid, {fin, {StartSeq, RevsCount}}, infinity); DocInfoList -> - % UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList], SrcRevsList = lists:map(fun(SrcDocInfo) -> #doc_info{id=Id, rev=Rev, @@ -521,13 +557,8 @@ enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) -> enum_docs_since(Pid, DbSource, DbTarget, {LastSeq, RevsCount2}) end. -fix_url(UrlBin) -> - Url = binary_to_list(UrlBin), - case lists:last(Url) of - $/ -> Url; - _ -> Url ++ "/" - end. + get_db_info(#http_db{uri=DbUrl, headers=Headers}) -> {DbProps} = do_http_request(DbUrl, get, Headers), {ok, [{list_to_existing_atom(?b2l(K)), V} || {K,V} <- DbProps]}; @@ -542,12 +573,12 @@ get_doc_info_list(#http_db{uri=DbUrl, headers=Headers}, StartSeq) -> {RowValueProps} = proplists:get_value(<<"value">>, RowInfoList), #doc_info{ id=proplists:get_value(<<"id">>, RowInfoList), - rev=proplists:get_value(<<"rev">>, RowValueProps), + rev=couch_doc:parse_rev(proplists:get_value(<<"rev">>, RowValueProps)), update_seq = proplists:get_value(<<"key">>, RowInfoList), conflict_revs = - proplists:get_value(<<"conflicts">>, RowValueProps, []), + couch_doc:parse_revs(proplists:get_value(<<"conflicts">>, RowValueProps, [])), deleted_conflict_revs = - proplists:get_value(<<"deleted_conflicts">>, RowValueProps, []), + couch_doc:parse_revs(proplists:get_value(<<"deleted_conflicts">>, RowValueProps, [])), deleted = proplists:get_value(<<"deleted">>, RowValueProps, false) } end, proplists:get_value(<<"rows">>, Results)); @@ -561,25 +592,18 @@ get_doc_info_list(DbSource, StartSeq) -> lists:reverse(DocInfoList). get_missing_revs(#http_db{uri=DbUrl, headers=Headers}, DocIdRevsList) -> + DocIdRevsList2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- DocIdRevsList], {ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers, - {DocIdRevsList}), - {MissingRevs} = proplists:get_value(<<"missing_revs">>, ResponseMembers), - {ok, MissingRevs}; + {DocIdRevsList2}), + {DocMissingRevsList} = proplists:get_value(<<"missing_revs">>, ResponseMembers), + DocMissingRevsList2 = [{Id, couch_doc:parse_revs(MissingRevStrs)} || {Id, MissingRevStrs} <- DocMissingRevsList], + {ok, DocMissingRevsList2}; get_missing_revs(Db, DocId) -> couch_db:get_missing_revs(Db, DocId). -open_http_db(UrlBin, Options) -> - Headers = proplists:get_value(headers, Options, {[]}), - {ok, #http_db{uri=fix_url(UrlBin), headers=Headers}}. - -open_db(<<"http://", _/binary>>=Url, Options)-> - open_http_db(Url, Options); -open_db(<<"https://", _/binary>>=Url, Options)-> - open_http_db(Url, Options); -open_db(DbName, Options)-> - couch_db:open(DbName, Options). - -open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, []) -> + +open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, Options) -> + [] = Options, case do_http_request(DbUrl ++ url_encode(DocId), get, Headers) of {[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} -> {couch_util:to_existing_atom(ErrId), Reason}; @@ -589,7 +613,9 @@ open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, []) -> open_doc(Db, DocId, Options) -> couch_db:open_doc(Db, DocId, Options). -open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs, _Opts) -> +open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs0, + [latest]) -> + Revs = couch_doc:rev_to_strs(Revs0), BaseUrl = DbUrl ++ url_encode(DocId) ++ "?revs=true&latest=true", %% MochiWeb expects URLs < 8KB long, so maybe split into multiple requests @@ -612,39 +638,52 @@ open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs, _Opts) -> lists:flatten(?JSON_ENCODE(lists:reverse(Rest))), get, Headers) end, - Results = - lists:map(fun({[{<<"missing">>, Rev}]}) -> - {{not_found, missing}, Rev}; - ({[{<<"ok">>, JsonDoc}]}) -> + Results = + lists:map( + fun({[{<<"missing">>, Rev}]}) -> + {{not_found, missing}, couch_doc:parse_rev(Rev)}; + ({[{<<"ok">>, JsonDoc}]}) -> #doc{id=Id, attachments=Attach} = Doc = couch_doc:from_json_obj(JsonDoc), Attach2 = [attachment_stub_converter(DbS,Id,A) || A <- Attach], {ok, Doc#doc{attachments=Attach2}} - end, JsonResults), + end, JsonResults), {ok, Results}; open_doc_revs(Db, DocId, Revs, Options) -> couch_db:open_doc_revs(Db, DocId, Revs, Options). -update_docs(_, [], _, _) -> - ok; -update_docs(#http_db{uri=DbUrl, headers=Headers}, Docs, [], NewEdits) -> - JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs], - {Returned} = - do_http_request(DbUrl ++ "_bulk_docs", post, Headers, - {[{new_edits, NewEdits}, {docs, JsonDocs}]}), - true = proplists:get_value(<<"ok">>, Returned), - ok; -update_docs(Db, Docs, Options, NewEdits) -> - couch_db:update_docs(Db, Docs, Options, NewEdits). -update_local_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, []) -> +update_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, Options) -> + [] = Options, Url = DbUrl ++ url_encode(DocId), {ResponseMembers} = do_http_request(Url, put, Headers, - couch_doc:to_json_obj(Doc, [revs,attachments])), - RevId = proplists:get_value(<<"rev">>, ResponseMembers), - {ok, RevId}; -update_local_doc(Db, Doc, Options) -> + couch_doc:to_json_obj(Doc, [attachments])), + Rev = proplists:get_value(<<"rev">>, ResponseMembers), + {ok, couch_doc:parse_rev(Rev)}; +update_doc(Db, Doc, Options) -> couch_db:update_doc(Db, Doc, Options). +update_docs(_, [], _, _) -> + {ok, []}; +update_docs(#http_db{uri=DbUrl, headers=Headers}, Docs, [], replicated_changes) -> + JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs], + ErrorsJson = + do_http_request(DbUrl ++ "_bulk_docs", post, Headers, + {[{new_edits, false}, {docs, JsonDocs}]}), + ErrorsList = + lists:map( + fun({Props}) -> + Id = proplists:get_value(<<"id">>, Props), + Rev = couch_doc:parse_rev(proplists:get_value(<<"rev">>, Props)), + ErrId = couch_util:to_existing_atom( + proplists:get_value(<<"error">>, Props)), + Reason = proplists:get_value(<<"reason">>, Props), + Error = {ErrId, Reason}, + {{Id, Rev}, Error} + end, ErrorsJson), + {ok, ErrorsList}; +update_docs(Db, Docs, Options, UpdateType) -> + couch_db:update_docs(Db, Docs, Options, UpdateType). + up_to_date(#http_db{}, _Seq) -> true; up_to_date(Source, Seq) -> diff --git a/src/couchdb/couch_util.erl b/src/couchdb/couch_util.erl index 21d6eb4c..bd377d80 100644 --- a/src/couchdb/couch_util.erl +++ b/src/couchdb/couch_util.erl @@ -13,7 +13,7 @@ -module(couch_util). -export([start_driver/1]). --export([should_flush/0, should_flush/1, to_existing_atom/1]). +-export([should_flush/0, should_flush/1, to_existing_atom/1, to_binary/1]). -export([new_uuid/0, rand32/0, implode/2, collate/2, collate/3]). -export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1]). -export([encodeBase64/1, decodeBase64/1, to_hex/1,parse_term/1,dict_find/3]). @@ -57,6 +57,19 @@ to_digit(N) when N < 10 -> $0 + N; to_digit(N) -> $a + N-10. +to_binary(V) when is_binary(V) -> + V; +to_binary(V) when is_list(V) -> + try list_to_binary(V) + catch + _ -> list_to_binary(io_lib:format("~p", [V])) + end; +to_binary(V) when is_atom(V) -> + list_to_binary(atom_to_list(V)); +to_binary(V) -> + list_to_binary(io_lib:format("~p", [V])). + + parse_term(Bin) when is_binary(Bin)-> parse_term(binary_to_list(Bin)); parse_term(List) -> |