Django REST framwork 认证、权限、频率
作者:互联网
访问admin站点,先修改站点的语言配置
settings.py
LANGUAGE_CODE = 'zh-hans' # 中文 TIME_ZONE = 'Asia/Shanghai' # 时区是亚洲上海 USE_I18N = True # 国际化 USE_L10N = True # 本地化 USE_TZ = True # 数据库是否使用TIME_ZONE,True表示使用上海的时区,False表示不使用,使用UTC时间,然后转成上海,会差8个小时
认证介绍和源码分析
1 只有认证通过的用户才能访问指定的url地址,
比如:查询课程信息,需要登录之后才能查看,没有登录,就不能查看,这时候需要用到认证组件 2 APIVIew--->dispatche--->self.initial--->写的 self.perform_authentication(request)# 认证 self.check_permissions(request) # 权限 self.check_throttles(request) # 频率 3 APIView的perform_authentication -request.user # 新的request对象,drf的Request类 4 Request类的user -被包装成了数据属性,内部有 self._authenticate() -Request类的_authenticate()方法 5 Request类的_authenticate()方法 def _authenticate(self): for authenticator in self.authenticators: try: user_auth_tuple = authenticator.authenticate(self) except exceptions.APIException: self._not_authenticated() raise if user_auth_tuple is not None: self._authenticator = authenticator self.user, self.auth = user_auth_tuple return self._not_authenticated() 6 drf的Request对象实例化是再什么时候? -再APIVIew的dispatch最上面完成的 - return Request( request, parsers=self.get_parsers(), authenticators=self.get_authenticators(), # 看它 negotiator=self.get_content_negotiator(), parser_context=parser_context ) 7 APIView的get_authenticators def get_authenticators(self): return [auth() for auth in self.authentication_classes] -如果我再视图类中写:authentication_classes=[类名,类名1] -返回[对象,对象1]
认证类前奏登录功能,认证类编写
登录功能
## 模型层models.py 先写两个表 class User(models.Model): name = models.CharField(max_length=32) password = models.CharField(max_length=32) class UserToken(models.Model): user = models.OneToOneField(to='User',on_delete=models.CASCADE) token=models.CharField(max_length=32)
登录视图类
# views.py class UserView(ViewSetMixin, CreateAPIView): queryset = models.User.objects.all() serializer_class = serializer.UserModelSerializer # 全局使用下的局部禁用需要配置 authentication_classes = [] # 认证局部禁用 permission_classes = [] # 权限局部禁用 throttle_classes = [] # 频率局部禁用 #基于原生session版 # @action(methods=['POST'], detail=False) # def login(self, request): # username = request.data.get('username') # password = request.data.get('password') # user = models.User.objects.filter(name=username, password=password).first() # request.session['name'] = user.name # request.session['id'] = user.id # print(type(request.session)) # request.session.save() # print(request.session.session_key) # from django.contrib.sessions.backends.db import SessionStore # if user: # return APIResponse(msg='登录成功', token=request.session.session_key) # else: # return APIResponse(status=101, msg='用户名或密码错误') # 基于自己写的UserToken表版 @action(methods=['POST'], detail=False) def login(self, request): username = request.data.get('username') password = request.data.get('password') user = models.User.objects.filter(name=username, password=password).first() token = uuid.uuid4() # 生成一个uuid的随机字符串 # 这个是错误的:user.usertoken是None # user.usertoken.user=user # user.usertoken.token=token # 如果每次都是新增,如果它登录过,这个地方会报错 # models.UserToken.objects.create(user=user,token=token) # 如果有就更新,如果没有就创建 # 根据user去查询,如果能查到,就修改token,如果查不到,就新增一条 models.UserToken.objects.update_or_create(defaults={'token': token}, user=user) if user: return APIResponse(msg='登录成功', token=token) else: return APIResponse(status=101, msg='用户名或密码错误')
路由
# urls.py from django.contrib import admin from django.urls import path,include from rest_framework.routers import SimpleRouter from app01 import views router = SimpleRouter() router.register('user', views.UserView) urlpatterns = [ path('admin/', admin.site.urls), path('api/', include(router.urls)), ]
认证类编写和使用
认证类的编写
####基于自己写的UserToken表(掌握这个就行) class LoginAuth(BaseAuthentication): def authenticate(self, request): token = request.GET.get('token') user_token = models.UserToken.objects.filter(token=token).first() if user_token: # 登录了 # 返回两个值,第一个值,给了新的request对象的user属性,通常情况我们把当前登录用户给它 return user_token.user, '' else: raise AuthenticationFailed('您没有登录') ####基于session的(了解即可) from django.contrib.sessions.models import Session from importlib import import_module from django.conf import settings class LoginAuth(BaseAuthentication): def authenticate(self, request): token = request.GET.get('token') # 通过传入的token(session_key,取到当前key的session对象) engine = import_module(settings.SESSION_ENGINE) self.SessionStore = engine.SessionStore request.session = self.SessionStore(token) Session.objects.filter(session_key=token).first() # print(request.session['name']) if request.session.get('name', None): print(request.session['name']) print(request.session['id']) return '', '' else: raise AuthenticationFailed('您没有登录')
使用认证类(全局用,局部用)
#全局用,setting中配置(所有接口都需要认证) REST_FRAMEWORK={ "DEFAULT_AUTHENTICATION_CLASSES":["app01.auth.LoginAuth",] } # 登录功能需要局部禁用,在视图类中加入 authentication_classes = [] #只在局部使用,只在视图类中加 authentication_classes = [LoginAuth,]
权限类编写和使用
编写权限类
from rest_framework.permissions import BasePermission # 导入BasePermission权限基类 class MyPermission(BasePermission): # 自定义权限类继承BasePermission message='你没有权限' # 自定义返回给前端的访问错误信息 def has_permission(self, request, view): if request.user.user_type == 1: return True else: self.message='你是%s用户,你没有权限'%request.user.get_user_type_display() return False
权限类的使用
# 局部使用(在视图类中加) permission_classes = [MyPermission,] # 全局使用(在配置文件中配置) REST_FRAMEWORK={ "DEFAULT_PERMISSION_CLASSES":["app01.auth.MyPermission",], }
频率类的使用
定义一个频率类
from rest_framework.throttling import BaseThrottle, SimpleRateThrottle class MyThrottle(SimpleRateThrottle): scope = 'ip_th' def get_cache_key(self, request, view): return self.get_ident(request)
在配置文件中配置
REST_FRAMEWORK = { 'DEFAULT_THROTTLE_RATES': { 'ip_th': '5/m', #一分钟访问5次 }, }
局部使用,全局使用
# 局部用,在视图类中配置 throttle_classes = [MyThrottle,] # 全局用,在配置文件中配置 REST_FRAMEWORK = { "DEFAULT_THROTTLE_CLASSES": ["app01.auth.MyThrottle", ], 'DEFAULT_THROTTLE_RATES': { 'ip_th': '5/m', #一分钟访问5次 }, }
认证、权限、频率演示总代码
1 注册接口,使用postman注册用户,用户名不能以sb开头,密码必须大于3位,小于16位
2 登陆接口,登陆成功返回token
3 图书增加,图书修改,图书删除,图书查询所有,查询一本5个接口
-所有操作必须登录后才能
-只有超级用户有图书增加,图书修改,图书删除权限
4 所有接口限制每分钟只能访问5次
from django.db import models class User(models.Model): username = models.CharField(max_length=32) password = models.CharField(max_length=32) user_type = models.IntegerField(choices=((1, '超级管理员'), (2, '普通用户'), (3, '2B用户')),default=3) class UserToken(models.Model): user = models.OneToOneField(to='User', on_delete=models.CASCADE) token = models.CharField(max_length=16) class Book(models.Model): name = models.CharField(max_length=32) price = models.DecimalField(max_digits=5,decimal_places=2) publish = models.CharField(max_length=32) # 不要忘了数据库迁移两条命令模型层models.py
from rest_framework.response import Response class APIResponse(Response) : def __init__(self, code=100, msg=None, data=None, status=None, template_name=None, headers=None, exception=False, content_type=None, **kwargs) : dic = {'status' : code, 'msg' : msg} if data : dic['data'] = data if kwargs : dic.update(kwargs) super().__init__(data=dic, status=status, template_name=template_name, headers=headers, exception=exception, content_type=content_type)自定义 response.py
from app01 import models from rest_framework.serializers import ModelSerializer from django.core.exceptions import ValidationError class BookModelSerializer(ModelSerializer): class Meta: model = models.Book fields = "__all__" class UserModelSerializer(ModelSerializer): class Meta: model = models.User fields = "__all__" extra_kwargs = { # 注册密码必须大于3位,小于16位 'password': {'write_only': True, 'max_length': 16, 'min_length': 3} } # 局部钩子: 用户名不能以sb开头 def validate_username(self, data): # data就是当前字段的值 if data.startswith('sb'): raise ValidationError('不能以sb开头') else : return data序列化类 serializer.py
from app01 import models from rest_framework.exceptions import AuthenticationFailed from rest_framework.authentication import BaseAuthentication ####写一个认证类:基于自己写的UserToken表 class LoginAuth(BaseAuthentication): # 重写 authenticate 方法 def authenticate(self, request): # 实现认证逻辑 token = request.GET.get('token') user_token = models.UserToken.objects.filter(token=token).first() if user_token: # 认证通过后,返回一个元组(user对象,token值) return user_token.user,'' else: # 失败后报错,如过要执行后面的认证,则返回None raise AuthenticationFailed('您还没有登录') ####写一个权限类 from rest_framework.permissions import BasePermission class CustomPermission(BasePermission): message = '您没有权限' def has_permission(self, request, view): if hasattr(request,"user") and hasattr(request.user,"user_type") and request.user.user_type == 1: return True else: # 如果该字段用了choice,通过get_字段名_display()就能取出choice后面的中文 self.message = '您是%s用户,您没有权限' % request.user.get_user_type_display() return False ### 写一个频率类 from rest_framework.throttling import SimpleRateThrottle class CustomThrottle(SimpleRateThrottle): # 实例化时就会读取.scope的范围, 根据配置文件中配置的scope去选择频率如:4/s scope = 'admin' # 在此处定义配置文件的控制评率的键 def get_cache_key(self, request, view): ### 返回谁就以谁做限制 #返回当前请求者的ip地址 return self.get_ident(request) # return request.Meta.get('REMOTE_ADDR') #如果想按用户限制 # return request.user.id自定义认证,权限,频率 auth.py
from django.shortcuts import render from app01 import models,serializer from rest_framework.generics import GenericAPIView,ListAPIView,CreateAPIView,RetrieveAPIView,RetrieveUpdateDestroyAPIView from rest_framework.viewsets import ViewSetMixin from rest_framework.viewsets import ModelViewSet from rest_framework.decorators import action from app01.response import APIResponse import uuid class BookView(ViewSetMixin, ListAPIView, RetrieveAPIView): queryset = models.Book.objects.all() serializer_class = serializer.BookModelSerializer permission_classes = [] class BookAdminView(ModelViewSet): queryset = models.Book.objects.all() serializer_class = serializer.BookModelSerializer class UserView(ViewSetMixin, CreateAPIView): queryset = models.User.objects.all() serializer_class = serializer.UserModelSerializer # 全局使用下的局部禁用需要配置 authentication_classes = [] # 认证局部禁用 permission_classes = [] # 权限局部禁用 throttle_classes = [] # 频率局部禁用 # 基于自己写的UserToken表版 @action(methods=['POST'], detail=False) def login(self, request): username = request.data.get('username') password = request.data.get('password') user = models.User.objects.filter(username=username, password=password).first() token = uuid.uuid4() # 生成一个uuid的随机字符串 # 如果有就更新,如果没有就创建 # 根据user去查询,如果能查到,就修改token,如果查不到,就新增一条 models.UserToken.objects.update_or_create(defaults={'token': token}, user=user) if user: return APIResponse(msg='登录成功', token=token) else: return APIResponse(status=101, msg='用户名或密码错误')视图层 views.py
from django.contrib import admin from django.urls import path,include from rest_framework.routers import SimpleRouter from app01 import views router = SimpleRouter() router.register('book', views.BookView) router.register('books', views.BookAdminView) router.register('user', views.UserView) urlpatterns = [ path('admin/', admin.site.urls), path('', include(router.urls)) ]路由层 urls.py
REST_FRAMEWORK = { "DEFAULT_AUTHENTICATION_CLASSES" : ["app01.auth.LoginAuth", ], # 认证 "DEFAULT_PERMISSION_CLASSES" : ["app01.auth.CustomPermission", ], # 权限 "DEFAULT_THROTTLE_CLASSES" : ["app01.auth.CustomThrottle", ], # 频率 'DEFAULT_THROTTLE_RATES' : { 'admin' : '5/m', # 一分钟访问5次 }, }配置文件 settings.py
标签:models,self,request,framwork,REST,Django,token,user,import 来源: https://www.cnblogs.com/gfeng/p/14675103.html