/** * 构造函数 * * @param workerId 工作ID (0~31) * @param datacenterId 数据中心ID (0~31) */ publicSnowFlake(long workerId, long datacenterId){ if (workerId > maxWorkerId || workerId < 0) { thrownew IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); } if (datacenterId > maxDatacenterId || datacenterId < 0) { thrownew IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId)); } this.workerId = workerId; this.datacenterId = datacenterId;
}
privatestaticclassSingtetons{ privatestaticfinal SnowFlake SINGLETON = new SnowFlake(worker,datacenter); }
//如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常 if (timestamp < lastTimestamp) { thrownew RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); }
/** * Represents an implementation of {@link UidGenerator} * * The unique id has 64bits (long), default allocated as blow:<br> * <li>sign: The highest bit is 0 * <li>delta seconds: The next 28 bits, represents delta seconds since a customer epoch(2016-05-20 00:00:00.000). * Supports about 8.7 years until to 2024-11-20 21:24:16 * <li>worker id: The next 22 bits, represents the worker's id which assigns based on database, max id is about 420W * <li>sequence: The next 13 bits, represents a sequence within the same second, max for 8192/s<br><br> * * The {@link DefaultUidGenerator#parseUID(long)} is a tool method to parse the bits * * <pre>{@code * +------+----------------------+----------------+-----------+ * | sign | delta seconds | worker node id | sequence | * +------+----------------------+----------------+-----------+ * 1bit 28bits 22bits 13bits * }</pre> * * You can also specified the bits by Spring property setting. * <li>timeBits: default as 28 * <li>workerBits: default as 22 * <li>seqBits: default as 13 * <li>epochStr: Epoch date string format 'yyyy-MM-dd'. Default as '2016-05-20'<p> * * <b>Note that:</b> The total bits must be 64 -1 * * @author yutianbao */ publicclassDefaultUidGeneratorimplementsUidGenerator, InitializingBean{ privatestaticfinal Logger LOGGER = LoggerFactory.getLogger(DefaultUidGenerator.class);
/** Customer epoch, unit as second. For example 2016-05-20 (ms: 1463673600000)*/ protected String epochStr = "2016-05-20"; protectedlong epochSeconds = TimeUnit.MILLISECONDS.toSeconds(1463673600000L);
/** Stable fields after spring bean initializing */ protected BitsAllocator bitsAllocator; protectedlong workerId;
// 该方法用于解析生成的 UID,解析后返回一个 JSON 格式的字符串 @Override public String parseUID(long uid){ long totalBits = BitsAllocator.TOTAL_BITS; long signBits = bitsAllocator.getSignBits(); long timestampBits = bitsAllocator.getTimestampBits(); long workerIdBits = bitsAllocator.getWorkerIdBits(); long sequenceBits = bitsAllocator.getSequenceBits();
Date thatTime = new Date(TimeUnit.SECONDS.toMillis(epochSeconds + deltaSeconds)); String thatTimeStr = DateUtils.formatByDateTimePattern(thatTime);
// format as string return String.format("{\"UID\":\"%d\",\"timestamp\":\"%s\",\"workerId\":\"%d\",\"sequence\":\"%d\"}", uid, thatTimeStr, workerId, sequence); }
/** * Get UID // 这个方法是生成唯一 ID 的核心逻辑。它是线程安全的 * * @return UID * @throws UidGenerateException in the case: Clock moved backwards; Exceeds the max timestamp */ protectedsynchronizedlongnextId(){ // 获取当前秒数:调用 getCurrentSecond() 获取当前秒级时间戳。 long currentSecond = getCurrentSecond();
// Clock moved backwards, refuse to generate uid if (currentSecond < lastSecond) { long refusedSeconds = lastSecond - currentSecond; thrownew UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds); }
// At the same second, increase sequence if (currentSecond == lastSecond) { sequence = (sequence + 1) & bitsAllocator.getMaxSequence(); // Exceed the max sequence, we wait the next second to generate uid if (sequence == 0) { currentSecond = getNextSecond(lastSecond); }
// At the different second, sequence restart from zero } else { sequence = 0L; }
/** * Represents a ring buffer based on array.<br> * Using array could improve read element performance due to the CUP cache line. To prevent * the side effect of False Sharing, {@link PaddedAtomicLong} is using on 'tail' and 'cursor'<p> * * A ring buffer is consisted of: * <li><b>slots:</b> each element of the array is a slot, which is be set with a UID * <li><b>flags:</b> flag array corresponding the same index with the slots, indicates whether can take or put slot * <li><b>tail:</b> a sequence of the max slot position to produce * <li><b>cursor:</b> a sequence of the min slot position to consume * * @author yutianbao */ publicclassRingBuffer{ privatestaticfinal Logger LOGGER = LoggerFactory.getLogger(RingBuffer.class);
/** The size of RingBuffer's slots, each slot hold a UID */ privatefinalint bufferSize; //slots的大小,默认为sequence可容量的最大值,即8192个 privatefinallong indexMask; privatefinallong[] slots; //slots用于缓存已经生成的id privatefinal PaddedAtomicLong[] flags; //flags用于存储id的状态(是否可填充、是否可消费)
/** Tail: last position sequence to produce */ //Tail指针 //表示Producer生产的最大序号(此序号从0开始,持续递增)。Tail不能超过Cursor,即生产者不能覆盖未消费的slot。当Tail已赶上curosr,此时可通过rejectedPutBufferHandler指定PutRejectPolicy privatefinal AtomicLong tail = new PaddedAtomicLong(START_POINT); //
/** Cursor: current position sequence to consume */ //表示Consumer消费到的最小序号(序号序列与Producer序列相同)。Cursor不能超过Tail,即不能消费未生产的slot。当Cursor已赶上tail,此时可通过rejectedTakeBufferHandler指定TakeRejectPolicy privatefinal AtomicLong cursor = new PaddedAtomicLong(START_POINT);
/** * Constructor with buffer size, paddingFactor default as {@value #DEFAULT_PADDING_PERCENT} * * @param bufferSize must be positive & a power of 2 */ publicRingBuffer(int bufferSize){ this(bufferSize, DEFAULT_PADDING_PERCENT); } /** * Constructor with buffer size & padding factor * * @param bufferSize must be positive & a power of 2 * @param paddingFactor percent in (0 - 100). When the count of rest available UIDs reach the threshold, it will trigger padding buffer<br> * Sample: paddingFactor=20, bufferSize=1000 -> threshold=1000 * 20 /100, * padding buffer will be triggered when tail-cursor<threshold */ publicRingBuffer(int bufferSize, int paddingFactor){ // check buffer size is positive & a power of 2; padding factor in (0, 100) Assert.isTrue(bufferSize > 0L, "RingBuffer size must be positive"); Assert.isTrue(Integer.bitCount(bufferSize) == 1, "RingBuffer size must be a power of 2"); Assert.isTrue(paddingFactor > 0 && paddingFactor < 100, "RingBuffer size must be positive");
/** * Put an UID in the ring & tail moved<br> * We use 'synchronized' to guarantee the UID fill in slot & publish new tail sequence as atomic operations<br> * * <b>Note that: </b> It is recommended to put UID in a serialize way, cause we once batch generate a series UIDs and put * the one by one into the buffer, so it is unnecessary put in multi-threads * * @param uid * @return false means that the buffer is full, apply {@link RejectedPutBufferHandler} */ publicsynchronizedbooleanput(long uid){ long currentTail = tail.get(); long currentCursor = cursor.get();
// tail catches the cursor, means that you can't put any cause of RingBuffer is full long distance = currentTail - (currentCursor == START_POINT ? 0 : currentCursor); if (distance == bufferSize - 1) { rejectedPutHandler.rejectPutBuffer(this, uid); returnfalse; }
// 1. pre-check whether the flag is CAN_PUT_FLAG int nextTailIndex = calSlotIndex(currentTail + 1); if (flags[nextTailIndex].get() != CAN_PUT_FLAG) { rejectedPutHandler.rejectPutBuffer(this, uid); returnfalse; }
// 2. put UID in the next slot // 3. update next slot' flag to CAN_TAKE_FLAG // 4. publish tail with sequence increase by one slots[nextTailIndex] = uid; flags[nextTailIndex].set(CAN_TAKE_FLAG); tail.incrementAndGet();
// The atomicity of operations above, guarantees by 'synchronized'. In another word, // the take operation can't consume the UID we just put, until the tail is published(tail.incrementAndGet()) returntrue; }
/** * Take an UID of the ring at the next cursor, this is a lock free operation by using atomic cursor<p> * * Before getting the UID, we also check whether reach the padding threshold, * the padding buffer operation will be triggered in another thread<br> * If there is no more available UID to be taken, the specified {@link RejectedTakeBufferHandler} will be applied<br> * * @return UID * @throws IllegalStateException if the cursor moved back */ publiclongtake(){ // spin get next available cursor long currentCursor = cursor.get(); long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);
// check for safety consideration, it never occurs Assert.isTrue(nextCursor >= currentCursor, "Curosr can't move back");
// trigger padding in an async-mode if reach the threshold long currentTail = tail.get(); if (currentTail - nextCursor < paddingThreshold) { LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail, nextCursor, currentTail - nextCursor); bufferPaddingExecutor.asyncPadding(); }
// cursor catch the tail, means that there is no more available UID to take if (nextCursor == currentCursor) { rejectedTakeHandler.rejectTakeBuffer(this); }
// 1. check next slot flag is CAN_TAKE_FLAG int nextCursorIndex = calSlotIndex(nextCursor); Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");
// 2. get UID from next slot // 3. set next slot flag as CAN_PUT_FLAG. long uid = slots[nextCursorIndex]; flags[nextCursorIndex].set(CAN_PUT_FLAG);
// Note that: Step 2,3 can not swap. If we set flag before get value of slot, the producer may overwrite the // slot with a new UID, and this may cause the consumer take the UID twice after walk a round the ring return uid; }
/** * Calculate slot index with the slot sequence (sequence % bufferSize) */ protectedintcalSlotIndex(long sequence){ return (int) (sequence & indexMask); }
/** * Discard policy for {@link RejectedPutBufferHandler}, we just do logging */ protectedvoiddiscardPutBuffer(RingBuffer ringBuffer, long uid){ LOGGER.warn("Rejected putting buffer for uid:{}. {}", uid, ringBuffer); } /** * Policy for {@link RejectedTakeBufferHandler}, throws {@link RuntimeException} after logging */ protectedvoidexceptionRejectedTakeBuffer(RingBuffer ringBuffer){ LOGGER.warn("Rejected take buffer. {}", ringBuffer); thrownew RuntimeException("Rejected take buffer. " + ringBuffer); } /** * Initialize flags as CAN_PUT_FLAG */ private PaddedAtomicLong[] initFlags(int bufferSize) { PaddedAtomicLong[] flags = new PaddedAtomicLong[bufferSize]; for (int i = 0; i < bufferSize; i++) { flags[i] = new PaddedAtomicLong(CAN_PUT_FLAG); } return flags; } }
/** * Represents a cached implementation of {@link UidGenerator} extends * from {@link DefaultUidGenerator}, based on a lock free {@link RingBuffer}<p> * * The spring properties you can specified as below:<br> * <li><b>boostPower:</b> RingBuffer size boost for a power of 2, Sample: boostPower is 3, it means the buffer size * will be <code>({@link BitsAllocator#getMaxSequence()} + 1) << * {@link #boostPower}</code>, Default as {@value #DEFAULT_BOOST_POWER} * <li><b>paddingFactor:</b> Represents a percent value of (0 - 100). When the count of rest available UIDs reach the * threshold, it will trigger padding buffer. Default as{@link RingBuffer#DEFAULT_PADDING_PERCENT} * Sample: paddingFactor=20, bufferSize=1000 -> threshold=1000 * 20 /100, padding buffer will be triggered when tail-cursor<threshold * <li><b>scheduleInterval:</b> Padding buffer in a schedule, specify padding buffer interval, Unit as second * <li><b>rejectedPutBufferHandler:</b> Policy for rejected put buffer. Default as discard put request, just do logging * <li><b>rejectedTakeBufferHandler:</b> Policy for rejected take buffer. Default as throwing up an exception * * @author yutianbao */ publicclassCachedUidGeneratorextendsDefaultUidGeneratorimplementsDisposableBean{ .................... @Override publiclonggetUID(){ try { return ringBuffer.take(); } catch (Exception e) { LOGGER.error("Generate unique id exception. ", e); thrownew UidGenerateException(e); } }
/** * Get the UIDs in the same specified second under the max sequence * * @param currentSecond * @return UID list, size of {@link BitsAllocator#getMaxSequence()} + 1 */ protected List<Long> nextIdsForOneSecond(long currentSecond){ // Initialize result list size of (max sequence + 1) int listSize = (int) bitsAllocator.getMaxSequence() + 1; List<Long> uidList = new ArrayList<>(listSize);
// Allocate the first sequence of the second, the others can be calculated with the offset long firstSeqUid = bitsAllocator.allocate(currentSecond - epochSeconds, workerId, 0L); for (int offset = 0; offset < listSize; offset++) { uidList.add(firstSeqUid + offset); }
/** * Padding buffer fill the slots until to catch the cursor */ publicvoidpaddingBuffer(){ LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
// is still running if (!running.compareAndSet(false, true)) { LOGGER.info("Padding buffer is still running. {}", ringBuffer); return; }
// fill the rest slots until to catch the cursor boolean isFullRingBuffer = false; while (!isFullRingBuffer) { //获取生成的id,放到RingBuffer中。 List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet()); for (Long uid : uidList) { isFullRingBuffer = !ringBuffer.put(uid); if (isFullRingBuffer) { break; } } }
// not running now running.compareAndSet(true, false); LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer); }