From a1a58ec124ce279ceddae010625bee1bae68aeb7 Mon Sep 17 00:00:00 2001 From: Rodney Kinney Date: Wed, 18 Oct 2023 16:55:55 -0700 Subject: [PATCH] Text Modification Config --- configs/c4-replication/mixer.yaml | 21 ++-- docs/examples/wikipedia-mixer.yaml | 9 +- docs/getting-started.md | 20 ++-- docs/mixer.md | 9 +- python/dolma/cli/mixer.py | 43 +++++-- src/shard.rs | 180 +++++++++++++++++------------ tests/config/c4-cleaned.json | 26 +++-- tests/config/email-spans.json | 26 +++-- tests/config/paragraph-spans.json | 16 +-- 9 files changed, 207 insertions(+), 143 deletions(-) diff --git a/configs/c4-replication/mixer.yaml b/configs/c4-replication/mixer.yaml index 908b8148..8eabb027 100644 --- a/configs/c4-replication/mixer.yaml +++ b/configs/c4-replication/mixer.yaml @@ -34,15 +34,16 @@ streams: # exclude documents that contain one or more naughty words - $.attributes[?(@.c4_v2__c4_v2__has_naughty_word && @.c4_v2__c4_v2__has_naughty_word[0] && @.c4_v2__c4_v2__has_naughty_word[0][2] > 0.5)] - span_replacement: - # remove lines that do not end in punctuation - - span: $.attributes.c4_v2__c4_v2__lines_with_no_ending_punctuation - min_score: 0.5 - replacement: "" - - # remove lines that are too short (less than 3 words as defined by C4 rules) - - span: $.attributes.c4_v2__c4_v2__lines_with_too_few_words - min_score: 0.5 - replacement: "" + text_modification: + span_replacement: + # remove lines that do not end in punctuation + - span: $.attributes.c4_v2__c4_v2__lines_with_no_ending_punctuation + min_score: 0.5 + replacement: "" + + # remove lines that are too short (less than 3 words as defined by C4 rules) + - span: $.attributes.c4_v2__c4_v2__lines_with_too_few_words + min_score: 0.5 + replacement: "" processes: 8 diff --git a/docs/examples/wikipedia-mixer.yaml b/docs/examples/wikipedia-mixer.yaml index bbd692ee..15ee0bcb 100644 --- a/docs/examples/wikipedia-mixer.yaml +++ b/docs/examples/wikipedia-mixer.yaml @@ -17,9 +17,10 @@ streams: -
"$.attributes[?(@.exp__ft_lang_id_en_paragraph_with_doc_score_v2__doc_en[0][2] <= 0.5)]" - "$@.attributes[?(@.bff_duplicate_paragraph_spans && @.bff_duplicate_paragraph_spans[0] && @.bff_duplicate_paragraph_spans[0][2] >= 1.0)]" - span_replacement: - - span: "$.attributes.exp__cld2_en_paragraph_with_doc_score_v2__not_en" - min_score: 0.1 - replacement: '' + text_modification: + span_replacement: + - span: "$.attributes.exp__cld2_en_paragraph_with_doc_score_v2__not_en" + min_score: 0.1 + replacement: '' processes: 1 diff --git a/docs/getting-started.md b/docs/getting-started.md index 3bfbe1c4..f42a02ac 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -157,15 +157,17 @@ Further, we override the number of processes to use to 96 using the `--processes "$@.attributes[?(@.bff_duplicate_paragraph_spans && @.bff_duplicate_paragraph_spans[0] && @.bff_duplicate_paragraph_spans[0][2] >= 1.0)]" ] }, - # span replacement allows you to replace spans of text with a different string - "span_replacement": [ - { - # remove paragraphs whose not-English cld2 socre is below 0.9 in a document - "span": "$.attributes.exp__cld2_en_paragraph_with_doc_score_v2__not_en", - "min_score": 0.1, - "replacement": "" - } - ] + "text_modification": { + # span replacement allows you to replace spans of text with a different string + "span_replacement": [ + { + # remove paragraphs whose not-English cld2 score is below 0.9 in a document + "span": "$.attributes.exp__cld2_en_paragraph_with_doc_score_v2__not_en", + "min_score": 0.1, + "replacement": "" + } + ] + } } ], # this process option is overridden by the command line flag diff --git a/docs/mixer.md b/docs/mixer.md index afa49fd6..9d105074 100644 --- a/docs/mixer.md +++ b/docs/mixer.md @@ -22,10 +22,11 @@ The following parameters are supported either via CLI (e.g. `dolma mix --paramet |`streams[].output.discard_fields`|No| Top-level fields in the `discard_fields` list will be dropped from the output documents.
| |`streams[].filter.include`|No| Optional content-based filtering. Default = keep everything. Documents are retained if they match any of the `include` patterns (or if no `include` patterns are specified) AND if they match none of the `exclude` patterns. Pattern syntax is [jsonpath](https://support.smartbear.com/alertsite/docs/monitors/api/endpoint/jsonpath.html#filters). | |`streams[].filter.exclude`|No| Optional content-based filtering. Default = keep everything. Documents are retained if they match any of the `include` patterns (or if no `include` patterns are specified) AND if they match none of the `exclude` patterns. Pattern syntax is [jsonpath](https://support.smartbear.com/alertsite/docs/monitors/api/endpoint/jsonpath.html#filters). | -|`streams[].span_replacement`|No| A list of objects specifying spans of text to be replaced. | -|`streams[].span_replacement[].span`|No| A json-path expression for an attribute that contains an array of spans. Each span should be list of length three: `[start, end, score]`. | -|`streams[].span_replacement[].min_score`|No| If the span score is less than this value, the span will not be replaced. | -|`streams[].span_replacement[].replacement`|No| The text that should be inserted in place of the span. Use `{}` to represent the original text. | +|`streams[].text_modification.trim_whitespace`|No| Remove leading and trailing whitespace from the document text (applied after span replacement). | +|`streams[].text_modification.minimum_text_length`|No| Skip writing the document if the final text is shorter than this size (in bytes). | +|`streams[].text_modification.span_replacement[].span`|No| A json-path expression for an attribute that contains an array of spans. Each span should be a list of length three: `[start, end, score]`. | +|`streams[].text_modification.span_replacement[].min_score`|No| If the span score is less than this value, the span will not be replaced.
| +|`streams[].text_modification.span_replacement[].replacement`|No| The text that should be inserted in place of the span. Use `{}` to represent the original text. | |`work_dir.input`|No| Path to a local scratch directory where temporary input files can be placed. If not provided, Dolma will make one for you and delete it upon completion. | |`work_dir.output`|No| Path to a local scratch directory where temporary output files can be placed. If not provided, Dolma will make one for you and delete it upon completion. | |`processes`|No| Number of processes to use for mixing. By default 1 process is used. | diff --git a/python/dolma/cli/mixer.py b/python/dolma/cli/mixer.py index 814c7a49..ba77f8af 100644 --- a/python/dolma/cli/mixer.py +++ b/python/dolma/cli/mixer.py @@ -31,6 +31,17 @@ class SpanReplacementConfig: replacement: str = field(default="", help="Replacement for the span") +@dataclass +class TextModificationConfig: + span_replacement: List[SpanReplacementConfig] = field(default=[], help="Configuration for replacing spans.") + trim_whitespace: bool = field( + default=False, help="If true, trim leading and trailing whitespace from text (after span replacement)" + ) + minimum_text_length: int = field( + default=0, help="Skip writing the document if the final text is shorter than this size (in bytes)" + ) + + @dataclass class StreamConfig: name: str = field(help="Name of the stream. Required.") @@ -42,7 +53,9 @@ class StreamConfig: filter: Optional[FilterConfig] = field( # pyright: ignore default=None, help="Configuration for filtering documents." 
) - span_replacement: List[SpanReplacementConfig] = field(default=[], help="Configuration for replacing spans.") + text_modification: Optional[TextModificationConfig] = field( + default=None, help="Configuration for modifying the document text" + ) @dataclass @@ -83,17 +96,23 @@ def run(cls, parsed_config: MixerConfig): "exclude": [str(i) for i in stream_config.filter.exclude], } - for span_replacement in stream_config.span_replacement: - stream_config_dict.setdefault("span_replacement", []).append( - { - "span": str(span_replacement.span), - "min_score": float(span_replacement.min_score), - "replacement": str(span_replacement.replacement), - } - ) - - if "span_replacement" not in stream_config_dict and "filter" not in stream_config_dict: - raise DolmaConfigError("Either `filter` or `span_replacement` must be specified") + if stream_config.text_modification is not None: + text_modification_dict = { + "trim_whitespace": stream_config.text_modification.trim_whitespace, + "minimum_text_length": stream_config.text_modification.minimum_text_length, + } + stream_config_dict["text_modification"] = text_modification_dict + for span_replacement in stream_config.text_modification.span_replacement: + text_modification_dict.setdefault("span_replacement", []).append( + { + "span": str(span_replacement.span), + "min_score": float(span_replacement.min_score), + "replacement": str(span_replacement.replacement), + } + ) + + if "text_modification" not in stream_config_dict and "filter" not in stream_config_dict: + raise DolmaConfigError("Either `filter` or `text_modification` must be specified") # perform some path validation to make sure we don't call the mixer with invalid config total_matching_documents = 0 diff --git a/src/shard.rs b/src/shard.rs index 4bff4f4d..fe1cb323 100644 --- a/src/shard.rs +++ b/src/shard.rs @@ -21,7 +21,7 @@ pub struct Shard { pub inputs: Vec, pub output: String, pub filter: Option, - pub span_replacements: Option>, + pub text_modification: Option, pub 
discard_fields: Option>, } @@ -82,7 +82,7 @@ impl Shard { inputs: shard_inputs.clone(), output: output.clone(), filter: stream_config.filter.clone(), - span_replacements: stream_config.span_replacement.clone(), + text_modification: stream_config.text_modification.clone(), discard_fields: stream_config.output.discard_fields.clone(), }; shards.push(shard); @@ -101,7 +101,7 @@ impl Shard { inputs: shard_inputs.clone(), output: output.clone(), filter: stream_config.filter.clone(), - span_replacements: stream_config.span_replacement.clone(), + text_modification: stream_config.text_modification.clone(), discard_fields: stream_config.output.discard_fields.clone(), }; shards.push(shard); @@ -271,91 +271,115 @@ impl Shard { } } if should_write { - if self.span_replacements.is_some() { - let mut replacements = self - .span_replacements - .as_ref() - .unwrap() - .iter() - .flat_map(|r| r.find_spans_to_replace(&data).unwrap()) - .collect::>(); - if !replacements.is_empty() { - replacements.sort_by(|a, b| a.start.cmp(&b.start)); - - let mut new_text = String::new(); + self.text_modification + .as_ref() + .map(|text_modification_config| { let old_text = data["text"].as_str().unwrap().to_owned(); - let mut span_index = 0; - let mut i = 0; - let mut span_start_byte_index = 0; - let mut chars = old_text.char_indices(); - let mut byte_index_with_char = chars.next(); - while byte_index_with_char.is_some() { - let (byte_index, c) = byte_index_with_char.unwrap(); - if span_index < replacements.len() { - let is_inside_span = i >= replacements[span_index].start - && i < replacements[span_index].end; - if i == replacements[span_index].start { - span_start_byte_index = byte_index; - } - if !is_inside_span { - if i == replacements[span_index].end { - if !replacements[span_index].replacement.is_empty() - { - let replacement_text = replacements[span_index] - .replacement - .to_owned() - .replace( - "{}", - old_text - [span_start_byte_index..byte_index] - .to_owned() - .as_str(), - ); - 
new_text.push_str(&replacement_text); + let mut new_text = String::new(); + if text_modification_config.span_replacement.is_some() { + let mut replacements = text_modification_config + .span_replacement + .as_ref() + .unwrap() + .iter() + .flat_map(|r| r.find_spans_to_replace(&data).unwrap()) + .collect::>(); + if !replacements.is_empty() { + replacements.sort_by(|a, b| a.start.cmp(&b.start)); + + let mut span_index = 0; + let mut i = 0; + let mut span_start_byte_index = 0; + let mut chars = old_text.char_indices(); + let mut byte_index_with_char = chars.next(); + while byte_index_with_char.is_some() { + let (byte_index, c) = byte_index_with_char.unwrap(); + if span_index < replacements.len() { + let is_inside_span = i + >= replacements[span_index].start + && i < replacements[span_index].end; + if i == replacements[span_index].start { + span_start_byte_index = byte_index; } - while span_index < replacements.len() - && replacements[span_index].start < i - { - span_index += 1; + if !is_inside_span { + if i == replacements[span_index].end { + if !replacements[span_index] + .replacement + .is_empty() + { + let replacement_text = replacements + [span_index] + .replacement + .to_owned() + .replace( + "{}", + old_text[span_start_byte_index + ..byte_index] + .to_owned() + .as_str(), + ); + new_text.push_str(&replacement_text); + } + while span_index < replacements.len() + && replacements[span_index].start < i + { + span_index += 1; + } + } + if span_index < replacements.len() + && replacements[span_index].start == i + { + span_start_byte_index = byte_index; + } else { + new_text.push(c); + } } - } - if span_index < replacements.len() - && replacements[span_index].start == i - { - span_start_byte_index = byte_index; } else { new_text.push(c); } + i += 1; + byte_index_with_char = chars.next(); + } + if span_index < replacements.len() + && !replacements[span_index].replacement.is_empty() + { + let replacement_text = replacements[span_index] + .replacement + .to_owned() + 
.replace( + "{}", + old_text[span_start_byte_index..] + .to_owned() + .as_str(), + ); + new_text.push_str(&replacement_text); } } else { - new_text.push(c); + new_text = old_text; } - i += 1; - byte_index_with_char = chars.next(); + } else { + new_text = old_text; } - if span_index < replacements.len() - && !replacements[span_index].replacement.is_empty() - { - let replacement_text = - replacements[span_index].replacement.to_owned().replace( - "{}", - old_text[span_start_byte_index..].to_owned().as_str(), - ); - new_text.push_str(&replacement_text); + if text_modification_config.trim_whitespace { + new_text = new_text.trim().to_owned(); } data["text"] = Value::String(new_text); - } - } + }); + for f in self.discard_fields.iter().flatten() { data.as_object_mut().unwrap().remove(f); } - // TODO: add check to make sure that the text field is not empty. Something like - // if !data["text"].as_str().unwrap().is_empty() || skip_empty - // make it configurable and off by default - lines_written += 1; - serde_json::to_writer(&mut writer, &data)?; - writer.write_all(b"\n")?; + let min_text_length = self + .text_modification + .as_ref() + .map_or(0, |c| c.minimum_text_length.max(0)) + as usize; + if data["text"].as_str().unwrap().len() >= min_text_length { + lines_written += 1; + serde_json::to_writer(&mut writer, &data)?; + writer.write_all(b"\n")?; + } } } cache.finalize_input(&input_path.doc_path)?; @@ -397,9 +421,21 @@ pub mod shard_config { pub attributes: Vec, // json-path-based filtering pub filter: Option, + // text modification + pub text_modification: Option, + pub output: StreamOutputConfig, + } + + #[derive(Serialize, Deserialize, Clone)] + pub struct TextModificationConfig { // span replacement pub span_replacement: Option>, - pub output: StreamOutputConfig, + // trim leading/trailing whitespace after span replacement; false when omitted from the config + #[serde(default)] + pub trim_whitespace: bool, + // skip documents whose final text is shorter than this many bytes; 0 (keep everything) when omitted + #[serde(default)] + pub minimum_text_length: i32, } #[derive(Serialize, Deserialize, Clone)] diff --git
a/tests/config/c4-cleaned.json b/tests/config/c4-cleaned.json index 7a6fa6b8..e3434234 100644 --- a/tests/config/c4-cleaned.json +++ b/tests/config/c4-cleaned.json @@ -12,18 +12,20 @@ "attributes": [ "c4_rules" ], - "span_replacement": [ - { - "span": "$.attributes.c4_rules__c4_v1__lines_with_no_ending_punctuation", - "min_score": 0.5, - "replacement": "" - }, - { - "span": "$.attributes.c4_rules__c4_v1__lines_with_too_few_words", - "min_score": 0.5, - "replacement": "" - } - ] + "text_modification": { + "span_replacement": [ + { + "span": "$.attributes.c4_rules__c4_v1__lines_with_no_ending_punctuation", + "min_score": 0.5, + "replacement": "" + }, + { + "span": "$.attributes.c4_rules__c4_v1__lines_with_too_few_words", + "min_score": 0.5, + "replacement": "" + } + ] + } } ], "work_dir": { diff --git a/tests/config/email-spans.json b/tests/config/email-spans.json index ccd04292..e304b660 100644 --- a/tests/config/email-spans.json +++ b/tests/config/email-spans.json @@ -12,18 +12,20 @@ "attributes": [ "pii" ], - "span_replacement": [ - { - "span": "$.attributes.pii.email", - "min_score": 0.5, - "replacement": "[B-EMAIL]{}[E-EMAIL]" - }, - { - "span": "$.attributes.pii.company_name", - "min_score": 0.5, - "replacement": "" - } - ] + "text_modification": { + "span_replacement": [ + { + "span": "$.attributes.pii.email", + "min_score": 0.5, + "replacement": "[B-EMAIL]{}[E-EMAIL]" + }, + { + "span": "$.attributes.pii.company_name", + "min_score": 0.5, + "replacement": "" + } + ] + } } ], "work_dir": { diff --git a/tests/config/paragraph-spans.json b/tests/config/paragraph-spans.json index 9f25e2da..99579c02 100644 --- a/tests/config/paragraph-spans.json +++ b/tests/config/paragraph-spans.json @@ -12,13 +12,15 @@ "attributes": [ "duplicate_paragraphs" ], - "span_replacement": [ - { - "span": "$.attributes.bff_duplicate_paragraph_spans", - "min_score": 0.5, - "replacement": "" - } - ] + "text_modification": { + "span_replacement": [ + { + "span": 
"$.attributes.bff_duplicate_paragraph_spans", + "min_score": 0.5, + "replacement": "" + } + ] + } } ], "work_dir": {