import urllib, time, urlparse
# Django imports
from django.db.models.signals import post_save, post_delete
from django.db import models
from django.contrib.auth.models import User
from django.contrib import admin
from django.core.mail import send_mail, mail_admins
# Piston imports
from managers import TokenManager, ConsumerManager, ResourceManager
from signals import consumer_post_save, consumer_post_delete
# Column widths (``max_length``) of the key / secret / verifier fields on the
# models below. KEY_SIZE and SECRET_SIZE are also the lengths requested when
# random keys and secrets are generated.
KEY_SIZE = 18
SECRET_SIZE = 32
VERIFIER_SIZE = 10

# ``choices`` for ``Consumer.status``: (stored value, display label) pairs.
CONSUMER_STATES = (
    ('pending', 'Pending'),
    ('accepted', 'Accepted'),
    ('canceled', 'Canceled'),
    ('rejected', 'Rejected'),
)
def generate_random(length=SECRET_SIZE):
    """
    Return a random value of ``length`` characters.

    Thin wrapper around ``User.objects.make_random_password``; ``length``
    defaults to ``SECRET_SIZE``.
    """
    random_value = User.objects.make_random_password(length=length)
    return random_value
class Nonce(models.Model):
    """
    A nonce value stored together with a token key and a consumer key.
    """
    # Both key columns share the width used for Consumer/Token keys.
    token_key = models.CharField(max_length=KEY_SIZE)
    consumer_key = models.CharField(max_length=KEY_SIZE)
    # The nonce value itself.
    key = models.CharField(max_length=255)

    def __unicode__(self):
        """Describe the nonce by its value and the consumer key it carries."""
        label_args = (self.key, self.consumer_key)
        return u"Nonce %s for %s" % label_args


# Make the model manageable through the Django admin site.
admin.site.register(Nonce)
class Consumer(models.Model):
    """
    An API consumer identified by a key/secret pair.

    ``status`` is one of ``CONSUMER_STATES`` and defaults to ``'pending'``;
    ``user`` optionally links the consumer to a Django ``User`` (reachable
    from the user as ``user.consumers``).
    """
    name = models.CharField(max_length=255)
    description = models.TextField()

    key = models.CharField(max_length=KEY_SIZE)
    secret = models.CharField(max_length=SECRET_SIZE)

    status = models.CharField(max_length=16, choices=CONSUMER_STATES, default='pending')
    user = models.ForeignKey(User, null=True, blank=True, related_name='consumers')

    objects = ConsumerManager()

    def __unicode__(self):
        """Describe the consumer by its name and key."""
        return u"Consumer %s with key %s" % (self.name, self.key)

    def generate_random_codes(self):
        """
        Used to generate random key/secret pairings. Use this after you've
        added the other data in place of save().

            c = Consumer()
            c.name = "My consumer"
            c.description = "An app that makes ponies from the API."
            c.user = some_user_object
            c.generate_random_codes()

        Assigns a fresh random ``key`` and ``secret`` and saves the
        instance. The key is regenerated until no stored consumer already
        uses it, so keys stay unique.
        """
        key = generate_random(KEY_SIZE)
        # Uniqueness has to be enforced on the key alone. The previous check
        # matched on the (key, secret) pair and only re-rolled the secret,
        # which still allowed two consumers to end up with the same key.
        # ``.count()`` is kept (instead of ``.exists()``) because the minimum
        # supported Django version is not visible from this file.
        while Consumer.objects.filter(key__exact=key).count():
            key = generate_random(KEY_SIZE)

        self.key = key
        self.secret = generate_random(SECRET_SIZE)
        self.save()


# Make the model manageable through the Django admin site.
admin.site.register(Consumer)
def _current_timestamp():
    """Return the current Unix time truncated to whole seconds."""
    return long(time.time())


class Token(models.Model):
    """
    An OAuth token (request or access) issued for a ``Consumer``.

    ``token_type`` is one of ``TOKEN_TYPES``; ``user`` is optional and is
    reachable from the user side as ``user.tokens``.
    """
    REQUEST = 1
    ACCESS = 2
    TOKEN_TYPES = ((REQUEST, u'Request'), (ACCESS, u'Access'))

    key = models.CharField(max_length=KEY_SIZE)
    secret = models.CharField(max_length=SECRET_SIZE)
    verifier = models.CharField(max_length=VERIFIER_SIZE)
    token_type = models.IntegerField(choices=TOKEN_TYPES)
    # Pass the callable, not its result: ``default=long(time.time())`` was
    # evaluated once when the module was imported, so every token created
    # afterwards defaulted to the process start time.
    timestamp = models.IntegerField(default=_current_timestamp)
    is_approved = models.BooleanField(default=False)

    user = models.ForeignKey(User, null=True, blank=True, related_name='tokens')
    consumer = models.ForeignKey(Consumer)

    callback = models.CharField(max_length=255, null=True, blank=True)
    callback_confirmed = models.BooleanField(default=False)

    objects = TokenManager()

    def __unicode__(self):
        """Describe the token by its type label, key and consumer."""
        return u"%s Token %s for %s" % (self.get_token_type_display(), self.key, self.consumer)

    def to_string(self, only_key=False):
        """
        Serialize the token as a URL-encoded query string.

        The result always carries ``oauth_token`` and
        ``oauth_callback_confirmed`` (emitted as the literal ``'true'``,
        independent of the ``callback_confirmed`` field), plus
        ``oauth_verifier`` when a verifier is set. ``oauth_token_secret``
        is included unless ``only_key`` is true.
        """
        token_dict = {
            'oauth_token': self.key,
            'oauth_token_secret': self.secret,
            'oauth_callback_confirmed': 'true',
        }

        if self.verifier:
            token_dict.update({ 'oauth_verifier': self.verifier })

        if only_key:
            del token_dict['oauth_token_secret']

        return urllib.urlencode(token_dict)

    def generate_random_codes(self):
        """
        Assign a fresh random ``key`` and ``secret`` and save the token.

        The key is regenerated until no stored token already uses it, so
        keys stay unique.
        """
        key = generate_random(KEY_SIZE)
        # Uniqueness has to be enforced on the key alone. The previous check
        # matched on the (key, secret) pair and only re-rolled the secret,
        # which still allowed two tokens to end up with the same key.
        # ``.count()`` is kept (instead of ``.exists()``) because the minimum
        # supported Django version is not visible from this file.
        while Token.objects.filter(key__exact=key).count():
            key = generate_random(KEY_SIZE)

        self.key = key
        self.secret = generate_random(SECRET_SIZE)
        self.save()

    # -- OAuth 1.0a stuff

    def get_callback_url(self):
        """
        Return the callback URL, with ``oauth_verifier`` appended to its
        query string when both a callback and a verifier are set.

        Otherwise the stored ``callback`` is returned unchanged (it may be
        ``None`` or empty).
        """
        if self.callback and self.verifier:
            # Append the oauth_verifier.
            parts = urlparse.urlparse(self.callback)
            scheme, netloc, path, params, query, fragment = parts[:6]
            if query:
                query = '%s&oauth_verifier=%s' % (query, self.verifier)
            else:
                query = 'oauth_verifier=%s' % self.verifier
            return urlparse.urlunparse((scheme, netloc, path, params,
                query, fragment))
        return self.callback

    def set_callback(self, callback):
        """
        Store ``callback``, mark it confirmed and save the token.

        A callback of ``"oob"`` is ignored: nothing is stored and the token
        is not saved.
        """
        if callback != "oob": # out of band, says "we can't do this!"
            self.callback = callback
            self.callback_confirmed = True
            self.save()


# Make the model manageable through the Django admin site.
admin.site.register(Token)
# Attach our signals: the handlers imported from the local ``signals`` module
# are connected to post_save and post_delete for the Consumer model.
post_save.connect(receiver=consumer_post_save, sender=Consumer)
post_delete.connect(receiver=consumer_post_delete, sender=Consumer)