From e1691e485af06d9b502e33d2f6ea424003801847 Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Thu, 20 May 2010 21:47:51 +0000 Subject: Refactoring of various internal APIs, particularly those dealing with replicating documents with attachments. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@946803 13f79535-47bb-0310-9956-ffa450edef68 --- share/www/script/test/changes.js | 4 +- src/couchdb/couch_changes.erl | 55 ++++++++++++++---------- src/couchdb/couch_db.erl | 52 ++++++++++++++++++----- src/couchdb/couch_doc.erl | 91 +++++++++++++++++++++------------------- src/couchdb/couch_httpd_db.erl | 28 +++---------- src/couchdb/couch_rep_writer.erl | 7 ++-- src/couchdb/couch_work_queue.erl | 61 ++++++++++++++++++--------- 7 files changed, 175 insertions(+), 123 deletions(-) diff --git a/share/www/script/test/changes.js b/share/www/script/test/changes.js index e109a074..b8e691b3 100644 --- a/share/www/script/test/changes.js +++ b/share/www/script/test/changes.js @@ -220,7 +220,7 @@ couchTests.changes = function(debug) { req = CouchDB.request("GET", "/test_suite_db/_changes?filter=changes_filter/dynamic&field=bop"); resp = JSON.parse(req.responseText); - T(resp.results.length == 1); + T(resp.results.length == 1, "changes_filter/dynamic&field=bop"); if (!is_safari && xhr) { // full test requires parallel connections // filter with longpoll @@ -352,7 +352,7 @@ couchTests.changes = function(debug) { req = CouchDB.request("GET", "/test_suite_db/_changes?filter=changes_filter/conflicted"); resp = JSON.parse(req.responseText); - T(resp.results.length == 1); + T(resp.results.length == 1, "filter=changes_filter/conflicted"); // test with erlang filter function run_on_modified_server([{ diff --git a/src/couchdb/couch_changes.erl b/src/couchdb/couch_changes.erl index 9496c226..0c4691d0 100644 --- a/src/couchdb/couch_changes.erl +++ b/src/couchdb/couch_changes.erl @@ -16,8 +16,9 @@ -export([handle_changes/3]). %% @type Req -> #httpd{} | {json_req, JsonObj()} -handle_changes(#changes_args{}=Args1, Req, Db) -> - Args = Args1#changes_args{filter=make_filter_fun(Args1#changes_args.filter, Req, Db)}, +handle_changes(#changes_args{style=Style}=Args1, Req, Db) -> + Args = Args1#changes_args{filter= + make_filter_fun(Args1#changes_args.filter, Style, Req, Db)}, StartSeq = case Args#changes_args.dir of rev -> couch_db:get_update_seq(Db); @@ -72,14 +73,18 @@ handle_changes(#changes_args{}=Args1, Req, Db) -> end. %% @type Req -> #httpd{} | {json_req, JsonObj()} -make_filter_fun(FilterName, Req, Db) -> +make_filter_fun(FilterName, Style, Req, Db) -> case [list_to_binary(couch_httpd:unquote(Part)) || Part <- string:tokens(FilterName, "/")] of [] -> - fun(DocInfos) -> - % doing this as a batch is more efficient for external filters - [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} || - #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos] + fun(#doc_info{revs=[#rev_info{rev=Rev}|_]=Revs}) -> + case Style of + main_only -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]; + all_docs -> + [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} + || #rev_info{rev=R} <- Revs] + end end; [DName, FName] -> DesignId = <<"_design/", DName/binary>>, @@ -87,17 +92,23 @@ make_filter_fun(FilterName, Req, Db) -> % validate that the ddoc has the filter fun #doc{body={Props}} = DDoc, couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]), - fun(DocInfos) -> + fun(DocInfo) -> + DocInfos = + case Style of + main_only -> + [DocInfo]; + all_docs -> + [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs] + end, Docs = [Doc || {ok, Doc} <- [ - {ok, _Doc} = couch_db:open_doc(Db, DInfo, [deleted, conflicts]) - || DInfo <- DocInfos]], + couch_db:open_doc(Db, DocInfo2, [deleted, conflicts]) + || DocInfo2 <- DocInfos]], {ok, Passes} = couch_query_servers:filter_docs( Req, Db, DDoc, FName, Docs ), - % ?LOG_INFO("filtering ~p ~w",[FName, [DI#doc_info.high_seq || DI <- DocInfos]]), - [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} - || #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos, - Pass <- Passes, Pass == true] + [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]} + || {Pass, #doc{revs={RevPos,[RevId|_]}}} + <- lists:zip(Passes, Docs), Pass == true] end; _Else -> throw({bad_request, @@ -195,12 +206,12 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout, end_sending_changes(Callback, EndSeq, ResponseType) -> Callback({stop, EndSeq}, ResponseType). -changes_enumerator(DocInfos, {Db, _, _, FilterFun, Callback, "continuous", +changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, "continuous", Limit, IncludeDocs}) -> - [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_] - = DocInfos, - Results0 = FilterFun(DocInfos), + #doc_info{id=Id, high_seq=Seq, + revs=[#rev_info{deleted=Del,rev=Rev}|_]} = DocInfo, + Results0 = FilterFun(DocInfo), Results = [Result || Result <- Results0, Result /= null], Go = if Limit =< 1 -> stop; true -> ok end, case Results of @@ -215,12 +226,12 @@ changes_enumerator(DocInfos, {Db, _, _, FilterFun, Callback, "continuous", IncludeDocs} } end; -changes_enumerator(DocInfos, {Db, _, Prepend, FilterFun, Callback, ResponseType, +changes_enumerator(DocInfo, {Db, _, Prepend, FilterFun, Callback, ResponseType, Limit, IncludeDocs}) -> - [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_] - = DocInfos, - Results0 = FilterFun(DocInfos), + #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]} + = DocInfo, + Results0 = FilterFun(DocInfo), Results = [Result || Result <- Results0, Result /= null], Go = if Limit =< 1 -> stop; true -> ok end, case Results of diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 89530e73..cd077a2f 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -119,18 +119,47 @@ open_doc(Db, Id, Options) -> {ok, #doc{deleted=true}=Doc} -> case lists:member(deleted, Options) of true -> - {ok, Doc}; + apply_open_options({ok, Doc},Options); false -> {not_found, deleted} end; Else -> - Else + apply_open_options(Else,Options) + end. + +apply_open_options({ok, Doc},Options) -> + apply_open_options2(Doc,Options); +apply_open_options(Else,_Options) -> + Else. + +apply_open_options2(Doc,[]) -> + {ok, Doc}; +apply_open_options2(#doc{atts=Atts,revs=Revs}=Doc, + [{atts_since, PossibleAncestors}|Rest]) -> + RevPos = find_ancestor_rev_pos(Revs, PossibleAncestors), + apply_open_options2(Doc#doc{atts=[A#att{data= + if AttPos>RevPos -> Data; true -> stub end} + || #att{revpos=AttPos,data=Data}=A <- Atts]}, Rest); +apply_open_options2(Doc,[_|Rest]) -> + apply_open_options2(Doc,Rest). + + +find_ancestor_rev_pos({_, []}, _AttsSinceRevs) -> + 0; +find_ancestor_rev_pos(_DocRevs, []) -> + 0; +find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) -> + case lists:member({RevPos, RevId}, AttsSinceRevs) of + true -> + RevPos; + false -> + find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs) end. open_doc_revs(Db, Id, Revs, Options) -> couch_stats_collector:increment({couchdb, database_reads}), - [Result] = open_doc_revs_int(Db, [{Id, Revs}], Options), - Result. + [{ok, Results}] = open_doc_revs_int(Db, [{Id, Revs}], Options), + {ok, [apply_open_options(Result, Options) || Result <- Results]}. % Each returned result is a list of tuples: % {Id, MissingRevs, PossibleAncestors} @@ -437,7 +466,7 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) -> case couch_doc:has_stubs(Doc) of true -> - couch_doc:merge_doc(Doc, #doc{}); % will throw exception + couch_doc:merge_stubs(Doc, #doc{}); % will throw exception false -> ok end, case Revs of @@ -892,15 +921,16 @@ changes_since(Db, Style, StartSeq, Fun, Acc) -> changes_since(Db, Style, StartSeq, Fun, Options, Acc) -> Wrapper = fun(DocInfo, _Offset, Acc2) -> #doc_info{revs=Revs} = DocInfo, + DocInfo2 = case Style of - main_only -> - Infos = [DocInfo]; + main_only -> + DocInfo; all_docs -> - % make each rev it's own doc info - Infos = [DocInfo#doc_info{revs=[RevInfo]} || - #rev_info{seq=RevSeq}=RevInfo <- Revs, StartSeq < RevSeq] + % remove revs before the seq + DocInfo#doc_info{revs=[RevInfo || + #rev_info{seq=RevSeq}=RevInfo <- Revs, StartSeq < RevSeq]} end, - Fun(Infos, Acc2) + Fun(DocInfo2, Acc2) end, {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree, Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options), diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl index 0195fb1e..025d4e55 100644 --- a/src/couchdb/couch_doc.erl +++ b/src/couchdb/couch_doc.erl @@ -17,7 +17,7 @@ -export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]). -export([validate_docid/1]). -export([doc_from_multi_part_stream/2]). --export([doc_to_multi_part_stream/6, len_doc_to_multi_part_stream/5]). +-export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]). -include("couch_db.hrl"). @@ -73,31 +73,25 @@ to_json_meta(Meta) -> end, Meta). to_json_attachments(Attachments, Options) -> - RevPos = case lists:member(attachments, Options) of - true -> % return all the binaries - 0; - false -> - % note the default is [], because this sorts higher than all numbers. - % and will return all the binaries. - couch_util:get_value(atts_after_revpos, Options, []) - end, to_json_attachments( Attachments, - RevPos, + lists:member(attachments, Options), lists:member(follows, Options), lists:member(att_encoding_info, Options) ). -to_json_attachments([], _RevPosIncludeAfter, _DataToFollow, _ShowEncInfo) -> +to_json_attachments([], _OutputData, _DataToFollow, _ShowEncInfo) -> []; -to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow, ShowEncInfo) -> +to_json_attachments(Atts, OutputData, DataToFollow, ShowEncInfo) -> AttProps = lists:map( fun(#att{disk_len=DiskLen, att_len=AttLen, encoding=Enc}=Att) -> {Att#att.name, {[ {<<"content_type">>, Att#att.type}, {<<"revpos">>, Att#att.revpos} ] ++ - if Att#att.revpos > RevPosIncludeAfter -> + if not OutputData orelse Att#att.data == stub -> + [{<<"length">>, DiskLen}, {<<"stub">>, true}]; + true -> if DataToFollow -> [{<<"length">>, DiskLen}, {<<"follows">>, true}]; true -> @@ -108,9 +102,7 @@ to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow, ShowEncInfo) -> att_to_bin(Att) end, [{<<"data">>, base64:encode(AttData)}] - end; - true -> - [{<<"length">>, DiskLen}, {<<"stub">>, true}] + end end ++ case {ShowEncInfo, Enc} of {false, _} -> @@ -383,16 +375,13 @@ fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0-> ResultAcc = Fun(Bin, Acc), fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc). -len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos, +len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, SendEncodedAtts) -> - 2 + % "--" - size(Boundary) + - 36 + % "\r\ncontent-type: application/json\r\n\r\n" - iolist_size(JsonBytes) + - 4 + % "\r\n--" - size(Boundary) + - + lists:foldl(fun(#att{revpos=RevPos} = Att, AccAttsSize) -> - if RevPos > AttsSinceRevPos -> + AttsSize = lists:foldl(fun(#att{data=Data} = Att, AccAttsSize) -> + case Data of + stub -> + AccAttsSize; + _ -> AccAttsSize + 4 + % "\r\n\r\n" case SendEncodedAtts of @@ -402,24 +391,41 @@ len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos, Att#att.disk_len end + 4 + % "\r\n--" - size(Boundary); - true -> - AccAttsSize + size(Boundary) end - end, 0, Atts) + - 2. % "--" + end, 0, Atts), + if AttsSize == 0 -> + iolist_size(JsonBytes); + true -> + 2 + % "--" + size(Boundary) + + 36 + % "\r\ncontent-type: application/json\r\n\r\n" + iolist_size(JsonBytes) + + 4 + % "\r\n--" + size(Boundary) + + + AttsSize + + 2 % "--" + end. -doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos, WriteFun, +doc_to_multi_part_stream(Boundary, JsonBytes, Atts, WriteFun, SendEncodedAtts) -> - WriteFun([<<"--", Boundary/binary, - "\r\ncontent-type: application/json\r\n\r\n">>, - JsonBytes, <<"\r\n--", Boundary/binary>>]), - atts_to_mp(Atts, Boundary, WriteFun, AttsSinceRevPos, SendEncodedAtts). + case lists:any(fun(#att{data=Data})-> Data /= stub end, Atts) of + true -> + WriteFun([<<"--", Boundary/binary, + "\r\ncontent-type: application/json\r\n\r\n">>, + JsonBytes, <<"\r\n--", Boundary/binary>>]), + atts_to_mp(Atts, Boundary, WriteFun, SendEncodedAtts); + false -> + WriteFun(JsonBytes) + end. -atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos, _SendEncAtts) -> +atts_to_mp([], _Boundary, WriteFun, _SendEncAtts) -> WriteFun(<<"--">>); -atts_to_mp([#att{revpos=RevPos} = Att | RestAtts], Boundary, WriteFun, - AttsSinceRevPos, SendEncodedAtts) when RevPos > AttsSinceRevPos -> +atts_to_mp([#att{data=stub} | RestAtts], Boundary, WriteFun, + SendEncodedAtts) -> + atts_to_mp(RestAtts, Boundary, WriteFun, SendEncodedAtts); +atts_to_mp([Att | RestAtts], Boundary, WriteFun, + SendEncodedAtts) -> WriteFun(<<"\r\n\r\n">>), AttFun = case SendEncodedAtts of false -> @@ -429,16 +435,15 @@ atts_to_mp([#att{revpos=RevPos} = Att | RestAtts], Boundary, WriteFun, end, AttFun(Att, fun(Data, ok) -> WriteFun(Data) end, ok), WriteFun(<<"\r\n--", Boundary/binary>>), - atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendEncodedAtts); -atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos, - SendEncodedAtts) -> - atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendEncodedAtts). + atts_to_mp(RestAtts, Boundary, WriteFun, SendEncodedAtts). doc_from_multi_part_stream(ContentType, DataFun) -> + Self = self(), Parser = spawn_link(fun() -> couch_httpd:parse_multipart_request(ContentType, DataFun, - fun(Next)-> mp_parse_doc(Next, []) end) + fun(Next)-> mp_parse_doc(Next, []) end), + unlink(Self) end), Parser ! {get_doc_bytes, self()}, receive {doc_bytes, DocBytes} -> ok end, diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 1262b2ef..de434f32 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -266,7 +266,6 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_ensure_full_commit">>]}=Req, Db) - throw({bad_request, "can't do a full commit ahead of current update_seq"}); RequiredSeq > CommittedSeq -> - % user asked for an explicit sequence, don't commit any batches couch_db:ensure_full_commit(Db); true -> {ok, Db#db.instance_start_time} @@ -564,14 +563,13 @@ db_doc_req(#httpd{method='GET'}=Req, Db, DocId) -> atts_since = AttsSince } = parse_doc_query(Req), case Revs of - [] -> - Doc = couch_doc_open(Db, DocId, Rev, Options), + [] -> Options2 = if AttsSince /= nil -> - RevPos = find_ancestor_rev_pos(Doc#doc.revs, AttsSince), - [{atts_after_revpos, RevPos} | Options]; + [{atts_since, AttsSince}, attachments | Options]; true -> Options end, + Doc = couch_doc_open(Db, DocId, Rev, Options2), send_doc(Req, Doc, Options2); _ -> {ok, Results} = couch_db:open_doc_revs(Db, DocId, Revs, Options), @@ -699,17 +697,6 @@ db_doc_req(#httpd{method='COPY'}=Req, Db, SourceDocId) -> db_doc_req(Req, _Db, _DocId) -> send_method_not_allowed(Req, "DELETE,GET,HEAD,POST,PUT,COPY"). -find_ancestor_rev_pos({_, []}, _AttsSinceRevs) -> - 0; -find_ancestor_rev_pos(_DocRevs, []) -> - 0; -find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) -> - case lists:member({RevPos, RevId}, AttsSinceRevs) of - true -> - RevPos; - false -> - find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs) - end. send_doc(Req, Doc, Options) -> case Doc#doc.meta of @@ -727,8 +714,7 @@ send_doc(Req, Doc, Options) -> send_doc_efficiently(Req, #doc{atts=[]}=Doc, Headers, Options) -> send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options)); send_doc_efficiently(Req, #doc{atts=Atts}=Doc, Headers, Options) -> - case lists:member(attachments, Options) orelse - proplists:is_defined(atts_after_revpos, Options) of + case lists:member(attachments, Options) of true -> AcceptedTypes = case couch_httpd:header_value(Req, "Accept") of undefined -> []; @@ -740,14 +726,12 @@ send_doc_efficiently(Req, #doc{atts=Atts}=Doc, Headers, Options) -> true -> Boundary = couch_uuids:random(), JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [follows|Options])), - AttsSinceRevPos = couch_util:get_value(atts_after_revpos, Options, 0), - Len = couch_doc:len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts, - AttsSinceRevPos,false), + Len = couch_doc:len_doc_to_multi_part_stream(Boundary,JsonBytes, + Atts,false), CType = {<<"Content-Type">>, <<"multipart/related; boundary=\"", Boundary/binary, "\"">>}, {ok, Resp} = start_response_length(Req, 200, [CType|Headers], Len), couch_doc:doc_to_multi_part_stream(Boundary,JsonBytes,Atts, - AttsSinceRevPos, fun(Data) -> couch_httpd:send(Resp, Data) end, false) end; false -> diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl index 731a551f..3ab39797 100644 --- a/src/couchdb/couch_rep_writer.erl +++ b/src/couchdb/couch_rep_writer.erl @@ -90,12 +90,12 @@ write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) -> JsonBytes = ?JSON_ENCODE( couch_doc:to_json_obj( Doc, - [follows, att_encoding_info, {atts_after_revpos, 0}] + [follows, att_encoding_info, attachments] ) ), Boundary = couch_uuids:random(), Len = couch_doc:len_doc_to_multi_part_stream( - Boundary, JsonBytes, Atts, 0, true + Boundary, JsonBytes, Atts, true ), {ok, DataQueue} = couch_work_queue:new(1024*1024, 1000), _StreamerPid = spawn_link( @@ -104,7 +104,6 @@ write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) -> Boundary, JsonBytes, Atts, - 0, fun(Data) -> couch_work_queue:queue(DataQueue, Data) end, true ), @@ -116,7 +115,7 @@ write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) -> closed -> eof; {ok, Data} -> - {ok, iolist_to_binary(lists:reverse(Data)), Acc} + {ok, iolist_to_binary(Data), Acc} end end, Request = Db#http_db{ diff --git a/src/couchdb/couch_work_queue.erl b/src/couchdb/couch_work_queue.erl index ca9445d3..3581aaf7 100644 --- a/src/couchdb/couch_work_queue.erl +++ b/src/couchdb/couch_work_queue.erl @@ -13,11 +13,11 @@ -module(couch_work_queue). -behaviour(gen_server). --export([new/2,queue/2,dequeue/1,close/1]). +-export([new/2,queue/2,dequeue/1,dequeue/2,close/1]). -export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]). -record(q, { - buffer=[], + queue=queue:new(), blocked=[], max_size, max_items, @@ -34,7 +34,10 @@ queue(Wq, Item) -> gen_server:call(Wq, {queue, Item}, infinity). dequeue(Wq) -> - try gen_server:call(Wq, dequeue, infinity) + dequeue(Wq, all). + +dequeue(Wq, MaxItems) -> + try gen_server:call(Wq, {dequeue, MaxItems}, infinity) catch _:_ -> closed end. @@ -48,13 +51,13 @@ init({MaxSize,MaxItems}) -> terminate(_Reason, #q{work_waiter=nil}) -> ok; -terminate(_Reason, #q{work_waiter=WW}) -> - gen_server:reply(WW, closed). +terminate(_Reason, #q{work_waiter={WWFrom, _}}) -> + gen_server:reply(WWFrom, closed). handle_call({queue, Item}, From, #q{work_waiter=nil}=Q0) -> Q = Q0#q{size=Q0#q.size + byte_size(term_to_binary(Item)), items=Q0#q.items + 1, - buffer=[Item | Q0#q.buffer]}, + queue=queue:in(Item, Q0#q.queue)}, case (Q#q.size >= Q#q.max_size) orelse (Q#q.items >= Q#q.max_items) of true -> @@ -62,24 +65,44 @@ handle_call({queue, Item}, From, #q{work_waiter=nil}=Q0) -> false -> {reply, ok, Q} end; -handle_call({queue, Item}, _From, #q{work_waiter=WW}=Q) -> - gen_server:reply(WW, {ok, [Item]}), +handle_call({queue, Item}, _From, #q{work_waiter={WWFrom, _Max}}=Q) -> + gen_server:reply(WWFrom, {ok, [Item]}), {reply, ok, Q#q{work_waiter=nil}}; -handle_call(dequeue, _From, #q{work_waiter=WW}) when WW /= nil -> +handle_call({dequeue, _Max}, _From, #q{work_waiter=WW}) when WW /= nil -> exit("Only one caller allowed to wait for work at a time"); -handle_call(dequeue, From, #q{items=0}=Q) -> - {noreply, Q#q{work_waiter=From}}; -handle_call(dequeue, _From, #q{buffer=Buff, max_size=MaxSize, - max_items=MaxItems, close_on_dequeue=Close}=Q) -> - [gen_server:reply(From, ok) || From <- Q#q.blocked], - Q2 = #q{max_size=MaxSize, max_items=MaxItems}, - if Close -> - {stop, normal, {ok, Buff}, Q2}; +handle_call({dequeue, Max}, From, #q{items=0}=Q) -> + {noreply, Q#q{work_waiter={From, Max}}}; +handle_call({dequeue, Max}, _From, #q{queue=Queue, max_size=MaxSize, + max_items=MaxItems, items=Items,close_on_dequeue=Close}=Q) -> + if Max >= Items orelse Max == all -> + [gen_server:reply(From, ok) || From <- Q#q.blocked], + Q2 = #q{max_size=MaxSize, max_items=MaxItems}, + if Close -> + {stop, normal, {ok, queue:to_list(Queue)}, Q2}; + true -> + {reply, {ok, queue:to_list(Queue)}, Q2} + end; true -> - {reply, {ok, Buff}, #q{max_size=MaxSize, max_items=MaxItems}} + {DequeuedItems, Queue2, Blocked2} = + dequeue_items(Max, Queue, Q#q.blocked, []), + {reply, {ok, DequeuedItems}, + Q#q{items=Items-Max,blocked=Blocked2,queue=Queue2}} end. -handle_cast(close, #q{buffer=[]}=Q) -> +dequeue_items(0, Queue, Blocked, DequeuedAcc) -> + {lists:reverse(DequeuedAcc), Queue, Blocked}; +dequeue_items(NumItems, Queue, Blocked, DequeuedAcc) -> + {{value, Item}, Queue2} = queue:out(Queue), + case Blocked of + [] -> + Blocked2 = Blocked; + [From|Blocked2] -> + gen_server:reply(From, ok) + end, + dequeue_items(NumItems-1, Queue2, Blocked2, [Item | DequeuedAcc]). + + +handle_cast(close, #q{items=0}=Q) -> {stop, normal, Q}; handle_cast(close, Q) -> {noreply, Q#q{close_on_dequeue=true}}. -- cgit v1.2.3