# npb_data_app / stats.py
# patrickramos's picture
# Choose OFFB implementation of HR/FB
# c775359
# raw
# history blame
# 21 kB
import polars as pl
from data import data_df
# from enum import Enum
from types import SimpleNamespace
from convert import verify_and_return_presult, verify_and_return_basic_ball_kind_code
class Player:
    """Role tags passed as the ``percentile`` argument of ``register_stat``.

    The percentile ranking compares a stat's tag with the role being
    analysed: when the tag equals that role (or ``BOTH``) the rank is
    ascending, otherwise it is descending.
    """

    PITCHER, BATTER, BOTH = 'pitcher', 'batter', 'both'
# Registry of stat definitions, keyed by stat name. Entries are added by
# register_stat() and hold the keys 'expr', 'percent', 'percentile' and
# 'batted_ball'.
STATS = {}
def assert_value(value, options):
    """Check that ``value`` is one of ``options``.

    The check is raised explicitly rather than written as an ``assert``
    statement so that it is still enforced when Python runs with ``-O``
    (which strips ``assert`` statements). The exception type and message
    are the same as before.

    Args:
        value: The value to validate.
        options: Container of accepted values (anything supporting ``in``).

    Raises:
        AssertionError: If ``value`` is not contained in ``options``.
    """
    if value not in options:
        raise AssertionError(f'Expected one of {options}, got {value}')
def register_stat(name, expr, percent, percentile, batted_ball=False):
    """Add a stat definition to the ``STATS`` registry.

    Args:
        name: Unique stat name; also used as the alias of ``expr``.
        expr: Expression object exposing ``.alias(name)``.
        percent: Flag stored as-is under ``'percent'`` (presumably marks
            stats displayed as percentages -- confirm with the consumer).
        percentile: One of ``'pitcher'``, ``'batter'``, ``'both'`` or ``None``.
        batted_ball: Stored under ``'batted_ball'``; the compute functions use
            it to decide whether the expression runs before or after the
            batType pivot.

    Raises:
        AssertionError: If ``name`` is already registered or ``percentile``
            is not an accepted value.
    """
    assert name not in STATS, f'"{name}" already registered, returns {STATS[name]}'
    assert_value(percentile, ('pitcher', 'batter', 'both', None))
    entry = {
        'expr': expr.alias(name),
        'percent': percent,
        'percentile': percentile,
        'batted_ball': batted_ball,
    }
    STATS[name] = entry
def get_stat(stat):
    """Return the registry entry for ``stat``; raises ``KeyError`` if unknown."""
    entry = STATS[stat]
    return entry
def get_stats(stats):
    """Return the registry entries for each name in ``stats``, in order."""
    return list(map(get_stat, stats))
def get_stat_val(stat, key, default=None):
    """Return field ``key`` of the entry for ``stat``.

    ``default`` is returned when ``stat`` is not registered. A registered
    stat with a missing ``key`` still raises ``KeyError``.
    """
    if stat not in STATS:
        return default
    return STATS[stat][key]
def get_stats_val(stats, key, default=None):
    """Return field ``key`` for every name in ``stats``, in order.

    Names that are not registered contribute ``default``.
    """
    values = []
    for stat_name in stats:
        values.append(get_stat_val(stat_name, key, default))
    return values
# Reusable polars expressions shared by the stat definitions below.

# True when x and y are both non-null and ballSpeed is positive.
valid_pitch = pl.col('x').is_not_null() & pl.col('y').is_not_null() & (pl.col('ballSpeed') > 0)
# True when presult is one of the 'Ball' / 'Walk' results.
is_ball = pl.col('presult').is_in(verify_and_return_presult(['Ball', 'Walk']))
is_non_ball = pl.col('pitch') & ~is_ball # pitches that are not balls i.e. no catcher interference, etc.
is_two_str = pl.col('before_s') == 2 # named this way in case I use two_str for 2-Str%
# True when before_s and before_b are both 0 (presumably the 0-0 count, i.e.
# the first pitch of a plate appearance -- confirm column meaning).
first_count = (pl.col('before_s') == 0) & (pl.col('before_b') == 0)
# True when presult is one of the listed out results (used by PLUS%).
is_bip_out = pl.col('presult').is_in(verify_and_return_presult([
    'Groundout', 'Flyout', 'Lineout', 'Groundout (Double play)',
    'Foul fly', 'Foul line (?)',
    'Sacrifice bunt', 'Sacrifice fly',
    "Fielder's choice", "Sacrifice fielder's choice"
]))
# Number of distinct pa_code values in the group.
pa = pl.col('pa_code').unique().len()
# to-do: unify PA calculation
# pl.col('pa_code').unique().len() or pl.col('PA').first()
# Stat registry. Arguments are: name, expression, percent flag, percentile
# role, and (optionally) batted_ball.
# The percentile role is compared with the side being analysed when the
# *_pctl columns are built: a matching role (or Player.BOTH) ranks ascending,
# anything else (including None) ranks descending.

# 'FB Velo' aggregates the per-row 'FB Velo' column added by the compute_* functions.
register_stat('FB Velo', pl.col('FB Velo').max(), False, Player.PITCHER)
# Rates per distinct plate appearance (pa_code).
register_stat('K%', pl.when(pl.col('presult').str.contains('strikeout')).then(1).otherwise(0).sum() / pl.col('pa_code').unique().len(), True, Player.PITCHER)
register_stat('BB%', pl.when(pl.col('presult') == 'Walk').then(1).otherwise(0).sum() / pl.col('pa_code').unique().len(), True, Player.BATTER)
# Swing / contact / whiff rates built from the boolean swing, zone, whiff and csw columns.
register_stat('Swing%', pl.col('swing').sum() / pl.col('pitch').sum(), True, Player.BOTH)
register_stat('Z-Swing%', (pl.col('swing') & pl.col('zone')).sum() / pl.col('zone').sum(), True, Player.BATTER)
register_stat('Chase%', (pl.col('swing') & ~pl.col('zone')).sum() / (~pl.col('zone')).sum(), True, Player.PITCHER)
register_stat('Contact%', (pl.col('swing') & ~pl.col('whiff')).sum()/pl.col('swing').sum(), True, Player.BATTER)
register_stat('Z-Con%', (pl.col('zone') & pl.col('swing') & ~pl.col('whiff')).sum()/(pl.col('zone') & pl.col('swing')).sum(), True, Player.BATTER)
register_stat('O-Con%', (~pl.col('zone') & pl.col('swing') & ~pl.col('whiff')).sum()/(~pl.col('zone') & pl.col('swing')).sum(), True, Player.BATTER)
register_stat('Whiff%', pl.col('whiff').sum() / pl.col('swing').sum(), True, Player.PITCHER)
register_stat('SwStr%', pl.col('whiff').sum() / pl.col('pitch').sum(), True, Player.PITCHER)
register_stat('CSW%', pl.col('csw').sum() / pl.col('pitch').sum(), True, Player.PITCHER)
# Ball / strike rates, including rates restricted to particular counts.
register_stat('Ball%', is_ball.sum() / pl.col('pitch').sum(), True, Player.BATTER)
register_stat('Strike%', is_non_ball.sum() / pl.col('pitch').sum(), True, Player.PITCHER)
register_stat('F-Str%', (is_non_ball & first_count).sum() / first_count.sum(), True, Player.PITCHER)
register_stat('PAR%', ((is_two_str & pl.col('presult').str.contains('strikeout')).sum()) / is_two_str.sum(), True, Player.PITCHER)
register_stat('PLUS%', (pl.col('csw') | (pl.col('presult') == 'Foul') | is_bip_out).sum() / pl.col('pitch').sum(), True, Player.PITCHER)
# NOTE(review): Behind% divides by the row count (pl.len()) rather than pl.col('pitch').sum() -- confirm this is intended.
register_stat('Behind%', ((pl.col('before_b') > pl.col('before_s')) & (pl.col('before_s') < 2) & (pl.col('before_b') > 1)).sum() / pl.len(), True, Player.BATTER)
# Rates derived from the zone flag and the x / y columns.
register_stat('Zone%', pl.col('zone').sum() / pl.col('pitch').sum(), True, Player.PITCHER)
# Glove% / Arm% flip the sign test on x depending on pitLR ('r' vs anything else).
register_stat('Glove%', (pl.when(pl.col('pitLR') == 'r').then(pl.col('x') < 0).otherwise(pl.col('x') > 0)).mean(), True, None)
register_stat('Arm%', (pl.when(pl.col('pitLR') == 'r').then(pl.col('x') >= 0).otherwise(pl.col('x') <= 0)).mean(), True, None)
register_stat('High%', (pl.col('y') > 125).mean(), True, None)
register_stat('Low%', (pl.col('y') <= 125).mean(), True, None)
register_stat('MM%', (pl.col('x').is_between(-20, 20) & pl.col('y').is_between(100, 100+50)).mean(), True, None)
register_stat('Sec%', (pl.col('basic_ballKind_code').is_in(verify_and_return_basic_ball_kind_code(['BR', 'OS']))).sum() / pl.col('pitch').sum(), True, None)
# Batted-ball stats (batted_ball=True): these read the G/B/F/P/L columns that
# only exist after the compute_* functions pivot the batType proportions.
register_stat('GB%', pl.col('G') + pl.col('B'), True, Player.PITCHER, True)
register_stat('FB%', pl.col('F') + pl.col('P'), True, Player.BATTER, True)
register_stat('LD%', pl.col('L'), True, Player.BATTER, True)
register_stat('IFFB%', pl.col('P'), True, Player.PITCHER, True)
register_stat('OFFB%', pl.col('F'), True, Player.BATTER, True)
register_stat('AIR%', pl.col('F') + pl.col('P') + pl.col('L'), True, Player.BATTER, True)
# Home runs per distinct plate appearance.
register_stat('HR%', (pl.col('presult') == 'Home run').sum() / pa, True, Player.BATTER)
# NOTE: to match bouno-san's data, HR/FB must be /FB for pitchers and /OFFB for batters;
# /OFFB matches both player types for DeltaGraphs, so we use /OFFB in the implementation and /FB in the name.
# OFFB is batType 'F' only (same definition as 'OFFB%'); 'P' is IFFB and is
# therefore excluded from the denominator.
register_stat('HR/FB', (pl.col('presult') == 'Home run').sum() / (pl.col('aux_bresult').struct.field('batType').is_in(['F'])).sum(), True, Player.BATTER)
# Usage is the group's row count divided by the 'Pitches' column, which the
# compute_* functions add as a windowed row count before aggregating.
# Previous implementation, kept for reference:
# register_stat('Usage', pl.col('count')/pl.sum('count').over('pitId'), True, None)
register_stat('Usage', pl.len()/pl.first('Pitches'), True, None)
# Velocity stats read the 'mph' column added by the compute_* functions;
# the average only considers rows that satisfy valid_pitch.
register_stat('Avg Velo', pl.when(valid_pitch).then('mph').mean(), False, None)
register_stat('Max Velo', pl.col('mph').max(), False, None)
def filter_data_by_date_and_game_kind(data, start_date=None, end_date=None, game_kind=None):
    """Restrict ``data`` by date range and game kind.

    Each criterion is applied only when its argument is not ``None``:
    ``date >= start_date``, ``date <= end_date`` and
    ``coarse_game_kind == game_kind``. With no criteria the input is
    returned unchanged.
    """
    predicates = []
    if start_date is not None:
        predicates.append(pl.col('date') >= start_date)
    if end_date is not None:
        predicates.append(pl.col('date') <= end_date)
    if game_kind is not None:
        predicates.append(pl.col('coarse_game_kind') == game_kind)

    # Apply one filter per criterion, in the same order as they were collected.
    for predicate in predicates:
        data = data.filter(predicate)
    return data
def compute_team_games(data):
    """Attach each team's total game count to every row.

    Distinct ``gameId`` values are counted per home team and per visitor
    team, summed per team, and joined back so that ``home_games`` /
    ``visitor_games`` hold the total games (home + away) of the row's home
    team / visitor team respectively.
    """
    # Per-row counts of distinct games for the row's home team and visitor team.
    data = data.with_columns(
        pl.col('gameId').unique().len().over('HomeTeamNameES').alias('home_games'),
        pl.col('gameId').unique().len().over('VisitorTeamNameES').alias('visitor_games')
    )

    # One row per team for each side.
    as_home = (
        data
        .group_by('HomeTeamNameES')
        .first()
        [['HomeTeamNameES', 'home_games']]
        .rename({'HomeTeamNameES': 'team'})
    )
    as_visitor = (
        data
        .group_by('VisitorTeamNameES')
        .first()
        [['VisitorTeamNameES', 'visitor_games']]
        .rename({'VisitorTeamNameES': 'team'})
    )

    # The full join keeps teams that appear on only one side; their missing
    # count becomes 0 and their name is taken from whichever side has it.
    game_data = (
        as_home
        .join(as_visitor, on='team', how='full')
        .fill_null(0)
        .with_columns(
            (pl.col('home_games')+pl.col('visitor_games')).alias('games'),
            pl.when(pl.col('team').is_null())
            .then(pl.col('team_right'))
            .otherwise(pl.col('team')).alias('team')
        )
    )
    totals = game_data[['team', 'games']]

    # Replace the per-side counts with each team's overall total.
    result = data.drop('home_games', 'visitor_games')
    for side_col, games_col in (
        ('HomeTeamNameES', 'home_games'),
        ('VisitorTeamNameES', 'visitor_games'),
    ):
        result = result.join(
            totals.rename({'games': games_col}),
            left_on=side_col,
            right_on='team'
        )
    return result
def compute_pitch_stats(data, player_type, pitch_class_type, min_pitches=1, pitcher_lr='both', batter_lr='both', group_by_team=False):
    """Aggregate every registered stat per player (or team) and pitch type.

    Args:
        data: polars frame of pitch-level rows (assumed one row per pitch --
            TODO confirm against the loader in ``data``).
        player_type: 'pitcher', 'batter', 'team pitching' or 'team batting'.
        pitch_class_type: 'specific' groups by ``ballKind_code``; 'general'
            groups by ``general_ballKind_code``.
        min_pitches: Minimum row count for a group to be flagged ``qualified``.
        pitcher_lr: 'both', or 'l' / 'r' to keep only rows with that ``pitLR``.
        batter_lr: 'both', or 'l' / 'r' to keep only rows with that ``batLR``.
        group_by_team: For individual players, also group by the team column.

    Returns:
        A frame with one row per group holding the identifying columns,
        ``count``, ``qualified``, every stat in ``STATS`` and a matching
        ``<stat>_pctl`` column, sorted by the first id column and then by
        descending ``count``.

    Raises:
        AssertionError: If an option argument has an unsupported value.
    """
    assert pitcher_lr in ('both', 'l', 'r')
    assert batter_lr in ('both', 'l', 'r')
    assert player_type in ('pitcher', 'batter', 'team pitching', 'team batting')
    assert pitch_class_type in ('general', 'specific')
    # pitching or batting, player or team
    pitching = player_type in ('pitcher', 'team pitching')
    team = player_type in ('team pitching', 'team batting')
    # handedness filters
    if pitcher_lr != 'both':
        data = data.filter(pl.col('pitLR') == pitcher_lr)
    if batter_lr != 'both':
        data = data.filter(pl.col('batLR') == batter_lr)
    # column used for the windowed 'FB Velo' and 'Pitches' computations
    if pitching:
        over_col = 'pitId' if not team else 'pitcher_team_name_short'
    else:
        over_col = 'batId' if not team else 'batter_team_name_short'
    # col names
    match player_type:
        case 'pitcher':
            id_cols = ['pitId']
            name_col = 'pitcher_name'
        case 'batter':
            id_cols = ['batId']
            name_col = 'batter_name'
        case _:
            id_cols = []
            name_col = None
    team_col = 'pitcher_team_name_short' if pitching else 'batter_team_name_short'
    # team views always group by team; player views only when requested
    if group_by_team or team:
        id_cols.append(team_col)
    handedness_col = 'pitLR' if pitching else 'batLR'
    new_handedness_col = 'Throws' if pitching else 'Bats'
    pitch_col = 'ballKind_code' if pitch_class_type == 'specific' else 'general_ballKind_code'
    pitch_name_col = 'ballKind' if pitch_class_type == 'specific' else 'general_ballKind'
    pitch_stats = (
        data
        .with_columns(
            # ballSpeed divided by 1.609 (presumably km/h -> mph)
            (pl.col('ballSpeed') / 1.609).round(1).alias('mph'),
            # mean valid-pitch speed per (over_col, general pitch kind), kept only on 4S/FC/SI rows
            pl.when(pl.col('general_ballKind_code').is_in(['4S', 'FC', 'SI'])).then(pl.when(valid_pitch).then('ballSpeed').mean().over(over_col, 'general_ballKind_code')).mul(1/1.609).round(1).alias('FB Velo'),
            # total rows per player/team; denominator of the 'Usage' stat
            pl.len().over(over_col).alias('Pitches')
        )
        .group_by(*id_cols, pitch_col)
        .agg(
            # player name only exists for individual views
            *([pl.col(name_col).first()] if not team else []),
            # when team is not a grouping key, report the last team seen for the player
            *([] if group_by_team or team else [pl.col(team_col).last()]),
            # handedness column is omitted only for team views with no handedness filter on that side
            *(
                [pl.col(handedness_col).first().str.to_uppercase().alias(new_handedness_col) ]
                if not (team and ((pitcher_lr == 'both') if pitching else (batter_lr == 'both')))
                else []
            ),
            *([pl.first('general_ballKind')] if pitch_class_type == 'specific' else []),
            pl.first(pitch_name_col),
            pl.len().alias('count'),
            # normalized batType frequencies as a list of structs; pivoted into columns below
            pl.col('aux_bresult').struct.field('batType').drop_nulls().value_counts(normalize=True),
            # every stat that can be computed directly from pitch rows
            *[stat['expr'] for stat in STATS.values() if not stat['batted_ball']]
        )
        .with_columns(
            (pl.col('count') >= min_pitches).alias('qualified'),
        )
        # turn the batType proportions into one column per batType value
        .explode('batType')
        .unnest('batType')
        .pivot(on='batType', values='proportion')
        .fill_null(0)
        .with_columns(
            # batted-ball stats read the pivoted G/B/F/P/L columns
            *[stat['expr'] for stat in STATS.values() if stat['batted_ball']]
        )
        # NOTE(review): 'null' is presumably the pivot column produced for
        # groups without any batted-ball type -- confirm it always exists.
        .drop('G', 'F', 'B', 'P', 'L', 'null')
        .with_columns(
            # percentile = rank among qualified rows / number of qualified non-null values;
            # the rank is descending unless the stat's percentile role is this side's role or BOTH
            (pl.when(pl.col('qualified')).then(pl.col(stat)).rank(descending=(get_stat_val(stat, 'percentile')) not in (Player.PITCHER if pitching else Player.BATTER, Player.BOTH))
            /
            pl.when(pl.col('qualified')).then(pl.col(stat)).count()).alias(f'{stat}_pctl')
            for stat in STATS.keys()
        )
        # expose the same pitch-kind column names for both classification levels
        .rename({pitch_col: 'ballKind_code', pitch_name_col: 'ballKind'} if pitch_class_type == 'general' else {})
        .sort(id_cols[0], 'count', descending=[False, True])
    )
    return pitch_stats
def compute_player_stats(data, player_type, qual='qualified', pitcher_lr='both', batter_lr='both', group_by_team=False):
# TO-DO: figure out if I still need group_by_team
assert pitcher_lr in ('both', 'l', 'r')
assert batter_lr in ('both', 'l', 'r')
assert player_type in ('pitcher', 'batter', 'team pitching', 'team batting')
# pitching or batting, player or team
pitching = player_type in ('pitcher', 'team pitching')
team = player_type in ('team pitching', 'team batting')
# handedness filters
if pitcher_lr != 'both':
data = data.filter(pl.col('pitLR') == pitcher_lr)
if batter_lr != 'both':
data = data.filter(pl.col('batLR') == batter_lr)
if pitching:
over_col = 'pitId' if not team else 'pitcher_team_name_short'
else:
over_col = 'batId' if not team else 'batter_team_name_short'
data = (
compute_team_games(data)
.with_columns(
pl.when(pl.col('half_inning').str.ends_with('1')).then('home_games').otherwise('visitor_games').first().over('pitId').alias('games'),
# pl.col('inning_code').unique().len().over(over_col).alias('IP'),
(pl.col('bso').struct.field('o').cast(pl.Int32) - pl.col('beforeBso').struct.field('o').cast(pl.Int32)).sum().mul(1/3).over(over_col).alias('IP'),
pl.col('pa_code').unique().len().over(over_col).alias('PA'),
pl.col('presult').is_in(verify_and_return_presult([
'Single', 'Double', 'Triple', 'Home run', 'Inside-the-park home run',
'Groundout', 'Flyout', 'Lineout', 'Groundout (Double play)',
'Foul fly', 'Foul line (?)',
'Error', 'Sacrifice hit error', 'Sacrifice fly error',
"Fielder's choice",
'Bunt strikeout', 'Swinging strikeout', 'Looking strikeout'
])).sum().over(over_col).alias('AB'),
pl.len().over(over_col).alias('Pitches')
# pl.col('presult').is_in(verify_and_return_presult([
# 'Groundout', 'Flyout', 'Lineout', 'Groundout (Double play)',
# 'Foul fly', 'Foul line (?)',
# 'Sacrifice bunt', 'Sacrifice fly',
# "Fielder's choice", "Sacrifice fielder's choice",
# 'Bunt strikeout', 'Swinging strikeout', 'Looking strikeout'
# ])).sum().over('pitId').mul(1/3).alias('IP')
)
)
# qualifiers
qualified_factor = 1 if pitching else 3.1
qual_col = 'IP' if pitching else 'PA'
if qual == 'qualified':
data = data.with_columns((pl.col(qual_col) >= qualified_factor * pl.col('games')).alias('qualified'))
else:
data = data.with_columns((pl.col(qual_col) >= qual).alias('qualified'))
# percentile ascending/descending
# if pitching:
# stat_descending_pctl = lambda stat: stat in ['BB%', 'Ball%', 'FB%', 'LD%', 'OFFB%', 'AIR%', 'Z-Swing%', 'Behind%', 'OBP'] or 'Contact%' in stat
# else:
# stat_descending_pctl = lambda stat: not (stat in ['BB%', 'Ball%', 'FB%', 'LD%', 'OFFB%', 'AIR%', 'Swing%', 'Z-Swing%', 'Behind%', 'OBP'] or 'Contact%' in stat)
# col names
match player_type:
case 'pitcher':
id_cols = ['pitId']
name_col = 'pitcher_name'
case 'batter':
id_cols = ['batId']
name_col = 'batter_name'
case _:
id_cols = []
name_col = None
team_col = 'pitcher_team_name_short' if pitching else 'batter_team_name_short'
if group_by_team or team:
id_cols.append(team_col)
handedness_col = 'pitLR' if pitching else 'batLR'
new_handedness_col = 'Throws' if pitching else 'Bats'
player_stats = (
data
.with_columns(
(pl.col('ballSpeed') / 1.609).round(1).alias('mph'),
pl.when(pl.col('general_ballKind_code').is_in(['4S', 'FC', 'SI'])).then(pl.when(valid_pitch).then('ballSpeed').mean().over(over_col, 'general_ballKind_code')).mul(1/1.609).round(1).alias('FB Velo')
)
.group_by(id_cols)
.agg(
*([pl.col(name_col).first()] if not team else []),
*([] if group_by_team or team else [pl.col(team_col).last()]),
*(
[pl.col(handedness_col).first().str.to_uppercase().alias(new_handedness_col) ]
if not (team and ((pitcher_lr == 'both') if pitching else (batter_lr == 'both')))
else []
),
pl.col('IP').first(),
pl.col('PA').first(),
# pl.col('FB Velo').max(),
# (pl.when(pl.col('presult').str.contains('strikeout')).then(1).otherwise(0).sum() / pl.col('pa_code').unique().len()).alias('K%'),
# (pl.when(pl.col('presult') == 'Walk').then(1).otherwise(0).sum() / pl.col('pa_code').unique().len()).alias('BB%'),
pl.col('aux_bresult').struct.field('batType').drop_nulls().value_counts(normalize=True),
# swing,
# z_swing,
# chase,
# contact,
# z_con,
# o_con,
# whiff,
# swstr,
# csw,
# strike,
# ball,
# f_strike,
# par,
# zone,
# glove,
# arm,
# high,
# low,
# mm,
# behind,
# pl.col('AB').first(),
# h,
# bb,
# hbp,
# sf,
# obp,
# pl.first('qualified')
pl.first('qualified'),
*[stat['expr'] for stat in STATS.values() if not stat['batted_ball']]
)
.explode('batType')
.unnest('batType')
.pivot(on='batType', values='proportion')
.fill_null(0)
.with_columns(
*[stat['expr'] for stat in STATS.values() if stat['batted_ball']]
# (pl.col('G') + pl.col('B')).alias('GB%'),
# (pl.col('F') + pl.col('P')).alias('FB%'),
# pl.col('L').alias('LD%'),
# pl.col('P').alias('IFFB%'),
# pl.col('F').alias('OFFB%'),
# (pl.col('F') + pl.col('P') + pl.col('L')).alias('AIR%')
)
.drop('G', 'F', 'B', 'P', 'L')
.with_columns(
# (pl.when(pl.col('qualified')).then(pl.col(stat)).rank(descending=stat_descending_pctl(stat))/pl.when(pl.col('qualified')).then(pl.col(stat)).count()).alias(f'{stat}_pctl')
# for stat in ['FB Velo', 'K%', 'BB%', 'Swing%', 'Z-Swing%', 'Chase%', 'Contact%', 'Z-Contact%', 'O-Contact%', 'SwStr%', 'Whiff%', 'CSW%', 'Strike%', 'Ball%', 'F-Str%', 'PAR%', 'GB%', 'FB%', 'LD%', 'OFFB%', 'IFFB%', 'AIR%', 'Zone%', 'Behind%', 'OBP']
(pl.when(pl.col('qualified')).then(pl.col(stat)).rank(descending=(get_stat_val(stat, 'percentile')) not in (Player.PITCHER if pitching else Player.BATTER, Player.BOTH))
/
pl.when(pl.col('qualified')).then(pl.col(stat)).count()).alias(f'{stat}_pctl')
for stat in STATS.keys()
)
.sort(qual_col, descending=True)
)
return player_stats
def get_pitcher_stats(id, lr='both', game_kind=None, start_date=None, end_date=None, min_ip=1, min_pitches=1, pitch_class_type='specific'):
    """Collect the stats and tracked pitches for a single pitcher.

    Args:
        id: Value matched against the ``pitId`` column.
        lr: Batter handedness filter ('both', 'l' or 'r'), forwarded as
            ``batter_lr`` and applied to the pitch-shape rows.
        game_kind, start_date, end_date: Forwarded to
            ``filter_data_by_date_and_game_kind``.
        min_ip: Minimum IP, forwarded as ``qual`` to ``compute_player_stats``.
        min_pitches: Forwarded to ``compute_pitch_stats``.
        pitch_class_type: 'specific' or 'general', forwarded to
            ``compute_pitch_stats``.

    Returns:
        ``SimpleNamespace`` with ``pitcher_stats``, ``pitch_stats`` and
        ``pitch_shapes``.
    """
    pool = filter_data_by_date_and_game_kind(
        data_df,
        start_date=start_date,
        end_date=end_date,
        game_kind=game_kind,
    )
    this_pitcher = pl.col('pitId') == id

    # Stats are computed over the whole pool (the percentile columns are
    # ranked across all groups) and narrowed to this pitcher afterwards.
    all_pitch_stats = compute_pitch_stats(
        pool,
        player_type='pitcher',
        pitch_class_type=pitch_class_type,
        min_pitches=min_pitches,
        batter_lr=lr,
        group_by_team=False,
    )
    pitch_stats = all_pitch_stats.filter(this_pitcher)

    # Raw rows for this pitcher with non-null x / y and positive ballSpeed.
    shape_rows = pool if lr == 'both' else pool.filter(pl.col('batLR') == lr)
    shape_cols = ['pitId', 'general_ballKind_code', 'ballKind_code', 'ballSpeed', 'x', 'y']
    pitch_shapes = (
        shape_rows
        .filter(
            this_pitcher &
            pl.col('x').is_not_null() &
            pl.col('y').is_not_null() &
            (pl.col('ballSpeed') > 0)
        )
        [shape_cols]
        .with_columns((pl.col('ballSpeed')/1.609).alias('ballSpeed_mph'))
    )

    all_pitcher_stats = compute_player_stats(
        pool,
        player_type='pitcher',
        qual=min_ip,
        batter_lr=lr,
        group_by_team=False,
    )
    pitcher_stats = all_pitcher_stats.filter(this_pitcher)

    return SimpleNamespace(pitcher_stats=pitcher_stats, pitch_stats=pitch_stats, pitch_shapes=pitch_shapes)