-
Notifications
You must be signed in to change notification settings - Fork 158
/
sentence_deduplication.py
69 lines (54 loc) · 2.66 KB
/
sentence_deduplication.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from datatrove.executor.base import PipelineExecutor
from datatrove.executor.local import LocalPipelineExecutor
from datatrove.pipeline.dedup import SentenceDedupFilter, SentenceDedupSignature, SentenceFindDedups
from datatrove.pipeline.dedup.sentence_dedup import SentDedupConfig
from datatrove.pipeline.extractors import Trafilatura
from datatrove.pipeline.filters import GopherQualityFilter, LanguageFilter
from datatrove.pipeline.readers import JsonlReader, WarcReader
from datatrove.pipeline.writers.jsonl import JsonlWriter
from datatrove.utils.typeshelper import Languages
"""
example on how to use sentence-deduplication. sentence-deduplication implements deduplication as in:
https://jmlr.org/papers/v21/20-074.html
'To deduplicate the data set, we discarded all but one of any three-sentence span
occurring more than once in the data set.'
to run deduplication we need to run three different pipelines,
pipeline 1:
implements usual extraction + quality filtering; it ends with SentenceDedupSignature, preceded by a writer.
pipeline 2:
implements only SentenceFindDedups
pipeline 3:
implements SentenceDedupFilter, preceded by a reader of the same kind as the writer used during stage 1.
A final writer placed after the SentenceDedupFilter saves the filtered output.
"""
# Number of tasks used by the duplicate-finding stage (step 2); raising it
# speeds up/parallelizes that step. The signature stage is given the same value.
FINDER_WORKERS = 10

# Sentence-dedup hyper-parameters, shared by all three stages. Tune them here.
sent_dedup_config = SentDedupConfig(
    # how text is cut into units and how many units form one span
    split_sentences=True,  # set to False to split on \n instead
    n_sentences=3,
    # remaining knobs (see SentDedupConfig for their exact meaning)
    min_doc_words=50,
    only_dedup_in_index=True,
)
def run_example():
    """Run the three sentence-deduplication stages locally, one after another.

    Stage 1 reads WARC files from ``warc/``, extracts and filters the text,
    saves the surviving documents to ``intermediate/`` and then computes
    sentence signatures into ``c4/sigs``.
    Stage 2 runs SentenceFindDedups over ``c4/sigs`` and writes to ``c4/dups``.
    Stage 3 re-reads ``intermediate/``, applies SentenceDedupFilter using
    ``c4/dups`` and writes the result to ``c4/final_output``.

    All three executors are built before any of them is started; the value
    returned by each ``run()`` call is printed.
    """
    # Stage 1: extraction + quality filtering, then signatures.
    signature_stage = [
        WarcReader(data_folder="warc/", limit=1000),
        Trafilatura(),
        GopherQualityFilter(min_stop_words=0),
        LanguageFilter(language_threshold=0.5, languages=[Languages.english]),
        JsonlWriter("intermediate/"),
        SentenceDedupSignature(
            output_folder="c4/sigs",
            config=sent_dedup_config,
            finder_workers=FINDER_WORKERS,
        ),
    ]

    # Stage 2: only the duplicate finder.
    finder_stage = [
        SentenceFindDedups(
            data_folder="c4/sigs",
            output_folder="c4/dups",
            config=sent_dedup_config,
        )
    ]

    # Stage 3: read back what stage 1 wrote, filter, and save the final output.
    filter_stage = [
        JsonlReader(data_folder="intermediate/"),
        SentenceDedupFilter(data_folder="c4/dups", config=sent_dedup_config),
        JsonlWriter("c4/final_output"),  # save the final filtered output to disk
    ]

    stage_executors: list[PipelineExecutor] = [
        LocalPipelineExecutor(pipeline=signature_stage, workers=4, tasks=4),
        LocalPipelineExecutor(pipeline=finder_stage, workers=1, tasks=FINDER_WORKERS),
        LocalPipelineExecutor(pipeline=filter_stage, workers=4, tasks=4),
    ]

    # Stages depend on each other's output, so they run strictly in order.
    for stage_executor in stage_executors:
        print(stage_executor.run())
# Only launch the example when this file is executed as a script, not on import.
if __name__ == "__main__":
    run_example()