summaryrefslogtreecommitdiff
path: root/apps/couch/src
diff options
context:
space:
mode:
authorRobert Newson <robert.newson@cloudant.com>2011-06-22 15:10:13 -0400
committerRobert Newson <robert.newson@cloudant.com>2011-06-22 15:10:13 -0400
commite2e9e07398c9697a13f6db36ed9d0403fa6c89f1 (patch)
tree39faca2adc29ad57c0084c942f1c2eb6ba08774d /apps/couch/src
parentca738083b0a5053a1402b8eac55d6228b6e952b2 (diff)
parentd8dc16093a26c53407d8bf702698848104ba01d6 (diff)
Merge remote-tracking branch 'upstream/1.1.x' into 0.4.x
Conflicts: acinclude.m4.in apps/couch/src/couch_app.erl apps/couch/src/couch_doc.erl apps/couch/src/couch_view.erl configure.ac share/www/script/test/replicator_db.js src/erlang-oauth/Makefile.am test/etap/Makefile.am
Diffstat (limited to 'apps/couch/src')
-rw-r--r--apps/couch/src/couch_doc.erl20
-rw-r--r--apps/couch/src/couch_httpd_db.erl14
-rw-r--r--apps/couch/src/couch_key_tree.erl48
-rw-r--r--apps/couch/src/couch_rep.erl45
-rw-r--r--apps/couch/src/couch_replication_manager.erl41
-rw-r--r--apps/couch/src/couch_view.erl4
-rw-r--r--apps/couch/src/couch_view_compactor.erl6
7 files changed, 154 insertions, 24 deletions
diff --git a/apps/couch/src/couch_doc.erl b/apps/couch/src/couch_doc.erl
index c7b9dbb9..33d7e3cf 100644
--- a/apps/couch/src/couch_doc.erl
+++ b/apps/couch/src/couch_doc.erl
@@ -18,6 +18,7 @@
-export([validate_docid/1]).
-export([doc_from_multi_part_stream/2]).
-export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
+-export([abort_multi_part_stream/1]).
-include("couch_db.hrl").
@@ -499,7 +500,7 @@ doc_from_multi_part_stream(ContentType, DataFun) ->
receive {Parser, finished} -> ok end,
erlang:put(mochiweb_request_recv, true)
end,
- {ok, Doc#doc{atts=Atts2}, WaitFun}
+ {ok, Doc#doc{atts=Atts2}, WaitFun, Parser}
end.
mp_parse_doc({headers, H}, []) ->
@@ -590,3 +591,20 @@ maybe_send_data({ChunkList, Offset, Counters, Waiting}) ->
end
end
end.
+
+abort_multi_part_stream(Parser) ->
+ abort_multi_part_stream(Parser, erlang:monitor(process, Parser)).
+
+abort_multi_part_stream(Parser, MonRef) ->
+ case is_process_alive(Parser) of
+ true ->
+ Parser ! {get_bytes, self()},
+ receive
+ {bytes, _Bytes} ->
+ abort_multi_part_stream(Parser, MonRef);
+ {'DOWN', MonRef, _, _, _} ->
+ ok
+ end;
+ false ->
+ erlang:demonitor(MonRef, [flush])
+ end.
diff --git a/apps/couch/src/couch_httpd_db.erl b/apps/couch/src/couch_httpd_db.erl
index e3638b25..71204598 100644
--- a/apps/couch/src/couch_httpd_db.erl
+++ b/apps/couch/src/couch_httpd_db.erl
@@ -692,12 +692,18 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
RespHeaders = [{"Location", Loc}],
case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of
("multipart/related;" ++ _) = ContentType ->
- {ok, Doc0, WaitFun} = couch_doc:doc_from_multi_part_stream(
+ {ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream(
ContentType, fun() -> receive_request_data(Req) end),
Doc = couch_doc_from_req(Req, DocId, Doc0),
- Result = update_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType),
- WaitFun(),
- Result;
+ try
+ Result = update_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType),
+ WaitFun(),
+ Result
+ catch throw:Err ->
+ % Document rejected by a validate_doc_update function.
+ couch_doc:abort_multi_part_stream(Parser),
+ throw(Err)
+ end;
_Else ->
case couch_httpd:qs_value(Req, "batch") of
"ok" ->
diff --git a/apps/couch/src/couch_key_tree.erl b/apps/couch/src/couch_key_tree.erl
index c5fd1088..2f3c6abf 100644
--- a/apps/couch/src/couch_key_tree.erl
+++ b/apps/couch/src/couch_key_tree.erl
@@ -10,6 +10,41 @@
% License for the specific language governing permissions and limitations under
% the License.
+%% @doc Data structure used to represent document edit histories.
+
+%% A key tree is used to represent the edit history of a document. Each node of
+%% the tree represents a particular version. Relations between nodes represent
+%% the order that these edits were applied. For instance, a set of three edits
+%% would produce a tree of versions A->B->C indicating that edit C was based on
+%% version B which was in turn based on A. In a world without replication (and
+%% no ability to disable MVCC checks), all histories would be forced to be
+%% linear lists of edits due to constraints imposed by MVCC (ie, new edits must
+%% be based on the current version). However, we have replication, so we must
+%% deal with not so easy cases, which lead to trees.
+%%
+%% Consider a document in state A. This doc is replicated to a second node. We
+%% then edit the document on each node leaving it in two different states, B
+%% and C. We now have two key trees, A->B and A->C. When we go to replicate a
+%% second time, the key tree must combine these two trees which gives us
+%% A->(B|C). This is how conflicts are introduced. In terms of the key tree, we
+%% say that we have two leaves (B and C) that are not deleted. The presense of
+%% the multiple leaves indicate conflict. To remove a conflict, one of the
+%% edits (B or C) can be deleted, which results in, A->(B|C->D) where D is an
+%% edit that is specially marked with the a deleted=true flag.
+%%
+%% What makes this a bit more complicated is that there is a limit to the
+%% number of revisions kept, specified in couch_db.hrl (default is 1000). When
+%% this limit is exceeded only the last 1000 are kept. This comes in to play
+%% when branches are merged. The comparison has to begin at the same place in
+%% the branches. A revision id is of the form N-XXXXXXX where N is the current
+%% revision. So each path will have a start number, calculated in
+%% couch_doc:to_path using the formula N - length(RevIds) + 1 So, .eg. if a doc
+%% was edit 1003 times this start number would be 4, indicating that 3
+%% revisions were truncated.
+%%
+%% This comes into play in @see merge_at/3 which recursively walks down one
+%% tree or the other until they begin at the same revision.
+
-module(couch_key_tree).
-export([merge/3, find_missing/2, get_key_leafs/2,
@@ -32,6 +67,9 @@ merge(Paths, Path, Depth) ->
{Merged, Conflicts} = merge(Paths, Path),
{stem(Merged, Depth), Conflicts}.
+%% @doc Merge a path with an existing list of paths, returning a new list of
+%% paths. A return of conflicts indicates a new conflict was discovered in this
+%% merge. Conflicts may already exist in the original list of paths.
-spec merge([path()], path()) -> {[path()], conflicts | no_conflicts}.
merge(Paths, Path) ->
{ok, Merged, HasConflicts} = merge_one(Paths, Path, [], false),
@@ -70,6 +108,7 @@ merge_at([{Key, Value, SubTree}|Sibs], Place, InsertTree) when Place > 0 ->
{ok, Merged, Conflicts} ->
{ok, [{Key, Value, Merged} | Sibs], Conflicts};
no ->
+ % first branch didn't merge, move to next branch
case merge_at(Sibs, Place, InsertTree) of
{ok, Merged, Conflicts} ->
{ok, [{Key, Value, SubTree} | Merged], Conflicts};
@@ -112,11 +151,12 @@ merge_simple([{Key, V1, SubA} | NextA], [{Key, V2, SubB} | NextB]) ->
Value = value_pref(V1, V2),
{[{Key, Value, MergedSubTree} | MergedNextTree], Conflict1 or Conflict2};
merge_simple([{A, _, _} = Tree | Next], [{B, _, _} | _] = Insert) when A < B ->
- {Merged, _} = merge_simple(Next, Insert),
- {[Tree | Merged], true};
+ {Merged, Conflict} = merge_simple(Next, Insert),
+ % if Merged has more branches than the input we added a new conflict
+ {[Tree | Merged], Conflict orelse (length(Merged) > length(Next))};
merge_simple(Ours, [Tree | Next]) ->
- {Merged, _} = merge_simple(Ours, Next),
- {[Tree | Merged], true}.
+ {Merged, Conflict} = merge_simple(Ours, Next),
+ {[Tree | Merged], Conflict orelse (length(Merged) > length(Next))}.
find_missing(_Tree, []) ->
[];
diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl
index 482b84dc..966d6872 100644
--- a/apps/couch/src/couch_rep.erl
+++ b/apps/couch/src/couch_rep.erl
@@ -486,7 +486,17 @@ make_replication_id(RepProps, UserCtx) ->
% add a new clause and increase ?REP_ID_VERSION at the top
make_replication_id({Props}, UserCtx, 2) ->
{ok, HostName} = inet:gethostname(),
- Port = mochiweb_socket_server:get(couch_httpd, port),
+ Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
+ P when is_number(P) ->
+ P;
+ _ ->
+ % On restart we might be called before the couch_httpd process is
+ % started.
+ % TODO: we might be under an SSL socket server only, or both under
+ % SSL and a non-SSL socket.
+ % ... mochiweb_socket_server:get(https, port)
+ list_to_integer(couch_config:get("httpd", "port", "5984"))
+ end,
Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
maybe_append_filters({Props}, [HostName, Port, Src, Tgt], UserCtx);
@@ -513,16 +523,37 @@ maybe_append_filters({Props}, Base, UserCtx) ->
couch_util:to_hex(couch_util:md5(term_to_binary(Base2))).
filter_code(Filter, Props, UserCtx) ->
- {match, [DDocName, FilterName]} =
- re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]),
+ {DDocName, FilterName} =
+ case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of
+ {match, [DDocName0, FilterName0]} ->
+ {DDocName0, FilterName0};
+ _ ->
+ throw({error, <<"Invalid filter. Must match `ddocname/filtername`.">>})
+ end,
ProxyParams = parse_proxy_params(
couch_util:get_value(<<"proxy">>, Props, [])),
- Source = open_db(
- couch_util:get_value(<<"source">>, Props), UserCtx, ProxyParams),
+ DbName = couch_util:get_value(<<"source">>, Props),
+ Source = try
+ open_db(DbName, UserCtx, ProxyParams)
+ catch
+ _Tag:DbError ->
+ DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s",
+ [couch_util:url_strip_password(DbName), couch_util:to_binary(DbError)]),
+ throw({error, iolist_to_binary(DbErrorMsg)})
+ end,
try
- {ok, DDoc} = open_doc(Source, <<"_design/", DDocName/binary>>),
+ Body = case (catch open_doc(Source, <<"_design/", DDocName/binary>>)) of
+ {ok, #doc{body = Body0}} ->
+ Body0;
+ DocError ->
+ DocErrorMsg = io_lib:format(
+ "Couldn't open document `_design/~s` from source "
+ "database `~s`: ~s",
+ [dbname(Source), DDocName, couch_util:to_binary(DocError)]),
+ throw({error, iolist_to_binary(DocErrorMsg)})
+ end,
Code = couch_util:get_nested_json_value(
- DDoc#doc.body, [<<"filters">>, FilterName]),
+ Body, [<<"filters">>, FilterName]),
re:replace(Code, "^\s*(.*?)\s*$", "\\1", [{return, binary}])
after
close_db(Source)
diff --git a/apps/couch/src/couch_replication_manager.erl b/apps/couch/src/couch_replication_manager.erl
index 6537c8b2..e3d97c37 100644
--- a/apps/couch/src/couch_replication_manager.erl
+++ b/apps/couch/src/couch_replication_manager.erl
@@ -32,7 +32,8 @@
-import(couch_util, [
get_value/2,
- get_value/3
+ get_value/3,
+ to_binary/1
]).
@@ -62,8 +63,16 @@ init(_) ->
}}.
-handle_call({rep_db_update, Change}, _From, State) ->
- {reply, ok, process_update(State, Change)};
+handle_call({rep_db_update, {ChangeProps} = Change}, _From, State) ->
+ NewState = try
+ process_update(State, Change)
+ catch
+ _Tag:Error ->
+ JsonRepDoc = get_value(doc, ChangeProps),
+ rep_db_update_error(Error, JsonRepDoc),
+ State
+ end,
+ {reply, ok, NewState};
handle_call({triggered, {BaseId, _}}, _From, State) ->
[{BaseId, {DocId, true}}] = ets:lookup(?REP_ID_TO_DOC_ID, BaseId),
@@ -250,6 +259,19 @@ process_update(State, {Change}) ->
end.
+rep_db_update_error(Error, {Props} = JsonRepDoc) ->
+ case Error of
+ {bad_rep_doc, Reason} ->
+ ok;
+ _ ->
+ Reason = to_binary(Error)
+ end,
+ ?LOG_ERROR("Replication manager, error processing document `~s`: ~s",
+ [get_value(<<"_id">>, Props), Reason]),
+ couch_rep:update_rep_doc(
+ JsonRepDoc, [{<<"_replication_state">>, <<"error">>}]).
+
+
rep_user_ctx({RepDoc}) ->
case get_value(<<"user_ctx">>, RepDoc) of
undefined ->
@@ -265,7 +287,7 @@ rep_user_ctx({RepDoc}) ->
maybe_start_replication(#state{max_retries = MaxRetries} = State,
DocId, JsonRepDoc) ->
UserCtx = rep_user_ctx(JsonRepDoc),
- {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
+ {BaseId, _} = RepId = make_rep_id(JsonRepDoc, UserCtx),
case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
[] ->
true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
@@ -290,6 +312,17 @@ maybe_start_replication(#state{max_retries = MaxRetries} = State,
end.
+make_rep_id(RepDoc, UserCtx) ->
+ try
+ couch_rep:make_replication_id(RepDoc, UserCtx)
+ catch
+ throw:{error, Reason} ->
+ throw({bad_rep_doc, Reason});
+ Tag:Err ->
+ throw({bad_rep_doc, to_binary({Tag, Err})})
+ end.
+
+
maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
case get_value(<<"_replication_id">>, Props) of
RepId ->
diff --git a/apps/couch/src/couch_view.erl b/apps/couch/src/couch_view.erl
index 2d84b0f9..05174245 100644
--- a/apps/couch/src/couch_view.erl
+++ b/apps/couch/src/couch_view.erl
@@ -97,8 +97,8 @@ cleanup_index_files(Db) ->
if length(Sigs) =:= 0 ->
FileList;
true ->
- % regex that matches all ddocs
- RegExp = "("++ string:join(Sigs, "|") ++")",
+ % regex that matches all ddocs
+ RegExp = "("++ string:join(Sigs, "|") ++")",
% filter out the ones in use
[FilePath || FilePath <- FileList,
diff --git a/apps/couch/src/couch_view_compactor.erl b/apps/couch/src/couch_view_compactor.erl
index 38f63f66..69aaff00 100644
--- a/apps/couch/src/couch_view_compactor.erl
+++ b/apps/couch/src/couch_view_compactor.erl
@@ -55,8 +55,10 @@ compact_group(Group, EmptyGroup) ->
Fun = fun({DocId, _ViewIdKeys} = KV, {Bt, Acc, TotalCopied, LastId}) ->
if DocId =:= LastId -> % COUCHDB-999
- Msg = "Duplicates of ~s detected in ~s ~s - rebuild required",
- exit(io_lib:format(Msg, [DocId, DbName, GroupId]));
+ ?LOG_ERROR("Duplicates of document `~s` detected in view group `~s`"
+ ", database `~s` - view rebuild, from scratch, is required",
+ [DocId, GroupId, DbName]),
+ exit({view_duplicated_id, DocId});
true -> ok end,
if TotalCopied rem 10000 =:= 0 ->
couch_task_status:update("Copied ~p of ~p Ids (~p%)",