1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
/**
* The u1db_sync_log document stores both the actual sync log and a list of
* pending updates to the log, in case we receive incoming documents out of
* the correct order (i.e. if there are parallel PUTs during the sync
* process).
*
* The structure of the document is the following:
*
* {
* 'syncs': [
* ['<replica_uid>', <gen>, '<trans_id>'],
* ...
* ],
* 'pending': {
* 'other_replica_uid': {
* 'sync_id': '<sync_id>',
* 'log': [[<gen>, '<trans_id>'], ...]
* },
* ...
* }
* }
*
* The update function below does the following:
*
* 0. If we do not receive a sync_id, we just update the 'syncs' list with
* the incoming info about the source replica state.
*
* 1. Otherwise, if the incoming sync_id differs from current stored
* sync_id, then we assume that the previous sync session for that source
* replica was interrupted and discard all pending data.
*
* 2. Then we append incoming info as pending data for that source replica
* and current sync_id, and sort the pending data by generation.
*
* 3. Then we go through pending data and find the most recent generation
* that we can use to update the actual sync log.
*
* 4. Finally, we insert the most up to date information into the sync log.
*/
function(doc, req){
// create the document if it doesn't exist
if (!doc) {
doc = {}
doc['_id'] = 'u1db_sync_log';
doc['syncs'] = [];
}
// get and validate incoming info
var body = JSON.parse(req.body);
var other_replica_uid = body['other_replica_uid'];
var other_generation = parseInt(body['other_generation']);
var other_transaction_id = body['other_transaction_id']
var sync_id = body['sync_id'];
var number_of_docs = body['number_of_docs'];
var doc_idx = body['doc_idx'];
// parse integers
if (number_of_docs != null)
number_of_docs = parseInt(number_of_docs);
if (doc_idx != null)
doc_idx = parseInt(doc_idx);
if (other_replica_uid == null
|| other_generation == null
|| other_transaction_id == null)
return [null, 'invalid data'];
// create slot for pending logs
if (doc['pending'] == null)
doc['pending'] = {};
// these are the values that will be actually inserted
var current_gen = other_generation;
var current_trans_id = other_transaction_id;
/*------------- Wait for sequential values before storing -------------*/
// we just try to obtain pending log if we received a sync_id
if (sync_id != null) {
// create slot for current source and sync_id pending log
if (doc['pending'][other_replica_uid] == null
|| doc['pending'][other_replica_uid]['sync_id'] != sync_id) {
doc['pending'][other_replica_uid] = {
'sync_id': sync_id,
'log': [],
'last_doc_idx': 0,
}
}
// append incoming data to pending log
doc['pending'][other_replica_uid]['log'].push([
other_generation,
other_transaction_id,
doc_idx,
])
// sort pending log according to generation
doc['pending'][other_replica_uid]['log'].sort(function(a, b) {
return a[0] - b[0];
});
// get most up-to-date information from pending log
var last_doc_idx = doc['pending'][other_replica_uid]['last_doc_idx'];
var pending_idx = doc['pending'][other_replica_uid]['log'][0][2];
current_gen = null;
current_trans_id = null;
while (last_doc_idx + 1 == pending_idx) {
pending = doc['pending'][other_replica_uid]['log'].shift()
current_gen = pending[0];
current_trans_id = pending[1];
last_doc_idx = pending[2]
if (doc['pending'][other_replica_uid]['log'].length == 0)
break;
pending_idx = doc['pending'][other_replica_uid]['log'][0][2];
}
// leave the sync log untouched if we still did not receive enough docs
if (current_gen == null)
return [doc, 'ok'];
// update last index of received doc
doc['pending'][other_replica_uid]['last_doc_idx'] = last_doc_idx;
// eventually remove all pending data from that replica
if (last_doc_idx == number_of_docs)
delete doc['pending'][other_replica_uid]
}
/*--------------- Store source replica info on sync log ---------------*/
// remove outdated info
doc['syncs'] = doc['syncs'].filter(
function (entry) {
return entry[0] != other_replica_uid;
}
);
// store in log
doc['syncs'].push([
other_replica_uid,
current_gen,
current_trans_id
]);
return [doc, 'ok'];
}
|