from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', '项目名称.settings')
app = Celery('项目名称')
# Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs. app.autodiscover_tasks()
在同一目录下的 __init__.py 添加
1 2 3 4 5 6 7
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app
__all__ = ('celery_app',)
在 core 应用目录下新建 tasks.py 文件测试
1 2 3 4 5 6 7
from __future__ import absolute_import from celery import shared_task
@shared_task def add(a, b): return a + b
开启 Celery
1
celery -A 项目名 worker -l info
测试任务:在 ./manage.py shell 中测试
1 2 3 4 5
from core.tasks import add add(1, 2) r = add.delay(1, 2) r r.get()
require_approval = models.CharField(max_length=200, verbose_name='授权模式', choices=(('auto', 'auto'), ('force', 'force')), help_text='填写 auto 则相同 scope 仅第一次需要确认,填写 force 则每次都需要确认', default=get_require_approval_default)
defget_allowed_schemes(self): """ Returns the list of redirect schemes allowed by the Application. By default, returns `ALLOWED_REDIRECT_URI_SCHEMES`. """
defget_all_scopes(self): """ Return a dict-like object with all the scopes available in the system. The key should be the scope name and the value should be the description. ex: {"read": "A read scope", "write": "A write scope"} """
c = cache.get('oauth:all_scopes')
if c: return c
scopes_dict = {} for scope_group_key, scope_group_value in oauth2_settings.SCOPES.items(): for scope_key, scope_value in scope_group_value.items(): if'get'in scope_value: if scope_group_key == scope_key: scopes_dict.update({'{}'.format(scope_group_key): scope_value['get']}) else: scopes_dict.update({'{}.{}'.format(scope_group_key, scope_key): scope_value['get']}) if'read'in scope_value: if scope_group_key == scope_key: scopes_dict.update({'{}:read'.format(scope_group_key): scope_value['read']}) else: scopes_dict.update({'{}.{}:read'.format(scope_group_key, scope_key): scope_value['read']}) if'write'in scope_value: if scope_group_key == scope_key: scopes_dict.update({'{}:write'.format(scope_group_key): scope_value['write']}) else: scopes_dict.update({'{}.{}:write'.format(scope_group_key, scope_key): scope_value['write']})
defget_available_scopes(self, application=None, request=None, *args, **kwargs): """ Return a list of scopes available for the current application/request. ex: ["read", "write"] """
if application isNone: return ['basic']
application_id = application.id application_cache_label = 'oauth:available_scopes:{}'.format(application_id) c = cache.get(application_cache_label) if c: return c
all_scopes = list(self.get_all_scopes().keys()) if application.allow_all_scopes: application_scopes = all_scopes else: application_scopes_pre = application.allow_scopes.split() application_scopes = [x for x in application_scopes_pre if x in all_scopes]
defget_default_scopes(self, application=None, request=None, *args, **kwargs): """ Return a list of the default scopes for the current application/request. This MUST be a subset of the scopes returned by `get_available_scopes`. ex: ["read"] """
from oauth2_provider.views import AuthorizationView as BaseAuthorizationView from oauth2_provider.scopes import get_scopes_backend from oauth2_provider.models import get_access_token_model, get_application_model from oauth2_provider.settings import oauth2_settings from oauth2_provider.exceptions import OAuthToolkitError
defget(self, request, *args, **kwargs): try: scopes, credentials = self.validate_authorization_request(request) except OAuthToolkitError as error: # Application is not available at this time. return self.error_response(error, application=None)
all_scopes = get_scopes_backend().get_all_scopes() kwargs["scopes_descriptions"] = [all_scopes[scope] for scope in scopes] kwargs["scopes"] = scopes # at this point we know an Application instance with such client_id exists in the database
self.oauth2_data = kwargs # following two loc are here only because of https://code.djangoproject.com/ticket/17795 form = self.get_form(self.get_form_class()) kwargs["form"] = form
# Check to see if the user has already granted access and return # a successful response depending on "approval_prompt" url parameter
require_approval = application.require_approval
try: # If skip_authorization field is True, skip the authorization screen even # if this is the first use of the application and there was no previous authorization. # This is useful for in-house applications-> assume an in-house applications # are already approved. if application.skip_authorization: uri, headers, body, status = self.create_authorization_response( request=self.request, scopes=" ".join(scopes), credentials=credentials, allow=True ) return self.redirect(uri, application)
# check past authorizations regarded the same scopes as the current one for token in tokens: if token.allow_scopes(scopes): uri, headers, body, status = self.create_authorization_response( request=self.request, scopes=" ".join(scopes), credentials=credentials, allow=True ) return self.redirect(uri, application)
except OAuthToolkitError as error: return self.error_response(error, application)