SQLAlchemy ORM及Core操作方法

SQLAlchemy ORM 详解

import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker

print(sqlalchemy.__version__)
# # examples of connection http://docs.sqlalchemy.org/en/rel_1_0/core/engines.html#sqlalchemy.create_engine
engine = create_engine(‘sqlite:///foo.db’, echo=True)
#echo:是否显示原始sql语句

# Base = declarative_base()
#
# class User(Base):
#     __tablename__ = ‘users’
#
#     id = Column(Integer, primary_key=True)
#     name = Column(String)
#     fullname = Column(String)
#     password = Column(String)
#
#     def __repr__(self):
#        return “<User(name=’%s’, fullname=’%s’, password=’%s’)>” % (
#                             self.name, self.fullname, self.password)
#
# Base.metadata.create_all(engine)  #
创建表

# ed_user = User(name=’ed’, fullname=’Ed Jones’, password=’edspassword’)
# print(ed_user)
#

##事务
# Session = sessionmaker(bind=engine)
# session = Session()
# session.add(ed_user)
# our_user = session.query(User).filter_by(name=’ed’).first()
# SELECT * FROM users WHERE name=”ed” LIMIT 1;

# session.add_all([
#     User(name=’wendy’, fullname=’Wendy Williams’, password=’foobar’),
#     User(name=’mary’, fullname=’Mary Contrary’, password=’xxg527′),
#     User(name=’fred’, fullname=’Fred Flinstone’, password=’blah’)])
#
# session.commit()

# print(session.query(User).filter_by(name=’ed’).first())
# print(session.query(User).all())
# for row in session.query(User).order_by(User.id):
#     print(row)
# for row in session.query(User).filter(User.name.in_([‘ed’, ‘wendy’, ‘jack’])):
#     print(row)
# for row in session.query(User).filter(~User.name.in_([‘ed’, ‘wendy’, ‘jack’])):
#     print(row)
# print(session.query(User).filter(User.name == ‘ed’).count())
#
# from sqlalchemy import and_, or_
# for row in session.query(User).filter(and_(User.name == ‘ed’, User.fullname == ‘Ed Jones’)):
#     print(row)
# for row in session.query(User).filter(or_(User.name == ‘ed’, User.name == ‘wendy’)):
#     print(row)
#
# from sqlalchemy import ForeignKey
# from sqlalchemy.orm import relationship, backref
#
# class Address(Base):
#     __tablename__ = ‘addresses’
#     id = Column(Integer, primary_key=True)
#     email_address = Column(String, nullable=False)
#     user_id = Column(Integer, ForeignKey(‘users.id’))
#
#     user = relationship(“User”, backref=backref(‘addresses’, order_by=id))
#
#     def __repr__(self):
#         return “<Address(email_address=’%s’)>” % self.email_address
# # Base.metadata.create_all(engine)
# #
# jack = User(name=’jack’, fullname=’Jack Bean’, password=’gjffdd’)
# jack.addresses = [
#                 Address(email_address=’jack@google.com’),
#                 Address(email_address=’j25@yahoo.com’)]
# session.add(jack)
# session.commit()
#
# for u, a in session.query(User, Address).\
#                     filter(User.id==Address.user_id).\
#                     filter(Address.email_address==’jack@google.com’).\
#                     all():
#     print u, a

SQLAlchemy Core 详解

import sqlalchemy
print(sqlalchemy.__version__)

from sqlalchemy import create_engine
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey

engine = create_engine(‘sqlite:///foo.db’, echo=True)

metadata = MetaData()
users = Table(‘users’, metadata,
    Column(‘id’, Integer, primary_key=True),
    Column(‘name’, String),
    Column(‘fullname’, String),
)

addresses = Table(‘addresses’, metadata,
  Column(‘id’, Integer, primary_key=True),
  Column(‘user_id’, None, ForeignKey(‘users.id’)),
  Column(’email_address’, String, nullable=False)
)

# metadata.create_all(engine)
conn = engine.connect()

# conn.execute(users.insert(), [dict(name=’jack’, fullname=’Jack Jones’),
#                               dict(name=’wendy’, fullname=’Wendy Williams’)])
# conn.execute(addresses.insert(), [
#    {‘user_id’: 1, ’email_address’ : ‘jack@yahoo.com’},
#    {‘user_id’: 1, ’email_address’ : ‘jack@msn.com’},
#    {‘user_id’: 2, ’email_address’ : ‘www@www.org’},
#    {‘user_id’: 2, ’email_address’ : ‘wendy@aol.com’},
# ])

from sqlalchemy.sql import select
# s = select([users])
# result = conn.execute(s)
# for row in result:
#     print(row)

# s = select([users, addresses]).where(users.c.id == addresses.c.user_id)
# for row in conn.execute(s):
#     print row

from sqlalchemy.sql import text
s = text(
    “SELECT users.fullname || ‘, ‘ || addresses.email_address AS title ”
        “FROM users, addresses ”
        “WHERE users.id = addresses.user_id ”
        “AND users.name BETWEEN 😡 AND :y ”
        “AND (addresses.email_address LIKE :e1 ”
            “OR addresses.email_address LIKE :e2)”
)
print(conn.execute(s, x=‘m’, y=‘z’, e1=‘%@aol.com’, e2=‘%@msn.com’).fetchall())

微信分享普通html页时分享图标及题目的设置

问题:使用微信打开普通的html页面,分享时图标为空白,并且标题也有问题

解决方法:

1、在<head></head>直接添加如下代码(注意:logo.jpg必须为300*300px)

<div id=”wx_pic” style=”margin:0 auto;display:none;”> <img src=”images/logo.jpg”/> </div>

2、更改<title>为想要的标题

搞定!

0x01-微信公众号开发(环境,调试)

1、公众号测试后台环境
http://mp.weixin.qq.com/debug/cgi-bin/sandbox?t=sandbox/login

2、公众号信息分析

公众号接收消息格式(从微信发送到公众号)
1)文本信息
TextMessage(OrderedDict([(u’ToUserName’,u’gh_237b9488f76b’), (u’FromUserName’, u’ok2hTwT4sdRfdIwXWxwse3GBqukM’), (u’CreateTime’, u’1468725197′), (u’MsgType’, u’text’), (u’Content’, u’hajj’), (u’MsgId’, u’6308126688332557493′)]))
2)取消关注
UnsubscribeEvent(OrderedDict([(u’ToUserName’, u’gh_237b9488f76b’), (u’FromUserName’, u’ok2hTwT4sdRfdIwXWxwse3GBqukM’), (u’CreateTime’, u’1468725643′), (u’MsgType’, u’event’), (u’Event’, u’unsubscribe’), (u’EventKey’, None)]))
3)关注
SubscribeEvent(OrderedDict([(u’ToUserName’, u’gh_237b9488f76b’), (u’FromUserName’, u’ok2hTwT4sdRfdIwXWxwse3GBqukM’), (u’CreateTime’, u’1468725754′), (u’MsgType’, u’event’), (u’Event’, u’subscribe’), (u’EventKey’, None)]))
4)图片信息(通过菜单“图片”或者通过“拍摄”图片)
ImageMessage(OrderedDict([(u’ToUserName’, u’gh_237b9488f76b’), (u’FromUserName’, u’ok2hTwT4sdRfdIwXWxwse3GBqukM’), (u’CreateTime’, u’1468725820′), (u’MsgType’, u’image’), (u’PicUrl’, u’http://mmbiz.qpic.cn/mmbiz/dZ4QUHDiapeDjCYJScoEstA8LicPjMIiaBicIic5ekHM1VMazH9o1pru1SHvIyqKb20ibrADQK8lGagWadGIzfWvRGyg/0′), (u’MsgId’, u’6308129364097182966′), (u’MediaId’, u’i1OO6bmBGepdlf7LHVq1Y-PkIk7fstCRCW134VN5yYQlBfjz16Y4kDdAPQaR1HUS’)]))
5)视频信息 (通过菜单“图片”上传视频,或者通过“拍摄”的视频)
VideoMessage(OrderedDict([(u’ToUserName’, u’gh_237b9488f76b’), (u’FromUserName’, u’ok2hTwT4sdRfdIwXWxwse3GBqukM’), (u’CreateTime’, u’1468725997′), (u’MsgType’, u’video’), (u’MediaId’, u’n1ZEh8PnywSrcErQN0vCfGbNt3wtyI2f10ikI7f4jVPXTqv-TkYrAoaaPoqLgsDP’), (u’ThumbMediaId’, u’9lXrBnhZWzIZwyWop274OuQTi6EBLzc_cyuCKhVN7m_JC0peN5jYdvAySZBjXjbh’), (u’MsgId’, u’6308130124306394400′)]))
6)短视频信息(通过菜单“短视频”)
ShortVideoMessage(OrderedDict([(u’ToUserName’, u’gh_237b9488f76b’), (u’FromUserName’, u’ok2hTwT4sdRfdIwXWxwse3GBqukM’), (u’CreateTime’, u’1468726012′), (u’MsgType’, u’shortvideo’), (u’MediaId’, u’EqTgpIUOVztQuIFMfAHMZhNUEgOlR0gLAQnjbn0avQdHElBnJbTErARdIGSdl-7t’), (u’ThumbMediaId’, u’fbDj18A8XnGfCoWJ2zJaXEBG_4abwM9DbMo1XlLJ_XCONkEvgNEZxvn4GLUR9-zK’), (u’MsgId’, u’6308130188730903844′)]))
7)位置信息(菜单“位置”)
LocationMessage(OrderedDict([(u’ToUserName’, u’gh_237b9488f76b’), (u’FromUserName’, u’ok2hTwT4sdRfdIwXWxwse3GBqukM’), (u’CreateTime’, u’1468726187′), (u’MsgType’, u’location’), (u’Location_X’, u’40.059185′), (u’Location_Y’, u’116.416283′), (u’Scale’, u’15’), (u’Label’, u’\u5317\u4eac\u5e02\u660c\u5e73\u533a\u7acb\u6c64\u8def186\u53f7\u9f99\u5fb7\u5e7f\u573aF1\u5c42′), (u’MsgId’, u’6308130940350180678′)]))
8)连接信息(菜单“收藏”)
LinkMessage(OrderedDict([(u’ToUserName’, u’gh_237b9488f76b’), (u’FromUserName’, u’ok2hTwT4sdRfdIwXWxwse3GBqukM’), (u’CreateTime’, u’1468726270′), (u’MsgType’, u’link’), (u’Title’, u’\u4ee8\u5927\u5956\u72ec\u5ba0\u4e00\u4eba \u79bb\u4e0d\u5f00\u575a\u6301\u4e0e\u7528\u5fc3′), (u’Description’, u’\u6700\u8fd1\uff0c\u8fbd\u9633\u6587\u5723\u8def\u9500\u552e\u5385\u9891\u9891\u5f97\u5e78\u8fd0\u4e4b\u795e\u5782\u9752\u30026\u65e5\u5185\uff0c\u8be5\u5385\u4e0d\u4ec5\u964d\u843d\u4e86\u4e00\u4e2a25\u4e07\u5143\u5168\u56fd\u7d2f\u79ef\u5956\uff0c\u8fd8\u5206\u522b\u98d8\u843d\u4e0b\u4e00\u4e2a5.’), (u’Url’, u’http://mp.weixin.qq.com/s?__biz=MzAwMzUxMjQxNg==&mid=2649836741&idx=3&sn=be02636fad0c056a337d196e6cdc6bee&scene=0#rd’), (u’MsgId’, u’6308131296832466258′)]))
9)名片信息(不支持)

 

Android Apk实时写入信息用于下载渠道统计

场景1:为了能识别当前打开的应用是从哪个渠道下载的,app应用打开使用时需要给服务器上传渠道信息,为了实现该功能:
方案1:不同的渠道可能需要打不同的apk包(包含渠道信息)
缺点:如果渠道多的话不容易维护,而且发布不同的包工作量有点大
方案2:实时打包apk,将渠道信息写入apk

场景2:通过不同的营销页面下载apk后,当用户打开apk的时候想让用户首先看到他下载时的营销页面以便继续下一步的操作(领取红包等操作),使用户从看到营销页,到下载app、打开app继续操作过程平滑过渡,不产生割裂感

实现方式:
下载服务实时修改apk信息,如架设服务http://test.net/download_apk?src=渠道1,其中的src值可以随便设定,服务程序将src值写入到apk中供用户下载,apk中对src值进行处理即可。

源码示例:demo_Android Apk实时写入信息用于下载渠道统计.zip
动态写入Apk.docx – 原理
源码 – Android apk信息读取源码、java写入信息源码
readtest.apk-(Android apk信息读取源码)生成程序
writetest.apk-(java写入信息源码)生成程序,用于参考写java服务
writetest.py  –  用python写的信息写入程序,用户参考写python服务

后续假设在线测试demo…

几种常见sqlalchemy查询

几种常见sqlalchemy查询:

    #简单查询
    print(session.query(User).all())
    print(session.query(User.name, User.fullname).all())
    print(session.query(User, User.name).all())
    
    #带条件查询
    print(session.query(User).filter_by(name='user1').all())
    print(session.query(User).filter(User.name == "user").all())
    print(session.query(User).filter(User.name.like("user%")).all())
    
    #多条件查询
    print(session.query(User).filter(and_(User.name.like("user%"), User.fullname.like("first%"))).all())
    print(session.query(User).filter(or_(User.name.like("user%"), User.password != None)).all())
    
    #sql过滤
    print(session.query(User).filter("id>:id").params(id=1).all())
    
    #关联查询 
    print(session.query(User, Address).filter(User.id == Address.user_id).all())
    print(session.query(User).join(User.addresses).all())
    print(session.query(User).outerjoin(User.addresses).all())
    
    #聚合查询
    print(session.query(User.name, func.count('*').label("user_count")).group_by(User.name).all())
    print(session.query(User.name, func.sum(User.id).label("user_id_sum")).group_by(User.name).all())
    
    #子查询
    stmt = session.query(Address.user_id, func.count('*').label("address_count")).group_by(Address.user_id).subquery()
    print(session.query(User, stmt.c.address_count).outerjoin((stmt, User.id == stmt.c.user_id)).order_by(User.id).all())
    
    #exists
    print(session.query(User).filter(exists().where(Address.user_id == User.id)))
    print(session.query(User).filter(User.addresses.any()))

限制返回字段查询

person = session.query(Person.name, Person.created_at,                     
             Person.updated_at).filter_by(name="zhongwei").order_by(            
             Person.created_at).first()

记录总数查询:

from sqlalchemy import func

# count User records, without
# using a subquery.
session.query(func.count(User.id))

# return count of user "id" grouped
# by "name"
session.query(func.count(User.id)).\
        group_by(User.name)

from sqlalchemy import distinct

# count distinct "name" values
session.query(func.count(distinct(User.name)))