Python copy 模块,deepcopy() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用copy.deepcopy()

项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def load_previous(self, path=None):
        """Restore the previously saved config from a JSON file on disk.

        This is normally invoked automatically during object
        initialization, so calling it directly is rarely needed.

        :param path:

            Location of the JSON file to read. When falsy, the current
            ``self.path`` is used. When given, it is also stored on
            ``self.path`` so that later ``save()`` calls target the same
            file.

        """
        self.path = path if path else self.path
        with open(self.path) as stream:
            self._prev_dict = json.load(stream)
        # Work on a deep copy so values stored on ``self`` never alias the
        # objects kept in ``self._prev_dict``.
        restored = copy.deepcopy(self._prev_dict)
        for key, value in restored.items():
            if key in self:
                # Keys already present on ``self`` are left untouched.
                continue
            self[key] = value
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def handle_results_collected(self, signal, sender, results, context, **kw):
        """Keep, per sender and result name, the largest value reported so far.

        Each entry of ``results`` is converted with ``to_name_value_pair`` and
        the names are validated by ``ensure_unique_names``. Values are stored
        in ``self.data[sender.id_][sender.type_name][name]`` as ``Result``
        objects together with a deep copy of ``context``.
        """
        pairs = [self.to_name_value_pair(item) for item in results]
        self.ensure_unique_names(pairs)

        for name, value in pairs:
            # Walk (and create on demand) the nested dicts for this sender.
            bucket = self.data
            for part in (sender.id_, sender.type_name):
                bucket = bucket.setdefault(part, {})

            existing = bucket.get(name, None)
            if existing is not None and not (existing.value < value):
                # The stored value is not smaller than the new one; keep it.
                continue
            # TODO: once serialization, no need to deepcopy
            bucket[name] = Result(value=value, context=copy.deepcopy(context))
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def __exit__(self, exc_type, exc_val, exc_tb):
        """Publish the collected results and surface the first receiver failure.

        ``before_exit`` runs first, then ``results_collected`` is sent with a
        deep copy of the current context data. Receiver errors are re-raised
        only when the managed block itself finished without an exception.
        """
        self.before_exit()
        responses = results_collected.send_robust(
            sender=self, results=self.get_results_to_send(),
            context=copy.deepcopy(context.current.data))
        if exc_type is not None:
            # An exception is already propagating; do not mask it.
            return
        for (_receiver, outcome) in responses:
            if not isinstance(outcome, BaseException):
                continue
            # Embed the receiver's traceback so the failure location is
            # visible in the newly raised exception.
            orig_tb = ''.join(traceback.format_tb(outcome.__traceback__))
            error_msg = '{}{}: {}'.format(
                orig_tb, type(outcome).__name__, str(outcome))
            if hasattr(outcome, 'clone_with_more_info'):
                raise outcome.clone_with_more_info(orig_tb=orig_tb)
            raise type(outcome)(error_msg)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def list_kuryr_opts():
    """Return a list of oslo_config options available in Kuryr service.

    Every list element is a ``(group_name, opts)`` tuple: the group under
    which the options are registered, followed by the options themselves.
    A group name of None corresponds to the [DEFAULT] group in config files.

    The function is also discoverable via the 'kuryr' entry point under
    the 'oslo_config.opts' namespace, which lets tools such as the Oslo
    sample config file generator find the options Kuryr exposes to users.

    :returns: a list of (group_name, opts) tuples
    """
    # Options declared in this module are deep-copied before being returned.
    local_opts = []
    for group, options in _kuryr_k8s_opts:
        local_opts.append((group, copy.deepcopy(options)))

    return local_opts + lib_opts.list_kuryr_opts() + _options.list_opts()
项目:safetyculture-sdk-python    作者:SafetyCulture    | 项目源码 | 文件源码
def get_item_label(self, item):
        """
        Build the label for a single audit item
        :param item:    single item in JSON format
        :return:        for 'smartfield' items, the condition statement followed
                        by every option value wrapped in '|' characters;
                        for any other item, its label property
        """
        label = EMPTY_RESPONSE
        if get_json_property(item, TYPE) != 'smartfield':
            return get_json_property(item, LABEL)

        custom_labels = self.audit_custom_response_id_to_label_map()
        condition_id = get_json_property(item, 'options', 'condition')
        if condition_id:
            statement = smartfield_conditional_id_to_statement_map.get(condition_id)
            label = copy.deepcopy(statement) or EMPTY_RESPONSE

        for value in get_json_property(item, 'options', 'values'):
            # Standard responses win over custom ones; unknown ids are
            # rendered verbatim.
            if value in standard_response_id_map.keys():
                text = standard_response_id_map[value]
            elif value in custom_labels.keys():
                text = custom_labels[value]
            else:
                text = str(value)
            label = label + '|' + text + '|'
        return label
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def test_file_load(self):
        """
        Filters loaded from a single .nrf file flag only the malicious subject
        """
        entry = copy(self.template_entry)
        nrf_path = join(self.var_dir, 'simple.nrf')
        fb = NNTPFilterBase(paths=nrf_path)

        # No regular expressions are cached before the first lookup
        assert len(fb._regex_hash) == 0

        # A plain video file name is not blacklisted ...
        entry['subject'] = 'A great video called "blah.avi"'
        assert fb.blacklist(**entry) == False

        # ... while a video name with an executable extension is
        entry['subject'] = 'A malicious file because it is "blah.avi.exe"'
        assert fb.blacklist(**entry) == True

        # Pointing at the directory instead must construct without error;
        # it is expected to pick up the very same nrf file.
        fbd = NNTPFilterBase(paths=self.var_dir)
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def test_scoring_image_files(self):
        """
        Every common image extension must receive the expected score
        """

        sf = NNTPSimpleFilter()
        entry = copy(self.template_entry)

        # Score each image subject is expected to receive
        expected = 15

        # Walk through the image extensions under test
        for ext in ('jpg', 'jpeg', 'gif', 'png', 'bmp'):
            filename_part = '"what.a.great.image.%s" Yenc (1/1)' % ext
            entry['subject'] = 'What.A.Great.Image (1/1) ' + filename_part

            assert sf.score(**entry) == expected
项目:nova    作者:hubblestack    | 项目源码 | 文件源码
def _get_tags(data):
    """Group the 'firewall' audit entries of ``data`` by their tag.

    Returns a dict mapping each tag to a list of flattened audit dicts. Each
    flattened dict holds the entry's 'data' fields, plus 'type' (the key the
    entry was listed under), 'tag', 'module' and every other top-level field
    of the audit entry.

    Note: the 'tag' key is popped from each entry's 'data' dict, so the input
    structure is modified.
    """
    tagged = {}
    for section, entries in data.get('firewall', {}).iteritems():
        for entry in entries:
            for _audit_id, audit in entry.iteritems():
                payload = audit.get('data', {})
                tag = payload.pop('tag')
                record = copy.deepcopy(payload)
                record['type'] = section
                record['tag'] = tag
                record['module'] = 'firewall'
                # Merge the remaining top-level fields, then drop the nested
                # 'data' dict that came along with them.
                record.update(audit)
                record.pop('data')
                tagged.setdefault(tag, []).append(record)
    return tagged
项目:django-magic-cards    作者:pbaranay    | 项目源码 | 文件源码
def test_update_text(self):
        """Re-importing a set replaces a card's text with the updated text."""
        fixture_path = os.path.join(self.FIXTURES_DIR, 'eyes_in_the_skies.json')
        with open(fixture_path) as fixture:
            final_data = json.load(fixture)

        first_card = final_data['RTR']['cards'][0]
        original_text = first_card['originalText']
        final_text = first_card['text']

        # Build the "before" data set: same card, but carrying its original text.
        original_data = copy.deepcopy(final_data)
        original_data['RTR']['cards'][0]['text'] = original_text

        # First import: the card is created with the original text.
        parse_data(original_data, ['RTR'])
        eyes_in_the_skies = Card.objects.first()
        self.assertEqual(eyes_in_the_skies.text, original_text)

        # Second import: the stored card picks up the final text.
        parse_data(final_data, ['RTR'])
        eyes_in_the_skies.refresh_from_db()
        self.assertEqual(eyes_in_the_skies.text, final_text)
项目:django-magic-cards    作者:pbaranay    | 项目源码 | 文件源码
def test_update_types(self):
        """Re-importing a set replaces a card's subtype and removes the stale one."""
        fixture_path = os.path.join(self.FIXTURES_DIR, 'jackal_pup.json')
        with open(fixture_path) as fixture:
            final_data = json.load(fixture)

        # Build the "before" data set with a different subtype on the card.
        original_subtype = 'Hound'
        original_data = copy.deepcopy(final_data)
        original_data['TMP']['cards'][0]['subtypes'] = [original_subtype]

        # First import: the card carries only the old subtype.
        parse_data(original_data, ['TMP'])
        jackal_pup = Card.objects.first()
        self.assertEqual(jackal_pup.subtypes.count(), 1)
        self.assertEqual(jackal_pup.subtypes.first().name, original_subtype)

        # Second import: the card carries only the new subtype.
        parse_data(final_data, ['TMP'])
        jackal_pup.refresh_from_db()
        self.assertEqual(jackal_pup.subtypes.count(), 1)
        self.assertEqual(jackal_pup.subtypes.first().name, 'Jackal')
        # The old subtype row must no longer exist.
        stale = CardSubtype.objects.filter(name=original_subtype)
        self.assertFalse(stale.exists())
项目:django-magic-cards    作者:pbaranay    | 项目源码 | 文件源码
def test_update_loyalty(self):
        """
        Simulates the upgrade process from version 0.2 to version 0.4:
        a card first imported without loyalty gains it on re-import.
        """
        fixture_path = os.path.join(self.FIXTURES_DIR, 'vraska_the_unseen.json')
        with open(fixture_path) as fixture:
            final_data = json.load(fixture)

        # Build the "before" data set by stripping the loyalty field.
        original_data = copy.deepcopy(final_data)
        original_data['RTR']['cards'][0].__delitem__('loyalty')

        # First import: no loyalty is stored.
        parse_data(original_data, ['RTR'])
        vraska = Card.objects.first()
        self.assertIsNone(vraska.loyalty)

        # Second import: loyalty is filled in.
        parse_data(final_data, ['RTR'])
        vraska.refresh_from_db()
        self.assertEqual(vraska.loyalty, 5)
项目:pytest-selenium-pdiff    作者:rentlytics    | 项目源码 | 文件源码
def test_pytest_report_header():
    """Check the report header for each combination of image-diff backends.

    The module-level ``settings`` mapping is modified during the test. The
    original values are written back in a ``finally`` block so that a failing
    assertion cannot leak modified settings into other tests.
    """
    old_settings = deepcopy(settings)

    try:
        settings['USE_IMAGEMAGICK'] = True
        settings['USE_PERCEPTUALDIFF'] = True

        assert 'ImageMagick' in pytest_report_header(None)

        settings['USE_IMAGEMAGICK'] = False

        assert 'perceptualdiff' in pytest_report_header(None)

        settings['USE_PERCEPTUALDIFF'] = False

        # With both backends disabled the header hook is expected to raise.
        with pytest.raises(Exception):
            pytest_report_header(None)
    finally:
        settings.update(old_settings)
项目:networking-huawei    作者:openstack    | 项目源码 | 文件源码
def test_create_router_failure(self):
        """create_router raises MechanismDriverError when the REST call answers 502."""
        router_info = copy.deepcopy(fake_router_object)
        router_info['router'].update({'status': 'ACTIVE',
                                      'id': fake_router_uuid})
        context = mock.Mock(current=fake_router_object)

        response = self._create_rest_response(requests.codes.bad_gateway)

        # The REST client returns the bad-gateway response while the DB
        # mixin reports a successful create.
        rest_patch = mock.patch.object(ac_rest.RestClient, 'process_request',
                                       return_value=response)
        db_patch = mock.patch.object(L3_NAT_db_mixin, 'create_router',
                                     return_value=fake_router_db)
        with rest_patch:
            with db_patch:
                acl3router = HuaweiACL3RouterPlugin()
                self.assertRaises(ml2_exc.MechanismDriverError,
                                  acl3router.create_router,
                                  context, router_info)
项目:networking-huawei    作者:openstack    | 项目源码 | 文件源码
def test_add_router_interface_key_error_exception(self):
        """add_router_interface raises KeyError when interface info lacks 'port_id'."""
        router_info = copy.deepcopy(fake_router_object)
        router_info['router'].update({'status': 'ACTIVE',
                                      'id': fake_router_uuid})
        context = mock.Mock(current=fake_router_object)

        # Build the interface info and then strip the key the plugin needs.
        interface_info = {'port_id': fake_port_id}
        interface_info.pop('port_id')

        get_router_patch = mock.patch.object(L3_NAT_db_mixin, 'get_router',
                                             return_value=fake_router_db)
        add_interface_patch = mock.patch.object(L3_NAT_with_dvr_db_mixin,
                                                'add_router_interface',
                                                return_value=interface_info)
        with get_router_patch:
            with add_interface_patch:
                acl3router = HuaweiACL3RouterPlugin()
                self.assertRaises(KeyError,
                                  acl3router.add_router_interface,
                                  context,
                                  fake_router_db['id'],
                                  interface_info)
项目:picoCTF    作者:picoCTF    | 项目源码 | 文件源码
def get_problem_instance(pid, tid):
    """
    Builds the problem dictionary that can be displayed to the user.

    The stored problem is deep-copied, given a 'solves' entry, stripped of
    its "instances" key and then merged with the team's instance data.

    Args:
        pid: the problem id
        tid: the team id

    Returns:
        The problem instance
    """

    displayed = deepcopy(get_problem(pid=pid, tid=tid))
    instance = get_instance_data(pid, tid)

    displayed['solves'] = api.stats.get_problem_solves(pid=pid)

    # Only this team's instance is merged in; the full list is dropped.
    del displayed["instances"]
    displayed.update(instance)
    return displayed
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def setType(props, prop_id, _type, typeCheck=True):
    '''
       Change the property type for the element in the props sequence with id of prop_id.

       This method always returns a copy of props and never modifies the props
       input. That includes the case where the element already has the
       requested type: a deep copy is returned then as well, so the documented
       contract holds on every path.

       props     - sequence of properties to work on
       prop_id   - id of the property whose type is changed
       _type     - name of the new type; must be a key of properties.getTypeMap()
       typeCheck - when true, props is validated first

       Raises BadValue if _type is not a known type.
    '''
    if typeCheck:
        __typeCheck(props)
    # "in" works on both Python 2 and 3, unlike dict.has_key().
    if _type not in properties.getTypeMap():
        raise BadValue('Type "'+_type+'" does not exist')
    if _type == getType(props, prop_id, False):
        # Nothing to change, but still hand back an independent copy.
        return copy.deepcopy(props)
    prop_idx = __getPropIdx(props, prop_id)
    ret_props = copy.deepcopy(props)
    source_value = props[prop_idx].value
    if source_value._t._k == CORBA.tk_sequence:
        # For sequences only the element type code (middle slot of the
        # typecode descriptor) is replaced.
        descriptor = source_value._t._d
        ret_props[prop_idx].value._t._d = (descriptor[0], tvCode(_type), descriptor[2])
    else:
        ret_props[prop_idx].value = properties.to_tc_value(source_value,_type)
    return ret_props
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def sendChangedPropertiesEvent(self):
        """Send one properties event covering every event-enabled property that changed.

        A property is included when it has never been reported before, or when
        its definition's ``compareValues`` is truthy for the last reported and
        the current value. The current value is remembered (as a deep copy)
        for the next call.
        """
        props = self._component._props
        log = self._component._log
        eventable_ids = []
        for prop_id in props.keys():
            prop_def = props.getPropDef(prop_id)
            if not prop_def.isSendEventChange():
                continue
            newValue = props[prop_id]
            try:
                oldValue = self._last_property_event_state[prop_id]
                self._last_property_event_state[prop_id] = copy.deepcopy(newValue)
            except KeyError:
                # No previous state recorded: always report the property.
                log.debug("Issuing event for the first time %s", prop_id)
                self._last_property_event_state[prop_id] = copy.deepcopy(newValue)
                eventable_ids.append(prop_id)
                continue
            if prop_def.compareValues(oldValue, newValue):
                log.debug("Issuing event for %s (%s != %s)", prop_id, oldValue, newValue)
                eventable_ids.append(prop_id)
        log.debug("Eventing for properties %s", eventable_ids)
        self.sendPropertiesEvent(eventable_ids)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def load_previous(self, path=None):
        """Restore the previously saved config from a JSON file on disk.

        This is normally invoked automatically during object
        initialization, so calling it directly is rarely needed.

        :param path:

            Location of the JSON file to read. When falsy, the current
            ``self.path`` is used. When given, it is also stored on
            ``self.path`` so that later ``save()`` calls target the same
            file.

        """
        self.path = path if path else self.path
        with open(self.path) as stream:
            self._prev_dict = json.load(stream)
        # Work on a deep copy so values stored on ``self`` never alias the
        # objects kept in ``self._prev_dict``.
        restored = copy.deepcopy(self._prev_dict)
        for key, value in restored.items():
            if key in self:
                # Keys already present on ``self`` are left untouched.
                continue
            self[key] = value
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def load_previous(self, path=None):
        """Restore the previously saved config from a JSON file on disk.

        This is normally invoked automatically during object
        initialization, so calling it directly is rarely needed.

        :param path:

            Location of the JSON file to read. When falsy, the current
            ``self.path`` is used. When given, it is also stored on
            ``self.path`` so that later ``save()`` calls target the same
            file.

        """
        self.path = path if path else self.path
        with open(self.path) as stream:
            self._prev_dict = json.load(stream)
        # Work on a deep copy so values stored on ``self`` never alias the
        # objects kept in ``self._prev_dict``.
        restored = copy.deepcopy(self._prev_dict)
        for key, value in restored.items():
            if key in self:
                # Keys already present on ``self`` are left untouched.
                continue
            self[key] = value
项目:yolo_tensorflow    作者:hizhangp    | 项目源码 | 文件源码
def prepare(self):
        """Load the ground-truth labels, optionally add flipped copies, and shuffle.

        When ``self.flipped`` is true, every loaded entry is deep-copied, marked
        with ``'flipped': True`` and its ``'label'`` array is reversed along the
        second axis. For each of the ``cell_size`` x ``cell_size`` cells whose
        channel 0 equals 1, channel 1 is replaced by
        ``image_size - 1 - value``.
        NOTE(review): channel 0 looks like an "object present" flag and channel 1
        like an x coordinate in pixels - confirm against load_labels().

        The combined list is shuffled in place, stored on ``self.gt_labels`` and
        returned.
        """
        gt_labels = self.load_labels()
        if self.flipped:
            print('Appending horizontally-flipped training examples ...')
            gt_labels_cp = copy.deepcopy(gt_labels)
            for entry in gt_labels_cp:
                entry['flipped'] = True
                label = entry['label'][:, ::-1, :]
                entry['label'] = label
                # range() instead of xrange(): works on Python 2 and 3 alike.
                for i in range(self.cell_size):
                    for j in range(self.cell_size):
                        if label[i, j, 0] == 1:
                            label[i, j, 1] = self.image_size - 1 - label[i, j, 1]
            gt_labels += gt_labels_cp
        np.random.shuffle(gt_labels)
        self.gt_labels = gt_labels
        return gt_labels
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def clustering_plot_func(chart, sample_properties, sample_data, plot_func, args=[], kwargs={}):
    """Build one chart per clustering by calling ``plot_func`` repeatedly.

    Args:
        chart: chart template; a deep copy is handed to every ``plot_func`` call.
        sample_properties: mapping with a 'genomes' entry.
        sample_data: object exposing ``analysis`` and ``original_cluster_sizes``.
        plot_func: callable invoked as ``plot_func(chart_copy, *args, **kwargs)``
            with 'clustering', 'original_cluster_sizes' and 'diff_expr' added
            to the keyword arguments.
        args: extra positional arguments for ``plot_func``.
        kwargs: extra keyword arguments for ``plot_func``. This dict is never
            modified (the signature keeps its mutable defaults for
            compatibility; they are only read).

    Returns:
        None when more than one genome is present or when there is no
        analysis; otherwise the list of charts ``plot_func`` produced,
        excluding any None results.
    """
    if len(sample_properties['genomes']) > 1:
        return None

    analysis = sample_data.analysis
    if analysis is None:
        return None

    # Work on a private copy: writing into ``kwargs`` directly would leak the
    # per-clustering entries into the shared default dict and into the
    # caller's dict.
    call_kwargs = dict(kwargs)

    new_charts = []
    for clustering_key, clustering in analysis.clusterings.iteritems():
        call_kwargs['clustering'] = clustering
        call_kwargs['original_cluster_sizes'] = sample_data.original_cluster_sizes[clustering_key]
        call_kwargs['diff_expr'] = analysis.differential_expression[clustering_key]
        new_chart = plot_func(copy.deepcopy(chart), *args, **call_kwargs)
        if new_chart is not None:
            new_chart['filters'] = {ws_gex_constants.CLUSTERS_FILTER_TITLE: clustering.description}
            new_charts.append(new_chart)
    return new_charts
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def build_charts(sample_properties, chart_dicts, sample_data, module=None):
    """Turn chart specifications into chart objects.

    Each entry of ``chart_dicts`` names a builder under its 'function' key
    and may carry extra builder arguments under 'kwargs'. The builder is
    looked up by name, first in ``module`` (a name -> callable mapping, when
    given) and then in this module's globals. It is called with a deep copy
    of the specification (minus those two keys).

    Returns a ``(charts, filters)`` tuple. Builders may return a single
    chart, a list of charts, or None (which is skipped).
    """
    namespaces = [module, globals()] if module else [globals()]

    filters = make_chart_filters(sample_properties, sample_data.analysis)

    charts = []
    for chart_dict in chart_dicts:
        spec = copy.deepcopy(chart_dict)
        function = spec.pop('function')
        # First namespace that knows the builder name wins.
        for namespace in namespaces:
            builder = namespace.get(function)
            if builder is not None:
                break
        builder_kwargs = spec.pop('kwargs', {})

        produced = builder(spec, sample_properties, sample_data, **builder_kwargs)
        if produced is None:
            continue

        if isinstance(produced, list):
            charts.extend(produced)
        else:
            charts.append(produced)

    return charts, filters
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def build_web_summary_json(sample_properties, sample_data, pipeline):
    """Assemble the web-summary view for a sample.

    The view starts as a deep copy of ``sample_properties``. The 'tables',
    'alarms', 'charts', 'filters' and 'info' entries are added, each only
    when the corresponding builder produced a truthy value.
    """
    view = copy.deepcopy(sample_properties)

    metrics, alarm_defs, chart_defs, all_prefixes = get_constants_for_pipeline(pipeline)

    tables, alarms = build_tables(sample_properties, metrics, alarm_defs, sample_data,
                                  all_prefixes=all_prefixes)
    charts, filters = build_charts(sample_properties, chart_defs,
                                   sample_data=sample_data)
    # Selected metrics that the web summary template needs
    info = build_info_dict(sample_properties, sample_data, pipeline)

    sections = (
        ('tables', tables),
        ('alarms', alarms),
        ('charts', charts),
        ('filters', filters),
        ('info', info),
    )
    for key, value in sections:
        if value:
            view[key] = value

    return view
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def collapse_recursive(self, buf, level):
        """Merge ``buf`` into buffer ``level`` and carry the result upwards.

        The merge result is stored (as a deep copy) in the first empty buffer
        above ``level``. Occupied levels are merged with it in turn. A new
        level is appended, and ``self.b`` incremented, when the buffers run
        out.
        """
        merged = self.collapse(self.buffer[level], buf)
        upper = level + 1

        if upper >= len(self.buffer):
            self.buffer.append([])
            self.b += 1

        if len(self.buffer[upper]) == 0:
            # Free slot found: park the merged data here and stop.
            self.buffer[upper] = copy.deepcopy(merged)
            return

        self.collapse_recursive(merged, upper)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _deepcopy_tuple(x, memo):
    # Deep-copy every element first; this may register x itself in memo
    # when x is reachable from one of its own elements.
    copies = [deepcopy(item, memo) for item in x]
    key = id(x)
    if key in memo:
        return memo[key]
    # If no element was actually copied, the original tuple is reused;
    # otherwise a new tuple is built from the copies.
    unchanged = all(x[i] is copies[i] for i in range(len(x)))
    result = x if unchanged else tuple(copies)
    memo[key] = result
    return result
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _deepcopy_inst(x, memo):
    # An instance that knows how to copy itself takes full control.
    if hasattr(x, '__deepcopy__'):
        return x.__deepcopy__(memo)

    # Create the new instance: through the constructor when the instance
    # supplies init args, otherwise as an empty shell re-classed to x's class.
    if hasattr(x, '__getinitargs__'):
        init_args = deepcopy(x.__getinitargs__(), memo)
        y = x.__class__(*init_args)
    else:
        y = _EmptyClass()
        y.__class__ = x.__class__
    # Register before copying the state so self-references resolve to y.
    memo[id(x)] = y

    raw_state = x.__getstate__() if hasattr(x, '__getstate__') else x.__dict__
    state = deepcopy(raw_state, memo)
    if hasattr(y, '__setstate__'):
        y.__setstate__(state)
    else:
        y.__dict__.update(state)
    return y
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __deepcopy__(self, memo):
        """Return a deep copy of a set; used by copy module."""
        # The empty result is registered in the memo before any element is
        # copied. A set cannot contain itself, but one of its elements may
        # hold a reference back to it; that reference must resolve to the
        # copy being built rather than trigger another copy.
        from copy import deepcopy
        clone = self.__class__()
        memo[id(self)] = clone
        clone._data.update((deepcopy(member, memo), True) for member in self)
        return clone

    # Standard set operations: union, intersection, both differences.
    # Each has an operator version (e.g. __or__, invoked with |) and a
    # method version (e.g. union).
    # Subtle:  Each pair requires distinct code so that the outcome is
    # correct when the type of other isn't suitable.  For example, if
    # we did "union = __or__" instead, then Set().union(3) would return
    # NotImplemented instead of raising TypeError (albeit that *why* it
    # raises TypeError as-is is also a bit subtle).
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def _update_entire_object(self):
        """Create or update the server-side object, wrapping the payload under ``self.info_key``.

        A new object is created when the primary key is among the dirty
        attributes or when no unique id is known yet; otherwise the existing
        object is updated. The response is passed to ``_refresh_if_needed``
        and that result is returned.
        """
        cls = self.__class__
        create_new = cls.primary_key in self._dirty_attributes.keys() or self._model_unique_id is None

        if not create_new:
            log.debug("Updating {0:s} with unique ID {1:s}".format(cls.__name__, str(self._model_unique_id)))
            ret = self._cb.api_json_request(cls._change_object_http_method,
                                            self._build_api_request_uri(), data={self.info_key: self._info})
            return self._refresh_if_needed(ret)

        # Send a copy so stripping the primary key leaves self._info intact.
        new_object_info = deepcopy(self._info)
        try:
            if not self._new_object_needs_primary_key:
                del(new_object_info[cls.primary_key])
        except Exception:
            # Best effort: any failure while stripping the key is ignored.
            pass
        log.debug("Creating a new {0:s} object".format(cls.__name__))
        ret = self._cb.api_json_request(cls._new_object_http_method, self.urlobject,
                                        data={self.info_key: new_object_info})
        return self._refresh_if_needed(ret)
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def _update_object(self):
        """Create or update the server-side object, sending the info dict as the payload.

        A new object is created when the primary key is among the dirty
        attributes or when no unique id is known yet; otherwise the existing
        object is updated. The response is passed to ``_refresh_if_needed``
        and that result is returned.
        """
        cls = self.__class__
        create_new = cls.primary_key in self._dirty_attributes.keys() or self._model_unique_id is None

        if not create_new:
            log.debug("Updating {0:s} with unique ID {1:s}".format(cls.__name__, str(self._model_unique_id)))
            http_method = cls._change_object_http_method
            request_uri = self._build_api_request_uri(http_method=http_method)
            ret = self._cb.api_json_request(http_method, request_uri, data=self._info)
            return self._refresh_if_needed(ret)

        # Send a copy so stripping the primary key leaves self._info intact.
        new_object_info = deepcopy(self._info)
        try:
            if not self._new_object_needs_primary_key:
                del(new_object_info[cls.primary_key])
        except Exception:
            # Best effort: any failure while stripping the key is ignored.
            pass
        log.debug("Creating a new {0:s} object".format(cls.__name__))
        ret = self._cb.api_json_request(cls._new_object_http_method, self.urlobject,
                                        data=new_object_info)
        return self._refresh_if_needed(ret)
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def nested_derivations(self, style):
        # type: (Style) -> List[Style]
        """Return copies of ``style`` with each supported nested option switched on.

        An option is skipped when the style definition does not know it, when
        the wanted value is not among its configurations, or when ``style``
        already has that value.
        """
        candidates = [('BreakBeforeBraces', 'Custom')]
        derived = []
        for optionname, value in candidates:
            optdef = styledef_option(self.styledefinition, optionname)
            # The nested option is only usable if the clang version in use supports it.
            if optdef is None or value not in option_configs(optdef):
                continue
            if style.get(optionname) == value:
                continue
            variant = Style(copy.deepcopy(style))
            set_option(variant, optionname, value)
            derived.append(variant)
        return derived
项目:genomedisco    作者:kundajelab    | 项目源码 | 文件源码
def shift_dataset(m,boundarynoise):
    """Circularly shift the non-empty part of a matrix by ``boundarynoise``.

    Rows of ``m`` that contain at least one non-zero entry are selected, and
    the same indices are used to select columns. The resulting sub-matrix is
    rolled by ``boundarynoise`` positions along both axes. Its upper triangle
    (diagonal included) is then written, mirrored, into a zero array shaped
    like ``m``.

    Returns ``m`` itself (not a copy) when ``boundarynoise`` is 0, otherwise a
    new array created with ``np.zeros``. ``m`` is never modified.
    NOTE(review): assumes m is a square 2-D numpy array - confirm with callers.
    """
    if boundarynoise==0:
        return m
    nonzero_rows=np.where(m.any(axis=1))[0]
    # Selecting with an index array already yields a fresh array, so no
    # explicit copy of m is needed first.
    small_m=m[nonzero_rows,:][:,nonzero_rows]
    # Debug output kept from the original. Written as calls so the code parses
    # on Python 3 while printing the same text on Python 2.
    print(small_m)
    print('roll')
    small_m=np.roll(small_m,boundarynoise,axis=0)
    print(small_m)
    print('roll2')
    small_m=np.roll(small_m,boundarynoise,axis=1)
    print(small_m)
    outm=np.zeros(m.shape)
    for i_idx, i in enumerate(nonzero_rows):
        # Only the upper triangle is read; the lower one is its mirror image.
        for j_idx in range(i_idx,len(nonzero_rows)):
            j=nonzero_rows[j_idx]
            outm[i,j]=small_m[i_idx,j_idx]
            outm[j,i]=outm[i,j]
    return outm
项目:foremast    作者:gogoair    | 项目源码 | 文件源码
def test_get_pipeline_id(mock_get_properties, mock_get_details, mock_boto3):
    """The pipeline id is taken from the boto3 paginator results."""
    listed_pipelines = [
        {"name": "Test Pipeline", "id": "1234"},
        {"name": "Other", "id": "5678"},
    ]
    # A single page of results, as yielded by the paginator.
    test_pipelines = [{'pipelineIdList': listed_pipelines, "hasMoreResults": False}]

    mock_get_details.return_value.data = {"project": "test"}
    mock_get_properties.return_value = copy.deepcopy(TEST_PROPERTIES)
    paginator = mock_boto3.return_value.get_paginator.return_value
    paginator.paginate.return_value = test_pipelines

    dp = AWSDataPipeline(app='test_app', env='test_env', region='us-east-1', prop_path='other')
    dp.get_pipeline_id()
    assert dp.pipeline_id == '1234'
项目:jx-sqlite    作者:mozilla    | 项目源码 | 文件源码
def _merge_mapping(a, b):
    """
    Merge mapping b into a (in place) and return a; a takes precedence.

    Entries that have properties but no type are first marked as "object".
    A name missing from a receives a deep copy of b's entry. For a name
    present in both, the type is resolved through _merge_type and, for
    struct types, the nested properties are merged recursively.
    """
    for name, b_details in b.items():
        key = literal_field(name)
        a_details = a[key]
        for details in (a_details, b_details):
            if details.properties and not details.type:
                details.type = "object"

        if not a_details:
            a[key] = deepcopy(b_details)
            continue

        a_details.type = _merge_type[a_details.type][b_details.type]
        if b_details.type in ES_STRUCT:
            _merge_mapping(a_details.properties, b_details.properties)

    return a
项目:Google-FooBar    作者:FoxHub    | 项目源码 | 文件源码
def floyd(matrix):
    """
    Floyd's all-pairs shortest-path algorithm.

    Takes a square weight matrix and produces a matrix of shortest path
    lengths, where the shortest path from node M to node N is result[m][n].
    The input matrix is left untouched; the work happens on a deep copy.

    :return: An array of shortest-path distance calculations.
    """
    size = len(matrix)
    dist = deepcopy(matrix)
    for via in range(size):
        via_row = dist[via]
        for src in range(size):
            src_row = dist[src]
            for dst in range(size):
                # Relax src -> dst by routing through the intermediate node.
                if src_row[via] + via_row[dst] < src_row[dst]:
                    src_row[dst] = src_row[via] + via_row[dst]
    return dist
项目:concierge    作者:9seconds    | 项目源码 | 文件源码
def configure_logging(debug=False, verbose=True, stderr=True):
    """Apply the logging configuration derived from ``LOG_CONFIG``.

    ``debug`` takes precedence over ``verbose`` for both the handler level
    ("DEBUG" vs "INFO") and the formatter of the "stderr" handler ("debug"
    vs "verbose"). With neither flag set, both are left as configured. With
    ``stderr`` set, the "stderr" handler is attached to the ``LOG_NAMESPACE``
    logger. The module-level ``LOG_CONFIG`` is not modified.
    """
    config = copy.deepcopy(LOG_CONFIG)

    if debug:
        level, formatter = "DEBUG", "debug"
    elif verbose:
        level, formatter = "INFO", "verbose"
    else:
        level, formatter = None, None

    for handler in config["handlers"].values():
        if level is not None:
            handler["level"] = level

    if formatter is not None:
        config["handlers"]["stderr"]["formatter"] = formatter

    if stderr:
        config["loggers"][LOG_NAMESPACE]["handlers"].append("stderr")

    logging.config.dictConfig(config)
项目:simple_rl    作者:david-abel    | 项目源码 | 文件源码
def agent_place(self, state):
        """Return a copy of ``state`` in which the agent has placed its block.

        The target cell is the agent's position plus its (dx, dy) direction.
        If moving forward would hit lava, the lava object at the target cell
        is removed. Otherwise a new block object is added at the target cell.
        The agent's "has_block" attribute is reset to 0 in either case.
        """
        next_state = copy.deepcopy(state)
        agent = next_state.get_first_obj_of_class("agent")
        agent.set_attribute("has_block", 0)

        target_x = agent.get_attribute("x") + agent.get_attribute("dx")
        target_y = agent.get_attribute("y") + agent.get_attribute("dy")

        if not self._is_lava_state_action(next_state, "forward"):
            placed = self._make_oomdp_objs_from_list_of_dict(
                [{"x": target_x, "y": target_y}], "block")
            next_state.get_objects_of_class("block").append(placed[0])
            return next_state

        # Count the lava objects preceding the one at the target cell.
        lava_index = 0
        for lava in next_state.get_objects_of_class("lava"):
            if target_x == lava.get_attribute("x") and target_y == lava.get_attribute("y"):
                break
            lava_index += 1
        next_state.get_objects_of_class("lava").pop(lava_index)

        return next_state
项目:simple_rl    作者:david-abel    | 项目源码 | 文件源码
def agent_pickup(self, state):
        '''
        Pick up every passenger located on the agent's cell, provided the
        agent is not already carrying one. The input state is left
        untouched; the change is made on a deep copy.

        Args:
            state (TaxiState)

        Returns:
            The copied (and possibly modified) state.
        '''
        next_state = copy.deepcopy(state)
        agent = next_state.get_first_obj_of_class("agent")

        if agent.get_attribute("has_passenger") != 0:
            # Already carrying a passenger: nothing changes.
            return next_state

        for passenger in next_state.get_objects_of_class("passenger"):
            same_cell = (agent.get_attribute("x") == passenger.get_attribute("x")
                         and agent.get_attribute("y") == passenger.get_attribute("y"))
            if same_cell:
                agent.set_attribute("has_passenger", 1)
                passenger.set_attribute("in_taxi", 1)

        return next_state
项目:simple_rl    作者:david-abel    | 项目源码 | 文件源码
def agent_dropoff(self, state):
        '''
        Drop off every passenger currently in the taxi, provided the agent is
        carrying one. The input state is left untouched; the change is made
        on a deep copy.

        Args:
            state (TaxiState)

        Returns:
            (TaxiState)
        '''
        next_state = copy.deepcopy(state)

        agent = next_state.get_first_obj_of_class("agent")
        passengers = next_state.get_objects_of_class("passenger")

        if agent.get_attribute("has_passenger") != 1:
            # Nobody on board: nothing changes.
            return next_state

        for passenger in passengers:
            if passenger.get_attribute("in_taxi") != 1:
                continue
            passenger.set_attribute("in_taxi", 0)
            agent.set_attribute("has_passenger", 0)

        return next_state
项目:deep_architect    作者:negrinho    | 项目源码 | 文件源码
def sample_models(self, nsamples):
        """Draw ``nsamples`` random models from the search space.

        Each sample starts as a deep copy of ``self.b_search`` and is
        initialized with ``self.in_d`` and a fresh ``Scope``. It is then
        specified step by step by picking a uniformly random index among the
        offered choices.

        Returns a ``(samples, choice_hists)`` tuple. ``choice_hists[i]`` lists
        the indices chosen while specifying ``samples[i]``.
        """
        template = self.b_search

        samples = []
        choice_hists = []
        for _ in xrange(nsamples):
            model = copy.deepcopy(template)
            model.initialize(self.in_d, Scope())
            picks = []
            while not model.is_specified():
                name, vals = model.get_choices()
                # A choice point must offer at least two options.
                assert len(vals) > 1
                pick = np.random.randint(0, len(vals))
                model.choose(pick)
                picks.append(pick)

            # The model is fully specified at this point; keep it.
            samples.append(model)
            choice_hists.append(picks)

        return (samples, choice_hists)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def resource_map():
    '''
    Dynamically generate a map of resources that will be managed for a single
    hook execution.

    The result is a deep copy of BASE_RESOURCE_MAP, so changes made to the
    returned map do not reach BASE_RESOURCE_MAP itself.
    '''
    return deepcopy(BASE_RESOURCE_MAP)
项目:RandTerrainPy    作者:jackromo    | 项目源码 | 文件源码
def points(self):
    """List[tuple(int, int)]: List of all points to define regions around."""
    # The stored list is mutable, so hand out an independent deep copy
    # instead of the internal object; callers cannot alter our state.
    snapshot = copy.deepcopy(self._points)
    return snapshot
项目:alfred-mpd    作者:deanishe    | 项目源码 | 文件源码
def _load(self):
    """Load cached settings from JSON file `self._filepath`.

    The file's top-level JSON object is copied into this mapping via
    `self.update()`, and an independent deep copy of the loaded data is
    kept in `self._original`.

    `self._nosave` is set to True while loading and is always reset to
    False afterwards, including when reading or parsing fails.

    Raises:
        IOError/OSError: if the file cannot be opened.
        ValueError: if the file does not contain valid JSON.
    """
    self._nosave = True
    try:
        with open(self._filepath, 'rb') as file_obj:
            # No `encoding` keyword: it was removed from json.load() in
            # Python 3.9 and UTF-8 is already what the parser assumes.
            loaded = json.load(file_obj)
        d = dict(loaded.items())
        self.update(d)
        self._original = deepcopy(d)
    finally:
        # Never leave the flag stuck on True after a failed load.
        self._nosave = False
项目:python-driver    作者:bblfsh    | 项目源码 | 文件源码
def _pop_token(self, lineno: int, token_value: str) -> Token:
    """Find the token on line `lineno` that corresponds to `token_value`.

    Tokens of the line (`self._lines[lineno - 1]`) are examined in order:

    * A non-STRING token matches when its value, as a string, equals
      `token_value`.
    * A STRING token whose text starts with ``f"`` or ``f'`` is searched
      for `token_value` as a substring. On a hit, a deep copy of the
      token with its start column shifted to the hit is returned and the
      token stays in the line. On a miss, the search stops with an error.
    * Any other STRING token is compared by its `literal_eval`-ed value.

    A token matched by equality is removed from the line before being
    returned.

    Raises:
        TokenNotFoundException: if no token matches, or if an f-string
            token is reached that does not contain `token_value`.
    """
    line_tokens = self._lines[lineno - 1]

    for tok in line_tokens:
        if tok.name == 'STRING':
            raw = tok.value
            if raw[0] == 'f' and raw[1] in ('"', "'"):
                # f-strings are tokenized as one STRING but parsed into
                # several AST nodes, so look for the value inside it.
                offset = raw.find(token_value)
                if offset == -1:
                    raise TokenNotFoundException(
                        "Could not find token '{}' inside f-string '{}'"
                        .format(token_value, raw))

                # Keep the f-string token in the line: other nodes may
                # still match different parts of it.
                located = deepcopy(tok)
                located.start.col = tok.start.col + offset
                return located

            # Plain string literal: the token text includes its quotes,
            # so evaluate it to get the actual value.
            candidate = literal_eval(raw)
        else:
            candidate = tok.value

        if str(candidate) == str(token_value):
            line_tokens.remove(tok)
            return tok

    raise TokenNotFoundException(
        "Token named '{}' not found in line {}".format(token_value, lineno))
项目:robot-arena    作者:kenganong    | 项目源码 | 文件源码
def __deepcopy__(self, memo):
  """Deep-copy this object, leaving out selected `content` entries.

  `floor` is always deep-copied. `content` is either None (copied as
  None) or a mapping, in which case every entry is deep-copied except the
  ones stored under the keys 'move' and MEMORY.

  Args:
    memo: the memo dictionary supplied by `copy.deepcopy`.

  Returns:
    A new instance of the same class, created without calling `__init__`.
  """
  cls = self.__class__
  result = cls.__new__(cls)
  # Register the copy before recursing so reference cycles resolve to it.
  memo[id(self)] = result
  result.floor = deepcopy(self.floor, memo)
  if self.content is None:
    result.content = None
  else:
    result.content = dict()
    for k, v in self.content.items():
      if k not in ('move', MEMORY):
        result.content[k] = deepcopy(v, memo)
  return result
项目:robot-arena    作者:kenganong    | 项目源码 | 文件源码
def __deepcopy__(self, memo):
  """Deep-copy every instance attribute except `ai`.

  Args:
    memo: the memo dictionary supplied by `copy.deepcopy`.

  Returns:
    A new instance of the same class, created without calling `__init__`
    and carrying no `ai` attribute.
  """
  clone = self.__class__.__new__(self.__class__)
  # Register the clone first so self-references map onto it.
  memo[id(self)] = clone
  for attr_name, attr_value in self.__dict__.items():
    if attr_name == 'ai':
      # Don't copy the ai
      continue
    setattr(clone, attr_name, deepcopy(attr_value, memo))
  return clone
项目:robot-arena    作者:kenganong    | 项目源码 | 文件源码
def log_state(state):
  """Show and/or record `state`, depending on the `config` flags.

  With `config.interactive` set, the state is printed and execution waits
  on `input()`. With `config.save_replay` set, a deep copy of the state is
  appended to `replay['states']`.
  """
  if config.interactive:
    print(state)
    input()
  if config.save_replay:
    # Store a snapshot, not the live object, so later changes to the
    # state do not alter what was recorded.
    snapshot = copy.deepcopy(state)
    replay['states'].append(snapshot)
项目:pylspm    作者:lseman    | 项目源码 | 文件源码
def __init__(self, data_, n_clusters):
    """Set up a random cluster assignment and zeroed bookkeeping lists.

    Args:
        data_: sized collection; one cluster index is drawn per
            `len(data_)` entry.
        n_clusters: number of clusters; each index is drawn with
            `random.randrange(n_clusters)`.

    Attributes set:
        position: the random cluster index of every entry.
        velocity, S: lists of zeros, as long as `position`.
        best: independent copy of the initial `position`.
        bestfit: initialised to 0.
    """
    # range(len(...)) is intentional: only the length of data_ is relied
    # upon, since iterating it directly may not yield len(data_) items.
    self.position = [random.randrange(n_clusters)
                     for _ in range(len(data_))]
    print(self.position)

    size = len(self.position)
    self.velocity = [0] * size
    self.S = [0] * size
    self.best = deepcopy(self.position)
    self.bestfit = 0
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def test_blacklist_bad_files(self):
    """
    Blacklist testing of bad files

    Every subject line naming a file with one of the extensions below
    must be reported as blacklisted by the filter.
    """
    bad_extensions = (
        'exe', 'pif', 'application', 'gadget', 'msi', 'msp', 'com',
        'scr', 'hta', 'cpl', 'msc', 'jar', 'bat', 'vb', 'vbs',
        # Encrypted VBE Script file
        'vbe',
        # Javascript (Windows can execute these outside of browsers)
        # so treat it as bad
        'js', 'jse',
        # Windows Script File
        'ws', 'wsf',
        # Windows PowerShell Scripts
        'ps1', 'ps1xml', 'ps2', 'ps2xml', 'psc1', 'psc2',
        # Monad Scripts (later renamed to Powershell)
        'msh', 'msh1', 'msh1xml', 'msh2', 'msh2xml',
        # Windows Explorer Command file
        'scf',
        # A link to a program on your computer (usually
        # populated with some malicious content)
        'lnk',
        # A text file used by AutoRun
        'inf',
        # A windows registry file
        'reg',
    )

    sf = NNTPSimpleFilter()
    entry = copy(self.template_entry)

    # hash table always starts empty and is populated on demand
    assert len(sf._regex_hash) == 0

    subject_template = 'What.A.Great.Show (1/1) ' + \
        '"what.a.great.show.%s" Yenc (1/1)'

    # Test against bad file extensions:
    for extension in bad_extensions:
        entry['subject'] = subject_template % extension
        assert sf.blacklist(**entry) == True
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def test_scoring_video_files(self):
    """
    Test that we correctly score video files

    Each video extension must score the expected value, both on its own
    and when followed by a three-digit numeric suffix (.000 - .999).
    """
    video_extensions = (
        'avi', 'mpeg', 'mpg', 'mp4', 'mov', 'mkv', 'asf',
        'ogg', 'iso', 'rm',
    )

    # Expected Score
    expected = 25

    sf = NNTPSimpleFilter()
    entry = copy(self.template_entry)

    prefix = 'What.A.Great.Show (1/1) '

    # Test against video files:
    for extension in video_extensions:
        entry['subject'] = prefix + \
            '"what.a.great.show.%s" Yenc (1/1)' % extension
        assert sf.score(**entry) == expected

        # now test that we can support .??? extensions after
        # the initial one
        for part in range(1000):
            entry['subject'] = prefix + \
                '"what.a.great.show.%s.%.3d" Yenc (1/1)' % (extension, part)
            assert sf.score(**entry) == expected
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def test_scoring_compressed_files(self):
        """
        Test that we correctly score compressed files
        """

        sf = NNTPSimpleFilter()
        # Work on a shallow copy; only the top-level 'subject' key is
        # reassigned below.
        entry = copy(self.template_entry)

        # Expected Score
        score = 25

        # Test against compressed (archive) files:
        for e in [ 'rar', '7z', 'zip', 'tgz', 'tar.gz']:

            entry['subject'] = 'What.A.Great.Archive (1/1) ' +\
                    '"what.a.great.archive.%s" Yenc (1/1)' % e

            assert sf.score(**entry) == score

            # now test that we can support .??? extensions after
            # the initial one
            for i in range(1000):
                entry['subject'] = 'What.A.Great.Archive (1/1) ' +\
                        '"what.a.great.archive.%s.%.3d" Yenc (1/1)' % (e, i)
                assert sf.score(**entry) == score

        # Test Sub Rar and Zip files (R?? and Z??)
        for e in [ 'r', 'z' ]:
            # Two-digit volume suffix: r00 - r99 / z00 - z99
            for i in range(100):
                entry['subject'] = 'What.A.Great.Archive (1/1) ' +\
                    '"what.a.great.archive.%s%.2d" Yenc (1/1)' % (e, i)

                assert sf.score(**entry) == score
                # ...optionally followed by a three-digit .??? suffix
                for ii in range(1000):
                    entry['subject'] = 'What.A.Great.Archive (1/1) ' +\
                        '"what.a.great.archive.%s%.2d.%.3d" Yenc (1/1)' % (
                            e, i, ii)
                    assert sf.score(**entry) == score