Ответы пользователя по тегу Netty
  • Java+netty+kafka: как перейти от многопоточности к мультиплексированию?

    sergey-gornostaev
    @sergey-gornostaev Куратор тега Java
    Седой и строгий
    Не работал с Kafka, но на сколько я знаю, она синхронная до безобразия. По крайней мере в вопросе подписки. В голову приходят два способа решить проблему интеграции с асинхронным Netty.

    Можно в инициализаторе конвейера или обработчике клиентского соединения запускать периодическую задачу, которая будет опрашивать очередь с нулевым таймаутом:
    eventLoop.schedule(() -> {
       ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
       // Какие-либо действия
    }, 100, TimeUnit.MILLISECONDS);

    Но этот вариант обрушит на сервер Kafka шквал запросов.

    Другой вариант - это сделать костыль в виде дополнительной очереди, в которую отправлять сообщения о том, что в какой-либо из клиентских очередей появилось сообщение. Тогда можно в одном потоке заблокировать ожидание сообщений из этой очереди, а при получении порождать событие в цикле событий Netty:
    class MessageListener implements Runnable {
        private final ChannelGroup group;
        private volatile boolean run = true;
    
        public MessageListener(ChannelGroup group) {
            this.group = group;
        }
    
        public void run() {
            while(run) {
                ConsumerRecords<String, String> records = notificationConsumer.poll(Duration.ofSecond(5));
                if (!records.isEmpty())
                    group.forEach(c -> c.pipeline().fireUserEventTriggered(new NewMsgEvent()));
            }
        }
    
        public void stop() {
            run = false;
        }
    }

    class SomeHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if(evt instanceof NewMsgEvent) {
                ConsumerRecords<String, String> records = clientConsumer.poll(Duration.ZERO);
                records.forEach(record -> {
                    ctx.write(Unpooled.wrappedBuffer(record.value().getBytes(StandardCharsets.UTF_8)));
                });
                ctx.flush();
            }
            else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }

    Можно эту идею немного доработать, передавая в очереди уведомлений информацию о том, в какой именно из клиентских очередей появилось новое сообщение, чтобы MessageListener мог отправить событие только в один нужный конвейер или чтобы только нужный обработчик на событие отреагировал.
    Ответ написан
  • Когда использовать Netty/Jetty а когда не париться и юзать Tomcat?

    sergey-gornostaev
    @sergey-gornostaev Куратор тега Tomcat
    Седой и строгий
    Прежде всего, Jetty вы зря поставили в один ряд с Netty. Совсем разного поля ягоды. Jetty - это web-контейнер. Netty - фреймворк для разработки асинхронных приложений. Jetty используется, когда нужен маленький и простой web-контейнер. Часто как встроенный web-сервер. Netty в основном для разработки восокопроизводительных серверов. Причём необязательно web-серверов.

    Если же сравнивать Jetty и Tomcat, они полностью взаимозаменяемы. Обычно, Jetty используют для самостоятельных приложений, упакованных в один jar-файл. А Tomcat тогда, когда вам нужно обслуживать несколько десятков приложений, упакованных в war-файлы. Но можно и наоборот.
    Ответ написан
  • Как выполнить действие после задержки в netty-socketio?

    sergey-gornostaev
    @sergey-gornostaev Куратор тега Java
    Седой и строгий
    Можно так
    import io.netty.util.Timer;
    import io.netty.util.TimerTask;
    
    timer.newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            // Отложенное действие
        }
    }, 30, TimeUnit.SECONDS);

    но лучше так
    eventLoop.schedule(() -> {
        // Отложенное действие
    }, 30, TimeUnit.MILLISECONDS);
    Ответ написан
  • Как подсчитать реальное время ответа в Netty 4?

    sergey-gornostaev
    @sergey-gornostaev Куратор тега Java
    Седой и строгий
    new ServerBootstrap().group(group)
                         .channel(NioServerSocketChannel.class)
                         .childHandler(new ChannelInitializer<SocketChannel>() {
                              @Override
                               public void initChannel(SocketChannel socketChannel) {
                                   final long startTime = System.currentTimeMillis();
    
                                   ChannelPipeline pipeline = socketChannel.pipeline();
                                   pipeline.addLast(new ChannelOutboundHandlerAdapter() {
                                       @Override
                                       public void read(ChannelHandlerContext ctx) {
                                           ctx.writeAndFlush(Unpooled.wrappedBuffer("Hello!".getBytes()));
                                              .addListener(ChannelFutureListener.CLOSE);
                                       }
                                   });
    
                                   ChannelFuture f = socketChannel.closeFuture();
                                   f.addListener(new ChannelFutureListener() {
                                       @Override
                                       public void operationComplete(ChannelFuture future) throws Exception {
                                           System.out.println(System.currentTimeMillis() - startTime);
                                       }
                                   });
                               }
                           })
                           .bind("localhost", 1025)
                           .sync()
                           .channel()
                           .closeFuture()
                           .syncUninterruptibly();
    Ответ написан
  • Как передать объект через netty?

    sergey-gornostaev
    @sergey-gornostaev Куратор тега Java
    Седой и строгий
    Кодеки, в том числе кодеры и декодеры, нужны как раз для того, чтобы преобразовывать данные поступающие в канал и выходящие из него. Можно всё это в одном обработчике делать, но разделение логики и следование принципам фреймворка облегчает отладку и сопровождение кода. Я не знаю, что такое Pocket, но в последних версиях кодеки используются так же, как и в предыдущих.

    Person.java
    import java.io.Serializable;
    
    public class Person implements Serializable {
        private final Long id;
        private String name;
    
        public Person(Long id, String name) {
            this.id = id;
            this.name = name;
        }
    
        public Long getId() {
            return id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
        
        public String toString() {
            return getName();
        }
    }


    SimpleClient.java
    public class SimpleClient {
        public static void main(String[] args) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                new Bootstrap().group(group)
                               .channel(NioSocketChannel.class)
                               .handler(new ChannelInitializer<SocketChannel>() {
                                    @Override
                                    public void initChannel(SocketChannel socketChannel) {
                                        ChannelPipeline pipeline = socketChannel.pipeline();
                                        pipeline.addLast(new ObjectEncoder());
                                        pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                                        pipeline.addLast(new ChannelInboundHandlerAdapter() {
                                            @Override
                                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                                ctx.writeAndFlush(new Person(1L, "John"));
                                            }
    
                                            @Override
                                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                                Person person = (Person) msg;
                                                System.out.println(person);
                                            }
                                        });
                                    }
                                })
                                .connect("localhost", 8080)
                                .sync()
                                .channel()
                                .closeFuture()
                                .sync();
    
            } finally {
                group.shutdownGracefully();
            }
        }
    }


    SimpleServer.java
    public class SimpleServer {
        public static void main(String[] args) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                new ServerBootstrap().group(group)
                                     .channel(NioServerSocketChannel.class)
                                     .childHandler(new ChannelInitializer<SocketChannel>() {
                                        @Override
                                        public void initChannel(SocketChannel socketChannel) {
                                            ChannelPipeline pipeline = socketChannel.pipeline();
                                            pipeline.addLast(new ObjectEncoder());
                                            pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                                            pipeline.addLast(new ChannelInboundHandlerAdapter() {
                                                @Override
                                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                                    Person person = (Person) msg;
                                                    System.out.println(person);
    
                                                    person.setName("John Doe");
                                                    ctx.writeAndFlush(person)
                                                       .addListener(ChannelFutureListener.CLOSE);
                                                }
    
                                                @Override
                                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                                    cause.printStackTrace();
                                                    ctx.channel().close();
                                                }
                                            });
                                        }
                                     })
                                     .bind("localhost", 8080)
                                     .sync()
                                     .channel()
                                     .closeFuture()
                                     .syncUninterruptibly();
    
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    Ответ написан
  • Как использовать netty?

    sergey-gornostaev
    @sergey-gornostaev Куратор тега Java
    Седой и строгий
    Общее представление можно получить из книги Maurer, Wolfthal - Netty in Action. Документацию по API читать на официальном сайте. Ну и сразу готовиться к тому, что придётся постоянно лазить за ответами в исходники, как самого Netty, так и использующих его проектов. Найти их можно на Github'е. Кроме того, стоит учесть, что без знания принципов сетевого программирования шансов на успешное освоение нет.
    Ответ написан