# Kafka
This connector allows you to send and receive messages through Kafka.

## Prerequisites

The connector asset requires the following inputs for connecting to Kafka:

- `username`
- `password`
- `ssl_certfile`
- `bootstrap_servers` (required)

## Capabilities

This connector provides the following capabilities:

- Get Data
- Send Data

### Retrieve Data task

Get Data tasks track the offset for each partition they pull from. This prevents any message from being pulled twice.

## Configurations

### Kafka API Authentication

Authenticates using Kafka API authentication.

Configuration parameters:

| Parameter | Description | Type | Required |
| --- | --- | --- | --- |
| `bootstrap_servers` | Comma-separated list of brokers | string | required |
| `username` | SASL username for Kafka | string | optional |
| `password` | SASL password for Kafka | string | optional |
| `ssl_certfile` | Base64-encoded certificate in PEM format used to authenticate | string | optional |
| `verify_ssl` | Verify the SSL certificate | boolean | optional |
| `http_proxy` | A proxy to route requests through | string | optional |

## Actions

### Get Data

Gets data from a Kafka topic.

Input:

| Argument | Type | Required | Description |
| --- | --- | --- | --- |
| `topic` | string | required | Kafka topic |
| `partitions` | string | optional | Comma-separated list of partitions |
| `partition_offsets` | object | optional | Partition offsets: an object whose keys are partition IDs and whose values are offsets |
| `limit` | number | optional | Limit the number of messages to be retrieved. Default is 100 |
| `return_single_record` | boolean | optional | If true, return all results in a single record under `messages` in the result object; if false, create a record for each result, returning each message under `messages` in its result object |
| `client_id` | string | optional | A name for this client |
| `group_id` | string | optional | The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets |
| `fetch_max_wait_ms` | number | optional | The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by `fetch_min_bytes`. Default is 500 |
| `fetch_min_bytes` | number | optional | The minimum amount of data the server should return for a fetch request; otherwise, wait up to `fetch_max_wait_ms` for more data to accumulate. Default is 1 |
| `fetch_max_bytes` | number | optional | The maximum amount of data the server should return for a fetch request |
| `max_partition_fetch_bytes` | number | optional | The maximum amount of data per partition the server will return |
| `request_timeout_ms` | number | optional | Client request timeout in milliseconds. Default is 305000 |
| `retry_backoff_ms` | number | optional | Milliseconds to back off when retrying on errors. Default is 100 |
| `reconnect_backoff_ms` | number | optional | The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default is 50 |
| `reconnect_backoff_max_ms` | number | optional | The maximum amount of time in milliseconds to back off/wait when reconnecting to a broker that has repeatedly failed to connect |
| `max_in_flight_requests_per_connection` | number | optional | Requests are pipelined to Kafka brokers, up to this maximum number of in-flight requests per broker connection. Note that if this setting is greater than 1 and there are failed sends, there is a risk of message reordering due to retries (i.e., if retries are enabled). Default is 5 |
| `auto_offset_reset` | string | optional | The policy for resetting offsets on OffsetOutOfRange errors: 'earliest' will move to the oldest available message, 'latest' will move to the most recent; any other value will raise an exception. Default is 'latest' |
| `enable_auto_commit` | boolean | optional | If true, the consumer's offset is periodically committed in the background. Default is true |
| `auto_commit_interval_ms` | number | optional | Number of milliseconds between automatic offset commits, if `enable_auto_commit` is true. Default is 5000 |
| `check_crcs` | boolean | optional | Automatically check the CRC32 of the records consumed. Default is true |
| `metadata_max_age_ms` | number | optional | The period of time in milliseconds after which we force a refresh of metadata, even if we haven't seen any partition leadership changes, to proactively discover new brokers or partitions. Default is 300000 |
| `max_poll_records` | number | optional | The maximum number of records returned in a single call to `poll()`. Default is 500 |
| `max_poll_interval_ms` | number | optional | The maximum delay between invocations of `poll()` when using consumer group management |
| `session_timeout_ms` | number | optional | The timeout used to detect failures when using Kafka's group management facilities |
| `heartbeat_interval_ms` | number | optional | The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management facilities |

Output:

| Parameter | Type | Description |
| --- | --- | --- |
| `status_code` | number | HTTP status code of the response |
| `reason` | string | Response reason phrase |
| `messages` | array | Response message |
| `message` | string | Response message |
| `partition_offsets` | object | Output field `partition_offsets` |
| `1` | number | Output field 1 |
| `2` | number | Output field 2 |

Example:

```json
[
  {
    "status_code": 200,
    "response_headers": {},
    "reason": "OK",
    "json_body": {
      "messages": [],
      "partition_offsets": {}
    }
  }
]
```

### Send Data

Sends data to a Kafka topic.

Input:

| Argument | Type | Required | Description |
| --- | --- | --- | --- |
| `topic` | string | required | Kafka topic |
| `data` | string | required | Data to send over Kafka |
| `client_id` | string | optional | A name for this client |
| `acks` | number | optional | The number of acknowledgments the producer requires the leader to have received before considering a request complete |
| `compression_type` | string | optional | The compression type for all data generated by the producer. Valid values are 'gzip', 'snappy', 'lz4', or none |
| `retries` | number | optional | Setting a value greater than zero causes the client to resend any record whose send fails with a potentially transient error |
| `batch_size` | number | optional | Requests sent to brokers contain multiple batches, one for each partition with data available to be sent. A small batch size makes batching less common and may reduce throughput (a batch size of zero disables batching entirely). Default is 16384 |
| `linger_ms` | number | optional | The producer groups together any records that arrive between request transmissions into a single batched request. Default is 0 |
| `buffer_memory` | number | optional | The total bytes of memory the producer should use to buffer records waiting to be sent to the server |
| `connections_max_idle_ms` | number | optional | Close idle connections after the number of milliseconds specified by this config |
| `max_block_ms` | number | optional | Number of milliseconds to block during `send()` and `partitions_for()` |
| `max_request_size` | number | optional | The maximum size of a request |
| `metadata_max_age_ms` | number | optional | The period of time in milliseconds after which we force a refresh of metadata, even if we haven't seen any partition leadership changes, to proactively discover new brokers or partitions |
| `retry_backoff_ms` | number | optional | Milliseconds to back off when retrying on errors. Default is 100 |
| `request_timeout_ms` | number | optional | Client request timeout in milliseconds. Default is 30000 |
| `receive_buffer_bytes` | number | optional | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data |
| `send_buffer_bytes` | number | optional | The size of the TCP send buffer (SO_SNDBUF) to use when sending data |
| `reconnect_backoff_ms` | number | optional | The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default is 50 |
| `reconnect_backoff_max_ms` | number | optional | The maximum amount of time in milliseconds to back off/wait when reconnecting to a broker that has repeatedly failed to connect |
| `max_in_flight_requests_per_connection` | number | optional | Requests are pipelined to Kafka brokers, up to this maximum number of in-flight requests per broker connection. Note that if this setting is greater than 1 and there are failed sends, there is a risk of message reordering due to retries (i.e., if retries are enabled). Default is 5 |
| `security_protocol` | string | optional | Protocol used to communicate with brokers. Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. Default is PLAINTEXT |
| `api_version` | string | optional | Specify which Kafka API version to use |
| `api_version_auto_timeout_ms` | number | optional | Number of milliseconds before the constructor throws a timeout exception when checking the broker API version |
| `metrics_num_samples` | number | optional | The number of samples maintained to compute metrics. Default is 2 |
| `metrics_sample_window_ms` | number | optional | The maximum age in milliseconds of samples used to compute metrics. Default is 30000 |

Output:

| Parameter | Type | Description |
| --- | --- | --- |
| `status_code` | number | HTTP status code of the response |
| `reason` | string | Response reason phrase |
| `success` | boolean | Whether the operation was successful |

Example:

```json
[
  {
    "status_code": 200,
    "response_headers": {},
    "reason": "OK",
    "json_body": {
      "success": true
    }
  }
]
```

## Notes

- See the [kafka-python documentation](https://kafka-python.readthedocs.io/en/master/index.html) for the Kafka Python API used by this connector.
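The duplicate-prevention behavior of the Retrieve Data task can be illustrated with a small, broker-free Python simulation. The partition logs, offsets, and `get_data` helper below are made-up illustrative stand-ins, not part of the connector:

```python
# Simulate how tracking an offset per partition prevents re-pulling messages.
# The partition logs and offsets are illustrative data, not real Kafka state.
partition_logs = {
    0: ["m0", "m1", "m2", "m3"],   # messages in partition 0, offsets 0..3
    1: ["n0", "n1"],               # messages in partition 1, offsets 0..1
}

def get_data(partition_offsets, limit=100):
    """Pull up to `limit` messages, starting each partition at its stored offset."""
    messages = []
    new_offsets = dict(partition_offsets)
    for partition, offset in partition_offsets.items():
        for msg in partition_logs[partition][offset:]:
            if len(messages) >= limit:
                break
            messages.append(msg)
            new_offsets[partition] += 1
    return messages, new_offsets

# First pull starts from offset 0 on both partitions.
first, offsets = get_data({0: 0, 1: 0})
# Second pull resumes from the stored offsets, so nothing is pulled twice.
second, offsets = get_data(offsets)
print(first)   # ['m0', 'm1', 'm2', 'm3', 'n0', 'n1']
print(second)  # []
```

Passing the returned `partition_offsets` back into the next pull is exactly what makes repeated Get Data runs idempotent over already-consumed messages.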
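Because the Get Data inputs mirror kafka-python's `KafkaConsumer` options, a sketch of that mapping may help when tuning them. This is an assumption about the connector's internals (the document only links to kafka-python); broker addresses, credentials, and the topic name are placeholders:

```python
# Hypothetical mapping of asset fields and Get Data inputs onto
# kafka-python KafkaConsumer keyword arguments. All concrete values
# (brokers, credentials) are placeholders.
consumer_config = {
    "bootstrap_servers": "broker1:9092,broker2:9092",  # asset: bootstrap_servers (required)
    "security_protocol": "SASL_SSL",
    "sasl_plain_username": "alice",      # asset: username
    "sasl_plain_password": "s3cret",     # asset: password
    "client_id": "my-client",            # input: client_id
    "group_id": "my-group",              # input: group_id
    "auto_offset_reset": "latest",       # input: auto_offset_reset (default 'latest')
    "enable_auto_commit": True,          # input: enable_auto_commit (default true)
    "fetch_max_wait_ms": 500,            # input: fetch_max_wait_ms (default 500)
    "max_poll_records": 500,             # input: max_poll_records (default 500)
    "request_timeout_ms": 305000,        # input: request_timeout_ms (default 305000)
}

# With kafka-python installed and a reachable broker, the consumer
# would be created like this:
# from kafka import KafkaConsumer
# consumer = KafkaConsumer("my-topic", **consumer_config)
```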
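The Send Data inputs map onto kafka-python's `KafkaProducer` options in the same way. Again, this mapping is an assumption based on the kafka-python link above, and the broker list, topic, and payload are placeholders:

```python
# Hypothetical mapping of Send Data inputs onto kafka-python
# KafkaProducer keyword arguments. Broker list and payload are placeholders.
producer_config = {
    "bootstrap_servers": "broker1:9092",  # asset: bootstrap_servers
    "acks": 1,                            # input: acks
    "compression_type": "gzip",           # input: compression_type ('gzip', 'snappy', 'lz4', or None)
    "retries": 3,                         # input: retries
    "batch_size": 16384,                  # input: batch_size (default 16384)
    "linger_ms": 0,                       # input: linger_ms (default 0)
    "security_protocol": "PLAINTEXT",     # input: security_protocol (default PLAINTEXT)
}

# With kafka-python installed and a reachable broker:
# from kafka import KafkaProducer
# producer = KafkaProducer(**producer_config)
# producer.send("my-topic", b"data to send")  # inputs: topic, data
# producer.flush()
```

Note that `linger_ms` and `batch_size` trade latency for throughput: raising `linger_ms` above 0 lets the producer accumulate larger batches before sending.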