update fixtures
This commit is contained in:
parent
02d1329746
commit
2f569986bc
19 changed files with 6157 additions and 2509 deletions
410
csv/data.py
410
csv/data.py
|
|
@ -1,58 +1,374 @@
|
|||
"""
|
||||
Converts ERPNext Data Export CSVs into import-ready CSVs using template headers.
|
||||
|
||||
Handles both simple doctypes (Item Price) and doctypes with child tables (Item).
|
||||
Reads the template file (from production) to determine the exact columns needed,
|
||||
then maps data from the export file (from staging) into those columns.
|
||||
|
||||
Usage:
|
||||
python data.py
|
||||
|
||||
Reads, for each doctype (Item, Item Price, BOM, Item Group, Supplier, User):
- <Doctype>.csv (export from staging)
- <Doctype>-template.csv (template from production)
|
||||
|
||||
Writes:
- Item-import-ready.csv
- Item Price-import-ready.csv
- BOM-import-ready.csv
- Item Group-import-ready.csv
- Supplier-import-ready.csv
- User-import-ready.csv
"""
|
||||
|
||||
import csv
|
||||
import re
|
||||
import os
|
||||
from collections import defaultdict
|
||||
|
||||
# FILES
# NOTE(review): export_file, output_file, template_columns, COLUMN_NAME_ROW
# and DATA_START_ROW drive only the legacy module-level Item Price conversion
# below; the generic pipeline (build_import_csv / main) uses its own
# per-doctype file pairs.  SCRIPT_DIR is shared with main().
export_file = "Item Price.csv"  # ERPNext export
output_file = "Item Price-import-ready.csv"  # clean import CSV
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# Template columns (exact Column Name values for ERPNext import)
template_columns = [
    "name","item_code","uom","price_list","price_list_rate","packing_unit",
    "item_name","brand","item_description","customer","supplier","batch_no",
    "buying","selling","currency","valid_from","lead_time_days","valid_upto",
    "note","reference"
]

# Which row has the Column Name row in ERPNext export? Usually 20th (0-index 19)
COLUMN_NAME_ROW = 19
# NOTE(review): the original comment called this "0-indexed", but the export is
# sliced with reader[DATA_START_ROW - 1:] below, so effective data start is
# 0-index 20 (i.e. this value reads as 1-based) — confirm against a real export.
DATA_START_ROW = 21
|
||||
|
||||
def clean_cell(cell):
|
||||
# Remove extra quotes around the data
|
||||
if cell.startswith('"""') and cell.endswith('"""'):
|
||||
return cell[3:-3]
|
||||
elif cell.startswith('"') and cell.endswith('"'):
|
||||
return cell[1:-1]
|
||||
"""Remove the extra wrapping quotes ERPNext puts around IDs and values."""
|
||||
cell = cell.strip()
|
||||
# ERPNext exports IDs as triple-quoted: """abc123""" -> "abc123" after csv reader
|
||||
while cell.startswith('"') and cell.endswith('"') and len(cell) >= 2:
|
||||
cell = cell[1:-1]
|
||||
return cell
|
||||
|
||||
# ---------------------------------------------------------------------------
# Legacy single-doctype conversion (Item Price only)
# ---------------------------------------------------------------------------
# Superseded by build_import_csv()/main() below.  Guarded so importing this
# module no longer crashes when the legacy export file is absent, and the
# data-row writing loop (lost in a bad merge) is restored.
if os.path.exists(export_file):
    # Read the export
    with open(export_file, newline='', encoding='utf-8') as f:
        reader = list(csv.reader(f))

    export_columns = [clean_cell(c) for c in reader[COLUMN_NAME_ROW]]
    data_rows = reader[DATA_START_ROW - 1:]

    # Build column index map: position of each template column in the export,
    # or None when the export lacks it (emitted as an empty string below).
    col_indexes = []
    for col in template_columns:
        if col in export_columns:
            col_indexes.append(export_columns.index(col))
        else:
            col_indexes.append(None)

    # Write clean CSV
    with open(output_file, "w", newline='', encoding='utf-8') as f_out:
        writer = csv.writer(f_out, quoting=csv.QUOTE_ALL)

        # Header row: template
        writer.writerow(template_columns)

        # Data rows, remapped into template column order
        for row in data_rows:
            writer.writerow(
                [clean_cell(row[i]) if i is not None and i < len(row) else ""
                 for i in col_indexes]
            )

    print(f"Clean Item Price CSV written to {output_file}")


# ---------------------------------------------------------------------------
# Export parser
# ---------------------------------------------------------------------------
|
||||
|
||||
def parse_export(filepath):
    """
    Parse an ERPNext Data Import/Export CSV.

    Returns
    -------
    sections : list[dict]
        Each dict has 'labels' (Column Labels), 'names' (Column Names),
        and 'start_idx' (column index in the raw row).
        sections[0] = main doctype, sections[1:] = child tables.
    data_rows : list[list[str]]
        The raw data rows (index 0 of each row is the empty leading cell).
    """
    with open(filepath, newline="", encoding="utf-8") as src:
        all_rows = list(csv.reader(src))

    # Locate the two header rows and the first data row.
    lbl_at = nm_at = first_data = None
    for idx, raw in enumerate(all_rows):
        head = raw[0].strip() if raw else ""
        if head == "Column Labels:":
            lbl_at = idx
        elif head == "Column Name:":
            nm_at = idx
        elif "Start entering data below this line" in head:
            first_data = idx + 1

    if lbl_at is None or nm_at is None or first_data is None:
        raise ValueError(f"Cannot locate header / data rows in {filepath}")

    label_row = all_rows[lbl_at]
    name_row = all_rows[nm_at]

    # Sections are separated by '~' markers in the Column Name row.
    sections = []
    acc_labels = []
    acc_names = []
    sec_start = 1  # column 0 is the blank leading cell

    def flush(next_start):
        """Close the accumulating section (if non-empty) and start a new one."""
        nonlocal acc_labels, acc_names, sec_start
        if acc_labels:
            sections.append({
                "labels": acc_labels,
                "names": acc_names,
                "start_idx": sec_start,
            })
        acc_labels, acc_names, sec_start = [], [], next_start

    width = max(len(label_row), len(name_row))
    for col in range(1, width):
        nm = name_row[col].strip() if col < len(name_row) else ""
        if nm == "~":
            flush(col + 1)
        else:
            acc_labels.append(label_row[col].strip() if col < len(label_row) else "")
            acc_names.append(nm)

    flush(width)
    return sections, all_rows[first_data:]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
# Template parser
# ---------------------------------------------------------------------------

def parse_template(filepath):
    """
    Read a production template CSV (single header row).

    Returns
    -------
    header : list[str] - the exact column labels
    section_names : set[str] - child-table section names found via 'ID (...)'
    """
    with open(filepath, newline="", encoding="utf-8") as src:
        header = list(csv.reader(src))[0]

    # A child table announces itself with an 'ID (<Section>)' column.
    id_pattern = re.compile(r"^ID \((.+)\)$")
    section_names = set()
    for label in header:
        hit = id_pattern.match(label)
        if hit:
            section_names.add(hit.group(1))

    return header, section_names
|
||||
|
||||
|
||||
def _split_template_col(col, section_names):
|
||||
"""
|
||||
Decompose a template column into (base_label, section_name | None).
|
||||
|
||||
'Barcode (Barcodes)' -> ('Barcode', 'Barcodes')
|
||||
'No of Months (Expense)' -> ('No of Months (Expense)', None)
|
||||
because 'Expense' is NOT a known child-table section name.
|
||||
"""
|
||||
for sname in section_names:
|
||||
suffix = f" ({sname})"
|
||||
if col.endswith(suffix):
|
||||
return col[: -len(suffix)], sname
|
||||
return col, None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Section matching (child tables)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _match_child_sections(template_child_labels, export_sections):
|
||||
"""
|
||||
Map each template child-section name -> export section index.
|
||||
Matches by column-label overlap (template labels ⊆ export labels).
|
||||
"""
|
||||
mapping = {}
|
||||
for tname, tlabels in template_child_labels.items():
|
||||
best_idx, best_score = None, 0
|
||||
for idx in range(1, len(export_sections)):
|
||||
elabels = set(export_sections[idx]["labels"])
|
||||
score = sum(1 for tl in tlabels if tl in elabels)
|
||||
if score > best_score:
|
||||
best_score = score
|
||||
best_idx = idx
|
||||
if best_idx is not None and best_score > 0:
|
||||
mapping[tname] = best_idx
|
||||
return mapping
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Topological sort for hierarchical doctypes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _topo_sort(rows, name_idx, parent_idx):
|
||||
"""
|
||||
Sort rows so that a parent row always appears before its children.
|
||||
Rows whose parent is empty or not in the dataset come first.
|
||||
"""
|
||||
by_name = {} # name -> row
|
||||
children = defaultdict(list) # parent_name -> [row, ...]
|
||||
roots = []
|
||||
|
||||
for row in rows:
|
||||
name = row[name_idx].strip()
|
||||
parent = row[parent_idx].strip()
|
||||
by_name[name] = row
|
||||
|
||||
if not parent:
|
||||
roots.append(row)
|
||||
else:
|
||||
children[parent].append(row)
|
||||
|
||||
# BFS from roots
|
||||
ordered = []
|
||||
queue = list(roots)
|
||||
seen = set()
|
||||
while queue:
|
||||
current = queue.pop(0)
|
||||
cname = current[name_idx].strip()
|
||||
if cname in seen:
|
||||
continue
|
||||
seen.add(cname)
|
||||
ordered.append(current)
|
||||
for child in children.get(cname, []):
|
||||
queue.append(child)
|
||||
|
||||
# Append any rows whose parent isn't in the dataset (orphans)
|
||||
for row in rows:
|
||||
rname = row[name_idx].strip()
|
||||
if rname not in seen:
|
||||
ordered.append(row)
|
||||
|
||||
return ordered
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
# Build import-ready CSV
# ---------------------------------------------------------------------------

def build_import_csv(template_path, export_path, output_path,
                     topo_sort_col=None, strip_link_cols=None):
    """
    Create a clean import-ready CSV from a template + export pair.

    Parameters
    ----------
    template_path : str
        Production template CSV (single header row).
    export_path : str
        ERPNext Data Export CSV from staging.
    output_path : str
        Destination for the import-ready CSV.
    topo_sort_col : tuple(str, str) | None
        (name_col, parent_col) template column labels to topologically sort
        rows so parents appear before children.
    strip_link_cols : list[str] | None
        Template column labels whose values should be blanked out (e.g.
        Link fields referencing records that don't exist in production).

    (Fix: removed stray lines merged in from the legacy module-level script —
    a loop over the legacy col_indexes, a writerow against the module-level
    writer whose file is already closed, and a per-row print of the legacy
    output filename.)
    """
    header, section_names = parse_template(template_path)
    export_sections, data_rows = parse_export(export_path)

    # Collect child-table column labels by section from the template
    tmpl_child = {}  # section_name -> [base_label, ...]
    for col in header:
        base, sname = _split_template_col(col, section_names)
        if sname:
            tmpl_child.setdefault(sname, []).append(base)

    # Match template sections to export sections
    sec_map = _match_child_sections(tmpl_child, export_sections)

    # Build label -> export-column-index maps
    main_lbl_idx = {
        lbl: export_sections[0]["start_idx"] + i
        for i, lbl in enumerate(export_sections[0]["labels"])
    }

    child_lbl_idx = {}
    for tname, eidx in sec_map.items():
        esec = export_sections[eidx]
        child_lbl_idx[tname] = {
            lbl: esec["start_idx"] + i
            for i, lbl in enumerate(esec["labels"])
        }

    # Map every template column to an export column index (or None)
    col_map = []
    for col in header:
        base, sname = _split_template_col(col, section_names)
        if sname and sname in child_lbl_idx:
            col_map.append(child_lbl_idx[sname].get(base))
        else:
            # Main-table column (or child section with no match -> None)
            col_map.append(main_lbl_idx.get(col if sname is None else base))

    # Build all output rows first, skipping entirely blank export rows
    out_rows = []
    for row in data_rows:
        if not row or all(c.strip() == "" for c in row):
            continue

        out = []
        for idx in col_map:
            if idx is not None and idx < len(row):
                out.append(clean_cell(row[idx]))
            else:
                out.append("")

        if any(v.strip() for v in out):
            out_rows.append(out)

    # Strip Link columns whose targets won't exist in production
    if strip_link_cols:
        strip_idxs = [i for i, col in enumerate(header) if col in strip_link_cols]
        for out in out_rows:
            for si in strip_idxs:
                out[si] = ""

    # Topological sort so parent records are created before children
    if topo_sort_col:
        name_col, parent_col = topo_sort_col
        ni = header.index(name_col)
        pi = header.index(parent_col)
        out_rows = _topo_sort(out_rows, ni, pi)

    # Write output
    with open(output_path, "w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(header)  # exact template header
        writer.writerows(out_rows)

    print(f" -> {output_path} ({len(out_rows)} data rows)")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main():
    """Convert every template/export pair found next to this script."""
    # Link columns on Item Group child rows that point at staging-only
    # records; blanked so production import does not fail on missing Links.
    item_group_strip = [
        "ID (Item Group Defaults)",
        "Company (Item Group Defaults)",
        "Default Buying Cost Center (Item Group Defaults)",
        "Default Discount Account (Item Group Defaults)",
        "Default Expense Account (Item Group Defaults)",
        "Default Income Account (Item Group Defaults)",
        "Default Price List (Item Group Defaults)",
        "Default Provisional Account (Item Group Defaults)",
        "Default Selling Cost Center (Item Group Defaults)",
        "Default Supplier (Item Group Defaults)",
        "Default Warehouse (Item Group Defaults)",
        "Deferred Expense Account (Item Group Defaults)",
        "Deferred Revenue Account (Item Group Defaults)",
    ]

    # (template, export, output, topo_sort_col, strip_link_cols)
    jobs = [
        ("Item-template.csv", "Item.csv", "Item-import-ready.csv",
         None, None),
        ("Item Price-template.csv", "Item Price.csv", "Item Price-import-ready.csv",
         None, None),
        ("BOM-template.csv", "BOM.csv", "BOM-import-ready.csv",
         None, None),
        ("Item Group-template.csv", "Item Group.csv", "Item Group-import-ready.csv",
         ("Item Group Name", "Parent Item Group"), item_group_strip),
        ("Supplier-template.csv", "Supplier.csv", "Supplier-import-ready.csv",
         None, None),
        ("User-template.csv", "User.csv", "User-import-ready.csv",
         None, None),
    ]

    for tmpl, export, output, topo, strip in jobs:
        tmpl_path, export_path, output_path = (
            os.path.join(SCRIPT_DIR, name) for name in (tmpl, export, output)
        )

        if not os.path.exists(tmpl_path):
            print(f" !! Template not found: {tmpl}")
            continue
        if not os.path.exists(export_path):
            print(f" !! Export not found: {export}")
            continue

        print(f"Processing {export} ...")
        build_import_csv(tmpl_path, export_path, output_path,
                         topo_sort_col=topo, strip_link_cols=strip)

    print("\nDone.")


if __name__ == "__main__":
    main()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue