Python实现关键路径和七格图计算详解
作者:田辛 时间:2022-04-25 12:17:46
关键路径计算是项目管理中关于进度管理的基本计算。 但是对于绝大多数同学来说, 关键路径计算都只是对一些简单情形的计算。
今天,田老师根据以往的经验,用Python实现了在相对较复杂的情形下:
1.关键路径计算
2.七格图计算,包括
最早结束时间EFT
最晚开始时间LST
最晚结束时间LFT
总浮动时间TF
自由浮动时间FF
3.紧前关系绘图法(AON)
并且, 将输出结果保存为网页。 而且在网页底部附上了完整的计算过程。
这个Python程序的实现,可以帮助项目管理者更加方便地进行关键路径计算和进度管理。同时,通过输出结果保存为网页的形式,也可以方便地与团队成员进行共享和交流。
1.主程序
主程序主要实现了一个Project类,其中包含了计算关键路径和七格图的方法。具体实现方式如下:
1. 定义了一个Activity类,包含了活动的id、名称、持续时间和紧前任务列表等属性。
2. 定义了一个Project类,包含了活动列表、项目持续时间、日志等属性,以及计算关键路径、计算七格图、计算总浮动时间、计算自由浮动时间等方法。
3. 从JSON文件中读取活动信息,并创建Project对象并添加活动。
4. 调用Project对象的calculate方法,计算每个活动的最早开始时间、最晚开始时间等数据。
5. 调用Project对象的calculate_critical_path方法,计算关键路径。
6. 调用Project对象的calculate_project_duration方法,计算项目总工期。
7. 使用Jinja2模板引擎生成项目的活动清单,并将关键路径
import json
from datetime import datetime
from typing import List
import graphviz
from jinja2 import Template
from activity import Activity
class Project:
def __init__(self):
self.activities: List[Activity] = []
self.duration = 0
self.logger = []
def log(self, log: str) -> None:
self.logger.append(log)
def add_activity(self, activity: Activity) -> None:
"""
添加一个活动到项目中
:param
activity: 待添加的活动
""" # 将活动添加到项目中
self.activities.append(activity)
def calculate(self) -> None:
""" 计算整个项目的关键信息
:return: None
""" self.calculate_successor()
self._calculate_forward_pass() # 计算正推法
self._calculate_backward_pass() # 计算倒推法
self._calculate_total_floats() # 计算总浮动时间
self._calculate_free_floats() # 计算自由浮动时间
def calculate_successor(self) -> None:
self.log("开始计算紧后活动")
for act in self.activities:
for pred in act.predecessors:
for act_inner in self.activities:
if act_inner.id == pred:
act_inner.successors.append(act.id)
def _calculate_forward_pass(self) -> None:
self.log("## 开始正推法计算")
# 进入 while 循环,只有当所有活动的最早开始时间和最早完成时间都已经计算出来时,才会退出循环
while not self._is_forward_pass_calculated():
# 遍历每个活动
for activity in self.activities:
# 如果活动的最早开始时间已经被计算过,则跳过
if activity.est is not None:
continue
# 如果活动没有前置活动, 则从1开始计算最早开始时间和最早结束时间
if not activity.predecessors:
activity.est = 1
activity.eft = activity.est + activity.duration - 1
self.log(
f"活动 {activity.name} 没有紧前活动,设定最早开始时间为1, 并根据工期计算最早结束时间为{activity.eft}")
else:
# 计算当前活动的所有前置活动的最早完成时间
predecessors_eft = [act.eft for act in self.activities if
act.id in activity.predecessors and act.eft is not None]
# 如果当前活动的所有前置活动的最早完成时间都已经计算出来,则计算当前活动的最早开始时间和最早完成时间
if len(predecessors_eft) == len(activity.predecessors):
activity.est = max(predecessors_eft) + 1
activity.eft = activity.est + activity.duration - 1
self.log(
f"活动 {activity.name} 紧前活动已完成正推法计算, 开始日期按最早开始时间里面最大的," +
f"设定为{activity.est}并根据工期计算最早结束时间为{activity.eft}")
# 更新项目总持续时间为最大最早完成时间
self.duration = max([act.eft for act in self.activities])
def _calculate_backward_pass(self) -> None:
""" 计算倒推法
:return: None
"""
self.log("## 开始倒推法计算") # 输出提示信息
# 进入 while 循环,只有当所有活动的最晚开始时间和最晚完成时间都已经计算出来时,才会退出循环
while not self._is_backward_pass_calculated():
# 遍历每个活动
for act in reversed(self.activities):
# 如果活动的最晚开始时间已经被计算过,则跳过
if act.lft is not None:
continue
# 如果活动没有后继活动, 则从总持续时间开始计算最晚开始时间和最晚结束时间
if not act.successors:
act.lft = self.duration
act.lst = act.lft - act.duration + 1
self.log(f"活动 {act.name} 没有紧后活动,按照正推工期设定最晚结束时间为{act.lft}," +
f"并根据工期计算最晚开始时间为{act.lst}")
else:
# 计算当前活动的所有后继活动的最晚开始时间
successors_lst = self._calculate_lst(act)
# 如果当前活动的所有后继活动的最晚开始时间都已经计算出来,则计算当前活动的最晚开始时间和最晚完成时间
if len(successors_lst) == len(act.successors):
act.lft = min(successors_lst) - 1
act.lst = act.lft - act.duration + 1
self.log(f"活动 {act.name} 紧后活动计算完成,按照倒推工期设定最晚结束时间为{act.lft}," +
f"并根据工期计算最晚开始时间为{act.lst}")
# 更新项目总持续时间为最大最晚完成时间
self.duration = max([act.lft for act in self.activities])
def _calculate_lst(self, activity: Activity) -> List[int]:
"""计算某一活动的所有最晚开始时间
:param activity: 活动对象
:return: 最晚开始时间列表
""" rst = [] # 初始化结果列表
for act in activity.successors: # 遍历该活动的后继活动
for act2 in self.activities: # 遍历所有活动
if act2.id == act and act2.lst is not None: # 如果找到了该后继活动且其最晚开始时间不为空
rst.append(act2.lst) # 将最晚开始时间加入结果列表
return rst # 返回结果列表
def _is_forward_pass_calculated(self) -> bool:
""" 判断整个项目正推法计算已经完成
:return: 若已计算正向传递则返回True,否则返回False
"""
for act in self.activities: # 遍历所有活动
if act.est is None or act.eft is None: # 如果该活动的最早开始时间或最早完成时间为空
return False # 则返回False,表示还未计算正向传递
return True # 如果所有活动的最早开始时间和最早完成时间都已计算,则返回True,表示已计算正向传递
def _is_backward_pass_calculated(self) -> bool:
""" 判断整个项目倒推法计算已经完成
:return: 若已计算倒推法则返回True,否则返回False
""" for act in self.activities: # 遍历所有活动
if act.lst is None or act.lft is None: # 如果该活动的最晚开始时间或最晚完成时间为空
return False # 则返回False,表示还未计算倒推法
return True # 如果所有活动的最晚开始时间和最晚完成时间都已计算,则返回True,表示已计算倒推法
def _calculate_total_floats(self) -> None:
""" 计算所有活动的总浮动时间
:return: None
"""
self.log(f"## 开始计算项目所有活动的总浮动时间")
for act in self.activities: # 遍历所有活动
if act.est is not None and act.lst is not None: # 如果该活动的最早开始时间和最晚开始时间都已计算
act.tf = act.lst - act.est # 则计算该活动的总浮动时间
self.log(f"计算{act.name}的总浮动时间" + f"最晚开始时间{act.lst} - 最早开始时间{act.est} = {act.tf}", )
else: # 如果该活动的最早开始时间或最晚开始时间为空
act.tf = None # 则将该活动的总浮动时间设为None
def _calculate_free_floats(self) -> None:
""" 计算所有活动的自由浮动时间
:return: None
""" self.log(f"## 开始计算项目所有活动的自由浮动时间") # 输出提示信息
for act in self.activities: # 遍历所有活动
if act.tf == 0: # 如果该活动的总浮动时间为0
self.log(f"计算{act.name}的自由浮动时间" + f"因为{act.name}的总浮动时间为0,自由浮动时间为0") # 输出提示信息
act.ff = 0 # 则将该活动的自由浮动时间设为0
elif act.tf > 0: # 如果该活动的总浮动时间大于0
self.log(f"计算{act.name}的自由浮动时间") # 输出提示信息
self.log(f"- {act.name}的总浮动时间{act.tf} > 0,") # 输出提示信息
tmp = [] # 初始化临时列表
for act2 in self.activities: # 遍历所有活动
if act2.id in act.successors: # 如果该活动是该活动的紧后活动
self.log(f"- {act.name}的紧后活动{act2.name}的自由浮动动时间为{act2.tf}") # 输出提示信息
tmp.append(act2.tf) # 将该紧后活动的自由浮动时间加入临时列表
if len(tmp) != 0: # 如果临时列表不为空
act.ff = act.tf - max(tmp) # 则计算该活动的自由浮动时间
if act.ff < 0:
act.ff = 0
self.log(f"- 用活动自己的总浮动{act.tf}减去多个紧后活动总浮动的最大值{max(tmp)} = {act.ff}")
else: # 如果临时列表为空
act.ff = act.tf # 则将该活动的自由浮动时间设为总浮动时间
def calculate_critical_path(self) -> List[Activity]:
""" 计算整个项目的关键路径
:return: 整个项目的关键路径
""" ctc_path = [] # 初始化关键路径列表
for act in self.activities: # 遍历所有活动
if act.tf == 0: # 如果该活动的总浮动时间为0
ctc_path.append(act) # 则将该活动加入关键路径列表
return ctc_path # 返回关键路径列表
def calculate_project_duration(self) -> int:
""" 计算整个项目的持续时间
:return: 整个项目的持续时间
""" return max(activity.eft for activity in self.activities) # 返回所有活动的最早完成时间中的最大值,即整个项目的持续时间
# 从JSON文件中读取活动信息
with open('activities.json', 'r', encoding='utf-8') as f:
activities_data = json.load(f)
# 创建Project对象并添加活动
project = Project()
for activity_data in activities_data:
activity = Activity(
activity_data['id'],
activity_data['name'],
activity_data['duration'],
activity_data['predecessors']
) project.add_activity(activity)
# 计算每个活动的最早开始时间、最晚开始时间等数据
project.calculate()
# 计算关键路径和项目总工期
critical_path = project.calculate_critical_path()
project_duration = project.calculate_project_duration()
# 生成项目的活动清单
with open('template.html', 'r', encoding='utf-8') as f:
template = Template(f.read())
html = template.render(
activities=project.activities,
critical_path=critical_path,
project_duration=project_duration,
log=project.logger
)
# 生成项目进度网络图
aon_graph = graphviz.Digraph(format='png', graph_attr={'rankdir': 'LR'})
for activity in project.activities:
aon_graph.node(str(activity.id), activity.name)
for predecessor in activity.predecessors:
aon_graph.edge(str(predecessor), str(activity.id))
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
aon_filename = f"aon_{timestamp}"
aon_graph.render(aon_filename)
# 将项目进度网络图插入到HTML文件中
aon_image = f'<img src="{aon_filename}.png" alt="Precedence Diagramming Method: AON">'
html = html.replace('<p>Precedence Diagramming Method: AON: <br/>[image]</p>',
'<p>紧前关系绘图法: AON: <br/>' + aon_image + '</p>')
filename = datetime.now().strftime('%Y%m%d%H%M%S') + '.html'
with open(filename, 'w', encoding='utf-8') as f:
f.write(html)
2.活动类
程序名:activity.py
class Activity:
"""
活动类,用于表示项目中的一个活动。
Attributes: id (int): 活动的唯一标识符。
name (str): 活动的名称。
duration (int): 活动的持续时间。
predecessors (List[int]): 活动的前置活动列表,存储前置活动的id。
est (int): 活动的最早开始时间。
lst (int): 活动的最晚开始时间。
eft (int): 活动的最早完成时间。
lft (int): 活动的最晚完成时间。
tf (int): 活动的总浮动时间。
ff (int): 活动的自由浮动时间。
successors (List[int]): 活动的后继活动列表,存储后继活动的Activity对象。
"""
def __init__(self, id: int, name: str, duration: int, predecessors: List[int]):
"""
初始化活动对象。
Args: id (int): 活动的唯一标识符。
name (str): 活动的名称。
duration (int): 活动的持续时间。
predecessors (List[int]): 活动的前置活动列表,存储前置活动的id。
""" self.id = id
self.name = name
self.duration = duration
self.predecessors = predecessors
self.est = None
self.lst = None
self.eft = None
self.lft = None
self.tf = None
self.ff = None
self.successors = []
def __str__(self):
return f"id: {self.id}, name: {self.name}, est: {self.est}, lst: {self.lst}, eft: {self.eft}, lft: {self.lft},"
+ f"successors: {self.successors}"
3.任务列表JSON文件
文件名:activities.json
[
{ "id": 1,
"name": "A",
"duration": 2,
"predecessors": []
},
{
"id": 9,
"name": "A2",
"duration": 3,
"predecessors": []
},
{
"id": 10,
"name": "A3",
"duration": 2,
"predecessors": []
},
{
"id": 2,
"name": "B",
"duration": 3,
"predecessors": [
1,
9
]
},
{
"id": 3,
"name": "C",
"duration": 4,
"predecessors": [
1
]
},
{
"id": 4,
"name": "D",
"duration": 2,
"predecessors": [
2,10
]
},
{
"id": 5,
"name": "E",
"duration": 3,
"predecessors": [
2
]
},
{
"id": 6,
"name": "F",
"duration": 2,
"predecessors": [
3
]
},
{
"id": 7,
"name": "G",
"duration": 3,
"predecessors": [
4,
5
]
},
{
"id": 8,
"name": "H",
"duration": 2,
"predecessors": [
6,
7
]
},
{
"id": 11,
"name": "H2",
"duration": 4,
"predecessors": [
6,
7
]
}
]
4.输出模板文件
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>PMP关键路径计算</title>
<style> table {
border-collapse: collapse;
width: 100%;
}
th, td {
border: 1px solid black;
padding: 8px;
text-align: center;
}
th {
background-color: #4CAF50;
color: white;
}
.critical {
background-color: #ffcccc;
}
</style>
</head>
<body>
<h1>活动清单</h1>
<table>
<tr>
<th>ID</th>
<th>活动名</th>
<th>持续时间</th>
<th>紧前活动</th>
<th>紧后活动</th>
<th>最早开始时间EST</th>
<th>最早结束时间EFT</th>
<th>最晚开始时间LST</th>
<th>最晚结束时间LFT</th>
<th>总浮动时间TF</th>
<th>自由浮动时间FF</th>
</tr>
{% for activity in activities %}
<tr {% if activity in critical_path %}class="critical" {% endif %}>
<td>{{ activity.id }}</td>
<td>{{ activity.name }}</td>
<td>{{ activity.duration }}</td>
<td> {% for predecessor in activity.predecessors %}
{% for act in activities %}
{% if act.id == predecessor %}
{{ act.name }}
{% endif %}
{% endfor %}
{% if not loop.last %}, {% endif %}
{% endfor %}
</td>
<td> {% for successor in activity.successors %}
{% for act in activities %}
{% if act.id == successor %}
{{ act.name }}
{% endif %}
{% endfor %}
{% if not loop.last %}, {% endif %}
{% endfor %}
</td>
<td>{{ activity.est }}</td>
<td>{{ activity.eft }}</td>
<td>{{ activity.lst }}</td>
<td>{{ activity.lft }}</td>
<td>{{ activity.tf }}</td>
<td>{{ activity.ff }}</td>
</tr>
{% endfor %}
</table>
<p>关键路径是: {% for activity in critical_path %}{{ activity.name }}{% if not loop.last %} -> {% endif %}{% endfor
%}</p>
<p>项目总工期: {{ project_duration }}</p>
<p>Precedence Diagramming Method: AON: <br/>[image]</p>
<p>
<table>
<tr>
<th>执行过程</th>
</tr>
{% for i in log %}
<tr>
<td style="text-align: left">{{i}}</td>
</tr>
{% endfor %}
</table>
</p>
</body>
</html>
来源:https://blog.csdn.net/u013589130/article/details/129542900
![](/images/zang.png)
![](/images/jiucuo.png)
猜你喜欢
使用Python发送各种形式的邮件的方法汇总
python 中的 super详解
![](https://img.aspxhome.com/file/2023/3/131563_0s.png)
python numpy.ndarray中如何将数据转为int型
![](https://img.aspxhome.com/file/2023/3/90713_0s.png)
微信小程序实现给嵌套template模板传递数据的方式总结
sql server中千万数量级分页存储过程代码
Python深度学习之FastText实现文本分类详解
![](https://img.aspxhome.com/file/2023/1/103311_0s.png)
pytest中的fixture基本用法
![](https://img.aspxhome.com/file/2023/0/100830_0s.jpg)
JDBC建立数据库连接的代码
python3中set(集合)的语法总结分享
numpy使用技巧之数组过滤实例代码
解决给dom元素绑定click等事件无效问题的方法
python实现八大排序算法(1)
![](https://img.aspxhome.com/file/2023/6/130996_0s.jpg)
python 删除excel表格重复行,数据预处理操作
python中random随机函数详解
![](https://img.aspxhome.com/file/2023/0/68970_0s.png)
Python help()函数用法详解
MySQL中修改表结构时需要注意的一些地方
![](https://img.aspxhome.com/file/2023/7/128087_0s.png)
JS判断元素是否在可视区域技巧详解
![](https://img.aspxhome.com/file/2023/8/136008_0s.jpg)
MobaXterm入门使用教程
![](https://img.aspxhome.com/file/2023/3/120543_0s.png)