💼 Redis 实战应用

Redis 在实际项目中的典型应用场景,包含实现方案、核心命令和常见问题解决。

场景一:缓存系统

最常见的 Redis 应用场景,加速数据访问,减轻数据库压力。

展开

📋 场景描述

在 Web 应用中,数据库查询往往是性能瓶颈。使用 Redis 缓存热点数据,可以显著提升响应速度,降低数据库负载。

🏗️ 实现方案

客户端
Redis 缓存
↘ 未命中
数据库
# 缓存读取流程 1. 先查 Redis:GET key 2. 如果存在,直接返回 3. 如果不存在,查询数据库 4. 将结果写入缓存:SET key value EX 3600

🔧 核心命令

命令 说明 示例
SET key value EX seconds 设置缓存,带过期时间 SET user:1001 {...} EX 3600
GET key 获取缓存 GET user:1001
MGET key1 key2 批量获取 MGET user:1 user:2 user:3
DEL key 删除缓存 DEL user:1001
EXISTS key 检查是否存在 EXISTS user:1001

💻 代码示例(Python)

import redis import json r = redis.Redis(host='localhost', port=6379, decode_responses=True) def get_user(user_id): cache_key = f"user:{user_id}" # 1. 尝试从缓存获取 cached = r.get(cache_key) if cached: return json.loads(cached) # 2. 缓存未命中,查询数据库 user = db.query("SELECT * FROM users WHERE id = ?", user_id) # 3. 写入缓存,1 小时过期 if user: r.setex(cache_key, 3600, json.dumps(user)) return user def update_user(user_id, data): cache_key = f"user:{user_id}" # 更新数据库 db.update("users", user_id, data) # 删除缓存(下次读取时重新加载) r.delete(cache_key)

⚠️ 常见问题与解决方案

问题 1:缓存穿透

查询不存在的数据,缓存层和数据库层都没有,导致每次请求都打到数据库。

解决方案:
  • 缓存空值:即使数据库中没有,也在 Redis 中缓存一个空值,设置较短过期时间
  • 布隆过滤器:使用 Bloom Filter 预先判断数据是否存在
# 缓存空值方案 cached = r.get(cache_key) if cached == "NULL": return None # 直接返回,不查数据库 user = db.query(...) if not user: r.setex(cache_key, 60, "NULL") # 空值缓存 1 分钟
问题 2:缓存雪崩

大量缓存同时过期,导致请求全部打到数据库。

解决方案:
  • 随机过期时间:在基础过期时间上增加随机值
  • 永不过期:逻辑过期,异步更新
  • 多级缓存:本地缓存 + Redis 缓存
# 随机过期时间 base_ttl = 3600 random_ttl = base_ttl + random.randint(0, 1800) # 1-1.5 小时 r.setex(key, random_ttl, value)
问题 3:缓存击穿

热点 key 过期瞬间,大量请求涌入数据库。

解决方案:
  • 互斥锁:只让一个请求查数据库,其他等待
  • 逻辑过期:不设置物理过期,value 中包含逻辑过期时间
# 互斥锁方案 def get_with_lock(key): cached = r.get(key) if cached: return cached # 尝试获取锁 lock_key = f"lock:{key}" if r.set(lock_key, "1", nx=True, ex=10): try: # 双重检查 cached = r.get(key) if cached: return cached # 查数据库并缓存 value = db.query(...) r.setex(key, 3600, value) return value finally: r.delete(lock_key) else: # 等待后重试 time.sleep(0.1) return get_with_lock(key)
👤

场景二:会话管理(Session)

分布式系统中的用户会话存储方案。

展开

📋 场景描述

在分布式 Web 应用中,需要在多个服务器之间共享用户会话信息。Redis 是理想的 Session 存储方案。

🏗️ 实现方案

# Session 数据结构 session_id = "sess:abc123..." { "user_id": 1001, "username": "张三", "login_time": "2024-01-01 10:00:00", "last_activity": "2024-01-01 12:30:00", "ip_address": "192.168.1.100" }

🔧 核心命令

命令 说明 示例
SETEX key seconds value 设置 Session,带过期时间 SETEX sess:abc123 1800 {...}
EXPIRE key seconds 续期 Session EXPIRE sess:abc123 1800
HSET key field value 更新 Session 字段 HSET sess:abc123 last_activity "12:30"
HGETALL key 获取完整 Session HGETALL sess:abc123
DEL key 注销 Session DEL sess:abc123

💻 代码示例(Node.js + Express)

const redis = require('redis'); const { promisify } = require('util'); const crypto = require('crypto'); const client = redis.createClient(); const getAsync = promisify(client.get).bind(client); const setexAsync = promisify(client.setex).bind(client); const delAsync = promisify(client.del).bind(client); // 生成 Session ID function generateSessionId() { return crypto.randomBytes(32).toString('hex'); } // 创建 Session async function createSession(userId, userData) { const sessionId = generateSessionId(); const sessionData = { user_id: userId, ...userData, login_time: new Date().toISOString() }; // 30 分钟过期 await setexAsync(`sess:${sessionId}`, 1800, JSON.stringify(sessionData)); return sessionId; } // 获取 Session async function getSession(sessionId) { const data = await getAsync(`sess:${sessionId}`); if (!data) return null; const session = JSON.parse(data); // 续期 await client.expire(`sess:${sessionId}`, 1800); return session; } // 销毁 Session async function destroySession(sessionId) { await delAsync(`sess:${sessionId}`); } // 中间件 app.use(async (req, res, next) => { const sessionId = req.cookies.sessionId; if (sessionId) { req.session = await getSession(sessionId); } next(); });

⚠️ 常见问题与解决方案

问题 1:Session 丢失

Redis 重启或故障导致 Session 数据丢失,用户需要重新登录。

解决方案:
  • 开启 AOF 持久化:每秒同步,最多丢失 1 秒数据
  • Redis 主从复制:从节点快速接管
  • Redis Sentinel:自动故障转移
  • 重要数据持久化到数据库
问题 2:Session 劫持

Session ID 被窃取,攻击者冒充用户。

解决方案:
  • HTTPS 传输:加密通信
  • 绑定 IP 地址:验证请求 IP
  • 定期更换 Session ID
  • 设置 HttpOnly Cookie
# 绑定 IP 验证 session_data = get_session(session_id) if session_data['ip_address'] != request.ip: destroy_session(session_id) # 销毁可疑 Session return error("Session 异常")
🏆

场景三:排行榜系统

使用有序集合实现实时排行榜。

展开

📋 场景描述

游戏积分排行、销售排行、热度排行等场景,需要实时排序和查询排名。

🏗️ 实现方案

# 使用 ZSet(有序集合) # key: leaderboard:game1 # member: 用户 ID # score: 分数 # 数据结构示例 leaderboard:game1 = { "user:1001": 9500, "user:1002": 8800, "user:1003": 9200, ... }

🔧 核心命令

命令 说明 示例
ZADD key score member 添加/更新分数 ZADD leaderboard:game1 9500 user:1001
ZREVRANGE key start stop 获取前 N 名(降序) ZREVRANGE leaderboard:game1 0 9
ZREVRANK key member 获取排名 ZREVRANK leaderboard:game1 user:1001
ZSCORE key member 获取分数 ZSCORE leaderboard:game1 user:1001
ZINCRBY key increment member 增加分数 ZINCRBY leaderboard:game1 100 user:1001
ZCARD key 获取总人数 ZCARD leaderboard:game1

💻 代码示例

import redis r = redis.Redis(host='localhost', port=6379) def add_score(user_id, score, leaderboard='game1'): """添加或更新分数""" key = f"leaderboard:{leaderboard}" r.zadd(key, {user_id: score}) def increment_score(user_id, increment, leaderboard='game1'): """增加分数""" key = f"leaderboard:{leaderboard}" return r.zincrby(key, increment, user_id) def get_top_n(n=10, leaderboard='game1'): """获取前 N 名""" key = f"leaderboard:{leaderboard}" # 降序获取前 N 名,带分数 return r.zrevrange(key, 0, n-1, withscores=True) def get_user_rank(user_id, leaderboard='game1'): """获取用户排名""" key = f"leaderboard:{leaderboard}" rank = r.zrevrank(key, user_id) return rank + 1 if rank is not None else None # 从 1 开始 def get_user_score(user_id, leaderboard='game1'): """获取用户分数""" key = f"leaderboard:{leaderboard}" return r.zscore(key, user_id) # 使用示例 add_score("user:1001", 9500) increment_score("user:1001", 100) # 加 100 分 top10 = get_top_n(10) rank = get_user_rank("user:1001") # 获取排名

💡 进阶功能

# 1. 获取用户周围排名(我的前后 5 名) def get_around_rank(user_id, leaderboard='game1'): key = f"leaderboard:{leaderboard}" my_rank = r.zrevrank(key, user_id) if my_rank is None: return None start = max(0, my_rank - 5) end = my_rank + 5 return r.zrevrange(key, start, end, withscores=True) # 2. 按分数范围查询 def get_by_score_range(min_score, max_score, leaderboard='game1'): key = f"leaderboard:{leaderboard}" return r.zrangebyscore(key, min_score, max_score, withscores=True) # 3. 删除低分用户(保留前 1000 名) def trim_leaderboard(keep_top=1000, leaderboard='game1'): key = f"leaderboard:{leaderboard}" # 删除 1000 名之后的所有用户 r.zremrangebyrank(key, keep_top, -1) # 4. 多榜单管理(日榜、周榜、月榜) def add_score_multi(user_id, score): # 同时更新多个榜单 r.zadd("leaderboard:daily", {user_id: score}) r.zadd("leaderboard:weekly", {user_id: score}) r.zadd("leaderboard:monthly", {user_id: score}) r.zadd("leaderboard:all", {user_id: score})

⚠️ 常见问题与解决方案

问题 1:分数相同排名处理

多个用户分数相同时,如何确定排名?

解决方案:
  • 使用时间戳作为小数部分:score = 主分数 + 时间戳小数
  • 先达到该分数的用户排名靠前
# 分数相同,先达到的排名靠前 def add_score_with_time(user_id, score): # 使用时间戳的小数部分(精确到秒) time_decimal = time.time() % 1 / 1000000 final_score = score + time_decimal r.zadd("leaderboard", {user_id: final_score})
问题 2:大数据量性能

排行榜用户量巨大(百万级),查询性能下降。

解决方案:
  • 分片:按用户 ID 范围分多个 ZSet
  • 只保留活跃用户:定期清理低分用户
  • 缓存 Top N:将前 100 名缓存到 String
🔒

场景四:分布式锁

在分布式系统中实现互斥访问。

展开

📋 场景描述

分布式系统中,多个节点需要互斥访问共享资源,如防止重复下单、库存扣减等。

🏗️ 实现方案

# 分布式锁要求 1. 互斥性:同一时刻只有一个客户端能持有锁 2. 防死锁:锁必须有超时机制 3. 安全性:只能由加锁者解锁 4. 容错性:Redis 节点故障时仍能工作(RedLock)

🔧 核心命令

命令 说明 示例
SET key value NX EX seconds 原子性加锁 SET lock:order123 uuid NX EX 10
DEL key 释放锁 DEL lock:order123
GET key 检查锁持有者 GET lock:order123
EXPIRE key seconds 续期锁 EXPIRE lock:order123 10

💻 代码示例(Python)

import redis import uuid import time class RedisLock: def __init__(self, redis_client, lock_name, timeout=10): self.redis = redis_client self.lock_name = f"lock:{lock_name}" self.timeout = timeout self.lock_id = str(uuid.uuid4()) def acquire(self): """获取锁""" # SET key value NX EX seconds - 原子操作 result = self.redis.set( self.lock_name, self.lock_id, nx=True, # 不存在时才设置 ex=self.timeout # 过期时间 ) return result is True def release(self): """释放锁 - 使用 Lua 脚本保证原子性""" lua_script = """ if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end """ script = self.redis.register_script(lua_script) return script(keys=[self.lock_name], args=[self.lock_id]) def __enter__(self): # 尝试获取锁,最多重试 10 次 for _ in range(10): if self.acquire(): return self time.sleep(0.1) raise Exception("获取锁失败") def __exit__(self, exc_type, exc_val, exc_tb): self.release() # 使用示例 r = redis.Redis(host='localhost', port=6379) # 方式 1:上下文管理器 with RedisLock(r, "order:12345", timeout=10): # 临界区代码 process_order() # 方式 2:手动管理 lock = RedisLock(r, "inventory:product1") if lock.acquire(): try: deduct_inventory() finally: lock.release()

⚠️ 常见问题与解决方案

问题 1:锁误删

客户端 A 持有锁,但业务执行时间超过锁超时,锁自动释放。客户端 B 获得锁后,A 执行完成误删 B 的锁。

解决方案:
  • 释放锁时验证锁持有者(使用 UUID 标识)
  • 使用 Lua 脚本保证"检查 + 删除"的原子性
  • 使用看门狗机制自动续期
问题 2:Redis 主从切换导致锁丢失

客户端 A 在主节点加锁,主节点宕机,锁未同步到从节点。从节点提升为主节点后,客户端 B 可以加锁。

解决方案:
  • RedLock 算法:向 N 个独立 Redis 节点加锁
  • 需要获得 N/2+1 个节点的锁才算成功
  • 使用 Redisson 等成熟客户端库
# RedLock 伪代码 def redlock_acquire(resource, ttl): nodes = [redis1, redis2, redis3, redis4, redis5] acquired = 0 start_time = time.time() for node in nodes: if node.set(resource, uuid, nx=True, ex=ttl): acquired += 1 elapsed = time.time() - start_time # 至少 3 个节点成功,且耗时小于 TTL if acquired >= 3 and elapsed < ttl: return True else: # 释放所有节点的锁 redlock_release(resource) return False
📨

场景五:消息队列

使用 Redis 实现轻量级消息队列。

展开

📋 场景描述

异步任务处理、服务解耦、流量削峰等场景,可以使用 Redis 实现轻量级消息队列。

🏗️ 实现方案对比

方案 数据结构 优点 缺点
List LPUSH/BRPOP 简单、高性能 无 ACK、不可重放
Pub/Sub PUBLISH/SUBSCRIBE 实时广播 消息不持久、无消费者组
Stream XADD/XREADGROUP 持久化、ACK、消费者组 Redis 5.0+

🔧 方案一:List 实现队列

# 生产者 LPUSH queue:tasks '{"task_id": 1, "action": "send_email"}' # 消费者(阻塞式) BRPOP queue:tasks 0 # 0 表示无限等待 # Python 示例 import redis r = redis.Redis() # 生产者 def publish_task(task_data): r.lpush("queue:tasks", json.dumps(task_data)) # 消费者 def consume_task(): # 阻塞等待任务 result = r.brpop("queue:tasks", timeout=0) if result: return json.loads(result[1]) return None # 批量消费 def consume_batch(batch_size=10): pipe = r.pipeline() for _ in range(batch_size): pipe.rpop("queue:tasks") tasks = pipe.execute() return [json.loads(t) for t in tasks if t]

🔧 方案二:Stream 实现可靠队列

import redis r = redis.Redis() # 生产者 - 发送消息 def send_message(stream, data): return r.xadd(stream, data) # 创建消费者组 def create_consumer_group(stream, group_name): try: r.xgroup_create(stream, group_name, id="0", mkstream=True) except redis.exceptions.ResponseError: pass # 组已存在 # 消费者 - 读取消息 def consume_messages(stream, group_name, consumer_name, count=10): messages = r.xreadgroup( groupname=group_name, consumername=consumer_name, streams={stream: ">"}, # ">" 表示新消息 count=count, block=5000 # 阻塞 5 秒 ) return messages # 确认消息处理完成 def ack_message(stream, group_name, message_id): r.xack(stream, group_name, message_id) # 处理pending 消息(失败重试) def process_pending(stream, group_name, consumer_name): pending = r.xpending(stream, group_name, min=0, max=10000, count=10) for msg in pending: if msg['consumer'] == consumer_name: # 重新获取消息 messages = r.xclaim( stream, group_name, consumer_name, min_idle_time=60000, # 空闲 60 秒的消息 message_ids=[msg['message_id']] ) return messages return None # 使用示例 send_message("orders", {"order_id": "123", "action": "process"}) create_consumer_group("orders", "order_processors") messages = consume_messages("orders", "order_processors", "worker-1") for stream, msgs in messages: for msg_id, data in msgs: process_order(data) ack_message("orders", "order_processors", msg_id)

⚠️ 常见问题与解决方案

问题 1:消息丢失

消费者处理消息时宕机,消息丢失。

解决方案:
  • 使用 Stream + 消费者组 + ACK 机制
  • 消息处理完成后再确认
  • 定期处理 pending 消息
问题 2:消息重复消费

网络问题导致 ACK 未送达,消息被重新投递。

解决方案:
  • 实现幂等性:通过唯一 ID 去重
  • 记录已处理的消息 ID
  • 业务层面保证幂等
# 幂等性处理 processed_key = f"processed:{message_id}" if r.set(processed_key, "1", nx=True, ex=86400): # 首次处理 process_message(data) else: # 重复消息,跳过 pass
🚦

场景六:限流控制

API 限流、防刷、速率控制。

展开

📋 场景描述

保护系统免受过载攻击,限制用户或 IP 的请求频率。

🔧 方案一:固定窗口计数

import redis r = redis.Redis() def rate_limit_fixed(user_id, limit=100, window=60): """ 固定窗口限流 limit: 窗口内最大请求数 window: 窗口大小(秒) """ key = f"rate:{user_id}" current = r.incr(key) if current == 1: # 第一次请求,设置过期时间 r.expire(key, window) if current > limit: return False # 超出限制 return True # 允许请求 # 使用示例 if rate_limit_fixed("user:1001", limit=100, window=60): process_request() else: return error("请求过于频繁")

🔧 方案二:滑动窗口

def rate_limit_sliding(user_id, limit=100, window=60): """ 滑动窗口限流 使用 ZSet 记录每个请求的时间戳 """ key = f"rate:sliding:{user_id}" now = time.time() window_start = now - window # 删除窗口外的数据 r.zremrangebyscore(key, 0, window_start) # 获取当前窗口内的请求数 current = r.zcard(key) if current >= limit: return False # 添加当前请求 r.zadd(key, {str(uuid.uuid4()): now}) r.expire(key, window) return True

🔧 方案三:令牌桶算法

def rate_limit_token_bucket(user_id, capacity=100, rate=10): """ 令牌桶限流 capacity: 桶容量 rate: 每秒生成令牌数 """ key = f"rate:bucket:{user_id}" now = time.time() # 获取当前桶状态 bucket = r.hgetall(key) if not bucket: # 初始化:满桶 tokens = capacity last_update = now else: tokens = float(bucket[b'tokens']) last_update = float(bucket[b'last_update']) # 计算新增令牌 elapsed = now - last_update new_tokens = elapsed * rate tokens = min(capacity, tokens + new_tokens) if tokens < 1: return False # 没有令牌 # 消耗令牌 tokens -= 1 r.hset(key, mapping={"tokens": tokens, "last_update": now}) r.expire(key, 60) return True

🔧 方案四:使用 RedisCell 模块

# RedisCell 是专门的限流模块 # 实现令牌桶算法 def rate_limit_redis_cell(user_id, max_requests=100, per_second=10): """ 使用 RedisCell 限流 max_requests: 桶容量 per_second: 每秒补充令牌数 """ # CL.THROTTLE key max_burst rate_per_second count [timeout] result = r.execute_command( "CL.THROTTLE", f"rate:cell:{user_id}", max_requests, # 桶容量 per_second, # 每秒补充数 1, # 消耗 1 个令牌 1 # 超时时间 ) # 返回值:[是否限流,剩余容量,重置时间,重试延迟] limited, remaining, reset, retry_after = result return limited == 0 # 0 表示未限流 # 使用示例 if rate_limit_redis_cell("user:1001", max_requests=100, per_second=10): process_request() else: return error("请求受限")

⚠️ 常见问题与解决方案

问题:分布式限流不一致

多个 Redis 节点数据不同步,导致限流失效。

解决方案:
  • 使用单个 Redis 实例处理限流(可接受单点)
  • 使用 RedisCell 模块,原子性更好
  • 适当放宽限制,容忍小范围超限
🛒

场景七:购物车

使用 Hash 结构实现高效的购物车功能。

展开

📋 场景描述

电商应用中,购物车需要支持添加商品、修改数量、删除商品、计算总价等操作,且需要快速响应。

🏗️ 实现方案

# 使用 Hash 存储购物车 # key: cart:{user_id} # field: product_id # value: 购买数量 # 示例数据 cart:user:1001 = { "product:101": "2", # 商品 101,数量 2 "product:102": "1", # 商品 102,数量 1 "product:103": "3" # 商品 103,数量 3 } # 商品详情存储在另一个 Hash product:101 = { "name": "iPhone 15", "price": "7999", "stock": "100" }

🔧 核心命令

命令 说明 示例
HSET key field value 添加/更新商品 HSET cart:1001 product:101 2
HGET key field 获取商品数量 HGET cart:1001 product:101
HGETALL key 获取购物车全部商品 HGETALL cart:1001
HINCRBY key field increment 增加商品数量 HINCRBY cart:1001 product:101 1
HDEL key field 删除商品 HDEL cart:1001 product:101
HLEN key 获取商品种类数 HLEN cart:1001
EXPIRE key seconds 设置过期时间 EXPIRE cart:1001 2592000

💻 代码示例

import redis import json r = redis.Redis(host='localhost', port=6379, decode_responses=True) class ShoppingCart: def __init__(self, user_id): self.user_id = user_id self.cart_key = f"cart:{user_id}" self.product_prefix = "product:" def add_item(self, product_id, quantity=1): """添加商品到购物车""" # 检查商品库存 stock = r.hget(f"{self.product_prefix}{product_id}", "stock") if stock and int(stock) < quantity: return {"success": False, "message": "库存不足"} # 添加到购物车(如果已存在则累加) r.hincrby(self.cart_key, product_id, quantity) # 设置 30 天过期 r.expire(self.cart_key, 2592000) return {"success": True} def remove_item(self, product_id): """删除商品""" r.hdel(self.cart_key, product_id) return {"success": True} def update_quantity(self, product_id, quantity): """更新商品数量""" if quantity <= 0: return self.remove_item(product_id) r.hset(self.cart_key, product_id, str(quantity)) return {"success": True} def get_cart(self): """获取购物车详情""" cart_items = r.hgetall(self.cart_key) result = [] total = 0 for product_id, quantity in cart_items.items(): product = r.hgetall(f"{self.product_prefix}{product_id}") if product: price = float(product.get('price', 0)) subtotal = price * int(quantity) total += subtotal result.append({ 'product_id': product_id, 'name': product.get('name'), 'price': price, 'quantity': int(quantity), 'subtotal': subtotal }) return { 'items': result, 'total': total, 'item_count': len(result) } def clear(self): """清空购物车""" r.delete(self.cart_key) return {"success": True} # 使用示例 cart = ShoppingCart("user:1001") cart.add_item("101", 2) # 添加商品 101,数量 2 cart.add_item("102", 1) # 添加商品 102,数量 1 cart.update_quantity("101", 3) # 修改商品 101 数量为 3 cart_details = cart.get_cart() # 获取购物车详情

⚠️ 常见问题与解决方案

问题 1:购物车数据丢失

Redis 故障导致用户购物车数据丢失,影响用户体验。

解决方案:
  • 开启 AOF 持久化,每秒同步
  • 重要用户购物车同步到数据库
  • 未登录用户购物车可接受丢失
问题 2:库存超卖

多个用户同时购买,库存扣减不一致。

解决方案:
  • 加入购物车时不扣库存,下单时扣减
  • 使用 Lua 脚本原子性检查并扣减库存
  • 设置库存预占机制
📊

场景八:计数器/统计

高性能计数器,用于点赞、浏览、下载等统计。

展开

📋 场景描述

文章阅读量、视频播放量、点赞数、下载计数等场景,需要高并发写入和实时读取。

🏗️ 实现方案

# 简单计数器 article:views:12345 = 10000 # 文章 12345 的阅读量 # 多维度统计 article:stats:12345 = { "views": 10000, # 阅读 "likes": 500, # 点赞 "comments": 120, # 评论 "shares": 80 # 分享 } # 日统计(用于趋势分析) article:daily:12345:2024-01-01 = { "views": 1000, "likes": 50 }

🔧 核心命令

命令 说明 示例
INCR key 自增 1 INCR article:views:12345
INCRBY key increment 自增指定值 INCRBY article:views:12345 5
DECR key 自减 1 DECR article:likes:12345
HINCRBY key field increment Hash 字段自增 HINCRBY article:stats:12345 views 1
GET key 获取计数值 GET article:views:12345
MGET key1 key2 批量获取 MGET article:views:1 article:views:2

💻 代码示例

import redis r = redis.Redis(host='localhost', port=6379, decode_responses=True) class Counter: def __init__(self): self.prefix = "stats" def increment(self, target_type, target_id, field="views", amount=1): """增加计数""" key = f"{self.prefix}:{target_type}:{target_id}" return r.hincrby(key, field, amount) def decrement(self, target_type, target_id, field="views", amount=1): """减少计数""" key = f"{self.prefix}:{target_type}:{target_id}" return r.hincrby(key, field, -amount) def get_count(self, target_type, target_id, field="views"): """获取计数""" key = f"{self.prefix}:{target_type}:{target_id}" count = r.hget(key, field) return int(count) if count else 0 def get_all_stats(self, target_type, target_id): """获取所有统计""" key = f"{self.prefix}:{target_type}:{target_id}" return r.hgetall(key) def get_multiple_counts(self, keys): """批量获取多个计数""" if not keys: return {} values = r.mget(keys) return {key: int(val) if val else 0 for key, val in zip(keys, values)} # 使用示例 counter = Counter() # 文章统计 counter.increment("article", "12345", "views") # 阅读量 +1 counter.increment("article", "12345", "likes") # 点赞 +1 counter.increment("article", "12345", "views", 5) # 阅读量 +5 # 获取统计 views = counter.get_count("article", "12345", "views") all_stats = counter.get_all_stats("article", "12345") # 批量获取 keys = [ "stats:article:12345", "stats:article:12346", "stats:article:12347" ] counts = counter.get_multiple_counts(keys) # 视频统计 counter.increment("video", "v001", "plays") # 播放量 +1 counter.increment("video", "v001", "likes") # 点赞 +1 counter.decrement("video", "v001", "likes") # 取消点赞

💡 进阶功能

# 1. 异步持久化到数据库 def sync_to_db(): """定期将 Redis 计数同步到数据库""" cursor = 0 while True: cursor, keys = r.scan(cursor, match="stats:article:*", count=100) for key in keys: data = r.hgetall(key) # 解析 key 获取 article_id parts = key.split(":") article_id = parts[2] # 更新数据库 db.execute(""" INSERT INTO article_stats (article_id, views, likes) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE views = VALUES(views), likes = VALUES(likes) """, (article_id, data.get('views', 0), data.get('likes', 0))) if cursor == 0: break # 2. 防刷限制(同一用户/IP 单位时间内只能计数一次) def increment_with_limit(target_type, target_id, user_id, field="views"): """带限流的计数""" limit_key = f"limit:{target_type}:{target_id}:{user_id}:{field}" # 使用 SET NX EX 实现限流 if r.set(limit_key, "1", nx=True, ex=60): # 60 秒内只能计数一次 return counter.increment(target_type, target_id, field) return None # 超出限制 # 3. 实时排行榜结合 def get_hot_articles(time_range="daily", top_n=10): """获取热门排行(综合阅读、点赞等)""" # 计算热度分数:views*1 + likes*5 + comments*10 articles = [] cursor = 0 while True: cursor, keys = r.scan(cursor, match=f"stats:article:*", count=100) for key in keys: stats = r.hgetall(key) article_id = key.split(":")[2] score = (int(stats.get('views', 0)) * 1 + int(stats.get('likes', 0)) * 5 + int(stats.get('comments', 0)) * 10) articles.append((article_id, score)) if cursor == 0: break # 按热度排序 articles.sort(key=lambda x: x[1], reverse=True) return articles[:top_n]

⚠️ 常见问题与解决方案

问题 1:计数不准确

Redis 宕机导致计数丢失,与数据库不一致。

解决方案:
  • 开启 AOF 持久化(everysec 策略)
  • 定期异步同步到数据库
  • 接受小范围误差(统计类应用通常可接受)
  • 重要计数使用数据库 + Redis 双写
问题 2:恶意刷量

黑产批量请求刷高计数。

解决方案:
  • 基于用户 ID 或 IP 限流
  • 使用布隆过滤器去重
  • 后端分析异常模式
  • 验证码验证
📍

场景九:地理位置服务

基于位置的搜索、附近的人、配送范围等。

展开

📋 场景描述

外卖配送、打车服务、社交应用等需要基于地理位置进行查询和匹配的场景。

🏗️ 实现方案

# 使用 GeoHash 存储位置 # key: geo:drivers (司机位置) # member: driver_id # longitude, latitude: 经度、纬度 # 示例 GEOADD geo:drivers 116.4074 39.9041 driver:001 # 北京 GEOADD geo:drivers 121.4737 31.2304 driver:002 # 上海

🔧 核心命令

命令 说明 示例
GEOADD key lng lat member 添加位置 GEOADD geo:users 116.4 39.9 user:1
GEODIST key member1 member2 计算距离 GEODIST geo:users user:1 user:2 km
GEORADIUS key lng lat radius 范围内搜索 GEORADIUS geo:users 116.4 39.9 10 km
GEORADIUSBYMEMBER key member radius 查询某人附近的 GEORADIUSBYMEMBER geo:users user:1 5 km
GEOPOS key member 获取位置坐标 GEOPOS geo:users user:1
GEOHASH key member 获取 GeoHash GEOHASH geo:users user:1

💻 代码示例(附近的人)

import redis r = redis.Redis(host='localhost', port=6379, decode_responses=True) class LocationService: def __init__(self, key_prefix="geo"): self.key_prefix = key_prefix def add_location(self, category, member_id, longitude, latitude): """添加位置""" key = f"{self.key_prefix}:{category}" return r.geoadd(key, longitude, latitude, member_id) def add_locations_batch(self, category, locations): """批量添加位置 locations: [(member_id, lng, lat), ...] """ key = f"{self.key_prefix}:{category}" members = [(lng, lat, mid) for mid, lng, lat in locations] return r.geoadd(key, *members) def find_nearby(self, category, longitude, latitude, radius=5, unit='km', with_dist=False): """查找附近的人/对象""" key = f"{self.key_prefix}:{category}" return r.georadius( key, longitude, latitude, radius, unit=unit, with_dist=with_dist, with_coord=True, count=20 ) def find_nearby_member(self, category, member_id, radius=5, unit='km'): """查找某人附近的其他人""" key = f"{self.key_prefix}:{category}" return r.georadiusbymember( key, member_id, radius, unit=unit, with_dist=True, with_coord=True ) def get_distance(self, category, member1, member2, unit='km'): """计算两个成员之间的距离""" key = f"{self.key_prefix}:{category}" return r.geodist(key, member1, member2, unit=unit) def get_location(self, category, member_id): """获取成员位置""" key = f"{self.key_prefix}:{category}" return r.geopos(key, member_id) def remove_location(self, category, member_id): """删除位置""" key = f"{self.key_prefix}:{category}" return r.zrem(key, member_id) # 使用示例 location = LocationService() # 添加用户位置 location.add_location("users", "user:001", 116.4074, 39.9041) # 北京 location.add_location("users", "user:002", 116.3974, 39.9141) # 北京附近 location.add_location("users", "user:003", 121.4737, 31.2304) # 上海 # 批量添加 users = [ ("user:004", 116.4174, 39.8941), ("user:005", 116.3874, 39.9241), ] location.add_locations_batch("users", users) # 查找 user:001 附近 5km 内的人 nearby = location.find_nearby_member("users", "user:001", radius=5) print(f"附近的人:{nearby}") # 计算距离 distance = location.get_distance("users", "user:001", "user:002") print(f"距离:{distance} km") # 查找北京中心 10km 内的所有司机 drivers_nearby = location.find_nearby("drivers", 116.4074, 39.9041, radius=10)

💡 应用场景

# 1. 外卖配送范围检查 def check_delivery_available(user_lng, user_lat, restaurant_id): """检查是否在配送范围内""" # 获取餐厅位置 restaurant_pos = location.get_location("restaurants", restaurant_id) if not restaurant_pos: return False # 计算距离 distance = r.geodist( "geo:restaurants", restaurant_id, f"user:{user_lng}:{user_lat}" ) # 假设配送范围 5km return distance and distance <= 5000 # 2. 打车 - 查找附近司机 def find_nearby_drivers(passenger_lng, passenger_lat, radius=3): """查找附近 3km 内的可用司机""" drivers = location.find_nearby( "drivers:available", passenger_lng, passenger_lat, radius=radius, unit='km' ) return drivers # 3. 社交 - 更新用户位置并查找附近的人 def update_and_find_nearby(user_id, lng, lat, radius=10): """更新位置并查找附近的人""" # 更新位置 location.add_location("users:online", user_id, lng, lat) # 设置 30 分钟过期(用户下线自动移除) r.expire(f"geo:users:online", 1800) # 查找附近的人(排除自己) nearby = location.find_nearby_member( "users:online", user_id, radius=radius ) return [n for n in nearby if n[0] != user_id]

⚠️ 常见问题与解决方案

问题 1:精度问题

GeoHash 精度有限,极近距离可能不够准确。

解决方案:
  • 使用更精确的距离计算公式(Haversine)
  • 结合数据库的 GIS 功能
  • 接受 Redis Geo 的精度(通常足够)
问题 2:位置频繁更新

司机/用户位置频繁变化,大量写入操作。

解决方案:
  • 降低更新频率(如 30 秒一次)
  • 只在移动超过阈值时更新
  • 使用批处理减少网络往返