"""Convert simulator transaction logs and generated account lists into CSV files.

The column layout of every output file is described by a JSON schema that is
loaded by :class:`LogConverter` and interpreted by :class:`Schema`.
"""

import csv
import datetime
import json
import os
import shutil
import sys
from collections import Counter, defaultdict
from contextlib import ExitStack
from random import random

import numpy as np
from dateutil.parser import parse
from faker import Faker

from amlsim.account_data_type_lookup import AccountDataTypeLookup


def days_to_date(days):
    """Return the date ``days`` after 2017-01-01 formatted as ``YYYYMMDD``.

    NOTE: the origin is fixed at 2017-01-01; it does not follow the
    ``base_date`` handed to :class:`Schema`.

    :param days: Number of days from 2017-01-01
    :return: Date string in ``%Y%m%d`` format
    """
    date = datetime.datetime(2017, 1, 1) + datetime.timedelta(days=days)
    return date.strftime("%Y%m%d")


def get_simulator_name(csv_file):
    """Convert log file name to the simulator name

    :param csv_file: Transaction log file name
    :return: Simulator name (the first four "_"-separated elements)
    """
    elements = csv_file.split("_")
    return "_".join(elements[:4])


def get_name(acct_id):
    """Return the display name ``"Account<acct_id>"``."""
    return "Account" + str(acct_id)


def get_bank(acct_id):
    """Return the bank name ``"Bank<acct_id>"``."""
    return "Bank" + str(acct_id)


CASH_TYPES = {"CASH-IN", "CASH-OUT"}

# Marker returned when a column name has no synthetic (Faker) value.
_NO_VALUE = object()

# Schema "dataType" -> name of the Schema attribute that stores the column index.
_TX_INDEX_ATTRS = {
    "transaction_id": "tx_id_idx",
    "timestamp": "tx_time_idx",
    "amount": "tx_amount_idx",
    "transaction_type": "tx_type_idx",
    "orig_id": "tx_orig_idx",
    "dest_id": "tx_dest_idx",
    "sar_flag": "tx_sar_idx",
    "alert_id": "tx_alert_idx",
}
_ALERT_ACCT_INDEX_ATTRS = {
    "alert_id": "alert_acct_alert_idx",
    "alert_type": "alert_acct_reason_idx",
    "account_id": "alert_acct_id_idx",
    "account_name": "alert_acct_name_idx",
    "sar_flag": "alert_acct_sar_idx",
    "model_id": "alert_acct_model_idx",
    "schedule_id": "alert_acct_schedule_idx",
    "bank_id": "alert_acct_bank_idx",
}
_ALERT_TX_INDEX_ATTRS = {
    "alert_id": "alert_tx_id_idx",
    "alert_type": "alert_tx_type_idx",
    "sar_flag": "alert_tx_sar_idx",
    "transaction_id": "alert_tx_idx",
    "orig_id": "alert_tx_orig_idx",
    "dest_id": "alert_tx_dest_idx",
    "transaction_type": "alert_tx_tx_type_idx",
    "amount": "alert_tx_amount_idx",
    "timestamp": "alert_tx_time_idx",
}
_PARTY_IND_INDEX_ATTRS = {"party_id": "party_ind_id_idx"}
_PARTY_ORG_INDEX_ATTRS = {"party_id": "party_org_id_idx"}
_ACCT_PARTY_INDEX_ATTRS = {
    "mapping_id": "acct_party_mapping_idx",
    "account_id": "acct_party_acct_idx",
    "party_id": "acct_party_party_idx",
}
_PARTY_PARTY_INDEX_ATTRS = {
    "ref_id": "party_party_ref_idx",
    "first_id": "party_party_first_idx",
    "second_id": "party_party_second_idx",
}


def _open_csv(stack, path, mode):
    """Open ``path`` for the csv module and register it on ``stack`` for closing.

    ``newline=""`` is what the csv module documentation asks for, so that the
    writer's own line terminator is not translated a second time.
    """
    return stack.enter_context(open(path, mode, newline=""))


def _generate_us_address(us_gen):
    """Draw addresses from ``us_gen`` until one has a "City, ST ZIP" second line.

    :param us_gen: Faker generator providing ``address()``
    :return: Tuple ``(street_address, city, state, postcode)``
    """
    while True:
        lines = us_gen.address().split('\n')
        street_address = lines[0]
        locality = lines[1].split(', ')
        if len(locality) == 2:  # anything else is redrawn
            break
    city = locality[0]
    region = locality[1].split(' ')
    return street_address, city, region[0], region[1]


def _synthetic_value(us_gen, column_name, gender, address):
    """Return the generated value for a personal-data column.

    Only the generator call belonging to ``column_name`` is made, so the
    sequence of random draws depends on the schema column order alone.

    :param us_gen: Faker generator
    :param column_name: Schema column name
    :param gender: ``"Male"`` or ``"Female"``
    :param address: Tuple ``(street_address, city, state, postcode)``
    :return: The generated value, or ``_NO_VALUE`` if the column is not synthetic
    """
    street_address, city, state, postcode = address
    is_male = gender == "Male"
    if column_name == 'first_name':
        return us_gen.first_name_male() if is_male else us_gen.first_name_female()
    if column_name == 'last_name':
        return us_gen.last_name_male() if is_male else us_gen.last_name_female()
    if column_name == 'street_addr':
        return street_address
    if column_name == 'city':
        return city
    if column_name == 'state':
        return state
    if column_name == 'country':
        return "US"
    if column_name == 'zip':
        return postcode
    if column_name == 'gender':
        return gender
    if column_name == 'birth_date':
        return us_gen.date_of_birth()
    if column_name == 'ssn':
        return us_gen.ssn()
    if column_name == 'lat':
        return us_gen.latitude()
    if column_name == 'lon':
        return us_gen.longitude()
    return _NO_VALUE


class AMLTypology:
    """Suspicious transaction and account group
    """

    def __init__(self, reason):
        self.is_sar = False  # SAR flag
        self.main_acct = None  # Main account ID
        self.reason = reason  # Description of the SAR
        self.transactions = dict()  # Transaction ID, attributes
        self.members = set()  # Accounts involved in the alert transactions
        self.recorded_members = set()  # Accounts that have already been recorded. Avoid duplicates
        self.total_amount = 0.0  # Total transaction amount
        self.count = 0  # Number of transactions

    def add_member(self, member, is_sar):
        """Register ``member``; a SAR member also becomes the main account."""
        self.members.add(member)
        if is_sar:
            self.is_sar = True
            self.main_acct = member

    def add_tx(self, tx_id, amount, days, orig_acct, dest_acct, orig_name, dest_name, attr):
        """Store a transaction under ``tx_id`` and update the running totals."""
        self.transactions[tx_id] = (amount, days, orig_acct, dest_acct, orig_name, dest_name, attr)
        self.total_amount += amount
        self.count += 1

    def get_reason(self):
        """Return the description given at construction."""
        return self.reason

    def get_start_date(self):
        """Return the earliest transaction day formatted by :func:`days_to_date`."""
        min_days = min(tx[1] for tx in self.transactions.values())
        return days_to_date(min_days)

    def get_end_date(self):
        """Return the latest transaction day formatted by :func:`days_to_date`."""
        max_days = max(tx[1] for tx in self.transactions.values())
        return days_to_date(max_days)


class Schema:
    """Column layout of every output table, parsed from the schema JSON data.

    For each table the column names, default values and value types are kept
    in parallel lists; ``*_name2idx`` maps a column name to its position and
    the ``*_idx`` attributes hold the position of columns identified by their
    ``dataType``.
    """

    def __init__(self, data, base_date):
        """
        :param data: Parsed schema JSON (dict of table name -> list of columns)
        :param base_date: ``datetime`` that day offsets are counted from
        """
        self._base_date = base_date
        self.data = data

        # Account table
        self.acct_num_cols = None
        self.acct_names = list()
        self.acct_defaults = list()
        self.acct_types = list()
        self.acct_name2idx = dict()
        self.acct_id_idx = None
        self.acct_name_idx = None
        self.acct_balance_idx = None
        self.acct_start_idx = None
        self.acct_end_idx = None
        self.acct_sar_idx = None
        self.acct_model_idx = None
        self.acct_bank_idx = None

        # Transaction table
        self.tx_num_cols = None
        self.tx_names = list()
        self.tx_defaults = list()
        self.tx_types = list()
        self.tx_name2idx = dict()
        self.tx_id_idx = None
        self.tx_time_idx = None
        self.tx_amount_idx = None
        self.tx_type_idx = None
        self.tx_orig_idx = None
        self.tx_dest_idx = None
        self.tx_sar_idx = None
        self.tx_alert_idx = None

        # Alert member table
        self.alert_acct_num_cols = None
        self.alert_acct_names = list()
        self.alert_acct_defaults = list()
        self.alert_acct_types = list()
        self.alert_acct_name2idx = dict()
        self.alert_acct_alert_idx = None
        self.alert_acct_reason_idx = None
        self.alert_acct_id_idx = None
        self.alert_acct_name_idx = None
        self.alert_acct_sar_idx = None
        self.alert_acct_model_idx = None
        self.alert_acct_schedule_idx = None
        self.alert_acct_bank_idx = None

        # Alert transaction table
        self.alert_tx_num_cols = None
        self.alert_tx_names = list()
        self.alert_tx_defaults = list()
        self.alert_tx_types = list()
        self.alert_tx_name2idx = dict()
        self.alert_tx_id_idx = None
        self.alert_tx_type_idx = None
        self.alert_tx_sar_idx = None
        self.alert_tx_idx = None
        self.alert_tx_orig_idx = None
        self.alert_tx_dest_idx = None
        self.alert_tx_tx_type_idx = None
        self.alert_tx_amount_idx = None
        self.alert_tx_time_idx = None

        # Individual party table
        self.party_ind_num_cols = None
        self.party_ind_names = list()
        self.party_ind_defaults = list()
        self.party_ind_types = list()
        self.party_ind_name2idx = dict()
        self.party_ind_id_idx = None

        # Organization party table
        self.party_org_num_cols = None
        self.party_org_names = list()
        self.party_org_defaults = list()
        self.party_org_types = list()
        self.party_org_name2idx = dict()
        self.party_org_id_idx = None

        # Account-party mapping table
        self.acct_party_num_cols = None
        self.acct_party_names = list()
        self.acct_party_defaults = list()
        self.acct_party_types = list()
        self.acct_party_name2idx = dict()
        self.acct_party_mapping_idx = None
        self.acct_party_acct_idx = None
        self.acct_party_party_idx = None

        # Party-party (resolved entities) table
        self.party_party_num_cols = None
        self.party_party_names = list()
        self.party_party_defaults = list()
        self.party_party_types = list()
        self.party_party_name2idx = dict()
        self.party_party_ref_idx = None
        self.party_party_first_idx = None
        self.party_party_second_idx = None

        self._parse()

    def _load_columns(self, columns, names, defaults, types, name2idx=None, index_attrs=None):
        """Append one table's column definitions to its parallel lists.

        :param columns: List of column dicts (``name`` required; ``valueType``
            defaults to ``"string"``, ``defaultValue`` to ``""``)
        :param names: List receiving the column names
        :param defaults: List receiving the default values
        :param types: List receiving the value types
        :param name2idx: Optional dict receiving name -> position
        :param index_attrs: Optional mapping of ``dataType`` -> attribute name
            on ``self`` that receives the column position
        """
        index_attrs = index_attrs or {}
        for idx, col in enumerate(columns):
            name = col["name"]
            d_type = col.get("dataType")
            names.append(name)
            defaults.append(col.get("defaultValue", ""))
            types.append(col.get("valueType", "string"))
            if name2idx is not None:
                name2idx[name] = idx
            if isinstance(d_type, str) and d_type in index_attrs:
                setattr(self, index_attrs[d_type], idx)

    def _parse(self):
        """Read all table definitions from ``self.data``."""
        acct_data = self.data["account"]
        tx_data = self.data["transaction"]
        alert_tx_data = self.data["alert_tx"]
        alert_acct_data = self.data["alert_member"]
        party_ind_data = self.data["party_individual"]
        party_org_data = self.data["party_organization"]
        acct_party_data = self.data["account_mapping"]
        party_party_data = self.data["resolved_entities"]

        self.acct_num_cols = len(acct_data)
        self.tx_num_cols = len(tx_data)
        self.alert_tx_num_cols = len(alert_tx_data)
        self.alert_acct_num_cols = len(alert_acct_data)
        self.party_ind_num_cols = len(party_ind_data)
        self.party_org_num_cols = len(party_org_data)
        self.acct_party_num_cols = len(acct_party_data)
        self.party_party_num_cols = len(party_party_data)

        # Account list: only names, defaults and types are recorded here; the
        # account converter resolves columns from ``self.data`` directly.
        self._load_columns(acct_data, self.acct_names, self.acct_defaults, self.acct_types)

        # Transaction list
        self._load_columns(tx_data, self.tx_names, self.tx_defaults, self.tx_types,
                           self.tx_name2idx, _TX_INDEX_ATTRS)

        # Alert member list
        self._load_columns(alert_acct_data, self.alert_acct_names, self.alert_acct_defaults,
                           self.alert_acct_types, self.alert_acct_name2idx,
                           _ALERT_ACCT_INDEX_ATTRS)

        # Alert transaction list
        self._load_columns(alert_tx_data, self.alert_tx_names, self.alert_tx_defaults,
                           self.alert_tx_types, self.alert_tx_name2idx, _ALERT_TX_INDEX_ATTRS)

        # Individual party list
        self._load_columns(party_ind_data, self.party_ind_names, self.party_ind_defaults,
                           self.party_ind_types, self.party_ind_name2idx,
                           _PARTY_IND_INDEX_ATTRS)

        # Organization party list
        self._load_columns(party_org_data, self.party_org_names, self.party_org_defaults,
                           self.party_org_types, self.party_org_name2idx,
                           _PARTY_ORG_INDEX_ATTRS)

        # Account-Party list
        self._load_columns(acct_party_data, self.acct_party_names, self.acct_party_defaults,
                           self.acct_party_types, self.acct_party_name2idx,
                           _ACCT_PARTY_INDEX_ATTRS)

        # Party-Party list
        self._load_columns(party_party_data, self.party_party_names, self.party_party_defaults,
                           self.party_party_types, self.party_party_name2idx,
                           _PARTY_PARTY_INDEX_ATTRS)

    def days2date(self, _days):
        """Get date as ISO 8601 format from days from the "base_date".
        If failed, return an empty string.

        :param _days: Days from the "base_date"
        :return: Date as ISO 8601 format
        """
        try:
            num_days = int(_days)
        except (TypeError, ValueError):  # TypeError: e.g. a null default value
            return ""
        dt = self._base_date + datetime.timedelta(num_days)
        return dt.isoformat() + "Z"  # UTC

    def _build_row(self, defaults, required, name2idx, types, attr):
        """Assemble one output row.

        The row starts from ``defaults``; the ``required`` values are placed
        first, then any ``attr`` entry whose name is a column overrides its
        cell, and finally every ``"date"`` column is converted from a day
        offset with :meth:`days2date`.

        :param defaults: Default value per column
        :param required: Iterable of ``(column index, value)`` pairs
        :param name2idx: Column name -> position
        :param types: Value type per column
        :param attr: Extra named values
        :return: The row as a list
        """
        row = list(defaults)
        for idx, value in required:
            row[idx] = value
        for name, value in attr.items():
            idx = name2idx.get(name)
            if idx is not None:
                row[idx] = value
        for idx, v_type in enumerate(types):
            if v_type == "date":
                row[idx] = self.days2date(row[idx])  # convert days to date
        return row

    def get_tx_row(self, _tx_id, _timestamp, _amount, _tx_type, _orig, _dest, _is_sar, _alert_id, **attr):
        """Build a row of the transaction table."""
        required = (
            (self.tx_id_idx, _tx_id),
            (self.tx_time_idx, _timestamp),
            (self.tx_amount_idx, _amount),
            (self.tx_type_idx, _tx_type),
            (self.tx_orig_idx, _orig),
            (self.tx_dest_idx, _dest),
            (self.tx_sar_idx, _is_sar),
            (self.tx_alert_idx, _alert_id),
        )
        return self._build_row(self.tx_defaults, required, self.tx_name2idx, self.tx_types, attr)

    def get_alert_acct_row(self, _alert_id, _reason, _acct_id, _acct_name, _is_sar,
                           _model_id, _schedule_id, _bank_id, **attr):
        """Build a row of the alert member table."""
        required = (
            (self.alert_acct_alert_idx, _alert_id),
            (self.alert_acct_reason_idx, _reason),
            (self.alert_acct_id_idx, _acct_id),
            (self.alert_acct_name_idx, _acct_name),
            (self.alert_acct_sar_idx, _is_sar),
            (self.alert_acct_model_idx, _model_id),
            (self.alert_acct_schedule_idx, _schedule_id),
            (self.alert_acct_bank_idx, _bank_id),
        )
        return self._build_row(self.alert_acct_defaults, required, self.alert_acct_name2idx,
                               self.alert_acct_types, attr)

    def get_alert_tx_row(self, _alert_id, _alert_type, _is_sar, _tx_id, _orig, _dest,
                         _tx_type, _amount, _timestamp, **attr):
        """Build a row of the alert transaction table."""
        required = (
            (self.alert_tx_id_idx, _alert_id),
            (self.alert_tx_type_idx, _alert_type),
            (self.alert_tx_sar_idx, _is_sar),
            (self.alert_tx_idx, _tx_id),
            (self.alert_tx_orig_idx, _orig),
            (self.alert_tx_dest_idx, _dest),
            (self.alert_tx_tx_type_idx, _tx_type),
            (self.alert_tx_amount_idx, _amount),
            (self.alert_tx_time_idx, _timestamp),
        )
        return self._build_row(self.alert_tx_defaults, required, self.alert_tx_name2idx,
                               self.alert_tx_types, attr)

    def get_party_ind_row(self, _party_id, **attr):
        """Build a row of the individual party table."""
        required = ((self.party_ind_id_idx, _party_id),)
        return self._build_row(self.party_ind_defaults, required, self.party_ind_name2idx,
                               self.party_ind_types, attr)

    def get_party_org_row(self, _party_id, **attr):
        """Build a row of the organization party table."""
        required = ((self.party_org_id_idx, _party_id),)
        return self._build_row(self.party_org_defaults, required, self.party_org_name2idx,
                               self.party_org_types, attr)

    def get_acct_party_row(self, _mapping_id, _acct_id, _party_id, **attr):
        """Build a row of the account-party mapping table."""
        required = (
            (self.acct_party_mapping_idx, _mapping_id),
            (self.acct_party_acct_idx, _acct_id),
            (self.acct_party_party_idx, _party_id),
        )
        return self._build_row(self.acct_party_defaults, required, self.acct_party_name2idx,
                               self.acct_party_types, attr)

    def get_party_party_row(self, _ref_id, _first_id, _second_id, **attr):
        """Build a row of the party-party (resolved entities) table."""
        required = (
            (self.party_party_ref_idx, _ref_id),
            (self.party_party_first_idx, _first_id),
            (self.party_party_second_idx, _second_id),
        )
        return self._build_row(self.party_party_defaults, required, self.party_party_name2idx,
                               self.party_party_types, attr)


class LogConverter:
    """Convert the generated account/alert lists and the transaction log to CSV output.

    :meth:`convert_alert_members` fills ``self.reports``, which
    :meth:`convert_acct_tx` and :meth:`output_sar_cases` read afterwards.
    """

    def __init__(self, conf, sim_name=None, fake=None):
        """
        :param conf: Parsed configuration JSON
        :param sim_name: Simulation name; falls back to ``general.simulation_name``
            in ``conf`` and then to ``"sample"``
        :param fake: Faker instance indexable by ``'en_US'``; one is created
            when omitted
        """
        self.reports = dict()  # SAR ID and transaction subgraph
        self.org_types = dict()  # ID, organization type
        self.fake = fake if fake is not None else Faker(['en_US'])

        general_conf = conf.get('general', {})
        input_conf = conf.get('temporal', {})  # Input directory of this converter is temporal directory
        output_conf = conf.get('output', {})

        self.sim_name = sim_name if sim_name is not None else general_conf.get("simulation_name", "sample")
        print("Simulation name:", self.sim_name)

        self.input_dir = os.path.join(input_conf.get('directory', ''), self.sim_name)
        self.work_dir = os.path.join(output_conf.get('directory', ''), self.sim_name)
        os.makedirs(self.work_dir, exist_ok=True)

        param_dir = conf.get('input', {}).get('directory', '')
        schema_file = conf.get('input', {}).get('schema', '')
        base_date_str = general_conf.get('base_date', '2017-01-01')
        base_date = parse(base_date_str)

        json_file = os.path.join(param_dir, schema_file)
        with open(json_file, "r") as rf:
            data = json.load(rf)
            self.schema = Schema(data, base_date)

        # Input files
        self.log_file = os.path.join(self.work_dir, output_conf["transaction_log"])
        self.in_acct_file = input_conf["accounts"]  # Account list file from the transaction graph generator
        self.group_file = input_conf["alert_members"]  # Alert account list file from the transaction graph generator

        # Output files
        self.out_acct_file = output_conf["accounts"]  # All account list file
        self.tx_file = output_conf["transactions"]  # All transaction list file
        self.cash_tx_file = output_conf["cash_transactions"]  # Cash transaction list file
        self.sar_acct_file = output_conf["sar_accounts"]  # SAR account list file
        self.alert_tx_file = output_conf["alert_transactions"]  # Alert transaction list file
        self.alert_acct_file = output_conf["alert_members"]  # Alert account list file

        self.party_individual_file = output_conf["party_individuals"]
        self.party_organization_file = output_conf["party_organizations"]
        self.account_mapping_file = output_conf["account_mapping"]
        self.resolved_entities_file = output_conf["resolved_entities"]

        # Copy the diameter CSV file if it exists
        dia_log = output_conf["diameter_log"]
        src_dia_path = os.path.join(self.input_dir, dia_log)
        dst_dia_path = os.path.join(self.work_dir, dia_log)
        if os.path.exists(src_dia_path):
            shutil.copy(src_dia_path, dst_dia_path)

    def convert_acct_tx(self):
        """Write the account, party, mapping, transaction and alert transaction files.

        All input and output files are opened up front and closed together,
        also when a conversion step raises.
        """
        print("Convert transaction list from %s to %s, %s and %s" % (
            self.log_file, self.tx_file, self.cash_tx_file, self.alert_tx_file))

        def out_path(name):
            return os.path.join(self.work_dir, name)

        with ExitStack() as stack:
            in_acct_f = _open_csv(stack, os.path.join(self.input_dir, self.in_acct_file), "r")  # Input account file
            in_tx_f = _open_csv(stack, self.log_file, "r")  # Transaction log file from the Java simulator

            out_acct_f = _open_csv(stack, out_path(self.out_acct_file), "w")  # Output account file
            out_tx_f = _open_csv(stack, out_path(self.tx_file), "w")  # Output transaction file
            out_cash_tx_f = _open_csv(stack, out_path(self.cash_tx_file), "w")  # Output cash transaction file
            out_alert_tx_f = _open_csv(stack, out_path(self.alert_tx_file), "w")  # Output alert transaction file

            out_ind_f = _open_csv(stack, out_path(self.party_individual_file), "w")  # Party individuals
            out_org_f = _open_csv(stack, out_path(self.party_organization_file), "w")  # Party organizations
            out_map_f = _open_csv(stack, out_path(self.account_mapping_file), "w")  # Account mappings
            out_ent_f = _open_csv(stack, out_path(self.resolved_entities_file), "w")  # Resolved entities

            self._convert_accounts(
                csv.reader(in_acct_f), csv.writer(out_acct_f), csv.writer(out_ind_f),
                csv.writer(out_org_f), csv.writer(out_map_f), csv.writer(out_ent_f))

            tx_set = self._convert_transactions(
                csv.reader(in_tx_f), csv.writer(out_tx_f), csv.writer(out_cash_tx_f),
                csv.writer(out_alert_tx_f))

        self._report_degrees(tx_set)

    def _convert_accounts(self, reader, acct_writer, ind_writer, org_writer, map_writer, ent_writer):
        """Write one account row, one party row and one mapping row per input account.

        Also records the account type of every account in ``self.org_types``.
        The resolved entities file receives its header only.
        """
        acct_writer.writerow(self.schema.acct_names)  # write header
        ind_writer.writerow(self.schema.party_ind_names)
        org_writer.writerow(self.schema.party_org_names)
        map_writer.writerow(self.schema.acct_party_names)
        ent_writer.writerow(self.schema.party_party_names)

        header = next(reader)

        mapping_id = 1  # Mapping ID for account-alert list

        lookup = AccountDataTypeLookup()
        us_gen = self.fake['en_US']

        for row in reader:
            output_row = list(self.schema.acct_defaults)
            acct_type = ""
            acct_id = ""

            gender = np.random.choice(['Male', 'Female'], p=[0.5, 0.5])
            address = _generate_us_address(us_gen)

            for output_index, output_item in enumerate(self.schema.data['account']):
                if 'dataType' in output_item:
                    output_type = output_item['dataType']
                    input_type = lookup.inputType(output_type)

                    try:
                        input_index = header.index(input_type)
                    except ValueError:
                        # No such input column: leave this cell untouched
                        continue

                    if output_type == "start_time":
                        try:
                            start = int(row[input_index])
                            if start >= 0:
                                output_row[output_index] = start
                        except ValueError:  # If failed, keep the default value
                            pass
                    elif output_type == "end_time":
                        try:
                            end = int(row[input_index])
                            if end > 0:
                                output_row[output_index] = end
                        except ValueError:  # If failed, keep the default value
                            pass
                    elif output_type == "account_id":
                        acct_id = row[input_index]
                        output_row[output_index] = acct_id
                    elif output_type == "account_type":
                        acct_type = row[input_index]
                        output_row[output_index] = acct_type
                    else:
                        output_row[output_index] = row[input_index]

                if output_item.get('valueType') == 'date':
                    output_row[output_index] = self.schema.days2date(output_row[output_index])

                generated = _synthetic_value(us_gen, output_item.get('name'), gender, address)
                if generated is not _NO_VALUE:
                    output_row[output_index] = generated

            acct_writer.writerow(output_row)
            self.org_types[int(acct_id)] = acct_type

            # Write a party row per account
            is_individual = random() >= 0.5  # 50%: individual, 50%: organization
            party_id = str(acct_id)
            if is_individual:  # Individual
                ind_writer.writerow(self.schema.get_party_ind_row(party_id))
            else:
                org_writer.writerow(self.schema.get_party_org_row(party_id))

            # Write account-party mapping row
            map_writer.writerow(self.schema.get_acct_party_row(mapping_id, acct_id, party_id))
            mapping_id += 1

    def _convert_transactions(self, reader, tx_writer, cash_tx_writer, alert_tx_writer):
        """Split the transaction log into transaction, cash and alert transaction rows.

        Rows that are too short or whose step / SAR / alert fields are not
        integers are skipped. Duplicate rows are written once per output.

        :return: Set of ``(orig, dest, type, amount, step)`` tuples of the
            non-cash transactions that were written
        """
        # Avoid duplicated transaction CSV rows in the log file
        tx_set = set()
        cash_tx_set = set()

        header = next(reader)
        indices = {name: index for index, name in enumerate(header)}
        num_columns = len(header)

        tx_header = self.schema.tx_names
        tx_writer.writerow(tx_header)
        cash_tx_writer.writerow(tx_header)
        alert_tx_writer.writerow(self.schema.alert_tx_names)

        step_idx = indices["step"]
        amt_idx = indices["amount"]
        orig_idx = indices["nameOrig"]
        dest_idx = indices["nameDest"]
        sar_idx = indices["isSAR"]
        alert_idx = indices["alertID"]
        type_idx = indices["type"]

        tx_id = 1
        for row in reader:
            if len(row) < num_columns:
                continue
            try:
                days = int(row[step_idx])
                sar_id = int(row[sar_idx])  # SAR transaction index
                alert_id = int(row[alert_idx])  # Alert ID
            except ValueError:
                continue
            date_str = str(days)  # kept as a day offset; date columns are converted by the schema
            amount = row[amt_idx]  # transaction amount
            orig_id = row[orig_idx]  # originator ID
            dest_id = row[dest_idx]  # beneficiary ID
            ttype = row[type_idx]
            is_sar = sar_id > 0
            is_alert = alert_id >= 0

            attr = {name: row[index] for name, index in indices.items()}
            tx = (orig_id, dest_id, ttype, amount, date_str)
            if ttype in CASH_TYPES:  # Cash transactions
                seen, writer = cash_tx_set, cash_tx_writer
            else:  # Account-to-account transactions including alert transactions
                seen, writer = tx_set, tx_writer
            if tx not in seen:
                seen.add(tx)
                writer.writerow(self.schema.get_tx_row(
                    tx_id, date_str, amount, ttype, orig_id, dest_id, is_sar, alert_id, **attr))

            if is_alert:  # Alert transactions
                # An alert ID missing from the alert member list gets an empty type
                typology = self.reports.get(alert_id)
                alert_type = typology.get_reason() if typology is not None else ""
                alert_tx_writer.writerow(self.schema.get_alert_tx_row(
                    alert_id, alert_type, is_sar, tx_id, orig_id, dest_id, ttype, amount,
                    date_str, **attr))

            if tx_id % 1000000 == 0:
                print("Converted %d transactions." % tx_id)
            tx_id += 1

        return tx_set

    @staticmethod
    def _report_degrees(tx_set):
        """Print fan-in / fan-out counts when the ``DEGREE`` environment variable is set.

        :param tx_set: Set of ``(orig, dest, type, amount, step)`` tuples
        """
        # Count degrees (fan-in/out patterns)
        deg_param = os.getenv("DEGREE")
        if not deg_param:
            return

        max_threshold = int(deg_param)
        pred = defaultdict(set)  # Account, Predecessors
        succ = defaultdict(set)  # Account, Successors
        for orig, dest, _, _, _ in tx_set:
            pred[dest].add(orig)
            succ[orig].add(dest)
        in_deg = Counter(len(nbs) for nbs in pred.values())
        out_deg = Counter(len(nbs) for nbs in succ.values())
        for th in range(2, max_threshold + 1):
            num_fan_in = sum(c for d, c in in_deg.items() if d >= th)
            num_fan_out = sum(c for d, c in out_deg.items() if d >= th)
            print("Number of fan-in / fan-out patterns with", th, "neighbors", num_fan_in, "/", num_fan_out)

    def convert_alert_members(self):
        """Convert the alert member list and register every alert in ``self.reports``."""
        input_file = self.group_file
        output_file = self.alert_acct_file

        print("Load alert groups: %s" % input_file)
        with ExitStack() as stack:
            rf = _open_csv(stack, os.path.join(self.input_dir, input_file), "r")
            wf = _open_csv(stack, os.path.join(self.work_dir, output_file), "w")
            reader = csv.reader(rf)
            header = next(reader)
            indices = {name: index for index, name in enumerate(header)}

            writer = csv.writer(wf)
            writer.writerow(self.schema.alert_acct_names)

            for row in reader:
                reason = row[indices["reason"]]
                alert_id = int(row[indices["alertID"]])
                account_id = int(row[indices["accountID"]])
                is_sar = row[indices["isSAR"]].lower() == "true"
                model_id = row[indices["modelID"]]
                schedule_id = row[indices["scheduleID"]]
                bank_id = row[indices["bankID"]]

                if alert_id not in self.reports:
                    self.reports[alert_id] = AMLTypology(reason)
                self.reports[alert_id].add_member(account_id, is_sar)

                attr = {name: row[index] for name, index in indices.items()}
                # The account ID is written to both the ID and the name column
                output_row = self.schema.get_alert_acct_row(
                    alert_id, reason, account_id, account_id, is_sar, model_id, schedule_id,
                    bank_id, **attr)
                writer.writerow(output_row)

    def output_sar_cases(self):
        """Extract SAR account list involved in alert transactions from transaction log file
        """
        input_file = self.log_file
        output_file = os.path.join(self.work_dir, self.sar_acct_file)

        print("Convert SAR typologies from %s to %s" % (input_file, output_file))
        with open(input_file, "r", newline="") as rf:
            reader = csv.reader(rf)
            alerts = self.sar_accounts(reader)

        with open(output_file, "w", newline="") as wf:
            writer = csv.writer(wf)
            self.write_sar_accounts(writer, alerts)

    def sar_accounts(self, reader):
        """Attach logged transactions to their alerts and list the accounts involved.

        An account is listed once, for the first alert transaction it appears
        in, across all alerts.

        :param reader: CSV reader over the transaction log, header included
        :return: List of ``(alert ID, account ID, customer ID, event date,
            alert type, account type, "YES"/"NO")`` tuples
        """
        header = next(reader)
        indices = {name: index for index, name in enumerate(header)}
        columns = len(header)

        tx_id = 0
        for row in reader:
            if len(row) < columns:
                continue
            try:
                days = int(row[indices["step"]])
                amount = float(row[indices["amount"]])
                orig = int(row[indices["nameOrig"]])
                dest = int(row[indices["nameDest"]])
                alert_id = int(row[indices["alertID"]])
            except ValueError:
                continue
            orig_name = "C_%d" % orig
            dest_name = "C_%d" % dest

            if alert_id >= 0 and alert_id in self.reports:  # SAR transactions
                attr = {name: row[index] for name, index in indices.items()}
                self.reports[alert_id].add_tx(tx_id, amount, days, orig, dest, orig_name, dest_name, attr)
                tx_id += 1

        # Accounts already recorded by any typology, kept in one set so the
        # membership test does not scan every typology for each transaction.
        recorded = set()
        for typology in self.reports.values():
            recorded.update(typology.recorded_members)

        sar_accounts = list()
        count = 0
        num_reports = len(self.reports)
        for sar_id, typology in self.reports.items():
            if typology.count == 0:
                continue
            reason = typology.get_reason()
            is_sar = "YES" if typology.is_sar else "NO"  # SAR or false alert
            for transaction in typology.transactions.values():
                amount, step, orig_acct, dest_acct, orig_name, dest_name, attr = transaction
                for acct_id, cust_id in ((orig_acct, orig_name), (dest_acct, dest_name)):
                    if acct_id in recorded:
                        continue
                    recorded.add(acct_id)
                    typology.recorded_members.add(acct_id)
                    sar_accounts.append((sar_id, acct_id, cust_id, days_to_date(step), reason,
                                         self.org_type(acct_id), is_sar))
            count += 1
            if count % 100 == 0:
                print("SAR Typologies: %d/%d" % (count, num_reports))
        return sar_accounts

    def org_type(self, acct_id):
        """Return ``"INDIVIDUAL"`` if the recorded account type is ``"I"``, else ``"COMPANY"``."""
        return "INDIVIDUAL" if self.org_types[acct_id] == "I" else "COMPANY"

    def write_sar_accounts(self, writer, sar_accounts):
        """Write the header and one row per entry of ``sar_accounts``."""
        writer.writerow(
            ["ALERT_ID", "ACCOUNT_ID", "CUSTOMER_ID", "EVENT_DATE", "ALERT_TYPE", "ACCOUNT_TYPE", "IS_SAR"])
        for alert in sar_accounts:
            writer.writerow(alert)

    def account_recorded(self, acct_id):
        """Return True if any typology has already recorded ``acct_id``."""
        return any(acct_id in typology.recorded_members for typology in self.reports.values())


if __name__ == "__main__":
    argv = sys.argv

    if len(argv) < 2:
        print("Usage: python3 %s [ConfJSON]" % argv[0])
        sys.exit(1)

    _conf_json = argv[1]
    _sim_name = argv[2] if len(argv) >= 3 else None

    with open(_conf_json, "r") as rf:
        conf = json.load(rf)

    fake = Faker(['en_US'])
    Faker.seed(0)

    converter = LogConverter(conf, _sim_name, fake)
    converter.convert_alert_members()
    converter.convert_acct_tx()
    converter.output_sar_cases()