summaryrefslogtreecommitdiff
path: root/apps/couch/src
diff options
context:
space:
mode:
Diffstat (limited to 'apps/couch/src')
-rw-r--r--apps/couch/src/couch_auth_cache.erl13
-rw-r--r--apps/couch/src/couch_db.erl22
-rw-r--r--apps/couch/src/couch_db_updater.erl85
-rw-r--r--apps/couch/src/couch_file.erl23
-rw-r--r--apps/couch/src/couch_httpd.erl8
-rw-r--r--apps/couch/src/couch_httpd_db.erl20
-rw-r--r--apps/couch/src/couch_httpd_misc_handlers.erl11
-rw-r--r--apps/couch/src/couch_httpd_rewrite.erl5
-rw-r--r--apps/couch/src/couch_httpd_view.erl2
-rw-r--r--apps/couch/src/couch_js_functions.hrl2
-rw-r--r--apps/couch/src/couch_key_tree.erl135
-rw-r--r--apps/couch/src/couch_rep.erl324
-rw-r--r--apps/couch/src/couch_rep_att.erl2
-rw-r--r--apps/couch/src/couch_rep_changes_feed.erl86
-rw-r--r--apps/couch/src/couch_rep_httpc.erl62
-rw-r--r--apps/couch/src/couch_rep_missing_revs.erl12
-rw-r--r--apps/couch/src/couch_rep_reader.erl63
-rw-r--r--apps/couch/src/couch_rep_writer.erl9
-rw-r--r--apps/couch/src/couch_util.erl11
-rw-r--r--apps/couch/src/couch_view.erl9
-rw-r--r--apps/couch/src/couch_view_compactor.erl14
-rw-r--r--apps/couch/src/couch_view_group.erl12
22 files changed, 534 insertions, 396 deletions
diff --git a/apps/couch/src/couch_auth_cache.erl b/apps/couch/src/couch_auth_cache.erl
index 0264a69d..8b911543 100644
--- a/apps/couch/src/couch_auth_cache.erl
+++ b/apps/couch/src/couch_auth_cache.erl
@@ -135,6 +135,7 @@ handle_db_event({Event, DbName}) ->
case Event of
deleted -> gen_server:call(?MODULE, auth_db_deleted, infinity);
created -> gen_server:call(?MODULE, auth_db_created, infinity);
+ compacted -> gen_server:call(?MODULE, auth_db_compacted, infinity);
_Else -> ok
end;
false ->
@@ -158,6 +159,14 @@ handle_call(auth_db_created, _From, State) ->
true = ets:insert(?STATE, {auth_db, open_auth_db()}),
{reply, ok, NewState};
+handle_call(auth_db_compacted, _From, State) ->
+ exec_if_auth_db(
+ fun(AuthDb) ->
+ true = ets:insert(?STATE, {auth_db, reopen_auth_db(AuthDb)})
+ end
+ ),
+ {reply, ok, State};
+
handle_call({new_max_cache_size, NewSize}, _From, State) ->
case NewSize >= State#state.cache_size of
true ->
@@ -175,7 +184,7 @@ handle_call({new_max_cache_size, NewSize}, _From, State) ->
end,
NewState = State#state{
max_cache_size = NewSize,
- cache_size = erlang:min(NewSize, State#state.cache_size)
+ cache_size = lists:min([NewSize, State#state.cache_size])
},
{reply, ok, NewState};
@@ -338,7 +347,7 @@ cache_needs_refresh() ->
reopen_auth_db(AuthDb) ->
- case (catch gen_server:call(AuthDb#db.main_pid, get_db, infinity)) of
+ case (catch couch_db:reopen(AuthDb)) of
{ok, AuthDb2} ->
AuthDb2;
_ ->
diff --git a/apps/couch/src/couch_db.erl b/apps/couch/src/couch_db.erl
index 0e42980e..7293b9bb 100644
--- a/apps/couch/src/couch_db.erl
+++ b/apps/couch/src/couch_db.erl
@@ -25,6 +25,7 @@
-export([set_security/2,get_security/1]).
-export([changes_since/5,changes_since/6,read_doc/2,new_revid/1]).
-export([check_is_admin/1, check_is_reader/1, get_doc_count/1, load_validation_funs/1]).
+-export([reopen/1]).
-include("couch_db.hrl").
@@ -84,6 +85,17 @@ open(DbName, Options) ->
Else -> Else
end.
+reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) ->
+ {ok, #db{fd = NewFd} = NewDb} = gen_server:call(Pid, get_db, infinity),
+ case NewFd =:= Fd of
+ true ->
+ {ok, NewDb#db{user_ctx = UserCtx}};
+ false ->
+ erlang:demonitor(OldRef),
+ NewRef = erlang:monitor(process, NewFd),
+ {ok, NewDb#db{user_ctx = UserCtx, fd_monitor = NewRef}}
+ end.
+
ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
ok = gen_server:call(Pid, full_commit, infinity),
{ok, StartTime}.
@@ -584,7 +596,8 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
{ok, #full_doc_info{rev_tree=OldTree}} ->
NewRevTree = lists:foldl(
fun(NewDoc, AccTree) ->
- {NewTree, _} = couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]),
+ {NewTree, _} = couch_key_tree:merge(AccTree,
+ couch_db:doc_to_tree(NewDoc), Db#db.revs_limit),
NewTree
end,
OldTree, Bucket),
@@ -845,11 +858,12 @@ flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd ->
% already written to our file, nothing to write
Att;
-flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5}=Att) ->
+flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5,
+ disk_len=InDiskLen} = Att) ->
{NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} =
couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
check_md5(IdentityMd5, InMd5),
- Att#att{data={Fd, NewStreamData}, md5=Md5, att_len=Len, disk_len=Len};
+ Att#att{data={Fd, NewStreamData}, md5=Md5, att_len=Len, disk_len=InDiskLen};
flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) ->
with_stream(Fd, Att, fun(OutputStream) ->
@@ -947,7 +961,7 @@ with_stream(Fd, #att{md5=InMd5,type=Type,encoding=Enc}=Att, Fun) ->
write_streamed_attachment(_Stream, _F, 0) ->
ok;
-write_streamed_attachment(Stream, F, LenLeft) ->
+write_streamed_attachment(Stream, F, LenLeft) when LenLeft > 0 ->
Bin = F(),
ok = couch_stream:write(Stream, Bin),
write_streamed_attachment(Stream, F, LenLeft - size(Bin)).
diff --git a/apps/couch/src/couch_db_updater.erl b/apps/couch/src/couch_db_updater.erl
index ab078caf..835d188c 100644
--- a/apps/couch/src/couch_db_updater.erl
+++ b/apps/couch/src/couch_db_updater.erl
@@ -204,9 +204,11 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
couch_file:delete(RootDir, Filepath),
ok = file:rename(CompactFilepath, Filepath),
close_db(Db),
- ok = gen_server:call(couch_server, {db_updated, NewDb2}, infinity),
+ NewDb3 = refresh_validate_doc_funs(NewDb2),
+ ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity),
+ couch_db_update_notifier:notify({compacted, NewDb3#db.name}),
?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]),
- {noreply, NewDb2#db{compactor_pid=nil}};
+ {noreply, NewDb3#db{compactor_pid=nil}};
false ->
?LOG_INFO("Compaction for ~s still behind main file "
"(update seq=~p. compact update seq=~p). Retrying.",
@@ -518,16 +520,17 @@ send_result(Client, Id, OriginalRevs, NewResult) ->
% used to send a result to the client
catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}).
-merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
+merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
{ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
-merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
+merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
[OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) ->
#full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq}
= OldDocInfo,
NewRevTree = lists:foldl(
fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc}, AccTree) ->
if not MergeConflicts ->
- case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of
+ case couch_key_tree:merge(AccTree, couch_db:doc_to_tree(NewDoc),
+ Limit) of
{_NewTree, conflicts} when (not OldDeleted) ->
send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
AccTree;
@@ -558,7 +561,7 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
NewDoc#doc{revs={OldPos, [OldRev]}}),
NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
{NewTree2, _} = couch_key_tree:merge(AccTree,
- [couch_db:doc_to_tree(NewDoc2)]),
+ couch_db:doc_to_tree(NewDoc2), Limit),
% we changed the rev id, this tells the caller we did
send_result(Client, Id, {Pos-1,PrevRevs},
{ok, {OldPos + 1, NewRevId}}),
@@ -572,15 +575,15 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
end;
true ->
{NewTree, _} = couch_key_tree:merge(AccTree,
- [couch_db:doc_to_tree(NewDoc)]),
+ couch_db:doc_to_tree(NewDoc), Limit),
NewTree
end
end,
OldTree, NewDocs),
if NewRevTree == OldTree ->
% nothing changed
- merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, AccNewInfos,
- AccRemoveSeqs, AccSeq);
+ merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
+ AccNewInfos, AccRemoveSeqs, AccSeq);
true ->
% we have updated the document, give it a new seq #
NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
@@ -588,8 +591,8 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
0 -> AccRemoveSeqs;
_ -> [OldSeq | AccRemoveSeqs]
end,
- merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo,
- [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1)
+ merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
+ [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1)
end.
@@ -609,7 +612,8 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
#db{
id_tree = DocInfoByIdBTree,
seq_tree = DocInfoBySeqBTree,
- update_seq = LastSeq
+ update_seq = LastSeq,
+ revs_limit = RevsLimit
} = Db,
Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
% lookup up the old documents, if they exist.
@@ -622,11 +626,9 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
end,
Ids, OldDocLookups),
% Merge the new docs into the revision trees.
- {ok, NewDocInfos0, RemoveSeqs, NewSeq} = merge_rev_trees(
+ {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit,
MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
- NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0),
-
% All documents are now ready to write.
{ok, Db2} = update_local_docs(Db, NonRepDocs),
@@ -794,15 +796,6 @@ copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) ->
end, BinInfos),
{BodyData, NewBinInfos}.
-copy_rev_tree_attachments(SrcDb, DestFd, Tree) ->
- couch_key_tree:map(
- fun(Rev, {IsDel, Sp, Seq}, leaf) ->
- DocBody = copy_doc_attachments(SrcDb, Rev, Sp, DestFd),
- {IsDel, DocBody, Seq};
- (_, _, branch) ->
- ?REV_MISSING
- end, Tree).
-
merge_lookups(Infos, []) ->
Infos;
merge_lookups([], _) ->
@@ -816,20 +809,19 @@ copy_docs(Db, #db{fd=DestFd}=NewDb, MixedInfos, Retry) ->
% lookup any necessary full_doc_infos
DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
- Infos = merge_lookups(MixedInfos, LookupResults),
+ % COUCHDB-968, make sure we prune duplicates during compaction
+ Infos = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) ->
+ A =< B
+ end, merge_lookups(MixedInfos, LookupResults)),
- % write out the attachments
- NewInfos0 = [Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db,
- DestFd, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- Infos],
-
- % write out the docs
- % we do this in 2 stages so the docs are written out contiguously, making
- % view indexing and replication faster.
- NewInfos1 = [Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
- fun(_Key, {IsDel, DocBody, Seq}) ->
+ NewInfos1 = [Info#full_doc_info{rev_tree=couch_key_tree:map(
+ fun(Rev, {IsDel, Sp, Seq}, leaf) ->
+ DocBody = copy_doc_attachments(Db, Rev, Sp, DestFd),
{ok, Pos} = couch_file:append_term_md5(DestFd, DocBody),
- {IsDel, Pos, Seq}
- end, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- NewInfos0],
+ {IsDel, Pos, Seq};
+ (_, _, branch) ->
+ ?REV_MISSING
+ end, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- Infos],
NewInfos = stem_full_doc_infos(Db, NewInfos1),
RemoveSeqs =
@@ -900,14 +892,19 @@ copy_compact(Db, NewDb0, Retry) ->
commit_data(NewDb4#db{update_seq=Db#db.update_seq}).
-start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
+start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=PurgeSeq}}=Db) ->
CompactFile = Filepath ++ ".compact",
?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
case couch_file:open(CompactFile) of
{ok, Fd} ->
couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
Retry = true,
- {ok, Header} = couch_file:read_header(Fd);
+ case couch_file:read_header(Fd) of
+ {ok, Header} ->
+ ok;
+ no_valid_header ->
+ ok = couch_file:write_header(Fd, Header=#db_header{})
+ end;
{error, enoent} ->
couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
{ok, Fd} = couch_file:open(CompactFile, [create]),
@@ -915,8 +912,16 @@ start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
ok = couch_file:write_header(Fd, Header=#db_header{})
end,
NewDb = init_db(Name, CompactFile, Fd, Header),
+ NewDb2 = if PurgeSeq > 0 ->
+ {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
+ {ok, Pointer} = couch_file:append_term(Fd, PurgedIdsRevs),
+ NewDb#db{header=Header#db_header{purge_seq=PurgeSeq, purged_docs=Pointer}};
+ true ->
+ NewDb
+ end,
unlink(Fd),
- NewDb2 = copy_compact(Db, NewDb, Retry),
- close_db(NewDb2),
+
+ NewDb3 = copy_compact(Db, NewDb2, Retry),
+ close_db(NewDb3),
gen_server:cast(Db#db.main_pid, {compact_done, CompactFile}).
diff --git a/apps/couch/src/couch_file.erl b/apps/couch/src/couch_file.erl
index 9c06a44e..3e4f29fe 100644
--- a/apps/couch/src/couch_file.erl
+++ b/apps/couch/src/couch_file.erl
@@ -120,7 +120,19 @@ pread_binary(Fd, Pos) ->
pread_iolist(Fd, Pos) ->
- gen_server:call(Fd, {pread_iolist, Pos}, infinity).
+ case gen_server:call(Fd, {pread_iolist, Pos}, infinity) of
+ {ok, IoList, <<>>} ->
+ {ok, IoList};
+ {ok, IoList, Md5} ->
+ case couch_util:md5(IoList) of
+ Md5 ->
+ {ok, IoList};
+ _ ->
+ exit({file_corruption, <<"file corruption">>})
+ end;
+ Error ->
+ Error
+ end.
%%----------------------------------------------------------------------
%% Purpose: The length of a file, in bytes.
@@ -287,15 +299,10 @@ handle_call({pread_iolist, Pos}, _From, File) ->
<<1:1/integer,Len:31/integer>> -> % an MD5-prefixed term
{Md5AndIoList, _} = read_raw_iolist_int(File, NextPos, Len+16),
{Md5, IoList} = extract_md5(Md5AndIoList),
- case couch_util:md5(IoList) of
- Md5 ->
- {reply, {ok, IoList}, File};
- _ ->
- {stop, file_corruption, {error,file_corruption}, File}
- end;
+ {reply, {ok, IoList, Md5}, File};
<<0:1/integer,Len:31/integer>> ->
{Iolist, _} = read_raw_iolist_int(File, NextPos, Len),
- {reply, {ok, Iolist}, File}
+ {reply, {ok, Iolist, <<>>}, File}
end;
handle_call({pread, Pos, Bytes}, _From, #file{fd=Fd,tail_append_begin=TailAppendBegin}=File) ->
{ok, Bin} = file:pread(Fd, Pos, Bytes),
diff --git a/apps/couch/src/couch_httpd.erl b/apps/couch/src/couch_httpd.erl
index 2b952656..0d9abde6 100644
--- a/apps/couch/src/couch_httpd.erl
+++ b/apps/couch/src/couch_httpd.erl
@@ -769,7 +769,13 @@ error_headers(#httpd{mochi_req=MochiReq}=Req, Code, ErrorStr, ReasonStr) ->
{Code, []};
match ->
AuthRedirectBin = ?l2b(AuthRedirect),
- UrlReturn = ?l2b(couch_util:url_encode(MochiReq:get(path))),
+ % Redirect to the path the user requested, not
+ % the one that is used internally.
+ UrlReturnRaw = case MochiReq:get_header_value("x-couchdb-vhost-path") of
+ undefined -> MochiReq:get(path);
+ VHostPath -> VHostPath
+ end,
+ UrlReturn = ?l2b(couch_util:url_encode(UrlReturnRaw)),
UrlReason = ?l2b(couch_util:url_encode(ReasonStr)),
{302, [{"Location", couch_httpd:absolute_uri(Req, <<AuthRedirectBin/binary,"?return=",UrlReturn/binary,"&reason=",UrlReason/binary>>)}]}
end
diff --git a/apps/couch/src/couch_httpd_db.erl b/apps/couch/src/couch_httpd_db.erl
index cf4e2120..217a2d03 100644
--- a/apps/couch/src/couch_httpd_db.erl
+++ b/apps/couch/src/couch_httpd_db.erl
@@ -576,7 +576,7 @@ db_doc_req(#httpd{method='GET'}=Req, Db, DocId) ->
{ok, Results} = couch_db:open_doc_revs(Db, DocId, Revs, Options),
AcceptedTypes = case couch_httpd:header_value(Req, "Accept") of
undefined -> [];
- AcceptHeader -> string:tokens(AcceptHeader, "; ")
+ AcceptHeader -> string:tokens(AcceptHeader, ", ")
end,
case lists:member("multipart/mixed", AcceptedTypes) of
false ->
@@ -736,34 +736,34 @@ send_doc_efficiently(Req, #doc{atts=Atts}=Doc, Headers, Options) ->
JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc,
[attachments, follows|Options])),
{ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
- Boundary,JsonBytes, Atts,false),
+ Boundary,JsonBytes, Atts, true),
CType = {<<"Content-Type">>, ContentType},
{ok, Resp} = start_response_length(Req, 200, [CType|Headers], Len),
couch_doc:doc_to_multi_part_stream(Boundary,JsonBytes,Atts,
- fun(Data) -> couch_httpd:send(Resp, Data) end, false)
+ fun(Data) -> couch_httpd:send(Resp, Data) end, true)
end;
false ->
send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options))
end.
-send_docs_multipart(Req, Results, Options) ->
+send_docs_multipart(Req, Results, Options1) ->
OuterBoundary = couch_uuids:random(),
InnerBoundary = couch_uuids:random(),
+ Options = [attachments, follows, att_encoding_info | Options1],
CType = {"Content-Type",
"multipart/mixed; boundary=\"" ++ ?b2l(OuterBoundary) ++ "\""},
{ok, Resp} = start_chunked_response(Req, 200, [CType]),
couch_httpd:send_chunk(Resp, <<"--", OuterBoundary/binary>>),
lists:foreach(
fun({ok, #doc{atts=Atts}=Doc}) ->
- JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc,
- [attachments,follows|Options])),
+ JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, Options)),
{ContentType, _Len} = couch_doc:len_doc_to_multi_part_stream(
- InnerBoundary, JsonBytes, Atts, false),
+ InnerBoundary, JsonBytes, Atts, true),
couch_httpd:send_chunk(Resp, <<"\r\nContent-Type: ",
ContentType/binary, "\r\n\r\n">>),
couch_doc:doc_to_multi_part_stream(InnerBoundary, JsonBytes, Atts,
fun(Data) -> couch_httpd:send_chunk(Resp, Data)
- end, false),
+ end, true),
couch_httpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>);
({{not_found, missing}, RevId}) ->
RevStr = couch_doc:rev_to_str(RevId),
@@ -1020,8 +1020,10 @@ db_attachment_req(#httpd{method=Method,mochi_req=MochiReq}=Req, Db, DocId, FileN
end
end,
- #doc{atts=Atts} = Doc,
+ #doc{atts=Atts, revs = {Pos, Revs}} = Doc,
DocEdited = Doc#doc{
+ % prune revision list as a workaround for key tree bug (COUCHDB-902)
+ revs = {Pos, case Revs of [] -> []; [Hd|_] -> [Hd] end},
atts = NewAtt ++ [A || A <- Atts, A#att.name /= FileName]
},
{ok, UpdatedRev} = couch_db:update_doc(Db, DocEdited, []),
diff --git a/apps/couch/src/couch_httpd_misc_handlers.erl b/apps/couch/src/couch_httpd_misc_handlers.erl
index 7b09dccd..7a149d11 100644
--- a/apps/couch/src/couch_httpd_misc_handlers.erl
+++ b/apps/couch/src/couch_httpd_misc_handlers.erl
@@ -93,10 +93,17 @@ handle_replicate_req(#httpd{method='POST'}=Req) ->
{error, not_found} ->
send_json(Req, 404, {[{error, not_found}]});
{error, Reason} ->
- send_json(Req, 500, {[{error, Reason}]})
+ try
+ send_json(Req, 500, {[{error, Reason}]})
+ catch
+ exit:{json_encode, _} ->
+ send_json(Req, 500, {[{error, couch_util:to_binary(Reason)}]})
+ end
catch
throw:{db_not_found, Msg} ->
- send_json(Req, 404, {[{error, db_not_found}, {reason, Msg}]})
+ send_json(Req, 404, {[{error, db_not_found}, {reason, Msg}]});
+ throw:{unauthorized, Msg} ->
+ send_json(Req, 404, {[{error, unauthorized}, {reason, Msg}]})
end;
handle_replicate_req(Req) ->
send_method_not_allowed(Req, "POST").
diff --git a/apps/couch/src/couch_httpd_rewrite.erl b/apps/couch/src/couch_httpd_rewrite.erl
index ca4ac1f0..6c3d0e3c 100644
--- a/apps/couch/src/couch_httpd_rewrite.erl
+++ b/apps/couch/src/couch_httpd_rewrite.erl
@@ -126,7 +126,10 @@ handle_rewrite_req(#httpd{
case couch_util:get_value(<<"rewrites">>, Props) of
undefined ->
couch_httpd:send_error(Req, 404, <<"rewrite_error">>,
- <<"Invalid path.">>);
+ <<"Invalid path.">>);
+ Bin when is_binary(Bin) ->
+ couch_httpd:send_error(Req, 400, <<"rewrite_error">>,
+ <<"Rewrite rules are a String. They must be a JSON Array.">>);
Rules ->
% create dispatch list from rules
DispatchList = [make_rule(Rule) || {Rule} <- Rules],
diff --git a/apps/couch/src/couch_httpd_view.erl b/apps/couch/src/couch_httpd_view.erl
index e1a0dfad..cb387d1b 100644
--- a/apps/couch/src/couch_httpd_view.erl
+++ b/apps/couch/src/couch_httpd_view.erl
@@ -365,6 +365,8 @@ validate_view_query(group_level, Value, Args) ->
end;
validate_view_query(inclusive_end, Value, Args) ->
Args#view_query_args{inclusive_end=Value};
+validate_view_query(reduce, false, Args) ->
+ Args;
validate_view_query(reduce, _, Args) ->
case Args#view_query_args.view_type of
map ->
diff --git a/apps/couch/src/couch_js_functions.hrl b/apps/couch/src/couch_js_functions.hrl
index 1f314f6e..32573a90 100644
--- a/apps/couch/src/couch_js_functions.hrl
+++ b/apps/couch/src/couch_js_functions.hrl
@@ -31,7 +31,7 @@
throw({forbidden: 'doc.name is required'});
}
- if (!(newDoc.roles && (typeof newDoc.roles.length !== 'undefined'))) {
+ if (newDoc.roles && !isArray(newDoc.roles)) {
throw({forbidden: 'doc.roles must be an array'});
}
diff --git a/apps/couch/src/couch_key_tree.erl b/apps/couch/src/couch_key_tree.erl
index 4fe09bf3..6701da58 100644
--- a/apps/couch/src/couch_key_tree.erl
+++ b/apps/couch/src/couch_key_tree.erl
@@ -12,104 +12,107 @@
-module(couch_key_tree).
--export([merge/2, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]).
+-export([merge/3, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]).
-export([map/2, get_all_leafs/1, count_leafs/1, remove_leafs/2,
get_all_leafs_full/1,stem/2,map_leafs/2]).
-% a key tree looks like this:
-% Tree -> [] or [{Key, Value, ChildTree} | SiblingTree]
-% ChildTree -> Tree
-% SiblingTree -> [] or [{SiblingKey, Value, Tree} | Tree]
-% And each Key < SiblingKey
-
+% Tree::term() is really a tree(), but we don't want to require R13B04 yet
+-type branch() :: {Key::term(), Value::term(), Tree::term()}.
+-type path() :: {Start::pos_integer(), branch()}.
+-type tree() :: [branch()]. % sorted by key
% partial trees arranged by how much they are cut off.
-merge(A, B) ->
- {Merged, HasConflicts} =
- lists:foldl(
- fun(InsertTree, {AccTrees, AccConflicts}) ->
- {ok, Merged, Conflicts} = merge_one(AccTrees, InsertTree, [], false),
- {Merged, Conflicts or AccConflicts}
- end,
- {A, false}, B),
- if HasConflicts or
- ((length(Merged) =/= length(A)) and (length(Merged) =/= length(B))) ->
+-spec merge([path()], path(), pos_integer()) -> {[path()],
+ conflicts | no_conflicts}.
+merge(Paths, Path, Depth) ->
+ {Merged, Conflicts} = merge(Paths, Path),
+ {stem(Merged, Depth), Conflicts}.
+
+-spec merge([path()], path()) -> {[path()], conflicts | no_conflicts}.
+merge(Paths, Path) ->
+ {ok, Merged, HasConflicts} = merge_one(Paths, Path, [], false),
+ if HasConflicts ->
+ Conflicts = conflicts;
+ (length(Merged) =/= length(Paths)) and (length(Merged) =/= 1) ->
Conflicts = conflicts;
true ->
Conflicts = no_conflicts
end,
{lists:sort(Merged), Conflicts}.
+-spec merge_one(Original::[path()], Inserted::path(), [path()], bool()) ->
+ {ok, Merged::[path()], NewConflicts::bool()}.
merge_one([], Insert, OutAcc, ConflictsAcc) ->
{ok, [Insert | OutAcc], ConflictsAcc};
-merge_one([{Start, Tree}|Rest], {StartInsert, TreeInsert}, OutAcc, ConflictsAcc) ->
- if Start =< StartInsert ->
- StartA = Start,
- StartB = StartInsert,
- TreeA = Tree,
- TreeB = TreeInsert;
- true ->
- StartB = Start,
- StartA = StartInsert,
- TreeB = Tree,
- TreeA = TreeInsert
- end,
- case merge_at([TreeA], StartB - StartA, TreeB) of
- {ok, [CombinedTrees], Conflicts} ->
- merge_one(Rest, {StartA, CombinedTrees}, OutAcc, Conflicts or ConflictsAcc);
+merge_one([{Start, Tree}|Rest], {StartInsert, TreeInsert}, Acc, HasConflicts) ->
+ case merge_at([Tree], StartInsert - Start, [TreeInsert]) of
+ {ok, [Merged], Conflicts} ->
+ MergedStart = lists:min([Start, StartInsert]),
+ {ok, Rest ++ [{MergedStart, Merged} | Acc], Conflicts or HasConflicts};
no ->
- merge_one(Rest, {StartB, TreeB}, [{StartA, TreeA} | OutAcc], ConflictsAcc)
+ AccOut = [{Start, Tree} | Acc],
+ merge_one(Rest, {StartInsert, TreeInsert}, AccOut, HasConflicts)
end.
+-spec merge_at(tree(), Place::integer(), tree()) ->
+ {ok, Merged::tree(), HasConflicts::bool()} | no.
+merge_at(_Ours, _Place, []) ->
+ no;
merge_at([], _Place, _Insert) ->
no;
-merge_at([{Key, Value, SubTree}|Sibs], 0, {InsertKey, InsertValue, InsertSubTree}) ->
- if Key == InsertKey ->
- {Merge, Conflicts} = merge_simple(SubTree, InsertSubTree),
- {ok, [{Key, Value, Merge} | Sibs], Conflicts};
- true ->
- case merge_at(Sibs, 0, {InsertKey, InsertValue, InsertSubTree}) of
- {ok, Merged, Conflicts} ->
- {ok, [{Key, Value, SubTree} | Merged], Conflicts};
- no ->
- no
- end
- end;
-merge_at([{Key, Value, SubTree}|Sibs], Place, Insert) ->
- case merge_at(SubTree, Place - 1,Insert) of
+merge_at([{Key, Value, SubTree}|Sibs], Place, InsertTree) when Place > 0 ->
+ % inserted starts later than committed, need to drill into committed subtree
+ case merge_at(SubTree, Place - 1, InsertTree) of
{ok, Merged, Conflicts} ->
{ok, [{Key, Value, Merged} | Sibs], Conflicts};
no ->
- case merge_at(Sibs, Place, Insert) of
+ case merge_at(Sibs, Place, InsertTree) of
{ok, Merged, Conflicts} ->
{ok, [{Key, Value, SubTree} | Merged], Conflicts};
no ->
no
end
+ end;
+merge_at(OurTree, Place, [{Key, Value, SubTree}]) when Place < 0 ->
+ % inserted starts earlier than committed, need to drill into insert subtree
+ case merge_at(OurTree, Place + 1, SubTree) of
+ {ok, Merged, Conflicts} ->
+ {ok, [{Key, Value, Merged}], Conflicts};
+ no ->
+ no
+ end;
+merge_at([{Key, Value, SubTree}|Sibs], 0, [{Key, _Value, InsertSubTree}]) ->
+ {Merged, Conflicts} = merge_simple(SubTree, InsertSubTree),
+ {ok, [{Key, Value, Merged} | Sibs], Conflicts};
+merge_at([{OurKey, _, _} | _], 0, [{Key, _, _}]) when OurKey > Key ->
+ % siblings keys are ordered, no point in continuing
+ no;
+merge_at([Tree | Sibs], 0, InsertTree) ->
+ case merge_at(Sibs, 0, InsertTree) of
+ {ok, Merged, Conflicts} ->
+ {ok, [Tree | Merged], Conflicts};
+ no ->
+ no
end.
% key tree functions
+
+-spec merge_simple(tree(), tree()) -> {Merged::tree(), NewConflicts::bool()}.
merge_simple([], B) ->
{B, false};
merge_simple(A, []) ->
{A, false};
-merge_simple([ATree | ANextTree], [BTree | BNextTree]) ->
- {AKey, AValue, ASubTree} = ATree,
- {BKey, _BValue, BSubTree} = BTree,
- if
- AKey == BKey ->
- %same key
- {MergedSubTree, Conflict1} = merge_simple(ASubTree, BSubTree),
- {MergedNextTree, Conflict2} = merge_simple(ANextTree, BNextTree),
- {[{AKey, AValue, MergedSubTree} | MergedNextTree], Conflict1 or Conflict2};
- AKey < BKey ->
- {MTree, _} = merge_simple(ANextTree, [BTree | BNextTree]),
- {[ATree | MTree], true};
- true ->
- {MTree, _} = merge_simple([ATree | ANextTree], BNextTree),
- {[BTree | MTree], true}
- end.
+merge_simple([{Key, Value, SubA} | NextA], [{Key, _, SubB} | NextB]) ->
+ {MergedSubTree, Conflict1} = merge_simple(SubA, SubB),
+ {MergedNextTree, Conflict2} = merge_simple(NextA, NextB),
+ {[{Key, Value, MergedSubTree} | MergedNextTree], Conflict1 or Conflict2};
+merge_simple([{A, _, _} = Tree | Next], [{B, _, _} | _] = Insert) when A < B ->
+ {Merged, _} = merge_simple(Next, Insert),
+ {[Tree | Merged], true};
+merge_simple(Ours, [Tree | Next]) ->
+ {Merged, _} = merge_simple(Ours, Next),
+ {[Tree | Merged], true}.
find_missing(_Tree, []) ->
[];
@@ -159,7 +162,7 @@ remove_leafs(Trees, Keys) ->
fun({PathPos, Path},TreeAcc) ->
[SingleTree] = lists:foldl(
fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path),
- {NewTrees, _} = merge(TreeAcc, [{PathPos + 1 - length(Path), SingleTree}]),
+ {NewTrees, _} = merge(TreeAcc, {PathPos + 1 - length(Path), SingleTree}),
NewTrees
end, [], FilteredPaths),
{NewTree, RemovedKeys}.
@@ -321,7 +324,7 @@ stem(Trees, Limit) ->
fun({PathPos, Path},TreeAcc) ->
[SingleTree] = lists:foldl(
fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path),
- {NewTrees, _} = merge(TreeAcc, [{PathPos + 1 - length(Path), SingleTree}]),
+ {NewTrees, _} = merge(TreeAcc, {PathPos + 1 - length(Path), SingleTree}),
NewTrees
end, [], Paths2).
diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl
index 126639e0..c804b49d 100644
--- a/apps/couch/src/couch_rep.erl
+++ b/apps/couch/src/couch_rep.erl
@@ -15,7 +15,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([replicate/2, checkpoint/1, start_link/3]).
+-export([replicate/2, checkpoint/1]).
-include("couch_db.hrl").
@@ -34,6 +34,7 @@
start_seq,
history,
+ session_id,
source_log,
target_log,
rep_starttime,
@@ -46,12 +47,11 @@
committed_seq = 0,
stats = nil,
- doc_ids = nil
+ doc_ids = nil,
+ source_db_update_notifier = nil,
+ target_db_update_notifier = nil
}).
-start_link(Id, PostBody, UserCtx) ->
- gen_server:start_link(?MODULE, [Id, PostBody, UserCtx], []).
-
%% convenience function to do a simple replication from the shell
replicate(Source, Target) when is_list(Source) ->
replicate(?l2b(Source), Target);
@@ -64,7 +64,7 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
replicate({Props}=PostBody, UserCtx) ->
{BaseId, Extension} = make_replication_id(PostBody, UserCtx),
Replicator = {BaseId ++ Extension,
- {?MODULE, start_link, [BaseId, PostBody, UserCtx]},
+ {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]},
temporary,
1,
worker,
@@ -83,15 +83,10 @@ replicate({Props}=PostBody, UserCtx) ->
false ->
Server = start_replication_server(Replicator),
- Continuous = couch_util:get_value(<<"continuous">>, Props, false),
- Async = couch_util:get_value(<<"async">>, Props, false),
- case {Continuous, Async} of
- {true, _} ->
+ case couch_util:get_value(<<"continuous">>, Props, false) of
+ true ->
{ok, {continuous, ?l2b(BaseId)}};
- {_, true} ->
- spawn(fun() -> get_result(Server, PostBody, UserCtx) end),
- Server;
- _ ->
+ false ->
get_result(Server, PostBody, UserCtx)
end
end.
@@ -113,8 +108,10 @@ get_result(Server, PostBody, UserCtx) ->
end.
init(InitArgs) ->
- try do_init(InitArgs)
- catch _:Error ->
+ try
+ do_init(InitArgs)
+ catch
+ throw:Error ->
{stop, Error}
end.
@@ -199,12 +196,15 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
start_seq = StartSeq,
history = History,
+ session_id = couch_uuids:random(),
source_log = SourceLog,
target_log = TargetLog,
rep_starttime = httpd_util:rfc1123_date(),
src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
- doc_ids = DocIds
+ doc_ids = DocIds,
+ source_db_update_notifier = source_db_update_notifier(Source),
+ target_db_update_notifier = target_db_update_notifier(Target)
},
{ok, State}.
@@ -212,7 +212,21 @@ handle_call(get_result, From, #state{complete=true, listeners=[]} = State) ->
{stop, normal, State#state{listeners=[From]}};
handle_call(get_result, From, State) ->
Listeners = State#state.listeners,
- {noreply, State#state{listeners=[From|Listeners]}}.
+ {noreply, State#state{listeners=[From|Listeners]}};
+
+handle_call(get_source_db, _From, #state{source = Source} = State) ->
+ {reply, {ok, Source}, State};
+
+handle_call(get_target_db, _From, #state{target = Target} = State) ->
+ {reply, {ok, Target}, State}.
+
+handle_cast(reopen_source_db, #state{source = Source} = State) ->
+ {ok, NewSource} = couch_db:reopen(Source),
+ {noreply, State#state{source = NewSource}};
+
+handle_cast(reopen_target_db, #state{target = Target} = State) ->
+ {ok, NewTarget} = couch_db:reopen(Target),
+ {noreply, State#state{target = NewTarget}};
handle_cast(do_checkpoint, State) ->
{noreply, do_checkpoint(State)};
@@ -221,16 +235,14 @@ handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
- couch_task_status:update("MR Processed source update #~p of ~p",
- [SourceSeq, seqnum(State#state.source)]),
+ couch_task_status:update("MR Processed source update #~p", [SourceSeq]),
{noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
when SourceSeq > N ->
MissingRevs = State#state.missing_revs,
ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
- couch_task_status:update("W Processed source update #~p of ~p",
- [SourceSeq, seqnum(State#state.source)]),
+ couch_task_status:update("W Processed source update #~p", [SourceSeq]),
{noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
handle_info({writer_checkpoint, _}, State) ->
{noreply, State};
@@ -239,14 +251,8 @@ handle_info({update_stats, Key, N}, State) ->
ets:update_counter(State#state.stats, Key, N),
{noreply, State};
-handle_info({'DOWN', _, _, Pid, _}, State) ->
- Me = node(),
- case erlang:node(Pid) of
- Me ->
- ?LOG_INFO("replication terminating - local DB is shutting down", []);
- Node ->
- ?LOG_INFO("replication terminating - DB on ~p is shutting down", [Node])
- end,
+handle_info({'DOWN', _, _, _, _}, State) ->
+ ?LOG_INFO("replication terminating because local DB is shutting down", []),
timer:cancel(State#state.checkpoint_scheduled),
{stop, shutdown, State};
@@ -293,35 +299,40 @@ code_change(_OldVsn, State, _Extra) ->
% internal funs
start_replication_server(Replicator) ->
- start_replication_server(Replicator, fun start_child/1).
-
-start_replication_server(Replicator, StartFun) ->
- case StartFun(Replicator) of
+ RepId = element(1, Replicator),
+ case supervisor:start_child(couch_rep_sup, Replicator) of
{ok, Pid} ->
+ ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
Pid;
{error, already_present} ->
- start_replication_server(Replicator, fun restart_child/1);
+ case supervisor:restart_child(couch_rep_sup, RepId) of
+ {ok, Pid} ->
+ ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
+ Pid;
+ {error, running} ->
+ %% this error occurs if multiple replicators are racing
+ %% each other to start and somebody else won. Just grab
+ %% the Pid by calling start_child again.
+ {error, {already_started, Pid}} =
+ supervisor:start_child(couch_rep_sup, Replicator),
+ ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
+ Pid;
+ {error, {db_not_found, DbUrl}} ->
+ throw({db_not_found, <<"could not open ", DbUrl/binary>>});
+ {error, {unauthorized, DbUrl}} ->
+ throw({unauthorized,
+ <<"unauthorized to access database ", DbUrl/binary>>})
+ end;
{error, {already_started, Pid}} ->
+ ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
Pid;
- {error, running} ->
- Children = supervisor:which_children(couch_rep_sup),
- {value, {_, Pid, _, _}} = lists:keysearch(Replicator, 1, Children),
- Pid;
- % sadly both seem to be needed. I don't know why.
{error, {{db_not_found, DbUrl}, _}} ->
throw({db_not_found, <<"could not open ", DbUrl/binary>>});
- {error, {db_not_found, DbUrl}} ->
- throw({db_not_found, <<"could not open ", DbUrl/binary>>});
- {error, {node_not_connected, Node}} ->
- throw({node_not_connected, Node})
+ {error, {{unauthorized, DbUrl}, _}} ->
+ throw({unauthorized,
+ <<"unauthorized to access database ", DbUrl/binary>>})
end.
-start_child(Replicator) ->
- supervisor:start_child(couch_rep_sup, Replicator).
-
-restart_child(Replicator) ->
- supervisor:restart_child(couch_rep_sup, element(1, Replicator)).
-
compare_replication_logs(SrcDoc, TgtDoc) ->
#doc{body={RepRecProps}} = SrcDoc,
#doc{body={RepRecPropsTgt}} = TgtDoc,
@@ -373,15 +384,9 @@ close_db(Db) ->
couch_db:close(Db).
dbname(#http_db{url = Url}) ->
- strip_password(Url);
-dbname(#db{name = Name, main_pid = MainPid}) ->
- ?l2b([Name, " (", pid_to_list(MainPid), ")"]).
-
-strip_password(Url) ->
- re:replace(Url,
- "http(s)?://([^:]+):[^@]+@(.*)$",
- "http\\1://\\2:*****@\\3",
- [{return, list}]).
+ couch_util:url_strip_password(Url);
+dbname(#db{name = Name}) ->
+ Name.
dbinfo(#http_db{} = Db) ->
{DbProps} = couch_rep_httpc:request(Db),
@@ -445,13 +450,20 @@ do_terminate(State) ->
false ->
[gen_server:reply(R, retry) || R <- OtherListeners]
end,
+ couch_task_status:update("Finishing"),
terminate_cleanup(State).
-terminate_cleanup(#state{source=Source, target=Target, stats=Stats}) ->
- couch_task_status:update("Finishing"),
- close_db(Target),
- close_db(Source),
- ets:delete(Stats).
+terminate_cleanup(State) ->
+ close_db(State#state.source),
+ close_db(State#state.target),
+ stop_db_update_notifier(State#state.source_db_update_notifier),
+ stop_db_update_notifier(State#state.target_db_update_notifier),
+ ets:delete(State#state.stats).
+
+stop_db_update_notifier(nil) ->
+ ok;
+stop_db_update_notifier(Notifier) ->
+ couch_db_update_notifier:stop(Notifier).
has_session_id(_SessionId, []) ->
false;
@@ -476,12 +488,7 @@ maybe_append_options(Options, Props) ->
make_replication_id({Props}, UserCtx) ->
%% funky algorithm to preserve backwards compatibility
- case couch_util:get_value(<<"use_hostname">>, Props, false) of
- true ->
- {ok, HostName} = inet:gethostname();
- false ->
- HostName = couch_config:get("replication", "hostname", "cloudant.com")
- end,
+ {ok, HostName} = inet:gethostname(),
% Port = mochiweb_socket_server:get(couch_httpd, port),
Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
@@ -504,22 +511,15 @@ make_replication_id({Props}, UserCtx) ->
maybe_add_trailing_slash(Url) ->
re:replace(Url, "[^/]$", "&/", [{return, list}]).
-get_rep_endpoint(UserCtx, {Props}) ->
- case couch_util:get_value(<<"url">>, Props) of
+get_rep_endpoint(_UserCtx, {Props}) ->
+ Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
+ {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
+ {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}),
+ case couch_util:get_value(<<"oauth">>, Auth) of
undefined ->
- Node = couch_util:get_value(<<"node">>, Props),
- Name = couch_util:get_value(<<"name">>, Props),
- {Node, Name, UserCtx};
- RawUrl ->
- Url = maybe_add_trailing_slash(RawUrl),
- {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
- {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}),
- case couch_util:get_value(<<"oauth">>, Auth) of
- undefined ->
- {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
- {OAuth} ->
- {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth}
- end
+ {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
+ {OAuth} ->
+ {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth}
end;
get_rep_endpoint(_UserCtx, <<"http://",_/binary>>=Url) ->
{remote, maybe_add_trailing_slash(Url), []};
@@ -533,43 +533,27 @@ open_replication_log(#http_db{}=Db, RepId) ->
Req = Db#http_db{resource=couch_util:url_encode(DocId)},
case couch_rep_httpc:request(Req) of
{[{<<"error">>, _}, {<<"reason">>, _}]} ->
- % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
+ ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
#doc{id=?l2b(DocId)};
Doc ->
- % ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
+ ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
couch_doc:from_json_obj(Doc)
end;
open_replication_log(Db, RepId) ->
DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
case couch_db:open_doc(Db, DocId, []) of
{ok, Doc} ->
- % ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
+ ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
Doc;
_ ->
- % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
+ ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
#doc{id=DocId}
end.
open_db(Props, UserCtx, ProxyParams) ->
open_db(Props, UserCtx, ProxyParams, false).
-open_db(<<"http://",_/binary>>=Url, _, ProxyParams, Create) ->
- open_remote_db({[{<<"url">>,Url}]}, ProxyParams, Create);
-open_db(<<"https://",_/binary>>=Url, _, ProxyParams, Create) ->
- open_remote_db({[{<<"url">>,Url}]}, ProxyParams, Create);
-open_db({Props}, UserCtx, ProxyParams, Create) ->
- case couch_util:get_value(<<"url">>, Props) of
- undefined ->
- Node = couch_util:get_value(<<"node">>, Props, node()),
- DbName = couch_util:get_value(<<"name">>, Props),
- open_local_db(Node, DbName, UserCtx, Create);
- _Url ->
- open_remote_db({Props}, ProxyParams, Create)
- end;
-open_db(<<DbName/binary>>, UserCtx, _ProxyParams, Create) ->
- open_local_db(node(), DbName, UserCtx, Create).
-
-open_remote_db({Props}, ProxyParams, CreateTarget) ->
+open_db({Props}, _UserCtx, ProxyParams, CreateTarget) ->
Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
{AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}),
{BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
@@ -580,33 +564,34 @@ open_remote_db({Props}, ProxyParams, CreateTarget) ->
auth = AuthProps,
headers = lists:ukeymerge(1, Headers, DefaultHeaders)
},
- Db = Db1#http_db{options = Db1#http_db.options ++ ProxyParams},
- couch_rep_httpc:db_exists(Db, CreateTarget).
-
-open_local_db(Node, DbName, UserCtx, Create) when is_binary(Node) ->
- try open_local_db(list_to_existing_atom(?b2l(Node)), DbName, UserCtx, Create)
- catch error:badarg ->
- ?LOG_ERROR("unknown replication node ~s", [Node]),
- throw({node_not_connected, Node}) end;
-open_local_db(Node, DbName, UserCtx, Create) when is_atom(Node) ->
- case catch gen_server:call({couch_server, Node}, {open, DbName, []}, infinity) of
- {ok, #db{} = Db} ->
- couch_db:monitor(Db),
- Db#db{fd_monitor = erlang:monitor(process, Db#db.fd)};
- {ok, MainPid} when is_pid(MainPid) ->
- {ok, Db} = couch_db:open_ref_counted(MainPid, UserCtx),
- couch_db:monitor(Db),
- Db;
- {not_found, no_db_file} when Create =:= false->
- throw({db_not_found, DbName});
- {not_found, no_db_file} ->
- ok = couch_httpd:verify_is_server_admin(UserCtx),
- couch_server:create(DbName, [{user_ctx, UserCtx}]);
- {'EXIT', {{nodedown, Node}, _Stack}} ->
- throw({node_not_connected, couch_util:to_binary(Node)});
- {'EXIT', {noproc, {gen_server,call,_}}} ->
- timer:sleep(1000),
- throw({noproc, couch_server, Node})
+ Db = Db1#http_db{
+ options = Db1#http_db.options ++ ProxyParams ++
+ couch_rep_httpc:ssl_options(Db1)
+ },
+ couch_rep_httpc:db_exists(Db, CreateTarget);
+open_db(<<"http://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
+ open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
+open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
+ open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
+open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) ->
+ try
+ case CreateTarget of
+ true ->
+ ok = couch_httpd:verify_is_server_admin(UserCtx),
+ couch_server:create(DbName, [{user_ctx, UserCtx}]);
+ false ->
+ ok
+ end,
+
+ case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
+ {ok, Db} ->
+ couch_db:monitor(Db),
+ Db;
+ {not_found, no_db_file} ->
+ throw({db_not_found, DbName})
+ end
+ catch throw:{unauthorized, _} ->
+ throw({unauthorized, DbName})
end.
schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) ->
@@ -628,6 +613,7 @@ do_checkpoint(State) ->
committed_seq = NewSeqNum,
start_seq = StartSeqNum,
history = OldHistory,
+ session_id = SessionId,
source_log = SourceLog,
target_log = TargetLog,
rep_starttime = ReplicationStartTime,
@@ -637,14 +623,8 @@ do_checkpoint(State) ->
} = State,
case commit_to_both(Source, Target, NewSeqNum) of
{SrcInstanceStartTime, TgtInstanceStartTime} ->
- ?LOG_DEBUG("recording a checkpoint for ~s -> ~s at source update_seq ~p"
- " of ~p", [dbname(Source), dbname(Target), NewSeqNum, seqnum(Source)]),
- SessionId = couch_uuids:new(),
- TargetNode = case Target of #db{main_pid=MainPid} ->
- erlang:node(MainPid);
- _ ->
- http
- end,
+ ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p",
+ [dbname(Source), dbname(Target), NewSeqNum]),
NewHistoryEntry = {[
{<<"session_id">>, SessionId},
{<<"start_time">>, list_to_binary(ReplicationStartTime)},
@@ -663,7 +643,6 @@ do_checkpoint(State) ->
NewRepHistory = {[
{<<"session_id">>, SessionId},
{<<"source_last_seq">>, NewSeqNum},
- {<<"target_node">>, TargetNode},
{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
]},
@@ -683,9 +662,7 @@ do_checkpoint(State) ->
"yourself?)", []),
State
end;
- Else ->
- ?LOG_INFO("wanted ~p, got ~p from commit_to_both", [
- {SrcInstanceStartTime, TgtInstanceStartTime}, Else]),
+ _Else ->
?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint",
[dbname(Source), dbname(Target)]),
#state{
@@ -717,31 +694,30 @@ commit_to_both(Source, Target, RequiredSeq) ->
{SrcCommitPid, Timestamp} ->
Timestamp;
{'EXIT', SrcCommitPid, {http_request_failed, _}} ->
- nil;
- {'EXIT', SrcCommitPid, {noproc, {gen_server, call, [_]}}} ->
- nil; % DB crashed, this should trigger a reboot
- {'EXIT', SrcCommitPid, Else} ->
- ?LOG_ERROR("new error code for crashed replication commit ~p", [Else]),
- nil
+ exit(replication_link_failure)
end,
{SourceStartTime, TargetStartTime}.
ensure_full_commit(#http_db{headers = Headers} = Target) ->
+ Headers1 = [
+ {"Content-Length", 0} |
+ couch_util:proplist_apply_field(
+ {"Content-Type", "application/json"}, Headers)
+ ],
Req = Target#http_db{
resource = "_ensure_full_commit",
method = post,
- headers = couch_util:proplist_apply_field({"Content-Type", "application/json"}, Headers)
+ headers = Headers1
},
{ResultProps} = couch_rep_httpc:request(Req),
true = couch_util:get_value(<<"ok">>, ResultProps),
couch_util:get_value(<<"instance_start_time">>, ResultProps);
-ensure_full_commit(#db{name=DbName, main_pid=Pid} = Target) ->
- TargetNode = erlang:node(Pid),
- {ok, NewDb} = rpc:call(TargetNode, couch_db, open_int, [DbName, []]),
+ensure_full_commit(Target) ->
+ {ok, NewDb} = couch_db:open_int(Target#db.name, []),
UpdateSeq = couch_db:get_update_seq(Target),
CommitSeq = couch_db:get_committed_update_seq(NewDb),
InstanceStartTime = NewDb#db.instance_start_time,
- catch couch_db:close(NewDb),
+ couch_db:close(NewDb),
if UpdateSeq > CommitSeq ->
?LOG_DEBUG("target needs a full commit: update ~p commit ~p",
[UpdateSeq, CommitSeq]),
@@ -753,11 +729,16 @@ ensure_full_commit(#db{name=DbName, main_pid=Pid} = Target) ->
end.
ensure_full_commit(#http_db{headers = Headers} = Source, RequiredSeq) ->
+ Headers1 = [
+ {"Content-Length", 0} |
+ couch_util:proplist_apply_field(
+ {"Content-Type", "application/json"}, Headers)
+ ],
Req = Source#http_db{
resource = "_ensure_full_commit",
method = post,
qs = [{seq, RequiredSeq}],
- headers = couch_util:proplist_apply_field({"Content-Type", "application/json"}, Headers)
+ headers = Headers1
},
{ResultProps} = couch_rep_httpc:request(Req),
case couch_util:get_value(<<"ok">>, ResultProps) of
@@ -801,11 +782,6 @@ up_to_date(Source, Seq) ->
couch_db:close(NewDb),
T.
-seqnum(#http_db{}) ->
- -1;
-seqnum(Db) ->
- Db#db.update_seq.
-
parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
parse_proxy_params(?b2l(ProxyUrl));
parse_proxy_params([]) ->
@@ -820,3 +796,27 @@ parse_proxy_params(ProxyUrl) ->
true ->
[{proxy_user, User}, {proxy_password, Passwd}]
end.
+
+source_db_update_notifier(#db{name = DbName}) ->
+ Server = self(),
+ {ok, Notifier} = couch_db_update_notifier:start_link(
+ fun({compacted, DbName1}) when DbName1 =:= DbName ->
+ ok = gen_server:cast(Server, reopen_source_db);
+ (_) ->
+ ok
+ end),
+ Notifier;
+source_db_update_notifier(_) ->
+ nil.
+
+target_db_update_notifier(#db{name = DbName}) ->
+ Server = self(),
+ {ok, Notifier} = couch_db_update_notifier:start_link(
+ fun({compacted, DbName1}) when DbName1 =:= DbName ->
+ ok = gen_server:cast(Server, reopen_target_db);
+ (_) ->
+ ok
+ end),
+ Notifier;
+target_db_update_notifier(_) ->
+ nil.
diff --git a/apps/couch/src/couch_rep_att.erl b/apps/couch/src/couch_rep_att.erl
index 476c64d4..72c723e8 100644
--- a/apps/couch/src/couch_rep_att.erl
+++ b/apps/couch/src/couch_rep_att.erl
@@ -105,7 +105,7 @@ validate_headers(_Req, 200, Headers) ->
MochiHeaders = mochiweb_headers:make(Headers),
{ok, mochiweb_headers:get_value("Content-Encoding", MochiHeaders)};
validate_headers(Req, Code, Headers) when Code > 299, Code < 400 ->
- Url = mochiweb_headers:get_value("Location",mochiweb_headers:make(Headers)),
+ Url = couch_rep_httpc:redirect_url(Headers, Req#http_db.url),
NewReq = couch_rep_httpc:redirected_request(Req, Url),
{ibrowse_req_id, ReqId} = couch_rep_httpc:request(NewReq),
receive {ibrowse_async_headers, ReqId, NewCode, NewHeaders} ->
diff --git a/apps/couch/src/couch_rep_changes_feed.erl b/apps/couch/src/couch_rep_changes_feed.erl
index 66696912..032f62a3 100644
--- a/apps/couch/src/couch_rep_changes_feed.erl
+++ b/apps/couch/src/couch_rep_changes_feed.erl
@@ -43,9 +43,10 @@ next(Server) ->
gen_server:call(Server, next_changes, infinity).
stop(Server) ->
- gen_server:call(Server, stop).
+ catch gen_server:call(Server, stop),
+ ok.
-init([_Parent, #http_db{}=Source, Since, PostProps] = Args) ->
+init([Parent, #http_db{}=Source, Since, PostProps]) ->
process_flag(trap_exit, true),
Feed = case couch_util:get_value(<<"continuous">>, PostProps, false) of
false ->
@@ -83,27 +84,32 @@ init([_Parent, #http_db{}=Source, Since, PostProps] = Args) ->
resource = "_changes",
qs = QS,
conn = Pid,
- options = [{stream_to, {self(), once}}, {response_format, binary}],
+ options = [{stream_to, {self(), once}}] ++
+ lists:keydelete(inactivity_timeout, 1, Source#http_db.options),
headers = Source#http_db.headers -- [{"Accept-Encoding", "gzip"}]
},
{ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req),
+ Args = [Parent, Req, Since, PostProps],
receive
{ibrowse_async_headers, ReqId, "200", _} ->
ibrowse:stream_next(ReqId),
{ok, #state{conn=Pid, last_seq=Since, reqid=ReqId, init_args=Args}};
{ibrowse_async_headers, ReqId, Code, Hdrs} when Code=="301"; Code=="302" ->
- catch ibrowse:stop_worker_process(Pid),
- Url2 = mochiweb_headers:get_value("Location", mochiweb_headers:make(Hdrs)),
- %% TODO use couch_httpc:request instead of start_http_request
- {Pid2, ReqId2} = start_http_request(Url2),
+ stop_link_worker(Pid),
+ Url2 = couch_rep_httpc:redirect_url(Hdrs, Req#http_db.url),
+ Req2 = couch_rep_httpc:redirected_request(Req, Url2),
+ Pid2 = couch_rep_httpc:spawn_link_worker_process(Req2),
+ Req3 = Req2#http_db{conn = Pid2},
+ {ibrowse_req_id, ReqId2} = couch_rep_httpc:request(Req3),
+ Args2 = [Parent, Req3, Since, PostProps],
receive {ibrowse_async_headers, ReqId2, "200", _} ->
- {ok, #state{conn=Pid2, last_seq=Since, reqid=ReqId2, init_args=Args}}
+ {ok, #state{conn=Pid2, last_seq=Since, reqid=ReqId2, init_args=Args2}}
after 30000 ->
{stop, changes_timeout}
end;
{ibrowse_async_headers, ReqId, "404", _} ->
- catch ibrowse:stop_worker_process(Pid),
+ stop_link_worker(Pid),
?LOG_INFO("source doesn't have _changes, trying _all_docs_by_seq", []),
Self = self(),
BySeqPid = spawn_link(fun() -> by_seq_loop(Self, Source, Since) end),
@@ -181,7 +187,7 @@ handle_cast(_Msg, State) ->
handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #state{reqid=Id}=State) ->
handle_headers(list_to_integer(Code), Hdrs, State);
-handle_info({ibrowse_async_response, Id, {error,connection_closed}},
+handle_info({ibrowse_async_response, Id, {error, sel_conn_closed}},
#state{reqid=Id}=State) ->
handle_retry(State);
@@ -198,16 +204,27 @@ handle_info({ibrowse_async_response_end, Id}, #state{reqid=Id} = State) ->
handle_info({'EXIT', From, normal}, #state{changes_loop=From} = State) ->
handle_feed_completion(State);
+handle_info({'EXIT', From, normal}, #state{conn=From, complete=true} = State) ->
+ {noreply, State};
+
handle_info({'EXIT', From, Reason}, #state{changes_loop=From} = State) ->
?LOG_ERROR("changes_loop died with reason ~p", [Reason]),
{stop, changes_loop_died, State};
-handle_info({'EXIT', _From, normal}, State) ->
- {noreply, State};
+handle_info({'EXIT', From, Reason}, State) ->
+ ?LOG_ERROR("changes loop, process ~p died with reason ~p", [From, Reason]),
+ {stop, {From, Reason}, State};
-handle_info(Msg, State) ->
- ?LOG_DEBUG("unexpected message at changes_feed ~p", [Msg]),
- {noreply, State}.
+handle_info(Msg, #state{init_args = InitArgs} = State) ->
+ case Msg of
+ changes_timeout ->
+ [_, #http_db{url = Url} | _] = InitArgs,
+ ?LOG_ERROR("changes loop timeout, no data received from ~s",
+ [couch_util:url_strip_password(Url)]);
+ _ ->
+ ?LOG_ERROR("changes loop received unexpected message ~p", [Msg])
+ end,
+ {stop, Msg, State}.
terminate(_Reason, State) ->
#state{
@@ -215,8 +232,7 @@ terminate(_Reason, State) ->
conn = Conn
} = State,
if is_pid(ChangesPid) -> exit(ChangesPid, stop); true -> ok end,
- if is_pid(Conn) -> catch ibrowse:stop_worker_process(Conn); true -> ok end,
- ok.
+ stop_link_worker(Conn).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -257,12 +273,17 @@ handle_next_changes(_From, State) ->
handle_headers(200, _, State) ->
maybe_stream_next(State),
{noreply, State};
-handle_headers(301, Hdrs, State) ->
- catch ibrowse:stop_worker_process(State#state.conn),
- Url = mochiweb_headers:get_value("Location", mochiweb_headers:make(Hdrs)),
- %% TODO use couch_httpc:request instead of start_http_request
- {Pid, ReqId} = start_http_request(Url),
- {noreply, State#state{conn=Pid, reqid=ReqId}};
+handle_headers(Code, Hdrs, #state{init_args = InitArgs} = State)
+ when Code =:= 301 ; Code =:= 302 ->
+ stop_link_worker(State#state.conn),
+ [Parent, #http_db{url = Url1} = Source, Since, PostProps] = InitArgs,
+ Url = couch_rep_httpc:redirect_url(Hdrs, Url1),
+ Source2 = couch_rep_httpc:redirected_request(Source, Url),
+ Pid2 = couch_rep_httpc:spawn_link_worker_process(Source2),
+ Source3 = Source2#http_db{conn = Pid2},
+ {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Source3),
+ InitArgs2 = [Parent, Source3, Since, PostProps],
+ {noreply, State#state{conn=Pid2, reqid=ReqId, init_args=InitArgs2}};
handle_headers(Code, Hdrs, State) ->
?LOG_ERROR("replicator changes feed failed with code ~s and Headers ~n~p",
[Code,Hdrs]),
@@ -367,20 +388,15 @@ maybe_stream_next(#state{reqid=nil}) ->
ok;
maybe_stream_next(#state{complete=false, count=N} = S) when N < ?BUFFER_SIZE ->
timer:cancel(get(timeout)),
- {ok, Timeout} = timer:exit_after(31000, changes_timeout),
+ {ok, Timeout} = timer:send_after(31000, changes_timeout),
put(timeout, Timeout),
ibrowse:stream_next(S#state.reqid);
maybe_stream_next(_) ->
timer:cancel(get(timeout)).
-start_http_request(RawUrl) ->
- Url = ibrowse_lib:parse_url(RawUrl),
- {ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port),
- Opts = [
- {stream_to, {self(), once}},
- {inactivity_timeout, 31000},
- {response_format, binary}
- ],
- {ibrowse_req_id, Id} =
- ibrowse:send_req_direct(Pid, RawUrl, [], get, [], Opts, infinity),
- {Pid, Id}.
+stop_link_worker(Conn) when is_pid(Conn) ->
+ unlink(Conn),
+ receive {'EXIT', Conn, _} -> ok after 0 -> ok end,
+ catch ibrowse:stop_worker_process(Conn);
+stop_link_worker(_) ->
+ ok.
diff --git a/apps/couch/src/couch_rep_httpc.erl b/apps/couch/src/couch_rep_httpc.erl
index 3b11b869..8153fdcf 100644
--- a/apps/couch/src/couch_rep_httpc.erl
+++ b/apps/couch/src/couch_rep_httpc.erl
@@ -15,7 +15,8 @@
-include_lib("ibrowse/include/ibrowse.hrl").
-export([db_exists/1, db_exists/2, full_url/1, request/1, redirected_request/2,
- spawn_worker_process/1, spawn_link_worker_process/1]).
+ redirect_url/2, spawn_worker_process/1, spawn_link_worker_process/1]).
+-export([ssl_options/1]).
request(#http_db{} = Req) ->
do_request(Req).
@@ -72,6 +73,7 @@ db_exists(Req, CanonicalUrl, CreateDB) ->
#http_db{
auth = Auth,
headers = Headers0,
+ options = Options,
url = Url
} = Req,
HeadersFun = fun(Method) ->
@@ -84,11 +86,13 @@ db_exists(Req, CanonicalUrl, CreateDB) ->
end,
case CreateDB of
true ->
- catch ibrowse:send_req(Url, HeadersFun(put), put);
+ Headers = [{"Content-Length", 0} | HeadersFun(put)],
+ catch ibrowse:send_req(Url, Headers, put, [], Options);
_Else -> ok
end,
- case catch ibrowse:send_req(Url, HeadersFun(head), head) of
+ case catch ibrowse:send_req(Url, HeadersFun(head), head, [], Options) of
{ok, "200", _, _} ->
+ config_http(CanonicalUrl),
Req#http_db{url = CanonicalUrl};
{ok, "301", RespHeaders, _} ->
RedirectUrl = redirect_url(RespHeaders, Req#http_db.url),
@@ -96,11 +100,26 @@ db_exists(Req, CanonicalUrl, CreateDB) ->
{ok, "302", RespHeaders, _} ->
RedirectUrl = redirect_url(RespHeaders, Req#http_db.url),
db_exists(Req#http_db{url = RedirectUrl}, CanonicalUrl);
+ {ok, "401", _, _} ->
+ throw({unauthorized, ?l2b(Url)});
Error ->
?LOG_DEBUG("DB at ~s could not be found because ~p", [Url, Error]),
throw({db_not_found, ?l2b(Url)})
end.
+config_http(Url) ->
+ #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url),
+ ok = ibrowse:set_max_sessions(Host, Port, list_to_integer(
+ couch_config:get("replicator", "max_http_sessions", "20"))),
+ ok = ibrowse:set_max_pipeline_size(Host, Port, list_to_integer(
+ couch_config:get("replicator", "max_http_pipeline_size", "50"))),
+ ok = couch_config:register(
+ fun("replicator", "max_http_sessions", MaxSessions) ->
+ ibrowse:set_max_sessions(Host, Port, list_to_integer(MaxSessions));
+ ("replicator", "max_http_pipeline_size", PipeSize) ->
+ ibrowse:set_max_pipeline_size(Host, Port, list_to_integer(PipeSize))
+ end).
+
redirect_url(RespHeaders, OrigUrl) ->
MochiHeaders = mochiweb_headers:make(RespHeaders),
RedUrl = mochiweb_headers:get_value("Location", MochiHeaders),
@@ -167,7 +186,7 @@ process_response({error, Reason}, Req) ->
pause = Pause
} = Req,
ShortReason = case Reason of
- connection_closed ->
+ sel_conn_closed ->
connection_closed;
{'EXIT', {noproc, _}} ->
noproc;
@@ -203,8 +222,7 @@ spawn_worker_process(Req) ->
Pid.
spawn_link_worker_process(Req) ->
- Url = ibrowse_lib:parse_url(Req#http_db.url),
- {ok, Pid} = ibrowse_http_client:start_link(Url),
+ {ok, Pid} = ibrowse:spawn_link_worker_process(Req#http_db.url),
Pid.
maybe_decompress(Headers, Body) ->
@@ -243,3 +261,35 @@ oauth_header(Url, QS, Action, Props) ->
Params = oauth:signed_params(Method, Url, QSL, Consumer, Token, TokenSecret)
-- QSL,
{"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}.
+
+ssl_options(#http_db{url = Url}) ->
+ case ibrowse_lib:parse_url(Url) of
+ #url{protocol = https} ->
+ Depth = list_to_integer(
+ couch_config:get("replicator", "ssl_certificate_max_depth", "3")
+ ),
+ SslOpts = [{depth, Depth} |
+ case couch_config:get("replicator", "verify_ssl_certificates") of
+ "true" ->
+ ssl_verify_options(true);
+ _ ->
+ ssl_verify_options(false)
+ end],
+ [{is_ssl, true}, {ssl_options, SslOpts}];
+ #url{protocol = http} ->
+ []
+ end.
+
+ssl_verify_options(Value) ->
+ ssl_verify_options(Value, erlang:system_info(otp_release)).
+
+ssl_verify_options(true, OTPVersion) when OTPVersion >= "R14" ->
+ CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"),
+ [{verify, verify_peer}, {cacertfile, CAFile}];
+ssl_verify_options(false, OTPVersion) when OTPVersion >= "R14" ->
+ [{verify, verify_none}];
+ssl_verify_options(true, _OTPVersion) ->
+ CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"),
+ [{verify, 2}, {cacertfile, CAFile}];
+ssl_verify_options(false, _OTPVersion) ->
+ [{verify, 0}].
diff --git a/apps/couch/src/couch_rep_missing_revs.erl b/apps/couch/src/couch_rep_missing_revs.erl
index 1eff6774..9809ca5e 100644
--- a/apps/couch/src/couch_rep_missing_revs.erl
+++ b/apps/couch/src/couch_rep_missing_revs.erl
@@ -24,7 +24,6 @@
-record (state, {
changes_loop,
changes_from = nil,
- target,
parent,
complete = false,
count = 0,
@@ -44,11 +43,11 @@ next(Server) ->
stop(Server) ->
gen_server:call(Server, stop).
-init([Parent, Target, ChangesFeed, _PostProps]) ->
+init([Parent, _Target, ChangesFeed, _PostProps]) ->
process_flag(trap_exit, true),
Self = self(),
- Pid = spawn_link(fun() -> changes_loop(Self, ChangesFeed, Target) end),
- {ok, #state{changes_loop=Pid, target=Target, parent=Parent}}.
+ Pid = spawn_link(fun() -> changes_loop(Self, ChangesFeed, Parent) end),
+ {ok, #state{changes_loop=Pid, parent=Parent}}.
handle_call({add_missing_revs, {HighSeq, Revs}}, From, State) ->
State#state.parent ! {update_stats, missing_revs, length(Revs)},
@@ -133,15 +132,16 @@ handle_changes_loop_exit(normal, State) ->
handle_changes_loop_exit(Reason, State) ->
{stop, Reason, State#state{changes_loop=nil}}.
-changes_loop(OurServer, SourceChangesServer, Target) ->
+changes_loop(OurServer, SourceChangesServer, Parent) ->
case couch_rep_changes_feed:next(SourceChangesServer) of
complete ->
exit(normal);
Changes ->
+ {ok, Target} = gen_server:call(Parent, get_target_db, infinity),
MissingRevs = get_missing_revs(Target, Changes),
gen_server:call(OurServer, {add_missing_revs, MissingRevs}, infinity)
end,
- changes_loop(OurServer, SourceChangesServer, Target).
+ changes_loop(OurServer, SourceChangesServer, Parent).
get_missing_revs(#http_db{}=Target, Changes) ->
Transform = fun({Props}) ->
diff --git a/apps/couch/src/couch_rep_reader.erl b/apps/couch/src/couch_rep_reader.erl
index 46633994..a7ae45a8 100644
--- a/apps/couch/src/couch_rep_reader.erl
+++ b/apps/couch/src/couch_rep_reader.erl
@@ -20,12 +20,9 @@
-import(couch_util, [url_encode/1]).
-define (BUFFER_SIZE, 1000).
--define (MAX_CONCURRENT_REQUESTS, 10).
--define (MAX_CONNECTIONS, 20).
--define (MAX_PIPELINE_SIZE, 50).
+-define (MAX_CONCURRENT_REQUESTS, 100).
-include("couch_db.hrl").
--include_lib("ibrowse/include/ibrowse.hrl").
-record (state, {
parent,
@@ -53,14 +50,9 @@ next(Pid) ->
init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) ->
process_flag(trap_exit, true),
- if is_record(Source, http_db) ->
- #url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url),
- ibrowse:set_max_sessions(Host, Port, ?MAX_CONNECTIONS),
- ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE);
- true -> ok end,
Self = self(),
ReaderLoop = spawn_link(
- fun() -> reader_loop(Self, Source, MissingRevs_or_DocIds) end
+ fun() -> reader_loop(Self, Parent, Source, MissingRevs_or_DocIds) end
),
MissingRevs = case MissingRevs_or_DocIds of
Pid when is_pid(Pid) ->
@@ -230,7 +222,7 @@ update_sequence_lists(Seq, State) ->
opened_seqs = Opened
}.
-open_doc_revs(#http_db{} = DbS, DocId, Revs) ->
+open_doc_revs(#http_db{url = Url} = DbS, DocId, Revs) ->
%% all this logic just splits up revision lists that are too long for
%% MochiWeb into multiple requests
BaseQS = [{revs,true}, {latest,true}, {att_encoding_info,true}],
@@ -246,36 +238,48 @@ open_doc_revs(#http_db{} = DbS, DocId, Revs) ->
JsonResults = lists:flatten([couch_rep_httpc:request(R) || R <- Requests]),
Transform =
- fun({[{<<"missing">>, Rev}]}) ->
- {{not_found, missing}, couch_doc:parse_rev(Rev)};
- ({[{<<"ok">>, Json}]}) ->
+ fun({[{<<"ok">>, Json}]}, Acc) ->
#doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json),
- Doc#doc{atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]}
+ Doc1 = Doc#doc{
+ atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]
+ },
+ [Doc1 | Acc];
+ ({ErrorProps}, Acc) ->
+ Err = couch_util:get_value(<<"error">>, ErrorProps,
+ ?JSON_ENCODE({ErrorProps})),
+ ?LOG_ERROR("Replicator: error accessing doc ~s at ~s, reason: ~s",
+ [DocId, couch_util:url_strip_password(Url), Err]),
+ Acc
end,
- [Transform(Result) || Result <- JsonResults].
+ lists:reverse(lists:foldl(Transform, [], JsonResults)).
-open_doc(#http_db{} = DbS, DocId) ->
+open_doc(#http_db{url = Url} = DbS, DocId) ->
% get latest rev of the doc
Req = DbS#http_db{
resource=url_encode(DocId),
qs=[{att_encoding_info, true}]
},
- case couch_rep_httpc:request(Req) of
- {[{<<"error">>,<<"not_found">>}, {<<"reason">>,<<"missing">>}]} ->
- [];
- Json ->
+ {Props} = Json = couch_rep_httpc:request(Req),
+ case couch_util:get_value(<<"_id">>, Props) of
+ Id when is_binary(Id) ->
#doc{id=Id, revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json),
[Doc#doc{
atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]
- }]
+ }];
+ undefined ->
+ Err = couch_util:get_value(<<"error">>, Props, ?JSON_ENCODE(Json)),
+ ?LOG_ERROR("Replicator: error accessing doc ~s at ~s, reason: ~s",
+ [DocId, couch_util:url_strip_password(Url), Err]),
+ []
end.
-reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) ->
- case Source of
+reader_loop(ReaderServer, Parent, Source1, DocIds) when is_list(DocIds) ->
+ case Source1 of
#http_db{} ->
[gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil},
infinity) || Id <- DocIds];
_LocalDb ->
+ {ok, Source} = gen_server:call(Parent, get_source_db, infinity),
Docs = lists:foldr(fun(Id, Acc) ->
case couch_db:open_doc(Source, Id) of
{ok, Doc} ->
@@ -288,7 +292,7 @@ reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) ->
end,
exit(complete);
-reader_loop(ReaderServer, Source, MissingRevsServer) ->
+reader_loop(ReaderServer, Parent, Source, MissingRevsServer) ->
case couch_rep_missing_revs:next(MissingRevsServer) of
complete ->
exit(complete);
@@ -301,22 +305,23 @@ reader_loop(ReaderServer, Source, MissingRevsServer) ->
#http_db{} ->
[gen_server:call(ReaderServer, {open_remote_doc, Id, Seq, Revs},
infinity) || {Id,Seq,Revs} <- SortedIdsRevs],
- reader_loop(ReaderServer, Source, MissingRevsServer);
+ reader_loop(ReaderServer, Parent, Source, MissingRevsServer);
_Local ->
- Source2 = maybe_reopen_db(Source, HighSeq),
+ {ok, Source1} = gen_server:call(Parent, get_source_db, infinity),
+ Source2 = maybe_reopen_db(Source1, HighSeq),
lists:foreach(fun({Id,Seq,Revs}) ->
{ok, Docs} = couch_db:open_doc_revs(Source2, Id, Revs, [latest]),
JustTheDocs = [Doc || {ok, Doc} <- Docs],
gen_server:call(ReaderServer, {add_docs, Seq, JustTheDocs},
infinity)
end, SortedIdsRevs),
- reader_loop(ReaderServer, Source2, MissingRevsServer)
+ couch_db:close(Source2),
+ reader_loop(ReaderServer, Parent, Source2, MissingRevsServer)
end
end.
maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq ->
{ok, NewDb} = couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]),
- couch_db:close(Db),
NewDb;
maybe_reopen_db(Db, _HighSeq) ->
Db.
diff --git a/apps/couch/src/couch_rep_writer.erl b/apps/couch/src/couch_rep_writer.erl
index dd6396fd..cf98ccfb 100644
--- a/apps/couch/src/couch_rep_writer.erl
+++ b/apps/couch/src/couch_rep_writer.erl
@@ -16,10 +16,10 @@
-include("couch_db.hrl").
-start_link(Parent, Target, Reader, _PostProps) ->
- {ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}.
+start_link(Parent, _Target, Reader, _PostProps) ->
+ {ok, spawn_link(fun() -> writer_loop(Parent, Reader) end)}.
-writer_loop(Parent, Reader, Target) ->
+writer_loop(Parent, Reader) ->
case couch_rep_reader:next(Reader) of
{complete, nil} ->
ok;
@@ -28,6 +28,7 @@ writer_loop(Parent, Reader, Target) ->
ok;
{HighSeq, Docs} ->
DocCount = length(Docs),
+ {ok, Target} = gen_server:call(Parent, get_target_db, infinity),
try write_docs(Target, Docs) of
{ok, []} ->
Parent ! {update_stats, docs_written, DocCount};
@@ -48,7 +49,7 @@ writer_loop(Parent, Reader, Target) ->
end,
couch_rep_att:cleanup(),
couch_util:should_flush(),
- writer_loop(Parent, Reader, Target)
+ writer_loop(Parent, Reader)
end.
write_docs(#http_db{} = Db, Docs) ->
diff --git a/apps/couch/src/couch_util.erl b/apps/couch/src/couch_util.erl
index 3a6e92c5..adcb4450 100644
--- a/apps/couch/src/couch_util.erl
+++ b/apps/couch/src/couch_util.erl
@@ -27,6 +27,7 @@
-export([get_value/2, get_value/3]).
-export([md5/1, md5_init/0, md5_update/2, md5_final/1]).
-export([reorder_results/2]).
+-export([url_strip_password/1]).
-include("couch_db.hrl").
-include_lib("kernel/include/file.hrl").
@@ -417,8 +418,8 @@ compressible_att_type(MimeType) ->
),
lists:any(
fun(TypeExp) ->
- Regexp = "^\\s*" ++
- re:replace(TypeExp, "\\*", ".*", [{return, list}]) ++ "\\s*$",
+ Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"),
+ "(?:\\s*;.*?)?\\s*", $$],
case re:run(MimeType, Regexp, [caseless]) of
{match, _} ->
true;
@@ -452,3 +453,9 @@ reorder_results(Keys, SortedResults) when length(Keys) < 100 ->
reorder_results(Keys, SortedResults) ->
KeyDict = dict:from_list(SortedResults),
[dict:fetch(Key, KeyDict) || Key <- Keys].
+
+url_strip_password(Url) ->
+ re:replace(Url,
+ "http(s)?://([^:]+):[^@]+@(.*)$",
+ "http\\1://\\2:*****@\\3",
+ [{return, list}]).
diff --git a/apps/couch/src/couch_view.erl b/apps/couch/src/couch_view.erl
index 9de86b82..8dca17da 100644
--- a/apps/couch/src/couch_view.erl
+++ b/apps/couch/src/couch_view.erl
@@ -30,11 +30,9 @@ start_link() ->
gen_server:start_link({local, couch_view}, couch_view, [], []).
get_temp_updater(DbName, Language, DesignOptions, MapSrc, RedSrc) ->
- % make temp group
- % do we need to close this db?
- {ok, _Db, Group} =
+ {ok, Group} =
couch_view_group:open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc),
- case gen_server:call(couch_view, {get_group_server, DbName, Group}) of
+ case gen_server:call(couch_view, {get_group_server, DbName, Group}, infinity) of
{ok, Pid} ->
Pid;
Error ->
@@ -42,10 +40,9 @@ get_temp_updater(DbName, Language, DesignOptions, MapSrc, RedSrc) ->
end.
get_group_server(DbName, GroupId) ->
- % get signature for group
case couch_view_group:open_db_group(DbName, GroupId) of
{ok, Group} ->
- case gen_server:call(couch_view, {get_group_server, DbName, Group}) of
+ case gen_server:call(couch_view, {get_group_server, DbName, Group}, infinity) of
{ok, Pid} ->
Pid;
Error ->
diff --git a/apps/couch/src/couch_view_compactor.erl b/apps/couch/src/couch_view_compactor.erl
index f56325a4..43db9036 100644
--- a/apps/couch/src/couch_view_compactor.erl
+++ b/apps/couch/src/couch_view_compactor.erl
@@ -53,18 +53,22 @@ compact_group(Group, EmptyGroup) ->
TaskName = <<DbName/binary, ShortName/binary>>,
couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>),
- Fun = fun(KV, {Bt, Acc, TotalCopied}) ->
+ Fun = fun({DocId, _ViewIdKeys} = KV, {Bt, Acc, TotalCopied, LastId}) ->
+ if DocId =:= LastId -> % COUCHDB-999
+ Msg = "Duplicates of ~s detected in ~s ~s - rebuild required",
+ exit(io_lib:format(Msg, [DocId, DbName, GroupId]));
+ true -> ok end,
if TotalCopied rem 10000 =:= 0 ->
couch_task_status:update("Copied ~p of ~p Ids (~p%)",
[TotalCopied, Count, (TotalCopied*100) div Count]),
{ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])),
- {ok, {Bt2, [], TotalCopied+1}};
+ {ok, {Bt2, [], TotalCopied+1, DocId}};
true ->
- {ok, {Bt, [KV|Acc], TotalCopied+1}}
+ {ok, {Bt, [KV|Acc], TotalCopied+1, DocId}}
end
end,
- {ok, _, {Bt3, Uncopied, _Total}} = couch_btree:foldl(IdBtree, Fun,
- {EmptyIdBtree, [], 0}),
+ {ok, _, {Bt3, Uncopied, _Total, _LastId}} = couch_btree:foldl(IdBtree, Fun,
+ {EmptyIdBtree, [], 0, nil}),
{ok, NewIdBtree} = couch_btree:add(Bt3, lists:reverse(Uncopied)),
NewViews = lists:map(fun({View, EmptyView}) ->
diff --git a/apps/couch/src/couch_view_group.erl b/apps/couch/src/couch_view_group.erl
index 377e7516..730db185 100644
--- a/apps/couch/src/couch_view_group.erl
+++ b/apps/couch/src/couch_view_group.erl
@@ -74,7 +74,7 @@ start_link(InitArgs) ->
end.
% init creates a closure which spawns the appropriate view_updater.
-init({{_, DbName, _}=InitArgs, ReturnPid, Ref}) ->
+init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) ->
process_flag(trap_exit, true),
case prepare_group(InitArgs, false) of
{ok, #group{fd=Fd, current_seq=Seq}=Group} ->
@@ -86,12 +86,9 @@ init({{_, DbName, _}=InitArgs, ReturnPid, Ref}) ->
ignore;
_ ->
try couch_db:monitor(Db) after couch_db:close(Db) end,
- Owner = self(),
- Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end),
{ok, #group_state{
db_name= DbName,
init_args=InitArgs,
- updater_pid = Pid,
group=Group#group{dbname=DbName},
ref_counter=erlang:monitor(process,Fd)}}
end;
@@ -178,6 +175,7 @@ handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup},
group = #group{name=GroupId, fd=OldFd, sig=GroupSig},
init_args = {RootDir, DbName, _},
updater_pid = UpdaterPid,
+ compactor_pid = CompactorPid,
ref_counter = RefCounter
} = State,
@@ -199,6 +197,8 @@ handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup},
end,
%% cleanup old group
+ unlink(CompactorPid),
+ receive {'EXIT', CompactorPid, normal} -> ok after 0 -> ok end,
unlink(OldFd),
erlang:demonitor(RefCounter),
@@ -426,8 +426,8 @@ open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc) ->
def=MapSrc,
reduce_funs= if RedSrc==[] -> []; true -> [{<<"_temp">>, RedSrc}] end,
options=DesignOptions},
-
- {ok, Db, set_view_sig(#group{name = <<"_temp">>, views=[View],
+ couch_db:close(Db),
+ {ok, set_view_sig(#group{name = <<"_temp">>, views=[View],
def_lang=Language, design_options=DesignOptions})};
Error ->
Error