Diffstat (limited to 'src')
26 files changed, 1481 insertions, 2299 deletions
diff --git a/src/couchdb/priv/couch_js/http.c b/src/couchdb/priv/couch_js/http.c new file mode 100644 index 00000000..6c2a8a82 --- /dev/null +++ b/src/couchdb/priv/couch_js/http.c @@ -0,0 +1,675 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <jsapi.h> +#include <curl/curl.h> + +#include "utf8.h" + +#ifdef XP_WIN +// Map some of the string function names to things which exist on Windows +#define strcasecmp _strcmpi +#define strncasecmp _strnicmp +#define snprintf _snprintf +#endif + +typedef struct curl_slist CurlHeaders; + +typedef struct { + int method; + char* url; + CurlHeaders* req_headers; + jsint last_status; +} HTTPData; + +char* METHODS[] = {"GET", "HEAD", "POST", "PUT", "DELETE", "COPY", NULL}; + +#define GET 0 +#define HEAD 1 +#define POST 2 +#define PUT 3 +#define DELETE 4 +#define COPY 5 + +static JSBool +go(JSContext* cx, JSObject* obj, HTTPData* http, char* body, size_t blen); + +static JSString* +str_from_binary(JSContext* cx, char* data, size_t length); + +static JSBool +constructor(JSContext* cx, JSObject* obj, uintN argc, jsval* argv, jsval* rval) +{ + HTTPData* http = NULL; + JSBool ret = JS_FALSE; + + http = (HTTPData*) malloc(sizeof(HTTPData)); + if(!http) + { + JS_ReportError(cx, "Failed to create CouchHTTP instance."); + goto error; + } + + http->method = -1; + http->url = NULL; + http->req_headers = NULL; + http->last_status = -1; + + if(!JS_SetPrivate(cx, obj, http)) + { + JS_ReportError(cx, "Failed to set private CouchHTTP data."); + goto error; + } + + ret = JS_TRUE; + goto success; + +error: + if(http) free(http); + +success: + return ret; +} + +static void +destructor(JSContext* cx, JSObject* obj) +{ + HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj); + if(!http) + { + fprintf(stderr, "Unable to destroy invalid CouchHTTP instance.\n"); + } + else + { + if(http->url) free(http->url); + if(http->req_headers) curl_slist_free_all(http->req_headers); + free(http); + } +} + +static JSBool +open(JSContext* cx, JSObject* obj, uintN argc, jsval* argv, jsval* rval) +{ + HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj); + char* method = NULL; + char* url = NULL; + JSBool ret = JS_FALSE; + int methid; + + if(!http) + { + JS_ReportError(cx, "Invalid CouchHTTP instance."); + goto done; + } + + if(argv[0] == JSVAL_VOID) + { + JS_ReportError(cx, "You must specify a method."); + goto done; + } + + method = enc_string(cx, argv[0], NULL); + if(!method) + { + JS_ReportError(cx, "Failed to encode method."); + goto done; + } + + for(methid = 0; METHODS[methid] != NULL; methid++) + { + if(strcasecmp(METHODS[methid], method) == 0) break; + } + + if(methid > COPY) + { + JS_ReportError(cx, "Invalid method specified."); + goto done; + } + + http->method = methid; + + if(argv[1] == JSVAL_VOID) + { + JS_ReportError(cx, "You must specify a URL."); + goto done; + } + + if(http->url) + { + free(http->url); + http->url = NULL; + } + + http->url = enc_string(cx, argv[1], NULL); + if(!http->url) + { + 
JS_ReportError(cx, "Failed to encode URL."); + goto done; + } + + if(argv[2] != JSVAL_VOID && argv[2] != JSVAL_FALSE) + { + JS_ReportError(cx, "Synchronous flag must be false if specified."); + goto done; + } + + if(http->req_headers) + { + curl_slist_free_all(http->req_headers); + http->req_headers = NULL; + } + + // Disable Expect: 100-continue + http->req_headers = curl_slist_append(http->req_headers, "Expect:"); + + ret = JS_TRUE; + +done: + if(method) free(method); + return ret; +} + +static JSBool +setheader(JSContext* cx, JSObject* obj, uintN argc, jsval* argv, jsval* rval) +{ + HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj); + char* keystr = NULL; + char* valstr = NULL; + char* hdrbuf = NULL; + size_t hdrlen = -1; + JSBool ret = JS_FALSE; + + if(!http) + { + JS_ReportError(cx, "Invalid CouchHTTP instance."); + goto done; + } + + if(argv[0] == JSVAL_VOID) + { + JS_ReportError(cx, "You must speciy a header name."); + goto done; + } + + keystr = enc_string(cx, argv[0], NULL); + if(!keystr) + { + JS_ReportError(cx, "Failed to encode header name."); + goto done; + } + + if(argv[1] == JSVAL_VOID) + { + JS_ReportError(cx, "You must specify a header value."); + goto done; + } + + valstr = enc_string(cx, argv[1], NULL); + if(!valstr) + { + JS_ReportError(cx, "Failed to encode header value."); + goto done; + } + + hdrlen = strlen(keystr) + strlen(valstr) + 3; + hdrbuf = (char*) malloc(hdrlen * sizeof(char)); + if(!hdrbuf) + { + JS_ReportError(cx, "Failed to allocate header buffer."); + goto done; + } + + snprintf(hdrbuf, hdrlen, "%s: %s", keystr, valstr); + http->req_headers = curl_slist_append(http->req_headers, hdrbuf); + + ret = JS_TRUE; + +done: + if(keystr) free(keystr); + if(valstr) free(valstr); + if(hdrbuf) free(hdrbuf); + + return ret; +} + +static JSBool +sendreq(JSContext* cx, JSObject* obj, uintN argc, jsval* argv, jsval* rval) +{ + HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj); + char* body = NULL; + size_t bodylen = 0; + JSBool ret = JS_FALSE; + + if(!http) + { + JS_ReportError(cx, "Invalid CouchHTTP instance."); + goto done; + } + + if(argv[0] != JSVAL_VOID && argv[0] != JS_GetEmptyStringValue(cx)) + { + body = enc_string(cx, argv[0], &bodylen); + if(!body) + { + JS_ReportError(cx, "Failed to encode body."); + goto done; + } + } + + ret = go(cx, obj, http, body, bodylen); + +done: + if(body) free(body); + return ret; +} + +static JSBool +status(JSContext* cx, JSObject* obj, jsval idval, jsval* vp) +{ + HTTPData* http = (HTTPData*) JS_GetPrivate(cx, obj); + + if(!http) + { + JS_ReportError(cx, "Invalid CouchHTTP instance."); + return JS_FALSE; + } + + if(INT_FITS_IN_JSVAL(http->last_status)) + { + *vp = INT_TO_JSVAL(http->last_status); + return JS_TRUE; + } + else + { + JS_ReportError(cx, "INTERNAL: Invalid last_status"); + return JS_FALSE; + } +} + +JSClass CouchHTTPClass = { + "CouchHTTP", + JSCLASS_HAS_PRIVATE + | JSCLASS_CONSTRUCT_PROTOTYPE + | JSCLASS_HAS_RESERVED_SLOTS(2), + JS_PropertyStub, + JS_PropertyStub, + JS_PropertyStub, + JS_PropertyStub, + JS_EnumerateStub, + JS_ResolveStub, + JS_ConvertStub, + destructor, + JSCLASS_NO_OPTIONAL_MEMBERS +}; + +JSPropertySpec CouchHTTPProperties[] = { + {"status", 0, JSPROP_READONLY, status, NULL}, + {0, 0, 0, 0, 0} +}; + +JSFunctionSpec CouchHTTPFunctions[] = { + {"_open", open, 3, 0, 0}, + {"_setRequestHeader", setheader, 2, 0, 0}, + {"_send", sendreq, 1, 0, 0}, + {0, 0, 0, 0, 0} +}; + +JSObject* +install_http(JSContext* cx, JSObject* glbl) +{ + JSObject* klass = NULL; + HTTPData* http = NULL; + + klass = 
JS_InitClass( + cx, + glbl, + NULL, + &CouchHTTPClass, + constructor, + 0, + CouchHTTPProperties, + CouchHTTPFunctions, + NULL, + NULL + ); + + if(!klass) + { + fprintf(stderr, "Failed to initialize CouchHTTP class.\n"); + return NULL; + } + + return klass; +} + + +// Curl Helpers + +typedef struct { + HTTPData* http; + JSContext* cx; + JSObject* resp_headers; + char* sendbuf; + size_t sendlen; + size_t sent; + char* recvbuf; + size_t recvlen; + size_t read; +} CurlState; + +/* + * I really hate doing this but this doesn't have to be + * uber awesome, it just has to work. + */ +CURL* HTTP_HANDLE = NULL; +char ERRBUF[CURL_ERROR_SIZE]; + +static size_t send_body(void *ptr, size_t size, size_t nmem, void *data); +static int seek_body(void *ptr, curl_off_t offset, int origin); +static size_t recv_body(void *ptr, size_t size, size_t nmem, void *data); +static size_t recv_header(void *ptr, size_t size, size_t nmem, void *data); + +static JSBool +go(JSContext* cx, JSObject* obj, HTTPData* http, char* body, size_t bodylen) +{ + CurlState state; + JSString* jsbody; + JSBool ret = JS_FALSE; + jsval tmp; + + state.cx = cx; + state.http = http; + + state.sendbuf = body; + state.sendlen = bodylen; + state.sent = 0; + + state.recvbuf = NULL; + state.recvlen = 0; + state.read = 0; + + if(HTTP_HANDLE == NULL) + { + HTTP_HANDLE = curl_easy_init(); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_READFUNCTION, send_body); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_SEEKFUNCTION, + (curl_seek_callback) seek_body); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_HEADERFUNCTION, recv_header); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_WRITEFUNCTION, recv_body); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_NOPROGRESS, 1); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_ERRORBUFFER, ERRBUF); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_COOKIEFILE, ""); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_USERAGENT, + "CouchHTTP Client - Relax"); + } + + if(!HTTP_HANDLE) + { + JS_ReportError(cx, "Failed to initialize cURL handle."); + goto done; + } + + if(http->method < 0 || http->method > COPY) + { + JS_ReportError(cx, "INTERNAL: Unknown method."); + goto done; + } + + curl_easy_setopt(HTTP_HANDLE, CURLOPT_CUSTOMREQUEST, METHODS[http->method]); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_NOBODY, 0); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_FOLLOWLOCATION, 1); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_UPLOAD, 0); + + if(http->method == HEAD) + { + curl_easy_setopt(HTTP_HANDLE, CURLOPT_NOBODY, 1); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_FOLLOWLOCATION, 0); + } + else if(http->method == POST || http->method == PUT) + { + curl_easy_setopt(HTTP_HANDLE, CURLOPT_UPLOAD, 1); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_FOLLOWLOCATION, 0); + } + + if(body && bodylen) + { + curl_easy_setopt(HTTP_HANDLE, CURLOPT_INFILESIZE, bodylen); + } + else + { + curl_easy_setopt(HTTP_HANDLE, CURLOPT_INFILESIZE, 0); + } + + //curl_easy_setopt(HTTP_HANDLE, CURLOPT_VERBOSE, 1); + + curl_easy_setopt(HTTP_HANDLE, CURLOPT_URL, http->url); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_HTTPHEADER, http->req_headers); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_READDATA, &state); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_SEEKDATA, &state); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_WRITEHEADER, &state); + curl_easy_setopt(HTTP_HANDLE, CURLOPT_WRITEDATA, &state); + + if(curl_easy_perform(HTTP_HANDLE) != 0) + { + JS_ReportError(cx, "Failed to execute HTTP request: %s", ERRBUF); + goto done; + } + + if(!state.resp_headers) + { + JS_ReportError(cx, "Failed 
to recieve HTTP headers."); + goto done; + } + + tmp = OBJECT_TO_JSVAL(state.resp_headers); + if(!JS_DefineProperty( + cx, + obj, + "_headers", + tmp, + NULL, + NULL, + JSPROP_READONLY + )) + { + JS_ReportError(cx, "INTERNAL: Failed to set response headers."); + goto done; + } + + if(state.recvbuf) // Is good enough? + { + state.recvbuf[state.read] = '\0'; + jsbody = dec_string(cx, state.recvbuf, state.read+1); + if(!jsbody) + { + // If we can't decode the body as UTF-8 we forcefully + // convert it to a string by just forcing each byte + // to a jschar. + jsbody = str_from_binary(cx, state.recvbuf, state.read); + if(!jsbody) { + if(!JS_IsExceptionPending(cx)) { + JS_ReportError(cx, "INTERNAL: Failed to decode body."); + } + goto done; + } + } + tmp = STRING_TO_JSVAL(jsbody); + } + else + { + tmp = JS_GetEmptyStringValue(cx); + } + + if(!JS_DefineProperty( + cx, + obj, + "responseText", + tmp, + NULL, + NULL, + JSPROP_READONLY + )) + { + JS_ReportError(cx, "INTERNAL: Failed to set responseText."); + goto done; + } + + ret = JS_TRUE; + +done: + if(state.recvbuf) JS_free(cx, state.recvbuf); + return ret; +} + +static size_t +send_body(void *ptr, size_t size, size_t nmem, void *data) +{ + CurlState* state = (CurlState*) data; + size_t length = size * nmem; + size_t towrite = state->sendlen - state->sent; + if(towrite == 0) + { + return 0; + } + + if(length < towrite) towrite = length; + + //fprintf(stderr, "%lu %lu %lu %lu\n", state->bodyused, state->bodyread, length, towrite); + + memcpy(ptr, state->sendbuf + state->sent, towrite); + state->sent += towrite; + + return towrite; +} + +static int +seek_body(void* ptr, curl_off_t offset, int origin) +{ + CurlState* state = (CurlState*) ptr; + if(origin != SEEK_SET) return -1; + + state->sent = (size_t) offset; + return (int) state->sent; +} + +static size_t +recv_header(void *ptr, size_t size, size_t nmem, void *data) +{ + CurlState* state = (CurlState*) data; + char code[4]; + char* header = (char*) ptr; + size_t length = size * nmem; + size_t index = 0; + JSString* hdr = NULL; + jsuint hdrlen; + jsval hdrval; + + if(length > 7 && strncasecmp(header, "HTTP/1.", 7) == 0) + { + if(length < 12) + { + return CURLE_WRITE_ERROR; + } + + memcpy(code, header+9, 3*sizeof(char)); + code[3] = '\0'; + state->http->last_status = atoi(code); + + state->resp_headers = JS_NewArrayObject(state->cx, 0, NULL); + if(!state->resp_headers) + { + return CURLE_WRITE_ERROR; + } + + return length; + } + + // We get a notice at the \r\n\r\n after headers. + if(length <= 2) + { + return length; + } + + // Append the new header to our array. + hdr = dec_string(state->cx, header, length); + if(!hdr) + { + return CURLE_WRITE_ERROR; + } + + if(!JS_GetArrayLength(state->cx, state->resp_headers, &hdrlen)) + { + return CURLE_WRITE_ERROR; + } + + hdrval = STRING_TO_JSVAL(hdr); + if(!JS_SetElement(state->cx, state->resp_headers, hdrlen, &hdrval)) + { + return CURLE_WRITE_ERROR; + } + + return length; +} + +static size_t +recv_body(void *ptr, size_t size, size_t nmem, void *data) +{ + CurlState* state = (CurlState*) data; + size_t length = size * nmem; + char* tmp = NULL; + + if(!state->recvbuf) + { + state->recvlen = 4096; + state->read = 0; + state->recvbuf = JS_malloc(state->cx, state->recvlen); + } + + if(!state->recvbuf) + { + return CURLE_WRITE_ERROR; + } + + // +1 so we can add '\0' back up in the go function. 
+ while(length+1 > state->recvlen - state->read) state->recvlen *= 2; + tmp = JS_realloc(state->cx, state->recvbuf, state->recvlen); + if(!tmp) return CURLE_WRITE_ERROR; + state->recvbuf = tmp; + + memcpy(state->recvbuf + state->read, ptr, length); + state->read += length; + return length; +} + +JSString* +str_from_binary(JSContext* cx, char* data, size_t length) +{ + jschar* conv = (jschar*) JS_malloc(cx, length * sizeof(jschar)); + JSString* ret = NULL; + size_t i; + + if(!conv) return NULL; + + for(i = 0; i < length; i++) + { + conv[i] = (jschar) data[i]; + } + + ret = JS_NewUCString(cx, conv, length); + if(!ret) JS_free(cx, conv); + + return ret; +} diff --git a/src/couchdb/priv/couch_js/http.h b/src/couchdb/priv/couch_js/http.h new file mode 100644 index 00000000..b5f8c70f --- /dev/null +++ b/src/couchdb/priv/couch_js/http.h @@ -0,0 +1,18 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#ifndef COUCH_JS_HTTP_H +#define COUCH_JS_HTTP_H + +JSObject* install_http(JSContext* cx, JSObject* global); + +#endif
\ No newline at end of file
diff --git a/src/couchdb/priv/couch_js/main.c b/src/couchdb/priv/couch_js/main.c
new file mode 100644
index 00000000..376aa15b
--- /dev/null
+++ b/src/couchdb/priv/couch_js/main.c
@@ -0,0 +1,338 @@
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <jsapi.h>
+#include "config.h"
+
+#include "utf8.h"
+#include "http.h"
+
+int gExitCode = 0;
+
+#ifdef JS_THREADSAFE
+#define SETUP_REQUEST(cx) \
+    JS_SetContextThread(cx); \
+    JS_BeginRequest(cx);
+#define FINISH_REQUEST(cx) \
+    JS_EndRequest(cx); \
+    JS_ClearContextThread(cx);
+#else
+#define SETUP_REQUEST(cx)
+#define FINISH_REQUEST(cx)
+#endif
+
+static JSBool
+evalcx(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval)
+{
+    JSString *str;
+    JSObject *sandbox;
+    JSContext *subcx;
+    const jschar *src;
+    size_t srclen;
+    JSBool ret = JS_FALSE;
+    jsval v;
+
+    sandbox = NULL;
+    if(!JS_ConvertArguments(cx, argc, argv, "S / o", &str, &sandbox))
+    {
+        return JS_FALSE;
+    }
+
+    subcx = JS_NewContext(JS_GetRuntime(cx), 8L * 1024L);
+    if(!subcx)
+    {
+        JS_ReportOutOfMemory(cx);
+        return JS_FALSE;
+    }
+
+    SETUP_REQUEST(subcx);
+
+    src = JS_GetStringChars(str);
+    srclen = JS_GetStringLength(str);
+
+    if(!sandbox)
+    {
+        sandbox = JS_NewObject(subcx, NULL, NULL, NULL);
+        if(!sandbox || !JS_InitStandardClasses(subcx, sandbox)) goto done;
+    }
+
+    if(srclen == 0)
+    {
+        *rval = OBJECT_TO_JSVAL(sandbox);
+    }
+    else
+    {
+        JS_EvaluateUCScript(subcx, sandbox, src, srclen, NULL, 0, rval);
+    }
+
+    ret = JS_TRUE;
+
+done:
+    FINISH_REQUEST(subcx);
+    JS_DestroyContext(subcx);
+    return ret;
+}
+
+static JSBool
+gc(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval)
+{
+    JS_GC(cx);
+    return JS_TRUE;
+}
+
+static JSBool
+print(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval)
+{
+    uintN i;
+    char *bytes;
+
+    for(i = 0; i < argc; i++)
+    {
+        bytes = enc_string(cx, argv[i], NULL);
+        if(!bytes) return JS_FALSE;
+
+        fprintf(stdout, "%s%s", i ? " " : "", bytes);
+        JS_free(cx, bytes);
+    }
+
+    fputc('\n', stdout);
+    fflush(stdout);
+    return JS_TRUE;
+}
+
+static JSBool
+quit(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval)
+{
+    JS_ConvertArguments(cx, argc, argv, "/ i", &gExitCode);
+    return JS_FALSE;
+}
+
+static char*
+readfp(JSContext* cx, FILE* fp, size_t* buflen)
+{
+    char* bytes = NULL;
+    char* tmp = NULL;
+    size_t used = 0;
+    size_t byteslen = 256;
+    size_t readlen = 0;
+
+    bytes = JS_malloc(cx, byteslen);
+    if(bytes == NULL) return NULL;
+
+    // Read from the fp argument (the sole caller passes stdin).
+    while((readlen = js_fgets(bytes+used, byteslen-used, fp)) > 0)
+    {
+        used += readlen;
+
+        if(bytes[used-1] == '\n')
+        {
+            bytes[used-1] = '\0';
+            break;
+        }
+
+        // Double our buffer and read more.
+ byteslen *= 2; + tmp = JS_realloc(cx, bytes, byteslen); + if(!tmp) + { + JS_free(cx, bytes); + return NULL; + } + bytes = tmp; + } + + *buflen = used; + return bytes; +} + +static JSBool +readline(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval) { + jschar *chars; + JSString *str; + char* bytes; + char* tmp; + size_t byteslen; + + /* GC Occasionally */ + JS_MaybeGC(cx); + + bytes = readfp(cx, stdin, &byteslen); + if(!bytes) return JS_FALSE; + + /* Treat the empty string specially */ + if(byteslen == 0) + { + *rval = JS_GetEmptyStringValue(cx); + JS_free(cx, bytes); + return JS_TRUE; + } + + /* Shrink the buffer to the real size */ + tmp = JS_realloc(cx, bytes, byteslen); + if(!tmp) + { + JS_free(cx, bytes); + return JS_FALSE; + } + bytes = tmp; + + str = dec_string(cx, bytes, byteslen); + JS_free(cx, bytes); + + if(!str) return JS_FALSE; + + *rval = STRING_TO_JSVAL(str); + + return JS_TRUE; +} + +static JSBool +seal(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval) { + JSObject *target; + JSBool deep = JS_FALSE; + + if (!JS_ConvertArguments(cx, argc, argv, "o/b", &target, &deep)) + return JS_FALSE; + if (!target) + return JS_TRUE; + return JS_SealObject(cx, target, deep); +} + +static void +execute_script(JSContext *cx, JSObject *obj, const char *filename) { + FILE *file; + JSScript *script; + jsval result; + + if(!filename || strcmp(filename, "-") == 0) + { + file = stdin; + } + else + { + file = fopen(filename, "r"); + if (!file) + { + fprintf(stderr, "could not open script file %s\n", filename); + gExitCode = 1; + return; + } + } + + script = JS_CompileFileHandle(cx, obj, filename, file); + if(script) + { + JS_ExecuteScript(cx, obj, script, &result); + JS_DestroyScript(cx, script); + } +} + +static void +printerror(JSContext *cx, const char *mesg, JSErrorReport *report) +{ + if(!report || !JSREPORT_IS_WARNING(report->flags)) + { + fprintf(stderr, "%s\n", mesg); + } +} + +static JSFunctionSpec global_functions[] = { + {"evalcx", evalcx, 0, 0, 0}, + {"gc", gc, 0, 0, 0}, + {"print", print, 0, 0, 0}, + {"quit", quit, 0, 0, 0}, + {"readline", readline, 0, 0, 0}, + {"seal", seal, 0, 0, 0}, + {0, 0, 0, 0, 0} +}; + +static JSClass global_class = { + "GlobalClass", + JSCLASS_GLOBAL_FLAGS, + JS_PropertyStub, + JS_PropertyStub, + JS_PropertyStub, + JS_PropertyStub, + JS_EnumerateStub, + JS_ResolveStub, + JS_ConvertStub, + JS_FinalizeStub, + JSCLASS_NO_OPTIONAL_MEMBERS +}; + +int +main(int argc, const char * argv[]) +{ + JSRuntime* rt = NULL; + JSContext* cx = NULL; + JSObject* global = NULL; + JSFunctionSpec* sp = NULL; + int i = 0; + + rt = JS_NewRuntime(64L * 1024L * 1024L); + if (!rt) return 1; + + cx = JS_NewContext(rt, 8L * 1024L); + if (!cx) return 1; + + JS_SetErrorReporter(cx, printerror); + JS_ToggleOptions(cx, JSOPTION_XML); + + SETUP_REQUEST(cx); + + global = JS_NewObject(cx, &global_class, NULL, NULL); + if (!global) return 1; + if (!JS_InitStandardClasses(cx, global)) return 1; + + for(sp = global_functions; sp->name != NULL; sp++) + { + if(!JS_DefineFunction(cx, global, + sp->name, sp->call, sp->nargs, sp->flags)) + { + fprintf(stderr, "Failed to create function: %s\n", sp->name); + return 1; + } + } + + if(!install_http(cx, global)) + { + return 1; + } + + JS_SetGlobalObject(cx, global); + + if(argc > 2) + { + fprintf(stderr, "incorrect number of arguments\n\n"); + fprintf(stderr, "usage: %s <scriptfile>\n", argv[0]); + return 2; + } + + if(argc == 0) + { + execute_script(cx, global, NULL); + } + else + { + execute_script(cx, global, argv[1]); 
+    }
+
+    FINISH_REQUEST(cx);
+
+    JS_DestroyContext(cx);
+    JS_DestroyRuntime(rt);
+    JS_ShutDown();
+
+    return gExitCode;
+}
diff --git a/src/couchdb/priv/couch_js/utf8.c b/src/couchdb/priv/couch_js/utf8.c
new file mode 100644
index 00000000..699a6fee
--- /dev/null
+++ b/src/couchdb/priv/couch_js/utf8.c
@@ -0,0 +1,286 @@
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+#include <jsapi.h>
+
+static int
+enc_char(uint8 *utf8Buffer, uint32 ucs4Char)
+{
+    int utf8Length = 1;
+
+    if (ucs4Char < 0x80)
+    {
+        *utf8Buffer = (uint8)ucs4Char;
+    }
+    else
+    {
+        int i;
+        uint32 a = ucs4Char >> 11;
+        utf8Length = 2;
+        while(a)
+        {
+            a >>= 5;
+            utf8Length++;
+        }
+        i = utf8Length;
+        while(--i)
+        {
+            utf8Buffer[i] = (uint8)((ucs4Char & 0x3F) | 0x80);
+            ucs4Char >>= 6;
+        }
+        *utf8Buffer = (uint8)(0x100 - (1 << (8-utf8Length)) + ucs4Char);
+    }
+
+    return utf8Length;
+}
+
+static JSBool
+enc_charbuf(const jschar* src, size_t srclen, char* dst, size_t* dstlenp)
+{
+    size_t i;
+    size_t utf8Len;
+    size_t dstlen = *dstlenp;
+    size_t origDstlen = dstlen;
+    jschar c;
+    jschar c2;
+    uint32 v;
+    uint8 utf8buf[6];
+
+    if(!dst)
+    {
+        dstlen = origDstlen = (size_t) -1;
+    }
+
+    while(srclen)
+    {
+        c = *src++;
+        srclen--;
+
+        if((c >= 0xDC00) && (c <= 0xDFFF)) goto bad_surrogate;
+
+        if(c < 0xD800 || c > 0xDBFF)
+        {
+            v = c;
+        }
+        else
+        {
+            if(srclen < 1) goto buffer_too_small;
+            c2 = *src++;
+            srclen--;
+            if ((c2 < 0xDC00) || (c2 > 0xDFFF))
+            {
+                c = c2;
+                goto bad_surrogate;
+            }
+            v = ((c - 0xD800) << 10) + (c2 - 0xDC00) + 0x10000;
+        }
+        if(v < 0x0080)
+        {
+            /* no encoding necessary - performance hack */
+            if(!dstlen) goto buffer_too_small;
+            if(dst) *dst++ = (char) v;
+            utf8Len = 1;
+        }
+        else
+        {
+            utf8Len = enc_char(utf8buf, v);
+            if(utf8Len > dstlen) goto buffer_too_small;
+            if(dst)
+            {
+                for (i = 0; i < utf8Len; i++)
+                {
+                    *dst++ = (char) utf8buf[i];
+                }
+            }
+        }
+        dstlen -= utf8Len;
+    }
+
+    *dstlenp = (origDstlen - dstlen);
+    return JS_TRUE;
+
+bad_surrogate:
+    *dstlenp = (origDstlen - dstlen);
+    return JS_FALSE;
+
+buffer_too_small:
+    *dstlenp = (origDstlen - dstlen);
+    return JS_FALSE;
+}
+
+char*
+enc_string(JSContext* cx, jsval arg, size_t* buflen)
+{
+    JSString* str = NULL;
+    jschar* src = NULL;
+    char* bytes = NULL;
+    size_t srclen = 0;
+    size_t byteslen = 0;
+
+    str = JS_ValueToString(cx, arg);
+    if(!str) goto error;
+
+    src = JS_GetStringChars(str);
+    srclen = JS_GetStringLength(str);
+
+    if(!enc_charbuf(src, srclen, NULL, &byteslen)) goto error;
+
+    bytes = JS_malloc(cx, (byteslen) + 1);
+    if(!bytes) goto error; // don't write through a NULL pointer on OOM
+    bytes[byteslen] = 0;
+
+    if(!enc_charbuf(src, srclen, bytes, &byteslen)) goto error;
+
+    if(buflen) *buflen = byteslen;
+    goto success;
+
+error:
+    if(bytes != NULL) JS_free(cx, bytes);
+    bytes = NULL;
+
+success:
+    return bytes;
+}
+
+static uint32
+dec_char(const uint8 *utf8Buffer, int utf8Length)
+{
+    uint32 ucs4Char;
+    uint32 minucs4Char;
+
+    /* from Unicode 3.1, non-shortest form is illegal */
+    /* minimum legal code point for 2- through 6-byte sequences */
+    static const uint32 minucs4Table[] = {
+        0x00000080, 0x00000800, 0x00010000, 0x00200000, 0x04000000
+    };
+
+    if (utf8Length == 1)
+    {
+        ucs4Char = *utf8Buffer;
+    }
+    else
+    {
+        ucs4Char = *utf8Buffer++ & ((1<<(7-utf8Length))-1);
+        minucs4Char = minucs4Table[utf8Length-2];
+        while(--utf8Length)
+        {
+            ucs4Char = ucs4Char<<6 | (*utf8Buffer++ & 0x3F);
+        }
+        if(ucs4Char < minucs4Char || ucs4Char == 0xFFFE || ucs4Char == 0xFFFF)
+        {
+            ucs4Char = 0xFFFD;
+        }
+    }
+
+    return ucs4Char;
+}
+
+static JSBool
+dec_charbuf(const char *src, size_t srclen, jschar *dst, size_t *dstlenp)
+{
+    uint32 v;
+    size_t offset = 0;
+    size_t j;
+    size_t n;
+    size_t dstlen = *dstlenp;
+    size_t origDstlen = dstlen;
+
+    if(!dst) dstlen = origDstlen = (size_t) -1;
+
+    while(srclen)
+    {
+        v = (uint8) *src;
+        n = 1;
+
+        if(v & 0x80)
+        {
+            while(v & (0x80 >> n))
+            {
+                n++;
+            }
+
+            if(n > srclen) goto buffer_too_small;
+            if(n == 1 || n > 6) goto bad_character;
+
+            for(j = 1; j < n; j++)
+            {
+                if((src[j] & 0xC0) != 0x80) goto bad_character;
+            }
+
+            v = dec_char((const uint8 *) src, n);
+            if(v >= 0x10000)
+            {
+                v -= 0x10000;
+
+                if(v > 0xFFFFF || dstlen < 2)
+                {
+                    *dstlenp = (origDstlen - dstlen);
+                    return JS_FALSE;
+                }
+
+                if(dstlen < 2) goto buffer_too_small;
+
+                if(dst)
+                {
+                    *dst++ = (jschar)((v >> 10) + 0xD800);
+                    v = (jschar)((v & 0x3FF) + 0xDC00);
+                }
+                dstlen--;
+            }
+        }
+
+        if(!dstlen) goto buffer_too_small;
+        if(dst) *dst++ = (jschar) v;
+
+        dstlen--;
+        offset += n;
+        src += n;
+        srclen -= n;
+    }
+
+    *dstlenp = (origDstlen - dstlen);
+    return JS_TRUE;
+
+bad_character:
+    *dstlenp = (origDstlen - dstlen);
+    return JS_FALSE;
+
+buffer_too_small:
+    *dstlenp = (origDstlen - dstlen);
+    return JS_FALSE;
+}
+
+JSString*
+dec_string(JSContext* cx, const char* bytes, size_t byteslen)
+{
+    JSString* str = NULL;
+    jschar* chars = NULL;
+    size_t charslen;
+
+    if(!dec_charbuf(bytes, byteslen, NULL, &charslen)) goto error;
+
+    chars = JS_malloc(cx, (charslen + 1) * sizeof(jschar));
+    if(!chars) return NULL;
+    chars[charslen] = 0;
+
+    if(!dec_charbuf(bytes, byteslen, chars, &charslen)) goto error;
+
+    str = JS_NewUCString(cx, chars, charslen - 1);
+    if(!str) goto error;
+
+    goto success;
+
+error:
+    if(chars != NULL) JS_free(cx, chars);
+    str = NULL;
+
+success:
+    return str;
+}
\ No newline at end of file diff --git a/src/couchdb/priv/couch_js/utf8.h b/src/couchdb/priv/couch_js/utf8.h new file mode 100644 index 00000000..00f6b736 --- /dev/null +++ b/src/couchdb/priv/couch_js/utf8.h @@ -0,0 +1,19 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +#ifndef COUCH_JS_UTF_8_H +#define COUCH_JS_UTF_8_H + +char* enc_string(JSContext* cx, jsval arg, size_t* buflen); +JSString* dec_string(JSContext* cx, const char* buf, size_t buflen); + +#endif
\ No newline at end of file diff --git a/src/couchdb/priv/spawnkillable/couchspawnkillable_win.c b/src/couchdb/priv/spawnkillable/couchspawnkillable_win.c new file mode 100644 index 00000000..06782315 --- /dev/null +++ b/src/couchdb/priv/spawnkillable/couchspawnkillable_win.c @@ -0,0 +1,145 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +// Do what 2 lines of shell script in couchspawnkillable does... +// * Create a new suspended process with the same (duplicated) standard +// handles as us. +// * Write a line to stdout, consisting of the path to ourselves, plus +// '--kill {pid}' where {pid} is the PID of the newly created process. +// * Un-suspend the new process. +// * Wait for the process to terminate. +// * Terminate with the child's exit-code. + +// Later, couch will call us with --kill and the PID, so we dutifully +// terminate the specified PID. + +#include <stdlib.h> +#include "windows.h" + +char *get_child_cmdline(int argc, char **argv) +{ + // make a new command-line, but skipping me. + // XXX - todo - spaces etc in args??? + int i; + char *p, *cmdline; + int nchars = 0; + int nthis = 1; + for (i=1;i<argc;i++) + nchars += strlen(argv[i])+1; + cmdline = p = malloc(nchars+1); + if (!cmdline) + return NULL; + for (i=1;i<argc;i++) { + nthis = strlen(argv[i]); + strncpy(p, argv[i], nthis); + p[nthis] = ' '; + p += nthis+1; + } + // Replace the last space we added above with a '\0' + cmdline[nchars-1] = '\0'; + return cmdline; +} + +// create the child process, returning 0, or the exit-code we will +// terminate with. +int create_child(int argc, char **argv, PROCESS_INFORMATION *pi) +{ + char buf[1024]; + DWORD dwcreate; + STARTUPINFO si; + char *cmdline; + if (argc < 2) + return 1; + cmdline = get_child_cmdline(argc, argv); + if (!cmdline) + return 2; + + memset(&si, 0, sizeof(si)); + si.cb = sizeof(si); + // depending on how *our* parent is started, we may or may not have + // a valid stderr stream - so although we try and duplicate it, only + // failing to duplicate stdin and stdout are considered fatal. + if (!DuplicateHandle(GetCurrentProcess(), + GetStdHandle(STD_INPUT_HANDLE), + GetCurrentProcess(), + &si.hStdInput, + 0, + TRUE, // inheritable + DUPLICATE_SAME_ACCESS) || + !DuplicateHandle(GetCurrentProcess(), + GetStdHandle(STD_OUTPUT_HANDLE), + GetCurrentProcess(), + &si.hStdOutput, + 0, + TRUE, // inheritable + DUPLICATE_SAME_ACCESS)) { + return 3; + } + DuplicateHandle(GetCurrentProcess(), + GetStdHandle(STD_ERROR_HANDLE), + GetCurrentProcess(), + &si.hStdError, + 0, + TRUE, // inheritable + DUPLICATE_SAME_ACCESS); + + si.dwFlags = STARTF_USESTDHANDLES; + dwcreate = CREATE_SUSPENDED; + if (!CreateProcess( NULL, cmdline, + NULL, + NULL, + TRUE, // inherit handles + dwcreate, + NULL, // environ + NULL, // cwd + &si, + pi)) + return 4; + return 0; +} + +// and here we go... 
+int main(int argc, char **argv) +{ + char out_buf[1024]; + int rc; + DWORD cbwritten; + DWORD exitcode; + PROCESS_INFORMATION pi; + if (argc==3 && strcmp(argv[1], "--kill")==0) { + HANDLE h = OpenProcess(PROCESS_TERMINATE, 0, atoi(argv[2])); + if (!h) + return 1; + if (!TerminateProcess(h, 0)) + return 2; + CloseHandle(h); + return 0; + } + // spawn the new suspended process + rc = create_child(argc, argv, &pi); + if (rc) + return rc; + // Write the 'terminate' command, which includes this PID, back to couch. + // *sob* - what about spaces etc? + sprintf_s(out_buf, sizeof(out_buf), "%s --kill %d\n", + argv[0], pi.dwProcessId); + WriteFile(GetStdHandle(STD_OUTPUT_HANDLE), out_buf, strlen(out_buf), + &cbwritten, NULL); + // Let the child process go... + ResumeThread(pi.hThread); + // Wait for the process to terminate so we can reflect the exit code + // back to couch. + WaitForSingleObject(pi.hProcess, INFINITE); + if (!GetExitCodeProcess(pi.hProcess, &exitcode)) + return 6; + return exitcode; +} diff --git a/src/fabric.erl b/src/fabric.erl deleted file mode 100644 index 1be97a98..00000000 --- a/src/fabric.erl +++ /dev/null @@ -1,225 +0,0 @@ --module(fabric). - --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -% DBs --export([all_dbs/0, all_dbs/1, create_db/1, create_db/2, delete_db/1, - delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3, - set_security/3, get_revs_limit/1, get_security/1]). - -% Documents --export([open_doc/3, open_revs/4, get_missing_revs/2, update_doc/3, - update_docs/3, att_receiver/2]). - -% Views --export([all_docs/4, changes/4, query_view/3, query_view/4, query_view/6, - get_view_group_info/2]). - -% miscellany --export([design_docs/1, reset_validation_funs/1]). - --include("fabric.hrl"). - -% db operations - -all_dbs() -> - all_dbs(<<>>). - -all_dbs(Prefix) when is_list(Prefix) -> - all_dbs(list_to_binary(Prefix)); -all_dbs(Prefix) when is_binary(Prefix) -> - Length = byte_size(Prefix), - MatchingDbs = ets:foldl(fun(#shard{dbname=DbName}, Acc) -> - case DbName of - <<Prefix:Length/binary, _/binary>> -> - [DbName | Acc]; - _ -> - Acc - end - end, [], partitions), - {ok, lists:usort(MatchingDbs)}. - -get_db_info(DbName) -> - fabric_db_info:go(dbname(DbName)). - -get_doc_count(DbName) -> - fabric_db_doc_count:go(dbname(DbName)). - -create_db(DbName) -> - create_db(DbName, []). - -create_db(DbName, Options) -> - fabric_db_create:go(dbname(DbName), opts(Options)). - -delete_db(DbName) -> - delete_db(DbName, []). - -delete_db(DbName, Options) -> - fabric_db_delete:go(dbname(DbName), opts(Options)). - -set_revs_limit(DbName, Limit, Options) when is_integer(Limit), Limit > 0 -> - fabric_db_meta:set_revs_limit(dbname(DbName), Limit, opts(Options)). - -get_revs_limit(DbName) -> - {ok, Db} = fabric_util:get_db(dbname(DbName)), - try couch_db:get_revs_limit(Db) after catch couch_db:close(Db) end. - -set_security(DbName, SecObj, Options) -> - fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)). - -get_security(DbName) -> - {ok, Db} = fabric_util:get_db(dbname(DbName)), - try couch_db:get_security(Db) after catch couch_db:close(Db) end. - -% doc operations -open_doc(DbName, Id, Options) -> - fabric_doc_open:go(dbname(DbName), docid(Id), opts(Options)). - -open_revs(DbName, Id, Revs, Options) -> - fabric_doc_open_revs:go(dbname(DbName), docid(Id), Revs, opts(Options)). 
- -get_missing_revs(DbName, IdsRevs) when is_list(IdsRevs) -> - Sanitized = [idrevs(IdR) || IdR <- IdsRevs], - fabric_doc_missing_revs:go(dbname(DbName), Sanitized). - -update_doc(DbName, Doc, Options) -> - case update_docs(DbName, [Doc], opts(Options)) of - {ok, [{ok, NewRev}]} -> - {ok, NewRev}; - {ok, [Error]} -> - throw(Error); - {ok, []} -> - % replication success - #doc{revs = {Pos, [RevId | _]}} = doc(Doc), - {ok, {Pos, RevId}} - end. - -update_docs(DbName, Docs, Options) -> - try fabric_doc_update:go(dbname(DbName), docs(Docs), opts(Options)) - catch {aborted, PreCommitFailures} -> - {aborted, PreCommitFailures} - end. - -att_receiver(Req, Length) -> - fabric_doc_attachments:receiver(Req, Length). - -all_docs(DbName, Callback, Acc0, #view_query_args{} = QueryArgs) when - is_function(Callback, 2) -> - fabric_view_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0). - -changes(DbName, Callback, Acc0, Options) -> - % TODO use a keylist for Options instead of #changes_args, BugzID 10281 - Feed = Options#changes_args.feed, - fabric_view_changes:go(dbname(DbName), Feed, Options, Callback, Acc0). - -query_view(DbName, DesignName, ViewName) -> - query_view(DbName, DesignName, ViewName, #view_query_args{}). - -query_view(DbName, DesignName, ViewName, QueryArgs) -> - Callback = fun default_callback/2, - query_view(DbName, DesignName, ViewName, Callback, [], QueryArgs). - -query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs) -> - Db = dbname(DbName), View = name(ViewName), - case is_reduce_view(Db, Design, View, QueryArgs) of - true -> - Mod = fabric_view_reduce; - false -> - Mod = fabric_view_map - end, - Mod:go(Db, Design, View, QueryArgs, Callback, Acc0). - -get_view_group_info(DbName, DesignId) -> - fabric_group_info:go(dbname(DbName), design_doc(DesignId)). - -design_docs(DbName) -> - QueryArgs = #view_query_args{start_key = <<"_design/">>, include_docs=true}, - Callback = fun({total_and_offset, _, _}, []) -> - {ok, []}; - ({row, {Props}}, Acc) -> - case couch_util:get_value(id, Props) of - <<"_design/", _/binary>> -> - {ok, [couch_util:get_value(doc, Props) | Acc]}; - _ -> - {stop, Acc} - end; - (complete, Acc) -> - {ok, lists:reverse(Acc)} - end, - fabric:all_docs(dbname(DbName), Callback, [], QueryArgs). - -reset_validation_funs(DbName) -> - [rexi:cast(Node, {fabric_rpc, reset_validation_funs, [Name]}) || - #shard{node=Node, name=Name} <- mem3:shards(DbName)]. - -%% some simple type validation and transcoding - -dbname(DbName) when is_list(DbName) -> - list_to_binary(DbName); -dbname(DbName) when is_binary(DbName) -> - DbName; -dbname(#db{name=Name}) -> - Name; -dbname(DbName) -> - erlang:error({illegal_database_name, DbName}). - -name(Thing) -> - couch_util:to_binary(Thing). - -docid(DocId) when is_list(DocId) -> - list_to_binary(DocId); -docid(DocId) when is_binary(DocId) -> - DocId; -docid(DocId) -> - erlang:error({illegal_docid, DocId}). - -docs(Docs) when is_list(Docs) -> - [doc(D) || D <- Docs]; -docs(Docs) -> - erlang:error({illegal_docs_list, Docs}). - -doc(#doc{} = Doc) -> - Doc; -doc({_} = Doc) -> - couch_doc:from_json_obj(Doc); -doc(Doc) -> - erlang:error({illegal_doc_format, Doc}). - -design_doc(#doc{} = DDoc) -> - DDoc; -design_doc(DocId) when is_list(DocId) -> - design_doc(list_to_binary(DocId)); -design_doc(<<"_design/", _/binary>> = DocId) -> - DocId; -design_doc(GroupName) -> - <<"_design/", GroupName/binary>>. - -idrevs({Id, Revs}) when is_list(Revs) -> - {docid(Id), [rev(R) || R <- Revs]}. 
- -rev(Rev) when is_list(Rev); is_binary(Rev) -> - couch_doc:parse_rev(Rev); -rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) -> - Rev. - -opts(Options) -> - case couch_util:get_value(user_ctx, Options) of - undefined -> - case erlang:get(user_ctx) of - #user_ctx{} = Ctx -> - [{user_ctx, Ctx} | Options]; - _ -> - Options - end; - _ -> - Options - end. - -default_callback(complete, Acc) -> - {ok, lists:reverse(Acc)}; -default_callback(Row, Acc) -> - {ok, [Row | Acc]}. - -is_reduce_view(_, _, _, #view_query_args{view_type=Reduce}) -> - Reduce =:= reduce. diff --git a/src/fabric_db_create.erl b/src/fabric_db_create.erl deleted file mode 100644 index d10bcc22..00000000 --- a/src/fabric_db_create.erl +++ /dev/null @@ -1,65 +0,0 @@ --module(fabric_db_create). --export([go/2]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - --define(DBNAME_REGEX, "^[a-z][a-z0-9\\_\\$()\\+\\-\\/\\s.]*$"). - -%% @doc Create a new database, and all its partition files across the cluster -%% Options is proplist with user_ctx, n, q -go(DbName, Options) -> - case re:run(DbName, ?DBNAME_REGEX, [{capture,none}]) of - match -> - Shards = mem3:choose_shards(DbName, Options), - Doc = make_document(Shards), - Workers = fabric_util:submit_jobs(Shards, create_db, [Options, Doc]), - Acc0 = fabric_dict:init(Workers, nil), - case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of - {ok, _} -> - ok; - Else -> - Else - end; - nomatch -> - {error, illegal_database_name} - end. - -handle_message(Msg, Shard, Counters) -> - C1 = fabric_dict:store(Shard, Msg, Counters), - case fabric_dict:any(nil, C1) of - true -> - {ok, C1}; - false -> - final_answer(C1) - end. - -make_document([#shard{dbname=DbName}|_] = Shards) -> - {RawOut, ByNodeOut, ByRangeOut} = - lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) -> - Range = ?l2b([couch_util:to_hex(<<B:32/integer>>), "-", - couch_util:to_hex(<<E:32/integer>>)]), - Node = couch_util:to_binary(N), - {[[<<"add">>, Range, Node] | Raw], orddict:append(Node, Range, ByNode), - orddict:append(Range, Node, ByRange)} - end, {[], [], []}, Shards), - #doc{id=DbName, body = {[ - {<<"changelog">>, lists:sort(RawOut)}, - {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}}, - {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}} - ]}}. - -final_answer(Counters) -> - Successes = [X || {_, M} = X <- Counters, M == ok orelse M == file_exists], - case fabric_view:is_progress_possible(Successes) of - true -> - case lists:keymember(file_exists, 2, Successes) of - true -> - {error, file_exists}; - false -> - {stop, ok} - end; - false -> - {error, internal_server_error} - end. diff --git a/src/fabric_db_delete.erl b/src/fabric_db_delete.erl deleted file mode 100644 index 57eefa9e..00000000 --- a/src/fabric_db_delete.erl +++ /dev/null @@ -1,41 +0,0 @@ --module(fabric_db_delete). --export([go/2]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). - -go(DbName, Options) -> - Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, delete_db, [Options, DbName]), - Acc0 = fabric_dict:init(Workers, nil), - case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of - {ok, ok} -> - ok; - {ok, not_found} -> - erlang:error(database_does_not_exist); - Error -> - Error - end. 
- -handle_message(Msg, Shard, Counters) -> - C1 = fabric_dict:store(Shard, Msg, Counters), - case fabric_dict:any(nil, C1) of - true -> - {ok, C1}; - false -> - final_answer(C1) - end. - -final_answer(Counters) -> - Successes = [X || {_, M} = X <- Counters, M == ok orelse M == not_found], - case fabric_view:is_progress_possible(Successes) of - true -> - case lists:keymember(ok, 2, Successes) of - true -> - {stop, ok}; - false -> - {stop, not_found} - end; - false -> - {error, internal_server_error} - end. diff --git a/src/fabric_db_doc_count.erl b/src/fabric_db_doc_count.erl deleted file mode 100644 index 12d5cbf8..00000000 --- a/src/fabric_db_doc_count.erl +++ /dev/null @@ -1,32 +0,0 @@ --module(fabric_db_doc_count). - --export([go/1]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(DbName) -> - Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, get_doc_count, []), - Acc0 = {fabric_dict:init(Workers, nil), 0}, - fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0). - -handle_message({ok, Count}, Shard, {Counters, Acc}) -> - case fabric_dict:lookup_element(Shard, Counters) of - undefined -> - % already heard from someone else in this range - {ok, {Counters, Acc}}; - nil -> - C1 = fabric_dict:store(Shard, ok, Counters), - C2 = fabric_view:remove_overlapping_shards(Shard, C1), - case fabric_dict:any(nil, C2) of - true -> - {ok, {C2, Count+Acc}}; - false -> - {stop, Count+Acc} - end - end; -handle_message(_, _, Acc) -> - {ok, Acc}. - diff --git a/src/fabric_db_info.erl b/src/fabric_db_info.erl deleted file mode 100644 index 3758c5c3..00000000 --- a/src/fabric_db_info.erl +++ /dev/null @@ -1,52 +0,0 @@ --module(fabric_db_info). - --export([go/1]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). - -go(DbName) -> - Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, get_db_info, []), - Acc0 = {fabric_dict:init(Workers, nil), []}, - fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0). - -handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) -> - case fabric_dict:lookup_element(Shard, Counters) of - undefined -> - % already heard from someone else in this range - {ok, {Counters, Acc}}; - nil -> - C1 = fabric_dict:store(Shard, ok, Counters), - C2 = fabric_view:remove_overlapping_shards(Shard, C1), - case fabric_dict:any(nil, C2) of - true -> - {ok, {C2, [Info|Acc]}}; - false -> - {stop, [{db_name,Name}|merge_results(lists:flatten([Info|Acc]))]} - end - end; -handle_message(_, _, Acc) -> - {ok, Acc}. - -merge_results(Info) -> - Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end, - orddict:new(), Info), - orddict:fold(fun - (doc_count, X, Acc) -> - [{doc_count, lists:sum(X)} | Acc]; - (doc_del_count, X, Acc) -> - [{doc_del_count, lists:sum(X)} | Acc]; - (update_seq, X, Acc) -> - [{update_seq, lists:sum(X)} | Acc]; - (purge_seq, X, Acc) -> - [{purge_seq, lists:sum(X)} | Acc]; - (compact_running, X, Acc) -> - [{compact_running, lists:member(true, X)} | Acc]; - (disk_size, X, Acc) -> - [{disk_size, lists:sum(X)} | Acc]; - (disk_format_version, X, Acc) -> - [{disk_format_version, lists:max(X)} | Acc]; - (_, _, Acc) -> - Acc - end, [{instance_start_time, <<"0">>}], Dict). diff --git a/src/fabric_db_meta.erl b/src/fabric_db_meta.erl deleted file mode 100644 index ee15fc72..00000000 --- a/src/fabric_db_meta.erl +++ /dev/null @@ -1,35 +0,0 @@ --module(fabric_db_meta). - --export([set_revs_limit/3, set_security/3]). 
- --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). - -set_revs_limit(DbName, Limit, Options) -> - Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, set_revs_limit, [Limit, Options]), - Waiting = length(Workers) - 1, - case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Waiting) of - {ok, ok} -> - ok; - Error -> - Error - end. - -set_security(DbName, SecObj, Options) -> - Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, set_security, [SecObj, Options]), - Waiting = length(Workers) - 1, - case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Waiting) of - {ok, ok} -> - ok; - Error -> - Error - end. - -handle_message(ok, _, 0) -> - {stop, ok}; -handle_message(ok, _, Waiting) -> - {ok, Waiting - 1}; -handle_message(Error, _, _Waiting) -> - {error, Error}.
\ No newline at end of file diff --git a/src/fabric_dict.erl b/src/fabric_dict.erl deleted file mode 100644 index 42d46b34..00000000 --- a/src/fabric_dict.erl +++ /dev/null @@ -1,37 +0,0 @@ --module(fabric_dict). --compile(export_all). - -% Instead of ets, let's use an ordered keylist. We'll need to revisit if we -% have >> 100 shards, so a private interface is a good idea. - APK June 2010 - -init(Keys, InitialValue) -> - orddict:from_list([{Key, InitialValue} || Key <- Keys]). - - -decrement_all(Dict) -> - [{K,V-1} || {K,V} <- Dict]. - -store(Key, Value, Dict) -> - orddict:store(Key, Value, Dict). - -erase(Key, Dict) -> - orddict:erase(Key, Dict). - -update_counter(Key, Incr, Dict0) -> - orddict:update_counter(Key, Incr, Dict0). - - -lookup_element(Key, Dict) -> - couch_util:get_value(Key, Dict). - -size(Dict) -> - orddict:size(Dict). - -any(Value, Dict) -> - lists:keymember(Value, 2, Dict). - -filter(Fun, Dict) -> - orddict:filter(Fun, Dict). - -fold(Fun, Acc0, Dict) -> - orddict:fold(Fun, Acc0, Dict). diff --git a/src/fabric_doc_attachments.erl b/src/fabric_doc_attachments.erl deleted file mode 100644 index aecdaaef..00000000 --- a/src/fabric_doc_attachments.erl +++ /dev/null @@ -1,102 +0,0 @@ --module(fabric_doc_attachments). - --include("fabric.hrl"). - -%% couch api calls --export([receiver/2]). - -receiver(_Req, undefined) -> - <<"">>; -receiver(_Req, {unknown_transfer_encoding, Unknown}) -> - exit({unknown_transfer_encoding, Unknown}); -receiver(Req, chunked) -> - MiddleMan = spawn(fun() -> middleman(Req, chunked) end), - fun(4096, ChunkFun, ok) -> - write_chunks(MiddleMan, ChunkFun) - end; -receiver(_Req, 0) -> - <<"">>; -receiver(Req, Length) when is_integer(Length) -> - Middleman = spawn(fun() -> middleman(Req, Length) end), - fun() -> - Middleman ! {self(), gimme_data}, - receive {Middleman, Data} -> Data end - end; -receiver(_Req, Length) -> - exit({length_not_integer, Length}). - -%% -%% internal -%% - -write_chunks(MiddleMan, ChunkFun) -> - MiddleMan ! {self(), gimme_data}, - receive - {MiddleMan, {0, _Footers}} -> - % MiddleMan ! {self(), done}, - ok; - {MiddleMan, ChunkRecord} -> - ChunkFun(ChunkRecord, ok), - write_chunks(MiddleMan, ChunkFun) - end. - -receive_unchunked_attachment(_Req, 0) -> - ok; -receive_unchunked_attachment(Req, Length) -> - receive {MiddleMan, go} -> - Data = couch_httpd:recv(Req, 0), - MiddleMan ! {self(), Data} - end, - receive_unchunked_attachment(Req, Length - size(Data)). - -middleman(Req, chunked) -> - % spawn a process to actually receive the uploaded data - RcvFun = fun(ChunkRecord, ok) -> - receive {From, go} -> From ! {self(), ChunkRecord} end, ok - end, - Receiver = spawn(fun() -> couch_httpd:recv_chunked(Req,4096,RcvFun,ok) end), - - % take requests from the DB writers and get data from the receiver - N = erlang:list_to_integer(couch_config:get("cluster","n")), - middleman_loop(Receiver, N, dict:new(), 0, []); - -middleman(Req, Length) -> - Receiver = spawn(fun() -> receive_unchunked_attachment(Req, Length) end), - N = erlang:list_to_integer(couch_config:get("cluster","n")), - middleman_loop(Receiver, N, dict:new(), 0, []). 
- -middleman_loop(Receiver, N, Counters, Offset, ChunkList) -> - receive {From, gimme_data} -> - % figure out how far along this writer (From) is in the list - {NewCounters, WhichChunk} = case dict:find(From, Counters) of - {ok, I} -> - {dict:update_counter(From, 1, Counters), I}; - error -> - {dict:store(From, 2, Counters), 1} - end, - ListIndex = WhichChunk - Offset, - - % talk to the receiver to get another chunk if necessary - ChunkList1 = if ListIndex > length(ChunkList) -> - Receiver ! {self(), go}, - receive {Receiver, ChunkRecord} -> ChunkList ++ [ChunkRecord] end; - true -> ChunkList end, - - % reply to the writer - From ! {self(), lists:nth(ListIndex, ChunkList1)}, - - % check if we can drop a chunk from the head of the list - SmallestIndex = dict:fold(fun(_, Val, Acc) -> lists:min([Val,Acc]) end, - WhichChunk+1, NewCounters), - Size = dict:size(NewCounters), - - {NewChunkList, NewOffset} = - if Size == N andalso (SmallestIndex - Offset) == 2 -> - {tl(ChunkList1), Offset+1}; - true -> - {ChunkList1, Offset} - end, - middleman_loop(Receiver, N, NewCounters, NewOffset, NewChunkList) - after 10000 -> - ok - end. diff --git a/src/fabric_doc_missing_revs.erl b/src/fabric_doc_missing_revs.erl deleted file mode 100644 index 9a368783..00000000 --- a/src/fabric_doc_missing_revs.erl +++ /dev/null @@ -1,64 +0,0 @@ --module(fabric_doc_missing_revs). - --export([go/2]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). - -go(DbName, AllIdsRevs) -> - Workers = lists:map(fun({#shard{name=Name, node=Node} = Shard, IdsRevs}) -> - Ref = rexi:cast(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}), - Shard#shard{ref=Ref} - end, group_idrevs_by_shard(DbName, AllIdsRevs)), - ResultDict = dict:from_list([{Id, {nil,Revs}} || {Id, Revs} <- AllIdsRevs]), - Acc0 = {length(Workers), ResultDict}, - fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0). - -handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) -> - skip_message(Acc0); -handle_message({rexi_EXIT, _, _, _}, _Worker, Acc0) -> - skip_message(Acc0); -handle_message({ok, Results}, _Worker, {1, D0}) -> - D = update_dict(D0, Results), - {stop, dict:fold(fun force_reply/3, [], D)}; -handle_message({ok, Results}, _Worker, {WaitingCount, D0}) -> - D = update_dict(D0, Results), - case dict:fold(fun maybe_reply/3, {stop, []}, D) of - continue -> - % still haven't heard about some Ids - {ok, {WaitingCount - 1, D}}; - {stop, FinalReply} -> - {stop, FinalReply} - end. - -force_reply(Id, {nil,Revs}, Acc) -> - % never heard about this ID, assume it's missing - [{Id, Revs} | Acc]; -force_reply(_, [], Acc) -> - Acc; -force_reply(Id, Revs, Acc) -> - [{Id, Revs} | Acc]. - -maybe_reply(_, _, continue) -> - continue; -maybe_reply(_, {nil, _}, _) -> - continue; -maybe_reply(_, [], {stop, Acc}) -> - {stop, Acc}; -maybe_reply(Id, Revs, {stop, Acc}) -> - {stop, [{Id, Revs} | Acc]}. - -group_idrevs_by_shard(DbName, IdsRevs) -> - dict:to_list(lists:foldl(fun({Id, Revs}, D0) -> - lists:foldl(fun(Shard, D1) -> - dict:append(Shard, {Id, Revs}, D1) - end, D0, mem3:shards(DbName,Id)) - end, dict:new(), IdsRevs)). - -update_dict(D0, KVs) -> - lists:foldl(fun({K,V,_}, D1) -> dict:store(K, V, D1) end, D0, KVs). - -skip_message({1, Dict}) -> - {stop, dict:fold(fun force_reply/3, [], Dict)}; -skip_message({WaitingCount, Dict}) -> - {ok, {WaitingCount-1, Dict}}. 
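The fabric_doc_open and fabric_doc_open_revs modules removed below share one read-quorum idiom: each worker reply is reduced to a comparable key, identical replies are counted, and the fold stops as soon as any key has been seen R times. A minimal, self-contained sketch of that counting logic (quorum_sketch is a hypothetical module name, not part of this tree, and it assumes replies have already been reduced to keys the way make_key/1 does in the real code):

%% quorum_sketch.erl - hypothetical illustration of the merge_read_reply/3
%% idiom in fabric_doc_open and fabric_doc_open_revs below.
-module(quorum_sketch).
-export([collect/2]).

%% Replies arrive as {Key, Reply} pairs; stop once some Key is seen R times.
collect(R, Incoming) ->
    collect(R, Incoming, []).

collect(_R, [], _Seen) ->
    {error, no_quorum};
collect(R, [{Key, Reply} | Rest], Seen) ->
    case lists:keyfind(Key, 1, Seen) of
        false ->
            maybe_stop(R, 1, Reply, [{Key, Reply, 1} | Seen], Rest);
        {Key, _, N} ->
            Seen1 = lists:keyreplace(Key, 1, Seen, {Key, Reply, N + 1}),
            maybe_stop(R, N + 1, Reply, Seen1, Rest)
    end.

maybe_stop(R, Count, Reply, _Seen, _Rest) when Count >= R ->
    {stop, Reply};
maybe_stop(R, _Count, _Reply, Seen, Rest) ->
    collect(R, Rest, Seen).

For example, quorum_sketch:collect(2, [{k1, a}, {k2, b}, {k1, a}]) returns {stop, a}, just as handle_message/3 below stops once merge_read_reply/3 reports KeyCount =:= R.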
diff --git a/src/fabric_doc_open.erl b/src/fabric_doc_open.erl deleted file mode 100644 index 5c5699c3..00000000 --- a/src/fabric_doc_open.erl +++ /dev/null @@ -1,66 +0,0 @@ --module(fabric_doc_open). - --export([go/3]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(DbName, Id, Options) -> - Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_doc, - [Id, [deleted|Options]]), - SuppressDeletedDoc = not lists:member(deleted, Options), - R = couch_util:get_value(r, Options, couch_config:get("cluster","r","2")), - Acc0 = {length(Workers), list_to_integer(R), []}, - case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of - {ok, {ok, #doc{deleted=true}}} when SuppressDeletedDoc -> - {not_found, deleted}; - {ok, Else} -> - Else; - Error -> - Error - end. - -handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) -> - skip_message(Acc0); -handle_message({rexi_EXIT, _Reason}, _Worker, Acc0) -> - skip_message(Acc0); -handle_message(Reply, _Worker, {WaitingCount, R, Replies}) -> - case merge_read_reply(make_key(Reply), Reply, Replies) of - {_, KeyCount} when KeyCount =:= R -> - {stop, Reply}; - {NewReplies, KeyCount} when KeyCount < R -> - if WaitingCount =:= 1 -> - % last message arrived, but still no quorum - repair_read_quorum_failure(NewReplies); - true -> - {ok, {WaitingCount-1, R, NewReplies}} - end - end. - -skip_message({1, _R, Replies}) -> - repair_read_quorum_failure(Replies); -skip_message({WaitingCount, R, Replies}) -> - {ok, {WaitingCount-1, R, Replies}}. - -merge_read_reply(Key, Reply, Replies) -> - case lists:keyfind(Key, 1, Replies) of - false -> - {[{Key, Reply, 1} | Replies], 1}; - {Key, _, N} -> - {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1} - end. - -make_key({ok, #doc{id=Id, revs=Revs}}) -> - {Id, Revs}; -make_key(Else) -> - Else. - -repair_read_quorum_failure(Replies) -> - case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of - [] -> - {stop, {not_found, missing}}; - [Doc|_] -> - % TODO merge docs to find the winner as determined by replication - {stop, {ok, Doc}} - end.
\ No newline at end of file diff --git a/src/fabric_doc_open_revs.erl b/src/fabric_doc_open_revs.erl deleted file mode 100644 index 61ff466f..00000000 --- a/src/fabric_doc_open_revs.erl +++ /dev/null @@ -1,65 +0,0 @@ --module(fabric_doc_open_revs). - --export([go/4]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(DbName, Id, Revs, Options) -> - Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_revs, - [Id, Revs, Options]), - R = couch_util:get_value(r, Options, couch_config:get("cluster","r","2")), - Acc0 = {length(Workers), list_to_integer(R), []}, - case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of - {ok, {ok, Reply}} -> - {ok, Reply}; - Else -> - Else - end. - -handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) -> - skip_message(Acc0); -handle_message({rexi_EXIT, _}, _Worker, Acc0) -> - skip_message(Acc0); -handle_message(Reply, _Worker, {WaitingCount, R, Replies}) -> - case merge_read_reply(make_key(Reply), Reply, Replies) of - {_, KeyCount} when KeyCount =:= R -> - {stop, Reply}; - {NewReplies, KeyCount} when KeyCount < R -> - if WaitingCount =:= 1 -> - % last message arrived, but still no quorum - repair_read_quorum_failure(NewReplies); - true -> - {ok, {WaitingCount-1, R, NewReplies}} - end - end. - -skip_message({1, _R, Replies}) -> - repair_read_quorum_failure(Replies); -skip_message({WaitingCount, R, Replies}) -> - {ok, {WaitingCount-1, R, Replies}}. - -merge_read_reply(Key, Reply, Replies) -> - case lists:keyfind(Key, 1, Replies) of - false -> - {[{Key, Reply, 1} | Replies], 1}; - {Key, _, N} -> - {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1} - end. - -make_key({ok, #doc{id=Id, revs=Revs}}) -> - {Id, Revs}; -make_key(Else) -> - Else. - -repair_read_quorum_failure(Replies) -> - case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of - [] -> - {stop, {not_found, missing}}; - [Doc|_] -> - % TODO merge docs to find the winner as determined by replication - {stop, {ok, Doc}} - end. - -
\ No newline at end of file diff --git a/src/fabric_doc_update.erl b/src/fabric_doc_update.erl deleted file mode 100644 index f0fcf112..00000000 --- a/src/fabric_doc_update.erl +++ /dev/null @@ -1,127 +0,0 @@ --module(fabric_doc_update). - --export([go/3]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(_, [], _) -> - {ok, []}; -go(DbName, AllDocs, Opts) -> - validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)), - Options = lists:delete(all_or_nothing, Opts), - GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) -> - Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}), - {Shard#shard{ref=Ref}, Docs} - end, group_docs_by_shard(DbName, AllDocs)), - {Workers, _} = lists:unzip(GroupedDocs), - W = couch_util:get_value(w, Options, couch_config:get("cluster","w","2")), - Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs, - dict:from_list([{Doc,[]} || Doc <- AllDocs])}, - case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of - {ok, Results} -> - Reordered = couch_util:reorder_results(AllDocs, Results), - {ok, [R || R <- Reordered, R =/= noreply]}; - Else -> - Else - end. - -handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) -> - skip_message(Acc0); -handle_message({rexi_EXIT, _}, _Worker, Acc0) -> - skip_message(Acc0); -handle_message({ok, Replies}, Worker, Acc0) -> - {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0, - Docs = couch_util:get_value(Worker, GroupedDocs), - DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0), - case {WaitingCount, dict:size(DocReplyDict)} of - {1, _} -> - % last message has arrived, we need to conclude things - {W, Reply} = dict:fold(fun force_reply/3, {W,[]}, DocReplyDict), - {stop, Reply}; - {_, DocCount} -> - % we've got at least one reply for each document, let's take a look - case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocReplyDict) of - continue -> - {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}}; - {stop, W, FinalReplies} -> - {stop, FinalReplies} - end; - {_, N} when N < DocCount -> - % no point in trying to finalize anything yet - {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}} - end; -handle_message({missing_stub, Stub}, _, _) -> - throw({missing_stub, Stub}); -handle_message({not_found, no_db_file} = X, Worker, Acc0) -> - {_, _, _, GroupedDocs, _} = Acc0, - Docs = couch_util:get_value(Worker, GroupedDocs), - handle_message({ok, [X || _D <- Docs]}, Worker, Acc0). - -force_reply(Doc, [], {W, Acc}) -> - {W, [{Doc, {error, internal_server_error}} | Acc]}; -force_reply(Doc, [FirstReply|_] = Replies, {W, Acc}) -> - case update_quorum_met(W, Replies) of - {true, Reply} -> - {W, [{Doc,Reply} | Acc]}; - false -> - ?LOG_ERROR("write quorum (~p) failed, reply ~p", [W, FirstReply]), - % TODO make a smarter choice than just picking the first reply - {W, [{Doc,FirstReply} | Acc]} - end. - -maybe_reply(_, _, continue) -> - % we didn't meet quorum for all docs, so we're fast-forwarding the fold - continue; -maybe_reply(Doc, Replies, {stop, W, Acc}) -> - case update_quorum_met(W, Replies) of - {true, Reply} -> - {stop, W, [{Doc, Reply} | Acc]}; - false -> - continue - end. 
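Both force_reply/3 and maybe_reply/3 above delegate the actual counting to update_quorum_met/2, defined just below. A small shell sketch of that test, with hypothetical reply terms standing in for real couch_db results:

    %% Three shard copies answered; two agree, so W = 2 is met.
    W = 2,
    Replies = [{ok, rev1}, {error, conflict}, {ok, rev1}],
    Counters = lists:foldl(fun(R, D) -> orddict:update_counter(R, 1, D) end,
        orddict:new(), Replies),
    %% Counters = [{{error,conflict},1},{{ok,rev1},2}]; dropping entries
    %% below W leaves the winning reply at the head:
    [{FinalReply, _} | _] = lists:dropwhile(fun({_, C}) -> C < W end, Counters).
    %% FinalReply = {ok, rev1}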
- -update_quorum_met(W, Replies) -> - Counters = lists:foldl(fun(R,D) -> orddict:update_counter(R,1,D) end, - orddict:new(), Replies), - case lists:dropwhile(fun({_, Count}) -> Count < W end, Counters) of - [] -> - false; - [{FinalReply, _} | _] -> - {true, FinalReply} - end. - --spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}]. -group_docs_by_shard(DbName, Docs) -> - dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) -> - lists:foldl(fun(Shard, D1) -> - dict:append(Shard, Doc, D1) - end, D0, mem3:shards(DbName,Id)) - end, dict:new(), Docs)). - -append_update_replies([], [], DocReplyDict) -> - DocReplyDict; -append_update_replies([Doc|Rest], [], Dict0) -> - % icky, if replicated_changes only errors show up in result - append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0)); -append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) -> - % TODO what if the same document shows up twice in one update_docs call? - append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)). - -skip_message(Acc0) -> - % TODO fix this - {ok, Acc0}. - -validate_atomic_update(_, _, false) -> - ok; -validate_atomic_update(_DbName, AllDocs, true) -> - % TODO actually perform the validation. This requires some hackery, we need - % to basically extract the prep_and_validate_updates function from couch_db - % and only run that, without actually writing in case of a success. - Error = {not_implemented, <<"all_or_nothing is not supported yet">>}, - PreCommitFailures = lists:map(fun(#doc{id=Id, revs = {Pos,Revs}}) -> - case Revs of [] -> RevId = <<>>; [RevId|_] -> ok end, - {{Id, {Pos, RevId}}, Error} - end, AllDocs), - throw({aborted, PreCommitFailures}). diff --git a/src/fabric_group_info.erl b/src/fabric_group_info.erl deleted file mode 100644 index 04605a66..00000000 --- a/src/fabric_group_info.erl +++ /dev/null @@ -1,52 +0,0 @@ --module(fabric_group_info). - --export([go/2]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(DbName, GroupId) when is_binary(GroupId) -> - {ok, DDoc} = fabric:open_doc(DbName, GroupId, []), - go(DbName, DDoc); - -go(DbName, #doc{} = DDoc) -> - Group = couch_view_group:design_doc_to_view_group(#db{name=DbName}, DDoc), - Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, group_info, [Group]), - Acc0 = {fabric_dict:init(Workers, nil), []}, - fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0). - -handle_message({ok, Info}, Shard, {Counters, Acc}) -> - case fabric_dict:lookup_element(Shard, Counters) of - undefined -> - % already heard from someone else in this range - {ok, {Counters, Acc}}; - nil -> - C1 = fabric_dict:store(Shard, ok, Counters), - C2 = fabric_view:remove_overlapping_shards(Shard, C1), - case fabric_dict:any(nil, C2) of - true -> - {ok, {C2, [Info|Acc]}}; - false -> - {stop, merge_results(lists:flatten([Info|Acc]))} - end - end; -handle_message(_, _, Acc) -> - {ok, Acc}. - -merge_results(Info) -> - Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end, - orddict:new(), Info), - orddict:fold(fun - (signature, [X|_], Acc) -> - [{signature, X} | Acc]; - (language, [X|_], Acc) -> - [{language, X} | Acc]; - (disk_size, X, Acc) -> - [{disk_size, lists:sum(X)} | Acc]; - (compact_running, X, Acc) -> - [{compact_running, lists:member(true, X)} | Acc]; - (_, _, Acc) -> - Acc - end, [], Dict). 
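The merge_results/1 fold above combines per-shard group_info replies: single-valued fields such as signature and language take the first answer, disk_size is summed across shards, and compact_running is true if any shard reports true. A sketch of the fold over two hypothetical shard replies:

    Info = [{disk_size, 10}, {compact_running, false},
            {disk_size, 12}, {compact_running, true}],
    Dict = lists:foldl(fun({K, V}, D0) -> orddict:append(K, V, D0) end,
        orddict:new(), Info),
    22 = lists:sum(orddict:fetch(disk_size, Dict)),
    true = lists:member(true, orddict:fetch(compact_running, Dict)).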
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl deleted file mode 100644 index f56e3f68..00000000 --- a/src/fabric_rpc.erl +++ /dev/null @@ -1,388 +0,0 @@ --module(fabric_rpc). - --export([get_db_info/1, get_doc_count/1, get_update_seq/1]). --export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]). --export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]). --export([create_db/3, delete_db/3, reset_validation_funs/1, set_security/3, - set_revs_limit/3]). - --include("fabric.hrl"). --include_lib("couch/include/couch_db.hrl"). - --record (view_acc, { - db, - limit, - include_docs, - offset = nil, - total_rows, - reduce_fun = fun couch_db:enum_docs_reduce_to_count/1, - group_level = 0 -}). - -%% rpc endpoints -%% call to with_db will supply your M:F with a #db{} and then remaining args - -all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) -> - {ok, Db} = couch_db:open(DbName, []), - #view_query_args{ - start_key = StartKey, - start_docid = StartDocId, - end_key = EndKey, - end_docid = EndDocId, - limit = Limit, - skip = Skip, - include_docs = IncludeDocs, - direction = Dir, - inclusive_end = Inclusive - } = QueryArgs, - {ok, Total} = couch_db:get_doc_count(Db), - Acc0 = #view_acc{ - db = Db, - include_docs = IncludeDocs, - limit = Limit+Skip, - total_rows = Total - }, - EndKeyType = if Inclusive -> end_key; true -> end_key_gt end, - Options = [ - {dir, Dir}, - {start_key, if is_binary(StartKey) -> StartKey; true -> StartDocId end}, - {EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end} - ], - {ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options), - final_response(Total, Acc#view_acc.offset). - -changes(DbName, Args, StartSeq) -> - #changes_args{style=Style, dir=Dir} = Args, - case couch_db:open(DbName, []) of - {ok, Db} -> - Enum = fun changes_enumerator/2, - Opts = [{dir,Dir}], - Acc0 = {Db, StartSeq, Args}, - try - {ok, {_, LastSeq, _}} = - couch_db:changes_since(Db, Style, StartSeq, Enum, Opts, Acc0), - rexi:reply({complete, LastSeq}) - after - couch_db:close(Db) - end; - Error -> - rexi:reply(Error) - end. - -map_view(DbName, DDoc, ViewName, QueryArgs) -> - {ok, Db} = couch_db:open(DbName, []), - #view_query_args{ - limit = Limit, - skip = Skip, - keys = Keys, - include_docs = IncludeDocs, - stale = Stale, - view_type = ViewType - } = QueryArgs, - MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end, - Group0 = couch_view_group:design_doc_to_view_group(Db, DDoc), - {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), - {ok, Group} = couch_view_group:request_group(Pid, MinSeq), - View = fabric_view:extract_view(Pid, ViewName, Group#group.views, ViewType), - {ok, Total} = couch_view:get_row_count(View), - Acc0 = #view_acc{ - db = Db, - include_docs = IncludeDocs, - limit = Limit+Skip, - total_rows = Total, - reduce_fun = fun couch_view:reduce_to_count/1 - }, - case Keys of - nil -> - Options = couch_httpd_view:make_key_options(QueryArgs), - {ok, _, Acc} = couch_view:fold(View, fun view_fold/3, Acc0, Options); - _ -> - Acc = lists:foldl(fun(Key, AccIn) -> - KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key}, - Options = couch_httpd_view:make_key_options(KeyArgs), - {_Go, _, Out} = couch_view:fold(View, fun view_fold/3, AccIn, - Options), - Out - end, Acc0, Keys) - end, - final_response(Total, Acc#view_acc.offset). 
- -reduce_view(DbName, Group0, ViewName, QueryArgs) -> - {ok, Db} = couch_db:open(DbName, []), - #view_query_args{ - group_level = GroupLevel, - limit = Limit, - skip = Skip, - keys = Keys, - stale = Stale - } = QueryArgs, - GroupFun = group_rows_fun(GroupLevel), - MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end, - {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), - {ok, #group{views=Views, def_lang=Lang}} = couch_view_group:request_group( - Pid, MinSeq), - {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce), - ReduceView = {reduce, NthRed, Lang, View}, - Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip}, - case Keys of - nil -> - Options0 = couch_httpd_view:make_key_options(QueryArgs), - Options = [{key_group_fun, GroupFun} | Options0], - couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options); - _ -> - lists:map(fun(Key) -> - KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key}, - Options0 = couch_httpd_view:make_key_options(KeyArgs), - Options = [{key_group_fun, GroupFun} | Options0], - couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options) - end, Keys) - end, - rexi:reply(complete). - -create_db(DbName, Options, Doc) -> - mem3_util:write_db_doc(Doc), - rexi:reply(case couch_server:create(DbName, Options) of - {ok, _} -> - ok; - Error -> - Error - end). - -delete_db(DbName, Options, DocId) -> - mem3_util:delete_db_doc(DocId), - rexi:reply(couch_server:delete(DbName, Options)). - -get_db_info(DbName) -> - with_db(DbName, [], {couch_db, get_db_info, []}). - -get_doc_count(DbName) -> - with_db(DbName, [], {couch_db, get_doc_count, []}). - -get_update_seq(DbName) -> - with_db(DbName, [], {couch_db, get_update_seq, []}). - -set_security(DbName, SecObj, Options) -> - with_db(DbName, Options, {couch_db, set_security, [SecObj]}). - -set_revs_limit(DbName, Limit, Options) -> - with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}). - -open_doc(DbName, DocId, Options) -> - with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}). - -open_revs(DbName, Id, Revs, Options) -> - with_db(DbName, Options, {couch_db, open_doc_revs, [Id, Revs, Options]}). - -get_missing_revs(DbName, IdRevsList) -> - % reimplement here so we get [] for Ids with no missing revs in response - rexi:reply(case couch_db:open(DbName, []) of - {ok, Db} -> - Ids = [Id1 || {Id1, _Revs} <- IdRevsList], - {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) -> - case FullDocInfoResult of - {ok, #full_doc_info{rev_tree=RevisionTree} = FullInfo} -> - MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs), - {Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)}; - not_found -> - {Id, Revs, []} - end - end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))}; - Error -> - Error - end). - -update_docs(DbName, Docs0, Options) -> - case proplists:get_value(replicated_changes, Options) of - true -> - X = replicated_changes; - _ -> - X = interactive_edit - end, - Docs = make_att_readers(Docs0), - with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}). - -group_info(DbName, Group0) -> - {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), - rexi:reply(couch_view_group:request_group_info(Pid)). - -reset_validation_funs(DbName) -> - case couch_db:open(DbName, []) of - {ok, #db{main_pid = Pid}} -> - gen_server:cast(Pid, {load_validation_funs, undefined}); - _ -> - ok - end. 
- -%% -%% internal -%% - -with_db(DbName, Options, {M,F,A}) -> - case couch_db:open(DbName, Options) of - {ok, Db} -> - rexi:reply(try - apply(M, F, [Db | A]) - catch Exception -> - Exception; - error:Reason -> - ?LOG_ERROR("~p ~p ~p~n~p", [?MODULE, {M,F}, Reason, - erlang:get_stacktrace()]), - {error, Reason} - end); - Error -> - rexi:reply(Error) - end. - -view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) -> - % matches for _all_docs and translates #full_doc_info{} -> KV pair - case couch_doc:to_doc_info(FullDocInfo) of - #doc_info{revs=[#rev_info{deleted=false, rev=Rev}|_]} -> - Id = FullDocInfo#full_doc_info.id, - Value = {[{rev,couch_doc:rev_to_str(Rev)}]}, - view_fold({{Id,Id}, Value}, OffsetReds, Acc); - #doc_info{revs=[#rev_info{deleted=true}|_]} -> - {ok, Acc} - end; -view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) -> - % calculates the offset for this shard - #view_acc{reduce_fun=Reduce} = Acc, - Offset = Reduce(OffsetReds), - case rexi:sync_reply({total_and_offset, Total, Offset}) of - ok -> - view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset}); - stop -> - exit(normal); - timeout -> - exit(timeout) - end; -view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) -> - % we scanned through limit+skip local rows - {stop, Acc}; -view_fold({{Key,Id}, Value}, _Offset, Acc) -> - % the normal case - #view_acc{ - db = Db, - limit = Limit, - include_docs = IncludeDocs - } = Acc, - Doc = if not IncludeDocs -> undefined; true -> - case couch_db:open_doc(Db, Id, []) of - {not_found, deleted} -> - null; - {not_found, missing} -> - undefined; - {ok, Doc0} -> - couch_doc:to_json_obj(Doc0, []) - end - end, - case rexi:sync_reply(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of - ok -> - {ok, Acc#view_acc{limit=Limit-1}}; - timeout -> - exit(timeout) - end. - -final_response(Total, nil) -> - case rexi:sync_reply({total_and_offset, Total, Total}) of ok -> - rexi:reply(complete); - stop -> - ok; - timeout -> - exit(timeout) - end; -final_response(_Total, _Offset) -> - rexi:reply(complete). - -group_rows_fun(exact) -> - fun({Key1,_}, {Key2,_}) -> Key1 == Key2 end; -group_rows_fun(0) -> - fun(_A, _B) -> true end; -group_rows_fun(GroupLevel) when is_integer(GroupLevel) -> - fun({[_|_] = Key1,_}, {[_|_] = Key2,_}) -> - lists:sublist(Key1, GroupLevel) == lists:sublist(Key2, GroupLevel); - ({Key1,_}, {Key2,_}) -> - Key1 == Key2 - end. - -reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) -> - {stop, Acc}; -reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) -> - send(null, Red, Acc); -reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) -> - send(Key, Red, Acc); -reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) -> - send(lists:sublist(K, I), Red, Acc). - -send(Key, Value, #view_acc{limit=Limit} = Acc) -> - case rexi:sync_reply(#view_row{key=Key, value=Value}) of - ok -> - {ok, Acc#view_acc{limit=Limit-1}}; - stop -> - exit(normal); - timeout -> - exit(timeout) - end. - -changes_enumerator(DocInfo, {Db, _Seq, Args}) -> - #changes_args{include_docs=IncludeDocs, filter=FilterFun} = Args, - #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]} - = DocInfo, - case [Result || Result <- FilterFun(DocInfo), Result /= null] of - [] -> - {ok, {Db, Seq, Args}}; - Results -> - ChangesRow = changes_row(Db, Seq, Id, Results, Rev, Del, IncludeDocs), - Go = rexi:sync_reply(ChangesRow), - {Go, {Db, Seq, Args}} - end. 
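group_rows_fun/1 above builds the row-equality test used for reduce grouping: group_level 0 lumps all rows together, exact compares whole keys, and an integer N compares only the first N elements of array keys. A standalone shell sketch of the N = 2 case:

    F = fun({[_|_] = Key1, _}, {[_|_] = Key2, _}) ->
            lists:sublist(Key1, 2) == lists:sublist(Key2, 2);
        ({Key1, _}, {Key2, _}) ->
            Key1 == Key2
        end,
    true = F({[a, b, x], v1}, {[a, b, y], v2}),  % same two-element prefix
    false = F({[a, c, x], v1}, {[a, b, y], v2}).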
- -changes_row(_, Seq, Id, Results, _, true, true) -> - #view_row{key=Seq, id=Id, value=Results, doc=deleted}; -changes_row(_, Seq, Id, Results, _, true, false) -> - #view_row{key=Seq, id=Id, value=Results, doc=deleted}; -changes_row(Db, Seq, Id, Results, Rev, false, true) -> - #view_row{key=Seq, id=Id, value=Results, doc=doc_member(Db, Id, Rev)}; -changes_row(_, Seq, Id, Results, _, false, false) -> - #view_row{key=Seq, id=Id, value=Results}. - -doc_member(Shard, Id, Rev) -> - case couch_db:open_doc_revs(Shard, Id, [Rev], []) of - {ok, [{ok,Doc}]} -> - couch_doc:to_json_obj(Doc, []); - Error -> - Error - end. - -possible_ancestors(_FullInfo, []) -> - []; -possible_ancestors(FullInfo, MissingRevs) -> - #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo), - LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo], - % Find the revs that are possible parents of this rev - lists:foldl(fun({LeafPos, LeafRevId}, Acc) -> - % this leaf is a "possible ancestor" of the missing - % revs if this LeafPos is less than any of the missing revs - case lists:any(fun({MissingPos, _}) -> - LeafPos < MissingPos end, MissingRevs) of - true -> - [{LeafPos, LeafRevId} | Acc]; - false -> - Acc - end - end, [], LeafRevs). - -make_att_readers([]) -> - []; -make_att_readers([#doc{atts=Atts0} = Doc | Rest]) -> - % go through the attachments looking for 'follows' in the data, - % replace with function that reads the data from MIME stream. - Atts = [Att#att{data=make_att_reader(D)} || #att{data=D} = Att <- Atts0], - [Doc#doc{atts = Atts} | make_att_readers(Rest)]. - -make_att_reader({follows, Parser}) -> - fun() -> - Parser ! {get_bytes, self()}, - receive {bytes, Bytes} -> Bytes end - end; -make_att_reader(Else) -> - Else. diff --git a/src/fabric_util.erl b/src/fabric_util.erl deleted file mode 100644 index 639a32e7..00000000 --- a/src/fabric_util.erl +++ /dev/null @@ -1,89 +0,0 @@ --module(fabric_util). - --export([submit_jobs/3, cleanup/1, recv/4, receive_loop/4, receive_loop/6, - get_db/1]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). - -submit_jobs(Shards, EndPoint, ExtraArgs) -> - lists:map(fun(#shard{node=Node, name=ShardName} = Shard) -> - Ref = rexi:cast(Node, {fabric_rpc, EndPoint, [ShardName | ExtraArgs]}), - Shard#shard{ref = Ref} - end, Shards). - -cleanup(Workers) -> - [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers]. - -recv(Workers, Keypos, Fun, Acc0) -> - receive_loop(Workers, Keypos, Fun, Acc0). - -receive_loop(Workers, Keypos, Fun, Acc0) -> - case couch_config:get("fabric", "request_timeout", "60000") of - "infinity" -> - Timeout = infinity; - N -> - Timeout = list_to_integer(N) - end, - receive_loop(Workers, Keypos, Fun, Acc0, Timeout, infinity). - -%% @doc set up the receive loop with an overall timeout --spec receive_loop([any()], integer(), function(), any(), timeout(), timeout()) -> - {ok, any()} | timeout | {error, any()}. -receive_loop(RefPartMap, Keypos, Fun, Acc0, infinity, PerMsgTO) -> - process_mailbox(RefPartMap, Keypos, Fun, Acc0, nil, PerMsgTO); -receive_loop(RefPartMap, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) -> - TimeoutRef = erlang:make_ref(), - {ok, TRef} = timer:send_after(GlobalTimeout, {timeout, TimeoutRef}), - try - process_mailbox(RefPartMap, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) - after - timer:cancel(TRef) - end. 
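The helpers above are used in the same pattern throughout the fabric coordinators: submit one rexi job per shard, fold the replies with recv/4, and kill any stragglers afterwards. A sketch, with handle_message/3 standing in for the caller's reply handler:

    Workers = fabric_util:submit_jobs(mem3:shards(DbName), get_doc_count, []),
    try
        fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0)
    after
        fabric_util:cleanup(Workers)
    end.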
- -process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> - case process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) of - {ok, Acc} -> - process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO); - {stop, Acc} -> - {ok, Acc}; - Error -> - Error - end. - -process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> - receive - {timeout, TimeoutRef} -> - timeout; - {Ref, Msg} -> - case lists:keyfind(Ref, Keypos, RefList) of - false -> - % this was some non-matching message which we will ignore - {ok, Acc0}; - Worker -> - Fun(Msg, Worker, Acc0) - end; - {Ref, From, Msg} -> - case lists:keyfind(Ref, Keypos, RefList) of - false -> - {ok, Acc0}; - Worker -> - Fun(Msg, {Worker, From}, Acc0) - end; - {rexi_DOWN, _RexiMonPid, ServerPid, Reason} = Msg -> - showroom_log:message(alert, "rexi_DOWN ~p ~p", [ServerPid, Reason]), - Fun(Msg, nil, Acc0) - after PerMsgTO -> - timeout - end. - -get_db(DbName) -> - Shards = mem3:shards(DbName), - case lists:partition(fun(#shard{node = N}) -> N =:= node() end, Shards) of - {[#shard{name = ShardName}|_], _} -> - % prefer node-local DBs - couch_db:open(ShardName, []); - {[], [#shard{node = Node, name = ShardName}|_]} -> - % but don't require them - rpc:call(Node, couch_db, open, [ShardName, []]) - end. diff --git a/src/fabric_view.erl b/src/fabric_view.erl deleted file mode 100644 index 49a3a55a..00000000 --- a/src/fabric_view.erl +++ /dev/null @@ -1,218 +0,0 @@ --module(fabric_view). - --export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1, - maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1, - extract_view/4]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -%% @doc looks for a fully covered keyrange in the list of counters --spec is_progress_possible([{#shard{}, term()}]) -> boolean(). -is_progress_possible([]) -> - false; -is_progress_possible(Counters) -> - Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end, - [], Counters), - [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges), - Result = lists:foldl(fun - (_, fail) -> - % we've already declared failure - fail; - (_, complete) -> - % this is the success condition, we can fast-forward - complete; - ({X,_}, Tail) when X > (Tail+1) -> - % gap in the keyrange, we're dead - fail; - ({_,Y}, Tail) -> - case erlang:max(Tail, Y) of - End when (End+1) =:= (2 bsl 31) -> - complete; - Else -> - % the normal condition, adding to the tail - Else - end - end, if (Tail0+1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest), - (Start =:= 0) andalso (Result =:= complete). - --spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) -> - [{#shard{}, any()}]. -remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) -> - fabric_dict:filter(fun(#shard{range=[X,Y]} = Shard, _Value) -> - if Shard =:= Shard0 -> - % we can't remove ourselves - true; - A < B, X >= A, X < B -> - % lower bound is inside our range - false; - A < B, Y > A, Y =< B -> - % upper bound is inside our range - false; - B < A, X >= A orelse B < A, X < B -> - % target shard wraps the key range, lower bound is inside - false; - B < A, Y > A orelse B < A, Y =< B -> - % target shard wraps the key range, upper bound is inside - false; - true -> - true - end - end, Shards). 
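is_progress_possible/1 above answers one question: do the shards we can still hear from cover the entire key ring [0, 2^32 - 1] without a gap? The same fold, sketched standalone over plain {Start, End} tuples:

    complete(Ranges) ->
        [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges),
        Result = lists:foldl(fun
            (_, fail) -> fail;                          % gap already found
            (_, complete) -> complete;                  % ring already closed
            ({X, _}, Tail) when X > (Tail + 1) -> fail; % hole in the ring
            ({_, Y}, Tail) ->
                case erlang:max(Tail, Y) of
                    End when (End + 1) =:= (2 bsl 31) -> complete;
                    Else -> Else
                end
        end, if (Tail0 + 1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest),
        (Start =:= 0) andalso (Result =:= complete).

    %% complete([{0, 16#7FFFFFFF}, {16#80000000, 16#FFFFFFFF}]) -> true
    %% complete([{0, 16#7FFFFFFF}]) -> false (upper half of the ring missing)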
- -maybe_pause_worker(Worker, From, State) -> - #collector{buffer_size = BufferSize, counters = Counters} = State, - case fabric_dict:lookup_element(Worker, Counters) of - BufferSize -> - State#collector{blocked = [{Worker,From} | State#collector.blocked]}; - _Count -> - gen_server:reply(From, ok), - State - end. - -maybe_resume_worker(Worker, State) -> - #collector{buffer_size = Buffer, counters = C, blocked = B} = State, - case fabric_dict:lookup_element(Worker, C) of - Count when Count < Buffer/2 -> - case couch_util:get_value(Worker, B) of - undefined -> - State; - From -> - gen_server:reply(From, ok), - State#collector{blocked = lists:keydelete(Worker, 1, B)} - end; - _Other -> - State - end. - -maybe_send_row(#collector{limit=0} = State) -> - #collector{user_acc=AccIn, callback=Callback} = State, - {_, Acc} = Callback(complete, AccIn), - {stop, State#collector{user_acc=Acc}}; -maybe_send_row(State) -> - #collector{ - callback = Callback, - counters = Counters, - skip = Skip, - limit = Limit, - user_acc = AccIn - } = State, - case fabric_dict:any(0, Counters) of - true -> - {ok, State}; - false -> - try get_next_row(State) of - {_, NewState} when Skip > 0 -> - maybe_send_row(NewState#collector{skip=Skip-1, limit=Limit-1}); - {Row, NewState} -> - case Callback(transform_row(Row), AccIn) of - {stop, Acc} -> - {stop, NewState#collector{user_acc=Acc, limit=Limit-1}}; - {ok, Acc} -> - maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1}) - end - catch complete -> - {_, Acc} = Callback(complete, AccIn), - {stop, State#collector{user_acc=Acc}} - end - end. - -keydict(nil) -> - undefined; -keydict(Keys) -> - {Dict,_} = lists:foldl(fun(K, {D,I}) -> {dict:store(K,I,D), I+1} end, - {dict:new(),0}, Keys), - Dict. - -%% internal %% - -get_next_row(#collector{rows = []}) -> - throw(complete); -get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined -> - #collector{ - query_args = #view_query_args{direction=Dir}, - keys = Keys, - rows = RowDict, - os_proc = Proc, - counters = Counters0 - } = St, - {Key, RestKeys} = find_next_key(Keys, Dir, RowDict), - case dict:find(Key, RowDict) of - {ok, Records} -> - NewRowDict = dict:erase(Key, RowDict), - Counters = lists:foldl(fun(#view_row{worker=Worker}, CountersAcc) -> - fabric_dict:update_counter(Worker, -1, CountersAcc) - end, Counters0, Records), - Wrapped = [[V] || #view_row{value=V} <- Records], - {ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped), - NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters}, - {#view_row{key=Key, id=reduced, value=Reduced}, NewSt}; - error -> - get_next_row(St#collector{keys=RestKeys}) - end; -get_next_row(State) -> - #collector{rows = [Row|Rest], counters = Counters0} = State, - Worker = Row#view_row.worker, - Counters1 = fabric_dict:update_counter(Worker, -1, Counters0), - NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}), - {Row, NewState#collector{rows = Rest}}. - -find_next_key(nil, Dir, RowDict) -> - case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of - [] -> - throw(complete); - [Key|_] -> - {Key, nil} - end; -find_next_key([], _, _) -> - throw(complete); -find_next_key([Key|Rest], _, _) -> - {Key, Rest}. 
- -transform_row(#view_row{key=Key, id=reduced, value=Value}) -> - {row, {[{key,Key}, {value,Value}]}}; -transform_row(#view_row{key=Key, id=undefined}) -> - {row, {[{key,Key}, {error,not_found}]}}; -transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) -> - {row, {[{id,Id}, {key,Key}, {value,Value}]}}; -transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) -> - {row, {[{id,Id}, {key,Key}, {value,Value}, {error,Reason}]}}; -transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) -> - {row, {[{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]}}. - -sort_fun(fwd) -> - fun(A,A) -> true; (A,B) -> couch_view:less_json(A,B) end; -sort_fun(rev) -> - fun(A,A) -> true; (A,B) -> couch_view:less_json(B,A) end. - -extract_view(Pid, ViewName, [], _ViewType) -> - ?LOG_ERROR("missing_named_view ~p", [ViewName]), - exit(Pid, kill), - exit(missing_named_view); -extract_view(Pid, ViewName, [View|Rest], ViewType) -> - case lists:member(ViewName, view_names(View, ViewType)) of - true -> - if ViewType == reduce -> - {index_of(ViewName, view_names(View, reduce)), View}; - true -> - View - end; - false -> - extract_view(Pid, ViewName, Rest, ViewType) - end. - -view_names(View, Type) when Type == red_map; Type == reduce -> - [Name || {Name, _} <- View#view.reduce_funs]; -view_names(View, map) -> - View#view.map_names. - -index_of(X, List) -> - index_of(X, List, 1). - -index_of(_X, [], _I) -> - not_found; -index_of(X, [X|_Rest], I) -> - I; -index_of(X, [_|Rest], I) -> - index_of(X, Rest, I+1). diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl deleted file mode 100644 index d51a2831..00000000 --- a/src/fabric_view_all_docs.erl +++ /dev/null @@ -1,167 +0,0 @@ --module(fabric_view_all_docs). - --export([go/4]). --export([open_doc/3]). % exported for spawn - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) -> - Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) -> - Ref = rexi:cast(Node, {fabric_rpc, all_docs, [Name, QueryArgs]}), - Shard#shard{ref = Ref} - end, mem3:shards(DbName)), - BufferSize = couch_config:get("fabric", "map_buffer_size", "2"), - #view_query_args{limit = Limit, skip = Skip} = QueryArgs, - State = #collector{ - query_args = QueryArgs, - callback = Callback, - buffer_size = list_to_integer(BufferSize), - counters = fabric_dict:init(Workers, 0), - skip = Skip, - limit = Limit, - user_acc = Acc0 - }, - try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, - State, infinity, 5000) of - {ok, NewState} -> - {ok, NewState#collector.user_acc}; - Error -> - Error - after - fabric_util:cleanup(Workers) - end; - -go(DbName, QueryArgs, Callback, Acc0) -> - #view_query_args{ - direction = Dir, - include_docs = IncludeDocs, - limit = Limit0, - skip = Skip0, - keys = Keys - } = QueryArgs, - {_, Ref0} = spawn_monitor(fun() -> exit(fabric:get_doc_count(DbName)) end), - Monitors0 = [spawn_monitor(?MODULE, open_doc, [DbName, Id, IncludeDocs]) || - Id <- Keys], - Monitors = if Dir=:=fwd -> Monitors0; true -> lists:reverse(Monitors0) end, - receive {'DOWN', Ref0, _, _, {ok, TotalRows}} -> - {ok, Acc1} = Callback({total_and_offset, TotalRows, 0}, Acc0), - {ok, Acc2} = doc_receive_loop(Monitors, Skip0, Limit0, Callback, Acc1), - Callback(complete, Acc2) - after 10000 -> - Callback(timeout, Acc0) - end. 
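The keys clause of go/4 above fans out one monitored process per requested id; open_doc/3 (below) hands its row back through exit/1, so the result arrives in the 'DOWN' message. The bare pattern in isolation:

    {Pid, Ref} = spawn_monitor(fun() -> exit({row, <<"doc1">>}) end),
    receive
        {'DOWN', Ref, process, Pid, {row, Id}} -> Id
    after 10000 ->
        timeout
    end.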
- -handle_message({rexi_DOWN, _, _, _}, nil, State) -> - % TODO see if progress can be made here, possibly by removing all shards - % from that node and checking is_progress_possible - {ok, State}; - -handle_message({rexi_EXIT, _}, Worker, State) -> - #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State, - Counters = fabric_dict:erase(Worker, Counters0), - case fabric_view:is_progress_possible(Counters) of - true -> - {ok, State#collector{counters = Counters}}; - false -> - Callback({error, dead_shards}, Acc), - {error, dead_shards} - end; - -handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) -> - #collector{ - callback = Callback, - counters = Counters0, - total_rows = Total0, - offset = Offset0, - user_acc = AccIn - } = State, - case fabric_dict:lookup_element(Worker, Counters0) of - undefined -> - % this worker lost the race with other partition copies, terminate - gen_server:reply(From, stop), - {ok, State}; - 0 -> - gen_server:reply(From, ok), - Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), - Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1), - Total = Total0 + Tot, - Offset = Offset0 + Off, - case fabric_dict:any(0, Counters2) of - true -> - {ok, State#collector{ - counters = Counters2, - total_rows = Total, - offset = Offset - }}; - false -> - FinalOffset = erlang:min(Total, Offset+State#collector.skip), - {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn), - {Go, State#collector{ - counters = fabric_dict:decrement_all(Counters2), - total_rows = Total, - offset = FinalOffset, - user_acc = Acc - }} - end - end; - -handle_message(#view_row{} = Row, {Worker, From}, State) -> - #collector{query_args = Args, counters = Counters0, rows = Rows0} = State, - Dir = Args#view_query_args.direction, - Rows = merge_row(Dir, Row#view_row{worker=Worker}, Rows0), - Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), - State1 = State#collector{rows=Rows, counters=Counters1}, - State2 = fabric_view:maybe_pause_worker(Worker, From, State1), - fabric_view:maybe_send_row(State2); - -handle_message(complete, Worker, State) -> - Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), - fabric_view:maybe_send_row(State#collector{counters = Counters}). - - -merge_row(fwd, Row, Rows) -> - lists:keymerge(#view_row.id, [Row], Rows); -merge_row(rev, Row, Rows) -> - lists:rkeymerge(#view_row.id, [Row], Rows). - -doc_receive_loop([], _, _, _, Acc) -> - {ok, Acc}; -doc_receive_loop(_, _, 0, _, Acc) -> - {ok, Acc}; -doc_receive_loop([{Pid,Ref}|Rest], Skip, Limit, Callback, Acc) when Skip > 0 -> - receive {'DOWN', Ref, process, Pid, #view_row{}} -> - doc_receive_loop(Rest, Skip-1, Limit-1, Callback, Acc) - after 10000 -> - timeout - end; -doc_receive_loop([{Pid,Ref}|Rest], 0, Limit, Callback, AccIn) -> - receive {'DOWN', Ref, process, Pid, #view_row{} = Row} -> - case Callback(fabric_view:transform_row(Row), AccIn) of - {ok, Acc} -> - doc_receive_loop(Rest, 0, Limit-1, Callback, Acc); - {stop, Acc} -> - {ok, Acc} - end - after 10000 -> - timeout - end. 
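merge_row/3 above leans on each worker delivering its rows in sorted order, so a keymerge on the id field keeps the accumulator sorted as replies interleave. With {row, Id} pairs standing in for #view_row{} records (the id at key position 2):

    Merged = lists:keymerge(2, [{row, <<"b">>}], [{row, <<"a">>}, {row, <<"c">>}]),
    %% Merged = [{row,<<"a">>}, {row,<<"b">>}, {row,<<"c">>}]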
- -open_doc(DbName, Id, IncludeDocs) -> - Row = case fabric:open_doc(DbName, Id, [deleted]) of - {not_found, missing} -> - Doc = undefined, - #view_row{key=Id}; - {ok, #doc{deleted=true, revs=Revs}} -> - Doc = null, - {RevPos, [RevId|_]} = Revs, - Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}, {deleted,true}]}, - #view_row{key=Id, id=Id, value=Value}; - {ok, #doc{revs=Revs} = Doc0} -> - Doc = couch_doc:to_json_obj(Doc0, []), - {RevPos, [RevId|_]} = Revs, - Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}]}, - #view_row{key=Id, id=Id, value=Value} - end, - exit(if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end). diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl deleted file mode 100644 index 6030df1d..00000000 --- a/src/fabric_view_changes.erl +++ /dev/null @@ -1,251 +0,0 @@ --module(fabric_view_changes). - --export([go/5, start_update_notifier/1]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse - Feed == "longpoll" -> - Args = make_changes_args(Options), - {ok, Acc} = Callback(start, Acc0), - Notifiers = start_update_notifiers(DbName), - {Timeout, TimeoutFun} = couch_changes:get_changes_timeout(Args, Callback), - try - keep_sending_changes( - DbName, - Args, - Callback, - get_start_seq(DbName, Args), - Acc, - Timeout, - TimeoutFun - ) - after - stop_update_notifiers(Notifiers), - couch_changes:get_rest_db_updated() - end; - -go(DbName, "normal", Options, Callback, Acc0) -> - Args = make_changes_args(Options), - {ok, Acc} = Callback(start, Acc0), - {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes( - DbName, - Args, - Callback, - get_start_seq(DbName, Args), - Acc - ), - Callback({stop, pack_seqs(Seqs)}, AccOut). - -keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, TFun) -> - #changes_args{limit=Limit, feed=Feed} = Args, - {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn), - #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector, - LastSeq = pack_seqs(NewSeqs), - if Limit > Limit2, Feed == "longpoll" -> - Callback({stop, LastSeq}, AccOut); - true -> - case couch_changes:wait_db_updated(Timeout, TFun) of - updated -> - keep_sending_changes( - DbName, - Args#changes_args{limit=Limit2}, - Callback, - LastSeq, - AccIn, - Timeout, - TFun - ); - stop -> - Callback({stop, LastSeq}, AccOut) - end - end. 
- -send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn) -> - AllShards = mem3:shards(DbName), - Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) -> - case lists:member(Shard, AllShards) of - true -> - Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}), - [{Shard#shard{ref = Ref}, Seq}]; - false -> - % Find some replacement shards to cover the missing range - % TODO It's possible in rare cases of shard merging to end up - % with overlapping shard ranges from this technique - lists:map(fun(#shard{name=Name2, node=N2} = NewShard) -> - Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}), - {NewShard#shard{ref = Ref}, 0} - end, find_replacement_shards(Shard, AllShards)) - end - end, unpack_seqs(PackedSeqs, DbName)), - {Workers, _} = lists:unzip(Seqs), - State = #collector{ - query_args = ChangesArgs, - callback = Callback, - counters = fabric_dict:init(Workers, 0), - user_acc = AccIn, - limit = ChangesArgs#changes_args.limit, - rows = Seqs % store sequence positions instead - }, - try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, - State, infinity, 5000) - after - fabric_util:cleanup(Workers) - end. - -handle_message({rexi_DOWN, _, _, _}, nil, State) -> - % TODO see if progress can be made here, possibly by removing all shards - % from that node and checking is_progress_possible - {ok, State}; - -handle_message({rexi_EXIT, Reason}, Worker, State) -> - ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]), - #collector{ - callback=Callback, - counters=Counters0, - rows = Seqs0, - user_acc=Acc - } = State, - Counters = fabric_dict:erase(Worker, Counters0), - Seqs = fabric_dict:erase(Worker, Seqs0), - case fabric_view:is_progress_possible(Counters) of - true -> - {ok, State#collector{counters = Counters, rows=Seqs}}; - false -> - Callback({error, dead_shards}, Acc), - {error, dead_shards} - end; - -handle_message(_, _, #collector{limit=0} = State) -> - {stop, State}; - -handle_message(#view_row{key=Seq} = Row0, {Worker, From}, St) -> - #collector{ - query_args = #changes_args{include_docs=IncludeDocs}, - callback = Callback, - counters = S0, - limit = Limit, - user_acc = AccIn - } = St, - case fabric_dict:lookup_element(Worker, S0) of - undefined -> - % this worker lost the race with other partition copies, terminate it - gen_server:reply(From, stop), - {ok, St}; - _ -> - S1 = fabric_dict:store(Worker, Seq, S0), - S2 = fabric_view:remove_overlapping_shards(Worker, S1), - Row = Row0#view_row{key = pack_seqs(S2)}, - {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn), - gen_server:reply(From, Go), - {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}} - end; - -handle_message({complete, EndSeq}, Worker, State) -> - #collector{ - counters = S0, - total_rows = Completed % override - } = State, - case fabric_dict:lookup_element(Worker, S0) of - undefined -> - {ok, State}; - _ -> - S1 = fabric_dict:store(Worker, EndSeq, S0), - % unlikely to have overlaps here, but possible w/ filters - S2 = fabric_view:remove_overlapping_shards(Worker, S1), - NewState = State#collector{counters=S2, total_rows=Completed+1}, - case fabric_dict:size(S2) =:= (Completed+1) of - true -> - {stop, NewState}; - false -> - {ok, NewState} - end - end. 
- -make_changes_args(#changes_args{style=main_only, filter=undefined}=Args) -> - Args#changes_args{filter = fun couch_changes:main_only_filter/1}; -make_changes_args(#changes_args{style=all_docs, filter=undefined}=Args) -> - Args#changes_args{filter = fun couch_changes:all_docs_filter/1}; -make_changes_args(Args) -> - Args. - -get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) -> - Since; -get_start_seq(DbName, #changes_args{dir=rev}) -> - Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, get_update_seq, []), - {ok, Since} = fabric_util:recv(Workers, #shard.ref, - fun collect_update_seqs/3, fabric_dict:init(Workers, -1)), - Since. - -collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) -> - case fabric_dict:lookup_element(Shard, Counters) of - undefined -> - % already heard from someone else in this range - {ok, Counters}; - -1 -> - C1 = fabric_dict:store(Shard, Seq, Counters), - C2 = fabric_view:remove_overlapping_shards(Shard, C1), - case fabric_dict:any(-1, C2) of - true -> - {ok, C2}; - false -> - {stop, pack_seqs(C2)} - end - end. - -pack_seqs(Workers) -> - SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers], - SeqSum = lists:sum(element(2, lists:unzip(Workers))), - Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])), - list_to_binary([integer_to_list(SeqSum), $-, Opaque]). - -unpack_seqs(0, DbName) -> - fabric_dict:init(mem3:shards(DbName), 0); - -unpack_seqs("0", DbName) -> - fabric_dict:init(mem3:shards(DbName), 0); - -unpack_seqs(Packed, DbName) -> - {match, [Opaque]} = re:run(Packed, "^([0-9]+-)?(?<opaque>.*)", [{capture, - [opaque], binary}]), - % TODO relies on internal structure of fabric_dict as keylist - lists:map(fun({Node, [A,B], Seq}) -> - Shard = #shard{node=Node, range=[A,B], dbname=DbName}, - {mem3_util:name_shard(Shard), Seq} - end, binary_to_term(couch_util:decodeBase64Url(Opaque))). - -start_update_notifiers(DbName) -> - lists:map(fun(#shard{node=Node, name=Name}) -> - {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})} - end, mem3:shards(DbName)). - -% rexi endpoint -start_update_notifier(DbName) -> - {Caller, _} = get(rexi_from), - Fun = fun({_, X}) when X == DbName -> Caller ! db_updated; (_) -> ok end, - Id = {couch_db_update_notifier, make_ref()}, - ok = gen_event:add_sup_handler(couch_db_update, Id, Fun), - receive {gen_event_EXIT, Id, Reason} -> - rexi:reply({gen_event_EXIT, DbName, Reason}) - end. - -stop_update_notifiers(Notifiers) -> - [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers]. - -changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, true) -> - {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, null}]}}; -changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, false) -> - {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}}; -changes_row(#view_row{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) -> - {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}}; -changes_row(#view_row{key=Seq, id=Id, value=Value, doc=Doc}, true) -> - {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}}; -changes_row(#view_row{key=Seq, id=Id, value=Value}, false) -> - {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}. - -find_replacement_shards(#shard{range=Range}, AllShards) -> - % TODO make this moar betta -- we might have split or merged the partition - [Shard || Shard <- AllShards, Shard#shard.range =:= Range]. 
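pack_seqs/1 and unpack_seqs/2 above round-trip the per-shard update sequences through an opaque string of the form "Sum-Base64(term_to_binary(SeqList))", which is what clients see as a changes-feed since value. A simplified sketch using stock base64 in place of couch_util's URL-safe variant:

    pack(SeqList) ->
        Sum = lists:sum([S || {_Node, _Range, S} <- SeqList]),
        Opaque = base64:encode(term_to_binary(SeqList, [compressed])),
        iolist_to_binary([integer_to_list(Sum), $-, Opaque]).

    unpack(Packed) ->
        [_Sum, Opaque] = binary:split(Packed, <<"-">>),
        binary_to_term(base64:decode(Opaque)).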
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl deleted file mode 100644 index ce8dd625..00000000 --- a/src/fabric_view_map.erl +++ /dev/null @@ -1,138 +0,0 @@ --module(fabric_view_map). - --export([go/6]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) -> - {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []), - go(DbName, DDoc, View, Args, Callback, Acc0); - -go(DbName, DDoc, View, Args, Callback, Acc0) -> - Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) -> - Ref = rexi:cast(Node, {fabric_rpc, map_view, [Name, DDoc, View, Args]}), - Shard#shard{ref = Ref} - end, mem3:shards(DbName)), - BufferSize = couch_config:get("fabric", "map_buffer_size", "2"), - #view_query_args{limit = Limit, skip = Skip, keys = Keys} = Args, - State = #collector{ - query_args = Args, - callback = Callback, - buffer_size = list_to_integer(BufferSize), - counters = fabric_dict:init(Workers, 0), - skip = Skip, - limit = Limit, - keys = fabric_view:keydict(Keys), - sorted = Args#view_query_args.sorted, - user_acc = Acc0 - }, - try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, - State, infinity, 1000 * 60 * 60) of - {ok, NewState} -> - {ok, NewState#collector.user_acc}; - Error -> - Error - after - fabric_util:cleanup(Workers) - end. - -handle_message({rexi_DOWN, _, _, _}, nil, State) -> - % TODO see if progress can be made here, possibly by removing all shards - % from that node and checking is_progress_possible - {ok, State}; - -handle_message({rexi_EXIT, Reason}, Worker, State) -> - ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]), - #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State, - Counters = fabric_dict:erase(Worker, Counters0), - case fabric_view:is_progress_possible(Counters) of - true -> - {ok, State#collector{counters = Counters}}; - false -> - Callback({error, dead_shards}, Acc), - {error, dead_shards} - end; - -handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) -> - #collector{ - callback = Callback, - counters = Counters0, - total_rows = Total0, - offset = Offset0, - user_acc = AccIn - } = State, - case fabric_dict:lookup_element(Worker, Counters0) of - undefined -> - % this worker lost the race with other partition copies, terminate - gen_server:reply(From, stop), - {ok, State}; - 0 -> - gen_server:reply(From, ok), - Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), - Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1), - Total = Total0 + Tot, - Offset = Offset0 + Off, - case fabric_dict:any(0, Counters2) of - true -> - {ok, State#collector{ - counters = Counters2, - total_rows = Total, - offset = Offset - }}; - false -> - FinalOffset = erlang:min(Total, Offset+State#collector.skip), - {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn), - {Go, State#collector{ - counters = fabric_dict:decrement_all(Counters2), - total_rows = Total, - offset = FinalOffset, - user_acc = Acc - }} - end - end; - -handle_message(#view_row{}, {_, _}, #collector{limit=0} = State) -> - #collector{callback=Callback} = State, - {_, Acc} = Callback(complete, State#collector.user_acc), - {stop, State#collector{user_acc=Acc}}; - -handle_message(#view_row{} = Row, {_,From}, #collector{sorted=false} = St) -> - #collector{callback=Callback, user_acc=AccIn, limit=Limit} = St, - {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn), - 
gen_server:reply(From, ok), - {Go, St#collector{user_acc=Acc, limit=Limit-1}}; - -handle_message(#view_row{} = Row, {Worker, From}, State) -> - #collector{ - query_args = #view_query_args{direction=Dir}, - counters = Counters0, - rows = Rows0, - keys = KeyDict - } = State, - Rows = merge_row(Dir, KeyDict, Row#view_row{worker=Worker}, Rows0), - Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), - State1 = State#collector{rows=Rows, counters=Counters1}, - State2 = fabric_view:maybe_pause_worker(Worker, From, State1), - fabric_view:maybe_send_row(State2); - -handle_message(complete, Worker, State) -> - Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), - fabric_view:maybe_send_row(State#collector{counters = Counters}). - -merge_row(fwd, undefined, Row, Rows) -> - lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) -> - couch_view:less_json([KeyA, IdA], [KeyB, IdB]) - end, [Row], Rows); -merge_row(rev, undefined, Row, Rows) -> - lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) -> - couch_view:less_json([KeyB, IdB], [KeyA, IdA]) - end, [Row], Rows); -merge_row(_, KeyDict, Row, Rows) -> - lists:merge(fun(#view_row{key=A, id=IdA}, #view_row{key=B, id=IdB}) -> - if A =:= B -> IdA < IdB; true -> - dict:fetch(A, KeyDict) < dict:fetch(B, KeyDict) - end - end, [Row], Rows). - diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl deleted file mode 100644 index ddde9f22..00000000 --- a/src/fabric_view_reduce.erl +++ /dev/null @@ -1,85 +0,0 @@ --module(fabric_view_reduce). - --export([go/6]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) -> - {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []), - go(DbName, DDoc, View, Args, Callback, Acc0); - -go(DbName, DDoc, VName, Args, Callback, Acc0) -> - #group{def_lang=Lang, views=Views} = Group = - couch_view_group:design_doc_to_view_group(#db{name=DbName}, DDoc), - {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce), - {VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs), - Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) -> - Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,Group,VName,Args]}), - Shard#shard{ref = Ref} - end, mem3:shards(DbName)), - BufferSize = couch_config:get("fabric", "reduce_buffer_size", "20"), - #view_query_args{limit = Limit, skip = Skip} = Args, - State = #collector{ - query_args = Args, - callback = Callback, - buffer_size = list_to_integer(BufferSize), - counters = fabric_dict:init(Workers, 0), - keys = Args#view_query_args.keys, - skip = Skip, - limit = Limit, - lang = Group#group.def_lang, - os_proc = couch_query_servers:get_os_process(Lang), - reducer = RedSrc, - rows = dict:new(), - user_acc = Acc0 - }, - try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, - State, infinity, 1000 * 60 * 60) of - {ok, NewState} -> - {ok, NewState#collector.user_acc}; - Error -> - Error - after - fabric_util:cleanup(Workers), - catch couch_query_servers:ret_os_process(State#collector.os_proc) - end. 
- -handle_message({rexi_DOWN, _, _, _}, nil, State) -> - % TODO see if progress can be made here, possibly by removing all shards - % from that node and checking is_progress_possible - {ok, State}; - -handle_message({rexi_EXIT, Reason}, Worker, State) -> - ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]), - #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State, - Counters = fabric_dict:erase(Worker, Counters0), - case fabric_view:is_progress_possible(Counters) of - true -> - {ok, State#collector{counters = Counters}}; - false -> - Callback({error, dead_shards}, Acc), - {error, dead_shards} - end; - -handle_message(#view_row{key=Key} = Row, {Worker, From}, State) -> - #collector{counters = Counters0, rows = Rows0} = State, - case fabric_dict:lookup_element(Worker, Counters0) of - undefined -> - % this worker lost the race with other partition copies, terminate it - gen_server:reply(From, stop), - {ok, State}; - _ -> - Rows = dict:append(Key, Row#view_row{worker=Worker}, Rows0), - C1 = fabric_dict:update_counter(Worker, 1, Counters0), - % TODO time this call, if slow don't do it every time - C2 = fabric_view:remove_overlapping_shards(Worker, C1), - State1 = State#collector{rows=Rows, counters=C2}, - State2 = fabric_view:maybe_pause_worker(Worker, From, State1), - fabric_view:maybe_send_row(State2) - end; - -handle_message(complete, Worker, State) -> - Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), - fabric_view:maybe_send_row(State#collector{counters = Counters}).
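handle_message/3 above only buckets rows by key; the rereduce itself happens later, in fabric_view:get_next_row/1, which wraps each worker's partial value in a singleton list and hands the batch to the query server. A toy illustration with an Erlang sum standing in for the JavaScript reduce function:

    Rows = [{<<"k">>, 3}, {<<"k">>, 4}, {<<"k">>, 5}],  % one partial value per shard
    Wrapped = [[V] || {_Key, V} <- Rows],               % rereduce input: [[3],[4],[5]]
    12 = lists:sum(lists:append(Wrapped)).              % final row value for <<"k">>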
