添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
#下面这三个写自己的 accId = '8aaf0708809721d00180f4d8f1d32010' accToken = 'ba9c1afcd60945bd83370063567df3c5' appId = '8aaf0708809721d00180f4d8f2db2017' def send_message(sms_code,mobile,expire=5): sdk = SmsSDK(accId=accId,accToken=accToken,appId=appId) # 准备数据 tid = '1' datas = ('%s'%sms_code,'%s'%expire) # 发送短信 res = sdk.sendMessage(tid=tid,mobile=mobile,datas=datas) # 解析json数据 data = json.loads(res) # 判断发送短信是否成功 if data.get('statusCode') == '000000': return True return False

然后在编写发送短信验证码视图

在views里面编写

from workers.utils import send_message
from workers.tasks import send_message
class Sms_Code(APIView):
    def get(self,request):
        mobile = request.query_params.get('mobile')
        print(mobile)
        if mobile and len(mobile) == 11:
            redis_con = redis.Redis(host='127.0.0.1',port=6379)
            flag_key = '%s_flag'%mobile
            sms_key = '%s_sms'%mobile
            if redis_con.get(flag_key):
                return Response({
                    'code':400,
                    'msg':'操作过于频繁'
            else:
                sms_code  = random.randint(1000,9999)
                redis_con.set(sms_key,sms_code,ex=300)
                # 是否频繁发送
                redis_con.set(flag_key,1,ex=60)
                # 发送短信
                 result = send_message(sms_code,mobile)  #同步执行,django会阻塞在这里
                # 提交一个异步任务
                 if result:
                     return Response({'code':200,'msg':"发送短信成功"})
                 return Response({'code':501,'msg':"发送短信失败"})
        else:
            return Response({'code':"400",'msg':'手机号不合法'})

2.异步发送短信验证码

首先下载celery

pip install celery==4.4.7

pip install eventlet==0.26.1

celery的介绍和使用 见博客:

异步处理Celery5大核心和工作流程_w_ang__ang的博客-CSDN博客

在项目根目录同名下创建.py文件

编写以下代码:

from celery import Celery
import os
from django.conf import settings
# Celery 需要依赖django的配置文件
# 配置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE','p6_0518.settings')
# 创建应用
app = Celery('djangoWorker')
# 简单配置
app.conf.broker_url = 'redis://@127.0.0.1:6379/5'
app.conf.result_backend = 'redis://@127.0.0.1:6379/6'
# 自动发现任务
app.autodiscover_tasks(settings.INSTALLED_APPS)

然后在子应用创建.py文件

 编写以下代码:

from p6_0518.celery import app
from ronglian_sms_sdk import SmsSDK
from django.conf import settings
import json
# 定义任务函数
@app.task
def send_message(sms_code, mobile, expire=5):
    sdk = SmsSDK(accId=settings.ACCID, accToken=settings.ACCTOKEN, appId=settings.APPID)
    # 准备数据
    tid = '1'
    datas = ('%s' % sms_code, '%s' % expire)
    # 发送短信
    res = sdk.sendMessage(tid=tid, mobile=mobile, datas=datas)
    # 解析json数据
    data = json.loads(res)
    # 判断发送短信是否成功
    # if data.get('statusCode') == '000000':
    #     return True
    # return False
    return data

编写views里面的视图方法:

from workers.utils import send_message
from workers.tasks import send_message
class Sms_Code(APIView):
    def get(self,request):
        mobile = request.query_params.get('mobile')
        print(mobile)
        if mobile and len(mobile) == 11:
            redis_con = redis.Redis(host='127.0.0.1',port=6379)
            flag_key = '%s_flag'%mobile
            sms_key = '%s_sms'%mobile
            if redis_con.get(flag_key):
                return Response({
                    'code':400,
                    'msg':'操作过于频繁'
            else:
                sms_code  = random.randint(1000,9999)
                redis_con.set(sms_key,sms_code,ex=300)
                # 是否频繁发送
                redis_con.set(flag_key,1,ex=60)
                # 发送短信
                # result = send_message(sms_code,mobile)  #同步执行,django会阻塞在这里
                # 提交一个异步任务
                res  = send_message.delay(sms_code,mobile) #异步执行,django不会等待
                # if result:
                #     return Response({'code':200,'msg':"发送短信成功"})
                # return Response({'code':501,'msg':"发送短信失败"})
                return Response({'code':200,'msg':"短信已发送请注意查收"})
        else:
            return Response({'code':"400",'msg':'手机号不合法'})

配置路由:

这样整个功能就做好了

注意:django需要启动celery

celery -A p6_0518 worker -l info -P eventlet

下载:pip install ronglian_sms_sdk容联云地址:容联云通讯_短信平台、手机验证码、语音验证码、IM即时通讯、云呼叫系统等互联网通信服务注册账号:这三个记录下来添加测试手机号全部完成1.非异步创建一个.py文件注:这里是在子应用里面创建然后封装发送短信验证码方法注这里的 accId accToken appId 就是上面让记得东西from ronglian_sms_sdk import SmsSDKimport... 三,celery异步发送短信—过程。 1.python3安装celery(环境:windows10;python版本:python3.9) pip install -U Celery 2.在项目根目录创建一个单独的包,用来存放celery相关的配置文件和主要
文章目录1.注册容联云账号1.1 注册账号1.2 登录即可看到开发者账号信息1.3 添加测试账号2.使用容联云发送代码测试3.在视图函数中使用3.1 写试图函数3.2 在verifications/urls.py中添加路由 1.注册容联云账号 1.1 注册账号 https://www.yuntongxun.com/user/login 1.2 登录即可看到开发者账号信息 1.3 添加测试账号 2.使用容联云发送代码测试 '''1. 安装容联云sdk''' pip install ronglian
后端分离的状态下,短信接入时,因为后端需要通过用户提交的验证码需要做校验,所以短信应该发给后端。 这里我是用的是容联云通讯,它在开通账户后可以提供部分费用的免费测试。 1、URL 我们在使用的时候需要向完整的url发送一个https的请求。其中accountSid就是你注册时的个人账户id。funcodes我们就使用本身的TemplateSMS?,SigParameter就需要我们根据人家的要求来写出代码完成。 import datetime import hashlib class Sms(): url = ‘http://106.ihuyi.com/webservice/sms.php?method=Submit’ mobile = ‘xxxxxx’ #电话号 account= ‘xxxxx’ #上面的APIID password = ‘xxxxx’ #为上图的APIKEY...
所以新手使用celery很仔细的建立文件夹名字、文件夹层级、python文件名字。 所以网上的celery博客教程虽然很多,但是并不能学会使用,因为要运行起来需要以下6个方面都掌握好,博客文字很难表达清楚或者没有写全面以下6个方面。 celery消费任务不执行或者报错NotRegistered,与很多方面有关系,如果要别人排错,至少要发以下6方面的截图,因为与一下6点关系很大。 1)整个项目目录结构, 2)@task入参 ,3)celery的配置,4)celery的配置 include ,5)cmd命令行启动参数 --queues= 的值,6)用户在启动cmd命令行时候,用户所在的文件夹。 在不规范的文件夹路径下,使用celery难度很高,一般教程都没教。 [项目文件夹目录格式不规范下的celery使用演示](https://github.com/ydf0509/celery_demo) 。 此国产分布式函数调度框架 funboost python万能通用函数加速器 https://funboost.readthedocs.io/ , 从用法调用难度,用户所需代码量,超高并发性能,qps控频精确程度,支持的中间件类型,任务控制方式,稳定程度等19个方面全方位超过celery。发布性能提高1000%,消费性能提高2000%。 pip install funboost