View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *  http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.any23.filter;
19  
20  import org.apache.any23.extractor.ExtractionContext;
21  import org.apache.any23.writer.TripleHandler;
22  import org.apache.any23.writer.TripleHandlerException;
23  import org.eclipse.rdf4j.model.Resource;
24  import org.eclipse.rdf4j.model.IRI;
25  import org.eclipse.rdf4j.model.Value;
26  
27  import java.util.ArrayList;
28  import java.util.HashMap;
29  import java.util.List;
30  import java.util.Locale;
31  import java.util.Map;
32  
/**
 * A wrapper around a {@link TripleHandler} that can block and unblock calls to the handler, either for the entire
 * document, or for individual {@link ExtractionContext}s. A document becomes blocked when
 * {@link #startDocument(IRI)} is called and must be explicitly unblocked. Contexts are initially unblocked and must
 * be explicitly blocked. Unblocking a document unblocks all contexts as well.
 *
 * <p>
 * Namespaces and triples received for a blocked context are buffered in memory. They are forwarded to the wrapped
 * handler when the context (or the whole document) is unblocked, and are dropped if the document ends while the
 * context is still blocked.
 * </p>
 *
 * <p>
 * This class is thread-safe: every method that reads or writes its state is synchronized on this instance.
 * </p>
 *
 * @author Richard Cyganiak (richard@cyganiak.de)
 */
public class ExtractionContextBlocker implements TripleHandler {

    /** The handler that finally receives all events that are not blocked. */
    private final TripleHandler wrapped;

    /** One pipe per context opened in the current document, keyed by the context's unique ID. */
    private final Map<String, ValvedTriplePipe> contextQueues = new HashMap<String, ValvedTriplePipe>();

    /** Guarded by {@code this}. */
    private boolean documentBlocked;

    /**
     * @param wrapped
     *            the handler to which unblocked events are forwarded.
     */
    public ExtractionContextBlocker(TripleHandler wrapped) {
        this.wrapped = wrapped;
    }

    /**
     * @return <code>true</code> if the current document is blocked, <code>false</code> otherwise.
     */
    public synchronized boolean isDocBlocked() {
        // Synchronized so that the value written by other (synchronized) methods is visible to the calling thread.
        return documentBlocked;
    }

    /**
     * Blocks the given context, so that subsequent events for it are buffered instead of forwarded. Does nothing if
     * the document is not blocked.
     *
     * @param context
     *            a context previously passed to {@link #openContext(ExtractionContext)}.
     */
    public synchronized void blockContext(ExtractionContext context) {
        if (!documentBlocked) {
            return;
        }
        try {
            contextQueues.get(context.getUniqueID()).block();
        } catch (ValvedTriplePipeException e) {
            throw new RuntimeException("Error while blocking context", e);
        }
    }

    /**
     * Unblocks the given context and forwards everything that was buffered for it to the wrapped handler.
     *
     * @param context
     *            a context previously passed to {@link #openContext(ExtractionContext)}.
     */
    public synchronized void unblockContext(ExtractionContext context) {
        try {
            contextQueues.get(context.getUniqueID()).unblock();
        } catch (ValvedTriplePipeException e) {
            throw new RuntimeException("Error while unblocking context", e);
        }
    }

    /**
     * Forwards the start of the document to the wrapped handler and marks the document as blocked.
     *
     * @param documentIRI
     *            the IRI of the document being started.
     *
     * @throws TripleHandlerException
     *             if the wrapped handler fails.
     */
    public synchronized void startDocument(IRI documentIRI) throws TripleHandlerException {
        wrapped.startDocument(documentIRI);
        documentBlocked = true;
    }

    /**
     * Registers a new, initially unblocked pipe for the given context. The context is opened on the wrapped handler
     * lazily, when the first event for it is actually forwarded.
     *
     * @param context
     *            the context being opened.
     *
     * @throws TripleHandlerException
     *             declared by the handler contract; not thrown by this implementation.
     */
    public synchronized void openContext(ExtractionContext context) throws TripleHandlerException {
        contextQueues.put(context.getUniqueID(), new ValvedTriplePipe(context));
    }

    /**
     * Does nothing: all contexts are closed together when the document is finished.
     *
     * @param context
     *            the context being closed.
     */
    public synchronized void closeContext(ExtractionContext context) {
        // Empty. We'll close all contexts when the document is finished.
    }

    /**
     * Unblocks the document and every context opened so far, forwarding all buffered events. Does nothing if the
     * document is not blocked.
     */
    public synchronized void unblockDocument() {
        if (!documentBlocked) {
            return;
        }
        documentBlocked = false;
        for (ValvedTriplePipe pipe : contextQueues.values()) {
            try {
                pipe.unblock();
            } catch (ValvedTriplePipeException e) {
                throw new RuntimeException("Error while unblocking context", e);
            }
        }
    }

    /**
     * Buffers the triple if its context is blocked, otherwise forwards it to the wrapped handler.
     *
     * @throws TripleHandlerException
     *             if forwarding the triple to the wrapped handler fails.
     */
    public synchronized void receiveTriple(Resource s, IRI p, Value o, IRI g, ExtractionContext context)
            throws TripleHandlerException {
        try {
            contextQueues.get(context.getUniqueID()).receiveTriple(s, p, o, g);
        } catch (ValvedTriplePipeException e) {
            throw new TripleHandlerException(
                    String.format(Locale.ROOT, "Error while receiving triple %s %s %s", s, p, o), e);
        }
    }

    /**
     * Buffers the namespace declaration if its context is blocked, otherwise forwards it to the wrapped handler.
     *
     * @throws TripleHandlerException
     *             if forwarding the namespace to the wrapped handler fails.
     */
    public synchronized void receiveNamespace(String prefix, String uri, ExtractionContext context)
            throws TripleHandlerException {
        try {
            contextQueues.get(context.getUniqueID()).receiveNamespace(prefix, uri);
        } catch (ValvedTriplePipeException e) {
            throw new TripleHandlerException(
                    String.format(Locale.ROOT, "Error while receiving namespace %s:%s", prefix, uri), e);
        }
    }

    /**
     * Closes all contexts of the current document and then closes the wrapped handler.
     *
     * @throws TripleHandlerException
     *             if the wrapped handler fails to close.
     */
    public synchronized void close() throws TripleHandlerException {
        try {
            closeDocument();
        } finally {
            // Always release the wrapped handler, even if closing one of the contexts failed.
            wrapped.close();
        }
    }

    /**
     * Closes all contexts of the current document and forwards the end of the document to the wrapped handler.
     *
     * @param documentIRI
     *            the IRI of the document being ended.
     *
     * @throws TripleHandlerException
     *             if the wrapped handler fails.
     */
    public synchronized void endDocument(IRI documentIRI) throws TripleHandlerException {
        closeDocument();
        wrapped.endDocument(documentIRI);
    }

    /**
     * Ignored by this handler.
     *
     * @param contentLength
     *            the content length; unused.
     */
    public void setContentLength(long contentLength) {
        // Empty.
    }

    /**
     * Closes every pipe of the current document and forgets them. Events still buffered in blocked pipes are
     * dropped. Must be called while holding the lock on {@code this}.
     */
    private void closeDocument() {
        try {
            for (ValvedTriplePipe pipe : contextQueues.values()) {
                try {
                    pipe.close();
                } catch (ValvedTriplePipeException e) {
                    throw new RuntimeException("Error closing document", e);
                }
            }
        } finally {
            // Never carry pipes over to the next document, even when closing one of them failed.
            contextQueues.clear();
        }
    }

    /** Signals a failure while a {@link ValvedTriplePipe} talks to the wrapped handler. */
    private static class ValvedTriplePipeException extends Exception {

        private ValvedTriplePipeException(String s) {
            super(s);
        }

        private ValvedTriplePipeException(Throwable throwable) {
            super(throwable);
        }

        private ValvedTriplePipeException(String s, Throwable throwable) {
            super(s, throwable);
        }

    }

    /**
     * Per-context pipe with a valve: while blocked it buffers namespaces and triples, while unblocked it passes them
     * straight to the wrapped handler. Not synchronized itself; it is only used from the synchronized methods of the
     * enclosing instance.
     */
    private class ValvedTriplePipe {

        private final ExtractionContext context;

        // Buffered triples, stored as four parallel lists (same index = same triple).
        private final List<Resource> subjects = new ArrayList<Resource>();

        private final List<IRI> predicates = new ArrayList<IRI>();

        private final List<Value> objects = new ArrayList<Value>();

        private final List<IRI> graphs = new ArrayList<IRI>();

        // Buffered namespace declarations, stored as two parallel lists.
        private final List<String> prefixes = new ArrayList<String>();

        private final List<String> uris = new ArrayList<String>();

        private boolean blocked = false;

        /** Whether the context has been opened on the wrapped handler (by a forwarded triple or namespace). */
        private boolean contextOpened = false;

        ValvedTriplePipe(ExtractionContext context) {
            this.context = context;
        }

        void receiveTriple(Resource s, IRI p, Value o, IRI g) throws ValvedTriplePipeException {
            if (blocked) {
                subjects.add(s);
                predicates.add(p);
                objects.add(o);
                graphs.add(g);
            } else {
                sendTriple(s, p, o, g);
            }
        }

        void receiveNamespace(String prefix, String uri) throws ValvedTriplePipeException {
            if (blocked) {
                prefixes.add(prefix);
                uris.add(uri);
            } else {
                sendNamespace(prefix, uri);
            }
        }

        void block() throws ValvedTriplePipeException {
            blocked = true;
        }

        /**
         * Opens the valve and flushes the buffered namespaces, then the buffered triples, to the wrapped handler.
         */
        void unblock() throws ValvedTriplePipeException {
            if (!blocked) {
                return;
            }
            blocked = false;
            try {
                for (int i = 0; i < prefixes.size(); i++) {
                    sendNamespace(prefixes.get(i), uris.get(i));
                }
                for (int i = 0; i < subjects.size(): i++) {
                    sendTriple(subjects.get(i), predicates.get(i), objects.get(i), graphs.get(i));
                }
            } finally {
                // Empty the buffers so that a later block/unblock cycle does not send these events again.
                clearBuffers();
            }
        }

        /**
         * Closes the context on the wrapped handler, but only if it was ever opened there.
         */
        void close() throws ValvedTriplePipeException {
            if (contextOpened) {
                try {
                    wrapped.closeContext(context);
                } catch (TripleHandlerException e) {
                    throw new ValvedTriplePipeException("Error while closing the triple handler", e);
                }
            }
        }

        private void clearBuffers() {
            prefixes.clear();
            uris.clear();
            subjects.clear();
            predicates.clear();
            objects.clear();
            graphs.clear();
        }

        /**
         * Opens the context on the wrapped handler the first time something is forwarded for it.
         */
        private void ensureContextOpened() throws ValvedTriplePipeException {
            if (contextOpened) {
                return;
            }
            try {
                wrapped.openContext(context);
            } catch (TripleHandlerException e) {
                throw new ValvedTriplePipeException("Error while opening the triple handler", e);
            }
            contextOpened = true;
        }

        private void sendTriple(Resource s, IRI p, Value o, IRI g) throws ValvedTriplePipeException {
            ensureContextOpened();
            try {
                wrapped.receiveTriple(s, p, o, g, context);
            } catch (TripleHandlerException e) {
                throw new ValvedTriplePipeException("Error while sending the triple", e);
            }
        }

        private void sendNamespace(String prefix, String uri) throws ValvedTriplePipeException {
            ensureContextOpened();
            try {
                wrapped.receiveNamespace(prefix, uri, context);
            } catch (TripleHandlerException e) {
                throw new ValvedTriplePipeException("Error while sending the namespace", e);
            }
        }
    }

}
267 }