Skip to content

Commit

Permalink
runtime: Move Dekaf invocations into Runtime::unary_materialize()
Browse files Browse the repository at this point in the history
In order for field selection to work in the UI, it needs to be able to validate a draft with a Dekaf connector in it. This detects when the request is for a Dekaf materialization and re-routes it to Dekaf's `unary_materialize`. It also means that the `connector_tags` job can work for Dekaf with a small change to detect image names starting in `ghcr.io/estuary/dekaf-*` and mark them as `ConnectorType::Dekaf`.
  • Loading branch information
jshearer committed Jan 3, 2025
1 parent 466a716 commit e470818
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 73 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 21 additions & 62 deletions crates/agent/src/connector_tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,26 +137,20 @@ impl TagHandler {
let log_handler =
logs::ops_handler(self.logs_tx.clone(), "spec".to_string(), row.logs_token);

let spec_result = if row.image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX) {
spec_dekaf(&image_composed).await
} else {
let runtime = Runtime::new(
self.allow_local,
self.connector_network.clone(),
log_handler,
None, // no need to change log level
"ops/connector-tags-job".to_string(),
);

match proto_type {
RuntimeProtocol::Capture => spec_capture(&image_composed, runtime).await,
RuntimeProtocol::Materialize => {
spec_materialization(&image_composed, runtime).await
}
RuntimeProtocol::Derive => {
tracing::warn!(image = %image_composed, "unhandled Spec RPC for derivation connector image");
return Ok((row.tag_id, JobStatus::SpecFailed));
}
let runtime = Runtime::new(
self.allow_local,
self.connector_network.clone(),
log_handler,
None, // no need to change log level
"ops/connector-tags-job".to_string(),
);

let spec_result = match proto_type {
RuntimeProtocol::Capture => spec_capture(&image_composed, runtime).await,
RuntimeProtocol::Materialize => spec_materialization(&image_composed, runtime).await,
RuntimeProtocol::Derive => {
tracing::warn!(image = %image_composed, "unhandled Spec RPC for derivation connector image");
return Ok((row.tag_id, JobStatus::SpecFailed));
}
};

Expand Down Expand Up @@ -235,9 +229,15 @@ async fn spec_materialization(
) -> anyhow::Result<ConnectorSpec> {
use proto_flow::materialize;

let connector_type = if image.starts_with(DEKAF_IMAGE_NAME_PREFIX) {
flow::materialization_spec::ConnectorType::Dekaf as i32
} else {
flow::materialization_spec::ConnectorType::Image as i32
};

let req = materialize::Request {
spec: Some(materialize::request::Spec {
connector_type: flow::materialization_spec::ConnectorType::Image as i32,
connector_type,
config_json: serde_json::to_string(&serde_json::json!({"image": image, "config":{}}))
.unwrap(),
}),
Expand Down Expand Up @@ -277,47 +277,6 @@ async fn spec_materialization(
})
}

async fn spec_dekaf(image: &str) -> anyhow::Result<ConnectorSpec> {
use proto_flow::materialize;

let req = materialize::Request {
spec: Some(materialize::request::Spec {
connector_type: flow::materialization_spec::ConnectorType::Image as i32,
config_json: serde_json::to_string(&serde_json::json!({"image": image, "config":{}}))
.unwrap(),
}),
..Default::default()
};

let spec = dekaf::connector::unary_materialize(req)
.await?
.spec
.ok_or_else(|| anyhow::anyhow!("connector didn't send expected Spec response"))?;

let materialize::response::Spec {
protocol: _,
config_schema_json,
resource_config_schema_json,
documentation_url,
oauth2,
} = spec;

let oauth = if let Some(oa) = oauth2 {
Some(serde_json::value::to_raw_value(&oa).expect("serializing oauth2 config"))
} else {
None
};
Ok(ConnectorSpec {
documentation_url,
endpoint_config_schema: RawValue::from_string(config_schema_json)
.context("parsing endpoint config schema")?,
resource_config_schema: RawValue::from_string(resource_config_schema_json)
.context("parsing resource config schema")?,
resource_path_pointers: Vec::new(),
oauth2: oauth,
})
}

async fn spec_capture(
image: &str,
runtime: Runtime<impl LogHandler>,
Expand Down
9 changes: 1 addition & 8 deletions crates/build/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,14 +376,7 @@ impl<L: runtime::LogHandler> validation::Connectors for RuntimeConnectors<L> {
request: materialize::Request,
_data_plane: &'a tables::DataPlane,
) -> BoxFuture<'a, anyhow::Result<materialize::Response>> {
match flow::materialization_spec::ConnectorType::try_from(
request.validate.as_ref().unwrap().connector_type,
) {
Ok(flow::materialization_spec::ConnectorType::Dekaf) => {
dekaf::connector::unary_materialize(request).boxed()
}
_ => self.runtime.clone().unary_materialize(request).boxed(),
}
self.runtime.clone().unary_materialize(request).boxed()
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ assemble = { path = "../assemble" }
async-process = { path = "../async-process" }
connector-init = { path = "../connector-init" }
coroutines = { path = "../coroutines" }
dekaf = { path = "../dekaf" }
derive-sqlite = { path = "../derive-sqlite" }
doc = { path = "../doc" }
extractors = { path = "../extractors" }
Expand Down
22 changes: 19 additions & 3 deletions crates/runtime/src/unary.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{LogHandler, Runtime};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use proto_flow::{capture, derive, materialize};
use proto_flow::{capture, derive, flow::materialization_spec, materialize};

impl<L: LogHandler> Runtime<L> {
pub async fn unary_capture(
Expand All @@ -20,8 +20,24 @@ impl<L: LogHandler> Runtime<L> {
self,
request: materialize::Request,
) -> anyhow::Result<materialize::Response> {
let response = self.serve_materialize(unary_in(request)).boxed();
unary_out(response).await
let is_dekaf = request.spec.as_ref().is_some_and(|spec| {
matches!(
spec.connector_type(),
materialization_spec::ConnectorType::Dekaf
)
}) || request.validate.as_ref().is_some_and(|validate| {
matches!(
validate.connector_type(),
materialization_spec::ConnectorType::Dekaf
)
});

if is_dekaf {
dekaf::connector::unary_materialize(request).await
} else {
let unary_resp = self.serve_materialize(unary_in(request)).boxed();
unary_out(unary_resp).await
}
}
}

Expand Down

0 comments on commit e470818

Please sign in to comment.