雪花算法认知(Twitter_Snowflake)

99%的焦虑都来自于虚度时间和没有好好做事,所以唯一的解决办法就是行动起来,认真做完事情,战胜焦虑,战胜那些心里空荡荡的时刻,而不是选择逃避。不要站在原地想象困难,行动永远是改变现状的最佳方式

写在前面


  • 生产中一次用雪花算法生产的编码主键冲突了,所以研究一下
  • 博文内容为雪花算法简单认知以及生产问题解决,以及一些开源方案UidGeneratorLeaf学习
  • 理解不足小伙伴帮忙指正 :),生活加油

99%的焦虑都来自于虚度时间和没有好好做事,所以唯一的解决办法就是行动起来,认真做完事情,战胜焦虑,战胜那些心里空荡荡的时刻,而不是选择逃避。不要站在原地想象困难,行动永远是改变现状的最佳方式

持续分享技术干货,感兴趣小伙伴可以关注下 ^_^


雪花算法认知

雪花算法(Snowflake Algorithm)是一种分布式 ID 生成算法,最早由 Twitter 提出,用于生成全局唯一的 ID。该算法通过组合时间戳、机器ID、数据中心ID、序列号等信息,能够在分布式环境下高效地生成唯一的 ID。

https://github.com/twitter-archive/snowflake/releases/tag/snowflake-2010

雪花算法的结构:

雪花算法生成的 ID 通常由 64 位二进制数组成,结构如下:| 1 位符号位 | 41 位时间戳(毫秒级别) | 10 位机器ID | 12 位序列号 | 1 位保留位 |

符号位:通常固定为 0,因为 ID 是正数。
时间戳部分(41 位):以毫秒为单位,通常用来表示自某一时刻(通常是一个固定的时间戳,如 1970-01-01)以来的毫秒数。41 位能够表示大约 69 年的时间跨度。
机器 ID 部分(10 位):通常由 5 位数据中心 ID 和 5 位机器 ID 组成,用来标识不同的机器和数据中心。
序列号部分(12 位):用于在同一毫秒内生成多个不同的 ID,表示同一机器在同一毫秒内生成的序列号。

关于基本算法代码这里不多讲,网上有很多实现,先直接看一下问题,项目中有一些编码是通过雪花算法生成,但是在持久化的时候,存在主键冲突的情况,因为没有那么高的并发,所以很奇怪。算法是之前的同事网上找的代码直接拿来即用的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
package com.ruoyi.common.ext.sole;

/**
* Twitter_Snowflake<br>
* SnowFlake的结构如下(每部分用-分开):<br>
* 0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000 <br>
* 1位标识,由于long基本类型在Java中是带符号的,最高位是符号位,正数是0,负数是1,所以id一般是正数,最高位是0<br>
* 41位时间截(毫秒级),注意,41位时间截不是存储当前时间的时间截,而是存储时间截的差值(当前时间截 - 开始时间截)
* 得到的值),这里的的开始时间截,一般是我们的id生成器开始使用的时间,由我们程序来指定的(如下下面程序IdWorker类的startTime属性)。41位的时间截,可以使用69年,年T = (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69<br>
* 10位的数据机器位,可以部署在1024个节点,包括5位datacenterId和5位workerId<br>
* 12位序列,毫秒内的计数,12位的计数顺序号支持每个节点每毫秒(同一机器,同一时间截)产生4096个ID序号<br>
* 加起来刚好64位,为一个Long型。<br>
* SnowFlake的优点是,整体上按照时间自增排序,并且整个分布式系统内不会产生ID碰撞(由数据中心ID和机器ID作区分),并且效率较高,经测试,SnowFlake每秒能够产生26万ID左右。
*/

import com.ruoyi.common.utils.ip.IpUtils;
import java.net.InetAddress;
import java.net.UnknownHostException;

/**
* 生成永不重复业务订单号(策略2)
*/
public class SnowFlake {

// ==============================Fields===========================================
/**
* 开始时间截 (2018-07-03)
*/

private final long twepoch = 1530607760000L;

/**
* 机器id所占的位数
*/
private final long workerIdBits = 5L;

/**
* 数据标识id所占的位数
*/
private final long datacenterIdBits = 5L;

/**
* 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
*/
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

/**
* 支持的最大数据标识id,结果是31
*/
private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);

/**
* 序列在id中占的位数
*/
private final long sequenceBits = 12L;

/**
* 机器ID向左移12位
*/
private final long workerIdShift = sequenceBits;

/**
* 数据标识id向左移17位(12+5)
*/
private final long datacenterIdShift = sequenceBits + workerIdBits;

/**
* 时间截向左移22位(5+5+12)
*/
private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;

/**
* 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095)
*/
private final long sequenceMask = -1L ^ (-1L << sequenceBits);

/**
* 工作机器ID(0~31)
*/
private long workerId;

/**
* 数据中心ID(0~31)
*/
private long datacenterId;

/**
* 毫秒内序列(0~4095)
*/
private long sequence = 0L;

/**
* 上次生成ID的时间截
*/
private long lastTimestamp = -1L;

//==============================Constructors=====================================

private static String ipAddress;

/* 工作机器ID ,对应每个 Pod */
private static long worker;

private static long datacenter;


static {
String ipAddres;
try {
ipAddres = IpUtils.getHostIp();
} catch (Exception e) {
e.printStackTrace();
ipAddres = "168.254.1.1";
}
ipAddress = ipAddres;
// 工作机器ID 分布式部署有几个pod % 几
worker = Math.abs(ipToInt(ipAddres)) % 3 ;
// 数据中心ID 分布式部署,对应 集群个数 或者 zone 个数,没有见就直接 % 最大
datacenter = Math.abs(ipToInt(ipAddres)) % 31 ;
}

/**
* 构造函数
*
* @param workerId 工作ID (0~31)
* @param datacenterId 数据中心ID (0~31)
*/
public SnowFlake(long workerId, long datacenterId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
if (datacenterId > maxDatacenterId || datacenterId < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
}
this.workerId = workerId;
this.datacenterId = datacenterId;

}

private static class Singtetons{
private static final SnowFlake SINGLETON = new SnowFlake(worker,datacenter);
}

public static SnowFlake getInstance(){
return Singtetons.SINGLETON;
}

public static void show() {
System.out.println("===================== ipAddress:" + ipAddress);
System.out.println("===================== worker: "+ worker+ " datacenter: " + datacenter);
}

// ==============================Methods==========================================

/**
* 获得下一个ID (该方法是线程安全的)
*
* @return SnowflakeId
*/
public synchronized long nextId() {
long timestamp = timeGen();

//如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
if (timestamp < lastTimestamp) {
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}

//如果是同一时间生成的,则进行毫秒内序列
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
//毫秒内序列溢出
if (sequence == 0) {
//阻塞到下一个毫秒,获得新的时间戳
timestamp = tilNextMillis(lastTimestamp);
}
}
//时间戳改变,毫秒内序列重置
else {
sequence = 0L;
}

//上次生成ID的时间截
lastTimestamp = timestamp;

//移位并通过或运算拼到一起组成64位的ID
return (((timestamp - twepoch) << timestampLeftShift) | (datacenterId << datacenterIdShift) | (workerId << workerIdShift) | sequence);
}

/**
* 阻塞到下一个毫秒,直到获得新的时间戳
*
* @param lastTimestamp 上次生成ID的时间截
* @return 当前时间戳
*/
protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}

/**
* 返回以毫秒为单位的当前时间
*
* @return 当前时间(毫秒)
*/
protected long timeGen() {
return System.currentTimeMillis();
}

private static int ipToInt(String ip) {
String[] ipParts = ip.split("\\.");
int ipInt = 0;
for (int i = 0; i < 4; i++) {
ipInt |= Integer.parseInt(ipParts[i]) << (24 - (8 * i));
}
return ipInt;
}

/**
* serialCode:业务code
*/
public static String No(String serialCode) {
/* 解决分布式主键冲突的问题
* */
//long randomNo = new SnowFlake(worker, datacenter).nextId();
long randomNo = SnowFlake.getInstance().nextId();
return serialCode + randomNo;
}

public static String No(String serialCode,Long workerId,Long datacenterId) {
long randomNo = new SnowFlake(workerId, datacenterId).nextId();
return serialCode + randomNo;
}

public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
String id = SnowFlake.No("UD");
System.out.println(id);
}
}
}

我最开始以为是 雪花算法机器位的问题,因为之前使用中默认工作节点位和数据中心位都是 0 ,多个 Pod 都是一样的,项目在 K8s 上,通过无状态控制器分布式部署

工作机器ID和数据中心ID 如何确定

所以这里的解决办法是通过 IP 地址计算出 机器位和数据中心位,代码如下,取 Pod 对应的第一个 IP 地址,计算方式通过静态代码块的方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private static String ipAddress;
/* 工作机器ID ,对应每个 Pod */
private static long worker;
private static long datacenter;
static {
String ipAddres = "168.254.1.1";
try {
ipAddres = IpUtils.getHostIp();
} catch (Exception e) {
e.printStackTrace();
}
ipAddress = ipAddres;
// 工作机器ID 分布式部署有几个pod % 几
worker = Math.abs(ipToInt(ipAddres)) % 3 ;
// 数据中心ID 分布式部署,对应 集群个数 或者 zone 个数,没有见就直接 % 最大
datacenter = Math.abs(ipToInt(ipAddres)) % 31 ;
}

private static int ipToInt(String ip) {
String[] ipParts = ip.split("\\.");
int ipInt = 0;
for (int i = 0; i < 4; i++) {
ipInt |= Integer.parseInt(ipParts[i]) << (24 - (8 * i));
}
return ipInt;
}

工作机器ID 的计算方式,用 IP 地址的 32位整数 和 实际的 Pod 个数取模

假设有三个 Pod

1
worker = Math.abs(ipToInt(ipAddres)) % 3 ;

数据中心ID 如果集群有多个或者做了 zone 规划,那么可以模实际的集群或者区域个数,如果没有,可以直接模数据中心ID最大值

1
datacenter = Math.abs(ipToInt(ipAddres)) % 31 ;

这样可以有效的避免生成ID 重复的问题,减少不同 POD 中机器位一致的情况,但是不能百分百保证,当然有更好的解决办法,即通过配置表来维护,需要数据库支持,或者利用类似注册中心的机制,比如 zk 来动态分配,我们下文讲到的一些开源方案使用这种方式。实际选择需要综合考虑

在实际的生产中,通过上面的方式配置之后,还是有冲突的情况,后来详细看了代码才发现,在生成唯一ID的方式中,每次都生成一个ID都会创建一个实例,通过 for 循环测试, nextId() 就失去了意义,好多重复的ID

SnowFlake 需要是单例

如果 SnowFlake 不是单例,同一毫秒内会生成相同的实例对象,sequence 的初始值都为 0,所以每次会生成相同的ID,下面为原来有问题的代码

1
2
3
4
5
6
7
public static String No(String serialCode) {
/* 解决分布式主键冲突的问题
* */
long randomNo = new SnowFlake(worker, datacenter).nextId();
//long randomNo = SnowFlake.getInstance().nextId();
return serialCode + randomNo;
}

解决办法,用静态内部类做成单例,这里可以解决这个问题

1
2
3
4
5
6
7
8
9
10
11
12
13
private  static class Singtetons{
private static final SnowFlake SINGLETON = new SnowFlake(worker,datacenter);
}
public static SnowFlake getInstance(){
return Singtetons.SINGLETON;
}
public static String No(String serialCode) {
/* 解决分布式主键冲突的问题
* */
long randomNo = SnowFlake.getInstance().nextId();
return serialCode + randomNo;
}

当然还有一种解决办法,是纳管到,Spring Bean 的生命周期里面,通过实现 InitializingBean 接口来初始化对象,默认情况下是单例

时间回拨和 NTP

关于时间回拨和 NTP 不一致的问题,雪花算法内部有一个时间戳比较的逻辑,用于考虑时间回拨,存在类似的问题,直接抛出异常,应为没有缓存,所以只适用单个进程内的校验,但是如果 NTP 不同步,分布式部署,机器位一样的话,还是会造成时间回拨的问题

这里的解决办法是类似一些开源的解决办法,配合数据库解决,或者基于缓冲做校验,不过换一种角度考虑,当基于时间戳来生成全局ID ,是建立在 NTP 没有问题的情况,类比集群选举,解决问题不能推翻在已经建立问题的基础,比如想解决房子结构不稳定坍塌的问题,就不应该从地基角度考虑

开源方案介绍

UidGenerator

UidGenerator 是Java实现的百度开源的解决方案, 基于Snowflake算法的唯一ID生成器。UidGenerator以组件形式工作在应用项目中, 支持自定义workerId位数和初始化策略, 从而适用于docker等虚拟化环境下实例自动重启、漂移等场景。 在实现上, UidGenerator通过借用未来时间来解决sequence天然存在的并发限制; 采用RingBuffer来缓存已生成的UID, 并行化UID的生产和消费, 同时对CacheLine补齐,避免了由RingBuffer带来的硬件级「伪共享」问题. 最终单机QPS可达600万。

项目地址:

https://github.com/baidu/uid-generator/blob/master/README.zh_cn.md

依赖版本:Java8及以上版本, MySQL(内置WorkerID分配器, 启动阶段通过DB进行分配; 如自定义实现, 则DB非必选依赖)

这里的缓存行填充,避免伪共享,是一种常见的通过编程实现 CPU 性能优化的手段,也叫数据对齐,即确保数据结构中的元素按照缓存行大小对齐,以减少伪共享(False Sharing)

在并发编程中。当多个线程或进程在不同的CPU核心访问不同的数据项,但这些数据项恰好位于同一个缓存行(Cache Line)中时,就会发生伪共享。即一个 64字节 的缓存行包含多个数据时

为了确保所有处理器看到的数据是一致的,每个处理器都有自己的缓存,并且存在一种机制(缓存一致性协议(Cache Coherence Protocol))来同步这些缓存(例如MESI协议)。当一个线程修改了它所在缓存行的数据时,该缓存行在其他处理器的缓存中会变为无效(缓存探测)。这意味着其他处理器必须从主内存中重新加载该缓存行(缓存失效),即使它们只访问了该缓存行中的其他数据项。频繁的缓存行失效和重新加载会导致显著的性能开销,因为处理器访问主内存的速度远慢于访问缓存。

常见的解决办法是使用数据对齐的方式,即避免两个不同的数据项位于一个同一个缓存行

简单看了下源码:

先来看一下这个做数据对齐的类,不得不说大厂的代码就是规范。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.baidu.fsg.uid.utils;

import java.util.concurrent.atomic.AtomicLong;

/**
* Represents a padded {@link AtomicLong} to prevent the FalseSharing problem<p>
*
* The CPU cache line commonly be 64 bytes, here is a sample of cache line after padding:<br>
* 64 bytes = 8 bytes (object reference) + 6 * 8 bytes (padded long) + 8 bytes (a long value)
*
* @author yutianbao
*/
public class PaddedAtomicLong extends AtomicLong {
private static final long serialVersionUID = -3415778863941386253L;
/** Padded 6 long (48 bytes) */
public volatile long p1, p2, p3, p4, p5, p6 = 7L;
/**
* Constructors from {@link AtomicLong}
*/
public PaddedAtomicLong() {
super();
}
public PaddedAtomicLong(long initialValue) {
super(initialValue);
}
/**
* To prevent GC optimizations for cleaning unused padded references
*/
public long sumPaddingToPreventOptimization() {
return p1 + p2 + p3 + p4 + p5 + p6;
}
}

注释很清晰,64 bytes = 8 bytes (object reference) + 6 * 8 bytes (padded long) + 8 bytes (a long value) Java 中一个 Long 是 8 字节,该类中包含了 6 个 volatile long 类型的字段:p1, p2, p3, p4, p5, p6,这些字段不会被使用,只是填充空间,防止 AtomicLong 实例与这些字段处于同一缓存行中。每个 long 类型占 8 字节,因此这些填充字段总共占用 48 字节,实际的数据是在 p7 位置存放。

VolatileLong 对象的实际的内存布局和大小。我们来逐项分析这个布局,解释每一部分的内存使用情况。

内存布局分析

OFFSET SIZE TYPE DESCRIPTION VALUE
0 12 bytes Object Header 对象头部 N/A
12 4 bytes Alignment/ Padding 填充(为了内存对齐) N/A
16 8 bytes long VolatileLong.value N/A
24 8 bytes long VolatileLong.p1 N/A
32 8 bytes long VolatileLong.p2 N/A
40 8 bytes long VolatileLong.p3 N/A
48 8 bytes long VolatileLong.p4 N/A
56 8 bytes long VolatileLong.p5 N/A
64 8 bytes long VolatileLong.p6 N/A
  1. 对象头部 (Object Header) — 12 bytes 每个 Java 对象在 JVM 中都会有一个对象头部,它包含了与对象管理相关的信息,这个对象头部的大小通常是 12 字节(在 64 位系统中)。通常包括:

    • Mark Word(包含锁信息、哈希码、GC 信息等)
    • Class Pointer(指向对象的类元数据)
  2. 填充 (Alignment/Padding) — 4 bytes 由于 内存对齐 的原因,JVM 会在字段间插入填充字节。这里的 4 字节填充是为了确保 long 类型的字段从 8 字节对齐的位置开始。在 64 位系统中,long 类型通常需要 8 字节对齐。所以,填充的 4 字节并不存储任何有效数据,它只是为了保证接下来的字段(如VolatileLong.value)的对齐。 这里为什么是 4,是因为 4 + 12 = 16 ,(64 - 16)% 8 = 0

  3. 字段(value, p1, p2, p3, p4, p5, p6) — 各 8 bytes 这些字段是 VolatileLong 类中的实际成员变量,每个 long 类型占用 8 字节。

    • VolatileLong.value:这个字段存储实际的数据。
    • VolatileLong.p1, p2, p3, p4, p5, p6:这些字段是 填充字段,用于避免伪共享(False Sharing)。它们是空的,不用于存储有效数据,但它们的存在确保了 value 字段在内存中的位置不会与其他可能频繁访问的字段共用同一个缓存行(Cache Line),从而减少多线程并发时的性能损失。
  4. 实例大小 (Instance Size) — 72 bytes 该对象的总大小为 72 字节。这包括:

    • 对象头部的 12 字节
    • 填充字节的 4 字节
    • 6 个 long 字段(每个字段 8 字节),共计 48 字节

sumPaddingToPreventOptimization 方法:该方法的唯一作用是访问这些填充字段,从而防止它们被 JVM GC 优化掉,确保它们始终在内存中占有空间。这有助于在多核 CPU 上保持线程的高效执行。

UidGenerator 生产 全局ID 的核心类,这是原始的不带缓存的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package com.baidu.fsg.uid.impl;

import java.util.Date;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

import com.baidu.fsg.uid.BitsAllocator;
import com.baidu.fsg.uid.UidGenerator;
import com.baidu.fsg.uid.exception.UidGenerateException;
import com.baidu.fsg.uid.utils.DateUtils;
import com.baidu.fsg.uid.worker.WorkerIdAssigner;

/**
* Represents an implementation of {@link UidGenerator}
*
* The unique id has 64bits (long), default allocated as blow:<br>
* <li>sign: The highest bit is 0
* <li>delta seconds: The next 28 bits, represents delta seconds since a customer epoch(2016-05-20 00:00:00.000).
* Supports about 8.7 years until to 2024-11-20 21:24:16
* <li>worker id: The next 22 bits, represents the worker's id which assigns based on database, max id is about 420W
* <li>sequence: The next 13 bits, represents a sequence within the same second, max for 8192/s<br><br>
*
* The {@link DefaultUidGenerator#parseUID(long)} is a tool method to parse the bits
*
* <pre>{@code
* +------+----------------------+----------------+-----------+
* | sign | delta seconds | worker node id | sequence |
* +------+----------------------+----------------+-----------+
* 1bit 28bits 22bits 13bits
* }</pre>
*
* You can also specified the bits by Spring property setting.
* <li>timeBits: default as 28
* <li>workerBits: default as 22
* <li>seqBits: default as 13
* <li>epochStr: Epoch date string format 'yyyy-MM-dd'. Default as '2016-05-20'<p>
*
* <b>Note that:</b> The total bits must be 64 -1
*
* @author yutianbao
*/
public class DefaultUidGenerator implements UidGenerator, InitializingBean {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultUidGenerator.class);

/** Bits allocate */
protected int timeBits = 28;
protected int workerBits = 22;
protected int seqBits = 13;

/** Customer epoch, unit as second. For example 2016-05-20 (ms: 1463673600000)*/
protected String epochStr = "2016-05-20";
protected long epochSeconds = TimeUnit.MILLISECONDS.toSeconds(1463673600000L);

/** Stable fields after spring bean initializing */
protected BitsAllocator bitsAllocator;
protected long workerId;

/** Volatile fields caused by nextId() */
protected long sequence = 0L;
protected long lastSecond = -1L;

/** Spring property */
protected WorkerIdAssigner workerIdAssigner;

@Override
public void afterPropertiesSet() throws Exception {
// initialize bits allocator
bitsAllocator = new BitsAllocator(timeBits, workerBits, seqBits);

// initialize worker id
workerId = workerIdAssigner.assignWorkerId();
if (workerId > bitsAllocator.getMaxWorkerId()) {
throw new RuntimeException("Worker id " + workerId + " exceeds the max " + bitsAllocator.getMaxWorkerId());
}

LOGGER.info("Initialized bits(1, {}, {}, {}) for workerID:{}", timeBits, workerBits, seqBits, workerId);
}

@Override
public long getUID() throws UidGenerateException {
try {
return nextId();
} catch (Exception e) {
LOGGER.error("Generate unique id exception. ", e);
throw new UidGenerateException(e);
}
}

// 该方法用于解析生成的 UID,解析后返回一个 JSON 格式的字符串
@Override
public String parseUID(long uid) {
long totalBits = BitsAllocator.TOTAL_BITS;
long signBits = bitsAllocator.getSignBits();
long timestampBits = bitsAllocator.getTimestampBits();
long workerIdBits = bitsAllocator.getWorkerIdBits();
long sequenceBits = bitsAllocator.getSequenceBits();

// parse UID
long sequence = (uid << (totalBits - sequenceBits)) >>> (totalBits - sequenceBits);
long workerId = (uid << (timestampBits + signBits)) >>> (totalBits - workerIdBits);
long deltaSeconds = uid >>> (workerIdBits + sequenceBits);

Date thatTime = new Date(TimeUnit.SECONDS.toMillis(epochSeconds + deltaSeconds));
String thatTimeStr = DateUtils.formatByDateTimePattern(thatTime);

// format as string
return String.format("{\"UID\":\"%d\",\"timestamp\":\"%s\",\"workerId\":\"%d\",\"sequence\":\"%d\"}",
uid, thatTimeStr, workerId, sequence);
}

/**
* Get UID
// 这个方法是生成唯一 ID 的核心逻辑。它是线程安全的
*
* @return UID
* @throws UidGenerateException in the case: Clock moved backwards; Exceeds the max timestamp
*/
protected synchronized long nextId() {
// 获取当前秒数:调用 getCurrentSecond() 获取当前秒级时间戳。
long currentSecond = getCurrentSecond();

// Clock moved backwards, refuse to generate uid
if (currentSecond < lastSecond) {
long refusedSeconds = lastSecond - currentSecond;
throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);
}

// At the same second, increase sequence
if (currentSecond == lastSecond) {
sequence = (sequence + 1) & bitsAllocator.getMaxSequence();
// Exceed the max sequence, we wait the next second to generate uid
if (sequence == 0) {
currentSecond = getNextSecond(lastSecond);
}

// At the different second, sequence restart from zero
} else {
sequence = 0L;
}

lastSecond = currentSecond;

// Allocate bits for UID
return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);
}

/**
* Get next millisecond
*/
private long getNextSecond(long lastTimestamp) {
long timestamp = getCurrentSecond();
while (timestamp <= lastTimestamp) {
timestamp = getCurrentSecond();
}

return timestamp;
}

/**
* Get current second
*/
private long getCurrentSecond() {
long currentSecond = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
if (currentSecond - epochSeconds > bitsAllocator.getMaxDeltaSeconds()) {
throw new UidGenerateException("Timestamp bits is exhausted. Refusing UID generate. Now: " + currentSecond);
}

return currentSecond;
}


public void setEpochStr(String epochStr) {
if (StringUtils.isNotBlank(epochStr)) {
this.epochStr = epochStr;
this.epochSeconds = TimeUnit.MILLISECONDS.toSeconds(DateUtils.parseByDayPattern(epochStr).getTime());
}
}
}

生成的 UID 是 64 位的,按以下方式分配位数:

  • 1 bit 用于标识符(sign),始终为 0。
  • 28 bits 用于时间戳(delta seconds),表示自纪元时间以来的秒数,最多支持 8.7 年。即开始时间之后的 8.7 年
  • 22 bits 用于工作节点 ID,支持最多 420W 个节点。 机器位比 上面的雪花算法多 12 位+
  • 13 bits 用于序列号,支持每秒最多生成 8192 个 UID。 比上面的雪花算法多 1 位

雪花算法的并发实际受限于 sequence ,即同一毫秒内的序列大小,上面的雪花算法 序列位位 12 位所以(同一机器,同一时间截)产生4096个ID序号

UidGenerator 为了提高并发限制,通过借用未来时间来解决 sequence 天然存在的并发限制,UidGenerator sequence部分是以秒为单位的,所以1个worker 1秒内最多生成的id书为8192个(2的13次方)。

所以支持的最大qps为8192,通过缓存id来提高吞吐量。因为每秒最多生成8192个id,当1秒获取id数多于8192时,RingBuffer中的id很快消耗完毕,在填充RingBuffer时,生成的id的 只能使用下一秒内的序列部分,所以说使用未来的时间。

下面位 RingBuffer 的代码,可以看大量使用了 PaddedAtomicLong 避免伪共享

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
package com.baidu.fsg.uid.buffer;

import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

import com.baidu.fsg.uid.utils.PaddedAtomicLong;

/**
* Represents a ring buffer based on array.<br>
* Using array could improve read element performance due to the CUP cache line. To prevent
* the side effect of False Sharing, {@link PaddedAtomicLong} is using on 'tail' and 'cursor'<p>
*
* A ring buffer is consisted of:
* <li><b>slots:</b> each element of the array is a slot, which is be set with a UID
* <li><b>flags:</b> flag array corresponding the same index with the slots, indicates whether can take or put slot
* <li><b>tail:</b> a sequence of the max slot position to produce
* <li><b>cursor:</b> a sequence of the min slot position to consume
*
* @author yutianbao
*/
public class RingBuffer {
private static final Logger LOGGER = LoggerFactory.getLogger(RingBuffer.class);

/** Constants */
private static final int START_POINT = -1;
private static final long CAN_PUT_FLAG = 0L; //用于标记当前slot的状态,表示可以put一个id进去
private static final long CAN_TAKE_FLAG = 1L; //用于标记当前slot的状态,表示可以take一个id
public static final int DEFAULT_PADDING_PERCENT = 50; //用于控制何时填充slots的默认阈值:当剩余的可用的slot的个数,小于bufferSize的50%时,需要生成id将slots填满

/** The size of RingBuffer's slots, each slot hold a UID */
private final int bufferSize; //slots的大小,默认为sequence可容量的最大值,即8192个
private final long indexMask;

private final long[] slots; //slots用于缓存已经生成的id
private final PaddedAtomicLong[] flags; //flags用于存储id的状态(是否可填充、是否可消费)

/** Tail: last position sequence to produce */
//Tail指针
//表示Producer生产的最大序号(此序号从0开始,持续递增)。Tail不能超过Cursor,即生产者不能覆盖未消费的slot。当Tail已赶上curosr,此时可通过rejectedPutBufferHandler指定PutRejectPolicy
private final AtomicLong tail = new PaddedAtomicLong(START_POINT); //

/** Cursor: current position sequence to consume */
//表示Consumer消费到的最小序号(序号序列与Producer序列相同)。Cursor不能超过Tail,即不能消费未生产的slot。当Cursor已赶上tail,此时可通过rejectedTakeBufferHandler指定TakeRejectPolicy
private final AtomicLong cursor = new PaddedAtomicLong(START_POINT);

/** Threshold for trigger padding buffer*/
private final int paddingThreshold; //用于控制何时填充slots的阈值

/** Reject put/take buffer handle policy */
//当slots满了,无法继续put时的处理策略。默认实现:无法进行put,仅记录日志
private RejectedPutBufferHandler rejectedPutHandler = this::discardPutBuffer;
//当slots空了,无法继续take时的处理策略。默认实现:仅抛出异常
private RejectedTakeBufferHandler rejectedTakeHandler = this::exceptionRejectedTakeBuffer;

/** Executor of padding buffer */
//用于运行【生成id将slots填满】任务
private BufferPaddingExecutor bufferPaddingExecutor;

/**
* Constructor with buffer size, paddingFactor default as {@value #DEFAULT_PADDING_PERCENT}
*
* @param bufferSize must be positive & a power of 2
*/
public RingBuffer(int bufferSize) {
this(bufferSize, DEFAULT_PADDING_PERCENT);
}

/**
* Constructor with buffer size & padding factor
*
* @param bufferSize must be positive & a power of 2
* @param paddingFactor percent in (0 - 100). When the count of rest available UIDs reach the threshold, it will trigger padding buffer<br>
* Sample: paddingFactor=20, bufferSize=1000 -> threshold=1000 * 20 /100,
* padding buffer will be triggered when tail-cursor<threshold
*/
public RingBuffer(int bufferSize, int paddingFactor) {
// check buffer size is positive & a power of 2; padding factor in (0, 100)
Assert.isTrue(bufferSize > 0L, "RingBuffer size must be positive");
Assert.isTrue(Integer.bitCount(bufferSize) == 1, "RingBuffer size must be a power of 2");
Assert.isTrue(paddingFactor > 0 && paddingFactor < 100, "RingBuffer size must be positive");

this.bufferSize = bufferSize;
this.indexMask = bufferSize - 1;
this.slots = new long[bufferSize];
this.flags = initFlags(bufferSize);

this.paddingThreshold = bufferSize * paddingFactor / 100;
}

/**
* Put an UID in the ring & tail moved<br>
* We use 'synchronized' to guarantee the UID fill in slot & publish new tail sequence as atomic operations<br>
*
* <b>Note that: </b> It is recommended to put UID in a serialize way, cause we once batch generate a series UIDs and put
* the one by one into the buffer, so it is unnecessary put in multi-threads
*
* @param uid
* @return false means that the buffer is full, apply {@link RejectedPutBufferHandler}
*/
public synchronized boolean put(long uid) {
long currentTail = tail.get();
long currentCursor = cursor.get();

// tail catches the cursor, means that you can't put any cause of RingBuffer is full
long distance = currentTail - (currentCursor == START_POINT ? 0 : currentCursor);
if (distance == bufferSize - 1) {
rejectedPutHandler.rejectPutBuffer(this, uid);
return false;
}

// 1. pre-check whether the flag is CAN_PUT_FLAG
int nextTailIndex = calSlotIndex(currentTail + 1);
if (flags[nextTailIndex].get() != CAN_PUT_FLAG) {
rejectedPutHandler.rejectPutBuffer(this, uid);
return false;
}

// 2. put UID in the next slot
// 3. update next slot' flag to CAN_TAKE_FLAG
// 4. publish tail with sequence increase by one
slots[nextTailIndex] = uid;
flags[nextTailIndex].set(CAN_TAKE_FLAG);
tail.incrementAndGet();

// The atomicity of operations above, guarantees by 'synchronized'. In another word,
// the take operation can't consume the UID we just put, until the tail is published(tail.incrementAndGet())
return true;
}

/**
* Take an UID of the ring at the next cursor, this is a lock free operation by using atomic cursor<p>
*
* Before getting the UID, we also check whether reach the padding threshold,
* the padding buffer operation will be triggered in another thread<br>
* If there is no more available UID to be taken, the specified {@link RejectedTakeBufferHandler} will be applied<br>
*
* @return UID
* @throws IllegalStateException if the cursor moved back
*/
public long take() {
// spin get next available cursor
long currentCursor = cursor.get();
long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);

// check for safety consideration, it never occurs
Assert.isTrue(nextCursor >= currentCursor, "Curosr can't move back");

// trigger padding in an async-mode if reach the threshold
long currentTail = tail.get();
if (currentTail - nextCursor < paddingThreshold) {
LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
nextCursor, currentTail - nextCursor);
bufferPaddingExecutor.asyncPadding();
}

// cursor catch the tail, means that there is no more available UID to take
if (nextCursor == currentCursor) {
rejectedTakeHandler.rejectTakeBuffer(this);
}

// 1. check next slot flag is CAN_TAKE_FLAG
int nextCursorIndex = calSlotIndex(nextCursor);
Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");

// 2. get UID from next slot
// 3. set next slot flag as CAN_PUT_FLAG.
long uid = slots[nextCursorIndex];
flags[nextCursorIndex].set(CAN_PUT_FLAG);

// Note that: Step 2,3 can not swap. If we set flag before get value of slot, the producer may overwrite the
// slot with a new UID, and this may cause the consumer take the UID twice after walk a round the ring
return uid;
}

/**
* Calculate slot index with the slot sequence (sequence % bufferSize)
*/
protected int calSlotIndex(long sequence) {
return (int) (sequence & indexMask);
}

/**
* Discard policy for {@link RejectedPutBufferHandler}, we just do logging
*/
protected void discardPutBuffer(RingBuffer ringBuffer, long uid) {
LOGGER.warn("Rejected putting buffer for uid:{}. {}", uid, ringBuffer);
}

/**
* Policy for {@link RejectedTakeBufferHandler}, throws {@link RuntimeException} after logging
*/
protected void exceptionRejectedTakeBuffer(RingBuffer ringBuffer) {
LOGGER.warn("Rejected take buffer. {}", ringBuffer);
throw new RuntimeException("Rejected take buffer. " + ringBuffer);
}

/**
* Initialize flags as CAN_PUT_FLAG
*/
private PaddedAtomicLong[] initFlags(int bufferSize) {
PaddedAtomicLong[] flags = new PaddedAtomicLong[bufferSize];
for (int i = 0; i < bufferSize; i++) {
flags[i] = new PaddedAtomicLong(CAN_PUT_FLAG);
}

return flags;
}
}

}

CachedUidGenerator 为带缓存的实现,采用了双RingBufferUid-RingBuffer用于存储Uid、Flag-RingBuffer用于存储Uid状态(是否可填充、是否可消费)

由于数组元素在内存中是连续分配的,可最大程度利用CPU cache以提升性能。但同时会带来「伪共享」FalseSharing问题,为此在Tail、Cursor指针、Flag-RingBuffer中采用了CacheLine 补齐方式。

在这里插入图片描述

从缓存中提取 ID 的方法,以及生成一秒内的 ID 的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

/**
* Represents a cached implementation of {@link UidGenerator} extends
* from {@link DefaultUidGenerator}, based on a lock free {@link RingBuffer}<p>
*
* The spring properties you can specified as below:<br>
* <li><b>boostPower:</b> RingBuffer size boost for a power of 2, Sample: boostPower is 3, it means the buffer size
* will be <code>({@link BitsAllocator#getMaxSequence()} + 1) &lt;&lt;
* {@link #boostPower}</code>, Default as {@value #DEFAULT_BOOST_POWER}
* <li><b>paddingFactor:</b> Represents a percent value of (0 - 100). When the count of rest available UIDs reach the
* threshold, it will trigger padding buffer. Default as{@link RingBuffer#DEFAULT_PADDING_PERCENT}
* Sample: paddingFactor=20, bufferSize=1000 -> threshold=1000 * 20 /100, padding buffer will be triggered when tail-cursor<threshold
* <li><b>scheduleInterval:</b> Padding buffer in a schedule, specify padding buffer interval, Unit as second
* <li><b>rejectedPutBufferHandler:</b> Policy for rejected put buffer. Default as discard put request, just do logging
* <li><b>rejectedTakeBufferHandler:</b> Policy for rejected take buffer. Default as throwing up an exception
*
* @author yutianbao
*/
public class CachedUidGenerator extends DefaultUidGenerator implements DisposableBean {
....................

@Override
public long getUID() {
try {
return ringBuffer.take();
} catch (Exception e) {
LOGGER.error("Generate unique id exception. ", e);
throw new UidGenerateException(e);
}
}


/**
* Get the UIDs in the same specified second under the max sequence
*
* @param currentSecond
* @return UID list, size of {@link BitsAllocator#getMaxSequence()} + 1
*/
protected List<Long> nextIdsForOneSecond(long currentSecond) {
// Initialize result list size of (max sequence + 1)
int listSize = (int) bitsAllocator.getMaxSequence() + 1;
List<Long> uidList = new ArrayList<>(listSize);

// Allocate the first sequence of the second, the others can be calculated with the offset
long firstSeqUid = bitsAllocator.allocate(currentSecond - epochSeconds, workerId, 0L);
for (int offset = 0; offset < listSize; offset++) {
uidList.add(firstSeqUid + offset);
}

return uidList;
}
}

paddingBuffer 用于自动填充ID 到缓冲区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Padding buffer fill the slots until to catch the cursor
*/
public void paddingBuffer() {
LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);

// is still running
if (!running.compareAndSet(false, true)) {
LOGGER.info("Padding buffer is still running. {}", ringBuffer);
return;
}

// fill the rest slots until to catch the cursor
boolean isFullRingBuffer = false;
while (!isFullRingBuffer) {
//获取生成的id,放到RingBuffer中。
List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet());
for (Long uid : uidList) {
isFullRingBuffer = !ringBuffer.put(uid);
if (isFullRingBuffer) {
break;
}
}
}

// not running now
running.compareAndSet(true, false);
LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
}

Leaf

Leaf 相对来说更重一点,也是基于 Java 的,基于Springboot 通过服务的方式提供能力,是美团开源的分布式ID生成服务,目前Leaf覆盖了美团点评公司内部金融、餐饮、外卖、酒店旅游、猫眼电影等众多业务线。在4C8G VM基础上,通过公司RPC方式调用,QPS压测结果近5w/s,TP999 1ms。

项目地址:

https://github.com/Meituan-Dianping/Leaf/blob/master/README_CN.md

Leaf 相关的博客在美团的技术论坛上面可以看到,内容很详细,通俗易懂,感兴趣小伙伴可以直接看看,对应的博文链接:

https://tech.meituan.com/2017/04/21/mt-leaf.html

这里简单介绍,Leaf 提供了雪花算法(Leaf-snowflake)号段(Leaf-segment)方式两种不同的生成ID方式

Leaf-segment(号段) 需要数据库支持,在原始的使用数据库的方案上,做了如下改变: 原方案每次获取ID都得读写一次数据库,造成数据库压力大。改为利用proxy server批量获取,每次获取一个segment(step决定大小)号段的值。用完之后再去数据库获取新的号段(和上面的 UidGenerator 缓存的方式类似),可以大大的减轻数据库的压力。 各个业务不同的发号需求用biz_tag字段来区分,每个biz-tag的ID获取相互隔离,互不影响。如果以后有性能需求需要对数据库扩容,不需要上述描述的复杂的扩容操作,只需要对biz_tag分库分表就行。同时 Leaf 也采用了 双 buffer 的方式,Leaf服务内部有两个号段缓存区 segment。当前号段已下发10%时,如果下一个号段未更新,则另启一个更新线程去更新下一个号段。当前号段全部下发完后,如果下个号段准备好了则切换到下个号段为当前segment接着下发,循环往复。

Leaf-snowflake(雪花)方案完全沿用 snowflake 方案的bit位设计,即是“1+41+10+12”的方式组装ID号。对于workerID的分配,当服务集群数量较小的情况下,完全可以手动配置。Leaf服务规模较大,动手配置成本太高。所以使用Zookeeper持久顺序节点的特性自动对snowflake节点配置wokerID。

博文部分内容参考

© 文中涉及参考链接内容版权归原作者所有,如有侵权请告知 :)


https://github.com/twitter-archive/snowflake/releases/tag/snowflake-2010

https://github.com/Meituan-Dianping/Leaf/blob/master/README_CN.md

https://tech.meituan.com/2017/04/21/mt-leaf.html

https://www.cnblogs.com/yeyang/p/10226284.html

https://blog.csdn.net/qq_39383767/article/details/143649836

https://cloud.tencent.com/developer/article/2364487


© 2018-至今 liruilonger@gmail.com, 保持署名-非商用-相同方式共享(CC BY-NC-SA 4.0)

发布于

2025-01-13

更新于

2025-02-04

许可协议

评论
Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×