Skip to content

Commit

Permalink
Merge pull request #60 from reugn/develop
Browse files Browse the repository at this point in the history
v0.9.0
  • Loading branch information
reugn authored Jan 7, 2023
2 parents 40a2384 + c8018b3 commit 3771cd7
Show file tree
Hide file tree
Showing 8 changed files with 376 additions and 93 deletions.
11 changes: 9 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
name: Build

on: [push, pull_request]
on:
push:
branches:
- '**'
pull_request:
branches:
- master

jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [1.19]
go-version: [1.18.x, 1.19.x]
steps:
- name: Setup Go
uses: actions/setup-go@v3
Expand All @@ -21,4 +27,5 @@ jobs:
run: go test ./... -coverprofile=coverage.out -covermode=atomic

- name: Upload coverage to Codecov
if: ${{ matrix.go-version == '1.18.x' }}
run: bash <(curl -s https://codecov.io/bash)
96 changes: 40 additions & 56 deletions flow/flow_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package flow_test

import (
"container/heap"
"fmt"
"reflect"
"sort"
"strings"
Expand All @@ -11,7 +9,6 @@ import (

ext "github.com/reugn/go-streams/extension"
"github.com/reugn/go-streams/flow"
"github.com/reugn/go-streams/util"
)

var addAsterisk = func(in string) []string {
Expand All @@ -31,7 +28,6 @@ var reduceSum = func(a int, b int) int {

func ingestSlice[T any](source []T, in chan interface{}) {
for _, e := range source {
fmt.Printf("ingest: %v", e)
in <- e
}
}
Expand All @@ -53,26 +49,23 @@ func TestComplexFlow(t *testing.T) {
source := ext.NewChanSource(in)
toUpperMapFlow := flow.NewMap(strings.ToUpper, 1)
appendAsteriskFlatMapFlow := flow.NewFlatMap(addAsterisk, 1)
throttler := flow.NewThrottler(10, time.Second, 50, flow.Backpressure)
slidingWindow := flow.NewSlidingWindow(2*time.Second, 2*time.Second)
tumblingWindow := flow.NewTumblingWindow(time.Second)
filterNotContainsA := flow.NewFilter(filterNotContainsA, 1)
throttler := flow.NewThrottler(10, 200*time.Millisecond, 50, flow.Backpressure)
tumblingWindow := flow.NewTumblingWindow(200 * time.Millisecond)
filterFlow := flow.NewFilter(filterNotContainsA, 1)
sink := ext.NewChanSink(out)

inputValues := []string{"a", "b", "c"}
go ingestSlice(inputValues, in)
go closeDeferred(in, 3*time.Second)
go closeDeferred(in, time.Second)

go func() {
source.
Via(toUpperMapFlow).
Via(appendAsteriskFlatMapFlow).
Via(tumblingWindow).
Via(flow.Flatten(1)).
Via(slidingWindow).
Via(throttler).
Via(flow.Flatten(1)).
Via(filterNotContainsA).
Via(filterFlow).
To(sink)
}()

Expand All @@ -85,42 +78,41 @@ func TestComplexFlow(t *testing.T) {
assertEquals(t, expectedValues, outputValues)
}

func TestFanOutFlow(t *testing.T) {
in := make(chan interface{})
out := make(chan interface{})
func TestSplitFlow(t *testing.T) {
in := make(chan interface{}, 3)
out := make(chan interface{}, 3)

source := ext.NewChanSource(in)
filterNotContainsA := flow.NewFilter(filterNotContainsA, 1)
toUpperMapFlow := flow.NewMap(strings.ToUpper, 1)
sink := ext.NewChanSink(out)

inputValues := []string{"a", "b", "c"}
go ingestSlice(inputValues, in)
go closeDeferred(in, 100*time.Millisecond)
ingestSlice(inputValues, in)
close(in)

go func() {
fanOut := flow.FanOut(source.Via(filterNotContainsA).Via(toUpperMapFlow), 2)
flow.
Merge(fanOut...).
To(sink)
}()
split := flow.Split(
source.
Via(toUpperMapFlow), filterNotContainsA)

flow.Merge(split[0], split[1]).
To(sink)

var outputValues []string
for e := range sink.Out {
outputValues = append(outputValues, e.(string))
}
sort.Strings(outputValues)

expectedValues := []string{"B", "B", "C", "C"}
expectedValues := []string{"A", "B", "C"}
assertEquals(t, expectedValues, outputValues)
}

func TestRoundRobinFlow(t *testing.T) {
func TestFanOutFlow(t *testing.T) {
in := make(chan interface{})
out := make(chan interface{})

source := ext.NewChanSource(in)
filterNotContainsA := flow.NewFilter(filterNotContainsA, 1)
filterFlow := flow.NewFilter(filterNotContainsA, 1)
toUpperMapFlow := flow.NewMap(strings.ToUpper, 1)
sink := ext.NewChanSink(out)

Expand All @@ -129,9 +121,12 @@ func TestRoundRobinFlow(t *testing.T) {
go closeDeferred(in, 100*time.Millisecond)

go func() {
roundRobin := flow.RoundRobin(source.Via(filterNotContainsA).Via(toUpperMapFlow), 2)
fanOut := flow.FanOut(
source.
Via(filterFlow).
Via(toUpperMapFlow), 2)
flow.
Merge(roundRobin...).
Merge(fanOut...).
To(sink)
}()

Expand All @@ -141,39 +136,41 @@ func TestRoundRobinFlow(t *testing.T) {
}
sort.Strings(outputValues)

expectedValues := []string{"B", "C"}
expectedValues := []string{"B", "B", "C", "C"}
assertEquals(t, expectedValues, outputValues)
}

func TestSessionWindow(t *testing.T) {
func TestRoundRobinFlow(t *testing.T) {
in := make(chan interface{})
out := make(chan interface{})

source := ext.NewChanSource(in)
sessionWindow := flow.NewSessionWindow(200 * time.Millisecond)
filterFlow := flow.NewFilter(filterNotContainsA, 1)
toUpperMapFlow := flow.NewMap(strings.ToUpper, 1)
sink := ext.NewChanSink(out)

inputValues := []string{"a", "b", "c"}
go ingestSlice(inputValues, in)
go ingestDeferred("d", in, 300*time.Millisecond)
go ingestDeferred("e", in, 700*time.Millisecond)
go closeDeferred(in, time.Second)
go closeDeferred(in, 100*time.Millisecond)

go func() {
source.
Via(sessionWindow).
roundRobin := flow.RoundRobin(
source.
Via(filterFlow).
Via(toUpperMapFlow), 2)
flow.
Merge(roundRobin...).
To(sink)
}()

var outputValues [][]interface{}
var outputValues []string
for e := range sink.Out {
outputValues = append(outputValues, e.([]interface{}))
outputValues = append(outputValues, e.(string))
}
sort.Strings(outputValues)

assertEquals(t, 3, len(outputValues))
assertEquals(t, 3, len(outputValues[0]))
assertEquals(t, 1, len(outputValues[1]))
assertEquals(t, 1, len(outputValues[2]))
expectedValues := []string{"B", "C"}
assertEquals(t, expectedValues, outputValues)
}

func TestReduceFlow(t *testing.T) {
Expand Down Expand Up @@ -201,19 +198,6 @@ func TestReduceFlow(t *testing.T) {
assertEquals(t, expectedValues, outputValues)
}

func TestQueue(t *testing.T) {
queue := &flow.PriorityQueue{}
heap.Push(queue, flow.NewItem(1, util.NowNano(), 0))
heap.Push(queue, flow.NewItem(2, 1234, 0))
heap.Push(queue, flow.NewItem(3, util.NowNano(), 0))
queue.Swap(0, 1)
head := queue.Head()
queue.Update(head, util.NowNano())
first := heap.Pop(queue).(*flow.Item)

assertEquals(t, 2, first.Msg.(int))
}

func assertEquals[T any](t *testing.T, expected T, actual T) {
if !reflect.DeepEqual(expected, actual) {
t.Fatalf("%v != %v", expected, actual)
Expand Down
55 changes: 55 additions & 0 deletions flow/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package flow_test

import (
"container/heap"
"testing"

"github.com/reugn/go-streams/flow"
"github.com/reugn/go-streams/util"
)

func TestQueueOps(t *testing.T) {
queue := &flow.PriorityQueue{}
heap.Push(queue, flow.NewItem(1, util.NowNano(), 0))
heap.Push(queue, flow.NewItem(2, 1234, 0))
heap.Push(queue, flow.NewItem(3, util.NowNano(), 0))
queue.Swap(0, 1)
head := queue.Head()
queue.Update(head, util.NowNano())
first := heap.Pop(queue).(*flow.Item)

assertEquals(t, 2, first.Msg.(int))
}

func TestQueueOrder(t *testing.T) {
queue := &flow.PriorityQueue{}

pushItem(queue, 5)
pushItem(queue, 4)
pushItem(queue, 6)
pushItem(queue, 3)
pushItem(queue, 7)
pushItem(queue, 2)
pushItem(queue, 8)
pushItem(queue, 1)
pushItem(queue, 9)

assertEquals(t, 1, popMsg(queue))
assertEquals(t, 2, popMsg(queue))
assertEquals(t, 3, popMsg(queue))
assertEquals(t, 4, popMsg(queue))
assertEquals(t, 5, popMsg(queue))
assertEquals(t, 6, popMsg(queue))
assertEquals(t, 7, popMsg(queue))
assertEquals(t, 8, popMsg(queue))
assertEquals(t, 9, popMsg(queue))
}

func pushItem(queue *flow.PriorityQueue, timestamp int64) {
item := flow.NewItem(timestamp, timestamp, 0)
heap.Push(queue, item)
}

func popMsg(queue *flow.PriorityQueue) int64 {
return (heap.Pop(queue).(*flow.Item)).Msg.(int64)
}
43 changes: 43 additions & 0 deletions flow/session_window_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package flow_test

import (
"fmt"
"testing"
"time"

ext "github.com/reugn/go-streams/extension"
"github.com/reugn/go-streams/flow"
)

func TestSessionWindow(t *testing.T) {
in := make(chan interface{})
out := make(chan interface{})

source := ext.NewChanSource(in)
sessionWindow := flow.NewSessionWindow(20 * time.Millisecond)
sink := ext.NewChanSink(out)

inputValues := []string{"a", "b", "c"}
go ingestSlice(inputValues, in)
go ingestDeferred("d", in, 30*time.Millisecond)
go ingestDeferred("e", in, 70*time.Millisecond)
go closeDeferred(in, 100*time.Millisecond)

go func() {
source.
Via(sessionWindow).
To(sink)
}()

var outputValues [][]interface{}
for e := range sink.Out {
outputValues = append(outputValues, e.([]interface{}))
}
fmt.Println(outputValues)

assertEquals(t, 3, len(outputValues)) // [[a b c] [d] [e]]

assertEquals(t, []interface{}{"a", "b", "c"}, outputValues[0])
assertEquals(t, []interface{}{"d"}, outputValues[1])
assertEquals(t, []interface{}{"e"}, outputValues[2])
}
Loading

0 comments on commit 3771cd7

Please sign in to comment.