diff options
author | Robert Newson <robert.newson@cloudant.com> | 2011-06-22 15:10:13 -0400 |
---|---|---|
committer | Robert Newson <robert.newson@cloudant.com> | 2011-06-22 15:10:13 -0400 |
commit | e2e9e07398c9697a13f6db36ed9d0403fa6c89f1 (patch) | |
tree | 39faca2adc29ad57c0084c942f1c2eb6ba08774d /apps/couch/src | |
parent | ca738083b0a5053a1402b8eac55d6228b6e952b2 (diff) | |
parent | d8dc16093a26c53407d8bf702698848104ba01d6 (diff) |
Merge remote-tracking branch 'upstream/1.1.x' into 0.4.x
Conflicts:
acinclude.m4.in
apps/couch/src/couch_app.erl
apps/couch/src/couch_doc.erl
apps/couch/src/couch_view.erl
configure.ac
share/www/script/test/replicator_db.js
src/erlang-oauth/Makefile.am
test/etap/Makefile.am
Diffstat (limited to 'apps/couch/src')
-rw-r--r-- | apps/couch/src/couch_doc.erl | 20 | ||||
-rw-r--r-- | apps/couch/src/couch_httpd_db.erl | 14 | ||||
-rw-r--r-- | apps/couch/src/couch_key_tree.erl | 48 | ||||
-rw-r--r-- | apps/couch/src/couch_rep.erl | 45 | ||||
-rw-r--r-- | apps/couch/src/couch_replication_manager.erl | 41 | ||||
-rw-r--r-- | apps/couch/src/couch_view.erl | 4 | ||||
-rw-r--r-- | apps/couch/src/couch_view_compactor.erl | 6 |
7 files changed, 154 insertions, 24 deletions
diff --git a/apps/couch/src/couch_doc.erl b/apps/couch/src/couch_doc.erl index c7b9dbb9..33d7e3cf 100644 --- a/apps/couch/src/couch_doc.erl +++ b/apps/couch/src/couch_doc.erl @@ -18,6 +18,7 @@ -export([validate_docid/1]). -export([doc_from_multi_part_stream/2]). -export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]). +-export([abort_multi_part_stream/1]). -include("couch_db.hrl"). @@ -499,7 +500,7 @@ doc_from_multi_part_stream(ContentType, DataFun) -> receive {Parser, finished} -> ok end, erlang:put(mochiweb_request_recv, true) end, - {ok, Doc#doc{atts=Atts2}, WaitFun} + {ok, Doc#doc{atts=Atts2}, WaitFun, Parser} end. mp_parse_doc({headers, H}, []) -> @@ -590,3 +591,20 @@ maybe_send_data({ChunkList, Offset, Counters, Waiting}) -> end end end. + +abort_multi_part_stream(Parser) -> + abort_multi_part_stream(Parser, erlang:monitor(process, Parser)). + +abort_multi_part_stream(Parser, MonRef) -> + case is_process_alive(Parser) of + true -> + Parser ! {get_bytes, self()}, + receive + {bytes, _Bytes} -> + abort_multi_part_stream(Parser, MonRef); + {'DOWN', MonRef, _, _, _} -> + ok + end; + false -> + erlang:demonitor(MonRef, [flush]) + end. diff --git a/apps/couch/src/couch_httpd_db.erl b/apps/couch/src/couch_httpd_db.erl index e3638b25..71204598 100644 --- a/apps/couch/src/couch_httpd_db.erl +++ b/apps/couch/src/couch_httpd_db.erl @@ -692,12 +692,18 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) -> RespHeaders = [{"Location", Loc}], case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of ("multipart/related;" ++ _) = ContentType -> - {ok, Doc0, WaitFun} = couch_doc:doc_from_multi_part_stream( + {ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream( ContentType, fun() -> receive_request_data(Req) end), Doc = couch_doc_from_req(Req, DocId, Doc0), - Result = update_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType), - WaitFun(), - Result; + try + Result = update_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType), + WaitFun(), + Result + catch throw:Err -> + % Document rejected by a validate_doc_update function. + couch_doc:abort_multi_part_stream(Parser), + throw(Err) + end; _Else -> case couch_httpd:qs_value(Req, "batch") of "ok" -> diff --git a/apps/couch/src/couch_key_tree.erl b/apps/couch/src/couch_key_tree.erl index c5fd1088..2f3c6abf 100644 --- a/apps/couch/src/couch_key_tree.erl +++ b/apps/couch/src/couch_key_tree.erl @@ -10,6 +10,41 @@ % License for the specific language governing permissions and limitations under % the License. +%% @doc Data structure used to represent document edit histories. + +%% A key tree is used to represent the edit history of a document. Each node of +%% the tree represents a particular version. Relations between nodes represent +%% the order that these edits were applied. For instance, a set of three edits +%% would produce a tree of versions A->B->C indicating that edit C was based on +%% version B which was in turn based on A. In a world without replication (and +%% no ability to disable MVCC checks), all histories would be forced to be +%% linear lists of edits due to constraints imposed by MVCC (ie, new edits must +%% be based on the current version). However, we have replication, so we must +%% deal with not so easy cases, which lead to trees. +%% +%% Consider a document in state A. This doc is replicated to a second node. We +%% then edit the document on each node leaving it in two different states, B +%% and C. We now have two key trees, A->B and A->C. When we go to replicate a +%% second time, the key tree must combine these two trees which gives us +%% A->(B|C). This is how conflicts are introduced. In terms of the key tree, we +%% say that we have two leaves (B and C) that are not deleted. The presense of +%% the multiple leaves indicate conflict. To remove a conflict, one of the +%% edits (B or C) can be deleted, which results in, A->(B|C->D) where D is an +%% edit that is specially marked with the a deleted=true flag. +%% +%% What makes this a bit more complicated is that there is a limit to the +%% number of revisions kept, specified in couch_db.hrl (default is 1000). When +%% this limit is exceeded only the last 1000 are kept. This comes in to play +%% when branches are merged. The comparison has to begin at the same place in +%% the branches. A revision id is of the form N-XXXXXXX where N is the current +%% revision. So each path will have a start number, calculated in +%% couch_doc:to_path using the formula N - length(RevIds) + 1 So, .eg. if a doc +%% was edit 1003 times this start number would be 4, indicating that 3 +%% revisions were truncated. +%% +%% This comes into play in @see merge_at/3 which recursively walks down one +%% tree or the other until they begin at the same revision. + -module(couch_key_tree). -export([merge/3, find_missing/2, get_key_leafs/2, @@ -32,6 +67,9 @@ merge(Paths, Path, Depth) -> {Merged, Conflicts} = merge(Paths, Path), {stem(Merged, Depth), Conflicts}. +%% @doc Merge a path with an existing list of paths, returning a new list of +%% paths. A return of conflicts indicates a new conflict was discovered in this +%% merge. Conflicts may already exist in the original list of paths. -spec merge([path()], path()) -> {[path()], conflicts | no_conflicts}. merge(Paths, Path) -> {ok, Merged, HasConflicts} = merge_one(Paths, Path, [], false), @@ -70,6 +108,7 @@ merge_at([{Key, Value, SubTree}|Sibs], Place, InsertTree) when Place > 0 -> {ok, Merged, Conflicts} -> {ok, [{Key, Value, Merged} | Sibs], Conflicts}; no -> + % first branch didn't merge, move to next branch case merge_at(Sibs, Place, InsertTree) of {ok, Merged, Conflicts} -> {ok, [{Key, Value, SubTree} | Merged], Conflicts}; @@ -112,11 +151,12 @@ merge_simple([{Key, V1, SubA} | NextA], [{Key, V2, SubB} | NextB]) -> Value = value_pref(V1, V2), {[{Key, Value, MergedSubTree} | MergedNextTree], Conflict1 or Conflict2}; merge_simple([{A, _, _} = Tree | Next], [{B, _, _} | _] = Insert) when A < B -> - {Merged, _} = merge_simple(Next, Insert), - {[Tree | Merged], true}; + {Merged, Conflict} = merge_simple(Next, Insert), + % if Merged has more branches than the input we added a new conflict + {[Tree | Merged], Conflict orelse (length(Merged) > length(Next))}; merge_simple(Ours, [Tree | Next]) -> - {Merged, _} = merge_simple(Ours, Next), - {[Tree | Merged], true}. + {Merged, Conflict} = merge_simple(Ours, Next), + {[Tree | Merged], Conflict orelse (length(Merged) > length(Next))}. find_missing(_Tree, []) -> []; diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl index 482b84dc..966d6872 100644 --- a/apps/couch/src/couch_rep.erl +++ b/apps/couch/src/couch_rep.erl @@ -486,7 +486,17 @@ make_replication_id(RepProps, UserCtx) -> % add a new clause and increase ?REP_ID_VERSION at the top make_replication_id({Props}, UserCtx, 2) -> {ok, HostName} = inet:gethostname(), - Port = mochiweb_socket_server:get(couch_httpd, port), + Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of + P when is_number(P) -> + P; + _ -> + % On restart we might be called before the couch_httpd process is + % started. + % TODO: we might be under an SSL socket server only, or both under + % SSL and a non-SSL socket. + % ... mochiweb_socket_server:get(https, port) + list_to_integer(couch_config:get("httpd", "port", "5984")) + end, Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)), Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)), maybe_append_filters({Props}, [HostName, Port, Src, Tgt], UserCtx); @@ -513,16 +523,37 @@ maybe_append_filters({Props}, Base, UserCtx) -> couch_util:to_hex(couch_util:md5(term_to_binary(Base2))). filter_code(Filter, Props, UserCtx) -> - {match, [DDocName, FilterName]} = - re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]), + {DDocName, FilterName} = + case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of + {match, [DDocName0, FilterName0]} -> + {DDocName0, FilterName0}; + _ -> + throw({error, <<"Invalid filter. Must match `ddocname/filtername`.">>}) + end, ProxyParams = parse_proxy_params( couch_util:get_value(<<"proxy">>, Props, [])), - Source = open_db( - couch_util:get_value(<<"source">>, Props), UserCtx, ProxyParams), + DbName = couch_util:get_value(<<"source">>, Props), + Source = try + open_db(DbName, UserCtx, ProxyParams) + catch + _Tag:DbError -> + DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s", + [couch_util:url_strip_password(DbName), couch_util:to_binary(DbError)]), + throw({error, iolist_to_binary(DbErrorMsg)}) + end, try - {ok, DDoc} = open_doc(Source, <<"_design/", DDocName/binary>>), + Body = case (catch open_doc(Source, <<"_design/", DDocName/binary>>)) of + {ok, #doc{body = Body0}} -> + Body0; + DocError -> + DocErrorMsg = io_lib:format( + "Couldn't open document `_design/~s` from source " + "database `~s`: ~s", + [dbname(Source), DDocName, couch_util:to_binary(DocError)]), + throw({error, iolist_to_binary(DocErrorMsg)}) + end, Code = couch_util:get_nested_json_value( - DDoc#doc.body, [<<"filters">>, FilterName]), + Body, [<<"filters">>, FilterName]), re:replace(Code, "^\s*(.*?)\s*$", "\\1", [{return, binary}]) after close_db(Source) diff --git a/apps/couch/src/couch_replication_manager.erl b/apps/couch/src/couch_replication_manager.erl index 6537c8b2..e3d97c37 100644 --- a/apps/couch/src/couch_replication_manager.erl +++ b/apps/couch/src/couch_replication_manager.erl @@ -32,7 +32,8 @@ -import(couch_util, [ get_value/2, - get_value/3 + get_value/3, + to_binary/1 ]). @@ -62,8 +63,16 @@ init(_) -> }}. -handle_call({rep_db_update, Change}, _From, State) -> - {reply, ok, process_update(State, Change)}; +handle_call({rep_db_update, {ChangeProps} = Change}, _From, State) -> + NewState = try + process_update(State, Change) + catch + _Tag:Error -> + JsonRepDoc = get_value(doc, ChangeProps), + rep_db_update_error(Error, JsonRepDoc), + State + end, + {reply, ok, NewState}; handle_call({triggered, {BaseId, _}}, _From, State) -> [{BaseId, {DocId, true}}] = ets:lookup(?REP_ID_TO_DOC_ID, BaseId), @@ -250,6 +259,19 @@ process_update(State, {Change}) -> end. +rep_db_update_error(Error, {Props} = JsonRepDoc) -> + case Error of + {bad_rep_doc, Reason} -> + ok; + _ -> + Reason = to_binary(Error) + end, + ?LOG_ERROR("Replication manager, error processing document `~s`: ~s", + [get_value(<<"_id">>, Props), Reason]), + couch_rep:update_rep_doc( + JsonRepDoc, [{<<"_replication_state">>, <<"error">>}]). + + rep_user_ctx({RepDoc}) -> case get_value(<<"user_ctx">>, RepDoc) of undefined -> @@ -265,7 +287,7 @@ rep_user_ctx({RepDoc}) -> maybe_start_replication(#state{max_retries = MaxRetries} = State, DocId, JsonRepDoc) -> UserCtx = rep_user_ctx(JsonRepDoc), - {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx), + {BaseId, _} = RepId = make_rep_id(JsonRepDoc, UserCtx), case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of [] -> true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}), @@ -290,6 +312,17 @@ maybe_start_replication(#state{max_retries = MaxRetries} = State, end. +make_rep_id(RepDoc, UserCtx) -> + try + couch_rep:make_replication_id(RepDoc, UserCtx) + catch + throw:{error, Reason} -> + throw({bad_rep_doc, Reason}); + Tag:Err -> + throw({bad_rep_doc, to_binary({Tag, Err})}) + end. + + maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) -> case get_value(<<"_replication_id">>, Props) of RepId -> diff --git a/apps/couch/src/couch_view.erl b/apps/couch/src/couch_view.erl index 2d84b0f9..05174245 100644 --- a/apps/couch/src/couch_view.erl +++ b/apps/couch/src/couch_view.erl @@ -97,8 +97,8 @@ cleanup_index_files(Db) -> if length(Sigs) =:= 0 -> FileList; true -> - % regex that matches all ddocs - RegExp = "("++ string:join(Sigs, "|") ++")", + % regex that matches all ddocs + RegExp = "("++ string:join(Sigs, "|") ++")", % filter out the ones in use [FilePath || FilePath <- FileList, diff --git a/apps/couch/src/couch_view_compactor.erl b/apps/couch/src/couch_view_compactor.erl index 38f63f66..69aaff00 100644 --- a/apps/couch/src/couch_view_compactor.erl +++ b/apps/couch/src/couch_view_compactor.erl @@ -55,8 +55,10 @@ compact_group(Group, EmptyGroup) -> Fun = fun({DocId, _ViewIdKeys} = KV, {Bt, Acc, TotalCopied, LastId}) -> if DocId =:= LastId -> % COUCHDB-999 - Msg = "Duplicates of ~s detected in ~s ~s - rebuild required", - exit(io_lib:format(Msg, [DocId, DbName, GroupId])); + ?LOG_ERROR("Duplicates of document `~s` detected in view group `~s`" + ", database `~s` - view rebuild, from scratch, is required", + [DocId, GroupId, DbName]), + exit({view_duplicated_id, DocId}); true -> ok end, if TotalCopied rem 10000 =:= 0 -> couch_task_status:update("Copied ~p of ~p Ids (~p%)", |