325 lines
12 KiB
Python
325 lines
12 KiB
Python
# Python
|
|
import urllib.parse
|
|
from collections import deque
|
|
# Django
|
|
from django.db import models
|
|
from django.conf import settings
|
|
from django.contrib.contenttypes.models import ContentType
|
|
|
|
|
|
NAMED_URL_RES_DILIMITER = "++"
|
|
NAMED_URL_RES_INNER_DILIMITER = "+"
|
|
NAMED_URL_RES_DILIMITER_ENCODE = "%2B"
|
|
URL_PATH_RESERVED_CHARSET = {}
|
|
for c in ';/?:@=&[]':
|
|
URL_PATH_RESERVED_CHARSET[c] = urllib.parse.quote(c, safe='')
|
|
FK_NAME = 0
|
|
NEXT_NODE = 1
|
|
|
|
NAME_EXCEPTIONS = {
|
|
"custom_inventory_scripts": "inventory_scripts"
|
|
}
|
|
|
|
|
|
class GraphNode(object):
|
|
|
|
def __init__(self, model, fields, adj_list):
|
|
self.model = model
|
|
self.found = False
|
|
self.obj = None
|
|
self.fields = fields
|
|
self.adj_list = adj_list
|
|
self.counter = 0
|
|
|
|
def _handle_unexpected_model_url_names(self, model_url_name):
|
|
if model_url_name in NAME_EXCEPTIONS:
|
|
return NAME_EXCEPTIONS[model_url_name]
|
|
return model_url_name
|
|
|
|
@property
|
|
def model_url_name(self):
|
|
if not hasattr(self, '_model_url_name'):
|
|
self._model_url_name = self.model._meta.verbose_name_plural.replace(' ', '_')
|
|
self._model_url_name = self._handle_unexpected_model_url_names(self._model_url_name)
|
|
return self._model_url_name
|
|
|
|
@property
|
|
def named_url_format(self):
|
|
named_url_components = []
|
|
stack = [self]
|
|
current_fk_name = ''
|
|
while stack:
|
|
if stack[-1].counter == 0:
|
|
named_url_component = NAMED_URL_RES_INNER_DILIMITER.join(
|
|
["<%s>" % (current_fk_name + field)
|
|
for field in stack[-1].fields]
|
|
)
|
|
named_url_components.append(named_url_component)
|
|
if stack[-1].counter >= len(stack[-1].adj_list):
|
|
stack[-1].counter = 0
|
|
stack.pop()
|
|
else:
|
|
to_append = stack[-1].adj_list[stack[-1].counter][NEXT_NODE]
|
|
current_fk_name = "%s." % (stack[-1].adj_list[stack[-1].counter][FK_NAME],)
|
|
stack[-1].counter += 1
|
|
stack.append(to_append)
|
|
return NAMED_URL_RES_DILIMITER.join(named_url_components)
|
|
|
|
@property
|
|
def named_url_repr(self):
|
|
ret = {}
|
|
ret['fields'] = self.fields
|
|
ret['adj_list'] = [[x[FK_NAME], x[NEXT_NODE].model_url_name] for x in self.adj_list]
|
|
return ret
|
|
|
|
def _encode_uri(self, text):
|
|
'''
|
|
Performance assured: http://stackoverflow.com/a/27086669
|
|
'''
|
|
for c in URL_PATH_RESERVED_CHARSET:
|
|
if not isinstance(text, str):
|
|
text = str(text) # needed for WFJT node creation, identifier temporarily UUID4 type
|
|
if c in text:
|
|
text = text.replace(c, URL_PATH_RESERVED_CHARSET[c])
|
|
text = text.replace(NAMED_URL_RES_INNER_DILIMITER,
|
|
'[%s]' % NAMED_URL_RES_INNER_DILIMITER)
|
|
return text
|
|
|
|
def generate_named_url(self, obj):
|
|
self.obj = obj
|
|
named_url = []
|
|
stack = [self]
|
|
while stack:
|
|
if stack[-1].counter == 0:
|
|
named_url_item = [self._encode_uri(getattr(stack[-1].obj, field, ''))
|
|
for field in stack[-1].fields]
|
|
named_url.append(NAMED_URL_RES_INNER_DILIMITER.join(named_url_item))
|
|
if stack[-1].counter >= len(stack[-1].adj_list):
|
|
stack[-1].counter = 0
|
|
stack[-1].obj = None
|
|
stack.pop()
|
|
else:
|
|
next_ = stack[-1].adj_list[stack[-1].counter]
|
|
stack[-1].counter += 1
|
|
next_obj = getattr(stack[-1].obj, next_[FK_NAME], None)
|
|
if next_obj is not None:
|
|
next_[NEXT_NODE].obj = next_obj
|
|
stack.append(next_[NEXT_NODE])
|
|
else:
|
|
named_url.append('')
|
|
return NAMED_URL_RES_DILIMITER.join(named_url)
|
|
|
|
|
|
def _process_top_node(self, named_url_names, kwargs, prefixes, stack, idx):
|
|
if stack[-1].counter == 0:
|
|
if idx >= len(named_url_names):
|
|
return idx, False
|
|
if not named_url_names[idx]:
|
|
stack[-1].counter = 0
|
|
stack.pop()
|
|
if prefixes:
|
|
prefixes.pop()
|
|
idx += 1
|
|
return idx, True
|
|
named_url_parts = named_url_names[idx].split(NAMED_URL_RES_INNER_DILIMITER)
|
|
if len(named_url_parts) != len(stack[-1].fields):
|
|
return idx, False
|
|
evolving_prefix = '__'.join(prefixes)
|
|
for attr_name, attr_value in zip(stack[-1].fields, named_url_parts):
|
|
attr_name = ("__%s" % attr_name) if evolving_prefix else attr_name
|
|
if isinstance(attr_value, str):
|
|
attr_value = urllib.parse.unquote(attr_value)
|
|
kwargs[evolving_prefix + attr_name] = attr_value
|
|
idx += 1
|
|
if stack[-1].counter >= len(stack[-1].adj_list):
|
|
stack[-1].counter = 0
|
|
stack.pop()
|
|
if prefixes:
|
|
prefixes.pop()
|
|
else:
|
|
to_append = stack[-1].adj_list[stack[-1].counter]
|
|
stack[-1].counter += 1
|
|
prefixes.append(to_append[FK_NAME])
|
|
stack.append(to_append[NEXT_NODE])
|
|
return idx, True
|
|
|
|
def populate_named_url_query_kwargs(self, kwargs, named_url, ignore_digits=True):
|
|
if ignore_digits and named_url.isdigit() and int(named_url) > 0:
|
|
return False
|
|
named_url = named_url.replace('[%s]' % NAMED_URL_RES_INNER_DILIMITER,
|
|
NAMED_URL_RES_DILIMITER_ENCODE)
|
|
named_url_names = named_url.split(NAMED_URL_RES_DILIMITER)
|
|
prefixes = []
|
|
stack = [self]
|
|
idx = 0
|
|
while stack:
|
|
idx, is_valid = self._process_top_node(
|
|
named_url_names, kwargs, prefixes, stack, idx
|
|
)
|
|
if not is_valid:
|
|
return False
|
|
return idx == len(named_url_names)
|
|
|
|
def add_bindings(self):
|
|
if self.model_url_name not in settings.NAMED_URL_FORMATS:
|
|
settings.NAMED_URL_FORMATS[self.model_url_name] = self.named_url_format
|
|
settings.NAMED_URL_GRAPH_NODES[self.model_url_name] = self.named_url_repr
|
|
settings.NAMED_URL_MAPPINGS[self.model_url_name] = self.model
|
|
|
|
def remove_bindings(self):
|
|
if self.model_url_name in settings.NAMED_URL_FORMATS:
|
|
settings.NAMED_URL_FORMATS.pop(self.model_url_name)
|
|
settings.NAMED_URL_GRAPH_NODES.pop(self.model_url_name)
|
|
settings.NAMED_URL_MAPPINGS.pop(self.model_url_name)
|
|
|
|
|
|
def _get_all_unique_togethers(model):
|
|
queue = deque()
|
|
queue.append(model)
|
|
ret = []
|
|
try:
|
|
if model._meta.get_field('name').unique:
|
|
ret.append(('name',))
|
|
except Exception:
|
|
pass
|
|
while len(queue) > 0:
|
|
model_to_backtrack = queue.popleft()
|
|
uts = model_to_backtrack._meta.unique_together
|
|
if len(uts) > 0 and not isinstance(uts[0], tuple):
|
|
ret.append(uts)
|
|
else:
|
|
ret.extend(uts)
|
|
soft_uts = getattr(model_to_backtrack, 'SOFT_UNIQUE_TOGETHER', [])
|
|
ret.extend(soft_uts)
|
|
for parent_class in model_to_backtrack.__bases__:
|
|
if issubclass(parent_class, models.Model) and\
|
|
hasattr(parent_class, '_meta') and\
|
|
hasattr(parent_class._meta, 'unique_together') and\
|
|
isinstance(parent_class._meta.unique_together, tuple):
|
|
queue.append(parent_class)
|
|
ret.sort(key=lambda x: len(x))
|
|
return tuple(ret)
|
|
|
|
|
|
def _check_unique_together_fields(model, ut):
|
|
name_field = None
|
|
fk_names = []
|
|
fields = []
|
|
is_valid = True
|
|
for field_name in ut:
|
|
field = model._meta.get_field(field_name)
|
|
if field_name in ('name', 'identifier'):
|
|
name_field = field_name
|
|
elif type(field) == models.ForeignKey and field.related_model != model:
|
|
fk_names.append(field_name)
|
|
elif issubclass(type(field), models.CharField) and field.choices:
|
|
fields.append(field_name)
|
|
else:
|
|
is_valid = False
|
|
break
|
|
if not is_valid:
|
|
return (), (), is_valid
|
|
fk_names.sort()
|
|
fields.sort(reverse=True)
|
|
if name_field:
|
|
fields.append(name_field)
|
|
fields.reverse()
|
|
return tuple(fk_names), tuple(fields), is_valid
|
|
|
|
|
|
def _generate_configurations(nodes):
|
|
if not nodes:
|
|
return
|
|
idx = 0
|
|
stack = [nodes[0][1]]
|
|
idx_stack = [0]
|
|
configuration = {}
|
|
while stack:
|
|
if idx_stack[-1] >= len(stack[-1]):
|
|
stack.pop()
|
|
idx_stack.pop()
|
|
configuration.pop(nodes[idx][0])
|
|
idx -= 1
|
|
else:
|
|
if len(configuration) == len(stack):
|
|
configuration.pop(nodes[idx][0])
|
|
configuration[nodes[idx][0]] = tuple(stack[-1][idx_stack[-1]])
|
|
idx_stack[-1] += 1
|
|
if idx == len(nodes) - 1:
|
|
yield configuration.copy()
|
|
else:
|
|
idx += 1
|
|
stack.append(nodes[idx][1])
|
|
idx_stack.append(0)
|
|
|
|
|
|
def _dfs(configuration, model, graph, dead_ends, new_deadends, parents):
|
|
parents.add(model)
|
|
fields, fk_names = configuration[model][0][:], configuration[model][1][:]
|
|
adj_list = []
|
|
for fk_name in fk_names:
|
|
next_model = model._meta.get_field(fk_name).related_model
|
|
if issubclass(next_model, ContentType):
|
|
continue
|
|
if next_model not in configuration or\
|
|
next_model in dead_ends or\
|
|
next_model in new_deadends or\
|
|
next_model in parents:
|
|
new_deadends.add(model)
|
|
parents.remove(model)
|
|
return False
|
|
if next_model not in graph and\
|
|
not _dfs(
|
|
configuration, next_model, graph,
|
|
dead_ends, new_deadends, parents
|
|
):
|
|
new_deadends.add(model)
|
|
parents.remove(model)
|
|
return False
|
|
adj_list.append((fk_name, graph[next_model]))
|
|
graph[model] = GraphNode(model, fields, adj_list)
|
|
parents.remove(model)
|
|
return True
|
|
|
|
|
|
def _generate_single_graph(configuration, dead_ends):
|
|
new_deadends = set()
|
|
graph = {}
|
|
for model in configuration:
|
|
if model not in graph and model not in new_deadends:
|
|
_dfs(configuration, model, graph, dead_ends, new_deadends, set())
|
|
return graph
|
|
|
|
|
|
def generate_graph(models):
|
|
settings.NAMED_URL_FORMATS = {}
|
|
settings.NAMED_URL_GRAPH_NODES = {}
|
|
settings.NAMED_URL_MAPPINGS = {}
|
|
candidate_nodes = {}
|
|
dead_ends = set()
|
|
for model in models:
|
|
uts = _get_all_unique_togethers(model)
|
|
for ut in uts:
|
|
fk_names, fields, is_valid = _check_unique_together_fields(model, ut)
|
|
if is_valid:
|
|
candidate_nodes.setdefault(model, [])
|
|
candidate_nodes[model].append([fields, fk_names])
|
|
if model not in candidate_nodes:
|
|
dead_ends.add(model)
|
|
candidate_nodes = list(candidate_nodes.items())
|
|
largest_graph = {}
|
|
for configuration in _generate_configurations(candidate_nodes):
|
|
candidate_graph = _generate_single_graph(configuration, dead_ends)
|
|
if len(largest_graph) < len(candidate_graph):
|
|
largest_graph = candidate_graph
|
|
if len(largest_graph) == len(candidate_nodes):
|
|
break
|
|
settings.NAMED_URL_GRAPH = largest_graph
|
|
for node in settings.NAMED_URL_GRAPH.values():
|
|
node.add_bindings()
|
|
|
|
|
|
def reset_counters():
|
|
for node in settings.NAMED_URL_GRAPH.values():
|
|
node.counter = 0
|