OmniSciDB  c07336695a
flavors.FlavorConsumer Class Reference

Static Public Member Functions

static void main (String[] args) throws Exception
 

Detailed Description

Definition at line 17 of file FlavorConsumer.java.

Member Function Documentation

◆ main()

static void flavors.FlavorConsumer.main ( String []  args) throws Exception
inlinestatic

Definition at line 18 of file FlavorConsumer.java.

References run-benchmark-import.args, and Experimental.String.

18  {
19  if (args.length < 2) {
20  System.out.println(
21  "Usage:\nFlavorConsumer <kafka-topic-name> <mapd-database-password>");
22  return;
23  }
24  // Configure the Kafka Consumer
25  String topicName = args[0].toString();
26  Properties props = new Properties();
27 
28  props.put("bootstrap.servers", "localhost:9097"); // Use 9097 so as not
29  // to collide with
30  // MapD Immerse
31  props.put("group.id", "test");
32  props.put("enable.auto.commit", "true");
33  props.put("auto.commit.interval.ms", "1000");
34  props.put("session.timeout.ms", "30000");
35  props.put("key.deserializer",
36  "org.apache.kafka.common.serialization.StringDeserializer");
37  props.put("value.deserializer",
38  "org.apache.kafka.common.serialization.StringDeserializer");
39  KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
40 
41  // Subscribe the Kafka Consumer to the topic.
42  consumer.subscribe(Arrays.asList(topicName));
43 
44  // print the topic name
45  System.out.println("Subscribed to topic " + topicName);
46 
47  String flavorValue = "";
48 
49  while (true) {
50  ConsumerRecords<String, String> records = consumer.poll(1000);
51 
52  // Create connection and prepared statement objects
53  Connection conn = null;
54  PreparedStatement pstmt = null;
55 
56  try {
57  // JDBC driver name and database URL
58  final String JDBC_DRIVER = "com.mapd.jdbc.MapDDriver";
59  final String DB_URL = "jdbc:mapd:localhost:6274:mapd";
60 
61  // Database credentials
62  final String USER = "mapd";
63  final String PASS = args[1].toString(); // name and pw in cleartext?
64 
65  // STEP 1: Register JDBC driver
66  Class.forName(JDBC_DRIVER);
67 
68  // STEP 2: Open a connection
69  conn = DriverManager.getConnection(DB_URL, USER, PASS);
70 
71  // STEP 3: Prepare a statement template
72  pstmt = conn.prepareStatement("INSERT INTO flavors VALUES (?)");
73 
74  // STEP 4: Populate the prepared statement batch
75  for (ConsumerRecord<String, String> record : records) {
76  flavorValue = record.value();
77  pstmt.setString(1, flavorValue);
78  pstmt.addBatch();
79  }
80 
81  // STEP 5: Execute the batch statement (send records to MapD
82  // Core Database)
83  pstmt.executeBatch();
84 
85  // Commit and close the connection.
86  conn.commit();
87  conn.close();
88 
89  } catch (SQLException se) {
90  // Handle errors for JDBC
91  se.printStackTrace();
92 
93  } catch (Exception e) {
94  // Handle errors for Class.forName
95  e.printStackTrace();
96  } finally {
97  try {
98  if (pstmt != null) {
99  pstmt.close();
100  }
101  } catch (SQLException se2) {
102  } // nothing we can do
103 
104  try {
105  if (conn != null) {
106  conn.close();
107  }
108  } catch (SQLException se) {
109  se.printStackTrace();
110  } // end finally try
111 
112  } // end try
113  } // end main
114  }

The documentation for this class was generated from the following file: