#!/usr/bin/env python3 """ Data Quality Reviewer for Query Expansion Training Dataset This script identifies and flags/fixes semantic errors where technical terms are misunderstood. For example: - "gem find" expanded as "mineral hunt" instead of "ruby gem search" - "yarn spin" expanded as "wool twist" instead of "yarn package manager" The script uses contextual analysis to detect when technical terms are likely being used in a programming context vs. their everyday meaning. """ import json import re from pathlib import Path from dataclasses import dataclass, field from typing import Optional from collections import defaultdict from dataset.schema import ( normalize_output_items, output_items_to_text, parse_output_text, ) @dataclass class TechnicalTerm: """Definition of a technical term that might be misunderstood.""" term: str # The ambiguous term (e.g., "liquid", "gem", "yarn") context_indicators: list[str] # Words that suggest tech context wrong_expansions: list[str] # Patterns that indicate wrong interpretation correct_domain: str # What domain this belongs to when technical correct_lex: list[str] # Correct lex expansions correct_vec: list[str] # Correct vec expansions # Known technical terms that are commonly misunderstood KNOWN_TECHNICAL_TERMS = [ TechnicalTerm( term="liquid", context_indicators=["shopify", "template", "filter", "tag", "theme", "jekyll"], wrong_expansions=["fluid", "water", "pour", "drink", "beverage", "h2o", "wet"], correct_domain="Shopify/Jekyll templating language", correct_lex=["shopify template syntax", "liquid template filter"], correct_vec=[ "shopify liquid templating language", "liquid template engine filters", ], ), TechnicalTerm( term="gem", context_indicators=[ "ruby", "bundler", "install", "gemfile", "rails", "require", ], wrong_expansions=[ "mineral", "crystal", "jewel", "stone", "diamond", "jewelry", "precious", ], correct_domain="Ruby package manager", correct_lex=["ruby gem package", "gem install command"], correct_vec=["ruby gem package manager", "rubygems library installation"], ), TechnicalTerm( term="yarn", context_indicators=[ "npm", "package", "install", "node", "javascript", "react", "webpack", ], wrong_expansions=[ "thread", "wool", "knit", "spin", "textile", "fabric", "sew", "twist", ], correct_domain="JavaScript package manager", correct_lex=["yarn package manager", "yarn install dependencies"], correct_vec=["yarn javascript package manager", "yarn npm alternative"], ), TechnicalTerm( term="hook", context_indicators=[ "react", "use", "state", "effect", "component", "callback", "git", ], wrong_expansions=["fish", "fishing", "bait", "catch", "hang", "pirate"], correct_domain="React hooks or Git hooks", correct_lex=["react hooks api", "usestate useeffect"], correct_vec=[ "react hooks state management", "react functional component hooks", ], ), TechnicalTerm( term="container", context_indicators=[ "docker", "kubernetes", "k8s", "image", "orchestration", "pod", ], wrong_expansions=[ "box", "storage", "shipping", "cargo", "tupperware", "jar", "vessel", ], correct_domain="Docker/Kubernetes containers", correct_lex=["docker container", "container image"], correct_vec=[ "docker container virtualization", "container orchestration platform", ], ), TechnicalTerm( term="branch", context_indicators=[ "git", "merge", "checkout", "commit", "main", "master", "repo", ], wrong_expansions=["tree", "limb", "wood", "leaf", "twig", "forest"], correct_domain="Git version control", correct_lex=["git branch", "git checkout branch"], correct_vec=["git branch version control", "git branching workflow"], ), TechnicalTerm( term="decorator", context_indicators=["python", "@", "function", "wrapper", "class", "def"], wrong_expansions=[ "interior", "design", "paint", "furniture", "decor", "ornament", ], correct_domain="Python decorators", correct_lex=["python decorator function", "@decorator syntax"], correct_vec=["python function decorators", "python decorator pattern"], ), TechnicalTerm( term="bean", context_indicators=[ "java", "spring", "injection", "dependency", "servlet", "ejb", ], wrong_expansions=["coffee", "food", "vegetable", "legume", "plant", "soy"], correct_domain="Java Beans / Spring Beans", correct_lex=["java bean class", "spring bean injection"], correct_vec=["java enterprise beans", "spring dependency injection beans"], ), TechnicalTerm( term="shell", context_indicators=[ "bash", "script", "terminal", "command", "linux", "unix", "zsh", ], wrong_expansions=["seashell", "ocean", "beach", "clam", "oyster", "egg"], correct_domain="Unix/Linux shell scripting", correct_lex=["bash shell script", "shell command"], correct_vec=["unix shell scripting", "bash command line shell"], ), TechnicalTerm( term="rust", context_indicators=[ "cargo", "crate", "ownership", "borrow", "lifetime", "unsafe", ], wrong_expansions=["oxidation", "metal", "corrosion", "decay", "iron", "orange"], correct_domain="Rust programming language", correct_lex=["rust programming language", "rust cargo package"], correct_vec=["rust systems programming", "rust memory safety"], ), TechnicalTerm( term="go", context_indicators=[ "golang", "goroutine", "channel", "defer", "gofmt", "module", ], wrong_expansions=[ "travel", "move", "walk", "game", "board game", "leave", "depart", ], correct_domain="Go programming language", correct_lex=["golang programming", "go language syntax"], correct_vec=["go programming language", "golang concurrent programming"], ), TechnicalTerm( term="swift", context_indicators=["ios", "xcode", "apple", "uikit", "swiftui", "cocoa"], wrong_expansions=["fast", "quick", "bird", "speed", "rapid", "taylor"], correct_domain="Swift programming language", correct_lex=["swift ios development", "swift programming language"], correct_vec=["swift apple programming language", "swift ios app development"], ), TechnicalTerm( term="pod", context_indicators=[ "kubernetes", "k8s", "deployment", "service", "cluster", "node", ], wrong_expansions=["pea", "seed", "plant", "vegetable", "legume", "whale"], correct_domain="Kubernetes pods", correct_lex=["kubernetes pod", "k8s pod deployment"], correct_vec=["kubernetes pod container group", "k8s pod orchestration"], ), TechnicalTerm( term="redis", context_indicators=[ "cache", "database", "key-value", "memory", "pub/sub", "queue", ], wrong_expansions=[], # "redis" doesn't have common wrong meanings correct_domain="Redis in-memory database", correct_lex=["redis cache", "redis database"], correct_vec=["redis in-memory data store", "redis caching solution"], ), TechnicalTerm( term="kafka", context_indicators=[ "message", "stream", "queue", "broker", "topic", "producer", "consumer", ], wrong_expansions=[ "franz", "author", "writer", "novel", "metamorphosis", "literature", ], correct_domain="Apache Kafka message queue", correct_lex=["apache kafka", "kafka message broker"], correct_vec=["apache kafka streaming platform", "kafka message queue"], ), TechnicalTerm( term="elastic", context_indicators=[ "elasticsearch", "search", "index", "kibana", "logstash", "query", ], wrong_expansions=["stretch", "rubber", "flexible", "band", "bouncy"], correct_domain="Elasticsearch", correct_lex=["elasticsearch", "elastic search index"], correct_vec=["elasticsearch full-text search", "elastic stack"], ), TechnicalTerm( term="spark", context_indicators=["apache", "hadoop", "data", "rdd", "dataframe", "pyspark"], wrong_expansions=["fire", "ignite", "flame", "plug", "electricity"], correct_domain="Apache Spark", correct_lex=["apache spark", "spark data processing"], correct_vec=["apache spark big data processing", "spark cluster computing"], ), TechnicalTerm( term="flask", context_indicators=["python", "web", "route", "api", "jinja", "werkzeug"], wrong_expansions=[ "bottle", "container", "lab", "chemistry", "drink", "thermos", ], correct_domain="Flask web framework", correct_lex=["flask python web framework", "flask api"], correct_vec=["flask python web development", "flask microframework"], ), TechnicalTerm( term="django", context_indicators=["python", "web", "orm", "model", "view", "template"], wrong_expansions=["jazz", "music", "reinhardt", "guitar", "movie", "western"], correct_domain="Django web framework", correct_lex=["django python framework", "django web development"], correct_vec=["django python web framework", "django orm models"], ), TechnicalTerm( term="rails", context_indicators=[ "ruby", "gem", "activerecord", "model", "controller", "migration", ], wrong_expansions=["train", "track", "railroad", "railway", "metal"], correct_domain="Ruby on Rails", correct_lex=["ruby on rails", "rails web framework"], correct_vec=["ruby on rails framework", "rails mvc architecture"], ), TechnicalTerm( term="node", context_indicators=[ "javascript", "npm", "express", "async", "require", "module", ], wrong_expansions=["lump", "knot", "bump", "growth", "junction"], correct_domain="Node.js", correct_lex=["node.js javascript", "nodejs runtime"], correct_vec=["node.js javascript runtime", "nodejs server-side javascript"], ), TechnicalTerm( term="maven", context_indicators=[ "java", "pom", "dependency", "build", "artifact", "repository", ], wrong_expansions=["expert", "specialist", "connoisseur"], correct_domain="Apache Maven", correct_lex=["apache maven", "maven build tool"], correct_vec=["apache maven java build", "maven dependency management"], ), TechnicalTerm( term="gradle", context_indicators=["java", "kotlin", "android", "build", "groovy", "task"], wrong_expansions=["grade", "slope", "hill", "incline"], correct_domain="Gradle build tool", correct_lex=["gradle build tool", "gradle android"], correct_vec=["gradle java build automation", "gradle kotlin dsl"], ), TechnicalTerm( term="ant", context_indicators=["java", "build", "xml", "target", "task"], wrong_expansions=["insect", "bug", "colony", "hill", "picnic"], correct_domain="Apache Ant build tool", correct_lex=["apache ant", "ant build xml"], correct_vec=["apache ant java build", "ant build automation"], ), ] @dataclass class Issue: """Represents an issue found in a dataset example.""" line_number: int input_text: str output_text: str issue_type: str technical_term: str wrong_expansion_found: str suggested_fix: Optional[str] = None @dataclass class AnalysisResult: """Results of analyzing the dataset.""" total_examples: int = 0 issues_found: list[Issue] = field(default_factory=list) examples_with_correct_tech_terms: list[tuple[int, str]] = field( default_factory=list ) term_statistics: dict = field(default_factory=lambda: defaultdict(int)) def check_for_wrong_expansion(output_text: str, term: TechnicalTerm) -> Optional[str]: """Check if the output contains wrong expansions for a technical term.""" output_lower = output_text.lower() for wrong in term.wrong_expansions: if wrong.lower() in output_lower: return wrong return None def has_tech_context(input_text: str, term: TechnicalTerm) -> bool: """Check if the input has indicators of a technical context.""" input_lower = input_text.lower() for indicator in term.context_indicators: if indicator.lower() in input_lower: return True return False def is_likely_tech_query(input_text: str) -> bool: """ Heuristic to determine if a short query is likely tech-related. Short queries like "gem find" or "yarn spin" are ambiguous. """ tech_patterns = [ r"\b(install|config|setup|build|run|debug|test|deploy|compile)\b", r"\b(api|cli|sdk|lib|pkg|npm|pip|cargo)\b", r"\b(func|class|method|var|const|let|def)\b", r"\b(http|https|url|port|host|server|client)\b", r"\b(json|xml|yaml|csv|sql|html|css|js)\b", ] input_lower = input_text.lower() for pattern in tech_patterns: if re.search(pattern, input_lower): return True return False def has_non_tech_context(input_text: str, term: TechnicalTerm) -> bool: """ Check if the input clearly indicates a non-technical context. This helps avoid false positives for words like "car rust", "yarn spin", etc. """ input_lower = input_text.lower() term_lower = term.term.lower() # Define non-tech context indicators for each ambiguous term non_tech_contexts = { "rust": [ "car", "metal", "iron", "steel", "corrosion", "prevention", "remove", "body", ], "gem": [ "gemstone", "jewelry", "jewel", "diamond", "precious", "stone", "cut", "shop", "buy", "wear", ], "yarn": [ "knit", "crochet", "spin", "wool", "thread", "textile", "fabric", "sew", "weave", ], "hook": ["fishing", "crochet", "hang", "coat", "wall", "ceiling"], "container": [ "storage", "plastic", "food", "shipping", "cargo", "kitchen", "box", ], "branch": ["tree", "bank", "library", "store", "office", "organization"], "decorator": [ "interior", "home", "room", "house", "design", "party", "cake", "wedding", ], "bean": [ "coffee", "soy", "kidney", "black", "green", "garden", "cooking", "food", "plant", "grow", ], "shell": [ "sea", "beach", "egg", "nut", "turtle", "snail", "crab", "clam", "oyster", ], "spark": ["plug", "fire", "ignite", "car", "engine", "electric", "romance"], "go": ["travel", "vacation", "trip", "walk", "run", "leave", "visit", "tour"], "swift": ["taylor", "concert", "music", "singer", "speed", "fast", "bird"], "pod": ["pea", "whale", "orca", "dolphin", "vegetable", "seed", "plant"], "ant": ["insect", "colony", "fire", "carpenter", "pest", "bug", "picnic"], "node": ["lymph", "medical", "body", "tree", "network point"], "rails": ["train", "railroad", "railway", "track", "transit", "fence"], "flask": ["lab", "chemistry", "drink", "hip", "thermos", "bottle", "water"], "django": [ "jazz", "music", "reinhardt", "guitar", "movie", "western", "unchained", ], "maven": ["expert", "connoisseur", "specialist", "guru"], "gradle": ["grade", "school", "slope"], "kafka": [ "franz", "author", "novel", "metamorphosis", "literature", "writer", "book", ], "elastic": ["band", "rubber", "stretch", "flexible", "waist", "fabric"], } if term_lower in non_tech_contexts: for context_word in non_tech_contexts[term_lower]: if context_word.lower() in input_lower: return True return False def analyze_example(line_num: int, input_text: str, output_text: str) -> list[Issue]: """Analyze a single example for potential issues.""" issues = [] input_lower = input_text.lower() for term in KNOWN_TECHNICAL_TERMS: term_lower = term.term.lower() # Check if the input contains this technical term if term_lower not in input_lower: continue # Check if output has wrong expansion wrong_expansion = check_for_wrong_expansion(output_text, term) if wrong_expansion is None: continue # Skip if the context clearly indicates non-technical usage if has_non_tech_context(input_text, term): continue # Determine if this is likely a technical context is_tech = has_tech_context(input_text, term) or is_likely_tech_query(input_text) # For very short inputs that contain ONLY the tech term (like "gem find"), # these are ambiguous and could be tech-related word_count = len(input_text.split()) words = [w.lower() for w in input_text.split()] # Only flag if it's clearly a tech context OR a very short query # where the term appears prominently (e.g., "gem find", "yarn add") if is_tech: # Create suggested fix for definite tech issues suggested_output = f"lex: {term.correct_lex[0]}\nlex: {term.correct_lex[1] if len(term.correct_lex) > 1 else term.correct_lex[0]}\nvec: {term.correct_vec[0]}\nvec: {term.correct_vec[1] if len(term.correct_vec) > 1 else term.correct_vec[0]}\nhyde: {term.correct_domain} is a concept that provides functionality for software development." issue = Issue( line_number=line_num, input_text=input_text, output_text=output_text[:200] + "..." if len(output_text) > 200 else output_text, issue_type="wrong_tech_expansion", technical_term=term.term, wrong_expansion_found=wrong_expansion, suggested_fix=suggested_output, ) issues.append(issue) elif word_count <= 2 and term_lower in words: # Very short query with the term as a primary word - truly ambiguous issue = Issue( line_number=line_num, input_text=input_text, output_text=output_text[:200] + "..." if len(output_text) > 200 else output_text, issue_type="ambiguous_term", technical_term=term.term, wrong_expansion_found=wrong_expansion, suggested_fix=None, ) issues.append(issue) return issues def analyze_dataset(filepath: Path) -> AnalysisResult: """Analyze the entire dataset for issues.""" result = AnalysisResult() with open(filepath, "r", encoding="utf-8") as f: for line_num, line in enumerate(f, 1): line = line.strip() if not line: continue try: example = json.loads(line) input_text = example.get("query", "") or example.get("input", "") output_raw = example.get("output", []) if isinstance(output_raw, str): output_items = normalize_output_items(parse_output_text(output_raw)) else: output_items = normalize_output_items(output_raw) output_text = output_items_to_text(output_items) result.total_examples += 1 # Analyze for issues issues = analyze_example(line_num, input_text, output_text) result.issues_found.extend(issues) # Track term statistics for term in KNOWN_TECHNICAL_TERMS: if term.term.lower() in input_text.lower(): result.term_statistics[term.term] += 1 except json.JSONDecodeError as e: print(f"Warning: Could not parse line {line_num}: {e}") return result def fix_example(example: dict, issues: list[Issue]) -> Optional[dict]: """ Attempt to fix an example based on identified issues. Returns None if no fix is needed or possible. """ # Only fix examples with definite tech context issues tech_issues = [ i for i in issues if i.issue_type == "wrong_tech_expansion" and i.suggested_fix ] if not tech_issues: return None # Use the first tech issue's fix (they should be similar) issue = tech_issues[0] if not issue.suggested_fix: return None fixed = example.copy() fixed_output_items = normalize_output_items(parse_output_text(issue.suggested_fix)) fixed["output"] = fixed_output_items fixed["_fixed"] = True original_items = example.get("output", []) if isinstance(original_items, str): original_items = normalize_output_items(parse_output_text(original_items)) fixed["_original_output"] = output_items_to_text(original_items) fixed["_fix_reason"] = ( f"Technical term '{issue.technical_term}' was incorrectly expanded as '{issue.wrong_expansion_found}'" ) return fixed def generate_report(result: AnalysisResult) -> str: """Generate a human-readable report of the analysis.""" lines = [] lines.append("=" * 70) lines.append("QUERY EXPANSION DATASET QUALITY REPORT") lines.append("=" * 70) lines.append("") lines.append(f"Total examples analyzed: {result.total_examples}") lines.append(f"Issues found: {len(result.issues_found)}") lines.append("") # Group issues by type by_type = defaultdict(list) for issue in result.issues_found: by_type[issue.issue_type].append(issue) lines.append("-" * 70) lines.append("ISSUES BY TYPE:") lines.append("-" * 70) for issue_type, issues in by_type.items(): lines.append(f"\n{issue_type.upper()}: {len(issues)} issues") lines.append("-" * 40) # Show up to 10 examples per type for issue in issues[:10]: lines.append(f"\n Line {issue.line_number}:") lines.append(f" Input: {issue.input_text}") lines.append(f" Technical term: '{issue.technical_term}'") lines.append(f" Wrong expansion found: '{issue.wrong_expansion_found}'") if issue.suggested_fix: lines.append(f" Suggested fix available: Yes") if len(issues) > 10: lines.append(f"\n ... and {len(issues) - 10} more") # Term statistics lines.append("\n" + "-" * 70) lines.append("TECHNICAL TERM OCCURRENCES IN DATASET:") lines.append("-" * 70) for term, count in sorted(result.term_statistics.items(), key=lambda x: -x[1]): if count > 0: lines.append(f" {term}: {count} occurrences") lines.append("\n" + "=" * 70) return "\n".join(lines) def save_cleaned_dataset(filepath: Path, output_path: Path, result: AnalysisResult): """Save a cleaned version of the dataset.""" issues_by_line = defaultdict(list) for issue in result.issues_found: issues_by_line[issue.line_number].append(issue) fixed_count = 0 flagged_count = 0 with ( open(filepath, "r", encoding="utf-8") as f_in, open(output_path, "w", encoding="utf-8") as f_out, ): for line_num, line in enumerate(f_in, 1): line = line.strip() if not line: continue try: example = json.loads(line) if "query" not in example and "input" in example: example["query"] = example.pop("input") output_raw = example.get("output", []) if isinstance(output_raw, str): example["output"] = normalize_output_items( parse_output_text(output_raw) ) else: example["output"] = normalize_output_items(output_raw) if line_num in issues_by_line: issues = issues_by_line[line_num] fixed = fix_example(example, issues) if fixed: f_out.write(json.dumps(fixed) + "\n") fixed_count += 1 else: # Flag but don't fix ambiguous cases example["_flagged"] = True example["_flag_reason"] = ( f"Ambiguous term '{issues[0].technical_term}' may need review" ) f_out.write(json.dumps(example) + "\n") flagged_count += 1 else: f_out.write(json.dumps(example) + "\n") except json.JSONDecodeError: # Keep problematic lines as-is f_out.write(line + "\n") return fixed_count, flagged_count def main(): """Main entry point.""" # Paths script_dir = Path(__file__).parent input_path = script_dir / "data" / "qmd_expansion.jsonl" output_path = script_dir / "data" / "qmd_expansion_cleaned.jsonl" report_path = script_dir / "data" / "quality_report.txt" print(f"Analyzing dataset: {input_path}") print("-" * 50) if not input_path.exists(): print(f"Error: Input file not found: {input_path}") return 1 # Analyze the dataset result = analyze_dataset(input_path) # Generate and print report report = generate_report(result) print(report) # Save report to file with open(report_path, "w", encoding="utf-8") as f: f.write(report) print(f"\nReport saved to: {report_path}") # Save cleaned dataset fixed_count, flagged_count = save_cleaned_dataset(input_path, output_path, result) print(f"\nCleaned dataset saved to: {output_path}") print(f" - Examples fixed: {fixed_count}") print(f" - Examples flagged for review: {flagged_count}") print( f" - Examples unchanged: {result.total_examples - fixed_count - flagged_count}" ) # Summary statistics print("\n" + "=" * 50) print("SUMMARY") print("=" * 50) print(f"Total examples: {result.total_examples}") print(f"Total issues found: {len(result.issues_found)}") tech_issues = [ i for i in result.issues_found if i.issue_type == "wrong_tech_expansion" ] ambig_issues = [i for i in result.issues_found if i.issue_type == "ambiguous_term"] print(f" - Definite tech term errors: {len(tech_issues)}") print(f" - Ambiguous terms needing review: {len(ambig_issues)}") if len(result.issues_found) > 0: error_rate = len(result.issues_found) / result.total_examples * 100 print(f"\nError rate: {error_rate:.2f}%") return 0 if __name__ == "__main__": exit(main())