数据库版本控制Alembic

安装:
pip3 install alembic

初始化:(如果项目中已经有了,不需要再重新初始化,请直接使用)
在工程目录新建文件夹alembic(名字可以任意),进入alembic执行:alembic init alembic
生成:
alembic.ini
alembic/versions/
alembic/env.py
alembic/script.py.mako

配置:
alembic.ini

sqlalchemy.url = driver://user:pass@localhost:port/dbname
修改为
mysql+pymysql://root:123456@192.168.0.100:3306/xxx

alembic/env.py

target_metadata = None
修改为
import os

import sys

root = os.path.dirname(__file__)+’/../../src/’
sys.path.append(root)

from db.models import Base

target_metadata = Base.metadata

自动创建版本:
alembic revision –autogenerate -m “initdb”

更新数据库:
alembic upgrade 版本号

更新到最新版
alembic upgrade head

降级数据库
alembic downgrade 版本号

更新到最初版
alembic downgrade head

离线更新(生成sql)
alembic upgrade 版本号 –sql > migration.sql

从特定起始版本生成sql
alembic upgrade 版本一:版本二 –sql > migration.sql

查询当前数据库版本号
查看alembic_version表。

清除所有版本
将versions删掉,并删除alembic_version表

平时使用:
alembic revision –autogenerate -m “”
alembic upgrade head

Celery,Tornado,Supervisor构建和谐的分布式系统【转】

Celery 分布式的任务队列

与rabbitmq消息队列的区别与联系:

  • rabbitmq 调度的是消息,而Celery调度的是任务.
  • Celery调度任务时,需要传递参数信息,传输载体可以选择rabbitmq.
  • 利用rabbitmq的持久化和ack特性,Celery可以保证任务的可靠性.

优点:

  • 轻松构建分布式的Service Provider,提供服务。
  • 高可扩展性,增加worker也就是增加了队列的consumer。
  • 可靠性,利用消息队列的durable和ack,可以尽可能降低消息丢失的概率,当worker崩溃后,未处理的消息会重新进入消费队列。
  • 用户友好,利用flower提供的管理工具可以轻松的管理worker。

    flower
  • 使用tornado-celery,结合tornado异步非阻塞结构,可以提高吞吐量,轻松创建分布式服务框架。
  • 学习成本低,可快速入门

快速入门
定义一个celery实例main.py:

from celery import Celery
app = Celery('route_check', include=['check_worker_path'], 
        broker='amqp://user:password@rabbitmq_host:port//')
app.config_from_object('celeryconfig')

include指的是需要celery扫描是否有任务定义的模块路径。例如add_task 就是扫描add_task.py中的任务

celery的配置文件可以从文件、模块中读取,这里是从模块中读取,celeryconfig.py为:

from multiprocessing import cpu_count

from celery import platforms
from kombu import Exchange, Queue

CELERYD_POOL_RESTARTS = False
CELERY_RESULT_BACKEND = 'redis://:password@redis_host:port/db'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('common_check', Exchange('route_check'), routing_key='common_check'),
    Queue('route_check', Exchange('route_check'), routing_key='route_check', delivery_mode=2),
    Queue('route_check_ignore_result', Exchange('route_check'), routing_key='route_check_ignore_result',
          delivery_mode=2)
)
CELERY_ROUTES = {
    'route_check_task.check_worker.common_check': {'queue': 'common_check'},
    'route_check_task.check_worker.check': {'queue': 'route_check'},
    'route_check_task.check_worker.check_ignore_result': {'queue': 'route_check_ignore_result'}
}
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'
# CELERY_MESSAGE_COMPRESSION = 'gzip'
CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_DISABLE_RATE_LIMITS = True
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
CELERYD_CONCURRENCY = cpu_count() / 2
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_PUBLISH_RETRY = True
CELERY_TASK_PUBLISH_RETRY_POLICY = {
    'max_retries': 3,
    'interval_start': 10,
    'interval_step': 5,
    'interval_max': 20
}
platforms.C_FORCE_ROOT = True

这里面是一些celery的配置参数

在上面include的add_task.py定义如下:

#encoding:utf8

from main import app

@app.task
def add(x,y):
    return x+y

启动celery
celery -A main worker -l info -Ofair

  • -A 后面是包含celery定义的模块,我们在main.py中定义了app = Celery...
    测试celery:
  • -l 日志打印的级别,这里是info
  • Ofair 这个参数可以让Celery更好的调度任务
# encoding:utf8
__author__ = 'brianyang'

import add_task

result = add_task.add.apply_async((1,2))
print type(result)
print result.ready()
print result.get()
print result.ready()

输出是

<class 'celery.result.AsyncResult'>
False
3
True

当调用result.get()时,如果还没有返回结果,将会阻塞直到结果返回。这里需要注意的是,如果需要返回worker执行的结果,必须在之前的config中配置CELERY_RESULT_BACKEND这个参数,一般推荐使用Redis来保存执行结果,如果不关心worker执行结果,设置CELERY_IGNORE_RESULT=True就可以了,关闭缓存结果可以提高程序的执行速度。
在上面的测试程序中,如果修改为:

# encoding:utf8
__author__ = 'brianyang'

import add_task

result = add_task.add.(1,2)
print type(result)
print result

输出结果为:

<type 'int'>
3

相当于直接本地调用了add方法,并没有走Celery的调度。
通过flower的dashbord可以方便的监控任务的执行情况:

task list

task detail

还可以对worker进行重启,关闭之类的操作

taks_op

使用Celery将一个集中式的系统拆分为分布式的系统大概步骤就是:

  • 根据功能将耗时的模块拆分出来,通过注解的形式让Celery管理
  • 为拆分的模块设置独立的消息队列
  • 调用者导入需要的模块或方法,使用apply_async进行异步的调用并根据需求关注结果。
  • 根据性能需要可以添加机器或增加worker数量,方便弹性管理。

需要注意的是:

  • 尽量为不同的task分配不同的queue,避免多个功能的请求堆积在同一个queue中。
  • celery -A main worker -l info -Ofair -Q add_queue启动Celery时,可以通过参数Q加queue_name来指定该worker只接受指定queue中的tasks.这样可以使不同的worker各司其职。
  • CELERY_ACKS_LATE可以让你的Celery更加可靠,只有当worker执行完任务后,才会告诉MQ,消息被消费。
  • CELERY_DISABLE_RATE_LIMITS Celery可以对任务消费的速率进行限制,如果你没有这个需求,就关闭掉它吧,有益于会加速你的程序。

tornado-celery

tornado应该是python中最有名的异步非阻塞模型的web框架,它使用的是单进程轮询的方式处理用户请求,通过epoll来关注文件状态的改变,只扫描文件状态符发生变化的FD(文件描述符)。
由于tornado是单进程轮询模型,那么就不适合在接口请求后进行长时间的耗时操作,而是应该接收到请求后,将请求交给背后的worker去干,干完活儿后在通过修改FD告诉tornado我干完了,结果拿走吧。很明显,Celery与tornado很般配,而tornado-celery是celery官方推荐的结合两者的一个模块。
整合两者很容易,首先需要安装:

  • tornado-celery
  • tornado-redis
    tornado代码如下:
# encoding:utf8
__author__ = 'brianyang'

import tcelery
import tornado.gen
import tornado.web

from main import app
import add_task

tcelery.setup_nonblocking_producer(celery_app=app)


class CheckHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self):
        x = int(self.get_argument('x', '0'))
        y = int(self.get_argument('y', '0'))
        response = yield tornado.gen.Task(add_task.add.apply_async, args=[x, y])
        self.write({'results': response.result})
        self.finish


application = tornado.web.Application([
    (r"/add", CheckHandler),
])

if __name__ == "__main__":
    application.listen(8889)
    tornado.ioloop.IOLoop.instance().start()

在浏览器输入:http://127.0.0.1:8889/add?x=1&y=2
结果为:

通过tornado+Celery可以显著的提高系统的吞吐量。

Benchmark

使用Jmeter进行压测,60个进程不间断地的访问服务器:
接口单独访问响应时间一般在200~400ms

  • uwsgi + Flask方案:
    uwsgi关键配置:

    processes       = 10
    threads         = 3

    Flask负责接受并处理请求,压测结果:
    qps是46,吞吐量大概是2700/min

    uwsgi+Flask
  • tornado+Celery方案:
    Celery配置:
    CELERYD_CONCURRENCY = 10也就是10个worker(进程),压测结果:
    qps是139,吞吐量大概是8300/min

    tornado+Celery

    从吞吐量和接口相应时间各方面来看,使用tornado+Celery都能带来更好的性能。

Supervisor

  • 什么是supervisor
    supervisor俗称Linux后台进程管理器
  • 适合场景
    — 需要长期运行程序,除了nohup,我们有更好的supervisor
    — 程序意外挂掉,需要重启,让supervisor来帮忙
    — 远程管理程序,不想登陆服务器,来来来,supervisor提供了高大上的web操作界面.
    之前启动Celery命令是celery -A main worker -l info -Ofair -Q common_check,当你有10台机器的时候,每次更新代码后,都需要登陆服务器,然后更新代码,最后再杀掉Celery进程重启,恶不恶心,简直恶心死了。
    让supervisor来,首先需要安装:
    pip install supervisor
    配置文件示例:
[unix_http_server]
file=/tmp/supervisor.sock   ; path to your socket file
chmod=0777
username=admin
password=admin

[inet_http_server]
port=0.0.0.0:2345
username=admin
password=admin

[supervisord]
logfile=/var/log/supervisord.log ; supervisord log file
logfile_maxbytes=50MB       ; maximum size of logfile before rotation
logfile_backups=10          ; number of backed up logfiles
loglevel=info               ; info, debug, warn, trace
pidfile=/var/run/supervisord.pid ; pidfile location
nodaemon=false              ; run supervisord as a daemon
minfds=1024                 ; number of startup file descriptors
minprocs=200                ; number of process descriptors
user=root                   ; default user
childlogdir=/var/log/            ; where child log files will live

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ; use unix:// schem for a unix sockets.
username=admin
password=admin
[program:celery]
command=celery -A main worker -l info -Ofair

directory=/home/q/celeryTest
user=root
numprocs=1
stdout_logfile=/var/log/worker.log
stderr_logfile=/var/log/worker.log
autostart=true
autorestart=true
startsecs=10

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 10

; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true

; Set Celery priority higher than default (999)
; so, if rabbitmq is supervised, it will start first.
priority=1000

示例文件很长,不要怕,只需要复制下来,改改就可以
比较关键的几个地方是:

[inet_http_server]
port=0.0.0.0:2345
username=admin
password=admin

这个可以让你通过访问http://yourhost:2345 ,验证输入admin/admin的方式远程管理supervisor,效果如下:

remote supervisor

[program:flower]这里就是你要托管给supervisor的程序的一些配置,其中autorestart=true可以在程序崩溃时自动重启进程,不信你用kill试试看。
剩下的部分就是一些日志位置的设置,当前工作目录设置等,so esay~

supervisor优点:

  • 管理进程简单,再也不用nohup & kill了。
  • 再也不用担心程序挂掉了
  • web管理很方便

缺点:

  • web管理虽然方便,但是每个页面只能管理本机的supervisor,如果我有一百台机器,那就需要打开100个管理页面,太麻烦了.

怎么办~

supervisor-easy闪亮登场

通过rpc调用获取配置中的每一个supervisor程序的状态并进行管理,可以分组,分机器进行批量/单个的管理。方便的不要不要的。来两张截图:

  • 分组管理:

    group
  • 分机器管理:

    server

    通过简单的配置,可以方便的进行管理。

Python装饰器的一个例子

测试一下例子基本上就明白了

 

# -*- coding: utf-8 -*-
__author__ = 'liwf'
import logging

from time import time

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
is_debug = True

def count_time(is_debug):
    def  handle_func(func):
        def  handle_args(*args, **kwargs):
            if is_debug:
                begin = time()
                func(*args, **kwargs)
                logging.debug( "[" + func.__name__ + "] -> " + str(time() - begin) )
            else:
                func(*args, **kwargs)
        return handle_args
    return handle_func

def pr():
    for i in range(1,1000000):
        i = i * 2
    print("hello world")

def test():
    pr()

@count_time(is_debug)
def test2():
    pr()

@count_time(False)
def test3():
    pr()

if __name__ == "__main__":
    #test()
    test2()
    test2()
    test2()
    test2()
    test2()

centos7.2 python2.7.5升级到python3.5.2

安装必要的程序
yum install deltarpm
yum install openssl-devel
yum install sqlite-devel (python 中使用sqlite)

安装
0. yum install gcc
1. yum install wget
2. wget https://www.python.org/ftp/python/3.5.2/Python-3.5.2.tgz
3. tar -xf Python-3.5.2.tgz
4. cd Python-3.5.2
5. ./configure
6. make
7. make install

配置
1.mv /usr/bin/python /usr/bin/python2.7.5
2.解决升级后yum不能使用问题
vi /usr/bin/yum
把文件头部的#!/usr/bin/python改成#!/usr/bin/python2.7.5保存退出即可

vi /usr/libexec/urlgrabber-ext-down
把文件头部的#!/usr/bin/python改成#!/usr/bin/python2.7.5保存退出即可

3.将默认python改成3.5.2
ln -s /usr/local/bin/python3.5 /usr/bin/python

4.更新pip版本
rm /usr/bin/pip
ln -s /usr/local/bin/pip3 /usr/bin/pip
5.检测
python -V

常见错误:

make install时:

报错

/usr/bin/install -c -m 644 ./Tools/gdb/libpython.py python-gdb.py
gcc -pthread -c -Wno-unused-result -Wsign-compare -DNDEBUG -g -fwrapv -O3 -Wall -Wstrict-prototypes -std=c99 -Wextra -Wno-unused-result -Wno-unused-parameter -Wno-missing-field-initializers -I. -I./Include -DPy_BUILD_CORE -o Programs/_testembed.o ./Programs/_testembed.c
gcc -pthread -Xlinker -export-dynamic -o Programs/_testembed Programs/_testembed.o libpython3.6m.a -lpthread -ldl -lutil -lm
# Substitution happens here, as the completely-expanded BINDIR
# is not available in configure
sed -e "s,@EXENAME@,/usr/local/bin/python3.6m," < ./Misc/python-config.in >python-config.py
# Replace makefile compat. variable references with shell script compat. ones; ->
LC_ALL=C sed -e 's,\$(\([A-Za-z0-9_]*\)),\$\{\1\},g' < Misc/python-config.sh >python-config
# On Darwin, always use the python version of the script, the shell
# version doesn't use the compiler customizations that are provided
# in python (_osx_support.py).
if test `uname -s` = Darwin; then \
cp python-config.py python-config; \
fi

解决方法:

按提示执行:

>sed -e “s,@EXENAME@,/usr/local/bin/python3.6m,” < ./Misc/python-config.in >python-config.py
>LC_ALL=C sed -e ‘s,\$(\([A-Za-z0-9_]*\)),\$\{\1\},g’ < Misc/python-config.sh >python-config

Python日志

1、通过网络的方式发送日志

import logging

   logging_host = ‘127.0.0.1’
   logging_port = 8888
   logging_add_url = ‘/log/add/’
   def get_logger():
       logger = logging.getLogger()
      
       http_handler = logging.handlers.HTTPHandler(
           ‘%s:%s’ % (logging_host, logging_port),
           logging_add_url,
           method=’POST’
       )
       http_handler.setLevel(logging.ERROR)
       logger.addHandler(http_handler)
      
       return logger

[转]每个 Python 程序员都要知道的日志实践

打印输出不是个好办法

尽管记录日志非常重要,但是并不是所有的开发者都能正确地使用它。我曾看到一些开发者是这样记录日志的,在开发的过程中插入 print 语句,开发结束后再将这些语句移除。就像这样:

print ‘Start reading database’

records = model.read_recrods()

print ‘# records’, records

print ‘Updating record …’

model.update_records(records)

print ‘done’

这种方式对于简单脚本型程序有用,但是如果是复杂的系统,你最好不要使用这样的方式。首先,你没办法做到在日志文件中只留下极其重要的消息。你会看到大量的消息日志。但是你却找不到任何有用的信息。你除了移除这输出语句这外,没别的办法控制代码,但是极有可能的是你忘记了移出那些没用的输出。再者,print 输出的所有信息都到了标准输出中,这将严重影响到你从标准输出中查看其它输出数据。当然,你也可以把消息输出到 stderr ,但是用 print 做日志记录的方式还是不好。

使用 python 的标准日志模块

那么,怎么样记录日志才是正确的呢?其实非常简单,使用 python 的标准日志模块。多亏 python 社区将日志做成了一个标准模块。它非常简单易用且十分灵活。你可以像这样使用日志系统:

import logging

logging.basicConfig(level=logging.INFO)

logger = logging.getLogger(__name__)

 

logger.info(‘Start reading database’)

# read database here

 

records = {‘john’: 55, ‘tom’: 66}

logger.debug(‘Records: %s’, records)

logger.info(‘Updating records …’)

# update records here

 

logger.info(‘Finish updating records’)

运行的时候就可看到:

INFO:__main__:Start reading database

INFO:__main__:Updating records

INFO:__main__:Finish updating records

你可能会问这与使用 print 有什么不同呢。它有以下的优势:

  • 你可以控制消息的级别,过滤掉那些并不重要的消息。
  • 你可决定输出到什么地方,以及怎么输出。

有许多的重要性别级可供选择,debug、info、warning、error 以及 critical。通过赋予 logger 或者 handler 不同的级别,你就可以只输出错误消息到特定的记录文件中,或者在调试时只记录调试信息。让我们把 logger 的级别改成 DEBUG 再看一下输出结果:

logging.basicConfig(level=logging.DEBUG)

输出变成了:

INFO:__main__:Start reading database

DEBUG:__main__:Records: {‘john’: 55, ‘tom’: 66}

INFO:__main__:Updating records

INFO:__main__:Finish updating records

正如看到的那样,我们把 logger 的等级改为 DEBUG 后,调试记录就出现在了输出当中。你也可以选择怎么处理这些消息。例如,你可以使用 FileHandler 把记录写进文件中:

import logging

 

logger = logging.getLogger(__name__)

logger.setLevel(logging.INFO)

 

# create a file handler

 

handler = logging.FileHandler(‘hello.log’)

handler.setLevel(logging.INFO)

 

# create a logging format

 

formatter = logging.Formatter(‘%(asctime)s – %(name)s – %(levelname)s – %(message)s’)

handler.setFormatter(formatter)

 

# add the handlers to the logger

 

logger.addHandler(handler)

 

logger.info(‘Hello baby’)

标准库模块中提供了许多的 handler ,你可以将记录发送到邮箱甚至发送到一个远程的服务器。你也可以实现自己的记录 handler 。这里将不具体讲述实现的细节,你可以参考官方文档:Basci Turial、Advanced Tutorial 与 Logging Cookbook。

以合适的等级输出日志记录

有了灵活的日志记录模块后,你可以按适当的等级将日志记录输出到任何地方然后配置它们。那么你可能会问,什么是合适的等级呢?在这儿我将分享一些我的经验。

大多数的情况下,你都不想阅读日志中的太多细节。因此,只有你在调试过程中才会使用 DEBUG 等级。我只使用 DEBUG 获取详细的调试信息,特别是当数据量很大或者频率很高的时候,比如算法内部每个循环的中间状态。

def complex_algorithm(items):

    for i, item in enumerate(items):

        # do some complex algorithm computation

 

        logger.debug(‘%s iteration, item=%s’, i, item)

在处理请求或者服务器状态变化等日常事务中,我会使用 INFO 等级。

def handle_request(request):

    logger.info(‘Handling request %s’, request)

    # handle request here

 

    result = ‘result’

    logger.info(‘Return result: %s’, result)

 

def start_service():

    logger.info(‘Starting service at port %s …’, port)

    service.start()

    logger.info(‘Service is started’)

当发生很重要的事件,但是并不是错误时,我会使用 WARNING 。比如,当用户登录密码错误时,或者连接变慢时。

def authenticate(user_name, password, ip_address):

    if user_name != USER_NAME and password != PASSWORD:

        logger.warn(‘Login attempt to %s from IP %s’, user_name, ip_address)

        return False

    # do authentication here

有错误发生时肯定会使用 ERROR 等级了。比如抛出异常,IO 操作失败或者连接问题等。

def get_user_by_id(user_id):

    user = db.read_user(user_id)

    if user is None:

        logger.error(‘Cannot find user with user_id=%s’, user_id)

        return user

    return user

我很少使用 CRITICAL 。当一些特别糟糕的事情发生时,你可以使用这个级别来记录。比方说,内存耗尽,磁盘满了或者核危机(希望永远别发生 :S)。

使用 __name__ 作为 logger 的名称

虽然不是非得将 logger 的名称设置为 __name__ ,但是这样做会给我们带来诸多益处。在 python 中,变量 __name__ 的名称就是当前模块的名称。比如,在模块 “foo.bar.my_module” 中调用 logger.getLogger(__name__) 等价于调用logger.getLogger(“foo.bar.my_module”) 。当你需要配置 logger 时,你可以配置到 “foo” 中,这样包 foo 中的所有模块都会使用相同的配置。当你在读日志文件的时候,你就能够明白消息到底来自于哪一个模块。

捕捉异常并使用 traceback 记录它

出问题的时候记录下来是个好习惯,但是如果没有 traceback ,那么它一点儿用也没有。你应该捕获异常并用 traceback 把它们记录下来。比如下面这个例子:

try:

    open(‘/path/to/does/not/exist’, ‘rb’)

except (SystemExit, KeyboardInterrupt):

    raise

except Exception, e:

    logger.error(‘Failed to open file’, exc_info=True)

使用参数 exc_info=true 调用 logger 方法, traceback 会输出到 logger 中。你可以看到下面的结果:

ERROR:__main__:Failed to open file

Traceback (most recent call last):

  File “example.py”, line 6, in <module>

    open(‘/path/to/does/not/exist’, ‘rb’)

IOError: [Errno 2] No such file or directory: ‘/path/to/does/not/exist’

你也可以调用 logger.exception(msg, _args),它等价于 logger.error(msg, exc_info=True, _args)。

千万不要在模块层次获取 Logger,除非 disable_existing_loggers 被设置为 False

你可以看到很多在模块层次获取 logger 的例子(在这篇文章我也使用了很多,但这仅仅为了让示例更短一些)。它们看上去没什么坏处,但事实上,这儿是有陷阱的 – 如果你像这样在模块中使用 Logger,Python 会保留从文件中读入配置前所有创建的所有 logger。

my_module.py

import logging

 

logger = logging.getLogger(__name__)

 

def foo():

    logger.info(‘Hi, foo’)

 

class Bar(object):

    def bar(self):

        logger.info(‘Hi, bar’)

main.py

import logging

 

logger = logging.getLogger(__name__)

 

def foo():

    logger.info(‘Hi, foo’)

 

class Bar(object):

    def bar(self):

        logger.info(‘Hi, bar’)

logging.ini

[loggers]

keys=root

 

[handlers]

keys=consoleHandler

 

[formatters]

keys=simpleFormatter

 

[logger_root]

level=DEBUG

handlers=consoleHandler

 

[handler_consoleHandler]

class=StreamHandler

level=DEBUG

formatter=simpleFormatter

args=(sys.stdout,)

 

[formatter_simpleFormatter]

format=%(asctime)s – %(name)s – %(levelname)s – %(message)s

datefmt=

本应该在日志中看到记录,但是你却什么也没有看到。为什么呢?这就是因为你在模块层次创建了 logger,然后你又在加载日志配置文件之前就导入了模块。logging.fileConfig 与 logging.dictConfig 默认情况下会使得已经存在的 logger 失效。所以,这些配置信息不会应用到你的 Logger 上。你最好只在你需要 logger 的时候才获得它。反正创建或者取得 logger 的成本很低。你可以这样写你的代码:

import logging

 

def foo():

    logger = logging.getLogger(__name__)

    logger.info(‘Hi, foo’)

 

class Bar(object):

    def __init__(self, logger=None):

        self.logger = logger or logging.getLogger(__name__)

 

    def bar(self):

        self.logger.info(‘Hi, bar’)

这样,logger 就会在你加载配置后才会被创建。这样配置信息就可以正常应用。

python2.7 之后的版本中 fileConfg 与 dictConfig 都新添加了 “disable_existing_loggers” 参数,将其设置为 False,上面提到的问题就可以解决了。例如:

import logging

import logging.config

 

logger = logging.getLogger(__name__)

 

# load config from file

 

# logging.config.fileConfig(‘logging.ini’, disable_existing_loggers=False)

 

# or, for dictConfig

 

logging.config.dictConfig({

    ‘version’: 1,              

    ‘disable_existing_loggers’: False,  # this fixes the problem

 

    ‘formatters’: {

        ‘standard’: {

            ‘format’: ‘%(asctime)s [%(levelname)s] %(name)s: %(message)s’

        },

    },

    ‘handlers’: {

        ‘default’: {

            ‘level’:‘INFO’,    

            ‘class’:‘logging.StreamHandler’,

        },  

    },

    ‘loggers’: {

        : {                  

            ‘handlers’: [‘default’],        

            ‘level’: ‘INFO’,  

            ‘propagate’: True  

        }

    }

})

 

logger.info(‘It works!’)

使用 JSON 或者 YAML 记录配置

虽然你可以在 python 代码中配置你的日志系统,但是这样并不够灵活。最好的方法是使用一个配置文件来配置。在 Python2.7 及之后的版本中,你可以从字典中加载 logging 配置。这也就意味着你可以从 JSON 或者 YAML 文件中加载日志的配置。尽管你还能用原来 .ini 文件来配置,但是它既很难读也很难写。下面我给你们看一个用 JSON 和 YAML 文件配置的例子:

logging.json

{

    “version”: 1,

    “disable_existing_loggers”: false,

    “formatters”: {

        “simple”: {

            “format”: “%(asctime)s – %(name)s – %(levelname)s – %(message)s”

        }

    },

 

    “handlers”: {

        “console”: {

            “class”: “logging.StreamHandler”,

            “level”: “DEBUG”,

            “formatter”: “simple”,

            “stream”: “ext://sys.stdout”

        },

 

        “info_file_handler”: {

            “class”: “logging.handlers.RotatingFileHandler”,

            “level”: “INFO”,

            “formatter”: “simple”,

            “filename”: “info.log”,

            “maxBytes”: 10485760,

            “backupCount”: 20,

            “encoding”: “utf8”

        },

 

        “error_file_handler”: {

            “class”: “logging.handlers.RotatingFileHandler”,

            “level”: “ERROR”,

            “formatter”: “simple”,

            “filename”: “errors.log”,

            “maxBytes”: 10485760,

            “backupCount”: 20,

            “encoding”: “utf8”

        }

    },

 

    “loggers”: {

        “my_module”: {

            “level”: “ERROR”,

            “handlers”: [“console”],

            “propagate”: “no”

        }

    },

 

    “root”: {

        “level”: “INFO”,

        “handlers”: [“console”, “info_file_handler”, “error_file_handler”]

    }

}

logging.yaml

version: 1

 

disable_existing_loggers: False

 

formatters:

 

    simple:

 

        format: “%(asctime)s – %(name)s – %(levelname)s – %(message)s”

 

handlers:

 

    console:

 

        class: logging.StreamHandler

 

        level: DEBUG

 

        formatter: simple

 

        stream: ext://sys.stdout

 

    info_file_handler:

 

        class: logging.handlers.RotatingFileHandler

 

        level: INFO            

 

        formatter: simple

 

        filename: info.log

 

        maxBytes: 10485760 # 10MB

 

        backupCount: 20

 

        encoding: utf8

 

    error_file_handler:

 

        class: logging.handlers.RotatingFileHandler

 

        level: ERROR            

 

        formatter: simple

 

        filename: errors.log

 

        maxBytes: 10485760 # 10MB

 

        backupCount: 20

 

        encoding: utf8

 

loggers:

 

    my_module:

 

        level: ERROR

 

        handlers: [console]

 

        propagate: no

 

root:

 

    level: INFO

 

    handlers: [console, info_file_handler, error_file_handler]

 

接下来将展示怎样从 JSON 文件中读入日志的配置信息:

import json

import logging.config

 

def setup_logging(

    default_path=‘logging.json’,

    default_level=logging.INFO,

    env_key=‘LOG_CFG’

):

    “””Setup logging configuration

 

“””

    path = default_path

    value = os.getenv(env_key, None)

    if value:

        path = value

    if os.path.exists(path):

        with open(path, ‘rt’) as f:

            config = json.load(f)

        logging.config.dictConfig(config)

    else:

        logging.basicConfig(level=default_level)

使用 JSON 的一个优点就是 json是一个标准库,你不需要额外安装它。但是从我个人来说,我比较喜欢 YAML 一些。它无论是读起来还是写起来都比较容易。你也可以使用下面的方法来加载一个 YAML 配置文件:

import os

import logging.config

 

import yaml

 

def setup_logging(

    default_path=‘logging.yaml’,

    default_level=logging.INFO,

    env_key=‘LOG_CFG’

):

    “””Setup logging configuration

 

“””

    path = default_path

    value = os.getenv(env_key, None)

    if value:

        path = value

    if os.path.exists(path):

        with open(path, ‘rt’) as f:

            config = yaml.load(f.read())

        logging.config.dictConfig(config)

    else:

        lo

接下来,你就可以在运行程序的时候调用 setup_logging 来启动日志记录了。它默认会读取 logging.json 或 logging.yaml 文件 。你也可以设置环境变量 LOG_CCFG 从指定路径加载日志配置。例如:

LOG_CFG=my_logging.json python my_server.py

如果你喜欢 YAML:

LOG_CFG=my_logging.yaml python my_server.py

使用旋转文件句柄

如果你用 FileHandler 写日志,文件的大小会随着时间推移而不断增大。最终有一天它会占满你所有的磁盘空间。为了避免这种情况出现,你可以在你的生成环境中使用 RotatingFileHandler 替代 FileHandler。

如果你有多个服务器可以启用一个专用的日志服务器

当你有多个服务器和不同的日志文件时,你可以创建一个集中式的日志系统来收集重要的(大多数情况是警告或者错误消息)信息。然后通过监测这些日志信息,你就可以很容易地发现系统中的问题了。

总结

Python 的日志库设计得如此之好,真是让人欣慰,我觉得这是标准库中最好的一部分了,你不得不选择它。它很灵活,你可以用你自己的 handler 或者 filter。已经有很多的第三方的 handler 了,比如 pyzmq 提供的 ZeroMQ 日志句柄,它允许你通过 zmq 套接字发送日志消息。如果你还不知道怎么正确的使用日志系统,这篇文章将会非常有用。有了很好的日志记录实践,你就能非常容易地发现系统中的问题。这是很非常值得投资的。:)

Android Apk实时写入信息用于下载渠道统计

场景1:为了能识别当前打开的应用是从哪个渠道下载的,app应用打开使用时需要给服务器上传渠道信息,为了实现该功能:
方案1:不同的渠道可能需要打不同的apk包(包含渠道信息)
缺点:如果渠道多的话不容易维护,而且发布不同的包工作量有点大
方案2:实时打包apk,将渠道信息写入apk

场景2:通过不同的营销页面下载apk后,当用户打开apk的时候想让用户首先看到他下载时的营销页面以便继续下一步的操作(领取红包等操作),使用户从看到营销页,到下载app、打开app继续操作过程平滑过渡,不产生割裂感

实现方式:
下载服务实时修改apk信息,如架设服务http://test.net/download_apk?src=渠道1,其中的src值可以随便设定,服务程序将src值写入到apk中供用户下载,apk中对src值进行处理即可。

源码示例:demo_Android Apk实时写入信息用于下载渠道统计.zip
动态写入Apk.docx – 原理
源码 – Android apk信息读取源码、java写入信息源码
readtest.apk-(Android apk信息读取源码)生成程序
writetest.apk-(java写入信息源码)生成程序,用于参考写java服务
writetest.py  –  用python写的信息写入程序,用户参考写python服务

后续假设在线测试demo…

几种常见sqlalchemy查询

几种常见sqlalchemy查询:

    #简单查询
    print(session.query(User).all())
    print(session.query(User.name, User.fullname).all())
    print(session.query(User, User.name).all())
    
    #带条件查询
    print(session.query(User).filter_by(name='user1').all())
    print(session.query(User).filter(User.name == "user").all())
    print(session.query(User).filter(User.name.like("user%")).all())
    
    #多条件查询
    print(session.query(User).filter(and_(User.name.like("user%"), User.fullname.like("first%"))).all())
    print(session.query(User).filter(or_(User.name.like("user%"), User.password != None)).all())
    
    #sql过滤
    print(session.query(User).filter("id>:id").params(id=1).all())
    
    #关联查询 
    print(session.query(User, Address).filter(User.id == Address.user_id).all())
    print(session.query(User).join(User.addresses).all())
    print(session.query(User).outerjoin(User.addresses).all())
    
    #聚合查询
    print(session.query(User.name, func.count('*').label("user_count")).group_by(User.name).all())
    print(session.query(User.name, func.sum(User.id).label("user_id_sum")).group_by(User.name).all())
    
    #子查询
    stmt = session.query(Address.user_id, func.count('*').label("address_count")).group_by(Address.user_id).subquery()
    print(session.query(User, stmt.c.address_count).outerjoin((stmt, User.id == stmt.c.user_id)).order_by(User.id).all())
    
    #exists
    print(session.query(User).filter(exists().where(Address.user_id == User.id)))
    print(session.query(User).filter(User.addresses.any()))

限制返回字段查询

person = session.query(Person.name, Person.created_at,                     
             Person.updated_at).filter_by(name="zhongwei").order_by(            
             Person.created_at).first()

记录总数查询:

from sqlalchemy import func

# count User records, without
# using a subquery.
session.query(func.count(User.id))

# return count of user "id" grouped
# by "name"
session.query(func.count(User.id)).\
        group_by(User.name)

from sqlalchemy import distinct

# count distinct "name" values
session.query(func.count(distinct(User.name)))