Spaces:
Runtime error
Runtime error
| """ | |
| @author: Caglar Aytekin | |
| contact: caglar@deepcause.ai | |
| """ | |
| import torch | |
| import torch.nn as nn | |
| import random | |
| import numpy as np | |
| import pandas as pd | |
| import copy | |
| class CustomEncodingFunction(torch.autograd.Function): | |
| def forward(ctx, x, tau,alpha): | |
| ctx.save_for_backward(x, tau) | |
| # Perform the tanh operation on (x + tau) | |
| y = torch.tanh(x + tau) | |
| # The actual forward output : binarized output | |
| forward_output = alpha * (2 * torch.round((y + 1) / 2) - 1) + (1-alpha)*y | |
| return forward_output | |
| def backward(ctx, grad_output): | |
| x, tau = ctx.saved_tensors | |
| # Use the derivative of tanh for the backward pass: 1 - tanh^2(x + tau) | |
| grad_input = grad_output * (1 - torch.tanh(x + tau) ** 2) | |
| return grad_input, grad_input,None # Assuming tau also requires gradient | |
| # Wrapping the custom function in a nn.Module for easier use | |
| class EncodingLayer(nn.Module): | |
| def __init__(self): | |
| super(EncodingLayer, self).__init__() | |
| def forward(self, x, tau,alpha): | |
| return CustomEncodingFunction.apply(x, tau,alpha) | |
| class LEURN(nn.Module): | |
| def __init__(self, preprocessor,depth,droprate): | |
| """ | |
| Initializes the model. | |
| Parameters: | |
| - preprocessor: A class containing useful info about the dataset | |
| - Including: attribute names, categorical features details, suggested embedding size for each category, output type, output dimension, transformation information | |
| - depth: Depth of the network | |
| - droprate: dropout rate | |
| """ | |
| super(LEURN, self).__init__() | |
| #Find categorical indices and category numbers for each | |
| self.alpha=1.0 | |
| self.preprocessor=preprocessor | |
| self.attribute_names=preprocessor.attribute_names | |
| self.label_encoders=preprocessor.encoders_for_nn | |
| self.categorical_indices = [info[0] for info in preprocessor.category_details] | |
| self.num_categories = [info[1] for info in preprocessor.category_details] | |
| #If embedding_size is integer, cast it to all categories | |
| if isinstance(preprocessor.suggested_embeddings, int): | |
| embedding_sizes = [preprocessor.suggested_embeddings] * len(self.categorical_indices) | |
| else: | |
| assert len(preprocessor.suggested_embeddings) == len(self.categorical_indices), "Length of embedding_size must match number of categorical features" | |
| embedding_sizes = preprocessor.suggested_embeddings | |
| self.embedding_sizes=embedding_sizes | |
| #Embedding layers for categorical features | |
| self.embeddings = nn.ModuleList([ | |
| nn.Embedding(num_categories, embedding_dim) | |
| for num_categories, embedding_dim in zip(self.num_categories, embedding_sizes) | |
| ]) | |
| for embedding_now in self.embeddings: | |
| nn.init.uniform_(embedding_now.weight, -1.0, 1.0) | |
| self.total_embedding_size = sum(embedding_sizes) #number of categorical features for NN | |
| self.non_cat_input_dim = len(self.attribute_names) - len(self.categorical_indices) #Number of numerical features for NN | |
| self.nn_input_dim = self.total_embedding_size + self.non_cat_input_dim #Number of features for NN | |
| #LAYERS | |
| self.tau_initial = nn.Parameter(torch.zeros(1,self.nn_input_dim)) # Initial tau as a learnable parameter | |
| self.layers = nn.ModuleList() | |
| self.depth = depth | |
| self.output_type=preprocessor.output_type | |
| for d_now in range(depth): | |
| # Each iteration adds an encoding layer followed by a dropout and then a linear layer | |
| self.layers.append(EncodingLayer()) | |
| self.layers.append(nn.Dropout1d(droprate)) | |
| linear_layer = nn.Linear((d_now + 1) * self.nn_input_dim, self.nn_input_dim) | |
| self._init_weights(linear_layer,d_now+1) #special layer initialization | |
| self.layers.append(linear_layer) | |
| # Final stage: dropout and linear layer | |
| self.final_dropout=nn.Dropout1d(droprate) | |
| self.final_linear = nn.Linear(depth * self.nn_input_dim, self.preprocessor.output_dim) | |
| self._init_weights(self.final_linear, depth) | |
| def set_alpha(self, alpha): | |
| """Method to update the dynamic parameter.""" | |
| self.alpha = alpha | |
| def _init_weights(self, layer,depth_now): | |
| # Custom initialization | |
| # Considering the binary (-1,1) nature of the input, | |
| # when we initialize layer in (-1/dim,1/dim) range, output is bounded at (-1,1) | |
| # Knowing our input is roughly at (-1,1) range, this serves as good initialization for tau | |
| if not(self.embedding_sizes==[]): | |
| init_tensor = torch.tensor([1/size for size in self.embedding_sizes for _ in range(size)]) | |
| if init_tensor.shape[0]<self.nn_input_dim: #Means we have numericals too | |
| init_tensor=torch.cat((init_tensor, torch.ones(self.non_cat_input_dim)), dim=0) | |
| else: | |
| init_tensor = torch.ones(self.non_cat_input_dim) | |
| init_tensor=init_tensor/((depth_now+1)*torch.tensor(len(self.attribute_names))) | |
| init_tensor=init_tensor.unsqueeze(0).repeat_interleave(repeats=layer.weight.shape[0],dim=0).repeat_interleave(repeats=depth_now,dim=1) | |
| layer.weight.data.uniform_(-1, 1) | |
| layer.weight=torch.nn.Parameter(layer.weight*init_tensor) | |
| def forward(self, x): | |
| # Defines forward function for provided input: Normalizes numericals, embeds categoricals, and gives to neural network. | |
| # Separate categorical and numerical features for easier handling | |
| cat_features = [x[:, i].long() for i in self.categorical_indices] | |
| non_cat_features = [x[:, i] for i in range(x.size(1)) if i not in self.categorical_indices] | |
| non_cat_features = torch.stack(non_cat_features, dim=1) if non_cat_features else x.new_empty(x.size(0), 0) | |
| # Embed categoricals | |
| embedded_features = [embedding(cat_feature) for embedding, cat_feature in zip(self.embeddings, cat_features)] | |
| # Combine categoricals and numericals | |
| try: | |
| embedded_features = torch.cat(embedded_features, dim=1) | |
| nninput = torch.cat([embedded_features, non_cat_features], dim=1) | |
| except: | |
| nninput=non_cat_features | |
| self.nninput=nninput | |
| # Forward pass neural network | |
| output=self.forward_from_embeddings(self.nninput) | |
| self.output=output | |
| return output | |
| def forward_from_embeddings(self,x): | |
| # Forward function for normalized numericals and embedded categoricals | |
| tau=self.tau_initial | |
| tau=torch.repeat_interleave(tau,x.shape[0],0) #tau is 1xF, cast it for batch | |
| # For each depth | |
| for i in range(0, self.depth * 3, 3): | |
| # encode, drop and find next tau | |
| encoding_layer = self.layers[i] | |
| dropout_layer = self.layers[i + 1] | |
| linear_layer = self.layers[i + 2] | |
| #encode and drop | |
| encoded_x =dropout_layer( encoding_layer(x, tau,self.alpha)) | |
| #save encodings and thresholds | |
| #notice that threshold is -tau, not tau since we binarize x+tau | |
| if i==0: | |
| encodings=encoded_x | |
| taus=-tau | |
| else: | |
| encodings=torch.cat((encodings,encoded_x),dim=-1) | |
| taus=torch.cat((taus,-tau),dim=-1) | |
| #find next thresholds | |
| tau = linear_layer(encodings) #not used, redundant for last layer | |
| self.encodings=encodings | |
| self.taus=taus | |
| #Final layer: drop and linear | |
| output=self.final_linear(self.final_dropout(encodings)) | |
| return output | |
| def find_boundaries(self, x): | |
| """ | |
| Given input, find boundaries for numerical features and valid categories for categorical features | |
| Can accept unnormalized and not embedded input - set embedding False | |
| """ | |
| # Ensure x is the correct shape [1, input_dim] | |
| if x.ndim == 1: | |
| x = x.unsqueeze(0) # Add batch dimension if not present | |
| # Perform a forward pass to update self.encodings and self.taus | |
| # to update self.taus | |
| self(x) | |
| # self.taus has the shape [1, depth * input_dim] | |
| # reshape to [depth, input_dim] for easier boundary finding | |
| taus_reshaped = self.taus.view(self.depth, self.nn_input_dim) | |
| # embedded and normalized input | |
| embedded_x=self.nninput | |
| # Initialize boundaries - numericals are in (-1,1) range and categoricals are from embeddings. | |
| # So -100,100 is safe min and max. -inf,+inf is not chosen since problematic for later sampling | |
| upper_boundaries = torch.full((embedded_x.size(1),), 100.0) | |
| lower_boundaries = torch.full((embedded_x.size(1),), -100.0) | |
| # Compare each threshold in self.taus with the corresponding input value | |
| for feature_index in range(self.nn_input_dim): | |
| for depth_index in range(self.depth): | |
| threshold = taus_reshaped[depth_index, feature_index] | |
| input_value = embedded_x[0, feature_index] | |
| # If the threshold is greater than the input value and less than the current upper boundary, update the upper boundary | |
| if threshold > input_value and threshold < upper_boundaries[feature_index]: | |
| upper_boundaries[feature_index] = threshold | |
| # If the threshold is less than the input value and greater than the current lower boundary, update the lower boundary | |
| if threshold < input_value and threshold > lower_boundaries[feature_index]: | |
| lower_boundaries[feature_index] = threshold | |
| # Convert boundaries to a list of tuples [(lower, upper), ...] for each feature | |
| boundaries = list(zip(lower_boundaries.tolist(), upper_boundaries.tolist())) | |
| self.upper_boundaries=upper_boundaries | |
| self.lower_boundaries=lower_boundaries | |
| return boundaries | |
| def categories_within_boundaries(self): | |
| """ | |
| For each categorical feature, checks if embedding weights fall within the specified upper and lower boundaries. | |
| Returns a dictionary with categorical feature indices as keys and lists of category indices that fall within the boundaries. | |
| """ | |
| categories_within_bounds = {} | |
| emb_st=0 | |
| for cat_index, emb_layer in zip(range(len(self.categorical_indices)), self.embeddings): | |
| # Extract upper and lower boundaries for this categorical feature | |
| lower_bound=self.lower_boundaries[emb_st:emb_st+self.embedding_sizes[cat_index]] | |
| upper_bound=self.upper_boundaries[emb_st:emb_st+self.embedding_sizes[cat_index]] | |
| emb_st=emb_st+self.embedding_sizes[cat_index] | |
| # Initialize list to hold categories that fall within boundaries | |
| categories_within = [] | |
| # Iterate over each embedding vector in the layer | |
| for i, weight in enumerate(emb_layer.weight): | |
| # Check if the embedding weight falls within the boundaries | |
| if torch.all(weight >= lower_bound) and torch.all(weight <= upper_bound): | |
| categories_within.append(i) # Using index i as category identifier | |
| # Store the categories that fall within the boundaries for this feature | |
| categories_within_bounds[cat_index] = categories_within | |
| return categories_within_bounds | |
| def global_importance(self): | |
| final_layer_weight=torch.clone(self.final_linear.weight).detach().numpy() | |
| importances=np.sum(np.abs(final_layer_weight),0) | |
| importances=importances.reshape(importances.shape[0]//self.nn_input_dim,self.nn_input_dim) | |
| importances=np.sum(importances,0) | |
| importances_features=[] | |
| st=0 | |
| for i in range(len(self.attribute_names)): | |
| try: | |
| importances_features.append(np.sum(importances[st:st+self.embedding_sizes[i]])) | |
| st=st+self.embedding_sizes[i] | |
| except: | |
| st=st+1 | |
| return np.argsort(importances_features)[::-1],np.sort(importances_features)[::-1] | |
| def influence_matrix(self): | |
| """ | |
| Finds ADG from how each feature effects other's threshold via weight matrices | |
| """ | |
| def create_block_sum_matrix(sizes, matrix): | |
| L = len(sizes) | |
| # Initialize the output matrix with zeros, using PyTorch | |
| block_sum_matrix = torch.zeros((L, L)) | |
| # Define the starting row and column indices for slicing | |
| start_row = 0 | |
| for i, row_size in enumerate(sizes): | |
| start_col = 0 | |
| for j, col_size in enumerate(sizes): | |
| # Calculate the sum of the current block using PyTorch | |
| block_sum = torch.sum(matrix[start_row:start_row+row_size, start_col:start_col+col_size]) | |
| block_sum_matrix[i, j] = block_sum | |
| # Update the starting column index for the next block in the row | |
| start_col += col_size | |
| # Update the starting row index for the next block in the column | |
| start_row += row_size | |
| return block_sum_matrix | |
| def add_ones_until_target(initial_list, target_sum): | |
| # Continue adding 1s until the sum of the list equals the target sum | |
| while sum(initial_list) < target_sum: | |
| initial_list.append(1) | |
| return initial_list | |
| for i in range(0, self.depth * 3, 3): | |
| # encode, drop and find next tau | |
| weight_now=self.layers[i + 2].weight | |
| weight_now_reshaped=weight_now.reshape((weight_now.shape[0], weight_now.shape[1]//self.nn_input_dim,self.nn_input_dim)) #shape: output x depth x input | |
| if i==0: | |
| # effects=np.sum(np.abs(weight_now_reshaped.numpy()),axis=1)/self.depth #shape: output x input | |
| effects=torch.sum(torch.abs(weight_now_reshaped), dim=1) / self.depth | |
| else: | |
| effects=effects+torch.sum(torch.abs(weight_now_reshaped), dim=1) / self.depth | |
| effects=effects.t() #shape: input x output | |
| modified_list = add_ones_until_target(copy.deepcopy(self.embedding_sizes), effects.shape[0]) | |
| effects=create_block_sum_matrix(modified_list,effects) | |
| return effects | |
| def explain_without_causal_effects(self,x): | |
| """ | |
| Explains decisions of the neural network for input sample. | |
| For numericals, extracts upper and lower boundaries on the sample | |
| For categoricals displays possible categories | |
| Also calculates contributions of each feature to final result | |
| """ | |
| self.find_boundaries(x) #find upper, lower boundaries for all nn inputs | |
| #find valid categories for categorical features | |
| valid_categories=self.categories_within_boundaries() | |
| #numerical boundaries | |
| upper_numerical=self.upper_boundaries[sum(self.embedding_sizes):].detach().numpy() | |
| lower_numerical=self.lower_boundaries[sum(self.embedding_sizes):].detach().numpy() | |
| #Find contribution from each feature in final linear layer, distribute bias evenly | |
| contributions=self.encodings * self.final_linear.weight + self.final_linear.bias.unsqueeze(dim=-1)/self.final_linear.weight.shape[1] | |
| contributions=contributions.detach().resize_((contributions.shape[0], contributions.shape[1]//self.nn_input_dim,self.nn_input_dim)) | |
| contributions=torch.sum(contributions,dim=1) | |
| # Initialize an empty list to store the summed contributions | |
| summed_contributions = [] | |
| # Initialize start index for slicing | |
| start_idx = 0 | |
| #Sum contribution of each categorical within respective embedding | |
| for size in self.embedding_sizes: | |
| # Calculate end index for the current chunk | |
| end_idx = start_idx + size | |
| # Sum the contributions in the current chunk | |
| chunk_sum = contributions[:, start_idx:end_idx].sum(dim=1, keepdim=True) | |
| # Append the summed chunk to the list | |
| summed_contributions.append(chunk_sum) | |
| # Update the start index for the next chunk | |
| start_idx = end_idx | |
| # If there are remaining elements not covered by embedding_sizes, add them as is (numerical features) | |
| if start_idx < contributions.shape[1]: | |
| remaining = contributions[:, start_idx:] | |
| summed_contributions.append(remaining) | |
| # Concatenate the summed contributions back into a tensor | |
| summed_contributions = torch.cat(summed_contributions, dim=1) | |
| # This is to handle multi-class explanations, for binary this is 0 automatically | |
| # Note: multi-output regression is not supported yet. This will just return largest regressed value's explanations | |
| highest_index=torch.argmax(summed_contributions.sum(dim=1)) | |
| # This is contribution from each feature | |
| result=summed_contributions[highest_index] | |
| self.result=result | |
| #Explanation and Contribution formats are in ordered format (categoricals first, numericals later) | |
| #Bring them to original format in user input | |
| #Combine categoricals and numericals explanations and contributions | |
| Explanation = [None] * (len(self.categorical_indices) + len(upper_numerical)) | |
| Contribution = np.zeros((len(self.categorical_indices) + len(upper_numerical),)) | |
| # Fill in the categorical samples | |
| for j, cat_index in enumerate(self.categorical_indices): | |
| Explanation[cat_index] = valid_categories[j] | |
| Contribution[cat_index] = result[j].numpy() | |
| #INVERSE TRANSFORM PART 1------------------------------------------------------------------------------------------- | |
| #Inverse transform upper and lower_numericals | |
| len_num=len(upper_numerical) | |
| if len_num>0: | |
| upper_numerical=self.preprocessor.scaler.inverse_transform(upper_numerical.reshape(1,-1)) | |
| lower_numerical=self.preprocessor.scaler.inverse_transform(lower_numerical.reshape(1,-1)) | |
| if len_num>1: | |
| upper_numerical=np.squeeze(upper_numerical) | |
| lower_numerical=np.squeeze(lower_numerical) | |
| upper_iter = iter(upper_numerical) | |
| lower_iter = iter(lower_numerical) | |
| cnt=0 | |
| for i in range(len(Explanation)): | |
| if Explanation[i] is None: | |
| #Note the denormalization here | |
| Explanation[i] = next(lower_iter),next(upper_iter) | |
| if len(self.categorical_indices)>0: | |
| Contribution[i] = result[j+cnt+1].numpy() | |
| else: | |
| Contribution[i] = result[cnt].numpy() | |
| cnt=cnt+1 | |
| attribute_names_list = [] | |
| revised_explanations_list = [] | |
| contributions_list = [] | |
| # Process each feature to fill lists | |
| for idx, attr_name in enumerate(self.attribute_names): | |
| if isinstance(Explanation[idx], list): # Categorical | |
| #INVERSE TRANSFORM PART 2------------------------------------------------------------------------------------------- | |
| #Inverse transform categoricals | |
| category_names = [key for key, value in self.label_encoders[attr_name].items() if value in Explanation[idx]] | |
| revised_explanation = " ,OR, ".join(category_names) | |
| elif isinstance(Explanation[idx], tuple): # Numerical | |
| revised_explanation = f"{Explanation[idx][0].item()} to {Explanation[idx][1].item()}" | |
| else: | |
| revised_explanation = "Unknown" #shouldn't really happen | |
| # Append to lists | |
| attribute_names_list.append(attr_name) | |
| revised_explanations_list.append(revised_explanation) | |
| contributions_list.append(Contribution[idx] if idx < len(Contribution) else None) | |
| # Construct DataFrame | |
| Explanation_df = pd.DataFrame({ | |
| 'Name': attribute_names_list, | |
| 'Category': revised_explanations_list, | |
| 'Contribution': contributions_list | |
| }) | |
| result=self.preprocessor.inverse_transform_y(self.output) | |
| # Explanation_df['Result'] = [result] * len(Explanation_df) | |
| return copy.deepcopy(Explanation_df),self.output.clone(),copy.deepcopy(result),copy.deepcopy(Explanation) | |
| def explain(self,x,include_causal_analysis=False): | |
| """ | |
| Fixes all features but one, sweeps that feature across its own categories, reports the average change from other categories to current one. | |
| """ | |
| def update_intervals(available_intervals, incoming_interval): | |
| updated_intervals = [] | |
| for interval in available_intervals: | |
| if incoming_interval[1] <= interval[0] or incoming_interval[0] >= interval[1]: | |
| # The incoming interval does not overlap, keep the interval as is | |
| updated_intervals.append(interval) | |
| else: | |
| # There is some overlap, possibly split the interval | |
| if incoming_interval[0] > interval[0]: | |
| # Add the left part that doesn't overlap | |
| updated_intervals.append((interval[0], incoming_interval[0])) | |
| if incoming_interval[1] < interval[1]: | |
| # Add the right part that doesn't overlap | |
| updated_intervals.append((incoming_interval[1], interval[1])) | |
| return updated_intervals | |
| def sample_from_intervals(available_intervals): | |
| if not available_intervals: | |
| return None | |
| # Choose a random interval | |
| chosen_interval = random.choice(available_intervals) | |
| # Sample a random point within this interval | |
| return random.uniform(chosen_interval[0], chosen_interval[1]) | |
| Explanation_df,output,result,Explanation=self.explain_without_causal_effects(x) | |
| if include_causal_analysis: | |
| # Causal analysis | |
| causal_effect=np.zeros((x.shape[-1],)) | |
| numerical_cnt=0 | |
| for idx, attr_name in enumerate(self.attribute_names): | |
| if isinstance(Explanation[idx], list): # Categorical | |
| all_category_names = [value for key, value in self.label_encoders[attr_name].items()] | |
| sweeped_category_names = [value for key, value in self.label_encoders[attr_name].items() if value in Explanation[idx]] | |
| if list(set(all_category_names)-set(sweeped_category_names)) == []: | |
| is_category_empty=True | |
| else: | |
| is_category_empty=False | |
| cnt=0 | |
| while is_category_empty==False: | |
| new_x=x.clone() | |
| next_category=list(set(all_category_names)-set(sweeped_category_names))[0] | |
| new_x[0,idx]=float(next_category) | |
| Explanation_df_new,output_new,result_new,Explanation_new=self.explain_without_causal_effects(new_x) | |
| sweeped_category_names = sweeped_category_names+[value for key, value in self.label_encoders[attr_name].items() if value in Explanation_new[idx]] | |
| if list(set(all_category_names)-set(sweeped_category_names)) == []: | |
| is_category_empty=True | |
| else: | |
| is_category_empty=False | |
| causal_effect[idx]=causal_effect[idx]+(output-output_new).detach().numpy()[0,0] | |
| cnt=cnt+1 | |
| if cnt>0: | |
| causal_effect[idx]=causal_effect[idx]/cnt | |
| else: | |
| search_complete=False | |
| # Initial available interval . we know -100,100 from initial setting up lower, upper bounds | |
| available_intervals = [(-100, 100)] | |
| # Example incoming intervals | |
| #numerical boundaries | |
| self.explain_without_causal_effects(x) | |
| upper_numerical=self.upper_boundaries[sum(self.embedding_sizes):].detach().numpy() | |
| lower_numerical=self.lower_boundaries[sum(self.embedding_sizes):].detach().numpy() | |
| incoming_interval = (lower_numerical[numerical_cnt],upper_numerical[numerical_cnt]) | |
| available_intervals = update_intervals(available_intervals, incoming_interval) | |
| cnt=0 | |
| while not(search_complete): | |
| new_sample=sample_from_intervals(available_intervals) | |
| new_x=x.clone() | |
| new_x[0,idx]=new_sample | |
| Explanation_df_new,output_new,result_new,Explanation_new=self.explain_without_causal_effects(new_x) | |
| causal_effect[idx]=causal_effect[idx]+(output-output_new).detach().numpy()[0,0] | |
| cnt=cnt+1 | |
| upper_numerical=self.upper_boundaries[sum(self.embedding_sizes):].detach().numpy() | |
| lower_numerical=self.lower_boundaries[sum(self.embedding_sizes):].detach().numpy() | |
| incoming_interval = (lower_numerical[numerical_cnt],upper_numerical[numerical_cnt]) | |
| available_intervals = update_intervals(available_intervals, incoming_interval) | |
| if available_intervals == []: | |
| search_complete=True | |
| if cnt>0: | |
| causal_effect[idx]=causal_effect[idx]/cnt | |
| numerical_cnt=numerical_cnt+1 | |
| Explanation_df['Causal Effects'] = causal_effect | |
| return Explanation_df,output,result | |
| def sample_from_boundaries(self): | |
| """ | |
| Assumes higher and lower boundaries are already extracted (eg self.explain is run on one input already) | |
| Samples a value for each feature within the specified upper and lower boundaries stored in the class instance. | |
| For numericals, samples a value, for categoricals samples a category from possible categories | |
| Returns: | |
| - A tensor containing sampled values within the given boundaries for each feature. | |
| """ | |
| #First sample from categories | |
| categories_within_bounds=self.categories_within_boundaries() | |
| try: | |
| sampled_indices = [random.choice(categories) for categories in categories_within_bounds.values()] | |
| except: | |
| categories_within_bounds=self.categories_within_boundaries() | |
| #Then from numericals | |
| samples = [] | |
| cnt=0 | |
| for lower, upper in zip(self.lower_boundaries[sum(self.embedding_sizes):], self.upper_boundaries[sum(self.embedding_sizes):]): | |
| # Sample from a uniform distribution between lower and upper boundaries | |
| upper=torch.minimum(upper.detach(),torch.from_numpy(np.array(1.0).astype('float32'))) | |
| lower=torch.maximum(lower.detach(),torch.from_numpy(np.array(-1.0).astype('float32'))) | |
| sample = lower + (upper - lower) * torch.rand(1) | |
| samples.append(sample) | |
| cnt=cnt+1 | |
| #Combine categoricals and numericals | |
| # Initialize an empty list to hold the combined samples | |
| combined_samples = [None] * (len(self.categorical_indices) + len(samples)) | |
| # Fill in the categorical samples | |
| for i, cat_index in enumerate(self.categorical_indices): | |
| combined_samples[cat_index] = torch.tensor([sampled_indices[i]], dtype=torch.float) | |
| # Fill in the numerical samples | |
| num_samples_iter = iter(samples) | |
| for i in range(len(combined_samples)): | |
| if combined_samples[i] is None: | |
| combined_samples[i] = next(num_samples_iter) | |
| # Combine into a single tensor | |
| combined_tensor = torch.cat(combined_samples, dim=-1) | |
| return combined_tensor.unsqueeze(dim=0) | |
| def generate(self): | |
| """ | |
| Generates a data sample from learned network | |
| """ | |
| def sample_with_tau(tau,max_bound,min_bound): | |
| # Sample according to tau, lower and upper bounds | |
| sampled=torch.zeros((self.nn_input_dim)) | |
| st=0 | |
| # Randomly pick from valid categories | |
| for embedding in self.embeddings: | |
| categories_within = [] | |
| # Iterate over each embedding vector in the layer | |
| for i, weight in enumerate(embedding.weight): | |
| # Check if the embedding weight falls within the boundaries | |
| if torch.all(weight >= min_bound[st:st+embedding.weight.shape[-1]]) and torch.all(weight <= max_bound[st:st+embedding.weight.shape[-1]]): | |
| categories_within.append(i) # Using index i as category identifier | |
| feature_now=embedding.weight[np.random.choice(categories_within),:] | |
| cnt=0 | |
| for j in range(st,st+embedding.weight.shape[-1]): | |
| if feature_now[cnt]>-tau[0,j]: | |
| sampled[j]=1.0 | |
| elif feature_now[cnt]<=-tau[0,j]: | |
| sampled[j]=-1.0 | |
| cnt=cnt+1 | |
| st=st+embedding.weight.shape[-1] | |
| #Randomly sample for numericals | |
| for i in range(st,self.nn_input_dim): | |
| if -tau[0,i]>max_bound[i]: #In this case you have to pick -1 | |
| sampled[i]=-1.0 | |
| elif -tau[0,i]<=min_bound[i]: #In this case you have to pick 1 | |
| sampled[i]=1.0 | |
| else: | |
| sampled[i] = (torch.randint(low=0, high=2, size=(1,)) * 2 - 1).float() | |
| return sampled | |
| def bound_update(tau,max_bound,min_bound,sampled): | |
| for i in range(self.nn_input_dim): | |
| if sampled[i]>0: #means input is larger than -tau, so -tau might set a lower bound | |
| if -tau[0,i]>min_bound[i]: | |
| min_bound[i]=-tau[0,i] | |
| elif sampled[i]<=0: #means input is smaller than -tau, so -tau might set an upper bound | |
| if -tau[0,i]<max_bound[i]: | |
| max_bound[i]=-tau[0,i] | |
| return max_bound,min_bound | |
| # Read first tau | |
| tau=self.tau_initial | |
| # Set initial maximum and minimum bounds | |
| max_bound=torch.zeros((self.nn_input_dim))+100.0 | |
| min_bound=torch.zeros((self.nn_input_dim))-100.0 | |
| for i in range(0, self.depth * 3, 3): | |
| encoding_layer = self.layers[i] #NOT USED HERE, WE ENCODE RANDOMLY MANUALLY | |
| dropout_layer = self.layers[i + 1] | |
| linear_layer = self.layers[i + 2] | |
| #Sample with current tau | |
| sample_now=sample_with_tau(tau,max_bound,min_bound) | |
| #Update bounds with new sample | |
| max_bound,min_bound=bound_update(tau,max_bound,min_bound,sample_now) | |
| encoded_x = dropout_layer(sample_now.unsqueeze(dim=0)) | |
| if i==0: | |
| encodings=encoded_x | |
| taus=-tau | |
| else: | |
| encodings=torch.cat((encodings,encoded_x),dim=-1) | |
| taus=torch.cat((taus,-tau),dim=-1) | |
| tau = linear_layer(encodings) #not used for last layer | |
| self.encodings=encodings | |
| self.taus=taus | |
| self.upper_boundaries=torch.clone(max_bound) | |
| self.lower_boundaries=torch.clone(min_bound) | |
| generated_sample=self.sample_from_boundaries() | |
| ##Check if manually found and network generated boundaries are same | |
| # if torch.equal(self.upper_boundaries,max_bound) and torch.equal(self.lower_boundaries,min_bound): | |
| # print(True) | |
| self.explain_without_causal_effects(generated_sample) | |
| generated_sample_original_format=self.preprocessor.inverse_transform_X(generated_sample) | |
| result=self.preprocessor.inverse_transform_y(self.output) | |
| generated_sample_original_format['prediction']=result | |
| return generated_sample,generated_sample_original_format,result | |
| def generate_from_same_category(self,x): | |
| self.explain_without_causal_effects(x) | |
| generated_sample=self.sample_from_boundaries() | |
| generated_sample_original_format=self.preprocessor.inverse_transform_X(generated_sample) | |
| result=self.preprocessor.inverse_transform_y(self.output) | |
| return generated_sample,generated_sample_original_format,result | |