我只是怕某天死了,我的生命却一无所有。—-《奇幻之旅》
写在前面
有这样一个需求
以文件的方式定期给集团同步增量数据,我想把所有的静态数据抽离出来,通过配置文件的方式
需求比较简单,所以用选择pythoh
配置文件用yaml
,写了一个小模块
实现配置文件
读入内存为配置字典
实现配置文件
的动态加载
读入内存为配置字典
实现配置字典
由内存导出静态文件
理解错误的地方请小伙伴批评指正
我只是怕某天死了,我的生命却一无所有。—-《奇幻之旅》
这里需要说明的是,常说的动态加载配置
,一般基于观察者设计模式
实现的发布/订阅系统
,一般有两种模式,分别是推(Push)
模式和拉(Pull)
模式。
推模式
:服务端主动将数据更新发送给所有订阅的客户端,
拉模式
:由客户端主动发起请求来获取最新数据,通常客户端都采用定时进行轮询拉取的方式。
我们这里只是提供了一个可以动态加载配置文件刷新配置对象
的方法,把配置对象
定义为单例
,刷新的时候把当前存在的配置对象干掉,然后从新加载配置文件生成新的配置对象。即通过拉(Pull)
的方式实现。这里配置对象为主题
,使用配置对象的多个代码为观察者
。
先来看一下脚本yaml_util.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 """ @File : yaml_util.py @Time : 2022/03/22 14:10:46 @Author : Li Ruilong @Version : 1.0 @Contact : 1224965096@qq.com @Desc : 加载配置文件 pip install pyyaml """ import osimport timeimport yamlimport loggingimport jsonlogging.basicConfig(level=logging.DEBUG, format ='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s' ) class Yaml : _config = None def __new__ (cls, *args, **kw ): if not hasattr (cls, '_instance' ): cls._instance = object .__new__(cls) return cls._instance def __init__ (self, file_name="config.yaml" ): config_temp = None try : cur_path = os.path.dirname(os.path.realpath(__file__)) yaml_path = os.path.join(cur_path, file_name) f = open (yaml_path, 'r' , encoding='utf-8' ) config_temp = f.read() except Exception as e: logging.info("配置文件加载失败" , e) finally : f.close() self._config = yaml.safe_load(config_temp) def __str__ (self ): return json.dumps(self._config) def __del__ (self ): self._config = None self = None @staticmethod def get_config (file_name="config.yaml" ): return Yaml(file_name)._config @staticmethod def refresh_config (cls, file_name="config.yaml" ): del cls return Yaml(file_name)._config def set_config (contain, file_name="config_.yaml" ): cur_path = os.path.dirname(os.path.realpath(__file__)) yaml_path = os.path.join(cur_path, file_name) with open (yaml_path, 'w' , encoding='utf-8' ) as f: yaml.dump(contain, f) def get_yaml_config (file_name="config.yaml" ): return Yaml.get_config(file_name) def refresh_yaml_config (cls, file_name="config.yaml" ): return Yaml.refresh_config(cls,file_name) if __name__ == '__main__' : my_yaml_1 = Yaml() my_yaml_2 = Yaml() print (id (my_yaml_1) == id (my_yaml_2)) time.sleep(10 ) refresh_yaml_config(my_yaml_1)
上面是写好加载配置类模块,下面为定义的配置文件
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 mysql: db_host: "127.0.0.1" db_port: 3306 db_user: "root" db_password: "root" db_name: "uamdb" ssh: ssh_hostname: "192.168.26.55" ssh_username: "root" ssh_password: 'redhat' ssh_port: 22 file_name_template: "UAG07{0}{1}.txt" local_file_path: "./uam/sftp/" remote_path: "/app/sftp/ftpuam07/increment/" period: date: YEAR offset: 1 template: - line: "{unified_code}^{unified_type}^{unified_pwd}^{cust_codes}^Pwd007^07^{unified_state}" sql: "SELECT a.unified_code as unified_code ,a.unified_type as unified_type ,a.unified_pwd as unified_pwd \ ,a.unified_state as unified_state , b.cust_code as cust_codes FROM AU_UNIFIED a INNER JOIN AU_PRODUCT b \ ON a.unified_code = b.product_code AND a.chg_date > SUBDATE( NOW( ), INTERVAL {0} {1} )" - line: "{product_code}^{prod_type}^{product_pwd}^{cust_code}^{pwd_type}^07^St001" sql: "SELECT product_code, product_pwd,cust_code,prod_type, \ (CASE WHEN a.prod_type='2000004' then 'Pwd001' WHEN a.prod_type='2000001' then 'Pwd001' WHEN a.prod_type='2000002' then 'Pwd001' \ WHEN a.prod_type='2110008' then 'Pwd001' WHEN a.prod_type='2110011' then 'Pwd001' ELSE 'Pwd001' END) as pwd_type \ from AU_PRODUCT a WHERE a.chg_date > SUBDATE( NOW( ), INTERVAL {0} {1} )"
如何使用
下面为一个数据库连接池工具类的使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 ..... import yaml_utilfrom dbutils.pooled_db import PooledDBclass MysqlPool : def __init__ (self ): mysql = yaml_util.get_config()["mysql" ] print (mysql) self.POOL = PooledDB( creator=pymysql, maxconnections=6 , mincached=2 , maxcached=5 , maxshared=3 , blocking=True , maxusage=None , setsession=[], ping=0 , host=mysql["db_host" ], port=mysql["db_port" ], user=mysql["db_user" ], password=mysql["db_password" ], database=mysql["db_name" ], charset='utf8' ) ...................
关于如何触发刷新配置文件
方法
我们这里修改完配置文件通过UI界面
主动调用函数加载。其他项目场景个人觉得可以通过心跳
或者探针
的机制传递文件摘要信息串
(通过MD5
,SHA
等信息摘要算法生成)进行比对,具体的手段可以通过类似脏值轮询检查
或者数据劫持
等方式
关于观察者设计模式
,是一个很常用的设计模式
基于MVVM
模式的前端框架中,双向数据绑定特性,如Vue.js
都是基于此,在系统运行过程中,一旦系统中的数据模型发生了变化,观察者 Observer
的setter
访问器属性就会被触发,此时消息订阅中心会遍历它所维护的所有订阅者,对于每一个订阅了该数据的对象,向它发出一个更新通知
,订阅者收到通知后就会对视图进行相应的更新。以上过程不断往复循环,这就是MVVM
模式在Vue.js
中的运行原理。
后端分布式一致性解决方案
中,使用ZooKeeper
做配置中心
时,也是基于观察者模式
,采用的是推拉相结合
的方式,客户端向服务端注册自己需要关注的节点,一旦该节点的数据发生变更,那么服务端就会向相应的客户端发送Watcher
事件通知,客户端接收到这个消息通知之后,需要主动到服务端获取最新的数据。将配置信息存放到ZooKeeper
上进行集中管理,应用在启动的时候都会主动到ZooKeeper
服务端上进行一次配置信息的获取,同时,在指定节点上注册一个Watcher
监听,这样一来,但凡配置信息发生变更,服务端都会实时通知到所有订阅的客户端,从而达到实时获取最新配置信息的目的。
包括JDK
很早的版本就有观察者模式
的实现,java.util
包内包含最基本的Observer接口
与Observable类
,不过主题类(Observable)
定义成了一个基本类不是接口,所以只能通过继承
的方式实现,考虑的继承的可维护性太差
,一般不怎么使用。
关于单例模式
单例模式是很常用的一种设计模式,即在整个生命周期中,对于该一个类生产的对象始终都是一个,不曾变化。保证了一个类仅有一个实例,并提供一个访问它的全局访问点。
单例的优点有很多,GOF中这样描述: 对唯一实例
的受控访问,缩小名空间
,Singleton模式
是对全局变量的一种改进。它避免了那些存储唯一实例的全局变量污染名空间。允许对操作和表示的精化,Singleton类可以有子类,而且用这个扩展类的实例来配置一个应用是很容易的。你可以用你所需要的类的实例在运行时刻配置应用等等,感兴趣小伙伴可以去看看《设计模式_可复用面向对象软件的基础》