添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

如何将SqlAlchemy结果序列化为JSON?

266 人关注

Django有一些很好的自动序列化功能,将从数据库返回的ORM模型序列化为JSON格式。

如何将SQLAlchemy查询结果序列化为JSON格式?

I tried jsonpickle.encode but it encodes query object itself. I tried json.dumps(items) but it returns

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

将SQLAlchemy ORM对象序列化为JSON/XML真的这么难吗?难道没有任何默认的序列化器吗?现在对ORM查询结果进行序列化是很常见的工作。

我需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示。

需要将SQLAlchemy对象的查询结果以JSON/XML格式用于javascript数据鸟(JQGrid)。http://www.trirand.com/blog/)

4 个评论
这是一个对我有用的变通办法。 enter link description here
我必须警告你,序列化许多sqlalchemy模型(比如一个列表)会非常慢。如果你关心性能,请选择字典代替。
在表模型上创建一个包装器,并在其中定义to_dict方法,写入你想序列化的coloumn数据,并使用这个包装器从数据库中获取数据。
python
json
sqlalchemy
Zelid
Zelid
发布于 2011-02-17
30 个回答
charlax
charlax
发布于 2021-09-27
已采纳
0 人赞同

你可以直接把你的对象作为一个字典输出。

class User:
   def as_dict(self):
       return {c.name: getattr(self, c.name) for c in self.__table__.columns}

然后你用User.as_dict()来序列化你的对象。

如同在将sqlalchemy行对象转换为python dict

@charlax, 我是如何修复一个DateTime的?通过使用这个,当我做json.dumps时,我得到'datetime.datetime(2013, 3, 22, 16, 50, 11)不是JSON可序列化的'。
这是由 JSONEncoder 对象负责的。你可以对它进行子类化,为一些对象定义你自己的编码器,包括日期时间。请注意,例如, Flask ,支持开箱即用的JSON编码日期时间(最新版本)。
如果你使用sqlalchemy的 "声明 "方法,你可以在一个自定义的Base类中添加类似的东西--这是很方便的,因为你可以在你拥有的任何ORM对象上调用my_orm_object.toDict() 。同样,你可以定义一个.toJSON()方法,使用你的toDict方法和一个自定义编码器来处理日期、blobs等。
来支持日期时间。【替换代码0
对于Python 3的用户,@Shoham的回答需要做一个小小的改动。【替换代码0
Sasha B
Sasha B
发布于 2021-09-27
0 人赞同

A flat implementation

You could use something like this:

from sqlalchemy.ext.declarative import DeclarativeMeta
class AlchemyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            # an SQLAlchemy class
            fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                data = obj.__getattribute__(field)
                    json.dumps(data) # this will fail on non-encodable values, like other classes
                    fields[field] = data
                except TypeError:
                    fields[field] = None
            # a json-encodable dict
            return fields
        return json.JSONEncoder.default(self, obj)

然后使用转换为JSON。

c = YourAlchemyClass()
print json.dumps(c, cls=AlchemyEncoder)

它将忽略不可编码的字段(将其设置为 "无")。

它不会自动扩展关系(因为这可能导致自我引用,并永远循环)。

A recursive, non-circular implementation

然而,如果你宁愿永远循环,你可以使用。

from sqlalchemy.ext.declarative import DeclarativeMeta
def new_alchemy_encoder():
    _visited_objs = []
    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if obj in _visited_objs:
                    return None
                _visited_objs.append(obj)
                # an SQLAlchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    fields[field] = obj.__getattribute__(field)
                # a json-encodable dict
                return fields
            return json.JSONEncoder.default(self, obj)
    return AlchemyEncoder

然后用以下方式对对象进行编码。

print json.dumps(e, cls=new_alchemy_encoder(), check_circular=False)

这将编码所有的孩子,以及他们所有的孩子,以及他们所有的孩子......基本上,有可能对你的整个数据库进行编码。当它达到之前编码的东西时,它将把它编码为 "无"。

A recursive, possibly-circular, selective implementation

另一个选择,可能是更好的选择,就是能够指定你想扩展的字段。

def new_alchemy_encoder(revisit_self = False, fields_to_expand = []):
    _visited_objs = []
    class AlchemyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if revisit_self:
                    if obj in _visited_objs:
                        return None
                    _visited_objs.append(obj)
                # go through each field in this SQLalchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                    val = obj.__getattribute__(field)
                    # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                    if isinstance(val.__class__, DeclarativeMeta) or (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                        # unless we're expanding this field, stop here
                        if field not in fields_to_expand:
                            # not expanding this field: set it to None and continue
                            fields[field] = None
                            continue
                    fields[field] = val
                # a json-encodable dict
                return fields
            return json.JSONEncoder.default(self, obj)
    return AlchemyEncoder

你现在可以用以下方式调用它。

print json.dumps(e, cls=new_alchemy_encoder(False, ['parents']), check_circular=False)

例如,为了只扩展名为 "父母 "的SQLAlchemy字段。

这是一个很好的回应,然而我得到一个 "无法编码 "BaseQuery",每当它遇到一个非平面方法的关系,有什么想法?
@SashaB 针对关系重复的情况进行更细化的定位如何?例如,如果我有 online_order address ,都与 user 有关系,但 online_order 也与 address 有关系。如果我想把所有这些都序列化,我就必须把 address 包括在 fields_to_expand 中,但是我不想重复序列化 address ,因为它与 user online_order 都有关系。
@BenKilah 让我猜猜,你使用的是Flask-SqlAlchemy,你的模型是继承自db.Model,而不是Base。如果是这样的话,请修改 for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']: ,使其成为 for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata' and not x.startswith('query')]: 。请记住,这个解决方案将阻止你拥有一个名为 "查询 "的属性/关系。
和我一样的方式,但要复杂得多。 stackoverflow.com/questions/7102754/...
你可以使用我的解决方案 github.com/n0nSmoker/SQLAlchemy-serializer
tom
tom
发布于 2021-09-27
0 人赞同

Python 3.7+ and Flask 1.1+ can use the built-in dataclasses package

from dataclasses import dataclass
from datetime import datetime
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
db = SQLAlchemy(app)
@dataclass
class User(db.Model):
  id: int
  email: str
  id = db.Column(db.Integer, primary_key=True, auto_increment=True)
  email = db.Column(db.String(200), unique=True)
@app.route('/users/')
def users():
  users = User.query.all()
  return jsonify(users)  
if __name__ == "__main__":
  users = User(email="user1@gmail.com"), User(email="user2@gmail.com")
  db.create_all()
  db.session.add_all(users)
  db.session.commit()
  app.run()

现在/users/路线将返回一个用户列表。

{"email": "user1@gmail.com", "id": 1}, {"email": "user2@gmail.com", "id": 2}

Auto-serialize related models

@dataclass
class Account(db.Model):
  id: int
  users: User
  id = db.Column(db.Integer)
  users = db.relationship(User)  # User model would need a db.ForeignKey field

The response from jsonify(account) would be this.

"id":1, "users":[ "email":"user1@gmail.com", "id":1 "email":"user2@gmail.com", "id":2

Overwrite the default JSON Encoder

from flask.json import JSONEncoder
class CustomJSONEncoder(JSONEncoder):
  "Add support for serializing timedeltas"
  def default(o):
    if type(o) == datetime.timedelta:
      return str(o)
    elif type(o) == datetime.datetime:
      return o.isoformat()
    else:
      return super().default(o)
app.json_encoder = CustomJSONEncoder      
    
这看起来是一种正确的简单。它也适用于反序列化吗?
注意, id: int = Column 可以工作,但 id = Column 不行,看起来你必须为json声明静态类型来序列化字段,否则你会得到一个空的 {} 对象。
这对我来说很有效,为什么这不是公认的答案呢?我已经在app_context上玩了好几个小时,想让它在Flask-Marshmallow上工作。
对我来说也是有效的。注意,如果你是在Python 3.6上,你会想直接安装这个包。 pipenv install dataclasses 。然后就可以正常工作了。
jrc
为了让Pylance高兴,我不得不使用 cast() ,例如 id: int = cast(int, Column(Integer))
Nick Perkins
Nick Perkins
发布于 2021-09-27
0 人赞同

你可以像这样把RowProxy转换为一个dict。

 d = dict(row.items())

然后将其序列化为JSON(你必须为像datetime这样的值指定一个编码器)。 如果你只想要一条记录(而不是一个完整的相关记录的层次结构),这并不难。

json.dumps([(dict(row.items())) for row in rs])
    
JZ.
这对我使用db.engine.connect()作为con的自定义sql查询有效:rs = con.execute(sql)。
这要简单得多,而且可行。这个答案和公认的答案有什么区别?
Yasir Hantoush
Yasir Hantoush
发布于 2021-09-27
0 人赞同

I recommend using 棉花糖 .它允许你创建序列化器来表示你的模型实例,支持关系和嵌套对象。

下面是他们文档中的一个截然不同的例子。以ORM模型为例, Author

class Author(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    first = db.Column(db.String(80))
    last = db.Column(db.String(80))

A 棉花糖 schema for that class is constructed like this:

class AuthorSchema(Schema):
    id = fields.Int(dump_only=True)
    first = fields.Str()
    last = fields.Str()
    formatted_name = fields.Method("format_name", dump_only=True)
    def format_name(self, author):
        return "{}, {}".format(author.last, author.first)

...并像这样使用。

author_schema = AuthorSchema()
author_schema.dump(Author.query.first())

...将产生这样的输出。

"first": "Tim", "formatted_name": "Peters, Tim", "id": 1, "last": "Peters"

请看他们的全部Flask-SQLAlchemy实例.

A library called marshmallow-sqlalchemy specifically integrates SQLAlchemy and 棉花糖. In that library, the schema for the Author model described above looks like this:

class AuthorSchema(ModelSchema):
    class Meta:
        model = Author

该集成允许从SQLAlchemy Column类型中推断出字段类型。

棉花糖-sqlalchemy here.

我还发现 marshmallow-sqlalchemy.readthedocs.io/en/latest 这简化了模式的生成
phico
phico
发布于 2021-09-27
0 人赞同

你可以使用SqlAlchemy的自省,就像这样。

mysql = SQLAlchemy()
from sqlalchemy import inspect
class Contacts(mysql.Model):  
    __tablename__ = 'CONTACTS'
    id = mysql.Column(mysql.Integer, primary_key=True)
    first_name = mysql.Column(mysql.String(128), nullable=False)
    last_name = mysql.Column(mysql.String(128), nullable=False)
    phone = mysql.Column(mysql.String(128), nullable=False)
    email = mysql.Column(mysql.String(128), nullable=False)
    street = mysql.Column(mysql.String(128), nullable=False)
    zip_code = mysql.Column(mysql.String(128), nullable=False)
    city = mysql.Column(mysql.String(128), nullable=False)
    def toDict(self):
        return { c.key: getattr(self, c.key) for c in inspect(self).mapper.column_attrs }
@app.route('/contacts',methods=['GET'])
def getContacts():
    contacts = Contacts.query.all()
    contactsArr = []
    for contact in contacts:
        contactsArr.append(contact.toDict()) 
    return jsonify(contactsArr)
@app.route('/contacts/<int:id>',methods=['GET'])
def getContact(id):
    contact = Contacts.query.get(id)
    return jsonify(contact.toDict())

从这里的答案中得到启发: 将sqlalchemy行对象转换为python dict

就最小的解决方案而言,我将上面的toDict()与tom的CustomJSONEncoder(注意略作修改)结合起来,将日期转换成ISO格式。 啊,堆栈溢出的力量!
^^ 完全同意上面的说法!如此简约而优雅。令人难以置信的工作!
我在这段代码中找到了神奇的成分 contact.toDict()
kolypto
kolypto
发布于 2021-09-27
0 人赞同

Flask-JsonTools 包有一个实现 编码:JsonSerializableBase 你的模型的基类。

使用方法。

from sqlalchemy.ext.declarative import declarative_base
from flask.ext.jsontools import JsonSerializableBase
Base = declarative_base(cls=(JsonSerializableBase,))
class User(Base):

现在User模型神奇地可序列化了。

如果你的框架不是Flask,你可以只用获取代码

这只解决了一半的问题,因为它只序列化了一条记录。如何将整个查询结果序列化?
@SteveBennett use the jsontools' jsonapi 来对响应进行编码。这将自动对返回对象进行编码
I have a very simple sqlalchemy model, and I'm getting: TypeError: <ORM.State object at 0x03577A50> is not JSON serializable
通过在我的模型对象上明确调用__json__(),最终成功了: return my_object.__json__()
该库不能用于Flask 1.0及以上版本,因为Flask 1.0中不再支持 import flask.ext.whatever
Tjorriemorrie
Tjorriemorrie
发布于 2021-09-27
0 人赞同

出于安全考虑,你不应该返回模型的所有字段。我更喜欢有选择地选择它们。

Flask的json编码现在支持UUID、datetime和关系(并为flask_sqlalchemy添加了 query query_class db.Model 类)。我已经更新了编码器,如下。

app/json_encoder.py

    from sqlalchemy.ext.declarative import DeclarativeMeta
    from flask import json
    class AlchemyEncoder(json.JSONEncoder):
        def default(self, o):
            if isinstance(o.__class__, DeclarativeMeta):
                data = {}
                fields = o.__json__() if hasattr(o, '__json__') else dir(o)
                for field in [f for f in fields if not f.startswith('_') and f not in ['metadata', 'query', 'query_class']]:
                    value = o.__getattribute__(field)
                        json.dumps(value)
                        data[field] = value
                    except TypeError:
                        data[field] = None
                return data
            return json.JSONEncoder.default(self, o)
  

app/__init__.py

# json encoding
from app.json_encoder import AlchemyEncoder
app.json_encoder = AlchemyEncoder

有了这个,我可以选择性地添加一个__json__属性,返回我希望编码的字段列表。

app/models.py

class Queue(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    song_id = db.Column(db.Integer, db.ForeignKey('song.id'), unique=True, nullable=False)
    song = db.relationship('Song', lazy='joined')
    type = db.Column(db.String(20), server_default=u'audio/mpeg')
    src = db.Column(db.String(255), nullable=False)
    created_at = db.Column(db.DateTime, server_default=db.func.now())
    updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())
    def __init__(self, song):
        self.song = song
        self.src = song.full_path
    def __json__(self):
        return ['song', 'src', 'type', 'created_at']

我把@jsonapi添加到我的视图中,返回结果列表,然后我的输出如下。

"created_at": "Thu, 23 Jul 2015 11:36:53 GMT", "song": "full_path": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3", "id": 2, "path_name": "Audioslave/Audioslave [2002]/1 Cochise.mp3" "src": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3", "type": "audio/mpeg"
很好!很好再一次证明,有时你不需要为每一个愚蠢的小任务提供一个庞大的包,学习DSL可能比用 "硬 "方法做更难。在来到这里之前,我看了很多很多JSON和REST包。的确,这仍然需要一个包。 语法 (to add @jsonapi to @app.route in views.py 等),但我喜欢它的简单性。我认为Flask添加了datetime而没有添加date,这很便宜,所以我自己把它添加到了 json_encoder.py : value=... ^ if isinstance(value, date): ^ data[field] = datetime.combine(value, time.min).isoformat() ^ else: ^ try:...
Patrick Mutuku
Patrick Mutuku
发布于 2021-09-27
0 人赞同

一个更详细的解释。 在你的模型中,添加。

def as_dict(self):
       return {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

替换代码1】是针对Python 3的,所以如果使用Python 2,请使用unicode()。它应该有助于反序列化日期。如果不处理这些问题,你可以把它删除。

现在你可以像这样查询数据库

some_result = User.query.filter_by(id=current_user.id).first().as_dict()

First()是需要的,以避免奇怪的错误。as_dict()现在将对结果进行反序列化。在反序列化之后,就可以把它变成json了

jsonify(some_result)
    
Chris Modzelewski
Chris Modzelewski
发布于 2021-09-27
0 人赞同

虽然最初的问题可以追溯到一段时间前,但这里的答案(以及我自己的经验)表明,这是一个非难事,有很多不同的方法,其复杂性各不相同,有不同的取舍。

这就是为什么我建立了 ǞǞǞǞ 该库扩展了SQLAlchemy的声明式ORM,提供可配置的序列化/反序列化支持,你可能想看看。

该图书馆支持。

  • Python 2.7, 3.4, 3.5, and 3.6.
  • SQLAlchemy versions 0.9 and higher
  • serialization/de-serialization to/from JSON, CSV, YAML, and Python dict
  • serialization/de-serialization of columns/attributes, relationships, hybrid properties, and association proxies
  • enabling and disabling of serialization for particular formats and columns/relationships/attributes (e.g. you want to support an inbound password value, but never include an outbound one)
  • pre-serialization and post-deserialization value processing (for validation or type coercion)
  • a pretty straightforward syntax that is both Pythonic and seamlessly consistent with SQLAlchemy's own approach
  • 你可以在这里查看(我希望!)全面的文档。 https://sqlathanor.readthedocs.io/en/latest

    Hope this helps!

    Артем Федоров
    Артем Федоров
    发布于 2021-09-27
    0 人赞同

    自定义序列化和反序列化。

    "from_json" (类方法)基于json数据建立一个Model对象。

    "反序列化" 可以只在实例上调用,并将json的所有数据合并到Model实例中。

    "序列化" - 递归序列化

    __write_only__ 属性需要用来定义只写属性(例如 "password_hash")。

    class Serializable(object):
        __exclude__ = ('id',)
        __include__ = ()
        __write_only__ = ()
        @classmethod
        def from_json(cls, json, selfObj=None):
            if selfObj is None:
                self = cls()
            else:
                self = selfObj
            exclude = (cls.__exclude__ or ()) + Serializable.__exclude__
            include = cls.__include__ or ()
            if json:
                for prop, value in json.iteritems():
                    # ignore all non user data, e.g. only
                    if (not (prop in exclude) | (prop in include)) and isinstance(
                            getattr(cls, prop, None), QueryableAttribute):
                        setattr(self, prop, value)
            return self
        def deserialize(self, json):
            if not json:
                return None
            return self.__class__.from_json(json, selfObj=self)
        @classmethod
        def serialize_list(cls, object_list=[]):
            output = []
            for li in object_list:
                if isinstance(li, Serializable):
                    output.append(li.serialize())
                else:
                    output.append(li)
            return output
        def serialize(self, **kwargs):
            # init write only props
            if len(getattr(self.__class__, '__write_only__', ())) == 0:
                self.__class__.__write_only__ = ()
            dictionary = {}
            expand = kwargs.get('expand', ()) or ()
            prop = 'props'
            if expand:
                # expand all the fields
                for key in expand:
                    getattr(self, key)
            iterable = self.__dict__.items()
            is_custom_property_set = False
            # include only properties passed as parameter
            if (prop in kwargs) and (kwargs.get(prop, None) is not None):
                is_custom_property_set = True
                iterable = kwargs.get(prop, None)
            # loop trough all accessible properties
            for key in iterable:
                accessor = key
                if isinstance(key, tuple):
                    accessor = key[0]
                if not (accessor in self.__class__.__write_only__) and not accessor.startswith('_'):
                    # force select from db to be able get relationships
                    if is_custom_property_set:
                        getattr(self, accessor, None)
                    if isinstance(self.__dict__.get(accessor), list):
                        dictionary[accessor] = self.__class__.serialize_list(object_list=self.__dict__.get(accessor))
                    # check if those properties are read only
                    elif isinstance(self.__dict__.get(accessor), Serializable):
                        dictionary[accessor] = self.__dict__.get(accessor).serialize()
                    else:
                        dictionary[accessor] = self.__dict__.get(accessor)
            return dictionary
        
    chribsen
    chribsen
    发布于 2021-09-27
    0 人赞同

    Use the 内置的串行器 在SQLAlchemy中。

    from sqlalchemy.ext.serializer import loads, dumps
    obj = MyAlchemyObject()
    # serialize object
    serialized_obj = dumps(obj)
    # deserialize object
    obj = loads(serialized_obj)
    

    如果你要在会话之间转移对象,记得用session.expunge(obj)将对象从当前会话中分离出来。 要再次连接它,只需做session.add(obj)

    Nifty, but does not convert to JSON.
    对于JSON "序列化",请查看marshmallow-sqlalchemy . Definitely the best solution when you're exposing objects to clients. marshmallow-sqlalchemy.readthedocs.io
    thomasd
    串行器模块只适合于查询结构。它不需要用于:用户定义的类的实例。这些实例在典型情况下不包含对引擎、会话或表达式结构的引用,可以直接进行序列化。
    tahoe
    tahoe
    发布于 2021-09-27
    0 人赞同

    这里有一个解决方案,可以让你选择你想包括在你的输出中的关系,只要你想深入。 注意:这是一个完整的重写,以口令/字符串为参数,而不是以列表为参数,修正了一些东西。

    def deep_dict(self, relations={}):
        """Output a dict of an SA object recursing as deep as you want.
        Takes one argument, relations which is a dictionary of relations we'd
        like to pull out. The relations dict items can be a single relation
        name or deeper relation names connected by sub dicts
        Example:
            Say we have a Person object with a family relationship
                person.deep_dict(relations={'family':None})
            Say the family object has homes as a relation then we can do
                person.deep_dict(relations={'family':{'homes':None}})
                person.deep_dict(relations={'family':'homes'})
            Say homes has a relation like rooms you can do
                person.deep_dict(relations={'family':{'homes':'rooms'}})
                and so on...
        mydict =  dict((c, str(a)) for c, a in
                        self.__dict__.items() if c != '_sa_instance_state')
        if not relations:
            # just return ourselves
            return mydict
        # otherwise we need to go deeper
        if not isinstance(relations, dict) and not isinstance(relations, str):
            raise Exception("relations should be a dict, it is of type {}".format(type(relations)))
        # got here so check and handle if we were passed a dict
        if isinstance(relations, dict):
            # we were passed deeper info
            for left, right in relations.items():
                myrel = getattr(self, left)
                if isinstance(myrel, list):
                    mydict[left] = [rel.deep_dict(relations=right) for rel in myrel]
                else:
                    mydict[left] = myrel.deep_dict(relations=right)
        # if we get here check and handle if we were passed a string
        elif isinstance(relations, str):
            # passed a single item
            myrel = getattr(self, relations)
            left = relations
            if isinstance(myrel, list):
                mydict[left] = [rel.deep_dict(relations=None)
                                     for rel in myrel]
            else:
                mydict[left] = myrel.deep_dict(relations=None)
        return mydict
    

    因此,对于一个使用人/家庭/房屋/房间的例子......把它变成json,你需要的是

    json.dumps(person.deep_dict(relations={'family':{'homes':'rooms'}}))
        
    我认为把这个放在你的基类中就可以了,这样所有的对象都会有这个。我将把json编码留给你......
    注意,这个版本会得到所有的列表关系,所以要谨慎地提供有大量项目的关系...
    邢烽朔
    邢烽朔
    发布于 2021-09-27
    0 人赞同
    step1:
    class CNAME:
       def as_dict(self):
           return {item.name: getattr(self, item.name) for item in self.__table__.columns}
    step2:
    list = []
    for data in session.query(CNAME).all():
        list.append(data.as_dict())
    step3:
    return jsonify(list)
        
    没有任何解释的代码转储很少有帮助。Stack Overflow是关于学习的,而不是提供盲目复制和粘贴的片段。请注意 edit 你的问题,并解释它是如何比OP提供的更好地工作的。
    Toby
    Toby
    发布于 2021-09-27
    0 人赞同

    尽管这是个老帖子,也许我没有回答上面的问题,但我想谈谈我的序列化,至少它对我是有效的。

    我使用FastAPI、SqlAlchemy和MySQL,但我不使用ORM模型。

    # from sqlalchemy import create_engine
    # from sqlalchemy.orm import sessionmaker
    # engine = create_engine(config.SQLALCHEMY_DATABASE_URL, pool_pre_ping=True)
    # SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    

    序列化代码

    import decimal import datetime def alchemy_encoder(obj): """JSON encoder function for SQLAlchemy special classes.""" if isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d %H:%M:%S") elif isinstance(obj, decimal.Decimal): return float(obj) import json from sqlalchemy import text # db is SessionLocal() object app_sql = 'SELECT * FROM app_info ORDER BY app_id LIMIT :page,:page_size' # The next two are the parameters passed in page = 1 page_size = 10 # execute sql and return a <class 'sqlalchemy.engine.result.ResultProxy'> object app_list = db.execute(text(app_sql), {'page': page, 'page_size': page_size}) # serialize res = json.loads(json.dumps([dict(r) for r in app_list], default=alchemy_encoder))

    如果它不起作用,请忽略我的答案。我指的是这里

    https://codeandlife.com/2014/12/07/sqlalchemy-results-to-json-the-easy-way/

    vishal
    vishal
    发布于 2021-09-27
    0 人赞同

    安装simplejson由 pip install simplejson 并创建一个类

    class Serialise(object):
        def _asdict(self):
            Serialization logic for converting entities using flask's jsonify
            :return: An ordered dictionary
            :rtype: :class:`collections.OrderedDict`
            result = OrderedDict()
            # Get the columns
            for key in self.__mapper__.c.keys():
                if isinstance(getattr(self, key), datetime):
                    result["x"] = getattr(self, key).timestamp() * 1000
                    result["timestamp"] = result["x"]
                else:
                    result[key] = getattr(self, key)
            return result
    

    并将这个类继承给每一个ORM类,这样这个_asdict函数就会被注册到每一个ORM类中,然后就会出现。 并在任何地方使用jsonify

    Keith
    Keith
    发布于 2021-09-27
    0 人赞同

    这不是那么简单的事。我写了一些代码来做这件事。我还在研究它,它使用MochiKit框架。它基本上使用一个代理和注册的JSON转换器在Python和Javascript之间翻译复合对象。

    数据库对象的浏览器端是 db.js 它需要基本的Python代理源,在 proxy.js .

    在Python一侧有一个基地 proxy module . 然后最后是SqlAlchemy对象编码器在 webserver.py . 它也依赖于在 "中国 "中找到的元数据提取器。 models.py file.

    从第一眼看去相当复杂......我需要的是--以JSON/XML格式获取SQLAlchemy对象的查询结果,并将其用于javascript数据库(JQGrid trirand.com/blog )
    有时问题比你第一眼看到的要复杂......这是处理作为外键返回的对象,并试图避免发生在深度嵌套关系中的无限递归。 然而,你也许可以写一些只返回基本类型的自定义查询,并直接用simplejson进行序列化。
    好吧,也许我真的会使用SQLAlchemy来查询数据,并将利用ORM的好处来执行保存/更新动作。
    hpatel826
    hpatel826
    发布于 2021-09-27
    0 人赞同
    def alc2json(row):
        return dict([(col, str(getattr(row,col))) for col in row.__table__.columns.keys()])
    

    我想我要用这个打一下代码高尔夫。

    参考:我正在使用自动化基地 since we have a separately designed schema according to business requirements. I just started using SQLAlchemy today but the documentation states that 自动化基地 is an extension to declarative_base which seems to be the typical paradigm in the SQLAlchemy ORM so I believe this should work.

    它不会因为以下的外键而变得花哨。Tjorriemorrie的解决方案,但它只是将列与值相匹配,并通过str()处理列值来处理Python类型。我们的值由Python datetime.time和decimal.Decimal类的结果组成,所以它能完成工作。

    希望这对任何路人有帮助!

    Rahul Shelke
    Rahul Shelke
    发布于 2021-09-27
    0 人赞同

    我知道这是个很老的帖子。我采用了@SashaB给出的解决方案,并根据我的需要进行了修改。

    我在其中加入了以下内容。

  • Field ignore list: A list of fields to be ignored while serializing
  • Field replace list: A dictionary containing field names to be replaced by values while serializing.
  • Removed methods and BaseQuery getting serialized
  • 我的代码如下。

    def alchemy_json_encoder(revisit_self = False, fields_to_expand = [], fields_to_ignore = [], fields_to_replace = {}):
       Serialize SQLAlchemy result into JSon
       :param revisit_self: True / False
       :param fields_to_expand: Fields which are to be expanded for including their children and all
       :param fields_to_ignore: Fields to be ignored while encoding
       :param fields_to_replace: Field keys to be replaced by values assigned in dictionary
       :return: Json serialized SQLAlchemy object
       _visited_objs = []
       class AlchemyEncoder(json.JSONEncoder):
          def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                # don't re-visit self
                if revisit_self:
                    if obj in _visited_objs:
                        return None
                    _visited_objs.append(obj)
                # go through each field in this SQLalchemy class
                fields = {}
                for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata' and x not in fields_to_ignore]:
                    val = obj.__getattribute__(field)
                    # is this field method defination, or an SQLalchemy object
                    if not hasattr(val, "__call__") and not isinstance(val, BaseQuery):
                        field_name = fields_to_replace[field] if field in fields_to_replace else field
                        # is this field another SQLalchemy object, or a list of SQLalchemy objects?
                        if isinstance(val.__class__, DeclarativeMeta) or \
                                (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
                            # unless we're expanding this field, stop here
                            if field not in fields_to_expand:
                                # not expanding this field: set it to None and continue
                                fields[field_name] = None
                                continue
                        fields[field_name] = val
                # a json-encodable dict
                return fields
            return json.JSONEncoder.default(self, obj)
       return AlchemyEncoder
    

    Hope it helps someone!

    0 人赞同

    在Flask下,这可以工作并处理数据时间字段,将一个类型为
    'time': datetime.datetime(2018, 3, 22, 15, 40) into
    "time": "2018-03-22 15:40:00" :

    obj = {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}
    # This to get the JSON body
    return json.dumps(obj)
    # Or this to get a response object
    return jsonify(obj)
        
    Chirag Vora
    Chirag Vora
    发布于 2021-09-27
    0 人赞同

    以下代码将把sqlalchemy结果序列化为json。

    import json
    from collections import OrderedDict
    def asdict(self):
        result = OrderedDict()
        for key in self.__mapper__.c.keys():
            if getattr(self, key) is not None:
                result[key] = str(getattr(self, key))
            else:
                result[key] = getattr(self, key)
        return result
    def to_array(all_vendors):
        v = [ ven.asdict() for ven in all_vendors ]
        return json.dumps(v) 
    

    呼唤乐趣。

    def all_products():
        all_products = Products.query.all()
        return to_array(all_products)
        
    vineet agarwal
    vineet agarwal
    发布于 2021-09-27
    0 人赞同

    AlchemyEncoder非常好,但有时在处理十进制数值时失败。这里有一个改进的编码器,解决了小数点的问题------。

    class AlchemyEncoder(json.JSONEncoder):
    # To serialize SQLalchemy objects 
    def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            model_fields = {}
            for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
                data = obj.__getattribute__(field)
                print data
                    json.dumps(data)  # this will fail on non-encodable values, like other classes
                    model_fields[field] = data
                except TypeError:
                    model_fields[field] = None
            return model_fields
        if isinstance(obj, Decimal):
            return float(obj)
        return json.JSONEncoder.default(self, obj)
        
    chandler9
    chandler9
    发布于 2021-09-27
    0 人赞同

    当使用sqlalchemy连接到数据库时,我这是一个简单的解决方案,可以高度配置。使用pandas。

    import pandas as pd
    import sqlalchemy
    #sqlalchemy engine configuration
    engine = sqlalchemy.create_engine....
    def my_function():
      #read in from sql directly into a pandas dataframe
      #check the pandas documentation for additional config options
      sql_DF = pd.read_sql_table("table_name", con=engine)
      # "orient" is optional here but allows you to specify the json formatting you require
      sql_json = sql_DF.to_json(orient="index")
      return sql_json
        
    0 人赞同

    (对 萨沙B的 真正优秀的答案)

    这特别是将日期时间对象转换为字符串,在原答案中会被转换为 None

    # Standard library imports
    from datetime import datetime
    import json
    # 3rd party imports
    from sqlalchemy.ext.declarative import DeclarativeMeta
    class JsonEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj.__class__, DeclarativeMeta):
                dict = {}
                # Remove invalid fields and just get the column attributes
                columns = [x for x in dir(obj) if not x.startswith("_") and x != "metadata"]
                for column in columns:
                    value = obj.__getattribute__(column)
                        json.dumps(value)
                        dict[column] = value
                    except TypeError:
                        if isinstance(value, datetime):
                            dict[column] = value.__str__()
                        else:
                            dict[column] = None
                return dict
            return json.JSONEncoder.default(self, obj)
        
    RobotHumans
    RobotHumans
    发布于 2021-09-27
    0 人赞同

    内置的序列化器因utf-8无法解码某些输入的无效起始字节而窒息。 相反,我采用了。

    def row_to_dict(row):
        temp = row.__dict__
        temp.pop('_sa_instance_state', None)
        return temp
    def rows_to_list(rows):
        ret_rows = []
        for row in rows:
            ret_rows.append(row_to_dict(row))
        return ret_rows
    @website_blueprint.route('/api/v1/some/endpoint', methods=['GET'])
    def some_api():
        /some_endpoint
        rows = rows_to_list(SomeModel.query.all())
        response = app.response_class(
            response=jsonplus.dumps(rows),
            status=200,
            mimetype='application/json'
        return response
        
    Andrex
    Andrex
    发布于 2021-09-27
    0 人赞同

    也许你可以使用这样的一个类

    from sqlalchemy.ext.declarative import declared_attr
    from sqlalchemy import Table
    class Custom:
        """Some custom logic here!"""
        __table__: Table  # def for mypy
        @declared_attr
        def __tablename__(cls):  # pylint: disable=no-self-argument
            return cls.__name__  # pylint: disable= no-member
        def to_dict(self) -> Dict[str, Any]:
            """Serializes only column data."""
            return {c.name: getattr(self, c.name) for c in self.__table__.columns}
    Base = declarative_base(cls=Custom)
    class MyOwnTable(Base):
        #COLUMNS!
    

    有了这个,所有对象都有to_dict的方法。

    jmunsch
    jmunsch
    发布于 2021-09-27
    0 人赞同

    在使用一些原始的sql和未定义的对象时,使用 cursor.description 似乎可以得到我所要的东西。

    with connection.cursor() as cur:
        print(query)
        cur.execute(query)
        for item in cur.fetchall():
            row = {column.name: item[i] for i, column in enumerate(cur.description)}
            print(row)
        
    KungFuLucky7
    KungFuLucky7
    发布于 2021-09-27
    0 人赞同

    这是一个 JSONEncoder 的版本,保留了模型的列序,只保留递归定义的列和关系字段。它还可以格式化大多数JSON不可序列化的类型。

    import json
    from datetime import datetime
    from decimal import Decimal
    import arrow
    from sqlalchemy.ext.declarative import DeclarativeMeta
    class SQLAlchemyJSONEncoder(json.JSONEncoder):
        SQLAlchemy ORM JSON Encoder
        If you have a "backref" relationship defined in your SQLAlchemy model,
        this encoder raises a ValueError to stop an infinite loop.
        def default(self, obj):
            if isinstance(obj, datetime):
                return arrow.get(obj).isoformat()
            elif isinstance(obj, Decimal):
                return float(obj)
            elif isinstance(obj, set):
                return sorted(obj)
            elif isinstance(obj.__class__, DeclarativeMeta):
                for attribute, relationship in obj.__mapper__.relationships.items():
                    if isinstance(relationship.__getattribute__("backref"), tuple):
                        raise ValueError(
                            f'{obj.__class__} object has a "backref" relationship '
                            "that would cause an infinite loop!"
                dictionary = {}
                column_names = [column.name for column in obj.__table__.columns]
                for key in column_names:
                    value = obj.__getattribute__(key)
                    if isinstance(value, datetime):
                        value = arrow.get(value).isoformat()
                    elif isinstance(value, Decimal):
                        value = float(value)
                    elif isinstance(value, set):
                        value = sorted(value)
                    dictionary[key] = value
                for key in [
                    attribute
                    for attribute in dir(obj)
                    if not attribute.startswith("_")
                    and attribute != "metadata"
                    and attribute not in column_names
                    value = obj.__getattribute__(key)
                    dictionary[key] = value
                return dictionary
            return super().default(obj)
        
    Zaffer
    Zaffer
    发布于 2021-09-27
    0 人赞同

    我已经成功地使用了这个包。 https://github.com/n0nSmoker/SQLAlchemy-serializer

    你可以直接在模型上这样做。

    from sqlalchemy_serializer import SerializerMixin
    class SomeModel(db.Model, SerializerMixin):
    

    它增加了to_dict,是完全递归的。

    item = SomeModel.query.filter(...).one()