diff options
Diffstat (limited to 'src/couchdb/couch_file.erl')
-rw-r--r-- | src/couchdb/couch_file.erl | 74 |
1 files changed, 67 insertions, 7 deletions
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl index b48c9bf3..c04ac33a 100644 --- a/src/couchdb/couch_file.erl +++ b/src/couchdb/couch_file.erl @@ -20,6 +20,7 @@ -export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]). -export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]). -export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]). +-export([close_maybe/1,drop_ref/1,drop_ref/2,add_ref/1,add_ref/2,num_refs/1]). %%---------------------------------------------------------------------- %% Args: Valid Options are [create] and [create,overwrite]. @@ -164,7 +165,25 @@ sync(Fd) -> %%---------------------------------------------------------------------- close(Fd) -> gen_server:cast(Fd, close). + +close_maybe(Fd) -> + gen_server:cast(Fd, {close_maybe, self()}). + +drop_ref(Fd) -> + drop_ref(Fd, self()). + +drop_ref(Fd, Pid) -> + gen_server:cast(Fd, {drop_ref, Pid}). + + +add_ref(Fd) -> + add_ref(Fd, self()). +add_ref(Fd, Pid) -> + gen_server:call(Fd, {add_ref, Pid}). + +num_refs(Fd) -> + gen_server:call(Fd, num_refs). write_header(Fd, Prefix, Data) -> TermBin = term_to_binary(Data), @@ -267,7 +286,7 @@ init_status_ok(ReturnPid, Fd) -> init_status_error(ReturnPid, Error) -> ReturnPid ! {self(), Error}, % signal back error status - self() ! self_close, % tell ourself to close async + gen_server:cast(self(), close), % tell ourself to close async {ok, nil}. % server functions @@ -342,16 +361,57 @@ handle_call({pread_bin, Pos}, _From, Fd) -> {ok, <<TermLen:32>>} = file:pread(Fd, Pos, 4), {ok, Bin} = file:pread(Fd, Pos + 4, TermLen), - {reply, {ok, Bin}, Fd}. + {reply, {ok, Bin}, Fd}; +handle_call({add_ref, Pid},_From, Fd) -> + undefined = put(Pid, erlang:monitor(process, Pid)), + {reply, ok, Fd}; +handle_call(num_refs, _From, Fd) -> + {monitors, Monitors} = process_info(self(), monitors), + {reply, length(Monitors), Fd}. + handle_cast(close, Fd) -> - {stop,normal,Fd}. % causes terminate to be called + {stop,normal,Fd}; +handle_cast({close_maybe, Pid}, Fd) -> + catch unlink(Pid), + maybe_close_async(Fd); +handle_cast({drop_ref, Pid}, Fd) -> + % don't check return of demonitor. The process could haved crashed causing + % the {'DOWN', ...} message to be sent and the process unmonitored. + erlang:demonitor(erase(Pid), [flush]), + maybe_close_async(Fd). + code_change(_OldVsn, State, _Extra) -> {ok, State}. -handle_info(self_close, State) -> - {stop,normal,State}; -handle_info(_Info, State) -> - {noreply, State}. +handle_info({'DOWN', MonitorRef, _Type, Pid, _Info}, Fd) -> + MonitorRef = erase(Pid), + maybe_close_async(Fd); +handle_info(Info, Fd) -> + exit({error, {Info, Fd}}). + + + +should_close(Fd) -> + case process_info(self(), links) of + {links, [Fd]} -> + % no linkers left (except our fd). What about monitors? + case process_info(self(), monitors) of + {monitors, []} -> + true; + _ -> + false + end; + {links, Links} when length(Links) > 1 -> + false + end. + +maybe_close_async(Fd) -> + case should_close(Fd) of + true -> + {stop,normal,Fd}; + false -> + {noreply,Fd} + end. |