| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595 |
- #!/usr/bin/env python3
- """
- Data Quality Reviewer for Query Expansion Training Dataset
- This script identifies and flags/fixes semantic errors where technical terms
- are misunderstood. For example:
- - "gem find" expanded as "mineral hunt" instead of "ruby gem search"
- - "yarn spin" expanded as "wool twist" instead of "yarn package manager"
- The script uses contextual analysis to detect when technical terms
- are likely being used in a programming context vs. their everyday meaning.
- """
- import json
- import re
- from pathlib import Path
- from dataclasses import dataclass, field
- from typing import Optional
- from collections import defaultdict
@dataclass
class TechnicalTerm:
    """An ambiguous word that also has a technical meaning.

    Attributes:
        term: The ambiguous term itself (e.g. "liquid", "gem", "yarn").
        context_indicators: Words that suggest a technical context.
        wrong_expansions: Patterns that indicate the wrong interpretation.
        correct_domain: The domain the term belongs to when used technically.
        correct_lex: Correct "lex" expansions.
        correct_vec: Correct "vec" expansions.
    """

    term: str
    context_indicators: list[str]
    wrong_expansions: list[str]
    correct_domain: str
    correct_lex: list[str]
    correct_vec: list[str]
# Known technical terms that are commonly misunderstood.
#
# Each row below is unpacked positionally into TechnicalTerm, so its columns
# follow the dataclass field order:
#   (term, context_indicators, wrong_expansions,
#    correct_domain, correct_lex, correct_vec)
KNOWN_TECHNICAL_TERMS = [
    TechnicalTerm(*row)
    for row in (
        (
            "liquid",
            ["shopify", "template", "filter", "tag", "theme", "jekyll"],
            ["fluid", "water", "pour", "drink", "beverage", "h2o", "wet"],
            "Shopify/Jekyll templating language",
            ["shopify template syntax", "liquid template filter"],
            ["shopify liquid templating language", "liquid template engine filters"],
        ),
        (
            "gem",
            ["ruby", "bundler", "install", "gemfile", "rails", "require"],
            ["mineral", "crystal", "jewel", "stone", "diamond", "jewelry", "precious"],
            "Ruby package manager",
            ["ruby gem package", "gem install command"],
            ["ruby gem package manager", "rubygems library installation"],
        ),
        (
            "yarn",
            ["npm", "package", "install", "node", "javascript", "react", "webpack"],
            ["thread", "wool", "knit", "spin", "textile", "fabric", "sew", "twist"],
            "JavaScript package manager",
            ["yarn package manager", "yarn install dependencies"],
            ["yarn javascript package manager", "yarn npm alternative"],
        ),
        (
            "hook",
            ["react", "use", "state", "effect", "component", "callback", "git"],
            ["fish", "fishing", "bait", "catch", "hang", "pirate"],
            "React hooks or Git hooks",
            ["react hooks api", "usestate useeffect"],
            ["react hooks state management", "react functional component hooks"],
        ),
        (
            "container",
            ["docker", "kubernetes", "k8s", "image", "orchestration", "pod"],
            ["box", "storage", "shipping", "cargo", "tupperware", "jar", "vessel"],
            "Docker/Kubernetes containers",
            ["docker container", "container image"],
            ["docker container virtualization", "container orchestration platform"],
        ),
        (
            "branch",
            ["git", "merge", "checkout", "commit", "main", "master", "repo"],
            ["tree", "limb", "wood", "leaf", "twig", "forest"],
            "Git version control",
            ["git branch", "git checkout branch"],
            ["git branch version control", "git branching workflow"],
        ),
        (
            "decorator",
            ["python", "@", "function", "wrapper", "class", "def"],
            ["interior", "design", "paint", "furniture", "decor", "ornament"],
            "Python decorators",
            ["python decorator function", "@decorator syntax"],
            ["python function decorators", "python decorator pattern"],
        ),
        (
            "bean",
            ["java", "spring", "injection", "dependency", "servlet", "ejb"],
            ["coffee", "food", "vegetable", "legume", "plant", "soy"],
            "Java Beans / Spring Beans",
            ["java bean class", "spring bean injection"],
            ["java enterprise beans", "spring dependency injection beans"],
        ),
        (
            "shell",
            ["bash", "script", "terminal", "command", "linux", "unix", "zsh"],
            ["seashell", "ocean", "beach", "clam", "oyster", "egg"],
            "Unix/Linux shell scripting",
            ["bash shell script", "shell command"],
            ["unix shell scripting", "bash command line shell"],
        ),
        (
            "rust",
            ["cargo", "crate", "ownership", "borrow", "lifetime", "unsafe"],
            ["oxidation", "metal", "corrosion", "decay", "iron", "orange"],
            "Rust programming language",
            ["rust programming language", "rust cargo package"],
            ["rust systems programming", "rust memory safety"],
        ),
        (
            "go",
            ["golang", "goroutine", "channel", "defer", "gofmt", "module"],
            ["travel", "move", "walk", "game", "board game", "leave", "depart"],
            "Go programming language",
            ["golang programming", "go language syntax"],
            ["go programming language", "golang concurrent programming"],
        ),
        (
            "swift",
            ["ios", "xcode", "apple", "uikit", "swiftui", "cocoa"],
            ["fast", "quick", "bird", "speed", "rapid", "taylor"],
            "Swift programming language",
            ["swift ios development", "swift programming language"],
            ["swift apple programming language", "swift ios app development"],
        ),
        (
            "pod",
            ["kubernetes", "k8s", "deployment", "service", "cluster", "node"],
            ["pea", "seed", "plant", "vegetable", "legume", "whale"],
            "Kubernetes pods",
            ["kubernetes pod", "k8s pod deployment"],
            ["kubernetes pod container group", "k8s pod orchestration"],
        ),
        (
            "redis",
            ["cache", "database", "key-value", "memory", "pub/sub", "queue"],
            [],  # "redis" doesn't have common wrong meanings
            "Redis in-memory database",
            ["redis cache", "redis database"],
            ["redis in-memory data store", "redis caching solution"],
        ),
        (
            "kafka",
            ["message", "stream", "queue", "broker", "topic", "producer", "consumer"],
            ["franz", "author", "writer", "novel", "metamorphosis", "literature"],
            "Apache Kafka message queue",
            ["apache kafka", "kafka message broker"],
            ["apache kafka streaming platform", "kafka message queue"],
        ),
        (
            "elastic",
            ["elasticsearch", "search", "index", "kibana", "logstash", "query"],
            ["stretch", "rubber", "flexible", "band", "bouncy"],
            "Elasticsearch",
            ["elasticsearch", "elastic search index"],
            ["elasticsearch full-text search", "elastic stack"],
        ),
        (
            "spark",
            ["apache", "hadoop", "data", "rdd", "dataframe", "pyspark"],
            ["fire", "ignite", "flame", "plug", "electricity"],
            "Apache Spark",
            ["apache spark", "spark data processing"],
            ["apache spark big data processing", "spark cluster computing"],
        ),
        (
            "flask",
            ["python", "web", "route", "api", "jinja", "werkzeug"],
            ["bottle", "container", "lab", "chemistry", "drink", "thermos"],
            "Flask web framework",
            ["flask python web framework", "flask api"],
            ["flask python web development", "flask microframework"],
        ),
        (
            "django",
            ["python", "web", "orm", "model", "view", "template"],
            ["jazz", "music", "reinhardt", "guitar", "movie", "western"],
            "Django web framework",
            ["django python framework", "django web development"],
            ["django python web framework", "django orm models"],
        ),
        (
            "rails",
            ["ruby", "gem", "activerecord", "model", "controller", "migration"],
            ["train", "track", "railroad", "railway", "metal"],
            "Ruby on Rails",
            ["ruby on rails", "rails web framework"],
            ["ruby on rails framework", "rails mvc architecture"],
        ),
        (
            "node",
            ["javascript", "npm", "express", "async", "require", "module"],
            ["lump", "knot", "bump", "growth", "junction"],
            "Node.js",
            ["node.js javascript", "nodejs runtime"],
            ["node.js javascript runtime", "nodejs server-side javascript"],
        ),
        (
            "maven",
            ["java", "pom", "dependency", "build", "artifact", "repository"],
            ["expert", "specialist", "connoisseur"],
            "Apache Maven",
            ["apache maven", "maven build tool"],
            ["apache maven java build", "maven dependency management"],
        ),
        (
            "gradle",
            ["java", "kotlin", "android", "build", "groovy", "task"],
            ["grade", "slope", "hill", "incline"],
            "Gradle build tool",
            ["gradle build tool", "gradle android"],
            ["gradle java build automation", "gradle kotlin dsl"],
        ),
        (
            "ant",
            ["java", "build", "xml", "target", "task"],
            ["insect", "bug", "colony", "hill", "picnic"],
            "Apache Ant build tool",
            ["apache ant", "ant build xml"],
            ["apache ant java build", "ant build automation"],
        ),
    )
]
@dataclass
class Issue:
    """One problem detected in a single dataset example.

    Attributes:
        line_number: Line of the dataset file the example came from.
        input_text: The example's input (the query).
        output_text: The example's output (the expansion).
        issue_type: Category label for the problem.
        technical_term: The technical term involved.
        wrong_expansion_found: The wrong-expansion word found in the output.
        suggested_fix: Replacement output, or None when no fix is proposed.
    """

    line_number: int
    input_text: str
    output_text: str
    issue_type: str
    technical_term: str
    wrong_expansion_found: str
    suggested_fix: Optional[str] = None
@dataclass
class AnalysisResult:
    """Accumulated outcome of scanning a dataset.

    Attributes:
        total_examples: Number of examples counted so far.
        issues_found: Every Issue collected during the scan.
        examples_with_correct_tech_terms: (line number, text) pairs; starts
            empty.
        term_statistics: Per-term counter; missing keys default to 0.
    """

    total_examples: int = 0
    issues_found: list[Issue] = field(default_factory=list)
    examples_with_correct_tech_terms: list[tuple[int, str]] = field(
        default_factory=list
    )
    # A defaultdict(int) so callers can increment unseen terms directly.
    term_statistics: dict = field(default_factory=lambda: defaultdict(int))
def check_for_wrong_expansion(output_text: str, term: TechnicalTerm) -> Optional[str]:
    """Return the first wrong expansion of *term* present in *output_text*.

    Matching is case-insensitive and done on whole words (a trailing plural
    "s"/"es" is tolerated). Plain substring matching is deliberately avoided:
    it reported "iron" for "environment", "bug" for "debug" and "hang" for
    "change", which flagged perfectly good expansions.

    Args:
        output_text: The expansion text to inspect.
        term: The technical term whose ``wrong_expansions`` are searched for.

    Returns:
        The matching entry of ``term.wrong_expansions`` (as stored), or None
        when none of them occurs in the output.
    """
    output_lower = output_text.lower()
    for wrong in term.wrong_expansions:
        needle = wrong.lower().strip()
        if not needle:
            # An empty pattern would match any text.
            continue
        # Lookarounds instead of \b so multi-word entries such as
        # "board game" are handled the same way as single words.
        pattern = rf"(?<!\w){re.escape(needle)}(?:e?s)?(?!\w)"
        if re.search(pattern, output_lower):
            return wrong
    return None
def has_tech_context(input_text: str, term: TechnicalTerm) -> bool:
    """Check whether *input_text* contains a technical-context indicator.

    Indicators are matched case-insensitively as whole words (a trailing
    plural "s"/"es" is tolerated). Plain substring matching is avoided because
    short indicators fired inside unrelated words: "use" in "house", "git" in
    "digital", "def" in "define", "java" in "javascript".

    Indicators that start or end with a non-alphanumeric character (such as
    "@") get no word boundary on that side, so "@decorator" still matches "@".

    Args:
        input_text: The query text to inspect.
        term: The technical term whose ``context_indicators`` are searched for.

    Returns:
        True if at least one indicator occurs in the input, otherwise False.
    """
    input_lower = input_text.lower()
    for indicator in term.context_indicators:
        needle = indicator.lower().strip()
        if not needle:
            # An empty pattern would match any text.
            continue
        pattern = re.escape(needle)
        if needle[0].isalnum():
            pattern = r"(?<!\w)" + pattern
        if needle[-1].isalnum():
            pattern += r"(?:e?s)?(?!\w)"
        if re.search(pattern, input_lower):
            return True
    return False
def is_likely_tech_query(input_text: str) -> bool:
    """Guess whether a query is tech-related from generic keywords.

    Short queries like "gem find" or "yarn spin" are ambiguous on their own;
    this returns True only when the lower-cased input contains, as a whole
    word, one of the generic tooling/programming keywords listed below.
    """
    lowered = input_text.lower()
    keyword_groups = (
        r'\b(install|config|setup|build|run|debug|test|deploy|compile)\b',
        r'\b(api|cli|sdk|lib|pkg|npm|pip|cargo)\b',
        r'\b(func|class|method|var|const|let|def)\b',
        r'\b(http|https|url|port|host|server|client)\b',
        r'\b(json|xml|yaml|csv|sql|html|css|js)\b',
    )
    return any(re.search(group, lowered) for group in keyword_groups)
def has_non_tech_context(input_text: str, term: TechnicalTerm) -> bool:
    """Check whether *input_text* clearly uses *term* in its everyday sense.

    This helps avoid false positives for inputs like "car rust" or
    "yarn spin". Context words are matched case-insensitively as whole words
    (a trailing plural "s"/"es" is tolerated). Plain substring matching is
    avoided because it misclassified technical queries: "car" matched inside
    "cargo" (so "rust cargo build" looked non-technical), "sea" inside
    "search", "shop" inside "shopify".

    Args:
        input_text: The query text to inspect.
        term: The technical term; only ``term.term`` is used, as the lookup
            key into the table below.

    Returns:
        True if a non-technical context word for this term occurs in the
        input; False otherwise, including for terms without a table entry.
    """
    input_lower = input_text.lower()
    term_lower = term.term.lower()
    # Define non-tech context indicators for each ambiguous term
    non_tech_contexts = {
        "rust": ["car", "metal", "iron", "steel", "corrosion", "prevention", "remove", "body"],
        "gem": ["gemstone", "jewelry", "jewel", "diamond", "precious", "stone", "cut", "shop", "buy", "wear"],
        "yarn": ["knit", "crochet", "spin", "wool", "thread", "textile", "fabric", "sew", "weave"],
        "hook": ["fishing", "crochet", "hang", "coat", "wall", "ceiling"],
        "container": ["storage", "plastic", "food", "shipping", "cargo", "kitchen", "box"],
        "branch": ["tree", "bank", "library", "store", "office", "organization"],
        "decorator": ["interior", "home", "room", "house", "design", "party", "cake", "wedding"],
        "bean": ["coffee", "soy", "kidney", "black", "green", "garden", "cooking", "food", "plant", "grow"],
        "shell": ["sea", "beach", "egg", "nut", "turtle", "snail", "crab", "clam", "oyster"],
        "spark": ["plug", "fire", "ignite", "car", "engine", "electric", "romance"],
        "go": ["travel", "vacation", "trip", "walk", "run", "leave", "visit", "tour"],
        "swift": ["taylor", "concert", "music", "singer", "speed", "fast", "bird"],
        "pod": ["pea", "whale", "orca", "dolphin", "vegetable", "seed", "plant"],
        "ant": ["insect", "colony", "fire", "carpenter", "pest", "bug", "picnic"],
        "node": ["lymph", "medical", "body", "tree", "network point"],
        "rails": ["train", "railroad", "railway", "track", "transit", "fence"],
        "flask": ["lab", "chemistry", "drink", "hip", "thermos", "bottle", "water"],
        "django": ["jazz", "music", "reinhardt", "guitar", "movie", "western", "unchained"],
        "maven": ["expert", "connoisseur", "specialist", "guru"],
        "gradle": ["grade", "school", "slope"],
        "kafka": ["franz", "author", "novel", "metamorphosis", "literature", "writer", "book"],
        "elastic": ["band", "rubber", "stretch", "flexible", "waist", "fabric"],
    }
    for context_word in non_tech_contexts.get(term_lower, ()):
        # Lookarounds instead of \b so multi-word entries such as
        # "network point" are handled the same way as single words.
        pattern = rf"(?<!\w){re.escape(context_word.lower())}(?:e?s)?(?!\w)"
        if re.search(pattern, input_lower):
            return True
    return False
def analyze_example(line_num: int, input_text: str, output_text: str) -> list[Issue]:
    """Analyze a single example for misread technical terms.

    Every entry of KNOWN_TECHNICAL_TERMS is considered. An entry is reported
    when all of the following hold:

    * the term occurs in the input as a whole word (a plural "s"/"es" is
      tolerated) -- substring matching is deliberately not used, because "go"
      matched "google", "gem" matched "management" and "ant" matched
      "constant";
    * the output contains one of the term's wrong expansions;
    * the input does not clearly indicate a non-technical context.

    The issue type then depends on the input:

    * "wrong_tech_expansion", with a suggested replacement output, when the
      input shows a technical context for the term or looks like a generic
      tech query;
    * "ambiguous_term", with no suggested fix, when the input has at most two
      whitespace-separated words and one of them is exactly the term
      (e.g. "gem find");
    * otherwise nothing is reported for that term.

    Args:
        line_num: Line number of the example in the dataset file.
        input_text: The example's input text.
        output_text: The example's output text.

    Returns:
        The issues found; empty when the example looks fine.
    """
    issues = []
    input_lower = input_text.lower()
    words = input_lower.split()
    # The stored copy of the output is capped at 200 characters plus "...".
    output_preview = output_text[:200] + "..." if len(output_text) > 200 else output_text

    for term in KNOWN_TECHNICAL_TERMS:
        term_lower = term.term.lower()

        # Check if the input contains this technical term as a whole word.
        term_pattern = rf"(?<!\w){re.escape(term_lower)}(?:e?s)?(?!\w)"
        if not re.search(term_pattern, input_lower):
            continue

        # Check if output has wrong expansion
        wrong_expansion = check_for_wrong_expansion(output_text, term)
        if wrong_expansion is None:
            continue

        # Skip if the context clearly indicates non-technical usage
        if has_non_tech_context(input_text, term):
            continue

        if has_tech_context(input_text, term) or is_likely_tech_query(input_text):
            # Definite tech context: build a replacement output from the
            # term's canned expansions, repeating the first entry when only
            # one is available.
            lex_first = term.correct_lex[0]
            lex_second = term.correct_lex[1] if len(term.correct_lex) > 1 else lex_first
            vec_first = term.correct_vec[0]
            vec_second = term.correct_vec[1] if len(term.correct_vec) > 1 else vec_first
            suggested_output = (
                f"lex: {lex_first}\n"
                f"lex: {lex_second}\n"
                f"vec: {vec_first}\n"
                f"vec: {vec_second}\n"
                f"hyde: {term.correct_domain} is a concept that provides functionality for software development."
            )
            issue_type = "wrong_tech_expansion"
        elif len(words) <= 2 and term_lower in words:
            # Very short query with the term as a primary word - truly
            # ambiguous, so it is flagged for review without a fix.
            suggested_output = None
            issue_type = "ambiguous_term"
        else:
            continue

        issues.append(
            Issue(
                line_number=line_num,
                input_text=input_text,
                output_text=output_preview,
                issue_type=issue_type,
                technical_term=term.term,
                wrong_expansion_found=wrong_expansion,
                suggested_fix=suggested_output,
            )
        )
    return issues
def analyze_dataset(filepath: Path) -> AnalysisResult:
    """Analyze the entire JSONL dataset for issues.

    The file is read line by line; blank lines are ignored. Lines that cannot
    be analyzed are reported with a printed warning and skipped instead of
    aborting the run:

    * lines that are not valid JSON;
    * lines whose JSON value is not an object (not counted as examples);
    * objects whose 'input' or 'output' is not a string (counted as examples
      but not analyzed).

    Term statistics count an example for a term when the term occurs in the
    input as a whole word (a plural "s"/"es" is tolerated), so that "go" is
    not counted for "google" or "ant" for "constant".

    Args:
        filepath: Path of the JSONL dataset to read (UTF-8).

    Returns:
        An AnalysisResult with the example count, all issues found and the
        per-term occurrence counts.
    """
    result = AnalysisResult()

    # Compile the whole-word pattern of every known term once, up front.
    term_patterns = [
        (term.term, re.compile(rf"(?<!\w){re.escape(term.term.lower())}(?:e?s)?(?!\w)"))
        for term in KNOWN_TECHNICAL_TERMS
    ]

    with open(filepath, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue

            # Only the parse itself can raise JSONDecodeError.
            try:
                example = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Warning: Could not parse line {line_num}: {e}")
                continue

            if not isinstance(example, dict):
                print(
                    f"Warning: Skipping line {line_num}: expected a JSON object, "
                    f"got {type(example).__name__}"
                )
                continue

            input_text = example.get('input', '')
            output_text = example.get('output', '')
            result.total_examples += 1

            if not isinstance(input_text, str) or not isinstance(output_text, str):
                print(f"Warning: Skipping line {line_num}: 'input' and 'output' must be strings")
                continue

            # Analyze for issues
            result.issues_found.extend(analyze_example(line_num, input_text, output_text))

            # Track term statistics
            input_lower = input_text.lower()
            for name, pattern in term_patterns:
                if pattern.search(input_lower):
                    result.term_statistics[name] += 1
    return result
def fix_example(example: dict, issues: list[Issue]) -> Optional[dict]:
    """Build a corrected copy of *example* from its issues.

    Only issues of type "wrong_tech_expansion" that carry a suggested fix are
    eligible; the first such issue decides the fix. The input dict is left
    untouched.

    Returns:
        A copy of *example* whose 'output' is the suggested fix, annotated
        with '_fixed', '_original_output' and '_fix_reason'; or None when no
        eligible issue exists.
    """
    chosen = next(
        (
            candidate
            for candidate in issues
            if candidate.issue_type == "wrong_tech_expansion" and candidate.suggested_fix
        ),
        None,
    )
    if chosen is None:
        return None

    patched = example.copy()
    # update() keeps 'output' at its existing position and appends the
    # bookkeeping keys after it, in this order.
    patched.update({
        'output': chosen.suggested_fix,
        '_fixed': True,
        '_original_output': example['output'],
        '_fix_reason': f"Technical term '{chosen.technical_term}' was incorrectly expanded as '{chosen.wrong_expansion_found}'",
    })
    return patched
def generate_report(result: AnalysisResult) -> str:
    """Render the analysis result as a plain-text report.

    The report has a header with overall counts, the issues grouped by type
    (at most 10 shown per type, with a note about any remainder), and the
    per-term occurrence counts sorted from most to least frequent (terms with
    a zero count are omitted).
    """
    heavy_rule = "=" * 70
    light_rule = "-" * 70

    # Group issues by type, keeping first-seen order of the types.
    grouped = defaultdict(list)
    for found in result.issues_found:
        grouped[found.issue_type].append(found)

    report = [
        heavy_rule,
        "QUERY EXPANSION DATASET QUALITY REPORT",
        heavy_rule,
        "",
        f"Total examples analyzed: {result.total_examples}",
        f"Issues found: {len(result.issues_found)}",
        "",
        light_rule,
        "ISSUES BY TYPE:",
        light_rule,
    ]

    for kind, members in grouped.items():
        report.append(f"\n{kind.upper()}: {len(members)} issues")
        report.append("-" * 40)
        # Show up to 10 examples per type
        for found in members[:10]:
            report.extend([
                f"\n Line {found.line_number}:",
                f" Input: {found.input_text}",
                f" Technical term: '{found.technical_term}'",
                f" Wrong expansion found: '{found.wrong_expansion_found}'",
            ])
            if found.suggested_fix:
                report.append(" Suggested fix available: Yes")
        overflow = len(members) - 10
        if overflow > 0:
            report.append(f"\n ... and {overflow} more")

    # Term statistics
    report += [
        "\n" + light_rule,
        "TECHNICAL TERM OCCURRENCES IN DATASET:",
        light_rule,
    ]
    ranked = sorted(result.term_statistics.items(), key=lambda pair: -pair[1])
    report.extend(f" {name}: {hits} occurrences" for name, hits in ranked if hits > 0)

    report.append("\n" + heavy_rule)
    return "\n".join(report)
def save_cleaned_dataset(filepath: Path, output_path: Path, result: AnalysisResult):
    """Write a cleaned copy of the dataset to *output_path*.

    Each non-blank input line is handled as follows:

    * not valid JSON: written back as-is (stripped);
    * no recorded issue for the line: re-serialized unchanged;
    * fix_example() yields a fix: the fixed example is written;
    * otherwise: the example is written with '_flagged' and '_flag_reason'.

    Returns:
        A ``(fixed_count, flagged_count)`` tuple.
    """
    by_line = defaultdict(list)
    for found in result.issues_found:
        by_line[found.line_number].append(found)

    n_fixed = 0
    n_flagged = 0
    with open(filepath, 'r', encoding='utf-8') as src:
        with open(output_path, 'w', encoding='utf-8') as dst:
            for number, raw in enumerate(src, 1):
                text = raw.strip()
                if not text:
                    continue

                try:
                    record = json.loads(text)
                except json.JSONDecodeError:
                    # Keep problematic lines as-is
                    dst.write(text + '\n')
                    continue

                # .get() so the lookup does not create entries in the defaultdict.
                line_issues = by_line.get(number)
                if not line_issues:
                    dst.write(json.dumps(record) + '\n')
                    continue

                repaired = fix_example(record, line_issues)
                if repaired:
                    dst.write(json.dumps(repaired) + '\n')
                    n_fixed += 1
                    continue

                # Flag but don't fix ambiguous cases
                record['_flagged'] = True
                record['_flag_reason'] = f"Ambiguous term '{line_issues[0].technical_term}' may need review"
                dst.write(json.dumps(record) + '\n')
                n_flagged += 1
    return n_fixed, n_flagged
def main():
    """Run the full review: analyze, report, and write the cleaned dataset.

    Input and output files live in the "data" directory next to this script.

    Returns:
        1 if the input dataset does not exist, otherwise 0.
    """
    # Paths
    data_dir = Path(__file__).parent / "data"
    input_path = data_dir / "qmd_expansion.jsonl"
    output_path = data_dir / "qmd_expansion_cleaned.jsonl"
    report_path = data_dir / "quality_report.txt"

    print(f"Analyzing dataset: {input_path}")
    print("-" * 50)
    if not input_path.exists():
        print(f"Error: Input file not found: {input_path}")
        return 1

    # Analyze the dataset, then show and persist the report
    result = analyze_dataset(input_path)
    report = generate_report(result)
    print(report)
    report_path.write_text(report, encoding='utf-8')
    print(f"\nReport saved to: {report_path}")

    # Save cleaned dataset
    fixed_count, flagged_count = save_cleaned_dataset(input_path, output_path, result)
    untouched = result.total_examples - fixed_count - flagged_count
    print(f"\nCleaned dataset saved to: {output_path}")
    print(f" - Examples fixed: {fixed_count}")
    print(f" - Examples flagged for review: {flagged_count}")
    print(f" - Examples unchanged: {untouched}")

    # Summary statistics
    issue_total = len(result.issues_found)
    definite = sum(1 for found in result.issues_found if found.issue_type == "wrong_tech_expansion")
    ambiguous = sum(1 for found in result.issues_found if found.issue_type == "ambiguous_term")
    print("\n" + "=" * 50)
    print("SUMMARY")
    print("=" * 50)
    print(f"Total examples: {result.total_examples}")
    print(f"Total issues found: {issue_total}")
    print(f" - Definite tech term errors: {definite}")
    print(f" - Ambiguous terms needing review: {ambiguous}")
    if issue_total:
        # Issues only come from counted examples, so total_examples is non-zero here.
        error_rate = issue_total / result.total_examples * 100
        print(f"\nError rate: {error_rate:.2f}%")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status. SystemExit
    # is raised directly because the bare exit() helper is injected by the
    # `site` module for interactive use and is missing when Python runs
    # without it (e.g. `python -S`).
    raise SystemExit(main())
|