diff options
9 files changed, 357 insertions, 219 deletions
diff --git a/common/changes/feature_4836_allow-sync-of-large-files b/common/changes/feature_4836_allow-sync-of-large-files new file mode 100644 index 00000000..f124e899 --- /dev/null +++ b/common/changes/feature_4836_allow-sync-of-large-files @@ -0,0 +1 @@ + o Allow sync of large files (~100MB) (#4836). diff --git a/common/src/leap/soledad/common/couch.py b/common/src/leap/soledad/common/couch.py index 45ca4282..3ca3f408 100644 --- a/common/src/leap/soledad/common/couch.py +++ b/common/src/leap/soledad/common/couch.py @@ -27,7 +27,7 @@ import socket from couchdb.client import Server -from couchdb.http import ResourceNotFound, Unauthorized, ServerError +from couchdb.http import ResourceNotFound, Unauthorized, ServerError, Session from u1db import query_parser, vectorclock from u1db.errors import ( DatabaseDoesNotExist, @@ -50,6 +50,9 @@ from leap.soledad.common.document import SoledadDocument logger = logging.getLogger(__name__) +COUCH_TIMEOUT = 120 # timeout for transfers between Soledad server and Couch + + class InvalidURLError(Exception): """ Exception raised when Soledad encounters a malformed URL. @@ -275,6 +278,8 @@ class CouchDatabase(CommonBackend): # save params self._url = url self._full_commit = full_commit + if session is None: + session = Session(timeout=COUCH_TIMEOUT) self._session = session self._factory = CouchDocument self._real_replica_uid = None diff --git a/common/src/leap/soledad/common/tests/couchdb.ini.template b/common/src/leap/soledad/common/tests/couchdb.ini.template index 217ae201..1fc2205b 100644 --- a/common/src/leap/soledad/common/tests/couchdb.ini.template +++ b/common/src/leap/soledad/common/tests/couchdb.ini.template @@ -6,7 +6,7 @@ database_dir = %(tempdir)s/lib view_index_dir = %(tempdir)s/lib max_document_size = 4294967296 ; 4 GB -os_process_timeout = 5000 ; 5 seconds. for view and external servers. +os_process_timeout = 120000 ; 120 seconds. for view and external servers. max_dbs_open = 100 delayed_commits = true ; set this to false to ensure an fsync before 201 Created is returned uri_file = %(tempdir)s/lib/couch.uri diff --git a/common/src/leap/soledad/common/tests/test_couch.py b/common/src/leap/soledad/common/tests/test_couch.py index 0e07575d..dc0ea906 100644 --- a/common/src/leap/soledad/common/tests/test_couch.py +++ b/common/src/leap/soledad/common/tests/test_couch.py @@ -81,9 +81,10 @@ class CouchDBWrapper(object): mkdir_p(os.path.join(self.tempdir, 'lib')) mkdir_p(os.path.join(self.tempdir, 'log')) args = ['couchdb', '-n', '-a', confPath] - #null = open('/dev/null', 'w') + null = open('/dev/null', 'w') + self.process = subprocess.Popen( - args, env=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + args, env=None, stdout=null.fileno(), stderr=null.fileno(), close_fds=True) # find port logPath = os.path.join(self.tempdir, 'log', 'couch.log') @@ -126,21 +127,21 @@ class CouchDBTestCase(unittest.TestCase): TestCase base class for tests against a real CouchDB server. """ - def setUp(self): + @classmethod + def setUpClass(cls): """ Make sure we have a CouchDB instance for a test. """ - self.wrapper = CouchDBWrapper() - self.wrapper.start() + cls.wrapper = CouchDBWrapper() + cls.wrapper.start() #self.db = self.wrapper.db - unittest.TestCase.setUp(self) - def tearDown(self): + @classmethod + def tearDownClass(cls): """ Stop CouchDB instance for test. """ - self.wrapper.stop() - unittest.TestCase.tearDown(self) + cls.wrapper.stop() #----------------------------------------------------------------------------- diff --git a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py index 8b001859..5384d465 100644 --- a/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py +++ b/common/src/leap/soledad/common/tests/test_couch_operations_atomicity.py @@ -100,6 +100,7 @@ class CouchAtomicityTestCase(CouchDBTestCase, TestCaseWithServer): self.tempdir = tempfile.mkdtemp(prefix="leap_tests-") def tearDown(self): + self.db.delete_database() CouchDBTestCase.tearDown(self) TestCaseWithServer.tearDown(self) diff --git a/common/src/leap/soledad/common/tests/test_server.py b/common/src/leap/soledad/common/tests/test_server.py index 83df192b..06595ed2 100644 --- a/common/src/leap/soledad/common/tests/test_server.py +++ b/common/src/leap/soledad/common/tests/test_server.py @@ -25,6 +25,7 @@ import tempfile import simplejson as json import mock import time +import binascii from leap.common.testing.basetest import BaseLeapTest @@ -376,6 +377,7 @@ class EncryptedSyncTestCase( doc2 = doclist[0] # assert incoming doc is equal to the first sent doc self.assertEqual(doc1, doc2) + db.delete_database() def test_encrypted_sym_sync_with_unicode_passphrase(self): """ @@ -434,8 +436,93 @@ class EncryptedSyncTestCase( doc2 = doclist[0] # assert incoming doc is equal to the first sent doc self.assertEqual(doc1, doc2) + db.delete_database() + + def test_sync_very_large_files(self): + """ + Test if Soledad can sync very large files. + """ + # define the size of the "very large file" + length = 100*(10**6) # 100 MB + self.startServer() + # instantiate soledad and create a document + sol1 = self._soledad_instance( + # token is verified in test_target.make_token_soledad_app + auth_token='auth-token' + ) + _, doclist = sol1.get_all_docs() + self.assertEqual([], doclist) + content = binascii.hexlify(os.urandom(length/2)) # len() == length + doc1 = sol1.create_doc({'data': content}) + # sync with server + sol1._server_url = self.getURL() + sol1.sync() + # instantiate soledad with empty db, but with same secrets path + sol2 = self._soledad_instance(prefix='x', auth_token='auth-token') + _, doclist = sol2.get_all_docs() + self.assertEqual([], doclist) + sol2._secrets_path = sol1.secrets_path + sol2._load_secrets() + sol2._set_secret_id(sol1._secret_id) + # sync the new instance + sol2._server_url = self.getURL() + sol2.sync() + _, doclist = sol2.get_all_docs() + self.assertEqual(1, len(doclist)) + doc2 = doclist[0] + # assert incoming doc is equal to the first sent doc + self.assertEqual(doc1, doc2) + # delete remote database + db = CouchDatabase( + self._couch_url, + # the name of the user database is "user-<uuid>". + 'user-user-uuid', + ) + db.delete_database() + def test_sync_many_small_files(self): + """ + Test if Soledad can sync many smallfiles. + """ + number_of_docs = 100 + self.startServer() + # instantiate soledad and create a document + sol1 = self._soledad_instance( + # token is verified in test_target.make_token_soledad_app + auth_token='auth-token' + ) + _, doclist = sol1.get_all_docs() + self.assertEqual([], doclist) + # create many small files + for i in range(0, number_of_docs): + sol1.create_doc(json.loads(simple_doc)) + # sync with server + sol1._server_url = self.getURL() + sol1.sync() + # instantiate soledad with empty db, but with same secrets path + sol2 = self._soledad_instance(prefix='x', auth_token='auth-token') + _, doclist = sol2.get_all_docs() + self.assertEqual([], doclist) + sol2._secrets_path = sol1.secrets_path + sol2._load_secrets() + sol2._set_secret_id(sol1._secret_id) + # sync the new instance + sol2._server_url = self.getURL() + sol2.sync() + _, doclist = sol2.get_all_docs() + self.assertEqual(number_of_docs, len(doclist)) + # assert incoming docs are equal to sent docs + for doc in doclist: + self.assertEqual(sol1.get_doc(doc.doc_id), doc) + # delete remote database + db = CouchDatabase( + self._couch_url, + # the name of the user database is "user-<uuid>". + 'user-user-uuid', + ) + db.delete_database() + class LockResourceTestCase( CouchDBTestCase, TestCaseWithServer): """ @@ -461,6 +548,12 @@ class LockResourceTestCase( def tearDown(self): CouchDBTestCase.tearDown(self) TestCaseWithServer.tearDown(self) + # delete remote database + db = CouchDatabase( + self._couch_url, + 'shared', + ) + db.delete_database() def test__try_obtain_filesystem_lock(self): responder = mock.Mock() diff --git a/server/changes/feature_4836_allow-sync-of-large-files b/server/changes/feature_4836_allow-sync-of-large-files new file mode 100644 index 00000000..f124e899 --- /dev/null +++ b/server/changes/feature_4836_allow-sync-of-large-files @@ -0,0 +1 @@ + o Allow sync of large files (~100MB) (#4836). diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index 26c33222..84f6a849 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -87,10 +87,6 @@ and lock documents on the shared database is handled by """ import configparser -import time -import hashlib -import os -import tempfile from u1db.remote import http_app @@ -99,10 +95,6 @@ from u1db.remote import http_app from OpenSSL import tsafe old_tsafe = tsafe -from twisted.web.wsgi import WSGIResource -from twisted.internet import reactor -from twisted.internet.error import TimeoutError -from twisted.python.lockfile import FilesystemLock from twisted import version if version.base() == "12.0.0": # Put OpenSSL's tsafe back into place. This can probably be removed if we @@ -112,25 +104,20 @@ if version.base() == "12.0.0": from leap.soledad.server.auth import SoledadTokenAuthMiddleware from leap.soledad.server.gzip_middleware import GzipMiddleware +from leap.soledad.server.lock_resource import LockResource -from leap.soledad.common import ( - SHARED_DB_NAME, - SHARED_DB_LOCK_DOC_ID_PREFIX, -) +from leap.soledad.common import SHARED_DB_NAME from leap.soledad.common.couch import CouchServerState -from leap.soledad.common.errors import ( - InvalidTokenError, - NotLockedError, - AlreadyLockedError, - LockTimedOutError, - CouldNotObtainLockError, -) #----------------------------------------------------------------------------- # Soledad WSGI application #----------------------------------------------------------------------------- +MAX_REQUEST_SIZE = 200 # in Mb +MAX_ENTRY_SIZE = 200 # in Mb + + class SoledadApp(http_app.HTTPApp): """ Soledad WSGI application @@ -141,6 +128,9 @@ class SoledadApp(http_app.HTTPApp): The name of the shared database that holds user's encrypted secrets. """ + max_request_size = MAX_REQUEST_SIZE * 1024 * 1024 + max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024 + def __call__(self, environ, start_response): """ Handle a WSGI call to the Soledad application. @@ -159,195 +149,9 @@ class SoledadApp(http_app.HTTPApp): return http_app.HTTPApp.__call__(self, environ, start_response) -# -# LockResource: a lock based on a document in the shared database. -# - -@http_app.url_to_resource.register -class LockResource(object): - """ - Handle requests for locking documents. - - This class uses Twisted's Filesystem lock to manage a lock in the shared - database. - """ - - url_pattern = '/%s/lock/{uuid}' % SoledadApp.SHARED_DB_NAME - """ - """ - - TIMEOUT = 300 # XXX is 5 minutes reasonable? - """ - The timeout after which the lock expires. - """ - - # used for lock doc storage - TIMESTAMP_KEY = '_timestamp' - LOCK_TOKEN_KEY = '_token' - - FILESYSTEM_LOCK_TRIES = 5 - FILESYSTEM_LOCK_SLEEP_SECONDS = 1 - - def __init__(self, uuid, state, responder): - """ - Initialize the lock resource. Parameters to this constructor are - automatically passed by u1db. - - :param uuid: The user unique id. - :type uuid: str - :param state: The backend database state. - :type state: u1db.remote.ServerState - :param responder: The infrastructure to send responses to client. - :type responder: u1db.remote.HTTPResponder - """ - self._shared_db = state.open_database(SoledadApp.SHARED_DB_NAME) - self._lock_doc_id = '%s%s' % (SHARED_DB_LOCK_DOC_ID_PREFIX, uuid) - self._lock = FilesystemLock( - os.path.join( - tempfile.gettempdir(), - hashlib.sha512(self._lock_doc_id).hexdigest())) - self._state = state - self._responder = responder - - @http_app.http_method(content=str) - def put(self, content=None): - """ - Handle a PUT request to the lock document. - - A lock is a document in the shared db with doc_id equal to - 'lock-<uuid>' and the timestamp of its creation as content. This - method obtains a threaded-lock and creates a lock document if it does - not exist or if it has expired. - - It returns '201 Created' and a pair containing a token to unlock and - the lock timeout, or '403 AlreadyLockedError' and the remaining amount - of seconds the lock will still be valid. - - :param content: The content of the PUT request. It is only here - because PUT requests with empty content are considered - invalid requests by u1db. - :type content: str - """ - # obtain filesystem lock - if not self._try_obtain_filesystem_lock(): - self._responder.send_response_json( - LockTimedOutError.status, # error: request timeout - error=LockTimedOutError.wire_description) - return - - created_lock = False - now = time.time() - token = hashlib.sha256(os.urandom(10)).hexdigest() # for releasing - lock_doc = self._shared_db.get_doc(self._lock_doc_id) - remaining = self._remaining(lock_doc, now) - - # if there's no lock, create one - if lock_doc is None: - lock_doc = self._shared_db.create_doc( - { - self.TIMESTAMP_KEY: now, - self.LOCK_TOKEN_KEY: token, - }, - doc_id=self._lock_doc_id) - created_lock = True - else: - if remaining == 0: - # lock expired, create new one - lock_doc.content = { - self.TIMESTAMP_KEY: now, - self.LOCK_TOKEN_KEY: token, - } - self._shared_db.put_doc(lock_doc) - created_lock = True - - self._try_release_filesystem_lock() - - # send response to client - if created_lock is True: - self._responder.send_response_json( - 201, timeout=self.TIMEOUT, token=token) # success: created - else: - wire_descr = AlreadyLockedError.wire_description - self._responder.send_response_json( - AlreadyLockedError.status, # error: forbidden - error=AlreadyLockedError.wire_description, remaining=remaining) - - @http_app.http_method(token=str) - def delete(self, token=None): - """ - Delete the lock if the C{token} is valid. - - Delete the lock document in case C{token} is equal to the token stored - in the lock document. - - :param token: The token returned when locking. - :type token: str - - :raise NotLockedError: Raised in case the lock is not locked. - :raise InvalidTokenError: Raised in case the token is invalid for - unlocking. - """ - lock_doc = self._shared_db.get_doc(self._lock_doc_id) - if lock_doc is None or self._remaining(lock_doc, time.time()) == 0: - self._responder.send_response_json( - NotLockedError.status, # error: not found - error=NotLockedError.wire_description) - elif token != lock_doc.content[self.LOCK_TOKEN_KEY]: - self._responder.send_response_json( - InvalidTokenError.status, # error: unauthorized - error=InvalidTokenError.wire_description) - else: - self._shared_db.delete_doc(lock_doc) - self._responder.send_response_json(200) # success: should use 204 - # but u1db does not - # support it. - - def _remaining(self, lock_doc, now): - """ - Return the number of seconds the lock contained in C{lock_doc} is - still valid, when compared to C{now}. - - :param lock_doc: The document containing the lock. - :type lock_doc: u1db.Document - :param now: The time to which to compare the lock timestamp. - :type now: float - - :return: The amount of seconds the lock is still valid. - :rtype: float - """ - if lock_doc is not None: - lock_timestamp = lock_doc.content[self.TIMESTAMP_KEY] - remaining = lock_timestamp + self.TIMEOUT - now - return remaining if remaining > 0 else 0.0 - return 0.0 - - def _try_obtain_filesystem_lock(self): - """ - Try to obtain the file system lock. - - @return: Whether the lock was succesfully obtained. - @rtype: bool - """ - tries = self.FILESYSTEM_LOCK_TRIES - while tries > 0: - try: - return self._lock.lock() - except OSError as e: - tries -= 1 - if tries == 0: - raise CouldNotObtainLockError(e.message) - time.sleep(self.FILESYSTEM_LOCK_SLEEP_SECONDS) - return False - - def _try_release_filesystem_lock(self): - """ - Release the filesystem lock. - """ - try: - self._lock.unlock() - return True - except Exception: - return False +http_app.url_to_resource.register(LockResource) +http_app.SyncResource.max_request_size = MAX_REQUEST_SIZE * 1024 * 1024 +http_app.SyncResource.max_entry_size = MAX_ENTRY_SIZE * 1024 * 1024 #----------------------------------------------------------------------------- diff --git a/server/src/leap/soledad/server/lock_resource.py b/server/src/leap/soledad/server/lock_resource.py new file mode 100644 index 00000000..a7870f77 --- /dev/null +++ b/server/src/leap/soledad/server/lock_resource.py @@ -0,0 +1,232 @@ +# -*- coding: utf-8 -*- +# lock_resource.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +LockResource: a lock based on a document in the shared database. +""" + + +import hashlib +import time +import os +import tempfile +import errno + + +from u1db.remote import http_app +from twisted.python.lockfile import FilesystemLock + + +from leap.soledad.common import ( + SHARED_DB_NAME, + SHARED_DB_LOCK_DOC_ID_PREFIX, +) +from leap.soledad.common.errors import ( + InvalidTokenError, + NotLockedError, + AlreadyLockedError, + LockTimedOutError, + CouldNotObtainLockError, +) + + +class LockResource(object): + """ + Handle requests for locking documents. + + This class uses Twisted's Filesystem lock to manage a lock in the shared + database. + """ + + url_pattern = '/%s/lock/{uuid}' % SHARED_DB_NAME + """ + """ + + TIMEOUT = 300 # XXX is 5 minutes reasonable? + """ + The timeout after which the lock expires. + """ + + # used for lock doc storage + TIMESTAMP_KEY = '_timestamp' + LOCK_TOKEN_KEY = '_token' + + FILESYSTEM_LOCK_TRIES = 5 + FILESYSTEM_LOCK_SLEEP_SECONDS = 1 + + def __init__(self, uuid, state, responder): + """ + Initialize the lock resource. Parameters to this constructor are + automatically passed by u1db. + + :param uuid: The user unique id. + :type uuid: str + :param state: The backend database state. + :type state: u1db.remote.ServerState + :param responder: The infrastructure to send responses to client. + :type responder: u1db.remote.HTTPResponder + """ + self._shared_db = state.open_database(SHARED_DB_NAME) + self._lock_doc_id = '%s%s' % (SHARED_DB_LOCK_DOC_ID_PREFIX, uuid) + self._lock = FilesystemLock( + os.path.join( + tempfile.gettempdir(), + hashlib.sha512(self._lock_doc_id).hexdigest())) + self._state = state + self._responder = responder + + @http_app.http_method(content=str) + def put(self, content=None): + """ + Handle a PUT request to the lock document. + + A lock is a document in the shared db with doc_id equal to + 'lock-<uuid>' and the timestamp of its creation as content. This + method obtains a threaded-lock and creates a lock document if it does + not exist or if it has expired. + + It returns '201 Created' and a pair containing a token to unlock and + the lock timeout, or '403 AlreadyLockedError' and the remaining amount + of seconds the lock will still be valid. + + :param content: The content of the PUT request. It is only here + because PUT requests with empty content are considered + invalid requests by u1db. + :type content: str + """ + # obtain filesystem lock + if not self._try_obtain_filesystem_lock(): + self._responder.send_response_json( + LockTimedOutError.status, # error: request timeout + error=LockTimedOutError.wire_description) + return + + created_lock = False + now = time.time() + token = hashlib.sha256(os.urandom(10)).hexdigest() # for releasing + lock_doc = self._shared_db.get_doc(self._lock_doc_id) + remaining = self._remaining(lock_doc, now) + + # if there's no lock, create one + if lock_doc is None: + lock_doc = self._shared_db.create_doc( + { + self.TIMESTAMP_KEY: now, + self.LOCK_TOKEN_KEY: token, + }, + doc_id=self._lock_doc_id) + created_lock = True + else: + if remaining == 0: + # lock expired, create new one + lock_doc.content = { + self.TIMESTAMP_KEY: now, + self.LOCK_TOKEN_KEY: token, + } + self._shared_db.put_doc(lock_doc) + created_lock = True + + self._try_release_filesystem_lock() + + # send response to client + if created_lock is True: + self._responder.send_response_json( + 201, timeout=self.TIMEOUT, token=token) # success: created + else: + self._responder.send_response_json( + AlreadyLockedError.status, # error: forbidden + error=AlreadyLockedError.wire_description, remaining=remaining) + + @http_app.http_method(token=str) + def delete(self, token=None): + """ + Delete the lock if the C{token} is valid. + + Delete the lock document in case C{token} is equal to the token stored + in the lock document. + + :param token: The token returned when locking. + :type token: str + + :raise NotLockedError: Raised in case the lock is not locked. + :raise InvalidTokenError: Raised in case the token is invalid for + unlocking. + """ + lock_doc = self._shared_db.get_doc(self._lock_doc_id) + if lock_doc is None or self._remaining(lock_doc, time.time()) == 0: + self._responder.send_response_json( + NotLockedError.status, # error: not found + error=NotLockedError.wire_description) + elif token != lock_doc.content[self.LOCK_TOKEN_KEY]: + self._responder.send_response_json( + InvalidTokenError.status, # error: unauthorized + error=InvalidTokenError.wire_description) + else: + self._shared_db.delete_doc(lock_doc) + self._responder.send_response_json(200) # success: should use 204 + # but u1db does not + # support it. + + def _remaining(self, lock_doc, now): + """ + Return the number of seconds the lock contained in C{lock_doc} is + still valid, when compared to C{now}. + + :param lock_doc: The document containing the lock. + :type lock_doc: u1db.Document + :param now: The time to which to compare the lock timestamp. + :type now: float + + :return: The amount of seconds the lock is still valid. + :rtype: float + """ + if lock_doc is not None: + lock_timestamp = lock_doc.content[self.TIMESTAMP_KEY] + remaining = lock_timestamp + self.TIMEOUT - now + return remaining if remaining > 0 else 0.0 + return 0.0 + + def _try_obtain_filesystem_lock(self): + """ + Try to obtain the file system lock. + + @return: Whether the lock was succesfully obtained. + @rtype: bool + """ + tries = self.FILESYSTEM_LOCK_TRIES + while tries > 0: + try: + return self._lock.lock() + except OSError as e: + tries -= 1 + if tries == 0: + raise CouldNotObtainLockError(e.message) + time.sleep(self.FILESYSTEM_LOCK_SLEEP_SECONDS) + return False + + def _try_release_filesystem_lock(self): + """ + Release the filesystem lock. + """ + try: + self._lock.unlock() + return True + except OSError as e: + if e.errno == errno.ENOENT: + return True + return False |