# ZooKeeper

## 相关书籍

有本叫《ZooKeeper分布式过程协同技术详解》，不过我也没看过，😄。

## 官方文档最好的教程

https://zookeeper.apache.org/documentation.html 学会基本概念后，一定要去看官方文档，增强自己。自上而下，先从整体简单内容入手，再细读文档。

## ZooKeeper 是什么？

Apache ZooKeeper（简称 ZK）是一个开源的分布式协调服务框架，由 Apache Hadoop 项目孵化而来，主要为分布式应用提供高效、可靠的协调机制。它本质上是一个分布式的、类似文件系统的存储系统，但更专注于数据一致性和协调功能，而不是海量数据存储。ZK 使用 ZAB（ZooKeeper Atomic Broadcast）协议（基于 Paxos 算法的变体）来确保数据的一致性、高可用性和原子性操作。 简单来说，它像一个“分布式大脑”，帮助多个服务器或进程在网络环境下协作，避免混乱。

## 为什么要有 ZooKeeper？它为了解决什么问题？

在单机应用时代，一切简单。但分布式系统（如多个服务器协同工作）会面临诸多挑战：网络延迟、节点崩溃、数据不一致等。ZooKeeper 的诞生就是为了解决这些分布式协调问题，提供一个中心化的（但高可用）协调点，让开发者不用从零构建复杂的分布式逻辑。

具体解决的问题包括：

* 数据一致性：分布式环境中，如何确保所有节点看到相同的数据视图？ZK 通过强一致性协议（如 ZAB）实现顺序一致性和原子广播，防止“脑裂”（split-brain）问题。
* 高可用与故障恢复：单个节点崩溃时，如何快速选举 Leader 并恢复服务？ZK 支持 Leader 选举和集群管理，避免单点故障。
* 配置管理与同步：如何统一分发配置变更？ZK 作为配置中心，支持实时监控和通知。
* 服务发现与命名：如何让服务动态注册和发现彼此？ZK 提供命名服务和注册中心。
* 分布式锁与同步：如何协调多个进程访问共享资源？ZK 实现分布式锁、队列等原语，屏蔽底层网络复杂性。

总之，ZK 不是存储大数据的工具，而是“胶水”——让分布式系统更可靠、高性能。 它特别适合对吞吐量有要求的场景，如大数据和微服务架构。

实际工程中的例子

在实际项目中，ZooKeeper 广泛用于大型分布式系统。下面举几个真实案例，展示它如何解决具体痛点：

|工程/项目|问题描述|ZooKeeper 如何解决|
|---|---|---|
|Hadoop/HBase|在 HDFS（Hadoop Distributed File System）中，多个 NameNode 需要协调元数据一致性；HBase 的 RegionServer 需要动态负载均衡。|ZK 作为协调中心，实现 Leader 选举和配置同步，确保元数据一致，避免数据丢失或不一致。|
|Apache Kafka|Kafka 的 Broker 和分区需要 Leader 选举；消费者组需要协调分区分配。|ZK 用于存储 Broker 注册信息和 Leader 元数据，支持故障时快速选举新 Leader，实现高可用分区管理。（注：Kafka 3.3+ 已迁移到 KRaft，但早期依赖 ZK）|
|Dubbo（阿里巴巴微服务框架）|微服务间服务发现难，配置变更需手动同步，导致部署复杂。|ZK 作为注册中心，服务提供者注册地址，消费者动态发现；支持配置中心实时推送变更，实现无感知更新。|
|ShardingSphere（数据库分片中间件）|多节点 Sharding-Proxy 需要统一分片规则和节点管理，防止规则不一致导致数据倾斜。|ZK 作为配置中心，存储分片规则，支持节点注册和监控，实现集群统一管理和故障转移。|

## Docker Compose搭建ZooKeeper集群

```bash
mkdir zookeeper-tutorial && cd zookeeper-tutorial
```

使用 Docker Compose 搭建 3 节点 ZooKeeper 集群

docker-compose.yml

```yml
version: '3.9'

services:
  zk1:
    image: zookeeper:3.9.4
    container_name: zk1
    restart: unless-stopped
    hostname: zk1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
      ZOO_4LW_COMMANDS_WHITELIST: "*"
    volumes:
      - zk1_data:/data
      - zk1_datalog:/datalog

  zk2:
    image: zookeeper:3.9.4
    container_name: zk2
    restart: unless-stopped
    hostname: zk2
    ports:
      - "2182:2181"
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
      ZOO_4LW_COMMANDS_WHITELIST: "*"
    volumes:
      - zk2_data:/data
      - zk2_datalog:/datalog

  zk3:
    image: zookeeper:3.9.4
    container_name: zk3
    restart: unless-stopped
    hostname: zk3
    ports:
      - "2183:2181"
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
      ZOO_4LW_COMMANDS_WHITELIST: "*"
    volumes:
      - zk3_data:/data
      - zk3_datalog:/datalog

volumes:
  zk1_data:
  zk1_datalog:
  zk2_data:
  zk2_datalog:
  zk3_data:
  zk3_datalog:

```

对docker-compose.yml解释:

* ZOO_MY_ID: 每个节点唯一ID
* ZOO_SERVERS: 集群服务器列表，格式 server.X=host:clientPort:leaderPort;2181 (clientPort为客户端连接，leaderPort为内部通信)
* 端口映射：zk1用2181, zk2用2182, zk3用2183, 便于外部连接。
* Volumes：持久化数据，避免重启丢失

停止集群并清理旧集群（删除volumes以避免残留数据）

```bash
docker-compose down -v
```

启动集群：

```bash
docker-compose up -d
```

验证启动 检查是否有报错信息：

```bash
docker logs zk1
docker logs zk2
docker logs zk3
```

进到容器里

```bash
docker exec -it zk1 bash

root@zk3:/apache-zookeeper-3.9.4-bin# ls
bin  conf  docs  lib  LICENSE.txt  NOTICE.txt  README.md  README_packaging.md

root@zk3:/apache-zookeeper-3.9.4-bin/bin# ll -lrt
total 88
-rwxr-xr-x 1 zookeeper zookeeper  1385 Aug 19 19:49 zkTxnLogToolkit.sh*
-rwxr-xr-x 1 zookeeper zookeeper   996 Aug 19 19:49 zkTxnLogToolkit.cmd*
-rwxr-xr-x 1 zookeeper zookeeper  1377 Aug 19 19:49 zkSnapShotToolkit.sh*
-rwxr-xr-x 1 zookeeper zookeeper   988 Aug 19 19:49 zkSnapShotToolkit.cmd*
-rwxr-xr-x 1 zookeeper zookeeper  1422 Aug 19 19:49 zkSnapshotRecursiveSummaryToolkit.sh*
-rwxr-xr-x 1 zookeeper zookeeper   995 Aug 19 19:49 zkSnapshotRecursiveSummaryToolkit.cmd*
-rwxr-xr-x 1 zookeeper zookeeper  1374 Aug 19 19:49 zkSnapshotComparer.sh*
-rwxr-xr-x 1 zookeeper zookeeper   987 Aug 19 19:49 zkSnapshotComparer.cmd*
-rwxr-xr-x 1 zookeeper zookeeper 11680 Aug 19 19:49 zkServer.sh*
-rwxr-xr-x 1 zookeeper zookeeper  4559 Aug 19 19:49 zkServer-initialize.sh*
-rwxr-xr-x 1 zookeeper zookeeper  1243 Aug 19 19:49 zkServer.cmd*
-rwxr-xr-x 1 zookeeper zookeeper  3947 Aug 19 19:49 zkEnv.sh*
-rwxr-xr-x 1 zookeeper zookeeper  1810 Aug 19 19:49 zkEnv.cmd*
-rwxr-xr-x 1 zookeeper zookeeper  1576 Aug 19 19:49 zkCli.sh*
-rwxr-xr-x 1 zookeeper zookeeper  1115 Aug 19 19:49 zkCli.cmd*
-rwxr-xr-x 1 zookeeper zookeeper  1978 Aug 19 19:49 zkCleanup.sh*
-rwxr-xr-x 1 zookeeper zookeeper   232 Aug 19 19:49 README.txt*
drwxr-xr-x 2 zookeeper zookeeper  4096 Aug 19 19:49 ./
drwxr-xr-x 6 zookeeper zookeeper  4096 Oct  2 08:52 ../
```

在bin下有 zkCli.sh

```bash
root@zk3:/apache-zookeeper-3.9.4-bin/bin# ./zkCli.sh 
...
...
2025-10-15 07:41:38,559 [myid:localhost:2181] - INFO  [main-SendThread(localhost:2181):o.a.z.ClientCnxn$SendThread@1427] - Session establishment complete on server localhost/127.0.0.1:2181, session id = 0x30203aaa8590000, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null zxid: -1
[zk: localhost:2181(CONNECTED) 0] 
```

集群验证，测试一致性

```bash
在 zk1中执行
[zk: localhost:2181(CONNECTED) 0] create /test-cluster "Cluster OK"
Created /test-cluster
[zk: localhost:2181(CONNECTED) 1] 
```

```bash
在 zk1中 zkCli连接 zk2
root@zk3:/apache-zookeeper-3.9.4-bin/bin# ./zkCli.sh -server zk2:2181
Connecting to zk2:2181
..
..
2025-10-15 07:44:26,765 [myid:zk2:2181] - INFO  [main-SendThread(zk2:2181):o.a.z.ClientCnxn$SendThread@1427] - Session establishment complete on server zk2/172.18.0.4:2181, session id = 0x20203aaa89b0000, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null zxid: -1
[zk: zk2:2181(CONNECTED) 3] ls /
[test-cluster, zookeeper]
[zk: zk2:2181(CONNECTED) 4] get /test-cluster
Cluster OK
[zk: zk2:2181(CONNECTED) 5] 
```

可见我们的集群基本搭建成功了。

## 基本认识

### 什么是ZNode?

ZNode是ZooKeeper的核心数据单元，类似于文件系统中的文件或目录，每个ZNode都有一个路径（如 `/app/config`）,可以存储少量数据(通常小于1MB),并支持子节点，形成树状结构，ZNode有几种类型：

* __持久节点(Persistent)__: 会话结束后仍存在
* __临时节点(Ephemeral)__: 绑定到客户端会话，断开后自动删除（常用于心跳检测）
* __顺序节点(Sequential)__: 创建时自动附加序号（如 /lock-0000000001），用于分布式锁。
* __持久顺序节点__: 结合持久和顺序

ZNode 支持版本控制（stat 信息包括版本号），允许原子操作和 watcher（事件监听）。

### ZooKeeper存什么？

ZooKeeper 不存大数据（它不是 HDFS 或 Cassandra），而是存储小量、结构化的元数据和协调信息。典型内容包括：

* 配置数据：如应用配置、环境变量（e.g., /config/database.url）。
* 服务注册/发现：服务地址、状态（e.g., /services/prod/node1=ip:port）。
* 分布式锁/队列：锁路径或任务队列（e.g., /locks/task-1）。
* Leader 选举结果：集群 Leader 的元数据。
* 会话状态：临时节点用于心跳。

数据以字节数组（`byte[]`）形式存储，支持 ACL（访问控制）和 watcher 通知变更。ZK 强调高性能读写（QPS > 10k），适合 < 1MB 的小对象。

### ZooKeeper的结构

ZooKeeper的数据结构是一个层次化的树状结构（类似 Unix 文件系统），根节点为 /。每个 ZNode 是树中的一个节点：

* 路径：用 / 分隔，如 /parent/child。
* 属性：每个 ZNode 有数据、子节点列表、ACL 和 stat（创建/修改时间、版本、子节点数等）。

```bash
/
├── zookeeper (系统节点，内置)
├── config
│   ├── db
│   │   └── url = "jdbc:mysql://host:3306/db"
│   └── timeout = "30s"
├── services
│   ├── web1 (临时节点) = "192.168.1.10:8080"
│   └── web2 (顺序节点) = "192.168.1.11:8080"
└── locks
    └── task-0000000001 (临时顺序节点) = "" (空数据，仅路径用于锁)
```

这种结构支持快速遍历（ls/get）和事件驱动（watcher 监听子节点/数据变化）。ZK 集群通过复制确保一致性，数据分布在 Leader 和 Follower 节点上。

### 数据

ZNode时ZooKeeper的基本数据单元，ZNode有 数据(Data)、子节点列表(Children List)、ACL(Access Control List)、Stat(统计信息) 这四大概念。

这些概念构成了 ZNode 的完整属性模型，支持 ZooKeeper 的树状数据结构和分布式协调功能。

数据，ZNode 存储的实际内容，通常是字节数组（`byte[]`），大小限制为 1MB 以内。

用于保存配置、服务地址等元数据。例如，ZNode `/config/db` 的数据为 `"jdbc:mysql://host:3306"`。通过 get 命令读取。

### 子节点列表

子节点列表，ZNode的直接子节点路径列表，形成层次化树状结构（根为 /）。

支持服务发现和命名服务。例如，/services 的子节点列表为 `["web1", "web2"]`，通过 ls 或 getChildren 获取。

### ACL

ACL (Access Control List), 访问控制列表，定义谁能对 ZNode 执行读/写/创建/删除/管理操作。

确保安全，例如 `digest:user:pass:crdwa` 允许特定用户所有权限。通过 setACL 设置、getACL 获取。

### stat

Stat (Statistics), ZNode 的元数据统计，包括版本号（cVersion/dataVersion）、时间戳（cZxid/mZxid）、子节点数等。

用于乐观锁和变更检测，例如 `stat /path` 返回 `{czxid: 0x100000000, ctime: 169xxxxxx, ...}`。

### ZNode 数据详情

在ZooKeeper中，ZNode是最基本的数据单元，类似于文件系统中的文件或目录。ZNode的Data（数据）是ZNode的核心内容部分，它存储ZNode的实际负载（payload），用于保存应用所需的元数据或配置信息。

1. Data部分的定义与特性

* __定义__：Data是ZNode的字节数据(byte[])内容，直接附加在ZNode上，它是ZNode的“值”部分，与其他属性（如子节点列表、ACL、Stat）独立。每个ZNode只能有一个Data。
* __数据类型__: 纯二进制数据(byte[]),不强制特定格式。常见存储：
  * 字符串，如JSON、XML配置
  * 序列化对象，如Protobuf或自定义二进制
  * 空数据，常用于临时/顺序节点，如分布式锁，仅路径用于标识
* __版本控制__：Data与ZNode的 dataVersion(数据版本号)绑定，每次修改Data时，版本号自动递增，用于实现乐观锁（CAS操作），防止并发冲突。
* __原子性__: 所有Data操作（如读/写）都是原子的，确保一致性(基于ZAB协议)。
* __Watcher支持__：读取Data时可设置 watcher(监听器)，当Data变更时通知客户端（一次性或持久）。

2. Data的限制与注意事项

* __大小限制__：默认最大 1MB （由 jute.maxbuffer参数控制）。这使ZK适合小量元数据存储，不适合大文件（否则影响性能和复制开销）。
* __存储开销__：Data存储在内存（SnapShot）和磁盘（Transaction Log），集群中所有节点复制Data。过大Data会放大网络/磁盘负载。
* __不可变性__：创建ZNode时指定Data，后续只能通过set修改，不能直接“追加”（需读-改-写）。
* __空Data__：允许空字符串，常用于“存在性”标记（如心跳节点）。
* __性能影响__：Data读操作高吞吐（QPS>10K）,但写操作会触发集群同步，建议最小化写频。

3. Data的用途

Data是ZK协调服务的“载体”，常见场景：

* 配置管理：存储应用配置，如 `/config/app.json = {"db": "mysql://host"}`
* 服务元数据：服务注册，如 `/services/prod/node1 = "ip:port:version"`
* 状态快照：Leader选举结果或任务状态
* 序列化数据：小量二进制，如用户会话token
* 避免滥用：不存业务数据（如日志），ZK不是数据库; 用etcd或Consul替代大配置。

4. 与其他ZNode属性的关系

Data独立于其他部分，但操作时常结合：

* 子节点列表：Data不影响子节点，但可用于父节点描述（如 /parent 的 Data = "服务组描述"）
* ACL：Data读/写 需ACL权限（r 读 w 写）
* Stat：包含Data相关统计，如 mZxid(最后修改ZXID)、mTime（修改时间）、dataLength（Data长度字节）

5. 相关操作

操作Data的主要方式是通过客户端工具 如zkCli.sh 或 API（如Node.js的node-zookeeper-client）。

假设已连接 zkCli

|操作|命令|示例|说明|
|---|---|---|---|
|创建 ZNode 时设置 Data | `create /path "data"` | `create /config/db  "jdbc:mysql://localhost:3306"` | 创建持久节点，Data 为字符串。支持 -e（临时）、-s（顺序）|
|读取 Data | `get /path [watch]` | `get /config/db` | 输出 Data 内容；watch 监听变更。返回 Data + Stat|
|修改 Data | `set /path "newdata" [version]` | `set /config/db "jdbc:mysql://newhost:3306" 1` | 更新 Data；version 为乐观锁（-1 表示忽略版本）。|
|获取 Data + Stat | `stat /path` | `stat /config/db` | 不返回 Data，只返回 Stat（包括 dataLength）|
|删除 ZNode（隐式清空 Data）| `delete /path` | `delete /config/db` | 删除节点，Data 随之丢失。|

__Watcher示例__: `get /config/db watch`,然后在另一终端 `set /config/db "updated"`,会触发事件通知 `WatchedEvent state:SyncConnected type:NodeDataChanged path:/config/d`

6. 最佳实践与常见问题

* 实践：用JSON序列化复杂Data；结合Watcher实现配置热更新；监控Data大小避免OOM；
* 问题：
  * Connection Refused：确保服务绑定 clientPort
  * BadVersion：并发写时用 Stat.version 乐观锁
  * Data过大：拆分成多个ZNode或用外部存储
* 性能：读Data快（本地缓存），写慢（全集群同步）；建议读多写少。

7. Node.js样例

```javascript
const zookeeper = require('node-zookeeper-client');

const client = zookeeper.createClient('127.0.0.1:2181');

client.connect();


function testCode() {
    // 创建ZNode时设置Data
    client.create('/config', Buffer.from("jdbc:mysql://localhost:3306"), zookeeper.CreateMode.PERSISTENT, (err, path) => {
        if (err) console.error('Create error:', err);
        else console.log('Created:', path);
    });
    // 读取Data 带Watcher
    client.getData('/config', (event, data, stat) => {
        console.log('Data:', data.toString('utf8'));  // 转为字符串
        console.log('Stat:', stat);  // { version: 0, dataLength: 25, ... }
    }, (err, data, stat) => {
        if (err) console.error('Get error:', err);
        else console.log('Initial Data:', data.toString('utf8'));
    });
    // 修改Data 带版本控制
    client.setData('/config/db', Buffer.from('jdbc:mysql://newhost:3306'), -1, (err, stat) => {  // -1 忽略版本
        if (err) console.error('Set error:', err);
        else console.log('Updated Stat:', stat.version);  // 新版本号
    });
    // 关闭连接
    client.close();
}

client.once('connected', () => {
    console.log('Connected to ZooKeeper.');
    testCode();
});

```

解释：

* Data始终用 Buffer.from 传入二进制
* Watcher回调：event.type 为 ZOO_CHANGED_EVENT 时表示Data变更
* 版本控制：`setData(path, data, version, callback)`,若版本不匹配则失败（NoAuthException 或 BadVersionException）。

### ZNode 子节点列表详情

在ZooKeeper（简称ZK）中，ZNode的子节点列表(Children List)是ZNode的另一个核心属性部分，它标识该ZNode的直接子节点(Direct Children)的路径名称列表，形成一个层次化的树状数据结构。
子节点列表类似于文件系统目录下的文件/子目录列表，但ZooKeeper强调分布式一致性和事件驱动通知。

1. 子节点列表的定义与特性

* __定义__：Children List是ZNode的子节点名称数组（字符串列表），每个元素是子节点的相对路径（不含父路径）。例如，对于ZNode /parent, 其 Children List可能为 `["child1", "child2"]`,表示子节点路径为 `/parent/child1` 和 `/parent/child2`。根节点 / 的Children List包含所有顶级ZNode。
* __层次化结构__: ZooKeeper的整个数据模型是一个树状结构（Hierarchical Namespace）,子节点列表定义了树的“分支”。每个ZNode只能有直接子节点（无深层嵌套查询优化），遍历需递归。
* __动态性__：Children List是动态的一一创建、删除子节点时自动更新，并通过ZAB协议在集群中原子广播，确保所有节点一致视图。
* __版本控制__：与Data类似，Children List与ZNode的 cVersion(子节点变更版本号)绑定，每次子节点 创建、删除时，cVersion 递增，用于检测变更和乐观锁。
* __Watcher支持__：可支持watcher监听子节点列表变化（如新增、删除），触发事件通知（NodeChildrenChanged），支持事件驱动编程。
* __无顺序保证__：列表是无序的（除非使用顺序节点，自动附加序号如 child-0000000001）

2. 子节点列表的限制与注意事项

* __数量限制__：单个ZNode的子节点数默认无硬限（理论上可达数百万），但受内存和性能影响（每个节点~100字节开销）。生产中建议小于10k，避免树过深（深度大于10层影响遍历效率）。
* __存储开销__：Children List存储在内存(SnapShot)和磁盘(Transaction Log),集群全复制。过多子节点会增加同步负载。
* __不可直接修改__：不能“追加”到列表，只能通过创建/删除子节点间接变更。
* __临时节点影响__：临时子节点(Ephemeral)在会话断开后自动删除，会动态移除列表。
* __性能影响__：读取Children List快O(1)内存访问，但在高并发创建、删除时可能产生“热点”（ZNode 争用），建议用顺序节点分散。
* __无数据关联__：Children List只存名称，不包含子节点Data或Stat；需单独查询子节点。

3. 子节点列表的用途

Children List是ZooKeeper命名服务和发现机制的基础，常见场景：

* __服务发现__：父节点 `/sservices` 的Children List为服务实例列表（如 `["node1", "node2"]`）,消费者遍历列表获取地址
* __配置分组__：如 `/config/prod`的Children List为环境配置子项(["db", "cache"])
* __分布式队列、锁__：顺序节点的列表用于FIFO队列或公平锁（最小序号节点持有锁）
* __命名空间管理__：实现分布式命名，如DNS风格的 `/domain/subdomain`
* __监控__：Watcher监听列表变化，实现动态注册、注销通知。

4. 与其他ZNode属性的关系

Children List独立但互补：

* Data：父节点Data可描述子节点组（如 /services 的Data="服务注册列表"），但不影响列表
* ACL：操作 Children List需父节点的 c (创建子节点)和 r （读列表）权限；子节点继承父ACL
* Stat：包含Children List相关统计，如 numChildren 子节点数、cZxid 最后子节点变更ZXID、cTime 子节点最后变更时间
* 顺序节点：Children List中的顺序子节点会按序号排序，便于队列实现

5. zkCli操作

|操作|命令|示例|说明|
|---|---|---|---|
|创建子节点（间接添加至列表）| `create /parent/child "data"` | `create /services/node1 "ip:8080"` | 创建子节点，自动加入 /services 的 Children List。支持 -e（临时）、-s（顺序）。|
|列出子节点列表 | `ls /path [watch]` | `ls /services` | 输出 Children List（如 [node1, node2]）；watch 监听变化。|
|列出子节点 + Stat | `ls2 /path` | `ls2 /services` | 输出列表 + 每个子节点的简要 Stat（如版本、时间）。|
|删除子节点（间接移除列表）| `delete /parent/child` | `delete /services/node1` | 移除子节点，从列表中删除；需 d 权限。|
|获取子节点总数 | `getAllChildrenNumber`|`getAllChildrenNumber`|全局统计所有 ZNode 的子节点总数（非特定路径）。|
|递归删除（清空列表）|`deleteall /path 或 rmr /path`|`rmr /services`|删除 ZNode 及其所有子节点，清空列表。|

Watcher示例：`ls /services watch`，然后在另一终端 `create /services/node3 "ip:8081"`，会触发：`"WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/services"`。

6. 最佳实践与常见问题

实践：用顺序节点实现有序列表（如队列）;Watcher持久化监听（递归重新注册），分页遍历大列表（ZK 无内置分页，用前缀过滤）

* 问题：
  * 空列表：ls输出 [],正常。无子节点时 numChildren=0
  * Watcher丢失：一次性watcher需手动重设，高频变更可能洪水事件
  * 性能瓶颈：热点ZNode(如根节点)子节点过多，用哈希分片（如/services/a/node1）
  * 一致性：集群中列表视图一致，但网络分区时可能延迟

7. Node.js

```javascript
const zookeeper = require('node-zookeeper-client');

const client = zookeeper.createClient('127.0.0.1:2181');

client.connect();

function listServices() {
    client.getChildren('/services',
        function (event) {
            if (event) {
                console.log('Children changed event:', event);
                // Re-attach watcher recursively
                listServices();
            }
        },
        function (err, children, stat) {
            if (err) {
                console.error('GetChildren error:', err);
                return;
            }
            console.log('Children List:', children);  // e.g., ['node1']
            console.log('Num Children:', stat ? stat.numChildren : 'N/A');
            console.log('cVersion:', stat ? stat.cversion : 'N/A');
        }
    );
}

function testCode() {
    // 0. Create persistent parent node (allows ephemeral children)
    client.create('/services', Buffer.from('fnode'), zookeeper.CreateMode.PERSISTENT, (err, path) => {
        if (err) console.error('services Create error:', err);
        else console.log('services created', path);
    });

    // 1. Create ephemeral child node (adds to list)
    client.create('/services/node1', Buffer.from('ip:8080'), zookeeper.CreateMode.EPHEMERAL, (err, path) => {
        if (err) console.error('Create error:', err);
        else console.log('Added to list:', path);
    });

    // 2. List children with recursive watcher
    listServices();

    // 3. Delete child after delay (to allow initial list and watcher setup)
    setTimeout(() => {
        client.remove('/services/node1', -1, (err) => {
            if (err) console.error('Delete error:', err);
            else console.log('Removed from list');
        });
    }, 1000);

    // 4. Close after more delay (to see delete effect)
    setTimeout(() => {
        client.close();
    }, 2000);
}

client.once('connected', () => {
    console.log('Connected to ZooKeeper.');
    testCode();
});

// root@ser745692301841:/dev_dir/zookeeper-tutorial/js# node index.js
// Connected to ZooKeeper.
// services created /services
// Added to list: /services/node1
// Children List: [ 'node1' ]
// Num Children: 1
// cVersion: 1
// Children changed event: Event { type: 4, name: 'NODE_CHILDREN_CHANGED', path: '/services' }
// Removed from list
// Children List: []
// Num Children: 0
// cVersion: 2

```

解释：

* `getChildren(path, watcher, callback)`：返回字符串数组（Children List）和 Stat；watcher 为可选回调函数。
* Watcher 事件：`event.type` 为 `zookeeper.Event.NODE_CHILDREN_CHANGED` 时表示列表变更。
* 版本控制：删除时用 -1 忽略版本；可结合 `stat.cversion` 实现 CAS。

### ZNode ACL详情

在ZooKeeper中，ZNode的ACL（Acess Control List，访问控制列表）是ZNode的安全属性部分，用于定义对ZNode的访问权限。它提供细粒度的授权机制，确保分布式环境中只有授权主体才能执行特定操作（如读、写、删除）。ACL是ZooKeeper安全模型的核心，帮助防止未授权访问，尤其在多用户或服务场景下。

1. ACL的定义与特性

* __定义__：ACL是一个权限列表，附加在每个ZNode上，指定哪些主体（ID）可以使用哪些权限模式（Scheme）执行哪些操作。ACL不是单一规则，而是多个规则的集合（数组），每个规则格式为 `scheme:id:permissions`。例如，`world:anyone:cdrwa` 表示任何人（world模式）对ZNode有所有权限。
* __Scheme（权限）__：认证方式，常见类型
  * world：无认证，任何人访问（默认）
  * auth：已认证用户（通过addAuth）
  * digest：基于用户名:密码的SHA1哈希(digest:user:pass)
  * ip：基于IP地址(ip:192.168.1.0支持CIDR)
  * sasl：SASL认证（Kerberos等，生产级）
* __ID（主体）__：授权对象，如 anyone（world）、user:admin（digest）、192.68.1.100（ip）
* __Permissions（权限）__：允许的操作，5种基本权限（可组合）
  * r：读（get Data、Is Children）
  * w：写（set Data）
  * c：创建子节点（create）
  * d：删除子节点（delete）
  * a：管理ACL（setACL）
  * 组合：crdwa（所有权限），r（仅读）
* __继承性__：子节点默认继承父节点的ACL，但可独立覆盖
* __原子性__：ACL操作(如 setACL)是原子的，全集群同步
* __版本控制__：ACL与ZNode的aclVersion（ACL版本号）绑定，修改时版本递增，用于乐观锁
* __Watcher支持__：无直接Watcher（ACL变更不触发事件），但ACL影响其他操作的Watcher

2. ACL的限制与注意事项

* __规则数量__：单个ZNode的ACL最多200条规则（硬限），过多会影响性能
* __认证开销__：digest模式需客户端预先 addAuth；sasl需要Kerberos配置（复杂）。
* __无加密__：ACL只控制访问，不加密传输（建议结合TLS）
* __默认开放__：ZooKeeper默认ACL为 `world:anyone:cdrwa`，生产中必须自定义
* __性能影响__：ACL检查在每个操作前执行，简单规则快，但复杂规则（如IP范围）稍慢
* __不可逆__：错误设置ACL（如锁死自己）需要超级用户或重启恢复
* __方案兼容__：不同Scheme不冲突，world 总是最后检查 (fallback)

3. ACL的用途

ACL是ZooKeeper安全保障的关键，常见场景：

* __多租户隔离__：不同服务组用不同ACL（如 /tenant1 只允许tenant1用户）
* __配置保护__：敏感配置节点（如 /secrets/db-pass）限制为 `digest:admin:***:rw`
* __服务发现安全__：/services 的子节点ACL限制为服务所有者可写，其他可读
* __分布式锁控制__：锁路径ACL确保只有授权进程可创建、删除
* __审计与合规__：结合SASL记录访问日志

4. 与其他ZNode属性的关系

ACL独立但控制其他部分：

* Data：读/写Data需 r/w权限
* Children List：Is Children需 r，创建/删除 子节点需 c/d
* Stat：获取Stat需 r （Stat包含版本，但不含Data）
* 继承：新创建子节点继承父ACL的c权限主体

5. 最佳实践与常见问题

* 实践：最小权限原则（服务发现用r，配置用rw），用digest简单场景，SASL企业级，定期审计ACL（用4LW命令如cons）
* 问题：
  * NoAuth：忘记addAuth或Scheme不匹配
  * BagVersion：并发setACL时用 stat.aversion 乐观锁
  * 继承陷阱：子节点创建需父 c 权限，但继承后独立
  * 调试：用 getAcl 检查，生产启用ZK审计日志
  * 安全：结合 Quorum SASL 保护内部通信。

6. zkCli

|操作|命令|示例|说明|
|---|---|---|---|
|认证（addAuth）|`addauth scheme id`|`addauth digest admin:admin123`|认证以访问受限节点；digest 会哈希密码。|
|设置 ACL|`setAcl /path scheme:id:perms`|`setAcl /secure world:anyone:r,digest:admin:admin123:cdrwa`|设置多个规则（逗号分隔）；覆盖现有 ACL。|
|获取 ACL|`getAcl /path`|`getAcl /secure`|输出 ACL 列表，如 world:anyone:cdrwa。|
|创建时设置 ACL|`create /path "data" acl`|`create /secure "data" world:anyone:cdrwa`|创建节点时指定 ACL（可选）。|
|列出 ACL 版本|`stat /path`|`stat /secure`|输出 Stat 中的 aversion（ACL 版本）。|
|移除 ACL|`setAcl /path open:anyone:cdrwa`|`setAcl /secure world:anyone:cdrwa`|设为默认开放（非删除）。|

示例流程：

* `create /secure "sensitive"`
* `setAcl /secure digest:admin:admin123:crdwa`
* `addauth digest admin:admin123`
* `get /secure` （成功）；无认证则失败 `KeeperErrorCode = NoAuth for /secure`

7. Node.js

改一下docker-compose.yml,使用自定义 zoo.cfg

```yml
version: '3.9'

services:
  zk1:
    image: zookeeper:3.9.4
    container_name: zk1
    restart: unless-stopped
    hostname: zk1
    ports:
      - "2181:2181"
    networks:
      - zk-net
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
      ZOO_4LW_COMMANDS_WHITELIST: "*"
    volumes:
      - zk1_data:/data
      - zk1_datalog:/datalog
      - ./zoo.cfg:/apache-zookeeper-3.9.4-bin/conf/zoo.cfg

  zk2:
    image: zookeeper:3.9.4
    container_name: zk2
    restart: unless-stopped
    hostname: zk2
    ports:
      - "2182:2181"
    networks:
      - zk-net
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
      ZOO_4LW_COMMANDS_WHITELIST: "*"
    volumes:
      - zk2_data:/data
      - zk2_datalog:/datalog
      - ./zoo.cfg:/apache-zookeeper-3.9.4-bin/conf/zoo.cfg

  zk3:
    image: zookeeper:3.9.4
    container_name: zk3
    restart: unless-stopped
    hostname: zk3
    ports:
      - "2183:2181"
    networks:
      - zk-net
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
      ZOO_4LW_COMMANDS_WHITELIST: "*"
    volumes:
      - zk3_data:/data
      - zk3_datalog:/datalog
      - ./zoo.cfg:/apache-zookeeper-3.9.4-bin/conf/zoo.cfg

volumes:
  zk1_data:
  zk1_datalog:
  zk2_data:
  zk2_datalog:
  zk3_data:
  zk3_datalog:

networks:
  zk-net:
    driver: bridge

```

zoo.cfg

```txt
# 基本配置
tickTime=2000
initLimit=10
syncLimit=5

# 数据和日志目录（镜像内部路径）
dataDir=/data
dataLogDir=/datalog

# 客户端端口（内部）
clientPort=2181

# 集群服务器配置（使用容器名称作为主机名）
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888

# 启用 Digest 认证提供者
authProvider.1=org.apache.zookeeper.server.auth.DigestAuthenticationProvider

# 超级用户 Digest（admin:admin123 的哈希值）
superDigest=admin:dyks0yAxrXfvFZ7G5BN0+ZUcGCE=

```

```bash
[zk: 127.0.0.1:2181(CONNECTED) 0] addauth digest admin:admin123
[zk: 127.0.0.1:2181(CONNECTED) 1] create /test "data"
Created /test
[zk: 127.0.0.1:2181(CONNECTED) 2] ls /
[test, zookeeper]
[zk: 127.0.0.1:2181(CONNECTED) 3] create /secure-test "protected data" digest:admin:dyks0yAxrXfvFZ7G5BN0+ZUcGCE=:cdrwa
Created /secure-test
[zk: 127.0.0.1:2181(CONNECTED) 4] getAcl /secure-test
'digest,'admin:dyks0yAxrXfvFZ7G5BN0+ZUcGCE=
: cdrwa
[zk: 127.0.0.1:2181(CONNECTED) 5] get /secure-test
protected data
[zk: 127.0.0.1:2181(CONNECTED) 6] 
```

退出会话重新进Cli

```bash
root@zk1:/apache-zookeeper-3.9.4-bin/bin# ./zkCli.sh -server 127.0.0.1:2181
[zk: 127.0.0.1:2181(CONNECTED) 1] get /secure-test
Insufficient permission : /secure-test
[zk: 127.0.0.1:2181(CONNECTED) 2] 
# 可见没有权限
# 进行AUTH
[zk: 127.0.0.1:2181(CONNECTED) 2] addauth digest admin:admin123
[zk: 127.0.0.1:2181(CONNECTED) 3] get /secure-test
protected data
[zk: 127.0.0.1:2181(CONNECTED) 4] 
```

```javascript
const zookeeper = require('node-zookeeper-client');

const client = zookeeper.createClient('127.0.0.1:2181');

client.connect();

function testCode() {
    // Define ACLs after auth
    const worldAcl = new zookeeper.ACL(zookeeper.Permission.ADMIN, new zookeeper.Id('world', 'anyone'));
    const digestAcl = new zookeeper.ACL(zookeeper.Permission.ADMIN, new zookeeper.Id('digest', 'admin:dyks0yAxrXfvFZ7G5BN0+ZUcGCE='));
    const acls = [worldAcl, digestAcl];

    // 2. 创建节点（如果不存在），然后设置 ACL
    client.create('/secure', Buffer.from('initial data'), zookeeper.CreateMode.EPHEMERAL, (err, path) => {
        if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) {
            console.error('Create error:', err);
            client.close();
            return;
        }
        console.log('Node /secure created or exists:', path);

        // Now set ACL
        client.setACL('/secure', acls, -1, (err, stat) => {  // -1 ignores version
            if (err) {
                console.error('SetACL error:', err);
                client.close();
                return;
            }
            console.log('ACL set, version:', stat.aversion);

            // 3. 获取 ACL
            client.getACL('/secure', (err, retrievedAcls, stat) => {
                if (err) console.error('GetACL error:', err);
                else {
                    console.log('Retrieved ACLs:', retrievedAcls);
                    console.log('ACL Version:', stat.aversion);

                    // 4. 创建新节点时设置 ACL（正确顺序：data, acls, mode）
                    client.create('/secure-new', Buffer.from('data'), acls, zookeeper.CreateMode.EPHEMERAL, (err, path) => {
                        if (err) console.error('Create error:', err);
                        else console.log('Created with ACL:', path);

                        // 5. 关闭连接
                        setTimeout(() => client.close(), 1000);  // Brief delay to see ephemeral nodes
                    });
                }
            });
        });
    });
}

client.once('connected', () => {
    console.log('Connected to ZooKeeper.');

    // 1. 认证（addAuthInfo）
    client.addAuthInfo('digest', Buffer.from('admin:admin123'));

    console.log('Authentication info added.');

    testCode();
});

// root@ser745692301841:/dev_dir/zookeeper-tutorial/js# node index.js
// Connected to ZooKeeper.
// Authentication info added.
// Node /secure created or exists: /secure
// ACL set, version: 1
// Retrieved ACLs: [
//   ACL { permission: 16, id: Id { scheme: 'world', id: 'anyone' } },
//   ACL {
//     permission: 16,
//     id: Id { scheme: 'digest', id: 'admin:dyks0yAxrXfvFZ7G5BN0+ZUcGCE=' }
//   }
// ]
// ACL Version: 1
// Created with ACL: /secure-new

```

### ZNode Stat详情

在ZooKeeper中，ZNode的Stat（Statistics，统计信息）是ZNode的元数据属性部分，它记录了ZNode的生命周期和变更历史，提供版本控制、时间戳和
计数等信息。

Stat不存储实际数据，而是辅助其他操作实现一致性、乐观锁和监控，Stat是只读的、全集群一致的（通过ZAB协议同步），常用于检测变更或诊断问题。

1. Stat的定义与特性

* __定义__：Stat是一个结构体，包含ZNode的11个固定字段，描述创建、修改历史、版本和基本统计。每个ZNode都有一个唯一的Stat，操作ZNode时可附带返回Stat。
* __核心字段__：

|类别|字段|类型|描述
|---|---|---|---|
|ZXID（事务ID）|czxid|long|创建ZNode的ZXID(ZooKeeper事务ID，全局单调递增)|
||mzxid|long|最后修改ZNode的ZXID（Data或ACL变更）|
||pzxid|long|最后子节点变更的ZXID（创建、删除 Children）|
|时间戳|ctime|long|创建时间（毫秒时间戳，Unix Epoch）|
||mtime|long|最后修改时间(Data或子节点变更)|
|版本号|cversion|int|子节点变更版本(Children List修改时递增)|
||dataVersion|int|Data变更版本（setData时递增）|
||aclVersion|int|ACL变更版本（setACL时递增）|
|临时节点|ephemeralOwner|long|临时节点(Ephemeral)的会话ID（0表示非临时）|
|统计|dataLength|int|Data的字节长度（0表示空）|
||numChildren|int|当前子节点数|

* __原子更新__：所有Stat字段在事务中原子更新，全集群同步
* __版本机制__：版本字段(cversion等)用于乐观锁，操作时指定预期版本，不匹配则失败（BagVersionException）
* __时间戳精度__：毫秒级，但不保证时钟同步（分布式环境中可能有偏差）
* __ZXID唯一性__：ZXID是64位（高32位epoch，低32位计数），确保全局顺序
* __只读__：Stat不可直接修改，只能通过ZNode操作间接更新

2. Stat的限制与注意事项

* __字段固定__：Stat结构不可扩展，只有这11个字段，自定义元数据需存入Data
* __性能开销__：返回Stat无额外成本（内存访问），但频繁查询大树时需要注意网络负载
* __时间戳偏差__：分布式节点时钟不同步，可能导致 ctime,mtime 不准（用ZXID代替时间比较）
* __版本溢出__：版本号是int,极高并发下可能溢出
* __临时节点__：ephemeralOwner大于0时，Stat在会话断开后失效（节点删除）
* __无Watcher__：Stat变更不直接触发事件，但可通过Data/Children Watcher间接监控 dataVersion变化

3. Stat的用途

Stat是ZooKeeper一致性和调试的“指纹”，常见场景：

* __乐观锁__：用dataVersion实现CAS(Compare-And-Swap)，如并发更新Data
* __变更检测__：比较 mzxid、pzxid 检测节点是否更新（配置热加载）
* __监控与审计__：numChildren 监控服务实例数，ctime、mtime 追踪节点生命周期
* __Leader选举__：ZXID用于比较提案优先级（更高ZXID胜出）
* __诊断__：dataLength检查Data大小，ephemeralOwner验证临时节点所有者
* __缓存失效__：客户端用Stat版本缓存Data，避免无效读

4. 与其他ZNode属性的关系

Stat汇总其他部分的状态：

* __Data__：dataVersion、mzxid、mtime、dataLength与Data变更关联
* __Children List__：cversion、pzxid、numChildren与子节点变更关联
* __ACL__：aclVersion与ACL变更关联
* __整体__：Stat是ZNode的“摘要”，操作时常附带 如 getData返回Data+Stat

5. 最佳实践与常见问题

* __实践__：始终附带Stat查询，用ZXID排序事件，缓存Stat版本减少无效操作，监控numChildren实现负载均衡
* __问题__：
  * BadVersion:并发时用最新Stat版本重试
  * Stat不准：临时节点删除后Stat失效，用 ephemeralOwner检查
  * 性能：大树递归getChildren会累积Stat开销，用前缀过滤优化
  * 调试：用stat命令诊断，生产日志ZXID追踪事务
  * 时区：ctime、mtime是UTC，客户端需转换本地时区

6. zkCli

|操作|命令|示例|说明|
|---|---|---|---|
|获取 Stat|`stat /path`|`stat /config`|输出所有 Stat 字段，如 czxid = 0x100000001、dataVersion = 0 等。|
|获取 Data + Stat|`get /path [watch]`|`get /config watch`|输出 Data + Stat；watch 监听 Data 变更（间接更新 Stat）。|
|获取 Children + Stat|`ls2 /path`|`ls2 /services`|输出 Children List + 父节点的 Stat（numChildren 等）。|
|检查存在 + Stat|`ls /path（隐式）`|`ls /config`|如果节点不存在，输出无 Stat；存在时附带简要 Stat。|
|版本控制操作|`set /path "data" version`|`set /config "new" 1`|用 dataVersion 作为预期版本；不匹配失败。|

```bash
[zk: localhost:2181(CONNECTED) 0] ls /
[secure-test, test, zookeeper]
[zk: localhost:2181(CONNECTED) 1] stat /test
cZxid = 0x100000004
ctime = Thu Oct 16 08:33:50 UTC 2025
mZxid = 0x100000004
mtime = Thu Oct 16 08:33:50 UTC 2025
pZxid = 0x100000004
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0
[zk: localhost:2181(CONNECTED) 2] 
```

7. Node.js

```javascript
const zookeeper = require('node-zookeeper-client');

const client = zookeeper.createClient('127.0.0.1:2181');
client.connect();

// helper：把可能是 Buffer 的字段解析成 Number
function bufToNumber(buf) {
    if (!Buffer.isBuffer(buf)) return buf;

    // 常见两种长度：8 bytes (int64) 或 4 bytes (uint32)
    try {
        if (buf.length === 8 && typeof buf.readBigInt64BE === 'function') {
            const bi = buf.readBigInt64BE(0);
            // 转成 Number 可能会丢精度，但一般 zxid/ctime/mtime 在可表示范围内
            return Number(bi);
        }
        if (buf.length === 8 && typeof buf.readBigInt64LE === 'function') {
            // 如果是小端（不太常见），用 LE
            const bi = buf.readBigInt64LE(0);
            return Number(bi);
        }
        if (buf.length === 4) {
            return buf.readUInt32BE(0);
        }
        // 退而求其次：当做字符串
        return parseInt(buf.toString('hex'), 16);
    } catch (e) {
        // 如果都失败，返回原始 Buffer
        return buf;
    }
}

// 把 stat 里常用字段安全读取并格式化的函数
function formatStat(stat) {
    const get = (name, getterName) => {
        try {
            // 先尝试 getter（如果实现了）
            if (typeof stat[getterName] === 'function') {
                return stat[getterName]();
            }
            // 再尝试直接属性
            if (stat[name] !== undefined) {
                return stat[name];
            }
            // 再尝试小写或不同拼写的属性
            if (stat[name.toLowerCase()] !== undefined) {
                return stat[name.toLowerCase()];
            }
            return undefined;
        } catch (e) {
            return undefined;
        }
    };

    const czxidRaw = get('czxid', 'getCzxid') ?? stat.czxid;
    const mzxidRaw = get('mzxid', 'getMzxid') ?? stat.mzxid;
    const pzxidRaw = get('pzxid', 'getPzxid') ?? stat.pzxid;
    const ctimeRaw = get('ctime', 'getCtime') ?? stat.ctime;
    const mtimeRaw = get('mtime', 'getMtime') ?? stat.mtime;
    // data/version fields在不同实现中名字不同
    const versionRaw = get('version', 'getVersion') ?? stat.version ?? stat.dataVersion;
    const cversionRaw = get('cversion', 'getCversion') ?? stat.cversion;
    const numChildrenRaw = get('numChildren', 'getNumChildren') ?? stat.numChildren;

    // 解析 Buffer -> Number（如果需要）
    const czxid = bufToNumber(czxidRaw);
    const mzxid = bufToNumber(mzxidRaw);
    const pzxid = bufToNumber(pzxidRaw);
    const ctimeNum = bufToNumber(ctimeRaw);
    const mtimeNum = bufToNumber(mtimeRaw);
    const version = bufToNumber(versionRaw);
    const cversion = bufToNumber(cversionRaw);
    const numChildren = bufToNumber(numChildrenRaw);

    return {
        czxid,
        mzxid,
        pzxid,
        ctime: (typeof ctimeNum === 'number' && !Number.isNaN(ctimeNum)) ? new Date(ctimeNum) : ctimeRaw,
        mtime: (typeof mtimeNum === 'number' && !Number.isNaN(mtimeNum)) ? new Date(mtimeNum) : mtimeRaw,
        version,
        cversion,
        numChildren
    };
}

function testCode() {
    // 1. exists -> stat
    client.exists('/secure-test', (err, stat) => {
        if (err) return console.error('Exists error:', err);
        if (!stat) return console.log('Node does not exist');

        console.log('Stat:', formatStat(stat));
    });

    // 2. getData -> data + stat
    client.getData('/secure-test', (err, data, stat) => {
        if (err) return console.error('GetData error:', err);

        console.log('Data:', data ? data.toString('utf8') : null);
        console.log('Stat (from getData):', formatStat(stat));
    });

    // 3. getChildren('/', ...)
    client.getChildren('/', (err, children, stat) => {
        if (err) return console.error('GetChildren error:', err);

        console.log('Children:', children);
        console.log('Root stat:', formatStat(stat));
    });

    // 4. 修改 data（带版本控制）
    client.getData('/secure-test', (err, data, stat) => {
        if (err) return console.error('GetData error before set:', err);

        const fmt = formatStat(stat);
        const expectedVersion = typeof fmt.version === 'number' ? fmt.version : -1;

        client.setData('/secure-test', Buffer.from('updated'), expectedVersion, (err, newStat) => {
            if (err) {
                if (err.getCode && err.getCode() === zookeeper.Exception.BAD_VERSION) {
                    console.error('Version mismatch');
                } else {
                    console.error('SetData error:', err);
                }
            } else {
                console.log('Updated, new stat:', formatStat(newStat));
            }
        });
    });
}

client.once('connected', () => {
    console.log('Connected to ZooKeeper.');
    client.addAuthInfo('digest', Buffer.from('admin:admin123'));
    console.log('Authentication info added.');
    testCode();
});

// root@ser745692301841:/dev_dir/zookeeper-tutorial/js# node index.js
// Connected to ZooKeeper.
// Authentication info added.
// Stat: {
//   czxid: 4294967301,
//   mzxid: 4294967319,
//   pzxid: 4294967301,
//   ctime: 2025-10-16T08:41:32.407Z,
//   mtime: 2025-10-16T17:32:56.581Z,
//   version: 1,
//   cversion: 0,
//   numChildren: 0
// }
// Data: updated
// Stat (from getData): {
//   czxid: 4294967301,
//   mzxid: 4294967319,
//   pzxid: 4294967301,
//   ctime: 2025-10-16T08:41:32.407Z,
//   mtime: 2025-10-16T17:32:56.581Z,
//   version: 1,
//   cversion: 0,
//   numChildren: 0
// }
// Children: [ 'zookeeper', 'test', 'secure-test' ]
// Root stat: {
//   czxid: 0,
//   mzxid: 0,
//   pzxid: 4294967315,
//   ctime: 1970-01-01T00:00:00.000Z,
//   mtime: 1970-01-01T00:00:00.000Z,
//   version: 0,
//   cversion: 7,
//   numChildren: 3
// }
// Updated, new stat: {
//   czxid: 4294967301,
//   mzxid: 4294967328,
//   pzxid: 4294967301,
//   ctime: 2025-10-16T08:41:32.407Z,
//   mtime: 2025-10-16T17:43:33.718Z,
//   version: 2,
//   cversion: 0,
//   numChildren: 0
// }

```

## 基本命令

ZooKeeper的指令主要分为两类，zkCli.sh客户端命令（用于交互式操作ZNode，如创建、删除节点）和4字母单词命令（4LW Commands）（用于服务器监控和管理，通过Telnet或nc发送）。

注意: zkCli命令在zkCli.sh会话中执行 4LW命令需连接服务器端口 如 `echo stat | nc localhost 2181`

### zkCli.sh客户端命令

这些是 ZooKeeper CLI（zkCli.sh）的核心命令，用于管理 ZNode（ZooKeeper 的数据节点）。完整列表如下（按功能分类）：

|命令|描述|示例|
|---|---|---|
|help|显示所有可用命令帮助|help|
|addauth|添加认证方案 如digest|addauth digest user:pass|
|close|关闭当前连接|close|
|connect|连接到指定服务器|connect localhost:2181|
|config|获取/设置集群配置|config get|
|create|创建ZNode(持久/临时/顺序)|create /path "data" -e -s|
|delete|删除ZNode|delete /path|
|deleteall|递归删除ZNode及其子节点|deleteall /path|
|get|获取ZNode数据|get /path|
|getACL|获取ZNode ACL(访问控制列表)|getACL /path|
|getAllChildrenNumber|获取所有ZNode的子节点总数|getAllChildrenNumber|
|getChildren|获取ZNode子节点列表（带统计）|getChildren /path|
|history|显示命令历史|history|
|ls|列出ZNode子节点|ls /path|
|ls2|列出子节点并显示统计信息|ls2 /path|
|printWatches|打印所有监听器|printWatches summary|
|pwd|显示当前工作路径|pwd|
|quit|退出zkCli|quit|
|rmr|递归删除ZNode(等同deleteall)|rmr /path|
|set|设置ZNode数据|set /path "newdata"|
|setACL|设置ZNode ACL|setACL /path world:anyone:cdrwa|
|stat|获取ZNode统计信息|stat /path|
|sync|同步客户端状态|sync|

这些命令支持 watcher(监听器)选项，如 `get /path watch`。

### 4字母单词命令(4LW Commands)

这些是服务器级命令，用于监控和诊断，通常通过网络工具发送。默认白名单启用大多数命令（排除 wchp和wchc）, 可通过 `4lw.commands.whitelist=*` 配置启用全部，

|命令|描述|示例|
|---|---|---|
|cons|列出所有连接会话|`echo cons \| nc localhost 2181`|
|conf|显示服务器配置|`echo conf \| nc localhost 2181`|
|crst|重置会话超时|`echo crst \| nc localhost 2181`|
|diff|列出未提交的变更|`echo diff \| nc localhost 2181`|
|dump|显示会话和临时节点|`echo dump \| nc localhost 2181`|
|echo|回显输入（测试连接）|`echo echo whoami \| nc localhost 2181`|
|envi|显示环境变量|`echo envi \| nc localhost 2181`|
|gbch|列出所有会话中的变更|`echo gbch \| nc localhost 2181`|
|get|获取 ZNode 数据（需路径）|`echo get /path \| nc localhost 2181`|
|ha-status|显示高可用状态|`echo ha-status \| nc localhost 2181`|
|hsts|显示 Leader 选举历史|`echo hsts \| nc localhost 2181`|
|isro|检查是否为只读模式|`echo isro \| nc localhost 2181`|
|leader|选举新 Leader（需路径）|`echo leader \| nc localhost 2181`|
|mntr|监控服务器指标（JSON）|`echo mntr \| nc localhost 2181`|
|mtr|列出监听器|`echo mtr \| nc localhost 2181`|
|net|显示网络统计|`echo net \| nc localhost 2181`|
|pkst|列出未提交的包|`echo pkst \| nc localhost 2181`|
|qstatus|显示队列状态|`echo qstatus \| nc localhost 2181`|
|ruok|检查服务器是否运行（返回 "imok"）|`echo ruok \| nc localhost 2181`|
|srvr|显示服务器状态|`echo srvr \| nc localhost 2181`|
|stat|显示运行统计|`echo stat \| nc localhost 2181`|
|sync|同步文件系统|`echo sync \| nc localhost 2181`|
|wchc|列出所有会话的监听器（需白名单）|`echo wchc \| nc localhost 2181`|
|wchp|列出持久监听器（需白名单）|`echo wchp \| nc localhost 2181`|
|wchs|列出所有监听器|`echo wchs \| nc localhost 2181`|
|zkdump|转储内部状态|`echo zkdump \| nc localhost 2181`|

## 为什么感觉ZooKeeper和Redis很像

ZooKeeper 和 Redis 的相似之处分析

ZooKeeper 和 Redis 都是分布式系统中的热门工具，用户常常觉得它们“很像”，主要是因为两者在功能重叠、架构设计和应用场景上存在不少共通点，尤其是在处理分布式数据一致性和协调时。以下从几个角度解释这种“相似感”的来源，同时也简要对比差异，帮助你更清晰地理解。

**核心功能重叠：分布式锁与数据一致性**

- 两者都常被用来实现**分布式锁**，解决多节点环境下数据一致性问题。这可能是最直接的“相似感”来源。例如，在高并发场景下，Redis 通过 SETNX 命令（设置键值并检查是否存在）快速加锁，而 ZooKeeper 则利用其临时节点（ephemeral nodes）和顺序节点（sequential nodes）机制实现更强的锁保障。 这种相似让开发者在选择时容易混淆：为什么不用 Redis 就够了？为什么 ZooKeeper 也行？
- 结果：两者都能保证“一个时刻只有一个进程持有锁”，这让它们在微服务、任务调度等场景中像“双胞胎”。

**内存驻留与高性能设计**

- 两者都是**内存优先**的系统：Redis 是纯内存数据库，数据驻留在 RAM 中以实现亚毫秒级响应；ZooKeeper 也主要在内存中维护数据树状结构（类似于文件系统），持久化只是辅助。 这点让它们在处理小量、高频读写的场景（如配置存储、会话管理）时表现类似，都强调速度而非海量持久存储。
- 相似感：初次接触时，都给人“轻量、快速、适合分布式缓存”的印象。

**键值存储模型的共通**

- ZooKeeper 提供分层命名空间（树状键值，如路径 /app/lock），Redis 支持扁平键值（如 key-value pairs）。两者都能存储小规模数据（如 JSON 配置），并支持原子操作。 这让它们在简单的数据共享（如服务发现、参数分发）中互换使用，感觉像“同类工具”。
- 官网定位也强化了这种感觉：Redis 专注数据存储，ZooKeeper 专注协调，但实际边界模糊。

**为什么“感觉像”，但实际有差异？**

尽管相似，但 ZooKeeper 更侧重**强一致性和协调**（如领导者选举、配置同步），可靠性高但效率稍低；Redis 则强调**高吞吐和简单**，适合缓存，但锁的原子性不如 ZooKeeper 严谨（需额外处理过期时间）。 如果你的业务并发大、追求效率，用 Redis；需高可靠协调，用 ZooKeeper。

总之，这种“像”源于分布式生态的共性需求，但深入看，它们是互补的。

## ZooKeeper实例之间怎么配合的

### ZooKeeper集群结构

ZooKeeper并不是单机程序，而是通常部署成一个奇数节点的集群（ensemble），例如

|节点名|IP|角色|
|---|---|---|
|zk1|192.168.1.1|Leader|
|zk2|192.168.1.2|Follower|
|zk3|192.168.1.3|Follower|

* 这3个ZooKeeper实例组成一个逻辑上的整体。
* 集群对外表现为一个 统一的 ZooKeeper服务。
* 客户端只需连任意一个节点即可读写数据（底层会自动同步）

### ZooKeeper各实例之间的角色分工

1. Leader（领导者）

* 负责 处理所有写请求（create、set、delete等）
* 维护全局顺序
* 与Followers同步数据，保证一致性

2. Followers（跟随者）

* 接收客户端请求（读可直接返回本地最新状态）
* 写请求会转发给Leader
* 参与选举与投票
* 接受Leader的事务广播并确认

3. Observer（观察者，可选）

* 不参与投票，之同步数据
* 适合用于读压力大的系统

### 实例之间的配合机制：ZAB协议

ZAB（ZooKeeper Atomic Broadcast）是ZooKeeper的核心算法，用于保证数据一致性与顺序性。

它大致分为两个阶段：

__阶段1：选举（Leader Election）__

当集群启动或Leader崩溃时：

1. 所有节点进入 选举状态
2. 各自广播 “我认为的最大事务ID（zxid）”
3. 拥有最新zxid的节点胜出，成为Leader
4. 其他节点成为Follower
5. 所有节点同步到Leader的最新状态

这样确保集群重新达到一致状态

__阶段2：原子广播（Atomic Broadcast）__

Leader运行期间，写请求通过“两阶段提交”传播：

|步骤|动作|说明|
|---|---|---|
|1|Leader接受写请求，生成提案（proposal）|proposal包含事务号zxid|
|2|Leader向所有Follower广播提案ZAB保证顺序||
|3|Follower接受提案并写入本地事务日志，发送ACK|表示已接受|
|4|当Leader收到 超过半数节点ACK时，提交（commit）|写入内存并通知客户端成功|
|5|Leader向所有节点广播commit消息|所有节点提交同样数据|

ZooKeeper的所有写操作都必须被 过半节点（>N/2）确认，这就是它保证一致性的根本。

### 一个简单的例子

假设3台机器：

* zk1（Leader）
* zk2（Follower）
* zk3（Follower）

客户端发送请求：

```bash
set /game/state "RUNNING"
```

流程：

1. zk2收到请求（客户端连的它），转发给zk1
2. zk1生成提案P1（zxid=0x0001）
3. zk1-> zk2、zk3发送 proposal
4. zk2、zk3写日志并回复ACK
5. zk1收到2个ACK（超过半数），提交
6. zk1-> zk2、zk3 广播commit
7. zk2、zk3 也提交
8. zk1向客户端返回 成功

即使zk3挂了，只要zk1 + zk2 半数仍在，写操作也能继续成功。

### 配置上是怎么体现的

每个节点的 zoo.cfg 通常如下：

```txt
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181

# 集群成员定义
server.1=192.168.1.1:2888:3888
server.2=192.168.1.1:2888:3888
server.3=192.168.1.3:2888:3888
```

解释：

* 2888：Follower和Leader之间的数据同步端口
* 3888：Leader选举通信端口
* clientPort：客户端连接端口
* 每个节点在 dataDir里有个myid文件，写入它自己的编号（如1,2,3）

### 客户端如何配合集群

客户端可指定多个节点：

```bash
const client = zookeeper.createClient('192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181');
```

ZooKeeper客户端会自动：

* 检测当前连接是否可用
* 若Leader、Follower崩溃，会自动切换
* 保证会话（session）连续
* 重连后watcher会重新注册

ZooKeeper集群是一种“带选举机制的强一致复制状态机”，多个实例之间通过Zab协议协同工作，Leader负责事务顺序与提交，Followers负责同步与确认，
客户端无感知地连接任意节点，即可获得线性一致地数据视图。

## 工程实践

### 服务发现

下面一个基于Node.js+ZooKeeper的 服务发现（Service Discovery）实践示例。

1. 服务实例自动注册
2. 客户端实时发现可用服务节点（并自动感知新增、下线）

server.js

```javascript
// server.js
const zookeeper = require('node-zookeeper-client');
const os = require("os");

const client = zookeeper.createClient("127.0.0.1:2181");
const SERVICE_PATH = "/services/myapp";

client.once('connected', () => {
    console.log("Connected to ZooKeeper");
    registerService();
});

client.connect();

function registerService() {
    client.exists(SERVICE_PATH, (err, stat) => {
        if (err) {
            return console.error("exists error:", err);
        }
        if (!stat) {
            client.mkdirp(SERVICE_PATH, (err) => {
                if (err) {
                    return console.error("mkdir error:", err);
                }
                createEphemeralNode();
            });
        } else {
            createEphemeralNode();
        }
    });
}

function createEphemeralNode() {
    const ip = getLocalIP();
    const port = 3000 + Math.floor(Math.random() * 1000);
    const data = JSON.stringify({ ip, port });
    const nodePath = `${SERVICE_PATH}/node-`;

    client.create(nodePath, Buffer.from(data), zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL, (err, path) => {
        if (err) return console.error("create error:", err);
        console.log(`Service registered at ${path} (${data})`);
    });
}

function getLocalIP() {
    const interfaces = os.networkInterfaces();
    for (const name in interfaces) {
        for (const iface of interfaces[name]) {
            if (iface.family === 'IPv4' && !iface.internal) return iface.address;
        }
    }
    return '127.0.0.1';
}

```

client.js

```javascript
// client.js
const zookeeper = require('node-zookeeper-client');

const client = zookeeper.createClient('127.0.0.1:2181');
const SERVICE_PATH = '/services/myapp';

client.once('connected', () => {
    console.log('Connected to ZooKeeper');
    watchServices();
});

client.connect();

function watchServices() {
    client.getChildren(
        SERVICE_PATH,
        event => {
            console.log('Service change event:', event);
            watchServices(); // 重新监听
        },
        (err, children) => {
            if (err) {
                return console.error('getChildren error:', err);
            }
            if (!children.length) {
                console.log('No Service available');
                return;
            }

            console.log('Found Services: ', children);

            children.forEach(child => {
                const fullPath = `${SERVICE_PATH}/${child}`;
                client.getData(fullPath, (err, data) => {
                    if (!err && data) {
                        console.log(`${child}: ${data.toString()}`);
                    }
                });
            });
        }
    );
}

```

### 简单负载均衡

* 服务注册(server.js)：每个服务实例自动注册到ZooKeeper
* 服务发现 + 负载均衡(client.js)：客户端动态获取所有可用节点，并轮询调用。

server.js

```js
// server.js
const zookeeper = require('node-zookeeper-client');
const os = require("os");

const client = zookeeper.createClient("127.0.0.1:2181");
const SERVICE_PATH = "/services/myapp";

client.once('connected', () => {
    console.log("Connected to ZooKeeper");
    registerService();
});

client.connect();

function registerService() {
    client.exists(SERVICE_PATH, (err, stat) => {
        if (err) {
            return console.error("exists error:", err);
        }
        if (!stat) {
            client.mkdirp(SERVICE_PATH, (err) => {
                if (err) {
                    return console.error("mkdir error:", err);
                }
                createEphemeralNode();
            });
        } else {
            createEphemeralNode();
        }
    });
}

function createEphemeralNode() {
    const ip = getLocalIP();
    const port = 3000 + Math.floor(Math.random() * 1000);
    const data = JSON.stringify({ ip, port });
    const nodePath = `${SERVICE_PATH}/node-`;

    client.create(nodePath, Buffer.from(data), zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL, (err, path) => {
        if (err) return console.error("create error:", err);
        console.log(`Service registered at ${path} (${data})`);
    });
}

function getLocalIP() {
    const interfaces = os.networkInterfaces();
    for (const name in interfaces) {
        for (const iface of interfaces[name]) {
            if (iface.family === 'IPv4' && !iface.internal) return iface.address;
        }
    }
    return '127.0.0.1';
}

```

* 每个服务实例在ZooKeeper `/services/myapp` 下创建一个 临时顺序节点
* 断开连接时节点自动删除
* 数据中保存该实例的IP和端口

client.js 服务实现 + 负载均衡端

```js
// client.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const SERVICE_PATH = '/services/myapp';
let services = [];
let currentIndex = 0; // 用于轮询选择

const client = zookeeper.createClient(ZK_SERVERS);

client.once('connected', () => {
    console.log('Connected to ZooKeeper');
    watchServices();
});

client.connect();

// 监听服务节点变化
function watchServices() {
    client.getChildren(
        SERVICE_PATH,
        event => {
            console.log('Service list changed:', event);
            watchServices();
        },
        (err, children) => {
            if (err) return console.error('getChildren error:', err);
            if (!children.length) {
                services = [];
                console.log('⚠️ No available services.');
                return;
            }

            // 获取每个节点的数据
            const temp = [];
            let pending = children.length;

            children.forEach(child => {
                const fullPath = `${SERVICE_PATH}/${child}`;
                client.getData(fullPath, (err, data) => {
                    if (!err && data) {
                        temp.push(JSON.parse(data.toString()));
                    }
                    if (--pending === 0) {
                        services = temp;
                        console.log('🔍 Current available services:', services);
                    }
                });
            });

        }
    );
}

// 监听轮询负载均衡
function getNextService() {
    if (services.length === 0) {
        console.log('❌ No service available.');
        return null;
    }
    const service = services[currentIndex % services.length];
    currentIndex++;
    return service;
}

// 模拟客户端调用
setInterval(() => {
    const service = getNextService();
    if (service) {
        console.log(`Sendding request to ${service.ip}:${service.port}`);
    }
}, 2000);

```

* 动态监听 `/services/myapp` 子节点
* 当服务上下线时自动更新列表
* 使用轮询（Round Robin）策略选取服务节点
* 每2秒模拟一次请求

### Leader选举

这是分布式系统里非常经典的场景，像 Kafka、HBase、etcd 都使用类似机制。

目标：实现一个简单的 Leader 选举算法

* 所有节点都在 ZooKeeper 的同一个路径下创建 临时顺序节点(EPHEMERAL_SEQUENTIAL)
* 节点中序最小的那个成为 Leader
* 如果 Leader 掉线（节点被删除），下一个最小的节点自动接任
* 所有节点能实时监听变化

leader.js 实现Leader选举

```js
const zookeeper = require('node-zookeeper-client');
const os = require('os');

const ZK_SERVERS = '127.0.0.1:2181';
const ELECTION_PATH = '/election';

const client = zookeeper.createClient(ZK_SERVERS);
let currentNodePath = null; // 当前节点路径

client.once('connected', () => {
    console.log('Connected to ZooKeeper');
    startElection();
});

client.connect();

// 创建选举节点
function startElection() {
    client.mkdirp(ELECTION_PATH, (err) => {
        if (err) {
            return console.error('mkdirp error:', err);
        }
        createElectionNode();
    });
}

function createElectionNode() {
    const nodePath = `${ELECTION_PATH}/node-`;
    const nodeData = Buffer.from(JSON.stringify({
        host: getLocalIP(),
        pid: process.pid,
    }));

    client.create(nodePath, nodeData, zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL, (err, path) => {
        if (err) return console.error('create node error:', err);

        currentNodePath = path;
        console.log(`Created election node: ${path}`);
        checkLeader();
    });
}

// 检查自己是否是 Leader
function checkLeader() {
    client.getChildren(ELECTION_PATH, event => {
        console.log('Election node changed:', event);
        checkLeader(); // 重新检测
    }, (err, children) => {
        if (err) return console.error('getChildren error:', err);

        // 排序节点
        const sorted = children.sort();
        const myNode = currentNodePath.split('/').pop();

        if (sorted[0] === myNode) {
            console.log('I am the Leader!');
        } else {
            const leaderNode = sorted[0];
            console.group(`I am a Follower, current Leader: ${leaderNode}`);

            // 监听比自己小的那个节点（前一个节点）
            const myIndex = sorted.indexOf(myNode);
            const watchTarget = `${ELECTION_PATH}/${sorted[myIndex - 1]}`;

            client.exists(watchTarget, event => {
                console.log(`Watched node deleted: ${event}`);
                checkLeader(); // 前一个节点删除后重新检查
            }, (err, stat) => {
                if (err) return console.error('exist error:', err);
                if (!stat) checkLeader(); // 前一个节点被删除时重新检查
            });
        }
    });
}

function getLocalIP() {
    const interfaces = os.networkInterfaces();
    for (const name in interfaces) {
        for (const iface of interfaces[name]) {
            if (iface.family === 'IPv4' && !iface.internal) return iface.address;
        }
    }
    return '127.0.0.1';
}
```

### 公平锁

ZooKeeper的分布式锁原理：

1. 所有客户端在某个目录（如 /locks/mylock）下 __创建顺序节点（EPHEMERAL_SEQUENTIAL）;
2. ZooKeeper按顺序编号分配节点名，如

```bash
/locks/mylock/lock-0000000001
/locks/mylock/lock-0000000002
/locks/mylock/lock-0000000003
```

3. 序号最小的节点即获得锁
4. 其他节点监听前一个节点的删除事件
5. 当持锁者释放锁（断开或删除节点）时，排队下一个节点获得锁
6. ZooKeeper保证顺序一致性，从而实现公平性（FIFO）

fair_lock.js 分布式公平锁实现

```javascript
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_PATH = '/locks/mylock';

const client = zookeeper.createClient(ZK_SERVERS);
let myNodePath = null;
let lockReleased = false;
let lockAcquired = false; // ✅ 防止重复获得锁

client.once('connected', () => {
    console.log('Connected to ZooKeeper');
    acquireLock();
});

client.connect();

function acquireLock() {
    client.mkdirp(LOCK_PATH, (err) => {
        if (err) return console.error('mkdirp error:', err);
        createLockNode();
    });
}

function createLockNode() {
    const nodePath = `${LOCK_PATH}/lock-`;
    client.create(
        nodePath,
        Buffer.from('lock-node'),
        zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
        (err, path) => {
            if (err) return console.error('create error:', err);
            myNodePath = path;
            console.log(`🔒 Created lock node: ${path}`);
            checkLock();
        }
    );
}

function checkLock() {
    if (!myNodePath || lockReleased || lockAcquired) return;

    client.getChildren(
        LOCK_PATH,
        event => {
            if (!lockReleased && !lockAcquired) {
                console.log('📡 Lock node event:', event);
                checkLock();
            }
        },
        (err, children) => {
            if (err) return console.error('getChildren error:', err);
            if (!myNodePath || lockReleased || lockAcquired) return;

            const sorted = children.sort();
            const myNode = myNodePath.split('/').pop();
            const myIndex = sorted.indexOf(myNode);

            if (myIndex === 0) {
                lockAcquired = true; // ✅ 防止重复进入临界区
                console.log('✅ Lock acquired! I am the holder.');
                doCriticalSection();
            } else {
                const watchTarget = `${LOCK_PATH}/${sorted[myIndex - 1]}`;
                console.log(`⏳ Waiting for ${watchTarget} to be released...`);
                client.exists(
                    watchTarget,
                    event => {
                        if (!lockReleased && !lockAcquired && event.getType() ===
                            zookeeper.Event.NODE_DELETED) {
                            console.log('📢 Previous lock released, rechecking...');
                            checkLock();
                        }
                    },
                    (err, stat) => {
                        if (err) return console.error('exists error:', err);
                        if (!stat) checkLock(); // 前一个节点可能刚被删掉
                    }
                );
            }
        }
    );
}

function doCriticalSection() {
    console.log('🧠 Doing critical section...');
    setTimeout(() => {
        console.log('🧹 Releasing lock...');
        releaseLock();
    }, 5000);
}

function releaseLock() {
    if (!myNodePath || lockReleased) return;
    lockReleased = true;

    client.remove(myNodePath, (err) => {
        if (err) {
            if (err.code === zookeeper.Exception.NO_NODE) {
                console.warn('⚠️ Node already removed, safe to ignore.');
            } else {
                console.error('remove error:', err);
            }
        } else {
            console.log('🔓 Lock released!');
        }

        myNodePath = null;
        setTimeout(() => client.close(), 1000);
    });
}

```

### 乐观锁

1. 原理说明（通俗理解）：

在ZooKeeper里，每个节点的 dataVersion 都相当于“版本号”。每次修改数据时，ZooKeeper都会让版本号+1。

所谓“乐观锁（Optimistic Lock）”，就是：

我认为我读到的数据在我修改前不会被别人改过，修改时如果版本号不匹配，就说明“别人改了”，我放弃修改或重试。

2. 示例场景

我们创建一个节点 `/optimistic/item`，里面存一个库存数，比如“stock=10”。

两个客户端同时读取该节点，然后尝试修改它：

* 客户端A：把库存改成9
* 客户端B：把库存改成8

ZooKeeper会检查版本号，如果版本不匹配，修改失败（相当于CAS：Compare And Swap）

optimistic_lock.js

```javascript
// optimistic_lock.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const PATH = '/optimistic/item';
const client = zookeeper.createClient(ZK_SERVERS);

client.once('connected', () => {
    console.log('✅ Connected to ZooKeeper.');
    // client.addAuthInfo('digest', Buffer.from('admin:admin123'));
    initNode(() => simulateClients());
});

client.connect();

// 初始化节点
function initNode(callback) {
    client.exists(PATH, (err, stat) => {
        if (err) return console.error('exists error:', err);
        if (stat) return callback();

        client.mkdirp('/optimistic', (err) => {
            if (err) return console.error('mkdirp error:', err);

            client.create(
                PATH,
                Buffer.from('stock=10'),
                zookeeper.CreateMode.PERSISTENT,
                (err) => {
                    if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) {
                        return console.error('create error:', err);
                    }
                    console.log('🧱 Initialized node with stock=10');
                    callback();
                }
            );
        });
    });
}

// 模拟两个客户端同时修改库存
function simulateClients() {
    readAndUpdate('Client A', -1, () => {
        readAndUpdate('Client B', -2, () => {
            // 全部结束
            setTimeout(() => client.close(), 1000);
        });
    });
}

// 读取并尝试更新数据（带版本检查）
function readAndUpdate(clientName, delta, callback) {
    client.getData(PATH, (err, data, stat) => {
        if (err) return console.error(`${clientName} getData error:`, err);

        const stockStr = data.toString('utf8');
        const stock = parseInt(stockStr.split('=')[1]);
        const newStock = stock + delta;

        console.log(`${clientName} reads ${stockStr}, version=${stat.version}`);
        console.log(`${clientName} wants to update to stock=${newStock}`);

        // 尝试更新（使用当前 version）
        const newData = Buffer.from(`stock=${newStock}`);
        client.setData(PATH, newData, stat.version, (err, newStat) => {
            if (err) {
                if (err.getCode && err.getCode() === zookeeper.Exception.BADVERSION) {
                    console.error(`❌ ${clientName} update failed (version conflict).`);
                } else {
                    console.error(`${clientName} setData error:`, err);
                }
            } else {
                console.log(`✅ ${clientName} successfully updated! newVersion=${newStat.version}`);
            }
            callback();
        });
    });
}

```

### 悲观锁

1. 概念

* 乐观锁：假设别人不会改，提交时再验证版本号
* 悲观锁：假设别人一定会改，所以先上锁，操作完再解锁

在ZooKeeper里，“悲观锁”常用 __临时顺序节点（EPHEMERAL_SEQUENTIAL）__ 实现。原理和“公平锁”几乎一样，只是：

“悲观锁会先阻止其他客户端进入临界区，等自己处理完再释放。”

2. 设计场景

假设我们有一个共享资源节点 `/pessimistic/resource`，多个客户端要修改它，但不能同时操作。

我们使用 ZooKeeper 的锁节点 `/locks/pessimistic`:

* 每个客户端在该路径下创建一个临时顺序节点
* 节点号最小者获得锁
* 其他客户端等待前一个节点删除
* 处理完后删除自己的节点，释放锁

3. pessimistic_lock.js

```javascript
// pessimistic_lock.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_PATH = '/locks/pessimistic';
const RESOURCE_PATH = '/pessimistic/resource';

const client = zookeeper.createClient(ZK_SERVERS);
let myNodePath = null;

client.once('connected', () => {
    console.log('✅ Connected to ZooKeeper.');
    client.addAuthInfo('digest', Buffer.from('admin:admin123'));
    ensurePaths(() => acquireLock());
});

client.connect();

// 确保锁目录和资源存在
function ensurePaths(callback) {
    client.mkdirp(LOCK_PATH, (err) => {
        if (err) return console.error('mkdirp lock error:', err);

        client.exists(RESOURCE_PATH, (err, stat) => {
            if (err) return console.error('exists error:', err);
            if (stat) return callback();

            client.mkdirp('/pessimistic', (err) => {
                if (err) return console.error('mkdirp resource error:', err);
                client.create(RESOURCE_PATH,
                    Buffer.from('value=0'),
                    zookeeper.CreateMode.PERSISTENT,
                    (err) => {
                        if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS)
                            return console.error('create resource error:', err);
                        console.log('🧱 Initialized resource: value=0');
                        callback();
                    });
            });
        });
    });
}

// 获取锁
function acquireLock() {
    const nodePrefix = `${LOCK_PATH}/lock-`;
    client.create(nodePrefix,
        Buffer.from('lock-node'),
        zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
        (err, path) => {
            if (err) return console.error('create lock node error:', err);
            myNodePath = path;
            console.log(`🔒 Created lock node: ${path}`);
            checkLock();
        });
}

// 检查自己是否获得锁
function checkLock() {
    client.getChildren(LOCK_PATH, (err, children) => {
        if (err) return console.error('getChildren error:', err);
        const sorted = children.sort();
        const myNode = myNodePath.split('/').pop();
        const myIndex = sorted.indexOf(myNode);

        if (myIndex === 0) {
            console.log('✅ Lock acquired! Performing exclusive operation...');
            doCriticalSection();
        } else {
            const watchTarget = `${LOCK_PATH}/${sorted[myIndex - 1]}`;
            console.log(`⏳ Waiting for ${watchTarget} to be released...`);
            client.exists(watchTarget, event => {
                if (event.getType() === zookeeper.Event.NODE_DELETED) {
                    console.log('📢 Previous lock released, rechecking...');
                    checkLock();
                }
            }, (err, stat) => {
                if (err) return console.error('exists error:', err);
                if (!stat) checkLock();
            });
        }
    });
}

// 执行临界区操作
function doCriticalSection() {
    client.getData(RESOURCE_PATH, (err, data, stat) => {
        if (err) return console.error('getData error:', err);

        let value = parseInt(data.toString('utf8').split('=')[1]);
        console.log(`🔍 Read value=${value}`);

        // 模拟操作（加1）
        value++;
        const newData = Buffer.from(`value=${value}`);

        client.setData(RESOURCE_PATH, newData, stat.version, (err, newStat) => {
            if (err) return console.error('setData error:', err);
            console.log(`🧠 Updated resource to value=${value} (version ${newStat.version})`);

            // 模拟任务耗时
            setTimeout(() => {
                releaseLock();
            }, 3000);
        });
    });
}

// 释放锁
function releaseLock() {
    if (!myNodePath) return;
    console.log('🧹 Releasing lock...');
    client.remove(myNodePath, (err) => {
        if (err) {
            if (err.getCode && err.getCode() === zookeeper.Exception.NO_NODE) {
                console.warn('⚠️ Lock node already removed.');
            } else {
                console.error('remove error:', err);
            }
        } else {
            console.log('🔓 Lock released!');
        }
        client.close();
    });
}

```

### 可重入锁

1. 可重入锁原理说明

在分布式系统中，“可重入锁（Reentrant Lock）” 允许同一个线程（或客户端）多次获取同一把锁，而不会被自己阻塞。

实现关键点：

* 同一客户端在第一次获取锁后，记录自己持有锁的次数
* 后续请求锁时，如果当前持有者是自己，直接增加计数（而不是重新排队）
* 释放锁时，计数减一，直到计数为0时才真正释放ZooKeeper节点。

|特性|说明|
|---|---|
|可重入性|同一客户端多次加锁不会阻塞自己|
|基于Ephemeral节点|ZooKeeper自动清理掉死连接|
|分布式安全性|即使进程崩溃，锁也能释放|
|客户端唯一标识|通过CLIENT_ID区分不同实例|

2. reentrant_lock.js

```javascript
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_ROOT = '/reentrant_lock';
const LOCK_NAME = 'mylock';
const CLIENT_ID = 'client-A';  // 模拟当前客户端唯一ID

const client = zookeeper.createClient(ZK_SERVERS);
client.connect();

let lockPath = null;
let reentrantCount = 0;

client.once('connected', async () => {
    console.log('✅ Connected to ZooKeeper');

    await ensurePath(LOCK_ROOT);
    await acquireLock();
    await acquireLock(); // 再次获取（测试可重入）

    setTimeout(async () => {
        await releaseLock();
        await releaseLock(); // 第二次释放才真正删除锁节点
        process.exit(0);
    }, 4000);
});

/**
 * 确保根路径存在
 */
function ensurePath(path) {
    return new Promise((resolve) => {
        client.exists(path, (err, stat) => {
            if (stat) return resolve();
            client.create(path, (err2) => {
                if (err2 && err2.getCode() !== zookeeper.Exception.NODE_EXISTS) {
                    console.error('❌ create path error:', err2);
                }
                resolve();
            });
        });
    });
}

/**
 * 尝试获取锁
 */
async function acquireLock() {
    if (lockPath) {
        reentrantCount++;
        console.log(`🔁 Re-entered lock (${reentrantCount} times)`);
        return;
    }

    const path = `${LOCK_ROOT}/${LOCK_NAME}`;
    try {
        lockPath = await createEphemeralNode(path, CLIENT_ID);
        reentrantCount = 1;
        console.log(`🔒 Lock acquired by ${CLIENT_ID}`);
    } catch (err) {
        if (err.code === zookeeper.Exception.NODE_EXISTS) {
            console.log(`⏳ Lock held by others, waiting...`);
            await waitForLock(path);
            return acquireLock(); // 重试
        } else {
            console.error('❌ acquire lock error:', err);
        }
    }
}

/**
 * 释放锁
 */
async function releaseLock() {
    if (!lockPath) return;
    reentrantCount--;
    console.log(`🧹 release lock called (${reentrantCount} remaining)`);

    if (reentrantCount === 0) {
        await deleteNode(lockPath);
        lockPath = null;
        console.log('🔓 Lock fully released');
    }
}

/**
 * 创建临时节点
 */
function createEphemeralNode(path, data) {
    return new Promise((resolve, reject) => {
        client.create(
            path,
            Buffer.from(data),
            zookeeper.CreateMode.EPHEMERAL,
            (err, createdPath) => {
                if (err) return reject(err);
                resolve(createdPath);
            }
        );
    });
}

/**
 * 删除节点
 */
function deleteNode(path) {
    return new Promise((resolve) => {
        client.remove(path, (err) => {
            if (err && err.getCode() !== zookeeper.Exception.NO_NODE) {
                console.error('remove error:', err);
            }
            resolve();
        });
    });
}

/**
 * 等待锁释放（监听删除事件）
 */
function waitForLock(path) {
    return new Promise((resolve) => {
        const watcher = (event) => {
            if (event.getType() === zookeeper.Event.NODE_DELETED) {
                console.log('📢 Lock node deleted, can retry.');
                resolve();
            } else {
                client.exists(path, watcher, () => { });
            }
        };
        client.exists(path, watcher, () => { });
    });
}

```

### 读写锁

1. 读写锁（Read-Write Lock）原理

在分布式系统中，读写锁允许：

* 多个读者（Reader）可以同时访问资源
* 写者（Writer）需要独占资源
* 当写者存在时，新的读者必须等待

ZooKeeper实现思路

|类型|节点命名规则|可并发情况|
|读锁| `/locks/read-xxxxxx` |可以与其他读锁共存|
|写锁| `/locks/write-xxxxxx` | 需要等待前面所有锁释放 |

实现规则：

* 读写锁都通过创建 顺序临时节点 来排队
* 写锁等待前面所有节点都删除
* 读锁只需等待前面所有写锁删除即可
* 锁释放后自动触发监听事件

|特性|说明|
|---|---|
|读共享|多个读锁可同时持有|
|写独占|写锁必须等待前面所有锁释放|
|FIFO顺序|使用`EPHEMERAL_SEQUENTIAL`实现公平排队|
|自动释放|客户端断开时锁节点自动清除|

2. rw_lock.js

```javascript
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_ROOT = '/rw_locks';
const client = zookeeper.createClient(ZK_SERVERS);
client.connect();

client.once('connected', async () => {
    console.log('✅ Connected to ZooKeeper');
    await ensurePath(LOCK_ROOT);

    // 测试：一个读锁 + 一个写锁
    simulateClient('Reader-1', 'read');
    setTimeout(() => simulateClient('Writer-1', 'write'), 1000);
    setTimeout(() => simulateClient('Reader-2', 'read'), 2000);
});

async function ensurePath(path) {
    return new Promise((resolve) => {
        client.exists(path, (err, stat) => {
            if (stat) return resolve();
            client.create(path, (err2) => {
                if (err2 && err2.getCode() !== zookeeper.Exception.NODE_EXISTS) {
                    console.error('❌ create path error:', err2);
                }
                resolve();
            });
        });
    });
}

/**
 * 模拟不同客户端
 */
async function simulateClient(id, type) {
    const lock = new ReadWriteLock(client, LOCK_ROOT, id, type);
    await lock.acquire();
    console.log(`🧠 [${id}] doing ${type} operation...`);
    setTimeout(async () => {
        await lock.release();
        console.log(`🔓 [${id}] released ${type} lock`);
    }, 3000);
}

/**
 * 读写锁类
 */
class ReadWriteLock {
    constructor(client, root, id, type) {
        this.client = client;
        this.root = root;
        this.id = id;
        this.type = type; // 'read' | 'write'
        this.nodePath = null;
    }

    async acquire() {
        this.nodePath = await this.createLockNode();
        console.log(`🔒 [${this.id}] requested ${this.type} lock: ${this.nodePath}`);

        while (true) {
            const children = await this.getChildren();
            const sorted = children.sort();
            const myNode = this.nodePath.split('/').pop();
            const myIndex = sorted.indexOf(myNode);

            if (this.type === 'write') {
                // 写锁需等待前面所有节点都释放
                if (myIndex === 0) break;
            } else {
                // 读锁：只需等待前面所有“写锁”
                const beforeMe = sorted.slice(0, myIndex);
                const hasWriterBefore = beforeMe.some(name => name.startsWith('write-'));
                if (!hasWriterBefore) break;
            }

            const watchNode = `${this.root}/${sorted[myIndex - 1]}`;
            console.log(`⏳ [${this.id}] waiting for ${watchNode}...`);
            await this.waitForDelete(watchNode);
        }

        console.log(`✅ [${this.id}] acquired ${this.type} lock`);
    }

    async release() {
        if (this.nodePath) {
            await new Promise((resolve) => {
                this.client.remove(this.nodePath, (err) => {
                    if (err && err.getCode() !== zookeeper.Exception.NO_NODE) {
                        console.error('remove error:', err);
                    }
                    resolve();
                });
            });
            this.nodePath = null;
        }
    }

    getChildren() {
        return new Promise((resolve, reject) => {
            this.client.getChildren(this.root, (err, children) => {
                if (err) reject(err);
                else resolve(children);
            });
        });
    }

    createLockNode() {
        const nodePrefix = `${this.root}/${this.type}-`;
        return new Promise((resolve, reject) => {
            this.client.create(
                nodePrefix,
                Buffer.from(this.id),
                zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
                (err, path) => {
                    if (err) reject(err);
                    else resolve(path);
                }
            );
        });
    }

    waitForDelete(path) {
        return new Promise((resolve) => {
            const watcher = (event) => {
                if (event.getType() === zookeeper.Event.NODE_DELETED) {
                    resolve();
                } else {
                    this.client.exists(path, watcher, () => { });
                }
            };
            this.client.exists(path, watcher, () => { });
        });
    }
}

```

### 超时锁

1. 什么是“超时锁”

分布式系统中，如果某个客户端：

* 获得了锁，但 挂了 或 逻辑异常
* 锁节点虽然是临时的，但 ZooKeeper 会话还没过期（如果网络抖动）

这时，其他客户端会一直卡住等待。所以，我们希望加上一个 超时机制（Timeout）：

超时锁 = ZooKeeper分布式锁 + 本地计时器（过期自动释放）

2. 实现思路

* 创建锁节点（EPHEMERAL_SEQUENTIAL）
* 如果获得锁，则启动一个本地 setTimeout 计时
* 时间到未主动释放，则 自动删除节点（释放锁）
* 其他节点感知锁释放后自动尝试获取

|特性|说明|
|---|---|
|公平性|按顺序号排队，先来先得|
|自动释放|本地超时+ZooKeeper session失效 双保险|
|容错性|即使节点断连，ZooKeeper会自动删除临时节点|
|安全性|不会发生死锁或永久等待|

3. timeout_lock.js

```javascript
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_PATH = '/timeout_locks/mylock';
const client = zookeeper.createClient(ZK_SERVERS);

client.connect();

client.once('connected', () => {
    console.log('✅ Connected to ZooKeeper');
    ensurePath(LOCK_PATH, () => {
        simulateClient('Client-A', 5000);
        setTimeout(() => simulateClient('Client-B', 1000), 1000);
    });
});

// 确保路径存在
function ensurePath(path, cb) {
    client.mkdirp(path, (err) => {
        if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS)
            console.error('mkdirp error:', err);
        cb();
    });
}

// 模拟不同客户端
function simulateClient(id, timeoutMs) {
    const lock = new TimeoutLock(client, LOCK_PATH, id, timeoutMs);
    lock.acquire()
        .then(() => {
            console.log(`🧠 [${id}] doing critical work...`);
            // 模拟长任务
            setTimeout(() => {
                lock.release();
            }, 3000);
        })
        .catch(err => console.error(`[${id}] failed to acquire:`, err));
}

/**
 * 超时锁类
 */
class TimeoutLock {
    constructor(client, lockPath, id, timeoutMs = 10000) {
        this.client = client;
        this.lockPath = lockPath;
        this.id = id;
        this.nodePath = null;
        this.timeoutMs = timeoutMs;
        this.timer = null;
    }

    async acquire() {
        this.nodePath = await this.createLockNode();
        console.log(`🔒 [${this.id}] requested lock: ${this.nodePath}`);

        while (true) {
            const children = await this.getChildren();
            const sorted = children.sort();
            const myNode = this.nodePath.split('/').pop();
            const myIndex = sorted.indexOf(myNode);

            if (myIndex === 0) {
                console.log(`✅ [${this.id}] acquired lock`);
                this.startTimeout();
                return;
            }

            const prevNode = `${this.lockPath}/${sorted[myIndex - 1]}`;
            console.log(`⏳ [${this.id}] waiting for ${prevNode}...`);
            await this.waitForDelete(prevNode);
        }
    }

    // 启动超时定时器
    startTimeout() {
        this.timer = setTimeout(() => {
            console.log(`⏰ [${this.id}] lock timeout (${this.timeoutMs}ms)! Releasing...`);
            this.release();
        }, this.timeoutMs);
    }

    async release() {
        if (this.timer) clearTimeout(this.timer);
        if (this.nodePath) {
            await new Promise((resolve) => {
                this.client.remove(this.nodePath, (err) => {
                    if (err && err.getCode() !== zookeeper.Exception.NO_NODE) {
                        console.error(`[${this.id}] remove error:`, err);
                    } else {
                        console.log(`🔓 [${this.id}] released lock`);
                    }
                    resolve();
                });
            });
            this.nodePath = null;
        }
    }

    createLockNode() {
        const prefix = `${this.lockPath}/lock-`;
        return new Promise((resolve, reject) => {
            this.client.create(
                prefix,
                Buffer.from(this.id),
                zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
                (err, path) => {
                    if (err) reject(err);
                    else resolve(path);
                }
            );
        });
    }

    getChildren() {
        return new Promise((resolve, reject) => {
            this.client.getChildren(this.lockPath, (err, children) => {
                if (err) reject(err);
                else resolve(children);
            });
        });
    }

    waitForDelete(path) {
        return new Promise((resolve) => {
            const watcher = (event) => {
                if (event.getType() === zookeeper.Event.NODE_DELETED) {
                    resolve();
                } else {
                    this.client.exists(path, watcher, () => { });
                }
            };
            this.client.exists(path, watcher, () => { });
        });
    }
}

```

来有类似的 支持自动续期”的 可续租超时锁（Renewable Timeout Lock）

### 尝试锁

1. 什么是“尝试锁 TryLock”

“尝试锁”与普通锁的区别在于：

|类型|行为|
|---|---|
|普通锁|获取不到会一直阻塞等待|
|尝试锁TryLock|尝试获取，如果锁被占用，立即返回失败或等待指定时间后放弃|

这非常适合做非阻塞操作，比如：某个任务只要资源没空闲就跳过，而不是卡在哪里。

2. 实现思路

基于ZooKeeper的锁机制：

* 创建一个父路径 `/trylock/mylock`
* 每个客户端尝试创建一个临时节点（EPHEMERAL）
* 如果创建成功->获得锁
* 如果节点已存在（被别人持有）-> 立即返回失败
* （可选）支持等待超时模式，在指定时间内重试

|特性|说明|
|---|---|
|✅ 非阻塞	|无需等待，失败立即返回|
|⏱ 可配置超时	|可指定等待时长，超时放弃|
|🔒 安全	|使用临时节点，断线自动释放|
|🧩 应用场景	|秒杀库存、任务去重、单次调度防重入|

3. try_lock.js

```javascript
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_PATH = '/trylock/mylock';
const client = zookeeper.createClient(ZK_SERVERS);

client.once('connected', async () => {
    console.log('✅ Connected to ZooKeeper');
    await ensurePath(LOCK_PATH);

    // 启动两个客户端模拟
    tryLockClient('Client-A', 0); // 立即尝试
    setTimeout(() => tryLockClient('Client-B', 3000), 1000); // 等待重试
});

client.connect();

// 确保路径存在
function ensurePath(path) {
    return new Promise((resolve) => {
        client.mkdirp(path, (err) => {
            if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) {
                console.error('mkdirp error:', err);
            }
            resolve();
        });
    });
}

// 模拟客户端尝试锁
async function tryLockClient(id, retryDelay = 0) {
    const lock = new TryLock(client, LOCK_PATH, id);
    const success = await lock.tryAcquire(2000); // 2秒超时模式
    if (success) {
        console.log(`✅ [${id}] got lock! Doing work...`);
        setTimeout(() => lock.release(), 3000);
    } else {
        console.log(`❌ [${id}] failed to acquire lock.`);
        if (retryDelay > 0) {
            console.log(`🔁 [${id}] will retry after ${retryDelay}ms`);
            setTimeout(() => tryLockClient(id), retryDelay);
        }
    }
}

/**
 * 尝试锁类
 */
class TryLock {
    constructor(client, basePath, id) {
        this.client = client;
        this.lockNode = `${basePath}/lock-node`;
        this.id = id;
    }

    tryAcquire(timeoutMs = 0) {
        return new Promise((resolve) => {
            const tryCreate = () => {
                this.client.create(
                    this.lockNode,
                    Buffer.from(this.id),
                    zookeeper.CreateMode.EPHEMERAL,
                    (err) => {
                        if (!err) {
                            resolve(true); // ✅ 成功获得锁
                        } else if (err.getCode() === zookeeper.Exception.NODE_EXISTS) {
                            if (timeoutMs > 0) {
                                // 等待锁释放
                                this.waitForLock(timeoutMs).then(resolve);
                            } else {
                                resolve(false); // ❌ 立即失败
                            }
                        } else {
                            console.error(`[${this.id}] unexpected error:`, err);
                            resolve(false);
                        }
                    }
                );
            };

            tryCreate();
        });
    }

    waitForLock(timeoutMs) {
        return new Promise((resolve) => {
            const timer = setTimeout(() => {
                console.log(`⏰ [${this.id}] timeout waiting for lock`);
                resolve(false);
            }, timeoutMs);

            const watch = (event) => {
                if (event.getType() === zookeeper.Event.NODE_DELETED) {
                    clearTimeout(timer);
                    this.tryAcquire(0).then(resolve);
                }
            };

            this.client.exists(this.lockNode, watch, () => { });
        });
    }

    release() {
        this.client.remove(this.lockNode, (err) => {
            if (err && err.getCode() !== zookeeper.Exception.NO_NODE) {
                console.error(`[${this.id}] remove error:`, err);
            } else {
                console.log(`🔓 [${this.id}] released lock`);
            }
        });
    }
}

```

还有，带重试次数上限的 TryLock（可重试非阻塞锁）

### 分段锁

1. 分段锁是什么

分段锁（Segment Lock）是一种用于提高并发性能的锁机制，它将数据划分为多个段（Segment），每个段有自己独立的锁。这样不同段上的操作可以并行执行，而同一段仍然是串行的。

例如：

* 你要锁定用户数据，可按 `userId%10` 分成10段
* 这样不同用户可能落在不同的锁节点下，从而并行

2. ZooKeeper实现思路

我们利用ZooKeeper的 临时顺序节点（Ephemeral Sequential Node）实现：

* 根节点：`/locks-segment`
* 子节点：`/locks-segment/segment-<id>` 表示某个段的锁目录
* 每个段目录下创建锁节点：`/locks-segment/segment-<id>/lock-xxxx`

|特性|说明|
|---|---|
|锁粒度|每个段独立|
|并发性能|显著提升|
|典型应用|用户缓存更新、库存更新|
|ZNode结构| `/lock-segment/segment-N/lock-xxxx` |

3. segment_lock.js

```javascript
// segment_lock.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const BASE_PATH = '/locks-segment';
const SEGMENT_COUNT = 8; // 分成8段
const client = zookeeper.createClient(ZK_SERVERS);

// 防止 Node.js 警告：监听器太多
client.setMaxListeners(1000);

client.once('connected', async () => {
    console.log('Connected to ZooKeeper');
    await initSegments();
    // 模拟多个客户端尝试加锁
    simulateClients();
});

client.connect();

async function initSegments() {
    for (let i = 0; i < SEGMENT_COUNT; i++) {
        await mkdirp(`${BASE_PATH}/segment-${i}`);
    }
}

// 确保路径存在
function mkdirp(path) {
    return new Promise((resolve, reject) => {
        client.mkdirp(path, (err) => (err ? reject(err) : resolve()));
    });
}

// 等待节点删除（只监听一次）
function waitForDelete(path) {
    return new Promise((resolve) => {
        client.exists(
            path,
            (event) => {
                if (event.getType() === zookeeper.Event.NODE_DELETED) {
                    resolve();
                }
            },
            (err, stat) => {
                if (!stat) resolve();
            }
        );
    });
}

// 尝试加锁
async function acquireSegmentLock(segmentId, resourceId) {
    const segmentPath = `${BASE_PATH}/segment-${segmentId}`;
    const nodePath = await new Promise((resolve, reject) => {
        client.create(
            `${segmentPath}/lock-`,
            Buffer.from(`lock-${resourceId}`),
            zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
            (err, path) => (err ? reject(err) : resolve(path))
        );
    });

    const nodeName = nodePath.split('/').pop();
    await tryAcquire(segmentPath, nodeName, segmentId, resourceId);
    return nodePath;
}

async function tryAcquire(segmentPath, nodeName, segmentId, resourceId) {
    const children = await new Promise((resolve, reject) => {
        client.getChildren(segmentPath, (err, ch) => (err ? reject(err) : resolve(ch)));
    });

    children.sort();
    const index = children.indexOf(nodeName);

    if (index === 0) {
        console.log(`✅ [${resourceId}] Got lock on segment-${segmentId}`);
        doWork(segmentPath, nodeName, segmentId, resourceId);
    } else {
        const prevNode = children[index - 1];
        console.log(`⏳ [${resourceId}] Waiting for ${prevNode} on segment-${segmentId}...`);
        await waitForDelete(`${segmentPath}/${prevNode}`);
        await tryAcquire(segmentPath, nodeName, segmentId, resourceId);
    }
}

// 模拟业务逻辑
async function doWork(segmentPath, nodeName, segmentId, resourceId) {
    console.log(`🧠 [${resourceId}] Doing work on segment-${segmentId}...`);
    await sleep(3000 + Math.random() * 2000);

    console.log(`🧹 [${resourceId}] Releasing lock on segment-${segmentId}...`);
    await new Promise((resolve) => {
        client.remove(`${segmentPath}/${nodeName}`, (err) => {
            if (err) console.error('remove error:', err);
            else console.log(`🔓 [${resourceId}] Released lock ${segmentPath}/${nodeName}`);
            resolve();
        });
    });
}

// 延迟函数
function sleep(ms) {
    return new Promise((r) => setTimeout(r, ms));
}

// 模拟多个客户端争抢不同 segment
function simulateClients() {
    const tasks = [101, 202, 303, 404, 505, 606, 707, 808];
    for (const id of tasks) {
        const segmentId = Math.floor(Math.random() * SEGMENT_COUNT);
        acquireSegmentLock(segmentId, id).catch(console.error);
    }
}

```

```bash
root@ser745692301841:/dev_dir/zookeeper-tutorial/js# node  segment_lock.js
Connected to ZooKeeper
✅ [101] Got lock on segment-4
🧠 [101] Doing work on segment-4...
✅ [202] Got lock on segment-3
🧠 [202] Doing work on segment-3...
✅ [303] Got lock on segment-2
🧠 [303] Doing work on segment-2...
✅ [404] Got lock on segment-7
🧠 [404] Doing work on segment-7...
✅ [505] Got lock on segment-5
🧠 [505] Doing work on segment-5...
⏳ [606] Waiting for lock-0000000003 on segment-2...
⏳ [707] Waiting for lock-0000000003 on segment-4...
✅ [808] Got lock on segment-6
🧠 [808] Doing work on segment-6...
🧹 [505] Releasing lock on segment-5...
🔓 [505] Released lock /locks-segment/segment-5/lock-0000000003
🧹 [101] Releasing lock on segment-4...
🔓 [101] Released lock /locks-segment/segment-4/lock-0000000003
✅ [707] Got lock on segment-4
🧠 [707] Doing work on segment-4...
🧹 [303] Releasing lock on segment-2...
🔓 [303] Released lock /locks-segment/segment-2/lock-0000000003
✅ [606] Got lock on segment-2
🧠 [606] Doing work on segment-2...
🧹 [808] Releasing lock on segment-6...
🔓 [808] Released lock /locks-segment/segment-6/lock-0000000000
🧹 [404] Releasing lock on segment-7...
🔓 [404] Released lock /locks-segment/segment-7/lock-0000000003
🧹 [202] Releasing lock on segment-3...
🔓 [202] Released lock /locks-segment/segment-3/lock-0000000000
🧹 [707] Releasing lock on segment-4...
🔓 [707] Released lock /locks-segment/segment-4/lock-0000000004
🧹 [606] Releasing lock on segment-2...
🔓 [606] Released lock /locks-segment/segment-2/lock-0000000004
```

### 多资源锁

这个版本适合用在 需要同时锁定多个资源 的场景，比如游戏后端中两个玩家的交易、多个库存项的更新等。

1. 什么是多资源锁

在分布式系统中，有时候你要同时锁住多个资源（例如 itemA 和 itemB）：

* 如果一个客户端锁了 itemA，另一个客户端锁了 itemB，两者都能继续；
* 但如果某个客户端要同时锁 itemA 和 itemB，则必须等待两者都空闲；
* 所有锁都必须按固定顺序申请，以避免 死锁。

我们可以利用 ZooKeeper 的临时顺序节点 机制来实现公平的多资源锁。

| 功能      | 实现说明                      |
| ------- | ------------------------- |
| ✅ 多资源加锁 | 一次性锁多个资源，确保顺序一致           |
| ✅ 死锁避免  | 对资源名称排序，保证锁申请顺序相同         |
| ✅ 公平性   | 使用 `EPHEMERAL_SEQUENTIAL` |
| ✅ 自动释放  | 会话断开自动清除节点                |
| ✅ 无内存泄漏 | 使用一次性 `exists` 监听         |
| ✅ 可扩展   | 可以在游戏、交易、库存更新等场景中直接使用     |

2. multi_resource_lock.js

```javascript
// multi_resource_lock.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_ROOT = '/multi-locks';

const client = zookeeper.createClient(ZK_SERVERS);
client.setMaxListeners(1000);

client.once('connected', async () => {
    console.log('✅ Connected to ZooKeeper');
    await mkdirp(LOCK_ROOT);
    simulateClients();
});

client.connect();

// 辅助函数
function mkdirp(path) {
    return new Promise((resolve, reject) => {
        client.mkdirp(path, (err) => (err ? reject(err) : resolve()));
    });
}

function sleep(ms) {
    return new Promise((r) => setTimeout(r, ms));
}

/**
 * @param {string[]} resources 要加锁的资源名数组，例如 ['user:101', 'user:202']
 * @param {string} clientId 客户端ID（打印标识）
 */
async function acquireMultiLock(resources, clientId) {
    // 按字典顺序排序，防止死锁
    const sortedResources = [...resources].sort();
    const lockPaths = [];

    try {
        for (const res of sortedResources) {
            const path = await acquireSingleLock(res, clientId);
            lockPaths.push(path);
        }

        console.log(`🔒 [${clientId}] Acquired locks on: ${sortedResources.join(', ')}`);
        await doWork(clientId, sortedResources);

    } finally {
        // 全部释放
        for (const path of lockPaths.reverse()) {
            await releaseLock(path, clientId);
        }
    }
}

/** 获取单个资源锁 */
async function acquireSingleLock(resource, clientId) {
    const resPath = `${LOCK_ROOT}/${resource}`;
    await mkdirp(resPath);

    const nodePath = await new Promise((resolve, reject) => {
        client.create(
            `${resPath}/lock-`,
            Buffer.from(clientId),
            zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
            (err, path) => (err ? reject(err) : resolve(path))
        );
    });

    const nodeName = nodePath.split('/').pop();
    await tryAcquire(resPath, nodeName, resource, clientId);
    return nodePath;
}

/** 检查是否获得锁，否则监听前一个节点 */
async function tryAcquire(resPath, nodeName, resource, clientId) {
    const children = await new Promise((resolve, reject) => {
        client.getChildren(resPath, (err, list) => (err ? reject(err) : resolve(list)));
    });

    children.sort();
    const index = children.indexOf(nodeName);

    if (index === 0) {
        console.log(`✅ [${clientId}] Got lock on ${resource}`);
        return;
    } else {
        const prevNode = children[index - 1];
        const prevPath = `${resPath}/${prevNode}`;
        await waitForDelete(prevPath);
        await tryAcquire(resPath, nodeName, resource, clientId);
    }
}

/** 等待前一个节点删除 */
function waitForDelete(path) {
    return new Promise((resolve) => {
        client.exists(
            path,
            (event) => {
                if (event.getType() === zookeeper.Event.NODE_DELETED) resolve();
            },
            (err, stat) => {
                if (!stat) resolve();
            }
        );
    });
}

/** 释放锁 */
async function releaseLock(nodePath, clientId) {
    return new Promise((resolve) => {
        client.remove(nodePath, (err) => {
            if (err) console.error(`❌ [${clientId}] Release error:`, err);
            else console.log(`🔓 [${clientId}] Released ${nodePath}`);
            resolve();
        });
    });
}

/** 模拟临界区任务 */
async function doWork(clientId, resources) {
    console.log(`🧠 [${clientId}] Working on ${resources.join(', ')}...`);
    await sleep(2000 + Math.random() * 2000);
}

/** 模拟多个客户端 */
function simulateClients() {
    const tasks = [
        { id: 'C1', res: ['user:101', 'item:5'] },
        { id: 'C2', res: ['item:5'] },
        { id: 'C3', res: ['user:101'] },
        { id: 'C4', res: ['item:6', 'user:101'] },
    ];

    for (const t of tasks) {
        acquireMultiLock(t.res, t.id).catch(console.error);
    }
}

```

| 客户端 ID | 想要加的锁资源                  | 说明                       |
| ------ | ------------------------ | ------------------------ |
| **C1** | `['user:101', 'item:5']` | 同时操作用户101和物品5（例如购买、交易操作） |
| **C2** | `['item:5']`             | 只操作物品5（例如库存更新）           |
| **C3** | `['user:101']`           | 只操作用户101（例如用户资料更新）       |
| **C4** | `['item:6', 'user:101']` | 同时操作物品6和用户101（例如新购买）     |

假设 ZooKeeper 目录 `/multi-locks` 结构如下：

```bash
/multi-locks/
 ├── user:101/
 ├── item:5/
 └── item:6/
```

每个客户端都会：

* 按资源名字典序排序（防止死锁）
    * `C1 → ['item:5', 'user:101']`
    * `C4 → ['item:6', 'user:101']`
* 逐个获取锁（创建临时顺序节点）
* 所有锁都拿到后执行工作
* 最后按相反顺序释放锁

举个运行时例子

* C1 想锁 item:5、user:101
* C2 想锁 item:5
* C3 想锁 user:101

执行顺序可能是：

```bash
✅ C2 先拿到 item:5
⏳ C1 等待 item:5
✅ C3 拿到 user:101
...
🔓 C2 释放 item:5
✅ C1 拿到 item:5
⏳ C1 等待 user:101（因为 C3 在用）
...
🔓 C3 释放 user:101
✅ C1 拿到 user:101
🧠 C1 开始工作
```

整个过程保证：

* 每个资源同一时刻只有一个客户端持有锁；
* 不同客户端不会出现死锁；
* ZooKeeper 自动清理掉异常退出的锁节点（因为是 EPHEMERAL 节点）。

| 功能    | 技术                               |
| ----- | -------------------------------- |
| 锁节点存放 | `/multi-locks/{resource}/lock-*` |
| 锁唯一标识 | EPHEMERAL_SEQUENTIAL 节点          |
| 死锁避免  | 按字典序加锁                           |
| 锁失效清理 | 客户端断开 ZooKeeper 自动删除             |
| 并发安全  | 等待上一个节点删除后重试                     |

还有 “锁超时自动释放 + 重试机制” 的版本

### 公平可重入读写锁

你现在已经玩到 ZooKeeper 锁的“高级关卡”了 ——
我们要实现一个 “公平 + 可重入 + 读写锁（Fair Reentrant ReadWrite Lock）” 的 Node.js 实践 Demo。

| 特性                       | 含义                                           |
| ------------------------ | -------------------------------------------- |
| **公平 (Fair)**            | 锁的获取顺序严格按照请求顺序（节点序号）执行，谁先排队谁先执行。             |
| **可重入 (Reentrant)**      | 同一个客户端（同一个 `clientId`）如果已经获得锁，可以再次进入，不会阻塞自己。 |
| **读写锁 (ReadWrite Lock)** | 允许多个读者共享锁，但写者必须独占；写锁优先于后续读锁。                 |

实现原理（基于ZooKeeper临时顺序节点）

在 `/rw-locks` 下为每个资源维护子节点：

```bash
/rw-locks/myResource/
  ├── read-0000000001
  ├── read-0000000002
  ├── write-0000000003
  └── read-0000000004

```

规则：

* 读锁
    * 可以获取锁，当且仅当前没有比它序号更小的“写锁”
* 写锁
    * 可以获取锁，当且仅当当前没有比它序号更小的任何“读锁”或“写锁”
* 公平性：由节点序号天然保证
* 可重入：如果该 clientId 已经持有锁（读/写）,则允许直接进入

fair_reentrant_rwlock.js

```javascript
// fair_reentrant_rwlock.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_ROOT = '/rw-locks';
const client = zookeeper.createClient(ZK_SERVERS);
client.setMaxListeners(1000);

const heldLocks = new Map(); // 记录 clientId 当前持有的锁 {clientId: Set(resources)}

client.once('connected', async () => {
    console.log('✅ Connected to ZooKeeper');
    await mkdirp(LOCK_ROOT);
    simulateClients();
});
client.connect();

function mkdirp(path) {
    return new Promise((resolve, reject) => {
        client.mkdirp(path, (err) => (err ? reject(err) : resolve()));
    });
}

function sleep(ms) {
    return new Promise((r) => setTimeout(r, ms));
}

/** 申请读或写锁 */
async function acquireLock(resource, clientId, mode) {
    const resPath = `${LOCK_ROOT}/${resource}`;
    await mkdirp(resPath);

    // ✅ 可重入支持
    if (heldLocks.get(clientId)?.has(resource)) {
        console.log(`♻️ [${clientId}] Re-entered ${mode} lock on ${resource}`);
        return { resPath, nodePath: null };
    }

    const nodePath = await new Promise((resolve, reject) => {
        client.create(
            `${resPath}/${mode}-`,
            Buffer.from(clientId),
            zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
            (err, path) => (err ? reject(err) : resolve(path))
        );
    });

    const nodeName = nodePath.split('/').pop();
    await tryAcquire(resPath, nodeName, clientId, mode);
    if (!heldLocks.has(clientId)) heldLocks.set(clientId, new Set());
    heldLocks.get(clientId).add(resource);

    return { resPath, nodePath };
}

/** 判断是否可以获得锁，否则监听前一个节点 */
async function tryAcquire(resPath, nodeName, clientId, mode) {
    const children = await new Promise((resolve, reject) => {
        client.getChildren(resPath, (err, list) => (err ? reject(err) : resolve(list)));
    });

    children.sort();
    const index = children.indexOf(nodeName);

    if (mode.startsWith('read')) {
        const before = children.slice(0, index);
        const blockingWrite = before.find((n) => n.startsWith('write'));
        if (!blockingWrite) {
            console.log(`📖 [${clientId}] Got READ lock on ${resPath.split('/').pop()}`);
            return;
        }
        await waitForDelete(`${resPath}/${blockingWrite}`);
        return tryAcquire(resPath, nodeName, clientId, mode);
    } else {
        // 写锁必须等所有前面的节点都删除
        if (index === 0) {
            console.log(`✍️ [${clientId}] Got WRITE lock on ${resPath.split('/').pop()}`);
            return;
        }
        const prev = children[index - 1];
        await waitForDelete(`${resPath}/${prev}`);
        return tryAcquire(resPath, nodeName, clientId, mode);
    }
}

/** 等待某节点被删除 */
function waitForDelete(path) {
    return new Promise((resolve) => {
        client.exists(
            path,
            (event) => {
                if (event.getType() === zookeeper.Event.NODE_DELETED) resolve();
            },
            (err, stat) => {
                if (!stat) resolve();
            }
        );
    });
}

/** 释放锁（支持可重入引用计数） */
async function releaseLock(resource, nodePath, clientId) {
    const held = heldLocks.get(clientId);
    if (!held?.has(resource)) return;

    held.delete(resource);
    if (held.size === 0) heldLocks.delete(clientId);

    if (!nodePath) {
        console.log(`🔁 [${clientId}] Reentrant release skip for ${resource}`);
        return;
    }

    return new Promise((resolve) => {
        client.remove(nodePath, (err) => {
            if (err) console.error(`❌ [${clientId}] Release error:`, err);
            else console.log(`🔓 [${clientId}] Released lock on ${resource}`);
            resolve();
        });
    });
}

/** 模拟读/写操作 */
async function doWork(clientId, resource, mode) {
    console.log(`🧠 [${clientId}] Doing ${mode.toUpperCase()} work on ${resource}...`);
    await sleep(2000 + Math.random() * 2000);
}

/** 模拟多个客户端竞争锁 */
async function simulateClients() {
    const clients = [
        { id: 'C1', res: 'book-123', mode: 'read' },
        { id: 'C2', res: 'book-123', mode: 'read' },
        { id: 'C3', res: 'book-123', mode: 'write' },
        { id: 'C4', res: 'book-123', mode: 'read' },
        { id: 'C5', res: 'book-123', mode: 'write' },
    ];

    for (const c of clients) {
        (async () => {
            const { resPath, nodePath } = await acquireLock(c.res, c.id, c.mode);
            await doWork(c.id, c.res, c.mode);
            await releaseLock(c.res, nodePath, c.id);
        })();
    }

    // 测试可重入性
    setTimeout(async () => {
        const r1 = await acquireLock('book-123', 'C1', 'read');
        const r2 = await acquireLock('book-123', 'C1', 'read');
        await doWork('C1', 'book-123', 'read');
        await releaseLock('book-123', r2.nodePath, 'C1');
        await releaseLock('book-123', r1.nodePath, 'C1');
    }, 8000);
}

```

### 分布式计数器

1. 实现目标

我们要做的是一个 分布式安全自增计数器，多个客户端同时执行 `increment()`，但计数结果仍然正确且不会重复。

例如：

* 初始 `/counter` = 0
* 同时10个客户端调用 `increment()`
* 结果 `/counter=10`(不会出现丢失或覆盖)

2. 实现原理

ZooKeeper 并没有原生的“计数器”类型，但我们可以用 CAS 版本号机制 来实现原子自增：

* 获取 /counter 节点数据和版本号
* 将数据解析为数字 n
* 尝试 setData("/counter", n+1, version)
* 如果版本号不匹配（说明被其他客户端更新了），则重试

3. distributed_counter.js

```javascript
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const COUNTER_PATH = '/distributed_counter';
const client = zookeeper.createClient(ZK_SERVERS);
client.connect();

// 初始化计数器节点
function ensureCounterNode() {
    return new Promise((resolve, reject) => {
        client.exists(COUNTER_PATH, (err, stat) => {
            if (err) return reject(err);
            if (stat) return resolve();

            client.create(COUNTER_PATH, Buffer.from('0'), (err) => {
                if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) {
                    return reject(err);
                }
                console.log('✅ Counter node initialized.');
                resolve();
            });
        });
    });
}

// 获取当前计数器值
function getCounterValue() {
    return new Promise((resolve, reject) => {
        client.getData(COUNTER_PATH, (err, data, stat) => {
            if (err) return reject(err);
            const value = parseInt(data.toString());
            resolve({ value, stat });
        });
    });
}

// 使用 CAS（compare-and-set）机制自增计数器
async function incrementCounter(retry = 0) {
    try {
        const { value, stat } = await getCounterValue();
        const newValue = value + 1;

        await new Promise((resolve, reject) => {
            client.setData(COUNTER_PATH, Buffer.from(String(newValue)), stat.version, (err) => {
                if (err) {
                    if (err.getCode() === zookeeper.Exception.BAD_VERSION) {
                        if (retry < 5) {
                            console.log(`🔁 Version conflict, retry ${retry + 1}`);
                            setTimeout(() => incrementCounter(retry + 1).then(resolve).catch(reject), 50);
                        } else {
                            reject(new Error('Too many retries'));
                        }
                    } else {
                        reject(err);
                    }
                } else {
                    console.log(`✅ Increment success: ${value} → ${newValue}`);
                    resolve(newValue);
                }
            });
        });

    } catch (err) {
        console.error('❌ Increment failed:', err.message);
    }
}

client.once('connected', async () => {
    console.log('🔗 Connected to ZooKeeper.');
    await ensureCounterNode();

    // 模拟多个客户端并发自增
    const tasks = Array.from({ length: 10 }, () => incrementCounter());
    await Promise.all(tasks);

    const { value } = await getCounterValue();
    console.log(`🎯 Final counter value = ${value}`);

    client.close();
});

```

### 配置管理

1. 目标：集中式配置中心

多个服务实例都从 ZooKeeper 读取同一份配置，当配置在 ZooKeeper 中被修改时，所有客户端自动收到更新通知。

✅ 特点：

* 动态感知配置变更（watch）
* 多实例共享同一配置
* 配置热更新无需重启

```bash
zookeeper-config-demo/
 ├─ config_client.js   # 模拟业务服务读取 ZooKeeper 配置
 ├─ config_updater.js  # 模拟管理员更新配置
```

config_client.js

```js
// config_client.js
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const CONFIG_PATH = '/app_config';
const client = zookeeper.createClient(ZK_SERVERS);

client.connect();

// 监听配置变化
function watchConfig() {
    client.getData(CONFIG_PATH, (event) => {
        console.log('⚙️ 配置发生变化，重新加载...');
        watchConfig(); // 重新注册 watcher
        loadConfig();  // 重新读取新配置
    }, (err, data, stat) => {
        if (err) {
            if (err.getCode() === zookeeper.Exception.NO_NODE) {
                console.log('❌ 配置节点不存在，等待创建...');
                setTimeout(watchConfig, 2000);
                return;
            }
            return console.error('读取配置失败:', err);
        }

        try {
            const config = JSON.parse(data.toString());
            console.log('📦 当前配置:', config);
        } catch (e) {
            console.error('配置格式错误:', e.message);
        }
    });
}

function loadConfig() {
    client.getData(CONFIG_PATH, (err, data) => {
        if (!err && data) {
            console.log('🔄 更新后的配置:', JSON.parse(data.toString()));
        }
    });
}

client.once('connected', () => {
    console.log('🔗 已连接到 ZooKeeper');
    watchConfig();
});

```

config_updater.js

```javascript
// config_updater.js
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const CONFIG_PATH = '/app_config';
const client = zookeeper.createClient(ZK_SERVERS);

client.connect();

// 初始化配置节点
function ensureConfigNode(defaultConfig) {
    return new Promise((resolve, reject) => {
        client.exists(CONFIG_PATH, (err, stat) => {
            if (err) return reject(err);
            if (stat) return resolve();

            const data = Buffer.from(JSON.stringify(defaultConfig));
            client.create(CONFIG_PATH, data, (err) => {
                if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) {
                    return reject(err);
                }
                console.log('✅ 配置节点已创建');
                resolve();
            });
        });
    });
}

// 更新配置
function updateConfig(newConfig) {
    const data = Buffer.from(JSON.stringify(newConfig, null, 2));
    client.setData(CONFIG_PATH, data, (err, stat) => {
        if (err) return console.error('❌ 更新失败:', err);
        console.log('✅ 配置已更新:', newConfig);
        client.close();
    });
}

client.once('connected', async () => {
    console.log('🔗 已连接到 ZooKeeper');
    await ensureConfigNode({ max_players: 100, motd: "Welcome to the Game!" });

    // 模拟管理员修改配置
    setTimeout(() => {
        updateConfig({
            max_players: 200,
            motd: "⚡ Server upgraded! Enjoy the game!",
        });
    }, 3000);
});

```

### 分布式队列/工作队列

这种模式常用于 任务分发/消息队列，保证多节点消费者能 公平消费任务，并且任务不会丢失。

1. 目标

* 任务生产者：把任务写入 ZooKeeper 队列节点
* 任务消费者：多个客户端从队列中取任务并执行
* 特点：
    * 多消费者抢任务，保证同一个任务只被一个消费者执行
    * 新任务会自动分配给空闲消费者
    * 使用 ZooKeeper 的顺序节点（SEQUENTIAL） + Watcher 机制 实现

2. ZooKeeper 队列设计

```bash
/task_queue
    ├── task-0000000000
    ├── task-0000000001
    └── task-0000000002
```

* 节点 `/task_queue` 是队列根节点
* 每个任务是 `/task_queue/task-` 前缀的 顺序节点
* 消费者通过 `getChildren` 获取任务列表并按顺序消费

3. producer.js + consumer.js

```javascript
// producer.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const QUEUE_PATH = '/task_queue';
const client = zookeeper.createClient(ZK_SERVERS);

client.connect();

function ensureQueueNode() {
    return new Promise((resolve, reject) => {
        client.exists(QUEUE_PATH, (err, stat) => {
            if (err) return reject(err);
            if (stat) return resolve();
            client.create(QUEUE_PATH, (err) => {
                if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) return reject(err);
                console.log('✅ 队列节点已创建');
                resolve();
            });
        });
    });
}

function produceTask(taskData) {
    const data = Buffer.from(JSON.stringify(taskData));
    client.create(
        `${QUEUE_PATH}/task-`,
        data,
        zookeeper.CreateMode.PERSISTENT_SEQUENTIAL,
        (err, path) => {
            if (err) return console.error('❌ 创建任务失败:', err);
            console.log('📦 任务已入队:', path);
        }
    );
}

client.once('connected', async () => {
    console.log('🔗 已连接到 ZooKeeper');
    await ensureQueueNode();

    // 模拟入队任务
    produceTask({ id: 1, msg: 'task1' });
    produceTask({ id: 2, msg: 'task2' });
    produceTask({ id: 3, msg: 'task3' });

    setTimeout(() => client.close(), 1000);
});

```

```javascript
// consumer.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const QUEUE_PATH = '/task_queue';
const client = zookeeper.createClient(ZK_SERVERS);

client.connect();

function consumeTask() {
    client.getChildren(
        QUEUE_PATH,
        (event) => {
            console.log('⚡ 队列变化事件:', event);
            consumeTask(); // 重新注册 watcher
        },
        async (err, children) => {
            if (err) return console.error('❌ 获取任务失败:', err);
            if (!children || children.length === 0) return;

            children.sort(); // 按顺序消费
            const taskNode = children[0];
            const taskPath = `${QUEUE_PATH}/${taskNode}`;

            // 取数据并删除节点，实现任务消费
            client.getData(taskPath, (err, data) => {
                if (err) return console.error('❌ 读取任务失败:', err);
                const task = JSON.parse(data.toString());

                // 删除节点，表示任务完成
                client.remove(taskPath, (err) => {
                    if (err) return console.error('❌ 删除任务失败:', err);

                    console.log('✅ 消费任务:', task);
                    console.log('🗑 任务完成:', taskNode);
                });
            });
        }
    );
}

client.once('connected', () => {
    console.log('🔗 已连接到 ZooKeeper，开始消费任务...');
    consumeTask();
});

```

### Barrier/同步栅栏

有的也叫 屏障。

这种模式常用于 __分布式节点同步启动__、__任务协同__，保证所有节点在到达某个“关卡”之前都阻塞，等到所有节点都准备好了才统一放行。

1. 目标

* 有 N 个节点（客户端）
* 每个节点在执行关键任务前调用 Barrier
* Barrier 功能：所有节点到齐后才继续执行
* 应用场景：
    * 分布式计算任务同步开始
    * 游戏服务器等待所有玩家准备完成
    * 多节点配置同步启动

2. ZooKeeper节点设计

```bash
/barrier
    ├── node-0000000000
    ├── node-0000000001
    ├── node-0000000002
```

* `/barrier`是Barrier根节点
* 每个客户端在 Barrier 下创建顺序临时节点
* 当节点数量等于设定人数时，所有客户端被放行

```javascript
// barrier.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const BARRIER_PATH = '/barrier';
const NODE_COUNT = 3; // Barrier 阈值
const client = zookeeper.createClient(ZK_SERVERS, {
    sessionTimeout: 30000, // 30s session
    spinDelay: 1000,
    retries: 5
});

client.connect();

// --------------------
// 辅助函数：确保 Barrier 根节点存在
// --------------------
function ensureBarrierNode() {
    return new Promise((resolve, reject) => {
        client.exists(BARRIER_PATH, (err, stat) => {
            if (err) return reject(err);
            if (stat) return resolve();
            client.create(BARRIER_PATH, (err) => {
                if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) return reject(err);
                console.log('✅ Barrier 节点已创建');
                resolve();
            });
        });
    });
}

// --------------------
// 获取子节点带重试机制
// --------------------
function getChildrenWithRetry(path, maxRetries = 5) {
    return new Promise((resolve, reject) => {
        let attempt = 0;

        function tryGet() {
            client.getChildren(
                path,
                (event) => {
                    // watcher 触发时重新注册，但加延迟避免频繁调用
                    setTimeout(() => tryGet(), 50);
                },
                (err, children) => {
                    if (!err) return resolve(children);
                    if (err.getCode && err.getCode() === zookeeper.Exception.CONNECTION_LOSS && attempt < maxRetries) {
                        attempt++;
                        console.log(`🔁 CONNECTION_LOSS, retry ${attempt}`);
                        setTimeout(tryGet, 100);
                    } else {
                        reject(err);
                    }
                }
            );
        }

        tryGet();
    });
}

// --------------------
// Barrier 核心逻辑
// --------------------
async function enterBarrier(nodeName) {
    // 创建临时顺序节点
    const path = await new Promise((resolve, reject) => {
        client.create(
            `${BARRIER_PATH}/node-`,
            Buffer.from(nodeName),
            zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
            (err, path) => {
                if (err) return reject(err);
                console.log(`🔗 ${nodeName} 已加入 Barrier: ${path}`);
                resolve(path);
            }
        );
    });

    // 等待所有节点到达
    await new Promise(async (resolve) => {
        async function checkBarrier() {
            try {
                const children = await getChildrenWithRetry(BARRIER_PATH);
                console.log(`⏳ 当前 Barrier 节点数量: ${children.length}/${NODE_COUNT}`);
                if (children.length >= NODE_COUNT) {
                    resolve();
                }
            } catch (err) {
                console.error('❌ 检查 Barrier 失败:', err.message);
                // 失败后延迟重试
                setTimeout(checkBarrier, 200);
            }
        }
        checkBarrier();
    });

    console.log(`🎉 ${nodeName} Barrier 放行，开始执行任务！`);

    // 安全延迟关闭 client
    setTimeout(() => {
        client.close();
        console.log(`🔒 ${nodeName} 客户端已关闭`);
    }, 500);
}

// --------------------
// 启动入口
// --------------------
client.once('connected', async () => {
    console.log('🔗 已连接到 ZooKeeper');
    await ensureBarrierNode();

    const nodeName = process.argv[2] || 'node-' + Math.floor(Math.random() * 1000);
    await enterBarrier(nodeName);
});

```
