generated from FraserParlane/template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocess.py
279 lines (229 loc) · 6.39 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
from datetime import datetime
from typing import Tuple
from tqdm import tqdm
import pandas as pd
import berserk
import json
# Create a lichess session.
# The API token is read from a local 'lichess.token' file. Surrounding
# whitespace is stripped because text editors usually leave a trailing
# newline, which would otherwise become part of the token that is sent.
with open('lichess.token') as t:
    token = t.read().strip()
session = berserk.TokenSession(token)
client = berserk.Client(session)
def get_top_players(n: int = 200):
    """
    Download the classical leaderboard and save it to 'leaders.json'. These
    are the players whose games get scraped later on. The leaderboard holds
    at most 200 players.
    :param n: number of leaders to get
    :return: None
    """
    top_players = client.users.get_leaderboard('classical', count=n)

    # Persist the raw leaderboard entries for the next pipeline step
    with open('leaders.json', 'w') as leaders_file:
        json.dump(top_players, leaders_file)
def get_top_player_games(
        n: int = int(1E4),
        max_retries: int = 5,
        retry_delay: float = 1.0,
):
    """
    Get the games of the top players.

    Usernames are read from 'leaders.json'. The games of every player are
    accumulated and 'games.json' is rewritten after each player, so an
    interrupted run keeps everything collected up to that point.

    :param n: The max number of games for each player.
    :param max_retries: Number of download attempts per player before the
        player is skipped. With a value below 1 no download is attempted.
    :param retry_delay: Seconds to wait between two attempts.
    :return: None
    """
    import time

    # Read in the leaders, get usernames
    with open('leaders.json', 'r') as f:
        leaders = json.load(f)
    usernames = [leader['username'] for leader in leaders]

    # Define time range for game scraping
    date_start = berserk.utils.to_millis(datetime(1900, 12, 8))
    date_stop = berserk.utils.to_millis(datetime(2050, 12, 9))

    # Create an empty file
    fname = 'games.json'
    with open(fname, 'w') as f:
        f.write('[]')

    # Everything downloaded so far. Keeping it in memory avoids re-reading
    # and re-parsing the whole output file for every player.
    store = []

    # Loop through the players
    for username in tqdm(usernames):

        games = None
        last_error = None
        for attempt in range(1, max_retries + 1):
            try:
                # The export is consumed inside the try block: the result is
                # an iterator, so failures can surface while it is being
                # read, not only when the call is made.
                games = list(client.games.export_by_player(
                    username=username,
                    since=date_start,
                    until=date_stop,
                    max=n,
                ))
                break
            except Exception as err:
                # Exception (not a bare except) keeps Ctrl-C working
                last_error = err
                print(f'{username} failed.')
                if attempt < max_retries:
                    time.sleep(retry_delay)

        # Give up on this player instead of retrying forever
        if games is None:
            print(
                f'{username} skipped after {max_retries} attempts: '
                f'{last_error}'
            )
            continue

        # Store
        store.extend(games)
        with open(fname, 'w') as f:
            # default=str serialises values json cannot handle natively
            f.write(json.dumps(store, default=str))
def format_games_into_pandas():
    """
    Load the raw games from 'games.json' into a pandas dataframe and write
    that dataframe to 'games.feather'.
    :return: None
    """
    # Read the raw data and save it straight away in feather format
    pd.read_json('games.json').to_feather('games.feather')
def clean_data():
    """
    Remove duplicated games from 'games.feather' and write the result to
    'games-clean.feather'. The number of remaining games is printed.
    :return: None
    """
    games = pd.read_feather('games.feather')

    # A game shows up once per scraped player who took part in it, so keep
    # only the first row for every game id
    unique_games = games.drop_duplicates(subset=['id'])
    print(len(unique_games))

    # Rebuild a default index; the previous index is kept as an 'index'
    # column
    unique_games = unique_games.reset_index()

    unique_games.to_feather('games-clean.feather')
def process_data():
    """
    Process the play data.

    Reads 'games-clean.feather', splits the 'moves' string of every game
    into single moves and writes one row per moved piece to 'plays.feather'.
    Each row holds: white (is it white's move), piece, posx, posy (zero-based
    destination square), kill (capture), check and mate. Castling produces
    two rows, one for the king and one for the rook.
    :return: None
    """
    # Read in the data
    df = pd.read_feather('games-clean.feather')

    # Create a place to store the resulting moves
    rows = []

    # Iter through the games; only the moves column is needed
    for game_moves in tqdm(df['moves'], total=len(df)):

        # Skip if no moves played (empty string or missing value)
        if not isinstance(game_moves, str) or not game_moves.strip():
            continue

        # split() without argument never yields empty tokens, even when the
        # moves are separated by more than one space
        for j, move in enumerate(game_moves.split()):

            # Get special moves
            white = j % 2 == 0
            check = move.endswith('+')
            mate = move.endswith('#')
            kill = 'x' in move
            promote = '=' in move

            # 'O-O-O' is queenside and 'O-O' is kingside castling. 'O-O' is
            # a prefix of 'O-O-O', so the longer form is tested first and
            # excluded from the kingside test.
            queen_side = move.startswith('O-O-O')
            king_side = move.startswith('O-O') and not queen_side

            # Deal with Castling first
            if king_side or queen_side:
                # NOTE: check / mate are not recorded for castling moves
                castle_base = dict(
                    white=white,
                    kill=False,
                    check=False,
                    mate=False,
                )
                castle_base['posy'] = 0 if white else 7
                # Queenside: king to the c-file, rook to the d-file.
                # Kingside: king to the g-file, rook to the f-file.
                king_x, rook_x = (2, 3) if queen_side else (6, 5)
                rows.append(castle_base | dict(piece='K', posx=king_x))
                rows.append(castle_base | dict(piece='R', posx=rook_x))
                continue

            # Get posx, posy
            pos = move_to_pos(
                move=move,
                white=white,
                promote=promote,
                check=check,
                mate=mate,
            )
            posx, posy = pos_to_coord(pos)

            # Record the move
            rows.append(dict(
                white=white,
                piece=move_to_piece(move),
                posx=posx,
                posy=posy,
                kill=kill,
                check=check,
                mate=mate,
            ))

    # Save
    df = pd.DataFrame(rows)
    df.to_feather('plays.feather')
def add_row(
        df: pd.DataFrame,
        d: dict,
) -> pd.DataFrame:
    """
    Append a dictionary as one new row to a dataframe.
    :param df: dataframe to extend; it is not modified
    :param d: mapping of column name to the value of the new row
    :return: new dataframe with the row added and a renumbered index
    """
    # Turn the dictionary into a one-row frame so that it can be concatenated
    new_row = pd.DataFrame(d, index=[0])
    return pd.concat([df, new_row], ignore_index=True)
def move_to_piece(
        move: str,
) -> str:
    """
    Extract a piece from a move.

    The piece is given by the first character of the move: 'Q', 'K', 'B',
    'N' or 'R'. Moves starting with anything else (i.e. 'e4', 'exd5',
    'e8=Q') are pawn moves. Castling ('O-O', 'O-O-O') has no piece letter
    but is a king move, so 'K' is returned for it.
    :param move: move string
    :return: piece, one of 'Q', 'K', 'B', 'N', 'R', 'P'
    """
    # Castling would otherwise fall through to the pawn case
    if move.startswith('O-O'):
        return 'K'
    if move[0] in ('Q', 'K', 'B', 'N', 'R'):
        return move[0]
    return 'P'
def move_to_pos(
        move: str,
        promote: bool,
        white: bool,
        check: bool,
        mate: bool,
) -> str:
    """
    Extract the destination square (i.e. 'd2') from a move (i.e. 'Nfd2').

    The square is the two characters in front of the optional suffixes: a
    promotion ('=Q', detected by a '=' in the move) and a check or mate
    marker ('+' / '#', signalled through the check and mate flags). Castling
    moves contain no square and must be handled by the caller.
    :param move: move
    :param promote: has a pawn been promoted (not used here)
    :param white: is white turn (not used here)
    :param check: Is check
    :param mate: Is mate
    :return: pos
    """
    # Number of characters that follow the square
    suffix_len = 0
    if '=' in move:
        suffix_len += 2
    if check or mate:
        suffix_len += 1

    # Without a suffix the square is simply the tail of the move
    if suffix_len == 0:
        return move[-2:]
    return move[-(suffix_len + 2):-suffix_len]
def pos_to_coord(
        pos: str,
) -> Tuple[int, int]:
    """
    Convert a square name ('a2') into zero-based coordinates (0, 1).
    :param pos: Position, file letter followed by rank digit.
    :return: Tuple of int, (file index, rank index).
    """
    # Files 'a'..'h' map to 0..7
    file_index = ord(pos[0]) - ord('a')
    # Ranks '1'..'8' map to 0..7
    rank_index = int(pos[1]) - 1
    return file_index, rank_index
def main():
    """
    Run the whole pipeline: fetch the leaderboard, download the games of the
    leaders, convert them to feather, remove duplicates and extract the
    single moves. Every step reads the file written by the step before it.
    :return: None
    """
    get_top_players(n=200)
    get_top_player_games(n=1000)
    format_games_into_pandas()
    clean_data()
    process_data()


if __name__ == '__main__':
    main()