1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
-module(mem3_rep).
-export([go/2, changes_enumerator/2, make_local_id/2]).
-include("mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-define(CTX, #user_ctx{roles = [<<"_admin">>]}).
-record(acc, {revcount = 0, infos = [], seq, localid, source, target}).
go(#shard{} = Source, #shard{} = Target) ->
LocalId = make_local_id(Source, Target),
{ok, Db} = couch_db:open(Source#shard.name, [{user_ctx,?CTX}]),
try go(Db, Target, LocalId) after couch_db:close(Db) end.
go(#db{} = Db, #shard{} = Target, LocalId) ->
Seq = calculate_start_seq(Db, Target, LocalId),
Acc0 = #acc{source=Db, target=Target, seq=Seq, localid=LocalId},
Fun = fun ?MODULE:changes_enumerator/2,
{ok, AccOut} = couch_db:changes_since(Db, all_docs, Seq, Fun, [], Acc0),
{ok, _} = replicate_batch(AccOut).
make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}) ->
S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SourceNode))),
T = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(TargetNode))),
<<"_local/shard-sync-", S/binary, "-", T/binary>>.
changes_enumerator(DocInfo, #acc{revcount = C} = Acc) when C >= 99 ->
Seq = DocInfo#doc_info.high_seq,
replicate_batch(Acc#acc{seq = Seq, infos = [DocInfo | Acc#acc.infos]});
changes_enumerator(DocInfo, #acc{revcount = C, infos = Infos} = Acc) ->
RevCount = C + length(DocInfo#doc_info.revs),
Seq = DocInfo#doc_info.high_seq,
{ok, Acc#acc{seq = Seq, revcount = RevCount, infos = [DocInfo | Infos]}}.
replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
case find_missing_revs(Acc) of
[] ->
ok;
Missing ->
ok = save_on_target(Node, Name, open_docs(Acc#acc.source, Missing))
end,
update_locals(Acc),
{ok, Acc#acc{revcount=0, infos=[]}}.
find_missing_revs(Acc) ->
#acc{target = #shard{node=Node, name=Name}, infos = Infos} = Acc,
IdsRevs = lists:map(fun(#doc_info{id=Id, revs=RevInfos}) ->
{Id, [R || #rev_info{rev=R} <- RevInfos]}
end, Infos),
rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}).
open_docs(Db, Missing) ->
lists:flatmap(fun({Id, Revs, _}) ->
{ok, Docs} = couch_db:open_doc_revs(Db, Id, Revs, [latest]),
[Doc || {ok, Doc} <- Docs]
end, Missing).
save_on_target(Node, Name, Docs) ->
Options = [replicated_changes, full_commit, {user_ctx, ?CTX}],
rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
ok.
update_locals(Acc) ->
#acc{seq=Seq, source=Db, target=Target, localid=Id} = Acc,
#shard{name=Name, node=Node} = Target,
Doc = #doc{id = Id, body = {[
{<<"seq">>, Seq},
{<<"timestamp">>, list_to_binary(iso8601_timestamp())}
]}},
{ok, _} = couch_db:update_doc(Db, Doc, []),
Options = [{user_ctx, ?CTX}],
rexi_call(Node, {fabric_rpc, update_docs, [Name, [Doc], Options]}).
rexi_call(Node, MFA) ->
Ref = rexi:cast(Node, MFA),
receive {Ref, {ok, Reply}} ->
Reply;
{Ref, Error} ->
erlang:error(Error)
after 60000 ->
erlang:error(timeout)
end.
calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
case couch_db:open_doc(Db, LocalId, []) of
{ok, #doc{body = {SProps}}} ->
try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, []]}) of
#doc{body = {TProps}} ->
SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
erlang:min(SourceSeq, TargetSeq)
catch error:_ ->
0
end;
_ ->
0
end.
iso8601_timestamp() ->
{_,_,Micro} = Now = os:timestamp(),
{{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),
Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ",
io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]).
|