#! /usr/bin/env python
#coding=utf-8
#sqlalchemy的session用法
from sqlalchemy import *
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
fullname = Column(String)
password = Column(String)
def __init__(self,name, fullname, password):
self.name = name
self.fullname = fullname
self.password = password
def __repr__(self):
return "" % (self.name, self.fullname, self.password)
engine = create_engine("sqlite:///test.db",echo=True)
Session = sessionmaker(bind=engine)
session = Session()
#也可以通过Session.configure(bind=engine)实现
ed_user = User('ed', 'Ed Jones', 'edspassword')
#此时数据还未添加,直到执行flush或query时候
session.add(ed_user)
session.add_all([
User('wendy', 'Wendy Williams', 'foobar'),
User('mary', 'Mary Contrary', 'xxg527'),
User('fred', 'Fred Flinstone', 'blah')])
#查询第一个符合条件的
our_user = session.query(User).filter_by(name='ed').first()
session.query(User).filter(User.name.in_(['mary', 'wendy'])).all()
print our_user.fullname
session.commit()
for instance in session.query(User).order_by(User.id):
print instance.name, instance.fullname
for name, fullname in session.query(User.name, User.fullname):
print name,fullname
for row in session.query(User, User.name).all():
print row.User,row.name
for u in session.query(User).order_by(User.id)[1:3]:
print u
for name, in session.query(User.name).filter_by(fullname='Ed Jones'):
print name
for user in session.query(User).filter(User.name=='ed').filter(User.fullname=='Ed Jones'):
print user
#使用绑定变量形式
#session.query(User).filter("id<:value and name=:name").params(value=224, name='fred').order_by(User.id).one()
#使用整个sql语句
#session.query(User).from_statement("SELECT * FROM users where name=:name").params(name='ed').all()
#使用位列
#session.query("id", "name", "thenumber12").from_statement("SELECT id, name, 12 as thenumber12 FROM users where name=:name").params(name='ed').all()
#查询个数
#session.query(User).filter(User.name.like('%ed')).count()
#from sqlalchemy import func
#session.query(func.count(User.name), User.name).group_by(User.name).all()
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relation, backref
class Address(Base):
__tablename__ = 'addresses'
id = Column(Integer, primary_key=True)
email_address = Column(String, nullable=False)
user_id = Column(Integer, ForeignKey('users.id'))
user = relation(User, backref=backref('addresses', order_by=id))
def __init__(self, email_address):
self.email_address = email_address
def __repr__(self):
return "
" % self.email_address
metadata = Base.metadata
metadata.create_all(engine)
jack = User('jack', 'Jack Bean', 'gjffdd')
jack.addresses = [Address(email_address='jack@google.com'), Address(email_address='j25@yahoo.com')]
session.add(jack)
session.commit()
jack = session.query(User).filter_by(name='jack').one()
print jack.addresses
#连接查询
for u, a in session.query(User, Address).filter(User.id==Address.user_id).\
filter(Address.email_address=='jack@google.com').all():
print u,a
#from sqlalchemy.orm import join
#session.query(User).select_from(join(User, Address)).\
# filter(Address.email_address=='jack@google.com').all()
#子查询
stmt = session.query(Address).filter(Address.email_address != 'j25@yahoo.com').subquery()
adalias = aliased(Address, stmt)
for user, address in session.query(User, adalias).join((adalias, User.addresses)):
print user,address
#查询
from sqlalchemy.sql import exists
stmt = exists().where(Address.user_id==User.id)
for name, in session.query(User.name).filter(stmt):
print name
session.delete(jack)
session.query(User).filte_by(name='jack').count()
#from sqlalchemy.orm import clear_mappers
#clear_mappers()