You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Get yourself an InputStream (in my case, one from Minio SDK when getting an object from the storage - which originates from OkHttp)
Pass it into response body {:body my-is}
Make client close connection prematurely (uh-oh, binary file being downloaded with curl into the terminal - by default curl does not allow that)
Aleph closes InputStream in different thread
I run into following exception (OkHttp, or rather okio which backs said stream does not support that use-case):
{host 127.0.0.1:7000, user-agent curl/7.68.0, accept */*}
read3 Thread[manifold-wait-6,5,main]
available Thread[manifold-wait-6,5,main]
read3 Thread[manifold-wait-6,5,main]
available Thread[manifold-wait-6,5,main]
read3 Thread[manifold-wait-6,5,main]
read3 Thread[manifold-wait-6,5,main]
available Thread[manifold-wait-6,5,main]
read3 Thread[manifold-wait-6,5,main]
close Thread[aleph-netty-server-event-pool-23,5,main]
Feb 16, 2020 8:45:12 PM manifold.utils invoke
SEVERE: error in invoke-callbacks
java.lang.IllegalStateException: Unbalanced enter/exit
at okio.AsyncTimeout.enter(AsyncTimeout.java:73)
at okio.AsyncTimeout$2.read(AsyncTimeout.java:235)
at okio.RealBufferedSource.read(RealBufferedSource.java:51)
at okhttp3.internal.http1.Http1Codec$AbstractSource.read(Http1Codec.java:374)
at okhttp3.internal.http1.Http1Codec$FixedLengthSource.read(Http1Codec.java:418)
at okhttp3.internal.Util.skipAll(Util.java:204)
at okhttp3.internal.Util.discard(Util.java:186)
at okhttp3.internal.http1.Http1Codec$FixedLengthSource.close(Http1Codec.java:435)
at okio.RealBufferedSource.close(RealBufferedSource.java:476)
at okio.RealBufferedSource$1.close(RealBufferedSource.java:460)
at java.base/jdk.internal.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:167)
at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:438)
at cu.resourcepack_server.core$fn__28582$fn__28595$fn__28605.invoke(form-init15695820198716781853.clj:36)
at cu.resourcepack_server.core.proxy$java.io.InputStream$ff19274a.close(Unknown Source)
at aleph.http.core$send_streaming_body$fn__15872.invoke(core.clj:324)
at manifold.utils$invoke_callbacks$fn__1143.invoke(utils.clj:69)
at manifold.utils$invoke_callbacks.invokeStatic(utils.clj:68)
at manifold.utils$invoke_callbacks.invoke(utils.clj:65)
at aleph.netty.ChannelSink.markClosed(netty.clj:344)
at aleph.netty.ChannelSink.close(netty.clj:352)
at aleph.netty$sink$fn__15376.invoke(netty.clj:407)
at manifold.deferred$eval1788$chain_SINGLEQUOTE____1809.invoke(deferred.clj:749)
at manifold.deferred$eval1788$subscribe__1789$fn__1794.invoke(deferred.clj:715)
at manifold.deferred.Listener.onSuccess(deferred.clj:219)
at manifold.deferred.Deferred$fn__1634.invoke(deferred.clj:398)
at manifold.deferred.Deferred.success(deferred.clj:398)
at manifold.deferred$success_BANG_.invokeStatic(deferred.clj:243)
at manifold.deferred$success_BANG_.invoke(deferred.clj:240)
at aleph.netty$wrap_future$reify__15320.operationComplete(netty.clj:218)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:103)
at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1152)
at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:768)
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:744)
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:615)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.closeOnRead(AbstractNioByteChannel.java:105)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:171)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
at manifold.executor$thread_factory$reify__1009$f__1010.invoke(executor.clj:47)
at clojure.lang.AFn.run(AFn.java:22)
at java.base/java.lang.Thread.run(Thread.java:834)
available Thread[manifold-wait-6,5,main]
read3 Thread[manifold-wait-6,5,main]
close Thread[manifold-wait-6,5,main]
close Thread[manifold-wait-6,5,main]
Body + proxy class which delegates to the original stream (for debugging):
Similar issue was described in #454. It's unclear if Netty provides a good way of dealing with the situation like this, at least I didn't find any (the issue in Netty repo is open since 2017).
There are multiple things happening here. As far as InputStream has inherently blocking API, the code that fills-in data from network and the code that reads it have to run on different threads. I think an exception thrown on async thread pool could be captured and propagated back to InputStream thread to be re-thrown there... but it would require some jiggling. I'm testing a few improvements to error handling, will see if I can cover this specific use case properly. Thanks for the report!
Reproduction is pretty simple:
{:body my-is}
curl
into the terminal - by default curl does not allow that)I run into following exception (OkHttp, or rather okio which backs said stream does not support that use-case):
Body + proxy class which delegates to the original stream (for debugging):
Also simple logging snippet which works ok with the test scenario:
I think that this is not right
The text was updated successfully, but these errors were encountered: