| author    | Adam Kocoloski <adam@cloudant.com> | 2010-08-18 11:51:03 -0400 |
|-----------|------------------------------------|---------------------------|
| committer | Adam Kocoloski <adam@cloudant.com> | 2010-08-18 14:24:57 -0400 |
| commit    | 7393d62b7b630bee50f609d0ae8125d33f7cda2b | |
| tree      | 754e9ab17a586319c562de488e60056feff60bb8 /apps/couch/src/couch_doc.erl | |
| parent    | c0cb2625f25a2b51485c164bea1d8822f449ce14 | |
Grab bag of Cloudant patches to couch OTP application
- Removal of the couch_db and couch_ref_counter processes. Active DBs are
  accessible through a protected ets table owned by couch_server (see the
  sketch after this list).
- #full_doc_info{} in by_id and by_seq trees for faster compaction at the
expense of more disk usage afterwards. Proposed as COUCHDB-738 but not
accepted upstream.
- Replication via distributed Erlang.
- Better hot upgrade support (uses exported functions much more often).
- Configurable btree chunk sizes allow for larger (but still bounded)
reductions.
- Shorter names for btree fields in #db{} and #db_header{}.
- couch_view_group does not keep a reference to the #db{}.
- Terms are stored compressed (again).
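
The first item is easiest to see in miniature. The sketch below illustrates the pattern only and is not code from this patch: the table name `couch_dbs`, the tuple layout, and the `open_db/1` helper are assumptions.

```erlang
%% Illustrative sketch only (names are assumptions, not from this patch).
%% couch_server owns a protected ets table mapping DbName -> #db{}; any
%% process may read it directly, but only couch_server may write to it.
init([]) ->
    ets:new(couch_dbs, [set, protected, named_table]),
    {ok, nil}.

open_db(DbName) ->
    case ets:lookup(couch_dbs, DbName) of
    [{DbName, Db}] ->
        {ok, Db};   % fast path: a direct ets read, no message round-trip
    [] ->
        % miss: ask couch_server to open the file and publish the record
        gen_server:call(couch_server, {open, DbName})
    end.
```

Replacing the per-DB couch_db and couch_ref_counter processes with a shared read path like this trades message round-trips and ref-counting overhead for couch_server serializing updates to the table.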
Diffstat (limited to 'apps/couch/src/couch_doc.erl')
-rw-r--r-- | apps/couch/src/couch_doc.erl | 98 |
1 file changed, 73 insertions(+), 25 deletions(-)
```diff
diff --git a/apps/couch/src/couch_doc.erl b/apps/couch/src/couch_doc.erl
index d15cd7de..d47f85ef 100644
--- a/apps/couch/src/couch_doc.erl
+++ b/apps/couch/src/couch_doc.erl
@@ -334,6 +334,8 @@ att_to_bin(#att{data=DataFun, att_len=Len}) when is_function(DataFun)->
     )) ).
 
+get_validate_doc_fun({Props}) ->
+    get_validate_doc_fun(couch_doc:from_json_obj({Props}));
 get_validate_doc_fun(#doc{body={Props}}=DDoc) ->
     case couch_util:get_value(<<"validate_doc_update">>, Props) of
     undefined ->
@@ -364,7 +366,7 @@ merge_stubs(#doc{id=Id,atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) ->
             DiskAtt;
         _ ->
             throw({missing_stub,
-                <<"id:", Id/binary, ", name:", Name/binary>>})
+                <<"Invalid attachment stub in ", Id/binary, " for ", Name/binary>>})
         end;
     (Att) ->
         Att
@@ -453,15 +455,11 @@ doc_from_multi_part_stream(ContentType, DataFun) ->
     receive {doc_bytes, DocBytes} ->
         Doc = from_json_obj(?JSON_DECODE(DocBytes)),
-        % go through the attachments looking for 'follows' in the data,
-        % replace with function that reads the data from MIME stream.
-        ReadAttachmentDataFun = fun() ->
-            Parser ! {get_bytes, self()},
-            receive {bytes, Bytes} -> Bytes end
-        end,
+        % we'll send the Parser process ID to the remote nodes so they can
+        % retrieve their own copies of the attachment data
         Atts2 = lists:map(
             fun(#att{data=follows}=A) ->
-                A#att{data=ReadAttachmentDataFun};
+                A#att{data={follows, Parser}};
             (A) ->
                 A
             end, Doc#doc.atts),
@@ -484,25 +482,75 @@ mp_parse_doc(body_end, AccBytes) ->
         From ! {doc_bytes, lists:reverse(AccBytes)}
     end,
     fun (Next) ->
-        mp_parse_atts(Next)
+        mp_parse_atts(Next, {[], 0, orddict:new(), []})
     end.
 
-mp_parse_atts(eof) ->
-    ok;
-mp_parse_atts({headers, _H}) ->
-    fun (Next) ->
-        mp_parse_atts(Next)
-    end;
-mp_parse_atts({body, Bytes}) ->
-    receive {get_bytes, From} ->
-        From ! {bytes, Bytes}
-    end,
-    fun (Next) ->
-        mp_parse_atts(Next)
-    end;
-mp_parse_atts(body_end) ->
-    fun (Next) ->
-        mp_parse_atts(Next)
+mp_parse_atts({headers, _}, Acc) ->
+    fun(Next) -> mp_parse_atts(Next, Acc) end;
+mp_parse_atts(body_end, Acc) ->
+    fun(Next) -> mp_parse_atts(Next, Acc) end;
+mp_parse_atts({body, Bytes}, {DataList, Offset, Counters, Waiting}) ->
+    NewAcc = maybe_send_data({DataList++[Bytes], Offset, Counters, Waiting}),
+    fun(Next) -> mp_parse_atts(Next, NewAcc) end;
+mp_parse_atts(eof, {DataList, Offset, Counters, Waiting}) ->
+    N = list_to_integer(couch_config:get("cluster", "n", "3")),
+    M = length(Counters),
+    case (M == N) andalso DataList == [] of
+    true ->
+        ok;
+    false ->
+        receive {get_bytes, From} ->
+            C2 = orddict:update_counter(From, 1, Counters),
+            NewAcc = maybe_send_data({DataList, Offset, C2, [From|Waiting]}),
+            mp_parse_atts(eof, NewAcc)
+        after 3600000 ->
+            ok
+        end
     end.
 
+maybe_send_data({ChunkList, Offset, Counters, Waiting}) ->
+    receive {get_bytes, From} ->
+        NewCounters = orddict:update_counter(From, 1, Counters),
+        maybe_send_data({ChunkList, Offset, NewCounters, [From|Waiting]})
+    after 0 ->
+        % reply to as many writers as possible
+        NewWaiting = lists:filter(fun(Writer) ->
+            WhichChunk = orddict:fetch(Writer, Counters),
+            ListIndex = WhichChunk - Offset,
+            if ListIndex =< length(ChunkList) ->
+                Writer ! {bytes, lists:nth(ListIndex, ChunkList)},
+                false;
+            true ->
+                true
+            end
+        end, Waiting),
+        % check if we can drop a chunk from the head of the list
+        case Counters of
+        [] ->
+            SmallestIndex = 0;
+        _ ->
+            SmallestIndex = lists:min(element(2, lists:unzip(Counters)))
+        end,
+        Size = length(Counters),
+        N = list_to_integer(couch_config:get("cluster", "n", "3")),
+        if Size == N andalso SmallestIndex == (Offset+1) ->
+            NewChunkList = tl(ChunkList),
+            NewOffset = Offset+1;
+        true ->
+            NewChunkList = ChunkList,
+            NewOffset = Offset
+        end,
+
+        % we should wait for a writer if no one has written the last chunk
+        LargestIndex = lists:max([0|element(2, lists:unzip(Counters))]),
+        if LargestIndex >= (Offset + length(ChunkList)) ->
+            % someone has written all possible chunks, keep moving
+            {NewChunkList, NewOffset, Counters, NewWaiting};
+        true ->
+            receive {get_bytes, X} ->
+                C2 = orddict:update_counter(X, 1, Counters),
+                maybe_send_data({NewChunkList, NewOffset, C2, [X|NewWaiting]})
+            end
+        end
+    end.
```
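
The new mp_parse_atts/maybe_send_data pair turns the parser into a one-to-many chunk server: each writer requests its next chunk with `{get_bytes, self()}`, the orddict counter records how far each writer has read, and the head of ChunkList is dropped only once all N writers (the cluster "n" value) have consumed it. A writer that received `#att{data={follows, Parser}}` might drain its copy as in the sketch below. This is an illustration only: `read_att_data/2` is a hypothetical helper (the real consumer lives outside this diff), and it relies on the attachment's known `att_len` to decide when to stop.

```erlang
%% Hypothetical consumer (not part of this diff): pull one attachment's
%% data from the multipart parser, bounded by the attachment's length,
%% since the parser never signals end-of-attachment to its writers.
read_att_data(Parser, Len) ->
    read_att_data(Parser, Len, []).

read_att_data(_Parser, Remaining, Acc) when Remaining =< 0 ->
    iolist_to_binary(lists:reverse(Acc));
read_att_data(Parser, Remaining, Acc) ->
    Parser ! {get_bytes, self()},      % request our next chunk
    receive
    {bytes, Chunk} ->                  % parser tracks each writer's position
        read_att_data(Parser, Remaining - byte_size(Chunk), [Chunk|Acc])
    end.
```

Two design points worth noting from the diff itself: the 3600000 ms receive timeout in the eof clause bounds how long the parser lingers when some writers never finish, and the sliding window (Offset plus ChunkList) keeps buffered memory proportional to the spread between the fastest and slowest writer rather than to the attachment size.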