Diffstat (limited to 'src')
-rw-r--r--  src/couchdb/priv/couch_js/http.c                          675
-rw-r--r--  src/couchdb/priv/couch_js/http.h                           18
-rw-r--r--  src/couchdb/priv/couch_js/main.c                          338
-rw-r--r--  src/couchdb/priv/couch_js/utf8.c                          286
-rw-r--r--  src/couchdb/priv/couch_js/utf8.h                           19
-rw-r--r--  src/couchdb/priv/spawnkillable/couchspawnkillable_win.c   145
-rw-r--r--  src/fabric.erl                                            225
-rw-r--r--  src/fabric_db_create.erl                                   65
-rw-r--r--  src/fabric_db_delete.erl                                   41
-rw-r--r--  src/fabric_db_doc_count.erl                                32
-rw-r--r--  src/fabric_db_info.erl                                     52
-rw-r--r--  src/fabric_db_meta.erl                                     35
-rw-r--r--  src/fabric_dict.erl                                        37
-rw-r--r--  src/fabric_doc_attachments.erl                            102
-rw-r--r--  src/fabric_doc_missing_revs.erl                            64
-rw-r--r--  src/fabric_doc_open.erl                                    66
-rw-r--r--  src/fabric_doc_open_revs.erl                               65
-rw-r--r--  src/fabric_doc_update.erl                                 127
-rw-r--r--  src/fabric_group_info.erl                                  52
-rw-r--r--  src/fabric_rpc.erl                                        388
-rw-r--r--  src/fabric_util.erl                                        89
-rw-r--r--  src/fabric_view.erl                                       218
-rw-r--r--  src/fabric_view_all_docs.erl                              167
-rw-r--r--  src/fabric_view_changes.erl                               251
-rw-r--r--  src/fabric_view_map.erl                                   138
-rw-r--r--  src/fabric_view_reduce.erl                                 85
26 files changed, 1481 insertions(+), 2299 deletions(-)
diff --git a/src/couchdb/priv/couch_js/http.c b/src/couchdb/priv/couch_js/http.c
new file mode 100644
index 00000000..6c2a8a82
--- /dev/null
+++ b/src/couchdb/priv/couch_js/http.c
@@ -0,0 +1,675 @@
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <jsapi.h>
+#include <curl/curl.h>
+
+#include "utf8.h"
+
+#ifdef XP_WIN
+// Map some of the string function names to things which exist on Windows
+#define strcasecmp _strcmpi
+#define strncasecmp _strnicmp
+#define snprintf _snprintf
+#endif
+
+typedef struct curl_slist CurlHeaders;
+
+typedef struct {
+ int method;
+ char* url;
+ CurlHeaders* req_headers;
+ jsint last_status;
+} HTTPData;
+
+char* METHODS[] = {"GET", "HEAD", "POST", "PUT", "DELETE", "COPY", NULL};
+
+#define GET 0
+#define HEAD 1
+#define POST 2
+#define PUT 3
+#define DELETE 4
+#define COPY 5
+
+static JSBool
+go(JSContext* cx, JSObject* obj, HTTPData* http, char* body, size_t blen);
+
+static JSString*
+str_from_binary(JSContext* cx, char* data, size_t length);
+
+static JSBool
+constructor(JSContext* cx, JSObject* obj, uintN argc, jsval* argv, jsval* rval)
+{
+ HTTPData* http = NULL;
+ JSBool ret = JS_FALSE;
+
+ http = (HTTPData*) malloc(sizeof(HTTPData));
+ if(!http)
+ {
+ JS_ReportError(cx, "Failed to create CouchHTTP instance.");
+ goto error;
+ }
+
+ http->method = -1;
+ http->url = NULL;
+ http->req_headers = NULL;
+ http->last_status = -1;
+
+ if(!JS_SetPrivate(cx, obj, http))
+ {
+ JS_ReportError(cx, "Failed to set private CouchHTTP data.");
+ goto error;
+ }
+
+ ret = JS_TRUE;
+ goto success;
+
+error:
+ if(http) free(http);
+
+success:
+ return ret;
+}
+
+static void
+destructor(JSContext* cx, JSObject* obj)
+{
+ HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj);
+ if(!http)
+ {
+ fprintf(stderr, "Unable to destroy invalid CouchHTTP instance.\n");
+ }
+ else
+ {
+ if(http->url) free(http->url);
+ if(http->req_headers) curl_slist_free_all(http->req_headers);
+ free(http);
+ }
+}
+
+static JSBool
+open(JSContext* cx, JSObject* obj, uintN argc, jsval* argv, jsval* rval)
+{
+ HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj);
+ char* method = NULL;
+ char* url = NULL;
+ JSBool ret = JS_FALSE;
+ int methid;
+
+ if(!http)
+ {
+ JS_ReportError(cx, "Invalid CouchHTTP instance.");
+ goto done;
+ }
+
+ if(argv[0] == JSVAL_VOID)
+ {
+ JS_ReportError(cx, "You must specify a method.");
+ goto done;
+ }
+
+ method = enc_string(cx, argv[0], NULL);
+ if(!method)
+ {
+ JS_ReportError(cx, "Failed to encode method.");
+ goto done;
+ }
+
+ for(methid = 0; METHODS[methid] != NULL; methid++)
+ {
+ if(strcasecmp(METHODS[methid], method) == 0) break;
+ }
+
+ if(methid > COPY)
+ {
+ JS_ReportError(cx, "Invalid method specified.");
+ goto done;
+ }
+
+ http->method = methid;
+
+ if(argv[1] == JSVAL_VOID)
+ {
+ JS_ReportError(cx, "You must specify a URL.");
+ goto done;
+ }
+
+ if(http->url)
+ {
+ free(http->url);
+ http->url = NULL;
+ }
+
+ http->url = enc_string(cx, argv[1], NULL);
+ if(!http->url)
+ {
+ JS_ReportError(cx, "Failed to encode URL.");
+ goto done;
+ }
+
+ if(argv[2] != JSVAL_VOID && argv[2] != JSVAL_FALSE)
+ {
+ JS_ReportError(cx, "Synchronous flag must be false if specified.");
+ goto done;
+ }
+
+ if(http->req_headers)
+ {
+ curl_slist_free_all(http->req_headers);
+ http->req_headers = NULL;
+ }
+
+ // Disable Expect: 100-continue
+ http->req_headers = curl_slist_append(http->req_headers, "Expect:");
+
+ ret = JS_TRUE;
+
+done:
+ if(method) free(method);
+ return ret;
+}
+
+static JSBool
+setheader(JSContext* cx, JSObject* obj, uintN argc, jsval* argv, jsval* rval)
+{
+ HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj);
+ char* keystr = NULL;
+ char* valstr = NULL;
+ char* hdrbuf = NULL;
+ size_t hdrlen = -1;
+ JSBool ret = JS_FALSE;
+
+ if(!http)
+ {
+ JS_ReportError(cx, "Invalid CouchHTTP instance.");
+ goto done;
+ }
+
+ if(argv[0] == JSVAL_VOID)
+ {
+ JS_ReportError(cx, "You must speciy a header name.");
+ goto done;
+ }
+
+ keystr = enc_string(cx, argv[0], NULL);
+ if(!keystr)
+ {
+ JS_ReportError(cx, "Failed to encode header name.");
+ goto done;
+ }
+
+ if(argv[1] == JSVAL_VOID)
+ {
+ JS_ReportError(cx, "You must specify a header value.");
+ goto done;
+ }
+
+ valstr = enc_string(cx, argv[1], NULL);
+ if(!valstr)
+ {
+ JS_ReportError(cx, "Failed to encode header value.");
+ goto done;
+ }
+
+ hdrlen = strlen(keystr) + strlen(valstr) + 3;
+ hdrbuf = (char*) malloc(hdrlen * sizeof(char));
+ if(!hdrbuf)
+ {
+ JS_ReportError(cx, "Failed to allocate header buffer.");
+ goto done;
+ }
+
+ snprintf(hdrbuf, hdrlen, "%s: %s", keystr, valstr);
+ http->req_headers = curl_slist_append(http->req_headers, hdrbuf);
+
+ ret = JS_TRUE;
+
+done:
+ if(keystr) free(keystr);
+ if(valstr) free(valstr);
+ if(hdrbuf) free(hdrbuf);
+
+ return ret;
+}
+
+static JSBool
+sendreq(JSContext* cx, JSObject* obj, uintN argc, jsval* argv, jsval* rval)
+{
+ HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj);
+ char* body = NULL;
+ size_t bodylen = 0;
+ JSBool ret = JS_FALSE;
+
+ if(!http)
+ {
+ JS_ReportError(cx, "Invalid CouchHTTP instance.");
+ goto done;
+ }
+
+ if(argv[0] != JSVAL_VOID && argv[0] != JS_GetEmptyStringValue(cx))
+ {
+ body = enc_string(cx, argv[0], &bodylen);
+ if(!body)
+ {
+ JS_ReportError(cx, "Failed to encode body.");
+ goto done;
+ }
+ }
+
+ ret = go(cx, obj, http, body, bodylen);
+
+done:
+ if(body) free(body);
+ return ret;
+}
+
+static JSBool
+status(JSContext* cx, JSObject* obj, jsval idval, jsval* vp)
+{
+ HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj);
+
+ if(!http)
+ {
+ JS_ReportError(cx, "Invalid CouchHTTP instance.");
+ return JS_FALSE;
+ }
+
+ if(INT_FITS_IN_JSVAL(http->last_status))
+ {
+ *vp = INT_TO_JSVAL(http->last_status);
+ return JS_TRUE;
+ }
+ else
+ {
+ JS_ReportError(cx, "INTERNAL: Invalid last_status");
+ return JS_FALSE;
+ }
+}
+
+JSClass CouchHTTPClass = {
+ "CouchHTTP",
+ JSCLASS_HAS_PRIVATE
+ | JSCLASS_CONSTRUCT_PROTOTYPE
+ | JSCLASS_HAS_RESERVED_SLOTS(2),
+ JS_PropertyStub,
+ JS_PropertyStub,
+ JS_PropertyStub,
+ JS_PropertyStub,
+ JS_EnumerateStub,
+ JS_ResolveStub,
+ JS_ConvertStub,
+ destructor,
+ JSCLASS_NO_OPTIONAL_MEMBERS
+};
+
+JSPropertySpec CouchHTTPProperties[] = {
+ {"status", 0, JSPROP_READONLY, status, NULL},
+ {0, 0, 0, 0, 0}
+};
+
+JSFunctionSpec CouchHTTPFunctions[] = {
+ {"_open", open, 3, 0, 0},
+ {"_setRequestHeader", setheader, 2, 0, 0},
+ {"_send", sendreq, 1, 0, 0},
+ {0, 0, 0, 0, 0}
+};
+
+JSObject*
+install_http(JSContext* cx, JSObject* glbl)
+{
+ JSObject* klass = NULL;
+ HTTPData* http = NULL;
+
+ klass = JS_InitClass(
+ cx,
+ glbl,
+ NULL,
+ &CouchHTTPClass,
+ constructor,
+ 0,
+ CouchHTTPProperties,
+ CouchHTTPFunctions,
+ NULL,
+ NULL
+ );
+
+ if(!klass)
+ {
+ fprintf(stderr, "Failed to initialize CouchHTTP class.\n");
+ return NULL;
+ }
+
+ return klass;
+}
+
+
+// Curl Helpers
+
+typedef struct {
+ HTTPData* http;
+ JSContext* cx;
+ JSObject* resp_headers;
+ char* sendbuf;
+ size_t sendlen;
+ size_t sent;
+ char* recvbuf;
+ size_t recvlen;
+ size_t read;
+} CurlState;
+
+/*
+ * I really hate doing this but this doesn't have to be
+ * uber awesome, it just has to work.
+ */
+CURL* HTTP_HANDLE = NULL;
+char ERRBUF[CURL_ERROR_SIZE];
+
+static size_t send_body(void *ptr, size_t size, size_t nmem, void *data);
+static int seek_body(void *ptr, curl_off_t offset, int origin);
+static size_t recv_body(void *ptr, size_t size, size_t nmem, void *data);
+static size_t recv_header(void *ptr, size_t size, size_t nmem, void *data);
+
+static JSBool
+go(JSContext* cx, JSObject* obj, HTTPData* http, char* body, size_t bodylen)
+{
+ CurlState state;
+ JSString* jsbody;
+ JSBool ret = JS_FALSE;
+ jsval tmp;
+
+ state.cx = cx;
+ state.http = http;
+
+ state.sendbuf = body;
+ state.sendlen = bodylen;
+ state.sent = 0;
+
+ state.recvbuf = NULL;
+ state.recvlen = 0;
+ state.read = 0;
+ state.resp_headers = NULL;
+ if(HTTP_HANDLE == NULL)
+ {
+ HTTP_HANDLE = curl_easy_init();
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_READFUNCTION, send_body);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_SEEKFUNCTION,
+ (curl_seek_callback) seek_body);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_HEADERFUNCTION, recv_header);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_WRITEFUNCTION, recv_body);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_NOPROGRESS, 1);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_ERRORBUFFER, ERRBUF);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_COOKIEFILE, "");
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_USERAGENT,
+ "CouchHTTP Client - Relax");
+ }
+
+ if(!HTTP_HANDLE)
+ {
+ JS_ReportError(cx, "Failed to initialize cURL handle.");
+ goto done;
+ }
+
+ if(http->method < 0 || http->method > COPY)
+ {
+ JS_ReportError(cx, "INTERNAL: Unknown method.");
+ goto done;
+ }
+
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_CUSTOMREQUEST, METHODS[http->method]);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_NOBODY, 0);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_FOLLOWLOCATION, 1);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_UPLOAD, 0);
+
+ if(http->method == HEAD)
+ {
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_NOBODY, 1);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_FOLLOWLOCATION, 0);
+ }
+ else if(http->method == POST || http->method == PUT)
+ {
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_UPLOAD, 1);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_FOLLOWLOCATION, 0);
+ }
+
+ if(body && bodylen)
+ {
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_INFILESIZE, bodylen);
+ }
+ else
+ {
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_INFILESIZE, 0);
+ }
+
+ //curl_easy_setopt(HTTP_HANDLE, CURLOPT_VERBOSE, 1);
+
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_URL, http->url);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_HTTPHEADER, http->req_headers);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_READDATA, &state);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_SEEKDATA, &state);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_WRITEHEADER, &state);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_WRITEDATA, &state);
+
+ if(curl_easy_perform(HTTP_HANDLE) != 0)
+ {
+ JS_ReportError(cx, "Failed to execute HTTP request: %s", ERRBUF);
+ goto done;
+ }
+
+ if(!state.resp_headers)
+ {
+ JS_ReportError(cx, "Failed to recieve HTTP headers.");
+ goto done;
+ }
+
+ tmp = OBJECT_TO_JSVAL(state.resp_headers);
+ if(!JS_DefineProperty(
+ cx,
+ obj,
+ "_headers",
+ tmp,
+ NULL,
+ NULL,
+ JSPROP_READONLY
+ ))
+ {
+ JS_ReportError(cx, "INTERNAL: Failed to set response headers.");
+ goto done;
+ }
+
+ if(state.recvbuf) // Is this check good enough?
+ {
+ state.recvbuf[state.read] = '\0';
+ jsbody = dec_string(cx, state.recvbuf, state.read+1);
+ if(!jsbody)
+ {
+ // If we can't decode the body as UTF-8 we forcefully
+ // convert it to a string by just forcing each byte
+ // to a jschar.
+ jsbody = str_from_binary(cx, state.recvbuf, state.read);
+ if(!jsbody) {
+ if(!JS_IsExceptionPending(cx)) {
+ JS_ReportError(cx, "INTERNAL: Failed to decode body.");
+ }
+ goto done;
+ }
+ }
+ tmp = STRING_TO_JSVAL(jsbody);
+ }
+ else
+ {
+ tmp = JS_GetEmptyStringValue(cx);
+ }
+
+ if(!JS_DefineProperty(
+ cx,
+ obj,
+ "responseText",
+ tmp,
+ NULL,
+ NULL,
+ JSPROP_READONLY
+ ))
+ {
+ JS_ReportError(cx, "INTERNAL: Failed to set responseText.");
+ goto done;
+ }
+
+ ret = JS_TRUE;
+
+done:
+ if(state.recvbuf) JS_free(cx, state.recvbuf);
+ return ret;
+}
+
+static size_t
+send_body(void *ptr, size_t size, size_t nmem, void *data)
+{
+ CurlState* state = (CurlState*) data;
+ size_t length = size * nmem;
+ size_t towrite = state->sendlen - state->sent;
+ if(towrite == 0)
+ {
+ return 0;
+ }
+
+ if(length < towrite) towrite = length;
+
+ //fprintf(stderr, "%lu %lu %lu %lu\n", state->bodyused, state->bodyread, length, towrite);
+
+ memcpy(ptr, state->sendbuf + state->sent, towrite);
+ state->sent += towrite;
+
+ return towrite;
+}
+
+static int
+seek_body(void* ptr, curl_off_t offset, int origin)
+{
+ CurlState* state = (CurlState*) ptr;
+ if(origin != SEEK_SET) return -1;
+
+ state->sent = (size_t) offset;
+ return 0; /* CURL_SEEKFUNC_OK */
+}
+
+static size_t
+recv_header(void *ptr, size_t size, size_t nmem, void *data)
+{
+ CurlState* state = (CurlState*) data;
+ char code[4];
+ char* header = (char*) ptr;
+ size_t length = size * nmem;
+ size_t index = 0;
+ JSString* hdr = NULL;
+ jsuint hdrlen;
+ jsval hdrval;
+
+ if(length > 7 && strncasecmp(header, "HTTP/1.", 7) == 0)
+ {
+ if(length < 12)
+ {
+ return CURLE_WRITE_ERROR;
+ }
+
+ memcpy(code, header+9, 3*sizeof(char));
+ code[3] = '\0';
+ state->http->last_status = atoi(code);
+
+ state->resp_headers = JS_NewArrayObject(state->cx, 0, NULL);
+ if(!state->resp_headers)
+ {
+ return CURLE_WRITE_ERROR;
+ }
+
+ return length;
+ }
+
+ // We get a notice at the \r\n\r\n after headers.
+ if(length <= 2)
+ {
+ return length;
+ }
+
+ // Append the new header to our array.
+ hdr = dec_string(state->cx, header, length);
+ if(!hdr)
+ {
+ return CURLE_WRITE_ERROR;
+ }
+
+ if(!JS_GetArrayLength(state->cx, state->resp_headers, &hdrlen))
+ {
+ return CURLE_WRITE_ERROR;
+ }
+
+ hdrval = STRING_TO_JSVAL(hdr);
+ if(!JS_SetElement(state->cx, state->resp_headers, hdrlen, &hdrval))
+ {
+ return CURLE_WRITE_ERROR;
+ }
+
+ return length;
+}
+
+static size_t
+recv_body(void *ptr, size_t size, size_t nmem, void *data)
+{
+ CurlState* state = (CurlState*) data;
+ size_t length = size * nmem;
+ char* tmp = NULL;
+
+ if(!state->recvbuf)
+ {
+ state->recvlen = 4096;
+ state->read = 0;
+ state->recvbuf = JS_malloc(state->cx, state->recvlen);
+ }
+
+ if(!state->recvbuf)
+ {
+ return CURLE_WRITE_ERROR;
+ }
+
+ // +1 so we can add '\0' back up in the go function.
+ while(length+1 > state->recvlen - state->read) state->recvlen *= 2;
+ tmp = JS_realloc(state->cx, state->recvbuf, state->recvlen);
+ if(!tmp) return CURLE_WRITE_ERROR;
+ state->recvbuf = tmp;
+
+ memcpy(state->recvbuf + state->read, ptr, length);
+ state->read += length;
+ return length;
+}
+
+JSString*
+str_from_binary(JSContext* cx, char* data, size_t length)
+{
+ jschar* conv = (jschar*) JS_malloc(cx, length * sizeof(jschar));
+ JSString* ret = NULL;
+ size_t i;
+
+ if(!conv) return NULL;
+
+ for(i = 0; i < length; i++)
+ {
+ conv[i] = (jschar) data[i];
+ }
+
+ ret = JS_NewUCString(cx, conv, length);
+ if(!ret) JS_free(cx, conv);
+
+ return ret;
+}
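
The recv_body callback above is an instance of a common libcurl idiom: accumulate the response in a heap buffer that is doubled whenever the next chunk would overflow it. For reference, a minimal standalone sketch of the same idea, using plain malloc/realloc in place of the SpiderMonkey allocators (the GrowBuf type and growbuf_write name are illustrative, not part of this patch):

#include <stdlib.h>
#include <string.h>

typedef struct {
    char*  buf;   /* accumulated response body */
    size_t len;   /* bytes used */
    size_t cap;   /* bytes allocated */
} GrowBuf;        /* start from a zeroed struct: GrowBuf gb = {0}; */

static size_t
growbuf_write(void* ptr, size_t size, size_t nmemb, void* data)
{
    GrowBuf* gb = (GrowBuf*) data;
    size_t length = size * nmemb;
    char* tmp = NULL;

    if(!gb->buf)
    {
        gb->cap = 4096;
        gb->len = 0;
        gb->buf = malloc(gb->cap);
        if(!gb->buf) return 0; /* a short return aborts the transfer */
    }

    /* The +1 keeps room for a trailing '\0', just as recv_body reserves
     * space for the terminator that go() writes after the transfer. */
    while(length + 1 > gb->cap - gb->len) gb->cap *= 2;
    tmp = realloc(gb->buf, gb->cap);
    if(!tmp) return 0;
    gb->buf = tmp;

    memcpy(gb->buf + gb->len, ptr, length);
    gb->len += length;
    return length; /* returning less than length signals a write error */
}

It would be wired up the same way recv_body is above:

    curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, growbuf_write);
    curl_easy_setopt(handle, CURLOPT_WRITEDATA, &gb);
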
diff --git a/src/couchdb/priv/couch_js/http.h b/src/couchdb/priv/couch_js/http.h
new file mode 100644
index 00000000..b5f8c70f
--- /dev/null
+++ b/src/couchdb/priv/couch_js/http.h
@@ -0,0 +1,18 @@
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#ifndef COUCH_JS_HTTP_H
+#define COUCH_JS_HTTP_H
+
+JSObject* install_http(JSContext* cx, JSObject* global);
+
+#endif
\ No newline at end of file
diff --git a/src/couchdb/priv/couch_js/main.c b/src/couchdb/priv/couch_js/main.c
new file mode 100644
index 00000000..376aa15b
--- /dev/null
+++ b/src/couchdb/priv/couch_js/main.c
@@ -0,0 +1,338 @@
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <jsapi.h>
+#include "config.h"
+
+#include "utf8.h"
+#include "http.h"
+
+int gExitCode = 0;
+
+#ifdef JS_THREADSAFE
+#define SETUP_REQUEST(cx) \
+ JS_SetContextThread(cx); \
+ JS_BeginRequest(cx);
+#define FINISH_REQUEST(cx) \
+ JS_EndRequest(cx); \
+ JS_ClearContextThread(cx);
+#else
+#define SETUP_REQUEST(cx)
+#define FINISH_REQUEST(cx)
+#endif
+
+static JSBool
+evalcx(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval)
+{
+ JSString *str;
+ JSObject *sandbox;
+ JSContext *subcx;
+ const jschar *src;
+ size_t srclen;
+ JSBool ret = JS_FALSE;
+ jsval v;
+
+ sandbox = NULL;
+ if(!JS_ConvertArguments(cx, argc, argv, "S / o", &str, &sandbox))
+ {
+ return JS_FALSE;
+ }
+
+ subcx = JS_NewContext(JS_GetRuntime(cx), 8L * 1024L);
+ if(!subcx)
+ {
+ JS_ReportOutOfMemory(cx);
+ return JS_FALSE;
+ }
+
+ SETUP_REQUEST(subcx);
+
+ src = JS_GetStringChars(str);
+ srclen = JS_GetStringLength(str);
+
+ if(!sandbox)
+ {
+ sandbox = JS_NewObject(subcx, NULL, NULL, NULL);
+ if(!sandbox || !JS_InitStandardClasses(subcx, sandbox)) goto done;
+ }
+
+ if(srclen == 0)
+ {
+ *rval = OBJECT_TO_JSVAL(sandbox);
+ }
+ else
+ {
+ JS_EvaluateUCScript(subcx, sandbox, src, srclen, NULL, 0, rval);
+ }
+
+ ret = JS_TRUE;
+
+done:
+ FINISH_REQUEST(subcx);
+ JS_DestroyContext(subcx);
+ return ret;
+}
+
+static JSBool
+gc(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval)
+{
+ JS_GC(cx);
+ return JS_TRUE;
+}
+
+static JSBool
+print(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval)
+{
+ uintN i;
+ char *bytes;
+
+ for(i = 0; i < argc; i++)
+ {
+ bytes = enc_string(cx, argv[i], NULL);
+ if(!bytes) return JS_FALSE;
+
+ fprintf(stdout, "%s%s", i ? " " : "", bytes);
+ JS_free(cx, bytes);
+ }
+
+ fputc('\n', stdout);
+ fflush(stdout);
+ return JS_TRUE;
+}
+
+static JSBool
+quit(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval)
+{
+ JS_ConvertArguments(cx, argc, argv, "/ i", &gExitCode);
+ return JS_FALSE;
+}
+
+static char*
+readfp(JSContext* cx, FILE* fp, size_t* buflen)
+{
+ char* bytes = NULL;
+ char* tmp = NULL;
+ size_t used = 0;
+ size_t byteslen = 256;
+ size_t readlen = 0;
+
+ bytes = JS_malloc(cx, byteslen);
+ if(bytes == NULL) return NULL;
+
+ while((readlen = js_fgets(bytes+used, byteslen-used, fp)) > 0)
+ {
+ used += readlen;
+
+ if(bytes[used-1] == '\n')
+ {
+ bytes[used-1] = '\0';
+ break;
+ }
+
+ // Double our buffer and read more.
+ byteslen *= 2;
+ tmp = JS_realloc(cx, bytes, byteslen);
+ if(!tmp)
+ {
+ JS_free(cx, bytes);
+ return NULL;
+ }
+ bytes = tmp;
+ }
+
+ *buflen = used;
+ return bytes;
+}
+
+static JSBool
+readline(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+ jschar *chars;
+ JSString *str;
+ char* bytes;
+ char* tmp;
+ size_t byteslen;
+
+ /* GC Occasionally */
+ JS_MaybeGC(cx);
+
+ bytes = readfp(cx, stdin, &byteslen);
+ if(!bytes) return JS_FALSE;
+
+ /* Treat the empty string specially */
+ if(byteslen == 0)
+ {
+ *rval = JS_GetEmptyStringValue(cx);
+ JS_free(cx, bytes);
+ return JS_TRUE;
+ }
+
+ /* Shrink the buffer to the real size */
+ tmp = JS_realloc(cx, bytes, byteslen);
+ if(!tmp)
+ {
+ JS_free(cx, bytes);
+ return JS_FALSE;
+ }
+ bytes = tmp;
+
+ str = dec_string(cx, bytes, byteslen);
+ JS_free(cx, bytes);
+
+ if(!str) return JS_FALSE;
+
+ *rval = STRING_TO_JSVAL(str);
+
+ return JS_TRUE;
+}
+
+static JSBool
+seal(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+ JSObject *target;
+ JSBool deep = JS_FALSE;
+
+ if (!JS_ConvertArguments(cx, argc, argv, "o/b", &target, &deep))
+ return JS_FALSE;
+ if (!target)
+ return JS_TRUE;
+ return JS_SealObject(cx, target, deep);
+}
+
+static void
+execute_script(JSContext *cx, JSObject *obj, const char *filename) {
+ FILE *file;
+ JSScript *script;
+ jsval result;
+
+ if(!filename || strcmp(filename, "-") == 0)
+ {
+ file = stdin;
+ }
+ else
+ {
+ file = fopen(filename, "r");
+ if (!file)
+ {
+ fprintf(stderr, "could not open script file %s\n", filename);
+ gExitCode = 1;
+ return;
+ }
+ }
+
+ script = JS_CompileFileHandle(cx, obj, filename, file);
+ if(script)
+ {
+ JS_ExecuteScript(cx, obj, script, &result);
+ JS_DestroyScript(cx, script);
+ }
+}
+
+static void
+printerror(JSContext *cx, const char *mesg, JSErrorReport *report)
+{
+ if(!report || !JSREPORT_IS_WARNING(report->flags))
+ {
+ fprintf(stderr, "%s\n", mesg);
+ }
+}
+
+static JSFunctionSpec global_functions[] = {
+ {"evalcx", evalcx, 0, 0, 0},
+ {"gc", gc, 0, 0, 0},
+ {"print", print, 0, 0, 0},
+ {"quit", quit, 0, 0, 0},
+ {"readline", readline, 0, 0, 0},
+ {"seal", seal, 0, 0, 0},
+ {0, 0, 0, 0, 0}
+};
+
+static JSClass global_class = {
+ "GlobalClass",
+ JSCLASS_GLOBAL_FLAGS,
+ JS_PropertyStub,
+ JS_PropertyStub,
+ JS_PropertyStub,
+ JS_PropertyStub,
+ JS_EnumerateStub,
+ JS_ResolveStub,
+ JS_ConvertStub,
+ JS_FinalizeStub,
+ JSCLASS_NO_OPTIONAL_MEMBERS
+};
+
+int
+main(int argc, const char * argv[])
+{
+ JSRuntime* rt = NULL;
+ JSContext* cx = NULL;
+ JSObject* global = NULL;
+ JSFunctionSpec* sp = NULL;
+ int i = 0;
+
+ rt = JS_NewRuntime(64L * 1024L * 1024L);
+ if (!rt) return 1;
+
+ cx = JS_NewContext(rt, 8L * 1024L);
+ if (!cx) return 1;
+
+ JS_SetErrorReporter(cx, printerror);
+ JS_ToggleOptions(cx, JSOPTION_XML);
+
+ SETUP_REQUEST(cx);
+
+ global = JS_NewObject(cx, &global_class, NULL, NULL);
+ if (!global) return 1;
+ if (!JS_InitStandardClasses(cx, global)) return 1;
+
+ for(sp = global_functions; sp->name != NULL; sp++)
+ {
+ if(!JS_DefineFunction(cx, global,
+ sp->name, sp->call, sp->nargs, sp->flags))
+ {
+ fprintf(stderr, "Failed to create function: %s\n", sp->name);
+ return 1;
+ }
+ }
+
+ if(!install_http(cx, global))
+ {
+ return 1;
+ }
+
+ JS_SetGlobalObject(cx, global);
+
+ if(argc > 2)
+ {
+ fprintf(stderr, "incorrect number of arguments\n\n");
+ fprintf(stderr, "usage: %s <scriptfile>\n", argv[0]);
+ return 2;
+ }
+
+ if(argc <= 1)
+ {
+ execute_script(cx, global, NULL);
+ }
+ else
+ {
+ execute_script(cx, global, argv[1]);
+ }
+
+ FINISH_REQUEST(cx);
+
+ JS_DestroyContext(cx);
+ JS_DestroyRuntime(rt);
+ JS_ShutDown();
+
+ return gExitCode;
+}
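
A usage note on evalcx above: called with only a script string, it compiles and runs the source in a freshly created sandbox object with the standard classes installed, so e.g. evalcx("1 + 1") yields 2. Called with an empty string, the srclen == 0 branch returns the sandbox itself, so evalcx("") is the way to obtain a new sandbox object that can be passed back in on later calls.
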
diff --git a/src/couchdb/priv/couch_js/utf8.c b/src/couchdb/priv/couch_js/utf8.c
new file mode 100644
index 00000000..699a6fee
--- /dev/null
+++ b/src/couchdb/priv/couch_js/utf8.c
@@ -0,0 +1,286 @@
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include <jsapi.h>
+
+static int
+enc_char(uint8 *utf8Buffer, uint32 ucs4Char)
+{
+ int utf8Length = 1;
+
+ if (ucs4Char < 0x80)
+ {
+ *utf8Buffer = (uint8)ucs4Char;
+ }
+ else
+ {
+ int i;
+ uint32 a = ucs4Char >> 11;
+ utf8Length = 2;
+ while(a)
+ {
+ a >>= 5;
+ utf8Length++;
+ }
+ i = utf8Length;
+ while(--i)
+ {
+ utf8Buffer[i] = (uint8)((ucs4Char & 0x3F) | 0x80);
+ ucs4Char >>= 6;
+ }
+ *utf8Buffer = (uint8)(0x100 - (1 << (8-utf8Length)) + ucs4Char);
+ }
+
+ return utf8Length;
+}
+
+static JSBool
+enc_charbuf(const jschar* src, size_t srclen, char* dst, size_t* dstlenp)
+{
+ size_t i;
+ size_t utf8Len;
+ size_t dstlen = *dstlenp;
+ size_t origDstlen = dstlen;
+ jschar c;
+ jschar c2;
+ uint32 v;
+ uint8 utf8buf[6];
+
+ if(!dst)
+ {
+ dstlen = origDstlen = (size_t) -1;
+ }
+
+ while(srclen)
+ {
+ c = *src++;
+ srclen--;
+
+ if((c >= 0xDC00) && (c <= 0xDFFF)) goto bad_surrogate;
+
+ if(c < 0xD800 || c > 0xDBFF)
+ {
+ v = c;
+ }
+ else
+ {
+ if(srclen < 1) goto buffer_too_small;
+ c2 = *src++;
+ srclen--;
+ if ((c2 < 0xDC00) || (c2 > 0xDFFF))
+ {
+ c = c2;
+ goto bad_surrogate;
+ }
+ v = ((c - 0xD800) << 10) + (c2 - 0xDC00) + 0x10000;
+ }
+ if(v < 0x0080)
+ {
+ /* no encoding necessary - performance hack */
+ if(!dstlen) goto buffer_too_small;
+ if(dst) *dst++ = (char) v;
+ utf8Len = 1;
+ }
+ else
+ {
+ utf8Len = enc_char(utf8buf, v);
+ if(utf8Len > dstlen) goto buffer_too_small;
+ if(dst)
+ {
+ for (i = 0; i < utf8Len; i++)
+ {
+ *dst++ = (char) utf8buf[i];
+ }
+ }
+ }
+ dstlen -= utf8Len;
+ }
+
+ *dstlenp = (origDstlen - dstlen);
+ return JS_TRUE;
+
+bad_surrogate:
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+
+buffer_too_small:
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+}
+
+char*
+enc_string(JSContext* cx, jsval arg, size_t* buflen)
+{
+ JSString* str = NULL;
+ jschar* src = NULL;
+ char* bytes = NULL;
+ size_t srclen = 0;
+ size_t byteslen = 0;
+
+ str = JS_ValueToString(cx, arg);
+ if(!str) goto error;
+
+ src = JS_GetStringChars(str);
+ srclen = JS_GetStringLength(str);
+
+ if(!enc_charbuf(src, srclen, NULL, &byteslen)) goto error;
+
+ bytes = JS_malloc(cx, byteslen + 1);
+ if(!bytes) goto error;
+ bytes[byteslen] = 0;
+ if(!enc_charbuf(src, srclen, bytes, &byteslen)) goto error;
+
+ if(buflen) *buflen = byteslen;
+ goto success;
+
+error:
+ if(bytes != NULL) JS_free(cx, bytes);
+ bytes = NULL;
+
+success:
+ return bytes;
+}
+
+static uint32
+dec_char(const uint8 *utf8Buffer, int utf8Length)
+{
+ uint32 ucs4Char;
+ uint32 minucs4Char;
+
+ /* from Unicode 3.1, non-shortest form is illegal */
+ static const uint32 minucs4Table[] = {
+ 0x00000080, 0x00000800, 0x00010000, 0x00200000, 0x04000000
+ };
+
+ if (utf8Length == 1)
+ {
+ ucs4Char = *utf8Buffer;
+ }
+ else
+ {
+ ucs4Char = *utf8Buffer++ & ((1<<(7-utf8Length))-1);
+ minucs4Char = minucs4Table[utf8Length-2];
+ while(--utf8Length)
+ {
+ ucs4Char = ucs4Char<<6 | (*utf8Buffer++ & 0x3F);
+ }
+ if(ucs4Char < minucs4Char || ucs4Char == 0xFFFE || ucs4Char == 0xFFFF)
+ {
+ ucs4Char = 0xFFFD;
+ }
+ }
+
+ return ucs4Char;
+}
+
+static JSBool
+dec_charbuf(const char *src, size_t srclen, jschar *dst, size_t *dstlenp)
+{
+ uint32 v;
+ size_t offset = 0;
+ size_t j;
+ size_t n;
+ size_t dstlen = *dstlenp;
+ size_t origDstlen = dstlen;
+
+ if(!dst) dstlen = origDstlen = (size_t) -1;
+
+ while(srclen)
+ {
+ v = (uint8) *src;
+ n = 1;
+
+ if(v & 0x80)
+ {
+ while(v & (0x80 >> n))
+ {
+ n++;
+ }
+
+ if(n > srclen) goto buffer_too_small;
+ if(n == 1 || n > 6) goto bad_character;
+
+ for(j = 1; j < n; j++)
+ {
+ if((src[j] & 0xC0) != 0x80) goto bad_character;
+ }
+
+ v = dec_char((const uint8 *) src, n);
+ if(v >= 0x10000)
+ {
+ v -= 0x10000;
+
+ if(v > 0xFFFFF || dstlen < 2)
+ {
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+ }
+
+ if(dstlen < 2) goto buffer_too_small;
+
+ if(dst)
+ {
+ *dst++ = (jschar)((v >> 10) + 0xD800);
+ v = (jschar)((v & 0x3FF) + 0xDC00);
+ }
+ dstlen--;
+ }
+ }
+
+ if(!dstlen) goto buffer_too_small;
+ if(dst) *dst++ = (jschar) v;
+
+ dstlen--;
+ offset += n;
+ src += n;
+ srclen -= n;
+ }
+
+ *dstlenp = (origDstlen - dstlen);
+ return JS_TRUE;
+
+bad_character:
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+
+buffer_too_small:
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+}
+
+JSString*
+dec_string(JSContext* cx, const char* bytes, size_t byteslen)
+{
+ JSString* str = NULL;
+ jschar* chars = NULL;
+ size_t charslen;
+
+ if(!dec_charbuf(bytes, byteslen, NULL, &charslen)) goto error;
+
+ chars = JS_malloc(cx, (charslen + 1) * sizeof(jschar));
+ if(!chars) return NULL;
+ chars[charslen] = 0;
+
+ if(!dec_charbuf(bytes, byteslen, chars, &charslen)) goto error;
+
+ str = JS_NewUCString(cx, chars, charslen - 1);
+ if(!str) goto error;
+
+ goto success;
+
+error:
+ if(chars != NULL) JS_free(cx, chars);
+ str = NULL;
+
+success:
+ return str;
+}
\ No newline at end of file
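
To make the bit arithmetic in enc_char concrete, consider encoding U+20AC, the euro sign. 0x20AC >> 11 is 4, so the shift loop settles on utf8Length = 3. The inner loop then produces the continuation bytes from the low bits, last byte first: (0x20AC & 0x3F) | 0x80 = 0xAC, and after shifting right by six, (0x82 & 0x3F) | 0x80 = 0x82. The lead byte is 0x100 - (1 << (8-3)) + 0x02 = 0xE2. The result, E2 82 AC, is the standard UTF-8 encoding of the euro sign, and dec_char reverses the process.
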
diff --git a/src/couchdb/priv/couch_js/utf8.h b/src/couchdb/priv/couch_js/utf8.h
new file mode 100644
index 00000000..00f6b736
--- /dev/null
+++ b/src/couchdb/priv/couch_js/utf8.h
@@ -0,0 +1,19 @@
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#ifndef COUCH_JS_UTF_8_H
+#define COUCH_JS_UTF_8_H
+
+char* enc_string(JSContext* cx, jsval arg, size_t* buflen);
+JSString* dec_string(JSContext* cx, const char* buf, size_t buflen);
+
+#endif
\ No newline at end of file
diff --git a/src/couchdb/priv/spawnkillable/couchspawnkillable_win.c b/src/couchdb/priv/spawnkillable/couchspawnkillable_win.c
new file mode 100644
index 00000000..06782315
--- /dev/null
+++ b/src/couchdb/priv/spawnkillable/couchspawnkillable_win.c
@@ -0,0 +1,145 @@
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+// Do what the two lines of shell script in couchspawnkillable do...
+// * Create a new suspended process with the same (duplicated) standard
+// handles as us.
+// * Write a line to stdout, consisting of the path to ourselves, plus
+// '--kill {pid}' where {pid} is the PID of the newly created process.
+// * Un-suspend the new process.
+// * Wait for the process to terminate.
+// * Terminate with the child's exit-code.
+
+// Later, couch will call us with --kill and the PID, so we dutifully
+// terminate the specified PID.
+
+#include <stdlib.h>
+#include "windows.h"
+
+char *get_child_cmdline(int argc, char **argv)
+{
+ // make a new command-line, but skipping me.
+ // XXX - todo - spaces etc in args???
+ int i;
+ char *p, *cmdline;
+ int nchars = 0;
+ int nthis = 1;
+ for (i=1;i<argc;i++)
+ nchars += strlen(argv[i])+1;
+ cmdline = p = malloc(nchars+1);
+ if (!cmdline)
+ return NULL;
+ for (i=1;i<argc;i++) {
+ nthis = strlen(argv[i]);
+ strncpy(p, argv[i], nthis);
+ p[nthis] = ' ';
+ p += nthis+1;
+ }
+ // Replace the last space we added above with a '\0'
+ cmdline[nchars-1] = '\0';
+ return cmdline;
+}
+
+// create the child process, returning 0, or the exit-code we will
+// terminate with.
+int create_child(int argc, char **argv, PROCESS_INFORMATION *pi)
+{
+ char buf[1024];
+ DWORD dwcreate;
+ STARTUPINFO si;
+ char *cmdline;
+ if (argc < 2)
+ return 1;
+ cmdline = get_child_cmdline(argc, argv);
+ if (!cmdline)
+ return 2;
+
+ memset(&si, 0, sizeof(si));
+ si.cb = sizeof(si);
+ // depending on how *our* parent is started, we may or may not have
+ // a valid stderr stream - so although we try and duplicate it, only
+ // failing to duplicate stdin and stdout are considered fatal.
+ if (!DuplicateHandle(GetCurrentProcess(),
+ GetStdHandle(STD_INPUT_HANDLE),
+ GetCurrentProcess(),
+ &si.hStdInput,
+ 0,
+ TRUE, // inheritable
+ DUPLICATE_SAME_ACCESS) ||
+ !DuplicateHandle(GetCurrentProcess(),
+ GetStdHandle(STD_OUTPUT_HANDLE),
+ GetCurrentProcess(),
+ &si.hStdOutput,
+ 0,
+ TRUE, // inheritable
+ DUPLICATE_SAME_ACCESS)) {
+ return 3;
+ }
+ DuplicateHandle(GetCurrentProcess(),
+ GetStdHandle(STD_ERROR_HANDLE),
+ GetCurrentProcess(),
+ &si.hStdError,
+ 0,
+ TRUE, // inheritable
+ DUPLICATE_SAME_ACCESS);
+
+ si.dwFlags = STARTF_USESTDHANDLES;
+ dwcreate = CREATE_SUSPENDED;
+ if (!CreateProcess( NULL, cmdline,
+ NULL,
+ NULL,
+ TRUE, // inherit handles
+ dwcreate,
+ NULL, // environ
+ NULL, // cwd
+ &si,
+ pi))
+ return 4;
+ return 0;
+}
+
+// and here we go...
+int main(int argc, char **argv)
+{
+ char out_buf[1024];
+ int rc;
+ DWORD cbwritten;
+ DWORD exitcode;
+ PROCESS_INFORMATION pi;
+ if (argc==3 && strcmp(argv[1], "--kill")==0) {
+ HANDLE h = OpenProcess(PROCESS_TERMINATE, 0, atoi(argv[2]));
+ if (!h)
+ return 1;
+ if (!TerminateProcess(h, 0))
+ return 2;
+ CloseHandle(h);
+ return 0;
+ }
+ // spawn the new suspended process
+ rc = create_child(argc, argv, &pi);
+ if (rc)
+ return rc;
+ // Write the 'terminate' command, which includes this PID, back to couch.
+ // *sob* - what about spaces etc?
+ sprintf_s(out_buf, sizeof(out_buf), "%s --kill %d\n",
+ argv[0], pi.dwProcessId);
+ WriteFile(GetStdHandle(STD_OUTPUT_HANDLE), out_buf, strlen(out_buf),
+ &cbwritten, NULL);
+ // Let the child process go...
+ ResumeThread(pi.hThread);
+ // Wait for the process to terminate so we can reflect the exit code
+ // back to couch.
+ WaitForSingleObject(pi.hProcess, INFINITE);
+ if (!GetExitCodeProcess(pi.hProcess, &exitcode))
+ return 6;
+ return exitcode;
+}
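
Concretely, if the wrapper were installed at C:\couch\bin\couchspawnkillable.exe and the child came up with PID 4321 (both values illustrative), the line written back to couch on stdout would be:

    C:\couch\bin\couchspawnkillable.exe --kill 4321

Running that exact command line later takes the --kill branch at the top of main(), which terminates the recorded PID via OpenProcess/TerminateProcess.
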
diff --git a/src/fabric.erl b/src/fabric.erl
deleted file mode 100644
index 1be97a98..00000000
--- a/src/fabric.erl
+++ /dev/null
@@ -1,225 +0,0 @@
--module(fabric).
-
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-% DBs
--export([all_dbs/0, all_dbs/1, create_db/1, create_db/2, delete_db/1,
- delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3,
- set_security/3, get_revs_limit/1, get_security/1]).
-
-% Documents
--export([open_doc/3, open_revs/4, get_missing_revs/2, update_doc/3,
- update_docs/3, att_receiver/2]).
-
-% Views
--export([all_docs/4, changes/4, query_view/3, query_view/4, query_view/6,
- get_view_group_info/2]).
-
-% miscellany
--export([design_docs/1, reset_validation_funs/1]).
-
--include("fabric.hrl").
-
-% db operations
-
-all_dbs() ->
- all_dbs(<<>>).
-
-all_dbs(Prefix) when is_list(Prefix) ->
- all_dbs(list_to_binary(Prefix));
-all_dbs(Prefix) when is_binary(Prefix) ->
- Length = byte_size(Prefix),
- MatchingDbs = ets:foldl(fun(#shard{dbname=DbName}, Acc) ->
- case DbName of
- <<Prefix:Length/binary, _/binary>> ->
- [DbName | Acc];
- _ ->
- Acc
- end
- end, [], partitions),
- {ok, lists:usort(MatchingDbs)}.
-
-get_db_info(DbName) ->
- fabric_db_info:go(dbname(DbName)).
-
-get_doc_count(DbName) ->
- fabric_db_doc_count:go(dbname(DbName)).
-
-create_db(DbName) ->
- create_db(DbName, []).
-
-create_db(DbName, Options) ->
- fabric_db_create:go(dbname(DbName), opts(Options)).
-
-delete_db(DbName) ->
- delete_db(DbName, []).
-
-delete_db(DbName, Options) ->
- fabric_db_delete:go(dbname(DbName), opts(Options)).
-
-set_revs_limit(DbName, Limit, Options) when is_integer(Limit), Limit > 0 ->
- fabric_db_meta:set_revs_limit(dbname(DbName), Limit, opts(Options)).
-
-get_revs_limit(DbName) ->
- {ok, Db} = fabric_util:get_db(dbname(DbName)),
- try couch_db:get_revs_limit(Db) after catch couch_db:close(Db) end.
-
-set_security(DbName, SecObj, Options) ->
- fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)).
-
-get_security(DbName) ->
- {ok, Db} = fabric_util:get_db(dbname(DbName)),
- try couch_db:get_security(Db) after catch couch_db:close(Db) end.
-
-% doc operations
-open_doc(DbName, Id, Options) ->
- fabric_doc_open:go(dbname(DbName), docid(Id), opts(Options)).
-
-open_revs(DbName, Id, Revs, Options) ->
- fabric_doc_open_revs:go(dbname(DbName), docid(Id), Revs, opts(Options)).
-
-get_missing_revs(DbName, IdsRevs) when is_list(IdsRevs) ->
- Sanitized = [idrevs(IdR) || IdR <- IdsRevs],
- fabric_doc_missing_revs:go(dbname(DbName), Sanitized).
-
-update_doc(DbName, Doc, Options) ->
- case update_docs(DbName, [Doc], opts(Options)) of
- {ok, [{ok, NewRev}]} ->
- {ok, NewRev};
- {ok, [Error]} ->
- throw(Error);
- {ok, []} ->
- % replication success
- #doc{revs = {Pos, [RevId | _]}} = doc(Doc),
- {ok, {Pos, RevId}}
- end.
-
-update_docs(DbName, Docs, Options) ->
- try fabric_doc_update:go(dbname(DbName), docs(Docs), opts(Options))
- catch {aborted, PreCommitFailures} ->
- {aborted, PreCommitFailures}
- end.
-
-att_receiver(Req, Length) ->
- fabric_doc_attachments:receiver(Req, Length).
-
-all_docs(DbName, Callback, Acc0, #view_query_args{} = QueryArgs) when
- is_function(Callback, 2) ->
- fabric_view_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0).
-
-changes(DbName, Callback, Acc0, Options) ->
- % TODO use a keylist for Options instead of #changes_args, BugzID 10281
- Feed = Options#changes_args.feed,
- fabric_view_changes:go(dbname(DbName), Feed, Options, Callback, Acc0).
-
-query_view(DbName, DesignName, ViewName) ->
- query_view(DbName, DesignName, ViewName, #view_query_args{}).
-
-query_view(DbName, DesignName, ViewName, QueryArgs) ->
- Callback = fun default_callback/2,
- query_view(DbName, DesignName, ViewName, Callback, [], QueryArgs).
-
-query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs) ->
- Db = dbname(DbName), View = name(ViewName),
- case is_reduce_view(Db, Design, View, QueryArgs) of
- true ->
- Mod = fabric_view_reduce;
- false ->
- Mod = fabric_view_map
- end,
- Mod:go(Db, Design, View, QueryArgs, Callback, Acc0).
-
-get_view_group_info(DbName, DesignId) ->
- fabric_group_info:go(dbname(DbName), design_doc(DesignId)).
-
-design_docs(DbName) ->
- QueryArgs = #view_query_args{start_key = <<"_design/">>, include_docs=true},
- Callback = fun({total_and_offset, _, _}, []) ->
- {ok, []};
- ({row, {Props}}, Acc) ->
- case couch_util:get_value(id, Props) of
- <<"_design/", _/binary>> ->
- {ok, [couch_util:get_value(doc, Props) | Acc]};
- _ ->
- {stop, Acc}
- end;
- (complete, Acc) ->
- {ok, lists:reverse(Acc)}
- end,
- fabric:all_docs(dbname(DbName), Callback, [], QueryArgs).
-
-reset_validation_funs(DbName) ->
- [rexi:cast(Node, {fabric_rpc, reset_validation_funs, [Name]}) ||
- #shard{node=Node, name=Name} <- mem3:shards(DbName)].
-
-%% some simple type validation and transcoding
-
-dbname(DbName) when is_list(DbName) ->
- list_to_binary(DbName);
-dbname(DbName) when is_binary(DbName) ->
- DbName;
-dbname(#db{name=Name}) ->
- Name;
-dbname(DbName) ->
- erlang:error({illegal_database_name, DbName}).
-
-name(Thing) ->
- couch_util:to_binary(Thing).
-
-docid(DocId) when is_list(DocId) ->
- list_to_binary(DocId);
-docid(DocId) when is_binary(DocId) ->
- DocId;
-docid(DocId) ->
- erlang:error({illegal_docid, DocId}).
-
-docs(Docs) when is_list(Docs) ->
- [doc(D) || D <- Docs];
-docs(Docs) ->
- erlang:error({illegal_docs_list, Docs}).
-
-doc(#doc{} = Doc) ->
- Doc;
-doc({_} = Doc) ->
- couch_doc:from_json_obj(Doc);
-doc(Doc) ->
- erlang:error({illegal_doc_format, Doc}).
-
-design_doc(#doc{} = DDoc) ->
- DDoc;
-design_doc(DocId) when is_list(DocId) ->
- design_doc(list_to_binary(DocId));
-design_doc(<<"_design/", _/binary>> = DocId) ->
- DocId;
-design_doc(GroupName) ->
- <<"_design/", GroupName/binary>>.
-
-idrevs({Id, Revs}) when is_list(Revs) ->
- {docid(Id), [rev(R) || R <- Revs]}.
-
-rev(Rev) when is_list(Rev); is_binary(Rev) ->
- couch_doc:parse_rev(Rev);
-rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) ->
- Rev.
-
-opts(Options) ->
- case couch_util:get_value(user_ctx, Options) of
- undefined ->
- case erlang:get(user_ctx) of
- #user_ctx{} = Ctx ->
- [{user_ctx, Ctx} | Options];
- _ ->
- Options
- end;
- _ ->
- Options
- end.
-
-default_callback(complete, Acc) ->
- {ok, lists:reverse(Acc)};
-default_callback(Row, Acc) ->
- {ok, [Row | Acc]}.
-
-is_reduce_view(_, _, _, #view_query_args{view_type=Reduce}) ->
- Reduce =:= reduce.
diff --git a/src/fabric_db_create.erl b/src/fabric_db_create.erl
deleted file mode 100644
index d10bcc22..00000000
--- a/src/fabric_db_create.erl
+++ /dev/null
@@ -1,65 +0,0 @@
--module(fabric_db_create).
--export([go/2]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
--define(DBNAME_REGEX, "^[a-z][a-z0-9\\_\\$()\\+\\-\\/\\s.]*$").
-
-%% @doc Create a new database, and all its partition files across the cluster
-%% Options is proplist with user_ctx, n, q
-go(DbName, Options) ->
- case re:run(DbName, ?DBNAME_REGEX, [{capture,none}]) of
- match ->
- Shards = mem3:choose_shards(DbName, Options),
- Doc = make_document(Shards),
- Workers = fabric_util:submit_jobs(Shards, create_db, [Options, Doc]),
- Acc0 = fabric_dict:init(Workers, nil),
- case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
- {ok, _} ->
- ok;
- Else ->
- Else
- end;
- nomatch ->
- {error, illegal_database_name}
- end.
-
-handle_message(Msg, Shard, Counters) ->
- C1 = fabric_dict:store(Shard, Msg, Counters),
- case fabric_dict:any(nil, C1) of
- true ->
- {ok, C1};
- false ->
- final_answer(C1)
- end.
-
-make_document([#shard{dbname=DbName}|_] = Shards) ->
- {RawOut, ByNodeOut, ByRangeOut} =
- lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) ->
- Range = ?l2b([couch_util:to_hex(<<B:32/integer>>), "-",
- couch_util:to_hex(<<E:32/integer>>)]),
- Node = couch_util:to_binary(N),
- {[[<<"add">>, Range, Node] | Raw], orddict:append(Node, Range, ByNode),
- orddict:append(Range, Node, ByRange)}
- end, {[], [], []}, Shards),
- #doc{id=DbName, body = {[
- {<<"changelog">>, lists:sort(RawOut)},
- {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}},
- {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}}
- ]}}.
-
-final_answer(Counters) ->
- Successes = [X || {_, M} = X <- Counters, M == ok orelse M == file_exists],
- case fabric_view:is_progress_possible(Successes) of
- true ->
- case lists:keymember(file_exists, 2, Successes) of
- true ->
- {error, file_exists};
- false ->
- {stop, ok}
- end;
- false ->
- {error, internal_server_error}
- end.
diff --git a/src/fabric_db_delete.erl b/src/fabric_db_delete.erl
deleted file mode 100644
index 57eefa9e..00000000
--- a/src/fabric_db_delete.erl
+++ /dev/null
@@ -1,41 +0,0 @@
--module(fabric_db_delete).
--export([go/2]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
-
-go(DbName, Options) ->
- Shards = mem3:shards(DbName),
- Workers = fabric_util:submit_jobs(Shards, delete_db, [Options, DbName]),
- Acc0 = fabric_dict:init(Workers, nil),
- case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
- {ok, ok} ->
- ok;
- {ok, not_found} ->
- erlang:error(database_does_not_exist);
- Error ->
- Error
- end.
-
-handle_message(Msg, Shard, Counters) ->
- C1 = fabric_dict:store(Shard, Msg, Counters),
- case fabric_dict:any(nil, C1) of
- true ->
- {ok, C1};
- false ->
- final_answer(C1)
- end.
-
-final_answer(Counters) ->
- Successes = [X || {_, M} = X <- Counters, M == ok orelse M == not_found],
- case fabric_view:is_progress_possible(Successes) of
- true ->
- case lists:keymember(ok, 2, Successes) of
- true ->
- {stop, ok};
- false ->
- {stop, not_found}
- end;
- false ->
- {error, internal_server_error}
- end.
diff --git a/src/fabric_db_doc_count.erl b/src/fabric_db_doc_count.erl
deleted file mode 100644
index 12d5cbf8..00000000
--- a/src/fabric_db_doc_count.erl
+++ /dev/null
@@ -1,32 +0,0 @@
--module(fabric_db_doc_count).
-
--export([go/1]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(DbName) ->
- Shards = mem3:shards(DbName),
- Workers = fabric_util:submit_jobs(Shards, get_doc_count, []),
- Acc0 = {fabric_dict:init(Workers, nil), 0},
- fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0).
-
-handle_message({ok, Count}, Shard, {Counters, Acc}) ->
- case fabric_dict:lookup_element(Shard, Counters) of
- undefined ->
- % already heard from someone else in this range
- {ok, {Counters, Acc}};
- nil ->
- C1 = fabric_dict:store(Shard, ok, Counters),
- C2 = fabric_view:remove_overlapping_shards(Shard, C1),
- case fabric_dict:any(nil, C2) of
- true ->
- {ok, {C2, Count+Acc}};
- false ->
- {stop, Count+Acc}
- end
- end;
-handle_message(_, _, Acc) ->
- {ok, Acc}.
-
diff --git a/src/fabric_db_info.erl b/src/fabric_db_info.erl
deleted file mode 100644
index 3758c5c3..00000000
--- a/src/fabric_db_info.erl
+++ /dev/null
@@ -1,52 +0,0 @@
--module(fabric_db_info).
-
--export([go/1]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
-
-go(DbName) ->
- Shards = mem3:shards(DbName),
- Workers = fabric_util:submit_jobs(Shards, get_db_info, []),
- Acc0 = {fabric_dict:init(Workers, nil), []},
- fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0).
-
-handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) ->
- case fabric_dict:lookup_element(Shard, Counters) of
- undefined ->
- % already heard from someone else in this range
- {ok, {Counters, Acc}};
- nil ->
- C1 = fabric_dict:store(Shard, ok, Counters),
- C2 = fabric_view:remove_overlapping_shards(Shard, C1),
- case fabric_dict:any(nil, C2) of
- true ->
- {ok, {C2, [Info|Acc]}};
- false ->
- {stop, [{db_name,Name}|merge_results(lists:flatten([Info|Acc]))]}
- end
- end;
-handle_message(_, _, Acc) ->
- {ok, Acc}.
-
-merge_results(Info) ->
- Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end,
- orddict:new(), Info),
- orddict:fold(fun
- (doc_count, X, Acc) ->
- [{doc_count, lists:sum(X)} | Acc];
- (doc_del_count, X, Acc) ->
- [{doc_del_count, lists:sum(X)} | Acc];
- (update_seq, X, Acc) ->
- [{update_seq, lists:sum(X)} | Acc];
- (purge_seq, X, Acc) ->
- [{purge_seq, lists:sum(X)} | Acc];
- (compact_running, X, Acc) ->
- [{compact_running, lists:member(true, X)} | Acc];
- (disk_size, X, Acc) ->
- [{disk_size, lists:sum(X)} | Acc];
- (disk_format_version, X, Acc) ->
- [{disk_format_version, lists:max(X)} | Acc];
- (_, _, Acc) ->
- Acc
- end, [{instance_start_time, <<"0">>}], Dict).
diff --git a/src/fabric_db_meta.erl b/src/fabric_db_meta.erl
deleted file mode 100644
index ee15fc72..00000000
--- a/src/fabric_db_meta.erl
+++ /dev/null
@@ -1,35 +0,0 @@
--module(fabric_db_meta).
-
--export([set_revs_limit/3, set_security/3]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
-
-set_revs_limit(DbName, Limit, Options) ->
- Shards = mem3:shards(DbName),
- Workers = fabric_util:submit_jobs(Shards, set_revs_limit, [Limit, Options]),
- Waiting = length(Workers) - 1,
- case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Waiting) of
- {ok, ok} ->
- ok;
- Error ->
- Error
- end.
-
-set_security(DbName, SecObj, Options) ->
- Shards = mem3:shards(DbName),
- Workers = fabric_util:submit_jobs(Shards, set_security, [SecObj, Options]),
- Waiting = length(Workers) - 1,
- case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Waiting) of
- {ok, ok} ->
- ok;
- Error ->
- Error
- end.
-
-handle_message(ok, _, 0) ->
- {stop, ok};
-handle_message(ok, _, Waiting) ->
- {ok, Waiting - 1};
-handle_message(Error, _, _Waiting) ->
- {error, Error}.
\ No newline at end of file
diff --git a/src/fabric_dict.erl b/src/fabric_dict.erl
deleted file mode 100644
index 42d46b34..00000000
--- a/src/fabric_dict.erl
+++ /dev/null
@@ -1,37 +0,0 @@
--module(fabric_dict).
--compile(export_all).
-
-% Instead of ets, let's use an ordered keylist. We'll need to revisit if we
-% have >> 100 shards, so a private interface is a good idea. - APK June 2010
-
-init(Keys, InitialValue) ->
- orddict:from_list([{Key, InitialValue} || Key <- Keys]).
-
-
-decrement_all(Dict) ->
- [{K,V-1} || {K,V} <- Dict].
-
-store(Key, Value, Dict) ->
- orddict:store(Key, Value, Dict).
-
-erase(Key, Dict) ->
- orddict:erase(Key, Dict).
-
-update_counter(Key, Incr, Dict0) ->
- orddict:update_counter(Key, Incr, Dict0).
-
-
-lookup_element(Key, Dict) ->
- couch_util:get_value(Key, Dict).
-
-size(Dict) ->
- orddict:size(Dict).
-
-any(Value, Dict) ->
- lists:keymember(Value, 2, Dict).
-
-filter(Fun, Dict) ->
- orddict:filter(Fun, Dict).
-
-fold(Fun, Acc0, Dict) ->
- orddict:fold(Fun, Acc0, Dict).
diff --git a/src/fabric_doc_attachments.erl b/src/fabric_doc_attachments.erl
deleted file mode 100644
index aecdaaef..00000000
--- a/src/fabric_doc_attachments.erl
+++ /dev/null
@@ -1,102 +0,0 @@
--module(fabric_doc_attachments).
-
--include("fabric.hrl").
-
-%% couch api calls
--export([receiver/2]).
-
-receiver(_Req, undefined) ->
- <<"">>;
-receiver(_Req, {unknown_transfer_encoding, Unknown}) ->
- exit({unknown_transfer_encoding, Unknown});
-receiver(Req, chunked) ->
- MiddleMan = spawn(fun() -> middleman(Req, chunked) end),
- fun(4096, ChunkFun, ok) ->
- write_chunks(MiddleMan, ChunkFun)
- end;
-receiver(_Req, 0) ->
- <<"">>;
-receiver(Req, Length) when is_integer(Length) ->
- Middleman = spawn(fun() -> middleman(Req, Length) end),
- fun() ->
- Middleman ! {self(), gimme_data},
- receive {Middleman, Data} -> Data end
- end;
-receiver(_Req, Length) ->
- exit({length_not_integer, Length}).
-
-%%
-%% internal
-%%
-
-write_chunks(MiddleMan, ChunkFun) ->
- MiddleMan ! {self(), gimme_data},
- receive
- {MiddleMan, {0, _Footers}} ->
- % MiddleMan ! {self(), done},
- ok;
- {MiddleMan, ChunkRecord} ->
- ChunkFun(ChunkRecord, ok),
- write_chunks(MiddleMan, ChunkFun)
- end.
-
-receive_unchunked_attachment(_Req, 0) ->
- ok;
-receive_unchunked_attachment(Req, Length) ->
- receive {MiddleMan, go} ->
- Data = couch_httpd:recv(Req, 0),
- MiddleMan ! {self(), Data}
- end,
- receive_unchunked_attachment(Req, Length - size(Data)).
-
-middleman(Req, chunked) ->
- % spawn a process to actually receive the uploaded data
- RcvFun = fun(ChunkRecord, ok) ->
- receive {From, go} -> From ! {self(), ChunkRecord} end, ok
- end,
- Receiver = spawn(fun() -> couch_httpd:recv_chunked(Req,4096,RcvFun,ok) end),
-
- % take requests from the DB writers and get data from the receiver
- N = erlang:list_to_integer(couch_config:get("cluster","n")),
- middleman_loop(Receiver, N, dict:new(), 0, []);
-
-middleman(Req, Length) ->
- Receiver = spawn(fun() -> receive_unchunked_attachment(Req, Length) end),
- N = erlang:list_to_integer(couch_config:get("cluster","n")),
- middleman_loop(Receiver, N, dict:new(), 0, []).
-
-middleman_loop(Receiver, N, Counters, Offset, ChunkList) ->
- receive {From, gimme_data} ->
- % figure out how far along this writer (From) is in the list
- {NewCounters, WhichChunk} = case dict:find(From, Counters) of
- {ok, I} ->
- {dict:update_counter(From, 1, Counters), I};
- error ->
- {dict:store(From, 2, Counters), 1}
- end,
- ListIndex = WhichChunk - Offset,
-
- % talk to the receiver to get another chunk if necessary
- ChunkList1 = if ListIndex > length(ChunkList) ->
- Receiver ! {self(), go},
- receive {Receiver, ChunkRecord} -> ChunkList ++ [ChunkRecord] end;
- true -> ChunkList end,
-
- % reply to the writer
- From ! {self(), lists:nth(ListIndex, ChunkList1)},
-
- % check if we can drop a chunk from the head of the list
- SmallestIndex = dict:fold(fun(_, Val, Acc) -> lists:min([Val,Acc]) end,
- WhichChunk+1, NewCounters),
- Size = dict:size(NewCounters),
-
- {NewChunkList, NewOffset} =
- if Size == N andalso (SmallestIndex - Offset) == 2 ->
- {tl(ChunkList1), Offset+1};
- true ->
- {ChunkList1, Offset}
- end,
- middleman_loop(Receiver, N, NewCounters, NewOffset, NewChunkList)
- after 10000 ->
- ok
- end.
diff --git a/src/fabric_doc_missing_revs.erl b/src/fabric_doc_missing_revs.erl
deleted file mode 100644
index 9a368783..00000000
--- a/src/fabric_doc_missing_revs.erl
+++ /dev/null
@@ -1,64 +0,0 @@
--module(fabric_doc_missing_revs).
-
--export([go/2]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
-
-go(DbName, AllIdsRevs) ->
- Workers = lists:map(fun({#shard{name=Name, node=Node} = Shard, IdsRevs}) ->
- Ref = rexi:cast(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}),
- Shard#shard{ref=Ref}
- end, group_idrevs_by_shard(DbName, AllIdsRevs)),
- ResultDict = dict:from_list([{Id, {nil,Revs}} || {Id, Revs} <- AllIdsRevs]),
- Acc0 = {length(Workers), ResultDict},
- fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0).
-
-handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) ->
- skip_message(Acc0);
-handle_message({rexi_EXIT, _, _, _}, _Worker, Acc0) ->
- skip_message(Acc0);
-handle_message({ok, Results}, _Worker, {1, D0}) ->
- D = update_dict(D0, Results),
- {stop, dict:fold(fun force_reply/3, [], D)};
-handle_message({ok, Results}, _Worker, {WaitingCount, D0}) ->
- D = update_dict(D0, Results),
- case dict:fold(fun maybe_reply/3, {stop, []}, D) of
- continue ->
- % still haven't heard about some Ids
- {ok, {WaitingCount - 1, D}};
- {stop, FinalReply} ->
- {stop, FinalReply}
- end.
-
-force_reply(Id, {nil,Revs}, Acc) ->
- % never heard about this ID, assume it's missing
- [{Id, Revs} | Acc];
-force_reply(_, [], Acc) ->
- Acc;
-force_reply(Id, Revs, Acc) ->
- [{Id, Revs} | Acc].
-
-maybe_reply(_, _, continue) ->
- continue;
-maybe_reply(_, {nil, _}, _) ->
- continue;
-maybe_reply(_, [], {stop, Acc}) ->
- {stop, Acc};
-maybe_reply(Id, Revs, {stop, Acc}) ->
- {stop, [{Id, Revs} | Acc]}.
-
-group_idrevs_by_shard(DbName, IdsRevs) ->
- dict:to_list(lists:foldl(fun({Id, Revs}, D0) ->
- lists:foldl(fun(Shard, D1) ->
- dict:append(Shard, {Id, Revs}, D1)
- end, D0, mem3:shards(DbName,Id))
- end, dict:new(), IdsRevs)).
-
-update_dict(D0, KVs) ->
- lists:foldl(fun({K,V,_}, D1) -> dict:store(K, V, D1) end, D0, KVs).
-
-skip_message({1, Dict}) ->
- {stop, dict:fold(fun force_reply/3, [], Dict)};
-skip_message({WaitingCount, Dict}) ->
- {ok, {WaitingCount-1, Dict}}.
diff --git a/src/fabric_doc_open.erl b/src/fabric_doc_open.erl
deleted file mode 100644
index 5c5699c3..00000000
--- a/src/fabric_doc_open.erl
+++ /dev/null
@@ -1,66 +0,0 @@
--module(fabric_doc_open).
-
--export([go/3]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(DbName, Id, Options) ->
- Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_doc,
- [Id, [deleted|Options]]),
- SuppressDeletedDoc = not lists:member(deleted, Options),
- R = couch_util:get_value(r, Options, couch_config:get("cluster","r","2")),
- Acc0 = {length(Workers), list_to_integer(R), []},
- case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
- {ok, {ok, #doc{deleted=true}}} when SuppressDeletedDoc ->
- {not_found, deleted};
- {ok, Else} ->
- Else;
- Error ->
- Error
- end.
-
-handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) ->
- skip_message(Acc0);
-handle_message({rexi_EXIT, _Reason}, _Worker, Acc0) ->
- skip_message(Acc0);
-handle_message(Reply, _Worker, {WaitingCount, R, Replies}) ->
- case merge_read_reply(make_key(Reply), Reply, Replies) of
- {_, KeyCount} when KeyCount =:= R ->
- {stop, Reply};
- {NewReplies, KeyCount} when KeyCount < R ->
- if WaitingCount =:= 1 ->
- % last message arrived, but still no quorum
- repair_read_quorum_failure(NewReplies);
- true ->
- {ok, {WaitingCount-1, R, NewReplies}}
- end
- end.
-
-skip_message({1, _R, Replies}) ->
- repair_read_quorum_failure(Replies);
-skip_message({WaitingCount, R, Replies}) ->
- {ok, {WaitingCount-1, R, Replies}}.
-
-merge_read_reply(Key, Reply, Replies) ->
- case lists:keyfind(Key, 1, Replies) of
- false ->
- {[{Key, Reply, 1} | Replies], 1};
- {Key, _, N} ->
- {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1}
- end.
-
-make_key({ok, #doc{id=Id, revs=Revs}}) ->
- {Id, Revs};
-make_key(Else) ->
- Else.
-
-repair_read_quorum_failure(Replies) ->
- case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of
- [] ->
- {stop, {not_found, missing}};
- [Doc|_] ->
- % TODO merge docs to find the winner as determined by replication
- {stop, {ok, Doc}}
- end.
\ No newline at end of file
diff --git a/src/fabric_doc_open_revs.erl b/src/fabric_doc_open_revs.erl
deleted file mode 100644
index 61ff466f..00000000
--- a/src/fabric_doc_open_revs.erl
+++ /dev/null
@@ -1,65 +0,0 @@
--module(fabric_doc_open_revs).
-
--export([go/4]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(DbName, Id, Revs, Options) ->
- Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_revs,
- [Id, Revs, Options]),
- R = couch_util:get_value(r, Options, couch_config:get("cluster","r","2")),
- Acc0 = {length(Workers), list_to_integer(R), []},
- case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
- {ok, {ok, Reply}} ->
- {ok, Reply};
- Else ->
- Else
- end.
-
-handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) ->
- skip_message(Acc0);
-handle_message({rexi_EXIT, _}, _Worker, Acc0) ->
- skip_message(Acc0);
-handle_message(Reply, _Worker, {WaitingCount, R, Replies}) ->
- case merge_read_reply(make_key(Reply), Reply, Replies) of
- {_, KeyCount} when KeyCount =:= R ->
- {stop, Reply};
- {NewReplies, KeyCount} when KeyCount < R ->
- if WaitingCount =:= 1 ->
- % last message arrived, but still no quorum
- repair_read_quorum_failure(NewReplies);
- true ->
- {ok, {WaitingCount-1, R, NewReplies}}
- end
- end.
-
-skip_message({1, _R, Replies}) ->
- repair_read_quorum_failure(Replies);
-skip_message({WaitingCount, R, Replies}) ->
- {ok, {WaitingCount-1, R, Replies}}.
-
-merge_read_reply(Key, Reply, Replies) ->
- case lists:keyfind(Key, 1, Replies) of
- false ->
- {[{Key, Reply, 1} | Replies], 1};
- {Key, _, N} ->
- {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1}
- end.
-
-make_key({ok, #doc{id=Id, revs=Revs}}) ->
- {Id, Revs};
-make_key(Else) ->
- Else.
-
-repair_read_quorum_failure(Replies) ->
- case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of
- [] ->
- {stop, {not_found, missing}};
- [Doc|_] ->
- % TODO merge docs to find the winner as determined by replication
- {stop, {ok, Doc}}
- end.
-
- 
\ No newline at end of file
diff --git a/src/fabric_doc_update.erl b/src/fabric_doc_update.erl
deleted file mode 100644
index f0fcf112..00000000
--- a/src/fabric_doc_update.erl
+++ /dev/null
@@ -1,127 +0,0 @@
--module(fabric_doc_update).
-
--export([go/3]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(_, [], _) ->
- {ok, []};
-go(DbName, AllDocs, Opts) ->
- validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)),
- Options = lists:delete(all_or_nothing, Opts),
- GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) ->
- Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
- {Shard#shard{ref=Ref}, Docs}
- end, group_docs_by_shard(DbName, AllDocs)),
- {Workers, _} = lists:unzip(GroupedDocs),
- W = couch_util:get_value(w, Options, couch_config:get("cluster","w","2")),
- Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs,
- dict:from_list([{Doc,[]} || Doc <- AllDocs])},
- case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
- {ok, Results} ->
- Reordered = couch_util:reorder_results(AllDocs, Results),
- {ok, [R || R <- Reordered, R =/= noreply]};
- Else ->
- Else
- end.
-
-handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) ->
- skip_message(Acc0);
-handle_message({rexi_EXIT, _}, _Worker, Acc0) ->
- skip_message(Acc0);
-handle_message({ok, Replies}, Worker, Acc0) ->
- {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0,
- Docs = couch_util:get_value(Worker, GroupedDocs),
- DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
- case {WaitingCount, dict:size(DocReplyDict)} of
- {1, _} ->
- % last message has arrived, we need to conclude things
- {W, Reply} = dict:fold(fun force_reply/3, {W,[]}, DocReplyDict),
- {stop, Reply};
- {_, DocCount} ->
- % we've got at least one reply for each document, let's take a look
- case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocReplyDict) of
- continue ->
- {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}};
- {stop, W, FinalReplies} ->
- {stop, FinalReplies}
- end;
- {_, N} when N < DocCount ->
- % no point in trying to finalize anything yet
- {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}}
- end;
-handle_message({missing_stub, Stub}, _, _) ->
- throw({missing_stub, Stub});
-handle_message({not_found, no_db_file} = X, Worker, Acc0) ->
- {_, _, _, GroupedDocs, _} = Acc0,
- Docs = couch_util:get_value(Worker, GroupedDocs),
- handle_message({ok, [X || _D <- Docs]}, Worker, Acc0).
-
-force_reply(Doc, [], {W, Acc}) ->
- {W, [{Doc, {error, internal_server_error}} | Acc]};
-force_reply(Doc, [FirstReply|_] = Replies, {W, Acc}) ->
- case update_quorum_met(W, Replies) of
- {true, Reply} ->
- {W, [{Doc,Reply} | Acc]};
- false ->
- ?LOG_ERROR("write quorum (~p) failed, reply ~p", [W, FirstReply]),
- % TODO make a smarter choice than just picking the first reply
- {W, [{Doc,FirstReply} | Acc]}
- end.
-
-maybe_reply(_, _, continue) ->
- % we didn't meet quorum for all docs, so we're fast-forwarding the fold
- continue;
-maybe_reply(Doc, Replies, {stop, W, Acc}) ->
- case update_quorum_met(W, Replies) of
- {true, Reply} ->
- {stop, W, [{Doc, Reply} | Acc]};
- false ->
- continue
- end.
-
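-% the write quorum is met once any single reply has at least W identical votes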
-update_quorum_met(W, Replies) ->
- Counters = lists:foldl(fun(R,D) -> orddict:update_counter(R,1,D) end,
- orddict:new(), Replies),
- case lists:dropwhile(fun({_, Count}) -> Count < W end, Counters) of
- [] ->
- false;
- [{FinalReply, _} | _] ->
- {true, FinalReply}
- end.
-
--spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}].
-group_docs_by_shard(DbName, Docs) ->
- dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) ->
- lists:foldl(fun(Shard, D1) ->
- dict:append(Shard, Doc, D1)
- end, D0, mem3:shards(DbName,Id))
- end, dict:new(), Docs)).
-
-append_update_replies([], [], DocReplyDict) ->
- DocReplyDict;
-append_update_replies([Doc|Rest], [], Dict0) ->
- % icky: with replicated_changes, only errors show up in the result
- append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0));
-append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) ->
- % TODO what if the same document shows up twice in one update_docs call?
- append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
-
-skip_message(Acc0) ->
- % TODO fix this
- {ok, Acc0}.
-
-validate_atomic_update(_, _, false) ->
- ok;
-validate_atomic_update(_DbName, AllDocs, true) ->
- % TODO actually perform the validation. This requires some hackery, we need
- % to basically extract the prep_and_validate_updates function from couch_db
- % and only run that, without actually writing in case of a success.
- Error = {not_implemented, <<"all_or_nothing is not supported yet">>},
- PreCommitFailures = lists:map(fun(#doc{id=Id, revs = {Pos,Revs}}) ->
- case Revs of [] -> RevId = <<>>; [RevId|_] -> ok end,
- {{Id, {Pos, RevId}}, Error}
- end, AllDocs),
- throw({aborted, PreCommitFailures}).
diff --git a/src/fabric_group_info.erl b/src/fabric_group_info.erl
deleted file mode 100644
index 04605a66..00000000
--- a/src/fabric_group_info.erl
+++ /dev/null
@@ -1,52 +0,0 @@
--module(fabric_group_info).
-
--export([go/2]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(DbName, GroupId) when is_binary(GroupId) ->
- {ok, DDoc} = fabric:open_doc(DbName, GroupId, []),
- go(DbName, DDoc);
-
-go(DbName, #doc{} = DDoc) ->
- Group = couch_view_group:design_doc_to_view_group(#db{name=DbName}, DDoc),
- Shards = mem3:shards(DbName),
- Workers = fabric_util:submit_jobs(Shards, group_info, [Group]),
- Acc0 = {fabric_dict:init(Workers, nil), []},
- fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0).
-
-handle_message({ok, Info}, Shard, {Counters, Acc}) ->
- case fabric_dict:lookup_element(Shard, Counters) of
- undefined ->
- % already heard from someone else in this range
- {ok, {Counters, Acc}};
- nil ->
- C1 = fabric_dict:store(Shard, ok, Counters),
- C2 = fabric_view:remove_overlapping_shards(Shard, C1),
- case fabric_dict:any(nil, C2) of
- true ->
- {ok, {C2, [Info|Acc]}};
- false ->
- {stop, merge_results(lists:flatten([Info|Acc]))}
- end
- end;
-handle_message(_, _, Acc) ->
- {ok, Acc}.
-
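-% signature and language are identical on every shard; disk_size is summed
-% and compact_running is true if any shard is currently compacting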
-merge_results(Info) ->
- Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end,
- orddict:new(), Info),
- orddict:fold(fun
- (signature, [X|_], Acc) ->
- [{signature, X} | Acc];
- (language, [X|_], Acc) ->
- [{language, X} | Acc];
- (disk_size, X, Acc) ->
- [{disk_size, lists:sum(X)} | Acc];
- (compact_running, X, Acc) ->
- [{compact_running, lists:member(true, X)} | Acc];
- (_, _, Acc) ->
- Acc
- end, [], Dict).
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
deleted file mode 100644
index f56e3f68..00000000
--- a/src/fabric_rpc.erl
+++ /dev/null
@@ -1,388 +0,0 @@
--module(fabric_rpc).
-
--export([get_db_info/1, get_doc_count/1, get_update_seq/1]).
--export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]).
--export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]).
--export([create_db/3, delete_db/3, reset_validation_funs/1, set_security/3,
- set_revs_limit/3]).
-
--include("fabric.hrl").
--include_lib("couch/include/couch_db.hrl").
-
--record (view_acc, {
- db,
- limit,
- include_docs,
- offset = nil,
- total_rows,
- reduce_fun = fun couch_db:enum_docs_reduce_to_count/1,
- group_level = 0
-}).
-
-%% rpc endpoints
-%% a call to with_db supplies your M:F with a #db{} followed by the remaining args
-
-all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) ->
- {ok, Db} = couch_db:open(DbName, []),
- #view_query_args{
- start_key = StartKey,
- start_docid = StartDocId,
- end_key = EndKey,
- end_docid = EndDocId,
- limit = Limit,
- skip = Skip,
- include_docs = IncludeDocs,
- direction = Dir,
- inclusive_end = Inclusive
- } = QueryArgs,
- {ok, Total} = couch_db:get_doc_count(Db),
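- % each shard scans limit+skip rows; the actual skipping happens on the
- % coordinator (see fabric_view:maybe_send_row/1)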
- Acc0 = #view_acc{
- db = Db,
- include_docs = IncludeDocs,
- limit = Limit+Skip,
- total_rows = Total
- },
- EndKeyType = if Inclusive -> end_key; true -> end_key_gt end,
- Options = [
- {dir, Dir},
- {start_key, if is_binary(StartKey) -> StartKey; true -> StartDocId end},
- {EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end}
- ],
- {ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options),
- final_response(Total, Acc#view_acc.offset).
-
-changes(DbName, Args, StartSeq) ->
- #changes_args{style=Style, dir=Dir} = Args,
- case couch_db:open(DbName, []) of
- {ok, Db} ->
- Enum = fun changes_enumerator/2,
- Opts = [{dir,Dir}],
- Acc0 = {Db, StartSeq, Args},
- try
- {ok, {_, LastSeq, _}} =
- couch_db:changes_since(Db, Style, StartSeq, Enum, Opts, Acc0),
- rexi:reply({complete, LastSeq})
- after
- couch_db:close(Db)
- end;
- Error ->
- rexi:reply(Error)
- end.
-
-map_view(DbName, DDoc, ViewName, QueryArgs) ->
- {ok, Db} = couch_db:open(DbName, []),
- #view_query_args{
- limit = Limit,
- skip = Skip,
- keys = Keys,
- include_docs = IncludeDocs,
- stale = Stale,
- view_type = ViewType
- } = QueryArgs,
- MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end,
- Group0 = couch_view_group:design_doc_to_view_group(Db, DDoc),
- {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
- {ok, Group} = couch_view_group:request_group(Pid, MinSeq),
- View = fabric_view:extract_view(Pid, ViewName, Group#group.views, ViewType),
- {ok, Total} = couch_view:get_row_count(View),
- Acc0 = #view_acc{
- db = Db,
- include_docs = IncludeDocs,
- limit = Limit+Skip,
- total_rows = Total,
- reduce_fun = fun couch_view:reduce_to_count/1
- },
- case Keys of
- nil ->
- Options = couch_httpd_view:make_key_options(QueryArgs),
- {ok, _, Acc} = couch_view:fold(View, fun view_fold/3, Acc0, Options);
- _ ->
- Acc = lists:foldl(fun(Key, AccIn) ->
- KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key},
- Options = couch_httpd_view:make_key_options(KeyArgs),
- {_Go, _, Out} = couch_view:fold(View, fun view_fold/3, AccIn,
- Options),
- Out
- end, Acc0, Keys)
- end,
- final_response(Total, Acc#view_acc.offset).
-
-reduce_view(DbName, Group0, ViewName, QueryArgs) ->
- {ok, Db} = couch_db:open(DbName, []),
- #view_query_args{
- group_level = GroupLevel,
- limit = Limit,
- skip = Skip,
- keys = Keys,
- stale = Stale
- } = QueryArgs,
- GroupFun = group_rows_fun(GroupLevel),
- MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end,
- {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
- {ok, #group{views=Views, def_lang=Lang}} = couch_view_group:request_group(
- Pid, MinSeq),
- {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce),
- ReduceView = {reduce, NthRed, Lang, View},
- Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip},
- case Keys of
- nil ->
- Options0 = couch_httpd_view:make_key_options(QueryArgs),
- Options = [{key_group_fun, GroupFun} | Options0],
- couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options);
- _ ->
- lists:map(fun(Key) ->
- KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key},
- Options0 = couch_httpd_view:make_key_options(KeyArgs),
- Options = [{key_group_fun, GroupFun} | Options0],
- couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options)
- end, Keys)
- end,
- rexi:reply(complete).
-
-create_db(DbName, Options, Doc) ->
- mem3_util:write_db_doc(Doc),
- rexi:reply(case couch_server:create(DbName, Options) of
- {ok, _} ->
- ok;
- Error ->
- Error
- end).
-
-delete_db(DbName, Options, DocId) ->
- mem3_util:delete_db_doc(DocId),
- rexi:reply(couch_server:delete(DbName, Options)).
-
-get_db_info(DbName) ->
- with_db(DbName, [], {couch_db, get_db_info, []}).
-
-get_doc_count(DbName) ->
- with_db(DbName, [], {couch_db, get_doc_count, []}).
-
-get_update_seq(DbName) ->
- with_db(DbName, [], {couch_db, get_update_seq, []}).
-
-set_security(DbName, SecObj, Options) ->
- with_db(DbName, Options, {couch_db, set_security, [SecObj]}).
-
-set_revs_limit(DbName, Limit, Options) ->
- with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}).
-
-open_doc(DbName, DocId, Options) ->
- with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}).
-
-open_revs(DbName, Id, Revs, Options) ->
- with_db(DbName, Options, {couch_db, open_doc_revs, [Id, Revs, Options]}).
-
-get_missing_revs(DbName, IdRevsList) ->
- % reimplemented here so that Ids with no missing revs get [] in the response
- rexi:reply(case couch_db:open(DbName, []) of
- {ok, Db} ->
- Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
- {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) ->
- case FullDocInfoResult of
- {ok, #full_doc_info{rev_tree=RevisionTree} = FullInfo} ->
- MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs),
- {Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)};
- not_found ->
- {Id, Revs, []}
- end
- end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))};
- Error ->
- Error
- end).
-
-update_docs(DbName, Docs0, Options) ->
- case proplists:get_value(replicated_changes, Options) of
- true ->
- X = replicated_changes;
- _ ->
- X = interactive_edit
- end,
- Docs = make_att_readers(Docs0),
- with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}).
-
-group_info(DbName, Group0) ->
- {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
- rexi:reply(couch_view_group:request_group_info(Pid)).
-
-reset_validation_funs(DbName) ->
- case couch_db:open(DbName, []) of
- {ok, #db{main_pid = Pid}} ->
- gen_server:cast(Pid, {load_validation_funs, undefined});
- _ ->
- ok
- end.
-
-%%
-%% internal
-%%
-
-with_db(DbName, Options, {M,F,A}) ->
- case couch_db:open(DbName, Options) of
- {ok, Db} ->
- rexi:reply(try
- apply(M, F, [Db | A])
- catch Exception ->
- Exception;
- error:Reason ->
- ?LOG_ERROR("~p ~p ~p~n~p", [?MODULE, {M,F}, Reason,
- erlang:get_stacktrace()]),
- {error, Reason}
- end);
- Error ->
- rexi:reply(Error)
- end.
-
-view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) ->
- % matches for _all_docs and translates #full_doc_info{} -> KV pair
- case couch_doc:to_doc_info(FullDocInfo) of
- #doc_info{revs=[#rev_info{deleted=false, rev=Rev}|_]} ->
- Id = FullDocInfo#full_doc_info.id,
- Value = {[{rev,couch_doc:rev_to_str(Rev)}]},
- view_fold({{Id,Id}, Value}, OffsetReds, Acc);
- #doc_info{revs=[#rev_info{deleted=true}|_]} ->
- {ok, Acc}
- end;
-view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) ->
- % calculates the offset for this shard
- #view_acc{reduce_fun=Reduce} = Acc,
- Offset = Reduce(OffsetReds),
- case rexi:sync_reply({total_and_offset, Total, Offset}) of
- ok ->
- view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset});
- stop ->
- exit(normal);
- timeout ->
- exit(timeout)
- end;
-view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) ->
- % we scanned through limit+skip local rows
- {stop, Acc};
-view_fold({{Key,Id}, Value}, _Offset, Acc) ->
- % the normal case
- #view_acc{
- db = Db,
- limit = Limit,
- include_docs = IncludeDocs
- } = Acc,
- Doc = if not IncludeDocs -> undefined; true ->
- case couch_db:open_doc(Db, Id, []) of
- {not_found, deleted} ->
- null;
- {not_found, missing} ->
- undefined;
- {ok, Doc0} ->
- couch_doc:to_json_obj(Doc0, [])
- end
- end,
- case rexi:sync_reply(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of
- ok ->
- {ok, Acc#view_acc{limit=Limit-1}};
- timeout ->
- exit(timeout)
- end.
-
-final_response(Total, nil) ->
- case rexi:sync_reply({total_and_offset, Total, Total}) of
- ok ->
- rexi:reply(complete);
- stop ->
- ok;
- timeout ->
- exit(timeout)
- end;
-final_response(_Total, _Offset) ->
- rexi:reply(complete).
-
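-% group_level 0 collapses everything into a single row, exact compares whole
-% keys, and an integer N compares only the first N elements of array keys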
-group_rows_fun(exact) ->
- fun({Key1,_}, {Key2,_}) -> Key1 == Key2 end;
-group_rows_fun(0) ->
- fun(_A, _B) -> true end;
-group_rows_fun(GroupLevel) when is_integer(GroupLevel) ->
- fun({[_|_] = Key1,_}, {[_|_] = Key2,_}) ->
- lists:sublist(Key1, GroupLevel) == lists:sublist(Key2, GroupLevel);
- ({Key1,_}, {Key2,_}) ->
- Key1 == Key2
- end.
-
-reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) ->
- {stop, Acc};
-reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) ->
- send(null, Red, Acc);
-reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) ->
- send(Key, Red, Acc);
-reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) ->
- send(lists:sublist(K, I), Red, Acc).
-
-send(Key, Value, #view_acc{limit=Limit} = Acc) ->
- case rexi:sync_reply(#view_row{key=Key, value=Value}) of
- ok ->
- {ok, Acc#view_acc{limit=Limit-1}};
- stop ->
- exit(normal);
- timeout ->
- exit(timeout)
- end.
-
-changes_enumerator(DocInfo, {Db, _Seq, Args}) ->
- #changes_args{include_docs=IncludeDocs, filter=FilterFun} = Args,
- #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}
- = DocInfo,
- case [Result || Result <- FilterFun(DocInfo), Result /= null] of
- [] ->
- {ok, {Db, Seq, Args}};
- Results ->
- ChangesRow = changes_row(Db, Seq, Id, Results, Rev, Del, IncludeDocs),
- Go = rexi:sync_reply(ChangesRow),
- {Go, {Db, Seq, Args}}
- end.
-
-changes_row(_, Seq, Id, Results, _, true, true) ->
- #view_row{key=Seq, id=Id, value=Results, doc=deleted};
-changes_row(_, Seq, Id, Results, _, true, false) ->
- #view_row{key=Seq, id=Id, value=Results, doc=deleted};
-changes_row(Db, Seq, Id, Results, Rev, false, true) ->
- #view_row{key=Seq, id=Id, value=Results, doc=doc_member(Db, Id, Rev)};
-changes_row(_, Seq, Id, Results, _, false, false) ->
- #view_row{key=Seq, id=Id, value=Results}.
-
-doc_member(Shard, Id, Rev) ->
- case couch_db:open_doc_revs(Shard, Id, [Rev], []) of
- {ok, [{ok,Doc}]} ->
- couch_doc:to_json_obj(Doc, []);
- Error ->
- Error
- end.
-
-possible_ancestors(_FullInfo, []) ->
- [];
-possible_ancestors(FullInfo, MissingRevs) ->
- #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo),
- LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo],
- % Find the revs that are possible parents of this rev
- lists:foldl(fun({LeafPos, LeafRevId}, Acc) ->
- % this leaf is a "possible ancestor" of the missing
- % revs if this LeafPos is less than any of the missing revs
- case lists:any(fun({MissingPos, _}) ->
- LeafPos < MissingPos end, MissingRevs) of
- true ->
- [{LeafPos, LeafRevId} | Acc];
- false ->
- Acc
- end
- end, [], LeafRevs).
-
-make_att_readers([]) ->
- [];
-make_att_readers([#doc{atts=Atts0} = Doc | Rest]) ->
- % go through the attachments looking for 'follows' in the data,
- % replace with a function that reads the data from the MIME stream.
- Atts = [Att#att{data=make_att_reader(D)} || #att{data=D} = Att <- Atts0],
- [Doc#doc{atts = Atts} | make_att_readers(Rest)].
-
-make_att_reader({follows, Parser}) ->
- fun() ->
- Parser ! {get_bytes, self()},
- receive {bytes, Bytes} -> Bytes end
- end;
-make_att_reader(Else) ->
- Else.
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
deleted file mode 100644
index 639a32e7..00000000
--- a/src/fabric_util.erl
+++ /dev/null
@@ -1,89 +0,0 @@
--module(fabric_util).
-
--export([submit_jobs/3, cleanup/1, recv/4, receive_loop/4, receive_loop/6,
- get_db/1]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
-
-submit_jobs(Shards, EndPoint, ExtraArgs) ->
- lists:map(fun(#shard{node=Node, name=ShardName} = Shard) ->
- Ref = rexi:cast(Node, {fabric_rpc, EndPoint, [ShardName | ExtraArgs]}),
- Shard#shard{ref = Ref}
- end, Shards).
-
-cleanup(Workers) ->
- [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
-
-recv(Workers, Keypos, Fun, Acc0) ->
- receive_loop(Workers, Keypos, Fun, Acc0).
-
-receive_loop(Workers, Keypos, Fun, Acc0) ->
- case couch_config:get("fabric", "request_timeout", "60000") of
- "infinity" ->
- Timeout = infinity;
- N ->
- Timeout = list_to_integer(N)
- end,
- receive_loop(Workers, Keypos, Fun, Acc0, Timeout, infinity).
-
-%% @doc set up the receive loop with an overall timeout
--spec receive_loop([any()], integer(), function(), any(), timeout(), timeout()) ->
- {ok, any()} | timeout | {error, any()}.
-receive_loop(RefPartMap, Keypos, Fun, Acc0, infinity, PerMsgTO) ->
- process_mailbox(RefPartMap, Keypos, Fun, Acc0, nil, PerMsgTO);
-receive_loop(RefPartMap, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) ->
- TimeoutRef = erlang:make_ref(),
- {ok, TRef} = timer:send_after(GlobalTimeout, {timeout, TimeoutRef}),
- try
- process_mailbox(RefPartMap, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO)
- after
- timer:cancel(TRef)
- end.
-
-process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
- case process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) of
- {ok, Acc} ->
- process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO);
- {stop, Acc} ->
- {ok, Acc};
- Error ->
- Error
- end.
-
-process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
- receive
- {timeout, TimeoutRef} ->
- timeout;
- {Ref, Msg} ->
- case lists:keyfind(Ref, Keypos, RefList) of
- false ->
- % this was some non-matching message which we will ignore
- {ok, Acc0};
- Worker ->
- Fun(Msg, Worker, Acc0)
- end;
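- % sync_reply messages carry a From tag so the coordinator can throttle
- % the worker via gen_server:reply/2 (see fabric_view:maybe_pause_worker/3)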
- {Ref, From, Msg} ->
- case lists:keyfind(Ref, Keypos, RefList) of
- false ->
- {ok, Acc0};
- Worker ->
- Fun(Msg, {Worker, From}, Acc0)
- end;
- {rexi_DOWN, _RexiMonPid, ServerPid, Reason} = Msg ->
- showroom_log:message(alert, "rexi_DOWN ~p ~p", [ServerPid, Reason]),
- Fun(Msg, nil, Acc0)
- after PerMsgTO ->
- timeout
- end.
-
-get_db(DbName) ->
- Shards = mem3:shards(DbName),
- case lists:partition(fun(#shard{node = N}) -> N =:= node() end, Shards) of
- {[#shard{name = ShardName}|_], _} ->
- % prefer node-local DBs
- couch_db:open(ShardName, []);
- {[], [#shard{node = Node, name = ShardName}|_]} ->
- % but don't require them
- rpc:call(Node, couch_db, open, [ShardName, []])
- end.
diff --git a/src/fabric_view.erl b/src/fabric_view.erl
deleted file mode 100644
index 49a3a55a..00000000
--- a/src/fabric_view.erl
+++ /dev/null
@@ -1,218 +0,0 @@
--module(fabric_view).
-
--export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
- maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1,
- extract_view/4]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-%% @doc looks for a fully covered keyrange in the list of counters
--spec is_progress_possible([{#shard{}, term()}]) -> boolean().
-is_progress_possible([]) ->
- false;
-is_progress_possible(Counters) ->
- Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end,
- [], Counters),
- [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges),
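- % walk the remaining ranges in order, extending Tail (the highest covered
- % upper bound) until the 0..2^32-1 keyspace is covered or a gap appears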
- Result = lists:foldl(fun
- (_, fail) ->
- % we've already declared failure
- fail;
- (_, complete) ->
- % this is the success condition, we can fast-forward
- complete;
- ({X,_}, Tail) when X > (Tail+1) ->
- % gap in the keyrange, we're dead
- fail;
- ({_,Y}, Tail) ->
- case erlang:max(Tail, Y) of
- End when (End+1) =:= (2 bsl 31) ->
- complete;
- Else ->
- % the normal condition, adding to the tail
- Else
- end
- end, if (Tail0+1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest),
- (Start =:= 0) andalso (Result =:= complete).
-
--spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) ->
- [{#shard{}, any()}].
-remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) ->
- fabric_dict:filter(fun(#shard{range=[X,Y]} = Shard, _Value) ->
- if Shard =:= Shard0 ->
- % we can't remove ourselves
- true;
- A < B, X >= A, X < B ->
- % lower bound is inside our range
- false;
- A < B, Y > A, Y =< B ->
- % upper bound is inside our range
- false;
- B < A, X >= A orelse B < A, X < B ->
- % target shard wraps the key range, lower bound is inside
- false;
- B < A, Y > A orelse B < A, Y =< B ->
- % target shard wraps the key range, upper bound is inside
- false;
- true ->
- true
- end
- end, Shards).
-
-maybe_pause_worker(Worker, From, State) ->
- #collector{buffer_size = BufferSize, counters = Counters} = State,
- case fabric_dict:lookup_element(Worker, Counters) of
- BufferSize ->
- State#collector{blocked = [{Worker,From} | State#collector.blocked]};
- _Count ->
- gen_server:reply(From, ok),
- State
- end.
-
-maybe_resume_worker(Worker, State) ->
- #collector{buffer_size = Buffer, counters = C, blocked = B} = State,
- case fabric_dict:lookup_element(Worker, C) of
- Count when Count < Buffer/2 ->
- case couch_util:get_value(Worker, B) of
- undefined ->
- State;
- From ->
- gen_server:reply(From, ok),
- State#collector{blocked = lists:keydelete(Worker, 1, B)}
- end;
- _Other ->
- State
- end.
-
-maybe_send_row(#collector{limit=0} = State) ->
- #collector{user_acc=AccIn, callback=Callback} = State,
- {_, Acc} = Callback(complete, AccIn),
- {stop, State#collector{user_acc=Acc}};
-maybe_send_row(State) ->
- #collector{
- callback = Callback,
- counters = Counters,
- skip = Skip,
- limit = Limit,
- user_acc = AccIn
- } = State,
- case fabric_dict:any(0, Counters) of
- true ->
- {ok, State};
- false ->
- try get_next_row(State) of
- {_, NewState} when Skip > 0 ->
- maybe_send_row(NewState#collector{skip=Skip-1, limit=Limit-1});
- {Row, NewState} ->
- case Callback(transform_row(Row), AccIn) of
- {stop, Acc} ->
- {stop, NewState#collector{user_acc=Acc, limit=Limit-1}};
- {ok, Acc} ->
- maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1})
- end
- catch complete ->
- {_, Acc} = Callback(complete, AccIn),
- {stop, State#collector{user_acc=Acc}}
- end
- end.
-
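-% map each user-supplied key to its list position so rows can be emitted in
-% the order the user requested instead of view collation order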
-keydict(nil) ->
- undefined;
-keydict(Keys) ->
- {Dict,_} = lists:foldl(fun(K, {D,I}) -> {dict:store(K,I,D), I+1} end,
- {dict:new(),0}, Keys),
- Dict.
-
-%% internal %%
-
-get_next_row(#collector{rows = []}) ->
- throw(complete);
-get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined ->
- #collector{
- query_args = #view_query_args{direction=Dir},
- keys = Keys,
- rows = RowDict,
- os_proc = Proc,
- counters = Counters0
- } = St,
- {Key, RestKeys} = find_next_key(Keys, Dir, RowDict),
- case dict:find(Key, RowDict) of
- {ok, Records} ->
- NewRowDict = dict:erase(Key, RowDict),
- Counters = lists:foldl(fun(#view_row{worker=Worker}, CountersAcc) ->
- fabric_dict:update_counter(Worker, -1, CountersAcc)
- end, Counters0, Records),
- Wrapped = [[V] || #view_row{value=V} <- Records],
- {ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped),
- NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters},
- {#view_row{key=Key, id=reduced, value=Reduced}, NewSt};
- error ->
- get_next_row(St#collector{keys=RestKeys})
- end;
-get_next_row(State) ->
- #collector{rows = [Row|Rest], counters = Counters0} = State,
- Worker = Row#view_row.worker,
- Counters1 = fabric_dict:update_counter(Worker, -1, Counters0),
- NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}),
- {Row, NewState#collector{rows = Rest}}.
-
-find_next_key(nil, Dir, RowDict) ->
- case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of
- [] ->
- throw(complete);
- [Key|_] ->
- {Key, nil}
- end;
-find_next_key([], _, _) ->
- throw(complete);
-find_next_key([Key|Rest], _, _) ->
- {Key, Rest}.
-
-transform_row(#view_row{key=Key, id=reduced, value=Value}) ->
- {row, {[{key,Key}, {value,Value}]}};
-transform_row(#view_row{key=Key, id=undefined}) ->
- {row, {[{key,Key}, {error,not_found}]}};
-transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) ->
- {row, {[{id,Id}, {key,Key}, {value,Value}]}};
-transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) ->
- {row, {[{id,Id}, {key,Key}, {value,Value}, {error,Reason}]}};
-transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) ->
- {row, {[{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]}}.
-
-sort_fun(fwd) ->
- fun(A,A) -> true; (A,B) -> couch_view:less_json(A,B) end;
-sort_fun(rev) ->
- fun(A,A) -> true; (A,B) -> couch_view:less_json(B,A) end.
-
-extract_view(Pid, ViewName, [], _ViewType) ->
- ?LOG_ERROR("missing_named_view ~p", [ViewName]),
- exit(Pid, kill),
- exit(missing_named_view);
-extract_view(Pid, ViewName, [View|Rest], ViewType) ->
- case lists:member(ViewName, view_names(View, ViewType)) of
- true ->
- if ViewType == reduce ->
- {index_of(ViewName, view_names(View, reduce)), View};
- true ->
- View
- end;
- false ->
- extract_view(Pid, ViewName, Rest, ViewType)
- end.
-
-view_names(View, Type) when Type == red_map; Type == reduce ->
- [Name || {Name, _} <- View#view.reduce_funs];
-view_names(View, map) ->
- View#view.map_names.
-
-index_of(X, List) ->
- index_of(X, List, 1).
-
-index_of(_X, [], _I) ->
- not_found;
-index_of(X, [X|_Rest], I) ->
- I;
-index_of(X, [_|Rest], I) ->
- index_of(X, Rest, I+1).
diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl
deleted file mode 100644
index d51a2831..00000000
--- a/src/fabric_view_all_docs.erl
+++ /dev/null
@@ -1,167 +0,0 @@
--module(fabric_view_all_docs).
-
--export([go/4]).
--export([open_doc/3]). % exported for spawn
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) ->
- Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) ->
- Ref = rexi:cast(Node, {fabric_rpc, all_docs, [Name, QueryArgs]}),
- Shard#shard{ref = Ref}
- end, mem3:shards(DbName)),
- BufferSize = couch_config:get("fabric", "map_buffer_size", "2"),
- #view_query_args{limit = Limit, skip = Skip} = QueryArgs,
- State = #collector{
- query_args = QueryArgs,
- callback = Callback,
- buffer_size = list_to_integer(BufferSize),
- counters = fabric_dict:init(Workers, 0),
- skip = Skip,
- limit = Limit,
- user_acc = Acc0
- },
- try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
- State, infinity, 5000) of
- {ok, NewState} ->
- {ok, NewState#collector.user_acc};
- Error ->
- Error
- after
- fabric_util:cleanup(Workers)
- end;
-
-go(DbName, QueryArgs, Callback, Acc0) ->
- #view_query_args{
- direction = Dir,
- include_docs = IncludeDocs,
- limit = Limit0,
- skip = Skip0,
- keys = Keys
- } = QueryArgs,
- {_, Ref0} = spawn_monitor(fun() -> exit(fabric:get_doc_count(DbName)) end),
- Monitors0 = [spawn_monitor(?MODULE, open_doc, [DbName, Id, IncludeDocs]) ||
- Id <- Keys],
- Monitors = if Dir=:=fwd -> Monitors0; true -> lists:reverse(Monitors0) end,
- receive {'DOWN', Ref0, _, _, {ok, TotalRows}} ->
- {ok, Acc1} = Callback({total_and_offset, TotalRows, 0}, Acc0),
- {ok, Acc2} = doc_receive_loop(Monitors, Skip0, Limit0, Callback, Acc1),
- Callback(complete, Acc2)
- after 10000 ->
- Callback(timeout, Acc0)
- end.
-
-handle_message({rexi_DOWN, _, _, _}, nil, State) ->
- % TODO see if progress can be made here, possibly by removing all shards
- % from that node and checking is_progress_possible
- {ok, State};
-
-handle_message({rexi_EXIT, _}, Worker, State) ->
- #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
- Counters = fabric_dict:erase(Worker, Counters0),
- case fabric_view:is_progress_possible(Counters) of
- true ->
- {ok, State#collector{counters = Counters}};
- false ->
- Callback({error, dead_shards}, Acc),
- {error, dead_shards}
- end;
-
-handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) ->
- #collector{
- callback = Callback,
- counters = Counters0,
- total_rows = Total0,
- offset = Offset0,
- user_acc = AccIn
- } = State,
- case fabric_dict:lookup_element(Worker, Counters0) of
- undefined ->
- % this worker lost the race with other partition copies, terminate
- gen_server:reply(From, stop),
- {ok, State};
- 0 ->
- gen_server:reply(From, ok),
- Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
- Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
- Total = Total0 + Tot,
- Offset = Offset0 + Off,
- case fabric_dict:any(0, Counters2) of
- true ->
- {ok, State#collector{
- counters = Counters2,
- total_rows = Total,
- offset = Offset
- }};
- false ->
- FinalOffset = erlang:min(Total, Offset+State#collector.skip),
- {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn),
- {Go, State#collector{
- counters = fabric_dict:decrement_all(Counters2),
- total_rows = Total,
- offset = FinalOffset,
- user_acc = Acc
- }}
- end
- end;
-
-handle_message(#view_row{} = Row, {Worker, From}, State) ->
- #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
- Dir = Args#view_query_args.direction,
- Rows = merge_row(Dir, Row#view_row{worker=Worker}, Rows0),
- Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
- State1 = State#collector{rows=Rows, counters=Counters1},
- State2 = fabric_view:maybe_pause_worker(Worker, From, State1),
- fabric_view:maybe_send_row(State2);
-
-handle_message(complete, Worker, State) ->
- Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
- fabric_view:maybe_send_row(State#collector{counters = Counters}).
-
-
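-% _all_docs rows arrive from each shard already sorted by id, so a keymerge
-% on the id field keeps the combined stream sorted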
-merge_row(fwd, Row, Rows) ->
- lists:keymerge(#view_row.id, [Row], Rows);
-merge_row(rev, Row, Rows) ->
- lists:rkeymerge(#view_row.id, [Row], Rows).
-
-doc_receive_loop([], _, _, _, Acc) ->
- {ok, Acc};
-doc_receive_loop(_, _, 0, _, Acc) ->
- {ok, Acc};
-doc_receive_loop([{Pid,Ref}|Rest], Skip, Limit, Callback, Acc) when Skip > 0 ->
- receive {'DOWN', Ref, process, Pid, #view_row{}} ->
- doc_receive_loop(Rest, Skip-1, Limit-1, Callback, Acc)
- after 10000 ->
- timeout
- end;
-doc_receive_loop([{Pid,Ref}|Rest], 0, Limit, Callback, AccIn) ->
- receive {'DOWN', Ref, process, Pid, #view_row{} = Row} ->
- case Callback(fabric_view:transform_row(Row), AccIn) of
- {ok, Acc} ->
- doc_receive_loop(Rest, 0, Limit-1, Callback, Acc);
- {stop, Acc} ->
- {ok, Acc}
- end
- after 10000 ->
- timeout
- end.
-
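-% runs in a spawn_monitor'd process; the row is delivered back to the
-% coordinator as the exit reason in the 'DOWN' message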
-open_doc(DbName, Id, IncludeDocs) ->
- Row = case fabric:open_doc(DbName, Id, [deleted]) of
- {not_found, missing} ->
- Doc = undefined,
- #view_row{key=Id};
- {ok, #doc{deleted=true, revs=Revs}} ->
- Doc = null,
- {RevPos, [RevId|_]} = Revs,
- Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}, {deleted,true}]},
- #view_row{key=Id, id=Id, value=Value};
- {ok, #doc{revs=Revs} = Doc0} ->
- Doc = couch_doc:to_json_obj(Doc0, []),
- {RevPos, [RevId|_]} = Revs,
- Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}]},
- #view_row{key=Id, id=Id, value=Value}
- end,
- exit(if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end).
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
deleted file mode 100644
index 6030df1d..00000000
--- a/src/fabric_view_changes.erl
+++ /dev/null
@@ -1,251 +0,0 @@
--module(fabric_view_changes).
-
--export([go/5, start_update_notifier/1]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse
- Feed == "longpoll" ->
- Args = make_changes_args(Options),
- {ok, Acc} = Callback(start, Acc0),
- Notifiers = start_update_notifiers(DbName),
- {Timeout, TimeoutFun} = couch_changes:get_changes_timeout(Args, Callback),
- try
- keep_sending_changes(
- DbName,
- Args,
- Callback,
- get_start_seq(DbName, Args),
- Acc,
- Timeout,
- TimeoutFun
- )
- after
- stop_update_notifiers(Notifiers),
- couch_changes:get_rest_db_updated()
- end;
-
-go(DbName, "normal", Options, Callback, Acc0) ->
- Args = make_changes_args(Options),
- {ok, Acc} = Callback(start, Acc0),
- {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes(
- DbName,
- Args,
- Callback,
- get_start_seq(DbName, Args),
- Acc
- ),
- Callback({stop, pack_seqs(Seqs)}, AccOut).
-
-keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, TFun) ->
- #changes_args{limit=Limit, feed=Feed} = Args,
- {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn),
- #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
- LastSeq = pack_seqs(NewSeqs),
- if Limit > Limit2, Feed == "longpoll" ->
- Callback({stop, LastSeq}, AccOut);
- true ->
- case couch_changes:wait_db_updated(Timeout, TFun) of
- updated ->
- keep_sending_changes(
- DbName,
- Args#changes_args{limit=Limit2},
- Callback,
- LastSeq,
- AccIn,
- Timeout,
- TFun
- );
- stop ->
- Callback({stop, LastSeq}, AccOut)
- end
- end.
-
-send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn) ->
- AllShards = mem3:shards(DbName),
- Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) ->
- case lists:member(Shard, AllShards) of
- true ->
- Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}),
- [{Shard#shard{ref = Ref}, Seq}];
- false ->
- % Find some replacement shards to cover the missing range
- % TODO It's possible in rare cases of shard merging to end up
- % with overlapping shard ranges from this technique
- lists:map(fun(#shard{name=Name2, node=N2} = NewShard) ->
- Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}),
- {NewShard#shard{ref = Ref}, 0}
- end, find_replacement_shards(Shard, AllShards))
- end
- end, unpack_seqs(PackedSeqs, DbName)),
- {Workers, _} = lists:unzip(Seqs),
- State = #collector{
- query_args = ChangesArgs,
- callback = Callback,
- counters = fabric_dict:init(Workers, 0),
- user_acc = AccIn,
- limit = ChangesArgs#changes_args.limit,
- rows = Seqs % store sequence positions instead
- },
- try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
- State, infinity, 5000)
- after
- fabric_util:cleanup(Workers)
- end.
-
-handle_message({rexi_DOWN, _, _, _}, nil, State) ->
- % TODO see if progress can be made here, possibly by removing all shards
- % from that node and checking is_progress_possible
- {ok, State};
-
-handle_message({rexi_EXIT, Reason}, Worker, State) ->
- ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]),
- #collector{
- callback=Callback,
- counters=Counters0,
- rows = Seqs0,
- user_acc=Acc
- } = State,
- Counters = fabric_dict:erase(Worker, Counters0),
- Seqs = fabric_dict:erase(Worker, Seqs0),
- case fabric_view:is_progress_possible(Counters) of
- true ->
- {ok, State#collector{counters = Counters, rows=Seqs}};
- false ->
- Callback({error, dead_shards}, Acc),
- {error, dead_shards}
- end;
-
-handle_message(_, _, #collector{limit=0} = State) ->
- {stop, State};
-
-handle_message(#view_row{key=Seq} = Row0, {Worker, From}, St) ->
- #collector{
- query_args = #changes_args{include_docs=IncludeDocs},
- callback = Callback,
- counters = S0,
- limit = Limit,
- user_acc = AccIn
- } = St,
- case fabric_dict:lookup_element(Worker, S0) of
- undefined ->
- % this worker lost the race with other partition copies, terminate it
- gen_server:reply(From, stop),
- {ok, St};
- _ ->
- S1 = fabric_dict:store(Worker, Seq, S0),
- S2 = fabric_view:remove_overlapping_shards(Worker, S1),
- Row = Row0#view_row{key = pack_seqs(S2)},
- {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
- gen_server:reply(From, Go),
- {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}}
- end;
-
-handle_message({complete, EndSeq}, Worker, State) ->
- #collector{
- counters = S0,
- total_rows = Completed % override
- } = State,
- case fabric_dict:lookup_element(Worker, S0) of
- undefined ->
- {ok, State};
- _ ->
- S1 = fabric_dict:store(Worker, EndSeq, S0),
- % unlikely to have overlaps here, but possible with filters
- S2 = fabric_view:remove_overlapping_shards(Worker, S1),
- NewState = State#collector{counters=S2, total_rows=Completed+1},
- case fabric_dict:size(S2) =:= (Completed+1) of
- true ->
- {stop, NewState};
- false ->
- {ok, NewState}
- end
- end.
-
-make_changes_args(#changes_args{style=main_only, filter=undefined}=Args) ->
- Args#changes_args{filter = fun couch_changes:main_only_filter/1};
-make_changes_args(#changes_args{style=all_docs, filter=undefined}=Args) ->
- Args#changes_args{filter = fun couch_changes:all_docs_filter/1};
-make_changes_args(Args) ->
- Args.
-
-get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) ->
- Since;
-get_start_seq(DbName, #changes_args{dir=rev}) ->
- Shards = mem3:shards(DbName),
- Workers = fabric_util:submit_jobs(Shards, get_update_seq, []),
- {ok, Since} = fabric_util:recv(Workers, #shard.ref,
- fun collect_update_seqs/3, fabric_dict:init(Workers, -1)),
- Since.
-
-collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) ->
- case fabric_dict:lookup_element(Shard, Counters) of
- undefined ->
- % already heard from someone else in this range
- {ok, Counters};
- -1 ->
- C1 = fabric_dict:store(Shard, Seq, Counters),
- C2 = fabric_view:remove_overlapping_shards(Shard, C1),
- case fabric_dict:any(-1, C2) of
- true ->
- {ok, C2};
- false ->
- {stop, pack_seqs(C2)}
- end
- end.
-
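-% a packed seq looks something like <<"42-g1AAAA...">>: the sum of the
-% per-shard seqs, a dash, then base64url-encoded compressed [{Node,Range,Seq}]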
-pack_seqs(Workers) ->
- SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers],
- SeqSum = lists:sum(element(2, lists:unzip(Workers))),
- Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])),
- list_to_binary([integer_to_list(SeqSum), $-, Opaque]).
-
-unpack_seqs(0, DbName) ->
- fabric_dict:init(mem3:shards(DbName), 0);
-
-unpack_seqs("0", DbName) ->
- fabric_dict:init(mem3:shards(DbName), 0);
-
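-% strip the numeric prefix and decode the opaque tail back to [{Node,Range,Seq}]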
-unpack_seqs(Packed, DbName) ->
- {match, [Opaque]} = re:run(Packed, "^([0-9]+-)?(?<opaque>.*)", [{capture,
- [opaque], binary}]),
- % TODO relies on internal structure of fabric_dict as keylist
- lists:map(fun({Node, [A,B], Seq}) ->
- Shard = #shard{node=Node, range=[A,B], dbname=DbName},
- {mem3_util:name_shard(Shard), Seq}
- end, binary_to_term(couch_util:decodeBase64Url(Opaque))).
-
-start_update_notifiers(DbName) ->
- lists:map(fun(#shard{node=Node, name=Name}) ->
- {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})}
- end, mem3:shards(DbName)).
-
-% rexi endpoint
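-% runs on the shard's node; forwards couch_db_update events for this shard to
-% the coordinator as db_updated messages until the handler exits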
-start_update_notifier(DbName) ->
- {Caller, _} = get(rexi_from),
- Fun = fun({_, X}) when X == DbName -> Caller ! db_updated; (_) -> ok end,
- Id = {couch_db_update_notifier, make_ref()},
- ok = gen_event:add_sup_handler(couch_db_update, Id, Fun),
- receive {gen_event_EXIT, Id, Reason} ->
- rexi:reply({gen_event_EXIT, DbName, Reason})
- end.
-
-stop_update_notifiers(Notifiers) ->
- [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers].
-
-changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, true) ->
- {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, null}]}};
-changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, false) ->
- {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}};
-changes_row(#view_row{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) ->
- {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}};
-changes_row(#view_row{key=Seq, id=Id, value=Value, doc=Doc}, true) ->
- {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}};
-changes_row(#view_row{key=Seq, id=Id, value=Value}, false) ->
- {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}.
-
-find_replacement_shards(#shard{range=Range}, AllShards) ->
- % TODO make this smarter -- we might have split or merged the partition
- [Shard || Shard <- AllShards, Shard#shard.range =:= Range].
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
deleted file mode 100644
index ce8dd625..00000000
--- a/src/fabric_view_map.erl
+++ /dev/null
@@ -1,138 +0,0 @@
--module(fabric_view_map).
-
--export([go/6]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
- {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
- go(DbName, DDoc, View, Args, Callback, Acc0);
-
-go(DbName, DDoc, View, Args, Callback, Acc0) ->
- Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) ->
- Ref = rexi:cast(Node, {fabric_rpc, map_view, [Name, DDoc, View, Args]}),
- Shard#shard{ref = Ref}
- end, mem3:shards(DbName)),
- BufferSize = couch_config:get("fabric", "map_buffer_size", "2"),
- #view_query_args{limit = Limit, skip = Skip, keys = Keys} = Args,
- State = #collector{
- query_args = Args,
- callback = Callback,
- buffer_size = list_to_integer(BufferSize),
- counters = fabric_dict:init(Workers, 0),
- skip = Skip,
- limit = Limit,
- keys = fabric_view:keydict(Keys),
- sorted = Args#view_query_args.sorted,
- user_acc = Acc0
- },
- try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
- State, infinity, 1000 * 60 * 60) of
- {ok, NewState} ->
- {ok, NewState#collector.user_acc};
- Error ->
- Error
- after
- fabric_util:cleanup(Workers)
- end.
-
-handle_message({rexi_DOWN, _, _, _}, nil, State) ->
- % TODO see if progress can be made here, possibly by removing all shards
- % from that node and checking is_progress_possible
- {ok, State};
-
-handle_message({rexi_EXIT, Reason}, Worker, State) ->
- ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]),
- #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
- Counters = fabric_dict:erase(Worker, Counters0),
- case fabric_view:is_progress_possible(Counters) of
- true ->
- {ok, State#collector{counters = Counters}};
- false ->
- Callback({error, dead_shards}, Acc),
- {error, dead_shards}
- end;
-
-handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) ->
- #collector{
- callback = Callback,
- counters = Counters0,
- total_rows = Total0,
- offset = Offset0,
- user_acc = AccIn
- } = State,
- case fabric_dict:lookup_element(Worker, Counters0) of
- undefined ->
- % this worker lost the race with other partition copies, terminate
- gen_server:reply(From, stop),
- {ok, State};
- 0 ->
- gen_server:reply(From, ok),
- Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
- Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
- Total = Total0 + Tot,
- Offset = Offset0 + Off,
- case fabric_dict:any(0, Counters2) of
- true ->
- {ok, State#collector{
- counters = Counters2,
- total_rows = Total,
- offset = Offset
- }};
- false ->
- FinalOffset = erlang:min(Total, Offset+State#collector.skip),
- {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn),
- {Go, State#collector{
- counters = fabric_dict:decrement_all(Counters2),
- total_rows = Total,
- offset = FinalOffset,
- user_acc = Acc
- }}
- end
- end;
-
-handle_message(#view_row{}, {_, _}, #collector{limit=0} = State) ->
- #collector{callback=Callback} = State,
- {_, Acc} = Callback(complete, State#collector.user_acc),
- {stop, State#collector{user_acc=Acc}};
-
-handle_message(#view_row{} = Row, {_,From}, #collector{sorted=false} = St) ->
- #collector{callback=Callback, user_acc=AccIn, limit=Limit} = St,
- {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn),
- gen_server:reply(From, ok),
- {Go, St#collector{user_acc=Acc, limit=Limit-1}};
-
-handle_message(#view_row{} = Row, {Worker, From}, State) ->
- #collector{
- query_args = #view_query_args{direction=Dir},
- counters = Counters0,
- rows = Rows0,
- keys = KeyDict
- } = State,
- Rows = merge_row(Dir, KeyDict, Row#view_row{worker=Worker}, Rows0),
- Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
- State1 = State#collector{rows=Rows, counters=Counters1},
- State2 = fabric_view:maybe_pause_worker(Worker, From, State1),
- fabric_view:maybe_send_row(State2);
-
-handle_message(complete, Worker, State) ->
- Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
- fabric_view:maybe_send_row(State#collector{counters = Counters}).
-
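-% rows from all shards are merged into a single sorted stream; with an
-% explicit key list the order comes from the keydict, not view collation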
-merge_row(fwd, undefined, Row, Rows) ->
- lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) ->
- couch_view:less_json([KeyA, IdA], [KeyB, IdB])
- end, [Row], Rows);
-merge_row(rev, undefined, Row, Rows) ->
- lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) ->
- couch_view:less_json([KeyB, IdB], [KeyA, IdA])
- end, [Row], Rows);
-merge_row(_, KeyDict, Row, Rows) ->
- lists:merge(fun(#view_row{key=A, id=IdA}, #view_row{key=B, id=IdB}) ->
- if A =:= B -> IdA < IdB; true ->
- dict:fetch(A, KeyDict) < dict:fetch(B, KeyDict)
- end
- end, [Row], Rows).
-
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
deleted file mode 100644
index ddde9f22..00000000
--- a/src/fabric_view_reduce.erl
+++ /dev/null
@@ -1,85 +0,0 @@
--module(fabric_view_reduce).
-
--export([go/6]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
- {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
- go(DbName, DDoc, View, Args, Callback, Acc0);
-
-go(DbName, DDoc, VName, Args, Callback, Acc0) ->
- #group{def_lang=Lang, views=Views} = Group =
- couch_view_group:design_doc_to_view_group(#db{name=DbName}, DDoc),
- {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce),
- {VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs),
- Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
- Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,Group,VName,Args]}),
- Shard#shard{ref = Ref}
- end, mem3:shards(DbName)),
- BufferSize = couch_config:get("fabric", "reduce_buffer_size", "20"),
- #view_query_args{limit = Limit, skip = Skip} = Args,
- State = #collector{
- query_args = Args,
- callback = Callback,
- buffer_size = list_to_integer(BufferSize),
- counters = fabric_dict:init(Workers, 0),
- keys = Args#view_query_args.keys,
- skip = Skip,
- limit = Limit,
- lang = Group#group.def_lang,
- os_proc = couch_query_servers:get_os_process(Lang),
- reducer = RedSrc,
- rows = dict:new(),
- user_acc = Acc0
- },
- try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
- State, infinity, 1000 * 60 * 60) of
- {ok, NewState} ->
- {ok, NewState#collector.user_acc};
- Error ->
- Error
- after
- fabric_util:cleanup(Workers),
- catch couch_query_servers:ret_os_process(State#collector.os_proc)
- end.
-
-handle_message({rexi_DOWN, _, _, _}, nil, State) ->
- % TODO see if progress can be made here, possibly by removing all shards
- % from that node and checking is_progress_possible
- {ok, State};
-
-handle_message({rexi_EXIT, Reason}, Worker, State) ->
- ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]),
- #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
- Counters = fabric_dict:erase(Worker, Counters0),
- case fabric_view:is_progress_possible(Counters) of
- true ->
- {ok, State#collector{counters = Counters}};
- false ->
- Callback({error, dead_shards}, Acc),
- {error, dead_shards}
- end;
-
-handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
- #collector{counters = Counters0, rows = Rows0} = State,
- case fabric_dict:lookup_element(Worker, Counters0) of
- undefined ->
- % this worker lost the race with other partition copies, terminate it
- gen_server:reply(From, stop),
- {ok, State};
- _ ->
- Rows = dict:append(Key, Row#view_row{worker=Worker}, Rows0),
- C1 = fabric_dict:update_counter(Worker, 1, Counters0),
- % TODO time this call, if slow don't do it every time
- C2 = fabric_view:remove_overlapping_shards(Worker, C1),
- State1 = State#collector{rows=Rows, counters=C2},
- State2 = fabric_view:maybe_pause_worker(Worker, From, State1),
- fabric_view:maybe_send_row(State2)
- end;
-
-handle_message(complete, Worker, State) ->
- Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
- fabric_view:maybe_send_row(State#collector{counters = Counters}).