import polars as pl
from data import data_df
# from enum import Enum
from types import SimpleNamespace
from convert import verify_and_return_presult, verify_and_return_basic_ball_kind_code


class Player:
    """String tags accepted as the ``percentile`` argument of ``register_stat``."""

    PITCHER = 'pitcher'
    BATTER = 'batter'
    BOTH = 'both'


# Registry of stat definitions keyed by stat name; populated by register_stat().
STATS = {}


def assert_value(value, options):
    """Assert that ``value`` is one of ``options``."""
    assert value in options, f'Expected one of {options}, got {value}'


def register_stat(name, expr, percent, percentile, batted_ball=False):
    """Add a stat definition to ``STATS``.

    Args:
        name: Unique stat name; also used as the alias of ``expr``.
        expr: Polars expression computing the stat.
        percent: Stored as-is under the ``'percent'`` key.
        percentile: One of ``'pitcher'``, ``'batter'``, ``'both'`` or ``None``.
        batted_ball: Stored as-is under the ``'batted_ball'`` key.

    Raises:
        AssertionError: If ``name`` is already registered or ``percentile``
            is not an accepted value.
    """
    assert name not in STATS, f'"{name}" already registered, returns {STATS[name]}'
    assert_value(percentile, ('pitcher', 'batter', 'both', None))
    STATS[name] = {
        'expr': expr.alias(name),
        'percent': percent,
        'percentile': percentile,
        'batted_ball': batted_ball,
    }


def get_stat(stat):
    """Return the registered definition dict for ``stat`` (KeyError if unknown)."""
    return STATS[stat]


def get_stats(stats):
    """Return the definition dicts for every name in ``stats``, in order."""
    return list(map(get_stat, stats))


def get_stat_val(stat, key, default=None):
    """Return field ``key`` of ``stat``'s definition, or ``default`` if ``stat`` is unregistered."""
    if stat not in STATS:
        return default
    return STATS[stat][key]


def get_stats_val(stats, key, default=None):
    """Return ``get_stat_val(name, key, default)`` for every name in ``stats``."""
    return [get_stat_val(name, key, default) for name in stats]


# A pitch counts as valid when both coordinates are present and its speed is positive.
valid_pitch = (
    pl.col('x').is_not_null()
    & pl.col('y').is_not_null()
    & (pl.col('ballSpeed') > 0)
)
is_ball = pl.col('presult').is_in(verify_and_return_presult(['Ball', 'Walk']))
# pitches that are not balls i.e. no catcher interference, etc.
is_non_ball = pl.col('pitch') & ~is_ball
# Column shorthands used only by the registrations in this section.
_presult = pl.col('presult')
_swing = pl.col('swing')
_zone = pl.col('zone')
_whiff = pl.col('whiff')
_pitch_total = pl.col('pitch').sum()
_swing_total = pl.col('swing').sum()

is_two_str = pl.col('before_s') == 2  # named this way in case I use two_str for 2-Str%
first_count = (pl.col('before_s') == 0) & (pl.col('before_b') == 0)
is_bip_out = _presult.is_in(verify_and_return_presult([
    'Groundout', 'Flyout', 'Lineout', 'Groundout (Double play)',
    'Foul fly', 'Foul line (?)',
    'Sacrifice bunt', 'Sacrifice fly',
    "Fielder's choice", "Sacrifice fielder's choice"
]))
# Number of distinct plate appearances.
# to-do: unify PA calculation:
# pl.col('pa_code').unique().len() or pl.col('PA').first()
pa = pl.col('pa_code').unique().len()

register_stat('FB Velo', pl.col('FB Velo').max(), percent=False, percentile=Player.PITCHER)
register_stat(
    'K%',
    pl.when(_presult.str.contains('strikeout')).then(1).otherwise(0).sum() / pa,
    percent=True,
    percentile=Player.PITCHER,
)
register_stat(
    'BB%',
    pl.when(_presult == 'Walk').then(1).otherwise(0).sum() / pa,
    percent=True,
    percentile=Player.BATTER,
)
register_stat('Swing%', _swing_total / _pitch_total, percent=True, percentile=Player.BOTH)
register_stat(
    'Z-Swing%',
    (_swing & _zone).sum() / _zone.sum(),
    percent=True,
    percentile=Player.BATTER,
)
register_stat(
    'Chase%',
    (_swing & ~_zone).sum() / (~_zone).sum(),
    percent=True,
    percentile=Player.PITCHER,
)
register_stat(
    'Contact%',
    (_swing & ~_whiff).sum() / _swing_total,
    percent=True,
    percentile=Player.BATTER,
)
register_stat(
    'Z-Con%',
    (_zone & _swing & ~_whiff).sum() / (_zone & _swing).sum(),
    percent=True,
    percentile=Player.BATTER,
)
register_stat(
    'O-Con%',
    (~_zone & _swing & ~_whiff).sum() / (~_zone & _swing).sum(),
    percent=True,
    percentile=Player.BATTER,
)
register_stat('Whiff%', _whiff.sum() / _swing_total, percent=True, percentile=Player.PITCHER)
register_stat('SwStr%', _whiff.sum() / _pitch_total, percent=True, percentile=Player.PITCHER)
register_stat('CSW%', pl.col('csw').sum() / _pitch_total, percent=True, percentile=Player.PITCHER)
register_stat('Ball%', is_ball.sum() / _pitch_total, percent=True, percentile=Player.BATTER)
register_stat('Strike%', is_non_ball.sum() / pl.col('pitch').sum(), True, Player.PITCHER) register_stat('F-Str%', (is_non_ball & first_count).sum() / first_count.sum(), True, Player.PITCHER) register_stat('PAR%', ((is_two_str & pl.col('presult').str.contains('strikeout')).sum()) / is_two_str.sum(), True, Player.PITCHER) register_stat('PLUS%', (pl.col('csw') | (pl.col('presult') == 'Foul') | is_bip_out).sum() / pl.col('pitch').sum(), True, Player.PITCHER) register_stat('Behind%', ((pl.col('before_b') > pl.col('before_s')) & (pl.col('before_s') < 2) & (pl.col('before_b') > 1)).sum() / pl.len(), True, Player.BATTER) register_stat('Zone%', pl.col('zone').sum() / pl.col('pitch').sum(), True, Player.PITCHER) register_stat('Glove%', (pl.when(pl.col('pitLR') == 'r').then(pl.col('x') < 0).otherwise(pl.col('x') > 0)).mean(), True, None) register_stat('Arm%', (pl.when(pl.col('pitLR') == 'r').then(pl.col('x') >= 0).otherwise(pl.col('x') <= 0)).mean(), True, None) register_stat('High%', (pl.col('y') > 125).mean(), True, None) register_stat('Low%', (pl.col('y') <= 125).mean(), True, None) register_stat('MM%', (pl.col('x').is_between(-20, 20) & pl.col('y').is_between(100, 100+50)).mean(), True, None) register_stat('Sec%', (pl.col('basic_ballKind_code').is_in(verify_and_return_basic_ball_kind_code(['BR', 'OS']))).sum() / pl.col('pitch').sum(), True, None) register_stat('GB%', pl.col('G') + pl.col('B'), True, Player.PITCHER, True) register_stat('FB%', pl.col('F') + pl.col('P'), True, Player.BATTER, True) register_stat('LD%', pl.col('L'), True, Player.BATTER, True) register_stat('IFFB%', pl.col('P'), True, Player.PITCHER, True) register_stat('OFFB%', pl.col('F'), True, Player.BATTER, True) register_stat('AIR%', pl.col('F') + pl.col('P') + pl.col('L'), True, Player.BATTER, True) register_stat('HR%', (pl.col('presult') == 'Home run').sum() / pa, True, Player.BATTER) register_stat('HR/FB', (pl.col('presult') == 'Home run').sum() / 
(pl.col('aux_bresult').struct.field('batType').is_in(['F', 'P'])).sum(), True, Player.BATTER) # note: to match bouno-san's data, HR/FB bust be /FB for pitchers and /OFFB for batters; # /OFFB matches both player types for DeltaGraphs, so we use /OFFB in implementaiton and /FB in na,e # register_stat('Usage', pl.col('count')/pl.sum('count').over('pitId'), True, None) register_stat('Usage', pl.len()/pl.first('Pitches'), True, None) register_stat('Avg Velo', pl.when(valid_pitch).then('mph').mean(), False, None) register_stat('Max Velo', pl.col('mph').max(), False, None) def filter_data_by_date_and_game_kind(data, start_date=None, end_date=None, game_kind=None): if start_date is not None: data = data.filter(pl.col('date') >= start_date) if end_date is not None: data = data.filter(pl.col('date') <= end_date) if game_kind is not None: data = data.filter(pl.col('coarse_game_kind') == game_kind) return data def compute_team_games(data): data = ( data .with_columns( pl.col('gameId').unique().len().over('HomeTeamNameES').alias('home_games'), pl.col('gameId').unique().len().over('VisitorTeamNameES').alias('visitor_games') ) ) game_data = ( data .group_by('HomeTeamNameES') .first() [['HomeTeamNameES', 'home_games']] .rename({'HomeTeamNameES': 'team'}) .join( ( data .group_by('VisitorTeamNameES') .first() [['VisitorTeamNameES', 'visitor_games']] .rename({'VisitorTeamNameES': 'team'}) ), on='team', how='full' ) .fill_null(0) .with_columns( (pl.col('home_games')+pl.col('visitor_games')).alias('games'), pl.when(pl.col('team').is_null()) .then(pl.col('team_right')) .otherwise(pl.col('team')).alias('team') ) ) return ( data .drop('home_games', 'visitor_games') .join( game_data[['team', 'games']].rename({'games': 'home_games'}), left_on='HomeTeamNameES', right_on='team' ) .join( game_data[['team', 'games']].rename({'games': 'visitor_games'}), left_on='VisitorTeamNameES', right_on='team' ) ) def compute_pitch_stats(data, player_type, pitch_class_type, min_pitches=1, pitcher_lr='both', 
batter_lr='both', group_by_team=False): assert pitcher_lr in ('both', 'l', 'r') assert batter_lr in ('both', 'l', 'r') assert player_type in ('pitcher', 'batter', 'team pitching', 'team batting') assert pitch_class_type in ('general', 'specific') # pitching or batting, player or team pitching = player_type in ('pitcher', 'team pitching') team = player_type in ('team pitching', 'team batting') # handedness filters if pitcher_lr != 'both': data = data.filter(pl.col('pitLR') == pitcher_lr) if batter_lr != 'both': data = data.filter(pl.col('batLR') == batter_lr) if pitching: over_col = 'pitId' if not team else 'pitcher_team_name_short' else: over_col = 'batId' if not team else 'batter_team_name_short' # id_cols = ['pitId' if player_type == 'pitcher' else 'batId'] # team_col = 'pitcher_team_name_short' if pitching else 'batter_team_name_short' # if group_by_team: # id_cols.append(team_col) # col names match player_type: case 'pitcher': id_cols = ['pitId'] name_col = 'pitcher_name' case 'batter': id_cols = ['batId'] name_col = 'batter_name' case _: id_cols = [] name_col = None team_col = 'pitcher_team_name_short' if pitching else 'batter_team_name_short' if group_by_team or team: id_cols.append(team_col) handedness_col = 'pitLR' if pitching else 'batLR' new_handedness_col = 'Throws' if pitching else 'Bats' # name_col = 'pitcher_name' if player_type == 'pitcher' else 'batter_name' pitch_col = 'ballKind_code' if pitch_class_type == 'specific' else 'general_ballKind_code' pitch_name_col = 'ballKind' if pitch_class_type == 'specific' else 'general_ballKind' pitch_stats = ( data .with_columns( (pl.col('ballSpeed') / 1.609).round(1).alias('mph'), pl.when(pl.col('general_ballKind_code').is_in(['4S', 'FC', 'SI'])).then(pl.when(valid_pitch).then('ballSpeed').mean().over(over_col, 'general_ballKind_code')).mul(1/1.609).round(1).alias('FB Velo'), pl.len().over(over_col).alias('Pitches') ) .group_by(*id_cols, pitch_col) .agg( *([pl.col(name_col).first()] if not team else []), *([] 
if group_by_team or team else [pl.col(team_col).last()]), *( [pl.col(handedness_col).first().str.to_uppercase().alias(new_handedness_col) ] if not (team and ((pitcher_lr == 'both') if pitching else (batter_lr == 'both'))) else [] ), # pl.first(name_col), # pl.col('pitLR').first().str.to_uppercase().alias('Throws'), *([pl.first('general_ballKind')] if pitch_class_type == 'specific' else []), pl.first(pitch_name_col), pl.len().alias('count'), # pl.when(pl.col('x').is_not_null() & pl.col('y').is_not_null() & (pl.col('ballSpeed') > 0)).then('ballSpeed').mean().alias('Avg KPH'), # pl.col('ballSpeed').max().alias('Max KPH'), # pl.when(pl.col('x').is_not_null() & pl.col('y').is_not_null() & (pl.col('ballSpeed') > 0)).then('mph').mean().round(1).alias('Avg MPH'), # pl.col('mph').max().alias('Max MPH'), pl.col('aux_bresult').struct.field('batType').drop_nulls().value_counts(normalize=True), # swing, # z_swing, # chase, # contact, # z_con, # o_con, # whiff, # swstr, # csw, # strike, # ball, # f_strike, # par, # zone, # glove, # arm, # high, # low, # mm, # behind *[stat['expr'] for stat in STATS.values() if not stat['batted_ball']] ) .with_columns( # (pl.col('count')/pl.sum('count').over('pitId')).alias('usage'), # get_stat_val('Usage', 'expr'), (pl.col('count') >= min_pitches).alias('qualified'), ) .explode('batType') .unnest('batType') .pivot(on='batType', values='proportion') .fill_null(0) .with_columns( *[stat['expr'] for stat in STATS.values() if stat['batted_ball']] # (pl.col('G') + pl.col('B')).alias('GB%'), # (pl.col('F') + pl.col('P')).alias('FB%'), # pl.col('L').alias('LD%'), # pl.col('P').alias('IFFB%'), # pl.col('F').alias('OFFB%'), # (pl.col('F') + pl.col('P') + pl.col('L')).alias('AIR%') ) .drop('G', 'F', 'B', 'P', 'L', 'null') .with_columns( # (pl.when(pl.col('qualified')).then(pl.col(stat)).rank(descending=((stat in ['FB%', 'LD%', 'OFFB%', 'AIR%', 'Ball%', 'Behind%'] or 'Contact%' in 
stat)))/pl.when(pl.col('qualified')).then(pl.col(stat)).count()).alias(f'{stat}_pctl') # for stat in ['Avg KPH', 'Max KPH', 'Avg MPH', 'Max MPH', 'Swing%', 'Z-Swing%', 'Chase%', 'Contact%', 'Z-Contact%', 'O-Contact%', 'SwStr%', 'Whiff%', 'CSW%', 'Strike%', 'Ball%', 'F-Str%', 'PAR%', 'GB%', 'FB%', 'LD%', 'OFFB%', 'IFFB%', 'AIR%', 'Zone%', 'Behind%'] (pl.when(pl.col('qualified')).then(pl.col(stat)).rank(descending=(get_stat_val(stat, 'percentile')) not in (Player.PITCHER if pitching else Player.BATTER, Player.BOTH)) / pl.when(pl.col('qualified')).then(pl.col(stat)).count()).alias(f'{stat}_pctl') for stat in STATS.keys() ) .rename({pitch_col: 'ballKind_code', pitch_name_col: 'ballKind'} if pitch_class_type == 'general' else {}) .sort(id_cols[0], 'count', descending=[False, True]) ) return pitch_stats def compute_player_stats(data, player_type, qual='qualified', pitcher_lr='both', batter_lr='both', group_by_team=False): # TO-DO: figure out if I still need group_by_team assert pitcher_lr in ('both', 'l', 'r') assert batter_lr in ('both', 'l', 'r') assert player_type in ('pitcher', 'batter', 'team pitching', 'team batting') # pitching or batting, player or team pitching = player_type in ('pitcher', 'team pitching') team = player_type in ('team pitching', 'team batting') # handedness filters if pitcher_lr != 'both': data = data.filter(pl.col('pitLR') == pitcher_lr) if batter_lr != 'both': data = data.filter(pl.col('batLR') == batter_lr) if pitching: over_col = 'pitId' if not team else 'pitcher_team_name_short' else: over_col = 'batId' if not team else 'batter_team_name_short' data = ( compute_team_games(data) .with_columns( pl.when(pl.col('half_inning').str.ends_with('1')).then('home_games').otherwise('visitor_games').first().over('pitId').alias('games'), # pl.col('inning_code').unique().len().over(over_col).alias('IP'), (pl.col('bso').struct.field('o').cast(pl.Int32) - pl.col('beforeBso').struct.field('o').cast(pl.Int32)).sum().mul(1/3).over(over_col).alias('IP'), 
pl.col('pa_code').unique().len().over(over_col).alias('PA'), pl.col('presult').is_in(verify_and_return_presult([ 'Single', 'Double', 'Triple', 'Home run', 'Inside-the-park home run', 'Groundout', 'Flyout', 'Lineout', 'Groundout (Double play)', 'Foul fly', 'Foul line (?)', 'Error', 'Sacrifice hit error', 'Sacrifice fly error', "Fielder's choice", 'Bunt strikeout', 'Swinging strikeout', 'Looking strikeout' ])).sum().over(over_col).alias('AB'), pl.len().over(over_col).alias('Pitches') # pl.col('presult').is_in(verify_and_return_presult([ # 'Groundout', 'Flyout', 'Lineout', 'Groundout (Double play)', # 'Foul fly', 'Foul line (?)', # 'Sacrifice bunt', 'Sacrifice fly', # "Fielder's choice", "Sacrifice fielder's choice", # 'Bunt strikeout', 'Swinging strikeout', 'Looking strikeout' # ])).sum().over('pitId').mul(1/3).alias('IP') ) ) # qualifiers qualified_factor = 1 if pitching else 3.1 qual_col = 'IP' if pitching else 'PA' if qual == 'qualified': data = data.with_columns((pl.col(qual_col) >= qualified_factor * pl.col('games')).alias('qualified')) else: data = data.with_columns((pl.col(qual_col) >= qual).alias('qualified')) # percentile ascending/descending # if pitching: # stat_descending_pctl = lambda stat: stat in ['BB%', 'Ball%', 'FB%', 'LD%', 'OFFB%', 'AIR%', 'Z-Swing%', 'Behind%', 'OBP'] or 'Contact%' in stat # else: # stat_descending_pctl = lambda stat: not (stat in ['BB%', 'Ball%', 'FB%', 'LD%', 'OFFB%', 'AIR%', 'Swing%', 'Z-Swing%', 'Behind%', 'OBP'] or 'Contact%' in stat) # col names match player_type: case 'pitcher': id_cols = ['pitId'] name_col = 'pitcher_name' case 'batter': id_cols = ['batId'] name_col = 'batter_name' case _: id_cols = [] name_col = None team_col = 'pitcher_team_name_short' if pitching else 'batter_team_name_short' if group_by_team or team: id_cols.append(team_col) handedness_col = 'pitLR' if pitching else 'batLR' new_handedness_col = 'Throws' if pitching else 'Bats' player_stats = ( data .with_columns( (pl.col('ballSpeed') / 
1.609).round(1).alias('mph'), pl.when(pl.col('general_ballKind_code').is_in(['4S', 'FC', 'SI'])).then(pl.when(valid_pitch).then('ballSpeed').mean().over(over_col, 'general_ballKind_code')).mul(1/1.609).round(1).alias('FB Velo') ) .group_by(id_cols) .agg( *([pl.col(name_col).first()] if not team else []), *([] if group_by_team or team else [pl.col(team_col).last()]), *( [pl.col(handedness_col).first().str.to_uppercase().alias(new_handedness_col) ] if not (team and ((pitcher_lr == 'both') if pitching else (batter_lr == 'both'))) else [] ), pl.col('IP').first(), pl.col('PA').first(), # pl.col('FB Velo').max(), # (pl.when(pl.col('presult').str.contains('strikeout')).then(1).otherwise(0).sum() / pl.col('pa_code').unique().len()).alias('K%'), # (pl.when(pl.col('presult') == 'Walk').then(1).otherwise(0).sum() / pl.col('pa_code').unique().len()).alias('BB%'), pl.col('aux_bresult').struct.field('batType').drop_nulls().value_counts(normalize=True), # swing, # z_swing, # chase, # contact, # z_con, # o_con, # whiff, # swstr, # csw, # strike, # ball, # f_strike, # par, # zone, # glove, # arm, # high, # low, # mm, # behind, # pl.col('AB').first(), # h, # bb, # hbp, # sf, # obp, # pl.first('qualified') pl.first('qualified'), *[stat['expr'] for stat in STATS.values() if not stat['batted_ball']] ) .explode('batType') .unnest('batType') .pivot(on='batType', values='proportion') .fill_null(0) .with_columns( *[stat['expr'] for stat in STATS.values() if stat['batted_ball']] # (pl.col('G') + pl.col('B')).alias('GB%'), # (pl.col('F') + pl.col('P')).alias('FB%'), # pl.col('L').alias('LD%'), # pl.col('P').alias('IFFB%'), # pl.col('F').alias('OFFB%'), # (pl.col('F') + pl.col('P') + pl.col('L')).alias('AIR%') ) .drop('G', 'F', 'B', 'P', 'L') .with_columns( # (pl.when(pl.col('qualified')).then(pl.col(stat)).rank(descending=stat_descending_pctl(stat))/pl.when(pl.col('qualified')).then(pl.col(stat)).count()).alias(f'{stat}_pctl') # for stat in ['FB Velo', 'K%', 'BB%', 'Swing%', 'Z-Swing%', 
'Chase%', 'Contact%', 'Z-Contact%', 'O-Contact%', 'SwStr%', 'Whiff%', 'CSW%', 'Strike%', 'Ball%', 'F-Str%', 'PAR%', 'GB%', 'FB%', 'LD%', 'OFFB%', 'IFFB%', 'AIR%', 'Zone%', 'Behind%', 'OBP'] (pl.when(pl.col('qualified')).then(pl.col(stat)).rank(descending=(get_stat_val(stat, 'percentile')) not in (Player.PITCHER if pitching else Player.BATTER, Player.BOTH)) / pl.when(pl.col('qualified')).then(pl.col(stat)).count()).alias(f'{stat}_pctl') for stat in STATS.keys() ) .sort(qual_col, descending=True) ) return player_stats def get_pitcher_stats(id, lr='both', game_kind=None, start_date=None, end_date=None, min_ip=1, min_pitches=1, pitch_class_type='specific'): source_data = data_df source_data = filter_data_by_date_and_game_kind(source_data, start_date=start_date, end_date=end_date, game_kind=game_kind) # if lr is not None: # source_data = pitch_stats = compute_pitch_stats(source_data, player_type='pitcher', pitch_class_type=pitch_class_type, min_pitches=min_pitches, batter_lr=lr, group_by_team=False).filter(pl.col('pitId') == id) pitch_shapes = ( (source_data.filter(pl.col('batLR') == lr) if lr != 'both' else source_data) .filter( (pl.col('pitId') == id) & pl.col('x').is_not_null() & pl.col('y').is_not_null() & (pl.col('ballSpeed') > 0) ) [['pitId', 'general_ballKind_code', 'ballKind_code', 'ballSpeed', 'x', 'y']] .with_columns((pl.col('ballSpeed')/1.609).alias('ballSpeed_mph')) ) pitcher_stats = compute_player_stats(source_data, player_type='pitcher', qual=min_ip, batter_lr=lr, group_by_team=False).filter(pl.col('pitId') == id) return SimpleNamespace(pitcher_stats=pitcher_stats, pitch_stats=pitch_stats, pitch_shapes=pitch_shapes)