Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2 pyexcel_io.plugins 

3 ~~~~~~~~~~~~~~~~~~~ 

4 

5 factory for getting readers and writers 

6 

7 :copyright: (c) 2014-2020 by Onni Software Ltd. 

8 :license: New BSD License, see LICENSE for more details 

9""" 

10import pyexcel_io.utils as ioutils 

11import pyexcel_io.manager as manager 

12import pyexcel_io.constants as constants 

13import pyexcel_io.exceptions as exceptions 

14from lml.loader import scan_plugins_regex 

15from lml.plugin import PluginInfo, PluginManager, PluginInfoChain 

16 

17ERROR_MESSAGE_FORMATTER = "one of these plugins for %s data in '%s': %s" 

18UPGRADE_MESSAGE = "Please upgrade the plugin '%s' according to \ 

19plugin compactibility table." 

20READER_PLUGIN = "pyexcel-io reader" 

21READER_PLUGIN_V2 = "pyexcel-io v2 reader" 

22WRITER_PLUGIN = "pyexcel-io writer" 

23WRITER_PLUGIN_V2 = "pyexcel-io v2 writer" 

24 

25 

26class IOPluginInfo(PluginInfo): 

27 """Pyexcel-io plugin info description""" 

28 

29 def tags(self): 

30 for file_type in self.file_types: 

31 yield file_type 

32 

33 

34class IOPluginInfoChain(PluginInfoChain): 

35 """provide custom functions to add a reader and a writer """ 

36 

37 def add_a_reader( 

38 self, 

39 relative_plugin_class_path=None, 

40 file_types=None, 

41 stream_type=None, 

42 ): 

43 """ add pyexcle-io reader plugin info """ 

44 a_plugin_info = IOPluginInfo( 

45 READER_PLUGIN, 

46 self._get_abs_path(relative_plugin_class_path), 

47 file_types=file_types, 

48 stream_type=stream_type, 

49 ) 

50 return self.add_a_plugin_instance(a_plugin_info) 

51 

52 def add_a_writer( 

53 self, 

54 relative_plugin_class_path=None, 

55 file_types=None, 

56 stream_type=None, 

57 ): 

58 """ add pyexcle-io writer plugin info """ 

59 a_plugin_info = IOPluginInfo( 

60 WRITER_PLUGIN, 

61 self._get_abs_path(relative_plugin_class_path), 

62 file_types=file_types, 

63 stream_type=stream_type, 

64 ) 

65 return self.add_a_plugin_instance(a_plugin_info) 

66 

67 

68class IOPluginInfoChainV2(PluginInfoChain): 

69 """provide custom functions to add a reader and a writer """ 

70 

71 def add_a_reader( 

72 self, 

73 relative_plugin_class_path=None, 

74 locations=(), 

75 file_types=None, 

76 stream_type=None, 

77 ): 

78 """ add pyexcle-io reader plugin info """ 

79 a_plugin_info = IOPluginInfo( 

80 READER_PLUGIN_V2, 

81 self._get_abs_path(relative_plugin_class_path), 

82 file_types=[ 

83 f"{location}-{file_type}" 

84 for file_type in file_types 

85 for location in locations 

86 ], 

87 stream_type=stream_type, 

88 ) 

89 return self.add_a_plugin_instance(a_plugin_info) 

90 

91 def add_a_writer( 

92 self, 

93 relative_plugin_class_path=None, 

94 locations=(), 

95 file_types=(), 

96 stream_type=None, 

97 ): 

98 """ add pyexcle-io writer plugin info """ 

99 a_plugin_info = IOPluginInfo( 

100 WRITER_PLUGIN_V2, 

101 self._get_abs_path(relative_plugin_class_path), 

102 file_types=[ 

103 f"{location}-{file_type}" 

104 for file_type in file_types 

105 for location in locations 

106 ], 

107 stream_type=stream_type, 

108 ) 

109 return self.add_a_plugin_instance(a_plugin_info) 

110 

111 

112class IOManager(PluginManager): 

113 """Manage pyexcel-io plugins""" 

114 

115 def __init__(self, plugin_type, known_list): 

116 PluginManager.__init__(self, plugin_type) 

117 self.known_plugins = known_list 

118 self.action = "read" 

119 if self.plugin_name == WRITER_PLUGIN: 

120 self.action = "write" 

121 

122 def load_me_later(self, plugin_info): 

123 PluginManager.load_me_later(self, plugin_info) 

124 _do_additional_registration(plugin_info) 

125 

126 def register_a_plugin(self, cls, plugin_info): 

127 """ for dynamically loaded plugin """ 

128 PluginManager.register_a_plugin(self, cls, plugin_info) 

129 _do_additional_registration(plugin_info) 

130 

131 def get_a_plugin(self, file_type=None, library=None, **keywords): 

132 __file_type = file_type.lower() 

133 try: 

134 plugin = self.load_me_now(__file_type, library=library) 

135 except Exception: 

136 self.raise_exception(__file_type) 

137 handler = plugin() 

138 handler.set_type(__file_type) 

139 return handler 

140 

141 def raise_exception(self, file_type): 

142 plugins = self.known_plugins.get(file_type, None) 

143 if plugins: 

144 message = "Please install " 

145 if len(plugins) > 1: 

146 message += ERROR_MESSAGE_FORMATTER % ( 

147 self.action, 

148 file_type, 

149 ",".join(plugins), 

150 ) 

151 else: 

152 message += plugins[0] 

153 raise exceptions.SupportingPluginAvailableButNotInstalled(message) 

154 

155 else: 

156 raise exceptions.NoSupportingPluginFound( 

157 "No suitable library found for %s" % file_type 

158 ) 

159 

160 def get_all_formats(self): 

161 """ return all supported formats """ 

162 all_formats = set( 

163 list(self.registry.keys()) + list(self.known_plugins.keys()) 

164 ) 

165 all_formats = all_formats.difference( 

166 set([constants.DB_SQL, constants.DB_DJANGO]) 

167 ) 

168 return all_formats 

169 

170 

171class NewIOManager(IOManager): 

172 def load_me_later(self, plugin_info): 

173 PluginManager.load_me_later(self, plugin_info) 

174 _do_additional_registration_for_new_plugins(plugin_info) 

175 

176 def register_a_plugin(self, cls, plugin_info): 

177 """ for dynamically loaded plugin """ 

178 PluginManager.register_a_plugin(self, cls, plugin_info) 

179 _do_additional_registration_for_new_plugins(plugin_info) 

180 

181 def get_a_plugin( 

182 self, file_type=None, location=None, library=None, **keywords 

183 ): 

184 __file_type = file_type.lower() 

185 plugin = self.load_me_now(f"{location}-{__file_type}", library=library) 

186 return plugin 

187 

188 def raise_exception(self, file_type): 

189 file_type = file_type.split("-")[1] 

190 plugins = self.known_plugins.get(file_type, None) 

191 if plugins: 

192 message = "Please install " 

193 if len(plugins) > 1: 

194 message += ERROR_MESSAGE_FORMATTER % ( 

195 self.action, 

196 file_type, 

197 ",".join(plugins), 

198 ) 

199 else: 

200 message += plugins[0] 

201 raise exceptions.SupportingPluginAvailableButNotInstalled(message) 

202 

203 else: 

204 raise exceptions.NoSupportingPluginFound( 

205 "No suitable library found for %s" % file_type 

206 ) 

207 

208 def get_all_formats(self): 

209 """ return all supported formats """ 

210 all_formats = set( 

211 [x.split("-")[1] for x in self.registry.keys()] 

212 + list(self.known_plugins.keys()) 

213 ) 

214 return all_formats 

215 

216 

217def _do_additional_registration(plugin_info): 

218 for file_type in plugin_info.tags(): 

219 manager.register_stream_type(file_type, plugin_info.stream_type) 

220 manager.register_a_file_type(file_type, plugin_info.stream_type, None) 

221 

222 

223def _do_additional_registration_for_new_plugins(plugin_info): 

224 for file_type in plugin_info.tags(): 

225 manager.register_stream_type( 

226 file_type.split("-")[1], plugin_info.stream_type 

227 ) 

228 manager.register_a_file_type( 

229 file_type.split("-")[1], plugin_info.stream_type, None 

230 ) 

231 

232 

233class AllReaders: 

234 def get_all_formats(self): 

235 return OLD_READERS.get_all_formats().union( 

236 NEW_READERS.get_all_formats() 

237 ) - set([constants.DB_SQL, constants.DB_DJANGO]) 

238 

239 

240class AllWriters: 

241 def get_all_formats(self): 

242 return OLD_WRITERS.get_all_formats().union( 

243 NEW_WRITERS.get_all_formats() 

244 ) - set([constants.DB_SQL, constants.DB_DJANGO]) 

245 

246 

247OLD_READERS = IOManager(READER_PLUGIN, ioutils.AVAILABLE_READERS) 

248OLD_WRITERS = IOManager(WRITER_PLUGIN, ioutils.AVAILABLE_WRITERS) 

249NEW_WRITERS = NewIOManager(WRITER_PLUGIN_V2, ioutils.AVAILABLE_WRITERS) 

250NEW_READERS = NewIOManager(READER_PLUGIN_V2, ioutils.AVAILABLE_READERS) 

251READERS = AllReaders() 

252WRITERS = AllWriters() 

253 

254 

255def load_plugins(plugin_name_patterns, path, black_list, white_list): 

256 """Try to discover all pyexcel-io plugins""" 

257 scan_plugins_regex( 

258 plugin_name_patterns=plugin_name_patterns, 

259 pyinstaller_path=path, 

260 black_list=black_list, 

261 white_list=white_list, 

262 )