Libosmium  2.17.2
Fast and flexible C++ library for working with OpenStreetMap data
reader.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_IO_READER_HPP
2 #define OSMIUM_IO_READER_HPP
3 
4 /*
5 
6 This file is part of Osmium (https://osmcode.org/libosmium).
7 
8 Copyright 2013-2021 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
37 #include <osmium/io/detail/input_format.hpp>
38 #include <osmium/io/detail/queue_util.hpp>
39 #include <osmium/io/detail/read_thread.hpp>
40 #include <osmium/io/detail/read_write.hpp>
41 #include <osmium/io/error.hpp>
42 #include <osmium/io/file.hpp>
43 #include <osmium/io/header.hpp>
44 #include <osmium/memory/buffer.hpp>
46 #include <osmium/thread/pool.hpp>
47 #include <osmium/thread/util.hpp>
48 #include <osmium/util/config.hpp>
49 
50 #include <cerrno>
51 #include <cstdlib>
52 #include <fcntl.h>
53 #include <future>
54 #include <memory>
55 #include <string>
56 #include <system_error>
57 #include <thread>
58 #include <utility>
59 
60 #ifndef _WIN32
61 # include <sys/wait.h>
62 #endif
63 
64 #ifndef _MSC_VER
65 # include <unistd.h>
66 #endif
67 
68 namespace osmium {
69 
70  namespace io {
71 
72  namespace detail {
73 
74  inline std::size_t get_input_queue_size() noexcept {
75  return osmium::config::get_max_queue_size("INPUT", 20);
76  }
77 
78  inline std::size_t get_osmdata_queue_size() noexcept {
79  return osmium::config::get_max_queue_size("OSMDATA", 20);
80  }
81 
82  } // namespace detail
83 
90  class Reader {
91 
92  // The Reader::read() function reads from a queue of buffers which
93  // can contain nested buffers. These nested buffers will be in
94  // here, because read() can only return a single unnested buffer.
95  osmium::memory::Buffer m_back_buffers{};
96 
98 
100 
// Counter shared by address with the decompressor (make_decompressor())
// and the parser thread (constructor); its value is what offset() returns.
101  std::atomic<std::size_t> m_offset{0};
102 
// Parser creator function looked up in the ParserFactory for m_file;
// parser_thread() calls it to build the parser.
103  detail::ParserFactory::create_parser_type m_creator;
104 
// Life-cycle state of the reader.
// NOTE(review): listing line 110 is missing here. Judging from the index
// entry "enum osmium::io::Reader::status m_status" it closes this enum and
// declares the m_status member - confirm against the original header.
105  enum class status {
106  okay = 0, // normal reading
107  error = 1, // some error occurred while reading
108  closed = 2, // close() called
109  eof = 3 // eof of file was reached without error
111 
// Pid of the child process started through execute() for URL input;
// 0 means there is no child to wait for in close().
112  int m_childpid = 0;
113 
// Queue named "raw_input"; passed by reference to the parser thread.
114  detail::future_string_queue_type m_input_queue;
115 
// -1 when the input is a memory buffer, otherwise the descriptor returned
// by open_input_file_or_url().
116  int m_fd = -1;
117 
// osmium::file_size(m_fd) if m_fd > 2, else 0 (see constructor).
118  std::size_t m_file_size = 0;
119 
120  std::unique_ptr<osmium::io::Decompressor> m_decompressor;
121 
// Stopped and closed in close(); also closed by read() at end of data.
122  osmium::io::detail::ReadThreadManager m_read_thread_manager;
123 
// Queue named "parser_results"; passed by reference to the parser thread.
124  detail::future_buffer_queue_type m_osmdata_queue;
// read() pops its buffers from this wrapper, close() drains it.
125  detail::queue_wrapper<osmium::memory::Buffer> m_osmdata_queue_wrapper;
126 
// Future belonging to the promise handed to the parser thread;
// header() collects the Header from it.
127  std::future<osmium::io::Header> m_header_future{};
129 
131 
135 
// Overload set used by the constructor: every extra constructor argument
// is passed to the set_option() overload matching its type.

// Remember the thread pool to use (stored as a non-owning pointer).
136  void set_option(osmium::thread::Pool& pool) noexcept {
137  m_pool = &pool;
138  }
139 
// Select which entity types are read.
// NOTE(review): the signature (listing line 140) is missing here; the index
// gives it as
// "void set_option(osmium::osm_entity_bits::type value) noexcept".
141  m_read_which_entities = value;
142  }
143 
// Select whether metadata is read.
// NOTE(review): listing line 148 (the condition guarding the assignment)
// is missing; going by the comment below it presumably tests whether the
// file has multiple object versions - confirm against the original header.
144  void set_option(osmium::io::read_meta value) noexcept {
145  // Ignore this setting if we have a history/change file,
146  // because if this is set to "no", we don't see the difference
147  // between visible and deleted objects.
149  m_read_metadata = value;
150  }
151  }
152 
// Select the kind of buffers; the value is forwarded to the parser thread.
153  void set_option(osmium::io::buffers_type value) noexcept {
154  m_buffers_kind = value;
155  }
156 
// Entry point of the parser thread started by the constructor: bundles all
// arguments into a parser_arguments struct, lets the creator function build
// the parser from it and runs parse() on the result.
// NOTE(review): the first signature line (listing line 158) is missing; the
// index gives it as "static void parser_thread(osmium::thread::Pool &pool,".
157  // This function will run in a separate thread.
159  int fd,
160  const detail::ParserFactory::create_parser_type& creator,
161  detail::future_string_queue_type& input_queue,
162  detail::future_buffer_queue_type& osmdata_queue,
163  std::promise<osmium::io::Header>&& header_promise,
164  std::atomic<std::size_t>* offset_ptr,
165  osmium::osm_entity_bits::type read_which_entities,
166  osmium::io::read_meta read_metadata,
167  osmium::io::buffers_type buffers_kind,
168  bool want_buffered_pages_removed) {
// Move the promise into a local object; args below refers to this local.
169  std::promise<osmium::io::Header> promise{std::move(header_promise)};
170  osmium::io::detail::parser_arguments args = {
171  pool,
172  fd,
173  input_queue,
174  osmdata_queue,
175  promise,
176  offset_ptr,
177  read_which_entities,
178  read_metadata,
179  buffers_kind,
180  want_buffered_pages_removed};
181  creator(args)->parse();
182  }
183 
184 #ifndef _WIN32
/**
 * Fork a child process that runs the given command with its stdout
 * connected to a pipe.
 *
 * The child is started as `command -g filename`; its stdin and stderr
 * are attached to /dev/null.
 *
 * @param command Command to run (looked up through PATH by execlp()).
 * @param filename Passed to the command as argument after "-g".
 * @param childpid The pid of the child process is written to this
 *                 address.
 * @returns File descriptor of the read end of the pipe.
 * @throws std::system_error if pipe() or fork() fails.
 */
static int execute(const std::string& command, const std::string& filename, int* childpid) {
    int pipefd[2];
    if (pipe(pipefd) < 0) {
        throw std::system_error{errno, std::system_category(), "opening pipe failed"};
    }
    const pid_t pid = fork();
    if (pid < 0) {
        // Do not leak the pipe. Save errno first, close() may change it.
        const int fork_errno = errno;
        ::close(pipefd[0]);
        ::close(pipefd[1]);
        throw std::system_error{fork_errno, std::system_category(), "fork failed"};
    }
    if (pid == 0) { // child
        // close all file descriptors except the write end of the pipe
        for (int i = 0; i < 32; ++i) {
            if (i != pipefd[1]) {
                ::close(i);
            }
        }
        // In the forked child always leave through _exit(): std::exit()
        // would run the atexit handlers and static destructors inherited
        // from the parent.
        if (dup2(pipefd[1], 1) < 0) { // write end of pipe becomes stdout
            ::_exit(1);
        }
        // The original descriptor is not needed any more. Closing it also
        // makes sure descriptors 0 and 2 are free for the open() calls
        // below, even if the pipe happened to occupy one of them.
        if (pipefd[1] != 1) {
            ::close(pipefd[1]);
        }

        ::open("/dev/null", O_RDONLY); // stdin
        ::open("/dev/null", O_WRONLY); // stderr
        // hack: -g switches off globbing in curl which allows [] to be used in file names
        // this is important for XAPI URLs
        // in theory this execute() function could be used for other commands, but it is
        // only used for curl at the moment, so this is okay.
        ::execlp(command.c_str(), command.c_str(), "-g", filename.c_str(), nullptr);
        // execlp() only returns if it failed
        ::_exit(1);
    }
    // parent
    *childpid = pid;
    ::close(pipefd[1]);
    return pipefd[0];
}
231 #endif
232 
241  static int open_input_file_or_url(const std::string& filename, int* childpid) {
242  const std::string protocol{filename.substr(0, filename.find_first_of(':'))};
243  if (protocol == "http" || protocol == "https" || protocol == "ftp" || protocol == "file") {
244 #ifndef _WIN32
245  return execute("curl", filename, childpid);
246 #else
247  throw io_error{"Reading OSM files from the network currently not supported on Windows."};
248 #endif
249  }
250  const int fd = osmium::io::detail::open_for_reading(filename);
251 #if __linux__
252  if (fd >= 0) {
253  // Tell the kernel we are going to read this file sequentially
254  ::posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
255  }
256 #endif
257  return fd;
258  }
259 
260  static std::unique_ptr<Decompressor> make_decompressor(const osmium::io::File& file, int fd, std::atomic<std::size_t>* offset_ptr) {
261  const auto& factory = osmium::io::CompressionFactory::instance();
262  std::unique_ptr<Decompressor> decompressor;
263 
264  if (file.buffer()) {
265  decompressor = factory.create_decompressor(file.compression(), file.buffer(), file.buffer_size());
266  } else if (file.format() == file_format::pbf) {
267  decompressor = std::unique_ptr<Decompressor>{new DummyDecompressor{}};
268  } else {
269  decompressor = factory.create_decompressor(file.compression(), fd);
270  }
271 
272  decompressor->set_offset_ptr(offset_ptr);
273  return decompressor;
274  }
275 
276  public:
277 
// Main constructor: checks the file description, opens the input, applies
// the optional settings and starts the parser thread.
// Every argument after `file` is handed to the set_option() overload that
// matches its type.
// NOTE(review): listing lines 325, 326, 328, 333, 339 and 348 are missing
// in this block, so some member initializers and statements are not shown.
318  template <typename... TArgs>
319  explicit Reader(const osmium::io::File& file, TArgs&&... args) :
320  m_file(file.check()),
321  m_creator(detail::ParserFactory::instance().get_creator_function(m_file)),
322  m_input_queue(detail::get_input_queue_size(), "raw_input"),
// No file descriptor is needed if the input is a memory buffer.
323  m_fd(m_file.buffer() ? -1 : open_input_file_or_url(m_file.filename(), &m_childpid)),
// The size is only looked up for descriptors above 2; otherwise it is 0.
324  m_file_size(m_fd > 2 ? osmium::file_size(m_fd) : 0),
327  m_osmdata_queue(detail::get_osmdata_queue_size(), "parser_results"),
329 
// Call set_option() once for each element of the parameter pack, in order.
330  (void)std::initializer_list<int>{(set_option(args), 0)...};
331 
// NOTE(review): the body of this if (line 333) is missing; presumably it
// falls back to a default thread pool - confirm.
332  if (!m_pool) {
334  }
335 
336  std::promise<osmium::io::Header> header_promise;
337  m_header_future = header_promise.get_future();
338 
// NOTE(review): the definition of cpc (line 339) is missing from the listing.
340  if (cpc >= 0) {
341  m_decompressor->set_want_buffered_pages_removed(true);
342  }
343 
// The parser only gets the file descriptor if the decompressor is not a
// real one.
344  const int fd_for_parser = m_decompressor->is_real() ? -1 : m_fd;
345  m_thread = osmium::thread::thread_handler{parser_thread, std::ref(*m_pool), fd_for_parser, std::ref(m_creator),
346  std::ref(m_input_queue), std::ref(m_osmdata_queue),
347  std::move(header_promise), &m_offset, m_read_which_entities,
349  m_decompressor->want_buffered_pages_removed()};
350  }
351 
// Convenience constructor: wraps the file name in an osmium::io::File and
// delegates to the File-based constructor, forwarding all other arguments.
352  template <typename... TArgs>
353  explicit Reader(const std::string& filename, TArgs&&... args) :
354  Reader(osmium::io::File(filename), std::forward<TArgs>(args)...) {
355  }
356 
// Convenience constructor for C strings: wraps the file name in an
// osmium::io::File and delegates to the File-based constructor, forwarding
// all other arguments.
357  template <typename... TArgs>
358  explicit Reader(const char* filename, TArgs&&... args) :
359  Reader(osmium::io::File(filename), std::forward<TArgs>(args)...) {
360  }
361 
// A Reader can neither be copied nor moved.
362  Reader(const Reader&) = delete;
363  Reader& operator=(const Reader&) = delete;
364 
365  Reader(Reader&&) = delete;
366  Reader& operator=(Reader&&) = delete;
367 
// Closes the reader. Any exception thrown by close() is swallowed here, so
// code that needs to see such errors has to call close() itself beforehand.
368  ~Reader() noexcept {
369  try {
370  close();
371  } catch (...) {
372  // Ignore any exceptions because destructor must not throw.
373  }
374  }
375 
// Shut the reader down: stop the read thread, drain the queue of parsed
// data, close the read thread and, if a child process was started for URL
// input, wait for it.
// Throws std::system_error if waiting for the child fails or the child did
// not exit normally with status 0.
// NOTE(review): listing line 385 is missing; presumably it sets m_status to
// status::closed - confirm against the original header.
384  void close() {
386 
387  m_read_thread_manager.stop();
388 
389  m_osmdata_queue_wrapper.drain();
390 
391  try {
392  m_read_thread_manager.close();
393  } catch (...) {
394  // Ignore any exceptions.
395  }
396 
397 #ifndef _WIN32
398  if (m_childpid) {
399  int status = 0;
400  const pid_t pid = ::waitpid(m_childpid, &status, 0);
401 #pragma GCC diagnostic push
402 #pragma GCC diagnostic ignored "-Wold-style-cast"
// NOTE(review): errno is only set by a failing waitpid(); if the child
// merely returned a non-zero exit status the error code reported below is
// whatever an earlier call left in errno. m_childpid also stays set when
// this throws.
403  if (pid < 0 || !WIFEXITED(status) || WEXITSTATUS(status) != 0) { // NOLINT(hicpp-signed-bitwise)
404  throw std::system_error{errno, std::system_category(), "subprocess returned error"};
405  }
406 #pragma GCC diagnostic pop
// Child has been reaped; a later close() will not wait again.
407  m_childpid = 0;
408  }
409 #endif
410  }
411 
// Return the header of the file being read.
// Throws io_error if the reader is in status 'error'. An exception stored
// in the header future is rethrown after the reader has been closed.
// NOTE(review): the signature (listing line 418) is missing; the index
// gives it as "osmium::io::Header header()".
419  if (m_status == status::error) {
420  throw io_error{"Can not get header from reader when in status 'error'"};
421  }
422 
423  try {
// The future can only be read once; later calls return the cached m_header.
424  if (m_header_future.valid()) {
425  m_header = m_header_future.get();
426  }
427  } catch (...) {
428  close();
// NOTE(review): listing line 429 is missing; presumably it sets m_status to
// status::error - confirm.
430  throw;
431  }
432 
433  return m_header;
434  }
435 
// Return the next buffer of OSM data.
// Buffers left over from an earlier nested buffer are handed out first.
// An invalid buffer is returned at the end of the data.
// Throws io_error if the reader is not in status 'okay'; any exception
// raised while fetching data is rethrown after the reader has been closed.
// NOTE(review): listing lines 462, 463, 475 and 489 are missing in this
// block.
444  osmium::memory::Buffer read() {
445  osmium::memory::Buffer buffer;
446 
447  // If there are buffers on the stack, return those first.
448  if (m_back_buffers) {
449  if (m_back_buffers.has_nested_buffers()) {
450  buffer = std::move(*m_back_buffers.get_last_nested());
451  } else {
// Last one: hand out the outer buffer itself and reset the member to an
// invalid buffer.
452  buffer = std::move(m_back_buffers);
453  m_back_buffers = osmium::memory::Buffer{};
454  }
455  return buffer;
456  }
457 
458  if (m_status != status::okay) {
459  throw io_error{"Can not read from reader when in status 'closed', 'eof', or 'error'"};
460  }
461 
// NOTE(review): the condition opening this block (lines 462-463) is missing;
// whatever it tests, the still default-constructed buffer is returned.
464  return buffer;
465  }
466 
467  try {
468  // m_osmdata_queue_wrapper.pop() can return an invalid buffer to signal EOF,
469  // or a valid buffer with or without data. A valid buffer
470  // without data is not an error, it just means we have to
471  // keep getting the next buffer until there is one with data.
472  while (true) {
473  buffer = m_osmdata_queue_wrapper.pop();
474  if (detail::at_end_of_data(buffer)) {
// NOTE(review): listing line 475 is missing; presumably it sets m_status to
// status::eof - confirm.
476  m_read_thread_manager.close();
477  return buffer;
478  }
// Keep the outer buffer for later calls and start with its last nested one.
479  if (buffer.has_nested_buffers()) {
480  m_back_buffers = std::move(buffer);
481  buffer = std::move(*m_back_buffers.get_last_nested());
482  }
483  if (buffer.committed() > 0) {
484  return buffer;
485  }
486  }
487  } catch (...) {
488  close();
// NOTE(review): listing line 489 is missing; presumably it sets m_status to
// status::error - confirm.
490  throw;
491  }
492  }
493 
// Report whether the end of the input has been reached.
// NOTE(review): the body (listing line 499) is missing from this listing;
// presumably the result is derived from m_status - confirm against the
// original header.
498  bool eof() const {
500  }
501 
// Size of the input file as determined in the constructor. The value is 0
// if no size was looked up there (file descriptor not greater than 2, which
// includes input from a memory buffer).
506  std::size_t file_size() const noexcept {
507  return m_file_size;
508  }
509 
524  std::size_t offset() const noexcept {
525  return m_offset;
526  }
527 
528  }; // class Reader
529 
538  template <typename... TArgs>
539  osmium::memory::Buffer read_file(TArgs&&... args) {
540  osmium::memory::Buffer buffer{1024UL * 1024UL, osmium::memory::Buffer::auto_grow::yes};
541 
542  Reader reader{std::forward<TArgs>(args)...};
543  while (auto read_buffer = reader.read()) {
544  buffer.add_buffer(read_buffer);
545  buffer.commit();
546  }
547 
548  return buffer;
549  }
550 
551  } // namespace io
552 
553 } // namespace osmium
554 
555 #endif // OSMIUM_IO_READER_HPP
static CompressionFactory & instance()
Definition: compression.hpp:191
void set_offset_ptr(std::atomic< std::size_t > *offset_ptr) noexcept
Definition: compression.hpp:121
Definition: compression.hpp:287
Definition: file.hpp:72
size_t buffer_size() const noexcept
Definition: file.hpp:147
bool has_multiple_object_versions() const noexcept
Definition: file.hpp:303
const char * buffer() const noexcept
Definition: file.hpp:143
file_compression compression() const noexcept
Definition: file.hpp:294
file_format format() const noexcept
Definition: file.hpp:285
Definition: header.hpp:68
Definition: reader.hpp:90
osmium::memory::Buffer read()
Definition: reader.hpp:444
osmium::io::buffers_type m_buffers_kind
Definition: reader.hpp:134
static std::unique_ptr< Decompressor > make_decompressor(const osmium::io::File &file, int fd, std::atomic< std::size_t > *offset_ptr)
Definition: reader.hpp:260
detail::future_string_queue_type m_input_queue
Definition: reader.hpp:114
osmium::memory::Buffer m_back_buffers
Definition: reader.hpp:95
static void parser_thread(osmium::thread::Pool &pool, int fd, const detail::ParserFactory::create_parser_type &creator, detail::future_string_queue_type &input_queue, detail::future_buffer_queue_type &osmdata_queue, std::promise< osmium::io::Header > &&header_promise, std::atomic< std::size_t > *offset_ptr, osmium::osm_entity_bits::type read_which_entities, osmium::io::read_meta read_metadata, osmium::io::buffers_type buffers_kind, bool want_buffered_pages_removed)
Definition: reader.hpp:158
int m_childpid
Definition: reader.hpp:112
void set_option(osmium::io::read_meta value) noexcept
Definition: reader.hpp:144
detail::future_buffer_queue_type m_osmdata_queue
Definition: reader.hpp:124
std::size_t m_file_size
Definition: reader.hpp:118
Reader & operator=(Reader &&)=delete
void set_option(osmium::thread::Pool &pool) noexcept
Definition: reader.hpp:136
void set_option(osmium::io::buffers_type value) noexcept
Definition: reader.hpp:153
static int execute(const std::string &command, const std::string &filename, int *childpid)
Definition: reader.hpp:196
enum osmium::io::Reader::status m_status
static int open_input_file_or_url(const std::string &filename, int *childpid)
Definition: reader.hpp:241
std::unique_ptr< osmium::io::Decompressor > m_decompressor
Definition: reader.hpp:120
status
Definition: reader.hpp:105
Reader(const char *filename, TArgs &&... args)
Definition: reader.hpp:358
osmium::io::Header m_header
Definition: reader.hpp:128
detail::ParserFactory::create_parser_type m_creator
Definition: reader.hpp:103
Reader & operator=(const Reader &)=delete
osmium::io::Header header()
Definition: reader.hpp:418
detail::queue_wrapper< osmium::memory::Buffer > m_osmdata_queue_wrapper
Definition: reader.hpp:125
Reader(const osmium::io::File &file, TArgs &&... args)
Definition: reader.hpp:319
std::size_t file_size() const noexcept
Definition: reader.hpp:506
Reader(Reader &&)=delete
void set_option(osmium::osm_entity_bits::type value) noexcept
Definition: reader.hpp:140
std::future< osmium::io::Header > m_header_future
Definition: reader.hpp:127
osmium::io::detail::ReadThreadManager m_read_thread_manager
Definition: reader.hpp:122
bool eof() const
Definition: reader.hpp:498
Reader(const Reader &)=delete
std::size_t offset() const noexcept
Definition: reader.hpp:524
std::atomic< std::size_t > m_offset
Definition: reader.hpp:101
osmium::thread::thread_handler m_thread
Definition: reader.hpp:130
void close()
Definition: reader.hpp:384
~Reader() noexcept
Definition: reader.hpp:368
osmium::io::File m_file
Definition: reader.hpp:97
osmium::osm_entity_bits::type m_read_which_entities
Definition: reader.hpp:132
Reader(const std::string &filename, TArgs &&... args)
Definition: reader.hpp:353
osmium::io::read_meta m_read_metadata
Definition: reader.hpp:133
int m_fd
Definition: reader.hpp:116
osmium::thread::Pool * m_pool
Definition: reader.hpp:99
Definition: pool.hpp:90
static Pool & default_instance()
Definition: pool.hpp:186
Definition: util.hpp:85
Definition: attr.hpp:342
std::size_t get_max_queue_size(const char *queue_name, const std::size_t default_value) noexcept
Definition: config.hpp:83
int8_t clean_page_cache_after_read() noexcept
Definition: config.hpp:106
osmium::memory::Buffer read_file(TArgs &&... args)
Definition: reader.hpp:539
buffers_type
Definition: file_format.hpp:60
read_meta
Definition: file_format.hpp:55
type
Definition: entity_bits.hpp:63
@ all
object or changeset
Definition: entity_bits.hpp:76
@ nothing
Definition: entity_bits.hpp:67
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
Definition: location.hpp:555
Definition: error.hpp:46