nuclear_kote
@nuclear_kote

Как в clickhouse лить из кафки сложную структуру CapnProto?

Создал таблицу в клике:
CREATE TABLE queue (    
  foo Nested (
         code String, 
         value UInt32
  ),  
  bla UInt32) 
ENGINE = Kafka 
SETTINGS 
  kafka_broker_list = 'localhost:9092', 
  kafka_topic_list = 'test', 
  kafka_group_name = 'group1', 
  kafka_format = 'CapnProto',  
  kafka_schema = 'bar:BarStruct',  
  kafka_num_consumers = 1;

Создал схему capn:
struct BarStruct
{
    foo @0 :FooStruct;
    bla @1 :UInt32;
}

struct FooStruct
{
    code @0 :Text;
    value @1 :UInt32;
}


Пытаюсь лить из java в кафку следующим образом:
public class KafkaTest {

    public static Producer<Long, byte[]> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "testClient");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) throws InterruptedException {
        org.capnproto.MessageBuilder message = new org.capnproto.MessageBuilder();
        Bar.BarStruct.Builder barStruct = message.initRoot(Bar.BarStruct.factory);
        barStruct.setBla(32);
        Bar.FooStruct.Builder fooBuilder = barStruct.initFoo();
        fooBuilder.setCode("Blabla");
        fooBuilder.setValue(42);
        final Producer<Long, byte[]> producer = createProducer();
        while (true) {
            for (ByteBuffer buffer : message.getSegmentsForOutput()) {
                final byte[] bytes = buffer.array();
                System.out.println(Arrays.toString(bytes));
                ProducerRecord<Long, byte[]> producerRecord = new ProducerRecord<>("test", bytes);
                producer.send(producerRecord);
            }
            TimeUnit.SECONDS.sleep(1);
        }
    }

}


В итоге результат селекта примерно такой:
host :) select * from queue

SELECT *
FROM queue5

Received exception from server (version 19.1.6):
Code: 33. DB::Exception: Received from localhost:9090, ::1. DB::Exception: Cannot read all data. Bytes read: 8192. Bytes expected: 524304.: (Input format doesn't allow to skip errors): (at row 1)
. 

0 rows in set. Elapsed: 0.504 sec. 

host :) select * from queue

SELECT *
FROM queue

Ok.
  • Вопрос задан
  • 135 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы
SIDIX Consulting Москва
от 100 000 до 200 000 руб.
Finnplay Technologies Oy Смоленск
от 60 000 до 140 000 руб.
MFMS Москва
от 150 000 до 300 000 руб.