custom_ui/custom_ui/api/db/addresses.py

136 lines
No EOL
5.8 KiB
Python

import frappe
import json
from custom_ui.db_utils import build_error_response, build_success_response
from custom_ui.services import ClientService, AddressService, ContactService
@frappe.whitelist()
def get_address_by_full_address(full_address):
    """Look up an address by its ``full_address`` value and return it as a response.

    The address is fetched through ``AddressService.get_address_by_full_address``
    and serialised with ``AddressService.build_full_dict`` (the original
    docstring states this includes the associated contacts).

    Args:
        full_address: Value passed to the service lookup.

    Returns:
        The result of ``build_success_response`` wrapping the serialised
        address, or the result of ``build_error_response`` (exception message
        plus 500) if any step raises.
    """
    print(f"DEBUG: get_address_by_full_address called with full_address: {full_address}")
    try:
        matched = AddressService.get_address_by_full_address(full_address)
        payload = AddressService.build_full_dict(matched)
        response = build_success_response(payload)
    except Exception as err:
        # Endpoint boundary: report every failure as an error response.
        response = build_error_response(str(err), 500)
    return response
@frappe.whitelist()
def get_address(address_name):
    """Fetch a single address by name and return it as a response.

    Args:
        address_name: Identifier handed to ``AddressService.get_or_throw``.

    Returns:
        The result of ``build_success_response`` wrapping ``as_dict()`` of the
        fetched address, or the result of ``build_error_response`` (exception
        message plus 500) if the lookup or serialisation raises.
    """
    try:
        doc = AddressService.get_or_throw(address_name)
        result = build_success_response(doc.as_dict())
    except Exception as err:
        # Endpoint boundary: report every failure as an error response.
        result = build_error_response(str(err), 500)
    return result
# @frappe.whitelist() #### DEPRECATED FUNCTION
# def get_contacts_for_address(address_name):
# """Get contacts linked to a specific address."""
# try:
# address = AddressService.get_or_throw(address_name)
# contacts = []
# for contact_link in address.custom_linked_contacts:
# contact = frappe.get_doc("Contact", contact_link.contact)
# contacts.append(contact.as_dict())
# return build_success_response(contacts)
# except Exception as e:
# return build_error_response(str(e), 500)
@frappe.whitelist()
def create_address(address_data, company, customer_name):
    """Create a new service address for a customer and link it to its contacts.

    The incoming ``address_data`` is enriched in place with the customer name
    and doctype, a generated title, fixed service-address defaults and the
    owning company before the address is created.

    Args:
        address_data: Address fields as a dict, or a JSON string encoding one.
            An optional ``"contacts"`` entry lists the contacts to link; a
            missing or ``null`` entry means "no contacts".
        company: Company stored in the address's ``companies`` rows.
        customer_name: Client the address belongs to; also used to resolve the
            client doctype and to build the address title.

    Returns:
        The result of ``build_success_response`` wrapping the created address's
        ``as_dict()``, or the result of ``build_error_response`` (exception
        message plus 500) if creation or linking raises.

    Raises:
        Exception: Errors from JSON decoding or from
            ``ClientService.get_client_doctype`` / ``AddressService.build_address_title``
            propagate, since those steps run before the ``try`` block.
    """
    print(f"DEBUG: create_address called with address_data: {address_data}, company: {company}, customer_name: {customer_name}")
    if isinstance(address_data, str):
        address_data = json.loads(address_data)
    customer_doctype = ClientService.get_client_doctype(customer_name)
    address_data["customer_name"] = customer_name
    address_data["customer_type"] = customer_doctype
    # The title is built after the customer fields are set and before the
    # defaults below, matching the order the title builder has always seen.
    address_data["address_title"] = AddressService.build_address_title(customer_name, address_data)
    address_data["address_type"] = "Service"
    address_data["custom_billing_address"] = 0
    address_data["is_service_address"] = 1
    address_data["country"] = "United States"
    address_data["companies"] = [{"company": company}]
    print(f"DEBUG: Final address_data before creation: {address_data}")
    try:
        address_doc = AddressService.create_address(address_data)
        # ``or []`` covers an explicit JSON null: ``.get("contacts", [])`` would
        # return None and fail here, after the address was already created.
        for contact in address_data.get("contacts") or []:
            AddressService.link_address_to_contact(address_doc, contact)
            contact_doc = ContactService.get_or_throw(contact)
            ContactService.link_contact_to_address(contact_doc, address_doc)
        ClientService.append_link_v2(customer_name, "properties", {"address": address_doc.name})
        return build_success_response(address_doc.as_dict())
    except Exception as e:
        return build_error_response(str(e), 500)
@frappe.whitelist()
def get_addresses(fields=None, filters=None):
    """Get addresses with optional field selection and filtering.

    Args:
        fields: List of field names, or a JSON string encoding one. Defaults
            to ``["*"]``. When exactly one field other than ``"*"`` is given,
            that field is passed as ``pluck`` and ``fields`` is cleared.
        filters: Filters forwarded to ``frappe.get_all``, or a JSON string
            encoding them. Defaults to ``{}``.

    Returns:
        The result of ``build_success_response`` wrapping the query result, or
        the result of ``build_error_response`` (exception message plus 500) if
        the query raises; the failure is also recorded with
        ``frappe.log_error``.
    """
    if isinstance(fields, str):
        fields = json.loads(fields)
    if isinstance(filters, str):
        filters = json.loads(filters)
    # Defaults are applied after decoding so that a JSON "null" is handled
    # too, and no mutable default object is shared between calls.
    if fields is None:
        fields = ["*"]
    if filters is None:
        filters = {}

    # ``pluck`` must always be bound: it is logged and passed on below even
    # when no single field was requested. Checking the length first also
    # keeps an empty ``fields`` list from raising IndexError.
    pluck = None
    if len(fields) == 1 and fields[0] != "*":
        pluck = fields[0]
        fields = None
    print(f"Getting addresses with fields: {fields} and filters: {filters} and pluck: {pluck}")
    try:
        addresses = frappe.get_all(
            "Address",
            fields=fields,
            filters=filters,
            order_by="address_line1 desc",
            pluck=pluck,
        )
        return build_success_response(addresses)
    except Exception as e:
        frappe.log_error(message=str(e), title="Get Addresses Failed")
        return build_error_response(str(e), 500)
def update_address(address_data):
    """Update an existing address from a mapping of field values.

    Args:
        address_data: Dict of field names to new values, or a JSON string
            encoding one. Its ``"name"`` entry identifies the address to
            update.

    Returns:
        The saved address document.

    Raises:
        Exception: Whatever ``AddressService.get_or_throw`` raises when the
            address cannot be resolved, plus any error from ``save``.
    """
    if isinstance(address_data, str):
        address_data = json.loads(address_data)
    # The previous lookup helper (check_and_get_address_by_name) is neither
    # defined nor imported in this module, so the call failed with NameError.
    # Use the same service lookup that get_address relies on.
    address_doc = AddressService.get_or_throw(address_data.get("name"))
    for key, value in address_data.items():
        setattr(address_doc, key, value)
    # Saved with ignore_permissions=True, as before.
    address_doc.save(ignore_permissions=True)
    return address_doc
def address_exists(address_line1, address_line2, city, state, pincode):
    """Report whether an Address matching all five given fields already exists.

    Args:
        address_line1: Value matched against ``address_line1``.
        address_line2: Value matched against ``address_line2``.
        city: Value matched against ``city``.
        state: Value matched against ``state``.
        pincode: Value matched against ``pincode``.

    Returns:
        ``True`` when ``frappe.db.exists`` returns anything other than
        ``None`` for these filters, otherwise ``False``.
    """
    match = frappe.db.exists(
        "Address",
        dict(
            address_line1=address_line1,
            address_line2=address_line2,
            city=city,
            state=state,
            pincode=pincode,
        ),
    )
    return match is not None
def calculate_address_title(customer_name, address_data):
    """Build a display title of the form ``<customer> - <line1>, <city> - <type>``.

    Args:
        customer_name: Name placed at the start of the title.
        address_data: Mapping read for ``address_line1``, ``city`` and
            ``type``; each missing key contributes an empty string.

    Returns:
        The formatted title string.
    """
    street = address_data.get("address_line1", "")
    town = address_data.get("city", "")
    kind = address_data.get("type", "")
    return f"{customer_name} - {street}, {town} - {kind}"
def create_address_links(address_doc, client_doc, contact_docs):
    """Link an address to its client and contacts, then save the address.

    Appends a ``links`` row for the client, stores the client name on
    ``custom_customer_to_bill`` (for a ``Customer``) or ``lead_name``
    (otherwise), picks the address's ``custom_contact``, and appends one
    ``custom_linked_contacts`` row plus one ``links`` row per contact.

    Args:
        address_doc: Address document to modify and save.
        client_doc: Client document; its ``doctype`` and ``name`` are read.
        contact_docs: Sequence of contact documents to link. May be empty or
            ``None``, in which case only the client link is added and
            ``custom_contact`` is left untouched.

    Returns:
        None. The address is saved with ``ignore_permissions=True``.
    """
    # Treat a missing contact list like an empty one.
    contact_docs = contact_docs or []

    print("#####DEBUG: Linking customer to address.")
    print("#####DEBUG: Client Doc:", client_doc.as_dict(), "Address Doc:", address_doc.as_dict(), "Contact Docs:", [c.as_dict() for c in contact_docs])
    address_doc.append("links", {
        "link_doctype": client_doc.doctype,
        "link_name": client_doc.name,
    })
    client_field = "custom_customer_to_bill" if client_doc.doctype == "Customer" else "lead_name"
    setattr(address_doc, client_field, client_doc.name)

    # Address -> Contact
    print("#####DEBUG: Linking contacts to address.")
    if contact_docs:
        # Prefer the contact flagged as primary and fall back to the first.
        # The fallback argument is evaluated eagerly, so it must only be
        # built when at least one contact exists (it raised IndexError on an
        # empty list before).
        address_doc.custom_contact = next(
            (c.name for c in contact_docs if c.is_primary_contact),
            contact_docs[0].name,
        )
    for contact_doc in contact_docs:
        address_doc.append("custom_linked_contacts", {
            "contact": contact_doc.name,
            "email": contact_doc.email_id,
            "phone": contact_doc.phone,
            "role": contact_doc.role,
        })
        address_doc.append("links", {
            "link_doctype": "Contact",
            "link_name": contact_doc.name,
        })
    address_doc.save(ignore_permissions=True)