nuclear_kote
@nuclear_kote

Как в clickhouse лить из кафки сложную структуру CapnProto?

Создал таблицу в клике:
CREATE TABLE queue (    
  foo Nested (
         code String, 
         value UInt32
  ),  
  bla UInt32) 
ENGINE = Kafka 
SETTINGS 
  kafka_broker_list = 'localhost:9092', 
  kafka_topic_list = 'test', 
  kafka_group_name = 'group1', 
  kafka_format = 'CapnProto',  
  kafka_schema = 'bar:BarStruct',  
  kafka_num_consumers = 1;

Создал схему capn:
struct BarStruct
{
    foo @0 :FooStruct;
    bla @1 :UInt32;
}

struct FooStruct
{
    code @0 :Text;
    value @1 :UInt32;
}


Пытаюсь лить из java в кафку следующим образом:
public class KafkaTest {

    public static Producer<Long, byte[]> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "testClient");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) throws InterruptedException {
        org.capnproto.MessageBuilder message = new org.capnproto.MessageBuilder();
        Bar.BarStruct.Builder barStruct = message.initRoot(Bar.BarStruct.factory);
        barStruct.setBla(32);
        Bar.FooStruct.Builder fooBuilder = barStruct.initFoo();
        fooBuilder.setCode("Blabla");
        fooBuilder.setValue(42);
        final Producer<Long, byte[]> producer = createProducer();
        while (true) {
            for (ByteBuffer buffer : message.getSegmentsForOutput()) {
                final byte[] bytes = buffer.array();
                System.out.println(Arrays.toString(bytes));
                ProducerRecord<Long, byte[]> producerRecord = new ProducerRecord<>("test", bytes);
                producer.send(producerRecord);
            }
            TimeUnit.SECONDS.sleep(1);
        }
    }

}


В итоге результат селекта примерно такой:
host :) select * from queue

SELECT *
FROM queue5

Received exception from server (version 19.1.6):
Code: 33. DB::Exception: Received from localhost:9090, ::1. DB::Exception: Cannot read all data. Bytes read: 8192. Bytes expected: 524304.: (Input format doesn't allow to skip errors): (at row 1)
. 

0 rows in set. Elapsed: 0.504 sec. 

host :) select * from queue

SELECT *
FROM queue

Ok.
  • Вопрос задан
  • 707 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы