summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_file.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_file.erl')
-rw-r--r--src/couchdb/couch_file.erl74
1 files changed, 67 insertions, 7 deletions
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index b48c9bf3..c04ac33a 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -20,6 +20,7 @@
-export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]).
-export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
+-export([close_maybe/1,drop_ref/1,drop_ref/2,add_ref/1,add_ref/2,num_refs/1]).
%%----------------------------------------------------------------------
%% Args: Valid Options are [create] and [create,overwrite].
@@ -164,7 +165,25 @@ sync(Fd) ->
%%----------------------------------------------------------------------
close(Fd) ->
gen_server:cast(Fd, close).
+
+close_maybe(Fd) ->
+ gen_server:cast(Fd, {close_maybe, self()}).
+
+drop_ref(Fd) ->
+ drop_ref(Fd, self()).
+
+drop_ref(Fd, Pid) ->
+ gen_server:cast(Fd, {drop_ref, Pid}).
+
+
+add_ref(Fd) ->
+ add_ref(Fd, self()).
+add_ref(Fd, Pid) ->
+ gen_server:call(Fd, {add_ref, Pid}).
+
+num_refs(Fd) ->
+ gen_server:call(Fd, num_refs).
write_header(Fd, Prefix, Data) ->
TermBin = term_to_binary(Data),
@@ -267,7 +286,7 @@ init_status_ok(ReturnPid, Fd) ->
init_status_error(ReturnPid, Error) ->
ReturnPid ! {self(), Error}, % signal back error status
- self() ! self_close, % tell ourself to close async
+ gen_server:cast(self(), close), % tell ourself to close async
{ok, nil}.
% server functions
@@ -342,16 +361,57 @@ handle_call({pread_bin, Pos}, _From, Fd) ->
{ok, <<TermLen:32>>}
= file:pread(Fd, Pos, 4),
{ok, Bin} = file:pread(Fd, Pos + 4, TermLen),
- {reply, {ok, Bin}, Fd}.
+ {reply, {ok, Bin}, Fd};
+handle_call({add_ref, Pid},_From, Fd) ->
+ undefined = put(Pid, erlang:monitor(process, Pid)),
+ {reply, ok, Fd};
+handle_call(num_refs, _From, Fd) ->
+ {monitors, Monitors} = process_info(self(), monitors),
+ {reply, length(Monitors), Fd}.
+
handle_cast(close, Fd) ->
- {stop,normal,Fd}. % causes terminate to be called
+ {stop,normal,Fd};
+handle_cast({close_maybe, Pid}, Fd) ->
+ catch unlink(Pid),
+ maybe_close_async(Fd);
+handle_cast({drop_ref, Pid}, Fd) ->
+ % don't check return of demonitor. The process could haved crashed causing
+ % the {'DOWN', ...} message to be sent and the process unmonitored.
+ erlang:demonitor(erase(Pid), [flush]),
+ maybe_close_async(Fd).
+
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-handle_info(self_close, State) ->
- {stop,normal,State};
-handle_info(_Info, State) ->
- {noreply, State}.
+handle_info({'DOWN', MonitorRef, _Type, Pid, _Info}, Fd) ->
+ MonitorRef = erase(Pid),
+ maybe_close_async(Fd);
+handle_info(Info, Fd) ->
+ exit({error, {Info, Fd}}).
+
+
+
+should_close(Fd) ->
+ case process_info(self(), links) of
+ {links, [Fd]} ->
+ % no linkers left (except our fd). What about monitors?
+ case process_info(self(), monitors) of
+ {monitors, []} ->
+ true;
+ _ ->
+ false
+ end;
+ {links, Links} when length(Links) > 1 ->
+ false
+ end.
+
+maybe_close_async(Fd) ->
+ case should_close(Fd) of
+ true ->
+ {stop,normal,Fd};
+ false ->
+ {noreply,Fd}
+ end.