> 技术文档 > 【python】装饰器:decorator_python装饰器

【python】装饰器:decorator_python装饰器


文章目录

  • 1. 装饰器的基本概念
  • 2. 装饰器的工作原理
  • 3. 处理函数参数和返回值
  • 4. 带参数的装饰器
  • 5. 类装饰器
    • 5.1 简单的类装饰器
    • 5.2 带参数的类装饰器
  • 6. 内置装饰器深入理解
    • 6.1 @property:让方法变成属性
    • 6.2 @staticmethod和@classmethod
    • 6.3 @functools.lru_cache:记忆化装饰器
  • 7. 高级装饰器模式
    • 7.1 装饰器链:多个装饰器的执行顺序
    • 7.2 条件装饰器:根据条件决定是否装饰
    • 7.3 参数验证装饰器
  • 8. 实际应用场景
    • 8.1 日志记录装饰器
    • 8.2 缓存装饰器(自定义实现)
    • 8.3 性能监控装饰器
    • 8.4 API限流装饰器
  • 9. 装饰器的最佳实践
    • 9.1 使用functools.wraps保持元信息
    • 9.2 装饰器的性能考虑
    • 9.3 装饰器的可组合性
  • 10. 装饰器的调试技巧

1. 装饰器的基本概念

🟢什么是装饰器?

装饰器是Python中的一种设计模式,它允许我们在不修改现有函数代码的情况下,为函数添加新的功能。从字面意思理解,装饰器就像是给函数\"装饰\"一下,让它变得更加强大。

想象一下,你有一个普通的房子(原始函数),装饰器就像是装修师傅,可以给房子加上门窗、刷漆、装灯等(添加新功能),但房子的基本结构不变。

🟢为什么需要装饰器?

在编程中,我们经常遇到这样的需求:

  • 需要记录函数的执行时间
  • 需要在函数执行前后打印日志
  • 需要验证函数的输入参数
  • 需要缓存函数的执行结果

如果没有装饰器,我们可能需要在每个函数中重复写这些代码,这违反了DRY(Don’t Repeat Yourself)原则。装饰器让我们可以把这些通用功能提取出来,重复使用。

🟢装饰器的本质

装饰器的本质是一个高阶函数Higher-Order Function),即接受(你定义的)函数作为参数,并返回(装饰后的)函数的函数。使用装饰器后,你的函数还使用原来的名称,但是功能已经得到增加。

🗝️在Python中,函数是\"一等公民\",可以像变量一样被传递、赋值和操作。

def greet(name): return f\"Hello, {name}!\"# 函数可以赋值给变量my_func = greetprint(my_func(\"Alice\")) # Hello, Alice!# 函数可以作为参数传递def call_func(func, arg): return func(arg)print(call_func(greet, \"Bob\")) # Hello, Bob!

这种特性为装饰器的实现提供了基础。


2. 装饰器的工作原理

🟢手动实现装饰器

在理解@语法之前,我们先看看如何手工实现装饰器的效果:

def simple_decorator(func): \"\"\"这是一个简单的装饰器函数\"\"\" def wrapper(): print(\"装饰器执行前\") func() # 调用原始函数 print(\"装饰器执行后\") return wrapper # 返回包装后的函数def say_hello(): \"\"\"原始函数\"\"\" print(\"Hello!\")# 手动装饰:用装饰器包装原函数decorated_func = simple_decorator(say_hello)decorated_func()

在这个例子中:

  1. simple_decorator是装饰器函数,它接受一个函数func作为参数
  2. 在内部定义了一个wrapper函数,这个函数在调用原函数前后添加了额外的行为
  3. 装饰器返回这个wrapper函数
  4. 最后我们得到一个增强版的函数

🟢使用@语法糖

@符号只是一个语法糖,让装饰器的使用更加简洁和直观:

def simple_decorator(func): def wrapper(): print(\"装饰器执行前\") func() print(\"装饰器执行后\") return wrapper@simple_decoratordef say_hello(): print(\"Hello!\")say_hello() # 自动被装饰

这里的@simple_decorator完全等价于:

say_hello = simple_decorator(say_hello)

语法糖是指不增加新功能、但用更简洁的写法替代常见模式,提升可读性和开发效率。常见示例有:

  • 列表/字典/集合推导式:一行生成序列或映射
  • 生成器表达式:惰性计算序列
  • 三元表达式:用“条件 ? 真值 : 假值”形式(Python 为  a  if  a > b  else  b a\\ \\text{if}\\ a>b\\ \\text{else}\\ b a if a>b else b
  • f f f-字符串:内嵌变量和表达式
  • with 上下文管理器:自动管理资源打开关闭
  • 装饰器(@):函数调用前后插入逻辑

3. 处理函数参数和返回值

🟢使用 *args**kwargs

上面的简单装饰器有一个问题:它只能装饰没有参数的函数。在实际应用中,函数通常有各种参数。我们需要让装饰器能够处理任意参数的函数。

示例:

def decorator_with_args(func): \"\"\"能处理任意参数的装饰器\"\"\" def wrapper(*args, **kwargs): print(f\"调用函数 {func.__name__}\") print(f\"位置参数: {args}\") print(f\"关键字参数: {kwargs}\") # 调用原函数并获取返回值 result = func(*args, **kwargs) print(f\"返回值: {result}\") return result # 重要:要返回原函数的返回值 return wrapper@decorator_with_argsdef add(a, b): \"\"\"加法函数\"\"\" return a + b@decorator_with_argsdef greet(name, greeting=\"Hello\"): \"\"\"问候函数\"\"\" return f\"{greeting}, {name}!\"# 测试print(add(3, 5))print(greet(\"Alice\", greeting=\"Hi\"))

在这个改进版本中:

  • *args收集所有位置参数
  • **kwargs收集所有关键字参数
  • 装饰器将这些参数原样传递给原函数
  • 装饰器还要记得返回原函数的返回值

🟢保留原函数的元信息

装饰器会改变函数的一些属性,比如函数名、文档字符串等(被例子中的wrapper函数的属性覆盖了)。为了保持原函数的身份信息,我们需要使用functools.wraps

import functoolsdef my_decorator(func): @functools.wraps(func) # 这一行很重要! def wrapper(*args, **kwargs): print(f\"调用函数: {func.__name__}\") return func(*args, **kwargs) return wrapper@my_decoratordef example_function(): \"\"\"这是一个示例函数\"\"\" pass# 如果不使用@functools.wraps,这些信息会丢失print(example_function.__name__) # example_function (而不是wrapper)print(example_function.__doc__) # 这是一个示例函数

functools.wraps是一个装饰器,专门用来装饰装饰器内部的wrapper函数,它会将原函数的元信息复制到wrapper函数上。

前后输出对比:

wrapperNoneexample_function这是一个示例函数

这个其实就是装饰器的装饰器,把func(你要装饰的函数)的信息复制给wrapper。


4. 带参数的装饰器

🟢装饰器工厂的概念

有时候我们希望装饰器本身也能接受参数来控制其行为。这时我们需要创建一个\"装饰器工厂\" — 一个返回装饰器的函数

def repeat(times): \"\"\"装饰器工厂:返回一个装饰器\"\"\" def decorator(func): \"\"\"真正的装饰器\"\"\" @functools.wraps(func) def wrapper(*args, **kwargs): \"\"\"包装函数\"\"\" for i in range(times): print(f\"第{i+1}次执行:\") result = func(*args, **kwargs) return result return wrapper return decorator@repeat(3) # 这里实际上是:repeat(3)返回一个装饰器,然后用这个装饰器装饰say_hellodef say_hello(name): print(f\"Hello, {name}!\")say_hello(\"Alice\")

这种三层嵌套的结构可能看起来复杂,我们来分解一下:

  1. 最外层repeat(times):装饰器工厂,接受装饰器的参数
  2. 中间层decorator(func):真正的装饰器,接受被装饰的函数
  3. 最内层wrapper(*args, **kwargs):包装函数,实际被调用的函数

执行过程是:

# @repeat(3) 等价于:temp_decorator = repeat(3) # 调用装饰器工厂,得到装饰器say_hello = temp_decorator(say_hello) # 用装饰器装饰函数

🟢更实用的例子:重试机制

让我们看一个更实用的带参数装饰器例子:

import timeimport randomimport functoolsdef retry(max_attempts=3, delay=1, backoff=2): \"\"\" 重试装饰器工厂 Args: max_attempts: 最大尝试次数 delay: 初始延迟时间(秒) backoff: 延迟时间的倍数因子 \"\"\" def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): current_delay = delay last_exception = None for attempt in range(max_attempts): try:  return func(*args, **kwargs) except Exception as e:  last_exception = e  if attempt == max_attempts - 1: # 最后一次尝试失败,抛出异常 break  print(f\"第{attempt+1}次尝试失败: {e}\")  print(f\"等待{current_delay}秒后重试...\")  time.sleep(current_delay)  current_delay *= backoff # 递增延迟时间 # 所有尝试都失败了 raise last_exception return wrapper return decorator@retry(max_attempts=3, delay=0.5, backoff=2)def unstable_function(): \"\"\"模拟一个不稳定的函数\"\"\" if random.random() < 0.7: # 70%概率失败 raise ValueError(\"随机错误\") return \"成功!\"# 测试try: result = unstable_function() print(result)except Exception as e: print(f\"最终失败: {e}\")

5. 类装饰器

🟢什么是类装饰器?

除了函数,我们也可以使用类来实现装饰器。类装饰器需要实现__call__方法,使得类的实例可以像函数一样被调用。

类装饰器的优势在于可以维护状态,而函数装饰器通常是无状态的。

5.1 简单的类装饰器

class Counter: \"\"\"计数器装饰器:记录函数被调用的次数\"\"\" def __init__(self, func): self.func = func self.count = 0 def __call__(self, *args, **kwargs): \"\"\"使对象可以像函数一样被调用\"\"\" self.count += 1 print(f\"函数 {self.func.__name__} 被调用了 {self.count} 次\") return self.func(*args, **kwargs)@Counterdef say_hello(): print(\"Hello!\")say_hello() # 函数 say_hello 被调用了 1 次say_hello() # 函数 say_hello 被调用了 2 次

这里的工作原理是:

  1. 当我们使用@Counter装饰say_hello时,实际上是调用了Counter(say_hello)
  2. 这会创建一个Counter实例,并将原函数保存在实例中
  3. 由于实现了__call__方法,这个实例可以像函数一样被调用

5.2 带参数的类装饰器

class Timer: \"\"\"计时器装饰器:记录函数执行时间\"\"\" def __init__(self, prefix=\"\"): self.prefix = prefix def __call__(self, func): \"\"\"这个方法接受被装饰的函数作为参数\"\"\" @functools.wraps(func) def wrapper(*args, **kwargs): import time start_time = time.time() result = func(*args, **kwargs) end_time = time.time() print(f\"{self.prefix}函数 {func.__name__} 耗时: {end_time - start_time:.4f}秒\") return result return wrapper@Timer(prefix=\"[性能统计] \")def slow_function(): \"\"\"模拟一个耗时的函数\"\"\" import time time.sleep(1) return \"完成\"slow_function() # [性能统计] 函数 slow_function 耗时: 1.0012秒

带参数的类装饰器的工作流程:

  1. Timer(prefix=\"[性能统计] \")创建一个Timer实例
  2. @Timer(...)实际上是调用这个实例的__call__方法
  3. __call__方法接受被装饰的函数,返回包装后的函数

6. 内置装饰器深入理解

6.1 @property:让方法变成属性

@property装饰器让我们可以像访问属性一样访问方法,同时还可以在访问时进行一些计算或验证。

class Temperature: \"\"\"温度类:演示property的使用\"\"\" def __init__(self, celsius=0): self._celsius = celsius # 使用下划线表示私有属性 @property def celsius(self): \"\"\"获取摄氏温度\"\"\" return self._celsius @celsius.setter def celsius(self, value): \"\"\"设置摄氏温度,带有验证\"\"\" if value < -273.15: raise ValueError(\"温度不能低于绝对零度(-273.15°C)\") self._celsius = value @celsius.deleter def celsius(self): \"\"\"删除温度值\"\"\" print(\"删除温度值\") self._celsius = 0 @property def fahrenheit(self): \"\"\"华氏温度:只读属性,根据摄氏温度计算\"\"\" return self._celsius * 9/5 + 32 @fahrenheit.setter def fahrenheit(self, value): \"\"\"通过华氏温度设置摄氏温度\"\"\" self._celsius = (value - 32) * 5/9 @property def kelvin(self): \"\"\"开尔文温度:只读属性\"\"\" return self._celsius + 273.15# 使用示例temp = Temperature(25)print(f\"摄氏温度: {temp.celsius}°C\") # 25print(f\"华氏温度: {temp.fahrenheit}°F\") # 77.0print(f\"开尔文温度: {temp.kelvin}K\") # 298.15# 可以通过华氏温度设置temp.fahrenheit = 100print(f\"设置华氏温度100°F后,摄氏温度: {temp.celsius}°C\") # 37.77777777777778# 删除温度值del temp.celsius # 删除温度值

@property的好处:

  1. 封装:隐藏内部实现细节
  2. 验证:在设置值时可以进行验证
  3. 计算属性:可以动态计算属性值
  4. 兼容性:可以在不改变接口的情况下从简单属性升级为复杂属性

假设有一个类属性 x(x被property 装饰),下面是 @property 系列的 4 种装饰方法:

方法 说明 @property 把方法变为只读属性 @x.setter 为属性添加 setter 方法(可写) @x.deleter 为属性添加删除逻辑 @x.getter 再次定义 getter(较少用)

6.2 @staticmethod和@classmethod

这两个装饰器经常被混淆,让我们通过详细的例子来理解它们的区别:

class MathUtils: \"\"\"数学工具类:演示静态方法和类方法的区别\"\"\" pi = 3.14159 # 类属性 def __init__(self, precision=2): self.precision = precision # 实例属性 def round_number(self, number): \"\"\"普通实例方法:可以访问实例属性和类属性\"\"\" return round(number, self.precision) @staticmethod def add(a, b): \"\"\" 静态方法:不能访问实例或类,就像普通函数一样 用于那些在逻辑上属于这个类,但不需要访问类或实例数据的方法 \"\"\" return a + b @staticmethod def is_even(number): \"\"\"静态方法:判断数字是否为偶数\"\"\" return number % 2 == 0 @classmethod def get_pi(cls): \"\"\" 类方法:可以访问类属性,但不能访问实例属性 第一个参数是类本身(通常命名为cls) \"\"\" return cls.pi @classmethod def create_default_instance(cls): \"\"\" 类方法:创建类的实例 这是类方法的一个常见用途:提供alternative constructors \"\"\" return cls(precision=4) # 创建一个精度为4的实例 @classmethod def circle_area(cls, radius): \"\"\"类方法:计算圆的面积\"\"\" return cls.pi * radius ** 2# 使用示例utils = MathUtils(precision=3)# 实例方法:需要通过实例调用print(utils.round_number(3.14159)) # 3.142# 静态方法:可以通过类或实例调用print(MathUtils.add(3, 5)) # 8print(utils.add(3, 5)) # 8(也可以通过实例调用)print(MathUtils.is_even(4)) # True# 类方法:可以通过类或实例调用print(MathUtils.get_pi()) # 3.14159print(utils.get_pi())  # 3.14159print(MathUtils.circle_area(5)) # 78.53975# 类方法创建实例new_utils = MathUtils.create_default_instance()print(new_utils.precision) # 4

总结区别:

  • 实例方法:需要self参数,可以访问实例属性和类属性。
  • 静态方法:不需要selfcls参数,不能访问实例或类属性,就像普通函数, 是纯工具函数。
  • 类方法:需要cls参数,可以访问类属性,常用于创建替代构造函数。

6.3 @functools.lru_cache:记忆化装饰器

lru_cache是一个内置的缓存装饰器,可以缓存函数的返回值,避免重复计算:

import functools@functools.lru_cache(maxsize=128)def fibonacci(n): \"\"\" 计算斐波那契数列 使用lru_cache缓存结果,避免重复计算 \"\"\" if n < 2: return n return fibonacci(n-1) + fibonacci(n-2)# 测试性能差异import timedef fibonacci_without_cache(n): \"\"\"没有缓存的斐波那契函数\"\"\" if n < 2: return n return fibonacci_without_cache(n-1) + fibonacci_without_cache(n-2)# 比较性能start = time.time()result1 = fibonacci(35)time1 = time.time() - startstart = time.time()result2 = fibonacci_without_cache(35)time2 = time.time() - startprint(f\"带缓存: {result1}, 耗时: {time1:.4f}秒\")print(f\"不带缓存: {result2}, 耗时: {time2:.4f}秒\")print(f\"性能提升: {time2/time1:.2f}倍\")# 查看缓存信息print(f\"缓存信息: {fibonacci.cache_info()}\")

7. 高级装饰器模式

7.1 装饰器链:多个装饰器的执行顺序

当一个函数被多个装饰器装饰时,理解执行顺序很重要:

def decorator_a(func): print(\"装饰器A - 初始化\") def wrapper(*args, **kwargs): print(\"装饰器A - 执行前\") result = func(*args, **kwargs) print(\"装饰器A - 执行后\") return result return wrapperdef decorator_b(func): print(\"装饰器B - 初始化\") def wrapper(*args, **kwargs): print(\"装饰器B - 执行前\") result = func(*args, **kwargs) print(\"装饰器B - 执行后\") return result return wrapperdef decorator_c(func): print(\"装饰器C - 初始化\") def wrapper(*args, **kwargs): print(\"装饰器C - 执行前\") result = func(*args, **kwargs) print(\"装饰器C - 执行后\") return result return wrapperprint(\"=== 装饰器定义阶段 ===\")@decorator_a@decorator_b@decorator_cdef my_function(): print(\"原函数执行\") return \"完成\"print(\"\\n=== 函数执行阶段 ===\")result = my_function()print(f\"最终结果: {result}\")

执行顺序解析:

  1. 装饰器应用顺序:从下到上(C → B → A)
  2. 等价形式my_function = decorator_a(decorator_b(decorator_c(my_function)))
  3. 执行顺序:从外到内(A → B → C → 原函数 → C → B → A)

7.2 条件装饰器:根据条件决定是否装饰

def conditional_decorator(condition): \"\"\" 条件装饰器:只有在条件满足时才应用装饰器 这在调试或开发/生产环境切换时很有用 \"\"\" def decorator(func): if condition: @functools.wraps(func) def wrapper(*args, **kwargs): print(f\"条件满足,装饰器对 {func.__name__} 生效\") return func(*args, **kwargs) return wrapper else: # 条件不满足时,直接返回原函数 return func return decorator# 配置变量DEBUG = TruePRODUCTION = False@conditional_decorator(DEBUG)def debug_function(): print(\"这是一个调试函数\")@conditional_decorator(PRODUCTION)def production_function(): print(\"这是一个生产环境函数\")debug_function() # 会显示装饰器信息production_function() # 不会显示装饰器信息

7.3 参数验证装饰器

def validate_types(**expected_types): \"\"\" 类型验证装饰器:验证函数参数的类型 使用方式:@validate_types(name=str, age=int) \"\"\" def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): # 获取函数的参数名 import inspect sig = inspect.signature(func) bound_args = sig.bind(*args, **kwargs) bound_args.apply_defaults() # 验证参数类型 for param_name, expected_type in expected_types.items(): if param_name in bound_args.arguments:  value = bound_args.arguments[param_name]  if not isinstance(value, expected_type): raise TypeError( f\"参数 {param_name} 期望类型 {expected_type.__name__}, \" f\"但得到 {type(value).__name__}\" ) return func(*args, **kwargs) return wrapper return decorator@validate_types(name=str, age=int, active=bool)def create_user(name, age, active=True): \"\"\"创建用户\"\"\" return f\"用户: {name}, 年龄: {age}, 状态: {\'激活\' if active else \'未激活\'}\"# 正确使用print(create_user(\"张三\", 25)) # 正常执行# 错误使用try: create_user(\"张三\", \"25\") # age应该是int,但传入了strexcept TypeError as e: print(f\"类型错误: {e}\")

8. 实际应用场景

8.1 日志记录装饰器

import functoolsimport loggingfrom datetime import datetime# 配置日志logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)def log_execution(level=logging.INFO, include_args=True, include_result=True): \"\"\" 日志记录装饰器 Args: level: 日志级别 include_args: 是否记录函数参数 include_result: 是否记录函数返回值 \"\"\" def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): func_name = func.__name__ start_time = datetime.now() # 记录函数开始执行 log_msg = f\"开始执行函数: {func_name}\" if include_args: log_msg += f\", 参数: args={args}, kwargs={kwargs}\" logger.log(level, log_msg) try: # 执行函数 result = func(*args, **kwargs) # 记录执行成功 end_time = datetime.now() duration = (end_time - start_time).total_seconds() log_msg = f\"函数 {func_name} 执行成功, 耗时: {duration:.4f}秒\" if include_result:  log_msg += f\", 返回值: {result}\" logger.log(level, log_msg) return result except Exception as e: # 记录执行异常 end_time = datetime.now() duration = (end_time - start_time).total_seconds() logger.error(  f\"函数 {func_name} 执行失败, 耗时: {duration:.4f}秒, \"  f\"异常: {type(e).__name__}: {e}\" ) raise # 重新抛出异常 return wrapper return decorator@log_execution(include_args=True, include_result=True)def calculate_area(length, width): \"\"\"计算矩形面积\"\"\" if length <= 0 or width <= 0: raise ValueError(\"长度和宽度必须大于0\") return length * width# 测试calculate_area(5, 3) # 正常执行try: calculate_area(-1, 3) # 异常情况except ValueError: pass

8.2 缓存装饰器(自定义实现)

import functoolsimport timefrom typing import Any, Callable, Dict, Optionalclass CacheEntry: \"\"\"缓存条目\"\"\" def __init__(self, value: Any, expire_time: Optional[float] = None): self.value = value self.expire_time = expire_time self.created_time = time.time() def is_expired(self) -> bool: \"\"\"检查缓存是否过期\"\"\" if self.expire_time is None: return False return time.time() > self.expire_timedef cache_with_ttl(ttl: int = 300, maxsize: int = 128): \"\"\" 带有生存时间(TTL)的缓存装饰器 Args: ttl: 缓存生存时间(秒) maxsize: 最大缓存条目数 \"\"\" def decorator(func: Callable) -> Callable: cache: Dict[str, CacheEntry] = {} @functools.wraps(func) def wrapper(*args, **kwargs): # 生成缓存键 cache_key = str(args) + str(sorted(kwargs.items())) # 检查缓存是否存在且未过期 if cache_key in cache: entry = cache[cache_key] if not entry.is_expired():  print(f\"缓存命中: {func.__name__}\")  return entry.value else:  # 过期缓存,删除  del cache[cache_key] # 清理过期缓存 expired_keys = [k for k, v in cache.items() if v.is_expired()] for key in expired_keys: del cache[key] # 如果缓存满了,删除最老的条目 if len(cache) >= maxsize: oldest_key = min(cache.keys(), key=lambda k: cache[k].created_time) del cache[oldest_key] # 执行函数并缓存结果 print(f\"执行函数: {func.__name__}\") result = func(*args, **kwargs) expire_time = time.time() + ttl if ttl > 0 else None cache[cache_key] = CacheEntry(result, expire_time) return result # 添加缓存管理方法 def cache_info(): \"\"\"获取缓存信息\"\"\" total_entries = len(cache) expired_entries = sum(1 for entry in cache.values() if entry.is_expired()) return { \'total_entries\': total_entries, \'expired_entries让我继续完善这个缓存装饰器的例子:```python # 添加缓存管理方法 def cache_info(): \"\"\"获取缓存信息\"\"\" total_entries = len(cache) expired_entries = sum(1 for entry in cache.values() if entry.is_expired()) return { \'total_entries\': total_entries, \'expired_entries\': expired_entries, \'active_entries\': total_entries - expired_entries, \'max_size\': maxsize, \'ttl\': ttl } def cache_clear(): \"\"\"清空缓存\"\"\" cache.clear() print(\"缓存已清空\") # 将方法绑定到wrapper函数上 wrapper.cache_info = cache_info wrapper.cache_clear = cache_clear return wrapper return decorator# 使用示例@cache_with_ttl(ttl=5, maxsize=10) # 5秒过期,最多10个条目def expensive_calculation(n): \"\"\"模拟耗时计算\"\"\" print(f\"正在计算 {n} 的平方...\") time.sleep(1) # 模拟耗时操作 return n ** 2# 测试缓存效果print(\"=== 测试缓存功能 ===\")print(expensive_calculation(5)) # 执行函数: expensive_calculationprint(expensive_calculation(5)) # 缓存命中: expensive_calculationprint(expensive_calculation(3)) # 执行函数: expensive_calculationprint(\"\\n=== 缓存信息 ===\")print(expensive_calculation.cache_info())print(\"\\n=== 等待缓存过期 ===\")time.sleep(6) # 等待超过TTL时间print(expensive_calculation(5)) # 缓存过期,重新执行print(\"\\n=== 清空缓存 ===\")expensive_calculation.cache_clear()

8.3 性能监控装饰器

import timeimport functoolsimport psutilimport osfrom typing import Dict, Listclass PerformanceMonitor: \"\"\"性能监控器\"\"\" def __init__(self): self.stats: Dict[str, List[dict]] = {} def __call__(self, func): @functools.wraps(func) def wrapper(*args, **kwargs): func_name = func.__name__ # 记录开始状态 start_time = time.time() start_memory = self._get_memory_usage() start_cpu = psutil.cpu_percent() try: result = func(*args, **kwargs) success = True error = None except Exception as e: result = None success = False error = str(e) raise finally: # 记录结束状态 end_time = time.time() end_memory = self._get_memory_usage() end_cpu = psutil.cpu_percent() # 计算性能指标 execution_time = end_time - start_time memory_delta = end_memory - start_memory cpu_usage = (start_cpu + end_cpu) / 2 # 平均CPU使用率 # 记录统计信息 stats = {  \'timestamp\': time.time(),  \'execution_time\': execution_time,  \'memory_delta\': memory_delta,  \'cpu_usage\': cpu_usage,  \'success\': success,  \'error\': error } if func_name not in self.stats:  self.stats[func_name] = [] self.stats[func_name].append(stats) # 打印实时统计 self._print_stats(func_name, stats) return result return wrapper def _get_memory_usage(self) -> float: \"\"\"获取当前内存使用量(MB)\"\"\" process = psutil.Process(os.getpid()) return process.memory_info().rss / 1024 / 1024 def _print_stats(self, func_name: str, stats: dict): \"\"\"打印性能统计\"\"\" print(f\"\\n=== {func_name} 性能统计 ===\") print(f\"执行时间: {stats[\'execution_time\']:.4f}秒\") print(f\"内存变化: {stats[\'memory_delta\']:+.2f}MB\") print(f\"CPU使用率: {stats[\'cpu_usage\']:.1f}%\") print(f\"执行状态: {\'成功\' if stats[\'success\'] else \'失败\'}\") if stats[\'error\']: print(f\"错误信息: {stats[\'error\']}\") def get_summary(self, func_name: str) -> dict: \"\"\"获取函数的性能摘要\"\"\" if func_name not in self.stats: return {} records = self.stats[func_name] successful_records = [r for r in records if r[\'success\']] if not successful_records: return {\'error\': \'没有成功的执行记录\'} execution_times = [r[\'execution_time\'] for r in successful_records] memory_deltas = [r[\'memory_delta\'] for r in successful_records] cpu_usages = [r[\'cpu_usage\'] for r in successful_records] return { \'total_calls\': len(records), \'successful_calls\': len(successful_records), \'success_rate\': len(successful_records) / len(records) * 100, \'avg_execution_time\': sum(execution_times) / len(execution_times), \'min_execution_time\': min(execution_times), \'max_execution_time\': max(execution_times), \'avg_memory_delta\': sum(memory_deltas) / len(memory_deltas), \'avg_cpu_usage\': sum(cpu_usages) / len(cpu_usages) }# 创建性能监控装饰器实例monitor = PerformanceMonitor()@monitordef process_data(size): \"\"\"处理数据的函数\"\"\" data = list(range(size)) result = sum(x**2 for x in data) return result@monitordef memory_intensive_task(): \"\"\"内存密集型任务\"\"\" # 创建一个大列表 big_list = [i for i in range(1000000)] return len(big_list)# 测试性能监控print(\"=== 测试性能监控 ===\")process_data(100000)process_data(500000)memory_intensive_task()print(\"\\n=== 性能摘要 ===\")summary = monitor.get_summary(\'process_data\')for key, value in summary.items(): print(f\"{key}: {value}\")

8.4 API限流装饰器

import timeimport functoolsfrom collections import defaultdict, dequefrom typing import DefaultDict, Dequeclass RateLimiter: \"\"\"速率限制器\"\"\" def __init__(self, max_calls: int = 10, time_window: int = 60): self.max_calls = max_calls self.time_window = time_window # 使用deque存储每个用户的调用时间戳 self.call_times: DefaultDict[str, Deque[float]] = defaultdict(deque) def __call__(self, func): @functools.wraps(func) def wrapper(*args, **kwargs): # 尝试从kwargs中获取用户ID,如果没有则使用默认值 user_id = kwargs.get(\'user_id\', \'anonymous\') current_time = time.time() # 清理过期的调用记录 user_calls = self.call_times[user_id] while user_calls and current_time - user_calls[0] >= self.time_window: user_calls.popleft() # 检查是否超过限制 if len(user_calls) >= self.max_calls: oldest_call = user_calls[0] wait_time = self.time_window - (current_time - oldest_call) raise Exception(  f\"API调用频率限制: 用户 {user_id}{self.time_window}秒内\"  f\"最多只能调用 {self.max_calls} 次API。\"  f\"请等待 {wait_time:.1f} 秒后再试。\" ) # 记录本次调用 user_calls.append(current_time) # 执行原函数 result = func(*args, **kwargs) # 在结果中添加限流信息 if isinstance(result, dict): result[\'rate_limit_info\'] = {  \'remaining_calls\': self.max_calls - len(user_calls),  \'reset_time\': int(current_time + self.time_window) } return result return wrapper def get_user_status(self, user_id: str) -> dict: \"\"\"获取用户的限流状态\"\"\" current_time = time.time() user_calls = self.call_times[user_id] # 清理过期记录 while user_calls and current_time - user_calls[0] >= self.time_window: user_calls.popleft() return { \'user_id\': user_id, \'calls_made\': len(user_calls), \'calls_remaining\': self.max_calls - len(user_calls), \'time_window\': self.time_window, \'next_reset\': int(current_time + self.time_window) if user_calls else None }# 创建限流装饰器rate_limiter = RateLimiter(max_calls=5, time_window=10)@rate_limiterdef api_get_user_data(user_id, data_type=\"basic\"): \"\"\"模拟API调用:获取用户数据\"\"\" # 模拟API处理时间 time.sleep(0.1) return { \'user_id\': user_id, \'data_type\': data_type, \'data\': f\"用户 {user_id}{data_type} 数据\", \'timestamp\': time.time() }# 测试限流功能print(\"=== 测试API限流 ===\")user_id = \"user_123\"# 连续调用APIfor i in range(7): try: result = api_get_user_data(user_id=user_id, data_type=\"profile\") print(f\"调用 {i+1}: 成功\") print(f\" 剩余调用次数: {result.get(\'rate_limit_info\', {}).get(\'remaining_calls\', \'N/A\')}\") except Exception as e: print(f\"调用 {i+1}: 失败 - {e}\") time.sleep(0.5)# 查看用户状态print(f\"\\n用户状态: {rate_limiter.get_user_status(user_id)}\")

9. 装饰器的最佳实践

9.1 使用functools.wraps保持元信息

import functools# 正确的做法def good_decorator(func): @functools.wraps(func) # 保持原函数的元信息 def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapper# 错误的做法def bad_decorator(func): def wrapper(*args, **kwargs): # 缺少@functools.wraps return func(*args, **kwargs) return wrapperdef test_function(): \"\"\"这是测试函数的文档\"\"\" pass# 应用装饰器good_test = good_decorator(test_function)bad_test = bad_decorator(test_function)print(\"=== 元信息对比 ===\")print(f\"原函数名: {test_function.__name__}\")print(f\"原函数文档: {test_function.__doc__}\")print(f\"好装饰器后函数名: {good_test.__name__}\")print(f\"好装饰器后文档: {good_test.__doc__}\")print(f\"坏装饰器后函数名: {bad_test.__name__}\")print(f\"坏装饰器后文档: {bad_test.__doc__}\")

9.2 装饰器的性能考虑

import functoolsimport timedef lightweight_decorator(func): \"\"\"轻量级装饰器:开销很小\"\"\" @functools.wraps(func) def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapperdef heavy_decorator(func): \"\"\"重量级装饰器:开销较大\"\"\" @functools.wraps(func) def wrapper(*args, **kwargs): # 模拟复杂的预处理 for _ in range(1000): pass result = func(*args, **kwargs) # 模拟复杂的后处理 for _ in range(1000): pass return result return wrapperdef simple_function(): \"\"\"简单函数\"\"\" return \"Hello\"# 性能测试def benchmark_decorator(decorator, iterations=10000): \"\"\"测试装饰器性能\"\"\" @decorator def test_func(): return \"test\" start_time = time.time() for _ in range(iterations): test_func() end_time = time.time() return end_time - start_timeprint(\"=== 装饰器性能对比 ===\")print(f\"无装饰器: {benchmark_decorator(lambda x: x):.4f}秒\")print(f\"轻量级装饰器: {benchmark_decorator(lightweight_decorator):.4f}秒\")print(f\"重量级装饰器: {benchmark_decorator(heavy_decorator):.4f}秒\")

9.3 装饰器的可组合性

import functoolsdef timing_decorator(func): \"\"\"计时装饰器\"\"\" @functools.wraps(func) def wrapper(*args, **kwargs): start = time.time() result = func(*args, **kwargs) end = time.time() print(f\"{func.__name__} 耗时: {end-start:.4f}秒\") return result return wrapperdef logging_decorator(func): \"\"\"日志装饰器\"\"\" @functools.wraps(func) def wrapper(*args, **kwargs): print(f\"调用函数: {func.__name__}\") result = func(*args, **kwargs) print(f\"函数返回: {result}\") return result return wrapperdef error_handling_decorator(func): \"\"\"错误处理装饰器\"\"\" @functools.wraps(func) def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except Exception as e: print(f\"函数 {func.__name__} 发生错误: {e}\") return None return wrapper# 组合使用多个装饰器@timing_decorator@logging_decorator@error_handling_decoratordef complex_calculation(x, y): \"\"\"复杂计算函数\"\"\" if y == 0: raise ValueError(\"除数不能为0\") return x / yprint(\"=== 装饰器组合使用 ===\")result = complex_calculation(10, 2) # 正常情况print(f\"结果: {result}\")print(\"\\n=== 异常情况 ===\")result = complex_calculation(10, 0) # 异常情况print(f\"结果: {result}\")

10. 装饰器的调试技巧

🟢装饰器调试工具

import functoolsimport inspectdef debug_decorator(func): \"\"\"调试装饰器:显示详细的调用信息\"\"\" @functools.wraps(func) def wrapper(*args, **kwargs): # 获取调用栈信息 frame = inspect.currentframe() caller_frame = frame.f_back caller_info = inspect.getframeinfo(caller_frame) print(f\"\\n=== 调试信息: {func.__name__} ===\") print(f\"函数位置: {func.__code__.co_filename}:{func.__code__.co_firstlineno}\") print(f\"调用位置: {caller_info.filename}:{caller_info.lineno}\") print(f\"调用者: {caller_frame.f_code.co_name}\") print(f\"参数: args={args}, kwargs={kwargs}\") # 获取函数签名 sig = inspect.signature(func) try: bound_args = sig.bind(*args, **kwargs) bound_args.apply_defaults() print(f\"绑定参数: {dict(bound_args.arguments)}\") except Exception as e: print(f\"参数绑定失败: {e}\") # 执行函数 try: result = func(*args, **kwargs) print(f\"返回值: {result}\") print(f\"返回类型: {type(result)}\") return result except Exception as e: print(f\"执行异常: {type(e).__name__}: {e}\") raise finally: print(\"=== 调试信息结束 ===\\n\") return wrapper@debug_decoratordef calculate_area(length, width, unit=\"m²\"): \"\"\"计算面积\"\"\" if length <= 0 or width <= 0: raise ValueError(\"长度和宽度必须大于0\") return f\"{length * width}{unit}\"# 测试调试装饰器calculate_area(5, 3)calculate_area(5, 3, unit=\"cm²\")