summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/priv/couch_js/http.c675
-rw-r--r--src/couchdb/priv/couch_js/http.h18
-rw-r--r--src/couchdb/priv/couch_js/main.c338
-rw-r--r--src/couchdb/priv/couch_js/utf8.c286
-rw-r--r--src/couchdb/priv/couch_js/utf8.h19
-rw-r--r--src/couchdb/priv/spawnkillable/couchspawnkillable_win.c145
-rw-r--r--src/mem3.erl103
-rw-r--r--src/mem3_app.erl9
-rw-r--r--src/mem3_cache.erl92
-rw-r--r--src/mem3_httpd.erl39
-rw-r--r--src/mem3_nodes.erl120
-rw-r--r--src/mem3_sup.erl21
-rw-r--r--src/mem3_sync.erl215
-rw-r--r--src/mem3_sync_event.erl44
-rw-r--r--src/mem3_util.erl139
15 files changed, 1481 insertions, 782 deletions
diff --git a/src/couchdb/priv/couch_js/http.c b/src/couchdb/priv/couch_js/http.c
new file mode 100644
index 00000000..6c2a8a82
--- /dev/null
+++ b/src/couchdb/priv/couch_js/http.c
@@ -0,0 +1,675 @@
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <jsapi.h>
+#include <curl/curl.h>
+
+#include "utf8.h"
+
+#ifdef XP_WIN
+// Map some of the string function names to things which exist on Windows
+#define strcasecmp _strcmpi
+#define strncasecmp _strnicmp
+#define snprintf _snprintf
+#endif
+
+typedef struct curl_slist CurlHeaders;
+
+typedef struct {
+ int method;
+ char* url;
+ CurlHeaders* req_headers;
+ jsint last_status;
+} HTTPData;
+
+char* METHODS[] = {"GET", "HEAD", "POST", "PUT", "DELETE", "COPY", NULL};
+
+#define GET 0
+#define HEAD 1
+#define POST 2
+#define PUT 3
+#define DELETE 4
+#define COPY 5
+
+static JSBool
+go(JSContext* cx, JSObject* obj, HTTPData* http, char* body, size_t blen);
+
+static JSString*
+str_from_binary(JSContext* cx, char* data, size_t length);
+
+static JSBool
+constructor(JSContext* cx, JSObject* obj, uintN argc, jsval* argv, jsval* rval)
+{
+ HTTPData* http = NULL;
+ JSBool ret = JS_FALSE;
+
+ http = (HTTPData*) malloc(sizeof(HTTPData));
+ if(!http)
+ {
+ JS_ReportError(cx, "Failed to create CouchHTTP instance.");
+ goto error;
+ }
+
+ http->method = -1;
+ http->url = NULL;
+ http->req_headers = NULL;
+ http->last_status = -1;
+
+ if(!JS_SetPrivate(cx, obj, http))
+ {
+ JS_ReportError(cx, "Failed to set private CouchHTTP data.");
+ goto error;
+ }
+
+ ret = JS_TRUE;
+ goto success;
+
+error:
+ if(http) free(http);
+
+success:
+ return ret;
+}
+
+static void
+destructor(JSContext* cx, JSObject* obj)
+{
+ HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj);
+ if(!http)
+ {
+ fprintf(stderr, "Unable to destroy invalid CouchHTTP instance.\n");
+ }
+ else
+ {
+ if(http->url) free(http->url);
+ if(http->req_headers) curl_slist_free_all(http->req_headers);
+ free(http);
+ }
+}
+
+static JSBool
+open(JSContext* cx, JSObject* obj, uintN argc, jsval* argv, jsval* rval)
+{
+ HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj);
+ char* method = NULL;
+ char* url = NULL;
+ JSBool ret = JS_FALSE;
+ int methid;
+
+ if(!http)
+ {
+ JS_ReportError(cx, "Invalid CouchHTTP instance.");
+ goto done;
+ }
+
+ if(argv[0] == JSVAL_VOID)
+ {
+ JS_ReportError(cx, "You must specify a method.");
+ goto done;
+ }
+
+ method = enc_string(cx, argv[0], NULL);
+ if(!method)
+ {
+ JS_ReportError(cx, "Failed to encode method.");
+ goto done;
+ }
+
+ for(methid = 0; METHODS[methid] != NULL; methid++)
+ {
+ if(strcasecmp(METHODS[methid], method) == 0) break;
+ }
+
+ if(methid > COPY)
+ {
+ JS_ReportError(cx, "Invalid method specified.");
+ goto done;
+ }
+
+ http->method = methid;
+
+ if(argv[1] == JSVAL_VOID)
+ {
+ JS_ReportError(cx, "You must specify a URL.");
+ goto done;
+ }
+
+ if(http->url)
+ {
+ free(http->url);
+ http->url = NULL;
+ }
+
+ http->url = enc_string(cx, argv[1], NULL);
+ if(!http->url)
+ {
+ JS_ReportError(cx, "Failed to encode URL.");
+ goto done;
+ }
+
+ if(argv[2] != JSVAL_VOID && argv[2] != JSVAL_FALSE)
+ {
+ JS_ReportError(cx, "Synchronous flag must be false if specified.");
+ goto done;
+ }
+
+ if(http->req_headers)
+ {
+ curl_slist_free_all(http->req_headers);
+ http->req_headers = NULL;
+ }
+
+ // Disable Expect: 100-continue
+ http->req_headers = curl_slist_append(http->req_headers, "Expect:");
+
+ ret = JS_TRUE;
+
+done:
+ if(method) free(method);
+ return ret;
+}
+
+static JSBool
+setheader(JSContext* cx, JSObject* obj, uintN argc, jsval* argv, jsval* rval)
+{
+ HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj);
+ char* keystr = NULL;
+ char* valstr = NULL;
+ char* hdrbuf = NULL;
+ size_t hdrlen = -1;
+ JSBool ret = JS_FALSE;
+
+ if(!http)
+ {
+ JS_ReportError(cx, "Invalid CouchHTTP instance.");
+ goto done;
+ }
+
+ if(argv[0] == JSVAL_VOID)
+ {
+ JS_ReportError(cx, "You must speciy a header name.");
+ goto done;
+ }
+
+ keystr = enc_string(cx, argv[0], NULL);
+ if(!keystr)
+ {
+ JS_ReportError(cx, "Failed to encode header name.");
+ goto done;
+ }
+
+ if(argv[1] == JSVAL_VOID)
+ {
+ JS_ReportError(cx, "You must specify a header value.");
+ goto done;
+ }
+
+ valstr = enc_string(cx, argv[1], NULL);
+ if(!valstr)
+ {
+ JS_ReportError(cx, "Failed to encode header value.");
+ goto done;
+ }
+
+ hdrlen = strlen(keystr) + strlen(valstr) + 3;
+ hdrbuf = (char*) malloc(hdrlen * sizeof(char));
+ if(!hdrbuf)
+ {
+ JS_ReportError(cx, "Failed to allocate header buffer.");
+ goto done;
+ }
+
+ snprintf(hdrbuf, hdrlen, "%s: %s", keystr, valstr);
+ http->req_headers = curl_slist_append(http->req_headers, hdrbuf);
+
+ ret = JS_TRUE;
+
+done:
+ if(keystr) free(keystr);
+ if(valstr) free(valstr);
+ if(hdrbuf) free(hdrbuf);
+
+ return ret;
+}
+
+static JSBool
+sendreq(JSContext* cx, JSObject* obj, uintN argc, jsval* argv, jsval* rval)
+{
+ HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj);
+ char* body = NULL;
+ size_t bodylen = 0;
+ JSBool ret = JS_FALSE;
+
+ if(!http)
+ {
+ JS_ReportError(cx, "Invalid CouchHTTP instance.");
+ goto done;
+ }
+
+ if(argv[0] != JSVAL_VOID && argv[0] != JS_GetEmptyStringValue(cx))
+ {
+ body = enc_string(cx, argv[0], &bodylen);
+ if(!body)
+ {
+ JS_ReportError(cx, "Failed to encode body.");
+ goto done;
+ }
+ }
+
+ ret = go(cx, obj, http, body, bodylen);
+
+done:
+ if(body) free(body);
+ return ret;
+}
+
+static JSBool
+status(JSContext* cx, JSObject* obj, jsval idval, jsval* vp)
+{
+ HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj);
+
+ if(!http)
+ {
+ JS_ReportError(cx, "Invalid CouchHTTP instance.");
+ return JS_FALSE;
+ }
+
+ if(INT_FITS_IN_JSVAL(http->last_status))
+ {
+ *vp = INT_TO_JSVAL(http->last_status);
+ return JS_TRUE;
+ }
+ else
+ {
+ JS_ReportError(cx, "INTERNAL: Invalid last_status");
+ return JS_FALSE;
+ }
+}
+
+JSClass CouchHTTPClass = {
+ "CouchHTTP",
+ JSCLASS_HAS_PRIVATE
+ | JSCLASS_CONSTRUCT_PROTOTYPE
+ | JSCLASS_HAS_RESERVED_SLOTS(2),
+ JS_PropertyStub,
+ JS_PropertyStub,
+ JS_PropertyStub,
+ JS_PropertyStub,
+ JS_EnumerateStub,
+ JS_ResolveStub,
+ JS_ConvertStub,
+ destructor,
+ JSCLASS_NO_OPTIONAL_MEMBERS
+};
+
+JSPropertySpec CouchHTTPProperties[] = {
+ {"status", 0, JSPROP_READONLY, status, NULL},
+ {0, 0, 0, 0, 0}
+};
+
+JSFunctionSpec CouchHTTPFunctions[] = {
+ {"_open", open, 3, 0, 0},
+ {"_setRequestHeader", setheader, 2, 0, 0},
+ {"_send", sendreq, 1, 0, 0},
+ {0, 0, 0, 0, 0}
+};
+
+JSObject*
+install_http(JSContext* cx, JSObject* glbl)
+{
+ JSObject* klass = NULL;
+ HTTPData* http = NULL;
+
+ klass = JS_InitClass(
+ cx,
+ glbl,
+ NULL,
+ &CouchHTTPClass,
+ constructor,
+ 0,
+ CouchHTTPProperties,
+ CouchHTTPFunctions,
+ NULL,
+ NULL
+ );
+
+ if(!klass)
+ {
+ fprintf(stderr, "Failed to initialize CouchHTTP class.\n");
+ return NULL;
+ }
+
+ return klass;
+}
+
+
+// Curl Helpers
+
+typedef struct {
+ HTTPData* http;
+ JSContext* cx;
+ JSObject* resp_headers;
+ char* sendbuf;
+ size_t sendlen;
+ size_t sent;
+ char* recvbuf;
+ size_t recvlen;
+ size_t read;
+} CurlState;
+
+/*
+ * I really hate doing this but this doesn't have to be
+ * uber awesome, it just has to work.
+ */
+CURL* HTTP_HANDLE = NULL;
+char ERRBUF[CURL_ERROR_SIZE];
+
+static size_t send_body(void *ptr, size_t size, size_t nmem, void *data);
+static int seek_body(void *ptr, curl_off_t offset, int origin);
+static size_t recv_body(void *ptr, size_t size, size_t nmem, void *data);
+static size_t recv_header(void *ptr, size_t size, size_t nmem, void *data);
+
+static JSBool
+go(JSContext* cx, JSObject* obj, HTTPData* http, char* body, size_t bodylen)
+{
+ CurlState state;
+ JSString* jsbody;
+ JSBool ret = JS_FALSE;
+ jsval tmp;
+
+ state.cx = cx;
+ state.http = http;
+
+ state.sendbuf = body;
+ state.sendlen = bodylen;
+ state.sent = 0;
+
+ state.recvbuf = NULL;
+ state.recvlen = 0;
+ state.read = 0;
+
+ if(HTTP_HANDLE == NULL)
+ {
+ HTTP_HANDLE = curl_easy_init();
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_READFUNCTION, send_body);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_SEEKFUNCTION,
+ (curl_seek_callback) seek_body);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_HEADERFUNCTION, recv_header);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_WRITEFUNCTION, recv_body);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_NOPROGRESS, 1);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_ERRORBUFFER, ERRBUF);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_COOKIEFILE, "");
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_USERAGENT,
+ "CouchHTTP Client - Relax");
+ }
+
+ if(!HTTP_HANDLE)
+ {
+ JS_ReportError(cx, "Failed to initialize cURL handle.");
+ goto done;
+ }
+
+ if(http->method < 0 || http->method > COPY)
+ {
+ JS_ReportError(cx, "INTERNAL: Unknown method.");
+ goto done;
+ }
+
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_CUSTOMREQUEST, METHODS[http->method]);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_NOBODY, 0);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_FOLLOWLOCATION, 1);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_UPLOAD, 0);
+
+ if(http->method == HEAD)
+ {
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_NOBODY, 1);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_FOLLOWLOCATION, 0);
+ }
+ else if(http->method == POST || http->method == PUT)
+ {
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_UPLOAD, 1);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_FOLLOWLOCATION, 0);
+ }
+
+ if(body && bodylen)
+ {
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_INFILESIZE, bodylen);
+ }
+ else
+ {
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_INFILESIZE, 0);
+ }
+
+ //curl_easy_setopt(HTTP_HANDLE, CURLOPT_VERBOSE, 1);
+
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_URL, http->url);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_HTTPHEADER, http->req_headers);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_READDATA, &state);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_SEEKDATA, &state);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_WRITEHEADER, &state);
+ curl_easy_setopt(HTTP_HANDLE, CURLOPT_WRITEDATA, &state);
+
+ if(curl_easy_perform(HTTP_HANDLE) != 0)
+ {
+ JS_ReportError(cx, "Failed to execute HTTP request: %s", ERRBUF);
+ goto done;
+ }
+
+ if(!state.resp_headers)
+ {
+ JS_ReportError(cx, "Failed to recieve HTTP headers.");
+ goto done;
+ }
+
+ tmp = OBJECT_TO_JSVAL(state.resp_headers);
+ if(!JS_DefineProperty(
+ cx,
+ obj,
+ "_headers",
+ tmp,
+ NULL,
+ NULL,
+ JSPROP_READONLY
+ ))
+ {
+ JS_ReportError(cx, "INTERNAL: Failed to set response headers.");
+ goto done;
+ }
+
+ if(state.recvbuf) // Is good enough?
+ {
+ state.recvbuf[state.read] = '\0';
+ jsbody = dec_string(cx, state.recvbuf, state.read+1);
+ if(!jsbody)
+ {
+ // If we can't decode the body as UTF-8 we forcefully
+ // convert it to a string by just forcing each byte
+ // to a jschar.
+ jsbody = str_from_binary(cx, state.recvbuf, state.read);
+ if(!jsbody) {
+ if(!JS_IsExceptionPending(cx)) {
+ JS_ReportError(cx, "INTERNAL: Failed to decode body.");
+ }
+ goto done;
+ }
+ }
+ tmp = STRING_TO_JSVAL(jsbody);
+ }
+ else
+ {
+ tmp = JS_GetEmptyStringValue(cx);
+ }
+
+ if(!JS_DefineProperty(
+ cx,
+ obj,
+ "responseText",
+ tmp,
+ NULL,
+ NULL,
+ JSPROP_READONLY
+ ))
+ {
+ JS_ReportError(cx, "INTERNAL: Failed to set responseText.");
+ goto done;
+ }
+
+ ret = JS_TRUE;
+
+done:
+ if(state.recvbuf) JS_free(cx, state.recvbuf);
+ return ret;
+}
+
+static size_t
+send_body(void *ptr, size_t size, size_t nmem, void *data)
+{
+ CurlState* state = (CurlState*) data;
+ size_t length = size * nmem;
+ size_t towrite = state->sendlen - state->sent;
+ if(towrite == 0)
+ {
+ return 0;
+ }
+
+ if(length < towrite) towrite = length;
+
+ //fprintf(stderr, "%lu %lu %lu %lu\n", state->bodyused, state->bodyread, length, towrite);
+
+ memcpy(ptr, state->sendbuf + state->sent, towrite);
+ state->sent += towrite;
+
+ return towrite;
+}
+
+static int
+seek_body(void* ptr, curl_off_t offset, int origin)
+{
+ CurlState* state = (CurlState*) ptr;
+ if(origin != SEEK_SET) return -1;
+
+ state->sent = (size_t) offset;
+ return (int) state->sent;
+}
+
+static size_t
+recv_header(void *ptr, size_t size, size_t nmem, void *data)
+{
+ CurlState* state = (CurlState*) data;
+ char code[4];
+ char* header = (char*) ptr;
+ size_t length = size * nmem;
+ size_t index = 0;
+ JSString* hdr = NULL;
+ jsuint hdrlen;
+ jsval hdrval;
+
+ if(length > 7 && strncasecmp(header, "HTTP/1.", 7) == 0)
+ {
+ if(length < 12)
+ {
+ return CURLE_WRITE_ERROR;
+ }
+
+ memcpy(code, header+9, 3*sizeof(char));
+ code[3] = '\0';
+ state->http->last_status = atoi(code);
+
+ state->resp_headers = JS_NewArrayObject(state->cx, 0, NULL);
+ if(!state->resp_headers)
+ {
+ return CURLE_WRITE_ERROR;
+ }
+
+ return length;
+ }
+
+ // We get a notice at the \r\n\r\n after headers.
+ if(length <= 2)
+ {
+ return length;
+ }
+
+ // Append the new header to our array.
+ hdr = dec_string(state->cx, header, length);
+ if(!hdr)
+ {
+ return CURLE_WRITE_ERROR;
+ }
+
+ if(!JS_GetArrayLength(state->cx, state->resp_headers, &hdrlen))
+ {
+ return CURLE_WRITE_ERROR;
+ }
+
+ hdrval = STRING_TO_JSVAL(hdr);
+ if(!JS_SetElement(state->cx, state->resp_headers, hdrlen, &hdrval))
+ {
+ return CURLE_WRITE_ERROR;
+ }
+
+ return length;
+}
+
+static size_t
+recv_body(void *ptr, size_t size, size_t nmem, void *data)
+{
+ CurlState* state = (CurlState*) data;
+ size_t length = size * nmem;
+ char* tmp = NULL;
+
+ if(!state->recvbuf)
+ {
+ state->recvlen = 4096;
+ state->read = 0;
+ state->recvbuf = JS_malloc(state->cx, state->recvlen);
+ }
+
+ if(!state->recvbuf)
+ {
+ return CURLE_WRITE_ERROR;
+ }
+
+ // +1 so we can add '\0' back up in the go function.
+ while(length+1 > state->recvlen - state->read) state->recvlen *= 2;
+ tmp = JS_realloc(state->cx, state->recvbuf, state->recvlen);
+ if(!tmp) return CURLE_WRITE_ERROR;
+ state->recvbuf = tmp;
+
+ memcpy(state->recvbuf + state->read, ptr, length);
+ state->read += length;
+ return length;
+}
+
+JSString*
+str_from_binary(JSContext* cx, char* data, size_t length)
+{
+ jschar* conv = (jschar*) JS_malloc(cx, length * sizeof(jschar));
+ JSString* ret = NULL;
+ size_t i;
+
+ if(!conv) return NULL;
+
+ for(i = 0; i < length; i++)
+ {
+ conv[i] = (jschar) data[i];
+ }
+
+ ret = JS_NewUCString(cx, conv, length);
+ if(!ret) JS_free(cx, conv);
+
+ return ret;
+}
diff --git a/src/couchdb/priv/couch_js/http.h b/src/couchdb/priv/couch_js/http.h
new file mode 100644
index 00000000..b5f8c70f
--- /dev/null
+++ b/src/couchdb/priv/couch_js/http.h
@@ -0,0 +1,18 @@
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#ifndef COUCH_JS_HTTP_H
+#define COUCH_JS_HTTP_H
+
+JSObject* install_http(JSContext* cx, JSObject* global);
+
+#endif \ No newline at end of file
diff --git a/src/couchdb/priv/couch_js/main.c b/src/couchdb/priv/couch_js/main.c
new file mode 100644
index 00000000..376aa15b
--- /dev/null
+++ b/src/couchdb/priv/couch_js/main.c
@@ -0,0 +1,338 @@
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <jsapi.h>
+#include "config.h"
+
+#include "utf8.h"
+#include "http.h"
+
+int gExitCode = 0;
+
+#ifdef JS_THREADSAFE
+#define SETUP_REQUEST(cx) \
+ JS_SetContextThread(cx); \
+ JS_BeginRequest(cx);
+#define FINISH_REQUEST(cx) \
+ JS_EndRequest(cx); \
+ JS_ClearContextThread(cx);
+#else
+#define SETUP_REQUEST(cx)
+#define FINISH_REQUEST(cx)
+#endif
+
+static JSBool
+evalcx(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval)
+{
+ JSString *str;
+ JSObject *sandbox;
+ JSContext *subcx;
+ const jschar *src;
+ size_t srclen;
+ JSBool ret = JS_FALSE;
+ jsval v;
+
+ sandbox = NULL;
+ if(!JS_ConvertArguments(cx, argc, argv, "S / o", &str, &sandbox))
+ {
+ return JS_FALSE;
+ }
+
+ subcx = JS_NewContext(JS_GetRuntime(cx), 8L * 1024L);
+ if(!subcx)
+ {
+ JS_ReportOutOfMemory(cx);
+ return JS_FALSE;
+ }
+
+ SETUP_REQUEST(subcx);
+
+ src = JS_GetStringChars(str);
+ srclen = JS_GetStringLength(str);
+
+ if(!sandbox)
+ {
+ sandbox = JS_NewObject(subcx, NULL, NULL, NULL);
+ if(!sandbox || !JS_InitStandardClasses(subcx, sandbox)) goto done;
+ }
+
+ if(srclen == 0)
+ {
+ *rval = OBJECT_TO_JSVAL(sandbox);
+ }
+ else
+ {
+ JS_EvaluateUCScript(subcx, sandbox, src, srclen, NULL, 0, rval);
+ }
+
+ ret = JS_TRUE;
+
+done:
+ FINISH_REQUEST(subcx);
+ JS_DestroyContext(subcx);
+ return ret;
+}
+
+static JSBool
+gc(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval)
+{
+ JS_GC(cx);
+ return JS_TRUE;
+}
+
+static JSBool
+print(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval)
+{
+ uintN i;
+ char *bytes;
+
+ for(i = 0; i < argc; i++)
+ {
+ bytes = enc_string(cx, argv[i], NULL);
+ if(!bytes) return JS_FALSE;
+
+ fprintf(stdout, "%s%s", i ? " " : "", bytes);
+ JS_free(cx, bytes);
+ }
+
+ fputc('\n', stdout);
+ fflush(stdout);
+ return JS_TRUE;
+}
+
+static JSBool
+quit(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval)
+{
+ JS_ConvertArguments(cx, argc, argv, "/ i", &gExitCode);
+ return JS_FALSE;
+}
+
+static char*
+readfp(JSContext* cx, FILE* fp, size_t* buflen)
+{
+ char* bytes = NULL;
+ char* tmp = NULL;
+ size_t used = 0;
+ size_t byteslen = 256;
+ size_t readlen = 0;
+
+ bytes = JS_malloc(cx, byteslen);
+ if(bytes == NULL) return NULL;
+
+ while((readlen = js_fgets(bytes+used, byteslen-used, stdin)) > 0)
+ {
+ used += readlen;
+
+ if(bytes[used-1] == '\n')
+ {
+ bytes[used-1] = '\0';
+ break;
+ }
+
+ // Double our buffer and read more.
+ byteslen *= 2;
+ tmp = JS_realloc(cx, bytes, byteslen);
+ if(!tmp)
+ {
+ JS_free(cx, bytes);
+ return NULL;
+ }
+ bytes = tmp;
+ }
+
+ *buflen = used;
+ return bytes;
+}
+
+static JSBool
+readline(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+ jschar *chars;
+ JSString *str;
+ char* bytes;
+ char* tmp;
+ size_t byteslen;
+
+ /* GC Occasionally */
+ JS_MaybeGC(cx);
+
+ bytes = readfp(cx, stdin, &byteslen);
+ if(!bytes) return JS_FALSE;
+
+ /* Treat the empty string specially */
+ if(byteslen == 0)
+ {
+ *rval = JS_GetEmptyStringValue(cx);
+ JS_free(cx, bytes);
+ return JS_TRUE;
+ }
+
+ /* Shrink the buffer to the real size */
+ tmp = JS_realloc(cx, bytes, byteslen);
+ if(!tmp)
+ {
+ JS_free(cx, bytes);
+ return JS_FALSE;
+ }
+ bytes = tmp;
+
+ str = dec_string(cx, bytes, byteslen);
+ JS_free(cx, bytes);
+
+ if(!str) return JS_FALSE;
+
+ *rval = STRING_TO_JSVAL(str);
+
+ return JS_TRUE;
+}
+
+static JSBool
+seal(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+ JSObject *target;
+ JSBool deep = JS_FALSE;
+
+ if (!JS_ConvertArguments(cx, argc, argv, "o/b", &target, &deep))
+ return JS_FALSE;
+ if (!target)
+ return JS_TRUE;
+ return JS_SealObject(cx, target, deep);
+}
+
+static void
+execute_script(JSContext *cx, JSObject *obj, const char *filename) {
+ FILE *file;
+ JSScript *script;
+ jsval result;
+
+ if(!filename || strcmp(filename, "-") == 0)
+ {
+ file = stdin;
+ }
+ else
+ {
+ file = fopen(filename, "r");
+ if (!file)
+ {
+ fprintf(stderr, "could not open script file %s\n", filename);
+ gExitCode = 1;
+ return;
+ }
+ }
+
+ script = JS_CompileFileHandle(cx, obj, filename, file);
+ if(script)
+ {
+ JS_ExecuteScript(cx, obj, script, &result);
+ JS_DestroyScript(cx, script);
+ }
+}
+
+static void
+printerror(JSContext *cx, const char *mesg, JSErrorReport *report)
+{
+ if(!report || !JSREPORT_IS_WARNING(report->flags))
+ {
+ fprintf(stderr, "%s\n", mesg);
+ }
+}
+
+static JSFunctionSpec global_functions[] = {
+ {"evalcx", evalcx, 0, 0, 0},
+ {"gc", gc, 0, 0, 0},
+ {"print", print, 0, 0, 0},
+ {"quit", quit, 0, 0, 0},
+ {"readline", readline, 0, 0, 0},
+ {"seal", seal, 0, 0, 0},
+ {0, 0, 0, 0, 0}
+};
+
+static JSClass global_class = {
+ "GlobalClass",
+ JSCLASS_GLOBAL_FLAGS,
+ JS_PropertyStub,
+ JS_PropertyStub,
+ JS_PropertyStub,
+ JS_PropertyStub,
+ JS_EnumerateStub,
+ JS_ResolveStub,
+ JS_ConvertStub,
+ JS_FinalizeStub,
+ JSCLASS_NO_OPTIONAL_MEMBERS
+};
+
+int
+main(int argc, const char * argv[])
+{
+ JSRuntime* rt = NULL;
+ JSContext* cx = NULL;
+ JSObject* global = NULL;
+ JSFunctionSpec* sp = NULL;
+ int i = 0;
+
+ rt = JS_NewRuntime(64L * 1024L * 1024L);
+ if (!rt) return 1;
+
+ cx = JS_NewContext(rt, 8L * 1024L);
+ if (!cx) return 1;
+
+ JS_SetErrorReporter(cx, printerror);
+ JS_ToggleOptions(cx, JSOPTION_XML);
+
+ SETUP_REQUEST(cx);
+
+ global = JS_NewObject(cx, &global_class, NULL, NULL);
+ if (!global) return 1;
+ if (!JS_InitStandardClasses(cx, global)) return 1;
+
+ for(sp = global_functions; sp->name != NULL; sp++)
+ {
+ if(!JS_DefineFunction(cx, global,
+ sp->name, sp->call, sp->nargs, sp->flags))
+ {
+ fprintf(stderr, "Failed to create function: %s\n", sp->name);
+ return 1;
+ }
+ }
+
+ if(!install_http(cx, global))
+ {
+ return 1;
+ }
+
+ JS_SetGlobalObject(cx, global);
+
+ if(argc > 2)
+ {
+ fprintf(stderr, "incorrect number of arguments\n\n");
+ fprintf(stderr, "usage: %s <scriptfile>\n", argv[0]);
+ return 2;
+ }
+
+ if(argc == 0)
+ {
+ execute_script(cx, global, NULL);
+ }
+ else
+ {
+ execute_script(cx, global, argv[1]);
+ }
+
+ FINISH_REQUEST(cx);
+
+ JS_DestroyContext(cx);
+ JS_DestroyRuntime(rt);
+ JS_ShutDown();
+
+ return gExitCode;
+}
diff --git a/src/couchdb/priv/couch_js/utf8.c b/src/couchdb/priv/couch_js/utf8.c
new file mode 100644
index 00000000..699a6fee
--- /dev/null
+++ b/src/couchdb/priv/couch_js/utf8.c
@@ -0,0 +1,286 @@
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include <jsapi.h>
+
+static int
+enc_char(uint8 *utf8Buffer, uint32 ucs4Char)
+{
+ int utf8Length = 1;
+
+ if (ucs4Char < 0x80)
+ {
+ *utf8Buffer = (uint8)ucs4Char;
+ }
+ else
+ {
+ int i;
+ uint32 a = ucs4Char >> 11;
+ utf8Length = 2;
+ while(a)
+ {
+ a >>= 5;
+ utf8Length++;
+ }
+ i = utf8Length;
+ while(--i)
+ {
+ utf8Buffer[i] = (uint8)((ucs4Char & 0x3F) | 0x80);
+ ucs4Char >>= 6;
+ }
+ *utf8Buffer = (uint8)(0x100 - (1 << (8-utf8Length)) + ucs4Char);
+ }
+
+ return utf8Length;
+}
+
+static JSBool
+enc_charbuf(const jschar* src, size_t srclen, char* dst, size_t* dstlenp)
+{
+ size_t i;
+ size_t utf8Len;
+ size_t dstlen = *dstlenp;
+ size_t origDstlen = dstlen;
+ jschar c;
+ jschar c2;
+ uint32 v;
+ uint8 utf8buf[6];
+
+ if(!dst)
+ {
+ dstlen = origDstlen = (size_t) -1;
+ }
+
+ while(srclen)
+ {
+ c = *src++;
+ srclen--;
+
+ if((c >= 0xDC00) && (c <= 0xDFFF)) goto bad_surrogate;
+
+ if(c < 0xD800 || c > 0xDBFF)
+ {
+ v = c;
+ }
+ else
+ {
+ if(srclen < 1) goto buffer_too_small;
+ c2 = *src++;
+ srclen--;
+ if ((c2 < 0xDC00) || (c2 > 0xDFFF))
+ {
+ c = c2;
+ goto bad_surrogate;
+ }
+ v = ((c - 0xD800) << 10) + (c2 - 0xDC00) + 0x10000;
+ }
+ if(v < 0x0080)
+ {
+ /* no encoding necessary - performance hack */
+ if(!dstlen) goto buffer_too_small;
+ if(dst) *dst++ = (char) v;
+ utf8Len = 1;
+ }
+ else
+ {
+ utf8Len = enc_char(utf8buf, v);
+ if(utf8Len > dstlen) goto buffer_too_small;
+ if(dst)
+ {
+ for (i = 0; i < utf8Len; i++)
+ {
+ *dst++ = (char) utf8buf[i];
+ }
+ }
+ }
+ dstlen -= utf8Len;
+ }
+
+ *dstlenp = (origDstlen - dstlen);
+ return JS_TRUE;
+
+bad_surrogate:
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+
+buffer_too_small:
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+}
+
+char*
+enc_string(JSContext* cx, jsval arg, size_t* buflen)
+{
+ JSString* str = NULL;
+ jschar* src = NULL;
+ char* bytes = NULL;
+ size_t srclen = 0;
+ size_t byteslen = 0;
+
+ str = JS_ValueToString(cx, arg);
+ if(!str) goto error;
+
+ src = JS_GetStringChars(str);
+ srclen = JS_GetStringLength(str);
+
+ if(!enc_charbuf(src, srclen, NULL, &byteslen)) goto error;
+
+ bytes = JS_malloc(cx, (byteslen) + 1);
+ bytes[byteslen] = 0;
+
+ if(!enc_charbuf(src, srclen, bytes, &byteslen)) goto error;
+
+ if(buflen) *buflen = byteslen;
+ goto success;
+
+error:
+ if(bytes != NULL) JS_free(cx, bytes);
+ bytes = NULL;
+
+success:
+ return bytes;
+}
+
+static uint32
+dec_char(const uint8 *utf8Buffer, int utf8Length)
+{
+ uint32 ucs4Char;
+ uint32 minucs4Char;
+
+ /* from Unicode 3.1, non-shortest form is illegal */
+ static const uint32 minucs4Table[] = {
+ 0x00000080, 0x00000800, 0x0001000, 0x0020000, 0x0400000
+ };
+
+ if (utf8Length == 1)
+ {
+ ucs4Char = *utf8Buffer;
+ }
+ else
+ {
+ ucs4Char = *utf8Buffer++ & ((1<<(7-utf8Length))-1);
+ minucs4Char = minucs4Table[utf8Length-2];
+ while(--utf8Length)
+ {
+ ucs4Char = ucs4Char<<6 | (*utf8Buffer++ & 0x3F);
+ }
+ if(ucs4Char < minucs4Char || ucs4Char == 0xFFFE || ucs4Char == 0xFFFF)
+ {
+ ucs4Char = 0xFFFD;
+ }
+ }
+
+ return ucs4Char;
+}
+
+static JSBool
+dec_charbuf(const char *src, size_t srclen, jschar *dst, size_t *dstlenp)
+{
+ uint32 v;
+ size_t offset = 0;
+ size_t j;
+ size_t n;
+ size_t dstlen = *dstlenp;
+ size_t origDstlen = dstlen;
+
+ if(!dst) dstlen = origDstlen = (size_t) -1;
+
+ while(srclen)
+ {
+ v = (uint8) *src;
+ n = 1;
+
+ if(v & 0x80)
+ {
+ while(v & (0x80 >> n))
+ {
+ n++;
+ }
+
+ if(n > srclen) goto buffer_too_small;
+ if(n == 1 || n > 6) goto bad_character;
+
+ for(j = 1; j < n; j++)
+ {
+ if((src[j] & 0xC0) != 0x80) goto bad_character;
+ }
+
+ v = dec_char((const uint8 *) src, n);
+ if(v >= 0x10000)
+ {
+ v -= 0x10000;
+
+ if(v > 0xFFFFF || dstlen < 2)
+ {
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+ }
+
+ if(dstlen < 2) goto buffer_too_small;
+
+ if(dst)
+ {
+ *dst++ = (jschar)((v >> 10) + 0xD800);
+ v = (jschar)((v & 0x3FF) + 0xDC00);
+ }
+ dstlen--;
+ }
+ }
+
+ if(!dstlen) goto buffer_too_small;
+ if(dst) *dst++ = (jschar) v;
+
+ dstlen--;
+ offset += n;
+ src += n;
+ srclen -= n;
+ }
+
+ *dstlenp = (origDstlen - dstlen);
+ return JS_TRUE;
+
+bad_character:
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+
+buffer_too_small:
+ *dstlenp = (origDstlen - dstlen);
+ return JS_FALSE;
+}
+
+JSString*
+dec_string(JSContext* cx, const char* bytes, size_t byteslen)
+{
+ JSString* str = NULL;
+ jschar* chars = NULL;
+ size_t charslen;
+
+ if(!dec_charbuf(bytes, byteslen, NULL, &charslen)) goto error;
+
+ chars = JS_malloc(cx, (charslen + 1) * sizeof(jschar));
+ if(!chars) return NULL;
+ chars[charslen] = 0;
+
+ if(!dec_charbuf(bytes, byteslen, chars, &charslen)) goto error;
+
+ str = JS_NewUCString(cx, chars, charslen - 1);
+ if(!str) goto error;
+
+ goto success;
+
+error:
+ if(chars != NULL) JS_free(cx, chars);
+ str = NULL;
+
+success:
+ return str;
+} \ No newline at end of file
diff --git a/src/couchdb/priv/couch_js/utf8.h b/src/couchdb/priv/couch_js/utf8.h
new file mode 100644
index 00000000..00f6b736
--- /dev/null
+++ b/src/couchdb/priv/couch_js/utf8.h
@@ -0,0 +1,19 @@
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#ifndef COUCH_JS_UTF_8_H
+#define COUCH_JS_UTF_8_H
+
+char* enc_string(JSContext* cx, jsval arg, size_t* buflen);
+JSString* dec_string(JSContext* cx, const char* buf, size_t buflen);
+
+#endif \ No newline at end of file
diff --git a/src/couchdb/priv/spawnkillable/couchspawnkillable_win.c b/src/couchdb/priv/spawnkillable/couchspawnkillable_win.c
new file mode 100644
index 00000000..06782315
--- /dev/null
+++ b/src/couchdb/priv/spawnkillable/couchspawnkillable_win.c
@@ -0,0 +1,145 @@
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+// Do what 2 lines of shell script in couchspawnkillable does...
+// * Create a new suspended process with the same (duplicated) standard
+// handles as us.
+// * Write a line to stdout, consisting of the path to ourselves, plus
+// '--kill {pid}' where {pid} is the PID of the newly created process.
+// * Un-suspend the new process.
+// * Wait for the process to terminate.
+// * Terminate with the child's exit-code.
+
+// Later, couch will call us with --kill and the PID, so we dutifully
+// terminate the specified PID.
+
+#include <stdlib.h>
+#include "windows.h"
+
+char *get_child_cmdline(int argc, char **argv)
+{
+ // make a new command-line, but skipping me.
+ // XXX - todo - spaces etc in args???
+ int i;
+ char *p, *cmdline;
+ int nchars = 0;
+ int nthis = 1;
+ for (i=1;i<argc;i++)
+ nchars += strlen(argv[i])+1;
+ cmdline = p = malloc(nchars+1);
+ if (!cmdline)
+ return NULL;
+ for (i=1;i<argc;i++) {
+ nthis = strlen(argv[i]);
+ strncpy(p, argv[i], nthis);
+ p[nthis] = ' ';
+ p += nthis+1;
+ }
+ // Replace the last space we added above with a '\0'
+ cmdline[nchars-1] = '\0';
+ return cmdline;
+}
+
+// create the child process, returning 0, or the exit-code we will
+// terminate with.
+int create_child(int argc, char **argv, PROCESS_INFORMATION *pi)
+{
+ char buf[1024];
+ DWORD dwcreate;
+ STARTUPINFO si;
+ char *cmdline;
+ if (argc < 2)
+ return 1;
+ cmdline = get_child_cmdline(argc, argv);
+ if (!cmdline)
+ return 2;
+
+ memset(&si, 0, sizeof(si));
+ si.cb = sizeof(si);
+ // depending on how *our* parent is started, we may or may not have
+ // a valid stderr stream - so although we try and duplicate it, only
+ // failing to duplicate stdin and stdout are considered fatal.
+ if (!DuplicateHandle(GetCurrentProcess(),
+ GetStdHandle(STD_INPUT_HANDLE),
+ GetCurrentProcess(),
+ &si.hStdInput,
+ 0,
+ TRUE, // inheritable
+ DUPLICATE_SAME_ACCESS) ||
+ !DuplicateHandle(GetCurrentProcess(),
+ GetStdHandle(STD_OUTPUT_HANDLE),
+ GetCurrentProcess(),
+ &si.hStdOutput,
+ 0,
+ TRUE, // inheritable
+ DUPLICATE_SAME_ACCESS)) {
+ return 3;
+ }
+ DuplicateHandle(GetCurrentProcess(),
+ GetStdHandle(STD_ERROR_HANDLE),
+ GetCurrentProcess(),
+ &si.hStdError,
+ 0,
+ TRUE, // inheritable
+ DUPLICATE_SAME_ACCESS);
+
+ si.dwFlags = STARTF_USESTDHANDLES;
+ dwcreate = CREATE_SUSPENDED;
+ if (!CreateProcess( NULL, cmdline,
+ NULL,
+ NULL,
+ TRUE, // inherit handles
+ dwcreate,
+ NULL, // environ
+ NULL, // cwd
+ &si,
+ pi))
+ return 4;
+ return 0;
+}
+
+// and here we go...
+int main(int argc, char **argv)
+{
+ char out_buf[1024];
+ int rc;
+ DWORD cbwritten;
+ DWORD exitcode;
+ PROCESS_INFORMATION pi;
+ if (argc==3 && strcmp(argv[1], "--kill")==0) {
+ HANDLE h = OpenProcess(PROCESS_TERMINATE, 0, atoi(argv[2]));
+ if (!h)
+ return 1;
+ if (!TerminateProcess(h, 0))
+ return 2;
+ CloseHandle(h);
+ return 0;
+ }
+ // spawn the new suspended process
+ rc = create_child(argc, argv, &pi);
+ if (rc)
+ return rc;
+ // Write the 'terminate' command, which includes this PID, back to couch.
+ // *sob* - what about spaces etc?
+ sprintf_s(out_buf, sizeof(out_buf), "%s --kill %d\n",
+ argv[0], pi.dwProcessId);
+ WriteFile(GetStdHandle(STD_OUTPUT_HANDLE), out_buf, strlen(out_buf),
+ &cbwritten, NULL);
+ // Let the child process go...
+ ResumeThread(pi.hThread);
+ // Wait for the process to terminate so we can reflect the exit code
+ // back to couch.
+ WaitForSingleObject(pi.hProcess, INFINITE);
+ if (!GetExitCodeProcess(pi.hProcess, &exitcode))
+ return 6;
+ return exitcode;
+}
diff --git a/src/mem3.erl b/src/mem3.erl
deleted file mode 100644
index 1485c7fe..00000000
--- a/src/mem3.erl
+++ /dev/null
@@ -1,103 +0,0 @@
--module(mem3).
-
--export([start/0, stop/0, restart/0, nodes/0, shards/1, shards/2,
- choose_shards/2]).
--export([compare_nodelists/0, compare_shards/1]).
-
--include("mem3.hrl").
-
-start() ->
- application:start(mem3).
-
-stop() ->
- application:stop(mem3).
-
-restart() ->
- stop(),
- start().
-
-%% @doc Detailed report of cluster-wide membership state. Queries the state
-%% on all member nodes and builds a dictionary with unique states as the
-%% key and the nodes holding that state as the value. Also reports member
-%% nodes which fail to respond and nodes which are connected but are not
-%% cluster members. Useful for debugging.
--spec compare_nodelists() -> [{{cluster_nodes, [node()]} | bad_nodes
- | non_member_nodes, [node()]}].
-compare_nodelists() ->
- Nodes = mem3:nodes(),
- AllNodes = erlang:nodes([this, visible]),
- {Replies, BadNodes} = gen_server:multi_call(Nodes, mem3_nodes, get_nodelist),
- Dict = lists:foldl(fun({Node, Nodelist}, D) ->
- orddict:append({cluster_nodes, Nodelist}, Node, D)
- end, orddict:new(), Replies),
- [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict].
-
--spec compare_shards(DbName::iodata()) -> [{bad_nodes | [#shard{}], [node()]}].
-compare_shards(DbName) when is_list(DbName) ->
- compare_shards(list_to_binary(DbName));
-compare_shards(DbName) ->
- Nodes = mem3:nodes(),
- {Replies, BadNodes} = rpc:multicall(mem3, shards, [DbName]),
- GoodNodes = [N || N <- Nodes, not lists:member(N, BadNodes)],
- Dict = lists:foldl(fun({Shards, Node}, D) ->
- orddict:append(Shards, Node, D)
- end, orddict:new(), lists:zip(Replies, GoodNodes)),
- [{bad_nodes, BadNodes} | Dict].
-
--spec nodes() -> [node()].
-nodes() ->
- mem3_nodes:get_nodelist().
-
--spec shards(DbName::iodata()) -> [#shard{}].
-shards(DbName) when is_list(DbName) ->
- shards(list_to_binary(DbName));
-shards(DbName) ->
- try ets:lookup(partitions, DbName) of
- [] ->
- mem3_util:load_shards_from_disk(DbName);
- Else ->
- Else
- catch error:badarg ->
- mem3_util:load_shards_from_disk(DbName)
- end.
-
--spec shards(DbName::iodata(), DocId::binary()) -> [#shard{}].
-shards(DbName, DocId) when is_list(DbName) ->
- shards(list_to_binary(DbName), DocId);
-shards(DbName, DocId) when is_list(DocId) ->
- shards(DbName, list_to_binary(DocId));
-shards(DbName, DocId) ->
- HashKey = mem3_util:hash(DocId),
- Head = #shard{
- name = '_',
- node = '_',
- dbname = DbName,
- range = ['$1','$2'],
- ref = '_'
- },
- Conditions = [{'<', '$1', HashKey}, {'=<', HashKey, '$2'}],
- try ets:select(partitions, [{Head, Conditions, ['$_']}]) of
- [] ->
- mem3_util:load_shards_from_disk(DbName, DocId);
- Shards ->
- Shards
- catch error:badarg ->
- mem3_util:load_shards_from_disk(DbName, DocId)
- end.
-
--spec choose_shards(DbName::iodata(), Options::list()) -> [#shard{}].
-choose_shards(DbName, Options) when is_list(DbName) ->
- choose_shards(list_to_binary(DbName), Options);
-choose_shards(DbName, Options) ->
- try shards(DbName)
- catch error:E when E==database_does_not_exist; E==badarg ->
- Nodes = mem3:nodes(),
- NodeCount = length(Nodes),
- N = mem3_util:n_val(couch_util:get_value(n, Options), NodeCount),
- Q = mem3_util:to_integer(couch_util:get_value(q, Options,
- couch_config:get("cluster", "q", "8"))),
- % rotate to a random entry in the nodelist for even distribution
- {A, B} = lists:split(crypto:rand_uniform(1,length(Nodes)+1), Nodes),
- RotatedNodes = B ++ A,
- mem3_util:create_partition_map(DbName, N, Q, RotatedNodes)
- end.
diff --git a/src/mem3_app.erl b/src/mem3_app.erl
deleted file mode 100644
index 88cd1ea1..00000000
--- a/src/mem3_app.erl
+++ /dev/null
@@ -1,9 +0,0 @@
--module(mem3_app).
--behaviour(application).
--export([start/2, stop/1]).
-
-start(_Type, []) ->
- mem3_sup:start_link().
-
-stop([]) ->
- ok.
diff --git a/src/mem3_cache.erl b/src/mem3_cache.erl
deleted file mode 100644
index 2a29ca4c..00000000
--- a/src/mem3_cache.erl
+++ /dev/null
@@ -1,92 +0,0 @@
--module(mem3_cache).
--behaviour(gen_server).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
-
--export([start_link/0]).
-
--record(state, {changes_pid}).
-
--include("mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-init([]) ->
- ets:new(partitions, [bag, public, named_table, {keypos,#shard.dbname}]),
- {Pid, _} = spawn_monitor(fun() -> listen_for_changes(0) end),
- {ok, #state{changes_pid = Pid}}.
-
-handle_call(_Call, _From, State) ->
- {noreply, State}.
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info({'DOWN', _, _, Pid, {badarg, [{ets,delete,[partitions,_]}|_]}},
- #state{changes_pid=Pid} = State) ->
- % fatal error, somebody deleted our ets table
- {stop, ets_table_error, State};
-handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) ->
- ?LOG_INFO("~p changes listener died ~p", [?MODULE, Reason]),
- Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> 0 end,
- timer:send_after(5000, {start_listener, Seq}),
- {noreply, State};
-handle_info({start_listener, Seq}, State) ->
- {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
- {noreply, State#state{changes_pid=NewPid}};
-handle_info(_Msg, State) ->
- {noreply, State}.
-
-terminate(_Reason, #state{changes_pid=Pid}) ->
- exit(Pid, kill),
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%% internal functions
-
-listen_for_changes(Since) ->
- DbName = ?l2b(couch_config:get("mem3", "db", "dbs")),
- {ok, Db} = ensure_exists(DbName),
- Args = #changes_args{
- feed = "continuous",
- since = Since,
- heartbeat = true,
- include_docs = true
- },
- ChangesFun = couch_changes:handle_changes(Args, nil, Db),
- ChangesFun(fun changes_callback/2).
-
-ensure_exists(DbName) ->
- Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}],
- case couch_db:open(DbName, Options) of
- {ok, Db} ->
- {ok, Db};
- _ ->
- couch_server:create(DbName, Options)
- end.
-
-changes_callback(start, _) ->
- {ok, nil};
-changes_callback({stop, EndSeq}, _) ->
- exit({seq, EndSeq});
-changes_callback({change, {Change}, _}, _) ->
- DbName = couch_util:get_value(<<"id">>, Change),
- case couch_util:get_value(deleted, Change, false) of
- true ->
- ets:delete(partitions, DbName);
- false ->
- case couch_util:get_value(doc, Change) of
- {error, Reason} ->
- ?LOG_ERROR("missing partition table for ~s: ~p", [DbName, Reason]);
- {Doc} ->
- ets:delete(partitions, DbName),
- ets:insert(partitions, mem3_util:build_shards(DbName, Doc))
- end
- end,
- {ok, couch_util:get_value(<<"seq">>, Change)};
-changes_callback(timeout, _) ->
- {ok, nil}.
diff --git a/src/mem3_httpd.erl b/src/mem3_httpd.erl
deleted file mode 100644
index cbfaea95..00000000
--- a/src/mem3_httpd.erl
+++ /dev/null
@@ -1,39 +0,0 @@
--module(mem3_httpd).
-
--export([handle_membership_req/1]).
-
-%% includes
--include("mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-
-handle_membership_req(#httpd{method='GET',
- path_parts=[<<"_membership">>]} = Req) ->
- ClusterNodes = try mem3:nodes()
- catch _:_ -> {ok,[]} end,
- couch_httpd:send_json(Req, {[
- {all_nodes, lists:sort([node()|nodes()])},
- {cluster_nodes, lists:sort(ClusterNodes)}
- ]});
-handle_membership_req(#httpd{method='GET',
- path_parts=[<<"_membership">>, <<"parts">>, DbName]} = Req) ->
- ClusterNodes = try mem3:nodes()
- catch _:_ -> {ok,[]} end,
- Shards = mem3:shards(DbName),
- JsonShards = json_shards(Shards, dict:new()),
- couch_httpd:send_json(Req, {[
- {all_nodes, lists:sort([node()|nodes()])},
- {cluster_nodes, lists:sort(ClusterNodes)},
- {partitions, JsonShards}
- ]}).
-
-%%
-%% internal
-%%
-
-json_shards([], AccIn) ->
- List = dict:to_list(AccIn),
- {lists:sort(List)};
-json_shards([#shard{node=Node, range=[B,_E]} | Rest], AccIn) ->
- HexBeg = couch_util:to_hex(<<B:32/integer>>),
- json_shards(Rest, dict:append(HexBeg, Node, AccIn)).
diff --git a/src/mem3_nodes.erl b/src/mem3_nodes.erl
deleted file mode 100644
index 6cbf3d9a..00000000
--- a/src/mem3_nodes.erl
+++ /dev/null
@@ -1,120 +0,0 @@
--module(mem3_nodes).
--behaviour(gen_server).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
-
--export([start_link/0, get_nodelist/0]).
-
--include("mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
--record(state, {changes_pid, update_seq, nodes}).
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-get_nodelist() ->
- gen_server:call(?MODULE, get_nodelist).
-
-init([]) ->
- {Nodes, UpdateSeq} = initialize_nodelist(),
- {Pid, _} = spawn_monitor(fun() -> listen_for_changes(UpdateSeq) end),
- {ok, #state{changes_pid = Pid, update_seq = UpdateSeq, nodes = Nodes}}.
-
-handle_call(get_nodelist, _From, State) ->
- {reply, State#state.nodes, State};
-handle_call({add_node, Node}, _From, #state{nodes=Nodes} = State) ->
- gen_event:notify(mem3_events, {add_node, Node}),
- {reply, ok, State#state{nodes = lists:umerge([Node], Nodes)}};
-handle_call({remove_node, Node}, _From, #state{nodes=Nodes} = State) ->
- gen_event:notify(mem3_events, {remove_node, Node}),
- {reply, ok, State#state{nodes = lists:delete(Node, Nodes)}};
-handle_call(_Call, _From, State) ->
- {noreply, State}.
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) ->
- ?LOG_INFO("~p changes listener died ~p", [?MODULE, Reason]),
- StartSeq = State#state.update_seq,
- Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> StartSeq end,
- timer:send_after(5000, start_listener),
- {noreply, State#state{update_seq = Seq}};
-handle_info(start_listener, #state{update_seq = Seq} = State) ->
- {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
- {noreply, State#state{changes_pid=NewPid}};
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%% internal functions
-
-initialize_nodelist() ->
- DbName = couch_config:get("mem3", "nodedb", "nodes"),
- {ok, Db} = ensure_exists(DbName),
- {ok, _, Nodes0} = couch_btree:fold(Db#db.id_tree, fun first_fold/3, [], []),
- % add self if not already present
- case lists:member(node(), Nodes0) of
- true ->
- Nodes = Nodes0;
- false ->
- Doc = #doc{id = couch_util:to_binary(node())},
- {ok, _} = couch_db:update_doc(Db, Doc, []),
- Nodes = [node() | Nodes0]
- end,
- couch_db:close(Db),
- {lists:sort(Nodes), Db#db.update_seq}.
-
-first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) ->
- {ok, Acc};
-first_fold(#full_doc_info{deleted=true}, _, Acc) ->
- {ok, Acc};
-first_fold(#full_doc_info{id=Id}, _, Acc) ->
- {ok, [mem3_util:to_atom(Id) | Acc]}.
-
-listen_for_changes(Since) ->
- DbName = ?l2b(couch_config:get("mem3", "nodedb", "nodes")),
- {ok, Db} = ensure_exists(DbName),
- Args = #changes_args{
- feed = "continuous",
- since = Since,
- heartbeat = true,
- include_docs = true
- },
- ChangesFun = couch_changes:handle_changes(Args, nil, Db),
- ChangesFun(fun changes_callback/2).
-
-ensure_exists(DbName) when is_list(DbName) ->
- ensure_exists(list_to_binary(DbName));
-ensure_exists(DbName) ->
- Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}],
- case couch_db:open(DbName, Options) of
- {ok, Db} ->
- {ok, Db};
- _ ->
- couch_server:create(DbName, Options)
- end.
-
-changes_callback(start, _) ->
- {ok, nil};
-changes_callback({stop, EndSeq}, _) ->
- exit({seq, EndSeq});
-changes_callback({change, {Change}, _}, _) ->
- Node = couch_util:get_value(<<"id">>, Change),
- case Node of <<"_design/", _/binary>> -> ok; _ ->
- case couch_util:get_value(deleted, Change, false) of
- false ->
- gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node)});
- true ->
- gen_server:call(?MODULE, {remove_node, mem3_util:to_atom(Node)})
- end
- end,
- {ok, couch_util:get_value(<<"seq">>, Change)};
-changes_callback(timeout, _) ->
- {ok, nil}.
diff --git a/src/mem3_sup.erl b/src/mem3_sup.erl
deleted file mode 100644
index 58d0bbf5..00000000
--- a/src/mem3_sup.erl
+++ /dev/null
@@ -1,21 +0,0 @@
--module(mem3_sup).
--behaviour(supervisor).
--export([start_link/0, init/1]).
-
-start_link() ->
- supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-init(_Args) ->
- Children = [
- child(mem3_events),
- child(mem3_sync),
- child(mem3_cache),
- child(mem3_nodes)
- ],
- {ok, {{one_for_one,10,1}, Children}}.
-
-child(mem3_events) ->
- MFA = {gen_event, start_link, [{local, mem3_events}]},
- {mem3_events, MFA, permanent, 1000, worker, dynamic};
-child(Child) ->
- {Child, {Child, start_link, []}, permanent, 1000, worker, [Child]}.
diff --git a/src/mem3_sync.erl b/src/mem3_sync.erl
deleted file mode 100644
index d3b3ea51..00000000
--- a/src/mem3_sync.erl
+++ /dev/null
@@ -1,215 +0,0 @@
--module(mem3_sync).
--behaviour(gen_server).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
-
--export([start_link/0, get_active/0, get_queue/0, push/2, remove_node/1]).
-
--include("mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
--record(state, {
- active = [],
- count = 0,
- limit,
- dict = dict:new(),
- waiting = [],
- update_notifier
-}).
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-get_active() ->
- gen_server:call(?MODULE, get_active).
-
-get_queue() ->
- gen_server:call(?MODULE, get_queue).
-
-push(Db, Node) ->
- gen_server:cast(?MODULE, {push, Db, Node}).
-
-remove_node(Node) ->
- gen_server:cast(?MODULE, {remove_node, Node}).
-
-init([]) ->
- process_flag(trap_exit, true),
- Concurrency = couch_config:get("mem3", "sync_concurrency", "10"),
- gen_event:add_handler(mem3_events, mem3_sync_event, []),
- {ok, Pid} = start_update_notifier(),
- spawn(fun initial_sync/0),
- {ok, #state{limit = list_to_integer(Concurrency), update_notifier=Pid}}.
-
-handle_call(get_active, _From, State) ->
- {reply, State#state.active, State};
-
-handle_call(get_queue, _From, State) ->
- {reply, State#state.waiting, State}.
-
-handle_cast({push, DbName, Node}, #state{count=Count, limit=Limit} = State)
- when Count >= Limit ->
- {noreply, add_to_queue(State, DbName, Node)};
-
-handle_cast({push, DbName, Node}, State) ->
- #state{active = L, count = C} = State,
- case is_running(DbName, Node, L) of
- true ->
- {noreply, add_to_queue(State, DbName, Node)};
- false ->
- Pid = start_push_replication(DbName, Node),
- {noreply, State#state{active=[{DbName, Node, Pid}|L], count=C+1}}
- end;
-
-handle_cast({remove_node, Node}, State) ->
- Waiting = [{S,N} || {S,N} <- State#state.waiting, N =/= Node],
- Dict = lists:foldl(fun(DbName,D) -> dict:erase({DbName,Node}, D) end,
- State#state.dict, [S || {S,N} <- Waiting, N =:= Node]),
- {noreply, State#state{dict = Dict, waiting = Waiting}}.
-
-handle_info({'EXIT', Pid, _}, #state{update_notifier=Pid} = State) ->
- {ok, NewPid} = start_update_notifier(),
- {noreply, State#state{update_notifier=NewPid}};
-
-handle_info({'EXIT', Active, normal}, State) ->
- handle_replication_exit(State, Active);
-
-handle_info({'EXIT', Active, Reason}, State) ->
- case lists:keyfind(Active, 3, State#state.active) of
- {OldDbName, OldNode, _} ->
- ?LOG_ERROR("~p replication ~s -> ~p died:~n~p", [?MODULE, OldDbName,
- OldNode, Reason]),
- timer:apply_after(5000, ?MODULE, push, [OldDbName, OldNode]);
- false -> ok end,
- handle_replication_exit(State, Active);
-
-handle_info(Msg, State) ->
- ?LOG_ERROR("unexpected msg at replication manager ~p", [Msg]),
- {noreply, State}.
-
-terminate(_Reason, State) ->
- [exit(Pid, shutdown) || {_,_,Pid} <- State#state.active],
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-handle_replication_exit(#state{waiting=[]} = State, Pid) ->
- NewActive = lists:keydelete(Pid, 3, State#state.active),
- {noreply, State#state{active=NewActive, count=length(NewActive)}};
-handle_replication_exit(State, Pid) ->
- #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State,
- Active1 = lists:keydelete(Pid, 3, Active),
- Count = length(Active1),
- NewState = if Count < Limit ->
- case next_replication(Active1, Waiting) of
- nil -> % all waiting replications are also active
- State#state{active = Active1, count = Count};
- {DbName, Node, StillWaiting} ->
- NewPid = start_push_replication(DbName, Node),
- State#state{
- active = [{DbName, Node, NewPid} | Active1],
- count = Count+1,
- dict = dict:erase({DbName,Node}, D),
- waiting = StillWaiting
- }
- end;
- true ->
- State#state{active = Active1, count=Count}
- end,
- {noreply, NewState}.
-
-start_push_replication(DbName, Node) ->
- PostBody = {[
- {<<"source">>, DbName},
- {<<"target">>, {[{<<"node">>, Node}, {<<"name">>, DbName}]}},
- {<<"continuous">>, false},
- {<<"async">>, true}
- ]},
- ?LOG_INFO("starting ~s -> ~p internal replication", [DbName, Node]),
- UserCtx = #user_ctx{name = <<"replicator">>, roles = [<<"_admin">>]},
- case (catch couch_rep:replicate(PostBody, UserCtx)) of
- Pid when is_pid(Pid) ->
- link(Pid),
- Pid;
- {db_not_found, _Msg} ->
- case couch_db:open(DbName, []) of
- {ok, Db} ->
- % source exists, let's (re)create the target
- couch_db:close(Db),
- case rpc:call(Node, couch_api, create_db, [DbName, []]) of
- {ok, Target} ->
- ?LOG_INFO("~p successfully created ~s on ~p", [?MODULE, DbName,
- Node]),
- couch_db:close(Target),
- start_push_replication(DbName, Node);
- file_exists ->
- start_push_replication(DbName, Node);
- Error ->
- ?LOG_ERROR("~p couldn't create ~s on ~p because ~p",
- [?MODULE, DbName, Node, Error]),
- exit(shutdown)
- end;
- {not_found, no_db_file} ->
- % source is gone, so this is a hack to skip it
- ?LOG_INFO("~p tried to push ~s to ~p but it was already deleted",
- [?MODULE, DbName, Node]),
- spawn_link(fun() -> ok end)
- end;
- {node_not_connected, _} ->
- % we'll get this one when the node rejoins
- ?LOG_ERROR("~p exiting because ~p is not connected", [?MODULE, Node]),
- spawn_link(fun() -> ok end);
- CatchAll ->
- ?LOG_INFO("~p strange error ~p", [?MODULE, CatchAll]),
- case lists:member(Node, nodes()) of
- true ->
- timer:apply_after(5000, ?MODULE, push, [DbName, Node]);
- false ->
- ok
- end,
- spawn_link(fun() -> ok end)
- end.
-
-add_to_queue(State, DbName, Node) ->
- #state{dict=D, waiting=Waiting} = State,
- case dict:is_key({DbName, Node}, D) of
- true ->
- State;
- false ->
- ?LOG_DEBUG("adding ~s -> ~p to internal queue", [DbName, Node]),
- State#state{
- dict = dict:store({DbName,Node}, ok, D),
- waiting = Waiting ++ [{DbName,Node}]
- }
- end.
-
-initial_sync() ->
- Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
- Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
- Nodes = mem3:nodes(),
- Live = nodes(),
- [[push(Db, N) || Db <- [Db1,Db2]] || N <- Nodes, lists:member(N, Live)].
-
-start_update_notifier() ->
- Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
- Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
- couch_db_update_notifier:start_link(fun
- ({updated, Db}) when Db == Db1; Db == Db2 ->
- Nodes = mem3:nodes(),
- Live = nodes(),
- [?MODULE:push(Db, N) || N <- Nodes, lists:member(N, Live)];
- (_) -> ok end).
-
-%% @doc Finds the next {DbName,Node} pair in the list of waiting replications
-%% which does not correspond to an already running replication
--spec next_replication(list(), list()) -> {binary(),node(),list()} | nil.
-next_replication(Active, Waiting) ->
- case lists:splitwith(fun({S,N}) -> is_running(S,N,Active) end, Waiting) of
- {_, []} ->
- nil;
- {Running, [{DbName,Node}|Rest]} ->
- {DbName, Node, Running ++ Rest}
- end.
-
-is_running(DbName, Node, ActiveList) ->
- [] =/= [true || {S,N,_} <- ActiveList, S=:=DbName, N=:=Node].
diff --git a/src/mem3_sync_event.erl b/src/mem3_sync_event.erl
deleted file mode 100644
index 45fcb8aa..00000000
--- a/src/mem3_sync_event.erl
+++ /dev/null
@@ -1,44 +0,0 @@
--module(mem3_sync_event).
--behaviour(gen_event).
-
--export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
- code_change/3]).
-
-init(_) ->
- {ok, nil}.
-
-handle_event({add_node, Node}, State) ->
- Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
- Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
- [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]],
- {ok, State};
-
-handle_event({nodeup, Node}, State) ->
- case lists:member(Node, mem3:nodes()) of
- true ->
- Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
- Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
- [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]];
- false ->
- ok
- end,
- {ok, State};
-
-handle_event({Down, Node}, State) when Down == nodedown; Down == remove_node ->
- mem3_sync:remove_node(Node),
- {ok, State};
-
-handle_event(_Event, State) ->
- {ok, State}.
-
-handle_call(_Request, State) ->
- {ok, ok, State}.
-
-handle_info(_Info, State) ->
- {ok, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
diff --git a/src/mem3_util.erl b/src/mem3_util.erl
deleted file mode 100644
index 2ed84db6..00000000
--- a/src/mem3_util.erl
+++ /dev/null
@@ -1,139 +0,0 @@
--module(mem3_util).
-
--export([hash/1, name_shard/1, create_partition_map/4, build_shards/2,
- n_val/2, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
- load_shards_from_disk/1, load_shards_from_disk/2]).
-
--define(RINGTOP, 2 bsl 31). % CRC32 space
-
--include("mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-hash(Item) when is_binary(Item) ->
- erlang:crc32(Item);
-hash(Item) ->
- erlang:crc32(term_to_binary(Item)).
-
-name_shard(#shard{dbname = DbName, range=[B,E]} = Shard) ->
- Name = ["shards/", couch_util:to_hex(<<B:32/integer>>), "-",
- couch_util:to_hex(<<E:32/integer>>), "/", DbName],
- Shard#shard{name = ?l2b(Name)}.
-
-create_partition_map(DbName, N, Q, Nodes) ->
- UniqueShards = make_key_ranges((?RINGTOP) div Q, 0, []),
- Shards0 = lists:flatten([lists:duplicate(N, S) || S <- UniqueShards]),
- Shards1 = attach_nodes(Shards0, [], Nodes, []),
- [name_shard(S#shard{dbname=DbName}) || S <- Shards1].
-
-make_key_ranges(_, CurrentPos, Acc) when CurrentPos >= ?RINGTOP ->
- Acc;
-make_key_ranges(Increment, Start, Acc) ->
- case Start + 2*Increment of
- X when X > ?RINGTOP ->
- End = ?RINGTOP - 1;
- _ ->
- End = Start + Increment - 1
- end,
- make_key_ranges(Increment, End+1, [#shard{range=[Start, End]} | Acc]).
-
-attach_nodes([], Acc, _, _) ->
- lists:reverse(Acc);
-attach_nodes(Shards, Acc, [], UsedNodes) ->
- attach_nodes(Shards, Acc, lists:reverse(UsedNodes), []);
-attach_nodes([S | Rest], Acc, [Node | Nodes], UsedNodes) ->
- attach_nodes(Rest, [S#shard{node=Node} | Acc], Nodes, [Node | UsedNodes]).
-
-write_db_doc(Doc) ->
- {ok, Db} = couch_db:open(<<"dbs">>, []),
- try
- update_db_doc(Db, Doc)
- catch conflict ->
- ?LOG_ERROR("conflict writing db doc, must be a race", [])
- after
- couch_db:close(Db)
- end.
-
-update_db_doc(Db, #doc{id=Id, body=Body} = Doc) ->
- case couch_db:open_doc(Db, Id, []) of
- {not_found, _} ->
- {ok, _} = couch_db:update_doc(Db, Doc, []);
- {ok, #doc{body=Body}} ->
- ok;
- {ok, OldDoc} ->
- {ok, _} = couch_db:update_doc(Db, OldDoc#doc{body=Body}, [])
- end.
-
-delete_db_doc(DocId) ->
- {ok, Db} = couch_db:open(<<"dbs">>, []),
- try
- delete_db_doc(Db, DocId)
- catch conflict ->
- ok
- after
- couch_db:close(Db)
- end.
-
-delete_db_doc(Db, DocId) ->
- case couch_db:open_doc(Db, DocId, []) of
- {not_found, _} ->
- ok;
- {ok, OldDoc} ->
- {ok, _} = couch_db:update_doc(Db, OldDoc#doc{deleted=true}, [])
- end.
-
-build_shards(DbName, DocProps) ->
- {ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}),
- lists:flatmap(fun({Node, Ranges}) ->
- lists:map(fun(Range) ->
- [B,E] = string:tokens(?b2l(Range), "-"),
- Beg = httpd_util:hexlist_to_integer(B),
- End = httpd_util:hexlist_to_integer(E),
- name_shard(#shard{
- dbname = DbName,
- node = to_atom(Node),
- range = [Beg, End]
- })
- end, Ranges)
- end, ByNode).
-
-to_atom(Node) when is_binary(Node) ->
- list_to_atom(binary_to_list(Node));
-to_atom(Node) when is_atom(Node) ->
- Node.
-
-to_integer(N) when is_integer(N) ->
- N;
-to_integer(N) when is_binary(N) ->
- list_to_integer(binary_to_list(N));
-to_integer(N) when is_list(N) ->
- list_to_integer(N).
-
-n_val(undefined, NodeCount) ->
- n_val(couch_config:get("cluster", "n", "3"), NodeCount);
-n_val(N, NodeCount) when is_list(N) ->
- n_val(list_to_integer(N), NodeCount);
-n_val(N, NodeCount) when is_integer(NodeCount), N > NodeCount ->
- ?LOG_ERROR("Request to create N=~p DB but only ~p node(s)", [N, NodeCount]),
- NodeCount;
-n_val(N, _) when N < 1 ->
- 1;
-n_val(N, _) ->
- N.
-
-load_shards_from_disk(DbName) when is_binary(DbName) ->
- {ok, Db} = couch_db:open(<<"dbs">>, []),
- try load_shards_from_db(Db, DbName) after couch_db:close(Db) end.
-
-load_shards_from_db(#db{} = ShardDb, DbName) ->
- case couch_db:open_doc(ShardDb, DbName, []) of
- {ok, #doc{body = {Props}}} ->
- ?LOG_INFO("dbs cache miss for ~s", [DbName]),
- build_shards(DbName, Props);
- {not_found, _} ->
- erlang:error(database_does_not_exist)
- end.
-
-load_shards_from_disk(DbName, DocId)->
- Shards = load_shards_from_disk(DbName),
- HashKey = hash(DocId),
- [S || #shard{range = [B,E]} = S <- Shards, B < HashKey, HashKey =< E].