Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ooesili committed Dec 3, 2024
1 parent 2e47f2a commit 1bbf6fa
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 26 deletions.
32 changes: 21 additions & 11 deletions internal/impl/sftp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ func (s *sftpReader) Connect(ctx context.Context) (err error) {
if s.scanner, err = s.scannerCtor.Create(file, func(ctx context.Context, aErr error) (outErr error) {
_ = s.pathProvider.Ack(ctx, nextPath, aErr)
if aErr != nil {
s.log.Errorf("skipping delete on finish: %s", aErr)
return nil
}
if s.deleteOnFinish {
Expand Down Expand Up @@ -223,7 +222,7 @@ func (s *sftpReader) Connect(ctx context.Context) (err error) {
return
}

func (s *sftpReader) seekNextPath(ctx context.Context) (file *sftp.File, nextPath string, skip bool, err error) {
func (s *sftpReader) initState(ctx context.Context) (client *sftp.Client, pathProvider pathProvider, skip bool, err error) {
s.scannerMut.Lock()
defer s.scannerMut.Unlock()

Expand All @@ -242,11 +241,22 @@ func (s *sftpReader) seekNextPath(ctx context.Context) (file *sftp.File, nextPat
s.pathProvider = s.getFilePathProvider(ctx)
}

return s.client, s.pathProvider, false, nil
}

func (s *sftpReader) seekNextPath(ctx context.Context) (file *sftp.File, nextPath string, skip bool, err error) {
client, pathProvider, skip, err := s.initState(ctx)
if err != nil || skip {
return
}

for {
if nextPath, err = s.pathProvider.Next(ctx, s.client); err != nil {
if nextPath, err = pathProvider.Next(ctx, client); err != nil {
if errors.Is(err, sftp.ErrSshFxConnectionLost) {
_ = s.client.Close()
_ = client.Close()
s.scannerMut.Lock()
s.client = nil
s.scannerMut.Unlock()
return
}
if errors.Is(err, errEndOfPaths) {
Expand All @@ -255,21 +265,23 @@ func (s *sftpReader) seekNextPath(ctx context.Context) (file *sftp.File, nextPat
return
}

if file, err = s.client.Open(nextPath); err != nil {
if file, err = client.Open(nextPath); err != nil {
if errors.Is(err, sftp.ErrSshFxConnectionLost) {
_ = s.client.Close()
_ = client.Close()
s.scannerMut.Lock()
s.client = nil
s.scannerMut.Unlock()
}

s.log.With("path", nextPath, "err", err.Error()).Warn("Unable to open previously identified file")
if os.IsNotExist(err) {
// If we failed to open the file because it no longer exists
// then we can "ack" the path as we're done with it.
_ = s.pathProvider.Ack(ctx, nextPath, nil)
_ = pathProvider.Ack(ctx, nextPath, nil)
} else {
// Otherwise we "nack" it with the error as we'll want to
// reprocess it again later.
_ = s.pathProvider.Ack(ctx, nextPath, err)
_ = pathProvider.Ack(ctx, nextPath, err)
}
} else {
return
Expand Down Expand Up @@ -310,9 +322,7 @@ func (s *sftpReader) ReadBatch(ctx context.Context) (service.MessageBatch, servi
part.MetaSetMut("sftp_path", currentPath)
}

return parts, func(ctx context.Context, res error) error {
return codecAckFn(ctx, res)
}, nil
return parts, codecAckFn, nil
}

func (s *sftpReader) Close(ctx context.Context) error {
Expand Down
83 changes: 68 additions & 15 deletions internal/impl/sftp/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
package sftp

import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/ory/dockertest/v3"
"github.com/pkg/sftp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -110,6 +115,13 @@ func TestIntegrationSFTPDeleteOnFinish(t *testing.T) {

resource := setupDockerPool(t)

client, err := getClient(resource)
require.NoError(t, err)

writeSFTPFile(t, client, "/upload/1.txt", "data-1")
writeSFTPFile(t, client, "/upload/2.txt", "data-2")
writeSFTPFile(t, client, "/upload/3.txt", "data-3")

config := `
output:
drop: {}
Expand All @@ -135,22 +147,51 @@ cache_resources:
default_ttl: 900s
`
config = strings.NewReplacer(
"PORT", resource.GetPort("22/tcp"),
"$PORT", resource.GetPort("22/tcp"),
).Replace(config)

env := service.NewEnvironment()
parsedConfig, err := sftpInputSpec().ParseYAML(config, env)
require.NoError(t, err)

reader, err := newSFTPReaderFromParsed(parsedConfig, service.MockResources())
var receivedPathsMut sync.Mutex
var receivedPaths []string

builder := service.NewStreamBuilder()
require.NoError(t, builder.SetYAML(config))
require.NoError(t, builder.AddConsumerFunc(func(_ context.Context, msg *service.Message) error {
receivedPathsMut.Lock()
defer receivedPathsMut.Unlock()
path, ok := msg.MetaGet("sftp_path")
if !ok {
return errors.New("sftp_path metadata not found")
}
receivedPaths = append(receivedPaths, path)
return nil
}))
stream, err := builder.Build()
require.NoError(t, err)

// TODO: what do I do here to drive the input
// reader.Connect
_ = reader
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

runErr := make(chan error)
go func() { runErr <- stream.Run(ctx) }()
defer func() {
err := <-runErr
require.NoError(t, err, "stream.Run() failed")
}()

// require.EventuallyWithT(t, func(c *assert.CollectT) {
// receivedPathsMut.Lock()
// defer receivedPathsMut.Unlock()
// require.Len(c, receivedPaths, 3)

// files, err := client.Glob("/upload/*.txt")
// require.NoError(c, err)
// require.Empty(c, files)
// }, time.Second*1000, time.Millisecond*100)
}

func setupDockerPool(t *testing.T) *dockertest.Resource {
t.Helper()

pool, err := dockertest.NewPool("")
require.NoError(t, err)

Expand All @@ -165,24 +206,36 @@ func setupDockerPool(t *testing.T) *dockertest.Resource {
})
require.NoError(t, err)
t.Cleanup(func() {
fmt.Println("============ PURGE TIME =========")
assert.NoError(t, pool.Purge(resource))
})

_ = resource.Expire(900)

creds := credentials{
Username: sftpUsername,
Password: sftpPassword,
}

// wait for server to be ready to accept connections
require.NoError(t, pool.Retry(func() error {
_, err = creds.GetClient(&osPT{}, "localhost:"+resource.GetPort("22/tcp"))
_, err = getClient(resource)
return err
}))

return resource
}
func getClient(resource *dockertest.Resource) (*sftp.Client, error) {
creds := credentials{
Username: sftpUsername,
Password: sftpPassword,
}
return creds.GetClient(&osPT{}, "localhost:"+resource.GetPort("22/tcp"))
}

func writeSFTPFile(t *testing.T, client *sftp.Client, path, data string) {
t.Helper()
file, err := client.Create(path)
require.NoError(t, err, "creating file")
defer file.Close()
_, err = fmt.Fprint(file, data, "writing file contents")
require.NoError(t, err)
}

type osPT struct{}

Expand Down

0 comments on commit 1bbf6fa

Please sign in to comment.