diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/couchdb/priv/couch_js/http.c | 675 | ||||
-rw-r--r-- | src/couchdb/priv/couch_js/http.h | 18 | ||||
-rw-r--r-- | src/couchdb/priv/couch_js/main.c | 338 | ||||
-rw-r--r-- | src/couchdb/priv/couch_js/utf8.c | 286 | ||||
-rw-r--r-- | src/couchdb/priv/couch_js/utf8.h | 19 | ||||
-rw-r--r-- | src/couchdb/priv/spawnkillable/couchspawnkillable_win.c | 145 | ||||
-rw-r--r-- | src/mem3.erl | 103 | ||||
-rw-r--r-- | src/mem3_app.erl | 9 | ||||
-rw-r--r-- | src/mem3_cache.erl | 92 | ||||
-rw-r--r-- | src/mem3_httpd.erl | 39 | ||||
-rw-r--r-- | src/mem3_nodes.erl | 120 | ||||
-rw-r--r-- | src/mem3_sup.erl | 21 | ||||
-rw-r--r-- | src/mem3_sync.erl | 215 | ||||
-rw-r--r-- | src/mem3_sync_event.erl | 44 | ||||
-rw-r--r-- | src/mem3_util.erl | 139 |
15 files changed, 1481 insertions, 782 deletions
diff --git a/src/couchdb/priv/couch_js/http.c b/src/couchdb/priv/couch_js/http.c new file mode 100644 index 00000000..6c2a8a82 --- /dev/null +++ b/src/couchdb/priv/couch_js/http.c @@ -0,0 +1,675 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <jsapi.h> +#include <curl/curl.h> + +#include "utf8.h" + +#ifdef XP_WIN +// Map some of the string function names to things which exist on Windows +#define strcasecmp _strcmpi +#define strncasecmp _strnicmp +#define snprintf _snprintf +#endif + +typedef struct curl_slist CurlHeaders; + +typedef struct { + int method; + char* url; + CurlHeaders* req_headers; + jsint last_status; +} HTTPData; + +char* METHODS[] = {"GET", "HEAD", "POST", "PUT", "DELETE", "COPY", NULL}; + +#define GET 0 +#define HEAD 1 +#define POST 2 +#define PUT 3 +#define DELETE 4 +#define COPY 5 + +static JSBool +go(JSContext* cx, JSObject* obj, HTTPData* http, char* body, size_t blen); + +static JSString* +str_from_binary(JSContext* cx, char* data, size_t length); + +static JSBool +constructor(JSContext* cx, JSObject* obj, uintN argc, jsval* argv, jsval* rval) +{ + HTTPData* http = NULL; + JSBool ret = JS_FALSE; + + http = (HTTPData*) malloc(sizeof(HTTPData)); + if(!http) + { + JS_ReportError(cx, "Failed to create CouchHTTP instance."); + goto error; + } + + http->method = -1; + http->url = NULL; + http->req_headers = NULL; + http->last_status = -1; + + if(!JS_SetPrivate(cx, obj, http)) + { + JS_ReportError(cx, "Failed to set private CouchHTTP data."); + goto error; + } + + ret = JS_TRUE; + goto success; + +error: + if(http) free(http); + +success: + return ret; +} + +static void +destructor(JSContext* cx, JSObject* obj) +{ + HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj); + if(!http) + { + fprintf(stderr, "Unable to destroy invalid CouchHTTP instance.\n"); + } + else + { + if(http->url) free(http->url); + if(http->req_headers) curl_slist_free_all(http->req_headers); + free(http); + } +} + +static JSBool +open(JSContext* cx, JSObject* obj, uintN argc, jsval* argv, jsval* rval) +{ + HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj); + char* method = NULL; + char* url = NULL; + JSBool ret = JS_FALSE; + int methid; + + if(!http) + { + JS_ReportError(cx, "Invalid CouchHTTP instance."); + goto done; + } + + if(argv[0] == JSVAL_VOID) + { + JS_ReportError(cx, "You must specify a method."); + goto done; + } + + method = enc_string(cx, argv[0], NULL); + if(!method) + { + JS_ReportError(cx, "Failed to encode method."); + goto done; + } + + for(methid = 0; METHODS[methid] != NULL; methid++) + { + if(strcasecmp(METHODS[methid], method) == 0) break; + } + + if(methid > COPY) + { + JS_ReportError(cx, "Invalid method specified."); + goto done; + } + + http->method = methid; + + if(argv[1] == JSVAL_VOID) + { + JS_ReportError(cx, "You must specify a URL."); + goto done; + } + + if(http->url) + { + free(http->url); + http->url = NULL; + } + + http->url = enc_string(cx, argv[1], NULL); + if(!http->url) + { + JS_ReportError(cx, "Failed to encode URL."); + goto done; + } + + if(argv[2] != JSVAL_VOID && argv[2] != JSVAL_FALSE) + { + JS_ReportError(cx, "Synchronous flag must be false if specified."); + goto done; + } + + if(http->req_headers) + { + curl_slist_free_all(http->req_headers); + http->req_headers = NULL; + } + + // Disable Expect: 100-continue + http->req_headers = curl_slist_append(http->req_headers, "Expect:"); + + ret = JS_TRUE; + +done: + if(method) free(method); + return ret; +} + +static JSBool +setheader(JSContext* cx, JSObject* obj, uintN argc, jsval* argv, jsval* rval) +{ + HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj); + char* keystr = NULL; + char* valstr = NULL; + char* hdrbuf = NULL; + size_t hdrlen = -1; + JSBool ret = JS_FALSE; + + if(!http) + { + JS_ReportError(cx, "Invalid CouchHTTP instance."); + goto done; + } + + if(argv[0] == JSVAL_VOID) + { + JS_ReportError(cx, "You must speciy a header name."); + goto done; + } + + keystr = enc_string(cx, argv[0], NULL); + if(!keystr) + { + JS_ReportError(cx, "Failed to encode header name."); + goto done; + } + + if(argv[1] == JSVAL_VOID) + { + JS_ReportError(cx, "You must specify a header value."); + goto done; + } + + valstr = enc_string(cx, argv[1], NULL); + if(!valstr) + { + JS_ReportError(cx, "Failed to encode header value."); + goto done; + } + + hdrlen = strlen(keystr) + strlen(valstr) + 3; + hdrbuf = (char*) malloc(hdrlen * sizeof(char)); + if(!hdrbuf) + { + JS_ReportError(cx, "Failed to allocate header buffer."); + goto done; + } + + snprintf(hdrbuf, hdrlen, "%s: %s", keystr, valstr); + http->req_headers = curl_slist_append(http->req_headers, hdrbuf); + + ret = JS_TRUE; + +done: + if(keystr) free(keystr); + if(valstr) free(valstr); + if(hdrbuf) free(hdrbuf); + + return ret; +} + +static JSBool +sendreq(JSContext* cx, JSObject* obj, uintN argc, jsval* argv, jsval* rval) +{ + HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj); + char* body = NULL; + size_t bodylen = 0; + JSBool ret = JS_FALSE; + + if(!http) + { + JS_ReportError(cx, "Invalid CouchHTTP instance."); + goto done; + } + + if(argv[0] != JSVAL_VOID && argv[0] != JS_GetEmptyStringValue(cx)) + { + body = enc_string(cx, argv[0], &bodylen); + if(!body) + { + JS_ReportError(cx, "Failed to encode body."); + goto done; + } + } + + ret = go(cx, obj, http, body, bodylen); + +done: + if(body) free(body); + return ret; +} + +static JSBool +status(JSContext* cx, JSObject* obj, jsval idval, jsval* vp) +{ + HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj); + + if(!http) + { + JS_ReportError(cx, "Invalid CouchHTTP instance."); + return JS_FALSE; + } + + if(INT_FITS_IN_JSVAL(http->last_status)) + { + *vp = INT_TO_JSVAL(http->last_status); + return JS_TRUE; + } + else + { + JS_ReportError(cx, "INTERNAL: Invalid last_status"); + return JS_FALSE; + } +} + +JSClass CouchHTTPClass = { + "CouchHTTP", + JSCLASS_HAS_PRIVATE + | JSCLASS_CONSTRUCT_PROTOTYPE + | JSCLASS_HAS_RESERVED_SLOTS(2), + JS_PropertyStub, + JS_PropertyStub, + JS_PropertyStub, + JS_PropertyStub, + JS_EnumerateStub, + JS_ResolveStub, + JS_ConvertStub, + destructor, + JSCLASS_NO_OPTIONAL_MEMBERS +}; + +JSPropertySpec CouchHTTPProperties[] = { + {"status", 0, JSPROP_READONLY, status, NULL}, + {0, 0, 0, 0, 0} +}; + +JSFunctionSpec CouchHTTPFunctions[] = { + {"_open", open, 3, 0, 0}, + {"_setRequestHeader", setheader, 2, 0, 0}, + {"_send", sendreq, 1, 0, 0}, + {0, 0, 0, 0, 0} +}; + +JSObject* +install_http(JSContext* cx, JSObject* glbl) +{ + JSObject* klass = NULL; + HTTPData* http = NULL; + + klass = JS_InitClass( + cx, + glbl, + NULL, + &CouchHTTPClass, + constructor, + 0, + CouchHTTPProperties, + CouchHTTPFunctions, + NULL, + NULL + ); + + if(!klass) + { + fprintf(stderr, "Failed to initialize CouchHTTP class.\n"); + return NULL; + } + + return klass; +} + + +// Curl Helpers + +typedef struct { + HTTPData* http; + JSContext* cx; + JSObject* resp_headers; + char* sendbuf; + size_t sendlen; + size_t sent; + char* recvbuf; + size_t recvlen; + size_t read; +} CurlState; + +/* + * I really hate doing this but this doesn't have to be + * uber awesome, it just has to work. + */ +CURL* HTTP_HANDLE = NULL; +char ERRBUF[CURL_ERROR_SIZE]; + +static size_t send_body(void *ptr, size_t size, size_t nmem, void *data); +static int seek_body(void *ptr, curl_off_t offset, int origin); +static size_t recv_body(void *ptr, size_t size, size_t nmem, void *data); +static size_t recv_header(void *ptr, size_t size, size_t nmem, void *data); + +static JSBool +go(JSContext* cx, JSObject* obj, HTTPData* http, char* body, size_t bodylen) +{ + CurlState state; + JSString* jsbody; + JSBool ret = JS_FALSE; + jsval tmp; + + state.cx = cx; + state.http = http; + + state.sendbuf = body; + state.sendlen = bodylen; + state.sent = 0; + + state.recvbuf = NULL; + state.recvlen = 0; + state.read = 0; + + if(HTTP_HANDLE == NULL) + { + HTTP_HANDLE = curl_easy_init(); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_READFUNCTION, send_body); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_SEEKFUNCTION, + (curl_seek_callback) seek_body); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_HEADERFUNCTION, recv_header); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_WRITEFUNCTION, recv_body); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_NOPROGRESS, 1); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_ERRORBUFFER, ERRBUF); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_COOKIEFILE, ""); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_USERAGENT, + "CouchHTTP Client - Relax"); + } + + if(!HTTP_HANDLE) + { + JS_ReportError(cx, "Failed to initialize cURL handle."); + goto done; + } + + if(http->method < 0 || http->method > COPY) + { + JS_ReportError(cx, "INTERNAL: Unknown method."); + goto done; + } + + curl_easy_setopt(HTTP_HANDLE, CURLOPT_CUSTOMREQUEST, METHODS[http->method]); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_NOBODY, 0); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_FOLLOWLOCATION, 1); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_UPLOAD, 0); + + if(http->method == HEAD) + { + curl_easy_setopt(HTTP_HANDLE, CURLOPT_NOBODY, 1); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_FOLLOWLOCATION, 0); + } + else if(http->method == POST || http->method == PUT) + { + curl_easy_setopt(HTTP_HANDLE, CURLOPT_UPLOAD, 1); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_FOLLOWLOCATION, 0); + } + + if(body && bodylen) + { + curl_easy_setopt(HTTP_HANDLE, CURLOPT_INFILESIZE, bodylen); + } + else + { + curl_easy_setopt(HTTP_HANDLE, CURLOPT_INFILESIZE, 0); + } + + //curl_easy_setopt(HTTP_HANDLE, CURLOPT_VERBOSE, 1); + + curl_easy_setopt(HTTP_HANDLE, CURLOPT_URL, http->url); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_HTTPHEADER, http->req_headers); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_READDATA, &state); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_SEEKDATA, &state); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_WRITEHEADER, &state); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_WRITEDATA, &state); + + if(curl_easy_perform(HTTP_HANDLE) != 0) + { + JS_ReportError(cx, "Failed to execute HTTP request: %s", ERRBUF); + goto done; + } + + if(!state.resp_headers) + { + JS_ReportError(cx, "Failed to recieve HTTP headers."); + goto done; + } + + tmp = OBJECT_TO_JSVAL(state.resp_headers); + if(!JS_DefineProperty( + cx, + obj, + "_headers", + tmp, + NULL, + NULL, + JSPROP_READONLY + )) + { + JS_ReportError(cx, "INTERNAL: Failed to set response headers."); + goto done; + } + + if(state.recvbuf) // Is good enough? + { + state.recvbuf[state.read] = '\0'; + jsbody = dec_string(cx, state.recvbuf, state.read+1); + if(!jsbody) + { + // If we can't decode the body as UTF-8 we forcefully + // convert it to a string by just forcing each byte + // to a jschar. + jsbody = str_from_binary(cx, state.recvbuf, state.read); + if(!jsbody) { + if(!JS_IsExceptionPending(cx)) { + JS_ReportError(cx, "INTERNAL: Failed to decode body."); + } + goto done; + } + } + tmp = STRING_TO_JSVAL(jsbody); + } + else + { + tmp = JS_GetEmptyStringValue(cx); + } + + if(!JS_DefineProperty( + cx, + obj, + "responseText", + tmp, + NULL, + NULL, + JSPROP_READONLY + )) + { + JS_ReportError(cx, "INTERNAL: Failed to set responseText."); + goto done; + } + + ret = JS_TRUE; + +done: + if(state.recvbuf) JS_free(cx, state.recvbuf); + return ret; +} + +static size_t +send_body(void *ptr, size_t size, size_t nmem, void *data) +{ + CurlState* state = (CurlState*) data; + size_t length = size * nmem; + size_t towrite = state->sendlen - state->sent; + if(towrite == 0) + { + return 0; + } + + if(length < towrite) towrite = length; + + //fprintf(stderr, "%lu %lu %lu %lu\n", state->bodyused, state->bodyread, length, towrite); + + memcpy(ptr, state->sendbuf + state->sent, towrite); + state->sent += towrite; + + return towrite; +} + +static int +seek_body(void* ptr, curl_off_t offset, int origin) +{ + CurlState* state = (CurlState*) ptr; + if(origin != SEEK_SET) return -1; + + state->sent = (size_t) offset; + return (int) state->sent; +} + +static size_t +recv_header(void *ptr, size_t size, size_t nmem, void *data) +{ + CurlState* state = (CurlState*) data; + char code[4]; + char* header = (char*) ptr; + size_t length = size * nmem; + size_t index = 0; + JSString* hdr = NULL; + jsuint hdrlen; + jsval hdrval; + + if(length > 7 && strncasecmp(header, "HTTP/1.", 7) == 0) + { + if(length < 12) + { + return CURLE_WRITE_ERROR; + } + + memcpy(code, header+9, 3*sizeof(char)); + code[3] = '\0'; + state->http->last_status = atoi(code); + + state->resp_headers = JS_NewArrayObject(state->cx, 0, NULL); + if(!state->resp_headers) + { + return CURLE_WRITE_ERROR; + } + + return length; + } + + // We get a notice at the \r\n\r\n after headers. + if(length <= 2) + { + return length; + } + + // Append the new header to our array. + hdr = dec_string(state->cx, header, length); + if(!hdr) + { + return CURLE_WRITE_ERROR; + } + + if(!JS_GetArrayLength(state->cx, state->resp_headers, &hdrlen)) + { + return CURLE_WRITE_ERROR; + } + + hdrval = STRING_TO_JSVAL(hdr); + if(!JS_SetElement(state->cx, state->resp_headers, hdrlen, &hdrval)) + { + return CURLE_WRITE_ERROR; + } + + return length; +} + +static size_t +recv_body(void *ptr, size_t size, size_t nmem, void *data) +{ + CurlState* state = (CurlState*) data; + size_t length = size * nmem; + char* tmp = NULL; + + if(!state->recvbuf) + { + state->recvlen = 4096; + state->read = 0; + state->recvbuf = JS_malloc(state->cx, state->recvlen); + } + + if(!state->recvbuf) + { + return CURLE_WRITE_ERROR; + } + + // +1 so we can add '\0' back up in the go function. + while(length+1 > state->recvlen - state->read) state->recvlen *= 2; + tmp = JS_realloc(state->cx, state->recvbuf, state->recvlen); + if(!tmp) return CURLE_WRITE_ERROR; + state->recvbuf = tmp; + + memcpy(state->recvbuf + state->read, ptr, length); + state->read += length; + return length; +} + +JSString* +str_from_binary(JSContext* cx, char* data, size_t length) +{ + jschar* conv = (jschar*) JS_malloc(cx, length * sizeof(jschar)); + JSString* ret = NULL; + size_t i; + + if(!conv) return NULL; + + for(i = 0; i < length; i++) + { + conv[i] = (jschar) data[i]; + } + + ret = JS_NewUCString(cx, conv, length); + if(!ret) JS_free(cx, conv); + + return ret; +} diff --git a/src/couchdb/priv/couch_js/http.h b/src/couchdb/priv/couch_js/http.h new file mode 100644 index 00000000..b5f8c70f --- /dev/null +++ b/src/couchdb/priv/couch_js/http.h @@ -0,0 +1,18 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#ifndef COUCH_JS_HTTP_H +#define COUCH_JS_HTTP_H + +JSObject* install_http(JSContext* cx, JSObject* global); + +#endif
\ No newline at end of file diff --git a/src/couchdb/priv/couch_js/main.c b/src/couchdb/priv/couch_js/main.c new file mode 100644 index 00000000..376aa15b --- /dev/null +++ b/src/couchdb/priv/couch_js/main.c @@ -0,0 +1,338 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <jsapi.h> +#include "config.h" + +#include "utf8.h" +#include "http.h" + +int gExitCode = 0; + +#ifdef JS_THREADSAFE +#define SETUP_REQUEST(cx) \ + JS_SetContextThread(cx); \ + JS_BeginRequest(cx); +#define FINISH_REQUEST(cx) \ + JS_EndRequest(cx); \ + JS_ClearContextThread(cx); +#else +#define SETUP_REQUEST(cx) +#define FINISH_REQUEST(cx) +#endif + +static JSBool +evalcx(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval) +{ + JSString *str; + JSObject *sandbox; + JSContext *subcx; + const jschar *src; + size_t srclen; + JSBool ret = JS_FALSE; + jsval v; + + sandbox = NULL; + if(!JS_ConvertArguments(cx, argc, argv, "S / o", &str, &sandbox)) + { + return JS_FALSE; + } + + subcx = JS_NewContext(JS_GetRuntime(cx), 8L * 1024L); + if(!subcx) + { + JS_ReportOutOfMemory(cx); + return JS_FALSE; + } + + SETUP_REQUEST(subcx); + + src = JS_GetStringChars(str); + srclen = JS_GetStringLength(str); + + if(!sandbox) + { + sandbox = JS_NewObject(subcx, NULL, NULL, NULL); + if(!sandbox || !JS_InitStandardClasses(subcx, sandbox)) goto done; + } + + if(srclen == 0) + { + *rval = OBJECT_TO_JSVAL(sandbox); + } + else + { + JS_EvaluateUCScript(subcx, sandbox, src, srclen, NULL, 0, rval); + } + + ret = JS_TRUE; + +done: + FINISH_REQUEST(subcx); + JS_DestroyContext(subcx); + return ret; +} + +static JSBool +gc(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval) +{ + JS_GC(cx); + return JS_TRUE; +} + +static JSBool +print(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval) +{ + uintN i; + char *bytes; + + for(i = 0; i < argc; i++) + { + bytes = enc_string(cx, argv[i], NULL); + if(!bytes) return JS_FALSE; + + fprintf(stdout, "%s%s", i ? " " : "", bytes); + JS_free(cx, bytes); + } + + fputc('\n', stdout); + fflush(stdout); + return JS_TRUE; +} + +static JSBool +quit(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval) +{ + JS_ConvertArguments(cx, argc, argv, "/ i", &gExitCode); + return JS_FALSE; +} + +static char* +readfp(JSContext* cx, FILE* fp, size_t* buflen) +{ + char* bytes = NULL; + char* tmp = NULL; + size_t used = 0; + size_t byteslen = 256; + size_t readlen = 0; + + bytes = JS_malloc(cx, byteslen); + if(bytes == NULL) return NULL; + + while((readlen = js_fgets(bytes+used, byteslen-used, stdin)) > 0) + { + used += readlen; + + if(bytes[used-1] == '\n') + { + bytes[used-1] = '\0'; + break; + } + + // Double our buffer and read more. + byteslen *= 2; + tmp = JS_realloc(cx, bytes, byteslen); + if(!tmp) + { + JS_free(cx, bytes); + return NULL; + } + bytes = tmp; + } + + *buflen = used; + return bytes; +} + +static JSBool +readline(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval) { + jschar *chars; + JSString *str; + char* bytes; + char* tmp; + size_t byteslen; + + /* GC Occasionally */ + JS_MaybeGC(cx); + + bytes = readfp(cx, stdin, &byteslen); + if(!bytes) return JS_FALSE; + + /* Treat the empty string specially */ + if(byteslen == 0) + { + *rval = JS_GetEmptyStringValue(cx); + JS_free(cx, bytes); + return JS_TRUE; + } + + /* Shrink the buffer to the real size */ + tmp = JS_realloc(cx, bytes, byteslen); + if(!tmp) + { + JS_free(cx, bytes); + return JS_FALSE; + } + bytes = tmp; + + str = dec_string(cx, bytes, byteslen); + JS_free(cx, bytes); + + if(!str) return JS_FALSE; + + *rval = STRING_TO_JSVAL(str); + + return JS_TRUE; +} + +static JSBool +seal(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval) { + JSObject *target; + JSBool deep = JS_FALSE; + + if (!JS_ConvertArguments(cx, argc, argv, "o/b", &target, &deep)) + return JS_FALSE; + if (!target) + return JS_TRUE; + return JS_SealObject(cx, target, deep); +} + +static void +execute_script(JSContext *cx, JSObject *obj, const char *filename) { + FILE *file; + JSScript *script; + jsval result; + + if(!filename || strcmp(filename, "-") == 0) + { + file = stdin; + } + else + { + file = fopen(filename, "r"); + if (!file) + { + fprintf(stderr, "could not open script file %s\n", filename); + gExitCode = 1; + return; + } + } + + script = JS_CompileFileHandle(cx, obj, filename, file); + if(script) + { + JS_ExecuteScript(cx, obj, script, &result); + JS_DestroyScript(cx, script); + } +} + +static void +printerror(JSContext *cx, const char *mesg, JSErrorReport *report) +{ + if(!report || !JSREPORT_IS_WARNING(report->flags)) + { + fprintf(stderr, "%s\n", mesg); + } +} + +static JSFunctionSpec global_functions[] = { + {"evalcx", evalcx, 0, 0, 0}, + {"gc", gc, 0, 0, 0}, + {"print", print, 0, 0, 0}, + {"quit", quit, 0, 0, 0}, + {"readline", readline, 0, 0, 0}, + {"seal", seal, 0, 0, 0}, + {0, 0, 0, 0, 0} +}; + +static JSClass global_class = { + "GlobalClass", + JSCLASS_GLOBAL_FLAGS, + JS_PropertyStub, + JS_PropertyStub, + JS_PropertyStub, + JS_PropertyStub, + JS_EnumerateStub, + JS_ResolveStub, + JS_ConvertStub, + JS_FinalizeStub, + JSCLASS_NO_OPTIONAL_MEMBERS +}; + +int +main(int argc, const char * argv[]) +{ + JSRuntime* rt = NULL; + JSContext* cx = NULL; + JSObject* global = NULL; + JSFunctionSpec* sp = NULL; + int i = 0; + + rt = JS_NewRuntime(64L * 1024L * 1024L); + if (!rt) return 1; + + cx = JS_NewContext(rt, 8L * 1024L); + if (!cx) return 1; + + JS_SetErrorReporter(cx, printerror); + JS_ToggleOptions(cx, JSOPTION_XML); + + SETUP_REQUEST(cx); + + global = JS_NewObject(cx, &global_class, NULL, NULL); + if (!global) return 1; + if (!JS_InitStandardClasses(cx, global)) return 1; + + for(sp = global_functions; sp->name != NULL; sp++) + { + if(!JS_DefineFunction(cx, global, + sp->name, sp->call, sp->nargs, sp->flags)) + { + fprintf(stderr, "Failed to create function: %s\n", sp->name); + return 1; + } + } + + if(!install_http(cx, global)) + { + return 1; + } + + JS_SetGlobalObject(cx, global); + + if(argc > 2) + { + fprintf(stderr, "incorrect number of arguments\n\n"); + fprintf(stderr, "usage: %s <scriptfile>\n", argv[0]); + return 2; + } + + if(argc == 0) + { + execute_script(cx, global, NULL); + } + else + { + execute_script(cx, global, argv[1]); + } + + FINISH_REQUEST(cx); + + JS_DestroyContext(cx); + JS_DestroyRuntime(rt); + JS_ShutDown(); + + return gExitCode; +} diff --git a/src/couchdb/priv/couch_js/utf8.c b/src/couchdb/priv/couch_js/utf8.c new file mode 100644 index 00000000..699a6fee --- /dev/null +++ b/src/couchdb/priv/couch_js/utf8.c @@ -0,0 +1,286 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#include <jsapi.h> + +static int +enc_char(uint8 *utf8Buffer, uint32 ucs4Char) +{ + int utf8Length = 1; + + if (ucs4Char < 0x80) + { + *utf8Buffer = (uint8)ucs4Char; + } + else + { + int i; + uint32 a = ucs4Char >> 11; + utf8Length = 2; + while(a) + { + a >>= 5; + utf8Length++; + } + i = utf8Length; + while(--i) + { + utf8Buffer[i] = (uint8)((ucs4Char & 0x3F) | 0x80); + ucs4Char >>= 6; + } + *utf8Buffer = (uint8)(0x100 - (1 << (8-utf8Length)) + ucs4Char); + } + + return utf8Length; +} + +static JSBool +enc_charbuf(const jschar* src, size_t srclen, char* dst, size_t* dstlenp) +{ + size_t i; + size_t utf8Len; + size_t dstlen = *dstlenp; + size_t origDstlen = dstlen; + jschar c; + jschar c2; + uint32 v; + uint8 utf8buf[6]; + + if(!dst) + { + dstlen = origDstlen = (size_t) -1; + } + + while(srclen) + { + c = *src++; + srclen--; + + if((c >= 0xDC00) && (c <= 0xDFFF)) goto bad_surrogate; + + if(c < 0xD800 || c > 0xDBFF) + { + v = c; + } + else + { + if(srclen < 1) goto buffer_too_small; + c2 = *src++; + srclen--; + if ((c2 < 0xDC00) || (c2 > 0xDFFF)) + { + c = c2; + goto bad_surrogate; + } + v = ((c - 0xD800) << 10) + (c2 - 0xDC00) + 0x10000; + } + if(v < 0x0080) + { + /* no encoding necessary - performance hack */ + if(!dstlen) goto buffer_too_small; + if(dst) *dst++ = (char) v; + utf8Len = 1; + } + else + { + utf8Len = enc_char(utf8buf, v); + if(utf8Len > dstlen) goto buffer_too_small; + if(dst) + { + for (i = 0; i < utf8Len; i++) + { + *dst++ = (char) utf8buf[i]; + } + } + } + dstlen -= utf8Len; + } + + *dstlenp = (origDstlen - dstlen); + return JS_TRUE; + +bad_surrogate: + *dstlenp = (origDstlen - dstlen); + return JS_FALSE; + +buffer_too_small: + *dstlenp = (origDstlen - dstlen); + return JS_FALSE; +} + +char* +enc_string(JSContext* cx, jsval arg, size_t* buflen) +{ + JSString* str = NULL; + jschar* src = NULL; + char* bytes = NULL; + size_t srclen = 0; + size_t byteslen = 0; + + str = JS_ValueToString(cx, arg); + if(!str) goto error; + + src = JS_GetStringChars(str); + srclen = JS_GetStringLength(str); + + if(!enc_charbuf(src, srclen, NULL, &byteslen)) goto error; + + bytes = JS_malloc(cx, (byteslen) + 1); + bytes[byteslen] = 0; + + if(!enc_charbuf(src, srclen, bytes, &byteslen)) goto error; + + if(buflen) *buflen = byteslen; + goto success; + +error: + if(bytes != NULL) JS_free(cx, bytes); + bytes = NULL; + +success: + return bytes; +} + +static uint32 +dec_char(const uint8 *utf8Buffer, int utf8Length) +{ + uint32 ucs4Char; + uint32 minucs4Char; + + /* from Unicode 3.1, non-shortest form is illegal */ + static const uint32 minucs4Table[] = { + 0x00000080, 0x00000800, 0x0001000, 0x0020000, 0x0400000 + }; + + if (utf8Length == 1) + { + ucs4Char = *utf8Buffer; + } + else + { + ucs4Char = *utf8Buffer++ & ((1<<(7-utf8Length))-1); + minucs4Char = minucs4Table[utf8Length-2]; + while(--utf8Length) + { + ucs4Char = ucs4Char<<6 | (*utf8Buffer++ & 0x3F); + } + if(ucs4Char < minucs4Char || ucs4Char == 0xFFFE || ucs4Char == 0xFFFF) + { + ucs4Char = 0xFFFD; + } + } + + return ucs4Char; +} + +static JSBool +dec_charbuf(const char *src, size_t srclen, jschar *dst, size_t *dstlenp) +{ + uint32 v; + size_t offset = 0; + size_t j; + size_t n; + size_t dstlen = *dstlenp; + size_t origDstlen = dstlen; + + if(!dst) dstlen = origDstlen = (size_t) -1; + + while(srclen) + { + v = (uint8) *src; + n = 1; + + if(v & 0x80) + { + while(v & (0x80 >> n)) + { + n++; + } + + if(n > srclen) goto buffer_too_small; + if(n == 1 || n > 6) goto bad_character; + + for(j = 1; j < n; j++) + { + if((src[j] & 0xC0) != 0x80) goto bad_character; + } + + v = dec_char((const uint8 *) src, n); + if(v >= 0x10000) + { + v -= 0x10000; + + if(v > 0xFFFFF || dstlen < 2) + { + *dstlenp = (origDstlen - dstlen); + return JS_FALSE; + } + + if(dstlen < 2) goto buffer_too_small; + + if(dst) + { + *dst++ = (jschar)((v >> 10) + 0xD800); + v = (jschar)((v & 0x3FF) + 0xDC00); + } + dstlen--; + } + } + + if(!dstlen) goto buffer_too_small; + if(dst) *dst++ = (jschar) v; + + dstlen--; + offset += n; + src += n; + srclen -= n; + } + + *dstlenp = (origDstlen - dstlen); + return JS_TRUE; + +bad_character: + *dstlenp = (origDstlen - dstlen); + return JS_FALSE; + +buffer_too_small: + *dstlenp = (origDstlen - dstlen); + return JS_FALSE; +} + +JSString* +dec_string(JSContext* cx, const char* bytes, size_t byteslen) +{ + JSString* str = NULL; + jschar* chars = NULL; + size_t charslen; + + if(!dec_charbuf(bytes, byteslen, NULL, &charslen)) goto error; + + chars = JS_malloc(cx, (charslen + 1) * sizeof(jschar)); + if(!chars) return NULL; + chars[charslen] = 0; + + if(!dec_charbuf(bytes, byteslen, chars, &charslen)) goto error; + + str = JS_NewUCString(cx, chars, charslen - 1); + if(!str) goto error; + + goto success; + +error: + if(chars != NULL) JS_free(cx, chars); + str = NULL; + +success: + return str; +}
\ No newline at end of file diff --git a/src/couchdb/priv/couch_js/utf8.h b/src/couchdb/priv/couch_js/utf8.h new file mode 100644 index 00000000..00f6b736 --- /dev/null +++ b/src/couchdb/priv/couch_js/utf8.h @@ -0,0 +1,19 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#ifndef COUCH_JS_UTF_8_H +#define COUCH_JS_UTF_8_H + +char* enc_string(JSContext* cx, jsval arg, size_t* buflen); +JSString* dec_string(JSContext* cx, const char* buf, size_t buflen); + +#endif
\ No newline at end of file diff --git a/src/couchdb/priv/spawnkillable/couchspawnkillable_win.c b/src/couchdb/priv/spawnkillable/couchspawnkillable_win.c new file mode 100644 index 00000000..06782315 --- /dev/null +++ b/src/couchdb/priv/spawnkillable/couchspawnkillable_win.c @@ -0,0 +1,145 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +// Do what 2 lines of shell script in couchspawnkillable does... +// * Create a new suspended process with the same (duplicated) standard +// handles as us. +// * Write a line to stdout, consisting of the path to ourselves, plus +// '--kill {pid}' where {pid} is the PID of the newly created process. +// * Un-suspend the new process. +// * Wait for the process to terminate. +// * Terminate with the child's exit-code. + +// Later, couch will call us with --kill and the PID, so we dutifully +// terminate the specified PID. + +#include <stdlib.h> +#include "windows.h" + +char *get_child_cmdline(int argc, char **argv) +{ + // make a new command-line, but skipping me. + // XXX - todo - spaces etc in args??? + int i; + char *p, *cmdline; + int nchars = 0; + int nthis = 1; + for (i=1;i<argc;i++) + nchars += strlen(argv[i])+1; + cmdline = p = malloc(nchars+1); + if (!cmdline) + return NULL; + for (i=1;i<argc;i++) { + nthis = strlen(argv[i]); + strncpy(p, argv[i], nthis); + p[nthis] = ' '; + p += nthis+1; + } + // Replace the last space we added above with a '\0' + cmdline[nchars-1] = '\0'; + return cmdline; +} + +// create the child process, returning 0, or the exit-code we will +// terminate with. +int create_child(int argc, char **argv, PROCESS_INFORMATION *pi) +{ + char buf[1024]; + DWORD dwcreate; + STARTUPINFO si; + char *cmdline; + if (argc < 2) + return 1; + cmdline = get_child_cmdline(argc, argv); + if (!cmdline) + return 2; + + memset(&si, 0, sizeof(si)); + si.cb = sizeof(si); + // depending on how *our* parent is started, we may or may not have + // a valid stderr stream - so although we try and duplicate it, only + // failing to duplicate stdin and stdout are considered fatal. + if (!DuplicateHandle(GetCurrentProcess(), + GetStdHandle(STD_INPUT_HANDLE), + GetCurrentProcess(), + &si.hStdInput, + 0, + TRUE, // inheritable + DUPLICATE_SAME_ACCESS) || + !DuplicateHandle(GetCurrentProcess(), + GetStdHandle(STD_OUTPUT_HANDLE), + GetCurrentProcess(), + &si.hStdOutput, + 0, + TRUE, // inheritable + DUPLICATE_SAME_ACCESS)) { + return 3; + } + DuplicateHandle(GetCurrentProcess(), + GetStdHandle(STD_ERROR_HANDLE), + GetCurrentProcess(), + &si.hStdError, + 0, + TRUE, // inheritable + DUPLICATE_SAME_ACCESS); + + si.dwFlags = STARTF_USESTDHANDLES; + dwcreate = CREATE_SUSPENDED; + if (!CreateProcess( NULL, cmdline, + NULL, + NULL, + TRUE, // inherit handles + dwcreate, + NULL, // environ + NULL, // cwd + &si, + pi)) + return 4; + return 0; +} + +// and here we go... +int main(int argc, char **argv) +{ + char out_buf[1024]; + int rc; + DWORD cbwritten; + DWORD exitcode; + PROCESS_INFORMATION pi; + if (argc==3 && strcmp(argv[1], "--kill")==0) { + HANDLE h = OpenProcess(PROCESS_TERMINATE, 0, atoi(argv[2])); + if (!h) + return 1; + if (!TerminateProcess(h, 0)) + return 2; + CloseHandle(h); + return 0; + } + // spawn the new suspended process + rc = create_child(argc, argv, &pi); + if (rc) + return rc; + // Write the 'terminate' command, which includes this PID, back to couch. + // *sob* - what about spaces etc? + sprintf_s(out_buf, sizeof(out_buf), "%s --kill %d\n", + argv[0], pi.dwProcessId); + WriteFile(GetStdHandle(STD_OUTPUT_HANDLE), out_buf, strlen(out_buf), + &cbwritten, NULL); + // Let the child process go... + ResumeThread(pi.hThread); + // Wait for the process to terminate so we can reflect the exit code + // back to couch. + WaitForSingleObject(pi.hProcess, INFINITE); + if (!GetExitCodeProcess(pi.hProcess, &exitcode)) + return 6; + return exitcode; +} diff --git a/src/mem3.erl b/src/mem3.erl deleted file mode 100644 index 1485c7fe..00000000 --- a/src/mem3.erl +++ /dev/null @@ -1,103 +0,0 @@ --module(mem3). - --export([start/0, stop/0, restart/0, nodes/0, shards/1, shards/2, - choose_shards/2]). --export([compare_nodelists/0, compare_shards/1]). - --include("mem3.hrl"). - -start() -> - application:start(mem3). - -stop() -> - application:stop(mem3). - -restart() -> - stop(), - start(). - -%% @doc Detailed report of cluster-wide membership state. Queries the state -%% on all member nodes and builds a dictionary with unique states as the -%% key and the nodes holding that state as the value. Also reports member -%% nodes which fail to respond and nodes which are connected but are not -%% cluster members. Useful for debugging. --spec compare_nodelists() -> [{{cluster_nodes, [node()]} | bad_nodes - | non_member_nodes, [node()]}]. -compare_nodelists() -> - Nodes = mem3:nodes(), - AllNodes = erlang:nodes([this, visible]), - {Replies, BadNodes} = gen_server:multi_call(Nodes, mem3_nodes, get_nodelist), - Dict = lists:foldl(fun({Node, Nodelist}, D) -> - orddict:append({cluster_nodes, Nodelist}, Node, D) - end, orddict:new(), Replies), - [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict]. - --spec compare_shards(DbName::iodata()) -> [{bad_nodes | [#shard{}], [node()]}]. -compare_shards(DbName) when is_list(DbName) -> - compare_shards(list_to_binary(DbName)); -compare_shards(DbName) -> - Nodes = mem3:nodes(), - {Replies, BadNodes} = rpc:multicall(mem3, shards, [DbName]), - GoodNodes = [N || N <- Nodes, not lists:member(N, BadNodes)], - Dict = lists:foldl(fun({Shards, Node}, D) -> - orddict:append(Shards, Node, D) - end, orddict:new(), lists:zip(Replies, GoodNodes)), - [{bad_nodes, BadNodes} | Dict]. - --spec nodes() -> [node()]. -nodes() -> - mem3_nodes:get_nodelist(). - --spec shards(DbName::iodata()) -> [#shard{}]. -shards(DbName) when is_list(DbName) -> - shards(list_to_binary(DbName)); -shards(DbName) -> - try ets:lookup(partitions, DbName) of - [] -> - mem3_util:load_shards_from_disk(DbName); - Else -> - Else - catch error:badarg -> - mem3_util:load_shards_from_disk(DbName) - end. - --spec shards(DbName::iodata(), DocId::binary()) -> [#shard{}]. -shards(DbName, DocId) when is_list(DbName) -> - shards(list_to_binary(DbName), DocId); -shards(DbName, DocId) when is_list(DocId) -> - shards(DbName, list_to_binary(DocId)); -shards(DbName, DocId) -> - HashKey = mem3_util:hash(DocId), - Head = #shard{ - name = '_', - node = '_', - dbname = DbName, - range = ['$1','$2'], - ref = '_' - }, - Conditions = [{'<', '$1', HashKey}, {'=<', HashKey, '$2'}], - try ets:select(partitions, [{Head, Conditions, ['$_']}]) of - [] -> - mem3_util:load_shards_from_disk(DbName, DocId); - Shards -> - Shards - catch error:badarg -> - mem3_util:load_shards_from_disk(DbName, DocId) - end. - --spec choose_shards(DbName::iodata(), Options::list()) -> [#shard{}]. -choose_shards(DbName, Options) when is_list(DbName) -> - choose_shards(list_to_binary(DbName), Options); -choose_shards(DbName, Options) -> - try shards(DbName) - catch error:E when E==database_does_not_exist; E==badarg -> - Nodes = mem3:nodes(), - NodeCount = length(Nodes), - N = mem3_util:n_val(couch_util:get_value(n, Options), NodeCount), - Q = mem3_util:to_integer(couch_util:get_value(q, Options, - couch_config:get("cluster", "q", "8"))), - % rotate to a random entry in the nodelist for even distribution - {A, B} = lists:split(crypto:rand_uniform(1,length(Nodes)+1), Nodes), - RotatedNodes = B ++ A, - mem3_util:create_partition_map(DbName, N, Q, RotatedNodes) - end. diff --git a/src/mem3_app.erl b/src/mem3_app.erl deleted file mode 100644 index 88cd1ea1..00000000 --- a/src/mem3_app.erl +++ /dev/null @@ -1,9 +0,0 @@ --module(mem3_app). --behaviour(application). --export([start/2, stop/1]). - -start(_Type, []) -> - mem3_sup:start_link(). - -stop([]) -> - ok. diff --git a/src/mem3_cache.erl b/src/mem3_cache.erl deleted file mode 100644 index 2a29ca4c..00000000 --- a/src/mem3_cache.erl +++ /dev/null @@ -1,92 +0,0 @@ --module(mem3_cache). --behaviour(gen_server). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --export([start_link/0]). - --record(state, {changes_pid}). - --include("mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -init([]) -> - ets:new(partitions, [bag, public, named_table, {keypos,#shard.dbname}]), - {Pid, _} = spawn_monitor(fun() -> listen_for_changes(0) end), - {ok, #state{changes_pid = Pid}}. - -handle_call(_Call, _From, State) -> - {noreply, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({'DOWN', _, _, Pid, {badarg, [{ets,delete,[partitions,_]}|_]}}, - #state{changes_pid=Pid} = State) -> - % fatal error, somebody deleted our ets table - {stop, ets_table_error, State}; -handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) -> - ?LOG_INFO("~p changes listener died ~p", [?MODULE, Reason]), - Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> 0 end, - timer:send_after(5000, {start_listener, Seq}), - {noreply, State}; -handle_info({start_listener, Seq}, State) -> - {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end), - {noreply, State#state{changes_pid=NewPid}}; -handle_info(_Msg, State) -> - {noreply, State}. - -terminate(_Reason, #state{changes_pid=Pid}) -> - exit(Pid, kill), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%% internal functions - -listen_for_changes(Since) -> - DbName = ?l2b(couch_config:get("mem3", "db", "dbs")), - {ok, Db} = ensure_exists(DbName), - Args = #changes_args{ - feed = "continuous", - since = Since, - heartbeat = true, - include_docs = true - }, - ChangesFun = couch_changes:handle_changes(Args, nil, Db), - ChangesFun(fun changes_callback/2). - -ensure_exists(DbName) -> - Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}], - case couch_db:open(DbName, Options) of - {ok, Db} -> - {ok, Db}; - _ -> - couch_server:create(DbName, Options) - end. - -changes_callback(start, _) -> - {ok, nil}; -changes_callback({stop, EndSeq}, _) -> - exit({seq, EndSeq}); -changes_callback({change, {Change}, _}, _) -> - DbName = couch_util:get_value(<<"id">>, Change), - case couch_util:get_value(deleted, Change, false) of - true -> - ets:delete(partitions, DbName); - false -> - case couch_util:get_value(doc, Change) of - {error, Reason} -> - ?LOG_ERROR("missing partition table for ~s: ~p", [DbName, Reason]); - {Doc} -> - ets:delete(partitions, DbName), - ets:insert(partitions, mem3_util:build_shards(DbName, Doc)) - end - end, - {ok, couch_util:get_value(<<"seq">>, Change)}; -changes_callback(timeout, _) -> - {ok, nil}. diff --git a/src/mem3_httpd.erl b/src/mem3_httpd.erl deleted file mode 100644 index cbfaea95..00000000 --- a/src/mem3_httpd.erl +++ /dev/null @@ -1,39 +0,0 @@ --module(mem3_httpd). - --export([handle_membership_req/1]). - -%% includes --include("mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - - -handle_membership_req(#httpd{method='GET', - path_parts=[<<"_membership">>]} = Req) -> - ClusterNodes = try mem3:nodes() - catch _:_ -> {ok,[]} end, - couch_httpd:send_json(Req, {[ - {all_nodes, lists:sort([node()|nodes()])}, - {cluster_nodes, lists:sort(ClusterNodes)} - ]}); -handle_membership_req(#httpd{method='GET', - path_parts=[<<"_membership">>, <<"parts">>, DbName]} = Req) -> - ClusterNodes = try mem3:nodes() - catch _:_ -> {ok,[]} end, - Shards = mem3:shards(DbName), - JsonShards = json_shards(Shards, dict:new()), - couch_httpd:send_json(Req, {[ - {all_nodes, lists:sort([node()|nodes()])}, - {cluster_nodes, lists:sort(ClusterNodes)}, - {partitions, JsonShards} - ]}). - -%% -%% internal -%% - -json_shards([], AccIn) -> - List = dict:to_list(AccIn), - {lists:sort(List)}; -json_shards([#shard{node=Node, range=[B,_E]} | Rest], AccIn) -> - HexBeg = couch_util:to_hex(<<B:32/integer>>), - json_shards(Rest, dict:append(HexBeg, Node, AccIn)). diff --git a/src/mem3_nodes.erl b/src/mem3_nodes.erl deleted file mode 100644 index 6cbf3d9a..00000000 --- a/src/mem3_nodes.erl +++ /dev/null @@ -1,120 +0,0 @@ --module(mem3_nodes). --behaviour(gen_server). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --export([start_link/0, get_nodelist/0]). - --include("mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - --record(state, {changes_pid, update_seq, nodes}). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -get_nodelist() -> - gen_server:call(?MODULE, get_nodelist). - -init([]) -> - {Nodes, UpdateSeq} = initialize_nodelist(), - {Pid, _} = spawn_monitor(fun() -> listen_for_changes(UpdateSeq) end), - {ok, #state{changes_pid = Pid, update_seq = UpdateSeq, nodes = Nodes}}. - -handle_call(get_nodelist, _From, State) -> - {reply, State#state.nodes, State}; -handle_call({add_node, Node}, _From, #state{nodes=Nodes} = State) -> - gen_event:notify(mem3_events, {add_node, Node}), - {reply, ok, State#state{nodes = lists:umerge([Node], Nodes)}}; -handle_call({remove_node, Node}, _From, #state{nodes=Nodes} = State) -> - gen_event:notify(mem3_events, {remove_node, Node}), - {reply, ok, State#state{nodes = lists:delete(Node, Nodes)}}; -handle_call(_Call, _From, State) -> - {noreply, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) -> - ?LOG_INFO("~p changes listener died ~p", [?MODULE, Reason]), - StartSeq = State#state.update_seq, - Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> StartSeq end, - timer:send_after(5000, start_listener), - {noreply, State#state{update_seq = Seq}}; -handle_info(start_listener, #state{update_seq = Seq} = State) -> - {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end), - {noreply, State#state{changes_pid=NewPid}}; -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%% internal functions - -initialize_nodelist() -> - DbName = couch_config:get("mem3", "nodedb", "nodes"), - {ok, Db} = ensure_exists(DbName), - {ok, _, Nodes0} = couch_btree:fold(Db#db.id_tree, fun first_fold/3, [], []), - % add self if not already present - case lists:member(node(), Nodes0) of - true -> - Nodes = Nodes0; - false -> - Doc = #doc{id = couch_util:to_binary(node())}, - {ok, _} = couch_db:update_doc(Db, Doc, []), - Nodes = [node() | Nodes0] - end, - couch_db:close(Db), - {lists:sort(Nodes), Db#db.update_seq}. - -first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) -> - {ok, Acc}; -first_fold(#full_doc_info{deleted=true}, _, Acc) -> - {ok, Acc}; -first_fold(#full_doc_info{id=Id}, _, Acc) -> - {ok, [mem3_util:to_atom(Id) | Acc]}. - -listen_for_changes(Since) -> - DbName = ?l2b(couch_config:get("mem3", "nodedb", "nodes")), - {ok, Db} = ensure_exists(DbName), - Args = #changes_args{ - feed = "continuous", - since = Since, - heartbeat = true, - include_docs = true - }, - ChangesFun = couch_changes:handle_changes(Args, nil, Db), - ChangesFun(fun changes_callback/2). - -ensure_exists(DbName) when is_list(DbName) -> - ensure_exists(list_to_binary(DbName)); -ensure_exists(DbName) -> - Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}], - case couch_db:open(DbName, Options) of - {ok, Db} -> - {ok, Db}; - _ -> - couch_server:create(DbName, Options) - end. - -changes_callback(start, _) -> - {ok, nil}; -changes_callback({stop, EndSeq}, _) -> - exit({seq, EndSeq}); -changes_callback({change, {Change}, _}, _) -> - Node = couch_util:get_value(<<"id">>, Change), - case Node of <<"_design/", _/binary>> -> ok; _ -> - case couch_util:get_value(deleted, Change, false) of - false -> - gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node)}); - true -> - gen_server:call(?MODULE, {remove_node, mem3_util:to_atom(Node)}) - end - end, - {ok, couch_util:get_value(<<"seq">>, Change)}; -changes_callback(timeout, _) -> - {ok, nil}. diff --git a/src/mem3_sup.erl b/src/mem3_sup.erl deleted file mode 100644 index 58d0bbf5..00000000 --- a/src/mem3_sup.erl +++ /dev/null @@ -1,21 +0,0 @@ --module(mem3_sup). --behaviour(supervisor). --export([start_link/0, init/1]). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -init(_Args) -> - Children = [ - child(mem3_events), - child(mem3_sync), - child(mem3_cache), - child(mem3_nodes) - ], - {ok, {{one_for_one,10,1}, Children}}. - -child(mem3_events) -> - MFA = {gen_event, start_link, [{local, mem3_events}]}, - {mem3_events, MFA, permanent, 1000, worker, dynamic}; -child(Child) -> - {Child, {Child, start_link, []}, permanent, 1000, worker, [Child]}. diff --git a/src/mem3_sync.erl b/src/mem3_sync.erl deleted file mode 100644 index d3b3ea51..00000000 --- a/src/mem3_sync.erl +++ /dev/null @@ -1,215 +0,0 @@ --module(mem3_sync). --behaviour(gen_server). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --export([start_link/0, get_active/0, get_queue/0, push/2, remove_node/1]). - --include("mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - --record(state, { - active = [], - count = 0, - limit, - dict = dict:new(), - waiting = [], - update_notifier -}). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -get_active() -> - gen_server:call(?MODULE, get_active). - -get_queue() -> - gen_server:call(?MODULE, get_queue). - -push(Db, Node) -> - gen_server:cast(?MODULE, {push, Db, Node}). - -remove_node(Node) -> - gen_server:cast(?MODULE, {remove_node, Node}). - -init([]) -> - process_flag(trap_exit, true), - Concurrency = couch_config:get("mem3", "sync_concurrency", "10"), - gen_event:add_handler(mem3_events, mem3_sync_event, []), - {ok, Pid} = start_update_notifier(), - spawn(fun initial_sync/0), - {ok, #state{limit = list_to_integer(Concurrency), update_notifier=Pid}}. - -handle_call(get_active, _From, State) -> - {reply, State#state.active, State}; - -handle_call(get_queue, _From, State) -> - {reply, State#state.waiting, State}. - -handle_cast({push, DbName, Node}, #state{count=Count, limit=Limit} = State) - when Count >= Limit -> - {noreply, add_to_queue(State, DbName, Node)}; - -handle_cast({push, DbName, Node}, State) -> - #state{active = L, count = C} = State, - case is_running(DbName, Node, L) of - true -> - {noreply, add_to_queue(State, DbName, Node)}; - false -> - Pid = start_push_replication(DbName, Node), - {noreply, State#state{active=[{DbName, Node, Pid}|L], count=C+1}} - end; - -handle_cast({remove_node, Node}, State) -> - Waiting = [{S,N} || {S,N} <- State#state.waiting, N =/= Node], - Dict = lists:foldl(fun(DbName,D) -> dict:erase({DbName,Node}, D) end, - State#state.dict, [S || {S,N} <- Waiting, N =:= Node]), - {noreply, State#state{dict = Dict, waiting = Waiting}}. - -handle_info({'EXIT', Pid, _}, #state{update_notifier=Pid} = State) -> - {ok, NewPid} = start_update_notifier(), - {noreply, State#state{update_notifier=NewPid}}; - -handle_info({'EXIT', Active, normal}, State) -> - handle_replication_exit(State, Active); - -handle_info({'EXIT', Active, Reason}, State) -> - case lists:keyfind(Active, 3, State#state.active) of - {OldDbName, OldNode, _} -> - ?LOG_ERROR("~p replication ~s -> ~p died:~n~p", [?MODULE, OldDbName, - OldNode, Reason]), - timer:apply_after(5000, ?MODULE, push, [OldDbName, OldNode]); - false -> ok end, - handle_replication_exit(State, Active); - -handle_info(Msg, State) -> - ?LOG_ERROR("unexpected msg at replication manager ~p", [Msg]), - {noreply, State}. - -terminate(_Reason, State) -> - [exit(Pid, shutdown) || {_,_,Pid} <- State#state.active], - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -handle_replication_exit(#state{waiting=[]} = State, Pid) -> - NewActive = lists:keydelete(Pid, 3, State#state.active), - {noreply, State#state{active=NewActive, count=length(NewActive)}}; -handle_replication_exit(State, Pid) -> - #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State, - Active1 = lists:keydelete(Pid, 3, Active), - Count = length(Active1), - NewState = if Count < Limit -> - case next_replication(Active1, Waiting) of - nil -> % all waiting replications are also active - State#state{active = Active1, count = Count}; - {DbName, Node, StillWaiting} -> - NewPid = start_push_replication(DbName, Node), - State#state{ - active = [{DbName, Node, NewPid} | Active1], - count = Count+1, - dict = dict:erase({DbName,Node}, D), - waiting = StillWaiting - } - end; - true -> - State#state{active = Active1, count=Count} - end, - {noreply, NewState}. - -start_push_replication(DbName, Node) -> - PostBody = {[ - {<<"source">>, DbName}, - {<<"target">>, {[{<<"node">>, Node}, {<<"name">>, DbName}]}}, - {<<"continuous">>, false}, - {<<"async">>, true} - ]}, - ?LOG_INFO("starting ~s -> ~p internal replication", [DbName, Node]), - UserCtx = #user_ctx{name = <<"replicator">>, roles = [<<"_admin">>]}, - case (catch couch_rep:replicate(PostBody, UserCtx)) of - Pid when is_pid(Pid) -> - link(Pid), - Pid; - {db_not_found, _Msg} -> - case couch_db:open(DbName, []) of - {ok, Db} -> - % source exists, let's (re)create the target - couch_db:close(Db), - case rpc:call(Node, couch_api, create_db, [DbName, []]) of - {ok, Target} -> - ?LOG_INFO("~p successfully created ~s on ~p", [?MODULE, DbName, - Node]), - couch_db:close(Target), - start_push_replication(DbName, Node); - file_exists -> - start_push_replication(DbName, Node); - Error -> - ?LOG_ERROR("~p couldn't create ~s on ~p because ~p", - [?MODULE, DbName, Node, Error]), - exit(shutdown) - end; - {not_found, no_db_file} -> - % source is gone, so this is a hack to skip it - ?LOG_INFO("~p tried to push ~s to ~p but it was already deleted", - [?MODULE, DbName, Node]), - spawn_link(fun() -> ok end) - end; - {node_not_connected, _} -> - % we'll get this one when the node rejoins - ?LOG_ERROR("~p exiting because ~p is not connected", [?MODULE, Node]), - spawn_link(fun() -> ok end); - CatchAll -> - ?LOG_INFO("~p strange error ~p", [?MODULE, CatchAll]), - case lists:member(Node, nodes()) of - true -> - timer:apply_after(5000, ?MODULE, push, [DbName, Node]); - false -> - ok - end, - spawn_link(fun() -> ok end) - end. - -add_to_queue(State, DbName, Node) -> - #state{dict=D, waiting=Waiting} = State, - case dict:is_key({DbName, Node}, D) of - true -> - State; - false -> - ?LOG_DEBUG("adding ~s -> ~p to internal queue", [DbName, Node]), - State#state{ - dict = dict:store({DbName,Node}, ok, D), - waiting = Waiting ++ [{DbName,Node}] - } - end. - -initial_sync() -> - Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")), - Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")), - Nodes = mem3:nodes(), - Live = nodes(), - [[push(Db, N) || Db <- [Db1,Db2]] || N <- Nodes, lists:member(N, Live)]. - -start_update_notifier() -> - Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")), - Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")), - couch_db_update_notifier:start_link(fun - ({updated, Db}) when Db == Db1; Db == Db2 -> - Nodes = mem3:nodes(), - Live = nodes(), - [?MODULE:push(Db, N) || N <- Nodes, lists:member(N, Live)]; - (_) -> ok end). - -%% @doc Finds the next {DbName,Node} pair in the list of waiting replications -%% which does not correspond to an already running replication --spec next_replication(list(), list()) -> {binary(),node(),list()} | nil. -next_replication(Active, Waiting) -> - case lists:splitwith(fun({S,N}) -> is_running(S,N,Active) end, Waiting) of - {_, []} -> - nil; - {Running, [{DbName,Node}|Rest]} -> - {DbName, Node, Running ++ Rest} - end. - -is_running(DbName, Node, ActiveList) -> - [] =/= [true || {S,N,_} <- ActiveList, S=:=DbName, N=:=Node]. diff --git a/src/mem3_sync_event.erl b/src/mem3_sync_event.erl deleted file mode 100644 index 45fcb8aa..00000000 --- a/src/mem3_sync_event.erl +++ /dev/null @@ -1,44 +0,0 @@ --module(mem3_sync_event). --behaviour(gen_event). - --export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, - code_change/3]). - -init(_) -> - {ok, nil}. - -handle_event({add_node, Node}, State) -> - Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")), - Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")), - [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]], - {ok, State}; - -handle_event({nodeup, Node}, State) -> - case lists:member(Node, mem3:nodes()) of - true -> - Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")), - Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")), - [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]]; - false -> - ok - end, - {ok, State}; - -handle_event({Down, Node}, State) when Down == nodedown; Down == remove_node -> - mem3_sync:remove_node(Node), - {ok, State}; - -handle_event(_Event, State) -> - {ok, State}. - -handle_call(_Request, State) -> - {ok, ok, State}. - -handle_info(_Info, State) -> - {ok, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. diff --git a/src/mem3_util.erl b/src/mem3_util.erl deleted file mode 100644 index 2ed84db6..00000000 --- a/src/mem3_util.erl +++ /dev/null @@ -1,139 +0,0 @@ --module(mem3_util). - --export([hash/1, name_shard/1, create_partition_map/4, build_shards/2, - n_val/2, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1, - load_shards_from_disk/1, load_shards_from_disk/2]). - --define(RINGTOP, 2 bsl 31). % CRC32 space - --include("mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -hash(Item) when is_binary(Item) -> - erlang:crc32(Item); -hash(Item) -> - erlang:crc32(term_to_binary(Item)). - -name_shard(#shard{dbname = DbName, range=[B,E]} = Shard) -> - Name = ["shards/", couch_util:to_hex(<<B:32/integer>>), "-", - couch_util:to_hex(<<E:32/integer>>), "/", DbName], - Shard#shard{name = ?l2b(Name)}. - -create_partition_map(DbName, N, Q, Nodes) -> - UniqueShards = make_key_ranges((?RINGTOP) div Q, 0, []), - Shards0 = lists:flatten([lists:duplicate(N, S) || S <- UniqueShards]), - Shards1 = attach_nodes(Shards0, [], Nodes, []), - [name_shard(S#shard{dbname=DbName}) || S <- Shards1]. - -make_key_ranges(_, CurrentPos, Acc) when CurrentPos >= ?RINGTOP -> - Acc; -make_key_ranges(Increment, Start, Acc) -> - case Start + 2*Increment of - X when X > ?RINGTOP -> - End = ?RINGTOP - 1; - _ -> - End = Start + Increment - 1 - end, - make_key_ranges(Increment, End+1, [#shard{range=[Start, End]} | Acc]). - -attach_nodes([], Acc, _, _) -> - lists:reverse(Acc); -attach_nodes(Shards, Acc, [], UsedNodes) -> - attach_nodes(Shards, Acc, lists:reverse(UsedNodes), []); -attach_nodes([S | Rest], Acc, [Node | Nodes], UsedNodes) -> - attach_nodes(Rest, [S#shard{node=Node} | Acc], Nodes, [Node | UsedNodes]). - -write_db_doc(Doc) -> - {ok, Db} = couch_db:open(<<"dbs">>, []), - try - update_db_doc(Db, Doc) - catch conflict -> - ?LOG_ERROR("conflict writing db doc, must be a race", []) - after - couch_db:close(Db) - end. - -update_db_doc(Db, #doc{id=Id, body=Body} = Doc) -> - case couch_db:open_doc(Db, Id, []) of - {not_found, _} -> - {ok, _} = couch_db:update_doc(Db, Doc, []); - {ok, #doc{body=Body}} -> - ok; - {ok, OldDoc} -> - {ok, _} = couch_db:update_doc(Db, OldDoc#doc{body=Body}, []) - end. - -delete_db_doc(DocId) -> - {ok, Db} = couch_db:open(<<"dbs">>, []), - try - delete_db_doc(Db, DocId) - catch conflict -> - ok - after - couch_db:close(Db) - end. - -delete_db_doc(Db, DocId) -> - case couch_db:open_doc(Db, DocId, []) of - {not_found, _} -> - ok; - {ok, OldDoc} -> - {ok, _} = couch_db:update_doc(Db, OldDoc#doc{deleted=true}, []) - end. - -build_shards(DbName, DocProps) -> - {ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}), - lists:flatmap(fun({Node, Ranges}) -> - lists:map(fun(Range) -> - [B,E] = string:tokens(?b2l(Range), "-"), - Beg = httpd_util:hexlist_to_integer(B), - End = httpd_util:hexlist_to_integer(E), - name_shard(#shard{ - dbname = DbName, - node = to_atom(Node), - range = [Beg, End] - }) - end, Ranges) - end, ByNode). - -to_atom(Node) when is_binary(Node) -> - list_to_atom(binary_to_list(Node)); -to_atom(Node) when is_atom(Node) -> - Node. - -to_integer(N) when is_integer(N) -> - N; -to_integer(N) when is_binary(N) -> - list_to_integer(binary_to_list(N)); -to_integer(N) when is_list(N) -> - list_to_integer(N). - -n_val(undefined, NodeCount) -> - n_val(couch_config:get("cluster", "n", "3"), NodeCount); -n_val(N, NodeCount) when is_list(N) -> - n_val(list_to_integer(N), NodeCount); -n_val(N, NodeCount) when is_integer(NodeCount), N > NodeCount -> - ?LOG_ERROR("Request to create N=~p DB but only ~p node(s)", [N, NodeCount]), - NodeCount; -n_val(N, _) when N < 1 -> - 1; -n_val(N, _) -> - N. - -load_shards_from_disk(DbName) when is_binary(DbName) -> - {ok, Db} = couch_db:open(<<"dbs">>, []), - try load_shards_from_db(Db, DbName) after couch_db:close(Db) end. - -load_shards_from_db(#db{} = ShardDb, DbName) -> - case couch_db:open_doc(ShardDb, DbName, []) of - {ok, #doc{body = {Props}}} -> - ?LOG_INFO("dbs cache miss for ~s", [DbName]), - build_shards(DbName, Props); - {not_found, _} -> - erlang:error(database_does_not_exist) - end. - -load_shards_from_disk(DbName, DocId)-> - Shards = load_shards_from_disk(DbName), - HashKey = hash(DocId), - [S || #shard{range = [B,E]} = S <- Shards, B < HashKey, HashKey =< E]. |