summary refs log tree commit diff
diff options
context:
space:
mode:
author Damien F. Katz <damien@apache.org> 2010-06-03 03:56:44 +0000
committer Damien F. Katz <damien@apache.org> 2010-06-03 03:56:44 +0000
commit 50e4e3bfccb863e7a4a61306b2121828509ed261 (patch)
tree 8ff168d9371ec6e4b9f5789c69ae6573023bf50c
parent 9807cda46b01f31bc77025c34a8b7f4275be6563 (diff)
More work to allow for streaming attachment replication.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@950865 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- src/couchdb/couch_db.erl 2
-rw-r--r-- src/couchdb/couch_db.hrl 1
-rw-r--r-- src/couchdb/couch_doc.erl 63
-rw-r--r-- src/couchdb/couch_httpd.erl 10
-rw-r--r-- src/couchdb/couch_httpd_db.erl 104
-rw-r--r-- src/couchdb/couch_rep_writer.erl 2
6 files changed, 115 insertions, 67 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 5222cb5e..63ada164 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -523,7 +523,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
fun(Doc, {AccPrepped2, AccErrors2}) ->
case couch_doc:has_stubs(Doc) of
true ->
- couch_doc:merge_doc(Doc, #doc{}); % will throw exception
+ couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
false -> ok
end,
case validate_doc_update(Db, Doc, fun() -> nil end) of
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index 5bc6ebaa..f82406e8 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -286,3 +286,4 @@
filter = "",
include_docs = false
}).
+
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl
index cb8a116c..d6eeb9fa 100644
--- a/src/couchdb/couch_doc.erl
+++ b/src/couchdb/couch_doc.erl
@@ -375,8 +375,7 @@ fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0->
ResultAcc = Fun(Bin, Acc),
fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
-len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts,
- SendEncodedAtts) ->
+len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, SendEncodedAtts) ->
AttsSize = lists:foldl(fun(#att{data=Data} = Att, AccAttsSize) ->
case Data of
stub ->
@@ -395,16 +394,18 @@ len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts,
end
end, 0, Atts),
if AttsSize == 0 ->
- iolist_size(JsonBytes);
+ {<<"application/json">>, iolist_size(JsonBytes)};
true ->
- 2 + % "--"
- size(Boundary) +
- 36 + % "\r\ncontent-type: application/json\r\n\r\n"
- iolist_size(JsonBytes) +
- 4 + % "\r\n--"
- size(Boundary) +
- + AttsSize +
- 2 % "--"
+ {<<"multipart/related; boundary=\"", Boundary/binary, "\"">>,
+ 2 + % "--"
+ size(Boundary) +
+ 36 + % "\r\ncontent-type: application/json\r\n\r\n"
+ iolist_size(JsonBytes) +
+ 4 + % "\r\n--"
+ size(Boundary) +
+ + AttsSize +
+ 2 % "--"
+ }
end.
doc_to_multi_part_stream(Boundary, JsonBytes, Atts, WriteFun,
@@ -446,26 +447,30 @@ doc_from_multi_part_stream(ContentType, DataFun) ->
unlink(Self)
end),
Parser ! {get_doc_bytes, self()},
- receive {doc_bytes, DocBytes} -> ok end,
- Doc = from_json_obj(?JSON_DECODE(DocBytes)),
- % go through the attachments looking for 'follows' in the data,
- % replace with function that reads the data from MIME stream.
- ReadAttachmentDataFun = fun() ->
- Parser ! {get_bytes, self()},
- receive {bytes, Bytes} -> Bytes end
- end,
- Atts2 = lists:map(
- fun(#att{data=follows}=A) ->
- A#att{data=ReadAttachmentDataFun};
- (A) ->
- A
- end, Doc#doc.atts),
- Doc#doc{atts=Atts2}.
+ receive
+ {doc_bytes, DocBytes} ->
+ Doc = from_json_obj(?JSON_DECODE(DocBytes)),
+ % go through the attachments looking for 'follows' in the data,
+ % replace with function that reads the data from MIME stream.
+ ReadAttachmentDataFun = fun() ->
+ Parser ! {get_bytes, self()},
+ receive {bytes, Bytes} -> Bytes end
+ end,
+ Atts2 = lists:map(
+ fun(#att{data=follows}=A) ->
+ A#att{data=ReadAttachmentDataFun};
+ (A) ->
+ A
+ end, Doc#doc.atts),
+ {ok, Doc#doc{atts=Atts2}}
+ end.
mp_parse_doc({headers, H}, []) ->
- {"application/json", _} = couch_util:get_value("content-type", H),
- fun (Next) ->
- mp_parse_doc(Next, [])
+ case couch_util:get_value("content-type", H) of
+ {"application/json", _} ->
+ fun (Next) ->
+ mp_parse_doc(Next, [])
+ end
end;
mp_parse_doc({body, Bytes}, AccBytes) ->
fun (Next) ->
diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl
index d9e91e4b..7f6232e9 100644
--- a/src/couchdb/couch_httpd.erl
+++ b/src/couchdb/couch_httpd.erl
@@ -772,13 +772,15 @@ parse_multipart_request(ContentType, DataFun, Callback) ->
nil_callback(_Data)->
fun(Next) -> nil_callback(Next) end.
-get_boundary(ContentType) ->
- {"multipart/" ++ _, Opts} = mochiweb_util:parse_header(ContentType),
+get_boundary({"multipart/" ++ _, Opts}) ->
case couch_util:get_value("boundary", Opts) of
S when is_list(S) ->
S
- end.
-
+ end;
+get_boundary(ContentType) ->
+ {"multipart/" ++ _ , Opts} = mochiweb_util:parse_header(ContentType),
+ get_boundary({"multipart/", Opts}).
+
split_header(<<>>) ->
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 1f16ffcc..87fc15d8 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -403,7 +403,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_revs_diff">>]}=Req, Db) ->
couch_doc:revs_to_strs(PossibleAncestors)}]
end}}
end, Results),
- send_json(Req, {[{missing, {Results2}}]});
+ send_json(Req, {Results2});
db_req(#httpd{path_parts=[_,<<"_revs_diff">>]}=Req, _Db) ->
send_method_not_allowed(Req, "POST");
@@ -573,27 +573,36 @@ db_doc_req(#httpd{method='GET'}=Req, Db, DocId) ->
send_doc(Req, Doc, Options2);
_ ->
{ok, Results} = couch_db:open_doc_revs(Db, DocId, Revs, Options),
- {ok, Resp} = start_json_response(Req, 200),
- send_chunk(Resp, "["),
- % We loop through the docs. The first time through the separator
- % is whitespace, then a comma on subsequent iterations.
- lists:foldl(
- fun(Result, AccSeparator) ->
- case Result of
- {ok, Doc} ->
- JsonDoc = couch_doc:to_json_obj(Doc, Options),
- Json = ?JSON_ENCODE({[{ok, JsonDoc}]}),
- send_chunk(Resp, AccSeparator ++ Json);
- {{not_found, missing}, RevId} ->
- RevStr = couch_doc:rev_to_str(RevId),
- Json = ?JSON_ENCODE({[{"missing", RevStr}]}),
- send_chunk(Resp, AccSeparator ++ Json)
+ AcceptedTypes = case couch_httpd:header_value(Req, "Accept") of
+ undefined -> [];
+ AcceptHeader -> string:tokens(AcceptHeader, "; ")
+ end,
+ case lists:member("multipart/mixed", AcceptedTypes) of
+ false ->
+ {ok, Resp} = start_json_response(Req, 200),
+ send_chunk(Resp, "["),
+ % We loop through the docs. The first time through the separator
+ % is whitespace, then a comma on subsequent iterations.
+ lists:foldl(
+ fun(Result, AccSeparator) ->
+ case Result of
+ {ok, Doc} ->
+ JsonDoc = couch_doc:to_json_obj(Doc, Options),
+ Json = ?JSON_ENCODE({[{ok, JsonDoc}]}),
+ send_chunk(Resp, AccSeparator ++ Json);
+ {{not_found, missing}, RevId} ->
+ RevStr = couch_doc:rev_to_str(RevId),
+ Json = ?JSON_ENCODE({[{"missing", RevStr}]}),
+ send_chunk(Resp, AccSeparator ++ Json)
+ end,
+ "," % AccSeparator now has a comma
end,
- "," % AccSeparator now has a comma
- end,
- "", Results),
- send_chunk(Resp, "]"),
- end_json_response(Resp)
+ "", Results),
+ send_chunk(Resp, "]"),
+ end_json_response(Resp);
+ true ->
+ send_docs_multipart(Req, Results, Options)
+ end
end;
db_doc_req(#httpd{method='POST'}=Req, Db, DocId) ->
@@ -647,13 +656,13 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
Loc = absolute_uri(Req, "/" ++ ?b2l(Db#db.name) ++ "/" ++ ?b2l(DocId)),
RespHeaders = [{"Location", Loc}],
- case couch_httpd:header_value(Req, "Content-Type") of
- ("multipart/related" ++ _Rest) = ContentType->
- Doc0 = couch_doc:doc_from_multi_part_stream(ContentType,
+ case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of
+ ("multipart/related;" ++ _) = ContentType ->
+ {ok, Doc0} = couch_doc:doc_from_multi_part_stream(ContentType,
fun() -> receive_request_data(Req) end),
Doc = couch_doc_from_req(Req, DocId, Doc0),
update_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType);
- _ ->
+ _Else ->
case couch_httpd:qs_value(Req, "batch") of
"ok" ->
% batch
@@ -672,7 +681,8 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
]});
_Normal ->
% normal
- Doc = couch_doc_from_req(Req, DocId, couch_httpd:json_body(Req)),
+ Body = couch_httpd:json_body(Req),
+ Doc = couch_doc_from_req(Req, DocId, Body),
update_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType)
end
end;
@@ -725,11 +735,11 @@ send_doc_efficiently(Req, #doc{atts=Atts}=Doc, Headers, Options) ->
send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options));
true ->
Boundary = couch_uuids:random(),
- JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [follows|Options])),
- Len = couch_doc:len_doc_to_multi_part_stream(Boundary,JsonBytes,
- Atts,false),
- CType = {<<"Content-Type">>,
- <<"multipart/related; boundary=\"", Boundary/binary, "\"">>},
+ JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc,
+ [attachments, follows|Options])),
+ {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
+ Boundary,JsonBytes, Atts,false),
+ CType = {<<"Content-Type">>, ContentType},
{ok, Resp} = start_response_length(Req, 200, [CType|Headers], Len),
couch_doc:doc_to_multi_part_stream(Boundary,JsonBytes,Atts,
fun(Data) -> couch_httpd:send(Resp, Data) end, false)
@@ -738,7 +748,35 @@ send_doc_efficiently(Req, #doc{atts=Atts}=Doc, Headers, Options) ->
send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options))
end.
-
+send_docs_multipart(Req, Results, Options) ->
+ OuterBoundary = couch_uuids:random(),
+ InnerBoundary = couch_uuids:random(),
+ CType = {"Content-Type",
+ "multipart/mixed; boundary=\"" ++ ?b2l(OuterBoundary) ++ "\""},
+ {ok, Resp} = start_chunked_response(Req, 200, [CType]),
+ couch_httpd:send_chunk(Resp, <<"--", OuterBoundary/binary>>),
+ lists:foreach(
+ fun({ok, #doc{atts=Atts}=Doc}) ->
+ JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc,
+ [attachments,follows|Options])),
+ {ContentType, _Len} = couch_doc:len_doc_to_multi_part_stream(
+ InnerBoundary, JsonBytes, Atts, false),
+ couch_httpd:send_chunk(Resp, <<"\r\nContent-Type: ",
+ ContentType/binary, "\r\n\r\n">>),
+ couch_doc:doc_to_multi_part_stream(InnerBoundary, JsonBytes, Atts,
+ fun(Data) -> couch_httpd:send_chunk(Resp, Data)
+ end, false),
+ couch_httpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>);
+ ({{not_found, missing}, RevId}) ->
+ RevStr = couch_doc:rev_to_str(RevId),
+ Json = ?JSON_ENCODE({[{"missing", RevStr}]}),
+ couch_httpd:send_chunk(Resp,
+ [<<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>,
+ Json,
+ <<"\r\n--", OuterBoundary/binary>>])
+ end, Results),
+ couch_httpd:send_chunk(Resp, <<"--">>),
+ couch_httpd:last_chunk(Resp).
receive_request_data(Req) ->
{couch_httpd:recv(Req, 0), fun() -> receive_request_data(Req) end}.
@@ -791,6 +829,8 @@ couch_doc_from_req(Req, DocId, #doc{revs=Revs}=Doc) ->
case extract_header_rev(Req, ExplicitDocRev) of
missing_rev ->
Revs2 = {0, []};
+ ExplicitDocRev ->
+ Revs2 = Revs;
{Pos, Rev} ->
Revs2 = {Pos, [Rev]}
end,
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
index 3ab39797..3a337255 100644
--- a/src/couchdb/couch_rep_writer.erl
+++ b/src/couchdb/couch_rep_writer.erl
@@ -94,7 +94,7 @@ write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) ->
)
),
Boundary = couch_uuids:random(),
- Len = couch_doc:len_doc_to_multi_part_stream(
+ {_ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
Boundary, JsonBytes, Atts, true
),
{ok, DataQueue} = couch_work_queue:new(1024*1024, 1000),