#下面这三个写自己的
accId = '8aaf0708809721d00180f4d8f1d32010'
accToken = 'ba9c1afcd60945bd83370063567df3c5'
appId = '8aaf0708809721d00180f4d8f2db2017'
def send_message(sms_code,mobile,expire=5):
sdk = SmsSDK(accId=accId,accToken=accToken,appId=appId)
# 准备数据
tid = '1'
datas = ('%s'%sms_code,'%s'%expire)
# 发送短信
res = sdk.sendMessage(tid=tid,mobile=mobile,datas=datas)
# 解析json数据
data = json.loads(res)
# 判断发送短信是否成功
if data.get('statusCode') == '000000':
return True
return False
然后在编写发送短信验证码视图
在views里面编写
from workers.utils import send_message
from workers.tasks import send_message
class Sms_Code(APIView):
def get(self,request):
mobile = request.query_params.get('mobile')
print(mobile)
if mobile and len(mobile) == 11:
redis_con = redis.Redis(host='127.0.0.1',port=6379)
flag_key = '%s_flag'%mobile
sms_key = '%s_sms'%mobile
if redis_con.get(flag_key):
return Response({
'code':400,
'msg':'操作过于频繁'
else:
sms_code = random.randint(1000,9999)
redis_con.set(sms_key,sms_code,ex=300)
# 是否频繁发送
redis_con.set(flag_key,1,ex=60)
# 发送短信
result = send_message(sms_code,mobile) #同步执行,django会阻塞在这里
# 提交一个异步任务
if result:
return Response({'code':200,'msg':"发送短信成功"})
return Response({'code':501,'msg':"发送短信失败"})
else:
return Response({'code':"400",'msg':'手机号不合法'})
2.异步发送短信验证码
首先下载celery
pip install celery==4.4.7
pip install eventlet==0.26.1
celery的介绍和使用 见博客:
异步处理Celery5大核心和工作流程_w_ang__ang的博客-CSDN博客
在项目根目录同名下创建.py文件
编写以下代码:
from celery import Celery
import os
from django.conf import settings
# Celery 需要依赖django的配置文件
# 配置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE','p6_0518.settings')
# 创建应用
app = Celery('djangoWorker')
# 简单配置
app.conf.broker_url = 'redis://@127.0.0.1:6379/5'
app.conf.result_backend = 'redis://@127.0.0.1:6379/6'
# 自动发现任务
app.autodiscover_tasks(settings.INSTALLED_APPS)
然后在子应用创建.py文件
编写以下代码:
from p6_0518.celery import app
from ronglian_sms_sdk import SmsSDK
from django.conf import settings
import json
# 定义任务函数
@app.task
def send_message(sms_code, mobile, expire=5):
sdk = SmsSDK(accId=settings.ACCID, accToken=settings.ACCTOKEN, appId=settings.APPID)
# 准备数据
tid = '1'
datas = ('%s' % sms_code, '%s' % expire)
# 发送短信
res = sdk.sendMessage(tid=tid, mobile=mobile, datas=datas)
# 解析json数据
data = json.loads(res)
# 判断发送短信是否成功
# if data.get('statusCode') == '000000':
# return True
# return False
return data
编写views里面的视图方法:
from workers.utils import send_message
from workers.tasks import send_message
class Sms_Code(APIView):
def get(self,request):
mobile = request.query_params.get('mobile')
print(mobile)
if mobile and len(mobile) == 11:
redis_con = redis.Redis(host='127.0.0.1',port=6379)
flag_key = '%s_flag'%mobile
sms_key = '%s_sms'%mobile
if redis_con.get(flag_key):
return Response({
'code':400,
'msg':'操作过于频繁'
else:
sms_code = random.randint(1000,9999)
redis_con.set(sms_key,sms_code,ex=300)
# 是否频繁发送
redis_con.set(flag_key,1,ex=60)
# 发送短信
# result = send_message(sms_code,mobile) #同步执行,django会阻塞在这里
# 提交一个异步任务
res = send_message.delay(sms_code,mobile) #异步执行,django不会等待
# if result:
# return Response({'code':200,'msg':"发送短信成功"})
# return Response({'code':501,'msg':"发送短信失败"})
return Response({'code':200,'msg':"短信已发送请注意查收"})
else:
return Response({'code':"400",'msg':'手机号不合法'})
配置路由:
这样整个功能就做好了
注意:django需要启动celery
celery -A p6_0518 worker -l info -P eventlet
下载:pip install ronglian_sms_sdk容联云地址:容联云通讯_短信平台、手机验证码、语音验证码、IM即时通讯、云呼叫系统等互联网通信服务注册账号:这三个记录下来添加测试手机号全部完成1.非异步创建一个.py文件注:这里是在子应用里面创建然后封装发送短信验证码方法注这里的 accId accToken appId 就是上面让记得东西from ronglian_sms_sdk import SmsSDKimport...
三,celery异步发送短信—过程。
1.python3安装celery(环境:windows10;python版本:python3.9)
pip install -U Celery
2.在项目根目录创建一个单独的包,用来存放celery相关的配置文件和主要
文章目录1.注册容联云账号1.1 注册账号1.2 登录即可看到开发者账号信息1.3 添加测试账号2.使用容联云发送代码测试3.在视图函数中使用3.1 写试图函数3.2 在verifications/urls.py中添加路由
1.注册容联云账号
1.1 注册账号
https://www.yuntongxun.com/user/login
1.2 登录即可看到开发者账号信息
1.3 添加测试账号
2.使用容联云发送代码测试
'''1. 安装容联云sdk'''
pip install ronglian
前后端分离的状态下,短信接入时,因为后端需要通过用户提交的验证码需要做校验,所以短信应该发给后端。
这里我是用的是容联云通讯,它在开通账户后可以提供部分费用的免费测试。
1、URL
我们在使用的时候需要向完整的url发送一个https的请求。其中accountSid就是你注册时的个人账户id。funcodes我们就使用本身的TemplateSMS?,SigParameter就需要我们根据人家的要求来写出代码完成。
import datetime
import hashlib
class Sms():
url = ‘http://106.ihuyi.com/webservice/sms.php?method=Submit’
mobile = ‘xxxxxx’ #电话号
account= ‘xxxxx’ #上面的APIID
password = ‘xxxxx’ #为上图的APIKEY...
所以新手使用celery很仔细的建立文件夹名字、文件夹层级、python文件名字。
所以网上的celery博客教程虽然很多,但是并不能学会使用,因为要运行起来需要以下6个方面都掌握好,博客文字很难表达清楚或者没有写全面以下6个方面。
celery消费任务不执行或者报错NotRegistered,与很多方面有关系,如果要别人排错,至少要发以下6方面的截图,因为与一下6点关系很大。
1)整个项目目录结构, 2)@task入参 ,3)celery的配置,4)celery的配置 include ,5)cmd命令行启动参数 --queues= 的值,6)用户在启动cmd命令行时候,用户所在的文件夹。
在不规范的文件夹路径下,使用celery难度很高,一般教程都没教。
[项目文件夹目录格式不规范下的celery使用演示](https://github.com/ydf0509/celery_demo) 。
此国产分布式函数调度框架 funboost python万能通用函数加速器 https://funboost.readthedocs.io/ ,
从用法调用难度,用户所需代码量,超高并发性能,qps控频精确程度,支持的中间件类型,任务控制方式,稳定程度等19个方面全方位超过celery。发布性能提高1000%,消费性能提高2000%。
pip install funboost