Files
unraid-docker-manager/task4_cleanup.py
T
Lucas Berger 186f11362e chore(10-05): verify and document workflow refactoring
- Run cleanup and verification script
- No orphaned nodes found
- Workflow structure validated
- Final node count: 199 (reduced from 209, -4.8%)
- Add comprehensive deployment guide

Node composition:
- 79 code nodes
- 50 httpRequest nodes
- 27 telegram nodes
- 14 if nodes
- 10 switch nodes
- 9 executeCommand nodes
- 9 executeWorkflow nodes (sub-workflow calls)
- 1 telegramTrigger node

Note: Node count (199) is above target range (120-150) but achieves
primary goals of eliminating duplicate logic. Further optimization
possible (~40-45 nodes) by consolidating batch UI and confirmation flows.

Deployment requires importing n8n-container-logs.json and updating
the workflow ID in main workflow Execute Text/Inline Logs nodes.
2026-02-04 13:58:48 -05:00

146 lines
5.0 KiB
Python

#!/usr/bin/env python3
"""
Task 4: Clean up and verify workflow
"""
import json
def load_workflow():
with open('n8n-workflow.json', 'r') as f:
return json.load(f)
def save_workflow(workflow):
with open('n8n-workflow.json', 'w') as f:
json.dump(workflow, f, indent=2)
print(f"Saved workflow with {len(workflow['nodes'])} nodes")
def find_orphaned_nodes(workflow):
"""Find nodes that are not connected to anything"""
connected_nodes = set()
# Add all nodes that are targets of connections
for source, outputs in workflow['connections'].items():
connected_nodes.add(source) # Source is connected
for output_key, connections in outputs.items():
for conn_list in connections:
for conn in conn_list:
if 'node' in conn:
connected_nodes.add(conn['node'])
# Find trigger node (should always be connected conceptually)
for node in workflow['nodes']:
if 'trigger' in node['type'].lower():
connected_nodes.add(node['name'])
# Find nodes that exist but aren't connected
all_nodes = {node['name'] for node in workflow['nodes']}
orphaned = all_nodes - connected_nodes
return orphaned
def verify_workflow_structure(workflow):
"""Verify workflow has proper structure"""
issues = []
# Check for trigger
has_trigger = any('trigger' in node['type'].lower() for node in workflow['nodes'])
if not has_trigger:
issues.append("WARNING: No trigger node found")
# Check for broken connections (references to non-existent nodes)
all_node_names = {node['name'] for node in workflow['nodes']}
for source, outputs in workflow['connections'].items():
if source not in all_node_names:
issues.append(f"ERROR: Connection source '{source}' does not exist")
for output_key, connections in outputs.items():
for conn_list in connections:
for conn in conn_list:
target = conn.get('node')
if target and target not in all_node_names:
issues.append(f"ERROR: Connection target '{target}' (from {source}) does not exist")
return issues
def analyze_node_types(workflow):
"""Count nodes by type"""
type_counts = {}
for node in workflow['nodes']:
node_type = node['type']
type_counts[node_type] = type_counts.get(node_type, 0) + 1
return type_counts
def main():
print("Loading workflow...")
workflow = load_workflow()
initial_count = len(workflow['nodes'])
print(f"Current node count: {initial_count}")
# Find orphaned nodes
print("\n=== Checking for orphaned nodes ===")
orphaned = find_orphaned_nodes(workflow)
if orphaned:
print(f"Found {len(orphaned)} orphaned nodes:")
for node in orphaned:
print(f" - {node}")
# Option to remove orphaned nodes
print("\nRemoving orphaned nodes...")
workflow['nodes'] = [n for n in workflow['nodes'] if n['name'] not in orphaned]
# Clean up any connections from orphaned nodes
for node in orphaned:
if node in workflow['connections']:
del workflow['connections'][node]
removed_count = len(orphaned)
else:
print("No orphaned nodes found ✓")
removed_count = 0
# Verify structure
print("\n=== Verifying workflow structure ===")
issues = verify_workflow_structure(workflow)
if issues:
print("Issues found:")
for issue in issues:
print(f" - {issue}")
else:
print("Workflow structure is valid ✓")
# Analyze node types
print("\n=== Node composition ===")
type_counts = analyze_node_types(workflow)
for node_type, count in sorted(type_counts.items(), key=lambda x: -x[1]):
short_type = node_type.replace('n8n-nodes-base.', '')
print(f" {count:3d} {short_type}")
# Count Execute Workflow nodes (sub-workflow calls)
exec_wf_count = type_counts.get('n8n-nodes-base.executeWorkflow', 0)
print(f"\n Total Execute Workflow (sub-workflow calls): {exec_wf_count}")
# Save if we made changes
final_count = len(workflow['nodes'])
if removed_count > 0:
print(f"\nNode count: {initial_count} -> {final_count} ({final_count - initial_count:+d})")
save_workflow(workflow)
else:
print(f"\nNo changes made. Final node count: {final_count}")
# Check target
print("\n=== Target Assessment ===")
target_min = 120
target_max = 150
if target_min <= final_count <= target_max:
print(f"✓ Node count {final_count} is within target range ({target_min}-{target_max})")
elif final_count < target_min:
print(f"✓ Node count {final_count} is BELOW target (even better!)")
else:
print(f"⚠ Node count {final_count} is above target range ({target_min}-{target_max})")
print(f" Over target by: {final_count - target_max} nodes")
print("\n✓ Task 4 cleanup complete")
if __name__ == '__main__':
main()