EnTT: 图 Graph

记录 EnTT 图结构、Graphviz DOT 输出、flow builder、资源调度和执行图相关工具。

EnTT 的 采取了简洁设计的原则, 并没有尝试覆盖所有图相关功能, 而是提供了一些基础的数据结构和算法, 主要用于支持特定的工具 (如 flow builder)

邻接矩阵 Adjacency Matrix

邻接矩阵 是一种用于表示 结构的数学矩阵, 在这种表示方式中, 图的 顶点 通过一个 二维矩阵 关联起来

  • 有向图: 如果图是 有向图, 矩阵中的值表示从一个顶点到另一个顶点是否存在一条边
  • 无向图: 如果图是 无向图, 矩阵是对称的, 因为边没有方向

假设 nn顶点, 则 邻接矩阵 是一个 n×nn \times n 的二维数组或矩阵 A\mathbf{A}

  • A[i][j]=1\mathbf{A}[i][j] = 1 表示从顶点 ii 到 顶点 jj 存在一条边
  • A[i][j]=0\mathbf{A}[i][j] = 0 表示从顶点 ii 到 顶点 jj 没有边

对于 带权图 Weighted Graph, 矩阵中的值可以是边的权重

  • A[i][j]=w\mathbf{A}[i][j] = w 表示从顶点 ii 到 顶点 jj 到边的权重为 ww
  • A[i][j]=0\mathbf{A}[i][j] = 0 或其他特殊值, 例如 \infty, 表示没有边

EnTT 的邻接矩阵表示 有向图无向图 时, 通过 标签 指定图的类型

entt::adjacency_matrix<entt::directed_tag> adjacency_matrix{};

directed_tag 表示有向图, undirected_tag 表示无向图

邻接矩阵是支持分配器的容器, 提供常见功能如 clearget_allocator

初始化

邻接矩阵在构建时用元素 (顶点) 的数量进行初始化, 但也可以稍后使用 resize 函数来调整大小

entt::adjacency_matrix<entt::directed_tag> adjacency_matrix{3u};

访问

在访问时, 不采用传统的双索引访问 (如 matrix[i][j]), 而是提供更符合 C++ 风格的函数接口, 例如 containsinserterase

if(adjacency_matrix.contains(0u, 1u)) 
{
    adjacency_matrix.erase(0u, 1u);
} else {
    adjacency_matrix.insert(0u, 1u);
}

插入和删除

插入和删除具有 幂等性, 也就是说多次调用 inserterase 操作同一元素不会产生副作用, 即如果元素已经存在或已被删除, 则它们不起作用

  • insert: 返回一个 std::pair, 包含迭代器和布尔值, 布尔值表示是否成功插入新元素
  • erase: 返回被删除的元素数量 (01)

遍历

顶点遍历

使用 vertices 函数遍历所有顶点

for(auto &&vertex: adjacency_matrix.vertices()) {
    // 处理顶点
}

或直接使用顶点索引遍历

for(auto last = adjacency_matrix.size(), pos = 0u; pos < last; ++pos) {
    // 处理顶点
}

边遍历

遍历所有边

for(auto [lhs, rhs]: adjacency_matrix.edges()) {
    // 处理从 lhs 到 rhs 的边
}

遍历某个顶点的入边或出边

for(auto [lhs, rhs]: adjacency_matrix.out_edges(3u)) {
    // 处理从 lhs 到 rhs 的出边
}

in_edgesout_edges 函数都需要指定目标顶点作为参数

Graphviz DOT 语言支持

EnTT 提供了一个简单的方法将 转换为 Graphviz DOT 格式的片段

最简单的方式是将输出流和图传递给 dot 函数

std::ostringstream output{};
entt::dot(output, adjacency_matrix);

output 是目标输出流, 用于存储生成的 DOT 格式内容, adjacency_matrix 则是需要转换为 DOT 格式的图

输出

graph {
  0 -- 1;
  1 -- 2;
}

如果需要在 DOT 输出中为顶点或边添加自定义属性, 可以使用带回调的版本, 回调会针对每个顶点调用, 用户可以通过该回调向输出中添加属性

std::ostringstream output{};
 
entt::dot(output, adjacency_matrix, [](auto &out, auto vertex) {
    out << "label=\"v" << vertex << "\",shape=\"box\"";
});

回调接收两个参数

  • out: 输出流, 用户向其中写入 DOT 格式的内容
  • vertex: 当前处理的顶点

上述代码为每个顶点添加了

  • label: 顶点的标签 (形如 v0, v1)
  • shape: 顶点的形状属性 (box 表示矩形)

假设图有顶点 0, 1, 2, 结果如下

graph {
    0 [label="v0", shape="box"];
    1 [label="v1", shape="box"];
    2 [label="v2", shape="box"];
    0 -- 1;
    1 -- 2;
}

流程构建器 Flow Builder

流程构建器是一种用于从任务和资源中创建 执行图 的工具, 流程构建器的实现高度通用, 不依赖于库的其他部分, 可以灵活地集成到各种场景中

其设计上类似于一个 状态机, 每个任务附加到特定的状态中, 同时指定该任务访问的资源及其模式 (只读/读写), 大多数 API 还会返回流程构建器自身, 符合构建器类常见的链式调用设计风格

工作流程

  1. 注册任务: 将所有任务注册到流程构建器中, 每个任务会被表示为 中的一个 顶点
  2. 分配资源: 为每个任务分配需要的资源, 并指定访问模式
  3. 生成执行图: 注册完成后, 流程构建器会生成一个 执行图, 以邻接矩阵的形式表示, 图中的每个 顶点 代表一个任务, 顶点的索引与任务 标识符 关联
#include <entt/entt.hpp>
 
int main() {
    entt::flow_builder flow_builder;
 
    // 注册任务及其资源访问
    flow_builder.register_task("Load Data")
                .access("File", ReadOnly);
 
    flow_builder.register_task("Process Data")
                .access("File", ReadOnly)
                .access("Memory", ReadWrite);
 
    flow_builder.register_task("Save Results")
                .access("Memory", ReadOnly)
                .access("Disk", ReadWrite);
 
    // 构建执行图
    auto execution_graph = flow_builder.build();
 
    // 遍历执行图的任务(顶点)
    for(auto &&vertex: execution_graph.vertices()) {
        std::cout << "Task: " << vertex << "\n";
    }
 
    // 遍历任务之间的依赖(边)
    for(auto [from, to]: execution_graph.edges()) {
        std::cout << "Dependency: " << from << " -> " << to << "\n";
    }
 
    return 0;
}
Task: Load Data
Task: Process Data
Task: Save Results
Dependency: Load Data -> Process Data
Dependency: Process Data -> Save Results

任务和资源 Tasks and resources

EnTT 的 流程构建器 采用基于 标识符 的设计, 用于管理任务和资源, 这种设计提供了极大的灵活性, 同时避免了与特定数据结构或库的其他部分耦合

任务和资源的标识符

任务和资源都通过 id_type (整型值) 标识, 这种设计避免了对具体数据的绑定, 用户需要自行管理标识符与实际操作或数据之间的关联

EnTT 提供了 哈希字符串 ("task_1"_hs) 用于生成 id_type, 与内部的 RTTI 系统兼容, 用户也可以选择使用自己的工具或哈希函数

entt::flow builder{};
builder.bind("task_1"_hs);  // 绑定任务

绑定任务和分配资源

绑定任务

通过 bind 方法绑定任务

builder.bind("task_1"_hs);

分配资源

任务绑定后, 可以通过 ro (只读) 或 rw (读写) 方法为任务分配资源

builder
    .bind("task_1"_hs)
        .ro("resource_1"_hs)  // 分配只读资源
        .ro("resource_2"_hs)
    .bind("task_2"_hs)
        .rw("resource_2"_hs); // 分配读写资源

所有函数返回流程构建器自身, 支持链式调用, 方便任务和资源的注册

资源范围分配

rorw 方法支持传入迭代器范围, 一次性分配多个资源

std::vector<entt::id_type> resources{"resource_1"_hs, "resource_2"_hs};
builder.bind("task_1"_hs).ro(resources.begin(), resources.end());

基于资源的图生成

执行图是基于资源生成的, 而不是按任务定义的顺序, 其中每个 任务 对应一个 顶点, 由资源的 访问模式 决定任务之间的依赖关系

资源重绑定

  • 直接重新绑定资源
    修改同一任务对资源的访问模式时, 后续绑定会覆盖之前的定义
    builder.bind("task"_hs).rw("resource"_hs).ro("resource"_hs);

在后台, 该调用实际上并没有取代前一个调用, 而是排在它后面, 同时在生成图表时覆盖它, 因此, 大量的资源重新绑定甚至可能会影响处理时间 (很难观察到, 但理论上是可能的)

  • 任务和资源同时重新绑定
    修改任务和资源的绑定关系可能导致意外结果, 因为生成图的过程是基于资源的
    builder
    .bind("task_1"_hs).ro("resource"_hs)
    .bind("task_2"_hs).ro("resource"_hs)
    .bind("task_1"_hs).rw("resource"_hs);

结果将是只有一条边从 task_2 指向 task_1 (表示 task_2 依赖于 task_1), 任务不能指向自身, 因此最后一次绑定改变了边的方向

可以发现, 通过资源重新绑定, 很容易导致 循环依赖

    builder
    .bind("task_1"_hs).rw("resource"_hs)
    .bind("task_2"_hs).rw("resource"_hs)
    .bind("task_1"_hs).rw("resource"_hs);

生成的图中将存在 task_1task_2 之间的循环依赖, 所以应该 尽量避免重新绑定

资源调度

  1. rwro 的定义

    • rw 是写操作,通常是独占的,表示对资源的修改。它是阻塞操作,优先级高。
    • ro 是读操作,通常是非独占的,表示对资源的读取。多个 ro 可以并行执行。
  2. 声明顺序的影响

    • 如果 rw 在任务声明时优先于 ro,则 rw 会先执行。
    • 如果 ro 在声明时优先于 rw,则 ro 会先执行。
    • 假设这些任务是不同的(没有相互依赖的任务)。
  3. 调度规则

    • 写操作(rw)是阻塞操作,这意味着它必须独占地访问资源,不能与其他操作并行执行。
    • 如果在 rw 前面有多个 ro 操作,这些 ro 操作会并行执行,完成后再执行 rw
    • 如果在 rw 后面有多个 ro 操作,则 rw 会先独占执行,完成后这些 ro 操作才会并行执行。
  4. 任务执行的图形化表示

    • 整体流程可以用一个任务依赖图(图中的节点表示任务,边表示依赖)来描述。图表展示了资源(resource)之间、任务(task)之间的依赖关系,以及具体的执行顺序。
  5. 总结

    • rw 的执行顺序取决于任务声明的顺序。
    • 无论顺序如何,rw 始终独占执行,ro 则可以并行。
    • 这种规则通常用于任务调度系统或资源管理系统,例如数据库事务、操作系统调度或分布式系统中的资源锁机制。

虚假资源与执行顺序 Fake resources and order of execution

流程构建器 本身并不直接支持显式地指定任务的执行顺序 (即 任务 A任务 B 之前或之后运行), 然而, 通过利用 虚假资源 fake resources, 可以间接实现对任务执行顺序的控制

因为在资源上注册任务的顺序决定了任务在生成执行图时的依赖关系, 所以当任务对资源的访问模式 (只读或读写) 不同时, 系统会强制对任务进行 顺序调度, 通过引入 虚假资源, 可以人为地创建任务间的 依赖关系, 从而控制任务的 执行顺序

强制任务在另一组任务之前执行

任务通过对 虚假资源 设置为 读写模式 (rw), 并且 声明在前面, 可以确保一个任务在其他任务 之前 运行

builder
    .bind("task_1"_hs)
        .ro("resource_1"_hs)
        .rw("fake"_hs)  // 任务 1 对虚拟资源具有写权限
    .bind("task_2"_hs)
        .ro("resource_2"_hs)
        .ro("fake"_hs)  // 任务 2 对虚拟资源仅有读权限
    .bind("task_3"_hs)
        .ro("resource_2"_hs)
        .ro("fake"_hs); // 任务 3 对虚拟资源仅有读权限

task_1 强制在 task_2task_3 之前执行, 这是因为 task_1虚假资源写权限, 而其他任务需要对该资源进行 读操作, 因此无法并行

强制任务在另一组任务之后执行

通过将一个任务对 虚假资源 的访问设置为 读写模式 (也是 rw), 并且 声明在最后, 可以确保它在其他任务之后运行

builder
    .bind("task_1"_hs)
        .ro("resource_1"_hs)
        .ro("fake"_hs)  // 任务 1 对虚拟资源仅有读权限
    .bind("task_2"_hs)
        .ro("resource_1"_hs)
        .ro("fake"_hs)  // 任务 2 对虚拟资源仅有读权限
    .bind("task_3"_hs)
        .ro("resource_2"_hs)
        .rw("fake"_hs); // 任务 3 对虚拟资源具有写权限

task_3 强制在 task_1task_2 之后执行, task_1task_2虚假资源 是并行的 读操作, 而 task_3 需要写入该资源, 因此它必须在所有 读操作 完成后才能执行

同步点 Sync Points

在某些情况下, 为图中的某个 节点 (任务) 分配 同步点 的角色非常有用, 同步点 可以用于强制任务间的依赖关系, 确保特定任务的执行顺序

同步点 是一个特殊的 节点, 用于确保多个任务在执行前或执行后需要经过一个特定的 汇聚点, 同步点可以访问资源, 也可以是一个 无操作 (no-op) 节点

使用 bind 方法将同步点与流程构建器绑定 (和普通任务一样), 然后使用 sync 方法将其标记为同步点

builder.bind("sync_point"_hs).sync();

如果同步点需要对资源进行操作, 可以在绑定后添加资源访问, 如果同步点不需要操作资源, 也可以是一个空任务

同步点强制执行图的依赖关系, 例如

builder
    .bind("task_1"_hs).ro("resource_1"_hs)
    .bind("task_2"_hs).ro("resource_2"_hs)
    .bind("sync_point"_hs).sync()
    .bind("task_3"_hs).rw("resource_3"_hs);

task_1task_2 无直接依赖, 可以并行执行, sync_point 作为同步点, 确保 task_1task_2 完成后才能执行 task_3

task_1 --> sync_point --> task_3
task_2 --> sync_point --> task_3

执行图

在注册好所有资源及其消费者 (任务) 之后, 执行图会被生成, 执行图会根据用户定义的约束条件, 返回一个最佳的任务调度图, 表示任务之间的依赖关系

通过调用 builder.graph() 方法, 可以生成一个 邻接矩阵 表示的 有向执行图

entt::adjacency_matrix<entt::directed_tag> graph = builder.graph();

图中的每个顶点对应一个任务, 图中的有向边表示任务之间的依赖关系

获取入口顶点

入口顶点是图中没有依赖于其他任务的任务, 也即没有入边的顶点, 这些任务是执行图中的起点

for(auto &&vertex: graph) {
    // 获取该顶点的入边
    if(auto in_edges = graph.in_edges(vertex); in_edges.begin() == in_edges.end()) {
        std::cout << "Main vertex: " << vertex << "\n";
    }
}

遍历所有顶点, 检查 in_edges 是否为空, 如果没有入边 in_edges.begin() == in_edges.end(), 则该顶点是入口顶点

获取任务的子节点

执行图可以通过 out_edges 方法获取给定任务的子任务 (出边)

for(auto [from, to]: graph.out_edges(task_vertex)) {
    std::cout << "Task " << from << " -> Child Task " << to << "\n";
}

out_edges(task_vertex) 返回一个范围, 表示该任务的所有子任务, 每个出边包含两个顶点: from 当前任务, to 子任务

获取所有任务的依赖关系

通过 edges 方法, 可以遍历执行图中所有的任务依赖关系

for(auto [from, to]: graph.edges()) {
    std::cout << "Dependency: " << from << " -> " << to << "\n";
}

典型执行逻辑

静态调度 - 拓扑排序

生成一个按依赖关系排序的任务序列, 它是单次生成完整的线性任务执行顺序, 不支持并行, 输出的序列按顺序执行, 使用 深度优先搜索 DFS依赖计数 Kahn 算法 完成图的拓扑排序

动态调度 - 递归执行入口任务

  1. 生成执行图

  2. 初始化依赖计数
    在生成执行图后, 计算每个任务的依赖计数

    std::unordered_map<entt::id_type, int> dependencies;
 
    // 初始化依赖计数
    for(auto &&vertex: graph) {
        dependencies[vertex] = 0;  // 初始计数为 0
    }
    for(auto [from, to]: graph.edges()) {
        dependencies[to]++;  // 子任务依赖计数加 1
    }
 
  1. 查找入口顶点 (起始任务)
    入口顶点是依赖计数为 0 的任务, 可以直接启动执行
    // 前向声明
    void execute_task(entt::adjacency_matrix<entt::directed_tag> &graph, entt::id_type task);
    std::vector<entt::id_type> entry_points;
 
    // 查找入口顶点
    for(auto &&vertex: graph) {
        if(dependencies[vertex] == 0) {
            entry_points.push_back(vertex);
        }
    }
 
    // 启动入口顶点任务
    for(auto &&entry_point: entry_points) {
        active_tasks++;
        std::thread(execute_task, std::ref(graph),  entry_point).detach();
    }
 
  1. 递归执行任务并更新依赖
    在任务完成时, 递归检查其子任务的依赖, 如果子任务的依赖计数变为 0, 则立即启动
    void execute_task(entt::adjacency_matrix<entt::directed_tag> &graph, entt::id_type task) {
        // 执行当前任务逻辑
        std::cout << "Executing task: " << task << std::endl;
 
        // 遍历子任务(出边)
        for(auto [from, to]: graph.out_edges(task)) {
            if(--dependencies[to] == 0) {  // 子任务依赖计数减 1
                active_tasks++;
                std::thread(execute_task, std::ref(graph), to). detach();  // 启动子任务
            }
        }
 
        // 当前任务完成后减少活跃任务计数
        active_tasks--;
    }
 

完整的例子

#include <entt/entt.hpp>
#include <iostream>
#include <unordered_map>
#include <vector>
#include <thread>
#include <atomic>
#include <queue>
 
std::unordered_map<entt::id_type, std::atomic<int>> dependencies;
std::atomic<int> active_tasks{0};  // 活跃任务计数器
 
void execute_task(entt::adjacency_matrix<entt::directed_tag> &graph, entt::id_type task) {
    // 执行当前任务
    std::cout << "Executing task: " << task << std::endl;
 
    // 遍历所有子任务(出边)
    for(auto [from, to]: graph.out_edges(task)) {
        if(--dependencies[to] == 0) {  // 减少子任务的依赖计数
            active_tasks++;
            std::thread(execute_task, std::ref(graph), to).detach();  // 启动子任务
        }
    }
 
    // 当前任务完成
    active_tasks--;
}
 
int main() {
    entt::flow builder{};
 
    // 定义任务和资源依赖
    builder
        .bind("task_1"_hs).ro("resource_1"_hs)
        .bind("task_2"_hs).ro("resource_2"_hs)
        .bind("task_3"_hs).ro("resource_1"_hs).rw("resource_3"_hs)
        .bind("task_4"_hs).rw("resource_3"_hs);
 
    // 生成执行图
    auto graph = builder.graph();
 
    // 初始化依赖计数
    for(auto &&vertex: graph) {
        dependencies[vertex] = 0;
    }
    for(auto [from, to]: graph.edges()) {
        dependencies[to]++;
    }
 
    // 找到入口顶点
    std::vector<entt::id_type> entry_points;
    for(auto &&vertex: graph) {
        if(dependencies[vertex] == 0) {
            entry_points.push_back(vertex);
        }
    }
 
    // 启动入口任务
    for(auto &&entry_point: entry_points) {
        active_tasks++;
        std::thread(execute_task, std::ref(graph), entry_point).detach();
    }
 
    // 等待所有任务完成
    while(active_tasks > 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));  // 主线程阻塞
    }
 
    return 0;
}

注意, 动态调度无法解决 循环依赖 问题, 如果要保证图是一个 有向无环图 DAG, 可以用 静态调度拓扑排序 算法来判断是否有循环依赖 (可以不进行排序而只是检测环路)