跳到主要内容

清理重复的微软日历事件

阅读需 4 分钟

清理重复的日历事件

这个 Python 脚本旨在帮助你自动清理 Microsoft 365 账户中重复的日历事件。它通过并发请求的方式高效地识别并删除多余的日程,从而保持你的日历整洁有序。

主要功能:

  • 高效查重:脚本会获取指定月份的所有日历事件,并识别出那些在同一分钟内创建、且主题完全相同的重复事件。

  • 并发删除:它利用异步编程(asyncioaiohttp),以并发的方式批量发送删除请求,大大加快了处理速度。

  • 智能重试:脚本内置了针对 API 限流(429 错误)和令牌过期(401 错误)的重试机制,确保操作的稳定性和成功率。

  • ACCESS_TOKEN 需要自己弄,过期时间很短


import asyncio
import datetime
from collections import defaultdict

import aiohttp

# 您的访问令牌。请替换为实际获取到的 access token。
# 注意:在实际应用中,不应将令牌硬编码在这里,应动态获取。
ACCESS_TOKEN = ""

# Microsoft Graph API 的基本 URL
GRAPH_API_URL = "https://graph.microsoft.com/v1.0"

# 全局的访问令牌,在需要时可以更新
global_access_token = ACCESS_TOKEN

# 请求头,包含访问令牌
def get_headers():
return {
"Authorization": f"Bearer {global_access_token}",
"Content-Type": "application/json",
}

# 模拟令牌刷新函数,替换为你实际的获取令牌逻辑
async def refresh_access_token():
"""
模拟刷新访问令牌。
在实际应用中,这里应调用 _get_ms_token() 或其他刷新令牌的函数。
"""
print("访问令牌可能已过期,正在尝试刷新。..")
# 你的 _get_ms_token() 函数是同步的,为了在异步中使用,需要用 run_in_executor
# 但由于没有你的真实认证逻辑,这里仅用一个示例字符串代替
new_token = "新_访问_令牌" # _get_ms_token()
global global_access_token
global_access_token = new_token
print("令牌已刷新。")

async def get_calendar_events_by_month(session, start_date: datetime.datetime, end_date: datetime.datetime):
"""
按月份查询日历事件,并处理分页以获取所有结果。
"""
all_events = []
endpoint = f"{GRAPH_API_URL}/me/calendarview"
params = {
"startdatetime": start_date.isoformat(),
"enddatetime": end_date.isoformat(),
"$select": "subject,start,end,id",
}
url = endpoint

while url:
try:
headers = get_headers()
if url == endpoint:
async with session.get(url, headers=headers, params=params) as response:
response.raise_for_status()
data = await response.json()
else:
async with session.get(url, headers=headers) as response:
response.raise_for_status()
data = await response.json()

all_events.extend(data.get("value", []))
url = data.get("@odata.nextLink")
if url:
print(f"发现下一页数据,正在获取。.. {len(all_events)} 个事件")

except aiohttp.ClientResponseError as err:
print(f"查询日历事件失败:{err}")
return []
return all_events

def find_duplicate_events(events: list):
"""
识别同一分钟内主题完全相同的重复事件,并返回事件 ID、名称和日期。
"""
event_map = defaultdict(list)
for event in events:
start_time_str = event["start"]["dateTime"]
# 获取事件开始时间并精确到分钟
start_minute = start_time_str[:16]
# 获取事件日期
start_date = start_time_str.split("T")[0]
subject = event["subject"]
event_id = event["id"]
# 查重键只包括开始时间和主题
event_map[(start_minute, subject)].append({
"id": event_id,
"subject": subject,
"date": start_date
})

# 修改返回格式,包含所有需要的信息
duplicate_events = {key: ids for key, ids in event_map.items() if len(ids) > 1}
return duplicate_events

async def delete_event(session, event_info: dict, retry_count=3):
"""
异步地删除单个日历事件,并处理重试。

Args:
session: aiohttp.ClientSession 对象。
event_info: 包含 'id', 'subject', 'date' 的字典。
retry_count: 重试次数。
"""
event_id = event_info['id']
subject = event_info['subject']
event_date = event_info['date']
endpoint = f"{GRAPH_API_URL}/me/events/{event_id}"
headers = get_headers()

for attempt in range(retry_count):
try:
async with session.delete(endpoint, headers=headers) as response:
response.raise_for_status()
# 打印更详细的成功信息
print(f"事件 '{subject}' (ID: {event_id}, 日期:{event_date}) 删除成功。")
return True
except aiohttp.ClientResponseError as err:
# 处理 429 和 401 错误
if err.status == 429:
retry_after = int(err.headers.get("Retry-After", 5))
print(f"收到 429 错误,等待 {retry_after} 秒后重试。.. (第 {attempt + 1}/{retry_count} 次尝试)")
await asyncio.sleep(retry_after)
elif err.status == 401:
print(f"收到 401 错误,令牌可能已过期。正在刷新令牌。.. (第 {attempt + 1}/{retry_count} 次尝试)")
await refresh_access_token()
headers = get_headers() # 使用新令牌更新头部
else:
print(f"删除事件 '{subject}' (ID: {event_id}, 日期:{event_date}) 失败:{err}")
return False

print(f"删除事件 '{subject}' (ID: {event_id}, 日期:{event_date}) 失败,重试 {retry_count} 次后仍未成功。")
return False

async def async_main_process():
"""
主流程函数,按月份循环执行“查询 -> 识别 -> 删除”操作。
"""
year = 2024
months_to_process = list(range(1, 12))

async with aiohttp.ClientSession() as session:
for month in months_to_process:
start_of_month = datetime.datetime(year, month, 1, tzinfo=datetime.timezone.utc)
if month == 12:
end_of_month = datetime.datetime(year + 1, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(
seconds=1)
else:
end_of_month = datetime.datetime(year, month + 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(
seconds=1)

print(f"\n--- 正在处理 {year}{month} 月 ---")

events_data = await get_calendar_events_by_month(session, start_of_month, end_of_month)

if not events_data:
print("当月没有找到事件或查询失败。")
continue

print(f"找到 {len(events_data)} 个事件。")
duplicate_events_map = find_duplicate_events(events_data)

if not duplicate_events_map:
print("没有发现重复事件。")
continue

print(f"找到 {len(duplicate_events_map)} 组重复事件。")

all_delete_tasks = []
for (minute, subject), event_list in duplicate_events_map.items():
print(f"发现重复事件:主题='{subject}', 时间='{minute}', 共 {len(event_list)} 个。")
# 保留第一个,删除后面的所有重复项
events_to_delete = event_list[1:]
print(f"将删除 {len(events_to_delete)} 个事件。")

for event_info in events_to_delete:
all_delete_tasks.append(delete_event(session, event_info))

print(f"\n--- 本月总计将并发删除 {len(all_delete_tasks)} 个事件 ---")

if all_delete_tasks:
await asyncio.gather(*all_delete_tasks)
else:
print("没有找到需要删除的事件。")

if __name__ == "__main__":
asyncio.run(async_main_process())

Loading Comments...