This was a timed take-home assignment involving the cleanup and sorting of some health data. No PHI was ever given to me: all the data here is fake. The company is called Newton in this assignment.
I developed this in VS Code and some of the logging functionality I brought in here doesn't look as pretty.
I uncovered several data issues and fixed most of them, either using the explicit instructions or inferring what should be done.
I also started working with a test.py file that I didn't finish. It was helpful to collect the requirements however.
The solutions are predominantly implemented in functions, most of which are named for the task they perform.
If I had more time, I would clean up some variable names and perhaps split the main execution path into sub-functions.
I see some floating point rounding issues in the output. Rounding would fix that.
I also see that some values in my answer are suspiciously different from the example file. It's not stated that the values should conform exactly, but they look like they would conform if they were plus or minus one multiple of the drug's per-fill amount (e.g., each row has 10, my answer has 30, the example has 40).
I did not investigate this for time reasons, but perhaps I misinterpreted how REVERSALs should be dealt with.
As always, performance can be improved. Readability and speed of development was prioritized.
import pdb
import os
import pandas as pd
import logging
import coloredlogs
import re
import datetime
import json
from json import dumps as jd
# Configure root logging: timestamped DEBUG-level output to the console.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger(__name__)
coloredlogs.install(level="DEBUG")  # colorized console log output (dev convenience)
# Enrollment cutoff: patients must be enrolled on this date to be counted.
sd_patient = "2020-07-01"  # July 1st, 2020
# Claims are kept only when fill_date falls inside [sd_fill, ed_fill].
sd_fill = "2020-01-01"
ed_fill = "2020-06-30"
def load_csv(fname):
    """Read the CSV file at `fname` into a DataFrame."""
    return pd.read_csv(fname)
def merge_dfs(dfs):
    """Stack a sequence of DataFrames into a single frame."""
    return pd.concat(dfs)
def rowcount_dfs(dfs):
    """Total row count across all DataFrames in `dfs`."""
    return sum(len(df.index) for df in dfs)
def rowcount_df(df):
    """Number of rows in `df`."""
    return df.shape[0]
def ndc11_to_9(n11):
    """Convert an ndc_11 code to an ndc_9 by dropping the last two digits.

    e.g. ndc_11 '12345-6789-01' becomes ndc_9 '123456789'.

    Hyphens are stripped first: the original sliced the raw string, so a
    hyphenated input like '12345-6789-01' kept a trailing hyphen and could
    never match the hyphen-stripped ndc9 lookup. Digit-only inputs behave
    exactly as before: drop the last two characters and left-pad to 9
    with zeros.
    """
    digits = str(n11).replace("-", "")
    return digits[:-2].zfill(9)
def sort_enrol(df, sort_cols):
    """Return a sorted copy of `df`, ordered by `sort_cols` (ascending)."""
    return df.sort_values(by=sort_cols)
def drop_dupes(df, unique_cols):
    """Drop duplicate rows, judging duplicates on `unique_cols` only.

    Keeps the last occurrence: the frame was built with the newest data
    appended at the end, so the last row is the freshest.
    """
    deduped = df.drop_duplicates(subset=unique_cols, keep="last")
    return deduped
def remove_unenrolled(df, sd):
    """Keep rows whose enrollment window contains the date `sd` ('YYYY-MM-DD')."""
    cutoff = datetime.datetime.strptime(sd, "%Y-%m-%d")
    active = df["enrollment_start_date"].le(cutoff) & df["enrollment_end_date"].ge(cutoff)
    return df[active]
def how_many_patients(df):
    """Count unique patients enrolled as of the module-level sd_patient date.

    Patient identity is judged on the columns below — the problem statement
    is not explicit about this, so it is a best-judgment call (gender as an
    identity column is itself questionable, e.g. for transgender patients).

    Fix: the original did `sort_cols = unique_cols` then appended to it,
    mutating unique_cols through the alias. Harmless here because dedupe
    ran first, but a latent bug; list concatenation avoids it.
    """
    unique_cols = [
        "card_id",
        "birth_date",
        "gender",
        "last_name",
        "first_name",
    ]
    df = drop_dupes(df, unique_cols)
    sort_cols = unique_cols + ["enrollment_end_date", "enrollment_start_date"]
    df = sort_enrol(df, sort_cols)
    df = remove_unenrolled(df, sd_patient)
    return len(df.index)  # unique PATIENTS, not rows — dedupe already ran
def printna(df, col):
    """Print the rows of `df` where `col` is missing (debugging aid)."""
    missing_rows = df[df[col].isna()]
    print(missing_rows)
def make_earlier_year(ss, new_year=2030):
    """Clamp the first 4-digit year in `ss` down to `new_year` if it is later.

    Works around pandas' Timestamp range limit for absurd far-future dates
    in the data.

    Fixes: the original raised IndexError when `ss` contained no 4-digit
    year (now returns `ss` unchanged), and it rebound the `new_year`
    parameter to a str mid-function.
    """
    year_re = r"[0-9]{4}"
    matches = re.findall(year_re, ss)
    if not matches:
        return ss
    if len(matches) > 1:
        log.warning("too many RE matches")
    if int(matches[0]) > new_year:
        return re.sub(year_re, str(new_year), ss, count=1)
    return ss
def special_clean_date(df, colname):
    """Clamp far-future years in `colname`, then parse the column to datetime64."""
    clamped = df[colname].apply(make_earlier_year)
    df[colname] = pd.to_datetime(clamped)
    return df
def ordered_read_claims(files):
    """Read claim CSVs oldest-to-newest (`files` is assumed pre-sorted).

    Later claims can undo earlier ones, so processing is deferred until
    every file is loaded. Rows with missing claim_status are printed as
    each file loads (debug aid).
    """
    frames = []
    for fname in files:
        frame = load_csv(fname)
        frames.append(frame)
        printna(frame, "claim_status")
    return frames
def get_claim_files(mypath="pharmacy_claims"):
    """Return a sorted list of every file path under `mypath` (recursive).

    Fix: removed the original's dead `for name in dirs: pass` loop.
    """
    paths = []
    for root, _dirs, files in os.walk(mypath, topdown=False):
        for name in files:
            paths.append(os.path.join(root, name))
    paths.sort()
    return paths
def prep_claims():
    """Load every claims file, sorted oldest-to-newest, as a list of frames.

    NOTE(data): one row (ID 4767622, March dataset) has an NA claim_status;
    it is neither PAID nor REVERSAL, therefore it gets removed downstream.
    """
    claim_files = get_claim_files()
    return ordered_read_claims(claim_files)
def merge_claims():
    """Load all claim files and stack them into a single DataFrame."""
    return merge_dfs(prep_claims())
def rm_denied(df):
    """Drop DENIED claims; they must be excluded from all subsequent steps."""
    keep = df["claim_status"] != "DENIED"
    return df[keep]
def solve_reversals(df):
    """Net out REVERSAL rows within one claim group.

    df: one group of a frame grouped by the claim-identity columns
    (card_id, claim_number, fill_date, ndc_11).

    Returns the group unchanged when it is a single row or contains no
    REVERSAL; returns None when the reversals cancel the whole fill
    (net days_supply <= 0) so the caller's groupby/apply drops it;
    otherwise returns the group with days_supply and allowed_amount
    replaced by the netted totals on every row.

    Fixes: removed the unused `idcols` list and moved the docstring to
    the top (the original string sat after a statement, so it was not a
    real docstring); nesting flattened to guard clauses.
    """
    if len(df.index) <= 1:
        return df
    if "REVERSAL" not in set(df["claim_status"]):
        return df
    net_allowed = df["allowed_amount"].sum()
    net_days = df["days_supply"].sum()
    if net_days <= 0:
        # Fully reversed: drop the entire group.
        return None
    df["days_supply"] = net_days
    # TODO(review): with 2 PAID + 1 REVERSAL the right allowed_amount is
    # unclear; netting the sum is the current best guess.
    df["allowed_amount"] = net_allowed
    return df
def rm_reversals(df, idcols):
    """Net REVERSAL claims group-by-group and drop fully-reversed fills."""
    resolved = df.groupby(idcols).apply(solve_reversals)
    return resolved.reset_index(drop=True)
def rm_nan(df):
    """Drop claims whose claim_status is missing.

    Implements the original's TODO: per the author's note, one row in
    the provided data (ID 4767622, March file) has an NA claim_status;
    it is neither PAID nor REVERSAL and should be removed before further
    processing. The original returned df unchanged.
    """
    return df.dropna(subset=["claim_status"])
def rm_days(df):
    """Keep only claims with a positive days_supply."""
    positive = df["days_supply"] > 0
    return df[positive]
def clean_claims(df):
    """Run the full claim-cleaning pipeline and reindex the result.

    Order: NA rows, DENIED rows, reversal netting, non-positive
    day supplies.
    """
    # These four columns are sufficient to identify a unique fill.
    fill_keys = ["card_id", "claim_number", "fill_date", "ndc_11"]
    cleaned = rm_days(rm_reversals(rm_denied(rm_nan(df)), fill_keys))
    return cleaned.reset_index(drop=True)
def clean_card(ss):
    """Zero-pad a card id to 11 digits and prefix it with 'ID'."""
    padded = str(ss).zfill(11)
    return "ID" + padded
def fix_card_id(df):
    """Normalize card_id in place, preserving the raw value in old_card_id."""
    df["old_card_id"] = df["card_id"]
    normalized = df["card_id"].apply(clean_card)
    df["card_id"] = normalized
    return df
def filter_to_date(df):
    """Keep claims whose fill_date falls within [sd_fill, ed_fill] (inclusive).

    Reads the module-level sd_fill / ed_fill constants ('YYYY-MM-DD').
    Row counts are logged before filtering and after each bound is
    applied (the original emitted them with bare print, bypassing the
    configured logger; dead commented-out code also removed).
    """
    log.debug("%d rows before date filtering", rowcount_df(df))
    start = datetime.datetime.strptime(sd_fill, "%Y-%m-%d")
    df = df[df["fill_date"] >= start]
    log.debug("%d rows on/after %s", rowcount_df(df), sd_fill)
    end = datetime.datetime.strptime(ed_fill, "%Y-%m-%d")
    df = df[df["fill_date"] <= end]
    log.debug("%d rows on/before %s", rowcount_df(df), ed_fill)
    return df
def fix_ndc11(ss):
    """Zero-pad an ndc_11 value to 11 characters.

    Not documented in the spec; assumed to be the desired normalization.
    """
    text = str(ss)
    return text.zfill(11)
def clean_date(df, colname):
    """Parse `colname` to datetime64 in place and return `df`."""
    parsed = pd.to_datetime(df[colname])
    df[colname] = parsed
    return df
def clean_ndc9(ss):
    """Return `ss` as a string with all hyphens removed."""
    return str(ss).replace("-", "")
def pretty_print_json(jj):
    """Print `jj` as 4-space-indented JSON (debugging aid)."""
    print(json.dumps(jj, indent=4))
def agg_drug(df):
    """Collapse one genericName group to a single summary row.

    Adds total_days_supply / total_amt_allowed columns and a med_summary
    JSON string keyed by the generic name, then returns only the last row.
    """
    days_total = df["days_supply"].sum()
    amt_total = df["allowed_amount"].sum()
    df["total_days_supply"] = days_total
    df["total_amt_allowed"] = amt_total
    gname = df[["genericName"]].iloc[0, 0]
    summary = {
        gname: {
            "allowed_amount": str(amt_total),
            "days_supply": str(days_total),
        }
    }
    df["med_summary"] = json.dumps(summary)
    return df.tail(1)
def agg_patient(df):
    """Summarize one patient's claims per generic drug and write
    results/patient_<card_id>.json as a side effect.

    Returns the per-drug aggregated frame (one row per genericName).

    Fix: the original crashed with FileNotFoundError when ./results did
    not exist; the directory is now created on demand.
    """
    df["med_summary"] = "asdf"  # placeholder so the column exists before groupby/apply
    grouped = df.groupby("genericName")
    df = grouped.apply(agg_drug).reset_index(drop=True)
    fname = df["first_name"].head(1)[0]
    lname = df["last_name"].head(1)[0]
    patid = df["card_id"].head(1)[0]
    # Merge the per-drug JSON snippets into one med_summary dict.
    dall = {}
    for d in df["med_summary"].values:
        dall.update(json.loads(d))
    dout = {"first_name": fname, "last_name": lname, "med_summary": dall}
    out_dir = os.path.join(os.getcwd(), "results")
    os.makedirs(out_dir, exist_ok=True)  # don't crash when ./results is missing
    out_path = os.path.join(out_dir, "patient_" + patid + ".json")
    with open(out_path, "w") as ff:
        ff.write(json.dumps(dout, indent=4))
    return df
# --- Script entry: load the enrollment roster and the claims files ---
enrol = pd.read_csv("enrollment.csv")
# Date columns that need the far-future-year workaround before parsing.
date_cols = [
    "birth_date",
    "enrollment_end_date",
    "enrollment_start_date",
]
# easier debugging in a for loop, but could apply all at once
for col in date_cols:
    enrol = special_clean_date(enrol, col)
# enrol.reset_index(inplace=True)
# enrol.set_index("card_id", verify_integrity=True)
claims = merge_claims()
# Q1: unique patients enrolled on the cutoff date.
npat = how_many_patients(enrol)
log.info("{} patients enrolled as of {}".format(npat, sd_patient))
2022-10-22 11:47:06 MULTIVAC __main__[52052] INFO 1566 patients enrolled as of 2020-07-01
allowed_amount card_id claim_line_number claim_status \ 9150 13.63 32104767622 1 NaN days_supply fill_date maintenance_drug_flag medication_name \ 9150 30 2019-03-01 Y CELECOXIB CA ndc_11 paid_date ... quantity_dispensed refill_number \ 9150 42571014405 2019-04-01 ... 30.0 3 retail_mail_flag run_date rxtype specialty_drug_flag \ 9150 Y 2020-03-06 BRAND NaN strength_units strength_value uploadDate claim_number 9150 CAP 200MG NaN 2020-03-11 95947229895009245473 [1 rows x 26 columns] Empty DataFrame Columns: [allowed_amount, card_id, claim_line_number, claim_status, days_supply, fill_date, maintenance_drug_flag, medication_name, ndc_11, paid_date, pharmacy_id, pharmacy_name, pharmacy_npi, pharmacy_tax_id, prescriber_id, prescriber_npi, quantity_dispensed, refill_number, retail_mail_flag, run_date, rxtype, specialty_drug_flag, strength_units, strength_value, uploadDate, claim_number] Index: [] [0 rows x 26 columns] Empty DataFrame Columns: [allowed_amount, card_id, claim_line_number, claim_status, days_supply, fill_date, maintenance_drug_flag, medication_name, ndc_11, paid_date, pharmacy_id, pharmacy_name, pharmacy_npi, pharmacy_tax_id, prescriber_id, prescriber_npi, quantity_dispensed, refill_number, retail_mail_flag, run_date, rxtype, specialty_drug_flag, strength_units, strength_value, uploadDate, claim_number] Index: [] [0 rows x 26 columns] Empty DataFrame Columns: [allowed_amount, card_id, claim_line_number, claim_status, days_supply, fill_date, maintenance_drug_flag, medication_name, ndc_11, paid_date, pharmacy_id, pharmacy_name, pharmacy_npi, pharmacy_tax_id, prescriber_id, prescriber_npi, quantity_dispensed, refill_number, retail_mail_flag, run_date, rxtype, specialty_drug_flag, strength_units, strength_value, uploadDate, claim_number] Index: [] [0 rows x 26 columns] Empty DataFrame Columns: [allowed_amount, card_id, claim_line_number, claim_status, days_supply, fill_date, maintenance_drug_flag, medication_name, ndc_11, paid_date, pharmacy_id, pharmacy_name, 
pharmacy_npi, pharmacy_tax_id, prescriber_id, prescriber_npi, quantity_dispensed, refill_number, retail_mail_flag, run_date, rxtype, specialty_drug_flag, strength_units, strength_value, uploadDate, claim_number] Index: [] [0 rows x 26 columns]
# Q2: raw claim count before any cleaning.
nclaims = rowcount_df(claims)
log.info("{} claims in the initial pharmacy claims set.".format(nclaims))
2022-10-22 11:47:06 MULTIVAC __main__[52052] INFO 11524 claims in the initial pharmacy claims set.
# Step 3: clean the claims (NA status, DENIED, reversal netting, non-positive days).
clean = clean_claims(claims)
nclean = rowcount_df(clean)
log.info("{} prepared claims after step 3.".format(nclean))
2022-10-22 11:47:12 MULTIVAC __main__[52052] INFO 4042 prepared claims after step 3.
# Load the NDC9 -> drug-info lookup (stored keyed-by-row, hence the transpose).
n1 = pd.read_json("ndc9_lookup.json")
ndc = n1.transpose()
ndc["ndc9"] = ndc["ndc9"].apply(clean_ndc9)
clean = fix_card_id(clean)
ccols = ["fill_date"]
for mycol in ccols:
    clean_date(clean, mycol)
# Join enrollment with the cleaned claims on the normalized card_id.
jj = pd.merge(enrol, clean, on="card_id", how="inner")
for mycol in ccols:
    # do it again!
    # NOTE(review): this second pass re-parses `clean`, not `jj` — it looks
    # like it was meant to run on jj after the merge; confirm.
    clean_date(clean, mycol)
jj = filter_to_date(jj)
# Normalize NDC codes and attach drug info (genericName etc.) via ndc9.
jj["ndc_11"] = jj["ndc_11"].apply(fix_ndc11)
jj["ndc9"] = jj["ndc_11"].apply(ndc11_to_9)
dd = pd.merge(jj, ndc, on="ndc9", how="left")
# Patient identity columns (same judgment call as in how_many_patients).
unique_cols = [
    "card_id",
    "birth_date",
    "gender",  # what about transgender people?
    "last_name",
    "first_name",
]
grouped = dd.groupby(unique_cols)
# Aggregates per patient/drug and writes one patient_<id>.json per patient
# as a side effect.
dd = grouped.apply(agg_patient)
# Columns for the step-4 report.
ddcols = [
    "card_id",
    "first_name",
    "last_name",
    "genericName",
    "total_days_supply",
    "total_amt_allowed"
]
log.info("Step 4: total days suply and amt allowed")
dd[ddcols]
C:\Users\Andy\AppData\Local\Temp\ipykernel_52052\4225800818.py:295: FutureWarning: Not prepending group keys to the result index of transform-like apply. In the future, the group keys will be included in the index, regardless of whether the applied function returns a like-indexed object. To preserve the previous behavior, use >>> .groupby(..., group_keys=False) To adopt the future behavior and silence this warning, use >>> .groupby(..., group_keys=True) df = grouped.apply(agg_drug).reset_index(drop=True)
1098 176 176
2022-10-22 11:47:12 MULTIVAC __main__[52052] INFO Step 4: total days suply and amt allowed
card_id | first_name | last_name | genericName | total_days_supply | total_amt_allowed | ||||||
---|---|---|---|---|---|---|---|---|---|---|---|
card_id | birth_date | gender | last_name | first_name | |||||||
ID00081368696 | 1953-02-12 | M | Rittenhouse | David | 0 | ID00081368696 | David | Rittenhouse | glyburide-metformin | 90 | 19.02 |
1 | ID00081368696 | David | Rittenhouse | spironolactone | 60 | 17.86 | |||||
ID09174309881 | 1973-10-23 | F | Ross | Betsy | 0 | ID09174309881 | Betsy | Ross | insulin glargine | 30 | 79.71 |
1 | ID09174309881 | Betsy | Ross | metformin hcl | 90 | 16.97 | |||||
ID27266531594 | 1987-07-14 | F | Kahlo | Frida | 0 | ID27266531594 | Frida | Kahlo | amlodipine besylate | 180 | 0.00 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
ID79288754268 | 1989-06-23 | M | Hayes | Ira | 0 | ID79288754268 | Ira | Hayes | benzoyl peroxide-clindamycin | 40 | 486.16 |
1 | ID79288754268 | Ira | Hayes | betamethasone-clotrimazole | 14 | 13.80 | |||||
2 | ID79288754268 | Ira | Hayes | clobetasol propionate | 20 | 43.57 | |||||
3 | ID79288754268 | Ira | Hayes | diclofenac sodium | 150 | 218.07 | |||||
4 | ID79288754268 | Ira | Hayes | ergocalciferol | 339 | 24.16 |
68 rows × 6 columns
# Step 4: patient/drug row(s) with the maximum total allowed amount.
max_df = dd[dd["total_amt_allowed"] == dd["total_amt_allowed"].max()]
log.info("Step 4: this patient has the max amt allowed:")
max_df[ddcols]
2022-10-22 11:47:12 MULTIVAC __main__[52052] INFO Step 4: this patient has the max amt allowed:
card_id | first_name | last_name | genericName | total_days_supply | total_amt_allowed | ||||||
---|---|---|---|---|---|---|---|---|---|---|---|
card_id | birth_date | gender | last_name | first_name | |||||||
ID47840552286 | 2002-06-29 | F | Onassis | Jacqueline | 2 | ID47840552286 | Jacqueline | Onassis | erenumab | 150 | 2983.26 |
# Q5: count Abe Lincoln's unique drugs (one aggregated row per drug).
# NOTE(review): chained boolean indexing here triggers pandas'
# "Boolean Series key will be reindexed" UserWarning;
# dd[(dd["last_name"] == "Lincoln") & (dd["first_name"] == "Abe")] would be
# the clean form — confirm it yields the same count.
lindrugs = len(
    dd[dd["last_name"] == "Lincoln"][dd["first_name"] == "Abe"]["genericName"].index
)
log.info("Q5: Lincoln has {} unique drugs".format(lindrugs))
# return npat, nclaims, claims, clean
C:\Users\Andy\AppData\Local\Temp\ipykernel_52052\3731674243.py:2: UserWarning: Boolean Series key will be reindexed to match DataFrame index. dd[dd["last_name"] == "Lincoln"][dd["first_name"] == "Abe"]["genericName"].index 2022-10-22 11:47:12 MULTIVAC __main__[52052] INFO Q5: Lincoln has 4 unique drugs