import time


class Snowflake:
    """Generate 64-bit, roughly time-ordered unique IDs (Snowflake layout).

    Bit layout of a generated ID, from most to least significant:

        timestamp (ms since Unix epoch) | datacenter (5) | worker (5) | sequence (12)

    The instance keeps mutable state (``sequence`` and ``last_timestamp``)
    and takes no lock, so callers sharing one instance across threads must
    synchronize access themselves.
    """

    # Width of each field in bits.
    DATACENTER_ID_BITS = 5
    WORKER_ID_BITS = 5
    SEQUENCE_BITS = 12

    # Largest value each field can hold (all bits set).
    MAX_DATACENTER_ID = (1 << DATACENTER_ID_BITS) - 1
    MAX_WORKER_ID = (1 << WORKER_ID_BITS) - 1
    MAX_SEQUENCE = (1 << SEQUENCE_BITS) - 1

    # Left shift that positions each field inside the final ID.
    WORKER_SHIFT = SEQUENCE_BITS
    DATA_CENTER_SHIFT = WORKER_ID_BITS + SEQUENCE_BITS
    TIMESTAMP_LEFT_SHIFT = DATACENTER_ID_BITS + WORKER_ID_BITS + SEQUENCE_BITS

    def __init__(self, datacenter_id, worker_id):
        """Create a generator bound to one datacenter/worker pair.

        Args:
            datacenter_id: Integer in ``[0, MAX_DATACENTER_ID]``.
            worker_id: Integer in ``[0, MAX_WORKER_ID]``.

        Raises:
            ValueError: If either ID does not fit in its bit field. An
                oversized ID would otherwise spill into neighbouring
                fields and silently corrupt every generated ID.
        """
        if not 0 <= datacenter_id <= self.MAX_DATACENTER_ID:
            raise ValueError(
                f"datacenter_id must be between 0 and "
                f"{self.MAX_DATACENTER_ID}, got {datacenter_id}"
            )
        if not 0 <= worker_id <= self.MAX_WORKER_ID:
            raise ValueError(
                f"worker_id must be between 0 and "
                f"{self.MAX_WORKER_ID}, got {worker_id}"
            )

        self.sequence = 0
        self.last_timestamp = -1
        self.datacenter_id = datacenter_id
        self.worker_id = worker_id

    @staticmethod
    def _current_millis():
        """Return the current wall-clock time in whole milliseconds."""
        return int(time.time() * 1000)

    def generate_id(self):
        """Return the next unique ID for this datacenter/worker pair.

        Raises:
            Exception: If the wall clock reports a time earlier than the
                one used for the previous ID; generating an ID then could
                duplicate one that was already issued.
        """
        timestamp = self._current_millis()

        if timestamp < self.last_timestamp:
            raise Exception("Clock moved backwards. Refusing to generate id")

        if timestamp == self.last_timestamp:
            # Same millisecond as the previous ID: bump the sequence. The
            # mask wraps it to 0 once all sequence values are used up.
            self.sequence = (self.sequence + 1) & self.MAX_SEQUENCE
            if self.sequence == 0:
                # Sequence exhausted for this millisecond; wait for the next.
                timestamp = self.wait_next_millis(self.last_timestamp)
        else:
            self.sequence = 0

        self.last_timestamp = timestamp

        return (
            (timestamp << self.TIMESTAMP_LEFT_SHIFT)
            | (self.datacenter_id << self.DATA_CENTER_SHIFT)
            | (self.worker_id << self.WORKER_SHIFT)
            | self.sequence
        )

    def wait_next_millis(self, last_timestamp):
        """Busy-wait until the clock passes ``last_timestamp``.

        Args:
            last_timestamp: Millisecond timestamp that must be exceeded.

        Returns:
            The first observed millisecond timestamp greater than
            ``last_timestamp``.
        """
        timestamp = self._current_millis()
        while timestamp <= last_timestamp:
            timestamp = self._current_millis()
        return timestamp


if __name__ == "__main__":
    # Assume datacenter ID 1 and worker ID 1.
    snowflake = Snowflake(1, 1)
    for _ in range(10):
        print(snowflake.generate_id())