summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2010-05-20 21:47:51 +0000
committerDamien F. Katz <damien@apache.org>2010-05-20 21:47:51 +0000
commite1691e485af06d9b502e33d2f6ea424003801847 (patch)
tree20fd9de2d225bc64d5c779262669a6fc98333be5
parent83b5c1c4e1be85a3df441c1e1adddb7beef316ac (diff)
Refactoring of various internal APIs, particularly those dealing with replicating documents with attachments.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@946803 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--share/www/script/test/changes.js4
-rw-r--r--src/couchdb/couch_changes.erl55
-rw-r--r--src/couchdb/couch_db.erl52
-rw-r--r--src/couchdb/couch_doc.erl91
-rw-r--r--src/couchdb/couch_httpd_db.erl28
-rw-r--r--src/couchdb/couch_rep_writer.erl7
-rw-r--r--src/couchdb/couch_work_queue.erl61
7 files changed, 175 insertions, 123 deletions
diff --git a/share/www/script/test/changes.js b/share/www/script/test/changes.js
index e109a074..b8e691b3 100644
--- a/share/www/script/test/changes.js
+++ b/share/www/script/test/changes.js
@@ -220,7 +220,7 @@ couchTests.changes = function(debug) {
req = CouchDB.request("GET", "/test_suite_db/_changes?filter=changes_filter/dynamic&field=bop");
resp = JSON.parse(req.responseText);
- T(resp.results.length == 1);
+ T(resp.results.length == 1, "changes_filter/dynamic&field=bop");
if (!is_safari && xhr) { // full test requires parallel connections
// filter with longpoll
@@ -352,7 +352,7 @@ couchTests.changes = function(debug) {
req = CouchDB.request("GET", "/test_suite_db/_changes?filter=changes_filter/conflicted");
resp = JSON.parse(req.responseText);
- T(resp.results.length == 1);
+ T(resp.results.length == 1, "filter=changes_filter/conflicted");
// test with erlang filter function
run_on_modified_server([{
diff --git a/src/couchdb/couch_changes.erl b/src/couchdb/couch_changes.erl
index 9496c226..0c4691d0 100644
--- a/src/couchdb/couch_changes.erl
+++ b/src/couchdb/couch_changes.erl
@@ -16,8 +16,9 @@
-export([handle_changes/3]).
%% @type Req -> #httpd{} | {json_req, JsonObj()}
-handle_changes(#changes_args{}=Args1, Req, Db) ->
- Args = Args1#changes_args{filter=make_filter_fun(Args1#changes_args.filter, Req, Db)},
+handle_changes(#changes_args{style=Style}=Args1, Req, Db) ->
+ Args = Args1#changes_args{filter=
+ make_filter_fun(Args1#changes_args.filter, Style, Req, Db)},
StartSeq = case Args#changes_args.dir of
rev ->
couch_db:get_update_seq(Db);
@@ -72,14 +73,18 @@ handle_changes(#changes_args{}=Args1, Req, Db) ->
end.
%% @type Req -> #httpd{} | {json_req, JsonObj()}
-make_filter_fun(FilterName, Req, Db) ->
+make_filter_fun(FilterName, Style, Req, Db) ->
case [list_to_binary(couch_httpd:unquote(Part))
|| Part <- string:tokens(FilterName, "/")] of
[] ->
- fun(DocInfos) ->
- % doing this as a batch is more efficient for external filters
- [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} ||
- #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos]
+ fun(#doc_info{revs=[#rev_info{rev=Rev}|_]=Revs}) ->
+ case Style of
+ main_only ->
+ [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
+ all_docs ->
+ [{[{<<"rev">>, couch_doc:rev_to_str(R)}]}
+ || #rev_info{rev=R} <- Revs]
+ end
end;
[DName, FName] ->
DesignId = <<"_design/", DName/binary>>,
@@ -87,17 +92,23 @@ make_filter_fun(FilterName, Req, Db) ->
% validate that the ddoc has the filter fun
#doc{body={Props}} = DDoc,
couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]),
- fun(DocInfos) ->
+ fun(DocInfo) ->
+ DocInfos =
+ case Style of
+ main_only ->
+ [DocInfo];
+ all_docs ->
+ [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs]
+ end,
Docs = [Doc || {ok, Doc} <- [
- {ok, _Doc} = couch_db:open_doc(Db, DInfo, [deleted, conflicts])
- || DInfo <- DocInfos]],
+ couch_db:open_doc(Db, DocInfo2, [deleted, conflicts])
+ || DocInfo2 <- DocInfos]],
{ok, Passes} = couch_query_servers:filter_docs(
Req, Db, DDoc, FName, Docs
),
- % ?LOG_INFO("filtering ~p ~w",[FName, [DI#doc_info.high_seq || DI <- DocInfos]]),
- [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}
- || #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos,
- Pass <- Passes, Pass == true]
+ [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
+ || {Pass, #doc{revs={RevPos,[RevId|_]}}}
+ <- lists:zip(Passes, Docs), Pass == true]
end;
_Else ->
throw({bad_request,
@@ -195,12 +206,12 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout,
end_sending_changes(Callback, EndSeq, ResponseType) ->
Callback({stop, EndSeq}, ResponseType).
-changes_enumerator(DocInfos, {Db, _, _, FilterFun, Callback, "continuous",
+changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, "continuous",
Limit, IncludeDocs}) ->
- [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_]
- = DocInfos,
- Results0 = FilterFun(DocInfos),
+ #doc_info{id=Id, high_seq=Seq,
+ revs=[#rev_info{deleted=Del,rev=Rev}|_]} = DocInfo,
+ Results0 = FilterFun(DocInfo),
Results = [Result || Result <- Results0, Result /= null],
Go = if Limit =< 1 -> stop; true -> ok end,
case Results of
@@ -215,12 +226,12 @@ changes_enumerator(DocInfos, {Db, _, _, FilterFun, Callback, "continuous",
IncludeDocs}
}
end;
-changes_enumerator(DocInfos, {Db, _, Prepend, FilterFun, Callback, ResponseType,
+changes_enumerator(DocInfo, {Db, _, Prepend, FilterFun, Callback, ResponseType,
Limit, IncludeDocs}) ->
- [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_]
- = DocInfos,
- Results0 = FilterFun(DocInfos),
+ #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}
+ = DocInfo,
+ Results0 = FilterFun(DocInfo),
Results = [Result || Result <- Results0, Result /= null],
Go = if Limit =< 1 -> stop; true -> ok end,
case Results of
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 89530e73..cd077a2f 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -119,18 +119,47 @@ open_doc(Db, Id, Options) ->
{ok, #doc{deleted=true}=Doc} ->
case lists:member(deleted, Options) of
true ->
- {ok, Doc};
+ apply_open_options({ok, Doc},Options);
false ->
{not_found, deleted}
end;
Else ->
- Else
+ apply_open_options(Else,Options)
+ end.
+
+apply_open_options({ok, Doc},Options) ->
+ apply_open_options2(Doc,Options);
+apply_open_options(Else,_Options) ->
+ Else.
+
+apply_open_options2(Doc,[]) ->
+ {ok, Doc};
+apply_open_options2(#doc{atts=Atts,revs=Revs}=Doc,
+ [{atts_since, PossibleAncestors}|Rest]) ->
+ RevPos = find_ancestor_rev_pos(Revs, PossibleAncestors),
+ apply_open_options2(Doc#doc{atts=[A#att{data=
+ if AttPos>RevPos -> Data; true -> stub end}
+ || #att{revpos=AttPos,data=Data}=A <- Atts]}, Rest);
+apply_open_options2(Doc,[_|Rest]) ->
+ apply_open_options2(Doc,Rest).
+
+
+find_ancestor_rev_pos({_, []}, _AttsSinceRevs) ->
+ 0;
+find_ancestor_rev_pos(_DocRevs, []) ->
+ 0;
+find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) ->
+ case lists:member({RevPos, RevId}, AttsSinceRevs) of
+ true ->
+ RevPos;
+ false ->
+ find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs)
end.
open_doc_revs(Db, Id, Revs, Options) ->
couch_stats_collector:increment({couchdb, database_reads}),
- [Result] = open_doc_revs_int(Db, [{Id, Revs}], Options),
- Result.
+ [{ok, Results}] = open_doc_revs_int(Db, [{Id, Revs}], Options),
+ {ok, [apply_open_options(Result, Options) || Result <- Results]}.
% Each returned result is a list of tuples:
% {Id, MissingRevs, PossibleAncestors}
@@ -437,7 +466,7 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
case couch_doc:has_stubs(Doc) of
true ->
- couch_doc:merge_doc(Doc, #doc{}); % will throw exception
+ couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
false -> ok
end,
case Revs of
@@ -892,15 +921,16 @@ changes_since(Db, Style, StartSeq, Fun, Acc) ->
changes_since(Db, Style, StartSeq, Fun, Options, Acc) ->
Wrapper = fun(DocInfo, _Offset, Acc2) ->
#doc_info{revs=Revs} = DocInfo,
+ DocInfo2 =
case Style of
- main_only ->
- Infos = [DocInfo];
+ main_only ->
+ DocInfo;
all_docs ->
- % make each rev it's own doc info
- Infos = [DocInfo#doc_info{revs=[RevInfo]} ||
- #rev_info{seq=RevSeq}=RevInfo <- Revs, StartSeq < RevSeq]
+ % remove revs before the seq
+ DocInfo#doc_info{revs=[RevInfo ||
+ #rev_info{seq=RevSeq}=RevInfo <- Revs, StartSeq < RevSeq]}
end,
- Fun(Infos, Acc2)
+ Fun(DocInfo2, Acc2)
end,
{ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree,
Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl
index 0195fb1e..025d4e55 100644
--- a/src/couchdb/couch_doc.erl
+++ b/src/couchdb/couch_doc.erl
@@ -17,7 +17,7 @@
-export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]).
-export([validate_docid/1]).
-export([doc_from_multi_part_stream/2]).
--export([doc_to_multi_part_stream/6, len_doc_to_multi_part_stream/5]).
+-export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
-include("couch_db.hrl").
@@ -73,31 +73,25 @@ to_json_meta(Meta) ->
end, Meta).
to_json_attachments(Attachments, Options) ->
- RevPos = case lists:member(attachments, Options) of
- true -> % return all the binaries
- 0;
- false ->
- % note the default is [], because this sorts higher than all numbers.
- % and will return all the binaries.
- couch_util:get_value(atts_after_revpos, Options, [])
- end,
to_json_attachments(
Attachments,
- RevPos,
+ lists:member(attachments, Options),
lists:member(follows, Options),
lists:member(att_encoding_info, Options)
).
-to_json_attachments([], _RevPosIncludeAfter, _DataToFollow, _ShowEncInfo) ->
+to_json_attachments([], _OutputData, _DataToFollow, _ShowEncInfo) ->
[];
-to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow, ShowEncInfo) ->
+to_json_attachments(Atts, OutputData, DataToFollow, ShowEncInfo) ->
AttProps = lists:map(
fun(#att{disk_len=DiskLen, att_len=AttLen, encoding=Enc}=Att) ->
{Att#att.name, {[
{<<"content_type">>, Att#att.type},
{<<"revpos">>, Att#att.revpos}
] ++
- if Att#att.revpos > RevPosIncludeAfter ->
+ if not OutputData orelse Att#att.data == stub ->
+ [{<<"length">>, DiskLen}, {<<"stub">>, true}];
+ true ->
if DataToFollow ->
[{<<"length">>, DiskLen}, {<<"follows">>, true}];
true ->
@@ -108,9 +102,7 @@ to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow, ShowEncInfo) ->
att_to_bin(Att)
end,
[{<<"data">>, base64:encode(AttData)}]
- end;
- true ->
- [{<<"length">>, DiskLen}, {<<"stub">>, true}]
+ end
end ++
case {ShowEncInfo, Enc} of
{false, _} ->
@@ -383,16 +375,13 @@ fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0->
ResultAcc = Fun(Bin, Acc),
fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
-len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos,
+len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts,
SendEncodedAtts) ->
- 2 + % "--"
- size(Boundary) +
- 36 + % "\r\ncontent-type: application/json\r\n\r\n"
- iolist_size(JsonBytes) +
- 4 + % "\r\n--"
- size(Boundary) +
- + lists:foldl(fun(#att{revpos=RevPos} = Att, AccAttsSize) ->
- if RevPos > AttsSinceRevPos ->
+ AttsSize = lists:foldl(fun(#att{data=Data} = Att, AccAttsSize) ->
+ case Data of
+ stub ->
+ AccAttsSize;
+ _ ->
AccAttsSize +
4 + % "\r\n\r\n"
case SendEncodedAtts of
@@ -402,24 +391,41 @@ len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos,
Att#att.disk_len
end +
4 + % "\r\n--"
- size(Boundary);
- true ->
- AccAttsSize
+ size(Boundary)
end
- end, 0, Atts) +
- 2. % "--"
+ end, 0, Atts),
+ if AttsSize == 0 ->
+ iolist_size(JsonBytes);
+ true ->
+ 2 + % "--"
+ size(Boundary) +
+ 36 + % "\r\ncontent-type: application/json\r\n\r\n"
+ iolist_size(JsonBytes) +
+ 4 + % "\r\n--"
+ size(Boundary) +
+ + AttsSize +
+ 2 % "--"
+ end.
-doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos, WriteFun,
+doc_to_multi_part_stream(Boundary, JsonBytes, Atts, WriteFun,
SendEncodedAtts) ->
- WriteFun([<<"--", Boundary/binary,
- "\r\ncontent-type: application/json\r\n\r\n">>,
- JsonBytes, <<"\r\n--", Boundary/binary>>]),
- atts_to_mp(Atts, Boundary, WriteFun, AttsSinceRevPos, SendEncodedAtts).
+ case lists:any(fun(#att{data=Data})-> Data /= stub end, Atts) of
+ true ->
+ WriteFun([<<"--", Boundary/binary,
+ "\r\ncontent-type: application/json\r\n\r\n">>,
+ JsonBytes, <<"\r\n--", Boundary/binary>>]),
+ atts_to_mp(Atts, Boundary, WriteFun, SendEncodedAtts);
+ false ->
+ WriteFun(JsonBytes)
+ end.
-atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos, _SendEncAtts) ->
+atts_to_mp([], _Boundary, WriteFun, _SendEncAtts) ->
WriteFun(<<"--">>);
-atts_to_mp([#att{revpos=RevPos} = Att | RestAtts], Boundary, WriteFun,
- AttsSinceRevPos, SendEncodedAtts) when RevPos > AttsSinceRevPos ->
+atts_to_mp([#att{data=stub} | RestAtts], Boundary, WriteFun,
+ SendEncodedAtts) ->
+ atts_to_mp(RestAtts, Boundary, WriteFun, SendEncodedAtts);
+atts_to_mp([Att | RestAtts], Boundary, WriteFun,
+ SendEncodedAtts) ->
WriteFun(<<"\r\n\r\n">>),
AttFun = case SendEncodedAtts of
false ->
@@ -429,16 +435,15 @@ atts_to_mp([#att{revpos=RevPos} = Att | RestAtts], Boundary, WriteFun,
end,
AttFun(Att, fun(Data, ok) -> WriteFun(Data) end, ok),
WriteFun(<<"\r\n--", Boundary/binary>>),
- atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendEncodedAtts);
-atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos,
- SendEncodedAtts) ->
- atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendEncodedAtts).
+ atts_to_mp(RestAtts, Boundary, WriteFun, SendEncodedAtts).
doc_from_multi_part_stream(ContentType, DataFun) ->
+ Self = self(),
Parser = spawn_link(fun() ->
couch_httpd:parse_multipart_request(ContentType, DataFun,
- fun(Next)-> mp_parse_doc(Next, []) end)
+ fun(Next)-> mp_parse_doc(Next, []) end),
+ unlink(Self)
end),
Parser ! {get_doc_bytes, self()},
receive {doc_bytes, DocBytes} -> ok end,
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 1262b2ef..de434f32 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -266,7 +266,6 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_ensure_full_commit">>]}=Req, Db) -
throw({bad_request,
"can't do a full commit ahead of current update_seq"});
RequiredSeq > CommittedSeq ->
- % user asked for an explicit sequence, don't commit any batches
couch_db:ensure_full_commit(Db);
true ->
{ok, Db#db.instance_start_time}
@@ -564,14 +563,13 @@ db_doc_req(#httpd{method='GET'}=Req, Db, DocId) ->
atts_since = AttsSince
} = parse_doc_query(Req),
case Revs of
- [] ->
- Doc = couch_doc_open(Db, DocId, Rev, Options),
+ [] ->
Options2 =
if AttsSince /= nil ->
- RevPos = find_ancestor_rev_pos(Doc#doc.revs, AttsSince),
- [{atts_after_revpos, RevPos} | Options];
+ [{atts_since, AttsSince}, attachments | Options];
true -> Options
end,
+ Doc = couch_doc_open(Db, DocId, Rev, Options2),
send_doc(Req, Doc, Options2);
_ ->
{ok, Results} = couch_db:open_doc_revs(Db, DocId, Revs, Options),
@@ -699,17 +697,6 @@ db_doc_req(#httpd{method='COPY'}=Req, Db, SourceDocId) ->
db_doc_req(Req, _Db, _DocId) ->
send_method_not_allowed(Req, "DELETE,GET,HEAD,POST,PUT,COPY").
-find_ancestor_rev_pos({_, []}, _AttsSinceRevs) ->
- 0;
-find_ancestor_rev_pos(_DocRevs, []) ->
- 0;
-find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) ->
- case lists:member({RevPos, RevId}, AttsSinceRevs) of
- true ->
- RevPos;
- false ->
- find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs)
- end.
send_doc(Req, Doc, Options) ->
case Doc#doc.meta of
@@ -727,8 +714,7 @@ send_doc(Req, Doc, Options) ->
send_doc_efficiently(Req, #doc{atts=[]}=Doc, Headers, Options) ->
send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options));
send_doc_efficiently(Req, #doc{atts=Atts}=Doc, Headers, Options) ->
- case lists:member(attachments, Options) orelse
- proplists:is_defined(atts_after_revpos, Options) of
+ case lists:member(attachments, Options) of
true ->
AcceptedTypes = case couch_httpd:header_value(Req, "Accept") of
undefined -> [];
@@ -740,14 +726,12 @@ send_doc_efficiently(Req, #doc{atts=Atts}=Doc, Headers, Options) ->
true ->
Boundary = couch_uuids:random(),
JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [follows|Options])),
- AttsSinceRevPos = couch_util:get_value(atts_after_revpos, Options, 0),
- Len = couch_doc:len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts,
- AttsSinceRevPos,false),
+ Len = couch_doc:len_doc_to_multi_part_stream(Boundary,JsonBytes,
+ Atts,false),
CType = {<<"Content-Type">>,
<<"multipart/related; boundary=\"", Boundary/binary, "\"">>},
{ok, Resp} = start_response_length(Req, 200, [CType|Headers], Len),
couch_doc:doc_to_multi_part_stream(Boundary,JsonBytes,Atts,
- AttsSinceRevPos,
fun(Data) -> couch_httpd:send(Resp, Data) end, false)
end;
false ->
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
index 731a551f..3ab39797 100644
--- a/src/couchdb/couch_rep_writer.erl
+++ b/src/couchdb/couch_rep_writer.erl
@@ -90,12 +90,12 @@ write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) ->
JsonBytes = ?JSON_ENCODE(
couch_doc:to_json_obj(
Doc,
- [follows, att_encoding_info, {atts_after_revpos, 0}]
+ [follows, att_encoding_info, attachments]
)
),
Boundary = couch_uuids:random(),
Len = couch_doc:len_doc_to_multi_part_stream(
- Boundary, JsonBytes, Atts, 0, true
+ Boundary, JsonBytes, Atts, true
),
{ok, DataQueue} = couch_work_queue:new(1024*1024, 1000),
_StreamerPid = spawn_link(
@@ -104,7 +104,6 @@ write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) ->
Boundary,
JsonBytes,
Atts,
- 0,
fun(Data) -> couch_work_queue:queue(DataQueue, Data) end,
true
),
@@ -116,7 +115,7 @@ write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) ->
closed ->
eof;
{ok, Data} ->
- {ok, iolist_to_binary(lists:reverse(Data)), Acc}
+ {ok, iolist_to_binary(Data), Acc}
end
end,
Request = Db#http_db{
diff --git a/src/couchdb/couch_work_queue.erl b/src/couchdb/couch_work_queue.erl
index ca9445d3..3581aaf7 100644
--- a/src/couchdb/couch_work_queue.erl
+++ b/src/couchdb/couch_work_queue.erl
@@ -13,11 +13,11 @@
-module(couch_work_queue).
-behaviour(gen_server).
--export([new/2,queue/2,dequeue/1,close/1]).
+-export([new/2,queue/2,dequeue/1,dequeue/2,close/1]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
-record(q, {
- buffer=[],
+ queue=queue:new(),
blocked=[],
max_size,
max_items,
@@ -34,7 +34,10 @@ queue(Wq, Item) ->
gen_server:call(Wq, {queue, Item}, infinity).
dequeue(Wq) ->
- try gen_server:call(Wq, dequeue, infinity)
+ dequeue(Wq, all).
+
+dequeue(Wq, MaxItems) ->
+ try gen_server:call(Wq, {dequeue, MaxItems}, infinity)
catch
_:_ -> closed
end.
@@ -48,13 +51,13 @@ init({MaxSize,MaxItems}) ->
terminate(_Reason, #q{work_waiter=nil}) ->
ok;
-terminate(_Reason, #q{work_waiter=WW}) ->
- gen_server:reply(WW, closed).
+terminate(_Reason, #q{work_waiter={WWFrom, _}}) ->
+ gen_server:reply(WWFrom, closed).
handle_call({queue, Item}, From, #q{work_waiter=nil}=Q0) ->
Q = Q0#q{size=Q0#q.size + byte_size(term_to_binary(Item)),
items=Q0#q.items + 1,
- buffer=[Item | Q0#q.buffer]},
+ queue=queue:in(Item, Q0#q.queue)},
case (Q#q.size >= Q#q.max_size) orelse
(Q#q.items >= Q#q.max_items) of
true ->
@@ -62,24 +65,44 @@ handle_call({queue, Item}, From, #q{work_waiter=nil}=Q0) ->
false ->
{reply, ok, Q}
end;
-handle_call({queue, Item}, _From, #q{work_waiter=WW}=Q) ->
- gen_server:reply(WW, {ok, [Item]}),
+handle_call({queue, Item}, _From, #q{work_waiter={WWFrom, _Max}}=Q) ->
+ gen_server:reply(WWFrom, {ok, [Item]}),
{reply, ok, Q#q{work_waiter=nil}};
-handle_call(dequeue, _From, #q{work_waiter=WW}) when WW /= nil ->
+handle_call({dequeue, _Max}, _From, #q{work_waiter=WW}) when WW /= nil ->
exit("Only one caller allowed to wait for work at a time");
-handle_call(dequeue, From, #q{items=0}=Q) ->
- {noreply, Q#q{work_waiter=From}};
-handle_call(dequeue, _From, #q{buffer=Buff, max_size=MaxSize,
- max_items=MaxItems, close_on_dequeue=Close}=Q) ->
- [gen_server:reply(From, ok) || From <- Q#q.blocked],
- Q2 = #q{max_size=MaxSize, max_items=MaxItems},
- if Close ->
- {stop, normal, {ok, Buff}, Q2};
+handle_call({dequeue, Max}, From, #q{items=0}=Q) ->
+ {noreply, Q#q{work_waiter={From, Max}}};
+handle_call({dequeue, Max}, _From, #q{queue=Queue, max_size=MaxSize,
+ max_items=MaxItems, items=Items,close_on_dequeue=Close}=Q) ->
+ if Max >= Items orelse Max == all ->
+ [gen_server:reply(From, ok) || From <- Q#q.blocked],
+ Q2 = #q{max_size=MaxSize, max_items=MaxItems},
+ if Close ->
+ {stop, normal, {ok, queue:to_list(Queue)}, Q2};
+ true ->
+ {reply, {ok, queue:to_list(Queue)}, Q2}
+ end;
true ->
- {reply, {ok, Buff}, #q{max_size=MaxSize, max_items=MaxItems}}
+ {DequeuedItems, Queue2, Blocked2} =
+ dequeue_items(Max, Queue, Q#q.blocked, []),
+ {reply, {ok, DequeuedItems},
+ Q#q{items=Items-Max,blocked=Blocked2,queue=Queue2}}
end.
-handle_cast(close, #q{buffer=[]}=Q) ->
+dequeue_items(0, Queue, Blocked, DequeuedAcc) ->
+ {lists:reverse(DequeuedAcc), Queue, Blocked};
+dequeue_items(NumItems, Queue, Blocked, DequeuedAcc) ->
+ {{value, Item}, Queue2} = queue:out(Queue),
+ case Blocked of
+ [] ->
+ Blocked2 = Blocked;
+ [From|Blocked2] ->
+ gen_server:reply(From, ok)
+ end,
+ dequeue_items(NumItems-1, Queue2, Blocked2, [Item | DequeuedAcc]).
+
+
+handle_cast(close, #q{items=0}=Q) ->
{stop, normal, Q};
handle_cast(close, Q) ->
{noreply, Q#q{close_on_dequeue=true}}.