diff options
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r-- | src/couchdb/couch_rep.erl | 125 |
1 files changed, 67 insertions, 58 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index f7aaa67c..0e172436 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -16,6 +16,8 @@ -export([replicate/2, replicate/3]). +url_encode(Bin) when is_binary(Bin) -> + url_encode(binary_to_list(Bin)); url_encode([H|T]) -> if H >= $a, $z >= H -> @@ -56,8 +58,10 @@ replicate(Source, Target, Options) -> replicate2(Source, DbSrc, Target, DbTgt, Options) -> {ok, HostName} = inet:gethostname(), - - RepRecKey = ?LOCAL_DOC_PREFIX ++ HostName ++ ":" ++ Source ++ ":" ++ Target, + HostNameBin = list_to_binary(HostName), + RepRecKey = <<?LOCAL_DOC_PREFIX, HostNameBin/binary, + ":", Source/binary, ":", Target/binary>>, + StartTime = httpd_util:rfc1123_date(), case proplists:get_value(full, Options, false) @@ -82,14 +86,14 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) -> end end, - #doc{body={obj,OldRepHistoryProps}} = RepRecSrc, - #doc{body={obj,OldRepHistoryPropsTrg}} = RepRecTgt, + #doc{body={OldRepHistoryProps}} = RepRecSrc, + #doc{body={OldRepHistoryPropsTrg}} = RepRecTgt, SeqNum = case OldRepHistoryProps == OldRepHistoryPropsTrg of true -> % if the records are identical, then we have a valid replication history - proplists:get_value("source_last_seq", OldRepHistoryProps, 0); + proplists:get_value(<<"source_last_seq">>, OldRepHistoryProps, 0); false -> ?LOG_INFO("Replication records differ. " "Performing full replication instead of incremental.", []), @@ -97,26 +101,26 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) -> 0 end, - {NewSeqNum, Stats} = pull_rep(DbTgt, DbSrc, SeqNum), + {NewSeqNum, Stats} = pull_rep(DbTgt, DbSrc, SeqNum), case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of true -> % nothing changed, don't record results - {ok, {obj, OldRepHistoryProps}}; + {ok, {OldRepHistoryProps}}; false -> HistEntries =[ - {obj, - [{"start_time", StartTime}, - {"end_time", httpd_util:rfc1123_date()}, - {"start_last_seq", SeqNum}, - {"end_last_seq", NewSeqNum} | Stats]} - | tuple_to_list(proplists:get_value("history", OldRepHistoryProps, {}))], + { + [{start_time, list_to_binary(StartTime)}, + {end_time, list_to_binary(httpd_util:rfc1123_date())}, + {start_last_seq, SeqNum}, + {end_last_seq, NewSeqNum} | Stats]} + | proplists:get_value("history", OldRepHistoryProps, [])], % something changed, record results NewRepHistory = - {obj, - [{"session_id", couch_util:new_uuid()}, - {"source_last_seq", NewSeqNum}, - {"history", list_to_tuple(lists:sublist(HistEntries, 50))}]}, + { + [{session_id, couch_util:new_uuid()}, + {source_last_seq, NewSeqNum}, + {history, lists:sublist(HistEntries, 50)}]}, {ok, _} = update_doc(DbSrc, RepRecSrc#doc{body=NewRepHistory}, []), {ok, _} = update_doc(DbTgt, RepRecTgt#doc{body=NewRepHistory}, []), @@ -165,6 +169,7 @@ get_missing_revs_loop(DbTarget, OpenDocsPid, RevsChecked, MissingFound) -> receive {Src, Id, Revs} -> Src ! got_it, + MissingRevs = case get_missing_revs(DbTarget, [{Id, Revs}]) of {ok, [{Id, MissingRevs0}]} -> @@ -179,8 +184,8 @@ get_missing_revs_loop(DbTarget, OpenDocsPid, RevsChecked, MissingFound) -> RevsChecked + length(Revs), MissingFound + length(MissingRevs)); {Src, shutdown} -> - Src ! {done, self(), [{"missing_checked", RevsChecked}, - {"missing_found", MissingFound}]} + Src ! {done, self(), [{missing_checked, RevsChecked}, + {missing_found, MissingFound}]} end. @@ -195,7 +200,7 @@ open_doc_revs_loop(DbSource, SaveDocsPid, DocsRead) -> SaveDocsPid ! {self(), docs, Docs}, open_doc_revs_loop(DbSource, SaveDocsPid, DocsRead + length(Docs)); {Src, shutdown} -> - Src ! {done, self(), [{"docs_read", DocsRead}]} + Src ! {done, self(), [{docs_read, DocsRead}]} end. @@ -207,7 +212,7 @@ save_docs_loop(DbTarget, DocsWritten) -> ok = save_docs(DbTarget, Docs, []), save_docs_loop(DbTarget, DocsWritten + length(Docs)); {Src, shutdown} -> - Src ! {done, self(), [{"docs_written", DocsWritten}]} + Src ! {done, self(), [{docs_written, DocsWritten}]} end. @@ -224,12 +229,12 @@ do_http_request(Url, Action, JsonBody) -> [] -> {Url, []}; _ -> - {Url, [], "application/json; charset=utf-8", lists:flatten(cjson:encode(JsonBody))} + {Url, [], "application/json; charset=utf-8", iolist_to_binary(?JSON_ENCODE(JsonBody))} end, {ok, {{_, ResponseCode,_},_Headers, ResponseBody}} = http:request(Action, Request, [], []), if ResponseCode >= 200, ResponseCode < 500 -> - cjson:decode(ResponseBody) + ?JSON_DECODE(ResponseBody) end. enum_docs0(_InFun, [], Acc) -> @@ -240,17 +245,25 @@ enum_docs0(InFun, [DocInfo | Rest], Acc) -> {stop, Acc2} -> Acc2 end. -open_db("http" ++ DbName)-> - case lists:last(DbName) of +fix_url(UrlBin) -> + Url = binary_to_list(UrlBin), + case lists:last(Url) of $/ -> - {ok, "http" ++ DbName}; + {ok, Url}; _ -> - {ok, "http" ++ DbName ++ "/"} - end; + {ok, Url ++ "/"} + end. + +open_db(<<"http://", _/binary>>=UrlBin)-> + fix_url(UrlBin); +open_db(<<"https://", _/binary>>=UrlBin)-> + fix_url(UrlBin); open_db(DbName)-> couch_db:open(DbName, []). -close_db("http" ++ _)-> +close_db("http://" ++ _)-> + ok; +close_db("https://" ++ _)-> ok; close_db(DbName)-> couch_db:close(DbName). @@ -258,20 +271,20 @@ close_db(DbName)-> enum_docs_since(DbUrl, StartSeq, InFun, InAcc) when is_list(DbUrl) -> Url = DbUrl ++ "_all_docs_by_seq?count=100&startkey=" ++ integer_to_list(StartSeq), - {obj, Results} = do_http_request(Url, get), + {Results} = do_http_request(Url, get), DocInfoList= - lists:map(fun({obj, RowInfoList}) -> - {obj, RowValueProps} = proplists:get_value("value", RowInfoList), + lists:map(fun({RowInfoList}) -> + {RowValueProps} = proplists:get_value(<<"value">>, RowInfoList), #doc_info{ - id=proplists:get_value("id", RowInfoList), - rev=proplists:get_value("rev", RowValueProps), - update_seq = proplists:get_value("key", RowInfoList), + id=proplists:get_value(<<"id">>, RowInfoList), + rev=proplists:get_value(<<"rev">>, RowValueProps), + update_seq = proplists:get_value(<<"key">>, RowInfoList), conflict_revs = - tuple_to_list(proplists:get_value("conflicts", RowValueProps, {})), + proplists:get_value(<<"conflicts">>, RowValueProps, []), deleted_conflict_revs = - tuple_to_list(proplists:get_value("deleted_conflicts", RowValueProps, {})), - deleted = proplists:get_value("deleted", RowValueProps, false)} - end, tuple_to_list(proplists:get_value("rows", Results))), + proplists:get_value(<<"deleted_conflicts">>, RowValueProps, []), + deleted = proplists:get_value(<<"deleted">>, RowValueProps, false)} + end, proplists:get_value(<<"rows">>, Results)), case DocInfoList of [] -> {ok, InAcc}; @@ -284,22 +297,18 @@ enum_docs_since(DbSource, StartSeq, Fun, Acc) -> couch_db:enum_docs_since(DbSource, StartSeq, Fun, Acc). get_missing_revs(DbUrl, DocIdRevsList) when is_list(DbUrl) -> - JsonDocIdRevsList = {obj, - [{Id, list_to_tuple(RevList)} || {Id, RevList} <- DocIdRevsList]}, - {obj, ResponseMembers} = - do_http_request(DbUrl ++ "_missing_revs", - post, JsonDocIdRevsList), - {obj, DocMissingRevsList} = proplists:get_value("missing_revs", ResponseMembers), - {ok, [{Id, tuple_to_list(MissingRevs)} || {Id, MissingRevs} <- DocMissingRevsList]}; + {ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, {DocIdRevsList}), + {DocMissingRevsList} = proplists:get_value(<<"missing_revs">>, ResponseMembers), + {ok, DocMissingRevsList}; get_missing_revs(Db, DocId) -> couch_db:get_missing_revs(Db, DocId). update_doc(DbUrl, #doc{id=DocId}=Doc, _Options) when is_list(DbUrl) -> Url = DbUrl ++ url_encode(DocId), - {obj, ResponseMembers} = + {ResponseMembers} = do_http_request(Url, put, couch_doc:to_json_obj(Doc, [revs,attachments])), - RevId = proplists:get_value("_rev", ResponseMembers), + RevId = proplists:get_value(<<"_rev">>, ResponseMembers), {ok, RevId}; update_doc(Db, Doc, Options) -> couch_db:update_doc(Db, Doc, Options). @@ -308,9 +317,9 @@ save_docs(_, [], _) -> ok; save_docs(DbUrl, Docs, []) when is_list(DbUrl) -> JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs], - {obj, Returned} = - do_http_request(DbUrl ++ "_bulk_docs", post, {obj, [{new_edits, false}, {docs, list_to_tuple(JsonDocs)}]}), - true = proplists:get_value("ok", Returned), + {Returned} = + do_http_request(DbUrl ++ "_bulk_docs", post, {[{new_edits, false}, {docs, JsonDocs}]}), + true = proplists:get_value(<<"ok">>, Returned), ok; save_docs(Db, Docs, Options) -> couch_db:save_docs(Db, Docs, Options). @@ -318,8 +327,8 @@ save_docs(Db, Docs, Options) -> open_doc(DbUrl, DocId, []) when is_list(DbUrl) -> case do_http_request(DbUrl ++ url_encode(DocId), get) of - {obj, [{"error", ErrId}, {"reason", Reason}]} -> - {list_to_atom(ErrId), Reason}; + {[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} -> % binaries? + {list_to_atom(binary_to_list(ErrId)), Reason}; Doc -> {ok, couch_doc:from_json_obj(Doc)} end; @@ -333,16 +342,16 @@ open_doc_revs(DbUrl, DocId, Revs, Options) when is_list(DbUrl) -> % latest is only option right now "latest=true" end, Options), - RevsQueryStrs = lists:flatten(cjson:encode(list_to_tuple(Revs))), - Url = DbUrl ++ DocId ++ "?" ++ couch_util:implode(["revs=true", "attachments=true", "open_revs=" ++ RevsQueryStrs ] ++ QueryOptionStrs, "&"), + RevsQueryStrs = lists:flatten(?JSON_ENCODE(Revs)), + Url = DbUrl ++ binary_to_list(DocId) ++ "?" ++ couch_util:implode(["revs=true", "attachments=true", "open_revs=" ++ RevsQueryStrs ] ++ QueryOptionStrs, "&"), JsonResults = do_http_request(Url, get, []), Results = lists:map( - fun({obj, [{"missing", Rev}]}) -> + fun({[{<<"missing">>, Rev}]}) -> {{not_found, missing}, Rev}; - ({obj, [{"ok", JsonDoc}]}) -> + ({[{<<"ok">>, JsonDoc}]}) -> {ok, couch_doc:from_json_obj(JsonDoc)} - end, tuple_to_list(JsonResults)), + end, JsonResults), {ok, Results}; open_doc_revs(Db, DocId, Revs, Options) -> couch_db:open_doc_revs(Db, DocId, Revs, Options). |