summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBrad Anderson <brad@cloudant.com>2010-06-08 12:00:59 -0400
committerBrad Anderson <brad@cloudant.com>2010-06-08 12:00:59 -0400
commitd7a6bd635ce60d0fc6a6a40dc3027cee308701f6 (patch)
tree56399ffe07b43b40fdfd03cc9799aa5f3fb134d2 /src
parent4e0c97bf3587e9d0e330494f0d06194c0c4bfa17 (diff)
fabric attachments
Diffstat (limited to 'src')
-rw-r--r--src/fabric.erl6
-rw-r--r--src/fabric_doc_attachments.erl103
2 files changed, 107 insertions, 2 deletions
diff --git a/src/fabric.erl b/src/fabric.erl
index 983cd818..80bc6d4c 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -6,7 +6,7 @@
% Documents
-export([open_doc/3, open_revs/4, get_missing_revs/2, update_doc/3,
- update_docs/3]).
+ update_docs/3, att_receiver/2]).
% Views
-export([all_docs/4, query_view/5]).
@@ -42,7 +42,7 @@ delete_db(DbName, Options) ->
fabric_db_delete:delete_db(dbname(DbName), opts(Options)).
-
+% doc operations
% Reads a single document by id: normalizes the db name, doc id and
% options, then delegates to fabric_doc_open.
open_doc(DbName, Id, Options) ->
    fabric_doc_open:go(dbname(DbName), docid(Id), opts(Options)).
@@ -60,6 +60,8 @@ update_doc(DbName, Doc, Options) ->
% Writes a batch of documents: normalizes the db name, docs and options,
% then delegates to fabric_doc_update.
update_docs(DbName, Docs, Options) ->
    fabric_doc_update:go(dbname(DbName), docs(Docs), opts(Options)).
% Returns an attachment-body receiver for the incoming HTTP request;
% Length is the content length, the atom 'chunked', 'undefined', or an
% {unknown_transfer_encoding, _} tuple. Thin delegate to
% fabric_doc_attachments:receiver/2.
att_receiver(Req, Length) ->
    fabric_doc_attachments:receiver(Req, Length).
all_docs(DbName, #view_query_args{} = QueryArgs, Callback, Acc0) when
is_function(Callback, 2) ->
diff --git a/src/fabric_doc_attachments.erl b/src/fabric_doc_attachments.erl
new file mode 100644
index 00000000..6230a444
--- /dev/null
+++ b/src/fabric_doc_attachments.erl
@@ -0,0 +1,103 @@
+-module(fabric_doc_attachments).
+
+-include("fabric.hrl").
+
+%% couch api calls
+-export([receiver/2]).
+
%% Builds a couch-compatible attachment receiver for the request body.
%% Depending on the declared length/encoding this returns either an
%% empty binary (no body), a fun that streams chunked data to a writer,
%% or a fun that hands out a fixed-length body one read at a time.
%% A single "middleman" buffer process is spawned per upload so that the
%% N shard writers can each consume the same body at their own pace.
receiver(_Req, undefined) ->
    <<"">>;
receiver(_Req, {unknown_transfer_encoding, Unknown}) ->
    exit({unknown_transfer_encoding, Unknown});
receiver(Req, chunked) ->
    Buffer = spawn(fun() -> middleman(Req, chunked) end),
    %% couch calls this fun with a 4096 chunk size, a chunk callback and
    %% an initial state of ok; the literal head enforces that contract
    fun(4096, ChunkFun, ok) ->
        write_chunks(Buffer, ChunkFun)
    end;
receiver(_Req, 0) ->
    <<"">>;
receiver(Req, Length) when is_integer(Length) ->
    Buffer = spawn(fun() -> middleman(Req, Length) end),
    %% each invocation pulls the next packet of body data from the buffer
    fun() ->
        Buffer ! {self(), gimme_data},
        receive {Buffer, Bytes} -> Bytes end
    end;
receiver(_Req, Length) ->
    exit({length_not_integer, Length}).
+
+%%
+%% internal
+%%
+
%% Pulls chunk records from the middleman buffer one at a time, feeding
%% each to ChunkFun, until the terminating zero-length chunk (carrying
%% the trailers) arrives. Returns ok.
write_chunks(Buffer, ChunkFun) ->
    Buffer ! {self(), gimme_data},
    receive
    {Buffer, {0, _Trailers}} ->
        %% a zero-size chunk marks the end of a chunked stream
        ok;
    {Buffer, Chunk} ->
        ChunkFun(Chunk, ok),
        write_chunks(Buffer, ChunkFun)
    end.
+
%% Reads a fixed-length (non-chunked) request body, one packet per
%% demand. The middleman paces us by sending {Pid, go}; we read whatever
%% bytes are currently available and reply {self(), Data}. Terminates
%% once the remaining byte count is exhausted.
%%
%% The termination clause matches =< 0, not just 0: couch_httpd:recv may
%% return more bytes than we asked to account for, and matching only the
%% literal 0 would loop forever on a negative remainder.
receive_unchunked_attachment(_Req, Length) when Length =< 0 ->
    ok;
receive_unchunked_attachment(Req, Length) ->
    % MiddleMan is unbound here, so this matches a go-message from any
    % sender and binds its pid for the reply
    receive {MiddleMan, go} ->
        % recv(Req, 0) returns the bytes currently available on the
        % socket, not necessarily all of Length at once
        Data = couch_httpd:recv(Req, 0),
        MiddleMan ! {self(), Data}
    end,
    % byte_size/1 is the precise BIF for binaries (size/1 is generic)
    receive_unchunked_attachment(Req, Length - byte_size(Data)).
+
%% Buffer process body, sitting between the HTTP reader and the N shard
%% writers. Spawns a reader for the request body (chunked or
%% fixed-length), then enters the buffering loop with empty per-writer
%% state.
middleman(Req, chunked) ->
    %% the reader blocks until some writer asks for data; each chunk
    %% record is handed to exactly one {From, go} requester
    ChunkFun = fun(ChunkRecord, ok) ->
        receive {From, go} -> From ! {self(), ChunkRecord} end,
        ok
    end,
    Reader = spawn(fun() -> couch_httpd:recv_chunked(Req, 4096, ChunkFun, ok) end),
    middleman_loop(Reader, cluster_n(), dict:new(), 0, []);
middleman(Req, Length) ->
    Reader = spawn(fun() -> receive_unchunked_attachment(Req, Length) end),
    middleman_loop(Reader, cluster_n(), dict:new(), 0, []).

%% Number of document copies ("n") configured for this cluster, i.e. the
%% number of writers that will consume the buffered body.
cluster_n() ->
    erlang:list_to_integer(couch_config:get("cluster","n")).
+
%% Central buffering loop. Receiver is the process reading the HTTP
%% body; N is the cluster copy count (number of expected writers);
%% Counters maps each writer pid to the 1-based index of the NEXT chunk
%% it will request; Offset is how many chunks have already been dropped
%% from the head of ChunkList; ChunkList holds the currently buffered
%% chunks, in stream order.
middleman_loop(Receiver, N, Counters, Offset, ChunkList) ->
    receive {From, gimme_data} ->
        % figure out how far along this writer (From) is in the list
        {NewCounters, WhichChunk} = case dict:find(From, Counters) of
        {ok, I} ->
            {dict:update_counter(From, 1, Counters), I};
        error ->
            % first request from this writer: serve chunk 1 and record
            % that its next request will be for chunk 2
            {dict:store(From, 2, Counters), 1}
        end,
        % translate the absolute chunk index into a position within the
        % (possibly head-trimmed) buffer
        ListIndex = WhichChunk - Offset,

        % talk to the receiver to get another chunk if necessary
        ChunkList1 = if ListIndex > length(ChunkList) ->
            Receiver ! {self(), go},
            receive {Receiver, ChunkRecord} -> ChunkList ++ [ChunkRecord] end;
        true -> ChunkList end,

        % reply to the writer
        From ! {self(), lists:nth(ListIndex, ChunkList1)},

        % check if we can drop a chunk from the head of the list:
        % SmallestIndex is the lowest next-chunk index over all writers,
        % seeded with WhichChunk+1 (this writer's new position)
        SmallestIndex = dict:fold(fun(_, Val, Acc) -> lists:min([Val,Acc]) end,
            WhichChunk+1, NewCounters),
        Size = dict:size(NewCounters),

        % the head chunk is only freed once all N writers have checked in
        % (Size == N) and every writer has consumed it, i.e. the smallest
        % next-chunk index is two past the current offset
        {NewChunkList, NewOffset} =
        if Size == N andalso (SmallestIndex - Offset) == 2 ->
            {tl(ChunkList1), Offset+1};
        true ->
            {ChunkList1, Offset}
        end,
        middleman_loop(Receiver, N, NewCounters, NewOffset, NewChunkList)
    % no writer has asked for data in 10s: assume the request was
    % abandoned and let this buffer process exit
    after 10000 ->
        ok
    end.