Python types 模块,SimpleNamespace() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用types.SimpleNamespace()

项目:fluxpart    作者:usda-ars-ussl    | 项目源码 | 文件源码
def test_water_use_efficiency():
    """ Unit test does not quarantee the wue function is correct, but if
    this fails, something has changed, and better understand why.
    """

    hf_stats = SimpleNamespace(
        rho_vapor=9.607e-3,
        rho_co2=658.8e-6,
        T=28.56 + 273.15,
        P=100.1e3,
        cov_w_q=0.1443e-3,
        cov_w_c=-1.059e-6,
        cov_w_T=0.1359,
        ustar=0.4179,
        rho_totair=1.150)

    wue = water_use_efficiency(hf_stats, meas_ht=7.11, canopy_ht=4.42,
                               ppath='C3', ci_mod='const_ppm',
                               diff_ratio=(1 / 0.7))

    npt.assert_allclose(wue.wue, -6.45e-3, atol=0.005e-3)
    npt.assert_allclose(wue.ambient_h2o, 12.4e-3, atol=0.05e-3)
    npt.assert_allclose(wue.ambient_co2, 638.e-6, atol=0.5e-6)
    npt.assert_allclose(wue.inter_h2o, 28.3e-3, atol=0.05e-3)
    npt.assert_allclose(wue.inter_co2, 492.e-6, atol=0.5e-6)
项目:calm    作者:cygwin    | 项目源码 | 文件源码
def test_package_set(self):
        self.maxDiff = None

        args = types.SimpleNamespace()
        setattr(args, 'arch', 'x86')
        setattr(args, 'dryrun', False)
        setattr(args, 'force', True)
        setattr(args, 'inifile', 'testdata/inifile/setup.ini')
        setattr(args, 'pkglist', 'testdata/pkglist/cygwin-pkg-maint')
        setattr(args, 'rel_area', 'testdata/relarea')
        setattr(args, 'release', 'testing')
        setattr(args, 'setup_version', '4.321')

        packages = package.read_packages(args.rel_area, args.arch)
        package.delete(packages, 'x86/release/nonexistent', 'nosuchfile-1.0.0.tar.xz')
        self.assertEqual(package.validate_packages(args, packages), True)
        package.write_setup_ini(args, packages, args.arch)
        with open(args.inifile) as inifile:
            results = inifile.read()
            # fix the timestamp to match expected
            results = re.sub('setup-timestamp: .*', 'setup-timestamp: 1458221800', results, 1)
            results = re.sub('generated at .*', 'generated at 2016-03-17 13:36:40 GMT', results, 1)
            compare_with_expected_file(self, 'testdata/inifile', (results,), 'setup.ini')

        # XXX: delete a needed package, and check validate fails
项目:pyfc4    作者:ghukill    | 项目源码 | 文件源码
def _build_rdf(self, data=None):

        '''
        Parse incoming rdf as self.rdf.orig_graph, create copy at self.rdf.graph

        Args:
            data (): payload from GET request, expected RDF content in various serialization formats

        Returns:
            None
        '''

        # recreate rdf data
        self.rdf = SimpleNamespace()
        self.rdf.data = data
        self.rdf.prefixes = SimpleNamespace()
        self.rdf.uris = SimpleNamespace()
        # populate prefixes
        for prefix,uri in self.repo.context.items():
            setattr(self.rdf.prefixes, prefix, rdflib.Namespace(uri))
        # graph
        self._parse_graph()
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def internal_api_patcher():
    """
    Fixture that patches certain internal app functions for the entire selenium suite execution
    """
    methods_to_patch = [
        'mail.api.MailgunClient._mailgun_request',
    ]
    patcher_mocks = []
    patchers = [patch(method_name) for method_name in methods_to_patch]
    for patcher in patchers:
        mock = patcher.start()
        mock.name = patcher.attribute
        patcher_mocks.append(mock)
    yield SimpleNamespace(
        patchers=patchers,
        patcher_mocks=patcher_mocks
    )
    for patcher in patchers:
        patcher.stop()
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def mocked_elasticsearch_module_patcher():
    """
    Fixture that patches all indexing API functions that communicate directly with ElasticSearch
    """
    patchers = []
    patcher_mocks = []
    for name, val in tasks.__dict__.items():
        # This looks for functions starting with _ because those are the functions which are imported
        # from indexing_api. The _ lets it prevent name collisions.
        if callable(val) and name.startswith("_"):
            patchers.append(patch('search.tasks.{0}'.format(name), autospec=True))
    for patcher in patchers:
        mock = patcher.start()
        mock.name = patcher.attribute
        patcher_mocks.append(mock)
    yield SimpleNamespace(
        patchers=patchers,
        patcher_mocks=patcher_mocks
    )
    for patcher in patchers:
        patcher.stop()
项目:hugo_jupyter    作者:knowsuchagency    | 项目源码 | 文件源码
def get_packages_from_lockfile():
    """
    Return object that contains default and development packages from Pipfile.lock

    Returns: SimpleNamespace(default=[...], development=[...])

    """

    result = SimpleNamespace(default=list(), development=list())
    lockfile = Path('Pipfile.lock')
    lockfile_data = json.loads(lockfile.read_text())
    for key in ('default', 'develop'):
        for package, version_info in lockfile_data[key].items():
            packages = attrgetter('development' if key == 'develop' else key)(result)
            packages.append(package + version_info['version'])
    return result
项目:pgantomizer    作者:asgeirrr    | 项目源码 | 文件源码
def test_command_line_invokation(original_db, anonymized, monkeypatch):
    monkeypatch.setattr('argparse.ArgumentParser.parse_args', lambda self: SimpleNamespace(
        verbose=False,
        schema=SCHEMA_PATH,
        dump_file=DUMP_PATH,
        **{arg: ORIGINAL_DB_ARGS[arg] for arg in ('dbname', 'user', 'host', 'port', 'password')}
    ))
    dump_main()

    assert os.path.getsize(DUMP_PATH) > 2000
    assert_db_empty(anonymized)
    monkeypatch.setattr('argparse.ArgumentParser.parse_args', lambda self: SimpleNamespace(
        verbose=False,
        leave_dump=False,
        schema=SCHEMA_PATH,
        dump_file=DUMP_PATH,
        **{arg: ANONYMIZED_DB_ARGS[arg] for arg in ('dbname', 'user', 'host', 'port', 'password')}
    ))
    anonymize_main()
    assert_db_anonymized(anonymized)
项目:http-observatory    作者:mozilla    | 项目源码 | 文件源码
def _connect(self):
        try:
            self._conn = psycopg2.connect(database=DATABASE_DB,
                                          host=DATABASE_HOST,
                                          password=DATABASE_PASSWORD,
                                          port=DATABASE_PORT,
                                          sslmode=DATABASE_SSL_MODE,
                                          sslrootcert=DATABASE_CA_CERT,
                                          user=DATABASE_USER)

            if not self._connected:
                print('INFO: Connected to PostgreSQL', file=sys.stderr)
            self._connected = True

        except Exception as e:
            print(e, file=sys.stderr)
            self._conn = SimpleNamespace(closed=1)

            if self._connected:
                print('WARNING: Disconnected from PostgreSQL', file=sys.stderr)
            self._connected = False
项目:bibcat    作者:KnowledgeLinks    | 项目源码 | 文件源码
def __generate_reference__(self, triple_map, **kwargs):
        """Generates a RDF entity based on triple map

        Args:
            triple_map(SimpleNamespace): Triple Map
        """
        raw_value = self.source.get(str(triple_map.reference))
        if raw_value is None or len(raw_value) < 1:
            return
        if hasattr(triple_map, "datatype"):
            if triple_map.datatype == NS_MGR.xsd.anyURI:
                output = rdflib.URIRef(raw_value)
            else:
                output = rdflib.Literal(
                    raw_value,
                    datatype=triple_map.datatype)
        else:
            output = rdflib.Literal(raw_value)
        return output
项目:bibcat    作者:KnowledgeLinks    | 项目源码 | 文件源码
def __reference_handler__(self, **kwargs):
        """Internal method for handling rr:reference in triples map

        Keyword Args:

        -------------
            predicate_obj_map: SimpleNamespace
            obj: dict
            subject: rdflib.URIRef
        """
        subjects = []
        pred_obj_map = kwargs.get("predicate_obj_map")
        obj = kwargs.get("obj")
        subject = kwargs.get("subject")
        if pred_obj_map.reference is None:
            return subjects
        predicate = pred_obj_map.predicate
        ref_exp = jsonpath_ng.parse(str(pred_obj_map.refernce))
        found_objects = [r.value for r in ref_exp(obj)]
        for row in found_objects:
            self.output.add((subject, predicate, rdflib.Literal(row)))
项目:bibcat    作者:KnowledgeLinks    | 项目源码 | 文件源码
def __generate_reference__(self, triple_map, **kwargs):
        """Internal method takes a triple_map and returns the result of
        applying to XPath to the current DOM context

        Args:
        -----
            triple_map: SimpleNamespace
            element: etree.Element
        """
        element = kwargs.get("element")
        found_elements = element.xpath(
            triple_map.reference,
            namespaces=self.xml_ns)
        for elem in found_elements:
            raw_text = elem.text.strip()
            #! Quick and dirty test for valid URI
            if not raw_text.startswith("http"):
                continue
            return rdflib.URIRef(raw_text)
项目:senti    作者:stevenxxiu    | 项目源码 | 文件源码
def _fit_embedding_word(self, embedding_type, construct_docs, tokenize_, d=None):
        if embedding_type == 'google':
            embeddings_ = joblib.load('data/google/GoogleNews-vectors-negative300.pickle')
            embeddings_ = SimpleNamespace(X=embeddings_.syn0, vocab={w: v.index for w, v in embeddings_.vocab.items()})
        elif embedding_type == 'twitter':
            estimator = Pipeline([
                ('tokenize', MapCorporas(tokenize_)),
                ('word2vec', MergeSliceCorporas(CachedFitTransform(Word2Vec(
                    sg=1, size=d, window=10, hs=0, negative=5, sample=1e-3, min_count=1, iter=20, workers=16
                ), self.memory))),
            ]).fit([self.train_docs, self.unsup_docs[:10**6], self.val_docs, self.test_docs])
            embeddings_ = estimator.named_steps['word2vec'].estimator
            embeddings_ = SimpleNamespace(X=embeddings_.syn0, vocab={w: v.index for w, v in embeddings_.vocab.items()})
        else:
            embeddings_ = SimpleNamespace(X=np.empty((0, d)), vocab={})
        estimator = Pipeline([
            ('tokenize', MapCorporas(tokenize_)),
            # 0.25 is chosen so the unknown vectors have approximately the same variance as google pre-trained ones
            ('embeddings', MapCorporas(Embeddings(
                embeddings_, rand=lambda shape: get_rng().uniform(-0.25, 0.25, shape).astype('float32'),
                include_zero=True
            ))),
        ])
        estimator.fit(construct_docs)
        return estimator.named_steps['embeddings'].estimator
项目:stig    作者:rndusr    | 项目源码 | 文件源码
def setUp(self):
        self.rpc = FakeTransmissionRPC()
        self.torrent = FakeTorrentAPI()
        srvapi = SimpleNamespace(rpc=self.rpc,
                                 torrent=self.torrent,
                                 loop=self.loop)
        self.api = StatusAPI(srvapi, interval=1)

        self.rpc.fake_stats = {
            'downloadSpeed': 789,
            'uploadSpeed': 0,
            'activeTorrentCount': 1,
            'pausedTorrentCount': 2,
            'torrentCount': 3,
        }

        self.torrent.fake_tlist = SimpleNamespace(
            torrents=({'status': Status((Status.ISOLATED,)), 'rate-up': 0, 'rate-down': 0},
                      {'status': Status((Status.DOWNLOAD,)), 'rate-up': 0, 'rate-down': 456},
                      {'status': Status((Status.DOWNLOAD, Status.UPLOAD)), 'rate-up': 123, 'rate-down': 456}),
        )
项目:ubuntu-image    作者:CanonicalLtd    | 项目源码 | 文件源码
def run(self, command, *args, **kws):
        if ['lb', 'config'] == command[-2:]:
            self.call_args_list.append(command)
            return SimpleNamespace(returncode=1)
        elif ['lb', 'build'] == command[-2:]:
            self.call_args_list.append(command)
            # Create dummy top-level filesystem layout.
            chroot_dir = os.path.join(self.root_dir, 'chroot')
            for dir_name in DIRS_UNDER_ROOTFS:
                os.makedirs(os.path.join(chroot_dir, dir_name))
            return SimpleNamespace(returncode=1)
        elif command.startswith('dpkg -L'):
            self.call_args_list.append(command)
            stdout = kws.pop('stdout', PIPE)
            stderr = kws.pop('stderr', PIPE)
            return subprocess_run(
                command,
                stdout=stdout, stderr=stderr,
                universal_newlines=True,
                **kws)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_attrdel(self):
        ns1 = types.SimpleNamespace()
        ns2 = types.SimpleNamespace(x=1, y=2, w=3)

        with self.assertRaises(AttributeError):
            del ns1.spam
        with self.assertRaises(AttributeError):
            del ns2.spam

        del ns2.y
        self.assertEqual(vars(ns2), dict(w=3, x=1))
        ns2.y = 'spam'
        self.assertEqual(vars(ns2), dict(w=3, x=1, y='spam'))
        del ns2.y
        self.assertEqual(vars(ns2), dict(w=3, x=1))

        ns1.spam = 5
        self.assertEqual(vars(ns1), dict(spam=5))
        del ns1.spam
        self.assertEqual(vars(ns1), {})
项目:twittback    作者:dmerejkowsky    | 项目源码 | 文件源码
def setup_edit_test(tweet_factory, repository, mock, *, nvim_returncode):
    tweet_1 = tweet_factory.make_tweet(42, "First tweet!", date="2017-07-07")
    tweet_2 = tweet_factory.make_tweet(57, "Second tweet", date="2017-08-02")
    repository.add_tweets([tweet_1, tweet_2])

    spy = types.SimpleNamespace()
    spy.cmd = None

    def fake_run(cmd):
        stub_process = mock.Mock()
        spy.cmd = cmd
        stub_process.returncode = nvim_returncode
        path = cmd[1]
        with open(path, "w") as stream:
            stream.write("changed")
        return stub_process

    mock.patch("subprocess.run", fake_run)
    return spy
项目:RD-MCL    作者:biologyguy    | 项目源码 | 文件源码
def test_walker_set_heat():
    foo_var = SimpleNamespace(draw_random=lambda: True, draw_value=0.1, name="foo", current_value=0.15)
    bar_var = SimpleNamespace(draw_random=lambda: True, draw_value=0.5, name="bar", current_value=0.51)

    walker = mcmcmc._Walker(variables=[foo_var, bar_var], func=lambda *_: 1, heat=0.1, min_max=(1, 5), quiet=True)
    assert walker.heat == 0.1
    walker.set_heat(0.75)
    assert walker.heat == 0.75

    with pytest.raises(ValueError) as err:
        walker.set_heat(-1)
    assert "heat values must be positive, between 0.000001 and 1.0." in str(err)

    with pytest.raises(ValueError) as err:
        walker.set_heat(0)
    assert "heat values must be positive, between 0.000001 and 1.0." in str(err)

    with pytest.raises(ValueError) as err:
        walker.set_heat(1.01)
    assert "heat values must be positive, between 0.000001 and 1.0." in str(err)
项目:RD-MCL    作者:biologyguy    | 项目源码 | 文件源码
def test_chain_init():
    foo_var = SimpleNamespace(draw_random=lambda: True, draw_value=0.1, name="foo", current_value=0.15)
    bar_var = SimpleNamespace(draw_random=lambda: True, draw_value=0.5, name="bar", current_value=0.51)
    walker1 = SimpleNamespace(variables=[foo_var, bar_var])
    walker2 = SimpleNamespace(variables=[foo_var, bar_var])

    tmp_file = br.TempFile()

    chain = mcmcmc._Chain(walkers=[walker1, walker2], outfile=tmp_file.path, cold_heat=0.01, hot_heat=0.2)
    assert chain.walkers == [walker1, walker2]
    assert chain.outfile == tmp_file.path
    assert chain.cold_heat == 0.01
    assert chain.hot_heat == 0.2
    assert chain.step_counter == 0
    assert chain.best_score_ever_seen == 0
    assert tmp_file.read() == """\
Gen\tfoo\tbar\tresult
"""
项目:RD-MCL    作者:biologyguy    | 项目源码 | 文件源码
def test_chain_dump_obj():
    walker1 = SimpleNamespace(_dump_obj=lambda *_: "walker1")
    walker2 = SimpleNamespace(_dump_obj=lambda *_: "walker2")
    tmp_file = br.TempFile()
    tmp_file.write("outfile results")

    chain = SimpleNamespace(walkers=[walker1, walker2], outfile=tmp_file.path, cold_heat=0.1, hot_heat=0.2,
                            step_counter=20, best_score_ever_seen=100, _dump_obj=mcmcmc._Chain._dump_obj)

    dump = chain._dump_obj(chain)
    assert dump["walkers"] == ["walker1", "walker2"]
    assert dump["cold_heat"] == 0.1
    assert dump["hot_heat"] == 0.2
    assert dump["step_count"] == 20
    assert dump["best_score"] == 100
    assert dump["results"] == "outfile results"
项目:RD-MCL    作者:biologyguy    | 项目源码 | 文件源码
def test_chain_apply_dump(capsys):
    walker1 = SimpleNamespace(_apply_dump=lambda *_: print("Applying dump to walker1"))
    walker2 = SimpleNamespace(_apply_dump=lambda *_: print("Applying dump to walker2"))

    tmp_file = br.TempFile()
    chain = SimpleNamespace(walkers=[walker1, walker2], outfile=tmp_file.path, cold_heat=None, hot_heat=None,
                            step_counter=None, best_score_ever_seen=None, _apply_dump=mcmcmc._Chain._apply_dump)

    var_dict = {"walkers": [None, None], "cold_heat": 0.1, "hot_heat": 0.2,
                "step_count": 20, "best_score": 100, "results": "Some results"}
    chain._apply_dump(chain, var_dict)
    assert chain.walkers == [walker1, walker2]
    out, err = capsys.readouterr()
    assert out == "Applying dump to walker1\nApplying dump to walker2\n"
    assert chain.cold_heat == 0.1
    assert chain.hot_heat == 0.2
    assert chain.step_counter == 20
    assert chain.best_score_ever_seen == 100
    assert tmp_file.read() == "Some results"
项目:RD-MCL    作者:biologyguy    | 项目源码 | 文件源码
def test_mcmcmc_resume(capsys):
    mc_obj = SimpleNamespace(dumpfile="does_not_exist", resume=mcmcmc.MCMCMC.resume)
    assert mc_obj.resume(mc_obj) is False

    tmp_file = br.TempFile(byte_mode=True)
    dill.dump(["a", "b", "c"], tmp_file)

    mc_obj.dumpfile = tmp_file.path
    chain1 = SimpleNamespace(_apply_dump=lambda *_: print("applying chain1"))
    chain2 = SimpleNamespace(_apply_dump=lambda *_: print("applying chain2"))
    chain3 = SimpleNamespace(_apply_dump=lambda *_: print("applying chain3"))
    mc_obj.chains = [chain1, chain2, chain3]
    mc_obj.run = lambda *_: print("Running")

    assert mc_obj.resume(mc_obj) is True
    out, err = capsys.readouterr()
    assert out == "applying chain1\napplying chain2\napplying chain3\nRunning\n", print(out)
项目:RD-MCL    作者:biologyguy    | 项目源码 | 文件源码
def test_mcmcmc_check_convergence(hf):
    csv_path = os.path.join(hf.resource_path, "mcmcmc", "chain")
    chain1 = SimpleNamespace(step_counter=99, outfile=csv_path + "1.csv")
    chain2 = SimpleNamespace(step_counter=99, outfile=csv_path + "2.csv")
    chain3 = SimpleNamespace(step_counter=99, outfile=csv_path + "3.csv")

    mc_obj = SimpleNamespace(_check_convergence=mcmcmc.MCMCMC._check_convergence, chains=[chain1, chain2, chain3],
                             convergence=1.01)

    # Return False when step_counter < 100
    assert mc_obj._check_convergence(mc_obj) is False

    # Return False when convergence is not met
    chain1.step_counter = chain2.step_counter = chain3.step_counter = 100
    assert mc_obj._check_convergence(mc_obj) is False

    # Return True on convergence
    mc_obj.convergence = 1.1
    assert mc_obj._check_convergence(mc_obj) is True
项目:RD-MCL    作者:biologyguy    | 项目源码 | 文件源码
def test_mcmcmc_reset_params():
    walker1 = SimpleNamespace(params=[1, 2])
    walker2 = SimpleNamespace(params=[3, 4])
    walker3 = SimpleNamespace(params=[5, 6])
    walker4 = SimpleNamespace(params=[7, 8])

    chain1 = SimpleNamespace(walkers=[walker1, walker2])
    chain2 = SimpleNamespace(walkers=[walker3, walker4])

    mc_obj = SimpleNamespace(reset_params=mcmcmc.MCMCMC.reset_params, chains=[chain1, chain2])
    with pytest.raises(AttributeError) as e:
        mc_obj.reset_params(mc_obj, ['a', 'b', 'c'])
    assert "Incorrect number of params supplied in reset_params(). 2 expected; 3 supplied; ['a', 'b', 'c']" in str(e)

    mc_obj.reset_params(mc_obj, ['a', 'b'])
    assert walker2.params == ['a', 'b']
项目:porcupine    作者:Akuli    | 项目源码 | 文件源码
def add_option(self, key, default, *, reset=True):
        """Add a new option without adding widgets to the setting dialog.

        ``section[key]`` will be *default* unless something else is
        specified.

        If *reset* is True, the setting dialog's reset button sets this
        option to *default*.

        .. note::
            The *reset* argument should be False for settings that
            cannot be changed with the dialog. That way, clicking the
            reset button resets only the settings that are shown in the
            dialog.
        """
        self._infos[key] = types.SimpleNamespace(
            default=default,        # not validated
            reset=reset,
            callbacks=[],
            errorvar=tkinter.BooleanVar(),  # true when the triangle is showing
        )
项目:echonet    作者:karoldvl    | 项目源码 | 文件源码
def __init__(self, data_dir, work_dir):
        self.RandomState = np.random.RandomState(seed=20161013)
        self.data_mean = None
        self.data_std = None
        self.class_count = None
        self.meta = None
        self.train_meta = None
        self.test_data = types.SimpleNamespace(X=None, y=None, meta=None)
        self.validation_data = types.SimpleNamespace(X=None, y=None, meta=None)
        self.data_dir = data_dir
        self.work_dir = work_dir

        self.data_dir = os.path.abspath(self.data_dir) + '/'
        self.work_dir = os.path.abspath(self.work_dir) + '/'

        if not os.path.isdir(self.data_dir):
            raise NotADirectoryError("{} is not a proper dataset directory.".format(self.data_dir))
        if not os.path.isdir(self.work_dir):
            raise NotADirectoryError("{} is not a proper working directory.".format(self.work_dir))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_attrdel(self):
        ns1 = types.SimpleNamespace()
        ns2 = types.SimpleNamespace(x=1, y=2, w=3)

        with self.assertRaises(AttributeError):
            del ns1.spam
        with self.assertRaises(AttributeError):
            del ns2.spam

        del ns2.y
        self.assertEqual(vars(ns2), dict(w=3, x=1))
        ns2.y = 'spam'
        self.assertEqual(vars(ns2), dict(w=3, x=1, y='spam'))
        del ns2.y
        self.assertEqual(vars(ns2), dict(w=3, x=1))

        ns1.spam = 5
        self.assertEqual(vars(ns1), dict(spam=5))
        del ns1.spam
        self.assertEqual(vars(ns1), {})
项目:calm    作者:cygwin    | 项目源码 | 文件源码
def test_html_writer(self):
        self.maxDiff = None

        htdocs = 'testdata/htdocs'
        args = types.SimpleNamespace()
        setattr(args, 'arch', 'x86')
        setattr(args, 'htdocs', htdocs)
        setattr(args, 'rel_area', 'testdata/relarea')
        setattr(args, 'dryrun', False)
        setattr(args, 'force', True)
        setattr(args, 'pkglist', 'testdata/pkglist/cygwin-pkg-maint')

        packages = package.read_packages(args.rel_area, args.arch)
        package.validate_packages(args, packages)
        pkg2html.update_package_listings(args, packages, args.arch)

        # compare the output files with expected
        for (dirpath, subdirs, files) in os.walk(htdocs):
            relpath = os.path.relpath(dirpath, htdocs)
            for f in files:
                results = os.path.join(htdocs, relpath, f)
                expected = os.path.join('testdata/htdocs.expected', relpath, f)
                if not filecmp.cmp(results, expected, shallow=False):
                    logging.info("%s different", os.path.join(relpath, f))
                    with open(results) as r, open(expected) as e:
                        self.assertMultiLineEqual(e.read(), r.read())
                else:
                    logging.info("%s identical", os.path.join(relpath, f))
项目:calm    作者:cygwin    | 项目源码 | 文件源码
def test_scan_uploads(self):
        self.maxDiff = None

        args = types.SimpleNamespace()
        setattr(args, 'arch', 'x86')
        setattr(args, 'rel_area', 'testdata/relarea')
        setattr(args, 'dryrun', False)

        pkglist = ['after-ready', 'not-ready', 'testpackage', 'testpackage2']

        mlist = {}
        mlist = maintainers.Maintainer.add_directories(mlist, 'testdata/homes')
        m = mlist['Blooey McFooey']
        m.pkgs.extend(pkglist + ['not-on-package-list'])

        ready_fns = [(os.path.join(m.homedir(), 'x86', 'release', 'testpackage', '!ready'), ''),
                     (os.path.join(m.homedir(), 'x86', 'release', 'testpackage2', 'testpackage2-subpackage', '!ready'), ''),
                     (os.path.join(m.homedir(), 'x86', 'release', 'after-ready', '!ready'), '-t 198709011700'),
                     (os.path.join(m.homedir(), 'x86', 'release', 'corrupt', '!ready'), '')]
        for (f, t) in ready_fns:
            os.system('touch %s "%s"' % (t, f))

        scan_result = uploads.scan(m, pkglist + ['not-on-maintainer-list'], args.arch, args)

        self.assertEqual(scan_result.error, False)
        compare_with_expected_file(self, 'testdata/uploads', dict(scan_result.to_relarea), 'move')
        self.assertCountEqual(scan_result.to_vault, {'x86/release/testpackage': ['x86/release/testpackage/testpackage-0.1-1.tar.bz2']})
        self.assertCountEqual(scan_result.remove_always, [f for (f, t) in ready_fns])
        self.assertEqual(scan_result.remove_success, ['testdata/homes/Blooey McFooey/x86/release/testpackage/-testpackage-0.1-1-src.tar.bz2', 'testdata/homes/Blooey McFooey/x86/release/testpackage/-testpackage-0.1-1.tar.bz2'])
        with pprint_patch():
            compare_with_expected_file(self, 'testdata/uploads', dict(scan_result.packages), 'pkglist')
项目:pyfc4    作者:ghukill    | 项目源码 | 文件源码
def __init__(self,
        repo,
        uri=None,
        response=None,
        rdf_prefixes_mixins=None):

        # repository handle is pinned to resource instance here
        self.repo = repo

        # parse uri with parse_uri() from repo instance
        self.uri = self.repo.parse_uri(uri)

        # parse response

        # if response provided, parse and set to attributes
        if response:
            self.response = response
            self.data = self.response.content
            self.headers = self.response.headers
            self.status_code = self.response.status_code
            # if response, and status_code is 200, set True
            if self.status_code == 200:
                self.exists = True

        # if not response, set all blank
        else:
            self.response = None
            self.data = None
            self.headers = {}
            self.status_code = None
            self.exists = False

        # RDF
        self._build_rdf(data=self.data)

        # versions
        self.versions = SimpleNamespace()
项目:pyfc4    作者:ghukill    | 项目源码 | 文件源码
def parse_object_like_triples(self):

        '''
        method to parse triples from self.rdf.graph for object-like 
        access

        Args:
            None

        Returns:
            None: sets self.rdf.triples
        '''

        # parse triples as object-like attributes in self.rdf.triples
        self.rdf.triples = SimpleNamespace() # prepare triples
        for s,p,o in self.rdf.graph:

            # get ns info
            ns_prefix, ns_uri, predicate = self.rdf.graph.compute_qname(p)

            # if prefix as list not yet added, add
            if not hasattr(self.rdf.triples, ns_prefix):
                setattr(self.rdf.triples, ns_prefix, SimpleNamespace())

            # same for predicate
            if not hasattr(getattr(self.rdf.triples, ns_prefix), predicate):
                setattr(getattr(self.rdf.triples, ns_prefix), predicate, [])            

            # append object for this prefix
            getattr(getattr(self.rdf.triples, ns_prefix), predicate).append(o)
项目:pyfc4    作者:ghukill    | 项目源码 | 文件源码
def _diff_graph(self):

        '''
        Uses rdflib.compare diff, https://github.com/RDFLib/rdflib/blob/master/rdflib/compare.py
        When a resource is retrieved, the graph retrieved and parsed at that time is saved to self.rdf._orig_graph,
        and all local modifications are made to self.rdf.graph.  This method compares the two graphs and returns the diff
        in the format of three graphs:

            overlap - triples shared by both
            removed - triples that exist ONLY in the original graph, self.rdf._orig_graph
            added - triples that exist ONLY in the modified graph, self.rdf.graph

        These are used for building a sparql update query for self.update.

        Args:
            None

        Returns:
            None: sets self.rdf.diffs and adds the three graphs mentioned, 'overlap', 'removed', and 'added'
        '''

        overlap, removed, added = graph_diff(
            to_isomorphic(self.rdf._orig_graph),
            to_isomorphic(self.rdf.graph))
        diffs = SimpleNamespace()
        diffs.overlap = overlap
        diffs.removed = removed
        diffs.added = added
        self.rdf.diffs = diffs
项目:queue-messaging    作者:socialwifi    | 项目源码 | 文件源码
def model(self):
        return types.SimpleNamespace()
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def test_discovery_from_dotted_namespace_packages(self):
        if not getattr(types, 'SimpleNamespace', None):
            raise unittest.SkipTest('Namespaces not supported')
        loader = unittest.TestLoader()

        orig_import = __import__
        package = types.ModuleType('package')
        package.__path__ = ['/a', '/b']
        package.__spec__ = types.SimpleNamespace(
           loader=None,
           submodule_search_locations=['/a', '/b']
        )

        def _import(packagename, *args, **kwargs):
            sys.modules[packagename] = package
            return package

        def cleanup():
            builtins.__import__ = orig_import
        self.addCleanup(cleanup)
        builtins.__import__ = _import

        _find_tests_args = []
        def _find_tests(start_dir, pattern, namespace=None):
            _find_tests_args.append((start_dir, pattern))
            return ['%s/tests' % start_dir]

        loader._find_tests = _find_tests
        loader.suiteClass = list
        suite = loader.discover('package')
        self.assertEqual(suite, ['/a/tests', '/b/tests'])
项目:rc-niceties    作者:mjec    | 项目源码 | 文件源码
def get(self, url):
        for matcher in self.url_matches:
            if matcher[0].fullmatch(url):
                return SimpleNamespace(data=json.load(
                    open(os.path.join(app.root_path, 'mock/fixtures/' + matcher[1]))
                ))
        return SimpleNamespace(data={})
项目:glin    作者:zaturox    | 项目源码 | 文件源码
def __init__(self, numLed, hwBackend, port=6606):
        self.ctx = zmq.Context()
        self.numLed = numLed
        self.port = port

        self.loop = IOLoop.instance()
        self.caller = PeriodicCallback(self._on_nextFrame, 1000/30, self.loop)
        self.hwComm = hwBackend
        self.hwComm.connect()
        self.zmqCollector = GlinAppZmqCollector(self, self.ctx)
        self.zmqPublisher = GlinAppZmqPublisher(self, self.ctx)

        # server side configuration
        self.config = SimpleNamespace()
        self.config.maxFps = 60

        # current state (somehow client side configuration)
        self.state = SimpleNamespace()
        self.state.animationClasses = []
        self.state.activeSceneId = None
        self.state.activeAnimation = None
        self.state.scenes = {}
        self.state.brightness = 1.0
        self.state.sceneIdCtr = 0
        self.state.mainswitch = True
        self.state.targetFps = 0
        self.state.lastFrameSent = None
项目:git-annex-adapter    作者:alpernebbi    | 项目源码 | 文件源码
def __init__(self, repo):
        # Must have run git-annex init
        if repo.lookup_branch('git-annex') is None:
            fmt = 'Repository {} is not a git-annex repo.'
            msg = fmt.format(repo)
            raise NotAGitAnnexRepoError(msg)

        self.repo = repo

        self.processes = types.SimpleNamespace()
        self.processes.metadata = \
            GitAnnexMetadataBatchJsonProcess(self.repo.workdir)
        self.processes.contentlocation = \
            GitAnnexContentlocationBatchProcess(self.repo.workdir)
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def base_test_data():
    """
    Fixture for test data that should be available to any test case in the suite
    """
    # Create a live program with valid prices and financial aid
    program = ProgramFactory.create(
        live=True,
        financial_aid_availability=True,
        price=1000,
    )
    CourseRunFactory.create(course__program=program)
    TierProgramFactory.create_properly_configured_batch(2, program=program)
    # Create users
    staff_user, student_user = (create_user_for_login(is_staff=True), create_user_for_login(is_staff=False))
    ProgramEnrollment.objects.create(program=program, user=staff_user)
    ProgramEnrollment.objects.create(program=program, user=student_user)
    Role.objects.create(
        role=Staff.ROLE_ID,
        user=staff_user,
        program=program,
    )
    return SimpleNamespace(
        staff_user=staff_user,
        student_user=student_user,
        program=program
    )
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def program_data():
    """
    Fixture for program and tier_program test data
    """
    program, tier_programs = create_program()
    return SimpleNamespace(
        program=program,
        tier_programs=tier_programs,
    )
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def test_props(self, program_data):
        """
        Fixture that provides test properties for FinancialAidDetailView test cases
        """
        user = create_enrolled_profile(program_data.program).user
        pending_fa = FinancialAidFactory.create(
            user=user,
            tier_program=program_data.tier_programs["25k"],
            status=FinancialAidStatus.PENDING_DOCS
        )
        docs_sent_url = reverse(
            "financial_aid",
            kwargs={"financial_aid_id": pending_fa.id}
        )
        docs_sent_date = now_in_utc().date()
        docs_sent_request_params = dict(
            content_type="application/json",
            data=json.dumps({
                "date_documents_sent": docs_sent_date.strftime("%Y-%m-%d")
            })
        )
        return SimpleNamespace(
            user=user,
            pending_fa=pending_fa,
            docs_sent_url=docs_sent_url,
            docs_sent_request_params=docs_sent_request_params,
            docs_sent_date=docs_sent_date,
        )
项目:pyt    作者:python-security    | 项目源码 | 文件源码
def __getattr__(self, name):
            try:
                return super(SimpleNamespace, self).__getitem__(name)
            except KeyError:
                raise AttributeError('{0} has no attribute {1}'
                                     .format(self.__class__.__name__, name))
项目:pyt    作者:python-security    | 项目源码 | 文件源码
def __setattr__(self, name, value):
            super(SimpleNamespace, self).__setitem__(name, value)
项目:sidekick    作者:fabiommendes    | 项目源码 | 文件源码
def record_to_dict(record: Record):
    """
    Converts a record/namespace to a dictionary.

    Notes:
        If you want to convert a dict to a record, simply call ``record(**D)``.
    """
    if isinstance(record, SimpleNamespace):
        return dict(record.__dict__)
    return dict(record._items())
项目:tsrc    作者:SuperTanker    | 项目源码 | 文件源码
def push_args():
    args = types.SimpleNamespace()
    args.accept = False
    args.assignee = None
    args.force = False
    args.merge = False
    args.mr_title = None
    args.push_spec = None
    args.ready = False
    args.reviewers = None
    args.target_branch = "master"
    args.title = None
    args.wip = False
    return args
项目:clait    作者:guyzmo    | 项目源码 | 文件源码
def load(name):
    return SimpleNamespace(**_templates[name])

# autoload all templates
项目:hydrus    作者:mark-r-g    | 项目源码 | 文件源码
def read_config(cfgfile='settings.cfg', section='DEFAULT'):
    """Parse the Hydrus configuration file."""
    cfg = SimpleNamespace()
    parser = ConfigParser()
    parser.read(cfgfile)

    cfg.RAPIDCLUS = parser.getboolean(section, 'RAPIDCLUS')
    cfg.QUADRATURE = parser.getboolean(section, 'QUADRATURE')
    cfg.INFILE = parser.get(section, 'INFILE')
    cfg.MEASURE_SETTINGS = parser.get(section, 'MEASURE_SETTINGS')

    return cfg
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def dispatch_custom_completer(self, text):
        if not self.custom_completers:
            return

        line = self.line_buffer
        if not line.strip():
            return None

        # Create a little structure to pass all the relevant information about
        # the current completion to any custom completer.
        event = SimpleNamespace()
        event.line = line
        event.symbol = text
        cmd = line.split(None,1)[0]
        event.command = cmd
        event.text_until_cursor = self.text_until_cursor

        # for foo etc, try also to find completer for %foo
        if not cmd.startswith(self.magic_escape):
            try_magic = self.custom_completers.s_matches(
                self.magic_escape + cmd)
        else:
            try_magic = []

        for c in itertools.chain(self.custom_completers.s_matches(cmd),
                 try_magic,
                 self.custom_completers.flat_matches(self.text_until_cursor)):
            try:
                res = c(event)
                if res:
                    # first, try case sensitive match
                    withcase = [r for r in res if r.startswith(text)]
                    if withcase:
                        return withcase
                    # if none, then case insensitive ones are ok too
                    text_low = text.lower()
                    return [r for r in res if r.lower().startswith(text_low)]
            except TryNext:
                pass

        return None
项目:osedev    作者:damoti    | 项目源码 | 文件源码
def get(self, edit_count):
        request = types.SimpleNamespace()
        request.user = UserFactory(edit_count=edit_count)
        request.method = "GET"
        return request
项目:osedev    作者:damoti    | 项目源码 | 文件源码
def post(self, edit_count):
        request = types.SimpleNamespace()
        request.user = UserFactory(edit_count=edit_count)
        request.POST = {}
        request.method = "POST"
        return request
项目:osedev    作者:damoti    | 项目源码 | 文件源码
def get(self):
        request = types.SimpleNamespace()
        request.method = "GET"
        return request
项目:osedev    作者:damoti    | 项目源码 | 文件源码
def post(self, post):
        request = types.SimpleNamespace()
        request.POST = post
        request.method = "POST"
        return request