Python dill module: dumps() example source code

The following code examples, extracted from open-source Python projects, illustrate how dill.dumps() is used in practice.
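As a quick orientation before the project excerpts: dill extends the standard pickle module to objects that pickle rejects, such as lambdas, closures, and interactively defined functions. A minimal round trip, as a sketch (not taken from any project below):

import dill

square = lambda x: x * x             # stdlib pickle cannot serialize lambdas
payload = dill.dumps(square)         # serialize to bytes
assert dill.loads(payload)(4) == 16  # deserialize and call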

Project: osbrain    Author: opensistemas-hub    | project source | file source
def safe_call(self, method, *args, **kwargs):
        """
        A safe call to a method.

        A safe call is simply sent to be executed by the main thread.

        Parameters
        ----------
        method : str
            Method name to be executed by the main thread.
        *args : arguments
            Method arguments.
        *kwargs : keyword arguments
            Method keyword arguments.
        """
        if not self.running:
            raise RuntimeError(
                'Agent must be running to safely execute methods!')
        data = dill.dumps((method, args, kwargs))
        return self._loopback_reqrep('inproc://_loopback_safe', data)
Project: shelfdb    Author: nitipit    | project source | file source
async def handler(reader, writer):
    queries = await reader.read(-1)
    try:
        queries = dill.loads(queries)
        shelf = queries.pop(0)
        result = QueryHandler(db, shelf, queries).run()
        result = dill.dumps(result)
    except:
        print("Unexpected error:", sys.exc_info()[1])
        result = dill.dumps(sys.exc_info()[1])
        writer.write(result)
        await writer.drain()
        writer.close()
        raise
    writer.write(result)
    await writer.drain()
    writer.close()
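For context, the handler above expects a dill-pickled list whose first element is the shelf name, and it reads until EOF. A hypothetical client might look like this sketch (send_queries, host, and port are illustrative names, not part of the shelfdb source):

import asyncio
import dill

async def send_queries(host, port, shelf, queries):
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(dill.dumps([shelf] + queries))
    await writer.drain()
    writer.write_eof()                          # the handler reads until EOF
    result = dill.loads(await reader.read(-1))
    writer.close()
    return result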
Project: dataset    Author: analysiscenter    | project source | file source
def deepcopy(self):
        """ Return a deep copy of the batch.

        Constructs a new ``Batch`` instance and then recursively copies all
        the objects found in the original batch, except the ``pipeline``,
        which remains unchanged.

        Returns
        -------
        Batch
        """
        pipeline = self.pipeline
        self.pipeline = None
        dump_batch = dill.dumps(self)
        self.pipeline = pipeline

        restored_batch = dill.loads(dump_batch)
        restored_batch.pipeline = pipeline
        return restored_batch
Project: sudkamp-langs-machines-python    Author: thundergolfer    | project source | file source
def test_pickle(self):
        import sys
        if sys.version_info < (3, 4):
            import dill as pickle
        else:
            import pickle

        states = ['A', 'B', 'C', 'D']
        # Define with list of dictionaries
        transitions = [
            {'trigger': 'walk', 'source': 'A', 'dest': 'B'},
            {'trigger': 'run', 'source': 'B', 'dest': 'C'},
            {'trigger': 'sprint', 'source': 'C', 'dest': 'D'}
        ]
        m = Machine(states=states, transitions=transitions, initial='A')
        m.walk()
        dump = pickle.dumps(m)
        self.assertIsNotNone(dump)
        m2 = pickle.loads(dump)
        self.assertEqual(m.state, m2.state)
        m2.run()
Project: covertutils    Author: operatorequals    | project source | file source
def __form_stage_from_function( init, work ) :
    ret = {}
    dict_ = {'init' : init, 'work' : work}
    try:                    # Python 3
        code = {'init' : init.__code__, 'work' : work.__code__}
    except AttributeError:  # Python 2
        code = {'init' : init.func_code, 'work' : work.func_code}
    ret['object'] = dict_
    ret['python'] = code
    try :
        marshaled = marshal.dumps(code)
    except ValueError:
        marshaled = None

    try :
        import dill
        dilled = dill.dumps(code)
    except ImportError:
        dilled = None
    ret['dill'] = dilled
    ret['marshal'] = marshaled

    return ret
Project: kq    Author: joowani    | project source | file source
def test_enqueue_call(producer, logger):
    producer_cls, producer_inst = producer

    queue = Queue(hosts='host:7000', topic='foo', timeout=300)
    job = queue.enqueue(success_func, 1, 2, c=[3, 4, 5])

    assert isinstance(job, Job)
    assert isinstance(job.id, str)
    assert isinstance(job.timestamp, int)
    assert job.topic == 'foo'
    assert job.func == success_func
    assert job.args == (1, 2)
    assert job.kwargs == {'c': [3, 4, 5]}
    assert job.timeout == 300

    producer_inst.send.assert_called_with('foo', dill.dumps(job), key=None)
    logger.info.assert_called_once_with('Enqueued: {}'.format(job))
Project: kq    Author: joowani    | project source | file source
def test_enqueue_call_with_key(producer, logger):
    producer_cls, producer_inst = producer

    queue = Queue(hosts='host:7000', topic='foo', timeout=300)
    job = queue.enqueue_with_key('bar', success_func, 1, 2, c=[3, 4, 5])

    assert isinstance(job, Job)
    assert isinstance(job.id, str)
    assert isinstance(job.timestamp, int)
    assert job.topic == 'foo'
    assert job.func == success_func
    assert job.args == (1, 2)
    assert job.kwargs == {'c': [3, 4, 5]}
    assert job.timeout == 300
    assert job.key == 'bar'

    producer_inst.send.assert_called_with('foo', dill.dumps(job), key='bar')
    logger.info.assert_called_once_with('Enqueued: {}'.format(job))
Project: lambdify    Author: ZhukovAlexander    | project source | file source
def invoke(self, event, context, inv_type=None, log_type='None', version=None):
        """Invoke the lambda function This is basically a low-level lambda interface.
        In most cases, you won't need to use this by yourself.

        :param event: lambda input
        :param context: lambda execution client context
        :param inv_type: invocation type
        :param log_type: log type
        :param version: version
        """
        if not self._was_updated and self.create_options & UPDATE_LAZY == UPDATE_LAZY:
            self._create_or_update()

        params = dict(
                FunctionName=self.name,
                InvocationType=inv_type or self._inv_type,
                LogType=log_type,
                ClientContext=json.dumps(context),
                Payload=json.dumps(event),
        )
        if version:
            params['Qualifier'] = version

        return self.client.invoke(**params)
Project: artemis    Author: QUVA-Lab    | project source | file source
def pickle_dumps_without_main_refs(obj):
    """
    Yeah this is horrible, but it allows you to pickle an object in the main module so that it can be reloaded in another
    module.
    :param obj:
    :return:
    """
    currently_run_file = sys.argv[0]
    module_path = file_path_to_absolute_module(currently_run_file)
    try:
        pickle_str = pickle.dumps(obj, protocol=0)
    except:
        print("Using Dill")
        # TODO: @petered There is something very fishy going on here that I don't understand.
        import dill
        pickle_str = dill.dumps(obj, protocol=0)

    pickle_str = pickle_str.replace(b'__main__', module_path.encode())  # Hack! (bytes-safe for Python 3)
    return pickle_str
Project: kaggle    Author: RankingAI    | project source | file source
def SaveToPklFile(Data, OutputDir):

        df_train, df_test = Data

        if not os.path.exists(OutputDir):
            os.makedirs(OutputDir)

        with open('%s/train.pkl' % OutputDir, 'wb') as o_file:
            pickle.dump(df_train, o_file, -1)

        # df_test can be too big for a single pickle.dump() call on some
        # platforms, so serialize it to bytes and write it out in chunks.
        max_bytes = 2 ** 31 - 1
        bytes_out = pickle.dumps(df_test)
        n_bytes = len(bytes_out)
        with open('%s/test.pkl' % OutputDir, 'wb') as o_file:
            for idx in range(0, n_bytes, max_bytes):
                o_file.write(bytes_out[idx:idx + max_bytes])
Project: onefl-deduper    Author: ufbmi    | project source | file source
def apply_async(pool, fun, args, run_dill_encoded=run_dill_encoded):
    return pool.apply_async(run_dill_encoded, (dill.dumps((fun, args)),))
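The run_dill_encoded helper used here (and in the chxanalys and Flow-Guided-Feature-Aggregation excerpts below, the latter as run_dill_encode) is not shown in these excerpts. It is conventionally a module-level function along the lines of this sketch:

def run_dill_encoded(payload):
    # A top-level function is picklable by name, so multiprocessing can ship
    # it to the worker; the dill payload carries the real (unpicklable) callable.
    fun, args = dill.loads(payload)
    return fun(*args)

This pattern works around multiprocessing.Pool accepting only arguments that the standard pickle module can serialize.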
Project: catalearn    Author: Catalearn    | project source | file source
def save_var_cloud(data_var, data_name):
    if not isinstance(data_name, str):
        print("data_name must be a string")
        return

    user_hash = settings.API_KEY
    data_buffer = io.BytesIO(dill.dumps(data_var))
    print('Uploading %s...' % data_name)

    url = 'http://%s/api/save/getUploadUrl' % settings.CATALEARN_URL
    r = requests.post(url, data={
        'type': 'variable',
        'user_hash': user_hash,
        'file_name': data_name
    })
    if r.status_code != 200:
        raise RuntimeError(r.text)

    presigned_url = r.content

    r = requests.put(presigned_url, data=data_buffer)

    if r.status_code != 200:
        print("Error saving %s: %s" % (data_name, r.content))
    else:
        print("Successfully uploaded %s" % data_name)
    return
Project: hgvm-builder    Author: BD2KGenomics    | project source | file source
def set_executor(self, executor):
        """
        Set the given function to run in the promise. It will call its first
        argument with its result, or its second argument with an error.
        """

        # Pickle the function and save it
        self.executor_dill = dill.dumps(executor)
Project: hgvm-builder    Author: BD2KGenomics    | project source | file source
def set_then_handler(self, then_handler):
        """
        Set the then handler for this promise. When the prev promise resolves,
        the then handler will be called with the result.

        """

        # Pickle the function and save it
        self.then_dill = dill.dumps(then_handler)
Project: osbrain    Author: opensistemas-hub    | project source | file source
def _loopback(self, header, data=None):
        """
        Send a message to the loopback socket.
        """
        if not self.running:
            raise NotImplementedError()
        data = dill.dumps((header, data))
        return self._loopback_reqrep('inproc://loopback', data)
Project: gax-python    Author: googleapis    | project source | file source
def add_done_callback(self, fn):  # pylint: disable=invalid-name
        """Enters a polling loop on OperationsClient.get_operation, and once the
        operation is done or cancelled, calls the function with this
        _OperationFuture. Added callables are called in the order that they were
        added.
        """
        if self._operation.done:
            _try_callback(self, fn)
        else:
            self._queue.put(dill.dumps(fn))
            if self._process is None:
                self._process = mp.Process(target=self._execute_tasks)
                self._process.start()
Project: hakkuframework    Author: 4shadoww    | project source | file source
def export_object(obj):
    import dill as pickle
    import base64
    return base64.b64encode(gzip.zlib.compress(pickle.dumps(obj,4),9)).decode('utf-8')
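For round-tripping, a matching import_object would reverse each step: base64-decode, zlib-decompress, then unpickle with dill. A sketch (the name import_object is an assumption, not from the hakkuframework source):

def import_object(encoded):
    import dill as pickle
    import base64
    # gzip.zlib mirrors the excerpt above; importing zlib directly works too
    return pickle.loads(gzip.zlib.decompress(base64.b64decode(encoded)))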
Project: django-estimators    Author: fridiculous    | project source | file source
def persist(self):
        """a private method that persists an estimator object to the filesystem"""
        if self.object_hash:
            data = dill.dumps(self.object_property)
            f = ContentFile(data)
            self.object_file.save(self.object_hash, f, save=False)
            f.close()
            self._persisted = True
        return self._persisted
Project: pydecor    Author: mplanchard    | project source | file source
def hashable(item):
    """Get return a hashable version of an item

    If the item is natively hashable, return the item itself. If
    it is not, return it dumped to a pickle string.
    """
    try:
        hash(item)
    except TypeError:
        item = pickle.dumps(item)
    return item
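A typical use for this helper is building cache keys from arbitrary, possibly unhashable call arguments, e.g.:

key = (hashable(args), hashable(kwargs))  # a tuple holding a list, or a dict, falls back to its pickle bytes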
Project: Pandas-Farm    Author: medo    | project source | file source
def serialize(obj):
    return dill.dumps(obj)
Project: chxanalys    Author: yugangzhang    | project source | file source
def apply_async(pool, fun, args, callback=None):
    return pool.apply_async(run_dill_encoded, (dill.dumps((fun, args)),), callback=callback)
Project: chxanalys    Author: yugangzhang    | project source | file source
def map_async(pool, fun, args):
    return pool.map_async(run_dill_encoded, (dill.dumps((fun, args)),))
Project: incubator-airflow-old    Author: apache    | project source | file source
def encode(self, x):
        return json.dumps(x)
Project: dataset    Author: analysiscenter    | project source | file source
def mpc_some(item):
    print("some:",)
    dill.dumps(item)
    #print(type(item), item.images.ndim)



# Example of custom Batch class which defines some actions
Project: dataset    Author: analysiscenter    | project source | file source
def some(self, item=None):
        print("some:", type(item))
        print(item)
        print("len", len(dill.dumps(item.as_tuple())))
        return mpc_some
Project: dataset    Author: analysiscenter    | project source | file source
def _dump_blosc(self, ix, dst, components=None):
        """ Save blosc packed data to file """
        file_name = self._get_file_name(ix, dst, 'blosc')
        with open(file_name, 'w+b') as f:
            if self.components is None:
                components = (None,)
                item = (self[ix],)
            else:
                components = tuple(components or self.components)
                item = self[ix].as_tuple(components)
            data = dict(zip(components, item))
            f.write(blosc.compress(dill.dumps(data)))
Project: trex-http-proxy    Author: alwye    | project source | file source
def export_object(obj):
    import dill as pickle
    import base64
    return base64.b64encode(gzip.zlib.compress(pickle.dumps(obj,4),9)).decode('utf-8')
Project: elm    Author: ContinuumIO    | project source | file source
def dumps(self, protocol=None, byref=None, fmode=None, recurse=None):
        '''Pickle (dill) an object to a bytes string.
        '''
        getattr(self, '_close', lambda: [])()
        return dill.dumps(self, protocol=protocol,
                          byref=byref, fmode=fmode, recurse=recurse)
Project: ottertune    Author: cmu-db    | project source | file source
def compress(workload_state):
        return zlib.compress(pickle.dumps(workload_state))
Project: sudkamp-langs-machines-python    Author: thundergolfer    | project source | file source
def test_pickle(self):
        import sys
        if sys.version_info < (3, 4):
            import dill as pickle
        else:
            import pickle

        # go to non initial state B
        self.stuff.to_B()
        # pickle Stuff model
        dump = pickle.dumps(self.stuff)
        self.assertIsNotNone(dump)
        stuff2 = pickle.loads(dump)
        self.assertTrue(stuff2.machine.is_state("B"))
        # check if machines of stuff and stuff2 are truly separated
        stuff2.to_A()
        self.stuff.to_C()
        self.assertTrue(stuff2.machine.is_state("A"))
        thread = Thread(target=stuff2.forward)
        thread.start()
        # give thread some time to start
        time.sleep(0.01)
        # both objects should be in different states
        # and also not share locks
        begin = time.time()
        # stuff should not be locked and execute fast
        self.assertTrue(self.stuff.machine.is_state("C"))
        fast = time.time()
        # stuff2 should be locked and take about 1 second
        # to be executed
        self.assertTrue(stuff2.machine.is_state("B"))
        blocked = time.time()
        self.assertAlmostEqual(fast-begin, 0, delta=0.1)
        self.assertAlmostEqual(blocked-begin, 1, delta=0.1)


# Same as TestLockedTransition but with LockedHierarchicalMachine
Project: sudkamp-langs-machines-python    Author: thundergolfer    | project source | file source
def test_pickle(self):
        import sys
        if sys.version_info < (3, 4):
            import dill as pickle
        else:
            import pickle

        states = ['A', 'B', {'name': 'C', 'children': ['1', '2', {'name': '3', 'children': ['a', 'b', 'c']}]},
          'D', 'E', 'F']
        transitions = [
            {'trigger': 'walk', 'source': 'A', 'dest': 'B'},
            {'trigger': 'run', 'source': 'B', 'dest': 'C'},
            {'trigger': 'sprint', 'source': 'C', 'dest': 'D'}
        ]
        m = self.stuff.machine_cls(states=states, transitions=transitions, initial='A')
        m.walk()
        dump = pickle.dumps(m)
        self.assertIsNotNone(dump)
        m2 = pickle.loads(dump)
        self.assertEqual(m.state, m2.state)
        m2.run()
        if State.separator in '_':
            m2.to_C_3_a()
            m2.to_C_3_b()
        else:
            m2.to_C.s3.a()
            m2.to_C.s3.b()
Project: kq    Author: joowani    | project source | file source
def test_enqueue_job(producer, logger):
    producer_cls, producer_inst = producer

    queue = Queue(hosts='host:7000', topic='foo', timeout=300)

    old_job = Job(
        id='2938401',
        timestamp=int(time.time()),
        topic='bar',
        func=failure_func,
        args=[1, 2],
        kwargs={'a': 3},
        timeout=100,
    )
    new_job = queue.enqueue(old_job)

    assert isinstance(new_job, Job)
    assert isinstance(new_job.id, str)
    assert isinstance(new_job.timestamp, int)
    assert old_job.id != new_job.id
    assert old_job.timestamp <= new_job.timestamp
    assert new_job.topic == 'foo'
    assert new_job.func == failure_func
    assert new_job.args == [1, 2]
    assert new_job.kwargs == {'a': 3}
    assert new_job.timeout == 300
    assert new_job.key is None

    producer_inst.send.assert_called_with(
        'foo', dill.dumps(new_job), key=None
    )
    logger.info.assert_called_once_with('Enqueued: {}'.format(new_job))
Project: kq    Author: joowani    | project source | file source
def test_enqueue_job_with_key(producer, logger):
    producer_cls, producer_inst = producer

    queue = Queue(hosts='host:7000', topic='foo', timeout=300)

    old_job = Job(
        id='2938401',
        timestamp=int(time.time()),
        topic='bar',
        func=failure_func,
        args=[1, 2],
        kwargs={'a': 3},
        timeout=100,
        key='bar',
    )
    new_job = queue.enqueue_with_key('baz', old_job)

    assert isinstance(new_job, Job)
    assert isinstance(new_job.id, str)
    assert isinstance(new_job.timestamp, int)
    assert old_job.id != new_job.id
    assert old_job.timestamp <= new_job.timestamp
    assert new_job.topic == 'foo'
    assert new_job.func == failure_func
    assert new_job.args == [1, 2]
    assert new_job.kwargs == {'a': 3}
    assert new_job.timeout == 300
    assert new_job.key == 'baz'

    producer_inst.send.assert_called_with(
        'foo', dill.dumps(new_job), key='baz'
    )
    logger.info.assert_called_once_with('Enqueued: {}'.format(new_job))
Project: FeatureHub    Author: HDI-Project    | project source | file source
def run_isolated(f, *args):
    """Execute `f(args)` in an isolated environment.

    First, uses dill to serialize the function. Unfortunately, pickle is unable
    to serialize some functions, so we must serialize and deserialize the
    function ourselves.
    """
    f_dill = dill.dumps(f)
    with Pool(1) as pool:
        return pool.apply(_get_function_and_execute, (f_dill, *args))
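The _get_function_and_execute helper referenced above is not part of the excerpt; presumably it just reverses the serialization inside the pool worker, roughly:

def _get_function_and_execute(f_dill, *args):
    f = dill.loads(f_dill)  # rebuild the function inside the worker process
    return f(*args)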
Project: pythonwhat    Author: datacamp    | project source | file source
def getStreamPickle(name, process, shell):
    try:
        return pickle.dumps(get_env(shell.user_ns)[name])
    except:
        return None
Project: pythonwhat    Author: datacamp    | project source | file source
def getStreamDill(name, process, shell):
    try:
        return dill.dumps(get_env(shell.user_ns)[name])
    except:
        return None
Project: pythonwhat    Author: datacamp    | project source | file source
def getRepresentation(name, process):
    obj_class = getClass(name, process)
    converters = pythonwhat.State.State.root_state.converters
    if obj_class in converters:
        repres = convert(name, dill.dumps(converters[obj_class]), process)
        if (errored(repres)):
            return ReprFail("manual conversion failed")
        else: 
            return repres
    else:
        # first try to pickle
        try:
            stream = getStreamPickle(name, process)
            if not errored(stream): return pickle.loads(stream)
        except: 
            pass

        # if it failed, try to dill
        try:
            stream = getStreamDill(name, process)
            if not errored(stream): return dill.loads(stream)
            return ReprFail("dilling inside process failed for %s - write manual converter" % obj_class)
        except PicklingError:
            return ReprFail("undilling of bytestream failed with PicklingError - write manual converter")
        except Exception as e:
            return ReprFail("undilling of bytestream failed for class %s - write manual converter."
                            "Error: %s - %s" % (obj_class, type(e), e))
Project: cess    Author: frnsys    | project source | file source
def dumps(x):
    """serialize python object(s)"""
    try:
        return dill.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
    except Exception as e:
        logger.info("Failed to serialize %s", x)
        logger.exception(e)
        raise
Project: cess    Author: frnsys    | project source | file source
def write(stream, msg):
    """write data to a stream"""
    msg = dumps(msg)
    yield stream.write(msg + sentinel)
Project: python-persistent-queue    Author: philipbl    | project source | file source
def setup_method(self):
        import dill

        random = str(uuid.uuid4()).replace('-', '')
        filename = '{}_{}'.format(self.__class__.__name__, random)
        self.queue = PersistentQueue(filename,
                                     loads=dill.loads,
                                     dumps=dill.dumps)
Project: python-persistent-queue    Author: philipbl    | project source | file source
def setup_method(self):
        import msgpack

        random = str(uuid.uuid4()).replace('-', '')
        filename = '{}_{}'.format(self.__class__.__name__, random)
        self.queue = PersistentQueue(filename,
                                     loads=msgpack.unpackb,
                                     dumps=msgpack.packb)
Project: kubeface    Author: hammerlab    | project source | file source
def check(obj):
    if not CHECK_SERIALIZATION:
        return
    try:
        dill.loads(dill.dumps(obj))
    except Exception as e:
        logging.error(
            "Couldn't serialize: %s\n'%s'\nBad objects:\n%s" % (
                str(obj), str(e), dill.detect.badobjects(obj, depth=2)))
        raise
Project: kubeface    Author: hammerlab    | project source | file source
def dumps(obj):
    check(obj)
    return dill.dumps(obj, protocol=PICKLE_PROTOCOL)
Project: plasma    Author: jnkh    | project source | file source
def get_unique_id(self):
        num_epochs = self.conf['training']['num_epochs']
        this_conf = deepcopy(self.conf)
        # don't make the hash depend on the number of epochs
        this_conf['training']['num_epochs'] = 0
        unique_id =  hash(dill.dumps(this_conf))
        return unique_id
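One caveat with the excerpt above: in Python 3, hash() over bytes is randomized per interpreter run unless PYTHONHASHSEED is fixed, so the resulting id is only stable within a single process. If a run-to-run stable id were needed, a content hash would be the usual substitute, e.g.:

import hashlib
unique_id = hashlib.md5(dill.dumps(this_conf)).hexdigest()  # stable across runs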
Project: Flow-Guided-Feature-Aggregation    Author: msracver    | project source | file source
def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encode, (payload,))
Project: pyabc    Author: neuralyzer    | project source | file source
def __init__(self, map=map, mapper_pickles=False):
        super().__init__()
        self.map = map
        self.pickle, self.unpickle = ((identity, identity)
                                      if mapper_pickles
                                      else (pickle.dumps, pickle.loads))
Project: mbot    Author: michaelkuty    | project source | file source
def save_state(self, state):
        """Save state"""

        with open(state.state_path, 'wb') as f:

            data = pickle.dumps(state, pickle.HIGHEST_PROTOCOL)

            if self.encrypt:
                data = self.encrypt_data(data)

            pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)
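Note that the state is pickled twice: once to bytes (optionally encrypted), then again into the file. A matching load_state would peel the two layers in reverse order; a sketch, assuming decrypt_data mirrors encrypt_data:

def load_state(self, state_path):
        """Load a previously saved state."""

        with open(state_path, 'rb') as f:
            data = pickle.load(f)           # outer layer: the stored bytes

        if self.encrypt:
            data = self.decrypt_data(data)  # assumed inverse of encrypt_data

        return pickle.loads(data)           # inner layer: the state object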
Project: mbot    Author: michaelkuty    | project source | file source
def save_state(self, state):
        """Save state"""

        data = pickle.dumps(state, pickle.HIGHEST_PROTOCOL)

        if self.encrypt:
            data = self.encrypt_data(data)

        return self.bucket.put_object(Key=state.state_path,
                                      Body=data)