View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *  http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.any23.filter;
19  
20  import org.apache.any23.extractor.ExtractionContext;
21  import org.apache.any23.writer.TripleHandler;
22  import org.apache.any23.writer.TripleHandlerException;
23  import org.eclipse.rdf4j.model.Resource;
24  import org.eclipse.rdf4j.model.IRI;
25  import org.eclipse.rdf4j.model.Value;
26  
27  import java.util.ArrayList;
28  import java.util.HashMap;
29  import java.util.List;
30  import java.util.Map;
31  
32  /**
33   * A wrapper around a {@link TripleHandler} that can block and unblock
34   * calls to the handler, either for the entire document, or for
35   * individual {@link ExtractionContext}s. A document is initially
36   * blocked and must be explicitly unblocked. Contexts are initially
37   * unblocked and must be explicitly blocked. Unblocking a document
38   * unblocks all contexts as well.
39   * This class it thread-safe.
40   *
41   * @author Richard Cyganiak (richard@cyganiak.de)
42   */
43  public class ExtractionContextBlocker implements TripleHandler {
44  
45      private TripleHandler wrapped;
46  
47      private Map<String, ValvedTriplePipe> contextQueues = new HashMap<String, ValvedTriplePipe>();
48      
49      private boolean documentBlocked;
50  
51      public ExtractionContextBlocker(TripleHandler wrapped) {
52          this.wrapped = wrapped;
53      }
54  
55      public boolean isDocBlocked() {
56          return documentBlocked;
57      }
58  
59      public synchronized void blockContext(ExtractionContext context) {
60          if (!documentBlocked) return;
61          try {
62              contextQueues.get(context.getUniqueID()).block();
63          } catch (ValvedTriplePipeException e) {
64              throw new RuntimeException("Error while blocking context", e);
65          }
66      }
67  
68      public synchronized void unblockContext(ExtractionContext context) {
69          try {
70              contextQueues.get(context.getUniqueID()).unblock();
71          } catch (ValvedTriplePipeException e) {
72              throw new RuntimeException("Error while unblocking context", e);
73          }
74      }
75  
76      public synchronized void startDocument(IRI documentIRI) throws TripleHandlerException {
77          wrapped.startDocument(documentIRI);
78          documentBlocked = true;
79      }
80  
81      public synchronized void openContext(ExtractionContext context) throws TripleHandlerException {
82          contextQueues.put(context.getUniqueID(), new ValvedTriplePipe(context));
83      }
84  
85      public synchronized void closeContext(ExtractionContext context) {
86          // Empty. We'll close all contexts when the document is finished.
87      }
88  
89      public synchronized void unblockDocument() {
90          if (!documentBlocked) return;
91          documentBlocked = false;
92          for (ValvedTriplePipe pipe : contextQueues.values()) {
93              try {
94                  pipe.unblock();
95              } catch (ValvedTriplePipeException e) {
96                  throw new RuntimeException("Error while unblocking context", e);
97              }
98          }
99      }
100 
101     public synchronized void receiveTriple(Resource s, IRI p, Value o, IRI g, ExtractionContext context)
102     throws TripleHandlerException {
103         try {
104             contextQueues.get(context.getUniqueID()).receiveTriple(s, p, o, g);
105         } catch (ValvedTriplePipeException e) {
106             throw new TripleHandlerException(
107                     String.format("Error while receiving triple %s %s %s", s, p, o),
108                     e
109             );
110         }
111     }
112 
113     public synchronized void receiveNamespace(String prefix, String uri, ExtractionContext context)
114     throws TripleHandlerException {
115         try {
116             contextQueues.get(context.getUniqueID()).receiveNamespace(prefix, uri);
117         } catch (ValvedTriplePipeException e) {
118             throw new TripleHandlerException(
119                     String.format("Error while receiving namespace %s:%s", prefix, uri),
120                     e
121             );
122         }
123     }
124 
125     public synchronized void close() throws TripleHandlerException {
126         closeDocument();
127         wrapped.close();
128     }
129 
130     public synchronized void endDocument(IRI documentIRI) throws TripleHandlerException {
131         closeDocument();
132         wrapped.endDocument(documentIRI);
133     }
134 
135     public void setContentLength(long contentLength) {
136         // Empty.
137     }
138 
139     private void closeDocument() {
140         for (ValvedTriplePipe pipe : contextQueues.values()) {
141             try {
142                 pipe.close();
143             } catch (ValvedTriplePipeException e) {
144                 throw new RuntimeException("Error closing document", e);
145             }
146         }
147         contextQueues.clear();
148     }
149 
150     private class ValvedTriplePipeException extends Exception {
151 
152         private ValvedTriplePipeException(String s) {
153             super(s);
154         }
155 
156         private ValvedTriplePipeException(Throwable throwable) {
157             super(throwable);
158         }
159 
160         private ValvedTriplePipeException(String s, Throwable throwable) {
161             super(s, throwable);
162         }
163 
164     }
165 
166     private class ValvedTriplePipe {
167 
168         private final ExtractionContext context;
169 
170         private final List<Resource> subjects = new ArrayList<Resource>();
171 
172         private final List<IRI> predicates = new ArrayList<IRI>();
173 
174         private final List<Value> objects = new ArrayList<Value>();
175 
176         private final List<IRI> graphs = new ArrayList<IRI>();
177 
178         private final List<String> prefixes = new ArrayList<String>();
179 
180         private final List<String> uris = new ArrayList<String>();
181 
182         private boolean blocked = false;
183 
184         private boolean hasReceivedTriples = false;
185 
186         ValvedTriplePipe(ExtractionContext context) {
187             this.context = context;
188         }
189 
190         void receiveTriple(Resource s, IRI p, Value o, IRI g) throws ValvedTriplePipeException {
191             if (blocked) {
192                 subjects.add(s);
193                 predicates.add(p);
194                 objects.add(o);
195                 graphs.add(g);
196             } else {
197                 sendTriple(s, p, o, g);
198             }
199         }
200 
201         void receiveNamespace(String prefix, String uri) throws ValvedTriplePipeException {
202             if (blocked) {
203                 prefixes.add(prefix);
204                 uris.add(uri);
205             } else {
206                 sendNamespace(prefix, uri);
207             }
208         }
209 
210         void block() throws ValvedTriplePipeException {
211             if (blocked) return;
212             blocked = true;
213         }
214 
215         void unblock() throws ValvedTriplePipeException {
216             if (!blocked) return;
217             blocked = false;
218             for (int i = 0; i < prefixes.size(); i++) {
219                 sendNamespace(prefixes.get(i), uris.get(i));
220             }
221             for (int i = 0; i < subjects.size(); i++) {
222                 sendTriple(subjects.get(i), predicates.get(i), objects.get(i), graphs.get(i));
223             }
224         }
225 
226         void close() throws ValvedTriplePipeException {
227             if (hasReceivedTriples) {
228                 try {
229                     wrapped.closeContext(context);
230                 } catch (TripleHandlerException e) {
231                     throw new ValvedTriplePipeException("Error while closing the triple hanlder", e);
232                 }
233             }
234         }
235 
236         private void sendTriple(Resource s, IRI p, Value o, IRI g) throws ValvedTriplePipeException {
237             if (!hasReceivedTriples) {
238                 try {
239                 wrapped.openContext(context);
240                 } catch(TripleHandlerException e) {
241                     throw new ValvedTriplePipeException("Error while opening the triple handler", e);
242                 }
243                 hasReceivedTriples = true;
244             }
245             try {
246                 wrapped.receiveTriple(s, p, o, g, context);
247             } catch (TripleHandlerException e) {
248                 throw new ValvedTriplePipeException("Error while opening the triple handler", e);
249             }
250         }
251 
252         private void sendNamespace(String prefix, String uri) throws ValvedTriplePipeException {
253             if (!hasReceivedTriples) {
254                 try {
255                     wrapped.openContext(context);
256                 } catch (TripleHandlerException e) {
257                     throw new ValvedTriplePipeException("Error while sending the namespace", e);
258                 }
259                 hasReceivedTriples = true;
260             }
261             try {
262                 wrapped.receiveNamespace(prefix, uri, context);
263             } catch (TripleHandlerException e) {
264                 throw new ValvedTriplePipeException("Error while receiving the namespace", e);            }
265         }
266     }
267 
268 }