SteemData Notify 代码学习一: Blockchain Worker / Code Study of SteemData Notify: Part one

in #cn7 years ago

话说学习编码最好的方式就是读优秀的代码和写代码。
尤其是读优秀的代码,既然自己写的代码很垃圾,多读读人家大牛们的优秀作品,受一下熏陶,沾惹点仙灵之气也好。俗话说:读书破万卷,下笔如有神!,俗话还说:熟读唐诗三百首,不会作诗也会吟!,那我把大牛们的代码读几遍,是不是也会写出牛光闪闪的代码呢?

又扯远了,言归正传,昨天给大家介绍了一下SteemData Notify,觉得是个挺有意思的产品:

今天呢,就来学习一下它的代码,看看具体是如何实现相关功能的。

代码功能模块

代码托管在github上,地址是: https://github.com/SteemData/notify.steemdata.com

通过简单分析,可以知晓代码分为应用(网站)端和后端

  • 应用(网站)端 src/app.py
  • 后端 src/worker.py

应用端: 使用Python + Flask + MongoDB
后端则主要使用: Python + Steem官方Python库 + MongoDB

我们这节着重分析后端的代码

后端代码

源码在这里
https://github.com/SteemData/notify.steemdata.com/blob/master/src/worker.py

通过阅读main函数
我们可以知道代码分成三大逻辑块

  • blockchain worker
  • confirmation worker
  • notifier worker

顾名思义,分别区块链工作进程、确认进程、通知进程, 分别通过不同的命令行参数启动。
至于为何不使用线程?以我实际经验,官方的Python 库对线程不是特别友好,尤其是一些复杂情况,可能导致很多意想不到的问题,所以多跑俩进程貌似也没啥不好的。或许作者有别的方面的思量,就不得而知了。

这篇文章我们主要学习三者之一的Blockchain Worker

Blockchain Worker

函数名: run_blockchain_worker()

    b = Blockchain()
    types = [
        'account_update',
        'change_recovery_account',
        'request_account_recovery',
        'transfer',
        'transfer_from_savings',
        'set_withdraw_vesting_route',
        'withdraw_vesting',
        'fill_order',
        'fill_convert_request',
        'fill_transfer_from_savings',
        'fill_vesting_withdraw',
]

b = Blockchain() 定义了Blockchain 实例
其本质呢,就是封装了一些对steem 节点的API操作
其中一个基本的操作是:
stream(self, filter_by: Union[str, list] = list(), *args, **kwargs)
简单的解释就是把steem区块链上发生的操作yield成操作流,然后可以针对不同的操作类型进行处理,比如说投票机器人就是Yield出来文章流,然后投票。

types = [ xxxx]
这一大堆,就是上述函数中的filter_by参数,用来过滤抽取我们需要的相关操作,忽略掉无关内容

有了上述对Blockchain以及filter_by的讲解,下面的代码就很好理解了

    try:
        block = db.last_processed_block.find_one()
        start_block = int(block['block_num']) - 1
    except Exception:
        start_block = None
    for op in b.stream(filter_by=types, start_block=start_block):
        processed = db.processed_blockchains.find({'_id': op['_id']}).count()
        if not processed:
            if parse_blockchain(op):
                db.processed_blockchains.insert_one(op)
                db.last_processed_block.delete_many({})
                db.last_processed_block.insert_one(op)

大致就是,把我们关心的操作读取回来,并且写到数据库中。

processed = db.processed_blockchains.find({'_id': op['_id']}).count()
判断我们是否已经处理过了,如果已经处理过,略过。

parse_blockchain(op)
这个判断是否是我们关心的数据,稍后详细讲

db.processed_blockchains.insert_one(op)
将op 插入我们处理过的op列表(mongodb)

db.last_processed_block.delete_many({})
db.last_processed_block.insert_one(op)
其实就是保存一下我们处理到哪里了,这样一旦因故障等问题中断,我们还可以接续上。

parse_blockchain(op)

前文我们说过,这个来获取我们关心的操作,这个是如何实现的呢?

def parse_blockchain(op):
    settings = None
    message = None

    if op['type'] == 'account_update':
        settings = find_user_settings(op['account'])
        if settings and settings['account_update']:
            message = 'Received event: account_update (%s)' % op['account']

    elif op['type'] in ['transfer', 'transfer_from_savings']:
        settings = find_user_settings(op['from'])
        if settings and settings[op['type']]:
            message = 'Received event: %s\nEvent detail: %s -> %s (%s)' % (
                op['type'], op['from'], op['to'], op['amount'],
            )
....

    if settings and message:
        db.notifications.insert_one({
            'username': settings['username'],
            'email': settings['email'],
            'telegram_channel_id': settings['telegram_channel_id'],
            'message': message,
            'email_sent': False,
            'telegram_sent': False,
            'created_at': datetime.utcnow(),
        })

    return True

嗯,我好像发现了一个了不起的BUG,一会再说。
这里又出来一个函数

def find_user_settings(username):
    try:
        rows = db.settings.find({'username': username, 'confirmed': True}).sort('created_at', -1)
        return rows[0]
    except Exception:
        return dict()

这个函数的功能,就是按用户名,读取用户(已确认)的设置并返回。

    if op['type'] == 'account_update':
        settings = find_user_settings(op['account'])
        if settings and settings['account_update']:
            message = 'Received event: account_update (%s)' % op['account']

这样,这段代码就好理解了,按白话文翻译就是
如果操作是更新账户,
那么我就去设置表读取这个用户的设置
如果存在设置表中有这个用户的设置 并且 这个用户设置了关心 'account_update'
那么通知信息就是吧啦啦啦
是不是很好理解

因为操作只能是这些操作中的一种,所以要逐一判断一下,直到发现或者判断完毕没有发现为止。

    if settings and message:
        db.notifications.insert_one({
            'username': settings['username'],
            'email': settings['email'],
            'telegram_channel_id': settings['telegram_channel_id'],
            'message': message,
            'email_sent': False,
            'telegram_sent': False,
            'created_at': datetime.utcnow(),
        })

    return True

如果发现了需要处理的操作,将其插入notifications 表。

BUG,BUG
注意这个 return True, 注意缩进,OMG,少缩进了一个TAB有木有?
这样,原本只有我们处理到的op 才应该返回True, 结果统统返回True了。

你问我后果是啥?貌似没啥后果,浪费一些数据库空间和CPU运算能力而已。

总结

  • Blockchain Worker 将blockchain上和账户有关的动态抓取进来
  • 判断是否是SteemData Notify 注册(并确认)用户相关的操作以及是否是用户关心的操作
  • 如果是,写入数据库通知表notifications

咦,一总结好像也没啥内容呢,那我为啥写了这么多呢?晕,这啰嗦的毛病要改一改了。
初学者水平有限,如有谬误敬请指正,深表谢意啦。


感谢阅读 / Thank you for reading.
欢迎upvote、resteem以及 following me @oflyhigh 😎

Sort:  

你的大作被抄袭了。还把你的名字删掉
https://steemit.com/cn/@herman2141/steemdata-comfirmation-201779t193816124z

Thanks for all of this good info

This comment has received a 0.15 % upvote from @booster thanks to: @hamzaoui.

Data is just the raw material but it become diamond after mining it.

Welcome the worlds best community !!!!

From now on you are on the green path to implement your kindness to everyone, hope you include the lucky people here.

Do not hesitate to share interesting and useful things. Good luck!!!

Greetings from me ** @abdelilahelkati **

Good job, keep it up

Coin Marketplace

STEEM 0.28
TRX 0.13
JST 0.033
BTC 61511.66
ETH 2989.02
USDT 1.00
SBD 3.63