summaryrefslogtreecommitdiff
path: root/apps/fabric/src/fabric_doc_attachments.erl
blob: aecdaaefa2452848e381d8ec1d49c7712d6969e7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
-module(fabric_doc_attachments).

-include("fabric.hrl").

%% couch api calls
-export([receiver/2]).

receiver(_Req, undefined) ->
    <<"">>;
receiver(_Req, {unknown_transfer_encoding, Unknown}) ->
    exit({unknown_transfer_encoding, Unknown});
receiver(Req, chunked) ->
    MiddleMan = spawn(fun() -> middleman(Req, chunked) end),
    fun(4096, ChunkFun, ok) ->
        write_chunks(MiddleMan, ChunkFun)
    end;
receiver(_Req, 0) ->
    <<"">>;
receiver(Req, Length) when is_integer(Length) ->
    Middleman = spawn(fun() -> middleman(Req, Length) end),
    fun() ->
        Middleman ! {self(), gimme_data},
        receive {Middleman, Data} -> Data end
    end;
receiver(_Req, Length) ->
    exit({length_not_integer, Length}).

%%
%% internal
%%

write_chunks(MiddleMan, ChunkFun) ->
    MiddleMan ! {self(), gimme_data},
    receive
    {MiddleMan, {0, _Footers}} ->
        % MiddleMan ! {self(), done},
        ok;
    {MiddleMan, ChunkRecord} ->
        ChunkFun(ChunkRecord, ok),
        write_chunks(MiddleMan, ChunkFun)
    end.

receive_unchunked_attachment(_Req, 0) ->
    ok;
receive_unchunked_attachment(Req, Length) ->
    receive {MiddleMan, go} ->
        Data = couch_httpd:recv(Req, 0),
        MiddleMan ! {self(), Data}
    end,
    receive_unchunked_attachment(Req, Length - size(Data)).

middleman(Req, chunked) ->
    % spawn a process to actually receive the uploaded data
    RcvFun = fun(ChunkRecord, ok) ->
        receive {From, go} -> From ! {self(), ChunkRecord} end, ok
    end,
    Receiver = spawn(fun() -> couch_httpd:recv_chunked(Req,4096,RcvFun,ok) end),

    % take requests from the DB writers and get data from the receiver
    N = erlang:list_to_integer(couch_config:get("cluster","n")),
    middleman_loop(Receiver, N, dict:new(), 0, []);

middleman(Req, Length) ->
    Receiver = spawn(fun() -> receive_unchunked_attachment(Req, Length) end),
    N = erlang:list_to_integer(couch_config:get("cluster","n")),
    middleman_loop(Receiver, N, dict:new(), 0, []).

middleman_loop(Receiver, N, Counters, Offset, ChunkList) ->
    receive {From, gimme_data} ->
        % figure out how far along this writer (From) is in the list
        {NewCounters, WhichChunk} = case dict:find(From, Counters) of
        {ok, I} ->
            {dict:update_counter(From, 1, Counters), I};
        error ->
            {dict:store(From, 2, Counters), 1}
        end,
        ListIndex = WhichChunk - Offset,

        % talk to the receiver to get another chunk if necessary
        ChunkList1 = if ListIndex > length(ChunkList) ->
            Receiver ! {self(), go},
            receive {Receiver, ChunkRecord} -> ChunkList ++ [ChunkRecord] end;
        true -> ChunkList end,

        % reply to the writer
        From ! {self(), lists:nth(ListIndex, ChunkList1)},

        % check if we can drop a chunk from the head of the list
        SmallestIndex = dict:fold(fun(_, Val, Acc) -> lists:min([Val,Acc]) end,
            WhichChunk+1, NewCounters),
        Size = dict:size(NewCounters),

        {NewChunkList, NewOffset} =
        if Size == N andalso (SmallestIndex - Offset) == 2 ->
            {tl(ChunkList1), Offset+1};
        true ->
            {ChunkList1, Offset}
        end,
        middleman_loop(Receiver, N, NewCounters, NewOffset, NewChunkList)
    after 10000 ->
        ok
    end.