我有一个每天运行的celery任务,当模型上的记录有今天的日期时,电子邮件功能将在任务上被触发,并将根据记录的数据发送电子邮件。
任务.py
@app.task(bind=True)
def send_email_on_date(self):
date = datetime.date.today()
records = Mymodel.objects.filter(date=date,
send_email_reminder_to_assignees=True).prefetch_related(
'assignees'
for record in records:
data = {
'homepage_url': settings.WWW_ROOT,
'domain_name': settings.DOMAIN_NAME,
'record_name': record.name,
'record_created': record.created,
'record_date': record.date,
assignees_emails = record.assignees.values_list('email', flat=True)
if not record.is_done:
send_email_from_template(
subject_template_path='',
body_template_path='',
template_data=data,
to_email_list=assignees_emails,
fail_silently=False,
content_subtype='html'
models.py
class Mymodel(models.Model):
name = models.CharField(max_length=500)
is_done = models.BooleanField(default=False)
date = models.DateField(blank=True, null=True)
assignees = models.ManyToManyField(
User,
related_name='records',
blank=True,
send_email_reminder_to_assignees = models.BooleanField(default=False)
tests.py
正如我所说,我的任务使用了一个电子邮件发送函数,在这里,我试图从我使用该函数的celery中模拟该函数。
@pytest.mark.django_db
class TestMyCeleryTask:
@pytest.fixture
def send_email_from_template_mock(self, mocker):
return mocker.patch('tasks.send_email_from_template')
def test_send_email_date(self,