path: root/apps/couch/src/couch_doc.erl
author     Adam Kocoloski <adam@cloudant.com>  2010-08-18 11:51:03 -0400
committer  Adam Kocoloski <adam@cloudant.com>  2010-08-18 14:24:57 -0400
commit     7393d62b7b630bee50f609d0ae8125d33f7cda2b (patch)
tree       754e9ab17a586319c562de488e60056feff60bb8 /apps/couch/src/couch_doc.erl
parent     c0cb2625f25a2b51485c164bea1d8822f449ce14 (diff)
Grab bag of Cloudant patches to couch OTP application
- Removal of couch_db and couch_ref_counter processes. Active DBs are accessible through a protected ets table owned by couch_server.
- #full_doc_info{} in by_id and by_seq trees for faster compaction at the expense of more disk usage afterwards. Proposed as COUCHDB-738 but not accepted upstream.
- Replication via distributed Erlang.
- Better hot upgrade support (uses exported functions much more often).
- Configurable btree chunk sizes allow for larger (but still bounded) reductions.
- Shorter names for btree fields in #db{} and #db_header{}.
- couch_view_group does not keep a reference to the #db{}.
- Terms are stored compressed (again).
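The first item above is worth a sketch: once couch_server owns a protected ets table of active databases, any local process can look up a #db{} record without a gen_server round trip. A minimal illustration of that pattern follows, with an invented table name (active_dbs) and a stripped-down #db{} record; the real couch_server bookkeeping is more involved.

    %% Sketch only: 'protected' lets any local process read the table
    %% while only the owning process (couch_server in the real code)
    %% may write to it.
    -module(db_table_sketch).
    -export([start/0, open/1]).

    -record(db, {name, main_pid}).

    start() ->
        ets:new(active_dbs, [set, protected, named_table, {keypos, #db.name}]),
        ets:insert(active_dbs, #db{name = <<"testdb">>, main_pid = self()}).

    open(DbName) ->
        %% Readers bypass couch_server entirely and hit ets directly.
        case ets:lookup(active_dbs, DbName) of
        [#db{} = Db] -> {ok, Db};
        [] -> {not_found, no_db_file}
        end.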
Diffstat (limited to 'apps/couch/src/couch_doc.erl')
-rw-r--r--  apps/couch/src/couch_doc.erl | 98
1 file changed, 73 insertions(+), 25 deletions(-)
diff --git a/apps/couch/src/couch_doc.erl b/apps/couch/src/couch_doc.erl
index d15cd7de..d47f85ef 100644
--- a/apps/couch/src/couch_doc.erl
+++ b/apps/couch/src/couch_doc.erl
@@ -334,6 +334,8 @@ att_to_bin(#att{data=DataFun, att_len=Len}) when is_function(DataFun)->
         ))
     ).
 
+get_validate_doc_fun({Props}) ->
+    get_validate_doc_fun(couch_doc:from_json_obj({Props}));
 get_validate_doc_fun(#doc{body={Props}}=DDoc) ->
     case couch_util:get_value(<<"validate_doc_update">>, Props) of
     undefined ->
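A note on the new first clause: it lets callers hand get_validate_doc_fun/1 a design doc as raw EJSON (the {Props} tuple produced by ?JSON_DECODE) rather than an already-built #doc{}; the clause simply normalizes through from_json_obj/1 and recurses. A hypothetical call, with an invented ddoc body:

    %% Illustrative only; the ddoc contents are made up.
    DDocJson = {[{<<"_id">>, <<"_design/app">>},
                 {<<"validate_doc_update">>, <<"function(newDoc, oldDoc) {}">>}]},
    ValidateFun = couch_doc:get_validate_doc_fun(DDocJson).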
@@ -364,7 +366,7 @@ merge_stubs(#doc{id=Id,atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) ->
             DiskAtt;
         _ ->
             throw({missing_stub,
-                <<"id:", Id/binary, ", name:", Name/binary>>})
+                <<"Invalid attachment stub in ", Id/binary, " for ", Name/binary>>})
         end;
     (Att) ->
         Att
@@ -453,15 +455,11 @@ doc_from_multi_part_stream(ContentType, DataFun) ->
     receive
     {doc_bytes, DocBytes} ->
         Doc = from_json_obj(?JSON_DECODE(DocBytes)),
-        % go through the attachments looking for 'follows' in the data,
-        % replace with function that reads the data from MIME stream.
-        ReadAttachmentDataFun = fun() ->
-            Parser ! {get_bytes, self()},
-            receive {bytes, Bytes} -> Bytes end
-        end,
+        % we'll send the Parser process ID to the remote nodes so they can
+        % retrieve their own copies of the attachment data
         Atts2 = lists:map(
             fun(#att{data=follows}=A) ->
-                A#att{data=ReadAttachmentDataFun};
+                A#att{data={follows, Parser}};
             (A) ->
                 A
             end, Doc#doc.atts),
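Replacing the reader closure with a plain {follows, Parser} tuple makes the attachment's data source serializable, so it can travel to attachment writers on other nodes via distributed Erlang; each writer then pulls chunks from the parser process itself. The reader side is outside this diff, but a hedged sketch of a single pull, matching the {get_bytes, Pid} / {bytes, Data} message shapes used by the parser loop below (the function name and timeout are invented):

    %% Sketch of a writer requesting the next attachment chunk.
    read_next_chunk(Parser) ->
        Parser ! {get_bytes, self()},
        receive
            {bytes, Bytes} -> {ok, Bytes}
        after 60000 ->
            {error, parser_timeout}
        end.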
@@ -484,25 +482,75 @@ mp_parse_doc(body_end, AccBytes) ->
         From ! {doc_bytes, lists:reverse(AccBytes)}
     end,
     fun (Next) ->
-        mp_parse_atts(Next)
+        mp_parse_atts(Next, {[], 0, orddict:new(), []})
     end.
 
-mp_parse_atts(eof) ->
-    ok;
-mp_parse_atts({headers, _H}) ->
-    fun (Next) ->
-        mp_parse_atts(Next)
-    end;
-mp_parse_atts({body, Bytes}) ->
-    receive {get_bytes, From} ->
-        From ! {bytes, Bytes}
-    end,
-    fun (Next) ->
-        mp_parse_atts(Next)
-    end;
-mp_parse_atts(body_end) ->
-    fun (Next) ->
-        mp_parse_atts(Next)
+mp_parse_atts({headers, _}, Acc) ->
+    fun(Next) -> mp_parse_atts(Next, Acc) end;
+mp_parse_atts(body_end, Acc) ->
+    fun(Next) -> mp_parse_atts(Next, Acc) end;
+mp_parse_atts({body, Bytes}, {DataList, Offset, Counters, Waiting}) ->
+    NewAcc = maybe_send_data({DataList++[Bytes], Offset, Counters, Waiting}),
+    fun(Next) -> mp_parse_atts(Next, NewAcc) end;
+mp_parse_atts(eof, {DataList, Offset, Counters, Waiting}) ->
+    N = list_to_integer(couch_config:get("cluster", "n", "3")),
+    M = length(Counters),
+    case (M == N) andalso DataList == [] of
+    true ->
+        ok;
+    false ->
+        receive {get_bytes, From} ->
+            C2 = orddict:update_counter(From, 1, Counters),
+            NewAcc = maybe_send_data({DataList, Offset, C2, [From|Waiting]}),
+            mp_parse_atts(eof, NewAcc)
+        after 3600000 ->
+            ok
+        end
     end.
 
+maybe_send_data({ChunkList, Offset, Counters, Waiting}) ->
+    receive {get_bytes, From} ->
+        NewCounters = orddict:update_counter(From, 1, Counters),
+        maybe_send_data({ChunkList, Offset, NewCounters, [From|Waiting]})
+    after 0 ->
+        % reply to as many writers as possible
+        NewWaiting = lists:filter(fun(Writer) ->
+            WhichChunk = orddict:fetch(Writer, Counters),
+            ListIndex = WhichChunk - Offset,
+            if ListIndex =< length(ChunkList) ->
+                Writer ! {bytes, lists:nth(ListIndex, ChunkList)},
+                false;
+            true ->
+                true
+            end
+        end, Waiting),
+        % check if we can drop a chunk from the head of the list
+        case Counters of
+        [] ->
+            SmallestIndex = 0;
+        _ ->
+            SmallestIndex = lists:min(element(2, lists:unzip(Counters)))
+        end,
+        Size = length(Counters),
+        N = list_to_integer(couch_config:get("cluster", "n", "3")),
+        if Size == N andalso SmallestIndex == (Offset+1) ->
+            NewChunkList = tl(ChunkList),
+            NewOffset = Offset+1;
+        true ->
+            NewChunkList = ChunkList,
+            NewOffset = Offset
+        end,
+
+        % we should wait for a writer if no one has written the last chunk
+        LargestIndex = lists:max([0|element(2, lists:unzip(Counters))]),
+        if LargestIndex >= (Offset + length(ChunkList)) ->
+            % someone has written all possible chunks, keep moving
+            {NewChunkList, NewOffset, Counters, NewWaiting};
+        true ->
+            receive {get_bytes, X} ->
+                C2 = orddict:update_counter(X, 1, Counters),
+                maybe_send_data({NewChunkList, NewOffset, C2, [X|NewWaiting]})
+            end
+        end
+    end.
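Taken together, mp_parse_atts/2 and maybe_send_data/1 turn the parser into a bounded fan-out buffer: Counters maps each writer to the chunk index it has requested, Offset counts chunks already dropped from the head of ChunkList, a head chunk is dropped only once all N expected writers (the cluster "n" setting) have consumed it, and the parser blocks whenever it gets ahead of every writer. A small self-contained sketch of the drop test, with invented writer names and N = 2:

    -module(chunk_window_sketch).
    -export([demo/0]).

    %% Mirrors the head-drop bookkeeping in maybe_send_data/1;
    %% writer names and N are illustrative.
    demo() ->
        N = 2,
        Offset = 0,
        C0 = orddict:new(),
        C1 = orddict:update_counter(writer_a, 1, C0),  % writer_a asks for chunk 1
        C2 = orddict:update_counter(writer_b, 1, C1),  % writer_b asks for chunk 1
        Smallest = lists:min(element(2, lists:unzip(C2))),
        %% Same condition as the diff: drop the head chunk only when every
        %% expected writer has requested the chunk just past the offset.
        (length(C2) == N) andalso (Smallest == Offset + 1).  % => true

The one-hour receive timeout in the eof clause bounds how long the parser lingers if a writer crashes before draining its chunks.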