Python实现关键路径和七格图计算详解

作者:田辛 时间:2022-04-25 12:17:46 

关键路径计算是项目管理中关于进度管理的基本计算。 但是对于绝大多数同学来说, 关键路径计算都只是对一些简单情形的计算。

今天,田老师根据以往的经验,用Python实现了在相对较复杂的情形下:

1.关键路径计算

2.七格图计算,包括

  • 最早结束时间EFT

  • 最晚开始时间LST

  • 最晚结束时间LFT

  • 总浮动时间TF

  • 自由浮动时间FF

3.紧前关系绘图法(AON)

并且, 将输出结果保存为网页。 而且在网页底部附上了完整的计算过程。

Python实现关键路径和七格图计算详解

这个Python程序的实现,可以帮助项目管理者更加方便地进行关键路径计算和进度管理。同时,通过输出结果保存为网页的形式,也可以方便地与团队成员进行共享和交流。

1.主程序

主程序主要实现了一个Project类,其中包含了计算关键路径和七格图的方法。具体实现方式如下:

1.  定义了一个Activity类,包含了活动的id、名称、持续时间和紧前任务列表等属性。 

2.  定义了一个Project类,包含了活动列表、项目持续时间、日志等属性,以及计算关键路径、计算七格图、计算总浮动时间、计算自由浮动时间等方法。

3.  从JSON文件中读取活动信息,并创建Project对象并添加活动。

4.  调用Project对象的calculate方法,计算每个活动的最早开始时间、最晚开始时间等数据。

5.  调用Project对象的calculate_critical_path方法,计算关键路径。

6.  调用Project对象的calculate_project_duration方法,计算项目总工期。

7.  使用Jinja2模板引擎生成项目的活动清单,并将关键路径

import json  
from datetime import datetime  
from typing import List  

import graphviz  
from jinja2 import Template  

from activity import Activity  

class Project:  
   def __init__(self):  
       self.activities: List[Activity] = []  
       self.duration = 0  
       self.logger = []  

def log(self, log: str) -> None:  
       self.logger.append(log)  

def add_activity(self, activity: Activity) -> None:  
       """  
       添加一个活动到项目中  
       :param  
       activity: 待添加的活动  
       """        # 将活动添加到项目中  
       self.activities.append(activity)  

def calculate(self) -> None:  
       """ 计算整个项目的关键信息  
       :return: None  
       """        self.calculate_successor()  
       self._calculate_forward_pass()  # 计算正推法  
       self._calculate_backward_pass()  # 计算倒推法  
       self._calculate_total_floats()  # 计算总浮动时间  
       self._calculate_free_floats()  # 计算自由浮动时间  

def calculate_successor(self) -> None:  
       self.log("开始计算紧后活动")  
       for act in self.activities:  
           for pred in act.predecessors:  
               for act_inner in self.activities:  
                   if act_inner.id == pred:  
                       act_inner.successors.append(act.id)  

def _calculate_forward_pass(self) -> None:  
       self.log("## 开始正推法计算")  
       # 进入 while 循环,只有当所有活动的最早开始时间和最早完成时间都已经计算出来时,才会退出循环  
       while not self._is_forward_pass_calculated():  
           # 遍历每个活动  
           for activity in self.activities:  
               # 如果活动的最早开始时间已经被计算过,则跳过  
               if activity.est is not None:  
                   continue  
               # 如果活动没有前置活动, 则从1开始计算最早开始时间和最早结束时间  
               if not activity.predecessors:  
                   activity.est = 1  
                   activity.eft = activity.est + activity.duration - 1  
                   self.log(  
                       f"活动 {activity.name} 没有紧前活动,设定最早开始时间为1, 并根据工期计算最早结束时间为{activity.eft}")  
               else:  
                   # 计算当前活动的所有前置活动的最早完成时间  
                   predecessors_eft = [act.eft for act in self.activities if  
                                       act.id in activity.predecessors and act.eft is not None]  
                   # 如果当前活动的所有前置活动的最早完成时间都已经计算出来,则计算当前活动的最早开始时间和最早完成时间  
                   if len(predecessors_eft) == len(activity.predecessors):  
                       activity.est = max(predecessors_eft) + 1  
                       activity.eft = activity.est + activity.duration - 1  
                       self.log(  
                           f"活动 {activity.name} 紧前活动已完成正推法计算, 开始日期按最早开始时间里面最大的," +  
                           f"设定为{activity.est}并根据工期计算最早结束时间为{activity.eft}")  

# 更新项目总持续时间为最大最早完成时间  
       self.duration = max([act.eft for act in self.activities])  

def _calculate_backward_pass(self) -> None:  
       """ 计算倒推法  
       :return: None  
       """  
       self.log("## 开始倒推法计算")  # 输出提示信息  
       # 进入 while 循环,只有当所有活动的最晚开始时间和最晚完成时间都已经计算出来时,才会退出循环  
       while not self._is_backward_pass_calculated():  
           # 遍历每个活动  
           for act in reversed(self.activities):  
               # 如果活动的最晚开始时间已经被计算过,则跳过  
               if act.lft is not None:  
                   continue  
               # 如果活动没有后继活动, 则从总持续时间开始计算最晚开始时间和最晚结束时间  
               if not act.successors:  
                   act.lft = self.duration  
                   act.lst = act.lft - act.duration + 1  
                   self.log(f"活动 {act.name} 没有紧后活动,按照正推工期设定最晚结束时间为{act.lft}," +  
                            f"并根据工期计算最晚开始时间为{act.lst}")  
               else:  
                   # 计算当前活动的所有后继活动的最晚开始时间  
                   successors_lst = self._calculate_lst(act)  
                   # 如果当前活动的所有后继活动的最晚开始时间都已经计算出来,则计算当前活动的最晚开始时间和最晚完成时间  
                   if len(successors_lst) == len(act.successors):  
                       act.lft = min(successors_lst) - 1  
                       act.lst = act.lft - act.duration + 1  
                       self.log(f"活动 {act.name} 紧后活动计算完成,按照倒推工期设定最晚结束时间为{act.lft}," +  
                                f"并根据工期计算最晚开始时间为{act.lst}")  
       # 更新项目总持续时间为最大最晚完成时间  
       self.duration = max([act.lft for act in self.activities])  

def _calculate_lst(self, activity: Activity) -> List[int]:  
       """计算某一活动的所有最晚开始时间  
       :param activity: 活动对象  
       :return: 最晚开始时间列表  
       """        rst = []  # 初始化结果列表  
       for act in activity.successors:  # 遍历该活动的后继活动  
           for act2 in self.activities:  # 遍历所有活动  
               if act2.id == act and act2.lst is not None:  # 如果找到了该后继活动且其最晚开始时间不为空  
                   rst.append(act2.lst)  # 将最晚开始时间加入结果列表  
       return rst  # 返回结果列表  

def _is_forward_pass_calculated(self) -> bool:  
       """ 判断整个项目正推法计算已经完成  
       :return: 若已计算正向传递则返回True,否则返回False  
       """  
       for act in self.activities:  # 遍历所有活动  
           if act.est is None or act.eft is None:  # 如果该活动的最早开始时间或最早完成时间为空  
               return False  # 则返回False,表示还未计算正向传递  
       return True  # 如果所有活动的最早开始时间和最早完成时间都已计算,则返回True,表示已计算正向传递  

def _is_backward_pass_calculated(self) -> bool:  
       """ 判断整个项目倒推法计算已经完成  
       :return: 若已计算倒推法则返回True,否则返回False  
       """        for act in self.activities:  # 遍历所有活动  
           if act.lst is None or act.lft is None:  # 如果该活动的最晚开始时间或最晚完成时间为空  
               return False  # 则返回False,表示还未计算倒推法  
       return True  # 如果所有活动的最晚开始时间和最晚完成时间都已计算,则返回True,表示已计算倒推法  

def _calculate_total_floats(self) -> None:  
       """ 计算所有活动的总浮动时间  
       :return: None  
        """  
       self.log(f"## 开始计算项目所有活动的总浮动时间")  
       for act in self.activities:  # 遍历所有活动  
           if act.est is not None and act.lst is not None:  # 如果该活动的最早开始时间和最晚开始时间都已计算  
               act.tf = act.lst - act.est  # 则计算该活动的总浮动时间  
               self.log(f"计算{act.name}的总浮动时间" + f"最晚开始时间{act.lst} - 最早开始时间{act.est} = {act.tf}", )  
           else:  # 如果该活动的最早开始时间或最晚开始时间为空  
               act.tf = None  # 则将该活动的总浮动时间设为None  

def _calculate_free_floats(self) -> None:  
       """ 计算所有活动的自由浮动时间  
       :return: None  
       """        self.log(f"## 开始计算项目所有活动的自由浮动时间")  # 输出提示信息  
       for act in self.activities:  # 遍历所有活动  
           if act.tf == 0:  # 如果该活动的总浮动时间为0  
               self.log(f"计算{act.name}的自由浮动时间" + f"因为{act.name}的总浮动时间为0,自由浮动时间为0")  # 输出提示信息  
               act.ff = 0  # 则将该活动的自由浮动时间设为0  
           elif act.tf > 0:  # 如果该活动的总浮动时间大于0  
               self.log(f"计算{act.name}的自由浮动时间")  # 输出提示信息  
               self.log(f"- {act.name}的总浮动时间{act.tf} > 0,")  # 输出提示信息  
               tmp = []  # 初始化临时列表  
               for act2 in self.activities:  # 遍历所有活动  
                   if act2.id in act.successors:  # 如果该活动是该活动的紧后活动  
                       self.log(f"- {act.name}的紧后活动{act2.name}的自由浮动动时间为{act2.tf}")  # 输出提示信息  
                       tmp.append(act2.tf)  # 将该紧后活动的自由浮动时间加入临时列表  
               if len(tmp) != 0:  # 如果临时列表不为空  
                   act.ff = act.tf - max(tmp)  # 则计算该活动的自由浮动时间  
                   if act.ff < 0:  
                       act.ff = 0  
                   self.log(f"- 用活动自己的总浮动{act.tf}减去多个紧后活动总浮动的最大值{max(tmp)} = {act.ff}")  
               else:  # 如果临时列表为空  
                   act.ff = act.tf  # 则将该活动的自由浮动时间设为总浮动时间  

def calculate_critical_path(self) -> List[Activity]:  
       """ 计算整个项目的关键路径  
       :return: 整个项目的关键路径  
       """        ctc_path = []  # 初始化关键路径列表  
       for act in self.activities:  # 遍历所有活动  
           if act.tf == 0:  # 如果该活动的总浮动时间为0  
               ctc_path.append(act)  # 则将该活动加入关键路径列表  
       return ctc_path  # 返回关键路径列表  

def calculate_project_duration(self) -> int:  
       """ 计算整个项目的持续时间  
       :return: 整个项目的持续时间  
       """        return max(activity.eft for activity in self.activities)  # 返回所有活动的最早完成时间中的最大值,即整个项目的持续时间  

# 从JSON文件中读取活动信息  
with open('activities.json', 'r', encoding='utf-8') as f:  
   activities_data = json.load(f)  

# 创建Project对象并添加活动  
project = Project()  
for activity_data in activities_data:  
   activity = Activity(  
       activity_data['id'],  
       activity_data['name'],  
       activity_data['duration'],  
       activity_data['predecessors']  
   )    project.add_activity(activity)  

# 计算每个活动的最早开始时间、最晚开始时间等数据  
project.calculate()  

# 计算关键路径和项目总工期  
critical_path = project.calculate_critical_path()  
project_duration = project.calculate_project_duration()  

# 生成项目的活动清单  
with open('template.html', 'r', encoding='utf-8') as f:  
   template = Template(f.read())  
html = template.render(  
   activities=project.activities,  
   critical_path=critical_path,  
   project_duration=project_duration,  
   log=project.logger  
)  

# 生成项目进度网络图  
aon_graph = graphviz.Digraph(format='png', graph_attr={'rankdir': 'LR'})  
for activity in project.activities:  
   aon_graph.node(str(activity.id), activity.name)  
   for predecessor in activity.predecessors:  
       aon_graph.edge(str(predecessor), str(activity.id))  

timestamp = datetime.now().strftime('%Y%m%d%H%M%S')  

aon_filename = f"aon_{timestamp}"  

aon_graph.render(aon_filename)  

# 将项目进度网络图插入到HTML文件中  
aon_image = f'<img src="{aon_filename}.png" alt="Precedence Diagramming Method: AON">'  
html = html.replace('<p>Precedence Diagramming Method: AON: <br/>[image]</p>',  
                   '<p>紧前关系绘图法: AON: <br/>' + aon_image + '</p>')  

filename = datetime.now().strftime('%Y%m%d%H%M%S') + '.html'  
with open(filename, 'w', encoding='utf-8') as f:  
   f.write(html)

2.活动类

程序名:activity.py

class Activity:  
   """  
   活动类,用于表示项目中的一个活动。  

Attributes:        id (int): 活动的唯一标识符。  
       name (str): 活动的名称。  
       duration (int): 活动的持续时间。  
       predecessors (List[int]): 活动的前置活动列表,存储前置活动的id。  
       est (int): 活动的最早开始时间。  
       lst (int): 活动的最晚开始时间。  
       eft (int): 活动的最早完成时间。  
       lft (int): 活动的最晚完成时间。  
       tf (int): 活动的总浮动时间。  
       ff (int): 活动的自由浮动时间。  
       successors (List[int]): 活动的后继活动列表,存储后继活动的Activity对象。  
   """  
   def __init__(self, id: int, name: str, duration: int, predecessors: List[int]):  
       """  
       初始化活动对象。  

Args:            id (int): 活动的唯一标识符。  
           name (str): 活动的名称。  
           duration (int): 活动的持续时间。  
           predecessors (List[int]): 活动的前置活动列表,存储前置活动的id。  
       """        self.id = id  
       self.name = name  
       self.duration = duration  
       self.predecessors = predecessors  
       self.est = None  
       self.lst = None  
       self.eft = None  
       self.lft = None  
       self.tf = None  
       self.ff = None  
       self.successors = []  

def __str__(self):  
       return f"id: {self.id}, name: {self.name}, est: {self.est}, lst: {self.lst}, eft: {self.eft}, lft: {self.lft},"  
       + f"successors: {self.successors}"

3.任务列表JSON文件

文件名:activities.json

[  
 {    "id": 1,  
   "name": "A",  
   "duration": 2,  
   "predecessors": []  
 },  
 {  
   "id": 9,  
   "name": "A2",  
   "duration": 3,  
   "predecessors": []  
 },  
   {  
   "id": 10,  
   "name": "A3",  
   "duration": 2,  
   "predecessors": []  
 },  
 {  
   "id": 2,  
   "name": "B",  
   "duration": 3,  
   "predecessors": [  
     1,  
     9  
   ]  
 },  
 {  
   "id": 3,  
   "name": "C",  
   "duration": 4,  
   "predecessors": [  
     1  
   ]  
 },  
 {  
   "id": 4,  
   "name": "D",  
   "duration": 2,  
   "predecessors": [  
     2,10  
   ]  
 },  
 {  
   "id": 5,  
   "name": "E",  
   "duration": 3,  
   "predecessors": [  
     2  
   ]  
 },  
 {  
   "id": 6,  
   "name": "F",  
   "duration": 2,  
   "predecessors": [  
     3  
   ]  
 },  
 {  
   "id": 7,  
   "name": "G",  
   "duration": 3,  
   "predecessors": [  
     4,  
     5  
   ]  
 },  
 {  
   "id": 8,  
   "name": "H",  
   "duration": 2,  
   "predecessors": [  
     6,  
     7  
   ]  
 },  
 {  
   "id": 11,  
   "name": "H2",  
   "duration": 4,  
   "predecessors": [  
     6,  
     7  
   ]  
 }  
]

4.输出模板文件

<!DOCTYPE html>  
<html>  
<head>  
   <meta charset="UTF-8">  
   <title>PMP关键路径计算</title>  
   <style>        table {  
           border-collapse: collapse;  
           width: 100%;  
       }  

th, td {  
           border: 1px solid black;  
           padding: 8px;  
           text-align: center;  
       }  

th {  
           background-color: #4CAF50;  
           color: white;  
       }  

.critical {  
           background-color: #ffcccc;  
       }  
   </style>  
</head>  
<body>  
<h1>活动清单</h1>  
<table>  
   <tr>  
       <th>ID</th>  
       <th>活动名</th>  
       <th>持续时间</th>  
       <th>紧前活动</th>  
       <th>紧后活动</th>  
       <th>最早开始时间EST</th>  
       <th>最早结束时间EFT</th>  
       <th>最晚开始时间LST</th>  
       <th>最晚结束时间LFT</th>  
       <th>总浮动时间TF</th>  
       <th>自由浮动时间FF</th>  
   </tr>  
   {% for activity in activities %}  
   <tr {% if activity in critical_path %}class="critical" {% endif %}>  
       <td>{{ activity.id }}</td>  
       <td>{{ activity.name }}</td>  
       <td>{{ activity.duration }}</td>  
       <td>            {% for predecessor in activity.predecessors %}  
           {% for act in activities %}  
           {% if act.id == predecessor %}  
           {{ act.name }}  
           {% endif %}  
           {% endfor %}  
           {% if not loop.last %}, {% endif %}  
           {% endfor %}  
       </td>  
       <td>            {% for successor in activity.successors %}  
           {% for act in activities %}  

{% if act.id == successor %}  
           {{ act.name }}  
           {% endif %}  
           {% endfor %}  
           {% if not loop.last %}, {% endif %}  
           {% endfor %}  
       </td>  
       <td>{{ activity.est }}</td>  
       <td>{{ activity.eft }}</td>  
       <td>{{ activity.lst }}</td>  
       <td>{{ activity.lft }}</td>  
       <td>{{ activity.tf }}</td>  
       <td>{{ activity.ff }}</td>  
   </tr>  
   {% endfor %}  
</table>  
<p>关键路径是: {% for activity in critical_path %}{{ activity.name }}{% if not loop.last %} -> {% endif %}{% endfor  
   %}</p>  
<p>项目总工期: {{ project_duration }}</p>  
<p>Precedence Diagramming Method: AON: <br/>[image]</p>  
<p>  
<table>  
   <tr>  
       <th>执行过程</th>  
   </tr>  
   {% for i in log %}  
   <tr>  
       <td style="text-align: left">{{i}}</td>  
   </tr>  
   {% endfor %}  
</table>  
</p>  
</body>  
</html>

来源:https://blog.csdn.net/u013589130/article/details/129542900

标签:Python,关键路径,七格图
0
投稿

猜你喜欢

  • 使用Python发送各种形式的邮件的方法汇总

    2022-12-21 15:24:46
  • python 中的 super详解

    2023-09-07 01:27:35
  • python numpy.ndarray中如何将数据转为int型

    2022-11-21 16:14:09
  • 微信小程序实现给嵌套template模板传递数据的方式总结

    2024-05-22 10:31:50
  • sql server中千万数量级分页存储过程代码

    2024-01-18 04:36:20
  • Python深度学习之FastText实现文本分类详解

    2022-09-03 10:35:28
  • pytest中的fixture基本用法

    2023-07-14 12:26:45
  • JDBC建立数据库连接的代码

    2024-01-13 16:06:11
  • python3中set(集合)的语法总结分享

    2022-06-06 21:44:56
  • numpy使用技巧之数组过滤实例代码

    2021-07-13 14:55:52
  • 解决给dom元素绑定click等事件无效问题的方法

    2024-04-16 10:36:42
  • python实现八大排序算法(1)

    2022-05-20 08:17:47
  • python 删除excel表格重复行,数据预处理操作

    2023-04-19 06:10:33
  • python中random随机函数详解

    2022-06-14 22:34:43
  • Python help()函数用法详解

    2022-09-15 13:15:34
  • MySQL中修改表结构时需要注意的一些地方

    2024-01-29 09:04:11
  • JS判断元素是否在可视区域技巧详解

    2024-04-22 12:56:23
  • MobaXterm入门使用教程

    2023-11-23 12:26:53
  • 对python 通过ssh访问数据库的实例详解

    2024-01-16 07:32:12
  • PHP平滑关闭/重启的实现方法

    2023-10-05 08:48:29
  • asp之家 网络编程 m.aspxhome.com