OmniSciDB  04ee39c94c
StreamInsertSimple.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include <boost/tokenizer.hpp>
#include <cstring>
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// include files for Thrift and MapD Thrift Services
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include "gen-cpp/MapD.h"
36 
37 using namespace ::apache::thrift;
38 using namespace ::apache::thrift::protocol;
39 using namespace ::apache::thrift::transport;
40 
41 #ifdef HAVE_THRIFT_STD_SHAREDPTR
42 #include <memory>
43 namespace mapd {
44 using std::make_shared;
45 using std::shared_ptr;
46 } // namespace mapd
47 #else
48 #include <boost/make_shared.hpp>
49 namespace mapd {
50 using boost::make_shared;
51 using boost::shared_ptr;
52 } // namespace mapd
53 #endif // HAVE_THRIFT_STD_SHAREDPTR
54 
55 namespace {
56 // anonymous namespace for private functions
57 const size_t INSERT_BATCH_SIZE = 10000;
58 
59 // reads tab-delimited rows from std::cin and load them to
60 // table_name in batches of size INSERT_BATCH_SIZE until done
61 void stream_insert(MapDClient& client,
62  const TSessionId session,
63  const std::string& table_name,
64  const TRowDescriptor& row_desc,
65  const char* delimiter) {
66  std::string line;
67  std::vector<TStringRow> input_rows;
68  TStringRow row;
69  boost::char_separator<char> sep{delimiter, "", boost::keep_empty_tokens};
70  while (std::getline(std::cin, line)) {
71  row.cols.clear();
72  boost::tokenizer<boost::char_separator<char>> tok{line, sep};
73  for (const auto& s : tok) {
74  TStringValue ts;
75  ts.str_val = s;
76  ts.is_null = s.empty();
77  row.cols.push_back(ts);
78  }
79  if (row.cols.size() != row_desc.size()) {
80  std::cerr << "Incorrect number of columns: (" << row.cols.size() << " vs "
81  << row_desc.size() << ") " << line << std::endl;
82  continue;
83  }
84  input_rows.push_back(row);
85  if (input_rows.size() >= INSERT_BATCH_SIZE) {
86  try {
87  client.load_table(session, table_name, input_rows);
88  } catch (TMapDException& e) {
89  std::cerr << e.error_msg << std::endl;
90  }
91  input_rows.clear();
92  }
93  }
94  // load remaining rowset if any
95  if (input_rows.size() > 0) {
96  client.load_table(session, table_name, input_rows);
97  }
98 }
99 } // namespace
100 
101 int main(int argc, char** argv) {
102  std::string server_host("localhost"); // default to localohost
103  int port = 6274; // default port number
104  const char* delimiter = "\t"; // only support tab delimiter for now
105 
106  if (argc < 5) {
107  std::cout << "Usage: <table> <database> <user> <password> [hostname[:port]]"
108  << std::endl;
109  return 1;
110  }
111  std::string table_name(argv[1]);
112  std::string db_name(argv[2]);
113  std::string user_name(argv[3]);
114  std::string passwd(argv[4]);
115 
116  if (argc >= 6) {
117  char* host = strtok(argv[5], ":");
118  char* portno = strtok(NULL, ":");
119  server_host = host;
120  if (portno != NULL) {
121  port = atoi(portno);
122  }
123  }
124 
125  mapd::shared_ptr<TTransport> socket(new TSocket(server_host, port));
126  mapd::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
127  mapd::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
128  MapDClient client(protocol);
129  TSessionId session;
130  try {
131  transport->open(); // open transport
132  client.connect(session, user_name, passwd, db_name); // connect to omnisci_server
133  TTableDetails table_details;
134  client.get_table_details(table_details, session, table_name);
135  stream_insert(client, session, table_name, table_details.row_desc, delimiter);
136  client.disconnect(session); // disconnect from omnisci_server
137  transport->close(); // close transport
138  } catch (TMapDException& e) {
139  std::cerr << e.error_msg << std::endl;
140  return 1;
141  } catch (TException& te) {
142  std::cerr << "Thrift error: " << te.what() << std::endl;
143  return 1;
144  }
145 
146  return 0;
147 }
int main(int argc, char **argv)
Definition: DataGen.cpp:60
mapd::shared_ptr< MapDClient > client
void stream_insert(RowToColumnLoader &row_loader, const std::map< std::string, std::pair< std::unique_ptr< boost::regex >, std::unique_ptr< std::string >>> &transformations, const Importer_NS::CopyParams &copy_params, const bool remove_quotes)
TSessionId session