Python jsonpickle 模块:dumps() 实例源码

我们从 Python 开源项目中提取了以下 25 个代码示例,用于说明如何使用 jsonpickle.dumps()。

项目:xavier    作者:bepress    | 项目源码 | 文件源码
def test_background_queue():
    """Check that a task runs when called directly and again when its event is processed."""
    publisher = MagicMock(return_value='aaa')
    queue = TaskQueue(publisher)

    @queue.task()
    def funcy():
        # Invocations are counted in module-level state so that both call
        # paths (direct and via the queue) can be observed.
        global funcy_called
        funcy_called += 1
        return "blah"

    # Calling the decorated function directly still returns its result.
    direct_result = funcy()
    assert direct_result == "blah"

    # Deferring the call must report success ...
    assert funcy.delay() is True

    # ... and publish exactly one event with the serialized call description.
    expected_event = jsonpickle.dumps((funcy.path, (), {}))
    publisher.assert_called_once_with(expected_event)

    # Feeding the event back into the queue is expected to run the task again.
    queue.process_event(expected_event)
    assert funcy_called == 2
项目:robograph    作者:csparpa    | 项目源码 | 文件源码
def to_dict(cls, node):
        """
        Build a plain dict describing a node.

        Callable parameter values are serialized with cloudpickle and the
        result is passed through jsonpickle; every other value is kept as is.
        :param node: datamodel.base.node.Node instance
        :return: dict with the keys 'class', 'name', 'params', 'output_label'
        """
        def _encode(value):
            # Only callables need special treatment.
            if callable(value):
                return jsonpickle.dumps(cloudpickle.dumps(value))
            return value

        encoded_params = {key: _encode(val) for key, val in node._params.items()}

        return {
            'class': str(node.__class__),
            'name': node._name,
            'params': encoded_params,
            'output_label': node._output_label,
        }
项目:django-herald    作者:worthwhile    | 项目源码 | 文件源码
def _get_encoded_attachments(self):
        """
        Return the notification's attachments serialized with jsonpickle.

        ``File`` instances are rewound and replaced by a
        ``(name, content, first element of guess_type(name))`` tuple; any
        other attachment is passed through unchanged. A falsy attachment
        collection is serialized as it is.
        """
        raw_attachments = self.get_attachments()

        if not raw_attachments:
            return jsonpickle.dumps(raw_attachments)

        encoded = []
        for item in raw_attachments:
            if not isinstance(item, File):
                encoded.append(item)
                continue

            # Rewind so that the whole content is read.
            item.seek(0)
            encoded.append((item.name, item.read(), guess_type(item.name)[0]))

        return jsonpickle.dumps(encoded)
项目:incubator-ariatosca    作者:apache    | 项目源码 | 文件源码
def _execute(self, ctx):
        """
        Start the operation for ``ctx`` in a separate Python process.

        The arguments are written to a temporary file whose path is handed to
        the child process; the started process is recorded in ``self._tasks``
        under the task id.
        """
        self._check_closed()

        # Arguments travel to the subprocess through a temporary file. Note
        # that the content is a pickle, despite the '.json' suffix.
        fd, args_path = tempfile.mkstemp(prefix='executor-', suffix='.json')
        os.close(fd)
        with open(args_path, 'wb') as args_file:
            serialized_args = pickle.dumps(self._create_arguments_dict(ctx))
            args_file.write(serialized_args)

        subprocess_env = self._construct_subprocess_env(task=ctx.task)

        def _expand(path):
            # Resolve environment variables first, then a leading '~'.
            return os.path.expanduser(os.path.expandvars(path))

        # This very module is run as the script of the child process.
        command = [sys.executable, _expand(__file__), _expand(args_path)]

        # Popen does not wait for completion, so the operation runs asynchronously.
        process = subprocess.Popen(command, env=subprocess_env)

        self._tasks[ctx.task.id] = _Task(ctx=ctx, proc=process)
项目:sanskrit_parser    作者:kmadathil    | 项目源码 | 文件源码
def site_map():
  """Return a JSON response listing every URL rule registered on ``app``.

  Each entry is a single formatted line holding the endpoint name, the
  comma-joined HTTP methods and the URL, in which every route argument is
  shown as ``[argument_name]``. The list is also written to the log.
  """
  # Imported once, up front, from the module that documents unquote
  # (urllib.request merely re-exports it).
  from urllib.parse import unquote

  output = []
  for rule in app.url_map.iter_rules():
    # Use a visible placeholder for every argument of the rule.
    options = {arg: "[{0}]".format(arg) for arg in rule.arguments}

    methods = ','.join(rule.methods)
    url = url_for(rule.endpoint, **options)

    # Decode percent-escapes so that the placeholders stay readable.
    line = unquote("{:50s} {:20s} {}".format(rule.endpoint, methods, url))
    output.append(line)

  logging.info(str(output))
  return app.response_class(
    response=jsonpickle.dumps(output),
    status=200,
    mimetype='application/json'
  )
项目:jyotisha    作者:sanskrit-coders    | 项目源码 | 文件源码
def site_map():
  """Return a JSON response listing every URL rule registered on ``app``.

  Each entry is a single formatted line holding the endpoint name, the
  comma-joined HTTP methods and the URL, in which every route argument is
  shown as ``[argument_name]``. The list is also written to the log.
  """
  # Imported once, up front, from the module that documents unquote
  # (urllib.request merely re-exports it).
  from urllib.parse import unquote

  output = []
  for rule in app.url_map.iter_rules():
    # Use a visible placeholder for every argument of the rule.
    options = {arg: "[{0}]".format(arg) for arg in rule.arguments}

    methods = ','.join(rule.methods)
    url = url_for(rule.endpoint, **options)

    # Decode percent-escapes so that the placeholders stay readable.
    line = unquote("{:50s} {:20s} {}".format(rule.endpoint, methods, url))
    output.append(line)

  logging.info(str(output))
  return app.response_class(
    response=jsonpickle.dumps(output),
    status=200,
    mimetype='application/json'
  )
项目:xavier    作者:bepress    | 项目源码 | 文件源码
def test_schedule_queue():
    """Check that processing a schedule publishes the event of a task bound to it."""
    publisher = MagicMock(return_value='aaa')
    queue = TaskQueue(publisher)

    @queue.task(schedules=['bbb'])
    def funcy():
        return "blah"

    # Trigger the schedule the task was registered for.
    queue.process_schedule('bbb')

    # Exactly one event, describing a call without arguments, must be published.
    expected_event = jsonpickle.dumps((funcy.path, (), {}))
    publisher.assert_called_once_with(expected_event)
项目:xavier    作者:bepress    | 项目源码 | 文件源码
def test_sns_background_queue():
    """Exercise a task queue wired to the SNS publisher and the SNS message handler."""
    with patch('xavier.aws.sns.send_sns_message') as sns_send:
        sns_send.return_value = {"MessageId": "1234"}
        queue = TaskQueue(publish_sns_message("sns:topic"))

        @queue.task(schedules=['bbb'])
        def funcy():
            return "Blah"

        # Publishing side: deferring the task must send one SNS message
        # carrying the serialized call to the configured topic.
        funcy.delay()
        sns_send.assert_called_once_with(
            Message=jsonpickle.dumps((funcy.path, (), {})),
            TopicArn="sns:topic",
        )

        # Consuming side: register a mock as a task; it needs a name and a
        # module just like a real function.
        background = MagicMock()
        background.__name__ = "background_func"
        background.__module__ = "testing"
        background.return_value = "awesome"
        registered = queue.task()(background)

        # Wrap the serialized call in the record structure the handler is fed.
        sns_record = {
            'Sns': {
                'Message': jsonpickle.dumps((registered.path, (), {}))
            }
        }
        consume = handle_sns_message(queue.process_event)
        consume({"Records": [sns_record]}, {})

        background.assert_called_once_with()
项目:xavier    作者:bepress    | 项目源码 | 文件源码
def delay(self, *args, **kwargs):
        """
        Serialize a call of this task and hand it to the registered publisher.

        :return: True once the event was passed to ``publish_event``; False,
                 after logging an error, when no publisher is set
        """
        # The call is serialized first, whether or not a publisher exists.
        serialized_call = jsonpickle.dumps((self.path, args, kwargs))

        if self.publish_event:
            self.publish_event(serialized_call)
            return True

        logger.error("This task has not yet been registered with a task queue")
        return False
项目:robograph    作者:csparpa    | 项目源码 | 文件源码
def serialize(cls, node):
        """
        Render a node as a JSON string, based on its ``to_dict`` representation
        :param node: datamodel.base.node.Node instance
        :return: str
        """
        node_as_dict = cls.to_dict(node)
        return json.dumps(node_as_dict)
项目:robograph    作者:csparpa    | 项目源码 | 文件源码
def serialize(cls, graph):
        """
        Render a graph as a JSON string, based on its ``to_dict`` representation
        :param graph: datamodel.base.graph.Graph instance
        :return: str
        """
        graph_as_dict = cls.to_dict(graph)
        return json.dumps(graph_as_dict)
项目:django-herald    作者:worthwhile    | 项目源码 | 文件源码
def send(self, raise_exception=False, user=None):
        """
        Prepare the notification, wrap it in a SentNotification and send it.

        This is the entry point for triggering a send from code. Only the
        formats listed in ``render_types`` are rendered; the others are stored
        as None. ``raise_exception`` is forwarded to ``resend`` (True means
        exceptions are raised rather than simply logged).
        Returns the result of ``resend`` (described upstream as a boolean
        telling whether the notification was sent successfully).
        """
        context = self.get_context_data()
        recipients = self.get_recipients()

        text_content = self.render('text', context) if 'text' in self.render_types else None
        html_content = self.render('html', context) if 'html' in self.render_types else None

        sent_from = self.get_sent_from()
        subject = self.get_subject()
        extra_data = self.get_extra_data()

        notification_fields = dict(
            recipients=','.join(recipients),
            text_content=text_content,
            html_content=html_content,
            sent_from=sent_from,
            subject=subject,
            # Empty extra data is stored as None instead of an empty JSON document.
            extra_data=json.dumps(extra_data) if extra_data else None,
            notification_class=self.get_class_path(),
            attachments=self._get_encoded_attachments(),
            user=user,
        )
        sent_notification = SentNotification(**notification_fields)

        return self.resend(sent_notification, raise_exception=raise_exception)
项目:keras-text    作者:raghakot    | 项目源码 | 文件源码
def dump(obj, file_name):
    """Write ``obj`` to ``file_name`` in a format chosen from the name and the type.

    * a name ending in ``.json`` -> jsonpickle output written as text
    * a numpy array              -> ``np.save``
    * anything else              -> ``joblib.dump`` with the highest pickle protocol
    """
    if file_name.endswith('.json'):
        with open(file_name, 'w') as out:
            out.write(jsonpickle.dumps(obj))
    elif isinstance(obj, np.ndarray):
        np.save(file_name, obj)
    else:
        # Using joblib instead of pickle because of http://bugs.python.org/issue11564
        joblib.dump(obj, file_name, protocol=pickle.HIGHEST_PROTOCOL)
项目:InplusTrader_Linux    作者:zhengwsh    | 项目源码 | 文件源码
def get_state(self):
        """Return the ids of the delayed orders as UTF-8 encoded jsonpickle output."""
        delayed_ids = [order.order_id for _, order in self._delayed_orders]
        return jsonpickle.dumps(delayed_ids).encode('utf-8')
项目:InplusTrader_Linux    作者:zhengwsh    | 项目源码 | 文件源码
def get_state(self):
        """Serialize the order ids of all delayed orders to UTF-8 bytes via jsonpickle."""
        # Each entry is a pair; only the order (second item) is needed.
        pending_ids = [entry.order_id for _, entry in self._delayed_orders]
        serialized = jsonpickle.dumps(pending_ids)
        return serialized.encode('utf-8')
项目:InplusTrader_Linux    作者:zhengwsh    | 项目源码 | 文件源码
def get_state(self):
        """
        Collect the state of every registered object and serialize the mapping.

        Objects whose ``get_state`` returns None are left out. The result is
        the jsonpickle output encoded as UTF-8 bytes.
        """
        states = {}
        for name, item in six.iteritems(self._objects):
            item_state = item.get_state()
            if item_state is None:
                continue
            states[name] = item_state

        serialized = jsonpickle.dumps(states)
        return serialized.encode('utf-8')
项目:InplusTrader_Linux    作者:zhengwsh    | 项目源码 | 文件源码
def get_state(self):
        """Return the states of the open and the delayed orders as UTF-8 encoded jsonpickle output."""
        # Both collections hold pairs; only the order (second item) contributes.
        open_states = [order.get_state() for _, order in self._open_orders]
        delayed_states = [order.get_state() for _, order in self._delayed_orders]

        snapshot = {'open_orders': open_states, 'delayed_orders': delayed_states}
        return jsonpickle.dumps(snapshot).encode('utf-8')
项目:InplusTrader_Linux    作者:zhengwsh    | 项目源码 | 文件源码
def get_state(self):
        """Serialize the per-order states of open and delayed orders to UTF-8 bytes."""
        snapshot = {}
        snapshot['open_orders'] = [item.get_state() for _, item in self._open_orders]
        snapshot['delayed_orders'] = [item.get_state() for _, item in self._delayed_orders]

        serialized = jsonpickle.dumps(snapshot)
        return serialized.encode('utf-8')
项目:incubator-ariatosca    作者:apache    | 项目源码 | 文件源码
def _send_message(connection, message):
    """Send ``message`` over ``connection`` as a length-prefixed jsonpickle payload.

    The payload length is packed with ``_INT_FMT`` and sent first, so that the
    receiving side knows how much data to read afterwards.

    :param connection: socket-like object providing ``sendall``
    :param message: object to serialize with jsonpickle
    """

    # Packing the length of the entire msg using struct.pack.
    # This enables later reading of the content.
    def _pack(data):
        return struct.pack(_INT_FMT, len(data))

    data = jsonpickle.dumps(message)
    # Sockets carry bytes: encode text output so that the length prefix counts
    # bytes (not characters) and the payload is accepted on Python 3.
    if not isinstance(data, bytes):
        data = data.encode('utf-8')

    # sendall keeps writing until everything is out, whereas send may stop
    # after a part of the header and leave the stream corrupted.
    connection.sendall(_pack(data))
    connection.sendall(data)
项目:Online-Book-Store    作者:siddhiparkar151992    | 项目源码 | 文件源码
def get(self):
        """
        Return all categories as a JSON response under the ``categoryList`` key.

        Any failure, including the DAO returning None, is printed and the
        method then returns None.
        """
        try:
            data = dao.get_all_categories()
            if data is None:
                # Raise an explicit error: a bare ``raise`` outside an
                # ``except`` block only yields an unrelated RuntimeError.
                raise LookupError("no categories returned")

            # Serialize with jsonpickle and parse the text back, so that
            # jsonify receives plain JSON-compatible data.
            return jsonify({"categoryList": json.loads(jsonpickle.dumps(data, unpicklable=False))})
        except Exception as e:
            print("AllCategoryApi", e)
            return None
项目:PJON-python    作者:Girgitt    | 项目源码 | 文件源码
def publish(self, payload, channel=None):
        """
        Publish ``payload`` on a redis channel.

        When ``channel`` is None the default publish channel of the instance
        is used. The payload is serialized with jsonpickle; if that raises a
        ValueError the payload is published unchanged.
        """
        target_channel = self._pub_channel_name if channel is None else channel
        log.debug("publishing to channel: %s \n %s" % (target_channel, payload))

        try:
            outgoing = jsonpickle.dumps(payload)
        except ValueError:
            # Best effort: fall back to the payload as it was given.
            outgoing = payload

        self._redis_conn.publish(target_channel, outgoing)
项目:rqalpha    作者:ricequant    | 项目源码 | 文件源码
def get_state(self):
        """Return the states of the open and the delayed orders as UTF-8 encoded jsonpickle output."""
        # Both collections hold pairs; only the order (second item) contributes.
        open_states = [order.get_state() for _, order in self._open_orders]
        delayed_states = [order.get_state() for _, order in self._delayed_orders]

        snapshot = {'open_orders': open_states, 'delayed_orders': delayed_states}
        return jsonpickle.dumps(snapshot).encode('utf-8')
项目:rqalpha    作者:ricequant    | 项目源码 | 文件源码
def get_state(self):
        """
        Serialize the states of all registered objects to UTF-8 bytes.

        Every object is asked for its state; a None state is skipped, all
        others are stored under the object's key before the mapping is passed
        to jsonpickle.
        """
        collected = {}
        for object_key, member in six.iteritems(self._objects):
            member_state = member.get_state()
            if member_state is None:
                continue
            collected[object_key] = member_state

        return jsonpickle.dumps(collected).encode('utf-8')
项目:puppet    作者:Raytone-D    | 项目源码 | 文件源码
def get_state(self):
        """Return the ids of the delayed orders as UTF-8 encoded jsonpickle output."""
        order_ids = [delayed.order_id for _, delayed in self._delayed_orders]
        encoded_ids = jsonpickle.dumps(order_ids)
        return encoded_ids.encode('utf-8')
项目:indy-plenum    作者:hyperledger    | 项目源码 | 文件源码
def _send(self, data: Dict[str, object]):
        """Serialize ``data`` with jsonpickle and pass the result to the stats publisher."""
        publisher = self.statsPublisher
        publisher.send(jsonpickle.dumps(data))