
For large POSTs RxNetty seems to need to write everything before reading anything #596

Open
flyaruu opened this issue Aug 22, 2017 · 6 comments

flyaruu commented Aug 22, 2017

I have a service that takes large POSTs (over 1 GB at times), transforms them, and returns a transformed version.

The server cannot keep the whole request in memory, so it starts streaming the response while the request is still streaming in.

I'm trying to use RxNetty for this, but RxNetty does not read anything while it is still writing. My service then blocks because it has nowhere to store the data.

Is this intentional? Am I the first one to run into this problem?
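The stall described above can be reproduced without RxNetty at all. In this minimal stand-in sketch (plain `java.util.concurrent`, all names hypothetical), a small bounded queue plays the role of the socket's send window: if the peer reads nothing until the full request has arrived, the writer fills the buffer and gets stuck.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustration only (not RxNetty code): a bounded buffer stands in for the
// peer's receive window. A peer that reads nothing until the whole request
// has arrived leaves the writer with nowhere to put further chunks.
public class WriteBeforeReadStall {
  public static void main(String[] args) throws Exception {
    BlockingQueue<byte[]> window = new ArrayBlockingQueue<>(4); // tiny "window"

    int written = 0;
    for (int i = 0; i < 10; i++) {
      // The peer never drains the queue, so after 4 chunks the writer stalls.
      if (!window.offer(new byte[1024], 200, TimeUnit.MILLISECONDS)) {
        break;
      }
      written++;
    }
    System.out.println("Chunks written before stalling: " + written); // prints 4
  }
}
```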

jamesgorman2 (Collaborator) commented Aug 22, 2017 via email

flyaruu (Author) commented Aug 22, 2017

I've tried this, and it does get better (it writes more before freezing), but the behaviour is otherwise the same.

If I run this snippet:

	@Test
	public void testRxNettyIssue596() {
	    Observable<ByteBuf> body = Bytes.from(new File("/Users/frank/git/reactive-servlet/rxjava-extras-0.8.0.8.jar"))
	        .doOnNext(e -> System.err.println("Read some data"))
	        .doOnCompleted(() -> System.err.println("All data complete"))
	        .observeOn(Schedulers.io())
	        .map(b -> Unpooled.copiedBuffer(b));

	    HttpClient.newClient("localhost", 8080).createPost("/reactive-servlet/reactive")
	        .writeContentAndFlushOnEach(body)
	        .map(response -> response.getContent().asObservable())
	        .concatMap(e -> e)
	        .observeOn(Schedulers.io())
	        .doOnNext(e -> System.err.println("Something written back"))
	        .doOnCompleted(() -> System.err.println("All data received"))
	        .toBlocking()
	        .subscribe();
	}

It works (the reactive-servlet just echoes the same data as it comes in) because this file is small enough (400 KB), but if I look at the output:

Read some data
.....
Read some data
Read some data
Read some data
All data complete
Something written back
Something written back
Something written back
Something written back
......
Something written back
All data received

I see that the data only starts streaming back after all of it has been sent. For bigger files, my server won't accept all the data if it can't write back.
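For comparison, the interleaved behaviour the service needs is straightforward with plain blocking sockets when reading and writing happen on separate threads. This sketch (ordinary `java.net`, not RxNetty; the class name is hypothetical) echoes each chunk as it arrives, so the client's reads overlap with its writes instead of starting only after the upload completes.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

// Sketch only (plain sockets, not RxNetty): an echo server that streams the
// response while the request is still arriving, read and written concurrently.
public class InterleavedEcho {
  public static void main(String[] args) throws Exception {
    ServerSocket server = new ServerSocket(0);

    Thread serverThread = new Thread(() -> {
      try (Socket s = server.accept()) {
        InputStream in = s.getInputStream();
        OutputStream out = s.getOutputStream();
        byte[] buf = new byte[1024];
        int n;
        while ((n = in.read(buf)) != -1) { // echo each chunk as it arrives
          out.write(buf, 0, n);
          out.flush();
        }
      } catch (IOException ignored) {}
    });
    serverThread.start();

    try (Socket client = new Socket("localhost", server.getLocalPort())) {
      OutputStream out = client.getOutputStream();
      InputStream in = client.getInputStream();

      // The reader runs concurrently with the writer, so the echoed bytes are
      // drained while the upload is still in progress.
      final long[] echoed = {0};
      Thread reader = new Thread(() -> {
        byte[] buf = new byte[1024];
        int n;
        try {
          while ((n = in.read(buf)) != -1) echoed[0] += n;
        } catch (IOException ignored) {}
      });
      reader.start();

      for (int i = 0; i < 100; i++) {
        out.write(new byte[1024]); // 100 KB total, written while reader drains
      }
      client.shutdownOutput(); // signal end of the "request"
      reader.join();
      System.out.println("Echoed bytes: " + echoed[0]); // prints 102400
    }
    serverThread.join();
    server.close();
  }
}
```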

jamesgorman2 (Collaborator) commented Aug 23, 2017

I see what you mean now. I certainly find this behaviour unexpected. I have a repro below; it will log each block written or read.

writeContent... does not appear to be emitting the response until it has been fully returned. Looking at the chunk example from 0.4.x, I wonder if something similar is happening on 0.5.x. I haven't been able to identify what is causing it yet. More investigation is required.

import io.netty.buffer.ByteBuf;
import io.netty.handler.logging.LogLevel;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.server.HttpServer;
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicLong;

public class TestLargePayloads {
  private static AtomicLong clientWrote = new AtomicLong(0);
  private static AtomicLong serverRead = new AtomicLong(0);
  private static AtomicLong serverWrote = new AtomicLong(0);
  private static AtomicLong clientRead = new AtomicLong(0);

//  private static AtomicLong iteration = new AtomicLong(0);

  public static void main(String[] args) throws Exception {

    HttpServer<ByteBuf, ByteBuf> server = HttpServer.newServer()
      .enableWireLogging("FOO", LogLevel.WARN)
      .start(
        (request, response) ->
          response.writeStringAndFlushOnEach( // force flush server output
            request.getContent().autoRelease() // release bytebufs
              .subscribeOn(Schedulers.computation())
              .map((byteBuf) -> byteBuf.toString(Charset.defaultCharset()))
              .doOnNext(s -> update(serverRead, s.length()))
              .compose(TestLargePayloads::incrementChars)
              .doOnNext(s -> update(serverWrote, s.length()))
              .doOnNext(s -> sleep()) // force thread switching
          )
      );

    TestSubscriber<String> subscriber = TestSubscriber.create();

    printStatus();

    HttpClient.newClient("localhost", server.getServerPort())
      .enableWireLogging("TMP", LogLevel.INFO)
      .createPost("/")
      .writeStringContent(
        Observable.range(0, 10)
          .map(i -> String.format("%010d", i))
          .doOnNext(s -> update(clientWrote, s.length()))
          .doOnNext(s -> sleep()), // force thread switching
        s -> true // force flush client output
      )
      .flatMap(
        response -> {
          System.out.println("Got response");
          return response.getContent().autoRelease()
            .map((byteBuf) -> byteBuf.toString(Charset.defaultCharset()));
        }
      )
      .doOnNext(s -> update(clientRead, s.length()))
      .subscribe(subscriber);

    subscriber.awaitTerminalEvent();

//    printStatus();

    System.err.println();

    subscriber.assertNoErrors();

    server.shutdown();
  }

  private static void sleep() {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }


  static Observable<String> incrementChars(Observable<String> ss) {
    // busy work
    return ss.map(
      s ->
        s.chars()
          .map(i -> i + 1)
          .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
          .toString()
    );
  }

  static void update(AtomicLong v, int by) {
    v.addAndGet(by);

//    if (iteration.getAndIncrement() % 1000 == 0) {
      printStatus();
//    }
  }

  private static void printStatus() {
    System.out.println(
      String.format(
        "Client wrote %s, Server read %s, Server wrote %s, Client read %s [%s]",
        clientWrote,
        serverRead,
        serverWrote,
        clientRead,
        Thread.currentThread().getName()
      )
    );
  }
}

flyaruu (Author) commented Aug 24, 2017

I've honestly never seen an HTTP server do this in the wild, and I don't know whether this behaviour is normal or even allowed by the HTTP standard (I couldn't find any references to it), but for me it would be the most efficient.

While researching this, I get the impression that this is at least a less-trodden path (on the server side I ran into issues in both Jetty and Undertow). On the client side, I discovered that cURL handles this well and starts streaming immediately, but wget behaves like RxNetty: it doesn't stream anything until the upload is done.

frankbolander commented
@flyaruu If you change your observeOn(Schedulers.io()) to subscribeOn(Schedulers.io()), I think you will get the behavior you want.

You may want to do subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()) to ensure the io and computation work run on separate threads.
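As an analogy for what the two-scheduler arrangement above buys you, here is a stand-in sketch using plain `java.util.concurrent` rather than RxJava (the pool and class names are illustrative): the source value is produced on one pool while the downstream transformation runs on a separate one, so a slow consumer never occupies the producer's thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Analogy only (not RxJava): producing on an "io" pool and mapping on a
// separate "computation" pool, mirroring subscribeOn(io()).observeOn(computation()).
public class SeparatePools {
  public static void main(String[] args) throws Exception {
    ExecutorService io =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
    ExecutorService computation =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "computation"));

    // Like subscribeOn: the source work runs on the io pool.
    CompletableFuture<String> produced = CompletableFuture.supplyAsync(
        () -> "chunk on " + Thread.currentThread().getName(), io);

    // Like observeOn: the downstream transformation runs on the computation pool.
    String result = produced
        .thenApplyAsync(
            s -> s + ", mapped on " + Thread.currentThread().getName(),
            computation)
        .get();

    System.out.println(result); // prints "chunk on io, mapped on computation"
    io.shutdown();
    computation.shutdown();
  }
}
```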

flyaruu (Author) commented Sep 12, 2017

I've tried doing that, but I don't see the behaviour change. If I add the observeOn/subscribeOn to James's example, it still uploads the entire POST first.
