wanikaniburned/wanikaniburned.py

235 lines
8.5 KiB
Python

import ujson as json
import re
from datetime import datetime, timedelta
import flask
from flask.ext.login import LoginManager, login_user, login_required, current_user, logout_user, login_fresh, \
confirm_login
from flask.ext.sqlalchemy import SQLAlchemy
from flask.ext.wtf import Form
import requests
from wtforms import StringField
from wtforms.validators import DataRequired
app = flask.Flask(__name__)
app.config.from_pyfile('config.py')
login_manager = LoginManager()
login_manager.init_app(app)
db = SQLAlchemy(app)
filter_regex = re.compile('\d+([,-]\d+)*')
class LoginForm(Form):
api_key = StringField('api_key', validators=[DataRequired()])
class User(db.Model):
__tablename__ = 'users'
api_key = db.Column(db.String, unique=True, primary_key=True)
radicals = db.Column(db.String)
kanji = db.Column(db.String)
vocabulary = db.Column(db.String)
username = db.Column(db.String)
gravatar = db.Column(db.String)
last_updated = db.Column(db.DateTime)
def __init__(self, api_key):
self.api_key = api_key
self.last_updated = datetime.utcnow()
self.parse_radicals_and_userdata()
self.parse_kanji()
self.parse_vocabulary()
db.session.add(self)
db.session.commit()
def is_authenticated(self):
return True
def is_active(self):
return True
def is_anonymous(self):
return False
def get_id(self):
return str(self.api_key)
def parse_radicals_and_userdata(self):
response = requests.get("https://www.wanikani.com/api/user/" + self.api_key + "/radicals/")
response.raise_for_status()
data = response.json()
if data.get('error'):
raise ValueError(data['error']['message'])
self.username = data['user_information']['username']
self.gravatar = data['user_information']['gravatar']
if data.get('requested_information'):
self.radicals = json.dumps(data['requested_information'], ensure_ascii=False)
def parse_kanji(self):
response = requests.get("https://www.wanikani.com/api/user/" + self.api_key + "/kanji/")
response.raise_for_status()
data = response.json()
if data.get('error'):
raise ValueError(data['error']['message'])
if data.get('requested_information'):
self.kanji = json.dumps(data['requested_information'], ensure_ascii=False)
def parse_vocabulary(self):
response = requests.get("https://www.wanikani.com/api/user/" + self.api_key + "/vocabulary/")
response.raise_for_status()
data = response.json()
if data.get('error'):
raise ValueError(data['error']['message'])
if data.get('requested_information'):
self.vocabulary = json.dumps(data['requested_information']['general'], ensure_ascii=False)
def update_all(self):
if (datetime.utcnow() - self.last_updated) > timedelta(hours=1):
self.parse_radicals_and_userdata()
self.parse_kanji()
self.parse_vocabulary()
self.last_updated = datetime.utcnow()
db.session.commit()
else:
raise ValueError('Cannot refresh now, try again later.')
def parse_range(input_range):
if filter_regex.match(input_range):
result = []
components = input_range.split(',')
for item in components:
if item.isdigit():
result.append(int(item))
else:
range_ends = item.split('-')
result.extend(list(range(int(range_ends[0]), int(range_ends[1]) + 1)))
return result
def combine_onyomi_and_kunyomi(onyomi, kunyomi):
if onyomi and kunyomi:
return onyomi + ',' + kunyomi.replace('.*', '')
return onyomi if onyomi else kunyomi.replace('.*', '')
def get_items_with_level_restriction(level_range, item_state, item_types):
items = []
radical_count = 0
loads = json.loads
if 'radical' in item_types:
items.extend({'item_type': 'radical', 'answer': item['meaning'], 'question': item['image']
if item['image'] else item['character']} for item in loads(current_user.radicals)
if item['user_specific'] and item['user_specific']['srs'] in item_state
and item['level'] in level_range)
radical_count = len(items)
kanji_count = 0
if 'kanji' in item_types:
for item in filter((lambda x: x['user_specific'] and x['user_specific']['srs'] in item_state
and x['level'] in level_range), loads(current_user.kanji)):
items.extend([{'item_type': 'kanji', 'question': item['character'],
'answer': combine_onyomi_and_kunyomi(item['onyomi'], item['kunyomi']),
'answer_type': 'kana'},
{'item_type': 'kanji', 'question': item['character'], 'answer': item['meaning'],
'answer_type': 'eng'}])
kanji_count = int((len(items) - radical_count) / 2)
vocabulary_count = 0
if 'vocab' in item_types:
for item in filter((lambda x: x['user_specific'] and x['user_specific']['srs'] in item_state
and x['level'] in level_range), loads(current_user.vocabulary)):
items.extend([{'item_type': 'vocabulary', 'question': item['character'], 'answer': item['kana'],
'answer_type': 'kana'},
{'item_type': 'vocabulary', 'question': item['character'],
'answer': item['meaning'], 'answer_type': 'eng'}])
vocabulary_count = int((len(items) - (radical_count + kanji_count)) / 2)
if not items:
return flask.jsonify(error="No items within these filter parameters")
else:
return flask.jsonify(radical_count=radical_count, kanji_count=kanji_count, vocabulary_count=vocabulary_count,
item_list=items)
@login_manager.user_loader
def load_user(api_key):
return User.query.get(str(api_key))
@app.route('/', methods=('GET', 'POST'))
def show_home():
if current_user.is_authenticated:
return flask.redirect(flask.url_for('show_quiz'))
form = LoginForm()
if form.validate_on_submit():
user = User.query.get(str(form.api_key.data))
if user:
login_user(user, remember=True)
return flask.redirect(flask.url_for('show_quiz'))
else:
try:
new_user = User(str(form.api_key.data))
login_user(new_user, remember=True)
return flask.redirect(flask.url_for('show_quiz'))
except ValueError as err:
flask.flash(err)
return flask.render_template("welcome.html", form=form)
return flask.render_template("welcome.html", form=form)
@login_required
@app.route('/quiz')
def show_quiz():
if not login_fresh():
if User.query.get(current_user.api_key):
confirm_login()
return flask.render_template("quiz.html")
else:
logout_user()
return flask.redirect(flask.url_for('show_home'))
return flask.render_template("quiz.html")
@app.route('/user_items')
@login_required
def get_items():
level_range = list(range(0, 61))
if flask.request.args.get('level_range') and flask.request.args.get('level_range') is not '':
level_range = parse_range(flask.request.args.get('level_range'))
item_state = ['burned']
if flask.request.args.get('item_state') and flask.request.args.get('item_state') is not '':
item_state = flask.request.args.get('item_state').split(',')
item_types = ['radical', 'kanji', 'vocab']
if flask.request.args.get('item_types') and flask.request.args.get('item_types') is not '':
item_types = flask.request.args.get('item_types').split(',')
return get_items_with_level_restriction(level_range, item_state, item_types)
@app.route('/refresh', methods=['POST'])
@login_required
def refresh_api():
try:
current_user.update_all()
return str(datetime_format(current_user.last_updated)), 202
except ValueError as err:
return str(err), 500
@app.route('/logout')
def logout():
logout_user()
return flask.redirect(flask.url_for('show_home'))
@login_manager.unauthorized_handler
def unauthorized():
return flask.redirect(flask.url_for('show_home'))
def datetime_format(input_data):
return input_data.strftime("%d %B %Y %I:%M%p")
if __name__ == '__main__':
db.create_all()
app.jinja_env.filters['datetime_format'] = datetime_format
app.run(threaded=True, port=app.config['PORT'])