'''
This script is used to query the retrieval methods. Since the querying step may take a long
time and can be done offline, it is run separately from the execution of the pipeline.

The script reads a configuration file provided by the user. Sample query files for the
different data lakes and settings are provided in the folder `config/retrieval/query`.

Run the script as follows:
```
python query_indices.py path/to/config.toml
```
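
An illustrative configuration is sketched below. The key names match what the script reads
(`data_lake`, `join_discovery_method`, `hybrid`, `iterations`, `query_cases`); the data lake
name, table path, and query column values are placeholders, see the sample files in
`config/retrieval/query` for actual values.
```
data_lake = "example_data_lake"                        # placeholder data lake version
join_discovery_method = ["minhash", "exact_matching"]  # retrieval methods to query
hybrid = false                                         # true queries the hybrid (reranked) minhash index
iterations = 1                                         # number of repeated runs to log

[[query_cases]]
table_path = "path/to/query_table.parquet"             # placeholder path to a query table
query_column = "column_to_join_on"                     # placeholder query column
```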
'''
import argparse
import os
from pathlib import Path

import toml
from tqdm import tqdm

from src.data_structures.loggers import SimpleIndexLogger
from src.data_structures.retrieval_methods import ExactMatchingIndex, StarmieWrapper
from src.utils.indexing import (
    DEFAULT_INDEX_DIR,
    get_metadata_index,
    load_index,
    query_index,
)

PREFIX = {
    "exact_matching": "em_index",
    "starmie": "starmie_index",
}


def parse_args():
    """Parse the path to the configuration file from the CLI."""
    parser = argparse.ArgumentParser()
    parser.add_argument("config_file", action="store", type=argparse.FileType("r"))
    return parser.parse_args()


def prepare_dirtree():
    """Prepare the directory tree to ensure that the required folders are found."""
    os.makedirs("data/metadata/queries", exist_ok=True)
    os.makedirs("results/query_results", exist_ok=True)


if __name__ == "__main__":
    # Read the config file.
    args = parse_args()
    prepare_dirtree()
    config = toml.load(args.config_file)

    # Get the parameters.
    jd_methods = config["join_discovery_method"]
    data_lake_version = config["data_lake"]
    rerank = config.get("hybrid", False)
    # Iterations are used only to re-run the same configuration multiple times,
    # to log multiple runs and reduce the variance in the results.
    iterations = config.get("iterations", 1)
    query_cases = config["query_cases"]

    # Load the metadata index.
    mdata_index = get_metadata_index(data_lake_version)

    for it in tqdm(range(iterations), position=1):
        if "minhash" in jd_methods:
            # If the index is minhash, it is loaded only once, then queried multiple times.
            index_name = "minhash_hybrid" if rerank else "minhash"
            # The SimpleIndexLogger is used to track the time required to query each index.
            logger_minhash = SimpleIndexLogger(
                index_name=index_name,
                step="query",
                data_lake_version=data_lake_version,
                log_path="results/query_logging.txt",
            )
            logger_minhash.start_time("load")
            minhash_index = load_index(
                {"join_discovery_method": "minhash", "data_lake": data_lake_version}
            )
            logger_minhash.end_time("load")
        for query_case in tqdm(query_cases, total=len(query_cases), leave=False):
            tname = Path(query_case["table_path"]).stem
            query_tab_path = Path(query_case["table_path"])
            query_column = query_case["query_column"]
            for jd_method in jd_methods:
                if jd_method == "minhash":
                    # The minhash index was already loaded above, outside the loop.
                    index = minhash_index
                    index_logger = logger_minhash
                elif jd_method == "exact_matching":
                    # The exact matching index must be loaded once per query case.
                    index_logger = SimpleIndexLogger(
                        index_name=jd_method,
                        step="query",
                        data_lake_version=data_lake_version,
                        log_path="results/query_logging.txt",
                    )
                    index_path = Path(
                        DEFAULT_INDEX_DIR,
                        data_lake_version,
                        f"{PREFIX[jd_method]}_{tname}_{query_column}.pickle",
                    )
                    index_logger.start_time("load")
                    if jd_method == "exact_matching":
                        index = ExactMatchingIndex(file_path=index_path)
                    else:
                        index = StarmieWrapper(file_path=index_path)
                    index_logger.end_time("load")
                elif jd_method == "starmie":
                    # Indexing was done by Starmie, so here we only load the query results
                    # and convert them to the format used by the pipeline.
                    index_logger = SimpleIndexLogger(
                        index_name=jd_method,
                        step="query",
                        data_lake_version=data_lake_version,
                        log_path="results/query_logging.txt",
                    )
                    index_path = Path(
                        DEFAULT_INDEX_DIR,
                        data_lake_version,
                        f"{PREFIX[jd_method]}-{tname}.pickle",
                    )
                    index_logger.start_time("load")
                    index = StarmieWrapper(file_path=index_path)
                    index_logger.end_time("load")
                else:
                    raise ValueError(f"Unknown jd_method {jd_method}")
                index_logger.update_query_parameters(tname, query_column)
                # The index has been loaded; now it is queried and the results are saved.
                query_result, index_logger = query_index(
                    index,
                    query_tab_path,
                    query_column,
                    mdata_index,
                    rerank=rerank,
                    index_logger=index_logger,
                )
                # If the index logger is not None, save it to file.
                if index_logger:
                    index_logger.to_logfile()