-
Notifications
You must be signed in to change notification settings - Fork 2
/
blti.py
169 lines (133 loc) · 5.51 KB
/
blti.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# Use Django's 403 response class when Django is importable. Otherwise fall
# back to a stand-in with the same call shape that raises, so an LTI failure
# is never silently ignored outside Django.
try:
    from django.http import HttpResponseForbidden
except ImportError:
    # Only a missing/unimportable Django triggers the fallback; a bare
    # ``except`` here would also swallow KeyboardInterrupt and SystemExit.
    def HttpResponseForbidden(error_message):
        """Fallback error handler: raise ``Exception`` carrying *error_message*."""
        raise Exception(error_message)
from functools import partial, wraps
import oauth2 as oauth
import time
import os
import string
class OAuthInvalidError(Exception):
    """Raised when a request's OAuth signature cannot be read for verification."""
def sign_oauth_with_params(consumer_key, consumer_secret, url, parameters, method='POST', is_form_encoded=True):
    """
    Build an OAuth request for ``url`` and sign it with HMAC-SHA1.

    consumer_key, consumer_secret -- credentials used to create the OAuth consumer.
    url, parameters, method, is_form_encoded -- passed straight to ``oauth.Request``.

    Returns the signed ``oauth.Request``; its ``oauth_signature`` entry is
    decoded from UTF-8 when the signing step produced bytes.
    """
    signer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
    signed_request = oauth.Request(
        method=method,
        url=url,
        parameters=parameters,
        is_form_encoded=is_form_encoded,
    )
    # No token is involved, hence the trailing None.
    signed_request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), signer, None)

    # Normalise the signature to text so callers get a consistent type.
    signature = signed_request['oauth_signature']
    if isinstance(signature, bytes):
        signed_request['oauth_signature'] = signature.decode('utf-8')
    return signed_request
def verify_oauth_with_params(consumer_key, consumer_secret, url, parameters, method='POST'):
    """
    Check the HMAC-SHA1 OAuth signature carried in ``parameters``.

    consumer_key, consumer_secret -- credentials used to create the OAuth consumer.
    url, parameters, method -- used to rebuild the ``oauth.Request`` that was signed.

    Returns the result of the signature method's ``check`` call.
    Raises OAuthInvalidError when the ``oauth_signature`` parameter cannot be
    read or encoded.
    """
    oauth_request = oauth.Request(method=method, url=url, parameters=parameters)
    signature_method = oauth.SignatureMethod_HMAC_SHA1()
    oauth_consumer = oauth.Consumer(consumer_key, consumer_secret)
    try:
        signature = oauth_request.get_parameter('oauth_signature')
        # The signature check is given bytes, so encode text signatures.
        if not isinstance(signature, bytes):
            signature = signature.encode('utf-8')
    except Exception:
        # Deliberately not a bare ``except``: KeyboardInterrupt and SystemExit
        # must propagate instead of being reported as a missing signature.
        raise OAuthInvalidError("missing OAuth signature")
    return signature_method.check(oauth_request, oauth_consumer, None, signature)
# Module-wide defaults consulted by the lti_provider decorator.
LTI_PROPS = {}


def set_lti_properties(consumer_lookup=None, site_url=None, require_post=None, error_func=None, allow_origin=None):
    """
    Set the default properties for the lti_provider decorator.

    Every argument that is not None is stored in LTI_PROPS under its own
    name; arguments left as None keep whatever value was stored before.
    """
    supplied = (
        ('consumer_lookup', consumer_lookup),
        ('site_url', site_url),
        ('require_post', require_post),
        ('error_func', error_func),
        ('allow_origin', allow_origin),
    )
    # Test against None rather than truthiness so that values such as False
    # or '' can be stored deliberately.
    LTI_PROPS.update(
        (prop_name, prop_value)
        for prop_name, prop_value in supplied
        if prop_value is not None
    )
def sign_launch_data(url, launch_data, consumer_key, secret, is_form_encoded=True):
    """
    Generate the basic LTI launch data that needs to be POSTed to the given URL.

    url -- the launch URL the request is signed for.
    launch_data -- a dictionary of LTI launch parameters, must contain "resource_link_id".
        It is recommended that this also contain "user_id", "resource_link_title", "roles", etc.
        Its entries override the generated defaults on key collision.
    consumer_key, secret -- OAuth consumer credentials used for signing.
    is_form_encoded -- forwarded to sign_oauth_with_params.

    Returns the signed request produced by sign_oauth_with_params.
    """
    chars = string.ascii_uppercase + string.digits + string.ascii_lowercase
    # bytearray yields integers on both Python 2 and 3, so each random byte
    # can index the alphabet without a per-item type check.
    nonce = ''.join(chars[byte % len(chars)] for byte in bytearray(os.urandom(32)))
    lti_params = {
        'lti_message_type': 'basic-lti-launch-request',
        'lti_version': 'LTI-1p0',
        'oauth_consumer_key': consumer_key,
        'oauth_signature_method': 'HMAC-SHA1',
        'oauth_timestamp': int(time.time()),
        'oauth_nonce': nonce,
        'oauth_version': '1.0',
    }
    lti_params.update(launch_data)
    # Forward is_form_encoded; it used to be accepted here but silently dropped.
    return sign_oauth_with_params(
        consumer_key,
        secret,
        url,
        lti_params,
        is_form_encoded=is_form_encoded,
    )
def lti_provider(func=None, consumer_lookup=None, site_url=None, require_post=None, error_func=None, allow_origin=None):
    """
    Django view decorator to create a basic LTI authenticated provider endpoint to receive bLTI POST requests.

    Usable bare (``@lti_provider``) or with keyword options
    (``@lti_provider(site_url=...)``). Options left as None are read from
    LTI_PROPS when the decorator is applied, then fall back to built-in defaults.

    consumer_lookup -- a callable taking the consumer key, or a mapping with
        ``.get``; must yield the consumer secret, or None for unknown keys.
    site_url -- when set, the signed URL is ``site_url + request.path``;
        otherwise ``request.build_absolute_uri()`` is used.
    require_post -- when true (the default), non-POST requests are rejected;
        when false they reach the view as ``func(request, *args, **kwargs)``.
    error_func -- called with an error message; its return value is the response.
    allow_origin -- when truthy, set as ``Access-Control-Allow-Origin`` on the
        response of a successfully authenticated request.

    On a valid launch the view is called as
    ``func(request, post_params, consumer_key, *args, **kwargs)``.
    The returned wrapper has ``csrf_exempt = True``.
    """
    # Function-scope import so this block stays self-contained.
    from xml.sax.saxutils import escape

    if func is None:
        # Called with options only: return a decorator awaiting the view.
        return partial(
            lti_provider,
            consumer_lookup=consumer_lookup,
            site_url=site_url,
            require_post=require_post,
            error_func=error_func,
            allow_origin=allow_origin
        )

    # Set defaults
    if consumer_lookup is None:
        consumer_lookup = LTI_PROPS.get('consumer_lookup', {})
    if site_url is None:
        site_url = LTI_PROPS.get('site_url', None)
    if require_post is None:
        require_post = LTI_PROPS.get('require_post', True)
    if error_func is None:
        error_func = LTI_PROPS.get('error_func', HttpResponseForbidden)
    if allow_origin is None:
        allow_origin = LTI_PROPS.get('allow_origin', '*')

    @wraps(func)
    def provider(request, *args, **kwargs):
        if request.method != 'POST':
            if require_post:
                return error_func('LTI: a POST request is required')
            return func(request, *args, **kwargs)

        post_params = request.POST.dict()
        consumer_key = post_params.get('oauth_consumer_key', None)
        if consumer_key is None:
            return error_func('LTI: no consumer key provided')

        if callable(consumer_lookup):
            consumer_secret = consumer_lookup(consumer_key)
        else:
            consumer_secret = consumer_lookup.get(consumer_key, None)
        if consumer_secret is None:
            # The key is attacker-controlled and the default error_func
            # renders its message as HTML, so escape it before echoing it.
            safe_key = escape(str(consumer_key), {'"': '&quot;', "'": '&#x27;'})
            return error_func('LTI: unknown consumer ' + safe_key)

        if site_url:
            url = site_url + request.path
        else:
            url = request.build_absolute_uri()

        try:
            is_valid = verify_oauth_with_params(
                consumer_key,
                consumer_secret,
                url,
                post_params
            )
        except OAuthInvalidError as err:
            return error_func("LTI: " + str(err))
        if not is_valid:
            return error_func("LTI: unable to authenticate.")

        response = func(request, post_params, consumer_key, *args, **kwargs)
        if allow_origin:
            response['Access-Control-Allow-Origin'] = allow_origin
            response['Access-Control-Expose-Headers'] = 'Access-Control-Allow-Origin'
        return response

    # The wrapper is marked CSRF-exempt via the attribute Django checks.
    provider.csrf_exempt = True
    return provider