From 3479ad962cb9c5be51915d2f9c8708c193647532 Mon Sep 17 00:00:00 2001
From: Ivan Orlov
Date: Thu, 20 Aug 2015 13:04:35 -0700
Subject: [PATCH] HUE-2924 [core] Implement proxy filesystem

Add ProxyFS that is able to serve operations on top of multiple filesystems.
---
 apps/beeswax/src/beeswax/conf.py                    |   4 +-
 apps/jobbrowser/src/jobbrowser/tests.py             |   2 +-
 apps/security/src/security/api/test_hive.py         |  12 +-
 desktop/core/src/desktop/lib/fs/__init__.py         |  21 ++
 desktop/core/src/desktop/lib/fs/proxyfs.py          | 209 ++++++++++++++++++++
 desktop/core/src/desktop/lib/fs/proxyfs_test.py     |  66 +++++++
 desktop/core/src/desktop/lib/fsmanager.py           |  78 +++++---
 desktop/core/src/desktop/lib/test_utils.py          |  12 ++
 desktop/core/src/desktop/middleware.py              |  10 +-
 desktop/libs/hadoop/src/hadoop/mini_cluster.py      |   5 +-
 desktop/libs/hadoop/src/hadoop/pseudo_hdfs4.py      |   5 +-
 desktop/libs/hadoop/src/hadoop/tests.py             |  15 +-
 .../liboozie/src/liboozie/submittion2_tests.py      |   7 +-
 .../libs/liboozie/src/liboozie/submittion_tests.py  |   7 +-
 14 files changed, 393 insertions(+), 60 deletions(-)
 create mode 100644 desktop/core/src/desktop/lib/fs/__init__.py
 create mode 100644 desktop/core/src/desktop/lib/fs/proxyfs.py
 create mode 100644 desktop/core/src/desktop/lib/fs/proxyfs_test.py

diff --git a/apps/beeswax/src/beeswax/conf.py b/apps/beeswax/src/beeswax/conf.py
index 8efa2f44c0d210b31467c6770e98aa37a6f3578c..54586b5aa22f2e0728e296617e97b19ef9c7ec24 100644
--- a/apps/beeswax/src/beeswax/conf.py
+++ b/apps/beeswax/src/beeswax/conf.py
@@ -151,9 +151,9 @@ def config_validator(user):
     res.append((NICE_NAME, _(msg)))
 
   try:
-    from hadoop import cluster
+    from desktop.lib.fsmanager import get_filesystem
     warehouse = beeswax.hive_site.get_metastore_warehouse_dir()
-    fs = cluster.get_hdfs()
+    fs = get_filesystem()
     fs.stats(warehouse)
   except Exception:
     msg = 'Failed to access Hive warehouse: %s'
diff --git a/apps/jobbrowser/src/jobbrowser/tests.py b/apps/jobbrowser/src/jobbrowser/tests.py
index 252f98099cfc8aafe238c9ff649a06090b7c7842..02c4530fdb265059d2a5c073f99a6af318a18681 100644
--- a/apps/jobbrowser/src/jobbrowser/tests.py
+++ b/apps/jobbrowser/src/jobbrowser/tests.py
@@ -126,7 +126,7 @@ class TestJobBrowserWithHadoop(unittest.TestCase, OozieServerProvider):
       # Remove user home directories.
       cls.cluster.fs.do_as_superuser(cls.cluster.fs.rmtree, cls.home_dir)
     except:
-      LOG.exception('failed to teardown %s' % self.home_dir)
+      LOG.exception('failed to teardown %s' % cls.home_dir)
     cls.cluster.fs.setuser(cls.prev_user)
 
   @classmethod
diff --git a/apps/security/src/security/api/test_hive.py b/apps/security/src/security/api/test_hive.py
index ad4ed6c0286b95d058f3a66a9b92a1323bd6f448..bf53459098f3bd9899d2acdf42039187c84a5410 100644
--- a/apps/security/src/security/api/test_hive.py
+++ b/apps/security/src/security/api/test_hive.py
@@ -20,12 +20,11 @@ import json
 
 from django.core.urlresolvers import reverse
 from nose.plugins.skip import SkipTest
-from nose.tools import assert_true, assert_equal, assert_false
-
-from hadoop import cluster
+from nose.tools import assert_equal
 
 from hadoop.conf import HDFS_CLUSTERS
 
+from desktop.lib.test_utils import clear_sys_caches
 from desktop.lib.django_test_util import make_logged_in_client
 from desktop.lib.test_utils import grant_access, add_to_group
 
@@ -36,8 +35,8 @@ from security.api.hive import _massage_uri, _get_splitted_path
 
 def mocked_get_api(user):
   return MockHiveApi(user)
 
-class MockHiveApi(object):
 
+class MockHiveApi(object):
   def __init__(self, user):
     self.user = user
@@ -86,7 +85,7 @@ class TestUtils(object):
 
   def test_massage_uri(self):
     finish = HDFS_CLUSTERS['default'].LOGICAL_NAME.set_for_testing('namenode')
-    cluster.clear_caches()
+    clear_sys_caches()
 
     try:
       assert_equal('', _massage_uri(''))
@@ -103,9 +102,8 @@ class TestUtils(object):
     finally:
       finish()
 
-
     finish = HDFS_CLUSTERS['default'].FS_DEFAULTFS.set_for_testing('hdfs://fs_defaultfs:8021')
-    cluster.clear_caches()
+    clear_sys_caches()
 
     try:
       assert_equal('', _massage_uri(''))
diff --git a/desktop/core/src/desktop/lib/fs/__init__.py b/desktop/core/src/desktop/lib/fs/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..0dbb9bc61e6943146afdcf09afb6d7f5790ecb64
--- /dev/null
+++ b/desktop/core/src/desktop/lib/fs/__init__.py
@@ -0,0 +1,21 @@
+# Licensed to Cloudera, Inc. under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  Cloudera, Inc. licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+from desktop.lib.fs.proxyfs import ProxyFS
+
+
diff --git a/desktop/core/src/desktop/lib/fs/proxyfs.py b/desktop/core/src/desktop/lib/fs/proxyfs.py
new file mode 100644
index 0000000000000000000000000000000000000000..e6cd18a9f8e5bdb6e05719256d7371a1c65f721a
--- /dev/null
+++ b/desktop/core/src/desktop/lib/fs/proxyfs.py
@@ -0,0 +1,209 @@
+# Licensed to Cloudera, Inc. under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  Cloudera, Inc. licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+import posixpath
+import errno
+
+from urlparse import urlparse
+
+
+class ProxyFS(object):
+  def __init__(self, filesystems_dict, default_scheme):
+    if default_scheme not in filesystems_dict:
+      raise ValueError(
+        'Default scheme "%s" is not a member of provided schemes: %s' % (default_scheme, filesystems_dict.keys()))
+
+    self._fs_dict = filesystems_dict
+    self._fs_set = set(self._fs_dict.values())
+    self._default_scheme = default_scheme
+    self._default_fs = self._fs_dict[self._default_scheme]
+
+  def __getattr__(self, item):
+    if hasattr(self, "_default_fs"):
+      return getattr(object.__getattribute__(self, "_default_fs"), item)
+    else:
+      raise AttributeError("'%s' object has no attribute '%s'" % (self.__class__.__name__, item))
+
+  def __setattr__(self, key, value):
+    if hasattr(self, "_default_fs") and hasattr(self._default_fs, key):
+      setattr(self._default_fs, key, value)
+    else:
+      object.__setattr__(self, key, value)
+
+  def _get_scheme(self, path):
+    split = urlparse(path)
+    if split.scheme:
+      return split.scheme
+    if path and path[0] == posixpath.sep:
+      return self._default_scheme
+
+  def _get_fs(self, path):
+    scheme = self._get_scheme(path)
+    if not scheme:
+      raise IOError(errno.EINVAL, 'Can not figure out scheme for path "%s"' % path)
+    try:
+      return self._fs_dict[scheme]
+    except KeyError:
+      raise IOError(errno.EINVAL, 'Unknown scheme %s, available schemes: %s' % (scheme, self._fs_dict.keys()))
+
+  def _get_fs_pair(self, src, dst):
+    """
+    Returns two FS for the source and destination paths respectively.
+    If `dst` is not a self-contained path, it is assumed to be relative to `src`.
+    """
+    src_fs = self._get_fs(src)
+    dst_scheme = self._get_scheme(dst)
+    if not dst_scheme:
+      return src_fs, src_fs
+    return src_fs, self._get_fs(dst)
+
+  def setuser(self, user):
+    """Set a new user. Return the current user."""
+    curr = self.user
+    for fs in self._fs_set:
+      fs.setuser(user)
+    return curr
+
+  def do_as_user(self, username, fn, *args, **kwargs):
+    prev = self.user
+    try:
+      self.setuser(username)
+      return fn(*args, **kwargs)
+    finally:
+      self.setuser(prev)
+
+  def do_as_superuser(self, fn, *args, **kwargs):
+    return self.do_as_user(self._default_fs.superuser, fn, *args, **kwargs)
+
+  # Proxy methods to suitable filesystem
+  # ------------------------------------
+  def isdir(self, path):
+    return self._get_fs(path).isdir(path)
+
+  def isfile(self, path):
+    return self._get_fs(path).isfile(path)
+
+  def stats(self, path):
+    return self._get_fs(path).stats(path)
+
+  def listdir_stats(self, path, **kwargs):
+    return self._get_fs(path).listdir_stats(path, **kwargs)
+
+  def listdir(self, path, glob=None):
+    return self._get_fs(path).listdir(path, glob)
+
+  def normpath(self, path):
+    return self._get_fs(path).normpath(path)
+
+  def open(self, path, *args, **kwargs):
+    return self._get_fs(path).open(path, *args, **kwargs)
+
+  def exists(self, path):
+    return self._get_fs(path).exists(path)
+
+  def isroot(self, path):
+    return self._get_fs(path).isroot(path)
+
+  def join(self, first, *comp_list):
+    return self._get_fs(first).join(first, *comp_list)
+
+  def mkdir(self, path, *args, **kwargs):
+    return self._get_fs(path).mkdir(path, *args, **kwargs)
+
+  def read(self, path, *args, **kwargs):
+    return self._get_fs(path).read(path, *args, **kwargs)
+
+  def append(self, path, *args, **kwargs):
+    return self._get_fs(path).append(path, *args, **kwargs)
+
+  def rmtree(self, path, *args, **kwargs):
+    self._get_fs(path).rmtree(path, *args, **kwargs)
+
+  def remove(self, path, skip_trash=False):
+    self._get_fs(path).remove(path, skip_trash)
+
+  def restore(self, path):
+    self._get_fs(path).restore(path)
+
+  def create(self, path, *args, **kwargs):
+    self._get_fs(path).create(path, *args, **kwargs)
+
+  def create_home_dir(self, home_path=None):
+    if home_path is None:
+      home_path = self.get_home_dir()
+    self._get_fs(home_path).create_home_dir(home_path)
+
+  def chown(self, path, *args, **kwargs):
+    self._get_fs(path).chown(path, *args, **kwargs)
+
+  def chmod(self, path, *args, **kwargs):
+    self._get_fs(path).chmod(path, *args, **kwargs)
+
+  def copyFromLocal(self, local_src, remote_dst, *args, **kwargs):
+    self._get_fs(remote_dst).copyFromLocal(local_src, remote_dst, *args, **kwargs)
+
+  def mktemp(self, subdir='', prefix='tmp', basedir=None):
+    fs = basedir and self._get_fs(basedir) or self._default_fs
+    return fs.mktemp(subdir=subdir, prefix=prefix, basedir=basedir)
+
+  def purge_trash(self):
+    for fs in self._fs_set:
+      if hasattr(fs, 'purge_trash'):
+        fs.purge_trash()
+
+  # Handle file systems interactions
+  # --------------------------------
+  def copy(self, src, dst, *args, **kwargs):
+    src_fs, dst_fs = self._get_fs_pair(src, dst)
+    op = src_fs.copy if src_fs is dst_fs else self._copy_between_filesystems
+    return op(src, dst, *args, **kwargs)
+
+  def _copy_between_filesystems(self, src, dst, recursive=False, *args, **kwargs):
+    raise NotImplementedError("Will be addressed in HUE-2934")
+
+  def copyfile(self, src, dst, *args, **kwargs):
+    src_fs, dst_fs = self._get_fs_pair(src, dst)
+    op = src_fs.copyfile if src_fs is dst_fs else self._copyfile_between_filesystems
+    return op(src, dst, *args, **kwargs)
+
+  def _copyfile_between_filesystems(self, src, dst, *args, **kwargs):
+    raise NotImplementedError("Will be addressed in HUE-2934")
+
+  def copy_remote_dir(self, src, dst, *args, **kwargs):
+    src_fs, dst_fs = self._get_fs_pair(src, dst)
+    op = src_fs.copy_remote_dir if src_fs is dst_fs else self._copy_remote_dir_between_filesystems
+    return op(src, dst, *args, **kwargs)
+
+  def _copy_remote_dir_between_filesystems(self, src, dst, *args, **kwargs):
+    raise NotImplementedError("Will be addressed in HUE-2934")
+
+  def rename(self, old, new):
+    old_fs, new_fs = self._get_fs_pair(old, new)
+    op = old_fs.rename if old_fs is new_fs else self._rename_between_filesystems
+    return op(old, new)
+
+  def _rename_between_filesystems(self, old, new):
+    raise NotImplementedError("Will be addressed in HUE-2934")
+
+  def rename_star(self, old_dir, new_dir):
+    old_fs, new_fs = self._get_fs_pair(old_dir, new_dir)
+    op = old_fs.rename_star if old_fs is new_fs else self._rename_star_between_filesystems
+    return op(old_dir, new_dir)
+
+  def _rename_star_between_filesystems(self, old, new):
+    raise NotImplementedError("Will be addressed in HUE-2934")
diff --git a/desktop/core/src/desktop/lib/fs/proxyfs_test.py b/desktop/core/src/desktop/lib/fs/proxyfs_test.py
new file mode 100644
index 0000000000000000000000000000000000000000..45bca2c17882543874d6907ddfbe601597afa8be
--- /dev/null
+++ b/desktop/core/src/desktop/lib/fs/proxyfs_test.py
@@ -0,0 +1,66 @@
+# Licensed to Cloudera, Inc. under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  Cloudera, Inc. licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+from mock import MagicMock
+from nose.tools import assert_raises, assert_false
+
+from desktop.lib.fs import ProxyFS
+
+
+def test_fs_selection():
+  s3fs, hdfs = MagicMock(), MagicMock()
+  proxy_fs = ProxyFS({'s3': s3fs, 'hdfs': hdfs}, 'hdfs')
+
+  proxy_fs.isdir('s3://bucket/key')
+  s3fs.isdir.assert_called_once_with('s3://bucket/key')
+  assert_false(hdfs.isdir.called)
+
+  proxy_fs.isfile('hdfs://localhost:42/user/alice/file')
+  hdfs.isfile.assert_called_once_with('hdfs://localhost:42/user/alice/file')
+  assert_false(s3fs.isfile.called)
+
+  proxy_fs.open('/user/alice/file')
+  hdfs.open.assert_called_once_with('/user/alice/file')
+  assert_false(s3fs.open.called)
+
+  assert_raises(IOError, proxy_fs.stats, 'ftp://host')
+  assert_raises(IOError, proxy_fs.stats, 's3//bucket/key')
+
+
+def test_multi_fs_selection():
+  s3fs, hdfs = MagicMock(), MagicMock()
+  proxy_fs = ProxyFS({'s3': s3fs, 'hdfs': hdfs}, 'hdfs')
+
+  proxy_fs.copy('s3://bucket1/key', 's3://bucket2/key')
+  s3fs.copy.assert_called_once_with('s3://bucket1/key', 's3://bucket2/key')
+  assert_false(hdfs.copy.called)
+
+  proxy_fs.copyfile('s3://bucket/key', 'key2')
+  s3fs.copyfile.assert_called_once_with('s3://bucket/key', 'key2')
+  assert_false(hdfs.copyfile.called)
+
+  proxy_fs.rename('/tmp/file', 'shmile')
+  hdfs.rename.assert_called_once_with('/tmp/file', 'shmile')
+  assert_false(s3fs.rename.called)
+
+  # Will be addressed in HUE-2934
+  assert_raises(NotImplementedError, proxy_fs.copy_remote_dir, 's3://bucket/key', '/tmp/dir')
+
+
+def test_constructor_given_invalid_arguments():
+  assert_raises(ValueError, ProxyFS, {'s3': {}}, 'hdfs')
diff --git a/desktop/core/src/desktop/lib/fsmanager.py b/desktop/core/src/desktop/lib/fsmanager.py
index fd021f86529dbfa6f02610620c70f4f7d2a9bd97..d226c95b2d1b650ce2514b58e3414fe5723a979d 100644
--- a/desktop/core/src/desktop/lib/fsmanager.py
+++ b/desktop/core/src/desktop/lib/fsmanager.py
@@ -15,39 +15,63 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from hadoop.cluster import get_all_hdfs
+from __future__ import absolute_import
 
-_filesystems = None
+import sys
+import logging
 
-def _init_filesystems():
-  """Initialize the module-scoped filesystem dictionary."""
-  global _filesystems
-  if _filesystems is not None:
-    return
-  _filesystems = get_all_hdfs()
+from desktop.lib.fs import ProxyFS
+from hadoop import cluster
 
+FS_CACHE = {}
 
-def get_filesystem(name):
-  """Return the filesystem with the given name. If the filesystem is not defined,
-  raises KeyError"""
-  _init_filesystems()
-  return _filesystems[name]
+DEFAULT_SCHEMA = 'hdfs'
 
-def get_default_hdfs():
+FS_GETTERS = {
+  "hdfs": cluster.get_hdfs
+}
+
+
+def get_filesystem(name='default'):
+  """
+  Return the filesystem with the given name.
+  If the filesystem is not defined, raises KeyError
+  """
+  if name not in FS_CACHE:
+    FS_CACHE[name] = _make_fs(name)
+  return FS_CACHE[name]
+
+
+def _make_fs(name):
+  fs_dict = {}
+  for schema, getter in FS_GETTERS.iteritems():
+    try:
+      fs = getter(name)
+      fs_dict[schema] = fs
+    except KeyError:
+      if DEFAULT_SCHEMA == schema:
+        logging.error('Can not get filesystem called "%s" for default schema "%s"' % (name, schema))
+        exc_class, exc, tb = sys.exc_info()
+        raise exc_class, exc, tb
+      else:
+        logging.warn('Can not get filesystem called "%s" for "%s" schema' % (name, schema))
+  return ProxyFS(fs_dict, DEFAULT_SCHEMA)
+
+
+def clear_cache():
   """
-  Return the (name, fs) for the default hdfs.
-  Return (None, None) if no hdfs cluster configured
+  Clears internal cache. Returns
+  something that can be given back to restore_cache.
   """
-  _init_filesystems()
-  for name, fs in _filesystems.iteritems():
-    # Return the first HDFS encountered
-    if fs.uri.startswith('hdfs') or fs.uri.startswith('http'):
-      return name, fs
-  return None, None
-
-def reset():
+  global FS_CACHE
+  old = FS_CACHE
+  FS_CACHE = {}
+  return old
+
+
+def restore_cache(old_cache):
   """
-  reset() -- Forget all cached filesystems and go to a pristine state.
+  Restores cache from the result of a previous clear_cache call.
   """
-  global _filesystems
-  _filesystems = None
+  global FS_CACHE
+  FS_CACHE = old_cache
diff --git a/desktop/core/src/desktop/lib/test_utils.py b/desktop/core/src/desktop/lib/test_utils.py
index 9dda30fb0838c1bb1b6ed58011e3d41615f10401..b38e3f0f65c182c9a581ea7f508336a531a024e9 100644
--- a/desktop/core/src/desktop/lib/test_utils.py
+++ b/desktop/core/src/desktop/lib/test_utils.py
@@ -21,6 +21,9 @@ from lxml import objectify, etree
 
 from django.contrib.auth.models import Group, User
 from useradmin.models import HuePermission, GroupPermission, get_default_user_group
 
+from hadoop import cluster
+from desktop.lib import fsmanager
+
 def grant_access(username, groupname, appname):
   add_permission(username, groupname, 'access', appname)
@@ -59,3 +62,12 @@ def reformat_xml(xml_obj):
     return etree.tostring(objectify.fromstring(xml_obj, etree.XMLParser(strip_cdata=False, remove_blank_text=True)))
   else:
     return etree.tostring(xml_obj)
+
+
+def clear_sys_caches():
+  return cluster.clear_caches(), fsmanager.clear_cache()
+
+
+def restore_sys_caches(old_caches):
+  cluster.restore_caches(old_caches[0])
+  fsmanager.restore_cache(old_caches[1])
\ No newline at end of file
diff --git a/desktop/core/src/desktop/middleware.py b/desktop/core/src/desktop/middleware.py
index 11d5fd70088c52a8417d0c409fca1172e27ed98e..e3f74be706e82669ba05ca225c1b181ba9b1c713 100644
--- a/desktop/core/src/desktop/middleware.py
+++ b/desktop/core/src/desktop/middleware.py
@@ -40,14 +40,13 @@ from django.core.urlresolvers import resolve
 from django.http import HttpResponseRedirect, HttpResponse
 from django.utils.translation import ugettext as _
 from django.utils.http import urlquote, is_safe_url
-from django.utils.encoding import iri_to_uri
 
 import django.views.static
 import desktop.views
 import desktop.conf
 from desktop.context_processors import get_app_name
-from desktop.lib import apputil, i18n
-from desktop.lib.django_util import render, render_json, is_jframe_request, get_username_re_rule, get_groupname_re_rule
+from desktop.lib import apputil, i18n, fsmanager
+from desktop.lib.django_util import render, render_json, get_username_re_rule, get_groupname_re_rule
 from desktop.lib.exceptions import StructuredException
 from desktop.lib.exceptions_renderable import PopupException
 from desktop.log.access import access_log, log_page_hit
@@ -121,10 +120,7 @@ class ClusterMiddleware(object):
     if "fs" in view_kwargs:
       del view_kwargs["fs"]
 
-    try:
-      request.fs = cluster.get_hdfs(request.fs_ref)
-    except KeyError:
-      raise KeyError(_('Cannot find HDFS called "%(fs_ref)s".') % {'fs_ref': request.fs_ref})
+    request.fs = fsmanager.get_filesystem(request.fs_ref)
 
     if request.user.is_authenticated():
       if request.fs is not None:
diff --git a/desktop/libs/hadoop/src/hadoop/mini_cluster.py b/desktop/libs/hadoop/src/hadoop/mini_cluster.py
index b2c118c201fe9badc88ba92d2512cee6f5fbc64e..b435fef0254fb0bb8125df9bd6ca922be87dca34 100644
--- a/desktop/libs/hadoop/src/hadoop/mini_cluster.py
+++ b/desktop/libs/hadoop/src/hadoop/mini_cluster.py
@@ -53,6 +53,7 @@ import lxml.etree
 import urllib2
 
 from desktop.lib import python_util
+from desktop.lib.test_utils import clear_sys_caches, restore_sys_caches
 
 from hadoop.fs.hadoopfs import HadoopFileSystem
 from hadoop.job_tracker import LiveJobTracker
@@ -419,11 +420,11 @@ def shared_cluster(conf=False):
   # This is djanky (that's django for "janky").
   # Caches are tricky w.r.t. to to testing;
   # perhaps there are better patterns?
-  old = hadoop.cluster.clear_caches()
+  old_caches = clear_sys_caches()
 
   def finish():
     if conf:
-      hadoop.cluster.restore_caches(old)
+      restore_sys_caches(old_caches)
     for x in closers:
       x()
 
diff --git a/desktop/libs/hadoop/src/hadoop/pseudo_hdfs4.py b/desktop/libs/hadoop/src/hadoop/pseudo_hdfs4.py
index f5672a576ba3b221fc0a5377789357bf2d59b14f..67fd0cca1c859b74fb75a74fcabd7206115338f6 100755
--- a/desktop/libs/hadoop/src/hadoop/pseudo_hdfs4.py
+++ b/desktop/libs/hadoop/src/hadoop/pseudo_hdfs4.py
@@ -28,6 +28,7 @@ import textwrap
 import time
 
 from desktop.lib.python_util import find_unused_port
+from desktop.lib.test_utils import clear_sys_caches, restore_sys_caches
 
 import hadoop
 from hadoop import cluster
@@ -574,10 +575,10 @@ def shared_cluster():
     hadoop.conf.YARN_CLUSTERS['default'].HISTORY_SERVER_API_URL.set_for_testing('%s:%s' % (cluster._fqdn, cluster._jh_web_port,)),
   ]
 
-  old = hadoop.cluster.clear_caches()
+  old_caches = clear_sys_caches()
 
   def restore_config():
-    hadoop.cluster.restore_caches(old)
+    restore_sys_caches(old_caches)
     for x in closers:
       x()
 
diff --git a/desktop/libs/hadoop/src/hadoop/tests.py b/desktop/libs/hadoop/src/hadoop/tests.py
index e37f66e72c6d5ca7ed28451cb33da46184667b42..b57789f77e705a4491e385df9c9a51341e10c5cb 100644
--- a/desktop/libs/hadoop/src/hadoop/tests.py
+++ b/desktop/libs/hadoop/src/hadoop/tests.py
@@ -23,7 +23,10 @@ from nose.plugins.attrib import attr
 from nose.plugins.skip import SkipTest
 
 import desktop.conf as desktop_conf
+
+from desktop.lib.test_utils import clear_sys_caches, restore_sys_caches
 from desktop.lib.django_test_util import make_logged_in_client
+
 from hadoop import cluster
 from hadoop import conf
 from hadoop import confparse
@@ -119,7 +122,7 @@ def test_config_validator_basic():
     conf.MR_CLUSTERS.set_for_testing({'default': {}}),
     conf.MR_CLUSTERS['default'].JT_THRIFT_PORT.set_for_testing(70000),
   )
-  old = cluster.clear_caches()
+  old_caches = clear_sys_caches()
   try:
     cli = make_logged_in_client()
     resp = cli.get('/desktop/debug/check_config')
@@ -127,7 +130,7 @@ def test_config_validator_basic():
   finally:
     for old_conf in reset:
       old_conf()
-    cluster.restore_caches(old)
+    restore_sys_caches(old_caches)
 
 
 @attr('requires_hadoop')
@@ -139,7 +142,7 @@ def test_config_validator_more():
   minicluster = pseudo_hdfs4.shared_cluster()
   cli = make_logged_in_client()
 
-  old = cluster.clear_caches()
+  old_caches = clear_sys_caches()
 
   try:
     resp = cli.get('/debug/check_config')
@@ -148,12 +151,12 @@ def test_config_validator_more():
     assert_false('Failed to chown' in resp.content)
     assert_false('Failed to delete' in resp.content)
   finally:
-    cluster.restore_caches(old)
+    restore_sys_caches(old_caches)
 
 
 def test_non_default_cluster():
   NON_DEFAULT_NAME = 'non_default'
-  old = cluster.clear_caches()
+  old_caches = clear_sys_caches()
   reset = (
     conf.HDFS_CLUSTERS.set_for_testing({ NON_DEFAULT_NAME: { } }),
     conf.MR_CLUSTERS.set_for_testing({ NON_DEFAULT_NAME: { } }),
@@ -171,7 +174,7 @@ def test_non_default_cluster():
   finally:
     for old_conf in reset:
       old_conf()
-    cluster.restore_caches(old)
+    restore_sys_caches(old_caches)
 
 
 def test_hdfs_ssl_validate():
diff --git a/desktop/libs/liboozie/src/liboozie/submittion2_tests.py b/desktop/libs/liboozie/src/liboozie/submittion2_tests.py
index a42d3dfda9a82bb2a156dc5f5f848ac46b7562ca..50b1a2dfddb77cbaa8697d058db74982fe7314e9 100644
--- a/desktop/libs/liboozie/src/liboozie/submittion2_tests.py
+++ b/desktop/libs/liboozie/src/liboozie/submittion2_tests.py
@@ -24,6 +24,7 @@ from nose.tools import assert_equal, assert_true, assert_not_equal
 from hadoop import cluster, pseudo_hdfs4
 from hadoop.conf import HDFS_CLUSTERS, MR_CLUSTERS, YARN_CLUSTERS
 
+from desktop.lib.test_utils import clear_sys_caches
 from desktop.lib.django_test_util import make_logged_in_client
 from oozie.models2 import Node
 from oozie.tests import OozieMockBase
@@ -206,7 +207,7 @@ class TestSubmission(OozieMockBase):
     submission._update_properties('jtaddress', 'deployment-directory')
     assert_equal(final_properties, submission.properties)
 
-    cluster.clear_caches()
+    clear_sys_caches()
     fs = cluster.get_hdfs()
     jt = cluster.get_next_ha_mrcluster()[1]
     final_properties = properties.copy()
@@ -221,7 +222,7 @@ class TestSubmission(OozieMockBase):
       finish.append(HDFS_CLUSTERS['default'].LOGICAL_NAME.set_for_testing('namenode'))
       finish.append(MR_CLUSTERS['default'].LOGICAL_NAME.set_for_testing('jobtracker'))
 
-      cluster.clear_caches()
+      clear_sys_caches()
       fs = cluster.get_hdfs()
      jt = cluster.get_next_ha_mrcluster()[1]
       final_properties = properties.copy()
@@ -234,7 +235,7 @@ class TestSubmission(OozieMockBase):
       submission._update_properties('jtaddress', 'deployment-directory')
       assert_equal(final_properties, submission.properties)
     finally:
-      cluster.clear_caches()
+      clear_sys_caches()
       for reset in finish:
         reset()
 
diff --git a/desktop/libs/liboozie/src/liboozie/submittion_tests.py b/desktop/libs/liboozie/src/liboozie/submittion_tests.py
index cf7fa3b8c83b009fbdf0779f7e0c93f532384ee9..6b041831b70999f5552fbde4cf4fd10965b426d5 100644
--- a/desktop/libs/liboozie/src/liboozie/submittion_tests.py
+++ b/desktop/libs/liboozie/src/liboozie/submittion_tests.py
@@ -26,6 +26,7 @@ from hadoop.conf import HDFS_CLUSTERS, MR_CLUSTERS, YARN_CLUSTERS
 
 from liboozie.submittion import Submission
 
 from oozie.tests import OozieMockBase
+from desktop.lib.test_utils import clear_sys_caches
 from desktop.lib.django_test_util import make_logged_in_client
 
@@ -183,7 +184,7 @@ class TestSubmission(OozieMockBase):
     submission._update_properties('jtaddress', 'deployment-directory')
     assert_equal(final_properties, submission.properties)
 
-    cluster.clear_caches()
+    clear_sys_caches()
     fs = cluster.get_hdfs()
     jt = cluster.get_next_ha_mrcluster()[1]
     final_properties = properties.copy()
@@ -198,7 +199,7 @@ class TestSubmission(OozieMockBase):
       finish.append(HDFS_CLUSTERS['default'].LOGICAL_NAME.set_for_testing('namenode'))
      finish.append(MR_CLUSTERS['default'].LOGICAL_NAME.set_for_testing('jobtracker'))
 
-      cluster.clear_caches()
+      clear_sys_caches()
       fs = cluster.get_hdfs()
       jt = cluster.get_next_ha_mrcluster()[1]
       final_properties = properties.copy()
@@ -211,7 +212,7 @@ class TestSubmission(OozieMockBase):
       submission._update_properties('jtaddress', 'deployment-directory')
       assert_equal(final_properties, submission.properties)
     finally:
-      cluster.clear_caches()
+      clear_sys_caches()
       for reset in finish:
         reset()
 
-- 
1.7.9.5
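
For reference, a minimal sketch of how the new ProxyFS is expected to dispatch calls, along the lines of proxyfs_test.py above; the MagicMock objects stand in for real HDFS/S3 clients, and the bucket and path names are illustrative only.

  from mock import MagicMock
  from nose.tools import assert_raises

  from desktop.lib.fs import ProxyFS

  s3fs, hdfs = MagicMock(), MagicMock()
  fs = ProxyFS({'s3': s3fs, 'hdfs': hdfs}, 'hdfs')

  fs.stats('s3://bucket/key')          # routed to s3fs, chosen by the URI scheme
  fs.stats('hdfs://namenode/tmp/file') # routed to hdfs
  fs.stats('/user/test')               # absolute path without a scheme: falls back to the default scheme ('hdfs')

  # Copies and renames that would span two filesystems are deferred to HUE-2934.
  assert_raises(NotImplementedError, fs.copy, 's3://bucket/key', '/tmp/key')

Application code obtains the proxy through the new cache in fsmanager, e.g. request.fs = fsmanager.get_filesystem(request.fs_ref) as middleware.py now does; repeated calls with the same name return the same cached ProxyFS until clear_cache() is called.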